From 356f3e15a6399a18b2b9283dea5ed0443a6b0eb7 Mon Sep 17 00:00:00 2001 From: darksnake Date: Mon, 9 Nov 2020 19:22:34 +0300 Subject: [PATCH] Remove CoroutineScope from Endpoint interface --- build.gradle.kts | 3 +-- .../dataforge/control/client/TangoPayload.kt | 22 ++++++++++++++----- .../dataforge/control/client/magixClient.kt | 11 +++++++--- .../hep/dataforge/magix/api/MagixEndpoint.kt | 10 ++++----- .../hep/dataforge/magix/api/MagixMessage.kt | 3 ++- .../dataforge/magix/api/MagixMessageFilter.kt | 2 -- .../hep/dataforge/magix/api/MagixProcessor.kt | 13 ++++++----- .../hep/dataforge/magix/server/magixModule.kt | 8 ++++--- .../magix/service/RSocketMagixEndpoint.kt | 22 +++++++++---------- .../hep/dataforge/magix/service/withTcp.kt | 5 ++--- 10 files changed, 58 insertions(+), 41 deletions(-) diff --git a/build.gradle.kts b/build.gradle.kts index ac98e5b..8bdb37d 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -6,7 +6,7 @@ plugins { val dataforgeVersion: String by extra("0.2.0-dev-4") val ktorVersion: String by extra("1.4.1") -val rsocketVersion by extra("0.11.0-SNAPSHOT") +val rsocketVersion by extra("0.11.1") allprojects { repositories { @@ -17,7 +17,6 @@ allprojects { maven("https://maven.pkg.github.com/altavir/kotlin-logging/") maven("https://dl.bintray.com/rsocket-admin/RSocket") maven("https://maven.pkg.github.com/altavir/ktor-client-sse") - maven("https://oss.jfrog.org/oss-snapshot-local") } group = "hep.dataforge" diff --git a/dataforge-magix-client/src/commonMain/kotlin/hep/dataforge/control/client/TangoPayload.kt b/dataforge-magix-client/src/commonMain/kotlin/hep/dataforge/control/client/TangoPayload.kt index 5baed0c..3c96d55 100644 --- a/dataforge-magix-client/src/commonMain/kotlin/hep/dataforge/control/client/TangoPayload.kt +++ b/dataforge-magix-client/src/commonMain/kotlin/hep/dataforge/control/client/TangoPayload.kt @@ -1,14 +1,24 @@ package hep.dataforge.control.client -public data class TangoPayload( +public sealed class TangoPayload( val host: String, val device: String, val name: String, - val value: Any? = null, val timestamp: Long? = null, val quality: String = "VALID", val event: String? = null, - val input: Any? = null, - val output: Any? = null, - val errors: Iterable?, -) \ No newline at end of file +// val input: Any? = null, +// val output: Any? = null, +// val errors: Iterable?, +) + +public class TangoAttributePayload( + host: String, + device: String, + name: String, + val value: Any? = null, + timestamp: Long? = null, + quality: String = "VALID", + event: String? = null, + errors: Iterable?, +) : TangoPayload(host, device, name, timestamp, quality, event) \ No newline at end of file diff --git a/dataforge-magix-client/src/commonMain/kotlin/hep/dataforge/control/client/magixClient.kt b/dataforge-magix-client/src/commonMain/kotlin/hep/dataforge/control/client/magixClient.kt index 8815c5c..bff98fd 100644 --- a/dataforge-magix-client/src/commonMain/kotlin/hep/dataforge/control/client/magixClient.kt +++ b/dataforge-magix-client/src/commonMain/kotlin/hep/dataforge/control/client/magixClient.kt @@ -5,6 +5,7 @@ import hep.dataforge.control.controllers.DeviceMessage import hep.dataforge.control.controllers.respondMessage import hep.dataforge.magix.api.MagixEndpoint import hep.dataforge.magix.api.MagixMessage +import hep.dataforge.magix.api.MagixProcessor import kotlinx.coroutines.Job import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach @@ -38,7 +39,7 @@ public fun DeviceManager.launchMagixClient( payload = responsePayload ) endpoint.broadcast(DeviceMessage.serializer(), response) - }.launchIn(endpoint.scope) + }.launchIn(this) controller.messageOutput().onEach { payload -> MagixMessage( @@ -47,8 +48,12 @@ public fun DeviceManager.launchMagixClient( origin = endpointID, payload = payload ) - }.launchIn(endpoint.scope) + }.launchIn(this) +} + +public fun DeviceManager.asMagixProcessor(endpointID: String = "dataforge"): MagixProcessor = object : MagixProcessor { + override fun process(endpoint: MagixEndpoint): Job = launchMagixClient(endpoint, endpointID) + } - diff --git a/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixEndpoint.kt b/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixEndpoint.kt index 42aa8a3..3a17d50 100644 --- a/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixEndpoint.kt +++ b/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixEndpoint.kt @@ -1,6 +1,5 @@ package hep.dataforge.magix.api -import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.flow.Flow import kotlinx.serialization.KSerializer import kotlinx.serialization.json.Json @@ -10,8 +9,6 @@ import kotlinx.serialization.json.JsonElement * Inwards API of magix endpoint used to build services */ public interface MagixEndpoint { - public val scope: CoroutineScope - /** * Subscribe to a [Flow] of messages using specific [payloadSerializer] */ @@ -32,13 +29,16 @@ public interface MagixEndpoint { public companion object { public const val DEFAULT_MAGIX_WS_PORT: Int = 7777 public const val DEFAULT_MAGIX_RAW_PORT: Int = 7778 - public val magixJson: Json = Json + public val magixJson: Json = Json{ + ignoreUnknownKeys = true + encodeDefaults = false + } } } public fun MagixEndpoint.subscribe( filter: MagixMessageFilter = MagixMessageFilter.ALL, -): Flow> = subscribe(JsonElement.serializer()) +): Flow> = subscribe(JsonElement.serializer(),filter) public suspend fun MagixEndpoint.broadcast(message: MagixMessage): Unit = broadcast(JsonElement.serializer(), message) \ No newline at end of file diff --git a/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixMessage.kt b/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixMessage.kt index a92275f..230598e 100644 --- a/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixMessage.kt +++ b/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixMessage.kt @@ -1,6 +1,7 @@ package hep.dataforge.magix.api import kotlinx.serialization.Serializable +import kotlinx.serialization.json.JsonElement /** * @@ -26,6 +27,6 @@ public data class MagixMessage( val target: String? = null, val id: String? = null, val parentId: String? = null, - val user: String? = null, + val user: JsonElement? = null, val action: String? = null ) \ No newline at end of file diff --git a/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixMessageFilter.kt b/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixMessageFilter.kt index 8f9c097..d96064d 100644 --- a/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixMessageFilter.kt +++ b/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixMessageFilter.kt @@ -9,7 +9,6 @@ public data class MagixMessageFilter( val format: List? = null, val origin: List? = null, val target: List? = null, - val user: List? = null, val action: List? = null, ) { public companion object { @@ -29,7 +28,6 @@ public fun Flow>.filter(filter: MagixMessageFilter): Flow JsonElement, + private val scope: CoroutineScope, + private val filter: MagixMessageFilter, + private val outputFormat: String, + private val newOrigin: String? = null, + private val transformer: suspend (JsonElement) -> JsonElement, ) : MagixProcessor { - override fun process(endpoint: MagixEndpoint): Job = endpoint.scope.launch { + override fun process(endpoint: MagixEndpoint): Job = scope.launch { endpoint.subscribe(filter).onEach { message -> val newPayload = transformer(message.payload) val transformed = message.copy( @@ -32,5 +34,6 @@ public class MagixConverter( ) endpoint.broadcast(transformed) }.launchIn(this) + //TODO add catch logic here } } \ No newline at end of file diff --git a/magix/magix-server/src/main/kotlin/hep/dataforge/magix/server/magixModule.kt b/magix/magix-server/src/main/kotlin/hep/dataforge/magix/server/magixModule.kt index b747771..2204a02 100644 --- a/magix/magix-server/src/main/kotlin/hep/dataforge/magix/server/magixModule.kt +++ b/magix/magix-server/src/main/kotlin/hep/dataforge/magix/server/magixModule.kt @@ -20,6 +20,8 @@ import io.ktor.websocket.WebSockets import io.rsocket.kotlin.ConnectionAcceptor import io.rsocket.kotlin.RSocketRequestHandler import io.rsocket.kotlin.payload.Payload +import io.rsocket.kotlin.payload.buildPayload +import io.rsocket.kotlin.payload.data import io.rsocket.kotlin.transport.ktor.server.RSocketSupport import io.rsocket.kotlin.transport.ktor.server.rSocket import kotlinx.coroutines.CoroutineScope @@ -42,7 +44,7 @@ internal fun CoroutineScope.magixAcceptor(magixFlow: MutableSharedFlow val string = magixJson.encodeToString(genericMessageSerializer, message) - Payload(string) + buildPayload { data(string) } } } fireAndForget { request: Payload -> @@ -52,12 +54,12 @@ internal fun CoroutineScope.magixAcceptor(magixFlow: MutableSharedFlow -> input.onEach { - magixFlow.emit(magixJson.decodeFromString(genericMessageSerializer,it.data.readText())) + magixFlow.emit(magixJson.decodeFromString(genericMessageSerializer, it.data.readText())) }.launchIn(this@magixAcceptor) magixFlow.map { message -> val string = magixJson.encodeToString(genericMessageSerializer, message) - Payload(string) + buildPayload { data(string) } } } } diff --git a/magix/magix-service/src/commonMain/kotlin/hep/dataforge/magix/service/RSocketMagixEndpoint.kt b/magix/magix-service/src/commonMain/kotlin/hep/dataforge/magix/service/RSocketMagixEndpoint.kt index 05b7cbf..201a647 100644 --- a/magix/magix-service/src/commonMain/kotlin/hep/dataforge/magix/service/RSocketMagixEndpoint.kt +++ b/magix/magix-service/src/commonMain/kotlin/hep/dataforge/magix/service/RSocketMagixEndpoint.kt @@ -9,18 +9,20 @@ import io.ktor.util.KtorExperimentalAPI import io.rsocket.kotlin.RSocket import io.rsocket.kotlin.core.RSocketConnector import io.rsocket.kotlin.core.RSocketConnectorBuilder -import io.rsocket.kotlin.payload.Payload +import io.rsocket.kotlin.payload.buildPayload +import io.rsocket.kotlin.payload.data import io.rsocket.kotlin.transport.ktor.client.RSocketSupport import io.rsocket.kotlin.transport.ktor.client.rSocket -import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.map -import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext import kotlinx.serialization.KSerializer import kotlinx.serialization.encodeToString +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.coroutineContext public class RSocketMagixEndpoint( - override val scope: CoroutineScope, + val coroutineContext: CoroutineContext, public val rSocket: RSocket, ) : MagixEndpoint { @@ -29,15 +31,15 @@ public class RSocketMagixEndpoint( filter: MagixMessageFilter, ): Flow> { val serializer = MagixMessage.serializer(payloadSerializer) - val payload = Payload(MagixEndpoint.magixJson.encodeToString(filter)) + val payload = buildPayload { data(MagixEndpoint.magixJson.encodeToString(filter)) } val flow = rSocket.requestStream(payload) return flow.map { MagixEndpoint.magixJson.decodeFromString(serializer, it.data.readText()) } } override suspend fun broadcast(payloadSerializer: KSerializer, message: MagixMessage) { - scope.launch { + withContext(coroutineContext) { val serializer = MagixMessage.serializer(payloadSerializer) - val payload = Payload(MagixEndpoint.magixJson.encodeToString(serializer, message)) + val payload = buildPayload { data(MagixEndpoint.magixJson.encodeToString(serializer, message)) } rSocket.fireAndForget(payload) } } @@ -52,7 +54,6 @@ public class RSocketMagixEndpoint( @OptIn(KtorExperimentalAPI::class) public suspend fun withWebSockets( - scope: CoroutineScope, host: String, port: Int, path: String = "/rsocket", @@ -72,8 +73,7 @@ public class RSocketMagixEndpoint( client.close() } - return RSocketMagixEndpoint(scope, rSocket) + return RSocketMagixEndpoint(coroutineContext, rSocket) } } -} - +} \ No newline at end of file diff --git a/magix/magix-service/src/jvmMain/kotlin/hep/dataforge/magix/service/withTcp.kt b/magix/magix-service/src/jvmMain/kotlin/hep/dataforge/magix/service/withTcp.kt index 131d8ce..4a9aad3 100644 --- a/magix/magix-service/src/jvmMain/kotlin/hep/dataforge/magix/service/withTcp.kt +++ b/magix/magix-service/src/jvmMain/kotlin/hep/dataforge/magix/service/withTcp.kt @@ -6,8 +6,8 @@ import io.ktor.network.sockets.aSocket import io.ktor.util.KtorExperimentalAPI import io.rsocket.kotlin.core.RSocketConnectorBuilder import io.rsocket.kotlin.transport.ktor.clientTransport -import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers +import kotlin.coroutines.coroutineContext /** @@ -15,7 +15,6 @@ import kotlinx.coroutines.Dispatchers */ @OptIn(KtorExperimentalAPI::class) public suspend fun RSocketMagixEndpoint.Companion.withTcp( - scope: CoroutineScope, host: String, port: Int, tcpConfig: SocketOptions.TCPClientSocketOptions.() -> Unit = {}, @@ -24,5 +23,5 @@ public suspend fun RSocketMagixEndpoint.Companion.withTcp( val transport = aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().clientTransport(host, port, tcpConfig) val rSocket = buildConnector(rSocketConfig).connect(transport) - return RSocketMagixEndpoint(scope, rSocket) + return RSocketMagixEndpoint(coroutineContext, rSocket) }