Note

基于nsq v1.3.0

执行流程

在第一篇中,主要对nsq中涉及到的主要组件,以及数据流在这些组件中的流动进行了简单说明。在本篇中,我们跟随上一篇最后给出的demo,对应到nsqd的代码中,看一下执行流程是怎么样的。

上篇最后我们向4151端口发起http请求来发送消息:

curl -d "test message" "http://127.0.0.1:4151/pub?topic=test_topic"

这个调用的是http接口/pub,对应nsqd源码http.go中的doPUB函数:

  1. 调用nsqd.GetTopic获取/创建topic
  2. 调用NewMesssage创建message
  3. 调用topic.PutMessage将消息投递到topic中

创建topic的时候会启动topic的messagePump协程,负责处理topic的相关事件,包括:

  • 处理内存/磁盘消息
  • 监听channel数量变更
  • 暂停topic
  • 优雅退出
func (t *Topic) messagePump() {
	var msg *Message
	var buf []byte
	var err error
	var chans []*Channel
	var memoryMsgChan chan *Message
	var backendChan <-chan []byte

    ...

	for {
		select {
		case msg = <-memoryMsgChan:
            // 获取到内存队列的消息
		case buf = <-backendChan:
            // 获取到磁盘队列的消息,需要解码
			msg, err = decodeMessage(buf)
            ...
		case <-t.channelUpdateChan:
            // channel数量发生变化
			chans = chans[:0]
			t.RLock()
			for _, c := range t.channelMap {
				chans = append(chans, c)
			}
			t.RUnlock()
			if len(chans) == 0 || t.IsPaused() {
				memoryMsgChan = nil
				backendChan = nil
			} else {
				memoryMsgChan = t.memoryMsgChan
				backendChan = t.backend.ReadChan()
			}
			continue
		case <-t.pauseChan:
            // 暂停channel
			if len(chans) == 0 || t.IsPaused() {
				memoryMsgChan = nil
				backendChan = nil
			} else {
				memoryMsgChan = t.memoryMsgChan
				backendChan = t.backend.ReadChan()
			}
			continue
		case <-t.exitChan:
            // 优雅退出
			goto exit
		}

        // 将消息投递到每个channel
		for i, channel := range chans {
            // 第一个channel直接用msg
            // 其它channel拷贝msg
			chanMsg := msg
			if i > 0 {
				chanMsg = NewMessage(msg.ID, msg.Body)
				chanMsg.Timestamp = msg.Timestamp
				chanMsg.deferred = msg.deferred
			}
            // 投递延时消息,用优先队列维护
			if chanMsg.deferred != 0 {
				channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
				continue
			}
            // 投递普通消息,放到channel的内存队列或者磁盘队列
			err := channel.PutMessage(chanMsg)
			if err != nil {
				t.nsqd.logf(LOG_ERROR,
					"TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
					t.name, msg.ID, channel.name, err)
			}
		}
	}

exit:
	t.nsqd.logf(LOG_INFO, "TOPIC(%s): closing ... messagePump", t.name)
}

以上是topic的所有处理逻辑,外界通过信号,其实最主要的只有「将消息投递到所有绑定的channel」这一步。

而channel的创建时机有三个:

  1. 程序启动时的NSQD.LoadMetadata,读取本地配置文件配置,创建channel
  2. 通过http接口/channel/create直接创建channel
  3. client通过发起SUB请求订阅topic、channel的时候创建channel

我们最常见的是client通过SUB订阅topic来创建channel,client即连接到nsqd的消费者:

func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) {
	...

	topicName := string(params[1])
	channelName := string(params[2])

	...

	var channel *Channel
	for i := 1; ; i++ {
        // 获取topic,如果不存在就会创建
		topic := p.nsqd.GetTopic(topicName)
        // 获取channel,如果不存在就会创建
		channel = topic.GetChannel(channelName)
        // 将client加入channel中
		if err := channel.AddClient(client.ID, client); err != nil {
			return nil, protocol.NewFatalClientErr(err, "E_SUB_FAILED", "SUB failed "+err.Error())
		}

		...
		break
	}
    
	atomic.StoreInt32(&client.State, stateSubscribed)
	client.Channel = channel
    // 通知client订阅了channel
	client.SubEventChan <- channel

	return okBytes, nil
}

将client加入channel后,channel中的消息就会负载均衡到所有订阅该channel的client。

