From dbde2a20d6b753d4fec11c0f7f65504072f61e46 Mon Sep 17 00:00:00 2001 From: Mygod Date: Tue, 25 May 2021 15:24:28 -0400 Subject: [PATCH] Make ip monitor nonblocking --- .../be/mygod/vpnhotspot/net/IpNeighbour.kt | 9 +-- .../mygod/vpnhotspot/net/monitor/IpMonitor.kt | 58 ++++++++++--------- .../net/monitor/IpNeighbourMonitor.kt | 4 +- 3 files changed, 35 insertions(+), 36 deletions(-) diff --git a/mobile/src/main/java/be/mygod/vpnhotspot/net/IpNeighbour.kt b/mobile/src/main/java/be/mygod/vpnhotspot/net/IpNeighbour.kt index a930a10a..80179905 100644 --- a/mobile/src/main/java/be/mygod/vpnhotspot/net/IpNeighbour.kt +++ b/mobile/src/main/java/be/mygod/vpnhotspot/net/IpNeighbour.kt @@ -8,7 +8,6 @@ import be.mygod.vpnhotspot.root.ReadArp import be.mygod.vpnhotspot.root.RootManager import be.mygod.vpnhotspot.util.parseNumericAddress import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.runBlocking import timber.log.Timber import java.io.File import java.io.FileNotFoundException @@ -47,7 +46,7 @@ data class IpNeighbour(val ip: InetAddress, val dev: String, val lladdr: MacAddr return setOf(dev) } - fun parse(line: String, fullMode: Boolean): List { + suspend fun parse(line: String, fullMode: Boolean): List { return if (line.isBlank()) emptyList() else try { val match = parser.matchEntire(line)!! val ip = parseNumericAddress(match.groupValues[2]) // by regex, ip is non-empty @@ -118,15 +117,13 @@ data class IpNeighbour(val ip: InetAddress, val dev: String, val lladdr: MacAddr .drop(1) .filter { it.size >= 6 && mac.matcher(it[ARP_HW_ADDRESS]).matches() } .toList() - private fun arp(): List> { + private suspend fun arp(): List> { if (System.nanoTime() - arpCacheTime >= ARP_CACHE_EXPIRE) try { arpCache = File("/proc/net/arp").bufferedReader().useLines { it.makeArp() } } catch (e: IOException) { if (e is FileNotFoundException && Build.VERSION.SDK_INT >= 29 && (e.cause as? ErrnoException)?.errno == OsConstants.EACCES) try { - arpCache = runBlocking { - RootManager.use { it.execute(ReadArp()) } - }.value.lineSequence().makeArp() + arpCache = RootManager.use { it.execute(ReadArp()) }.value.lineSequence().makeArp() } catch (eRoot: Exception) { eRoot.addSuppressed(e) if (eRoot !is CancellationException) Timber.w(eRoot) diff --git a/mobile/src/main/java/be/mygod/vpnhotspot/net/monitor/IpMonitor.kt b/mobile/src/main/java/be/mygod/vpnhotspot/net/monitor/IpMonitor.kt index 2ebb4f6e..7d127372 100644 --- a/mobile/src/main/java/be/mygod/vpnhotspot/net/monitor/IpMonitor.kt +++ b/mobile/src/main/java/be/mygod/vpnhotspot/net/monitor/IpMonitor.kt @@ -49,15 +49,16 @@ abstract class IpMonitor { private class FlushFailure : RuntimeException() protected abstract val monitoredObject: String - protected abstract fun processLine(line: String) - protected abstract fun processLines(lines: Sequence) + protected abstract suspend fun processLine(line: String) + protected abstract suspend fun processLines(lines: Sequence) @Volatile private var destroyed = false private var monitor: Process? = null - private val worker = Job() + private lateinit var worker: Job - private fun handleProcess(builder: ProcessBuilder) { + @Suppress("BlockingMethodInNonBlockingContext") + private suspend fun handleProcess(builder: ProcessBuilder) { val process = try { builder.start() } catch (e: IOException) { @@ -65,24 +66,25 @@ abstract class IpMonitor { return } monitor = process - val err = thread(name = "${javaClass.simpleName}-error") { + coroutineScope { + launch(Dispatchers.IO) { + try { + process.errorStream.bufferedReader().forEachLine { Timber.e(it) } + } catch (_: InterruptedIOException) { } catch (e: IOException) { + if (!e.isEBADF) Timber.w(e) + } + } try { - process.errorStream.bufferedReader().forEachLine { Timber.e(it) } + process.inputStream.bufferedReader().forEachLine { + if (errorMatcher.containsMatchIn(it)) { + Timber.w(it) + process.destroy() // move on to next mode + } else processLine(it) + } } catch (_: InterruptedIOException) { } catch (e: IOException) { if (!e.isEBADF) Timber.w(e) } } - try { - process.inputStream.bufferedReader().forEachLine { - if (errorMatcher.containsMatchIn(it)) { - Timber.w(it) - process.destroy() // move on to next mode - } else processLine(it) - } - } catch (_: InterruptedIOException) { } catch (e: IOException) { - if (!e.isEBADF) Timber.w(e) - } - err.join() Timber.d("Monitor process exited with ${process.waitFor()}") } private suspend fun handleChannel(channel: ReceiveChannel) { @@ -98,30 +100,30 @@ abstract class IpMonitor { } protected fun init() { - thread(name = "${javaClass.simpleName}-input") { + worker = GlobalScope.launch(Dispatchers.Unconfined) { val mode = currentMode if (mode.isMonitor) { if (mode != Mode.MonitorRoot) { // monitor may get rejected by SELinux enforcing - handleProcess(ProcessBuilder(Routing.IP, "monitor", monitoredObject)) - if (destroyed) return@thread + withContext(Dispatchers.IO) { + handleProcess(ProcessBuilder(Routing.IP, "monitor", monitoredObject)) + } + if (destroyed) return@launch } try { - runBlocking(worker) { - RootManager.use { server -> - // while we only need to use this server once, we need to also keep the server alive - handleChannel(server.create(ProcessListener(errorMatcher, Routing.IP, "monitor", monitoredObject), - this)) - } + RootManager.use { server -> + // while we only need to use this server once, we need to also keep the server alive + handleChannel(server.create(ProcessListener(errorMatcher, Routing.IP, "monitor", monitoredObject), + this)) } } catch (_: CancellationException) { } catch (e: Exception) { Timber.w(e) } - if (destroyed) return@thread + if (destroyed) return@launch app.logEvent("ip_monitor_failure") } - GlobalScope.launch(Dispatchers.IO + worker) { + withContext(Dispatchers.IO) { var server: RootServer? = null try { while (isActive) { diff --git a/mobile/src/main/java/be/mygod/vpnhotspot/net/monitor/IpNeighbourMonitor.kt b/mobile/src/main/java/be/mygod/vpnhotspot/net/monitor/IpNeighbourMonitor.kt index df0bb130..b0e07773 100644 --- a/mobile/src/main/java/be/mygod/vpnhotspot/net/monitor/IpNeighbourMonitor.kt +++ b/mobile/src/main/java/be/mygod/vpnhotspot/net/monitor/IpNeighbourMonitor.kt @@ -61,7 +61,7 @@ class IpNeighbourMonitor private constructor() : IpMonitor() { override val monitoredObject: String get() = "neigh" - override fun processLine(line: String) { + override suspend fun processLine(line: String) { val old = neighbours for (neighbour in IpNeighbour.parse(line, fullMode)) neighbours = when (neighbour.state) { IpNeighbour.State.DELETING -> neighbours.remove(IpDev(neighbour)) @@ -70,7 +70,7 @@ class IpNeighbourMonitor private constructor() : IpMonitor() { if (neighbours != old) aggregator.trySendBlocking(neighbours).onFailure { throw it!! } } - override fun processLines(lines: Sequence) { + override suspend fun processLines(lines: Sequence) { neighbours = lines .flatMap { IpNeighbour.parse(it, fullMode).asSequence() } .filter { it.state != IpNeighbour.State.DELETING } // skip entries without lladdr