flink1.17.0 集成kafka,并且计算
•
大数据
前言
flink是实时计算的重要集成组件,这里演示如何集成,并且使用一个小例子。例子是kafka输入消息,用逗号隔开,统计每个相同单词出现的次数,这么一个功能。
一、kafka环境准备
1.1 启动kafka
这里我使用的kafka版本是3.2.0,部署的方法可以参考,
kafka部署
cd kafka_2.13-3.2.0 bin/zookeeper-server-start.sh config/zookeeper.properties bin/kafka-server-start.sh config/server.properties
启动后查看java进程是否存在,存在后执行下一步。
1.2 新建topic
新建一个专门用于flink消费topic
bin/kafka-topics.sh --create --topic flinkTest --bootstrap-server 192.168.184.129:9092
1.3 测试生产消费是否正常
生产端:
bin/kafka-console-producer.sh --topic flinkTest --bootstrap-server 192.168.184.129:9092
客户端:
bin/kafka-console-consumer.sh --topic flinkTest --from-beginning --bootstrap-server 192.168.184.129:9092
1.4 测试生产消费
在生产端输入aaa

查看客户端是否能消费到

可以看到客户端已经消费成功了,kafka环境准备好了。
二、flink集成kafka
2.1 pom文件修改
pom文件修改之前,先看看官网的指导依赖是什么样的,
这里我们使用的是datastream api去做,
flink1.17.0官方文档

这里说明了相关的依赖需要引入的依赖包的版本,还有使用kafka消费的时候需要引入的连接包版本

完整的pom引入依赖如下:
4.0.0
com.wh.flink
flink
1.0-SNAPSHOT
flink
http://www.example.com
UTF-8
1.8
1.8
1.17.1
org.apache.flink
flink-connector-kafka
${flink.version}
org.apache.flink
flink-java
${flink.version}
<!--provided-->
org.apache.flink
flink-streaming-java
${flink.version}
<!--provided-->
<!-- -->
<!-- org.apache.flink-->
<!-- flink-connector-kafka-0.11_2.11-->
<!-- ${flink.version}-->
<!-- -->
junit
junit
4.11
test
org.apache.flink
flink-scala_2.12
${flink.version}
<!--provided-->
org.apache.flink
flink-streaming-scala_2.12
${flink.version}
<!--provided-->
<!---->
<!--org.scala-lang-->
<!--scala-library-->
<!--2.11.12-->
<!---->
<!---->
<!--org.slf4j-->
<!--slf4j-log4j12-->
<!--1.7.25-->
<!--test-->
<!---->
<!---->
<!--log4j-->
<!--log4j-->
<!--1.2.17-->
<!---->
<!---->
<!--org.slf4j-->
<!--slf4j-api-->
<!--1.7.25-->
<!---->
<!---->
<!--org.slf4j-->
<!--slf4j-nop-->
<!--1.7.25-->
<!--test-->
<!---->
<!---->
<!--org.slf4j-->
<!--slf4j-simple-->
<!--1.7.5-->
<!---->
<!-- -->
<!-- org.scala-tools-->
<!-- maven-scala-plugin-->
<!-- 2.15.2-->
<!-- -->
<!-- -->
<!-- -->
<!-- compile-->
<!-- testCompile-->
<!-- -->
<!-- -->
<!-- -->
<!-- -->
maven-assembly-plugin
2.4
<!--false-->
com.hadoop.demo.service.flinkDemo.FlinkDemo
jar-with-dependencies
make-assembly
package
assembly
项目结构如图

2.2 代码编写
package com.hadoop.demo.service.flinkDemo;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.FlatMapIterator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
import java.util.Iterator;
public class FlinkDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//新建kafka连接
KafkaSource kfkSource = KafkaSource.builder()
.setBootstrapServers("192.168.184.129:9092")
.setGroupId("flink")
.setTopics("flinkTest")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
//添加到flink环境
DataStreamSource lines = env.fromSource(kfkSource, WatermarkStrategy.noWatermarks(), "kafka source");
//根据逗号分组
SingleOutputStreamOperator<Tuple2> map = lines.flatMap(new FlatMapIterator() {
@Override
public Iterator flatMap(String s) throws Exception {
return Arrays.asList(s.split(",")).iterator();
}
}).map(new MapFunction<String, Tuple2>() {
@Override
public Tuple2 map(String s) throws Exception {
return new Tuple2(s, 1);
}
});
//统计每个单词的数量
SingleOutputStreamOperator<Tuple2> sum = map.keyBy(0).sum(1);
sum.print();
//System.out.println(sum.get);
env.execute();
}
}
2.3 maven打包
点击打包按钮,这里注意要选择带依赖的jar包,否则会出现以下错误。
NoClassDefFoundError: org/apache/flink/connector/kafka/source/KafkaSource
三、测试
3.1启动 hadoop集群,启动flink集群
这里如果不知道怎么搭建这两个集群可以看我其他文章
hadoop集成flink
./hadoop.sh start ./bin/yarn-session.sh --detached
3.2 上传jar包到flink集群

上传后填写主类类名,点击提交

3.3 测试
点击后,可以看到执行job这里能看到在运行的job

点击运行的task

点击输出

这里可以看到输出内容,
在kafka消费端输入内容,

这里的jbs出现了4次,看下输出控制台,

可以看到这里依次累加了四次,说明统计生效了。
总结
这里只是做了一个简单的消费kafka的flink例子,消费成功后还可以通过sink发送出去,还可以用transform进行转换,这里后面再演示,如果不对的可以指出。
本文来自网络,不代表协通编程立场,如若转载,请注明出处:https://www.net2asp.com/9caa6c60f9.html
