【flink番外篇】15、Flink维表实战之6种实现方式-通过广播将维表数据传递到下游

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列

    本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列

    本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列

    本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列

    本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列

    本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引


文章目录

  • Flink 系列文章
  • 一、maven依赖及数据结构
    • 1、maven依赖
    • 2、数据结构
    • 3、数据源
    • 4、验证结果
  • 四、通过广播将维表数据传递到下游
    • 1、说明
    • 2、示例:将事实流与维表进行关联-通过Flink 的Broadcast
      • 1)、广播实现
      • 2)、实现事实流与维度流join

本文是通过Flink的广播方式进行维度表数据进行广播,事实流进行connection。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文除了maven依赖外,没有其他依赖。

本专题分为以下几篇文章:

【flink番外篇】15、Flink维表实战之6种实现方式-初始化的静态数据

【flink番外篇】15、Flink维表实战之6种实现方式-维表来源于第三方数据源

【flink番外篇】15、Flink维表实战之6种实现方式-通过广播将维表数据传递到下游

【flink番外篇】15、Flink维表实战之6种实现方式-通过Temporal table实现维表数据join

【flink番外篇】15、Flink维表实战之6种实现方式-完整版(1)

【flink番外篇】15、Flink维表实战之6种实现方式-完整版(2)

一、maven依赖及数据结构

1、maven依赖

本文的所有示例均依赖本部分的pom.xml内容,可能针对下文中的某些示例存在过多的引入,根据自己的情况进行删减。

	UTF-8
	UTF-8
	1.8
	1.8
	1.8
	2.12
	1.17.0



	
	
		org.apache.flink
		flink-clients
		${flink.version}
		provided
	
	
		org.apache.flink
		flink-java
		${flink.version}
		provided
	
	
		org.apache.flink
		flink-table-common
		${flink.version}
		provided
	
	
		org.apache.flink
		flink-streaming-java
		${flink.version}
	
	
		org.apache.flink
		flink-table-api-java-bridge
		${flink.version}
		provided
	
	
		org.apache.flink
		flink-csv
		${flink.version}
		provided
	
	
		org.apache.flink
		flink-json
		${flink.version}
		provided
	
	
	
		org.apache.flink
		flink-table-planner_2.12
		${flink.version}
		provided
	
	
	
		org.apache.flink
		flink-table-api-java-uber
		${flink.version}
		provided
	
	
	
		org.apache.flink
		flink-table-runtime
		${flink.version}
		provided
	
	
		org.apache.flink
		flink-connector-jdbc
		3.1.0-1.17
	
	
		mysql
		mysql-connector-java
		5.1.38
	
	
		com.google.guava
		guava
		32.0.1-jre
	
	
	
	
		org.apache.flink
		flink-connector-kafka
		${flink.version}
	
	
	
		org.apache.flink
		flink-sql-connector-kafka
		${flink.version}
		provided
	
	
	
		org.apache.commons
		commons-compress
		1.24.0
	
	
		org.projectlombok
		lombok
		1.18.2
	
	
		org.apache.bahir
		flink-connector-redis_2.12
		1.1.0
		
			
				flink-streaming-java_2.12
				org.apache.flink
			
			
				flink-runtime_2.12
				org.apache.flink
			
			
				flink-core
				org.apache.flink
			
			
				flink-java
				org.apache.flink
			
			
				org.apache.flink
				flink-table-api-java
			
			
				org.apache.flink
				flink-table-api-java-bridge_2.12
			
			
				org.apache.flink
				flink-table-common
			
			
				org.apache.flink
				flink-table-planner_2.12
			
		
	
	
	
		com.alibaba
		fastjson
		2.0.43
	

2、数据结构

本示例仅仅为实现需求:将订单中uId与用户id进行关联,然后输出Tuple2。

  • 事实流 order
    // 事实表
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class Order {
        private Integer id;
        private Integer uId;
        private Double total;
    }
  • 维度流 user
    // 维表
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class User {
        private Integer id;
        private String name;
        private Double balance;
        private Integer age;
        private String email;
    }

3、数据源

事实流数据有几种,具体见示例部分,比如socket、redis、kafka等

维度表流有几种,具体见示例部分,比如静态数据、mysql、socket、kafka等。

如此,实现本文中的示例就需要准备好相应的环境,即mysql、redis、kafka、netcat等。

4、验证结果

本文提供的所有示例均为验证通过的示例,测试的数据均在每个示例中,分为事实流、维度流和运行结果进行注释,在具体的示例中关于验证不再赘述。

四、通过广播将维表数据传递到下游

1、说明

利用Flink的Broadcast State将维表数据流广播到下游做join操作。该种方式实现比较方便,完全满足需求,美中不足的是需要充分利用系统的内存,也就是将数据存储在内容中。

更多内容见文章:

53、Flink 的Broadcast State 模式介绍及示例

2、示例:将事实流与维表进行关联-通过Flink 的Broadcast

1)、广播实现

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: 
 */
package org.tablesql.join;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.tablesql.join.TestJoinDimFromBroadcastDataStreamDemo.Order;
import org.tablesql.join.TestJoinDimFromBroadcastDataStreamDemo.User;

// final BroadcastProcessFunction function)
public class JoinBroadcastProcessFunctionImpl extends BroadcastProcessFunction<Order, User, Tuple2> {
    // 用于存储规则名称与规则本身的 map 存储结构 
    MapStateDescriptor broadcastDesc;

    JoinBroadcastProcessFunctionImpl(MapStateDescriptor broadcastDesc) {
        this.broadcastDesc = broadcastDesc;
    }

