@@ -172,6 +172,13 @@ public final class JSONRPCConnection {
172
172
}
173
173
return ready
174
174
}
175
+
176
+ /// *Public for testing*
177
+ public func _send( _ message: JSONRPCMessage , async : Bool = true ) {
178
+ send ( async : async ) { encoder in
179
+ try encoder. encode ( message)
180
+ }
181
+ }
175
182
176
183
/// Parse and handle all messages in `bytes`, returning a slice containing any remaining incomplete data.
177
184
func parseAndHandleMessages( from bytes: UnsafeBufferPointer < UInt8 > ) -> UnsafeBufferPointer < UInt8 > . SubSequence {
@@ -209,9 +216,7 @@ public final class JSONRPCConnection {
209
216
switch error. messageKind {
210
217
case . request:
211
218
if let id = error. id {
212
- send { encoder in
213
- try encoder. encode ( JSONRPCMessage . errorResponse ( ResponseError ( error) , id: id) )
214
- }
219
+ _send ( . errorResponse( ResponseError ( error) , id: id) )
215
220
continue MESSAGE_LOOP
216
221
}
217
222
case . response:
@@ -229,17 +234,19 @@ public final class JSONRPCConnection {
229
234
continue MESSAGE_LOOP
230
235
}
231
236
case . unknown:
232
- log ( " error decoding message: \( error. message) " , level: . error)
237
+ _send ( . errorResponse( ResponseError ( error) , id: nil ) ,
238
+ async : false ) // synchronous because the following fatalError
233
239
break
234
240
}
235
241
// FIXME: graceful shutdown?
236
- Logger . shared. flush ( )
237
242
fatalError ( " fatal error encountered decoding message \( error) " )
238
243
239
244
} catch {
240
- log ( " error decoding message: \( error. localizedDescription) " , level: . error)
245
+ let responseError = ResponseError ( code: . parseError,
246
+ message: " Failed to decode message. \( error. localizedDescription) " )
247
+ _send ( . errorResponse( responseError, id: nil ) ,
248
+ async : false ) // synchronous because the following fatalError
241
249
// FIXME: graceful shutdown?
242
- Logger . shared. flush ( )
243
250
fatalError ( " fatal error encountered decoding message \( error) " )
244
251
}
245
252
}
@@ -265,16 +272,21 @@ public final class JSONRPCConnection {
265
272
}
266
273
outstanding. replyHandler ( . success( response) )
267
274
case . errorResponse( let error, id: let id) :
275
+ guard let id = id else {
276
+ log ( " Received error response for unknown request: \( error. message) " , level: . error)
277
+ return
278
+ }
268
279
guard let outstanding = outstandingRequests. removeValue ( forKey: id) else {
269
- log ( " Unknown request for \( id) " , level: . error)
280
+ log ( " No outstanding requests for request ID \( id) " , level: . error)
270
281
return
271
282
}
272
283
outstanding. replyHandler ( . failure( error) )
273
284
}
274
285
}
275
286
276
287
/// *Public for testing*.
277
- public func send( _rawData dispatchData: DispatchData ) {
288
+ public func send( _rawData dispatchData: DispatchData ,
289
+ handleCompletion: ( ( ) -> Void ) ? = nil ) {
278
290
guard readyToSend ( ) else { return }
279
291
280
292
sendIO. write ( offset: 0 , data: dispatchData, queue: sendQueue) { [ weak self] done, _, errorCode in
@@ -283,13 +295,16 @@ public final class JSONRPCConnection {
283
295
if done {
284
296
self ? . queue. async {
285
297
self ? . _close ( )
298
+ handleCompletion ? ( )
286
299
}
287
300
}
301
+ } else if done {
302
+ handleCompletion ? ( )
288
303
}
289
304
}
290
305
}
291
306
292
- func send( messageData: Data ) {
307
+ func send( messageData: Data , handleCompletion : ( ( ) -> Void ) ? = nil ) {
293
308
294
309
var dispatchData = DispatchData . empty
295
310
let header = " Content-Length: \( messageData. count) \r \n \r \n "
@@ -300,10 +315,22 @@ public final class JSONRPCConnection {
300
315
dispatchData. append ( rawBufferPointer)
301
316
}
302
317
303
- send ( _rawData: dispatchData)
318
+ send ( _rawData: dispatchData, handleCompletion : handleCompletion )
304
319
}
305
320
306
- func send( encoding: ( JSONEncoder ) throws -> Data ) {
321
+ private func sendMessageSynchronously( _ messageData: Data ,
322
+ timeoutInSeconds seconds: Int ) {
323
+ let synchronizationSemaphore = DispatchSemaphore ( value: 0 )
324
+
325
+ send ( messageData: messageData) {
326
+ synchronizationSemaphore. signal ( )
327
+ }
328
+
329
+ // blocks until timeout expires or message sending completes
330
+ _ = synchronizationSemaphore. wait ( timeout: . now( ) + . seconds( seconds) )
331
+ }
332
+
333
+ func send( async : Bool = true , encoding: ( JSONEncoder ) throws -> Data ) {
307
334
guard readyToSend ( ) else { return }
308
335
309
336
let encoder = JSONEncoder ( )
@@ -317,7 +344,11 @@ public final class JSONRPCConnection {
317
344
fatalError ( " unexpected error while encoding response: \( error) " )
318
345
}
319
346
320
- send ( messageData: data)
347
+ if async {
348
+ send ( messageData: data)
349
+ } else {
350
+ sendMessageSynchronously ( data, timeoutInSeconds: 3 )
351
+ }
321
352
}
322
353
323
354
/// Close the connection.
0 commit comments