@@ -217,6 +217,8 @@ @implementation RCTSRWebSocket
217
217
int _closeCode;
218
218
219
219
BOOL _isPumping;
220
+
221
+ BOOL _cleanupScheduled;
220
222
221
223
NSMutableSet <NSArray *> *_scheduledRunloops;
222
224
@@ -324,17 +326,11 @@ - (void)dealloc
324
326
325
327
[_inputStream close ];
326
328
[_outputStream close ];
327
-
328
- _workQueue = NULL ;
329
-
329
+
330
330
if (_receivedHTTPHeaders) {
331
331
CFRelease (_receivedHTTPHeaders);
332
332
_receivedHTTPHeaders = NULL ;
333
333
}
334
-
335
- if (_delegateDispatchQueue) {
336
- _delegateDispatchQueue = NULL ;
337
- }
338
334
}
339
335
340
336
#ifndef NDEBUG
@@ -626,11 +622,11 @@ - (void)_failWithError:(NSError *)error;
626
622
}];
627
623
628
624
self.readyState = RCTSR_CLOSED;
629
- self->_selfRetain = nil ;
630
-
625
+
631
626
RCTSRLog (@" Failing with error %@ " , error.localizedDescription );
632
627
633
628
[self _disconnect ];
629
+ [self _scheduleCleanup ];
634
630
}
635
631
});
636
632
}
@@ -1036,12 +1032,7 @@ - (void)_pumpWriting;
1036
1032
!_sentClose) {
1037
1033
_sentClose = YES ;
1038
1034
1039
- [_outputStream close ];
1040
- [_inputStream close ];
1041
-
1042
- for (NSArray *runLoop in [_scheduledRunloops copy ]) {
1043
- [self unscheduleFromRunLoop: runLoop[0 ] forMode: runLoop[1 ]];
1044
- }
1035
+ [self _scheduleCleanup ];
1045
1036
1046
1037
if (!_failed) {
1047
1038
[self _performDelegateBlock: ^{
@@ -1050,8 +1041,6 @@ - (void)_pumpWriting;
1050
1041
}
1051
1042
}];
1052
1043
}
1053
-
1054
- _selfRetain = nil ;
1055
1044
}
1056
1045
}
1057
1046
@@ -1345,94 +1334,142 @@ - (void)stream:(NSStream *)aStream handleEvent:(NSStreamEvent)eventCode;
1345
1334
}
1346
1335
}
1347
1336
}
1348
-
1349
- assert (_workQueue != NULL );
1337
+
1338
+ // _workQueue cannot be NULL
1339
+ if (!_workQueue) {
1340
+ return ;
1341
+ }
1342
+ __weak typeof (self) weakSelf = self;
1350
1343
dispatch_async (_workQueue, ^{
1351
- switch (eventCode) {
1352
- case NSStreamEventOpenCompleted: {
1353
- RCTSRLog (@" NSStreamEventOpenCompleted %@ " , aStream);
1354
- if (self.readyState >= RCTSR_CLOSING) {
1355
- return ;
1356
- }
1357
- assert (self->_readBuffer );
1358
-
1359
- if (self.readyState == RCTSR_CONNECTING && aStream == self->_inputStream ) {
1360
- [self didConnect ];
1361
- }
1362
- [self _pumpWriting ];
1363
- [self _pumpScanner ];
1364
- break ;
1365
- }
1366
-
1367
- case NSStreamEventErrorOccurred: {
1368
- RCTSRLog (@" NSStreamEventErrorOccurred %@ %@ " , aStream, [aStream.streamError copy ]);
1369
- // TODO: specify error better!
1370
- [self _failWithError: aStream.streamError];
1371
- self->_readBufferOffset = 0 ;
1372
- self->_readBuffer .length = 0 ;
1373
- break ;
1344
+ typeof (self) strongSelf = weakSelf;
1345
+ if (!strongSelf) {
1346
+ return ;
1347
+ }
1348
+ [strongSelf safeHandleEvent: eventCode stream: aStream];
1349
+ });
1350
+ }
1374
1351
1352
+ - (void )safeHandleEvent : (NSStreamEvent )eventCode stream : (NSStream *)aStream
1353
+ {
1354
+ switch (eventCode) {
1355
+ case NSStreamEventOpenCompleted: {
1356
+ RCTSRLog (@" NSStreamEventOpenCompleted %@ " , aStream);
1357
+ if (self.readyState >= RCTSR_CLOSING) {
1358
+ return ;
1375
1359
}
1376
-
1377
- case NSStreamEventEndEncountered: {
1378
- [self _pumpScanner ];
1379
- RCTSRLog (@" NSStreamEventEndEncountered %@ " , aStream);
1380
- if (aStream.streamError ) {
1381
- [self _failWithError: aStream.streamError];
1382
- } else {
1383
- dispatch_async (self->_workQueue , ^{
1384
- if (self.readyState != RCTSR_CLOSED) {
1385
- self.readyState = RCTSR_CLOSED;
1386
- self->_selfRetain = nil ;
1387
- }
1388
-
1389
- if (!self->_sentClose && !self->_failed ) {
1390
- self->_sentClose = YES ;
1391
- // If we get closed in this state it's probably not clean because we should be sending this when we send messages
1392
- [self _performDelegateBlock: ^{
1393
- if ([self .delegate respondsToSelector: @selector (webSocket:didCloseWithCode:reason:wasClean: )]) {
1394
- [self .delegate webSocket: self didCloseWithCode: RCTSRStatusCodeGoingAway reason: @" Stream end encountered" wasClean: NO ];
1395
- }
1396
- }];
1397
- }
1398
- });
1399
- }
1400
-
1401
- break ;
1360
+ assert (self->_readBuffer );
1361
+
1362
+ if (self.readyState == RCTSR_CONNECTING && aStream == self->_inputStream ) {
1363
+ [self didConnect ];
1402
1364
}
1403
-
1404
- case NSStreamEventHasBytesAvailable: {
1405
- RCTSRLog (@" NSStreamEventHasBytesAvailable %@ " , aStream);
1406
- const int bufferSize = 2048 ;
1407
- uint8_t buffer[bufferSize];
1408
-
1409
- while (self->_inputStream .hasBytesAvailable ) {
1410
- NSInteger bytes_read = [self ->_inputStream read :buffer maxLength: bufferSize];
1411
-
1412
- if (bytes_read > 0 ) {
1413
- [self ->_readBuffer appendBytes: buffer length: bytes_read];
1414
- } else if (bytes_read < 0 ) {
1415
- [self _failWithError: self ->_inputStream.streamError];
1365
+ [self _pumpWriting ];
1366
+ [self _pumpScanner ];
1367
+ break ;
1368
+ }
1369
+
1370
+ case NSStreamEventErrorOccurred: {
1371
+ RCTSRLog (@" NSStreamEventErrorOccurred %@ %@ " , aStream, [aStream.streamError copy ]);
1372
+ // TODO: specify error better!
1373
+ [self _failWithError: aStream.streamError];
1374
+ self->_readBufferOffset = 0 ;
1375
+ self->_readBuffer .length = 0 ;
1376
+ break ;
1377
+
1378
+ }
1379
+
1380
+ case NSStreamEventEndEncountered: {
1381
+ [self _pumpScanner ];
1382
+ RCTSRLog (@" NSStreamEventEndEncountered %@ " , aStream);
1383
+ if (aStream.streamError ) {
1384
+ [self _failWithError: aStream.streamError];
1385
+ } else {
1386
+ dispatch_async (self->_workQueue , ^{
1387
+ if (self.readyState != RCTSR_CLOSED) {
1388
+ self.readyState = RCTSR_CLOSED;
1389
+ [self _scheduleCleanup ];
1416
1390
}
1417
-
1418
- if (bytes_read != bufferSize) {
1419
- break ;
1391
+
1392
+ if (!self->_sentClose && !self->_failed ) {
1393
+ self->_sentClose = YES ;
1394
+ // If we get closed in this state it's probably not clean because we should be sending this when we send messages
1395
+ [self _performDelegateBlock: ^{
1396
+ if ([self .delegate respondsToSelector: @selector (webSocket:didCloseWithCode:reason:wasClean: )]) {
1397
+ [self .delegate webSocket: self didCloseWithCode: RCTSRStatusCodeGoingAway reason: @" Stream end encountered" wasClean: NO ];
1398
+ }
1399
+ }];
1420
1400
}
1421
- };
1422
- [self _pumpScanner ];
1423
- break ;
1401
+ });
1424
1402
}
1403
+
1404
+ break ;
1405
+ }
1406
+
1407
+ case NSStreamEventHasBytesAvailable: {
1408
+ RCTSRLog (@" NSStreamEventHasBytesAvailable %@ " , aStream);
1409
+ const int bufferSize = 2048 ;
1410
+ uint8_t buffer[bufferSize];
1411
+
1412
+ while (self->_inputStream .hasBytesAvailable ) {
1413
+ NSInteger bytes_read = [self ->_inputStream read :buffer maxLength: bufferSize];
1414
+
1415
+ if (bytes_read > 0 ) {
1416
+ [self ->_readBuffer appendBytes: buffer length: bytes_read];
1417
+ } else if (bytes_read < 0 ) {
1418
+ [self _failWithError: self ->_inputStream.streamError];
1419
+ }
1420
+
1421
+ if (bytes_read != bufferSize) {
1422
+ break ;
1423
+ }
1424
+ };
1425
+ [self _pumpScanner ];
1426
+ break ;
1427
+ }
1428
+
1429
+ case NSStreamEventHasSpaceAvailable: {
1430
+ RCTSRLog (@" NSStreamEventHasSpaceAvailable %@ " , aStream);
1431
+ [self _pumpWriting ];
1432
+ break ;
1433
+ }
1434
+
1435
+ default :
1436
+ RCTSRLog (@" (default) %@ " , aStream);
1437
+ break ;
1438
+ }
1439
+ }
1425
1440
1426
- case NSStreamEventHasSpaceAvailable: {
1427
- RCTSRLog (@" NSStreamEventHasSpaceAvailable %@ " , aStream);
1428
- [self _pumpWriting ];
1429
- break ;
1430
- }
1441
+ - (void )_scheduleCleanup
1442
+ {
1443
+ if (_cleanupScheduled) {
1444
+ return ;
1445
+ }
1446
+
1447
+ _cleanupScheduled = YES ;
1448
+
1449
+ // Cleanup NSStream's delegate in the same RunLoop used by the streams themselves:
1450
+ // This way we'll prevent race conditions between handleEvent and SRWebsocket's dealloc
1451
+ NSTimer *timer = [NSTimer timerWithTimeInterval: (0 .0f ) target: self selector: @selector (_cleanupSelfReference: ) userInfo: nil repeats: NO ];
1452
+ [[NSRunLoop RCTSR_networkRunLoop ] addTimer: timer forMode: NSDefaultRunLoopMode ];
1453
+ }
1431
1454
1432
- default :
1433
- RCTSRLog (@" (default) %@ " , aStream);
1434
- break ;
1435
- }
1455
+ - (void )_cleanupSelfReference : (NSTimer *)timer
1456
+ {
1457
+ // Remove the streams, right now, from the networkRunLoop
1458
+ [_inputStream close ];
1459
+ [_outputStream close ];
1460
+
1461
+ // Unschedule from RunLoop
1462
+ for (NSArray *runLoop in [_scheduledRunloops copy ]) {
1463
+ [self unscheduleFromRunLoop: runLoop[0 ] forMode: runLoop[1 ]];
1464
+ }
1465
+
1466
+ // Nuke NSStream's delegate
1467
+ _inputStream.delegate = nil ;
1468
+ _outputStream.delegate = nil ;
1469
+
1470
+ // Cleanup selfRetain in the same GCD queue as usual
1471
+ dispatch_async (_workQueue, ^{
1472
+ self->_selfRetain = nil ;
1436
1473
});
1437
1474
}
1438
1475
0 commit comments