Clean up onDataV2 and collectBody code, and update their documentation (#1256)

This commit is contained in:
BV-WebDev
2026-03-26 20:08:48 +01:00
committed by GitHub
parent 0b0c8f90d3
commit 2aff4e4bf4
2 changed files with 43 additions and 65 deletions

9
docs/index.d.ts vendored
View File

@@ -195,17 +195,18 @@ export interface HttpResponse {
/** Resume HTTP request body streaming (unthrottle). */
resume() : void;
/** Accumulates all data chunks and calls handler with the complete body as an ArrayBuffer once all data has arrived.
/** collectBody is a helper function making optimal use of the new onDataV2.
* It allows efficient and easy collection of smallish HTTP request body data into RAM.
* It accumulates all data chunks and calls handler with the complete body as an ArrayBuffer once all data has arrived.
* If the total body size exceeds maxSize bytes, handler is called with null instead. */
collectBody(maxSize: number, handler: (fullBody: ArrayBuffer | null) => void) : HttpResponse;
/** Handler for reading HTTP request body data. V2.
* Must be attached before performing any asynchronous operation, otherwise data may be lost.
* You MUST copy the data of chunk if maxRemainingBodyLength is not 0. We Neuter ArrayBuffers on return, making them zero length.
*
* You MUST copy the data of chunk if maxRemainingBodyLength is not 0n. We Neuter ArrayBuffers on return, making them zero length.
* maxRemainingBodyLength is the known maximum of the remaining body length. Can be used to preallocate a receive buffer.
*/
onDataV2(handler: (chunk: ArrayBuffer | null, maxRemainingBodyLength: bigint) => void) : HttpResponse;
onDataV2(handler: (chunk: ArrayBuffer, maxRemainingBodyLength: bigint) => void) : HttpResponse;
/** Returns the remote IP address in binary format (4 or 16 bytes). */
getRemoteAddress() : ArrayBuffer;

View File

@@ -122,16 +122,6 @@ struct HttpResponseWrapper {
Isolate *isolate = args.GetIsolate();
auto *res = getHttpResponse<SSL>(args);
if (res) {
/* This is how we capture res (C++ this in invocation of this function) */
UniquePersistent<Object> resObject(isolate, args.This());
res->onAborted([resObject = std::move(resObject), isolate]() {
HandleScope hs(isolate);
/* Mark this resObject invalid */
Local<Object>::New(isolate, resObject)->SetAlignedPointerInInternalField(0, nullptr);
});
size_t maxSize = (size_t) args[0]->NumberValue(isolate->GetCurrentContext()).ToChecked();
/* This thing perfectly fits in with unique_function, and will Reset on destructor */
@@ -142,49 +132,43 @@ struct HttpResponseWrapper {
std::unique_ptr<std::vector<char>> buffer;
bool overflow = false;
res->onDataV2([res, p = std::move(p), buffer = std::move(buffer), overflow, maxSize, isolate](std::string_view data, uint64_t maxRemainingBodyLength) mutable {
res->onDataV2([p = std::move(p), buffer = std::move(buffer), overflow, maxSize, isolate](std::string_view data, uint64_t maxRemainingBodyLength) mutable {
HandleScope hs(isolate);
if (!overflow) {
if (!buffer) {
/* Fast path: this is the very first (and possibly only) chunk */
if (maxRemainingBodyLength == 0) {
if (data.size() <= maxSize) {
/* Single-chunk zero-copy: wrap data directly, detach after call like onData */
Local<ArrayBuffer> ab = ArrayBuffer_New(isolate, (void *) data.data(), data.size());
Local<Value> argv[] = {ab};
CallJS(isolate, Local<Function>::New(isolate, p), 1, argv);
ab->Detach();
} else {
Local<Value> argv[] = {Null(isolate)};
CallJS(isolate, Local<Function>::New(isolate, p), 1, argv);
}
return;
}
/* Slow path begins: allocate buffer lazily for first non-terminal chunk */
if (data.size() <= maxSize) {
buffer = std::make_unique<std::vector<char>>();
/* Preallocate with hint */
if (maxRemainingBodyLength <= maxSize) {
buffer->reserve(maxRemainingBodyLength); // this includes the total size on first call (look over this)
}
buffer->assign(data.begin(), data.end());
} else {
overflow = true;
}
if (overflow) {
return;
} else if (!buffer) {
/* First and possibly only chunk */
if (data.size() > maxSize) {
/* Overflow: return to JS with null */
overflow = true;
Local<Value> argv[] = {Null(isolate)};
CallJS(isolate, Local<Function>::New(isolate, p), 1, argv);
} else if (maxRemainingBodyLength == 0) {
/* Fast path: Single-chunk zero-copy: wrap data directly, detach after call like onData */
Local<ArrayBuffer> ab = ArrayBuffer_New(isolate, (void *) data.data(), data.size());
Local<Value> argv[] = {ab};
CallJS(isolate, Local<Function>::New(isolate, p), 1, argv);
ab->Detach();
} else {
/* Subsequent chunks: accumulate or mark overflow; guard both sides of subtraction */
if (buffer->size() <= maxSize && data.size() <= maxSize - buffer->size()) {
buffer->insert(buffer->end(), data.begin(), data.end());
} else {
buffer.reset();
overflow = true;
/* Slow path begins: allocate buffer lazily for first non-terminal chunk */
buffer = std::make_unique<std::vector<char>>();
if (maxRemainingBodyLength <= maxSize - data.size()) {
/* Preallocate with hint */
buffer->reserve(maxRemainingBodyLength + data.size());
}
buffer->assign(data.begin(), data.end());
}
}
if (maxRemainingBodyLength == 0) {
if (!overflow) {
} else if (data.size() > maxSize - buffer->size()) {
/* Subsequent chunks Overflow: return to JS with null */
buffer.reset();
overflow = true;
Local<Value> argv[] = {Null(isolate)};
CallJS(isolate, Local<Function>::New(isolate, p), 1, argv);
} else {
/* Subsequent chunks: accumulate */
buffer->insert(buffer->end(), data.begin(), data.end());
if (maxRemainingBodyLength == 0) {
/* Zero-copy: hand V8 the vector's own memory via a custom deleter */
auto *rawBuffer = buffer.release();
auto backingStore = ArrayBuffer::NewBackingStore(
@@ -197,9 +181,6 @@ struct HttpResponseWrapper {
Local<ArrayBuffer> ab = ArrayBuffer::New(isolate, std::move(backingStore));
Local<Value> argv[] = {ab};
CallJS(isolate, Local<Function>::New(isolate, p), 1, argv);
} else {
Local<Value> argv[] = {Null(isolate)};
CallJS(isolate, Local<Function>::New(isolate, p), 1, argv);
}
}
});
@@ -208,28 +189,24 @@ struct HttpResponseWrapper {
}
}
/* Takes a function of (chunk, maxRemainingBodyLength). Combines onAborted and onData into a single callback.
* If chunk is null, the connection was aborted. If maxRemainingBodyLength is 0, the last chunk has arrived.
* The JS object is invalidated before the abort callback is called. Returns this. */
/* Takes function of chunk and maxRemainingBodyLength. Returns this.
* If maxRemainingBodyLength is 0, the last chunk has arrived. */
template <int SSL>
static void res_onDataV2(const FunctionCallbackInfo<Value> &args) {
Isolate *isolate = args.GetIsolate();
auto *res = getHttpResponse<SSL>(args);
if (res) {
/* Share the persistent function between both onAborted and onData lambdas */
auto sharedP = std::make_shared<UniquePersistent<Function>>(isolate, Local<Function>::Cast(args[0]));
/* This thing perfectly fits in with unique_function, and will Reset on destructor */
UniquePersistent<Function> p(isolate, Local<Function>::Cast(args[0]));
/* This is how we capture res (C++ this in invocation of this function) */
UniquePersistent<Object> resObject(isolate, args.This());
res->onDataV2([res, sharedP, isolate](std::string_view data, uint64_t maxRemainingBodyLength) {
res->onDataV2([p = std::move(p), isolate](std::string_view data, uint64_t maxRemainingBodyLength) {
HandleScope hs(isolate);
Local<ArrayBuffer> dataArrayBuffer = ArrayBuffer_New(isolate, (void *) data.data(), data.length());
/* Pass maxRemainingBodyLength so user can preallocate; 0 signals the last chunk */
Local<Value> argv[] = {dataArrayBuffer, BigInt::NewFromUnsigned(isolate, maxRemainingBodyLength)};
CallJS(isolate, Local<Function>::New(isolate, *sharedP), 2, argv);
CallJS(isolate, Local<Function>::New(isolate, p), 2, argv);
dataArrayBuffer->Detach();
});