diff --git a/python/pmtiles/tile.py b/python/pmtiles/tile.py index 0f063e0..cbcdcd3 100644 --- a/python/pmtiles/tile.py +++ b/python/pmtiles/tile.py @@ -1,9 +1,21 @@ from collections import namedtuple +from enum import Enum +import io +import gzip -Entry = namedtuple("Entry", ["tile_id", "offset", "length", "run_length"]) Header = namedtuple("Header", []) +class Entry: + __slots__ = ("tile_id", "offset", "length", "run_length") + + def __init__(self, tile_id, offset, length, run_length): + self.tile_id = tile_id + self.offset = offset + self.length = length + self.run_length = run_length + + def rotate(n, xy, rx, ry): if ry == 0: if rx == 1: @@ -87,12 +99,89 @@ def find_tile(entries, tile_id): return entries[n] -def deserialize_directory(bytes): - pass +def read_varint(b_io): + shift = 0 + result = 0 + while True: + raw = b_io.read(1) + if raw == b"": + raise EOFError("unexpectedly reached end of varint stream") + i = ord(raw) + result |= (i & 0x7F) << shift + shift += 7 + if not (i & 0x80): + break + return result -def serialize_directory(bytes): - pass +def write_varint(b_io, i): + while True: + towrite = i & 0x7F + i >>= 7 + if i: + b_io.write(bytes([towrite | 0x80])) + else: + b_io.write(bytes([towrite])) + break + + +class Compression(Enum): + UNKNOWN = 0 + NONE = 1 + GZIP = 2 + BROTLI = 3 + ZSTD = 4 + + +def deserialize_directory(buf): + b_io = io.BytesIO(gzip.decompress(buf)) + entries = [] + num_entries = read_varint(b_io) + + last_id = 0 + for i in range(num_entries): + tmp = read_varint(b_io) + entries.append(Entry(last_id + tmp, 0, 0, 0)) + last_id += tmp + + for i in range(num_entries): + entries[i].run_length = read_varint(b_io) + + for i in range(num_entries): + entries[i].length = read_varint(b_io) + + for i in range(num_entries): + tmp = read_varint(b_io) + if i > 0 and tmp == 0: + entries[i].offset = entries[i - 1].offset + entries[i - 1].length + else: + entries[i].offset = tmp - 1 + + return entries + + +def serialize_directory(entries): + b_io = io.BytesIO() + write_varint(b_io, len(entries)) + + last_id = 0 + for e in entries: + write_varint(b_io, e.tile_id - last_id) + last_id = e.tile_id + + for e in entries: + write_varint(b_io, e.run_length) + + for e in entries: + write_varint(b_io, e.length) + + for i, e in enumerate(entries): + if i > 0 and e.offset == entries[i - 1].offset + entries[i - 1].length: + write_varint(b_io, 0) + else: + write_varint(b_io, e.offset + 1) + + return gzip.compress(b_io.getvalue()) def deserialize_header(bytes): diff --git a/python/test/test_tile.py b/python/test/test_tile.py index 7a4505c..5ef429b 100644 --- a/python/test/test_tile.py +++ b/python/test/test_tile.py @@ -1,5 +1,30 @@ import unittest -from pmtiles.tile import zxy_to_tileid, tileid_to_zxy, Entry, find_tile +from pmtiles.tile import zxy_to_tileid, tileid_to_zxy, Entry +from pmtiles.tile import read_varint, write_varint +from pmtiles.tile import Entry, find_tile +from pmtiles.tile import serialize_directory, deserialize_directory +import io + + +class TestVarint(unittest.TestCase): + def test_read_varint(self): + buf = io.BytesIO(b"\x00\x01\x7f\xe5\x8e\x26") + self.assertEqual(read_varint(buf), 0) + self.assertEqual(read_varint(buf), 1) + self.assertEqual(read_varint(buf), 127) + self.assertEqual(read_varint(buf), 624485) + + def test_read_varint_eof(self): + buf = io.BytesIO(b"") + self.assertRaises(EOFError, read_varint, buf) + + def test_write_varint(self): + buf = io.BytesIO() + write_varint(buf, 0) + write_varint(buf, 1) + write_varint(buf, 127) + write_varint(buf, 624485) + self.assertEqual(buf.getvalue(), b"\x00\x01\x7f\xe5\x8e\x26") class TestTileId(unittest.TestCase): @@ -58,3 +83,23 @@ class TestFindTile(unittest.TestCase): result = find_tile(entries, 150) self.assertEqual(result.offset, 1) self.assertEqual(result.length, 1) + + +class TestDirectory(unittest.TestCase): + def test_roundtrip(self): + entries = [Entry(0, 0, 0, 0), Entry(1, 1, 1, 1), Entry(2, 2, 2, 2)] + serialized = serialize_directory(entries) + result = deserialize_directory(serialized) + self.assertEqual(len(result), 3) + self.assertEqual(result[0].tile_id, 0) + self.assertEqual(result[0].offset, 0) + self.assertEqual(result[0].length, 0) + self.assertEqual(result[0].run_length, 0) + self.assertEqual(result[1].tile_id, 1) + self.assertEqual(result[1].offset, 1) + self.assertEqual(result[1].length, 1) + self.assertEqual(result[1].run_length, 1) + self.assertEqual(result[2].tile_id, 2) + self.assertEqual(result[2].offset, 2) + self.assertEqual(result[2].length, 2) + self.assertEqual(result[2].run_length, 2)