Refine aggregator

This commit is contained in:
Mygod
2019-08-12 13:01:37 +08:00
parent 5b3de2fda6
commit 05a79acf78
6 changed files with 31 additions and 40 deletions

View File

@@ -94,6 +94,7 @@ dependencies {
implementation 'com.takisoft.preferencex:preferencex-simplemenu:1.0.0' implementation 'com.takisoft.preferencex:preferencex-simplemenu:1.0.0'
implementation 'net.glxn.qrgen:android:2.0' implementation 'net.glxn.qrgen:android:2.0'
implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlinVersion" implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlinVersion"
implementation 'org.jetbrains.kotlinx:kotlinx-collections-immutable:0.2'
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.0-RC2' implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.0-RC2'
for (dep in aux) { for (dep in aux) {
freedomImplementation dep freedomImplementation dep

View File

@@ -5,12 +5,12 @@ import be.mygod.vpnhotspot.net.IpNeighbour
import be.mygod.vpnhotspot.net.monitor.IpNeighbourMonitor import be.mygod.vpnhotspot.net.monitor.IpNeighbourMonitor
abstract class IpNeighbourMonitoringService : Service(), IpNeighbourMonitor.Callback { abstract class IpNeighbourMonitoringService : Service(), IpNeighbourMonitor.Callback {
private var neighbours = emptyList<IpNeighbour>() private var neighbours: Collection<IpNeighbour> = emptyList()
protected abstract val activeIfaces: List<String> protected abstract val activeIfaces: List<String>
protected open val inactiveIfaces get() = emptyList<String>() protected open val inactiveIfaces get() = emptyList<String>()
override fun onIpNeighbourAvailable(neighbours: List<IpNeighbour>) { override fun onIpNeighbourAvailable(neighbours: Collection<IpNeighbour>) {
this.neighbours = neighbours this.neighbours = neighbours
updateNotification() updateNotification()
} }

View File

@@ -133,7 +133,7 @@ class LocalOnlyHotspotService : IpNeighbourMonitoringService(), CoroutineScope {
stopSelf() stopSelf()
} }
override fun onIpNeighbourAvailable(neighbours: List<IpNeighbour>) { override fun onIpNeighbourAvailable(neighbours: Collection<IpNeighbour>) {
super.onIpNeighbourAvailable(neighbours) super.onIpNeighbourAvailable(neighbours)
if (Build.VERSION.SDK_INT >= 28) timeoutMonitor?.onClientsChanged(neighbours.none { if (Build.VERSION.SDK_INT >= 28) timeoutMonitor?.onClientsChanged(neighbours.none {
it.state != IpNeighbour.State.FAILED it.state != IpNeighbour.State.FAILED

View File

@@ -27,7 +27,7 @@ class ClientViewModel : ViewModel(), ServiceConnection, IpNeighbourMonitor.Callb
private var repeater: RepeaterService.Binder? = null private var repeater: RepeaterService.Binder? = null
private var p2p: Collection<WifiP2pDevice> = emptyList() private var p2p: Collection<WifiP2pDevice> = emptyList()
private var neighbours = emptyList<IpNeighbour>() private var neighbours: Collection<IpNeighbour> = emptyList()
val clients = MutableLiveData<List<Client>>() val clients = MutableLiveData<List<Client>>()
private fun populateClients() { private fun populateClients() {
@@ -81,7 +81,7 @@ class ClientViewModel : ViewModel(), ServiceConnection, IpNeighbourMonitor.Callb
binder.groupChanged -= this binder.groupChanged -= this
} }
override fun onIpNeighbourAvailable(neighbours: List<IpNeighbour>) { override fun onIpNeighbourAvailable(neighbours: Collection<IpNeighbour>) {
this.neighbours = neighbours this.neighbours = neighbours
populateClients() populateClients()
} }

View File

@@ -218,7 +218,7 @@ class Routing(private val caller: Any, private val downstream: String) : IpNeigh
} }
} }
private val clients = HashMap<InetAddress, Client>() private val clients = HashMap<InetAddress, Client>()
override fun onIpNeighbourAvailable(neighbours: List<IpNeighbour>) = synchronized(this) { override fun onIpNeighbourAvailable(neighbours: Collection<IpNeighbour>) = synchronized(this) {
val toRemove = HashSet(clients.keys) val toRemove = HashSet(clients.keys)
for (neighbour in neighbours) { for (neighbour in neighbours) {
if (neighbour.dev != downstream || neighbour.ip !is Inet4Address || runBlocking { if (neighbour.dev != downstream || neighbour.ip !is Inet4Address || runBlocking {

View File

@@ -1,11 +1,13 @@
package be.mygod.vpnhotspot.net.monitor package be.mygod.vpnhotspot.net.monitor
import be.mygod.vpnhotspot.net.IpNeighbour import be.mygod.vpnhotspot.net.IpNeighbour
import kotlinx.coroutines.Dispatchers import kotlinx.collections.immutable.PersistentMap
import kotlinx.collections.immutable.persistentMapOf
import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.channels.sendBlocking
import java.net.InetAddress import java.net.InetAddress
import java.util.*
class IpNeighbourMonitor private constructor() : IpMonitor() { class IpNeighbourMonitor private constructor() : IpMonitor() {
companion object { companion object {
@@ -31,46 +33,34 @@ class IpNeighbourMonitor private constructor() : IpMonitor() {
} }
interface Callback { interface Callback {
fun onIpNeighbourAvailable(neighbours: List<IpNeighbour>) fun onIpNeighbourAvailable(neighbours: Collection<IpNeighbour>)
} }
private var updatePosted = false private val aggregator = GlobalScope.actor<PersistentMap<InetAddress, IpNeighbour>>(capacity = Channel.CONFLATED) {
private val neighbours = HashMap<InetAddress, IpNeighbour>() for (value in channel) {
val neighbours = value.values
synchronized(callbacks) { for (callback in callbacks) callback.onIpNeighbourAvailable(neighbours) }
}
}
private var neighbours = persistentMapOf<InetAddress, IpNeighbour>()
override val monitoredObject: String get() = "neigh" override val monitoredObject: String get() = "neigh"
override fun processLine(line: String) { override fun processLine(line: String) {
synchronized(neighbours) { val old = neighbours
if (IpNeighbour.parse(line).map { neighbour -> for (neighbour in IpNeighbour.parse(line)) neighbours = when (neighbour.state) {
if (neighbour.state == IpNeighbour.State.DELETING) IpNeighbour.State.DELETING -> neighbours.remove(neighbour.ip)
neighbours.remove(neighbour.ip) != null else -> neighbours.put(neighbour.ip, neighbour)
else neighbours.put(neighbour.ip, neighbour) != neighbour
}.any { it }) postUpdateLocked()
} }
if (neighbours != old) aggregator.sendBlocking(neighbours)
} }
override fun processLines(lines: Sequence<String>) { override fun processLines(lines: Sequence<String>) {
synchronized(neighbours) { neighbours = lines
neighbours.clear()
neighbours.putAll(lines
.flatMap { IpNeighbour.parse(it).asSequence() } .flatMap { IpNeighbour.parse(it).asSequence() }
.filter { it.state != IpNeighbour.State.DELETING } // skip entries without lladdr .filter { it.state != IpNeighbour.State.DELETING } // skip entries without lladdr
.associateBy { it.ip }) .associateByTo(persistentMapOf<InetAddress, IpNeighbour>().builder()) { it.ip }
postUpdateLocked() .build()
} aggregator.sendBlocking(neighbours)
}
private fun postUpdateLocked() {
if (updatePosted || instance != this) return
GlobalScope.launch {
val neighbours = synchronized(neighbours) {
updatePosted = false
neighbours.values.toList()
}
synchronized(callbacks) {
for (callback in callbacks) callback.onIpNeighbourAvailable(neighbours)
}
}
updatePosted = true
} }
} }