Send batch immediately when full in HttpBatchClient

This commit is contained in:
Simon Warta 2022-11-15 01:23:47 +01:00
parent 37ed295a2e
commit 66124fcc93
3 changed files with 63 additions and 18 deletions

View File

@ -13,8 +13,11 @@ and this project adheres to
fields you want to change ([#1309]).
- @cosmjs/tendermint-rpc: Add missing exports `HttpBatchClient`,
`HttpBatchClientOptions`, `RpcClient` ([#1311]).
- @cosmjs/tendermint-rpc: Send batch immediately when full in `HttpBatchClient`
([#1310]).
[#1309]: https://github.com/cosmos/cosmjs/issues/1309
[#1310]: https://github.com/cosmos/cosmjs/issues/1310
[#1311]: https://github.com/cosmos/cosmjs/issues/1311
### Fixed

View File

@ -30,4 +30,23 @@ describe("HttpBatchClient", () => {
client.disconnect();
});
it("dispatches requests as soon as batch size limit is reached", async () => {
pendingWithoutTendermint();
const client = new HttpBatchClient(tendermintUrl, {
dispatchInterval: 3600_000 /* 1h to make test time out if this is not working */,
batchSizeLimit: 3,
});
const healthResponse = await Promise.all([
client.execute(createJsonRpcRequest("health")),
client.execute(createJsonRpcRequest("health")),
client.execute(createJsonRpcRequest("health")),
]);
expect(healthResponse[0].result).toEqual({});
expect(healthResponse[1].result).toEqual({});
expect(healthResponse[2].result).toEqual({});
client.disconnect();
});
});

View File

@ -10,7 +10,9 @@ import { HttpEndpoint } from "./httpclient";
import { hasProtocol, RpcClient } from "./rpcclient";
export interface HttpBatchClientOptions {
/** Interval for dispatching batches (in milliseconds) */
dispatchInterval: number;
/** Max number of items sent in one request */
batchSizeLimit: number;
}
@ -58,6 +60,11 @@ export class HttpBatchClient implements RpcClient {
public async execute(request: JsonRpcRequest): Promise<JsonRpcSuccessResponse> {
return new Promise((resolve, reject) => {
this.queue.push({ request, resolve, reject });
if (this.queue.length >= this.options.batchSizeLimit) {
// this train is full, let's go
this.tick();
}
});
}
@ -71,19 +78,26 @@ export class HttpBatchClient implements RpcClient {
}
}
private async tick(): Promise<void> {
/**
* This is called in an interval where promise rejections cannot be handled.
* So this is not async and HTTP errors need to be handled by the queued promises.
*/
private tick(): void {
// Avoid race conditions
const queue = this.queue.splice(0, this.options.batchSizeLimit);
const batch = this.queue.splice(0, this.options.batchSizeLimit);
if (!queue.length) return;
if (!batch.length) return;
const request = queue.map((s) => s.request);
const raw = await http("POST", this.url, this.headers, request);
const requests = batch.map((s) => s.request);
const requestIds = requests.map((request) => request.id);
http("POST", this.url, this.headers, requests).then(
(raw) => {
// Requests with a single entry return as an object
const arr = Array.isArray(raw) ? raw : [raw];
arr.forEach((el) => {
const req = queue.find((s) => s.request.id === el.id);
const req = batch.find((s) => s.request.id === el.id);
if (!req) return;
const { reject, resolve } = req;
const response = parseJsonRpcResponse(el);
@ -93,5 +107,14 @@ export class HttpBatchClient implements RpcClient {
resolve(response);
}
});
},
(error) => {
for (const requestId of requestIds) {
const req = batch.find((s) => s.request.id === requestId);
if (!req) return;
req.reject(error);
}
},
);
}
}