Fix "send" not respecting order of pending messages

When the data channel is not open yet data channel messages are queued
and then sent once opened. "onStateChange" is called from the WebRTC
signaling thread, while "send" can be called potentially from any
thread, so to send the data channel messages in the same order that they
were added new messages need to be enqueued until all the pending
messages have been sent. Otherwise, even if there is synchronization
already, it could happen that "onStateChange" was called but, before
getting the lock, "send" gets it and sends the new message before the
pending messages were sent.

Signed-off-by: Daniel Calviño Sánchez <danxuliu@gmail.com>
This commit is contained in:
Daniel Calviño Sánchez 2024-12-09 18:36:22 +01:00 committed by Marcel Hibbe
parent b6d6986b62
commit d63bb31595
No known key found for this signature in database
GPG Key ID: C793F8B59F43CE7B
2 changed files with 11 additions and 3 deletions

View File

@ -298,7 +298,8 @@ public class PeerConnectionWrapper {
}
DataChannel statusDataChannel = dataChannels.get("status");
if (statusDataChannel == null || statusDataChannel.state() != DataChannel.State.OPEN) {
if (statusDataChannel == null || statusDataChannel.state() != DataChannel.State.OPEN ||
!pendingDataChannelMessages.isEmpty()) {
Log.d(TAG, "Queuing data channel message (" + dataChannelMessage + ") " + sessionId);
pendingDataChannelMessages.add(dataChannelMessage);
@ -306,6 +307,10 @@ public class PeerConnectionWrapper {
return;
}
sendWithoutQueuing(statusDataChannel, dataChannelMessage);
}
private void sendWithoutQueuing(DataChannel statusDataChannel, DataChannelMessage dataChannelMessage) {
try {
Log.d(TAG, "Sending data channel message (" + dataChannelMessage + ") " + sessionId);
@ -423,7 +428,7 @@ public class PeerConnectionWrapper {
if (dataChannel.state() == DataChannel.State.OPEN && "status".equals(dataChannelLabel)) {
for (DataChannelMessage dataChannelMessage : pendingDataChannelMessages) {
send(dataChannelMessage);
sendWithoutQueuing(dataChannel, dataChannelMessage);
}
pendingDataChannelMessages.clear();
}

View File

@ -23,6 +23,7 @@ import org.mockito.Mockito.atLeast
import org.mockito.Mockito.atMostOnce
import org.mockito.Mockito.doAnswer
import org.mockito.Mockito.doNothing
import org.mockito.Mockito.inOrder
import org.mockito.Mockito.never
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
@ -288,8 +289,10 @@ class PeerConnectionWrapperTest {
throw exceptionOnStateChange!!
}
val inOrder = inOrder(mockedStatusDataChannel)
for (j in 1..dataChannelMessageCount) {
Mockito.verify(mockedStatusDataChannel).send(
inOrder.verify(mockedStatusDataChannel).send(
argThat(MatchesDataChannelMessage(DataChannelMessage("the-message-type-$j")))
)
}