Unlock the Power of Pub/Sub Services in Go
What is Pub/Sub?
Imagine a messaging system where publishers send messages to subscribers without knowing the specifics of any single subscriber. Meanwhile, subscribers receive messages associated with a specific topic without knowing any details about the publisher. This system provides greater network scalability and can be used in various applications, such as streaming analytics or data integration pipelines.
Implementing Pub/Sub in Go
In this guide, we’ll explore how to implement a pub/sub service in Go using Go Patterns. We’ll use in-process communication between several Goroutines over channels, leveraging concurrent programming to communicate between independently running Goroutines.
File Structure
Our file structure consists of a new package named pubsub
and a module called main.go
, where we’ll run a crypto price example.
Creating and Publishing Messages
Let’s start with a simple implementation. Each message object has multiple attributes, including the topic and message body. Subscribers have a unique identifier string and a channel of messages, where publishers push messages via the signal()
method. The Broker structure consists of all subscribers and a map of topics for subscribers to subscribe to.
type Message struct {
Topic string
Body []byte
}
type Subscriber struct {
ID string
Channel chan *Message
}
type Broker struct {
Subscribers map[string]*Subscriber
Topics map[string]map[string]bool
}
Subscribing and Unsubscribing
The Subscribe
method subscribes a given topic to a given subscriber by adding a topic to the Subscriber and creating an entry in the broker topics with a subscriber ID. The Unsubscribe
method unsubscribes a subscriber from a given topic by deleting the subscriber ID from the specific topic map and removing the topic from the list of topics for that subscriber.
func (b *Broker) Subscribe(subscriber *Subscriber, topic string) {
b.Subscribers[subscriber.ID] = subscriber
if _, ok := b.Topics[topic];!ok {
b.Topics[topic] = make(map[string]bool)
}
b.Topics[topic][subscriber.ID] = true
}
func (b *Broker) Unsubscribe(subscriber *Subscriber, topic string) {
delete(b.Topics[topic], subscriber.ID)
delete(subscriber.Topics, topic)
}
Removing Subscribers and Destruct Method
The RemoveSubscriber
method removes a given subscriber from the broker by unsubscribing the subscriber from all topics and deleting the subscriber from the main subscriber list. The Destruct
method sets the active flag to false, closing the message channel once we’re done sending.
func (b *Broker) RemoveSubscriber(subscriber *Subscriber) {
for topic := range subscriber.Topics {
b.Unsubscribe(subscriber, topic)
}
delete(b.Subscribers, subscriber.ID)
}
func (b *Broker) Destruct() {
b.Active = false
close(b.MessageChannel)
}
Final Code
Now that we’ve covered the important snippets, let’s discuss the final complete code. We have pubsub/message.go
, pubsub/subscriber.go
, and pubsub/broker.go
, where we define the message structure, subscriber, and broker methods, respectively.
Example: Crypto Price Updates
Let’s use our pub/sub service to get price updates of cryptocurrencies. The publisher publishes the price value of cryptocurrencies, and subscribers receive the price update. We’ll generate random price values for each cryptocurrency and publish them with their respective topic names.
func main() {
broker := NewBroker()
subscriber1 := &Subscriber{ID: "subscriber1"}
subscriber2 := &Subscriber{ID: "subscriber2"}
broker.Subscribe(subscriber1, "bitcoin")
broker.Subscribe(subscriber2, "ethereum")
go func() {
for {
price := rand.Intn(1000)
broker.Publish("bitcoin", []byte(fmt.Sprintf("Bitcoin price: %d", price)))
price = rand.Intn(1000)
broker.Publish("ethereum", []byte(fmt.Sprintf("Ethereum price: %d", price)))
time.Sleep(time.Second)
}
}()
go func() {
for msg := range subscriber1.Channel {
fmt.Println(string(msg.Body))
}
}()
go func() {
for msg := range subscriber2.Channel {
fmt.Println(string(msg.Body))
}
}()
time.Sleep(10 * time.Second)
broker.Destruct()
}
Output
The output will display the received messages in the console.
Go’s Approach to Concurrency
Go follows a distinctive approach to concurrency, emphasizing communication over shared memory. While Go is a pragmatic language, it’s essential to clean up resources after the job is done.