mirror of
https://github.com/protomaps/PMTiles.git
synced 2026-02-04 19:01:08 +00:00
python: fix magic number in writer, assert current spec version and magic number in reader
This commit is contained in:
@@ -191,8 +191,19 @@ def serialize_directory(entries):
|
|||||||
|
|
||||||
return gzip.compress(b_io.getvalue())
|
return gzip.compress(b_io.getvalue())
|
||||||
|
|
||||||
|
class SpecVersionUnsupported(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
class MagicNumberNotFound(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
def deserialize_header(buf):
|
def deserialize_header(buf):
|
||||||
|
if buf[0:7].decode() != "PMTiles":
|
||||||
|
raise MagicNumberNotFound()
|
||||||
|
|
||||||
|
if buf[7] != 0x3:
|
||||||
|
raise SpecVersionUnsupported()
|
||||||
|
|
||||||
def read_uint64(pos):
|
def read_uint64(pos):
|
||||||
return int.from_bytes(buf[pos : pos + 8], byteorder="little")
|
return int.from_bytes(buf[pos : pos + 8], byteorder="little")
|
||||||
|
|
||||||
@@ -240,7 +251,7 @@ def serialize_header(h):
|
|||||||
b_io.write(i.to_bytes(1, byteorder="little"))
|
b_io.write(i.to_bytes(1, byteorder="little"))
|
||||||
|
|
||||||
b_io.write("PMTiles".encode())
|
b_io.write("PMTiles".encode())
|
||||||
b_io.write(b"3")
|
b_io.write(b"\x03")
|
||||||
write_uint64(h["root_offset"])
|
write_uint64(h["root_offset"])
|
||||||
write_uint64(h["root_length"])
|
write_uint64(h["root_length"])
|
||||||
write_uint64(h["metadata_offset"])
|
write_uint64(h["metadata_offset"])
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ from pmtiles.tile import zxy_to_tileid, tileid_to_zxy, Entry
|
|||||||
from pmtiles.tile import read_varint, write_varint
|
from pmtiles.tile import read_varint, write_varint
|
||||||
from pmtiles.tile import Entry, find_tile, Compression, TileType
|
from pmtiles.tile import Entry, find_tile, Compression, TileType
|
||||||
from pmtiles.tile import serialize_directory, deserialize_directory
|
from pmtiles.tile import serialize_directory, deserialize_directory
|
||||||
from pmtiles.tile import serialize_header, deserialize_header
|
from pmtiles.tile import serialize_header, deserialize_header, SpecVersionUnsupported, MagicNumberNotFound
|
||||||
import io
|
import io
|
||||||
|
|
||||||
|
|
||||||
@@ -160,3 +160,10 @@ class TestHeader(unittest.TestCase):
|
|||||||
self.assertEqual(result["center_zoom"], 3)
|
self.assertEqual(result["center_zoom"], 3)
|
||||||
self.assertEqual(result["center_lon_e7"], 3.1 * 10000000)
|
self.assertEqual(result["center_lon_e7"], 3.1 * 10000000)
|
||||||
self.assertEqual(result["center_lat_e7"], 3.2 * 10000000)
|
self.assertEqual(result["center_lat_e7"], 3.2 * 10000000)
|
||||||
|
|
||||||
|
def test_spec_version(self):
|
||||||
|
with self.assertRaises(SpecVersionUnsupported):
|
||||||
|
result = deserialize_header(b'PMTiles\x04')
|
||||||
|
|
||||||
|
with self.assertRaises(MagicNumberNotFound):
|
||||||
|
result = deserialize_header(b'PM\x00\x02')
|
||||||
|
|||||||
Reference in New Issue
Block a user