Flink实现同时消费多个kafka topic,并输出到多个topic

Flink实现同时消费多个kafka topic,并输出到多个topic

  • 1.说明
  • 2.依赖引用
  • 3. 方案一:适用于==sink topic==存在跨集群等kafka生产者配置信息不相同的情况
    • 3.1配置文件
    • 3.2 java代码
    • 3.3 运行图(ps:为了更好的展示循环中包含算子,将sink算子并行度设为了1,发生了rebalance)
  • 4.方案二:适用于输入及输出topic都用属于一个集群的场景
    • 4.1 配置文件同上
    • 4.2 Java代码
  • 5. 业务使用场景:

1.说明

1)代码使用的flink版本为1.16.1,旧版本的依赖及api可能不同,同时使用了hutool的JSON工具类,两者均可自行更换;

2)本次编写的两个方案,均只适用于数据源topic来自同一个集群,且kafka消费组相同,暂未研究flink的connect算子join多条流

2.依赖引用

 
        8
        8
        UTF-8
        1.16.1
        5.8.15
    
    
 
            cn.hutool
            hutool-all
            ${hutool.version}
        
        
        
            org.apache.flink
            flink-java
            ${flink.version}
            
                
                    commons-lang3
                    org.apache.commons
                
            
        
        
            org.apache.flink
            flink-streaming-java
            ${flink.version}
            
                
                    commons-lang3
                    org.apache.commons
                
            
        
        
            org.apache.flink
            flink-clients
            ${flink.version}
        
        
            org.apache.flink
            flink-connector-kafka
            ${flink.version}
        
        
            org.apache.flink
            flink-connector-base
            ${flink.version}
        
        
            org.apache.flink
            flink-connector-files
            ${flink.version}
        
        
            org.apache.flink
            flink-connector-jdbc
            ${flink.version}
        
        
            org.apache.flink
            flink-runtime-web
            ${flink.version}
        

3. 方案一:适用于sink topic存在跨集群等kafka生产者配置信息不相同的情况

代码涉及Hadoop相关环境,若无该环境的同学,可以设置为本地路径

3.1配置文件

# 输入topic列表
newInputTopic=hive_data_input_topic
# 输出topic列表
newOutputTopic=topic-test

3.2 java代码

