kafka源码阅读(1)-日志段的读/写/恢复

Note 基于开源 kafka 2.5 版本 。 如无特殊说明,文中代码片段将删除 debug 信息、异常的代码 path 等代码,以便观看核心代码。 ...

四月 19, 2025 · by NOSAE

nsq阅读(1)——概述

Note 基于nsq v1.3.0 简介 NSQ是类似kafka、rabbitmq那样的消息队列系统,关于他怎么高性能,怎么好上手这些都不必多说,都是吹逼。这篇主要介绍一下nsq的整个大致架构,建立一个概念,方便后续的源码分析有迹可循。 架构 NSQ由三个守护进程组成: ...

十一月 23, 2024 · by NOSAE

nsq阅读(2)——diskqueue

开篇 DiskQueue 是 NSQ 中的持久化组件,承担着将消息存储到磁盘的任务,它以高效、可靠的方式在内存与磁盘之间进行切换。在开始分析 nsq 那几个大件之前,先深入分析下 DiskQueue 的核心设计和实现,解密其背后的关键技术点,包括写入机制、读取逻辑以及在异常场景下的恢复策略。 设计 nsq将diskqueue抽取出来作为一个库来维护:https://github.com/nsqio/go-diskqueue 里面只有一个源码文件diskqueue.go,里面核心结构体是基于磁盘的消息队列diskQueue,实现了nsqd中的BackendQueue接口,比较简单: type BackendQueue interface { Put([]byte) error ReadChan() <-chan []byte Close() error Delete() error Depth() int64 Empty() error } 消息以字节数组的方式传递,接收端需要对字节数组进行解码,存储端的实现就能相对简单。 diskqueue在创建时会运行一个叫做ioLoop的协程用于处理io相关的事务,并与主协程通过以下各个chan进行通信: type diskQueue struct { ... // 队列大小(即depth在nsq中表示的就是可读消息数) depthChan chan int64 // 写磁盘的数据 writeChan chan []byte // 写磁盘的结果 writeResponseChan chan error // 清空队列 emptyChan chan int // 清空队列的结果 emptyResponseChan chan error // 退出 exitChan chan int // 主协程等待ioLoop退出 exitSyncChan chan int } logrotation与metadata diskqueue还用了rotation的方式来限制单个文件的大小(拓展:在linux中有一个实现logrotation命令叫logrotate),这样便于在消息读出之后,通过直接删除文件的方式来删除不再需要的数据。为了实现logrotation,还会维护一些元数据: 队列大小(消息数) 正在读第几个文件中的第几个字节 正在写第几个文件中的第几个字节 代码中通过persistMetaData方法来原子性地持久化这些元数据。元数据非常重要,如果不是原子性地写入,比如写一半就宕机了,那么所有数据将会不可用。原子性是通过写临时文件、将临时文件重命名为最终文件名来保证的 写数据 使用接口方法Put写入: func (d *diskQueue) Put(data []byte) error { d.RLock() defer d.RUnlock() if d.exitFlag == 1 { return errors.New("exiting") } d.writeChan <- data return <-d.writeResponseChan } 由ioLoop协程处理数据的写入,调用writeOne进行处理: func (d *diskQueue) writeOne(data []byte) error { var err error dataLen := int32(len(data)) totalBytes := int64(4 + dataLen) if d.writePos > 0 && d.writePos+totalBytes > d.maxBytesPerFile { // 写入的大小将超出单个文件大小限制,触发logrotation,即写入下一个新文件 if d.readFileNum == d.writeFileNum { d.maxBytesPerFileRead = d.writePos } d.writeFileNum++ d.writePos = 0 // 每次logrotation前,都sync将最后一个文件以及元数据落盘 err = d.sync() if err != nil { d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err) } // 关闭序号为i的数据文件 if d.writeFile != nil { d.writeFile.Close() d.writeFile = nil } } // 创建序号为i+1的数据文件 if d.writeFile == nil { curFileName := d.fileName(d.writeFileNum) d.writeFile, err = os.OpenFile(curFileName, os.O_RDWR|os.O_CREATE, 0600) if err != nil { return err } d.logf(INFO, "DISKQUEUE(%s): writeOne() opened %s", d.name, curFileName) if d.writePos > 0 { _, err = d.writeFile.Seek(d.writePos, 0) if err != nil { d.writeFile.Close() d.writeFile = nil return err } } } // 数据大小写入缓存 d.writeBuf.Reset() err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen) if err != nil { return err } // 数据写入缓存 _, err = d.writeBuf.Write(data) if err != nil { return err } // 缓存写入文件 _, err = d.writeFile.Write(d.writeBuf.Bytes()) if err != nil { d.writeFile.Close() d.writeFile = nil return err } d.writePos += totalBytes d.depth += 1 return err } 这里要注意在整个rotation的过程中并不保证原子性,而是分为三步: ...

