diff --git a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java index 26f09d31e..be55a6863 100644 --- a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java +++ b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java @@ -59,6 +59,7 @@ public class PeerConnectionWrapper { private String sessionId; private final MediaConstraints mediaConstraints; private final Map dataChannels = new HashMap<>(); + private final List pendingDataChannelMessages = new ArrayList<>(); private final SdpObserver sdpObserver; private final boolean hasInitiated; @@ -281,6 +282,13 @@ public class PeerConnectionWrapper { * "status" data channel subscriber connections will receive it on a data channel with a different label, as * Janus opens its own data channel on subscriber connections and "multiplexes" all the received data channel * messages on it, independently of on which data channel they were originally sent. + *

+ * Data channel messages can be sent at any time; if the "status" data channel is not open yet the messages will be + * queued and sent once it is opened. Nevertheless, if Janus is used, it is not guaranteed that the messages will + * be received by other participants, as it is only known when the data channel of the publisher was opened, but + * not if the data channel of the subscribers was. However, in general this should be a concern only during the + * first seconds after a participant joins; after some time the subscriber connections should be established and + * their data channels open. * * @param dataChannelMessage the message to send */ @@ -290,7 +298,9 @@ public class PeerConnectionWrapper { } DataChannel statusDataChannel = dataChannels.get("status"); - if (statusDataChannel == null) { + if (statusDataChannel == null || statusDataChannel.state() != DataChannel.State.OPEN) { + pendingDataChannelMessages.add(dataChannelMessage); + return; } @@ -398,6 +408,13 @@ public class PeerConnectionWrapper { @Override public void onStateChange() { + if (dataChannel.state() == DataChannel.State.OPEN && "status".equals(dataChannel.label())) { + for (DataChannelMessage dataChannelMessage: pendingDataChannelMessages) { + send(dataChannelMessage); + } + pendingDataChannelMessages.clear(); + } + if (dataChannel.state() == DataChannel.State.OPEN) { sendInitialMediaStatus(); } diff --git a/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt b/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt index 450cc4fa8..61c5f5daa 100644 --- a/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt +++ b/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt @@ -145,6 +145,56 @@ class PeerConnectionWrapperTest { Mockito.verify(mockedRandomIdDataChannel, never()).send(any()) } + @Test + fun testSendDataChannelMessageBeforeOpeningDataChannel() { + Mockito.`when`( + mockedPeerConnectionFactory!!.createPeerConnection( + any(PeerConnection.RTCConfiguration::class.java), + any(PeerConnection.Observer::class.java) + ) + ).thenReturn(mockedPeerConnection) + + val mockedStatusDataChannel = Mockito.mock(DataChannel::class.java) + Mockito.`when`(mockedStatusDataChannel.label()).thenReturn("status") + Mockito.`when`(mockedStatusDataChannel.state()).thenReturn(DataChannel.State.CONNECTING) + Mockito.`when`(mockedPeerConnection!!.createDataChannel(eq("status"), any())) + .thenReturn(mockedStatusDataChannel) + + val statusDataChannelObserverArgumentCaptor: ArgumentCaptor = + ArgumentCaptor.forClass(DataChannel.Observer::class.java) + + doNothing().`when`(mockedStatusDataChannel).registerObserver(statusDataChannelObserverArgumentCaptor.capture()) + + peerConnectionWrapper = PeerConnectionWrapper( + mockedPeerConnectionFactory, + ArrayList(), + MediaConstraints(), + "the-session-id", + "the-local-session-id", + null, + true, + true, + "video", + mockedSignalingMessageReceiver, + mockedSignalingMessageSender + ) + + peerConnectionWrapper!!.send(DataChannelMessage("the-message-type")) + peerConnectionWrapper!!.send(DataChannelMessage("another-message-type")) + + Mockito.verify(mockedStatusDataChannel, never()).send(any()) + + Mockito.`when`(mockedStatusDataChannel.state()).thenReturn(DataChannel.State.OPEN) + statusDataChannelObserverArgumentCaptor.value.onStateChange() + + Mockito.verify(mockedStatusDataChannel).send( + argThat(MatchesDataChannelMessage(DataChannelMessage("the-message-type"))) + ) + Mockito.verify(mockedStatusDataChannel).send( + argThat(MatchesDataChannelMessage(DataChannelMessage("another-message-type"))) + ) + } + @Test fun testReceiveDataChannelMessage() { Mockito.`when`(