转载

Spark Streaming+ Spark SQL 实现配置化ETL流程

Spark Streaming 非常适合ETL。但是其开发模块化程度不高,所以这里提供了一套方案,可以实现模块化,配置化,屏蔽底层细节,支持SQL通过SQL开发Spark Streaming程序。

该实现完全基于 ServiceframeworkDispatcher 完成,核心功能大概只花了三个小时。

特性有:

  1. ETL 流程配置化
  2. 支持多Job配置
  3. 支持JSON数据源
  4. 支持SQL
  5. 输出支持打印

未来可扩展的支持:

  1. 动态添加或者删除job,或者更新某个Job的模块配置
  2. 支持Storm等其他流式引擎
  3. 更好的多job互操作
{    "test": {     "desc": "测试",     "strategy": "streaming.core.strategy.SparkStreamingStrategy",     "algorithm": [],     "ref": [],     "compositor": [       {         "name": "streaming.core.compositor.kafka.MockKafkaStreamingCompositor",         "params": [           {             "metadata.broker.list":"xxx",             "auto.offset.reset":"largest",             "topics":"xxx"           }         ]       },       {         "name": "streaming.core.compositor.spark.JSONTableCompositor",         "params": [{"tableName":"test"}         ]       },       {         "name": "streaming.core.compositor.spark.SQLCompositor",         "params": [{"sql":"select a from test"}         ]       },       {         "name": "streaming.core.compositor.RDDPrintOutputCompositor",         "params": [           {           }         ]       }     ],     "configParams": {     }   } }

上面完成了:

  1. 从Kafka消费数据
  2. 将Kafka数据转化为表
  3. 通过SQL进行处理
  4. 打印输出

以SQLCompositor 实现为例:

class SQLCompositor[T] extends Compositor[T] {    private var _configParams: util.List[util.Map[Any, Any]] = _   val logger = Logger.getLogger(classOf[SQLCompositor[T]].getName)    override def initialize(typeFilters: util.List[String], configParams: util.List[util.Map[Any, Any]]): Unit = {     this._configParams = configParams   }    def sql = {     _configParams(0).get("sql").toString   }    def outputTable = {     _configParams(0).get("outputTable").toString   }    override def result(alg: util.List[Processor[T]], ref: util.List[Strategy[T]], middleResult: util.List[T], params: util.Map[Any, Any]): util.List[T] = {     var dataFrame: DataFrame = null     val func = params.get("table").asInstanceOf[(RDD[String]) => SQLContext]     params.put("sql",(rdd:RDD[String])=>{       val sqlContext = func(rdd)       dataFrame = sqlContext.sql(sql)       dataFrame     })     middleResult   } }

对应的配置为:

 {         "name": "streaming.core.compositor.spark.SQLCompositor",         "params": [{"sql":"select a from test"}         ]   },

对应的,整个SparkStreaming程序开发,其实就是正对Compositor 接口进行开发。完全屏蔽了Spark Streaming API。

原文  http://www.jianshu.com/p/cd26a413cbd4
正文到此结束
Loading...