Оптимизация обработки данных с помощью GenStage/Flow
Существует множество задач, которые можно решить с помощью GenStage/Flow.
Недавно мне довелось работать над одной из подобных задач: необходимо было осуществить выборку записей из базы данных PostgreSQL, загрузить связанные с этими записями файлы из Amazon S3, извлечь из файлов текст и провести его индексацию в ElasticSearch.
Задание можно представить в виде конвейера, состоящего из четырёх этапов:
-
SELECT – выбор записи из базы данных
-
DOWNLOAD – загрузка pdf-файла
-
EXTRACT – извлечение текста из файла
-
INDEX – индексация текста
Так как в данном случае речь идёт о сотнях тысяч записей, мне пришлось поразмыслить над тем, как сделать конвейер наиболее эффективным.
В результате мне удалось уместить реализацию эффективного алгоритма параллельной обработки данных всего в несколько строк кода.
Постановка задачи
Прежде чем предлагать какие-либо решения, необходимо как можно лучше разобраться в условии задачи.
Каждый этап будущего конвейера включает определённый набор действий и особенностей реализации:
-
SELECT: Необходимо выполнить SQL-запрос, который возвратит 100 000 записей. Лучше всего использовать курсоры PostgreSQL, а новая функция Repo.stream упростит этот процесс. Этап будет выполняться снова и снова, обеспечивая непрерывный поток записей.
-
DOWNLOAD: На данном этапе реализуется сетевой ввод-вывод данных, который достаточно легко распараллелить.
-
EXTRACT: Извлечение текста – задача CPU, которая тоже может выполняться параллельно на нескольких ядрах процессора.
-
INDEX: В ElasticSearch гораздо производительнее индексировать файлы не отдельно, а пакетами.
Проведя такой простой анализ, можно нарисовать полную картину того, над чем придётся работать. Кроме того, решение этой задачи в полной мере иллюстрирует работу GenStage, а особенно Flow.
Реализация
GenStage разделяет этапы конвейера на три типа:
-
Поставщик (Producer) – порождает события (этап SELECT)
-
Поставщик-потребитель (Producer-Consumer) – получает одни события и порождает другие (этапы DOWNLOAD и EXTRACT)
-
Потребитель (Consumer) – получает события, при этом не порождая новых (этап INDEX)
Каждый этап можно представить в виде простой функции:
Весь конвейер выглядит следующим образом:
-
Один этап SELECT
-
Несколько этапов DOWNLOAD
-
Несколько этапов EXTRACT
-
Этап ACCUMULATE, объединяющий файлы для последующей индексации
-
Несколько этапов INDEX
Воплотить всё это в жизнь можно с помощью следующих 12 строк кода:
И это всё?
К сожалению, нет. Придётся ещё немного попотеть.
Оптимизация
В примере выше переменные max_demand и stages имели произвольные значения. Несмотря на то, что приведённый код работает безошибочно, его производительность далека от идеальной. Для наглядности приведу график, полученный после обработки 700 записей.
Кривые на графике показывают количество обработанных на определённом этапе элементов в единицу времени. Можно видеть, что на этапе SELECT действия по выборке 700 записей из базы данных и помещению их в память осуществились мгновенно. Извлечение текста на этапах EXTRACT начало выполняться с задержкой в 3 секунды (когда уже было загружено почти 200 файлов). Индексация также началась очень поздно (в очереди уже находились 300 текстовых элементов).
В конвейерах такого рода производительность отдельных ступеней не так важна, как производительность конвейера в целом. Обрабатывая большие объёмы данных, обычно стремятся к тому, чтобы вся система была стабильна по части использования ресурсов.
Поэкспериментировав с различными значениями (и построив несколько графиков для разных случаев), я остановился на следующем:
Результаты получились намного лучше:
С такими подстроенными параметрами все этапы конвейера обрабатывают данные с минимальной буферизацией и низким потреблением памяти. Сам конвейер также отработал немного быстрее, но так как имеем дело с внешними сервисами, этот случай не подойдёт в качестве теста скорости.
Заключение
GenStage и его высокоуровневая "оболочка" Flow предоставляют необходимые средства для реализации эффективной параллельной обработки данных. Какой бы волшебной ни казалась эта возможность, забывать об оценке производительности не стоит. И всё же я уверен, что при достижении "правильного" показателя производительности система будет иметь большой потенциал по масштабируемости, обеспечивая низкий расход ресурсов при работе с большими объёмами данных.