单机部署

部署条件:

  • JDK 1.8.0_322
  • Maven Apache Maven 3.5.4

下载源码

打开 RocketMQ release_notes 页面,我们可以看到 RocketMQ 所有的发布版本。这里,我们选择最新的 RocketMQ 4.6.0 版本。点击进入该版本的发布页面后,我们可以看到两种发布版本:

一般情况下,我们可以直接使用 Binary 版本,它是 RocketMQ 已经编译好,可以直接使用的 RocketMQ 软件包。

这里,我们想带着胖友们编译一次 RocketMQ 源码,所以使用 Source 版本。下面,我们开始下载 RocketMQ 4.6.0 Source 源码。命令行操作如下:

1
2
3
4
5
# 下载
$ wget wget http://mirror.bit.edu.cn/apache/rocketmq/4.9.3/rocketmq-all-4.9.3-source-release.zip

# 解压
$ unzip rocketmq-all-4.9.3-source-release.zip

编译源码

使用 Maven 编译 RocketMQ 源码。命令行操作如下:

1
2
3
4
5
# 进入 RocketMQ 源码目录
$ cd rocketmq-all-4.9.3-source-release

# Maven 编译 RocketMQ ,并跳过测试。耐心等待...
$ mvn -Prelease-all -DskipTests clean install -U

编译完成,在我们进入 distribution 目录下,就可以看到 RocketMQ 的发布包了。命令行操作如下:

1
2
3
4
5
6
7
8
9
10
11
12
# 进入 distribution 目录下
$ cd distribution/target/rocketmq-4.9.3/rocketmq-4.9.3

# 打印目录
$ ls
40 -rwxr-xr-x 1 yunai staff 17336 Nov 19 20:59 LICENSE
8 -rwxr-xr-x 1 yunai staff 1338 Nov 19 20:59 NOTICE
16 -rwxr-xr-x 1 yunai staff 4225 Nov 19 20:59 README.md
0 drwxr-xr-x 6 yunai staff 192 Dec 3 12:48 benchmark # 性能基准测试
0 drwxr-xr-x 30 yunai staff 960 Nov 19 20:59 bin # 执行脚本
0 drwxr-xr-x 12 yunai staff 384 Nov 19 20:59 conf # 配置文件
0 drwxr-xr-x 36 yunai staff 1152 Dec 3 12:48 lib # RocketMQ jar 包

启动Namesrv

启动一个 RocketMQ Namesrv 服务。命令行操作如下:

1
nohup sh bin/mqnamesrv &

启动完成后,查看日志。

1
2
3
4
# 查看 Namesrv 日志。
$ tail -f ~/logs/rocketmqlogs/namesrv.log

2019-12-03 12:58:04 INFO main - The Name Server boot success. serializeType=JSON
  • 默认情况下,Namesrv 日志文件所在地址为 ~/logs/rocketmqlogs/namesrv.log 。如果想要自定义,可以通过 conf/logback_namesrv.xml 配置文件来进行修改。

本人是Mac m1 芯片可能会出现

1
启动RocketMQ报错:Please set the JAVA_HOME variable in your environment, We need java 64

解决方法:

https://blog.csdn.net/suiyu_eran/article/details/104432164

启动 Broker

conf 目录下,RocketMQ 提供了多种 Broker 的配置文件:

  • broker.conf :单主,异步刷盘。
  • 2m/ :双主,异步刷盘。
  • 2m-2s-async/ :两主两从,异步复制,异步刷盘。
  • 2m-2s-sync/ :两主两从,同步复制,异步刷盘。
  • dledger/Dledger 集群,至少三节点。

这里,我们只启动一个 RocketMQ Broker 服务,所以使用 broker.conf 配置文件。命令行操作如下:

1
nohup sh bin/mqbroker -c conf/broker.conf  -n 127.0.0.1:9876 &
  • 通过 -c 参数,配置读取的主 Broker 配置。

  • 通过 -n 参数,设置 RocketMQ Namesrv 地址。

  • 如果胖友的服务器的存相对小,可以修改下 bin/runbroker.sh 脚本,将 Broker JVM 内存调小。如下:

    1
    JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g"

启动完成后,查看日志。

1
2
3
tail -f ~/logs/rocketmqlogs/broker.log

