BY Y!an - June 9, 2026

Evolving TinyMQ with Exchange: When a Message Needs to Be Consumed by Multiple Consumers

Decoupling!

Evolving TinyMQ with Exchange: When a Message Needs to Be Consumed by Multiple Consumers

More than six months ago, I wrote “Implementing a Message Queue in 300 Lines of Code” to build a basic message queue that handles message events asynchronously. However, as business requirements evolve, simple asynchronous processing is no longer sufficient: a single message now needs to be consumed by multiple different consumers.

Under the original design of our message queue, to meet this requirement, we would have to either register multiple queues and have the producer publish to all of them, or spawn multiple goroutines within the consumer to process the message in different ways. However, both approaches are tightly coupled and rather inelegant.

An elegant solution is to introduce a middleware layer responsible for forwarding messages. The producer only needs to publish messages to this middleware layer, which then routes them to different queues (consumers). We call this middleware layer an Exchange.

Queue Adjustments

Before introducing the Exchange, we need to refactor the Queue.

Previously, we defined a type Q struct to bundle the Queue, ConsumerFunc, concurrency level, and message sequence number together. Now, we can directly define the consumer function and consumer concurrency as attributes of the Queue, and delegate message sequence number management to the Exchange. As a result, Q is no longer necessary, and we can refactor Queue as follows:

type Queue struct {
	wg sync.WaitGroup

	in  chan *Msg
	out chan *Msg

	msgs []*Msg

	// Consumer function
	consumer ConsumerFunc
	// Consumer concurrency
	concurrency int
}

// Update the constructor function accordingly
func NewQueue(inBufSize, outBufSize, concurrency int, consumer ConsumerFunc) *Queue {
	if inBufSize <= 0 {
		inBufSize = 1
	}
	if outBufSize <= 0 {
		outBufSize = 1
	}

	return &Queue{
		in:          make(chan *Msg, inBufSize),
		out:         make(chan *Msg, outBufSize),
		msgs:        make([]*Msg, 0, 1024),
		consumer:    consumer,
		concurrency: concurrency,
	}
}

Then, we add two new methods to facilitate consumer scheduling:

func (q *Queue) GetConsumer() ConsumerFunc {
	return q.consumer
}

func (q *Queue) GetConcurrency() int {
	return q.concurrency
}

With this, the refactoring of Queue is complete.

Exchange

The Exchange should implement the following interface:

type ExchangeInterface interface {
	Publish(any) error   // Publish a message
	Bind(*Queue)         // Bind a queue
	GetQueues() []*Queue // Get all queues (for MQ scheduling)
	Start()              // Start message routing for the Exchange
	Close()              // Close the Exchange
}

The concrete implementation is as follows:

import (
	"sync"
)

type Exchange struct {
	mu sync.Mutex

	// List of bound queues
	queues []*Queue
	// Current message sequence number
	seq uint64
}

func NewExchange() Exchange {
	return &Exchange{}
}

func (e *Exchange) Publish(data any) error {
	e.mu.Lock()
	defer e.mu.Unlock()

	if len(e.queues) == 0 {
		return nil // No bound queues, discard the message directly
	}

	msg := &Msg{
		ID:   e.seq,
		Data: data,
	}
	e.seq++

	for _, q := range e.queues {
		cloned := *msg
		q.Enqueue(&cloned) // TODO: Add error handling logic as needed (e.g., logging failed messages)
	}

	return nil
}

func (e *Exchange) Bind(q *Queue) {
	e.mu.Lock()
	defer e.mu.Unlock()

	e.queues = append(e.queues, q)
}

func (e *Exchange) Type() ExchangeType {
	return Fanout
}

func (e *Exchange) GetQueues() []*Queue {
	return e.queues
}

func (e *Exchange) Start() {
	for _, q := range e.queues {
		q.Start()
	}
}

func (e *Exchange) Close() {
	for _, q := range e.queues {
		q.Close()
	}
}

MQ Adjustments

The primary adjustment for the MQ is that, with Q deprecated, we now route messages to exchanges instead of scheduling queues directly:

 type MQ struct {
 	wg sync.WaitGroup
 	mu sync.Mutex
 
 	running bool
 
-	queues map[string]*Q
+	exchanges map[string]*Exchange
 
 	log contract.Logger
 }

 func NewMQ(log contract.Logger) *MQ {
 	return &MQ{
-		queues: make(map[string]*Q),
-		log:    log,
+		exchanges: make(map[string]Exchange),
+		log:       log,
 	}
 }

Next, modify the queue registration method:

// Register a queue, automatically creating the corresponding Exchange
func (m *MQ) RegisterQueue(name string, typ ExchangeType, consumer ConsumerFunc, concurrency int) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if m.running {
		return errors.New("mq: cannot register queue while MQ is running")
	}

	if _, exists := m.exchanges[name]; !exists {
		m.exchanges[name] = NewExchange()
	}

	m.exchanges[name].Bind(NewQueue(concurrency, concurrency, concurrency, consumer))
	return nil
}

The message publishing method also needs to have its sequence number logic removed, and be adjusted to publish messages through the Exchange instead:

func (m *MQ) Publish(queueName string, data any) error {
	m.mu.Lock()
	e, exists := m.exchanges[queueName]
	m.mu.Unlock()

	if !exists {
		return errors.New("mq: queue not found")
	}

	return e.Publish(data)
}

Accordingly, the methods for starting the message queue and consumer scheduling need to be updated:

func (m *MQ) Start(ctx context.Context) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if m.running {
		return errors.New("mq: MQ is already running")
	}

	m.running = true

	for exchangeName, exchange := range m.exchanges {
		exchange.Start()
		for idx, queue := range exchange.GetQueues() {
			for workerID := 0; workerID < queue.GetConcurrency(); workerID++ {
				m.wg.Add(1)
				go m.consume(ctx, exchangeName, workerID, queue)
			}
			m.log.Infof("Started Exchange[%s].queue[%d] with concurrency %d", exchangeName, idx, queue.GetConcurrency())
		}
	}
	return nil
}

func (m *MQ) consume(ctx context.Context, queueName string, workerID int, q *Queue) {
	defer m.wg.Done()

	for {
		defer func() {
			if r := recover(); r != nil {
				m.log.Errorf("Recovered from panic in queue[%s] worker[%d]: %v", queueName, workerID, r)
			}
		}()

		select {
		case <-ctx.Done():
			m.log.Infof("Context canceled. Stopping consumer for queue[%s] worker[%d]", queueName, workerID)
			q.Close()

		case msg, ok := <-q.Dequeue():
			if !ok {
				// Queue closed, exit
				m.log.Infof("Queue[%s] worker[%d] is stopping as the queue is closed", queueName, workerID)
				return
			}
			if err := q.GetConsumer()(ctx, msg); err != nil {
				// Failed to process message, log it or handle otherwise
				m.log.Errorf("Failed to process message ID[%d] from queue[%s] worker[%d]: %v", msg.ID, queueName, workerID, err)
			}
		}
	}
}

Finally, adjust the method for stopping the MQ accordingly:

func (m *MQ) Stop() {
	m.mu.Lock()

	if !m.running {
		m.mu.Unlock()
		return
	}

	m.running = false

	for _, e := range m.exchanges {
		e.Close()
	}
	m.mu.Unlock()

	m.wg.Wait()
}

With this, our message queue now supports consuming the same message across multiple different consumers. As you might have noticed, this design is very similar to RabbitMQ’s FanoutExchange 😉



If you found this article helpful, you can buy me a coffee ↓