diff --git a/controls-core/src/commonMain/kotlin/ru/mipt/npm/controls/base/DeviceBase.kt b/controls-core/src/commonMain/kotlin/ru/mipt/npm/controls/base/DeviceBase.kt index 8eb07a0..8a3375d 100644 --- a/controls-core/src/commonMain/kotlin/ru/mipt/npm/controls/base/DeviceBase.kt +++ b/controls-core/src/commonMain/kotlin/ru/mipt/npm/controls/base/DeviceBase.kt @@ -18,6 +18,91 @@ import space.kscience.dataforge.misc.DFExperimental @DFExperimental public data class LogEntry(val content: String, val priority: Int = 0) + +@OptIn(ExperimentalCoroutinesApi::class) +private open class BasicReadOnlyDeviceProperty( + val device: DeviceBase, + override val name: String, + default: MetaItem?, + override val descriptor: PropertyDescriptor, + private val getter: suspend (before: MetaItem?) -> MetaItem, +) : ReadOnlyDeviceProperty { + + override val scope: CoroutineScope get() = device.scope + + private val state: MutableStateFlow = MutableStateFlow(default) + override val value: MetaItem? get() = state.value + + override suspend fun invalidate() { + state.value = null + } + + override fun updateLogical(item: MetaItem) { + state.value = item + scope.launch { + device.sharedPropertyFlow.emit(Pair(name, item)) + } + } + + override suspend fun read(force: Boolean): MetaItem { + //backup current value + val currentValue = value + return if (force || currentValue == null) { + //all device operations should be run on device context + //propagate error, but do not fail scope + val res = withContext(scope.coroutineContext + SupervisorJob(scope.coroutineContext[Job])) { + getter(currentValue) + } + updateLogical(res) + res + } else { + currentValue + } + } + + override fun flow(): StateFlow = state +} + + +@OptIn(ExperimentalCoroutinesApi::class) +private class BasicDeviceProperty( + device: DeviceBase, + name: String, + default: MetaItem?, + descriptor: PropertyDescriptor, + getter: suspend (MetaItem?) -> MetaItem, + private val setter: suspend (oldValue: MetaItem?, newValue: MetaItem) -> MetaItem?, +) : BasicReadOnlyDeviceProperty(device, name, default, descriptor, getter), DeviceProperty { + + override var value: MetaItem? + get() = super.value + set(value) { + scope.launch { + if (value == null) { + invalidate() + } else { + write(value) + } + } + } + + private val writeLock = Mutex() + + override suspend fun write(item: MetaItem) { + writeLock.withLock { + //fast return if value is not changed + if (item == value) return@withLock + val oldValue = value + //all device operations should be run on device context + withContext(scope.coroutineContext + SupervisorJob(scope.coroutineContext[Job])) { + setter(oldValue, item)?.let { + updateLogical(it) + } + } + } + } +} + /** * Baseline implementation of [Device] interface */ @@ -33,7 +118,7 @@ public abstract class DeviceBase(override val context: Context) : Device { private val _actions = HashMap() public val actions: Map get() = _actions - private val sharedPropertyFlow = MutableSharedFlow>() + internal val sharedPropertyFlow = MutableSharedFlow>() override val propertyFlow: SharedFlow> get() = sharedPropertyFlow @@ -82,49 +167,6 @@ public abstract class DeviceBase(override val context: Context) : Device { override suspend fun execute(action: String, argument: MetaItem?): MetaItem? = (_actions[action] ?: error("Request with name $action not defined")).invoke(argument) - @OptIn(ExperimentalCoroutinesApi::class) - private open inner class BasicReadOnlyDeviceProperty( - override val name: String, - default: MetaItem?, - override val descriptor: PropertyDescriptor, - private val getter: suspend (before: MetaItem?) -> MetaItem, - ) : ReadOnlyDeviceProperty { - - override val scope: CoroutineScope get() = this@DeviceBase.scope - - private val state: MutableStateFlow = MutableStateFlow(default) - override val value: MetaItem? get() = state.value - - override suspend fun invalidate() { - state.value = null - } - - override fun updateLogical(item: MetaItem) { - state.value = item - scope.launch { - sharedPropertyFlow.emit(Pair(name, item)) - } - } - - override suspend fun read(force: Boolean): MetaItem { - //backup current value - val currentValue = value - return if (force || currentValue == null) { - //all device operations should be run on device context - //propagate error, but do not fail scope - val res = withContext(scope.coroutineContext + SupervisorJob(scope.coroutineContext[Job])) { - getter(currentValue) - } - updateLogical(res) - res - } else { - currentValue - } - } - - override fun flow(): StateFlow = state - } - /** * Create a bound read-only property with given [getter] */ @@ -135,6 +177,7 @@ public abstract class DeviceBase(override val context: Context) : Device { getter: suspend (MetaItem?) -> MetaItem, ): ReadOnlyDeviceProperty { val property = BasicReadOnlyDeviceProperty( + this, name, default, PropertyDescriptor(name).apply(descriptorBuilder), @@ -144,43 +187,6 @@ public abstract class DeviceBase(override val context: Context) : Device { return property } - @OptIn(ExperimentalCoroutinesApi::class) - private inner class BasicDeviceProperty( - name: String, - default: MetaItem?, - descriptor: PropertyDescriptor, - getter: suspend (MetaItem?) -> MetaItem, - private val setter: suspend (oldValue: MetaItem?, newValue: MetaItem) -> MetaItem?, - ) : BasicReadOnlyDeviceProperty(name, default, descriptor, getter), DeviceProperty { - - override var value: MetaItem? - get() = super.value - set(value) { - scope.launch { - if (value == null) { - invalidate() - } else { - write(value) - } - } - } - - private val writeLock = Mutex() - - override suspend fun write(item: MetaItem) { - writeLock.withLock { - //fast return if value is not changed - if (item == value) return@withLock - val oldValue = value - //all device operations should be run on device context - withContext(scope.coroutineContext + SupervisorJob(scope.coroutineContext[Job])) { - setter(oldValue, item)?.let { - updateLogical(it) - } - } - } - } - } /** * Create a bound mutable property with given [getter] and [setter] @@ -193,6 +199,7 @@ public abstract class DeviceBase(override val context: Context) : Device { setter: suspend (oldValue: MetaItem?, newValue: MetaItem) -> MetaItem?, ): DeviceProperty { val property = BasicDeviceProperty( + this, name, default, PropertyDescriptor(name).apply(descriptorBuilder), diff --git a/controls-core/src/commonMain/kotlin/ru/mipt/npm/controls/controllers/DeviceManager.kt b/controls-core/src/commonMain/kotlin/ru/mipt/npm/controls/controllers/DeviceManager.kt index bae7d4a..9ffb735 100644 --- a/controls-core/src/commonMain/kotlin/ru/mipt/npm/controls/controllers/DeviceManager.kt +++ b/controls-core/src/commonMain/kotlin/ru/mipt/npm/controls/controllers/DeviceManager.kt @@ -40,18 +40,16 @@ public class DeviceManager(override val deviceName: String = "") : AbstractPlugi } } -public interface DeviceFactory : Factory +public interface DeviceSpec : Factory -public val Context.devices: DeviceManager get() = plugins.fetch(DeviceManager) - -public fun DeviceManager.install(name: String, factory: DeviceFactory, meta: Meta = Meta.EMPTY): D { +public fun DeviceManager.install(name: String, factory: DeviceSpec, meta: Meta = Meta.EMPTY): D { val device = factory(meta, context) registerDevice(NameToken(name), device) return device } public fun DeviceManager.installing( - factory: DeviceFactory, + factory: DeviceSpec, metaBuilder: MetaBuilder.() -> Unit = {}, ): ReadOnlyProperty = ReadOnlyProperty { _, property -> val name = property.name diff --git a/controls-server/build.gradle.kts b/controls-server/build.gradle.kts index ea1837d..cad4073 100644 --- a/controls-server/build.gradle.kts +++ b/controls-server/build.gradle.kts @@ -3,12 +3,18 @@ plugins { `maven-publish` } -val dataforgeVersion: String by rootProject.extra -val ktorVersion: String = "1.5.3" +description = """ + A stand-alone device tree web server which also works as magix event dispatcher. + The server is used to work with stand-alone devices without intermediate control system. +""".trimIndent() -dependencies{ +val dataforgeVersion: String by rootProject.extra +val ktorVersion: String = "1.5.3" + +dependencies { implementation(project(":controls-core")) implementation(project(":controls-tcp")) + implementation(projects.magix.magixServer) implementation("io.ktor:ktor-server-cio:$ktorVersion") implementation("io.ktor:ktor-websockets:$ktorVersion") implementation("io.ktor:ktor-serialization:$ktorVersion") diff --git a/controls-server/src/main/kotlin/ru/mipt/npm/controls/server/conversions.kt b/controls-server/src/main/kotlin/ru/mipt/npm/controls/server/conversions.kt index 522d81a..592c42f 100644 --- a/controls-server/src/main/kotlin/ru/mipt/npm/controls/server/conversions.kt +++ b/controls-server/src/main/kotlin/ru/mipt/npm/controls/server/conversions.kt @@ -4,13 +4,11 @@ import io.ktor.application.ApplicationCall import io.ktor.http.ContentType import io.ktor.http.cio.websocket.Frame import io.ktor.response.respondText -import kotlinx.serialization.json.Json import kotlinx.serialization.json.JsonObjectBuilder import kotlinx.serialization.json.buildJsonObject import ru.mipt.npm.controls.api.DeviceMessage -import ru.mipt.npm.controls.api.toMeta +import ru.mipt.npm.magix.api.MagixEndpoint import space.kscience.dataforge.io.* -import space.kscience.dataforge.meta.MetaSerializer internal fun Frame.toEnvelope(): Envelope { @@ -29,6 +27,7 @@ internal suspend fun ApplicationCall.respondJson(builder: JsonObjectBuilder.() - respondText(json.toString(), contentType = ContentType.Application.Json) } -public suspend fun ApplicationCall.respondMessage(message: DeviceMessage) { - respondText(Json.encodeToString(MetaSerializer, message.toMeta()), contentType = ContentType.Application.Json) -} \ No newline at end of file +public suspend fun ApplicationCall.respondMessage(message: DeviceMessage): Unit = respondText( + MagixEndpoint.magixJson.encodeToString(DeviceMessage.serializer(), message), + contentType = ContentType.Application.Json +) \ No newline at end of file diff --git a/controls-server/src/main/kotlin/ru/mipt/npm/controls/server/deviceWebServer.kt b/controls-server/src/main/kotlin/ru/mipt/npm/controls/server/deviceWebServer.kt index e02b82c..01e8faf 100644 --- a/controls-server/src/main/kotlin/ru/mipt/npm/controls/server/deviceWebServer.kt +++ b/controls-server/src/main/kotlin/ru/mipt/npm/controls/server/deviceWebServer.kt @@ -1,4 +1,3 @@ - package ru.mipt.npm.controls.server @@ -20,9 +19,9 @@ import io.ktor.server.engine.embeddedServer import io.ktor.util.getValue import io.ktor.websocket.WebSockets import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.html.* import kotlinx.serialization.json.Json -import kotlinx.serialization.json.JsonObject import kotlinx.serialization.json.buildJsonArray import kotlinx.serialization.json.put import ru.mipt.npm.controls.api.DeviceMessage @@ -31,8 +30,11 @@ import ru.mipt.npm.controls.api.PropertySetMessage import ru.mipt.npm.controls.api.getOrNull import ru.mipt.npm.controls.controllers.DeviceManager import ru.mipt.npm.controls.controllers.respondMessage +import ru.mipt.npm.magix.api.MagixEndpoint +import ru.mipt.npm.magix.server.GenericMagixMessage +import ru.mipt.npm.magix.server.magixModule +import ru.mipt.npm.magix.server.rawMagixServerSocket import space.kscience.dataforge.meta.toJson -import space.kscience.dataforge.meta.toMeta import space.kscience.dataforge.meta.toMetaItem /** @@ -40,7 +42,7 @@ import space.kscience.dataforge.meta.toMetaItem */ public fun CoroutineScope.startDeviceServer( manager: DeviceManager, - port: Int = 8111, + port: Int = MagixEndpoint.DEFAULT_MAGIX_HTTP_PORT, host: String = "localhost", ): ApplicationEngine { @@ -54,7 +56,7 @@ public fun CoroutineScope.startDeviceServer( call.respond(HttpStatusCode.BadRequest, cause.message ?: "") } } - deviceModule(manager) + deviceManagerModule(manager) routing { get("/") { call.respondRedirect("/dashboard") @@ -70,22 +72,13 @@ public fun ApplicationEngine.whenStarted(callback: Application.() -> Unit) { public const val WEB_SERVER_TARGET: String = "@webServer" -public fun Application.deviceModule( +public fun Application.deviceManagerModule( manager: DeviceManager, deviceNames: Collection = manager.devices.keys.map { it.toString() }, route: String = "/", + rawSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_RAW_PORT, + buffer: Int = 100, ) { -// val controllers = deviceNames.associateWith { name -> -// val device = manager.devices[name.toName()] -// DeviceController(device, name, manager.context) -// } -// -// fun generateFlow(target: String?) = if (target == null) { -// controllers.values.asFlow().flatMapMerge { it.output() } -// } else { -// controllers[target]?.output() ?: error("The device with target $target not found") -// } - if (featureOrNull(WebSockets) == null) { install(WebSockets) } @@ -159,32 +152,11 @@ public fun Application.deviceModule( } } } -// //Check if application supports websockets and if it does add a push channel -// if (this.application.featureOrNull(WebSockets) != null) { -// webSocket("ws") { -// //subscribe on device -// val target: String? by call.request.queryParameters -// -// try { -// application.log.debug("Opened server socket for ${call.request.queryParameters}") -// -// manager.controller.envelopeOutput().collect { -// outgoing.send(it.toFrame()) -// } -// -// } catch (ex: Exception) { -// application.log.debug("Closed server socket for ${call.request.queryParameters}") -// } -// } -// } post("message") { val body = call.receiveText() - val json = Json.parseToJsonElement(body) as? JsonObject - ?: throw IllegalArgumentException("The body is not a json object") - val meta = json.toMeta() - val request = DeviceMessage.fromMeta(meta) + val request: DeviceMessage = MagixEndpoint.magixJson.decodeFromString(DeviceMessage.serializer(), body) val response = manager.respondMessage(request) call.respondMessage(response) @@ -226,4 +198,12 @@ public fun Application.deviceModule( } } } + + val magixFlow = MutableSharedFlow( + buffer, + extraBufferCapacity = buffer + ) + + rawMagixServerSocket(magixFlow, rawSocketPort) + magixModule(magixFlow) } \ No newline at end of file diff --git a/demo/build.gradle.kts b/demo/build.gradle.kts index 826b89c..cdb9b7f 100644 --- a/demo/build.gradle.kts +++ b/demo/build.gradle.kts @@ -13,9 +13,11 @@ repositories{ } dependencies{ - implementation(project(":controls-core")) - implementation(project(":controls-server")) - implementation(project(":controls-magix-client")) + implementation(projects.controlsCore) + //implementation(projects.controlsServer) + implementation(projects.magix.magixServer) + implementation(projects.controlsMagixClient) + implementation(projects.magix.magixRsocket) implementation("no.tornado:tornadofx:1.7.20") implementation("space.kscience:plotlykt-server:0.4.2") implementation("com.github.Ricky12Awesome:json-schema-serialization:0.6.6") diff --git a/demo/src/main/kotlin/ru/mipt/npm/controls/demo/DemoControllerView.kt b/demo/src/main/kotlin/ru/mipt/npm/controls/demo/DemoControllerView.kt index 0bb2c60..c1390d0 100644 --- a/demo/src/main/kotlin/ru/mipt/npm/controls/demo/DemoControllerView.kt +++ b/demo/src/main/kotlin/ru/mipt/npm/controls/demo/DemoControllerView.kt @@ -6,10 +6,14 @@ import javafx.scene.control.Slider import javafx.scene.layout.Priority import javafx.stage.Stage import kotlinx.coroutines.launch -import space.kscience.dataforge.context.ContextAware -import space.kscience.dataforge.context.Global -import space.kscience.dataforge.context.info -import space.kscience.dataforge.context.logger +import ru.mipt.npm.controls.api.DeviceMessage +import ru.mipt.npm.controls.client.launchDfMagix +import ru.mipt.npm.controls.controllers.DeviceManager +import ru.mipt.npm.controls.controllers.install +import ru.mipt.npm.magix.api.MagixEndpoint +import ru.mipt.npm.magix.rsocket.rSocketWithTcp +import ru.mipt.npm.magix.server.startMagixServer +import space.kscience.dataforge.context.* import tornadofx.* import java.awt.Desktop import java.net.URI @@ -17,21 +21,32 @@ import java.net.URI class DemoController : Controller(), ContextAware { var device: DemoDevice? = null - var server: ApplicationEngine? = null - override val context = Global.buildContext("demoDevice") + var magixServer: ApplicationEngine? = null + var visualizer: ApplicationEngine? = null + + override val context = Context("demoDevice") { + plugin(DeviceManager) + } + + private val deviceManager = context.fetch(DeviceManager) fun init() { context.launch { - val demo = DemoDevice(context) - device = demo - server = startDemoDeviceServer(context, demo) + device = deviceManager.install("demo", DemoDevice) + //starting magix event loop + magixServer = startMagixServer() + //Launch device client and connect it to the server + deviceManager.launchDfMagix(MagixEndpoint.rSocketWithTcp("localhost", DeviceMessage.serializer())) + visualizer = startDemoDeviceServer() } } fun shutdown() { logger.info { "Shutting down..." } - server?.stop(1000, 5000) + visualizer?.stop(1000,5000) logger.info { "Visualization server stopped" } + magixServer?.stop(1000, 5000) + logger.info { "Magix server stopped" } device?.close() logger.info { "Device server stopped" } context.close() @@ -89,7 +104,7 @@ class DemoControllerView : View(title = " Demo controller remote") { button("Show plots") { useMaxWidth = true action { - controller.server?.run { + controller.magixServer?.run { val host = "localhost"//environment.connectors.first().host val port = environment.connectors.first().port val uri = URI("http", null, host, port, "/plots", null, null) diff --git a/demo/src/main/kotlin/ru/mipt/npm/controls/demo/DemoDevice.kt b/demo/src/main/kotlin/ru/mipt/npm/controls/demo/DemoDevice.kt index 4e6622f..484551c 100644 --- a/demo/src/main/kotlin/ru/mipt/npm/controls/demo/DemoDevice.kt +++ b/demo/src/main/kotlin/ru/mipt/npm/controls/demo/DemoDevice.kt @@ -4,9 +4,9 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job import kotlinx.coroutines.asCoroutineDispatcher import ru.mipt.npm.controls.base.* +import ru.mipt.npm.controls.controllers.DeviceSpec import ru.mipt.npm.controls.controllers.double import space.kscience.dataforge.context.Context -import space.kscience.dataforge.context.Factory import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.values.asValue import java.time.Instant @@ -30,7 +30,7 @@ class DemoDevice(context: Context) : DeviceBase(context) { val sinScale by writingVirtual(1.0.asValue()) var sinScaleValue by sinScale.double() - val sin by readingNumber { + val sin: TypedReadOnlyDeviceProperty by readingNumber { val time = Instant.now() sin(time.toEpochMilli().toDouble() / timeScaleValue) * sinScaleValue } @@ -67,7 +67,7 @@ class DemoDevice(context: Context) : DeviceBase(context) { executor.shutdown() } - companion object : Factory { + companion object : DeviceSpec { override fun invoke(meta: Meta, context: Context): DemoDevice = DemoDevice(context) } } \ No newline at end of file diff --git a/demo/src/main/kotlin/ru/mipt/npm/controls/demo/demoDeviceServer.kt b/demo/src/main/kotlin/ru/mipt/npm/controls/demo/demoDeviceServer.kt index 908e2a1..52ffeab 100644 --- a/demo/src/main/kotlin/ru/mipt/npm/controls/demo/demoDeviceServer.kt +++ b/demo/src/main/kotlin/ru/mipt/npm/controls/demo/demoDeviceServer.kt @@ -1,16 +1,18 @@ package ru.mipt.npm.controls.demo +import io.ktor.server.cio.CIO import io.ktor.server.engine.ApplicationEngine +import io.ktor.server.engine.embeddedServer import kotlinx.coroutines.flow.* import kotlinx.coroutines.launch import kotlinx.html.div import kotlinx.html.link -import ru.mipt.npm.controls.controllers.devices -import ru.mipt.npm.controls.server.startDeviceServer -import ru.mipt.npm.controls.server.whenStarted -import space.kscience.dataforge.context.Context +import ru.mipt.npm.controls.api.DeviceMessage +import ru.mipt.npm.controls.api.PropertyChangedMessage +import ru.mipt.npm.magix.api.MagixEndpoint +import ru.mipt.npm.magix.rsocket.rSocketWithWebSockets +import space.kscience.dataforge.meta.MetaItem import space.kscience.dataforge.meta.double -import space.kscience.dataforge.names.NameToken import space.kscience.plotly.layout import space.kscience.plotly.models.Trace import space.kscience.plotly.plot @@ -49,77 +51,85 @@ suspend fun Trace.updateXYFrom(flow: Flow>>) { } -fun startDemoDeviceServer(context: Context, device: DemoDevice): ApplicationEngine { - context.devices.registerDevice(NameToken("demo"), device) - val server = context.startDeviceServer(context.devices) - server.whenStarted { - plotlyModule("plots").apply { - updateMode = PlotlyUpdateMode.PUSH - updateInterval = 50 - }.page { container -> - val sinFlow = device.sin.flow() - val cosFlow = device.cos.flow() - val sinCosFlow = sinFlow.zip(cosFlow) { sin, cos -> - sin.double!! to cos.double!! - } - link { - rel = "stylesheet" - href = "https://stackpath.bootstrapcdn.com/bootstrap/4.5.0/css/bootstrap.min.css" - attributes["integrity"] = "sha384-9aIt2nRpC12Uk9gS9baDl411NQApFmC26EwAOH8WgZl5MYYxFfc+NcPb1dKGj7Sk" - attributes["crossorigin"] = "anonymous" - } - div("row") { - div("col-6") { - plot(renderer = container) { - layout { - title = "sin property" - xaxis.title = "point index" - yaxis.title = "sin" - } - trace { - context.launch { - val flow: Flow> = sinFlow.mapNotNull { it.double }.windowed(100) - updateFrom(Trace.Y_AXIS, flow) - } - } - } +suspend fun startDemoDeviceServer(magixHost: String = "localhost"): ApplicationEngine = embeddedServer(CIO, 8080) { + val sinFlow = MutableSharedFlow()// = device.sin.flow() + val cosFlow = MutableSharedFlow()// = device.cos.flow() + + launch { + val endpoint = MagixEndpoint.rSocketWithWebSockets(magixHost, DeviceMessage.serializer()) + endpoint.subscribe().collect { magix -> + (magix.payload as? PropertyChangedMessage)?.let { message -> + when (message.property) { + "sin" -> sinFlow.emit(message.value) + "cos" -> cosFlow.emit(message.value) } - div("col-6") { - plot(renderer = container) { - layout { - title = "cos property" - xaxis.title = "point index" - yaxis.title = "cos" - } - trace { - context.launch { - val flow: Flow> = cosFlow.mapNotNull { it.double }.windowed(100) - updateFrom(Trace.Y_AXIS, flow) - } + } + } + } + + plotlyModule("plots").apply { + updateMode = PlotlyUpdateMode.PUSH + updateInterval = 50 + }.page { container -> + val sinCosFlow = sinFlow.zip(cosFlow) { sin, cos -> + sin.double!! to cos.double!! + } + link { + rel = "stylesheet" + href = "https://stackpath.bootstrapcdn.com/bootstrap/4.5.0/css/bootstrap.min.css" + attributes["integrity"] = "sha384-9aIt2nRpC12Uk9gS9baDl411NQApFmC26EwAOH8WgZl5MYYxFfc+NcPb1dKGj7Sk" + attributes["crossorigin"] = "anonymous" + } + div("row") { + div("col-6") { + plot(renderer = container) { + layout { + title = "sin property" + xaxis.title = "point index" + yaxis.title = "sin" + } + trace { + launch { + val flow: Flow> = sinFlow.mapNotNull { it.double }.windowed(100) + updateFrom(Trace.Y_AXIS, flow) } } } } - div("row") { - div("col-12") { - plot(renderer = container) { - layout { - title = "cos vs sin" - xaxis.title = "sin" - yaxis.title = "cos" + div("col-6") { + plot(renderer = container) { + layout { + title = "cos property" + xaxis.title = "point index" + yaxis.title = "cos" + } + trace { + launch { + val flow: Flow> = cosFlow.mapNotNull { it.double }.windowed(100) + updateFrom(Trace.Y_AXIS, flow) } - trace { - name = "non-synchronized" - context.launch { - val flow: Flow>> = sinCosFlow.windowed(30) - updateXYFrom(flow) - } + } + } + } + } + div("row") { + div("col-12") { + plot(renderer = container) { + layout { + title = "cos vs sin" + xaxis.title = "sin" + yaxis.title = "cos" + } + trace { + name = "non-synchronized" + launch { + val flow: Flow>> = sinCosFlow.windowed(30) + updateXYFrom(flow) } } } } } } - return server } diff --git a/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixEndpoint.kt b/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixEndpoint.kt index b22a433..a266cd1 100644 --- a/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixEndpoint.kt +++ b/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixEndpoint.kt @@ -27,8 +27,16 @@ public interface MagixEndpoint { ) public companion object { - public const val DEFAULT_MAGIX_WS_PORT: Int = 7777 + /** + * A default port for HTTP/WS connections + */ + public const val DEFAULT_MAGIX_HTTP_PORT: Int = 7777 + + /** + * A default port for raw TCP connections + */ public const val DEFAULT_MAGIX_RAW_PORT: Int = 7778 + public val magixJson: Json = Json { ignoreUnknownKeys = true encodeDefaults = false diff --git a/magix/magix-java-client/src/main/java/ru/mipt/npm/magix/client/MagixClient.java b/magix/magix-java-client/src/main/java/ru/mipt/npm/magix/client/MagixClient.java index 4188622..50d6f09 100644 --- a/magix/magix-java-client/src/main/java/ru/mipt/npm/magix/client/MagixClient.java +++ b/magix/magix-java-client/src/main/java/ru/mipt/npm/magix/client/MagixClient.java @@ -16,10 +16,23 @@ public interface MagixClient { Flow.Publisher> subscribe(); + /** + * Create a magix endpoint client using RSocket with raw tcp connection + * @param host host name of magix server event loop + * @param port port of magix server event loop + * @return the client + */ static MagixClient rSocketTcp(String host, int port) { return ControlsMagixClient.Companion.rSocketTcp(host, port, JsonElement.Companion.serializer()); } + /** + * + * @param host host name of magix server event loop + * @param port port of magix server event loop + * @param path + * @return + */ static MagixClient rSocketWs(String host, int port, String path) { return ControlsMagixClient.Companion.rSocketWs(host, port, JsonElement.Companion.serializer(), path); } diff --git a/magix/magix-java-client/src/main/kotlin/ru/mipt/npm/magix/client/ControlsMagixClient.kt b/magix/magix-java-client/src/main/kotlin/ru/mipt/npm/magix/client/ControlsMagixClient.kt index 5bfbdd1..476a73d 100644 --- a/magix/magix-java-client/src/main/kotlin/ru/mipt/npm/magix/client/ControlsMagixClient.kt +++ b/magix/magix-java-client/src/main/kotlin/ru/mipt/npm/magix/client/ControlsMagixClient.kt @@ -6,11 +6,11 @@ import kotlinx.serialization.KSerializer import ru.mipt.npm.magix.api.MagixEndpoint import ru.mipt.npm.magix.api.MagixMessage import ru.mipt.npm.magix.api.MagixMessageFilter -import ru.mipt.npm.magix.rsocket.RSocketMagixEndpoint -import ru.mipt.npm.magix.rsocket.withTcp +import ru.mipt.npm.magix.rsocket.rSocketWithTcp +import ru.mipt.npm.magix.rsocket.rSocketWithWebSockets import java.util.concurrent.Flow -public class ControlsMagixClient( +internal class ControlsMagixClient( private val endpoint: MagixEndpoint, private val filter: MagixMessageFilter, ) : MagixClient { @@ -21,27 +21,27 @@ public class ControlsMagixClient( override fun subscribe(): Flow.Publisher> = endpoint.subscribe(filter).asPublisher() - public companion object { + companion object { - public fun rSocketTcp( + fun rSocketTcp( host: String, port: Int, payloadSerializer: KSerializer ): ControlsMagixClient { val endpoint = runBlocking { - RSocketMagixEndpoint.withTcp(host, port, payloadSerializer) + MagixEndpoint.rSocketWithTcp(host, payloadSerializer, port) } return ControlsMagixClient(endpoint, MagixMessageFilter()) } - public fun rSocketWs( + fun rSocketWs( host: String, port: Int, payloadSerializer: KSerializer, path: String = "/rsocket" ): ControlsMagixClient { val endpoint = runBlocking { - RSocketMagixEndpoint.withWebSockets(host, port, payloadSerializer, path) + MagixEndpoint.rSocketWithWebSockets(host, payloadSerializer, port, path) } return ControlsMagixClient(endpoint, MagixMessageFilter()) } diff --git a/magix/magix-rsocket/src/commonMain/kotlin/ru/mipt/npm/magix/rsocket/RSocketMagixEndpoint.kt b/magix/magix-rsocket/src/commonMain/kotlin/ru/mipt/npm/magix/rsocket/RSocketMagixEndpoint.kt index 8589a6a..c6d0277 100644 --- a/magix/magix-rsocket/src/commonMain/kotlin/ru/mipt/npm/magix/rsocket/RSocketMagixEndpoint.kt +++ b/magix/magix-rsocket/src/commonMain/kotlin/ru/mipt/npm/magix/rsocket/RSocketMagixEndpoint.kt @@ -43,39 +43,39 @@ public class RSocketMagixEndpoint( } } - public companion object { + public companion object +} - internal fun buildConnector(rSocketConfig: RSocketConnectorBuilder.ConnectionConfigBuilder.() -> Unit) = - RSocketConnector { - reconnectable(10) - connectionConfig(rSocketConfig) - } - /** - * Build a websocket based endpoint connected to [host], [port] and given routing [path] - */ - public suspend fun withWebSockets( - host: String, - port: Int, - payloadSerializer: KSerializer, - path: String = "/rsocket", - rSocketConfig: RSocketConnectorBuilder.ConnectionConfigBuilder.() -> Unit = {}, - ): RSocketMagixEndpoint { - val client = HttpClient { - install(WebSockets) - install(RSocketSupport) { - connector = buildConnector(rSocketConfig) - } - } +internal fun buildConnector(rSocketConfig: RSocketConnectorBuilder.ConnectionConfigBuilder.() -> Unit) = + RSocketConnector { + reconnectable(10) + connectionConfig(rSocketConfig) + } - val rSocket = client.rSocket(host, port, path) - - //Ensure client is closed after rSocket if finished - rSocket.job.invokeOnCompletion { - client.close() - } - - return RSocketMagixEndpoint(coroutineContext, payloadSerializer, rSocket) +/** + * Build a websocket based endpoint connected to [host], [port] and given routing [path] + */ +public suspend fun MagixEndpoint.Companion.rSocketWithWebSockets( + host: String, + payloadSerializer: KSerializer, + port: Int = DEFAULT_MAGIX_HTTP_PORT, + path: String = "/rsocket", + rSocketConfig: RSocketConnectorBuilder.ConnectionConfigBuilder.() -> Unit = {}, +): RSocketMagixEndpoint { + val client = HttpClient { + install(WebSockets) + install(RSocketSupport) { + connector = buildConnector(rSocketConfig) } } + + val rSocket = client.rSocket(host, port, path) + + //Ensure client is closed after rSocket if finished + rSocket.job.invokeOnCompletion { + client.close() + } + + return RSocketMagixEndpoint(coroutineContext, payloadSerializer, rSocket) } \ No newline at end of file diff --git a/magix/magix-rsocket/src/jvmMain/kotlin/ru/mipt/npm/magix/rsocket/withTcp.kt b/magix/magix-rsocket/src/jvmMain/kotlin/ru/mipt/npm/magix/rsocket/withTcp.kt index cd8f6b3..1efeb13 100644 --- a/magix/magix-rsocket/src/jvmMain/kotlin/ru/mipt/npm/magix/rsocket/withTcp.kt +++ b/magix/magix-rsocket/src/jvmMain/kotlin/ru/mipt/npm/magix/rsocket/withTcp.kt @@ -7,16 +7,17 @@ import io.rsocket.kotlin.core.RSocketConnectorBuilder import io.rsocket.kotlin.transport.ktor.clientTransport import kotlinx.coroutines.Dispatchers import kotlinx.serialization.KSerializer +import ru.mipt.npm.magix.api.MagixEndpoint import kotlin.coroutines.coroutineContext /** * Create a plain TCP based [RSocketMagixEndpoint] connected to [host] and [port] */ -public suspend fun RSocketMagixEndpoint.Companion.withTcp( +public suspend fun MagixEndpoint.Companion.rSocketWithTcp( host: String, - port: Int, payloadSerializer: KSerializer, + port: Int = DEFAULT_MAGIX_RAW_PORT, tcpConfig: SocketOptions.TCPClientSocketOptions.() -> Unit = {}, rSocketConfig: RSocketConnectorBuilder.ConnectionConfigBuilder.() -> Unit = {}, ): RSocketMagixEndpoint { diff --git a/magix/magix-server/build.gradle.kts b/magix/magix-server/build.gradle.kts index e9ffdb3..e4de48a 100644 --- a/magix/magix-server/build.gradle.kts +++ b/magix/magix-server/build.gradle.kts @@ -4,6 +4,10 @@ plugins { application } +description = """ + A magix event loop implementation in Kotlin. Includes HTTP/SSE and RSocket routes. +""".trimIndent() + kscience { useSerialization{ json() diff --git a/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/MagixServer.kt b/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/MagixServer.kt index dbe2369..0481521 100644 --- a/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/MagixServer.kt +++ b/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/MagixServer.kt @@ -9,12 +9,28 @@ import io.rsocket.kotlin.core.RSocketServer import io.rsocket.kotlin.transport.ktor.serverTransport import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job import kotlinx.coroutines.flow.MutableSharedFlow +import ru.mipt.npm.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_HTTP_PORT import ru.mipt.npm.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_RAW_PORT -import ru.mipt.npm.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_WS_PORT + +/** + * + */ +public fun CoroutineScope.rawMagixServerSocket( + magixFlow: MutableSharedFlow, + rawSocketPort: Int = DEFAULT_MAGIX_RAW_PORT +): Job { + val tcpTransport = aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().serverTransport(port = rawSocketPort) + val rSocketJob = RSocketServer().bind(tcpTransport, magixAcceptor(magixFlow)) + coroutineContext[Job]?.invokeOnCompletion{ + rSocketJob.cancel() + } + return rSocketJob; +} public fun CoroutineScope.startMagixServer( - port: Int = DEFAULT_MAGIX_WS_PORT, + port: Int = DEFAULT_MAGIX_HTTP_PORT, rawSocketPort: Int = DEFAULT_MAGIX_RAW_PORT, buffer: Int = 100, ): ApplicationEngine { @@ -24,8 +40,7 @@ public fun CoroutineScope.startMagixServer( extraBufferCapacity = buffer ) - val tcpTransport = aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().serverTransport(port = rawSocketPort) - RSocketServer().bind(tcpTransport, magixAcceptor(magixFlow)) + rawMagixServerSocket(magixFlow, rawSocketPort) return embeddedServer(CIO, port = port) { magixModule(magixFlow) diff --git a/magix/magix-zmq/src/main/kotlin/ru/mipt/npm/magix/zmq/ZmqMagixEndpoint.kt b/magix/magix-zmq/src/main/kotlin/ru/mipt/npm/magix/zmq/ZmqMagixEndpoint.kt index 8a272df..068d3d1 100644 --- a/magix/magix-zmq/src/main/kotlin/ru/mipt/npm/magix/zmq/ZmqMagixEndpoint.kt +++ b/magix/magix-zmq/src/main/kotlin/ru/mipt/npm/magix/zmq/ZmqMagixEndpoint.kt @@ -3,9 +3,9 @@ package ru.mipt.npm.magix.zmq import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.channelFlow +import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.withContext import kotlinx.serialization.KSerializer -import kotlinx.serialization.encodeToString import org.zeromq.SocketType import org.zeromq.ZContext import org.zeromq.ZMQ @@ -13,6 +13,7 @@ import org.zeromq.ZMQException import ru.mipt.npm.magix.api.MagixEndpoint import ru.mipt.npm.magix.api.MagixMessage import ru.mipt.npm.magix.api.MagixMessageFilter +import ru.mipt.npm.magix.api.filter import kotlin.coroutines.CoroutineContext public class ZmqMagixEndpoint( @@ -28,7 +29,7 @@ public class ZmqMagixEndpoint( val socket = zmqContext.createSocket(SocketType.XSUB) socket.bind(address) - val topic = MagixEndpoint.magixJson.encodeToString(filter) + val topic = "magix"//MagixEndpoint.magixJson.encodeToString(filter) socket.subscribe(topic) return channelFlow { @@ -39,6 +40,7 @@ public class ZmqMagixEndpoint( } while (activeFlag) { try { + //This is a blocking call. val string = socket.recvStr() val message = MagixEndpoint.magixJson.decodeFromString(serializer, string) send(message) @@ -51,7 +53,7 @@ public class ZmqMagixEndpoint( } } } - } + }.filter(filter).flowOn(Dispatchers.IO) //should be flown on IO because of blocking calls } private val publishSocket = zmqContext.createSocket(SocketType.XPUB).apply { diff --git a/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterDevice.kt b/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterDevice.kt index 56466b1..828f0bc 100644 --- a/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterDevice.kt +++ b/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterDevice.kt @@ -4,14 +4,13 @@ package ru.mipt.npm.devices.pimotionmaster import kotlinx.coroutines.* import kotlinx.coroutines.flow.first -import kotlinx.coroutines.flow.toList import kotlinx.coroutines.flow.transformWhile import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import ru.mipt.npm.controls.api.DeviceHub import ru.mipt.npm.controls.api.PropertyDescriptor import ru.mipt.npm.controls.base.* -import ru.mipt.npm.controls.controllers.DeviceFactory +import ru.mipt.npm.controls.controllers.DeviceSpec import ru.mipt.npm.controls.controllers.duration import ru.mipt.npm.controls.ports.* import space.kscience.dataforge.context.* @@ -343,7 +342,7 @@ class PiMotionMasterDevice( } } - companion object : DeviceFactory { + companion object : DeviceSpec { override fun invoke(meta: Meta, context: Context): PiMotionMasterDevice = PiMotionMasterDevice(context) }