转载

worker启动executor源码分析-executor.clj

在"supervisor启动worker源码分析-worker.clj"一文中,我们详细讲解了worker的初始化过程,该过程主要通过调用mk-worker函数实现。启动worker时,需要同时启动属于该worker的executor;executor的初始化由mk-worker函数调用mk-executor函数完成,mk-executor函数定义在executor.clj中。

mk-executor函数

;; Builds and starts one executor.
;;   worker      - the metadata map of the owning worker
;;   executor-id - the id of this executor
;; Returns an object implementing RunningExecutor and Shutdownable.
(defn mk-executor [worker executor-id]
  ;; executor-data is the executor's metadata map (see mk-executor-data).
  (let [executor-data (mk-executor-data worker executor-id)
        _ (log-message "Loading executor " (:component-id executor-data) ":" (pr-str executor-id))
        ;; task-datas: java.util.HashMap of task id -> value returned by task/mk-task.
        task-datas (->> executor-data
                        ;; the task ids assigned to this executor
                        :task-ids
                        ;; one [task-id task-data] pair per task
                        (map (fn [t] [t (task/mk-task executor-data t)]))
                        ;; pairs -> Clojure map
                        (into {})
                        ;; Clojure map -> java.util.HashMap
                        (HashMap.))
        _ (log-message "Loaded executor tasks " (:component-id executor-data) ":" (pr-str executor-id))
        ;; The function stored under :report-error-and-die calls the executor's
        ;; :report-error function and then its :suicide-fn (see mk-executor-data).
        report-error-and-die (:report-error-and-die executor-data)
        ;; id of the component this executor belongs to
        component-id (:component-id executor-data)

        ;; starting the batch-transfer->worker ensures that anything publishing to that queue
        ;; doesn't block (because it's a single threaded queue and the caching/consumer started
        ;; trick isn't thread-safe)
        ;; system-threads holds the single consumer of this executor's
        ;; :batch-transfer-queue (see start-batch-transfer->worker-handler!).
        system-threads [(start-batch-transfer->worker-handler! worker executor-data)]
        ;; NOTE(review): mk-threads and with-error-reaction are not shown here; presumably
        ;; mk-threads creates the spout/bolt processing threads and with-error-reaction
        ;; invokes report-error-and-die if that creation throws -- confirm.
        handlers (with-error-reaction report-error-and-die
                   (mk-threads executor-data task-datas))
        ;; every thread owned by this executor: processing threads plus the sender thread
        threads (concat handlers system-threads)]
    ;; NOTE(review): setup-ticks! is not shown here; presumably it schedules the
    ;; recurring system tick for this executor -- confirm.
    (setup-ticks! worker executor-data)

    (log-message "Finished loading executor " component-id ":" (pr-str executor-id))
    ;; TODO: add method here to get rendered stats... have worker call that when heartbeating
    (reify
      RunningExecutor
      ;; Renders the executor's :stats via stats/render-stats!.
      (render-stats [this]
        (stats/render-stats! (:stats executor-data)))
      ;; Returns the id this executor was created with.
      (get-executor-id [this]
        executor-id )
      Shutdownable
      ;; Releases everything the executor holds, in this order: queues, threads,
      ;; task hooks, cluster-state connection, then the components themselves.
      (shutdown
        [this]
        (log-message "Shutting down executor " component-id ":" (pr-str executor-id))
        ;; halt the receive queue
        (disruptor/halt-with-interrupt! (:receive-queue executor-data))
        ;; halt the send (batch-transfer) queue
        (disruptor/halt-with-interrupt! (:batch-transfer-queue executor-data))
        ;; interrupt each owned thread and wait for it to finish
        (doseq [t threads]
          (.interrupt t)
          (.join t))

        ;; run cleanup on every hook registered in each task's :user-context
        (doseq [user-context (map :user-context (vals task-datas))]
          (doseq [hook (.getHooks user-context)]
            (.cleanup hook)))
        ;; disconnect this executor's cluster-state object
        (.disconnect (:storm-cluster-state executor-data))
        ;; close the component objects only if open/prepare was actually called
        (when @(:open-or-prepare-was-called? executor-data)
          (doseq [obj (map :object (vals task-datas))]
            (close-component executor-data obj)))
        (log-message "Shut down executor " component-id ":" (pr-str executor-id)))
        )))

