Accessing MongoDB data from pyspark via mongo-hadoop-spark

mongo-hadoop-spark-1.5.2.jar
pymongo-spark
pymongo

wget http://central.maven.org/maven2/org/mongodb/mongo-hadoop/mongo-hadoop-spark/1.5.2/mongo-hadoop-spark-1.5.2.jar
wget https://github.com/mongodb/mongo-hadoop/archive/r1.5.2.tar.gz

tar zxvf r1.5.2.tar.gz
cd mongo-hadoop-r1.5.2/spark/src/main/python
python3.5 setup.py install
pip3.5 install pymongo
Requirement already satisfied…
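
With the jar downloaded and pymongo-spark installed, reading a collection as an RDD of dicts looks roughly like the following. This is a minimal sketch: the jar path, credentials, and database.collection names are placeholders, and the mongo-hadoop-spark jar is put on the classpath through PYSPARK_SUBMIT_ARGS.

# -*- coding: utf-8 -*-
import os

# Assumption: all paths below are illustrative; adjust to your installation.
os.environ["SPARK_HOME"] = "/usr/local/spark-1.6.2-bin-hadoop2.6/"
os.environ["PYSPARK_PYTHON"] = "/usr/local/bin/python3.5"
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--jars /usr/local/jars/mongo-hadoop-spark-1.5.2.jar "
    "--driver-class-path /usr/local/jars/mongo-hadoop-spark-1.5.2.jar "
    "pyspark-shell")

from pyspark import SparkContext
import pymongo_spark

# Adds the mongoRDD/saveToMongoDB helpers to SparkContext and RDD.
pymongo_spark.activate()

sc = SparkContext(appName="pymongo-spark-read-test")

# mongodb://username:password@host:port/database.collection
rdd = sc.mongoRDD("mongodb://root:123456@127.0.0.1:27017/test_db.test_collection")
print(rdd.first())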

Documentation

https://docs.mongodb.com/spark-connector/v1.1/python-api/
mongo-spark-connector repository:
http://maven-repository.com/artifact/org.mongodb.spark/mongo-spark-connector_2.10
Mongo-Spark vs. Spark version compatibility:

+-------------+--------+
| Mongo-Spark | Spark  |
+-------------+--------+
| 2.2.1       | 2.2.x  |
| 2.1.1       | 2.1.x  |
| 1.1.0       | 1.6.x  |
+-------------+--------+

Versions used for testing

mongodb 3.4.6
spark 1.6.2
scala 2.10.x, hence mongo-spark-connector_2.10

Jar packages

mongo-java-driver-3.6.2.jar http://central.maven.org/maven2/org/mongodb/mongo-java-driver/3.6.2/mongo-java-driver-3.6.2.jar
mongo-spark-connector_2.10-0.1.jar fails with a Date conversion error: java.lang.ClassCastException: java.util.Date cannot be cast to java.sql.Date
mongo-spark-connector_2.10-1.1.0.jar works; just use 1.1.0, the last release matching Spark 1.6.x
http://repo1.maven.org/maven2/org/mongodb/spark/mongo-spark-connector_2.10/1.1.0/mongo-spark-connector_2.10-1.1.0.jar
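
Both jars have to be visible to the driver and the executors before the SparkContext is created. A minimal sketch, assuming the jars were downloaded to /usr/local/jars (a hypothetical path), using PYSPARK_SUBMIT_ARGS as in the test script below:

import os

# Assumption: /usr/local/jars is illustrative; point at wherever the jars live.
jars = ",".join([
    "/usr/local/jars/mongo-spark-connector_2.10-1.1.0.jar",
    "/usr/local/jars/mongo-java-driver-3.6.2.jar",
])
# --jars ships the jars to executors; --driver-class-path exposes them
# to the driver JVM (colon-separated).
os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars {0} --driver-class-path {1} pyspark-shell".format(
    jars, jars.replace(",", ":"))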

Test code and output

# -*- coding: utf-8 -*-
import os

os.environ["SPARK_HOME"] = "/usr/local/spark-1.6.2-bin-hadoop2.6/"
os.environ["PYSPARK_PYTHON"]="/usr/local/bin/python3.5"

from pyspark import SparkContext, SparkConf, SQLContext

# mongodb://username:password@ip:port
# ip:port, or a domain name
uri = "mongodb://root:123456@127.0.0.1:27017"
# database name
database = "test_db"
# collection name
collection = "test_collection"

# SparkConf
conf = (SparkConf().setAppName('pyspark-mongodb-test'))

# MongoDB connector input configuration
conf.set("spark.mongodb.input.uri", uri)
conf.set("spark.mongodb.input.database", database)
conf.set("spark.mongodb.input.collection", collection)

# SparkContext
sc = SparkContext(conf=conf)
# SQLContext
sqlctx = SQLContext(sc)

# read
df = sqlctx.read.format("com.mongodb.spark.sql.DefaultSource").load()

# collection schema
df.printSchema()

df.registerTempTable("test_collection")

# Test aggregation
test_df = sqlctx.sql("""
select
sum(funnelInActivate) sum_funnelInActivate,
sum(funnelInActivateRate) sum_funnelInActivateRate,
sum(funnelInAdd) sum_funnelInAdd,
sum(funnelInDealRate) sum_funnelInDealRate
from test_collection
""")

test_df.show()

# Test date handling
test_df2 = sqlctx.sql("""
select
concat(addDate, '') concat_addDate_test
from test_collection limit 10
""")

test_df2.show()

# Combined test: daily sums grouped by date
test_df3 = sqlctx.sql("""
select
substring(addDate, 1, 10) addDate,
sum(funnelInActivate) sum_funnelInActivate,
sum(funnelInActivateRate) sum_funnelInActivateRate,
sum(funnelInAdd) sum_funnelInAdd,
sum(funnelInDealRate) sum_funnelInDealRate
from test_collection where addDate>='2018-02-01'
group by substring(addDate, 1, 10)
""")

test_df3.show()
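
For the reverse direction, the v1.1 python-api docs linked above write through the same DefaultSource. A sketch, where test_collection_out is a hypothetical target; the output options go on the SparkConf next to the input ones, i.e. before the SparkContext is created:

# Output settings, added alongside the input settings above,
# before SparkContext is created.
conf.set("spark.mongodb.output.uri", uri)
conf.set("spark.mongodb.output.database", database)
conf.set("spark.mongodb.output.collection", "test_collection_out")  # hypothetical

# ... then, after building test_df3 as above:
test_df3.write.format("com.mongodb.spark.sql.DefaultSource").mode("overwrite").save()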

Execution results

root
|-- __v: integer (nullable = true)
|-- _id: string (nullable = true)
|-- addDate: date (nullable = true)
|-- authCode: string (nullable = true)
|-- funnelInActivate: integer (nullable = true)
|-- funnelInActivateRate: integer (nullable = true)
|-- funnelInAdd: integer (nullable = true)
|-- funnelInDealRate: integer (nullable = true)

+--------------------+------------------------+---------------+--------------------+
|sum_funnelInActivate|sum_funnelInActivateRate|sum_funnelInAdd|sum_funnelInDealRate|
+--------------------+------------------------+---------------+--------------------+
| 6| 4| 9| 2|
+--------------------+------------------------+---------------+--------------------+

+--------------------+
| concat_addDate_test|
+--------------------+
|2018-02-05 16:45:...|
|2018-02-05 16:49:...|
|2018-02-05 16:50:...|
|2018-02-05 16:52:...|
|2018-02-05 16:55:...|
|2018-02-05 17:20:...|
|2018-02-05 17:25:...|
|2018-02-05 17:31:...|
|2018-02-05 17:36:...|
|2018-02-05 17:53:...|
+--------------------+

+----------+--------------------+------------------------+---------------+--------------------+
| addDate|sum_funnelInActivate|sum_funnelInActivateRate|sum_funnelInAdd|sum_funnelInDealRate|
+----------+--------------------+------------------------+---------------+--------------------+
|2018-02-20| 0| 0| 0| 0|
|2018-02-21| 0| 0| 0| 0|
|2018-02-22| 1| 1| 2| 0|
|2018-02-23| 0| 0| 1| 0|
|2018-02-24| 0| 0| 3| 0|
|2018-02-25| 0| 1| 1| 0|
|2018-02-26| 0| 0| 0| 0|
|2018-02-05| 2| 2| 1| 2|
|2018-02-06| 1| 0| 4| 0|
|2018-02-07| 0| 0| 0| 0|
|2018-02-08| 0| 0| 0| 0|
|2018-02-09| 0| 0| 0| 0|
|2018-02-10| 1| 1| 0| 0|
|2018-02-11| 1| 0| 1| 0|
|2018-02-12| 0| 0| 0| 0|
|2018-02-13| 0| 0| 0| 0|
|2018-02-14| 0| 0| 0| 0|
|2018-02-18| 0| 0| 0| 0|
|2018-02-19| 0| 0| 0| 0|
+----------+--------------------+------------------------+---------------+--------------------+
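
The third query can also be written with the DataFrame API instead of SQL; an equivalent sketch, with column names taken from the schema above:

from pyspark.sql import functions as F

# Equivalent of the grouped-sum SQL above, using the DataFrame API.
daily = (df
    .where(F.col("addDate") >= "2018-02-01")
    .groupBy(F.substring(F.col("addDate").cast("string"), 1, 10).alias("addDate"))
    .agg(F.sum("funnelInActivate").alias("sum_funnelInActivate"),
         F.sum("funnelInActivateRate").alias("sum_funnelInActivateRate"),
         F.sum("funnelInAdd").alias("sum_funnelInAdd"),
         F.sum("funnelInDealRate").alias("sum_funnelInDealRate")))
daily.show()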

The same setup also works with mongo-hadoop-core:

mongo-hadoop-core-1.5.2.jar
pymongo-spark
pymongo

wget http://central.maven.org/maven2/org/mongodb/mongo-hadoop/mongo-hadoop-core/1.5.2/mongo-hadoop-core-1.5.2.jar
wget https://github.com/mongodb/mongo-hadoop/archive/r1.5.2.tar.gz

tar zxvf r1.5.2.tar.gz
cd mongo-hadoop-r1.5.2/spark/src/main/python
python3.5 setup.py install
pip3.5 install pymongo
Requirement already satisfied…
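
pymongo-spark also patches RDDs with a save helper, so writing back works symmetrically. A sketch with placeholder documents and a hypothetical target collection:

from pyspark import SparkContext
import pymongo_spark

pymongo_spark.activate()
sc = SparkContext(appName="pymongo-spark-save-test")

# Assumption: the documents and the test_db.test_out collection are illustrative.
docs = sc.parallelize([{"x": 1}, {"x": 2}])
docs.saveToMongoDB("mongodb://root:123456@127.0.0.1:27017/test_db.test_out")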