十一月 23, 2024 · by NOSAE

nsq阅读(3)——nsqd

Note 基于nsq v1.3.0 执行流程 在第一篇中,主要对nsq中涉及到的主要组件,以及数据流在这些组件中的流动进行了简单说明。在本篇中,我们跟随上一篇最后给出的demo,对应到nsqd的代码中,看一下执行流程是怎么样的。 上篇最后我们向4151端口发起http请求来发送消息: curl -d "test message" "http://127.0.0.1:4151/pub?topic=test_topic" 这个调用的是http接口/pub,对应nsqd源码http.go中的doPUB函数: ...

十一月 23, 2024 · by NOSAE

gRPC阅读(4)——负载均衡

负载均衡算法 常见的负载均衡算法如下: RoundRobin(轮询) Weight-RoundRobin(加权轮询) 不同的后端服务器可能机器的配置和当前系统的负载并不相同,因此它们的抗压能力也不相同。给配置高、负载低的机器配置更高的权重,而配置低、负载高的机器,给其分配较低的权重,降低其系统负载,加权轮询能很好地处理这一问题,并将请求顺序且按照权重分配到后端。 Random(随机) Weight-Random(加权随机) 通过系统的随机算法,根据后端服务器的列表随机选取其中的一台服务器进行访问 源地址哈希法 源地址哈希的思想是根据获取客户端的 IP 地址,通过哈希函数计算得到的一个数值,用该数值对服务器列表的大小进行取模运算,得到的结果便是客服端要访问服务器的序号。采用源地址哈希法进行负载均衡,同一 IP 地址的客户端,当后端服务器列表不变时,它每次都会映射到同一台后端服务器进行访问 最小连接数法 最小连接数算法比较灵活和智能,由于后端服务器的配置不尽相同,对于请求的处理有快有慢,它是根据后端服务器当前的连接情况,动态地选取其中当前积压连接数最少的一台服务器来处理当前的请求,尽可能地提高后端服务的利用效率,将负责合理地分流到每一台服务器 Consistent-Hash(一致性哈希算法) 常见的是 Ketama 算法(虚拟节点),该算法是用来解决 cache 失效导致的缓存穿透的问题的,当然也可以适用于 gRPC 长连接的场景 自适应算法(P2C:多选二,二选一):即从可用节点列表中随机选择两个节点,计算它们的负载率,选择负载率较低的进行请求 基于最小负载策略:该策略是 linkerd 的默认负载均衡器。当确定发送请求的位置时,linkerd 随机从负载均衡器池中选择两个副本,并选择两者中最少负载的副本。负载由每个副本的未完成请求数决定。该算法为单个副本提供了可管理的负载上限,与具有相似性能的其他算法相比,开销较少 峰值 EWMA(预测)策略:该算法是上述的变体,同样在发送请求时仍然在两个副本之间进行选择。不一样的是,该算法需要保持观察到的延迟的动态平均值,并且使用它来对每个副本的未完成请求的数量进行加权。这种方法对延迟波动更敏感,并通过向较慢的后端发送更少的请求来允许他们恢复时间(可以通过调参来改变对请求延时的敏感度) gRPC负载均衡 接着上篇,通过DNS进行服务发现获取服务器IP list之后,调用ccResolverWrapper.UpdateState更新状态: // UpdateState is called by resolver implementations to report new state to gRPC // which includes addresses and service config. func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error { ... return ccr.cc.updateResolverStateAndUnlock(s, nil) } func (cc *ClientConn) updateResolverStateAndUnlock(s resolver.State, err error) error { ... var ret error // 应用最新的从dns发现的ServiceConfig if cc.dopts.disableServiceConfig { channelz.Infof(logger, cc.channelz, "ignoring service config from resolver (%v) and applying the default because service config is disabled", s.ServiceConfig) cc.maybeApplyDefaultServiceConfig() } else if s.ServiceConfig == nil { cc.maybeApplyDefaultServiceConfig() } else { if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok { configSelector := iresolver.GetConfigSelector(s) if configSelector != nil { if len(s.ServiceConfig.Config.(*ServiceConfig).Methods) != 0 { channelz.Infof(logger, cc.channelz, "method configs in service config will be ignored due to presence of config selector") } } else { configSelector = &defaultConfigSelector{sc} } cc.applyServiceConfigAndBalancer(sc, configSelector) } else { ... } } // ServiceConfig的负载均衡配置 var balCfg serviceconfig.LoadBalancingConfig if cc.sc != nil && cc.sc.lbConfig != nil { balCfg = cc.sc.lbConfig } // 负载均衡器 bw := cc.balancerWrapper cc.mu.Unlock() // 应用服务发现结果 uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg}) if ret == nil { ret = uccsErr // prefer ErrBadResolver state since any other error is // currently meaningless to the caller. } return ret } // ccBalancerWrapper解耦ClientConn和Balancer,在updateClientConnState调用时才会创建Balancer // 并且ccBalancerWrapper使用gracefulswitch.Balancer支持Balancer的优雅切换 func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error { errCh := make(chan error) uccs := func(ctx context.Context) { defer close(errCh) if ctx.Err() != nil || ccb.balancer == nil { return } name := gracefulswitch.ChildName(ccs.BalancerConfig) if ccb.curBalancerName != name { ccb.curBalancerName = name channelz.Infof(logger, ccb.cc.channelz, "Channel switches to new LB policy %q", name) } // 通知gracefulBalancer状态更新 err := ccb.balancer.UpdateClientConnState(*ccs) if logger.V(2) && err != nil { logger.Infof("error from balancer.UpdateClientConnState: %v", err) } errCh <- err } onFailure := func() { close(errCh) } ccb.serializer.ScheduleOr(uccs, onFailure) return <-errCh } // gsb更新状态 func (gsb *Balancer) UpdateClientConnState(state balancer.ClientConnState) error { // 获取最新的balancer balToUpdate := gsb.latestBalancer() gsbCfg, ok := state.BalancerConfig.(*lbConfig) if ok { // Switch to the child in the config unless it is already active. if balToUpdate == nil || gsbCfg.childBuilder.Name() != balToUpdate.builder.Name() { var err error // 切换到新的balancer balToUpdate, err = gsb.switchTo(gsbCfg.childBuilder) if err != nil { return fmt.Errorf("could not switch to new child balancer: %w", err) } } // Unwrap the child balancer's config. state.BalancerConfig = gsbCfg.childConfig } if balToUpdate == nil { return errBalancerClosed } // 通知真正的Balancer状态更新 return balToUpdate.UpdateClientConnState(state) } // gsb优雅切换balancer func (gsb *Balancer) switchTo(builder balancer.Builder) (*balancerWrapper, error) { gsb.mu.Lock() if gsb.closed { gsb.mu.Unlock() return nil, errBalancerClosed } bw := &balancerWrapper{ builder: builder, gsb: gsb, lastState: balancer.State{ ConnectivityState: connectivity.Connecting, Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable), }, subconns: make(map[balancer.SubConn]bool), } balToClose := gsb.balancerPending // nil if there is no pending balancer if gsb.balancerCurrent == nil { gsb.balancerCurrent = bw } else { gsb.balancerPending = bw } gsb.mu.Unlock() // 关闭旧的balancer balToClose.Close() // 创建新的balancer newBalancer := builder.Build(bw, gsb.bOpts) if newBalancer == nil { gsb.mu.Lock() if gsb.balancerPending != nil { gsb.balancerPending = nil } else { gsb.balancerCurrent = nil } gsb.mu.Unlock() return nil, balancer.ErrBadResolverState } bw.Balancer = newBalancer return bw, nil } // pick_first balancer状态更新 func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState) error { ... cfg, ok := state.BalancerConfig.(pfConfig) ... // 展开endpoints/addrs var addrs []resolver.Address ... // balancer之前维护了子连接 // 如果连接的地址在addrs中,则保持连接 // 否则断开并用addrs重连 if b.subConn != nil { b.cc.UpdateAddresses(b.subConn, addrs) return nil } // balancer之前没有维护子连接 // 创建新的子连接 var subConn balancer.SubConn subConn, err := b.cc.NewSubConn(addrs, balancer.NewSubConnOptions{ StateListener: func(state balancer.SubConnState) { // 子连接状态更新,回调通知balancer b.updateSubConnState(subConn, state) }, }) ... b.subConn = subConn b.state = connectivity.Idle // 通知cc现在是Connecting状态 b.cc.UpdateState(balancer.State{ ConnectivityState: connectivity.Connecting, Picker: &picker{err: balancer.ErrNoSubConnAvailable}, }) b.subConn.Connect() return nil } 可以看到在pick_first策略下,会将所有地址都试一遍,连上其中一个后,后续picker返回的都是那个连接,相当于不做负载均衡: ...

十一月 20, 2024 · by NOSAE