mk-executor-data函数

;; Builds the metadata map for one executor.
;;   worker      - the metadata map of the owning worker
;;   executor-id - the id of this executor, e.g. [1 3]
;; Inside recursive-map, `<>` refers to the map being built, so the order of the
;; entries below matters: an entry may only read keys defined before it.
(defn mk-executor-data [worker executor-id]
  ;; NOTE(review): presumably a backtype.storm.task.WorkerTopologyContext (it is
  ;; passed to functions hinted with that type) -- confirm.
  (let [worker-context (worker-context worker)
        ;; task ids covered by this executor; [1 3] -> (1 2 3) (see executor-id->tasks)
        task-ids (executor-id->tasks executor-id)
        ;; component (spout or bolt) id, looked up from the executor's first task id
        component-id (.getComponentId worker-context (first task-ids))
        ;; worker config merged with this component's own config (see normalized-component-conf)
        storm-conf (normalized-component-conf (:storm-conf worker) worker-context component-id)
        ;; :spout or :bolt (see executor-type)
        executor-type (executor-type worker-context component-id)
        ;; the executor's outgoing disruptor queue
        batch-transfer->worker (disruptor/disruptor-queue
                                  ;; queue name
                                  (str "executor"  executor-id "-send-queue")
                                  ;; queue size
                                  (storm-conf TOPOLOGY-EXECUTOR-SEND-BUFFER-SIZE)
                                  ;; single-threaded claim strategy
                                  :claim-strategy :single-threaded
                                  ;; wait strategy taken from the config
                                  :wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY))
        ]
    (recursive-map
     ;; the owning worker's metadata
     :worker worker
     :worker-context worker-context
     :executor-id executor-id
     ;; task ids that run on this executor
     :task-ids task-ids
     ;; id of the component this executor belongs to
     :component-id component-id
     ;; atom read at shutdown to decide whether components must be closed
     :open-or-prepare-was-called? (atom false)
     ;; the component-level configuration computed above
     :storm-conf storm-conf
     ;; this executor's receive queue, taken from the worker's per-executor map
     :receive-queue ((:executor-receive-queue-map worker) executor-id)
     ;; topology id
     :storm-id (:storm-id worker)
     ;; the worker's :conf
     :conf (:conf worker)
     ;; fresh HashMap for executor-wide shared data
     :shared-executor-data (HashMap.)
     ;; the worker's :storm-active-atom
     :storm-active-atom (:storm-active-atom worker)
     ;; the outgoing queue created above
     :batch-transfer-queue batch-transfer->worker
     ;; function that publishes [task tuple] onto the outgoing queue (see mk-executor-transfer-fn)
     :transfer-fn (mk-executor-transfer-fn batch-transfer->worker)
     ;; the worker's :suicide-fn
     :suicide-fn (:suicide-fn worker)
     ;; a cluster-state object created for this executor
     :storm-cluster-state (cluster/mk-storm-cluster-state (:cluster-state worker))
     ;; :spout or :bolt
     :type executor-type
     ;; TODO: should refactor this to be part of the executor specific map (spout or bolt with :common field)
     ;; stats object chosen by the executor's :type (see mk-executor-stats), created
     ;; with the sampling interval from the config (see sampling-rate)
     :stats (mk-executor-stats <> (sampling-rate storm-conf))
     :interval->task->metric-registry (HashMap.)
     ;; the worker's :task->component map
     :task->component (:task->component worker)
     ;; stream id -> receiving component id -> grouper (see outbound-components)
     :stream->component->grouper (outbound-components worker-context component-id)
     ;; throttled error reporter (see throttled-report-error-fn)
     :report-error (throttled-report-error-fn <>)
     ;; reports the error, then invokes the suicide function
     :report-error-and-die (fn [error]
                             ((:report-error <>) error)
                             ((:suicide-fn <>)))
     ;; tuple deserializer
     :deserializer (KryoTupleDeserializer. storm-conf worker-context)
     ;; sampler built from the configured sample rate (see mk-stats-sampler)
     :sampler (mk-stats-sampler storm-conf)
     ;; TODO: add in the executor-specific stuff in a :specific... or make a spout-data, bolt-data function?
     )))

