Files
PMTiles/python/pmtiles/tile.py
Taku Fukada 9b69ab4b64 Simplify and optimize Hilbert tile ID <-> XYZ conversion (#527)
* Optimize Hilbert tile ID <-> XYZ conversion

* biome

* tile_id: more improvements

* tile_id(cpp): uint32
2025-02-21 15:37:49 +08:00

275 lines
7.1 KiB
Python

import gzip
import io
from enum import Enum
class Entry:
    """One directory record.

    Attributes:
        tile_id: ID of the first tile this record covers.
        offset: start position of the referenced data
            (presumably a byte offset; confirm against the format spec).
        length: size of the referenced data.
        run_length: number of consecutive tile IDs, starting at
            ``tile_id``, that this record covers. ``find_tile`` treats a
            value of 0 specially and returns such a record for any
            later ID it is the nearest predecessor of.
    """

    # Fixed attribute set: instances carry no per-instance __dict__.
    __slots__ = ("tile_id", "offset", "length", "run_length")

    def __init__(self, tile_id, offset, length, run_length):
        self.tile_id, self.offset = tile_id, offset
        self.length, self.run_length = length, run_length

    def __str__(self):
        """Return a compact single-line description of the record."""
        return f"id={self.tile_id} offset={self.offset} length={self.length} runlength={self.run_length}"
def rotate(n, x, y, rx, ry):
    """Apply the per-quadrant coordinate transform of the tile-ID curve.

    When ``ry`` is non-zero the point is returned unchanged. Otherwise
    the coordinates are swapped, and if ``rx`` is also non-zero both are
    first reflected across ``n - 1``.

    Returns:
        The transformed ``(x, y)`` tuple.
    """
    if ry != 0:
        return x, y
    if rx != 0:
        # Reflect, then swap: (x, y) -> (n - 1 - y, n - 1 - x).
        return n - 1 - y, n - 1 - x
    return y, x
def zxy_to_tileid(z, x, y):
    """Convert a (z, x, y) tile coordinate to its Hilbert tile ID.

    The ID is the number of tiles in all zoom levels below ``z`` plus
    the tile's position along the curve within level ``z``.

    Args:
        z: zoom level, 0..31.
        x: tile column, 0 <= x < 2**z.
        y: tile row, 0 <= y < 2**z.

    Returns:
        The integer tile ID.

    Raises:
        OverflowError: if ``z`` is greater than 31.
        ValueError: if ``z`` is negative or ``x``/``y`` lie outside the
            zoom level's grid.
    """
    if z > 31:
        raise OverflowError("tile zoom exceeds 64-bit limit")
    if z < 0:
        raise ValueError("tile zoom must be non-negative")
    side = 1 << z
    # Check the lower bound too: a negative coordinate would otherwise be
    # accepted and masked as a two's-complement value, yielding a
    # meaningless ID instead of an error.
    if not (0 <= x < side and 0 <= y < side):
        raise ValueError("tile x/y outside zoom level bounds")

    # Tiles in levels 0..z-1: 1 + 4 + ... + 4**(z-1) == (4**z - 1) / 3.
    acc = ((1 << (z * 2)) - 1) // 3
    # Walk the coordinate bits from most to least significant.
    for a in range(z - 1, -1, -1):
        s = 1 << a
        rx = s & x
        ry = s & y
        # rx and ry are each 0 or s, so this adds s*s * (quadrant index).
        acc += ((3 * rx) ^ ry) << a
        x, y = rotate(s, x, y, rx, ry)
    return acc
def tileid_to_zxy(tile_id):
    """Convert a Hilbert tile ID back to its (z, x, y) tile coordinate.

    Args:
        tile_id: non-negative integer tile ID.

    Returns:
        A ``(z, x, y)`` tuple.

    Raises:
        ValueError: if ``tile_id`` is negative.
        OverflowError: if the ID belongs to zoom level 32 or above.
    """
    # Without this guard a negative ID falls through the arithmetic below
    # and comes back as a plausible-looking tile (e.g. -1 -> (0, 0, 0)).
    if tile_id < 0:
        raise ValueError("tile ID must be non-negative")

    # Level z starts at ID (4**z - 1) / 3, so 3 * id + 1 >= 4**z exactly
    # when the ID is at level z or deeper; half the floor(log2) gives z.
    z = ((3 * tile_id + 1).bit_length() - 1) // 2
    if z >= 32:
        raise OverflowError("tile zoom exceeds 64-bit limit")
    acc = ((1 << (z * 2)) - 1) // 3
    pos = tile_id - acc  # position along the curve within level z

    x = 0
    y = 0
    s = 1
    n = 1 << z
    # Consume two bits of pos per step, least-significant pair first.
    # pos is shifted by only one bit per iteration because s itself
    # doubles, which keeps the current bit pair aligned with s.
    while s < n:
        rx = (pos // 2) & s
        ry = (pos ^ rx) & s
        x, y = rotate(s, x, y, rx, ry)
        x += rx
        y += ry
        pos >>= 1
        s <<= 1
    return (z, x, y)
def find_tile(entries, tile_id):
    """Look up the entry that covers ``tile_id`` in a sorted entry list.

    Args:
        entries: sequence of objects with ``tile_id`` and ``run_length``
            attributes, ordered by ascending ``tile_id``.
        tile_id: the tile ID to look up.

    Returns:
        The entry whose ``tile_id`` matches exactly; otherwise the
        nearest preceding entry if its ``run_length`` is 0 or its run
        extends over ``tile_id``; otherwise ``None``.
    """
    lo, hi = 0, len(entries) - 1
    while lo <= hi:
        mid = (lo + hi) >> 1
        delta = tile_id - entries[mid].tile_id
        if delta > 0:
            lo = mid + 1
        elif delta < 0:
            hi = mid - 1
        else:
            return entries[mid]

    # No exact hit: hi now indexes the last entry with a smaller tile_id,
    # or is -1 when every entry is larger.
    if hi < 0:
        return None
    candidate = entries[hi]
    # NOTE(review): run_length == 0 presumably marks a pointer to a
    # deeper directory rather than a tile; confirm against the spec.
    if candidate.run_length == 0 or tile_id - candidate.tile_id < candidate.run_length:
        return candidate
    return None
def read_varint(b_io):
    """Read one unsigned base-128 varint from a binary stream.

    Each byte contributes its low 7 bits, least-significant group first;
    a set high bit means another byte follows.

    Args:
        b_io: object with a ``read(n)`` method returning bytes.

    Returns:
        The decoded non-negative integer.

    Raises:
        EOFError: if the stream ends before the final byte of the varint.
    """
    value = 0
    bit_offset = 0
    while True:
        chunk = b_io.read(1)
        if chunk == b"":
            raise EOFError("unexpectedly reached end of varint stream")
        byte = ord(chunk)
        value |= (byte & 0x7F) << bit_offset
        if not (byte & 0x80):
            # Continuation bit clear: this was the last byte.
            return value
        bit_offset += 7
def write_varint(b_io, i):
    """Write a non-negative integer to a stream as a base-128 varint.

    The value is emitted 7 bits per byte, least-significant group first,
    with the high bit set on every byte except the last.

    Args:
        b_io: object with a ``write(bytes)`` method.
        i: the non-negative integer to encode.

    Raises:
        ValueError: if ``i`` is negative.
    """
    # An arithmetic right shift of a negative int settles at -1, never 0,
    # so without this guard the loop below would write 0xFF bytes forever.
    if i < 0:
        raise ValueError("varint value must be non-negative")
    while True:
        towrite = i & 0x7F
        i >>= 7
        if i:
            # More groups follow: set the continuation bit.
            b_io.write(bytes([towrite | 0x80]))
        else:
            b_io.write(bytes([towrite]))
            break
class Compression(Enum):
    """Compression scheme codes stored in the header.

    deserialize_header builds members from header bytes 97
    ("internal_compression") and 98 ("tile_compression");
    serialize_header writes each member's ``.value`` back as one byte.
    """

    UNKNOWN = 0
    NONE = 1
    GZIP = 2
    BROTLI = 3
    ZSTD = 4
class TileType(Enum):
    """Tile content type codes stored in the header.

    deserialize_header builds a member from header byte 99
    ("tile_type"); serialize_header writes the member's ``.value`` back
    as one byte.
    """

    UNKNOWN = 0
    MVT = 1
    PNG = 2
    JPEG = 3
    WEBP = 4
    AVIF = 5
def deserialize_directory(buf):
    """Decode a gzip-compressed directory into a list of Entry objects.

    The decompressed payload is a varint entry count followed by four
    columns, each holding one varint per entry: delta-encoded tile IDs,
    run lengths, lengths, and offsets.

    Args:
        buf: gzip-compressed directory bytes.

    Returns:
        A list of ``Entry`` objects in stored order.

    Raises:
        EOFError: if the payload ends before all columns are read.
    """
    stream = io.BytesIO(gzip.decompress(buf))
    count = read_varint(stream)

    # Column 1: tile IDs, each stored as the delta from the previous ID.
    entries = []
    running_id = 0
    for _ in range(count):
        running_id += read_varint(stream)
        entries.append(Entry(running_id, 0, 0, 0))

    # Columns 2 and 3: run lengths, then data lengths.
    for entry in entries:
        entry.run_length = read_varint(stream)
    for entry in entries:
        entry.length = read_varint(stream)

    # Column 4: offsets stored as value + 1; a stored 0 (after the first
    # entry) means "immediately after the previous entry's data".
    previous = None
    for entry in entries:
        stored = read_varint(stream)
        if previous is not None and stored == 0:
            entry.offset = previous.offset + previous.length
        else:
            entry.offset = stored - 1
        previous = entry

    return entries
def serialize_directory(entries):
    """Encode a list of entries as a gzip-compressed directory.

    Writes a varint entry count followed by four columns, each holding
    one varint per entry: delta-encoded tile IDs, run lengths, lengths,
    and offsets.

    Args:
        entries: sequence of objects with ``tile_id``, ``run_length``,
            ``length`` and ``offset`` attributes.

    Returns:
        The gzip-compressed directory bytes.
    """
    stream = io.BytesIO()
    write_varint(stream, len(entries))

    # Column 1: each tile ID as the delta from the previous one.
    previous_id = 0
    for entry in entries:
        write_varint(stream, entry.tile_id - previous_id)
        previous_id = entry.tile_id

    # Columns 2 and 3: run lengths, then data lengths.
    for entry in entries:
        write_varint(stream, entry.run_length)
    for entry in entries:
        write_varint(stream, entry.length)

    # Column 4: offsets as value + 1, with 0 standing for "starts right
    # where the previous entry's data ends".
    previous = None
    for entry in entries:
        if previous is not None and entry.offset == previous.offset + previous.length:
            write_varint(stream, 0)
        else:
            write_varint(stream, entry.offset + 1)
        previous = entry

    return gzip.compress(stream.getvalue())
class SpecVersionUnsupported(Exception):
    """Raised by deserialize_header when the version byte is not 3."""
class MagicNumberNotFound(Exception):
    """Raised by deserialize_header when the buffer does not start with "PMTiles"."""
def deserialize_header(buf):
    """Parse the fixed-size binary header into a dict.

    All multi-byte fields are little-endian. The last field read ends at
    byte 126, so ``buf`` must hold at least the first 127 bytes; a
    shorter buffer raises IndexError or yields truncated field values.

    Args:
        buf: bytes-like object starting at the beginning of the archive.

    Returns:
        A dict of header fields. Offsets, lengths and counts are ints,
        "clustered" is a bool, the compression fields are ``Compression``
        members, "tile_type" is a ``TileType`` member, and the ``*_e7``
        fields are signed 32-bit ints.

    Raises:
        MagicNumberNotFound: if the buffer does not start with "PMTiles".
        SpecVersionUnsupported: if the version byte is not 3.
        ValueError: if a compression or tile-type byte is not a known
            enum value.
    """
    # Compare raw bytes instead of decoding them: arbitrary binary input
    # (a gzip or PNG file, say) is not valid UTF-8 and would otherwise
    # surface as UnicodeDecodeError rather than MagicNumberNotFound.
    if bytes(buf[0:7]) != b"PMTiles":
        raise MagicNumberNotFound()
    if buf[7] != 0x3:
        raise SpecVersionUnsupported()

    def read_uint64(pos):
        return int.from_bytes(buf[pos : pos + 8], byteorder="little")

    def read_int32(pos):
        return int.from_bytes(buf[pos : pos + 4], byteorder="little", signed=True)

    return {
        "version": buf[7],
        "root_offset": read_uint64(8),
        "root_length": read_uint64(16),
        "metadata_offset": read_uint64(24),
        "metadata_length": read_uint64(32),
        "leaf_directory_offset": read_uint64(40),
        "leaf_directory_length": read_uint64(48),
        "tile_data_offset": read_uint64(56),
        "tile_data_length": read_uint64(64),
        "addressed_tiles_count": read_uint64(72),
        "tile_entries_count": read_uint64(80),
        "tile_contents_count": read_uint64(88),
        "clustered": buf[96] == 0x1,
        "internal_compression": Compression(buf[97]),
        "tile_compression": Compression(buf[98]),
        "tile_type": TileType(buf[99]),
        "min_zoom": buf[100],
        "max_zoom": buf[101],
        "min_lon_e7": read_int32(102),
        "min_lat_e7": read_int32(106),
        "max_lon_e7": read_int32(110),
        "max_lat_e7": read_int32(114),
        "center_zoom": buf[118],
        "center_lon_e7": read_int32(119),
        "center_lat_e7": read_int32(123),
    }
def serialize_header(h):
    """Encode a header dict as the 127-byte binary header.

    All multi-byte fields are written little-endian.

    Args:
        h: mapping of header fields. Required keys: "root_offset",
            "root_length", "metadata_offset", "metadata_length",
            "tile_data_offset", "tile_data_length", "clustered",
            "internal_compression", "tile_compression", "tile_type"
            (the last three must expose ``.value``), "min_zoom" and
            "max_zoom". Leaf-directory and count fields default to 0,
            bounds default to the whole world (+/-180, +/-90 degrees
            scaled by 1e7), "center_zoom" defaults to "min_zoom", and the
            centre coordinates default to the rounded midpoint of the
            bounds.

    Returns:
        The serialized header as bytes.

    Raises:
        KeyError: if a required key is missing.
        OverflowError: if a value does not fit its field width.
    """
    out = io.BytesIO()

    def emit(value, width, signed=False):
        # Single writer for every fixed-width integer field.
        out.write(value.to_bytes(width, byteorder="little", signed=signed))

    out.write(b"PMTiles")
    out.write(b"\x03")

    # Eleven unsigned 64-bit fields, in on-disk order.
    for key in ("root_offset", "root_length", "metadata_offset", "metadata_length"):
        emit(h[key], 8)
    for key in ("leaf_directory_offset", "leaf_directory_length"):
        emit(h.get(key, 0), 8)
    for key in ("tile_data_offset", "tile_data_length"):
        emit(h[key], 8)
    for key in ("addressed_tiles_count", "tile_entries_count", "tile_contents_count"):
        emit(h.get(key, 0), 8)

    # Single-byte flags and codes.
    out.write(b"\x01" if h["clustered"] else b"\x00")
    for key in ("internal_compression", "tile_compression", "tile_type"):
        emit(h[key].value, 1)
    for key in ("min_zoom", "max_zoom"):
        emit(h[key], 1)

    # Bounding box as signed 32-bit values; remembered for the centre
    # defaults computed below.
    bounds = {}
    for key, degrees in (
        ("min_lon_e7", -180),
        ("min_lat_e7", -90),
        ("max_lon_e7", 180),
        ("max_lat_e7", 90),
    ):
        bounds[key] = h.get(key, int(degrees * 10000000))
        emit(bounds[key], 4, signed=True)

    emit(h.get("center_zoom", h["min_zoom"]), 1)
    mid_lon = round((bounds["min_lon_e7"] + bounds["max_lon_e7"]) / 2)
    emit(h.get("center_lon_e7", mid_lon), 4, signed=True)
    mid_lat = round((bounds["min_lat_e7"] + bounds["max_lat_e7"]) / 2)
    emit(h.get("center_lat_e7", mid_lat), 4, signed=True)

    return out.getvalue()