    // 负责处理广播流的元素
    @Override
    public void processBroadcastElement(User value,
            BroadcastProcessFunction<Order, User, Tuple2>.Context ctx,
            Collector<Tuple2> out) throws Exception {
        System.out.println("收到广播数据:" + value);
        // 得到广播流的存储状态
        ctx.getBroadcastState(broadcastDesc).put(value.getId(), value);
    }

    // 处理非广播流,关联维度
    @Override
    public void processElement(Order value,
            BroadcastProcessFunction<Order, User, Tuple2>.ReadOnlyContext ctx,
            Collector<Tuple2> out) throws Exception {
        // 得到广播流的存储状态
        ReadOnlyBroadcastState state = ctx.getBroadcastState(broadcastDesc);

        out.collect(new Tuple2(value, state.get(value.getUId()).getName()));
    }
}

2)、实现事实流与维度流join

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: 
 */
package org.tablesql.join;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

public class TestJoinDimFromBroadcastDataStreamDemo {
    // 维表
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class User {
        private Integer id;
        private String name;
        private Double balance;
        private Integer age;
        private String email;
    }

    // 事实表
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class Order {
        private Integer id;
        private Integer uId;
        private Double total;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // order 实时流
        DataStream orderDs = env.socketTextStream("192.168.10.42", 9999)
                .map(o -> {
                    String[] lines = o.split(",");
                    return new Order(Integer.valueOf(lines[0]), Integer.valueOf(lines[1]), Double.valueOf(lines[2]));
                });

        // user 实时流
        DataStream userDs = env.socketTextStream("192.168.10.42", 8888)
                .map(o -> {
                    String[] lines = o.split(",");
                    return new User(Integer.valueOf(lines[0]), lines[1], Double.valueOf(lines[2]), Integer.valueOf(lines[3]), lines[4]);
                }).setParallelism(1);
                
        // 一个 map descriptor,它描述了用于存储规则名称与规则本身的 map 存储结构
        // MapStateDescriptor ruleStateDescriptor = new MapStateDescriptor(
        //         "RulesBroadcastState",
        //         BasicTypeInfo.STRING_TYPE_INFO,
        //         TypeInformation.of(new TypeHint() {
        //         }));

        // 广播流,广播规则并且创建 broadcast state
        // BroadcastStream ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);

        // 将user流(维表)定义为广播流
        final MapStateDescriptor broadcastDesc = new MapStateDescriptor("Alan_RulesBroadcastState",
                Integer.class,
                User.class);

        BroadcastStream broadcastStream = userDs.broadcast(broadcastDesc);

        // 需要由非广播流来进行调用
        DataStream result = orderDs.connect(broadcastStream)
                .process(new JoinBroadcastProcessFunctionImpl(broadcastDesc));

        result.print();
        // user 流数据(维度表),由于未做容错处理,需要先广播维度数据,否则会出现空指针异常
        // 1001,alan,18,20,alan.chan.chn@163.com
        // 1002,alanchan,19,25,alan.chan.chn@163.com
        // 1003,alanchanchn,20,30,alan.chan.chn@163.com
        // 1004,alan_chan,27,20,alan.chan.chn@163.com
        // 1005,alan_chan_chn,36,10,alan.chan.chn@163.com

        // order 流数据
        // 16,1002,211
        // 17,1004,234
        // 18,1005,175
        
        // 控制台输出
        // 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1001, name=alan, balance=18.0, age=20, email=alan.chan.chn@163.com)
        // ......
        // 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1001, name=alan, balance=18.0, age=20, email=alan.chan.chn@163.com)
        // 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1002, name=alanchan, balance=19.0, age=25, email=alan.chan.chn@163.com)
        // ......
        // 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1002, name=alanchan, balance=19.0, age=25, email=alan.chan.chn@163.com)
        // 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1003, name=alanchanchn, balance=20.0, age=30, email=alan.chan.chn@163.com)
        // ......
        // 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1003, name=alanchanchn, balance=20.0, age=30, email=alan.chan.chn@163.com)
        // 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1004, name=alan_chan, balance=27.0, age=20, email=alan.chan.chn@163.com)
        // ......
        // 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1004, name=alan_chan, balance=27.0, age=20, email=alan.chan.chn@163.com)
        // 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1005, name=alan_chan_chn, balance=36.0, age=10, email=alan.chan.chn@163.com)
        // ......
        // 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1005, name=alan_chan_chn, balance=36.0, age=10, email=alan.chan.chn@163.com)
        // 7> (TestJoinDimFromBroadcastDataStreamDemo.Order(id=16, uId=1002, total=211.0),alanchan)
        // 8> (TestJoinDimFromBroadcastDataStreamDemo.Order(id=17, uId=1004, total=234.0),alan_chan)
        // 9> (TestJoinDimFromBroadcastDataStreamDemo.Order(id=18, uId=1005, total=175.0),alan_chan_chn)

        env.execute();

    }

 }

以上,本文是通过Flink的广播方式进行维度表数据进行广播,事实流进行connection。

本专题分为以下几篇文章:

【flink番外篇】15、Flink维表实战之6种实现方式-初始化的静态数据

【flink番外篇】15、Flink维表实战之6种实现方式-维表来源于第三方数据源

【flink番外篇】15、Flink维表实战之6种实现方式-通过广播将维表数据传递到下游

【flink番外篇】15、Flink维表实战之6种实现方式-通过Temporal table实现维表数据join

【flink番外篇】15、Flink维表实战之6种实现方式-完整版(1)

【flink番外篇】15、Flink维表实战之6种实现方式-完整版(2)

本文来自网络,不代表协通编程立场,如若转载,请注明出处:https://www.net2asp.com/76d0a07aa6.html