A channel-based ring buffer in Go

After Jared’s excellent introduction to Go concurrency and his look at patterns of Go channel usage, I would like to share a channel-based ring buffer that we developed for the Loggregator Server in CloudFoundry (CF).

CloudFoundry’s Loggregator Server

The goal of Loggregator is to allow application developers to tail the logs of their applications while they are running on CF. The central component of this is the Loggregator server, which routes incoming messages. One of the key requirements for this server is that all developers get their logs fairly and that a malicious developer cannot cause message loss for other developers by writing very fast loggers or very slow log consumers.

The following drawing shows the basic mechanism of message distribution (every sprocket is a goroutine). Messages come into the system on the left and are processed by the main processing loop, which determines whether a message's id matches a particular consumer and should thus be forwarded to it. Every consumer forwarder has an internal incoming queue from which it takes messages to forward to the external consumer.

Loggregator message processing

Congestion in a naive implementation

If a consumer, say consumer 1, slows down, it will fill up its incoming channel over time. Once that channel is full, it will block the main message processing loop. A buffered channel only delays the problem: the same blocking occurs once its buffer runs full.

Loggregator message processing (3)
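
To make the problem concrete, here is a minimal sketch of what such a naive routing loop might look like (the message type and names are hypothetical, not taken from the Loggregator source):

// message is a hypothetical stand-in for a Loggregator log message.
type message struct {
    id      string
    payload string
}

// naiveRoute forwards each incoming message to the consumer whose id it
// matches. The plain channel send blocks the entire routing loop as soon
// as that consumer's (buffered) incoming channel fills up.
func naiveRoute(incoming <-chan message, consumers map[string]chan message) {
    for m := range incoming {
        if ch, ok := consumers[m.id]; ok {
            ch <- m // blocks here when this consumer falls behind
        }
    }
}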

A channel-based ring buffer solution

Channels and goroutines to the rescue!

The idea is simple: connect two buffered channels through one goroutine that forwards messages from the incoming channel to the outgoing channel. Whenever a new message cannot be placed on the outgoing channel, take one message out of the outgoing channel (that is the oldest message in the buffer), drop it, and place the new message in the newly freed-up slot of the outgoing channel.

In the following code snippet we use ints as our messages.

package main

import "fmt"

// RingBuffer connects an input channel to a buffered output channel and
// drops the oldest buffered value whenever the output is full.
type RingBuffer struct {
    inputChannel  <-chan int
    outputChannel chan int
}

func NewRingBuffer(inputChannel <-chan int, outputChannel chan int) *RingBuffer {
    return &RingBuffer{inputChannel, outputChannel}
}

func (r *RingBuffer) Run() {
    for v := range r.inputChannel {
        select {
        case r.outputChannel <- v:
        default:
            // The output buffer is full: drop the oldest message to
            // make room for the new one.
            <-r.outputChannel
            r.outputChannel <- v
        }
    }
    close(r.outputChannel)
}

func main() {
    in := make(chan int)
    out := make(chan int, 5)
    rb := NewRingBuffer(in, out)
    go rb.Run()

    for i := 0; i < 10; i++ {
        in <- i
    }

    close(in)

    for res := range out {
        fmt.Println(res)
    }
}
//Prints:
//4
//5
//6
//7
//8
//9
//
//Program exited.

Plugging in this “channel struct” will never block and will simply behave like a ring buffer. That is, slower consumers might lose (their oldest) messages, but they will never be able to block the main message processing loop.

Loggregator message processing (4)
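
As a small demonstration of that claim, the main function from the snippet above could be swapped for something like the following (it additionally needs the time import): a fast producer is paired with a deliberately slow consumer, the producer is never blocked by the consumer, and the consumer simply sees only the more recent messages.

func main() {
    in := make(chan int)
    out := make(chan int, 5)
    go NewRingBuffer(in, out).Run()

    // Fast producer: sends 100 messages; the slow consumer below never
    // blocks it, because the RingBuffer drops old messages instead.
    go func() {
        for i := 0; i < 100; i++ {
            in <- i
        }
        close(in)
    }()

    // Slow consumer: by the time it reads, older messages have already
    // been dropped, so it mostly sees the newest values.
    for v := range out {
        time.Sleep(10 * time.Millisecond)
        fmt.Println(v)
    }
}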

Here is this idea at work in the Loggregator server source code.

Other solutions

A few packages are available that implement ring buffers in a more classic way, as data structures with moving read/write positions: e.g., container/ring from the standard library and gringo.

The problem with these implementations is that they need locking to be used concurrently. In the case of container/ring, proper locking needs to be ensured by the user of the package. In the case of gringo, you will see extensive locking throughout the package when looking at the source code.
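
To make the comparison concrete, here is a sketch of my own (not taken from either package) of how container/ring might be wrapped with a mutex for concurrent use:

package main

import (
    "container/ring"
    "fmt"
    "sync"
)

// lockedRing wraps container/ring with the mutex that the package
// itself leaves to its users.
type lockedRing struct {
    mu sync.Mutex
    r  *ring.Ring
}

func newLockedRing(n int) *lockedRing {
    return &lockedRing{r: ring.New(n)}
}

// Put overwrites the oldest slot and advances the ring.
func (l *lockedRing) Put(v int) {
    l.mu.Lock()
    defer l.mu.Unlock()
    l.r.Value = v
    l.r = l.r.Next()
}

// Snapshot copies out the current contents (unused slots are skipped).
func (l *lockedRing) Snapshot() []int {
    l.mu.Lock()
    defer l.mu.Unlock()
    var out []int
    l.r.Do(func(v interface{}) {
        if v != nil {
            out = append(out, v.(int))
        }
    })
    return out
}

func main() {
    lr := newLockedRing(5)
    for i := 0; i < 10; i++ {
        lr.Put(i)
    }
    fmt.Println(lr.Snapshot()) // Prints: [5 6 7 8 9]
}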

Comments
  1. Jesse Zhang says:

    Nice post on the ring buffer. I had a similar (and slightly simpler) solution in mind, which is just dropping the new incoming messages whenever there is a slow consumer.
    It is not precisely the ring-buffer semantics, but on a higher level it solves the same problem: not blocking the producer (main loop). It also avoids the (theoretical) problem of doing a potentially blocking read on the output channel: there is a small window between:
    – when we detected that the output channel was blocked; and
    – when we attempt to unblock it.
    Depending on the implementation of the goroutine scheduler, the output channel could have become empty before we read it, therefore blocking the middle “unclogger” goroutine.

    Jesse

  2. Lukas says:

    AFAICS there is a race condition in the example that will cause a deadlock. When sending to outputChannel is not possible, the default case tries to drain one value from it by receiving from it (“<-r.outputChannel”).
    Notice, however, that another goroutine may receive the blocking value from outputChannel before the default case gets executed. The RingBuffer goroutine then waits forever on the “<-r.outputChannel” line.

    One solution might be to wrap the receiving operation in another select statement with an empty default case (so it always proceeds); a sketch of this is included after the comments.

    • Stephan Hagemann says:

      Yes, I believe, as Jesse was noting, this could happen if the select picks the unclogger branch and the channel gets emptied externally before the unclogger gets to take a value off it. I don’t think we ever observed this in production. If I remember correctly, the length of the buffer was 20 by default. As such, the buffer would have to be full at 20 items, pick the unclogger branch, and have the entire channel drained before the unclogger does anything. Only then would the routine be locked.
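
For reference, a sketch of the non-blocking drain Lukas suggests might look like the following; this is an illustration of his suggestion, not the code that shipped in Loggregator:

func (r *RingBuffer) Run() {
    for v := range r.inputChannel {
        select {
        case r.outputChannel <- v:
        default:
            // Try to drop the oldest message, but don't block if a
            // consumer emptied the channel in the meantime.
            select {
            case <-r.outputChannel:
            default:
            }
            // Safe to send now: this goroutine is the only sender on
            // outputChannel, so there is at least one free slot.
            r.outputChannel <- v
        }
    }
    close(r.outputChannel)
}

Jesse’s drop-newest variant is simpler still: leave the default case empty and discard v instead of evicting the oldest buffered message.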
