From ddd451dadb60cbcb3a1e482f7a69fb80036699b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Calvi=C3=B1o=20S=C3=A1nchez?= Date: Fri, 6 Dec 2024 04:29:22 +0100 Subject: [PATCH] Queue data channel messages sent when data channel is not open MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Data channel messages can be sent only when the data channel is open. Otherwise the message is simply lost. Clients of the PeerConnectionWrapper do not need to be aware of that detail or keep track of whether the data channel was open already or not, so now data channel messages sent before the data channel is open are queued and sent once the data channel is opened. Signed-off-by: Daniel Calviño Sánchez --- .../talk/webrtc/PeerConnectionWrapper.java | 19 ++++++- .../talk/webrtc/PeerConnectionWrapperTest.kt | 50 +++++++++++++++++++ 2 files changed, 68 insertions(+), 1 deletion(-) diff --git a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java index 26f09d31e..be55a6863 100644 --- a/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java +++ b/app/src/main/java/com/nextcloud/talk/webrtc/PeerConnectionWrapper.java @@ -59,6 +59,7 @@ public class PeerConnectionWrapper { private String sessionId; private final MediaConstraints mediaConstraints; private final Map dataChannels = new HashMap<>(); + private final List pendingDataChannelMessages = new ArrayList<>(); private final SdpObserver sdpObserver; private final boolean hasInitiated; @@ -281,6 +282,13 @@ public class PeerConnectionWrapper { * "status" data channel subscriber connections will receive it on a data channel with a different label, as * Janus opens its own data channel on subscriber connections and "multiplexes" all the received data channel * messages on it, independently of on which data channel they were originally sent. + *

+ * Data channel messages can be sent at any time; if the "status" data channel is not open yet the messages will be + * queued and sent once it is opened. Nevertheless, if Janus is used, it is not guaranteed that the messages will + * be received by other participants, as it is only known when the data channel of the publisher was opened, but + * not if the data channel of the subscribers was. However, in general this should be a concern only during the + * first seconds after a participant joins; after some time the subscriber connections should be established and + * their data channels open. * * @param dataChannelMessage the message to send */ @@ -290,7 +298,9 @@ public class PeerConnectionWrapper { } DataChannel statusDataChannel = dataChannels.get("status"); - if (statusDataChannel == null) { + if (statusDataChannel == null || statusDataChannel.state() != DataChannel.State.OPEN) { + pendingDataChannelMessages.add(dataChannelMessage); + return; } @@ -398,6 +408,13 @@ public class PeerConnectionWrapper { @Override public void onStateChange() { + if (dataChannel.state() == DataChannel.State.OPEN && "status".equals(dataChannel.label())) { + for (DataChannelMessage dataChannelMessage: pendingDataChannelMessages) { + send(dataChannelMessage); + } + pendingDataChannelMessages.clear(); + } + if (dataChannel.state() == DataChannel.State.OPEN) { sendInitialMediaStatus(); } diff --git a/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt b/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt index 450cc4fa8..61c5f5daa 100644 --- a/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt +++ b/app/src/test/java/com/nextcloud/talk/webrtc/PeerConnectionWrapperTest.kt @@ -145,6 +145,56 @@ class PeerConnectionWrapperTest { Mockito.verify(mockedRandomIdDataChannel, never()).send(any()) } + @Test + fun testSendDataChannelMessageBeforeOpeningDataChannel() { + Mockito.`when`( + mockedPeerConnectionFactory!!.createPeerConnection( + any(PeerConnection.RTCConfiguration::class.java), + any(PeerConnection.Observer::class.java) + ) + ).thenReturn(mockedPeerConnection) + + val mockedStatusDataChannel = Mockito.mock(DataChannel::class.java) + Mockito.`when`(mockedStatusDataChannel.label()).thenReturn("status") + Mockito.`when`(mockedStatusDataChannel.state()).thenReturn(DataChannel.State.CONNECTING) + Mockito.`when`(mockedPeerConnection!!.createDataChannel(eq("status"), any())) + .thenReturn(mockedStatusDataChannel) + + val statusDataChannelObserverArgumentCaptor: ArgumentCaptor = + ArgumentCaptor.forClass(DataChannel.Observer::class.java) + + doNothing().`when`(mockedStatusDataChannel).registerObserver(statusDataChannelObserverArgumentCaptor.capture()) + + peerConnectionWrapper = PeerConnectionWrapper( + mockedPeerConnectionFactory, + ArrayList(), + MediaConstraints(), + "the-session-id", + "the-local-session-id", + null, + true, + true, + "video", + mockedSignalingMessageReceiver, + mockedSignalingMessageSender + ) + + peerConnectionWrapper!!.send(DataChannelMessage("the-message-type")) + peerConnectionWrapper!!.send(DataChannelMessage("another-message-type")) + + Mockito.verify(mockedStatusDataChannel, never()).send(any()) + + Mockito.`when`(mockedStatusDataChannel.state()).thenReturn(DataChannel.State.OPEN) + statusDataChannelObserverArgumentCaptor.value.onStateChange() + + Mockito.verify(mockedStatusDataChannel).send( + argThat(MatchesDataChannelMessage(DataChannelMessage("the-message-type"))) + ) + Mockito.verify(mockedStatusDataChannel).send( + argThat(MatchesDataChannelMessage(DataChannelMessage("another-message-type"))) + ) + } + @Test fun testReceiveDataChannelMessage() { Mockito.`when`(