Compare commits
3 Commits
13b80be884
...
60a693b1b3
Author | SHA1 | Date | |
---|---|---|---|
60a693b1b3 | |||
c55ce2cf9a | |||
e9bde68674 |
@ -74,15 +74,16 @@ public fun <T, R> DeviceState.Companion.map(
|
|||||||
|
|
||||||
public fun <T, R> DeviceState<T>.map(mapper: (T) -> R): DeviceStateWithDependencies<R> = DeviceState.map(this, mapper)
|
public fun <T, R> DeviceState<T>.map(mapper: (T) -> R): DeviceStateWithDependencies<R> = DeviceState.map(this, mapper)
|
||||||
|
|
||||||
public fun DeviceState<NumericalValue<out UnitsOfMeasurement>>.values(): DeviceState<Double> = object : DeviceState<Double> {
|
public fun DeviceState<NumericalValue<out UnitsOfMeasurement>>.values(): DeviceState<Double> =
|
||||||
override val value: Double
|
object : DeviceState<Double> {
|
||||||
get() = this@values.value.value
|
override val value: Double
|
||||||
|
get() = this@values.value.value
|
||||||
|
|
||||||
override val valueFlow: Flow<Double>
|
override val valueFlow: Flow<Double>
|
||||||
get() = this@values.valueFlow.map { it.value }
|
get() = this@values.valueFlow.map { it.value }
|
||||||
|
|
||||||
override fun toString(): String = this@values.toString()
|
override fun toString(): String = this@values.toString()
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Combine two device states into one read-only [DeviceState]. Only the latest value of each state is used.
|
* Combine two device states into one read-only [DeviceState]. Only the latest value of each state is used.
|
||||||
|
@ -41,13 +41,22 @@ private object InstantConverter : MetaConverter<Instant> {
|
|||||||
public val MetaConverter.Companion.instant: MetaConverter<Instant> get() = InstantConverter
|
public val MetaConverter.Companion.instant: MetaConverter<Instant> get() = InstantConverter
|
||||||
|
|
||||||
private object DoubleRangeConverter : MetaConverter<ClosedFloatingPointRange<Double>> {
|
private object DoubleRangeConverter : MetaConverter<ClosedFloatingPointRange<Double>> {
|
||||||
override fun readOrNull(source: Meta): ClosedFloatingPointRange<Double>? = source.value?.doubleArray?.let { (start, end)->
|
override fun readOrNull(source: Meta): ClosedFloatingPointRange<Double>? =
|
||||||
start..end
|
source.value?.doubleArray?.let { (start, end) ->
|
||||||
}
|
start..end
|
||||||
|
}
|
||||||
|
|
||||||
override fun convert(
|
override fun convert(
|
||||||
obj: ClosedFloatingPointRange<Double>,
|
obj: ClosedFloatingPointRange<Double>,
|
||||||
): Meta = Meta(doubleArrayOf(obj.start, obj.endInclusive).asValue())
|
): Meta = Meta(doubleArrayOf(obj.start, obj.endInclusive).asValue())
|
||||||
}
|
}
|
||||||
|
|
||||||
public val MetaConverter.Companion.doubleRange: MetaConverter<ClosedFloatingPointRange<Double>> get() = DoubleRangeConverter
|
public val MetaConverter.Companion.doubleRange: MetaConverter<ClosedFloatingPointRange<Double>> get() = DoubleRangeConverter
|
||||||
|
|
||||||
|
private object StringListConverter : MetaConverter<List<String>> {
|
||||||
|
override fun convert(obj: List<String>): Meta = Meta(obj.map { it.asValue() }.asValue())
|
||||||
|
|
||||||
|
override fun readOrNull(source: Meta): List<String>? = source.stringList ?: source["@jsonArray"]?.stringList
|
||||||
|
}
|
||||||
|
|
||||||
|
public val MetaConverter.Companion.stringList: MetaConverter<List<String>> get() = StringListConverter
|
||||||
|
@ -2,14 +2,13 @@ package space.kscience.controls.peer
|
|||||||
|
|
||||||
import space.kscience.dataforge.io.Envelope
|
import space.kscience.dataforge.io.Envelope
|
||||||
import space.kscience.dataforge.meta.Meta
|
import space.kscience.dataforge.meta.Meta
|
||||||
import space.kscience.dataforge.names.Name
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A manager that allows direct synchronous sending and receiving binary data
|
* A manager that allows direct synchronous sending and receiving binary data
|
||||||
*/
|
*/
|
||||||
public interface PeerConnection {
|
public interface PeerConnection {
|
||||||
/**
|
/**
|
||||||
* Receive an [Envelope] from a device with name [deviceName] on a given [address] with given [contentId].
|
* Receive an [Envelope] from a device on a given [address] with given [contentId].
|
||||||
*
|
*
|
||||||
* The address depends on the specifics of given [PeerConnection]. For example, it could be a TCP/IP port or
|
* The address depends on the specifics of given [PeerConnection]. For example, it could be a TCP/IP port or
|
||||||
* magix endpoint name.
|
* magix endpoint name.
|
||||||
@ -20,13 +19,12 @@ public interface PeerConnection {
|
|||||||
*/
|
*/
|
||||||
public suspend fun receive(
|
public suspend fun receive(
|
||||||
address: String,
|
address: String,
|
||||||
deviceName: Name,
|
|
||||||
contentId: String,
|
contentId: String,
|
||||||
requestMeta: Meta = Meta.EMPTY,
|
requestMeta: Meta = Meta.EMPTY,
|
||||||
): Envelope
|
): Envelope
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Send an [envelope] to a device with name [deviceName] on a given [address]
|
* Send an [envelope] to a device on a given [address]
|
||||||
*
|
*
|
||||||
* The address depends on the specifics of given [PeerConnection]. For example, it could be a TCP/IP port or
|
* The address depends on the specifics of given [PeerConnection]. For example, it could be a TCP/IP port or
|
||||||
* magix endpoint name.
|
* magix endpoint name.
|
||||||
@ -35,7 +33,6 @@ public interface PeerConnection {
|
|||||||
*/
|
*/
|
||||||
public suspend fun send(
|
public suspend fun send(
|
||||||
address: String,
|
address: String,
|
||||||
deviceName: Name,
|
|
||||||
envelope: Envelope,
|
envelope: Envelope,
|
||||||
requestMeta: Meta = Meta.EMPTY,
|
requestMeta: Meta = Meta.EMPTY,
|
||||||
)
|
)
|
||||||
|
@ -19,10 +19,13 @@ import space.kscience.dataforge.meta.Meta
|
|||||||
public suspend fun <T> DeviceClient.read(propertySpec: DevicePropertySpec<*, T>): T =
|
public suspend fun <T> DeviceClient.read(propertySpec: DevicePropertySpec<*, T>): T =
|
||||||
propertySpec.converter.readOrNull(readProperty(propertySpec.name)) ?: error("Property read result is not valid")
|
propertySpec.converter.readOrNull(readProperty(propertySpec.name)) ?: error("Property read result is not valid")
|
||||||
|
|
||||||
|
|
||||||
public suspend fun <T> DeviceClient.request(propertySpec: DevicePropertySpec<*, T>): T =
|
public suspend fun <T> DeviceClient.request(propertySpec: DevicePropertySpec<*, T>): T =
|
||||||
propertySpec.converter.read(getOrReadProperty(propertySpec.name))
|
propertySpec.converter.read(getOrReadProperty(propertySpec.name))
|
||||||
|
|
||||||
|
public fun <T> DeviceClient.getCached(propertySpec: DevicePropertySpec<*, T>): T? =
|
||||||
|
getProperty(propertySpec.name)?.let { propertySpec.converter.read(it) }
|
||||||
|
|
||||||
|
|
||||||
public suspend fun <T> DeviceClient.write(propertySpec: MutableDevicePropertySpec<*, T>, value: T) {
|
public suspend fun <T> DeviceClient.write(propertySpec: MutableDevicePropertySpec<*, T>, value: T) {
|
||||||
writeProperty(propertySpec.name, propertySpec.converter.convert(value))
|
writeProperty(propertySpec.name, propertySpec.converter.convert(value))
|
||||||
}
|
}
|
||||||
|
@ -38,6 +38,7 @@ import space.kscience.dataforge.names.get
|
|||||||
import space.kscience.magix.api.MagixEndpoint
|
import space.kscience.magix.api.MagixEndpoint
|
||||||
import space.kscience.magix.api.MagixFlowPlugin
|
import space.kscience.magix.api.MagixFlowPlugin
|
||||||
import space.kscience.magix.api.MagixMessage
|
import space.kscience.magix.api.MagixMessage
|
||||||
|
import space.kscience.magix.api.start
|
||||||
import space.kscience.magix.server.magixModule
|
import space.kscience.magix.server.magixModule
|
||||||
|
|
||||||
|
|
||||||
@ -215,5 +216,6 @@ public fun Application.deviceManagerModule(
|
|||||||
plugins.forEach {
|
plugins.forEach {
|
||||||
it.start(this, magixFlow)
|
it.start(this, magixFlow)
|
||||||
}
|
}
|
||||||
|
|
||||||
magixModule(magixFlow)
|
magixModule(magixFlow)
|
||||||
}
|
}
|
@ -13,7 +13,9 @@ kscience {
|
|||||||
commonMain {
|
commonMain {
|
||||||
implementation(projects.controlsVisualisationCompose)
|
implementation(projects.controlsVisualisationCompose)
|
||||||
implementation(projects.controlsConstructor)
|
implementation(projects.controlsConstructor)
|
||||||
|
implementation(projects.magix.magixServer)
|
||||||
implementation(projects.magix.magixRsocket)
|
implementation(projects.magix.magixRsocket)
|
||||||
|
implementation(projects.controlsMagix)
|
||||||
}
|
}
|
||||||
jvmMain {
|
jvmMain {
|
||||||
// implementation("io.ktor:ktor-server-cio")
|
// implementation("io.ktor:ktor-server-cio")
|
||||||
|
@ -3,27 +3,36 @@
|
|||||||
package space.kscience.controls.demo.collective
|
package space.kscience.controls.demo.collective
|
||||||
|
|
||||||
import space.kscience.controls.api.Device
|
import space.kscience.controls.api.Device
|
||||||
import space.kscience.controls.constructor.DeviceConstructor
|
import space.kscience.controls.constructor.*
|
||||||
import space.kscience.controls.constructor.MutableDeviceState
|
import space.kscience.controls.misc.stringList
|
||||||
import space.kscience.controls.constructor.registerAsProperty
|
import space.kscience.controls.peer.PeerConnection
|
||||||
import space.kscience.controls.spec.DeviceSpec
|
import space.kscience.controls.spec.DeviceSpec
|
||||||
import space.kscience.dataforge.context.Context
|
import space.kscience.dataforge.context.Context
|
||||||
import space.kscience.dataforge.meta.MetaConverter
|
import space.kscience.dataforge.meta.MetaConverter
|
||||||
import space.kscience.dataforge.meta.Scheme
|
import space.kscience.dataforge.meta.Scheme
|
||||||
|
import space.kscience.dataforge.meta.int
|
||||||
import space.kscience.dataforge.meta.string
|
import space.kscience.dataforge.meta.string
|
||||||
import space.kscience.dataforge.misc.DFExperimental
|
import space.kscience.dataforge.misc.DFExperimental
|
||||||
import space.kscience.maps.coordinates.Gmc
|
import space.kscience.maps.coordinates.Gmc
|
||||||
|
import space.kscience.maps.coordinates.GmcCurve
|
||||||
import kotlin.time.Duration.Companion.milliseconds
|
import kotlin.time.Duration.Companion.milliseconds
|
||||||
|
|
||||||
class CollectiveDeviceConfiguration(deviceId: DeviceId) : Scheme() {
|
typealias CollectiveDeviceId = String
|
||||||
|
|
||||||
|
class CollectiveDeviceConfiguration(deviceId: CollectiveDeviceId) : Scheme() {
|
||||||
var deviceId by string(deviceId)
|
var deviceId by string(deviceId)
|
||||||
var description by string()
|
var description by string()
|
||||||
|
var reportInterval by int(500)
|
||||||
|
var radioFrequency by string(default = "169 MHz")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
typealias CollectiveDeviceRoster = Map<CollectiveDeviceId, CollectiveDeviceConfiguration>
|
||||||
|
|
||||||
interface CollectiveDevice : Device {
|
interface CollectiveDevice : Device {
|
||||||
|
|
||||||
public val id: DeviceId
|
public val id: CollectiveDeviceId
|
||||||
|
|
||||||
|
public val peerConnection: PeerConnection
|
||||||
|
|
||||||
suspend fun getPosition(): Gmc
|
suspend fun getPosition(): Gmc
|
||||||
|
|
||||||
@ -31,8 +40,7 @@ interface CollectiveDevice : Device {
|
|||||||
|
|
||||||
suspend fun setVelocity(value: GmcVelocity)
|
suspend fun setVelocity(value: GmcVelocity)
|
||||||
|
|
||||||
suspend fun listVisible(): Collection<DeviceId>
|
suspend fun listVisible(): Collection<CollectiveDeviceId>
|
||||||
|
|
||||||
|
|
||||||
companion object : DeviceSpec<CollectiveDevice>() {
|
companion object : DeviceSpec<CollectiveDevice>() {
|
||||||
val position by property<Gmc>(
|
val position by property<Gmc>(
|
||||||
@ -45,6 +53,17 @@ interface CollectiveDevice : Device {
|
|||||||
read = { getVelocity() },
|
read = { getVelocity() },
|
||||||
write = { _, value -> setVelocity(value) }
|
write = { _, value -> setVelocity(value) }
|
||||||
)
|
)
|
||||||
|
|
||||||
|
val visibleNeighbors by property(
|
||||||
|
MetaConverter.stringList,
|
||||||
|
read = {
|
||||||
|
listVisible().toList()
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
// val listVisible by action(MetaConverter.unit, MetaConverter.valueList<String> { it.string }) {
|
||||||
|
// listVisible().toList()
|
||||||
|
// }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -54,13 +73,34 @@ class CollectiveDeviceConstructor(
|
|||||||
val configuration: CollectiveDeviceConfiguration,
|
val configuration: CollectiveDeviceConfiguration,
|
||||||
position: MutableDeviceState<Gmc>,
|
position: MutableDeviceState<Gmc>,
|
||||||
velocity: MutableDeviceState<GmcVelocity>,
|
velocity: MutableDeviceState<GmcVelocity>,
|
||||||
private val listVisible: suspend () -> Collection<DeviceId>,
|
override val peerConnection: PeerConnection,
|
||||||
|
private val observation: suspend () -> Map<CollectiveDeviceId, GmcCurve>,
|
||||||
) : DeviceConstructor(context, configuration.meta), CollectiveDevice {
|
) : DeviceConstructor(context, configuration.meta), CollectiveDevice {
|
||||||
|
|
||||||
override val id: DeviceId get() = configuration.deviceId
|
override val id: CollectiveDeviceId get() = configuration.deviceId
|
||||||
|
|
||||||
val position = registerAsProperty(CollectiveDevice.position, position.sample(500.milliseconds))
|
val position = registerAsProperty(
|
||||||
val velocity = registerAsProperty(CollectiveDevice.velocity, velocity)
|
CollectiveDevice.position,
|
||||||
|
position.sample(configuration.reportInterval.milliseconds)
|
||||||
|
)
|
||||||
|
|
||||||
|
val velocity = registerAsProperty(
|
||||||
|
CollectiveDevice.velocity,
|
||||||
|
velocity.sample(configuration.reportInterval.milliseconds)
|
||||||
|
)
|
||||||
|
|
||||||
|
private val _visibleNeighbors: MutableDeviceState<Collection<CollectiveDeviceId>> = stateOf(emptyList())
|
||||||
|
|
||||||
|
val visibleNeighbors = registerAsProperty(
|
||||||
|
CollectiveDevice.visibleNeighbors,
|
||||||
|
_visibleNeighbors.map { it.toList() }
|
||||||
|
)
|
||||||
|
|
||||||
|
init {
|
||||||
|
position.onNext {
|
||||||
|
_visibleNeighbors.value = observation.invoke().keys
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
override suspend fun getPosition(): Gmc = position.value
|
override suspend fun getPosition(): Gmc = position.value
|
||||||
|
|
||||||
@ -70,5 +110,5 @@ class CollectiveDeviceConstructor(
|
|||||||
velocity.value = value
|
velocity.value = value
|
||||||
}
|
}
|
||||||
|
|
||||||
override suspend fun listVisible(): Collection<DeviceId> = listVisible.invoke()
|
override suspend fun listVisible(): Collection<CollectiveDeviceId> = observation.invoke().keys
|
||||||
}
|
}
|
@ -1,27 +1,39 @@
|
|||||||
package space.kscience.controls.demo.collective
|
package space.kscience.controls.demo.collective
|
||||||
|
|
||||||
|
import kotlinx.coroutines.CoroutineScope
|
||||||
|
import kotlinx.coroutines.Dispatchers
|
||||||
|
import kotlinx.coroutines.Job
|
||||||
|
import kotlinx.coroutines.launch
|
||||||
|
import space.kscience.controls.client.launchMagixService
|
||||||
import space.kscience.controls.constructor.ModelConstructor
|
import space.kscience.controls.constructor.ModelConstructor
|
||||||
import space.kscience.controls.constructor.MutableDeviceState
|
import space.kscience.controls.constructor.MutableDeviceState
|
||||||
import space.kscience.controls.constructor.onTimer
|
import space.kscience.controls.constructor.onTimer
|
||||||
|
import space.kscience.controls.manager.DeviceManager
|
||||||
|
import space.kscience.controls.manager.install
|
||||||
|
import space.kscience.controls.peer.PeerConnection
|
||||||
import space.kscience.dataforge.context.Context
|
import space.kscience.dataforge.context.Context
|
||||||
|
import space.kscience.dataforge.context.request
|
||||||
|
import space.kscience.dataforge.io.Envelope
|
||||||
|
import space.kscience.dataforge.meta.Meta
|
||||||
|
import space.kscience.dataforge.names.parseAsName
|
||||||
|
import space.kscience.magix.api.MagixEndpoint
|
||||||
|
import space.kscience.magix.rsocket.rSocketWithWebSockets
|
||||||
|
import space.kscience.magix.server.startMagixServer
|
||||||
import space.kscience.maps.coordinates.*
|
import space.kscience.maps.coordinates.*
|
||||||
|
|
||||||
|
|
||||||
typealias DeviceId = String
|
internal data class CollectiveDeviceState(
|
||||||
|
val id: CollectiveDeviceId,
|
||||||
|
|
||||||
internal data class VirtualDeviceState(
|
|
||||||
val id: DeviceId,
|
|
||||||
val configuration: CollectiveDeviceConfiguration,
|
val configuration: CollectiveDeviceConfiguration,
|
||||||
val position: MutableDeviceState<Gmc>,
|
val position: MutableDeviceState<Gmc>,
|
||||||
val velocity: MutableDeviceState<GmcVelocity>,
|
val velocity: MutableDeviceState<GmcVelocity>,
|
||||||
)
|
)
|
||||||
|
|
||||||
internal fun VirtualDeviceState(
|
internal fun VirtualDeviceState(
|
||||||
id: DeviceId,
|
id: CollectiveDeviceId,
|
||||||
position: Gmc,
|
position: Gmc,
|
||||||
configuration: CollectiveDeviceConfiguration.() -> Unit = {},
|
configuration: CollectiveDeviceConfiguration.() -> Unit = {},
|
||||||
) = VirtualDeviceState(
|
) = CollectiveDeviceState(
|
||||||
id,
|
id,
|
||||||
CollectiveDeviceConfiguration(id).apply(configuration),
|
CollectiveDeviceConfiguration(id).apply(configuration),
|
||||||
MutableDeviceState(position),
|
MutableDeviceState(position),
|
||||||
@ -31,9 +43,10 @@ internal fun VirtualDeviceState(
|
|||||||
|
|
||||||
internal class DeviceCollectiveModel(
|
internal class DeviceCollectiveModel(
|
||||||
context: Context,
|
context: Context,
|
||||||
val deviceStates: Collection<VirtualDeviceState>,
|
val deviceStates: Collection<CollectiveDeviceState>,
|
||||||
val visibilityRange: Distance,
|
val visibilityRange: Distance = 0.5.kilometers,
|
||||||
) : ModelConstructor(context) {
|
val radioRange: Distance = 5.kilometers,
|
||||||
|
) : ModelConstructor(context), PeerConnection {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Propagate movement
|
* Propagate movement
|
||||||
@ -45,7 +58,7 @@ internal class DeviceCollectiveModel(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
suspend fun locateVisible(id: DeviceId): Map<DeviceId, GmcCurve> {
|
private fun locateVisible(id: CollectiveDeviceId): Map<CollectiveDeviceId, GmcCurve> {
|
||||||
val coordinatesSnapshot = deviceStates.associate { it.id to it.position.value }
|
val coordinatesSnapshot = deviceStates.associate { it.id to it.position.value }
|
||||||
|
|
||||||
val selected = coordinatesSnapshot[id] ?: error("Can't find device with id $id")
|
val selected = coordinatesSnapshot[id] ?: error("Can't find device with id $id")
|
||||||
@ -56,4 +69,53 @@ internal class DeviceCollectiveModel(
|
|||||||
|
|
||||||
return allCurves.filterValues { it.distance in 0.kilometers..visibilityRange }
|
return allCurves.filterValues { it.distance in 0.kilometers..visibilityRange }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
val devices = deviceStates.associate {
|
||||||
|
val device = CollectiveDeviceConstructor(
|
||||||
|
context = context,
|
||||||
|
configuration = it.configuration,
|
||||||
|
position = it.position,
|
||||||
|
velocity = it.velocity,
|
||||||
|
peerConnection = this,
|
||||||
|
) {
|
||||||
|
locateVisible(it.id)
|
||||||
|
}
|
||||||
|
//start movement program
|
||||||
|
device.moveInCircles()
|
||||||
|
it.id to device
|
||||||
|
}
|
||||||
|
|
||||||
|
val roster = deviceStates.associate { it.id to it.configuration }
|
||||||
|
|
||||||
|
override suspend fun receive(address: String, contentId: String, requestMeta: Meta): Envelope {
|
||||||
|
TODO("Not yet implemented")
|
||||||
|
}
|
||||||
|
|
||||||
|
override suspend fun send(address: String, envelope: Envelope, requestMeta: Meta) {
|
||||||
|
// devices.values.filter { it.configuration.radioFrequency == address }.forEach { device ->
|
||||||
|
// ```
|
||||||
|
// }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
internal fun CoroutineScope.launchCollectiveMagixServer(
|
||||||
|
collectiveModel: DeviceCollectiveModel,
|
||||||
|
): Job = launch(Dispatchers.IO) {
|
||||||
|
val server = startMagixServer(
|
||||||
|
// RSocketMagixFlowPlugin()
|
||||||
|
)
|
||||||
|
val deviceEndpoint = MagixEndpoint.rSocketWithWebSockets("localhost")
|
||||||
|
|
||||||
|
collectiveModel.devices.forEach { (id, device) ->
|
||||||
|
val deviceContext = collectiveModel.context.buildContext(id.parseAsName()) {
|
||||||
|
coroutineContext(coroutineContext)
|
||||||
|
plugin(DeviceManager)
|
||||||
|
}
|
||||||
|
|
||||||
|
deviceContext.install(id, device)
|
||||||
|
|
||||||
|
// val deviceEndpoint = MagixEndpoint.rSocketWithWebSockets("localhost")
|
||||||
|
|
||||||
|
deviceContext.request(DeviceManager).launchMagixService(deviceEndpoint, id)
|
||||||
|
}
|
||||||
}
|
}
|
@ -4,6 +4,7 @@ import kotlinx.coroutines.FlowPreview
|
|||||||
import kotlinx.coroutines.flow.Flow
|
import kotlinx.coroutines.flow.Flow
|
||||||
import kotlinx.coroutines.flow.sample
|
import kotlinx.coroutines.flow.sample
|
||||||
import space.kscience.controls.constructor.DeviceState
|
import space.kscience.controls.constructor.DeviceState
|
||||||
|
import space.kscience.controls.constructor.MutableDeviceState
|
||||||
import kotlin.time.Duration
|
import kotlin.time.Duration
|
||||||
|
|
||||||
@OptIn(FlowPreview::class)
|
@OptIn(FlowPreview::class)
|
||||||
@ -11,10 +12,24 @@ class SampleDeviceState<T>(
|
|||||||
val origin: DeviceState<T>,
|
val origin: DeviceState<T>,
|
||||||
val interval: Duration,
|
val interval: Duration,
|
||||||
) : DeviceState<T> {
|
) : DeviceState<T> {
|
||||||
override val value: T get() = origin.value
|
override val value: T by origin::value
|
||||||
override val valueFlow: Flow<T> get() = origin.valueFlow.sample(interval)
|
override val valueFlow: Flow<T> get() = origin.valueFlow.sample(interval)
|
||||||
|
|
||||||
override fun toString(): String = "DebounceDeviceState($value, interval=$interval)"
|
override fun toString(): String = "DebounceDeviceState($value, interval=$interval)"
|
||||||
}
|
}
|
||||||
|
|
||||||
fun <T> DeviceState<T>.sample(interval: Duration) = SampleDeviceState(this, interval)
|
|
||||||
|
fun <T> DeviceState<T>.sample(interval: Duration) = SampleDeviceState(this, interval)
|
||||||
|
|
||||||
|
@OptIn(FlowPreview::class)
|
||||||
|
class MutableSampleDeviceState<T>(
|
||||||
|
val origin: MutableDeviceState<T>,
|
||||||
|
val interval: Duration,
|
||||||
|
) : MutableDeviceState<T> {
|
||||||
|
override var value: T by origin::value
|
||||||
|
override val valueFlow: Flow<T> get() = origin.valueFlow.sample(interval)
|
||||||
|
|
||||||
|
override fun toString(): String = "DebounceDeviceState($value, interval=$interval)"
|
||||||
|
}
|
||||||
|
|
||||||
|
fun <T> MutableDeviceState<T>.sample(interval: Duration) = MutableSampleDeviceState(this, interval)
|
@ -12,6 +12,8 @@ import space.kscience.maps.coordinates.Gmc
|
|||||||
import space.kscience.maps.coordinates.kilometers
|
import space.kscience.maps.coordinates.kilometers
|
||||||
import kotlin.math.PI
|
import kotlin.math.PI
|
||||||
import kotlin.random.Random
|
import kotlin.random.Random
|
||||||
|
import kotlin.time.Duration
|
||||||
|
import kotlin.time.Duration.Companion.milliseconds
|
||||||
|
|
||||||
private val deviceVelocity = 0.1.kilometers
|
private val deviceVelocity = 0.1.kilometers
|
||||||
|
|
||||||
@ -19,8 +21,13 @@ private val center = Gmc.ofDegrees(55.925, 37.514)
|
|||||||
private val radius = 0.01.degrees
|
private val radius = 0.01.degrees
|
||||||
|
|
||||||
|
|
||||||
internal fun generateModel(context: Context): DeviceCollectiveModel {
|
internal fun generateModel(
|
||||||
val devices: List<VirtualDeviceState> = List(100) { index ->
|
context: Context,
|
||||||
|
size: Int = 50,
|
||||||
|
reportInterval: Duration = 500.milliseconds,
|
||||||
|
additionalConfiguration: CollectiveDeviceConfiguration.() -> Unit = {},
|
||||||
|
): DeviceCollectiveModel {
|
||||||
|
val devices: List<CollectiveDeviceState> = List(size) { index ->
|
||||||
val id = "device[$index]"
|
val id = "device[$index]"
|
||||||
|
|
||||||
VirtualDeviceState(
|
VirtualDeviceState(
|
||||||
@ -32,10 +39,12 @@ internal fun generateModel(context: Context): DeviceCollectiveModel {
|
|||||||
) {
|
) {
|
||||||
deviceId = id
|
deviceId = id
|
||||||
description = "Virtual remote device $id"
|
description = "Virtual remote device $id"
|
||||||
|
this.reportInterval = reportInterval.inWholeMilliseconds.toInt()
|
||||||
|
additionalConfiguration()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
val model = DeviceCollectiveModel(context, devices, 0.2.kilometers)
|
val model = DeviceCollectiveModel(context, devices)
|
||||||
|
|
||||||
return model
|
return model
|
||||||
}
|
}
|
||||||
|
@ -10,6 +10,7 @@ import androidx.compose.foundation.layout.padding
|
|||||||
import androidx.compose.material.Card
|
import androidx.compose.material.Card
|
||||||
import androidx.compose.material.Checkbox
|
import androidx.compose.material.Checkbox
|
||||||
import androidx.compose.material.Text
|
import androidx.compose.material.Text
|
||||||
|
import androidx.compose.material3.CircularProgressIndicator
|
||||||
import androidx.compose.material3.MaterialTheme
|
import androidx.compose.material3.MaterialTheme
|
||||||
import androidx.compose.runtime.*
|
import androidx.compose.runtime.*
|
||||||
import androidx.compose.ui.Alignment
|
import androidx.compose.ui.Alignment
|
||||||
@ -24,74 +25,103 @@ import androidx.compose.ui.window.Window
|
|||||||
import androidx.compose.ui.window.application
|
import androidx.compose.ui.window.application
|
||||||
import io.ktor.client.HttpClient
|
import io.ktor.client.HttpClient
|
||||||
import io.ktor.client.engine.cio.CIO
|
import io.ktor.client.engine.cio.CIO
|
||||||
|
import kotlinx.coroutines.CompletableDeferred
|
||||||
|
import kotlinx.coroutines.Dispatchers
|
||||||
import kotlinx.coroutines.flow.launchIn
|
import kotlinx.coroutines.flow.launchIn
|
||||||
import kotlinx.coroutines.flow.onEach
|
import kotlinx.coroutines.flow.onEach
|
||||||
|
import kotlinx.coroutines.launch
|
||||||
|
import kotlinx.coroutines.withContext
|
||||||
import org.jetbrains.compose.splitpane.ExperimentalSplitPaneApi
|
import org.jetbrains.compose.splitpane.ExperimentalSplitPaneApi
|
||||||
import org.jetbrains.compose.splitpane.HorizontalSplitPane
|
import org.jetbrains.compose.splitpane.HorizontalSplitPane
|
||||||
import org.jetbrains.compose.splitpane.rememberSplitPaneState
|
import org.jetbrains.compose.splitpane.rememberSplitPaneState
|
||||||
|
import space.kscience.controls.api.PropertyChangedMessage
|
||||||
|
import space.kscience.controls.client.*
|
||||||
import space.kscience.controls.compose.conditional
|
import space.kscience.controls.compose.conditional
|
||||||
import space.kscience.controls.manager.DeviceManager
|
import space.kscience.controls.manager.DeviceManager
|
||||||
import space.kscience.controls.spec.useProperty
|
|
||||||
import space.kscience.dataforge.context.Context
|
import space.kscience.dataforge.context.Context
|
||||||
import space.kscience.dataforge.context.request
|
import space.kscience.dataforge.context.ContextBuilder
|
||||||
|
import space.kscience.dataforge.meta.MetaConverter
|
||||||
|
import space.kscience.dataforge.names.parseAsName
|
||||||
|
import space.kscience.magix.api.MagixEndpoint
|
||||||
|
import space.kscience.magix.api.subscribe
|
||||||
|
import space.kscience.magix.rsocket.rSocketWithWebSockets
|
||||||
import space.kscience.maps.compose.MapView
|
import space.kscience.maps.compose.MapView
|
||||||
import space.kscience.maps.compose.OpenStreetMapTileProvider
|
import space.kscience.maps.compose.OpenStreetMapTileProvider
|
||||||
|
import space.kscience.maps.coordinates.Gmc
|
||||||
|
import space.kscience.maps.coordinates.meters
|
||||||
import space.kscience.maps.features.*
|
import space.kscience.maps.features.*
|
||||||
import java.nio.file.Path
|
import java.nio.file.Path
|
||||||
|
import kotlin.time.Duration.Companion.seconds
|
||||||
|
|
||||||
|
|
||||||
@Composable
|
@Composable
|
||||||
fun rememberDeviceManager(): DeviceManager = remember {
|
fun rememberContext(name: String, contextBuilder: ContextBuilder.() -> Unit = {}): Context = remember {
|
||||||
val context = Context {
|
Context(name, contextBuilder)
|
||||||
plugin(DeviceManager)
|
|
||||||
}
|
|
||||||
|
|
||||||
context.request(DeviceManager)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private val gmcMetaConverter = MetaConverter.serializable<Gmc>()
|
||||||
|
|
||||||
@Composable
|
@Composable
|
||||||
fun App() {
|
fun App() {
|
||||||
val scope = rememberCoroutineScope()
|
val scope = rememberCoroutineScope()
|
||||||
|
|
||||||
|
val parentContext = rememberContext("Parent") {
|
||||||
val deviceManager = rememberDeviceManager()
|
plugin(DeviceManager)
|
||||||
|
|
||||||
|
|
||||||
val collectiveModel = remember {
|
|
||||||
generateModel(deviceManager.context)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
val devices: Map<DeviceId, CollectiveDevice> = remember {
|
val collectiveModel = remember {
|
||||||
collectiveModel.deviceStates.associate {
|
generateModel(parentContext, 100, reportInterval = 1.seconds)
|
||||||
val device = CollectiveDeviceConstructor(
|
}
|
||||||
context = deviceManager.context,
|
|
||||||
configuration = it.configuration,
|
val roster = remember {
|
||||||
position = it.position,
|
collectiveModel.roster
|
||||||
velocity = it.velocity
|
}
|
||||||
) {
|
|
||||||
collectiveModel.locateVisible(it.id).keys
|
val client = remember { CompletableDeferred<MagixEndpoint>() }
|
||||||
|
|
||||||
|
val devices = remember { mutableStateMapOf<CollectiveDeviceId, DeviceClient>() }
|
||||||
|
|
||||||
|
|
||||||
|
LaunchedEffect(collectiveModel) {
|
||||||
|
launchCollectiveMagixServer(collectiveModel)
|
||||||
|
|
||||||
|
withContext(Dispatchers.IO) {
|
||||||
|
val magixClient = MagixEndpoint.rSocketWithWebSockets("localhost")
|
||||||
|
|
||||||
|
client.complete(magixClient)
|
||||||
|
|
||||||
|
collectiveModel.roster.forEach { (id, config) ->
|
||||||
|
scope.launch {
|
||||||
|
devices[id] = magixClient.remoteDevice(parentContext, "listener", id, id.parseAsName())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
device.moveInCircles()
|
}
|
||||||
it.id to device
|
|
||||||
|
}
|
||||||
|
|
||||||
|
var selectedDeviceId by remember { mutableStateOf<CollectiveDeviceId?>(null) }
|
||||||
|
|
||||||
|
var currentPosition by remember { mutableStateOf<Gmc?>(null) }
|
||||||
|
|
||||||
|
LaunchedEffect(selectedDeviceId, devices) {
|
||||||
|
selectedDeviceId?.let { devices[it] }?.propertyFlow(CollectiveDevice.position)?.collect {
|
||||||
|
currentPosition = it
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var selectedDeviceId by remember { mutableStateOf<DeviceId?>(null) }
|
|
||||||
var showOnlyVisible by remember { mutableStateOf(false) }
|
var showOnlyVisible by remember { mutableStateOf(false) }
|
||||||
|
|
||||||
val mapTileProvider = remember {
|
|
||||||
OpenStreetMapTileProvider(
|
|
||||||
client = HttpClient(CIO),
|
|
||||||
cacheDirectory = Path.of("mapCache")
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
HorizontalSplitPane(
|
HorizontalSplitPane(
|
||||||
splitPaneState = rememberSplitPaneState(0.9f)
|
splitPaneState = rememberSplitPaneState(0.9f)
|
||||||
) {
|
) {
|
||||||
first(400.dp) {
|
first(400.dp) {
|
||||||
MapView(
|
MapView(
|
||||||
mapTileProvider = mapTileProvider,
|
mapTileProvider = remember {
|
||||||
|
OpenStreetMapTileProvider(
|
||||||
|
client = HttpClient(CIO),
|
||||||
|
cacheDirectory = Path.of("mapCache")
|
||||||
|
)
|
||||||
|
},
|
||||||
config = ViewConfig()
|
config = ViewConfig()
|
||||||
) {
|
) {
|
||||||
collectiveModel.deviceStates.forEach { device ->
|
collectiveModel.deviceStates.forEach { device ->
|
||||||
@ -103,82 +133,104 @@ fun App() {
|
|||||||
}.launchIn(scope)
|
}.launchIn(scope)
|
||||||
}
|
}
|
||||||
|
|
||||||
devices.forEach { (id, device) ->
|
scope.launch {
|
||||||
device.useProperty(CollectiveDevice.position, scope = scope) { position ->
|
|
||||||
|
|
||||||
val activeDevice = selectedDeviceId?.let { devices[it] }
|
client.await().subscribe(DeviceManager.magixFormat).onEach { (magixMessage, deviceMessage) ->
|
||||||
|
if (deviceMessage is PropertyChangedMessage && deviceMessage.property == "position") {
|
||||||
|
val id = magixMessage.sourceEndpoint
|
||||||
|
val position = gmcMetaConverter.read(deviceMessage.value)
|
||||||
|
val activeDevice = selectedDeviceId?.let { devices[it] }
|
||||||
|
|
||||||
if (activeDevice == null || id == selectedDeviceId || !showOnlyVisible || id in activeDevice.listVisible()) {
|
if (
|
||||||
rectangle(
|
activeDevice == null ||
|
||||||
position,
|
id == selectedDeviceId ||
|
||||||
id = id,
|
!showOnlyVisible ||
|
||||||
size = if (selectedDeviceId == id) DpSize(10.dp, 10.dp) else DpSize(5.dp, 5.dp)
|
id in activeDevice.request(CollectiveDevice.visibleNeighbors)
|
||||||
).color(if (selectedDeviceId == id) Color.Magenta else Color.Blue)
|
) {
|
||||||
.modifyAttribute(AlphaAttribute, if (selectedDeviceId == id) 1f else 0.5f)
|
rectangle(
|
||||||
.onClick { selectedDeviceId = id }
|
position,
|
||||||
} else {
|
id = id,
|
||||||
removeFeature(id)
|
size = if (selectedDeviceId == id) DpSize(10.dp, 10.dp) else DpSize(5.dp, 5.dp)
|
||||||
|
).color(if (selectedDeviceId == id) Color.Magenta else Color.Blue)
|
||||||
|
.modifyAttribute(AlphaAttribute, if (selectedDeviceId == id) 1f else 0.5f)
|
||||||
|
.onClick { selectedDeviceId = id }
|
||||||
|
} else {
|
||||||
|
removeFeature(id)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
}.launchIn(scope)
|
||||||
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
second(200.dp) {
|
second(200.dp) {
|
||||||
Column {
|
|
||||||
selectedDeviceId?.let { id ->
|
Column(
|
||||||
Column(
|
modifier = Modifier.verticalScroll(rememberScrollState())
|
||||||
modifier = Modifier
|
) {
|
||||||
.padding(8.dp)
|
collectiveModel.roster.forEach { (id, _) ->
|
||||||
.border(2.dp, Color.DarkGray)
|
Card(
|
||||||
|
elevation = 16.dp,
|
||||||
|
modifier = Modifier.padding(8.dp).onClick {
|
||||||
|
selectedDeviceId = id
|
||||||
|
}.conditional(id == selectedDeviceId) {
|
||||||
|
border(2.dp, Color.Blue)
|
||||||
|
},
|
||||||
) {
|
) {
|
||||||
Card(
|
Column(
|
||||||
elevation = 16.dp,
|
modifier = Modifier.padding(8.dp)
|
||||||
) {
|
) {
|
||||||
Text(
|
Row(verticalAlignment = Alignment.CenterVertically) {
|
||||||
text = "Выбран: $id",
|
if (devices[id] == null) {
|
||||||
fontSize = 16.sp,
|
CircularProgressIndicator()
|
||||||
fontWeight = FontWeight.Bold,
|
}
|
||||||
modifier = Modifier.padding(10.dp).fillMaxWidth()
|
Text(
|
||||||
)
|
text = id,
|
||||||
}
|
fontSize = 16.sp,
|
||||||
devices[id]?.let {
|
fontWeight = FontWeight.Bold,
|
||||||
Text(it.meta.toString(), Modifier.padding(10.dp))
|
modifier = Modifier.padding(10.dp).fillMaxWidth(),
|
||||||
}
|
)
|
||||||
Row(verticalAlignment = Alignment.CenterVertically, modifier = Modifier.padding(10.dp).fillMaxWidth()) {
|
}
|
||||||
Text("Показать только видимые")
|
if (id == selectedDeviceId) {
|
||||||
Checkbox(showOnlyVisible, { showOnlyVisible = it })
|
roster[id]?.let {
|
||||||
}
|
Text("Meta:", color = Color.Blue, fontWeight = FontWeight.Bold)
|
||||||
}
|
Card(elevation = 16.dp, modifier = Modifier.fillMaxWidth().padding(8.dp)) {
|
||||||
}
|
Text(it.toString())
|
||||||
Column(
|
}
|
||||||
modifier = Modifier.verticalScroll(rememberScrollState())
|
}
|
||||||
) {
|
|
||||||
devices.forEach { (id, device) ->
|
currentPosition?.let { currentPosition ->
|
||||||
Card(
|
Text(
|
||||||
elevation = 16.dp,
|
"Широта: ${String.format("%.3f", currentPosition.latitude.toDegrees().value)}"
|
||||||
modifier = Modifier.padding(8.dp).onClick {
|
)
|
||||||
selectedDeviceId = id
|
Text(
|
||||||
}.conditional(id == selectedDeviceId) {
|
"Долгота: ${String.format("%.3f", currentPosition.longitude.toDegrees().value)}"
|
||||||
border(2.dp, Color.Blue)
|
)
|
||||||
|
currentPosition.elevation?.let {
|
||||||
|
Text("Высота: ${String.format("%.1f", it.meters)} м")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Row(
|
||||||
|
verticalAlignment = Alignment.CenterVertically,
|
||||||
|
modifier = Modifier.fillMaxWidth()
|
||||||
|
) {
|
||||||
|
Text("Показать только видимые")
|
||||||
|
Checkbox(showOnlyVisible, { showOnlyVisible = it })
|
||||||
|
}
|
||||||
}
|
}
|
||||||
) {
|
|
||||||
Text(
|
|
||||||
text = id,
|
|
||||||
fontSize = 16.sp,
|
|
||||||
fontWeight = FontWeight.Bold,
|
|
||||||
modifier = Modifier.padding(10.dp).fillMaxWidth()
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
fun main() = application {
|
fun main() = application {
|
||||||
|
// System.setProperty(IO_PARALLELISM_PROPERTY_NAME, 300.toString())
|
||||||
Window(onCloseRequest = ::exitApplication, title = "Maps-kt demo", icon = painterResource("SPC-logo.png")) {
|
Window(onCloseRequest = ::exitApplication, title = "Maps-kt demo", icon = painterResource("SPC-logo.png")) {
|
||||||
MaterialTheme {
|
MaterialTheme {
|
||||||
App()
|
App()
|
||||||
|
@ -83,8 +83,7 @@ suspend fun main() {
|
|||||||
|
|
||||||
val endpointId = "device$it"
|
val endpointId = "device$it"
|
||||||
val deviceEndpoint = MagixEndpoint.rSocketStreamWithWebSockets("localhost")
|
val deviceEndpoint = MagixEndpoint.rSocketStreamWithWebSockets("localhost")
|
||||||
deviceManager.launchMagixService(deviceEndpoint, endpointId, Dispatchers.IO)
|
deviceManager.launchMagixService(deviceEndpoint, endpointId)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
val trace = Bar {
|
val trace = Bar {
|
||||||
|
@ -21,9 +21,10 @@ public fun interface MagixFlowPlugin {
|
|||||||
sendMessage: suspend (MagixMessage) -> Unit,
|
sendMessage: suspend (MagixMessage) -> Unit,
|
||||||
): Job
|
): Job
|
||||||
|
|
||||||
/**
|
}
|
||||||
* Use the same [MutableSharedFlow] to send and receive messages. Could be a bottleneck in case of many plugins.
|
|
||||||
*/
|
/**
|
||||||
public fun start(scope: CoroutineScope, magixFlow: MutableSharedFlow<MagixMessage>): Job =
|
* Use the same [MutableSharedFlow] to send and receive messages. Could be a bottleneck in case of many plugins.
|
||||||
start(scope, magixFlow) { magixFlow.emit(it) }
|
*/
|
||||||
}
|
public fun MagixFlowPlugin.start(scope: CoroutineScope, magixFlow: MutableSharedFlow<MagixMessage>): Job =
|
||||||
|
start(scope, magixFlow) { magixFlow.emit(it) }
|
@ -10,7 +10,9 @@ import io.ktor.server.util.getValue
|
|||||||
import io.ktor.server.websocket.WebSockets
|
import io.ktor.server.websocket.WebSockets
|
||||||
import io.rsocket.kotlin.ktor.server.RSocketSupport
|
import io.rsocket.kotlin.ktor.server.RSocketSupport
|
||||||
import io.rsocket.kotlin.ktor.server.rSocket
|
import io.rsocket.kotlin.ktor.server.rSocket
|
||||||
|
import kotlinx.coroutines.flow.Flow
|
||||||
import kotlinx.coroutines.flow.MutableSharedFlow
|
import kotlinx.coroutines.flow.MutableSharedFlow
|
||||||
|
import kotlinx.coroutines.flow.SharedFlow
|
||||||
import kotlinx.coroutines.flow.map
|
import kotlinx.coroutines.flow.map
|
||||||
import kotlinx.html.*
|
import kotlinx.html.*
|
||||||
import kotlinx.serialization.encodeToString
|
import kotlinx.serialization.encodeToString
|
||||||
@ -42,7 +44,11 @@ private fun ApplicationCall.buildFilter(): MagixMessageFilter {
|
|||||||
/**
|
/**
|
||||||
* Attach magix http/sse and websocket-based rsocket event loop + statistics page to existing [MutableSharedFlow]
|
* Attach magix http/sse and websocket-based rsocket event loop + statistics page to existing [MutableSharedFlow]
|
||||||
*/
|
*/
|
||||||
public fun Application.magixModule(magixFlow: MutableSharedFlow<MagixMessage>, route: String = "/") {
|
public fun Application.magixModule(
|
||||||
|
magixFlow: Flow<MagixMessage>,
|
||||||
|
send: suspend (MagixMessage) -> Unit,
|
||||||
|
route: String = "/",
|
||||||
|
) {
|
||||||
if (pluginOrNull(WebSockets) == null) {
|
if (pluginOrNull(WebSockets) == null) {
|
||||||
install(WebSockets)
|
install(WebSockets)
|
||||||
}
|
}
|
||||||
@ -62,27 +68,31 @@ public fun Application.magixModule(magixFlow: MutableSharedFlow<MagixMessage>, r
|
|||||||
|
|
||||||
routing {
|
routing {
|
||||||
route(route) {
|
route(route) {
|
||||||
install(ContentNegotiation){
|
install(ContentNegotiation) {
|
||||||
json()
|
json()
|
||||||
}
|
}
|
||||||
get("state") {
|
if (magixFlow is SharedFlow) {
|
||||||
call.respondHtml {
|
get("state") {
|
||||||
head {
|
call.respondHtml {
|
||||||
meta {
|
head {
|
||||||
httpEquiv = "refresh"
|
meta {
|
||||||
content = "2"
|
httpEquiv = "refresh"
|
||||||
|
content = "2"
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
body {
|
||||||
body {
|
h1 { +"Magix loop statistics" }
|
||||||
h1 { +"Magix loop statistics" }
|
if (magixFlow is MutableSharedFlow) {
|
||||||
h2 { +"Number of subscribers: ${magixFlow.subscriptionCount.value}" }
|
h2 { +"Number of subscribers: ${magixFlow.subscriptionCount.value}" }
|
||||||
h3 { +"Replay cache size: ${magixFlow.replayCache.size}" }
|
}
|
||||||
h3 { +"Replay cache:" }
|
h3 { +"Replay cache size: ${magixFlow.replayCache.size}" }
|
||||||
ol {
|
h3 { +"Replay cache:" }
|
||||||
magixFlow.replayCache.forEach { message ->
|
ol {
|
||||||
li {
|
magixFlow.replayCache.forEach { message ->
|
||||||
code {
|
li {
|
||||||
+magixJson.encodeToString(message)
|
code {
|
||||||
|
+magixJson.encodeToString(message)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -102,17 +112,22 @@ public fun Application.magixModule(magixFlow: MutableSharedFlow<MagixMessage>, r
|
|||||||
}
|
}
|
||||||
post("broadcast") {
|
post("broadcast") {
|
||||||
val message = call.receive<MagixMessage>()
|
val message = call.receive<MagixMessage>()
|
||||||
magixFlow.emit(message)
|
send(message)
|
||||||
}
|
}
|
||||||
//rSocket WS server. Filter from Payload
|
//rSocket WS server. Filter from Payload
|
||||||
rSocket(
|
rSocket(
|
||||||
"rsocket",
|
"rsocket",
|
||||||
acceptor = RSocketMagixFlowPlugin.acceptor(application, magixFlow) { magixFlow.emit(it) }
|
acceptor = RSocketMagixFlowPlugin.acceptor(application, magixFlow) { send(it) }
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public fun Application.magixModule(
|
||||||
|
magixFlow: MutableSharedFlow<MagixMessage>,
|
||||||
|
route: String = "/",
|
||||||
|
): Unit = magixModule(magixFlow, { magixFlow.emit(it) }, route)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a new loop [MutableSharedFlow] with given [buffer] and setup magix module based on it
|
* Create a new loop [MutableSharedFlow] with given [buffer] and setup magix module based on it
|
||||||
*/
|
*/
|
||||||
|
@ -9,6 +9,7 @@ import kotlinx.coroutines.flow.MutableSharedFlow
|
|||||||
import space.kscience.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_HTTP_PORT
|
import space.kscience.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_HTTP_PORT
|
||||||
import space.kscience.magix.api.MagixFlowPlugin
|
import space.kscience.magix.api.MagixFlowPlugin
|
||||||
import space.kscience.magix.api.MagixMessage
|
import space.kscience.magix.api.MagixMessage
|
||||||
|
import space.kscience.magix.api.start
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -22,7 +23,6 @@ public fun CoroutineScope.startMagixServer(
|
|||||||
|
|
||||||
val magixFlow = MutableSharedFlow<MagixMessage>(
|
val magixFlow = MutableSharedFlow<MagixMessage>(
|
||||||
replay = buffer,
|
replay = buffer,
|
||||||
extraBufferCapacity = buffer,
|
|
||||||
onBufferOverflow = BufferOverflow.DROP_OLDEST
|
onBufferOverflow = BufferOverflow.DROP_OLDEST
|
||||||
)
|
)
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user