From 04f1679e2a7c304a6b4f9e14e196a103b8fe5958 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Calvi=C3=B1o=20S=C3=A1nchez?= Date: Tue, 22 Nov 2022 19:24:39 +0100 Subject: [PATCH] Add observer for peer connections MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The observer is just an adapter for the "PeerConnection.Observer" provided by the WebRTC library; a custom observer is used to expose only the events needed outside "PeerConnectionWrapper". For now only the same events that were already handled are taken into account, but at a later point additional events (like "onAddTrack" instead of "onAddStream", which is deprecated) could be added too. Note that the thread used to handle the events has changed; the EventBus subscriber mode was "MAIN", but as the events were posted from a PeerConnection observer, which run in a worker thread rather than in the main thread, the subscriber was executed in the main thread rather than in the same thread as the poster. Due to this the actions performed by the handler now must be explicitly run in the main thread. Signed-off-by: Daniel Calviño Sánchez --- .../talk/activities/CallActivity.java | 123 +++++++++++------- .../talk/webrtc/PeerConnectionNotifier.java | 68 ++++++++++ .../talk/webrtc/PeerConnectionWrapper.java | 62 +++++---- 3 files changed, 180 insertions(+), 73 deletions(-) create mode 100644 app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionNotifier.java diff --git a/app/src/main/java/com/nextcloud/talk/activities/CallActivity.java b/app/src/main/java/com/nextcloud/talk/activities/CallActivity.java index 8d25ebd80..8acd14b52 100644 --- a/app/src/main/java/com/nextcloud/talk/activities/CallActivity.java +++ b/app/src/main/java/com/nextcloud/talk/activities/CallActivity.java @@ -63,7 +63,6 @@ import com.nextcloud.talk.application.NextcloudTalkApplication; import com.nextcloud.talk.data.user.model.User; import com.nextcloud.talk.databinding.CallActivityBinding; import com.nextcloud.talk.events.ConfigurationChangeEvent; -import com.nextcloud.talk.events.MediaStreamEvent; import com.nextcloud.talk.events.NetworkEvent; import com.nextcloud.talk.events.PeerConnectionEvent; import com.nextcloud.talk.events.WebSocketCommunicationEvent; @@ -269,6 +268,8 @@ public class CallActivity extends CallBaseActivity { private Map dataChannelMessageListeners = new HashMap<>(); + private Map peerConnectionObservers = new HashMap<>(); + private SignalingMessageReceiver.ParticipantListMessageListener participantListMessageListener = new SignalingMessageReceiver.ParticipantListMessageListener() { @Override @@ -1989,6 +1990,11 @@ public class CallActivity extends CallBaseActivity { signalingMessageReceiver.addListener(offerAnswerNickProvider.getScreenWebRtcMessageListener(), sessionId, "screen"); } + PeerConnectionWrapper.PeerConnectionObserver peerConnectionObserver = + new CallActivityPeerConnectionObserver(sessionId, type); + peerConnectionObservers.put(sessionId + "-" + type, peerConnectionObserver); + peerConnectionWrapper.addObserver(peerConnectionObserver); + if (!publisher) { runOnUiThread(() -> { // userId is unknown here, but it will be got based on the session id, and the stream will be @@ -2031,6 +2037,9 @@ public class CallActivity extends CallBaseActivity { } String videoStreamType = peerConnectionWrapper.getVideoStreamType(); if (VIDEO_STREAM_TYPE_SCREEN.equals(videoStreamType) || !justScreen) { + PeerConnectionWrapper.PeerConnectionObserver peerConnectionObserver = peerConnectionObservers.remove(sessionId + "-" + videoStreamType); + peerConnectionWrapper.removeObserver(peerConnectionObserver); + runOnUiThread(() -> removeMediaStream(sessionId, videoStreamType)); deletePeerConnection(peerConnectionWrapper); } @@ -2116,24 +2125,7 @@ public class CallActivity extends CallBaseActivity { @Subscribe(threadMode = ThreadMode.MAIN) public void onMessageEvent(PeerConnectionEvent peerConnectionEvent) { - String sessionId = peerConnectionEvent.getSessionId(); - - if (peerConnectionEvent.getPeerConnectionEventType() == - PeerConnectionEvent.PeerConnectionEventType.PEER_CONNECTED || - peerConnectionEvent.getPeerConnectionEventType() == - PeerConnectionEvent.PeerConnectionEventType.PEER_COMPLETED) { - handlePeerConnected(sessionId, peerConnectionEvent.getVideoStreamType()); - } else if (peerConnectionEvent.getPeerConnectionEventType() == - PeerConnectionEvent.PeerConnectionEventType.PEER_DISCONNECTED || - peerConnectionEvent.getPeerConnectionEventType() == - PeerConnectionEvent.PeerConnectionEventType.PEER_NEW || - peerConnectionEvent.getPeerConnectionEventType() == - PeerConnectionEvent.PeerConnectionEventType.PEER_CHECKING) { - handlePeerDisconnected(sessionId, peerConnectionEvent.getVideoStreamType()); - } else if (peerConnectionEvent.getPeerConnectionEventType() == - PeerConnectionEvent.PeerConnectionEventType.PEER_CLOSED) { - endPeerConnection(sessionId, VIDEO_STREAM_TYPE_SCREEN.equals(peerConnectionEvent.getVideoStreamType())); - } else if (peerConnectionEvent.getPeerConnectionEventType() == + if (peerConnectionEvent.getPeerConnectionEventType() == PeerConnectionEvent.PeerConnectionEventType.SENSOR_FAR || peerConnectionEvent.getPeerConnectionEventType() == PeerConnectionEvent.PeerConnectionEventType.SENSOR_NEAR) { @@ -2147,15 +2139,6 @@ public class CallActivity extends CallBaseActivity { toggleMedia(enableVideo, true); } } - } else if (peerConnectionEvent.getPeerConnectionEventType() == - PeerConnectionEvent.PeerConnectionEventType.PEER_FAILED) { - if (webSocketClient != null && webSocketClient.getSessionId() != null && webSocketClient.getSessionId().equals(sessionId)) { - setCallState(CallStatus.PUBLISHER_FAILED); - webSocketClient.clearResumeId(); - hangup(false); - } else { - handlePeerDisconnected(sessionId, peerConnectionEvent.getVideoStreamType()); - } } } @@ -2221,25 +2204,6 @@ public class CallActivity extends CallBaseActivity { } } - @Subscribe(threadMode = ThreadMode.MAIN) - public void onMessageEvent(MediaStreamEvent mediaStreamEvent) { - String participantDisplayItemId = mediaStreamEvent.getSession() + "-" + mediaStreamEvent.getVideoStreamType(); - if (participantDisplayItems.get(participantDisplayItemId) == null) { - return; - } - - boolean hasAtLeastOneVideoStream = false; - if (mediaStreamEvent.getMediaStream() != null) { - hasAtLeastOneVideoStream = mediaStreamEvent.getMediaStream().videoTracks != null - && mediaStreamEvent.getMediaStream().videoTracks.size() > 0; - } - - ParticipantDisplayItem participantDisplayItem = participantDisplayItems.get(participantDisplayItemId); - participantDisplayItem.setMediaStream(mediaStreamEvent.getMediaStream()); - participantDisplayItem.setStreamEnabled(hasAtLeastOneVideoStream); - participantsAdapter.notifyDataSetChanged(); - } - @Override public void onRequestPermissionsResult(int requestCode, @NonNull String[] permissions, @NonNull int[] grantResults) { @@ -2694,6 +2658,71 @@ public class CallActivity extends CallBaseActivity { } } + private class CallActivityPeerConnectionObserver implements PeerConnectionWrapper.PeerConnectionObserver { + + private final String sessionId; + private final String videoStreamType; + private final String participantDisplayItemId; + + private CallActivityPeerConnectionObserver(String sessionId, String videoStreamType) { + this.sessionId = sessionId; + this.videoStreamType = videoStreamType; + this.participantDisplayItemId = sessionId + "-" + videoStreamType; + } + + @Override + public void onStreamAdded(MediaStream mediaStream) { + handleStream(mediaStream); + } + + @Override + public void onStreamRemoved(MediaStream mediaStream) { + handleStream(null); + } + + private void handleStream(MediaStream mediaStream) { + runOnUiThread(() -> { + if (participantDisplayItems.get(participantDisplayItemId) == null) { + return; + } + + boolean hasAtLeastOneVideoStream = false; + if (mediaStream != null) { + hasAtLeastOneVideoStream = mediaStream.videoTracks != null && mediaStream.videoTracks.size() > 0; + } + + ParticipantDisplayItem participantDisplayItem = participantDisplayItems.get(participantDisplayItemId); + participantDisplayItem.setMediaStream(mediaStream); + participantDisplayItem.setStreamEnabled(hasAtLeastOneVideoStream); + participantsAdapter.notifyDataSetChanged(); + }); + } + + @Override + public void onIceConnectionStateChanged(PeerConnection.IceConnectionState iceConnectionState) { + runOnUiThread(() -> { + if (iceConnectionState == PeerConnection.IceConnectionState.CONNECTED || + iceConnectionState == PeerConnection.IceConnectionState.COMPLETED) { + handlePeerConnected(sessionId, videoStreamType); + } else if (iceConnectionState == PeerConnection.IceConnectionState.DISCONNECTED || + iceConnectionState == PeerConnection.IceConnectionState.NEW || + iceConnectionState == PeerConnection.IceConnectionState.CHECKING) { + handlePeerDisconnected(sessionId, videoStreamType); + } else if (iceConnectionState == PeerConnection.IceConnectionState.CLOSED) { + endPeerConnection(sessionId, VIDEO_STREAM_TYPE_SCREEN.equals(videoStreamType)); + } else if (iceConnectionState == PeerConnection.IceConnectionState.FAILED) { + if (webSocketClient != null && webSocketClient.getSessionId() != null && webSocketClient.getSessionId().equals(sessionId)) { + setCallState(CallStatus.PUBLISHER_FAILED); + webSocketClient.clearResumeId(); + hangup(false); + } else { + handlePeerDisconnected(sessionId, videoStreamType); + } + } + }); + } + } + private class InternalSignalingMessageSender implements SignalingMessageSender { @Override diff --git a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionNotifier.java b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionNotifier.java new file mode 100644 index 000000000..d2a9db207 --- /dev/null +++ b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionNotifier.java @@ -0,0 +1,68 @@ +/* + * Nextcloud Talk application + * + * @author Daniel Calviño Sánchez + * Copyright (C) 2022 Daniel Calviño Sánchez + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package com.nextcloud.talk.webrtc; + +import org.webrtc.MediaStream; +import org.webrtc.PeerConnection; + +import java.util.ArrayList; +import java.util.LinkedHashSet; +import java.util.Set; + +/** + * Helper class to register and notify PeerConnectionObserver. + * + * This class is only meant for internal use by PeerConnectionWrapper; observers must register themselves against + * a PeerConnectionWrapper rather than against a PeerConnectionNotifier. + */ +public class PeerConnectionNotifier { + + private final Set peerConnectionObservers = new LinkedHashSet<>(); + + public synchronized void addObserver(PeerConnectionWrapper.PeerConnectionObserver observer) { + if (observer == null) { + throw new IllegalArgumentException("PeerConnectionObserver can not be null"); + } + + peerConnectionObservers.add(observer); + } + + public synchronized void removeObserver(PeerConnectionWrapper.PeerConnectionObserver observer) { + peerConnectionObservers.remove(observer); + } + + public synchronized void notifyStreamAdded(MediaStream stream) { + for (PeerConnectionWrapper.PeerConnectionObserver observer : new ArrayList<>(peerConnectionObservers)) { + observer.onStreamAdded(stream); + } + } + + public synchronized void notifyStreamRemoved(MediaStream stream) { + for (PeerConnectionWrapper.PeerConnectionObserver observer : new ArrayList<>(peerConnectionObservers)) { + observer.onStreamRemoved(stream); + } + } + + public synchronized void notifyIceConnectionStateChanged(PeerConnection.IceConnectionState state) { + for (PeerConnectionWrapper.PeerConnectionObserver observer : new ArrayList<>(peerConnectionObservers)) { + observer.onIceConnectionStateChanged(state); + } + } +} diff --git a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java index 5c9eac672..f4a7d6b7e 100644 --- a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java +++ b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java @@ -28,8 +28,6 @@ import android.util.Log; import com.bluelinelabs.logansquare.LoganSquare; import com.nextcloud.talk.application.NextcloudTalkApplication; -import com.nextcloud.talk.events.MediaStreamEvent; -import com.nextcloud.talk.events.PeerConnectionEvent; import com.nextcloud.talk.models.json.signaling.DataChannelMessage; import com.nextcloud.talk.models.json.signaling.NCIceCandidate; import com.nextcloud.talk.models.json.signaling.NCMessagePayload; @@ -37,7 +35,6 @@ import com.nextcloud.talk.models.json.signaling.NCSignalingMessage; import com.nextcloud.talk.signaling.SignalingMessageReceiver; import com.nextcloud.talk.signaling.SignalingMessageSender; -import org.greenrobot.eventbus.EventBus; import org.webrtc.AudioTrack; import org.webrtc.DataChannel; import org.webrtc.IceCandidate; @@ -85,6 +82,21 @@ public class PeerConnectionWrapper { void onNickChanged(String nick); } + /** + * Observer for changes on the peer connection. + * + * The changes are bound to a specific peer connection, so each observer is expected to handle messages only for + * a single peer connection. + * + * All methods are called on the so called "signaling" thread of WebRTC, which is an internal thread created by the + * WebRTC library and NOT the same thread where signaling messages are received. + */ + public interface PeerConnectionObserver { + void onStreamAdded(MediaStream mediaStream); + void onStreamRemoved(MediaStream mediaStream); + void onIceConnectionStateChanged(PeerConnection.IceConnectionState iceConnectionState); + } + private static final String TAG = PeerConnectionWrapper.class.getCanonicalName(); private final SignalingMessageReceiver signalingMessageReceiver; @@ -94,6 +106,8 @@ public class PeerConnectionWrapper { private final DataChannelMessageNotifier dataChannelMessageNotifier = new DataChannelMessageNotifier(); + private final PeerConnectionNotifier peerConnectionNotifier = new PeerConnectionNotifier(); + private List iceCandidates = new ArrayList<>(); private PeerConnection peerConnection; private String sessionId; @@ -186,6 +200,21 @@ public class PeerConnectionWrapper { dataChannelMessageNotifier.removeListener(listener); } + /** + * Adds an observer for peer connection changes. + * + * An observer is expected to be added only once. If the same observer is added again it will be notified just once. + * + * @param observer the PeerConnectionObserver + */ + public void addObserver(PeerConnectionObserver observer) { + peerConnectionNotifier.addObserver(observer); + } + + public void removeObserver(PeerConnectionObserver observer) { + peerConnectionNotifier.removeObserver(observer); + } + public String getVideoStreamType() { return videoStreamType; } @@ -412,31 +441,12 @@ public class PeerConnectionWrapper { Log.d("iceConnectionChangeTo: ", iceConnectionState.name() + " over " + peerConnection.hashCode() + " " + sessionId); if (iceConnectionState == PeerConnection.IceConnectionState.CONNECTED) { - EventBus.getDefault().post(new PeerConnectionEvent(PeerConnectionEvent.PeerConnectionEventType.PEER_CONNECTED, - sessionId, null, null, videoStreamType)); - if (hasInitiated) { sendInitialMediaStatus(); } - } else if (iceConnectionState == PeerConnection.IceConnectionState.COMPLETED) { - EventBus.getDefault().post(new PeerConnectionEvent(PeerConnectionEvent.PeerConnectionEventType.PEER_COMPLETED, - sessionId, null, null, videoStreamType)); - } else if (iceConnectionState == PeerConnection.IceConnectionState.CLOSED) { - EventBus.getDefault().post(new PeerConnectionEvent(PeerConnectionEvent.PeerConnectionEventType - .PEER_CLOSED, sessionId, null, null, videoStreamType)); - } else if (iceConnectionState == PeerConnection.IceConnectionState.DISCONNECTED) { - EventBus.getDefault().post(new PeerConnectionEvent(PeerConnectionEvent.PeerConnectionEventType.PEER_DISCONNECTED, - sessionId, null, null, videoStreamType)); - } else if (iceConnectionState == PeerConnection.IceConnectionState.NEW) { - EventBus.getDefault().post(new PeerConnectionEvent(PeerConnectionEvent.PeerConnectionEventType.PEER_NEW, - sessionId, null, null, videoStreamType)); - } else if (iceConnectionState == PeerConnection.IceConnectionState.CHECKING) { - EventBus.getDefault().post(new PeerConnectionEvent(PeerConnectionEvent.PeerConnectionEventType.PEER_CHECKING, - sessionId, null, null, videoStreamType)); - } else if (iceConnectionState == PeerConnection.IceConnectionState.FAILED) { - EventBus.getDefault().post(new PeerConnectionEvent(PeerConnectionEvent.PeerConnectionEventType.PEER_FAILED, - sessionId, null, null, videoStreamType)); } + + peerConnectionNotifier.notifyIceConnectionStateChanged(iceConnectionState); } @Override @@ -473,12 +483,12 @@ public class PeerConnectionWrapper { @Override public void onAddStream(MediaStream mediaStream) { - EventBus.getDefault().post(new MediaStreamEvent(mediaStream, sessionId, videoStreamType)); + peerConnectionNotifier.notifyStreamAdded(mediaStream); } @Override public void onRemoveStream(MediaStream mediaStream) { - EventBus.getDefault().post(new MediaStreamEvent(null, sessionId, videoStreamType)); + peerConnectionNotifier.notifyStreamRemoved(mediaStream); } @Override