From a1eb90da9f6a747f34a1187c0e46ed824907045f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Calvi=C3=B1o=20S=C3=A1nchez?= Date: Thu, 5 Dec 2024 11:32:51 +0100 Subject: [PATCH 01/16] Remove Dagger related code from PeerConnectionWrapper MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The PeerConnectionWrapper does not need to be injected in the application, nor the Context needs to be injected in the PeerConnectionWrapper. This all seems to be leftovers from the past, and removing them would ease adding unit tests for the PeerConnectionWrapper. Signed-off-by: Daniel Calviño Sánchez --- .../talk/webrtc/PeerConnectionWrapper.java | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java index fe44e797d..b77950ec3 100644 --- a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java +++ b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java @@ -7,11 +7,9 @@ */ package com.nextcloud.talk.webrtc; -import android.content.Context; import android.util.Log; import com.bluelinelabs.logansquare.LoganSquare; -import com.nextcloud.talk.application.NextcloudTalkApplication; import com.nextcloud.talk.models.json.signaling.DataChannelMessage; import com.nextcloud.talk.models.json.signaling.NCIceCandidate; import com.nextcloud.talk.models.json.signaling.NCMessagePayload; @@ -38,19 +36,11 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Objects; - -import javax.inject.Inject; import androidx.annotation.Nullable; -import autodagger.AutoInjector; -@AutoInjector(NextcloudTalkApplication.class) public class PeerConnectionWrapper { - @Inject - Context context; - private static final String TAG = PeerConnectionWrapper.class.getCanonicalName(); private final SignalingMessageReceiver signalingMessageReceiver; @@ -117,9 +107,6 @@ public class PeerConnectionWrapper { boolean isMCUPublisher, boolean hasMCU, String videoStreamType, SignalingMessageReceiver signalingMessageReceiver, SignalingMessageSender signalingMessageSender) { - - Objects.requireNonNull(NextcloudTalkApplication.Companion.getSharedApplication()).getComponentApplication().inject(this); - this.localStream = localStream; this.videoStreamType = videoStreamType; From 26be0655454b04c27ad50e1c8dffb52921918201 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Calvi=C3=B1o=20S=C3=A1nchez?= Date: Thu, 5 Dec 2024 14:13:36 +0100 Subject: [PATCH 02/16] Add dummy Log implementation to be used in tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Log methods are static, so they can not be mocked using Mockito. Although it might be possible to use PowerMockito a dummy implementation was added instead, as Log uses are widespread and it is not something worth mocking anyway. Signed-off-by: Daniel Calviño Sánchez --- app/src/test/java/android/util/Log.java | 51 +++++++++++++++++++++++++ 1 file changed, 51 insertions(+) create mode 100644 app/src/test/java/android/util/Log.java diff --git a/app/src/test/java/android/util/Log.java b/app/src/test/java/android/util/Log.java new file mode 100644 index 000000000..7090f73cd --- /dev/null +++ b/app/src/test/java/android/util/Log.java @@ -0,0 +1,51 @@ +/* + * Nextcloud Talk - Android Client + * + * SPDX-FileCopyrightText: 2024 Daniel Calviño Sánchez + * SPDX-License-Identifier: GPL-3.0-or-later + */ +package android.util; + +/** + * Dummy implementation of android.util.Log to be used in unit tests. + *

+ * The Android Gradle plugin provides a library with the APIs of the Android framework that throws an exception if any + * of them are called. This class is loaded before that library and therefore becomes the implementation used during the + * tests, simply printing the messages to the system console. + */ +public class Log { + + public static int d(String tag, String msg) { + System.out.println("DEBUG: " + tag + ": " + msg); + + return 1; + } + + public static int e(String tag, String msg) { + System.out.println("ERROR: " + tag + ": " + msg); + + return 1; + } + + public static int i(String tag, String msg) { + System.out.println("INFO: " + tag + ": " + msg); + + return 1; + } + + public static boolean isLoggable(String tag, int level) { + return true; + } + + public static int v(String tag, String msg) { + System.out.println("VERBOSE: " + tag + ": " + msg); + + return 1; + } + + public static int w(String tag, String msg) { + System.out.println("WARN: " + tag + ": " + msg); + + return 1; + } +} From a43a33a16a8d175eaa9fed3222e1fdf9618394c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Calvi=C3=B1o=20S=C3=A1nchez?= Date: Thu, 5 Dec 2024 14:34:06 +0100 Subject: [PATCH 03/16] Add unit tests for receiving data channel messages MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Daniel Calviño Sánchez --- .../talk/webrtc/PeerConnectionWrapper.java | 3 + .../talk/webrtc/PeerConnectionWrapperTest.kt | 194 ++++++++++++++++++ 2 files changed, 197 insertions(+) create mode 100644 app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt diff --git a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java index b77950ec3..fdc211e74 100644 --- a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java +++ b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java @@ -71,6 +71,9 @@ public class PeerConnectionWrapper { /** * Listener for data channel messages. *

+ * Messages might have been received on any data channel, independently of its label or whether it was open by the + * local or the remote peer. + *

* The messages are bound to a specific peer connection, so each listener is expected to handle messages only for * a single peer connection. *

diff --git a/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt b/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt new file mode 100644 index 000000000..98e2de4ae --- /dev/null +++ b/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt @@ -0,0 +1,194 @@ +/* + * Nextcloud Talk - Android Client + * + * SPDX-FileCopyrightText: 2024 Daniel Calviño Sánchez + * SPDX-License-Identifier: GPL-3.0-or-later + */ +package com.nextcloud.talk.webrtc + +import com.bluelinelabs.logansquare.LoganSquare +import com.nextcloud.talk.models.json.signaling.DataChannelMessage +import com.nextcloud.talk.signaling.SignalingMessageReceiver +import com.nextcloud.talk.signaling.SignalingMessageSender +import com.nextcloud.talk.webrtc.PeerConnectionWrapper.DataChannelMessageListener +import org.junit.Before +import org.junit.Test +import org.mockito.ArgumentCaptor +import org.mockito.ArgumentMatchers.any +import org.mockito.ArgumentMatchers.eq +import org.mockito.Mockito +import org.mockito.Mockito.doNothing +import org.webrtc.DataChannel +import org.webrtc.MediaConstraints +import org.webrtc.PeerConnection +import org.webrtc.PeerConnectionFactory +import java.nio.ByteBuffer +import java.util.HashMap + +class PeerConnectionWrapperTest { + + private var peerConnectionWrapper: PeerConnectionWrapper? = null + private var mockedPeerConnection: PeerConnection? = null + private var mockedPeerConnectionFactory: PeerConnectionFactory? = null + private var mockedSignalingMessageReceiver: SignalingMessageReceiver? = null + private var mockedSignalingMessageSender: SignalingMessageSender? = null + + private fun dataChannelMessageToBuffer(dataChannelMessage: DataChannelMessage): DataChannel.Buffer { + return DataChannel.Buffer( + ByteBuffer.wrap(LoganSquare.serialize(dataChannelMessage).toByteArray()), + false + ) + } + + @Before + fun setUp() { + mockedPeerConnection = Mockito.mock(PeerConnection::class.java) + mockedPeerConnectionFactory = Mockito.mock(PeerConnectionFactory::class.java) + mockedSignalingMessageReceiver = Mockito.mock(SignalingMessageReceiver::class.java) + mockedSignalingMessageSender = Mockito.mock(SignalingMessageSender::class.java) + } + + @Test + fun testReceiveDataChannelMessage() { + Mockito.`when`( + mockedPeerConnectionFactory!!.createPeerConnection( + any(PeerConnection.RTCConfiguration::class.java), + any(PeerConnection.Observer::class.java) + ) + ).thenReturn(mockedPeerConnection) + + val mockedStatusDataChannel = Mockito.mock(DataChannel::class.java) + Mockito.`when`(mockedStatusDataChannel.label()).thenReturn("status") + Mockito.`when`(mockedStatusDataChannel.state()).thenReturn(DataChannel.State.OPEN) + Mockito.`when`(mockedPeerConnection!!.createDataChannel(eq("status"), any())) + .thenReturn(mockedStatusDataChannel) + + val statusDataChannelObserverArgumentCaptor: ArgumentCaptor = + ArgumentCaptor.forClass(DataChannel.Observer::class.java) + + doNothing().`when`(mockedStatusDataChannel).registerObserver(statusDataChannelObserverArgumentCaptor.capture()) + + peerConnectionWrapper = PeerConnectionWrapper( + mockedPeerConnectionFactory, + ArrayList(), + MediaConstraints(), + "the-session-id", + "the-local-session-id", + null, + true, + true, + "video", + mockedSignalingMessageReceiver, + mockedSignalingMessageSender + ) + + val mockedDataChannelMessageListener = Mockito.mock(DataChannelMessageListener::class.java) + peerConnectionWrapper!!.addListener(mockedDataChannelMessageListener) + + // The payload must be a map to be able to serialize it and, therefore, generate the data that would have been + // received from another participant, so it is not possible to test receiving the nick as a String payload. + val payloadMap = HashMap() + payloadMap["name"] = "the-nick-in-map" + + statusDataChannelObserverArgumentCaptor.value.onMessage( + dataChannelMessageToBuffer(DataChannelMessage("nickChanged", null, payloadMap)) + ) + + Mockito.verify(mockedDataChannelMessageListener).onNickChanged("the-nick-in-map") + Mockito.verifyNoMoreInteractions(mockedDataChannelMessageListener) + + statusDataChannelObserverArgumentCaptor.value.onMessage( + dataChannelMessageToBuffer(DataChannelMessage("audioOn")) + ) + + Mockito.verify(mockedDataChannelMessageListener).onAudioOn() + Mockito.verifyNoMoreInteractions(mockedDataChannelMessageListener) + + statusDataChannelObserverArgumentCaptor.value.onMessage( + dataChannelMessageToBuffer(DataChannelMessage("audioOff")) + ) + + Mockito.verify(mockedDataChannelMessageListener).onAudioOff() + Mockito.verifyNoMoreInteractions(mockedDataChannelMessageListener) + + statusDataChannelObserverArgumentCaptor.value.onMessage( + dataChannelMessageToBuffer(DataChannelMessage("videoOn")) + ) + + Mockito.verify(mockedDataChannelMessageListener).onVideoOn() + Mockito.verifyNoMoreInteractions(mockedDataChannelMessageListener) + + statusDataChannelObserverArgumentCaptor.value.onMessage( + dataChannelMessageToBuffer(DataChannelMessage("videoOff")) + ) + + Mockito.verify(mockedDataChannelMessageListener).onVideoOff() + Mockito.verifyNoMoreInteractions(mockedDataChannelMessageListener) + } + + @Test + fun testReceiveDataChannelMessageWithOpenRemoteDataChannel() { + val peerConnectionObserverArgumentCaptor: ArgumentCaptor = + ArgumentCaptor.forClass(PeerConnection.Observer::class.java) + + Mockito.`when`( + mockedPeerConnectionFactory!!.createPeerConnection( + any(PeerConnection.RTCConfiguration::class.java), + peerConnectionObserverArgumentCaptor.capture() + ) + ).thenReturn(mockedPeerConnection) + + val mockedStatusDataChannel = Mockito.mock(DataChannel::class.java) + Mockito.`when`(mockedStatusDataChannel.label()).thenReturn("status") + Mockito.`when`(mockedStatusDataChannel.state()).thenReturn(DataChannel.State.OPEN) + Mockito.`when`(mockedPeerConnection!!.createDataChannel(eq("status"), any())) + .thenReturn(mockedStatusDataChannel) + + val statusDataChannelObserverArgumentCaptor: ArgumentCaptor = + ArgumentCaptor.forClass(DataChannel.Observer::class.java) + + doNothing().`when`(mockedStatusDataChannel).registerObserver(statusDataChannelObserverArgumentCaptor.capture()) + + peerConnectionWrapper = PeerConnectionWrapper( + mockedPeerConnectionFactory, + ArrayList(), + MediaConstraints(), + "the-session-id", + "the-local-session-id", + null, + true, + true, + "video", + mockedSignalingMessageReceiver, + mockedSignalingMessageSender + ) + + val randomIdDataChannelObserverArgumentCaptor: ArgumentCaptor = + ArgumentCaptor.forClass(DataChannel.Observer::class.java) + + val mockedRandomIdDataChannel = Mockito.mock(DataChannel::class.java) + Mockito.`when`(mockedRandomIdDataChannel.label()).thenReturn("random-id") + Mockito.`when`(mockedRandomIdDataChannel.state()).thenReturn(DataChannel.State.OPEN) + doNothing().`when`(mockedRandomIdDataChannel).registerObserver( + randomIdDataChannelObserverArgumentCaptor.capture() + ) + peerConnectionObserverArgumentCaptor.value.onDataChannel(mockedRandomIdDataChannel) + + val mockedDataChannelMessageListener = Mockito.mock(DataChannelMessageListener::class.java) + peerConnectionWrapper!!.addListener(mockedDataChannelMessageListener) + + statusDataChannelObserverArgumentCaptor.value.onMessage( + dataChannelMessageToBuffer(DataChannelMessage("audioOn")) + ) + + Mockito.verify(mockedDataChannelMessageListener).onAudioOn() + Mockito.verifyNoMoreInteractions(mockedDataChannelMessageListener) + + randomIdDataChannelObserverArgumentCaptor.value.onMessage( + dataChannelMessageToBuffer(DataChannelMessage("audioOff")) + ) + + Mockito.verify(mockedDataChannelMessageListener).onAudioOff() + Mockito.verifyNoMoreInteractions(mockedDataChannelMessageListener) + } +} From 0555e0edbdc9a86d81f667d66bab311c0e18480a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Calvi=C3=B1o=20S=C3=A1nchez?= Date: Thu, 5 Dec 2024 14:54:00 +0100 Subject: [PATCH 04/16] Unify log messages for received data channel messages MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Daniel Calviño Sánchez --- .../java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java index fdc211e74..34ea47eaa 100644 --- a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java +++ b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java @@ -379,7 +379,7 @@ public class PeerConnectionWrapper { @Override public void onMessage(DataChannel.Buffer buffer) { if (buffer.binary) { - Log.d(TAG, "Received binary msg over " + TAG + " " + sessionId); + Log.d(TAG, "Received binary data channel message over " + TAG + " " + sessionId); return; } @@ -387,7 +387,7 @@ public class PeerConnectionWrapper { final byte[] bytes = new byte[data.capacity()]; data.get(bytes); String strData = new String(bytes); - Log.d(TAG, "Got msg: " + strData + " over " + TAG + " " + sessionId); + Log.d(TAG, "Received data channel message (" + strData + ") over " + TAG + " " + sessionId); DataChannelMessage dataChannelMessage; try { From 1f4c25caa37824283aa14447ca76904448003c03 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Calvi=C3=B1o=20S=C3=A1nchez?= Date: Thu, 5 Dec 2024 14:55:57 +0100 Subject: [PATCH 05/16] Include data channel label in log message MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This implicitly fixes trying to send the initial state on the latest remote data channel found (which is the one stored in the "dataChannel" attribute of the "PeerConnectionWrapper") when any other existing data channel changes its status to open. Nevertheless, as all this will be reworked, no unit test was added for it. Signed-off-by: Daniel Calviño Sánchez --- .../talk/webrtc/PeerConnectionWrapper.java | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java index 34ea47eaa..2d033d08e 100644 --- a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java +++ b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java @@ -36,6 +36,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; import androidx.annotation.Nullable; @@ -144,7 +145,7 @@ public class PeerConnectionWrapper { DataChannel.Init init = new DataChannel.Init(); init.negotiated = false; dataChannel = peerConnection.createDataChannel("status", init); - dataChannel.registerObserver(new DataChannelObserver()); + dataChannel.registerObserver(new DataChannelObserver(dataChannel)); if (isMCUPublisher) { peerConnection.createOffer(sdpObserver, mediaConstraints); } else if (hasMCU && "video".equals(this.videoStreamType)) { @@ -363,6 +364,12 @@ public class PeerConnectionWrapper { private class DataChannelObserver implements DataChannel.Observer { + private final DataChannel dataChannel; + + public DataChannelObserver(DataChannel dataChannel) { + this.dataChannel = Objects.requireNonNull(dataChannel); + } + @Override public void onBufferedAmountChange(long l) { @@ -370,8 +377,7 @@ public class PeerConnectionWrapper { @Override public void onStateChange() { - if (dataChannel != null && - dataChannel.state() == DataChannel.State.OPEN) { + if (dataChannel.state() == DataChannel.State.OPEN) { sendInitialMediaStatus(); } } @@ -379,7 +385,7 @@ public class PeerConnectionWrapper { @Override public void onMessage(DataChannel.Buffer buffer) { if (buffer.binary) { - Log.d(TAG, "Received binary data channel message over " + TAG + " " + sessionId); + Log.d(TAG, "Received binary data channel message over " + dataChannel.label() + " " + sessionId); return; } @@ -387,7 +393,7 @@ public class PeerConnectionWrapper { final byte[] bytes = new byte[data.capacity()]; data.get(bytes); String strData = new String(bytes); - Log.d(TAG, "Received data channel message (" + strData + ") over " + TAG + " " + sessionId); + Log.d(TAG, "Received data channel message (" + strData + ") over " + dataChannel.label() + " " + sessionId); DataChannelMessage dataChannelMessage; try { @@ -512,7 +518,7 @@ public class PeerConnectionWrapper { + " exists, but received onDataChannel event for DataChannel with label " + dataChannel.label()); } PeerConnectionWrapper.this.dataChannel = dataChannel; - PeerConnectionWrapper.this.dataChannel.registerObserver(new DataChannelObserver()); + PeerConnectionWrapper.this.dataChannel.registerObserver(new DataChannelObserver(dataChannel)); } @Override From d1a4265491d998b8371f48fb8b9e9b926b3e0b9f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Calvi=C3=B1o=20S=C3=A1nchez?= Date: Fri, 6 Dec 2024 14:00:57 +0100 Subject: [PATCH 06/16] Rename "sendChannelData" to "send" MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The legacy name was a bit strange, so now it is renamed to just "send" as the parameter type ("DataChannelMessage") gives enough context. Signed-off-by: Daniel Calviño Sánchez --- .../java/com/nextcloud/talk/activities/CallActivity.kt | 10 +++++----- .../nextcloud/talk/webrtc/PeerConnectionWrapper.java | 10 +++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/app/src/main/java/com/nextcloud/talk/activities/CallActivity.kt b/app/src/main/java/com/nextcloud/talk/activities/CallActivity.kt index 9220952c0..3bcc7611b 100644 --- a/app/src/main/java/com/nextcloud/talk/activities/CallActivity.kt +++ b/app/src/main/java/com/nextcloud/talk/activities/CallActivity.kt @@ -1174,12 +1174,12 @@ class CallActivity : CallBaseActivity() { if (isConnectionEstablished && othersInCall) { if (!hasMCU) { for (peerConnectionWrapper in peerConnectionWrapperList) { - peerConnectionWrapper.sendChannelData(DataChannelMessage(isSpeakingMessage)) + peerConnectionWrapper.send(DataChannelMessage(isSpeakingMessage)) } } else { for (peerConnectionWrapper in peerConnectionWrapperList) { if (peerConnectionWrapper.sessionId == webSocketClient!!.sessionId) { - peerConnectionWrapper.sendChannelData(DataChannelMessage(isSpeakingMessage)) + peerConnectionWrapper.send(DataChannelMessage(isSpeakingMessage)) break } } @@ -1370,12 +1370,12 @@ class CallActivity : CallBaseActivity() { if (isConnectionEstablished) { if (!hasMCU) { for (peerConnectionWrapper in peerConnectionWrapperList) { - peerConnectionWrapper.sendChannelData(DataChannelMessage(message)) + peerConnectionWrapper.send(DataChannelMessage(message)) } } else { for (peerConnectionWrapper in peerConnectionWrapperList) { if (peerConnectionWrapper.sessionId == webSocketClient!!.sessionId) { - peerConnectionWrapper.sendChannelData(DataChannelMessage(message)) + peerConnectionWrapper.send(DataChannelMessage(message)) break } } @@ -2563,7 +2563,7 @@ class CallActivity : CallBaseActivity() { } override fun onNext(aLong: Long) { - peerConnectionWrapper.sendChannelData(dataChannelMessage) + peerConnectionWrapper.send(dataChannelMessage) } override fun onError(e: Throwable) { diff --git a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java index 2d033d08e..bf95befb8 100644 --- a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java +++ b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java @@ -269,7 +269,7 @@ public class PeerConnectionWrapper { } } - public void sendChannelData(DataChannelMessage dataChannelMessage) { + public void send(DataChannelMessage dataChannelMessage) { ByteBuffer buffer; if (dataChannel != null && dataChannelMessage != null) { try { @@ -292,15 +292,15 @@ public class PeerConnectionWrapper { private void sendInitialMediaStatus() { if (localStream != null) { if (localStream.videoTracks.size() == 1 && localStream.videoTracks.get(0).enabled()) { - sendChannelData(new DataChannelMessage("videoOn")); + send(new DataChannelMessage("videoOn")); } else { - sendChannelData(new DataChannelMessage("videoOff")); + send(new DataChannelMessage("videoOff")); } if (localStream.audioTracks.size() == 1 && localStream.audioTracks.get(0).enabled()) { - sendChannelData(new DataChannelMessage("audioOn")); + send(new DataChannelMessage("audioOn")); } else { - sendChannelData(new DataChannelMessage("audioOff")); + send(new DataChannelMessage("audioOff")); } } } From 3ffc6db56dbfdf33a12dd4557ce0e826b7c367c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Calvi=C3=B1o=20S=C3=A1nchez?= Date: Thu, 5 Dec 2024 17:42:34 +0100 Subject: [PATCH 07/16] Send data channel messages using "status" data channel MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Data channel messages are expected to be sent using the "status" data channel that is locally created. However, if another data channel was opened by the remote peer the reference to the "status" data channel was overwritten with the new data channel, and messages were sent instead on the remote data channel. In current Talk versions that was not a problem, and the change makes no difference either, because since the support for Janus 1.x was added data channel messages are listened on all data channels, independently of their label or whether they were created by the local or remote peer. However, in older Talk versions this fixes a regression introduced with the support for Janus 1.x. In those versions only messages coming from the "status" or "JanusDataChannel" data channels were taken into account. When Janus is not used the WebUI opens the legacy "simplewebrtc" data channel, so that data channel may be the one used to send data channel messages (if it is open after the "status" data channel), but the messages received on that data channel were ignored by the WebUI. Nevertheless, at this point this is more an academic problem than a real world problem, as it is unlikely that there are many Nextcloud servers with Talk < 16 and without HPB being used. Independently of all that, when the peer connection is removed only the "status" data channel is disposed, but none of the remote data channels are. This is just a variation of an already existing bug (the last open data channel was the one disposed due to being the last saved reference, but the rest were not) and it will be fixed in another commit. Signed-off-by: Daniel Calviño Sánchez --- .../talk/webrtc/PeerConnectionWrapper.java | 35 ++++--- .../talk/webrtc/PeerConnectionWrapperTest.kt | 97 +++++++++++++++++++ 2 files changed, 118 insertions(+), 14 deletions(-) diff --git a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java index bf95befb8..6413f0344 100644 --- a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java +++ b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java @@ -57,7 +57,7 @@ public class PeerConnectionWrapper { private PeerConnection peerConnection; private String sessionId; private final MediaConstraints mediaConstraints; - private DataChannel dataChannel; + private DataChannel statusDataChannel; private final SdpObserver sdpObserver; private final boolean hasInitiated; @@ -144,8 +144,8 @@ public class PeerConnectionWrapper { if (hasMCU || hasInitiated) { DataChannel.Init init = new DataChannel.Init(); init.negotiated = false; - dataChannel = peerConnection.createDataChannel("status", init); - dataChannel.registerObserver(new DataChannelObserver(dataChannel)); + statusDataChannel = peerConnection.createDataChannel("status", init); + statusDataChannel.registerObserver(new DataChannelObserver(statusDataChannel)); if (isMCUPublisher) { peerConnection.createOffer(sdpObserver, mediaConstraints); } else if (hasMCU && "video".equals(this.videoStreamType)) { @@ -233,9 +233,9 @@ public class PeerConnectionWrapper { public void removePeerConnection() { signalingMessageReceiver.removeListener(webRtcMessageListener); - if (dataChannel != null) { - dataChannel.dispose(); - dataChannel = null; + if (statusDataChannel != null) { + statusDataChannel.dispose(); + statusDataChannel = null; Log.d(TAG, "Disposed DataChannel"); } else { Log.d(TAG, "DataChannel is null."); @@ -269,12 +269,24 @@ public class PeerConnectionWrapper { } } + /** + * Sends a data channel message. + *

+ * Data channel messages are always sent on the "status" data channel locally opened. However, if Janus is used, + * messages can be sent only on publisher connections, even if subscriber connections have a "status" data channel; + * messages sent on subscriber connections will be simply ignored. Moreover, even if the message is sent on the + * "status" data channel subscriber connections will receive it on a data channel with a different label, as + * Janus opens its own data channel on subscriber connections and "multiplexes" all the received data channel + * messages on it, independently of on which data channel they were originally sent. + * + * @param dataChannelMessage the message to send + */ public void send(DataChannelMessage dataChannelMessage) { ByteBuffer buffer; - if (dataChannel != null && dataChannelMessage != null) { + if (statusDataChannel != null && dataChannelMessage != null) { try { buffer = ByteBuffer.wrap(LoganSquare.serialize(dataChannelMessage).getBytes()); - dataChannel.send(new DataChannel.Buffer(buffer, false)); + statusDataChannel.send(new DataChannel.Buffer(buffer, false)); } catch (Exception e) { Log.d(TAG, "Failed to send channel data, attempting regular " + dataChannelMessage); } @@ -513,12 +525,7 @@ public class PeerConnectionWrapper { @Override public void onDataChannel(DataChannel dataChannel) { - if (PeerConnectionWrapper.this.dataChannel != null) { - Log.w(TAG, "Data channel with label " + PeerConnectionWrapper.this.dataChannel.label() - + " exists, but received onDataChannel event for DataChannel with label " + dataChannel.label()); - } - PeerConnectionWrapper.this.dataChannel = dataChannel; - PeerConnectionWrapper.this.dataChannel.registerObserver(new DataChannelObserver(dataChannel)); + dataChannel.registerObserver(new DataChannelObserver(dataChannel)); } @Override diff --git a/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt b/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt index 98e2de4ae..be628c2af 100644 --- a/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt +++ b/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt @@ -14,10 +14,13 @@ import com.nextcloud.talk.webrtc.PeerConnectionWrapper.DataChannelMessageListene import org.junit.Before import org.junit.Test import org.mockito.ArgumentCaptor +import org.mockito.ArgumentMatcher import org.mockito.ArgumentMatchers.any +import org.mockito.ArgumentMatchers.argThat import org.mockito.ArgumentMatchers.eq import org.mockito.Mockito import org.mockito.Mockito.doNothing +import org.mockito.Mockito.never import org.webrtc.DataChannel import org.webrtc.MediaConstraints import org.webrtc.PeerConnection @@ -33,6 +36,19 @@ class PeerConnectionWrapperTest { private var mockedSignalingMessageReceiver: SignalingMessageReceiver? = null private var mockedSignalingMessageSender: SignalingMessageSender? = null + /** + * Helper matcher for DataChannelMessages. + */ + private inner class MatchesDataChannelMessage( + private val expectedDataChannelMessage: DataChannelMessage + ) : ArgumentMatcher { + override fun matches(buffer: DataChannel.Buffer): Boolean { + // DataChannel.Buffer does not implement "equals", so the comparison needs to be done on the ByteBuffer + // instead. + return dataChannelMessageToBuffer(expectedDataChannelMessage).data.equals(buffer.data) + } + } + private fun dataChannelMessageToBuffer(dataChannelMessage: DataChannelMessage): DataChannel.Buffer { return DataChannel.Buffer( ByteBuffer.wrap(LoganSquare.serialize(dataChannelMessage).toByteArray()), @@ -48,6 +64,87 @@ class PeerConnectionWrapperTest { mockedSignalingMessageSender = Mockito.mock(SignalingMessageSender::class.java) } + @Test + fun testSendDataChannelMessage() { + Mockito.`when`( + mockedPeerConnectionFactory!!.createPeerConnection( + any(PeerConnection.RTCConfiguration::class.java), + any(PeerConnection.Observer::class.java) + ) + ).thenReturn(mockedPeerConnection) + + val mockedStatusDataChannel = Mockito.mock(DataChannel::class.java) + Mockito.`when`(mockedStatusDataChannel.label()).thenReturn("status") + Mockito.`when`(mockedStatusDataChannel.state()).thenReturn(DataChannel.State.OPEN) + Mockito.`when`(mockedPeerConnection!!.createDataChannel(eq("status"), any())) + .thenReturn(mockedStatusDataChannel) + + peerConnectionWrapper = PeerConnectionWrapper( + mockedPeerConnectionFactory, + ArrayList(), + MediaConstraints(), + "the-session-id", + "the-local-session-id", + null, + true, + true, + "video", + mockedSignalingMessageReceiver, + mockedSignalingMessageSender + ) + + peerConnectionWrapper!!.send(DataChannelMessage("the-message-type")) + + Mockito.verify(mockedStatusDataChannel).send( + argThat(MatchesDataChannelMessage(DataChannelMessage("the-message-type"))) + ) + } + + @Test + fun testSendDataChannelMessageWithOpenRemoteDataChannel() { + val peerConnectionObserverArgumentCaptor: ArgumentCaptor = + ArgumentCaptor.forClass(PeerConnection.Observer::class.java) + + Mockito.`when`( + mockedPeerConnectionFactory!!.createPeerConnection( + any(PeerConnection.RTCConfiguration::class.java), + peerConnectionObserverArgumentCaptor.capture() + ) + ).thenReturn(mockedPeerConnection) + + val mockedStatusDataChannel = Mockito.mock(DataChannel::class.java) + Mockito.`when`(mockedStatusDataChannel.label()).thenReturn("status") + Mockito.`when`(mockedStatusDataChannel.state()).thenReturn(DataChannel.State.OPEN) + Mockito.`when`(mockedPeerConnection!!.createDataChannel(eq("status"), any())) + .thenReturn(mockedStatusDataChannel) + + peerConnectionWrapper = PeerConnectionWrapper( + mockedPeerConnectionFactory, + ArrayList(), + MediaConstraints(), + "the-session-id", + "the-local-session-id", + null, + true, + true, + "video", + mockedSignalingMessageReceiver, + mockedSignalingMessageSender + ) + + val mockedRandomIdDataChannel = Mockito.mock(DataChannel::class.java) + Mockito.`when`(mockedRandomIdDataChannel.label()).thenReturn("random-id") + Mockito.`when`(mockedRandomIdDataChannel.state()).thenReturn(DataChannel.State.OPEN) + peerConnectionObserverArgumentCaptor.value.onDataChannel(mockedRandomIdDataChannel) + + peerConnectionWrapper!!.send(DataChannelMessage("the-message-type")) + + Mockito.verify(mockedStatusDataChannel).send( + argThat(MatchesDataChannelMessage(DataChannelMessage("the-message-type"))) + ) + Mockito.verify(mockedRandomIdDataChannel, never()).send(any()) + } + @Test fun testReceiveDataChannelMessage() { Mockito.`when`( From 014b18de8923e9ab398521d7ae1b0d0c5b4f74c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Calvi=C3=B1o=20S=C3=A1nchez?= Date: Fri, 6 Dec 2024 03:08:15 +0100 Subject: [PATCH 08/16] Fix remote data channels not disposed when removing peer connection MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Daniel Calviño Sánchez --- .../talk/webrtc/PeerConnectionWrapper.java | 36 ++++++++++++---- .../talk/webrtc/PeerConnectionWrapperTest.kt | 43 +++++++++++++++++++ 2 files changed, 71 insertions(+), 8 deletions(-) diff --git a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java index 6413f0344..27ad9e439 100644 --- a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java +++ b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java @@ -34,6 +34,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -57,7 +58,7 @@ public class PeerConnectionWrapper { private PeerConnection peerConnection; private String sessionId; private final MediaConstraints mediaConstraints; - private DataChannel statusDataChannel; + private final Map dataChannels = new HashMap<>(); private final SdpObserver sdpObserver; private final boolean hasInitiated; @@ -144,8 +145,11 @@ public class PeerConnectionWrapper { if (hasMCU || hasInitiated) { DataChannel.Init init = new DataChannel.Init(); init.negotiated = false; - statusDataChannel = peerConnection.createDataChannel("status", init); + + DataChannel statusDataChannel = peerConnection.createDataChannel("status", init); statusDataChannel.registerObserver(new DataChannelObserver(statusDataChannel)); + dataChannels.put("status", statusDataChannel); + if (isMCUPublisher) { peerConnection.createOffer(sdpObserver, mediaConstraints); } else if (hasMCU && "video".equals(this.videoStreamType)) { @@ -233,13 +237,12 @@ public class PeerConnectionWrapper { public void removePeerConnection() { signalingMessageReceiver.removeListener(webRtcMessageListener); - if (statusDataChannel != null) { - statusDataChannel.dispose(); - statusDataChannel = null; - Log.d(TAG, "Disposed DataChannel"); - } else { - Log.d(TAG, "DataChannel is null."); + for (DataChannel dataChannel: dataChannels.values()) { + Log.d(TAG, "Disposed DataChannel " + dataChannel.label()); + + dataChannel.dispose(); } + dataChannels.clear(); if (peerConnection != null) { peerConnection.close(); @@ -283,6 +286,7 @@ public class PeerConnectionWrapper { */ public void send(DataChannelMessage dataChannelMessage) { ByteBuffer buffer; + DataChannel statusDataChannel = dataChannels.get("status"); if (statusDataChannel != null && dataChannelMessage != null) { try { buffer = ByteBuffer.wrap(LoganSquare.serialize(dataChannelMessage).getBytes()); @@ -525,7 +529,23 @@ public class PeerConnectionWrapper { @Override public void onDataChannel(DataChannel dataChannel) { + // Another data channel with the same label, no matter if the same instance or a different one, should not + // be added, but just in case. + DataChannel oldDataChannel = dataChannels.get(dataChannel.label()); + if (oldDataChannel == dataChannel) { + Log.w(TAG, "Data channel with label " + dataChannel.label() + " added again"); + + return; + } + + if (oldDataChannel != null) { + Log.w(TAG, "Data channel with label " + dataChannel.label() + " exists"); + + oldDataChannel.dispose(); + } + dataChannel.registerObserver(new DataChannelObserver(dataChannel)); + dataChannels.put(dataChannel.label(), dataChannel); } @Override diff --git a/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt b/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt index be628c2af..450cc4fa8 100644 --- a/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt +++ b/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt @@ -288,4 +288,47 @@ class PeerConnectionWrapperTest { Mockito.verify(mockedDataChannelMessageListener).onAudioOff() Mockito.verifyNoMoreInteractions(mockedDataChannelMessageListener) } + + @Test + fun testRemovePeerConnectionWithOpenRemoteDataChannel() { + val peerConnectionObserverArgumentCaptor: ArgumentCaptor = + ArgumentCaptor.forClass(PeerConnection.Observer::class.java) + + Mockito.`when`( + mockedPeerConnectionFactory!!.createPeerConnection( + any(PeerConnection.RTCConfiguration::class.java), + peerConnectionObserverArgumentCaptor.capture() + ) + ).thenReturn(mockedPeerConnection) + + val mockedStatusDataChannel = Mockito.mock(DataChannel::class.java) + Mockito.`when`(mockedStatusDataChannel.label()).thenReturn("status") + Mockito.`when`(mockedStatusDataChannel.state()).thenReturn(DataChannel.State.OPEN) + Mockito.`when`(mockedPeerConnection!!.createDataChannel(eq("status"), any())) + .thenReturn(mockedStatusDataChannel) + + peerConnectionWrapper = PeerConnectionWrapper( + mockedPeerConnectionFactory, + ArrayList(), + MediaConstraints(), + "the-session-id", + "the-local-session-id", + null, + true, + true, + "video", + mockedSignalingMessageReceiver, + mockedSignalingMessageSender + ) + + val mockedRandomIdDataChannel = Mockito.mock(DataChannel::class.java) + Mockito.`when`(mockedRandomIdDataChannel.label()).thenReturn("random-id") + Mockito.`when`(mockedRandomIdDataChannel.state()).thenReturn(DataChannel.State.OPEN) + peerConnectionObserverArgumentCaptor.value.onDataChannel(mockedRandomIdDataChannel) + + peerConnectionWrapper!!.removePeerConnection() + + Mockito.verify(mockedStatusDataChannel).dispose() + Mockito.verify(mockedRandomIdDataChannel).dispose() + } } From a3cf897d137bad3dc3bd3e58d7c5bf02feb044e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Calvi=C3=B1o=20S=C3=A1nchez?= Date: Fri, 6 Dec 2024 04:13:49 +0100 Subject: [PATCH 09/16] Move variable declaration into try block MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Daniel Calviño Sánchez --- .../java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java index 27ad9e439..08c6b8fcf 100644 --- a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java +++ b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java @@ -285,11 +285,10 @@ public class PeerConnectionWrapper { * @param dataChannelMessage the message to send */ public void send(DataChannelMessage dataChannelMessage) { - ByteBuffer buffer; DataChannel statusDataChannel = dataChannels.get("status"); if (statusDataChannel != null && dataChannelMessage != null) { try { - buffer = ByteBuffer.wrap(LoganSquare.serialize(dataChannelMessage).getBytes()); + ByteBuffer buffer = ByteBuffer.wrap(LoganSquare.serialize(dataChannelMessage).getBytes()); statusDataChannel.send(new DataChannel.Buffer(buffer, false)); } catch (Exception e) { Log.d(TAG, "Failed to send channel data, attempting regular " + dataChannelMessage); From f34bc17b36708d422f30d467a24f9ab0c1e72e8c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Calvi=C3=B1o=20S=C3=A1nchez?= Date: Fri, 6 Dec 2024 04:14:58 +0100 Subject: [PATCH 10/16] Rewrite method to return early MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Daniel Calviño Sánchez --- .../talk/webrtc/PeerConnectionWrapper.java | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java index 08c6b8fcf..9474471fe 100644 --- a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java +++ b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java @@ -286,13 +286,15 @@ public class PeerConnectionWrapper { */ public void send(DataChannelMessage dataChannelMessage) { DataChannel statusDataChannel = dataChannels.get("status"); - if (statusDataChannel != null && dataChannelMessage != null) { - try { - ByteBuffer buffer = ByteBuffer.wrap(LoganSquare.serialize(dataChannelMessage).getBytes()); - statusDataChannel.send(new DataChannel.Buffer(buffer, false)); - } catch (Exception e) { - Log.d(TAG, "Failed to send channel data, attempting regular " + dataChannelMessage); - } + if (statusDataChannel == null || dataChannelMessage == null) { + return; + } + + try { + ByteBuffer buffer = ByteBuffer.wrap(LoganSquare.serialize(dataChannelMessage).getBytes()); + statusDataChannel.send(new DataChannel.Buffer(buffer, false)); + } catch (Exception e) { + Log.d(TAG, "Failed to send channel data, attempting regular " + dataChannelMessage); } } From 4eaa2f3e2cbdb0f4e64453314706eb17ab9fe8c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Calvi=C3=B1o=20S=C3=A1nchez?= Date: Fri, 6 Dec 2024 04:15:46 +0100 Subject: [PATCH 11/16] Split condition MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Daniel Calviño Sánchez --- .../com/nextcloud/talk/webrtc/PeerConnectionWrapper.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java index 9474471fe..26f09d31e 100644 --- a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java +++ b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java @@ -285,8 +285,12 @@ public class PeerConnectionWrapper { * @param dataChannelMessage the message to send */ public void send(DataChannelMessage dataChannelMessage) { + if (dataChannelMessage == null) { + return; + } + DataChannel statusDataChannel = dataChannels.get("status"); - if (statusDataChannel == null || dataChannelMessage == null) { + if (statusDataChannel == null) { return; } From f387e576285a6d211c714e15d2d4c5401a3e0a77 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Calvi=C3=B1o=20S=C3=A1nchez?= Date: Fri, 6 Dec 2024 04:29:22 +0100 Subject: [PATCH 12/16] Queue data channel messages sent when data channel is not open MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Data channel messages can be sent only when the data channel is open. Otherwise the message is simply lost. Clients of the PeerConnectionWrapper do not need to be aware of that detail or keep track of whether the data channel was open already or not, so now data channel messages sent before the data channel is open are queued and sent once the data channel is opened. Signed-off-by: Daniel Calviño Sánchez --- .../talk/webrtc/PeerConnectionWrapper.java | 19 ++++++- .../talk/webrtc/PeerConnectionWrapperTest.kt | 50 +++++++++++++++++++ 2 files changed, 68 insertions(+), 1 deletion(-) diff --git a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java index 26f09d31e..be55a6863 100644 --- a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java +++ b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java @@ -59,6 +59,7 @@ public class PeerConnectionWrapper { private String sessionId; private final MediaConstraints mediaConstraints; private final Map dataChannels = new HashMap<>(); + private final List pendingDataChannelMessages = new ArrayList<>(); private final SdpObserver sdpObserver; private final boolean hasInitiated; @@ -281,6 +282,13 @@ public class PeerConnectionWrapper { * "status" data channel subscriber connections will receive it on a data channel with a different label, as * Janus opens its own data channel on subscriber connections and "multiplexes" all the received data channel * messages on it, independently of on which data channel they were originally sent. + *

+ * Data channel messages can be sent at any time; if the "status" data channel is not open yet the messages will be + * queued and sent once it is opened. Nevertheless, if Janus is used, it is not guaranteed that the messages will + * be received by other participants, as it is only known when the data channel of the publisher was opened, but + * not if the data channel of the subscribers was. However, in general this should be a concern only during the + * first seconds after a participant joins; after some time the subscriber connections should be established and + * their data channels open. * * @param dataChannelMessage the message to send */ @@ -290,7 +298,9 @@ public class PeerConnectionWrapper { } DataChannel statusDataChannel = dataChannels.get("status"); - if (statusDataChannel == null) { + if (statusDataChannel == null || statusDataChannel.state() != DataChannel.State.OPEN) { + pendingDataChannelMessages.add(dataChannelMessage); + return; } @@ -398,6 +408,13 @@ public class PeerConnectionWrapper { @Override public void onStateChange() { + if (dataChannel.state() == DataChannel.State.OPEN && "status".equals(dataChannel.label())) { + for (DataChannelMessage dataChannelMessage: pendingDataChannelMessages) { + send(dataChannelMessage); + } + pendingDataChannelMessages.clear(); + } + if (dataChannel.state() == DataChannel.State.OPEN) { sendInitialMediaStatus(); } diff --git a/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt b/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt index 450cc4fa8..61c5f5daa 100644 --- a/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt +++ b/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt @@ -145,6 +145,56 @@ class PeerConnectionWrapperTest { Mockito.verify(mockedRandomIdDataChannel, never()).send(any()) } + @Test + fun testSendDataChannelMessageBeforeOpeningDataChannel() { + Mockito.`when`( + mockedPeerConnectionFactory!!.createPeerConnection( + any(PeerConnection.RTCConfiguration::class.java), + any(PeerConnection.Observer::class.java) + ) + ).thenReturn(mockedPeerConnection) + + val mockedStatusDataChannel = Mockito.mock(DataChannel::class.java) + Mockito.`when`(mockedStatusDataChannel.label()).thenReturn("status") + Mockito.`when`(mockedStatusDataChannel.state()).thenReturn(DataChannel.State.CONNECTING) + Mockito.`when`(mockedPeerConnection!!.createDataChannel(eq("status"), any())) + .thenReturn(mockedStatusDataChannel) + + val statusDataChannelObserverArgumentCaptor: ArgumentCaptor = + ArgumentCaptor.forClass(DataChannel.Observer::class.java) + + doNothing().`when`(mockedStatusDataChannel).registerObserver(statusDataChannelObserverArgumentCaptor.capture()) + + peerConnectionWrapper = PeerConnectionWrapper( + mockedPeerConnectionFactory, + ArrayList(), + MediaConstraints(), + "the-session-id", + "the-local-session-id", + null, + true, + true, + "video", + mockedSignalingMessageReceiver, + mockedSignalingMessageSender + ) + + peerConnectionWrapper!!.send(DataChannelMessage("the-message-type")) + peerConnectionWrapper!!.send(DataChannelMessage("another-message-type")) + + Mockito.verify(mockedStatusDataChannel, never()).send(any()) + + Mockito.`when`(mockedStatusDataChannel.state()).thenReturn(DataChannel.State.OPEN) + statusDataChannelObserverArgumentCaptor.value.onStateChange() + + Mockito.verify(mockedStatusDataChannel).send( + argThat(MatchesDataChannelMessage(DataChannelMessage("the-message-type"))) + ) + Mockito.verify(mockedStatusDataChannel).send( + argThat(MatchesDataChannelMessage(DataChannelMessage("another-message-type"))) + ) + } + @Test fun testReceiveDataChannelMessage() { Mockito.`when`( From 89a69327c8faf8ea3158ab8442ce7435c514c85a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Calvi=C3=B1o=20S=C3=A1nchez?= Date: Fri, 6 Dec 2024 04:37:13 +0100 Subject: [PATCH 13/16] Add logs for sending data channel messages MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Daniel Calviño Sánchez --- .../com/nextcloud/talk/webrtc/PeerConnectionWrapper.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java index be55a6863..7bc1f0469 100644 --- a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java +++ b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java @@ -299,16 +299,20 @@ public class PeerConnectionWrapper { DataChannel statusDataChannel = dataChannels.get("status"); if (statusDataChannel == null || statusDataChannel.state() != DataChannel.State.OPEN) { + Log.d(TAG, "Queuing data channel message (" + dataChannelMessage + ") " + sessionId); + pendingDataChannelMessages.add(dataChannelMessage); return; } try { + Log.d(TAG, "Sending data channel message (" + dataChannelMessage + ") " + sessionId); + ByteBuffer buffer = ByteBuffer.wrap(LoganSquare.serialize(dataChannelMessage).getBytes()); statusDataChannel.send(new DataChannel.Buffer(buffer, false)); } catch (Exception e) { - Log.d(TAG, "Failed to send channel data, attempting regular " + dataChannelMessage); + Log.w(TAG, "Failed to send data channel message"); } } From bbeb85d8a39270a5e66454792e5e09f4c9213456 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Calvi=C3=B1o=20S=C3=A1nchez?= Date: Mon, 9 Dec 2024 03:11:54 +0100 Subject: [PATCH 14/16] Store data channel label MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Getting the label is no longer possible once the data channel has been disposed. This will help to make the observer thread-safe. Signed-off-by: Daniel Calviño Sánchez --- .../com/nextcloud/talk/webrtc/PeerConnectionWrapper.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java index 7bc1f0469..08da1f726 100644 --- a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java +++ b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java @@ -400,9 +400,11 @@ public class PeerConnectionWrapper { private class DataChannelObserver implements DataChannel.Observer { private final DataChannel dataChannel; + private final String dataChannelLabel; public DataChannelObserver(DataChannel dataChannel) { this.dataChannel = Objects.requireNonNull(dataChannel); + this.dataChannelLabel = dataChannel.label(); } @Override @@ -412,7 +414,7 @@ public class PeerConnectionWrapper { @Override public void onStateChange() { - if (dataChannel.state() == DataChannel.State.OPEN && "status".equals(dataChannel.label())) { + if (dataChannel.state() == DataChannel.State.OPEN && "status".equals(dataChannelLabel)) { for (DataChannelMessage dataChannelMessage: pendingDataChannelMessages) { send(dataChannelMessage); } @@ -427,7 +429,7 @@ public class PeerConnectionWrapper { @Override public void onMessage(DataChannel.Buffer buffer) { if (buffer.binary) { - Log.d(TAG, "Received binary data channel message over " + dataChannel.label() + " " + sessionId); + Log.d(TAG, "Received binary data channel message over " + dataChannelLabel + " " + sessionId); return; } @@ -435,7 +437,7 @@ public class PeerConnectionWrapper { final byte[] bytes = new byte[data.capacity()]; data.get(bytes); String strData = new String(bytes); - Log.d(TAG, "Received data channel message (" + strData + ") over " + dataChannel.label() + " " + sessionId); + Log.d(TAG, "Received data channel message (" + strData + ") over " + dataChannelLabel + " " + sessionId); DataChannelMessage dataChannelMessage; try { From cb81bca72a4e281d4c920ffc9561493bd3621861 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Calvi=C3=B1o=20S=C3=A1nchez?= Date: Sun, 8 Dec 2024 05:23:11 +0100 Subject: [PATCH 15/16] Fix "removePeerConnection" not being thread-safe MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adding and disposing remote data channels is done from different threads; they are added from the WebRTC signaling thread when "onDataChannel" is called, while they can be disposed potentially from any thread when "removePeerConnection" is called. To prevent race conditions between them now both operations are synchronized. However, as "onDataChannel" belongs to an inner class it needs to use a synchronized statement with the outer class lock. This could still cause a race condition if the same data channel was added again; this should not happen, but it is handled just in case. Moreover, once a data channel is disposed it can be no longer used, and trying to call any of its methods throws an "IllegalStateException". Due to this, as sending can be also done potentially from any thread, it needs to be synchronized too with removing the peer connection. State changes on data channels as well as receiving messages are also done in the WebRTC signaling thread. State changes needs synchronization as well, although receiving messages should not, as it does not directly use the data channel (and it is assumed that using the buffers of a disposed data channel is safe). Nevertheless a little check (which in this case requires synchronization) was added to ignore the received messages if the peer connection was removed already. Finally, the synchronization added to "send" and "onStateChange" had the nice side effect of making the pending data channel messages thread-safe too, as before it could happen that a message was enqueued when the pending messages were being sent, which caused a "ConcurrentModificationException". Signed-off-by: Daniel Calviño Sánchez --- .../talk/webrtc/PeerConnectionWrapper.java | 88 ++-- .../talk/webrtc/PeerConnectionWrapperTest.kt | 376 ++++++++++++++++++ 2 files changed, 440 insertions(+), 24 deletions(-) diff --git a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java index 08da1f726..13bb5f2f7 100644 --- a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java +++ b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java @@ -235,7 +235,7 @@ public class PeerConnectionWrapper { return stream; } - public void removePeerConnection() { + public synchronized void removePeerConnection() { signalingMessageReceiver.removeListener(webRtcMessageListener); for (DataChannel dataChannel: dataChannels.values()) { @@ -292,7 +292,7 @@ public class PeerConnectionWrapper { * * @param dataChannelMessage the message to send */ - public void send(DataChannelMessage dataChannelMessage) { + public synchronized void send(DataChannelMessage dataChannelMessage) { if (dataChannelMessage == null) { return; } @@ -414,20 +414,38 @@ public class PeerConnectionWrapper { @Override public void onStateChange() { - if (dataChannel.state() == DataChannel.State.OPEN && "status".equals(dataChannelLabel)) { - for (DataChannelMessage dataChannelMessage: pendingDataChannelMessages) { - send(dataChannelMessage); + synchronized (PeerConnectionWrapper.this) { + // The PeerConnection could have been removed in parallel even with the synchronization (as just after + // "onStateChange" was called "removePeerConnection" could have acquired the lock). + if (peerConnection == null) { + return; } - pendingDataChannelMessages.clear(); - } - if (dataChannel.state() == DataChannel.State.OPEN) { - sendInitialMediaStatus(); + if (dataChannel.state() == DataChannel.State.OPEN && "status".equals(dataChannelLabel)) { + for (DataChannelMessage dataChannelMessage : pendingDataChannelMessages) { + send(dataChannelMessage); + } + pendingDataChannelMessages.clear(); + } + + if (dataChannel.state() == DataChannel.State.OPEN) { + sendInitialMediaStatus(); + } } } @Override public void onMessage(DataChannel.Buffer buffer) { + synchronized (PeerConnectionWrapper.this) { + // It is assumed that, even if its data channel was disposed, its buffers can be used while there is + // a reference to them, so it would not be necessary to check this from a thread-safety point of view. + // Nevertheless, if the remote peer connection was removed it would not make sense to notify the + // listeners anyway. + if (peerConnection == null) { + return; + } + } + if (buffer.binary) { Log.d(TAG, "Received binary data channel message over " + dataChannelLabel + " " + sessionId); return; @@ -557,23 +575,45 @@ public class PeerConnectionWrapper { @Override public void onDataChannel(DataChannel dataChannel) { - // Another data channel with the same label, no matter if the same instance or a different one, should not - // be added, but just in case. - DataChannel oldDataChannel = dataChannels.get(dataChannel.label()); - if (oldDataChannel == dataChannel) { - Log.w(TAG, "Data channel with label " + dataChannel.label() + " added again"); + synchronized (PeerConnectionWrapper.this) { + // Another data channel with the same label, no matter if the same instance or a different one, should + // not be added, but this is handled just in case. + // Moreover, if it were possible that an already added data channel was added again there would be a + // potential race condition with "removePeerConnection", even with the synchronization, as it would + // be possible that "onDataChannel" was called, then "removePeerConnection" disposed the data + // channel, and then "onDataChannel" continued in the synchronized statements and tried to get the + // label, which would throw an exception due to the data channel having been disposed already. + String dataChannelLabel; + try { + dataChannelLabel = dataChannel.label(); + } catch (IllegalStateException e) { + // The data channel was disposed already, nothing to do. + return; + } - return; + DataChannel oldDataChannel = dataChannels.get(dataChannelLabel); + if (oldDataChannel == dataChannel) { + Log.w(TAG, "Data channel with label " + dataChannel.label() + " added again"); + + return; + } + + if (oldDataChannel != null) { + Log.w(TAG, "Data channel with label " + dataChannel.label() + " exists"); + + oldDataChannel.dispose(); + } + + // If the peer connection was removed in parallel dispose the data channel instead of adding it. + if (peerConnection == null) { + dataChannel.dispose(); + + return; + } + + dataChannel.registerObserver(new DataChannelObserver(dataChannel)); + dataChannels.put(dataChannel.label(), dataChannel); } - - if (oldDataChannel != null) { - Log.w(TAG, "Data channel with label " + dataChannel.label() + " exists"); - - oldDataChannel.dispose(); - } - - dataChannel.registerObserver(new DataChannelObserver(dataChannel)); - dataChannels.put(dataChannel.label(), dataChannel); } @Override diff --git a/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt b/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt index 61c5f5daa..f4f7e6443 100644 --- a/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt +++ b/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt @@ -19,15 +19,22 @@ import org.mockito.ArgumentMatchers.any import org.mockito.ArgumentMatchers.argThat import org.mockito.ArgumentMatchers.eq import org.mockito.Mockito +import org.mockito.Mockito.atLeast +import org.mockito.Mockito.atMostOnce +import org.mockito.Mockito.doAnswer import org.mockito.Mockito.doNothing import org.mockito.Mockito.never +import org.mockito.invocation.InvocationOnMock +import org.mockito.stubbing.Answer import org.webrtc.DataChannel import org.webrtc.MediaConstraints import org.webrtc.PeerConnection import org.webrtc.PeerConnectionFactory import java.nio.ByteBuffer import java.util.HashMap +import kotlin.concurrent.thread +@Suppress("LongMethod", "TooGenericExceptionCaught") class PeerConnectionWrapperTest { private var peerConnectionWrapper: PeerConnectionWrapper? = null @@ -36,6 +43,23 @@ class PeerConnectionWrapperTest { private var mockedSignalingMessageReceiver: SignalingMessageReceiver? = null private var mockedSignalingMessageSender: SignalingMessageSender? = null + /** + * Helper answer for DataChannel methods. + */ + private class ReturnValueOrThrowIfDisposed(val value: T) : + Answer { + override fun answer(currentInvocation: InvocationOnMock): T { + if (Mockito.mockingDetails(currentInvocation.mock).invocations.find { + it!!.method.name === "dispose" + } !== null + ) { + throw IllegalStateException("DataChannel has been disposed") + } + + return value + } + } + /** * Helper matcher for DataChannelMessages. */ @@ -195,6 +219,83 @@ class PeerConnectionWrapperTest { ) } + @Test + fun testSendDataChannelMessageBeforeOpeningDataChannelWithDifferentThreads() { + // A brute force approach is used to test race conditions between different threads just repeating the test + // several times. Due to this the test passing could be a false positive, as it could have been a matter of + // luck, but even if the test may wrongly pass sometimes it is better than nothing (although, in general, with + // that number of reruns, it fails when it should). + for (i in 1..1000) { + Mockito.`when`( + mockedPeerConnectionFactory!!.createPeerConnection( + any(PeerConnection.RTCConfiguration::class.java), + any(PeerConnection.Observer::class.java) + ) + ).thenReturn(mockedPeerConnection) + + val mockedStatusDataChannel = Mockito.mock(DataChannel::class.java) + Mockito.`when`(mockedStatusDataChannel.label()).thenReturn("status") + Mockito.`when`(mockedStatusDataChannel.state()).thenReturn(DataChannel.State.CONNECTING) + Mockito.`when`(mockedPeerConnection!!.createDataChannel(eq("status"), any())) + .thenReturn(mockedStatusDataChannel) + + val statusDataChannelObserverArgumentCaptor: ArgumentCaptor = + ArgumentCaptor.forClass(DataChannel.Observer::class.java) + + doNothing().`when`(mockedStatusDataChannel) + .registerObserver(statusDataChannelObserverArgumentCaptor.capture()) + + peerConnectionWrapper = PeerConnectionWrapper( + mockedPeerConnectionFactory, + ArrayList(), + MediaConstraints(), + "the-session-id", + "the-local-session-id", + null, + true, + true, + "video", + mockedSignalingMessageReceiver, + mockedSignalingMessageSender + ) + + val dataChannelMessageCount = 5 + + val sendThread = thread { + for (j in 1..dataChannelMessageCount) { + peerConnectionWrapper!!.send(DataChannelMessage("the-message-type-$j")) + } + } + + // Exceptions thrown in threads are not propagated to the main thread, so it needs to be explicitly done + // (for example, for ConcurrentModificationExceptions when iterating over the data channel messages). + var exceptionOnStateChange: Exception? = null + + val openDataChannelThread = thread { + Mockito.`when`(mockedStatusDataChannel.state()).thenReturn(DataChannel.State.OPEN) + + try { + statusDataChannelObserverArgumentCaptor.value.onStateChange() + } catch (e: Exception) { + exceptionOnStateChange = e + } + } + + sendThread.join() + openDataChannelThread.join() + + if (exceptionOnStateChange !== null) { + throw exceptionOnStateChange!! + } + + for (j in 1..dataChannelMessageCount) { + Mockito.verify(mockedStatusDataChannel).send( + argThat(MatchesDataChannelMessage(DataChannelMessage("the-message-type-$j"))) + ) + } + } + } + @Test fun testReceiveDataChannelMessage() { Mockito.`when`( @@ -381,4 +482,279 @@ class PeerConnectionWrapperTest { Mockito.verify(mockedStatusDataChannel).dispose() Mockito.verify(mockedRandomIdDataChannel).dispose() } + + @Test + fun testRemovePeerConnectionWhileAddingRemoteDataChannelsWithDifferentThreads() { + // A brute force approach is used to test race conditions between different threads just repeating the test + // several times. Due to this the test passing could be a false positive, as it could have been a matter of + // luck, but even if the test may wrongly pass sometimes it is better than nothing (although, in general, with + // that number of reruns, it fails when it should). + for (i in 1..1000) { + val peerConnectionObserverArgumentCaptor: ArgumentCaptor = + ArgumentCaptor.forClass(PeerConnection.Observer::class.java) + + Mockito.`when`( + mockedPeerConnectionFactory!!.createPeerConnection( + any(PeerConnection.RTCConfiguration::class.java), + peerConnectionObserverArgumentCaptor.capture() + ) + ).thenReturn(mockedPeerConnection) + + val mockedStatusDataChannel = Mockito.mock(DataChannel::class.java) + Mockito.`when`(mockedStatusDataChannel.label()).thenAnswer(ReturnValueOrThrowIfDisposed("status")) + Mockito.`when`(mockedStatusDataChannel.state()).thenAnswer( + ReturnValueOrThrowIfDisposed(DataChannel.State.OPEN) + ) + Mockito.`when`(mockedPeerConnection!!.createDataChannel(eq("status"), any())) + .thenReturn(mockedStatusDataChannel) + + peerConnectionWrapper = PeerConnectionWrapper( + mockedPeerConnectionFactory, + ArrayList(), + MediaConstraints(), + "the-session-id", + "the-local-session-id", + null, + true, + true, + "video", + mockedSignalingMessageReceiver, + mockedSignalingMessageSender + ) + + val dataChannelCount = 5 + + val mockedRandomIdDataChannels: MutableList = ArrayList() + val dataChannelObservers: MutableList = ArrayList() + for (j in 0.. + if (Mockito.mockingDetails(invocation.mock).invocations.find { + it!!.method.name === "dispose" + } !== null + ) { + throw IllegalStateException("DataChannel has been disposed") + } + + dataChannelObservers[j] = invocation.getArgument(0, DataChannel.Observer::class.java) + + null + }.`when`(mockedRandomIdDataChannels[j]).registerObserver(any()) + } + + val onDataChannelThread = thread { + // Add again "status" data channel to test that it is correctly disposed also in that case (which + // should not happen anyway even if it was added by the remote peer, but just in case) + peerConnectionObserverArgumentCaptor.value.onDataChannel(mockedStatusDataChannel) + + for (j in 0.. = + ArgumentCaptor.forClass(PeerConnection.Observer::class.java) + + Mockito.`when`( + mockedPeerConnectionFactory!!.createPeerConnection( + any(PeerConnection.RTCConfiguration::class.java), + peerConnectionObserverArgumentCaptor.capture() + ) + ).thenReturn(mockedPeerConnection) + + val mockedStatusDataChannel = Mockito.mock(DataChannel::class.java) + + Mockito.`when`(mockedStatusDataChannel.label()).thenAnswer(ReturnValueOrThrowIfDisposed("status")) + Mockito.`when`(mockedStatusDataChannel.state()) + .thenAnswer(ReturnValueOrThrowIfDisposed(DataChannel.State.OPEN)) + Mockito.`when`(mockedStatusDataChannel.send(any())).thenAnswer(ReturnValueOrThrowIfDisposed(true)) + Mockito.`when`(mockedPeerConnection!!.createDataChannel(eq("status"), any())) + .thenReturn(mockedStatusDataChannel) + + peerConnectionWrapper = PeerConnectionWrapper( + mockedPeerConnectionFactory, + ArrayList(), + MediaConstraints(), + "the-session-id", + "the-local-session-id", + null, + true, + true, + "video", + mockedSignalingMessageReceiver, + mockedSignalingMessageSender + ) + + val dataChannelMessageCount = 5 + + // Exceptions thrown in threads are not propagated to the main thread, so it needs to be explicitly done + // (for example, for IllegalStateExceptions when using a disposed data channel). + var exceptionSend: Exception? = null + + val sendThread = thread { + try { + for (j in 0.. = + ArgumentCaptor.forClass(PeerConnection.Observer::class.java) + + Mockito.`when`( + mockedPeerConnectionFactory!!.createPeerConnection( + any(PeerConnection.RTCConfiguration::class.java), + peerConnectionObserverArgumentCaptor.capture() + ) + ).thenReturn(mockedPeerConnection) + + val mockedStatusDataChannel = Mockito.mock(DataChannel::class.java) + Mockito.`when`(mockedStatusDataChannel.label()).thenAnswer(ReturnValueOrThrowIfDisposed("status")) + Mockito.`when`(mockedStatusDataChannel.state()).thenAnswer( + ReturnValueOrThrowIfDisposed(DataChannel.State.OPEN) + ) + Mockito.`when`(mockedPeerConnection!!.createDataChannel(eq("status"), any())) + .thenReturn(mockedStatusDataChannel) + + val statusDataChannelObserverArgumentCaptor: ArgumentCaptor = + ArgumentCaptor.forClass(DataChannel.Observer::class.java) + + doNothing().`when`(mockedStatusDataChannel) + .registerObserver(statusDataChannelObserverArgumentCaptor.capture()) + + peerConnectionWrapper = PeerConnectionWrapper( + mockedPeerConnectionFactory, + ArrayList(), + MediaConstraints(), + "the-session-id", + "the-local-session-id", + null, + true, + true, + "video", + mockedSignalingMessageReceiver, + mockedSignalingMessageSender + ) + + val mockedDataChannelMessageListener = Mockito.mock(DataChannelMessageListener::class.java) + peerConnectionWrapper!!.addListener(mockedDataChannelMessageListener) + + // Exceptions thrown in threads are not propagated to the main thread, so it needs to be explicitly done + // (for example, for IllegalStateExceptions when using a disposed data channel). + var exceptionOnMessage: Exception? = null + + val onMessageThread = thread { + try { + // It is assumed that, even if its data channel was disposed, its buffers can be used while there + // is a reference to them, so no special mock behaviour is added to throw an exception in that case. + statusDataChannelObserverArgumentCaptor.value.onMessage( + dataChannelMessageToBuffer(DataChannelMessage("audioOn")) + ) + + statusDataChannelObserverArgumentCaptor.value.onMessage( + dataChannelMessageToBuffer(DataChannelMessage("audioOff")) + ) + } catch (e: Exception) { + exceptionOnMessage = e + } + } + + val removePeerConnectionThread = thread { + peerConnectionWrapper!!.removePeerConnection() + } + + onMessageThread.join() + removePeerConnectionThread.join() + + if (exceptionOnMessage !== null) { + throw exceptionOnMessage!! + } + + Mockito.verify(mockedStatusDataChannel).registerObserver(any()) + Mockito.verify(mockedStatusDataChannel).dispose() + Mockito.verify(mockedStatusDataChannel, atLeast(0)).label() + Mockito.verify(mockedStatusDataChannel, atLeast(0)).state() + Mockito.verifyNoMoreInteractions(mockedStatusDataChannel) + Mockito.verify(mockedDataChannelMessageListener, atMostOnce()).onAudioOn() + Mockito.verify(mockedDataChannelMessageListener, atMostOnce()).onAudioOff() + Mockito.verifyNoMoreInteractions(mockedDataChannelMessageListener) + } + } } From f4380f08339b3559db258dfc8839363c792d9ba7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Calvi=C3=B1o=20S=C3=A1nchez?= Date: Mon, 9 Dec 2024 18:36:22 +0100 Subject: [PATCH 16/16] Fix "send" not respecting order of pending messages MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the data channel is not open yet data channel messages are queued and then sent once opened. "onStateChange" is called from the WebRTC signaling thread, while "send" can be called potentially from any thread, so to send the data channel messages in the same order that they were added new messages need to be enqueued until all the pending messages have been sent. Otherwise, even if there is synchronization already, it could happen that "onStateChange" was called but, before getting the lock, "send" gets it and sends the new message before the pending messages were sent. Signed-off-by: Daniel Calviño Sánchez --- .../com/nextcloud/talk/webrtc/PeerConnectionWrapper.java | 9 +++++++-- .../nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt | 5 ++++- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java index 13bb5f2f7..9f2a90f85 100644 --- a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java +++ b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java @@ -298,7 +298,8 @@ public class PeerConnectionWrapper { } DataChannel statusDataChannel = dataChannels.get("status"); - if (statusDataChannel == null || statusDataChannel.state() != DataChannel.State.OPEN) { + if (statusDataChannel == null || statusDataChannel.state() != DataChannel.State.OPEN || + !pendingDataChannelMessages.isEmpty()) { Log.d(TAG, "Queuing data channel message (" + dataChannelMessage + ") " + sessionId); pendingDataChannelMessages.add(dataChannelMessage); @@ -306,6 +307,10 @@ public class PeerConnectionWrapper { return; } + sendWithoutQueuing(statusDataChannel, dataChannelMessage); + } + + private void sendWithoutQueuing(DataChannel statusDataChannel, DataChannelMessage dataChannelMessage) { try { Log.d(TAG, "Sending data channel message (" + dataChannelMessage + ") " + sessionId); @@ -423,7 +428,7 @@ public class PeerConnectionWrapper { if (dataChannel.state() == DataChannel.State.OPEN && "status".equals(dataChannelLabel)) { for (DataChannelMessage dataChannelMessage : pendingDataChannelMessages) { - send(dataChannelMessage); + sendWithoutQueuing(dataChannel, dataChannelMessage); } pendingDataChannelMessages.clear(); } diff --git a/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt b/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt index f4f7e6443..b6dd40962 100644 --- a/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt +++ b/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt @@ -23,6 +23,7 @@ import org.mockito.Mockito.atLeast import org.mockito.Mockito.atMostOnce import org.mockito.Mockito.doAnswer import org.mockito.Mockito.doNothing +import org.mockito.Mockito.inOrder import org.mockito.Mockito.never import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer @@ -288,8 +289,10 @@ class PeerConnectionWrapperTest { throw exceptionOnStateChange!! } + val inOrder = inOrder(mockedStatusDataChannel) + for (j in 1..dataChannelMessageCount) { - Mockito.verify(mockedStatusDataChannel).send( + inOrder.verify(mockedStatusDataChannel).send( argThat(MatchesDataChannelMessage(DataChannelMessage("the-message-type-$j"))) ) }