From 2e83f8b22d9d0f1818cf6f9c10f145a1e218019c Mon Sep 17 00:00:00 2001 From: Mygod Date: Tue, 25 May 2021 14:14:38 -0400 Subject: [PATCH] Use synchronized instead of Mutex --- .../be/mygod/librootkotlinx/RootServer.kt | 23 +++++++------------ 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/mobile/src/main/java/be/mygod/librootkotlinx/RootServer.kt b/mobile/src/main/java/be/mygod/librootkotlinx/RootServer.kt index e0c8fb93..f6f5fc4e 100644 --- a/mobile/src/main/java/be/mygod/librootkotlinx/RootServer.kt +++ b/mobile/src/main/java/be/mygod/librootkotlinx/RootServer.kt @@ -12,8 +12,6 @@ import androidx.collection.set import androidx.collection.valueIterator import kotlinx.coroutines.* import kotlinx.coroutines.channels.* -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock import java.io.* import java.util.* import java.util.concurrent.CountDownLatch @@ -27,7 +25,7 @@ class RootServer { abstract fun cancel() abstract fun shouldRemove(result: Byte): Boolean abstract operator fun invoke(input: DataInputStream, result: Byte) - suspend fun sendClosed() = withContext(NonCancellable) { server.execute(CancelCommand(index)) } + fun sendClosed() = server.execute(CancelCommand(index)) private fun initException(targetClass: Class<*>, message: String): Throwable { @Suppress("NAME_SHADOWING") @@ -76,7 +74,7 @@ class RootServer { when (result.toInt()) { SUCCESS -> channel.trySend(input.readParcelable(classLoader)).onClosed { active = false - GlobalScope.launch(Dispatchers.Unconfined) { sendClosed() } + sendClosed() finish.completeExceptionally(it ?: ClosedSendChannelException("Channel was closed normally")) return @@ -92,7 +90,7 @@ class RootServer { private lateinit var process: Process /** - * Thread safety: needs to be protected by mutex. + * Thread safety: needs to be protected by callbackLookup. */ private lateinit var output: DataOutputStream @@ -101,7 +99,6 @@ class RootServer { private var counter = 0L private var callbackListenerExit: Deferred? = null private val callbackLookup = LongSparseArray() - private val mutex = Mutex() private fun readUnexpectedStderr(): String? { if (!this::process.isInitialized) return null @@ -176,7 +173,7 @@ class RootServer { break } val result = input.readByte() - val callback = mutex.synchronized { + val callback = synchronized(callbackLookup) { if (active) (callbackLookup[index] ?: error("Empty callback #$index")).also { if (it.shouldRemove(result)) { callbackLookup.remove(index) @@ -238,14 +235,14 @@ class RootServer { counter++ } - suspend fun execute(command: RootCommandOneWay) = mutex.withLock { if (active) sendLocked(command) } + fun execute(command: RootCommandOneWay) = synchronized(callbackLookup) { if (active) sendLocked(command) } @Throws(RemoteException::class) suspend inline fun execute(command: RootCommand) = execute(command, T::class.java.classLoader) @Throws(RemoteException::class) suspend fun execute(command: RootCommand, classLoader: ClassLoader?): T { val future = CompletableDeferred() - val callback = mutex.withLock { + val callback = synchronized(callbackLookup) { @Suppress("UNCHECKED_CAST") val callback = Callback.Ordinary(this, counter, classLoader, future as CompletableDeferred) if (active) { @@ -276,7 +273,7 @@ class RootServer { else -> throw IllegalArgumentException("Unsupported channel capacity $it") } }) { - val callback = mutex.withLock { + val callback = synchronized(callbackLookup) { @Suppress("UNCHECKED_CAST") val callback = Callback.Channel(this@RootServer, counter, classLoader, this as SendChannel) if (active) { @@ -293,7 +290,7 @@ class RootServer { } } - private suspend fun closeInternal(fromWorker: Boolean = false) = mutex.withLock { + private suspend fun closeInternal(fromWorker: Boolean = false) = synchronized(callbackLookup) { if (active) { active = false Logger.me.d(if (fromWorker) "Shutting down from worker" else "Shutting down from client") @@ -349,10 +346,6 @@ class RootServer { override fun resolveClass(desc: ObjectStreamClass) = Class.forName(desc.name, false, classLoader) }.readObject() - private inline fun Mutex.synchronized(crossinline block: () -> T): T = runBlocking { - withLock { block() } - } - @JvmStatic fun main(args: Array) { Thread.setDefaultUncaughtExceptionHandler { thread, throwable ->