kafka源码阅读(3)-日志的增删改查
Note 基于开源 kafka 2.5 版本。 如无特殊说明,文中代码片段将删除 debug 信息、异常触发、英文注释等代码,以便观看核心代码 ...
Note 基于开源 kafka 2.5 版本。 如无特殊说明,文中代码片段将删除 debug 信息、异常触发、英文注释等代码,以便观看核心代码 ...
Note 基于开源 kafka 2.5 版本。 如无特殊说明,文中代码片段将删除 debug 信息、异常触发、英文注释等代码,以便观看核心代码。 ...
Note 基于开源 kafka 2.5 版本 。 如无特殊说明,文中代码片段将删除 debug 信息、异常的代码 path 等代码,以便观看核心代码。 ...
Note 基于nsq v1.3.0 简介 NSQ是类似kafka、rabbitmq那样的消息队列系统,关于他怎么高性能,怎么好上手这些都不必多说,都是吹逼。这篇主要介绍一下nsq的整个大致架构,建立一个概念,方便后续的源码分析有迹可循。 架构 NSQ由三个守护进程组成: ...
开篇 DiskQueue 是 NSQ 中的持久化组件,承担着将消息存储到磁盘的任务,它以高效、可靠的方式在内存与磁盘之间进行切换。在开始分析 nsq 那几个大件之前,先深入分析下 DiskQueue 的核心设计和实现,解密其背后的关键技术点,包括写入机制、读取逻辑以及在异常场景下的恢复策略。 设计 nsq将diskqueue抽取出来作为一个库来维护:https://github.com/nsqio/go-diskqueue 里面只有一个源码文件diskqueue.go,里面核心结构体是基于磁盘的消息队列diskQueue,实现了nsqd中的BackendQueue接口,比较简单: type BackendQueue interface { Put([]byte) error ReadChan() <-chan []byte Close() error Delete() error Depth() int64 Empty() error } 消息以字节数组的方式传递,接收端需要对字节数组进行解码,存储端的实现就能相对简单。 diskqueue在创建时会运行一个叫做ioLoop的协程用于处理io相关的事务,并与主协程通过以下各个chan进行通信: type diskQueue struct { ... // 队列大小(即depth在nsq中表示的就是可读消息数) depthChan chan int64 // 写磁盘的数据 writeChan chan []byte // 写磁盘的结果 writeResponseChan chan error // 清空队列 emptyChan chan int // 清空队列的结果 emptyResponseChan chan error // 退出 exitChan chan int // 主协程等待ioLoop退出 exitSyncChan chan int } logrotation与metadata diskqueue还用了rotation的方式来限制单个文件的大小(拓展:在linux中有一个实现logrotation命令叫logrotate),这样便于在消息读出之后,通过直接删除文件的方式来删除不再需要的数据。为了实现logrotation,还会维护一些元数据: 队列大小(消息数) 正在读第几个文件中的第几个字节 正在写第几个文件中的第几个字节 代码中通过persistMetaData方法来原子性地持久化这些元数据。元数据非常重要,如果不是原子性地写入,比如写一半就宕机了,那么所有数据将会不可用。原子性是通过写临时文件、将临时文件重命名为最终文件名来保证的 写数据 使用接口方法Put写入: func (d *diskQueue) Put(data []byte) error { d.RLock() defer d.RUnlock() if d.exitFlag == 1 { return errors.New("exiting") } d.writeChan <- data return <-d.writeResponseChan } 由ioLoop协程处理数据的写入,调用writeOne进行处理: func (d *diskQueue) writeOne(data []byte) error { var err error dataLen := int32(len(data)) totalBytes := int64(4 + dataLen) if d.writePos > 0 && d.writePos+totalBytes > d.maxBytesPerFile { // 写入的大小将超出单个文件大小限制,触发logrotation,即写入下一个新文件 if d.readFileNum == d.writeFileNum { d.maxBytesPerFileRead = d.writePos } d.writeFileNum++ d.writePos = 0 // 每次logrotation前,都sync将最后一个文件以及元数据落盘 err = d.sync() if err != nil { d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err) } // 关闭序号为i的数据文件 if d.writeFile != nil { d.writeFile.Close() d.writeFile = nil } } // 创建序号为i+1的数据文件 if d.writeFile == nil { curFileName := d.fileName(d.writeFileNum) d.writeFile, err = os.OpenFile(curFileName, os.O_RDWR|os.O_CREATE, 0600) if err != nil { return err } d.logf(INFO, "DISKQUEUE(%s): writeOne() opened %s", d.name, curFileName) if d.writePos > 0 { _, err = d.writeFile.Seek(d.writePos, 0) if err != nil { d.writeFile.Close() d.writeFile = nil return err } } } // 数据大小写入缓存 d.writeBuf.Reset() err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen) if err != nil { return err } // 数据写入缓存 _, err = d.writeBuf.Write(data) if err != nil { return err } // 缓存写入文件 _, err = d.writeFile.Write(d.writeBuf.Bytes()) if err != nil { d.writeFile.Close() d.writeFile = nil return err } d.writePos += totalBytes d.depth += 1 return err } 这里要注意在整个rotation的过程中并不保证原子性,而是分为三步: ...