executor-id->tasks函数

;; Expands an executor id into the task ids it covers.
;; The executor id is a pair [first-task-id last-task-id] (e.g. [1 1] or [1 3]);
;; the result is the inclusive range of task ids, each passed through `int`.
(defn executor-id->tasks
  [[first-task-id last-task-id]]
  (map int (range first-task-id (inc last-task-id))))

normalized-component-conf函数

;; Computes the effective configuration of one component.
;; The component's own JSON config is read from its ComponentCommon, stripped of
;; every key in ALL-CONFIGS except the seven listed below, and merged over
;; storm-conf (component values win).
(defn- normalized-component-conf
  [storm-conf general-context component-id]
  (let [;; keys a component is allowed to override
        overridable [TOPOLOGY-DEBUG
                     TOPOLOGY-MAX-SPOUT-PENDING
                     TOPOLOGY-MAX-TASK-PARALLELISM
                     TOPOLOGY-TRANSACTIONAL-ID
                     TOPOLOGY-TICK-TUPLE-FREQ-SECS
                     TOPOLOGY-SLEEP-SPOUT-WAIT-STRATEGY-TIME-MS
                     TOPOLOGY-SPOUT-WAIT-STRATEGY]
        ;; every known config key that must be dropped from the component config
        to-remove (reduce disj (set ALL-CONFIGS) overridable)
        ;; the component's own config: JSON string -> map
        component-common (.getComponentCommon general-context component-id)
        spec-conf (from-json (.get_json_conf component-common))]
    (->> to-remove
         (apply dissoc spec-conf)
         (merge storm-conf))))

executor-type函数

;; Tells whether a component is a spout or a bolt.
;; Returns :spout when the raw topology's spout map contains component-id,
;; :bolt when its bolt map does, and otherwise raises via throw-runtime.
(defn executor-type [^WorkerTopologyContext context component-id]
  (let [topology (.getRawTopology context)
        spout-specs (.get_spouts topology)
        bolt-specs (.get_bolts topology)]
    ;; condp evaluates (contains? <map> component-id) for each clause in order
    (condp contains? component-id
      spout-specs :spout
      bolt-specs :bolt
      (throw-runtime "Could not find " component-id " in topology " topology))))

mk-executor-transfer-fn函数

;; Creates the executor's transfer function, which hands a [task tuple] pair to
;; the executor's outgoing queue (batch-transfer->worker).
;; The returned function has three arities:
;;   [task tuple]                        - no overflow buffer (publishes with block? = true)
;;   [task tuple overflow-buffer]        - block? is true only when the buffer is nil
;;   [task tuple block? overflow-buffer] - full form
;; When an overflow buffer is given and already holds entries, the pair is appended
;; to it directly; otherwise it is published to the queue, and on
;; InsufficientCapacityException it falls back to the overflow buffer if there is
;; one (rethrowing if not).
(defn mk-executor-transfer-fn [batch-transfer->worker]
  (fn this
    ([task tuple]
      (this task tuple nil))
    ([task tuple overflow-buffer]
      (this task tuple (nil? overflow-buffer) overflow-buffer))
    ([task tuple block? ^List overflow-buffer]
      (let [message [task tuple]
            already-overflowing? (and overflow-buffer
                                      (not (.isEmpty overflow-buffer)))]
        (if already-overflowing?
          (.add overflow-buffer message)
          (try-cause
            (disruptor/publish batch-transfer->worker message block?)
            (catch InsufficientCapacityException e
              (if overflow-buffer
                (.add overflow-buffer message)
                (throw e)))))))))

sampling-rate函数

