Spark Streaming is a very good fit for ETL, but its development is not very modular. This post therefore presents an approach that makes Spark Streaming development modular and configuration-driven, hides the underlying details, and lets you build Spark Streaming programs through SQL.
The implementation is built entirely on ServiceframeworkDispatcher; the core functionality took only about three hours to write.
Features:
Future extensible support:
{ "test": { "desc": "测试", "strategy": "streaming.core.strategy.SparkStreamingStrategy", "algorithm": [], "ref": [], "compositor": [ { "name": "streaming.core.compositor.kafka.MockKafkaStreamingCompositor", "params": [ { "metadata.broker.list":"xxx", "auto.offset.reset":"largest", "topics":"xxx" } ] }, { "name": "streaming.core.compositor.spark.JSONTableCompositor", "params": [{"tableName":"test"} ] }, { "name": "streaming.core.compositor.spark.SQLCompositor", "params": [{"sql":"select a from test"} ] }, { "name": "streaming.core.compositor.RDDPrintOutputCompositor", "params": [ { } ] } ], "configParams": { } } }
The above configuration accomplishes the following: consume messages from Kafka, map the JSON data into a table named test, run SQL against that table, and print the query result.
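For contrast, here is a rough sketch of what roughly the same pipeline looks like when written by hand against the Spark Streaming API (Spark 1.x-era APIs assumed; the broker list and topic name are the placeholders from the configuration above):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object HandWrittenEquivalent {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("test")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Source: Kafka direct stream (broker list and topic are placeholders)
    val kafkaParams = Map("metadata.broker.list" -> "xxx", "auto.offset.reset" -> "largest")
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("xxx"))

    messages.map(_._2).foreachRDD { rdd =>
      // Map the JSON payload to a table and run the configured SQL
      val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
      val df = sqlContext.read.json(rdd)
      df.registerTempTable("test")
      sqlContext.sql("select a from test").show()
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

With the framework, all of this boilerplate is replaced by the configuration file.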
Take the SQLCompositor implementation as an example:
import java.util

import org.apache.log4j.Logger
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext}
// Compositor, Processor and Strategy come from ServiceframeworkDispatcher
import serviceframework.dispatcher.{Compositor, Processor, Strategy}

import scala.collection.JavaConversions._

class SQLCompositor[T] extends Compositor[T] {

  private var _configParams: util.List[util.Map[Any, Any]] = _
  val logger = Logger.getLogger(classOf[SQLCompositor[T]].getName)

  override def initialize(typeFilters: util.List[String],
                          configParams: util.List[util.Map[Any, Any]]): Unit = {
    this._configParams = configParams
  }

  // "sql" and "outputTable" come from the params block in the configuration
  def sql = _configParams(0).get("sql").toString

  def outputTable = _configParams(0).get("outputTable").toString

  override def result(alg: util.List[Processor[T]],
                      ref: util.List[Strategy[T]],
                      middleResult: util.List[T],
                      params: util.Map[Any, Any]): util.List[T] = {
    var dataFrame: DataFrame = null
    // The upstream compositor registered a function that turns an RDD[String]
    // into an SQLContext with the table already registered.
    val func = params.get("table").asInstanceOf[(RDD[String]) => SQLContext]
    // Register a new function that runs the configured SQL on top of it;
    // a downstream compositor will invoke it for each micro-batch.
    params.put("sql", (rdd: RDD[String]) => {
      val sqlContext = func(rdd)
      dataFrame = sqlContext.sql(sql)
      dataFrame
    })
    middleResult
  }
}
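The function stored under "sql" is presumably picked up by a downstream compositor, such as the output compositor at the end of the pipeline. A minimal sketch of what such a consumer might look like (the class name ConsolePrintCompositor is hypothetical, and it assumes middleResult carries the DStream produced upstream):

import java.util

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.streaming.dstream.DStream
import serviceframework.dispatcher.{Compositor, Processor, Strategy}

class ConsolePrintCompositor[T] extends Compositor[T] {

  private var _configParams: util.List[util.Map[Any, Any]] = _

  override def initialize(typeFilters: util.List[String],
                          configParams: util.List[util.Map[Any, Any]]): Unit = {
    this._configParams = configParams
  }

  override def result(alg: util.List[Processor[T]],
                      ref: util.List[Strategy[T]],
                      middleResult: util.List[T],
                      params: util.Map[Any, Any]): util.List[T] = {
    // Fetch the closure that SQLCompositor stored under "sql"
    val sqlFunc = params.get("sql").asInstanceOf[(RDD[String]) => DataFrame]
    // Assume the first middle result is the DStream produced by the Kafka compositor
    val dstream = middleResult.get(0).asInstanceOf[DStream[String]]
    dstream.foreachRDD { rdd =>
      // Run the configured SQL on each micro-batch and print a few rows
      sqlFunc(rdd).show(10)
    }
    middleResult
  }
}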
The corresponding configuration is:
{ "name": "streaming.core.compositor.spark.SQLCompositor", "params": [{"sql":"select a from test"} ] },
Correspondingly, developing an entire Spark Streaming program really comes down to developing against the Compositor interface, which completely hides the Spark Streaming API.
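For example, once a new compositor has been implemented, wiring it into the pipeline is just another entry in the compositor array (the class name and parameter below are purely illustrative):

{
  "name": "streaming.core.compositor.spark.MyFilterCompositor",
  "params": [{ "condition": "a is not null" }]
}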