From b1cfa7da24ef769e5d45afde68520895095706b9 Mon Sep 17 00:00:00 2001 From: Brandon Liu Date: Tue, 11 Oct 2022 16:58:42 +0800 Subject: [PATCH] python v3: serialize/deserialize header --- python/pmtiles/tile.py | 88 ++++++++++++++++++++++++++++++++++++---- python/test/test_tile.py | 59 ++++++++++++++++++++++++++- 2 files changed, 139 insertions(+), 8 deletions(-) diff --git a/python/pmtiles/tile.py b/python/pmtiles/tile.py index cbcdcd3..d4cefbe 100644 --- a/python/pmtiles/tile.py +++ b/python/pmtiles/tile.py @@ -1,10 +1,7 @@ -from collections import namedtuple from enum import Enum import io import gzip -Header = namedtuple("Header", []) - class Entry: __slots__ = ("tile_id", "offset", "length", "run_length") @@ -133,6 +130,14 @@ class Compression(Enum): ZSTD = 4 +class TileType(Enum): + UNKNOWN = 0 + MVT = 1 + PNG = 2 + JPEG = 3 + WEBP = 4 + + def deserialize_directory(buf): b_io = io.BytesIO(gzip.decompress(buf)) entries = [] @@ -184,9 +189,78 @@ def serialize_directory(entries): return gzip.compress(b_io.getvalue()) -def deserialize_header(bytes): - pass +def deserialize_header(buf): + def read_uint64(pos): + return int.from_bytes(buf[pos : pos + 8], byteorder="little") + + def read_int32(pos): + return int.from_bytes(buf[pos : pos + 4], byteorder="little") + + return { + "root_offset": read_uint64(8), + "root_length": read_uint64(16), + "metadata_offset": read_uint64(24), + "metadata_length": read_uint64(32), + "leaf_directory_offset": read_uint64(40), + "leaf_directory_length": read_uint64(48), + "tile_data_offset": read_uint64(56), + "tile_data_length": read_uint64(64), + "addressed_tiles_count": read_uint64(72), + "tile_entries_count": read_uint64(80), + "tile_contents_count": read_uint64(88), + "clustered": buf[96] == 0x1, + "internal_compression": Compression(buf[97]), + "tile_compression": Compression(buf[98]), + "tile_type": TileType(buf[99]), + "min_zoom": buf[100], + "max_zoom": buf[101], + "min_lon_e7": read_int32(102), + "min_lat_e7": read_int32(106), + "max_lon_e7": read_int32(110), + "max_lat_e7": read_int32(114), + "center_zoom": buf[118], + "center_lon_e7": read_int32(119), + "center_lat_e7": read_int32(123), + } -def serialize_header(bytes): - pass +def serialize_header(h): + b_io = io.BytesIO() + + def write_uint64(i): + b_io.write(i.to_bytes(8, byteorder="little")) + + def write_int32(i): + b_io.write(i.to_bytes(4, byteorder="little")) + + def write_uint8(i): + b_io.write(i.to_bytes(1, byteorder="little")) + + b_io.write("PMTiles".encode()) + b_io.write(b"3") + write_uint64(h["root_offset"]) + write_uint64(h["root_length"]) + write_uint64(h["metadata_offset"]) + write_uint64(h["metadata_length"]) + write_uint64(h.get("leaf_directory_offset", 0)) + write_uint64(h.get("leaf_directory_length", 0)) + write_uint64(h["tile_data_offset"]) + write_uint64(h["tile_data_length"]) + write_uint64(h.get("addressed_tiles_count", 0)) + write_uint64(h.get("tile_entries_count", 0)) + write_uint64(h.get("tile_contents_count", 0)) + b_io.write(b"\x01" if h["clustered"] else b"\x00") + write_uint8(h["internal_compression"].value) + write_uint8(h["tile_compression"].value) + write_uint8(h["tile_type"].value) + write_uint8(h["min_zoom"]) + write_uint8(h["max_zoom"]) + write_int32(h["min_lon_e7"]) + write_int32(h["min_lat_e7"]) + write_int32(h["max_lon_e7"]) + write_int32(h["max_lat_e7"]) + write_uint8(h["center_zoom"]) + write_int32(h["center_lon_e7"]) + write_int32(h["center_lat_e7"]) + + return b_io.getvalue() diff --git a/python/test/test_tile.py b/python/test/test_tile.py index 5ef429b..a473617 100644 --- a/python/test/test_tile.py +++ b/python/test/test_tile.py @@ -1,8 +1,9 @@ import unittest from pmtiles.tile import zxy_to_tileid, tileid_to_zxy, Entry from pmtiles.tile import read_varint, write_varint -from pmtiles.tile import Entry, find_tile +from pmtiles.tile import Entry, find_tile, Compression, TileType from pmtiles.tile import serialize_directory, deserialize_directory +from pmtiles.tile import serialize_header, deserialize_header import io @@ -103,3 +104,59 @@ class TestDirectory(unittest.TestCase): self.assertEqual(result[2].offset, 2) self.assertEqual(result[2].length, 2) self.assertEqual(result[2].run_length, 2) + + +class TestHeader(unittest.TestCase): + def test_roundtrip(self): + header = { + "root_offset": 1, + "root_length": 2, + "metadata_offset": 3, + "metadata_length": 4, + "leaf_directory_offset": 5, + "leaf_directory_length": 6, + "tile_data_offset": 7, + "tile_data_length": 8, + "addressed_tiles_count": 9, + "tile_entries_count": 10, + "tile_contents_count": 11, + "clustered": True, + "internal_compression": Compression.GZIP, + "tile_compression": Compression.BROTLI, + "tile_type": TileType.MVT, + "min_zoom": 1, + "max_zoom": 2, + "min_lon_e7": int(1.1 * 10000000), + "min_lat_e7": int(2.1 * 10000000), + "max_lon_e7": int(1.2 * 10000000), + "max_lat_e7": int(2.2 * 10000000), + "center_zoom": 3, + "center_lon_e7": int(3.1 * 10000000), + "center_lat_e7": int(3.2 * 10000000), + } + serialized = serialize_header(header) + result = deserialize_header(serialized) + self.assertEqual(result["root_offset"], 1) + self.assertEqual(result["root_length"], 2) + self.assertEqual(result["metadata_offset"], 3) + self.assertEqual(result["metadata_length"], 4) + self.assertEqual(result["leaf_directory_offset"], 5) + self.assertEqual(result["leaf_directory_length"], 6) + self.assertEqual(result["tile_data_offset"], 7) + self.assertEqual(result["tile_data_length"], 8) + self.assertEqual(result["addressed_tiles_count"], 9) + self.assertEqual(result["tile_entries_count"], 10) + self.assertEqual(result["tile_contents_count"], 11) + self.assertEqual(result["clustered"], True) + self.assertEqual(result["internal_compression"], Compression.GZIP) + self.assertEqual(result["tile_compression"], Compression.BROTLI) + self.assertEqual(result["tile_type"], TileType.MVT) + self.assertEqual(result["min_zoom"], 1) + self.assertEqual(result["max_zoom"], 2) + self.assertEqual(result["min_lon_e7"], 1.1 * 10000000) + self.assertEqual(result["min_lat_e7"], 2.1 * 10000000) + self.assertEqual(result["max_lon_e7"], 1.2 * 10000000) + self.assertEqual(result["max_lat_e7"], 2.2 * 10000000) + self.assertEqual(result["center_zoom"], 3) + self.assertEqual(result["center_lon_e7"], 3.1 * 10000000) + self.assertEqual(result["center_lat_e7"], 3.2 * 10000000)