Use listener for participant list messages

Note that the thread used to handle the participant list messages from
the external signaling server does not change; the EventBus subscriber
mode was "BACKGROUND", but as the message was posted from a WebSocket
handler, which runs in a worker thread rather than in the main thread,
the subscriber was executed in the same thread as the poster.

Also note that the removed "userId" remark was not fully accurate;
although some external signaling messages do actually use "userid" those
currently handled to process the users do not, they always use "userId"
(as documented in the SignalingMessageReceiver).

Signed-off-by: Daniel Calviño Sánchez <danxuliu@gmail.com>
This commit is contained in:
Daniel Calviño Sánchez 2022-10-22 02:57:44 +02:00
parent e0c676bb35
commit 5e224c5a24
2 changed files with 41 additions and 85 deletions

View File

@ -270,6 +270,27 @@ public class CallActivity extends CallBaseActivity {
private Map<String, SignalingMessageReceiver.CallParticipantMessageListener> callParticipantMessageListeners = private Map<String, SignalingMessageReceiver.CallParticipantMessageListener> callParticipantMessageListeners =
new HashMap<>(); new HashMap<>();
private SignalingMessageReceiver.ParticipantListMessageListener participantListMessageListener = new SignalingMessageReceiver.ParticipantListMessageListener() {
@Override
public void onUsersInRoom(List<Participant> participants) {
processUsersInRoom(participants);
}
@Override
public void onParticipantsUpdate(List<Participant> participants) {
processUsersInRoom(participants);
}
@Override
public void onAllParticipantsUpdate(long inCall) {
if (inCall == Participant.InCallFlags.DISCONNECTED) {
Log.d(TAG, "A moderator ended the call for all.");
hangup(true);
}
}
};
private SignalingMessageReceiver.OfferMessageListener offerMessageListener = new SignalingMessageReceiver.OfferMessageListener() { private SignalingMessageReceiver.OfferMessageListener offerMessageListener = new SignalingMessageReceiver.OfferMessageListener() {
@Override @Override
public void onOffer(String sessionId, String roomType, String sdp, String nick) { public void onOffer(String sessionId, String roomType, String sdp, String nick) {
@ -1217,6 +1238,7 @@ public class CallActivity extends CallBaseActivity {
@Override @Override
public void onDestroy() { public void onDestroy() {
signalingMessageReceiver.removeListener(participantListMessageListener);
signalingMessageReceiver.removeListener(offerMessageListener); signalingMessageReceiver.removeListener(offerMessageListener);
if (localStream != null) { if (localStream != null) {
@ -1350,6 +1372,7 @@ public class CallActivity extends CallBaseActivity {
setupAndInitiateWebSocketsConnection(); setupAndInitiateWebSocketsConnection();
} else { } else {
signalingMessageReceiver = internalSignalingMessageReceiver; signalingMessageReceiver = internalSignalingMessageReceiver;
signalingMessageReceiver.addListener(participantListMessageListener);
signalingMessageReceiver.addListener(offerMessageListener); signalingMessageReceiver.addListener(offerMessageListener);
joinRoomAndCall(); joinRoomAndCall();
} }
@ -1552,6 +1575,7 @@ public class CallActivity extends CallBaseActivity {
// Although setupAndInitiateWebSocketsConnection could be called several times the web socket is // Although setupAndInitiateWebSocketsConnection could be called several times the web socket is
// initialized just once, so the message receiver is also initialized just once. // initialized just once, so the message receiver is also initialized just once.
signalingMessageReceiver = webSocketClient.getSignalingMessageReceiver(); signalingMessageReceiver = webSocketClient.getSignalingMessageReceiver();
signalingMessageReceiver.addListener(participantListMessageListener);
signalingMessageReceiver.addListener(offerMessageListener); signalingMessageReceiver.addListener(offerMessageListener);
} else { } else {
if (webSocketClient.isConnected() && currentCallStatus == CallStatus.PUBLISHER_FAILED) { if (webSocketClient.isConnected() && currentCallStatus == CallStatus.PUBLISHER_FAILED) {
@ -1596,37 +1620,6 @@ public class CallActivity extends CallBaseActivity {
performCall(); performCall();
} }
break; break;
case PARTICIPANTS_UPDATE:
Log.d(TAG, "onMessageEvent 'participantsUpdate'");
// See MagicWebSocketInstance#onMessage in case "participants" how the 'updateParameters' are created
Map<String, String> updateParameters = webSocketCommunicationEvent.getHashMap();
if (updateParameters == null) {
break;
}
String updateRoomToken = updateParameters.get(ROOM_TOKEN);
String updateAll = updateParameters.get(UPDATE_ALL);
String updateInCall = updateParameters.get(UPDATE_IN_CALL);
String jobId = updateParameters.get(JOB_ID);
if (roomToken.equals(updateRoomToken)) {
if (updateAll != null && Boolean.parseBoolean(updateAll)) {
if ("0".equals(updateInCall)) {
Log.d(TAG, "Most probably a moderator ended the call for all.");
hangup(true);
}
} else if (jobId != null) {
// In that case a list of users for the room is passed.
processUsersInRoom(
(List<HashMap<String, Object>>) webSocketClient
.getJobWithId(
Integer.valueOf(jobId)));
}
}
break;
case "peerReadyForRequestingOffer": case "peerReadyForRequestingOffer":
Log.d(TAG, "onMessageEvent 'peerReadyForRequestingOffer'"); Log.d(TAG, "onMessageEvent 'peerReadyForRequestingOffer'");
webSocketClient.requestOfferForSessionIdWithType( webSocketClient.requestOfferForSessionIdWithType(
@ -1666,7 +1659,7 @@ public class CallActivity extends CallBaseActivity {
} }
if ("usersInRoom".equals(messageType)) { if ("usersInRoom".equals(messageType)) {
processUsersInRoom((List<HashMap<String, Object>>) signaling.getMessageWrapper()); internalSignalingMessageReceiver.process((List<Map<String, Object>>) signaling.getMessageWrapper());
} else if ("message".equals(messageType)) { } else if ("message".equals(messageType)) {
NCSignalingMessage ncSignalingMessage = LoganSquare.parse(signaling.getMessageWrapper().toString(), NCSignalingMessage ncSignalingMessage = LoganSquare.parse(signaling.getMessageWrapper().toString(),
NCSignalingMessage.class); NCSignalingMessage.class);
@ -1781,7 +1774,7 @@ public class CallActivity extends CallBaseActivity {
} }
} }
private void processUsersInRoom(List<HashMap<String, Object>> users) { private void processUsersInRoom(List<Participant> participants) {
Log.d(TAG, "processUsersInRoom"); Log.d(TAG, "processUsersInRoom");
List<String> newSessions = new ArrayList<>(); List<String> newSessions = new ArrayList<>();
Set<String> oldSessions = new HashSet<>(); Set<String> oldSessions = new HashSet<>();
@ -1800,27 +1793,20 @@ public class CallActivity extends CallBaseActivity {
boolean isSelfInCall = false; boolean isSelfInCall = false;
for (HashMap<String, Object> participant : users) { for (Participant participant : participants) {
long inCallFlag = (long) participant.get("inCall"); long inCallFlag = participant.getInCall();
if (!participant.get("sessionId").equals(currentSessionId)) { if (!participant.getSessionId().equals(currentSessionId)) {
Log.d(TAG, " inCallFlag of participant " Log.d(TAG, " inCallFlag of participant "
+ participant.get("sessionId").toString().substring(0, 4) + participant.getSessionId().substring(0, 4)
+ " : " + " : "
+ inCallFlag); + inCallFlag);
boolean isInCall = inCallFlag != 0; boolean isInCall = inCallFlag != 0;
if (isInCall) { if (isInCall) {
newSessions.add(participant.get("sessionId").toString()); newSessions.add(participant.getSessionId());
} }
// The property is "userId" when not using the external signaling server and "userid" when using it. userIdsBySessionId.put(participant.getSessionId(), participant.getUserId());
String userId = null;
if (participant.get("userId") != null) {
userId = participant.get("userId").toString();
} else if (participant.get("userid") != null) {
userId = participant.get("userid").toString();
}
userIdsBySessionId.put(participant.get("sessionId").toString(), userId);
} else { } else {
Log.d(TAG, " inCallFlag of currentSessionId: " + inCallFlag); Log.d(TAG, " inCallFlag of currentSessionId: " + inCallFlag);
isSelfInCall = inCallFlag != 0; isSelfInCall = inCallFlag != 0;
@ -2640,6 +2626,10 @@ public class CallActivity extends CallBaseActivity {
* All listeners are called in the main thread. * All listeners are called in the main thread.
*/ */
private static class InternalSignalingMessageReceiver extends SignalingMessageReceiver { private static class InternalSignalingMessageReceiver extends SignalingMessageReceiver {
public void process(List<Map<String, Object>> users) {
processUsersInRoom(users);
}
public void process(NCSignalingMessage message) { public void process(NCSignalingMessage message) {
processSignalingMessage(message); processSignalingMessage(message);
} }

View File

@ -279,45 +279,7 @@ public class MagicWebSocketInstance extends WebSocketListener {
} }
break; break;
case TARGET_PARTICIPANTS: case TARGET_PARTICIPANTS:
if (EVENT_TYPE_UPDATE.equals(eventOverallWebSocketMessage.getEventMap().get(EVENT_TYPE))) { signalingMessageReceiver.process(eventOverallWebSocketMessage.getEventMap());
HashMap<String, String> refreshChatHashMap = new HashMap<>();
HashMap<String, Object> updateEventMap = (HashMap<String, Object>) eventOverallWebSocketMessage.getEventMap().get(EVENT_TYPE_UPDATE);
if (updateEventMap == null) {
break;
}
if (updateEventMap.containsKey(UPDATE_ROOM_ID)) {
Object updateRoomId = updateEventMap.get(UPDATE_ROOM_ID);
if (updateRoomId != null) {
refreshChatHashMap.put(ROOM_TOKEN,
(String) updateEventMap.get(UPDATE_ROOM_ID));
}
}
if (updateEventMap.containsKey(UPDATE_USERS)) {
Object updateUsers = updateEventMap.get(UPDATE_USERS);
if (updateUsers != null) {
refreshChatHashMap.put(JOB_ID, Integer.toString(magicMap.add(updateUsers)));
}
}
if (updateEventMap.containsKey(UPDATE_IN_CALL)) {
Object inCall = updateEventMap.get(UPDATE_IN_CALL);
if (inCall != null) {
refreshChatHashMap.put(UPDATE_IN_CALL, Long.toString((Long) inCall));
}
}
if (updateEventMap.containsKey(UPDATE_ALL)) {
Object updateAll = updateEventMap.get(UPDATE_ALL);
if (updateAll != null) {
refreshChatHashMap.put(UPDATE_ALL, Boolean.toString((Boolean) updateAll));
}
}
eventBus.post(new WebSocketCommunicationEvent(PARTICIPANTS_UPDATE, refreshChatHashMap));
}
break; break;
} }
} }
@ -483,6 +445,10 @@ public class MagicWebSocketInstance extends WebSocketListener {
* WebSocket stays connected, but it may change whenever it is connected again. * WebSocket stays connected, but it may change whenever it is connected again.
*/ */
private static class ExternalSignalingMessageReceiver extends SignalingMessageReceiver { private static class ExternalSignalingMessageReceiver extends SignalingMessageReceiver {
public void process(Map<String, Object> eventMap) {
processEvent(eventMap);
}
public void process(NCSignalingMessage message) { public void process(NCSignalingMessage message) {
processSignalingMessage(message); processSignalingMessage(message);
} }