转载

『互联网架构』软件架构-rocketmq之实践(62)

上次完成rocketMq的安装,并测试了发送和接收,这次简单的利用rocketMq的源码连接rocketMq集群。

源码:https://github.com/limingios/netFuture/tree/master/jms

『互联网架构』软件架构-rocketmq之实践(62)

(一)broker的properties配置文件

  • 启动2m-2s-sync的rocket集群
    『互联网架构』软件架构-rocketmq之实践(62)
  • 配置文件

    上节直接用了我提供的properties文件,并没有详细解释每个字段的含义这次详细说下。

『互联网架构』软件架构-rocketmq之实践(62)

『互联网架构』软件架构-rocketmq之实践(62)

  • broker参数
参数名 默认值 描述
listenPort 10911 broker的服务端口号,作为对producer和consumer使用服务的端口号
namesrvAddr null namesrv的ip地址。格式: ip:port;ip:port
brokerIP1 本机IP broker所在的机器ip,默认不用设置,如果机器有多个网卡,需要手动设置
brokerName 本机主机名 作用为一组master与slave通过brokerName是否相同来标示,通过brokerId来区分master还是slave brokerClusterName DefaultCluster 整个broker集群的名字,创建topic时需要指定。
brokerId 0 0:master 非0:slave
storePathCommitLog $HOME/store/commitlog/ commitLog存储路径
storePathConsumerQueue $HOME/store/consumequeue/ 消费队列存储路径
mapedFileSizeCommitLog 1024 * 1024 * 1024(1G) commitLog每个文件的大小,默认1G
deleteWhen 4 删除文件时间点,默认凌晨 4点
fileReservedTime 72 文件保留时间,默认72小时.
brokerRole ASYNC_MASTER Broker 的角色ASYNC_MASTER 异步复制Master SYNC_MASTER 同步双写Master SLAVE
flushDiskType ASYNC_FLUSH 刷盘方式 ASYNC_FLUSH 异步刷盘 SYNC_FLUSH 同步刷盘
defaultTopicQueueNums 4 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数。
autoCreateTopicEnable true 是否自动创建topic。
autoCreateSubscriptionGroup true 是否允许Broker自动创建订阅组,建议线下开启,线上关闭
rejectTransactionMessage false 是否拒绝事务消息接入
fetchNamesrvAddrByAddressServer false 是否从web服务器获取Name Server地址,针对大规模的Broker集群建议使用这种方式
storePathIndex $HOME/store/index 消息索引存储路径
storeCheckpoint $HOME/store/checkpoint checkpoint文件存储路径
abortFile $HOME/store/abort abort文件存储路径
maxTransferBytesOnMessageInMemory 262144 单次Pull消息(内存)传输的最大字节数
maxTransferCountOnMessageInMemory 32 单次Pull消息(内存)传输的最大条数
maxTransferBytesOnMessageInDisk 65536 单次Pull消息(磁盘)传输的最大字节数
maxTransferCountOnMessageInDisk 8 单次Pull消息(磁盘)传输的最大条数
messageIndexEnable true 是否开启消息索引功能
messageIndexSafe false 是否提供安全的消息索引机制,索引保证不丢
haMasterAddress 在Slave上直接设置Master地址,默认从Name Server上自动获取,也可以手工强制配置
cleanFileForciblyEnable true 磁盘满、且无过期文件情况下 TRUE 表示强制删除文件,优先保证服务可用 FALSE 标记服务不可用,文件不删除
  • Consumer
参数名 默认值 描述
namesrvAddr Name Server地址列表,多个NameServer地址用分号隔开
clientIP 本机IP 客户端本机IP地址,某些机器会发生无法识别客户端IP地址情况,需要应用在代码中强制指定
instanceName DEFAULT 客户端实例名称,客户端创建的多个Producer、Consumer实际是共用一个内部实例(这个实例包含网络连接、线程资源等)
clientCallbackExecutorThreads 4 通信层异步回调线程数
pollNameServerInteval 30000 轮询Name Server间隔时间,单位毫秒
heartbeatBrokerInterval 30000 向Broker发送心跳间隔时间,单位毫秒
persistConsumerOffsetInterval 5000 持久化Consumer消费进度间隔时间,单位毫秒
  • Producer参数

    >Producer配置

参数名 默认值 描述
producerGroup DEFAULT_PRODUCER Producer组名,多个Producer如果属于一个应用,发送同样的消息,则应该将它们归为同一组
createTopicKey TBW102 在发送消息时,自动创建服务器不存在的topic,需要指定Key。
defaultTopicQueueNums 4 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
sendMsgTimeout 10000 发送消息超时时间,单位毫秒
compressMsgBodyOverHowmuch 4096 消息Body超过多大开始压缩(Consumer收到消息会自动解压缩),单位字节
retryAnotherBrokerWhenNotStoreOK FALSE 如果发送消息返回sendResult,但是sendStatus!=SEND_OK,是否重试发送
maxMessageSize 131072 客户端限制的消息大小,超过报错,同时服务端也会限制
transactionCheckListener 事务消息回查监听器,如果发送事务消息,必须设置
checkThreadPoolMinSize 1 Broker回查Producer事务状态时,线程池大小
checkThreadPoolMaxSize Broker回查Producer事务状态时,线程池大小
checkRequestHoldMax 2000 Broker回查Producer事务状态时,Producer本地缓冲请求队列大小

Push Consumer配置

