Make ip monitor nonblocking

This commit is contained in:
Mygod
2021-05-25 15:24:28 -04:00
parent 2e83f8b22d
commit dbde2a20d6
3 changed files with 35 additions and 36 deletions

View File

@@ -8,7 +8,6 @@ import be.mygod.vpnhotspot.root.ReadArp
import be.mygod.vpnhotspot.root.RootManager import be.mygod.vpnhotspot.root.RootManager
import be.mygod.vpnhotspot.util.parseNumericAddress import be.mygod.vpnhotspot.util.parseNumericAddress
import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.runBlocking
import timber.log.Timber import timber.log.Timber
import java.io.File import java.io.File
import java.io.FileNotFoundException import java.io.FileNotFoundException
@@ -47,7 +46,7 @@ data class IpNeighbour(val ip: InetAddress, val dev: String, val lladdr: MacAddr
return setOf(dev) return setOf(dev)
} }
fun parse(line: String, fullMode: Boolean): List<IpNeighbour> { suspend fun parse(line: String, fullMode: Boolean): List<IpNeighbour> {
return if (line.isBlank()) emptyList() else try { return if (line.isBlank()) emptyList() else try {
val match = parser.matchEntire(line)!! val match = parser.matchEntire(line)!!
val ip = parseNumericAddress(match.groupValues[2]) // by regex, ip is non-empty val ip = parseNumericAddress(match.groupValues[2]) // by regex, ip is non-empty
@@ -118,15 +117,13 @@ data class IpNeighbour(val ip: InetAddress, val dev: String, val lladdr: MacAddr
.drop(1) .drop(1)
.filter { it.size >= 6 && mac.matcher(it[ARP_HW_ADDRESS]).matches() } .filter { it.size >= 6 && mac.matcher(it[ARP_HW_ADDRESS]).matches() }
.toList() .toList()
private fun arp(): List<List<String>> { private suspend fun arp(): List<List<String>> {
if (System.nanoTime() - arpCacheTime >= ARP_CACHE_EXPIRE) try { if (System.nanoTime() - arpCacheTime >= ARP_CACHE_EXPIRE) try {
arpCache = File("/proc/net/arp").bufferedReader().useLines { it.makeArp() } arpCache = File("/proc/net/arp").bufferedReader().useLines { it.makeArp() }
} catch (e: IOException) { } catch (e: IOException) {
if (e is FileNotFoundException && Build.VERSION.SDK_INT >= 29 && if (e is FileNotFoundException && Build.VERSION.SDK_INT >= 29 &&
(e.cause as? ErrnoException)?.errno == OsConstants.EACCES) try { (e.cause as? ErrnoException)?.errno == OsConstants.EACCES) try {
arpCache = runBlocking { arpCache = RootManager.use { it.execute(ReadArp()) }.value.lineSequence().makeArp()
RootManager.use { it.execute(ReadArp()) }
}.value.lineSequence().makeArp()
} catch (eRoot: Exception) { } catch (eRoot: Exception) {
eRoot.addSuppressed(e) eRoot.addSuppressed(e)
if (eRoot !is CancellationException) Timber.w(eRoot) if (eRoot !is CancellationException) Timber.w(eRoot)

View File

@@ -49,15 +49,16 @@ abstract class IpMonitor {
private class FlushFailure : RuntimeException() private class FlushFailure : RuntimeException()
protected abstract val monitoredObject: String protected abstract val monitoredObject: String
protected abstract fun processLine(line: String) protected abstract suspend fun processLine(line: String)
protected abstract fun processLines(lines: Sequence<String>) protected abstract suspend fun processLines(lines: Sequence<String>)
@Volatile @Volatile
private var destroyed = false private var destroyed = false
private var monitor: Process? = null private var monitor: Process? = null
private val worker = Job() private lateinit var worker: Job
private fun handleProcess(builder: ProcessBuilder) { @Suppress("BlockingMethodInNonBlockingContext")
private suspend fun handleProcess(builder: ProcessBuilder) {
val process = try { val process = try {
builder.start() builder.start()
} catch (e: IOException) { } catch (e: IOException) {
@@ -65,24 +66,25 @@ abstract class IpMonitor {
return return
} }
monitor = process monitor = process
val err = thread(name = "${javaClass.simpleName}-error") { coroutineScope {
launch(Dispatchers.IO) {
try {
process.errorStream.bufferedReader().forEachLine { Timber.e(it) }
} catch (_: InterruptedIOException) { } catch (e: IOException) {
if (!e.isEBADF) Timber.w(e)
}
}
try { try {
process.errorStream.bufferedReader().forEachLine { Timber.e(it) } process.inputStream.bufferedReader().forEachLine {
if (errorMatcher.containsMatchIn(it)) {
Timber.w(it)
process.destroy() // move on to next mode
} else processLine(it)
}
} catch (_: InterruptedIOException) { } catch (e: IOException) { } catch (_: InterruptedIOException) { } catch (e: IOException) {
if (!e.isEBADF) Timber.w(e) if (!e.isEBADF) Timber.w(e)
} }
} }
try {
process.inputStream.bufferedReader().forEachLine {
if (errorMatcher.containsMatchIn(it)) {
Timber.w(it)
process.destroy() // move on to next mode
} else processLine(it)
}
} catch (_: InterruptedIOException) { } catch (e: IOException) {
if (!e.isEBADF) Timber.w(e)
}
err.join()
Timber.d("Monitor process exited with ${process.waitFor()}") Timber.d("Monitor process exited with ${process.waitFor()}")
} }
private suspend fun handleChannel(channel: ReceiveChannel<ProcessData>) { private suspend fun handleChannel(channel: ReceiveChannel<ProcessData>) {
@@ -98,30 +100,30 @@ abstract class IpMonitor {
} }
protected fun init() { protected fun init() {
thread(name = "${javaClass.simpleName}-input") { worker = GlobalScope.launch(Dispatchers.Unconfined) {
val mode = currentMode val mode = currentMode
if (mode.isMonitor) { if (mode.isMonitor) {
if (mode != Mode.MonitorRoot) { if (mode != Mode.MonitorRoot) {
// monitor may get rejected by SELinux enforcing // monitor may get rejected by SELinux enforcing
handleProcess(ProcessBuilder(Routing.IP, "monitor", monitoredObject)) withContext(Dispatchers.IO) {
if (destroyed) return@thread handleProcess(ProcessBuilder(Routing.IP, "monitor", monitoredObject))
}
if (destroyed) return@launch
} }
try { try {
runBlocking(worker) { RootManager.use { server ->
RootManager.use { server -> // while we only need to use this server once, we need to also keep the server alive
// while we only need to use this server once, we need to also keep the server alive handleChannel(server.create(ProcessListener(errorMatcher, Routing.IP, "monitor", monitoredObject),
handleChannel(server.create(ProcessListener(errorMatcher, Routing.IP, "monitor", monitoredObject), this))
this))
}
} }
} catch (_: CancellationException) { } catch (_: CancellationException) {
} catch (e: Exception) { } catch (e: Exception) {
Timber.w(e) Timber.w(e)
} }
if (destroyed) return@thread if (destroyed) return@launch
app.logEvent("ip_monitor_failure") app.logEvent("ip_monitor_failure")
} }
GlobalScope.launch(Dispatchers.IO + worker) { withContext(Dispatchers.IO) {
var server: RootServer? = null var server: RootServer? = null
try { try {
while (isActive) { while (isActive) {

View File

@@ -61,7 +61,7 @@ class IpNeighbourMonitor private constructor() : IpMonitor() {
override val monitoredObject: String get() = "neigh" override val monitoredObject: String get() = "neigh"
override fun processLine(line: String) { override suspend fun processLine(line: String) {
val old = neighbours val old = neighbours
for (neighbour in IpNeighbour.parse(line, fullMode)) neighbours = when (neighbour.state) { for (neighbour in IpNeighbour.parse(line, fullMode)) neighbours = when (neighbour.state) {
IpNeighbour.State.DELETING -> neighbours.remove(IpDev(neighbour)) IpNeighbour.State.DELETING -> neighbours.remove(IpDev(neighbour))
@@ -70,7 +70,7 @@ class IpNeighbourMonitor private constructor() : IpMonitor() {
if (neighbours != old) aggregator.trySendBlocking(neighbours).onFailure { throw it!! } if (neighbours != old) aggregator.trySendBlocking(neighbours).onFailure { throw it!! }
} }
override fun processLines(lines: Sequence<String>) { override suspend fun processLines(lines: Sequence<String>) {
neighbours = lines neighbours = lines
.flatMap { IpNeighbour.parse(it, fullMode).asSequence() } .flatMap { IpNeighbour.parse(it, fullMode).asSequence() }
.filter { it.state != IpNeighbour.State.DELETING } // skip entries without lladdr .filter { it.state != IpNeighbour.State.DELETING } // skip entries without lladdr