Merge pull request #4572 from nextcloud/backport/4536/stable-20.1

[stable-20.1] Improve handling of data channels
This commit is contained in:
Marcel Hibbe 2025-01-03 12:47:25 +01:00 committed by GitHub
commit 958d4c30a5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 959 additions and 49 deletions

View File

@ -1174,12 +1174,12 @@ class CallActivity : CallBaseActivity() {
if (isConnectionEstablished && othersInCall) {
if (!hasMCU) {
for (peerConnectionWrapper in peerConnectionWrapperList) {
peerConnectionWrapper.sendChannelData(DataChannelMessage(isSpeakingMessage))
peerConnectionWrapper.send(DataChannelMessage(isSpeakingMessage))
}
} else {
for (peerConnectionWrapper in peerConnectionWrapperList) {
if (peerConnectionWrapper.sessionId == webSocketClient!!.sessionId) {
peerConnectionWrapper.sendChannelData(DataChannelMessage(isSpeakingMessage))
peerConnectionWrapper.send(DataChannelMessage(isSpeakingMessage))
break
}
}
@ -1370,12 +1370,12 @@ class CallActivity : CallBaseActivity() {
if (isConnectionEstablished) {
if (!hasMCU) {
for (peerConnectionWrapper in peerConnectionWrapperList) {
peerConnectionWrapper.sendChannelData(DataChannelMessage(message))
peerConnectionWrapper.send(DataChannelMessage(message))
}
} else {
for (peerConnectionWrapper in peerConnectionWrapperList) {
if (peerConnectionWrapper.sessionId == webSocketClient!!.sessionId) {
peerConnectionWrapper.sendChannelData(DataChannelMessage(message))
peerConnectionWrapper.send(DataChannelMessage(message))
break
}
}
@ -2563,7 +2563,7 @@ class CallActivity : CallBaseActivity() {
}
override fun onNext(aLong: Long) {
peerConnectionWrapper.sendChannelData(dataChannelMessage)
peerConnectionWrapper.send(dataChannelMessage)
}
override fun onError(e: Throwable) {

View File

@ -7,11 +7,9 @@
*/
package com.nextcloud.talk.webrtc;
import android.content.Context;
import android.util.Log;
import com.bluelinelabs.logansquare.LoganSquare;
import com.nextcloud.talk.application.NextcloudTalkApplication;
import com.nextcloud.talk.models.json.signaling.DataChannelMessage;
import com.nextcloud.talk.models.json.signaling.NCIceCandidate;
import com.nextcloud.talk.models.json.signaling.NCMessagePayload;
@ -36,21 +34,15 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.inject.Inject;
import androidx.annotation.Nullable;
import autodagger.AutoInjector;
@AutoInjector(NextcloudTalkApplication.class)
public class PeerConnectionWrapper {
@Inject
Context context;
private static final String TAG = PeerConnectionWrapper.class.getCanonicalName();
private final SignalingMessageReceiver signalingMessageReceiver;
@ -66,7 +58,8 @@ public class PeerConnectionWrapper {
private PeerConnection peerConnection;
private String sessionId;
private final MediaConstraints mediaConstraints;
private DataChannel dataChannel;
private final Map<String, DataChannel> dataChannels = new HashMap<>();
private final List<DataChannelMessage> pendingDataChannelMessages = new ArrayList<>();
private final SdpObserver sdpObserver;
private final boolean hasInitiated;
@ -81,6 +74,9 @@ public class PeerConnectionWrapper {
/**
* Listener for data channel messages.
* <p>
* Messages might have been received on any data channel, independently of its label or whether it was open by the
* local or the remote peer.
* <p>
* The messages are bound to a specific peer connection, so each listener is expected to handle messages only for
* a single peer connection.
* <p>
@ -117,9 +113,6 @@ public class PeerConnectionWrapper {
boolean isMCUPublisher, boolean hasMCU, String videoStreamType,
SignalingMessageReceiver signalingMessageReceiver,
SignalingMessageSender signalingMessageSender) {
Objects.requireNonNull(NextcloudTalkApplication.Companion.getSharedApplication()).getComponentApplication().inject(this);
this.localStream = localStream;
this.videoStreamType = videoStreamType;
@ -153,8 +146,11 @@ public class PeerConnectionWrapper {
if (hasMCU || hasInitiated) {
DataChannel.Init init = new DataChannel.Init();
init.negotiated = false;
dataChannel = peerConnection.createDataChannel("status", init);
dataChannel.registerObserver(new DataChannelObserver());
DataChannel statusDataChannel = peerConnection.createDataChannel("status", init);
statusDataChannel.registerObserver(new DataChannelObserver(statusDataChannel));
dataChannels.put("status", statusDataChannel);
if (isMCUPublisher) {
peerConnection.createOffer(sdpObserver, mediaConstraints);
} else if (hasMCU && "video".equals(this.videoStreamType)) {
@ -239,16 +235,15 @@ public class PeerConnectionWrapper {
return stream;
}
public void removePeerConnection() {
public synchronized void removePeerConnection() {
signalingMessageReceiver.removeListener(webRtcMessageListener);
if (dataChannel != null) {
for (DataChannel dataChannel: dataChannels.values()) {
Log.d(TAG, "Disposed DataChannel " + dataChannel.label());
dataChannel.dispose();
dataChannel = null;
Log.d(TAG, "Disposed DataChannel");
} else {
Log.d(TAG, "DataChannel is null.");
}
dataChannels.clear();
if (peerConnection != null) {
peerConnection.close();
@ -278,15 +273,51 @@ public class PeerConnectionWrapper {
}
}
public void sendChannelData(DataChannelMessage dataChannelMessage) {
ByteBuffer buffer;
if (dataChannel != null && dataChannelMessage != null) {
try {
buffer = ByteBuffer.wrap(LoganSquare.serialize(dataChannelMessage).getBytes());
dataChannel.send(new DataChannel.Buffer(buffer, false));
} catch (Exception e) {
Log.d(TAG, "Failed to send channel data, attempting regular " + dataChannelMessage);
}
/**
* Sends a data channel message.
* <p>
* Data channel messages are always sent on the "status" data channel locally opened. However, if Janus is used,
* messages can be sent only on publisher connections, even if subscriber connections have a "status" data channel;
* messages sent on subscriber connections will be simply ignored. Moreover, even if the message is sent on the
* "status" data channel subscriber connections will receive it on a data channel with a different label, as
* Janus opens its own data channel on subscriber connections and "multiplexes" all the received data channel
* messages on it, independently of on which data channel they were originally sent.
* <p>
* Data channel messages can be sent at any time; if the "status" data channel is not open yet the messages will be
* queued and sent once it is opened. Nevertheless, if Janus is used, it is not guaranteed that the messages will
* be received by other participants, as it is only known when the data channel of the publisher was opened, but
* not if the data channel of the subscribers was. However, in general this should be a concern only during the
* first seconds after a participant joins; after some time the subscriber connections should be established and
* their data channels open.
*
* @param dataChannelMessage the message to send
*/
public synchronized void send(DataChannelMessage dataChannelMessage) {
if (dataChannelMessage == null) {
return;
}
DataChannel statusDataChannel = dataChannels.get("status");
if (statusDataChannel == null || statusDataChannel.state() != DataChannel.State.OPEN ||
!pendingDataChannelMessages.isEmpty()) {
Log.d(TAG, "Queuing data channel message (" + dataChannelMessage + ") " + sessionId);
pendingDataChannelMessages.add(dataChannelMessage);
return;
}
sendWithoutQueuing(statusDataChannel, dataChannelMessage);
}
private void sendWithoutQueuing(DataChannel statusDataChannel, DataChannelMessage dataChannelMessage) {
try {
Log.d(TAG, "Sending data channel message (" + dataChannelMessage + ") " + sessionId);
ByteBuffer buffer = ByteBuffer.wrap(LoganSquare.serialize(dataChannelMessage).getBytes());
statusDataChannel.send(new DataChannel.Buffer(buffer, false));
} catch (Exception e) {
Log.w(TAG, "Failed to send data channel message");
}
}
@ -301,15 +332,15 @@ public class PeerConnectionWrapper {
private void sendInitialMediaStatus() {
if (localStream != null) {
if (localStream.videoTracks.size() == 1 && localStream.videoTracks.get(0).enabled()) {
sendChannelData(new DataChannelMessage("videoOn"));
send(new DataChannelMessage("videoOn"));
} else {
sendChannelData(new DataChannelMessage("videoOff"));
send(new DataChannelMessage("videoOff"));
}
if (localStream.audioTracks.size() == 1 && localStream.audioTracks.get(0).enabled()) {
sendChannelData(new DataChannelMessage("audioOn"));
send(new DataChannelMessage("audioOn"));
} else {
sendChannelData(new DataChannelMessage("audioOff"));
send(new DataChannelMessage("audioOff"));
}
}
}
@ -373,6 +404,14 @@ public class PeerConnectionWrapper {
private class DataChannelObserver implements DataChannel.Observer {
private final DataChannel dataChannel;
private final String dataChannelLabel;
public DataChannelObserver(DataChannel dataChannel) {
this.dataChannel = Objects.requireNonNull(dataChannel);
this.dataChannelLabel = dataChannel.label();
}
@Override
public void onBufferedAmountChange(long l) {
@ -380,16 +419,40 @@ public class PeerConnectionWrapper {
@Override
public void onStateChange() {
if (dataChannel != null &&
dataChannel.state() == DataChannel.State.OPEN) {
sendInitialMediaStatus();
synchronized (PeerConnectionWrapper.this) {
// The PeerConnection could have been removed in parallel even with the synchronization (as just after
// "onStateChange" was called "removePeerConnection" could have acquired the lock).
if (peerConnection == null) {
return;
}
if (dataChannel.state() == DataChannel.State.OPEN && "status".equals(dataChannelLabel)) {
for (DataChannelMessage dataChannelMessage : pendingDataChannelMessages) {
sendWithoutQueuing(dataChannel, dataChannelMessage);
}
pendingDataChannelMessages.clear();
}
if (dataChannel.state() == DataChannel.State.OPEN) {
sendInitialMediaStatus();
}
}
}
@Override
public void onMessage(DataChannel.Buffer buffer) {
synchronized (PeerConnectionWrapper.this) {
// It is assumed that, even if its data channel was disposed, its buffers can be used while there is
// a reference to them, so it would not be necessary to check this from a thread-safety point of view.
// Nevertheless, if the remote peer connection was removed it would not make sense to notify the
// listeners anyway.
if (peerConnection == null) {
return;
}
}
if (buffer.binary) {
Log.d(TAG, "Received binary msg over " + TAG + " " + sessionId);
Log.d(TAG, "Received binary data channel message over " + dataChannelLabel + " " + sessionId);
return;
}
@ -397,7 +460,7 @@ public class PeerConnectionWrapper {
final byte[] bytes = new byte[data.capacity()];
data.get(bytes);
String strData = new String(bytes);
Log.d(TAG, "Got msg: " + strData + " over " + TAG + " " + sessionId);
Log.d(TAG, "Received data channel message (" + strData + ") over " + dataChannelLabel + " " + sessionId);
DataChannelMessage dataChannelMessage;
try {
@ -517,12 +580,45 @@ public class PeerConnectionWrapper {
@Override
public void onDataChannel(DataChannel dataChannel) {
if (PeerConnectionWrapper.this.dataChannel != null) {
Log.w(TAG, "Data channel with label " + PeerConnectionWrapper.this.dataChannel.label()
+ " exists, but received onDataChannel event for DataChannel with label " + dataChannel.label());
synchronized (PeerConnectionWrapper.this) {
// Another data channel with the same label, no matter if the same instance or a different one, should
// not be added, but this is handled just in case.
// Moreover, if it were possible that an already added data channel was added again there would be a
// potential race condition with "removePeerConnection", even with the synchronization, as it would
// be possible that "onDataChannel" was called, then "removePeerConnection" disposed the data
// channel, and then "onDataChannel" continued in the synchronized statements and tried to get the
// label, which would throw an exception due to the data channel having been disposed already.
String dataChannelLabel;
try {
dataChannelLabel = dataChannel.label();
} catch (IllegalStateException e) {
// The data channel was disposed already, nothing to do.
return;
}
DataChannel oldDataChannel = dataChannels.get(dataChannelLabel);
if (oldDataChannel == dataChannel) {
Log.w(TAG, "Data channel with label " + dataChannel.label() + " added again");
return;
}
if (oldDataChannel != null) {
Log.w(TAG, "Data channel with label " + dataChannel.label() + " exists");
oldDataChannel.dispose();
}
// If the peer connection was removed in parallel dispose the data channel instead of adding it.
if (peerConnection == null) {
dataChannel.dispose();
return;
}
dataChannel.registerObserver(new DataChannelObserver(dataChannel));
dataChannels.put(dataChannel.label(), dataChannel);
}
PeerConnectionWrapper.this.dataChannel = dataChannel;
PeerConnectionWrapper.this.dataChannel.registerObserver(new DataChannelObserver());
}
@Override

View File

@ -0,0 +1,51 @@
/*
* Nextcloud Talk - Android Client
*
* SPDX-FileCopyrightText: 2024 Daniel Calviño Sánchez <danxuliu@gmail.com>
* SPDX-License-Identifier: GPL-3.0-or-later
*/
package android.util;
/**
* Dummy implementation of android.util.Log to be used in unit tests.
* <p>
* The Android Gradle plugin provides a library with the APIs of the Android framework that throws an exception if any
* of them are called. This class is loaded before that library and therefore becomes the implementation used during the
* tests, simply printing the messages to the system console.
*/
public class Log {
public static int d(String tag, String msg) {
System.out.println("DEBUG: " + tag + ": " + msg);
return 1;
}
public static int e(String tag, String msg) {
System.out.println("ERROR: " + tag + ": " + msg);
return 1;
}
public static int i(String tag, String msg) {
System.out.println("INFO: " + tag + ": " + msg);
return 1;
}
public static boolean isLoggable(String tag, int level) {
return true;
}
public static int v(String tag, String msg) {
System.out.println("VERBOSE: " + tag + ": " + msg);
return 1;
}
public static int w(String tag, String msg) {
System.out.println("WARN: " + tag + ": " + msg);
return 1;
}
}

View File

@ -0,0 +1,763 @@
/*
* Nextcloud Talk - Android Client
*
* SPDX-FileCopyrightText: 2024 Daniel Calviño Sánchez <danxuliu@gmail.com>
* SPDX-License-Identifier: GPL-3.0-or-later
*/
package com.nextcloud.talk.webrtc
import com.bluelinelabs.logansquare.LoganSquare
import com.nextcloud.talk.models.json.signaling.DataChannelMessage
import com.nextcloud.talk.signaling.SignalingMessageReceiver
import com.nextcloud.talk.signaling.SignalingMessageSender
import com.nextcloud.talk.webrtc.PeerConnectionWrapper.DataChannelMessageListener
import org.junit.Before
import org.junit.Test
import org.mockito.ArgumentCaptor
import org.mockito.ArgumentMatcher
import org.mockito.ArgumentMatchers.any
import org.mockito.ArgumentMatchers.argThat
import org.mockito.ArgumentMatchers.eq
import org.mockito.Mockito
import org.mockito.Mockito.atLeast
import org.mockito.Mockito.atMostOnce
import org.mockito.Mockito.doAnswer
import org.mockito.Mockito.doNothing
import org.mockito.Mockito.inOrder
import org.mockito.Mockito.never
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.webrtc.DataChannel
import org.webrtc.MediaConstraints
import org.webrtc.PeerConnection
import org.webrtc.PeerConnectionFactory
import java.nio.ByteBuffer
import java.util.HashMap
import kotlin.concurrent.thread
@Suppress("LongMethod", "TooGenericExceptionCaught")
class PeerConnectionWrapperTest {
private var peerConnectionWrapper: PeerConnectionWrapper? = null
private var mockedPeerConnection: PeerConnection? = null
private var mockedPeerConnectionFactory: PeerConnectionFactory? = null
private var mockedSignalingMessageReceiver: SignalingMessageReceiver? = null
private var mockedSignalingMessageSender: SignalingMessageSender? = null
/**
* Helper answer for DataChannel methods.
*/
private class ReturnValueOrThrowIfDisposed<T>(val value: T) :
Answer<T> {
override fun answer(currentInvocation: InvocationOnMock): T {
if (Mockito.mockingDetails(currentInvocation.mock).invocations.find {
it!!.method.name === "dispose"
} !== null
) {
throw IllegalStateException("DataChannel has been disposed")
}
return value
}
}
/**
* Helper matcher for DataChannelMessages.
*/
private inner class MatchesDataChannelMessage(
private val expectedDataChannelMessage: DataChannelMessage
) : ArgumentMatcher<DataChannel.Buffer> {
override fun matches(buffer: DataChannel.Buffer): Boolean {
// DataChannel.Buffer does not implement "equals", so the comparison needs to be done on the ByteBuffer
// instead.
return dataChannelMessageToBuffer(expectedDataChannelMessage).data.equals(buffer.data)
}
}
private fun dataChannelMessageToBuffer(dataChannelMessage: DataChannelMessage): DataChannel.Buffer {
return DataChannel.Buffer(
ByteBuffer.wrap(LoganSquare.serialize(dataChannelMessage).toByteArray()),
false
)
}
@Before
fun setUp() {
mockedPeerConnection = Mockito.mock(PeerConnection::class.java)
mockedPeerConnectionFactory = Mockito.mock(PeerConnectionFactory::class.java)
mockedSignalingMessageReceiver = Mockito.mock(SignalingMessageReceiver::class.java)
mockedSignalingMessageSender = Mockito.mock(SignalingMessageSender::class.java)
}
@Test
fun testSendDataChannelMessage() {
Mockito.`when`(
mockedPeerConnectionFactory!!.createPeerConnection(
any(PeerConnection.RTCConfiguration::class.java),
any(PeerConnection.Observer::class.java)
)
).thenReturn(mockedPeerConnection)
val mockedStatusDataChannel = Mockito.mock(DataChannel::class.java)
Mockito.`when`(mockedStatusDataChannel.label()).thenReturn("status")
Mockito.`when`(mockedStatusDataChannel.state()).thenReturn(DataChannel.State.OPEN)
Mockito.`when`(mockedPeerConnection!!.createDataChannel(eq("status"), any()))
.thenReturn(mockedStatusDataChannel)
peerConnectionWrapper = PeerConnectionWrapper(
mockedPeerConnectionFactory,
ArrayList<PeerConnection.IceServer>(),
MediaConstraints(),
"the-session-id",
"the-local-session-id",
null,
true,
true,
"video",
mockedSignalingMessageReceiver,
mockedSignalingMessageSender
)
peerConnectionWrapper!!.send(DataChannelMessage("the-message-type"))
Mockito.verify(mockedStatusDataChannel).send(
argThat(MatchesDataChannelMessage(DataChannelMessage("the-message-type")))
)
}
@Test
fun testSendDataChannelMessageWithOpenRemoteDataChannel() {
val peerConnectionObserverArgumentCaptor: ArgumentCaptor<PeerConnection.Observer> =
ArgumentCaptor.forClass(PeerConnection.Observer::class.java)
Mockito.`when`(
mockedPeerConnectionFactory!!.createPeerConnection(
any(PeerConnection.RTCConfiguration::class.java),
peerConnectionObserverArgumentCaptor.capture()
)
).thenReturn(mockedPeerConnection)
val mockedStatusDataChannel = Mockito.mock(DataChannel::class.java)
Mockito.`when`(mockedStatusDataChannel.label()).thenReturn("status")
Mockito.`when`(mockedStatusDataChannel.state()).thenReturn(DataChannel.State.OPEN)
Mockito.`when`(mockedPeerConnection!!.createDataChannel(eq("status"), any()))
.thenReturn(mockedStatusDataChannel)
peerConnectionWrapper = PeerConnectionWrapper(
mockedPeerConnectionFactory,
ArrayList<PeerConnection.IceServer>(),
MediaConstraints(),
"the-session-id",
"the-local-session-id",
null,
true,
true,
"video",
mockedSignalingMessageReceiver,
mockedSignalingMessageSender
)
val mockedRandomIdDataChannel = Mockito.mock(DataChannel::class.java)
Mockito.`when`(mockedRandomIdDataChannel.label()).thenReturn("random-id")
Mockito.`when`(mockedRandomIdDataChannel.state()).thenReturn(DataChannel.State.OPEN)
peerConnectionObserverArgumentCaptor.value.onDataChannel(mockedRandomIdDataChannel)
peerConnectionWrapper!!.send(DataChannelMessage("the-message-type"))
Mockito.verify(mockedStatusDataChannel).send(
argThat(MatchesDataChannelMessage(DataChannelMessage("the-message-type")))
)
Mockito.verify(mockedRandomIdDataChannel, never()).send(any())
}
@Test
fun testSendDataChannelMessageBeforeOpeningDataChannel() {
Mockito.`when`(
mockedPeerConnectionFactory!!.createPeerConnection(
any(PeerConnection.RTCConfiguration::class.java),
any(PeerConnection.Observer::class.java)
)
).thenReturn(mockedPeerConnection)
val mockedStatusDataChannel = Mockito.mock(DataChannel::class.java)
Mockito.`when`(mockedStatusDataChannel.label()).thenReturn("status")
Mockito.`when`(mockedStatusDataChannel.state()).thenReturn(DataChannel.State.CONNECTING)
Mockito.`when`(mockedPeerConnection!!.createDataChannel(eq("status"), any()))
.thenReturn(mockedStatusDataChannel)
val statusDataChannelObserverArgumentCaptor: ArgumentCaptor<DataChannel.Observer> =
ArgumentCaptor.forClass(DataChannel.Observer::class.java)
doNothing().`when`(mockedStatusDataChannel).registerObserver(statusDataChannelObserverArgumentCaptor.capture())
peerConnectionWrapper = PeerConnectionWrapper(
mockedPeerConnectionFactory,
ArrayList<PeerConnection.IceServer>(),
MediaConstraints(),
"the-session-id",
"the-local-session-id",
null,
true,
true,
"video",
mockedSignalingMessageReceiver,
mockedSignalingMessageSender
)
peerConnectionWrapper!!.send(DataChannelMessage("the-message-type"))
peerConnectionWrapper!!.send(DataChannelMessage("another-message-type"))
Mockito.verify(mockedStatusDataChannel, never()).send(any())
Mockito.`when`(mockedStatusDataChannel.state()).thenReturn(DataChannel.State.OPEN)
statusDataChannelObserverArgumentCaptor.value.onStateChange()
Mockito.verify(mockedStatusDataChannel).send(
argThat(MatchesDataChannelMessage(DataChannelMessage("the-message-type")))
)
Mockito.verify(mockedStatusDataChannel).send(
argThat(MatchesDataChannelMessage(DataChannelMessage("another-message-type")))
)
}
@Test
fun testSendDataChannelMessageBeforeOpeningDataChannelWithDifferentThreads() {
// A brute force approach is used to test race conditions between different threads just repeating the test
// several times. Due to this the test passing could be a false positive, as it could have been a matter of
// luck, but even if the test may wrongly pass sometimes it is better than nothing (although, in general, with
// that number of reruns, it fails when it should).
for (i in 1..1000) {
Mockito.`when`(
mockedPeerConnectionFactory!!.createPeerConnection(
any(PeerConnection.RTCConfiguration::class.java),
any(PeerConnection.Observer::class.java)
)
).thenReturn(mockedPeerConnection)
val mockedStatusDataChannel = Mockito.mock(DataChannel::class.java)
Mockito.`when`(mockedStatusDataChannel.label()).thenReturn("status")
Mockito.`when`(mockedStatusDataChannel.state()).thenReturn(DataChannel.State.CONNECTING)
Mockito.`when`(mockedPeerConnection!!.createDataChannel(eq("status"), any()))
.thenReturn(mockedStatusDataChannel)
val statusDataChannelObserverArgumentCaptor: ArgumentCaptor<DataChannel.Observer> =
ArgumentCaptor.forClass(DataChannel.Observer::class.java)
doNothing().`when`(mockedStatusDataChannel)
.registerObserver(statusDataChannelObserverArgumentCaptor.capture())
peerConnectionWrapper = PeerConnectionWrapper(
mockedPeerConnectionFactory,
ArrayList<PeerConnection.IceServer>(),
MediaConstraints(),
"the-session-id",
"the-local-session-id",
null,
true,
true,
"video",
mockedSignalingMessageReceiver,
mockedSignalingMessageSender
)
val dataChannelMessageCount = 5
val sendThread = thread {
for (j in 1..dataChannelMessageCount) {
peerConnectionWrapper!!.send(DataChannelMessage("the-message-type-$j"))
}
}
// Exceptions thrown in threads are not propagated to the main thread, so it needs to be explicitly done
// (for example, for ConcurrentModificationExceptions when iterating over the data channel messages).
var exceptionOnStateChange: Exception? = null
val openDataChannelThread = thread {
Mockito.`when`(mockedStatusDataChannel.state()).thenReturn(DataChannel.State.OPEN)
try {
statusDataChannelObserverArgumentCaptor.value.onStateChange()
} catch (e: Exception) {
exceptionOnStateChange = e
}
}
sendThread.join()
openDataChannelThread.join()
if (exceptionOnStateChange !== null) {
throw exceptionOnStateChange!!
}
val inOrder = inOrder(mockedStatusDataChannel)
for (j in 1..dataChannelMessageCount) {
inOrder.verify(mockedStatusDataChannel).send(
argThat(MatchesDataChannelMessage(DataChannelMessage("the-message-type-$j")))
)
}
}
}
@Test
fun testReceiveDataChannelMessage() {
Mockito.`when`(
mockedPeerConnectionFactory!!.createPeerConnection(
any(PeerConnection.RTCConfiguration::class.java),
any(PeerConnection.Observer::class.java)
)
).thenReturn(mockedPeerConnection)
val mockedStatusDataChannel = Mockito.mock(DataChannel::class.java)
Mockito.`when`(mockedStatusDataChannel.label()).thenReturn("status")
Mockito.`when`(mockedStatusDataChannel.state()).thenReturn(DataChannel.State.OPEN)
Mockito.`when`(mockedPeerConnection!!.createDataChannel(eq("status"), any()))
.thenReturn(mockedStatusDataChannel)
val statusDataChannelObserverArgumentCaptor: ArgumentCaptor<DataChannel.Observer> =
ArgumentCaptor.forClass(DataChannel.Observer::class.java)
doNothing().`when`(mockedStatusDataChannel).registerObserver(statusDataChannelObserverArgumentCaptor.capture())
peerConnectionWrapper = PeerConnectionWrapper(
mockedPeerConnectionFactory,
ArrayList<PeerConnection.IceServer>(),
MediaConstraints(),
"the-session-id",
"the-local-session-id",
null,
true,
true,
"video",
mockedSignalingMessageReceiver,
mockedSignalingMessageSender
)
val mockedDataChannelMessageListener = Mockito.mock(DataChannelMessageListener::class.java)
peerConnectionWrapper!!.addListener(mockedDataChannelMessageListener)
// The payload must be a map to be able to serialize it and, therefore, generate the data that would have been
// received from another participant, so it is not possible to test receiving the nick as a String payload.
val payloadMap = HashMap<String, String>()
payloadMap["name"] = "the-nick-in-map"
statusDataChannelObserverArgumentCaptor.value.onMessage(
dataChannelMessageToBuffer(DataChannelMessage("nickChanged", null, payloadMap))
)
Mockito.verify(mockedDataChannelMessageListener).onNickChanged("the-nick-in-map")
Mockito.verifyNoMoreInteractions(mockedDataChannelMessageListener)
statusDataChannelObserverArgumentCaptor.value.onMessage(
dataChannelMessageToBuffer(DataChannelMessage("audioOn"))
)
Mockito.verify(mockedDataChannelMessageListener).onAudioOn()
Mockito.verifyNoMoreInteractions(mockedDataChannelMessageListener)
statusDataChannelObserverArgumentCaptor.value.onMessage(
dataChannelMessageToBuffer(DataChannelMessage("audioOff"))
)
Mockito.verify(mockedDataChannelMessageListener).onAudioOff()
Mockito.verifyNoMoreInteractions(mockedDataChannelMessageListener)
statusDataChannelObserverArgumentCaptor.value.onMessage(
dataChannelMessageToBuffer(DataChannelMessage("videoOn"))
)
Mockito.verify(mockedDataChannelMessageListener).onVideoOn()
Mockito.verifyNoMoreInteractions(mockedDataChannelMessageListener)
statusDataChannelObserverArgumentCaptor.value.onMessage(
dataChannelMessageToBuffer(DataChannelMessage("videoOff"))
)
Mockito.verify(mockedDataChannelMessageListener).onVideoOff()
Mockito.verifyNoMoreInteractions(mockedDataChannelMessageListener)
}
@Test
fun testReceiveDataChannelMessageWithOpenRemoteDataChannel() {
val peerConnectionObserverArgumentCaptor: ArgumentCaptor<PeerConnection.Observer> =
ArgumentCaptor.forClass(PeerConnection.Observer::class.java)
Mockito.`when`(
mockedPeerConnectionFactory!!.createPeerConnection(
any(PeerConnection.RTCConfiguration::class.java),
peerConnectionObserverArgumentCaptor.capture()
)
).thenReturn(mockedPeerConnection)
val mockedStatusDataChannel = Mockito.mock(DataChannel::class.java)
Mockito.`when`(mockedStatusDataChannel.label()).thenReturn("status")
Mockito.`when`(mockedStatusDataChannel.state()).thenReturn(DataChannel.State.OPEN)
Mockito.`when`(mockedPeerConnection!!.createDataChannel(eq("status"), any()))
.thenReturn(mockedStatusDataChannel)
val statusDataChannelObserverArgumentCaptor: ArgumentCaptor<DataChannel.Observer> =
ArgumentCaptor.forClass(DataChannel.Observer::class.java)
doNothing().`when`(mockedStatusDataChannel).registerObserver(statusDataChannelObserverArgumentCaptor.capture())
peerConnectionWrapper = PeerConnectionWrapper(
mockedPeerConnectionFactory,
ArrayList<PeerConnection.IceServer>(),
MediaConstraints(),
"the-session-id",
"the-local-session-id",
null,
true,
true,
"video",
mockedSignalingMessageReceiver,
mockedSignalingMessageSender
)
val randomIdDataChannelObserverArgumentCaptor: ArgumentCaptor<DataChannel.Observer> =
ArgumentCaptor.forClass(DataChannel.Observer::class.java)
val mockedRandomIdDataChannel = Mockito.mock(DataChannel::class.java)
Mockito.`when`(mockedRandomIdDataChannel.label()).thenReturn("random-id")
Mockito.`when`(mockedRandomIdDataChannel.state()).thenReturn(DataChannel.State.OPEN)
doNothing().`when`(mockedRandomIdDataChannel).registerObserver(
randomIdDataChannelObserverArgumentCaptor.capture()
)
peerConnectionObserverArgumentCaptor.value.onDataChannel(mockedRandomIdDataChannel)
val mockedDataChannelMessageListener = Mockito.mock(DataChannelMessageListener::class.java)
peerConnectionWrapper!!.addListener(mockedDataChannelMessageListener)
statusDataChannelObserverArgumentCaptor.value.onMessage(
dataChannelMessageToBuffer(DataChannelMessage("audioOn"))
)
Mockito.verify(mockedDataChannelMessageListener).onAudioOn()
Mockito.verifyNoMoreInteractions(mockedDataChannelMessageListener)
randomIdDataChannelObserverArgumentCaptor.value.onMessage(
dataChannelMessageToBuffer(DataChannelMessage("audioOff"))
)
Mockito.verify(mockedDataChannelMessageListener).onAudioOff()
Mockito.verifyNoMoreInteractions(mockedDataChannelMessageListener)
}
@Test
fun testRemovePeerConnectionWithOpenRemoteDataChannel() {
val peerConnectionObserverArgumentCaptor: ArgumentCaptor<PeerConnection.Observer> =
ArgumentCaptor.forClass(PeerConnection.Observer::class.java)
Mockito.`when`(
mockedPeerConnectionFactory!!.createPeerConnection(
any(PeerConnection.RTCConfiguration::class.java),
peerConnectionObserverArgumentCaptor.capture()
)
).thenReturn(mockedPeerConnection)
val mockedStatusDataChannel = Mockito.mock(DataChannel::class.java)
Mockito.`when`(mockedStatusDataChannel.label()).thenReturn("status")
Mockito.`when`(mockedStatusDataChannel.state()).thenReturn(DataChannel.State.OPEN)
Mockito.`when`(mockedPeerConnection!!.createDataChannel(eq("status"), any()))
.thenReturn(mockedStatusDataChannel)
peerConnectionWrapper = PeerConnectionWrapper(
mockedPeerConnectionFactory,
ArrayList<PeerConnection.IceServer>(),
MediaConstraints(),
"the-session-id",
"the-local-session-id",
null,
true,
true,
"video",
mockedSignalingMessageReceiver,
mockedSignalingMessageSender
)
val mockedRandomIdDataChannel = Mockito.mock(DataChannel::class.java)
Mockito.`when`(mockedRandomIdDataChannel.label()).thenReturn("random-id")
Mockito.`when`(mockedRandomIdDataChannel.state()).thenReturn(DataChannel.State.OPEN)
peerConnectionObserverArgumentCaptor.value.onDataChannel(mockedRandomIdDataChannel)
peerConnectionWrapper!!.removePeerConnection()
Mockito.verify(mockedStatusDataChannel).dispose()
Mockito.verify(mockedRandomIdDataChannel).dispose()
}
@Test
fun testRemovePeerConnectionWhileAddingRemoteDataChannelsWithDifferentThreads() {
// A brute force approach is used to test race conditions between different threads just repeating the test
// several times. Due to this the test passing could be a false positive, as it could have been a matter of
// luck, but even if the test may wrongly pass sometimes it is better than nothing (although, in general, with
// that number of reruns, it fails when it should).
for (i in 1..1000) {
val peerConnectionObserverArgumentCaptor: ArgumentCaptor<PeerConnection.Observer> =
ArgumentCaptor.forClass(PeerConnection.Observer::class.java)
Mockito.`when`(
mockedPeerConnectionFactory!!.createPeerConnection(
any(PeerConnection.RTCConfiguration::class.java),
peerConnectionObserverArgumentCaptor.capture()
)
).thenReturn(mockedPeerConnection)
val mockedStatusDataChannel = Mockito.mock(DataChannel::class.java)
Mockito.`when`(mockedStatusDataChannel.label()).thenAnswer(ReturnValueOrThrowIfDisposed("status"))
Mockito.`when`(mockedStatusDataChannel.state()).thenAnswer(
ReturnValueOrThrowIfDisposed(DataChannel.State.OPEN)
)
Mockito.`when`(mockedPeerConnection!!.createDataChannel(eq("status"), any()))
.thenReturn(mockedStatusDataChannel)
peerConnectionWrapper = PeerConnectionWrapper(
mockedPeerConnectionFactory,
ArrayList<PeerConnection.IceServer>(),
MediaConstraints(),
"the-session-id",
"the-local-session-id",
null,
true,
true,
"video",
mockedSignalingMessageReceiver,
mockedSignalingMessageSender
)
val dataChannelCount = 5
val mockedRandomIdDataChannels: MutableList<DataChannel> = ArrayList()
val dataChannelObservers: MutableList<DataChannel.Observer?> = ArrayList()
for (j in 0..<dataChannelCount) {
mockedRandomIdDataChannels.add(Mockito.mock(DataChannel::class.java))
// Add data channels with duplicated labels (from the second data channel and onwards) to test that
// they are correctly disposed also in that case (which should not happen anyway, but just in case).
Mockito.`when`(mockedRandomIdDataChannels[j].label())
.thenAnswer(ReturnValueOrThrowIfDisposed("random-id-" + ((j + 1) / 2)))
Mockito.`when`(mockedRandomIdDataChannels[j].state())
.thenAnswer(ReturnValueOrThrowIfDisposed(DataChannel.State.OPEN))
// Store a reference to the registered observer, if any, to be called after the registration. The call
// is done outside the mock to better simulate the normal behaviour, as it would not be called during
// the registration itself.
dataChannelObservers.add(null)
doAnswer { invocation ->
if (Mockito.mockingDetails(invocation.mock).invocations.find {
it!!.method.name === "dispose"
} !== null
) {
throw IllegalStateException("DataChannel has been disposed")
}
dataChannelObservers[j] = invocation.getArgument(0, DataChannel.Observer::class.java)
null
}.`when`(mockedRandomIdDataChannels[j]).registerObserver(any())
}
val onDataChannelThread = thread {
// Add again "status" data channel to test that it is correctly disposed also in that case (which
// should not happen anyway even if it was added by the remote peer, but just in case)
peerConnectionObserverArgumentCaptor.value.onDataChannel(mockedStatusDataChannel)
for (j in 0..<dataChannelCount) {
peerConnectionObserverArgumentCaptor.value.onDataChannel(mockedRandomIdDataChannels[j])
// Call "onStateChange" on the registered observer to simulate that the data channel was opened.
dataChannelObservers[j]?.onStateChange()
}
}
// Exceptions thrown in threads are not propagated to the main thread, so it needs to be explicitly done
// (for example, for ConcurrentModificationExceptions when iterating over the data channels).
var exceptionRemovePeerConnection: Exception? = null
val removePeerConnectionThread = thread {
try {
peerConnectionWrapper!!.removePeerConnection()
} catch (e: Exception) {
exceptionRemovePeerConnection = e
}
}
onDataChannelThread.join()
removePeerConnectionThread.join()
if (exceptionRemovePeerConnection !== null) {
throw exceptionRemovePeerConnection!!
}
Mockito.verify(mockedStatusDataChannel).dispose()
for (j in 0..<dataChannelCount) {
Mockito.verify(mockedRandomIdDataChannels[j]).dispose()
}
}
}
@Test
fun testRemovePeerConnectionWhileSendingWithDifferentThreads() {
// A brute force approach is used to test race conditions between different threads just repeating the test
// several times. Due to this the test passing could be a false positive, as it could have been a matter of
// luck, but even if the test may wrongly pass sometimes it is better than nothing (although, in general, with
// that number of reruns, it fails when it should).
for (i in 1..1000) {
val peerConnectionObserverArgumentCaptor: ArgumentCaptor<PeerConnection.Observer> =
ArgumentCaptor.forClass(PeerConnection.Observer::class.java)
Mockito.`when`(
mockedPeerConnectionFactory!!.createPeerConnection(
any(PeerConnection.RTCConfiguration::class.java),
peerConnectionObserverArgumentCaptor.capture()
)
).thenReturn(mockedPeerConnection)
val mockedStatusDataChannel = Mockito.mock(DataChannel::class.java)
Mockito.`when`(mockedStatusDataChannel.label()).thenAnswer(ReturnValueOrThrowIfDisposed("status"))
Mockito.`when`(mockedStatusDataChannel.state())
.thenAnswer(ReturnValueOrThrowIfDisposed(DataChannel.State.OPEN))
Mockito.`when`(mockedStatusDataChannel.send(any())).thenAnswer(ReturnValueOrThrowIfDisposed(true))
Mockito.`when`(mockedPeerConnection!!.createDataChannel(eq("status"), any()))
.thenReturn(mockedStatusDataChannel)
peerConnectionWrapper = PeerConnectionWrapper(
mockedPeerConnectionFactory,
ArrayList<PeerConnection.IceServer>(),
MediaConstraints(),
"the-session-id",
"the-local-session-id",
null,
true,
true,
"video",
mockedSignalingMessageReceiver,
mockedSignalingMessageSender
)
val dataChannelMessageCount = 5
// Exceptions thrown in threads are not propagated to the main thread, so it needs to be explicitly done
// (for example, for IllegalStateExceptions when using a disposed data channel).
var exceptionSend: Exception? = null
val sendThread = thread {
try {
for (j in 0..<dataChannelMessageCount) {
peerConnectionWrapper!!.send(DataChannelMessage("the-message-type-$j"))
}
} catch (e: Exception) {
exceptionSend = e
}
}
val removePeerConnectionThread = thread {
peerConnectionWrapper!!.removePeerConnection()
}
sendThread.join()
removePeerConnectionThread.join()
if (exceptionSend !== null) {
throw exceptionSend!!
}
Mockito.verify(mockedStatusDataChannel).registerObserver(any())
Mockito.verify(mockedStatusDataChannel).dispose()
Mockito.verify(mockedStatusDataChannel, atLeast(0)).label()
Mockito.verify(mockedStatusDataChannel, atLeast(0)).state()
Mockito.verify(mockedStatusDataChannel, atLeast(0)).send(any())
Mockito.verifyNoMoreInteractions(mockedStatusDataChannel)
}
}
@Test
fun testRemovePeerConnectionWhileReceivingWithDifferentThreads() {
// A brute force approach is used to test race conditions between different threads just repeating the test
// several times. Due to this the test passing could be a false positive, as it could have been a matter of
// luck, but even if the test may wrongly pass sometimes it is better than nothing (although, in general, with
// that number of reruns, it fails when it should).
for (i in 1..1000) {
val peerConnectionObserverArgumentCaptor: ArgumentCaptor<PeerConnection.Observer> =
ArgumentCaptor.forClass(PeerConnection.Observer::class.java)
Mockito.`when`(
mockedPeerConnectionFactory!!.createPeerConnection(
any(PeerConnection.RTCConfiguration::class.java),
peerConnectionObserverArgumentCaptor.capture()
)
).thenReturn(mockedPeerConnection)
val mockedStatusDataChannel = Mockito.mock(DataChannel::class.java)
Mockito.`when`(mockedStatusDataChannel.label()).thenAnswer(ReturnValueOrThrowIfDisposed("status"))
Mockito.`when`(mockedStatusDataChannel.state()).thenAnswer(
ReturnValueOrThrowIfDisposed(DataChannel.State.OPEN)
)
Mockito.`when`(mockedPeerConnection!!.createDataChannel(eq("status"), any()))
.thenReturn(mockedStatusDataChannel)
val statusDataChannelObserverArgumentCaptor: ArgumentCaptor<DataChannel.Observer> =
ArgumentCaptor.forClass(DataChannel.Observer::class.java)
doNothing().`when`(mockedStatusDataChannel)
.registerObserver(statusDataChannelObserverArgumentCaptor.capture())
peerConnectionWrapper = PeerConnectionWrapper(
mockedPeerConnectionFactory,
ArrayList<PeerConnection.IceServer>(),
MediaConstraints(),
"the-session-id",
"the-local-session-id",
null,
true,
true,
"video",
mockedSignalingMessageReceiver,
mockedSignalingMessageSender
)
val mockedDataChannelMessageListener = Mockito.mock(DataChannelMessageListener::class.java)
peerConnectionWrapper!!.addListener(mockedDataChannelMessageListener)
// Exceptions thrown in threads are not propagated to the main thread, so it needs to be explicitly done
// (for example, for IllegalStateExceptions when using a disposed data channel).
var exceptionOnMessage: Exception? = null
val onMessageThread = thread {
try {
// It is assumed that, even if its data channel was disposed, its buffers can be used while there
// is a reference to them, so no special mock behaviour is added to throw an exception in that case.
statusDataChannelObserverArgumentCaptor.value.onMessage(
dataChannelMessageToBuffer(DataChannelMessage("audioOn"))
)
statusDataChannelObserverArgumentCaptor.value.onMessage(
dataChannelMessageToBuffer(DataChannelMessage("audioOff"))
)
} catch (e: Exception) {
exceptionOnMessage = e
}
}
val removePeerConnectionThread = thread {
peerConnectionWrapper!!.removePeerConnection()
}
onMessageThread.join()
removePeerConnectionThread.join()
if (exceptionOnMessage !== null) {
throw exceptionOnMessage!!
}
Mockito.verify(mockedStatusDataChannel).registerObserver(any())
Mockito.verify(mockedStatusDataChannel).dispose()
Mockito.verify(mockedStatusDataChannel, atLeast(0)).label()
Mockito.verify(mockedStatusDataChannel, atLeast(0)).state()
Mockito.verifyNoMoreInteractions(mockedStatusDataChannel)
Mockito.verify(mockedDataChannelMessageListener, atMostOnce()).onAudioOn()
Mockito.verify(mockedDataChannelMessageListener, atMostOnce()).onAudioOff()
Mockito.verifyNoMoreInteractions(mockedDataChannelMessageListener)
}
}
}