fix TooManyRequestsException

By using
networkMonitor.isOnline.first()
the function
unregisterNetworkCallback
was triggered, which sometimes causes the ConnectivityManager$TooManyRequestsException.

So each time isOnline.first() was called, the callbackFlow would:
- Register a new NetworkCallback.
- Emit a value and cancel the flow.
- Unregister the NetworkCallback.

The exception was:
Exception android.net.ConnectivityManager$TooManyRequestsException:
  at android.net.ConnectivityManager.convertServiceException (ConnectivityManager.java:3771)
  at android.net.ConnectivityManager.sendRequestForNetwork (ConnectivityManager.java:3960)
  at android.net.ConnectivityManager.sendRequestForNetwork (ConnectivityManager.java:3967)
  at android.net.ConnectivityManager.registerNetworkCallback (ConnectivityManager.java:4349)
  at android.net.ConnectivityManager.registerNetworkCallback (ConnectivityManager.java:4319)
  at com.nextcloud.talk.data.network.NetworkMonitorImpl$isOnline$1.invokeSuspend (NetworkMonitorImpl.kt:61)

To fix this, the cold flow from callbackFlow is converted to a StateFlow.

Signed-off-by: Marcel Hibbe <dev@mhibbe.de>
This commit is contained in:
Marcel Hibbe 2025-01-14 15:38:47 +01:00
parent ce7ed877ca
commit 6e1114d4d8
No known key found for this signature in database
GPG Key ID: C793F8B59F43CE7B
9 changed files with 72 additions and 67 deletions

View File

