RabbitMQ with Go
In my previous article, Implementing Domain-Driven Design with Go, I mentioned RabbitMQ in the Event section. The RabbitMQ examples in that section were limited; I only gave skeletons of the objects. In this article, we go deeper into RabbitMQ with Go.
If you are new to RabbitMQ, I suggest you follow the official tutorial. You can clone https://github.com/aristorinjuang/go-rabbitmq, where I have already coded every section of the official tutorial. There are many ways to install RabbitMQ, but I recommend using Docker for local development.
One of the reasons I prefer RabbitMQ is RabbitMQ Management. Once you have installed RabbitMQ locally, you can access it by visiting http://localhost:15672/ and entering guest as both the username and the password. The UI is quite simple and easy to understand in my opinion.
Events are published so that listeners can handle them. That is also the purpose of a message broker such as RabbitMQ. For example, let's say we want to publish an event through a direct exchange. RabbitMQ's default exchange is a direct exchange with an empty name, and it routes each message to the queue whose name equals the routing key. It is simple and a good place to start.
type OrderEventPublisher interface {
Publish(OrderEvent) error
}
type orderEventPublisherRabbitMQ struct {
url string
}
func (p *orderEventPublisherRabbitMQ) Publish(orderEvent OrderEvent) error {
conn, err := amqp.Dial(p.url)
if err != nil {
return err
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
return err
}
defer ch.Close()
q, err := ch.QueueDeclare(
orderEvent.Name(), // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return err
}
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
DeliveryMode: amqp.Persistent,
Body: []byte(orderEvent.ID()),
},
)
if err != nil {
return err
}
return nil
}
func NewOrderEventPublisherRabbitMQ(_url string) (OrderEventPublisher, error) {
if _url == "" {
return nil, errors.New("RabbitMQ URL cannot be empty")
}
return &orderEventPublisherRabbitMQ{_url}, nil
}
Durable and Non-Auto-Deleted queues will survive server restarts and remain when there are no remaining consumers or bindings. Persistent publishings will be restored in this queue on server restart. These queues are only able to be bound to durable exchanges.
DeliveryMode: Transient means higher throughput, but messages will not be restored on broker restart. The delivery mode of publishings is unrelated to the durability of the queues they reside on. Transient messages will not be restored to durable queues; persistent messages will be restored to durable queues and lost on non-durable queues during server restart.
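The rule in the two paragraphs above can be written down as a tiny truth table. This is only a sketch of the rule as stated in the text; survivesRestart is a hypothetical helper, not part of any RabbitMQ client library, and it does not talk to a broker.

```go
package main

import "fmt"

// survivesRestart restates the rule from the text: a message is expected to
// be restored after a broker restart only when the queue is durable AND the
// message was published with the persistent delivery mode.
// Hypothetical helper for illustration; it never contacts RabbitMQ.
func survivesRestart(durableQueue, persistentMessage bool) bool {
	return durableQueue && persistentMessage
}

func main() {
	// Print all four combinations of queue durability and delivery mode.
	for _, durable := range []bool{true, false} {
		for _, persistent := range []bool{true, false} {
			fmt.Printf("durable queue=%-5t persistent message=%-5t restored=%t\n",
				durable, persistent, survivesRestart(durable, persistent))
		}
	}
}
```

The publisher above uses the only combination that is restored: a durable queue together with amqp.Persistent.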
There are 4 steps in the code of publishing above.
- Create a connection.
- Create a channel.
- Declare a queue with the event name as the queue name.
- Publish the event.
For steps 1 and 2, you might think it is better to reuse a singleton connection or channel. I don't think so for publishing. It is okay to create a connection and a channel for every published event, because a long-lived singleton connection or channel can be lost between publishes. If the connection fails and we can't publish the event, it is okay to return the error to the caller.
You may use event names as queue names or routing keys at the infrastructure layer. In the code above, the event or message will not disappear even after the broker restarts, because the queue is durable and the message is persistent.
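Since Publish simply returns the error, the caller decides what to do with it. Here is a minimal sketch of one option, a bounded retry. Everything in it is an assumption for illustration: Publisher is a simplified stand-in for OrderEventPublisher that takes a string, flakyPublisher imitates a broker that is briefly unreachable, and publishWithRetry is a hypothetical helper.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Publisher is a hypothetical, simplified stand-in for OrderEventPublisher.
type Publisher interface {
	Publish(eventID string) error
}

// flakyPublisher fails a fixed number of times before it succeeds,
// imitating a broker that is unreachable for a short while.
type flakyPublisher struct {
	failuresLeft int
	published    []string
}

func (p *flakyPublisher) Publish(eventID string) error {
	if p.failuresLeft > 0 {
		p.failuresLeft--
		return errors.New("broker unreachable")
	}
	p.published = append(p.published, eventID)
	return nil
}

// publishWithRetry calls Publish up to attempts times and waits delay
// between failed tries. It returns nil on the first success, otherwise the
// last error wrapped with some context.
func publishWithRetry(p Publisher, eventID string, attempts int, delay time.Duration) error {
	if attempts < 1 {
		attempts = 1
	}
	var err error
	for i := 0; i < attempts; i++ {
		if err = p.Publish(eventID); err == nil {
			return nil
		}
		if i < attempts-1 {
			time.Sleep(delay)
		}
	}
	return fmt.Errorf("publish %q failed after %d attempts: %w", eventID, attempts, err)
}

func main() {
	p := &flakyPublisher{failuresLeft: 2}
	if err := publishWithRetry(p, "order-1", 3, 10*time.Millisecond); err != nil {
		fmt.Println("giving up:", err)
		return
	}
	fmt.Println("published:", p.published)
}
```

Whether to retry, log, or fail the whole request is an application decision; the sketch only shows that returning the error keeps that decision with the caller.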
We just coded the publishing part of the event. That was quite simple. Now, we are going to create a standalone process or app that listens to events and runs event handlers.
type EventName string
type EventHandlers map[EventName][]EventHandler
type eventHandlerRabbitMQ struct {
eventHandlers EventHandlers
url string
}
func (h *eventHandlerRabbitMQ) handle(msgs <-chan amqp.Delivery, eventName EventName) {
for d := range msgs {
var (
e Event
err error
)
switch eventName {
case OrderPaidName:
if e, err = NewOrderEvent(d.Body); err != nil {
log.Println(err)
continue
}
}
for _, handler := range h.eventHandlers[eventName] {
if err = handler.Handle(e); err != nil {
log.Println(err)
}
}
}
}
func (h *eventHandlerRabbitMQ) reconnect(notify chan *amqp.Error) {
if err := <-notify; err != nil {
log.Println(err)
h.Run()
}
}
func (h *eventHandlerRabbitMQ) Run() {
conn, err := amqp.Dial(h.url)
for err != nil {
log.Println(err)
time.Sleep(time.Second) // wait before dialing again
conn, err = amqp.Dial(h.url)
}
go h.reconnect(conn.NotifyClose(make(chan *amqp.Error)))
for eventName := range h.eventHandlers {
ch, err := conn.Channel()
if err != nil {
log.Println(err)
continue
}
q, err := ch.QueueDeclare(
string(eventName), // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
log.Println(err)
continue
}
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
log.Println(err)
continue
}
go h.handle(msgs, eventName)
}
forever := make(chan bool)
<-forever
}
func NewEventHandlerRabbitMQ(eventHandlers EventHandlers, _url string) *eventHandlerRabbitMQ {
return &eventHandlerRabbitMQ{eventHandlers, _url}
}
func main() {
eventHandlers := EventHandlers{} // TODO: append event handlers here
eventHandler := NewEventHandlerRabbitMQ(eventHandlers, "amqp://guest:guest@localhost:5672/") // assumed URL of a local broker
eventHandler.Run()
}
The code above handles multiple events, and each event can have multiple handlers. I avoided sharing one channel across multiple queues, so a problem on one channel does not break the consumers of the other queues.
I turned on autoAck (auto-ack) for simplicity in that example. Ack means acknowledge. With auto-ack, the broker considers a message acknowledged as soon as it is delivered, even though we haven't handled it yet. Some people prefer manual ack. Let's say we hit an infrastructure error while handling an event; with manual ack the message stays unacknowledged, so we can still handle it later after we fix the infrastructure. With manual ack, we need to acknowledge each event ourselves, for example by calling d.Ack(false).
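Here is a minimal sketch of the manual-ack flow described above, written without the RabbitMQ client so it can run on its own. The acknowledger interface mirrors the Ack and Nack methods of amqp.Delivery in amqp091-go; fakeDelivery, process, and errHandler are hypothetical names for this example. In real code, the auto-ack argument of ch.Consume would also have to be false.

```go
package main

import (
	"errors"
	"fmt"
)

// acknowledger mirrors the Ack and Nack methods of amqp.Delivery.
// The interface itself is an assumption made for this sketch.
type acknowledger interface {
	Ack(multiple bool) error
	Nack(multiple, requeue bool) error
}

// fakeDelivery records which acknowledgement it received.
type fakeDelivery struct {
	body            []byte
	acked, requeued bool
}

func (d *fakeDelivery) Ack(multiple bool) error { d.acked = true; return nil }

func (d *fakeDelivery) Nack(multiple, requeue bool) error { d.requeued = requeue; return nil }

// errHandler stands for an infrastructure error raised by a handler.
var errHandler = errors.New("database unavailable")

// process acknowledges the delivery only after handle succeeds. When handle
// fails, it negatively acknowledges the delivery and asks for a requeue so
// the message can be handled again later.
func process(d acknowledger, body []byte, handle func([]byte) error) error {
	if err := handle(body); err != nil {
		if nackErr := d.Nack(false, true); nackErr != nil {
			return fmt.Errorf("handle: %v; nack: %v", err, nackErr)
		}
		return err
	}
	return d.Ack(false)
}

func main() {
	ok := &fakeDelivery{body: []byte("order-1")}
	_ = process(ok, ok.body, func([]byte) error { return nil })
	fmt.Println("acked:", ok.acked)

	failed := &fakeDelivery{body: []byte("order-2")}
	err := process(failed, failed.body, func([]byte) error { return errHandler })
	fmt.Println("error:", err, "requeued:", failed.requeued)
}
```

One thing to keep in mind with this design: a message that always fails would be requeued again and again, so a real listener may want a retry limit or a dead-letter queue.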
*amqp.Error is important. The channel returned by NotifyClose delivers an *amqp.Error when the connection is lost, so we can reconnect. While the broker is unreachable, we don't need to dial continuously. Let's put a delay between attempts, maybe 1 second.
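The reconnect logic can also be written as a loop instead of calling Run again from reconnect. The sketch below shows that alternative shape under stated assumptions: session is a hypothetical function that dials, consumes, and blocks until the connection closes, returning the close reason; supervise and maxSessions are also hypothetical, and maxSessions exists only so the example terminates.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errClosed stands for the *amqp.Error received from NotifyClose.
var errClosed = errors.New("connection closed by broker")

// supervise runs session again every time it ends with an error, waiting
// delay before each restart. A nil error means a clean shutdown. It returns
// how many times the session had to be restarted.
func supervise(session func() error, delay time.Duration, maxSessions int) int {
	restarts := 0
	for i := 0; i < maxSessions; i++ {
		err := session()
		if err == nil {
			return restarts // clean shutdown, stop supervising
		}
		fmt.Println("connection lost:", err)
		time.Sleep(delay) // do not hammer the broker while it is down
		restarts++
	}
	return restarts
}

func main() {
	// Simulate a connection that drops twice and then shuts down cleanly.
	drops := 2
	session := func() error {
		if drops > 0 {
			drops--
			return errClosed
		}
		return nil
	}
	fmt.Println("restarts:", supervise(session, 10*time.Millisecond, 10))
}
```

Compared with calling Run recursively, a loop like this keeps a single goroutine in charge of the connection lifecycle.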
As you can see, RabbitMQ's default exchange is sufficient for our needs so far. We can make one event have many handlers. Now let's say we have a lot of event listeners, maybe because we want to make them easy to maintain or highly available. In that case, we can publish to a named direct exchange and let each listener bind its own queue to it.
func (p *orderEventPublisherRabbitMQ) Publish(orderEvent OrderEvent) error {
conn, err := amqp.Dial(p.url)
if err != nil {
return err
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
return err
}
defer ch.Close()
err = ch.ExchangeDeclare(
ExchangeName, // name
"direct", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
return err
}
err = ch.Publish(
ExchangeName, // exchange
orderEvent.Name(), // routing key (no queue is declared here, so use the event name)
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
DeliveryMode: amqp.Persistent,
Body: []byte(orderEvent.ID()),
},
)
if err != nil {
return err
}
return nil
}
On publishing, we replace QueueDeclare with ExchangeDeclare. You can name the exchange whatever you want, maybe after a domain, project, application, etc.
func (h *eventHandlerRabbitMQ) Run() {
conn, err := amqp.Dial(h.url)
for err != nil {
log.Println(err)
time.Sleep(time.Second) // wait before dialing again
conn, err = amqp.Dial(h.url)
}
go h.reconnect(conn.NotifyClose(make(chan *amqp.Error)))
for eventName := range h.eventHandlers {
ch, err := conn.Channel()
if err != nil {
log.Println(err)
continue
}
err = ch.ExchangeDeclare(
ExchangeName, // name
"direct", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
log.Println(err)
continue
}
q, err := ch.QueueDeclare(
QueueName, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
log.Println(err)
continue
}
err = ch.QueueBind(
q.Name, // queue name
string(eventName), // routing key
ExchangeName, // exchange
false,
nil,
)
if err != nil {
log.Println(err)
continue
}
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
log.Println(err)
continue
}
go h.handle(msgs, eventName)
}
forever := make(chan bool)
<-forever
}
While listening, you can name your queue after your service, or whatever identifies your listeners. Then you bind your queue to the exchange with QueueBind.
Let's say you deploy event listeners on an orchestrator such as Kubernetes or OpenShift, and they run as a lot of pods. With the code above, every pod consumes from the same queue, so RabbitMQ distributes the messages across the pods and each message is delivered to only one of them.
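The effect of several pods consuming from one queue can be imitated in memory with a Go channel. This is only an analogy, not RabbitMQ itself: distribute is a hypothetical function, the channel plays the role of the queue, and the goroutines play the role of pods. The point it shows is that each message is handled by exactly one consumer; the exact split between workers depends on the Go scheduler and is not the broker's dispatch order.

```go
package main

import (
	"fmt"
	"sync"
)

// distribute sends every message to one shared channel (the "queue") that is
// read by several workers (the "pods"). It returns how many messages each
// worker handled.
func distribute(messages []string, workers int) []int {
	queue := make(chan string)
	counts := make([]int, workers)
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for range queue {
				counts[id]++ // each worker only writes to its own slot
			}
		}(w)
	}
	for _, m := range messages {
		queue <- m // received by exactly one worker
	}
	close(queue)
	wg.Wait()
	return counts
}

func main() {
	counts := distribute([]string{"e1", "e2", "e3", "e4", "e5", "e6"}, 3)
	total := 0
	for id, c := range counts {
		fmt.Printf("worker %d handled %d message(s)\n", id, c)
		total += c
	}
	fmt.Println("total handled:", total)
}
```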
I used https://github.com/rabbitmq/amqp091-go, which is also compatible with Amazon MQ.
RabbitMQ has four types of exchanges: Direct, Fanout, Topic, and Headers. Each type routes messages differently. You can read https://www.rabbitmq.com/tutorials/amqp-concepts.html for more information about them or AMQP (Advanced Message Queuing Protocol) concepts.