FLINK v2-异步IO的设计与实现

动机

什么原因激发了实现异步IO这个想法

 多数情况下,IO访问是一个很耗时的过程,对于单个算子来说,IO使得吞吐量TPS比在内存计算中要小很多,特别是在流作业中用户最关心的低延迟问题的时候。虽然可以使用多线程来处理这类问题,但是缺点也是显而易见的:由于用户必须实现算子上的线程模型,从而将编程模型变得更加复杂。此外,还要注意与检查点机制的协调。

场景

什么样的应用场景更需要异步IO

 对于(机器学习)ML流作业,数据流必须从HBase(一个包含数十亿条记录的数据集合)获取数据,然后根据它进行计算。这个作业的瓶颈是访问HBase的算子设计。尽管HBase已经进行了高度优化,但由于I/O操作较慢,每个子任务的TPS不能很高,因此整个集群的QPS非常高。

设计

设计图

image

时序图

image

异步函数-AsyncFunction接口

AsyncFunction

 AsyncFunction在AsyncWaitOperator中作为一个用户函数,它看起来像StreamFlatMap算子,有open()/processElement(StreamRecord< in > record)/processWatermark(Watermark mark)方法。
 对于用户自己实现的AsyncFunction,必须重写asyncInvoke(IN input, AsyncCollector collector)来提供调用异步操作的代码。

1
2
3
4
5
6
7
8
9
10
public interface AsyncFunction<IN, OUT> extends Function, Serializable {
/**
* Trigger async operation for each stream input.
* The AsyncCollector should be registered into async client.
*
* @param input Stream Input
* @param collector AsyncCollector
*/
void asyncInvoke(IN input, AsyncCollector<OUT> collector) throws Exception;
}

异步资源

 异步资源是指用于执行异步操作的客户端或资源连接。比如以从HBase获取数据结果为例,异步资源可以是一个HBase连接池。
 用户可以将async资源作为成员变量放在AsyncFunction中。如果不能序列化,则可以使用关键字transient。

与AsyncWaitOperator进行交互

 对于AsyncWaitOperator算子的每一个输入的流记录,都会被AsyncFunction.asyncInvoke(IN input, AsyncCollector cb)方法来处理。然后AsyncCollector会将数据追加到AsyncCollectorBuffer里面。稍后将会介绍AsyncCollector和AsyncCollectorBuffer。

AsyncCollector

 AsyncCollector由AsyncWaitOperator创建,并传递给AsyncFunction,在这里它应该被添加到用户的回调函数中。它充当从用户代码中获取结果或错误的角色,并通知AsyncCollectorBuffer发出结果。
 对用来说有个特别的方法collect,当异步操作完成或抛出错误时,应该调用它们。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class AsyncCollector<OUT> {
private List<OUT> result;
private Throwable error;
private AsyncCollectorBuffer<OUT> buffer;
/**
* Set result
* @param result A list of results.
*/
public void collect(List<OUT> result) {
this.result = result;
buffer.mark(this);
}
/**
* Set error
* @param error A Throwable object.
*/
public void collect(Throwable error) {
this.error = error;
buffer.mark(this);
}
/**
* Get result. Throw RuntimeException while encountering an error.
* @return A List of result.
* @throws RuntimeException RuntimeException wrapping errors from user codes.
*/
public List<OUT> getResult() throws RuntimeException { ... }
}

 在调用AsyncFunction的asyncInvoke方法之前, AsyncWaitOperator将尝试从AsyncCollectorBuffer获取AsyncCollector的实例。然后它将被放到用户的回调函数中。如果缓冲区已满,它将一直等到一些正在进行的回调完成。
 一旦异步操作完成,AsyncCollector.collect()将收集结果或错误,同时AsyncCollectorBuffer将会得到通知。
 AsyncCollector是由FLINK实现的。

AsyncCollectorBuffer

 AsyncCollectorBuffer保存所有的asynccollector,并将结果发送给下一个节点。
 当调用AsyncCollector.collect()时,将在AsyncCollectorBuffer中放置一个标记,指示完成的AsyncCollectors。一旦AsyncCollector获得结果,一个名为Emitter的工作线程也会收到信号,然后根据有序或无序设置尝试发出结果。
 为了简单起见,我们将在下面的文本中将task引用到AsyncCollectorBuffer中的AsycnCollector。

image

有序和无序

 根据用户配置,保证或者不保证输出元素的顺序。如果顺序得不到保证,后来完成的AsyncCollectors有可能会被提前发出去。

Emitter线程

 Emitter线程会等待完成的AsyncCollectors,当Emitter线程收到信号时。会根据配置对缓冲区的结果作如下任务处理:

  1. 有序模式
    如果缓冲区中的第一个任务已经完成,那么Emitter将收集它的结果,然后继续执行第二个任务。如果第一个任务还没有完成,就再等一次。
  2. 无序模式
    检查缓冲区中所有已完成的任务,并从缓冲区中最老的水印之前的任务中收集结果。

 Emitter线程和Task线程将通过从StreamTask获取/释放检查点锁来独占访问。
 当所有任务完成时,向Task线程发出信号,通知它所有数据都已处理完毕,可以关闭算子。
 从缓冲区中删除一些任务后,向Task线程发送信号。
 将异常抛给Task线程。

