python client: implement v3 directory serialization/deserialization

This commit is contained in:
Brandon Liu
2022-10-11 15:57:15 +08:00
parent 6087d89391
commit 904031a85e
2 changed files with 140 additions and 6 deletions

View File

@@ -1,9 +1,21 @@
from collections import namedtuple
from enum import Enum
import io
import gzip
Entry = namedtuple("Entry", ["tile_id", "offset", "length", "run_length"])
Header = namedtuple("Header", [])
class Entry:
__slots__ = ("tile_id", "offset", "length", "run_length")
def __init__(self, tile_id, offset, length, run_length):
self.tile_id = tile_id
self.offset = offset
self.length = length
self.run_length = run_length
def rotate(n, xy, rx, ry):
if ry == 0:
if rx == 1:
@@ -87,12 +99,89 @@ def find_tile(entries, tile_id):
return entries[n]
def deserialize_directory(bytes):
pass
def read_varint(b_io):
shift = 0
result = 0
while True:
raw = b_io.read(1)
if raw == b"":
raise EOFError("unexpectedly reached end of varint stream")
i = ord(raw)
result |= (i & 0x7F) << shift
shift += 7
if not (i & 0x80):
break
return result
def serialize_directory(bytes):
pass
def write_varint(b_io, i):
while True:
towrite = i & 0x7F
i >>= 7
if i:
b_io.write(bytes([towrite | 0x80]))
else:
b_io.write(bytes([towrite]))
break
class Compression(Enum):
UNKNOWN = 0
NONE = 1
GZIP = 2
BROTLI = 3
ZSTD = 4
def deserialize_directory(buf):
b_io = io.BytesIO(gzip.decompress(buf))
entries = []
num_entries = read_varint(b_io)
last_id = 0
for i in range(num_entries):
tmp = read_varint(b_io)
entries.append(Entry(last_id + tmp, 0, 0, 0))
last_id += tmp
for i in range(num_entries):
entries[i].run_length = read_varint(b_io)
for i in range(num_entries):
entries[i].length = read_varint(b_io)
for i in range(num_entries):
tmp = read_varint(b_io)
if i > 0 and tmp == 0:
entries[i].offset = entries[i - 1].offset + entries[i - 1].length
else:
entries[i].offset = tmp - 1
return entries
def serialize_directory(entries):
b_io = io.BytesIO()
write_varint(b_io, len(entries))
last_id = 0
for e in entries:
write_varint(b_io, e.tile_id - last_id)
last_id = e.tile_id
for e in entries:
write_varint(b_io, e.run_length)
for e in entries:
write_varint(b_io, e.length)
for i, e in enumerate(entries):
if i > 0 and e.offset == entries[i - 1].offset + entries[i - 1].length:
write_varint(b_io, 0)
else:
write_varint(b_io, e.offset + 1)
return gzip.compress(b_io.getvalue())
def deserialize_header(bytes):