Flink|《Flink 官方文档 – DataStream API – 概览》学习笔记
学习文档:Flink 官方文档 – DataStream API – 概览
学习笔记如下:
DataStream
Flink 的 DataStream API:
- 数据里的起始是各种 source,例如消息队列、socket 流、文件等;
- 对数据流进行转换,例如过滤、更新状态、定义窗口、聚合等;
- 结果通过 sink 返回,例如可以将数据写入文件或标准输出。
DataStream:Flink 程序中的数据集合;可以将其理解为包含重复项的不可变数据集合。这些数据可以是有界的,也可以是无界的,但用于处理它们的 API 是相同的。
相较于常规的 Java 集合,DataStream 有以下差异:
- 不可变,一旦创建就不能添加或删除元素
- 不能简单地查看内部元素,只能使用 DataStream API 来处理它们
DataStream 源码:flink-streaming-java: org.apache.flink.streaming.api.datastream.DataStream
Flink 程序
Flink 程序看起来像一个转换 DataStream 的常规程序,但是 Flink 程序都是延迟执行的。当程序的 main() 方法被执行时,数据加载和转换不会直接发生,只会将每个算子都创建并添加到 dataflow 形成的有向图;只有被执行环境的 execute() 方法显式地被处罚后,这些算子才会真正执行。
每个程序由相同的基本部分组成:
Step 1|获取一个执行环境(execution environment)
通常,调用 StreamExecutionEnvironment 的如下静态方法获取执行环境:
- getExecutionEnvironment():通常调用这个方法即可
- createLocalEnvironment()
- createRemoteEnvironment(String host, int port, String… jarFiles)
样例:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Step 2|加载 / 创建初始数据
执行环境提供了一些方法,用于从任何第三方提供的 source 或本地文件中读取数据。这将生成一个 DataStream,可以在上面应用转换(trasformation)来创建新的派生 DataStream。
样例:以直接逐行读取本地文件中的数据
DataStream text = env.readTextFile("file:///path/to/file");
Step 3|指定数据相关的转换
可以通过调用 DataStream 上具有转换功能的方法来应用转换。
样例:使用 map 进行转换(将每个字符串转换为一个整数并创建一个新的 DataStream)
DataStream input = ...; DataStream parsed = input.map(new MapFunction() { @Override public Integer map(String value) { return Integer.parseInt(value); } });
Step 4|指定计算结果的存储位置
可以将包含最终结果的 DataStream,通过创建 sink 写出到外部系统。
样例:将数据结果写出到文件
writeAsText(String path);
Step 5|触发程序执行
需要调用 StreamExecutionEnvironment 的 execute() 或 executeAsync() 来触发程序执行。根据 StreamExecutionEnvironment 的类型,执行会在本地机器上触发,或提交到某个集群上执行。
- execute() 方法:等待作业完成,然后返回一个 JobExecutionResult,其中包括执行时间和累加器结果
- executeAsync() 方法:触发作业异步执行,它会返回一个 JobClient,可以通过它与刚刚提交的作业进行通信。
Data Source
Source 是 Flink 程序读取输入的地方。可以通过 StreamExecutionEnvironment.addSource(sourceFunction) 添加 Source,也可以使用 Flink 自带的 source function。
DataSink
Sink 是 Flink 程序写出结果的地方。可以通过 DataStream.addSink(sinkFunction) 添加 Sink,也可以使用 Flink 自带的 sink function。
但是需要注意,DataStream 的 write*() 方法主要用于调试目的,它们不涉及 checkpoint,因此这些函数通常具有至少一次语义。
Iterations
Iterations 是对数据进行迭代处理的机制。在使用 IterativeStream 时,需要指定哪一部分反馈给迭代,哪一部分使用旁路输出或使用过滤器转发到下游。通常来说,我们首先定义一个 IterativeStream 流,例如:
IterativeStream iteration = input.iterate();
然后,指定循环内执行的转换逻辑,例如:
DataStream iterationBody = iteration.map(/* this is executed many times */);
最后,使用 IterativeStream 的 closeWith(feedbackStream) 方法,定义迭代的结束条件。提供的 closeWith 的 DataStream 将反馈给迭代头。一种常见的模式,是使用过滤器将反馈的流部分和向前传播(重新迭代)的流部分分开。例如:
iteration.closeWith(iterationBody.filter(/* one part of the stream */)); // 继续迭代 DataStream output = iterationBody.filter(/* some other part of the stream */); // 传递给下游的流
样例:下面的程序从一系列整数中连续减去 1,直到它们达到零
DataStream someIntegers = env.generateSequence(0, 1000); IterativeStream iteration = someIntegers.iterate(); DataStream minusOne = iteration.map(new MapFunction() { @Override public Long map(Long value) throws Exception { return value - 1 ; } }); DataStream stillGreaterThanZero = minusOne.filter(new FilterFunction() { @Override public boolean filter(Long value) throws Exception { return (value > 0); } }); iteration.closeWith(stillGreaterThanZero); DataStream lessThanZero = minusOne.filter(new FilterFunction() { @Override public boolean filter(Long value) throws Exception { return (value <= 0); } });
控制延迟
如果将元素在网络上逐个传输,会导致大量频繁的网络请求。因此,Flink 不会将元素不会在网络上一一传输,而是会进行缓冲,每次通过通过传输直接传输整个完整的缓冲区。缓冲区的大小可以在 Flink 配置文件中传输。
触发缓冲区的网络传输,有两种情况:
- 缓冲区已满
- 缓冲区已达到缓冲区的最长等待时间,如果超过缓冲区的最长等待时间,那么即使缓冲区没有满也会被自动发送。超时时间的默认值为 100 毫秒
缓冲区的设置样例如下:
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); env.setBufferTimeout(timeoutMillis); // 给流设置缓冲区超时时间 env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis); // 给 Operator 设置缓冲区超时时间
缓冲区的超时时间设置得越长,吞吐量越大,极限可以设置 setBufferTimeout(-1) 来关闭超时,这样缓冲区只有在它们已满时才会被刷新;缓冲区设置得越小,延迟越少,但应避免设置超时为 0 的缓冲区,因为它会导致严重的性能下降。
调试
本地执行环境
在本地调试时,可以使用 LocalStreamEnvironment 的本地执行环境,它将在创建它的同一个 JVM 进程中启动 Flink 程序。如果从 IDE 启动 LocalEnvironment,则可以在代码中设置断点已实现调试。
样例:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); DataStream lines = env.addSource(/* some source */); // 构建你的程序 env.execute();
从 Java 集合构造 Data Sources
在本地调试时,可以使用 fromElements 或 fromCollection 方法从 Java 集合中读取数据构造 DataStream。
样例:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); // 从元素列表创建一个 DataStream DataStream myInts = env.fromElements(1, 2, 3, 4, 5); // 从任何 Java 集合创建一个 DataStream List<Tuple2> data = ... DataStream<Tuple2> myTuples = env.fromCollection(data); // 从迭代器创建一个 DataStream Iterator longIt = ... DataStream myLongs = env.fromCollection(longIt, Long.class);
将 Data Sink 写出到 Java 迭代器
类似地,也可以将 DataStream 写出到本地的迭代器。
样例:
DataStream<Tuple2> myResult = ... Iterator<Tuple2> myOutput = myResult.collectAsync();
本文来自网络,不代表协通编程立场,如若转载,请注明出处:https://www.net2asp.com/d7e00ca599.html
