org.apache.rocketmq.namesrv.NamesrvStartup

程序入口

定时任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//定时扫描不可用的broker,同时删除不可用的broker,同时打印相关日志
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);

//定时将kv的配置信息输出到info日志中
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);

requestHeader.setBrokerAddr(brokerAddr);
requestHeader.setBrokerId(brokerId);
requestHeader.setBrokerName(brokerName);
requestHeader.setClusterName(clusterName);
requestHeader.setHaServerAddr(haServerAddr);
requestHeader.setCompressed(compressed);
requestHeader.setBodyCrc32(bodyCrc32);

requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
requestBody.setFilterServerList(filterServerList);

Broker

定时任务

1
2
3
4
5
6
7
8
9
10
11
12
//每30秒循环 Broker 发送心跳包
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

消息队列如何进行负载 ?
消息发送如何实现高可用 ?
批量消息发送如何实现一致性?
2 )消息队列负载机制
消息生产者在发送消息时,如果本地路由表中未缓存 topic 的路由信息,向 NameServer 发送获取路由信息请求,更新本地路由信息表,并且消息生产者每隔 30s 从 NameServer 更新路由表 。
3 )消息发送异常机制
消息发送高可用主要通过两个手段 : 重试与 Broker 规避 。 Brok巳r 规避就是在一次消息
发送过程中发现错误,在某一时间段内,消息生产者不会选择该 Broker(消息服务器)上的
消息队列,提高发送消息的成功率 。
4 )批量消息发送
RocketMQ 支持将 同一主题下 的多条消息一次性发送到消息服务端 。

SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
//消息发送组
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
//消息的topic
requestHeader.setTopic(msg.getTopic());
//消息的默认topic
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
//消息的默认queue数量
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
//消息发送的queueid
requestHeader.setQueueId(mq.getQueueId());
//特殊标识
requestHeader.setSysFlag(sysFlag);
//消息创建的时间戳
requestHeader.setBornTimestamp(System.currentTimeMillis());
//标识
requestHeader.setFlag(msg.getFlag());
//消息的扩展属性
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
//消息的消费
requestHeader.setReconsumeTimes(0);
//模型
requestHeader.setUnitMode(this.isUnitMode());
//批量
requestHeader.setBatch(msg instanceof MessageBatch);

response.setOpaque(request.getOpaque());
response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
responseHeader.setQueueId(queueIdInt);
responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());

MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);

MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
this.putProperty(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, false);
this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 3);
putProperty(msg, MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, msgExt.getMsgId());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

1 ) TOTALSIZE : 该消息条目总长度 , 4 字节 。
2 ) MAGICCODE : 魔数, 4 字节 。 固定值 Oxdaa320a7 。
3 ) BODYCRC : 消息体 ere 校验码, 4 字节 。
4 ) QUEUEID : 消息消费队列 ID , 4 字节 。
5 ) FLAG : 消息 FLAG , RocketMQ 不做处理 , 供应用程序使用,默认 4 字节 。
6 ) QUEUEOFFSET :消息在消息消费队列的偏移量 , 8 字节 。
7 ) PHYSICALOFFSET : 消息在 CommitLog 文件中的偏移量 , 8 字节 。
8 ) SYSFLAG : 消息系统 Flag ,例如是否压缩 、 是否是事务消息等 , 4 字节 。
9 ) BORNTIMESTAMP : 消息生产者调用消息发送 API 的时间戳, 8 字节 。
10 ) BORNHOST :消息发送者 IP 、端 口 号, 8 字节 。
11 ) STORETIMESTAMP : 消息存储时间戳, 8 字节 。
12 ) STOREHOSTADDRESS: Broker 服务器 IP+ 端 口 号, 8 字节 。
13 ) 阻CONSUMETIMES : 消息重试次数, 4 字节 。
14 ) Prepared Transaction Offset : 事务消息物理偏移量 , 8 字节 。
15 ) BodyLength :消息体长度, 4 字节 。
16 ) Body : 消息体内容,长度为 bodyLenth 中存储的值。
17 ) TopieLength : 主题存储长度, 1 字节 ,表示主题名称不能超过 255 个字符 。
18) Topie : 主题,长度为 TopieLength 中存储的值。
19 ) PropertiesLength : 消息属性长度 , 2 字节 , 表示消息属性长度不能超过 6 553 6 个
字符 。
20 ) Properties : 消息属性,长度为 PropertiesLength 中存储的值 。

DLedgerMmapFileStore.AppendHook appendHook = (entry, buffer, bodyOffset) -> {
assert bodyOffset == DLedgerEntry.BODY_OFFSET;
buffer.position(buffer.position() + bodyOffset + MessageDecoder.PHY_POS_POSITION);
buffer.putLong(entry.getPos() + bodyOffset);
};
dLedgerFileStore.addAppendHook(appendHook);

org.apache.rocketmq.common.namesrv.NamesrvConfig

