RabbitMQ with Go

In my previous article, Implementing Domain-Driven Design with Go, I mentioned RabbitMQ in the Event section. The examples of implementation of RabbitMQ are limited in that section. It is just like I only gave skeletons of objects or classes. In this article, we are going deep into RabbitMQ with Go.

If you are new to RabbitMQ, I suggest you follow the official tutorial. You can clone https://github.com/aristorinjuang/go-rabbitmq. I already coded all sections of the official tutorial. There are many ways to install RabbitMQ, but I recommend using Docker for your local.

One of the reasons I prefer RabbitMQ is RabbitMQ Management. Once you installed RabbitMQ on your local, you can access it by visiting http://localhost:15672/ and putting guest as your username and password. The UI is quite simple and easy to understand in my opinion.

Events are for published and listened to be handled. That is also the purpose of message brokers or RabbitMQ. For example, let's say we want to publish an event of whatever by Direct Exchange. Direct Exchange is a default exchange. It is simple and recommended.

type OrderEventPublisher interface {
    Publish(OrderEvent) error
}

type orderEventPublisherRabbitMQ struct {
    url string
}

func (p *orderEventPublisherRabbitMQ) Publish(orderEvent OrderEvent) error {
    conn, err := amqp.Dial(p.url)
    if err != nil {
        return err
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        return err
    }
    defer ch.Close()

    q, err := ch.QueueDeclare(
        orderEvent.Name(), // name
        true,              // durable
        false,             // delete when unused
        false,             // exclusive
        false,             // no-wait
        nil,               // arguments
    )
    if err != nil {
        return err
    }

    err = ch.Publish(
        "",     // exchange
        q.Name, // routing key
        false,  // mandatory
        false,  // immediate
        amqp.Publishing{
            ContentType:  "text/plain",
            DeliveryMode: amqp.Persistent,
            Body:         []byte(orderEvent.ID()),
        },
    )
    if err != nil {
        return err
    }

    return nil
}

func NewOrderEventPublisherRabbitMQ(_url string) (OrderEventPublisher, error) {
    if _url == "" {
        return nil, errors.New("RabbitMQ URL cannot be empty")
    }
    return &orderEventPublisherRabbitMQ{_url}, nil
}

Durable and Non-Auto-Deleted queues will survive server restarts and remain when there are no remaining consumers or bindings. Persistent publishings will be restored in this queue on server restart. These queues are only able to be bound to durable exchanges.

DeliveryMode. Transient means higher throughput but messages will not be restored on broker restart. The delivery mode of publishings is unrelated to the durability of the queues they reside on. Transient messages will not be restored to durable queues, persistent messages will be restored to durable queues and lost on non-durable queues during server restart.

There are 4 steps in the code of publishing above.

  1. Create a connection.
  2. Create a channel.
  3. Declare a queue with the event name as the queue name.
  4. Publish the event.

In steps 1 and 2, you might think it's better to use a singleton channel. Hmm, I don't think so for publishing. It's okay to create connections and channels for every publishing event. Because sometimes it can get lost if you use the singleton connection or channel. So, if the connection is lost and we can't publish the event, it's okay to return the error.

You may use event names as queue names or routing keys at the infrastructure layer. In the code above, the event or message will not disappear even after the broker restarts.

We just coded the publishing part of the event. That was quite simple. Now, we are going to create a standalone process or app that listens to events and runs event handlers.

type EventName string

type EventHandlers map[EventName][]EventHandler

type eventHandlerRabbitMQ struct {
    eventHandlers EventHandlers
    url string
}

func (h *eventHandlerRabbitMQ) handle(msgs <-chan amqp.Delivery, eventName EventName) {
    for d := range msgs {
        var (
            e Event
            err error
        )

        switch eventName {
        case OrderPaidName:
            if e, err = NewOrderEvent(d.Body); err != nil {
                log.Println(err)

                continue
            }
        }

        for _, handler := range h.eventHandlers[eventName] {
            if err = handler.Handle(e); err != nil {
                log.Println(err)
            }
        }
    }
}

func (h *eventHandlerRabbitMQ) reconnect(notify chan *amqp.Error) {
    if err := <-notify; err != nil {
        log.Println(err)

        h.Run()
    }
}

func (h *eventHandlerRabbitMQ) Run() {
    conn, err := amqp.Dial(p.url)
    for err != nil {
        log.Println(err)

        time.Sleep(time.Second)

        connection, err = amqp.Dial(h.url)
    }

    go h.reconnect(conn.NotifyClose(make(chan *amqp.Error)))

    for eventName := range h.eventHandlers {
        ch, err := conn.Channel()
        if err != nil {
            log.Println(err)

            continue
        }

        q, err := ch.QueueDeclare(
            eventName, // name
            true,      // durable
            false,     // delete when unused
            false,     // exclusive
            false,     // no-wait
            nil,       // arguments
        )
        if err != nil {
            log.Println(err)

            continue
        }

        msgs, err := ch.Consume(
            q.Name, // queue
            "",     // consumer
            true,   // auto-ack
            false,  // exclusive
            false,  // no-local
            false,  // no-wait
            nil,    // args
        )
        if err != nil {
            log.Println(err)

            continue
        }

        go h.handle(msgs, eventName)
    }

    forever := make(chan bool)
    <-forever
}

