Por que usar Debezium e Kafka Connect?
Debezium e Kafka Connect são plataformas de código aberto que fornecem uma solução poderosa para streaming de alterações de dados em tempo real entre sistemas. Essa interação em tempo real permite que você mantenha seus dados sincronizados e acessíveis para vários casos de uso, como análises em tempo real, armazenamento de dados e integrações de pipeline de dados.
Tecnologia usada
Antes de entrarmos nos detalhes da configuração do Debezium e do Kafka Connect para sincronizar dados do MySQL com o BigQuery, é importante entender as tecnologias que você usará e como elas estão conectadas.
Alterar captura de dados
A captura de dados de alterações (CDC) é uma técnica para capturar e registrar todas as alterações feitas em um banco de dados ao longo do tempo. Isso permite a replicação de dados em tempo real, facilitando a sincronização de vários sistemas.
O CDC faz isso detectando alterações em nível de linha nas tabelas de origem do banco de dados, que são caracterizadas como eventos “Insert”, “Update” e “Delete”. O CDC notifica então outros sistemas ou serviços que dependem dos mesmos dados.
Apache Kafka
Apache Kafka é uma plataforma de streaming distribuída usada para construir pipelines de dados em tempo real e aplicativos de streaming. Ele permite o armazenamento e processamento de fluxos de registros de forma tolerante a falhas.
Conexão Kafka
Kafka Connect é uma estrutura para conectar o Kafka a sistemas externos, como bancos de dados, armazenamentos de valores-chave, índices de pesquisa e sistemas de arquivos, usando os chamados conectores.
Os conectores Kafka são componentes prontos para uso que podem ajudá-lo a importar dados de sistemas externos para tópicos Kafka e exportar dados de tópicos Kafka para sistemas externos. Você pode usar implementações de conectores existentes para fontes de dados e sincronizações comuns ou implementar nossos próprios conectores.
Debézio
Debezium é uma plataforma de código aberto que permite transmitir facilmente alterações do seu banco de dados MySQL para outros sistemas usando CDC. Ele funciona lendo o log binário do MySQL para capturar alterações de dados de maneira transacional, para que você possa ter certeza de que está sempre trabalhando com os dados mais atualizados.
Ao usar o Debezium, você pode capturar as alterações feitas no banco de dados MySQL e transmiti-las para o Kafka. Os dados sobre as alterações podem então ser consumidos pelo Kafka Connect para carregá-los no BigQuery.
Configuração do BigQuery
-
- Criando um projeto e um conjunto de dados do BigQuery:
-
- No Console do Google Cloud, navegue até a página do BigQuery e crie um novo projeto ( Criando e gerenciando projetos | Documentação do Resource Manager | Google Cloud ). Vamos chamá-lo de “mysql-bigquery” para este tutorial.
- Dentro do projeto, crie um novo conjunto de dados ( Criando conjuntos de dados | BigQuery | Google Cloud ). Vamos chamá-lo de “debezium” para este tutorial.
- Observe que o Debezium criará automaticamente tabelas no conjunto de dados que correspondam à estrutura das tabelas MySQL que estão sendo monitoradas.
-
- Criando uma conta de serviço do GCP com função de editor do BigQuery:
-
- No Console do Google Cloud, navegue até a página IAM e administrador e crie uma nova conta de serviço ( Criando e gerenciando contas de serviço | Documentação do IAM | Google Cloud ).
- Dê um nome e uma descrição à conta de serviço e selecione a função “BigQuery Data Editor”.
-
- Gerando e baixando uma chave para a conta de serviço:
-
- No Console do Google Cloud, navegue até a página IAM e Admin, encontre a conta de serviço, clique nos três pontos à direita e selecione “criar chave” (Criar e gerenciar chaves de conta de serviço | Documentação IAM | Google Cloud ) .
- Selecione JSON como o tipo de chave e baixe o arquivo de chave.
- Armazene o arquivo de chave com segurança e use-o para autenticar o conector no Kafka Connect ao acessar o conjunto de dados do BigQuery.
-
- Criando um projeto e um conjunto de dados do BigQuery:
Tutorial
Para começar a sincronizar dados do MySQL com o BigQuery, precisaremos dos seguintes componentes:
- Zelador do Zoológico Apache.
- Apache Kafka.
- Serviço Kafka Connect/Debezium com conector MySQL e plug-ins de conector Google BigQuery .
- Banco de dados MySQL.
Iniciar os serviços necessários
-
- Vamos começar criando um novo diretório. Abra o Terminal e execute:
$ mkdir mysql-to-bigquery $ cd mysql-to-bigquery
- Crie um diretório de plug-ins
$ mkdir plugins
- Baixe o plugin Debezium mysql:
$ wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.1.1.Final/debezium-connector-mysql-2.1.1.Final-plugin.tar.gz -O mysql-plugin.tar.gz $ tar -xzf mysql-plugin.tar.gz -C plugins
- Baixe o plugin BigQuery e coloque o conteúdo em seu diretório de plugins (neste tutorial estamos usando a versão v2.4.3). Agora seu diretório de plugins deve ficar assim:
$ ls plugins debezium-connector-mysql wepay-kafka-connect-bigquery-2.4.3
- Crie um novo arquivo (“docker-compose.yml”) com estas configurações:
version: '2' services: zookeeper: container_name: zookeeper image: quay.io/debezium/zookeeper:2.1 ports: - 2181:2181 - 2888:2888 - 3888:3888 kafka: container_name: kafka image: quay.io/debezium/kafka:2.1 ports: - 9092:9092 links: - zookeeper environment: - ZOOKEEPER_CONNECT=zookeeper:2181 mysql: container_name: mysql image: quay.io/debezium/example-mysql:2.1 ports: - 3306:3306 environment: - MYSQL_ROOT_PASSWORD=debezium - MYSQL_USER=mysqluser - MYSQL_PASSWORD=mysqlpw connect: container_name: connect image: quay.io/debezium/connect-base:2.1 volumes: - ./plugins:/kafka/connect ports: - 8083:8083 links: - kafka - mysql environment: - BOOTSTRAP_SERVERS=kafka:9092 - GROUP_ID=1 - CONFIG_STORAGE_TOPIC=my_connect_configs - OFFSET_STORAGE_TOPIC=my_connect_offsets - STATUS_STORAGE_TOPIC=my_connect_statuses - Vamos iniciar os serviços:
$ docker-compose up
- Você deverá ver uma saída semelhante à seguinte:
... 2023-01-16 15:48:33,939 INFO || Kafka version: 3.0.0 [org.apache.kafka.common.utils.AppInfoParser] ... 2023-01-16 15:48:34,485 INFO || [Worker clientId=connect-1, groupId=1] Starting connectors and tasks using config offset -1 [org.apache.kafka.connect.runtime.distributed.DistributedHerder] 2023-01-16 15:48:34,485 INFO || [Worker clientId=connect-1, groupId=1] Finished starting connectors and tasks [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
- Verifique se o Debezium está rodando com a API Kafka Connect. Uma matriz vazia em resposta mostra que não há conectores registrados atualmente no Kafka Connect.
$ curl -i -X GET -H "Accept:application/json" localhost:8083/connectors []
- Também temos o MySQL rodando com um exemplo de inventário de banco de dados. Você pode verificar quais tabelas existem executando:
$ docker exec -it mysql mysql -uroot -pdebezium -D inventory -e "SHOW TABLES;" +---------------------+ | Tables_in_inventory | +---------------------+ | addresses | | customers | | geom | | orders | | products | | products_on_hand | +---------------------+
- Vamos começar criando um novo diretório. Abra o Terminal e execute:
Vamos verificar o que está dentro da tabela de clientes:
$ docker exec -it mysql mysql -uroot -pdebezium -D inventory -e "SELECT * FROM customers;" +------+------------+-----------+-----------------------+ | id | first_name | last_name | email | +------+------------+-----------+-----------------------+ | 1001 | Sally | Thomas | sally.thomas@acme.com | | 1002 | George | Bailey | gbailey@foobar.com | | 1003 | Edward | Walker | ed@walker.com | | 1004 | Anne | Kretchmar | annek@noanswer.org | +------+------------+-----------+-----------------------+
Configure o Debezium para iniciar a sincronização do MySQL com o Kafka
Agora vamos configurar o Debezium para começar a sincronizar o banco de dados de inventário com o Kafka.
-
- Crie um novo arquivo (“register-mysql.json”) com essas configurações (Você pode encontrar informações sobre essas propriedades de configuração na documentação do Debezium ):
{ "name": "inventory-connector-mysql", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "root", "database.password": "debezium", "database.server.id": "184054", "topic.prefix": "debezium", "database.include.list": "inventory", "schema.history.internal.kafka.bootstrap.servers": "kafka:9092", "schema.history.internal.kafka.topic": "schemahistory.inventory" } } - Registre um conector MySQL:
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json
- Verifique se “conector de inventário” está incluído na lista de conectores:
$ curl -H "Accept:application/json" localhost:8083/connectors/ ["inventory-connector-mysql"]
- Agora você pode ver o conteúdo do banco de dados no Kafka. Para ver os tópicos, execute:
$ docker exec -it kafka bash bin/kafka-topics.sh --list --bootstrap-server kafka:9092 ... debezium.inventory.addresses debezium.inventory.customers ...
- Crie um novo arquivo (“register-mysql.json”) com essas configurações (Você pode encontrar informações sobre essas propriedades de configuração na documentação do Debezium ):
Vamos verificar debezium.inventory.addresses:
$ docker exec -it kafka bash bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic debezium.inventory.addresses --from-beginning
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"debezium.inventory.addresses.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"debezium.inventory.addresses.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"debezium.inventory.addresses.Envelope","version":1},"payload":{"before":null,"after":{"id":10,"customer_id":1001,"street":"3183 Moore Avenue","city":"Euless","state":"Texas","zip":"76036","type":"SHIPPING"},"source":{"version":"2.1.1.Final","connector":"mysql","name":"debezium","ts_ms":1673446748000,"snapshot":"first","db":"inventory","sequence":null,"table":"addresses","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":157,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1673446748425,"transaction":null}}
...
Para obter mais informações sobre eventos do Debezium, consulte esta documentação do Debezium .
Configure o Debezium para começar a sincronizar dados com o Google BigQuery
Antes de começar a configurar o conector do BigQuery, mova o arquivo de chave da conta de serviço do Google BigQuery (detalhes na seção anterior) para seu diretório de trabalho e nomeie-o como “bigquery-keyfile.json”.
-
- Assim que tiver o arquivo-chave, copie-o para o contêiner do Connect:
$ docker cp bigquery-keyfile.json connect:/bigquery-keyfile.json
- Agora crie um arquivo register-bigquery.json com essas configurações (você pode encontrar informações sobre essas propriedades de configuração na documentação oficial ):
{ "name": "inventory-connector-bigquery", "config": { "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector", "tasks.max": "1", "consumer.auto.offset.reset": "earliest", "topics.regex": "debezium.inventory.*", "sanitizeTopics": "true", "autoCreateTables": "true", "keyfile": "/bigquery-keyfile.json", "schemaRetriever": "com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever", "project": "mysql-bigquery", "defaultDataset": "debezium", "allBQFieldsNullable": true, "allowNewBigQueryFields": true, "transforms": "regexTopicRename,extractAfterData", "transforms.regexTopicRename.type": "org.apache.kafka.connect.transforms.RegexRouter", "transforms.regexTopicRename.regex": "debezium.inventory.(.*)", "transforms.regexTopicRename.replacement": "$1", "transforms.extractAfterData.type": "io.debezium.transforms.ExtractNewRecordState" } } - Para registrar o conector do BigQuery, execute:
$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-bigquery.json
- Verifique se “conector de inventário” está incluído na lista de conectores:
$ curl -H "Accept:application/json" localhost:8083/connectors/ ["inventory-connector-mysql","inventory-connector-bigquery"]
- Assim que tiver o arquivo-chave, copie-o para o contêiner do Connect:
No seu conjunto de dados do BigQuery, agora você poderá ver tabelas que correspondem às do MySQL.
Agora, selecione os dados da sua tabela de clientes. Os e-mails utilizados são apenas para fins de exemplo e não correspondem a indivíduos reais.
Você pode criar uma nova entrada na tabela de clientes do MySQL:
$ docker exec -it mysql mysql -uroot -pdebezium -D inventory -e "INSERT INTO customers VALUES(1005, "Tom", "Addams", "tom.addams@mailer.net");"
Você verá que uma nova entrada foi sincronizada automaticamente com o BigQuery.
Conclusão
Agora você deve ter uma compreensão clara dos benefícios de sincronizar dados do MySQL com o BigQuery usando Debezium e Kafka Connect. Com o tutorial detalhado encontrado neste artigo, você mesmo poderá instalar e configurar o Debezium e o Kafka Connect.
Como lembrete, é importante testar e monitorizar o pipeline para garantir que os dados estão a ser sincronizados conforme esperado e para resolver quaisquer problemas que possam surgir.
Para obter mais informações sobre Debezium e Kafka Connect, você pode consultar os seguintes recursos:








