OBP60v/obp60v.py

714 lines
25 KiB
Python
Executable File

#!/usr/bin/env python
"""
Virtuelles Multifunktionsgerät OBP60v
Zwei Displayvarianten
1. Fliegendes OBP60 auf großem Bildschirm. 400x300 Display
2. Fullscreen Display skaliert mit 1,6 ergibt 640x480
mit Platz für große Touch-Flächen am rechten Rand
Das Gerät kann Daten von NMEA2000 und NMEA0183 empfangen, sowie von
einem lokal angeschlossenen GPS-Empfänger.
NMEA2000
deviceclass: 120 - Display
devicefunction: 130 - Display
Benötigte Pakete:
python3-cairo python3-gi python3-gi-cairo gir1.2-rsvg-2.0
python3-astral
Um transparente Darstellung unter Openbox zu erhalten muß xcompmgr
installiert und konfiguriert werden.
Wenn ein lokaler GPS-Empfänger angeschlossen ist, so kann dieser genutzt
werden. Zusätzlich benötigte Pakete:
python-serial python3-nmea2
Für Unterstützung des BME280-Sensors sind die Bibliotheken smbus2 und
bme280 erforderlich.
Routen und Wegepunkte können von OpenCPN empfangen werden. Dazu muß eine
passende serielle Schnittstelle für NMEA0183-Ausgang definiert werden.
Im System kann diese in der Datei rc.local aktiviert werden:
# Create virtual serial connection
socat pty,rawer,echo=0,group-late=dialout,mode=0660,link=/dev/ttyV0 \
pty,rawer,echo=0,group-late=dialout,mode=0660,link=/dev/ttyV1 &
OpenCPN sendet dann Datensätze über ttyV0 und dieses Programm
empfängt sie über ttyV1.
Die Wegepunkte werden in OpenCPN im Standard auf 6 Zeichen gekürzt.
Über die Konfigurationsdatei Settings / MaxWaypointNameLength=<nn>
ist es möglich einen anderen Wert einzustellen im Bereich zwischen
3 und 32. Lt. NMEA0183 ist die maximale Länge 10 Zeichen.
Zeichensätze müssen, sofern sie noch nicht vorhanden sind in
/usr/local/share/fonts abgelegt werden
Buttonlayout:
[ 1 ] [ 2 ] [ 3 ] [ 4 ] [ 5 ] [ 6 ]
Button 6 ist reserviert für Beleuchtung ILUM
Standardbelegung der Tasten
Button 1: Mode
Button 2: Abbruch
Button 3: zurück
Button 4: weiter
Button 5: Ok/Menu
Verlagerung der Seitenanzeige in den Titel als Ziffer in Klammern
Vergleich zum Garmin GMI20:
[ Zurück ] [ Hoch ] [ Menü ] [ Runter ] [ on/off ]
Vergleich zum Raymarine I70s:
[ ON/ILUM ] [ Hoch ] [ Runter ] [ MENU ]
Button 1 wird für AVG verwendet auf mehreren Seiten
Button 5 wird für Trend TRND verwendet
Änderungsprotokoll
==================
Version Datum Änderung(en) von
-------- ----------- ------------------------------------------------------ ----
0.1 2024-10-31 Entwicklung begonnen tho
0.2 2024-12-24 Veröffentlichung als Git-Repository tho
0.3 2025 WIP tho
"""
import os
import sys
import configparser
from setproctitle import setproctitle, setthreadtitle
import gi
gi.require_version('Gtk', '3.0')
gi.require_version('Rsvg', '2.0')
from gi.repository import GLib, Gtk, Gdk, Rsvg
import cairo
import math
import threading
import can
import serial
import smbus2
import pynmea2
import bme280
import math
import time
from datetime import datetime
from nmea2000 import Device, BoatData, History, HistoryBuffer
from nmea2000 import parser
import pages
import struct
__author__ = "Thomas Hooge"
__copyright__ = "Copyleft 2024-2025, all rights reversed"
__version__ = "0.2"
__email__ = "thomas@hoogi.de"
__status__ = "Development"
cfg = {
'cfgfile': 'obp60v.conf',
'imgpath': os.path.join(sys.path[0], 'images'),
'deviceid': 100,
'manufcode': 2046, # Open Boat Projects (OBP)
'devfunc': 120, # Display
'devclass': 120, # Display
'industrygroup': 4, # Marine
'gps': False,
'bme280': False
}
def rxd_n2k(device):
setthreadtitle("N2Klistener")
bus = can.Bus(interface='socketcan', channel=device, bitrate=250000);
wip = False
sc = 0
nf = 0
while not shutdown:
msg = bus.recv(2)
if not msg:
continue
priority = (msg.arbitration_id & 0x1c000000) >> 26
source = msg.arbitration_id & 0x000000ff
pgn = (msg.arbitration_id & 0x3ffff00) >> 8
match pgn:
case 129025:
# Position
#lat = struct.unpack_from('<l', msg.data, 0)[0] * 1e-07
#lon = struct.unpack_from('<l', msg.data, 4)[0] * 1e-07
#boatdata.setValue("LAT", lat)
#boatdata.setValue("LON", lon)
parser.parse_129025(msg.data, boatdata)
case 127505:
# Fluid level
#fluidtype = msg.data[0] >> 4
#instance = msg.data[0] & 0x0f
#level = struct.unpack_from('<H', msg.data, 1)[0] * 0.004
#capacity = struct.unpack_from('<L', msg.data, 2)[0] * 0.1
#boatdata.tank[instance].capacity = capacity
#boatdata.tank[instance].volume = level
#boatdata.tank[instance].fluidtype = fluidtype
parser.parse_127505(msg.data, boatdata)
case 127508:
# Battery status
#instance = msg.data[0]
#boatdata.voltage = (msg.data[2] * 256 + msg.data[1]) * 0.01 # Spannung
#ampere = (msg.data[4] * 256 + msg.data[3]) * 0.1 # Stromstärke
#temp = (msg.data[6] * 256 + msg.data[5]) * 0.01 - 273.15 # Temperatur
#sid = msg.data[7]
parser.parse_127508(msg.data, boatdata)
case 129540:
# GNS sats in view
# TODO es kann mehrere Geräte geben die senden
# - source beachten (muß gleich bleiben)
# - gemischte Reihenfolge der Pakete ermöglichen
sc = (msg.data[0] & 0xf0) >> 5
fc = msg.data[0] & 0x1f
if not wip:
if fc != 0:
continue
source0 = source # muß über das Fast-packet konstant bleiben
sc0 = sc # -"-
fc0 = fc # dieser Zähler wird inkrementiert
datalen = msg.data[1]
nf = math.ceil((datalen - 6) / 7) + 1
buf129540 = msg.data[2:]
wip = True
else:
if (source == source0) and (sc == sc0) and (fc == fc0 + 1):
buf129540.extend(msg.data[1:8])
fc0 = fc
else:
# Dieser Frame paßt nicht
#print("PGN 129540: sc/fc mismatch")
pass
if fc == nf:
wip = False
parser.parse_129540(buf129540, boatdata)
case _:
pass
bus.shutdown()
def rxd_0183(devname):
# Prüfe ob NMEA0183-Port vorhanden ist und sich öffnen läßt
try:
ser = serial.Serial(devname, 115200, timeout=3)
except serial.SerialException as e:
print("NMEA0183 serial port not available")
return
setthreadtitle("0183listener")
while not shutdown:
try:
raw = ser.readline().decode('ascii')
msg = pynmea2.parse(raw)
except pynmea2.nmea.ParseError:
print(f"NMEA0183: Parse-Error: {raw}")
continue
# sentence_type kann fehlen
try:
stype = msg.sentence_type
except:
print(f"NMEA0183: Sentence type missing: {raw}")
continue
if stype == 'GLL':
boatdata.setValue("LAT", msg.latitude)
boatdata.setValue("LON", msg.longitude)
elif stype == 'VTG':
boatdata.setValue("COG", int(msg.true_track))
#TODO klären was für Typen hier ankommen können
# bytearray, str, decimal.Decimal?
sog = float(msg.spd_over_grnd_kts)
#str von OpenCPN: sog = float(msg.spd_over_grnd_kts[:-1])
boatdata.setValue("SOG", sog)
elif stype == 'VHW':
boatdata.setValue("STW", float(msg.water_speed_knots))
elif stype == 'WPL':
# Wegepunkt
print(msg.fields)
elif stype == 'RTE':
# Route
print(msg.fields)
else:
print(msg)
ser.close()
def rxd_gps(devname, devspeed):
# Prüfe ob GPS-Port vorhanden ist und sich öffnen läßt
try:
ser = serial.Serial(devname, devspeed, timeout=3)
except serial.SerialException as e:
print("GPS serial port not available")
return
setthreadtitle("GPSlistener")
while not shutdown:
try:
msg = pynmea2.parse(ser.readline().decode('ascii'))
except pynmea2.nmea.ParseError:
continue
if msg.sentence_type == 'GLL':
boatdata.setValue("LAT", msg.latitude)
boatdata.setValue("LON", msg.longitude)
else:
print(msg)
print(msg.fields)
ser.close()
def datareader(histpath, history):
"""
Daten zu fest definierten Zeitpunkten lesen
Die Schleife läuft einmal alle <n> Sekunden immer zum gleichen Zeitpunkt.
Die Nutzlast sollte demnach weniger als eine Sekunde Laufzeit haben.
Falls durch eine außergewöhnliche Situation doch einmal mehr als eine
Sekunde benötigt werden sollte, gleicht sich das in den darauffolgenden
Durchläufen wieder aus.
"""
setthreadtitle("datareader")
# Speicherpfad für Meßwertdaten
if not os.path.exists(histpath):
os.makedirs(histpath)
history.basepath = histpath
# Serien initialisieren
history.addSeries("BMP280-75", 336 ,75)
history.addSeries("BMP280-150", 336 , 150)
history.addSeries("BMP280-300", 336 , 300)
history.addSeries("BMP280-600", 336 , 600)
history.addSeries("BMP280-900", 336 , 900)
for s in history.series.values():
s.begin()
def g_tick(n=1):
t = time.time()
count = 0
while True:
count += n
yield max(t + count - time.time(), 0)
g = g_tick(1)
n = 0
while not shutdown:
time.sleep(next(g))
# BME280 abfragen
if cfg['bme280']:
sensordata = bme280.sample(smbus, cfg['bme280_address'], cfg['bme280_cp'])
# Aktuellen Wert merken
boatdata.setValue("xdrTemp", sensordata.temperature)
boatdata.setValue("xdrPress", sensordata.pressure)
boatdata.setValue("xdrHum", sensordata.humidity)
# Historie schreiben
pval = int(sensordata.pressure *10)
for k, v in history.series.items():
if n % k == 0:
v.add(pval)
n += 1
for s in history.series.values():
s.finish()
class Frontend(Gtk.Window):
def __init__(self, cfg, device, boatdata, profile):
super().__init__()
self.owndev = device
self.boatdata = boatdata
self._config = cfg['_config']
self._cfgfile = cfg['cfgfile']
self._fullscreen = cfg['guistyle'] == 'fullscreen'
self._mouseptr = cfg['mouseptr']
self.connect("delete-event", self.on_delete)
self.connect("destroy", self.on_destroy)
if self._fullscreen:
self.fullscreen()
#self.set_size_request(800, 480)
# Schaltflächen am unteren Bildschirmrand sind berührbar,
# zusätzlich gibt es die großen Flächen am rechten Rand
self.button_round = False
self.button = { # linke obere Ecke
1: (640, 96),
2: (640, 160),
3: (640, 224),
4: (640, 288),
5: (640, 352),
6: (640, 416)
}
self.button_w = 160
self.button_h = 64
else:
if cfg['win_x'] >= 0 and cfg['win_y'] >= 0:
self.move(cfg['win_x'], cfg['win_y'])
self.button_round = True
# Runde Tasten wie beim Original-Gerät
self.button = { # Mittelpunkt
1: (75, 485),
2: (150, 492),
3: (227, 496),
4: (306, 496),
5: (382, 492),
6: (459, 485)
}
self.button_radius = 30
self.set_position(Gtk.WindowPosition.CENTER)
self.set_size_request(530, 555)
self.set_title("OBP60 virt")
self.set_app_paintable(True)
self.set_decorated(False)
self.set_keep_above(True)
self.screen = self.get_screen()
self.visual = self.screen.get_rgba_visual()
if (self.visual is not None and self.screen.is_composited()):
self.set_visual(self.visual)
handle = Rsvg.Handle()
if self._fullscreen:
self._svg = handle.new_from_file(os.path.join(sys.path[0], "fullscreen.svg"))
else:
self._svg = handle.new_from_file(os.path.join(sys.path[0], "obp60.svg"))
self.connect("draw", self.on_draw)
self.da = Gtk.DrawingArea()
self.da.add_events(Gdk.EventMask.BUTTON_PRESS_MASK|Gdk.EventMask.BUTTON_RELEASE_MASK)
self.add(self.da)
self.da.connect("draw", self.da_draw)
self.da.connect('button-press-event', self.da_button_press)
self.da.connect('button-release-event', self.da_button_release)
self.button_clicked = 0 # Geklickter Button vor Loslassen
self.keylock = False
self.pages = profile
self.pageno = 1
self.curpage = self.pages[self.pageno]
print("Wische von 2 nach 1 für Programmende")
def on_realize(self, widget):
if self._fullscreen and not self._mouseptr:
# Mauszeiger ggf. ausschalten
self.get_window().set_cursor(Gdk.Cursor(Gdk.CursorType.BLANK_CURSOR))
def run(self):
GLib.timeout_add_seconds(2, self.on_timer)
self.show_all()
Gtk.main()
def on_timer(self):
# Boatdata validator
boatdata.updateValid(5)
# Tastaturstatus an Seite durchreichen
self.curpage.keylock = self.keylock
# Neuzeichnen
self.queue_draw()
return True
def on_draw(self, widget, ctx):
# Fenstertransparenz
ctx.set_source_rgba(0, 0, 0, 0)
ctx.paint()
def da_draw(self, widget, ctx):
viewport = Rsvg.Rectangle()
viewport.x = 0
viewport.y = 0
if self._fullscreen:
viewport.width = 800
viewport.height = 480
else:
viewport.width = 530
viewport.height = 555
self._svg.render_document(ctx, viewport)
ctx.set_source_rgb(1.0, 0, 0)
if not self._fullscreen:
ctx.translate(64, 95) # Koordinatenursprung auf virtuellen Displaybereich setzen
ctx.rectangle(0, 0, 400, 300)
ctx.clip()
else:
ctx.scale(1.6, 1.6)
ctx.set_source_rgb(0, 0, 0) # Schwarz auf Weiß
# Heartbeat umschalten
if self.curpage.header:
self.curpage.draw_header(ctx)
self.curpage.draw(ctx)
if self.curpage.footer:
self.curpage.draw_footer(ctx)
def da_button_press(self, widget, event):
# Es gibt eine Liste mit Objekten und hier wird
# geprüft ob und in welches Objekt geklickt wurde
# Die eigentliche Funktion wird beim Loslassen ausgelöst.
# Damit sind Wischgesten simulierbar
self.button_clicked = 0
if self.button_round:
# Horizontale runde Buttons
if (event.x < self.button[1][0] - self.button_radius or event.x > self.button[6][0] + self.button_radius):
return True
if (event.y < self.button[1][1] - self.button_radius or event.y > self.button[3][1] + self.button_radius):
return True
for b, v in self.button.items():
diff = math.sqrt((event.x - v[0])**2 + (event.y - v[1])**2)
if diff < self.button_radius:
self.button_clicked = b
break
else:
# Vertikale eckige Buttons
if (event.x < self.button[1][0]) or (event.x > self.button[1][0] + self.button_w):
return True
if (event.y < self.button[1][1]) or (event.y > self.button[6][1] + self.button_h):
return True
for b, v in self.button.items():
if event.x >= v[0] and event.x <= v[0] + self.button_w and \
event.y >= v[1] and event.y <= v[1] + self.button_h:
self.button_clicked = b
break
return True
def da_button_release(self, widget, event):
# Hier sind die eigentlichen Tastenfunktionen
#
# Die Auswertung ist abhängig von der jew. angezeigten Seite
# Jede Seite kann eine Methode "handle_key" implementieren
# Falls der Rückgabewert "True" ist, hat die Seite die Taste
# verarbeitet, die Funktion hier wird damit unterdrückt.
# TODO
if self.button_round:
if (event.x < self.button[1][0] - self.button_radius or event.x > self.button[6][0] + self.button_radius):
return True
if (event.y < self.button[1][1] - self.button_radius or event.y > self.button[3][1] + self.button_radius):
return True
selected = 0
for b, v in self.button.items():
diff = math.sqrt((event.x - v[0])**2 + (event.y - v[1])**2)
if diff < self.button_radius:
selected = b
break
else:
if event.x < self.button[1][0] or event.x > self.button[6][0] + self.button_w:
return True
if event.y < self.button[1][1] or event.y > self.button[6][1] + self.button_h:
return True
selected = 0
for b, v in self.button.items():
if event.x >= v[0] and event.x <= v[0] + self.button_w and \
event.y >= v[1] and event.y <= v[1] + self.button_h:
selected = b
break
if self.keylock:
# Bei Tastensperre einzige Möglichkeit: Tastensperre ausschalten
if selected == 6 and self.button_clicked == 1:
self.keylock = False
return True
if selected == 1:
if self.button_clicked == 2:
# Programmende bei Klicken auf 2 und loslassen auf 1
self.get_window().set_cursor(Gdk.Cursor(Gdk.CursorType.WATCH))
self.on_destroy(self)
elif self.button_clicked == 6:
# Klick auf 6 und loslassen auf 1 ist Tastatursperre
self.keylock = True
else:
self.curpage.handle_key(1)
elif selected == 2:
# Abbruch/Zurück
self.curpage.handle_key(2)
elif selected == 3:
# runter / eine Seite vor
if not self.curpage.handle_key(3):
if self.pageno > 1:
self.pageno -= 1
else:
self.pageno = len(self.pages) - 1
self.curpage = self.pages[self.pageno]
elif selected == 4:
if not self.curpage.handle_key(4):
if self.pageno < len(self.pages) - 1:
self.pageno += 1
else:
self.pageno = 1
# hoch / eine Seite zurück
self.curpage = self.pages[self.pageno]
elif selected == 5:
# Ok/Menü
if self.button_clicked == 4:
# Umschalten zur Systemseite
self.curpage = self.pages[0]
else:
self.curpage.handle_key(5)
elif selected == 6:
if not self.curpage.handle_key(6):
# Backlight on/off
self.curpage.backlight = not self.curpage.backlight
return True
def on_delete(self, widget, event):
# Einhängepunkt für zukünftige Erweiterungen
# Hier kann der Vorgang noch durch z.B. Benutzerinteraktion
# abgebrochen werden
print("Window delete event")
return False
def on_destroy(self, widget):
# Letzte Fensterposition speichern
position = self.get_position()
if not self._config.has_section('gui'):
config.add_section('gui')
self._config.set('gui', 'win_x', str(position[0]))
self._config.set('gui', 'win_y', str(position[1]))
with open(self._cfgfile, 'w') as fh:
self._config.write(fh)
Gtk.main_quit()
def init_profile(config, cfg, boatdata):
'''
config: Configparser-Objekt
cfg: Laufende Programmkonfiguration
Die Liste und Anordnung der Seiten nennen wir "Profil"
Seiten-Profil aus Konfiguration erstellen
Seite Nummer 0 ist immer die Systemseite. Diese ist nicht
über die normale Seitenreihenfolge erreichbar, sondern
durch eine spezielle Tastenkombination/Geste.
TODO Prüfungen einbauen:
Fortlaufende Seitennummern ab 1
Fortlaufende Wertenummern ab 1
Maximalwerte nicht überschreiten
'''
pages_max = config.getint('pages', 'number_of_pages')
# Suche alle Abschnitte, die mit "page" beginnen
sects = config.sections()
sects.remove('pages')
pagedef = {}
n = 0
for s in sects:
if s.startswith('page'):
# Nummer und Art ermitteln
pageno = int(s[4:])
pagedef[pageno] = {'type': config.get(s, "type")}
# Hole ein bin maximal 4 Werte je Seite
values = {}
valno = 1
for i in (1, 2, 3, 4):
try:
values[i] = config.get(s, f"value{i}")
except configparser.NoOptionError:
break
pagedef[pageno]['values'] = values
n += 1
if n >= pages_max:
break
clist = {
0: pages.System(0, cfg, boatdata)
}
for i, p in pagedef.items():
try:
cls = getattr(pages, p['type'])
except AttributeError:
# Klasse nicht vorhanden, Seite wird nicht benutzt
print(f"Klasse '{type}' nicht gefunden")
continue
c = cls(i, cfg, boatdata, *[v for v in p['values'].values()])
clist[i] = c
return clist
if __name__ == "__main__":
#setproctitle("obp60v")
shutdown = False
owndevice = Device(100)
# Hardcoding device, not intended to change
owndevice.manufacturercode = cfg['manufcode']
owndevice.industrygroup = cfg['industrygroup']
owndevice.deviceclass = cfg['devclass']
owndevice.devicefunction = cfg['devfunc']
boatdata = BoatData()
boatdata.addTank(0)
boatdata.addEngine(0)
# Basiskonfiguration aus Datei lesen
config = configparser.ConfigParser()
ret = config.read(os.path.join(sys.path[0], cfg['cfgfile']))
if len(ret) == 0:
print("Konfigurationsdatei '{}' konnte nicht gelesen werden!".format(cfg['cfgfile']))
sys.exit(1)
cfg['_config'] = config # Objekt zum späteren schreiben
cfg['deviceid'] = config.getint('system', 'deviceid')
cfg['simulation'] = config.getboolean('system', 'simulation')
cfg['histpath'] = os.path.expanduser(config.get('system', 'histpath'))
cfg['guistyle'] = config.get('system', 'guistyle')
print("Setting GUI style to '{}'".format(cfg['guistyle']))
cfg['mouseptr'] = config.getboolean('system', 'mouseptr')
try:
cfg['win_x'] = config.getint('gui', 'win_x')
cfg['win_y'] = config.getint('gui', 'win_y')
except (configparser.NoSectionError, configparser.NoOptionError):
# Keine vorgegebene Position
cfg['win_x'] = -1
cfg['win_y'] = -1
cfg['can'] = config.getboolean('can', 'enabled')
if cfg['can']:
cfg['can_intf'] = config.get('can', 'interface')
cfg['nmea0183'] = config.getboolean('nmea0183', 'enabled')
if cfg['nmea0183']:
cfg['0183_port'] = config.get('nmea0183', 'port')
cfg['gps'] = config.getboolean('gps', 'enabled')
if cfg['gps']:
cfg['gps_port'] = config.get('gps', 'port')
cfg['bme280'] = config.getboolean('bme280', 'enabled')
if cfg['bme280']:
cfg['bme280_port'] = config.getint('bme280', 'port')
cfg['bme280_address'] = int(config.get('bme280', 'address'), 16) # convert 0x76
smbus = smbus2.SMBus(cfg['bme280_port'])
cfg['bme280_cp'] = bme280.load_calibration_params(smbus, cfg['bme280_address'])
history = History("press", 75)
boatdata.addHistory(history, "press")
if cfg['simulation']:
boatdata.enableSimulation()
profile = init_profile(config, cfg, boatdata)
# Schnittstellen aktivieren
if cfg['can']:
t_rxd_n2k = threading.Thread(target=rxd_n2k, args=(cfg['can_intf'],))
t_rxd_n2k.start()
if cfg['nmea0183']:
t_rxd_0183 = threading.Thread(target=rxd_0183, args=(cfg['0183_port'],))
t_rxd_0183.start()
if cfg['gps']:
t_rxd_gps = threading.Thread(target=rxd_gps, args=(cfg['gps_port'],))
t_rxd_gps.start()
if not cfg['simulation']:
t_data = threading.Thread(target=datareader, args=(cfg['histpath'], history))
t_data.start()
app = Frontend(cfg, owndevice, boatdata, profile)
app.run()
shutdown = True
if cfg['can']:
t_rxd_n2k.join()
if cfg['nmea0183']:
t_rxd_0183.join()
if cfg['gps']:
t_rxd_gps.join()
if not cfg['simulation']:
t_data.join()
print("Another fine product of the Sirius Cybernetics Corporation.")