Compare commits

..

11 Commits

52 changed files with 1003 additions and 143 deletions

3
.gitignore vendored
View File

@ -9,4 +9,7 @@
out/
build/
!gradle-wrapper.jar
/demo/device-collective/mapCache/

View File

@ -7,6 +7,7 @@
- PLC4X bindings
- Shortcuts to access all Controls devices in a magix network.
- `DeviceClient` properly evaluates lifecycle and logs
- `PeerConnection` API for direct device-device binary sharing
### Changed
- Constructor properties return `DeviceState` in order to be able to subscribe to them

View File

@ -134,6 +134,11 @@ Automatically checks consistency.
>
> **Maturity**: PROTOTYPE
### [controls-visualisation-compose](controls-visualisation-compose)
> Visualisation extension using compose-multiplatform
>
> **Maturity**: PROTOTYPE
### [demo](demo)
>
> **Maturity**: EXPERIMENTAL
@ -159,6 +164,10 @@ Automatically checks consistency.
>
> **Maturity**: EXPERIMENTAL
### [demo/device-collective](demo/device-collective)
>
> **Maturity**: EXPERIMENTAL
### [demo/echo](demo/echo)
>
> **Maturity**: EXPERIMENTAL

View File

@ -6,7 +6,7 @@ A low-code constructor for composite devices simulation
## Artifact:
The Maven coordinates of this project are `space.kscience:controls-constructor:0.4.0-dev-1`.
The Maven coordinates of this project are `space.kscience:controls-constructor:0.4.0-dev-4`.
**Gradle Kotlin DSL:**
```kotlin
@ -16,6 +16,6 @@ repositories {
}
dependencies {
implementation("space.kscience:controls-constructor:0.4.0-dev-1")
implementation("space.kscience:controls-constructor:0.4.0-dev-4")
}
```

View File

