Accessing Elasticsearch Data with PySpark

Documentation

https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html
https://stackoverflow.com/questions/42114519/inserting-arrays-in-elasticsearch-via-pyspark

Versions used for testing

elasticsearch-2.4.4
spark-1.6.2

JAR package

elasticsearch-hadoop-2.4.4.jar

https://www.elastic.co/downloads/past-releases
http://download.elastic.co/hadoop/elasticsearch-hadoop-2.4.4.zip
elasticsearch-hadoop-2.4.4/dist/elasticsearch-hadoop-2.4.4.jar
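When the script is started with plain `python` rather than `spark-submit --jars ...`, the connector JAR still has to reach the Spark classpath. One common way is the `PYSPARK_SUBMIT_ARGS` environment variable, set before the first SparkContext is created. The sketch below is only an illustration: the JAR location `/usr/local/jars/` and the helper name `build_submit_args` are assumptions, not part of the original setup.

```python
import os

# Hypothetical location of the extracted connector JAR; adjust to your machine.
ES_HADOOP_JAR = "/usr/local/jars/elasticsearch-hadoop-2.4.4.jar"


def build_submit_args(jars):
    """Build a PYSPARK_SUBMIT_ARGS value that passes the given JARs via --jars.

    The trailing "pyspark-shell" token is expected by PySpark when it
    launches the JVM gateway itself.
    """
    return "--jars {} pyspark-shell".format(",".join(jars))


# Must be set before SparkConf/SparkContext is created in the same process.
os.environ["PYSPARK_SUBMIT_ARGS"] = build_submit_args([ES_HADOOP_JAR])
```

If the job is launched through `spark-submit` instead, passing the same path with `--jars` on the command line should have the same effect.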

Test code and output

# -*- coding: utf-8 -*-
import os

os.environ["SPARK_HOME"] = "/usr/local/spark-1.6.2-bin-hadoop2.6/"
os.environ["PYSPARK_PYTHON"] = "/usr/local/bin/python3.5"

from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql.types import *

# SparkConf
conf = (SparkConf().setAppName('pyspark-es-test'))
# SparkContext
sc = SparkContext(conf=conf)
# SQLContext
sqlctx = SQLContext(sc)

# save data to elasticsearch
schema = StructType([
    StructField("id", StringType(), True),
    StructField("uname", StringType(), True)
])

data = [('1', 'ss'), ('2', 'dd'), ('3', 'ff_update'), ('4', '嘻嘻嘻')]

# Pass the explicit schema so column names and types are fixed up front
save_df = sqlctx.createDataFrame(data, schema)
save_df.show(truncate=False)

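# Saving the same data repeatedly: overwrite vs. append
# mode('overwrite') wipes all existing data and then writes the new data.
# mode('append') is recommended: with es.mapping.id set, existing documents whose
# data changed are updated, new documents are inserted, and unchanged documents
# are unaffected. Keeping the data volume under control may then require
# separate incremental deletes.
save_df.write \
    .format("org.elasticsearch.spark.sql") \
    .option("es.nodes", "192.168.0.101,192.168.0.102,192.168.0.103") \
    .option("es.resource", "pyspark_es/data") \
    .option("es.mapping.id", "id") \
    .mode('append') \
    .save()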

# read data from elasticsearch
df = sqlctx.read \
    .format("org.elasticsearch.spark.sql") \
    .option("es.nodes", "192.168.0.101,192.168.0.102,192.168.0.103") \
    .option("es.resource", "pyspark_es/data") \
    .load()

df.registerTempTable('pyspark_es_tmp')

test_df = sqlctx.sql("select * from pyspark_es_tmp order by id desc limit 10")

test_df.show()

Execution result

+---+---------+
|id |uname    |
+---+---------+
|1  |ss       |
|2  |dd       |
|3  |ff_update|
|4  |嘻嘻嘻      |
+---+---------+

+---+---------+
| id|    uname|
+---+---------+
|  4|      嘻嘻嘻|
|  3|ff_update|
|  2|       dd|
|  1|       ss|
+---+---------+
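The difference between the two save modes described in the code comments can be pictured with a toy model in plain Python. This is not connector code: the index is modeled as a dict keyed by document id, the function names `overwrite` and `append` are made up for the illustration, and the pre-existing documents are invented sample data.

```python
def overwrite(existing, rows):
    """Toy model of mode('overwrite'): drop everything, then write the new rows."""
    return {row["id"]: row for row in rows}


def append(existing, rows):
    """Toy model of mode('append') with es.mapping.id: upsert by document id."""
    merged = dict(existing)
    for row in rows:
        merged[row["id"]] = row  # same id replaces the old document, new id inserts
    return merged


# Invented sample state: two documents already stored in the index.
index = {
    "1": {"id": "1", "uname": "ss"},
    "3": {"id": "3", "uname": "ff"},
}
# New batch: id 3 has changed, id 4 is new, id 1 is not part of the batch.
batch = [
    {"id": "3", "uname": "ff_update"},
    {"id": "4", "uname": "嘻嘻嘻"},
]

after_append = append(index, batch)
after_overwrite = overwrite(index, batch)

print(sorted(after_append))     # ['1', '3', '4']
print(sorted(after_overwrite))  # ['3', '4']
```

In this model, append keeps document 1, updates document 3, and adds document 4, while overwrite leaves only what was in the latest batch.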

Checking the data in Elasticsearch

[Screenshot of the indexed data in Elasticsearch]
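The stored documents can also be checked from Python through the Elasticsearch REST search endpoint. The following is a minimal sketch, assuming the HTTP API listens on the default port 9200 on the first node used above; the helper names `search_url` and `fetch_hits` are hypothetical.

```python
import json
from urllib.request import urlopen


def search_url(node, resource, size=10, port=9200):
    """Build the _search URL for an index/type resource such as 'pyspark_es/data'."""
    return "http://{}:{}/{}/_search?size={}".format(node, port, resource, size)


def fetch_hits(url):
    """Run the search and return the _source of every hit (needs a reachable cluster)."""
    with urlopen(url) as resp:
        body = json.loads(resp.read().decode("utf-8"))
    return [hit["_source"] for hit in body["hits"]["hits"]]


url = search_url("192.168.0.101", "pyspark_es/data")
print(url)

# Uncomment when the cluster is reachable:
# for doc in fetch_hits(url):
#     print(doc)
```

Each returned `_source` should correspond to one row written by the DataFrame, with the `id` column also used as the document id.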

邵志鹏 wechat