org.apache.rocketmq.namesrv.NamesrvStartup
程序入口
定时任务
1 | //定时扫描不可用的broker,同时删除不可用的broker,同时打印相关日志 |
requestHeader.setBrokerAddr(brokerAddr);
requestHeader.setBrokerId(brokerId);
requestHeader.setBrokerName(brokerName);
requestHeader.setClusterName(clusterName);
requestHeader.setHaServerAddr(haServerAddr);
requestHeader.setCompressed(compressed);
requestHeader.setBodyCrc32(bodyCrc32);
requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
requestBody.setFilterServerList(filterServerList);
Broker
定时任务
1 | //每30秒循环 Broker 发送心跳包 |
消息队列如何进行负载 ?
消息发送如何实现高可用 ?
批量消息发送如何实现一致性?
2 )消息队列负载机制
消息生产者在发送消息时,如果本地路由表中未缓存 topic 的路由信息,向 NameServer 发送获取路由信息请求,更新本地路由信息表,并且消息生产者每隔 30s 从 NameServer 更新路由表 。
3 )消息发送异常机制
消息发送高可用主要通过两个手段 : 重试与 Broker 规避 。 Brok巳r 规避就是在一次消息
发送过程中发现错误,在某一时间段内,消息生产者不会选择该 Broker(消息服务器)上的
消息队列,提高发送消息的成功率 。
4 )批量消息发送
RocketMQ 支持将 同一主题下 的多条消息一次性发送到消息服务端 。
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
//消息发送组
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
//消息的topic
requestHeader.setTopic(msg.getTopic());
//消息的默认topic
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
//消息的默认queue数量
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
//消息发送的queueid
requestHeader.setQueueId(mq.getQueueId());
//特殊标识
requestHeader.setSysFlag(sysFlag);
//消息创建的时间戳
requestHeader.setBornTimestamp(System.currentTimeMillis());
//标识
requestHeader.setFlag(msg.getFlag());
//消息的扩展属性
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
//消息的消费
requestHeader.setReconsumeTimes(0);
//模型
requestHeader.setUnitMode(this.isUnitMode());
//批量
requestHeader.setBatch(msg instanceof MessageBatch);
response.setOpaque(request.getOpaque());
response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
responseHeader.setQueueId(queueIdInt);
responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
this.putProperty(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, false);
this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 3);
putProperty(msg, MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, msgExt.getMsgId());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
1 ) TOTALSIZE : 该消息条目总长度 , 4 字节 。
2 ) MAGICCODE : 魔数, 4 字节 。 固定值 Oxdaa320a7 。
3 ) BODYCRC : 消息体 ere 校验码, 4 字节 。
4 ) QUEUEID : 消息消费队列 ID , 4 字节 。
5 ) FLAG : 消息 FLAG , RocketMQ 不做处理 , 供应用程序使用,默认 4 字节 。
6 ) QUEUEOFFSET :消息在消息消费队列的偏移量 , 8 字节 。
7 ) PHYSICALOFFSET : 消息在 CommitLog 文件中的偏移量 , 8 字节 。
8 ) SYSFLAG : 消息系统 Flag ,例如是否压缩 、 是否是事务消息等 , 4 字节 。
9 ) BORNTIMESTAMP : 消息生产者调用消息发送 API 的时间戳, 8 字节 。
10 ) BORNHOST :消息发送者 IP 、端 口 号, 8 字节 。
11 ) STORETIMESTAMP : 消息存储时间戳, 8 字节 。
12 ) STOREHOSTADDRESS: Broker 服务器 IP+ 端 口 号, 8 字节 。
13 ) 阻CONSUMETIMES : 消息重试次数, 4 字节 。
14 ) Prepared Transaction Offset : 事务消息物理偏移量 , 8 字节 。
15 ) BodyLength :消息体长度, 4 字节 。
16 ) Body : 消息体内容,长度为 bodyLenth 中存储的值。
17 ) TopieLength : 主题存储长度, 1 字节 ,表示主题名称不能超过 255 个字符 。
18) Topie : 主题,长度为 TopieLength 中存储的值。
19 ) PropertiesLength : 消息属性长度 , 2 字节 , 表示消息属性长度不能超过 6 553 6 个
字符 。
20 ) Properties : 消息属性,长度为 PropertiesLength 中存储的值 。
DLedgerMmapFileStore.AppendHook appendHook = (entry, buffer, bodyOffset) -> {
assert bodyOffset == DLedgerEntry.BODY_OFFSET;
buffer.position(buffer.position() + bodyOffset + MessageDecoder.PHY_POS_POSITION);
buffer.putLong(entry.getPos() + bodyOffset);
};
dLedgerFileStore.addAppendHook(appendHook);
org.apache.rocketmq.common.namesrv.NamesrvConfig
| 配置项 | key | 默认值 | 说明 |
| rocketmq 主目录 | rocketmqHome | System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)) | 可以通过 -Drocketmq.home.dir=path 或通过设置环境变量 ROCKETMQ_HOME 来配置 RocketMQ 的主目录 |
| NameServer存储 KV 配置属性的持久化路径 | kvConfigPath | System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json" | NameServer 存储 KV 配置属性的持久化路径 |
| NameServer 默认配置文件路径 | configStorePath | System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties" | NameServer 默认配置文件路径,不生效. NameServer 启动时如果要通过配置文件配置 NameServer 启动属性的话,请使用 -c 选项 |
| productEnvName | center | ||
| 集群测试 | clusterTest | false | 是否开启集群测试 |
| 顺序消息 | orderMessageEnable | false | 是否支持顺序消息,默认是不支持 |
org.apache.rocketmq.remoting.netty.NettyServerConfig
| 配置项 | 默认值类型 | 默认值 | 说明 |
| listenPort | int | 8888 | 服务端监听端口,NameServer 监昕端口,该值默认会被初始化为 9876 |
| serverWorkerThreads | int | 8 | Netty 业务线程池线程个数 |
| serverCallbackExecutorThreads | int | 0 | Netty public 任务线程池线程个数,Netty 网络设计,根据业务类型会创建不同的线程池,比如处理消息发送、消息消费、心跳检测等。如果该业务类型(RequestCode)未注册线程池,则由 public 线程池执行 |
| serverSelectorThreads | int | 3 | IO线程池线程个数,主要是 NameServer 、 Broker 端解析请求、返回相应的线程个数,这类线程主要是处理网络请求的,解析请求包,然后转发到各个业务线程池完成具体的业务操作,然后将结果再返回调用方 |
| serverOnewaySemaphoreValue | int | 256 | send oneway 消息请求井发度( Broker 端参数) |
| serverAsyncSemaphoreValue | int | 64 | 异步消息发送最大并发度( Broker 端参数) |
| serverChannelMaxIdleTimeSeconds | int | 120 | 网络连接最大空闲时间,默认120s。如果连接空闲时间超过该参数设置的值,连接将被关闭 |
| serverSocketSndBufSize | int | 65535 | 网络 socket 发送缓存区大小,默认64k |
| serverSocketRcvBufSize | int | 65535 | 网络 socket 接收缓存区大小,默认64k |
| serverPooledByteBufAllocatorEnable | boolean | true | ByteBuffer 是否开启缓存,建议开启 |
| useEpollNativeSelector | boolean | false | 是否启用 EpollIO 模型,Linux 环境建议开启 |
| rocketmqHome | class java.lang.String | System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)) | |
| namesrvAddr | class java.lang.String | System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV)) | |
| brokerIP1 | class java.lang.String | RemotingUtil.getLocalAddress() | |
| brokerIP2 | class java.lang.String | RemotingUtil.getLocalAddress() | |
| brokerName | class java.lang.String | localHostName() | |
| brokerClusterName | class java.lang.String | DefaultCluster | |
| brokerId | long | 1 | |
| brokerPermission | int | 1 | |
| defaultTopicQueueNums | int | 8 | |
| autoCreateTopicEnable | boolean | true | |
| clusterTopicEnable | boolean | true | |
| brokerTopicEnable | boolean | true | |
| autoCreateSubscriptionGroup | boolean | true | |
| messageStorePlugIn | class java.lang.String | ||
| msgTraceTopicName | class java.lang.String | TopicValidator.RMQ_SYS_TRACE_TOPIC | |
| traceTopicEnable | boolean | false | |
| sendMessageThreadPoolNums | int | 1 | |
| pullMessageThreadPoolNums | int | 32 | |
| processReplyMessageThreadPoolNums | int | 32 | |
| queryMessageThreadPoolNums | int | 16 | |
| adminBrokerThreadPoolNums | int | 16 | |
| clientManageThreadPoolNums | int | 32 | |
| consumerManageThreadPoolNums | int | 32 | |
| heartbeatThreadPoolNums | int | 8 | |
| endTransactionThreadPoolNums | int | 24 | |
| flushConsumerOffsetInterval | int | 5000 | |
| flushConsumerOffsetHistoryInterval | int | 60000 | |
| rejectTransactionMessage | boolean | false | |
| fetchNamesrvAddrByAddressServer | boolean | false | |
| sendThreadPoolQueueCapacity | int | 10000 | |
| pullThreadPoolQueueCapacity | int | 100000 | |
| replyThreadPoolQueueCapacity | int | 10000 | |
| queryThreadPoolQueueCapacity | int | 20000 | |
| clientManagerThreadPoolQueueCapacity | int | 1000000 | |
| consumerManagerThreadPoolQueueCapacity | int | 1000000 | |
| heartbeatThreadPoolQueueCapacity | int | 50000 | |
| endTransactionPoolQueueCapacity | int | 100000 | |
| filterServerNums | int | 0 | |
| longPollingEnable | boolean | true | |
| shortPollingTimeMills | long | 1000 | |
| notifyConsumerIdsChangedEnable | boolean | true | |
| highSpeedMode | boolean | false | |
| commercialEnable | boolean | true | |
| commercialTimerCount | int | 1 | |
| commercialTransCount | int | 1 | |
| commercialBigCount | int | 1 | |
| commercialBaseCount | int | 1 | |
| transferMsgByHeap | boolean | true | |
| maxDelayTime | int | 40 | |
| regionId | class java.lang.String | MixAll.DEFAULT_TRACE_REGION_ID | |
| registerBrokerTimeoutMills | int | 6000 | |
| slaveReadEnable | boolean | false | |
| disableConsumeIfConsumerReadSlowly | boolean | false | |
| consumerFallbehindThreshold | long | 17179869184 | |
| brokerFastFailureEnable | boolean | true | |
| waitTimeMillsInSendQueue | long | 200 | |
| waitTimeMillsInPullQueue | long | 5000 | |
| waitTimeMillsInHeartbeatQueue | long | 31000 | |
| waitTimeMillsInTransactionQueue | long | 3000 | |
| startAcceptSendRequestTimeStamp | long | 0 | |
| traceOn | boolean | true | |
| enableCalcFilterBitMap | boolean | false | |
| expectConsumerNumUseFilter | int | 32 | |
| maxErrorRateOfBloomFilter | int | 20 | |
| filterDataCleanTimeSpan | long | 86400000 | |
| filterSupportRetry | boolean | false | |
| enablePropertyFilter | boolean | false | |
| compressedRegister | boolean | false | |
| forceRegister | boolean | true | |
| registerNameServerPeriod | int | 30000 | |
| transactionTimeOut | long | 6000 | |
| transactionCheckMax | int | 15 | |
| transactionCheckInterval | long | 60000 | |
| aclEnable | boolean | false | |
| storeReplyMessageEnable | boolean | true | |
| autoDeleteUnusedStats | boolean | false |
