[WIP] remote communication for CollectiveDevice

This commit is contained in:
Alexander Nozik 2024-06-09 15:09:43 +03:00
parent 13b80be884
commit e9bde68674
12 changed files with 312 additions and 143 deletions

View File

@ -2,14 +2,13 @@ package space.kscience.controls.peer
import space.kscience.dataforge.io.Envelope import space.kscience.dataforge.io.Envelope
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
/** /**
* A manager that allows direct synchronous sending and receiving binary data * A manager that allows direct synchronous sending and receiving binary data
*/ */
public interface PeerConnection { public interface PeerConnection {
/** /**
* Receive an [Envelope] from a device with name [deviceName] on a given [address] with given [contentId]. * Receive an [Envelope] from a device on a given [address] with given [contentId].
* *
* The address depends on the specifics of given [PeerConnection]. For example, it could be a TCP/IP port or * The address depends on the specifics of given [PeerConnection]. For example, it could be a TCP/IP port or
* magix endpoint name. * magix endpoint name.
@ -20,13 +19,12 @@ public interface PeerConnection {
*/ */
public suspend fun receive( public suspend fun receive(
address: String, address: String,
deviceName: Name,
contentId: String, contentId: String,
requestMeta: Meta = Meta.EMPTY, requestMeta: Meta = Meta.EMPTY,
): Envelope ): Envelope
/** /**
* Send an [envelope] to a device with name [deviceName] on a given [address] * Send an [envelope] to a device on a given [address]
* *
* The address depends on the specifics of given [PeerConnection]. For example, it could be a TCP/IP port or * The address depends on the specifics of given [PeerConnection]. For example, it could be a TCP/IP port or
* magix endpoint name. * magix endpoint name.
@ -35,7 +33,6 @@ public interface PeerConnection {
*/ */
public suspend fun send( public suspend fun send(
address: String, address: String,
deviceName: Name,
envelope: Envelope, envelope: Envelope,
requestMeta: Meta = Meta.EMPTY, requestMeta: Meta = Meta.EMPTY,
) )

View File

@ -38,6 +38,7 @@ import space.kscience.dataforge.names.get
import space.kscience.magix.api.MagixEndpoint import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.api.MagixFlowPlugin import space.kscience.magix.api.MagixFlowPlugin
import space.kscience.magix.api.MagixMessage import space.kscience.magix.api.MagixMessage
import space.kscience.magix.api.start
import space.kscience.magix.server.magixModule import space.kscience.magix.server.magixModule
@ -215,5 +216,6 @@ public fun Application.deviceManagerModule(
plugins.forEach { plugins.forEach {
it.start(this, magixFlow) it.start(this, magixFlow)
} }
magixModule(magixFlow) magixModule(magixFlow)
} }

View File

@ -13,7 +13,9 @@ kscience {
commonMain { commonMain {
implementation(projects.controlsVisualisationCompose) implementation(projects.controlsVisualisationCompose)
implementation(projects.controlsConstructor) implementation(projects.controlsConstructor)
implementation(projects.magix.magixServer)
implementation(projects.magix.magixRsocket) implementation(projects.magix.magixRsocket)
implementation(projects.controlsMagix)
} }
jvmMain { jvmMain {
// implementation("io.ktor:ktor-server-cio") // implementation("io.ktor:ktor-server-cio")

View File

@ -6,24 +6,35 @@ import space.kscience.controls.api.Device
import space.kscience.controls.constructor.DeviceConstructor import space.kscience.controls.constructor.DeviceConstructor
import space.kscience.controls.constructor.MutableDeviceState import space.kscience.controls.constructor.MutableDeviceState
import space.kscience.controls.constructor.registerAsProperty import space.kscience.controls.constructor.registerAsProperty
import space.kscience.controls.peer.PeerConnection
import space.kscience.controls.spec.DeviceSpec import space.kscience.controls.spec.DeviceSpec
import space.kscience.controls.spec.unit
import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Context
import space.kscience.dataforge.meta.MetaConverter import space.kscience.dataforge.meta.MetaConverter
import space.kscience.dataforge.meta.Scheme import space.kscience.dataforge.meta.Scheme
import space.kscience.dataforge.meta.int
import space.kscience.dataforge.meta.string import space.kscience.dataforge.meta.string
import space.kscience.dataforge.misc.DFExperimental import space.kscience.dataforge.misc.DFExperimental
import space.kscience.maps.coordinates.Gmc import space.kscience.maps.coordinates.Gmc
import space.kscience.maps.coordinates.GmcCurve
import kotlin.time.Duration.Companion.milliseconds import kotlin.time.Duration.Companion.milliseconds
class CollectiveDeviceConfiguration(deviceId: DeviceId) : Scheme() { typealias CollectiveDeviceId = String
class CollectiveDeviceConfiguration(deviceId: CollectiveDeviceId) : Scheme() {
var deviceId by string(deviceId) var deviceId by string(deviceId)
var description by string() var description by string()
var reportInterval by int(500)
var radioFrequency by string(default = "169 MHz")
} }
typealias CollectiveDeviceRoster = Map<CollectiveDeviceId, CollectiveDeviceConfiguration>
interface CollectiveDevice : Device { interface CollectiveDevice : Device {
public val id: DeviceId public val id: CollectiveDeviceId
public val peerConnection: PeerConnection
suspend fun getPosition(): Gmc suspend fun getPosition(): Gmc
@ -31,8 +42,7 @@ interface CollectiveDevice : Device {
suspend fun setVelocity(value: GmcVelocity) suspend fun setVelocity(value: GmcVelocity)
suspend fun listVisible(): Collection<DeviceId> suspend fun listVisible(): Collection<CollectiveDeviceId>
companion object : DeviceSpec<CollectiveDevice>() { companion object : DeviceSpec<CollectiveDevice>() {
val position by property<Gmc>( val position by property<Gmc>(
@ -45,6 +55,10 @@ interface CollectiveDevice : Device {
read = { getVelocity() }, read = { getVelocity() },
write = { _, value -> setVelocity(value) } write = { _, value -> setVelocity(value) }
) )
val listVisible by action(MetaConverter.unit, MetaConverter.valueList<String> { it.string }) {
listVisible().toList()
}
} }
} }
@ -54,13 +68,14 @@ class CollectiveDeviceConstructor(
val configuration: CollectiveDeviceConfiguration, val configuration: CollectiveDeviceConfiguration,
position: MutableDeviceState<Gmc>, position: MutableDeviceState<Gmc>,
velocity: MutableDeviceState<GmcVelocity>, velocity: MutableDeviceState<GmcVelocity>,
private val listVisible: suspend () -> Collection<DeviceId>, override val peerConnection: PeerConnection,
private val observation: suspend () -> Map<CollectiveDeviceId, GmcCurve>,
) : DeviceConstructor(context, configuration.meta), CollectiveDevice { ) : DeviceConstructor(context, configuration.meta), CollectiveDevice {
override val id: DeviceId get() = configuration.deviceId override val id: CollectiveDeviceId get() = configuration.deviceId
val position = registerAsProperty(CollectiveDevice.position, position.sample(500.milliseconds)) val position = registerAsProperty(CollectiveDevice.position, position.sample(configuration.reportInterval.milliseconds))
val velocity = registerAsProperty(CollectiveDevice.velocity, velocity) val velocity = registerAsProperty(CollectiveDevice.velocity, velocity.sample(configuration.reportInterval.milliseconds))
override suspend fun getPosition(): Gmc = position.value override suspend fun getPosition(): Gmc = position.value
@ -70,5 +85,5 @@ class CollectiveDeviceConstructor(
velocity.value = value velocity.value = value
} }
override suspend fun listVisible(): Collection<DeviceId> = listVisible.invoke() override suspend fun listVisible(): Collection<CollectiveDeviceId> = observation.invoke().keys
} }

View File

@ -1,27 +1,41 @@
package space.kscience.controls.demo.collective package space.kscience.controls.demo.collective
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import space.kscience.controls.client.launchMagixService
import space.kscience.controls.constructor.ModelConstructor import space.kscience.controls.constructor.ModelConstructor
import space.kscience.controls.constructor.MutableDeviceState import space.kscience.controls.constructor.MutableDeviceState
import space.kscience.controls.constructor.onTimer import space.kscience.controls.constructor.onTimer
import space.kscience.controls.manager.DeviceManager
import space.kscience.controls.manager.install
import space.kscience.controls.peer.PeerConnection
import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.request
import space.kscience.dataforge.io.Envelope
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.parseAsName
import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.rsocket.rSocketWithWebSockets
import space.kscience.magix.server.startMagixServer
import space.kscience.maps.coordinates.* import space.kscience.maps.coordinates.*
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
typealias DeviceId = String internal data class CollectiveDeviceState(
val id: CollectiveDeviceId,
internal data class VirtualDeviceState(
val id: DeviceId,
val configuration: CollectiveDeviceConfiguration, val configuration: CollectiveDeviceConfiguration,
val position: MutableDeviceState<Gmc>, val position: MutableDeviceState<Gmc>,
val velocity: MutableDeviceState<GmcVelocity>, val velocity: MutableDeviceState<GmcVelocity>,
) )
internal fun VirtualDeviceState( internal fun VirtualDeviceState(
id: DeviceId, id: CollectiveDeviceId,
position: Gmc, position: Gmc,
configuration: CollectiveDeviceConfiguration.() -> Unit = {}, configuration: CollectiveDeviceConfiguration.() -> Unit = {},
) = VirtualDeviceState( ) = CollectiveDeviceState(
id, id,
CollectiveDeviceConfiguration(id).apply(configuration), CollectiveDeviceConfiguration(id).apply(configuration),
MutableDeviceState(position), MutableDeviceState(position),
@ -31,9 +45,11 @@ internal fun VirtualDeviceState(
internal class DeviceCollectiveModel( internal class DeviceCollectiveModel(
context: Context, context: Context,
val deviceStates: Collection<VirtualDeviceState>, val deviceStates: Collection<CollectiveDeviceState>,
val visibilityRange: Distance, val visibilityRange: Distance = 0.4.kilometers,
) : ModelConstructor(context) { val radioRange: Distance = 5.kilometers,
val reportInterval: Duration = 1000.milliseconds
) : ModelConstructor(context), PeerConnection {
/** /**
* Propagate movement * Propagate movement
@ -45,7 +61,7 @@ internal class DeviceCollectiveModel(
} }
} }
suspend fun locateVisible(id: DeviceId): Map<DeviceId, GmcCurve> { private fun locateVisible(id: CollectiveDeviceId): Map<CollectiveDeviceId, GmcCurve> {
val coordinatesSnapshot = deviceStates.associate { it.id to it.position.value } val coordinatesSnapshot = deviceStates.associate { it.id to it.position.value }
val selected = coordinatesSnapshot[id] ?: error("Can't find device with id $id") val selected = coordinatesSnapshot[id] ?: error("Can't find device with id $id")
@ -56,4 +72,52 @@ internal class DeviceCollectiveModel(
return allCurves.filterValues { it.distance in 0.kilometers..visibilityRange } return allCurves.filterValues { it.distance in 0.kilometers..visibilityRange }
} }
val devices = deviceStates.associate {
val device = CollectiveDeviceConstructor(
context = context,
configuration = it.configuration,
position = it.position,
velocity = it.velocity,
peerConnection = this,
) {
locateVisible(it.id)
}
//start movement program
device.moveInCircles()
it.id to device
}
val roster = deviceStates.associate { it.id to it.configuration }
override suspend fun receive(address: String, contentId: String, requestMeta: Meta): Envelope {
TODO("Not yet implemented")
}
override suspend fun send(address: String, envelope: Envelope, requestMeta: Meta) {
// devices.values.filter { it.configuration.radioFrequency == address }.forEach { device ->
// ```
// }
}
}
internal fun CoroutineScope.launchCollectiveMagixServer(
collectiveModel: DeviceCollectiveModel,
): Job = launch(Dispatchers.IO) {
val server = startMagixServer(
// RSocketMagixFlowPlugin()
)
collectiveModel.devices.forEach { (id, device) ->
val deviceContext = collectiveModel.context.buildContext(id.parseAsName()) {
coroutineContext(coroutineContext)
plugin(DeviceManager)
}
deviceContext.install(id, device)
val deviceEndpoint = MagixEndpoint.rSocketWithWebSockets("localhost")
deviceContext.request(DeviceManager).launchMagixService(deviceEndpoint, id)
}
} }

View File

@ -4,6 +4,7 @@ import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.sample import kotlinx.coroutines.flow.sample
import space.kscience.controls.constructor.DeviceState import space.kscience.controls.constructor.DeviceState
import space.kscience.controls.constructor.MutableDeviceState
import kotlin.time.Duration import kotlin.time.Duration
@OptIn(FlowPreview::class) @OptIn(FlowPreview::class)
@ -11,10 +12,24 @@ class SampleDeviceState<T>(
val origin: DeviceState<T>, val origin: DeviceState<T>,
val interval: Duration, val interval: Duration,
) : DeviceState<T> { ) : DeviceState<T> {
override val value: T get() = origin.value override val value: T by origin::value
override val valueFlow: Flow<T> get() = origin.valueFlow.sample(interval) override val valueFlow: Flow<T> get() = origin.valueFlow.sample(interval)
override fun toString(): String = "DebounceDeviceState($value, interval=$interval)" override fun toString(): String = "DebounceDeviceState($value, interval=$interval)"
} }
fun <T> DeviceState<T>.sample(interval: Duration) = SampleDeviceState(this, interval) fun <T> DeviceState<T>.sample(interval: Duration) = SampleDeviceState(this, interval)
@OptIn(FlowPreview::class)
class MutableSampleDeviceState<T>(
val origin: MutableDeviceState<T>,
val interval: Duration,
) : MutableDeviceState<T> {
override var value: T by origin::value
override val valueFlow: Flow<T> get() = origin.valueFlow.sample(interval)
override fun toString(): String = "DebounceDeviceState($value, interval=$interval)"
}
fun <T> MutableDeviceState<T>.sample(interval: Duration) = MutableSampleDeviceState(this, interval)

View File

@ -12,6 +12,8 @@ import space.kscience.maps.coordinates.Gmc
import space.kscience.maps.coordinates.kilometers import space.kscience.maps.coordinates.kilometers
import kotlin.math.PI import kotlin.math.PI
import kotlin.random.Random import kotlin.random.Random
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
private val deviceVelocity = 0.1.kilometers private val deviceVelocity = 0.1.kilometers
@ -19,8 +21,13 @@ private val center = Gmc.ofDegrees(55.925, 37.514)
private val radius = 0.01.degrees private val radius = 0.01.degrees
internal fun generateModel(context: Context): DeviceCollectiveModel { internal fun generateModel(
val devices: List<VirtualDeviceState> = List(100) { index -> context: Context,
size: Int = 50,
reportInterval: Duration = 500.milliseconds,
additionalConfiguration: CollectiveDeviceConfiguration.() -> Unit = {},
): DeviceCollectiveModel {
val devices: List<CollectiveDeviceState> = List(size) { index ->
val id = "device[$index]" val id = "device[$index]"
VirtualDeviceState( VirtualDeviceState(
@ -32,6 +39,8 @@ internal fun generateModel(context: Context): DeviceCollectiveModel {
) { ) {
deviceId = id deviceId = id
description = "Virtual remote device $id" description = "Virtual remote device $id"
this.reportInterval = reportInterval.inWholeMilliseconds.toInt()
additionalConfiguration()
} }
} }

View File

@ -24,74 +24,95 @@ import androidx.compose.ui.window.Window
import androidx.compose.ui.window.application import androidx.compose.ui.window.application
import io.ktor.client.HttpClient import io.ktor.client.HttpClient
import io.ktor.client.engine.cio.CIO import io.ktor.client.engine.cio.CIO
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.onEach
import org.jetbrains.compose.splitpane.ExperimentalSplitPaneApi import org.jetbrains.compose.splitpane.ExperimentalSplitPaneApi
import org.jetbrains.compose.splitpane.HorizontalSplitPane import org.jetbrains.compose.splitpane.HorizontalSplitPane
import org.jetbrains.compose.splitpane.rememberSplitPaneState import org.jetbrains.compose.splitpane.rememberSplitPaneState
import space.kscience.controls.api.PropertyChangedMessage
import space.kscience.controls.client.*
import space.kscience.controls.compose.conditional import space.kscience.controls.compose.conditional
import space.kscience.controls.manager.DeviceManager import space.kscience.controls.manager.DeviceManager
import space.kscience.controls.spec.useProperty
import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.request import space.kscience.dataforge.context.ContextBuilder
import space.kscience.dataforge.meta.MetaConverter
import space.kscience.dataforge.names.parseAsName
import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.api.subscribe
import space.kscience.magix.rsocket.rSocketWithWebSockets
import space.kscience.maps.compose.MapView import space.kscience.maps.compose.MapView
import space.kscience.maps.compose.OpenStreetMapTileProvider import space.kscience.maps.compose.OpenStreetMapTileProvider
import space.kscience.maps.coordinates.Gmc
import space.kscience.maps.coordinates.meters
import space.kscience.maps.features.* import space.kscience.maps.features.*
import java.nio.file.Path import java.nio.file.Path
import kotlin.time.Duration.Companion.seconds
@Composable @Composable
fun rememberDeviceManager(): DeviceManager = remember { fun rememberContext(name: String, contextBuilder: ContextBuilder.() -> Unit = {}): Context = remember {
val context = Context { Context(name, contextBuilder)
plugin(DeviceManager)
}
context.request(DeviceManager)
} }
@Composable @Composable
fun App() { fun App() {
val scope = rememberCoroutineScope() val scope = rememberCoroutineScope()
val parentContext = rememberContext("Parent") {
val deviceManager = rememberDeviceManager() plugin(DeviceManager)
}
val collectiveModel = remember { val collectiveModel = remember {
generateModel(deviceManager.context) generateModel(parentContext, 60)
} }
val devices: Map<DeviceId, CollectiveDevice> = remember { val roster = remember {
collectiveModel.deviceStates.associate { collectiveModel.roster
val device = CollectiveDeviceConstructor(
context = deviceManager.context,
configuration = it.configuration,
position = it.position,
velocity = it.velocity
) {
collectiveModel.locateVisible(it.id).keys
} }
device.moveInCircles()
it.id to device val client = remember { CompletableDeferred<MagixEndpoint>() }
val devices = remember { mutableStateMapOf<CollectiveDeviceId, DeviceClient>() }
LaunchedEffect(collectiveModel) {
launchCollectiveMagixServer(collectiveModel)
withContext(Dispatchers.IO) {
val magixClient = MagixEndpoint.rSocketWithWebSockets("localhost")
client.complete(magixClient)
collectiveModel.roster.forEach { (id, config) ->
devices[id] = magixClient.remoteDevice(parentContext, "listener", id, id.parseAsName())
}
}
}
var selectedDeviceId by remember { mutableStateOf<CollectiveDeviceId?>(null) }
var currentPosition by remember { mutableStateOf<Gmc?>(null) }
LaunchedEffect(selectedDeviceId, devices) {
selectedDeviceId?.let { devices[it] }?.propertyFlow(CollectiveDevice.position)?.collect {
currentPosition = it
} }
} }
var selectedDeviceId by remember { mutableStateOf<DeviceId?>(null) }
var showOnlyVisible by remember { mutableStateOf(false) } var showOnlyVisible by remember { mutableStateOf(false) }
val mapTileProvider = remember {
OpenStreetMapTileProvider(
client = HttpClient(CIO),
cacheDirectory = Path.of("mapCache")
)
}
HorizontalSplitPane( HorizontalSplitPane(
splitPaneState = rememberSplitPaneState(0.9f) splitPaneState = rememberSplitPaneState(0.9f)
) { ) {
first(400.dp) { first(400.dp) {
MapView( MapView(
mapTileProvider = mapTileProvider, mapTileProvider = remember {
OpenStreetMapTileProvider(
client = HttpClient(CIO),
cacheDirectory = Path.of("mapCache")
)
},
config = ViewConfig() config = ViewConfig()
) { ) {
collectiveModel.deviceStates.forEach { device -> collectiveModel.deviceStates.forEach { device ->
@ -103,12 +124,29 @@ fun App() {
}.launchIn(scope) }.launchIn(scope)
} }
devices.forEach { (id, device) -> scope.launch {
device.useProperty(CollectiveDevice.position, scope = scope) { position ->
client.await().subscribe(DeviceManager.magixFormat).onEach { (magixMessage, deviceMessage) ->
if (deviceMessage is PropertyChangedMessage && deviceMessage.property == "position") {
val id = magixMessage.sourceEndpoint
val position = MetaConverter.serializable<Gmc>().read(deviceMessage.value)
val activeDevice = selectedDeviceId?.let { devices[it] } val activeDevice = selectedDeviceId?.let { devices[it] }
if (activeDevice == null || id == selectedDeviceId || !showOnlyVisible || id in activeDevice.listVisible()) { suspend fun DeviceClient.idIsVisible() = try {
withTimeout(1.seconds) {
id in execute(CollectiveDevice.listVisible)
}
} catch (ex: Exception) {
ex.printStackTrace()
true
}
if (
activeDevice == null ||
id == selectedDeviceId ||
!showOnlyVisible ||
activeDevice.idIsVisible()
) {
rectangle( rectangle(
position, position,
id = id, id = id,
@ -119,42 +157,18 @@ fun App() {
} else { } else {
removeFeature(id) removeFeature(id)
} }
}
}.launchIn(scope)
} }
} }
} }
}
second(200.dp) { second(200.dp) {
Column {
selectedDeviceId?.let { id ->
Column(
modifier = Modifier
.padding(8.dp)
.border(2.dp, Color.DarkGray)
) {
Card(
elevation = 16.dp,
) {
Text(
text = "Выбран: $id",
fontSize = 16.sp,
fontWeight = FontWeight.Bold,
modifier = Modifier.padding(10.dp).fillMaxWidth()
)
}
devices[id]?.let {
Text(it.meta.toString(), Modifier.padding(10.dp))
}
Row(verticalAlignment = Alignment.CenterVertically, modifier = Modifier.padding(10.dp).fillMaxWidth()) {
Text("Показать только видимые")
Checkbox(showOnlyVisible, { showOnlyVisible = it })
}
}
}
Column( Column(
modifier = Modifier.verticalScroll(rememberScrollState()) modifier = Modifier.verticalScroll(rememberScrollState())
) { ) {
devices.forEach { (id, device) -> devices.forEach { (id, _) ->
Card( Card(
elevation = 16.dp, elevation = 16.dp,
modifier = Modifier.padding(8.dp).onClick { modifier = Modifier.padding(8.dp).onClick {
@ -162,6 +176,9 @@ fun App() {
}.conditional(id == selectedDeviceId) { }.conditional(id == selectedDeviceId) {
border(2.dp, Color.Blue) border(2.dp, Color.Blue)
} }
) {
Column(
modifier = Modifier.padding(8.dp)
) { ) {
Text( Text(
text = id, text = id,
@ -169,16 +186,49 @@ fun App() {
fontWeight = FontWeight.Bold, fontWeight = FontWeight.Bold,
modifier = Modifier.padding(10.dp).fillMaxWidth() modifier = Modifier.padding(10.dp).fillMaxWidth()
) )
if (id == selectedDeviceId) {
roster[id]?.let {
Text("Meta:", color = Color.Blue, fontWeight = FontWeight.Bold)
Card(elevation = 16.dp, modifier = Modifier.fillMaxWidth().padding(8.dp)) {
Text(it.toString())
}
}
currentPosition?.let { currentPosition ->
Text("Широта: ${String.format("%.3f", currentPosition.latitude.toDegrees().value)}")
Text(
"Долгота: ${
String.format(
"%.3f",
currentPosition.longitude.toDegrees().value
)
}"
)
currentPosition.elevation?.let {
Text("Высота: ${String.format("%.1f", it.meters)} м")
}
}
Row(
verticalAlignment = Alignment.CenterVertically,
modifier = Modifier.fillMaxWidth()
) {
Text("Показать только видимые")
Checkbox(showOnlyVisible, { showOnlyVisible = it })
} }
} }
} }
} }
} }
} }
}
}
} }
fun main() = application { fun main() = application {
// System.setProperty(IO_PARALLELISM_PROPERTY_NAME, 300.toString())
Window(onCloseRequest = ::exitApplication, title = "Maps-kt demo", icon = painterResource("SPC-logo.png")) { Window(onCloseRequest = ::exitApplication, title = "Maps-kt demo", icon = painterResource("SPC-logo.png")) {
MaterialTheme { MaterialTheme {
App() App()

View File

@ -83,8 +83,7 @@ suspend fun main() {
val endpointId = "device$it" val endpointId = "device$it"
val deviceEndpoint = MagixEndpoint.rSocketStreamWithWebSockets("localhost") val deviceEndpoint = MagixEndpoint.rSocketStreamWithWebSockets("localhost")
deviceManager.launchMagixService(deviceEndpoint, endpointId, Dispatchers.IO) deviceManager.launchMagixService(deviceEndpoint, endpointId)
} }
val trace = Bar { val trace = Bar {

View File

@ -21,9 +21,10 @@ public fun interface MagixFlowPlugin {
sendMessage: suspend (MagixMessage) -> Unit, sendMessage: suspend (MagixMessage) -> Unit,
): Job ): Job
}
/** /**
* Use the same [MutableSharedFlow] to send and receive messages. Could be a bottleneck in case of many plugins. * Use the same [MutableSharedFlow] to send and receive messages. Could be a bottleneck in case of many plugins.
*/ */
public fun start(scope: CoroutineScope, magixFlow: MutableSharedFlow<MagixMessage>): Job = public fun MagixFlowPlugin.start(scope: CoroutineScope, magixFlow: MutableSharedFlow<MagixMessage>): Job =
start(scope, magixFlow) { magixFlow.emit(it) } start(scope, magixFlow) { magixFlow.emit(it) }
}

View File

@ -10,7 +10,9 @@ import io.ktor.server.util.getValue
import io.ktor.server.websocket.WebSockets import io.ktor.server.websocket.WebSockets
import io.rsocket.kotlin.ktor.server.RSocketSupport import io.rsocket.kotlin.ktor.server.RSocketSupport
import io.rsocket.kotlin.ktor.server.rSocket import io.rsocket.kotlin.ktor.server.rSocket
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.map
import kotlinx.html.* import kotlinx.html.*
import kotlinx.serialization.encodeToString import kotlinx.serialization.encodeToString
@ -42,7 +44,11 @@ private fun ApplicationCall.buildFilter(): MagixMessageFilter {
/** /**
* Attach magix http/sse and websocket-based rsocket event loop + statistics page to existing [MutableSharedFlow] * Attach magix http/sse and websocket-based rsocket event loop + statistics page to existing [MutableSharedFlow]
*/ */
public fun Application.magixModule(magixFlow: MutableSharedFlow<MagixMessage>, route: String = "/") { public fun Application.magixModule(
magixFlow: Flow<MagixMessage>,
send: suspend (MagixMessage) -> Unit,
route: String = "/",
) {
if (pluginOrNull(WebSockets) == null) { if (pluginOrNull(WebSockets) == null) {
install(WebSockets) install(WebSockets)
} }
@ -65,6 +71,7 @@ public fun Application.magixModule(magixFlow: MutableSharedFlow<MagixMessage>, r
install(ContentNegotiation) { install(ContentNegotiation) {
json() json()
} }
if (magixFlow is SharedFlow) {
get("state") { get("state") {
call.respondHtml { call.respondHtml {
head { head {
@ -75,7 +82,9 @@ public fun Application.magixModule(magixFlow: MutableSharedFlow<MagixMessage>, r
} }
body { body {
h1 { +"Magix loop statistics" } h1 { +"Magix loop statistics" }
if (magixFlow is MutableSharedFlow) {
h2 { +"Number of subscribers: ${magixFlow.subscriptionCount.value}" } h2 { +"Number of subscribers: ${magixFlow.subscriptionCount.value}" }
}
h3 { +"Replay cache size: ${magixFlow.replayCache.size}" } h3 { +"Replay cache size: ${magixFlow.replayCache.size}" }
h3 { +"Replay cache:" } h3 { +"Replay cache:" }
ol { ol {
@ -90,6 +99,7 @@ public fun Application.magixModule(magixFlow: MutableSharedFlow<MagixMessage>, r
} }
} }
} }
}
//SSE server. Filter from query //SSE server. Filter from query
get("sse") { get("sse") {
val filter = call.buildFilter() val filter = call.buildFilter()
@ -102,17 +112,22 @@ public fun Application.magixModule(magixFlow: MutableSharedFlow<MagixMessage>, r
} }
post("broadcast") { post("broadcast") {
val message = call.receive<MagixMessage>() val message = call.receive<MagixMessage>()
magixFlow.emit(message) send(message)
} }
//rSocket WS server. Filter from Payload //rSocket WS server. Filter from Payload
rSocket( rSocket(
"rsocket", "rsocket",
acceptor = RSocketMagixFlowPlugin.acceptor(application, magixFlow) { magixFlow.emit(it) } acceptor = RSocketMagixFlowPlugin.acceptor(application, magixFlow) { send(it) }
) )
} }
} }
} }
public fun Application.magixModule(
magixFlow: MutableSharedFlow<MagixMessage>,
route: String = "/",
): Unit = magixModule(magixFlow, { magixFlow.emit(it) }, route)
/** /**
* Create a new loop [MutableSharedFlow] with given [buffer] and setup magix module based on it * Create a new loop [MutableSharedFlow] with given [buffer] and setup magix module based on it
*/ */

View File

@ -9,6 +9,7 @@ import kotlinx.coroutines.flow.MutableSharedFlow
import space.kscience.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_HTTP_PORT import space.kscience.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_HTTP_PORT
import space.kscience.magix.api.MagixFlowPlugin import space.kscience.magix.api.MagixFlowPlugin
import space.kscience.magix.api.MagixMessage import space.kscience.magix.api.MagixMessage
import space.kscience.magix.api.start
/** /**
@ -22,7 +23,6 @@ public fun CoroutineScope.startMagixServer(
val magixFlow = MutableSharedFlow<MagixMessage>( val magixFlow = MutableSharedFlow<MagixMessage>(
replay = buffer, replay = buffer,
extraBufferCapacity = buffer,
onBufferOverflow = BufferOverflow.DROP_OLDEST onBufferOverflow = BufferOverflow.DROP_OLDEST
) )