Files
nmea2000/device.py

283 lines
11 KiB
Python

"""
NMEA2000 device
- devices detected on the bus
- initUid() is available for the own (local) device
TODO
- logging
"""
import time
import struct
from . import lookup
def set_to_lines(values, maxlen, indent=0):
    """Format *values* as comma-separated text wrapped into indented lines.

    Each value is converted with ``str()``. Items are joined with ``','``;
    every line except the last ends with a trailing comma so the list reads
    as continued on the next line.

    Args:
        values: Iterable of items to print (iteration order is kept).
        maxlen: Maximum total line length, including the indentation.
        indent: Number of spaces put in front of every line.

    Returns:
        List of lines (without newline characters); empty if *values*
        yields nothing. A single item longer than the available width is
        placed on a line of its own and is not split.
    """
    prefix = ' ' * indent
    width = maxlen - indent  # room left for the items after indenting
    lines = []
    current = ""
    for value in values:
        part = str(value)
        if not current:
            current = part
        # +1 for the joining comma and +1 for the trailing comma that is
        # appended if this line gets wrapped later on.
        elif len(current) + len(part) + 2 <= width:
            current += ',' + part
        else:
            lines.append(prefix + current + ',')
            current = part
    if current:
        lines.append(prefix + current)
    return lines
class Device():
    """An NMEA2000 device: one detected on the bus or the local node.

    The instance stores the address, the fields that make up the 64 bit
    NAME, product and configuration information and the transmit/receive
    PGN lists. For the local node it can also build the payloads for the
    address claim, product information and configuration information
    messages.
    """

    def __init__(self, address):
        """Create a device with default values for the given bus address."""
        # WIP: fields may still change!
        self.address = address  # May change at runtime
        self.lastseen = time.time()
        self.lastpinfo = None  # When was product info last received?
        self.lastcinfo = None  # When was configuration info last received?
        self.has_cinfo = True  # Further requests can be suppressed
        self.has_pgnlist = False  # PGN lists Tx,Rx available

        # Device info
        self.NAME = 0  # Filled by getNAME (address claim), 64bit integer
        self.uniqueid = None  # E.g. derived from the device serial number
        self.manufacturercode = 2046  # Open Boat Projects
        self.instance = 0  # default 0
        self.instlower = 0  # 3bit, ISO ECU Instance
        self.instupper = 0  # 5bit, ISO Function Instance
        self.sysinstance = 0  # used with bridged networks, default 0
        # Read by getNAME() and __str__(), so it must always exist.
        self.industrygroup = 4  # 3bit, 4 = Marine
        self.devicefunction = None
        self.deviceclass = None

        # Product info
        self.product = None  # Product name (ModelID)
        self.productcode = None  # Product code (16bit unsigned)
        self.serial = None  # Device serial number
        self.modelvers = None  # Hardware Version
        self.softvers = None  # Current Software Version
        self.n2kvers = None  # NMEA2000 Network Message Database Version
        self.certlevel = None  # NMEA2000 Certification Level
        self.loadequiv = None  # NMEA2000 LEN

        # Configuration info
        self.instdesc1 = None
        self.instdesc2 = None
        self.manufinfo = None

        # PGN lists, fill with setTxPGNs() / setRxPGNs()
        self.pgns_transmit = set()
        self.pgns_receive = set()

        # Additional data
        self.customname = None  # User defined device name

    def _ISOtime(self, epoch):
        """Return *epoch* as local 'YYYY-MM-DD HH:MM:SS', or 'n/a' if falsy."""
        if not epoch:
            return "n/a"
        return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(epoch))

    def initUid(self):
        """Derive a 21 bit unique id from the Raspberry Pi CPU serial number.

        The value is only returned; it is not stored in ``self.uniqueid``.

        Raises:
            OSError: if the device tree serial number file cannot be read.
            ValueError: if its content is not a hexadecimal number.
        """
        with open("/sys/firmware/devicetree/base/serial-number", "r") as f:
            hexid = f.read().rstrip('\x00')  # file content is NUL terminated
        return int(hexid, 16) & 0x001FFFFF  # 21bit mask

    def getNAME(self):
        """Build the 8 byte NAME used for the address claim.

        NAME is unique on the bus. Besides returning the bytes, the integer
        value is stored in ``self.NAME`` for later easy and fast access.

        Returns:
            bytearray with the 8 NAME bytes (little endian).

        Raises:
            TypeError: if uniqueid, devicefunction or deviceclass are
                still None.
        """
        data = bytearray()
        # 21 bit unique number + 11 bit manufacturer code
        data.extend(struct.pack(
            '<I',
            (self.uniqueid & 0x001fffff)
            | ((self.manufacturercode & 0x07ff) << 21)))
        data.append((self.instlower & 0x07) | ((self.instupper & 0x1f) << 3))
        data.append(self.devicefunction & 0xff)
        data.append((self.deviceclass & 0x7f) << 1)
        # Bit 7 is always set, then 3 bit industry group, 4 bit system instance
        data.append(0x80 | ((self.industrygroup & 0x07) << 4)
                    | (self.sysinstance & 0x0f))
        self.NAME = struct.unpack_from('<Q', data, 0)[0]
        return data

    def send_address_claim(self, bus):
        """Send an address claim (PGN 60928) for ``self.address`` on *bus*.

        Args:
            bus: python-can bus object used for sending.

        A failed transmission is only reported on stdout; no exception is
        propagated for ``can.CanError``.
        """
        # python-can is used here but not imported at module level in the
        # visible part of the file; a local import keeps this method usable.
        import can

        data = self.getNAME()
        print("sending address claim for '{}' with name {} ...".format(self.address, data))
        priority = 6
        pgn = 60928
        msg = can.Message(
            arbitration_id=(((priority << 18) + pgn) << 8) + self.address,
            data=data,
            is_extended_id=True,
        )
        try:
            bus.send(msg)
            print(f"Address claim sent to bus {bus.channel_info}")
        except can.CanError:
            print("Address claim NOT sent")

    def claim_address(self, bus):
        """Claim ``self.address`` and watch the bus for competing claims.

        After sending the claim the bus is observed for 250 ms. If another
        address claim with a lower NAME value is seen, the own address is
        incremented, the claim is sent again and the 250 ms window restarts.

        Args:
            bus: python-can bus object used for sending and receiving.

        Returns:
            True once the observation window passed without losing the
            address.
        """
        # NOTE(review): the sender's source address is not compared with
        # self.address, and the address range is not checked after
        # incrementing - confirm the intended behaviour.
        self.send_address_claim(bus)
        claim_ok = False
        t0 = time.monotonic()
        while time.monotonic() - t0 <= 0.25:
            # recv is blocking; when the timeout is reached msg is None
            msg = bus.recv(0.25)
            if not msg:
                # timeout without answer
                print("Timeout after claim. Keep selected address")
                claim_ok = True
                break
            baseid = (msg.arbitration_id & 0x3ffff00) >> 8
            pgn = baseid & 0xffff00
            if pgn != 60928:
                continue
            dest = baseid & 0x0000ff
            if dest == 0xff:
                print("Claim detected for 0xff (broadcast)")
                if self.NAME > struct.unpack_from('<Q', msg.data, 0)[0]:
                    # Collision and we have the lower priority:
                    # start over with a new address
                    print("Address is in use, trying next one")
                    t0 = time.monotonic()
                    self.address += 1
                    self.send_address_claim(bus)
                else:
                    print("Ignore collision because of our lower NAME")
            else:
                print(f"Answer: DEST={dest}")
                print("Data:", msg.data)
                if dest == self.address:
                    print("We are addressed: WIP!")
                else:
                    print("Frame ist not for us.")
        if not claim_ok:
            # The window elapsed while other traffic was received and the
            # address was not lost.
            print("claim seems ok after 250ms")
            claim_ok = True
        return claim_ok

    # TODO String handling
    # Spec says ASCII is only 0x20 to 0x7E. But some devices use full
    # Latin-1 / ISO-8859-1 code set.
    # Unicode:
    #   UTF-16 Little Endian
    #   UTF-8 eventually used in some PGNs?
    def getStringFix(self, text, maxlength=32, filler=b' '):
        """Encode *text* as control byte followed by a fixed length field.

        Args:
            text: String to encode; None is treated as empty string.
            maxlength: Length of the data field in bytes (the control byte
                is not counted).
            filler: Single byte used to pad the field.

        Returns:
            bytes of length ``maxlength + 1``: 0x01 plus ASCII data for
            pure ASCII text, otherwise 0x00 plus UTF-16 little endian data.
        """
        if not text:
            text = ''
        if text.isascii():
            encoded = text.encode('ascii', 'ignore')[:maxlength]
            return b'\x01' + encoded.ljust(maxlength, filler)
        # Convert to UTF-16 little endian; cut on a 2 byte boundary so no
        # code unit is split.
        encoded = text.encode('utf-16-le')[:maxlength & ~1]
        return b'\x00' + encoded.ljust(maxlength, filler)

    def getProductInfo(self):
        """Return the product information payload.

        The data is prepared to be sent out as fast packet. Strings are
        Latin-1 encoded, space padded and cut to 32 bytes; unset (None)
        fields are sent as empty strings and unset numbers as 0.

        Returns:
            bytearray with the payload.
        """
        # NOTE(review): every string is preceded by a 0x01 byte. Public PGN
        # descriptions list the product information strings as plain fixed
        # 32 byte fields - confirm whether these prefix bytes are intended.
        def field(text, length=32):
            raw = (text or '').encode('latin1', 'replace')[:length]
            return raw.ljust(length, b' ')

        print("Set Productinfo:", self.product)
        data = bytearray()
        data.extend(struct.pack('<H', self.n2kvers or 0))  # 2 Byte
        data.extend(struct.pack('<H', self.productcode or 0))  # 2 Byte
        for text in (self.product, self.softvers, self.modelvers, self.serial):
            data.append(0x01)  # ASCII String
            data.extend(field(text))
        data.append(0x00)  # Certification level
        data.append(0x00)  # Load equivalency 0=Not powered by bus
        return data

    def getConfigInfo(self):
        """Return the configuration information payload.

        Contains installation description 1 and 2 and the manufacturer
        information, each encoded with getStringFix() using 32 data bytes.

        Returns:
            bytearray with the payload.
        """
        print("Set Configinfo")
        data = bytearray()
        for text in (self.instdesc1, self.instdesc2, self.manufinfo):
            data.extend(self.getStringFix(text, 32))
        return data

    # PGN lists (mandatory)
    #    59904 ISO Request
    #    60928 ISO Address Claim
    #   126208 Group Function (Request / Command / Acknowledge)
    #   126464 PGN List (Transmit / Receive)
    #   126993 Heartbeat
    #   126996 Product Information
    #   126998 Configuration Information

    def setTxPGNs(self, pgnlist, mandatory=True):
        """Set the transmit PGN list.

        Args:
            pgnlist: Iterable of PGN numbers the device transmits.
            mandatory: If True, 60928, 126208, 126464 and 126996 are added.
        """
        self.pgns_transmit = set(pgnlist)
        if mandatory:
            # update() adds each PGN; add() would insert the tuple itself
            self.pgns_transmit.update((60928, 126208, 126464, 126996))

    def setRxPGNs(self, pgnlist, mandatory=True):
        """Set the receive PGN list.

        Args:
            pgnlist: Iterable of PGN numbers the device receives.
            mandatory: If True, 59904, 60928, 126208, 126996 and 126998
                are added.
        """
        self.pgns_receive = set(pgnlist)
        if mandatory:
            # update() adds each PGN; add() would insert the tuple itself
            self.pgns_receive.update((59904, 60928, 126208, 126996, 126998))

    @staticmethod
    def _lookup_name(table, *keys):
        """Return ``table[k1][k2]...`` or '*key error*' if a key is missing."""
        try:
            for key in keys:
                table = table[key]
        except KeyError:
            return "*key error*"
        return table

    def __str__(self):
        """Return a multi-line, human readable description of the device.

        Calls getNAME(), so ``self.NAME`` is recalculated and the fields
        used there have to be set.
        """
        devfnname = self._lookup_name(
            lookup.devicefunction, self.deviceclass, self.devicefunction)
        devclassname = self._lookup_name(lookup.deviceclass, self.deviceclass)
        igrpname = self._lookup_name(lookup.industrygroup, self.industrygroup)
        manufname = self._lookup_name(
            lookup.manufacturer, self.manufacturercode)
        certname = self._lookup_name(lookup.certlevel, self.certlevel)

        parts = [
            f"Device: {self.address} : '{self.product}'\n",
            " NAME: {} ({})\n".format(self.getNAME(), self.NAME),
            " last seen: {}\n".format(self._ISOtime(self.lastseen)),
            " Device info\n",
            f" Unique ID: {self.uniqueid}\n",
            f" Instance: {self.instance} ({self.instupper}/{self.instlower})\n",
            f" System instance: {self.sysinstance}\n",
            f" Device function: {devfnname} ({self.devicefunction})\n",
            f" Device class: {devclassname} ({self.deviceclass})\n",
            f" Industry group: {igrpname} ({self.industrygroup})\n",
            f" Manufacturer code: {manufname} ({self.manufacturercode})\n",
            " Product info at {}\n".format(self._ISOtime(self.lastpinfo)),
            f" Product Code: {self.productcode}\n",
            f" Product: {self.product}\n",
            f" Serial: {self.serial}\n",
            f" Model Version: {self.modelvers}\n",
            f" Software Version: {self.softvers}\n",
            f" NMEA2000 Version: {self.n2kvers}\n",
            f" Cert-Level: {certname} ({self.certlevel})\n",
            f" LEN: {self.loadequiv}\n",
            " Configuration info at {}\n".format(self._ISOtime(self.lastcinfo)),
        ]
        if self.has_cinfo:
            parts.append(f" Installation description 1: {self.instdesc1}\n")
            parts.append(f" Installation description 2: {self.instdesc2}\n")
            parts.append(f" Manufacturer info: {self.manufinfo}\n")
        else:
            parts.append(" not available\n")
        parts.append(" PGNs RX: \n{}\n".format(
            '\n'.join(set_to_lines(self.pgns_receive, 72, 4)) or 'n/a'))
        parts.append(" PGNs TX: \n{}\n".format(
            '\n'.join(set_to_lines(self.pgns_transmit, 72, 4)) or 'n/a'))
        return ''.join(parts)