Building a Message Queue Using Redis in Go

Redis is not just a very good in-memory database, it can also double as a message queue. I want to show you our first shot at implementing a queue in Redis that can fulfill all our requirements: reliable, persistent, fast, monitorable. While still being a Gedankenexperiment the basic idea should be a very good starting point.

Although we used Go for our implementation, the concept could be transferred to any language you like.

The main reason we used Go is that this approach to build a message queue uses a native Redis client as basis and our backend is written in Go.

Criteria for a feasible message queue system

In order to be used in production at adeven we need the following features:

  • persistent (none to little data loss in case of a crash)
  • reliable, atomic (each message can only be fetched once)
  • monitorable (message rates, queue length and worker statuses need to be monitored through an API)
  • multi-message ack and reject (acknowledge multiple messages at once while rejecting them individually)

We do not need any complex broker system. Simple queues with multi publishers and multi consumers are the only thing we currently use. So if you want to have a fancy broker or exchange system this is probably not what you’re looking for.

Basic Ideas

Our starting point was the “Reliable queue” pattern described here: http://redis.io/commands/rpoplpush

The basis of our message queue system is that Redis has atomic list operations as well as blocking (waiting) versions of those. Using those commands it is possible to write a system where all messages are atomically moved between Redis lists. This way a crashing consumer cannot lose any messages if you use a “processing list” or “working queue”.

The other great feature of Redis is, that you can exactly tune the level of persistence vs. performance you need and easily adopt it to your specific needs.

So we already got the persistence and the reliability covered. Performance is usually one of the big advantages when using redis. As we’ll see later this also applies to this case.

Implementation

While the underlying idea is simply using LPUSH, BRPOPLPUSH, RPOPLPUSH and RPOP with a bunch of cleverly named lists, we added some magic with statistics counters and unique consumers.

Overview

Each queue is a LIST in Redis with multiple “publishers” LPUSHing into it. It also comes with a failed list to collect all rejected messages for later use.

Consumers each have a “working queue” where the currently unacked messages are stored. In order to make a consumer tag unique we write a “Hearbeat-Key” and check if it’s already set. See consumer.go for the implementation. This also enables us to monitor which consumers have subscribed to a queue even from another program.

Message rates are measured by INCRementing counters for each stream of messages (INPUT, WORKING, ACKED, FAILED). An observer then resets them each second to get the throughput.

Naming Conventions

We used a scheme where redismq::queue_name is the input list and every related list and key has the prefix redismq::queue_name:: to group related keys together.

All names of the lists can be found here:

queue.go

Code Excerpts

First off we give each message a package with headers that can be expanded for stuff like rejection handling.

package.go

1
2
3
4
type Package struct {
  Payload    string
  CreatedAt  time.Time
}

So the basic PUSH command is simply:

1
2
3
4
5
6
func (self *Queue) Put(payload string) error {
  p := &Package{CreatedAt: time.Now(), Payload: payload, Queue: self}
  answer := self.redisClient.LPush(self.InputName(), p.GetString())
  self.redisClient.Incr(self.InputCounterName())
  return answer.Err()
}

The Consumer is a bit more complex since it includes message rate, checking for unacked messages of the same consumer tag and so on.

consumer_commands.go

But the basic commands are easy to understand:

1
2
3
4
5
func (self *Consumer) unsafeGet() (*Package, error) {
  answer := self.GetQueue().redisClient.BRPopLPush(self.GetQueue().InputName(), self.WorkingName(), 0)
  self.GetQueue().redisClient.Incr(self.WorkingCounterName())
  return self.parseRedisAnswer(answer)
}

It’s just called unsafe because the check for unacked messages has to be done separately. So once we GET a message we can either ACK or REJECT (with or without re-queuing) it.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (self *Consumer) AckPackage(p *Package) error {
  answer := self.GetQueue().redisClient.RPop(self.WorkingName())
  self.GetQueue().redisClient.Incr(self.AckCounterName())
  return answer.Err()
}

func (self *Consumer) RequeuePackage(p *Package) error {
  answer := self.GetQueue().redisClient.RPopLPush(self.WorkingName(), self.GetQueue().InputName())
  self.GetQueue().redisClient.Incr(self.GetQueue().InputCounterName())
  return answer.Err()
}

func (self *Consumer) FailPackage(p *Package) error {
  answer := self.GetQueue().redisClient.RPopLPush(self.WorkingName(), self.GetQueue().FailedName())
  self.GetQueue().redisClient.Incr(self.GetQueue().FailedCounterName())
  return answer.Err()
}

Everything else is just to facilitate MULTI-ACK and monitoring. As I already said: you can easily rewrite this into any language you want as long as you have a Redis client at hand.

As this is only a preliminary piece of work some commands may be implemented in a slower than optimal way. The general functionality is fully tested.

Usage

First off, this software creates quite a bunch of keys in your redis (5 per queue plus 5 per consumer). So make sure to pick an otherwise empty db. You can set this up in the config.yml as we use goenv to configure everything.

A basic example is included with the code:

example/simple.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package main

import (
  "fmt"
  "github.com/adeven/goenv"
  "github.com/adeven/redismq"
)

func main() {
  goenv := goenv.DefaultGoenv()
  testQueue := redismq.NewQueue(goenv, "clicks")
  for i := 0; i < 10; i++ {
      testQueue.Put("testpayload")
  }
  consumer, err := testQueue.AddConsumer("testconsumer")
  if err != nil {
      panic(err)
  }
  for i := 0; i < 10; i++ {
      p, err := consumer.Get()
      if err != nil {
          fmt.Println(err)
          continue
      }
      fmt.Println(p.CreatedAt)
      err = p.Ack()
      if err != nil {
          fmt.Println(err)
      }
  }
}

As you can see the usage is very straight forward. Just import the package, create a queue and you can start pushing into it. Reading is equally simple and does work quite well even for huge payloads greater than 10 MB.

Benchmarks

To give you a rough idea what this baby can do i conducted some simple benchmarks. Please keep in mind that these are highly dependent on the setting of your Redis and what kind of machine you run this on.

So on my 2.4GHz i7 and SSD for Redis i got following results.

2 writers and 4 consumers with standard Redis persistence:

1
2
InputRate: 8813
WorkRate:  7118

That means we can publish and read around 8k messages per second simultaneously.

I am pretty sure though that there is a lot of space for optimizations as we, so far, only focused on implementing the basic functionality.

Next Steps

In order to make this a viable option for our own production system we need to access the monitoring information via a JSON API. This will enable our Zabbix to keep an eye on the message queue system and warn us if anything looks fishy.

Therefore we’ll write a standalone monitoring server that enables access to the queue observers. In a next step a pretty little webview with control functions and stats overview could follow.

I hope you enjoyed our little idea and if you interested to contribute, just send us a pull request.

Have fun.

Update

I have written a new article that explains how we implemented some of the next steps and also made it 6 times faster.

Comments