nmea2000/hbuffer.py

301 lines
9.7 KiB
Python

"""
History Buffer
Permanent storage backed buffer for sensordata
Only supported at the moment: file system storage
Values can be 1 to 4 bytes in length
Header: 32 bytes of size
0 0x00 HB00 4 magic number
4 0x04 xxxxxxxxxxxxxxxx 16 name, space padded
20 0x14 n 1 byte size of values in buffer
21 0x15 mm 2 buffer size in count of values
23 0x17 dd 2 time step in seconds between values
25 0x19 tttt 4 unix timestamp of head
29 0x1d hh 2 head pointer
31 0x1f 0xff 1 header end sign
32 0x20 ... start of buffer data
Usage example: 7 hours of data collected every 75 seconds
def g_tick(n=1):
t = time.time()
count = 0
while True:
count += n
yield max(t + count - time.time(), 0)
hb = HistoryBuffer("test", 336, 75)
g = g_tick(hb.dt)
hb.filename = "/tmp/test.dat"
hb.begin()
while True:
time.sleep(next(g))
hb.add(measured_new_value)
hb.finish()
TODO
- Logging
- Additional backend: I2C FRAM module
- Sync to next tick after loading from storage
"""
import os
import time
import struct
class HistoryBuffer():
def __init__(self, name, size, delta_t):
"""
Buffer can have an optional name of max. 16 characters
"""
self.magic = b'HB00'
self.name = name[:16] or ''
self.bytesize = 2
self.size = size
self.dt = delta_t
self.headdate = int(time.time())
self.head = 0
self.buf = [0 for _ in range(size)]
self.filename = f"/tmp/hb{name}_{size}-{delta_t}.dat"
self.fp = None
def begin(self):
# Check if data exists and format is correct
if not os.path.exists(self.filename):
self.createfile()
else:
if not self.checkfile():
print(f"Incompatible data file: {self.filename}")
return False
# Read old data to continue processing
self.fp = open(self.filename, 'r+b')
self.headdate = int(time.time())
self.fp.seek(25)
timestamp = struct.unpack('I', self.fp.read(4))[0]
self.head = struct.unpack('H', self.fp.read(2))[0]
self.fp.seek(32)
data = self.fp.read(self.bytesize * self.size)
# Fix difference between current time and data time
missing = (self.headdate - timestamp) // self.dt
if missing > self.size:
# too old start new
self.clearfile
self.head = 0
else:
# usable data found, fix missing
self.fp.seek(32)
data = self.fp.read(self.bytesize * self.size)
i = 0
for d in range(0, self.size, self.bytesize):
if self.bytesize == 1:
self.buf[i] = data[d]
elif self.bytesize == 2:
self.buf[i] = data[d] + data[d+1] * 256
elif self.bytesize == 3:
self.buf[i] = data[d] + (data[d+1] << 8) + (data[d+2] << 16)
elif self.bytesize == 4:
self.buf[i] = data[d] + (data[d+1] << 8) + (data[d+2] << 16) + (data[d+3] << 24)
i += 1
# add empty data for missing steps
for s in range(missing):
self.add(0)
return True
def finish(self):
if not self.fp.closed:
self.fp.close()
def add(self, value):
# check if add request perhaps too early
timestamp = int(time.time())
if timestamp - self.headdate < self.dt * 0.98: # a little bit of tolerance
print("add new value too early, ignored")
return False
self.headdate = timestamp
self.buf[self.head] = value
self.updatefile(value)
self.head += 1
if self.head == self.size:
self.head = 0
return True
def get(self):
"""
Return buffer in linear sequence, newest values first
"""
for i in range(self.head -1, -1, -1):
yield self.buf[i]
for i in range(self.size - 1, self.head -1, -1):
yield self.buf[i]
def getvalue(self, delta):
"""
Return a single value dt seconds ago
delta has to be smaller than self.dt * self.size
TODO check if value is missing, perhaps allow tolerance (+/- <n>)
"""
index = self.head - abs(delta) // self.dt
if index < 0:
index = self.size - index - 1
return self.buf[index]
def getvalue3(self, delta):
"""
same as getvalue but calculate mean with two neighbor values
TODO check for missing values (=0)
"""
index = self.head - abs(delta) // self.dt
if index < 0:
index = self.size - index - 1
ixprev = index - 1
if ixprev < 0:
ixprev = self.size - 1
ixnext = index + 1
if ixnext > self.size - 1:
ixnext = 0
return round((self.buf[index] + self.buf[ixprev] + self.buf[ixnext]) / 3)
def setname(self, newname):
"""
set new name in buffer and storage backend
"""
self.name = newname[:16] or ''
self.fp.seek(4)
fp.write(self.name.ljust(16, ' ').encode())
def createfile(self):
""""
Creates new file from current buffer
"""
with open(self.filename, 'wb') as fp:
fp.write(self.magic)
fp.write(self.name.ljust(16, ' ').encode())
fp.write(struct.pack('B', self.bytesize))
fp.write(struct.pack('H', self.size))
fp.write(struct.pack('H', self.dt))
fp.write(struct.pack('I', self.headdate))
fp.write(struct.pack('H', self.head))
fp.write(b"\xff") # header end
if self.bytesize == 1:
fp.write(bytes(self.buf))
elif self.bytesize == 2:
for val in self.buf:
fp.write(struct.pack('H', val))
elif self.bytesize == 3:
for val in self.buf:
fp.write((val >> 16) & 0xff)
fp.write((val >> 8) & 0xff)
fp.write(val & 0xff)
elif self.bytesize == 4:
for val in self.buf:
fp.write(struct.pack('I', val))
return True
def checkfile(self):
"""
Check if file header matches buffer metadata
Name is not taken into account because it is optional
"""
with open(self.filename, 'rb') as fp:
header = fp.read(32)
magic = header[:4]
if not (header[:4] == self.magic):
print(f"Invalid magic: {magic}")
return False
bs = header[20]
if not (bs == self.bytesize):
print(f"Invalid bytesize: {bs}")
return False
vc = struct.unpack('H', header[21:23])[0]
if not (vc == self.size):
eprint(f"Invalid value count: {vc}")
return False
ts = struct.unpack('H', header[23:25])[0]
if not (ts == self.dt):
eprint(f"Invalid time step: {ts}")
return False
return True
def updatefile(self, value):
"""
Write value to file and update header accordingly
"""
pos = 32 + self.head * self.bytesize
self.fp.seek(25)
self.fp.write(struct.pack('IH', self.headdate, self.head + 1))
self.fp.seek(pos)
if self.bytesize == 1:
self.fp.write(struct.pack('B', value))
elif self.bytesize == 2:
self.fp.write(struct.pack('H', value))
elif self.bytesize == 3:
self.fp.write((value >> 16) & 0xff)
self.fp.write((value >> 8) & 0xff)
self.fp.write(value & 0xff)
elif self.bytesize == 4:
self.fp.write(struct.pack('I', value))
def clearfile(self):
"""
Clear data part of history file
"""
self.fp.seek(25)
self.fp.write(struct.pack('IH', int(time.time()), 0))
fp.seek(32)
for p in range(32, self.size * self.bytesize):
fp.write(0)
class History():
"""
A history can consist of different time series with different
temporal resolutions
TODO implement type (e.g. pressure humidity temp etc.) to identify data
"""
def __init__(self, basename, delta_min):
self.delta_t = delta_min # smallest unit of time in the series
self.series = dict()
self.basepath = "/tmp"
self.basename = basename
def __str__(self):
out = f"History {self.basename} (min. {self.delta_t}s) in {self.basepath}\n"
n = 0
for ser in self.series.values():
out += f" Series: {ser.name} {ser.dt}s {ser.filename}\n"
n += 1
if n == 0:
out += " No series found\n"
return out
def addSeries(self, name, size, delta_t):
"""
Check whether a series already exists and throw an error if so.
The new delta t must also be divisible by delta_min
"""
if delta_t in self.series:
raise KeyError(f"Error: delta t {delta_t} already exists")
if delta_t < self.delta_t:
raise ValueError(f"Error: delta t {delta_t} too small, minimum is {self.delta_t}")
if delta_t % self.delta_t != 0:
raise ValueError(f"Error: delta t have to be a multiple of {self.delta_t}")
hb = HistoryBuffer(name, size, delta_t)
histfilename = f"hb{self.basename}_{size}-{delta_t}.dat"
hb.filename = os.path.join(self.basepath, histfilename)
self.series[delta_t] = hb
return hb
def clear(self):
# Clear all time series buffer
self.series.clear()