add second implementation of Cache for environments where promises cannot be shared between requests

This commit is contained in:
Brandon Liu
2022-10-04 11:47:26 +08:00
parent 6d6e931f1d
commit 9594cd1764

322
js/v3.ts
View File

@@ -295,12 +295,6 @@ export class FetchSource implements Source {
}
}
interface CacheEntry {
lastUsed: number;
size: number; // 0 if the promise has not resolved
data: Promise<Header | Entry[] | ArrayBuffer>;
}
export function bytesToHeader(bytes: ArrayBuffer, etag?: string): Header {
const v = new DataView(bytes);
if (v.getUint16(0, true) !== 0x4d50) {
@@ -378,54 +372,30 @@ export interface Cache {
length: number,
header: Header
) => Promise<Entry[]>;
getArrayBuffer: (
source: Source,
offset: number,
length: number,
header: Header
) => Promise<ArrayBuffer>;
invalidate: (source: Source) => void;
}
// a "dumb" bag of bytes.
// only caches headers and directories
// deduplicates simultaneous responses
// (estimates) the maximum size of the cache.
export class SharedPromiseCache {
cache: Map<string, CacheEntry>;
sizeBytes: number;
maxSizeBytes: number;
counter: number;
prefetch: boolean;
constructor(maxSizeBytes = 64000000, prefetch = true) {
this.cache = new Map<string, CacheEntry>();
this.sizeBytes = 0;
this.maxSizeBytes = maxSizeBytes;
this.counter = 1;
this.prefetch = prefetch;
}
async getHeader(source: Source): Promise<Header> {
const cacheKey = source.getKey();
if (this.cache.has(cacheKey)) {
this.cache.get(cacheKey)!.lastUsed = this.counter++;
const data = await this.cache.get(cacheKey)!.data;
return data as Header;
}
const p = new Promise<Header>((resolve, reject) => {
source
.getBytes(0, 16384)
.then((resp) => {
if (this.cache.has(cacheKey)) {
this.cache.get(cacheKey)!.size = HEADER_SIZE_BYTES;
this.sizeBytes += HEADER_SIZE_BYTES;
}
async function getHeaderAndRoot(
source: Source,
prefetch: boolean
): Promise<[Header, [string, number, Entry[] | ArrayBuffer]?]> {
let resp = await source.getBytes(0, 16384);
// check spec revision
if (false) {
} else {
const headerData = resp.data.slice(0, HEADER_SIZE_BYTES);
if (headerData.byteLength !== HEADER_SIZE_BYTES) {
throw new Error("Invalid PMTiles header");
}
const header = bytesToHeader(headerData, resp.etag);
// optimistically set the root directory
// TODO check root bounds
if (this.prefetch) {
if (prefetch) {
const rootDirData = resp.data.slice(
header.rootDirectoryOffset,
header.rootDirectoryOffset + header.rootDirectoryLength
@@ -442,15 +412,206 @@ export class SharedPromiseCache {
const rootDir = deserializeIndex(
tryDecompress(rootDirData, header.internalCompression)
);
return [header, [dirKey, ENTRY_SIZE_BYTES * rootDir.length, rootDir]];
}
this.cache.set(dirKey, {
return [header, undefined];
}
}
async function getDirectory(
source: Source,
offset: number,
length: number,
header: Header
): Promise<Entry[]> {
let resp = await source.getBytes(offset, length);
if (header.etag && header.etag !== resp.etag) {
throw new VersionMismatch("ETag mismatch: " + header.etag);
}
const data = tryDecompress(resp.data, header.internalCompression);
const directory = deserializeIndex(data);
if (directory.length === 0) {
throw new Error("Empty directory is invalid");
}
return directory;
}
interface ResolvedValue {
lastUsed: number;
size: number;
data: Header | Entry[] | ArrayBuffer;
}
export class ResolvedValueCache {
cache: Map<string, ResolvedValue>;
sizeBytes: number;
maxSizeBytes: number;
counter: number;
prefetch: boolean;
constructor(maxSizeBytes = 64000000, prefetch = true) {
this.cache = new Map<string, ResolvedValue>();
this.sizeBytes = 0;
this.maxSizeBytes = maxSizeBytes;
this.counter = 1;
this.prefetch = prefetch;
}
async getHeader(source: Source): Promise<Header> {
const cacheKey = source.getKey();
if (this.cache.has(cacheKey)) {
this.cache.get(cacheKey)!.lastUsed = this.counter++;
const data = this.cache.get(cacheKey)!.data;
return data as Header;
}
let res = await getHeaderAndRoot(source, this.prefetch);
if (res[1]) {
this.cache.set(res[1][0], {
lastUsed: this.counter++,
data: Promise.resolve(rootDir),
size: ENTRY_SIZE_BYTES * rootDir.length,
size: res[1][1],
data: res[1][2],
});
}
resolve(header);
this.cache.set(cacheKey, {
lastUsed: this.counter++,
data: res[0],
size: HEADER_SIZE_BYTES,
});
this.sizeBytes += HEADER_SIZE_BYTES;
this.prune();
return res[0];
}
async getDirectory(
source: Source,
offset: number,
length: number,
header: Header
): Promise<Entry[]> {
const cacheKey =
source.getKey() + "|" + (header.etag || "") + "|" + offset + "|" + length;
if (this.cache.has(cacheKey)) {
this.cache.get(cacheKey)!.lastUsed = this.counter++;
const data = this.cache.get(cacheKey)!.data;
return data as Entry[];
}
const directory = await getDirectory(source, offset, length, header);
this.cache.set(cacheKey, {
lastUsed: this.counter++,
data: directory,
size: ENTRY_SIZE_BYTES * directory.length,
});
this.sizeBytes += ENTRY_SIZE_BYTES * directory.length;
this.prune();
return directory;
}
// for v2 backwards compatibility
async getArrayBuffer(
source: Source,
offset: number,
length: number,
header: Header
): Promise<ArrayBuffer> {
const cacheKey =
source.getKey() + "|" + (header.etag || "") + "|" + offset + "|" + length;
if (this.cache.has(cacheKey)) {
this.cache.get(cacheKey)!.lastUsed = this.counter++;
const data = await this.cache.get(cacheKey)!.data;
return data as ArrayBuffer;
}
let resp = await source.getBytes(offset, length);
if (header.etag && header.etag !== resp.etag) {
throw new VersionMismatch("ETag mismatch: " + header.etag);
}
this.cache.set(cacheKey, {
lastUsed: this.counter++,
data: resp.data,
size: resp.data.byteLength,
});
this.sizeBytes += resp.data.byteLength;
this.prune();
return resp.data;
}
prune() {
while (this.sizeBytes > this.maxSizeBytes) {
let minUsed = Infinity;
let minKey = undefined;
this.cache.forEach((cache_value: ResolvedValue, key: string) => {
if (cache_value.lastUsed < minUsed) {
minUsed = cache_value.lastUsed;
minKey = key;
}
});
if (minKey) {
this.sizeBytes -= this.cache.get(minKey)!.size;
this.cache.delete(minKey);
}
}
}
invalidate(source: Source) {
this.cache.delete(source.getKey());
}
}
interface SharedPromiseCacheValue {
lastUsed: number;
size: number; // 0 if the promise has not resolved
data: Promise<Header | Entry[] | ArrayBuffer>;
}
// a "dumb" bag of bytes.
// only caches headers and directories
// deduplicates simultaneous responses
// (estimates) the maximum size of the cache.
export class SharedPromiseCache {
cache: Map<string, SharedPromiseCacheValue>;
sizeBytes: number;
maxSizeBytes: number;
counter: number;
prefetch: boolean;
constructor(maxSizeBytes = 64000000, prefetch = true) {
this.cache = new Map<string, SharedPromiseCacheValue>();
this.sizeBytes = 0;
this.maxSizeBytes = maxSizeBytes;
this.counter = 1;
this.prefetch = prefetch;
}
async getHeader(source: Source): Promise<Header> {
const cacheKey = source.getKey();
if (this.cache.has(cacheKey)) {
this.cache.get(cacheKey)!.lastUsed = this.counter++;
const data = await this.cache.get(cacheKey)!.data;
return data as Header;
}
const p = new Promise<Header>((resolve, reject) => {
getHeaderAndRoot(source, this.prefetch)
.then((res) => {
if (this.cache.has(cacheKey)) {
this.cache.get(cacheKey)!.size = HEADER_SIZE_BYTES;
this.sizeBytes += HEADER_SIZE_BYTES;
}
if (res[1]) {
this.cache.set(res[1][0], {
lastUsed: this.counter++,
size: res[1][1],
data: Promise.resolve(res[1][2]),
});
}
resolve(res[0]);
this.prune();
})
.catch((e) => {
@@ -476,21 +637,9 @@ export class SharedPromiseCache {
}
const p = new Promise<Entry[]>((resolve, reject) => {
source
.getBytes(offset, length)
.then((resp) => {
if (header.etag && header.etag !== resp.etag) {
throw new VersionMismatch("ETag mismatch: " + header.etag);
}
const data = tryDecompress(resp.data, header.internalCompression);
const directory = deserializeIndex(data);
if (directory.length === 0) {
return reject(new Error("Empty directory is invalid"));
}
getDirectory(source, offset, length, header)
.then((directory) => {
resolve(directory);
if (this.cache.has(cacheKey)) {
this.cache.get(cacheKey)!.size =
ENTRY_SIZE_BYTES * directory.length;
@@ -506,16 +655,55 @@ export class SharedPromiseCache {
return p;
}
// for v2 backwards compatibility
async getArrayBuffer(
source: Source,
offset: number,
length: number,
header: Header
): Promise<ArrayBuffer> {
const cacheKey =
source.getKey() + "|" + (header.etag || "") + "|" + offset + "|" + length;
if (this.cache.has(cacheKey)) {
this.cache.get(cacheKey)!.lastUsed = this.counter++;
const data = await this.cache.get(cacheKey)!.data;
return data as ArrayBuffer;
}
const p = new Promise<ArrayBuffer>((resolve, reject) => {
source
.getBytes(offset, length)
.then((resp) => {
if (header.etag && header.etag !== resp.etag) {
throw new VersionMismatch("ETag mismatch: " + header.etag);
}
resolve(resp.data);
if (this.cache.has(cacheKey)) {
this.cache.get(cacheKey)!.size = resp.data.byteLength;
this.sizeBytes += resp.data.byteLength;
}
this.prune();
})
.catch((e) => {
reject(e);
});
});
this.cache.set(cacheKey, { lastUsed: this.counter++, data: p, size: 0 });
return p;
}
prune() {
while (this.sizeBytes > this.maxSizeBytes) {
let minUsed = Infinity;
let minKey = undefined;
this.cache.forEach((cache_entry: CacheEntry, key: string) => {
if (cache_entry.lastUsed < minUsed) {
minUsed = cache_entry.lastUsed;
this.cache.forEach(
(cache_value: SharedPromiseCacheValue, key: string) => {
if (cache_value.lastUsed < minUsed) {
minUsed = cache_value.lastUsed;
minKey = key;
}
});
}
);
if (minKey) {
this.sizeBytes -= this.cache.get(minKey)!.size;
this.cache.delete(minKey);