3
3
const { WebsocketFrameSend } = require ( './frame' )
4
4
const { opcodes, sendHints } = require ( './constants' )
5
5
6
- /** @type {Uint8Array } */
6
+ /** @type {typeof Uint8Array } */
7
7
const FastBuffer = Buffer [ Symbol . species ]
8
8
9
- class SendQueue {
10
- #queued = new Set ( )
11
- #size = 0
9
+ /**
10
+ * @typedef {object } SendQueueNode
11
+ * @property {SendQueueNode | null } next
12
+ * @property {Promise<void> | null } promise
13
+ * @property {((...args: any[]) => any) } callback
14
+ * @property {Buffer | null } frame
15
+ * @property {boolean } resolved
16
+ */
12
17
13
- /** @type {import('net').Socket } */
18
+ class SendQueue {
19
+ /**
20
+ * @type {SendQueueNode | null }
21
+ */
22
+ #head = null
23
+ /**
24
+ * @type {SendQueueNode | null }
25
+ */
26
+ #tail = null
27
+
28
+ /**
29
+ * @type {boolean }
30
+ */
31
+ #running = false
32
+
33
+ /** @type {import('node:net').Socket } */
14
34
#socket
15
35
16
36
constructor ( socket ) {
@@ -19,66 +39,87 @@ class SendQueue {
19
39
20
40
add ( item , cb , hint ) {
21
41
if ( hint !== sendHints . blob ) {
22
- const data = clone ( item , hint )
23
-
24
- if ( this . #size === 0 ) {
25
- this . #dispatch ( data , cb , hint )
42
+ const frame = createFrame ( item , hint )
43
+ if ( ! this . #running ) {
44
+ // fast-path
45
+ this . #socket . write ( frame , cb )
26
46
} else {
27
- this . #queued. add ( [ data , cb , true , hint ] )
28
- this . #size++
29
-
30
- this . #run( )
47
+ /** @type {SendQueueNode } */
48
+ const node = {
49
+ next : null ,
50
+ promise : null ,
51
+ callback : cb ,
52
+ frame,
53
+ resolved : true
54
+ }
55
+ if ( this . #tail !== null ) {
56
+ this . #tail. next = node
57
+ }
58
+ this . #tail = node
31
59
}
32
-
33
60
return
34
61
}
35
62
36
- const promise = item . arrayBuffer ( )
37
- const queue = [ null , cb , false , hint ]
38
- promise . then ( ( ab ) => {
39
- queue [ 0 ] = clone ( ab , hint )
40
- queue [ 2 ] = true
41
-
42
- this . #run( )
43
- } )
44
-
45
- this . #queued. add ( queue )
46
- this . #size++
47
- }
48
-
49
- #run ( ) {
50
- for ( const queued of this . #queued) {
51
- const [ data , cb , done , hint ] = queued
63
+ /** @type {SendQueueNode } */
64
+ const node = {
65
+ next : null ,
66
+ promise : item . arrayBuffer ( ) . then ( ( ab ) => {
67
+ node . resolved = true
68
+ node . frame = createFrame ( ab , hint )
69
+ } ) ,
70
+ callback : cb ,
71
+ frame : null ,
72
+ resolved : false
73
+ }
52
74
53
- if ( ! done ) return
75
+ if ( this . #tail === null ) {
76
+ this . #tail = node
77
+ }
54
78
55
- this . #queued. delete ( queued )
56
- this . #size--
79
+ if ( this . #head === null ) {
80
+ this . #head = node
81
+ }
57
82
58
- this . #dispatch( data , cb , hint )
83
+ if ( ! this . #running) {
84
+ this . #run( )
59
85
}
60
86
}
61
87
62
- #dispatch ( data , cb , hint ) {
63
- const frame = new WebsocketFrameSend ( )
64
- const opcode = hint === sendHints . string ? opcodes . TEXT : opcodes . BINARY
65
-
66
- frame . frameData = data
67
- const buffer = frame . createFrame ( opcode )
68
-
69
- this . #socket. write ( buffer , cb )
88
+ async #run ( ) {
89
+ this . #running = true
90
+ /** @type {SendQueueNode | null } */
91
+ let node = this . #head
92
+ while ( node !== null ) {
93
+ // wait pending promise
94
+ if ( ! node . resolved ) {
95
+ await node . promise
96
+ }
97
+ // write
98
+ this . #socket. write ( node . frame , node . callback )
99
+ // cleanup
100
+ node . callback = node . frame = node . promise = null
101
+ // set next
102
+ node = node . next
103
+ }
104
+ this . #head = null
105
+ this . #tail = null
106
+ this . #running = false
70
107
}
71
108
}
72
109
73
- function clone ( data , hint ) {
110
+ function createFrame ( data , hint ) {
111
+ return new WebsocketFrameSend ( toBuffer ( data , hint ) ) . createFrame ( hint === sendHints . string ? opcodes . TEXT : opcodes . BINARY )
112
+ }
113
+
114
+ function toBuffer ( data , hint ) {
74
115
switch ( hint ) {
75
116
case sendHints . string :
76
117
return Buffer . from ( data )
77
118
case sendHints . arrayBuffer :
78
119
case sendHints . blob :
79
120
return new FastBuffer ( data )
80
121
case sendHints . typedArray :
81
- return Buffer . copyBytesFrom ( data )
122
+ return new FastBuffer ( data . buffer , data . byteOffset , data . byteLength )
82
123
}
83
124
}
84
125
0 commit comments