Queue data channel messages sent when data channel is not open

Data channel messages can be sent only when the data channel is open.
Otherwise the message is simply lost. Clients of the
PeerConnectionWrapper do not need to be aware of that detail or keep
track of whether the data channel was open already or not, so now data
channel messages sent before the data channel is open are queued and
sent once the data channel is opened.

Signed-off-by: Daniel Calviño Sánchez <danxuliu@gmail.com>
This commit is contained in:
Daniel Calviño Sánchez 2024-12-06 04:29:22 +01:00 committed by Marcel Hibbe
parent 7cfee8f848
commit ddd451dadb
No known key found for this signature in database
GPG Key ID: C793F8B59F43CE7B
2 changed files with 68 additions and 1 deletions

View File

@ -59,6 +59,7 @@ public class PeerConnectionWrapper {
private String sessionId;
private final MediaConstraints mediaConstraints;
private final Map<String, DataChannel> dataChannels = new HashMap<>();
private final List<DataChannelMessage> pendingDataChannelMessages = new ArrayList<>();
private final SdpObserver sdpObserver;
private final boolean hasInitiated;
@ -281,6 +282,13 @@ public class PeerConnectionWrapper {
* "status" data channel subscriber connections will receive it on a data channel with a different label, as
* Janus opens its own data channel on subscriber connections and "multiplexes" all the received data channel
* messages on it, independently of on which data channel they were originally sent.
* <p>
* Data channel messages can be sent at any time; if the "status" data channel is not open yet the messages will be
* queued and sent once it is opened. Nevertheless, if Janus is used, it is not guaranteed that the messages will
* be received by other participants, as it is only known when the data channel of the publisher was opened, but
* not if the data channel of the subscribers was. However, in general this should be a concern only during the
* first seconds after a participant joins; after some time the subscriber connections should be established and
* their data channels open.
*
* @param dataChannelMessage the message to send
*/
@ -290,7 +298,9 @@ public class PeerConnectionWrapper {
}
DataChannel statusDataChannel = dataChannels.get("status");
if (statusDataChannel == null) {
if (statusDataChannel == null || statusDataChannel.state() != DataChannel.State.OPEN) {
pendingDataChannelMessages.add(dataChannelMessage);
return;
}
@ -398,6 +408,13 @@ public class PeerConnectionWrapper {
@Override
public void onStateChange() {
if (dataChannel.state() == DataChannel.State.OPEN && "status".equals(dataChannel.label())) {
for (DataChannelMessage dataChannelMessage: pendingDataChannelMessages) {
send(dataChannelMessage);
}
pendingDataChannelMessages.clear();
}
if (dataChannel.state() == DataChannel.State.OPEN) {
sendInitialMediaStatus();
}

View File

@ -145,6 +145,56 @@ class PeerConnectionWrapperTest {
Mockito.verify(mockedRandomIdDataChannel, never()).send(any())
}
@Test
fun testSendDataChannelMessageBeforeOpeningDataChannel() {
Mockito.`when`(
mockedPeerConnectionFactory!!.createPeerConnection(
any(PeerConnection.RTCConfiguration::class.java),
any(PeerConnection.Observer::class.java)
)
).thenReturn(mockedPeerConnection)
val mockedStatusDataChannel = Mockito.mock(DataChannel::class.java)
Mockito.`when`(mockedStatusDataChannel.label()).thenReturn("status")
Mockito.`when`(mockedStatusDataChannel.state()).thenReturn(DataChannel.State.CONNECTING)
Mockito.`when`(mockedPeerConnection!!.createDataChannel(eq("status"), any()))
.thenReturn(mockedStatusDataChannel)
val statusDataChannelObserverArgumentCaptor: ArgumentCaptor<DataChannel.Observer> =
ArgumentCaptor.forClass(DataChannel.Observer::class.java)
doNothing().`when`(mockedStatusDataChannel).registerObserver(statusDataChannelObserverArgumentCaptor.capture())
peerConnectionWrapper = PeerConnectionWrapper(
mockedPeerConnectionFactory,
ArrayList<PeerConnection.IceServer>(),
MediaConstraints(),
"the-session-id",
"the-local-session-id",
null,
true,
true,
"video",
mockedSignalingMessageReceiver,
mockedSignalingMessageSender
)
peerConnectionWrapper!!.send(DataChannelMessage("the-message-type"))
peerConnectionWrapper!!.send(DataChannelMessage("another-message-type"))
Mockito.verify(mockedStatusDataChannel, never()).send(any())
Mockito.`when`(mockedStatusDataChannel.state()).thenReturn(DataChannel.State.OPEN)
statusDataChannelObserverArgumentCaptor.value.onStateChange()
Mockito.verify(mockedStatusDataChannel).send(
argThat(MatchesDataChannelMessage(DataChannelMessage("the-message-type")))
)
Mockito.verify(mockedStatusDataChannel).send(
argThat(MatchesDataChannelMessage(DataChannelMessage("another-message-type")))
)
}
@Test
fun testReceiveDataChannelMessage() {
Mockito.`when`(