Параллельное программирование на Elixir
Elixir предоставляет разработчикам простые инструменты для написания конкурентного кода. В одной из предыдущих статей была освещена конкурентная модель Elixir через призму основных структурных элементов конкурентности. В Elixir процессы подчиняются конкурентной модели акторов и являются фундаментом, на основе которого другие процессы обмениваются между собой сообщениями.
При реализации типовых задач вы наверняка уже сталкивались с необходимостью осуществления значительного количества стандартных действий, таких как отслеживание состояния внутри процесса или выполнение блокирующего вызова и ожидание ответа.
На этот счёт Elixir предоставляет ряд общепринятых абстракций, ещё более облегчающих написание конкурентного кода. В данной статье будут рассмотрены три таких абстракции: Task, Agent и GenServer.
Модуль Task
Модуль Task
упрощает работу с конкурентными процессами еще больше. В нём наиболее примечательны два метода: async
— начать асинхронное выполнение операций в другом процессе и await
— ждать завершения операции и возвратить результат.
Для иллюстрации этого рассмотрим простейший пример, а затем выясним, каким образом он поможет ускорить работу map
через распараллеливание.
task = Task.async(fn -> 5 + 5 end)
Команда в строке выше запускает анонимную функцию в отдельном процессе. Если на стадии разработки у вас под рукой имеется ссылка на задачу, вы сможете получить доступ к результату данной анонимной функции. Функция async
возвращает не что иное, как ссылку на необходимый для получения результата PID процесса, в котором выполняется данная задача:
%Task{owner: #PID<0.80.0>, pid: #PID<0.82.0>, ref: #Reference<0.0.7.207>}
Давайте взглянем на этот PID
, чтобы понять, что происходит с текущим процессом:
Process.info(task.pid)
nil
Process.alive?(task.pid)
false
Как можно заметить, такого процесса больше не существует; процесс завершается сразу после того, как модуль Task завершит работу. Как же тогда получить от него отклик? В этом нам поможет почтовый ящик вызывающего процесса:
Process.info(task.owner)[:messages]
[{#Reference<0.0.7.207>, 10},
{:DOWN, #Reference<0.0.7.207>, :process, #PID<0.82.0>, :normal}]
task.owner
— это и есть текущий процесс или self()
. Здесь также можно видеть два входящих сообщения. Первое из них — это отклик на задачу async
, а второе — уведомление от ссылки на Task-процесс о его завершении. Чтобы получить доступ к этим сообщениям, можно использовать блок receive, но модуль Task
предлагает более простое решение:
Task.await(task)
10
Если сейчас взглянуть на входящие сообщения текущему процессу, то можно заметить, что они исчезли:
Process.info(task.owner)[:messages]
[]
Параллельное выполнение функции map при помощи модуля Task
Рассмотрим пример работы с map, построенный с помощью функций async
и await
модуля Task. Этот же пример, но без использования Task
, приведён в предыдущей статье. Можно видеть, что теперь он значительно упростился:
defmodule Statuses do
def map(urls) do
urls
|> Enum.map(&(Task.async(fn -> process(&1) end)))
|> Enum.map(&(Task.await(&1)))
end
def process(url) do
case HTTPoison.get(url) do
{:ok, %HTTPoison.Response{status_code: status_code}} ->
{url, status_code}
{:error, %HTTPoison.Error{reason: reason}} ->
{:error, reason}
end
end
end
Проведём тестирование по той же схеме, что и в прошлый раз:
defmodule StatusesTest do
use ExUnit.Case
test "parallel status map" do
urls = [
url1 = "http://www.fakeresponse.com/api/?sleep=2",
url2 = "http://www.fakeresponse.com/api/?sleep=1",
url3 = "http://www.fakeresponse.com/api/?status=500",
url4 = "https://www.leighhalliday.com",
url5 = "https://www.reddit.com"
]
assert Statuses.map(urls) == [
{url1, 200},
{url2, 200},
{url3, 500},
{url4, 200},
{url5, 200}
]
end
end
На тесты ушло около 2,1 секунды, что сигнализирует о работоспособности системы, поскольку это время сопоставимо с длительностью самого медленного HTTP-вызова в 2 секунды.
Модуль Agent
Агенты упрощают процедуру хранения состояния в процессе: они позволяют производить обмен состоянием/данными между различными процессами без необходимости передачи большого объёма данных каждой функции, которой может понадобиться доступ к ним.
Создадим с помощью Agent небольшой модуль для получения и установки ключей конфигураций. Функция start_link
запускает Agent-процесс, после чего функции get
и set
получают доступ к конфигурационным данным и обновляют их.
defmodule Configure do
def start_link(initial \\ %{}) do
Agent.start_link(fn -> initial end, name: __MODULE__)
end
def get(key) do
Agent.get(__MODULE__, &Map.fetch(&1, key))
end
def set(key, value) do
Agent.update(__MODULE__, &Map.put(&1, key, value))
end
end
Вы наверняка заметили, что в примере выше обошлось без метода return и PID по той причине, что этот процесс привязан к имени модуля, а значит, такой процесс будет единственным. Проведём несколько тестов, чтобы убедиться, что всё работает как надо.
defmodule ConfigureTest do
use ExUnit.Case
test "get with initial value" do
Configure.start_link(%{env: :production})
assert Configure.get(:env) == {:ok, :production}
end
test "error when missing value" do
Configure.start_link()
assert Configure.get(:env) == :error
end
test "set and then get value" do
Configure.start_link()
Configure.set(:env, :production)
assert Configure.get(:env) == {:ok, :production}
end
end
Состояния агентов хранятся в памяти, в связи с чем следует запоминать только те данные, которые можно было бы быстро восстановить с помощью какой-либо стратегии перезапуска (через супервизор!). Состояния могут принадлежать к любому типу данных Elixir, несмотря на то, что в предыдущем примере использовался тип Map
.
В Elixir все процессы обрабатывают запросы последовательно. Даже при том, что Elixir является языком параллельного программирования, каждый процесс обрабатывает только один запрос в единицу времени. Если запрос к процессу выполняется медленно, доступ к данным в этом процессе будет перекрыт. На самом деле это даже удобно. Запросы обрабатываются последовательно в порядке их поступления, так что в некотором смысле это вполне предсказуемо. Нужно лишь удостовериться, что «тяжелые» задачи и медленно выполняющиеся вычисления осуществляются не в процессе агента, а в другом месте, например, в вызывающем процессе.
Если вам интересны более мощные и гибкие компоненты, то прочитайте про ETS. В сети можно найти замечательную статью Барри Джонса о сопоставлении ETS, агентов, Redis и других внешних инструментов.
Модуль GenServer
Итак, мы рассмотрели управление выполнением параллельных вычислений с использованием задач и управление состоянием в процессе с помощью агентов. Теперь обратимся к модулю GenServer, объединяющему работу с двумя описанными выше процедурами.
Воспользуемся примером MyLogger
из этой статьи и перепишем его с использованием GenServer
.
defmodule MyLogger do
use GenServer
# Client
def start_link do
GenServer.start_link(__MODULE__, 0)
end
def log(pid, msg) do
GenServer.cast(pid, {:log, msg})
end
def print_stats(pid) do
GenServer.cast(pid, {:print_stats})
end
def return_stats(pid) do
GenServer.call(pid, {:return_stats})
end
# Server
def handle_cast({:log, msg}, count) do
IO.puts msg
{:noreply, count + 1}
end
def handle_cast({:print_stats}, count) do
IO.puts "I've logged #{count} messages"
{:noreply, count}
end
def handle_call({:return_stats}, _from, count) do
{:reply, count, count}
end
end
Во-первых, в самом начале модуля включаем в него GenServer
. Это позволит в полной мере пользоваться функциональностью GenServer
.
Следом идёт функция start_link
. Она вызывает функцию GenServer.start_link
, создаёт новый процесс для текущего модуля и передаёт начальное состояние, которое в данном случае равняется 0. Она возвращает кортеж {:ok, pid}
.
GenServer может работать как синхронно, так и асинхронно. Синхронные запросы, при которых необходимо блокировать действия в ожидании ответа, называются call
, асинхронные — cast
.
Асинхронные запросы / Cast
При вызове GenServer.cast(pid, {:log, msg})
внутри функции log
, процессу будет отправлено сообщение с кортежем {:log, msg}
в виде аргументов. Теперь предстоит написать функцию для обработки этого сообщения.
Создадим функцию handle_cast
, в которой первый аргумент должен совпадать с полученными входящими аргументами {:log, msg}
, а второй аргумент представляет собой текущее состояние процесса.
В теле функции handle_cast
следует выполнить задачу чтения сообщения и отправки отклика, содержащего атом :noreply
и новое состояние процесса.
Синхронные запросы / Call
В случае если от процесса требуется своего рода отклик, нужно использовать синхронные запросы call
. GenServer.call(pid, {:return_stats})
посылает процессу сообщение с аргументом {:return_stats}
. По тому же принципу, что и для cast
, нужно реализовать функцию handle_call
для обработки сообщения и ответа на него.
def handle_call({:return_stats}, _from, count) do
{:reply, count, count}
end
Аргументы запросов call
несколько разнятся с аргументами cast
. Они включают дополнительный аргумент from
— процесс, пославший запрос. Отклик также отличен. Это кортеж, состоящий из трех элементов: атома :reply
, значения, которое нужно выслать вызывающему процессы в качестве отклика, и нового состояния процесса.
В приведённом выше примере возвращаемое значение и новое состояние — одно и то же, поэтому count
повторяется дважды.
Теперь модуль готов к практическому использованию!
{:ok, pid} = MyLogger.start_link
MyLogger.log(pid, "First message")
MyLogger.log(pid, "Another message")
MyLogger.print_stats(pid)
stats = MyLogger.return_stats(pid)
Выводы
В данной статье были рассмотрены три абстракции для параллельного программирования на Elixir, построенные на основе предоставленных языком программных возможностей. Первая из них — модуль Task
, позволяющий исполнять код внутри отдельного процесса и при необходимости ожидать ответа. Вторая — Agent
— фреймворк для управления состоянием внутри процесса. Третья — GenServer
— фреймворк для синхронного и асинхронного исполнения кода и управления состоянием одновременно.
Не хватает только реализации возможности отслеживания процессов и реагирования на их завершение. Это, конечно, можно проделать вручную, но в таком случае вы рискуете остаться без «фантастических» способностей супервизоров и OTP. Фактически, они позволяют отслеживать процессы и предпринимать действия по их завершению, автоматически перезапуская отслеживаемые процессы и их потомков. Но об этом в следующий раз!