modular decompression implementation

Brandon Liu
2022-11-14 21:58:20 +08:00
parent bcb2313d1e
commit 5de95b8fbe


@@ -141,7 +141,15 @@ export enum Compression {
   Zstd = 4,
 }
-async function tryDecompress(buf: ArrayBuffer, compression: Compression) {
+type DecompressFunc = (
+  buf: ArrayBuffer,
+  compression: Compression
+) => Promise<ArrayBuffer>;
+
+async function fflateDecompress(
+  buf: ArrayBuffer,
+  compression: Compression
+): Promise<ArrayBuffer> {
   if (compression === Compression.None || compression === Compression.Unknown) {
     return buf;
   } else if (compression === Compression.Gzip) {
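The DecompressFunc hook introduced above lets integrators supply their own codec in place of the bundled fflate path. A minimal sketch of such a function built on the platform's native DecompressionStream API (available in modern browsers and recent Node.js); the name nativeDecompress and its gzip-only scope are illustrative assumptions, not part of this commit:

const nativeDecompress: DecompressFunc = async (
  buf: ArrayBuffer,
  compression: Compression
): Promise<ArrayBuffer> => {
  // Pass uncompressed or unknown data through, mirroring fflateDecompress.
  if (compression === Compression.None || compression === Compression.Unknown) {
    return buf;
  }
  if (compression === Compression.Gzip) {
    // Pipe the buffer through the engine's built-in gzip decoder.
    const body = new Response(buf).body;
    if (!body) throw new Error("could not create stream from buffer");
    const decompressed = body.pipeThrough(new DecompressionStream("gzip"));
    return new Response(decompressed).arrayBuffer();
  }
  throw new Error("compression method not supported");
};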
@@ -416,6 +424,7 @@ export interface Cache {
 async function getHeaderAndRoot(
   source: Source,
+  decompress: DecompressFunc,
   prefetch: boolean,
   current_etag?: string
 ): Promise<[Header, [string, number, Entry[] | ArrayBuffer]?]> {
@@ -461,7 +470,7 @@ async function getHeaderAndRoot(
     header.rootDirectoryLength;
   const rootDir = deserializeIndex(
-    await tryDecompress(rootDirData, header.internalCompression)
+    await decompress(rootDirData, header.internalCompression)
   );
   return [header, [dirKey, ENTRY_SIZE_BYTES * rootDir.length, rootDir]];
 }
@@ -471,6 +480,7 @@ async function getHeaderAndRoot(
 async function getDirectory(
   source: Source,
+  decompress: DecompressFunc,
   offset: number,
   length: number,
   header: Header
@@ -481,7 +491,7 @@ async function getDirectory(
     throw new EtagMismatch(resp.etag);
   }
-  const data = await tryDecompress(resp.data, header.internalCompression);
+  const data = await decompress(resp.data, header.internalCompression);
   const directory = deserializeIndex(data);
   if (directory.length === 0) {
     throw new Error("Empty directory is invalid");
@@ -502,13 +512,19 @@ export class ResolvedValueCache {
   maxSizeBytes: number;
   counter: number;
   prefetch: boolean;
+  decompress: DecompressFunc;
-  constructor(maxSizeBytes = 64000000, prefetch = true) {
+  constructor(
+    maxSizeBytes = 64000000,
+    prefetch = true,
+    decompress: DecompressFunc = fflateDecompress
+  ) {
     this.cache = new Map<string, ResolvedValue>();
     this.sizeBytes = 0;
     this.maxSizeBytes = maxSizeBytes;
     this.counter = 1;
     this.prefetch = prefetch;
+    this.decompress = decompress;
   }
   async getHeader(source: Source, current_etag?: string): Promise<Header> {
@@ -519,7 +535,12 @@ export class ResolvedValueCache {
       return data as Header;
     }
-    const res = await getHeaderAndRoot(source, this.prefetch, current_etag);
+    const res = await getHeaderAndRoot(
+      source,
+      this.decompress,
+      this.prefetch,
+      current_etag
+    );
     if (res[1]) {
       this.cache.set(res[1][0], {
         lastUsed: this.counter++,
@@ -552,7 +573,13 @@ export class ResolvedValueCache {
       return data as Entry[];
     }
-    const directory = await getDirectory(source, offset, length, header);
+    const directory = await getDirectory(
+      source,
+      this.decompress,
+      offset,
+      length,
+      header
+    );
     this.cache.set(cacheKey, {
       lastUsed: this.counter++,
       data: directory,
@@ -631,13 +658,19 @@ export class SharedPromiseCache {
   maxSizeBytes: number;
   counter: number;
   prefetch: boolean;
+  decompress: DecompressFunc;
-  constructor(maxSizeBytes = 64000000, prefetch = true) {
+  constructor(
+    maxSizeBytes = 64000000,
+    prefetch = true,
+    decompress: DecompressFunc = fflateDecompress
+  ) {
     this.cache = new Map<string, SharedPromiseCacheValue>();
     this.sizeBytes = 0;
     this.maxSizeBytes = maxSizeBytes;
     this.counter = 1;
     this.prefetch = prefetch;
+    this.decompress = decompress;
   }
   async getHeader(source: Source, current_etag?: string): Promise<Header> {
@@ -649,7 +682,7 @@ export class SharedPromiseCache {
     }
     const p = new Promise<Header>((resolve, reject) => {
-      getHeaderAndRoot(source, this.prefetch, current_etag)
+      getHeaderAndRoot(source, this.decompress, this.prefetch, current_etag)
         .then((res) => {
           if (this.cache.has(cacheKey)) {
             this.cache.get(cacheKey)!.size = HEADER_SIZE_BYTES;
@@ -688,7 +721,7 @@ export class SharedPromiseCache {
     }
     const p = new Promise<Entry[]>((resolve, reject) => {
-      getDirectory(source, offset, length, header)
+      getDirectory(source, this.decompress, offset, length, header)
         .then((directory) => {
           resolve(directory);
           if (this.cache.has(cacheKey)) {
@@ -771,13 +804,23 @@ export class SharedPromiseCache {
 export class PMTiles {
   source: Source;
   cache: Cache;
+  decompress: DecompressFunc;
-  constructor(source: Source | string, cache?: Cache) {
+  constructor(
+    source: Source | string,
+    cache?: Cache,
+    decompress?: DecompressFunc
+  ) {
     if (typeof source === "string") {
       this.source = new FetchSource(source);
     } else {
       this.source = source;
     }
+    if (decompress) {
+      this.decompress = decompress;
+    } else {
+      this.decompress = fflateDecompress;
+    }
     if (cache) {
       this.cache = cache;
     } else {
@@ -843,7 +886,7 @@ export class PMTiles {
       throw new EtagMismatch(resp.etag);
     }
     return {
-      data: await tryDecompress(resp.data, header.tileCompression),
+      data: await this.decompress(resp.data, header.tileCompression),
       cacheControl: resp.cacheControl,
       expires: resp.expires,
     };
@@ -886,7 +929,10 @@ export class PMTiles {
     if (header.etag && header.etag !== resp.etag) {
       throw new EtagMismatch(resp.etag);
     }
-    const decompressed = await tryDecompress(resp.data, header.internalCompression);
+    const decompressed = await this.decompress(
+      resp.data,
+      header.internalCompression
+    );
     const dec = new TextDecoder("utf-8");
     return JSON.parse(dec.decode(decompressed));
   }
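Taken together, a hedged usage sketch of the new optional parameters; the archive URL is hypothetical and nativeDecompress refers to the illustrative function sketched earlier. Omitting the decompress argument falls back to the bundled fflateDecompress:

const cache = new SharedPromiseCache(64000000, true, nativeDecompress);
const tiles = new PMTiles(
  "https://example.com/archive.pmtiles", // hypothetical archive URL
  cache,
  nativeDecompress
);
const tile = await tiles.getZxy(0, 0, 0); // tile data decoded via nativeDecompress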