配置项 key 默认值 说明
rocketmq 主目录 rocketmqHome System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)) 可以通过 -Drocketmq.home.dir=path 或通过设置环境变量 ROCKETMQ_HOME 来配置 RocketMQ 的主目录
NameServer存储 KV 配置属性的持久化路径 kvConfigPath System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json" NameServer 存储 KV 配置属性的持久化路径
NameServer 默认配置文件路径 configStorePath System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties" NameServer 默认配置文件路径,不生效. NameServer 启动时如果要通过配置文件配置 NameServer 启动属性的话,请使用 -c 选项
productEnvName center
集群测试 clusterTest false 是否开启集群测试
顺序消息 orderMessageEnable false 是否支持顺序消息,默认是不支持

org.apache.rocketmq.remoting.netty.NettyServerConfig

配置项 默认值类型 默认值 说明
listenPort int 8888 服务端监听端口,NameServer 监昕端口,该值默认会被初始化为 9876
serverWorkerThreads int 8 Netty 业务线程池线程个数
serverCallbackExecutorThreads int 0 Netty public 任务线程池线程个数,Netty 网络设计,根据业务类型会创建不同的线程池,比如处理消息发送、消息消费、心跳检测等。如果该业务类型(RequestCode)未注册线程池,则由 public 线程池执行
serverSelectorThreads int 3 IO线程池线程个数,主要是 NameServer 、 Broker 端解析请求、返回相应的线程个数,这类线程主要是处理网络请求的,解析请求包,然后转发到各个业务线程池完成具体的业务操作,然后将结果再返回调用方
serverOnewaySemaphoreValue int 256 send oneway 消息请求井发度( Broker 端参数)
serverAsyncSemaphoreValue int 64 异步消息发送最大并发度( Broker 端参数)
serverChannelMaxIdleTimeSeconds int 120 网络连接最大空闲时间,默认120s。如果连接空闲时间超过该参数设置的值,连接将被关闭
serverSocketSndBufSize int 65535 网络 socket 发送缓存区大小,默认64k
serverSocketRcvBufSize int 65535 网络 socket 接收缓存区大小,默认64k
serverPooledByteBufAllocatorEnable boolean true ByteBuffer 是否开启缓存,建议开启
useEpollNativeSelector boolean false 是否启用 EpollIO 模型,Linux 环境建议开启
rocketmqHome class java.lang.String System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV))
namesrvAddr class java.lang.String System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV))
brokerIP1 class java.lang.String RemotingUtil.getLocalAddress()
brokerIP2 class java.lang.String RemotingUtil.getLocalAddress()
brokerName class java.lang.String localHostName()
brokerClusterName class java.lang.String DefaultCluster
brokerId long 1
brokerPermission int 1
defaultTopicQueueNums int 8
autoCreateTopicEnable boolean true
clusterTopicEnable boolean true
brokerTopicEnable boolean true
autoCreateSubscriptionGroup boolean true
messageStorePlugIn class java.lang.String
msgTraceTopicName class java.lang.String TopicValidator.RMQ_SYS_TRACE_TOPIC
traceTopicEnable boolean false
sendMessageThreadPoolNums int 1
pullMessageThreadPoolNums int 32
processReplyMessageThreadPoolNums int 32
queryMessageThreadPoolNums int 16
adminBrokerThreadPoolNums int 16
clientManageThreadPoolNums int 32
consumerManageThreadPoolNums int 32
heartbeatThreadPoolNums int 8
endTransactionThreadPoolNums int 24
flushConsumerOffsetInterval int 5000
flushConsumerOffsetHistoryInterval int 60000
rejectTransactionMessage boolean false
fetchNamesrvAddrByAddressServer boolean false
sendThreadPoolQueueCapacity int 10000
pullThreadPoolQueueCapacity int 100000
replyThreadPoolQueueCapacity int 10000
queryThreadPoolQueueCapacity int 20000
clientManagerThreadPoolQueueCapacity int 1000000
consumerManagerThreadPoolQueueCapacity int 1000000
heartbeatThreadPoolQueueCapacity int 50000
endTransactionPoolQueueCapacity int 100000
filterServerNums int 0
longPollingEnable boolean true
shortPollingTimeMills long 1000
notifyConsumerIdsChangedEnable boolean true
highSpeedMode boolean false
commercialEnable boolean true
commercialTimerCount int 1
commercialTransCount int 1
commercialBigCount int 1
commercialBaseCount int 1
transferMsgByHeap boolean true
maxDelayTime int 40
regionId class java.lang.String MixAll.DEFAULT_TRACE_REGION_ID
registerBrokerTimeoutMills int 6000
slaveReadEnable boolean false
disableConsumeIfConsumerReadSlowly boolean false
consumerFallbehindThreshold long 17179869184
brokerFastFailureEnable boolean true
waitTimeMillsInSendQueue long 200
waitTimeMillsInPullQueue long 5000
waitTimeMillsInHeartbeatQueue long 31000
waitTimeMillsInTransactionQueue long 3000
startAcceptSendRequestTimeStamp long 0
traceOn boolean true
enableCalcFilterBitMap boolean false
expectConsumerNumUseFilter int 32
maxErrorRateOfBloomFilter int 20
filterDataCleanTimeSpan long 86400000
filterSupportRetry boolean false
enablePropertyFilter boolean false
compressedRegister boolean false
forceRegister boolean true
registerNameServerPeriod int 30000
transactionTimeOut long 6000
transactionCheckMax int 15
transactionCheckInterval long 60000
aclEnable boolean false
storeReplyMessageEnable boolean true
autoDeleteUnusedStats boolean false