Split message receiver for internal and external signaling servers

Note that the thread used to handle and notify messages from the
external signaling server does not change; the EventBus subscriber mode
was "BACKGROUND", but as the message was posted from a WebSocket
handler, which runs in a worker thread rather than in the main thread,
the subscriber was executed in the same thread as the poster.

Signed-off-by: Daniel Calviño Sánchez <danxuliu@gmail.com>
This commit is contained in:
Daniel Calviño Sánchez 2022-10-20 00:21:05 +02:00
parent 9df56dccda
commit c8e77c3d3b
2 changed files with 33 additions and 15 deletions

View File

@ -264,7 +264,8 @@ public class CallActivity extends CallBaseActivity {
private SpotlightView spotlightView; private SpotlightView spotlightView;
private CallActivitySignalingMessageReceiver signalingMessageReceiver = new CallActivitySignalingMessageReceiver(); private InternalSignalingMessageReceiver internalSignalingMessageReceiver = new InternalSignalingMessageReceiver();
private SignalingMessageReceiver signalingMessageReceiver;
private Map<String, SignalingMessageReceiver.CallParticipantMessageListener> callParticipantMessageListeners = private Map<String, SignalingMessageReceiver.CallParticipantMessageListener> callParticipantMessageListeners =
new HashMap<>(); new HashMap<>();
@ -532,8 +533,6 @@ public class CallActivity extends CallBaseActivity {
sdpConstraints.optional.add(new MediaConstraints.KeyValuePair("internalSctpDataChannels", "true")); sdpConstraints.optional.add(new MediaConstraints.KeyValuePair("internalSctpDataChannels", "true"));
sdpConstraints.optional.add(new MediaConstraints.KeyValuePair("DtlsSrtpKeyAgreement", "true")); sdpConstraints.optional.add(new MediaConstraints.KeyValuePair("DtlsSrtpKeyAgreement", "true"));
signalingMessageReceiver.addListener(offerMessageListener);
if (!isVoiceOnlyCall) { if (!isVoiceOnlyCall) {
cameraInitialization(); cameraInitialization();
} }
@ -1350,6 +1349,8 @@ public class CallActivity extends CallBaseActivity {
if (hasExternalSignalingServer) { if (hasExternalSignalingServer) {
setupAndInitiateWebSocketsConnection(); setupAndInitiateWebSocketsConnection();
} else { } else {
signalingMessageReceiver = internalSignalingMessageReceiver;
signalingMessageReceiver.addListener(offerMessageListener);
joinRoomAndCall(); joinRoomAndCall();
} }
} }
@ -1548,6 +1549,10 @@ public class CallActivity extends CallBaseActivity {
externalSignalingServer.getExternalSignalingServer(), externalSignalingServer.getExternalSignalingServer(),
conversationUser, externalSignalingServer.getExternalSignalingTicket(), conversationUser, externalSignalingServer.getExternalSignalingTicket(),
TextUtils.isEmpty(credentials)); TextUtils.isEmpty(credentials));
// Although setupAndInitiateWebSocketsConnection could be called several times the web socket is
// initialized just once, so the message receiver is also initialized just once.
signalingMessageReceiver = webSocketClient.getSignalingMessageReceiver();
signalingMessageReceiver.addListener(offerMessageListener);
} else { } else {
if (webSocketClient.isConnected() && currentCallStatus == CallStatus.PUBLISHER_FAILED) { if (webSocketClient.isConnected() && currentCallStatus == CallStatus.PUBLISHER_FAILED) {
webSocketClient.restartWebSocket(); webSocketClient.restartWebSocket();
@ -1622,11 +1627,6 @@ public class CallActivity extends CallBaseActivity {
} }
break; break;
case "signalingMessage":
Log.d(TAG, "onMessageEvent 'signalingMessage'");
signalingMessageReceiver.process((NCSignalingMessage) webSocketClient.getJobWithId(
Integer.valueOf(webSocketCommunicationEvent.getHashMap().get("jobId"))));
break;
case "peerReadyForRequestingOffer": case "peerReadyForRequestingOffer":
Log.d(TAG, "onMessageEvent 'peerReadyForRequestingOffer'"); Log.d(TAG, "onMessageEvent 'peerReadyForRequestingOffer'");
webSocketClient.requestOfferForSessionIdWithType( webSocketClient.requestOfferForSessionIdWithType(
@ -1670,7 +1670,7 @@ public class CallActivity extends CallBaseActivity {
} else if ("message".equals(messageType)) { } else if ("message".equals(messageType)) {
NCSignalingMessage ncSignalingMessage = LoganSquare.parse(signaling.getMessageWrapper().toString(), NCSignalingMessage ncSignalingMessage = LoganSquare.parse(signaling.getMessageWrapper().toString(),
NCSignalingMessage.class); NCSignalingMessage.class);
signalingMessageReceiver.process(ncSignalingMessage); internalSignalingMessageReceiver.process(ncSignalingMessage);
} else { } else {
Log.e(TAG, "unexpected message type when receiving signaling message"); Log.e(TAG, "unexpected message type when receiving signaling message");
} }
@ -2636,8 +2636,10 @@ public class CallActivity extends CallBaseActivity {
/** /**
* Temporary implementation of SignalingMessageReceiver until signaling related code is extracted from CallActivity. * Temporary implementation of SignalingMessageReceiver until signaling related code is extracted from CallActivity.
*
* All listeners are called in the main thread.
*/ */
private static class CallActivitySignalingMessageReceiver extends SignalingMessageReceiver { private static class InternalSignalingMessageReceiver extends SignalingMessageReceiver {
public void process(NCSignalingMessage message) { public void process(NCSignalingMessage message) {
processSignalingMessage(message); processSignalingMessage(message);
} }

View File

@ -40,6 +40,7 @@ import com.nextcloud.talk.models.json.websocket.ErrorOverallWebSocketMessage;
import com.nextcloud.talk.models.json.websocket.EventOverallWebSocketMessage; import com.nextcloud.talk.models.json.websocket.EventOverallWebSocketMessage;
import com.nextcloud.talk.models.json.websocket.HelloResponseOverallWebSocketMessage; import com.nextcloud.talk.models.json.websocket.HelloResponseOverallWebSocketMessage;
import com.nextcloud.talk.models.json.websocket.JoinedRoomOverallWebSocketMessage; import com.nextcloud.talk.models.json.websocket.JoinedRoomOverallWebSocketMessage;
import com.nextcloud.talk.signaling.SignalingMessageReceiver;
import com.nextcloud.talk.utils.MagicMap; import com.nextcloud.talk.utils.MagicMap;
import com.nextcloud.talk.utils.bundle.BundleKeys; import com.nextcloud.talk.utils.bundle.BundleKeys;
@ -109,6 +110,8 @@ public class MagicWebSocketInstance extends WebSocketListener {
private List<String> messagesQueue = new ArrayList<>(); private List<String> messagesQueue = new ArrayList<>();
private final ExternalSignalingMessageReceiver signalingMessageReceiver = new ExternalSignalingMessageReceiver();
MagicWebSocketInstance(User conversationUser, String connectionUrl, String webSocketTicket) { MagicWebSocketInstance(User conversationUser, String connectionUrl, String webSocketTicket) {
NextcloudTalkApplication.Companion.getSharedApplication().getComponentApplication().inject(this); NextcloudTalkApplication.Companion.getSharedApplication().getComponentApplication().inject(this);
@ -326,11 +329,7 @@ public class MagicWebSocketInstance extends WebSocketListener {
ncSignalingMessage.setFrom(callOverallWebSocketMessage.getCallWebSocketMessage().getSenderWebSocketMessage().getSessionId()); ncSignalingMessage.setFrom(callOverallWebSocketMessage.getCallWebSocketMessage().getSenderWebSocketMessage().getSessionId());
} }
if (!TextUtils.isEmpty(ncSignalingMessage.getFrom())) { signalingMessageReceiver.process(ncSignalingMessage);
HashMap<String, String> messageHashMap = new HashMap<>();
messageHashMap.put(JOB_ID, Integer.toString(magicMap.add(ncSignalingMessage)));
eventBus.post(new WebSocketCommunicationEvent("signalingMessage", messageHashMap));
}
break; break;
case "bye": case "bye":
connected = false; connected = false;
@ -471,4 +470,21 @@ public class MagicWebSocketInstance extends WebSocketListener {
restartWebSocket(); restartWebSocket();
} }
} }
public SignalingMessageReceiver getSignalingMessageReceiver() {
return signalingMessageReceiver;
}
/**
* Temporary implementation of SignalingMessageReceiver until signaling related code is extracted to a Signaling
* class.
*
* All listeners are called in the WebSocket reader thread. This thread should be the same as long as the
* WebSocket stays connected, but it may change whenever it is connected again.
*/
private static class ExternalSignalingMessageReceiver extends SignalingMessageReceiver {
public void process(NCSignalingMessage message) {
processSignalingMessage(message);
}
}
} }