Use synchronized instead of Mutex

This commit is contained in:
Mygod
2021-05-25 14:14:38 -04:00
parent 4ad0e596d0
commit 2e83f8b22d

View File

@@ -12,8 +12,6 @@ import androidx.collection.set
import androidx.collection.valueIterator import androidx.collection.valueIterator
import kotlinx.coroutines.* import kotlinx.coroutines.*
import kotlinx.coroutines.channels.* import kotlinx.coroutines.channels.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.io.* import java.io.*
import java.util.* import java.util.*
import java.util.concurrent.CountDownLatch import java.util.concurrent.CountDownLatch
@@ -27,7 +25,7 @@ class RootServer {
abstract fun cancel() abstract fun cancel()
abstract fun shouldRemove(result: Byte): Boolean abstract fun shouldRemove(result: Byte): Boolean
abstract operator fun invoke(input: DataInputStream, result: Byte) abstract operator fun invoke(input: DataInputStream, result: Byte)
suspend fun sendClosed() = withContext(NonCancellable) { server.execute(CancelCommand(index)) } fun sendClosed() = server.execute(CancelCommand(index))
private fun initException(targetClass: Class<*>, message: String): Throwable { private fun initException(targetClass: Class<*>, message: String): Throwable {
@Suppress("NAME_SHADOWING") @Suppress("NAME_SHADOWING")
@@ -76,7 +74,7 @@ class RootServer {
when (result.toInt()) { when (result.toInt()) {
SUCCESS -> channel.trySend(input.readParcelable(classLoader)).onClosed { SUCCESS -> channel.trySend(input.readParcelable(classLoader)).onClosed {
active = false active = false
GlobalScope.launch(Dispatchers.Unconfined) { sendClosed() } sendClosed()
finish.completeExceptionally(it finish.completeExceptionally(it
?: ClosedSendChannelException("Channel was closed normally")) ?: ClosedSendChannelException("Channel was closed normally"))
return return
@@ -92,7 +90,7 @@ class RootServer {
private lateinit var process: Process private lateinit var process: Process
/** /**
* Thread safety: needs to be protected by mutex. * Thread safety: needs to be protected by callbackLookup.
*/ */
private lateinit var output: DataOutputStream private lateinit var output: DataOutputStream
@@ -101,7 +99,6 @@ class RootServer {
private var counter = 0L private var counter = 0L
private var callbackListenerExit: Deferred<Unit>? = null private var callbackListenerExit: Deferred<Unit>? = null
private val callbackLookup = LongSparseArray<Callback>() private val callbackLookup = LongSparseArray<Callback>()
private val mutex = Mutex()
private fun readUnexpectedStderr(): String? { private fun readUnexpectedStderr(): String? {
if (!this::process.isInitialized) return null if (!this::process.isInitialized) return null
@@ -176,7 +173,7 @@ class RootServer {
break break
} }
val result = input.readByte() val result = input.readByte()
val callback = mutex.synchronized { val callback = synchronized(callbackLookup) {
if (active) (callbackLookup[index] ?: error("Empty callback #$index")).also { if (active) (callbackLookup[index] ?: error("Empty callback #$index")).also {
if (it.shouldRemove(result)) { if (it.shouldRemove(result)) {
callbackLookup.remove(index) callbackLookup.remove(index)
@@ -238,14 +235,14 @@ class RootServer {
counter++ counter++
} }
suspend fun execute(command: RootCommandOneWay) = mutex.withLock { if (active) sendLocked(command) } fun execute(command: RootCommandOneWay) = synchronized(callbackLookup) { if (active) sendLocked(command) }
@Throws(RemoteException::class) @Throws(RemoteException::class)
suspend inline fun <reified T : Parcelable?> execute(command: RootCommand<T>) = suspend inline fun <reified T : Parcelable?> execute(command: RootCommand<T>) =
execute(command, T::class.java.classLoader) execute(command, T::class.java.classLoader)
@Throws(RemoteException::class) @Throws(RemoteException::class)
suspend fun <T : Parcelable?> execute(command: RootCommand<T>, classLoader: ClassLoader?): T { suspend fun <T : Parcelable?> execute(command: RootCommand<T>, classLoader: ClassLoader?): T {
val future = CompletableDeferred<T>() val future = CompletableDeferred<T>()
val callback = mutex.withLock { val callback = synchronized(callbackLookup) {
@Suppress("UNCHECKED_CAST") @Suppress("UNCHECKED_CAST")
val callback = Callback.Ordinary(this, counter, classLoader, future as CompletableDeferred<Parcelable?>) val callback = Callback.Ordinary(this, counter, classLoader, future as CompletableDeferred<Parcelable?>)
if (active) { if (active) {
@@ -276,7 +273,7 @@ class RootServer {
else -> throw IllegalArgumentException("Unsupported channel capacity $it") else -> throw IllegalArgumentException("Unsupported channel capacity $it")
} }
}) { }) {
val callback = mutex.withLock { val callback = synchronized(callbackLookup) {
@Suppress("UNCHECKED_CAST") @Suppress("UNCHECKED_CAST")
val callback = Callback.Channel(this@RootServer, counter, classLoader, this as SendChannel<Parcelable?>) val callback = Callback.Channel(this@RootServer, counter, classLoader, this as SendChannel<Parcelable?>)
if (active) { if (active) {
@@ -293,7 +290,7 @@ class RootServer {
} }
} }
private suspend fun closeInternal(fromWorker: Boolean = false) = mutex.withLock { private suspend fun closeInternal(fromWorker: Boolean = false) = synchronized(callbackLookup) {
if (active) { if (active) {
active = false active = false
Logger.me.d(if (fromWorker) "Shutting down from worker" else "Shutting down from client") Logger.me.d(if (fromWorker) "Shutting down from worker" else "Shutting down from client")
@@ -349,10 +346,6 @@ class RootServer {
override fun resolveClass(desc: ObjectStreamClass) = Class.forName(desc.name, false, classLoader) override fun resolveClass(desc: ObjectStreamClass) = Class.forName(desc.name, false, classLoader)
}.readObject() }.readObject()
private inline fun <T> Mutex.synchronized(crossinline block: () -> T): T = runBlocking {
withLock { block() }
}
@JvmStatic @JvmStatic
fun main(args: Array<String>) { fun main(args: Array<String>) {
Thread.setDefaultUncaughtExceptionHandler { thread, throwable -> Thread.setDefaultUncaughtExceptionHandler { thread, throwable ->