;; Returns the reciprocal of the configured TOPOLOGY-STATS-SAMPLE-RATE,
;; truncated to an int.
(defn sampling-rate
  [conf]
  (int (/ 1 (conf TOPOLOGY-STATS-SAMPLE-RATE))))

mk-executor-stats函数

;; Dispatch function for mk-executor-stats: returns the :type of the executor
;; metadata map (:spout or :bolt), ignoring any further arguments.
;; It is defined before the defmulti because defmulti evaluates its dispatch
;; function when the form is loaded, so the symbol must already resolve.
(defn executor-selector [executor-data & _] (:type executor-data))

;; Multimethod that creates the stats object for an executor, dispatching on
;; the value returned by executor-selector.
(defmulti mk-executor-stats executor-selector)

;; Spout executors: delegate to stats/mk-spout-stats with the sampling rate.
(defmethod mk-executor-stats :spout [_ rate]
  (stats/mk-spout-stats rate))

;; Bolt executors: delegate to stats/mk-bolt-stats with the sampling rate.
(defmethod mk-executor-stats :bolt [_ rate]
  (stats/mk-bolt-stats rate))

outbound-components函数

(defn outbound-components
  "Returns map of stream id to component id to grouper"
  [^WorkerTopologyContext worker-context component-id]
  (let [;; stream id -> {receiving component id -> Grouping}, as Clojure data
        targets (clojurify-structure (.getTargets worker-context component-id))
        ;; one [stream-id {component -> grouper}] pair per outgoing stream
        stream-pairs (for [[stream-id component->grouping] targets]
                       [stream-id
                        (outbound-groupings
                          worker-context
                          component-id
                          stream-id
                          (.getComponentOutputFields worker-context component-id stream-id)
                          component->grouping)])]
    ;; pairs -> Clojure map -> java.util.HashMap
    (HashMap. (into {} stream-pairs))))

getTargets方法

;; WorkerTopologyContext类继承GeneralTopologyContext类,getTargets方法是GeneralTopologyContext类实例方法,主要功能就是获取哪些组件接收了componentId输出的消息 ;; 返回值为一个stream_id->{receive_component_id->Grouping}的map,receive_component_id就是接收组件的id      public Map<String, Map<String, Grouping>> getTargets(String componentId) {   ;; 创建返回结果map,ret   Map<String, Map<String, Grouping>> ret = new HashMap<String, Map<String, Grouping>>();   ;; 获取该topology的所有组件ids,并遍历   for(String otherComponentId: getComponentIds()) {    ;; 通过组件id获取组件的ComponentCommon对象,然后再获取其输入信息inputs    Map<GlobalStreamId, Grouping> inputs = getComponentCommon(otherComponentId).get_inputs();    ;; 遍历输入信息,GlobalStreamId对象有两个成员属性,一个是流id,一个是发送该流的组件id    for(GlobalStreamId id: inputs.keySet()) {     ;; 如果输入流的组件id和componentId相等,那么说明该组件接收来自componentId的输出,则将其添加到ret中     if(id.get_componentId().equals(componentId)) {      Map<String, Grouping> curr = ret.get(id.get_streamId());      if(curr==null) curr = new HashMap<String, Grouping>();      curr.put(otherComponentId, inputs.get(id));      ret.put(id.get_streamId(), curr);     }    }   }   return ret;  }  

outbound-groupings函数

;; Builds a java.util.HashMap of receiving component id -> grouper for one
;; outgoing stream of this-component-id.
;; component->grouping maps each receiving component id to its Grouping;
;; receivers whose task list is empty are dropped before groupers are built.
(defn- outbound-groupings [^WorkerTopologyContext worker-context this-component-id stream-id out-fields component->grouping]
  (let [tasks-of (fn [component] (.getComponentTasks worker-context component))
        ;; true when the component has at least one task
        has-tasks? (fn [component] (pos? (count (tasks-of component))))
        receivers (filter-key has-tasks? component->grouping)
        ;; one [component grouper] pair per remaining receiver (see mk-grouper)
        grouper-pairs (map (fn [[component tgrouping]]
                             [component
                              (mk-grouper worker-context
                                          this-component-id
                                          stream-id
                                          out-fields
                                          tgrouping
                                          (tasks-of component))])
                           receivers)]
    (HashMap. (into {} grouper-pairs))))

