Storm 是目前流行的分布式实时流计算框架之一,它提供了可容错分布式计算所要求的基本需求和保障机制,可以满足高吞吐 , 实时的关键业务应用的需求。在编写基于 Storm 的分布式计算应用时,我们首先需要创建一个拓扑(Topology), 拓扑是一个由 Spout 节点和 Bolt 节点构成的有向图,其中 Spout 节点负责采集数据并发射数据流到 Storm 集群,Bolt 节点负责从 Spout 节点或其他 Bolt 节点接收数据流,并进行业务处理。在编写基于 Storm 的分布式应用时,我们可以在 Bolt 节点中调用 ODM 规则集,从而将业务规则相关的逻辑和数据采集以及数据流的控制分离开来,在业务发生变更时,我们可以动态的重新部署新的规则集,而不是重新发布并运行整个 Storm 拓扑。
在编写好 Storm 拓扑后,我们需要将 Storm 应用打包成一个 Uber Jar( 也称为 Fat Jar, 即将所有的依赖类库打包到一起发布),并提交到 Storm 集群上运行,Storm 集群由 Nimbus 节点和 Supervisor 节点构成,其中 Nimbus 节点负责将应用代码(Uber Jar)分发给 Supervisor 节点 , 指派任务,并监控任务的执行。Supervisor 节点负责监听从 Nimbus 节点分配来的任务,启动工作进程来运行相应的任务。Supervisor 节点和 Nimbus 节点通过 zookeeper 来进行通信。
Rule Execution Server (RES) 是 IBM ODM 产品在分布式环境中管理和执行规则集的组件。RES 可作为集中式服务部署,从而响应多个客户机的请求并同时执行多个规则集。它提供的多种规则执行组件,让用户可以选择合适的执行模式,将业务规则管理系统集成到企业应用中。RES 基于模块化的体系结构,使它即可以作为一组 Java SE POJO 对象来部署和运行,又可以在完全兼容 Java EE 的应用程序服务器上部署和运行。它将规则引擎封装为一个 JCA 资源适配器——执行单元 (XU) 资源适配器,在应用程序服务器与规则引擎之间实施 JCA 交互,由 XU 资源适配器来管理规则引擎,加载规则集,并在业务应用和规则引擎间传递业务规则的调用参数和执行结果。
根据规则执行单元 XU 的部署方式,可以将规则调用分成基于 J2EE 的调用方式和基于 J2SE 的调用方式两大类:
除了基于 J2EE 和 J2SE 的两种调用方式外,ODM RES 还提供了一个规则决策服务 (Decision Service) 应用,用户可以通过 Decision Service 用 SOAP 方式或 REST 方式调用规则集。
在 Storm 中调用规则集时,客户端代码运行在 Storm 的工作节点 JVM 中,因此能选择的规则调用方式有:
前三种方式要求将规则执行单元 XU 安装到独立的应用服务器上,当规则执行成为 Storm 实时计算的瓶颈时,我们只能垂直或水平的扩展运行 ODM RES 的应用服务器,并且无论是 EJB 远程调用,MDB 异步消息调用还是 Decision Service 调用,都需要访问位于外部网络的应用服务器,网络延时可能会影响 Storm 实时处理的能力。
在 Storm 中调用规则集的一个比较好的方式是使用基于 J2SE 的规则会话,采用这种调用方式,每个 Storm Executor 会初始化自己的 XU 实例,进行规则运算。规则运算也成为了 Storm 拓扑的一部分,而不是依赖运行于外部网络的 RES 服务器集群。接下来我们将通过 ODM Getting Started 的 miniloan sample 为例,演示一下如何在 Storm 中用 J2SE 规则会话的方式调用的规则集。
<config-property-value> DRIVER_CLASS_NAME=com.ibm.db2.jcc.DB2Driver, URL=jdbc:db2://dbHost:50000/ODMDB, USER=dbUser, PASSWORD=dbPassword, XOM_PERSISTENCE_TYPE=jdbc, XOM_PERSISTENCE_DRIVER_CLASS_NAME=com.ibm.db2.jcc.DB2Driver, XOM_PERSISTENCE_URL=jdbc:db2://dbHost:50000/ODMDB, XOM_PERSISTENCE_USER= dbUser, XOM_PERSISTENCE_PASSWORD= dbPassword </config-property-value>
设置 XU 的 plugins 属性,支持规则集热部署,在 J2SE 的调用方式中,规则集的热部署通知方式为 TCP/IP, 除了在这里作如下设置外:
<config-property-value>
{pluginClass=Management,xuName=default,protocol=tcpip,tcpip.port=1883,tcpip.host=<RES_Console_Host>,tcpip.retryInterval=20}
</config-property-value>
还需要指定 RES 控制台的管理协议为 TCP/IP, 并重新部署 RES 控制台应用,具体配置详情参见 Configuring the Rule Execution Server EAR for TCP/IP management
public class RulesSpout extends BaseRichSpout { SpoutOutputCollector _collector; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; } @Override public void nextTuple() { Random ran = new Random(); Borrower borrower = new Borrower("John", ran.nextInt(2000), ran.nextInt(100000)); Loan loan = new Loan(ran.nextInt(1000000), 12, 0.05); _collector.emit(new Values(borrower, loan)); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("borrower", "loan")); } }
public class RulesExecutionBolt extends BaseRichBolt { private IlrJ2SESessionFactory factory; private IlrStatelessSession session; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { try { factory = new IlrJ2SESessionFactory(); session = factory.createStatelessSession(); } catch (IlrSessionCreationException e) { e.printStackTrace(); } } @Override public void execute(Tuple tuple) { Borrower borrower = (Borrower) tuple.getValueByField("borrower"); Loan loan = (Loan) tuple.getValueByField("loan"); Map<String,Object> rulesetParams = new HashMap<String, Object>(); rulesetParams.put("borrower", borrower); rulesetParams.put("loan", loan); Map<String,Object> result = executeRules(rulesetParams, "/miniloanruleapp/miniloanrules"); System.out.println(((Loan)result.get("loan")).getApprovalStatus()); } private Map<String,Object> executeRules(Map<String,Object> rulesetParams, String rulesetPath) { try { IlrSessionRequest sessionRequest = factory.createRequest(); sessionRequest.setRulesetPath(IlrPath.parsePath(rulesetPath)); sessionRequest.setForceUptodate(true); sessionRequest.setInputParameters(rulesetParams); IlrSessionResponse sessionResponse = session.execute(sessionRequest); return sessionResponse.getOutputParameters(); } catch (IlrSessionException rse) { rse.printStackTrace(); } catch (IlrFormatException rse) { rse.printStackTrace(); } return null; } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } }
public class RulesTopology { public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("ruleInput", new RulesSpout(), Integer.parseInt(args[0])); builder.setBolt("ruleExectuion", new RulesExecutionBolt(), Integer.parseInt(args[1])).localOrShuffleGrouping("ruleInput"); Config conf = new Config(); conf.setNumWorkers(Integer.parseInt(args[2])); if (args.length > 3) { StormSubmitter.submitTopology(args[3], conf, builder.createTopology()); } else { LocalCluster cluster = new LocalCluster(); cluster.submitTopology("test-minloan", conf, builder.createTopology()); } } }
将 Storm 项目打包成 Uber Jar(minloan-toplogy.jar),并用如下的命令提交打包好的拓扑到 Storm 集群上:
storm jar minloan-toplogy.jar storm. RulesTopology 8 4 4 MiniloanTopology
在提交成功后,我们可以通过 Storm UI 看到运行中的 MiniloanTopology 拓扑以及每个节点处理的吞吐量。在本例中,我们将拓扑提交到 4 个工作节点上,每个工作节点上会运行两个数据采集节点和一个规则运算节点,4 个规则运算节点从同一个规则数据库加载规则集。在业务逻辑发生变更后,业务人员可以直接在 Decision Center 上修改业务规则,然后部署新的规则集到 RES 控制台,RES 控制台会通知每个 Storm 工作节点重新加载最新的规则集,而不需要重新部署整个 Storm 拓扑。
Storm 提供了一个简单的编程模型,让我们能够快速的开发实时的分布式应用,将 Storm 和 ODM 结合起来,我们可以方便的使用 Storm 提供的数据采集框架,从 Kafak 等开源消息队列中采集数据,然后采用 J2SE 的方式调用 ODM 规则集,这样可以很好的支持大批量实时的规则调用需求,并能根据业务需求,动态的扩展规则计算节点的数量。