Send data channel messages using "status" data channel

Data channel messages are expected to be sent using the "status" data
channel that is locally created. However, if another data channel was
opened by the remote peer the reference to the "status" data channel was
overwritten with the new data channel, and messages were sent instead on
the remote data channel.

In current Talk versions that was not a problem, and the change makes no
difference either, because since the support for Janus 1.x was added
data channel messages are listened on all data channels, independently
of their label or whether they were created by the local or remote peer.

However, in older Talk versions this fixes a regression introduced with
the support for Janus 1.x. In those versions only messages coming from
the "status" or "JanusDataChannel" data channels were taken into
account. When Janus is not used the WebUI opens the legacy
"simplewebrtc" data channel, so that data channel may be the one used to
send data channel messages (if it is open after the "status" data
channel), but the messages received on that data channel were ignored by
the WebUI. Nevertheless, at this point this is more an academic problem
than a real world problem, as it is unlikely that there are many
Nextcloud servers with Talk < 16 and without HPB being used.

Independently of all that, when the peer connection is removed only the
"status" data channel is disposed, but none of the remote data channels
are. This is just a variation of an already existing bug (the last open
data channel was the one disposed due to being the last saved reference,
but the rest were not) and it will be fixed in another commit.

Signed-off-by: Daniel Calviño Sánchez <danxuliu@gmail.com>
This commit is contained in:
Daniel Calviño Sánchez 2024-12-05 17:42:34 +01:00 committed by backportbot[bot]
parent d1a4265491
commit 3ffc6db56d
2 changed files with 118 additions and 14 deletions

View File

