Apache Flink中keyBy三种方式指定key

keyBy 如何指定key

DataStream

DataStream input = // […]

DataStream windowed = input

.keyBy(/define key here/)

.window(/window specification/);

类似于mysql中的join操作:select a.* , b.* from a join b on a.id=b.id

这里的keyBy就是a.id=b.id

有哪几种方式定义Key?

方式一:Tuple

DataStream<Tuple3> input = // […]

KeyedStream<Tuple3,Tuple> keyed = input.keyBy(0)

可以传字段的位置

DataStream<Tuple3> input = // […]

KeyedStream<Tuple3,Tuple> keyed = input.keyBy(0,1)

可以传字段位置的组合

这对于简单的使用时没问题的。但是对于内嵌的Tuple,如下所示:

DataStream<Tuple3<Tuple2,String,Long>> ds;

如果使用keyBy(0),那么他就会使用整个Tuple2作为key,(因为Tuple2是Tuple3<Tuple2,String,Long>的0号位置)。如果想要指定key到Tuple2内部中,可以使用下面的方式。

方式二:字段表达式

我们可以使用基于字符串字段表达式来引用内嵌字段去定义key。

之前我们的算子写法是这样的:

text.flatMap(new FlatMapFunction<String, Tuple2>() {
            @Override
            public void flatMap(String value, Collector<Tuple2> out) throws Exception {
                String[] tokens = value.toLowerCase().split(",");
                for(String token: tokens) {
                    if(token.length() > 0) {
                        out.collect(new Tuple2(token, 1));
                    }
                }
            }
        }).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print().setParallelism(1);

其中的new FlatMapFunction<String, Tuple2>表示输入是一个String,输出是一个Tuple2。这里我们重新定义一个内部类:

public static class WC {
        private String word;
        private int count;
 
        public WC() {
        }
 
        public WC(String word, int count) {
            this.word = word;
            this.count = count;
        }
 
        @Override
        public String toString() {
            return "WC{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
 
        public String getWord() {
            return word;
        }
 
        public void setWord(String word) {
            this.word = word;
        }
 
        public int getCount() {
            return count;
        }
 
        public void setCount(int count) {
            this.count = count;
        }
    }

修改算子的写法:

  text.flatMap(new FlatMapFunction() {
            @Override
            public void flatMap(String value, Collector out) throws Exception {
                String[] tokens = value.toLowerCase().split(",");
                for (String token : tokens) {
                    if (token.length() > 0) {
                        out.collect(new WC(token, 1));
                    }
                }
            }
        }).keyBy("word").timeWindow(Time.seconds(5)).sum("count").print().setParallelism(1);

将原来的输出Tuple2,修改为输出WC类型;将原来的keyBy(0)修改为keyBy(“word”);将原来的sum(1)修改为sum(“count”)

因此,在这个例子中我们有一个POJO类,有两个字段分别是”word”和”count”,可以传递字段名到keyBy(“”)中。

语法:

字段名一定要与POJO类中的字段名一致。一定要提供默认的构造函数,和get与set方法。

使用Tuple时,0表示第一个字段

可以使用嵌套方式,举例如下:

public static class WC {
  public ComplexNestedClass complex; //nested POJO
  private int count;
  // getter / setter for private field (count)
  public int getCount() {
    return count;
  }
  public void setCount(int c) {
    this.count = c;
  }
}
public static class ComplexNestedClass {
  public Integer someNumber;
  public float someFloat;
  public Tuple3 word;
  public IntWritable hadoopCitizen;
}

“count”,指向的是WC中的字段count

“complex”,指向的是复杂数据类型,会递归选择所有ComplexNestedClass的字段

“complex.word.f2”,指向的是Tuple3中的最后一个字段。

“complex.hadoopCitizen”,指向的是Hadoop IntWritable type

scala写法:

object StreamingWCScalaApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
 
    // 引入隐式转换
    import org.apache.flink.api.scala._
 
    val text = env.socketTextStream("192.168.152.45", 9999)
    text.flatMap(_.split(","))
        .map(x => WC(x,1))
        .keyBy("word")
        .timeWindow(Time.seconds(5))
        .sum("count")
        .print()
        .setParallelism(1)
 
    env.execute("StreamingWCScalaApp");
  }
  case class WC(word: String, count: Int)
}

方式三:key选择器函数

.keyBy(new KeySelector() {
            @Override
            public Object getKey(WC value) throws Exception {
                return value.word;
            }
        })

其中WC是输入类型,Object是数据类型在这里插入代码片

本文来自网络,不代表协通编程立场,如若转载,请注明出处:https://www.net2asp.com/3c3cf9e09e.html