func NewEventHandlerRabbitMQ(eventHandlers EventHandlers, _url string) *eventHandlerRabbitMQ {
    return &eventHandlerRabbitMQ{eventHandlers, _url}
}

func main() {
    eventHandlers := EventHandlers{} // TODO: append event handlers here

    eventHandler := NewEventHandlerRabbitMQ(eventHandlers)

    eventHandler.Run()
}

The code above is for handling multiple events and each event has multiple handlers. I tried to avoid sharing a channel for multiple queues. So, it doesn't break a lot of queues if a channel has an issue.

I turned on autoAck / auto-ack for simplicity in that example. Ack means to acknowledge. It is used to acknowledge that we received the event, even though we haven't handled it yet. Some people prefer to use manual-ack. Let's say we got an infrastructure error while handling an event, so we can still handle the event later after we fix the infrastructure. We need to acknowledge the event manually when using manual-ack by using d.Ack(false) for example.

*amqp.Error is important. It can notify us if the connection gets lost then we can reconnect. If the connection gets lost, we don't need to dial it every time. Let's put a delay there, maybe 1 second.

As you can see, RabbitMQ's default exchange is sufficient for our needs. We can make 1 event have many handlers. Let's say we have a lot of event listeners, maybe because we want to make them easy to maintain or have high availability.

func (p *orderEventPublisherRabbitMQ) Publish(orderEvent OrderEvent) error {
    conn, err := amqp.Dial(p.url)
    if err != nil {
        return err
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        return err
    }
    defer ch.Close()

    err = ch.ExchangeDeclare(
        ExchangeName, // name
        "direct",     // type
        true,         // durable
        false,        // auto-deleted
        false,        // internal
        false,        // no-wait
        nil,          // arguments
    )
    if err != nil {
        return err
    }

    err = ch.Publish(
        ExchangeName, // exchange
        q.Name,       // routing key
        false,        // mandatory
        false,        // immediate
        amqp.Publishing{
            ContentType:  "text/plain",
            DeliveryMode: amqp.Persistent,
            Body:         []byte(orderEvent.ID()),
        },
    )
    if err != nil {
        return err
    }

    return nil
}

On publishing, we replace QueueDeclare with ExchangeDeclare. You can name the exchange whatever you want, maybe a domain, project, application, etc.

func (h *eventHandlerRabbitMQ) Run() {
    conn, err := amqp.Dial(p.url)
    for err != nil {
        log.Println(err)

        time.Sleep(time.Second)

        connection, err = amqp.Dial(h.url)
    }

    go h.reconnect(conn.NotifyClose(make(chan *amqp.Error)))

    for eventName := range h.eventHandlers {
        ch, err := conn.Channel()
        if err != nil {
            log.Println(err)

            continue
        }

        err = ch.ExchangeDeclare(
            ExchangeName, // name
            "direct",     // type
            true,         // durable
            false,        // auto-deleted
            false,        // internal
            false,        // no-wait
            nil,          // arguments
        )
        if err != nil {
            log.Println(err)

            continue
        }

        q, err := ch.QueueDeclare(
            QueueName, // name
            true,      // durable
            false,     // delete when unused
            false,     // exclusive
            false,     // no-wait
            nil,       // arguments
        )
        if err != nil {
            log.Println(err)

            continue
        }

        err = ch.QueueBind(
            q.Name,       // queue name
            eventName,    // routing key
            ExchangeName, // exchange
            false,
            nil,
        )
        if err != nil {
            log.Println(err)

            continue
        }

        msgs, err := ch.Consume(
            q.Name, // queue
            "",     // consumer
            true,   // auto-ack
            false,  // exclusive
            false,  // no-local
            false,  // no-wait
            nil,    // args
        )
        if err != nil {
            log.Println(err)

            continue
        }

        go h.handle(msgs, eventName)
    }

    forever := make(chan bool)
    <-forever
}

While listening, you can name your queue with your service name or whatever to name your listeners. Then you route your queue with QueueBind.

Let's say you deploy event listeners on orchestrations like Kubernetes, OpenShift, etc and they have a lot of pods. With the code above, RabbitMQ is smart enough to distribute the request across pods.

I was using https://github.com/rabbitmq/amqp091-go which is also compatible with Amazon MQ.

RabbitMQ has four types of exchanges; Direct, Fanout, Topic, and Headers. Each type routes messages differently. You can read https://www.rabbitmq.com/tutorials/amqp-concepts.html for more information about them or AMQP (Advanced Message Queuing Protocol) concepts.

Related Articles

Comments

comments powered by Disqus