public static void main(String[] args) throws Exception {
		// 设置操作HDFS的用户
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        // 获取命令行参数,args[0] 为配置文件路径 input/customer.properties
        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(args[0]);
        String inputTopic = parameterTool.get("newInputTopic");
        String outputTopic = parameterTool.get("newOutputTopic");
        // 构建输入topic
        ArrayList inputTopicList = new ArrayList();
        inputTopicList.add("canal_mysql_input_topic");
        if (!StringUtils.isNullOrWhitespaceOnly(inputTopic)) {
            inputTopicList.add(inputTopic);
        }
        // 构建输出topic
        Map hashMap = new HashMap();
        hashMap.put("ap_article", "canal_input_topic");
        hashMap.put("ap_user", "cast_topic_input");
        if (!StringUtils.isNullOrWhitespaceOnly(outputTopic)) {
            hashMap.put("hive_table_orders", "topic-test");
        }

        // 构建配置
        Configuration configuration = new Configuration();
        // 设定本地flink dashboard的webUi访问端口,即http://localhost:9091
        configuration.setString("rest.port", "9091");
        // 设定从指定的checkpoint恢复,此处为HDFS路径,可更换为本地路径"file:///D:\\test\\flink-tuning\\checkpoint\\jobId\\chk-xx"
        String savePointPath = "hdfs://masterNode:8020/flink-tuning/checkpoint/b66ee8431170f07764db0e777c58848a/chk-36";
        // 设置savepoint路径,以及是否允许本次提交的程序有新增有状态算子,必须给原来的算子配置uid作为唯一标识,否则会出现问题
        SavepointRestoreSettings restoreSettings = SavepointRestoreSettings.forPath(savePointPath, true);
        SavepointRestoreSettings.toConfiguration(restoreSettings, configuration);
        
		// 获取执行环境
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        // 开启检查点,设置检查点间隔时间
        environment.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        // 设置状态后端类型
        environment.setStateBackend(new HashMapStateBackend());
        CheckpointConfig checkpointConfig = environment.getCheckpointConfig();
        // 设置checkpoint文件存放路径,设置本地路径:file:///D:\\test\\flink-tuning\\checkpoint
        checkpointConfig.setCheckpointStorage("hdfs://masterNode:8020/flink-tuning/checkpoint");
        // 设置并发数,同时最多可以有几个checkpoint执行
        checkpointConfig.setMaxConcurrentCheckpoints(1);
        // checkpoint失败次数,超过此次数,job挂掉(checkpoint不会重试,会等待下一个checkpoint)
        checkpointConfig.setTolerableCheckpointFailureNumber(5);
        // 超时多久没完成checkpoint,任务失败
        checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(1));
        // 手动cancel掉job时,保留在外部系统的checkpoint不会被删除
        checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // 从kafka读取数据
        KafkaSource kafkaSource = KafkaSource.builder()
                .setBootstrapServers("192.168.200.130:9092")
                .setTopics(inputTopicList)
                .setGroupId("group-test-savepoint")
                // 从消费组的offset提交位点开始消费,若未找到上一次消费位点,则从设置该topic的offset为最新的位置
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
                .setProperty("partition.discovery.interval.ms", "10000") // 每 10 秒检查一次新分区,避免分区扩容导致没有算子消费
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        SingleOutputStreamOperator streamSource = environment.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka source")
                .uid("kafka_source")	// 最好设置一下算子的id
                .setParallelism(5); // 设置并行度 = topic分区数

        // 此处使用循环,会开辟map键值对个数的算子链,多个filter --> sink算子链,详情见下图
      	// map中可配置topic所属集群,以及鉴权信息等,此处省略
        for (String key : hashMap.keySet()) {
        	// filter算子根据数据中的表名table与topic之间的映射关系,过滤数据
            SingleOutputStreamOperator outputStreamOperator = streamSource.filter(vo -> {
                JSONObject jsonObject = JSONUtil.parseObj(vo);
                String tableName = (String) jsonObject.get("table");
                return tableName.equals(key);
            }).uid("filter-" + key).setParallelism(5);

			// 构建kafka sink
            KafkaSink kafkaSink = KafkaSink.builder()
            		// kafka集群,可根据不同topic所在集群不同,动态更换ip
                    .setBootstrapServers("192.168.200.130:9092")
                    // 自定义kafka序列化器
                    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                    		// 根据映射获取输出topic
                            .setTopic(hashMap.get(key))
                            .setValueSerializationSchema(new SimpleStringSchema())
                            .build())
                     // 一致性语义:至少一次
                    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                    .build();
            // sink算子
            outputStreamOperator.sinkTo(kafkaSink).uid("sink-" + key).setParallelism(1);
        }

        // 执行
        environment.execute();

3.3 运行图(ps:为了更好的展示循环中包含算子,将sink算子并行度设为了1,发生了rebalance)

算子运行图

4.方案二:适用于输入及输出topic都用属于一个集群的场景

4.1 配置文件同上

4.2 Java代码

public static void main(String[] args) throws Exception {
		// 环境配置同上,故此处省略。。。

        // 从kafka读取数据
        KafkaSource kafkaSource = KafkaSource.builder()
                .setBootstrapServers("192.168.200.130:9092")
                .setTopics(inputTopicList)
                .setGroupId("group-test-savepoint")
                // 从消费组的offset提交位点开始消费,若未找到上一次消费位点,则从设置该topic的offset为最新的位置
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
                .setProperty("partition.discovery.interval.ms", "10000") // 每 10 秒检查一次新分区,避免分区扩容导致没有算子消费
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        SingleOutputStreamOperator streamSource = environment.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka source")
                .uid("kafka_source")	// 最好设置一下算子的id
                .setParallelism(5); // 设置并行度 = topic分区数
                
        // 输出到kafka,此处没有循环,只会产生一条算子链
        KafkaSink kafkaSink = KafkaSink.builder()
                .setBootstrapServers("192.168.200.130:9092")	// 输出topic的kafka集群固定
                .setRecordSerializer((KafkaRecordSerializationSchema) (data, context, timestamp) -> {
                    JSONObject jsonObject = JSONUtil.parseObj(data);
                    // 获取表名
                    String table = (String) jsonObject.get("table");
                    // 获取topic
                    String topic = hashMap.get(table);
                    return new ProducerRecord(topic, data.getBytes(StandardCharsets.UTF_8));
                })
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();
        // sink算子
        streamSource.sinkTo(kafkaSink).uid("sink-" + key).setParallelism(1);
        }
        // 执行
        environment.execute();

5. 业务使用场景:

在这里插入图片描述

本文来自网络,不代表协通编程立场,如若转载,请注明出处:https://www.net2asp.com/4112ec2f6d.html