
Ayende Rahien works for Hibernating Rhinos LTD, an Israel-based company producing developer productivity tools for OLTP applications such as NHibernate Profiler (nhprof.com), Linq to SQL Profiler (l2sprof.com), Entity Framework Profiler (efprof.com) and more. Ayende is a DZone MVB and is not an employee of DZone and has posted 461 posts at DZone. You can read more from them at their website.

Voron and Time Series Data

01.13.2014

One of the things Voron does very well is reading a lot of data fast, and one interesting scenario where that matters is time series data.

For example, let us say that we have a bunch of sensors reporting on the temperature metrics within an area (said while the heaviest storm in 5 decades is blowing outside). Every minute, we have some data coming in. For fun, we will make the following assumptions:

  • We have to deal with late writes (a sensor sending us updates from an hour ago because of a communication outage).
  • Dates aren’t unique.
  • All queries will be based on the dates.

First, let me show you the full code, and then we can talk about how it works:

   1: public class DateTimeSeries : IDisposable
   2: {
   3:     private readonly JsonSerializer _serializer = new JsonSerializer();
   4:     private readonly StorageEnvironment _storageEnvironment;
   5:     private long _last;
   6:     private readonly Slice _lastKey;
   7:  
   8:     public DateTimeSeries(string path)
   9:     {
  10:         _lastKey = "last-key";
  11:         _storageEnvironment = new StorageEnvironment(StorageEnvironmentOptions.ForPath(path));
  12:         using (var tx = _storageEnvironment.NewTransaction(TransactionFlags.ReadWrite))
  13:         {
  14:             _storageEnvironment.CreateTree(tx, "data");
  15:             var read = tx.State.Root.Read(tx, _lastKey);
  16:  
  17:             _last = read != null ? read.Reader.ReadInt64() : 1;
  18:  
  19:             tx.Commit();
  20:         }
  21:     }
  22:  
  23:     public void AddRange<T>(IEnumerable<KeyValuePair<DateTime, T>> values)
  24:     {
  25:         using (var tx = _storageEnvironment.NewTransaction(TransactionFlags.ReadWrite))
  26:         {
  27:             var data = tx.GetTree("data");
  28:             var buffer = new byte[16];
  29:             var key = new Slice(buffer);
  30:             var ms = new MemoryStream();
  31:             foreach (var kvp in values)
  32:             {
  33:                 var date = kvp.Key;
  34:                 EndianBitConverter.Big.CopyBytes(date.ToBinary(), buffer, 0); // bytes 0-7: timestamp
  35:                 EndianBitConverter.Big.CopyBytes(_last++, buffer, 8);         // bytes 8-15: counter
  36:                 ms.SetLength(0);
  37:                 var writer = new StreamWriter(ms);          // not disposed: that would close ms too
  38:                 _serializer.Serialize(writer, kvp.Value);
  39:                 writer.Flush();                             // push the buffered JSON into ms
  40:                 ms.Position = 0;
  41:  
  42:                 data.Add(tx, key, ms);
  43:             }
  44:  
  45:             tx.State.Root.Add(tx, _lastKey, new MemoryStream(BitConverter.GetBytes(_last)));
  46:             tx.Commit();
  47:         }
  48:     }
  49:  
  50:     public IEnumerable<T> ScanRange<T>(DateTime start, DateTime end)
  51:     {
  52:         using (var tx = _storageEnvironment.NewTransaction(TransactionFlags.Read))
  53:         {
  54:             var data = tx.GetTree("data");
  55:             var startBuffer = new byte[16];
  56:             EndianBitConverter.Big.CopyBytes(start.ToBinary(), startBuffer, 0); // counter bytes stay 0
  57:             var startKey = new Slice(startBuffer);
  58:  
  59:             using (var it = data.Iterate(tx))
  60:             {
  61:                 var endBuffer = new byte[16];
  62:                 EndianBitConverter.Big.CopyBytes(end.ToBinary(), endBuffer, 0);
  63:                 EndianBitConverter.Big.CopyBytes(long.MaxValue, endBuffer, 8);  // largest counter value
  64:  
  65:                 it.MaxKey = new Slice(endBuffer);
  66:                 if (it.Seek(startKey) == false)
  67:                     yield break;
  68:                 do
  69:                 {
  70:                     var reader = it.CreateReaderForCurrent();
  71:                     using (var stream = reader.AsStream())
  72:                     {
  73:                         yield return _serializer.Deserialize<T>(new JsonTextReader(new StreamReader(stream)));
  74:                     }
  75:                 } while (it.MoveNext());
  76:             }
  77:         }
  78:     }
  79:  
  80:     public void Dispose()
  81:     {
  82:         _storageEnvironment.Dispose();
  83:     }
  84: }

In line 14, we create the data tree, which will hold the actual time series data, and in line 15 we read the last-key entry from the root tree, which I’ll explain in a bit.
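To see why the constructor restores the counter from last-key (falling back to 1 for a fresh store) instead of always starting at 1, here is a minimal sketch in plain C# with no Voron involved. The FakeStore and SeriesWriter classes are hypothetical stand-ins: a SortedDictionary plays the data tree, a Dictionary plays the root tree, keys are (date, counter) tuples, and writing to an existing key replaces its value.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the storage environment (not a Voron API):
// Data plays the "data" tree, Root plays the root tree holding "last-key".
public sealed class FakeStore
{
    public readonly SortedDictionary<(long Date, long Counter), string> Data =
        new SortedDictionary<(long Date, long Counter), string>();
    public readonly Dictionary<string, long> Root = new Dictionary<string, long>();
}

// Hypothetical writer mirroring the counter handling of DateTimeSeries.
public sealed class SeriesWriter
{
    private readonly FakeStore _store;
    private long _last;

    public SeriesWriter(FakeStore store)
    {
        _store = store;
        // Resume from the persisted counter, or start at 1 for a fresh store.
        _last = store.Root.TryGetValue("last-key", out long saved) ? saved : 1;
    }

    public void AddRange(IEnumerable<KeyValuePair<DateTime, string>> values)
    {
        foreach (var kvp in values)
        {
            // The indexer replaces the value if this (date, counter) key already exists.
            _store.Data[(kvp.Key.ToBinary(), _last++)] = kvp.Value;
        }
        // Persist the next counter to hand out, so a new writer continues from here.
        _store.Root["last-key"] = _last;
    }
}
```

If the constructor ignored the stored value and always started at 1, a second writer recording a reading for an already-used date would produce the key (date, 1) again and silently replace the earlier reading.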

The AddRange method in line 23 is probably the most interesting. We create a 16-byte key composed of the date of the entry and an incrementing counter. Note that we use big-endian encoding because it allows easy byte-string sorting: comparing the keys byte by byte gives the same order as comparing the numbers they encode. The implication of this sort of key is that the values are sorted by date, but if we have multiple values for the same timestamp, we don’t overwrite the data. It also takes care of late writes, since an entry’s position depends on its date, not on when it arrived. Along with adding the actual data, we record the updated counter under last-key, so if we need to restart, we’ll continue from where we left off.
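The key layout and the reason for big-endian encoding can be checked without Voron. This is a minimal sketch in plain C#; CompositeKey and ByteArrayComparer are hypothetical helpers rather than Voron types, and the comparer assumes the tree orders keys by unsigned, byte-by-byte comparison. One caveat: DateTime.ToBinary() also stores the DateTimeKind in the top bits of the value, so timestamps of different kinds would not sort chronologically against each other; the sketch assumes every timestamp is UTC.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: builds the 16-byte key described above
// (bytes 0-7: DateTime.ToBinary(), bytes 8-15: counter, both big-endian).
public static class CompositeKey
{
    public static byte[] Create(DateTime date, long counter)
    {
        var key = new byte[16];
        WriteBigEndian(date.ToBinary(), key, 0);
        WriteBigEndian(counter, key, 8);
        return key;
    }

    public static DateTime DateOf(byte[] key) => DateTime.FromBinary(ReadBigEndian(key, 0));
    public static long CounterOf(byte[] key) => ReadBigEndian(key, 8);

    // Most significant byte first, so comparing bytes left to right
    // matches numeric order for non-negative values.
    private static void WriteBigEndian(long value, byte[] buffer, int offset)
    {
        for (int i = 7; i >= 0; i--)
        {
            buffer[offset + i] = (byte)value;
            value >>= 8;
        }
    }

    private static long ReadBigEndian(byte[] buffer, int offset)
    {
        long value = 0;
        for (int i = 0; i < 8; i++)
            value = (value << 8) | buffer[offset + i];
        return value;
    }
}

// Unsigned, byte-by-byte (lexicographic) ordering of raw keys.
public sealed class ByteArrayComparer : IComparer<byte[]>
{
    public static readonly ByteArrayComparer Instance = new ByteArrayComparer();

    public int Compare(byte[] x, byte[] y)
    {
        int length = Math.Min(x.Length, y.Length);
        for (int i = 0; i < length; i++)
        {
            int diff = x[i].CompareTo(y[i]);
            if (diff != 0)
                return diff;
        }
        return x.Length.CompareTo(y.Length);
    }
}
```

Sorting the keys (10:05, 1), (10:06, 2) and a late (10:05, 3) with this comparer yields (10:05, 1), (10:05, 3), (10:06, 2): the late reading lands next to its neighbors in time, and the two 10:05 readings coexist. With little-endian encoding (for example BitConverter.GetBytes on a little-endian machine), counter 256 would sort before counter 1, because its first byte is 0x00.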

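Finally, we have the actual ScanRange method. Here we seek to the smallest possible key for the start date (the date followed by a zero counter) and set MaxKey to the largest possible key for the end date (the date followed by long.MaxValue), so the iteration stops right after the last entry stamped with the end date. Then it is just a matter of getting the values out.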
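The seek-and-stop logic can be sketched the same way. In this minimal sketch, a SortedList ordered by raw key bytes is a hypothetical in-memory stand-in for the data tree: a binary search plays the role of it.Seek(startKey), and the loop ends at the first key greater than the max key, which is how this sketch assumes MaxKey behaves. As in ScanRange, the start key uses a zero counter and the max key uses long.MaxValue, so every entry stamped exactly with the end date is included; timestamps are assumed to be UTC.

```csharp
using System;
using System.Collections.Generic;

public static class RangeScanSketch
{
    // Hypothetical in-memory stand-in for the "data" tree, ordered by raw key bytes.
    public static SortedList<byte[], string> NewStore() =>
        new SortedList<byte[], string>(new KeyComparer());

    // Same 16-byte layout as AddRange: big-endian timestamp, then big-endian counter.
    public static byte[] Key(DateTime date, long counter)
    {
        var key = new byte[16];
        Write(date.ToBinary(), key, 0);
        Write(counter, key, 8);
        return key;
    }

    public static IEnumerable<string> ScanRange(SortedList<byte[], string> data, DateTime start, DateTime end)
    {
        var comparer = new KeyComparer();
        byte[] startKey = Key(start, 0);           // smallest possible key for the start date
        byte[] maxKey = Key(end, long.MaxValue);   // largest possible key for the end date
        IList<byte[]> keys = data.Keys;

        // "Seek": binary search for the first key >= startKey.
        int lo = 0, hi = keys.Count;
        while (lo < hi)
        {
            int mid = lo + (hi - lo) / 2;
            if (comparer.Compare(keys[mid], startKey) < 0) lo = mid + 1;
            else hi = mid;
        }

        // Walk forward until a key passes maxKey.
        for (int i = lo; i < keys.Count && comparer.Compare(keys[i], maxKey) <= 0; i++)
            yield return data.Values[i];
    }

    private static void Write(long value, byte[] buffer, int offset)
    {
        for (int i = 7; i >= 0; i--)
        {
            buffer[offset + i] = (byte)value;
            value >>= 8;
        }
    }

    // Unsigned, byte-by-byte comparison (assumed to match how the tree orders keys).
    private sealed class KeyComparer : IComparer<byte[]>
    {
        public int Compare(byte[] x, byte[] y)
        {
            int length = Math.Min(x.Length, y.Length);
            for (int i = 0; i < length; i++)
            {
                int diff = x[i].CompareTo(y[i]);
                if (diff != 0) return diff;
            }
            return x.Length.CompareTo(y.Length);
        }
    }
}
```

With readings at 10:00, 10:05 (two of them), 10:10 and 10:15, scanning from 10:05 to 10:10 returns the two 10:05 readings and the 10:10 reading, in key order, while a range that falls between readings returns nothing.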

Pretty simple, I think.


Published at DZone with permission of Ayende Rahien, author and DZone MVB.
