Балуемся с сокетами и процессами в Эликсире
Не так давно я работал над приложением-демоном без какой-либо веб-части. Оно функционировало в качестве хранилища поступающих в реальном времени данных местоположения транспортных средств. Данные передавались через обычный порт TCP. Конечно, мне доводилось реализовывать нечто подобное и раньше, но это было достаточно давно, и с Эликсиром я прежде не сталкивался.
Механизм работы порта вполне стандартный:
- Прослушивание порта.
- Установление подключения.
- Получение данных.
- Передача данных.
Это обычная схема действия TCP-протокола. Благодаря виртуальной машине BEAM, поверх которой работает Эликсир, можно не беспокоиться, что какие-либо из этих задач будут выполняться одновременно или наложатся друг на друга. Мы можем:
- Создать один процесс для прослушивания порта.
- Создать второй процесс для установления связи.
- Поручить третьему процессу приём и обработку данных.
- Передать данные четвёртому процессу.
Теперь, когда каждый шаг помещён в отдельный процесс, наложения исключены, даже если использовать синхронные функции.
Модуль :gen_tcp
Для получения доступа к неструктурированным TCP-сокетам воспользуемся gen_tcp
. Это модуль на Эрланге, потребность в функциональности которого всегда была актуальна. Вы можете говорить, что работаете с ним на Эликсире, потому что получаете доступ к функциям напрямую от атома.
Первый шаг – прослушивание порта – довольно просто реализуется с помощью функции listen/2
:
defmodule Receiver do
require Logger
def start(port) do
spawn fn ->
case :gen_tcp.listen(port, [:binary, active: false, reuseaddr: true]) do
{:ok, socket} ->
Logger.info("Connected.")
accept_connection(socket) # <--- We'll handle this next.
{:error, reason} ->
Logger.error("Could not listen: #{reason}")
end
end
end
end
Параметр active: false
означает, что необходимо запросить данные у :gen_tcp
. Если бы на том месте стояло true
, то следующим действием был бы приём входящих данных в виде сообщений в почтовом ящике. Это, безусловно, полезная функция, но сейчас она только добавит приложению излишней сложности. Поэтому оставляем active: false
.
Обернём всё тело функции :gen_tcp.listen/2
в spawn/1
, чтобы вызвать её в отдельном процессе и не допустить блокировки других процессов. Понятно, что нужно остановить listen/2
, но при этом всё остальное должно продолжать работать.
Установление соединения
Итак, сокет прослушивается на наличие подключений. Как только появится хоть одно, необходимо будет считать данные. Но ведь хотелось бы обрабатывать больше одного подключения за раз… Не проблема, создадим ещё несколько процессов!
Давайте пропишем функцию accept_connection
для установления подключения и получения читабельных данных.
def accept_connection(socket) do
{:ok, client} = :gen_tcp.accept(socket)
spawn fn ->
{:ok, buffer_pid} = Buffer.create() # <--- this is next
Process.flag(:trap_exit, true)
serve(client, buffer_pid) # <--- and then we'll cover this
end
loop_accept(socket)
end
Функция :gen_tcp.accept/1
находится в ожидании подключения. Как только оно появляется, запускается новый процесс, который создаёт Buffer
для данного подключения и находится в ожидании передачи данных (функция serve
). Её мы реализуем позже.
Вы спросите, что это ещё за Buffer
? Это модуль, описывающий строковый буфер, в задачи которого входит хранение полученных данных в оперативной памяти, их анализ и передача в приёмник. Подробно рассматривать его не будем, остановимся лишь на управлении его процессом.
Продолжаем разбираться
Внутри функции spawn
мы создаём Buffer
для каждого подключения к сокету и отслеживаем этот процесс. Процессы Эрланга (и, соответственно, Эликсира) являются легковесными, и предполагается, что их очень просто создавать и перезапускать.
Существует два основных способа наблюдения за процессами:
spawn
– порождает процесс и не отслеживает его статус.spawn_link
– порождает новый процесс, привязанный к текущему. Связь между ними означает, что если один процесс упадёт, упадёт и другой. Полезная функция, помогающая убедиться в том, что состояния процессов не болтаются где-то в памяти без дела.
Важно то, что обе функции никак не отслеживают жизненный цикл процесса. К счастью, у этой проблемы есть несколько доступных решений.
-
С помощью функции
Process.monitor/1
можно отслеживать абсолютно любой процесс, если известен егоpid
. Если процесс упадёт, мы получим сообщение:DOWN
. Лучше всего работает с процессами, порождёнными через spawn. -
Можно отлавливать сообщения о непреднамеренном завершении процессов с помощью функции
Process.flag(:trap_exit, true)
, как в примере выше. Работает только с процессами, порождёнными черезspawn_link
. Если дочерний процесс упал, то его предок получит сообщение:EXIT
.
Как вы могли заметить, именно второй вариант был использован при порождении процесса-слушателя. Если процесс с модулем Buffer
упадёт, то мы об этом узнаем.
Связываемся и слушаем
Итак, наше приложение уже умеет слушать сокет и устанавливать с ним связь, а также отслеживать падение дочерних процессов и складывать полученные данные в Buffer
. Как же, собственно, получать эти данные? Раз уж мы выбрали вариант прослушки порта с active: false
, то данные придётся запросить. Для этого воспользуемся функцией :gen_tcp.recv/2
.
def serve(socket, buffer_pid) do
case :gen_tcp.recv(socket, 0) do
{:ok, data} ->
buffer_pid = maybe_recreate_buffer(buffer_pid) # <-- coming up next
Buffer.receive(buffer_pid, data)
serve(socket, buffer_pid)
{:error, reason} ->
Logger.info("Socket terminating: #{inspect reason}")
end
end
Теперь процесс находится в вечном ожидании данных. Получив их, он убедится, что Buffer
работает (или запустит новый), а затем передаст ему данные и продолжит ждать. Это единственный возможный сценарий, потому что получение данных происходит в отдельном процессе.
И последняя часть головоломки – определить, запущен ли Buffer
-процесс. Как было отмечено выше, если он упал, то будет получено сообщение :EXIT
, а значит, нужно проверить почтовый ящик.
defp maybe_recreate_buffer(original_pid) do
receive do
{:EXIT, ^original_pid, _reason} ->
{:ok, new_buffer_pid} = Buffer.create()
new_buffer_pid
after
10 ->
original_pid
end
end
Если там уже находится это сообщение (или оно будет получено в ближайшие 10 мс), следует создать новый Buffer
. В противном случае никаких действий не требуется.
Без Buffer
невозможно создать работающий канал передачи данных функциям, которые проделывают с ними основную работу. Если Buffer
-процесс вдруг упадёт, он будет немедленно создан заново. Функция maybe_recreate_buffer/1
делает это максимально прозрачно для функции serve/2
.
Подводим итоги
Мы создали простенький сервер на сокетах, работающий со строчно-ориентированными протоколами и передающий сформированные строки в парсер. Buffer
-процесс – прост в реализации, а вероятность его непреднамеренного завершения стремится к нулю. Даже если это и произойдёт, проблема будет тут же решена. Мы рассмотрели, каким образом процессы связаны между собой и как они могут отслеживать состояние друг друга.
Важно отметить, что статья представляет собой исследование взаимодействия с сокетами и процессами. Не исключено, что существует более устойчивое к сбоям решение с использование супервизоров, но в рамках данной статьи оно не представляет интереса.