events: implement captureRejections for async handlers by mcollina · Pull Request #27867 · nodejs/node · GitHub
Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 125 additions & 0 deletions doc/api/events.md
6 changes: 6 additions & 0 deletions lib/_http_outgoing.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const {

const { getDefaultHighWaterMark } = require('internal/streams/state');
const assert = require('internal/assert');
const EE = require('events');
const Stream = require('stream');
const internalUtil = require('internal/util');
const { kOutHeaders, utcDate, kNeedDrain } = require('internal/http');
Expand Down Expand Up @@ -884,6 +885,11 @@ OutgoingMessage.prototype.pipe = function pipe() {
this.emit('error', new ERR_STREAM_CANNOT_PIPE());
};

OutgoingMessage.prototype[EE.captureRejectionSymbol] =
function(err, event) {
this.destroy(err);
};

module.exports = {
OutgoingMessage
};
23 changes: 23 additions & 0 deletions lib/_http_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const {
} = primordials;

const net = require('net');
const EE = require('events');
const assert = require('internal/assert');
const {
parsers,
Expand Down Expand Up @@ -351,6 +352,28 @@ Server.prototype.setTimeout = function setTimeout(msecs, callback) {
return this;
};

Server.prototype[EE.captureRejectionSymbol] = function(
err, event, ...args) {

switch (event) {
case 'request':
const [ , res] = args;
if (!res.headersSent && !res.writableEnded) {
// Don't leak headers.
for (const name of res.getHeaderNames()) {
res.removeHeader(name);
}
res.statusCode = 500;
res.end(STATUS_CODES[500]);
} else {
res.destroy();
}
break;
Comment thread
mcollina marked this conversation as resolved.
Outdated
default:
net.Server.prototype[Symbol.for('nodejs.rejection')]
.call(this, err, event, ...args);
}
};

function connectionListener(socket) {
defaultTriggerAsyncIdScope(
Expand Down
10 changes: 9 additions & 1 deletion lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ function Readable(options) {
this._destroy = options.destroy;
}

Stream.call(this);
Stream.call(this, options);
}

ObjectDefineProperty(Readable.prototype, 'destroyed', {
Expand Down Expand Up @@ -233,6 +233,14 @@ Readable.prototype._destroy = function(err, cb) {
cb(err);
};

Readable.prototype[EE.captureRejectionSymbol] = function(err) {
// TODO(mcollina): remove the destroyed if once errorEmitted lands in
// Readable.

This comment was marked as resolved.

if (!this.destroyed) {

This comment was marked as resolved.

this.destroy(err);
}
};

// Manually shove something into the read() buffer.
// This returns true if the highWaterMark has not been hit yet,
// similar to how Writable.write() returns true if you should
Expand Down
7 changes: 6 additions & 1 deletion lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ module.exports = Writable;
Writable.WritableState = WritableState;

const internalUtil = require('internal/util');
const EE = require('events');
const Stream = require('stream');
const { Buffer } = require('buffer');
const destroyImpl = require('internal/streams/destroy');
Expand Down Expand Up @@ -250,7 +251,7 @@ function Writable(options) {
this._final = options.final;
}

Stream.call(this);
Stream.call(this, options);
}

// Otherwise people can pipe Writable streams, which is just wrong.
Expand Down Expand Up @@ -808,3 +809,7 @@ Writable.prototype._undestroy = destroyImpl.undestroy;
Writable.prototype._destroy = function(err, cb) {
cb(err);
};

Writable.prototype[EE.captureRejectionSymbol] = function(err) {
this.destroy(err);
};
14 changes: 14 additions & 0 deletions lib/_tls_wrap.js
Loading