@ -74,15 +74,16 @@ public fun <T, R> DeviceState.Companion.map(
public fun <T, R> DeviceState<T>.map(mapper: (T) -> R): DeviceStateWithDependencies<R> = DeviceState.map(this, mapper)
public fun DeviceState<NumericalValue<out UnitsOfMeasurement>>.values(): DeviceState<Double> = object : DeviceState<Double> {
override val value: Double
get() = this@values.value.value
public fun DeviceState<NumericalValue<out UnitsOfMeasurement>>.values(): DeviceState<Double> =
object : DeviceState<Double> {
override val value: Double
get() = this@values.value.value
override val valueFlow: Flow<Double>
get() = this@values.valueFlow.map { it.value }
override val valueFlow: Flow<Double>
get() = this@values.valueFlow.map { it.value }
override fun toString(): String = this@values.toString()
}
override fun toString(): String = this@values.toString()
}
/**
* Combine two device states into one read-only [DeviceState]. Only the latest value of each state is used.

View File

@ -16,7 +16,7 @@ Core interfaces for building a device server
## Artifact:
The Maven coordinates of this project are `space.kscience:controls-core:0.4.0-dev-1`.
The Maven coordinates of this project are `space.kscience:controls-core:0.4.0-dev-4`.
**Gradle Kotlin DSL:**
```kotlin
@ -26,6 +26,6 @@ repositories {
}
dependencies {
implementation("space.kscience:controls-core:0.4.0-dev-1")
implementation("space.kscience:controls-core:0.4.0-dev-4")
}
```

View File

@ -73,7 +73,7 @@ public data class PropertySetMessage(
public val property: String,
public val value: Meta,
override val sourceDevice: Name? = null,
override val targetDevice: Name,
override val targetDevice: Name?,
override val comment: String? = null,
@EncodeDefault override val time: Instant = Clock.System.now(),
) : DeviceMessage() {
@ -166,12 +166,18 @@ public data class ActionResultMessage(
}
/**
* Notifies listeners that a new binary with given [binaryID] is available. The binary itself could not be provided via [DeviceMessage] API.
* Notifies listeners that a new binary with given [contentId] and [contentMeta] is available.
*
* [contentMeta] includes public information that could be shared with loop subscribers. It should not contain sensitive data.
*
* The binary itself could not be provided via [DeviceMessage] API.
* [space.kscience.controls.peer.PeerConnection] must be used instead
*/
@Serializable
@SerialName("binary.notification")
public data class BinaryNotificationMessage(
val binaryID: String,
val contentId: String,
val contentMeta: Meta,
override val sourceDevice: Name,
override val targetDevice: Name? = null,
override val comment: String? = null,

View File

@ -68,7 +68,7 @@ public suspend fun Device.respondMessage(deviceTarget: Name, request: DeviceMess
/**
* Process incoming [DeviceMessage], using hub naming to find target.
* If the `targetDevice` is `null`, then message is sent to each device in this hub
* If the `targetDevice` is `null`, then the message is sent to each device in this hub
*/
public suspend fun DeviceHub.respondHubMessage(request: DeviceMessage): List<DeviceMessage> {
return try {

View File

@ -41,9 +41,10 @@ private object InstantConverter : MetaConverter<Instant> {
public val MetaConverter.Companion.instant: MetaConverter<Instant> get() = InstantConverter
private object DoubleRangeConverter : MetaConverter<ClosedFloatingPointRange<Double>> {
override fun readOrNull(source: Meta): ClosedFloatingPointRange<Double>? = source.value?.doubleArray?.let { (start, end)->
start..end
}
override fun readOrNull(source: Meta): ClosedFloatingPointRange<Double>? =
source.value?.doubleArray?.let { (start, end) ->
start..end
}
override fun convert(
obj: ClosedFloatingPointRange<Double>,
@ -51,3 +52,11 @@ private object DoubleRangeConverter : MetaConverter<ClosedFloatingPointRange<Dou
}
public val MetaConverter.Companion.doubleRange: MetaConverter<ClosedFloatingPointRange<Double>> get() = DoubleRangeConverter
private object StringListConverter : MetaConverter<List<String>> {
override fun convert(obj: List<String>): Meta = Meta(obj.map { it.asValue() }.asValue())
override fun readOrNull(source: Meta): List<String>? = source.stringList ?: source["@jsonArray"]?.stringList
}
public val MetaConverter.Companion.stringList: MetaConverter<List<String>> get() = StringListConverter

View File

@ -0,0 +1,39 @@
package space.kscience.controls.peer
import space.kscience.dataforge.io.Envelope
import space.kscience.dataforge.meta.Meta
/**
* A manager that allows direct synchronous sending and receiving binary data
*/
public interface PeerConnection {
/**
* Receive an [Envelope] from a device on a given [address] with given [contentId].
*
* The address depends on the specifics of given [PeerConnection]. For example, it could be a TCP/IP port or
* magix endpoint name.
*
* Depending on [PeerConnection] implementation, the resulting [Envelope] could be lazy loaded
*
* Additional metadata in [requestMeta] could be required for authentication.
*/
public suspend fun receive(
address: String,
contentId: String,
requestMeta: Meta = Meta.EMPTY,
): Envelope?
/**
* Send an [envelope] to a device on a given [address]
*
* The address depends on the specifics of given [PeerConnection]. For example, it could be a TCP/IP port or
* magix endpoint name.
*
* Additional metadata in [requestMeta] could be required for authentication.
*/
public suspend fun send(
address: String,
envelope: Envelope,
requestMeta: Meta = Meta.EMPTY,
)
}

View File

@ -6,7 +6,7 @@
## Artifact:
The Maven coordinates of this project are `space.kscience:controls-jupyter:0.4.0-dev-1`.
The Maven coordinates of this project are `space.kscience:controls-jupyter:0.4.0-dev-4`.
**Gradle Kotlin DSL:**
```kotlin
@ -16,6 +16,6 @@ repositories {
}
dependencies {
implementation("space.kscience:controls-jupyter:0.4.0-dev-1")
implementation("space.kscience:controls-jupyter:0.4.0-dev-4")
}
```

View File

@ -12,7 +12,7 @@ Magix service for binding controls devices (both as RPC client and server)
## Artifact:
The Maven coordinates of this project are `space.kscience:controls-magix:0.4.0-dev-1`.
The Maven coordinates of this project are `space.kscience:controls-magix:0.4.0-dev-4`.
**Gradle Kotlin DSL:**
```kotlin
@ -22,6 +22,6 @@ repositories {
}
dependencies {
implementation("space.kscience:controls-magix:0.4.0-dev-1")
implementation("space.kscience:controls-magix:0.4.0-dev-4")
}
```

View File

@ -19,10 +19,13 @@ import space.kscience.dataforge.meta.Meta
public suspend fun <T> DeviceClient.read(propertySpec: DevicePropertySpec<*, T>): T =
propertySpec.converter.readOrNull(readProperty(propertySpec.name)) ?: error("Property read result is not valid")
public suspend fun <T> DeviceClient.request(propertySpec: DevicePropertySpec<*, T>): T =
propertySpec.converter.read(getOrReadProperty(propertySpec.name))
public fun <T> DeviceClient.getCached(propertySpec: DevicePropertySpec<*, T>): T? =
getProperty(propertySpec.name)?.let { propertySpec.converter.read(it) }
public suspend fun <T> DeviceClient.write(propertySpec: MutableDevicePropertySpec<*, T>, value: T) {
writeProperty(propertySpec.name, propertySpec.converter.convert(value))
}

View File

@ -93,7 +93,7 @@ internal class RemoteDeviceConnect {
val virtualMagixEndpoint = VirtualMagixEndpoint(deviceManager)
val remoteDevice = virtualMagixEndpoint.remoteDevice(context, "client", "device", "test".asName())
val remoteDevice: DeviceClient = virtualMagixEndpoint.remoteDevice(context, "client", "device", "test".asName())
assertContains(0.0..1.0, remoteDevice.read(TestDevice.value))

View File

@ -1,10 +1,10 @@
package space.kscience.controls.client
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.withContext
import space.kscience.controls.client.RemoteDeviceConnect.TestDevice
import space.kscience.controls.manager.DeviceManager
import space.kscience.controls.manager.install
@ -20,36 +20,37 @@ class MagixLoopTest {
@Test
fun realDeviceHub() = runTest {
withContext(Dispatchers.Default) {
val context = Context {
plugin(DeviceManager)
}
val server = context.startMagixServer()
val deviceManager = context.request(DeviceManager)
val deviceEndpoint = MagixEndpoint.rSocketWithWebSockets("localhost")
deviceManager.launchMagixService(deviceEndpoint, "device")
launch {
delay(50)
repeat(10) {
deviceManager.install("test[$it]", TestDevice)
}
}
val clientEndpoint = MagixEndpoint.rSocketWithWebSockets("localhost")
val remoteHub = clientEndpoint.remoteDeviceHub(context, "client", "device")
assertEquals(0, remoteHub.devices.size)
delay(60)
clientEndpoint.requestDeviceUpdate("client", "device")
delay(60)
assertEquals(10, remoteHub.devices.size)
server.stop()
val context = Context {
coroutineContext(Dispatchers.Default)
plugin(DeviceManager)
}
val server = context.startMagixServer()
val deviceManager = context.request(DeviceManager)
val deviceEndpoint = MagixEndpoint.rSocketWithWebSockets("localhost")
deviceManager.launchMagixService(deviceEndpoint, "device")
val trigger = CompletableDeferred<Unit>()
context.launch {
repeat(10) {
deviceManager.install("test[$it]", TestDevice)
}
delay(100)
trigger.complete(Unit)
}
val clientEndpoint = MagixEndpoint.rSocketWithWebSockets("localhost")
val remoteHub = clientEndpoint.remoteDeviceHub(context, "client", "device")
assertEquals(0, remoteHub.devices.size)
clientEndpoint.requestDeviceUpdate("client", "device")
trigger.join()
assertEquals(10, remoteHub.devices.size)
server.stop()
}
}

View File

@ -14,7 +14,7 @@ Automatically checks consistency.
## Artifact:
The Maven coordinates of this project are `space.kscience:controls-modbus:0.4.0-dev-1`.
The Maven coordinates of this project are `space.kscience:controls-modbus:0.4.0-dev-4`.
**Gradle Kotlin DSL:**
```kotlin
@ -24,6 +24,6 @@ repositories {
}
dependencies {
implementation("space.kscience:controls-modbus:0.4.0-dev-1")
implementation("space.kscience:controls-modbus:0.4.0-dev-4")
}
```

View File

@ -12,7 +12,7 @@ A client and server connectors for OPC-UA via Eclipse Milo
## Artifact:
The Maven coordinates of this project are `space.kscience:controls-opcua:0.4.0-dev-1`.
The Maven coordinates of this project are `space.kscience:controls-opcua:0.4.0-dev-4`.
**Gradle Kotlin DSL:**
```kotlin
@ -22,6 +22,6 @@ repositories {
}
dependencies {
implementation("space.kscience:controls-opcua:0.4.0-dev-1")
implementation("space.kscience:controls-opcua:0.4.0-dev-4")
}
```

View File

@ -6,7 +6,7 @@ Utils to work with controls-kt on Raspberry pi
## Artifact:
The Maven coordinates of this project are `space.kscience:controls-pi:0.4.0-dev-1`.
The Maven coordinates of this project are `space.kscience:controls-pi:0.4.0-dev-4`.
**Gradle Kotlin DSL:**
```kotlin
@ -16,6 +16,6 @@ repositories {
}
dependencies {
implementation("space.kscience:controls-pi:0.4.0-dev-1")
implementation("space.kscience:controls-pi:0.4.0-dev-4")
}
```

View File

@ -6,7 +6,7 @@ Implementation of byte ports on top os ktor-io asynchronous API
## Artifact:
The Maven coordinates of this project are `space.kscience:controls-ports-ktor:0.4.0-dev-1`.
The Maven coordinates of this project are `space.kscience:controls-ports-ktor:0.4.0-dev-4`.
**Gradle Kotlin DSL:**
```kotlin
@ -16,6 +16,6 @@ repositories {
}
dependencies {
implementation("space.kscience:controls-ports-ktor:0.4.0-dev-1")
implementation("space.kscience:controls-ports-ktor:0.4.0-dev-4")
}
```

View File

@ -6,7 +6,7 @@ Implementation of direct serial port communication with JSerialComm
## Artifact:
The Maven coordinates of this project are `space.kscience:controls-serial:0.4.0-dev-1`.
The Maven coordinates of this project are `space.kscience:controls-serial:0.4.0-dev-4`.
**Gradle Kotlin DSL:**
```kotlin
@ -16,6 +16,6 @@ repositories {
}
dependencies {
implementation("space.kscience:controls-serial:0.4.0-dev-1")
implementation("space.kscience:controls-serial:0.4.0-dev-4")
}
```

View File

@ -6,7 +6,7 @@ A combined Magix event loop server with web server for visualization.
## Artifact:
The Maven coordinates of this project are `space.kscience:controls-server:0.4.0-dev-1`.
The Maven coordinates of this project are `space.kscience:controls-server:0.4.0-dev-4`.
**Gradle Kotlin DSL:**
```kotlin
@ -16,6 +16,6 @@ repositories {
}
dependencies {
implementation("space.kscience:controls-server:0.4.0-dev-1")
implementation("space.kscience:controls-server:0.4.0-dev-4")
}
```

View File

@ -38,6 +38,7 @@ import space.kscience.dataforge.names.get
import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.api.MagixFlowPlugin
import space.kscience.magix.api.MagixMessage
import space.kscience.magix.api.start
import space.kscience.magix.server.magixModule
@ -215,5 +216,6 @@ public fun Application.deviceManagerModule(
plugins.forEach {
it.start(this, magixFlow)
}
magixModule(magixFlow)
}

View File

@ -6,7 +6,7 @@ An API for stand-alone Controls-kt device or a hub.
## Artifact:
The Maven coordinates of this project are `space.kscience:controls-storage:0.4.0-dev-1`.
The Maven coordinates of this project are `space.kscience:controls-storage:0.4.0-dev-4`.
**Gradle Kotlin DSL:**
```kotlin
@ -16,6 +16,6 @@ repositories {
}
dependencies {
implementation("space.kscience:controls-storage:0.4.0-dev-1")
implementation("space.kscience:controls-storage:0.4.0-dev-4")
}
```

View File

@ -6,7 +6,7 @@ An implementation of controls-storage on top of JetBrains Xodus.
## Artifact:
The Maven coordinates of this project are `space.kscience:controls-xodus:0.4.0-dev-1`.
The Maven coordinates of this project are `space.kscience:controls-xodus:0.4.0-dev-4`.
**Gradle Kotlin DSL:**
```kotlin
@ -16,6 +16,6 @@ repositories {
}
dependencies {
implementation("space.kscience:controls-xodus:0.4.0-dev-1")
implementation("space.kscience:controls-xodus:0.4.0-dev-4")
}
```

View File

@ -2,13 +2,11 @@
Dashboard and visualization extensions for devices
Hello world!
## Usage
## Artifact:
The Maven coordinates of this project are `space.kscience:controls-vision:0.4.0-dev-1`.
The Maven coordinates of this project are `space.kscience:controls-vision:0.4.0-dev-4`.
**Gradle Kotlin DSL:**
```kotlin
@ -18,6 +16,6 @@ repositories {
}
dependencies {
implementation("space.kscience:controls-vision:0.4.0-dev-1")
implementation("space.kscience:controls-vision:0.4.0-dev-4")
}
```

View File

@ -1,3 +1,5 @@
@file:OptIn(FlowPreview::class)
package space.kscience.controls.compose
import androidx.compose.runtime.*
@ -6,11 +8,8 @@ import io.github.koalaplot.core.line.LinePlot
import io.github.koalaplot.core.style.LineStyle
import io.github.koalaplot.core.xygraph.DefaultPoint
import io.github.koalaplot.core.xygraph.XYGraphScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import space.kscience.controls.api.Device

View File

@ -0,0 +1,12 @@
package space.kscience.controls.compose
import androidx.compose.ui.Modifier
public inline fun Modifier.conditional(
condition: Boolean,
modifier: Modifier.() -> Modifier,
): Modifier = if (condition) {
then(modifier(Modifier))
} else {
this
}

View File

@ -15,6 +15,7 @@ import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.isActive
import space.kscience.controls.constructor.*
import space.kscience.controls.constructor.devices.LimitSwitch
import space.kscience.controls.constructor.devices.StepDrive
import space.kscience.controls.constructor.devices.angle
import space.kscience.controls.constructor.models.Leadscrew
@ -31,10 +32,18 @@ private class Plotter(
context: Context,
xDrive: StepDrive,
yDrive: StepDrive,
xStartLimit: LimitSwitch,
xEndLimit: LimitSwitch,
yStartLimit: LimitSwitch,
yEndLimit: LimitSwitch,
val paint: suspend (Color) -> Unit,
) : DeviceConstructor(context) {
val xDrive by device(xDrive)
val yDrive by device(yDrive)
val xStartLimit by device(xStartLimit)
val xEndLimit by device(xEndLimit)
val yStartLimit by device(yStartLimit)
val yEndLimit by device(yEndLimit)
public fun moveToXY(x: Number, y: Number) {
xDrive.target.value = x.toLong()
@ -108,7 +117,15 @@ private class PlotterModel(
val xy: DeviceState<XY<Meters>> = combineState(x, y) { x, y -> XY(x, y) }
val plotter = Plotter(context, xDrive, yDrive) { color ->
val plotter = Plotter(
context = context,
xDrive = xDrive,
yDrive = yDrive,
xStartLimit = LimitSwitch(context,x.atStart),
xEndLimit = LimitSwitch(context,x.atEnd),
yStartLimit = LimitSwitch(context,x.atStart),
yEndLimit = LimitSwitch(context,x.atEnd),
) { color ->
println("Point X: ${x.value.value}, Y: ${y.value.value}, color: $color")
callback(PlotterPoint(x.value, y.value, color))
}

View File

@ -0,0 +1,14 @@
# Module device-collective
# Running demo from gradle
1. Install JDK 17-21 in the system (for example, from https://sdkman.io/jdks or https://github.com/ScoopInstaller/Java).
2. Clone the repository with Git.
3. Run `./gradlew :demo:device-collective:run` from the project root directory.
# Install distribution
1. Install JDK 17-21 in the system (for example, from https://sdkman.io/jdks or https://github.com/ScoopInstaller/Java).
2. Clone the repository with Git.
3. Run `./gradlew :demo:device-collective:packageUberJarForCurrentOS` from the project root directory.
4. Go to `build/compose/jars/device-collective-<your OS>-<version>.jar`. You can copy it and run with `java -jar <full-name>.jar`

View File

@ -13,6 +13,9 @@ kscience {
commonMain {
implementation(projects.controlsVisualisationCompose)
implementation(projects.controlsConstructor)
implementation(projects.magix.magixServer)
implementation(projects.magix.magixRsocket)
implementation(projects.controlsMagix)
}
jvmMain {
// implementation("io.ktor:ktor-server-cio")
@ -36,6 +39,6 @@ kotlin.explicitApi = ExplicitApiMode.Disabled
compose.desktop {
application {
mainClass = "space.kscience.controls.demo.map.MainKt"
mainClass = "space.kscience.controls.demo.collective.MainKt"
}
}

View File

@ -0,0 +1,118 @@
@file:OptIn(DFExperimental::class)
package space.kscience.controls.demo.collective
import space.kscience.controls.api.Device
import space.kscience.controls.constructor.*
import space.kscience.controls.misc.stringList
import space.kscience.controls.peer.PeerConnection
import space.kscience.controls.spec.DeviceSpec
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.meta.MetaConverter
import space.kscience.dataforge.meta.Scheme
import space.kscience.dataforge.meta.int
import space.kscience.dataforge.meta.string
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.maps.coordinates.Gmc
import space.kscience.maps.coordinates.GmcCurve
import kotlin.time.Duration.Companion.milliseconds
typealias CollectiveDeviceId = String
class CollectiveDeviceConfiguration(deviceId: CollectiveDeviceId) : Scheme() {
var deviceId by string(deviceId)
var description by string()
var reportInterval by int(500)
var radioFrequency by string(default = DEFAULT_FREQUENCY)
companion object {
const val DEFAULT_FREQUENCY = "169 MHz"
}
}
typealias CollectiveDeviceRoster = Map<CollectiveDeviceId, CollectiveDeviceConfiguration>
interface CollectiveDevice : Device {
public val id: CollectiveDeviceId
public val peerConnection: PeerConnection
suspend fun getPosition(): Gmc
suspend fun getVelocity(): GmcVelocity
suspend fun setVelocity(value: GmcVelocity)
suspend fun listVisible(): Collection<CollectiveDeviceId>
companion object : DeviceSpec<CollectiveDevice>() {
val position by property<Gmc>(
converter = MetaConverter.serializable(),
read = { getPosition() }
)
val velocity by mutableProperty<GmcVelocity>(
converter = MetaConverter.serializable(),
read = { getVelocity() },
write = { _, value -> setVelocity(value) }
)
val visibleNeighbors by property(
MetaConverter.stringList,
read = {
listVisible().toList()
}
)
// val listVisible by action(MetaConverter.unit, MetaConverter.valueList<String> { it.string }) {
// listVisible().toList()
// }
}
}
class CollectiveDeviceConstructor(
context: Context,
val configuration: CollectiveDeviceConfiguration,
position: MutableDeviceState<Gmc>,
velocity: MutableDeviceState<GmcVelocity>,
override val peerConnection: PeerConnection,
private val observation: suspend () -> Map<CollectiveDeviceId, GmcCurve>,
) : DeviceConstructor(context, configuration.meta), CollectiveDevice {
override val id: CollectiveDeviceId get() = configuration.deviceId
val position = registerAsProperty(
CollectiveDevice.position,
position.debounce(configuration.reportInterval.milliseconds)
)
val velocity = registerAsProperty(
CollectiveDevice.velocity,
velocity.debounce(configuration.reportInterval.milliseconds)
)
private val _visibleNeighbors: MutableDeviceState<Collection<CollectiveDeviceId>> = stateOf(emptyList())
val visibleNeighbors = registerAsProperty(
CollectiveDevice.visibleNeighbors,
_visibleNeighbors.map { it.toList() }
)
init {
position.onNext {
_visibleNeighbors.value = observation.invoke().keys
}
}
override suspend fun getPosition(): Gmc = position.value
override suspend fun getVelocity(): GmcVelocity = velocity.value
override suspend fun setVelocity(value: GmcVelocity) {
velocity.value = value
}
override suspend fun listVisible(): Collection<CollectiveDeviceId> = observation.invoke().keys
}

View File

@ -0,0 +1,36 @@
package space.kscience.controls.demo.collective
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.flow.sample
import space.kscience.controls.constructor.DeviceState
import space.kscience.controls.constructor.MutableDeviceState
import kotlin.time.Duration
@OptIn(FlowPreview::class)
class DebounceDeviceState<T>(
val origin: DeviceState<T>,
val interval: Duration,
) : DeviceState<T> {
override val value: T by origin::value
override val valueFlow: Flow<T> get() = origin.valueFlow.debounce(interval)
override fun toString(): String = "DebounceDeviceState($value, interval=$interval)"
}
fun <T> DeviceState<T>.debounce(interval: Duration) = DebounceDeviceState(this, interval)
@OptIn(FlowPreview::class)
class MutableDebounceDeviceState<T>(
val origin: MutableDeviceState<T>,
val interval: Duration,
) : MutableDeviceState<T> {
override var value: T by origin::value
override val valueFlow: Flow<T> get() = origin.valueFlow.sample(interval)
override fun toString(): String = "DebounceDeviceState($value, interval=$interval)"
}
fun <T> MutableDeviceState<T>.debounce(interval: Duration) = MutableDebounceDeviceState(this, interval)

View File

@ -0,0 +1,255 @@
package space.kscience.controls.demo.collective
import kotlinx.coroutines.*
import kotlinx.io.writeString
import kotlinx.serialization.json.Json
import space.kscience.controls.api.DeviceMessage
import space.kscience.controls.api.PropertySetMessage
import space.kscience.controls.client.DeviceClient
import space.kscience.controls.client.launchMagixService
import space.kscience.controls.client.write
import space.kscience.controls.constructor.DeviceState
import space.kscience.controls.constructor.ModelConstructor
import space.kscience.controls.constructor.MutableDeviceState
import space.kscience.controls.constructor.onTimer
import space.kscience.controls.manager.DeviceManager
import space.kscience.controls.manager.install
import space.kscience.controls.manager.respondMessage
import space.kscience.controls.peer.PeerConnection
import space.kscience.controls.spec.name
import space.kscience.controls.spec.write
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.request
import space.kscience.dataforge.io.Envelope
import space.kscience.dataforge.io.toByteArray
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.parseAsName
import space.kscience.kmath.geometry.degrees
import space.kscience.kmath.geometry.radians
import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.rsocket.rSocketWithWebSockets
import space.kscience.magix.server.startMagixServer
import space.kscience.maps.coordinates.*
import kotlin.math.PI
import kotlin.random.Random
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds
private val deviceVelocity = 0.1.kilometers
private val center = Gmc.ofDegrees(55.925, 37.514)
private val radius = 0.01.degrees
private val json = Json {
ignoreUnknownKeys = true
prettyPrint = true
}
internal data class CollectiveDeviceState(
val id: CollectiveDeviceId,
val configuration: CollectiveDeviceConfiguration,
val position: MutableDeviceState<Gmc>,
val velocity: MutableDeviceState<GmcVelocity>,
)
internal fun CollectiveDeviceState(
id: CollectiveDeviceId,
position: Gmc,
configuration: CollectiveDeviceConfiguration.() -> Unit = {},
) = CollectiveDeviceState(
id,
CollectiveDeviceConfiguration(id).apply(configuration),
MutableDeviceState(position),
MutableDeviceState(GmcVelocity.zero)
)
internal class DeviceCollectiveModel(
context: Context,
val deviceStates: Collection<CollectiveDeviceState>,
val visibilityRange: Distance = 0.5.kilometers,
val radioRange: Distance = 1.kilometers,
) : ModelConstructor(context) {
/**
* Propagate movement
*/
private val movement = onTimer { prev, next ->
val delta = (next - prev)
deviceStates.forEach { state ->
state.position.value = state.position.value.moveWith(state.velocity.value, delta)
}
}
private fun locateVisible(id: CollectiveDeviceId): Map<CollectiveDeviceId, GmcCurve> {
val coordinatesSnapshot = deviceStates.associate { it.id to it.position.value }
val selected = coordinatesSnapshot[id] ?: error("Can't find device with id $id")
val allCurves = coordinatesSnapshot
.filterKeys { it != id }
.mapValues { GeoEllipsoid.WGS84.curveBetween(selected, it.value) }
return allCurves.filterValues { it.distance in 0.kilometers..visibilityRange }
}
inner class RadioPeerConnectionModel(private val position: DeviceState<Gmc>) : PeerConnection {
override suspend fun receive(address: String, contentId: String, requestMeta: Meta): Envelope? = null
override suspend fun send(address: String, envelope: Envelope, requestMeta: Meta) {
devices.values.filter { it.configuration.radioFrequency == address }.filter {
GeoEllipsoid.WGS84.curveBetween(position.value, it.position.value).distance < radioRange
}.forEach { target ->
check(envelope.data != null) { "Envelope data is empty" }
val message = json.decodeFromString(
DeviceMessage.serializer(),
envelope.data?.toByteArray()?.decodeToString() ?: ""
)
target.respondMessage(target.configuration.deviceId.parseAsName(), message)
}
}
}
val devices = deviceStates.associate { state ->
val device = CollectiveDeviceConstructor(
context = context,
configuration = state.configuration,
position = state.position,
velocity = state.velocity,
peerConnection = RadioPeerConnectionModel(state.position),
) {
locateVisible(state.id)
}
state.id to device
}
internal fun createTrawler(position: Gmc, id: CollectiveDeviceId = "trawler"): CollectiveDeviceConstructor {
val state = CollectiveDeviceState(
id = id,
configuration = CollectiveDeviceConfiguration(id),
position = MutableDeviceState(position),
velocity = MutableDeviceState(GmcVelocity.zero)
)
val result = CollectiveDeviceConstructor(
context = context,
configuration = state.configuration,
position = state.position,
velocity = state.velocity,
peerConnection = RadioPeerConnectionModel(state.position),
) {
locateVisible(state.id)
}
// TODO move to CollectiveDeviceState
onTimer { prev, next ->
val delta = (next - prev)
state.position.value = state.position.value.moveWith(state.velocity.value, delta)
}
result.onTimer(1.seconds) { _, _ ->
val envelope = Envelope {
data {
writeString(
json.encodeToString(
DeviceMessage.serializer(),
PropertySetMessage(
property = CollectiveDevice.velocity.name,
value = gmcVelocityMetaConverter.convert(state.velocity.value),
targetDevice = null
)
)
)
}
}
result.peerConnection.send(
CollectiveDeviceConfiguration.DEFAULT_FREQUENCY,
envelope
)
}
return result
}
val roster = deviceStates.associate { it.id to it.configuration }
}
internal fun CoroutineScope.launchCollectiveMagixServer(
collectiveModel: DeviceCollectiveModel,
): Job = launch(Dispatchers.IO) {
val server = startMagixServer(
// RSocketMagixFlowPlugin()
)
val deviceEndpoint = MagixEndpoint.rSocketWithWebSockets("localhost")
collectiveModel.devices.forEach { (id, device) ->
val deviceContext = collectiveModel.context.buildContext(id.parseAsName()) {
coroutineContext(coroutineContext)
plugin(DeviceManager)
}
deviceContext.install(id, device)
// val deviceEndpoint = MagixEndpoint.rSocketWithWebSockets("localhost")
deviceContext.request(DeviceManager).launchMagixService(deviceEndpoint, id)
}
}
internal fun generateModel(
context: Context,
size: Int = 50,
reportInterval: Duration = 500.milliseconds,
additionalConfiguration: CollectiveDeviceConfiguration.() -> Unit = {},
): DeviceCollectiveModel {
val devices: List<CollectiveDeviceState> = List(size) { index ->
val id = "device[$index]"
CollectiveDeviceState(
id = id,
Gmc(
center.latitude + radius * Random.nextDouble(),
center.longitude + radius * Random.nextDouble()
)
) {
deviceId = id
description = "Virtual remote device $id"
this.reportInterval = reportInterval.inWholeMilliseconds.toInt()
additionalConfiguration()
}
}
val model = DeviceCollectiveModel(context, devices)
return model
}
fun DeviceClient.moveInCircles(scope: CoroutineScope = this): Job = scope.launch {
var bearing = Random.nextDouble(-PI, PI).radians
write(CollectiveDevice.velocity, GmcVelocity(bearing, deviceVelocity))
while (isActive) {
delay(500)
bearing += 5.degrees
write(CollectiveDevice.velocity, GmcVelocity(bearing, deviceVelocity))
}
}
internal fun CollectiveDeviceConstructor.moveTo(
targetPosition: Gmc,
speedLimit: Distance = deviceVelocity,
scope: CoroutineScope = this,
): Job = scope.launch {
do {
val curve = GeoEllipsoid.WGS84.curveBetween(position.value, targetPosition)
write(CollectiveDevice.velocity, GmcVelocity(curve.forward.bearing, speedLimit))
delay(1.seconds)
} while (curve.distance > 0.1.kilometers)
write(CollectiveDevice.velocity, GmcVelocity.zero)
}

View File

@ -0,0 +1,24 @@
package space.kscience.controls.demo.collective
import kotlinx.serialization.Serializable
import space.kscience.kmath.geometry.Angle
import space.kscience.maps.coordinates.*
import kotlin.time.Duration
import kotlin.time.DurationUnit
@Serializable
data class GmcVelocity(val bearing: Angle, val velocity: Distance, val elevation: Distance = 0.kilometers){
companion object{
val zero = GmcVelocity(Angle.zero, 0.kilometers)
}
}
fun Gmc.moveWith(velocity: GmcVelocity, duration: Duration): Gmc {
val seconds = duration.toDouble(DurationUnit.SECONDS)
return GeoEllipsoid.WGS84.curveInDirection(
GmcPose(this, velocity.bearing),
velocity.velocity * seconds,
).backward.coordinates
}

View File

@ -0,0 +1,303 @@
@file:OptIn(ExperimentalFoundationApi::class, ExperimentalSplitPaneApi::class)
package space.kscience.controls.demo.collective
import androidx.compose.foundation.*
import androidx.compose.foundation.layout.Column
import androidx.compose.foundation.layout.Row
import androidx.compose.foundation.layout.fillMaxWidth
import androidx.compose.foundation.layout.padding
import androidx.compose.material.*
import androidx.compose.material3.CircularProgressIndicator
import androidx.compose.material3.MaterialTheme
import androidx.compose.runtime.*
import androidx.compose.ui.Alignment
import androidx.compose.ui.Modifier
import androidx.compose.ui.graphics.Color
import androidx.compose.ui.input.pointer.isSecondaryPressed
import androidx.compose.ui.res.painterResource
import androidx.compose.ui.text.font.FontWeight
import androidx.compose.ui.unit.dp
import androidx.compose.ui.unit.sp
import androidx.compose.ui.window.Window
import androidx.compose.ui.window.application
import io.ktor.client.HttpClient
import io.ktor.client.engine.cio.CIO
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.sample
import org.jetbrains.compose.splitpane.ExperimentalSplitPaneApi
import org.jetbrains.compose.splitpane.HorizontalSplitPane
import org.jetbrains.compose.splitpane.rememberSplitPaneState
import space.kscience.controls.api.PropertyChangedMessage
import space.kscience.controls.client.*
import space.kscience.controls.compose.conditional
import space.kscience.controls.manager.DeviceManager
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.ContextBuilder
import space.kscience.dataforge.meta.MetaConverter
import space.kscience.dataforge.names.parseAsName
import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.api.subscribe
import space.kscience.magix.rsocket.rSocketWithWebSockets
import space.kscience.maps.compose.MapView
import space.kscience.maps.compose.OpenStreetMapTileProvider
import space.kscience.maps.coordinates.Gmc
import space.kscience.maps.coordinates.meters
import space.kscience.maps.features.*
import java.nio.file.Path
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds
@Composable
fun rememberContext(name: String, contextBuilder: ContextBuilder.() -> Unit = {}): Context = remember {
Context(name, contextBuilder)
}
internal val gmcMetaConverter = MetaConverter.serializable<Gmc>()
internal val gmcVelocityMetaConverter = MetaConverter.serializable<GmcVelocity>()
@Composable
fun App() {
val scope = rememberCoroutineScope()
val parentContext = rememberContext("Parent") {
plugin(DeviceManager)
}
val collectiveModel = remember {
generateModel(parentContext, 100, reportInterval = 1.seconds)
}
val roster = remember {
collectiveModel.roster
}
val client = remember { CompletableDeferred<MagixEndpoint>() }
val devices = remember { mutableStateMapOf<CollectiveDeviceId, DeviceClient>() }
LaunchedEffect(collectiveModel) {
launchCollectiveMagixServer(collectiveModel)
withContext(Dispatchers.IO) {
val magixClient = MagixEndpoint.rSocketWithWebSockets("localhost")
client.complete(magixClient)
collectiveModel.roster.forEach { (id, config) ->
scope.launch {
val deviceClient = magixClient.remoteDevice(parentContext, "listener", id, id.parseAsName())
devices[id] = deviceClient
}
}
}
}
var selectedDeviceId by remember { mutableStateOf<CollectiveDeviceId?>(null) }
var currentPosition by remember { mutableStateOf<Gmc?>(null) }
LaunchedEffect(selectedDeviceId, devices) {
selectedDeviceId?.let { devices[it] }?.propertyFlow(CollectiveDevice.position)?.collect {
currentPosition = it
}
}
var showOnlyVisible by remember { mutableStateOf(false) }
var movementProgram: Job? by remember { mutableStateOf(null) }
val trawler: CollectiveDeviceConstructor = remember {
collectiveModel.createTrawler(Gmc.ofDegrees(55.925, 37.50))
}
HorizontalSplitPane(
splitPaneState = rememberSplitPaneState(0.9f)
) {
first(400.dp) {
var clickPoint by remember { mutableStateOf<Gmc?>(null) }
CursorDropdownMenu(clickPoint != null, { clickPoint = null }) {
clickPoint?.let { point ->
TextButton({
trawler.moveTo(point)
clickPoint = null
}) {
Text("Move trawler here")
}
}
}
MapView(
mapTileProvider = remember {
OpenStreetMapTileProvider(
client = HttpClient(CIO),
cacheDirectory = Path.of("mapCache")
)
},
config = ViewConfig(
onClick = { event, point ->
if (event.buttons.isSecondaryPressed) {
clickPoint = point.focus
}
}
)
) {
//draw real positions
collectiveModel.deviceStates.forEach { device ->
circle(device.position.value, id = device.id + ".position").color(Color.Red)
device.position.valueFlow.sample(50.milliseconds).onEach {
val activeDevice = selectedDeviceId?.let { devices[it] }
val color = if (selectedDeviceId == device.id) {
Color.Magenta
} else if (
showOnlyVisible &&
activeDevice != null &&
device.id in activeDevice.request(CollectiveDevice.visibleNeighbors)
) {
Color.Cyan
} else {
Color.Red
}
circle(
device.position.value,
id = device.id + ".position",
size = if (selectedDeviceId == device.id) 6.dp else 3.dp
)
.color(color)
.modifyAttribute(ZAttribute, 10f)
.modifyAttribute(AlphaAttribute, if (selectedDeviceId == device.id) 1f else 0.5f)
.modifyAttribute(AlphaAttribute, 0.5f) // does not work right now
}.launchIn(scope)
}
//draw received data
scope.launch {
client.await().subscribe(DeviceManager.magixFormat).onEach { (magixMessage, deviceMessage) ->
if (deviceMessage is PropertyChangedMessage && deviceMessage.property == "position") {
val id = magixMessage.sourceEndpoint
val position = gmcMetaConverter.read(deviceMessage.value)
rectangle(
position,
id = id,
).color(Color.Blue).onClick { selectedDeviceId = id }
}
}.launchIn(scope)
}
// draw trawler
trawler.position.valueFlow.onEach {
circle(it, id = "trawler").color(Color.Black)
}.launchIn(scope)
}
}
second(200.dp) {
Column(
modifier = Modifier.verticalScroll(rememberScrollState())
) {
Button(
onClick = {
if (movementProgram == null) {
//start movement program
movementProgram = parentContext.launch {
devices.values.forEach { device ->
device.moveInCircles(this)
}
}
} else {
movementProgram?.cancel()
parentContext.launch {
devices.values.forEach { device ->
device.write(CollectiveDevice.velocity, GmcVelocity.zero)
}
}
movementProgram = null
}
},
modifier = Modifier.fillMaxWidth()
) {
if (movementProgram == null) {
Text("Move")
} else {
Text("Stop")
}
}
collectiveModel.roster.forEach { (id, _) ->
Card(
elevation = 16.dp,
modifier = Modifier.padding(8.dp).onClick {
selectedDeviceId = id
}.conditional(id == selectedDeviceId) {
border(2.dp, Color.Blue)
},
) {
Column(
modifier = Modifier.padding(8.dp)
) {
Row(verticalAlignment = Alignment.CenterVertically) {
if (devices[id] == null) {
CircularProgressIndicator()
}
Text(
text = id,
fontSize = 16.sp,
fontWeight = FontWeight.Bold,
modifier = Modifier.padding(10.dp).fillMaxWidth(),
)
}
if (id == selectedDeviceId) {
roster[id]?.let {
Text("Meta:", color = Color.Blue, fontWeight = FontWeight.Bold)
Card(elevation = 16.dp, modifier = Modifier.fillMaxWidth().padding(8.dp)) {
Text(it.toString())
}
}
currentPosition?.let { currentPosition ->
Text(
"Широта: ${String.format("%.3f", currentPosition.latitude.toDegrees().value)}"
)
Text(
"Долгота: ${String.format("%.3f", currentPosition.longitude.toDegrees().value)}"
)
currentPosition.elevation?.let {
Text("Высота: ${String.format("%.1f", it.meters)} м")
}
}
Row(
verticalAlignment = Alignment.CenterVertically,
modifier = Modifier.fillMaxWidth()
) {
Text("Показать только видимые")
Checkbox(showOnlyVisible, { showOnlyVisible = it })
}
}
}
}
}
}
}
}
}
fun main() = application {
// System.setProperty(IO_PARALLELISM_PROPERTY_NAME, 300.toString())
Window(onCloseRequest = ::exitApplication, title = "Maps-kt demo", icon = painterResource("SPC-logo.png")) {
MaterialTheme {
App()
}
}
}

Binary file not shown.

After

Width:  |  Height:  |  Size: 5.0 KiB

View File

@ -1,8 +0,0 @@
package space.kscience.controls.demo.map
import androidx.compose.ui.window.application
fun main() = application {
}

View File

@ -83,8 +83,7 @@ suspend fun main() {
val endpointId = "device$it"
val deviceEndpoint = MagixEndpoint.rSocketStreamWithWebSockets("localhost")
deviceManager.launchMagixService(deviceEndpoint, endpointId, Dispatchers.IO)
deviceManager.launchMagixService(deviceEndpoint, endpointId)
}
val trace = Bar {

View File

@ -1,6 +1,6 @@
[versions]
dataforge = "0.8.0"
dataforge = "0.9.0"
rsocket = "0.15.4"
xodus = "2.0.1"
@ -10,7 +10,7 @@ fazecast = "2.10.3"
tornadofx = "1.7.20"
plotlykt = "0.7.0"
plotlykt = "0.7.2"
logback = "1.2.11"
@ -29,7 +29,7 @@ pi4j-ktx = "2.4.0"
plc4j = "0.12.0"
visionforge = "0.4.1"
visionforge = "0.4.2"
versions = "0.51.0"

View File

@ -6,7 +6,7 @@ A kotlin API for magix standard and some zero-dependency magix services
## Artifact:
The Maven coordinates of this project are `space.kscience:magix-api:0.4.0-dev-1`.
The Maven coordinates of this project are `space.kscience:magix-api:0.4.0-dev-4`.
**Gradle Kotlin DSL:**
```kotlin
@ -16,6 +16,6 @@ repositories {
}
dependencies {
implementation("space.kscience:magix-api:0.4.0-dev-1")
implementation("space.kscience:magix-api:0.4.0-dev-4")
}
```

View File

@ -21,9 +21,10 @@ public fun interface MagixFlowPlugin {
sendMessage: suspend (MagixMessage) -> Unit,
): Job
/**
* Use the same [MutableSharedFlow] to send and receive messages. Could be a bottleneck in case of many plugins.
*/
public fun start(scope: CoroutineScope, magixFlow: MutableSharedFlow<MagixMessage>): Job =
start(scope, magixFlow) { magixFlow.emit(it) }
}
/**
* Use the same [MutableSharedFlow] to send and receive messages. Could be a bottleneck in case of many plugins.
*/
public fun MagixFlowPlugin.start(scope: CoroutineScope, magixFlow: MutableSharedFlow<MagixMessage>): Job =
start(scope, magixFlow) { magixFlow.emit(it) }

View File

@ -6,7 +6,7 @@ Java API to work with magix endpoints without Kotlin
## Artifact:
The Maven coordinates of this project are `space.kscience:magix-java-endpoint:0.4.0-dev-1`.
The Maven coordinates of this project are `space.kscience:magix-java-endpoint:0.4.0-dev-4`.
**Gradle Kotlin DSL:**
```kotlin
@ -16,6 +16,6 @@ repositories {
}
dependencies {
implementation("space.kscience:magix-java-endpoint:0.4.0-dev-1")
implementation("space.kscience:magix-java-endpoint:0.4.0-dev-4")
}
```

View File

@ -6,7 +6,7 @@ MQTT client magix endpoint
## Artifact:
The Maven coordinates of this project are `space.kscience:magix-mqtt:0.4.0-dev-1`.
The Maven coordinates of this project are `space.kscience:magix-mqtt:0.4.0-dev-4`.
**Gradle Kotlin DSL:**
```kotlin
@ -16,6 +16,6 @@ repositories {
}
dependencies {
implementation("space.kscience:magix-mqtt:0.4.0-dev-1")
implementation("space.kscience:magix-mqtt:0.4.0-dev-4")
}
```

View File

@ -6,7 +6,7 @@ RabbitMQ client magix endpoint
## Artifact:
The Maven coordinates of this project are `space.kscience:magix-rabbit:0.4.0-dev-1`.
The Maven coordinates of this project are `space.kscience:magix-rabbit:0.4.0-dev-4`.
**Gradle Kotlin DSL:**
```kotlin
@ -16,6 +16,6 @@ repositories {
}
dependencies {
implementation("space.kscience:magix-rabbit:0.4.0-dev-1")
implementation("space.kscience:magix-rabbit:0.4.0-dev-4")
}
```

View File

@ -6,7 +6,7 @@ Magix endpoint (client) based on RSocket
## Artifact:
The Maven coordinates of this project are `space.kscience:magix-rsocket:0.4.0-dev-1`.
The Maven coordinates of this project are `space.kscience:magix-rsocket:0.4.0-dev-4`.
**Gradle Kotlin DSL:**
```kotlin
@ -16,6 +16,6 @@ repositories {
}
dependencies {
implementation("space.kscience:magix-rsocket:0.4.0-dev-1")
implementation("space.kscience:magix-rsocket:0.4.0-dev-4")
}
```

View File

@ -6,7 +6,7 @@ A magix event loop implementation in Kotlin. Includes HTTP/SSE and RSocket route
## Artifact:
The Maven coordinates of this project are `space.kscience:magix-server:0.4.0-dev-1`.
The Maven coordinates of this project are `space.kscience:magix-server:0.4.0-dev-4`.
**Gradle Kotlin DSL:**
```kotlin
@ -16,6 +16,6 @@ repositories {
}
dependencies {
implementation("space.kscience:magix-server:0.4.0-dev-1")
implementation("space.kscience:magix-server:0.4.0-dev-4")
}
```

View File

@ -10,7 +10,9 @@ import io.ktor.server.util.getValue
import io.ktor.server.websocket.WebSockets
import io.rsocket.kotlin.ktor.server.RSocketSupport
import io.rsocket.kotlin.ktor.server.rSocket
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.map
import kotlinx.html.*
import kotlinx.serialization.encodeToString
@ -42,7 +44,11 @@ private fun ApplicationCall.buildFilter(): MagixMessageFilter {
/**
* Attach magix http/sse and websocket-based rsocket event loop + statistics page to existing [MutableSharedFlow]
*/
public fun Application.magixModule(magixFlow: MutableSharedFlow<MagixMessage>, route: String = "/") {
public fun Application.magixModule(
magixFlow: Flow<MagixMessage>,
send: suspend (MagixMessage) -> Unit,
route: String = "/",
) {
if (pluginOrNull(WebSockets) == null) {
install(WebSockets)
}
@ -62,27 +68,31 @@ public fun Application.magixModule(magixFlow: MutableSharedFlow<MagixMessage>, r
routing {
route(route) {
install(ContentNegotiation){
install(ContentNegotiation) {
json()
}
get("state") {
call.respondHtml {
head {
meta {
httpEquiv = "refresh"
content = "2"
if (magixFlow is SharedFlow) {
get("state") {
call.respondHtml {
head {
meta {
httpEquiv = "refresh"
content = "2"
}
}
}
body {
h1 { +"Magix loop statistics" }
h2 { +"Number of subscribers: ${magixFlow.subscriptionCount.value}" }
h3 { +"Replay cache size: ${magixFlow.replayCache.size}" }
h3 { +"Replay cache:" }
ol {
magixFlow.replayCache.forEach { message ->
li {
code {
+magixJson.encodeToString(message)
body {
h1 { +"Magix loop statistics" }
if (magixFlow is MutableSharedFlow) {
h2 { +"Number of subscribers: ${magixFlow.subscriptionCount.value}" }
}
h3 { +"Replay cache size: ${magixFlow.replayCache.size}" }
h3 { +"Replay cache:" }
ol {
magixFlow.replayCache.forEach { message ->
li {
code {
+magixJson.encodeToString(message)
}
}
}
}
@ -102,17 +112,22 @@ public fun Application.magixModule(magixFlow: MutableSharedFlow<MagixMessage>, r
}
post("broadcast") {
val message = call.receive<MagixMessage>()
magixFlow.emit(message)
send(message)
}
//rSocket WS server. Filter from Payload
rSocket(
"rsocket",
acceptor = RSocketMagixFlowPlugin.acceptor(application, magixFlow) { magixFlow.emit(it) }
acceptor = RSocketMagixFlowPlugin.acceptor(application, magixFlow) { send(it) }
)
}
}
}
public fun Application.magixModule(
magixFlow: MutableSharedFlow<MagixMessage>,
route: String = "/",
): Unit = magixModule(magixFlow, { magixFlow.emit(it) }, route)
/**
* Create a new loop [MutableSharedFlow] with given [buffer] and setup magix module based on it
*/

View File

@ -9,6 +9,7 @@ import kotlinx.coroutines.flow.MutableSharedFlow
import space.kscience.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_HTTP_PORT
import space.kscience.magix.api.MagixFlowPlugin
import space.kscience.magix.api.MagixMessage
import space.kscience.magix.api.start
/**
@ -22,7 +23,6 @@ public fun CoroutineScope.startMagixServer(
val magixFlow = MutableSharedFlow<MagixMessage>(
replay = buffer,
extraBufferCapacity = buffer,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)

View File

@ -6,7 +6,7 @@ Magix history database API
## Artifact:
The Maven coordinates of this project are `space.kscience:magix-storage:0.4.0-dev-1`.
The Maven coordinates of this project are `space.kscience:magix-storage:0.4.0-dev-4`.
**Gradle Kotlin DSL:**
```kotlin
@ -16,6 +16,6 @@ repositories {
}
dependencies {
implementation("space.kscience:magix-storage:0.4.0-dev-1")
implementation("space.kscience:magix-storage:0.4.0-dev-4")
}
```

View File

@ -6,7 +6,7 @@
## Artifact:
The Maven coordinates of this project are `space.kscience:magix-storage-xodus:0.4.0-dev-1`.
The Maven coordinates of this project are `space.kscience:magix-storage-xodus:0.4.0-dev-4`.
**Gradle Kotlin DSL:**
```kotlin
@ -16,6 +16,6 @@ repositories {
}
dependencies {
implementation("space.kscience:magix-storage-xodus:0.4.0-dev-1")
implementation("space.kscience:magix-storage-xodus:0.4.0-dev-4")
}
```

View File

@ -6,7 +6,7 @@ ZMQ client endpoint for Magix
## Artifact:
The Maven coordinates of this project are `space.kscience:magix-zmq:0.4.0-dev-1`.
The Maven coordinates of this project are `space.kscience:magix-zmq:0.4.0-dev-4`.
**Gradle Kotlin DSL:**
```kotlin
@ -16,6 +16,6 @@ repositories {
}
dependencies {
implementation("space.kscience:magix-zmq:0.4.0-dev-1")
implementation("space.kscience:magix-zmq:0.4.0-dev-4")
}
```

View File

@ -87,5 +87,5 @@ include(
":demo:echo",
":demo:mks-pdr900",
":demo:constructor",
":demo:devices-on-map"
":demo:device-collective"
)