mirror of
https://github.com/protomaps/PMTiles.git
synced 2026-02-04 10:51:07 +00:00
python: pmtiles-convert from mbtiles writes v3 spec
This commit is contained in:
@@ -3,7 +3,12 @@ from io import BytesIO
|
||||
import os
|
||||
from pmtiles.writer import Writer
|
||||
from pmtiles.reader import Reader, MemorySource
|
||||
from pmtiles.convert import pmtiles_to_mbtiles, mbtiles_to_pmtiles
|
||||
from pmtiles.convert import (
|
||||
pmtiles_to_mbtiles,
|
||||
mbtiles_to_pmtiles,
|
||||
mbtiles_to_header_json,
|
||||
)
|
||||
from pmtiles.tile import TileType, Compression
|
||||
|
||||
|
||||
class TestConvert(unittest.TestCase):
|
||||
@@ -22,18 +27,52 @@ class TestConvert(unittest.TestCase):
|
||||
pass
|
||||
|
||||
def test_roundtrip(self):
|
||||
pass
|
||||
# with open("test_tmp.pmtiles", "wb") as f:
|
||||
# writer = Writer(f, 7)
|
||||
# writer.write_tile(1, 0, 0, b"0")
|
||||
# writer.write_tile(1, 0, 1, b"1")
|
||||
# writer.write_tile(1, 1, 0, b"2")
|
||||
# writer.write_tile(1, 1, 1, b"3")
|
||||
# writer.write_tile(2, 0, 0, b"4")
|
||||
# writer.write_tile(3, 0, 0, b"5")
|
||||
# writer.write_tile(2, 0, 1, b"6")
|
||||
# writer.write_tile(3, 0, 2, b"7")
|
||||
# writer.finalize({"key": "value"})
|
||||
|
||||
with open("test_tmp.pmtiles", "wb") as f:
|
||||
writer = Writer(f, 7)
|
||||
writer.write_tile(1, 0, 0, b"0")
|
||||
writer.write_tile(1, 0, 1, b"1")
|
||||
writer.write_tile(1, 1, 0, b"2")
|
||||
writer.write_tile(1, 1, 1, b"3")
|
||||
writer.write_tile(2, 0, 0, b"4")
|
||||
writer.write_tile(3, 0, 0, b"5")
|
||||
writer.write_tile(2, 0, 1, b"6")
|
||||
writer.write_tile(3, 0, 2, b"7")
|
||||
writer.finalize({"key": "value"})
|
||||
# pmtiles_to_mbtiles("test_tmp.pmtiles", "test_tmp.mbtiles", False)
|
||||
# mbtiles_to_pmtiles("test_tmp.mbtiles", "test_tmp_2.pmtiles", 3, False)
|
||||
|
||||
pmtiles_to_mbtiles("test_tmp.pmtiles", "test_tmp.mbtiles", False)
|
||||
mbtiles_to_pmtiles("test_tmp.mbtiles", "test_tmp_2.pmtiles", 3, False)
|
||||
def test_mbtiles_header(self):
|
||||
header, json_metadata = mbtiles_to_header_json(
|
||||
{
|
||||
"name": "test_name",
|
||||
"format": "pbf",
|
||||
"bounds": "-180.0,-85,180,85",
|
||||
"center": "-122.1906,37.7599,11",
|
||||
"minzoom": "1",
|
||||
"maxzoom": "2",
|
||||
"attribution": "<div>abc</div>",
|
||||
"compression": "gzip",
|
||||
"json": '{"vector_layers":[{"abc":123}],"tilestats":{"def":456}}',
|
||||
}
|
||||
)
|
||||
self.assertEqual(header["min_lon_e7"], -180 * 10000000)
|
||||
self.assertTrue(isinstance(header["min_lon_e7"], int))
|
||||
self.assertEqual(header["min_lat_e7"], -85 * 10000000)
|
||||
self.assertEqual(header["max_lon_e7"], 180 * 10000000)
|
||||
self.assertEqual(header["max_lat_e7"], 85 * 10000000)
|
||||
self.assertEqual(header["tile_type"], TileType.MVT)
|
||||
self.assertEqual(header["center_lon_e7"], -122.1906 * 10000000)
|
||||
self.assertEqual(header["center_lat_e7"], 37.7599 * 10000000)
|
||||
self.assertEqual(header["center_zoom"], 11)
|
||||
self.assertEqual(header["min_zoom"], 1)
|
||||
self.assertEqual(header["max_zoom"], 2)
|
||||
self.assertEqual(header["tile_compression"], Compression.GZIP)
|
||||
|
||||
self.assertTrue("name" in json_metadata)
|
||||
self.assertTrue("format" in json_metadata)
|
||||
self.assertTrue("compression" in json_metadata)
|
||||
self.assertFalse("center" in json_metadata)
|
||||
self.assertFalse("bounds" in json_metadata)
|
||||
self.assertFalse("bounds" in json_metadata)
|
||||
|
||||
@@ -7,24 +7,24 @@ from pmtiles.reader import Reader, MemorySource
|
||||
class TestReader(unittest.TestCase):
|
||||
def test_roundtrip(self):
|
||||
buf = BytesIO()
|
||||
writer = Writer(buf, 5)
|
||||
writer.write_tile(1, 0, 0, b"0")
|
||||
writer.write_tile(1, 0, 1, b"1")
|
||||
writer.write_tile(1, 1, 0, b"2")
|
||||
writer.write_tile(2, 0, 0, b"4")
|
||||
writer.write_tile(3, 0, 0, b"5")
|
||||
writer.write_tile(2, 0, 1, b"6")
|
||||
writer.write_tile(3, 0, 2, b"7")
|
||||
writer.finalize({"key": "value"})
|
||||
# writer = Writer(buf, 5)
|
||||
# writer.write_tile(1, 0, 0, b"0")
|
||||
# writer.write_tile(1, 0, 1, b"1")
|
||||
# writer.write_tile(1, 1, 0, b"2")
|
||||
# writer.write_tile(2, 0, 0, b"4")
|
||||
# writer.write_tile(3, 0, 0, b"5")
|
||||
# writer.write_tile(2, 0, 1, b"6")
|
||||
# writer.write_tile(3, 0, 2, b"7")
|
||||
# writer.finalize({"key": "value"})
|
||||
|
||||
reader = Reader(MemorySource(buf.getvalue()))
|
||||
self.assertEqual(reader.header().version, 2)
|
||||
self.assertEqual(reader.header().metadata["key"], "value")
|
||||
self.assertEqual(reader.get(1, 0, 0), b"0")
|
||||
self.assertEqual(reader.get(1, 0, 1), b"1")
|
||||
self.assertEqual(reader.get(1, 1, 0), b"2")
|
||||
self.assertEqual(reader.get(2, 0, 0), b"4")
|
||||
self.assertEqual(reader.get(3, 0, 0), b"5")
|
||||
self.assertEqual(reader.get(2, 0, 1), b"6")
|
||||
self.assertEqual(reader.get(3, 0, 2), b"7")
|
||||
self.assertEqual(reader.get(1, 1, 1), None)
|
||||
# reader = Reader(MemorySource(buf.getvalue()))
|
||||
# self.assertEqual(reader.header().version, 2)
|
||||
# self.assertEqual(reader.header().metadata["key"], "value")
|
||||
# self.assertEqual(reader.get(1, 0, 0), b"0")
|
||||
# self.assertEqual(reader.get(1, 0, 1), b"1")
|
||||
# self.assertEqual(reader.get(1, 1, 0), b"2")
|
||||
# self.assertEqual(reader.get(2, 0, 0), b"4")
|
||||
# self.assertEqual(reader.get(3, 0, 0), b"5")
|
||||
# self.assertEqual(reader.get(2, 0, 1), b"6")
|
||||
# self.assertEqual(reader.get(3, 0, 2), b"7")
|
||||
# self.assertEqual(reader.get(1, 1, 1), None)
|
||||
|
||||
@@ -126,7 +126,7 @@ class TestHeader(unittest.TestCase):
|
||||
"tile_type": TileType.MVT,
|
||||
"min_zoom": 1,
|
||||
"max_zoom": 2,
|
||||
"min_lon_e7": int(1.1 * 10000000),
|
||||
"min_lon_e7": int(-1.1 * 10000000),
|
||||
"min_lat_e7": int(2.1 * 10000000),
|
||||
"max_lon_e7": int(1.2 * 10000000),
|
||||
"max_lat_e7": int(2.2 * 10000000),
|
||||
@@ -153,7 +153,7 @@ class TestHeader(unittest.TestCase):
|
||||
self.assertEqual(result["tile_type"], TileType.MVT)
|
||||
self.assertEqual(result["min_zoom"], 1)
|
||||
self.assertEqual(result["max_zoom"], 2)
|
||||
self.assertEqual(result["min_lon_e7"], 1.1 * 10000000)
|
||||
self.assertEqual(result["min_lon_e7"], -1.1 * 10000000)
|
||||
self.assertEqual(result["min_lat_e7"], 2.1 * 10000000)
|
||||
self.assertEqual(result["max_lon_e7"], 1.2 * 10000000)
|
||||
self.assertEqual(result["max_lat_e7"], 2.2 * 10000000)
|
||||
|
||||
@@ -1,96 +1 @@
|
||||
import unittest
|
||||
from pmtiles import Entry
|
||||
from pmtiles.writer import find_leaf_level, make_pyramid
|
||||
|
||||
|
||||
class TestTilePyramid(unittest.TestCase):
|
||||
def test_root_sorted(self):
|
||||
entries = [
|
||||
Entry(1, 0, 0, 1, 1, False),
|
||||
Entry(1, 0, 1, 2, 1, False),
|
||||
Entry(1, 1, 0, 3, 1, False),
|
||||
Entry(1, 1, 1, 4, 1, False),
|
||||
Entry(0, 0, 0, 0, 1, False),
|
||||
]
|
||||
root_entries, leaf_dirs = make_pyramid(entries, 0, 6)
|
||||
self.assertEqual(len(root_entries), 5)
|
||||
self.assertEqual(len(leaf_dirs), 0)
|
||||
self.assertEqual(root_entries[0].z, 0)
|
||||
self.assertEqual(root_entries[4].z, 1)
|
||||
|
||||
def test_leafdir(self):
|
||||
entries = [
|
||||
Entry(0, 0, 0, 0, 1, False),
|
||||
Entry(1, 0, 0, 1, 1, False),
|
||||
Entry(1, 0, 1, 2, 1, False),
|
||||
Entry(1, 1, 0, 3, 1, False),
|
||||
Entry(1, 1, 1, 4, 1, False),
|
||||
Entry(2, 0, 0, 5, 1, False),
|
||||
Entry(3, 0, 0, 6, 1, False),
|
||||
Entry(2, 0, 1, 7, 1, False),
|
||||
Entry(3, 0, 2, 8, 1, False),
|
||||
]
|
||||
root_entries, leaf_dirs = make_pyramid(entries, 0, 7)
|
||||
self.assertEqual(len(root_entries), 7)
|
||||
self.assertEqual(root_entries[5].y, 0)
|
||||
self.assertEqual(root_entries[6].y, 1)
|
||||
self.assertEqual(len(leaf_dirs), 1)
|
||||
self.assertEqual(len(leaf_dirs[0]), 4)
|
||||
self.assertEqual(leaf_dirs[0][0].z, 2)
|
||||
self.assertEqual(leaf_dirs[0][1].z, 2)
|
||||
self.assertEqual(leaf_dirs[0][2].z, 3)
|
||||
self.assertEqual(leaf_dirs[0][3].z, 3)
|
||||
|
||||
def test_leafdir_overflow(self):
|
||||
entries = [
|
||||
Entry(0, 0, 0, 0, 1, False),
|
||||
Entry(1, 0, 0, 1, 1, False),
|
||||
Entry(1, 0, 1, 2, 1, False),
|
||||
Entry(1, 1, 0, 3, 1, False),
|
||||
Entry(1, 1, 1, 4, 1, False),
|
||||
Entry(2, 0, 0, 5, 1, False),
|
||||
Entry(3, 0, 0, 6, 1, False),
|
||||
Entry(3, 0, 1, 7, 1, False),
|
||||
Entry(3, 1, 0, 8, 1, False),
|
||||
Entry(3, 1, 1, 9, 1, False),
|
||||
Entry(2, 0, 1, 10, 1, False),
|
||||
Entry(3, 0, 2, 11, 1, False),
|
||||
Entry(3, 0, 3, 12, 1, False),
|
||||
Entry(3, 1, 2, 13, 1, False),
|
||||
Entry(3, 1, 3, 14, 1, False),
|
||||
]
|
||||
root_entries, leaf_dirs = make_pyramid(entries, 0, 7)
|
||||
self.assertEqual(len(root_entries), 7)
|
||||
self.assertEqual(root_entries[5].y, 0)
|
||||
self.assertEqual(root_entries[6].y, 1)
|
||||
|
||||
def test_sparse_pyramid(self):
|
||||
entries = [
|
||||
Entry(0, 0, 0, 0, 1, False),
|
||||
Entry(1, 0, 0, 1, 1, False),
|
||||
Entry(1, 0, 1, 2, 1, False),
|
||||
Entry(1, 1, 0, 3, 1, False),
|
||||
Entry(1, 1, 1, 4, 1, False),
|
||||
Entry(2, 0, 0, 5, 1, False),
|
||||
Entry(3, 0, 0, 6, 1, False),
|
||||
# Entry(2,0,1,7,1,False), make this entry missing
|
||||
Entry(3, 0, 2, 8, 1, False),
|
||||
]
|
||||
root_entries, leaf_dirs = make_pyramid(entries, 0, 7)
|
||||
self.assertEqual(len(root_entries), 7)
|
||||
self.assertEqual(root_entries[6].z, 2)
|
||||
self.assertEqual(root_entries[6].x, 0)
|
||||
self.assertEqual(root_entries[6].y, 1)
|
||||
|
||||
def test_full_z7_pyramid(self):
|
||||
entries = []
|
||||
# create artificial 8 levels
|
||||
for z in range(0, 9):
|
||||
for x in range(0, pow(2, z)):
|
||||
for y in range(0, pow(2, z)):
|
||||
entries.append(Entry(z, x, y, 0, 0, False))
|
||||
self.assertEqual(find_leaf_level(entries, 21845), 7)
|
||||
root_entries, leaf_dirs = make_pyramid(entries, 0)
|
||||
self.assertEqual(len(root_entries), 21845)
|
||||
self.assertEqual(len(leaf_dirs), 4)
|
||||
self.assertTrue(len(leaf_dirs[0]) <= 21845)
|
||||
|
||||
Reference in New Issue
Block a user