【flink番外篇】13、Broadcast State 模式示例(完整版)
Flink 系列文章
一、Flink 专栏
Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。
-
1、Flink 部署系列
本部分介绍Flink的部署、配置相关基础内容。
-
2、Flink基础系列
本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。
-
3、Flik Table API和SQL基础系列
本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。
-
4、Flik Table API和SQL提高与应用系列
本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。
-
5、Flink 监控系列
本部分和实际的运维、监控工作相关。
二、Flink 示例专栏
Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。
两专栏的所有文章入口点击:Flink 系列文章汇总索引
文章目录
- Flink 系列文章
- 一、示例:按照分组规则进行图形匹配-KeyedBroadcastProcessFunction
-
- 1、maven依赖
- 2、实现
- 3、验证
-
- 1)、规则输入
- 2)、item输入
- 3)、控制台输出
- 二、示例:BroadcastProcessFunction将维表数据广播给其他流
-
- 1、maven依赖
- 2、实现
-
- 1)、BroadcastProcessFunction实现
- 2)、连接实现
- 3、验证
-
- 1)、输入user数据
- 2)、输入事实流订单数据
- 3)、观察程序控制台输出
本文详细的介绍了broadcast state的具体使用,并以两个例子分别介绍了BroadcastProcessFunction和KeyedBroadcastProcessFunction的具体实现。
如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。
本文除了maven依赖外,没有其他依赖。
一、示例:按照分组规则进行图形匹配-KeyedBroadcastProcessFunction
本示例是简单的应用broadcast state实现简单模式匹配,即实现:
1、按照相同颜色进行分组,在相同颜色组中按照规则进行匹配。
2、相同颜色的规则1:长方形后是三角形
3、相同颜色的规则2:正方形后是长方形
如匹配上述规则1或规则2则输出匹配成功。
1、maven依赖
UTF-8
UTF-8
1.8
1.8
1.8
2.12
1.17.0
org.apache.flink
flink-clients
${flink.version}
provided
org.apache.flink
flink-java
${flink.version}
provided
org.apache.flink
flink-streaming-java
${flink.version}
<!-- provided -->
org.apache.flink
flink-csv
${flink.version}
provided
org.apache.flink
flink-json
${flink.version}
provided
org.apache.commons
commons-compress
1.24.0
org.projectlombok
lombok
1.18.2
<!-- provided -->
2、实现
package org.tablesql.join;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/*
* @Author: alanchan
*
* @LastEditors: alanchan
*
* @Description: 按照相同颜色进行分组,在相同颜色组中按照规则进行匹配。相同颜色的规则1:长方形后是三角形;规则2:正方形后是长方形
*/
public class TestJoinDimKeyedBroadcastProcessFunctionDemo {
@Data
@NoArgsConstructor
@AllArgsConstructor
static class Shape {
private String name;
private String desc;
}
@Data
@NoArgsConstructor
@AllArgsConstructor
static class Colour {
private String name;
private Long blue;
private Long red;
private Long green;
}
@Data
@NoArgsConstructor
@AllArgsConstructor
static class Item {
private Shape shape;
private Colour color;
}
@Data
@NoArgsConstructor
@AllArgsConstructor
static class Rule {
private String name;
private Shape first;
private Shape second;
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// item 实时流
DataStream itemStream = env.socketTextStream("192.168.10.42", 9999)
.map(o -> {
// 解析item流
// 数据结构:Item[shape(name,desc);color(name,blue,red,green)]
String[] lines = o.split(";");
String[] shapeString = lines[0].split(",");
String[] colorString = lines[1].split(",");
Shape shape = new Shape(shapeString[0],shapeString[1]);
Colour color = new Colour(colorString[0],Long.valueOf(colorString[1]),Long.valueOf(colorString[2]),Long.valueOf(colorString[3]));
return new Item(shape,color);
});
// rule 实时流
DataStream ruleStream = env.socketTextStream("192.168.10.42", 8888)
.map(o -> {
// 解析rule流
// 数据结构:Rule[name;shape(name,desc);shape(name,desc)]
String[] lines = o.split(";");
String name = lines[0];
String[] firstShapeString = lines[1].split(",");
String[] secondShapeString = lines[2].split(",");
Shape firstShape = new Shape(firstShapeString[0],firstShapeString[1]);
Shape secondShape = new Shape(secondShapeString[0],secondShapeString[1]);
return new Rule(name,firstShape,secondShape);
}).setParallelism(1);
// 将图形使用颜色进行划分
KeyedStream colorPartitionedStream = itemStream
.keyBy(new KeySelector() {
@Override
public Colour getKey(Item value) throws Exception {
return value.getColor();// 实现分组
}
});
colorPartitionedStream.print("colorPartitionedStream:---->");
// 一个 map descriptor,它描述了用于存储规则名称与规则本身的 map 存储结构
MapStateDescriptor ruleStateDescriptor = new MapStateDescriptor(
"RulesBroadcastState",
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(new TypeHint() {
}));
// 将rule定义为广播流,广播规则并且创建 broadcast state
BroadcastStream ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);
// 连接,输出流,connect() 方法需要由非广播流来进行调用,BroadcastStream 作为参数传入。
DataStream output = colorPartitionedStream
.connect(ruleBroadcastStream)
.process(
// KeyedBroadcastProcessFunction 中的类型参数表示:
// 1. key stream 中的 key 类型
// 2. 非广播流中的元素类型
// 3. 广播流中的元素类型
// 4. 结果的类型,在这里是 string
new KeyedBroadcastProcessFunction() {
// 存储部分匹配的结果,即匹配了一个元素,正在等待第二个元素
// 用一个数组来存储,因为同时可能有很多第一个元素正在等待
private final MapStateDescriptor<String, List> itemMapStateDesc = new MapStateDescriptor(
"items",
BasicTypeInfo.STRING_TYPE_INFO,
new ListTypeInfo(Item.class));
// 与之前的 ruleStateDescriptor 相同,用于存储规则名称与规则本身的 map 存储结构
private final MapStateDescriptor ruleStateDescriptor = new MapStateDescriptor(
"RulesBroadcastState",
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(new TypeHint() {
}));
// 负责处理广播流的元素
@Override
public void processBroadcastElement(Rule ruleValue,
KeyedBroadcastProcessFunction.Context ctx,
Collector out) throws Exception {
// 得到广播流的存储状态:ctx.getBroadcastState(MapStateDescriptor stateDescriptor)
// 查询元素的时间戳:ctx.timestamp()
// 查询目前的Watermark:ctx.currentWatermark()
// 目前的处理时间(processing time):ctx.currentProcessingTime()
// 产生旁路输出:ctx.output(OutputTag outputTag, X value)
// 在 getBroadcastState() 方法中传入的 stateDescriptor 应该与调用 .broadcast(ruleStateDescriptor) 的参数相同
ctx.getBroadcastState(ruleStateDescriptor).put(ruleValue.getName(), ruleValue);
}
// 负责处理另一个流的元素
@Override
public void processElement(Item itemValue,
KeyedBroadcastProcessFunction.ReadOnlyContext ctx,
Collector out) throws Exception {
final MapState<String, List> itemMapState = getRuntimeContext().getMapState(itemMapStateDesc);
final Shape shape = itemValue.getShape();
System.out.println("shape:"+shape);
// 在 getBroadcastState() 方法中传入的 stateDescriptor 应该与调用 .broadcast(ruleStateDescriptor) 的参数相同
ReadOnlyBroadcastState readOnlyBroadcastState = ctx.getBroadcastState(ruleStateDescriptor);
Iterable<Entry> iterableRule = readOnlyBroadcastState.immutableEntries();
for (Entry entry : iterableRule) {
final String ruleName = entry.getKey();
final Rule rule = entry.getValue();
// 初始化
List itemStoredList = itemMapState.get(ruleName);
if (itemStoredList == null) {
itemStoredList = new ArrayList();
}
// 比较 shape
if (shape.getName().equals(rule.second.getName()) && !itemStoredList.isEmpty()) {
for (Item item : itemStoredList) {
// 符合规则,收集匹配结果
out.collect("匹配成功: " + item + " - " + itemValue);
}
itemStoredList.clear();
}
// 规则连续性设置
if (shape.getName().equals(rule.first.getName())) {
itemStoredList.add(itemValue);
}
//
if (itemStoredList.isEmpty()) {
itemMapState.remove(ruleName);
} else {
itemMapState.put(ruleName, itemStoredList);
}
}
}
});
output.print("output:------->");
env.execute();
}
}
3、验证
在netcat中启动两个端口,分别是8888和9999,8888输入规则,9999输入item,然后关键控制台输出。
1)、规则输入
red;rectangle,is a rectangle;tripe,is a tripe green;square,is a square;rectangle,is a rectangle
2)、item输入
# 匹配成功 rectangle,is a rectangle;red,100,100,100 tripe,is a tripe;red,100,100,100 # 匹配成功 square,is square;green,150,150,150 rectangle,is a rectangle;green,150,150,150 # 匹配不成功 tripe,is tripe;blue,200,200,200 # 匹配成功 rectangle,is a rectangle;blue,100,100,100 tripe,is a tripe;blue,100,100,100 # 匹配不成功 tripe,is a tripe;blue,100,100,100 rectangle,is a rectangle;blue,100,100,100
3)、控制台输出
colorPartitionedStream:---->:9> TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=rectangle, desc=is a rectangle), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=red, blue=100, red=100, green=100))shape:TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=rectangle, desc=is a rectangle) colorPartitionedStream:---->:9> TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=tripe, desc=is a tripe), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=red, blue=100, red=100, green=100))shape:TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=tripe, desc=is a tripe) output:------->:9> 匹配成功: TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=rectangle, desc=is a rectangle), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=red, blue=100, red=100, green=100)) - TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=tripe, desc=is a tripe), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=red, blue=100, red=100, green=100)) colorPartitionedStream:---->:9> TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=rectangle, desc=is a rectangle), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=green, blue=150, red=150, green=150)) output:------->:9> 匹配成功: TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=square, desc=is square), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=green, blue=150, red=150, green=150)) - TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=rectangle, desc=is a rectangle), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=green, blue=150, red=150, green=150)) colorPartitionedStream:---->:3> TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=tripe, desc=is tripe), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=blue, blue=200, red=200, green=200)) colorPartitionedStream:---->:1> TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=rectangle, desc=is a rectangle), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=blue, blue=100, red=100, green=100)) colorPartitionedStream:---->:1> TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=tripe, desc=is a tripe), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=blue, blue=100, red=100, green=100)) output:------->:1> 匹配成功: TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=rectangle, desc=is a rectangle), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=blue, blue=100, red=100, green=100)) - TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=tripe, desc=is a tripe), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=blue, blue=100, red=100, green=100)) colorPartitionedStream:---->:1> TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=tripe, desc=is a tripe), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=blue, blue=100, red=100, green=100)) colorPartitionedStream:---->:1> TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=rectangle, desc=is a rectangle), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=blue, blue=100, red=100, green=100))
二、示例:BroadcastProcessFunction将维表数据广播给其他流
本示例是将用户信息作为维表通过流进行广播,在事实表订单流中进行连接匹配输出。
1、maven依赖
参考上述示例中的依赖。
2、实现
实现方式可以使用匿名内部类或内部类实现,本示例为了清楚其中的逻辑关系,特意以一个具体class来实现。
1)、BroadcastProcessFunction实现
/*
* @Author: alanchan
* @LastEditors: alanchan
* @Description:
*/
package org.tablesql.join;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.tablesql.join.TestJoinDimFromBroadcastDataStreamDemo.Order;
import org.tablesql.join.TestJoinDimFromBroadcastDataStreamDemo.User;
// final BroadcastProcessFunction function)
public class JoinBroadcastProcessFunctionImpl extends BroadcastProcessFunction<Order, User, Tuple2> {
// 用于存储规则名称与规则本身的 map 存储结构
MapStateDescriptor broadcastDesc;
JoinBroadcastProcessFunctionImpl(MapStateDescriptor broadcastDesc) {
this.broadcastDesc = broadcastDesc;
}
// 负责处理广播流的元素
@Override
public void processBroadcastElement(User value,
BroadcastProcessFunction<Order, User, Tuple2>.Context ctx,
Collector<Tuple2> out) throws Exception {
System.out.println("收到广播数据:" + value);
// 得到广播流的存储状态
ctx.getBroadcastState(broadcastDesc).put(value.getId(), value);
}
// 处理非广播流,关联维度
@Override
public void processElement(Order value,
BroadcastProcessFunction<Order, User, Tuple2>.ReadOnlyContext ctx,
Collector<Tuple2> out) throws Exception {
// 得到广播流的存储状态
ReadOnlyBroadcastState state = ctx.getBroadcastState(broadcastDesc);
out.collect(new Tuple2(value, state.get(value.getUId()).getName()));
}
}
2)、连接实现
/*
* @Author: alanchan
* @LastEditors: alanchan
* @Description:
*/
package org.tablesql.join;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
public class TestJoinDimFromBroadcastDataStreamDemo {
// 维表
@Data
@NoArgsConstructor
@AllArgsConstructor
static class User {
private Integer id;
private String name;
private Double balance;
private Integer age;
private String email;
}
// 事实表
@Data
@NoArgsConstructor
@AllArgsConstructor
static class Order {
private Integer id;
private Integer uId;
private Double total;
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// order 实时流
DataStream orderDs = env.socketTextStream("192.168.10.42", 9999)
.map(o -> {
String[] lines = o.split(",");
return new Order(Integer.valueOf(lines[0]), Integer.valueOf(lines[1]), Double.valueOf(lines[2]));
});
// user 实时流
DataStream userDs = env.socketTextStream("192.168.10.42", 8888)
.map(o -> {
String[] lines = o.split(",");
return new User(Integer.valueOf(lines[0]), lines[1], Double.valueOf(lines[2]), Integer.valueOf(lines[3]), lines[4]);
}).setParallelism(1);
// 一个 map descriptor,它描述了用于存储规则名称与规则本身的 map 存储结构
// MapStateDescriptor ruleStateDescriptor = new MapStateDescriptor(
// "RulesBroadcastState",
// BasicTypeInfo.STRING_TYPE_INFO,
// TypeInformation.of(new TypeHint() {
// }));
// 广播流,广播规则并且创建 broadcast state
// BroadcastStream ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);
// 将user流(维表)定义为广播流
final MapStateDescriptor broadcastDesc = new MapStateDescriptor("Alan_RulesBroadcastState",
Integer.class,
User.class);
BroadcastStream broadcastStream = userDs.broadcast(broadcastDesc);
// 需要由非广播流来进行调用
DataStream result = orderDs.connect(broadcastStream)
.process(new JoinBroadcastProcessFunctionImpl(broadcastDesc));
result.print();
env.execute();
}
// final BroadcastProcessFunction function)
// static class JoinBroadcastProcessFunctionImpl extends BroadcastProcessFunction<Order, User, Tuple2> {
// // 用于存储规则名称与规则本身的 map 存储结构
// MapStateDescriptor broadcastDesc;
// JoinBroadcastProcessFunctionImpl(MapStateDescriptor broadcastDesc) {
// this.broadcastDesc = broadcastDesc;
// }
// // 负责处理广播流的元素
// @Override
// public void processBroadcastElement(User value,
// BroadcastProcessFunction<Order, User, Tuple2>.Context ctx,
// Collector<Tuple2> out) throws Exception {
// System.out.println("收到广播数据:" + value);
// // 得到广播流的存储状态
// ctx.getBroadcastState(broadcastDesc).put(value.getId(), value);
// }
// // 处理非广播流,关联维度
// @Override
// public void processElement(Order value,
// BroadcastProcessFunction<Order, User, Tuple2>.ReadOnlyContext ctx,
// Collector<Tuple2> out) throws Exception {
// // 得到广播流的存储状态
// ReadOnlyBroadcastState state = ctx.getBroadcastState(broadcastDesc);
// out.collect(new Tuple2(value, state.get(value.getUId()).getName()));
// }
// }
}
3、验证
本示例使用的是两个socket数据源,通过netcat进行模拟。
1)、输入user数据
“192.168.10.42”, 8888
// user 流数据(维度表),由于未做容错处理,需要先广播维度数据,否则会出现空指针异常 // 1001,alan,18,20,alan.chan.chn@163.com // 1002,alanchan,19,25,alan.chan.chn@163.com // 1003,alanchanchn,20,30,alan.chan.chn@163.com // 1004,alan_chan,27,20,alan.chan.chn@163.com // 1005,alan_chan_chn,36,10,alan.chan.chn@163.com
2)、输入事实流订单数据
“192.168.10.42”, 9999
// order 流数据 // 16,1002,211 // 17,1004,234 // 18,1005,175
3)、观察程序控制台输出
// 控制台输出 // 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1001, name=alan, balance=18.0, age=20, email=alan.chan.chn@163.com) // ...... // 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1001, name=alan, balance=18.0, age=20, email=alan.chan.chn@163.com) // 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1002, name=alanchan, balance=19.0, age=25, email=alan.chan.chn@163.com) // ...... // 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1002, name=alanchan, balance=19.0, age=25, email=alan.chan.chn@163.com) // 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1003, name=alanchanchn, balance=20.0, age=30, email=alan.chan.chn@163.com) // ...... // 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1003, name=alanchanchn, balance=20.0, age=30, email=alan.chan.chn@163.com) // 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1004, name=alan_chan, balance=27.0, age=20, email=alan.chan.chn@163.com) // ...... // 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1004, name=alan_chan, balance=27.0, age=20, email=alan.chan.chn@163.com) // 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1005, name=alan_chan_chn, balance=36.0, age=10, email=alan.chan.chn@163.com) // ...... // 收到广播数据:TestJoinDimFromBroadcastDataStreamDemo.User(id=1005, name=alan_chan_chn, balance=36.0, age=10, email=alan.chan.chn@163.com) // 7> (TestJoinDimFromBroadcastDataStreamDemo.Order(id=16, uId=1002, total=211.0),alanchan) // 8> (TestJoinDimFromBroadcastDataStreamDemo.Order(id=17, uId=1004, total=234.0),alan_chan) // 9> (TestJoinDimFromBroadcastDataStreamDemo.Order(id=18, uId=1005, total=175.0),alan_chan_chn)
以上,本文详细的介绍了broadcast state的具体使用,并以两个例子分别介绍了BroadcastProcessFunction和KeyedBroadcastProcessFunction的具体实现。
本文来自网络,不代表协通编程立场,如若转载,请注明出处:https://www.net2asp.com/db3ba11fd7.html
