Flink学习-实验3-Side Outputs

1. Flink学习-Streaming介绍
2. Flink学习-设置开发环境
3. Flink学习-实验1-对流数据进行过滤
4. Flink学习-实验1-对流数据进行转换
5. Flink学习-实验1-keyby分组
6. Flink学习-实验1-有状态的转换
7. Flink学习-实验1-相互连接的流
8. Flink学习-实验2-连接实现流join
9. Flink学习-实验3-时间和水印
10. Flink学习-实验3-窗口
11. Flink学习-实验3-ProcessFunction
flink-quickstart
flink-training-exercises

使用侧面输出的场景

Side Outputs【侧面输出】

  1. 异常【Exception】
  2. MALFORMED异常事件【java.lang.IllegalArgumentException:MALFORMED】
  3. 延迟事件
  4. 操作警报,例如与外部服务的超时连接

Side Outputs

 每个侧面输出通道都与OutputTag相关联。
 标签具有与侧面输出的DataStream的类型相对应的泛型类型,并且它们是有名称的。
 具有相同名称的两个OutputTag应具有相同的类型,并且将引用相同的侧面输出。

在这个例子中,我们将经典的单词计数程序扩展为仅计算至少5个字母长的单词,并将较短的单词发送到侧面输出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class WordCount {
static final OutputTag<String> shortWordsTag = new OutputTag<String>("short") {};

public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

SingleOutputStreamOperator<Tuple2<String, Integer>> tokenized = env
.addSource(new SocketTextStreamFunction("localhost", 9999, "\n", -1))
.process(new Tokenizer());

DataStream<String> shortWords = tokenized.getSideOutput(shortWordsTag);
shortWords.print();

DataStream<Tuple2<String, Integer>> wordCounts = tokenized.keyBy(0).sum(1);
wordCounts.print();

env.execute("Streaming WordCount");
}

public static final class Tokenizer
extends ProcessFunction<String, Tuple2<String, Integer>> { ... }
}

 请注意,如果要访问侧面输出流,则需要捕获ProcessFunction发出的流,并从那里访问侧面输出。

 下面我们会看到传递给processElement方法的上下文用于写入侧面输出,方法是使用输出标记指定要写入的侧面输出。
 在这个例子中,长度小于5的发给了shortWordsTag,而剩余的单词被out收集起来,采用经典的单词计数方式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static final class Tokenizer extends ProcessFunction<String, Tuple2<String, Integer>> {

@Override
public void processElement(String value, Context ctx, Collector<Tuple2<String, Integer>> out) {
// normalize and split the line
String[] tokens = value.toLowerCase().split("\\W+");

for (String token : tokens) {
if (token.length() < 5) {
// send short words to a side output
ctx.output(shortWordsTag, token);
} else if (token.length() > 0) {
// emit the pair
out.collect(new Tuple2<>(token, 1));
}
}
}
}
邵志鹏 wechat
扫一扫上面的二维码关注我的公众号
0%