diff --git a/build.gradle.kts b/build.gradle.kts index 7327460..172bdbc 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -6,7 +6,7 @@ plugins { id("space.kscience.gradle.project") } -val dataforgeVersion: String by extra("0.6.1-dev-4") +val dataforgeVersion: String by extra("0.6.1") val ktorVersion: String by extra(space.kscience.gradle.KScienceVersions.ktorVersion) val rsocketVersion by extra("0.15.4") val xodusVersion by extra("2.0.1") diff --git a/controls-core/src/commonMain/kotlin/space/kscience/controls/manager/DeviceManager.kt b/controls-core/src/commonMain/kotlin/space/kscience/controls/manager/DeviceManager.kt index c1a943a..871f236 100644 --- a/controls-core/src/commonMain/kotlin/space/kscience/controls/manager/DeviceManager.kt +++ b/controls-core/src/commonMain/kotlin/space/kscience/controls/manager/DeviceManager.kt @@ -11,7 +11,6 @@ import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.NameToken import kotlin.collections.set import kotlin.properties.ReadOnlyProperty -import kotlin.reflect.KClass /** * DataForge Context plugin that allows to manage devices locally @@ -33,7 +32,6 @@ public class DeviceManager : AbstractPlugin(), DeviceHub { public companion object : PluginFactory { override val tag: PluginTag = PluginTag("devices", group = PluginTag.DATAFORGE_GROUP) - override val type: KClass = DeviceManager::class override fun build(context: Context, meta: Meta): DeviceManager = DeviceManager() } diff --git a/controls-core/src/commonMain/kotlin/space/kscience/controls/ports/Ports.kt b/controls-core/src/commonMain/kotlin/space/kscience/controls/ports/Ports.kt index 6d5c6d9..3d01e62 100644 --- a/controls-core/src/commonMain/kotlin/space/kscience/controls/ports/Ports.kt +++ b/controls-core/src/commonMain/kotlin/space/kscience/controls/ports/Ports.kt @@ -3,7 +3,6 @@ package space.kscience.controls.ports import space.kscience.dataforge.context.* import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.string -import kotlin.reflect.KClass /** * A DataForge plugin for managing ports @@ -32,8 +31,6 @@ public class Ports : AbstractPlugin() { override val tag: PluginTag = PluginTag("controls.ports", group = PluginTag.DATAFORGE_GROUP) - override val type: KClass = Ports::class - override fun build(context: Context, meta: Meta): Ports = Ports() } diff --git a/controls-core/src/commonTest/kotlin/space/kscience/controls/api/MessageTest.kt b/controls-core/src/commonTest/kotlin/space/kscience/controls/api/MessageTest.kt new file mode 100644 index 0000000..719738a --- /dev/null +++ b/controls-core/src/commonTest/kotlin/space/kscience/controls/api/MessageTest.kt @@ -0,0 +1,18 @@ +package space.kscience.controls.api + +import kotlinx.serialization.decodeFromString +import kotlinx.serialization.encodeToString +import kotlinx.serialization.json.Json +import space.kscience.controls.spec.asMeta +import kotlin.test.Test +import kotlin.test.assertEquals + +class MessageTest { + @Test + fun messageSerialization() { + val changedMessage = PropertyChangedMessage("test", 22.0.asMeta()) + val json = Json.encodeToString(changedMessage) + val reconstructed: PropertyChangedMessage = Json.decodeFromString(json) + assertEquals(changedMessage.time, reconstructed.time) + } +} \ No newline at end of file diff --git a/controls-core/src/jvmMain/kotlin/space/kscience/controls/ports/TcpPortPlugin.kt b/controls-core/src/jvmMain/kotlin/space/kscience/controls/ports/TcpPortPlugin.kt index 23174af..592d0c3 100644 --- a/controls-core/src/jvmMain/kotlin/space/kscience/controls/ports/TcpPortPlugin.kt +++ b/controls-core/src/jvmMain/kotlin/space/kscience/controls/ports/TcpPortPlugin.kt @@ -6,7 +6,6 @@ import space.kscience.dataforge.context.PluginFactory import space.kscience.dataforge.context.PluginTag import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.names.Name -import kotlin.reflect.KClass public class TcpPortPlugin : AbstractPlugin() { @@ -21,8 +20,6 @@ public class TcpPortPlugin : AbstractPlugin() { override val tag: PluginTag = PluginTag("controls.ports.tcp", group = PluginTag.DATAFORGE_GROUP) - override val type: KClass = TcpPortPlugin::class - override fun build(context: Context, meta: Meta): TcpPortPlugin = TcpPortPlugin() } diff --git a/controls-ktor-tcp/src/main/kotlin/space/kscience/controls/ports/KtorTcpPortPlugin.kt b/controls-ktor-tcp/src/main/kotlin/space/kscience/controls/ports/KtorTcpPortPlugin.kt index 1256455..079f457 100644 --- a/controls-ktor-tcp/src/main/kotlin/space/kscience/controls/ports/KtorTcpPortPlugin.kt +++ b/controls-ktor-tcp/src/main/kotlin/space/kscience/controls/ports/KtorTcpPortPlugin.kt @@ -6,7 +6,6 @@ import space.kscience.dataforge.context.PluginFactory import space.kscience.dataforge.context.PluginTag import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.names.Name -import kotlin.reflect.KClass public class KtorTcpPortPlugin : AbstractPlugin() { @@ -21,8 +20,6 @@ public class KtorTcpPortPlugin : AbstractPlugin() { override val tag: PluginTag = PluginTag("controls.ports.serial", group = PluginTag.DATAFORGE_GROUP) - override val type: KClass = KtorTcpPortPlugin::class - override fun build(context: Context, meta: Meta): KtorTcpPortPlugin = KtorTcpPortPlugin() } diff --git a/controls-serial/src/main/kotlin/space/kscience/controls/serial/SerialPortPlugin.kt b/controls-serial/src/main/kotlin/space/kscience/controls/serial/SerialPortPlugin.kt index 871d266..6b573ae 100644 --- a/controls-serial/src/main/kotlin/space/kscience/controls/serial/SerialPortPlugin.kt +++ b/controls-serial/src/main/kotlin/space/kscience/controls/serial/SerialPortPlugin.kt @@ -7,7 +7,6 @@ import space.kscience.dataforge.context.PluginFactory import space.kscience.dataforge.context.PluginTag import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.names.Name -import kotlin.reflect.KClass public class SerialPortPlugin : AbstractPlugin() { @@ -22,8 +21,6 @@ public class SerialPortPlugin : AbstractPlugin() { override val tag: PluginTag = PluginTag("controls.ports.serial", group = PluginTag.DATAFORGE_GROUP) - override val type: KClass = SerialPortPlugin::class - override fun build(context: Context, meta: Meta): SerialPortPlugin = SerialPortPlugin() } diff --git a/demo/many-devices/build.gradle.kts b/demo/many-devices/build.gradle.kts index d360ffa..8c765d6 100644 --- a/demo/many-devices/build.gradle.kts +++ b/demo/many-devices/build.gradle.kts @@ -16,6 +16,7 @@ dependencies { implementation(projects.magix.magixServer) implementation(projects.controlsMagixClient) implementation(projects.magix.magixRsocket) + implementation(projects.magix.magixZmq) implementation("io.ktor:ktor-client-cio:$ktorVersion") implementation("space.kscience:plotlykt-server:0.5.3") @@ -35,5 +36,5 @@ tasks.withType().configureEach application { - mainClass.set("space.kscience.controls.demo.DemoControllerViewKt") + mainClass.set("space.kscience.controls.demo.MassDeviceKt") } \ No newline at end of file diff --git a/demo/many-devices/src/main/kotlin/space/kscience/controls/demo/MassDevice.kt b/demo/many-devices/src/main/kotlin/space/kscience/controls/demo/MassDevice.kt index 36e8253..6370d8d 100644 --- a/demo/many-devices/src/main/kotlin/space/kscience/controls/demo/MassDevice.kt +++ b/demo/many-devices/src/main/kotlin/space/kscience/controls/demo/MassDevice.kt @@ -21,14 +21,13 @@ import space.kscience.dataforge.meta.get import space.kscience.dataforge.meta.int import space.kscience.magix.api.MagixEndpoint import space.kscience.magix.api.subscribe -import space.kscience.magix.rsocket.rSocketWithTcp -import space.kscience.magix.rsocket.rSocketWithWebSockets +import space.kscience.magix.rsocket.rSocketStreamWithTcp import space.kscience.magix.server.RSocketMagixFlowPlugin import space.kscience.magix.server.startMagixServer import space.kscience.plotly.Plotly +import space.kscience.plotly.bar import space.kscience.plotly.layout import space.kscience.plotly.plot -import space.kscience.plotly.scatter import space.kscience.plotly.server.PlotlyUpdateMode import space.kscience.plotly.server.serve import space.kscience.plotly.server.show @@ -49,7 +48,7 @@ class MassDevice(context: Context, meta: Meta) : DeviceBySpec(MassDe val value by doubleProperty { randomValue } override suspend fun MassDevice.onOpen() { - doRecurring(2.milliseconds) { + doRecurring(100.milliseconds) { read(value) } } @@ -60,23 +59,25 @@ fun main() { val context = Context("Mass") context.startMagixServer( - RSocketMagixFlowPlugin() + RSocketMagixFlowPlugin(), +// ZmqMagixFlowPlugin() ) val numDevices = 100 - repeat(numDevices) { - val deviceContext = Context("Device${it}") { - plugin(DeviceManager) - } + context.launch(Dispatchers.IO) { + repeat(numDevices) { + val deviceContext = Context("Device${it}") { + plugin(DeviceManager) + } - val deviceManager = deviceContext.request(DeviceManager) + val deviceManager = deviceContext.request(DeviceManager) - deviceManager.install("device$it", MassDevice) + deviceManager.install("device$it", MassDevice) - deviceContext.launch(Dispatchers.Default) { - val deviceEndpoint = MagixEndpoint.rSocketWithWebSockets("localhost") - deviceManager.connectToMagix(deviceEndpoint, "device$it") + val endpointId = "device$it" + val deviceEndpoint = MagixEndpoint.rSocketStreamWithTcp("localhost") + deviceManager.connectToMagix(deviceEndpoint, endpointId) } } @@ -88,9 +89,9 @@ fun main() { layout { title = "Latest event" } - scatter { - launch(Dispatchers.Default) { - val monitorEndpoint = MagixEndpoint.rSocketWithTcp("localhost") + bar { + launch(Dispatchers.Default){ + val monitorEndpoint = MagixEndpoint.rSocketStreamWithTcp("localhost") val latest = ConcurrentHashMap() @@ -101,8 +102,9 @@ fun main() { while (isActive) { delay(200) val now = Clock.System.now() - x.strings = latest.keys - y.numbers = latest.values.map { now.minus(it).inWholeMilliseconds / 1000.0 } + val sorted = latest.mapKeys { it.key.substring(6).toInt() }.toSortedMap() + x.numbers = sorted.keys + y.numbers = sorted.values.map { now.minus(it).inWholeMilliseconds / 1000.0 } } } } diff --git a/docs/Device and DeviceSpec.md b/docs/Device and DeviceSpec.md index 37dbeda..65f6026 100644 --- a/docs/Device and DeviceSpec.md +++ b/docs/Device and DeviceSpec.md @@ -143,5 +143,5 @@ val res = device.read(DemoDevice.sin) ## Other ways to create a device -It is not obligatory to use `DeviceBySpec` to define a `Device`. One could directly implement the `Device` interface or use intermediate abstraction `DeviceBase`, which uses properties schema but allows to define it manually. +It is not obligatory to use `DeviceBySpec` to define a `Device`. One could directly implement the `Device` interface or use intermediate abstraction `DeviceBase`, which uses properties' schema but allows to define it manually. diff --git a/magix/magix-rsocket/src/commonMain/kotlin/space/kscience/magix/rsocket/RSocketStreamMagixEndpoint.kt b/magix/magix-rsocket/src/commonMain/kotlin/space/kscience/magix/rsocket/RSocketStreamMagixEndpoint.kt index b6fd428..e4eae79 100644 --- a/magix/magix-rsocket/src/commonMain/kotlin/space/kscience/magix/rsocket/RSocketStreamMagixEndpoint.kt +++ b/magix/magix-rsocket/src/commonMain/kotlin/space/kscience/magix/rsocket/RSocketStreamMagixEndpoint.kt @@ -26,7 +26,7 @@ import kotlin.coroutines.CoroutineContext import kotlin.coroutines.coroutineContext /** - * RSocket endpoint based on an established channel. This way it works a bit faster than [RSocketMagixEndpoint] + * RSocket endpoint based on an established channel. This way it works a lot faster than [RSocketMagixEndpoint] * for sending and receiving, but less flexible in terms of filters. One general [streamFilter] could be set * in constructor and applied on the loop side. Filters in [subscribe] are applied on the endpoint side on top * of received data. @@ -78,6 +78,7 @@ public suspend fun MagixEndpoint.Companion.rSocketStreamWithWebSockets( host: String, port: Int = DEFAULT_MAGIX_HTTP_PORT, path: String = "/rsocket", + filter: MagixMessageFilter = MagixMessageFilter.ALL, rSocketConfig: RSocketConnectorBuilder.ConnectionConfigBuilder.() -> Unit = {}, ): RSocketStreamMagixEndpoint { val client = HttpClient { @@ -89,10 +90,10 @@ public suspend fun MagixEndpoint.Companion.rSocketStreamWithWebSockets( val rSocket = client.rSocket(host, port, path) - //Ensure client is closed after rSocket if finished + //Ensure the client is closed after rSocket if finished rSocket.coroutineContext[Job]?.invokeOnCompletion { client.close() } - return RSocketStreamMagixEndpoint(rSocket, coroutineContext) + return RSocketStreamMagixEndpoint(rSocket, coroutineContext, filter) } \ No newline at end of file diff --git a/magix/magix-rsocket/src/jvmMain/kotlin/space/kscience/magix/rsocket/withTcp.kt b/magix/magix-rsocket/src/jvmMain/kotlin/space/kscience/magix/rsocket/withTcp.kt index 211ce68..9dc0abd 100644 --- a/magix/magix-rsocket/src/jvmMain/kotlin/space/kscience/magix/rsocket/withTcp.kt +++ b/magix/magix-rsocket/src/jvmMain/kotlin/space/kscience/magix/rsocket/withTcp.kt @@ -4,6 +4,7 @@ import io.ktor.network.sockets.SocketOptions import io.rsocket.kotlin.core.RSocketConnectorBuilder import io.rsocket.kotlin.transport.ktor.tcp.TcpClientTransport import space.kscience.magix.api.MagixEndpoint +import space.kscience.magix.api.MagixMessageFilter import kotlin.coroutines.coroutineContext @@ -30,6 +31,7 @@ public suspend fun MagixEndpoint.Companion.rSocketWithTcp( public suspend fun MagixEndpoint.Companion.rSocketStreamWithTcp( host: String, port: Int = DEFAULT_MAGIX_RAW_PORT, + filter: MagixMessageFilter = MagixMessageFilter.ALL, tcpConfig: SocketOptions.TCPClientSocketOptions.() -> Unit = {}, rSocketConfig: RSocketConnectorBuilder.ConnectionConfigBuilder.() -> Unit = {}, ): RSocketStreamMagixEndpoint { @@ -40,5 +42,5 @@ public suspend fun MagixEndpoint.Companion.rSocketStreamWithTcp( ) val rSocket = buildConnector(rSocketConfig).connect(transport) - return RSocketStreamMagixEndpoint(rSocket, coroutineContext) + return RSocketStreamMagixEndpoint(rSocket, coroutineContext, filter) } \ No newline at end of file diff --git a/magix/magix-rsocket/src/linuxX64Main/kotlin/rsocket/withTcp.kt b/magix/magix-rsocket/src/linuxX64Main/kotlin/rsocket/withTcp.kt index 117bcdd..efa8217 100644 --- a/magix/magix-rsocket/src/linuxX64Main/kotlin/rsocket/withTcp.kt +++ b/magix/magix-rsocket/src/linuxX64Main/kotlin/rsocket/withTcp.kt @@ -6,7 +6,6 @@ import io.rsocket.kotlin.transport.ktor.tcp.TcpClientTransport import space.kscience.magix.api.MagixEndpoint import space.kscience.magix.rsocket.RSocketMagixEndpoint import space.kscience.magix.rsocket.buildConnector -import kotlin.coroutines.coroutineContext /** @@ -25,5 +24,5 @@ public suspend fun MagixEndpoint.Companion.rSocketWithTcp( ) val rSocket = buildConnector(rSocketConfig).connect(transport) - return RSocketMagixEndpoint(rSocket, coroutineContext) + return RSocketMagixEndpoint(rSocket) } diff --git a/magix/magix-server/src/main/kotlin/space/kscience/magix/server/RSocketMagixFlowPlugin.kt b/magix/magix-server/src/main/kotlin/space/kscience/magix/server/RSocketMagixFlowPlugin.kt index 7d5a34e..3116513 100644 --- a/magix/magix-server/src/main/kotlin/space/kscience/magix/server/RSocketMagixFlowPlugin.kt +++ b/magix/magix-server/src/main/kotlin/space/kscience/magix/server/RSocketMagixFlowPlugin.kt @@ -49,7 +49,7 @@ public class RSocketMagixFlowPlugin(public val port: Int = DEFAULT_MAGIX_RAW_POR val message = MagixEndpoint.magixJson.decodeFromString(MagixMessage.serializer(), request.data.readText()) magixFlow.emit(message) } - // bidirectional connection, not covered by a standard + // bidirectional connection, used for streaming connection requestChannel { request: Payload, input: Flow -> input.onEach { magixFlow.emit(MagixEndpoint.magixJson.decodeFromString(MagixMessage.serializer(), it.data.readText())) diff --git a/magix/magix-zmq/src/main/kotlin/space/kscince/magix/zmq/ZmqMagixEndpoint.kt b/magix/magix-zmq/src/main/kotlin/space/kscince/magix/zmq/ZmqMagixEndpoint.kt index d8c8343..e878ce9 100644 --- a/magix/magix-zmq/src/main/kotlin/space/kscince/magix/zmq/ZmqMagixEndpoint.kt +++ b/magix/magix-zmq/src/main/kotlin/space/kscince/magix/zmq/ZmqMagixEndpoint.kt @@ -13,7 +13,6 @@ import space.kscience.magix.api.MagixMessage import space.kscience.magix.api.MagixMessageFilter import space.kscience.magix.api.filter import kotlin.coroutines.CoroutineContext -import kotlin.coroutines.coroutineContext public class ZmqMagixEndpoint( private val host: String, @@ -71,7 +70,8 @@ public class ZmqMagixEndpoint( } } -public suspend fun MagixEndpoint.Companion.zmq( +public fun MagixEndpoint.Companion.zmq( + scope: CoroutineScope, host: String, protocol: String = "tcp", pubPort: Int = DEFAULT_MAGIX_ZMQ_PUB_PORT, @@ -81,5 +81,5 @@ public suspend fun MagixEndpoint.Companion.zmq( protocol, pubPort, pullPort, - coroutineContext = coroutineContext + coroutineContext = scope.coroutineContext ) \ No newline at end of file diff --git a/magix/magix-zmq/src/main/kotlin/space/kscince/magix/zmq/ZmqMagixFlowPlugin.kt b/magix/magix-zmq/src/main/kotlin/space/kscince/magix/zmq/ZmqMagixFlowPlugin.kt index 1f075ac..e66d326 100644 --- a/magix/magix-zmq/src/main/kotlin/space/kscince/magix/zmq/ZmqMagixFlowPlugin.kt +++ b/magix/magix-zmq/src/main/kotlin/space/kscince/magix/zmq/ZmqMagixFlowPlugin.kt @@ -24,13 +24,13 @@ public class ZmqMagixFlowPlugin( val logger = LoggerFactory.getLogger("magix-server-zmq") ZContext().use { context -> - //launch publishing job + //launch the publishing job val pubSocket = context.createSocket(SocketType.PUB) pubSocket.bind("$localHost:$zmqPubSocketPort") magixFlow.onEach { message -> val string = MagixEndpoint.magixJson.encodeToString(message) pubSocket.send(string) - logger.debug("Published: $string") + logger.trace("Published: $string") }.launchIn(this) //launch pulling job @@ -41,7 +41,7 @@ public class ZmqMagixFlowPlugin( while (isActive) { val string: String? = pullSocket.recvStr() if (string != null) { - logger.debug("Received: $string") + logger.trace("Received: $string") val message = MagixEndpoint.magixJson.decodeFromString(string) magixFlow.emit(message) }