@ -57,7 +57,7 @@ public class PeerConnectionWrapper {
private PeerConnection peerConnection; private PeerConnection peerConnection;
private String sessionId; private String sessionId;
private final MediaConstraints mediaConstraints; private final MediaConstraints mediaConstraints;
private DataChannel dataChannel; private DataChannel statusDataChannel;
private final SdpObserver sdpObserver; private final SdpObserver sdpObserver;
private final boolean hasInitiated; private final boolean hasInitiated;
@ -144,8 +144,8 @@ public class PeerConnectionWrapper {
if (hasMCU || hasInitiated) { if (hasMCU || hasInitiated) {
DataChannel.Init init = new DataChannel.Init(); DataChannel.Init init = new DataChannel.Init();
init.negotiated = false; init.negotiated = false;
dataChannel = peerConnection.createDataChannel("status", init); statusDataChannel = peerConnection.createDataChannel("status", init);
dataChannel.registerObserver(new DataChannelObserver(dataChannel)); statusDataChannel.registerObserver(new DataChannelObserver(statusDataChannel));
if (isMCUPublisher) { if (isMCUPublisher) {
peerConnection.createOffer(sdpObserver, mediaConstraints); peerConnection.createOffer(sdpObserver, mediaConstraints);
} else if (hasMCU && "video".equals(this.videoStreamType)) { } else if (hasMCU && "video".equals(this.videoStreamType)) {
@ -233,9 +233,9 @@ public class PeerConnectionWrapper {
public void removePeerConnection() { public void removePeerConnection() {
signalingMessageReceiver.removeListener(webRtcMessageListener); signalingMessageReceiver.removeListener(webRtcMessageListener);
if (dataChannel != null) { if (statusDataChannel != null) {
dataChannel.dispose(); statusDataChannel.dispose();
dataChannel = null; statusDataChannel = null;
Log.d(TAG, "Disposed DataChannel"); Log.d(TAG, "Disposed DataChannel");
} else { } else {
Log.d(TAG, "DataChannel is null."); Log.d(TAG, "DataChannel is null.");
@ -269,12 +269,24 @@ public class PeerConnectionWrapper {
} }
} }
/**
* Sends a data channel message.
* <p>
* Data channel messages are always sent on the "status" data channel locally opened. However, if Janus is used,
* messages can be sent only on publisher connections, even if subscriber connections have a "status" data channel;
* messages sent on subscriber connections will be simply ignored. Moreover, even if the message is sent on the
* "status" data channel subscriber connections will receive it on a data channel with a different label, as
* Janus opens its own data channel on subscriber connections and "multiplexes" all the received data channel
* messages on it, independently of on which data channel they were originally sent.
*
* @param dataChannelMessage the message to send
*/
public void send(DataChannelMessage dataChannelMessage) { public void send(DataChannelMessage dataChannelMessage) {
ByteBuffer buffer; ByteBuffer buffer;
if (dataChannel != null && dataChannelMessage != null) { if (statusDataChannel != null && dataChannelMessage != null) {
try { try {
buffer = ByteBuffer.wrap(LoganSquare.serialize(dataChannelMessage).getBytes()); buffer = ByteBuffer.wrap(LoganSquare.serialize(dataChannelMessage).getBytes());
dataChannel.send(new DataChannel.Buffer(buffer, false)); statusDataChannel.send(new DataChannel.Buffer(buffer, false));
} catch (Exception e) { } catch (Exception e) {
Log.d(TAG, "Failed to send channel data, attempting regular " + dataChannelMessage); Log.d(TAG, "Failed to send channel data, attempting regular " + dataChannelMessage);
} }
@ -513,12 +525,7 @@ public class PeerConnectionWrapper {
@Override @Override
public void onDataChannel(DataChannel dataChannel) { public void onDataChannel(DataChannel dataChannel) {
if (PeerConnectionWrapper.this.dataChannel != null) { dataChannel.registerObserver(new DataChannelObserver(dataChannel));
Log.w(TAG, "Data channel with label " + PeerConnectionWrapper.this.dataChannel.label()
+ " exists, but received onDataChannel event for DataChannel with label " + dataChannel.label());
}
PeerConnectionWrapper.this.dataChannel = dataChannel;
PeerConnectionWrapper.this.dataChannel.registerObserver(new DataChannelObserver(dataChannel));
} }
@Override @Override

View File

@ -14,10 +14,13 @@ import com.nextcloud.talk.webrtc.PeerConnectionWrapper.DataChannelMessageListene
import org.junit.Before import org.junit.Before
import org.junit.Test import org.junit.Test
import org.mockito.ArgumentCaptor import org.mockito.ArgumentCaptor
import org.mockito.ArgumentMatcher
import org.mockito.ArgumentMatchers.any import org.mockito.ArgumentMatchers.any
import org.mockito.ArgumentMatchers.argThat
import org.mockito.ArgumentMatchers.eq import org.mockito.ArgumentMatchers.eq
import org.mockito.Mockito import org.mockito.Mockito
import org.mockito.Mockito.doNothing import org.mockito.Mockito.doNothing
import org.mockito.Mockito.never
import org.webrtc.DataChannel import org.webrtc.DataChannel
import org.webrtc.MediaConstraints import org.webrtc.MediaConstraints
import org.webrtc.PeerConnection import org.webrtc.PeerConnection
@ -33,6 +36,19 @@ class PeerConnectionWrapperTest {
private var mockedSignalingMessageReceiver: SignalingMessageReceiver? = null private var mockedSignalingMessageReceiver: SignalingMessageReceiver? = null
private var mockedSignalingMessageSender: SignalingMessageSender? = null private var mockedSignalingMessageSender: SignalingMessageSender? = null
/**
* Helper matcher for DataChannelMessages.
*/
private inner class MatchesDataChannelMessage(
private val expectedDataChannelMessage: DataChannelMessage
) : ArgumentMatcher<DataChannel.Buffer> {
override fun matches(buffer: DataChannel.Buffer): Boolean {
// DataChannel.Buffer does not implement "equals", so the comparison needs to be done on the ByteBuffer
// instead.
return dataChannelMessageToBuffer(expectedDataChannelMessage).data.equals(buffer.data)
}
}
private fun dataChannelMessageToBuffer(dataChannelMessage: DataChannelMessage): DataChannel.Buffer { private fun dataChannelMessageToBuffer(dataChannelMessage: DataChannelMessage): DataChannel.Buffer {
return DataChannel.Buffer( return DataChannel.Buffer(
ByteBuffer.wrap(LoganSquare.serialize(dataChannelMessage).toByteArray()), ByteBuffer.wrap(LoganSquare.serialize(dataChannelMessage).toByteArray()),
@ -48,6 +64,87 @@ class PeerConnectionWrapperTest {
mockedSignalingMessageSender = Mockito.mock(SignalingMessageSender::class.java) mockedSignalingMessageSender = Mockito.mock(SignalingMessageSender::class.java)
} }
@Test
fun testSendDataChannelMessage() {
Mockito.`when`(
mockedPeerConnectionFactory!!.createPeerConnection(
any(PeerConnection.RTCConfiguration::class.java),
any(PeerConnection.Observer::class.java)
)
).thenReturn(mockedPeerConnection)
val mockedStatusDataChannel = Mockito.mock(DataChannel::class.java)
Mockito.`when`(mockedStatusDataChannel.label()).thenReturn("status")
Mockito.`when`(mockedStatusDataChannel.state()).thenReturn(DataChannel.State.OPEN)
Mockito.`when`(mockedPeerConnection!!.createDataChannel(eq("status"), any()))
.thenReturn(mockedStatusDataChannel)
peerConnectionWrapper = PeerConnectionWrapper(
mockedPeerConnectionFactory,
ArrayList<PeerConnection.IceServer>(),
MediaConstraints(),
"the-session-id",
"the-local-session-id",
null,
true,
true,
"video",
mockedSignalingMessageReceiver,
mockedSignalingMessageSender
)
peerConnectionWrapper!!.send(DataChannelMessage("the-message-type"))
Mockito.verify(mockedStatusDataChannel).send(
argThat(MatchesDataChannelMessage(DataChannelMessage("the-message-type")))
)
}
@Test
fun testSendDataChannelMessageWithOpenRemoteDataChannel() {
val peerConnectionObserverArgumentCaptor: ArgumentCaptor<PeerConnection.Observer> =
ArgumentCaptor.forClass(PeerConnection.Observer::class.java)
Mockito.`when`(
mockedPeerConnectionFactory!!.createPeerConnection(
any(PeerConnection.RTCConfiguration::class.java),
peerConnectionObserverArgumentCaptor.capture()
)
).thenReturn(mockedPeerConnection)
val mockedStatusDataChannel = Mockito.mock(DataChannel::class.java)
Mockito.`when`(mockedStatusDataChannel.label()).thenReturn("status")
Mockito.`when`(mockedStatusDataChannel.state()).thenReturn(DataChannel.State.OPEN)
Mockito.`when`(mockedPeerConnection!!.createDataChannel(eq("status"), any()))
.thenReturn(mockedStatusDataChannel)
peerConnectionWrapper = PeerConnectionWrapper(
mockedPeerConnectionFactory,
ArrayList<PeerConnection.IceServer>(),
MediaConstraints(),
"the-session-id",
"the-local-session-id",
null,
true,
true,
"video",
mockedSignalingMessageReceiver,
mockedSignalingMessageSender
)
val mockedRandomIdDataChannel = Mockito.mock(DataChannel::class.java)
Mockito.`when`(mockedRandomIdDataChannel.label()).thenReturn("random-id")
Mockito.`when`(mockedRandomIdDataChannel.state()).thenReturn(DataChannel.State.OPEN)
peerConnectionObserverArgumentCaptor.value.onDataChannel(mockedRandomIdDataChannel)
peerConnectionWrapper!!.send(DataChannelMessage("the-message-type"))
Mockito.verify(mockedStatusDataChannel).send(
argThat(MatchesDataChannelMessage(DataChannelMessage("the-message-type")))
)
Mockito.verify(mockedRandomIdDataChannel, never()).send(any())
}
@Test @Test
fun testReceiveDataChannelMessage() { fun testReceiveDataChannelMessage() {
Mockito.`when`( Mockito.`when`(