mk-grouper函数

(defn- mk-grouper
  "Returns a function that returns a vector of which task indices to send tuple to, or just a single task index."
  ;; stream-id       - the stream emitted by component-id
  ;; out-fields      - the output fields of that stream
  ;; thrift-grouping - the Grouping used by the receiving component
  ;; target-tasks    - java.util.List of the receiving component's task ids
  [^WorkerTopologyContext context component-id stream-id ^Fields out-fields thrift-grouping ^List target-tasks]
  ;; num-tasks: number of receiving tasks
  (let [num-tasks (count target-tasks)
        ;; Random used only by the :none branch
        random (Random.)
        ;; receiving task ids, sorted and turned into a Clojure vector
        target-tasks (vec (sort target-tasks))]
    ;; dispatch on the grouping type keyword (see grouping-type)
    (condp = (thrift/grouping-type thrift-grouping)
      :fields
        ;; Global grouping and fields grouping share the :fields type keyword, so
        ;; thrift/global-grouping? is used to tell them apart.
        ;; NOTE(review): global-grouping? is not shown here; presumably it checks for
        ;; an empty field list -- confirm.
        (if (thrift/global-grouping? thrift-grouping)
          ;; global grouping: always pick the first (lowest, since sorted) task id
          (fn [task-id tuple]
            ;; It's possible for target to have multiple tasks if it reads multiple sources
            (first target-tasks))
          ;; fields grouping: wrap the grouping's field names in a Fields object
          (let [group-fields (Fields. (thrift/field-grouping thrift-grouping))]
            ;; see mk-fields-grouper
            (mk-fields-grouper out-fields group-fields target-tasks)
            ))
      :all
        ;; every receiving task gets the tuple
        (fn [task-id tuple] target-tasks)
      :shuffle
        ;; see mk-shuffle-grouper
        (mk-shuffle-grouper target-tasks)
      :local-or-shuffle
        ;; same-tasks: receiving tasks that also run in this worker
        (let [same-tasks (set/intersection
                           ;; receiving task ids as a set
                           (set target-tasks)
                           ;; task ids running in this worker
                           (set (.getThisWorkerTasks context)))]
          (if-not (empty? same-tasks)
            ;; some receivers are local: shuffle among the local ones only
            (mk-shuffle-grouper (vec same-tasks))
            ;; no local receivers: behave exactly like :shuffle
            (mk-shuffle-grouper target-tasks)))
      :none
        (fn [task-id tuple]
          ;; random int reduced modulo num-tasks gives an index into target-tasks
          (let [i (mod (.nextInt random) num-tasks)]
            ;; the task id at that index
            (.get target-tasks i)
            ))
      :custom-object
        ;; instantiate the user-supplied grouping object
        (let [grouping (thrift/instantiate-java-object (.get_custom_object thrift-grouping))]
          ;; see mk-custom-grouper
          (mk-custom-grouper grouping context component-id stream-id target-tasks))
      :custom-serialized
        ;; deserialize the user-supplied grouping object
        (let [grouping (Utils/deserialize (.get_custom_serialized thrift-grouping))]
          ;; see mk-custom-grouper
          (mk-custom-grouper grouping context component-id stream-id target-tasks))
      :direct
        ;; direct grouping yields the keyword itself instead of a function
        :direct
      )))

grouping-type函数

;; Returns the grouping-type keyword for a Grouping by looking up the value of
;; its getSetField call in the grouping-constants map.
(defn grouping-type
  [^Grouping grouping]
  (get grouping-constants (.getSetField grouping)))

grouping-constants

;; Maps each Grouping$_Fields enum constant to the keyword used for that
;; grouping type; grouping-type looks values up in this map.
(def grouping-constants
  {Grouping$_Fields/FIELDS :fields
   Grouping$_Fields/SHUFFLE :shuffle
   Grouping$_Fields/ALL :all
   Grouping$_Fields/NONE :none
   Grouping$_Fields/CUSTOM_SERIALIZED :custom-serialized
   Grouping$_Fields/CUSTOM_OBJECT :custom-object
   Grouping$_Fields/DIRECT :direct
   Grouping$_Fields/LOCAL_OR_SHUFFLE :local-or-shuffle})

