Модули Task и :gen_tcp

В этой главе мы изучим, как использовать модуль Эрланга :gen_tcp для обработки запросов. При этом мы получим возможность посмотреть также модуль Эликсира Task. В дальнейших главах мы расширим наш сервер так, чтобы он действительно мог обрабатывать команды.

Эхо-сервер

Начнём разработку TCP-сервера с реализации эхо-сервера. Он будет посылать в ответ тот текст, который он получил в запросе. Будем постепенно улучшать наш сервер, пока он не будет управляться супервизором и будет готов к работе с множеством подключений.

TCP-сервер, грубо говоря, делает следующие шаги:

  1. Слушает порт, пока порт доступен и удерживает сокет.
  2. Ждёт подключение клиента на этом порту и принимает его.
  3. Читает запросы клиента и отправляет ответы.

Давайте реализуем эти шаги. Перейдите к приложению apps/kv_server, откройте файл lib/kv_server.ix и добавьте следующие функции:

require Logger

def accept(port) do
  # Опции ниже означают:
  #
  # 1. `:binary` – принимает данные в двоичном виде (вместо списков)
  # 2. `packet: :line` – принимает данные построчно
  # 3. `active: false` – блокирует функцию `:gen_tcp.recv/2`, пока данные не будут доступны
  # 4. `reuseaddr: true` – разрешает повторно использовать адрес, если слушатель отвалился с ошибкой
  #
  {:ok, socket} = :gen_tcp.listen(port,
                    [:binary, packet: :line, active: false, reuseaddr: true])
  Logger.info "Accepting connections on port #{port}"
  loop_acceptor(socket)
end

defp loop_acceptor(socket) do
  {:ok, client} = :gen_tcp.accept(socket)
  serve(client)
  loop_acceptor(socket)
end

defp serve(socket) do
  socket
  |> read_line()
  |> write_line(socket)

  serve(socket)
end

defp read_line(socket) do
  {:ok, data} = :gen_tcp.recv(socket, 0)
  data
end

defp write_line(line, socket) do
  :gen_tcp.send(socket, line)
end

Запускаем сервер, вызвав KVServer.accept(4040), где 4040 – это порт. Первый шаг в функции accept/1 – слушать порт, пока сокет не станет доступен и затем вызвать функцию loop_acceptor/1. Функция loop_acceptor/1 – цикл, принимающий подключения клиентов. Для каждого принятого подключения мы вызываем функцию serve/1.

Функция serve/1 – другой цикл, который читает строки из сокета и пишет эти строки обратно в сокет. Обратите внимание, что функция serve/1 использует пайп-оператор |> для выполнения этого потока операций. Оператор конвейера выполняет левую часть и передаёт её результат в качестве первого аргумента в функцию в правой части. Пример выше:

socket |> read_line() |> write_line(socket)

эквивалентен этому:

write_line(read_line(socket), socket)

Функция read_line/1 получает данные из сокета, используя функцию :gen_tcp.recv/2, и функция write_line/2 пишет в сокет, используя функцию :gen_tcp.send/2.

Обратите внимание, что функция serve/1 – это бесконечный цикл, вызываемый последовательно внутри функции loop_acceptor/1, поэтому её конечный вызов никогда не будет достигнут и его можно опустить. Однако, как мы увидим, нам нужно будет выполнять функцию serve/1 в отдельном процессе, поэтому нам скоро понадобится этот конечный вызов.

Это всё, что нам нужно для реализации нашего эхо-сервера. Давайте попробуем его в деле!

Запустите IEx-сессию внутри приложения kv_server командой iex -S mix. В консоли выполните:

iex> KVServer.accept(4040)

Сервер теперь запущен, и вы можете увидеть, что консоль заблокирована. Давайте используем Телнет-клиент для доступа к этому серверу. Такие клиенты доступны для многих операционных систем, а их команды обычно похожи:

$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
hello
is it me
is it me
you are looking for?
you are looking for?

Введите «hello», нажмите Enter, и вы получите «hello» в ответ. Прекрасно!

Телнет-клиент можно закрыть, нажав ctrl + ], набрав следом quit, и нажав <Enter>, либо другой последовательностью действий, предусмотренной вашим клиентом.

После закрытия Телнет-клиента, вы скорее всего увидите ошибку в IEx-сессии:

** (MatchError) no match of right hand side value: {:error, :closed}
    (kv_server) lib/kv_server.ex:45: KVServer.read_line/1
    (kv_server) lib/kv_server.ex:37: KVServer.serve/1
    (kv_server) lib/kv_server.ex:30: KVServer.loop_acceptor/1

Это происходит, потому что мы ждём данные от функции :gen_tcp.recv/2, но клиент закрывает соединение. Нам нужно лучше обрабатывать подобные сценарии в будущих версиях нашего сервера.

Пока у нас есть более важные проблемы, которые нужно решить: что произойдёт, если наш TCP-приёмник соединений упадёт? Т. к. у нас нет супервизора, сервер умрёт и мы не сможем больше обрабатывать запросы, потому что он не перезапустится. Поэтому нам необходимо поместить наш сервер в дерево супервизора.

Задачи

Мы изучили агенты, генсерверы и супервизоры. Они все работают с множеством сообщений или управляют состоянием. Но что делать, если нам нужно всего лишь выполнить какую-то одну задачу?

Модуль Task предоставляет именно такую функциональность. Например, в нём есть фукнция start_link/3, которая принимает модуль, функцию и аргументы, позволяя запустить переданную фукнцию как часть дерева супервизора.

Давайте попробуем на практике. Откройте файл lib/kv_server/application.ex и измените супервизор в функции start/2 как показано ниже:

  def start(_type, _args) do
    children = [
      {Task, fn -> KVServer.accept(4040) end}
    ]

    opts = [strategy: :one_for_one, name: KVServer.Supervisor]
    Supervisor.start_link(children, opts)
  end

Этим изменением мы говорим, что хотим запустить KVServer.accept(4040) как задачу. Мы задали порт прямо в коде, но он может быть изменён несколькими способами, например, его можно взять из системного окружения при запуске приложения:

port = String.to_integer(System.get_env("PORT") || raise "missing $PORT environment variable")
# ...
{Task, fn -> KVServer.accept(port) end}

Теперь сервер является частью дерева супервизора и должен запуститься автоматически при запуске приложения. Выполните команду mix run --no-halt в консоли и снова используйте Телнет-клиент, чтобы убедиться, что всё работает:

$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
say you
say you
say me
say me

Да, всё отлично! Однако, может ли это решение масштабироваться?

Попробуйте подключить два Телнет-клиента одновременно. Когда вы сделаете это, вы увидите, что второй клиент не отвечает:

$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
hello?
HELLOOOOOO?

Не похоже, что он вообще работает. Это происходит потому, что мы обрабатываем запросы в том же процессе, в котором принимаем подключения. Когда один клиент подключен, мы не можем подключить другой клиент.

Супервизор задач

Чтобы наш сервер мог обрабатывать множество подключений, нам нужно сделать так, чтобы один процесс принимал подключения и порождал другие процессы, которые обрабатывают запросы. Одно из решений – изменить код:

defp loop_acceptor(socket) do
  {:ok, client} = :gen_tcp.accept(socket)
  serve(client)
  loop_acceptor(socket)
end

Используя функцию Task.start_link/1, которая похожа на функцию Task.start_link/3, но принимает анонимную функцию вместо модуля, функции и аргументов:

defp loop_acceptor(socket) do
  {:ok, client} = :gen_tcp.accept(socket)
  Task.start_link(fn -> serve(client) end)
  loop_acceptor(socket)
end

Мы запускаем связанную задачу прямо из процесса-приёмника. Но мы уже делали такую ошибку раньше. Вы помните?

Это такая же ошибка, как в случае с вызовом функции KV.Bucket.start_link/1 прямо из реестра. Там падение любой корзины приводило к падению всего реестра.

Код выше имеет такую же проблему: если мы связываем задачу serve(client) с приёмником, падение обработки запроса приведёт к падению приёмника и всех других подключений.

Для реестра мы решили эту проблему, используя супервизор. Мы придержимся этой же тактики здесь, т. к. этот шаблон настолько распространён для задач, что модуль Task уже содержит решение: простой супервизор «one for one», который запускает временные задачи как часть дерева супервизора.

Давайте изменим функцию start/2 ещё раз, добавив супервизор в дерево:

  def start(_type, _args) do
    children = [
      {Task.Supervisor, name: KVServer.TaskSupervisor},
      {Task, fn -> KVServer.accept(4040) end}
    ]

    opts = [strategy: :one_for_one, name: KVServer.Supervisor]
    Supervisor.start_link(children, opts)
  end

