From 20f56a5067cad8ee45119d8d2945a92cbd07167d Mon Sep 17 00:00:00 2001 From: Brandon Liu Date: Wed, 25 Feb 2026 11:04:54 -0500 Subject: [PATCH] abort pending fetches --- js/src/index.ts | 63 +++++- js/test/abort.test.ts | 511 ++++++++++++++++++++++++++++++++++++++++++ js/test/index.test.ts | 1 + 3 files changed, 567 insertions(+), 8 deletions(-) create mode 100644 js/test/abort.test.ts diff --git a/js/src/index.ts b/js/src/index.ts index 5631517..f1e86b2 100644 --- a/js/src/index.ts +++ b/js/src/index.ts @@ -573,7 +573,8 @@ export interface Cache { source: Source, offset: number, length: number, - header: Header + header: Header, + signal?: AbortSignal ) => Promise; invalidate: (source: Source) => Promise; } @@ -614,9 +615,10 @@ async function getDirectory( decompress: DecompressFunc, offset: number, length: number, - header: Header + header: Header, + signal?: AbortSignal ): Promise { - const resp = await source.getBytes(offset, length, undefined, header.etag); + const resp = await source.getBytes(offset, length, signal, header.etag); const data = await decompress(resp.data, header.internalCompression); const directory = deserializeIndex(data); if (directory.length === 0) { @@ -684,7 +686,8 @@ export class ResolvedValueCache { source: Source, offset: number, length: number, - header: Header + header: Header, + signal?: AbortSignal ): Promise { const cacheKey = `${source.getKey()}|${ header.etag || "" @@ -701,7 +704,8 @@ export class ResolvedValueCache { this.decompress, offset, length, - header + header, + signal ); this.cache.set(cacheKey, { lastUsed: this.counter++, @@ -742,9 +746,15 @@ interface SharedPromiseCacheValue { * * Only caches headers and directories, not individual tile contents. */ +interface PendingFetch { + controller: AbortController; + refs: number; +} + export class SharedPromiseCache { cache: Map; invalidations: Map>; + pendingFetches: Map; maxCacheEntries: number; counter: number; decompress: DecompressFunc; @@ -756,6 +766,7 @@ export class SharedPromiseCache { ) { this.cache = new Map(); this.invalidations = new Map>(); + this.pendingFetches = new Map(); this.maxCacheEntries = maxCacheEntries; this.counter = 1; this.decompress = decompress; @@ -790,11 +801,34 @@ export class SharedPromiseCache { return p; } + private trackSignal( + cacheKey: string, + pending: PendingFetch, + signal: AbortSignal + ) { + pending.refs++; + signal.addEventListener( + "abort", + () => { + if ( + --pending.refs <= 0 && + this.pendingFetches.get(cacheKey) === pending + ) { + pending.controller.abort(); + this.cache.delete(cacheKey); + this.pendingFetches.delete(cacheKey); + } + }, + { once: true } + ); + } + async getDirectory( source: Source, offset: number, length: number, - header: Header + header: Header, + signal?: AbortSignal ): Promise { const cacheKey = `${source.getKey()}|${ header.etag || "" @@ -802,13 +836,23 @@ export class SharedPromiseCache { const cacheValue = this.cache.get(cacheKey); if (cacheValue) { cacheValue.lastUsed = this.counter++; + const pending = this.pendingFetches.get(cacheKey); + if (pending) { + this.trackSignal(cacheKey, pending, signal ?? new AbortController().signal); + } const data = await cacheValue.data; return data as Entry[]; } + const ac = new AbortController(); + const pending: PendingFetch = { controller: ac, refs: 0 }; + this.trackSignal(cacheKey, pending, signal ?? new AbortController().signal); + this.pendingFetches.set(cacheKey, pending); + const p = new Promise((resolve, reject) => { - getDirectory(source, this.decompress, offset, length, header) + getDirectory(source, this.decompress, offset, length, header, ac.signal) .then((directory) => { + this.pendingFetches.delete(cacheKey); resolve(directory); this.prune(); }) @@ -908,6 +952,7 @@ export class PMTiles { ): Promise { const tileId = zxyToTileId(z, x, y); const header = await this.cache.getHeader(this.source); + signal?.throwIfAborted(); if (z < header.minZoom || z > header.maxZoom) { return undefined; @@ -920,8 +965,10 @@ export class PMTiles { this.source, dO, dL, - header + header, + signal ); + signal?.throwIfAborted(); const entry = findTile(directory, tileId); if (entry) { if (entry.runLength > 0) { diff --git a/js/test/abort.test.ts b/js/test/abort.test.ts new file mode 100644 index 0000000..b7ff309 --- /dev/null +++ b/js/test/abort.test.ts @@ -0,0 +1,511 @@ +import assert from "node:assert"; +import { describe, test } from "node:test"; + +import { + Entry, + PMTiles, + RangeResponse, + SharedPromiseCache, + Source, +} from "../src/index"; + +/** + * A Source where each getBytes call can be individually controlled: + * paused, resolved, or rejected. Tracks calls for assertions. + */ +class ControllableSource implements Source { + key: string; + calls: { + offset: number; + length: number; + signal?: AbortSignal; + resolve: (resp: RangeResponse) => void; + reject: (err: Error) => void; + }[]; + + constructor(key: string) { + this.key = key; + this.calls = []; + } + + getKey() { + return this.key; + } + + async getBytes( + offset: number, + length: number, + signal?: AbortSignal, + etag?: string + ): Promise { + return new Promise((resolve, reject) => { + this.calls.push({ offset, length, signal, resolve, reject }); + if (signal) { + signal.addEventListener( + "abort", + () => { + reject(signal.reason ?? new DOMException("Aborted", "AbortError")); + }, + { once: true } + ); + } + }); + } +} + +/** Encode a number as a varint into buf at pos, return new pos. */ +function writeVarint(buf: Uint8Array, pos: number, value: number): number { + while (value >= 0x80) { + buf[pos++] = (value & 0x7f) | 0x80; + value >>>= 7; + } + buf[pos++] = value; + return pos; +} + +/** + * Serialize a directory (array of entries) into the PMTiles columnar varint format. + * Entries must be sorted by tileId. + */ +function serializeDirectory(entries: Entry[]): ArrayBuffer { + const buf = new Uint8Array(1024); + let pos = 0; + + // numEntries + pos = writeVarint(buf, pos, entries.length); + + // delta-encoded tileIds + let lastId = 0; + for (const e of entries) { + pos = writeVarint(buf, pos, e.tileId - lastId); + lastId = e.tileId; + } + + // runLengths + for (const e of entries) { + pos = writeVarint(buf, pos, e.runLength); + } + + // lengths + for (const e of entries) { + pos = writeVarint(buf, pos, e.length); + } + + // offsets: first entry stores offset+1, subsequent entries store 0 if + // offset == prev.offset + prev.length, else offset+1 + for (let i = 0; i < entries.length; i++) { + if (i > 0 && entries[i].offset === entries[i - 1].offset + entries[i - 1].length) { + pos = writeVarint(buf, pos, 0); + } else { + pos = writeVarint(buf, pos, entries[i].offset + 1); + } + } + + return buf.slice(0, pos).buffer; +} + +/** + * Build a minimal valid 16KB PMTiles buffer containing a header whose root + * directory has a single leaf-pointer entry (runLength=0), forcing tile + * lookups to make a separate HTTP request for the leaf directory. + * + * Returns { data, leafDirOffset } so tests know where the leaf lives. + */ +function buildHeaderWithLeaf() { + const LEAF_DIR_OFFSET = 20000; + const LEAF_DIR_LENGTH = 100; + const TILE_DATA_OFFSET = 30000; + + // Root directory: one leaf pointer entry covering all tileIds from 0 + const rootDirBuf = serializeDirectory([ + { tileId: 0, runLength: 0, length: LEAF_DIR_LENGTH, offset: 0 }, + ]); + + const ROOT_DIR_OFFSET = 127; + const ROOT_DIR_LENGTH = rootDirBuf.byteLength; + const JSON_OFFSET = ROOT_DIR_OFFSET + ROOT_DIR_LENGTH; + + const buf = new ArrayBuffer(16384); + const view = new DataView(buf); + const u8 = new Uint8Array(buf); + + // Magic: "PM" as little-endian uint16 = 0x4d50 + view.setUint16(0, 0x4d50, true); + // Spec version at byte 7 + u8[7] = 3; + + function setUint64(offset: number, value: number) { + view.setBigUint64(offset, BigInt(value), true); + } + + // Header fields (offsets match bytesToHeader in index.ts) + setUint64(8, ROOT_DIR_OFFSET); // rootDirectoryOffset + setUint64(16, ROOT_DIR_LENGTH); // rootDirectoryLength + setUint64(24, JSON_OFFSET); // jsonMetadataOffset + setUint64(32, 0); // jsonMetadataLength + setUint64(40, LEAF_DIR_OFFSET); // leafDirectoryOffset + setUint64(48, LEAF_DIR_LENGTH); // leafDirectoryLength + setUint64(56, TILE_DATA_OFFSET); // tileDataOffset + setUint64(64, 1000); // tileDataLength + setUint64(72, 1); // numAddressedTiles + setUint64(80, 1); // numTileEntries + setUint64(88, 1); // numTileContents + u8[96] = 0; // clustered = false + u8[97] = 1; // internalCompression = None + u8[98] = 1; // tileCompression = None + u8[99] = 1; // tileType = MVT + u8[100] = 0; // minZoom + u8[101] = 4; // maxZoom + view.setInt32(110, 10000000, true); // maxLon + view.setInt32(114, 10000000, true); // maxLat + + // Write root directory into the buffer + u8.set(new Uint8Array(rootDirBuf), ROOT_DIR_OFFSET); + + return { data: buf, leafDirOffset: LEAF_DIR_OFFSET, tileDataOffset: TILE_DATA_OFFSET }; +} + +describe("directory abort cancellation", () => { + test("getZxy bails out early when signal is already aborted after header fetch", async () => { + let dirFetchAttempted = false; + const { data: headerData } = buildHeaderWithLeaf(); + + const source: Source = { + getKey: () => "test", + getBytes: async ( + offset: number, + length: number, + signal?: AbortSignal + ): Promise => { + if (offset === 0) { + return { data: new Uint8Array(headerData).slice(0, length).buffer }; + } + dirFetchAttempted = true; + return { data: new ArrayBuffer(0) }; + }, + }; + + const ac = new AbortController(); + const p = new PMTiles(source); + + // Pre-warm header cache + await p.getHeader(); + + // Abort before calling getZxy + ac.abort(); + + await assert.rejects( + () => p.getZxy(1, 0, 0, ac.signal), + (err: Error) => err.name === "AbortError" + ); + + assert.strictEqual( + dirFetchAttempted, + false, + "leaf directory fetch should not be attempted after signal already aborted" + ); + }); + + test("SharedPromiseCache cancels directory fetch when all callers abort", async () => { + const { data: headerData, leafDirOffset } = buildHeaderWithLeaf(); + const source = new ControllableSource("test"); + const cache = new SharedPromiseCache(); + + // Populate header in cache + const headerPromise = cache.getHeader(source); + assert.strictEqual(source.calls.length, 1); + source.calls[0].resolve({ + data: new Uint8Array(headerData).slice(0, 16384).buffer, + }); + const header = await headerPromise; + + const ac1 = new AbortController(); + const ac2 = new AbortController(); + + const dirPromise1 = cache.getDirectory( + source, leafDirOffset, 100, header, ac1.signal + ); + const dirPromise2 = cache.getDirectory( + source, leafDirOffset, 100, header, ac2.signal + ); + + // Only one underlying getBytes call for the directory (deduplicated) + assert.strictEqual(source.calls.length, 2); // header + 1 directory + const dirCall = source.calls[1]; + assert.ok(dirCall.signal, "directory fetch should have an AbortSignal"); + assert.strictEqual(dirCall.signal.aborted, false); + + // Abort first caller - fetch should NOT cancel yet + ac1.abort(); + assert.strictEqual(dirCall.signal.aborted, false, + "fetch must not abort with one caller remaining"); + + // Abort second caller - fetch SHOULD cancel now + ac2.abort(); + assert.strictEqual(dirCall.signal.aborted, true, + "fetch should abort when all callers have cancelled"); + + // Both promises reject + await assert.rejects(dirPromise1); + await assert.rejects(dirPromise2); + + // Cache entry should be evicted + const cacheKey = `test||${leafDirOffset}|100`; + assert.strictEqual(cache.cache.has(cacheKey), false); + assert.strictEqual(cache.pendingFetches.has(cacheKey), false); + }); + + test("SharedPromiseCache does NOT cancel when some callers are still active", async () => { + const { data: headerData, leafDirOffset } = buildHeaderWithLeaf(); + const source = new ControllableSource("test"); + const cache = new SharedPromiseCache(); + + const headerPromise = cache.getHeader(source); + source.calls[0].resolve({ + data: new Uint8Array(headerData).slice(0, 16384).buffer, + }); + const header = await headerPromise; + + const ac1 = new AbortController(); + const ac2 = new AbortController(); + + cache.getDirectory(source, leafDirOffset, 100, header, ac1.signal); + cache.getDirectory(source, leafDirOffset, 100, header, ac2.signal); + + const dirCall = source.calls[1]; + + // Abort only one caller + ac1.abort(); + assert.strictEqual(dirCall.signal!.aborted, false, + "fetch must not abort while one caller remains active"); + + // Resolve for the remaining caller + const leafDir = serializeDirectory([ + { tileId: 0, runLength: 1, length: 50, offset: 0 }, + ]); + dirCall.resolve({ data: leafDir }); + }); + + test("SharedPromiseCache pins fetch when a later caller has no signal", async () => { + const { data: headerData, leafDirOffset } = buildHeaderWithLeaf(); + const source = new ControllableSource("test"); + const cache = new SharedPromiseCache(); + + const headerPromise = cache.getHeader(source); + source.calls[0].resolve({ + data: new Uint8Array(headerData).slice(0, 16384).buffer, + }); + const header = await headerPromise; + + const ac1 = new AbortController(); + + // First caller has a signal + cache.getDirectory(source, leafDirOffset, 100, header, ac1.signal); + // Second caller has NO signal → pins the fetch + cache.getDirectory(source, leafDirOffset, 100, header, undefined); + + const dirCall = source.calls[1]; + + // Abort the signaled caller — fetch must NOT be cancelled (pinned) + ac1.abort(); + assert.strictEqual(dirCall.signal!.aborted, false, + "fetch must not abort when pinned by an unsignaled caller"); + }); + + test("SharedPromiseCache pins fetch when first caller has no signal", async () => { + const { data: headerData, leafDirOffset } = buildHeaderWithLeaf(); + const source = new ControllableSource("test"); + const cache = new SharedPromiseCache(); + + const headerPromise = cache.getHeader(source); + source.calls[0].resolve({ + data: new Uint8Array(headerData).slice(0, 16384).buffer, + }); + const header = await headerPromise; + + const ac1 = new AbortController(); + + // First caller has NO signal → pinned immediately + cache.getDirectory(source, leafDirOffset, 100, header, undefined); + // Second caller has a signal + cache.getDirectory(source, leafDirOffset, 100, header, ac1.signal); + + const dirCall = source.calls[1]; + + ac1.abort(); + assert.strictEqual(dirCall.signal!.aborted, false, + "fetch must not abort when pinned by an unsignaled initial caller"); + }); + + test("completed fetch is not affected by late abort", async () => { + const { data: headerData, leafDirOffset } = buildHeaderWithLeaf(); + const source = new ControllableSource("test"); + const cache = new SharedPromiseCache(); + + const headerPromise = cache.getHeader(source); + source.calls[0].resolve({ + data: new Uint8Array(headerData).slice(0, 16384).buffer, + }); + const header = await headerPromise; + + const ac1 = new AbortController(); + + const dirPromise = cache.getDirectory( + source, leafDirOffset, 100, header, ac1.signal + ); + + // Resolve the directory fetch BEFORE aborting + const leafDir = serializeDirectory([ + { tileId: 0, runLength: 1, length: 50, offset: 0 }, + ]); + source.calls[1].resolve({ data: leafDir }); + const directory = await dirPromise; + assert.strictEqual(directory.length, 1); + + // Now abort AFTER completion + ac1.abort(); + + // The cache entry must still be present + const cacheKey = `test||${leafDirOffset}|100`; + assert.strictEqual(cache.cache.has(cacheKey), true, + "cache entry must survive late abort"); + + // Subsequent reads still work + const dir2 = await cache.getDirectory( + source, leafDirOffset, 100, header + ); + assert.strictEqual(dir2.length, 1); + }); + + test("evicted cache entry allows fresh re-fetch", async () => { + const { data: headerData, leafDirOffset } = buildHeaderWithLeaf(); + const source = new ControllableSource("test"); + const cache = new SharedPromiseCache(); + + const headerPromise = cache.getHeader(source); + source.calls[0].resolve({ + data: new Uint8Array(headerData).slice(0, 16384).buffer, + }); + const header = await headerPromise; + + const ac1 = new AbortController(); + + const dirPromise1 = cache.getDirectory( + source, leafDirOffset, 100, header, ac1.signal + ); + + // Abort → cancels fetch, evicts cache entry + ac1.abort(); + await assert.rejects(dirPromise1); + + // New caller should trigger a fresh fetch + const dirPromise2 = cache.getDirectory( + source, leafDirOffset, 100, header + ); + + assert.strictEqual(source.calls.length, 3, + "a fresh getBytes call should be made after eviction"); + + // Resolve the new fetch + const leafDir = serializeDirectory([ + { tileId: 0, runLength: 1, length: 50, offset: 0 }, + ]); + source.calls[2].resolve({ data: leafDir }); + + const directory = await dirPromise2; + assert.strictEqual(directory.length, 1); + }); + + test("getZxy passes signal through to leaf directory fetch", async () => { + const { data: headerData } = buildHeaderWithLeaf(); + const source = new ControllableSource("test"); + const p = new PMTiles(source); + + const ac = new AbortController(); + const tilePromise = p.getZxy(1, 0, 0, ac.signal); + + // Resolve header fetch + source.calls[0].resolve({ + data: new Uint8Array(headerData).slice(0, 16384).buffer, + }); + + // Wait for the leaf directory fetch to be initiated + await new Promise((r) => setTimeout(r, 10)); + assert.strictEqual(source.calls.length, 2, "leaf directory fetch expected"); + + const dirCall = source.calls[1]; + assert.ok(dirCall.signal, "directory fetch should have a signal"); + assert.strictEqual(dirCall.signal.aborted, false); + + // Abort the tile request — single caller so directory fetch should cancel + ac.abort(); + assert.strictEqual(dirCall.signal.aborted, true, + "directory fetch signal should be aborted"); + + await assert.rejects(tilePromise); + }); + + test("getZxy does not start tile data fetch if signal aborts before directory resolves", async () => { + const { data: headerData } = buildHeaderWithLeaf(); + const source = new ControllableSource("test"); + const p = new PMTiles(source); + + const ac = new AbortController(); + const tilePromise = p.getZxy(1, 0, 0, ac.signal); + + // Resolve header fetch + source.calls[0].resolve({ + data: new Uint8Array(headerData).slice(0, 16384).buffer, + }); + + // Wait for leaf directory fetch to begin + await new Promise((r) => setTimeout(r, 10)); + assert.strictEqual(source.calls.length, 2); + + // Abort the signal first — this cancels the directory fetch (single caller). + ac.abort(); + + // Let microtasks and rejections settle + await assert.rejects(tilePromise); + + // Only header + directory calls were made, no tile data fetch. + assert.strictEqual(source.calls.length, 2, + "tile data fetch should not start after signal abort"); + }); + + test("three callers: fetch cancels only when last one aborts", async () => { + const { data: headerData, leafDirOffset } = buildHeaderWithLeaf(); + const source = new ControllableSource("test"); + const cache = new SharedPromiseCache(); + + const headerPromise = cache.getHeader(source); + source.calls[0].resolve({ + data: new Uint8Array(headerData).slice(0, 16384).buffer, + }); + const header = await headerPromise; + + const ac1 = new AbortController(); + const ac2 = new AbortController(); + const ac3 = new AbortController(); + + const p1 = cache.getDirectory(source, leafDirOffset, 100, header, ac1.signal); + const p2 = cache.getDirectory(source, leafDirOffset, 100, header, ac2.signal); + const p3 = cache.getDirectory(source, leafDirOffset, 100, header, ac3.signal); + + const dirCall = source.calls[1]; + + ac1.abort(); + assert.strictEqual(dirCall.signal!.aborted, false, "2 callers remain"); + + ac2.abort(); + assert.strictEqual(dirCall.signal!.aborted, false, "1 caller remains"); + + ac3.abort(); + assert.strictEqual(dirCall.signal!.aborted, true, "all callers aborted"); + + // Catch all rejections + await assert.rejects(p1); + await assert.rejects(p2); + await assert.rejects(p3); + }); +}); diff --git a/js/test/index.test.ts b/js/test/index.test.ts index 0289b92..62aa4b5 100644 --- a/js/test/index.test.ts +++ b/js/test/index.test.ts @@ -1,2 +1,3 @@ import "./adapter.test"; import "./v3.test"; +import "./abort.test";