[Flink-cdc-Mysql-To-Kafka] Writing MySQL data to Kafka with Flink SQL and the bundled connectors
- 1) Environment setup
- 2) Required jars
- 3) Scenario
- 4) Preparation
  - 4.1. MySQL
  - 4.2. Kafka
- 5) Flink SQL
- 6) Verification
1) Environment setup
MySQL, Kafka, Flink and the other components must already be installed on Linux or Windows (installation steps omitted).
2) Required jars
- flink-connector-jdbc_2.11-1.12.0.jar
- mysql-connector-java-5.1.49.jar
Download: JDBC-Sql-Connector


- flink-format-changelog-json-1.2.0.jar
- flink-sql-connector-mysql-cdc-1.2.0.jar
- flink-sql-connector-postgres-cdc-1.2.0.jar
Download: ververica/flink-cdc-connectors

Alternative download: Gitee mirror (if GitHub is unreachable, download the source code, set the version you need, and build the jars yourself)
- flink-sql-connector-kafka_2.11-1.12.0.jar
Download: flink-sql-connector-kafka
- Put all downloaded jars into Flink's lib directory, then restart Flink (and the SQL client) so they are loaded.
3) Scenario
1. First, check whether binlog is enabled in MySQL: log_bin = ON means it is enabled (as shown in the figure below).

2. For a local MySQL installation, enable binlog by adding the following lines to C:\ProgramData\MySQL\MySQL Server 5.7\my.ini (in the [mysqld] section):
log_bin = mysql-bin
binlog_format = ROW
expire_logs_days = 30
3. Restart the MySQL service.
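After the restart, the settings can be confirmed from any MySQL client. The CDC connector also reads the binlog as a replication client, so the account used later as ${username} needs replication privileges. A minimal sketch, assuming a hypothetical account named flinkuser (replace the name and password with your own); the privilege list is the set typically required by Debezium-based MySQL CDC, so check the documentation of your connector version:

```sql
-- Confirm the binlog settings took effect (expect log_bin = ON, binlog_format = ROW)
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
SHOW VARIABLES LIKE 'expire_logs_days';

-- Hypothetical CDC account: replace 'flinkuser' and 'your_password'
CREATE USER 'flinkuser'@'%' IDENTIFIED BY 'your_password';

-- Privileges needed for the initial snapshot and for reading the binlog
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
  ON *.* TO 'flinkuser'@'%';
FLUSH PRIVILEGES;
```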
4) Preparation
4.1. MySQL
1. Create the source table in MySQL:
CREATE TABLE `mysql2kafka_cdc_test` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `eventId` varchar(255) DEFAULT NULL,
  `eventStDt` varchar(255) DEFAULT NULL,
  `bak6` varchar(255) DEFAULT NULL,
  `bak7` varchar(255) DEFAULT NULL,
  `businessId` varchar(255) DEFAULT NULL,
  `phone` varchar(255) DEFAULT NULL,
  `bak1` varchar(255) DEFAULT NULL,
  `bak2` varchar(255) DEFAULT NULL,
  `bak13` varchar(255) DEFAULT NULL,
  `bak14` varchar(255) DEFAULT NULL,
  `bak11` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
2. Prepare the statement that writes test data:
INSERT INTO mysql2kafka_cdc_test (
  eventId, eventStDt, bak6, bak7, businessId, phone,
  bak1, bak2, bak13, bak14, bak11
) VALUES (
  '111', '2022-11-30 23:37:49', '测试', 'https://test?user',
  '1727980911111111111111111111', '12345678910',
  '1234', '2021-12-01 00:00:00', '1727980911111111111111111111', 'APP', 'TEST1'
);
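Because the Kafka sink in section 5 is an upsert-kafka table, it can also be worth preparing an UPDATE and a DELETE to see how non-insert changes reach Kafka. A sketch against the same table; the new bak11 value 'TEST2' is arbitrary:

```sql
-- Modify the test row(s): the CDC source emits an update change
UPDATE mysql2kafka_cdc_test SET bak11 = 'TEST2' WHERE eventId = '111';

-- Remove the test row(s): the CDC source emits a delete change
DELETE FROM mysql2kafka_cdc_test WHERE eventId = '111';
```

With upsert-kafka, the update should appear as a new record with the same key ({"id": ...}), and the delete as a record with the same key and a null value (a tombstone).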
4.2. Kafka
Create the Kafka topic test, which the sink table below writes to.
5) Flink SQL
- source
set table.dynamic-table-options.enabled=true;
set table.exec.source.cdc-events-duplicate=true;
CREATE TABLE source_mysql_test(
id INT,
eventId STRING,
eventStDt STRING,
bak6 STRING,
bak7 STRING,
businessId STRING,
phone STRING,
bak1 STRING,
bak2 STRING,
bak13 STRING,
bak14 STRING,
bak11 STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH(
'connector' = 'mysql-cdc',
'hostname' = '${ip}',
'port' = '${port}',
'database-name' = 'test',
'table-name' = 'mysql2kafka_cdc_test',
'username' = '${username}',
'password' = '${password}',
'scan.startup.mode'='timestamp',
'scan.startup.timestamp-millis' = '1692115200000'
);
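scan.startup.timestamp-millis is an epoch time in milliseconds. The value above, 1692115200000, is 2023-08-16 00:00:00 in UTC+8 (2023-08-15 16:00:00 UTC). One way to compute such a value is to let MySQL do it; this sketch assumes you think in UTC+8, and the date is only an example:

```sql
-- Interpret the datetime literal in UTC+8 for this session only
SET time_zone = '+08:00';

-- UNIX_TIMESTAMP returns seconds; multiply by 1000 for milliseconds
SELECT UNIX_TIMESTAMP('2023-08-16 00:00:00') * 1000 AS startup_ts_millis;
-- should return 1692115200000
```

Whether the timestamp startup mode is accepted depends on the mysql-cdc connector version, so if the table fails validation, check the scan.startup.mode values listed in that version's documentation.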
- sink
CREATE TABLE sink_kafka_test (
id INT,
eventId STRING,
eventStDt STRING,
bak6 STRING,
bak7 STRING,
businessId STRING,
phone STRING,
bak1 STRING,
bak2 STRING,
bak13 STRING,
bak14 STRING,
bak11 STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'test',
'sink.parallelism' = '3',
'key.format' = 'json',
'value.format' = 'json',
'properties.bootstrap.servers' = '${kafka-bootstrap-server}',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.kerberos.service.name' = 'kafka',
'properties.metadata.max.age.ms' = '300000'
);
- insert
insert into sink_kafka_test select * from source_mysql_test;
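The set table.dynamic-table-options.enabled=true; line at the top of the source script is what allows per-query OPTIONS hints. As an illustration, the startup mode could be overridden for one job without editing the DDL; 'latest-offset' is only an example value, chosen here to skip the snapshot and read only new changes:

```sql
-- Override a connector option for this query only via a dynamic table options hint
INSERT INTO sink_kafka_test
SELECT * FROM source_mysql_test /*+ OPTIONS('scan.startup.mode' = 'latest-offset') */;
```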
6) Verification
Write test data into MySQL and check whether new records appear in the Kafka topic:
INSERT INTO mysql2kafka_cdc_test (
  eventId, eventStDt, bak6, bak7, businessId, phone,
  bak1, bak2, bak13, bak14, bak11
) VALUES (
  '111', '2022-11-30 23:37:49', '测试', 'https://test?user',
  '1727980911111111111111111111', '12345678910',
  '1234', '2021-12-01 00:00:00', '1727980911111111111111111111', 'APP', 'TEST1'
);
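Besides a Kafka console consumer, the topic can be read back from the Flink SQL client. A minimal sketch, assuming a hypothetical table name verify_kafka_test and consumer group verify_group; if the cluster uses SASL/Kerberos, copy the properties.security.* and properties.sasl.* options from sink_kafka_test:

```sql
-- Hypothetical read-back table over the same topic and formats as the sink
CREATE TABLE verify_kafka_test (
  id INT,
  eventId STRING,
  eventStDt STRING,
  bak6 STRING,
  bak7 STRING,
  businessId STRING,
  phone STRING,
  bak1 STRING,
  bak2 STRING,
  bak13 STRING,
  bak14 STRING,
  bak11 STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'test',
  'properties.bootstrap.servers' = '${kafka-bootstrap-server}',
  'properties.group.id' = 'verify_group',  -- hypothetical group id
  'key.format' = 'json',
  'value.format' = 'json'
);

-- Records are interpreted as upserts keyed by id; tombstones remove the row
SELECT * FROM verify_kafka_test;
```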
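This article was collected from the Internet and does not represent the position of 协通编程; if you repost it, please cite the source: https://www.net2asp.com/030549e935.html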
