MagicWebSocketInstance.java -> WebSocketInstance.kt

Signed-off-by: Marcel Hibbe <dev@mhibbe.de>
This commit is contained in:
Marcel Hibbe 2023-01-20 11:16:10 +01:00
parent 690a174f64
commit a37edc4421
No known key found for this signature in database
GPG Key ID: C793F8B59F43CE7B
5 changed files with 484 additions and 533 deletions

View File

@ -105,10 +105,10 @@ import com.nextcloud.talk.utils.power.PowerManagerUtils;
import com.nextcloud.talk.utils.singletons.ApplicationWideCurrentRoomHolder; import com.nextcloud.talk.utils.singletons.ApplicationWideCurrentRoomHolder;
import com.nextcloud.talk.viewmodels.CallRecordingViewModel; import com.nextcloud.talk.viewmodels.CallRecordingViewModel;
import com.nextcloud.talk.webrtc.MagicWebRTCUtils; import com.nextcloud.talk.webrtc.MagicWebRTCUtils;
import com.nextcloud.talk.webrtc.MagicWebSocketInstance;
import com.nextcloud.talk.webrtc.PeerConnectionWrapper; import com.nextcloud.talk.webrtc.PeerConnectionWrapper;
import com.nextcloud.talk.webrtc.WebRtcAudioManager; import com.nextcloud.talk.webrtc.WebRtcAudioManager;
import com.nextcloud.talk.webrtc.WebSocketConnectionHelper; import com.nextcloud.talk.webrtc.WebSocketConnectionHelper;
import com.nextcloud.talk.webrtc.WebSocketInstance;
import com.wooplr.spotlight.SpotlightView; import com.wooplr.spotlight.SpotlightView;
import org.apache.commons.lang3.StringEscapeUtils; import org.apache.commons.lang3.StringEscapeUtils;
@ -315,7 +315,7 @@ public class CallActivity extends CallBaseActivity {
}; };
private ExternalSignalingServer externalSignalingServer; private ExternalSignalingServer externalSignalingServer;
private MagicWebSocketInstance webSocketClient; private WebSocketInstance webSocketClient;
private WebSocketConnectionHelper webSocketConnectionHelper; private WebSocketConnectionHelper webSocketConnectionHelper;
private boolean hasMCU; private boolean hasMCU;
private boolean hasExternalSignalingServer; private boolean hasExternalSignalingServer;

View File

