RKDEEP

Использование Debezium коннектора в Kafka Connect

Kirill Rybkin2022-01-14

Наша команда разрабатывает систему мониторинга сетевых интерфейсов, решение чем-то схожее с openSOC от CISCO. Данные от сенсоров в сети поступают в топик kafka. Дальнейшая обработка потока данных осуществляется на Spring Kafka + Kafka Stream. В простейшем случае система должна реагировать на превышение определенных уставок по каналам. Значения настроек для мониторинга исторически сохранялись в базу Postgres и для обработки настроек в потоковом приложении их необходимо как-то получить. Тут есть несколько вариантов:

  1. Использовать событийную модель.
  2. Использовать Shared Database и запрашивать периодически данные с кэшированием в приложении.

Выбрали первый вариант т.к. предполагал меньшую нагрузку на базу, изменение параметров для мониторинга изменяются пользователем и весьма редко.

Есть несколько подходов для реализации событий:

  1. Писать свои продюсеры в монолите. Надо найти все места в коде, где происходит изменения в базе. В этих местах добавлять также отправку событий в шину. Если у вас хорошая архитектура монолита, с выделенным слоем репозитория, то сделать это — лишь вопрос времени. Но Legacy не будет Legacy, если там всё хорошо с архитектурой. Так что этот вариант тоже очень сложен и трудозатратен.
  2. Использовать готовые решения для интеграции базы данных и Kafka. Можно использовать фреймворк Kafka Connect.

Мы попробовали оба варианта. Но найти все места в коде оказалось не так просто. И каскадное изменение данных в базе Postgres оставило позади это решение. Далее пробовали использовать kafka-connect и на нем остановились.

Debezium Kafka Connect
Здесь так же есть несколько вариантов коннекторов.
  • JdbcSourceConnector. Используется JDBC драйвер базы данный. Коннектор делает периодические запросы в базу, и мы не стали его использовать из-за принципа работы т.к. добавляет нагрузку на базу, а изменений не так много.

  • DebeziumPostgresConnector. Коннектор делает классную вещь: подключается к кластеру баз данных как обычная реплика и умеет читать бинлог. Таким образом, мы не создаём дополнительную нагрузку на базу.

На последнем варианте мы и остановились.

Deployment

  1. Устанавливаем плагин для Postgresql. Использовать будем wal2json

    sudo apt-get install postgresql-12-wal2json
  2. Добавляем конфигурации в postgresql.conf

    # MODULES
    shared_preload_libraries = 'wal2json'
    # REPLICATION
    wal_level = logical             
    max_wal_senders = 1             
    max_replication_slots = 1       
  3. Добляем Postgresql connectors

    mkdir /opt/connectors
    cd /opt/connectors
    wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.7.2.Final/debezium-connector-postgres-1.7.2.Final-plugin.tar.gz
    tar -xvzf debezium-connector-postgres-1.7.2.Final-plugin.tar.gz
  4. Добавляем в конфигурацию коннектора

    nano /opt/kafka/config/connect-distributed.properties
  5. Добавляем путь к debezium плагину

    plugin.path=/opt/connectors
  6. Запускаем connector

    nohup ./bin/connect-distributed.sh ./config/connect-distributed.properties &
  7. Проверяем подключенный плагин. Должны увидеть список подключенных плагинов.

    dev@dev:/opt/kafka$ curl -s localhost:8085/connector-plugins | jq
    [
     {
       "class": "io.debezium.connector.postgresql.PostgresConnector",
       "type": "source",
       "version": "1.7.2.Final"
     },
     {
       "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
       "type": "sink",
       "version": "2.7.0"
     },
     {
       "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
       "type": "source",
       "version": "2.7.0"
     },
     {
       "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
       "type": "source",
       "version": "1"
     },
     {
       "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
       "type": "source",
       "version": "1"
     },
     {
       "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
       "type": "source",
       "version": "1"
     }
    ]
    

Добавляем пользователя postgresql

Создаем пользователя, даем разрешения на репликацию схеме данных.

CREATE USER debeziumreplica WITH password 'debeziumreplica' REPLICATION LOGIN;
GRANT ALL ON database debeziumdb TO debeziumreplica;
GRANT USAGE ON SCHEMA debezium TO debeziumreplica;
GRANT SELECT ON ALL TABLES IN SCHEMA debezium to debeziumreplica;
GRANT SELECT ON ALL SEQUENCES IN SCHEMA debezium to debeziumreplica;

Создаем коннектор

 curl -i -X PUT -H "Content-Type:application/json" \
    http://localhost:8083/connectors/entity/config \
    -d '{
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector", 
    "plugin.name": "wal2json",
    "key.converter": "org.apache.kafka.connect.converters.IntegerConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "database.hostname": "localhost", 
    "database.port": "5432", 
    "database.user": "debeziumreplica", 
    "database.password": "debeziumreplica", 
    "database.dbname" : "debeziumdb",
    "database.server.name" : "debezium-replic",
    "decimal.handling.mode" : "string",
    "transforms": "ExtractId",
    "transforms.ExtractId.type" : "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.ExtractId.field" : "id",
    "transforms.ExtractId.predicate" : "IsID",
    "predicates" : "IsID",
    "predicates.IsID.type" : "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
    "predicates.IsID.pattern" : ".*(debezium\\.threshold)",
    "publication.autocreate.mode" : "filtered",
    "snapshot.mode" : "always",
    "table.include.list": "debezium.threshold" 
}'

Проверяем созданный connector

  curl -s -XGET "http://localhost:8083/connectors/" | jq '.'

Смотрим в какой топик событий

  curl -s -XGET "http://localhost:8083/connectors/entity/topics" | jq '.'

Можем удалить connector

 curl -s -XDELETE "http://localhost:8083/connectors/simple-object-text"

Материалы

  1. Materializing Aggregate Views With Hibernate and Debezium