Use a dispatch queue for websocket operations

Summary:
This diff ensures that all operations on the socket are put into a serial background queue, including delegate callbacks.

All operations are executed asynchronously except disconnect, which is made synchronous as to guarantee no resources are accessed after the call.

Reviewed By: fabiomassimo

Differential Revision: D35254499

fbshipit-source-id: 33d93926f7bfc8948095c59f12ca31f0a932b8ae
This commit is contained in:
Lorenzo Blasa
2022-04-12 02:30:02 -07:00
committed by Facebook GitHub Bot
parent 2bafe32f2a
commit f1fe66afd9
3 changed files with 141 additions and 72 deletions

View File

@@ -40,11 +40,12 @@ NS_ASSUME_NONNULL_BEGIN
/// Send a message to the endpoint. /// Send a message to the endpoint.
/// @param message The message as text to be sent to the endpoint. /// @param message The message as text to be sent to the endpoint.
/// @param error A pointer to variable for an `NSError` object. /// @param completionHandler A completion handler for the send operation.
/// If an error occurs, the pointer is set to an `NSError` object containing /// If an error occurs, the handler will be called with an `NSError` object
/// information about the error. You may specify `nil` to ignore the error /// containing information about the error. You may specify `nil` to ignore the
/// information. /// error information.
- (void)send:(NSString*)message error:(NSError**)error; - (void)send:(NSString*)message
withCompletionHandler:(void (^_Nullable)(NSError*))completionHandler;
@end @end

View File

