diff --git a/app/src/main/java/com/nextcloud/talk/activities/CallActivity.kt b/app/src/main/java/com/nextcloud/talk/activities/CallActivity.kt index 9220952c0..3bcc7611b 100644 --- a/app/src/main/java/com/nextcloud/talk/activities/CallActivity.kt +++ b/app/src/main/java/com/nextcloud/talk/activities/CallActivity.kt @@ -1174,12 +1174,12 @@ class CallActivity : CallBaseActivity() { if (isConnectionEstablished && othersInCall) { if (!hasMCU) { for (peerConnectionWrapper in peerConnectionWrapperList) { - peerConnectionWrapper.sendChannelData(DataChannelMessage(isSpeakingMessage)) + peerConnectionWrapper.send(DataChannelMessage(isSpeakingMessage)) } } else { for (peerConnectionWrapper in peerConnectionWrapperList) { if (peerConnectionWrapper.sessionId == webSocketClient!!.sessionId) { - peerConnectionWrapper.sendChannelData(DataChannelMessage(isSpeakingMessage)) + peerConnectionWrapper.send(DataChannelMessage(isSpeakingMessage)) break } } @@ -1370,12 +1370,12 @@ class CallActivity : CallBaseActivity() { if (isConnectionEstablished) { if (!hasMCU) { for (peerConnectionWrapper in peerConnectionWrapperList) { - peerConnectionWrapper.sendChannelData(DataChannelMessage(message)) + peerConnectionWrapper.send(DataChannelMessage(message)) } } else { for (peerConnectionWrapper in peerConnectionWrapperList) { if (peerConnectionWrapper.sessionId == webSocketClient!!.sessionId) { - peerConnectionWrapper.sendChannelData(DataChannelMessage(message)) + peerConnectionWrapper.send(DataChannelMessage(message)) break } } @@ -2563,7 +2563,7 @@ class CallActivity : CallBaseActivity() { } override fun onNext(aLong: Long) { - peerConnectionWrapper.sendChannelData(dataChannelMessage) + peerConnectionWrapper.send(dataChannelMessage) } override fun onError(e: Throwable) { diff --git a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java index fe44e797d..9f2a90f85 100644 --- a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java +++ b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java @@ -7,11 +7,9 @@ */ package com.nextcloud.talk.webrtc; -import android.content.Context; import android.util.Log; import com.bluelinelabs.logansquare.LoganSquare; -import com.nextcloud.talk.application.NextcloudTalkApplication; import com.nextcloud.talk.models.json.signaling.DataChannelMessage; import com.nextcloud.talk.models.json.signaling.NCIceCandidate; import com.nextcloud.talk.models.json.signaling.NCMessagePayload; @@ -36,21 +34,15 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; -import javax.inject.Inject; - import androidx.annotation.Nullable; -import autodagger.AutoInjector; -@AutoInjector(NextcloudTalkApplication.class) public class PeerConnectionWrapper { - @Inject - Context context; - private static final String TAG = PeerConnectionWrapper.class.getCanonicalName(); private final SignalingMessageReceiver signalingMessageReceiver; @@ -66,7 +58,8 @@ public class PeerConnectionWrapper { private PeerConnection peerConnection; private String sessionId; private final MediaConstraints mediaConstraints; - private DataChannel dataChannel; + private final Map dataChannels = new HashMap<>(); + private final List pendingDataChannelMessages = new ArrayList<>(); private final SdpObserver sdpObserver; private final boolean hasInitiated; @@ -81,6 +74,9 @@ public class PeerConnectionWrapper { /** * Listener for data channel messages. *

+ * Messages might have been received on any data channel, independently of its label or whether it was open by the + * local or the remote peer. + *

* The messages are bound to a specific peer connection, so each listener is expected to handle messages only for * a single peer connection. *

@@ -117,9 +113,6 @@ public class PeerConnectionWrapper { boolean isMCUPublisher, boolean hasMCU, String videoStreamType, SignalingMessageReceiver signalingMessageReceiver, SignalingMessageSender signalingMessageSender) { - - Objects.requireNonNull(NextcloudTalkApplication.Companion.getSharedApplication()).getComponentApplication().inject(this); - this.localStream = localStream; this.videoStreamType = videoStreamType; @@ -153,8 +146,11 @@ public class PeerConnectionWrapper { if (hasMCU || hasInitiated) { DataChannel.Init init = new DataChannel.Init(); init.negotiated = false; - dataChannel = peerConnection.createDataChannel("status", init); - dataChannel.registerObserver(new DataChannelObserver()); + + DataChannel statusDataChannel = peerConnection.createDataChannel("status", init); + statusDataChannel.registerObserver(new DataChannelObserver(statusDataChannel)); + dataChannels.put("status", statusDataChannel); + if (isMCUPublisher) { peerConnection.createOffer(sdpObserver, mediaConstraints); } else if (hasMCU && "video".equals(this.videoStreamType)) { @@ -239,16 +235,15 @@ public class PeerConnectionWrapper { return stream; } - public void removePeerConnection() { + public synchronized void removePeerConnection() { signalingMessageReceiver.removeListener(webRtcMessageListener); - if (dataChannel != null) { + for (DataChannel dataChannel: dataChannels.values()) { + Log.d(TAG, "Disposed DataChannel " + dataChannel.label()); + dataChannel.dispose(); - dataChannel = null; - Log.d(TAG, "Disposed DataChannel"); - } else { - Log.d(TAG, "DataChannel is null."); } + dataChannels.clear(); if (peerConnection != null) { peerConnection.close(); @@ -278,15 +273,51 @@ public class PeerConnectionWrapper { } } - public void sendChannelData(DataChannelMessage dataChannelMessage) { - ByteBuffer buffer; - if (dataChannel != null && dataChannelMessage != null) { - try { - buffer = ByteBuffer.wrap(LoganSquare.serialize(dataChannelMessage).getBytes()); - dataChannel.send(new DataChannel.Buffer(buffer, false)); - } catch (Exception e) { - Log.d(TAG, "Failed to send channel data, attempting regular " + dataChannelMessage); - } + /** + * Sends a data channel message. + *

+ * Data channel messages are always sent on the "status" data channel locally opened. However, if Janus is used, + * messages can be sent only on publisher connections, even if subscriber connections have a "status" data channel; + * messages sent on subscriber connections will be simply ignored. Moreover, even if the message is sent on the + * "status" data channel subscriber connections will receive it on a data channel with a different label, as + * Janus opens its own data channel on subscriber connections and "multiplexes" all the received data channel + * messages on it, independently of on which data channel they were originally sent. + *

+ * Data channel messages can be sent at any time; if the "status" data channel is not open yet the messages will be + * queued and sent once it is opened. Nevertheless, if Janus is used, it is not guaranteed that the messages will + * be received by other participants, as it is only known when the data channel of the publisher was opened, but + * not if the data channel of the subscribers was. However, in general this should be a concern only during the + * first seconds after a participant joins; after some time the subscriber connections should be established and + * their data channels open. + * + * @param dataChannelMessage the message to send + */ + public synchronized void send(DataChannelMessage dataChannelMessage) { + if (dataChannelMessage == null) { + return; + } + + DataChannel statusDataChannel = dataChannels.get("status"); + if (statusDataChannel == null || statusDataChannel.state() != DataChannel.State.OPEN || + !pendingDataChannelMessages.isEmpty()) { + Log.d(TAG, "Queuing data channel message (" + dataChannelMessage + ") " + sessionId); + + pendingDataChannelMessages.add(dataChannelMessage); + + return; + } + + sendWithoutQueuing(statusDataChannel, dataChannelMessage); + } + + private void sendWithoutQueuing(DataChannel statusDataChannel, DataChannelMessage dataChannelMessage) { + try { + Log.d(TAG, "Sending data channel message (" + dataChannelMessage + ") " + sessionId); + + ByteBuffer buffer = ByteBuffer.wrap(LoganSquare.serialize(dataChannelMessage).getBytes()); + statusDataChannel.send(new DataChannel.Buffer(buffer, false)); + } catch (Exception e) { + Log.w(TAG, "Failed to send data channel message"); } } @@ -301,15 +332,15 @@ public class PeerConnectionWrapper { private void sendInitialMediaStatus() { if (localStream != null) { if (localStream.videoTracks.size() == 1 && localStream.videoTracks.get(0).enabled()) { - sendChannelData(new DataChannelMessage("videoOn")); + send(new DataChannelMessage("videoOn")); } else { - sendChannelData(new DataChannelMessage("videoOff")); + send(new DataChannelMessage("videoOff")); } if (localStream.audioTracks.size() == 1 && localStream.audioTracks.get(0).enabled()) { - sendChannelData(new DataChannelMessage("audioOn")); + send(new DataChannelMessage("audioOn")); } else { - sendChannelData(new DataChannelMessage("audioOff")); + send(new DataChannelMessage("audioOff")); } } } @@ -373,6 +404,14 @@ public class PeerConnectionWrapper { private class DataChannelObserver implements DataChannel.Observer { + private final DataChannel dataChannel; + private final String dataChannelLabel; + + public DataChannelObserver(DataChannel dataChannel) { + this.dataChannel = Objects.requireNonNull(dataChannel); + this.dataChannelLabel = dataChannel.label(); + } + @Override public void onBufferedAmountChange(long l) { @@ -380,16 +419,40 @@ public class PeerConnectionWrapper { @Override public void onStateChange() { - if (dataChannel != null && - dataChannel.state() == DataChannel.State.OPEN) { - sendInitialMediaStatus(); + synchronized (PeerConnectionWrapper.this) { + // The PeerConnection could have been removed in parallel even with the synchronization (as just after + // "onStateChange" was called "removePeerConnection" could have acquired the lock). + if (peerConnection == null) { + return; + } + + if (dataChannel.state() == DataChannel.State.OPEN && "status".equals(dataChannelLabel)) { + for (DataChannelMessage dataChannelMessage : pendingDataChannelMessages) { + sendWithoutQueuing(dataChannel, dataChannelMessage); + } + pendingDataChannelMessages.clear(); + } + + if (dataChannel.state() == DataChannel.State.OPEN) { + sendInitialMediaStatus(); + } } } @Override public void onMessage(DataChannel.Buffer buffer) { + synchronized (PeerConnectionWrapper.this) { + // It is assumed that, even if its data channel was disposed, its buffers can be used while there is + // a reference to them, so it would not be necessary to check this from a thread-safety point of view. + // Nevertheless, if the remote peer connection was removed it would not make sense to notify the + // listeners anyway. + if (peerConnection == null) { + return; + } + } + if (buffer.binary) { - Log.d(TAG, "Received binary msg over " + TAG + " " + sessionId); + Log.d(TAG, "Received binary data channel message over " + dataChannelLabel + " " + sessionId); return; } @@ -397,7 +460,7 @@ public class PeerConnectionWrapper { final byte[] bytes = new byte[data.capacity()]; data.get(bytes); String strData = new String(bytes); - Log.d(TAG, "Got msg: " + strData + " over " + TAG + " " + sessionId); + Log.d(TAG, "Received data channel message (" + strData + ") over " + dataChannelLabel + " " + sessionId); DataChannelMessage dataChannelMessage; try { @@ -517,12 +580,45 @@ public class PeerConnectionWrapper { @Override public void onDataChannel(DataChannel dataChannel) { - if (PeerConnectionWrapper.this.dataChannel != null) { - Log.w(TAG, "Data channel with label " + PeerConnectionWrapper.this.dataChannel.label() - + " exists, but received onDataChannel event for DataChannel with label " + dataChannel.label()); + synchronized (PeerConnectionWrapper.this) { + // Another data channel with the same label, no matter if the same instance or a different one, should + // not be added, but this is handled just in case. + // Moreover, if it were possible that an already added data channel was added again there would be a + // potential race condition with "removePeerConnection", even with the synchronization, as it would + // be possible that "onDataChannel" was called, then "removePeerConnection" disposed the data + // channel, and then "onDataChannel" continued in the synchronized statements and tried to get the + // label, which would throw an exception due to the data channel having been disposed already. + String dataChannelLabel; + try { + dataChannelLabel = dataChannel.label(); + } catch (IllegalStateException e) { + // The data channel was disposed already, nothing to do. + return; + } + + DataChannel oldDataChannel = dataChannels.get(dataChannelLabel); + if (oldDataChannel == dataChannel) { + Log.w(TAG, "Data channel with label " + dataChannel.label() + " added again"); + + return; + } + + if (oldDataChannel != null) { + Log.w(TAG, "Data channel with label " + dataChannel.label() + " exists"); + + oldDataChannel.dispose(); + } + + // If the peer connection was removed in parallel dispose the data channel instead of adding it. + if (peerConnection == null) { + dataChannel.dispose(); + + return; + } + + dataChannel.registerObserver(new DataChannelObserver(dataChannel)); + dataChannels.put(dataChannel.label(), dataChannel); } - PeerConnectionWrapper.this.dataChannel = dataChannel; - PeerConnectionWrapper.this.dataChannel.registerObserver(new DataChannelObserver()); } @Override diff --git a/app/src/test/java/android/util/Log.java b/app/src/test/java/android/util/Log.java new file mode 100644 index 000000000..7090f73cd --- /dev/null +++ b/app/src/test/java/android/util/Log.java @@ -0,0 +1,51 @@ +/* + * Nextcloud Talk - Android Client + * + * SPDX-FileCopyrightText: 2024 Daniel Calviño Sánchez + * SPDX-License-Identifier: GPL-3.0-or-later + */ +package android.util; + +/** + * Dummy implementation of android.util.Log to be used in unit tests. + *

+ * The Android Gradle plugin provides a library with the APIs of the Android framework that throws an exception if any + * of them are called. This class is loaded before that library and therefore becomes the implementation used during the + * tests, simply printing the messages to the system console. + */ +public class Log { + + public static int d(String tag, String msg) { + System.out.println("DEBUG: " + tag + ": " + msg); + + return 1; + } + + public static int e(String tag, String msg) { + System.out.println("ERROR: " + tag + ": " + msg); + + return 1; + } + + public static int i(String tag, String msg) { + System.out.println("INFO: " + tag + ": " + msg); + + return 1; + } + + public static boolean isLoggable(String tag, int level) { + return true; + } + + public static int v(String tag, String msg) { + System.out.println("VERBOSE: " + tag + ": " + msg); + + return 1; + } + + public static int w(String tag, String msg) { + System.out.println("WARN: " + tag + ": " + msg); + + return 1; + } +} diff --git a/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt b/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt new file mode 100644 index 000000000..b6dd40962 --- /dev/null +++ b/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt @@ -0,0 +1,763 @@ +/* + * Nextcloud Talk - Android Client + * + * SPDX-FileCopyrightText: 2024 Daniel Calviño Sánchez + * SPDX-License-Identifier: GPL-3.0-or-later + */ +package com.nextcloud.talk.webrtc + +import com.bluelinelabs.logansquare.LoganSquare +import com.nextcloud.talk.models.json.signaling.DataChannelMessage +import com.nextcloud.talk.signaling.SignalingMessageReceiver +import com.nextcloud.talk.signaling.SignalingMessageSender +import com.nextcloud.talk.webrtc.PeerConnectionWrapper.DataChannelMessageListener +import org.junit.Before +import org.junit.Test +import org.mockito.ArgumentCaptor +import org.mockito.ArgumentMatcher +import org.mockito.ArgumentMatchers.any +import org.mockito.ArgumentMatchers.argThat +import org.mockito.ArgumentMatchers.eq +import org.mockito.Mockito +import org.mockito.Mockito.atLeast +import org.mockito.Mockito.atMostOnce +import org.mockito.Mockito.doAnswer +import org.mockito.Mockito.doNothing +import org.mockito.Mockito.inOrder +import org.mockito.Mockito.never +import org.mockito.invocation.InvocationOnMock +import org.mockito.stubbing.Answer +import org.webrtc.DataChannel +import org.webrtc.MediaConstraints +import org.webrtc.PeerConnection +import org.webrtc.PeerConnectionFactory +import java.nio.ByteBuffer +import java.util.HashMap +import kotlin.concurrent.thread + +@Suppress("LongMethod", "TooGenericExceptionCaught") +class PeerConnectionWrapperTest { + + private var peerConnectionWrapper: PeerConnectionWrapper? = null + private var mockedPeerConnection: PeerConnection? = null + private var mockedPeerConnectionFactory: PeerConnectionFactory? = null + private var mockedSignalingMessageReceiver: SignalingMessageReceiver? = null + private var mockedSignalingMessageSender: SignalingMessageSender? = null + + /** + * Helper answer for DataChannel methods. + */ + private class ReturnValueOrThrowIfDisposed(val value: T) : + Answer { + override fun answer(currentInvocation: InvocationOnMock): T { + if (Mockito.mockingDetails(currentInvocation.mock).invocations.find { + it!!.method.name === "dispose" + } !== null + ) { + throw IllegalStateException("DataChannel has been disposed") + } + + return value + } + } + + /** + * Helper matcher for DataChannelMessages. + */ + private inner class MatchesDataChannelMessage( + private val expectedDataChannelMessage: DataChannelMessage + ) : ArgumentMatcher { + override fun matches(buffer: DataChannel.Buffer): Boolean { + // DataChannel.Buffer does not implement "equals", so the comparison needs to be done on the ByteBuffer + // instead. + return dataChannelMessageToBuffer(expectedDataChannelMessage).data.equals(buffer.data) + } + } + + private fun dataChannelMessageToBuffer(dataChannelMessage: DataChannelMessage): DataChannel.Buffer { + return DataChannel.Buffer( + ByteBuffer.wrap(LoganSquare.serialize(dataChannelMessage).toByteArray()), + false + ) + } + + @Before + fun setUp() { + mockedPeerConnection = Mockito.mock(PeerConnection::class.java) + mockedPeerConnectionFactory = Mockito.mock(PeerConnectionFactory::class.java) + mockedSignalingMessageReceiver = Mockito.mock(SignalingMessageReceiver::class.java) + mockedSignalingMessageSender = Mockito.mock(SignalingMessageSender::class.java) + } + + @Test + fun testSendDataChannelMessage() { + Mockito.`when`( + mockedPeerConnectionFactory!!.createPeerConnection( + any(PeerConnection.RTCConfiguration::class.java), + any(PeerConnection.Observer::class.java) + ) + ).thenReturn(mockedPeerConnection) + + val mockedStatusDataChannel = Mockito.mock(DataChannel::class.java) + Mockito.`when`(mockedStatusDataChannel.label()).thenReturn("status") + Mockito.`when`(mockedStatusDataChannel.state()).thenReturn(DataChannel.State.OPEN) + Mockito.`when`(mockedPeerConnection!!.createDataChannel(eq("status"), any())) + .thenReturn(mockedStatusDataChannel) + + peerConnectionWrapper = PeerConnectionWrapper( + mockedPeerConnectionFactory, + ArrayList(), + MediaConstraints(), + "the-session-id", + "the-local-session-id", + null, + true, + true, + "video", + mockedSignalingMessageReceiver, + mockedSignalingMessageSender + ) + + peerConnectionWrapper!!.send(DataChannelMessage("the-message-type")) + + Mockito.verify(mockedStatusDataChannel).send( + argThat(MatchesDataChannelMessage(DataChannelMessage("the-message-type"))) + ) + } + + @Test + fun testSendDataChannelMessageWithOpenRemoteDataChannel() { + val peerConnectionObserverArgumentCaptor: ArgumentCaptor = + ArgumentCaptor.forClass(PeerConnection.Observer::class.java) + + Mockito.`when`( + mockedPeerConnectionFactory!!.createPeerConnection( + any(PeerConnection.RTCConfiguration::class.java), + peerConnectionObserverArgumentCaptor.capture() + ) + ).thenReturn(mockedPeerConnection) + + val mockedStatusDataChannel = Mockito.mock(DataChannel::class.java) + Mockito.`when`(mockedStatusDataChannel.label()).thenReturn("status") + Mockito.`when`(mockedStatusDataChannel.state()).thenReturn(DataChannel.State.OPEN) + Mockito.`when`(mockedPeerConnection!!.createDataChannel(eq("status"), any())) + .thenReturn(mockedStatusDataChannel) + + peerConnectionWrapper = PeerConnectionWrapper( + mockedPeerConnectionFactory, + ArrayList(), + MediaConstraints(), + "the-session-id", + "the-local-session-id", + null, + true, + true, + "video", + mockedSignalingMessageReceiver, + mockedSignalingMessageSender + ) + + val mockedRandomIdDataChannel = Mockito.mock(DataChannel::class.java) + Mockito.`when`(mockedRandomIdDataChannel.label()).thenReturn("random-id") + Mockito.`when`(mockedRandomIdDataChannel.state()).thenReturn(DataChannel.State.OPEN) + peerConnectionObserverArgumentCaptor.value.onDataChannel(mockedRandomIdDataChannel) + + peerConnectionWrapper!!.send(DataChannelMessage("the-message-type")) + + Mockito.verify(mockedStatusDataChannel).send( + argThat(MatchesDataChannelMessage(DataChannelMessage("the-message-type"))) + ) + Mockito.verify(mockedRandomIdDataChannel, never()).send(any()) + } + + @Test + fun testSendDataChannelMessageBeforeOpeningDataChannel() { + Mockito.`when`( + mockedPeerConnectionFactory!!.createPeerConnection( + any(PeerConnection.RTCConfiguration::class.java), + any(PeerConnection.Observer::class.java) + ) + ).thenReturn(mockedPeerConnection) + + val mockedStatusDataChannel = Mockito.mock(DataChannel::class.java) + Mockito.`when`(mockedStatusDataChannel.label()).thenReturn("status") + Mockito.`when`(mockedStatusDataChannel.state()).thenReturn(DataChannel.State.CONNECTING) + Mockito.`when`(mockedPeerConnection!!.createDataChannel(eq("status"), any())) + .thenReturn(mockedStatusDataChannel) + + val statusDataChannelObserverArgumentCaptor: ArgumentCaptor = + ArgumentCaptor.forClass(DataChannel.Observer::class.java) + + doNothing().`when`(mockedStatusDataChannel).registerObserver(statusDataChannelObserverArgumentCaptor.capture()) + + peerConnectionWrapper = PeerConnectionWrapper( + mockedPeerConnectionFactory, + ArrayList(), + MediaConstraints(), + "the-session-id", + "the-local-session-id", + null, + true, + true, + "video", + mockedSignalingMessageReceiver, + mockedSignalingMessageSender + ) + + peerConnectionWrapper!!.send(DataChannelMessage("the-message-type")) + peerConnectionWrapper!!.send(DataChannelMessage("another-message-type")) + + Mockito.verify(mockedStatusDataChannel, never()).send(any()) + + Mockito.`when`(mockedStatusDataChannel.state()).thenReturn(DataChannel.State.OPEN) + statusDataChannelObserverArgumentCaptor.value.onStateChange() + + Mockito.verify(mockedStatusDataChannel).send( + argThat(MatchesDataChannelMessage(DataChannelMessage("the-message-type"))) + ) + Mockito.verify(mockedStatusDataChannel).send( + argThat(MatchesDataChannelMessage(DataChannelMessage("another-message-type"))) + ) + } + + @Test + fun testSendDataChannelMessageBeforeOpeningDataChannelWithDifferentThreads() { + // A brute force approach is used to test race conditions between different threads just repeating the test + // several times. Due to this the test passing could be a false positive, as it could have been a matter of + // luck, but even if the test may wrongly pass sometimes it is better than nothing (although, in general, with + // that number of reruns, it fails when it should). + for (i in 1..1000) { + Mockito.`when`( + mockedPeerConnectionFactory!!.createPeerConnection( + any(PeerConnection.RTCConfiguration::class.java), + any(PeerConnection.Observer::class.java) + ) + ).thenReturn(mockedPeerConnection) + + val mockedStatusDataChannel = Mockito.mock(DataChannel::class.java) + Mockito.`when`(mockedStatusDataChannel.label()).thenReturn("status") + Mockito.`when`(mockedStatusDataChannel.state()).thenReturn(DataChannel.State.CONNECTING) + Mockito.`when`(mockedPeerConnection!!.createDataChannel(eq("status"), any())) + .thenReturn(mockedStatusDataChannel) + + val statusDataChannelObserverArgumentCaptor: ArgumentCaptor = + ArgumentCaptor.forClass(DataChannel.Observer::class.java) + + doNothing().`when`(mockedStatusDataChannel) + .registerObserver(statusDataChannelObserverArgumentCaptor.capture()) + + peerConnectionWrapper = PeerConnectionWrapper( + mockedPeerConnectionFactory, + ArrayList(), + MediaConstraints(), + "the-session-id", + "the-local-session-id", + null, + true, + true, + "video", + mockedSignalingMessageReceiver, + mockedSignalingMessageSender + ) + + val dataChannelMessageCount = 5 + + val sendThread = thread { + for (j in 1..dataChannelMessageCount) { + peerConnectionWrapper!!.send(DataChannelMessage("the-message-type-$j")) + } + } + + // Exceptions thrown in threads are not propagated to the main thread, so it needs to be explicitly done + // (for example, for ConcurrentModificationExceptions when iterating over the data channel messages). + var exceptionOnStateChange: Exception? = null + + val openDataChannelThread = thread { + Mockito.`when`(mockedStatusDataChannel.state()).thenReturn(DataChannel.State.OPEN) + + try { + statusDataChannelObserverArgumentCaptor.value.onStateChange() + } catch (e: Exception) { + exceptionOnStateChange = e + } + } + + sendThread.join() + openDataChannelThread.join() + + if (exceptionOnStateChange !== null) { + throw exceptionOnStateChange!! + } + + val inOrder = inOrder(mockedStatusDataChannel) + + for (j in 1..dataChannelMessageCount) { + inOrder.verify(mockedStatusDataChannel).send( + argThat(MatchesDataChannelMessage(DataChannelMessage("the-message-type-$j"))) + ) + } + } + } + + @Test + fun testReceiveDataChannelMessage() { + Mockito.`when`( + mockedPeerConnectionFactory!!.createPeerConnection( + any(PeerConnection.RTCConfiguration::class.java), + any(PeerConnection.Observer::class.java) + ) + ).thenReturn(mockedPeerConnection) + + val mockedStatusDataChannel = Mockito.mock(DataChannel::class.java) + Mockito.`when`(mockedStatusDataChannel.label()).thenReturn("status") + Mockito.`when`(mockedStatusDataChannel.state()).thenReturn(DataChannel.State.OPEN) + Mockito.`when`(mockedPeerConnection!!.createDataChannel(eq("status"), any())) + .thenReturn(mockedStatusDataChannel) + + val statusDataChannelObserverArgumentCaptor: ArgumentCaptor = + ArgumentCaptor.forClass(DataChannel.Observer::class.java) + + doNothing().`when`(mockedStatusDataChannel).registerObserver(statusDataChannelObserverArgumentCaptor.capture()) + + peerConnectionWrapper = PeerConnectionWrapper( + mockedPeerConnectionFactory, + ArrayList(), + MediaConstraints(), + "the-session-id", + "the-local-session-id", + null, + true, + true, + "video", + mockedSignalingMessageReceiver, + mockedSignalingMessageSender + ) + + val mockedDataChannelMessageListener = Mockito.mock(DataChannelMessageListener::class.java) + peerConnectionWrapper!!.addListener(mockedDataChannelMessageListener) + + // The payload must be a map to be able to serialize it and, therefore, generate the data that would have been + // received from another participant, so it is not possible to test receiving the nick as a String payload. + val payloadMap = HashMap() + payloadMap["name"] = "the-nick-in-map" + + statusDataChannelObserverArgumentCaptor.value.onMessage( + dataChannelMessageToBuffer(DataChannelMessage("nickChanged", null, payloadMap)) + ) + + Mockito.verify(mockedDataChannelMessageListener).onNickChanged("the-nick-in-map") + Mockito.verifyNoMoreInteractions(mockedDataChannelMessageListener) + + statusDataChannelObserverArgumentCaptor.value.onMessage( + dataChannelMessageToBuffer(DataChannelMessage("audioOn")) + ) + + Mockito.verify(mockedDataChannelMessageListener).onAudioOn() + Mockito.verifyNoMoreInteractions(mockedDataChannelMessageListener) + + statusDataChannelObserverArgumentCaptor.value.onMessage( + dataChannelMessageToBuffer(DataChannelMessage("audioOff")) + ) + + Mockito.verify(mockedDataChannelMessageListener).onAudioOff() + Mockito.verifyNoMoreInteractions(mockedDataChannelMessageListener) + + statusDataChannelObserverArgumentCaptor.value.onMessage( + dataChannelMessageToBuffer(DataChannelMessage("videoOn")) + ) + + Mockito.verify(mockedDataChannelMessageListener).onVideoOn() + Mockito.verifyNoMoreInteractions(mockedDataChannelMessageListener) + + statusDataChannelObserverArgumentCaptor.value.onMessage( + dataChannelMessageToBuffer(DataChannelMessage("videoOff")) + ) + + Mockito.verify(mockedDataChannelMessageListener).onVideoOff() + Mockito.verifyNoMoreInteractions(mockedDataChannelMessageListener) + } + + @Test + fun testReceiveDataChannelMessageWithOpenRemoteDataChannel() { + val peerConnectionObserverArgumentCaptor: ArgumentCaptor = + ArgumentCaptor.forClass(PeerConnection.Observer::class.java) + + Mockito.`when`( + mockedPeerConnectionFactory!!.createPeerConnection( + any(PeerConnection.RTCConfiguration::class.java), + peerConnectionObserverArgumentCaptor.capture() + ) + ).thenReturn(mockedPeerConnection) + + val mockedStatusDataChannel = Mockito.mock(DataChannel::class.java) + Mockito.`when`(mockedStatusDataChannel.label()).thenReturn("status") + Mockito.`when`(mockedStatusDataChannel.state()).thenReturn(DataChannel.State.OPEN) + Mockito.`when`(mockedPeerConnection!!.createDataChannel(eq("status"), any())) + .thenReturn(mockedStatusDataChannel) + + val statusDataChannelObserverArgumentCaptor: ArgumentCaptor = + ArgumentCaptor.forClass(DataChannel.Observer::class.java) + + doNothing().`when`(mockedStatusDataChannel).registerObserver(statusDataChannelObserverArgumentCaptor.capture()) + + peerConnectionWrapper = PeerConnectionWrapper( + mockedPeerConnectionFactory, + ArrayList(), + MediaConstraints(), + "the-session-id", + "the-local-session-id", + null, + true, + true, + "video", + mockedSignalingMessageReceiver, + mockedSignalingMessageSender + ) + + val randomIdDataChannelObserverArgumentCaptor: ArgumentCaptor = + ArgumentCaptor.forClass(DataChannel.Observer::class.java) + + val mockedRandomIdDataChannel = Mockito.mock(DataChannel::class.java) + Mockito.`when`(mockedRandomIdDataChannel.label()).thenReturn("random-id") + Mockito.`when`(mockedRandomIdDataChannel.state()).thenReturn(DataChannel.State.OPEN) + doNothing().`when`(mockedRandomIdDataChannel).registerObserver( + randomIdDataChannelObserverArgumentCaptor.capture() + ) + peerConnectionObserverArgumentCaptor.value.onDataChannel(mockedRandomIdDataChannel) + + val mockedDataChannelMessageListener = Mockito.mock(DataChannelMessageListener::class.java) + peerConnectionWrapper!!.addListener(mockedDataChannelMessageListener) + + statusDataChannelObserverArgumentCaptor.value.onMessage( + dataChannelMessageToBuffer(DataChannelMessage("audioOn")) + ) + + Mockito.verify(mockedDataChannelMessageListener).onAudioOn() + Mockito.verifyNoMoreInteractions(mockedDataChannelMessageListener) + + randomIdDataChannelObserverArgumentCaptor.value.onMessage( + dataChannelMessageToBuffer(DataChannelMessage("audioOff")) + ) + + Mockito.verify(mockedDataChannelMessageListener).onAudioOff() + Mockito.verifyNoMoreInteractions(mockedDataChannelMessageListener) + } + + @Test + fun testRemovePeerConnectionWithOpenRemoteDataChannel() { + val peerConnectionObserverArgumentCaptor: ArgumentCaptor = + ArgumentCaptor.forClass(PeerConnection.Observer::class.java) + + Mockito.`when`( + mockedPeerConnectionFactory!!.createPeerConnection( + any(PeerConnection.RTCConfiguration::class.java), + peerConnectionObserverArgumentCaptor.capture() + ) + ).thenReturn(mockedPeerConnection) + + val mockedStatusDataChannel = Mockito.mock(DataChannel::class.java) + Mockito.`when`(mockedStatusDataChannel.label()).thenReturn("status") + Mockito.`when`(mockedStatusDataChannel.state()).thenReturn(DataChannel.State.OPEN) + Mockito.`when`(mockedPeerConnection!!.createDataChannel(eq("status"), any())) + .thenReturn(mockedStatusDataChannel) + + peerConnectionWrapper = PeerConnectionWrapper( + mockedPeerConnectionFactory, + ArrayList(), + MediaConstraints(), + "the-session-id", + "the-local-session-id", + null, + true, + true, + "video", + mockedSignalingMessageReceiver, + mockedSignalingMessageSender + ) + + val mockedRandomIdDataChannel = Mockito.mock(DataChannel::class.java) + Mockito.`when`(mockedRandomIdDataChannel.label()).thenReturn("random-id") + Mockito.`when`(mockedRandomIdDataChannel.state()).thenReturn(DataChannel.State.OPEN) + peerConnectionObserverArgumentCaptor.value.onDataChannel(mockedRandomIdDataChannel) + + peerConnectionWrapper!!.removePeerConnection() + + Mockito.verify(mockedStatusDataChannel).dispose() + Mockito.verify(mockedRandomIdDataChannel).dispose() + } + + @Test + fun testRemovePeerConnectionWhileAddingRemoteDataChannelsWithDifferentThreads() { + // A brute force approach is used to test race conditions between different threads just repeating the test + // several times. Due to this the test passing could be a false positive, as it could have been a matter of + // luck, but even if the test may wrongly pass sometimes it is better than nothing (although, in general, with + // that number of reruns, it fails when it should). + for (i in 1..1000) { + val peerConnectionObserverArgumentCaptor: ArgumentCaptor = + ArgumentCaptor.forClass(PeerConnection.Observer::class.java) + + Mockito.`when`( + mockedPeerConnectionFactory!!.createPeerConnection( + any(PeerConnection.RTCConfiguration::class.java), + peerConnectionObserverArgumentCaptor.capture() + ) + ).thenReturn(mockedPeerConnection) + + val mockedStatusDataChannel = Mockito.mock(DataChannel::class.java) + Mockito.`when`(mockedStatusDataChannel.label()).thenAnswer(ReturnValueOrThrowIfDisposed("status")) + Mockito.`when`(mockedStatusDataChannel.state()).thenAnswer( + ReturnValueOrThrowIfDisposed(DataChannel.State.OPEN) + ) + Mockito.`when`(mockedPeerConnection!!.createDataChannel(eq("status"), any())) + .thenReturn(mockedStatusDataChannel) + + peerConnectionWrapper = PeerConnectionWrapper( + mockedPeerConnectionFactory, + ArrayList(), + MediaConstraints(), + "the-session-id", + "the-local-session-id", + null, + true, + true, + "video", + mockedSignalingMessageReceiver, + mockedSignalingMessageSender + ) + + val dataChannelCount = 5 + + val mockedRandomIdDataChannels: MutableList = ArrayList() + val dataChannelObservers: MutableList = ArrayList() + for (j in 0.. + if (Mockito.mockingDetails(invocation.mock).invocations.find { + it!!.method.name === "dispose" + } !== null + ) { + throw IllegalStateException("DataChannel has been disposed") + } + + dataChannelObservers[j] = invocation.getArgument(0, DataChannel.Observer::class.java) + + null + }.`when`(mockedRandomIdDataChannels[j]).registerObserver(any()) + } + + val onDataChannelThread = thread { + // Add again "status" data channel to test that it is correctly disposed also in that case (which + // should not happen anyway even if it was added by the remote peer, but just in case) + peerConnectionObserverArgumentCaptor.value.onDataChannel(mockedStatusDataChannel) + + for (j in 0.. = + ArgumentCaptor.forClass(PeerConnection.Observer::class.java) + + Mockito.`when`( + mockedPeerConnectionFactory!!.createPeerConnection( + any(PeerConnection.RTCConfiguration::class.java), + peerConnectionObserverArgumentCaptor.capture() + ) + ).thenReturn(mockedPeerConnection) + + val mockedStatusDataChannel = Mockito.mock(DataChannel::class.java) + + Mockito.`when`(mockedStatusDataChannel.label()).thenAnswer(ReturnValueOrThrowIfDisposed("status")) + Mockito.`when`(mockedStatusDataChannel.state()) + .thenAnswer(ReturnValueOrThrowIfDisposed(DataChannel.State.OPEN)) + Mockito.`when`(mockedStatusDataChannel.send(any())).thenAnswer(ReturnValueOrThrowIfDisposed(true)) + Mockito.`when`(mockedPeerConnection!!.createDataChannel(eq("status"), any())) + .thenReturn(mockedStatusDataChannel) + + peerConnectionWrapper = PeerConnectionWrapper( + mockedPeerConnectionFactory, + ArrayList(), + MediaConstraints(), + "the-session-id", + "the-local-session-id", + null, + true, + true, + "video", + mockedSignalingMessageReceiver, + mockedSignalingMessageSender + ) + + val dataChannelMessageCount = 5 + + // Exceptions thrown in threads are not propagated to the main thread, so it needs to be explicitly done + // (for example, for IllegalStateExceptions when using a disposed data channel). + var exceptionSend: Exception? = null + + val sendThread = thread { + try { + for (j in 0.. = + ArgumentCaptor.forClass(PeerConnection.Observer::class.java) + + Mockito.`when`( + mockedPeerConnectionFactory!!.createPeerConnection( + any(PeerConnection.RTCConfiguration::class.java), + peerConnectionObserverArgumentCaptor.capture() + ) + ).thenReturn(mockedPeerConnection) + + val mockedStatusDataChannel = Mockito.mock(DataChannel::class.java) + Mockito.`when`(mockedStatusDataChannel.label()).thenAnswer(ReturnValueOrThrowIfDisposed("status")) + Mockito.`when`(mockedStatusDataChannel.state()).thenAnswer( + ReturnValueOrThrowIfDisposed(DataChannel.State.OPEN) + ) + Mockito.`when`(mockedPeerConnection!!.createDataChannel(eq("status"), any())) + .thenReturn(mockedStatusDataChannel) + + val statusDataChannelObserverArgumentCaptor: ArgumentCaptor = + ArgumentCaptor.forClass(DataChannel.Observer::class.java) + + doNothing().`when`(mockedStatusDataChannel) + .registerObserver(statusDataChannelObserverArgumentCaptor.capture()) + + peerConnectionWrapper = PeerConnectionWrapper( + mockedPeerConnectionFactory, + ArrayList(), + MediaConstraints(), + "the-session-id", + "the-local-session-id", + null, + true, + true, + "video", + mockedSignalingMessageReceiver, + mockedSignalingMessageSender + ) + + val mockedDataChannelMessageListener = Mockito.mock(DataChannelMessageListener::class.java) + peerConnectionWrapper!!.addListener(mockedDataChannelMessageListener) + + // Exceptions thrown in threads are not propagated to the main thread, so it needs to be explicitly done + // (for example, for IllegalStateExceptions when using a disposed data channel). + var exceptionOnMessage: Exception? = null + + val onMessageThread = thread { + try { + // It is assumed that, even if its data channel was disposed, its buffers can be used while there + // is a reference to them, so no special mock behaviour is added to throw an exception in that case. + statusDataChannelObserverArgumentCaptor.value.onMessage( + dataChannelMessageToBuffer(DataChannelMessage("audioOn")) + ) + + statusDataChannelObserverArgumentCaptor.value.onMessage( + dataChannelMessageToBuffer(DataChannelMessage("audioOff")) + ) + } catch (e: Exception) { + exceptionOnMessage = e + } + } + + val removePeerConnectionThread = thread { + peerConnectionWrapper!!.removePeerConnection() + } + + onMessageThread.join() + removePeerConnectionThread.join() + + if (exceptionOnMessage !== null) { + throw exceptionOnMessage!! + } + + Mockito.verify(mockedStatusDataChannel).registerObserver(any()) + Mockito.verify(mockedStatusDataChannel).dispose() + Mockito.verify(mockedStatusDataChannel, atLeast(0)).label() + Mockito.verify(mockedStatusDataChannel, atLeast(0)).state() + Mockito.verifyNoMoreInteractions(mockedStatusDataChannel) + Mockito.verify(mockedDataChannelMessageListener, atMostOnce()).onAudioOn() + Mockito.verify(mockedDataChannelMessageListener, atMostOnce()).onAudioOff() + Mockito.verifyNoMoreInteractions(mockedDataChannelMessageListener) + } + } +}