Потоковая обработка данных с помощью Ecto

Примечание: если вы не знакомы с потоками Elixir, советуем обратиться к документации, иначе содержимое данной статьи может вызвать затруднения.

Любой разработчик неизбежно сталкивается с необходимостью проведения определённых операций над набором данных. Если данных не так много, то часто можно отделаться разовой выборкой записей и выполнить с ними необходимые действия. Но при больших объёмах данных неизбежно возникнут некоторые типовые проблемы. Процесс этот медленный, создаёт блокирующее соединение с базой данных, и большинство библиотек базы будут загружать данные в память, что может замедлить или даже уничтожить ваш сервер.

К счастью, как раз для такой ситуации в Ecto имеется отличный инструмент Repo.stream/2. Вместо разовой загрузки данных он выполняет её в итеративном цикле, обрабатывая каждую запись отдельно.

Рассмотрим пример, прекрасно иллюстрирующий гибкость потоков Elixir, в котором реализуем экспорт записей из базы данных в CSV-файлы.

Для начала взгляните на код:

defmodule UserExporter do
  @columns ~w( id email inserted_at updated_at )a

  def export(query) do
    path = "/tmp/users.csv"

    Repo.transaction fn ->
      query
      |> Repo.stream
      |> Stream.map(&parse_line/1)
      |> CSV.encode
      |> Enum.into(File.stream!(path, [:write, :utf8]))
    end
  end

  defp parse_line(user) do
    # order our data to match our column order
    Enum.map(@columns, &Map.get(user, &1))
  end
end

Итак, имеется модуль с одной общедоступной функцией export/1, получающей Ecto-запрос на входе и возвращающей файловый поток на выходе.

В самом начале указываем атрибут модуля @columns, чтобы определить, какие именно данные будут записаны в файл. Позднее к этому атрибуту необходимо будет обратиться ещё раз для повторного упорядочивания данных в CSV-файле.

Дальше за дело берётся модуль экспорта. Первое, что он делает, – открывает транзакцию базы данных. Это обязательное действие при использовании Repo.stream/2 в MySQL или Postgres, но ещё это хорошее решение для работы с итерационными операциями над большим массивом данных. Все действия с Repo.stream/2 необходимо выполнять в рамках транзакции.

Открыв транзакцию, передаём запрос к Repo.stream/2, после чего произойдёт создание начального потока. Обратите внимание, что это лишь создаёт поток, и, поскольку потоки исполняются лениво, никаких запросов к базе данных ещё не отправляется.

Следующим шагом передаём поток функции Stream.map, которая осуществит выборку и преобразует данные каждой структуры пользователя в список в соответствии с порядком, указанном ранее в @columns.

Воспользовавшись библиотекой CSV, передадим поток функции CSV.encode/1, и она проделает всю основную работу. CSV.encode/1 оперирует потоками как на входе, так и на выходе, следовательно, она идеально вписывается в наш пайплайн.

Наконец, запускаем поток вызовом Enum.into, после чего начинают выполняться запросы к базе данных. Repo.stream/2 по умолчанию обрабатывает 500 записей разом, каждая из них анализируется и записывается в файл по адресу tmp/users.csv. И последнее, функция возвращает файловый поток с указателем на только что созданный файл. Этот новый поток можно использовать для выполнения множества других операций с файлами или потоками, например, загрузка файла на Amazon S3 и т. п.

В качестве теста можно вывести содержимое CSV-файла в консоль iex:

iex> {:ok, file} = UserExporter.export(from u in User)
{:ok,
 %File.Stream{line_or_bytes: :line,
  modes: [:write, {:encoding, :utf8}, :binary], path: "/tmp/users.csv",
  raw: false}}
  
iex> File.read!(file.path)
#=> "1,joe@example.com,2017-04-27 21:57:42.972524,2017-05-20 18:48:16.235083\r\n2,jane@example.com,2017-04-27 18:36:32.053556,2017-04-27 18:36:32.065434\r\n3,jill@example.com,2017-04-27 18:37:43.503567,2017-04-27 18:37:43.503575\r\n"

Здорово, правда? С помощью потоков был создан простой и последовательный пайплайн для эффективной выборки данных, их преобразования и записи в файл по 500 строк за раз. Repo.stream/2 – очень полезный инструмент, предназначенный для работы с большими объёмами данных. Рекомендую опробовать его, если вы ещё этого не сделали.

© 2020 / Россия Любые мысли и вопросы пишите на elixir@wunsh.ru.