SUB只是将topic、channel、client三者关系做了绑定,最后一行将这个channel发送到了client的messagePump当中,client的messagePump是在IOLoop中启动的,messagePump负责处理client相关的各种事件,包括:

  1. 消息发送控制
  2. 消息缓冲管理
  3. 心跳维护
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
	var err error
	var memoryMsgChan chan *Message
	var backendMsgChan <-chan []byte
	var subChannel *Channel
	var flusherChan <-chan time.Time
	var sampleRate int32

    // client发送 SUB 命令订阅某个主题时,新的 Channel 会通过这个通道传递
	subEventChan := client.SubEventChan
    // client发送 IDENTIFY 命令协商client的配置的时候,携带的数据会通过这个通道传递
	identifyEventChan := client.IdentifyEventChan
    // 控制缓冲刷新的最小时间间隔
    // 1. 批量发送优化: 将多个消息缓存起来一起发送,减少系统调用
    // 2. 强制刷新控制: 如果消息量很小,确保消息不会在缓冲区停留太久
	outputBufferTicker := time.NewTicker(client.OutputBufferTimeout)
    // 心跳间隔,用于探测client的存活状态
	heartbeatTicker := time.NewTicker(client.HeartbeatInterval)
	heartbeatChan := heartbeatTicker.C
    // 控制消息处理超时
    // 1. 限制客户端处理单个消息的最大时间,当消息处理超时时,NSQ 会重新投递该消息
    // 2. 确保每个消息都得到正确处理或重试
	msgTimeout := client.MsgTimeout

	// 是否已经冲刷缓冲区,等下次有数据的时候再置为false,然后ticker到时间后会触发刷新
	flushed := true

	// 通知IOLoop,messagePump已初始化完毕
	close(startedChan)

	for {
		if subChannel == nil || !client.IsReadyForMessages() {
			// the client is not ready to receive messages...
			memoryMsgChan = nil
			backendMsgChan = nil
			flusherChan = nil
			// force flush
			client.writeLock.Lock()
			err = client.Flush()
			client.writeLock.Unlock()
			if err != nil {
				goto exit
			}
			flushed = true
		} else if flushed {
			// 上次已冲刷,不需要监听 flusherChan
			memoryMsgChan = subChannel.memoryMsgChan
			backendMsgChan = subChannel.backend.ReadChan()
			flusherChan = nil
		} else {
			// 未冲刷,需要监听 flusherChan
			memoryMsgChan = subChannel.memoryMsgChan
			backendMsgChan = subChannel.backend.ReadChan()
			flusherChan = outputBufferTicker.C
		}

		select {
		case <-flusherChan:
			// 冲刷缓冲区
			client.writeLock.Lock()
			err = client.Flush()
			client.writeLock.Unlock()
			if err != nil {
				goto exit
			}
			flushed = true
		case <-client.ReadyStateChan:
            // 
		case subChannel = <-subEventChan:
            // 只能订阅一次
			subEventChan = nil
		case identifyData := <-identifyEventChan:
            // 只能协商一次
			identifyEventChan = nil

			outputBufferTicker.Stop()
			if identifyData.OutputBufferTimeout > 0 {
				outputBufferTicker = time.NewTicker(identifyData.OutputBufferTimeout)
			}

			heartbeatTicker.Stop()
			heartbeatChan = nil
			if identifyData.HeartbeatInterval > 0 {
				heartbeatTicker = time.NewTicker(identifyData.HeartbeatInterval)
				heartbeatChan = heartbeatTicker.C
			}

			if identifyData.SampleRate > 0 {
				sampleRate = identifyData.SampleRate
			}

			msgTimeout = identifyData.MsgTimeout
		case <-heartbeatChan:
			err = p.Send(client, frameTypeResponse, heartbeatBytes)
			if err != nil {
				goto exit
			}
		case b := <-backendMsgChan:
			if sampleRate > 0 && rand.Int31n(100) > sampleRate {
				continue
			}

			msg, err := decodeMessage(b)
			if err != nil {
				p.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
				continue
			}
			msg.Attempts++

			subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
			client.SendingMessage()
			err = p.SendMessage(client, msg)
			if err != nil {
				goto exit
			}
			flushed = false
		case msg := <-memoryMsgChan:
			if sampleRate > 0 && rand.Int31n(100) > sampleRate {
				continue
			}
			msg.Attempts++

			subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
			client.SendingMessage()
			err = p.SendMessage(client, msg)
			if err != nil {
				goto exit
			}
			flushed = false
		case <-client.ExitChan:
			goto exit
		}
	}

