JS: ETags logic as part of Sources [#90] (#341)

* Simplifies ETag logic into Source, making if-match conditional requests possible.
* Avoid if-match in FetchSource for latency reasons - use cache buster on ETag change
* handle weak ETags correctly
* add mock fetch server for testing ETags [#90]
This commit is contained in:
Brandon Liu
2024-01-31 23:00:24 +08:00
committed by GitHub
parent a0ee7c4906
commit 179b1590b1
6 changed files with 1964 additions and 141 deletions

View File

@@ -257,7 +257,8 @@ export interface Source {
getBytes: ( getBytes: (
offset: number, offset: number,
length: number, length: number,
signal?: AbortSignal signal?: AbortSignal,
etag?: string
) => Promise<RangeResponse>; ) => Promise<RangeResponse>;
getKey: () => string; getKey: () => string;
@@ -286,10 +287,12 @@ export class FileSource implements Source {
export class FetchSource implements Source { export class FetchSource implements Source {
url: string; url: string;
customHeaders: Headers; customHeaders: Headers;
mustReload: boolean;
constructor(url: string, customHeaders: Headers = new Headers()) { constructor(url: string, customHeaders: Headers = new Headers()) {
this.url = url; this.url = url;
this.customHeaders = customHeaders; this.customHeaders = customHeaders;
this.mustReload = false;
} }
getKey() { getKey() {
@@ -303,32 +306,41 @@ export class FetchSource implements Source {
async getBytes( async getBytes(
offset: number, offset: number,
length: number, length: number,
passedSignal?: AbortSignal passedSignal?: AbortSignal,
etag?: string
): Promise<RangeResponse> { ): Promise<RangeResponse> {
let controller: AbortController | undefined; let controller: AbortController | undefined;
let signal: AbortSignal | undefined; let signal: AbortSignal | undefined;
if (passedSignal) { if (passedSignal) {
signal = passedSignal; signal = passedSignal;
} else { } else {
// TODO check this works or assert 206
controller = new AbortController(); controller = new AbortController();
signal = controller.signal; signal = controller.signal;
} }
const requestHeaders = new Headers(this.customHeaders); const requestHeaders = new Headers(this.customHeaders);
requestHeaders.set("Range", `bytes=${offset}-${offset + length - 1}`); requestHeaders.set("range", `bytes=${offset}-${offset + length - 1}`);
// we don't send if match because:
// * it disables browser caching completely (Chromium)
// * it requires a preflight request for every tile request
// * it requires CORS configuration becasue If-Match is not a CORs-safelisted header
// CORs configuration should expose ETag.
// if any etag mismatch is detected, we need to ignore the browser cache
let cache: string | undefined;
if (this.mustReload) {
cache = "reload";
}
let resp = await fetch(this.url, { let resp = await fetch(this.url, {
signal: signal, signal: signal,
cache: cache,
headers: requestHeaders, headers: requestHeaders,
}); //biome-ignore lint: "cache" is incompatible between cloudflare workers and browser
} as any);
// TODO: can return 416 with offset > 0 if content changed, which will have a blank etag. // handle edge case where the archive is < 16384 kb total.
// See https://github.com/protomaps/PMTiles/issues/90 if (offset === 0 && resp.status === 416) {
if (resp.status === 416 && offset === 0) {
// some HTTP servers don't accept ranges beyond the end of the resource.
// Retry with the exact length
const contentRange = resp.headers.get("Content-Range"); const contentRange = resp.headers.get("Content-Range");
if (!contentRange || !contentRange.startsWith("bytes */")) { if (!contentRange || !contentRange.startsWith("bytes */")) {
throw Error("Missing content-length on 416 response"); throw Error("Missing content-length on 416 response");
@@ -336,18 +348,31 @@ export class FetchSource implements Source {
const actualLength = +contentRange.substr(8); const actualLength = +contentRange.substr(8);
resp = await fetch(this.url, { resp = await fetch(this.url, {
signal: signal, signal: signal,
cache: "reload",
headers: { range: `bytes=0-${actualLength - 1}` }, headers: { range: `bytes=0-${actualLength - 1}` },
}); //biome-ignore lint: "cache" is incompatible between cloudflare workers and browser
} as any);
}
// if it's a weak etag, it's not useful for us, so ignore it.
let newEtag = resp.headers.get("Etag");
if (newEtag?.startsWith("W/")) {
newEtag = null;
}
// some storage systems are misbehaved (Cloudflare R2)
if (resp.status === 416 || (etag && newEtag && newEtag !== etag)) {
this.mustReload = true;
throw new EtagMismatch(etag);
} }
if (resp.status >= 300) { if (resp.status >= 300) {
throw Error(`Bad response code: ${resp.status}`); throw Error(`Bad response code: ${resp.status}`);
} }
const contentLength = resp.headers.get("Content-Length");
// some well-behaved backends, e.g. DigitalOcean CDN, respond with 200 instead of 206 // some well-behaved backends, e.g. DigitalOcean CDN, respond with 200 instead of 206
// but we also need to detect no support for Byte Serving which is returning the whole file // but we also need to detect no support for Byte Serving which is returning the whole file
const contentLength = resp.headers.get("Content-Length");
if (resp.status === 200 && (!contentLength || +contentLength > length)) { if (resp.status === 200 && (!contentLength || +contentLength > length)) {
if (controller) controller.abort(); if (controller) controller.abort();
throw Error( throw Error(
@@ -358,7 +383,7 @@ export class FetchSource implements Source {
const a = await resp.arrayBuffer(); const a = await resp.arrayBuffer();
return { return {
data: a, data: a,
etag: resp.headers.get("ETag") || undefined, etag: newEtag || undefined,
cacheControl: resp.headers.get("Cache-Control") || undefined, cacheControl: resp.headers.get("Cache-Control") || undefined,
expires: resp.headers.get("Expires") || undefined, expires: resp.headers.get("Expires") || undefined,
}; };
@@ -463,7 +488,7 @@ function detectVersion(a: ArrayBuffer): number {
export class EtagMismatch extends Error {} export class EtagMismatch extends Error {}
export interface Cache { export interface Cache {
getHeader: (source: Source, currentEtag?: string) => Promise<Header>; getHeader: (source: Source) => Promise<Header>;
getDirectory: ( getDirectory: (
source: Source, source: Source,
offset: number, offset: number,
@@ -476,14 +501,13 @@ export interface Cache {
length: number, length: number,
header: Header header: Header
) => Promise<ArrayBuffer>; ) => Promise<ArrayBuffer>;
invalidate: (source: Source, currentEtag: string) => Promise<void>; invalidate: (source: Source) => Promise<void>;
} }
async function getHeaderAndRoot( async function getHeaderAndRoot(
source: Source, source: Source,
decompress: DecompressFunc, decompress: DecompressFunc,
prefetch: boolean, prefetch: boolean
currentEtag?: string
): Promise<[Header, [string, number, Entry[] | ArrayBuffer]?]> { ): Promise<[Header, [string, number, Entry[] | ArrayBuffer]?]> {
const resp = await source.getBytes(0, 16384); const resp = await source.getBytes(0, 16384);
@@ -499,15 +523,7 @@ async function getHeaderAndRoot(
const headerData = resp.data.slice(0, HEADER_SIZE_BYTES); const headerData = resp.data.slice(0, HEADER_SIZE_BYTES);
let respEtag = resp.etag; const header = bytesToHeader(headerData, resp.etag);
if (currentEtag && resp.etag !== currentEtag) {
console.warn(
`ETag conflict detected; your HTTP server might not support content-based ETag headers. ETags disabled for ${source.getKey()}`
);
respEtag = undefined;
}
const header = bytesToHeader(headerData, respEtag);
// optimistically set the root directory // optimistically set the root directory
// TODO check root bounds // TODO check root bounds
@@ -536,12 +552,7 @@ async function getDirectory(
length: number, length: number,
header: Header header: Header
): Promise<Entry[]> { ): Promise<Entry[]> {
const resp = await source.getBytes(offset, length); const resp = await source.getBytes(offset, length, undefined, header.etag);
if (header.etag && header.etag !== resp.etag) {
throw new EtagMismatch(resp.etag);
}
const data = await decompress(resp.data, header.internalCompression); const data = await decompress(resp.data, header.internalCompression);
const directory = deserializeIndex(data); const directory = deserializeIndex(data);
if (directory.length === 0) { if (directory.length === 0) {
@@ -584,12 +595,7 @@ export class ResolvedValueCache {
return data as Header; return data as Header;
} }
const res = await getHeaderAndRoot( const res = await getHeaderAndRoot(source, this.decompress, this.prefetch);
source,
this.decompress,
this.prefetch,
currentEtag
);
if (res[1]) { if (res[1]) {
this.cache.set(res[1][0], { this.cache.set(res[1][0], {
lastUsed: this.counter++, lastUsed: this.counter++,
@@ -653,10 +659,8 @@ export class ResolvedValueCache {
return data as ArrayBuffer; return data as ArrayBuffer;
} }
const resp = await source.getBytes(offset, length); const resp = await source.getBytes(offset, length, undefined, header.etag);
if (header.etag && header.etag !== resp.etag) {
throw new EtagMismatch(header.etag);
}
this.cache.set(cacheKey, { this.cache.set(cacheKey, {
lastUsed: this.counter++, lastUsed: this.counter++,
data: resp.data, data: resp.data,
@@ -681,9 +685,9 @@ export class ResolvedValueCache {
} }
} }
async invalidate(source: Source, currentEtag: string) { async invalidate(source: Source) {
this.cache.delete(source.getKey()); this.cache.delete(source.getKey());
await this.getHeader(source, currentEtag); await this.getHeader(source);
} }
} }
@@ -725,7 +729,7 @@ export class SharedPromiseCache {
} }
const p = new Promise<Header>((resolve, reject) => { const p = new Promise<Header>((resolve, reject) => {
getHeaderAndRoot(source, this.decompress, this.prefetch, currentEtag) getHeaderAndRoot(source, this.decompress, this.prefetch)
.then((res) => { .then((res) => {
if (res[1]) { if (res[1]) {
this.cache.set(res[1][0], { this.cache.set(res[1][0], {
@@ -793,11 +797,8 @@ export class SharedPromiseCache {
const p = new Promise<ArrayBuffer>((resolve, reject) => { const p = new Promise<ArrayBuffer>((resolve, reject) => {
source source
.getBytes(offset, length) .getBytes(offset, length, undefined, header.etag)
.then((resp) => { .then((resp) => {
if (header.etag && header.etag !== resp.etag) {
throw new EtagMismatch(resp.etag);
}
resolve(resp.data); resolve(resp.data);
if (this.cache.has(cacheKey)) { if (this.cache.has(cacheKey)) {
} }
@@ -827,9 +828,9 @@ export class SharedPromiseCache {
} }
} }
async invalidate(source: Source, currentEtag: string) { async invalidate(source: Source) {
this.cache.delete(source.getKey()); this.cache.delete(source.getKey());
await this.getHeader(source, currentEtag); await this.getHeader(source);
} }
} }
@@ -898,11 +899,9 @@ export class PMTiles {
const resp = await this.source.getBytes( const resp = await this.source.getBytes(
header.tileDataOffset + entry.offset, header.tileDataOffset + entry.offset,
entry.length, entry.length,
signal signal,
header.etag
); );
if (header.etag && header.etag !== resp.etag) {
throw new EtagMismatch(resp.etag);
}
return { return {
data: await this.decompress(resp.data, header.tileCompression), data: await this.decompress(resp.data, header.tileCompression),
cacheControl: resp.cacheControl, cacheControl: resp.cacheControl,
@@ -930,7 +929,7 @@ export class PMTiles {
return await this.getZxyAttempt(z, x, y, signal); return await this.getZxyAttempt(z, x, y, signal);
} catch (e) { } catch (e) {
if (e instanceof EtagMismatch) { if (e instanceof EtagMismatch) {
this.cache.invalidate(this.source, e.message); this.cache.invalidate(this.source);
return await this.getZxyAttempt(z, x, y, signal); return await this.getZxyAttempt(z, x, y, signal);
} }
throw e; throw e;
@@ -942,11 +941,10 @@ export class PMTiles {
const resp = await this.source.getBytes( const resp = await this.source.getBytes(
header.jsonMetadataOffset, header.jsonMetadataOffset,
header.jsonMetadataLength header.jsonMetadataLength,
undefined,
header.etag
); );
if (header.etag && header.etag !== resp.etag) {
throw new EtagMismatch(resp.etag);
}
const decompressed = await this.decompress( const decompressed = await this.decompress(
resp.data, resp.data,
header.internalCompression header.internalCompression
@@ -960,7 +958,7 @@ export class PMTiles {
return await this.getMetadataAttempt(); return await this.getMetadataAttempt();
} catch (e) { } catch (e) {
if (e instanceof EtagMismatch) { if (e instanceof EtagMismatch) {
this.cache.invalidate(this.source, e.message); this.cache.invalidate(this.source);
return await this.getMetadataAttempt(); return await this.getMetadataAttempt();
} }
throw e; throw e;

1844
js/package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -27,6 +27,7 @@
"@biomejs/biome": "^1.5.3", "@biomejs/biome": "^1.5.3",
"@types/node": "^18.11.9", "@types/node": "^18.11.9",
"esbuild": "^0.20.0", "esbuild": "^0.20.0",
"msw": "^2.1.5",
"tsx": "^4.7.0", "tsx": "^4.7.0",
"typescript": "^4.5.5" "typescript": "^4.5.5"
}, },

View File

@@ -1,6 +1,8 @@
import fs from "fs"; import fs from "fs";
import assert from "node:assert"; import assert from "node:assert";
import { test } from "node:test"; import { test } from "node:test";
import { http, HttpResponse } from "msw";
import { setupServer } from "msw/node";
import { import {
BufferPosition, BufferPosition,
@@ -16,6 +18,45 @@ import {
zxyToTileId, zxyToTileId,
} from "../index"; } from "../index";
class MockServer {
etag?: string;
numRequests: number;
reset() {
this.numRequests = 0;
this.etag = undefined;
}
constructor() {
this.numRequests = 0;
this.etag = undefined;
const serverBuffer = fs.readFileSync("test/data/test_fixture_1.pmtiles");
const server = setupServer(
http.get(
"http://localhost:1337/example.pmtiles",
({ request, params }) => {
this.numRequests++;
const range = request.headers.get("range")?.substr(6).split("-");
if (!range) {
throw Error("invalid range");
}
const offset = +range[0];
const length = +range[1];
const body = serverBuffer.slice(offset, offset + length - 1);
return new HttpResponse(body, {
status: 206,
statusText: "OK",
headers: { etag: this.etag } as HeadersInit,
});
}
)
);
server.listen({ onUnhandledRequest: "error" });
}
}
const mockserver = new MockServer();
test("varint", () => { test("varint", () => {
let b: BufferPosition = { let b: BufferPosition = {
buf: new Uint8Array([0, 1, 127, 0xe5, 0x8e, 0x26]), buf: new Uint8Array([0, 1, 127, 0xe5, 0x8e, 0x26]),
@@ -291,67 +332,31 @@ test("multiple sources in a single cache", async () => {
assert.strictEqual(cache.cache.size, 4); assert.strictEqual(cache.cache.size, 4);
}); });
test("etags are part of key", async () => { test("etag change", async () => {
const cache = new SharedPromiseCache(6400, false); const p = new PMTiles("http://localhost:1337/example.pmtiles");
const source = new TestNodeFileSource( const tile = await p.getZxy(0, 0, 0);
"test/data/test_fixture_1.pmtiles", // header + tile
"1" assert.strictEqual(2, mockserver.numRequests);
); mockserver.etag = "etag_2";
source.etag = "etag_1"; await p.getZxy(0, 0, 0);
let header = await cache.getHeader(source); // tile + header again + tile
assert.strictEqual(header.etag, "etag_1"); assert.strictEqual(5, mockserver.numRequests);
source.etag = "etag_2";
assert.rejects(async () => {
await cache.getDirectory(
source,
header.rootDirectoryOffset,
header.rootDirectoryLength,
header
);
});
cache.invalidate(source, "etag_2");
header = await cache.getHeader(source);
assert.ok(
await cache.getDirectory(
source,
header.rootDirectoryOffset,
header.rootDirectoryLength,
header
)
);
}); });
test("soft failure on etag weirdness", async () => { test("weak etags", async () => {
const cache = new SharedPromiseCache(6400, false); mockserver.reset();
const source = new TestNodeFileSource( const p = new PMTiles("http://localhost:1337/example.pmtiles");
"test/data/test_fixture_1.pmtiles", const tile = await p.getZxy(0, 0, 0);
"1" // header + tile
); assert.strictEqual(2, mockserver.numRequests);
source.etag = "etag_1"; mockserver.etag = "W/weak_etag";
let header = await cache.getHeader(source); await p.getZxy(0, 0, 0);
assert.strictEqual(header.etag, "etag_1"); assert.strictEqual(3, mockserver.numRequests);
source.etag = "etag_2";
assert.rejects(async () => {
await cache.getDirectory(
source,
header.rootDirectoryOffset,
header.rootDirectoryLength,
header
);
});
source.etag = "etag_1";
cache.invalidate(source, "etag_2");
header = await cache.getHeader(source);
assert.strictEqual(header.etag, undefined);
}); });
// handle < 16384 bytes archive case
// handle DigitalOcean case returning 200 instead of 206
test("cache pruning by byte size", async () => { test("cache pruning by byte size", async () => {
const cache = new SharedPromiseCache(2, false); const cache = new SharedPromiseCache(2, false);
cache.cache.set("0", { lastUsed: 0, data: Promise.resolve([]) }); cache.cache.set("0", { lastUsed: 0, data: Promise.resolve([]) });
@@ -375,16 +380,3 @@ test("pmtiles get metadata", async () => {
}); });
// echo '{"type":"Polygon","coordinates":[[[0,0],[0,1],[1,0],[0,0]]]}' | ./tippecanoe -zg -o test_fixture_2.pmtiles // echo '{"type":"Polygon","coordinates":[[[0,0],[0,1],[1,0],[0,0]]]}' | ./tippecanoe -zg -o test_fixture_2.pmtiles
test("pmtiles handle retries", async () => {
const source = new TestNodeFileSource(
"test/data/test_fixture_1.pmtiles",
"1"
);
source.etag = "1";
const p = new PMTiles(source);
const metadata = await p.getMetadata();
assert.ok((metadata as { name: string }).name);
source.etag = "2";
source.replaceData("test/data/test_fixture_2.pmtiles");
assert.ok(await p.getZxy(0, 0, 0));
});

View File

@@ -54,7 +54,7 @@ class S3Source implements Source {
return this.archive_name; return this.archive_name;
} }
async getBytes(offset: number, length: number): Promise<RangeResponse> { async getBytes(offset: number, length: number, signal?:AbortSignal, etag?: string): Promise<RangeResponse> {
const resp = await s3client.send( const resp = await s3client.send(
new GetObjectCommand({ new GetObjectCommand({
Bucket: process.env.BUCKET!, Bucket: process.env.BUCKET!,

View File

@@ -52,7 +52,7 @@ class R2Source implements Source {
return this.archive_name; return this.archive_name;
} }
async getBytes(offset: number, length: number): Promise<RangeResponse> { async getBytes(offset: number, length: number, signal?: AbortSignal, etag?: string): Promise<RangeResponse> {
const resp = await this.env.BUCKET.get( const resp = await this.env.BUCKET.get(
pmtiles_path(this.archive_name, this.env.PMTILES_PATH), pmtiles_path(this.archive_name, this.env.PMTILES_PATH),
{ {