【Flink-Kafka-To-Mysql】使用 Flink 实现 Kafka 数据写入 Mysql(根据对应操作类型进行增、删、改操作)

【Flink-Kafka-To-Mysql】使用 Flink 实现 Kafka 数据写入 Mysql(根据对应操作类型进行增、删、改操作)

  • 1)导入依赖
  • 2)resources
    • 2.1.appconfig.yml
    • 2.2.application.properties
    • 2.3.log4j.properties
    • 2.4.log4j2.xml
  • 3)util
    • 3.1.KafkaMysqlUtils
    • 3.2.CustomDeSerializationSchema
  • 4)po
    • 4.1.TableBean
  • 5)kafkacdc2mysql
    • 5.1.Kafka2MysqlApp

需求描述:

1、数据从 Kafka 写入 Mysql。

2、相关配置存放于 Mysql 中,通过 Mysql 进行动态读取。

3、此案例中的 Kafka 是进行了 Kerberos 安全认证的,如果不需要自行修改。

4、Kafka 数据为 Json 格式,获取到的数据根据操作类型字段进行增删改操作。

5、读取时使用自定义 Source,写入时使用自定义 Sink。

6、消费 Kafka 数据时自定义反序列化。

1)导入依赖

这里的依赖比较冗余,大家可以根据各自需求做删除或保留。


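<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>gaei.cn.x5l</groupId>
    <artifactId>x8vbusiness</artifactId>
    <version>1.0.0</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <target.java.version>1.8</target.java.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>

        <scala.binary.version>2.12</scala.binary.version>
        <scala.version>2.12.10</scala.version>
        <flink.version>1.14.0</flink.version>
        <log4j.version>2.17.2</log4j.version>
        <hadoop.version>3.1.2</hadoop.version>
        <!-- 注:以下三个属性名在转载时丢失,此处按取值与用法推断,请按实际工程核对 -->
        <hive.version>3.1.2</hive.version>

        <mongo.driver.version>3.12.6</mongo.driver.version>
        <mongo.driver.core.version>4.3.1</mongo.driver.core.version>

    </properties>
    <dependencies>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.3.0</version>
            <!--            <exclusions>-->
            <!--                <exclusion>-->
            <!--                    <groupId>mysql</groupId>-->
            <!--                    <artifactId>mysql-connector-java</artifactId>-->
            <!--                </exclusion>-->
            <!--            </exclusions>-->
        </dependency>

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>1.14.0</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-state-processor-api_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>2.5</version>
            <scope>compile</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.3.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.avro</groupId>
                    <artifactId>avro</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-auth</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.1.23</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.18</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.jyaml</groupId>
            <artifactId>jyaml</artifactId>
            <version>1.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <!--            <version>${flink.version}</version>-->
            <version>1.13.5</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.3</version>
        </dependency>

        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mongodb-cdc</artifactId>
            <version>2.3.0</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <!--            <version>5.1.44</version>-->
            <version>8.0.27</version>
            <scope>runtime</scope>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.2.8</version>
        </dependency>

        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>bson</artifactId>
            <version>${mongo.driver.core.version}</version>
        </dependency>

        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongodb-driver-core</artifactId>
            <version>${mongo.driver.core.version}</version>
        </dependency>

        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongodb-driver</artifactId>
            <version>3.12.6</version>
        </dependency>

    </dependencies>
    <build>
        <plugins>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>

            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                    <exclude>org.apache.flink:flink-runtime-web_2.11</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.owp.flink.kafka.KafkaSourceDemo</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-shade-plugin</artifactId>
                                        <versionRange>[3.0.0,)</versionRange>
                                        <goals>
                                            <goal>shade</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>

            </plugins>
        </pluginManagement>

    </build>
</project>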

2)resources

2.1.appconfig.yml

mysql.url: "jdbc:mysql://1.1.1.1:3306/test?useSSL=false"
mysql.username: "test"
mysql.password: "123456"
mysql.driver: "com.mysql.jdbc.Driver"

2.2.application.properties

url=mongodb://test:test123456@10.1.1.1:34516/?authSource=admin
#database=diagnosis
#collection=diagnosisEntiry
maxConnectionIdleTime=1000000
batchSize=1

# flink
checkpoint.interval=300000
checkpoint.minPauseBetweenCheckpoints=10000
checkpoint.checkpointTimeout=400000
maxConcurrentCheckpoints=1
restartInterval=120
restartStrategy=3
checkpointDataUri=hdfs://nameserver/user/flink/rocksdbcheckpoint_mongo

mysql.url=jdbc:mysql://1.1.1.1:3306/test?useSSL=false
mysql.username=test
mysql.password=123456

#envType=PRE
envType=PRD

# mysql  druid 连接池生产环境连接池配置
druid.driverClassName=com.mysql.jdbc.Driver
#生产
druid.url=jdbc:mysql://1.1.1.1:3306/test
druid.username=test
druid.password=123456
# 初始化连接数
druid.initialSize=1
# 最大连接数
druid.maxActive=5
# 最大等待时间
druid.maxWait=3000

2.3.log4j.properties

log4j.rootLogger=info, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

2.4.log4j2.xml


    
        
        
    

    
        
            
            
        
        
            
        
    

    
        
            
            
        
    

3)util

3.1.KafkaMysqlUtils(代码中的类名为 KafkaUtils,源文件需命名为 KafkaUtils.java)

/**
 * Shared helpers for the Kafka-to-MySQL job: builds the Kerberos-secured Kafka consumer,
 * reads settings from {@code application.properties}, derives the Druid pool configuration
 * and applies checkpoint / restart / state-backend settings to a Flink environment.
 */
public class KafkaUtils {
    /** Classpath resource every setting is read from. */
    private static final String CONFIG_RESOURCE = "application.properties";

    /** Druid factory keys; each is read from the config file under the {@code druid.} prefix. */
    private static final String[] DRUID_KEYS = {
            "driverClassName", "url", "username", "password", "initialSize", "maxActive", "maxWait"
    };

    /**
     * Creates a Flink Kafka consumer for the given topics using Kerberos (SASL_PLAINTEXT).
     *
     * @param topic names of the topics to subscribe to
     * @return a consumer that starts from the committed group offsets and commits offsets on checkpoints
     * @throws RuntimeException if the configured {@code envType} is not a known environment
     * @throws IOException declared for caller compatibility
     */
    public static FlinkKafkaConsumer<ConsumerRecord> getKafkaConsumer(List<String> topic) throws IOException {
        // The environment (PRE / PRD) selects both the Kerberos principal and the broker list.
        String envType = confFromYaml().getProperty("envType");
        // Resolve the environment once instead of once per looked-up key.
        Map<String, String> kerberos = getKafkaKerberos(envType);

        System.setProperty("java.security.krb5.conf", "/opt/conf/krb5.conf");

        Properties prop = new Properties();
        prop.put("security.protocol", "SASL_PLAINTEXT");
        prop.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required "
                + "useTicketCache=false  "
                + "serviceName=\"" + "kafka" + "\" "
                + "useKeyTab=true "
                + "keyTab=\"" + "/opt/conf/test.keytab" + "\" "
                + "principal=\"" + kerberos.get("principal") + "\";");
        prop.put("bootstrap.servers", kerberos.get("bootstrap.servers"));
        prop.put("group.id", "Kafka2Mysql_test");
        prop.put("auto.offset.reset", "earliest");
        prop.put("enable.auto.commit", "false");
        prop.put("max.poll.interval.ms", "60000");
        prop.put("max.poll.records", "3000");
        prop.put("session.timeout.ms", "600000");

        // No key/value (de)serializer is configured here: the previous "key.serializer" /
        // "value.serializer" entries are producer settings that a consumer does not recognise.
        // NOTE(review): FlinkKafkaConsumer is expected to install its own byte-array
        // deserializers and hand the raw bytes to the schema below - confirm against the
        // connector version in use.

        FlinkKafkaConsumer<ConsumerRecord> consumer =
                new FlinkKafkaConsumer<ConsumerRecord>(topic, new CustomDeSerializationSchema(), prop);

        consumer.setStartFromGroupOffsets();
        consumer.setCommitOffsetsOnCheckpoints(true);

        return consumer;
    }

    /**
     * Connectivity smoke test: builds the Druid pool from the configuration and prints the
     * row count of {@code tab_factory}.
     *
     * @param args unused
     * @throws Exception if the pool cannot be created or the query fails
     */
    public static void main(String[] args) throws Exception {
        Properties druidConf = KafkaUtils.getDruidConf();
        if (druidConf == null) {
            throw new RuntimeException("缺少druid相关配置信息,请检查");
        }

        DataSource dataSource = DruidDataSourceFactory.createDataSource(druidConf);
        // try-with-resources releases result set, statement and connection even when the query throws.
        try (Connection connection = dataSource.getConnection();
             PreparedStatement countStatement = connection.prepareStatement("select count(*) from tab_factory");
             ResultSet resultSet = countStatement.executeQuery()) {
            while (resultSet.next()) {
                System.out.println(resultSet.getString(1));
            }
        } finally {
            // Shut the pool down as well when the implementation supports it.
            if (dataSource instanceof AutoCloseable) {
                ((AutoCloseable) dataSource).close();
            }
        }
    }

    /**
     * Extracts the Druid pool settings ({@code druid.*}) from the configuration file.
     *
     * @return properties keyed by the un-prefixed Druid names, or {@code null} when any
     *         required {@code druid.*} entry is missing
     */
    public static Properties getDruidConf() {
        Properties source = confFromYaml();
        Properties druid = new Properties();
        for (String key : DRUID_KEYS) {
            String value = source.getProperty("druid." + key);
            if (value == null) {
                // Callers treat null as "configuration missing"; say which key caused it.
                System.err.println("Missing required property druid." + key + " in " + CONFIG_RESOURCE);
                return null;
            }
            druid.setProperty(key, value);
        }
        return druid;
    }

    /**
     * Returns the Kerberos principal and broker list for an environment.
     *
     * @param envType environment name, {@code PRE} or {@code PRD} (case-insensitive)
     * @return map with the keys {@code principal} and {@code bootstrap.servers}
     * @throws RuntimeException if {@code envType} is neither {@code PRE} nor {@code PRD}
     */
    public static Map<String, String> getKafkaKerberos(String envType) {
        Map<String, String> map = new HashMap<>();
        if ("PRD".equalsIgnoreCase(envType)) {
            map.put("principal", "prd@PRD.PRD.COM");
            map.put("bootstrap.servers", "kfk01.prd:9092,kfk02.prd:9092,kfk03.prd:9092,kfk04.prd:9092,kfk05.prd:9092,kfk06.prd:9092");
        } else if ("PRE".equalsIgnoreCase(envType)) {
            map.put("principal", "pre@PRE.PRE.COM");
            map.put("bootstrap.servers", "kfk01.pre:9092,kfk02.pre:9092,kfk03.pre:9092");
        } else {
            System.out.println("没有该" + envType + "环境");
            throw new RuntimeException("没有该" + envType + "环境");
        }

        return map;
    }

    /**
     * Applies checkpointing, restart strategy and the RocksDB state backend to {@code env},
     * all driven by the configuration file.
     *
     * @param env environment to configure
     * @return the same {@code env} instance
     * @throws NumberFormatException if a numeric setting is missing or not a number
     * @throws IOException declared for caller compatibility
     */
    public static StreamExecutionEnvironment setupFlinkEnv(StreamExecutionEnvironment env) throws IOException {
        Properties prop = confFromYaml();
        // Kafka offsets are committed on checkpoints, so commits lag by up to one checkpoint interval.
        env.enableCheckpointing(requiredLong(prop, "checkpoint.interval"), CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(requiredLong(prop, "checkpoint.minPauseBetweenCheckpoints"));
        env.getCheckpointConfig().setCheckpointTimeout(requiredLong(prop, "checkpoint.checkpointTimeout"));
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(requiredInt(prop, "maxConcurrentCheckpoints"));
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                requiredInt(prop, "restartStrategy"), // number of restart attempts
                Time.of(requiredInt(prop, "restartInterval"), TimeUnit.SECONDS) // delay between attempts
        ));
        // State backend: RocksDB, with the boolean flag passed as true as in the original call.
        env.setStateBackend(new RocksDBStateBackend(String.valueOf(prop.getProperty("checkpointDataUri")), true));
        return env;
    }

    /**
     * Loads {@code application.properties} from the context class loader.
     * Despite the name, the file is a Java properties file, not YAML.
     *
     * @return the loaded properties; empty when the resource is missing or unreadable
     */
    public static Properties confFromYaml() {
        Properties prop = new Properties();
        try (InputStream resourceStream =
                     Thread.currentThread().getContextClassLoader().getResourceAsStream(CONFIG_RESOURCE)) {
            if (resourceStream == null) {
                System.err.println(CONFIG_RESOURCE + " was not found on the classpath");
            } else {
                prop.load(resourceStream);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return prop;
    }

    /** Reads a mandatory numeric setting, naming the key in the error when it is absent. */
    private static long requiredLong(Properties prop, String key) {
        String raw = prop.getProperty(key);
        if (raw == null) {
            throw new NumberFormatException("Missing required property '" + key + "' in " + CONFIG_RESOURCE);
        }
        return Long.parseLong(raw.trim());
    }

    /** Integer variant of {@link #requiredLong(Properties, String)}. */
    private static int requiredInt(Properties prop, String key) {
        String raw = prop.getProperty(key);
        if (raw == null) {
            throw new NumberFormatException("Missing required property '" + key + "' in " + CONFIG_RESOURCE);
        }
        return Integer.parseInt(raw.trim());
    }
}

3.2.CustomDeSerializationSchema

/**
 * Kafka deserialization schema that turns the raw byte[] key/value of each record into
 * UTF-8 strings while keeping the record metadata (topic, partition, offset, timestamp).
 */
public class CustomDeSerializationSchema implements KafkaDeserializationSchema<ConsumerRecord> {
    /** Charset used to decode both key and value. */
    private static final java.nio.charset.Charset ENCODING = java.nio.charset.StandardCharsets.UTF_8;

    /**
     * Signals whether the given element ends the stream.
     *
     * @param nextElement the element just deserialized
     * @return always {@code false}, i.e. the stream is treated as unbounded
     */
    @Override
    public boolean isEndOfStream(ConsumerRecord nextElement) {
        return false;
    }

    /**
     * Decodes one Kafka record.
     *
     * @param record raw record as delivered by the Kafka client
     * @return a record with String key and value carrying the original topic, partition, offset
     *         and timestamp, or {@code null} when the value is absent (tombstone record)
     * @throws Exception declared by the interface
     */
    @Override
    public ConsumerRecord deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        byte[] rawValue = record.value();
        if (rawValue == null) {
            // A record without a value carries nothing to apply downstream.
            // NOTE(review): returning null relies on Flink dropping null results from this
            // method - confirm against the KafkaDeserializationSchema contract in use.
            return null;
        }

        byte[] rawKey = record.key();
        // A missing key is represented as the empty string, as before.
        String key = rawKey == null ? "" : new String(rawKey, ENCODING);

        return new ConsumerRecord<>(
                record.topic(),
                record.partition(),
                record.offset(),
                record.timestamp(),
                record.timestampType(),
                record.checksum(),
                record.serializedKeySize(),
                record.serializedValueSize(),
                key,
                new String(rawValue, ENCODING));
    }

    /**
     * Describes the type produced by this schema.
     *
     * @return type information for {@code ConsumerRecord}
     */
    @Override
    public TypeInformation<ConsumerRecord> getProducedType() {
        return TypeInformation.of(new TypeHint<ConsumerRecord>() {
        });
    }
}

4)po

4.1.TableBean

/**
 * Target-table descriptor: which database and table a topic is written to and which column
 * is its primary key. Database and table names are stored already wrapped in back-ticks;
 * the primary-key column name is stored as given.
 */
@Data
public class TableBean {
    private String database;
    private String table;
    private String primaryKey;

    /** Not available to other classes; use the three-argument constructor. */
    private TableBean() {
    }

    /**
     * @param database   database name, stored wrapped in back-ticks
     * @param table      table name, stored wrapped in back-ticks
     * @param primaryKey primary-key column name, stored unchanged
     */
    public TableBean(String database, String table, String primaryKey) {
        this.primaryKey = primaryKey;
        this.database = backtick(database);
        this.table = backtick(table);
    }

    /** Surrounds {@code name} with back-ticks. */
    private static String backtick(String name) {
        return "`" + name + "`";
    }
}

5)kafkacdc2mysql

5.1.Kafka2MysqlApp

/**
 * Flink job that consumes JSON change records from Kafka and applies them to MySQL.
 * Each message carries an {@code operationType} field (INSERT / UPDATE / DELETE); every
 * other field is treated as a column of the target table.
 */
public class Kafka2MysqlApp {
    // Key: Kafka topic name. Value: target database, table and primary-key column for that topic.
    private static final Map<String, TableBean> map = new HashMap<>();

    static {
        // Table names are left empty here; the topic name is then used as the table name.
        map.put("mysql_tab1", new TableBean("db1", "", "alarm_id"));
        map.put("mysql_tab2", new TableBean("db2", "", "id"));
    }

    /**
     * Builds and runs the pipeline: Kafka source -> MySQL sink.
     *
     * @param args unused
     * @throws Exception if the environment cannot be configured or the job fails
     */
    public static void main(String[] args) throws Exception {

        List<String> topicList = new ArrayList<>(map.keySet());

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().disableOperatorChaining();
        KafkaUtils.setupFlinkEnv(env);

        RichSinkFunction<ConsumerRecord> sinkFunction =
                new RichSinkFunction<ConsumerRecord>() {
                    // Created in open(); transient so a pool is never part of the serialized function.
                    private transient DataSource dataSource;

                    /** Creates the Druid connection pool for this sink instance. */
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        Properties druidConf = KafkaUtils.getDruidConf();
                        if (druidConf == null) {
                            throw new RuntimeException("缺少druid相关配置信息,请检查");
                        }
                        dataSource = DruidDataSourceFactory.createDataSource(druidConf);
                    }

                    /** Releases the connection pool when the pool implementation is closeable. */
                    @Override
                    public void close() throws Exception {
                        if (dataSource instanceof AutoCloseable) {
                            ((AutoCloseable) dataSource).close();
                        }
                        dataSource = null;
                    }

                    /**
                     * Applies one change record to MySQL.
                     *
                     * @throws RuntimeException wrapping any failure, after logging the operation,
                     *                          key, cause, record and last SQL statement
                     */
                    @Override
                    public void invoke(ConsumerRecord record, Context context) throws Exception {
                        if (dataSource == null) {
                            throw new RuntimeException("连接池未初始化");
                        }
                        String operationType = "";
                        String keyId = "";
                        String sql = "";
                        try (Connection connection = dataSource.getConnection()) {
                            String topic = record.topic();
                            TableBean target = map.get(topic);
                            if (target == null) {
                                throw new IllegalStateException("No table mapping configured for topic " + topic);
                            }

                            JSONObject jsonObject = JSONObject.parseObject((String) record.value());
                            if (jsonObject == null) {
                                throw new IllegalArgumentException("Empty message value on topic " + topic);
                            }
                            operationType = jsonObject.getString("operationType");
                            // The operation marker is metadata, not a column.
                            jsonObject.remove("operationType");

                            String primaryKey = target.getPrimaryKey();
                            keyId = jsonObject.getString(primaryKey);
                            String qualifiedTable = target.getDatabase() + "." + resolveTableName(target, topic);

                            // Column names and values are collected in the same pass so they stay aligned.
                            List<String> columns = new ArrayList<>();
                            List<String> columnValues = new ArrayList<>();
                            for (Map.Entry<String, Object> field : jsonObject.entrySet()) {
                                columns.add(field.getKey());
                                Object value = field.getValue();
                                // JSON null is kept as SQL NULL instead of failing on toString().
                                columnValues.add(value == null ? null : value.toString());
                            }

                            if ("INSERT".equals(operationType)) {
                                if (keyId != null) {
                                    // Remove any existing row first so a replayed INSERT does not collide.
                                    sql = buildDeleteSql(qualifiedTable, primaryKey);
                                    try {
                                        executeUpdate(connection, sql, java.util.Collections.singletonList(keyId));
                                    } catch (Exception deleteFailure) {
                                        // Best effort, as before: report it and still attempt the insert.
                                        System.out.printf("Pre-insert delete failed for key %s on %s: %s%n",
                                                keyId, qualifiedTable, deleteFailure.getMessage());
                                    }
                                }
                                sql = buildInsertSql(qualifiedTable, columns);
                                executeUpdate(connection, sql, columnValues);

                            } else if ("UPDATE".equals(operationType)) {
                                if (keyId == null) {
                                    throw new IllegalArgumentException("UPDATE message has no value for primary key " + primaryKey);
                                }
                                if (columns.isEmpty()) {
                                    throw new IllegalArgumentException("UPDATE message carries no columns");
                                }
                                sql = buildUpdateSql(qualifiedTable, columns, primaryKey);
                                // The key is bound as the last parameter rather than spliced into the SQL text.
                                List<String> params = new ArrayList<>(columnValues);
                                params.add(keyId);
                                executeUpdate(connection, sql, params);

                            } else if ("DELETE".equals(operationType)) {
                                if (keyId == null) {
                                    throw new IllegalArgumentException("DELETE message has no value for primary key " + primaryKey);
                                }
                                sql = buildDeleteSql(qualifiedTable, primaryKey);
                                executeUpdate(connection, sql, java.util.Collections.singletonList(keyId));

                            } else {
                                // Unknown operations are skipped as before, but no longer silently.
                                System.out.printf("Ignoring unsupported operationType %s on topic %s%n", operationType, topic);
                            }
                        } catch (Exception e) {
                            System.out.printf("mysql同步操作(%s)有误,主键是%s,原因是%s,对应topic数据是%s%n", operationType, keyId, e.getMessage(), record);
                            System.out.println("执行sql语句为 " + sql);
                            throw new RuntimeException(e);
                        }
                    }
                };

        env.addSource(KafkaUtils.getKafkaConsumer(topicList)).addSink(sinkFunction);

        env.execute("kafka2mysql synchronization " + topicList.toString());
    }

    /** Returns the configured (already quoted) table name, or the quoted topic when none is configured. */
    private static String resolveTableName(TableBean target, String topic) {
        String configured = target.getTable();
        // TableBean wraps names in back-ticks, so an unconfigured table shows up as "``".
        if (configured == null || configured.isEmpty() || "``".equals(configured)) {
            return quoteIdentifier(topic);
        }
        return configured;
    }

    /** Wraps an identifier in back-ticks, doubling any embedded back-tick. */
    private static String quoteIdentifier(String identifier) {
        return "`" + identifier.replace("`", "``") + "`";
    }

    /** Builds {@code insert into t(`a`,`b`) values(?,?)} for the given columns. */
    private static String buildInsertSql(String qualifiedTable, List<String> columns) {
        StringBuilder names = new StringBuilder();
        StringBuilder placeholders = new StringBuilder();
        for (String column : columns) {
            if (names.length() > 0) {
                names.append(",");
                placeholders.append(",");
            }
            names.append(quoteIdentifier(column));
            placeholders.append("?");
        }
        return "insert into " + qualifiedTable + "(" + names + ") values(" + placeholders + ")";
    }

    /** Builds {@code update t set `a`= ?,`b`= ? where `pk`= ?}; the key is the last parameter. */
    private static String buildUpdateSql(String qualifiedTable, List<String> columns, String primaryKey) {
        StringBuilder assignments = new StringBuilder();
        for (String column : columns) {
            if (assignments.length() > 0) {
                assignments.append(",");
            }
            assignments.append(quoteIdentifier(column)).append("= ?");
        }
        return "update " + qualifiedTable + " set " + assignments + " where " + quoteIdentifier(primaryKey) + "= ?";
    }

    /** Builds {@code delete from t where `pk`= ?}. */
    private static String buildDeleteSql(String qualifiedTable, String primaryKey) {
        return "delete from " + qualifiedTable + " where " + quoteIdentifier(primaryKey) + "= ?";
    }

    /** Runs one parameterized statement and always closes it; null parameters are bound as SQL NULL. */
    private static void executeUpdate(Connection connection, String sql, List<String> params) throws Exception {
        try (PreparedStatement statement = connection.prepareStatement(sql)) {
            for (int i = 0; i < params.size(); i++) {
                String param = params.get(i);
                if (param == null) {
                    statement.setNull(i + 1, java.sql.Types.VARCHAR);
                } else {
                    statement.setObject(i + 1, param);
                }
            }
            statement.executeUpdate();
        }
    }
}

本文来自网络,不代表协通编程立场,如若转载,请注明出处:https://www.net2asp.com/7a7fd773ad.html