Оптимизация Elixir- и Phoenix-приложений с помощью ETS

Многие Elixir-разработчики наверняка слышали о «ETS» или встречали в коде вызовы Erlang-модуля :ets, но, скорее всего, большинство из них всё ещё никогда не применяли этот подход на практике. Настало время это изменить.

ETS (Erlang Term Storage) — один из инструментов Erlang, который всё это время прятался на видном месте. Такие проекты, как Phoenix.PubSub, Phoenix.Presence и Registry, наряду с многочисленными Elixir- и Erlang-модулями пользуются преимуществами ETS. В двух словах: ETS — это хранилище данных с быстрым доступом для объектов Elixir и Erlang, главной особенностью которого является возможность получения состояния за пределами процесса и без передачи сообщений. Прежде чем перейти к вопросу об уместности использования ETS, пробежимся по основам основ, запустив iex.

Сначала создадим ETS-таблицу с помощью команды :ets.new/2:

iex> tab = :ets.new(:my_table, [:set])
8211

Ничего сложного. Мы создали таблицу типа :set, которая позволит связывать уникальные ключи со значениями подобно обычному хранилищу «ключ-значение». Добавим в неё парочку строк:

iex> :ets.insert(tab, {:key1, "value1"})
true
iex> :ets.insert(tab, {:key2, "value1"})
true

Теперь, имея несколько значений, осуществим поиск: elixir iex> :ets.lookup(tab, :key1) [key1: "value1"]

Заметьте, что повторного связывания таблицы с новым значением после вставки элементов не происходит. ETS-таблицами управляет виртуальная машина, и их существование зависит от процесса, в котором они были созданы. На практике это можно увидеть, уничтожив текущий процесс iex:

