mirror of
https://github.com/protomaps/PMTiles.git
synced 2026-02-04 02:41:09 +00:00
284 lines
7.1 KiB
Python
284 lines
7.1 KiB
Python
from enum import Enum
|
|
import io
|
|
import gzip
|
|
|
|
|
|
class Entry:
|
|
__slots__ = ("tile_id", "offset", "length", "run_length")
|
|
|
|
def __init__(self, tile_id, offset, length, run_length):
|
|
self.tile_id = tile_id
|
|
self.offset = offset
|
|
self.length = length
|
|
self.run_length = run_length
|
|
|
|
def __str__(self):
|
|
return f"id={self.tile_id} offset={self.offset} length={self.length} runlength={self.run_length}"
|
|
|
|
|
|
def rotate(n, xy, rx, ry):
|
|
if ry == 0:
|
|
if rx == 1:
|
|
xy[0] = n - 1 - xy[0]
|
|
xy[1] = n - 1 - xy[1]
|
|
xy[0], xy[1] = xy[1], xy[0]
|
|
|
|
|
|
def t_on_level(z, pos):
|
|
n = 1 << z
|
|
rx, ry, t = pos, pos, pos
|
|
xy = [0, 0]
|
|
s = 1
|
|
while s < n:
|
|
rx = 1 & (t // 2)
|
|
ry = 1 & (t ^ rx)
|
|
rotate(s, xy, rx, ry)
|
|
xy[0] += s * rx
|
|
xy[1] += s * ry
|
|
t //= 4
|
|
s *= 2
|
|
return z, xy[0], xy[1]
|
|
|
|
|
|
def zxy_to_tileid(z, x, y):
|
|
if z > 31:
|
|
raise OverflowError("tile zoom exceeds 64-bit limit")
|
|
if x > (1 << z) - 1 or y > (1 << z) - 1:
|
|
raise ValueError("tile x/y outside zoom level bounds")
|
|
acc = 0
|
|
tz = 0
|
|
while tz < z:
|
|
acc += (0x1 << tz) * (0x1 << tz)
|
|
tz += 1
|
|
n = 1 << z
|
|
rx = 0
|
|
ry = 0
|
|
d = 0
|
|
xy = [x, y]
|
|
s = n // 2
|
|
while s > 0:
|
|
if (xy[0] & s) > 0:
|
|
rx = 1
|
|
else:
|
|
rx = 0
|
|
if (xy[1] & s) > 0:
|
|
ry = 1
|
|
else:
|
|
ry = 0
|
|
d += s * s * ((3 * rx) ^ ry)
|
|
rotate(s, xy, rx, ry)
|
|
s //= 2
|
|
return acc + d
|
|
|
|
|
|
def tileid_to_zxy(tile_id):
|
|
num_tiles = 0
|
|
acc = 0
|
|
for z in range(0,32):
|
|
num_tiles = (1 << z) * (1 << z)
|
|
if acc + num_tiles > tile_id:
|
|
return t_on_level(z, tile_id - acc)
|
|
acc += num_tiles
|
|
raise OverflowError("tile zoom exceeds 64-bit limit")
|
|
|
|
|
|
def find_tile(entries, tile_id):
|
|
m = 0
|
|
n = len(entries) - 1
|
|
while m <= n:
|
|
k = (n + m) >> 1
|
|
c = tile_id - entries[k].tile_id
|
|
if c > 0:
|
|
m = k + 1
|
|
elif c < 0:
|
|
n = k - 1
|
|
else:
|
|
return entries[k]
|
|
|
|
if n >= 0:
|
|
if entries[n].run_length == 0:
|
|
return entries[n]
|
|
if tile_id - entries[n].tile_id < entries[n].run_length:
|
|
return entries[n]
|
|
|
|
|
|
def read_varint(b_io):
|
|
shift = 0
|
|
result = 0
|
|
while True:
|
|
raw = b_io.read(1)
|
|
if raw == b"":
|
|
raise EOFError("unexpectedly reached end of varint stream")
|
|
i = ord(raw)
|
|
result |= (i & 0x7F) << shift
|
|
shift += 7
|
|
if not (i & 0x80):
|
|
break
|
|
return result
|
|
|
|
|
|
def write_varint(b_io, i):
|
|
while True:
|
|
towrite = i & 0x7F
|
|
i >>= 7
|
|
if i:
|
|
b_io.write(bytes([towrite | 0x80]))
|
|
else:
|
|
b_io.write(bytes([towrite]))
|
|
break
|
|
|
|
|
|
class Compression(Enum):
|
|
UNKNOWN = 0
|
|
NONE = 1
|
|
GZIP = 2
|
|
BROTLI = 3
|
|
ZSTD = 4
|
|
|
|
|
|
class TileType(Enum):
|
|
UNKNOWN = 0
|
|
MVT = 1
|
|
PNG = 2
|
|
JPEG = 3
|
|
WEBP = 4
|
|
|
|
|
|
def deserialize_directory(buf):
|
|
b_io = io.BytesIO(gzip.decompress(buf))
|
|
entries = []
|
|
num_entries = read_varint(b_io)
|
|
|
|
last_id = 0
|
|
for i in range(num_entries):
|
|
tmp = read_varint(b_io)
|
|
entries.append(Entry(last_id + tmp, 0, 0, 0))
|
|
last_id += tmp
|
|
|
|
for i in range(num_entries):
|
|
entries[i].run_length = read_varint(b_io)
|
|
|
|
for i in range(num_entries):
|
|
entries[i].length = read_varint(b_io)
|
|
|
|
for i in range(num_entries):
|
|
tmp = read_varint(b_io)
|
|
if i > 0 and tmp == 0:
|
|
entries[i].offset = entries[i - 1].offset + entries[i - 1].length
|
|
else:
|
|
entries[i].offset = tmp - 1
|
|
|
|
return entries
|
|
|
|
|
|
def serialize_directory(entries):
|
|
b_io = io.BytesIO()
|
|
write_varint(b_io, len(entries))
|
|
|
|
last_id = 0
|
|
for e in entries:
|
|
write_varint(b_io, e.tile_id - last_id)
|
|
last_id = e.tile_id
|
|
|
|
for e in entries:
|
|
write_varint(b_io, e.run_length)
|
|
|
|
for e in entries:
|
|
write_varint(b_io, e.length)
|
|
|
|
for i, e in enumerate(entries):
|
|
if i > 0 and e.offset == entries[i - 1].offset + entries[i - 1].length:
|
|
write_varint(b_io, 0)
|
|
else:
|
|
write_varint(b_io, e.offset + 1)
|
|
|
|
return gzip.compress(b_io.getvalue())
|
|
|
|
class SpecVersionUnsupported(Exception):
|
|
pass
|
|
|
|
class MagicNumberNotFound(Exception):
|
|
pass
|
|
|
|
def deserialize_header(buf):
|
|
if buf[0:7].decode() != "PMTiles":
|
|
raise MagicNumberNotFound()
|
|
|
|
if buf[7] != 0x3:
|
|
raise SpecVersionUnsupported()
|
|
|
|
def read_uint64(pos):
|
|
return int.from_bytes(buf[pos : pos + 8], byteorder="little")
|
|
|
|
def read_int32(pos):
|
|
return int.from_bytes(buf[pos : pos + 4], byteorder="little", signed=True)
|
|
|
|
return {
|
|
"root_offset": read_uint64(8),
|
|
"root_length": read_uint64(16),
|
|
"metadata_offset": read_uint64(24),
|
|
"metadata_length": read_uint64(32),
|
|
"leaf_directory_offset": read_uint64(40),
|
|
"leaf_directory_length": read_uint64(48),
|
|
"tile_data_offset": read_uint64(56),
|
|
"tile_data_length": read_uint64(64),
|
|
"addressed_tiles_count": read_uint64(72),
|
|
"tile_entries_count": read_uint64(80),
|
|
"tile_contents_count": read_uint64(88),
|
|
"clustered": buf[96] == 0x1,
|
|
"internal_compression": Compression(buf[97]),
|
|
"tile_compression": Compression(buf[98]),
|
|
"tile_type": TileType(buf[99]),
|
|
"min_zoom": buf[100],
|
|
"max_zoom": buf[101],
|
|
"min_lon_e7": read_int32(102),
|
|
"min_lat_e7": read_int32(106),
|
|
"max_lon_e7": read_int32(110),
|
|
"max_lat_e7": read_int32(114),
|
|
"center_zoom": buf[118],
|
|
"center_lon_e7": read_int32(119),
|
|
"center_lat_e7": read_int32(123),
|
|
}
|
|
|
|
|
|
def serialize_header(h):
|
|
b_io = io.BytesIO()
|
|
|
|
def write_uint64(i):
|
|
b_io.write(i.to_bytes(8, byteorder="little"))
|
|
|
|
def write_int32(i):
|
|
b_io.write(i.to_bytes(4, byteorder="little", signed=True))
|
|
|
|
def write_uint8(i):
|
|
b_io.write(i.to_bytes(1, byteorder="little"))
|
|
|
|
b_io.write("PMTiles".encode())
|
|
b_io.write(b"\x03")
|
|
write_uint64(h["root_offset"])
|
|
write_uint64(h["root_length"])
|
|
write_uint64(h["metadata_offset"])
|
|
write_uint64(h["metadata_length"])
|
|
write_uint64(h.get("leaf_directory_offset", 0))
|
|
write_uint64(h.get("leaf_directory_length", 0))
|
|
write_uint64(h["tile_data_offset"])
|
|
write_uint64(h["tile_data_length"])
|
|
write_uint64(h.get("addressed_tiles_count", 0))
|
|
write_uint64(h.get("tile_entries_count", 0))
|
|
write_uint64(h.get("tile_contents_count", 0))
|
|
b_io.write(b"\x01" if h["clustered"] else b"\x00")
|
|
write_uint8(h["internal_compression"].value)
|
|
write_uint8(h["tile_compression"].value)
|
|
write_uint8(h["tile_type"].value)
|
|
write_uint8(h["min_zoom"])
|
|
write_uint8(h["max_zoom"])
|
|
write_int32(h["min_lon_e7"])
|
|
write_int32(h["min_lat_e7"])
|
|
write_int32(h["max_lon_e7"])
|
|
write_int32(h["max_lat_e7"])
|
|
write_uint8(h["center_zoom"])
|
|
write_int32(h["center_lon_e7"])
|
|
write_int32(h["center_lat_e7"])
|
|
|
|
return b_io.getvalue()
|