enum _Fields类

/**
 * The fields of the Grouping union, each carrying its Thrift field id and
 * field name. Constants can be looked up by id or by name.
 */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
  FIELDS((short)1, "fields"),
  SHUFFLE((short)2, "shuffle"),
  ALL((short)3, "all"),
  NONE((short)4, "none"),
  DIRECT((short)5, "direct"),
  CUSTOM_OBJECT((short)6, "custom_object"),
  CUSTOM_SERIALIZED((short)7, "custom_serialized"),
  LOCAL_OR_SHUFFLE((short)8, "local_or_shuffle");

  // Field name -> constant, filled once when the enum is initialized.
  private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();

  static {
    for (_Fields field : EnumSet.allOf(_Fields.class)) {
      byName.put(field.getFieldName(), field);
    }
  }

  /**
   * Find the _Fields constant that matches fieldId, or null if it's not found.
   */
  public static _Fields findByThriftId(int fieldId) {
    switch (fieldId) {
      case 1: // FIELDS
        return FIELDS;
      case 2: // SHUFFLE
        return SHUFFLE;
      case 3: // ALL
        return ALL;
      case 4: // NONE
        return NONE;
      case 5: // DIRECT
        return DIRECT;
      case 6: // CUSTOM_OBJECT
        return CUSTOM_OBJECT;
      case 7: // CUSTOM_SERIALIZED
        return CUSTOM_SERIALIZED;
      case 8: // LOCAL_OR_SHUFFLE
        return LOCAL_OR_SHUFFLE;
      default:
        return null;
    }
  }

  /**
   * Find the _Fields constant that matches fieldId, throwing an exception
   * if it is not found.
   */
  public static _Fields findByThriftIdOrThrow(int fieldId) {
    _Fields fields = findByThriftId(fieldId);
    if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
    return fields;
  }

  /**
   * Find the _Fields constant that matches name, or null if it's not found.
   */
  public static _Fields findByName(String name) {
    return byName.get(name);
  }

  private final short _thriftId;
  private final String _fieldName;

  _Fields(short thriftId, String fieldName) {
    _thriftId = thriftId;
    _fieldName = fieldName;
  }

  /** Returns the Thrift field id given to the constructor. */
  public short getThriftFieldId() {
    return _thriftId;
  }

  /** Returns the field name given to the constructor. */
  public String getFieldName() {
    return _fieldName;
  }
}

mk-fields-grouper函数

;; Creates the grouper function for a fields grouping.
;; The returned function selects the grouping fields' values out of the emitted
;; values, hashes that list with tuple/list-hash-code, reduces the hash modulo
;; the number of target tasks, and returns the task id at that index of
;; target-tasks.
(defn- mk-fields-grouper [^Fields out-fields ^Fields group-fields ^List target-tasks]
  (let [num-tasks (count target-tasks)]
    (fn [task-id ^List values]
      (let [group-values (.select out-fields group-fields values)
            task-index (mod (tuple/list-hash-code group-values) num-tasks)]
        (.get target-tasks task-index)))))

mk-shuffle-grouper函数

;; Creates the grouper function for a shuffle grouping.
;; A rotating random range over target-tasks is built once (see
;; rotating-random-range); every call of the returned function takes the next
;; task id from it (see acquire-random-range-id).
(defn- mk-shuffle-grouper [^List target-tasks]
  (let [rotation (rotating-random-range target-tasks)]
    (fn [task-id tuple]
      (acquire-random-range-id rotation))))

rotating-random-range函数

;; Builds the state used for rotating random selection over `choices`.
;; Returns a vector [cursor shuffled rand]: a MutableInt cursor starting at -1,
;; a shuffled ArrayList copy of choices, and the Random used for the shuffle.
(defn rotating-random-range
  [choices]
  (let [rand (Random.)
        ;; copy first so the caller's collection is not reordered
        shuffled (doto (ArrayList. choices)
                   (Collections/shuffle rand))]
    [(MutableInt. -1) shuffled rand]))

