mirror of
https://github.com/protomaps/PMTiles.git
synced 2026-02-04 19:01:08 +00:00
103 lines
3.0 KiB
Python
103 lines
3.0 KiB
Python
import base64
|
|
import collections
|
|
from functools import lru_cache
|
|
import gzip
|
|
import json
|
|
import os
|
|
import re
|
|
|
|
# Exists inside all lambda functions
|
|
import boto3
|
|
from botocore.exceptions import ClientError
|
|
|
|
# create_lambda_function.py will vendor the relevant file
|
|
from pmtiles.reader import Reader
|
|
|
|
# Tile coordinate triple; z is the zoom level that lambda_handler checks
# against the archive's minzoom/maxzoom.
Zxy = collections.namedtuple("Zxy", "z x y")
|
|
|
|
# S3 client created once at import time; get_object_bytes issues its
# ranged reads through it.
s3 = boto3.client("s3")
|
|
|
|
# Sized for a 512MB lambda function: half of the memory is given to this
# cache, assuming the average root/leaf/tile size is 512 KB
@lru_cache(maxsize=500)
def get_object_bytes(key, offset, length):
    """Read `length` bytes starting at `offset` from the S3 object `key`.

    The bucket name comes from the BUCKET environment variable. Results are
    memoized per (key, offset, length) by the lru_cache decorator.
    """
    # The Range header names the last byte inclusively.
    last_byte = offset + length - 1
    response = s3.get_object(
        Bucket=os.environ["BUCKET"],
        Key=key,
        Range=f"bytes={offset}-{last_byte}",
    )
    body = response.get("Body")
    return body.read()
|
|
|
|
|
|
def pmtiles_path(p, name):
    """Build the archive path for `name` from the template `p`.

    Every "{name}" in the template is replaced by `name`. A falsy template
    (None or "") falls back to "{name}.pmtiles".
    """
    template = p if p else "{name}.pmtiles"
    return template.replace("{name}", name)
|
|
|
|
|
|
def parse_tile_path(p, str):
    """Parse a request path into an archive name and tile coordinates.

    `p` is a template containing the placeholders {name}, {z}, {x} and {y};
    a falsy template falls back to "/{name}/{z}/{x}/{y}.pbf". `str` is the
    path to parse (the parameter name shadows the builtin but is kept so
    existing callers are unaffected).

    Returns (name, Zxy(z, x, y)) when the entire path matches the template,
    otherwise (None, None).
    """
    if not p:
        p = "/{name}/{z}/{x}/{y}.pbf"

    # Escape the template so its literal characters (e.g. ".") match
    # themselves, then swap each escaped placeholder for a named group.
    pattern = re.escape(p)
    pattern = pattern.replace(
        r"\{name\}", r"(?P<name>[0-9a-zA-Z/!\-_\.\*'\(\)]+)"
    )
    for axis in ("z", "x", "y"):
        pattern = pattern.replace(rf"\{{{axis}\}}", rf"(?P<{axis}>\d+)")

    # fullmatch instead of "^...$": "$" also matches just before a trailing
    # newline, which would accept paths such as "/a/1/2/3.pbf\n".
    m = re.fullmatch(pattern, str)
    if not m:
        return None, None

    coords = Zxy(int(m.group("z")), int(m.group("x")), int(m.group("y")))
    return m.group("name"), coords
|
|
|
|
|
|
# Assumes event is an API Gateway V2 or Lambda Function URL formatted dict
# and returns API Gateway V2 / Lambda Function dict responses
# Does not work with CloudFront events/Lambda@Edge; see README
def lambda_handler(event, context):
    """Serve one tile from a PMTiles archive stored in S3.

    The request path (event["rawPath"]) is parsed with the TILE_PATH
    template and the archive key is built from the PMTILES_PATH template.

    Responses:
      400 - the path does not match the tile URL template
      404 - the zoom is outside the archive's minzoom/maxzoom, or S3
            answered AccessDenied for the archive
      204 - the archive holds no data for the requested tile
      200 - base64-encoded tile bytes (gunzipped when the archive metadata
            declares gzip compression)

    Any other botocore ClientError propagates to the caller.
    """
    uri = event["rawPath"]
    name, tile = parse_tile_path(os.environ.get("TILE_PATH"), uri)

    if not tile:
        return {"statusCode": 400, "body": "Invalid tile URL"}

    def get_bytes(offset, length):
        return get_object_bytes(
            pmtiles_path(os.environ.get("PMTILES_PATH"), name), offset, length
        )

    try:
        # The Reader's only data source is get_bytes, so the header read can
        # already fail with an S3 error; it is kept inside the try so that a
        # missing archive yields the 404 below instead of an unhandled error.
        reader = Reader(get_bytes)
        metadata = reader.header().metadata
        minzoom = int(metadata["minzoom"])
        maxzoom = int(metadata["maxzoom"])
        if tile.z < minzoom or tile.z > maxzoom:
            return {"statusCode": 404, "body": "Tile not found"}

        tile_data = reader.get(tile.z, tile.x, tile.y)
    except ClientError as e:
        # NOTE(review): presumably S3 reports a missing key as AccessDenied
        # when the role lacks s3:ListBucket - confirm against the README.
        if e.response["Error"]["Code"] == "AccessDenied":
            return {"statusCode": 404, "body": "Archive not found"}
        # Bare raise keeps the original traceback intact.
        raise

    if not tile_data:
        return {"statusCode": 204}

    # CloudFront requires decompressed responses from lambda
    # in order to implement the Compressed CacheOptimized policy correctly
    # as well as Brotli support
    if metadata.get("compression") == "gzip":
        tile_data = gzip.decompress(tile_data)

    return {
        "statusCode": 200,
        # NOTE(review): b64encode returns bytes, not str; this appears to
        # rely on the Lambda runtime serializing bytes - confirm.
        "body": base64.b64encode(tile_data),
        "isBase64Encoded": True,
        "headers": {"Content-Type": "application/protobuf"},
    }
|