Time Series - Playing Around

Why?

I wanted to play with time series for a role I am interviewing for, and I wanted to do it by using a lot of common constructs and building it myself. There is no way this is production ready, nor is it bulletproof in any way. It's for fun!

You can find all the code at the GitHub here. Some hardcoded paths exist, be forewarned.

Concepts

I read a bit about Time Series and knew a few things:

Fast Ingestion
    10k items a second
Queryable
    At least in concept, able to query the data while it is being ingested
Actors
    I wanted to use actors
    I love using actor systems and rarely get to in daily development
    I thought they would be a good way to control where ingested data gets deposited and to allow very wide scaling through a cluster of actors. Partitioning the actors by service/metric/minute should allow hundreds of thousands of them to exist in a cluster, ingesting millions of records (a sketch of the keying scheme follows).
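
To make that concrete, here is a minimal sketch of how a record could map to an actor identity under a service/metric/minute scheme. The names and the exact key format are illustrative (the client code later in the post builds its key with ToString("g")); this is not the project's code.

using System;

public static class ActorKeys
{
    // One actor per (service, metric, minute) bucket.
    public static string ForRecord(string service, string metric, DateTimeOffset time)
    {
        // Truncate the timestamp to the top of its minute so every record in
        // the same minute lands on the same actor.
        var minute = new DateTimeOffset(time.Year, time.Month, time.Day,
                                        time.Hour, time.Minute, 0, time.Offset);
        return $"{service}!{minute:yyyy-MM-ddTHH:mm}!{metric}";
    }
}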

Data Layout

Data would come in with at least 4 things

Time
Service Name
Metric Name
Metric Value

Example

{ "time": 637398493800000000, "serviceName": "ordering", "metric": "calls", "value": 1 }

Optionally, a record can include any number of attribute key/value pairs:

{ "time": 637398493800000000, "serviceName": "ordering", "metric": "calls", "value": 1, "attributes": [ { "Verb": "Get" } ] }

The record would be written to the log, with minor changes.

TimeOffset - uint32
    The time is truncated to a uint32 offset from the start time recorded in the file (see the sketch after this list).
Value - double
Attributes - map<string,string>
    If I were going fancier I would have converted this to at least map<int,string>, keeping the actual field names in a lookup table, since within a slice there are probably a limited number of field names. That said, this is by far the chunkiest section of each record.
RecordId - uint32
    Unique Id within the set, probably not needed, and could easily be removed.
The Service Name and Metric are inherent in the file, so they are not stored per record.
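
As a sanity check on the uint32 choice: .NET ticks are 100 ns, so a minute is 60 × 10,000,000 = 600,000,000 ticks, comfortably under uint.MaxValue (about 4.29 billion). A minimal sketch of the truncation, with illustrative names rather than the project's code:

using System;

public static class TimeOffsets
{
    // Convert an absolute tick timestamp into a uint32 offset from the start
    // of the minute slice the file covers.
    public static uint ToMinuteOffset(ulong recordTimeTicks, DateTimeOffset minuteStart)
    {
        var offset = (long)recordTimeTicks - minuteStart.UtcTicks;
        if (offset < 0 || offset > TimeSpan.TicksPerMinute)
            throw new ArgumentOutOfRangeException(nameof(recordTimeTicks),
                "Record does not belong to this minute slice.");
        return (uint)offset;
    }
}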

The Stack

dotnet core
    Framework I am most familiar with
proto.actor
    Super fast actor framework
    Originally went with Orleans, but performance capped at around 1k messages a second
FasterLog
    Data structure to write records to
protobuf
    Data exchange format

The Server

Code Here

The important code is below: initializing the actor system, creating a BoundedMailbox capped at two million messages for sanity, and starting it listening on port 8000.

var system = new ActorSystem();
var serialization = new Serialization();
var context = new RootContext(system);
serialization.RegisterFileDescriptor(RecordReflection.Descriptor);
// Limit our inbox to 2m entries, that seemed good to handle an inflow of 100k/s
var props = Props.FromProducer(() => services.GetService())
    .WithMailbox(() => BoundedMailbox.Create(2_000_000));
var remote = new Remote(system, new RemoteConfig()
{
    Serialization = serialization,
    Host = "127.0.0.1",
    Port = 8000,
    RemoteKinds = { {"record", props} }
});
await remote.StartAsync();
Console.WriteLine("Server started");
Console.ReadLine();

The Client

Code Here

var system = new ActorSystem();
var serialization = new Serialization();
serialization.RegisterFileDescriptor(RecordReflection.Descriptor);
var remote = new Remote(system, new RemoteConfig()
{
  Serialization = serialization,
  Host = "127.0.0.1",
  Port = 0,
});
await remote.StartAsync();
var context = new RootContext(system, default);
await DoClientWork(remote, context);

Very similar to the above, except it is not listening on a port. If I wanted to use the actors in a clustered environment I could; then I would use the built-in support for Redis/Consul as a hash ring provider.

The record spawning looks like:

while (true)
{
    var currentTime = DateTimeOffset.Now.ToString("g");
    var metric = "latency";

    var currentKey = $"test!{currentTime}!{metric}";
    if (currentKey != lastKey)
    {
        // This can be expensive
        var result = await client.SpawnNamedAsync("127.0.0.1:8000", currentKey, "record", TimeSpan.FromMinutes(30));
        pid = result.Pid;
        lastKey = currentKey;
    }

    index++;
    if (index % 10_000 == 0)
    {
        // Add some breathing room for the server to catch up
        System.Threading.Thread.Sleep(100);
        Console.WriteLine(index);
    }

    var r = new Record()
    {
        Service = "test",
        Time = (ulong)DateTimeOffset.Now.Ticks,
        Metricvalue = 10
    };
    r.Attributes.Add("Verb", verbs[random.Next(0, 3)]);
    context.Send(pid, r);
}

The only fancy thing above is that we only spawn a PID when needed (when the minute changes). There is a race condition here with multiple clients, which I would solve by checking for the existence of the actor first.

The Actor

The actor keeps an internal count of the records it has processed and, as mentioned above, flattens the attributes out into their own entries in the log.

public async Task AddRecord(Record record)
{
    // Received a record so increment our counter, probably not needed, but if we want to ever know the
    // order that entries came in, as opposed to their time
    _recordCount++;

    // We don't need to store the whole time, just the offset from the start of the minute
    var offset = (long) record.Time - _recordStart.UtcTicks;

    var entry = new LogEntry()
    {
        Offset = (uint) offset,
        MetricValue = record.Metricvalue,
        RecordId = _recordCount,
    };
    foreach (var attr in record.Attributes)
    {
        entry.Attributes.Add(attr.Key, attr.Value);
    }

    var allBytes = entry.ToByteArray();
    await _log.EnqueueAsync(allBytes);

    if (_recordCount % 10_000 == 0)
    {
        _log.Commit();
        Console.WriteLine($"{_recordCount} {_recordStart.Ticks}");
    }
}

Every 10k messages we flush to disk; ideally I would move this to a background thread so it does not slow down the ingestion path.
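
A minimal sketch of what that background flush could look like, assuming FASTER's FasterLog with its CommitAsync method and a cancellation token tied to the actor's lifetime; the interval and names are illustrative, not the project's code:

using System;
using System.Threading;
using System.Threading.Tasks;
using FASTER.core;

public static class LogCommitter
{
    // Periodically commit whatever has been enqueued so the ingestion path
    // never blocks on disk flushes.
    public static Task StartBackgroundCommits(FasterLog log, CancellationToken token) =>
        Task.Run(async () =>
        {
            while (!token.IsCancellationRequested)
            {
                await log.CommitAsync(token);
                await Task.Delay(TimeSpan.FromMilliseconds(250), token);
            }
        }, token);
}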

The actor also registers a callback timer on startup so that if it doesn't receive a message for 90 seconds it will flush to disk and stop itself. Since an individual actor should only be receiving for about 60 seconds, this works fine.
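
The post describes this as a callback timer; one way to get the same behavior is Proto.Actor's built-in receive timeout, sketched below. This is my own rough take under that assumption, not the project's actual actor:

using System;
using System.Threading.Tasks;
using Proto;

public class IdleShutdownActor : IActor
{
    public Task ReceiveAsync(IContext context)
    {
        switch (context.Message)
        {
            case Started _:
                // Ask the runtime to send a ReceiveTimeout message if nothing
                // arrives for 90 seconds.
                context.SetReceiveTimeout(TimeSpan.FromSeconds(90));
                break;
            case ReceiveTimeout _:
                // Flush anything pending (e.g. _log.Commit()) and stop.
                context.Stop(context.Self);
                break;
        }
        return Task.CompletedTask;
    }
}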

All the records get persisted to the FasterLog as they come in.

Querying

A query comes in with several things

Start Time
End Time
Service Name
Metric
Aggregate
    Sum
    Count
Attributes Optional
    This is used to filter the results
    Matching is AND-only and case sensitive

The query comes into the QueryGrain class, where it is sliced into minute sections between the Start and End times and then passed to individual workers (QueryMapGrain) that each process a section.
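
A rough sketch of that slicing step, assuming the window is carved up along minute boundaries so sections line up with the per-minute files; the names here are illustrative:

using System;
using System.Collections.Generic;

public static class QuerySlicer
{
    // Split [start, end) into minute-aligned sections, one per worker.
    public static IEnumerable<(DateTimeOffset Start, DateTimeOffset End)> MinuteSlices(
        DateTimeOffset start, DateTimeOffset end)
    {
        var cursor = new DateTimeOffset(start.Year, start.Month, start.Day,
                                        start.Hour, start.Minute, 0, start.Offset);
        while (cursor < end)
        {
            var next = cursor.AddMinutes(1);
            yield return (cursor > start ? cursor : start, next < end ? next : end);
            cursor = next;
        }
    }
}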

Each worker first checks the key/value store for a pre-calculated value (the data is immutable, so cached results never go stale); if one is found, it returns early.

If there are attributes to check, the worker counts how many of the query attributes the record matches; if that count >= the number of query attributes, then we know it is a matching record.
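
A sketch of that count-based check, with the AND semantics and case-sensitive comparison described above; the types and names are illustrative, not the project's code:

using System.Collections.Generic;
using System.Linq;

public static class AttributeFilter
{
    // A record matches only if every query attribute is present on the record
    // with exactly the same (case sensitive) value.
    public static bool Matches(
        IReadOnlyDictionary<string, string> recordAttributes,
        IReadOnlyDictionary<string, string> queryAttributes)
    {
        var matched = queryAttributes.Count(q =>
            recordAttributes.TryGetValue(q.Key, out var value) && value == q.Value);

        return matched >= queryAttributes.Count;
    }
}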

Finally, the results are cached in the LMDB key/value store.

Results

On my 2019 Macbook Pro, I was able to ingest around 100k records a second at peak and write them to the log until things started to tear down.

The log generated was around 600 MB for a one-minute slice of time, which held on average 17 million records. I generally did not run at the peak 100k records/second during testing.

Querying the 4 minutes of data I had, which was several gigs on disk, took around 2 minutes. Once cached, it was instantaneous.

Example swagger page below:

My test with one producer yielded:

2020-11-01T22:16:00+00:00
PUT : 1,176,058
GET : 1,173,983
POST: 1,172,582
ALL : 3,522,623

What did I learn?

I had a fun time working through some of the basic concepts of how to store data and query it efficiently. Once I got FasterLog working it was fantastic, though I never got FasterKV working reliably. I was thrilled with the throughput the actors allowed me, and with learning about slicing the buckets.

Further work would be refining the buckets to automatically cache common patterns, and adding more query aggregates besides SUM and COUNT.

Also, the FASTER and Proto.Actor documentation is pretty rough; there is a fair amount of it, but much is outdated or flat out incorrect. It is something I may spend some time helping correct!

Orleans, my original actor love, feels old and clunky. It has a ton of weight and its documentation is also out of date (though Proto.Actor's is almost nonexistent).

Cool next steps?

Pre-calculate the cache based on usage patterns. A 100 MB file takes around 4 seconds to read and parse, which means the cold start for a long query that covers days is harsh. Since the data is immutable, the common usage patterns can be pre-calculated and stored next to the raw data. This avoids holding that expensive data in an in-memory cache.

Cloud file system providers for the data: instead of storing it locally, move the data to S3 automatically. FASTER supports the idea of tiered storage.

More query options, support for a more robust query language.