exit:
	p.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting messagePump", client)
	heartbeatTicker.Stop()
	outputBufferTicker.Stop()
	if err != nil {
		p.nsqd.logf(LOG_ERROR, "PROTOCOL(V2): [%s] messagePump error - %s", client, err)
	}
}

程序入口main

这篇进入nsqd程序的源码分析,废话不多说直接看到apps/nsqd/main.go这个程序入口文件。这里用了go-svc库,其实就是对系统信号通知进行了封装,定义回调接口的方式将代码规范化。

// 实现了svc.Service接口
type program struct {
	once sync.Once
	nsqd *nsqd.NSQD
}

func main() {
	prg := &program{}
    // 运行
	if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
		logFatal("%s", err)
	}
}

Run方法很简单:

func Run(service Service, sig ...os.Signal) error {
	env := environment{}
    // 回调Service.Init方法
	if err := service.Init(env); err != nil {
		return err
	}

    // 回调service.Start方法
	if err := service.Start(); err != nil {
		return err
	}

	if len(sig) == 0 {
		sig = []os.Signal{syscall.SIGINT, syscall.SIGTERM}
	}

    // 监听信号
	signalChan := make(chan os.Signal, 1)
	signalNotify(signalChan, sig...)

	var ctx context.Context
	if s, ok := service.(Context); ok {
		ctx = s.Context()
	} else {
		ctx = context.Background()
	}

	for {
		select {
		case s := <-signalChan:
            // 信号发生
			if h, ok := service.(Handler); ok {
                // 如果还实现了处理信号的接口,回调Service.Handle
				if err := h.Handle(s); err == ErrStop {
					goto stop
				}
			} else {
                // 否则默认为退出
				goto stop
			}
		case <-ctx.Done():
			goto stop
		}
	}

stop:
	return service.Stop()
}

所以在Start函数中应该再起一个协程去处理主事务,而主协程用来监听信号的发生,并且做优雅退出之类的处理。整个main函数就干了三件事,比较简单就不再贴代码:

  1. 解析命令行参数并初始化nsqd
  2. 加载本地的元数据并启动nsqd
  3. 出错或停止时调用nsqd.Exit优雅退出

初始化nsqd.New

主要做了如下工作:

  1. 初始化了一些配置参数
  2. 初始化与nsqlookupd通信的client,以在运行时感知集群的元数据
  3. 监听4150 tcp端口
  4. 监听4151 http端口

启动nsqd.Main

上面的“启动nsqd”就是运行nsqd.Main方法:

func (n *NSQD) Main() error {
	exitCh := make(chan error)
	var once sync.Once
	exitFunc := func(err error) {
		once.Do(func() {
            // 通知退出Main方法
			if err != nil {
				n.logf(LOG_FATAL, "%s", err)
			}
			exitCh <- err
		})
	}

    // 启动TPCServer
	n.waitGroup.Wrap(func() {
		exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf))
	})
	if n.httpListener != nil {
        // 启动httpServer
		httpServer := newHTTPServer(n, false, n.getOpts().TLSRequired == TLSRequired)
		n.waitGroup.Wrap(func() {
			exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf))
		})
	}
    
    // https
    ...

    // 
	n.waitGroup.Wrap(n.queueScanLoop)
    // 
	n.waitGroup.Wrap(n.lookupLoop)
    // 
	if n.getOpts().StatsdAddress != "" {
		n.waitGroup.Wrap(n.statsdLoop)
	}

	err := <-exitCh
	return err
}

TPC和HTTP都同时监听,不过干的都是一样的事情,前者是基于TCP,使用自己定义的包格式来传输,后者则是基于HTTP包格式进行传输。默认分别监听4150和4151端口。

协议场景支持的命令示例
TCP持续连接、高频通信、大规模消息流处理PUB, SUB, RDY, FIN
HTTP临时操作、调试、运维工具、REST API 调用/pub, /stats, /ping

TCP

使用无限循环与每个客户端建立连接,开启协程单独处理每个连接:

func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error {
	var wg sync.WaitGroup

	for {
        // 建立连接
		clientConn, err := listener.Accept()
		...

		wg.Add(1)
        // 开启协程处理连接
		go func() {
			handler.Handle(clientConn)
			wg.Done()
		}()
	}

	wg.Wait()
    ...
}

每个连接建立的时候,开始的4字节为协议约定好的魔数“ V2”,如果不是这个魔数则返回协议错误