Сейчас мы запустим процесс Task.Supervisor с именем KVServer.TaskSupervisor. Помните, т. к. задача приёмника зависит от нашего супервизора, супервизор должен быть запущен первым.

Теперь нам нужно изменить функцию loop_acceptor/1 с использованием модуля Task.Supervisor для обработки каждого запроса:

defp loop_acceptor(socket) do
  {:ok, client} = :gen_tcp.accept(socket)
  {:ok, pid} = Task.Supervisor.start_child(KVServer.TaskSupervisor, fn -> serve(client) end)
  :ok = :gen_tcp.controlling_process(client, pid)
  loop_acceptor(socket)
end

Вы можете заметить, что мы добавили строку :ok = :gen_tcp.controlling_process(client, pid). Это сделает процесс-потомок «контроллирующим процессом» для сокета client. Если бы мы не сделали это, приёмник при падении отключил бы всех клиентов, потому что сокеты были бы связаны с процессом, который принимает их (это стандартное поведение).

Запустите новый сервер с помощью PORT=4040 mix run --no-halt, а следом откройте несколько Телнет-клиентов параллельно. Вы сможете убедиться, что отключение клиента не приводит к отключению приёмника. Прекрасно!

Вот полная реализация эхо-сервера:

defmodule KVServer do
  require Logger

  @doc """
  Начинает принимать подключения на определённом порту `port`.
  """
  def accept(port) do
    {:ok, socket} = :gen_tcp.listen(port,
                      [:binary, packet: :line, active: false, reuseaddr: true])
    Logger.info "Accepting connections on port #{port}"
    loop_acceptor(socket)
  end

  defp loop_acceptor(socket) do
    {:ok, client} = :gen_tcp.accept(socket)
    {:ok, pid} = Task.Supervisor.start_child(KVServer.TaskSupervisor, fn -> serve(client) end)
    :ok = :gen_tcp.controlling_process(client, pid)
    loop_acceptor(socket)
  end

  defp serve(socket) do
    socket
    |> read_line()
    |> write_line(socket)

    serve(socket)
  end

  defp read_line(socket) do
    {:ok, data} = :gen_tcp.recv(socket, 0)
    data
  end

  defp write_line(line, socket) do
    :gen_tcp.send(socket, line)
  end
end

Мы изменили спецификацию супервизора, теперь нужно спросить: является ли используемая стратегия супервизора всё ещё правильной?

В данном случае ответ «да»: если приёмник падает, нет необходимости разрушать все существующие подключения. С другой стороны, если задача супервизора падает, также нет нужды отключать приёмник.

Однако, осталась ещё одна проблема: стратегия перезапуска. Задачи, по умолчанию, имеют в поле :restart значение :temporary, то есть они не будут перезапущены. Это отличный вариант для соединений, запущенных через Task.Supervisor, т. к. нет смысла перезапускать упавшее подключение, но это плохой выбор для приёмника. Если он упадёт, мы хотим снова его запустить.

Мы можем исправить это, определив в модуле вызов use Task, restart: :permanent и назначить функцию start_link ответственной за перезапуск, по аналогии с модулями Agent и GenServer. Однако, давайте поступим другим образом в данном случае. При интеграции с чужой библиотекой, мы не сможем изменить там определение агентов, задач и серверов. В этом случае нам нужно иметь возможность изменить спецификацию их потомков динамически. Это можно сделать с помощью Supervisor.child_spec/2, функции, с которой мы познакомились в предыдущих главах. Давайте перепишем start/2 в KVServer.Application ещё раз:

  def start(_type, _args) do
    children = [
      {Task.Supervisor, name: KVServer.TaskSupervisor},
      Supervisor.child_spec({Task, fn -> KVServer.accept(4040) end}, restart: :permanent)
    ]

    opts = [strategy: :one_for_one, name: KVServer.Supervisor]
    Supervisor.start_link(children, opts)
  end

Функция Supervisor.child_spec/2 может собирать спецификации потомков из переданного модуля и/или кортежа, и также принимает значения, которые переопределяют существующую спецификацию потомков. Теперь у нас есть всегда запущенный приёмник, который запускает временные процессы задач через всегда запущенный супервизор задач.

В следующей главе мы сделаем парсер для запросов клиентов и отправку ответов на них, и доделаем наш сервер до конца.

© 2020 / Россия Любые мысли и вопросы пишите на elixir@wunsh.ru.