diff --git a/CHANGELOG.md b/CHANGELOG.md index e6bf9dc967..0125474b45 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,8 +13,11 @@ and this project adheres to fields you want to change ([#1309]). - @cosmjs/tendermint-rpc: Add missing exports `HttpBatchClient`, `HttpBatchClientOptions`, `RpcClient` ([#1311]). +- @cosmjs/tendermint-rpc: Send batch immediately when full in `HttpBatchClient` + ([#1310]). [#1309]: https://github.com/cosmos/cosmjs/issues/1309 +[#1310]: https://github.com/cosmos/cosmjs/issues/1310 [#1311]: https://github.com/cosmos/cosmjs/issues/1311 ### Fixed diff --git a/packages/tendermint-rpc/src/rpcclients/httpbatchclient.spec.ts b/packages/tendermint-rpc/src/rpcclients/httpbatchclient.spec.ts index d242f84f80..b05c3e56de 100644 --- a/packages/tendermint-rpc/src/rpcclients/httpbatchclient.spec.ts +++ b/packages/tendermint-rpc/src/rpcclients/httpbatchclient.spec.ts @@ -30,4 +30,23 @@ describe("HttpBatchClient", () => { client.disconnect(); }); + + it("dispatches requests as soon as batch size limit is reached", async () => { + pendingWithoutTendermint(); + const client = new HttpBatchClient(tendermintUrl, { + dispatchInterval: 3600_000 /* 1h to make test time out if this is not working */, + batchSizeLimit: 3, + }); + + const healthResponse = await Promise.all([ + client.execute(createJsonRpcRequest("health")), + client.execute(createJsonRpcRequest("health")), + client.execute(createJsonRpcRequest("health")), + ]); + expect(healthResponse[0].result).toEqual({}); + expect(healthResponse[1].result).toEqual({}); + expect(healthResponse[2].result).toEqual({}); + + client.disconnect(); + }); }); diff --git a/packages/tendermint-rpc/src/rpcclients/httpbatchclient.ts b/packages/tendermint-rpc/src/rpcclients/httpbatchclient.ts index d91dc10499..4b8e4fe653 100644 --- a/packages/tendermint-rpc/src/rpcclients/httpbatchclient.ts +++ b/packages/tendermint-rpc/src/rpcclients/httpbatchclient.ts @@ -10,7 +10,9 @@ import { HttpEndpoint } from "./httpclient"; import { hasProtocol, RpcClient } from "./rpcclient"; export interface HttpBatchClientOptions { + /** Interval for dispatching batches (in milliseconds) */ dispatchInterval: number; + /** Max number of items sent in one request */ batchSizeLimit: number; } @@ -58,6 +60,11 @@ export class HttpBatchClient implements RpcClient { public async execute(request: JsonRpcRequest): Promise { return new Promise((resolve, reject) => { this.queue.push({ request, resolve, reject }); + + if (this.queue.length >= this.options.batchSizeLimit) { + // this train is full, let's go + this.tick(); + } }); } @@ -71,27 +78,43 @@ export class HttpBatchClient implements RpcClient { } } - private async tick(): Promise { + /** + * This is called in an interval where promise rejections cannot be handled. + * So this is not async and HTTP errors need to be handled by the queued promises. + */ + private tick(): void { // Avoid race conditions - const queue = this.queue.splice(0, this.options.batchSizeLimit); + const batch = this.queue.splice(0, this.options.batchSizeLimit); - if (!queue.length) return; + if (!batch.length) return; - const request = queue.map((s) => s.request); - const raw = await http("POST", this.url, this.headers, request); - // Requests with a single entry return as an object - const arr = Array.isArray(raw) ? raw : [raw]; + const requests = batch.map((s) => s.request); + const requestIds = requests.map((request) => request.id); - arr.forEach((el) => { - const req = queue.find((s) => s.request.id === el.id); - if (!req) return; - const { reject, resolve } = req; - const response = parseJsonRpcResponse(el); - if (isJsonRpcErrorResponse(response)) { - reject(new Error(JSON.stringify(response.error))); - } else { - resolve(response); - } - }); + http("POST", this.url, this.headers, requests).then( + (raw) => { + // Requests with a single entry return as an object + const arr = Array.isArray(raw) ? raw : [raw]; + + arr.forEach((el) => { + const req = batch.find((s) => s.request.id === el.id); + if (!req) return; + const { reject, resolve } = req; + const response = parseJsonRpcResponse(el); + if (isJsonRpcErrorResponse(response)) { + reject(new Error(JSON.stringify(response.error))); + } else { + resolve(response); + } + }); + }, + (error) => { + for (const requestId of requestIds) { + const req = batch.find((s) => s.request.id === requestId); + if (!req) return; + req.reject(error); + } + }, + ); } }