Периодические задачи в Эликсире

Большинство проектов, над которыми мы работаем, так или иначе включают в себя периодические задачи, будь то ежедневная рассылка, обработка данных в ночное время, периодические запросы на API без веб-хуков и т.п.

На Руби проблема решается всегда одинаково: создаётся задача в cron, вызывающая выполнение основного действия. Здесь может возникнуть пару проблем. Первая – с форматом crontab не всегда удаётся совладать, вторая – где хранить конфигурацию и как убедиться, что она актуальна.

Гемы вроде Whenever помогали привести код в порядок, но с Эликсиром всё можно сделать гораздо симпатичнее.

Использование receive/1 для тайм-аутов

Функция Эликсира receive/1 определена в модуле Kernel.SpecialForms и доступна в любом месте кода без указания префикса модуля.

Она обрабатывает дополнительное условие after, которое содержит значение тайм-аута и выполняется в случае, если процесс не получил соответствующее сообщение в определённый период времени. Сочетая эту функцию с рекурсивным вызовом, можно добиться выполнения кода в заданные промежутки времени.

Поясню свои слова небольшим примером:

defmodule Example do
  def process() do
    receive do
      after
        5_000 ->
          IO.puts "5 seconds elapsed"
          process()
      end
    end
  end
end

Если в этом примере вызвать функцию process/0, каждые пять секунд на экран будет выводиться надпись «прошло 5 секунд».

Создание периодического модуля Task

Вышеупомянутая функция сама по себе не особо практична, так как она ничего не возвращает и не начинает работу сразу при запуске приложения.

Можно создать модуль для запуска процесса в Task, который может быть добавлен в дерево супервизоров:

defmodule Example.BitcoinPriceUpdater do
  use Task
  
  def start_link(_arg) do
    Task.start_link(&poll/0)
  end
  
  def poll() do
    receive do
    after
      60_000 ->
        get_price()
        poll()
    end
  end
  
  defp get_price() do
    # Call API & Persist
    IO.puts "To the moon!"
  end
end

При использовании use Task в шапке модуля в Эликсире версии 1.5 и выше можно задействовать супервизор следующим образом:

defmodule Example.Application do
  @moduledoc false
  
  use Application
  
  def start(_type, _args) do
    children = [
      Example.BitcoinPriceUpdater
    ]
  
    opts = [strategy: :one_for_one, name: Example.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

В более ранних версиях Эликсира придётся указать всех потомков, например, вот так:

children = [
  worker(Example.BitcoinPriceUpdater, [])
]

И в том, и в другом случае модуль является частью дерева супервизоров, которое запускается вместе с приложением и периодически выполняет заданный участок кода.

Таким образом, мы получили небольшой и простой модуль, с помощью которого в проект с лёгкостью можно добавлять периодические задачи, не беспокоясь о постоянном обновлении crontab.

Можно сказать, что тайм-аут достаточно точный для большинства периодических задач, даже при использовании в продакшне.

Однако не стоит полностью полагаться на приведённое решение в тех случаях, когда точность стоит на первом месте. Также не забывайте, что если процесс упадёт или приложение будет перезапущено, таймер начинается с нуля.

Часть 2. Разбираем проблемы и двигаемся дальше

Двумя наиболее обсуждаемыми проблемами вышеприведённых примеров являются использование функции Process.send_after и возникающий сдвиг времени начала выполнения задачи. Давайте с ними разберёмся.

Process.send_after/3

Воспроизведём в новых условиях скелет модуля, аналогичный предыдущему примеру:

defmodule Periodic.Safter do
  use GenServer
  
  def start_link() do
    GenServer.start_link(__MODULE__, %{})
  end
  
  def init(state) do
    schedule_work()
    {:ok, state}
  end
  
  def handle_info(:work, state) do
    # do important stuff
    IO.puts "Important stuff in progress..."
    schedule_work()
    {:noreply, state}
  end
  
  defp schedule_work() do
    Process.send_after(self(), :work, 1_000)
  end
end

Изначально функция была представлена в ответе Жозе Валима на StackOverflow в 2015 году, но также её можно найти в документации модуля GenServer в разделе «Receive regular messages».

На первый взгляд можно заключить, что обе функции работают одинаково, поэтому основное различие сводится к семантике.

Для простейших случаев и там, где нет необходимости отслеживать состояние, я всё же советовал бы следовать оригинальному решению, работая с состоянием традиционным, свойственным для GenServer методом с помощью пустого блока receive.

Сдвиг начала выполнения задач

Оба описанных способа подвержены сдвигу во времени, когда время завершения определённой задачи смещает начало выполнения следующей.

Скажем, нам бы хотелось выполнять определённое действие каждую секунду, но его завершение занимает 100 миллисекунд. По прошествии 10 секунд будет завершено только 9 действий, потому что время начала выполнения каждого из них было смещено.

В большинстве случаев самое простое, что можно сделать, – это породить новый процесс для выполнения заданного действия и поручить текущему процессу планирование следующего действия.

Возьмём предыдущий пример с GenServer:

def handle_info(:work, state) do
  # do important stuff
  IO.puts "Important stuff in progress..."
  schedule_work()
  {:noreply, state}
end

Вместо этого можно просто сделать следующее:

def handle_info(:work, state) do
  spawn_link(&do_work/0)
  schedule_work()
  {:noreply, state}
end

defp do_work() do
  # do important stuff
  IO.puts "Important stuff in progress..."
end

Используя spawn_link/1, можно добиться того же поведения по отношению к обработке выходного сигнала и тому, что мы получаем, когда выполняющий задачу процесс падает.

В Эликсире новые процессы порождаются супербыстро, что практически устраняет сдвиг. Забавы ради я решил проверить, сколько по времени занимает работа spawn_link/1, и в среднем получилось 10 микросекунд 😍 Путём простейших математических вычислений получаем сдвиг в 1 секунду при выполнении 100 000 задач.

Более сложное и более точное решение – поместить время выполнения в состоянии GenServer, вычислить значение сдвига, после чего использовать это значение в качестве аргумента для функции schedule_work, чтобы потом вычесть его из времени ожидания и компенсировтаь сдвиг.

Библиотеки

Напоследок упомянем о нескольких библиотеках, которые помогут в решении поставленной в начале статьи задачи.

  • Erlcron – популярная библиотека Эрланга, предоставляющая неплохой функционал для читабельного описания задач.

  • Quantum – хорошо поддерживаемая и часто используемая библиотека для Эликсира. Quantum описывает задачи в файле конфигурации, используя синтаксис cron:

# Каждую минуту
{"* * * * *", {Heartbeat, :send, []}}

# Каждую полночь
{"@daily", {Backup, :backup, []}}
© 2020 / Россия Любые мысли и вопросы пишите на elixir@wunsh.ru.