【flink番外篇】17、DataStream 和 Table集成-仅插入流Insert-Only示例
Flink 系列文章
一、Flink 专栏
Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。
-
1、Flink 部署系列
本部分介绍Flink的部署、配置相关基础内容。
-
2、Flink基础系列
本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。
-
3、Flik Table API和SQL基础系列
本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。
-
4、Flik Table API和SQL提高与应用系列
本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。
-
5、Flink 监控系列
本部分和实际的运维、监控工作相关。
二、Flink 示例专栏
Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。
两专栏的所有文章入口点击:Flink 系列文章汇总索引
文章目录
- Flink 系列文章
- 一、DataStream 和 Table集成-仅插入流Insert-Only示例
-
- 1、maven依赖
- 2、Insert-Only集成说明
- 3、fromDataStream 示例
- 4、createTemporaryView 示例
- 5、toDataStream示例
本文介绍了Flink 的insert-only流的datastream和table的相互转换三个示例。
如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。
本文除了maven依赖外,没有其他依赖。
更多详细内容参考文章:
21、Flink 的table API与DataStream API 集成(完整版)
一、DataStream 和 Table集成-仅插入流Insert-Only示例
1、maven依赖
UTF-8
UTF-8
1.8
1.8
1.8
2.12
1.17.0
org.apache.flink
flink-clients
${flink.version}
provided
org.apache.flink
flink-java
${flink.version}
provided
org.apache.flink
flink-table-common
${flink.version}
provided
org.apache.flink
flink-streaming-java
${flink.version}
provided
org.apache.flink
flink-table-api-java-bridge
${flink.version}
provided
org.apache.flink
flink-sql-gateway
${flink.version}
provided
org.apache.flink
flink-csv
${flink.version}
provided
org.apache.flink
flink-json
${flink.version}
provided
org.apache.flink
flink-table-planner_2.12
${flink.version}
provided
org.apache.flink
flink-table-api-java-uber
${flink.version}
provided
org.apache.flink
flink-table-runtime
${flink.version}
provided
org.apache.flink
flink-connector-jdbc
3.1.0-1.17
mysql
mysql-connector-java
5.1.38
org.apache.flink
flink-connector-hive_2.12
1.17.0
org.apache.hive
hive-exec
3.1.2
org.apache.flink
flink-connector-kafka
${flink.version}
org.apache.flink
flink-sql-connector-kafka
${flink.version}
provided
org.apache.commons
commons-compress
1.24.0
org.projectlombok
lombok
1.18.2
<!-- provided -->
2、Insert-Only集成说明
StreamTableEnvironment提供了以下方法进行datastream的转换API:
- fromDataStream(DataStream):将仅插入更改和任意类型的流解释为表。默认情况下,不会传播事件时间和水印。
- fromDataStream(DataStream, Schema):将仅插入更改和任意类型的流解释为表。可选模式允许丰富列数据类型,并添加时间属性、水印策略、其他计算列或主键。
- createTemporaryView(String, DataStream):注册一个可以在sql中访问的流名称(虚表、视图)。它是createTemporaryView(String,fromDataStream(DataStream))的快捷方式。
- createTemporaryView(String, DataStream, Schema):注册一个可以在sql中访问的流名称(虚表、视图)。 它是createTemporaryView(String,fromDataStream(DataStream,Schema))的快捷方式。
- toDataStream(Table):将表转换为仅插入更改的流。默认的流记录类型为org.apache.flink.types.Row。将单个rowtime属性列写回DataStream API的记录中。水印也会传播。
- toDataStream(Table, AbstractDataType):将表转换为仅插入更改的流。该方法接受数据类型来表示所需的流记录类型。planner 可以插入隐式转换和重新排序列,以将列映射到(可能是嵌套的)数据类型的字段。
- toDataStream(Table, Class):toDataStream(Table,DataTypes.of(Class))的快捷方式,用于反射地快速创建所需的数据类型。
从Table API的角度来看,和DataStream API的转换类似于读取或写入在SQL中使用CREATE Table DDL定义的虚拟表连接器。
虚拟CREATE TABLE name(schema)WITH(options)语句中的模式部分可以自动从DataStream的类型信息中派生、丰富或完全使用org.apache.flink.table.api.Schema手动定义。
The virtual DataStream table connector exposes the following metadata for every row:
虚拟DataStream table 连接器为每一行暴露以下元数据:
| Key | Data Type | Description | R/W |
|---|---|---|---|
| rowtime | TIMESTAMP_LTZ(3) NOT NULL | Stream record’s timestamp. | R/W |
虚拟DataStream table source实现SupportsSourceWatermark,因此允许调用source_WATERMARK()内置函数作为水印策略,以采用来自DataStream API的水印。
3、fromDataStream 示例
下面的代码展示了如何将fromDataStream用于不同的场景。其输出结果均在每个步骤的输出注释部分。
import java.time.Instant;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author alanchan
*
*/
public class TestFromDataStreamDemo {
@NoArgsConstructor
@AllArgsConstructor
@Data
public static class User {
public String name;
public Integer score;
public Instant event_time;
}
public static void test1() throws Exception {
// 1、创建运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
// 2、创建数据源
DataStream dataStream =
env.fromElements(
new User("alan", 4, Instant.ofEpochMilli(1000)),
new User("alanchan", 6, Instant.ofEpochMilli(1001)),
new User("alanchanchn", 10, Instant.ofEpochMilli(1002)));
// 示例1、显示table的数据类型
// 说明了不需要基于时间的操作时的简单用例。
Table table = tenv.fromDataStream(dataStream);
// table.printSchema();
// (
// `name` STRING,
// `score` INT,
// `event_time` TIMESTAMP_LTZ(9)
// )
// 示例2、增加一列,并显示table的数据类型
// 这些基于时间的操作应在处理时间内工作的最常见用例。
Table table2 = tenv.fromDataStream(
dataStream,
Schema.newBuilder()
.columnByExpression("proc_time", "PROCTIME()")
.build());
// table2.printSchema();
// (
// `name` STRING,
// `score` INT,
// `event_time` TIMESTAMP_LTZ(9),
// `proc_time` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME* AS PROCTIME()
// )
// 示例3、增加rowtime列,并增加watermark
Table table3 =
tenv.fromDataStream(
dataStream,
Schema.newBuilder()
.columnByExpression("rowtime", "CAST(event_time AS TIMESTAMP_LTZ(3))")
.watermark("rowtime", "rowtime - INTERVAL '10' SECOND")
.build());
// table3.printSchema();
// (
// `name` STRING,
// `score` INT,
// `event_time` TIMESTAMP_LTZ(9),
// `rowtime` TIMESTAMP_LTZ(3) *ROWTIME* AS CAST(event_time AS TIMESTAMP_LTZ(3)),
// WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS rowtime - INTERVAL '10' SECOND
// )
// 示例4、增加rowtime列,并增加watermark(SOURCE_WATERMARK()水印策略假设已经实现了,本部分仅仅是展示用法)
// 基于时间的操作(如窗口或间隔联接)应成为管道的一部分时最常见的用例。
Table table4 =
tenv.fromDataStream(
dataStream,
Schema.newBuilder()
.columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
.watermark("rowtime", "SOURCE_WATERMARK()")
.build());
// table4.printSchema();
// (
// `name` STRING,
// `score` INT,
// `event_time` TIMESTAMP_LTZ(9),
// `rowtime` TIMESTAMP_LTZ(3) *ROWTIME* METADATA,
// WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS SOURCE_WATERMARK()
// )
// 示例5、修改event_time类型长度,增加event_time的水印策略(SOURCE_WATERMARK()水印策略假设已经实现了,本部分仅仅是展示用法)
// 完全依赖于用户的声明。这对于用适当的数据类型替换DataStream API中的泛型类型(在Table API中是RAW)很有用。
Table table5 =
tenv.fromDataStream(
dataStream,
Schema.newBuilder()
.column("event_time", "TIMESTAMP_LTZ(3)")
.column("name", "STRING")
.column("score", "INT")
.watermark("event_time", "SOURCE_WATERMARK()")
.build());
table5.printSchema();
// (
// `event_time` TIMESTAMP_LTZ(3) *ROWTIME*,
// `name` STRING,
// `score` INT
// )
env.execute();
}
public static void main(String[] args) throws Exception {
test1() ;
}
}
由于DataType比TypeInformation更丰富,我们可以轻松地启用不可变POJO和其他复杂的数据结构。
下面的Java示例显示了可能的情况。
package org.tablesql.convert;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author alanchan
*
*/
public class TestFromDataStreamDemo {
// user2的属性都加上了final修饰符
public static class User2 {
public final String name;
public final Integer score;
public User2(String name, Integer score) {
this.name = name;
this.score = score;
}
}
public static void test2() throws Exception {
// 1、创建运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
//the DataStream API does not support immutable POJOs yet, the class will result in a generic type that is a RAW type in Table API by defaul
//DataStream API尚不支持不可变POJO,该类的结果默认情况下将是一个Table API中是RAW类型的泛型。
// 2、创建数据源
DataStream dataStream = env.fromElements(
new User2("Alice", 4),
new User2("Bob", 6),
new User2("Alice", 10));
// 示例1:输出表结构
Table table = tenv.fromDataStream(dataStream);
// table.printSchema();
// (
// `f0` RAW('org.tablesql.convert.TestFromDataStreamDemo$User2', '...')
// )
// 示例2:声明式输出表结构
// 在自定义模式中使用table API的类型系统为列声明更有用的数据类型,并在下面的“as”投影中重命名列
Table table2 = tenv
.fromDataStream(
dataStream,
Schema.newBuilder()
.column("f0", DataTypes.of(User2.class))
.build())
.as("user");
// table2.printSchema();
// (
// `user` *org.tablesql.convert.TestFromDataStreamDemo$User2*
// )
//示例3:数据类型可以如上所述反射地提取或显式定义
//
Table table3 = tenv
.fromDataStream(
dataStream,
Schema.newBuilder()
.column(
"f0",
DataTypes.STRUCTURED(
User2.class,
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("score", DataTypes.INT())))
.build())
.as("user");
table3.printSchema();
// (
// `user` *org.tablesql.convert.TestFromDataStreamDemo$User2*
// )
env.execute();
}
public static void main(String[] args) throws Exception {
test2();
}
}
4、createTemporaryView 示例
DataStream可以直接注册为视图。
从DataStream 创建的视图只能注册为临时视图。由于它们的内联/匿名性质,无法在永久目录(permanent catalog)中注册它们。
下面的代码展示了如何对不同的场景使用createTemporaryView。每个示例中的运行结果均在输出部分以注释展示。
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
* @author alanchan
*
*/
public class TestCreateTemporaryViewDemo {
public static void test1() throws Exception {
// 1、创建运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
// 2、创建数据源
DataStream<Tuple2> dataStream = env.fromElements(Tuple2.of(12L, "alan"), Tuple2.of(0L, "alanchan"));
// 示例1:创建视图、输出表结构
tenv.createTemporaryView("MyView", dataStream);
tenv.from("MyView").printSchema();
// (
// `f0` BIGINT NOT NULL,
// `f1` STRING
// )
// 示例2:创建视图、输出表结构,使用Schema显示定义列,类似于fromDataStream的定义
//在这个例子中,输出的NOT NULL没有定义
tenv.createTemporaryView(
"MyView",
dataStream,
Schema.newBuilder()
.column("f0", "BIGINT")
.column("f1", "STRING")
.build());
tenv.from("MyView").printSchema();
// (
// `f0` BIGINT,
// `f1` STRING
// )
// 示例3:创建视图,并输出表结构
// 在创建视图前修改(或定义)列名称,as一般是指重命名,原名称是f0、f1
tenv.createTemporaryView(
"MyView",
tenv.fromDataStream(dataStream).as("id", "name"));
tenv.from("MyView").printSchema();
// (
// `id` BIGINT NOT NULL,
// `name` STRING
// )
env.execute();
}
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
test1();
}
}
5、toDataStream示例
下面的代码展示了如何在不同的场景中使用toDataStream。每个示例中的运行结果均在输出部分以注释展示。
import java.time.Instant;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author alanchan
*
*/
public class TestToDataStreamDemo {
@NoArgsConstructor
@AllArgsConstructor
@Data
public static class User {
public String name;
public Integer score;
public Instant event_time;
}
static final String SQL = "CREATE TABLE GeneratedTable "
+ "("
+ " name STRING,"
+ " score INT,"
+ " event_time TIMESTAMP_LTZ(3),"
+ " WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND"
+ ")"
+ "WITH ('connector'='datagen')";
public static void test1() throws Exception {
// 1、创建运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
// 2、建表
tenv.executeSql(SQL);
Table table = tenv.from("GeneratedTable");
// 示例1:table 转 datastream
// 使用默认的Row实例转换
// 由于`event_time`是单个行时间属性,因此它被插入到DataStream元数据中,并传播水印
// DataStream dataStream = tenv.toDataStream(table);
// dataStream.print();
// 以下是示例性输出,实际上是连续的数据
// 10> +I[9b979ecef142c06746ff2be0f79f4afe7ef7089f60f267184e052c12ef5f2c2a144c73d3653bee51b351ed5b20ecaf0673ec, -1424631858, 2023-11-14T02:58:56.071Z]
// 1> +I[444998c8992accc54e2c10cac4f4a976cda516d84817a8fd728c9d013da3d87e91d28537a564f09fb07308142ca83c2548e9, -1240938499, 2023-11-14T02:58:56.071Z]
// 12> +I[fa42df01fe1f789535df26f81c2e58c02feaeba60338e4cfb7c8fdb06ed96c69b46e9a966d93d0cf811b24dd9434a8ef2253, 2039663083, 2023-11-14T02:58:56.070Z]
// 1> +I[25aa121a0d656a5355c32148a0c68cc39ac05443bd7de6a0c499a2daae85868422dd024c6803598133dc26a607cd1e60e747, 1912789884, 2023-11-14T02:58:56.071Z]
// 示例2:table 转 datastream
// 从类“User”中提取数据类型,planner重新排序字段,并在可能的情况下插入隐式转换,以将内部数据结构转换为所需的结构化类型
// 由于`event_time`是单个行时间属性,因此它被插入到DataStream元数据中,并传播水印
DataStream dataStream2 = tenv.toDataStream(table, User.class);
// dataStream2.print();
// 以下是示例性输出,实际上是连续的数据
// 4> TestToDataStreamDemo.User(name=e80b612e48443a292c11e28159c73475b9ef9531b91d5712420753d5d6041a06f5de634348210b151f4fc220b4ec91ed5c72, score=2146560121, event_time=2023-11-14T03:01:17.657Z)
// 14> TestToDataStreamDemo.User(name=290b48dea62368bdb35567f31e5e2690ad8b5dd50c1c0f7184f15d2e85b24ea84155f1edef875f4c96e3a2133a320fcb6e41, score=2062379192, event_time=2023-11-14T03:01:17.657Z)
// 12> TestToDataStreamDemo.User(name=a0b31a03ad951b53876445001bbc74178c9818ece7d5e53166635d40cb8ef07980eabd7463ca6be38b34b1f0fbd4e2251df0, score=16953697, event_time=2023-11-14T03:01:17.657Z)
// 示例3:table 转 datastream
// 数据类型可以如上所述反射地提取或显式定义
DataStream dataStream3 =
tenv.toDataStream(
table,
DataTypes.STRUCTURED(
User.class,
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("score", DataTypes.INT()),
DataTypes.FIELD("event_time", DataTypes.TIMESTAMP_LTZ(3))));
dataStream3.print();
// 以下是示例性输出,实际上是连续的数据
// 9> TestToDataStreamDemo.User(name=49550693e3cb3a41cd785504c699684bf2015f0ebff5918dbdea454291c265d316773f2d9507ce73dd18f91a2f5fdbd6e500, score=744771891, event_time=2023-11-14T03:06:13.010Z)
// 2> TestToDataStreamDemo.User(name=60589709fe41decb647fcf4e2f91d45c82961bbe64469f3ea8a9a12b0cac071481ec9cfd65a9c218e3799986dd72ab80e457, score=-1056249244, event_time=2023-11-14T03:06:13.010Z)
// 15> TestToDataStreamDemo.User(name=d0a179f075c8b521bf5ecb08a32f6c715b5f2c616f815f8173c0a1c2961c53774faf396ddf55a44db49abe8085772f35d75c, score=862651361, event_time=2023-11-14T03:06:13.010Z)
env.execute();
}
public static void main(String[] args) throws Exception {
test1() ;
}
}
toDataStream仅支持非更新表。通常,基于时间的操作(如windows, interval joins或MATCH_RECOGNIZE子句)非常适合于在 insert-only pipelines的简单操作(如投影(projections )和过滤)。
具有生成更新的操作的管道可以使用toChangelogStream。
以上,本文介绍了Flink 的insert-only流的datastream和table的相互转换三个示例。
本文来自网络,不代表协通编程立场,如若转载,请注明出处:https://www.net2asp.com/c419e2c4ec.html
