Compare commits

...

3 Commits

15 changed files with 371 additions and 164 deletions

View File

@ -74,15 +74,16 @@ public fun <T, R> DeviceState.Companion.map(
public fun <T, R> DeviceState<T>.map(mapper: (T) -> R): DeviceStateWithDependencies<R> = DeviceState.map(this, mapper) public fun <T, R> DeviceState<T>.map(mapper: (T) -> R): DeviceStateWithDependencies<R> = DeviceState.map(this, mapper)
public fun DeviceState<NumericalValue<out UnitsOfMeasurement>>.values(): DeviceState<Double> = object : DeviceState<Double> { public fun DeviceState<NumericalValue<out UnitsOfMeasurement>>.values(): DeviceState<Double> =
override val value: Double object : DeviceState<Double> {
get() = this@values.value.value override val value: Double
get() = this@values.value.value
override val valueFlow: Flow<Double> override val valueFlow: Flow<Double>
get() = this@values.valueFlow.map { it.value } get() = this@values.valueFlow.map { it.value }
override fun toString(): String = this@values.toString() override fun toString(): String = this@values.toString()
} }
/** /**
* Combine two device states into one read-only [DeviceState]. Only the latest value of each state is used. * Combine two device states into one read-only [DeviceState]. Only the latest value of each state is used.

View File

@ -41,13 +41,22 @@ private object InstantConverter : MetaConverter<Instant> {
public val MetaConverter.Companion.instant: MetaConverter<Instant> get() = InstantConverter public val MetaConverter.Companion.instant: MetaConverter<Instant> get() = InstantConverter
private object DoubleRangeConverter : MetaConverter<ClosedFloatingPointRange<Double>> { private object DoubleRangeConverter : MetaConverter<ClosedFloatingPointRange<Double>> {
override fun readOrNull(source: Meta): ClosedFloatingPointRange<Double>? = source.value?.doubleArray?.let { (start, end)-> override fun readOrNull(source: Meta): ClosedFloatingPointRange<Double>? =
start..end source.value?.doubleArray?.let { (start, end) ->
} start..end
}
override fun convert( override fun convert(
obj: ClosedFloatingPointRange<Double>, obj: ClosedFloatingPointRange<Double>,
): Meta = Meta(doubleArrayOf(obj.start, obj.endInclusive).asValue()) ): Meta = Meta(doubleArrayOf(obj.start, obj.endInclusive).asValue())
} }
public val MetaConverter.Companion.doubleRange: MetaConverter<ClosedFloatingPointRange<Double>> get() = DoubleRangeConverter public val MetaConverter.Companion.doubleRange: MetaConverter<ClosedFloatingPointRange<Double>> get() = DoubleRangeConverter
private object StringListConverter : MetaConverter<List<String>> {
override fun convert(obj: List<String>): Meta = Meta(obj.map { it.asValue() }.asValue())
override fun readOrNull(source: Meta): List<String>? = source.stringList ?: source["@jsonArray"]?.stringList
}
public val MetaConverter.Companion.stringList: MetaConverter<List<String>> get() = StringListConverter

View File

@ -2,14 +2,13 @@ package space.kscience.controls.peer
import space.kscience.dataforge.io.Envelope import space.kscience.dataforge.io.Envelope
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
/** /**
* A manager that allows direct synchronous sending and receiving binary data * A manager that allows direct synchronous sending and receiving binary data
*/ */
public interface PeerConnection { public interface PeerConnection {
/** /**
* Receive an [Envelope] from a device with name [deviceName] on a given [address] with given [contentId]. * Receive an [Envelope] from a device on a given [address] with given [contentId].
* *
* The address depends on the specifics of given [PeerConnection]. For example, it could be a TCP/IP port or * The address depends on the specifics of given [PeerConnection]. For example, it could be a TCP/IP port or
* magix endpoint name. * magix endpoint name.
@ -20,13 +19,12 @@ public interface PeerConnection {
*/ */
public suspend fun receive( public suspend fun receive(
address: String, address: String,
deviceName: Name,
contentId: String, contentId: String,
requestMeta: Meta = Meta.EMPTY, requestMeta: Meta = Meta.EMPTY,
): Envelope ): Envelope
/** /**
* Send an [envelope] to a device with name [deviceName] on a given [address] * Send an [envelope] to a device on a given [address]
* *
* The address depends on the specifics of given [PeerConnection]. For example, it could be a TCP/IP port or * The address depends on the specifics of given [PeerConnection]. For example, it could be a TCP/IP port or
* magix endpoint name. * magix endpoint name.
@ -35,7 +33,6 @@ public interface PeerConnection {
*/ */
public suspend fun send( public suspend fun send(
address: String, address: String,
deviceName: Name,
envelope: Envelope, envelope: Envelope,
requestMeta: Meta = Meta.EMPTY, requestMeta: Meta = Meta.EMPTY,
) )

View File

@ -19,10 +19,13 @@ import space.kscience.dataforge.meta.Meta
public suspend fun <T> DeviceClient.read(propertySpec: DevicePropertySpec<*, T>): T = public suspend fun <T> DeviceClient.read(propertySpec: DevicePropertySpec<*, T>): T =
propertySpec.converter.readOrNull(readProperty(propertySpec.name)) ?: error("Property read result is not valid") propertySpec.converter.readOrNull(readProperty(propertySpec.name)) ?: error("Property read result is not valid")
public suspend fun <T> DeviceClient.request(propertySpec: DevicePropertySpec<*, T>): T = public suspend fun <T> DeviceClient.request(propertySpec: DevicePropertySpec<*, T>): T =
propertySpec.converter.read(getOrReadProperty(propertySpec.name)) propertySpec.converter.read(getOrReadProperty(propertySpec.name))
public fun <T> DeviceClient.getCached(propertySpec: DevicePropertySpec<*, T>): T? =
getProperty(propertySpec.name)?.let { propertySpec.converter.read(it) }
public suspend fun <T> DeviceClient.write(propertySpec: MutableDevicePropertySpec<*, T>, value: T) { public suspend fun <T> DeviceClient.write(propertySpec: MutableDevicePropertySpec<*, T>, value: T) {
writeProperty(propertySpec.name, propertySpec.converter.convert(value)) writeProperty(propertySpec.name, propertySpec.converter.convert(value))
} }

View File

@ -38,6 +38,7 @@ import space.kscience.dataforge.names.get
import space.kscience.magix.api.MagixEndpoint import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.api.MagixFlowPlugin import space.kscience.magix.api.MagixFlowPlugin
import space.kscience.magix.api.MagixMessage import space.kscience.magix.api.MagixMessage
import space.kscience.magix.api.start
import space.kscience.magix.server.magixModule import space.kscience.magix.server.magixModule
@ -215,5 +216,6 @@ public fun Application.deviceManagerModule(
plugins.forEach { plugins.forEach {
it.start(this, magixFlow) it.start(this, magixFlow)
} }
magixModule(magixFlow) magixModule(magixFlow)
} }

View File

@ -13,7 +13,9 @@ kscience {
commonMain { commonMain {
implementation(projects.controlsVisualisationCompose) implementation(projects.controlsVisualisationCompose)
implementation(projects.controlsConstructor) implementation(projects.controlsConstructor)
implementation(projects.magix.magixServer)
implementation(projects.magix.magixRsocket) implementation(projects.magix.magixRsocket)
implementation(projects.controlsMagix)
} }
jvmMain { jvmMain {
// implementation("io.ktor:ktor-server-cio") // implementation("io.ktor:ktor-server-cio")

View File

@ -3,27 +3,36 @@
package space.kscience.controls.demo.collective package space.kscience.controls.demo.collective
import space.kscience.controls.api.Device import space.kscience.controls.api.Device
import space.kscience.controls.constructor.DeviceConstructor import space.kscience.controls.constructor.*
import space.kscience.controls.constructor.MutableDeviceState import space.kscience.controls.misc.stringList
import space.kscience.controls.constructor.registerAsProperty import space.kscience.controls.peer.PeerConnection
import space.kscience.controls.spec.DeviceSpec import space.kscience.controls.spec.DeviceSpec
import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Context
import space.kscience.dataforge.meta.MetaConverter import space.kscience.dataforge.meta.MetaConverter
import space.kscience.dataforge.meta.Scheme import space.kscience.dataforge.meta.Scheme
import space.kscience.dataforge.meta.int
import space.kscience.dataforge.meta.string import space.kscience.dataforge.meta.string
import space.kscience.dataforge.misc.DFExperimental import space.kscience.dataforge.misc.DFExperimental
import space.kscience.maps.coordinates.Gmc import space.kscience.maps.coordinates.Gmc
import space.kscience.maps.coordinates.GmcCurve
import kotlin.time.Duration.Companion.milliseconds import kotlin.time.Duration.Companion.milliseconds
class CollectiveDeviceConfiguration(deviceId: DeviceId) : Scheme() { typealias CollectiveDeviceId = String
class CollectiveDeviceConfiguration(deviceId: CollectiveDeviceId) : Scheme() {
var deviceId by string(deviceId) var deviceId by string(deviceId)
var description by string() var description by string()
var reportInterval by int(500)
var radioFrequency by string(default = "169 MHz")
} }
typealias CollectiveDeviceRoster = Map<CollectiveDeviceId, CollectiveDeviceConfiguration>
interface CollectiveDevice : Device { interface CollectiveDevice : Device {
public val id: DeviceId public val id: CollectiveDeviceId
public val peerConnection: PeerConnection
suspend fun getPosition(): Gmc suspend fun getPosition(): Gmc
@ -31,8 +40,7 @@ interface CollectiveDevice : Device {
suspend fun setVelocity(value: GmcVelocity) suspend fun setVelocity(value: GmcVelocity)
suspend fun listVisible(): Collection<DeviceId> suspend fun listVisible(): Collection<CollectiveDeviceId>
companion object : DeviceSpec<CollectiveDevice>() { companion object : DeviceSpec<CollectiveDevice>() {
val position by property<Gmc>( val position by property<Gmc>(
@ -45,6 +53,17 @@ interface CollectiveDevice : Device {
read = { getVelocity() }, read = { getVelocity() },
write = { _, value -> setVelocity(value) } write = { _, value -> setVelocity(value) }
) )
val visibleNeighbors by property(
MetaConverter.stringList,
read = {
listVisible().toList()
}
)
// val listVisible by action(MetaConverter.unit, MetaConverter.valueList<String> { it.string }) {
// listVisible().toList()
// }
} }
} }
@ -54,13 +73,34 @@ class CollectiveDeviceConstructor(
val configuration: CollectiveDeviceConfiguration, val configuration: CollectiveDeviceConfiguration,
position: MutableDeviceState<Gmc>, position: MutableDeviceState<Gmc>,
velocity: MutableDeviceState<GmcVelocity>, velocity: MutableDeviceState<GmcVelocity>,
private val listVisible: suspend () -> Collection<DeviceId>, override val peerConnection: PeerConnection,
private val observation: suspend () -> Map<CollectiveDeviceId, GmcCurve>,
) : DeviceConstructor(context, configuration.meta), CollectiveDevice { ) : DeviceConstructor(context, configuration.meta), CollectiveDevice {
override val id: DeviceId get() = configuration.deviceId override val id: CollectiveDeviceId get() = configuration.deviceId
val position = registerAsProperty(CollectiveDevice.position, position.sample(500.milliseconds)) val position = registerAsProperty(
val velocity = registerAsProperty(CollectiveDevice.velocity, velocity) CollectiveDevice.position,
position.sample(configuration.reportInterval.milliseconds)
)
val velocity = registerAsProperty(
CollectiveDevice.velocity,
velocity.sample(configuration.reportInterval.milliseconds)
)
private val _visibleNeighbors: MutableDeviceState<Collection<CollectiveDeviceId>> = stateOf(emptyList())
val visibleNeighbors = registerAsProperty(
CollectiveDevice.visibleNeighbors,
_visibleNeighbors.map { it.toList() }
)
init {
position.onNext {
_visibleNeighbors.value = observation.invoke().keys
}
}
override suspend fun getPosition(): Gmc = position.value override suspend fun getPosition(): Gmc = position.value
@ -70,5 +110,5 @@ class CollectiveDeviceConstructor(
velocity.value = value velocity.value = value
} }
override suspend fun listVisible(): Collection<DeviceId> = listVisible.invoke() override suspend fun listVisible(): Collection<CollectiveDeviceId> = observation.invoke().keys
} }

View File

@ -1,27 +1,39 @@
package space.kscience.controls.demo.collective package space.kscience.controls.demo.collective
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import space.kscience.controls.client.launchMagixService
import space.kscience.controls.constructor.ModelConstructor import space.kscience.controls.constructor.ModelConstructor
import space.kscience.controls.constructor.MutableDeviceState import space.kscience.controls.constructor.MutableDeviceState
import space.kscience.controls.constructor.onTimer import space.kscience.controls.constructor.onTimer
import space.kscience.controls.manager.DeviceManager
import space.kscience.controls.manager.install
import space.kscience.controls.peer.PeerConnection
import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.request
import space.kscience.dataforge.io.Envelope
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.parseAsName
import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.rsocket.rSocketWithWebSockets
import space.kscience.magix.server.startMagixServer
import space.kscience.maps.coordinates.* import space.kscience.maps.coordinates.*
typealias DeviceId = String internal data class CollectiveDeviceState(
val id: CollectiveDeviceId,
internal data class VirtualDeviceState(
val id: DeviceId,
val configuration: CollectiveDeviceConfiguration, val configuration: CollectiveDeviceConfiguration,
val position: MutableDeviceState<Gmc>, val position: MutableDeviceState<Gmc>,
val velocity: MutableDeviceState<GmcVelocity>, val velocity: MutableDeviceState<GmcVelocity>,
) )
internal fun VirtualDeviceState( internal fun VirtualDeviceState(
id: DeviceId, id: CollectiveDeviceId,
position: Gmc, position: Gmc,
configuration: CollectiveDeviceConfiguration.() -> Unit = {}, configuration: CollectiveDeviceConfiguration.() -> Unit = {},
) = VirtualDeviceState( ) = CollectiveDeviceState(
id, id,
CollectiveDeviceConfiguration(id).apply(configuration), CollectiveDeviceConfiguration(id).apply(configuration),
MutableDeviceState(position), MutableDeviceState(position),
@ -31,9 +43,10 @@ internal fun VirtualDeviceState(
internal class DeviceCollectiveModel( internal class DeviceCollectiveModel(
context: Context, context: Context,
val deviceStates: Collection<VirtualDeviceState>, val deviceStates: Collection<CollectiveDeviceState>,
val visibilityRange: Distance, val visibilityRange: Distance = 0.5.kilometers,
) : ModelConstructor(context) { val radioRange: Distance = 5.kilometers,
) : ModelConstructor(context), PeerConnection {
/** /**
* Propagate movement * Propagate movement
@ -45,7 +58,7 @@ internal class DeviceCollectiveModel(
} }
} }
suspend fun locateVisible(id: DeviceId): Map<DeviceId, GmcCurve> { private fun locateVisible(id: CollectiveDeviceId): Map<CollectiveDeviceId, GmcCurve> {
val coordinatesSnapshot = deviceStates.associate { it.id to it.position.value } val coordinatesSnapshot = deviceStates.associate { it.id to it.position.value }
val selected = coordinatesSnapshot[id] ?: error("Can't find device with id $id") val selected = coordinatesSnapshot[id] ?: error("Can't find device with id $id")
@ -56,4 +69,53 @@ internal class DeviceCollectiveModel(
return allCurves.filterValues { it.distance in 0.kilometers..visibilityRange } return allCurves.filterValues { it.distance in 0.kilometers..visibilityRange }
} }
val devices = deviceStates.associate {
val device = CollectiveDeviceConstructor(
context = context,
configuration = it.configuration,
position = it.position,
velocity = it.velocity,
peerConnection = this,
) {
locateVisible(it.id)
}
//start movement program
device.moveInCircles()
it.id to device
}
val roster = deviceStates.associate { it.id to it.configuration }
override suspend fun receive(address: String, contentId: String, requestMeta: Meta): Envelope {
TODO("Not yet implemented")
}
override suspend fun send(address: String, envelope: Envelope, requestMeta: Meta) {
// devices.values.filter { it.configuration.radioFrequency == address }.forEach { device ->
// ```
// }
}
}
internal fun CoroutineScope.launchCollectiveMagixServer(
collectiveModel: DeviceCollectiveModel,
): Job = launch(Dispatchers.IO) {
val server = startMagixServer(
// RSocketMagixFlowPlugin()
)
val deviceEndpoint = MagixEndpoint.rSocketWithWebSockets("localhost")
collectiveModel.devices.forEach { (id, device) ->
val deviceContext = collectiveModel.context.buildContext(id.parseAsName()) {
coroutineContext(coroutineContext)
plugin(DeviceManager)
}
deviceContext.install(id, device)
// val deviceEndpoint = MagixEndpoint.rSocketWithWebSockets("localhost")
deviceContext.request(DeviceManager).launchMagixService(deviceEndpoint, id)
}
} }

View File

@ -4,6 +4,7 @@ import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.sample import kotlinx.coroutines.flow.sample
import space.kscience.controls.constructor.DeviceState import space.kscience.controls.constructor.DeviceState
import space.kscience.controls.constructor.MutableDeviceState
import kotlin.time.Duration import kotlin.time.Duration
@OptIn(FlowPreview::class) @OptIn(FlowPreview::class)
@ -11,10 +12,24 @@ class SampleDeviceState<T>(
val origin: DeviceState<T>, val origin: DeviceState<T>,
val interval: Duration, val interval: Duration,
) : DeviceState<T> { ) : DeviceState<T> {
override val value: T get() = origin.value override val value: T by origin::value
override val valueFlow: Flow<T> get() = origin.valueFlow.sample(interval) override val valueFlow: Flow<T> get() = origin.valueFlow.sample(interval)
override fun toString(): String = "DebounceDeviceState($value, interval=$interval)" override fun toString(): String = "DebounceDeviceState($value, interval=$interval)"
} }
fun <T> DeviceState<T>.sample(interval: Duration) = SampleDeviceState(this, interval)
fun <T> DeviceState<T>.sample(interval: Duration) = SampleDeviceState(this, interval)
@OptIn(FlowPreview::class)
class MutableSampleDeviceState<T>(
val origin: MutableDeviceState<T>,
val interval: Duration,
) : MutableDeviceState<T> {
override var value: T by origin::value
override val valueFlow: Flow<T> get() = origin.valueFlow.sample(interval)
override fun toString(): String = "DebounceDeviceState($value, interval=$interval)"
}
fun <T> MutableDeviceState<T>.sample(interval: Duration) = MutableSampleDeviceState(this, interval)

View File

@ -12,6 +12,8 @@ import space.kscience.maps.coordinates.Gmc
import space.kscience.maps.coordinates.kilometers import space.kscience.maps.coordinates.kilometers
import kotlin.math.PI import kotlin.math.PI
import kotlin.random.Random import kotlin.random.Random
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
private val deviceVelocity = 0.1.kilometers private val deviceVelocity = 0.1.kilometers
@ -19,8 +21,13 @@ private val center = Gmc.ofDegrees(55.925, 37.514)
private val radius = 0.01.degrees private val radius = 0.01.degrees
internal fun generateModel(context: Context): DeviceCollectiveModel { internal fun generateModel(
val devices: List<VirtualDeviceState> = List(100) { index -> context: Context,
size: Int = 50,
reportInterval: Duration = 500.milliseconds,
additionalConfiguration: CollectiveDeviceConfiguration.() -> Unit = {},
): DeviceCollectiveModel {
val devices: List<CollectiveDeviceState> = List(size) { index ->
val id = "device[$index]" val id = "device[$index]"
VirtualDeviceState( VirtualDeviceState(
@ -32,10 +39,12 @@ internal fun generateModel(context: Context): DeviceCollectiveModel {
) { ) {
deviceId = id deviceId = id
description = "Virtual remote device $id" description = "Virtual remote device $id"
this.reportInterval = reportInterval.inWholeMilliseconds.toInt()
additionalConfiguration()
} }
} }
val model = DeviceCollectiveModel(context, devices, 0.2.kilometers) val model = DeviceCollectiveModel(context, devices)
return model return model
} }

View File

@ -10,6 +10,7 @@ import androidx.compose.foundation.layout.padding
import androidx.compose.material.Card import androidx.compose.material.Card
import androidx.compose.material.Checkbox import androidx.compose.material.Checkbox
import androidx.compose.material.Text import androidx.compose.material.Text
import androidx.compose.material3.CircularProgressIndicator
import androidx.compose.material3.MaterialTheme import androidx.compose.material3.MaterialTheme
import androidx.compose.runtime.* import androidx.compose.runtime.*
import androidx.compose.ui.Alignment import androidx.compose.ui.Alignment
@ -24,74 +25,103 @@ import androidx.compose.ui.window.Window
import androidx.compose.ui.window.application import androidx.compose.ui.window.application
import io.ktor.client.HttpClient import io.ktor.client.HttpClient
import io.ktor.client.engine.cio.CIO import io.ktor.client.engine.cio.CIO
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import org.jetbrains.compose.splitpane.ExperimentalSplitPaneApi import org.jetbrains.compose.splitpane.ExperimentalSplitPaneApi
import org.jetbrains.compose.splitpane.HorizontalSplitPane import org.jetbrains.compose.splitpane.HorizontalSplitPane
import org.jetbrains.compose.splitpane.rememberSplitPaneState import org.jetbrains.compose.splitpane.rememberSplitPaneState
import space.kscience.controls.api.PropertyChangedMessage
import space.kscience.controls.client.*
import space.kscience.controls.compose.conditional import space.kscience.controls.compose.conditional
import space.kscience.controls.manager.DeviceManager import space.kscience.controls.manager.DeviceManager
import space.kscience.controls.spec.useProperty
import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.request import space.kscience.dataforge.context.ContextBuilder
import space.kscience.dataforge.meta.MetaConverter
import space.kscience.dataforge.names.parseAsName
import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.api.subscribe
import space.kscience.magix.rsocket.rSocketWithWebSockets
import space.kscience.maps.compose.MapView import space.kscience.maps.compose.MapView
import space.kscience.maps.compose.OpenStreetMapTileProvider import space.kscience.maps.compose.OpenStreetMapTileProvider
import space.kscience.maps.coordinates.Gmc
import space.kscience.maps.coordinates.meters
import space.kscience.maps.features.* import space.kscience.maps.features.*
import java.nio.file.Path import java.nio.file.Path
import kotlin.time.Duration.Companion.seconds
@Composable @Composable
fun rememberDeviceManager(): DeviceManager = remember { fun rememberContext(name: String, contextBuilder: ContextBuilder.() -> Unit = {}): Context = remember {
val context = Context { Context(name, contextBuilder)
plugin(DeviceManager)
}
context.request(DeviceManager)
} }
private val gmcMetaConverter = MetaConverter.serializable<Gmc>()
@Composable @Composable
fun App() { fun App() {
val scope = rememberCoroutineScope() val scope = rememberCoroutineScope()
val parentContext = rememberContext("Parent") {
val deviceManager = rememberDeviceManager() plugin(DeviceManager)
val collectiveModel = remember {
generateModel(deviceManager.context)
} }
val devices: Map<DeviceId, CollectiveDevice> = remember { val collectiveModel = remember {
collectiveModel.deviceStates.associate { generateModel(parentContext, 100, reportInterval = 1.seconds)
val device = CollectiveDeviceConstructor( }
context = deviceManager.context,
configuration = it.configuration, val roster = remember {
position = it.position, collectiveModel.roster
velocity = it.velocity }
) {
collectiveModel.locateVisible(it.id).keys val client = remember { CompletableDeferred<MagixEndpoint>() }
val devices = remember { mutableStateMapOf<CollectiveDeviceId, DeviceClient>() }
LaunchedEffect(collectiveModel) {
launchCollectiveMagixServer(collectiveModel)
withContext(Dispatchers.IO) {
val magixClient = MagixEndpoint.rSocketWithWebSockets("localhost")
client.complete(magixClient)
collectiveModel.roster.forEach { (id, config) ->
scope.launch {
devices[id] = magixClient.remoteDevice(parentContext, "listener", id, id.parseAsName())
}
} }
device.moveInCircles() }
it.id to device
}
var selectedDeviceId by remember { mutableStateOf<CollectiveDeviceId?>(null) }
var currentPosition by remember { mutableStateOf<Gmc?>(null) }
LaunchedEffect(selectedDeviceId, devices) {
selectedDeviceId?.let { devices[it] }?.propertyFlow(CollectiveDevice.position)?.collect {
currentPosition = it
} }
} }
var selectedDeviceId by remember { mutableStateOf<DeviceId?>(null) }
var showOnlyVisible by remember { mutableStateOf(false) } var showOnlyVisible by remember { mutableStateOf(false) }
val mapTileProvider = remember {
OpenStreetMapTileProvider(
client = HttpClient(CIO),
cacheDirectory = Path.of("mapCache")
)
}
HorizontalSplitPane( HorizontalSplitPane(
splitPaneState = rememberSplitPaneState(0.9f) splitPaneState = rememberSplitPaneState(0.9f)
) { ) {
first(400.dp) { first(400.dp) {
MapView( MapView(
mapTileProvider = mapTileProvider, mapTileProvider = remember {
OpenStreetMapTileProvider(
client = HttpClient(CIO),
cacheDirectory = Path.of("mapCache")
)
},
config = ViewConfig() config = ViewConfig()
) { ) {
collectiveModel.deviceStates.forEach { device -> collectiveModel.deviceStates.forEach { device ->
@ -103,82 +133,104 @@ fun App() {
}.launchIn(scope) }.launchIn(scope)
} }
devices.forEach { (id, device) -> scope.launch {
device.useProperty(CollectiveDevice.position, scope = scope) { position ->
val activeDevice = selectedDeviceId?.let { devices[it] } client.await().subscribe(DeviceManager.magixFormat).onEach { (magixMessage, deviceMessage) ->
if (deviceMessage is PropertyChangedMessage && deviceMessage.property == "position") {
val id = magixMessage.sourceEndpoint
val position = gmcMetaConverter.read(deviceMessage.value)
val activeDevice = selectedDeviceId?.let { devices[it] }
if (activeDevice == null || id == selectedDeviceId || !showOnlyVisible || id in activeDevice.listVisible()) { if (
rectangle( activeDevice == null ||
position, id == selectedDeviceId ||
id = id, !showOnlyVisible ||
size = if (selectedDeviceId == id) DpSize(10.dp, 10.dp) else DpSize(5.dp, 5.dp) id in activeDevice.request(CollectiveDevice.visibleNeighbors)
).color(if (selectedDeviceId == id) Color.Magenta else Color.Blue) ) {
.modifyAttribute(AlphaAttribute, if (selectedDeviceId == id) 1f else 0.5f) rectangle(
.onClick { selectedDeviceId = id } position,
} else { id = id,
removeFeature(id) size = if (selectedDeviceId == id) DpSize(10.dp, 10.dp) else DpSize(5.dp, 5.dp)
).color(if (selectedDeviceId == id) Color.Magenta else Color.Blue)
.modifyAttribute(AlphaAttribute, if (selectedDeviceId == id) 1f else 0.5f)
.onClick { selectedDeviceId = id }
} else {
removeFeature(id)
}
} }
}.launchIn(scope)
}
} }
} }
} }
second(200.dp) { second(200.dp) {
Column {
selectedDeviceId?.let { id -> Column(
Column( modifier = Modifier.verticalScroll(rememberScrollState())
modifier = Modifier ) {
.padding(8.dp) collectiveModel.roster.forEach { (id, _) ->
.border(2.dp, Color.DarkGray) Card(
elevation = 16.dp,
modifier = Modifier.padding(8.dp).onClick {
selectedDeviceId = id
}.conditional(id == selectedDeviceId) {
border(2.dp, Color.Blue)
},
) { ) {
Card( Column(
elevation = 16.dp, modifier = Modifier.padding(8.dp)
) { ) {
Text( Row(verticalAlignment = Alignment.CenterVertically) {
text = "Выбран: $id", if (devices[id] == null) {
fontSize = 16.sp, CircularProgressIndicator()
fontWeight = FontWeight.Bold, }
modifier = Modifier.padding(10.dp).fillMaxWidth() Text(
) text = id,
} fontSize = 16.sp,
devices[id]?.let { fontWeight = FontWeight.Bold,
Text(it.meta.toString(), Modifier.padding(10.dp)) modifier = Modifier.padding(10.dp).fillMaxWidth(),
} )
Row(verticalAlignment = Alignment.CenterVertically, modifier = Modifier.padding(10.dp).fillMaxWidth()) { }
Text("Показать только видимые") if (id == selectedDeviceId) {
Checkbox(showOnlyVisible, { showOnlyVisible = it }) roster[id]?.let {
} Text("Meta:", color = Color.Blue, fontWeight = FontWeight.Bold)
} Card(elevation = 16.dp, modifier = Modifier.fillMaxWidth().padding(8.dp)) {
} Text(it.toString())
Column( }
modifier = Modifier.verticalScroll(rememberScrollState()) }
) {
devices.forEach { (id, device) -> currentPosition?.let { currentPosition ->
Card( Text(
elevation = 16.dp, "Широта: ${String.format("%.3f", currentPosition.latitude.toDegrees().value)}"
modifier = Modifier.padding(8.dp).onClick { )
selectedDeviceId = id Text(
}.conditional(id == selectedDeviceId) { "Долгота: ${String.format("%.3f", currentPosition.longitude.toDegrees().value)}"
border(2.dp, Color.Blue) )
currentPosition.elevation?.let {
Text("Высота: ${String.format("%.1f", it.meters)} м")
}
}
Row(
verticalAlignment = Alignment.CenterVertically,
modifier = Modifier.fillMaxWidth()
) {
Text("Показать только видимые")
Checkbox(showOnlyVisible, { showOnlyVisible = it })
}
} }
) {
Text(
text = id,
fontSize = 16.sp,
fontWeight = FontWeight.Bold,
modifier = Modifier.padding(10.dp).fillMaxWidth()
)
} }
} }
} }
} }
} }
} }
} }
fun main() = application { fun main() = application {
// System.setProperty(IO_PARALLELISM_PROPERTY_NAME, 300.toString())
Window(onCloseRequest = ::exitApplication, title = "Maps-kt demo", icon = painterResource("SPC-logo.png")) { Window(onCloseRequest = ::exitApplication, title = "Maps-kt demo", icon = painterResource("SPC-logo.png")) {
MaterialTheme { MaterialTheme {
App() App()

View File

@ -83,8 +83,7 @@ suspend fun main() {
val endpointId = "device$it" val endpointId = "device$it"
val deviceEndpoint = MagixEndpoint.rSocketStreamWithWebSockets("localhost") val deviceEndpoint = MagixEndpoint.rSocketStreamWithWebSockets("localhost")
deviceManager.launchMagixService(deviceEndpoint, endpointId, Dispatchers.IO) deviceManager.launchMagixService(deviceEndpoint, endpointId)
} }
val trace = Bar { val trace = Bar {

View File

@ -21,9 +21,10 @@ public fun interface MagixFlowPlugin {
sendMessage: suspend (MagixMessage) -> Unit, sendMessage: suspend (MagixMessage) -> Unit,
): Job ): Job
/** }
* Use the same [MutableSharedFlow] to send and receive messages. Could be a bottleneck in case of many plugins.
*/ /**
public fun start(scope: CoroutineScope, magixFlow: MutableSharedFlow<MagixMessage>): Job = * Use the same [MutableSharedFlow] to send and receive messages. Could be a bottleneck in case of many plugins.
start(scope, magixFlow) { magixFlow.emit(it) } */
} public fun MagixFlowPlugin.start(scope: CoroutineScope, magixFlow: MutableSharedFlow<MagixMessage>): Job =
start(scope, magixFlow) { magixFlow.emit(it) }

View File

@ -10,7 +10,9 @@ import io.ktor.server.util.getValue
import io.ktor.server.websocket.WebSockets import io.ktor.server.websocket.WebSockets
import io.rsocket.kotlin.ktor.server.RSocketSupport import io.rsocket.kotlin.ktor.server.RSocketSupport
import io.rsocket.kotlin.ktor.server.rSocket import io.rsocket.kotlin.ktor.server.rSocket
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.map
import kotlinx.html.* import kotlinx.html.*
import kotlinx.serialization.encodeToString import kotlinx.serialization.encodeToString
@ -42,7 +44,11 @@ private fun ApplicationCall.buildFilter(): MagixMessageFilter {
/** /**
* Attach magix http/sse and websocket-based rsocket event loop + statistics page to existing [MutableSharedFlow] * Attach magix http/sse and websocket-based rsocket event loop + statistics page to existing [MutableSharedFlow]
*/ */
public fun Application.magixModule(magixFlow: MutableSharedFlow<MagixMessage>, route: String = "/") { public fun Application.magixModule(
magixFlow: Flow<MagixMessage>,
send: suspend (MagixMessage) -> Unit,
route: String = "/",
) {
if (pluginOrNull(WebSockets) == null) { if (pluginOrNull(WebSockets) == null) {
install(WebSockets) install(WebSockets)
} }
@ -62,27 +68,31 @@ public fun Application.magixModule(magixFlow: MutableSharedFlow<MagixMessage>, r
routing { routing {
route(route) { route(route) {
install(ContentNegotiation){ install(ContentNegotiation) {
json() json()
} }
get("state") { if (magixFlow is SharedFlow) {
call.respondHtml { get("state") {
head { call.respondHtml {
meta { head {
httpEquiv = "refresh" meta {
content = "2" httpEquiv = "refresh"
content = "2"
}
} }
} body {
body { h1 { +"Magix loop statistics" }
h1 { +"Magix loop statistics" } if (magixFlow is MutableSharedFlow) {
h2 { +"Number of subscribers: ${magixFlow.subscriptionCount.value}" } h2 { +"Number of subscribers: ${magixFlow.subscriptionCount.value}" }
h3 { +"Replay cache size: ${magixFlow.replayCache.size}" } }
h3 { +"Replay cache:" } h3 { +"Replay cache size: ${magixFlow.replayCache.size}" }
ol { h3 { +"Replay cache:" }
magixFlow.replayCache.forEach { message -> ol {
li { magixFlow.replayCache.forEach { message ->
code { li {
+magixJson.encodeToString(message) code {
+magixJson.encodeToString(message)
}
} }
} }
} }
@ -102,17 +112,22 @@ public fun Application.magixModule(magixFlow: MutableSharedFlow<MagixMessage>, r
} }
post("broadcast") { post("broadcast") {
val message = call.receive<MagixMessage>() val message = call.receive<MagixMessage>()
magixFlow.emit(message) send(message)
} }
//rSocket WS server. Filter from Payload //rSocket WS server. Filter from Payload
rSocket( rSocket(
"rsocket", "rsocket",
acceptor = RSocketMagixFlowPlugin.acceptor(application, magixFlow) { magixFlow.emit(it) } acceptor = RSocketMagixFlowPlugin.acceptor(application, magixFlow) { send(it) }
) )
} }
} }
} }
public fun Application.magixModule(
magixFlow: MutableSharedFlow<MagixMessage>,
route: String = "/",
): Unit = magixModule(magixFlow, { magixFlow.emit(it) }, route)
/** /**
* Create a new loop [MutableSharedFlow] with given [buffer] and setup magix module based on it * Create a new loop [MutableSharedFlow] with given [buffer] and setup magix module based on it
*/ */

View File

@ -9,6 +9,7 @@ import kotlinx.coroutines.flow.MutableSharedFlow
import space.kscience.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_HTTP_PORT import space.kscience.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_HTTP_PORT
import space.kscience.magix.api.MagixFlowPlugin import space.kscience.magix.api.MagixFlowPlugin
import space.kscience.magix.api.MagixMessage import space.kscience.magix.api.MagixMessage
import space.kscience.magix.api.start
/** /**
@ -22,7 +23,6 @@ public fun CoroutineScope.startMagixServer(
val magixFlow = MutableSharedFlow<MagixMessage>( val magixFlow = MutableSharedFlow<MagixMessage>(
replay = buffer, replay = buffer,
extraBufferCapacity = buffer,
onBufferOverflow = BufferOverflow.DROP_OLDEST onBufferOverflow = BufferOverflow.DROP_OLDEST
) )