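In the earlier article "Source analysis of how the supervisor starts a worker: worker.clj", we explained in detail how a worker is initialized, mainly through the mk-worker function. When a worker starts, it must also start the executors that belong to it. Each executor is initialized by the worker through the mk-executor function. mk-executor is called from mk-worker and is defined in executor.clj.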
mk-executor function

    ;; worker is the worker's metadata; executor-id identifies this executor
    (defn mk-executor [worker executor-id]
      ;; executor-data is this executor's metadata, built by mk-executor-data (see its definition below)
      (let [executor-data (mk-executor-data worker executor-id)
            _ (log-message "Loading executor " (:component-id executor-data) ":" (pr-str executor-id))
            ;; task-datas is a map of task_id -> task metadata
            task-datas (->> executor-data
                            ;; the task ids assigned to this executor
                            :task-ids
                            ;; build metadata for each task
                            (map (fn [t] [t (task/mk-task executor-data t)]))
                            ;; build the task_id -> task metadata map
                            (into {})
                            ;; convert the Clojure map into a java.util.HashMap
                            (HashMap.))
            _ (log-message "Loaded executor tasks " (:component-id executor-data) ":" (pr-str executor-id))
            ;; report-error-and-die writes the error to ZooKeeper and then shuts this executor down
            report-error-and-die (:report-error-and-die executor-data)
            ;; component-id identifies the component this executor belongs to
            component-id (:component-id executor-data)

            ;; starting the batch-transfer->worker ensures that anything publishing to that queue
            ;; doesn't block (because it's a single threaded queue and the caching/consumer started
            ;; trick isn't thread-safe)
            ;; start-batch-transfer->worker-handler! creates exactly one dedicated "send thread" per executor,
            ;; i.e. the "message handler" (consumer) of the executor's outbound disruptor queue.
            ;; system-threads holds the SmartThread instance of that send thread.
            ;; See the definition of start-batch-transfer->worker-handler! below
            system-threads [(start-batch-transfer->worker-handler! worker executor-data)]
            ;; with-error-reaction runs report-error-and-die if mk-threads throws. mk-threads creates the
            ;; executor's business-logic threads: a spout's nextTuple and a bolt's execute both run on them.
            ;; handlers holds the SmartThread instances of these threads
            handlers (with-error-reaction report-error-and-die
                       ;; business-logic threads come in two kinds, spout and bolt;
                       ;; mk-threads will be analyzed in a later article
                       (mk-threads executor-data task-datas))
            ;; threads holds the SmartThread instances of the send thread and the business-logic threads
            threads (concat handlers system-threads)]
        ;; setup-ticks! uses schedule-recurring to emit SYSTEM_TICK periodically (which triggers rotation
        ;; of the spout's pending map); it will be analyzed in a later article
        (setup-ticks! worker executor-data)

        (log-message "Finished loading executor " component-id ":" (pr-str executor-id))
        ;; TODO: add method here to get rendered stats... have worker call that when heartbeating
        ;; return an instance implementing the RunningExecutor and Shutdownable protocols to represent this executor
        (reify
          RunningExecutor
          ;; render-stats produces this executor's statistics
          (render-stats [this]
            (stats/render-stats! (:stats executor-data)))
          ;; get-executor-id returns this executor's id
          (get-executor-id [this]
            executor-id)
          Shutdownable
          ;; shutdown releases the resources held by this executor
          (shutdown
            [this]
            (log-message "Shutting down executor " component-id ":" (pr-str executor-id))
            ;; halt the executor's receive queue
            (disruptor/halt-with-interrupt! (:receive-queue executor-data))
            ;; halt the executor's send queue
            (disruptor/halt-with-interrupt! (:batch-transfer-queue executor-data))
            ;; interrupt and join the business-logic threads and the send thread
            (doseq [t threads]
              (.interrupt t)
              (.join t))
            ;; call cleanup on every task hook
            (doseq [user-context (map :user-context (vals task-datas))]
              (doseq [hook (.getHooks user-context)]
                (.cleanup hook)))
            ;; close the ZooKeeper connection
            (.disconnect (:storm-cluster-state executor-data))
            ;; if the spout's open or the bolt's prepare has already been called
            (when @(:open-or-prepare-was-called? executor-data)
              ;; then call the spout's close or the bolt's cleanup
              (doseq [obj (map :object (vals task-datas))]
                (close-component executor-data obj)))
            (log-message "Shut down executor " component-id ":" (pr-str executor-id))))))
mk-executor-data function

    ;; mk-executor-data builds the executor's metadata
    (defn mk-executor-data [worker executor-id]
      ;; worker-context is the backtype.storm.task.WorkerTopologyContext object from the worker's metadata;
      ;; it wraps information about the topology running on this worker
      (let [worker-context (worker-context worker)
            ;; task-ids are the tasks run by this executor. executor-id->tasks expands the executor id into
            ;; task ids: for executor_id [1 3], task-ids = (1 2 3). See executor-id->tasks below
            task-ids (executor-id->tasks executor-id)
            ;; component-id is the component this executor belongs to, i.e. a spout or bolt id. The
            ;; getComponentId method of worker-context looks it up in WorkerTopologyContext's
            ;; Map<Integer, String> _taskToComponent field
            component-id (.getComponentId worker-context (first task-ids))
            ;; storm-conf is the configuration of this executor's component. See normalized-component-conf below
            storm-conf (normalized-component-conf (:storm-conf worker) worker-context component-id)
            ;; executor-type is the type of the component: spout or bolt. See executor-type below
            executor-type (executor-type worker-context component-id)
            ;; create the executor's send (outbound) queue, which is a disruptor queue
            batch-transfer->worker (disruptor/disruptor-queue
                                      ;; queue name
                                      (str "executor" executor-id "-send-queue")
                                      ;; queue size
                                      (storm-conf TOPOLOGY-EXECUTOR-SEND-BUFFER-SIZE)
                                      ;; a single producer thread publishes to this queue
                                      :claim-strategy :single-threaded
                                      ;; the strategy the queue's consumer uses while waiting for new messages
                                      :wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY))
            ]
        ;; recursive-map returns a map that serves as this executor's metadata;
        ;; inside it, <> refers to the map built from the preceding entries
        (recursive-map
         ;; the owning worker's metadata
         :worker worker
         ;; the worker context
         :worker-context worker-context
         ;; the executor id
         :executor-id executor-id
         ;; the ids of the tasks run by this executor
         :task-ids task-ids
         ;; the id of this executor's component
         :component-id component-id
         ;; an atom recording whether the spout's open or the bolt's prepare has been called
         :open-or-prepare-was-called? (atom false)
         ;; this executor's configuration
         :storm-conf storm-conf
         ;; this executor's receive queue, also a disruptor queue
         :receive-queue ((:executor-receive-queue-map worker) executor-id)
         ;; the id of the topology this executor belongs to
         :storm-id (:storm-id worker)
         ;; the owning worker's configuration
         :conf (:conf worker)
         ;; a HashMap for data shared within this executor
         :shared-executor-data (HashMap.)
         ;; whether the topology this executor belongs to is active
         :storm-active-atom (:storm-active-atom worker)
         ;; this executor's send queue
         :batch-transfer-queue batch-transfer->worker
         ;; the executor's transfer function, which publishes messages to the executor's send queue
         :transfer-fn (mk-executor-transfer-fn batch-transfer->worker)
         ;; the owning worker's shutdown (suicide) function
         :suicide-fn (:suicide-fn worker)
         ;; a StormClusterState object owned by this executor
         :storm-cluster-state (cluster/mk-storm-cluster-state (:cluster-state worker))
         ;; the executor type
         :type executor-type
         ;; TODO: should refactor this to be part of the executor specific map (spout or bolt with :common field)
         ;; mk-executor-stats creates a "stats instance" for this executor: SpoutExecutorStats if the executor
         ;; belongs to a spout, BoltExecutorStats if it belongs to a bolt. The stats module will be analyzed
         ;; in a later article. sampling-rate reads the message sampling rate from the executor's configuration
         :stats (mk-executor-stats <> (sampling-rate storm-conf))
         ;; a HashMap keyed by interval, then by task, holding registered metrics
         :interval->task->metric-registry (HashMap.)
         ;; the worker's map of task id -> component id
         :task->component (:task->component worker)
         ;; outbound-components builds a map of stream_id -> (receiving component id -> grouper function),
         ;; which tells Storm which tasks of which components receive the messages emitted by component-id.
         ;; See outbound-components below
         :stream->component->grouper (outbound-components worker-context component-id)
         ;; throttled-report-error-fn builds the error-reporting function that writes errors to ZooKeeper.
         ;; See throttled-report-error-fn below
         :report-error (throttled-report-error-fn <>)
         ;; a function that calls report-error and then suicide-fn
         :report-error-and-die (fn [error]
                                 ((:report-error <>) error)
                                 ((:suicide-fn <>)))
         ;; the Kryo tuple deserializer
         :deserializer (KryoTupleDeserializer. storm-conf worker-context)
         ;; mk-stats-sampler creates a sampler from the sampling rate in the config. See mk-stats-sampler below
         :sampler (mk-stats-sampler storm-conf)
         ;; TODO: add in the executor-specific stuff in a :specific... or make a spout-data, bolt-data function?
         )))
executor-id->tasks function

    ;; executor-id->tasks expands an executor id into its task ids. It destructures the executor id,
    ;; which looks like [1 1] or [1 3]
    (defn executor-id->tasks [[first-task-id last-task-id]]
      ;; range produces the ids with first-task-id <= id < last-task-id + 1
      (->> (range first-task-id (inc last-task-id))
           ;; return a sequence of ints
           (map int)))
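As a rough illustration of the range expansion, here is a small Java sketch (Java because the article already quotes Storm's Java sources). `ExecutorIds` and `executorIdToTasks` are hypothetical names, not part of Storm.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper mirroring executor-id->tasks: an executor id [first last]
// is an inclusive range of task ids.
final class ExecutorIds {
    private ExecutorIds() {}

    static List<Integer> executorIdToTasks(int firstTaskId, int lastTaskId) {
        List<Integer> tasks = new ArrayList<>();
        // (range first (inc last)) is half-open, so iterate up to lastTaskId + 1 exclusive
        for (int t = firstTaskId; t < lastTaskId + 1; t++) {
            tasks.add(t);
        }
        return tasks;
    }
}
```

For example, `executorIdToTasks(1, 3)` returns `[1, 2, 3]` and `executorIdToTasks(5, 5)` returns `[5]`.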
normalized-component-conf function

    ;; normalized-component-conf builds the full configuration of a component
    (defn- normalized-component-conf [storm-conf general-context component-id]
      ;; to-remove is the set of all Storm config keys (ALL-CONFIGS holds the names of all configuration items
      ;; defined in backtype.storm.Config) minus the few keys that a component is allowed to override
      (let [to-remove (disj (set ALL-CONFIGS)
                            TOPOLOGY-DEBUG
                            TOPOLOGY-MAX-SPOUT-PENDING
                            TOPOLOGY-MAX-TASK-PARALLELISM
                            TOPOLOGY-TRANSACTIONAL-ID
                            TOPOLOGY-TICK-TUPLE-FREQ-SECS
                            TOPOLOGY-SLEEP-SPOUT-WAIT-STRATEGY-TIME-MS
                            TOPOLOGY-SPOUT-WAIT-STRATEGY
                            )
            ;; spec-conf is the component-specific configuration of component-id
            spec-conf (-> general-context
                          ;; get the ComponentCommon object of component-id from the WorkerTopologyContext
                          (.getComponentCommon component-id)
                          ;; read the component-specific configuration (a JSON string) from ComponentCommon
                          .get_json_conf
                          ;; parse the JSON string into a map
                          from-json)]
        ;; first remove from spec-conf every key contained in to-remove, which keeps only the overridable
        ;; component-specific settings; then merge the result into storm-conf to get the component's
        ;; full configuration
        (merge storm-conf (apply dissoc spec-conf to-remove))
        ))
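The filter-then-merge logic can be sketched in Java with plain maps. The class and method names are hypothetical, and the key strings in the usage below are only examples. Note that keys absent from `allConfigs` (user-defined settings) survive the filtering, just as they do with `(apply dissoc spec-conf to-remove)`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of normalized-component-conf using plain maps.
final class ComponentConf {
    private ComponentConf() {}

    static Map<String, Object> normalize(Map<String, Object> stormConf,
                                         Map<String, Object> specConf,
                                         Set<String> allConfigs,
                                         Set<String> overridable) {
        // to-remove: every known config key except the ones a component may override
        Set<String> toRemove = new HashSet<>(allConfigs);
        toRemove.removeAll(overridable);
        // (apply dissoc spec-conf to-remove): keys unknown to allConfigs are kept
        Map<String, Object> filtered = new HashMap<>(specConf);
        filtered.keySet().removeAll(toRemove);
        // (merge storm-conf filtered): the component's values win on conflicts
        Map<String, Object> result = new HashMap<>(stormConf);
        result.putAll(filtered);
        return result;
    }
}
```

For instance, if the topology sets `topology.workers` to 4 and a component's JSON conf sets it to 8 along with `topology.debug` set to true, only the `topology.debug` override takes effect when `topology.debug` is the only overridable key.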
executor-type function

    ;; executor-type returns the type of the given component: spout or bolt
    (defn executor-type [^WorkerTopologyContext context component-id]
      ;; topology is the StormTopology object
      (let [topology (.getRawTopology context)
            ;; spouts is a map of component_id -> SpoutSpec
            spouts (.get_spouts topology)
            ;; bolts is a map of component_id -> Bolt
            bolts (.get_bolts topology)]
        ;; return :spout if spouts contains component-id, or :bolt if bolts contains it
        (cond (contains? spouts component-id) :spout
              (contains? bolts component-id) :bolt
              :else (throw-runtime "Could not find " component-id " in topology " topology))))
mk-executor-transfer-fn function

    ;; mk-executor-transfer-fn builds the executor's transfer-fn, which publishes the messages produced by the
    ;; executor to the executor's own send queue.
    ;; To keep the component from blocking (a large number of tuples not processed in time), an extra
    ;; overflow buffer is used; nextTuple is stopped only once this overflow buffer is full as well
    ;; (the overflow buffer matters mainly for spout executors)
    (defn mk-executor-transfer-fn [batch-transfer->worker]
      ;; this puts [task, tuple] into either overflow-buffer or the batch-transfer->worker queue
      (fn this
        ([task tuple block? ^List overflow-buffer]
          ;; if overflow-buffer exists and is not empty, the send queue has already filled up,
          ;; so put the message straight into overflow-buffer
          (if (and overflow-buffer (not (.isEmpty overflow-buffer)))
            (.add overflow-buffer [task tuple])
            ;; otherwise publish the message to the executor's send queue
            (try-cause
              (disruptor/publish batch-transfer->worker [task tuple] block?)
              (catch InsufficientCapacityException e
                ;; the queue had no room: store the message in overflow-buffer if there is one,
                ;; otherwise rethrow the exception
                (if overflow-buffer
                  (.add overflow-buffer [task tuple])
                  (throw e))
                ))))
        ([task tuple overflow-buffer]
          ;; block only when no overflow buffer is supplied
          (this task tuple (nil? overflow-buffer) overflow-buffer))
        ([task tuple]
          (this task tuple nil)
          )))
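A minimal Java sketch of the overflow-buffer decision. It assumes a bounded `BlockingQueue` in place of the Disruptor queue, so `offer` returning false stands in for `InsufficientCapacityException` and `put` stands in for a blocking publish. `TransferFn` is a hypothetical name.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch of the executor transfer-fn; a BlockingQueue stands in for the Disruptor queue.
final class TransferFn {
    private final BlockingQueue<Object[]> sendQueue;

    TransferFn(BlockingQueue<Object[]> sendQueue) {
        this.sendQueue = sendQueue;
    }

    // Mirrors the [task tuple block? overflow-buffer] arity.
    void transfer(int task, Object tuple, boolean block, List<Object[]> overflow) {
        Object[] msg = {task, tuple};
        if (overflow != null && !overflow.isEmpty()) {
            // Messages are already waiting in the overflow buffer: append behind them.
            overflow.add(msg);
        } else if (block) {
            try {
                sendQueue.put(msg); // blocking publish
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        } else if (!sendQueue.offer(msg)) {
            // offer() returning false plays the role of InsufficientCapacityException.
            if (overflow == null) {
                throw new IllegalStateException("send queue is full");
            }
            overflow.add(msg);
        }
    }

    // Mirrors the [task tuple overflow-buffer] arity: block only when there is no overflow buffer.
    void transfer(int task, Object tuple, List<Object[]> overflow) {
        transfer(task, tuple, overflow == null, overflow);
    }
}
```

With a queue of capacity 1, the first message goes into the queue and every later one lands in the overflow buffer. Once the buffer is non-empty, new messages are appended to it even if the queue has drained, so they stay behind the messages already waiting there.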
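sampling-rate function

    ;; sampling-rate reads the message sampling rate and returns the integer X = 1 / rate,
    ;; i.e. one message out of every X is sampled (a rate of 0.05 gives X = 20)
    (defn sampling-rate [conf]
      (->> (conf TOPOLOGY-STATS-SAMPLE-RATE)
           (/ 1)
           int))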
mk-executor-stats function

    ;; executor-selector returns the type of the executor's component
    ;; (it must be defined before the defmulti that uses it as a dispatch function)
    (defn executor-selector [executor-data & _] (:type executor-data))

    ;; mk-executor-stats is a multimethod: the value returned by executor-selector decides which
    ;; implementation is called
    (defmulti mk-executor-stats executor-selector)

    ;; called when executor-selector returns :spout
    (defmethod mk-executor-stats :spout [_ rate]
      (stats/mk-spout-stats rate))

    ;; called when executor-selector returns :bolt
    (defmethod mk-executor-stats :bolt [_ rate]
      (stats/mk-bolt-stats rate))
outbound-components function

    (defn outbound-components
      "Returns map of stream id to component id to grouper"
      [^WorkerTopologyContext worker-context component-id]
      ;; the context's getTargets method finds which components receive the output of componentId; it returns
      ;; a map of stream_id -> {receive_component_id -> Grouping}. See getTargets below
      (->> (.getTargets worker-context component-id)
           ;; clojurify-structure simply converts the Java Map into a Clojure map
           clojurify-structure
           ;; stream-id is the stream id and component->grouping maps components to groupings. The result of
           ;; map looks like ([stream-id-1 {component_1 fn_1 component_2 fn_2 ... component_n fn_n}] ...)
           (map (fn [[stream-id component->grouping]]
                  [stream-id
                   ;; outbound-groupings builds a map of receiving component id -> grouper function; the grouper
                   ;; decides which tasks of the receiving component get a message. See outbound-groupings below
                   (outbound-groupings
                     worker-context
                     component-id
                     stream-id
                     (.getComponentOutputFields worker-context component-id stream-id)
                     component->grouping)]))
           ;; build the stream_id -> (receiving component id -> grouper function) map
           (into {})
           (HashMap.)))
getTargets method

    // WorkerTopologyContext extends GeneralTopologyContext. getTargets is an instance method of
    // GeneralTopologyContext that finds which components receive the output of componentId.
    // It returns a map of stream_id -> {receive_component_id -> Grouping}, where
    // receive_component_id is the id of a receiving component
    public Map<String, Map<String, Grouping>> getTargets(String componentId) {
        // the result map
        Map<String, Map<String, Grouping>> ret = new HashMap<String, Map<String, Grouping>>();
        // iterate over the ids of all components in the topology
        for(String otherComponentId: getComponentIds()) {
            // get the component's ComponentCommon object, then its inputs
            Map<GlobalStreamId, Grouping> inputs = getComponentCommon(otherComponentId).get_inputs();
            // iterate over the inputs; a GlobalStreamId has two fields:
            // the stream id and the id of the component that emits the stream
            for(GlobalStreamId id: inputs.keySet()) {
                // if the input's source component is componentId, this component consumes
                // componentId's output, so add it to ret
                if(id.get_componentId().equals(componentId)) {
                    Map<String, Grouping> curr = ret.get(id.get_streamId());
                    if(curr==null) curr = new HashMap<String, Grouping>();
                    curr.put(otherComponentId, inputs.get(id));
                    ret.put(id.get_streamId(), curr);
                }
            }
        }
        return ret;
    }
outbound-groupings function

    (defn- outbound-groupings [^WorkerTopologyContext worker-context this-component-id stream-id out-fields component->grouping]
      ;; component->grouping maps each component that receives messages from this-component-id to its Grouping
      (->> component->grouping
           ;; drop entries whose component has no tasks. filter-key takes a predicate on the component id;
           ;; entries for which the predicate returns false are removed
           (filter-key #(-> worker-context
                            ;; WorkerTopologyContext's getComponentTasks returns the component's task list
                            (.getComponentTasks %)
                            ;; count the component's tasks
                            count
                            ;; true if the task count is greater than 0, false otherwise
                            pos?))
           ;; build a grouper function for each receiving component; the result of map looks like
           ;; ([component_1 fn_1] [component_2 fn_2] ... [component_n fn_n])
           (map (fn [[component tgrouping]]
                  [component
                   ;; mk-grouper builds the grouper for the receiving component. See mk-grouper below
                   (mk-grouper worker-context
                               this-component-id
                               stream-id
                               out-fields
                               tgrouping
                               (.getComponentTasks worker-context component)
                               )]))
           ;; build the receiving component id -> grouper function map
           (into {})
           (HashMap.)))
mk-grouper function

    (defn- mk-grouper
      "Returns a function that returns a vector of which task indices to send tuple to, or just a single task index."
      ;; stream-id is the stream emitted by component-id; out-fields are the output fields of that stream;
      ;; thrift-grouping is the Grouping used by the receiving component; target-tasks is the receiving
      ;; component's task id list (a java.util.List)
      [^WorkerTopologyContext context component-id stream-id ^Fields out-fields thrift-grouping ^List target-tasks]
      ;; num-tasks is the number of tasks of the receiving component
      (let [num-tasks (count target-tasks)
            ;; a Random object
            random (Random.)
            ;; sort target-tasks and convert it into a Clojure vector
            target-tasks (vec (sort target-tasks))]
        ;; grouping-type returns the grouping type; thrift-grouping is a backtype.storm.generated.Grouping object.
        ;; See grouping-type below
        (condp = (thrift/grouping-type thrift-grouping)
          ;; the :fields grouping keyword
          :fields
            ;; fields grouping and global grouping share the :fields keyword, so the two must be told apart.
            ;; Grouping extends org.apache.thrift.TUnion, so it has the fields Object value_ and
            ;; Grouping._Fields setField_. When setField_ is _Fields.FIELDS, value_ is a List<String>:
            ;; an empty list means global grouping, otherwise it is fields grouping.
            ;; global-grouping? decides by checking whether value_ is an empty list
            (if (thrift/global-grouping? thrift-grouping)
              ;; global grouping: this anonymous function is the grouper, which tells Storm where to send
              ;; globally grouped messages
              (fn [task-id tuple]
                ;; It's possible for target to have multiple tasks if it reads multiple sources
                ;; global grouping sends every message to the receiving task with the smallest id,
                ;; so only that task receives messages
                (first target-tasks))
              ;; fields grouping: field-grouping returns the names of the grouping fields, which are wrapped
              ;; in a backtype.storm.tuple.Fields object
              (let [group-fields (Fields. (thrift/field-grouping thrift-grouping))]
                ;; mk-fields-grouper builds the fields-grouping grouper. See mk-fields-grouper below
                (mk-fields-grouper out-fields group-fields target-tasks)
                ))
          ;; the :all grouping keyword
          :all
            ;; this grouper sends the message to every task of the receiving component
            (fn [task-id tuple] target-tasks)
          ;; the :shuffle grouping keyword
          :shuffle
            ;; this grouper sends the message to one randomly chosen receiving task. See mk-shuffle-grouper below
            (mk-shuffle-grouper target-tasks)
          ;; the :local-or-shuffle grouping keyword
          :local-or-shuffle
            ;; same-tasks is the set of receiving tasks that run on this worker, i.e. the receiving tasks located
            ;; in the same worker as the sending component component-id
            (let [same-tasks (set/intersection
                               ;; the receiving task ids as a set
                               (set target-tasks)
                               ;; WorkerTopologyContext's getThisWorkerTasks returns the task ids running on this worker
                               (set (.getThisWorkerTasks context)))]
              (if-not (empty? same-tasks)
                ;; if same-tasks is not empty, choose randomly among same-tasks only, i.e. among local tasks
                (mk-shuffle-grouper (vec same-tasks))
                ;; otherwise local-or-shuffle uses the same grouper as shuffle, so the two behave identically
                (mk-shuffle-grouper target-tasks)))
          ;; the :none grouping keyword
          :none
            ;; return this grouper
            (fn [task-id tuple]
              ;; draw a random int and take it mod num-tasks to get an index into the receiving tasks
              (let [i (mod (.nextInt random) num-tasks)]
                ;; the id of the receiving task
                (.get target-tasks i)
                ))
          ;; the :custom-object keyword, i.e. a user-defined grouping
          :custom-object
            ;; grouping is the user-defined grouping object
            (let [grouping (thrift/instantiate-java-object (.get_custom_object thrift-grouping))]
              ;; mk-custom-grouper builds the grouper for the user-defined grouping. See mk-custom-grouper below
              (mk-custom-grouper grouping context component-id stream-id target-tasks))
          ;; the :custom-serialized keyword, i.e. the grouping object is serialized
          :custom-serialized
            ;; deserialize the grouping object; grouping is the deserialized object
            (let [grouping (Utils/deserialize (.get_custom_serialized thrift-grouping))]
              ;; mk-custom-grouper builds the grouper for the user-defined grouping. See mk-custom-grouper below
              (mk-custom-grouper grouping context component-id stream-id target-tasks))
          ;; the :direct keyword: simply return :direct
          :direct
            :direct
          )))
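Three of the simpler branches of mk-grouper are sketched below in Java, assuming task ids are plain `Integer`s. `GroupingSketch` and its methods are hypothetical, and the local-or-shuffle method only computes the candidate list that the real code then hands to mk-shuffle-grouper. `Math.floorMod` is used because Clojure's `mod` returns a non-negative result for a positive divisor, whereas Java's `%` can return a negative value when `nextInt()` is negative.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.Set;

// Hypothetical sketch of three mk-grouper branches.
final class GroupingSketch {
    private GroupingSketch() {}

    // :fields with an empty field list (global grouping): always the smallest task id,
    // which is (first target-tasks) once target-tasks has been sorted.
    static int globalTarget(List<Integer> targetTasks) {
        return Collections.min(targetTasks);
    }

    // :local-or-shuffle: the target tasks in this worker, or all targets if none are local.
    static List<Integer> localOrShuffleCandidates(List<Integer> targetTasks, Set<Integer> thisWorkerTasks) {
        List<Integer> same = new ArrayList<>();
        for (Integer t : targetTasks) {
            if (thisWorkerTasks.contains(t)) {
                same.add(t);
            }
        }
        return same.isEmpty() ? new ArrayList<>(targetTasks) : same;
    }

    // :none: a random target; floorMod matches Clojure's non-negative (mod x n).
    static int noneTarget(List<Integer> sortedTargets, Random random) {
        return sortedTargets.get(Math.floorMod(random.nextInt(), sortedTargets.size()));
    }
}
```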
grouping-type function

    ;; grouping-type returns the grouping type
    (defn grouping-type
      ;; grouping is a backtype.storm.generated.Grouping, a Java class generated from the Thrift definition
      [^Grouping grouping]
      ;; grouping-constants maps _Fields enum values to grouping keywords. getSetField returns the grouping type as
      ;; a _Fields enum value; _Fields is an inner enum of Grouping. See their definitions below
      (grouping-constants (.getSetField grouping)))
grouping-constants
    (def grouping-constants
      {Grouping$_Fields/FIELDS :fields
       Grouping$_Fields/SHUFFLE :shuffle
       Grouping$_Fields/ALL :all
       Grouping$_Fields/NONE :none
       Grouping$_Fields/CUSTOM_SERIALIZED :custom-serialized
       Grouping$_Fields/CUSTOM_OBJECT :custom-object
       Grouping$_Fields/DIRECT :direct
       Grouping$_Fields/LOCAL_OR_SHUFFLE :local-or-shuffle
      })
The _Fields enum
    public enum _Fields implements org.apache.thrift.TFieldIdEnum {
      FIELDS((short)1, "fields"),
      SHUFFLE((short)2, "shuffle"),
      ALL((short)3, "all"),
      NONE((short)4, "none"),
      DIRECT((short)5, "direct"),
      CUSTOM_OBJECT((short)6, "custom_object"),
      CUSTOM_SERIALIZED((short)7, "custom_serialized"),
      LOCAL_OR_SHUFFLE((short)8, "local_or_shuffle");

      private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();

      static {
        for (_Fields field : EnumSet.allOf(_Fields.class)) {
          byName.put(field.getFieldName(), field);
        }
      }

      /**
       * Find the _Fields constant that matches fieldId, or null if its not found.
       */
      public static _Fields findByThriftId(int fieldId) {
        switch(fieldId) {
          case 1: // FIELDS
            return FIELDS;
          case 2: // SHUFFLE
            return SHUFFLE;
          case 3: // ALL
            return ALL;
          case 4: // NONE
            return NONE;
          case 5: // DIRECT
            return DIRECT;
          case 6: // CUSTOM_OBJECT
            return CUSTOM_OBJECT;
          case 7: // CUSTOM_SERIALIZED
            return CUSTOM_SERIALIZED;
          case 8: // LOCAL_OR_SHUFFLE
            return LOCAL_OR_SHUFFLE;
          default:
            return null;
        }
      }

      /**
       * Find the _Fields constant that matches fieldId, throwing an exception
       * if it is not found.
       */
      public static _Fields findByThriftIdOrThrow(int fieldId) {
        _Fields fields = findByThriftId(fieldId);
        if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
        return fields;
      }

      /**
       * Find the _Fields constant that matches name, or null if its not found.
       */
      public static _Fields findByName(String name) {
        return byName.get(name);
      }

      private final short _thriftId;
      private final String _fieldName;

      _Fields(short thriftId, String fieldName) {
        _thriftId = thriftId;
        _fieldName = fieldName;
      }

      public short getThriftFieldId() {
        return _thriftId;
      }

      public String getFieldName() {
        return _fieldName;
      }
    }
mk-fields-grouper function

    ;; mk-fields-grouper builds the grouper function for fields grouping
    (defn- mk-fields-grouper [^Fields out-fields ^Fields group-fields ^List target-tasks]
      ;; num-tasks is the number of receiving tasks
      (let [num-tasks (count target-tasks)
            ;; task-getter looks up a task id by its index in the List
            task-getter (fn [i] (.get target-tasks i))]
        ;; return the fields-grouping grouper. task-id is the id of the emitting task, values is the tuple's value list
        (fn [task-id ^List values]
          ;; out-fields' select method extracts the values of the grouping fields
          (-> (.select out-fields group-fields values)
              ;; hash code of the grouping-field values
              tuple/list-hash-code
              ;; hash code mod num-tasks gives the index of the receiving task in target-tasks
              (mod num-tasks)
              ;; fetch the id of the receiving task at that index
              task-getter))))
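The hash-then-modulo step can be sketched in Java as follows. This assumes `tuple/list-hash-code` amounts to `List.hashCode()` of the selected values, and it replaces `Fields.select` with a lookup by field name. `FieldsGrouper` is a hypothetical class.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of mk-fields-grouper.
final class FieldsGrouper {
    private final List<String> outFields;
    private final List<String> groupFields;
    private final List<Integer> targetTasks;

    FieldsGrouper(List<String> outFields, List<String> groupFields, List<Integer> targetTasks) {
        this.outFields = outFields;
        this.groupFields = groupFields;
        this.targetTasks = targetTasks;
    }

    int chooseTask(List<?> values) {
        // Like Fields.select: collect the grouping-field values in groupFields order.
        List<Object> selected = new ArrayList<>();
        for (String field : groupFields) {
            selected.add(values.get(outFields.indexOf(field)));
        }
        // Hash the selected values, then take a non-negative modulo (like Clojure's mod) to get an index.
        int index = Math.floorMod(selected.hashCode(), targetTasks.size());
        return targetTasks.get(index);
    }
}
```

With output fields `word` and `count` grouped on `word`, the index depends only on the `word` value. Every tuple with the same word therefore reaches the same task, and the `count` field has no influence.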
mk-shuffle-grouper function

    ;; mk-shuffle-grouper builds the grouper function for shuffle grouping
    (defn- mk-shuffle-grouper [^List target-tasks]
      ;; choices is a vector [(MutableInt. -1) choices rand]: the first element is the current index into the
      ;; list, the second the shuffled list of receiving task ids, the third the Random used for shuffling.
      ;; See rotating-random-range below
      (let [choices (rotating-random-range target-tasks)]
        ;; return the grouper
        (fn [task-id tuple]
          ;; acquire-random-range-id takes one task id from choices. See acquire-random-range-id below
          (acquire-random-range-id choices))))
rotating-random-range function

    ;; rotating-random-range shuffles the elements of choices
    (defn rotating-random-range [choices]
      ;; rand is the Random passed to shuffle
      (let [rand (Random.)
            ;; copy choices into a new ArrayList
            choices (ArrayList. choices)]
        ;; Collections/shuffle shuffles choices in place using rand
        (Collections/shuffle choices rand)
        ;; return the cursor, the shuffled choices and rand
        [(MutableInt. -1) choices rand]))
acquire-random-range-id function

    ;; acquire-random-range-id advances the cursor and returns the list element at the new position
    (defn acquire-random-range-id [[^MutableInt curr ^List state ^Random rand]]
      ;; increment curr; if it is greater than or equal to the size of the list, reset it to 0
      (when (>= (.increment curr) (.size state))
        (.set curr 0)
        ;; and reshuffle state using rand
        (Collections/shuffle state rand))
      ;; return the element of state at position curr
      (.get state (.get curr)))
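Taken together, rotating-random-range and acquire-random-range-id behave like the following Java class, a hypothetical sketch rather than Storm code. Each pass over the shuffled list visits every task exactly once, and the list is reshuffled whenever the cursor wraps around. This spreads load more evenly than drawing each target independently at random.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical sketch combining rotating-random-range and acquire-random-range-id.
final class RotatingShuffle {
    private final List<Integer> state;
    private final Random rand;
    private int curr = -1;

    RotatingShuffle(List<Integer> choices, Random rand) {
        this.rand = rand;
        this.state = new ArrayList<>(choices); // copy, like (ArrayList. choices)
        Collections.shuffle(this.state, rand);
    }

    int next() {
        curr++;
        if (curr >= state.size()) {
            // wrapped around: restart at 0 with a fresh order
            curr = 0;
            Collections.shuffle(state, rand);
        }
        return state.get(curr);
    }
}
```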
mk-custom-grouper function

    ;; A user-defined grouping must implement the CustomStreamGrouping interface, which has two methods:
    ;; prepare and chooseTasks. prepare does the preparatory work that chooseTasks relies on
    (defn- mk-custom-grouper [^CustomStreamGrouping grouping ^WorkerTopologyContext context ^String component-id ^String stream-id target-tasks]
      ;; call prepare once with the stream being grouped and the receiving task ids
      (.prepare grouping context (GlobalStreamId. component-id stream-id) target-tasks)
      ;; the grouper delegates every decision to chooseTasks
      (fn [task-id ^List values]
        (.chooseTasks grouping task-id values)
        ))
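The prepare-once, delegate-afterwards pattern can be sketched in Java. `SimpleStreamGrouping` is a simplified, hypothetical stand-in for `CustomStreamGrouping` in which the `WorkerTopologyContext` and `GlobalStreamId` arguments of `prepare` are omitted. `FirstValueGrouping` is an invented example grouping.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.BiFunction;

// Simplified, hypothetical stand-in for CustomStreamGrouping.
interface SimpleStreamGrouping {
    void prepare(List<Integer> targetTasks);
    List<Integer> chooseTasks(int taskId, List<?> values);
}

// Invented example grouping: route by the hash of the first value.
final class FirstValueGrouping implements SimpleStreamGrouping {
    private List<Integer> targets = Collections.emptyList();

    public void prepare(List<Integer> targetTasks) {
        targets = new ArrayList<>(targetTasks);
    }

    public List<Integer> chooseTasks(int taskId, List<?> values) {
        int i = Math.floorMod(values.get(0).hashCode(), targets.size());
        return Collections.singletonList(targets.get(i));
    }
}

final class CustomGrouper {
    private CustomGrouper() {}

    // Like mk-custom-grouper: prepare once, then return a function that delegates to chooseTasks.
    static BiFunction<Integer, List<?>, List<Integer>> mkCustomGrouper(SimpleStreamGrouping g, List<Integer> targetTasks) {
        g.prepare(targetTasks);
        return (taskId, values) -> g.chooseTasks(taskId, values);
    }
}
```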
throttled-report-error-fn function

    (defn throttled-report-error-fn [executor]
      ;; storm-conf is the executor's configuration
      (let [storm-conf (:storm-conf executor)
            ;; error-interval-secs is the length of the error-reporting interval, in seconds
            error-interval-secs (storm-conf TOPOLOGY-ERROR-THROTTLE-INTERVAL-SECS)
            ;; max-per-interval is the maximum number of errors this executor reports within one interval
            max-per-interval (storm-conf TOPOLOGY-MAX-ERROR-REPORT-PER-INTERVAL)
            ;; interval-start-time is the start time of the current interval
            interval-start-time (atom (current-time-secs))
            ;; interval-errors counts the errors reported in the current interval
            interval-errors (atom 0)
            ]
        ;; return the error-reporting function
        (fn [error]
          ;; log the error
          (log-error error)
          ;; if current time - start time > interval length, a new reporting interval has begun
          (when (> (time-delta @interval-start-time)
                   error-interval-secs)
            ;; reset the error count to 0
            (reset! interval-errors 0)
            ;; reset interval-start-time to the current time
            (reset! interval-start-time (current-time-secs)))
          ;; increment the error count of the current interval
          (swap! interval-errors inc)

          ;; report only while the count has not exceeded the per-interval maximum
          (when (<= @interval-errors max-per-interval)
            ;; call report-error in cluster.clj to write the error to ZooKeeper
            (cluster/report-error (:storm-cluster-state executor) (:storm-id executor) (:component-id executor)
                                  (memoized-local-hostname) (.getThisWorkerPort (:worker-context executor)) error)
            ))))
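The throttling window can be sketched in Java with an injectable clock, which makes its behavior easy to check. `ThrottledReporter` is hypothetical, and its `reported` list stands in for the ZooKeeper write done by `cluster/report-error`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical sketch of throttled-report-error-fn with an injectable clock (in seconds).
final class ThrottledReporter {
    private final long intervalSecs;
    private final int maxPerInterval;
    private final LongSupplier clockSecs;
    private long intervalStart;
    private int intervalErrors = 0;
    // Stands in for the ZooKeeper write performed by cluster/report-error.
    final List<String> reported = new ArrayList<>();

    ThrottledReporter(long intervalSecs, int maxPerInterval, LongSupplier clockSecs) {
        this.intervalSecs = intervalSecs;
        this.maxPerInterval = maxPerInterval;
        this.clockSecs = clockSecs;
        this.intervalStart = clockSecs.getAsLong();
    }

    void report(String error) {
        // (The Clojure version also logs every error locally before throttling.)
        long now = clockSecs.getAsLong();
        if (now - intervalStart > intervalSecs) {
            // A new interval has started: reset the counter and the start time.
            intervalErrors = 0;
            intervalStart = now;
        }
        intervalErrors++;
        if (intervalErrors <= maxPerInterval) {
            reported.add(error);
        }
    }
}
```

With a 10-second interval and a limit of 2, a third error within the same interval is only logged. An error arriving more than 10 seconds after the interval began opens a new interval and is reported again.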
report-error function

    ;; report-error writes a component's error to ZooKeeper; it is a method of the StormClusterState
    ;; implementation in cluster.clj
    (report-error
      [this storm-id component-id node port error]
      ;; path is "/errors/{storm-id}/{component-id}"
      (let [path (error-path storm-id component-id)
            ;; data holds the error information: time, stack trace, host and port
            data {:time-secs (current-time-secs) :error (stringify-error error) :host node :port port}
            ;; create the /errors/{storm-id}/{component-id} node
            _ (mkdirs cluster-state path)
            ;; create a sequential child node of /errors/{storm-id}/{component-id} and write the error into it
            _ (create-sequential cluster-state (str path "/e") (Utils/serialize data))
            ;; to-kill holds every child node except the 10 with the highest sequence numbers
            to-kill (->> (get-children cluster-state path false)
                         (sort-by parse-error-path)
                         reverse
                         (drop 10))]
        ;; delete the nodes in to-kill
        (doseq [k to-kill]
          (delete-node cluster-state (str path "/" k)))))
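Pruning the error nodes down to the newest ten can be sketched in Java. The sketch assumes the child names look like `e` followed by a zero-padded sequence number, which is the form ZooKeeper gives a sequential node created at `path/e`. It also assumes parse-error-path parses the digits after the `e`. `ErrorNodePruner` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the to-kill computation in report-error.
final class ErrorNodePruner {
    private ErrorNodePruner() {}

    // Assumed equivalent of parse-error-path: drop the leading "e" and parse the sequence number.
    static long parseSeq(String node) {
        return Long.parseLong(node.substring(1));
    }

    // Children to delete: everything except the `keep` nodes with the highest sequence numbers.
    static List<String> toKill(List<String> children, int keep) {
        List<String> sorted = new ArrayList<>(children);
        // sort-by parse-error-path, then reverse: newest (highest number) first
        sorted.sort((a, b) -> Long.compare(parseSeq(b), parseSeq(a)));
        if (sorted.size() <= keep) {
            return new ArrayList<>();
        }
        return new ArrayList<>(sorted.subList(keep, sorted.size()));
    }
}
```

Sorting numerically, not lexically, is what parse-error-path is for. With ZooKeeper's fixed-width padding both orders happen to agree, but parsing the number avoids relying on that detail.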
mk-stats-sampler function

    ;; mk-stats-sampler creates a sampler from the sampling rate in conf
    (defn mk-stats-sampler [conf]
      ;; even-sampler returns a sampling function that carries the metadata {:rate freq}.
      ;; See the definition of sampling-rate
      (even-sampler (sampling-rate conf)))
even-sampler function

    ;; even-sampler returns an anonymous function carrying the metadata {:rate freq}; this function is the sampler
    (defn even-sampler
      ;; freq is the reciprocal of the sampling rate: one message in every freq is sampled
      [freq]
      ;; convert freq to int
      (let [freq (int freq)
            ;; start is 0
            start (int 0)
            ;; r is a Random object
            r (java.util.Random.)
            ;; curr starts at -1
            curr (MutableInt. -1)
            ;; target is an int in [0, freq)
            target (MutableInt. (.nextInt r freq))]
        ;; with-meta attaches {:rate freq} to the anonymous function and returns it as the sampling function.
        ;; The rate can be read back with (:rate (meta sampler))
        (with-meta
          ;; The sampling function: curr counts up from -1 on the first pass and from start afterwards. It returns
          ;; false until curr equals target and true when they are equal; once curr reaches freq, curr is reset
          ;; to start (0) and target is redrawn from [0, freq).
          ;; Calling it repeatedly therefore yields exactly one true in every freq calls; sample when it returns true
          (fn []
            ;; i = ++curr
            (let [i (.increment curr)]
              ;; if i >= freq, reset curr to 0 and redraw target from [0, freq)
              (when (>= i freq)
                (.set curr start)
                (.set target (.nextInt r freq))))
            ;; true if curr equals target, false otherwise
            (= (.get curr) (.get target)))
          ;; the metadata
          {:rate freq})))
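The sampler's counter logic ported to Java, as a hypothetical sketch (`EvenSampler` is not a Storm class). `freqFromRate` mirrors sampling-rate. Because `target` is redrawn each time `curr` wraps, every aligned run of `freq` consecutive calls contains exactly one `true`, but its position within the run is random.

```java
import java.util.Random;

// Hypothetical Java port of even-sampler.
final class EvenSampler {
    private final int freq;
    private final Random r;
    private int curr = -1;
    private int target;

    EvenSampler(int freq, Random r) {
        this.freq = freq;
        this.r = r;
        this.target = r.nextInt(freq); // target in [0, freq)
    }

    // Mirrors sampling-rate: for example, a rate of 0.25 gives freq = 4.
    static int freqFromRate(double rate) {
        return (int) (1 / rate);
    }

    // Returns true exactly once in every freq calls.
    boolean sample() {
        int i = ++curr;
        if (i >= freq) {
            curr = 0;
            target = r.nextInt(freq);
        }
        return curr == target;
    }
}
```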
start-batch-transfer->worker-handler! function

    ;; start-batch-transfer->worker-handler! creates exactly one dedicated "send thread" per executor (the
    ;; "message handler" of the executor's outbound disruptor queue). The send thread keeps pulling messages
    ;; from the executor's send queue and transfers them in batches.
    ;; "Local messages" (the receiving executor runs in the same worker as the sending executor) go directly
    ;; to the receiving executor's receive queue; "remote messages" go to the send queue of the worker that
    ;; owns this executor
    (defn start-batch-transfer->worker-handler! [worker executor-data]
      ;; worker-transfer-fn is the worker's transfer function
      (let [worker-transfer-fn (:transfer-fn worker)
            ;; cached-emit wraps an ArrayList that buffers messages until the end of the current batch,
            ;; when they are sent on
            cached-emit (MutableObject. (ArrayList.))
            ;; storm-conf is the executor's configuration
            storm-conf (:storm-conf executor-data)
            ;; serializer is a KryoTupleSerializer used to serialize messages
            serializer (KryoTupleSerializer. storm-conf (:worker-context executor-data))
            ]
        ;; call consume-loop* from disruptor.clj
        (disruptor/consume-loop*
          ;; the executor's send queue
          (:batch-transfer-queue executor-data)
          ;; the handler macro yields a clojure-handler, i.e. the "message handler" of the disruptor queue.
          ;; [o seq-id batch-end?] is the parameter list of the concrete message-processing function.
          ;; See the handler macro below
          (disruptor/handler [o seq-id batch-end?]
            ;; the let form is the body of the message-processing function
            (let [^ArrayList alist (.getObject cached-emit)]
              ;; the handler keeps pulling messages from the disruptor queue and buffers each one in cached-emit
              (.add alist o)
              ;; batch-end? is true for the last message of the batch currently available in the queue
              (when batch-end?
                ;; then call worker-transfer-fn to send the whole batch (an ArrayList of messages)
                (worker-transfer-fn serializer alist)
                ;; and replace cached-emit's list with a fresh one
                (.setObject cached-emit (ArrayList.))
                )))
          :kill-fn (:report-error-and-die executor-data))))
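The batching behavior of the handler can be sketched in Java without the Disruptor. `BatchingHandler` is a hypothetical class. Its `onEvent` has the same `(o, seqId, batchEnd)` shape as the handler body, and a `Consumer` stands in for worker-transfer-fn.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of the send thread's handler body.
final class BatchingHandler {
    private List<Object> cached = new ArrayList<>();
    private final Consumer<List<Object>> workerTransferFn;

    BatchingHandler(Consumer<List<Object>> workerTransferFn) {
        this.workerTransferFn = workerTransferFn;
    }

    // Same shape as the handler body: (o seq-id batch-end?)
    void onEvent(Object o, long seqId, boolean batchEnd) {
        cached.add(o);
        if (batchEnd) {
            workerTransferFn.accept(cached);
            // start a fresh list instead of clearing the one that was handed off
            cached = new ArrayList<>();
        }
    }
}
```

A new list is allocated after each batch, as `(.setObject cached-emit (ArrayList.))` does, because the previous list has been handed to the transfer function and may still be in use.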
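consume-loop* function

    (defnk consume-loop*
      ;; handler is the queue's "message handler"
      [^DisruptorQueue queue handler :kill-fn (fn [error] (exit-process! 1 "Async loop died!"))]
      ;; async-loop takes a "function" or a "function factory" and creates a Java thread that repeatedly runs
      ;; that function (or the function produced by the factory). It returns an instance implementing the
      ;; SmartThread protocol, through which the thread can be started, joined and interrupted
      (let [ret (async-loop
                  ;; the thread calls this anonymous function over and over; returning 0 tells
                  ;; async-loop to run it again without sleeping
                  (fn []
                    (consume-batch-when-available queue handler)
                    0)
                  :kill-fn kill-fn
                  ;; the name of the generated thread
                  :thread-name (.getName queue))]
        (consumer-started! queue)
        ret))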
handler macro

    ;; the handler macro wraps the concrete message-processing function in a fn, passes it to clojure-handler
    ;; and returns the resulting handler
    (defmacro handler [& args]
      `(clojure-handler (fn ~@args)))
clojure-handler function

    ;; clojure-handler creates a "message handler" (i.e. a consumer) for a disruptor queue. The handler keeps
    ;; requesting messages from the queue, and each message triggers onEvent. The actual processing logic
    ;; lives in afn
    (defn clojure-handler [afn]
      ;; return an instance of com.lmax.disruptor.EventHandler as the queue's "message handler"
      (reify com.lmax.disruptor.EventHandler
        (onEvent [this o seq-id batchEnd?]
          ;; call afn to process the message
          (afn o seq-id batchEnd?))))
That concludes the source analysis of how a worker starts its executors.