Streaming介绍

使用Apache Flink进行流式处理

 四个关键概念:流数据的持续处理、事件时间、有状态的流计算、状态快照。

流处理

 有界数据流:bounded streams 有始有终。
 无界数据流:unbounded streams 有始无终。
 批处理:Batch processing 对有界数据流的处理。
 流处理:Stream processing 对无界数据流的处理。

image

 Flink程序由数据流(flow)和定义好的一系列操作组成。这些数据流(flow)构成了包含一或多个数据源和数据输出组成的有向图。

image

 Flink程序不仅可以处理来自消息队列或者分布式日志系统的实时数据,还可以处理历史的有界数据。
 Flink处理后的数据结果可以被发送或存储到各种各样的存储系统或消息队列中。
 Flink持有的State状态数据可以通过REST API进行访问。

image

及时流处理

 事件时间和处理事件。

有状态的流处理

image

 状态总是在本地访问,这样使得Flink程序实现高吞吐和低延迟。
 状态数据可以存储在JVM堆内存、或者状态数据量很大时存储到磁盘(如HDFS RocksDB)。

image

【健壮的】流处理

 状态快照和流重放(重播)实现了容错和恰好一次语义。

什么样的数据可以作为流数据?

 数据类型:String, Long, Integer, Boolean, Array
 Tuples, POJOs, and Scala case classes

Java

 Tuples

1
2
3
4
5
Tuple2<String, Integer> person = new Tuple2<>("Fred", 35);

// zero based index!
String name = person.f0;
Integer age = person.f1;

 POJOs

1
2
3
4
5
6
7
8
9
10
public class Person {
public String name;
public Integer age;
public Person() {};
public Person(String name, Integer age) {

};
}

Person person = new Person("Fred Flintstone", 35);

Scala tuples and case classes

一个完整的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.api.common.functions.FilterFunction;

public class Example {

public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Person> flintstones = env.fromElements(
new Person("Fred", 35),
new Person("Wilma", 35),
new Person("Pebbles", 2));

DataStream<Person> adults = flintstones.filter(new FilterFunction<Person>() {
@Override
public boolean filter(Person person) throws Exception {
return person.age >= 18;
}
});

adults.print();

env.execute();
}

public static class Person {
public String name;
public Integer age;
public Person() {};

public Person(String name, Integer age) {
this.name = name;
this.age = age;
};

public String toString() {
return this.name.toString() + ": age " + this.age.toString();
};
}
}

Stream执行环境

 DataStream API调用会构建一个任务图附着在StreamExecutionEnvironment上面,当env.execute()被调用时,任务图会被打包大送给任务管理器JobManager,JobManager对作业进行分区(并行化)并将其分片分发给任务管理器TaskManager以供执行。并行的每一个分片会在一个task slot里面执行。如果execute()没有被调用,程序将不会被执行。

image

数据来源

 fromElements…fromCollection…Socket…File…

1
2
3
4
5
6
7
List<Person> people = new ArrayList<Person>();

people.add(new Person("Fred", 35));
people.add(new Person("Wilma", 35));
people.add(new Person("Pebbles", 2));

DataStream<Person> flintstones = env.fromCollection(people);
1
DataStream<String> lines = env.socketTextStream("localhost", 9999)
1
DataStream<String> lines = env.readTextFile("file:///path");

数据输出

 print()…text file…CSV file…socket…

1
2
3
4
5
stream.writeAsText("/path/to/file")

stream.writeAsCsv("/path/to/file")

stream.writeToSocket(host, port, SerializationSchema)

Debugging

  1. 查看日志:job manager 和 task manager logs
  2. IDE里面进行断点调试

training.ververica

邵志鹏 wechat
扫一扫上面的二维码关注我的公众号
0%