nsq阅读(1)——概述
Note 基于nsq v1.3.0 简介 NSQ是类似kafka、rabbitmq那样的消息队列系统,关于他怎么高性能,怎么好上手这些都不必多说,都是吹逼。这篇主要介绍一下nsq的整个大致架构,建立一个概念,方便后续的源码分析有迹可循。 架构 NSQ由三个守护进程组成: ...
Note 基于nsq v1.3.0 简介 NSQ是类似kafka、rabbitmq那样的消息队列系统,关于他怎么高性能,怎么好上手这些都不必多说,都是吹逼。这篇主要介绍一下nsq的整个大致架构,建立一个概念,方便后续的源码分析有迹可循。 架构 NSQ由三个守护进程组成: ...
开篇 DiskQueue 是 NSQ 中的持久化组件,承担着将消息存储到磁盘的任务,它以高效、可靠的方式在内存与磁盘之间进行切换。在开始分析 nsq 那几个大件之前,先深入分析下 DiskQueue 的核心设计和实现,解密其背后的关键技术点,包括写入机制、读取逻辑以及在异常场景下的恢复策略。 设计 nsq将diskqueue抽取出来作为一个库来维护:https://github.com/nsqio/go-diskqueue 里面只有一个源码文件diskqueue.go,里面核心结构体是基于磁盘的消息队列diskQueue,实现了nsqd中的BackendQueue接口,比较简单: type BackendQueue interface { Put([]byte) error ReadChan() <-chan []byte Close() error Delete() error Depth() int64 Empty() error } 消息以字节数组的方式传递,接收端需要对字节数组进行解码,存储端的实现就能相对简单。 diskqueue在创建时会运行一个叫做ioLoop的协程用于处理io相关的事务,并与主协程通过以下各个chan进行通信: type diskQueue struct { ... // 队列大小(即depth在nsq中表示的就是可读消息数) depthChan chan int64 // 写磁盘的数据 writeChan chan []byte // 写磁盘的结果 writeResponseChan chan error // 清空队列 emptyChan chan int // 清空队列的结果 emptyResponseChan chan error // 退出 exitChan chan int // 主协程等待ioLoop退出 exitSyncChan chan int } logrotation与metadata diskqueue还用了rotation的方式来限制单个文件的大小(拓展:在linux中有一个实现logrotation命令叫logrotate),这样便于在消息读出之后,通过直接删除文件的方式来删除不再需要的数据。为了实现logrotation,还会维护一些元数据: 队列大小(消息数) 正在读第几个文件中的第几个字节 正在写第几个文件中的第几个字节 代码中通过persistMetaData方法来原子性地持久化这些元数据。元数据非常重要,如果不是原子性地写入,比如写一半就宕机了,那么所有数据将会不可用。原子性是通过写临时文件、将临时文件重命名为最终文件名来保证的 写数据 使用接口方法Put写入: func (d *diskQueue) Put(data []byte) error { d.RLock() defer d.RUnlock() if d.exitFlag == 1 { return errors.New("exiting") } d.writeChan <- data return <-d.writeResponseChan } 由ioLoop协程处理数据的写入,调用writeOne进行处理: func (d *diskQueue) writeOne(data []byte) error { var err error dataLen := int32(len(data)) totalBytes := int64(4 + dataLen) if d.writePos > 0 && d.writePos+totalBytes > d.maxBytesPerFile { // 写入的大小将超出单个文件大小限制,触发logrotation,即写入下一个新文件 if d.readFileNum == d.writeFileNum { d.maxBytesPerFileRead = d.writePos } d.writeFileNum++ d.writePos = 0 // 每次logrotation前,都sync将最后一个文件以及元数据落盘 err = d.sync() if err != nil { d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err) } // 关闭序号为i的数据文件 if d.writeFile != nil { d.writeFile.Close() d.writeFile = nil } } // 创建序号为i+1的数据文件 if d.writeFile == nil { curFileName := d.fileName(d.writeFileNum) d.writeFile, err = os.OpenFile(curFileName, os.O_RDWR|os.O_CREATE, 0600) if err != nil { return err } d.logf(INFO, "DISKQUEUE(%s): writeOne() opened %s", d.name, curFileName) if d.writePos > 0 { _, err = d.writeFile.Seek(d.writePos, 0) if err != nil { d.writeFile.Close() d.writeFile = nil return err } } } // 数据大小写入缓存 d.writeBuf.Reset() err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen) if err != nil { return err } // 数据写入缓存 _, err = d.writeBuf.Write(data) if err != nil { return err } // 缓存写入文件 _, err = d.writeFile.Write(d.writeBuf.Bytes()) if err != nil { d.writeFile.Close() d.writeFile = nil return err } d.writePos += totalBytes d.depth += 1 return err } 这里要注意在整个rotation的过程中并不保证原子性,而是分为三步: ...
Note 基于nsq v1.3.0 执行流程 在第一篇中,主要对nsq中涉及到的主要组件,以及数据流在这些组件中的流动进行了简单说明。在本篇中,我们跟随上一篇最后给出的demo,对应到nsqd的代码中,看一下执行流程是怎么样的。 上篇最后我们向4151端口发起http请求来发送消息: curl -d "test message" "http://127.0.0.1:4151/pub?topic=test_topic" 这个调用的是http接口/pub,对应nsqd源码http.go中的doPUB函数: ...