@@ -113,6 +113,8 @@ static constexpr int connectionKeepaliveSeconds = 10;
#pragma mark - FlipperPlatformWebSocket #pragma mark - FlipperPlatformWebSocket
@interface FlipperPlatformWebSocket ()<SRWebSocketDelegate> { @interface FlipperPlatformWebSocket ()<SRWebSocketDelegate> {
NSOperationQueue* _dispatchQueue;
NSURL* _url; NSURL* _url;
NSTimer* _keepAlive; NSTimer* _keepAlive;
@@ -130,6 +132,8 @@ static constexpr int connectionKeepaliveSeconds = 10;
if (self) { if (self) {
_url = url; _url = url;
_policy = [FlipperClientCertificateSecurityPolicy new]; _policy = [FlipperClientCertificateSecurityPolicy new];
_dispatchQueue = [[NSOperationQueue alloc] init];
_dispatchQueue.maxConcurrentOperationCount = 1;
} }
return self; return self;
@@ -140,6 +144,10 @@ static constexpr int connectionKeepaliveSeconds = 10;
return; return;
} }
__weak auto weakSelf = self;
[_dispatchQueue addOperationWithBlock:^{
__strong auto strongSelf = weakSelf;
// Before attempting to establish a connection, check if // Before attempting to establish a connection, check if
// there is a process listening at the specified port. // there is a process listening at the specified port.
// CFNetwork seems to be quite verbose when the host cannot be reached // CFNetwork seems to be quite verbose when the host cannot be reached
@@ -152,7 +160,10 @@ static constexpr int connectionKeepaliveSeconds = 10;
hints.ai_flags = AI_PASSIVE; hints.ai_flags = AI_PASSIVE;
struct addrinfo* address; struct addrinfo* address;
getaddrinfo( getaddrinfo(
_url.host.UTF8String, _url.port.stringValue.UTF8String, &hints, &address); strongSelf->_url.host.UTF8String,
strongSelf->_url.port.stringValue.UTF8String,
&hints,
&address);
int sfd = int sfd =
socket(address->ai_family, address->ai_socktype, address->ai_protocol); socket(address->ai_family, address->ai_socktype, address->ai_protocol);
@@ -187,39 +198,50 @@ static constexpr int connectionKeepaliveSeconds = 10;
close(sfd); close(sfd);
if (!listening) { if (!listening) {
_eventHandler(facebook::flipper::SocketEvent::ERROR); strongSelf->_eventHandler(facebook::flipper::SocketEvent::ERROR);
return; return;
} }
self.socket = [[SRWebSocket alloc] initWithURL:_url securityPolicy:_policy]; strongSelf->_socket = [[SRWebSocket alloc] initWithURL:self->_url
[_socket setDelegate:self]; securityPolicy:self->_policy];
[_socket open]; strongSelf->_socket.delegate = self;
[strongSelf->_socket open];
}];
} }
- (void)disconnect { - (void)disconnect {
[_dispatchQueue cancelAllOperations];
if ([_keepAlive isValid]) { if ([_keepAlive isValid]) {
[_keepAlive invalidate]; [_keepAlive invalidate];
} }
_keepAlive = nil; _keepAlive = nil;
if (_socket) {
// Manually trigger a 'close' event as SocketRocket close method will // Manually trigger a 'close' event as SocketRocket close method will
// not notify the delegate. SocketRocket only triggers the close event // not notify the delegate. SocketRocket only triggers the close event
// when the connection is closed from the server. // when the connection is closed from the server.
_eventHandler(facebook::flipper::SocketEvent::CLOSE); _eventHandler(facebook::flipper::SocketEvent::CLOSE);
if (_socket) {
// Clear the socket delegate before close. Ensures that we won't get // Clear the socket delegate before close. Ensures that we won't get
// any messages after the disconnect takes place. // any messages after the disconnect takes place.
_socket.delegate = nil; _socket.delegate = nil;
[_socket close]; [_socket close];
_socket = nil; _socket = nil;
} };
} }
- (void)send:(NSString*)message error:(NSError**)error { - (void)send:(NSString*)message
[_socket sendString:message error:error]; withCompletionHandler:(void (^_Nullable)(NSError*))completionHandler {
if (error != nil && *error) { __weak auto weakSelf = self;
facebook::flipper::log("Unable to send message."); [_dispatchQueue addOperationWithBlock:^{
__strong auto strongSelf = weakSelf;
NSError* error = nil;
[strongSelf->_socket sendString:message error:&error];
if (completionHandler) {
completionHandler(error);
} }
}];
} }
- (void)setCertificateProvider: - (void)setCertificateProvider:
@@ -229,10 +251,15 @@ static constexpr int connectionKeepaliveSeconds = 10;
} }
- (void)sendScheduledKeepAlive:(NSTimer*)timer { - (void)sendScheduledKeepAlive:(NSTimer*)timer {
[_socket sendPing:nil error:nil]; __weak auto weakSelf = self;
[_dispatchQueue addOperationWithBlock:^{
__strong auto strongSelf = weakSelf;
[strongSelf->_socket sendPing:nil error:nil];
}];
} }
- (void)webSocketDidOpen:(SRWebSocket*)webSocket { #pragma mark - Web Socket internal handlers
- (void)_webSocketDidOpen {
_eventHandler(facebook::flipper::SocketEvent::OPEN); _eventHandler(facebook::flipper::SocketEvent::OPEN);
if (!_keepAlive) { if (!_keepAlive) {
@@ -247,7 +274,7 @@ static constexpr int connectionKeepaliveSeconds = 10;
} }
} }
- (void)webSocket:(SRWebSocket*)webSocket didFailWithError:(NSError*)error { - (void)_webSocketDidFailWithError:(NSError*)error {
/** Check for the error domain and code. Need to filter out SSL handshake /** Check for the error domain and code. Need to filter out SSL handshake
errors and dispatch them accordingly. CFNetwork SSLHandshake failed: errors and dispatch them accordingly. CFNetwork SSLHandshake failed:
- Domain: NSOSStatusErrorDomain - Domain: NSOSStatusErrorDomain
@@ -261,21 +288,59 @@ static constexpr int connectionKeepaliveSeconds = 10;
_socket = nil; _socket = nil;
} }
- (void)webSocket:(SRWebSocket*)webSocket - (void)_webSocketDidClose {
didCloseWithCode:(NSInteger)code if ([_keepAlive isValid]) {
reason:(NSString*)reason [_keepAlive invalidate];
wasClean:(BOOL)wasClean { }
_keepAlive = nil;
_eventHandler(facebook::flipper::SocketEvent::CLOSE); _eventHandler(facebook::flipper::SocketEvent::CLOSE);
_socket = nil; _socket = nil;
} }
- (void)webSocket:(SRWebSocket*)webSocket didReceiveMessage:(id)message { - (void)_webSocketDidReceiveMessage:(id)message {
if (message && _messageHandler) { if (message && _messageHandler) {
NSString* response = message; NSString* response = message;
_messageHandler([response UTF8String]); _messageHandler([response UTF8String]);
} }
} }
#pragma mark - Web Socket
- (void)webSocketDidOpen:(SRWebSocket*)webSocket {
__weak auto weakSelf = self;
[_dispatchQueue addOperationWithBlock:^{
__strong auto strongSelf = weakSelf;
[strongSelf _webSocketDidOpen];
}];
}
- (void)webSocket:(SRWebSocket*)webSocket didFailWithError:(NSError*)error {
__weak auto weakSelf = self;
[_dispatchQueue addOperationWithBlock:^{
__strong auto strongSelf = weakSelf;
[strongSelf _webSocketDidFailWithError:error];
}];
}
- (void)webSocket:(SRWebSocket*)webSocket
didCloseWithCode:(NSInteger)code
reason:(NSString*)reason
wasClean:(BOOL)wasClean {
__weak auto weakSelf = self;
[_dispatchQueue addOperationWithBlock:^{
__strong auto strongSelf = weakSelf;
[strongSelf _webSocketDidClose];
}];
}
- (void)webSocket:(SRWebSocket*)webSocket didReceiveMessage:(id)message {
__weak auto weakSelf = self;
[_dispatchQueue addOperationWithBlock:^{
__strong auto strongSelf = weakSelf;
[strongSelf _webSocketDidReceiveMessage:message];
}];
}
@end @end
#endif #endif

View File

@@ -158,8 +158,10 @@ void FlipperWebSocket::send(
return; return;
} }
NSString* messageObjc = [NSString stringWithUTF8String:message.c_str()]; NSString* messageObjc = [NSString stringWithUTF8String:message.c_str()];
[socket_ send:messageObjc error:NULL]; [socket_ send:messageObjc
withCompletionHandler:^(NSError*) {
completion(); completion();
}];
} }
/** /**
@@ -178,12 +180,13 @@ void FlipperWebSocket::sendExpectResponse(
[socket_ setMessageHandler:^(const std::string& msg) { [socket_ setMessageHandler:^(const std::string& msg) {
completion(msg, false); completion(msg, false);
}]; }];
NSError* error = NULL;
[socket_ send:messageObjc error:&error];
[socket_ send:messageObjc
withCompletionHandler:^(NSError* error) {
if (error != NULL) { if (error != NULL) {
completion(error.description.UTF8String, true); completion(error.description.UTF8String, true);
} }
}];
} }
} // namespace flipper } // namespace flipper