@@ -8,6 +8,25 @@ const assert = require('assert');
8
8
const { once } = require ( 'events' ) ;
9
9
const { setTimeout } = require ( 'timers/promises' ) ;
10
10
11
+ function createDependentPromises ( n ) {
12
+ const promiseAndResolveArray = [ ] ;
13
+
14
+ for ( let i = 0 ; i < n ; i ++ ) {
15
+ let res ;
16
+ const promise = new Promise ( ( resolve ) => {
17
+ if ( i === 0 ) {
18
+ res = resolve ;
19
+ return ;
20
+ }
21
+ res = ( ) => promiseAndResolveArray [ i - 1 ] [ 0 ] . then ( resolve ) ;
22
+ } ) ;
23
+
24
+ promiseAndResolveArray . push ( [ promise , res ] ) ;
25
+ }
26
+
27
+ return promiseAndResolveArray ;
28
+ }
29
+
11
30
{
12
31
// Map works on synchronous streams with a synchronous mapper
13
32
const stream = Readable . from ( [ 1 , 2 , 3 , 4 , 5 ] ) . map ( ( x ) => x + x ) ;
@@ -143,7 +162,7 @@ const { setTimeout } = require('timers/promises');
143
162
const stream = range . map ( common . mustCall ( async ( _ , { signal } ) => {
144
163
await once ( signal , 'abort' ) ;
145
164
throw signal . reason ;
146
- } , 2 ) , { signal : ac . signal , concurrency : 2 } ) ;
165
+ } , 2 ) , { signal : ac . signal , concurrency : 2 , highWaterMark : 0 } ) ;
147
166
// pump
148
167
assert . rejects ( async ( ) => {
149
168
for await ( const item of stream ) {
@@ -173,12 +192,164 @@ const { setTimeout } = require('timers/promises');
173
192
} ) ( ) . then ( common . mustCall ( ) ) ;
174
193
}
175
194
195
+
196
+ {
197
+ // highWaterMark with small concurrency
198
+ const finishOrder = [ ] ;
199
+
200
+ const promises = createDependentPromises ( 4 ) ;
201
+
202
+ const raw = Readable . from ( [ 2 , 0 , 1 , 3 ] ) ;
203
+ const stream = raw . map ( async ( item ) => {
204
+ const [ promise , resolve ] = promises [ item ] ;
205
+ resolve ( ) ;
206
+
207
+ await promise ;
208
+ finishOrder . push ( item ) ;
209
+ return item ;
210
+ } , { concurrency : 2 } ) ;
211
+
212
+ ( async ( ) => {
213
+ await stream . toArray ( ) ;
214
+
215
+ assert . deepStrictEqual ( finishOrder , [ 0 , 1 , 2 , 3 ] ) ;
216
+ } ) ( ) . then ( common . mustCall ( ) , common . mustNotCall ( ) ) ;
217
+ }
218
+
219
+ {
220
+ // highWaterMark with a lot of items and large concurrency
221
+ const finishOrder = [ ] ;
222
+
223
+ const promises = createDependentPromises ( 20 ) ;
224
+
225
+ const input = [ 10 , 1 , 0 , 3 , 4 , 2 , 5 , 7 , 8 , 9 , 6 , 11 , 12 , 13 , 18 , 15 , 16 , 17 , 14 , 19 ] ;
226
+ const raw = Readable . from ( input ) ;
227
+ // Should be
228
+ // 10, 1, 0, 3, 4, 2 | next: 0
229
+ // 10, 1, 3, 4, 2, 5 | next: 1
230
+ // 10, 3, 4, 2, 5, 7 | next: 2
231
+ // 10, 3, 4, 5, 7, 8 | next: 3
232
+ // 10, 4, 5, 7, 8, 9 | next: 4
233
+ // 10, 5, 7, 8, 9, 6 | next: 5
234
+ // 10, 7, 8, 9, 6, 11 | next: 6
235
+ // 10, 7, 8, 9, 11, 12 | next: 7
236
+ // 10, 8, 9, 11, 12, 13 | next: 8
237
+ // 10, 9, 11, 12, 13, 18 | next: 9
238
+ // 10, 11, 12, 13, 18, 15 | next: 10
239
+ // 11, 12, 13, 18, 15, 16 | next: 11
240
+ // 12, 13, 18, 15, 16, 17 | next: 12
241
+ // 13, 18, 15, 16, 17, 14 | next: 13
242
+ // 18, 15, 16, 17, 14, 19 | next: 14
243
+ // 18, 15, 16, 17, 19 | next: 15
244
+ // 18, 16, 17, 19 | next: 16
245
+ // 18, 17, 19 | next: 17
246
+ // 18, 19 | next: 18
247
+ // 19 | next: 19
248
+ //
249
+
250
+ const stream = raw . map ( async ( item ) => {
251
+ const [ promise , resolve ] = promises [ item ] ;
252
+ resolve ( ) ;
253
+
254
+ await promise ;
255
+ finishOrder . push ( item ) ;
256
+ return item ;
257
+ } , { concurrency : 6 } ) ;
258
+
259
+ ( async ( ) => {
260
+ const outputOrder = await stream . toArray ( ) ;
261
+
262
+ assert . deepStrictEqual ( outputOrder , input ) ;
263
+ assert . deepStrictEqual ( finishOrder , [ 0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10 , 11 , 12 , 13 , 14 , 15 , 16 , 17 , 18 , 19 ] ) ;
264
+ } ) ( ) . then ( common . mustCall ( ) , common . mustNotCall ( ) ) ;
265
+ }
266
+
267
+ {
268
+ // Custom highWaterMark with a lot of items and large concurrency
269
+ const finishOrder = [ ] ;
270
+
271
+ const promises = createDependentPromises ( 20 ) ;
272
+
273
+ const input = [ 11 , 1 , 0 , 3 , 4 , 2 , 5 , 7 , 8 , 9 , 6 , 10 , 12 , 13 , 18 , 15 , 16 , 17 , 14 , 19 ] ;
274
+ const raw = Readable . from ( input ) ;
275
+ // Should be
276
+ // 11, 1, 0, 3, 4 | next: 0, buffer: []
277
+ // 11, 1, 3, 4, 2 | next: 1, buffer: [0]
278
+ // 11, 3, 4, 2, 5 | next: 2, buffer: [0, 1]
279
+ // 11, 3, 4, 5, 7 | next: 3, buffer: [0, 1, 2]
280
+ // 11, 4, 5, 7, 8 | next: 4, buffer: [0, 1, 2, 3]
281
+ // 11, 5, 7, 8, 9 | next: 5, buffer: [0, 1, 2, 3, 4]
282
+ // 11, 7, 8, 9, 6 | next: 6, buffer: [0, 1, 2, 3, 4, 5]
283
+ // 11, 7, 8, 9, 10 | next: 7, buffer: [0, 1, 2, 3, 4, 5, 6] -- buffer full
284
+ // 11, 8, 9, 10, 12 | next: 8, buffer: [0, 1, 2, 3, 4, 5, 6]
285
+ // 11, 9, 10, 12, 13 | next: 9, buffer: [0, 1, 2, 3, 4, 5, 6]
286
+ // 11, 10, 12, 13, 18 | next: 10, buffer: [0, 1, 2, 3, 4, 5, 6]
287
+ // 11, 12, 13, 18, 15 | next: 11, buffer: [0, 1, 2, 3, 4, 5, 6]
288
+ // 12, 13, 18, 15, 16 | next: 12, buffer: [] -- all items flushed as 11 is consumed and all the items wait for it
289
+ // 13, 18, 15, 16, 17 | next: 13, buffer: []
290
+ // 18, 15, 16, 17, 14 | next: 14, buffer: []
291
+ // 18, 15, 16, 17, 19 | next: 15, buffer: [14]
292
+ // 18, 16, 17, 19 | next: 16, buffer: [14, 15]
293
+ // 18, 17, 19 | next: 17, buffer: [14, 15, 16]
294
+ // 18, 19 | next: 18, buffer: [14, 15, 16, 17]
295
+ // 19 | next: 19, buffer: [] -- all items flushed
296
+ //
297
+
298
+ const stream = raw . map ( async ( item ) => {
299
+ const [ promise , resolve ] = promises [ item ] ;
300
+ resolve ( ) ;
301
+
302
+ await promise ;
303
+ finishOrder . push ( item ) ;
304
+ return item ;
305
+ } , { concurrency : 5 , highWaterMark : 7 } ) ;
306
+
307
+ ( async ( ) => {
308
+ const outputOrder = await stream . toArray ( ) ;
309
+
310
+ assert . deepStrictEqual ( outputOrder , input ) ;
311
+ assert . deepStrictEqual ( finishOrder , [ 0 , 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10 , 11 , 12 , 13 , 14 , 15 , 16 , 17 , 18 , 19 ] ) ;
312
+ } ) ( ) . then ( common . mustCall ( ) , common . mustNotCall ( ) ) ;
313
+ }
314
+
315
+ {
316
+ // Where there is a delay between the first and the next item it should not wait for filled queue
317
+ // before yielding to the user
318
+ const promises = createDependentPromises ( 3 ) ;
319
+
320
+ const raw = Readable . from ( [ 0 , 1 , 2 ] ) ;
321
+
322
+ const stream = raw
323
+ . map ( async ( item ) => {
324
+ if ( item !== 0 ) {
325
+ await promises [ item ] [ 0 ] ;
326
+ }
327
+
328
+ return item ;
329
+ } , { concurrency : 2 } )
330
+ . map ( ( item ) => {
331
+ // eslint-disable-next-line no-unused-vars
332
+ for ( const [ _ , resolve ] of promises ) {
333
+ resolve ( ) ;
334
+ }
335
+
336
+ return item ;
337
+ } ) ;
338
+
339
+ ( async ( ) => {
340
+ await stream . toArray ( ) ;
341
+ } ) ( ) . then ( common . mustCall ( ) , common . mustNotCall ( ) ) ;
342
+ }
343
+
176
344
{
177
345
// Error cases
178
346
assert . throws ( ( ) => Readable . from ( [ 1 ] ) . map ( 1 ) , / E R R _ I N V A L I D _ A R G _ T Y P E / ) ;
179
347
assert . throws ( ( ) => Readable . from ( [ 1 ] ) . map ( ( x ) => x , {
180
348
concurrency : 'Foo'
181
349
} ) , / E R R _ O U T _ O F _ R A N G E / ) ;
350
+ assert . throws ( ( ) => Readable . from ( [ 1 ] ) . map ( ( x ) => x , {
351
+ concurrency : - 1
352
+ } ) , / E R R _ O U T _ O F _ R A N G E / ) ;
182
353
assert . throws ( ( ) => Readable . from ( [ 1 ] ) . map ( ( x ) => x , 1 ) , / E R R _ I N V A L I D _ A R G _ T Y P E / ) ;
183
354
assert . throws ( ( ) => Readable . from ( [ 1 ] ) . map ( ( x ) => x , { signal : true } ) , / E R R _ I N V A L I D _ A R G _ T Y P E / ) ;
184
355
}
0 commit comments