From 6e4bf51a6f9d78b71d03b869ba9433351c127c1a Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Sun, 20 Jun 2021 14:46:02 +0300 Subject: [PATCH] Move serializer to endpoint parameter --- build.gradle.kts | 17 ++--- .../mipt/npm/controls/client/magixClient.kt | 18 ++--- controls-magix-server/build.gradle.kts | 21 ------ demo/build.gradle.kts | 3 +- .../npm/controls/demo/DemoControllerView.kt | 2 +- .../ru/mipt/npm/controls/demo/DemoDevice.kt | 2 +- .../npm/controls/demo/demoDeviceServer.kt | 2 +- .../controls/demo/generateMessageSchema.kt | 2 +- .../hep/dataforge/magix/api/MagixEndpoint.kt | 44 ------------ .../hep/dataforge/magix/api/MagixProcessor.kt | 60 ----------------- .../ru/mipt/npm/magix/api/MagixEndpoint.kt | 67 +++++++++++++++++++ .../mipt/npm}/magix/api/MagixMessage.kt | 11 ++- .../mipt/npm}/magix/api/MagixMessageFilter.kt | 0 .../ru/mipt/npm/magix/api/converters.kt | 31 +++++++++ .../ru/mipt/npm/magix/client/MagixClient.java | 4 +- .../npm/magix/client/ControlsMagixClient.kt | 31 +++++---- .../mipt/npm}/magix/server/MagixServer.kt | 7 +- .../mipt/npm}/magix/server/magixModule.kt | 6 +- .../mipt/npm}/magix/server/sse.kt | 0 .../magix/service/RSocketMagixEndpoint.kt | 21 +++--- .../mipt/npm}/magix/service/withTcp.kt | 10 +-- settings.gradle.kts | 13 +--- 22 files changed, 172 insertions(+), 200 deletions(-) delete mode 100644 controls-magix-server/build.gradle.kts delete mode 100644 magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixEndpoint.kt delete mode 100644 magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixProcessor.kt create mode 100644 magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixEndpoint.kt rename magix/magix-api/src/commonMain/kotlin/{hep/dataforge => ru/mipt/npm}/magix/api/MagixMessage.kt (74%) rename magix/magix-api/src/commonMain/kotlin/{hep/dataforge => ru/mipt/npm}/magix/api/MagixMessageFilter.kt (100%) create mode 100644 magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/converters.kt rename magix/magix-server/src/main/kotlin/{hep/dataforge => ru/mipt/npm}/magix/server/MagixServer.kt (79%) rename magix/magix-server/src/main/kotlin/{hep/dataforge => ru/mipt/npm}/magix/server/magixModule.kt (96%) rename magix/magix-server/src/main/kotlin/{hep/dataforge => ru/mipt/npm}/magix/server/sse.kt (100%) rename magix/magix-service/src/commonMain/kotlin/{hep/dataforge => ru/mipt/npm}/magix/service/RSocketMagixEndpoint.kt (82%) rename magix/magix-service/src/jvmMain/kotlin/{hep/dataforge => ru/mipt/npm}/magix/service/withTcp.kt (76%) diff --git a/build.gradle.kts b/build.gradle.kts index de93677..173ddca 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -1,29 +1,22 @@ plugins { id("ru.mipt.npm.gradle.project") - kotlin("jvm") apply false - kotlin("js") apply false } -val dataforgeVersion: String by extra("0.4.0-dev-8") +val dataforgeVersion: String by extra("0.4.3") val ktorVersion: String by extra(ru.mipt.npm.gradle.KScienceVersions.ktorVersion) val rsocketVersion by extra("0.12.0") allprojects { - repositories { - mavenLocal() - //maven("http://maven.jzy3d.org/releases") - maven(url = "https://maven.pkg.jetbrains.space/public/p/kotlinx-html/maven") - maven("https://kotlin.bintray.com/js-externals") - maven("https://dl.bintray.com/rsocket-admin/RSocket") - //maven("https://maven.pkg.github.com/altavir/ktor-client-sse") - } - group = "ru.mipt.npm" version = "0.1.0" + repositories{ + jcenter() + } } ksciencePublish { github("controls.kt") + space() } apiValidation { diff --git a/controls-magix-client/src/commonMain/kotlin/ru/mipt/npm/controls/client/magixClient.kt b/controls-magix-client/src/commonMain/kotlin/ru/mipt/npm/controls/client/magixClient.kt index 93171a1..d18dffd 100644 --- a/controls-magix-client/src/commonMain/kotlin/ru/mipt/npm/controls/client/magixClient.kt +++ b/controls-magix-client/src/commonMain/kotlin/ru/mipt/npm/controls/client/magixClient.kt @@ -5,14 +5,13 @@ import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch +import ru.mipt.npm.magix.api.MagixEndpoint import space.kscience.dataforge.context.error import space.kscience.dataforge.context.logger import space.kscience.dataforge.control.controllers.DeviceManager import space.kscience.dataforge.control.controllers.respondMessage import space.kscience.dataforge.control.messages.DeviceMessage -import space.kscience.dataforge.magix.api.MagixEndpoint import space.kscience.dataforge.magix.api.MagixMessage -import space.kscience.dataforge.magix.api.MagixProcessor public const val DATAFORGE_MAGIX_FORMAT: String = "dataforge" @@ -27,10 +26,10 @@ private fun generateId(request: MagixMessage): String = if (reque * Communicate with server in [Magix format](https://github.com/waltz-controls/rfc/tree/master/1) */ public fun DeviceManager.launchMagixClient( - endpoint: MagixEndpoint, + endpoint: MagixEndpoint, endpointID: String = DATAFORGE_MAGIX_FORMAT, ): Job = context.launch { - endpoint.subscribe(DeviceMessage.serializer()).onEach { request -> + endpoint.subscribe().onEach { request -> //TODO analyze action val responsePayload = respondMessage(request.payload) @@ -41,9 +40,9 @@ public fun DeviceManager.launchMagixClient( origin = endpointID, payload = responsePayload ) - endpoint.broadcast(DeviceMessage.serializer(), response) + endpoint.broadcast(response) }.catch { error -> - logger.error(error){"Error while responding to message"} + logger.error(error) { "Error while responding to message" } }.launchIn(this) controller.messageOutput().onEach { payload -> @@ -54,13 +53,8 @@ public fun DeviceManager.launchMagixClient( payload = payload ) }.catch { error -> - logger.error(error){"Error while sending a message"} + logger.error(error) { "Error while sending a message" } }.launchIn(this) } -public fun DeviceManager.asMagixProcessor(endpointID: String = "dataforge"): MagixProcessor = object : MagixProcessor { - override fun process(endpoint: MagixEndpoint): Job = launchMagixClient(endpoint, endpointID) - -} - diff --git a/controls-magix-server/build.gradle.kts b/controls-magix-server/build.gradle.kts deleted file mode 100644 index c9963fc..0000000 --- a/controls-magix-server/build.gradle.kts +++ /dev/null @@ -1,21 +0,0 @@ -plugins { - id("ru.mipt.npm.gradle.mpp") - `maven-publish` -} - -kscience{ - useSerialization { - json() - } -} - -kotlin { - sourceSets { - commonMain { - dependencies { - implementation(project(":magix:magix-server")) - implementation(project(":controls-core")) - } - } - } -} \ No newline at end of file diff --git a/demo/build.gradle.kts b/demo/build.gradle.kts index 494d0f3..826b89c 100644 --- a/demo/build.gradle.kts +++ b/demo/build.gradle.kts @@ -17,13 +17,14 @@ dependencies{ implementation(project(":controls-server")) implementation(project(":controls-magix-client")) implementation("no.tornado:tornadofx:1.7.20") - implementation("space.kscience:plotlykt-server:0.4.0-dev-2") + implementation("space.kscience:plotlykt-server:0.4.2") implementation("com.github.Ricky12Awesome:json-schema-serialization:0.6.6") } tasks.withType().configureEach { kotlinOptions { jvmTarget = "11" + freeCompilerArgs = freeCompilerArgs + "-Xjvm-default=all" } } diff --git a/demo/src/main/kotlin/ru/mipt/npm/controls/demo/DemoControllerView.kt b/demo/src/main/kotlin/ru/mipt/npm/controls/demo/DemoControllerView.kt index 1670610..0bb2c60 100644 --- a/demo/src/main/kotlin/ru/mipt/npm/controls/demo/DemoControllerView.kt +++ b/demo/src/main/kotlin/ru/mipt/npm/controls/demo/DemoControllerView.kt @@ -1,4 +1,4 @@ -package space.kscience.dataforge.control.demo +package ru.mipt.npm.controls.demo import io.ktor.server.engine.ApplicationEngine import javafx.scene.Parent diff --git a/demo/src/main/kotlin/ru/mipt/npm/controls/demo/DemoDevice.kt b/demo/src/main/kotlin/ru/mipt/npm/controls/demo/DemoDevice.kt index 9c2d27e..ab485de 100644 --- a/demo/src/main/kotlin/ru/mipt/npm/controls/demo/DemoDevice.kt +++ b/demo/src/main/kotlin/ru/mipt/npm/controls/demo/DemoDevice.kt @@ -1,4 +1,4 @@ -package space.kscience.dataforge.control.demo +package ru.mipt.npm.controls.demo import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job diff --git a/demo/src/main/kotlin/ru/mipt/npm/controls/demo/demoDeviceServer.kt b/demo/src/main/kotlin/ru/mipt/npm/controls/demo/demoDeviceServer.kt index 476628a..69866d6 100644 --- a/demo/src/main/kotlin/ru/mipt/npm/controls/demo/demoDeviceServer.kt +++ b/demo/src/main/kotlin/ru/mipt/npm/controls/demo/demoDeviceServer.kt @@ -1,4 +1,4 @@ -package space.kscience.dataforge.control.demo +package ru.mipt.npm.controls.demo import io.ktor.server.engine.ApplicationEngine import kotlinx.coroutines.flow.* diff --git a/demo/src/main/kotlin/ru/mipt/npm/controls/demo/generateMessageSchema.kt b/demo/src/main/kotlin/ru/mipt/npm/controls/demo/generateMessageSchema.kt index bb84fe3..65faef3 100644 --- a/demo/src/main/kotlin/ru/mipt/npm/controls/demo/generateMessageSchema.kt +++ b/demo/src/main/kotlin/ru/mipt/npm/controls/demo/generateMessageSchema.kt @@ -1,4 +1,4 @@ -package space.kscience.dataforge.control.demo +package ru.mipt.npm.controls.demo import com.github.ricky12awesome.jss.encodeToSchema import com.github.ricky12awesome.jss.globalJson diff --git a/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixEndpoint.kt b/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixEndpoint.kt deleted file mode 100644 index 39b145a..0000000 --- a/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixEndpoint.kt +++ /dev/null @@ -1,44 +0,0 @@ -package space.kscience.dataforge.magix.api - -import kotlinx.coroutines.flow.Flow -import kotlinx.serialization.KSerializer -import kotlinx.serialization.json.Json -import kotlinx.serialization.json.JsonElement - -/** - * Inwards API of magix endpoint used to build services - */ -public interface MagixEndpoint { - /** - * Subscribe to a [Flow] of messages using specific [payloadSerializer] - */ - public fun subscribe( - payloadSerializer: KSerializer, - filter: MagixMessageFilter = MagixMessageFilter.ALL, - ): Flow> - - - /** - * Send an event using specific [payloadSerializer] - */ - public suspend fun broadcast( - payloadSerializer: KSerializer, - message: MagixMessage, - ) - - public companion object { - public const val DEFAULT_MAGIX_WS_PORT: Int = 7777 - public const val DEFAULT_MAGIX_RAW_PORT: Int = 7778 - public val magixJson: Json = Json{ - ignoreUnknownKeys = true - encodeDefaults = false - } - } -} - -public fun MagixEndpoint.subscribe( - filter: MagixMessageFilter = MagixMessageFilter.ALL, -): Flow> = subscribe(JsonElement.serializer(),filter) - -public suspend fun MagixEndpoint.broadcast(message: MagixMessage): Unit = - broadcast(JsonElement.serializer(), message) \ No newline at end of file diff --git a/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixProcessor.kt b/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixProcessor.kt deleted file mode 100644 index 49e5503..0000000 --- a/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixProcessor.kt +++ /dev/null @@ -1,60 +0,0 @@ -package space.kscience.dataforge.magix.api - -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Job -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach -import kotlinx.serialization.KSerializer -import kotlinx.serialization.json.JsonElement - -public fun interface MagixProcessor { - public fun process(endpoint: MagixEndpoint): Job - - public companion object { - /** - * A converter from one (or several) format to another. It captures all events with the given filter then transforms it - * with given [transformer] and sends back to the loop with given [outputFormat]. - * - * If [newOrigin] is not null, it is used as a replacement for old [MagixMessage.origin] tag. - */ - public fun convert( - scope: CoroutineScope, - filter: MagixMessageFilter, - outputFormat: String, - inputSerializer: KSerializer, - outputSerializer: KSerializer, - newOrigin: String? = null, - transformer: suspend (T) -> R, - ): MagixProcessor = MagixProcessor { endpoint -> - endpoint.subscribe(inputSerializer, filter).onEach { message -> - val newPayload = transformer(message.payload) - val transformed: MagixMessage = MagixMessage( - outputFormat, - newOrigin ?: message.origin, - newPayload, - message.target, - message.id, - message.parentId, - message.user - ) - endpoint.broadcast(outputSerializer, transformed) - }.launchIn(scope) - } - } - - public fun convert( - scope: CoroutineScope, - filter: MagixMessageFilter, - outputFormat: String, - newOrigin: String? = null, - transformer: suspend (JsonElement) -> JsonElement, - ): MagixProcessor = convert( - scope, - filter, - outputFormat, - JsonElement.serializer(), - JsonElement.serializer(), - newOrigin, - transformer - ) -} diff --git a/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixEndpoint.kt b/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixEndpoint.kt new file mode 100644 index 0000000..b1bdfd4 --- /dev/null +++ b/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixEndpoint.kt @@ -0,0 +1,67 @@ +package ru.mipt.npm.magix.api + +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.map +import kotlinx.serialization.KSerializer +import kotlinx.serialization.json.Json +import kotlinx.serialization.json.JsonElement +import space.kscience.dataforge.magix.api.MagixMessage +import space.kscience.dataforge.magix.api.MagixMessageFilter +import space.kscience.dataforge.magix.api.replacePayload + +/** + * Inwards API of magix endpoint used to build services + */ +public interface MagixEndpoint { + + /** + * Subscribe to a [Flow] of messages + */ + public fun subscribe( + filter: MagixMessageFilter = MagixMessageFilter.ALL, + ): Flow> + + + /** + * Send an event + */ + public suspend fun broadcast( + message: MagixMessage, + ) + + public companion object { + public const val DEFAULT_MAGIX_WS_PORT: Int = 7777 + public const val DEFAULT_MAGIX_RAW_PORT: Int = 7778 + public val magixJson: Json = Json { + ignoreUnknownKeys = true + encodeDefaults = false + } + } +} + +/** + * Specialize this raw json endpoint to use specific serializer + */ +public fun MagixEndpoint.specialize( + payloadSerializer: KSerializer +): MagixEndpoint = object : MagixEndpoint { + override fun subscribe( + filter: MagixMessageFilter + ): Flow> = this@specialize.subscribe(filter).map { message -> + message.replacePayload { payload -> + MagixEndpoint.magixJson.decodeFromJsonElement(payloadSerializer, payload) + } + } + + override suspend fun broadcast(message: MagixMessage) { + this@specialize.broadcast( + message.replacePayload { payload -> + MagixEndpoint.magixJson.encodeToJsonElement( + payloadSerializer, + payload + ) + } + ) + } + +} \ No newline at end of file diff --git a/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixMessage.kt b/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixMessage.kt similarity index 74% rename from magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixMessage.kt rename to magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixMessage.kt index e6672c3..bfc3704 100644 --- a/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixMessage.kt +++ b/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixMessage.kt @@ -25,8 +25,15 @@ public data class MagixMessage( val origin: String, val payload: T, val target: String? = null, - val id: String? = null, + val id: String? = null, val parentId: String? = null, val user: JsonElement? = null, val action: String? = null -) \ No newline at end of file +) + +/** + * Create message with same field but replaced payload + */ +@Suppress("UNCHECKED_CAST") +public fun MagixMessage.replacePayload(payloadTransform: (T) -> R): MagixMessage = + MagixMessage(format, origin, payloadTransform(payload), target, id, parentId, user, action) \ No newline at end of file diff --git a/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixMessageFilter.kt b/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixMessageFilter.kt similarity index 100% rename from magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixMessageFilter.kt rename to magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixMessageFilter.kt diff --git a/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/converters.kt b/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/converters.kt new file mode 100644 index 0000000..e362826 --- /dev/null +++ b/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/converters.kt @@ -0,0 +1,31 @@ +package space.kscience.dataforge.magix.api + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import ru.mipt.npm.magix.api.MagixEndpoint + +/** + * Launch magix message converter service + */ +public fun CoroutineScope.launchMagixConverter( + inputEndpoint: MagixEndpoint, + outputEndpoint: MagixEndpoint, + filter: MagixMessageFilter, + outputFormat: String, + newOrigin: String? = null, + transformer: suspend (T) -> R, +): Job = inputEndpoint.subscribe(filter).onEach { message-> + val newPayload = transformer(message.payload) + val transformed: MagixMessage = MagixMessage( + outputFormat, + newOrigin ?: message.origin, + newPayload, + message.target, + message.id, + message.parentId, + message.user + ) + outputEndpoint.broadcast(transformed) +}.launchIn(this) diff --git a/magix/magix-java-client/src/main/java/ru/mipt/npm/magix/client/MagixClient.java b/magix/magix-java-client/src/main/java/ru/mipt/npm/magix/client/MagixClient.java index 7844dde..4e2a2e3 100644 --- a/magix/magix-java-client/src/main/java/ru/mipt/npm/magix/client/MagixClient.java +++ b/magix/magix-java-client/src/main/java/ru/mipt/npm/magix/client/MagixClient.java @@ -17,10 +17,10 @@ public interface MagixClient { Flow.Publisher> subscribe(); static MagixClient rSocketTcp(String host, int port) { - return ControlsMagixClient.Companion.rSocketTcp(host, port); + return ControlsMagixClient.Companion.rSocketTcp(host, port, JsonElement.Companion.serializer()); } static MagixClient rSocketWs(String host, int port, String path) { - return ControlsMagixClient.Companion.rSocketWs(host, port, path); + return ControlsMagixClient.Companion.rSocketWs(host, port, JsonElement.Companion.serializer(), path); } } diff --git a/magix/magix-java-client/src/main/kotlin/ru/mipt/npm/magix/client/ControlsMagixClient.kt b/magix/magix-java-client/src/main/kotlin/ru/mipt/npm/magix/client/ControlsMagixClient.kt index 05cd0f2..77db813 100644 --- a/magix/magix-java-client/src/main/kotlin/ru/mipt/npm/magix/client/ControlsMagixClient.kt +++ b/magix/magix-java-client/src/main/kotlin/ru/mipt/npm/magix/client/ControlsMagixClient.kt @@ -3,8 +3,7 @@ package ru.mipt.npm.magix.client import kotlinx.coroutines.jdk9.asPublisher import kotlinx.coroutines.runBlocking import kotlinx.serialization.KSerializer -import kotlinx.serialization.json.JsonElement -import space.kscience.dataforge.magix.api.MagixEndpoint +import ru.mipt.npm.magix.api.MagixEndpoint import space.kscience.dataforge.magix.api.MagixMessage import space.kscience.dataforge.magix.api.MagixMessageFilter import space.kscience.dataforge.magix.service.RSocketMagixEndpoint @@ -12,31 +11,39 @@ import space.kscience.dataforge.magix.service.withTcp import java.util.concurrent.Flow public class ControlsMagixClient( - private val endpoint: MagixEndpoint, + private val endpoint: MagixEndpoint, private val filter: MagixMessageFilter, - private val serializer: KSerializer, ) : MagixClient { override fun broadcast(msg: MagixMessage): Unit = runBlocking { - endpoint.broadcast(serializer, msg) + endpoint.broadcast(msg) } - override fun subscribe(): Flow.Publisher> = endpoint.subscribe(serializer, filter).asPublisher() + override fun subscribe(): Flow.Publisher> = endpoint.subscribe(filter).asPublisher() public companion object { - public fun rSocketTcp(host: String, port: Int): ControlsMagixClient { + public fun rSocketTcp( + host: String, + port: Int, + payloadSerializer: KSerializer + ): ControlsMagixClient { val endpoint = runBlocking { - RSocketMagixEndpoint.withTcp(host, port) + RSocketMagixEndpoint.withTcp(host, port, payloadSerializer) } - return ControlsMagixClient(endpoint, MagixMessageFilter(), JsonElement.serializer()) + return ControlsMagixClient(endpoint, MagixMessageFilter()) } - public fun rSocketWs(host: String, port: Int, path: String = "/rsocket"): ControlsMagixClient { + public fun rSocketWs( + host: String, + port: Int, + payloadSerializer: KSerializer, + path: String = "/rsocket" + ): ControlsMagixClient { val endpoint = runBlocking { - RSocketMagixEndpoint.withWebSockets(host, port, path) + RSocketMagixEndpoint.withWebSockets(host, port, payloadSerializer, path) } - return ControlsMagixClient(endpoint, MagixMessageFilter(), JsonElement.serializer()) + return ControlsMagixClient(endpoint, MagixMessageFilter()) } } } \ No newline at end of file diff --git a/magix/magix-server/src/main/kotlin/hep/dataforge/magix/server/MagixServer.kt b/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/MagixServer.kt similarity index 79% rename from magix/magix-server/src/main/kotlin/hep/dataforge/magix/server/MagixServer.kt rename to magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/MagixServer.kt index bd7514f..483e994 100644 --- a/magix/magix-server/src/main/kotlin/hep/dataforge/magix/server/MagixServer.kt +++ b/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/MagixServer.kt @@ -11,8 +11,11 @@ import io.rsocket.kotlin.transport.ktor.serverTransport import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.flow.MutableSharedFlow -import space.kscience.dataforge.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_RAW_PORT -import space.kscience.dataforge.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_WS_PORT +import ru.mipt.npm.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_RAW_PORT +import ru.mipt.npm.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_WS_PORT +import ru.mipt.npm.magix.server.GenericMagixMessage +import ru.mipt.npm.magix.server.magixAcceptor +import ru.mipt.npm.magix.server.magixModule @OptIn(KtorExperimentalAPI::class) public fun CoroutineScope.startMagixServer( diff --git a/magix/magix-server/src/main/kotlin/hep/dataforge/magix/server/magixModule.kt b/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/magixModule.kt similarity index 96% rename from magix/magix-server/src/main/kotlin/hep/dataforge/magix/server/magixModule.kt rename to magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/magixModule.kt index b134a2d..a21e356 100644 --- a/magix/magix-server/src/main/kotlin/hep/dataforge/magix/server/magixModule.kt +++ b/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/magixModule.kt @@ -1,4 +1,4 @@ -package space.kscience.dataforge.magix.server +package ru.mipt.npm.magix.server import io.ktor.application.* import io.ktor.features.CORS @@ -25,10 +25,12 @@ import kotlinx.coroutines.flow.* import kotlinx.html.* import kotlinx.serialization.KSerializer import kotlinx.serialization.json.JsonElement -import space.kscience.dataforge.magix.api.MagixEndpoint.Companion.magixJson +import ru.mipt.npm.magix.api.MagixEndpoint.Companion.magixJson import space.kscience.dataforge.magix.api.MagixMessage import space.kscience.dataforge.magix.api.MagixMessageFilter import space.kscience.dataforge.magix.api.filter +import space.kscience.dataforge.magix.server.SseEvent +import space.kscience.dataforge.magix.server.respondSse import java.util.* public typealias GenericMagixMessage = MagixMessage diff --git a/magix/magix-server/src/main/kotlin/hep/dataforge/magix/server/sse.kt b/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/sse.kt similarity index 100% rename from magix/magix-server/src/main/kotlin/hep/dataforge/magix/server/sse.kt rename to magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/sse.kt diff --git a/magix/magix-service/src/commonMain/kotlin/hep/dataforge/magix/service/RSocketMagixEndpoint.kt b/magix/magix-service/src/commonMain/kotlin/ru/mipt/npm/magix/service/RSocketMagixEndpoint.kt similarity index 82% rename from magix/magix-service/src/commonMain/kotlin/hep/dataforge/magix/service/RSocketMagixEndpoint.kt rename to magix/magix-service/src/commonMain/kotlin/ru/mipt/npm/magix/service/RSocketMagixEndpoint.kt index 1f54a31..d8aab72 100644 --- a/magix/magix-service/src/commonMain/kotlin/hep/dataforge/magix/service/RSocketMagixEndpoint.kt +++ b/magix/magix-service/src/commonMain/kotlin/ru/mipt/npm/magix/service/RSocketMagixEndpoint.kt @@ -2,7 +2,6 @@ package space.kscience.dataforge.magix.service import io.ktor.client.HttpClient import io.ktor.client.features.websocket.WebSockets -import io.ktor.util.KtorExperimentalAPI import io.rsocket.kotlin.RSocket import io.rsocket.kotlin.core.RSocketConnector import io.rsocket.kotlin.core.RSocketConnectorBuilder @@ -15,19 +14,19 @@ import kotlinx.coroutines.flow.map import kotlinx.coroutines.withContext import kotlinx.serialization.KSerializer import kotlinx.serialization.encodeToString -import space.kscience.dataforge.magix.api.MagixEndpoint +import ru.mipt.npm.magix.api.MagixEndpoint import space.kscience.dataforge.magix.api.MagixMessage import space.kscience.dataforge.magix.api.MagixMessageFilter import kotlin.coroutines.CoroutineContext import kotlin.coroutines.coroutineContext -public class RSocketMagixEndpoint( +public class RSocketMagixEndpoint( private val coroutineContext: CoroutineContext, + private val payloadSerializer: KSerializer, private val rSocket: RSocket, -) : MagixEndpoint { +) : MagixEndpoint { - override fun subscribe( - payloadSerializer: KSerializer, + override fun subscribe( filter: MagixMessageFilter, ): Flow> { val serializer = MagixMessage.serializer(payloadSerializer) @@ -36,7 +35,7 @@ public class RSocketMagixEndpoint( return flow.map { MagixEndpoint.magixJson.decodeFromString(serializer, it.data.readText()) } } - override suspend fun broadcast(payloadSerializer: KSerializer, message: MagixMessage) { + override suspend fun broadcast(message: MagixMessage) { withContext(coroutineContext) { val serializer = MagixMessage.serializer(payloadSerializer) val payload = buildPayload { data(MagixEndpoint.magixJson.encodeToString(serializer, message)) } @@ -52,13 +51,13 @@ public class RSocketMagixEndpoint( connectionConfig(rSocketConfig) } - @OptIn(KtorExperimentalAPI::class) - public suspend fun withWebSockets( + public suspend fun withWebSockets( host: String, port: Int, + payloadSerializer: KSerializer, path: String = "/rsocket", rSocketConfig: RSocketConnectorBuilder.ConnectionConfigBuilder.() -> Unit = {}, - ): RSocketMagixEndpoint { + ): RSocketMagixEndpoint { val client = HttpClient { install(WebSockets) install(RSocketSupport) { @@ -73,7 +72,7 @@ public class RSocketMagixEndpoint( client.close() } - return RSocketMagixEndpoint(coroutineContext, rSocket) + return RSocketMagixEndpoint(coroutineContext, payloadSerializer, rSocket) } } } \ No newline at end of file diff --git a/magix/magix-service/src/jvmMain/kotlin/hep/dataforge/magix/service/withTcp.kt b/magix/magix-service/src/jvmMain/kotlin/ru/mipt/npm/magix/service/withTcp.kt similarity index 76% rename from magix/magix-service/src/jvmMain/kotlin/hep/dataforge/magix/service/withTcp.kt rename to magix/magix-service/src/jvmMain/kotlin/ru/mipt/npm/magix/service/withTcp.kt index 69a0afd..63f37e4 100644 --- a/magix/magix-service/src/jvmMain/kotlin/hep/dataforge/magix/service/withTcp.kt +++ b/magix/magix-service/src/jvmMain/kotlin/ru/mipt/npm/magix/service/withTcp.kt @@ -3,25 +3,25 @@ package space.kscience.dataforge.magix.service import io.ktor.network.selector.ActorSelectorManager import io.ktor.network.sockets.SocketOptions import io.ktor.network.sockets.aSocket -import io.ktor.util.KtorExperimentalAPI import io.rsocket.kotlin.core.RSocketConnectorBuilder import io.rsocket.kotlin.transport.ktor.clientTransport import kotlinx.coroutines.Dispatchers +import kotlinx.serialization.KSerializer import kotlin.coroutines.coroutineContext /** * Create a plain TCP based [RSocketMagixEndpoint] */ -@OptIn(KtorExperimentalAPI::class) -public suspend fun RSocketMagixEndpoint.Companion.withTcp( +public suspend fun RSocketMagixEndpoint.Companion.withTcp( host: String, port: Int, + payloadSerializer: KSerializer, tcpConfig: SocketOptions.TCPClientSocketOptions.() -> Unit = {}, rSocketConfig: RSocketConnectorBuilder.ConnectionConfigBuilder.() -> Unit = {}, -): RSocketMagixEndpoint { +): RSocketMagixEndpoint { val transport = aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().clientTransport(host, port, tcpConfig) val rSocket = buildConnector(rSocketConfig).connect(transport) - return RSocketMagixEndpoint(coroutineContext, rSocket) + return RSocketMagixEndpoint(coroutineContext, payloadSerializer, rSocket) } diff --git a/settings.gradle.kts b/settings.gradle.kts index 14e7573..c297374 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -1,9 +1,9 @@ +rootProject.name = "controls.kt" + pluginManagement { - val kotlinVersion = "1.5.0-M2" - val toolsVersion = "0.9.5-dev" + val toolsVersion = "0.10.0" repositories { - mavenLocal() maven("https://repo.kotlin.link") mavenCentral() gradlePluginPortal() @@ -14,15 +14,9 @@ pluginManagement { id("ru.mipt.npm.gradle.mpp") version toolsVersion id("ru.mipt.npm.gradle.jvm") version toolsVersion id("ru.mipt.npm.gradle.js") version toolsVersion - id("ru.mipt.npm.gradle.publish") version toolsVersion - kotlin("jvm") version kotlinVersion - kotlin("js") version kotlinVersion - kotlin("multiplatform") version kotlinVersion } } -rootProject.name = "controls.kt" - include( ":controls-core", ":controls-tcp", @@ -35,7 +29,6 @@ include( ":magix:magix-service", ":magix:magix-java-client", ":controls-magix-client", - ":controls-magix-server", ":motors" )