BY Y!an - 2026年6月9日

从零实现一个消息队列:当一条消息需要被多个不同消费者消费

解耦!

从零实现一个消息队列:当一条消息需要被多个不同消费者消费

半年多以前写过一篇《300 行代码实现一个消息队列》,实现了一个最简单的消息队列,能够把消息事件放到异步去处理。但随着业务的发展,简单的异步处理已经不能满足需求了:一条消息需要被多个不同的消费者消费。

按照原先的消息队列设计,要实现这个需求,要么是多注册几个队列,生产者往多个队列发消息;要么是在消费者那多开几个协程去分别处理。但这两种方案都高耦合,很不优雅。

优雅的方案应该是引入一个负责转发消息的中间层,生产者只需要将消息发给这个中间层,再由中间层转发给不同的队列(消费者),这个中间层我们称为 Exchange

队列 Queue 调整

在开始介绍 Exchange 之前,我们需要对 Queue 进行调整。

先前我们定义了一个 type Q struct,它的作用是将 QueueComsumerFunc、并发数和消息序号封装在一起,现在我们可以将消费者函数和消费者并发数直接作为 Queue 的属性,并将消息序号交由 Exchange 管理。因此 Q 没有存在的必要了,同时 Queue 修改为:

type Queue struct {
	wg sync.WaitGroup

	in  chan *Msg
	out chan *Msg

	msgs []*Msg

	// 消费者函数
	consumer ConsumerFunc
	// 消费者并发数
	concurrency int
}

// 同时更新实例化函数
func NewQueue(inBufSize, outBufSize, concurrency int, consumer ConsumerFunc) *Queue {
	if inBufSize <= 0 {
		inBufSize = 1
	}
	if outBufSize <= 0 {
		outBufSize = 1
	}

	return &Queue{
		in:          make(chan *Msg, inBufSize),
		out:         make(chan *Msg, outBufSize),
		msgs:        make([]*Msg, 0, 1024),
		consumer:    consumer,
		concurrency: concurrency,
	}
}

然后再新加两个方法,以便做消费者调度:

func (q *Queue) GetConsumer() ConsumerFunc {
	return q.consumer
}

func (q *Queue) GetConcurrency() int {
	return q.concurrency
}

至此,Queue 的改造就完成了。

Exchange

Exchange 应该实现以下接口:

type ExchangeInterface interface {
	Publish(any) error   // 发布消息
	Bind(*Queue)         // 注册队列
	GetQueues() []*Queue // 获取全部队列(供 MQ 调度)
	Start()              // 启动 Exchange 的消息转发
	Close()              // 关闭 Exchange
}

具体实现如下:

import (
	"sync"
)

type Exchange struct {
	mu sync.Mutex

	// 绑定的队列列表
	queues []*Queue
	// 当前消息序号
	seq uint64
}

func NewExchange() Exchange {
	return &Exchange{}
}

func (e *Exchange) Publish(data any) error {
	e.mu.Lock()
	defer e.mu.Unlock()

	if len(e.queues) == 0 {
		return nil // 没有绑定的队列,直接丢弃消息
	}

	msg := &Msg{
		ID:   e.seq,
		Data: data,
	}
	e.seq++

	for _, q := range e.queues {
		cloned := *msg
		q.Enqueue(&cloned) // TODO: 可按需增加错误处理逻辑,例如记录失败的消息等
	}

	return nil
}

func (e *Exchange) Bind(q *Queue) {
	e.mu.Lock()
	defer e.mu.Unlock()

	e.queues = append(e.queues, q)
}

func (e *Exchange) Type() ExchangeType {
	return Fanout
}

func (e *Exchange) GetQueues() []*Queue {
	return e.queues
}

func (e *Exchange) Start() {
	for _, q := range e.queues {
		q.Start()
	}
}

func (e *Exchange) Close() {
	for _, q := range e.queues {
		q.Close()
	}
}

MQ 调整

MQ 最主要的调整就是 Q 废弃后需要将直接调度 queues 改为 exchanges

 type MQ struct {
 	wg sync.WaitGroup
 	mu sync.Mutex
 
 	running bool
 
-	queues map[string]*Q
+	exchanges map[string]*Exchange
 
 	log contract.Logger
 }

 func NewMQ(log contract.Logger) *MQ {
 	return &MQ{
-		queues: make(map[string]*Q),
-		log:    log,
+		exchanges: make(map[string]Exchange),
+		log:       log,
 	}
 }

然后修改注册队列的方法:

// 注册一个队列,自动创建对应的 Exchange
func (m *MQ) RegisterQueue(name string, typ ExchangeType, consumer ConsumerFunc, concurrency int) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if m.running {
		return errors.New("mq: cannot register queue while MQ is running")
	}

	if _, exists := m.exchanges[name]; !exists {
		m.exchanges[name] = NewExchange()
	}

	m.exchanges[name].Bind(NewQueue(concurrency, concurrency, concurrency, consumer))
	return nil
}

发布消息的方法也需要移除消息序号的逻辑,同时调整为通过 Exchange 去发布消息:

func (m *MQ) Publish(queueName string, data any) error {
	m.mu.Lock()
	e, exists := m.exchanges[queueName]
	m.mu.Unlock()

	if !exists {
		return errors.New("mq: queue not found")
	}

	return e.Publish(data)
}

相应的,启动消息队列和消费调度的方法也需要调整:

func (m *MQ) Start(ctx context.Context) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if m.running {
		return errors.New("mq: MQ is already running")
	}

	m.running = true

	for exchangeName, exchange := range m.exchanges {
		exchange.Start()
		for idx, queue := range exchange.GetQueues() {
			for workerID := 0; workerID < queue.GetConcurrency(); workerID++ {
				m.wg.Add(1)
				go m.consume(ctx, exchangeName, workerID, queue)
			}
			m.log.Infof("Started Exchange[%s].queue[%d] with concurrency %d", exchangeName, idx, queue.GetConcurrency())
		}
	}
	return nil
}

func (m *MQ) consume(ctx context.Context, queueName string, workerID int, q *Queue) {
	defer m.wg.Done()

	for {
		defer func() {
			if r := recover(); r != nil {
				m.log.Errorf("Recovered from panic in queue[%s] worker[%d]: %v", queueName, workerID, r)
			}
		}()

		select {
		case <-ctx.Done():
			m.log.Infof("Context canceled. Stopping consumer for queue[%s] worker[%d]", queueName, workerID)
			q.Close()

		case msg, ok := <-q.Dequeue():
			if !ok {
				// 队列已关闭,退出
				m.log.Infof("Queue[%s] worker[%d] is stopping as the queue is closed", queueName, workerID)
				return
			}
			if err := q.GetConsumer()(ctx, msg); err != nil {
				// 处理消息失败,记录日志或进行其他处理
				m.log.Errorf("Failed to process message ID[%d] from queue[%s] worker[%d]: %v", msg.ID, queueName, workerID, err)
			}
		}
	}
}

最后将停止 MQ 的方法也做相应调整:

func (m *MQ) Stop() {
	m.mu.Lock()

	if !m.running {
		m.mu.Unlock()
		return
	}

	m.running = false

	for _, e := range m.exchanges {
		e.Close()
	}
	m.mu.Unlock()

	m.wg.Wait()
}

至此,我们的消息队列支持同一条消息给多个不同的消费者消费了。没错,细心的你可能发现了,现在这个队列跟 RabbitMQ 的 FanoutExchange 非常像😉



如果你觉得文章对你有些帮助,可以请我喝杯咖啡 ↓