acquire-random-range-id函数

;; Returns the next element of a rotating random range (the vector produced by
;; rotating-random-range).
;; The cursor is incremented; once it reaches the list size it is reset to 0 and
;; the list is reshuffled with the stored Random. The element at the cursor is
;; then returned.
(defn acquire-random-range-id
  [[^MutableInt curr ^List state ^Random rand]]
  (let [next-index (.increment curr)
        wrapped? (>= next-index (.size state))]
    (when wrapped?
      (.set curr 0)
      (Collections/shuffle state rand))
    (.get state (.get curr))))

mk-custom-grouper函数

;; Creates the grouper function for a user-defined grouping.
;; The CustomStreamGrouping is prepared once with the context, the global id of
;; the source stream and the target tasks; the returned function delegates each
;; call to the grouping's chooseTasks.
(defn- mk-custom-grouper [^CustomStreamGrouping grouping ^WorkerTopologyContext context ^String component-id ^String stream-id target-tasks]
  (let [source-stream (GlobalStreamId. component-id stream-id)]
    (.prepare grouping context source-stream target-tasks)
    (fn [task-id ^List values]
      (.chooseTasks grouping task-id values))))

throttled-report-error-fn函数

;; Creates the executor's error-reporting function.
;; The returned function always logs the error, but forwards it to
;; cluster/report-error at most TOPOLOGY-MAX-ERROR-REPORT-PER-INTERVAL times
;; within each window of TOPOLOGY-ERROR-THROTTLE-INTERVAL-SECS seconds.
(defn throttled-report-error-fn [executor]
  (let [storm-conf (:storm-conf executor)
        ;; length of one throttling window, in seconds
        window-secs (storm-conf TOPOLOGY-ERROR-THROTTLE-INTERVAL-SECS)
        ;; maximum number of errors forwarded per window
        max-reports (storm-conf TOPOLOGY-MAX-ERROR-REPORT-PER-INTERVAL)
        ;; start time of the current window
        window-start (atom (current-time-secs))
        ;; errors seen in the current window
        window-errors (atom 0)
        window-over? (fn [] (> (time-delta @window-start) window-secs))
        forward! (fn [error]
                   (cluster/report-error (:storm-cluster-state executor)
                                         (:storm-id executor)
                                         (:component-id executor)
                                         (memoized-local-hostname)
                                         (.getThisWorkerPort (:worker-context executor))
                                         error))]
    (fn [error]
      (log-error error)
      ;; a new window begins once the current one has lasted longer than window-secs
      (when (window-over?)
        (reset! window-errors 0)
        (reset! window-start (current-time-secs)))
      (swap! window-errors inc)
      ;; forward only while the window's count is within the limit
      (when (<= @window-errors max-reports)
        (forward! error)))))

report-error函数

;; Excerpt: the report-error method spec (name, argument vector, body) shown
;; without its enclosing form; `cluster-state` is a free name here.
;; NOTE(review): presumably this lives inside a reify in cluster.clj that closes
;; over cluster-state -- confirm against the full file.
;; Stores an error record for a component and trims older records.
      report-error
        [this storm-id component-id node port error]
        ;; path: the error path for this topology/component (see error-path)
        (let [path (error-path storm-id component-id)
              ;; the record: timestamp, stringified error, host and port
              data {:time-secs (current-time-secs) :error (stringify-error error) :host node :port port}
              ;; make sure the path exists
              _ (mkdirs cluster-state path)
              ;; add a sequential child under the path holding the serialized record
              _ (create-sequential cluster-state (str path "/e") (Utils/serialize data))
              ;; children sorted by parse-error-path, reversed, minus the first 10:
              ;; everything except the 10 entries that sort last
              to-kill (->> (get-children cluster-state path false)
                           (sort-by parse-error-path)
                           reverse
                           (drop 10))]
          ;; delete every child selected above
          (doseq [k to-kill]
            (delete-node cluster-state (str path "/" k))))

