Параллельное программирование на Elixir
Elixir предоставляет разработчикам простые инструменты для написания конкурентного кода. В одной из предыдущих статей была освещена конкурентная модель Elixir через призму основных структурных элементов конкурентности. В Elixir процессы подчиняются конкурентной модели акторов и являются фундаментом, на основе которого другие процессы обмениваются между собой сообщениями.
При реализации типовых задач вы наверняка уже сталкивались с необходимостью осуществления значительного количества стандартных действий, таких как отслеживание состояния внутри процесса или выполнение блокирующего вызова и ожидание ответа.
На этот счёт Elixir предоставляет ряд общепринятых абстракций, ещё более облегчающих написание конкурентного кода. В данной статье будут рассмотрены три таких абстракции: Task, Agent и GenServer.
Модуль Task
Модуль Task
упрощает работу с конкурентными процессами еще больше. В нём наиболее примечательны два метода: async
— начать асинхронное выполнение операций в другом процессе и await
— ждать завершения операции и возвратить результат.
Для иллюстрации этого рассмотрим простейший пример, а затем выясним, каким образом он поможет ускорить работу map
через распараллеливание.
Команда в строке выше запускает анонимную функцию в отдельном процессе. Если на стадии разработки у вас под рукой имеется ссылка на задачу, вы сможете получить доступ к результату данной анонимной функции. Функция async
возвращает не что иное, как ссылку на необходимый для получения результата PID процесса, в котором выполняется данная задача:
Давайте взглянем на этот PID
, чтобы понять, что происходит с текущим процессом:
Как можно заметить, такого процесса больше не существует; процесс завершается сразу после того, как модуль Task завершит работу. Как же тогда получить от него отклик? В этом нам поможет почтовый ящик вызывающего процесса:
task.owner
— это и есть текущий процесс или self()
. Здесь также можно видеть два входящих сообщения. Первое из них — это отклик на задачу async
, а второе — уведомление от ссылки на Task-процесс о его завершении. Чтобы получить доступ к этим сообщениям, можно использовать блок receive, но модуль Task
предлагает более простое решение:
Если сейчас взглянуть на входящие сообщения текущему процессу, то можно заметить, что они исчезли:
Параллельное выполнение функции map при помощи модуля Task
Рассмотрим пример работы с map, построенный с помощью функций async
и await
модуля Task. Этот же пример, но без использования Task
, приведён в предыдущей статье. Можно видеть, что теперь он значительно упростился:
Проведём тестирование по той же схеме, что и в прошлый раз:
На тесты ушло около 2,1 секунды, что сигнализирует о работоспособности системы, поскольку это время сопоставимо с длительностью самого медленного HTTP-вызова в 2 секунды.
Модуль Agent
Агенты упрощают процедуру хранения состояния в процессе: они позволяют производить обмен состоянием/данными между различными процессами без необходимости передачи большого объёма данных каждой функции, которой может понадобиться доступ к ним.
Создадим с помощью Agent небольшой модуль для получения и установки ключей конфигураций. Функция start_link
запускает Agent-процесс, после чего функции get
и set
получают доступ к конфигурационным данным и обновляют их.
Вы наверняка заметили, что в примере выше обошлось без метода return и PID по той причине, что этот процесс привязан к имени модуля, а значит, такой процесс будет единственным. Проведём несколько тестов, чтобы убедиться, что всё работает как надо.
Состояния агентов хранятся в памяти, в связи с чем следует запоминать только те данные, которые можно было бы быстро восстановить с помощью какой-либо стратегии перезапуска (через супервизор!). Состояния могут принадлежать к любому типу данных Elixir, несмотря на то, что в предыдущем примере использовался тип Map
.
В Elixir все процессы обрабатывают запросы последовательно. Даже при том, что Elixir является языком параллельного программирования, каждый процесс обрабатывает только один запрос в единицу времени. Если запрос к процессу выполняется медленно, доступ к данным в этом процессе будет перекрыт. На самом деле это даже удобно. Запросы обрабатываются последовательно в порядке их поступления, так что в некотором смысле это вполне предсказуемо. Нужно лишь удостовериться, что «тяжелые» задачи и медленно выполняющиеся вычисления осуществляются не в процессе агента, а в другом месте, например, в вызывающем процессе.
Если вам интересны более мощные и гибкие компоненты, то прочитайте про ETS. В сети можно найти замечательную статью Барри Джонса о сопоставлении ETS, агентов, Redis и других внешних инструментов.
Модуль GenServer
Итак, мы рассмотрели управление выполнением параллельных вычислений с использованием задач и управление состоянием в процессе с помощью агентов. Теперь обратимся к модулю GenServer, объединяющему работу с двумя описанными выше процедурами.
Воспользуемся примером MyLogger
из этой статьи и перепишем его с использованием GenServer
.
Во-первых, в самом начале модуля включаем в него GenServer
. Это позволит в полной мере пользоваться функциональностью GenServer
.
Следом идёт функция start_link
. Она вызывает функцию GenServer.start_link
, создаёт новый процесс для текущего модуля и передаёт начальное состояние, которое в данном случае равняется 0. Она возвращает кортеж {:ok, pid}
.
GenServer может работать как синхронно, так и асинхронно. Синхронные запросы, при которых необходимо блокировать действия в ожидании ответа, называются call
, асинхронные — cast
.
Асинхронные запросы / Cast
При вызове GenServer.cast(pid, {:log, msg})
внутри функции log
, процессу будет отправлено сообщение с кортежем {:log, msg}
в виде аргументов. Теперь предстоит написать функцию для обработки этого сообщения.
Создадим функцию handle_cast
, в которой первый аргумент должен совпадать с полученными входящими аргументами {:log, msg}
, а второй аргумент представляет собой текущее состояние процесса.
В теле функции handle_cast
следует выполнить задачу чтения сообщения и отправки отклика, содержащего атом :noreply
и новое состояние процесса.
Синхронные запросы / Call
В случае если от процесса требуется своего рода отклик, нужно использовать синхронные запросы call
. GenServer.call(pid, {:return_stats})
посылает процессу сообщение с аргументом {:return_stats}
. По тому же принципу, что и для cast
, нужно реализовать функцию handle_call
для обработки сообщения и ответа на него.
Аргументы запросов call
несколько разнятся с аргументами cast
. Они включают дополнительный аргумент from
— процесс, пославший запрос. Отклик также отличен. Это кортеж, состоящий из трех элементов: атома :reply
, значения, которое нужно выслать вызывающему процессы в качестве отклика, и нового состояния процесса.
В приведённом выше примере возвращаемое значение и новое состояние — одно и то же, поэтому count
повторяется дважды.
Теперь модуль готов к практическому использованию!
Выводы
В данной статье были рассмотрены три абстракции для параллельного программирования на Elixir, построенные на основе предоставленных языком программных возможностей. Первая из них — модуль Task
, позволяющий исполнять код внутри отдельного процесса и при необходимости ожидать ответа. Вторая — Agent
— фреймворк для управления состоянием внутри процесса. Третья — GenServer
— фреймворк для синхронного и асинхронного исполнения кода и управления состоянием одновременно.
Не хватает только реализации возможности отслеживания процессов и реагирования на их завершение. Это, конечно, можно проделать вручную, но в таком случае вы рискуете остаться без «фантастических» способностей супервизоров и OTP. Фактически, они позволяют отслеживать процессы и предпринимать действия по их завершению, автоматически перезапуская отслеживаемые процессы и их потомков. Но об этом в следующий раз!