@ -134,7 +134,7 @@ class OutcomingTextMessageViewHolder(itemView: View) :
} }
CoroutineScope(Dispatchers.Main).launch { CoroutineScope(Dispatchers.Main).launch {
if (message.isTemporary && !networkMonitor.isOnline.first()) { if (message.isTemporary && !networkMonitor.isOnline.value) {
updateStatus( updateStatus(
R.drawable.ic_signal_wifi_off_white_24dp, R.drawable.ic_signal_wifi_off_white_24dp,
context.resources?.getString(R.string.nc_message_offline) context.resources?.getString(R.string.nc_message_offline)

View File

@ -201,7 +201,6 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext import kotlinx.coroutines.withContext
@ -452,7 +451,7 @@ class ChatActivity :
this.lifecycleScope.launch { this.lifecycleScope.launch {
delay(DELAY_TO_SHOW_PROGRESS_BAR) delay(DELAY_TO_SHOW_PROGRESS_BAR)
if (adapter?.isEmpty == true && networkMonitor.isOnline.first()) { if (adapter?.isEmpty == true && networkMonitor.isOnline.value) {
binding.progressBar.visibility = View.VISIBLE binding.progressBar.visibility = View.VISIBLE
} }
} }
@ -927,7 +926,7 @@ class ChatActivity :
chatViewModel.getGeneralUIFlow.onEach { key -> chatViewModel.getGeneralUIFlow.onEach { key ->
when (key) { when (key) {
NO_OFFLINE_MESSAGES_FOUND -> { NO_OFFLINE_MESSAGES_FOUND -> {
if (networkMonitor.isOnline.first().not()) { if (networkMonitor.isOnline.value.not()) {
binding.offline.root.visibility = View.VISIBLE binding.offline.root.visibility = View.VISIBLE
} }
} }

View File

@ -54,7 +54,7 @@ class OfflineFirstChatRepository @Inject constructor(
private val chatDao: ChatMessagesDao, private val chatDao: ChatMessagesDao,
private val chatBlocksDao: ChatBlocksDao, private val chatBlocksDao: ChatBlocksDao,
private val network: ChatNetworkDataSource, private val network: ChatNetworkDataSource,
private val monitor: NetworkMonitor, private val networkMonitor: NetworkMonitor,
userProvider: CurrentUserProviderNew userProvider: CurrentUserProviderNew
) : ChatMessageRepository { ) : ChatMessageRepository {
@ -303,7 +303,7 @@ class OfflineFirstChatRepository @Inject constructor(
var showUnreadMessagesMarker = true var showUnreadMessagesMarker = true
while (true) { while (true) {
if (!monitor.isOnline.first() || itIsPaused) { if (!networkMonitor.isOnline.value || itIsPaused) {
Thread.sleep(HALF_SECOND) Thread.sleep(HALF_SECOND)
} else { } else {
// sync database with server // sync database with server
@ -530,7 +530,7 @@ class OfflineFirstChatRepository @Inject constructor(
} }
private suspend fun sync(bundle: Bundle): List<ChatMessageEntity>? { private suspend fun sync(bundle: Bundle): List<ChatMessageEntity>? {
if (!monitor.isOnline.first()) { if (!networkMonitor.isOnline.value) {
Log.d(TAG, "Device is offline, can't load chat messages from server") Log.d(TAG, "Device is offline, can't load chat messages from server")
return null return null
} }
@ -810,7 +810,7 @@ class OfflineFirstChatRepository @Inject constructor(
sendWithoutNotification: Boolean, sendWithoutNotification: Boolean,
referenceId: String referenceId: String
): Flow<Result<ChatMessage?>> { ): Flow<Result<ChatMessage?>> {
if (!monitor.isOnline.first()) { if (!networkMonitor.isOnline.value) {
return flow { return flow {
emit(Result.failure(IOException("Skipped to send message as device is offline"))) emit(Result.failure(IOException("Skipped to send message as device is offline")))
} }

View File

@ -138,7 +138,6 @@ import io.reactivex.disposables.Disposable
import io.reactivex.schedulers.Schedulers import io.reactivex.schedulers.Schedulers
import io.reactivex.subjects.BehaviorSubject import io.reactivex.subjects.BehaviorSubject
import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import org.apache.commons.lang3.builder.CompareToBuilder import org.apache.commons.lang3.builder.CompareToBuilder
@ -1359,7 +1358,7 @@ class ConversationsListActivity :
override fun onItemLongClick(position: Int) { override fun onItemLongClick(position: Int) {
this.lifecycleScope.launch { this.lifecycleScope.launch {
if (showShareToScreen || !networkMonitor.isOnline.first()) { if (showShareToScreen || !networkMonitor.isOnline.value) {
Log.d(TAG, "sharing to multiple rooms not yet implemented. onItemLongClick is ignored.") Log.d(TAG, "sharing to multiple rooms not yet implemented. onItemLongClick is ignored.")
} else { } else {
val clickedItem: Any? = adapter!!.getItem(position) val clickedItem: Any? = adapter!!.getItem(position)

View File

@ -39,7 +39,7 @@ class OfflineFirstConversationsRepository @Inject constructor(
private val dao: ConversationsDao, private val dao: ConversationsDao,
private val network: ConversationsNetworkDataSource, private val network: ConversationsNetworkDataSource,
private val chatNetworkDataSource: ChatNetworkDataSource, private val chatNetworkDataSource: ChatNetworkDataSource,
private val monitor: NetworkMonitor, private val networkMonitor: NetworkMonitor,
private val currentUserProviderNew: CurrentUserProviderNew private val currentUserProviderNew: CurrentUserProviderNew
) : OfflineConversationsRepository { ) : OfflineConversationsRepository {
override val roomListFlow: Flow<List<ConversationModel>> override val roomListFlow: Flow<List<ConversationModel>>
@ -58,7 +58,7 @@ class OfflineFirstConversationsRepository @Inject constructor(
val initialConversationModels = getListOfConversations(user.id!!) val initialConversationModels = getListOfConversations(user.id!!)
_roomListFlow.emit(initialConversationModels) _roomListFlow.emit(initialConversationModels)
if (monitor.isOnline.first()) { if (networkMonitor.isOnline.value) {
val conversationEntitiesFromSync = getRoomsFromServer() val conversationEntitiesFromSync = getRoomsFromServer()
if (!conversationEntitiesFromSync.isNullOrEmpty()) { if (!conversationEntitiesFromSync.isNullOrEmpty()) {
val conversationModelsFromSync = conversationEntitiesFromSync.map(ConversationEntity::asModel) val conversationModelsFromSync = conversationEntitiesFromSync.map(ConversationEntity::asModel)
@ -108,7 +108,7 @@ class OfflineFirstConversationsRepository @Inject constructor(
private suspend fun getRoomsFromServer(): List<ConversationEntity>? { private suspend fun getRoomsFromServer(): List<ConversationEntity>? {
var conversationsFromSync: List<ConversationEntity>? = null var conversationsFromSync: List<ConversationEntity>? = null
if (!monitor.isOnline.first()) { if (!networkMonitor.isOnline.value) {
Log.d(TAG, "Device is offline, can't load conversations from server") Log.d(TAG, "Device is offline, can't load conversations from server")
return null return null
} }

View File

@ -2,13 +2,14 @@
* Nextcloud Talk - Android Client * Nextcloud Talk - Android Client
* *
* SPDX-FileCopyrightText: 2024 Julius Linus <juliuslinus1@gmail.com> * SPDX-FileCopyrightText: 2024 Julius Linus <juliuslinus1@gmail.com>
* SPDX-FileCopyrightText: 2024 Marcel Hibbe <dev@mhibbe.de>
* SPDX-License-Identifier: GPL-3.0-or-later * SPDX-License-Identifier: GPL-3.0-or-later
*/ */
package com.nextcloud.talk.data.network package com.nextcloud.talk.data.network
import androidx.lifecycle.LiveData import androidx.lifecycle.LiveData
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.StateFlow
/** /**
* Utility for reporting app connectivity status. * Utility for reporting app connectivity status.
@ -17,7 +18,7 @@ interface NetworkMonitor {
/** /**
* Returns the device's current connectivity status. * Returns the device's current connectivity status.
*/ */
val isOnline: Flow<Boolean> val isOnline: StateFlow<Boolean>
/** /**
* Returns the device's current connectivity status as LiveData for better interop with Java code. * Returns the device's current connectivity status as LiveData for better interop with Java code.

View File

@ -2,6 +2,7 @@
* Nextcloud Talk - Android Client * Nextcloud Talk - Android Client
* *
* SPDX-FileCopyrightText: 2024 Julius Linus <juliuslinus1@gmail.com> * SPDX-FileCopyrightText: 2024 Julius Linus <juliuslinus1@gmail.com>
* SPDX-FileCopyrightText: 2024 Marcel Hibbe <dev@mhibbe.de>
* SPDX-License-Identifier: GPL-3.0-or-later * SPDX-License-Identifier: GPL-3.0-or-later
*/ */
@ -11,17 +12,17 @@ import android.content.Context
import android.net.ConnectivityManager import android.net.ConnectivityManager
import android.net.Network import android.net.Network
import android.net.NetworkCapabilities import android.net.NetworkCapabilities
import android.net.NetworkRequest.Builder import android.util.Log
import androidx.core.content.getSystemService import androidx.core.content.getSystemService
import androidx.lifecycle.LiveData import androidx.lifecycle.LiveData
import androidx.lifecycle.asLiveData import androidx.lifecycle.asLiveData
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.awaitClose import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.callbackFlow import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.conflate import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.flowOn
import javax.inject.Inject import javax.inject.Inject
import javax.inject.Singleton import javax.inject.Singleton
@ -29,49 +30,57 @@ import javax.inject.Singleton
class NetworkMonitorImpl @Inject constructor( class NetworkMonitorImpl @Inject constructor(
private val context: Context private val context: Context
) : NetworkMonitor { ) : NetworkMonitor {
private val connectivityManager = context.getSystemService<ConnectivityManager>()!!
override val isOnlineLiveData: LiveData<Boolean> override val isOnlineLiveData: LiveData<Boolean>
get() = isOnline.asLiveData() get() = isOnline.asLiveData()
override val isOnline: Flow<Boolean> = callbackFlow { override val isOnline: StateFlow<Boolean> get() = _isOnline
val connectivityManager = context.getSystemService<ConnectivityManager>()
if (connectivityManager == null) { private val _isOnline: StateFlow<Boolean> = callbackFlow {
channel.trySend(false) val callback = object : ConnectivityManager.NetworkCallback() {
channel.close() override fun onCapabilitiesChanged(network: Network, networkCapabilities: NetworkCapabilities) {
return@callbackFlow super.onCapabilitiesChanged(network, networkCapabilities)
val connected = networkCapabilities.hasCapability(
NetworkCapabilities.NET_CAPABILITY_VALIDATED
)
trySend(connected)
Log.d(TAG, "Network status changed: $connected")
} }
val networkRequest = Builder() override fun onUnavailable() {
.addCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET) super.onUnavailable()
.build() trySend(false)
Log.d(TAG, "Network status: onUnavailable")
val networkCallback = object : ConnectivityManager.NetworkCallback() {
private val networks = mutableSetOf<Network>()
override fun onAvailable(network: Network) {
networks += network
channel.trySend(true)
} }
override fun onLost(network: Network) { override fun onLost(network: Network) {
networks -= network super.onLost(network)
channel.trySend(networks.isNotEmpty()) trySend(false)
Log.d(TAG, "Network status: onLost")
}
override fun onAvailable(network: Network) {
super.onAvailable(network)
trySend(true)
Log.d(TAG, "Network status: onAvailable")
} }
} }
connectivityManager.registerNetworkCallback(networkRequest, networkCallback) connectivityManager.registerDefaultNetworkCallback(callback)
channel.trySend(connectivityManager.isCurrentlyConnected())
awaitClose { awaitClose {
connectivityManager.unregisterNetworkCallback(networkCallback) connectivityManager.unregisterNetworkCallback(callback)
} }
} }.stateIn(
.distinctUntilChanged() CoroutineScope(Dispatchers.IO),
.flowOn(Dispatchers.IO) SharingStarted.WhileSubscribed(COROUTINE_TIMEOUT),
.conflate() false
)
private fun ConnectivityManager.isCurrentlyConnected() = companion object {
activeNetwork private val TAG = NetworkMonitorImpl::class.java.simpleName
?.let(::getNetworkCapabilities) private const val COROUTINE_TIMEOUT = 5000L
?.hasCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET) ?: false }
} }

View File

@ -51,7 +51,6 @@ import io.reactivex.Observer
import io.reactivex.android.schedulers.AndroidSchedulers import io.reactivex.android.schedulers.AndroidSchedulers
import io.reactivex.disposables.Disposable import io.reactivex.disposables.Disposable
import io.reactivex.schedulers.Schedulers import io.reactivex.schedulers.Schedulers
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import java.util.Date import java.util.Date
import javax.inject.Inject import javax.inject.Inject
@ -134,7 +133,7 @@ class MessageActionsDialog(
initMenuAddToNote( initMenuAddToNote(
!message.isDeleted && !message.isDeleted &&
!ConversationUtils.isNoteToSelfConversation(currentConversation) && !ConversationUtils.isNoteToSelfConversation(currentConversation) &&
networkMonitor.isOnline.first(), networkMonitor.isOnline.value,
state.roomToken state.roomToken
) )
} }
@ -147,16 +146,16 @@ class MessageActionsDialog(
} }
} }
initMenuItems() initMenuItems(networkMonitor.isOnline.value)
} }
private fun initMenuItems() { private fun initMenuItems(isOnline: Boolean) {
this.lifecycleScope.launch { this.lifecycleScope.launch {
initMenuItemTranslate( initMenuItemTranslate(
!message.isDeleted && !message.isDeleted &&
ChatMessage.MessageType.REGULAR_TEXT_MESSAGE == message.getCalculateMessageType() && ChatMessage.MessageType.REGULAR_TEXT_MESSAGE == message.getCalculateMessageType() &&
CapabilitiesUtil.isTranslationsSupported(spreedCapabilities) && CapabilitiesUtil.isTranslationsSupported(spreedCapabilities) &&
networkMonitor.isOnline.first() isOnline
) )
initMenuEditorDetails(message.lastEditTimestamp!! != 0L && !message.isDeleted) initMenuEditorDetails(message.lastEditTimestamp!! != 0L && !message.isDeleted)
initMenuReplyToMessage(message.replyable && hasChatPermission) initMenuReplyToMessage(message.replyable && hasChatPermission)
@ -165,29 +164,29 @@ class MessageActionsDialog(
hasUserId(user) && hasUserId(user) &&
hasUserActorId(message) && hasUserActorId(message) &&
currentConversation?.type != ConversationEnums.ConversationType.ROOM_TYPE_ONE_TO_ONE_CALL && currentConversation?.type != ConversationEnums.ConversationType.ROOM_TYPE_ONE_TO_ONE_CALL &&
networkMonitor.isOnline.first() isOnline
) )
initMenuEditMessage(isMessageEditable) initMenuEditMessage(isMessageEditable)
initMenuDeleteMessage(showMessageDeletionButton && networkMonitor.isOnline.first()) initMenuDeleteMessage(showMessageDeletionButton && isOnline)
initMenuForwardMessage( initMenuForwardMessage(
ChatMessage.MessageType.REGULAR_TEXT_MESSAGE == message.getCalculateMessageType() && ChatMessage.MessageType.REGULAR_TEXT_MESSAGE == message.getCalculateMessageType() &&
!(message.isDeletedCommentMessage || message.isDeleted) && !(message.isDeletedCommentMessage || message.isDeleted) &&
networkMonitor.isOnline.first() isOnline
) )
initMenuRemindMessage( initMenuRemindMessage(
!message.isDeleted && !message.isDeleted &&
hasSpreedFeatureCapability(spreedCapabilities, SpreedFeatures.REMIND_ME_LATER) && hasSpreedFeatureCapability(spreedCapabilities, SpreedFeatures.REMIND_ME_LATER) &&
networkMonitor.isOnline.first() isOnline
) )
initMenuMarkAsUnread( initMenuMarkAsUnread(
message.previousMessageId > NO_PREVIOUS_MESSAGE_ID && message.previousMessageId > NO_PREVIOUS_MESSAGE_ID &&
ChatMessage.MessageType.SYSTEM_MESSAGE != message.getCalculateMessageType() && ChatMessage.MessageType.SYSTEM_MESSAGE != message.getCalculateMessageType() &&
networkMonitor.isOnline.first() isOnline
) )
initMenuShare(messageHasFileAttachment || messageHasRegularText && networkMonitor.isOnline.first()) initMenuShare(messageHasFileAttachment || messageHasRegularText && isOnline)
initMenuItemOpenNcApp( initMenuItemOpenNcApp(
ChatMessage.MessageType.SINGLE_NC_ATTACHMENT_MESSAGE == message.getCalculateMessageType() && ChatMessage.MessageType.SINGLE_NC_ATTACHMENT_MESSAGE == message.getCalculateMessageType() &&
networkMonitor.isOnline.first() isOnline
) )
initMenuItemSave(message.getCalculateMessageType() == ChatMessage.MessageType.SINGLE_NC_ATTACHMENT_MESSAGE) initMenuItemSave(message.getCalculateMessageType() == ChatMessage.MessageType.SINGLE_NC_ATTACHMENT_MESSAGE)
} }

View File

@ -23,7 +23,6 @@ import com.nextcloud.talk.databinding.DialogTempMessageActionsBinding
import com.nextcloud.talk.ui.theme.ViewThemeUtils import com.nextcloud.talk.ui.theme.ViewThemeUtils
import com.nextcloud.talk.utils.ApiUtils import com.nextcloud.talk.utils.ApiUtils
import com.nextcloud.talk.utils.DateUtils import com.nextcloud.talk.utils.DateUtils
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import javax.inject.Inject import javax.inject.Inject
@ -59,10 +58,9 @@ class TempMessageActionsDialog(
private fun initMenuItems() { private fun initMenuItems() {
this.lifecycleScope.launch { this.lifecycleScope.launch {
val isOnline = networkMonitor.isOnline.first() initResendMessage(message.sendingFailed && networkMonitor.isOnline.value)
initResendMessage(message.sendingFailed && isOnline) initMenuEditMessage(message.sendingFailed || !networkMonitor.isOnline.value)
initMenuEditMessage(message.sendingFailed || !isOnline) initMenuDeleteMessage(message.sendingFailed || !networkMonitor.isOnline.value)
initMenuDeleteMessage(message.sendingFailed || !isOnline)
initMenuItemCopy() initMenuItemCopy()
} }
} }