fix ANR error (blockingSubscribe in setupWebsocket)

The request seems to cause problems so blockingSubscribe doesn't seem to be a good choice here. This commit will replace blockingSubscribe by subscribe and modify the related code in order to be executed when the request succeeds.
The root cause why the request seems to cause problems may have to be analyzed further.

I was not able to reproduce the ANR without this PR, however the following error was reported on gplay console very often!:

      at jdk.internal.misc.Unsafe.park (Native method)
      at java.util.concurrent.locks.LockSupport.park (LockSupport.java:341)
      at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionNode.block (AbstractQueuedSynchronizer.java:506)
      at java.util.concurrent.ForkJoinPool.unmanagedBlock (ForkJoinPool.java:3466)
      at java.util.concurrent.ForkJoinPool.managedBlock (ForkJoinPool.java:3437)
      at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await (AbstractQueuedSynchronizer.java:1623)
      at java.util.concurrent.LinkedBlockingQueue.take (LinkedBlockingQueue.java:435)
      at io.reactivex.internal.operators.observable.ObservableBlockingSubscribe.subscribe (ObservableBlockingSubscribe.java:56)
      at io.reactivex.Observable.blockingSubscribe (Observable.java:5552)
      at com.nextcloud.talk.chat.ChatActivity.setupWebsocket (ChatActivity.kt:2445)
      at com.nextcloud.talk.chat.ChatActivity.joinRoomWithPassword (ChatActivity.kt:2402)
      at com.nextcloud.talk.chat.ChatActivity.initObservers$lambda$13 (ChatActivity.kt:594)
      at com.nextcloud.talk.chat.ChatActivity.$r8$lambda$QKH5JCFLmCzRMlSJ-EV-m4IW5ig (unavailable)
      at com.nextcloud.talk.chat.ChatActivity$$ExternalSyntheticLambda38.invoke (D8$$SyntheticClass)
      at com.nextcloud.talk.chat.ChatActivity$sam$androidx_lifecycle_Observer$0.onChanged (unavailable:2)
      at androidx.lifecycle.LiveData.considerNotify (LiveData.java:133)
      at androidx.lifecycle.LiveData.dispatchingValue (LiveData.java:151)
      at androidx.lifecycle.LiveData.setValue (LiveData.java:309)
      at androidx.lifecycle.MutableLiveData.setValue (MutableLiveData.java:50)
      at com.nextcloud.talk.chat.viewmodels.ChatViewModel.getCapabilities (ChatViewModel.kt:240)
      at com.nextcloud.talk.chat.ChatActivity$initObservers$1$1.invokeSuspend (ChatActivity.kt:553)
      at com.nextcloud.talk.chat.ChatActivity$initObservers$1$1.invoke (unavailable:8)
      at com.nextcloud.talk.chat.ChatActivity$initObservers$1$1.invoke (unavailable:4)
      at kotlinx.coroutines.flow.FlowKt__TransformKt$onEach$$inlined$unsafeTransform$1$2.emit (Emitters.kt:219)
      at kotlinx.coroutines.flow.FlowKt__ErrorsKt$catchImpl$2.emit (Errors.kt:154)
      at kotlinx.coroutines.flow.FlowKt__TransformKt$onEach$$inlined$unsafeTransform$1$2.emit (Emitters.kt:220)
      at kotlinx.coroutines.flow.SharedFlowImpl.collect$suspendImpl (SharedFlow.kt:392)
      at kotlinx.coroutines.flow.SharedFlowImpl$collect$1.invokeSuspend (unavailable:15)
      at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith (ContinuationImpl.kt:33)
      at kotlinx.coroutines.DispatchedTask.run (DispatchedTask.kt:104)
      at android.os.Handler.handleCallback (Handler.java:938)
      at android.os.Handler.dispatchMessage (Handler.java:99)
      at android.os.Looper.loopOnce (Looper.java:210)
      at android.os.Looper.loop (Looper.java:299)
      at android.app.ActivityThread.main (ActivityThread.java:8168)
      at java.lang.reflect.Method.invoke (Native method)
      at com.android.internal.os.RuntimeInit$MethodAndArgsCaller.run (RuntimeInit.java:556)
      at com.android.internal.os.ZygoteInit.main (ZygoteInit.java:1037)

Signed-off-by: Marcel Hibbe <dev@mhibbe.de>
This commit is contained in:
Marcel Hibbe 2024-11-14 11:50:26 +01:00
parent e51ab67c37
commit 3bd1a533e3
No known key found for this signature in database
GPG Key ID: C793F8B59F43CE7B

View File

