Flink Learning - Setting Up the Development Environment

1. Flink Learning - Introduction to Streaming
flink-quickstart
flink-training-exercises
Note that the official flink-training-exercises project has updated its Flink and Scala versions (to 1.8.0 and 2.12 respectively).

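Setting Up the Development Environment

Software Requirements
  1. Java JDK 8 (a JDK, not a JRE). Versions other than 8 are not recommended: "Only Java 8 will work; not Java 7, or 9 (or newer)."
  2. Apache Maven 3.x; for building Flink or Blink from source, preferably Maven 3.2.5
  3. Git
  4. An IDE for Java (and/or Scala); IntelliJ is recommended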
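Because only Java 8 is supported, it can be worth checking the local toolchain before building. The following is a minimal sketch, assuming `java -version` prints a line like `java version "1.8.0_212"` (or `openjdk version "11.0.2"`) and `mvn -v` starts with `Apache Maven 3.x.y`. The helper names are hypothetical, and the script only reports versions; it does not tell a JDK from a JRE (running `javac -version` as well would confirm a JDK).

```python
import re
import subprocess


def java_major_version(version_output):
    """Return the Java major version (7, 8, 11, ...) parsed from `java -version` output."""
    match = re.search(r'version "(\d+)(?:\.(\d+))?', version_output)
    if match is None:
        raise ValueError("unrecognized `java -version` output")
    major = int(match.group(1))
    # Java 8 and earlier report "1.x.y"; Java 9+ report the major version first.
    if major == 1 and match.group(2) is not None:
        return int(match.group(2))
    return major


def maven_version(version_output):
    """Return (major, minor, patch) parsed from `mvn -v` output."""
    match = re.search(r"Apache Maven (\d+)\.(\d+)\.(\d+)", version_output)
    if match is None:
        raise ValueError("unrecognized `mvn -v` output")
    return tuple(int(part) for part in match.groups())


def command_output(cmd):
    """Run cmd and return stdout+stderr as text, or None if it cannot be started."""
    try:
        result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                universal_newlines=True)
    except OSError:  # e.g. the executable is not on PATH
        return None
    # `java -version` writes to stderr, `mvn -v` to stdout, so keep both.
    return result.stdout + result.stderr


if __name__ == "__main__":
    checks = [
        (["java", "-version"], java_major_version, lambda v: v == 8),
        (["mvn", "-v"], maven_version, lambda v: v[0] == 3),
    ]
    for cmd, parse, ok in checks:
        out = command_output(cmd)
        if out is None:
            print(cmd[0], "not found on PATH")
            continue
        try:
            version = parse(out)
        except ValueError as err:
            print(cmd[0], err)
            continue
        print(cmd[0], version, "OK" if ok(version) else "does not match this guide")
```

Parsing is kept separate from running the commands so the version logic can be checked without Java or Maven installed.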
git clone https://github.com/dataArtisans/flink-training-exercises.git
cd flink-training-exercises
mvn clean package

 You can also check out the project directly from the IDE.

 When using IntelliJ, make sure the Scala plugin is installed.

  1. IntelliJ IDEA -> Preferences -> Plugins … Install Jetbrains plugin
  2. Select and install the Scala plugin
  3. Restart IDEA
Download the Datasets

 For example, the taxi data:

wget http://training.ververica.com/trainingData/nycTaxiRides.gz
wget http://training.ververica.com/trainingData/nycTaxiFares.gz
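To confirm the downloads are intact, you can peek at the first few lines without unpacking them. This is a small sketch, assuming both files are gzip-compressed text with one record per line; the record layout is not described here, so lines are printed raw. `head_gz` is a hypothetical helper, not part of flink-training-exercises.

```python
import gzip
import itertools
import os


def head_gz(path, n=5):
    """Return the first n lines of a gzip-compressed text file, without newlines."""
    with gzip.open(path, "rt", encoding="utf-8") as f:
        return [line.rstrip("\n") for line in itertools.islice(f, n)]


if __name__ == "__main__":
    # Assumes the files were downloaded into the current directory.
    for name in ("nycTaxiRides.gz", "nycTaxiFares.gz"):
        if os.path.exists(name):
            print(name)
            for line in head_gz(name, 3):
                print("  ", line)
        else:
            print(name, "not found in", os.getcwd())
```

`itertools.islice` stops reading after n lines, so even large files are only partially decompressed.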

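Installing a Local Flink Cluster

Download a Flink release

 If you don't need HDFS or YARN, you can download the release without bundled Hadoop. After downloading, simply extract the archive.

Start the cluster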
./bin/start-cluster.sh
Verify that the cluster started successfully
  1. Check the logs in ./log/
  2. Check the web UI, by default at http://localhost:8081
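The same check can be scripted against Flink's REST API, which is served on the web UI port. This is a sketch, assuming the default port 8081 and that `GET /overview` returns JSON with keys such as `taskmanagers`, `slots-total`, `jobs-running` and `flink-version`; the helper names are hypothetical.

```python
import json
import urllib.request


def summarize_overview(payload):
    """Turn the /overview JSON text into a one-line status summary."""
    data = json.loads(payload)
    return "Flink {} - {} TaskManager(s), {} slot(s), {} running job(s)".format(
        data.get("flink-version", "?"),
        data.get("taskmanagers", 0),
        data.get("slots-total", 0),
        data.get("jobs-running", 0),
    )


def fetch_overview(base_url="http://localhost:8081", timeout=5):
    """GET {base_url}/overview and return the response body as text."""
    with urllib.request.urlopen(base_url + "/overview", timeout=timeout) as resp:
        return resp.read().decode("utf-8")


if __name__ == "__main__":
    try:
        print(summarize_overview(fetch_overview()))
    except (OSError, ValueError) as err:  # URLError is an OSError; bad JSON is a ValueError
        print("cluster not reachable:", err)
```

A TaskManager count of 0 usually means the standalone session is up but the taskexecutor did not start, which is a cue to look in ./log/.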
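Build, Package, and Submit a Job

 The flink-training-exercises project was already built above.
 A single jar here contains many jobs, and the -c option selects the entry class to run. Production jobs can be built and packaged the same way, which works well.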

mvn clean package

./bin/flink run -c \
com.dataartisans.flinktraining.examples.datastream_java.basics.RideCount \
~/flink-training-exercises/target/flink-training-exercises-2.7.1.jar

 If your code changes break the checkstyle rules, comment out the maven-checkstyle-plugin in pom.xml before building and packaging.
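Since one jar holds many jobs, it helps to see which classes -c could point at. The sketch below, assuming the package layout used in the command above, lists top-level classes under a package by reading the jar as a zip archive and prints a matching `flink run -c` command for each. It lists candidates only; it cannot tell which classes actually define a `main` method. The function names and the jar path in the `__main__` block are assumptions.

```python
import os
import zipfile


def list_classes(jar_path, package):
    """Return fully qualified top-level class names in `package` (and sub-packages)."""
    prefix = package.replace(".", "/") + "/"
    names = []
    with zipfile.ZipFile(jar_path) as jar:
        for entry in jar.namelist():
            # Skip non-class files and inner/anonymous classes such as Foo$1.class.
            if entry.startswith(prefix) and entry.endswith(".class") and "$" not in entry:
                names.append(entry[: -len(".class")].replace("/", "."))
    return sorted(names)


def flink_run_command(entry_class, jar_path, flink_bin="./bin/flink"):
    """Build the argv that submits one job from a multi-job jar."""
    return [flink_bin, "run", "-c", entry_class, jar_path]


if __name__ == "__main__":
    jar = os.path.expanduser(
        "~/flink-training-exercises/target/flink-training-exercises-2.7.1.jar")
    if os.path.exists(jar):
        pkg = "com.dataartisans.flinktraining.examples.datastream_java.basics"
        for cls in list_classes(jar, pkg):
            print(" ".join(flink_run_command(cls, jar)))
```

Returning an argv list rather than a string keeps it directly usable with `subprocess.run` if you later want to submit from a script.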

Stop the cluster:
./bin/stop-cluster.sh

Using the SQL Client

Edit the sql-client-config.yaml file

 The file is located in the flink-training-exercises directory.

tables:
  - name: TaxiRides
    type: source-table
    update-mode: append
    connector:
      type: taxi-rides
      path: "/Users/shaozhipeng/Resources/2019/trainingData/nycTaxiRides.gz"
      max-event-delay-secs: 60
      serving-speed-factor: 1800
  - name: TaxiFares
    type: source-table
    update-mode: append
    connector:
      type: taxi-fares
      path: "/Users/shaozhipeng/Resources/2019/trainingData/nycTaxiFares.gz"
      max-event-delay-secs: 60
      serving-speed-factor: 1800
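The paths in this config are absolute and specific to the author's machine. Rather than editing them by hand, you could generate the file for your own data directory. This is a minimal sketch, assuming the same two append-mode source tables and connector settings as above; `render_config` and the `~/trainingData` location are hypothetical, and the text is built with plain string formatting so no YAML library is needed.

```python
import os

# One list entry under `tables:`; indentation matters in YAML.
TABLE_TEMPLATE = """\
  - name: {name}
    type: source-table
    update-mode: append
    connector:
      type: {connector}
      path: "{path}"
      max-event-delay-secs: {delay}
      serving-speed-factor: {speed}
"""


def render_config(data_dir, delay=60, speed=1800):
    """Return YAML text defining the TaxiRides and TaxiFares source tables."""
    tables = [
        ("TaxiRides", "taxi-rides", "nycTaxiRides.gz"),
        ("TaxiFares", "taxi-fares", "nycTaxiFares.gz"),
    ]
    parts = ["tables:\n"]
    for name, connector, filename in tables:
        path = os.path.join(os.path.abspath(data_dir), filename)
        # Forward slashes avoid backslash escapes inside double-quoted YAML strings.
        path = path.replace("\\", "/")
        parts.append(TABLE_TEMPLATE.format(
            name=name, connector=connector, path=path, delay=delay, speed=speed))
    return "".join(parts)


if __name__ == "__main__":
    print(render_config(os.path.expanduser("~/trainingData")))  # hypothetical location
```

Redirecting the output to sql-client-config.yaml gives a file with the same structure as the one above, pointing at your own copies of the datasets.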

Start the local Flink cluster

$ ./bin/start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host localhost.
Starting taskexecutor daemon on host localhost.

Start the SQL client

$ /Users/shaozhipeng/Development/flink-1.7.2/bin/sql-client.sh embedded --jar  target/flink-training-exercises-2.6.0.jar -e sql-client-config.yaml
No default environment specified.
Searching for '/Users/shaozhipeng/Development/flink-1.7.2/conf/sql-client-defaults.yaml'...found.
Reading default environment from: file:/Users/shaozhipeng/Development/flink-1.7.2/conf/sql-client-defaults.yaml
Reading session environment from: file:/Users/shaozhipeng/Development/project/java/flink-training-exercises/sql-client-config.yaml
Validating current environment...done.


Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.


Flink SQL> SHOW TABLES;
TaxiFares
TaxiRides

Flink SQL> DESCRIBE TaxiRides;
root
|-- rideId: Long
|-- taxiId: Long
|-- driverId: Long
|-- isStart: Boolean
|-- startLon: Float
|-- startLat: Float
|-- endLon: Float
|-- endLat: Float
|-- passengerCnt: Short
|-- eventTime: TimeIndicatorTypeInfo(rowtime)
邵志鹏 wechat