Использование Debezium коннектора в Kafka Connect
Наша команда разрабатывает систему мониторинга сетевых интерфейсов, решение чем-то схожее с openSOC от CISCO. Данные от сенсоров в сети поступают в топик kafka. Дальнейшая обработка потока данных осуществляется на Spring Kafka + Kafka Stream. В простейшем случае система должна реагировать на превышение определенных уставок по каналам. Значения настроек для мониторинга исторически сохранялись в базу Postgres и для обработки настроек в потоковом приложении их необходимо как-то получить. Тут есть несколько вариантов:
- Использовать событийную модель.
- Использовать Shared Database и запрашивать периодически данные с кэшированием в приложении.
Выбрали первый вариант т.к. предполагал меньшую нагрузку на базу, изменение параметров для мониторинга изменяются пользователем и весьма редко.
Есть несколько подходов для реализации событий:
- Писать свои продюсеры в монолите. Надо найти все места в коде, где происходит изменения в базе. В этих местах добавлять также отправку событий в шину. Если у вас хорошая архитектура монолита, с выделенным слоем репозитория, то сделать это — лишь вопрос времени. Но Legacy не будет Legacy, если там всё хорошо с архитектурой. Так что этот вариант тоже очень сложен и трудозатратен.
- Использовать готовые решения для интеграции базы данных и Kafka. Можно использовать фреймворк Kafka Connect.
Мы попробовали оба варианта. Но найти все места в коде оказалось не так просто. И каскадное изменение данных в базе Postgres оставило позади это решение. Далее пробовали использовать kafka-connect и на нем остановились.

JdbcSourceConnector. Используется JDBC драйвер базы данный. Коннектор делает периодические запросы в базу, и мы не стали его использовать из-за принципа работы т.к. добавляет нагрузку на базу, а изменений не так много.
DebeziumPostgresConnector. Коннектор делает классную вещь: подключается к кластеру баз данных как обычная реплика и умеет читать бинлог. Таким образом, мы не создаём дополнительную нагрузку на базу.
На последнем варианте мы и остановились.
Deployment
Устанавливаем плагин для Postgresql. Использовать будем wal2json
sudo apt-get install postgresql-12-wal2json
Добавляем конфигурации в postgresql.conf
# MODULES shared_preload_libraries = 'wal2json' # REPLICATION wal_level = logical max_wal_senders = 1 max_replication_slots = 1
Добляем Postgresql connectors
mkdir /opt/connectors cd /opt/connectors wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.7.2.Final/debezium-connector-postgres-1.7.2.Final-plugin.tar.gz tar -xvzf debezium-connector-postgres-1.7.2.Final-plugin.tar.gz
Добавляем в конфигурацию коннектора
nano /opt/kafka/config/connect-distributed.properties
Добавляем путь к debezium плагину
plugin.path=/opt/connectors
Запускаем connector
nohup ./bin/connect-distributed.sh ./config/connect-distributed.properties &
Проверяем подключенный плагин. Должны увидеть список подключенных плагинов.
dev@dev:/opt/kafka$ curl -s localhost:8085/connector-plugins | jq [ { "class": "io.debezium.connector.postgresql.PostgresConnector", "type": "source", "version": "1.7.2.Final" }, { "class": "org.apache.kafka.connect.file.FileStreamSinkConnector", "type": "sink", "version": "2.7.0" }, { "class": "org.apache.kafka.connect.file.FileStreamSourceConnector", "type": "source", "version": "2.7.0" }, { "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector", "type": "source", "version": "1" }, { "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector", "type": "source", "version": "1" }, { "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector", "type": "source", "version": "1" } ]
Добавляем пользователя postgresql
Создаем пользователя, даем разрешения на репликацию схеме данных.
CREATE USER debeziumreplica WITH password 'debeziumreplica' REPLICATION LOGIN;
GRANT ALL ON database debeziumdb TO debeziumreplica;
GRANT USAGE ON SCHEMA debezium TO debeziumreplica;
GRANT SELECT ON ALL TABLES IN SCHEMA debezium to debeziumreplica;
GRANT SELECT ON ALL SEQUENCES IN SCHEMA debezium to debeziumreplica;
Создаем коннектор
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/entity/config \
-d '{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"plugin.name": "wal2json",
"key.converter": "org.apache.kafka.connect.converters.IntegerConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"database.hostname": "localhost",
"database.port": "5432",
"database.user": "debeziumreplica",
"database.password": "debeziumreplica",
"database.dbname" : "debeziumdb",
"database.server.name" : "debezium-replic",
"decimal.handling.mode" : "string",
"transforms": "ExtractId",
"transforms.ExtractId.type" : "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.ExtractId.field" : "id",
"transforms.ExtractId.predicate" : "IsID",
"predicates" : "IsID",
"predicates.IsID.type" : "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.IsID.pattern" : ".*(debezium\\.threshold)",
"publication.autocreate.mode" : "filtered",
"snapshot.mode" : "always",
"table.include.list": "debezium.threshold"
}'
Проверяем созданный connector
curl -s -XGET "http://localhost:8083/connectors/" | jq '.'
Смотрим в какой топик событий
curl -s -XGET "http://localhost:8083/connectors/entity/topics" | jq '.'
Можем удалить connector
curl -s -XDELETE "http://localhost:8083/connectors/simple-object-text"