181 lines
		
	
	
		
			7.1 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			181 lines
		
	
	
		
			7.1 KiB
		
	
	
	
		
			Python
		
	
	
	
| """
 | |
| Map access inspired by Norbert Walter
 | |
| 
 | |
| """
 | |
| 
 | |
| import os
 | |
| import math
 | |
| import http.client
 | |
| import ssl
 | |
| from io import BytesIO
 | |
| from PIL import Image, ImageDraw
 | |
| import cairo
 | |
| 
 | |
| class MapService():
 | |
| 
 | |
|     def __init__(self, logger, cfg):
 | |
|         self.cachepath = os.path.join(cfg['histpath'], "tilecache")
 | |
|         self.lat = 53.56938345759218
 | |
|         self.lon = 9.679658234303275
 | |
|         self.dither_type = Image.FLOYDSTEINBERG
 | |
|         self.width = 260
 | |
|         self.height = 260
 | |
|         self.zoom_level = 15
 | |
|         self.debug = False
 | |
| 
 | |
|     def set_output_size(self, width, height):
 | |
|         self.width = width
 | |
|         self.height = height
 | |
| 
 | |
|     def web_get_tile(self, domain, path):
 | |
|         ssl_context = ssl.create_default_context()
 | |
|         conn = http.client.HTTPSConnection(domain, 443, context=ssl_context)
 | |
|         headers = {
 | |
|             #"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:92.0) Gecko/20100101 Firefox/92.0"
 | |
|             "User-Agent": "OBP/1.0 (X11; Linux x86_64; rv:0.1) OBP60v/0.1"
 | |
|         }
 | |
|         data = None
 | |
|         try:
 | |
|             conn.request("GET", path, headers=headers)
 | |
|             response = conn.getresponse()
 | |
|             if response.status == 200:
 | |
|                 data = response.read()
 | |
|             else:
 | |
|                 print(f"Error: {response.status}")
 | |
|         except http.client.HTTPException as e:
 | |
|             print(f"HTTP error occurred: {e}")
 | |
|         except ssl.SSLError as ssl_error:
 | |
|             print(f"SSL error occurred: {ssl_error}")
 | |
|         finally:
 | |
|             conn.close()
 | |
|         return data
 | |
| 
 | |
|     def latlon_to_xyz(self, lat, lon, zoom):
 | |
|         """
 | |
|         Converts geographic coordinates (Lat, Lon) to X, Y coordinates for the tiling system
 | |
|         and returns the pixel offset of the exact position in the tile.
 | |
|         """
 | |
|         # Calculate the X tile coordinate
 | |
|         x_tile = (lon + 180.0) / 360.0 * (2 ** zoom)
 | |
| 
 | |
|         # Calculate the Y tile coordinate
 | |
|         lat_rad = math.radians(lat)
 | |
|         y_tile = (1.0 - math.log(math.tan(lat_rad) + (1 / math.cos(lat_rad))) / math.pi) / 2.0 * (2 ** zoom)
 | |
| 
 | |
|         # Integer part is the tile coordinates
 | |
|         x = int(x_tile)
 | |
|         y = int(y_tile)
 | |
| 
 | |
|         # Decimal part determines the offset within the tile
 | |
|         x_offset = int((x_tile - x) * 256)  # Each tile is 256x256 pixels
 | |
|         y_offset = int((y_tile - y) * 256)
 | |
| 
 | |
|         return x, y, x_offset, y_offset
 | |
| 
 | |
|     def fetch_osm_tile(self, x, y, zoom):
 | |
| 
 | |
|         tile_key = f"{zoom}/{x}/{y}.png"
 | |
|         cache_dir = os.path.join(self.cachepath, str(zoom), str(x))
 | |
| 
 | |
|         os.makedirs(cache_dir, exist_ok=True)  # Create the directory if it doesn't exist
 | |
|         tile_path = os.path.join(cache_dir, f"{y}.png")
 | |
| 
 | |
|         # Check if the tile exists in the disk cache
 | |
|         if os.path.exists(tile_path):
 | |
|             #print(f"Tile {x}, {y} loaded from disk cache.")
 | |
|             with open(tile_path, 'rb') as f:
 | |
|                 tile_data = f.read()
 | |
|                 #ram_cache.set(cache_key, tile_data)  # Load into RAM cache
 | |
|             return Image.open(tile_path)
 | |
| 
 | |
|         data = self.web_get_tile("freenauticalchart.net", f"/qmap-de/{zoom}/{x}/{y}.png")
 | |
|         if data:
 | |
|             tile = Image.open(BytesIO(data))
 | |
|             tile.save(tile_path)
 | |
|             return tile
 | |
|         else:
 | |
|             return Image.new('RGB', (256, 256), (200, 200, 200)) # Fallback image
 | |
| 
 | |
|     def draw_cross(self, draw, x_offset, y_offset):
 | |
|         line_length = 10  # Length of cross lines
 | |
|         line_color = (255, 0, 0)  # Red
 | |
