NSQ 源码分析之NSQD--Topic

今天主要讲的是NSQ topic 的代码实现,topic作为MQ的重要概念,了解它的实现,对我们理解其他MQ的topic,有很多的益处。

主要代码文件:

1.nsqd/topic.go

topic结构体

type Topic struct {
	// 64bit atomic vars need to be first for proper alignment on 32bit platforms
	messageCount uint64 //消息累计条数  后期查看每个topic的状态时有用
	messageBytes uint64 //消息累计字节数 后期查看每个topic的状态时有用

	sync.RWMutex

	name              string  //主题名称
	channelMap        map[string]*Channel  //主题拥有的channel
	backend           BackendQueue //磁盘队列
	memoryMsgChan     chan *Message //内存队列
	startChan         chan int  //topic 开始接收消息的chan
	exitChan          chan int //topic 结束的chann
	channelUpdateChan chan int //channel 更新通知(停止,删除,新增)
	waitGroup         util.WaitGroupWrapper
	exitFlag          int32
	idFactory         *guidFactory  //id 生成器

	ephemeral      bool
	deleteCallback func(*Topic) //topic 删除的 回调函数
	deleter        sync.Once

	paused    int32
	pauseChan chan int

	ctx *context //包装了nsqd的context
}

NewTopic 函数 主要做三件事,一是实例化topic, 二是开启messagePump goroutine 进行消息处理,三是通知 nsqd 有新的 topic创建,让 nsqd 上报 lookupd

func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic {
	//实例化topic
    t := &Topic{
        ...
	}
	// 创建内存队列
	if ctx.nsqd.getOpts().MemQueueSize > 0 {
		t.memoryMsgChan = make(chan *Message, ctx.nsqd.getOpts().MemQueueSize)
	}
    //ephemeral 有特殊的用途,暂时还不知道干啥?
	if strings.HasSuffix(topicName, "#ephemeral") {
		t.ephemeral = true
		t.backend = newDummyBackendQueue()
	} else {//创建磁盘队列
    	....
        //go-diskqueue 的实例
        t.backend = diskqueue.New(
            ....
		)
	}
	//messagePump 主要作用是,发送msg给所有订阅了这个 topic 下的 channel。(channelMap)
	t.waitGroup.Wrap(t.messagePump)
    //通知 nsqd 有新的 topi c创建。
	t.ctx.nsqd.Notify(t)

	return t
}

messagePump 函数,主要处理 topic/channel 的变动及发布消息给 channel

func (t *Topic) messagePump() {
	var msg *Message //消息
	var buf []byte //缓存
	var err error
	var chans []*Channel 
	var memoryMsgChan chan *Message
	var backendChan chan []byte
	for {
		select {
		case <-t.channelUpdateChan: //channel 变动通知
			continue
		case <-t.pauseChan: //topic 暂停
			continue
		case <-t.exitChan: //topic 退出
			goto exit
		case <-t.startChan: //topic 开始接收消息
		}
		break
	}
	t.RLock()
    //收集订阅的channel
	for _, c := range t.channelMap {
		chans = append(chans, c)
	}
	t.RUnlock()
	if len(chans) > 0 && !t.IsPaused() {
		memoryMsgChan = t.memoryMsgChan
		backendChan = t.backend.ReadChan()
	}

	// main message loop
	for {
		select {
		case msg = <-memoryMsgChan: //内存队列
		case buf = <-backendChan: //磁盘队列
			msg, err = decodeMessage(buf) //解析成Message
	        ...
		case <-t.channelUpdateChan: //有变动通知,重新收集topic订阅的channel
            ...
			continue
		case <-t.pauseChan: //topic 停止通知 设置 memoryMsgChan、backendChan  
	        ....
			continue
		case <-t.exitChan:
			goto exit
		}
        //将 msg 发送给所有订阅的 channel
		for i, channel := range chans {
			chanMsg := msg
			
			if i > 0 {
				chanMsg = NewMessage(msg.ID, msg.Body)
				chanMsg.Timestamp = msg.Timestamp
				chanMsg.deferred = msg.deferred
			}
			if chanMsg.deferred != 0 { //是否需要延时发送,如果是,先将消息放入延时队列中
				channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
				continue
			}
            //发送 msg 到 channel
			err := channel.PutMessage(chanMsg)
		    ...
		}
	}

exit:
	t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): closing ... messagePump", t.name)
}

PutMessage/PutMessages 函数都是将消息发送(put)到topic的队列(内存/磁盘)中,流程基本相同,都是要累计消息条数和累计消息的字节总数。

func (t *Topic) PutMessage(m *Message) error {
	t.RLock()
	defer t.RUnlock()
	if atomic.LoadInt32(&t.exitFlag) == 1 {
		return errors.New("exiting")
	}
    //发送消息
	err := t.put(m)
	if err != nil {
		return err
	}
	atomic.AddUint64(&t.messageCount, 1)  //累计消息条数
	atomic.AddUint64(&t.messageBytes, uint64(len(m.Body))) //累计字节总数
	return nil
}
func (t *Topic) put(m *Message) error {
	select {
	case t.memoryMsgChan <- m: //内存队列
	default: //内存不足
		b := bufferPoolGet()
		err := writeMessageToBackend(b, m, t.backend)
		bufferPoolPut(b)
	    ...
	}
	return nil
}

 

其他函数:

Delete/Close: Topic 退出结束

Pause/UnPause:Topic 暂停/重启

flush:将内存队列的消息,全部刷新到磁盘进行持久化(exit 操作的时候)

总结:

今天主要分析了Topic的代码实现,在这里主要需要关注的是 topic 如何接收消息(pub),又如何将消息发送给 channel(messagePump),最后需要关注的是什么时候将消息存储在内存,什么时候存储在磁盘。

下次分享:channel的代码实现


版权声明:本文为H_L_S原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。