func (p *tcpServer) Handle(conn net.Conn) {
    // 读取魔数,作为协议的序言
	buf := make([]byte, 4)
	_, err := io.ReadFull(conn, buf)
    
	protocolMagic := string(buf)

	var prot protocol.Protocol
	switch protocolMagic {
	case "  V2":
		prot = &protocolV2{nsqd: p.nsqd}
	default:
        // 魔数不是预期值,直接返回协议错误
		...
	}

	client := prot.NewClient(conn)
	p.conns.Store(conn.RemoteAddr(), client)

    // IOLoop循环读取并处理客户端的请求
	err = prot.IOLoop(client)
	if err != nil {
		p.nsqd.logf(LOG_ERROR, "client(%s) - %s", conn.RemoteAddr(), err)
	}

	p.conns.Delete(conn.RemoteAddr())
	client.Close()
}

确定是这个协议,再接着进入IOLoop读取客户端的请求并执行请求:

func (p *protocolV2) IOLoop(c protocol.Client) error {
	var err error
	var line []byte
	var zeroTime time.Time

	client := c.(*clientV2)

	messagePumpStartedChan := make(chan bool)
	go p.messagePump(client, messagePumpStartedChan)
	<-messagePumpStartedChan

	for {
        // 读取一行客户端请求
		line, err = client.Reader.ReadSlice('\n')
		...
        
		params := bytes.Split(line, separatorBytes)

		var response []byte
        // 执行请求
		response, err = p.Exec(client, params)
        ...
        
		if response != nil {
            // 返回结果给客户端
			err = p.Send(client, frameTypeResponse, response)
            ...
		}
	}

	p.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting ioloop", client)
	close(client.ExitChan)
	if client.Channel != nil {
		client.Channel.RemoveClient(client.ID)
	}

	return err
}

HTTP

HTTP使用了httprouter这个库来进行路由

func newHTTPServer(nsqd *NSQD, tlsEnabled bool, tlsRequired bool) *httpServer {
	log := http_api.Log(nsqd.logf)

	router := httprouter.New()
	router.HandleMethodNotAllowed = true
	router.PanicHandler = http_api.LogPanicHandler(nsqd.logf)
	router.NotFound = http_api.LogNotFoundHandler(nsqd.logf)
	router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(nsqd.logf)
	s := &httpServer{
		nsqd:        nsqd,
		tlsEnabled:  tlsEnabled,
		tlsRequired: tlsRequired,
		router:      router,
	}

	router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText))
	router.Handle("GET", "/info", http_api.Decorate(s.doInfo, log, http_api.V1))

	// v1 negotiate
	router.Handle("POST", "/pub", http_api.Decorate(s.doPUB, http_api.V1))
	...

	// only v1
	router.Handle("POST", "/topic/create", http_api.Decorate(s.doCreateTopic, log, http_api.V1))
	...

	// debug
	router.HandlerFunc("GET", "/debug/pprof/", pprof.Index)
	...

	return s
}

对于每个路由的Handler还加了一个http_api.Decorate:

type APIHandler func(http.ResponseWriter, *http.Request, httprouter.Params) (interface{}, error)
type Decorator func(APIHandler) APIHandler

func Decorate(f APIHandler, ds ...Decorator) httprouter.Handle {
	decorated := f
    // 组装成调用链
	for _, decorate := range ds {
		decorated = decorate(decorated)
	}
	return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
		decorated(w, req, ps)
	}
}

Decorate的作用是给Handler增加中间件,比如log中间件:

func Log(logf lg.AppLogFunc) Decorator {
	return func(f APIHandler) APIHandler {
		return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
            // 记录调用前的时间
			start := time.Now()
            // 调用
			response, err := f(w, req, ps)
            // 记录调用后的时间
			elapsed := time.Since(start)
			status := 200
			if e, ok := err.(Err); ok {
				status = e.Code
			}
            // 打印调用耗时
			logf(lg.INFO, "%d %s %s (%s) %s",
				status, req.Method, req.URL.RequestURI(), req.RemoteAddr, elapsed)
			return response, err
		}
	}
}

queueScanLoop

这个函数用时间最小堆来管理延时消息和等待确认的消息。这个函数管理了一个协程池,里面的协程叫做queueScanWorker,他们并发地处理所有channel。

因为channel可能会空闲而没有消息,那么这些worker就应该睡眠等待有消息再起来工作。nsq可能考虑到实时唤醒这些worker比较复杂,于是参考了redis的概率过期算法(probabilistic expiration algorithm),采用轮询的方式去处理到达的消息:每隔一段时间(默认100ms)唤醒worker让他们从本地缓存中随机选择一部分channels(默认20个),发现有消息到达的channel被标记为dirty并处理。每轮如果dirty的channel占总选择的channel的一定比例以上(默认25%),那么继续重复上面的操作,否则睡眠等待下一轮唤醒。

