"""
History Buffer

Permanent storage backed buffer for sensor data.
Only supported at the moment: file system storage.

Values can be 1 to 4 bytes in length (unsigned integers).
All multi-byte fields, header and data, are stored little-endian.

Header: 32 bytes of size
  0  0x00  HB00              4  magic number
  4  0x04  xxxxxxxxxxxxxxxx 16  name, space padded
 20  0x14  n                 1  byte size of values in buffer
 21  0x15  mm                2  buffer size in count of values
 23  0x17  dd                2  time step in seconds between values
 25  0x19  tttt              4  unix timestamp of head
 29  0x1d  hh                2  head pointer
 31  0x1f  0xff              1  header end sign

 32  0x20  ...                  start of buffer data

Usage example: 7 hours of data collected every 75 seconds

    def g_tick(n=1):
        t = time.time()
        count = 0
        while True:
            count += n
            yield max(t + count - time.time(), 0)

    hb = HistoryBuffer("test", 336, 75)
    g = g_tick(hb.dt)
    hb.filename = "/tmp/test.dat"
    hb.begin()
    while True:
        time.sleep(next(g))
        hb.add(measured_new_value)
    hb.finish()

TODO
 - Logging
 - Additional backend: I2C FRAM module
 - Sync to next tick after loading from storage
"""

import os
import time
import struct

# struct formats for the value widths struct supports directly.
# 3-byte values are packed as 4 bytes and truncated (see _pack_value).
_VALUE_FORMATS = {1: '<B', 2: '<H', 4: '<I'}


