From c8e77c3d3b55fe1b3c9017f9821ace1f360b990d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Calvi=C3=B1o=20S=C3=A1nchez?= Date: Thu, 20 Oct 2022 00:21:05 +0200 Subject: [PATCH] Split message receiver for internal and external signaling servers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Note that the thread used to handle and notify messages from the external signaling server does not change; the EventBus subscriber mode was "BACKGROUND", but as the message was posted from a WebSocket handler, which runs in a worker thread rather than in the main thread, the subscriber was executed in the same thread as the poster. Signed-off-by: Daniel Calviño Sánchez --- .../talk/activities/CallActivity.java | 22 +++++++++------- .../talk/webrtc/MagicWebSocketInstance.java | 26 +++++++++++++++---- 2 files changed, 33 insertions(+), 15 deletions(-) diff --git a/app/src/main/java/com/nextcloud/talk/activities/CallActivity.java b/app/src/main/java/com/nextcloud/talk/activities/CallActivity.java index baa99b302..24078e5d2 100644 --- a/app/src/main/java/com/nextcloud/talk/activities/CallActivity.java +++ b/app/src/main/java/com/nextcloud/talk/activities/CallActivity.java @@ -264,7 +264,8 @@ public class CallActivity extends CallBaseActivity { private SpotlightView spotlightView; - private CallActivitySignalingMessageReceiver signalingMessageReceiver = new CallActivitySignalingMessageReceiver(); + private InternalSignalingMessageReceiver internalSignalingMessageReceiver = new InternalSignalingMessageReceiver(); + private SignalingMessageReceiver signalingMessageReceiver; private Map callParticipantMessageListeners = new HashMap<>(); @@ -532,8 +533,6 @@ public class CallActivity extends CallBaseActivity { sdpConstraints.optional.add(new MediaConstraints.KeyValuePair("internalSctpDataChannels", "true")); sdpConstraints.optional.add(new MediaConstraints.KeyValuePair("DtlsSrtpKeyAgreement", "true")); - signalingMessageReceiver.addListener(offerMessageListener); - if (!isVoiceOnlyCall) { cameraInitialization(); } @@ -1350,6 +1349,8 @@ public class CallActivity extends CallBaseActivity { if (hasExternalSignalingServer) { setupAndInitiateWebSocketsConnection(); } else { + signalingMessageReceiver = internalSignalingMessageReceiver; + signalingMessageReceiver.addListener(offerMessageListener); joinRoomAndCall(); } } @@ -1548,6 +1549,10 @@ public class CallActivity extends CallBaseActivity { externalSignalingServer.getExternalSignalingServer(), conversationUser, externalSignalingServer.getExternalSignalingTicket(), TextUtils.isEmpty(credentials)); + // Although setupAndInitiateWebSocketsConnection could be called several times the web socket is + // initialized just once, so the message receiver is also initialized just once. + signalingMessageReceiver = webSocketClient.getSignalingMessageReceiver(); + signalingMessageReceiver.addListener(offerMessageListener); } else { if (webSocketClient.isConnected() && currentCallStatus == CallStatus.PUBLISHER_FAILED) { webSocketClient.restartWebSocket(); @@ -1622,11 +1627,6 @@ public class CallActivity extends CallBaseActivity { } break; - case "signalingMessage": - Log.d(TAG, "onMessageEvent 'signalingMessage'"); - signalingMessageReceiver.process((NCSignalingMessage) webSocketClient.getJobWithId( - Integer.valueOf(webSocketCommunicationEvent.getHashMap().get("jobId")))); - break; case "peerReadyForRequestingOffer": Log.d(TAG, "onMessageEvent 'peerReadyForRequestingOffer'"); webSocketClient.requestOfferForSessionIdWithType( @@ -1670,7 +1670,7 @@ public class CallActivity extends CallBaseActivity { } else if ("message".equals(messageType)) { NCSignalingMessage ncSignalingMessage = LoganSquare.parse(signaling.getMessageWrapper().toString(), NCSignalingMessage.class); - signalingMessageReceiver.process(ncSignalingMessage); + internalSignalingMessageReceiver.process(ncSignalingMessage); } else { Log.e(TAG, "unexpected message type when receiving signaling message"); } @@ -2636,8 +2636,10 @@ public class CallActivity extends CallBaseActivity { /** * Temporary implementation of SignalingMessageReceiver until signaling related code is extracted from CallActivity. + * + * All listeners are called in the main thread. */ - private static class CallActivitySignalingMessageReceiver extends SignalingMessageReceiver { + private static class InternalSignalingMessageReceiver extends SignalingMessageReceiver { public void process(NCSignalingMessage message) { processSignalingMessage(message); } diff --git a/app/src/main/java/com/nextcloud/talk/webrtc/MagicWebSocketInstance.java b/app/src/main/java/com/nextcloud/talk/webrtc/MagicWebSocketInstance.java index bd1a9d429..956a28346 100644 --- a/app/src/main/java/com/nextcloud/talk/webrtc/MagicWebSocketInstance.java +++ b/app/src/main/java/com/nextcloud/talk/webrtc/MagicWebSocketInstance.java @@ -40,6 +40,7 @@ import com.nextcloud.talk.models.json.websocket.ErrorOverallWebSocketMessage; import com.nextcloud.talk.models.json.websocket.EventOverallWebSocketMessage; import com.nextcloud.talk.models.json.websocket.HelloResponseOverallWebSocketMessage; import com.nextcloud.talk.models.json.websocket.JoinedRoomOverallWebSocketMessage; +import com.nextcloud.talk.signaling.SignalingMessageReceiver; import com.nextcloud.talk.utils.MagicMap; import com.nextcloud.talk.utils.bundle.BundleKeys; @@ -109,6 +110,8 @@ public class MagicWebSocketInstance extends WebSocketListener { private List messagesQueue = new ArrayList<>(); + private final ExternalSignalingMessageReceiver signalingMessageReceiver = new ExternalSignalingMessageReceiver(); + MagicWebSocketInstance(User conversationUser, String connectionUrl, String webSocketTicket) { NextcloudTalkApplication.Companion.getSharedApplication().getComponentApplication().inject(this); @@ -326,11 +329,7 @@ public class MagicWebSocketInstance extends WebSocketListener { ncSignalingMessage.setFrom(callOverallWebSocketMessage.getCallWebSocketMessage().getSenderWebSocketMessage().getSessionId()); } - if (!TextUtils.isEmpty(ncSignalingMessage.getFrom())) { - HashMap messageHashMap = new HashMap<>(); - messageHashMap.put(JOB_ID, Integer.toString(magicMap.add(ncSignalingMessage))); - eventBus.post(new WebSocketCommunicationEvent("signalingMessage", messageHashMap)); - } + signalingMessageReceiver.process(ncSignalingMessage); break; case "bye": connected = false; @@ -471,4 +470,21 @@ public class MagicWebSocketInstance extends WebSocketListener { restartWebSocket(); } } + + public SignalingMessageReceiver getSignalingMessageReceiver() { + return signalingMessageReceiver; + } + + /** + * Temporary implementation of SignalingMessageReceiver until signaling related code is extracted to a Signaling + * class. + * + * All listeners are called in the WebSocket reader thread. This thread should be the same as long as the + * WebSocket stays connected, but it may change whenever it is connected again. + */ + private static class ExternalSignalingMessageReceiver extends SignalingMessageReceiver { + public void process(NCSignalingMessage message) { + processSignalingMessage(message); + } + } }