Learning notes: turning Kafka stream data into structured data with PipelineDB

Install from the RPM package

pipelinedb-0.9.7u5-centos6-x86_64.rpm

$ sudo rpm -ivh pipelinedb-0.9.7u5-centos6-x86_64.rpm
[sudo] password for hadoop:
Preparing... ########################################### [100%]
1:pipelinedb ########################################### [100%]

____ _ ___ ____ ____
/ __ \(_)___ ___ / (_)___ ___ / __ \/ __ )
/ /_/ / / __ \/ _ \/ / / __ \/ _ \/ / / / __ |
/ ____/ / /_/ / __/ / / / / / __/ /_/ / /_/ /
/_/ /_/ .___/\___/_/_/_/ /_/\___/_____/_____/
/_/

PipelineDB successfully installed. To get started, initialize a
database directory:

pipeline-init -D <data directory>

where <data directory> is a nonexistent directory where you'd
like all of your database files to live.

You can find the PipelineDB documentation at:

http://docs.pipelinedb.com

Initialize the data directory as the installer suggests

$ pipeline-init -D pipe_data
The files belonging to this database system will be owned by user "hadoop".
This user must also own the server process.

The database cluster will be initialized with locale "zh_CN.UTF-8".
The default database encoding has accordingly been set to "UTF8".
pipeline-init: could not find suitable text search configuration for locale "zh_CN.UTF-8"
The default text search configuration will be set to "simple".

Data page checksums are disabled.

creating directory pipe_data ... ok
creating subdirectories ... ok
selecting default max_connections ... 100
selecting default shared_buffers ... 128MB
selecting dynamic shared memory implementation ... posix
creating configuration files ... ok
creating template1 database in pipe_data/base/1 ... ok
initializing pg_authid ... ok
initializing dependencies ... ok
creating system views ... ok
loading system objects' descriptions ... ok
creating collations ... ok
creating conversions ... ok
creating dictionaries ... ok
setting privileges on built-in objects ... ok
creating information schema ... ok
loading PL/pgSQL server-side language ... ok
loading PipelineDB ... ok
vacuuming database template1 ... ok
copying template1 to template0 ... ok
copying template1 to pipeline ... ok
syncing data to disk ... ok

WARNING: enabling "trust" authentication for local connections
You can change this by editing pg_hba.conf or using the option -A, or
--auth-local and --auth-host, the next time you run initdb.

Success. You can now start the database server using:

pipeline-ctl -D pipe_data -l logfile start

Start PipelineDB

pipeline-ctl -D pipe_data -l logfile start

Stop PipelineDB

pipeline-ctl -D pipe_data -l logfile stop

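Edit the PipelineDB configuration files

Allow access from specific IP addresses (pg_hba.conf)
Set the service IP address and port (pipelinedb.conf)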

$ vi pg_hba.conf
host all all 192.168.99.1/24 trust
$ vi pipelinedb.conf
listen_addresses = '192.168.99.100' # what IP address(es) to listen on;
# comma-separated list of addresses;
# defaults to 'localhost'; use '*' for all
# (change requires restart)
port = 5432 # (change requires restart)
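
The two manual edits above can also be scripted, which helps when the same setup has to be repeated on several machines. The following is a minimal sketch, not a PipelineDB tool: the function name configure_pipelinedb is hypothetical, and it assumes that pg_hba.conf and pipelinedb.conf both sit in the data directory created by pipeline-init.

```shell
#!/bin/sh
# Sketch only: configure_pipelinedb is a hypothetical helper, not part of PipelineDB.
# Assumes pg_hba.conf and pipelinedb.conf both live in the data directory.
configure_pipelinedb() {
    data_dir=$1      # e.g. pipe_data
    listen_ip=$2     # e.g. 192.168.99.100
    client_cidr=$3   # e.g. 192.168.99.1/24
    hba="$data_dir/pg_hba.conf"
    conf="$data_dir/pipelinedb.conf"

    # Add the client rule only if that exact line is not already present.
    hba_line="host all all $client_cidr trust"
    grep -qxF "$hba_line" "$hba" 2>/dev/null || printf '%s\n' "$hba_line" >> "$hba"

    # Rewrite an existing listen_addresses line, commented out or not.
    sed "s|^#\{0,1\}[[:space:]]*listen_addresses[[:space:]]*=.*|listen_addresses = '$listen_ip'|" \
        "$conf" > "$conf.tmp" && cat "$conf.tmp" > "$conf" && rm -f "$conf.tmp"

    # If the file had no such line at all, append one.
    grep -q "^listen_addresses" "$conf" || printf "listen_addresses = '%s'\n" "$listen_ip" >> "$conf"
}
```

With the values used in this post, the call would be configure_pipelinedb pipe_data 192.168.99.100 192.168.99.1/24. Running it twice leaves a single rule and a single listen_addresses line, and a restart is still needed for the change to take effect.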

Restart PipelineDB

pipeline-ctl -D pipe_data -l logfile restart

Connect with psql, create a user, and grant it the superuser role

$ psql -p 5432 -h bigdata1 pipeline
psql (9.5.3)
Type "help" for help.

pipeline=# create user pipeline password 'pipeline' ;
CREATE ROLE
pipeline=# alter role pipeline superuser;
ALTER ROLE

Quit psql and list databases

pipeline-# \q    (quit psql)
pipeline=# \l    (list databases)
pipeline=# \l
                                List of databases
   Name    | Owner  | Encoding |   Collate   |    Ctype    | Access privileges
-----------+--------+----------+-------------+-------------+-------------------
 pipeline  | hadoop | UTF8     | zh_CN.UTF-8 | zh_CN.UTF-8 |
 template0 | hadoop | UTF8     | zh_CN.UTF-8 | zh_CN.UTF-8 | =c/hadoop        +
           |        |          |             |             | hadoop=CTc/hadoop
 template1 | hadoop | UTF8     | zh_CN.UTF-8 | zh_CN.UTF-8 | =c/hadoop        +
           |        |          |             |             | hadoop=CTc/hadoop
(3 rows)

Download and install the librdkafka library

git clone -b 0.9.1 https://github.com/edenhill/librdkafka.git ~/librdkafka
cd ~/librdkafka
./configure --prefix=/usr
make
sudo make install

Connect to PipelineDB with the Navicat client

Choose PostgreSQL as the connection type.
More information: pipeline_kafka

-- Load the extension into the pipeline database
CREATE EXTENSION pipeline_kafka;
-- Remove and add Kafka brokers
SELECT pipeline_kafka.remove_broker('192.168.99.100:9092,192.168.99.101:9092,192.168.99.102:9092');
SELECT pipeline_kafka.add_broker('<internal-ip1>:9092,<internal-ip2>:9092,<internal-ip3>:9092');

-- Drop the view and the stream (only needed when re-creating them)
DROP CONTINUOUS VIEW msg_result;
DROP STREAM msg_stream;

-- Create the stream and the view
CREATE STREAM msg_stream (msg varchar);
CREATE CONTINUOUS VIEW msg_result AS SELECT msg FROM msg_stream;

-- Start the Kafka consumer
SELECT pipeline_kafka.consume_begin('SearchLog', 'msg_stream',
    format := 'text', group_id := 'PipelineDB-1', batchsize := 1000,
    maxbytes := 32000000, parallelism := 1, start_offset := 0);

-- Query the data (result: 347126)
SELECT count(*) FROM msg_result WHERE msg LIKE '%,"create_time":"2017-07-11%';

-- Stop the Kafka consumer
SELECT pipeline_kafka.consume_end('SearchLog', 'msg_stream');
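
The msg_result view above keeps every raw message and the query filters it with LIKE, so the data is not really structured yet. A possible next step is a continuous view that extracts a field and aggregates on it. The sketch below only prints the SQL for such a view. It rests on assumptions that are not confirmed by the setup above: every Kafka message is a valid JSON object, its "create_time" value is a string beginning with the date (as the LIKE pattern suggests), and the view name msg_daily_count and the function name daily_count_view_sql are made up for illustration.

```shell
#!/bin/sh
# Sketch only: prints SQL for a hypothetical aggregating view, msg_daily_count.
# Assumes every Kafka message is a valid JSON object whose "create_time"
# string starts with the date (YYYY-MM-DD), as the LIKE filter above suggests.
daily_count_view_sql() {
    cat <<'SQL'
CREATE CONTINUOUS VIEW msg_daily_count AS
  SELECT left(msg::json->>'create_time', 10) AS create_day,
         count(*) AS total
  FROM msg_stream
  GROUP BY left(msg::json->>'create_time', 10);
SQL
}

# Review the statement first; to apply it, pipe it into psql, for example:
#   daily_count_view_sql | psql -p 5432 -h bigdata1 pipeline
daily_count_view_sql
```

A continuous view only processes events that arrive after it is created, so it would have to exist before consume_begin is called. If the assumptions hold, SELECT total FROM msg_daily_count WHERE create_day = '2017-07-11' would replace the LIKE scan over raw messages.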
Author: 邵志鹏