func (n *NSQD) queueScanLoop() {
	workCh := make(chan *Channel, n.getOpts().QueueScanSelectionCount)
	responseCh := make(chan bool, n.getOpts().QueueScanSelectionCount)
	closeCh := make(chan int)

	workTicker := time.NewTicker(n.getOpts().QueueScanInterval)    
	refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval)

	channels := n.channels()
    // 调整worker池大小
	n.resizePool(len(channels), workCh, responseCh, closeCh)

	for {
		select {
		case <-workTicker.C:
            // 每隔一段时间唤醒worker
			if len(channels) == 0 {
				continue
			}
		case <-refreshTicker.C:
            // 每隔一段时间刷新channel数量,以调整worker数量
			channels = n.channels()
			n.resizePool(len(channels), workCh, responseCh, closeCh)
			continue
		case <-n.exitChan:
            // nsqd退出
			goto exit
		}

		num := n.getOpts().QueueScanSelectionCount
		if num > len(channels) {
			num = len(channels)
		}

	loop:
        // 随机选取一部分的channels,将它们发送给workers进行处理
		for _, i := range util.UniqRands(num, len(channels)) {
			workCh <- channels[i]
		}

        // 统计标记为dirty的channel数
		numDirty := 0
		for i := 0; i < num; i++ {
			if <-responseCh {
				numDirty++
			}
		}

        // 如果dirty数到达一定量级,继续选取,不用睡眠
		if float64(numDirty)/float64(num) > n.getOpts().QueueScanDirtyPercent {
			goto loop
		}
	}

exit:
	...
}

选取一部分channel的时候是随机的,这里用了一个从n个元素中随机选择m个的算法技巧(m<=n):第 i 个元素与后面随机一个元素进行交换,不断重复上述步骤,就得到实现了这样的效果。

而每个worker的工作是从最小堆中获取消息,然后保存到内存或者磁盘队列中,等待消费:

func (c *Channel) put(m *Message) error {
	select {
    // 保存到内存
	case c.memoryMsgChan <- m:
	default:
    // 保存到磁盘
		err := writeMessageToBackend(m, c.backend)
		c.nsqd.SetHealth(err)
		if err != nil {
			c.nsqd.logf(LOG_ERROR, "CHANNEL(%s): failed to write message to backend - %s",
				c.name, err)
			return err
		}
	}
	return nil
}

消息接收

上一节分析了nsq怎么处理已经接收到的消息的:基于时间最小堆,将消息保存到内存或者磁盘中等待下一步的处理。我们再回过头来看一下nsq怎么接收消息的

在demo中,我们用http接口进行消息的发送:

curl -d "test message" "http://127.0.0.1:4151/pub?topic=test_topic"

对应的handler代码为:

func (s *httpServer) doPUB(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
	// TODO: one day I'd really like to just error on chunked requests
	// to be able to fail "too big" requests before we even read

	if req.ContentLength > s.nsqd.getOpts().MaxMsgSize {
		return nil, http_api.Err{413, "MSG_TOO_BIG"}
	}

	// add 1 so that it's greater than our max when we test for it
	// (LimitReader returns a "fake" EOF)
	readMax := s.nsqd.getOpts().MaxMsgSize + 1
	body, err := io.ReadAll(io.LimitReader(req.Body, readMax))
	if err != nil {
		return nil, http_api.Err{500, "INTERNAL_ERROR"}
	}
	if int64(len(body)) == readMax {
		return nil, http_api.Err{413, "MSG_TOO_BIG"}
	}
	if len(body) == 0 {
		return nil, http_api.Err{400, "MSG_EMPTY"}
	}

	reqParams, topic, err := s.getTopicFromQuery(req)
	if err != nil {
		return nil, err
	}

	var deferred time.Duration
	if ds, ok := reqParams["defer"]; ok {
		var di int64
		di, err = strconv.ParseInt(ds[0], 10, 64)
		if err != nil {
			return nil, http_api.Err{400, "INVALID_DEFER"}
		}
		deferred = time.Duration(di) * time.Millisecond
		if deferred < 0 || deferred > s.nsqd.getOpts().MaxReqTimeout {
			return nil, http_api.Err{400, "INVALID_DEFER"}
		}
	}

	msg := NewMessage(topic.GenerateID(), body)
	msg.deferred = deferred
	err = topic.PutMessage(msg)
	if err != nil {
		return nil, http_api.Err{503, "EXITING"}
	}

	return "OK", nil
}

