Процессы в конкурентной модели Elixir

Введение

Процессы — фундамент конкурентной модели Elixir. Вместо потоков операционной системы в Elixir реализованы высокопроизводительные легковесные «зелёные» потоки, а процессы взаимодействуют и синхронизируются друг с другом посредством передачи сообщений. Elixir- и Erlang-разработчикам не приходится беспокоиться о производительности при одновременном запуске нескольких тысяч процессов, поскольку они являются изолированными, а память освобождается после их отработки или уничтожения.

Создание процесса

Для создания нового процесса в Elixir используется функция spawn. В неё можно передать либо анонимную функцию, либо функцию с аргументами из модуля. Новый процесс создастся в текущем процессе виртуальной машины Beam, однако эти два процесса совершенно не связаны друг с другом и не имеют общих данных. Все необходимые переменные копируются в только что созданные процессы.

Использование анонимной функции

spawn fn ->
IO.inspect "Process body here."
end

Использование функции модуля

spawn Module, :function, [arg1, arg2]

Обмен сообщениями

Elixir опирается на Erlang и его мощную виртуальную машину в целях обеспечения конкурентности, основанной на принципах модели акторов. Данные передаются от процесса к процессу с помощью сообщений, посылаемых идентификаторам.

pid = spawn Module, :function, [arg1, arg2]
pid2 = spawn fn -> IO.inspect "Current Process: #{self}." end

Идентификаторы процессов

При создании нового процесса в качестве возвращаемого параметра выступает идентификатор процесса pid. pid в Elixir — это тип данных, используемый для работы с процессами. С помощью идентификатора можно проверить статус процесса, послать ему сообщения, уничтожить его и многое другое. Функция self позволяет получить pid текущего процесса.

pid = spawn Account, :add, []
send pid, {:deposit, 1_000_000}

Отправка сообщений

Послать сообщение процессу можно с помощью макроса send. В него потребуется передать идентификатор pid процесса-получателя и само сообщение. Сообщение может представлять собой любой тип данных, однако для Elixir-сообщества характерно использование атомов или тегированных кортежей, которые можно сопоставлять с образцом для проверки на ошибки.

Получение сообщений

Обработать все сообщения в почтовом ящике процесса поможет макрос receive. Можно обрабатывать все полученные сообщения таким способом или же проводить сопоставлением с образцом.

Получение обычным способом

receive do
  message ->
    IO.inspect "Generic method to handle all the messages: #{message}."
  end
end

Сопоставление с образцом

receive do
  {:ok, data} ->
    IO.puts "Received data: #{data}."
  {:error, reason} ->
    IO.puts "Failed to get data: #{reason}."
  end
end

receive продолжит работать в фоновом режиме, ожидая получения новых сообщений. Тем не менее в блоке after макроса recieve есть возможность указать необходимое время ожидания.

receive do
  ...
after 500 ->
  IO.puts "Didn't get any messages."
end

Пример

Процессы выгодно использовать в тех случаях, когда последовательное выполнение заданий занимает слишком много времени. Например, реализация параллельного загрузчика файлов: просто передаём ему набор ссылок, и файлы загружаются параллельно. Это значительно сокращает время выполнения задачи по сравнению с линейным способом реализации.

Пример кода

defmodule Downloader do
  def pget(url) when is_list(url) do
    url
    |> Enum.map(&spawn_process(&1, self))
    |> Enum.map(&await/1)
  end

  defp spawn_process(url, parent) do
    spawn_link fn ->
      case HTTPoison.get(url) do
        {:ok, %HTTPoison.Response{body: body, headers: _headers, status_code: 200}} ->
          send parent, {:ok, body}
        {:error, %HTTPoison.Error{reason: reason}} ->
          send parent, {:error, "Failed: #{reason}."}
        _ ->
          send parent, {:error, "Failed."}
      end
    end
  end

  defp await(pid) do
    receive do
      {:ok, body} ->
        File.write("./#{:rand.uniform(1_000_000)}.html", body)
      {:error, reason} ->
        IO.puts "#{reason}"
    end
  end
end

Описание

Модуль Downloader содержит функцию pget, в который помещаются ссылки. В pget имеется охранное условие when is_list(url), проверяющее, что функция вызывается только если ему передать аргумент list.

Ссылки нумеруются и передаются функции spawn_process, который запрашивает содержимое той или иной ссылки (url), используя библиотеку HTTPoison. Функции spawn_process также передаётся pid родительского процесса. В spawn_process проводится сопоставление полученного ответа с образцом, после чего с помощью макроса send в pid родительского процесса направляется сообщение в форме тегированного кортежа.

По завершении spawn_process функция await обрабатывает все сообщения, полученные родительскими процессами от дочерних, созданных с помощью функции spawn_process. Если получен кортеж :ok, он сохраняется в файл, названный случайным именем, в противном случае в stdout регистрируется ошибка.

Важно отметить, что новый процесс создаётся в течение нескольких микросекунд. Это может привести к задержке при работе с заданиями, которые завершаются сразу же после запуска. В таком случае использование одного процесса, возможно, будет более эффективным решением.

© 2017 / Россия Любые мысли и вопросы пишите на elixir@wunsh.ru.