mirror of
https://github.com/protomaps/PMTiles.git
synced 2026-02-04 19:01:08 +00:00
aws lambda has configurable tile_path and pmtiles_path
This commit is contained in:
@@ -5,7 +5,11 @@ import gzip
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
|
||||
# Exists inside all lambda functions
|
||||
import boto3
|
||||
|
||||
# create_lambda_function.py will vendor the relevant file
|
||||
import pmtiles
|
||||
|
||||
Zxy = collections.namedtuple("Zxy", ["z", "x", "y"])
|
||||
@@ -13,7 +17,9 @@ Zxy = collections.namedtuple("Zxy", ["z", "x", "y"])
|
||||
s3 = boto3.client("s3")
|
||||
|
||||
|
||||
@lru_cache
|
||||
# Given a 512MB lambda function, use half of the memory for the cache,
|
||||
# assuming the average root/leaf/tile size is 512 KB
|
||||
@lru_cache(maxsize=500)
|
||||
def get_object_bytes(key, offset, length):
|
||||
end = offset + length - 1
|
||||
return (
|
||||
@@ -27,29 +33,53 @@ def get_object_bytes(key, offset, length):
|
||||
)
|
||||
|
||||
|
||||
def parse_tile_uri(str):
|
||||
m = re.match("^(?:/([0-9a-zA-Z/!\-_\.\*'\(\)]+))?/(\d+)/(\d+)/(\d+).pbf$", str)
|
||||
def pmtiles_path(p, tileset):
|
||||
if not p:
|
||||
p = "{tileset}.pmtiles"
|
||||
return p.replace("{tileset}", tileset)
|
||||
|
||||
|
||||
def parse_tile_path(p, str):
|
||||
if not p:
|
||||
p = "/{tileset}/{z}/{x}/{y}.pbf"
|
||||
p = re.escape(p)
|
||||
p = p.replace(r"\{tileset\}", r"(?P<tileset>[0-9a-zA-Z/!\-_\.\*'\(\)]+)")
|
||||
p = p.replace(r"\{z\}", r"(?P<z>\d+)")
|
||||
p = p.replace(r"\{x\}", r"(?P<x>\d+)")
|
||||
p = p.replace(r"\{y\}", r"(?P<y>\d+)")
|
||||
m = re.match(f"^{p}$", str)
|
||||
if not m:
|
||||
return None, None
|
||||
return (m.group(1), Zxy(int(m.group(2)), int(m.group(3)), int(m.group(4))))
|
||||
return (
|
||||
m.group("tileset"),
|
||||
Zxy(int(m.group("z")), int(m.group("x")), int(m.group("y"))),
|
||||
)
|
||||
|
||||
|
||||
# Assumes event is a API Gateway V2 or Lambda Function URL formatted dict
|
||||
# and returns API Gateway V2 / Lambda Function dict responses
|
||||
# Does not work with CloudFront events/Lambda@Edge; see README
|
||||
def lambda_handler(event, context):
|
||||
start = datetime.now()
|
||||
uri = event["rawPath"] # API Gateway and Lambda Function URLs
|
||||
tileset, tile = parse_tile_uri(uri)
|
||||
uri = event["rawPath"]
|
||||
tileset, tile = parse_tile_uri(os.environ.get("TILE_PATH"), uri)
|
||||
|
||||
if not tile:
|
||||
return {"statusCode": 400, "body": "Invalid Tile URL"}
|
||||
|
||||
def get_bytes(offset, length):
|
||||
return get_object_bytes(tileset + ".pmtiles", offset, length)
|
||||
return get_object_bytes(
|
||||
pmtiles_path(os.environ.get("PMTILES_PATH"), tileset), offset, length
|
||||
)
|
||||
|
||||
reader = pmtiles.Reader(get_bytes)
|
||||
tile_data = reader.get(tile.z, tile.x, tile.y)
|
||||
if not tile_data:
|
||||
return {"statusCode": 404, "body": "Tile not found"}
|
||||
|
||||
# CloudFront requires decompressed responses from lambda
|
||||
# in order to implement the Compressed CacheOptimized policy correctly
|
||||
# as well as Brotli support
|
||||
if reader.header().metadata.get("compression") == "gzip":
|
||||
tile_data = gzip.decompress(tile_data)
|
||||
|
||||
|
||||
@@ -1,18 +1,46 @@
|
||||
import unittest
|
||||
from lambda_function import parse_tile_uri
|
||||
from lambda_function import parse_tile_path, pmtiles_path
|
||||
|
||||
|
||||
class TestLambda(unittest.TestCase):
|
||||
def test_parse_regex(self):
|
||||
tileset, tile = parse_tile_uri("/0/0/0.pbf")
|
||||
self.assertEqual(tileset, None)
|
||||
self.assertEqual(tile.x, 0)
|
||||
self.assertEqual(tile.y, 0)
|
||||
self.assertEqual(tile.z, 0)
|
||||
|
||||
tileset, tile = parse_tile_uri("abcd")
|
||||
def test_parse_tile_default(self):
|
||||
tileset, tile = parse_tile_path(None, "abcd")
|
||||
self.assertEqual(tile, None)
|
||||
|
||||
tileset, tile = parse_tile_path(None, "/foo/11/22/33.pbf")
|
||||
self.assertEqual(tileset, "foo")
|
||||
self.assertEqual(tile.z, 11)
|
||||
self.assertEqual(tile.x, 22)
|
||||
self.assertEqual(tile.y, 33)
|
||||
|
||||
def test_parse_tile_path_setting(self):
|
||||
tileset, tile = parse_tile_path(
|
||||
"/{tileset}/{z}/{y}/{x}.pbf", "/foo/11/22/33.pbf"
|
||||
)
|
||||
self.assertEqual(tile.x, 33)
|
||||
self.assertEqual(tile.y, 22)
|
||||
|
||||
tileset, tile = parse_tile_path(
|
||||
"/tiles/{tileset}/{z}/{x}/{y}.mvt", "/tiles/foo/4/2/3.mvt"
|
||||
)
|
||||
self.assertEqual(tileset, "foo")
|
||||
self.assertEqual(tile.z, 4)
|
||||
self.assertEqual(tile.x, 2)
|
||||
self.assertEqual(tile.y, 3)
|
||||
|
||||
def test_parse_tile_path_setting_special_chars(self):
|
||||
tileset, tile = parse_tile_path(
|
||||
"/folder(new/{tileset}/{z}/{y}/{x}.pbf", "/folder(new/foo/11/22/33.pbf"
|
||||
)
|
||||
self.assertEqual(tileset, "foo")
|
||||
|
||||
def test_pmtiles_path(self):
|
||||
self.assertEqual(pmtiles_path(None, "foo"), "foo.pmtiles")
|
||||
self.assertEqual(
|
||||
pmtiles_path("folder/{tileset}/file.pmtiles", "foo"),
|
||||
"folder/foo/file.pmtiles",
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
Reference in New Issue
Block a user