单机部署
部署条件:
- JDK 1.8.0_322
- Maven Apache Maven 3.5.4
下载源码
打开 RocketMQ release_notes 页面,我们可以看到 RocketMQ 所有的发布版本。这里,我们选择最新的 RocketMQ 4.6.0 版本。点击进入该版本的发布页面后,我们可以看到两种发布版本:
一般情况下,我们可以直接使用 Binary 版本,它是 RocketMQ 已经编译好,可以直接使用的 RocketMQ 软件包。
这里,我们想带着胖友们编译一次 RocketMQ 源码,所以使用 Source 版本。下面,我们开始下载 RocketMQ 4.6.0 Source 源码。命令行操作如下:
1 | 下载 |
编译源码
使用 Maven 编译 RocketMQ 源码。命令行操作如下:
1 | 进入 RocketMQ 源码目录 |
编译完成,在我们进入 distribution 目录下,就可以看到 RocketMQ 的发布包了。命令行操作如下:
1 | 进入 distribution 目录下 |
启动Namesrv
启动一个 RocketMQ Namesrv 服务。命令行操作如下:
1 | nohup sh bin/mqnamesrv & |
启动完成后,查看日志。
1 | 查看 Namesrv 日志。 |
- 默认情况下,Namesrv 日志文件所在地址为
~/logs/rocketmqlogs/namesrv.log
。如果想要自定义,可以通过conf/logback_namesrv.xml
配置文件来进行修改。
本人是Mac m1 芯片可能会出现
1 | 启动RocketMQ报错:Please set the JAVA_HOME variable in your environment, We need java 64 |
解决方法:
https://blog.csdn.net/suiyu_eran/article/details/104432164
启动 Broker
在 conf
目录下,RocketMQ 提供了多种 Broker 的配置文件:
broker.conf
:单主,异步刷盘。2m/
:双主,异步刷盘。2m-2s-async/
:两主两从,异步复制,异步刷盘。2m-2s-sync/
:两主两从,同步复制,异步刷盘。dledger/
:Dledger 集群,至少三节点。
这里,我们只启动一个 RocketMQ Broker 服务,所以使用 broker.conf
配置文件。命令行操作如下:
1 | nohup sh bin/mqbroker -c conf/broker.conf -n 127.0.0.1:9876 & |
通过
-c
参数,配置读取的主 Broker 配置。通过
-n
参数,设置 RocketMQ Namesrv 地址。如果胖友的服务器的存相对小,可以修改下
bin/runbroker.sh
脚本,将 Broker JVM 内存调小。如下:1
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g"
启动完成后,查看日志。
1 | tail -f ~/logs/rocketmqlogs/broker.log |
- 默认情况下,Broker 日志文件所在地址为
~/logs/rocketmqlogs/broker.log
。如果想要自定义,可以通过conf/logback_broker.xml
配置文件来进行修改。
测试发送消息
通过使用 bin/tools.sh
工具类,实现测试发送消息。命令行操作如下:
1 | //查看broker配置 |
如果发送成功,我们会看到大量成功的发送日志。
1 | SendResult [sendStatus=SEND_OK, msgId=FE800000000000004F2B5386138462F500000D7163610D67E7F100F4, offsetMsgId=C0A8032C00002A9F000000000000D7EE, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=61] |
- 通过发送结果为
sendStatus=SEND_OK
状态,说明消息都发送成功了。
1 | 发送的启动地址 |
测试消费消息
通过使用 bin/tools.sh
工具类,实现测试消费消息。命令行操作如下:
1 | 设置 Namesrv 服务器的地址 |
如果消费成功,我们会看到大量成功的消费日志。
1 | ConsumeMessageThread_4 Receive New Messages: [MessageExt [queueId=2, storeSize=227, queueOffset=131, sysFlag=0, bornTimestamp=1575354513732, bornHost=/192.168.3.44:55510, storeTimestamp=1575354513733, storeHost=/192.168.3.44:10911, msgId=C0A8032C00002A9F000000000001D1FC, commitLogOffset=119292, bodyCRC=1549304357, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=145, CONSUME_START_TIME=1575354867104, UNIQ_KEY=FE800000000000004F2B5386138462F500000D7163610D67E944020E, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 53, 50, 54], transactionId='null'}]] |
- 通过
ConsumeMessageThread_4
和ConsumeMessageThread_3
线程名,我们可以看出,目前是进行并发消费消息。
简单示例
Producer
1 |
|
初始化一个 Producer 生产者。
- 1处,创建 DefaultMQProducer 对象,这里设置的生产者分组是
"please_rename_unique_group_name"
。 - 2处,设置 设置
producer
的 RocketMQ Namesrv 地址。这里,是艿艿额外添加的代码。 - 3处,启动
producer
生产者。
使用 Producer 发送 1000 条消息。
4处,创建 Message 消息。这里设置了其 Topic 为
"TopicTest"
,Tag 为TagA
、消息体 Body 为"Hello RocketMQ"
的二进制数组。5 处,调用生产者的
#send(Message msg)
方法,同步发送消息,等待发送结果。RocketMQ Producer 一共有三种发送消息的方式,除了我们这里看到的同步发送消息之外,还有异步发送消息(可见 AsyncProducer 示例),和 Oneway 发送消息。6处,打印发送结果。
7处,关闭
producer
生产者。
Consumer
1 | public class Consumer { |
1 处,创建 DefaultMQPushConsumer 对象,这里设置的消费者分组是
"please_rename_unique_group_name"
。注意,消费者分组的概念:FROM 概念(Concept)
同一类 Consumer 的集合,这类 Consumer 通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。
要注意的是,消费者组的消费者实例必须订阅完全相同的 Topic 。
RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。
- 在集群消费下,同一条消息只会被相同消费者分组的一个消费者所消费。
- 在广播消费下,同一条消息会被相同消费者分组的所有消费者所消费。
- 在当前示例里,我们采用的是 DefaultMQPushConsumer 的默认消费方式,集群消费。
2处,设置
consumer
的 RocketMQ Namesrv 地址。这里,是艿艿额外添加的代码。3处,设置一个新的消费集群,初始的消费进度。目前有三个选项:
CONSUME_FROM_FIRST_OFFSET
:每个 Topic 队列的第一条消息。CONSUME_FROM_LAST_OFFSET
:每个 Topic 队列的最后一条消息。CONSUME_FROM_TIMESTAMP
:每个 Topic 队列的指定时间开始的消息。- 注意,只针对新的消费集群。如果一个集群每个 Topic 已经有消费进度,则继续使用该消费进度。仔细理解一下哈~
4处,设置订阅
"TopicTest"
主题的消息。有一定一定要注意!!!消费者组的消费者实例必须订阅完全相同的 Topic + Tag 。5处,添加消息监听器。这里我们采用的是 MessageListenerConcurrently 并发消费消息的监听器。如果胖友需要实现顺序消费消息,需要使用 MessageListenerOrderly 顺序消费的监听器。
6处,启动
consumer
消费者。此时,Consumer 就开始正式的消费消息啦。。