RocketMQ:消息发送与消费使用方式
1. 开发环境
(1)引入RocketMQ客户端依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
(2)生产和消费步骤分析
- 消息生产者
1.创建消息生产者producer,并指定生产者组名
2.指定Nameserver地址
3.启动producer
4.创建消息对象,指定主题Topic、Tag和消息体
5.发送消息
6.关闭生产者producer
- 消息消费者
1.创建消费者Consumer,制定消费者组名
2.指定Nameserver地址
3.订阅主题Topic和Tag
4.设置回调函数,处理消息
5.启动消费者consumer
2. 基本样例
2.2 消息发送
2.2.1 发送同步消息
这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
public class SyncProducer {
public static void main(String[] args) throws Exception {
// 1.创建消息生产者producer,并指定生产者组名
DefaultMQProducer producer = new DefaultMQProducer(“base-sync-producer”);
// 2.指定NameServer地址
producer.setNamesrvAddr(“192.168.1.17:9876”);
// 3.启动producer
producer.start();
// 4.创建消息对象,指定topic、tag和消息体
Message message = new Message(“TestTopic”, “TagA”, “Hello World!”.getBytes());
// 5.发送消息
SendResult sendResult = producer.send(message);
System.out.printf(“%s%n”, sendResult);
// 6.关闭生产者
producer.shutdown();
}
}
2.2.2 发送异步消息
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
public class AsyncProducer {
public static void main(String[] args) throws Exception {
// 1.实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer(“base-async-producer”);
// 2.设置NameServer的地址
producer.setNamesrvAddr(“192.168.1.17:9876”);
// 3.启动Producer实例
producer.start();
// 异步发送失败时,重试次数。默认为2
producer.setRetryTimesWhenSendAsyncFailed(0);
for (int i = 0; i < 10; i++) {
final int index = i;
// 4.创建消息,并指定Topic,Tag和消息体
Message msg = new Message(“TestTopic”,
“TagA”,
“OrderID188”,
“Hello World!”.getBytes(RemotingHelper.DEFAULT_CHARSET));
// 5.SendCallback接收异步返回结果的回调
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf(“%-10d OK %s %n”, index,
sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf(“%-10d Exception %s %n”, index, e);
e.printStackTrace();
}
});
}
// 休眠一分钟,否则当producer关闭时,无法接收mq的异步回调结果
TimeUnit.MINUTES.sleep(1);
// 6.如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
2.2.3 发送单向消息
这种方式主要用在不特别关心发送结果的场景,例如日志发送。
public class OnewayProducer {
public static void main(String[] args) throws Exception {
// 1.实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer(“base-oneway-producer”);
// 2.设置NameServer的地址
producer.setNamesrvAddr(“192.168.1.17:9876”);
// 3.启动Producer实例
producer.start();
for (int i = 0; i < 5; i++) {
// 4.创建消息,并指定Topic,Tag和消息体
Message msg = new Message(“TestTopic”,
“TagA”,
(“Hello RocketMQ ” + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 5.发送单向消息,没有任何返回结果
producer.sendOneway(msg);
}
// 6.如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长!
2. 分享目的仅供大家学习和交流,您必须在下载后24小时内删除!
3. 不得使用于非法商业用途,不得违反国家法律。否则后果自负!
4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解!
5. 如有链接无法下载、失效或广告,请联系管理员处理!
6. 如遇到加密压缩包,请使用WINRAR解压,如遇到无法解压的请联系管理员!
7. 本站有不少源码未能详细测试(解密),不能分辨部分源码是病毒还是误报,所以没有进行任何修改,大家使用前请进行甄别!
66源码网 » RocketMQ:消息发送与消费使用方式
