【flink番外篇】15、Flink维表实战之6种实现方式-完整版(1)
Flink 系列文章
一、Flink 专栏
Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。
-
1、Flink 部署系列
本部分介绍Flink的部署、配置相关基础内容。
-
2、Flink基础系列
本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。
-
3、Flik Table API和SQL基础系列
本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。
-
4、Flik Table API和SQL提高与应用系列
本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。
-
5、Flink 监控系列
本部分和实际的运维、监控工作相关。
二、Flink 示例专栏
Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。
两专栏的所有文章入口点击:Flink 系列文章汇总索引
文章目录
- Flink 系列文章
- 一、maven依赖及数据结构
-
- 1、maven依赖
- 2、数据结构
- 3、数据源
- 4、验证结果
- 二、维表来源于初始化的静态数据
-
- 1、说明
- 2、示例:将事实流与维表进行关联
- 三、维表来源于第三方数据源
-
- 1、说明
- 2、示例:将事实流与维表进行关联-通过缓存降低性能开销
- 3、示例:将事实流与维表进行关联-通过Flink 的异步 I/O提高系统效率
-
- 1)、redis 异步I/O实现
- 2)、实现事实流与维度流join
- 四、通过广播将维表数据传递到下游
-
- 1、说明
- 2、示例:将事实流与维表进行关联-通过Flink 的Broadcast
-
- 1)、广播实现
- 2)、实现事实流与维度流join
本文介绍了flink 维表的前三种实现方式,即通过初始化静态数据、通过异步IO访问外部数据和通过广播维表数据。
如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。
本文除了maven依赖外,本文还依赖redis环境。
本专题分为以下几篇文章:
【flink番外篇】15、Flink维表实战之6种实现方式-初始化的静态数据
【flink番外篇】15、Flink维表实战之6种实现方式-维表来源于第三方数据源
【flink番外篇】15、Flink维表实战之6种实现方式-通过广播将维表数据传递到下游
【flink番外篇】15、Flink维表实战之6种实现方式-通过Temporal table实现维表数据join
【flink番外篇】15、Flink维表实战之6种实现方式-完整版(1)
【flink番外篇】15、Flink维表实战之6种实现方式-完整版(2)
一、maven依赖及数据结构
1、maven依赖
本文的所有示例均依赖本部分的pom.xml内容,可能针对下文中的某些示例存在过多的引入,根据自己的情况进行删减。
UTF-8
UTF-8
1.8
1.8
1.8
2.12
1.17.0
org.apache.flink
flink-clients
${flink.version}
provided
org.apache.flink
flink-java
${flink.version}
provided
org.apache.flink
flink-table-common
${flink.version}
provided
org.apache.flink
flink-streaming-java
${flink.version}
org.apache.flink
flink-table-api-java-bridge
${flink.version}
provided
org.apache.flink
flink-csv
${flink.version}
provided
org.apache.flink
flink-json
${flink.version}
provided
org.apache.flink
flink-table-planner_2.12
${flink.version}
provided
org.apache.flink
flink-table-api-java-uber
${flink.version}
provided
org.apache.flink
flink-table-runtime
${flink.version}
provided
org.apache.flink
flink-connector-jdbc
3.1.0-1.17
mysql
mysql-connector-java
5.1.38
com.google.guava
guava
32.0.1-jre
org.apache.flink
flink-connector-kafka
${flink.version}
org.apache.flink
flink-sql-connector-kafka
${flink.version}
provided
org.apache.commons
commons-compress
1.24.0
org.projectlombok
lombok
1.18.2
org.apache.bahir
flink-connector-redis_2.12
1.1.0
flink-streaming-java_2.12
org.apache.flink
flink-runtime_2.12
org.apache.flink
flink-core
org.apache.flink
flink-java
org.apache.flink
org.apache.flink
flink-table-api-java
org.apache.flink
flink-table-api-java-bridge_2.12
org.apache.flink
flink-table-common
org.apache.flink
flink-table-planner_2.12
com.alibaba
fastjson
2.0.43
2、数据结构
本示例仅仅为实现需求:将订单中uId与用户id进行关联,然后输出Tuple2。
- 事实流 order
// 事实表
@Data
@NoArgsConstructor
@AllArgsConstructor
static class Order {
private Integer id;
private Integer uId;
private Double total;
}
- 维度流 user
// 维表
@Data
@NoArgsConstructor
@AllArgsConstructor
static class User {
private Integer id;
private String name;
private Double balance;
private Integer age;
private String email;
}
3、数据源
事实流数据有几种,具体见示例部分,比如socket、redis、kafka等
维度表流有几种,具体见示例部分,比如静态数据、mysql、socket、kafka等。
如此,实现本文中的示例就需要准备好相应的环境,即mysql、redis、kafka、netcat等。
4、验证结果
本文提供的所有示例均为验证通过的示例,测试的数据均在每个示例中,分为事实流、维度流和运行结果进行注释,在具体的示例中关于验证不再赘述。
二、维表来源于初始化的静态数据
1、说明
通过定义一个类实现RichMapFunction,在open()中读取维表数据加载到内存中,在事实流map()方法中与维表数据进行关联。
由于数据存储于内存中,所以只适合小数据量并且维表数据更新频率不高的情况下使用。虽然可以在open中定义一个定时器定时更新维表,但是还是存在维表更新不及时的情况或资源开销较大的情况。一般如果数据量较小且不大会变(或变化影响也不大)的情况下,理想选择之一。
2、示例:将事实流与维表进行关联
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/*
* @Author: alanchan
* @LastEditors: alanchan
* @Description: 采用在RichMapfunction类的open方法中将维表数据加载到内存
*/
public class TestJoinDimFromStaticDataDemo {
// 维表
@Data
@NoArgsConstructor
@AllArgsConstructor
static class User {
private Integer id;
private String name;
private Double balance;
private Integer age;
private String email;
}
// 事实表
@Data
@NoArgsConstructor
@AllArgsConstructor
static class Order {
private Integer id;
private Integer uId;
private Double total;
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// order 事实流
DataStream orderDs = env.socketTextStream("192.168.10.42", 9999)
.map(o -> {
String[] lines = o.split(",");
return new Order(Integer.valueOf(lines[0]), Integer.valueOf(lines[1]), Double.valueOf(lines[2]));
});
DataStream<Tuple2> result = orderDs.map(new RichMapFunction<Order, Tuple2>() {
Map userDim = null;
// 维表-静态数据,本处使用的是匿名内部类实现的
@Override
public void open(Configuration parameters) throws Exception {
userDim = new HashMap();
userDim.put(1001, new User(1001, "alan", 20d, 18, "alan.chan.chn@163.com"));
userDim.put(1002, new User(1002, "alanchan", 22d, 20, "alan.chan.chn@163.com"));
userDim.put(1003, new User(1003, "alanchanchn", 23d, 22, "alan.chan.chn@163.com"));
userDim.put(1004, new User(1004, "alan_chan", 21d, 19, "alan.chan.chn@163.com"));
userDim.put(1005, new User(1005, "alan_chan_chn", 23d, 21, "alan.chan.chn@163.com"));
}
@Override
public Tuple2 map(Order value) throws Exception {
return new Tuple2(value, userDim.get(value.getUId()).getName());
}
});
result.print();
// nc 输入
// 1,1004,345
// 2,1001,678
// 控制台输出
// 2> (TestJoinDimFromStaticData.Order(id=1, uId=1004, total=345.0),alan_chan)
// 3> (TestJoinDimFromStaticData.Order(id=2, uId=1001, total=678.0),alan)
env.execute("TestJoinDimFromStaticData");
}
}
三、维表来源于第三方数据源
1、说明
这种方式是将维表数据存储在Redis、HBase、MySQL等外部存储中,事实流在关联维表数据的时候实时去外部存储中查询。
由于维度数据量不受内存限制,可以存储很大的数据量。同时维表数据来源于第三方数据源,读取速度受制于外部存储的读取速度。一般常见的做法该种方式较多。
2、示例:将事实流与维表进行关联-通过缓存降低性能开销
如果频繁的访问第三方数据源进行join,会带来很大的开销,为降低该种情况的开销,一般使用cache来减轻访问压力,但该种方式存在数据同步的不一致或延迟情况。如果使用缓存,则会存在将数据存在内存中,也会增加系统开销。该种情况的实际应用以具体的业务场景而定。本示例使用的是guava Cache,缓存的实现有很多种方式,具体以自己的实际情况进行选择。
本示例的数据源仅仅以静态的数据进行展示,实际上可能数据来源于Hbase、mysql等。
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/*
* @Author: alanchan
* @LastEditors: alanchan
* @Description:
*/
public class TestJoinDimFromCacheDataDemo {
// 维表
@Data
@NoArgsConstructor
@AllArgsConstructor
static class User {
private Integer id;
private String name;
private Double balance;
private Integer age;
private String email;
}
// 事实表
@Data
@NoArgsConstructor
@AllArgsConstructor
static class Order {
private Integer id;
private Integer uId;
private Double total;
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// order 实时流
DataStream orderDs = env.socketTextStream("192.168.10.42", 9999)
.map(o -> {
String[] lines = o.split(",");
return new Order(Integer.valueOf(lines[0]), Integer.valueOf(lines[1]), Double.valueOf(lines[2]));
});
// user 维表
DataStream<Tuple2> result = orderDs.map(new RichMapFunction<Order, Tuple2>() {
// 缓存接口这里是LoadingCache,LoadingCache在缓存项不存在时可以自动加载缓存
LoadingCache userDim;
@Override
public void open(Configuration parameters) throws Exception {
// 使用google LoadingCache来进行缓存
// CacheBuilder的构造函数是私有的,只能通过其静态方法newBuilder()来获得CacheBuilder的实例
userDim = CacheBuilder.newBuilder()
// 设置并发级别为8,并发级别是指可以同时写缓存的线程数
.concurrencyLevel(8)
// 最多缓存个数,超过了就根据最近最少使用算法来移除缓存
.maximumSize(1000)
// 设置写缓存后10分钟过期
.expireAfterWrite(10, TimeUnit.MINUTES)
// 设置缓存容器的初始容量为10
.initialCapacity(10)
// 设置要统计缓存的命中率
.recordStats()
// 指定移除通知
.removalListener(new RemovalListener() {
@Override
public void onRemoval(RemovalNotification removalNotification) {
System.out.println(removalNotification.getKey() + "被移除了,值为:" + removalNotification.getValue());
}
})
.build(
// 指定加载缓存的逻辑
new CacheLoader() {
@Override
public User load(Integer uId) throws Exception {
return dataSource(uId);
}
});
System.out.println("userDim:" + userDim.get(1002));
}
private User dataSource(Integer uId) {
// 可以是任何数据源,本处仅仅示例
Map users = new HashMap();
users.put(1001, new User(1001, "alan", 20d, 18, "alan.chan.chn@163.com"));
users.put(1002, new User(1002, "alanchan", 22d, 20, "alan.chan.chn@163.com"));
users.put(1003, new User(1003, "alanchanchn", 23d, 22, "alan.chan.chn@163.com"));
users.put(1004, new User(1004, "alan_chan", 21d, 19, "alan.chan.chn@163.com"));
users.put(1005, new User(1005, "alan_chan_chn", 23d, 21, "alan.chan.chn@163.com"));
User user = null;
if (users.containsKey(uId)) {
user = users.get(uId);
}
return user;
}
@Override
public Tuple2 map(Order value) throws Exception {
return new Tuple2(value, userDim.get(value.getUId()).getName());
}
});
result.print();
// 输入数据
// 7,1003,111
// 8,1005,234
// 9,1002,875
// 控制台输出数据
// 5> (TestJoinDimFromCacheDataDemo.Order(id=7, uId=1003, total=111.0),alanchanchn)
// 6> (TestJoinDimFromCacheDataDemo.Order(id=8, uId=1005, total=234.0),alan_chan_chn)
// 7> (TestJoinDimFromCacheDataDemo.Order(id=9, uId=1002, total=875.0),alanchan)
env.execute("TestJoinDimFromCacheDataDemo");
}
}
3、示例:将事实流与维表进行关联-通过Flink 的异步 I/O提高系统效率
Flink与外部存储系统进行读写操作的时候可以使用同步方式,也就是发送一个请求后等待外部系统响应,然后再发送第二个读写请求,这样的方式吞吐量比较低,可以用提高并行度的方式来提高吞吐量,但是并行度多了也就导致了进程数量多了,占用了大量的资源。
Flink中可以使用异步IO来读写外部系统,这要求外部系统客户端支持异步IO,比如redis、MongoDB等。
更多内容见文章:
55、Flink之用于外部数据访问的异步 I/O介绍及示例
1)、redis 异步I/O实现
package org.tablesql.join;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.tablesql.join.TestJoinDimFromAsyncDataStreamDemo.Order;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
/*
* @Author: alanchan
* @LastEditors: alanchan
* @Description:
*/
public class JoinAyncFunctionByRedis extends RichAsyncFunction<Order, Tuple2> {
private JedisPoolConfig config = null;
private static String ADDR = "192.168.10.41";
private static int PORT = 6379;
private static int TIMEOUT = 10000;
private JedisPool jedisPool = null;
private Jedis jedis = null;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
config = new JedisPoolConfig();
jedisPool = new JedisPool(config, ADDR, PORT, TIMEOUT);
jedis = jedisPool.getResource();
}
@Override
public void asyncInvoke(Order input, ResultFuture<Tuple2> resultFuture) throws Exception {
// order 实时流中的单行数据
System.out.println("输入参数input----:" + input);
// 发起一个异步请求,返回结果
CompletableFuture.supplyAsync(new Supplier() {
@Override
public String get() {
// 数据格式:1002,alanchan,19,25,alan.chan.chn@163.com
String userLine = jedis.hget("AsyncReadUserById_Redis", input.getUId() + "");
String[] userTemp = userLine.split(",");
// 返回 用户名
return userTemp[1];
}
}).thenAccept((String dbResult) -> {
// 设置请求完成时的回调,将结果返回
List list = new ArrayList<Tuple2>();
list.add(new Tuple2(input, dbResult));
resultFuture.complete(list);
});
}
// 连接超时的时候调用的方法
public void timeout(Order input, ResultFuture<Tuple2> resultFuture)
throws Exception {
List list = new ArrayList<Tuple2>();
// 数据源超时,不能获取到维表信息,置为"
list.add(new Tuple2(input, ""));
resultFuture.complete(list);
}
@Override
public void close() throws Exception {
super.close();
if (jedis.isConnected()) {
jedis.close();
}
}
}
2)、实现事实流与维度流join
package org.tablesql.join;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/*
* @Author: alanchan
* @LastEditors: alanchan
* @Description:
*/
public class TestJoinDimFromAsyncDataStreamDemo {
// 维表
@Data
@NoArgsConstructor
@AllArgsConstructor
static class User {
private Integer id;
private String name;
private Double balance;
private Integer age;
private String email;
}
// 事实表
@Data
@NoArgsConstructor
@AllArgsConstructor
static class Order {
private Integer id;
private Integer uId;
private Double total;
}
public static void main(String[] args) throws Exception {
testJoinAyncFunctionByRedis();
}
static void testJoinAyncFunctionByRedis() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// order 实时流
DataStream orderDs = env.socketTextStream("192.168.10.42", 9999)
.map(o -> {
String[] lines = o.split(",");
return new Order(Integer.valueOf(lines[0]), Integer.valueOf(lines[1]), Double.valueOf(lines[2]));
});
// 保证顺序:异步返回的结果保证顺序,超时时间1秒,最大容量2,超出容量触发反压
DataStream<Tuple2> result = AsyncDataStream.orderedWait(orderDs, new JoinAyncFunctionByRedis(),
1000L, TimeUnit.MILLISECONDS, 2);
result.print("result:");
// 允许乱序:异步返回的结果允许乱序,超时时间1秒,最大容量2,超出容量触发反压
DataStream<Tuple2> unorderedResult = AsyncDataStream
.unorderedWait(orderDs, new JoinAyncFunctionByRedis(), 1000L, TimeUnit.MILLISECONDS, 2)
.setParallelism(1);
unorderedResult.print("unorderedResult");
// redis的操作命令及数据
// 127.0.0.1:6379> hset AsyncReadUserById_Redis 1001 '1001,alan,18,20,alan.chan.chn@163.com'
// (integer) 1
// 127.0.0.1:6379> hset AsyncReadUserById_Redis 1002 '1002,alanchan,19,25,alan.chan.chn@163.com'
// (integer) 1
// 127.0.0.1:6379> hset AsyncReadUserById_Redis 1003 '1003,alanchanchn,20,30,alan.chan.chn@163.com'
// (integer) 1
// 127.0.0.1:6379> hset AsyncReadUserById_Redis 1004 '1004,alan_chan,27,20,alan.chan.chn@163.com'
// (integer) 1
// 127.0.0.1:6379> hset AsyncReadUserById_Redis 1005 '1005,alan_chan_chn,36,10,alan.chan.chn@163.com'
// (integer) 1
// 127.0.0.1:6379> hgetall AsyncReadUserById_Redis
// 1) "1001"
// 2) "1001,alan,18,20,alan.chan.chn@163.com"
// 3) "1002"
// 4) "1002,alanchan,19,25,alan.chan.chn@163.com"
// 5) "1003"
// 6) "1003,alanchanchn,20,30,alan.chan.chn@163.com"
// 7) "1004"
// 8) "1004,alan_chan,27,20,alan.chan.chn@163.com"
// 9) "1005"
// 10) "1005,alan_chan_chn,36,10,alan.chan.chn@163.com"
// 输入数据
// 13,1002,811
// 14,1004,834
// 15,1005,975
// 控制台输出数据
// 输入参数input----:TestJoinDimFromAsyncDataStreamDemo.Order(id=13, uId=1002, total=811.0)
// result::12> (TestJoinDimFromAsyncDataStreamDemo.Order(id=13, uId=1002, total=811.0),1002,alanchan,19,25,alan.chan.chn@163.com)
// 输入参数input----:TestJoinDimFromAsyncDataStreamDemo.Order(id=13, uId=1002, total=811.0)
// unorderedResult:9> (TestJoinDimFromAsyncDataStreamDemo.Order(id=13, uId=1002, total=811.0),1002,alanchan,19,25,alan.chan.chn@163.com)
// result::5> (TestJoinDimFromAsyncDataStreamDemo.Order(id=14, uId=1004, total=834.0),alan_chan)
// 输入参数input----:TestJoinDimFromAsyncDataStreamDemo.Order(id=14, uId=1004, total=834.0)
// unorderedResult:2> (TestJoinDimFromAsyncDataStreamDemo.Order(id=14, uId=1004, total=834.0),alan_chan)
// 输入参数input----:TestJoinDimFromAsyncDataStreamDemo.Order(id=15, uId=1005, total=975.0)
// result::6> (TestJoinDimFromAsyncDataStreamDemo.Order(id=15, uId=1005, total=975.0),alan_chan_chn)
// 输入参数input----:TestJoinDimFromAsyncDataStreamDemo.Order(id=15, uId=1005, total=975.0)
// unorderedResult:3> (TestJoinDimFromAsyncDataStreamDemo.Order(id=15, uId=1005, total=975.0),alan_chan_chn)
env.execute("TestJoinDimFromAsyncDataStreamDemo");
}
}
四、通过广播将维表数据传递到下游
1、说明
利用Flink的Broadcast State将维表数据流广播到下游做join操作。该种方式实现比较方便,完全满足需求,美中不足的是需要充分利用系统的内存,也就是将数据存储在内容中。
更多内容见文章:
53、Flink 的Broadcast State 模式介绍及示例
2、示例:将事实流与维表进行关联-通过Flink 的Broadcast
1)、广播实现
/*
* @Author: alanchan
* @LastEditors: alanchan
* @Description:
*/
package org.tablesql.join;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.tablesql.join.TestJoinDimFromBroadcastDataStreamDemo.Order;
import org.tablesql.join.TestJoinDimFromBroadcastDataStreamDemo.User;
// final BroadcastProcessFunction function)
public class JoinBroadcastProcessFunctionImpl extends BroadcastProcessFunction<Order, User, Tuple2> {
// 用于存储规则名称与规则本身的 map 存储结构
MapStateDescriptor broadcastDesc;
JoinBroadcastProcessFunctionImpl(MapStateDescriptor broadcastDesc) {
this.broadcastDesc = broadcastDesc;
}
// 负责处理广播流的元素
@Override
public void processBroadcastElement(User value,
BroadcastProcessFunction<Order, User, Tuple2>.Context ctx,
Collector<Tuple2> out) throws Exception {
System.out.println("收到广播数据:" + value);
// 得到广播流的存储状态
ctx.getBroadcastState(broadcastDesc).put(value.getId(), value);
}
// 处理非广播流,关联维度
@Override
public void processElement(Order value,
BroadcastProcessFunction<Order, User, Tuple2>.ReadOnlyContext ctx,
Collector<Tuple2> out) throws Exception {
// 得到广播流的存储状态
ReadOnlyBroadcastState state = ctx.getBroadcastState(broadcastDesc);
out.collect(new Tuple2(value, state.get(value.getUId()).getName()));
}
}
2)、实现事实流与维度流join
/*
* @Author: alanchan
* @LastEditors: alanchan
* @Description:
*/
package org.tablesql.join;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
public class TestJoinDimFromBroadcastDataStreamDemo {
// 维表
@Data
@NoArgsConstructor
@AllArgsConstructor
static class User {
private Integer id;
private String name;
private Double balance;
private Integer age;
private String email;
}
// 事实表
@Data
@NoArgsConstructor
@AllArgsConstructor
static class Order {
private Integer id;
private Integer uId;
private Double total;
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// order 实时流
DataStream orderDs = env.socketTextStream("192.168.10.42", 9999)
.map(o -> {
String[] lines = o.split(",");
return new Order(Integer.valueOf(lines[0]), Integer.valueOf(lines[1]), Double.valueOf(lines[2]));
});
// user 实时流
DataStream userDs = env.socketTextStream("192.168.10.42", 8888)
.map(o -> {
String[] lines = o.split(",");
return new User(Integer.valueOf(lines[0]), lines[1], Double.valueOf(lines[2]), Integer.valueOf(lines[3]), lines[4]);
}).setParallelism(1);
// 一个 map descriptor,它描述了用于存储规则名称与规则本身的 map 存储结构
// MapStateDescriptor ruleStateDescriptor = new MapStateDescriptor(
// "RulesBroadcastState",
// BasicTypeInfo.STRING_TYPE_INFO,
// TypeInformation.of(new TypeHint() {
// }));
// 广播流,广播规则并且创建 broadcast state
// BroadcastStream ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);
// 将user流(维表)定义为广播流
final MapStateDescriptor broadcastDesc = new MapStateDescriptor("Alan_RulesBroadcastState",
Integer.class,
User.class);
BroadcastStream broadcastStream = userDs.broadcast(broadcastDesc);
// 需要由非广播流来进行调用
DataStream result = orderDs.connect(broadcastStream)
.process(new JoinBroadcastProcessFunctionImpl(broadcastDesc));
result.print();
// user 流数据(维度表),由于未做容错处理,需要先广播维度数据,否则会出现空指针异常
// 1001,alan,18,20,alan.chan.chn@163.com
// 1002,alanchan,19,25,alan.chan.chn@163.com
// 1003,alanchanchn,20,30,alan.chan.chn@163.com
// 1004,alan_chan,27,20,alan.chan.chn@163.com
// 1005,alan_chan_chn,36,10,alan.chan.chn@163.com
// order 流数据
// 16,1002,211
// 17,1004,234
// 18,1005,175
// 控制台输出
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1001, name=alan, balance=18.0, age=20, email=alan.chan.chn@163.com)
// ......
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1001, name=alan, balance=18.0, age=20, email=alan.chan.chn@163.com)
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1002, name=alanchan, balance=19.0, age=25, email=alan.chan.chn@163.com)
// ......
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1002, name=alanchan, balance=19.0, age=25, email=alan.chan.chn@163.com)
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1003, name=alanchanchn, balance=20.0, age=30, email=alan.chan.chn@163.com)
// ......
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1003, name=alanchanchn, balance=20.0, age=30, email=alan.chan.chn@163.com)
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1004, name=alan_chan, balance=27.0, age=20, email=alan.chan.chn@163.com)
// ......
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1004, name=alan_chan, balance=27.0, age=20, email=alan.chan.chn@163.com)
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1005, name=alan_chan_chn, balance=36.0, age=10, email=alan.chan.chn@163.com)
// ......
// 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1005, name=alan_chan_chn, balance=36.0, age=10, email=alan.chan.chn@163.com)
// 7> (TestJoinDimFromBroadcastDataStreamDemo.Order(id=16, uId=1002, total=211.0),alanchan)
// 8> (TestJoinDimFromBroadcastDataStreamDemo.Order(id=17, uId=1004, total=234.0),alan_chan)
// 9> (TestJoinDimFromBroadcastDataStreamDemo.Order(id=18, uId=1005, total=175.0),alan_chan_chn)
env.execute();
}
}
以上,本文介绍了flink 维表的前三种实现方式,即通过初始化静态数据、通过异步IO访问外部数据和通过广播维表数据。
本专题分为以下几篇文章:
【flink番外篇】15、Flink维表实战之6种实现方式-初始化的静态数据
【flink番外篇】15、Flink维表实战之6种实现方式-维表来源于第三方数据源
【flink番外篇】15、Flink维表实战之6种实现方式-通过广播将维表数据传递到下游
【flink番外篇】15、Flink维表实战之6种实现方式-通过Temporal table实现维表数据join
【flink番外篇】15、Flink维表实战之6种实现方式-完整版(1)
【flink番外篇】15、Flink维表实战之6种实现方式-完整版(2)
本文来自网络,不代表协通编程立场,如若转载,请注明出处:https://www.net2asp.com/6de38cd061.html
