Compare commits

...

3 Commits

15 changed files with 371 additions and 164 deletions

View File

@ -74,7 +74,8 @@ public fun <T, R> DeviceState.Companion.map(
public fun <T, R> DeviceState<T>.map(mapper: (T) -> R): DeviceStateWithDependencies<R> = DeviceState.map(this, mapper)
public fun DeviceState<NumericalValue<out UnitsOfMeasurement>>.values(): DeviceState<Double> = object : DeviceState<Double> {
public fun DeviceState<NumericalValue<out UnitsOfMeasurement>>.values(): DeviceState<Double> =
object : DeviceState<Double> {
override val value: Double
get() = this@values.value.value
@ -82,7 +83,7 @@ public fun DeviceState<NumericalValue<out UnitsOfMeasurement>>.values(): DeviceS
get() = this@values.valueFlow.map { it.value }
override fun toString(): String = this@values.toString()
}
}
/**
* Combine two device states into one read-only [DeviceState]. Only the latest value of each state is used.

View File

@ -41,7 +41,8 @@ private object InstantConverter : MetaConverter<Instant> {
public val MetaConverter.Companion.instant: MetaConverter<Instant> get() = InstantConverter
private object DoubleRangeConverter : MetaConverter<ClosedFloatingPointRange<Double>> {
override fun readOrNull(source: Meta): ClosedFloatingPointRange<Double>? = source.value?.doubleArray?.let { (start, end)->
override fun readOrNull(source: Meta): ClosedFloatingPointRange<Double>? =
source.value?.doubleArray?.let { (start, end) ->
start..end
}
@ -51,3 +52,11 @@ private object DoubleRangeConverter : MetaConverter<ClosedFloatingPointRange<Dou
}
public val MetaConverter.Companion.doubleRange: MetaConverter<ClosedFloatingPointRange<Double>> get() = DoubleRangeConverter
private object StringListConverter : MetaConverter<List<String>> {
override fun convert(obj: List<String>): Meta = Meta(obj.map { it.asValue() }.asValue())
override fun readOrNull(source: Meta): List<String>? = source.stringList ?: source["@jsonArray"]?.stringList
}
public val MetaConverter.Companion.stringList: MetaConverter<List<String>> get() = StringListConverter

View File

@ -2,14 +2,13 @@ package space.kscience.controls.peer
import space.kscience.dataforge.io.Envelope
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
/**
* A manager that allows direct synchronous sending and receiving binary data
*/
public interface PeerConnection {
/**
* Receive an [Envelope] from a device with name [deviceName] on a given [address] with given [contentId].
* Receive an [Envelope] from a device on a given [address] with given [contentId].
*
* The address depends on the specifics of given [PeerConnection]. For example, it could be a TCP/IP port or
* magix endpoint name.
@ -20,13 +19,12 @@ public interface PeerConnection {
*/
public suspend fun receive(
address: String,
deviceName: Name,
contentId: String,
requestMeta: Meta = Meta.EMPTY,
): Envelope
/**
* Send an [envelope] to a device with name [deviceName] on a given [address]
* Send an [envelope] to a device on a given [address]
*
* The address depends on the specifics of given [PeerConnection]. For example, it could be a TCP/IP port or
* magix endpoint name.
@ -35,7 +33,6 @@ public interface PeerConnection {
*/
public suspend fun send(
address: String,
deviceName: Name,
envelope: Envelope,
requestMeta: Meta = Meta.EMPTY,
)

View File

@ -19,10 +19,13 @@ import space.kscience.dataforge.meta.Meta
public suspend fun <T> DeviceClient.read(propertySpec: DevicePropertySpec<*, T>): T =
propertySpec.converter.readOrNull(readProperty(propertySpec.name)) ?: error("Property read result is not valid")
public suspend fun <T> DeviceClient.request(propertySpec: DevicePropertySpec<*, T>): T =
propertySpec.converter.read(getOrReadProperty(propertySpec.name))
public fun <T> DeviceClient.getCached(propertySpec: DevicePropertySpec<*, T>): T? =
getProperty(propertySpec.name)?.let { propertySpec.converter.read(it) }
public suspend fun <T> DeviceClient.write(propertySpec: MutableDevicePropertySpec<*, T>, value: T) {
writeProperty(propertySpec.name, propertySpec.converter.convert(value))
}

View File

@ -38,6 +38,7 @@ import space.kscience.dataforge.names.get
import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.api.MagixFlowPlugin
import space.kscience.magix.api.MagixMessage
import space.kscience.magix.api.start
import space.kscience.magix.server.magixModule
@ -215,5 +216,6 @@ public fun Application.deviceManagerModule(
plugins.forEach {
it.start(this, magixFlow)
}
magixModule(magixFlow)
}

View File

@ -13,7 +13,9 @@ kscience {
commonMain {
implementation(projects.controlsVisualisationCompose)
implementation(projects.controlsConstructor)
implementation(projects.magix.magixServer)
implementation(projects.magix.magixRsocket)
implementation(projects.controlsMagix)
}
jvmMain {
// implementation("io.ktor:ktor-server-cio")

View File

@ -3,27 +3,36 @@
package space.kscience.controls.demo.collective
import space.kscience.controls.api.Device
import space.kscience.controls.constructor.DeviceConstructor
import space.kscience.controls.constructor.MutableDeviceState
import space.kscience.controls.constructor.registerAsProperty
import space.kscience.controls.constructor.*
import space.kscience.controls.misc.stringList
import space.kscience.controls.peer.PeerConnection
import space.kscience.controls.spec.DeviceSpec
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.meta.MetaConverter
import space.kscience.dataforge.meta.Scheme
import space.kscience.dataforge.meta.int
import space.kscience.dataforge.meta.string
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.maps.coordinates.Gmc
import space.kscience.maps.coordinates.GmcCurve
import kotlin.time.Duration.Companion.milliseconds
class CollectiveDeviceConfiguration(deviceId: DeviceId) : Scheme() {
typealias CollectiveDeviceId = String
class CollectiveDeviceConfiguration(deviceId: CollectiveDeviceId) : Scheme() {
var deviceId by string(deviceId)
var description by string()
var reportInterval by int(500)
var radioFrequency by string(default = "169 MHz")
}
typealias CollectiveDeviceRoster = Map<CollectiveDeviceId, CollectiveDeviceConfiguration>
interface CollectiveDevice : Device {
public val id: DeviceId
public val id: CollectiveDeviceId
public val peerConnection: PeerConnection
suspend fun getPosition(): Gmc
@ -31,8 +40,7 @@ interface CollectiveDevice : Device {
suspend fun setVelocity(value: GmcVelocity)
suspend fun listVisible(): Collection<DeviceId>
suspend fun listVisible(): Collection<CollectiveDeviceId>
companion object : DeviceSpec<CollectiveDevice>() {
val position by property<Gmc>(
@ -45,6 +53,17 @@ interface CollectiveDevice : Device {
read = { getVelocity() },
write = { _, value -> setVelocity(value) }
)
val visibleNeighbors by property(
MetaConverter.stringList,
read = {
listVisible().toList()
}
)
// val listVisible by action(MetaConverter.unit, MetaConverter.valueList<String> { it.string }) {
// listVisible().toList()
// }
}
}
@ -54,13 +73,34 @@ class CollectiveDeviceConstructor(
val configuration: CollectiveDeviceConfiguration,
position: MutableDeviceState<Gmc>,
velocity: MutableDeviceState<GmcVelocity>,
private val listVisible: suspend () -> Collection<DeviceId>,
override val peerConnection: PeerConnection,
private val observation: suspend () -> Map<CollectiveDeviceId, GmcCurve>,
) : DeviceConstructor(context, configuration.meta), CollectiveDevice {
override val id: DeviceId get() = configuration.deviceId
override val id: CollectiveDeviceId get() = configuration.deviceId
val position = registerAsProperty(CollectiveDevice.position, position.sample(500.milliseconds))
val velocity = registerAsProperty(CollectiveDevice.velocity, velocity)
val position = registerAsProperty(
CollectiveDevice.position,
position.sample(configuration.reportInterval.milliseconds)
)
val velocity = registerAsProperty(
CollectiveDevice.velocity,
velocity.sample(configuration.reportInterval.milliseconds)
)
private val _visibleNeighbors: MutableDeviceState<Collection<CollectiveDeviceId>> = stateOf(emptyList())
val visibleNeighbors = registerAsProperty(
CollectiveDevice.visibleNeighbors,
_visibleNeighbors.map { it.toList() }
)
init {
position.onNext {
_visibleNeighbors.value = observation.invoke().keys
}
}
override suspend fun getPosition(): Gmc = position.value
@ -70,5 +110,5 @@ class CollectiveDeviceConstructor(
velocity.value = value
}
override suspend fun listVisible(): Collection<DeviceId> = listVisible.invoke()
override suspend fun listVisible(): Collection<CollectiveDeviceId> = observation.invoke().keys
}

View File

@ -1,27 +1,39 @@
package space.kscience.controls.demo.collective
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import space.kscience.controls.client.launchMagixService
import space.kscience.controls.constructor.ModelConstructor
import space.kscience.controls.constructor.MutableDeviceState
import space.kscience.controls.constructor.onTimer
import space.kscience.controls.manager.DeviceManager
import space.kscience.controls.manager.install
import space.kscience.controls.peer.PeerConnection
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.request
import space.kscience.dataforge.io.Envelope
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.parseAsName
import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.rsocket.rSocketWithWebSockets
import space.kscience.magix.server.startMagixServer
import space.kscience.maps.coordinates.*
typealias DeviceId = String
internal data class VirtualDeviceState(
val id: DeviceId,
internal data class CollectiveDeviceState(
val id: CollectiveDeviceId,
val configuration: CollectiveDeviceConfiguration,
val position: MutableDeviceState<Gmc>,
val velocity: MutableDeviceState<GmcVelocity>,
)
internal fun VirtualDeviceState(
id: DeviceId,
id: CollectiveDeviceId,
position: Gmc,
configuration: CollectiveDeviceConfiguration.() -> Unit = {},
) = VirtualDeviceState(
) = CollectiveDeviceState(
id,
CollectiveDeviceConfiguration(id).apply(configuration),
MutableDeviceState(position),
@ -31,9 +43,10 @@ internal fun VirtualDeviceState(
internal class DeviceCollectiveModel(
context: Context,
val deviceStates: Collection<VirtualDeviceState>,
val visibilityRange: Distance,
) : ModelConstructor(context) {
val deviceStates: Collection<CollectiveDeviceState>,
val visibilityRange: Distance = 0.5.kilometers,
val radioRange: Distance = 5.kilometers,
) : ModelConstructor(context), PeerConnection {
/**
* Propagate movement
@ -45,7 +58,7 @@ internal class DeviceCollectiveModel(
}
}
suspend fun locateVisible(id: DeviceId): Map<DeviceId, GmcCurve> {
private fun locateVisible(id: CollectiveDeviceId): Map<CollectiveDeviceId, GmcCurve> {
val coordinatesSnapshot = deviceStates.associate { it.id to it.position.value }
val selected = coordinatesSnapshot[id] ?: error("Can't find device with id $id")
@ -56,4 +69,53 @@ internal class DeviceCollectiveModel(
return allCurves.filterValues { it.distance in 0.kilometers..visibilityRange }
}
val devices = deviceStates.associate {
val device = CollectiveDeviceConstructor(
context = context,
configuration = it.configuration,
position = it.position,
velocity = it.velocity,
peerConnection = this,
) {
locateVisible(it.id)
}
//start movement program
device.moveInCircles()
it.id to device
}
val roster = deviceStates.associate { it.id to it.configuration }
override suspend fun receive(address: String, contentId: String, requestMeta: Meta): Envelope {
TODO("Not yet implemented")
}
override suspend fun send(address: String, envelope: Envelope, requestMeta: Meta) {
// devices.values.filter { it.configuration.radioFrequency == address }.forEach { device ->
// ```
// }
}
}
internal fun CoroutineScope.launchCollectiveMagixServer(
collectiveModel: DeviceCollectiveModel,
): Job = launch(Dispatchers.IO) {
val server = startMagixServer(
// RSocketMagixFlowPlugin()
)
val deviceEndpoint = MagixEndpoint.rSocketWithWebSockets("localhost")
collectiveModel.devices.forEach { (id, device) ->
val deviceContext = collectiveModel.context.buildContext(id.parseAsName()) {
coroutineContext(coroutineContext)
plugin(DeviceManager)
}
deviceContext.install(id, device)
// val deviceEndpoint = MagixEndpoint.rSocketWithWebSockets("localhost")
deviceContext.request(DeviceManager).launchMagixService(deviceEndpoint, id)
}
}

View File

@ -4,6 +4,7 @@ import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.sample
import space.kscience.controls.constructor.DeviceState
import space.kscience.controls.constructor.MutableDeviceState
import kotlin.time.Duration
@OptIn(FlowPreview::class)
@ -11,10 +12,24 @@ class SampleDeviceState<T>(
val origin: DeviceState<T>,
val interval: Duration,
) : DeviceState<T> {
override val value: T get() = origin.value
override val value: T by origin::value
override val valueFlow: Flow<T> get() = origin.valueFlow.sample(interval)
override fun toString(): String = "DebounceDeviceState($value, interval=$interval)"
}
fun <T> DeviceState<T>.sample(interval: Duration) = SampleDeviceState(this, interval)
@OptIn(FlowPreview::class)
class MutableSampleDeviceState<T>(
val origin: MutableDeviceState<T>,
val interval: Duration,
) : MutableDeviceState<T> {
override var value: T by origin::value
override val valueFlow: Flow<T> get() = origin.valueFlow.sample(interval)
override fun toString(): String = "DebounceDeviceState($value, interval=$interval)"
}
fun <T> MutableDeviceState<T>.sample(interval: Duration) = MutableSampleDeviceState(this, interval)

View File

@ -12,6 +12,8 @@ import space.kscience.maps.coordinates.Gmc
import space.kscience.maps.coordinates.kilometers
import kotlin.math.PI
import kotlin.random.Random
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
private val deviceVelocity = 0.1.kilometers
@ -19,8 +21,13 @@ private val center = Gmc.ofDegrees(55.925, 37.514)
private val radius = 0.01.degrees
internal fun generateModel(context: Context): DeviceCollectiveModel {
val devices: List<VirtualDeviceState> = List(100) { index ->
internal fun generateModel(
context: Context,
size: Int = 50,
reportInterval: Duration = 500.milliseconds,
additionalConfiguration: CollectiveDeviceConfiguration.() -> Unit = {},
): DeviceCollectiveModel {
val devices: List<CollectiveDeviceState> = List(size) { index ->
val id = "device[$index]"
VirtualDeviceState(
@ -32,10 +39,12 @@ internal fun generateModel(context: Context): DeviceCollectiveModel {
) {
deviceId = id
description = "Virtual remote device $id"
this.reportInterval = reportInterval.inWholeMilliseconds.toInt()
additionalConfiguration()
}
}
val model = DeviceCollectiveModel(context, devices, 0.2.kilometers)
val model = DeviceCollectiveModel(context, devices)
return model
}

View File

@ -10,6 +10,7 @@ import androidx.compose.foundation.layout.padding
import androidx.compose.material.Card
import androidx.compose.material.Checkbox
import androidx.compose.material.Text
import androidx.compose.material3.CircularProgressIndicator
import androidx.compose.material3.MaterialTheme
import androidx.compose.runtime.*
import androidx.compose.ui.Alignment
@ -24,74 +25,103 @@ import androidx.compose.ui.window.Window
import androidx.compose.ui.window.application
import io.ktor.client.HttpClient
import io.ktor.client.engine.cio.CIO
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import org.jetbrains.compose.splitpane.ExperimentalSplitPaneApi
import org.jetbrains.compose.splitpane.HorizontalSplitPane
import org.jetbrains.compose.splitpane.rememberSplitPaneState
import space.kscience.controls.api.PropertyChangedMessage
import space.kscience.controls.client.*
import space.kscience.controls.compose.conditional
import space.kscience.controls.manager.DeviceManager
import space.kscience.controls.spec.useProperty
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.request
import space.kscience.dataforge.context.ContextBuilder
import space.kscience.dataforge.meta.MetaConverter
import space.kscience.dataforge.names.parseAsName
import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.api.subscribe
import space.kscience.magix.rsocket.rSocketWithWebSockets
import space.kscience.maps.compose.MapView
import space.kscience.maps.compose.OpenStreetMapTileProvider
import space.kscience.maps.coordinates.Gmc
import space.kscience.maps.coordinates.meters
import space.kscience.maps.features.*
import java.nio.file.Path
import kotlin.time.Duration.Companion.seconds
@Composable
fun rememberDeviceManager(): DeviceManager = remember {
val context = Context {
plugin(DeviceManager)
}
context.request(DeviceManager)
fun rememberContext(name: String, contextBuilder: ContextBuilder.() -> Unit = {}): Context = remember {
Context(name, contextBuilder)
}
private val gmcMetaConverter = MetaConverter.serializable<Gmc>()
@Composable
fun App() {
val scope = rememberCoroutineScope()
val deviceManager = rememberDeviceManager()
val parentContext = rememberContext("Parent") {
plugin(DeviceManager)
}
val collectiveModel = remember {
generateModel(deviceManager.context)
generateModel(parentContext, 100, reportInterval = 1.seconds)
}
val devices: Map<DeviceId, CollectiveDevice> = remember {
collectiveModel.deviceStates.associate {
val device = CollectiveDeviceConstructor(
context = deviceManager.context,
configuration = it.configuration,
position = it.position,
velocity = it.velocity
) {
collectiveModel.locateVisible(it.id).keys
val roster = remember {
collectiveModel.roster
}
val client = remember { CompletableDeferred<MagixEndpoint>() }
val devices = remember { mutableStateMapOf<CollectiveDeviceId, DeviceClient>() }
LaunchedEffect(collectiveModel) {
launchCollectiveMagixServer(collectiveModel)
withContext(Dispatchers.IO) {
val magixClient = MagixEndpoint.rSocketWithWebSockets("localhost")
client.complete(magixClient)
collectiveModel.roster.forEach { (id, config) ->
scope.launch {
devices[id] = magixClient.remoteDevice(parentContext, "listener", id, id.parseAsName())
}
device.moveInCircles()
it.id to device
}
}
var selectedDeviceId by remember { mutableStateOf<DeviceId?>(null) }
}
var selectedDeviceId by remember { mutableStateOf<CollectiveDeviceId?>(null) }
var currentPosition by remember { mutableStateOf<Gmc?>(null) }
LaunchedEffect(selectedDeviceId, devices) {
selectedDeviceId?.let { devices[it] }?.propertyFlow(CollectiveDevice.position)?.collect {
currentPosition = it
}
}
var showOnlyVisible by remember { mutableStateOf(false) }
val mapTileProvider = remember {
OpenStreetMapTileProvider(
client = HttpClient(CIO),
cacheDirectory = Path.of("mapCache")
)
}
HorizontalSplitPane(
splitPaneState = rememberSplitPaneState(0.9f)
) {
first(400.dp) {
MapView(
mapTileProvider = mapTileProvider,
mapTileProvider = remember {
OpenStreetMapTileProvider(
client = HttpClient(CIO),
cacheDirectory = Path.of("mapCache")
)
},
config = ViewConfig()
) {
collectiveModel.deviceStates.forEach { device ->
@ -103,12 +133,20 @@ fun App() {
}.launchIn(scope)
}
devices.forEach { (id, device) ->
device.useProperty(CollectiveDevice.position, scope = scope) { position ->
scope.launch {
client.await().subscribe(DeviceManager.magixFormat).onEach { (magixMessage, deviceMessage) ->
if (deviceMessage is PropertyChangedMessage && deviceMessage.property == "position") {
val id = magixMessage.sourceEndpoint
val position = gmcMetaConverter.read(deviceMessage.value)
val activeDevice = selectedDeviceId?.let { devices[it] }
if (activeDevice == null || id == selectedDeviceId || !showOnlyVisible || id in activeDevice.listVisible()) {
if (
activeDevice == null ||
id == selectedDeviceId ||
!showOnlyVisible ||
id in activeDevice.request(CollectiveDevice.visibleNeighbors)
) {
rectangle(
position,
id = id,
@ -119,66 +157,80 @@ fun App() {
} else {
removeFeature(id)
}
}
}.launchIn(scope)
}
}
}
}
second(200.dp) {
Column {
selectedDeviceId?.let { id ->
Column(
modifier = Modifier
.padding(8.dp)
.border(2.dp, Color.DarkGray)
) {
Card(
elevation = 16.dp,
) {
Text(
text = "Выбран: $id",
fontSize = 16.sp,
fontWeight = FontWeight.Bold,
modifier = Modifier.padding(10.dp).fillMaxWidth()
)
}
devices[id]?.let {
Text(it.meta.toString(), Modifier.padding(10.dp))
}
Row(verticalAlignment = Alignment.CenterVertically, modifier = Modifier.padding(10.dp).fillMaxWidth()) {
Text("Показать только видимые")
Checkbox(showOnlyVisible, { showOnlyVisible = it })
}
}
}
Column(
modifier = Modifier.verticalScroll(rememberScrollState())
) {
devices.forEach { (id, device) ->
collectiveModel.roster.forEach { (id, _) ->
Card(
elevation = 16.dp,
modifier = Modifier.padding(8.dp).onClick {
selectedDeviceId = id
}.conditional(id == selectedDeviceId) {
border(2.dp, Color.Blue)
}
},
) {
Column(
modifier = Modifier.padding(8.dp)
) {
Row(verticalAlignment = Alignment.CenterVertically) {
if (devices[id] == null) {
CircularProgressIndicator()
}
Text(
text = id,
fontSize = 16.sp,
fontWeight = FontWeight.Bold,
modifier = Modifier.padding(10.dp).fillMaxWidth()
modifier = Modifier.padding(10.dp).fillMaxWidth(),
)
}
if (id == selectedDeviceId) {
roster[id]?.let {
Text("Meta:", color = Color.Blue, fontWeight = FontWeight.Bold)
Card(elevation = 16.dp, modifier = Modifier.fillMaxWidth().padding(8.dp)) {
Text(it.toString())
}
}
currentPosition?.let { currentPosition ->
Text(
"Широта: ${String.format("%.3f", currentPosition.latitude.toDegrees().value)}"
)
Text(
"Долгота: ${String.format("%.3f", currentPosition.longitude.toDegrees().value)}"
)
currentPosition.elevation?.let {
Text("Высота: ${String.format("%.1f", it.meters)} м")
}
}
Row(
verticalAlignment = Alignment.CenterVertically,
modifier = Modifier.fillMaxWidth()
) {
Text("Показать только видимые")
Checkbox(showOnlyVisible, { showOnlyVisible = it })
}
}
}
}
}
}
}
}
}
fun main() = application {
// System.setProperty(IO_PARALLELISM_PROPERTY_NAME, 300.toString())
Window(onCloseRequest = ::exitApplication, title = "Maps-kt demo", icon = painterResource("SPC-logo.png")) {
MaterialTheme {
App()

View File

@ -83,8 +83,7 @@ suspend fun main() {
val endpointId = "device$it"
val deviceEndpoint = MagixEndpoint.rSocketStreamWithWebSockets("localhost")
deviceManager.launchMagixService(deviceEndpoint, endpointId, Dispatchers.IO)
deviceManager.launchMagixService(deviceEndpoint, endpointId)
}
val trace = Bar {

View File

@ -21,9 +21,10 @@ public fun interface MagixFlowPlugin {
sendMessage: suspend (MagixMessage) -> Unit,
): Job
/**
}
/**
* Use the same [MutableSharedFlow] to send and receive messages. Could be a bottleneck in case of many plugins.
*/
public fun start(scope: CoroutineScope, magixFlow: MutableSharedFlow<MagixMessage>): Job =
public fun MagixFlowPlugin.start(scope: CoroutineScope, magixFlow: MutableSharedFlow<MagixMessage>): Job =
start(scope, magixFlow) { magixFlow.emit(it) }
}

View File

@ -10,7 +10,9 @@ import io.ktor.server.util.getValue
import io.ktor.server.websocket.WebSockets
import io.rsocket.kotlin.ktor.server.RSocketSupport
import io.rsocket.kotlin.ktor.server.rSocket
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.map
import kotlinx.html.*
import kotlinx.serialization.encodeToString
@ -42,7 +44,11 @@ private fun ApplicationCall.buildFilter(): MagixMessageFilter {
/**
* Attach magix http/sse and websocket-based rsocket event loop + statistics page to existing [MutableSharedFlow]
*/
public fun Application.magixModule(magixFlow: MutableSharedFlow<MagixMessage>, route: String = "/") {
public fun Application.magixModule(
magixFlow: Flow<MagixMessage>,
send: suspend (MagixMessage) -> Unit,
route: String = "/",
) {
if (pluginOrNull(WebSockets) == null) {
install(WebSockets)
}
@ -62,9 +68,10 @@ public fun Application.magixModule(magixFlow: MutableSharedFlow<MagixMessage>, r
routing {
route(route) {
install(ContentNegotiation){
install(ContentNegotiation) {
json()
}
if (magixFlow is SharedFlow) {
get("state") {
call.respondHtml {
head {
@ -75,7 +82,9 @@ public fun Application.magixModule(magixFlow: MutableSharedFlow<MagixMessage>, r
}
body {
h1 { +"Magix loop statistics" }
if (magixFlow is MutableSharedFlow) {
h2 { +"Number of subscribers: ${magixFlow.subscriptionCount.value}" }
}
h3 { +"Replay cache size: ${magixFlow.replayCache.size}" }
h3 { +"Replay cache:" }
ol {
@ -90,6 +99,7 @@ public fun Application.magixModule(magixFlow: MutableSharedFlow<MagixMessage>, r
}
}
}
}
//SSE server. Filter from query
get("sse") {
val filter = call.buildFilter()
@ -102,17 +112,22 @@ public fun Application.magixModule(magixFlow: MutableSharedFlow<MagixMessage>, r
}
post("broadcast") {
val message = call.receive<MagixMessage>()
magixFlow.emit(message)
send(message)
}
//rSocket WS server. Filter from Payload
rSocket(
"rsocket",
acceptor = RSocketMagixFlowPlugin.acceptor(application, magixFlow) { magixFlow.emit(it) }
acceptor = RSocketMagixFlowPlugin.acceptor(application, magixFlow) { send(it) }
)
}
}
}
public fun Application.magixModule(
magixFlow: MutableSharedFlow<MagixMessage>,
route: String = "/",
): Unit = magixModule(magixFlow, { magixFlow.emit(it) }, route)
/**
* Create a new loop [MutableSharedFlow] with given [buffer] and setup magix module based on it
*/

View File

@ -9,6 +9,7 @@ import kotlinx.coroutines.flow.MutableSharedFlow
import space.kscience.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_HTTP_PORT
import space.kscience.magix.api.MagixFlowPlugin
import space.kscience.magix.api.MagixMessage
import space.kscience.magix.api.start
/**
@ -22,7 +23,6 @@ public fun CoroutineScope.startMagixServer(
val magixFlow = MutableSharedFlow<MagixMessage>(
replay = buffer,
extraBufferCapacity = buffer,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)