成都网站建设设计

将想法与焦点和您一起共享

RocketMQ存储中如何实现日志文件创建与映射

这篇文章将为大家详细讲解有关RocketMQ存储中如何实现日志文件创建与映射,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。

成都创新互联2013年开创至今,是专业互联网技术服务公司,拥有项目成都网站建设、成都做网站网站策划,项目实施与项目整合能力。我们以让每一个梦想脱颖而出为使命,1280元格尔木做网站,已为上家服务,为格尔木各地企业和个人服务,联系电话:028-86922220

1.问题描述

日志目录(可配置)/data/rocketmq/store/commitlog会有20位长度的日志文件。
1.日志文件什么时候创建的?
2.日志文件创建流程是什么?
3.日志文件和内存映射是怎么样的?

-rw-rw-r-- 1 baseuser baseuser 1073741824 Jun 27 22:50 00000117290188144640
-rw-rw-r-- 1 baseuser baseuser 1073741824 Jun 27 22:52 00000117291261886464
-rw-rw-r-- 1 baseuser baseuser 1073741824 Jun 27 22:54 00000117292335628288
-rw-rw-r-- 1 baseuser baseuser 1073741824 Jun 27 22:56 00000117293409370112
-rw-rw-r-- 1 baseuser baseuser 1073741824 Jun 27 22:57 00000117294483111936
-rw-rw-r-- 1 baseuser baseuser 1073741824 Jun 27 22:56 00000117295556853760
2.日志映射相关类初始化

在Broker启动时实例化了两个类DefaultMessageStore和AllocateMappedFileService。
AllocateMappedFileService是线程类继承了Runnable接口,该线程类持有DefaultMessageStore的引用(即:可操作管理DefaultMessageStore),并启动该线程类。
调用链

//Broker启动时调用
@1 BrokerStartup#main#createBrokerController()
boolean initResult = controller.initialize();
@2 Controller#initialize()
this.messageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig);
@3 DefaultMessageStore#DefaultMessageStore()
//将DefaultMessageStore自身引用传给AllocateMappedFileService
this.allocateMappedFileService = new AllocateMappedFileService(this);
this.allocateMappedFileService.start();//启动该线程类
@4 class AllocateMappedFileService extends ServiceThread
public AllocateMappedFileService(DefaultMessageStore messageStore) {
this.messageStore = messageStore;
}
3.线程类一直运行在干啥

既然在Broker启动时该线程类AllocateMappedFileService就启动了,那么在做什么呢?run方法为while循环,即:只要服务不停止并且mmapOperation()返回true则一直运行

//异步处理,调用mmapOperation完成请求的处理
public void run() {
log.info(this.getServiceName() + " service started");
//while循环,只要服务部停止即调用 mmapOperation方法
while (!this.isStopped() && this.mmapOperation()) {
}
log.info(this.getServiceName() + " service end");
}

mmapOperation方法流程图

RocketMQ存储中如何实现日志文件创建与映射

小结:mmapOperation方法主要做了两件事:初始化MappedFile和预热MappedFile。只要服务不停止和线程不被中断,这个过程一直重复运行。

4.提交映射文件请求(AllocateRequest)

既然AllocateMappedFileService一直从容器(优先级队列和ConcurrentHashMap)中获取AllocateRequest。AllocateRequest是什么时候产生并放到容器中的呢?
RocketMQ消息存储概览【源码笔记】中写入commitLog流程,获取最新的日志文件。
调用链

@1 CommitLog#putMessage
//文件已满或者没有映射文件重新创建一个文件
if (null == mappedFile || mappedFile.isFull()) {
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
// Mark: NewFile may be cause noise
}
@2 MappedFileQueue#getLastMappedFile

流程图

RocketMQ存储中如何实现日志文件创建与映射

小结:MappedFileQueue#getLastMappedFile会向线程类AllocateMappedFileServic提交两个映射文件创建请求:分别为nextFilePath和nextNextFilePath;如果线程类AllocateMappedFileServic为null,则直接new一个MappedFile,此时只会创建一个文件。

下面为提交两个映射文件请求流程,即:

AllocateMappedFileServic#putRequestAndReturnMappedFile


调用链

@1 MappedFileQueue#getLastMappedFile
if (this.allocateMappedFileService != null) {
mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
nextNextFilePath, this.mappedFileSize);
}
@2 AllocateMappedFileService#putRequestAndReturnMappedFile

流程图

RocketMQ存储中如何实现日志文件创建与映射

小结:处理提交的映射文件请求指的是:
实例化两个AllocateRequest并把他们提交到requestTable(ConcurrentHashMap)
和requestQueue(PriorityBlockingQueue)中,等待5秒,此段时间线程会从这两个容器中获取请求并创建MappedFile,并将结果返回。

5.MappedFile初始化

本段梳理下上文中mmapOperation方法流程图第5步初始化MappedFile的流程
调用链

@1 AllocateMappedFileService#mmapOperation
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
@2 MappedFile#init(fileName, fileSize);

流程图

RocketMQ存储中如何实现日志文件创建与映射

MappedFile主要干了两件事:1.创建日志文件。2.并将文件映射到内存中。

关于“RocketMQ存储中如何实现日志文件创建与映射”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。


文章标题:RocketMQ存储中如何实现日志文件创建与映射
本文链接:http://chengdu.cdxwcx.cn/article/jhhhjo.html