mirror of
https://github.com/uNetworking/uWebSockets.js.git
synced 2026-03-28 05:01:24 -04:00
Implement onStream: unified body streaming + abort callback (#1246)
* Initial plan * Implement onStream C++ function and add TypeScript type definition Co-authored-by: uNetworkingAB <110806833+uNetworkingAB@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: uNetworkingAB <110806833+uNetworkingAB@users.noreply.github.com>
This commit is contained in:
6
docs/index.d.ts
vendored
6
docs/index.d.ts
vendored
@@ -199,6 +199,12 @@ export interface HttpResponse {
|
||||
* If the total body size exceeds maxSize bytes, handler is called with null instead. */
|
||||
onFullData(maxSize: number, handler: (fullBody: ArrayBuffer | null) => void) : HttpResponse;
|
||||
|
||||
/** Combined handler for HTTP request body streaming and connection abort events.
|
||||
* If chunk is null, the connection was aborted. If maxRemainingBodyLength is 0n, the last chunk has arrived.
|
||||
* You can safely preallocate using maxRemainingBodyLength (it is very large for chunked transfer encoding).
|
||||
* You MUST copy the data of chunk if maxRemainingBodyLength is not 0n. We Neuter ArrayBuffers on return, making them zero length. */
|
||||
onStream(handler: (chunk: ArrayBuffer | null, maxRemainingBodyLength: bigint) => void) : HttpResponse;
|
||||
|
||||
/** Returns the remote IP address in binary format (4 or 16 bytes). */
|
||||
getRemoteAddress() : ArrayBuffer;
|
||||
|
||||
|
||||
@@ -208,6 +208,47 @@ struct HttpResponseWrapper {
|
||||
}
|
||||
}
|
||||
|
||||
/* Takes a function of (chunk, maxRemainingBodyLength). Combines onAborted and onData into a single callback.
|
||||
* If chunk is null, the connection was aborted. If maxRemainingBodyLength is 0, the last chunk has arrived.
|
||||
* The JS object is invalidated before the abort callback is called. Returns this. */
|
||||
template <int SSL>
|
||||
static void res_onStream(const FunctionCallbackInfo<Value> &args) {
|
||||
Isolate *isolate = args.GetIsolate();
|
||||
auto *res = getHttpResponse<SSL>(args);
|
||||
if (res) {
|
||||
/* Share the persistent function between both onAborted and onData lambdas */
|
||||
auto sharedP = std::make_shared<UniquePersistent<Function>>(isolate, Local<Function>::Cast(args[0]));
|
||||
|
||||
/* This is how we capture res (C++ this in invocation of this function) */
|
||||
UniquePersistent<Object> resObject(isolate, args.This());
|
||||
|
||||
res->onAborted([resObject = std::move(resObject), sharedP, isolate]() {
|
||||
HandleScope hs(isolate);
|
||||
|
||||
/* Mark this resObject invalid */
|
||||
Local<Object>::New(isolate, resObject)->SetAlignedPointerInInternalField(0, nullptr);
|
||||
|
||||
/* Call handler with (null, 0n) to signal abort */
|
||||
Local<Value> argv[] = {Null(isolate), BigInt::NewFromUnsigned(isolate, 0)};
|
||||
CallJS(isolate, Local<Function>::New(isolate, *sharedP), 2, argv);
|
||||
});
|
||||
|
||||
res->onData([res, sharedP, isolate](std::string_view data, bool last) {
|
||||
HandleScope hs(isolate);
|
||||
|
||||
Local<ArrayBuffer> dataArrayBuffer = ArrayBuffer_New(isolate, (void *) data.data(), data.length());
|
||||
|
||||
/* Pass maxRemainingBodyLength so user can preallocate; 0 signals the last chunk */
|
||||
Local<Value> argv[] = {dataArrayBuffer, BigInt::NewFromUnsigned(isolate, res->maxRemainingBodyLength())};
|
||||
CallJS(isolate, Local<Function>::New(isolate, *sharedP), 2, argv);
|
||||
|
||||
dataArrayBuffer->Detach();
|
||||
});
|
||||
|
||||
args.GetReturnValue().Set(args.This());
|
||||
}
|
||||
}
|
||||
|
||||
/* Takes nothing, returns nothing. Cb wants nothing returned. */
|
||||
template <int SSL>
|
||||
static void res_onAborted(const FunctionCallbackInfo<Value> &args) {
|
||||
@@ -597,6 +638,7 @@ struct HttpResponseWrapper {
|
||||
|
||||
/* QUIC has a lot of functions unimplemented */
|
||||
if constexpr (SSL != 2) {
|
||||
resTemplateLocal->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "onStream", NewStringType::kNormal).ToLocalChecked(), FunctionTemplate::New(isolate, res_onStream<SSL>));
|
||||
resTemplateLocal->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "onFullData", NewStringType::kNormal).ToLocalChecked(), FunctionTemplate::New(isolate, res_onFullData<SSL>));
|
||||
resTemplateLocal->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "getWriteOffset", NewStringType::kNormal).ToLocalChecked(), FunctionTemplate::New(isolate, res_getWriteOffset<SSL>));
|
||||
resTemplateLocal->PrototypeTemplate()->Set(String::NewFromUtf8(isolate, "maxRemainingBodyLength", NewStringType::kNormal).ToLocalChecked(), FunctionTemplate::New(isolate, res_maxRemainingBodyLength<SSL>));
|
||||
|
||||
Reference in New Issue
Block a user