参数名 默认值 描述
consumerGroup DEFAULT_CONSUMER Consumer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组
messageModel CLUSTERING 消息模型,支持以下两种1、集群消费(CLSUTER)2、广播消费(BROADCASTING)
consumeFromWhere CONSUME_FROM_LAST_OFFSET Consumer启动后,默认从什么位置开始消费1、CONSUME_FROM_LAST_OFFSET:默认策略,从该队列最尾开始消费,即跳过历史消息2、CONSUME_FROM_FIRST_OFFSET:从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍3、CONSUME_FROM_TIMESTAMP:从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前
allocateMessageQueueStrategy AllocateMessageQueueAveragely Rebalance算法实现策略
subscription {} 订阅关系
messageListener 消息监听器
offsetStore 消费进度存储
consumeThreadMin 10 消费线程池数量
consumeThreadMax 20 消费线程池数量
consumeConcurrentlyMaxSpan 2000 单队列并行消费允许的最大跨度
pullThresholdForQueue 1000 拉消息本地队列缓存消息最大数
pullInterval 0 拉消息间隔,由于是长轮询,所以为0,但是如果应用为了流控,也可以设置大于0的值,单位毫秒
consumeMessageBatchMaxSize 1 批量消费,一次消费多少条消息
pullBatchSize 32 批量拉消息,一次最多拉多少条

Pull Consumer配置

参数名 默认值 描述
consumerGroup DEFAULT_CONSUMER Consumer组名,多个Consumer如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组
brokerSuspendMaxTimeMillis 20000 长轮询,Consumer拉消息请求在Broker挂起最长时间,单位毫秒
consumerTimeoutMillisWhenSuspend 30000 长轮询,Consumer拉消息请求在Broker挂起超过指定时间,客户端认为超时,单位毫秒
consumerPullTimeoutMillis 10000 非长轮询,拉消息超时时间,单位毫秒
messageModel BROADCASTING 消息模型,支持以下两种1、集群消费2、广播消费
messageQueueListener 监听队列变化
offsetStore 消费进度存储
registerTopics [] 注册的topic集合
allocateMessageQueueStrategy AllocateMessageQueueAveragely Rebalance算法实现策略
  • Meesage数据结构

    >Message数据结构各个字段都可以通过get、set方式访问,例如访问topic:

    msg.getTopic();

    msg.setTopic(“test”);

字段名 默认值 必填 说明
Topic null true 线下环境不需要申请,线上环境需要申请后才能使用
Body null true 二进制形式,序列化由应用决定,Producer与Consumer要协商好序列化形式。
Tags null false 类似于Gmail为每封邮件设置的标签,方便服务器过滤使用。目前只支持每个消息设置一个tag,所以也可以类比为Notify的MessageType概念。
Keys null false 代表这条消息的业务关键词,服务器会根据keys创建哈希索引,设置后,可以再Console系统根据Topic、Keys来查询消息,由于是哈希索引,请尽可能保证key唯一,例如订单号,商品ID等。
Flag 0 false 完全由应用来设置,RocketMQ不做敢于。
DelayTimeLevel 0 false 消息延时级别,0表示不延时,大于0会延时特定的时间才会被消费。
WaitStoreMsgOK TRUE false 表示消息是否在服务器罗盘后才返回应答。

(二)源码测试

  • 连接集群

    > 修改这2个文件:Producer 和 Consumer

『互联网架构』软件架构-rocketmq之实践(62)

 Producer 

『互联网架构』软件架构-rocketmq之实践(62)

『互联网架构』软件架构-rocketmq之实践(62)

 Consumer 

『互联网架构』软件架构-rocketmq之实践(62)

『互联网架构』软件架构-rocketmq之实践(62)

  • 部署rocketmq-console

    >GitHub地址:https://github.com/apache/rocketmq-externals

『互联网架构』软件架构-rocketmq之实践(62)

『互联网架构』软件架构-rocketmq之实践(62)

『互联网架构』软件架构-rocketmq之实践(62)

添加192.168.89.100:9876;192.168.89.101:9876

『互联网架构』软件架构-rocketmq之实践(62)

『互联网架构』软件架构-rocketmq之实践(62)

(三)流程梳理

『互联网架构』软件架构-rocketmq之实践(62)

生产者流程

1. 生产者首先需要设置namesrv,或者指定其他方式更新namesrv。

2. 从namesrv获取topic的路由信息,路由信息包括broker以及Message Queue等信息,同时将路由信息保存在本地内存中,方便下次使用。

3. 从Message Queue列表中选择合适的Queue发送消息,实现负载均衡。

消费者流程

1. namesrv告诉消费者,他从broker中获取消息。

2. 获取完之后开始消费。

  • RocketMq高可用

    >特点:master挂了之后角色不会做切换(slave不会成为master)(商用版本的这种情况不知) 然后master和slave需要制定。

| | 发送消息 | 存储消息 | 接受消息 |

| :——: | :——–: | :——–: | :——–: |

|停掉一个namesrv |不受影响 |不受影响 |不受影响|

|停全部的namesrv| 影响 |不受影响| 影响|

|停单个master broker |不受影响 |受影响(很小) |不影响|

|停全部master broker |影响 |影响 |影响|

|停全部salve broker |不影响| 不影响 |不影响|

恢复任意master broker| 不受影响 |受影响(很小)| 受影响(很小)|

  • Rocketmq文档参考

    > 源码:jm下有文档

『互联网架构』软件架构-rocketmq之实践(62)

PS:说了rocketmq的概念的东西,下次重点说说rocketMq在双11是如何做到的抗压,我听过一次公开课,稍后总结下,分享给各位老铁。

>>原创文章,欢迎转载。转载请注明:转载自,谢谢!>>原文链接地址:上一篇:

已是最新文章

原文  https://idig8.com/2019/05/26/hulianwangjiagouruanjianjiagou-rocketmqzhishijian62/
正文到此结束
Loading...