转载

流式计算常见模块用法说明

说明

StreamingPro有非常多的模块可以直接在配置文件中使用,本文主要针对流式计算中涉及到的模块。

Kafka Compositor

{    "name": "streaming.core.compositor.spark.streaming.source.KafkaStreamingCompositor",    "params": [{                  "topics":"your topic",                  "metadata.broker.list":"brokers",                  "auto.offset.reset": "smallest|largest"              }] }

参数说明:

Property Name Meaning
topics Kafka主题,可以多个,按 逗号分隔
metadata.broker.list Kafka Broker地址
auto.offset.reset 重头消费还是从最新消费

MockInputStreamCompositor

模拟数据源,主要为了方便测试。

{         "name": "streaming.core.compositor.spark.streaming.source.MockInputStreamCompositor",         "params": [{                       "batch-1":["1","2","3"],                       "batch-2":["1","2","3"],                       "batch-3":["1","2","3"],                       "batch-4":["1","2","3"]                   }] }

MockInputStreamFromPathCompositor

模拟数据源,主要为了方便测试。可以接入一个外部文件作为mock数据

{         "name": "streaming.core.compositor.spark.streaming.source.MockInputStreamFromPathCompositor",         "params": [{"path":"file:///tmp/test.txt"}] }

SingleColumnJSONCompositor

把一条日志转化一个单列的json文件。

{         "name": "streaming.core.compositor.spark.streaming.transformation.SingleColumnJSONCompositor",         "params": [{             "name": "a"           }] }

params.name 则是列名,方便后续的sql使用。

ScalaMapToJSONCompositor

{         "name": "streaming.core.compositor.spark.streaming.transformation.ScalaMapToJSONCompositor",         "params": [{}] }

可以把scala Map转化为JSon

JavaMapToJSONCompositor

{         "name": "streaming.core.compositor.spark.streaming.transformation.JavaMapToJSONCompositor",         "params": [{}] }

可以把java Map转化为JSon

FlatJSONCompositor

{         "name": "streaming.core.compositor.spark.streaming.transformation.FlatJSONCompositor",         "params": [{"a":"$['store']['book'][0]['title']"}] }

从JSON里抽取字段,映射到新的列名上。主要是对复杂JSON结构进行扁平化。语法参考该库 JsonPath

NginxParserCompositor

{         "name": "streaming.core.compositor.spark.streaming.transformation.NginxParserCompositor",         "params": [{"time":0,"url":1}] }

Nginx 日志解析工具,按位置给列进行命名。

SQLCompositor

{         "name": "streaming.core.compositor.spark.streaming.transformation.SQLCompositor",         "params": [           {             "sql": "select a, /"5/" as b from test",             "outputTableName": "test2"           }         ]       }
Property Name Meaning
sql sql 语句
outputTableName 输出的表名,方便后续的SQL语句可以衔接

SQLESOutputCompositor

将数据存储到ES中

{         "name":"streaming.core.compositor.spark.streaming.output.SQLESOutputCompositor",         "params":[           {             "es.nodes":"",             "es.resource":"",             "es.mapping.include":"",             "timeFormat":"yyyyMMdd"           }         ] }
Property Name Meaning
es.nodes 节点,多个节点用逗号分隔
es.resource 索引名称以及类型名称
.... 其他一些elasticsearch-hadoop的配置

SQLPrintOutputCompositor(output)

{         "name": "streaming.core.compositor.spark.streaming.output.SQLPrintOutputCompositor",         "params": [{}] }

把处理结果打印到终端控制台。主要是为了调试使用

JSONTableCompositor

{         "name": "streaming.core.compositor.spark.streaming.transformation.JSONTableCompositor",         "params": [{             "tableName": "test"           }] }

把字符串(JSON格式)的数据注册成一张表。 params.tableName可以让你指定表名。

ConsoleOutputCompositor

{         "name": "streaming.core.compositor.spark.streaming.output.ConsoleOutputCompositor",         "params": [{ }] }

控制台打印,非SQL类。

SQLCSVOutputCompositor

{         "name": "streaming.core.compositor.spark.streaming.output.SQLCSVOutputCompositor",         "params": [{   "path":"",   "mode":""  }] }
Property Name Meaning
path cvs 存储路径
mode ErrorIfExists 或者Overwrite 或者Append或者Ignore

作为CSV 输出,需要前面是一张表。

SQLParquetOutputCompositor

{         "name": "streaming.core.compositor.spark.streaming.output.SQLParquetOutputCompositor",         "params": [{   "path":"",   "mode":""  }] }
Property Name Meaning
path parquet 存储路径
mode ErrorIfExists 或者Overwrite 或者Append或者Ignore

作为parquet 输出,需要前面是一张表。

原文  http://www.jianshu.com/p/9c0d00498cb8
正文到此结束
Loading...