@ -186,8 +186,8 @@ import com.nextcloud.talk.utils.remapchat.RemapChatModel
import com.nextcloud.talk.utils.rx.DisposableSet import com.nextcloud.talk.utils.rx.DisposableSet
import com.nextcloud.talk.utils.singletons.ApplicationWideCurrentRoomHolder import com.nextcloud.talk.utils.singletons.ApplicationWideCurrentRoomHolder
import com.nextcloud.talk.utils.text.Spans import com.nextcloud.talk.utils.text.Spans
import com.nextcloud.talk.webrtc.MagicWebSocketInstance
import com.nextcloud.talk.webrtc.WebSocketConnectionHelper import com.nextcloud.talk.webrtc.WebSocketConnectionHelper
import com.nextcloud.talk.webrtc.WebSocketInstance
import com.otaliastudios.autocomplete.Autocomplete import com.otaliastudios.autocomplete.Autocomplete
import com.stfalcon.chatkit.commons.ImageLoader import com.stfalcon.chatkit.commons.ImageLoader
import com.stfalcon.chatkit.commons.models.IMessage import com.stfalcon.chatkit.commons.models.IMessage
@ -280,7 +280,7 @@ class ChatController(args: Bundle) :
private var conversationVideoMenuItem: MenuItem? = null private var conversationVideoMenuItem: MenuItem? = null
private var conversationSharedItemsItem: MenuItem? = null private var conversationSharedItemsItem: MenuItem? = null
var magicWebSocketInstance: MagicWebSocketInstance? = null var webSocketInstance: WebSocketInstance? = null
var lobbyTimerHandler: Handler? = null var lobbyTimerHandler: Handler? = null
var pastPreconditionFailed = false var pastPreconditionFailed = false
@ -1926,9 +1926,9 @@ class ChatController(args: Bundle) :
pullChatMessages(1, 0) pullChatMessages(1, 0)
} }
if (magicWebSocketInstance != null) { if (webSocketInstance != null) {
magicWebSocketInstance?.joinRoomWithRoomTokenAndSession( webSocketInstance?.joinRoomWithRoomTokenAndSession(
roomToken, roomToken!!,
currentConversation?.sessionId currentConversation?.sessionId
) )
} }
@ -1951,9 +1951,9 @@ class ChatController(args: Bundle) :
inConversation = true inConversation = true
ApplicationWideCurrentRoomHolder.getInstance().session = currentConversation?.sessionId ApplicationWideCurrentRoomHolder.getInstance().session = currentConversation?.sessionId
if (magicWebSocketInstance != null) { if (webSocketInstance != null) {
magicWebSocketInstance?.joinRoomWithRoomTokenAndSession( webSocketInstance?.joinRoomWithRoomTokenAndSession(
roomToken, roomToken!!,
currentConversation?.sessionId currentConversation?.sessionId
) )
} }
@ -2005,8 +2005,8 @@ class ChatController(args: Bundle) :
lobbyTimerHandler?.removeCallbacksAndMessages(null) lobbyTimerHandler?.removeCallbacksAndMessages(null)
} }
if (magicWebSocketInstance != null && currentConversation != null) { if (webSocketInstance != null && currentConversation != null) {
magicWebSocketInstance?.joinRoomWithRoomTokenAndSession( webSocketInstance?.joinRoomWithRoomTokenAndSession(
"", "",
currentConversation?.sessionId currentConversation?.sessionId
) )
@ -2129,7 +2129,7 @@ class ChatController(args: Bundle) :
private fun setupWebsocket() { private fun setupWebsocket() {
if (conversationUser != null) { if (conversationUser != null) {
magicWebSocketInstance = webSocketInstance =
if (WebSocketConnectionHelper.getMagicWebSocketInstanceForUserId(conversationUser.id!!) != null) { if (WebSocketConnectionHelper.getMagicWebSocketInstanceForUserId(conversationUser.id!!) != null) {
WebSocketConnectionHelper.getMagicWebSocketInstanceForUserId(conversationUser.id!!) WebSocketConnectionHelper.getMagicWebSocketInstanceForUserId(conversationUser.id!!)
} else { } else {

View File

@ -1,504 +0,0 @@
/*
* Nextcloud Talk application
*
* @author Mario Danic
* Copyright (C) 2017-2018 Mario Danic <mario@lovelyhq.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package com.nextcloud.talk.webrtc;
import android.content.Context;
import android.text.TextUtils;
import android.util.Log;
import com.bluelinelabs.logansquare.LoganSquare;
import com.nextcloud.talk.application.NextcloudTalkApplication;
import com.nextcloud.talk.data.user.model.User;
import com.nextcloud.talk.events.NetworkEvent;
import com.nextcloud.talk.events.WebSocketCommunicationEvent;
import com.nextcloud.talk.models.json.participants.Participant;
import com.nextcloud.talk.models.json.signaling.NCSignalingMessage;
import com.nextcloud.talk.models.json.websocket.BaseWebSocketMessage;
import com.nextcloud.talk.models.json.websocket.ByeWebSocketMessage;
import com.nextcloud.talk.models.json.websocket.CallOverallWebSocketMessage;
import com.nextcloud.talk.models.json.websocket.ErrorOverallWebSocketMessage;
import com.nextcloud.talk.models.json.websocket.ErrorWebSocketMessage;
import com.nextcloud.talk.models.json.websocket.EventOverallWebSocketMessage;
import com.nextcloud.talk.models.json.websocket.HelloResponseOverallWebSocketMessage;
import com.nextcloud.talk.models.json.websocket.JoinedRoomOverallWebSocketMessage;
import com.nextcloud.talk.signaling.SignalingMessageReceiver;
import com.nextcloud.talk.signaling.SignalingMessageSender;
import com.nextcloud.talk.utils.bundle.BundleKeys;
import org.greenrobot.eventbus.EventBus;
import org.greenrobot.eventbus.Subscribe;
import org.greenrobot.eventbus.ThreadMode;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.inject.Inject;
import androidx.annotation.NonNull;
import autodagger.AutoInjector;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
import okio.ByteString;
import static com.nextcloud.talk.models.json.participants.Participant.ActorType.GUESTS;
import static com.nextcloud.talk.models.json.participants.Participant.ActorType.USERS;
import static com.nextcloud.talk.webrtc.Globals.ROOM_TOKEN;
import static com.nextcloud.talk.webrtc.Globals.TARGET_PARTICIPANTS;
import static com.nextcloud.talk.webrtc.Globals.TARGET_ROOM;
@AutoInjector(NextcloudTalkApplication.class)
public class MagicWebSocketInstance extends WebSocketListener {
private static final String TAG = "MagicWebSocketInstance";
@Inject
OkHttpClient okHttpClient;
@Inject
EventBus eventBus;
@Inject
Context context;
private final User conversationUser;
private final String webSocketTicket;
private String resumeId;
private String sessionId;
private boolean hasMCU;
private boolean connected;
private final WebSocketConnectionHelper webSocketConnectionHelper;
private WebSocket internalWebSocket;
private final String connectionUrl;
private String currentRoomToken;
private boolean reconnecting = false;
private HashMap<String, Participant> usersHashMap;
private List<String> messagesQueue = new ArrayList<>();
private final ExternalSignalingMessageReceiver signalingMessageReceiver = new ExternalSignalingMessageReceiver();
private final ExternalSignalingMessageSender signalingMessageSender = new ExternalSignalingMessageSender();
MagicWebSocketInstance(User conversationUser, String connectionUrl, String webSocketTicket) {
NextcloudTalkApplication.Companion.getSharedApplication().getComponentApplication().inject(this);
this.connectionUrl = connectionUrl;
this.conversationUser = conversationUser;
this.webSocketTicket = webSocketTicket;
this.webSocketConnectionHelper = new WebSocketConnectionHelper();
this.usersHashMap = new HashMap<>();
connected = false;
eventBus.register(this);
restartWebSocket();
}
private void sendHello() {
try {
if (TextUtils.isEmpty(resumeId)) {
internalWebSocket.send(
LoganSquare.serialize(webSocketConnectionHelper
.getAssembledHelloModel(conversationUser, webSocketTicket)));
} else {
internalWebSocket.send(
LoganSquare.serialize(webSocketConnectionHelper
.getAssembledHelloModelForResume(resumeId)));
}
} catch (IOException e) {
Log.e(TAG, "Failed to serialize hello model");
}
}
@Override
public void onOpen(WebSocket webSocket, Response response) {
internalWebSocket = webSocket;
sendHello();
}
private void closeWebSocket(WebSocket webSocket) {
webSocket.close(1000, null);
webSocket.cancel();
if (webSocket == internalWebSocket) {
connected = false;
messagesQueue = new ArrayList<>();
}
restartWebSocket();
}
public void clearResumeId() {
resumeId = "";
}
public final void restartWebSocket() {
reconnecting = true;
// TODO when improving logging, keep in mind this issue: https://github.com/nextcloud/talk-android/issues/1013
Log.d(TAG, "restartWebSocket: " + connectionUrl);
Request request = new Request.Builder().url(connectionUrl).build();
okHttpClient.newWebSocket(request, this);
}
@Override
public void onMessage(@NonNull WebSocket webSocket, @NonNull String text) {
if (webSocket == internalWebSocket) {
Log.d(TAG, "Receiving : " + webSocket + " " + text);
try {
BaseWebSocketMessage baseWebSocketMessage = LoganSquare.parse(text, BaseWebSocketMessage.class);
String messageType = baseWebSocketMessage.getType();
if (messageType != null) {
switch (messageType) {
case "hello":
processHelloMessage(webSocket, text);
break;
case "error":
processErrorMessage(webSocket, text);
break;
case "room":
processJoinedRoomMessage(text);
break;
case "event":
processEventMessage(text);
break;
case "message":
processMessage(text);
break;
case "bye":
connected = false;
resumeId = "";
break;
default:
break;
}
} else {
Log.e(TAG, "Received message with type: null");
}
} catch (IOException e) {
Log.e(TAG, "Failed to recognize WebSocket message", e);
}
}
}
private void processMessage(String text) throws IOException {
CallOverallWebSocketMessage callOverallWebSocketMessage =
LoganSquare.parse(text, CallOverallWebSocketMessage.class);
if (callOverallWebSocketMessage.getCallWebSocketMessage() != null) {
NCSignalingMessage ncSignalingMessage = callOverallWebSocketMessage
.getCallWebSocketMessage()
.getNcSignalingMessage();
if (ncSignalingMessage != null && TextUtils.isEmpty(ncSignalingMessage.getFrom()) &&
callOverallWebSocketMessage.getCallWebSocketMessage().getSenderWebSocketMessage() != null) {
ncSignalingMessage.setFrom(
callOverallWebSocketMessage.getCallWebSocketMessage().getSenderWebSocketMessage().getSessionId());
}
signalingMessageReceiver.process(ncSignalingMessage);
}
}
private void processEventMessage(String text) throws IOException {
EventOverallWebSocketMessage eventOverallWebSocketMessage =
LoganSquare.parse(text, EventOverallWebSocketMessage.class);
if (eventOverallWebSocketMessage.getEventMap() != null) {
String target = (String) eventOverallWebSocketMessage.getEventMap().get("target");
if (target != null) {
switch (target) {
case TARGET_ROOM:
if ("message".equals(eventOverallWebSocketMessage.getEventMap().get("type"))) {
processRoomMessageMessage(eventOverallWebSocketMessage);
} else if ("join".equals(eventOverallWebSocketMessage.getEventMap().get("type"))) {
processRoomJoinMessage(eventOverallWebSocketMessage);
}
break;
case TARGET_PARTICIPANTS:
signalingMessageReceiver.process(eventOverallWebSocketMessage.getEventMap());
break;
default:
Log.i(TAG, "Received unknown/ignored event target: " + target);
break;
}
} else {
Log.w(TAG, "Received message with event target: null");
}
}
}
private void processRoomMessageMessage(EventOverallWebSocketMessage eventOverallWebSocketMessage) {
Map<String, Object> messageHashMap = (Map<String, Object>) eventOverallWebSocketMessage
.getEventMap()
.get("message");
if (messageHashMap != null && messageHashMap.containsKey("data")) {
Map<String, Object> dataHashMap = (Map<String, Object>) messageHashMap.get("data");
if (dataHashMap != null && dataHashMap.containsKey("chat")) {
Map<String, Object> chatMap = (Map<String, Object>) dataHashMap.get("chat");
if (chatMap != null && chatMap.containsKey("refresh") && (boolean) chatMap.get("refresh")) {
HashMap<String, String> refreshChatHashMap = new HashMap<>();
refreshChatHashMap.put(BundleKeys.KEY_ROOM_TOKEN, (String) messageHashMap.get("roomid"));
refreshChatHashMap.put(BundleKeys.KEY_INTERNAL_USER_ID, Long.toString(conversationUser.getId()));
eventBus.post(new WebSocketCommunicationEvent("refreshChat", refreshChatHashMap));
}
} else if (dataHashMap != null && dataHashMap.containsKey("recording")) {
Map<String, Object> recordingMap = (Map<String, Object>) dataHashMap.get("recording");
if (recordingMap != null && recordingMap.containsKey("status")) {
int status = ((Long) recordingMap.get("status")).intValue();
Log.d(TAG, "status is " + status);
HashMap<String, String> recordingHashMap = new HashMap<>();
recordingHashMap.put(BundleKeys.KEY_RECORDING_STATE, Integer.toString(status));
eventBus.post(new WebSocketCommunicationEvent("recordingStatus", recordingHashMap));
}
}
}
}
private void processRoomJoinMessage(EventOverallWebSocketMessage eventOverallWebSocketMessage) {
List<HashMap<String, Object>> joinEventList = (List<HashMap<String, Object>>) eventOverallWebSocketMessage
.getEventMap()
.get("join");
HashMap<String, Object> internalHashMap;
Participant participant;
for (int i = 0; i < joinEventList.size(); i++) {
internalHashMap = joinEventList.get(i);
HashMap<String, Object> userMap = (HashMap<String, Object>) internalHashMap.get("user");
participant = new Participant();
String userId = (String) internalHashMap.get("userid");
if (userId != null) {
participant.setActorType(USERS);
participant.setActorId(userId);
} else {
participant.setActorType(GUESTS);
// FIXME seems to be not given by the HPB: participant.setActorId();
}
if (userMap != null) {
// There is no "user" attribute for guest participants.
participant.setDisplayName((String) userMap.get("displayname"));
}
usersHashMap.put((String) internalHashMap.get("sessionid"), participant);
}
}
private void processJoinedRoomMessage(String text) throws IOException {
JoinedRoomOverallWebSocketMessage joinedRoomOverallWebSocketMessage =
LoganSquare.parse(text, JoinedRoomOverallWebSocketMessage.class);
if (joinedRoomOverallWebSocketMessage.getRoomWebSocketMessage() != null) {
currentRoomToken = joinedRoomOverallWebSocketMessage.getRoomWebSocketMessage().getRoomId();
if (joinedRoomOverallWebSocketMessage
.getRoomWebSocketMessage()
.getRoomPropertiesWebSocketMessage() != null &&
!TextUtils.isEmpty(currentRoomToken)) {
sendRoomJoinedEvent();
}
}
}
private void processErrorMessage(WebSocket webSocket, String text) throws IOException {
Log.e(TAG, "Received error: " + text);
ErrorOverallWebSocketMessage errorOverallWebSocketMessage =
LoganSquare.parse(text, ErrorOverallWebSocketMessage.class);
ErrorWebSocketMessage message = errorOverallWebSocketMessage.getErrorWebSocketMessage();
if(message != null) {
if ("no_such_session".equals(message.getCode())) {
Log.d(TAG, "WebSocket " + webSocket.hashCode() + " resumeID " + resumeId + " expired");
resumeId = "";
currentRoomToken = "";
restartWebSocket();
} else if ("hello_expected".equals(message.getCode())) {
restartWebSocket();
}
}
}
private void processHelloMessage(WebSocket webSocket, String text) throws IOException {
connected = true;
reconnecting = false;
String oldResumeId = resumeId;
HelloResponseOverallWebSocketMessage helloResponseWebSocketMessage =
LoganSquare.parse(text, HelloResponseOverallWebSocketMessage.class);
if (helloResponseWebSocketMessage.getHelloResponseWebSocketMessage() != null) {
resumeId = helloResponseWebSocketMessage.getHelloResponseWebSocketMessage().getResumeId();
sessionId = helloResponseWebSocketMessage.getHelloResponseWebSocketMessage().getSessionId();
hasMCU = helloResponseWebSocketMessage.getHelloResponseWebSocketMessage().serverHasMCUSupport();
}
for (int i = 0; i < messagesQueue.size(); i++) {
webSocket.send(messagesQueue.get(i));
}
messagesQueue = new ArrayList<>();
HashMap<String, String> helloHasHap = new HashMap<>();
if (!TextUtils.isEmpty(oldResumeId)) {
helloHasHap.put("oldResumeId", oldResumeId);
} else {
currentRoomToken = "";
}
if (!TextUtils.isEmpty(currentRoomToken)) {
helloHasHap.put(ROOM_TOKEN, currentRoomToken);
}
eventBus.post(new WebSocketCommunicationEvent("hello", helloHasHap));
}
private void sendRoomJoinedEvent() {
HashMap<String, String> joinRoomHashMap = new HashMap<>();
joinRoomHashMap.put(ROOM_TOKEN, currentRoomToken);
eventBus.post(new WebSocketCommunicationEvent("roomJoined", joinRoomHashMap));
}
@Override
public void onMessage(@NonNull WebSocket webSocket, ByteString bytes) {
Log.d(TAG, "Receiving bytes : " + bytes.hex());
}
@Override
public void onClosing(@NonNull WebSocket webSocket, int code, @NonNull String reason) {
Log.d(TAG, "Closing : " + code + " / " + reason);
}
@Override
public void onFailure(WebSocket webSocket, Throwable t, Response response) {
Log.d(TAG, "Error : WebSocket " + webSocket.hashCode() + " onFailure: " + t.getMessage());
closeWebSocket(webSocket);
}
public String getSessionId() {
return sessionId;
}
public boolean hasMCU() {
return hasMCU;
}
public void joinRoomWithRoomTokenAndSession(String roomToken, String normalBackendSession) {
Log.d(TAG, "joinRoomWithRoomTokenAndSession");
Log.d(TAG, " roomToken: " + roomToken);
Log.d(TAG, " session: " + normalBackendSession);
try {
String message = LoganSquare.serialize(
webSocketConnectionHelper.getAssembledJoinOrLeaveRoomModel(roomToken, normalBackendSession));
if (!connected || reconnecting) {
messagesQueue.add(message);
} else {
if (roomToken.equals(currentRoomToken)) {
sendRoomJoinedEvent();
} else {
internalWebSocket.send(message);
}
}
} catch (IOException e) {
Log.e(TAG, e.getMessage(), e);
}
}
private void sendCallMessage(NCSignalingMessage ncSignalingMessage) {
try {
String message = LoganSquare.serialize(
webSocketConnectionHelper.getAssembledCallMessageModel(ncSignalingMessage));
if (!connected || reconnecting) {
messagesQueue.add(message);
} else {
internalWebSocket.send(message);
}
} catch (IOException e) {
Log.e(TAG, "Failed to serialize signaling message", e);
}
}
void sendBye() {
if (connected) {
try {
ByeWebSocketMessage byeWebSocketMessage = new ByeWebSocketMessage();
byeWebSocketMessage.setType("bye");
byeWebSocketMessage.setBye(new HashMap<>());
internalWebSocket.send(LoganSquare.serialize(byeWebSocketMessage));
} catch (IOException e) {
Log.e(TAG, "Failed to serialize bye message");
}
}
}
public boolean isConnected() {
return connected;
}
public String getDisplayNameForSession(String session) {
Participant participant = usersHashMap.get(session);
if (participant != null) {
if (participant.getDisplayName() != null) {
return participant.getDisplayName();
}
}
return "";
}
@Subscribe(threadMode = ThreadMode.BACKGROUND)
public void onMessageEvent(NetworkEvent networkEvent) {
if (networkEvent.getNetworkConnectionEvent() == NetworkEvent.NetworkConnectionEvent.NETWORK_CONNECTED &&
!isConnected()) {
restartWebSocket();
}
}
public SignalingMessageReceiver getSignalingMessageReceiver() {
return signalingMessageReceiver;
}
public SignalingMessageSender getSignalingMessageSender() {
return signalingMessageSender;
}
/**
* Temporary implementation of SignalingMessageReceiver until signaling related code is extracted to a Signaling
* class.
* <p>
* All listeners are called in the WebSocket reader thread. This thread should be the same as long as the WebSocket
* stays connected, but it may change whenever it is connected again.
*/
private static class ExternalSignalingMessageReceiver extends SignalingMessageReceiver {
public void process(Map<String, Object> eventMap) {
processEvent(eventMap);
}
public void process(NCSignalingMessage message) {
processSignalingMessage(message);
}
}
private class ExternalSignalingMessageSender implements SignalingMessageSender {
@Override
public void send(NCSignalingMessage ncSignalingMessage) {
sendCallMessage(ncSignalingMessage);
}
}
}

View File

@ -48,7 +48,7 @@ import okhttp3.OkHttpClient;
@AutoInjector(NextcloudTalkApplication.class) @AutoInjector(NextcloudTalkApplication.class)
public class WebSocketConnectionHelper { public class WebSocketConnectionHelper {
public static final String TAG = "WebSocketConnectionHelper"; public static final String TAG = "WebSocketConnectionHelper";
private static Map<Long, MagicWebSocketInstance> magicWebSocketInstanceMap = new HashMap<>(); private static Map<Long, WebSocketInstance> magicWebSocketInstanceMap = new HashMap<>();
@Inject @Inject
OkHttpClient okHttpClient; OkHttpClient okHttpClient;
@ -59,8 +59,8 @@ public class WebSocketConnectionHelper {
} }
@SuppressLint("LongLogTag") @SuppressLint("LongLogTag")
public static synchronized MagicWebSocketInstance getMagicWebSocketInstanceForUserId(long userId) { public static synchronized WebSocketInstance getMagicWebSocketInstanceForUserId(long userId) {
MagicWebSocketInstance webSocketInstance = magicWebSocketInstanceMap.get(userId); WebSocketInstance webSocketInstance = magicWebSocketInstanceMap.get(userId);
if (webSocketInstance == null) { if (webSocketInstance == null) {
Log.d(TAG, "No magicWebSocketInstance found for user " + userId); Log.d(TAG, "No magicWebSocketInstance found for user " + userId);
@ -69,9 +69,9 @@ public class WebSocketConnectionHelper {
return webSocketInstance; return webSocketInstance;
} }
public static synchronized MagicWebSocketInstance getExternalSignalingInstanceForServer(String url, public static synchronized WebSocketInstance getExternalSignalingInstanceForServer(String url,
User user, User user,
String webSocketTicket, boolean isGuest) { String webSocketTicket, boolean isGuest) {
String generatedURL = url.replace("https://", "wss://").replace("http://", "ws://"); String generatedURL = url.replace("https://", "wss://").replace("http://", "ws://");
if (generatedURL.endsWith("/")) { if (generatedURL.endsWith("/")) {
@ -82,24 +82,24 @@ public class WebSocketConnectionHelper {
long userId = isGuest ? -1 : user.getId(); long userId = isGuest ? -1 : user.getId();
MagicWebSocketInstance magicWebSocketInstance; WebSocketInstance webSocketInstance;
if (userId != -1 && magicWebSocketInstanceMap.containsKey(user.getId()) && (magicWebSocketInstance = magicWebSocketInstanceMap.get(user.getId())) != null) { if (userId != -1 && magicWebSocketInstanceMap.containsKey(user.getId()) && (webSocketInstance = magicWebSocketInstanceMap.get(user.getId())) != null) {
return magicWebSocketInstance; return webSocketInstance;
} else { } else {
if (userId == -1) { if (userId == -1) {
deleteExternalSignalingInstanceForUserEntity(userId); deleteExternalSignalingInstanceForUserEntity(userId);
} }
magicWebSocketInstance = new MagicWebSocketInstance(user, generatedURL, webSocketTicket); webSocketInstance = new WebSocketInstance(user, generatedURL, webSocketTicket);
magicWebSocketInstanceMap.put(user.getId(), magicWebSocketInstance); magicWebSocketInstanceMap.put(user.getId(), webSocketInstance);
return magicWebSocketInstance; return webSocketInstance;
} }
} }
public static synchronized void deleteExternalSignalingInstanceForUserEntity(long id) { public static synchronized void deleteExternalSignalingInstanceForUserEntity(long id) {
MagicWebSocketInstance magicWebSocketInstance; WebSocketInstance webSocketInstance;
if ((magicWebSocketInstance = magicWebSocketInstanceMap.get(id)) != null) { if ((webSocketInstance = magicWebSocketInstanceMap.get(id)) != null) {
if (magicWebSocketInstance.isConnected()) { if (webSocketInstance.isConnected()) {
magicWebSocketInstance.sendBye(); webSocketInstance.sendBye();
magicWebSocketInstanceMap.remove(id); magicWebSocketInstanceMap.remove(id);
} }
} }

View File

@ -0,0 +1,455 @@
/*
* Nextcloud Talk application
*
* @author Mario Danic
* Copyright (C) 2017-2018 Mario Danic <mario@lovelyhq.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package com.nextcloud.talk.webrtc
import android.content.Context
import android.text.TextUtils
import android.util.Log
import autodagger.AutoInjector
import com.bluelinelabs.logansquare.LoganSquare
import com.nextcloud.talk.application.NextcloudTalkApplication
import com.nextcloud.talk.application.NextcloudTalkApplication.Companion.sharedApplication
import com.nextcloud.talk.data.user.model.User
import com.nextcloud.talk.events.NetworkEvent
import com.nextcloud.talk.events.WebSocketCommunicationEvent
import com.nextcloud.talk.models.json.participants.Participant
import com.nextcloud.talk.models.json.participants.Participant.ActorType
import com.nextcloud.talk.models.json.signaling.NCSignalingMessage
import com.nextcloud.talk.models.json.websocket.BaseWebSocketMessage
import com.nextcloud.talk.models.json.websocket.ByeWebSocketMessage
import com.nextcloud.talk.models.json.websocket.CallOverallWebSocketMessage
import com.nextcloud.talk.models.json.websocket.ErrorOverallWebSocketMessage
import com.nextcloud.talk.models.json.websocket.EventOverallWebSocketMessage
import com.nextcloud.talk.models.json.websocket.HelloResponseOverallWebSocketMessage
import com.nextcloud.talk.models.json.websocket.JoinedRoomOverallWebSocketMessage
import com.nextcloud.talk.signaling.SignalingMessageReceiver
import com.nextcloud.talk.signaling.SignalingMessageSender
import com.nextcloud.talk.utils.bundle.BundleKeys
import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.Response
import okhttp3.WebSocket
import okhttp3.WebSocketListener
import okio.ByteString
import org.greenrobot.eventbus.EventBus
import org.greenrobot.eventbus.Subscribe
import org.greenrobot.eventbus.ThreadMode
import java.io.IOException
import javax.inject.Inject
@AutoInjector(NextcloudTalkApplication::class)
class WebSocketInstance internal constructor(
conversationUser: User,
connectionUrl: String,
webSocketTicket: String
) : WebSocketListener() {
@JvmField
@Inject
var okHttpClient: OkHttpClient? = null
@JvmField
@Inject
var eventBus: EventBus? = null
@JvmField
@Inject
var context: Context? = null
private val conversationUser: User
private val webSocketTicket: String
private var resumeId: String? = null
var sessionId: String? = null
private set
private var hasMCU = false
var isConnected: Boolean
private set
private val webSocketConnectionHelper: WebSocketConnectionHelper
private var internalWebSocket: WebSocket? = null
private val connectionUrl: String
private var currentRoomToken: String? = null
private var reconnecting = false
private val usersHashMap: HashMap<String?, Participant>
private var messagesQueue: MutableList<String> = ArrayList()
private val signalingMessageReceiver = ExternalSignalingMessageReceiver()
val signalingMessageSender = ExternalSignalingMessageSender()
init {
sharedApplication!!.componentApplication.inject(this)
this.connectionUrl = connectionUrl
this.conversationUser = conversationUser
this.webSocketTicket = webSocketTicket
webSocketConnectionHelper = WebSocketConnectionHelper()
usersHashMap = HashMap()
isConnected = false
eventBus!!.register(this)
restartWebSocket()
}
private fun sendHello() {
try {
if (TextUtils.isEmpty(resumeId)) {
internalWebSocket!!.send(
LoganSquare.serialize(
webSocketConnectionHelper
.getAssembledHelloModel(conversationUser, webSocketTicket)
)
)
} else {
internalWebSocket!!.send(
LoganSquare.serialize(
webSocketConnectionHelper
.getAssembledHelloModelForResume(resumeId)
)
)
}
} catch (e: IOException) {
Log.e(TAG, "Failed to serialize hello model")
}
}
override fun onOpen(webSocket: WebSocket, response: Response) {
internalWebSocket = webSocket
sendHello()
}
private fun closeWebSocket(webSocket: WebSocket) {
webSocket.close(1000, null)
webSocket.cancel()
if (webSocket === internalWebSocket) {
isConnected = false
messagesQueue = ArrayList()
}
restartWebSocket()
}
fun clearResumeId() {
resumeId = ""
}
fun restartWebSocket() {
reconnecting = true
// TODO when improving logging, keep in mind this issue: https://github.com/nextcloud/talk-android/issues/1013
Log.d(TAG, "restartWebSocket: $connectionUrl")
val request = Request.Builder().url(connectionUrl).build()
okHttpClient!!.newWebSocket(request, this)
}
override fun onMessage(webSocket: WebSocket, text: String) {
if (webSocket === internalWebSocket) {
Log.d(TAG, "Receiving : $webSocket $text")
try {
val (messageType) = LoganSquare.parse(text, BaseWebSocketMessage::class.java)
if (messageType != null) {
when (messageType) {
"hello" -> processHelloMessage(webSocket, text)
"error" -> processErrorMessage(webSocket, text)
"room" -> processJoinedRoomMessage(text)
"event" -> processEventMessage(text)
"message" -> processMessage(text)
"bye" -> {
isConnected = false
resumeId = ""
}
else -> {}
}
} else {
Log.e(TAG, "Received message with type: null")
}
} catch (e: IOException) {
Log.e(TAG, "Failed to recognize WebSocket message", e)
}
}
}
@Throws(IOException::class)
private fun processMessage(text: String) {
val (_, callWebSocketMessage) = LoganSquare.parse(text, CallOverallWebSocketMessage::class.java)
if (callWebSocketMessage != null) {
val ncSignalingMessage = callWebSocketMessage
.ncSignalingMessage
if (ncSignalingMessage != null &&
TextUtils.isEmpty(ncSignalingMessage.from) &&
callWebSocketMessage.senderWebSocketMessage != null
) {
ncSignalingMessage.from = callWebSocketMessage.senderWebSocketMessage!!.sessionId
}
signalingMessageReceiver.process(ncSignalingMessage)
}
}
@Throws(IOException::class)
private fun processEventMessage(text: String) {
val eventOverallWebSocketMessage = LoganSquare.parse(text, EventOverallWebSocketMessage::class.java)
if (eventOverallWebSocketMessage.eventMap != null) {
val target = eventOverallWebSocketMessage.eventMap!!["target"] as String?
if (target != null) {
when (target) {
Globals.TARGET_ROOM ->
if ("message" == eventOverallWebSocketMessage.eventMap!!["type"]) {
processRoomMessageMessage(eventOverallWebSocketMessage)
} else if ("join" == eventOverallWebSocketMessage.eventMap!!["type"]) {
processRoomJoinMessage(eventOverallWebSocketMessage)
}
Globals.TARGET_PARTICIPANTS ->
signalingMessageReceiver.process(eventOverallWebSocketMessage.eventMap)
else ->
Log.i(TAG, "Received unknown/ignored event target: $target")
}
} else {
Log.w(TAG, "Received message with event target: null")
}
}
}
private fun processRoomMessageMessage(eventOverallWebSocketMessage: EventOverallWebSocketMessage) {
val messageHashMap = eventOverallWebSocketMessage.eventMap?.get("message") as Map<*, *>?
if (messageHashMap != null && messageHashMap.containsKey("data")) {
val dataHashMap = messageHashMap["data"] as Map<*, *>?
if (dataHashMap != null && dataHashMap.containsKey("chat")) {
val chatMap = dataHashMap["chat"] as Map<*, *>?
if (chatMap != null && chatMap.containsKey("refresh") && chatMap["refresh"] as Boolean) {
val refreshChatHashMap = HashMap<String, String?>()
refreshChatHashMap[BundleKeys.KEY_ROOM_TOKEN] = messageHashMap["roomid"] as String?
refreshChatHashMap[BundleKeys.KEY_INTERNAL_USER_ID] = (conversationUser.id!!).toString()
eventBus!!.post(WebSocketCommunicationEvent("refreshChat", refreshChatHashMap))
}
} else if (dataHashMap != null && dataHashMap.containsKey("recording")) {
val recordingMap = dataHashMap["recording"] as Map<*, *>?
if (recordingMap != null && recordingMap.containsKey("status")) {
val status = (recordingMap["status"] as Long?)!!.toInt()
Log.d(TAG, "status is $status")
val recordingHashMap = HashMap<String, String>()
recordingHashMap[BundleKeys.KEY_RECORDING_STATE] = status.toString()
eventBus!!.post(WebSocketCommunicationEvent("recordingStatus", recordingHashMap))
}
}
}
}
private fun processRoomJoinMessage(eventOverallWebSocketMessage: EventOverallWebSocketMessage) {
val joinEventList = eventOverallWebSocketMessage.eventMap?.get("join") as List<HashMap<String, Any>>?
var internalHashMap: HashMap<String, Any>
var participant: Participant
for (i in joinEventList!!.indices) {
internalHashMap = joinEventList[i]
val userMap = internalHashMap["user"] as HashMap<String, Any>?
participant = Participant()
val userId = internalHashMap["userid"] as String?
if (userId != null) {
participant.actorType = ActorType.USERS
participant.actorId = userId
} else {
participant.actorType = ActorType.GUESTS
// FIXME seems to be not given by the HPB: participant.setActorId();
}
if (userMap != null) {
// There is no "user" attribute for guest participants.
participant.displayName = userMap["displayname"] as String?
}
usersHashMap[internalHashMap["sessionid"] as String?] = participant
}
}
@Throws(IOException::class)
private fun processJoinedRoomMessage(text: String) {
val (_, roomWebSocketMessage) = LoganSquare.parse(text, JoinedRoomOverallWebSocketMessage::class.java)
if (roomWebSocketMessage != null) {
currentRoomToken = roomWebSocketMessage.roomId
if (roomWebSocketMessage
.roomPropertiesWebSocketMessage != null &&
!TextUtils.isEmpty(currentRoomToken)
) {
sendRoomJoinedEvent()
}
}
}
@Throws(IOException::class)
private fun processErrorMessage(webSocket: WebSocket, text: String) {
Log.e(TAG, "Received error: $text")
val (_, message) = LoganSquare.parse(text, ErrorOverallWebSocketMessage::class.java)
if (message != null) {
if ("no_such_session" == message.code) {
Log.d(TAG, "WebSocket " + webSocket.hashCode() + " resumeID " + resumeId + " expired")
resumeId = ""
currentRoomToken = ""
restartWebSocket()
} else if ("hello_expected" == message.code) {
restartWebSocket()
}
}
}
@Throws(IOException::class)
private fun processHelloMessage(webSocket: WebSocket, text: String) {
isConnected = true
reconnecting = false
val oldResumeId = resumeId
val (_, helloResponseWebSocketMessage1) = LoganSquare.parse(
text,
HelloResponseOverallWebSocketMessage::class.java
)
if (helloResponseWebSocketMessage1 != null) {
resumeId = helloResponseWebSocketMessage1.resumeId
sessionId = helloResponseWebSocketMessage1.sessionId
hasMCU = helloResponseWebSocketMessage1.serverHasMCUSupport()
}
for (i in messagesQueue.indices) {
webSocket.send(messagesQueue[i])
}
messagesQueue = ArrayList()
val helloHasHap = HashMap<String, String?>()
if (!TextUtils.isEmpty(oldResumeId)) {
helloHasHap["oldResumeId"] = oldResumeId
} else {
currentRoomToken = ""
}
if (!TextUtils.isEmpty(currentRoomToken)) {
helloHasHap[Globals.ROOM_TOKEN] = currentRoomToken
}
eventBus!!.post(WebSocketCommunicationEvent("hello", helloHasHap))
}
private fun sendRoomJoinedEvent() {
val joinRoomHashMap = HashMap<String, String?>()
joinRoomHashMap[Globals.ROOM_TOKEN] = currentRoomToken
eventBus!!.post(WebSocketCommunicationEvent("roomJoined", joinRoomHashMap))
}
override fun onMessage(webSocket: WebSocket, bytes: ByteString) {
Log.d(TAG, "Receiving bytes : " + bytes.hex())
}
override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
Log.d(TAG, "Closing : $code / $reason")
}
override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
Log.d(TAG, "Error : WebSocket " + webSocket.hashCode() + " onFailure: " + t.message)
closeWebSocket(webSocket)
}
fun hasMCU(): Boolean {
return hasMCU
}
fun joinRoomWithRoomTokenAndSession(roomToken: String, normalBackendSession: String?) {
Log.d(TAG, "joinRoomWithRoomTokenAndSession")
Log.d(TAG, " roomToken: $roomToken")
Log.d(TAG, " session: $normalBackendSession")
try {
val message = LoganSquare.serialize(
webSocketConnectionHelper.getAssembledJoinOrLeaveRoomModel(roomToken, normalBackendSession)
)
if (!isConnected || reconnecting) {
messagesQueue.add(message)
} else {
if (roomToken == currentRoomToken) {
sendRoomJoinedEvent()
} else {
internalWebSocket!!.send(message)
}
}
} catch (e: IOException) {
Log.e(TAG, e.message, e)
}
}
private fun sendCallMessage(ncSignalingMessage: NCSignalingMessage) {
try {
val message = LoganSquare.serialize(
webSocketConnectionHelper.getAssembledCallMessageModel(ncSignalingMessage)
)
if (!isConnected || reconnecting) {
messagesQueue.add(message)
} else {
internalWebSocket!!.send(message)
}
} catch (e: IOException) {
Log.e(TAG, "Failed to serialize signaling message", e)
}
}
fun sendBye() {
if (isConnected) {
try {
val byeWebSocketMessage = ByeWebSocketMessage()
byeWebSocketMessage.type = "bye"
byeWebSocketMessage.bye = HashMap()
internalWebSocket!!.send(LoganSquare.serialize(byeWebSocketMessage))
} catch (e: IOException) {
Log.e(TAG, "Failed to serialize bye message")
}
}
}
fun getDisplayNameForSession(session: String?): String? {
val participant = usersHashMap[session]
if (participant != null) {
if (participant.displayName != null) {
return participant.displayName
}
}
return ""
}
@Subscribe(threadMode = ThreadMode.BACKGROUND)
fun onMessageEvent(networkEvent: NetworkEvent) {
if (networkEvent.networkConnectionEvent == NetworkEvent.NetworkConnectionEvent.NETWORK_CONNECTED &&
!isConnected
) {
restartWebSocket()
}
}
fun getSignalingMessageReceiver(): SignalingMessageReceiver {
return signalingMessageReceiver
}
/**
* Temporary implementation of SignalingMessageReceiver until signaling related code is extracted to a Signaling
* class.
*
*
* All listeners are called in the WebSocket reader thread. This thread should be the same as long as the WebSocket
* stays connected, but it may change whenever it is connected again.
*/
private class ExternalSignalingMessageReceiver : SignalingMessageReceiver() {
fun process(eventMap: Map<String, Any?>?) {
processEvent(eventMap)
}
fun process(message: NCSignalingMessage?) {
processSignalingMessage(message)
}
}
inner class ExternalSignalingMessageSender : SignalingMessageSender {
override fun send(ncSignalingMessage: NCSignalingMessage) {
sendCallMessage(ncSignalingMessage)
}
}
companion object {
private const val TAG = "MagicWebSocketInstance"
}
}