OBP60v/obp60.py

562 lines
19 KiB
Python
Executable File

#!/usr/bin/env python
"""
Virtuelles Multifunktionsgerät
NMEA2000
deviceclass: 120 - Display
devicefunction: 130 - Display
Benötigte Pakete:
python3-cairo python3-gi python3-gi-cairo gir1.2-rsvg-2.0
Um transparente Darstellung unter Openbox zu erhalten muß xcompmgr
installiert und konfiguriert werden.
Wenn ein lokaler GPS-Empfänger angeschlossen ist, so kann dieser genutzt
werden. Zusätzlich benötigte Pakete:
python-serial python3-nmea2
Für Unterstützung des BME280-Sensors sind die Bibliotheken smbus2 und
bme280 erforderlich.
Routen und Wegepunkte können von OpenCPN empfangen werden. Dazu muß eine
passende serielle Schnittstelle für NMEA0183-Ausgang definiert werden.
Im System kann diese in der Datei rc.local aktiviert werden:
# Create virtual serial connection
socat pty,rawer,echo=0,group-late=dialout,mode=0660,link=/dev/ttyV0 \
pty,rawer,echo=0,group-late=dialout,mode=0660,link=/dev/ttyV1 &
OpenCPN sendet dann Datensätze über ttyV0 und dieses Programm
empfängt sie über ttyV1.
Die Wegepunkte werden in OpenCPN im Standard auf 6 Zeichen gekürzt.
Über die Konfigurationsdatei Settings / MaxWaypointNameLength=<nn>
ist es möglich einen anderen Wert einzustellen im Bereich zwischen
3 und 32. Lt. NMEA0183 ist die maximale Länge 10 Zeichen.
Zeichensätze müssen, sofern sie noch nicht vorhanden sind in
/usr/local/share/fonts abgelegt werden
Buttonlayout:
[ 1 ] [ 2 ] [ 3 ] [ 4 ] [ 5 ] [ 6 ]
Button 6 ist reserviert für Beleuchtung ILUM
Standardbelegung der Tasten
Button 1: Mode
Button 2: Abbruch
Button 3: zurück
Button 4: weiter
Button 5: Ok/Menu
Verlagerung der Seitenanzeige in den Titel als Ziffer in Klammern
Vergleich zum Garmin GMI20:
[ Zurück ] [ Hoch ] [ Menü ] [ Runter ] [ on/off ]
Vergleich zum Raymarine I70s:
[ ON/ILUM ] [ Hoch ] [ Runter ] [ MENU ]
Button 1 wird für AVG verwendet auf mehreren Seiten
Button 5 wird für Trend TRND verwendet
Änderungsprotokoll
==================
Version Datum Änderung(en) von
-------- ----------- ------------------------------------------------------ ----
0.1 2024-10-31 Entwicklung begonnen tho
0.2 2024-12-24 Veräffentlichung als Git-Repository tho
"""
import os
import sys
import configparser
from setproctitle import setproctitle, setthreadtitle
import gi
gi.require_version('Gtk', '3.0')
gi.require_version('Rsvg', '2.0')
from gi.repository import GLib, Gtk, Gdk, Rsvg
import cairo
import math
import threading
import can
import serial
import smbus2
import pynmea2
import bme280
import math
import time
from datetime import datetime
from nmea2000 import Device, BoatData, History, HistoryBuffer
from nmea2000 import parser
import pages
import struct
__author__ = "Thomas Hooge"
__copyright__ = "Copyleft 2024-2025, all rights reversed"
__version__ = "0.2"
__email__ = "thomas@hoogi.de"
__status__ = "Development"
cfg = {
'cfgfile': 'obp60.conf',
'imgpath': os.path.join(sys.path[0], 'images'),
'deviceid': 100,
'manufcode': 2046, # Open Boat Projects (OBP)
'devfunc': 120, # Display
'devclass': 120, # Display
'industrygroup': 4, # Marine
'gps': False,
'bme280': False
}
def rxd_n2k():
setthreadtitle("N2Klistener")
bus = can.Bus(interface='socketcan', channel='can0', bitrate=250000);
wip = False
sc = 0
nf = 0
while not shutdown:
msg = bus.recv(2)
if not msg:
continue
priority = (msg.arbitration_id & 0x1c000000) >> 26
source = msg.arbitration_id & 0x000000ff
pgn = (msg.arbitration_id & 0x3ffff00) >> 8
match pgn:
case 129025:
# Position
#lat = struct.unpack_from('<l', msg.data, 0)[0] * 1e-07
#lon = struct.unpack_from('<l', msg.data, 4)[0] * 1e-07
#boatdata.setValue("LAT", lat)
#boatdata.setValue("LON", lon)
parser.parse_129025(msg.data, boatdata)
case 127505:
# Fluid level
#fluidtype = msg.data[0] >> 4
#instance = msg.data[0] & 0x0f
#level = struct.unpack_from('<H', msg.data, 1)[0] * 0.004
#capacity = struct.unpack_from('<L', msg.data, 2)[0] * 0.1
#boatdata.tank[instance].capacity = capacity
#boatdata.tank[instance].volume = level
#boatdata.tank[instance].fluidtype = fluidtype
parser.parse_127505(msg.data, boatdata)
case 127508:
# Battery status
#instance = msg.data[0]
#boatdata.voltage = (msg.data[2] * 256 + msg.data[1]) * 0.01 # Spannung
#ampere = (msg.data[4] * 256 + msg.data[3]) * 0.1 # Stromstärke
#temp = (msg.data[6] * 256 + msg.data[5]) * 0.01 - 273.15 # Temperatur
#sid = msg.data[7]
parser.parse_127508(msg.data, boatdata)
case 129540:
# GNS sats in view
# TODO es kann mehrere Geräte geben die senden
# - source beachten (muß gleich bleiben)
# - gemischte Reihenfolge der Pakete ermöglichen
sc = (msg.data[0] & 0xf0) >> 5
fc = msg.data[0] & 0x1f
if not wip:
if fc != 0:
continue
source0 = source # muß über das Fast-packet konstant bleiben
sc0 = sc # -"-
fc0 = fc # dieser Zähler wird inkrementiert
datalen = msg.data[1]
nf = math.ceil((datalen - 6) / 7) + 1
buf129540 = msg.data[2:]
wip = True
else:
if (source == source0) and (sc == sc0) and (fc == fc0 + 1):
buf129540.extend(msg.data[1:8])
fc0 = fc
else:
# Dieser Frame paßt nicht
#print("PGN 129540: sc/fc mismatch")
pass
if fc == nf:
wip = False
parser.parse_129540(buf129540, boatdata)
case _:
pass
bus.shutdown()
def rxd_0183(devname):
# Prüfe ob Port vorhanden ist und sich öffnen läßt
try:
ser = serial.Serial(devname, 115200, timeout=3)
except serial.SerialException as e:
print("OpenCPN serial port not available")
return
setthreadtitle("0183listener")
while not shutdown:
try:
msg = pynmea2.parse(ser.readline().decode('ascii'))
except pynmea2.nmea.ParseError:
continue
if msg.sentence_type == 'GLL':
boatdata.setValue("LAT", msg.latitude)
boatdata.setValue("LON", msg.longitude)
elif msg.sentence_type == 'VTG':
boatdata.setValue("COG", int(msg.true_track))
boatdata.setValue("SOG", float(msg.spd_over_grnd_kts[:-1]))
elif msg.sentence_type == 'VHW':
boatdata.setValue("STW", float(msg.water_speed_knots))
elif msg.sentence_type == 'WPL':
# Wegepunkt
print(msg.fields)
elif msg.sentence_type == 'RTE':
# Route
print(msg.fields)
ser.close()
def datareader(histpath, history):
"""
Daten zu fest definierten Zeitpunkten lesen
Die Schleife läuft einmal alle <n> Sekunden immer zum gleichen Zeitpunkt.
Die Nutzlast sollte demnach weniger als eine Sekunde Laufzeit haben.
Falls durch eine außergewöhnliche Situation doch einmal mehr als eine
Sekunde benötigt werden sollte, gleicht sich das in den darauffolgenden
Durchläufen wieder aus.
"""
setthreadtitle("datareader")
# Speicherpfad für Meßwertdaten
if not os.path.exists(histpath):
os.makedirs(histpath)
history.basepath = histpath
# Serien initialisieren
history.addSeries("BMP280-75", 336 ,75)
history.addSeries("BMP280-150", 336 , 150)
history.addSeries("BMP280-300", 336 , 300)
history.addSeries("BMP280-600", 336 , 600)
history.addSeries("BMP280-900", 336 , 900)
for s in history.series.values():
s.begin()
def g_tick(n=1):
t = time.time()
count = 0
while True:
count += n
yield max(t + count - time.time(), 0)
g = g_tick(1)
n = 0
while not shutdown:
time.sleep(next(g))
# BME280 abfragen
if cfg['bme280']:
sensordata = bme280.sample(smbus, cfg['bme280_address'], cfg['bme280_cp'])
# Aktuellen Wert merken
boatdata.setValue("xdrTemp", sensordata.temperature)
boatdata.setValue("xdrPress", sensordata.pressure)
boatdata.setValue("xdrHum", sensordata.humidity)
# Historie schreiben
pval = int(sensordata.pressure *10)
for k, v in history.series.items():
if n % k == 0:
v.add(pval)
# Lokales GPS abfragen
# TODO
if cfg['gps']:
pass
n += 1
for s in history.series.values():
s.finish()
class Frontend(Gtk.Window):
button = {
1: (75, 485),
2: (150, 492),
3: (227, 496),
4: (306, 496),
5: (382, 492),
6: (459, 485)
}
radius = 30
def __init__(self, device, boatdata, profile):
super().__init__()
self.owndev = device
self.boatdata = boatdata
self.connect("destroy", self.on_destroy)
self.set_position(Gtk.WindowPosition.CENTER)
self.set_size_request(530, 555)
self.set_title("OBP60 virt")
self.set_app_paintable(True)
self.set_decorated(False)
self.set_keep_above(True)
self.screen = self.get_screen()
self.visual = self.screen.get_rgba_visual()
if (self.visual is not None and self.screen.is_composited()):
self.set_visual(self.visual)
handle = Rsvg.Handle()
self._svg = handle.new_from_file(os.path.join(sys.path[0], "obp60.svg"))
self.connect("draw", self.on_draw)
self.da = Gtk.DrawingArea()
self.da.add_events(Gdk.EventMask.BUTTON_PRESS_MASK|Gdk.EventMask.BUTTON_RELEASE_MASK)
self.add(self.da)
self.da.connect("draw", self.da_draw)
self.da.connect('button-press-event', self.da_button_press)
self.da.connect('button-release-event', self.da_button_release)
self.button_clicked = 0 # Geklickter Button vor Loslassen
self.keylock = False
self.pages = profile
self.pageno = 1
self.curpage = self.pages[self.pageno]
print("Wische von 2 nach 1 für Programmende")
def run(self):
GLib.timeout_add_seconds(2, self.on_timer)
self.show_all()
Gtk.main()
def on_timer(self):
# Boatdata validator
boatdata.updateValid(5)
# Tastaturstatus an Seite durchreichen
self.curpage.keylock = self.keylock
# Neuzeichnen
self.queue_draw()
return True
def on_draw(self, widget, ctx):
# Fenstertransparenz
ctx.set_source_rgba(0, 0, 0, 0)
ctx.paint()
def da_draw(self, widget, ctx):
viewport = Rsvg.Rectangle()
viewport.x = 0
viewport.y = 0
viewport.width = 530
viewport.height = 555
self._svg.render_document(ctx, viewport)
ctx.set_source_rgb(1.0, 0, 0)
ctx.translate(64, 95) # Koordinatenursprung auf virtuellen Displaybereich setzen
ctx.rectangle(0, 0, 400, 300)
ctx.clip()
ctx.set_source_rgb(0, 0, 0) # Schwarz auf Weiß
# Heartbeat umschalten
if self.curpage.header:
self.curpage.draw_header(ctx)
self.curpage.draw(ctx)
if self.curpage.footer:
self.curpage.draw_footer(ctx)
def da_button_press(self, widget, event):
# Es gibt eine Liste mit Objekten und hier wird
# geprüft ob und in welches Objekt geklickt wurde
# Die eigentliche Funktion wird beim Loslassen ausgelöst.
# Damit sind Wischgesten simulierbar
self.button_clicked = 0
if (event.x < self.button[1][0] - self.radius or event.x > self.button[6][0] + self.radius):
return True
if (event.y < self.button[1][1] - self.radius or event.y > self.button[3][1] + self.radius):
return True
for b, v in self.button.items():
diff = math.sqrt((event.x - v[0])**2 + (event.y - v[1])**2)
if diff < self.radius:
self.button_clicked = b
break
return True
def da_button_release(self, widget, event):
# Hier sind die eigentlichen Tastenfunktionen
#
# Die Auswertung ist abhängig von der jew. angezeigten Seite
# Jede Seite kann eine Methode "handle_key" implementieren
# Falls der Rückgabewert "True" ist, hat die Seite die Taste
# verarbeitet, die Funktion hier wird damit unterdrückt.
# TODO
if (event.x < self.button[1][0] - self.radius or event.x > self.button[6][0] + self.radius):
return True
if (event.y < self.button[1][1] - self.radius or event.y > self.button[3][1] + self.radius):
return True
selected = 0
for b, v in self.button.items():
diff = math.sqrt((event.x - v[0])**2 + (event.y - v[1])**2)
if diff < self.radius:
selected = b
break
if self.keylock:
# Bei Tastensperre einzige Möglichkeit: Tastensperre ausschalten
if selected == 6 and self.button_clicked == 1:
self.keylock = False
return True
if selected == 1:
if self.button_clicked == 2:
# Programmende bei Klicken auf 2 und loslassen auf 1
self.get_window().set_cursor(Gdk.Cursor(Gdk.CursorType.WATCH))
Gtk.main_quit()
elif self.button_clicked == 6:
# Klick auf 6 und loslassen auf 1 ist Tastatursperre
self.keylock = True
else:
self.curpage.handle_key(1)
elif selected == 2:
# Abbruch/Zurück
self.curpage.handle_key(2)
elif selected == 3:
# runter / eine Seite vor
if not self.curpage.handle_key(3):
if self.pageno > 1:
self.pageno -= 1
else:
self.pageno = len(self.pages) - 1
self.curpage = self.pages[self.pageno]
elif selected == 4:
if not self.curpage.handle_key(4):
if self.pageno < len(self.pages) - 1:
self.pageno += 1
else:
self.pageno = 1
# hoch / eine Seite zurück
self.curpage = self.pages[self.pageno]
elif selected == 5:
# Ok/Menü
if self.button_clicked == 4:
# Umschalten zur Systemseite
self.curpage = self.pages[0]
else:
self.curpage.handle_key(5)
elif selected == 6:
if not self.curpage.handle_key(6):
# Backlight on/off
self.curpage.backlight = not self.curpage.backlight
return True
def on_destroy(self, widget):
Gtk.main_quit()
def init_profile(config, cfg, boatdata):
'''
config: Configparser-Objekt
cfg: Laufende Programmkonfiguration
Die Liste und Anordnung der Seiten nennen wir "Profil"
Seiten-Profil aus Konfiguration erstellen
Seite Nummer 0 ist immer die Systemseite. Diese ist nicht
über die normale Seitenreihenfolge erreichbar, sondern
durch eine spezielle Tastenkombination/Geste.
TODO Prüfungen einbauen:
Fortlaufende Seitennummern ab 1
Fortlaufende Wertenummern ab 1
Maximalwerte nicht überschreiten
'''
pages_max = config.getint('pages', 'number_of_pages')
# Suche alle Abschnitte, die mit "page" beginnen
sects = config.sections()
sects.remove('pages')
pagedef = {}
n = 0
for s in sects:
if s.startswith('page'):
# Nummer und Art ermitteln
pageno = int(s[4:])
pagedef[pageno] = {'type': config.get(s, "type")}
# Hole ein bin maximal 4 Werte je Seite
values = {}
valno = 1
for i in (1, 2, 3, 4):
try:
values[i] = config.get(s, f"value{i}")
except configparser.NoOptionError:
break
pagedef[pageno]['values'] = values
n += 1
if n >= pages_max:
break
clist = {
0: pages.System(0, cfg, boatdata)
}
for i, p in pagedef.items():
try:
cls = getattr(pages, p['type'])
except AttributeError:
# Klasse nicht vorhanden, Seite wird nicht benutzt
print(f"Klasse '{type}' nicht gefunden")
continue
c = cls(i, cfg, boatdata, *[v for v in p['values'].values()])
clist[i] = c
return clist
if __name__ == "__main__":
#setproctitle("obp60v")
shutdown = False
owndevice = Device(100)
# Hardcoding device, not intended to change
owndevice.manufacturercode = cfg['manufcode']
owndevice.industrygroup = cfg['industrygroup']
owndevice.deviceclass = cfg['devclass']
owndevice.devicefunction = cfg['devfunc']
boatdata = BoatData()
boatdata.addTank(0)
boatdata.addEngine(0)
# Basiskonfiguration aus Datei lesen
config = configparser.ConfigParser()
config.read(os.path.join(sys.path[0], cfg['cfgfile']))
cfg['deviceid'] = config.getint('system', 'deviceid')
cfg['simulation'] = config.getboolean('system', 'simulation')
cfg['histpath'] = os.path.expanduser(config.get('system', 'histpath'))
cfg['gps'] = config.getboolean('gps', 'enabled')
if cfg['gps']:
cfg['gps_port'] = config.get('gps', 'port')
cfg['bme280'] = config.getboolean('bme280', 'enabled')
if cfg['bme280']:
cfg['bme280_port'] = config.getint('bme280', 'port')
cfg['bme280_address'] = int(config.get('bme280', 'address'), 16) # convert 0x76
smbus = smbus2.SMBus(cfg['bme280_port'])
cfg['bme280_cp'] = bme280.load_calibration_params(smbus, cfg['bme280_address'])
history = History("press", 75)
boatdata.addHistory(history, "press")
cfg['ocpn_port'] = config.get('opencpn', 'port')
if cfg['simulation']:
boatdata.enableSimulation()
profile = init_profile(config, cfg, boatdata)
t_rxd_n2k = threading.Thread(target=rxd_n2k)
t_rxd_n2k.start()
t_rxd_0183 = threading.Thread(target=rxd_0183, args=(cfg['ocpn_port'],))
t_rxd_0183.start()
t_data = threading.Thread(target=datareader, args=(cfg['histpath'], history))
t_data.start()
app = Frontend(owndevice, boatdata, profile)
app.run()
shutdown = True
t_rxd_n2k.join()
t_rxd_0183.join()
t_data.join()
print("Another fine product of the Sirius Cybernetics Corporation.")