Note基于nsq v1.3.0
执行流程
在第一篇中,主要对nsq中涉及到的主要组件,以及数据流在这些组件中的流动进行了简单说明。在本篇中,我们跟随上一篇最后给出的demo,对应到nsqd的代码中,看一下执行流程是怎么样的。
上篇最后我们向4151端口发起http请求来发送消息:
curl -d "test message" "http://127.0.0.1:4151/pub?topic=test_topic"
这个调用的是http接口/pub
,对应nsqd源码http.go
中的doPUB
函数:
- 调用
nsqd.GetTopic
获取/创建topic - 调用
NewMesssage
创建message - 调用
topic.PutMessage
将消息投递到topic中
创建topic的时候会启动topic的messagePump协程,负责处理topic的相关事件,包括:
- 处理内存/磁盘消息
- 监听channel数量变更
- 暂停topic
- 优雅退出
func (t *Topic) messagePump() {
var msg *Message
var buf []byte
var err error
var chans []*Channel
var memoryMsgChan chan *Message
var backendChan <-chan []byte
...
for {
select {
case msg = <-memoryMsgChan:
// 获取到内存队列的消息
case buf = <-backendChan:
// 获取到磁盘队列的消息,需要解码
msg, err = decodeMessage(buf)
...
case <-t.channelUpdateChan:
// channel数量发生变化
chans = chans[:0]
t.RLock()
for _, c := range t.channelMap {
chans = append(chans, c)
}
t.RUnlock()
if len(chans) == 0 || t.IsPaused() {
memoryMsgChan = nil
backendChan = nil
} else {
memoryMsgChan = t.memoryMsgChan
backendChan = t.backend.ReadChan()
}
continue
case <-t.pauseChan:
// 暂停channel
if len(chans) == 0 || t.IsPaused() {
memoryMsgChan = nil
backendChan = nil
} else {
memoryMsgChan = t.memoryMsgChan
backendChan = t.backend.ReadChan()
}
continue
case <-t.exitChan:
// 优雅退出
goto exit
}
// 将消息投递到每个channel
for i, channel := range chans {
// 第一个channel直接用msg
// 其它channel拷贝msg
chanMsg := msg
if i > 0 {
chanMsg = NewMessage(msg.ID, msg.Body)
chanMsg.Timestamp = msg.Timestamp
chanMsg.deferred = msg.deferred
}
// 投递延时消息,用优先队列维护
if chanMsg.deferred != 0 {
channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
continue
}
// 投递普通消息,放到channel的内存队列或者磁盘队列
err := channel.PutMessage(chanMsg)
if err != nil {
t.nsqd.logf(LOG_ERROR,
"TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
t.name, msg.ID, channel.name, err)
}
}
}
exit:
t.nsqd.logf(LOG_INFO, "TOPIC(%s): closing ... messagePump", t.name)
}
以上是topic的所有处理逻辑,外界通过信号,其实最主要的只有「将消息投递到所有绑定的channel」这一步。
而channel的创建时机有三个:
- 程序启动时的
NSQD.LoadMetadata
,读取本地配置文件配置,创建channel - 通过http接口
/channel/create
直接创建channel - client通过发起SUB请求订阅topic、channel的时候创建channel
我们最常见的是client通过SUB订阅topic来创建channel,client即连接到nsqd的消费者:
func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) {
...
topicName := string(params[1])
channelName := string(params[2])
...
var channel *Channel
for i := 1; ; i++ {
// 获取topic,如果不存在就会创建
topic := p.nsqd.GetTopic(topicName)
// 获取channel,如果不存在就会创建
channel = topic.GetChannel(channelName)
// 将client加入channel中
if err := channel.AddClient(client.ID, client); err != nil {
return nil, protocol.NewFatalClientErr(err, "E_SUB_FAILED", "SUB failed "+err.Error())
}
...
break
}
atomic.StoreInt32(&client.State, stateSubscribed)
client.Channel = channel
// 通知client订阅了channel
client.SubEventChan <- channel
return okBytes, nil
}
将client加入channel后,channel中的消息就会负载均衡到所有订阅该channel的client。
SUB只是将topic、channel、client三者关系做了绑定,最后一行将这个channel发送到了client的messagePump当中,client的messagePump是在IOLoop中启动的,messagePump负责处理client相关的各种事件,包括:
- 消息发送控制
- 消息缓冲管理
- 心跳维护
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
var err error
var memoryMsgChan chan *Message
var backendMsgChan <-chan []byte
var subChannel *Channel
var flusherChan <-chan time.Time
var sampleRate int32
// client发送 SUB 命令订阅某个主题时,新的 Channel 会通过这个通道传递
subEventChan := client.SubEventChan
// client发送 IDENTIFY 命令协商client的配置的时候,携带的数据会通过这个通道传递
identifyEventChan := client.IdentifyEventChan
// 控制缓冲刷新的最小时间间隔
// 1. 批量发送优化: 将多个消息缓存起来一起发送,减少系统调用
// 2. 强制刷新控制: 如果消息量很小,确保消息不会在缓冲区停留太久
outputBufferTicker := time.NewTicker(client.OutputBufferTimeout)
// 心跳间隔,用于探测client的存活状态
heartbeatTicker := time.NewTicker(client.HeartbeatInterval)
heartbeatChan := heartbeatTicker.C
// 控制消息处理超时
// 1. 限制客户端处理单个消息的最大时间,当消息处理超时时,NSQ 会重新投递该消息
// 2. 确保每个消息都得到正确处理或重试
msgTimeout := client.MsgTimeout
// 是否已经冲刷缓冲区,等下次有数据的时候再置为false,然后ticker到时间后会触发刷新
flushed := true
// 通知IOLoop,messagePump已初始化完毕
close(startedChan)
for {
if subChannel == nil || !client.IsReadyForMessages() {
// the client is not ready to receive messages...
memoryMsgChan = nil
backendMsgChan = nil
flusherChan = nil
// force flush
client.writeLock.Lock()
err = client.Flush()
client.writeLock.Unlock()
if err != nil {
goto exit
}
flushed = true
} else if flushed {
// 上次已冲刷,不需要监听 flusherChan
memoryMsgChan = subChannel.memoryMsgChan
backendMsgChan = subChannel.backend.ReadChan()
flusherChan = nil
} else {
// 未冲刷,需要监听 flusherChan
memoryMsgChan = subChannel.memoryMsgChan
backendMsgChan = subChannel.backend.ReadChan()
flusherChan = outputBufferTicker.C
}
select {
case <-flusherChan:
// 冲刷缓冲区
client.writeLock.Lock()
err = client.Flush()
client.writeLock.Unlock()
if err != nil {
goto exit
}
flushed = true
case <-client.ReadyStateChan:
//
case subChannel = <-subEventChan:
// 只能订阅一次
subEventChan = nil
case identifyData := <-identifyEventChan:
// 只能协商一次
identifyEventChan = nil
outputBufferTicker.Stop()
if identifyData.OutputBufferTimeout > 0 {
outputBufferTicker = time.NewTicker(identifyData.OutputBufferTimeout)
}
heartbeatTicker.Stop()
heartbeatChan = nil
if identifyData.HeartbeatInterval > 0 {
heartbeatTicker = time.NewTicker(identifyData.HeartbeatInterval)
heartbeatChan = heartbeatTicker.C
}
if identifyData.SampleRate > 0 {
sampleRate = identifyData.SampleRate
}
msgTimeout = identifyData.MsgTimeout
case <-heartbeatChan:
err = p.Send(client, frameTypeResponse, heartbeatBytes)
if err != nil {
goto exit
}
case b := <-backendMsgChan:
if sampleRate > 0 && rand.Int31n(100) > sampleRate {
continue
}
msg, err := decodeMessage(b)
if err != nil {
p.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
continue
}
msg.Attempts++
subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
client.SendingMessage()
err = p.SendMessage(client, msg)
if err != nil {
goto exit
}
flushed = false
case msg := <-memoryMsgChan:
if sampleRate > 0 && rand.Int31n(100) > sampleRate {
continue
}
msg.Attempts++
subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
client.SendingMessage()
err = p.SendMessage(client, msg)
if err != nil {
goto exit
}
flushed = false
case <-client.ExitChan:
goto exit
}
}
exit:
p.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting messagePump", client)
heartbeatTicker.Stop()
outputBufferTicker.Stop()
if err != nil {
p.nsqd.logf(LOG_ERROR, "PROTOCOL(V2): [%s] messagePump error - %s", client, err)
}
}
程序入口main
这篇进入nsqd程序的源码分析,废话不多说直接看到apps/nsqd/main.go这个程序入口文件。这里用了go-svc库,其实就是对系统信号通知进行了封装,定义回调接口的方式将代码规范化。
// 实现了svc.Service接口
type program struct {
once sync.Once
nsqd *nsqd.NSQD
}
func main() {
prg := &program{}
// 运行
if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
logFatal("%s", err)
}
}
Run方法很简单:
func Run(service Service, sig ...os.Signal) error {
env := environment{}
// 回调Service.Init方法
if err := service.Init(env); err != nil {
return err
}
// 回调service.Start方法
if err := service.Start(); err != nil {
return err
}
if len(sig) == 0 {
sig = []os.Signal{syscall.SIGINT, syscall.SIGTERM}
}
// 监听信号
signalChan := make(chan os.Signal, 1)
signalNotify(signalChan, sig...)
var ctx context.Context
if s, ok := service.(Context); ok {
ctx = s.Context()
} else {
ctx = context.Background()
}
for {
select {
case s := <-signalChan:
// 信号发生
if h, ok := service.(Handler); ok {
// 如果还实现了处理信号的接口,回调Service.Handle
if err := h.Handle(s); err == ErrStop {
goto stop
}
} else {
// 否则默认为退出
goto stop
}
case <-ctx.Done():
goto stop
}
}
stop:
return service.Stop()
}
所以在Start函数中应该再起一个协程去处理主事务,而主协程用来监听信号的发生,并且做优雅退出之类的处理。整个main函数就干了三件事,比较简单就不再贴代码:
- 解析命令行参数并初始化nsqd
- 加载本地的元数据并启动nsqd
- 出错或停止时调用nsqd.Exit优雅退出
初始化nsqd.New
主要做了如下工作:
- 初始化了一些配置参数
- 初始化与nsqlookupd通信的client,以在运行时感知集群的元数据
- 监听4150 tcp端口
- 监听4151 http端口
启动nsqd.Main
上面的“启动nsqd”就是运行nsqd.Main方法:
func (n *NSQD) Main() error {
exitCh := make(chan error)
var once sync.Once
exitFunc := func(err error) {
once.Do(func() {
// 通知退出Main方法
if err != nil {
n.logf(LOG_FATAL, "%s", err)
}
exitCh <- err
})
}
// 启动TPCServer
n.waitGroup.Wrap(func() {
exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf))
})
if n.httpListener != nil {
// 启动httpServer
httpServer := newHTTPServer(n, false, n.getOpts().TLSRequired == TLSRequired)
n.waitGroup.Wrap(func() {
exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf))
})
}
// https
...
//
n.waitGroup.Wrap(n.queueScanLoop)
//
n.waitGroup.Wrap(n.lookupLoop)
//
if n.getOpts().StatsdAddress != "" {
n.waitGroup.Wrap(n.statsdLoop)
}
err := <-exitCh
return err
}
TPC和HTTP都同时监听,不过干的都是一样的事情,前者是基于TCP,使用自己定义的包格式来传输,后者则是基于HTTP包格式进行传输。默认分别监听4150和4151端口。
协议 | 场景 | 支持的命令示例 |
---|---|---|
TCP | 持续连接、高频通信、大规模消息流处理 | PUB, SUB, RDY, FIN |
HTTP | 临时操作、调试、运维工具、REST API 调用 | /pub, /stats, /ping |
TCP
使用无限循环与每个客户端建立连接,开启协程单独处理每个连接:
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error {
var wg sync.WaitGroup
for {
// 建立连接
clientConn, err := listener.Accept()
...
wg.Add(1)
// 开启协程处理连接
go func() {
handler.Handle(clientConn)
wg.Done()
}()
}
wg.Wait()
...
}
每个连接建立的时候,开始的4字节为协议约定好的魔数“ V2”,如果不是这个魔数则返回协议错误
func (p *tcpServer) Handle(conn net.Conn) {
// 读取魔数,作为协议的序言
buf := make([]byte, 4)
_, err := io.ReadFull(conn, buf)
protocolMagic := string(buf)
var prot protocol.Protocol
switch protocolMagic {
case " V2":
prot = &protocolV2{nsqd: p.nsqd}
default:
// 魔数不是预期值,直接返回协议错误
...
}
client := prot.NewClient(conn)
p.conns.Store(conn.RemoteAddr(), client)
// IOLoop循环读取并处理客户端的请求
err = prot.IOLoop(client)
if err != nil {
p.nsqd.logf(LOG_ERROR, "client(%s) - %s", conn.RemoteAddr(), err)
}
p.conns.Delete(conn.RemoteAddr())
client.Close()
}
确定是这个协议,再接着进入IOLoop读取客户端的请求并执行请求:
func (p *protocolV2) IOLoop(c protocol.Client) error {
var err error
var line []byte
var zeroTime time.Time
client := c.(*clientV2)
messagePumpStartedChan := make(chan bool)
go p.messagePump(client, messagePumpStartedChan)
<-messagePumpStartedChan
for {
// 读取一行客户端请求
line, err = client.Reader.ReadSlice('\n')
...
params := bytes.Split(line, separatorBytes)
var response []byte
// 执行请求
response, err = p.Exec(client, params)
...
if response != nil {
// 返回结果给客户端
err = p.Send(client, frameTypeResponse, response)
...
}
}
p.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting ioloop", client)
close(client.ExitChan)
if client.Channel != nil {
client.Channel.RemoveClient(client.ID)
}
return err
}
HTTP
HTTP使用了httprouter这个库来进行路由
func newHTTPServer(nsqd *NSQD, tlsEnabled bool, tlsRequired bool) *httpServer {
log := http_api.Log(nsqd.logf)
router := httprouter.New()
router.HandleMethodNotAllowed = true
router.PanicHandler = http_api.LogPanicHandler(nsqd.logf)
router.NotFound = http_api.LogNotFoundHandler(nsqd.logf)
router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(nsqd.logf)
s := &httpServer{
nsqd: nsqd,
tlsEnabled: tlsEnabled,
tlsRequired: tlsRequired,
router: router,
}
router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText))
router.Handle("GET", "/info", http_api.Decorate(s.doInfo, log, http_api.V1))
// v1 negotiate
router.Handle("POST", "/pub", http_api.Decorate(s.doPUB, http_api.V1))
...
// only v1
router.Handle("POST", "/topic/create", http_api.Decorate(s.doCreateTopic, log, http_api.V1))
...
// debug
router.HandlerFunc("GET", "/debug/pprof/", pprof.Index)
...
return s
}
对于每个路由的Handler还加了一个http_api.Decorate:
type APIHandler func(http.ResponseWriter, *http.Request, httprouter.Params) (interface{}, error)
type Decorator func(APIHandler) APIHandler
func Decorate(f APIHandler, ds ...Decorator) httprouter.Handle {
decorated := f
// 组装成调用链
for _, decorate := range ds {
decorated = decorate(decorated)
}
return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
decorated(w, req, ps)
}
}
Decorate的作用是给Handler增加中间件,比如log中间件:
func Log(logf lg.AppLogFunc) Decorator {
return func(f APIHandler) APIHandler {
return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
// 记录调用前的时间
start := time.Now()
// 调用
response, err := f(w, req, ps)
// 记录调用后的时间
elapsed := time.Since(start)
status := 200
if e, ok := err.(Err); ok {
status = e.Code
}
// 打印调用耗时
logf(lg.INFO, "%d %s %s (%s) %s",
status, req.Method, req.URL.RequestURI(), req.RemoteAddr, elapsed)
return response, err
}
}
}
queueScanLoop
这个函数用时间最小堆来管理延时消息和等待确认的消息。这个函数管理了一个协程池,里面的协程叫做queueScanWorker
,他们并发地处理所有channel。
因为channel可能会空闲而没有消息,那么这些worker就应该睡眠等待有消息再起来工作。nsq可能考虑到实时唤醒这些worker比较复杂,于是参考了redis的概率过期算法(probabilistic expiration algorithm),采用轮询的方式去处理到达的消息:每隔一段时间(默认100ms)唤醒worker让他们从本地缓存中随机选择一部分channels(默认20个),发现有消息到达的channel被标记为dirty并处理。每轮如果dirty的channel占总选择的channel的一定比例以上(默认25%),那么继续重复上面的操作,否则睡眠等待下一轮唤醒。
func (n *NSQD) queueScanLoop() {
workCh := make(chan *Channel, n.getOpts().QueueScanSelectionCount)
responseCh := make(chan bool, n.getOpts().QueueScanSelectionCount)
closeCh := make(chan int)
workTicker := time.NewTicker(n.getOpts().QueueScanInterval)
refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval)
channels := n.channels()
// 调整worker池大小
n.resizePool(len(channels), workCh, responseCh, closeCh)
for {
select {
case <-workTicker.C:
// 每隔一段时间唤醒worker
if len(channels) == 0 {
continue
}
case <-refreshTicker.C:
// 每隔一段时间刷新channel数量,以调整worker数量
channels = n.channels()
n.resizePool(len(channels), workCh, responseCh, closeCh)
continue
case <-n.exitChan:
// nsqd退出
goto exit
}
num := n.getOpts().QueueScanSelectionCount
if num > len(channels) {
num = len(channels)
}
loop:
// 随机选取一部分的channels,将它们发送给workers进行处理
for _, i := range util.UniqRands(num, len(channels)) {
workCh <- channels[i]
}
// 统计标记为dirty的channel数
numDirty := 0
for i := 0; i < num; i++ {
if <-responseCh {
numDirty++
}
}
// 如果dirty数到达一定量级,继续选取,不用睡眠
if float64(numDirty)/float64(num) > n.getOpts().QueueScanDirtyPercent {
goto loop
}
}
exit:
...
}
选取一部分channel的时候是随机的,这里用了一个从n个元素中随机选择m个的算法技巧(m<=n):第 i 个元素与后面随机一个元素进行交换,不断重复上述步骤,就得到实现了这样的效果。
而每个worker的工作是从最小堆中获取消息,然后保存到内存或者磁盘队列中,等待消费:
func (c *Channel) put(m *Message) error {
select {
// 保存到内存
case c.memoryMsgChan <- m:
default:
// 保存到磁盘
err := writeMessageToBackend(m, c.backend)
c.nsqd.SetHealth(err)
if err != nil {
c.nsqd.logf(LOG_ERROR, "CHANNEL(%s): failed to write message to backend - %s",
c.name, err)
return err
}
}
return nil
}
消息接收
上一节分析了nsq怎么处理已经接收到的消息的:基于时间最小堆,将消息保存到内存或者磁盘中等待下一步的处理。我们再回过头来看一下nsq怎么接收消息的
在demo中,我们用http接口进行消息的发送:
curl -d "test message" "http://127.0.0.1:4151/pub?topic=test_topic"
对应的handler代码为:
func (s *httpServer) doPUB(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
// TODO: one day I'd really like to just error on chunked requests
// to be able to fail "too big" requests before we even read
if req.ContentLength > s.nsqd.getOpts().MaxMsgSize {
return nil, http_api.Err{413, "MSG_TOO_BIG"}
}
// add 1 so that it's greater than our max when we test for it
// (LimitReader returns a "fake" EOF)
readMax := s.nsqd.getOpts().MaxMsgSize + 1
body, err := io.ReadAll(io.LimitReader(req.Body, readMax))
if err != nil {
return nil, http_api.Err{500, "INTERNAL_ERROR"}
}
if int64(len(body)) == readMax {
return nil, http_api.Err{413, "MSG_TOO_BIG"}
}
if len(body) == 0 {
return nil, http_api.Err{400, "MSG_EMPTY"}
}
reqParams, topic, err := s.getTopicFromQuery(req)
if err != nil {
return nil, err
}
var deferred time.Duration
if ds, ok := reqParams["defer"]; ok {
var di int64
di, err = strconv.ParseInt(ds[0], 10, 64)
if err != nil {
return nil, http_api.Err{400, "INVALID_DEFER"}
}
deferred = time.Duration(di) * time.Millisecond
if deferred < 0 || deferred > s.nsqd.getOpts().MaxReqTimeout {
return nil, http_api.Err{400, "INVALID_DEFER"}
}
}
msg := NewMessage(topic.GenerateID(), body)
msg.deferred = deferred
err = topic.PutMessage(msg)
if err != nil {
return nil, http_api.Err{503, "EXITING"}
}
return "OK", nil
}
。其实是在IOLoop中,开启的messagePump协程中接收消息。
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
var err error
var memoryMsgChan chan *Message
var backendMsgChan <-chan []byte
var subChannel *Channel
// NOTE: `flusherChan` is used to bound message latency for
// the pathological case of a channel on a low volume topic
// with >1 clients having >1 RDY counts
var flusherChan <-chan time.Time
var sampleRate int32
subEventChan := client.SubEventChan
identifyEventChan := client.IdentifyEventChan
outputBufferTicker := time.NewTicker(client.OutputBufferTimeout)
heartbeatTicker := time.NewTicker(client.HeartbeatInterval)
heartbeatChan := heartbeatTicker.C
msgTimeout := client.MsgTimeout
// v2 opportunistically buffers data to clients to reduce write system calls
// we force flush in two cases:
// 1. when the client is not ready to receive messages
// 2. we're buffered and the channel has nothing left to send us
// (ie. we would block in this loop anyway)
//
flushed := true
// signal to the goroutine that started the messagePump
// that we've started up
close(startedChan)
for {
if subChannel == nil || !client.IsReadyForMessages() {
// the client is not ready to receive messages...
memoryMsgChan = nil
backendMsgChan = nil
flusherChan = nil
// force flush
client.writeLock.Lock()
err = client.Flush()
client.writeLock.Unlock()
if err != nil {
goto exit
}
flushed = true
} else if flushed {
// last iteration we flushed...
// do not select on the flusher ticker channel
memoryMsgChan = subChannel.memoryMsgChan
backendMsgChan = subChannel.backend.ReadChan()
flusherChan = nil
} else {
// we're buffered (if there isn't any more data we should flush)...
// select on the flusher ticker channel, too
memoryMsgChan = subChannel.memoryMsgChan
backendMsgChan = subChannel.backend.ReadChan()
flusherChan = outputBufferTicker.C
}
select {
case <-flusherChan:
// if this case wins, we're either starved
// or we won the race between other channels...
// in either case, force flush
client.writeLock.Lock()
err = client.Flush()
client.writeLock.Unlock()
if err != nil {
goto exit
}
flushed = true
case <-client.ReadyStateChan:
case subChannel = <-subEventChan:
// you can't SUB anymore
subEventChan = nil
case identifyData := <-identifyEventChan:
// you can't IDENTIFY anymore
identifyEventChan = nil
outputBufferTicker.Stop()
if identifyData.OutputBufferTimeout > 0 {
outputBufferTicker = time.NewTicker(identifyData.OutputBufferTimeout)
}
heartbeatTicker.Stop()
heartbeatChan = nil
if identifyData.HeartbeatInterval > 0 {
heartbeatTicker = time.NewTicker(identifyData.HeartbeatInterval)
heartbeatChan = heartbeatTicker.C
}
if identifyData.SampleRate > 0 {
sampleRate = identifyData.SampleRate
}
msgTimeout = identifyData.MsgTimeout
case <-heartbeatChan:
err = p.Send(client, frameTypeResponse, heartbeatBytes)
if err != nil {
goto exit
}
case b := <-backendMsgChan:
if sampleRate > 0 && rand.Int31n(100) > sampleRate {
continue
}
msg, err := decodeMessage(b)
if err != nil {
p.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
continue
}
msg.Attempts++
subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
client.SendingMessage()
err = p.SendMessage(client, msg)
if err != nil {
goto exit
}
flushed = false
case msg := <-memoryMsgChan:
if sampleRate > 0 && rand.Int31n(100) > sampleRate {
continue
}
msg.Attempts++
subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
client.SendingMessage()
err = p.SendMessage(client, msg)
if err != nil {
goto exit
}
flushed = false
case <-client.ExitChan:
goto exit
}
}
exit:
p.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting messagePump", client)
heartbeatTicker.Stop()
outputBufferTicker.Stop()
if err != nil {
p.nsqd.logf(LOG_ERROR, "PROTOCOL(V2): [%s] messagePump error - %s", client, err)
}
}