我们按照 storm 规范开发的 spout 和 bolt 需要使用 TopologyBuilder 构建成有向无环图(拓扑),并指定消息的分组方式,然后提交给 storm 集群执行,本篇我们将分析 topology 的构建和提交过程。前面分析 storm 的编程接口时曾介绍过 StormTopology 这个 thrift 类,topology 在构建完成之后会封装成一个 StormTopology 对象,并通过 RPC 方法提交给 storm 集群的 nimbus 节点。
拓扑结构在 storm 集群中以 StormTopology 对象的形式表示,这是一个 thrift 类,其定义如下:
struct StormTopology { 1: required map<string, SpoutSpec> spouts; // topology 中的 spout 集合 2: required map<string, Bolt> bolts; // topology 中的 bolt 集合 3: required map<string, StateSpoutSpec> state_spouts; // topology 中的 state spout 集合 }
属性 spouts 的 key 是 spout 对应的 ID,value 是 SpoutSpec 类型对象,这也是一个 thrift 类,封装了 spout 的序列化 ComponentObject 对象和通用组件 ComponentCommon 对象。属性 bolts 的 key 是 bolt 对应的 ID,value 是 Bolt 类型对象,Bolt 同样是一个 thrfit 类,封装了 bolt 的序列化 ComponentObject 对象和通用组件 ComponentCommon 对象。
ComponentObject 是一个 thrift 联合类型(union),在这里主要使用了 serialized_java 字段记录组件的序列化值:
union ComponentObject { 1: binary serialized_java; // 序列化后的 java 对象 2: ShellComponent shell; // ShellComponent 对象 3: JavaObject java_object; // java 对象 }
ComponentCommon 是对组件的抽象表示,spout 和 bolt 在 topology 中统称为组件,topology 构建过程中会将 spout 和 bolt 都封装成为 ComponentCommon 对象:
struct ComponentCommon { // 组件将从哪些 GlobalStreamId 以何种分组方式接收数据 1: required map<GlobalStreamId, Grouping> inputs; // 组件要输出的所有流,key 是 streamId 2: required map<string, StreamInfo> streams; // 组件并行度(即多少个线程),这些线程可能分布在不同的机器或进程空间中 3: optional i32 parallelism_hint; // 组件相关配置项 4: optional string json_conf; }
StormTopology 作为 thrift 类在编译成 java 实现时比较冗长,所以 storm 提供了 TopologyBuilder 构造器类来简化 topology 的构造,其使用形式一般如下:
public class WordCountTopology implements ComponentId, FieldName { private static final String TOPOLOGY_NAME = "wordcount-topology"; public static void main(String[] args) throws Exception { SentenceSpout sentenceSpout = new SentenceSpout(); SentenceSplitBolt sentenceSplitBolt = new SentenceSplitBolt(); WordCountBolt wordCountBolt = new WordCountBolt(); ReportBolt reportBolt = new ReportBolt(); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(SENTENCE_SPOUT_ID, sentenceSpout); builder.setBolt(SENTENCE_SPLIT_BOLT_ID, sentenceSplitBolt).shuffleGrouping(SENTENCE_SPOUT_ID); builder.setBolt(WORD_COUNT_BOLT_ID, wordCountBolt).fieldsGrouping(SENTENCE_SPLIT_BOLT_ID, new Fields(WORD)); builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(WORD_COUNT_BOLT_ID); Config config = new Config(); LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology()); TimeUnit.MINUTES.sleep(10); localCluster.killTopology(TOPOLOGY_NAME); localCluster.shutdown(); } }
创建完 TopologyBuilder 实例之后,我们可以调用 setSpout
方法往 topology 中添加并设置 spout 组件,调用 setBolt
方法往 topology 中添加并设置 bolt 组件,并最后通过调用 createTopology
方法来完成 topology 的构建,该方法会返回一个 StormTopology 对象。TopologyBuilder 类中主要关注 3 个属性:
/** 记录拓扑范围内所有的 Bolt 对象 */ protected Map<String, IRichBolt> _bolts = new HashMap<>(); /** 记录拓扑范围内所有的 Spout 对象 */ protected Map<String, IRichSpout> _spouts = new HashMap<>(); /** 记录拓扑范围内封装所有的 Spout 和 Bolt 的 ComponentCommon 组件对象 */ protected Map<String, ComponentCommon> _commons = new HashMap<>();
下面来看一下 spout 和 bolt 的构造过程,即 setSpout
和 setBolt
方法,针对这两类方法,TopologyBuilder 都提供了多种重载版本,其中 setSpout
对应的底层实现如下:
public SpoutDeclarer setSpout(String id, IRichSpout spout, Number parallelism_hint) throws IllegalArgumentException { // 保证 id 在 topology 范围内的全局唯一 this.validateUnusedId(id); // 以 ComponentCommon 的形式封装组件,并记录到 _commons 属性中 this.initCommon(id, spout, parallelism_hint); // 记录组件到 _spout 集合中 _spouts.put(id, spout); return new SpoutGetter(id); }
方法首先会验证 spoutId 在 topology 范围内的全局唯一性,即没有被已有的 spout 和 bolt 占用,否则会抛出 IllegalArgumentException 异常。 initCommon
方法会构造 spout 对应的 ComponentCommon 对象并记录到 _commons
属性中:
protected void initCommon(String id, IComponent component, Number parallelism) throws IllegalArgumentException { ComponentCommon common = new ComponentCommon(); common.set_inputs(new HashMap<GlobalStreamId, Grouping>()); // 设置并行度 if (parallelism != null) { int dop = parallelism.intValue(); if (dop < 1) { throw new IllegalArgumentException("Parallelism must be positive."); } common.set_parallelism_hint(dop); // 设置组件并行度 } else { // 如果没有设置的话,默认设置并行度为 1 common.set_parallelism_hint(1); } // 获取组件相关的配置并以 json 的形式记录到 ComponentCommon 对象中 Map conf = component.getComponentConfiguration(); if (conf != null) common.set_json_conf(JSONValue.toJSONString(conf)); _commons.put(id, common); }
最后将 spout 对象记录到 _spouts
属性中,并构造当前 spout 对应的 SpoutGetter 对象。
SpoutGetter 可以理解为 spout 对应的属性配置器,用于为当前 spout 加载通用的配置和设置私有的属性值,并最终将所有的配置项序列化为 json 格式记录到封装当前 spout 的 ComponentCommon 对象中(json_conf 属性)。
protected class SpoutGetter extends ConfigGetter<SpoutDeclarer> implements SpoutDeclarer { public SpoutGetter(String id) { super(id); } }
SpoutGetter 类继承了 ConfigGetter 类,并实现了 SpoutDeclarer 接口,该接口主要是声明了一些 spout 组件相关的配置方法,具体的实现都在 ConfigGetter 的父类 BaseConfigurationDeclarer 中,实现比较简单,不展开说明。ConfigGetter 覆盖实现了父类的 addConfigurations
方法,并在该方法中将当前 spout 所有相关的配置项序列化成 json 记录到对应的 ComponentCommon 对象中:
@Override public T addConfigurations(Map conf) { if (conf != null && conf.containsKey(Config.TOPOLOGY_KRYO_REGISTER)) { // ${topology.kryo.register} // 在通常的非事务流处理中,不允许设置组件的序列化方式 throw new IllegalArgumentException("Cannot set serializations for a component using fluent API"); } String currConf = _commons.get(_id).get_json_conf(); // 将 currConf 与 conf 的配置项合并,并以 json string 的形式记录到对应组件的 json_conf 字段中 _commons.get(_id).set_json_conf(JStormUtils.mergeIntoJson(JStormUtils.parseJson(currConf), conf)); return (T) this; }
下面继续分析 setBolt
方法,上一篇在介绍 Bolt 组件接口时我们知道 storm 提供了三种基础的 Bolt 组件类型,即 IBolt(or IRichBolt)、IBasicBolt,以及 IBatchBolt。针对每种 Bolt 类型,TopologyBuilder 都有提供相应版本的 setBolt
方法实现,下面以最常见的 IBolt 类型为例,对应的方法实现如下:
public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism_hint) throws IllegalArgumentException { // 保证 id 在 topology 范围内的全局唯一 this.validateUnusedId(id); // 以 ComponentCommon 形式封装组件,并记录到 _commons 属性中 this.initCommon(id, bolt, parallelism_hint); // 记录 bolt 到 _bolt 集合中 _bolts.put(id, bolt); return new BoltGetter(id); }
流程上与 setSpout
大同小异,不再重复撰述,方法最终会将 bolt 对象记录到 _bolts
属性中,并构造当前 bolt 对应的 BoltGetter 对象。
前面我们分析了 SpoutGetter,知道其作用主要是为 spout 配置相关属性,BoltGetter 的作用同样如此,不过相对于 SpoutGetter 增加了消息分组方式的配置入口,最后同样将属性序列化为 json 格式记录到与组件相对应的 ComponentCommon 对象中。
在完成调用 setSpout
和 setBolt
往 topology 中添加 spout 和 bolt 组件之后,我们需要调用 createTopology
方法创建相应的 StormTopology 对象,该方法的实现如下:
public StormTopology createTopology() { Map<String, Bolt> boltSpecs = new HashMap<>(); Map<String, SpoutSpec> spoutSpecs = new HashMap<>(); // 如果当前 topology 中含有 stateful-bolt,就为 topology 自动添加一个 CheckpointSpout this.maybeAddCheckpointSpout(); // 遍历处理 bolt,封装 bolt 的序列化形式和 ComponentCommon 形式为 Bolt 对象,并记录到 boltSpecs 中 for (String boltId : _bolts.keySet()) { IRichBolt bolt = _bolts.get(boltId); // 如果当前 topology 中含有 stateful-bolt,那么针对 non-stateful bolt 都采用 CheckpointTupleForwarder 进行包装 bolt = this.maybeAddCheckpointTupleForwarder(bolt); ComponentCommon common = this.getComponentCommon(boltId, bolt); try { this.maybeAddCheckpointInputs(common); this.maybeAddWatermarkInputs(common, bolt); // 封装 bolt 的序列化形式和 ComponentCommon 形式为 Bolt 对象,并记录到 boltSpecs 中 boltSpecs.put(boltId, new Bolt(ComponentObject.serialized_java(Utils.javaSerialize(bolt)), common)); } catch (RuntimeException wrapperCause) { // 省略异常处理逻辑 throw wrapperCause; } } // 遍历处理 spout,封装 spout 的序列化形式和 ComponentCommon 形式为 SpoutSpec 对象,并记录到 spoutSpecs 中 for (String spoutId : _spouts.keySet()) { IRichSpout spout = _spouts.get(spoutId); ComponentCommon common = this.getComponentCommon(spoutId, spout); try { // 封装 spout 的序列化形式和 ComponentCommon 形式为 SpoutSpec 对象,并记录到 spoutSpecs 中 spoutSpecs.put(spoutId, new SpoutSpec(ComponentObject.serialized_java(Utils.javaSerialize(spout)), common)); } catch (RuntimeException wrapperCause) { // 省略异常处理逻辑 throw wrapperCause; } } // 封装成为 stormTopology 对象返回 return new StormTopology(spoutSpecs, boltSpecs, new HashMap<String, StateSpoutSpec>()); }
整个方法的执行流程可以概括为:
当构建完 topology 之后,我们需要以任务的形式将其提交到 storm 集群运行。此外,为了方便调试,storm 也支持通过 LocalCluster 在本地提交运行任务,本节我们主要介绍如何向 storm 集群提交任务。
Storm 提供了 StormSubmitter 类用于向 storm 集群提交任务,并提供了两类方法: submitTopology
和 submitTopologyWithProgressBar
。后者是对前者的封装,在原版 storm 中用于支持显示任务的提交进度,但是这一设计在 jstorm 中被移除,所以两类方法实际上是等价的。接下来我们对 submitTopology
方法的实现进行分析,storm 为该方法提供了多个重载版本,对应的底层实现如下:
public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts) throws AlreadyAliveException, InvalidTopologyException { // 验证配置是否为 json 格式 if (!Utils.isValidConf(stormConf)) { throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable"); } // 封装配置(构建 topology 期间添加的、提交 topology 时传入的,以及命令行参数) Map userTotalConf = new HashMap(); userTotalConf.putAll(TopologyBuilder.getStormConf()); // add the configuration generated during topology building userTotalConf.putAll(stormConf); userTotalConf.putAll(Utils.readCommandLineOpts()); // 加载配置文件配置 Map conf = Utils.readStormConfig(); conf.putAll(stormConf); putUserInfo(conf, stormConf); try { String serConf = Utils.to_json(userTotalConf); // 转换成 json 形式 if (localNimbus != null) { // 本地模式 LOG.info("Submitting topology " + name + " in local mode"); localNimbus.submitTopology(name, null, serConf, topology); } else { // 集群模式 // 创建 Thrift 客户端 NimbusClient client = NimbusClient.getConfiguredClient(conf); try { // 是否允许热部署 ${topology.hot.deploy.enable} boolean enableDeploy = ConfigExtension.getTopologyHotDeplogyEnable(userTotalConf); // 是否是灰度发布 ${topology.upgrade} boolean isUpgrade = ConfigExtension.isUpgradeTopology(userTotalConf); // 是否允许动态更新 boolean dynamicUpdate = enableDeploy || isUpgrade; if (topologyNameExists(client, conf, name) != dynamicUpdate) { if (dynamicUpdate) { // 动态更新,但是对应的 topology 不存在 throw new RuntimeException("Topology with name `" + name + "` does not exist on cluster"); } else { // 提交新任务,但是对应的 topology 已经存在 throw new RuntimeException("Topology with name `" + name + "` already exists on cluster"); } } // 上传 jar 包 submitJar(client, conf); LOG.info("Submitting topology " + name + " in distributed mode with conf " + serConf); // 提交任务 if (opts != null) { client.getClient().submitTopologyWithOpts(name, path, serConf, topology, opts); } else { // for backward compatibility client.getClient().submitTopology(name, path, serConf, topology); } } finally { client.close(); } } LOG.info("Finished submitting topology: " + name); } // 省略 catch 代码块 }
Storm 任务提交的过程本质上是一个与 nimbus 节点进行 RPC 通信的过程,整体流程可以概括为:
配置的加载与封装过程会验证配置是否为 json 格式,并聚合多个来源的配置封装成 map 集合。在任务提交之前会验证当前 topology 在远程集群的状态,如果当前操作是热部署或灰度发布,则必须保证对应的 topology 在远程集群已经存在,而对于新提交的 topology 来说,如果远程集群存在同名的 topology 则会禁止提交。
Storm 任务的提交分为两个步骤,首先上传 topology 对应的 jar 文件到 nimbus 服务器,上传成功之后才会调用远程方法通知 nimbus 有新任务加入,需要开始为该 topology 制定运行方案。下面先来看一下 jar 报上传的过程,该过程位于 submitJar
方法中:
private static void submitJar(NimbusClient client, Map conf) { if (submittedJar == null) { try { LOG.info("Jar not uploaded to master yet. Submitting jar..."); // 获取对应的 client jar 名称,例如 jstorm-1.0.0-SNAPSHOT.jar String localJar = System.getProperty("storm.jar"); // 为待上传的 jar 包创建存储路径和 Channel,并返回路径值 // ${storm.local.dir}/nimbus/inbox/${key} path = client.getClient().beginFileUpload(); String[] pathCache = path.split("/"); // ${storm.local.dir}/nimbus/inbox/${key}/stormjar-${key}.jar String uploadLocation = path + "/stormjar-" + pathCache[pathCache.length - 1] + ".jar"; // 如果设置了 lib jar 则先上传 lib jar List<String> lib = (List<String>) conf.get(GenericOptionsParser.TOPOLOGY_LIB_NAME); // topology.lib.name Map<String, String> libPath = (Map<String, String>) conf.get(GenericOptionsParser.TOPOLOGY_LIB_PATH); // topology.lib.path if (lib != null && lib.size() != 0) { for (String libName : lib) { String jarPath = path + "/lib/" + libName; client.getClient().beginLibUpload(jarPath); submitJar(conf, libPath.get(libName), jarPath, client); } } else { if (localJar == null) { // no lib, no client jar throw new RuntimeException("No client app jar found, please upload it"); } } // 上传 client jar if (localJar != null) { submittedJar = submitJar(conf, localJar, uploadLocation, client); } else { // no client jar, but with lib jar client.getClient().finishFileUpload(uploadLocation); } } catch (Exception e) { throw new RuntimeException(e); } } else { LOG.info("Jar has already been uploaded to master. Will not submit again."); } }
方法首先会获取 topology 对应的 jar 文件名称(项目打包后对应的 jar 文件),然后调用 thrift 方法 beginFileUpload
为待上传的 jar 文件创建存储路径和传输通道,并返回对应的路径值。在 storm.thrift
文件中定义了一个 service 类型的 Nimbus 类,如果你对 thrift 熟悉就应该知道这是一个 service 接口声明,Nimbus 类声明了一些能够与 nimbus 节点进行远程通信的方法,相应方法实现位于 ServiceHandler 类中,可以在该类中找到 beginFileUpload
方法的实现:
public String beginFileUpload() throws TException { String fileLoc = null; try { String key = UUID.randomUUID().toString(); String path = StormConfig.masterInbox(conf) + "/" + key; // ${storm.local.dir}/nimbus/inbox/${key} FileUtils.forceMkdir(new File(path)); FileUtils.cleanDirectory(new File(path)); fileLoc = path + "/stormjar-" + key + ".jar"; // ${storm.local.dir}/nimbus/inbox/${key}/stormjar-${key}.jar data.getUploaders().put(fileLoc, Channels.newChannel(new FileOutputStream(fileLoc))); LOG.info("Begin upload file from client to " + fileLoc); return path; } // 省略 catch 代码块 }
方法首先会基于 UUID 为本次需要上传的 jar 文件创建一个唯一的名称标识,然后在 nimbus 本地对应的目录下创建 jar 文件存储路径(如下),同时为该路径创建一个传输通道,并返回该路径(不包含文件名称):
${storm.local.dir}/nimbus/inbox/${key}/stormjar-${key}.jar
接下来就开始执行 jar 文件的上传逻辑,如果我们在自己的代码中提交 topology 时指定了一些依赖包,那么这里首先会上传这些依赖包,然后再上传主程序包。所有的文件上传都位于一个重载版本的 submitJar
方法中,该重载方法会调用远程 uploadChunk
方法执行具体的文件上传操作,并在上传完成之后调用远程 finishFileUpload
方法关闭对应的上传通道。整个过程就是将我们发布机上本地的 topology jar 文件上传到 nimbus 节点对应的本地路径 nimbus/inbox/${key}/stormjar-${key}.jar
下面,其中 key 是一个 UUID 唯一标识。
接下来方法会调用 submitTopology
方法提交 topology 任务,默认会设置 topology 的初始化状态为 ACTIVE。Nimbus 在接收到 RPC 请求之后开始对提交的任务制定运行方案,主要是依据 topology 配置和集群的运行状态为提交的任务分配 task、worker,以及 supervisor。如果成功则返回对应的 topologyId,否则会抛出相应的异常,我们将在下一篇中对整个 topology 任务分配过程进行深入分析。
(本篇完)