class HistoryBuffer():
    """Fixed-size ring buffer of unsigned integers mirrored to a file.

    ``head`` is the index of the slot the NEXT value is written to, so
    the newest stored value lives at ``head - 1`` (modulo ``size``).
    """

    def __init__(self, name, size, delta_t, bytesize=2):
        """Create an empty in-memory buffer; no file is touched yet.

        name     -- optional name, truncated to max. 16 characters
        size     -- number of values the buffer holds (1..65535)
        delta_t  -- time step in seconds between values (1..65535)
        bytesize -- width of one stored value in bytes (1..4), default 2

        Raises ValueError if size, delta_t or bytesize is out of range.
        """
        if bytesize not in (1, 2, 3, 4):
            raise ValueError(f"Error: bytesize {bytesize} not in 1..4")
        # size and delta_t are stored as 16 bit header fields
        if not 1 <= size <= 0xffff:
            raise ValueError(f"Error: size {size} not in 1..65535")
        if not 1 <= delta_t <= 0xffff:
            raise ValueError(f"Error: delta t {delta_t} not in 1..65535")
        name = name or ''
        self.magic = b'HB00'
        self.name = name[:16]
        self.bytesize = bytesize
        self.size = size
        self.dt = delta_t
        self.headdate = int(time.time())
        self.head = 0
        self.buf = [0 for _ in range(size)]
        self.filename = f"/tmp/hb{name}_{size}-{delta_t}.dat"
        self.fp = None

    def begin(self):
        """Open the backing file and restore previously stored data.

        A missing file is created from the current buffer. Returns False
        (nothing is opened) if an existing file does not match this
        buffer's bytesize, size and time step, True otherwise.
        """
        # Check if data exists and format is correct
        if not os.path.exists(self.filename):
            self.createfile()
        elif not self.checkfile():
            print(f"Incompatible data file: {self.filename}")
            return False

        # Read old data to continue processing
        self.finish()  # do not leak a handle if begin() is called twice
        self.fp = open(self.filename, 'r+b')
        self.headdate = int(time.time())
        self.fp.seek(25)
        timestamp, head = struct.unpack('<IH', self.fp.read(6))
        # Files written by older versions may contain head == size
        self.head = head % self.size

        # Fix difference between current time and data time
        missing = (self.headdate - timestamp) // self.dt
        if missing > self.size:
            # too old, start new
            self.clearfile()
            self.buf = [0 for _ in range(self.size)]
            self.head = 0
        else:
            # usable data found, load it
            width = self.bytesize
            nbytes = width * self.size
            self.fp.seek(32)
            # pad a truncated data area with zeros instead of failing
            data = self.fp.read(nbytes).ljust(nbytes, b'\x00')
            self.buf = [int.from_bytes(data[pos:pos + width], 'little')
                        for pos in range(0, nbytes, width)]
            # add empty data for missing steps; bypass add() because its
            # "too early" check would reject every one of them
            for _ in range(missing):
                self._store(0)
        return True

    def finish(self):
        """Close the backing file if it is open."""
        if self.fp is not None and not self.fp.closed:
            self.fp.close()

    def add(self, value):
        """Append a value at the head position and persist it.

        Returns False (value ignored) if less than about one time step
        has passed since the previous value, True otherwise.
        """
        # check if add request perhaps too early
        timestamp = int(time.time())
        if timestamp - self.headdate < self.dt * 0.98:
            # a little bit of tolerance
            print("add new value too early, ignored")
            return False
        self.headdate = timestamp
        self._store(value)
        return True

    def _store(self, value):
        """Write value at head to file and memory, then advance head."""
        self.updatefile(value)  # raises before memory is modified
        self.buf[self.head] = value
        self.head = (self.head + 1) % self.size

    def get(self):
        """Return buffer in linear sequence, newest values first."""
        for i in range(self.head - 1, -1, -1):
            yield self.buf[i]
        for i in range(self.size - 1, self.head - 1, -1):
            yield self.buf[i]

    def _index_ago(self, delta):
        """Return the buffer index ``delta // dt`` steps before head."""
        steps = int(abs(delta) // self.dt)
        return (self.head - steps) % self.size

    def getvalue(self, delta):
        """Return a single value delta seconds ago.

        delta has to be smaller than self.dt * self.size; larger values
        wrap around the ring. One full time step (delta == dt) addresses
        the newest stored value.
        TODO check if value is missing, perhaps allow tolerance (+/- )
        """
        return self.buf[self._index_ago(delta)]

    def getvalue3(self, delta):
        """Same as getvalue but return the rounded mean of the value and
        its two ring neighbors.

        TODO check for missing values (=0)
        """
        index = self._index_ago(delta)
        ixprev = (index - 1) % self.size
        ixnext = (index + 1) % self.size
        return round((self.buf[index] + self.buf[ixprev] + self.buf[ixnext]) / 3)

    def _encoded_name(self):
        """Return the name as exactly 16 bytes, space padded."""
        # truncate after encoding: multi-byte characters must not be
        # allowed to push the following header fields out of place
        return self.name.encode()[:16].ljust(16, b' ')

    def setname(self, newname):
        """Set new name in buffer and, if a file is open, in storage."""
        self.name = (newname or '')[:16]
        if self.fp is not None and not self.fp.closed:
            self.fp.seek(4)
            self.fp.write(self._encoded_name())
            self.fp.flush()

    def _pack_value(self, value):
        """Return value as ``bytesize`` little-endian bytes.

        Raises struct.error if value does not fit.
        """
        if self.bytesize == 3:
            if not 0 <= value <= 0xffffff:
                raise struct.error("3 byte value requires 0 <= number <= 16777215")
            return struct.pack('<I', value)[:3]
        return struct.pack(_VALUE_FORMATS[self.bytesize], value)

    def createfile(self):
        """Create a new file from the current buffer. Returns True."""
        with open(self.filename, 'wb') as fp:
            fp.write(self.magic)
            fp.write(self._encoded_name())
            # bytesize, size, time step, head timestamp, head, end sign
            fp.write(struct.pack('<BHHIHB', self.bytesize, self.size,
                                 self.dt, self.headdate, self.head, 0xff))
            fp.write(b''.join(self._pack_value(val) for val in self.buf))
        return True

    def checkfile(self):
        """Check if file header matches buffer metadata.

        Name is not taken into account because it is optional.
        Returns True if magic, bytesize, size and time step all match.
        """
        with open(self.filename, 'rb') as fp:
            header = fp.read(32)
        if len(header) < 32:
            print(f"Invalid header length: {len(header)}")
            return False
        magic = header[:4]
        if magic != self.magic:
            print(f"Invalid magic: {magic}")
            return False
        bs = header[20]
        if bs != self.bytesize:
            print(f"Invalid bytesize: {bs}")
            return False
        vc = struct.unpack('<H', header[21:23])[0]
        if vc != self.size:
            print(f"Invalid value count: {vc}")
            return False
        ts = struct.unpack('<H', header[23:25])[0]
        if ts != self.dt:
            print(f"Invalid time step: {ts}")
            return False
        return True

    def updatefile(self, value):
        """Write value at the head slot and update the header accordingly.

        The header receives the current head timestamp and the position
        of the NEXT slot, wrapped to the buffer size.
        """
        # pack first so an invalid value cannot leave a half-updated file
        payload = self._pack_value(value)
        next_head = (self.head + 1) % self.size
        self.fp.seek(25)
        self.fp.write(struct.pack('<IH', self.headdate, next_head))
        self.fp.seek(32 + self.head * self.bytesize)
        self.fp.write(payload)
        # storage is meant to be permanent: do not keep samples in the
        # process-local write buffer until close
        self.fp.flush()

    def clearfile(self):
        """Clear data part of history file and reset head in the header."""
        self.fp.seek(25)
        self.fp.write(struct.pack('<IH', int(time.time()), 0))
        self.fp.seek(32)
        self.fp.write(bytes(self.size * self.bytesize))
        self.fp.flush()


class History():
    """
    A history can consist of different time series with different
    temporal resolutions.

    TODO implement type (e.g. pressure humidity temp etc.) to identify data
    """

    def __init__(self, basename, delta_min):
        """Create an empty history.

        basename  -- used to build the file name of every series
        delta_min -- smallest unit of time (seconds) in the series
        """
        self.delta_t = delta_min  # smallest unit of time in the series
        self.series = dict()      # maps delta t -> HistoryBuffer
        self.basepath = "/tmp"
        self.basename = basename

    def __str__(self):
        out = f"History {self.basename} (min. {self.delta_t}s) in {self.basepath}\n"
        for ser in self.series.values():
            out += f"  Series: {ser.name} {ser.dt}s {ser.filename}\n"
        if not self.series:
            out += "  No series found\n"
        return out

    def addSeries(self, name, size, delta_t):
        """Create, register and return a new HistoryBuffer series.

        Raises KeyError if a series with this delta t already exists and
        ValueError if delta t is smaller than, or not a multiple of, the
        minimum delta t of this history.
        """
        if delta_t in self.series:
            raise KeyError(f"Error: delta t {delta_t} already exists")
        if delta_t < self.delta_t:
            raise ValueError(f"Error: delta t {delta_t} too small, minimum is {self.delta_t}")
        if delta_t % self.delta_t != 0:
            raise ValueError(f"Error: delta t have to be a multiple of {self.delta_t}")
        hb = HistoryBuffer(name, size, delta_t)
        histfilename = f"hb{self.basename}_{size}-{delta_t}.dat"
        hb.filename = os.path.join(self.basepath, histfilename)
        self.series[delta_t] = hb
        return hb

    def clear(self):
        """Remove all time series from this history (files are kept)."""
        self.series.clear()