Параллельное программирование на Elixir

Elixir предоставляет разработчикам простые инструменты для написания конкурентного кода. В одной из предыдущих статей была освещена конкурентная модель Elixir через призму основных структурных элементов конкурентности. В Elixir процессы подчиняются конкурентной модели акторов и являются фундаментом, на основе которого другие процессы обмениваются между собой сообщениями.

При реализации типовых задач вы наверняка уже сталкивались с необходимостью осуществления значительного количества стандартных действий, таких как отслеживание состояния внутри процесса или выполнение блокирующего вызова и ожидание ответа.

На этот счёт Elixir предоставляет ряд общепринятых абстракций, ещё более облегчающих написание конкурентного кода. В данной статье будут рассмотрены три таких абстракции: Task, Agent и GenServer.

Модуль Task

Модуль Task упрощает работу с конкурентными процессами еще больше. В нём наиболее примечательны два метода: async — начать асинхронное выполнение операций в другом процессе и await — ждать завершения операции и возвратить результат.

Для иллюстрации этого рассмотрим простейший пример, а затем выясним, каким образом он поможет ускорить работу map через распараллеливание.

task = Task.async(fn -> 5 + 5 end)

Команда в строке выше запускает анонимную функцию в отдельном процессе. Если на стадии разработки у вас под рукой имеется ссылка на задачу, вы сможете получить доступ к результату данной анонимной функции. Функция async возвращает не что иное, как ссылку на необходимый для получения результата PID процесса, в котором выполняется данная задача:

%Task{owner: #PID<0.80.0>, pid: #PID<0.82.0>, ref: #Reference<0.0.7.207>}

Давайте взглянем на этот PID, чтобы понять, что происходит с текущим процессом:

Process.info(task.pid)
nil
Process.alive?(task.pid)
false

Как можно заметить, такого процесса больше не существует; процесс завершается сразу после того, как модуль Task завершит работу. Как же тогда получить от него отклик? В этом нам поможет почтовый ящик вызывающего процесса:

Process.info(task.owner)[:messages]
[{#Reference<0.0.7.207>, 10},
{:DOWN, #Reference<0.0.7.207>, :process, #PID<0.82.0>, :normal}]

task.owner — это и есть текущий процесс или self(). Здесь также можно видеть два входящих сообщения. Первое из них — это отклик на задачу async, а второе — уведомление от ссылки на Task-процесс о его завершении. Чтобы получить доступ к этим сообщениям, можно использовать блок receive, но модуль Task предлагает более простое решение:

Task.await(task)
10

Если сейчас взглянуть на входящие сообщения текущему процессу, то можно заметить, что они исчезли:

Process.info(task.owner)[:messages]
[]

Параллельное выполнение функции map при помощи модуля Task

Рассмотрим пример работы с map, построенный с помощью функций async и await модуля Task. Этот же пример, но без использования Task, приведён в предыдущей статье. Можно видеть, что теперь он значительно упростился:

defmodule Statuses do
  def map(urls) do
    urls
    |> Enum.map(&(Task.async(fn -> process(&1) end)))
    |> Enum.map(&(Task.await(&1)))
  end

  def process(url) do
    case HTTPoison.get(url) do
      {:ok, %HTTPoison.Response{status_code: status_code}} ->
        {url, status_code}
      {:error, %HTTPoison.Error{reason: reason}} ->
        {:error, reason}
    end
  end
end

Проведём тестирование по той же схеме, что и в прошлый раз:

defmodule StatusesTest do
  use ExUnit.Case

  test "parallel status map" do
    urls = [
      url1 = "http://www.fakeresponse.com/api/?sleep=2",
      url2 = "http://www.fakeresponse.com/api/?sleep=1",
      url3 = "http://www.fakeresponse.com/api/?status=500",
      url4 = "https://www.leighhalliday.com",
      url5 = "https://www.reddit.com"
    ]
    assert Statuses.map(urls) == [
      {url1, 200},
      {url2, 200},
      {url3, 500},
      {url4, 200},
      {url5, 200}
    ]
  end
end

На тесты ушло около 2,1 секунды, что сигнализирует о работоспособности системы, поскольку это время сопоставимо с длительностью самого медленного HTTP-вызова в 2 секунды.

Модуль Agent

Агенты упрощают процедуру хранения состояния в процессе: они позволяют производить обмен состоянием/данными между различными процессами без необходимости передачи большого объёма данных каждой функции, которой может понадобиться доступ к ним.

Создадим с помощью Agent небольшой модуль для получения и установки ключей конфигураций. Функция start_link запускает Agent-процесс, после чего функции get и set получают доступ к конфигурационным данным и обновляют их.

defmodule Configure do
  def start_link(initial \\ %{}) do
    Agent.start_link(fn -> initial end, name: __MODULE__)
  end

  def get(key) do
    Agent.get(__MODULE__, &Map.fetch(&1, key))
  end

  def set(key, value) do
    Agent.update(__MODULE__, &Map.put(&1, key, value))
  end
end

Вы наверняка заметили, что в примере выше обошлось без метода return и PID по той причине, что этот процесс привязан к имени модуля, а значит, такой процесс будет единственным. Проведём несколько тестов, чтобы убедиться, что всё работает как надо.

defmodule ConfigureTest do
  use ExUnit.Case

  test "get with initial value" do
    Configure.start_link(%{env: :production})
    assert Configure.get(:env) == {:ok, :production}
  end

  test "error when missing value" do
    Configure.start_link()
    assert Configure.get(:env) == :error
  end

  test "set and then get value" do
    Configure.start_link()
    Configure.set(:env, :production)
    assert Configure.get(:env) == {:ok, :production}
  end
end

Состояния агентов хранятся в памяти, в связи с чем следует запоминать только те данные, которые можно было бы быстро восстановить с помощью какой-либо стратегии перезапуска (через супервизор!). Состояния могут принадлежать к любому типу данных Elixir, несмотря на то, что в предыдущем примере использовался тип Map.

В Elixir все процессы обрабатывают запросы последовательно. Даже при том, что Elixir является языком параллельного программирования, каждый процесс обрабатывает только один запрос в единицу времени. Если запрос к процессу выполняется медленно, доступ к данным в этом процессе будет перекрыт. На самом деле это даже удобно. Запросы обрабатываются последовательно в порядке их поступления, так что в некотором смысле это вполне предсказуемо. Нужно лишь удостовериться, что «тяжелые» задачи и медленно выполняющиеся вычисления осуществляются не в процессе агента, а в другом месте, например, в вызывающем процессе.

Если вам интересны более мощные и гибкие компоненты, то прочитайте про ETS. В сети можно найти замечательную статью Барри Джонса о сопоставлении ETS, агентов, Redis и других внешних инструментов.

Модуль GenServer

Итак, мы рассмотрели управление выполнением параллельных вычислений с использованием задач и управление состоянием в процессе с помощью агентов. Теперь обратимся к модулю GenServer, объединяющему работу с двумя описанными выше процедурами.

Воспользуемся примером MyLogger из этой статьи и перепишем его с использованием GenServer.

defmodule MyLogger do
  use GenServer

  # Client

  def start_link do
    GenServer.start_link(__MODULE__, 0)
  end

  def log(pid, msg) do
    GenServer.cast(pid, {:log, msg})
  end

  def print_stats(pid) do
    GenServer.cast(pid, {:print_stats})
  end

  def return_stats(pid) do
    GenServer.call(pid, {:return_stats})
  end

  # Server

  def handle_cast({:log, msg}, count) do
    IO.puts msg
    {:noreply, count + 1}
  end

  def handle_cast({:print_stats}, count) do
    IO.puts "I've logged #{count} messages"
    {:noreply, count}
  end

  def handle_call({:return_stats}, _from, count) do
    {:reply, count, count}
  end
end

Во-первых, в самом начале модуля включаем в него GenServer. Это позволит в полной мере пользоваться функциональностью GenServer.

Следом идёт функция start_link. Она вызывает функцию GenServer.start_link, создаёт новый процесс для текущего модуля и передаёт начальное состояние, которое в данном случае равняется 0. Она возвращает кортеж {:ok, pid}.

GenServer может работать как синхронно, так и асинхронно. Синхронные запросы, при которых необходимо блокировать действия в ожидании ответа, называются call, асинхронные — cast.

Асинхронные запросы / Cast

При вызове GenServer.cast(pid, {:log, msg}) внутри функции log, процессу будет отправлено сообщение с кортежем {:log, msg} в виде аргументов. Теперь предстоит написать функцию для обработки этого сообщения.

Создадим функцию handle_cast, в которой первый аргумент должен совпадать с полученными входящими аргументами {:log, msg}, а второй аргумент представляет собой текущее состояние процесса.

В теле функции handle_cast следует выполнить задачу чтения сообщения и отправки отклика, содержащего атом :noreply и новое состояние процесса.

Синхронные запросы / Call

В случае если от процесса требуется своего рода отклик, нужно использовать синхронные запросы call. GenServer.call(pid, {:return_stats}) посылает процессу сообщение с аргументом {:return_stats}. По тому же принципу, что и для cast, нужно реализовать функцию handle_call для обработки сообщения и ответа на него.

def handle_call({:return_stats}, _from, count) do
  {:reply, count, count}
end

Аргументы запросов call несколько разнятся с аргументами cast. Они включают дополнительный аргумент from — процесс, пославший запрос. Отклик также отличен. Это кортеж, состоящий из трех элементов: атома :reply, значения, которое нужно выслать вызывающему процессы в качестве отклика, и нового состояния процесса.

В приведённом выше примере возвращаемое значение и новое состояние — одно и то же, поэтому count повторяется дважды.

Теперь модуль готов к практическому использованию!

{:ok, pid} = MyLogger.start_link
MyLogger.log(pid, "First message")
MyLogger.log(pid, "Another message")
MyLogger.print_stats(pid)
stats = MyLogger.return_stats(pid)

Выводы

В данной статье были рассмотрены три абстракции для параллельного программирования на Elixir, построенные на основе предоставленных языком программных возможностей. Первая из них — модуль Task, позволяющий исполнять код внутри отдельного процесса и при необходимости ожидать ответа. Вторая — Agent — фреймворк для управления состоянием внутри процесса. Третья — GenServer — фреймворк для синхронного и асинхронного исполнения кода и управления состоянием одновременно.

Не хватает только реализации возможности отслеживания процессов и реагирования на их завершение. Это, конечно, можно проделать вручную, но в таком случае вы рискуете остаться без «фантастических» способностей супервизоров и OTP. Фактически, они позволяют отслеживать процессы и предпринимать действия по их завершению, автоматически перезапуская отслеживаемые процессы и их потомков. Но об этом в следующий раз!

© 2020 / Россия Любые мысли и вопросы пишите на elixir@wunsh.ru.