mk-stats-sampler函数

;; Creates a sampler from the configuration: the sampling interval is computed
;; by sampling-rate and handed to even-sampler.
(defn mk-stats-sampler
  [conf]
  (-> conf
      sampling-rate
      even-sampler))

even-sampler函数

;; Creates a zero-argument sampling function carrying the metadata {:rate freq}.
;; Each call advances an internal counter (initially -1). When the incremented
;; counter reaches freq, the counter is reset to 0 and a new random target in
;; [0, freq) is drawn. The call returns true exactly when the counter equals
;; the current target.
(defn even-sampler
  [freq]
  (let [freq (int freq)
        start (int 0)
        rng (java.util.Random.)
        counter (MutableInt. -1)
        ;; the counter value at which the sampler fires
        target (MutableInt. (.nextInt rng freq))
        sample? (fn []
                  (when (>= (.increment counter) freq)
                    (.set counter start)
                    (.set target (.nextInt rng freq)))
                  (= (.get counter) (.get target)))]
    ;; the rate can be read back with (:rate (meta sampler))
    (with-meta sample? {:rate freq})))

start-batch-transfer->worker-handler!函数

;; Starts the consumer of the executor's outgoing queue (:batch-transfer-queue).
;; Each consumed object is appended to a cached ArrayList; when the handler is
;; called with batch-end? true, the list is passed to the worker's :transfer-fn
;; together with a KryoTupleSerializer and replaced by a fresh list.
;; Returns what disruptor/consume-loop* returns; failures are routed to the
;; executor's :report-error-and-die.
(defn start-batch-transfer->worker-handler! [worker executor-data]
  (let [send-batch! (:transfer-fn worker)
        ;; holder for the list of objects collected since the last flush
        pending (MutableObject. (ArrayList.))
        conf (:storm-conf executor-data)
        serializer (KryoTupleSerializer. conf (:worker-context executor-data))
        outgoing-queue (:batch-transfer-queue executor-data)
        on-event (disruptor/handler [o seq-id batch-end?]
                   (let [^ArrayList batch (.getObject pending)]
                     (.add batch o)
                     (when batch-end?
                       (send-batch! serializer batch)
                       (.setObject pending (ArrayList.)))))]
    (disruptor/consume-loop*
      outgoing-queue
      on-event
      :kill-fn (:report-error-and-die executor-data))))

consume-loop*函数

;; Starts an async loop that repeatedly consumes batches from `queue` with
;; `handler`, marks the queue's consumer as started, and returns the value
;; produced by async-loop.
;;   queue    - the DisruptorQueue to consume
;;   handler  - the queue's event handler
;;   :kill-fn - called on failure; by default exits the process with
;;              status 1 and the message "Async loop died!"
(defnk consume-loop*
  [^DisruptorQueue queue handler
   :kill-fn (fn [error] (exit-process! 1 "Async loop died!"))]
  (let [;; one iteration of the loop; the 0 is the function's return value
        consume-once (fn [] (consume-batch-when-available queue handler) 0)
        consumer-thread (async-loop
                          consume-once
                          :kill-fn kill-fn
                          ;; the thread is named after the queue
                          :thread-name (.getName queue))]
    (consumer-started! queue)
    consumer-thread))

handler宏

;; Wraps a function body into a disruptor event handler: the macro arguments
;; become the argument vector and body of an `fn`, which is passed to
;; clojure-handler.
(defmacro handler
  [& args]
  (list `clojure-handler (cons `fn args)))

clojure-handler函数

;; Adapts a Clojure function to com.lmax.disruptor.EventHandler.
;; The returned handler's onEvent forwards its three arguments (event object,
;; sequence id, end-of-batch flag) to afn unchanged.
(defn clojure-handler
  [afn]
  (reify com.lmax.disruptor.EventHandler
    (onEvent [this event sequence-id end-of-batch?]
      (afn event sequence-id end-of-batch?))))

以上就是worker启动executor源码分析全部内容。

正文到此结束
Loading...