Task线程

 只有Emitter线程能够访问AsyncCollectorBuffer。
 获取并向缓冲区添加一个新的AsyncCollector,在缓冲区满时等待。

水印

 所有的水印也将保存在AsyncCollectorBuffer中。当且仅当在所有AsyncCollectors发出当前水印之前的异步收集器都已发出时,才会发出水印。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public interface AsyncCollectorBuffer<IN, OUT> {
/**
* Add an AsyncCollector into the buffer.
*
* @param collector AsyncCollector
* @throws Exception InterruptedException or exceptions from AsyncCollector.
*/
void add(AsyncCollector<OUT> collector) throws Exception;
/**
* Notify the Emitter Thread that a AsyncCollector has completed.
*
* @param collector Completed AsyncCollector
* @throws Exception InterruptedException.
*/
void mark(AsyncCollector<OUT> collector) throws Exception;
/**
* Caller will wait here if buffer is not empty, meaning that not all tasks have returned yet.
*
* @throws Exception InterruptedException or Exceptions from AsyncCollector.
*/
void waitEmpty() throws Exception;
}

状态、容错和检查点

 一个新的算子AsyncWaitOperator<IN, OUT>已经添加到Flink Streaming里了。
 这个算子会对所有AsyncCollectors进行缓冲,然后将处理后的数据发送给后面的其它算子。

image

状态和检查点

 所有输入的流记录都会被保存在state中,在对算子状态进行快照时,AsyncWaitOperator算子会将AsyncCollectorBuffer中所有输入数据流存入state,而不是在处理时对单个输入流一个接一个的存入state,这些流数据会在持久化之前被清除。
 当AsyncWaitOperator算子中所有屏障都已到达时,可以立即进行检查点操作(快照)。

容错

 在恢复算子的状态时,算子将扫描状态中的所有元素,获取AsyncCollectors,调用AsyncFunction.asyncInvoke()并将它们插入AsyncCollectorBuffer中。

API

 在FLINK中不修改当前的DataStream类。AsyncDataStream将处理AsyncWaitOperator算子。

AsyncDataStream

 AsyncDataStream提供了两种方法将AsyncWaitOperator和AsyncFunction添加到FLINK流作业中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class AsyncDataStream {
/**
* Add an AsyncWaitOperator. The order of output stream records may be reordered.
*
* @param func AsyncWaitFunction
* @return A new DataStream.
*/
public static DataStream<OUT> unorderedWait(DataStream<IN>, AsyncWaitFunction<IN, OUT> func);
/**
* Add an AsyncWaitOperator. The order of output stream records is guaranteed to be the same as input ones.
*
* @param func AsyncWaitFunction
* @return A new DataStream.
*/
public static DataStream<OUT> orderedWait(DataStream<IN>, AsyncWaitFunction<IN, OUT> func);
}

错误(异常)处理

 异常可以抛出到框架代码中,从而可以引发任务容错。

注意

异步资源共享

 在同一个TaskManager(JVM进程),我们可以使用静态的资源连接,以便同一进程中的所有线程都可以共享同一个实例。
 资源如HBase连接、Netty连接,所有线程如不同的slots(task workers)。
 当然也要特别注意线程安全问题。

HBase Example

使用回调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class HBaseAsyncFunction implements AsyncFunction<String, String> {
// initialize it while reading object
transient Connection connection;
@Override
public void asyncInvoke(String val, AsyncCollector<String> c) {
Get get = new Get(Bytes.toBytes(val));
Table ht = connection.getTable(TableName.valueOf(Bytes.toBytes("test")));
// UserCallback is from user’s async client.
((AsyncableHTableInterface) ht).asyncGet(get, new UserCallback(c));
}
}
// create data stream
public void createHBaseAsyncTestStream(StreamExecutionEnvironment env) {
DataStream<String> source = getDataStream(env);
DataStream<String> stream = AsyncDataStream.unorderedWait(source, new HBaseAsyncFunction());
stream.print();
}

使用ListenableFuture

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ListenableFuture;
public class HBaseAsyncFunction implements AsyncFunction<String, String> {
// initialize it while reading object
transient Connection connection;
@Override
public void asyncInvoke(String val, AsyncCollector<String> c) {
Get get = new Get(Bytes.toBytes(val));
Table ht = connection.getTable(TableName.valueOf(Bytes.toBytes("test")));
ListenableFuture<Result> future = ht.asyncGet(get);
Futures.addCallback(future,
new FutureCallback<Result>() {
@Override public void onSuccess(Result result) {
List ret = new ArrayList<String>();
ret.add(result.get(...));
c.collect(ret);
}
@Override public void onFailure(Throwable t) {
c.collect(t);
}
},
MoreExecutors.newDirectExecutorService()
);
}
}

Apache Flink Home

origional document

邵志鹏 wechat
扫一扫上面的二维码关注我的公众号
0%