"""
|
|
History Buffer
|
|
|
|
Permanent storage backed buffer for sensordata
|
|
Only supported at the moment: file system storage
|
|
|
|
Values can be 1 to 4 bytes in length
|
|
|
|
Header: 32 bytes of size
|
|
0 0x00 HB00 4 magic number
|
|
4 0x04 xxxxxxxxxxxxxxxx 16 name, space padded
|
|
20 0x14 n 1 byte size of values in buffer
|
|
21 0x15 mm 2 buffer size in count of values
|
|
23 0x17 dd 2 time step in seconds between values
|
|
25 0x19 tttt 4 unix timestamp of head
|
|
29 0x1d hh 2 head pointer
|
|
31 0x1f 0xff 1 header end sign
|
|
|
|
32 0x20 ... start of buffer data
|
|
|
|
|
|
Usage example: 7 hours of data collected every 75 seconds
|
|
|
|
def g_tick(n=1):
|
|
t = time.time()
|
|
count = 0
|
|
while True:
|
|
count += n
|
|
yield max(t + count - time.time(), 0)
|
|
|
|
hb = HistoryBuffer("test", 336, 75)
|
|
g = g_tick(hb.dt)
|
|
hb.filename = "/tmp/test.dat"
|
|
hb.begin()
|
|
while True:
|
|
time.sleep(next(g))
|
|
hb.add(measured_new_value)
|
|
hb.finish()
|
|
|
|
TODO
|
|
- Logging
|
|
- Additional backend: I2C FRAM module
|
|
- Sync to next tick after loading from storage
|
|
|
|
"""
|
|
|
|
import os
|
|
import time
|
|
import struct
|
|
|
|
class HistoryBuffer():
    """
    Ring buffer of fixed-width integer samples, persisted to a file.

    The file is a 32-byte header (see module docstring) followed by
    ``size`` values of ``bytesize`` bytes each, little-endian.
    """

    def __init__(self, name, size, delta_t):
        """
        Buffer can have an optional name of max. 16 characters.

        name    -- label stored in the file header (truncated to 16 chars)
        size    -- number of values the ring buffer holds
        delta_t -- time step in seconds between consecutive values
        """
        self.magic = b'HB00'
        self.name = name[:16] or ''
        # Width of one stored value in bytes (1..4).
        self.bytesize = 2
        self.size = size
        self.dt = delta_t
        # Unix timestamp of the most recently added value.
        self.headdate = int(time.time())
        # Index of the next slot to write; always kept in [0, size).
        self.head = 0
        self.buf = [0] * size
        self.filename = f"/tmp/hb{name}_{size}-{delta_t}.dat"
        # Backing file handle, opened by begin().
        self.fp = None

    def _encode(self, value):
        """Serialize one value as self.bytesize little-endian bytes."""
        return int(value).to_bytes(self.bytesize, 'little')

    def _append(self, value):
        """
        Store value at head, persist it, and advance/wrap head.

        Bypasses the rate check in add(); begin() uses it for gap
        filling (the old code called add(0), which the rate check
        rejected every time because headdate had just been reset).
        """
        self.buf[self.head] = value
        self.updatefile(value)
        self.head = (self.head + 1) % self.size

    def begin(self):
        """
        Open (creating if necessary) the backing file and load old data.

        Samples missing since the stored timestamp are zero-filled; if
        the stored data is older than the whole buffer it is discarded.
        Returns True on success, False on an incompatible data file.
        """
        # Check if data exists and format is correct
        if not os.path.exists(self.filename):
            self.createfile()
        elif not self.checkfile():
            print(f"Incompatible data file: {self.filename}")
            return False

        # Read old data to continue processing
        self.fp = open(self.filename, 'r+b')
        self.headdate = int(time.time())

        self.fp.seek(25)
        timestamp = struct.unpack('I', self.fp.read(4))[0]
        # Modulo guards against old files that stored an unwrapped
        # head == size (written by the previous updatefile).
        self.head = struct.unpack('H', self.fp.read(2))[0] % self.size

        # Fix difference between current time and data time
        missing = (self.headdate - timestamp) // self.dt
        if missing > self.size:
            # Too old -- start fresh.  (Old code: ``self.clearfile``
            # without parentheses, a silent no-op.)
            self.clearfile()
            self.head = 0
        else:
            # Usable data found: decode it, then zero-fill the gap.
            self.fp.seek(32)
            data = self.fp.read(self.bytesize * self.size)
            # Old loop iterated range(0, size, bytesize) and therefore
            # decoded only size/bytesize values; decode all of them.
            for i in range(self.size):
                d = i * self.bytesize
                self.buf[i] = int.from_bytes(data[d:d + self.bytesize], 'little')
            # Add empty data for missing steps.
            for _ in range(missing):
                self._append(0)
        return True

    def finish(self):
        """Close the backing file if it is open."""
        if self.fp is not None and not self.fp.closed:
            self.fp.close()

    def add(self, value):
        """
        Append a new sample to the ring buffer and the file.

        Returns True when stored, False when called before the next
        time step is due (with ~2% tolerance).
        """
        # Check if add request is perhaps too early.
        timestamp = int(time.time())
        if timestamp - self.headdate < self.dt * 0.98:  # a little bit of tolerance
            print("add new value too early, ignored")
            return False
        self.headdate = timestamp
        self._append(value)
        return True

    def get(self):
        """
        Return buffer in linear sequence, newest values first.
        """
        for i in range(self.head - 1, -1, -1):
            yield self.buf[i]
        for i in range(self.size - 1, self.head - 1, -1):
            yield self.buf[i]

    def getvalue(self, delta):
        """
        Return the single value recorded ``delta`` seconds ago.

        delta has to be smaller than self.dt * self.size.

        TODO check if value is missing, perhaps allow tolerance (+/- <n>)
        """
        # Modulo wraps negative offsets into the ring; the old mapping
        # ``size - index - 1`` was off by one and produced ``size``
        # (IndexError) for index == -1.
        index = (self.head - abs(delta) // self.dt) % self.size
        return self.buf[index]

    def getvalue3(self, delta):
        """
        Same as getvalue but return the mean with the two neighbor values.

        TODO check for missing values (=0)
        """
        index = (self.head - abs(delta) // self.dt) % self.size
        ixprev = (index - 1) % self.size
        ixnext = (index + 1) % self.size
        return round((self.buf[index] + self.buf[ixprev] + self.buf[ixnext]) / 3)

    def setname(self, newname):
        """
        Set new name in buffer and storage backend.
        """
        self.name = newname[:16] or ''
        self.fp.seek(4)
        # Old code wrote via a bare ``fp`` -- NameError; the handle
        # lives on self.
        self.fp.write(self.name.ljust(16, ' ').encode())

    def createfile(self):
        """
        Create a new data file from the current buffer content.
        """
        with open(self.filename, 'wb') as fp:
            fp.write(self.magic)
            fp.write(self.name.ljust(16, ' ').encode())
            fp.write(struct.pack('B', self.bytesize))
            fp.write(struct.pack('H', self.size))
            fp.write(struct.pack('H', self.dt))
            fp.write(struct.pack('I', self.headdate))
            fp.write(struct.pack('H', self.head))
            fp.write(b"\xff")  # header end
            # Values are stored little-endian so they round-trip with
            # the reader in begin(); the old 3-byte branch wrote raw
            # ints (TypeError) and in big-endian order.
            for val in self.buf:
                fp.write(self._encode(val))
        return True

    def checkfile(self):
        """
        Check if the file header matches the buffer metadata.

        Name is not taken into account because it is optional.
        """
        with open(self.filename, 'rb') as fp:
            header = fp.read(32)
        if len(header) < 32:
            print("Truncated header")
            return False
        magic = header[:4]
        if magic != self.magic:
            print(f"Invalid magic: {magic}")
            return False
        bs = header[20]
        if bs != self.bytesize:
            print(f"Invalid bytesize: {bs}")
            return False
        vc = struct.unpack('H', header[21:23])[0]
        if vc != self.size:
            # Old code called undefined ``eprint`` -- NameError.
            print(f"Invalid value count: {vc}")
            return False
        ts = struct.unpack('H', header[23:25])[0]
        if ts != self.dt:
            print(f"Invalid time step: {ts}")
            return False
        return True

    def updatefile(self, value):
        """
        Write value to the file and update the header accordingly.
        """
        pos = 32 + self.head * self.bytesize
        self.fp.seek(25)
        # Store the head already wrapped; the old code wrote head + 1
        # unwrapped, so a reload right after the last slot produced an
        # out-of-range head.
        self.fp.write(struct.pack('IH', self.headdate, (self.head + 1) % self.size))
        self.fp.seek(pos)
        self.fp.write(self._encode(value))

    def clearfile(self):
        """
        Clear the data part of the history file and reset the header.
        """
        self.fp.seek(25)
        self.fp.write(struct.pack('IH', int(time.time()), 0))
        self.fp.seek(32)
        # Old code used a bare ``fp`` (NameError), wrote int 0 to a
        # binary file (TypeError), and looped over the wrong count.
        self.fp.write(bytes(self.size * self.bytesize))
|
|
|
|
class History():
    """
    A history can consist of different time series with different
    temporal resolutions.

    TODO implement type (e.g. pressure humidity temp etc.) to identify data
    """

    def __init__(self, basename, delta_min):
        """
        basename  -- stem used to build the series data file names
        delta_min -- smallest unit of time in the series (seconds)
        """
        self.delta_t = delta_min  # smallest unit of time in the series
        # Maps delta_t (seconds) -> HistoryBuffer instance.
        self.series = dict()
        self.basepath = "/tmp"
        self.basename = basename

    def __str__(self):
        out = f"History {self.basename} (min. {self.delta_t}s) in {self.basepath}\n"
        for ser in self.series.values():
            out += f" Series: {ser.name} {ser.dt}s {ser.filename}\n"
        if not self.series:
            out += " No series found\n"
        return out

    def addSeries(self, name, size, delta_t):
        """
        Create and register a HistoryBuffer with the given resolution.

        Raises KeyError if a series with this delta t already exists and
        ValueError when delta t is smaller than, or not a multiple of,
        the minimum step.  Returns the new buffer.
        """
        if delta_t in self.series:
            raise KeyError(f"Error: delta t {delta_t} already exists")
        if delta_t < self.delta_t:
            raise ValueError(f"Error: delta t {delta_t} too small, minimum is {self.delta_t}")
        if delta_t % self.delta_t != 0:
            raise ValueError(f"Error: delta t have to be a multiple of {self.delta_t}")
        hb = HistoryBuffer(name, size, delta_t)
        histfilename = f"hb{self.basename}_{size}-{delta_t}.dat"
        hb.filename = os.path.join(self.basepath, histfilename)
        self.series[delta_t] = hb
        return hb

    def clear(self):
        """
        Drop all time series buffers.

        Closes each buffer's backing file first -- the old code left
        open file handles behind when the dict was cleared.
        """
        for ser in self.series.values():
            fp = getattr(ser, 'fp', None)
            if fp is not None and not fp.closed:
                fp.close()
        self.series.clear()
|