mirror of
https://github.com/protomaps/PMTiles.git
synced 2026-02-04 19:01:08 +00:00
refactor tileset to name; preserve pmtiles module ZIP structure
This commit is contained in:
@@ -2,6 +2,6 @@ import zipfile
|
|||||||
|
|
||||||
with zipfile.ZipFile("lambda_function.zip", "w", zipfile.ZIP_DEFLATED) as z:
|
with zipfile.ZipFile("lambda_function.zip", "w", zipfile.ZIP_DEFLATED) as z:
|
||||||
z.write("lambda_function.py")
|
z.write("lambda_function.py")
|
||||||
z.write("../../python/pmtiles/reader.py", "pmtiles.py")
|
z.write("../../python/pmtiles/reader.py", "pmtiles/reader.py")
|
||||||
|
|
||||||
print(f"created lambda_function.zip")
|
print(f"created lambda_function.zip")
|
||||||
|
|||||||
@@ -10,7 +10,7 @@ import re
|
|||||||
import boto3
|
import boto3
|
||||||
|
|
||||||
# create_lambda_function.py will vendor the relevant file
|
# create_lambda_function.py will vendor the relevant file
|
||||||
import pmtiles
|
from pmtiles.reader import Reader
|
||||||
|
|
||||||
Zxy = collections.namedtuple("Zxy", ["z", "x", "y"])
|
Zxy = collections.namedtuple("Zxy", ["z", "x", "y"])
|
||||||
|
|
||||||
@@ -33,17 +33,17 @@ def get_object_bytes(key, offset, length):
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def pmtiles_path(p, tileset):
|
def pmtiles_path(p, name):
|
||||||
if not p:
|
if not p:
|
||||||
p = "{tileset}.pmtiles"
|
p = "{name}.pmtiles"
|
||||||
return p.replace("{tileset}", tileset)
|
return p.replace("{name}", name)
|
||||||
|
|
||||||
|
|
||||||
def parse_tile_path(p, str):
|
def parse_tile_path(p, str):
|
||||||
if not p:
|
if not p:
|
||||||
p = "/{tileset}/{z}/{x}/{y}.pbf"
|
p = "/{name}/{z}/{x}/{y}.pbf"
|
||||||
p = re.escape(p)
|
p = re.escape(p)
|
||||||
p = p.replace(r"\{tileset\}", r"(?P<tileset>[0-9a-zA-Z/!\-_\.\*'\(\)]+)")
|
p = p.replace(r"\{name\}", r"(?P<name>[0-9a-zA-Z/!\-_\.\*'\(\)]+)")
|
||||||
p = p.replace(r"\{z\}", r"(?P<z>\d+)")
|
p = p.replace(r"\{z\}", r"(?P<z>\d+)")
|
||||||
p = p.replace(r"\{x\}", r"(?P<x>\d+)")
|
p = p.replace(r"\{x\}", r"(?P<x>\d+)")
|
||||||
p = p.replace(r"\{y\}", r"(?P<y>\d+)")
|
p = p.replace(r"\{y\}", r"(?P<y>\d+)")
|
||||||
@@ -51,7 +51,7 @@ def parse_tile_path(p, str):
|
|||||||
if not m:
|
if not m:
|
||||||
return None, None
|
return None, None
|
||||||
return (
|
return (
|
||||||
m.group("tileset"),
|
m.group("name"),
|
||||||
Zxy(int(m.group("z")), int(m.group("x")), int(m.group("y"))),
|
Zxy(int(m.group("z")), int(m.group("x")), int(m.group("y"))),
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -60,19 +60,18 @@ def parse_tile_path(p, str):
|
|||||||
# and returns API Gateway V2 / Lambda Function dict responses
|
# and returns API Gateway V2 / Lambda Function dict responses
|
||||||
# Does not work with CloudFront events/Lambda@Edge; see README
|
# Does not work with CloudFront events/Lambda@Edge; see README
|
||||||
def lambda_handler(event, context):
|
def lambda_handler(event, context):
|
||||||
start = datetime.now()
|
|
||||||
uri = event["rawPath"]
|
uri = event["rawPath"]
|
||||||
tileset, tile = parse_tile_uri(os.environ.get("TILE_PATH"), uri)
|
name, tile = parse_tile_path(os.environ.get("TILE_PATH"), uri)
|
||||||
|
|
||||||
if not tile:
|
if not tile:
|
||||||
return {"statusCode": 400, "body": "Invalid Tile URL"}
|
return {"statusCode": 400, "body": "Invalid Tile URL"}
|
||||||
|
|
||||||
def get_bytes(offset, length):
|
def get_bytes(offset, length):
|
||||||
return get_object_bytes(
|
return get_object_bytes(
|
||||||
pmtiles_path(os.environ.get("PMTILES_PATH"), tileset), offset, length
|
pmtiles_path(os.environ.get("PMTILES_PATH"), name), offset, length
|
||||||
)
|
)
|
||||||
|
|
||||||
reader = pmtiles.Reader(get_bytes)
|
reader = Reader(get_bytes)
|
||||||
tile_data = reader.get(tile.z, tile.x, tile.y)
|
tile_data = reader.get(tile.z, tile.x, tile.y)
|
||||||
if not tile_data:
|
if not tile_data:
|
||||||
return {"statusCode": 404, "body": "Tile not found"}
|
return {"statusCode": 404, "body": "Tile not found"}
|
||||||
|
|||||||
@@ -4,43 +4,51 @@ from lambda_function import parse_tile_path, pmtiles_path
|
|||||||
|
|
||||||
class TestLambda(unittest.TestCase):
|
class TestLambda(unittest.TestCase):
|
||||||
def test_parse_tile_default(self):
|
def test_parse_tile_default(self):
|
||||||
tileset, tile = parse_tile_path(None, "abcd")
|
name, tile = parse_tile_path(None, "abcd")
|
||||||
self.assertEqual(tile, None)
|
self.assertEqual(tile, None)
|
||||||
|
|
||||||
tileset, tile = parse_tile_path(None, "/foo/11/22/33.pbf")
|
name, tile = parse_tile_path(None, "/foo/11/22/33.pbf")
|
||||||
self.assertEqual(tileset, "foo")
|
self.assertEqual(name, "foo")
|
||||||
self.assertEqual(tile.z, 11)
|
self.assertEqual(tile.z, 11)
|
||||||
self.assertEqual(tile.x, 22)
|
self.assertEqual(tile.x, 22)
|
||||||
self.assertEqual(tile.y, 33)
|
self.assertEqual(tile.y, 33)
|
||||||
|
|
||||||
def test_parse_tile_path_setting(self):
|
def test_parse_tile_path_setting(self):
|
||||||
tileset, tile = parse_tile_path(
|
name, tile = parse_tile_path("/{name}/{z}/{y}/{x}.pbf", "/foo/11/22/33.pbf")
|
||||||
"/{tileset}/{z}/{y}/{x}.pbf", "/foo/11/22/33.pbf"
|
|
||||||
)
|
|
||||||
self.assertEqual(tile.x, 33)
|
self.assertEqual(tile.x, 33)
|
||||||
self.assertEqual(tile.y, 22)
|
self.assertEqual(tile.y, 22)
|
||||||
|
|
||||||
tileset, tile = parse_tile_path(
|
name, tile = parse_tile_path(
|
||||||
"/tiles/{tileset}/{z}/{x}/{y}.mvt", "/tiles/foo/4/2/3.mvt"
|
"/tiles/{name}/{z}/{x}/{y}.mvt", "/tiles/foo/4/2/3.mvt"
|
||||||
)
|
)
|
||||||
self.assertEqual(tileset, "foo")
|
self.assertEqual(name, "foo")
|
||||||
self.assertEqual(tile.z, 4)
|
self.assertEqual(tile.z, 4)
|
||||||
self.assertEqual(tile.x, 2)
|
self.assertEqual(tile.x, 2)
|
||||||
self.assertEqual(tile.y, 3)
|
self.assertEqual(tile.y, 3)
|
||||||
|
|
||||||
def test_parse_tile_path_setting_special_chars(self):
|
def test_parse_tile_path_setting_special_chars(self):
|
||||||
tileset, tile = parse_tile_path(
|
name, tile = parse_tile_path(
|
||||||
"/folder(new/{tileset}/{z}/{y}/{x}.pbf", "/folder(new/foo/11/22/33.pbf"
|
"/folder(new/{name}/{z}/{y}/{x}.pbf", "/folder(new/foo/11/22/33.pbf"
|
||||||
)
|
)
|
||||||
self.assertEqual(tileset, "foo")
|
self.assertEqual(name, "foo")
|
||||||
|
|
||||||
|
def test_parse_tile_path_setting_slash(self):
|
||||||
|
name, tile = parse_tile_path("/{name}/{z}/{y}/{x}.pbf", "/foo/bar/11/22/33.pbf")
|
||||||
|
self.assertEqual(name, "foo/bar")
|
||||||
|
|
||||||
def test_pmtiles_path(self):
|
def test_pmtiles_path(self):
|
||||||
self.assertEqual(pmtiles_path(None, "foo"), "foo.pmtiles")
|
self.assertEqual(pmtiles_path(None, "foo"), "foo.pmtiles")
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
pmtiles_path("folder/{tileset}/file.pmtiles", "foo"),
|
pmtiles_path("folder/{name}/file.pmtiles", "foo"),
|
||||||
"folder/foo/file.pmtiles",
|
"folder/foo/file.pmtiles",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def test_pmtiles_path_slash(self):
|
||||||
|
self.assertEqual(
|
||||||
|
pmtiles_path("folder/{name}.pmtiles", "foo/bar"),
|
||||||
|
"folder/foo/bar.pmtiles",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|||||||
Reference in New Issue
Block a user