I've been writing some Go code recently to process moderate amounts of time-series data (millions of data points). The processing involves aggregating multiple time series by combining similar time periods and then performing simple operations across the sets.
Reading the pipelines post on the Go blog showed me how channels can be used to write some really neat-looking (and potentially parallel) code with very little work.
Without worrying too much about the implementations of Interpolate(), RollingAverage() and GetAllData(), I ended up with some code that followed this format:
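type Value struct {
	Timestamp time.Time
	Value     float64
}

// Create the processing pipeline
input := make(chan *Value)
steadyData := Interpolate(input, 300) // Fix the interval between points to 5 minutes
output := RollingAverage(steadyData)  // Calculate rolling average

// Write all data to the pipeline. This runs in its own goroutine so that the
// sends cannot block before anything is reading from output, and the input is
// closed afterwards so that the stages (and the loop below) can finish.
go func() {
	defer close(input)
	for v := range GetAllData() {
		input <- v
	}
}()

// Fetch the results from the end
for v := range output {
	fmt.Println("Got an output value", v.Value)
}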
Interpolate(), RollingAverage() and GetAllData() all create goroutines so that the processing that they do can all be performed in parallel.
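To make that shape concrete, here is a minimal sketch of what one such stage could look like. It is not the original implementation: rollingAverage and its window parameter are hypothetical names chosen for illustration, and the window size of 2 is arbitrary.

```go
package main

import (
	"fmt"
	"time"
)

// Value mirrors the type used in the post.
type Value struct {
	Timestamp time.Time
	Value     float64
}

// rollingAverage is a hypothetical stand-in for RollingAverage(). For every
// value received it emits the mean of the most recent `window` values.
// window is assumed to be positive.
func rollingAverage(in <-chan *Value, window int) <-chan *Value {
	out := make(chan *Value)
	go func() {
		defer close(out) // lets the downstream range loop terminate
		var recent []float64
		var sum float64
		for v := range in {
			recent = append(recent, v.Value)
			sum += v.Value
			if len(recent) > window {
				// Drop the oldest value once the window is full.
				sum -= recent[0]
				recent = recent[1:]
			}
			out <- &Value{Timestamp: v.Timestamp, Value: sum / float64(len(recent))}
		}
	}()
	return out
}

func main() {
	input := make(chan *Value)
	output := rollingAverage(input, 2)

	// Feed the pipeline from a separate goroutine and close it when done.
	go func() {
		defer close(input)
		for i := 1; i <= 4; i++ {
			input <- &Value{Timestamp: time.Unix(int64(i*300), 0), Value: float64(i)}
		}
	}()

	for v := range output {
		fmt.Println(v.Value) // prints 1, 1.5, 2.5, 3.5 on separate lines
	}
}
```

Each stage owns its output channel and closes it when its input is exhausted, which is what allows stages to be chained or reordered freely.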
It seems relatively elegant, and it makes it very easy to insert other steps into the pipeline, or to reorder or swap out functions. It's generally what I'd regard as pretty code.
Unfortunately, it's SLOW. Extremely slow. I ended up throwing away all the pipeline code and just passing []*Value around everywhere, taking the hit of creating and copying new slices, and the potential loss of throughput from using only a single core.
Even when the number crunching in each step is relatively complex, the speed-up from using more cores is dwarfed by the overhead of using channels.
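For comparison, a slice-based step might look roughly like the sketch below. Again, this is an illustration rather than the code from my project: rollingAverageSlice and its window parameter are made-up names, and the function allocates a fresh output slice, which is the copying cost mentioned above.

```go
package main

import (
	"fmt"
	"time"
)

// Value mirrors the type used in the post.
type Value struct {
	Timestamp time.Time
	Value     float64
}

// rollingAverageSlice is a hypothetical slice-based step. It returns a new
// slice in which each element is the mean of the most recent `window` input
// values. window is assumed to be positive.
func rollingAverageSlice(in []*Value, window int) []*Value {
	out := make([]*Value, 0, len(in))
	var sum float64
	for i, v := range in {
		sum += v.Value
		if i >= window {
			// Remove the value that has just left the window.
			sum -= in[i-window].Value
		}
		n := i + 1
		if n > window {
			n = window
		}
		out = append(out, &Value{Timestamp: v.Timestamp, Value: sum / float64(n)})
	}
	return out
}

func main() {
	var data []*Value
	for i := 1; i <= 4; i++ {
		data = append(data, &Value{Timestamp: time.Unix(int64(i*300), 0), Value: float64(i)})
	}
	for _, v := range rollingAverageSlice(data, 2) {
		fmt.Println(v.Value) // prints 1, 1.5, 2.5, 3.5 on separate lines
	}
}
```

Steps written this way still compose by plain function calls, just without any concurrency between them.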
To demonstrate the performance difference, this is the code I threw together, which you can run to see for yourself:
package main

import (
	"fmt"
	"math/rand"
	"time"
)

type Value struct {
	Timestamp time.Time
	Value     float64
}

func averageOfChan(in chan *Value) float64 {
	var sum float64
	var count int
	for v := range in {
		sum += v.Value
		count++
	}
	return sum / float64(count)
}

func averageOfSlice(in []*Value) float64 {
	var sum float64
	var count int
	for _, v := range in {
		sum += v.Value
		count++
	}
	return sum / float64(count)
}

func main() {
	// Create a large array of random numbers
	input := make([]*Value, 1e7)
	for i := 0; i < 1e7; i++ {
		input[i] = &Value{time.Unix(int64(i), 0), rand.Float64()}
	}

	func() {
		st := time.Now()
		in := make(chan *Value, 1e4)
		go func() {
			defer close(in)
			for _, v := range input {
				in <- v
			}
		}()
		averageOfChan(in)
		fmt.Println("Channel version took", time.Since(st))
	}()

	func() {
		st := time.Now()
		averageOfSlice(input)
		fmt.Println("Slice version took", time.Since(st))
	}()
}
Running this on my home PC, I get this:
Channel version took 1.14759465s
Slice version took 24.839719ms
Yes, it's 46x faster to pass around a slice in this contrived (but representative) example. I did attempt to optimise this by changing the size of the input channel, and 1e4 is about the fastest channel size I found.
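If you want to repeat the buffer-size experiment, something along these lines should work. This is a rough sketch rather than the exact harness I used: timeChannelAverage is a hypothetical helper, the list of buffer sizes is arbitrary, and the input is smaller (1e6 values) to keep the run short. Timings will vary by machine, so no expected output is shown.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// Value mirrors the type used in the benchmark above.
type Value struct {
	Timestamp time.Time
	Value     float64
}

func averageOfChan(in <-chan *Value) float64 {
	var sum float64
	var count int
	for v := range in {
		sum += v.Value
		count++
	}
	return sum / float64(count)
}

// timeChannelAverage is a hypothetical helper: it streams input through a
// channel with the given buffer size and reports the average together with
// the wall-clock time the whole transfer took.
func timeChannelAverage(input []*Value, buffer int) (float64, time.Duration) {
	start := time.Now()
	in := make(chan *Value, buffer)
	go func() {
		defer close(in)
		for _, v := range input {
			in <- v
		}
	}()
	avg := averageOfChan(in)
	return avg, time.Since(start)
}

func main() {
	input := make([]*Value, 1000000)
	for i := range input {
		input[i] = &Value{Timestamp: time.Unix(int64(i), 0), Value: rand.Float64()}
	}

	// Arbitrary selection of buffer sizes, from unbuffered upwards.
	for _, buffer := range []int{0, 1, 100, 10000, 1000000} {
		_, elapsed := timeChannelAverage(input, buffer)
		fmt.Printf("buffer %7d: %v\n", buffer, elapsed)
	}
}
```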
In short: channels are neat, pipelines are neat, but channels are slow.
I'd be happy to hear if I'm doing something wrong or there is a better (faster) way.