今天主要讲的是NSQ topic 的代码实现,topic作为MQ的重要概念,了解它的实现,对我们理解其他MQ的topic,有很多的益处。
主要代码文件:
1.nsqd/topic.go
topic结构体
type Topic struct {
// 64bit atomic vars need to be first for proper alignment on 32bit platforms
messageCount uint64 //消息累计条数 后期查看每个topic的状态时有用
messageBytes uint64 //消息累计字节数 后期查看每个topic的状态时有用
sync.RWMutex
name string //主题名称
channelMap map[string]*Channel //主题拥有的channel
backend BackendQueue //磁盘队列
memoryMsgChan chan *Message //内存队列
startChan chan int //topic 开始接收消息的chan
exitChan chan int //topic 结束的chann
channelUpdateChan chan int //channel 更新通知(停止,删除,新增)
waitGroup util.WaitGroupWrapper
exitFlag int32
idFactory *guidFactory //id 生成器
ephemeral bool
deleteCallback func(*Topic) //topic 删除的 回调函数
deleter sync.Once
paused int32
pauseChan chan int
ctx *context //包装了nsqd的context
}NewTopic 函数 主要做三件事,一是实例化topic, 二是开启messagePump goroutine 进行消息处理,三是通知 nsqd 有新的 topic创建,让 nsqd 上报 lookupd
func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic {
//实例化topic
t := &Topic{
...
}
// 创建内存队列
if ctx.nsqd.getOpts().MemQueueSize > 0 {
t.memoryMsgChan = make(chan *Message, ctx.nsqd.getOpts().MemQueueSize)
}
//ephemeral 有特殊的用途,暂时还不知道干啥?
if strings.HasSuffix(topicName, "#ephemeral") {
t.ephemeral = true
t.backend = newDummyBackendQueue()
} else {//创建磁盘队列
....
//go-diskqueue 的实例
t.backend = diskqueue.New(
....
)
}
//messagePump 主要作用是,发送msg给所有订阅了这个 topic 下的 channel。(channelMap)
t.waitGroup.Wrap(t.messagePump)
//通知 nsqd 有新的 topi c创建。
t.ctx.nsqd.Notify(t)
return t
}messagePump 函数,主要处理 topic/channel 的变动及发布消息给 channel
func (t *Topic) messagePump() {
var msg *Message //消息
var buf []byte //缓存
var err error
var chans []*Channel
var memoryMsgChan chan *Message
var backendChan chan []byte
for {
select {
case <-t.channelUpdateChan: //channel 变动通知
continue
case <-t.pauseChan: //topic 暂停
continue
case <-t.exitChan: //topic 退出
goto exit
case <-t.startChan: //topic 开始接收消息
}
break
}
t.RLock()
//收集订阅的channel
for _, c := range t.channelMap {
chans = append(chans, c)
}
t.RUnlock()
if len(chans) > 0 && !t.IsPaused() {
memoryMsgChan = t.memoryMsgChan
backendChan = t.backend.ReadChan()
}
// main message loop
for {
select {
case msg = <-memoryMsgChan: //内存队列
case buf = <-backendChan: //磁盘队列
msg, err = decodeMessage(buf) //解析成Message
...
case <-t.channelUpdateChan: //有变动通知,重新收集topic订阅的channel
...
continue
case <-t.pauseChan: //topic 停止通知 设置 memoryMsgChan、backendChan
....
continue
case <-t.exitChan:
goto exit
}
//将 msg 发送给所有订阅的 channel
for i, channel := range chans {
chanMsg := msg
if i > 0 {
chanMsg = NewMessage(msg.ID, msg.Body)
chanMsg.Timestamp = msg.Timestamp
chanMsg.deferred = msg.deferred
}
if chanMsg.deferred != 0 { //是否需要延时发送,如果是,先将消息放入延时队列中
channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
continue
}
//发送 msg 到 channel
err := channel.PutMessage(chanMsg)
...
}
}
exit:
t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): closing ... messagePump", t.name)
}PutMessage/PutMessages 函数都是将消息发送(put)到topic的队列(内存/磁盘)中,流程基本相同,都是要累计消息条数和累计消息的字节总数。
func (t *Topic) PutMessage(m *Message) error {
t.RLock()
defer t.RUnlock()
if atomic.LoadInt32(&t.exitFlag) == 1 {
return errors.New("exiting")
}
//发送消息
err := t.put(m)
if err != nil {
return err
}
atomic.AddUint64(&t.messageCount, 1) //累计消息条数
atomic.AddUint64(&t.messageBytes, uint64(len(m.Body))) //累计字节总数
return nil
}func (t *Topic) put(m *Message) error {
select {
case t.memoryMsgChan <- m: //内存队列
default: //内存不足
b := bufferPoolGet()
err := writeMessageToBackend(b, m, t.backend)
bufferPoolPut(b)
...
}
return nil
}
其他函数:
Delete/Close: Topic 退出结束
Pause/UnPause:Topic 暂停/重启
flush:将内存队列的消息,全部刷新到磁盘进行持久化(exit 操作的时候)
总结:
今天主要分析了Topic的代码实现,在这里主要需要关注的是 topic 如何接收消息(pub),又如何将消息发送给 channel(messagePump),最后需要关注的是什么时候将消息存储在内存,什么时候存储在磁盘。
下次分享:channel的代码实现
版权声明:本文为H_L_S原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。