diff --git a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java index bf95befb8..6413f0344 100644 --- a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java +++ b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java @@ -57,7 +57,7 @@ public class PeerConnectionWrapper { private PeerConnection peerConnection; private String sessionId; private final MediaConstraints mediaConstraints; - private DataChannel dataChannel; + private DataChannel statusDataChannel; private final SdpObserver sdpObserver; private final boolean hasInitiated; @@ -144,8 +144,8 @@ public class PeerConnectionWrapper { if (hasMCU || hasInitiated) { DataChannel.Init init = new DataChannel.Init(); init.negotiated = false; - dataChannel = peerConnection.createDataChannel("status", init); - dataChannel.registerObserver(new DataChannelObserver(dataChannel)); + statusDataChannel = peerConnection.createDataChannel("status", init); + statusDataChannel.registerObserver(new DataChannelObserver(statusDataChannel)); if (isMCUPublisher) { peerConnection.createOffer(sdpObserver, mediaConstraints); } else if (hasMCU && "video".equals(this.videoStreamType)) { @@ -233,9 +233,9 @@ public class PeerConnectionWrapper { public void removePeerConnection() { signalingMessageReceiver.removeListener(webRtcMessageListener); - if (dataChannel != null) { - dataChannel.dispose(); - dataChannel = null; + if (statusDataChannel != null) { + statusDataChannel.dispose(); + statusDataChannel = null; Log.d(TAG, "Disposed DataChannel"); } else { Log.d(TAG, "DataChannel is null."); @@ -269,12 +269,24 @@ public class PeerConnectionWrapper { } } + /** + * Sends a data channel message. + *

+ * Data channel messages are always sent on the "status" data channel locally opened. However, if Janus is used, + * messages can be sent only on publisher connections, even if subscriber connections have a "status" data channel; + * messages sent on subscriber connections will be simply ignored. Moreover, even if the message is sent on the + * "status" data channel subscriber connections will receive it on a data channel with a different label, as + * Janus opens its own data channel on subscriber connections and "multiplexes" all the received data channel + * messages on it, independently of on which data channel they were originally sent. + * + * @param dataChannelMessage the message to send + */ public void send(DataChannelMessage dataChannelMessage) { ByteBuffer buffer; - if (dataChannel != null && dataChannelMessage != null) { + if (statusDataChannel != null && dataChannelMessage != null) { try { buffer = ByteBuffer.wrap(LoganSquare.serialize(dataChannelMessage).getBytes()); - dataChannel.send(new DataChannel.Buffer(buffer, false)); + statusDataChannel.send(new DataChannel.Buffer(buffer, false)); } catch (Exception e) { Log.d(TAG, "Failed to send channel data, attempting regular " + dataChannelMessage); } @@ -513,12 +525,7 @@ public class PeerConnectionWrapper { @Override public void onDataChannel(DataChannel dataChannel) { - if (PeerConnectionWrapper.this.dataChannel != null) { - Log.w(TAG, "Data channel with label " + PeerConnectionWrapper.this.dataChannel.label() - + " exists, but received onDataChannel event for DataChannel with label " + dataChannel.label()); - } - PeerConnectionWrapper.this.dataChannel = dataChannel; - PeerConnectionWrapper.this.dataChannel.registerObserver(new DataChannelObserver(dataChannel)); + dataChannel.registerObserver(new DataChannelObserver(dataChannel)); } @Override diff --git a/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt b/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt index 98e2de4ae..be628c2af 100644 --- a/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt +++ b/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt @@ -14,10 +14,13 @@ import com.nextcloud.talk.webrtc.PeerConnectionWrapper.DataChannelMessageListene import org.junit.Before import org.junit.Test import org.mockito.ArgumentCaptor +import org.mockito.ArgumentMatcher import org.mockito.ArgumentMatchers.any +import org.mockito.ArgumentMatchers.argThat import org.mockito.ArgumentMatchers.eq import org.mockito.Mockito import org.mockito.Mockito.doNothing +import org.mockito.Mockito.never import org.webrtc.DataChannel import org.webrtc.MediaConstraints import org.webrtc.PeerConnection @@ -33,6 +36,19 @@ class PeerConnectionWrapperTest { private var mockedSignalingMessageReceiver: SignalingMessageReceiver? = null private var mockedSignalingMessageSender: SignalingMessageSender? = null + /** + * Helper matcher for DataChannelMessages. + */ + private inner class MatchesDataChannelMessage( + private val expectedDataChannelMessage: DataChannelMessage + ) : ArgumentMatcher { + override fun matches(buffer: DataChannel.Buffer): Boolean { + // DataChannel.Buffer does not implement "equals", so the comparison needs to be done on the ByteBuffer + // instead. + return dataChannelMessageToBuffer(expectedDataChannelMessage).data.equals(buffer.data) + } + } + private fun dataChannelMessageToBuffer(dataChannelMessage: DataChannelMessage): DataChannel.Buffer { return DataChannel.Buffer( ByteBuffer.wrap(LoganSquare.serialize(dataChannelMessage).toByteArray()), @@ -48,6 +64,87 @@ class PeerConnectionWrapperTest { mockedSignalingMessageSender = Mockito.mock(SignalingMessageSender::class.java) } + @Test + fun testSendDataChannelMessage() { + Mockito.`when`( + mockedPeerConnectionFactory!!.createPeerConnection( + any(PeerConnection.RTCConfiguration::class.java), + any(PeerConnection.Observer::class.java) + ) + ).thenReturn(mockedPeerConnection) + + val mockedStatusDataChannel = Mockito.mock(DataChannel::class.java) + Mockito.`when`(mockedStatusDataChannel.label()).thenReturn("status") + Mockito.`when`(mockedStatusDataChannel.state()).thenReturn(DataChannel.State.OPEN) + Mockito.`when`(mockedPeerConnection!!.createDataChannel(eq("status"), any())) + .thenReturn(mockedStatusDataChannel) + + peerConnectionWrapper = PeerConnectionWrapper( + mockedPeerConnectionFactory, + ArrayList(), + MediaConstraints(), + "the-session-id", + "the-local-session-id", + null, + true, + true, + "video", + mockedSignalingMessageReceiver, + mockedSignalingMessageSender + ) + + peerConnectionWrapper!!.send(DataChannelMessage("the-message-type")) + + Mockito.verify(mockedStatusDataChannel).send( + argThat(MatchesDataChannelMessage(DataChannelMessage("the-message-type"))) + ) + } + + @Test + fun testSendDataChannelMessageWithOpenRemoteDataChannel() { + val peerConnectionObserverArgumentCaptor: ArgumentCaptor = + ArgumentCaptor.forClass(PeerConnection.Observer::class.java) + + Mockito.`when`( + mockedPeerConnectionFactory!!.createPeerConnection( + any(PeerConnection.RTCConfiguration::class.java), + peerConnectionObserverArgumentCaptor.capture() + ) + ).thenReturn(mockedPeerConnection) + + val mockedStatusDataChannel = Mockito.mock(DataChannel::class.java) + Mockito.`when`(mockedStatusDataChannel.label()).thenReturn("status") + Mockito.`when`(mockedStatusDataChannel.state()).thenReturn(DataChannel.State.OPEN) + Mockito.`when`(mockedPeerConnection!!.createDataChannel(eq("status"), any())) + .thenReturn(mockedStatusDataChannel) + + peerConnectionWrapper = PeerConnectionWrapper( + mockedPeerConnectionFactory, + ArrayList(), + MediaConstraints(), + "the-session-id", + "the-local-session-id", + null, + true, + true, + "video", + mockedSignalingMessageReceiver, + mockedSignalingMessageSender + ) + + val mockedRandomIdDataChannel = Mockito.mock(DataChannel::class.java) + Mockito.`when`(mockedRandomIdDataChannel.label()).thenReturn("random-id") + Mockito.`when`(mockedRandomIdDataChannel.state()).thenReturn(DataChannel.State.OPEN) + peerConnectionObserverArgumentCaptor.value.onDataChannel(mockedRandomIdDataChannel) + + peerConnectionWrapper!!.send(DataChannelMessage("the-message-type")) + + Mockito.verify(mockedStatusDataChannel).send( + argThat(MatchesDataChannelMessage(DataChannelMessage("the-message-type"))) + ) + Mockito.verify(mockedRandomIdDataChannel, never()).send(any()) + } + @Test fun testReceiveDataChannelMessage() { Mockito.`when`(