http: fix stalled pipeline bug by indutny · Pull Request #3342 · nodejs/node · GitHub
Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions lib/_http_common.js
66 changes: 31 additions & 35 deletions lib/_http_outgoing.js
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ OutgoingMessage.prototype._send = function(data, encoding, callback) {
this.outputEncodings.unshift('binary');
this.outputCallbacks.unshift(null);
this.outputSize += this._header.length;
if (this._onPendingData !== null)
if (typeof this._onPendingData === 'function')

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is checking for typeof function part of a fix, or different cleanup?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a cleanup.

this._onPendingData(this._header.length);
}
this._headerSent = true;
Expand All @@ -158,22 +158,7 @@ OutgoingMessage.prototype._writeRaw = function(data, encoding, callback) {
// There might be pending data in the this.output buffer.
var outputLength = this.output.length;
if (outputLength > 0) {
var output = this.output;
var outputEncodings = this.outputEncodings;
var outputCallbacks = this.outputCallbacks;
connection.cork();
for (var i = 0; i < outputLength; i++) {
connection.write(output[i], outputEncodings[i],
outputCallbacks[i]);
}
connection.uncork();

this.output = [];
this.outputEncodings = [];
this.outputCallbacks = [];
if (this._onPendingData !== null)
this._onPendingData(-this.outputSize);
this.outputSize = 0;
this._flushOutput(connection);
} else if (data.length === 0) {
if (typeof callback === 'function')
process.nextTick(callback);
Expand All @@ -198,7 +183,7 @@ OutgoingMessage.prototype._buffer = function(data, encoding, callback) {
this.outputEncodings.push(encoding);
this.outputCallbacks.push(callback);
this.outputSize += data.length;
if (this._onPendingData !== null)
if (typeof this._onPendingData === 'function')
this._onPendingData(data.length);
return false;
};
Expand Down Expand Up @@ -644,26 +629,11 @@ OutgoingMessage.prototype._finish = function() {
// to attempt to flush any pending messages out to the socket.
OutgoingMessage.prototype._flush = function() {

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

var socket = this.socket;
var outputLength, ret;
var ret;

if (socket && socket.writable) {
// There might be remaining data in this.output; write it out
outputLength = this.output.length;
if (outputLength > 0) {
var output = this.output;
var outputEncodings = this.outputEncodings;
var outputCallbacks = this.outputCallbacks;
socket.cork();
for (var i = 0; i < outputLength; i++) {
ret = socket.write(output[i], outputEncodings[i],
outputCallbacks[i]);
}
socket.uncork();

this.output = [];
this.outputEncodings = [];
this.outputCallbacks = [];
}
ret = this._flushOutput(socket);

if (this.finished) {
// This is a queue to the server or client to bring in the next this.
Expand All @@ -675,6 +645,32 @@ OutgoingMessage.prototype._flush = function() {
}
};

OutgoingMessage.prototype._flushOutput = function _flushOutput(socket) {
var ret;
var outputLength = this.output.length;
if (outputLength <= 0)
return ret;

var output = this.output;
var outputEncodings = this.outputEncodings;
var outputCallbacks = this.outputCallbacks;
socket.cork();
for (var i = 0; i < outputLength; i++) {
ret = socket.write(output[i], outputEncodings[i],
outputCallbacks[i]);
}
socket.uncork();

this.output = [];
this.outputEncodings = [];
this.outputCallbacks = [];
if (typeof this._onPendingData === 'function')
this._onPendingData(-this.outputSize);
this.outputSize = 0;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part is in OutgoingMessage#_writeRaw() but not in OutgoingMessage#_finish(). Is this already addressed in the git commit message? Don't see any direct reference to why this is necessary for the fix.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You meant _flush?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah oop. Yeah _flush.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is used there too.


return ret;
};


OutgoingMessage.prototype.flushHeaders = function() {
if (!this._header) {
Expand Down
39 changes: 33 additions & 6 deletions lib/_http_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -343,8 +343,10 @@ function connectionListener(socket) {
socket.on = socketOnWrap;

var external = socket._handle._externalStream;
if (external)
if (external) {
parser._consumed = true;
parser.consume(external);
}
external = null;
parser[kOnExecute] = onParserExecute;

Expand Down Expand Up @@ -382,7 +384,7 @@ function connectionListener(socket) {
socket.removeListener('data', socketOnData);
socket.removeListener('end', socketOnEnd);
socket.removeListener('close', serverSocketCloseListener);
parser.unconsume(socket._handle._externalStream);
unconsume(parser, socket);
parser.finish();
freeParser(parser, req, null);
parser = null;
Expand Down Expand Up @@ -530,13 +532,38 @@ function connectionListener(socket) {
exports._connectionListener = connectionListener;

function onSocketResume() {
if (this._handle)
// It may seem that the socket is resumed, but this is an enemy's trick to
// deceive us! `resume` is emitted asynchronously, and may be called from
// `incoming.readStart()`. Stop the socket again here, just to preserve the
// state.
//
// We don't care about stream semantics for the consumed socket anyway.
if (this._paused) {
this.pause();
return;
}

if (this._handle && !this._handle.reading) {
this._handle.reading = true;
this._handle.readStart();
}
}

function onSocketPause() {
if (this._handle)
if (this._handle && this._handle.reading) {
this._handle.reading = false;
this._handle.readStop();
}
}

function unconsume(parser, socket) {
if (socket._handle) {
if (parser._consumed)
parser.unconsume(socket._handle._externalStream);
parser._consumed = false;
socket.removeListener('pause', onSocketPause);
socket.removeListener('resume', onSocketResume);
}
}

function socketOnWrap(ev, fn) {
Expand All @@ -546,8 +573,8 @@ function socketOnWrap(ev, fn) {
return res;
}

if (this._handle && (ev === 'data' || ev === 'readable'))
this.parser.unconsume(this._handle._externalStream);
if (ev === 'data' || ev === 'readable')
unconsume(this.parser, this);

return res;
}
17 changes: 11 additions & 6 deletions src/node_http_parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -484,13 +484,18 @@ class Parser : public BaseObject {
if (parser->prev_alloc_cb_.is_empty())
return;

CHECK(args[0]->IsExternal());
Local<External> stream_obj = args[0].As<External>();
StreamBase* stream = static_cast<StreamBase*>(stream_obj->Value());
CHECK_NE(stream, nullptr);
// Restore stream's callbacks
if (args.Length() == 1 && args[0]->IsExternal()) {
Local<External> stream_obj = args[0].As<External>();
StreamBase* stream = static_cast<StreamBase*>(stream_obj->Value());
CHECK_NE(stream, nullptr);

stream->set_alloc_cb(parser->prev_alloc_cb_);
stream->set_read_cb(parser->prev_read_cb_);
}

stream->set_alloc_cb(parser->prev_alloc_cb_);
stream->set_read_cb(parser->prev_read_cb_);
parser->prev_alloc_cb_.clear();
parser->prev_read_cb_.clear();
}


Expand Down
41 changes: 41 additions & 0 deletions test/parallel/test-http-pipeline-regr-3332.js