@ -184,7 +184,9 @@ import com.stfalcon.chatkit.messages.MessageHolders.ContentChecker
import com.stfalcon.chatkit.messages.MessagesListAdapter
import com.stfalcon.chatkit.utils.DateFormatter
import io.reactivex.Observer
import io.reactivex.android.schedulers.AndroidSchedulers
import io.reactivex.disposables.Disposable
import io.reactivex.schedulers.Schedulers
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
@ -702,14 +704,8 @@ class ChatActivity :
logConversationInfos("joinRoomWithPassword#onNext")
setupWebsocket()
if (webSocketInstance != null) {
webSocketInstance?.joinRoomWithRoomTokenAndSession(
roomToken,
sessionIdAfterRoomJoined,
externalSignalingServer?.federation
)
}
if (startCallFromNotification != null && startCallFromNotification) {
if (startCallFromNotification) {
startCallFromNotification = false
startACall(voiceOnly, false)
}
@ -2477,13 +2473,6 @@ class ChatActivity :
Log.d(TAG, "sessionID was valid -> skip joinRoom")
setupWebsocket()
if (webSocketInstance != null) {
webSocketInstance?.joinRoomWithRoomTokenAndSession(
roomToken,
sessionIdAfterRoomJoined,
externalSignalingServer?.federation
)
}
}
}
@ -2519,52 +2508,73 @@ class ChatActivity :
ncApi.getSignalingSettings(
credentials,
ApiUtils.getUrlForSignalingSettings(apiVersion, conversationUser!!.baseUrl, roomToken)
).blockingSubscribe(object : Observer<SignalingSettingsOverall> {
override fun onSubscribe(d: Disposable) {
// unused atm
}
override fun onNext(signalingSettingsOverall: SignalingSettingsOverall) {
if (signalingSettingsOverall.ocs!!.settings!!.externalSignalingServer == null ||
signalingSettingsOverall.ocs!!.settings!!.externalSignalingServer?.isEmpty() == true
) {
return
)
.subscribeOn(Schedulers.io())
?.observeOn(AndroidSchedulers.mainThread())
?.subscribe(object : Observer<SignalingSettingsOverall> {
override fun onSubscribe(d: Disposable) {
// unused atm
}
externalSignalingServer = ExternalSignalingServer()
externalSignalingServer!!.externalSignalingServer = signalingSettingsOverall.ocs!!.settings!!
.externalSignalingServer
externalSignalingServer!!.externalSignalingTicket = signalingSettingsOverall.ocs!!.settings!!
.externalSignalingTicket
externalSignalingServer!!.federation = signalingSettingsOverall.ocs!!.settings!!.federation
override fun onNext(signalingSettingsOverall: SignalingSettingsOverall) {
if (signalingSettingsOverall.ocs!!.settings!!.externalSignalingServer == null ||
signalingSettingsOverall.ocs!!.settings!!.externalSignalingServer?.isEmpty() == true
) {
return
}
webSocketInstance = WebSocketConnectionHelper.getExternalSignalingInstanceForServer(
externalSignalingServer!!.externalSignalingServer,
conversationUser,
externalSignalingServer!!.externalSignalingTicket,
TextUtils.isEmpty(credentials)
)
}
externalSignalingServer = ExternalSignalingServer()
externalSignalingServer!!.externalSignalingServer = signalingSettingsOverall.ocs!!.settings!!
.externalSignalingServer
externalSignalingServer!!.externalSignalingTicket = signalingSettingsOverall.ocs!!.settings!!
.externalSignalingTicket
externalSignalingServer!!.federation = signalingSettingsOverall.ocs!!.settings!!.federation
override fun onError(e: Throwable) {
Log.e(CallActivity.TAG, e.message, e)
}
webSocketInstance = WebSocketConnectionHelper.getExternalSignalingInstanceForServer(
externalSignalingServer!!.externalSignalingServer,
conversationUser,
externalSignalingServer!!.externalSignalingTicket,
TextUtils.isEmpty(credentials)
)
override fun onComplete() {
// unused atm
}
})
if (webSocketInstance != null) {
webSocketInstance?.joinRoomWithRoomTokenAndSession(
roomToken,
sessionIdAfterRoomJoined,
externalSignalingServer?.federation
)
}
signalingMessageSender = webSocketInstance?.signalingMessageSender
webSocketInstance?.getSignalingMessageReceiver()?.addListener(localParticipantMessageListener)
webSocketInstance?.getSignalingMessageReceiver()?.addListener(conversationMessageListener)
}
override fun onError(e: Throwable) {
Log.e(TAG, e.message, e)
}
override fun onComplete() {
// unused atm
}
})
} else {
webSocketInstance = WebSocketConnectionHelper.getWebSocketInstanceForUser(conversationUser!!)
}
if (webSocketInstance == null) {
Log.d(TAG, "webSocketInstance not set up. This should only happen when not using the HPB")
}
if (webSocketInstance != null) {
webSocketInstance?.joinRoomWithRoomTokenAndSession(
roomToken,
sessionIdAfterRoomJoined,
null
)
signalingMessageSender = webSocketInstance?.signalingMessageSender
webSocketInstance?.getSignalingMessageReceiver()?.addListener(localParticipantMessageListener)
webSocketInstance?.getSignalingMessageReceiver()?.addListener(conversationMessageListener)
signalingMessageSender = webSocketInstance?.signalingMessageSender
webSocketInstance?.getSignalingMessageReceiver()?.addListener(localParticipantMessageListener)
webSocketInstance?.getSignalingMessageReceiver()?.addListener(conversationMessageListener)
} else {
Log.d(TAG, "webSocketInstance not set up. This is only expected when not using the HPB")
}
}
}
private fun processCallStartedMessages(chatMessageList: List<ChatMessage>) {