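Accessing Redis from Flink Async I/O with Redisson

A brief walkthrough of the Flink async I/O source code.
The asynchronous request API provided by the Redisson framework.
Asynchronously incrementing a per-key counter, computing a business value, and storing both in Redis.
Lua scripts and the transaction API.

FLINK v2: Design and implementation of async I/O
Accessing external data from Flink with async I/O
AsyncRedisJob code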

AsyncFunction

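 AsyncFunction is the interface for asynchronous operators; it extends Function and Serializable.
 asyncInvoke() starts an asynchronous operation for every record delivered by the upstream task and, once the operation has finished, emits the result through the ResultFuture. With a callback-style client you pass the ResultFuture into the callback; with a Future-style client the asynchronous call only counts as finished once you call resultFuture.complete(...) yourself. Which style applies depends on how the external system's client is packaged.
 timeout() handles asynchronous calls that time out. It is a default method, so overriding it is optional, but in practice you usually want to add your own handling.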

@PublicEvolving
public interface AsyncFunction<IN, OUT> extends Function, Serializable {

    /**
     * Trigger async operation for each stream input.
     *
     * @param input element coming from an upstream task
     * @param resultFuture to be completed with the result data
     * @exception Exception in case of a user code error. An exception will make the task fail and
     * trigger fail-over process.
     */
    void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;

    /**
     * {@link AsyncFunction#asyncInvoke} timeout occurred.
     * By default, the result future is exceptionally completed with a timeout exception.
     *
     * @param input element coming from an upstream task
     * @param resultFuture to be completed with the result data
     */
    default void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception {
        resultFuture.completeExceptionally(
            new TimeoutException("Async function call has timed out."));
    }

}

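 Using AsyncFunction: implement the AsyncFunction interface directly to define your own access to an external system such as HBase.

An example using a callback:

public class HBaseAsyncFunc implements AsyncFunction<String, String> {

    // `hbase` is an asynchronous HBase client field and HBaseCallback a callback class; neither is shown here.
    @Override
    public void asyncInvoke(String row, ResultFuture<String> result) throws Exception {
        // The callback receives the ResultFuture and completes it when HBase answers.
        HBaseCallback cb = new HBaseCallback(result);
        Get get = new Get(Bytes.toBytes(row));
        hbase.asyncGet(get, cb);
    }
}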

An example using a Future:

public class HBaseAsyncFunc implements AsyncFunction<String, String> {

    // `hbase` and process(...) are not shown here, as in the callback example.
    @Override
    public void asyncInvoke(String row, final ResultFuture<String> resultFuture) throws Exception {
        Get get = new Get(Bytes.toBytes(row));
        ListenableFuture<Result> future = hbase.asyncGet(get);
        Futures.addCallback(future, new FutureCallback<Result>() {
            @Override
            public void onSuccess(Result hbaseResult) {
                // Convert the HBase Result and hand it to Flink.
                List<String> ret = process(hbaseResult);
                resultFuture.complete(ret);
            }

            @Override
            public void onFailure(Throwable thrown) {
                resultFuture.completeExceptionally(thrown);
            }
        });
    }
}

RichAsyncFunction

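 RichAsyncFunction is an abstract class. Because it extends AbstractRichFunction it also implements RichFunction, which offers additional useful methods, for example access to a re-wrapped RuntimeContext.
 Functions are invoked by the various operators behind a DataStream.
 An AsyncFunction is passed into AsyncWaitOperator. The operator's processElement() method calls asyncInvoke() and registers a timer that calls timeout().
 AsyncWaitOperator is instantiated in AsyncDataStream, the helper class for asynchronous streams, and handed to the DataStream's transform() method (the operator name is "async wait operator").

The RichFunction interface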

@Public
public interface RichFunction extends Function {
    void open(Configuration var1) throws Exception;

    void close() throws Exception;

    RuntimeContext getRuntimeContext();

    IterationRuntimeContext getIterationRuntimeContext();

    void setRuntimeContext(RuntimeContext var1);
}

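 RichAsyncFunction mainly introduces two wrapper contexts: RichAsyncFunctionIterationRuntimeContext and RichAsyncFunctionRuntimeContext.
 RichAsyncFunctionRuntimeContext: this runtime context only supports a few basic, thread-safe operations; state, global accumulators, broadcast variables and the distributed cache are not supported.
 RichAsyncFunctionIterationRuntimeContext: extends RichAsyncFunctionRuntimeContext, so the same restrictions apply; it additionally implements the IterationRuntimeContext interface.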

@PublicEvolving
public abstract class RichAsyncFunction<IN, OUT> extends AbstractRichFunction implements AsyncFunction<IN, OUT> {
    private static final long serialVersionUID = 3858030061138121840L;

    @Override
    public void setRuntimeContext(RuntimeContext runtimeContext) {
        Preconditions.checkNotNull(runtimeContext);

        if (runtimeContext instanceof IterationRuntimeContext) {
            super.setRuntimeContext(
                new RichAsyncFunctionIterationRuntimeContext(
                    (IterationRuntimeContext) runtimeContext));
        } else {
            super.setRuntimeContext(new RichAsyncFunctionRuntimeContext(runtimeContext));
        }
    }

    @Override
    public abstract void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;
    ...
    ...
}

AsyncWaitOperator

 The method to look at is processElement(). It is declared in the OneInputStreamOperator interface, which extends StreamOperator.
 To write a custom operator, extend AbstractStreamOperator, or implement OneInputStreamOperator or TwoInputStreamOperator.
 AbstractUdfStreamOperator is the abstract base class for operators that wrap a user-defined function; it mainly takes care of opening and closing that function. It is itself a StreamOperator because it extends AbstractStreamOperator, which implements StreamOperator.

@Internal
public class AsyncWaitOperator<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT>, OperatorActions {

    ...
    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        final StreamRecordQueueEntry<OUT> streamRecordBufferEntry = new StreamRecordQueueEntry<>(element);

        if (timeout > 0L) {
            // register a timeout for this AsyncStreamRecordBufferEntry
            long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime();

            final ScheduledFuture<?> timerFuture = getProcessingTimeService().registerTimer(
                timeoutTimestamp,
                new ProcessingTimeCallback() {
                    @Override
                    public void onProcessingTime(long timestamp) throws Exception {
                        userFunction.timeout(element.getValue(), streamRecordBufferEntry);
                    }
                });

            // Cancel the timer once we've completed the stream record buffer entry. This will remove
            // the register trigger task
            streamRecordBufferEntry.onComplete(
                (StreamElementQueueEntry<Collection<OUT>> value) -> {
                    timerFuture.cancel(true);
                },
                executor);
        }

        addAsyncBufferEntry(streamRecordBufferEntry);

        userFunction.asyncInvoke(element.getValue(), streamRecordBufferEntry);
    }
    ...
}
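
The register-then-cancel pattern in processElement() can be tried outside Flink. What follows is only a minimal sketch using the JDK: `TimeoutTimerSketch`, `callWithTimeout` and the "TIMEOUT" fallback value are hypothetical names chosen for illustration, a `CompletableFuture` stands in for the queue entry, and a `ScheduledExecutorService` stands in for the processing-time service. It shows the idea, not Flink's actual implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class TimeoutTimerSketch {

    // Runs asyncCall with a fresh "entry". If the entry is not completed within
    // timeoutMs, the timer completes it with the fallback value instead.
    static String callWithTimeout(Consumer<CompletableFuture<String>> asyncCall,
                                  long timeoutMs, String fallback) {
        ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
        try {
            CompletableFuture<String> entry = new CompletableFuture<>();

            // Register the timeout first, like registerTimer(...) in processElement().
            ScheduledFuture<?> timer = timerService.schedule(
                    () -> { entry.complete(fallback); }, timeoutMs, TimeUnit.MILLISECONDS);

            // Cancel the timer as soon as the entry completes, whoever completed it.
            entry.whenComplete((value, error) -> timer.cancel(false));

            // Only then start the user's asynchronous call.
            asyncCall.accept(entry);

            return entry.join();
        } finally {
            timerService.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Completes in time: the timer is cancelled and the real value wins.
        System.out.println(callWithTimeout(e -> e.complete("value"), 1000, "TIMEOUT"));
        // Never completes: the timer fires and supplies the fallback.
        System.out.println(callWithTimeout(e -> { }, 50, "TIMEOUT"));
    }
}
```

Because a `CompletableFuture` can only be completed once, a late reply after the timer has fired is simply ignored in this sketch.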

AsyncDataStream

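 AsyncDataStream is a helper class for DataStream. It provides the static methods unorderedWait and orderedWait, both of which add the asynchronous operator, i.e. AsyncWaitOperator.
 It defines an enum with the two output modes, ordered and unordered, and a default queue capacity of 100.
 You pass in the upstream DataStream, your AsyncFunction (or RichAsyncFunction) and a timeout. The buffer queue size falls back to the default when omitted, and the output order is determined by which of the two methods you call.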

@PublicEvolving
public class AsyncDataStream {
    public enum OutputMode { ORDERED, UNORDERED }

    private static final int DEFAULT_QUEUE_CAPACITY = 100;

    /**
     * Add an AsyncWaitOperator.
     *
     * @param in The {@link DataStream} where the {@link AsyncWaitOperator} will be added.
     * @param func {@link AsyncFunction} wrapped inside {@link AsyncWaitOperator}.
     * @param timeout for the asynchronous operation to complete
     * @param bufSize The max number of inputs the {@link AsyncWaitOperator} can hold inside.
     * @param mode Processing mode for {@link AsyncWaitOperator}.
     * @param <IN> Input type.
     * @param <OUT> Output type.
     * @return A new {@link SingleOutputStreamOperator}
     */
    private static <IN, OUT> SingleOutputStreamOperator<OUT> addOperator(
            DataStream<IN> in,
            AsyncFunction<IN, OUT> func,
            long timeout,
            int bufSize,
            OutputMode mode) {

        TypeInformation<OUT> outTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
            func,
            AsyncFunction.class,
            0,
            1,
            new int[]{1, 0},
            in.getType(),
            Utils.getCallLocationName(),
            true);

        // create transform
        AsyncWaitOperator<IN, OUT> operator = new AsyncWaitOperator<>(
            in.getExecutionEnvironment().clean(func),
            timeout,
            bufSize,
            mode);

        return in.transform("async wait operator", outTypeInfo, operator);
    }
    ...
}
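
To make the difference between the two output modes concrete, here is a small JDK-only sketch. `OutputModeSketch` and its `emit` method are hypothetical and written just for this illustration; they model only the emission order and ignore watermarks, checkpoints and the queue capacity that the real operator has to deal with.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class OutputModeSketch {

    enum OutputMode { ORDERED, UNORDERED }

    // Registers one pending future per input, completes them in the given
    // completion order, and returns the order in which results are emitted.
    static List<String> emit(List<String> inputs, int[] completionOrder, OutputMode mode) {
        List<CompletableFuture<String>> pending = new ArrayList<>();
        for (int i = 0; i < inputs.size(); i++) {
            pending.add(new CompletableFuture<>());
        }
        List<String> emitted = new ArrayList<>();

        if (mode == OutputMode.UNORDERED) {
            // Emit each result the moment its future completes.
            for (CompletableFuture<String> f : pending) {
                f.thenAccept(emitted::add);
            }
        } else {
            // Chain the futures so that result i is emitted only after result i-1.
            CompletableFuture<Void> previous = CompletableFuture.completedFuture(null);
            for (CompletableFuture<String> f : pending) {
                previous = previous
                        .thenCombine(f, (ignored, value) -> value)
                        .thenAccept(emitted::add);
            }
        }

        // Simulate the external system answering in an arbitrary order.
        for (int index : completionOrder) {
            pending.get(index).complete("result-" + inputs.get(index));
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> inputs = Arrays.asList("a", "b", "c");
        int[] completionOrder = {2, 0, 1};
        // prints [result-a, result-b, result-c]
        System.out.println(emit(inputs, completionOrder, OutputMode.ORDERED));
        // prints [result-c, result-a, result-b]
        System.out.println(emit(inputs, completionOrder, OutputMode.UNORDERED));
    }
}
```

In ordered mode the result for "c" is ready first but is held back until "a" and "b" have been emitted, which is the latency cost paid for preserving the input order.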

AsyncRedisRequest

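 A sample with little logic of its own.
 Extends RichAsyncFunction.
 Creates the RedissonClient in open().
 Shuts the RedissonClient down in close().
 Performs the asynchronous call in asyncInvoke().
 Handles timeouts in timeout().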

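import com.google.gson.Gson;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.redisson.Redisson;
import org.redisson.api.RBucket;
import org.redisson.api.RFuture;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.StringCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.concurrent.TimeUnit;

/**
 * AsyncRedisRequest
 * Redis executes data access on a single thread; the only benefit of concurrency is that
 * protocol handling and I/O work are spread across different threads.
 */
public class AsyncRedisRequest extends RichAsyncFunction<String, String> {
    private static final Logger logger = LoggerFactory.getLogger(AsyncRedisRequest.class);

    private static final long serialVersionUID = -8022470346098502168L;
    private transient RedissonClient redissonClient = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // ((SingleServerConfig)config.useSingleServer().setTimeout(1000000)).setAddress("redis://127.0.0.1:6379");
        redissonClient = Redisson.create();
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (redissonClient != null && !redissonClient.isShutdown()) {
            redissonClient.shutdown();
        }
    }

    @Override
    public void asyncInvoke(String input, ResultFuture<String> resultFuture) throws Exception {
        POJO pojo = new Gson().fromJson(input, POJO.class);

        // Read-only alternative:
        // RBucket<String> bucket = redissonClient.getBucket("", StringCodec.INSTANCE);
        // RFuture<String> future = bucket.getAsync();
        RBucket<String> bucket = redissonClient.getBucket("asyncio_" + pojo.getAid(), StringCodec.INSTANCE);
        RFuture<String> future = bucket.getAndSetAsync(input, 24, TimeUnit.HOURS);

        future.whenComplete((res, exception) -> {
            if (exception != null) {
                resultFuture.completeExceptionally(exception);
            } else if (res == null) {
                // No previous value was stored under this key.
                resultFuture.complete(Collections.emptyList());
            } else {
                resultFuture.complete(Collections.singletonList(res));
            }
        });
    }

    @Override
    public void timeout(String input, ResultFuture<String> resultFuture) throws Exception {
        // A side output could be used to handle timed-out records.
        logger.info("timeout: {}", input);
        // Complete the future so the record does not stay in the operator's queue; here it is dropped.
        resultFuture.complete(Collections.emptyList());
    }

}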
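
The handling inside whenComplete is easy to get wrong, because the callback receives either a value, a null value, or an exception. The following JDK-only sketch walks through the three cases. It assumes that the asynchronous reply behaves like a standard `CompletionStage`, so a `CompletableFuture` is used in its place, and `RecordingResultFuture` is a hypothetical stand-in for Flink's `ResultFuture<String>` that merely records what it receives.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class WhenCompleteBridgeSketch {

    // Hypothetical stand-in for ResultFuture<String>; it only records what it was given.
    static class RecordingResultFuture {
        final List<String> results = new ArrayList<>();
        Throwable error;
        boolean completed;

        void complete(Collection<String> values) { results.addAll(values); completed = true; }
        void completeExceptionally(Throwable t) { error = t; completed = true; }
    }

    // Forwards the outcome of an asynchronous reply, covering all three cases:
    // failure, "no value" (null) and a normal value.
    static void bridge(CompletableFuture<String> reply, RecordingResultFuture resultFuture) {
        reply.whenComplete((value, exception) -> {
            if (exception != null) {
                resultFuture.completeExceptionally(exception);
            } else if (value == null) {
                resultFuture.complete(Collections.emptyList());
            } else {
                resultFuture.complete(Collections.singletonList(value));
            }
        });
    }

    public static void main(String[] args) {
        RecordingResultFuture ok = new RecordingResultFuture();
        bridge(CompletableFuture.completedFuture("old-value"), ok);
        System.out.println(ok.results);                 // [old-value]

        RecordingResultFuture missing = new RecordingResultFuture();
        bridge(CompletableFuture.completedFuture(null), missing);
        System.out.println(missing.results);            // []

        RecordingResultFuture failed = new RecordingResultFuture();
        CompletableFuture<String> broken = new CompletableFuture<>();
        broken.completeExceptionally(new IllegalStateException("connection lost"));
        bridge(broken, failed);
        System.out.println(failed.error.getMessage());  // connection lost
    }
}
```

In every branch the result future is completed exactly once, so no record is left pending.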

Example job: running Redisson transactions and Lua scripts asynchronously

Data source: Kafka

 In Kafka's server.properties, num.partitions=3.

public class MessageProducer {
    private static final String LOCAL_KAFKA_BROKER = "localhost:9092";

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", LOCAL_KAFKA_BROKER);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        props.put("acks", "-1");
        props.put("retries", 3);
        props.put("batch.size", 323840);
        props.put("linger.ms", 10);
        props.put("buffer.memory", 33554432);
        props.put("max.block.ms", 3000);
        props.put("request.timeout.ms", "60000");

        Producer<String, String> producer = new KafkaProducer<>(props);
        Gson gson = new Gson();

        for (int i = 0; i < 100000; i++) {
            POJO pojo = new POJO();
            int j = (int) (Math.random() * 3);
            pojo.setAid("ID000-" + i);
            pojo.setAname("NAME-" + i);
            pojo.setAstyle("STYLE000-" + j);
            pojo.setEnergy(new BigDecimal(1000 * Math.random()).setScale(2, RoundingMode.HALF_UP));
            pojo.setAge(j * 9);
            long time = System.currentTimeMillis();
            pojo.setTt(new Date(time));
            pojo.setLogTime(time);
            pojo.setAstatus("02");

            String value = gson.toJson(pojo);

            producer.send(new ProducerRecord<String, String>(AsyncRedisJob.class.getSimpleName(), Integer.toString(i), value));

            System.out.println(value);
        }

        producer.close();
    }
}
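
Committing asynchronously with a transaction

 RFuture<Void> commitAsync() returns no result (its type parameter is Void); for timeouts it is best to override timeout() and handle them there.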

RTransaction transaction = redissonClient.createTransaction(TransactionOptions.defaults());
// Mind the time zone.
String dateStr = DateTime.now().toString(DateTimeFormat.forPattern("yyyy-MM-dd"));
RMap<String, Object> transactionMap = transaction.getMap("asyncio_atype_" + pojo.getAstyle() + "_" + dateStr);
// Accumulate the daily record count and Energy (calories, i.e. change in energy).
if (transactionMap.isExists()) {
    transactionMap.put("totalNum", (Long) transactionMap.get("totalNum") + 1);
    // Add this record's energy to the running total.
    transactionMap.put("totalEnergy", new BigDecimal(transactionMap.get("totalEnergy").toString()).add(pojo.getEnergy()));
} else {
    transactionMap.put("totalNum", 1L);
    // The first record of the day contributes its own energy.
    transactionMap.put("totalEnergy", pojo.getEnergy());
}

RFuture<Void> transactionFuture = transaction.commitAsync();

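 transaction.getMap, transactionMap.isExists() and the subsequent count and energy calculation are not thread-safe once there is more than one partition, i.e. when several threads execute them. The sequence is a read-modify-write, so parallel subtasks can all observe isExists() == false for the same key, as the log below shows.
 Redisson's transaction is not a distributed transaction either, so using it directly under concurrency plus asynchrony is not appropriate.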

pojo.getAstyle(): STYLE000-0 transactionMap.isExists(): false threadName: Source: Custom Source -> async wait operator -> Sink: Unnamed (2/3)
pojo.getAstyle(): STYLE000-0 transactionMap.isExists(): false threadName: Source: Custom Source -> async wait operator -> Sink: Unnamed (1/3)
pojo.getAstyle(): STYLE000-0 transactionMap.isExists(): false threadName: Source: Custom Source -> async wait operator -> Sink: Unnamed (3/3)
pojo.getAstyle(): STYLE000-0 transactionMap.isExists(): true threadName: Source: Custom Source -> async wait operator -> Sink: Unnamed (3/3)
pojo.getAstyle(): STYLE000-1 transactionMap.isExists(): false threadName: Source: Custom Source -> async wait operator -> Sink: Unnamed (1/3)
pojo.getAstyle(): STYLE000-1 transactionMap.isExists(): true threadName: Source: Custom Source -> async wait operator -> Sink: Unnamed (2/3)
pojo.getAstyle(): STYLE000-1 transactionMap.isExists(): true threadName: Source: Custom Source -> async wait operator -> Sink: Unnamed (1/3)
pojo.getAstyle(): STYLE000-1 transactionMap.isExists(): true threadName: Source: Custom Source -> async wait operator -> Sink: Unnamed (3/3)
pojo.getAstyle(): STYLE000-1 transactionMap.isExists(): true threadName: Source: Custom Source -> async wait operator -> Sink: Unnamed (1/3)
pojo.getAstyle(): STYLE000-0 transactionMap.isExists(): true threadName: Source: Custom Source -> async wait operator -> Sink: Unnamed (3/3)
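
The lost-update problem visible in this log is not specific to Redisson. The sketch below reproduces it with plain JDK classes, under the assumption that a `ConcurrentHashMap` can stand in for Redis and three threads for the three subtasks; `CheckThenActSketch` and its methods are hypothetical names. The racy variant mirrors the isExists/get/put sequence above, while the atomic variant performs the whole read-modify-write as one step.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class CheckThenActSketch {

    // Racy: mirrors "if exists then get + put, else put 1". Two threads can both
    // read the same old value, so updates may be lost.
    static void racyIncrement(ConcurrentHashMap<String, Long> store, String key) {
        if (store.containsKey(key)) {
            store.put(key, store.get(key) + 1);
        } else {
            store.put(key, 1L);
        }
    }

    // Atomic: the read-modify-write happens as one indivisible step.
    static void atomicIncrement(ConcurrentHashMap<String, Long> store, String key) {
        store.merge(key, 1L, Long::sum);
    }

    // Applies the chosen increment perThread times from each of `threads` threads
    // and returns the final counter value.
    static long run(int threads, int perThread, boolean atomic) {
        ConcurrentHashMap<String, Long> store = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < perThread; i++) {
                    if (atomic) {
                        atomicIncrement(store, "totalNum");
                    } else {
                        racyIncrement(store, "totalNum");
                    }
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return store.getOrDefault("totalNum", 0L);
    }

    public static void main(String[] args) {
        System.out.println("atomic: " + run(3, 100_000, true));   // always 300000
        System.out.println("racy:   " + run(3, 100_000, false));  // may be less than 300000
    }
}
```

Redis runs a Lua script atomically, so moving the check and the update into a script gives the same guarantee as the atomic variant here.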
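
Running a Lua script asynchronously

 In the Lua script, note that redis.call('exists', k) returns the integer 0 rather than false when the key does not exist, and 0 is truthy in Lua, so the script compares against 0 explicitly.
 Asynchronous and synchronous execution gave different results, e.g. 4) "\xfc\x05907.0": the leading bytes are an object header written by FstCodec.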

local k = KEYS[1]
local e = ARGV[1]
local isExists = redis.call('exists', k)
if not isExists or isExists == 0 then
    redis.call('hmset', k, 'totalNum', 1)
    redis.call('hmset', k, 'totalEnergy', e)
else
    redis.call('hmset', k, 'totalNum', tonumber(redis.call('hget', k, 'totalNum')) + 1)
    redis.call('hmset', k, 'totalEnergy', string.format("%.2f", tonumber(redis.call('hget', k, 'totalEnergy')) + tonumber(e)))
end
return redis.call('hgetall', k)

// Signature: evalShaAsync(Mode mode, String shaDigest, ReturnType returnType, List<Object> keys, Object... values)
// The script writes to the hash, so READ_WRITE is used here.
RFuture<List<Object>> scriptFuture = script.evalShaAsync(
        RScript.Mode.READ_WRITE, sha, RScript.ReturnType.MAPVALUELIST,
        Arrays.asList(key), String.valueOf(pojo.getEnergy().doubleValue()));
scriptFuture.whenComplete((res, exception) -> {
    if (exception != null) { resultFuture.completeExceptionally(exception); return; }
    resultFuture.complete(Arrays.asList(String.valueOf(res)));
});
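
To check what the Lua script above should return for a given sequence of calls, its per-key logic can be replayed in plain Java. This is only a sketch: `LuaScriptLogicSketch` is a hypothetical helper, a `Map<String, String>` stands in for the Redis hash, the input values are made up, and Java's "%.2f" is assumed to round the same way as Lua's string.format for the values used.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

class LuaScriptLogicSketch {

    // Applies one script invocation to the hash stored under a single key.
    // Field values are strings, as they are in Redis.
    static Map<String, String> apply(Map<String, String> hash, String energy) {
        if (hash.isEmpty()) {
            // Key does not exist yet: the script stores ARGV[1] unchanged.
            hash.put("totalNum", "1");
            hash.put("totalEnergy", energy);
        } else {
            long num = Long.parseLong(hash.get("totalNum")) + 1;
            double total = Double.parseDouble(hash.get("totalEnergy"))
                    + Double.parseDouble(energy);
            hash.put("totalNum", Long.toString(num));
            // Counterpart of string.format("%.2f", ...) in the script.
            hash.put("totalEnergy", String.format(Locale.ROOT, "%.2f", total));
        }
        return hash;
    }

    public static void main(String[] args) {
        Map<String, String> hash = new LinkedHashMap<>();
        System.out.println(apply(hash, "295.0"));  // {totalNum=1, totalEnergy=295.0}
        System.out.println(apply(hash, "755.0"));  // {totalNum=2, totalEnergy=1050.00}
    }
}
```

The first call stores the argument as received, while later calls write a value formatted to two decimals, which is why a result list can show 295.0 for a new key and 1050.00 for an updated one.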

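 Pay attention to the Codec.
 Verify repeatedly that whenComplete really receives the result of the asynchronous execution; if it does not, fall back to executing the Lua script synchronously.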

// ((SingleServerConfig)config.useSingleServer().setTimeout(1000000)).setAddress("redis://127.0.0.1:6379");
Config config = new Config();
// With the default Config, evalShaAsync misbehaves: the default codec is FstCodec
// (oldConf.setCodec(new FstCodec())), and the codec passed to getScript(StringCodec.INSTANCE) is not used.
config.setCodec(StringCodec.INSTANCE);
config.useSingleServer()
        .setTimeout(1000000)
        .setAddress("redis://127.0.0.1:6379");
redissonClient = Redisson.create(config);

Asynchronous Lua test: 5 and then 15 records were sent, covering 3 distinct astyle values:

sha: 0219d65422f424b3646391cacd4d90854b270c9a threadName: Source: Custom Source -> async wait operator -> Sink: Unnamed (2/3)
sha: 0219d65422f424b3646391cacd4d90854b270c9a threadName: Source: Custom Source -> async wait operator -> Sink: Unnamed (3/3)
sha: 0219d65422f424b3646391cacd4d90854b270c9a threadName: Source: Custom Source -> async wait operator -> Sink: Unnamed (1/3)
sha: 0219d65422f424b3646391cacd4d90854b270c9a threadName: Source: Custom Source -> async wait operator -> Sink: Unnamed (3/3)
sha: 0219d65422f424b3646391cacd4d90854b270c9a threadName: Source: Custom Source -> async wait operator -> Sink: Unnamed (3/3)
value: [totalNum, 1, totalEnergy, 295.0]
value: [totalNum, 1, totalEnergy, 867.0]
value: [totalNum, 1, totalEnergy, 704.0]
value: [totalNum, 2, totalEnergy, 1050.00]
value: [totalNum, 3, totalEnergy, 1864.00]
sha: 0219d65422f424b3646391cacd4d90854b270c9a threadName: Source: Custom Source -> async wait operator -> Sink: Unnamed (1/3)
sha: 0219d65422f424b3646391cacd4d90854b270c9a threadName: Source: Custom Source -> async wait operator -> Sink: Unnamed (3/3)
sha: 0219d65422f424b3646391cacd4d90854b270c9a threadName: Source: Custom Source -> async wait operator -> Sink: Unnamed (1/3)
sha: 0219d65422f424b3646391cacd4d90854b270c9a threadName: Source: Custom Source -> async wait operator -> Sink: Unnamed (2/3)
sha: 0219d65422f424b3646391cacd4d90854b270c9a threadName: Source: Custom Source -> async wait operator -> Sink: Unnamed (1/3)
sha: 0219d65422f424b3646391cacd4d90854b270c9a threadName: Source: Custom Source -> async wait operator -> Sink: Unnamed (2/3)
sha: 0219d65422f424b3646391cacd4d90854b270c9a threadName: Source: Custom Source -> async wait operator -> Sink: Unnamed (3/3)
sha: 0219d65422f424b3646391cacd4d90854b270c9a threadName: Source: Custom Source -> async wait operator -> Sink: Unnamed (2/3)
sha: 0219d65422f424b3646391cacd4d90854b270c9a threadName: Source: Custom Source -> async wait operator -> Sink: Unnamed (3/3)
sha: 0219d65422f424b3646391cacd4d90854b270c9a threadName: Source: Custom Source -> async wait operator -> Sink: Unnamed (2/3)
sha: 0219d65422f424b3646391cacd4d90854b270c9a threadName: Source: Custom Source -> async wait operator -> Sink: Unnamed (2/3)
sha: 0219d65422f424b3646391cacd4d90854b270c9a threadName: Source: Custom Source -> async wait operator -> Sink: Unnamed (2/3)
sha: 0219d65422f424b3646391cacd4d90854b270c9a threadName: Source: Custom Source -> async wait operator -> Sink: Unnamed (3/3)
sha: 0219d65422f424b3646391cacd4d90854b270c9a threadName: Source: Custom Source -> async wait operator -> Sink: Unnamed (1/3)
value: [totalNum, 4, totalEnergy, 1873.00]
sha: 0219d65422f424b3646391cacd4d90854b270c9a threadName: Source: Custom Source -> async wait operator -> Sink: Unnamed (1/3)
value: [totalNum, 2, totalEnergy, 1121.00]
value: [totalNum, 3, totalEnergy, 1439.00]
value: [totalNum, 2, totalEnergy, 892.00]
value: [totalNum, 4, totalEnergy, 2190.00]
value: [totalNum, 7, totalEnergy, 3049.00]
value: [totalNum, 6, totalEnergy, 2562.00]
value: [totalNum, 8, totalEnergy, 3587.00]
value: [totalNum, 5, totalEnergy, 2374.00]
value: [totalNum, 7, totalEnergy, 3436.00]
value: [totalNum, 6, totalEnergy, 2971.00]
value: [totalNum, 5, totalEnergy, 2250.00]
value: [totalNum, 4, totalEnergy, 1942.00]
value: [totalNum, 5, totalEnergy, 2311.00]
value: [totalNum, 3, totalEnergy, 1288.00]

State in Redis afterwards; the three totalNum values (7 + 8 + 5) add up to the 20 records sent:

127.0.0.1:6379> keys *
1) "asyncio_atype_STYLE000-1_2019-07-15"
2) "asyncio_atype_STYLE000-2_2019-07-15"
3) "asyncio_atype_STYLE000-0_2019-07-15"
127.0.0.1:6379> hgetall asyncio_atype_STYLE000-1_2019-07-15
1) "totalNum"
2) "7"
3) "totalEnergy"
4) "3049.00"
127.0.0.1:6379> hgetall asyncio_atype_STYLE000-2_2019-07-15
1) "totalNum"
2) "8"
3) "totalEnergy"
4) "3587.00"
127.0.0.1:6379> hgetall asyncio_atype_STYLE000-0_2019-07-15
1) "totalNum"
2) "5"
3) "totalEnergy"
4) "2374.00"

[End]

邵志鹏 wechat