2019-12-03 14:27:07 INFO main - The broker[broker-a, 192.168.3.44:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876
  • 默认情况下,Broker 日志文件所在地址为 ~/logs/rocketmqlogs/broker.log 。如果想要自定义,可以通过 conf/logback_broker.xml 配置文件来进行修改。

测试发送消息

通过使用 bin/tools.sh 工具类,实现测试发送消息。命令行操作如下:

1
2
3
4
5
6
7
8
9
10
11
//查看broker配置
sh ./bin/mqbroker -m

//关闭broker
sh bin/mqshutdown broker

//将本机远程ip写入配置文件中
echo 'brokerIP1=111.231.XX.XX' > conf/broker.properties

//重新启动broker 最好用这个命令
nohup sh bin/mqbroker -n 111.231.XX.XX:9876 -c conf/broker.conf autoCreateTopicEnable=true &

如果发送成功,我们会看到大量成功的发送日志。

1
2
SendResult [sendStatus=SEND_OK, msgId=FE800000000000004F2B5386138462F500000D7163610D67E7F100F4, offsetMsgId=C0A8032C00002A9F000000000000D7EE, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=61]
SendResult [sendStatus=SEND_OK, msgId=FE800000000000004F2B5386138462F500000D7163610D67E7F200F5, offsetMsgId=C0A8032C00002A9F000000000000D8D1, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=61]
  • 通过发送结果为 sendStatus=SEND_OK 状态,说明消息都发送成功了。
1
2
# 发送的启动地址
/Users/leslie/rocketmq-all-4.9.3/distribution/target/rocketmq-4.9.3/rocketmq-4.9.3

测试消费消息

通过使用 bin/tools.sh 工具类,实现测试消费消息。命令行操作如下:

1
2
3
4
5
# 设置 Namesrv 服务器的地址
export NAMESRV_ADDR=127.0.0.1:9876

# 执行消费者 Consumer 消费测试消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

如果消费成功,我们会看到大量成功的消费日志。

1
2
ConsumeMessageThread_4 Receive New Messages: [MessageExt [queueId=2, storeSize=227, queueOffset=131, sysFlag=0, bornTimestamp=1575354513732, bornHost=/192.168.3.44:55510, storeTimestamp=1575354513733, storeHost=/192.168.3.44:10911, msgId=C0A8032C00002A9F000000000001D1FC, commitLogOffset=119292, bodyCRC=1549304357, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=145, CONSUME_START_TIME=1575354867104, UNIQ_KEY=FE800000000000004F2B5386138462F500000D7163610D67E944020E, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 53, 50, 54], transactionId='null'}]]
ConsumeMessageThread_3 Receive New Messages: [MessageExt [queueId=2, storeSize=227, queueOffset=130, sysFlag=0, bornTimestamp=1575354513729, bornHost=/192.168.3.44:55510, storeTimestamp=1575354513729, storeHost=/192.168.3.44:10911, msgId=C0A8032C00002A9F000000000001CE70, commitLogOffset=118384, bodyCRC=1530218044, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=145, CONSUME_START_TIME=1575354867103, UNIQ_KEY=FE800000000000004F2B5386138462F500000D7163610D67E941020A, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 53, 50, 50], transactionId='null'}]]
  • 通过 ConsumeMessageThread_4ConsumeMessageThread_3 线程名,我们可以看出,目前是进行并发消费消息。

简单示例

Producer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException{
// 1. 创建 DefaultMQProduer 对象
DefaultMQProducer producer = new DefaultMQProducer("Please_rename_unique_group_name");
// 2. 设置RocketMQ Namesrv的地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 3. 启动producer 生产者
producer.start();

for(int i = 0; i < 1000; ++i){
try{
// 4. 创建 Message消息
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 5. 同步message发送消息
SendResult sendResult = producer.send(msg);
// 6. 打印发送的结果
System.out.println(sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
// 7. 关闭 producer 生产者
producer.shutdown();
}
}

初始化一个 Producer 生产者。

  • 1处,创建 DefaultMQProducer 对象,这里设置的生产者分组是 "please_rename_unique_group_name"
  • 2处,设置 设置 producer 的 RocketMQ Namesrv 地址。这里,是艿艿额外添加的代码。
  • 3处,启动 producer 生产者。

使用 Producer 发送 1000 条消息。

  • 4处,创建 Message 消息。这里设置了其 Topic 为 "TopicTest",Tag 为 TagA、消息体 Body 为 "Hello RocketMQ" 的二进制数组。

  • 5 处,调用生产者的 #send(Message msg) 方法,同步发送消息,等待发送结果。RocketMQ Producer 一共有三种发送消息的方式,除了我们这里看到的同步发送消息之外,还有异步发送消息(可见 AsyncProducer 示例),和 Oneway 发送消息。

  • 6处,打印发送结果。

  • 7处,关闭 producer 生产者。

Consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// 1. 创建DefaultMQPushConsumer 对象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Please_rename_unique_group_name");
// 2. 设置RocketMQ Namesrv 地址
consumer.setNamesrvAddr("127.0.0.1:9876");
// 3. 设置消费进度, 从Topic最初位置开始
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 4. 订阅TopicTest主题
consumer.subscribe("TopicTest", "*");

// 5. 添加消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 返回成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//6. 启动producer 消费者
consumer.start();

// 7. 打印 Consumer 启动完成
System.out.println("Consumer Started.%n");
}
}
  • 1 处,创建 DefaultMQPushConsumer 对象,这里设置的消费者分组是 "please_rename_unique_group_name" 。注意,消费者分组的概念:

    FROM 概念(Concept)

    同一类 Consumer 的集合,这类 Consumer 通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。

    要注意的是,消费者组的消费者实例必须订阅完全相同的 Topic 。

    RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。

    • 在集群消费下,同一条消息会被相同消费者分组的一个消费者所消费。
    • 在广播消费下,同一条消息会被相同消费者分组的所有消费者所消费。
    • 在当前示例里,我们采用的是 DefaultMQPushConsumer 的默认消费方式,集群消费。
  • 2处,设置 consumer 的 RocketMQ Namesrv 地址。这里,是艿艿额外添加的代码。

  • 3处,设置一个新的消费集群,初始的消费进度。目前有三个选项:

    • CONSUME_FROM_FIRST_OFFSET :每个 Topic 队列的第一条消息。
    • CONSUME_FROM_LAST_OFFSET :每个 Topic 队列的最后一条消息。
    • CONSUME_FROM_TIMESTAMP :每个 Topic 队列的指定时间开始的消息。
    • 注意,只针对新的消费集群。如果一个集群每个 Topic 已经有消费进度,则继续使用该消费进度。仔细理解一下哈~
  • 4处,设置订阅 "TopicTest" 主题的消息。有一定一定要注意!!!消费者组的消费者实例必须订阅完全相同的 Topic + Tag

  • 5处,添加消息监听器。这里我们采用的是 MessageListenerConcurrently 并发消费消息的监听器。如果胖友需要实现顺序消费消息,需要使用 MessageListenerOrderly 顺序消费的监听器。

  • 6处,启动 consumer 消费者。此时,Consumer 就开始正式的消费消息啦。。