iex> Process.exit(self(), :kill)
** (EXIT from #PID<0.88.0>) killed

Interactive Elixir (1.4.4) - press Ctrl+C to exit (type h() ENTER for help)
iex> :ets.insert(8211, {:key1, "value1"})
** (ArgumentError) argument error
    (stdlib) :ets.insert(12307, {:key1, "value1"})

Как только это произойдет, прежние связи будут утеряны, но можно будет передать функции :ets.new/2 возвращаемый вызовом идентификатор таблицы. Можно заметить, что когда мы запросили доступ к таблице, после того как её процесс-создатель был остановлен, возникла ошибка ArgumentError. Такая автоматическая чистка таблиц и данных после аварийного завершения процесса-создателя — одна из главных особенностей ETS. Незачем беспокоиться об утечках памяти, если процесс завершается после создания таблиц и ввода данных. Здесь вы столкнетесь с мистическим API :ets и его зачастую неинформативными ошибками (в нашем случае ArgumentError), не содержащими каких-либо намеков на то, что могло их породить. Но чем чаще вы будете работать с модулем :ets, тем лучше познакомитесь с этими досадными ошибками и изучите документацию :ets вдоль и поперёк.

Функции, которые мы использовали, — лишь малая часть того, что предлагает ETS. Помните, что основное его преимущество в быстром чтении и записи в хранилище «ключ-значение» и возможности сопоставления большинства содержащихся в таблице объектов, за исключением объектов типа map. В рамках этой статьи мы рассмотрим только простейшие пары «ключ-значение» и несколько основных операций поиска, поэтому для получения более подробной информации о возможностях ETS лучше обратиться к документации.

Оптимизация доступа к GenServer с помощью ETS-таблицы

Оптимизация кода приносит свои плоды, особенно, когда не нужно менять публичный интерфейс. Рассмотрим один из часто используемых способов применения ETS для оптимизации доступа к состоянию, обёрнутому в GenServer.

Предположим, нужно разработать GenServer с ограничением скорости — процесс в приложении, который будет подсчитывать количество запросов пользователя и закрывать доступ к данным, если пользователем превышено определённое количество запросов в минуту. Очевидно, что состояние подсчета запросов пользователей и процесс, подчищающий это состояние раз в минуту, нужно будет где-то хранить. Наброски простейшего кода с GenServer выглядят примерно так:

defmodule RateLimiter do
  use GenServer
  require Logger

  @max_per_minute 5
  @sweep_after :timer.seconds(60)

  ## Client

  def start_link do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  def log(uid) do
    GenServer.call(__MODULE__, {:log, uid})
  end

  ## Server
  def init(_) do
    schedule_sweep()
    {:ok, %{requests: %{}}}
  end

  def handle_info(:sweep, state) do
    Logger.debug("Sweeping requests")
    schedule_sweep()
    {:noreply, %{state | requests: %{}}}
  end

  def handle_call({:log, uid}, _from, state) do
    case state.requests[uid] do
      count when is_nil(count) or count < @max_per_minute ->
        {:reply, :ok, put_in(state, [:requests, uid], (count || 0) + 1)}
      count when count >= @max_per_minute ->
        {:reply, {:error, :rate_limited}, state}
    end
  end

  defp schedule_sweep do
    Process.send_after(self(), :sweep, @sweep_after)
  end
end

Сначала определим функцию start_link/0, которая запустит GenServer, используя модуль RateLimiter в качестве модуля обратного вызова. Сервер назовём так же, как и модуль, чтобы впоследствии ссылаться на него при обращении к функции log/1. Теперь определим функцию log/1, осуществляющую синхронный вызов к GenServer и производящую логирование запросов пользователя. На выходе ожидаем получить либо :ok, сигнализирующее о том, что пользователь не превысил допустимое количество запросов и может продолжать свои действия, либо {:error, :rate_limited} в противном случае.

Далее в init/1 вызываем функцию schedule_sweep/0, которая раз в минуту посылает серверу сообщения для удаления всех данных запросов. После этого определяем условие handle_info/2, чтобы перехватить событие :sweep и сбросить состояние запроса. И напоследок определяем условие handle_call/3, которое будет отслеживать состояние запроса пользователя и возвращать :ok или {:error, :rate_limited} вызывающей функции log/2.

Попробуем выполнить это в iex:

iex> RateLimiter.start_link()
{:ok, #PID<0.126.0>}
iex> RateLimiter.log("user1")
:ok
iex> RateLimiter.log("user1")
:ok
iex> RateLimiter.log("user1")
:ok
iex> RateLimiter.log("user1")
:ok
iex> RateLimiter.log("user1")
:ok
iex> RateLimiter.log("user1")
{:error, :rate_limited}

13:55:44.803 [debug] Sweeping requests
iex(9)> RateLimiter.log("user1")
:ok

Работает! Если количество запросов пользователя в минуту превысит 5, то сервер возвратит ожидаемое сообщение об ошибке. По прошествии некоторого времени вывод отладчика покажет сброс состояния, и счетчик количества запросов обнулится. Вроде неплохо, да? К сожалению, в приведённой выше реализации присутствуют серьёзные проблемы производительности. Давайте-ка разберёмся.

Поскольку нашей целью было ограничение количества запросов в единицу времени, все запросы пользователя должны проходить через заданный сервер. Сообщения обрабатываются последовательно, что сводит наше приложение в однопоточное, и этот единственный процесс превращается в узкое место программы.

ETS спешит на помощь

К счастью, Erlang-разработчики уже придумали решение этой проблемы за нас. Можно провести рефакторинг кода, чтобы сервер использовал общедоступную ETS-таблицу и клиенты могли вводить свои запросы напрямую в ETS, а процесс-создатель отвечал бы только за очистку таблицы. Это позволит производить чтение и запись конкурентно и избавит от необходимости упорядочивания вызовов на сервере. Что ж, реализуем всё вышесказанное:

defmodule RateLimiter do
  use GenServer
  require Logger

  @max_per_minute 5
  @sweep_after :timer.seconds(60)
  @tab :rate_limiter_requests

  ## Client

  def start_link do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  def log(uid) do
    case :ets.update_counter(@tab, uid, {2, 1}, {uid, 0}) do
      count when count > @max_per_minute -> {:error, :rate_limited}
      _count -> :ok
    end
  end

  ## Server
  def init(_) do
    :ets.new(@tab, [:set, :named_table, :public, read_concurrency: true,
                                                 write_concurrency: true])
    schedule_sweep()
    {:ok, %{}}
  end

  def handle_info(:sweep, state) do
    Logger.debug("Sweeping requests")
    :ets.delete_all_objects(@tab)
    schedule_sweep()
    {:noreply, state}
  end

  defp schedule_sweep do
    Process.send_after(self(), :sweep, @sweep_after)
  end
end

Во-первых, сделаем так, чтобы функция init/1 создавала ETS-таблицу с опциями :named_table и :public, тогда вызывающие функции вне процесса смогут получить к ней доступ. Также для оптимизации доступа воспользуемся функциями read_concurrency и write_concurrency. Далее, поправим функцию log/1, чтобы она записывала количество запросов напрямую в :ets, а не через GenServer. Таким образом, запросы сами будут отслеживать, не превышен ли на них лимит. Кроме этого будем использовать свойство ETS update_counter/4, что позволит нам обновлять счетчик быстро и атомарно. После проверки предельного значения в вызывающую функцию будет передано то же число, что и раньше. И последнее, в обратном вызове :sweep для очистки таблицы будем использовать :ets.delete_all_objects/1.

Ну, что, попробуем?

iex> RateLimiter.start_link
{:ok, #PID<0.124.0>}
iex> RateLimiter.log("user1")
:ok
iex> RateLimiter.log("user1")
:ok
iex> RateLimiter.log("user1")
:ok
iex> RateLimiter.log("user1")
:ok
iex> RateLimiter.log("user1")
:ok
iex> RateLimiter.log("user1")
{:error, :rate_limited}
iex> :ets.tab2list(:rate_limiter_requests)
[{"user1", 7}]
iex> RateLimiter.log("user2")
:ok
iex> :ets.tab2list(:rate_limiter_requests)
[{"user2", 1}, {"user1", 7}]

14:27:19.082 [debug] Sweeping requests

iex> :ets.tab2list(:rate_limiter_requests)
[]
iex> RateLimiter.log("user1")
:ok

И снова всё работает. Для отслеживания данных в таблице можно использовать :ets.tab2list/1. Можно видеть, что механизмы подсчета запросов пользователей и очистки таблицы работают верно.

Вот, пожалуй, и всё. Публичный интерфейс остался нетронутым, а производительность важнейшей функции в приложении значительно возросла. Здорово, правда?

TERRA INCOGNITA

В данной статье мы затронули лишь часть необозримых возможностей ETS. Но прежде чем эти возможности затуманят ваш разум и вы решите перенести весь код с последовательным доступом к данным из GenServer и Agent в ETS, хорошенько подумайте о том, какие действия в ваших приложениях атомарны, а какие требуют последовательной обработки. В погоне за производительностью, разрешив конкурентные чтение и запись, можно с лёгкостью заполучить состояние гонки.

Одна из выигрышных особенностей процессной модели Elixir — это последовательная обработка сообщений, позволяющая избежать состояния гонки путём упорядочивания доступа к состоянию, требующему атомарных операций. В случае с нашим примером, каждый пользователь производит запись в ETS с помощью атомарной операции update_counter, поэтому конкурентная запись не вызывает проблем. Для определения возможности перехода от последовательного доступа к ETS пользуйтесь следующими правилами:

  • Операции должны быть атомарными. Если клиенты осуществляют чтение данных из ETS в одной операции, то, записывая напрямую в ETS, вы получите состояние гонки, и выходом из этой ситуации будет последовательный доступ к данным на сервере.

  • Если операции не атомарные, сделайте так, чтобы разные процессы производили запись в разные ключевые поля. Например, реестр Elixir в качестве ключей использует PID и позволяет изменять данные только текущему процессу. Таким образом, в системе гарантированно не возникнет состояния гонки, т. к. каждый процесс будет обрабатывать разные строки ETS-таблицы.

  • Если первые два подхода неприменимы, стоит рассматривать последовательное выполнение операций. Конкурентное считывание и последовательная запись — основная модель работы с ETS.

Если вам интересна тема применения ETS для свободного от зависимостей хранящегося в памяти кэша, обратите внимание на библиотеку ConCache.

© 2020 / Россия Любые мысли и вопросы пишите на elixir@wunsh.ru.