RocketMQ: 客户端使用指南
一、代码中指定 Name Server 地址
producer.setNamesrvAddr(“192.168.0.1:9876;192.168.0.2:9876”);
或
consumer.setNamesrvAddr(“192.168.0.1:9876;192.168.0.2:9876”);
二、Java 启劢参数中指定 Name Server 地址
Drocketmq.namesrv.addr=192.168.0.1:9876;192.168.0.2:9876
三、环境发量挃定 Name Server 地址
export NAMESRV_ADDR=192.168.0.1:9876;192.168.0.2:9876
四、HTTP 静态服务器寻址(默认)
客户端启动后,会定时访问一个静态 HTTP 服务器,地址如下:
http://jmenv.tbsite.net:8080/rocketmq/nsaddr
这个 URL 的返回内容如下:192.168.0.1:9876;192.168.0.2:9876
客户端默认每隔 2 分钟访问一次返个 HTTP 服务器,并更新本地的 Name Server 地址
URL 已经在代码中写死,可通过修改/etc/hosts 文件来改发要访问的服务器
例如在/etc/hosts 增加如下配置
10.232.22.67 jmenv.taobao.net
推荐使用 HTTP 静态服务器寻址方式,好处是客户端部署简单,且 Name Server 集群可以热升级
自定义客户端行为
1 ) 客户端 API 形式
DefaultMQProducer、TransactionMQProducer、DefaultMQPushConsumer、DefaultMQPullConsumer 都继承于 ClientConfig 类,ClientConfig 为客户端的公共配置类
客户端的配置都是 get、set 形式,每个参数都可以用 spring 来配置,也可以在代码中配置,例如 namesrvAddr 这个参数可以这样配置,其他参数同理
producer.setNamesrvAddr(“192.168.0.1:9876”);
2 )客户端的公共配置
参数名 默认值 说明
namesrvAddr Name Server 地址列表,多个 NameServer 地址用分号隔开
clientIP 本机 IP 客户端本机 IP 地址,某些机器会发生无法识别客户端,
IP 地址情况,需要应用在代码中强制指定
instanceName DEFAULT 客户端实例名称,客户端创建的多个 Producer、Consumer 实际是共用一个内部实例
(这个实例包含网络连接、线程资源等)
clientCallbackExecutorThreads 4 通信层异步回调线程数
pollNameServerInteval 30000 轮询 Name Server 间隔时间,单位毫秒
heartbeatBrokerInterval 30000 向 Broker 发送心跳间隔时间,单位毫秒
persistConsumerOffsetInterval 5000 持久化 Consumer 消费进度间隔时间,单位毫秒
3 )Producer 配置
参数名 默认值 说明
producerGroup DEFAULT_PRODUCER Producer 组名,多个 Producer 如果属于一个应用,发送同样的消息,则应该将它们归为同一组
createTopicKey TBW102
导管 $1 在发送消息时,自动创建服务器不存在的 topic,需要指定 Key
defaultTopicQueueNums 4 在发送消息时,自动创建服务器不存在的 topic,默认创建的队列数
sendMsgTimeout 10000 发送消息超时时间,单位毫秒
compressMsgBodyOverHowmuch 4096 消息 Body 超过多大开始压缩(Consumer 收到消息会自动解压缩),单位字节
retryAnotherBrokerWhenNotStoreOK FALSE 如果发送消息返回 sendResult,但是sendStatus!=SEND_OK,是否重试发送
maxMessageSize 131072 客户端限制的消息大小,超过报错,同时服务端也会限制
transactionCheckListener 事务消息回查监听器,如果发送事务消息,必须设置
checkThreadPoolMinSize 1 Broker 回查 Producer 事务状态时,线程池大小
checkThreadPoolMaxSize 1 Broker 回查 Producer 事务状态时,线程池大小
checkRequestHoldMax 2000 Broker 回查 Producer 事务状态时,Producer 本地缓冲请求队列大小
4 ) PushConsumer 配置
参数名 默认值 说明
consumerGroup DEFAULT_CONSUMER Consumer 组名,多个 Consumer 如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组
messageModel CLUSTERING 消息模型,支持以下两种
1、集群消费
2、广播消费
consumeFromWhere CONSUME_FROM_LAST_OFFSET Consumer 启动后,默认从什么位置开始消费
allocateMessageQueueStrategy AllocateMessageQueueAveragely Rebalance 算法实现策略
subscription {} 订阅关系
messageListener 消息监听器
offsetStore 消费进度存储
consumeThreadMin 10 消费线程池数量
consumeThreadMax 20 消费线程池数量
consumeConcurrentlyMaxSpan 2000 单队列并行消费允许的最大跨度
pullThresholdForQueue 1000 拉消息本地队列缓存消息最大数
pullInterval 0 拉消息间隔,由于是长轮询,所以为 0,但是如果应用为了流控,也可以设置大于 0 的值,单位毫秒
consumeMessageBatchMaxSize 1 批量消费,一次消费多少条消息
pullBatchSize 32 批量拉消息,一次最多拉多少条
5 ) PullConsumer 配置
参数名 默认值 说明
consumerGroup DEFAULT_CONSUMER Consumer 组名,多个Consumer 如果属于一个应用,订阅同样的消息,且消费逻辑一致,则应该将它们归为同一组
brokerSuspendMaxTimeMillis 20000 长轮询,Consumer 拉消息请求在 Broker 挂起最长时间,单位毫秒
consumerTimeoutMillisWhenSuspend 30000 长轮询,Consumer 拉消息请求在 Broker 挂起超过指定时间,客户端认为超时,单位毫秒
consumerPullTimeoutMillis 10000 非长轮询,拉消息超时时间,单位毫秒
messageModel BROADCASTING 消息模型,支持以下两种
1、集群消费
2、广播消费
messageQueueListener 监听队列变化
offsetStore 消费进度存储
registerTopics [] 注册的 topic 集合
allocateMessageQueueStrategy AllocateMessageQueueAveragely Rebalance 算法实现策略
Message 数据结构
1 ) 针对 Producer
字段名 默认值 说明
Topic null 必填,线下环境不需要申请,线上环境需要申请后才能使用
Body null 必填,二进制形式,序列化由应用决定,Producer 与 Consumer 要协商好序列化形式
Tags null 选填,类似于 Gmail 为每封邮件设置的标签,方便服务器过滤使用。
目前只支持每个消息设置一个 tag,所以也可以类比为 Notify 的 MessageType 概念
Keys null 选填,代表这条消息的业务关键词,服务器会根据 keys 创建哈希索引,设置后,
可以在 Console 系统根据 Topic、Keys 来查询消息,由于是哈希索引,请尽可能保证 key 唯一,例如订单号,商品 Id 等
Flag 0 选填,完全由应用来设置,RocketMQ 不做干预
DelayTimeLevel 0 选填,消息延时级别,0 表示不延时,大于 0 会延时特定的时间才会被消费
WaitStoreMsgOK TRUE 选填,表示消息是否在服务器落盘后才返回应答
Message 数据结构各个字段都可以通过 get、set 方式访问,例如访问 topic
msg.getTopic();
msg.setTopic(“TopicTest”);
其他字段访问方式类似
2 ) 针对 Consumer
在Producer端,使用 com.alibaba.rocketmq.common.message.Message 则个数据结构
由于 Broker 会为Message 增加数据结构,所以消息到达 Consumer 后
会在 Message 基础之上增加多个字段,Consumer 看到的是
com.alibaba.rocketmq.common.message.MessageExt 返个数据结构
MessageExt 继承于 Message,MessageExt
1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长!
2. 分享目的仅供大家学习和交流,您必须在下载后24小时内删除!
3. 不得使用于非法商业用途,不得违反国家法律。否则后果自负!
4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解!
5. 如有链接无法下载、失效或广告,请联系管理员处理!
6. 如遇到加密压缩包,请使用WINRAR解压,如遇到无法解压的请联系管理员!
7. 本站有不少源码未能详细测试(解密),不能分辨部分源码是病毒还是误报,所以没有进行任何修改,大家使用前请进行甄别!
66源码网 » RocketMQ: 客户端使用指南