|         draw.line((x_offset - line_length, y_offset, x_offset + line_length, y_offset), fill=line_color, width=2)
 | |
|         draw.line((x_offset, y_offset - line_length, x_offset, y_offset + line_length), fill=line_color, width=2)
 | |
| 
 | |
|     def draw_tile_borders(self, draw, tile_x, tile_y):
 | |
|         """
 | |
|         Draws a black line around each tile with a pixel width.
 | |
|         """
 | |
|         top_left_x = tile_x * 256
 | |
|         top_left_y = tile_y * 256
 | |
|         bottom_right_x = top_left_x + 256
 | |
|         bottom_right_y = top_left_y + 256
 | |
|         draw.rectangle((top_left_x, top_left_y, bottom_right_x - 1, bottom_right_y - 1), outline="black", width=1)
 | |
| 
 | |
|     def stitch_tiles(self, lat, lon, zoom, debug=False):
 | |
|         """
 | |
|         Loads the required tiles and stitches them into one image,
 | |
|         and then crop that image so that lat/lon ist centered
 | |
|         """
 | |
|         # Convert geo-coordinates to tile coordinates and offset
 | |
|         x_tile, y_tile, x_offset, y_offset = self.latlon_to_xyz(lat, lon, zoom)
 | |
| 
 | |
|         # No rotation, north up: calculation of needed tiles
 | |
|         num_tiles_x = int(self.width // 256) + 2
 | |
|         num_tiles_y = int(self.height // 256) + 2
 | |
|         central_tile_x = num_tiles_x // 2
 | |
|         central_tile_y = num_tiles_y // 2
 | |
|         center_x = central_tile_x * 256 + x_offset
 | |
|         center_y = central_tile_y * 256 + y_offset
 | |
| 
 | |
|         # Create an empty image for the final mosaic
 | |
|         combined_image = Image.new('RGB', (num_tiles_x * 256, num_tiles_y * 256))
 | |
|         draw = ImageDraw.Draw(combined_image)
 | |
| 
 | |
|         # Download and stitch the tiles
 | |
|         x_tile -= num_tiles_x // 2
 | |
|         y_tile -= num_tiles_y // 2
 | |
|         for i in range(num_tiles_x):
 | |
|             for j in range(num_tiles_y):
 | |
|                 tile = self.fetch_osm_tile(x_tile + i, y_tile + j, zoom)
 | |
|                 combined_image.paste(tile, (i * 256, j * 256))
 | |
|                 if debug == True:
 | |
|                     # Draw the black line around each tile
 | |
|                     self.draw_tile_borders(draw, i, j)
 | |
|         if debug:
 | |
|             # Draw a cross on the central tile at the offset position
 | |
|             self.draw_cross(draw, center_x, center_y)
 | |
| 
 | |
|         # Determine the crop area
 | |
|         new_left = int(center_x - self.width // 2)
 | |
|         new_top = int(center_y - self.height // 2)
 | |
|         new_right = new_left + self.width
 | |
|         new_bottom = new_top + self.height
 | |
| 
 | |
|         return combined_image.crop((new_left, new_top, new_right, new_bottom))
 | |
| 
 | |
|     def get_round_bwmap(self, lat, lon, zoom, debug=False):
 | |
|         # Rundes s/w Bild erzeugen z.B. für Ankerkreis
 | |
|         image = self.stitch_tiles(lat, lon, zoom, debug)
 | |
|         mask = Image.new('1', (self.width, self.height), 0)
 | |
|         draw = ImageDraw.Draw(mask)
 | |
|         radius = min(self.width, self.height) // 2
 | |
|         center_x = self.width // 2
 | |
|         center_y = self.height // 2
 | |
|         bounding_box = (center_x - radius, center_y - radius, center_x + radius, center_y + radius)
 | |
|         draw.ellipse(bounding_box, fill=255)
 | |
|         image.putalpha(mask)
 | |
|         roundimage = Image.new('RGB', (self.width, self.height), (255, 255, 255))
 | |
|         roundimage.paste(image, mask=image.split()[3])
 | |
|         return roundimage.convert('1', dither=self.dither_type)       
 | |
| 
 | |
|     def get_round_bwmap_cairo(self, lat, lon, zoom, bgcolor=(255,255,255), debug=False):
 | |
|         image = self.get_round_bwmap(lat, lon, zoom, debug).convert("RGBA")
 | |
|         r, g, b, a = image.split()
 | |
|         r = r.point(lambda p: bgcolor[0] if p == 255 else p)
 | |
|         g = g.point(lambda p: bgcolor[1] if p == 255 else p)
 | |
|         b = b.point(lambda p: bgcolor[2] if p == 255 else p)
 | |
|         image = Image.merge("RGBA", (r, g, b, a))
 | |
|         data = bytearray(image.tobytes())
 | |
|         return cairo.ImageSurface.create_for_data(data, cairo.FORMAT_ARGB32, self.width, self.height)
 |