Skip to content

Commit e5f32c7

Browse files
authored
[fix] Emit at most one event per event loop iteration (#2218)
Fixes #2216
1 parent 2aa0405 commit e5f32c7

File tree

3 files changed

+24
-59
lines changed

3 files changed

+24
-59
lines changed

lib/receiver.js

+4-45
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,6 @@ const { concat, toArrayBuffer, unmask } = require('./buffer-util');
1313
const { isValidStatusCode, isValidUTF8 } = require('./validation');
1414

1515
const FastBuffer = Buffer[Symbol.species];
16-
const promise = Promise.resolve();
17-
18-
//
19-
// `queueMicrotask()` is not available in Node.js < 11.
20-
//
21-
const queueTask =
22-
typeof queueMicrotask === 'function' ? queueMicrotask : queueMicrotaskShim;
2316

2417
const GET_INFO = 0;
2518
const GET_PAYLOAD_LENGTH_16 = 1;
@@ -567,17 +560,12 @@ class Receiver extends Writable {
567560
data = fragments;
568561
}
569562

570-
//
571-
// If the state is `INFLATING`, it means that the frame data was
572-
// decompressed asynchronously, so there is no need to defer the event
573-
// as it will be emitted asynchronously anyway.
574-
//
575-
if (this._state === INFLATING || this._allowSynchronousEvents) {
563+
if (this._allowSynchronousEvents) {
576564
this.emit('message', data, true);
577565
this._state = GET_INFO;
578566
} else {
579567
this._state = DEFER_EVENT;
580-
queueTask(() => {
568+
setImmediate(() => {
581569
this.emit('message', data, true);
582570
this._state = GET_INFO;
583571
this.startLoop(cb);
@@ -604,7 +592,7 @@ class Receiver extends Writable {
604592
this._state = GET_INFO;
605593
} else {
606594
this._state = DEFER_EVENT;
607-
queueTask(() => {
595+
setImmediate(() => {
608596
this.emit('message', buf, false);
609597
this._state = GET_INFO;
610598
this.startLoop(cb);
@@ -675,7 +663,7 @@ class Receiver extends Writable {
675663
this._state = GET_INFO;
676664
} else {
677665
this._state = DEFER_EVENT;
678-
queueTask(() => {
666+
setImmediate(() => {
679667
this.emit(this._opcode === 0x09 ? 'ping' : 'pong', data);
680668
this._state = GET_INFO;
681669
this.startLoop(cb);
@@ -711,32 +699,3 @@ class Receiver extends Writable {
711699
}
712700

713701
module.exports = Receiver;
714-
715-
/**
716-
* A shim for `queueMicrotask()`.
717-
*
718-
* @param {Function} cb Callback
719-
*/
720-
function queueMicrotaskShim(cb) {
721-
promise.then(cb).catch(throwErrorNextTick);
722-
}
723-
724-
/**
725-
* Throws an error.
726-
*
727-
* @param {Error} err The error to throw
728-
* @private
729-
*/
730-
function throwError(err) {
731-
throw err;
732-
}
733-
734-
/**
735-
* Throws an error in the next tick.
736-
*
737-
* @param {Error} err The error to throw
738-
* @private
739-
*/
740-
function throwErrorNextTick(err) {
741-
process.nextTick(throwError, err);
742-
}

test/receiver.test.js

+18-10
Original file line numberDiff line numberDiff line change
@@ -1085,17 +1085,21 @@ describe('Receiver', () => {
10851085
receiver.write(Buffer.from([0x88, 0x03, 0x03, 0xe8, 0xf8]));
10861086
});
10871087

1088-
it('emits at most one event per microtask', (done) => {
1088+
it('emits at most one event per event loop iteration', (done) => {
10891089
const actual = [];
10901090
const expected = [
10911091
'1',
1092-
'microtask 1',
1092+
'- 1',
1093+
'-- 1',
10931094
'2',
1094-
'microtask 2',
1095+
'- 2',
1096+
'-- 2',
10951097
'3',
1096-
'microtask 3',
1098+
'- 3',
1099+
'-- 3',
10971100
'4',
1098-
'microtask 4'
1101+
'- 4',
1102+
'-- 4'
10991103
];
11001104

11011105
function listener(data) {
@@ -1104,12 +1108,16 @@ describe('Receiver', () => {
11041108

11051109
// `queueMicrotask()` is not available in Node.js < 11.
11061110
Promise.resolve().then(() => {
1107-
actual.push(`microtask ${message}`);
1111+
actual.push(`- ${message}`);
11081112

1109-
if (actual.length === 8) {
1110-
assert.deepStrictEqual(actual, expected);
1111-
done();
1112-
}
1113+
Promise.resolve().then(() => {
1114+
actual.push(`-- ${message}`);
1115+
1116+
if (actual.length === 12) {
1117+
assert.deepStrictEqual(actual, expected);
1118+
done();
1119+
}
1120+
});
11131121
});
11141122
}
11151123

test/websocket.test.js

+2-4
Original file line numberDiff line numberDiff line change
@@ -4234,8 +4234,7 @@ describe('WebSocket', () => {
42344234

42354235
if (messages.push(message.toString()) > 1) return;
42364236

4237-
// `queueMicrotask()` is not available in Node.js < 11.
4238-
Promise.resolve().then(() => {
4237+
setImmediate(() => {
42394238
process.nextTick(() => {
42404239
assert.strictEqual(ws._receiver._state, 5);
42414240
ws.close(1000);
@@ -4485,8 +4484,7 @@ describe('WebSocket', () => {
44854484

44864485
if (messages.push(message.toString()) > 1) return;
44874486

4488-
// `queueMicrotask()` is not available in Node.js < 11.
4489-
Promise.resolve().then(() => {
4487+
setImmediate(() => {
44904488
process.nextTick(() => {
44914489
assert.strictEqual(ws._receiver._state, 5);
44924490
ws.terminate();

0 commit comments

Comments
 (0)