。其实是在IOLoop中,开启的messagePump协程中接收消息。

func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
	var err error
	var memoryMsgChan chan *Message
	var backendMsgChan <-chan []byte
	var subChannel *Channel
	// NOTE: `flusherChan` is used to bound message latency for
	// the pathological case of a channel on a low volume topic
	// with >1 clients having >1 RDY counts
	var flusherChan <-chan time.Time
	var sampleRate int32

	subEventChan := client.SubEventChan
	identifyEventChan := client.IdentifyEventChan
	outputBufferTicker := time.NewTicker(client.OutputBufferTimeout)
	heartbeatTicker := time.NewTicker(client.HeartbeatInterval)
	heartbeatChan := heartbeatTicker.C
	msgTimeout := client.MsgTimeout

	// v2 opportunistically buffers data to clients to reduce write system calls
	// we force flush in two cases:
	//    1. when the client is not ready to receive messages
	//    2. we're buffered and the channel has nothing left to send us
	//       (ie. we would block in this loop anyway)
	//
	flushed := true

	// signal to the goroutine that started the messagePump
	// that we've started up
	close(startedChan)

	for {
		if subChannel == nil || !client.IsReadyForMessages() {
			// the client is not ready to receive messages...
			memoryMsgChan = nil
			backendMsgChan = nil
			flusherChan = nil
			// force flush
			client.writeLock.Lock()
			err = client.Flush()
			client.writeLock.Unlock()
			if err != nil {
				goto exit
			}
			flushed = true
		} else if flushed {
			// last iteration we flushed...
			// do not select on the flusher ticker channel
			memoryMsgChan = subChannel.memoryMsgChan
			backendMsgChan = subChannel.backend.ReadChan()
			flusherChan = nil
		} else {
			// we're buffered (if there isn't any more data we should flush)...
			// select on the flusher ticker channel, too
			memoryMsgChan = subChannel.memoryMsgChan
			backendMsgChan = subChannel.backend.ReadChan()
			flusherChan = outputBufferTicker.C
		}

		select {
		case <-flusherChan:
			// if this case wins, we're either starved
			// or we won the race between other channels...
			// in either case, force flush
			client.writeLock.Lock()
			err = client.Flush()
			client.writeLock.Unlock()
			if err != nil {
				goto exit
			}
			flushed = true
		case <-client.ReadyStateChan:
		case subChannel = <-subEventChan:
			// you can't SUB anymore
			subEventChan = nil
		case identifyData := <-identifyEventChan:
			// you can't IDENTIFY anymore
			identifyEventChan = nil

			outputBufferTicker.Stop()
			if identifyData.OutputBufferTimeout > 0 {
				outputBufferTicker = time.NewTicker(identifyData.OutputBufferTimeout)
			}

			heartbeatTicker.Stop()
			heartbeatChan = nil
			if identifyData.HeartbeatInterval > 0 {
				heartbeatTicker = time.NewTicker(identifyData.HeartbeatInterval)
				heartbeatChan = heartbeatTicker.C
			}

			if identifyData.SampleRate > 0 {
				sampleRate = identifyData.SampleRate
			}

			msgTimeout = identifyData.MsgTimeout
		case <-heartbeatChan:
			err = p.Send(client, frameTypeResponse, heartbeatBytes)
			if err != nil {
				goto exit
			}
		case b := <-backendMsgChan:
			if sampleRate > 0 && rand.Int31n(100) > sampleRate {
				continue
			}

			msg, err := decodeMessage(b)
			if err != nil {
				p.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
				continue
			}
			msg.Attempts++

			subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
			client.SendingMessage()
			err = p.SendMessage(client, msg)
			if err != nil {
				goto exit
			}
			flushed = false
		case msg := <-memoryMsgChan:
			if sampleRate > 0 && rand.Int31n(100) > sampleRate {
				continue
			}
			msg.Attempts++

			subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
			client.SendingMessage()
			err = p.SendMessage(client, msg)
			if err != nil {
				goto exit
			}
			flushed = false
		case <-client.ExitChan:
			goto exit
		}
	}

exit:
	p.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting messagePump", client)
	heartbeatTicker.Stop()
	outputBufferTicker.Stop()
	if err != nil {
		p.nsqd.logf(LOG_ERROR, "PROTOCOL(V2): [%s] messagePump error - %s", client, err)
	}
}

lookupLoop