Оптимизация Elixir- и Phoenix-приложений с помощью ETS
Многие Elixir-разработчики наверняка слышали о «ETS» или встречали в коде вызовы Erlang-модуля :ets
, но, скорее всего, большинство из них всё ещё никогда не применяли этот подход на практике. Настало время это изменить.
ETS (Erlang Term Storage) — один из инструментов Erlang, который всё это время прятался на видном месте. Такие проекты, как Phoenix.PubSub
, Phoenix.Presence
и Registry
, наряду с многочисленными Elixir- и Erlang-модулями пользуются преимуществами ETS. В двух словах: ETS — это хранилище данных с быстрым доступом для объектов Elixir и Erlang, главной особенностью которого является возможность получения состояния за пределами процесса и без передачи сообщений. Прежде чем перейти к вопросу об уместности использования ETS, пробежимся по основам основ, запустив iex
.
Сначала создадим ETS-таблицу с помощью команды :ets.new/2
:
iex> tab = :ets.new(:my_table, [:set])
8211
Ничего сложного. Мы создали таблицу типа :set
, которая позволит связывать уникальные ключи со значениями подобно обычному хранилищу «ключ-значение». Добавим в неё парочку строк:
iex> :ets.insert(tab, {:key1, "value1"})
true
iex> :ets.insert(tab, {:key2, "value1"})
true
Теперь, имея несколько значений, осуществим поиск: elixir iex> :ets.lookup(tab, :key1) [key1: "value1"]
Заметьте, что повторного связывания таблицы с новым значением после вставки элементов не происходит. ETS-таблицами управляет виртуальная машина, и их существование зависит от процесса, в котором они были созданы. На практике это можно увидеть, уничтожив текущий процесс iex
:
iex> Process.exit(self(), :kill)
** (EXIT from #PID<0.88.0>) killed
Interactive Elixir (1.4.4) - press Ctrl+C to exit (type h() ENTER for help)
iex> :ets.insert(8211, {:key1, "value1"})
** (ArgumentError) argument error
(stdlib) :ets.insert(12307, {:key1, "value1"})
Как только это произойдет, прежние связи будут утеряны, но можно будет передать функции :ets.new/2
возвращаемый вызовом идентификатор таблицы. Можно заметить, что когда мы запросили доступ к таблице, после того как её процесс-создатель был остановлен, возникла ошибка ArgumentError
. Такая автоматическая чистка таблиц и данных после аварийного завершения процесса-создателя — одна из главных особенностей ETS. Незачем беспокоиться об утечках памяти, если процесс завершается после создания таблиц и ввода данных. Здесь вы столкнетесь с мистическим API :ets
и его зачастую неинформативными ошибками (в нашем случае ArgumentError
), не содержащими каких-либо намеков на то, что могло их породить. Но чем чаще вы будете работать с модулем :ets
, тем лучше познакомитесь с этими досадными ошибками и изучите документацию :ets
вдоль и поперёк.
Функции, которые мы использовали, — лишь малая часть того, что предлагает ETS. Помните, что основное его преимущество в быстром чтении и записи в хранилище «ключ-значение» и возможности сопоставления большинства содержащихся в таблице объектов, за исключением объектов типа map. В рамках этой статьи мы рассмотрим только простейшие пары «ключ-значение» и несколько основных операций поиска, поэтому для получения более подробной информации о возможностях ETS лучше обратиться к документации.
Оптимизация доступа к GenServer с помощью ETS-таблицы
Оптимизация кода приносит свои плоды, особенно, когда не нужно менять публичный интерфейс. Рассмотрим один из часто используемых способов применения ETS для оптимизации доступа к состоянию, обёрнутому в GenServer
.
Предположим, нужно разработать GenServer
с ограничением скорости — процесс в приложении, который будет подсчитывать количество запросов пользователя и закрывать доступ к данным, если пользователем превышено определённое количество запросов в минуту. Очевидно, что состояние подсчета запросов пользователей и процесс, подчищающий это состояние раз в минуту, нужно будет где-то хранить. Наброски простейшего кода с GenServer
выглядят примерно так:
defmodule RateLimiter do
use GenServer
require Logger
@max_per_minute 5
@sweep_after :timer.seconds(60)
## Client
def start_link do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
end
def log(uid) do
GenServer.call(__MODULE__, {:log, uid})
end
## Server
def init(_) do
schedule_sweep()
{:ok, %{requests: %{}}}
end
def handle_info(:sweep, state) do
Logger.debug("Sweeping requests")
schedule_sweep()
{:noreply, %{state | requests: %{}}}
end
def handle_call({:log, uid}, _from, state) do
case state.requests[uid] do
count when is_nil(count) or count < @max_per_minute ->
{:reply, :ok, put_in(state, [:requests, uid], (count || 0) + 1)}
count when count >= @max_per_minute ->
{:reply, {:error, :rate_limited}, state}
end
end
defp schedule_sweep do
Process.send_after(self(), :sweep, @sweep_after)
end
end
Сначала определим функцию start_link/0
, которая запустит GenServer
, используя модуль RateLimiter
в качестве модуля обратного вызова. Сервер назовём так же, как и модуль, чтобы впоследствии ссылаться на него при обращении к функции log/1
. Теперь определим функцию log/1
, осуществляющую синхронный вызов к GenServer
и производящую логирование запросов пользователя. На выходе ожидаем получить либо :ok
, сигнализирующее о том, что пользователь не превысил допустимое количество запросов и может продолжать свои действия, либо {:error, :rate_limited}
в противном случае.
Далее в init/1
вызываем функцию schedule_sweep/0
, которая раз в минуту посылает серверу сообщения для удаления всех данных запросов. После этого определяем условие handle_info/2
, чтобы перехватить событие :sweep
и сбросить состояние запроса. И напоследок определяем условие handle_call/3
, которое будет отслеживать состояние запроса пользователя и возвращать :ok
или {:error, :rate_limited}
вызывающей функции log/2
.
Попробуем выполнить это в iex
:
iex> RateLimiter.start_link()
{:ok, #PID<0.126.0>}
iex> RateLimiter.log("user1")
:ok
iex> RateLimiter.log("user1")
:ok
iex> RateLimiter.log("user1")
:ok
iex> RateLimiter.log("user1")
:ok
iex> RateLimiter.log("user1")
:ok
iex> RateLimiter.log("user1")
{:error, :rate_limited}
13:55:44.803 [debug] Sweeping requests
iex(9)> RateLimiter.log("user1")
:ok
Работает! Если количество запросов пользователя в минуту превысит 5, то сервер возвратит ожидаемое сообщение об ошибке. По прошествии некоторого времени вывод отладчика покажет сброс состояния, и счетчик количества запросов обнулится. Вроде неплохо, да? К сожалению, в приведённой выше реализации присутствуют серьёзные проблемы производительности. Давайте-ка разберёмся.
Поскольку нашей целью было ограничение количества запросов в единицу времени, все запросы пользователя должны проходить через заданный сервер. Сообщения обрабатываются последовательно, что сводит наше приложение в однопоточное, и этот единственный процесс превращается в узкое место программы.
ETS спешит на помощь
К счастью, Erlang-разработчики уже придумали решение этой проблемы за нас. Можно провести рефакторинг кода, чтобы сервер использовал общедоступную ETS-таблицу и клиенты могли вводить свои запросы напрямую в ETS, а процесс-создатель отвечал бы только за очистку таблицы. Это позволит производить чтение и запись конкурентно и избавит от необходимости упорядочивания вызовов на сервере. Что ж, реализуем всё вышесказанное:
defmodule RateLimiter do
use GenServer
require Logger
@max_per_minute 5
@sweep_after :timer.seconds(60)
@tab :rate_limiter_requests
## Client
def start_link do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
end
def log(uid) do
case :ets.update_counter(@tab, uid, {2, 1}, {uid, 0}) do
count when count > @max_per_minute -> {:error, :rate_limited}
_count -> :ok
end
end
## Server
def init(_) do
:ets.new(@tab, [:set, :named_table, :public, read_concurrency: true,
write_concurrency: true])
schedule_sweep()
{:ok, %{}}
end
def handle_info(:sweep, state) do
Logger.debug("Sweeping requests")
:ets.delete_all_objects(@tab)
schedule_sweep()
{:noreply, state}
end
defp schedule_sweep do
Process.send_after(self(), :sweep, @sweep_after)
end
end
Во-первых, сделаем так, чтобы функция init/1
создавала ETS-таблицу с опциями :named_table
и :public
, тогда вызывающие функции вне процесса смогут получить к ней доступ. Также для оптимизации доступа воспользуемся функциями read_concurrency
и write_concurrency
. Далее, поправим функцию log/1
, чтобы она записывала количество запросов напрямую в :ets
, а не через GenServer
. Таким образом, запросы сами будут отслеживать, не превышен ли на них лимит. Кроме этого будем использовать свойство ETS update_counter/4
, что позволит нам обновлять счетчик быстро и атомарно. После проверки предельного значения в вызывающую функцию будет передано то же число, что и раньше. И последнее, в обратном вызове :sweep
для очистки таблицы будем использовать :ets.delete_all_objects/1
.
Ну, что, попробуем?
iex> RateLimiter.start_link
{:ok, #PID<0.124.0>}
iex> RateLimiter.log("user1")
:ok
iex> RateLimiter.log("user1")
:ok
iex> RateLimiter.log("user1")
:ok
iex> RateLimiter.log("user1")
:ok
iex> RateLimiter.log("user1")
:ok
iex> RateLimiter.log("user1")
{:error, :rate_limited}
iex> :ets.tab2list(:rate_limiter_requests)
[{"user1", 7}]
iex> RateLimiter.log("user2")
:ok
iex> :ets.tab2list(:rate_limiter_requests)
[{"user2", 1}, {"user1", 7}]
14:27:19.082 [debug] Sweeping requests
iex> :ets.tab2list(:rate_limiter_requests)
[]
iex> RateLimiter.log("user1")
:ok
И снова всё работает. Для отслеживания данных в таблице можно использовать :ets.tab2list/1
. Можно видеть, что механизмы подсчета запросов пользователей и очистки таблицы работают верно.
Вот, пожалуй, и всё. Публичный интерфейс остался нетронутым, а производительность важнейшей функции в приложении значительно возросла. Здорово, правда?
TERRA INCOGNITA
В данной статье мы затронули лишь часть необозримых возможностей ETS. Но прежде чем эти возможности затуманят ваш разум и вы решите перенести весь код с последовательным доступом к данным из GenServer
и Agent
в ETS, хорошенько подумайте о том, какие действия в ваших приложениях атомарны, а какие требуют последовательной обработки. В погоне за производительностью, разрешив конкурентные чтение и запись, можно с лёгкостью заполучить состояние гонки.
Одна из выигрышных особенностей процессной модели Elixir — это последовательная обработка сообщений, позволяющая избежать состояния гонки путём упорядочивания доступа к состоянию, требующему атомарных операций. В случае с нашим примером, каждый пользователь производит запись в ETS с помощью атомарной операции update_counter
, поэтому конкурентная запись не вызывает проблем. Для определения возможности перехода от последовательного доступа к ETS пользуйтесь следующими правилами:
-
Операции должны быть атомарными. Если клиенты осуществляют чтение данных из ETS в одной операции, то, записывая напрямую в ETS, вы получите состояние гонки, и выходом из этой ситуации будет последовательный доступ к данным на сервере.
-
Если операции не атомарные, сделайте так, чтобы разные процессы производили запись в разные ключевые поля. Например, реестр Elixir в качестве ключей использует PID и позволяет изменять данные только текущему процессу. Таким образом, в системе гарантированно не возникнет состояния гонки, т. к. каждый процесс будет обрабатывать разные строки ETS-таблицы.
-
Если первые два подхода неприменимы, стоит рассматривать последовательное выполнение операций. Конкурентное считывание и последовательная запись — основная модель работы с ETS.
Если вам интересна тема применения ETS для свободного от зависимостей хранящегося в памяти кэша, обратите внимание на библиотеку ConCache.