Accessing HBase from Spark as an External Data Source, and Writing Structured Streaming Data with a Custom HBase Sink

shc: Apache Spark - Apache HBase Connector.
shc lets Spark access HBase: through shc, HBase is exposed as an external data source that Spark can load and query, and Spark can also store DataFrame/Dataset data into HBase [Sink]. A minimal read/write sketch follows the list below.
A custom class implementing StreamSinkProvider can store data processed by Structured Streaming into HBase. There are two ways to drive it:

  1. writeStream in complete mode: every trigger recomputes over the full data and inserts (updates) the result into HBase. [The data volume keeps growing, so the Spark computation becomes heavy and slow.]
  2. writeStream in append mode: the custom Sink first queries HBase, adds the existing value to the new one, then inserts (updates) the sum, similar to Spark Streaming micro-batching. [The data volume stays manageable; HBase is queried frequently, but with little impact.]
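
For orientation, here is a minimal batch read/write sketch with shc. The table "demo_table", column family "cf", column names and sample values in the catalog JSON are made up for illustration. Writing goes through DefaultSource's second createRelation overload and loading through the first (both are shown in the source walkthrough below); the fully qualified format string resolves to org.apache.spark.sql.execution.datasources.hbase.DefaultSource.

// Minimal sketch, not production code: batch write and read through shc.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog

object ShcBatchDemo {
  // Hypothetical catalog mapping DataFrame columns to the HBase row key / columns.
  val catalog: String =
    s"""{
       |  "table": {"namespace": "default", "name": "demo_table"},
       |  "rowkey": "key",
       |  "columns": {
       |    "key":   {"cf": "rowkey", "col": "key",   "type": "string"},
       |    "total": {"cf": "cf",     "col": "total", "type": "string"}
       |  }
       |}""".stripMargin

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("shc-batch-demo").getOrCreate()
    import spark.implicits._

    // Write: ends up in DefaultSource.createRelation(sqlContext, mode, parameters, data).
    Seq(("u1", "10"), ("u2", "20")).toDF("key", "total")
      .write
      .options(Map(HBaseTableCatalog.tableCatalog -> catalog,
        HBaseTableCatalog.newTable -> "5"))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .save()

    // Read: ends up in DefaultSource.createRelation(sqlContext, parameters).
    spark.read
      .options(Map(HBaseTableCatalog.tableCatalog -> catalog))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .load()
      .show()
  }
}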

Reading the shc source

git clone https://github.com/hortonworks-spark/shc.git
Open the project in IntelliJ IDEA.

HBaseRelation.scala
class DefaultSource

class DefaultSource mixes in RelationProvider and CreatableRelationProvider
and overrides both createRelation methods.

private[sql] class DefaultSource extends RelationProvider with CreatableRelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation = {
    HBaseRelation(parameters, None)(sqlContext)
  }

  override def createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      data: DataFrame): BaseRelation = {
    val relation = HBaseRelation(parameters, Some(data.schema))(sqlContext)
    relation.createTableIfNotExist()
    relation.insert(data, false)
    relation
  }
}

trait RelationProvider
Note the "For example" part of the Scaladoc below.
When the format option is set to org.apache.spark.sql.execution.datasources.hbase, Spark resolves the data source to org.apache.spark.sql.execution.datasources.hbase.DefaultSource.

/**
 * Implemented by objects that produce relations for a specific kind of data source. When
 * Spark SQL is given a DDL operation with a USING clause specified (to specify the implemented
 * RelationProvider), this interface is used to pass in the parameters specified by a user.
 *
 * Users may specify the fully qualified class name of a given data source. When that class is
 * not found Spark SQL will append the class name `DefaultSource` to the path, allowing for
 * less verbose invocation. For example, 'org.apache.spark.sql.json' would resolve to the
 * data source 'org.apache.spark.sql.json.DefaultSource'
 *
 * A new instance of this class will be instantiated each time a DDL call is made.
 *
 * @since 1.3.0
 */
@InterfaceStability.Stable
trait RelationProvider {
  /**
   * Returns a new base relation with the given parameters.
   *
   * @note The parameters' keywords are case insensitive and this insensitivity is enforced
   * by the Map that is passed to the function.
   */
  def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation
}

trait CreatableRelationProvider

/**
 * @since 1.3.0
 */
@InterfaceStability.Stable
trait CreatableRelationProvider {
  /**
   * Saves a DataFrame to a destination (using data source-specific parameters)
   *
   * @param sqlContext SQLContext
   * @param mode specifies what happens when the destination already exists
   * @param parameters data source-specific parameters
   * @param data DataFrame to save (i.e. the rows after executing the query)
   * @return Relation with a known schema
   *
   * @since 1.3.0
   */
  def createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      data: DataFrame): BaseRelation
}

Giving a data source an alias
Mix in DataSourceRegister and return a single string from shortName().

/**
 * Data sources should implement this trait so that they can register an alias to their data source.
 * This allows users to give the data source alias as the format type over the fully qualified
 * class name.
 *
 * A new instance of this class will be instantiated each time a DDL call is made.
 *
 * @since 1.5.0
 */
@InterfaceStability.Stable
trait DataSourceRegister {

  /**
   * The string that represents the format that this data source provider uses. This is
   * overridden by children to provide a nice alias for the data source. For example:
   *
   * {{{
   *   override def shortName(): String = "parquet"
   * }}}
   *
   * @since 1.5.0
   */
  def shortName(): String
}
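
As a concrete illustration of the registration step, using the DemoSinkProvider defined later in this post and its alias "demosink": list the fully qualified provider class in a service file on the classpath, and the alias becomes usable as a format name.

# src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
streaming.DemoSinkProvider

With that file in place, format("demosink") can be used instead of format("streaming.DemoSinkProvider").
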
class HBaseRelation

class HBaseRelation extends BaseRelation and mixes in PrunedFilteredScan, InsertableRelation and Logging.
BaseRelation is an abstract class that (among other members) defines unhandledFilters.

case class HBaseRelation(
    parameters: Map[String, String],
    userSpecifiedschema: Option[StructType]
  )(@transient val sqlContext: SQLContext)
  extends BaseRelation with PrunedFilteredScan with InsertableRelation with Logging {

  // insert
  // unhandledFilters
  // buildScan
}

trait PrunedFilteredScan

/**
 * A BaseRelation that can eliminate unneeded columns and filter using selected
 * predicates before producing an RDD containing all matching tuples as Row objects.
 *
 * The actual filter should be the conjunction of all `filters`,
 * i.e. they should be "and" together.
 *
 * The pushed down filters are currently purely an optimization as they will all be evaluated
 * again. This means it is safe to use them with methods that produce false positives such
 * as filtering partitions based on a bloom filter.
 *
 * @since 1.3.0
 */
@InterfaceStability.Stable
trait PrunedFilteredScan {
  def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
}
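
To make the contract concrete, here is a minimal sketch (not from shc) of an in-memory relation implementing PrunedFilteredScan. It honors column pruning and ignores the pushed-down filters, which is safe because, as the Scaladoc above notes, Spark evaluates the filters again. All names here are hypothetical.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Hypothetical relation over an in-memory Seq, for illustration only.
case class DemoScanRelation(rows: Seq[(String, Long)])(@transient val sqlContext: SQLContext)
  extends BaseRelation with PrunedFilteredScan {

  override def schema: StructType = StructType(Seq(
    StructField("key", StringType),
    StructField("total", LongType)))

  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
    // Column pruning: keep only the requested fields, in the requested order.
    // Filters are ignored here; Spark applies them again on the returned rows.
    val indices = requiredColumns.map(schema.fieldIndex)
    sqlContext.sparkContext.parallelize(rows).map { case (k, v) =>
      val full = Array[Any](k, v)
      Row.fromSeq(indices.map(full(_)).toSeq)
    }
  }
}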

trait InsertableRelation

/**
 * A BaseRelation that can be used to insert data into it through the insert method.
 * If overwrite in insert method is true, the old data in the relation should be overwritten with
 * the new data. If overwrite in insert method is false, the new data should be appended.
 *
 * InsertableRelation has the following three assumptions.
 * 1. It assumes that the data (Rows in the DataFrame) provided to the insert method
 * exactly matches the ordinal of fields in the schema of the BaseRelation.
 * 2. It assumes that the schema of this relation will not be changed.
 * Even if the insert method updates the schema (e.g. a relation of JSON or Parquet data may have a
 * schema update after an insert operation), the new schema will not be used.
 * 3. It assumes that fields of the data provided in the insert method are nullable.
 * If a data source needs to check the actual nullability of a field, it needs to do it in the
 * insert method.
 *
 * @since 1.3.0
 */
@InterfaceStability.Stable
trait InsertableRelation {
  def insert(data: DataFrame, overwrite: Boolean): Unit
}

How shc implements insert for HBase

insert converts each row into a Put via convertToPut and finally calls saveAsNewAPIHadoopDataset to write the data. A sketch of the same write path without shc follows the snippet below.

rdd.mapPartitions(iter => {
  SHCCredentialsManager.processShcToken(serializedToken)
  iter.map(convertToPut(rkFields))
}).saveAsNewAPIHadoopDataset(jobConfig)
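
For comparison, a hedged sketch of the same write path without shc: build Put objects in an RDD and write them with saveAsNewAPIHadoopDataset through HBase's TableOutputFormat. The table name, column family and sample data are assumptions for illustration.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.SparkSession

// Sketch: write Puts directly via the Hadoop output format API.
object DirectPutDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("direct-put-demo").getOrCreate()

    val conf = HBaseConfiguration.create()
    conf.set(TableOutputFormat.OUTPUT_TABLE, "demo_table")  // hypothetical table
    val job = Job.getInstance(conf)
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    spark.sparkContext.parallelize(Seq(("u1", "10"), ("u2", "20")))
      .map { case (key, total) =>
        val put = new Put(Bytes.toBytes(key))
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("total"), Bytes.toBytes(total))
        (new ImmutableBytesWritable, put)
      }
      .saveAsNewAPIHadoopDataset(job.getConfiguration)
  }
}
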

convertToPut
This uses toBytes from the custom SHCDataType abstraction, which converts each supported data type to and from byte arrays.
A switch could be added here to decide whether to query (and aggregate with) the existing value before inserting; a sketch follows the snippet below.

val put = timestamp.fold(new Put(rBytes))(new Put(rBytes, _))
colsIdxedFields.foreach { case (x, y) =>
  val value = row(x)
  if (value != null) {
    put.addColumn(
      coder.toBytes(y.cf),
      coder.toBytes(y.col),
      SHCDataTypeFactory.create(y).toBytes(value))
  }
}
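
A hedged sketch of the "query first, add, then insert" switch mentioned above, written against the plain HBase client API rather than shc internals. The connection handling, table name, and the column being summed are assumptions; values are stored as strings, matching the stringified PrimitiveType coder discussed below.

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get, Put}
import org.apache.hadoop.hbase.util.Bytes

// Hypothetical helper: read the current value of cf:total for the row key,
// add the new delta, and write the sum back as a string.
object HBaseUpsert {
  def addAndPut(tableName: String, rowKey: String, delta: Long): Unit = {
    val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
    try {
      val table = conn.getTable(TableName.valueOf(tableName))
      val cf = Bytes.toBytes("cf")
      val col = Bytes.toBytes("total")

      // Query the existing value first (0 if the row/column does not exist).
      val result = table.get(new Get(Bytes.toBytes(rowKey)))
      val current =
        Option(result.getValue(cf, col)).map(b => Bytes.toString(b).toLong).getOrElse(0L)

      // Insert (update) the accumulated value.
      val put = new Put(Bytes.toBytes(rowKey))
      put.addColumn(cf, col, Bytes.toBytes(String.valueOf(current + delta)))
      table.put(put)
      table.close()
    } finally {
      conn.close()
    }
  }
}
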
PrimitiveType.scala

The default primitive-type coder. Because the HBase tables here store string byte arrays, the raw binary encodings have no direct correspondence with int, bigint, double, boolean, etc.
Converting every value to a string first and then to a byte array keeps the stored data usable; otherwise the values cannot be stored (and read back) as expected.
Adjust the code below accordingly, e.g. change Long => Bytes.toBytes(data) to Long => Bytes.toBytes(String.valueOf(data)). An adjusted sketch follows the original code.

class PrimitiveType(f: Option[Field] = None) extends SHCDataType {
  def toBytes(input: Any): Array[Byte] = {
    input match {
      case data: Boolean => Bytes.toBytes(data)
      case data: Byte => Array(data)
      case data: Array[Byte] => data
      case data: Double => Bytes.toBytes(data)
      case data: Float => Bytes.toBytes(data)
      case data: Int => Bytes.toBytes(data)
      case data: Long => Bytes.toBytes(data)
      case data: Short => Bytes.toBytes(data)
      case data: UTF8String => data.getBytes
      case data: String => Bytes.toBytes(data)
      case _ => throw new
        UnsupportedOperationException(s"PrimitiveType coder: unsupported data type $input")
    }
  }
  // fromBytes and the rest of the class omitted
}
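
A sketch of the adjusted encoder described above, written as a stand-alone object rather than a patch to the shc class. Folding the numeric and boolean cases into one alternative pattern is just a stylistic choice; String.valueOf is the same stringification as in the example change above.

import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.unsafe.types.UTF8String

// Hypothetical "string everything" variant of the primitive coder.
object StringifiedCoder {
  def toBytes(input: Any): Array[Byte] = input match {
    case data: Array[Byte] => data
    case data: Byte => Array(data)
    case data: UTF8String => data.getBytes
    case data: String => Bytes.toBytes(data)
    // Numeric and boolean values are stored as the bytes of their string form.
    case data @ (_: Boolean | _: Double | _: Float | _: Int | _: Long | _: Short) =>
      Bytes.toBytes(String.valueOf(data))
    case _ => throw new UnsupportedOperationException(
      s"coder: unsupported data type $input")
  }
}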

Reading trait Sink

org.apache.spark.sql.execution.streaming.Sink.
BaseStreamingSink is the shared interface between V1 and V2 streaming sinks.
BaseStreamingSink will be removed in a future release.
FileStreamSink implements Sink [FileFormatWriter writes the data to files].
ForeachSink implements Sink [a ForeachWriter takes a function to process the data, since Spark 2.2.0].

/**
 * An interface for systems that can collect the results of a streaming query. In order to preserve
 * exactly once semantics a sink must be idempotent in the face of multiple attempts to add the same
 * batch.
 */
trait Sink extends BaseStreamingSink {

  /**
   * Adds a batch of data to this sink. The data for a given `batchId` is deterministic and if
   * this method is called more than once with the same batchId (which will happen in the case of
   * failures), then `data` should only be added once.
   *
   * Note 1: You cannot apply any operators on `data` except consuming it (e.g., `collect/foreach`).
   * Otherwise, you may get a wrong result.
   *
   * Note 2: The method is supposed to be executed synchronously, i.e. the method should only return
   * after data is consumed by sink successfully.
   */
  def addBatch(batchId: Long, data: DataFrame): Unit
}

A custom HBase Sink

Provide a format similar to ConsoleSinkProvider [alias console]:
Create DemoSink extending Sink; it receives the Structured Streaming data and saves it through shc's org.apache.spark.sql.execution.datasources.hbase source.
Define DemoSinkProvider, which creates the DemoSink.
Give DemoSinkProvider an alias.

package streaming

import org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode

case class DemoSink(
    sqlContext: SQLContext,
    parameters: Map[String, String],
    partitionColumns: Seq[String],
    outputMode: OutputMode) extends Sink {

  // Get the HBase catalog from the sink options
  private val hBaseCatalog = parameters.get("catalog").map(_.toString).getOrElse("")

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    println(s"addBatch($batchId)")
    // println(parameters.get("catalog"))
    // println(hBaseCatalog)
    // data.explain()
    // data.sparkSession.createDataFrame(
    //   data.sparkSession.sparkContext.parallelize(data.collect()), data.schema)
    //   .show(10)

    // Rebuild a batch DataFrame from the collected rows, then write it out through shc.
    data.sparkSession.createDataFrame(
        data.sparkSession.sparkContext.parallelize(data.collect()), data.schema)
      .write
      .options(Map(HBaseTableCatalog.tableCatalog -> hBaseCatalog,
        HBaseTableCatalog.newTable -> "5"))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .save()
  }
}

class DemoSinkProvider extends StreamSinkProvider with DataSourceRegister {

  // Override StreamSinkProvider.createSink
  override def createSink(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      partitionColumns: Seq[String],
      outputMode: OutputMode): Sink = {
    DemoSink(sqlContext, parameters, partitionColumns, outputMode)
  }

  // To use the alias, create META-INF/services/org.apache.spark.sql.sources.DataSourceRegister;
  // without it, use format("streaming.DemoSinkProvider") directly.
  override def shortName(): String = "demosink"
}
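
A hedged usage sketch of the provider above. The socket source, catalog JSON, table name and checkpoint path are assumptions for illustration; with the META-INF/services registration shown earlier, format("demosink") works as well.

import org.apache.spark.sql.SparkSession

object DemoSinkApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("demo-sink-app").getOrCreate()
    import spark.implicits._

    // Hypothetical shc catalog for the target table.
    val catalogJson =
      s"""{
         |  "table": {"namespace": "default", "name": "demo_table"},
         |  "rowkey": "key",
         |  "columns": {
         |    "key":   {"cf": "rowkey", "col": "key",   "type": "string"},
         |    "total": {"cf": "cf",     "col": "total", "type": "string"}
         |  }
         |}""".stripMargin

    // Any streaming source works; a socket source keeps the sketch short.
    val counts = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()
      .groupBy("value").count()
      .select($"value".as("key"), $"count".cast("string").as("total"))

    val query = counts.writeStream
      .outputMode("complete")                // see the two strategies discussed at the top of the post
      .format("streaming.DemoSinkProvider")  // or "demosink" once registered via META-INF/services
      .option("catalog", catalogJson)
      .option("checkpointLocation", "/tmp/demo-sink-checkpoint")
      .start()

    query.awaitTermination()
  }
}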