Writing a simple Publisher-Subscriber in Golang using Channels

Ari Nurcahya
5 min read · Apr 26, 2024

Channels are one of the most tremendous features in Golang, and they are a handy tool for doing concurrent work. The most difficult part of implementing a concurrent job is handling and synchronizing communication between the concurrent processes. Channels solve that problem.

A channel works like a queue in Golang. With a channel you can synchronize communication between concurrent processes and wait until the channel has been filled with data.

the flow of the pub-sub

In a pub-sub system, the publisher publishes the message, and the consumer receives it.

package pubsub

import (
	"bytes"
	"encoding/json"
	"io"
	"strings"
)

type MessageHandler func() (io.Reader, error)

func SendJSON(data json.RawMessage) MessageHandler {
	return func() (io.Reader, error) {
		buff := bytes.Buffer{}

		err := json.NewEncoder(&buff).Encode(data)

		return &buff, err
	}
}

func SendString(data string) MessageHandler {
	return func() (io.Reader, error) {
		return strings.NewReader(data), nil
	}
}

Create the message handlers first. A message handler is used to send a message into the system; the example above contains handlers for sending JSON and string data.

After writing the message handlers, add the consumer:

package pubsub

import (
	"io"

	"github.com/google/uuid"
)

type ConsumerHandler func(id uuid.UUID, message io.Reader) error

type Consumer struct {
	handler ConsumerHandler
}

func newConsumer(handler ConsumerHandler) Consumer {
	return Consumer{
		handler: handler,
	}
}

func (s *Consumer) dispatcher(data chan MessageHandler) {
	for {
		message := <-data

		id := uuid.New()
		msg, err := message()
		if err != nil {
			continue
		}
		if err := s.handler(id, msg); err != nil {
			continue
		}
	}
}

In the code above, I created the constructor for the consumer object. It has one parameter: the handler that processes each message.

The consumer object has only one method, dispatcher. It consumes data from the publisher's channel and hands each message to the handler.

The dispatcher runs an infinite for-loop so that it keeps consuming messages. The receive operation `<-data` blocks while the channel is empty and resumes as soon as a message arrives.
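That blocking behavior is easy to see in isolation. This is a minimal sketch (the `consume` helper is my own name, not part of the article's code) where each receive waits until a message is available:

```go
package main

import "fmt"

// consume receives exactly n messages; each <-ch blocks while ch is empty,
// just like the receive at the top of the dispatcher loop.
func consume(ch chan string, n int) []string {
	out := make([]string, 0, n)
	for i := 0; i < n; i++ {
		out = append(out, <-ch)
	}
	return out
}

func main() {
	ch := make(chan string, 2)

	// fill the channel before the receiver drains it
	ch <- "first"
	ch <- "second"

	fmt.Println(consume(ch, 2)) // [first second]
}
```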



type PubsubOption struct {
	// maximum number of messages buffered in memory
	MaxMessage int
	// maximum number of workers per topic
	WorkerPool int
}

type PubsubFunc func(opt *PubsubOption)

func SetMaxMessage(maxMessage int) PubsubFunc {
	return func(opt *PubsubOption) {
		opt.MaxMessage = maxMessage
	}
}

func SetWorkerPool(wp int) PubsubFunc {
	return func(opt *PubsubOption) {
		if wp <= 0 {
			wp = 1
		}
		opt.WorkerPool = wp
	}
}

Then I wrote the configuration code. Here I'm using the functional options pattern, with two options.

MaxMessage determines the maximum number of messages held in the channel. It also determines whether the channel is unbuffered (capacity 0) or buffered.

WorkerPool determines the maximum number of workers consuming messages per topic.
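Applying the options looks like this. The `buildOptions` helper is hypothetical; it mirrors what the constructor below does with its defaults:

```go
package main

import "fmt"

type PubsubOption struct {
	MaxMessage int
	WorkerPool int
}

type PubsubFunc func(opt *PubsubOption)

func SetMaxMessage(maxMessage int) PubsubFunc {
	return func(opt *PubsubOption) { opt.MaxMessage = maxMessage }
}

func SetWorkerPool(wp int) PubsubFunc {
	return func(opt *PubsubOption) {
		if wp <= 0 {
			wp = 1
		}
		opt.WorkerPool = wp
	}
}

// buildOptions starts from defaults and lets each option override them.
func buildOptions(opts ...PubsubFunc) PubsubOption {
	opt := PubsubOption{WorkerPool: 5, MaxMessage: 100}
	for _, f := range opts {
		f(&opt)
	}
	return opt
}

func main() {
	opt := buildOptions(SetMaxMessage(10), SetWorkerPool(2))
	fmt.Println(opt.MaxMessage, opt.WorkerPool) // 10 2
}
```

The nice property of this pattern is that callers who pass no options still get sensible defaults, and new options can be added later without breaking existing call sites.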

type Pubsub struct {
	maxWorkerpool int
	maxMessage    int
	mu            sync.Mutex
	message       map[string]chan MessageHandler
	consumer      map[string]Consumer
}

func NewPubsub(opts ...PubsubFunc) *Pubsub {
	// default values
	qopt := PubsubOption{
		WorkerPool: 5,
		MaxMessage: 100,
	}

	for _, opt := range opts {
		opt(&qopt)
	}

	q := Pubsub{
		maxWorkerpool: qopt.WorkerPool,
		maxMessage:    qopt.MaxMessage,
		message:       make(map[string]chan MessageHandler),
		consumer:      make(map[string]Consumer),
	}

	return &q
}

The message field stores messages from the publisher and keeps them in channels. Messages are grouped by topic name, so each topic has its own channel.
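The topic-to-channel grouping is just a map keyed by topic name. A minimal sketch, using plain string payloads instead of MessageHandler for brevity:

```go
package main

import "fmt"

func main() {
	// each topic gets its own buffered channel
	topics := make(map[string]chan string)
	topics["orders"] = make(chan string, 10)
	topics["users"] = make(chan string, 10)

	// messages published to one topic never appear on another
	topics["orders"] <- "order created"
	topics["users"] <- "user registered"

	fmt.Println(<-topics["orders"]) // order created
	fmt.Println(<-topics["users"])  // user registered
}
```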

func (q *Pubsub) ConsumerRegister(topic string, handler ConsumerHandler) error {
	consumer := newConsumer(handler)
	q.mu.Lock()
	defer q.mu.Unlock()
	q.consumer[topic] = consumer
	if _, exist := q.message[topic]; !exist {
		q.message[topic] = make(chan MessageHandler, q.maxMessage)
	}

	return nil
}

The system requires consumer registration, so before publishing a message we need to register a consumer first. Registration has two responsibilities: creating the consumer and creating the topic's message channel.

The message channel is created with a capacity of maxMessage, so it's just ordinary buffered-channel creation in Go.
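The difference between buffered and unbuffered creation, which MaxMessage controls, can be demonstrated directly:

```go
package main

import "fmt"

func main() {
	// buffered: sends succeed without a receiver until capacity is reached
	buffered := make(chan int, 2)
	buffered <- 1
	buffered <- 2
	// buffered <- 3 would block here until a receiver drains a slot
	fmt.Println(len(buffered), cap(buffered)) // 2 2

	// unbuffered: every send waits for a receiver, so one must already exist
	unbuffered := make(chan int)
	go func() { unbuffered <- 42 }()
	fmt.Println(<-unbuffered) // 42
}
```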

func (q *Pubsub) Listen() error {
	for topic, consumer := range q.consumer {
		consumer := consumer // pin the loop variable for the goroutines (needed before Go 1.22)
		for i := 0; i < q.maxWorkerpool; i++ {
			go consumer.dispatcher(q.message[topic])
		}
	}

	return nil
}

After the system can register consumers, the next thing to do is create the listener. The listener loops over the registered consumers and, for each topic, starts up to maxWorkerpool goroutines running the consumer's dispatcher method, passing the topic's message channel as a parameter.
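The worker-pool pattern behind Listen, several goroutines all receiving from one shared channel, can be sketched on its own. The `runPool` helper here is hypothetical:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runPool starts `workers` goroutines that all receive from the same channel,
// analogous to Listen spawning maxWorkerpool dispatchers per topic.
func runPool(workers, jobs int) int64 {
	ch := make(chan int, jobs)
	var processed int64
	var wg sync.WaitGroup

	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for range ch {
				atomic.AddInt64(&processed, 1)
			}
		}()
	}

	for j := 0; j < jobs; j++ {
		ch <- j
	}
	// unlike the article's long-running dispatcher, we close the channel here
	// so the workers can exit and we can wait for them
	close(ch)
	wg.Wait()
	return processed
}

func main() {
	fmt.Println(runPool(3, 10)) // 10
}
```

Every message is processed exactly once: a value sent on a channel is delivered to only one of the receivers, which is what makes a shared channel a natural work queue.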

func (q *Pubsub) Publish(topic string, message MessageHandler) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	// the topic's channel is created during ConsumerRegister, so publishing
	// to an unregistered topic is an error
	if _, exist := q.consumer[topic]; !exist {
		return errors.New("err: publishing message to unregistered consumer")
	}

	q.message[topic] <- message

	return nil
}

And last is the Publish method. It has two parameters: the topic and the message. Before publishing, it validates that the topic has a registered consumer; if not, it returns an error. There is no point in sending a message to an unregistered topic, and doing so could also break the system: the message store is a channel, so if the channel is full, or unbuffered with no receiver, the send will block the Publish method. That is the nature of Golang channels: a send blocks until there is room in the buffer or a receiver is ready.
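The blocking-send pitfall, and a non-blocking alternative using select with a default case, can be shown in a small sketch (`trySend` is my own helper, not part of the article's code):

```go
package main

import "fmt"

// trySend reports whether the value was sent without blocking.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false // channel is full (or unbuffered with no receiver)
	}
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(trySend(ch, 1)) // true: the buffer has room
	fmt.Println(trySend(ch, 2)) // false: the buffer is full, a plain send would block
}
```

A Publish method built on this idea could return an error instead of blocking when the topic's buffer is full, which is a common design choice for in-memory queues.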

What are the benefits of this approach?
In my honest opinion, there are several benefits to this technique:

  • Capping the number of spawned goroutines

When you read about Golang's goroutines, you'll see that each one starts with only a few kilobytes of stack (around 2 KB). But what if you're spawning millions of goroutines? That will still consume significant memory.

I'm limiting the number of goroutines by employing a worker pool. Without this technique you could also employ a worker pool directly instead of pub-sub, but with the pub-sub pattern your goroutines become more scalable and flexible.

  • Integration with graceful shutdown

With the pub-sub pattern you first cap the goroutines, and you also gain control over what happens during the graceful-shutdown period. If the data in the channel is important, you can save it to storage such as the file system or a database, and reload it when your system boots up.
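One way to act during the graceful period is to drain whatever is still buffered and persist it. This is a sketch of the draining step only; `drainPending` is a hypothetical helper, and the persistence itself is left as a comment:

```go
package main

import "fmt"

// drainPending pulls every buffered message off the channel without blocking,
// so they can be saved to a file or database before the process exits.
func drainPending(ch chan string) []string {
	var pending []string
	for {
		select {
		case m := <-ch:
			pending = append(pending, m)
		default:
			return pending // channel is empty
		}
	}
}

func main() {
	ch := make(chan string, 3)
	ch <- "a"
	ch <- "b"
	ch <- "c"

	saved := drainPending(ch)
	// here you would write `saved` to the file system or a database,
	// and reload it into the channel on the next boot
	fmt.Println(len(saved), saved) // 3 [a b c]
}
```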

  • You can decouple the consumer

Because each topic has its own consumer, you can define the handler in another layer, such as a controller, and keep your logic there or separate it however you like. Since the publisher doesn't care about the subscriber, your application becomes more flexible.

So, implementing a pub-sub pattern in Golang using channels can help you create flexible, maintainable, and scalable applications.

This is the repo: https://github.com/nurcahyaari/Go-Simple-Pubsub
