import unittest import sqlite3 from io import BytesIO import os import shutil import json from pmtiles.writer import Writer from pmtiles.reader import Reader, MemorySource from pmtiles.convert import ( pmtiles_to_mbtiles, pmtiles_to_dir, mbtiles_to_pmtiles, mbtiles_to_header_json, ) from pmtiles.tile import TileType, Compression class TestConvert(unittest.TestCase): def tearDown(self): try: os.remove("test_tmp.pmtiles") except: pass try: os.remove("test_tmp.mbtiles") except: pass try: os.remove("test_tmp_2.mbtiles") except: pass try: shutil.rmtree("test_dir") except: pass def test_roundtrip(self): with open("test_tmp.pmtiles", "wb") as f: writer = Writer(f) writer.write_tile(0, b"0") writer.write_tile(1, b"1") writer.write_tile(2, b"2") writer.write_tile(3, b"3") writer.write_tile(4, b"4") writer.write_tile(5, b"5") writer.write_tile(6, b"6") writer.write_tile(7, b"7") writer.finalize( { "tile_type": TileType.MVT, "tile_compression": Compression.GZIP, "min_zoom": 0, "max_zoom": 2, "min_lon_e7": 0, "max_lon_e7": 0, "min_lat_e7": 0, "max_lat_e7": 0, "center_zoom": 0, "center_lon_e7": 0, "center_lat_e7": 0, }, {"vector_layers": ['vector','layers'], "tilestats":{'tile':'stats'}}, ) pmtiles_to_mbtiles("test_tmp.pmtiles", "test_tmp.mbtiles") conn = sqlite3.connect('test_tmp.mbtiles') cursor = conn.cursor() res = cursor.execute("SELECT value from metadata where name = 'json'") data = res.fetchone()[0] json_metadata = json.loads(data) self.assertEqual(json_metadata['vector_layers'], ['vector', 'layers']) self.assertEqual(json_metadata['tilestats'], {'tile':'stats'}) mbtiles_to_pmtiles("test_tmp.mbtiles", "test_tmp_2.pmtiles", 3) pmtiles_to_dir("test_tmp.pmtiles","test_dir") def test_mbtiles_header(self): header, json_metadata = mbtiles_to_header_json( { "name": "test_name", "format": "pbf", "bounds": "-180.0,-85,180,85", "center": "-122.1906,37.7599,11", "minzoom": "1", "maxzoom": "2", "attribution": "