From 9594cd1764b5593028cc247da2daf04a5836b5eb Mon Sep 17 00:00:00 2001 From: Brandon Liu Date: Tue, 4 Oct 2022 11:47:26 +0800 Subject: [PATCH] add second implementation of Cache for environments where promises cannot be shared between requests --- js/v3.ts | 312 ++++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 250 insertions(+), 62 deletions(-) diff --git a/js/v3.ts b/js/v3.ts index b1433da..9fe8275 100644 --- a/js/v3.ts +++ b/js/v3.ts @@ -295,12 +295,6 @@ export class FetchSource implements Source { } } -interface CacheEntry { - lastUsed: number; - size: number; // 0 if the promise has not resolved - data: Promise
; -} - export function bytesToHeader(bytes: ArrayBuffer, etag?: string): Header { const v = new DataView(bytes); if (v.getUint16(0, true) !== 0x4d50) { @@ -378,22 +372,217 @@ export interface Cache { length: number, header: Header ) => Promise; + getArrayBuffer: ( + source: Source, + offset: number, + length: number, + header: Header + ) => Promise; invalidate: (source: Source) => void; } +async function getHeaderAndRoot( + source: Source, + prefetch: boolean +): Promise<[Header, [string, number, Entry[] | ArrayBuffer]?]> { + let resp = await source.getBytes(0, 16384); + + // check spec revision + if (false) { + } else { + const headerData = resp.data.slice(0, HEADER_SIZE_BYTES); + const header = bytesToHeader(headerData, resp.etag); + + // optimistically set the root directory + // TODO check root bounds + if (prefetch) { + const rootDirData = resp.data.slice( + header.rootDirectoryOffset, + header.rootDirectoryOffset + header.rootDirectoryLength + ); + const dirKey = + source.getKey() + + "|" + + (header.etag || "") + + "|" + + header.rootDirectoryOffset + + "|" + + header.rootDirectoryLength; + + const rootDir = deserializeIndex( + tryDecompress(rootDirData, header.internalCompression) + ); + return [header, [dirKey, ENTRY_SIZE_BYTES * rootDir.length, rootDir]]; + } + + return [header, undefined]; + } +} + +async function getDirectory( + source: Source, + offset: number, + length: number, + header: Header +): Promise { + let resp = await source.getBytes(offset, length); + + if (header.etag && header.etag !== resp.etag) { + throw new VersionMismatch("ETag mismatch: " + header.etag); + } + + const data = tryDecompress(resp.data, header.internalCompression); + const directory = deserializeIndex(data); + if (directory.length === 0) { + throw new Error("Empty directory is invalid"); + } + + return directory; +} + +interface ResolvedValue { + lastUsed: number; + size: number; + data: Header | Entry[] | ArrayBuffer; +} + +export class ResolvedValueCache { + cache: Map; + sizeBytes: number; + maxSizeBytes: number; + counter: number; + prefetch: boolean; + + constructor(maxSizeBytes = 64000000, prefetch = true) { + this.cache = new Map(); + this.sizeBytes = 0; + this.maxSizeBytes = maxSizeBytes; + this.counter = 1; + this.prefetch = prefetch; + } + + async getHeader(source: Source): Promise
{ + const cacheKey = source.getKey(); + if (this.cache.has(cacheKey)) { + this.cache.get(cacheKey)!.lastUsed = this.counter++; + const data = this.cache.get(cacheKey)!.data; + return data as Header; + } + + let res = await getHeaderAndRoot(source, this.prefetch); + if (res[1]) { + this.cache.set(res[1][0], { + lastUsed: this.counter++, + size: res[1][1], + data: res[1][2], + }); + } + + this.cache.set(cacheKey, { + lastUsed: this.counter++, + data: res[0], + size: HEADER_SIZE_BYTES, + }); + this.sizeBytes += HEADER_SIZE_BYTES; + this.prune(); + return res[0]; + } + + async getDirectory( + source: Source, + offset: number, + length: number, + header: Header + ): Promise { + const cacheKey = + source.getKey() + "|" + (header.etag || "") + "|" + offset + "|" + length; + if (this.cache.has(cacheKey)) { + this.cache.get(cacheKey)!.lastUsed = this.counter++; + const data = this.cache.get(cacheKey)!.data; + return data as Entry[]; + } + + const directory = await getDirectory(source, offset, length, header); + this.cache.set(cacheKey, { + lastUsed: this.counter++, + data: directory, + size: ENTRY_SIZE_BYTES * directory.length, + }); + this.sizeBytes += ENTRY_SIZE_BYTES * directory.length; + this.prune(); + return directory; + } + + // for v2 backwards compatibility + async getArrayBuffer( + source: Source, + offset: number, + length: number, + header: Header + ): Promise { + const cacheKey = + source.getKey() + "|" + (header.etag || "") + "|" + offset + "|" + length; + if (this.cache.has(cacheKey)) { + this.cache.get(cacheKey)!.lastUsed = this.counter++; + const data = await this.cache.get(cacheKey)!.data; + return data as ArrayBuffer; + } + + let resp = await source.getBytes(offset, length); + if (header.etag && header.etag !== resp.etag) { + throw new VersionMismatch("ETag mismatch: " + header.etag); + } + this.cache.set(cacheKey, { + lastUsed: this.counter++, + data: resp.data, + size: resp.data.byteLength, + }); + this.sizeBytes += resp.data.byteLength; + this.prune(); + return resp.data; + } + + prune() { + while (this.sizeBytes > this.maxSizeBytes) { + let minUsed = Infinity; + let minKey = undefined; + this.cache.forEach((cache_value: ResolvedValue, key: string) => { + if (cache_value.lastUsed < minUsed) { + minUsed = cache_value.lastUsed; + minKey = key; + } + }); + if (minKey) { + this.sizeBytes -= this.cache.get(minKey)!.size; + this.cache.delete(minKey); + } + } + } + + invalidate(source: Source) { + this.cache.delete(source.getKey()); + } +} + +interface SharedPromiseCacheValue { + lastUsed: number; + size: number; // 0 if the promise has not resolved + data: Promise
; +} + // a "dumb" bag of bytes. // only caches headers and directories // deduplicates simultaneous responses // (estimates) the maximum size of the cache. export class SharedPromiseCache { - cache: Map; + cache: Map; sizeBytes: number; maxSizeBytes: number; counter: number; prefetch: boolean; constructor(maxSizeBytes = 64000000, prefetch = true) { - this.cache = new Map(); + this.cache = new Map(); this.sizeBytes = 0; this.maxSizeBytes = maxSizeBytes; this.counter = 1; @@ -409,48 +598,20 @@ export class SharedPromiseCache { } const p = new Promise
((resolve, reject) => { - source - .getBytes(0, 16384) - .then((resp) => { + getHeaderAndRoot(source, this.prefetch) + .then((res) => { if (this.cache.has(cacheKey)) { this.cache.get(cacheKey)!.size = HEADER_SIZE_BYTES; this.sizeBytes += HEADER_SIZE_BYTES; } - - const headerData = resp.data.slice(0, HEADER_SIZE_BYTES); - if (headerData.byteLength !== HEADER_SIZE_BYTES) { - throw new Error("Invalid PMTiles header"); - } - const header = bytesToHeader(headerData, resp.etag); - - // optimistically set the root directory - // TODO check root bounds - if (this.prefetch) { - const rootDirData = resp.data.slice( - header.rootDirectoryOffset, - header.rootDirectoryOffset + header.rootDirectoryLength - ); - const dirKey = - source.getKey() + - "|" + - (header.etag || "") + - "|" + - header.rootDirectoryOffset + - "|" + - header.rootDirectoryLength; - - const rootDir = deserializeIndex( - tryDecompress(rootDirData, header.internalCompression) - ); - - this.cache.set(dirKey, { + if (res[1]) { + this.cache.set(res[1][0], { lastUsed: this.counter++, - data: Promise.resolve(rootDir), - size: ENTRY_SIZE_BYTES * rootDir.length, + size: res[1][1], + data: Promise.resolve(res[1][2]), }); } - - resolve(header); + resolve(res[0]); this.prune(); }) .catch((e) => { @@ -476,21 +637,9 @@ export class SharedPromiseCache { } const p = new Promise((resolve, reject) => { - source - .getBytes(offset, length) - .then((resp) => { - if (header.etag && header.etag !== resp.etag) { - throw new VersionMismatch("ETag mismatch: " + header.etag); - } - - const data = tryDecompress(resp.data, header.internalCompression); - const directory = deserializeIndex(data); - if (directory.length === 0) { - return reject(new Error("Empty directory is invalid")); - } - + getDirectory(source, offset, length, header) + .then((directory) => { resolve(directory); - if (this.cache.has(cacheKey)) { this.cache.get(cacheKey)!.size = ENTRY_SIZE_BYTES * directory.length; @@ -506,16 +655,55 @@ export class SharedPromiseCache { return p; } + // for v2 backwards compatibility + async getArrayBuffer( + source: Source, + offset: number, + length: number, + header: Header + ): Promise { + const cacheKey = + source.getKey() + "|" + (header.etag || "") + "|" + offset + "|" + length; + if (this.cache.has(cacheKey)) { + this.cache.get(cacheKey)!.lastUsed = this.counter++; + const data = await this.cache.get(cacheKey)!.data; + return data as ArrayBuffer; + } + + const p = new Promise((resolve, reject) => { + source + .getBytes(offset, length) + .then((resp) => { + if (header.etag && header.etag !== resp.etag) { + throw new VersionMismatch("ETag mismatch: " + header.etag); + } + resolve(resp.data); + if (this.cache.has(cacheKey)) { + this.cache.get(cacheKey)!.size = resp.data.byteLength; + this.sizeBytes += resp.data.byteLength; + } + this.prune(); + }) + .catch((e) => { + reject(e); + }); + }); + this.cache.set(cacheKey, { lastUsed: this.counter++, data: p, size: 0 }); + return p; + } + prune() { while (this.sizeBytes > this.maxSizeBytes) { let minUsed = Infinity; let minKey = undefined; - this.cache.forEach((cache_entry: CacheEntry, key: string) => { - if (cache_entry.lastUsed < minUsed) { - minUsed = cache_entry.lastUsed; - minKey = key; + this.cache.forEach( + (cache_value: SharedPromiseCacheValue, key: string) => { + if (cache_value.lastUsed < minUsed) { + minUsed = cache_value.lastUsed; + minKey = key; + } } - }); + ); if (minKey) { this.sizeBytes -= this.cache.get(minKey)!.size; this.cache.delete(minKey);