From 5ff1c5497b7ea82db37def33dfb16e1fbece0d04 Mon Sep 17 00:00:00 2001 From: Copilot <198982749+Copilot@users.noreply.github.com> Date: Sun, 15 Mar 2026 12:43:55 +0100 Subject: [PATCH] Implement `onStream`: unified body streaming + abort callback (#1246) * Initial plan * Implement onStream C++ function and add TypeScript type definition Co-authored-by: uNetworkingAB <110806833+uNetworkingAB@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: uNetworkingAB <110806833+uNetworkingAB@users.noreply.github.com> --- docs/index.d.ts | 6 ++++++ src/HttpResponseWrapper.h | 42 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+) diff --git a/docs/index.d.ts b/docs/index.d.ts index 29da76ab..f71a0d92 100644 --- a/docs/index.d.ts +++ b/docs/index.d.ts @@ -199,6 +199,12 @@ export interface HttpResponse { * If the total body size exceeds maxSize bytes, handler is called with null instead. */ onFullData(maxSize: number, handler: (fullBody: ArrayBuffer | null) => void) : HttpResponse; + /** Combined handler for HTTP request body streaming and connection abort events. + * If chunk is null, the connection was aborted. If maxRemainingBodyLength is 0n, the last chunk has arrived. + * You can safely preallocate using maxRemainingBodyLength (it is very large for chunked transfer encoding). + * You MUST copy the data of chunk if maxRemainingBodyLength is not 0n. We Neuter ArrayBuffers on return, making them zero length. */ + onStream(handler: (chunk: ArrayBuffer | null, maxRemainingBodyLength: bigint) => void) : HttpResponse; + /** Returns the remote IP address in binary format (4 or 16 bytes). */ getRemoteAddress() : ArrayBuffer; diff --git a/src/HttpResponseWrapper.h b/src/HttpResponseWrapper.h index ec71920c..b6a1955d 100644 --- a/src/HttpResponseWrapper.h +++ b/src/HttpResponseWrapper.h @@ -208,6 +208,47 @@ struct HttpResponseWrapper { } } + /* Takes a function of (chunk, maxRemainingBodyLength). Combines onAborted and onData into a single callback. + * If chunk is null, the connection was aborted. If maxRemainingBodyLength is 0, the last chunk has arrived. + * The JS object is invalidated before the abort callback is called. Returns this. */ + template + static void res_onStream(const FunctionCallbackInfo &args) { + Isolate *isolate = args.GetIsolate(); + auto *res = getHttpResponse(args); + if (res) { + /* Share the persistent function between both onAborted and onData lambdas */ + auto sharedP = std::make_shared>(isolate, Local::Cast(args[0])); + + /* This is how we capture res (C++ this in invocation of this function) */ + UniquePersistent resObject(isolate, args.This()); + + res->onAborted([resObject = std::move(resObject), sharedP, isolate]() { + HandleScope hs(isolate); + + /* Mark this resObject invalid */ + Local::New(isolate, resObject)->SetAlignedPointerInInternalField(0, nullptr); + + /* Call handler with (null, 0n) to signal abort */ + Local argv[] = {Null(isolate), BigInt::NewFromUnsigned(isolate, 0)}; + CallJS(isolate, Local::New(isolate, *sharedP), 2, argv); + }); + + res->onData([res, sharedP, isolate](std::string_view data, bool last) { + HandleScope hs(isolate); + + Local dataArrayBuffer = ArrayBuffer_New(isolate, (void *) data.data(), data.length()); + + /* Pass maxRemainingBodyLength so user can preallocate; 0 signals the last chunk */ + Local argv[] = {dataArrayBuffer, BigInt::NewFromUnsigned(isolate, res->maxRemainingBodyLength())}; + CallJS(isolate, Local::New(isolate, *sharedP), 2, argv); + + dataArrayBuffer->Detach(); + }); + + args.GetReturnValue().Set(args.This()); + } + } + /* Takes nothing, returns nothing. Cb wants nothing returned. */ template static void res_onAborted(const FunctionCallbackInfo &args) { @@ -597,6 +638,7 @@ struct HttpResponseWrapper { /* QUIC has a lot of functions unimplemented */ if constexpr (SSL != 2) { + resTemplateLocal->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "onStream", NewStringType::kNormal).ToLocalChecked(), FunctionTemplate::New(isolate, res_onStream)); resTemplateLocal->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "onFullData", NewStringType::kNormal).ToLocalChecked(), FunctionTemplate::New(isolate, res_onFullData)); resTemplateLocal->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "getWriteOffset", NewStringType::kNormal).ToLocalChecked(), FunctionTemplate::New(isolate, res_getWriteOffset)); resTemplateLocal->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "maxRemainingBodyLength", NewStringType::kNormal).ToLocalChecked(), FunctionTemplate::New(isolate, res_maxRemainingBodyLength));