From 5c90e8e07b170d5de35b1a76e60b4b1d141ab14d Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Tue, 3 Nov 2020 22:33:13 +0300 Subject: [PATCH] Magix format converter --- .../hep/dataforge/magix/api/MagixEndpoint.kt | 23 +++++++++++++++---- .../hep/dataforge/magix/api/MagixProcessor.kt | 23 +++++++++++++++---- ...ndpoint.kt => WebRScocketMagixEndpoint.kt} | 5 +++- 3 files changed, 42 insertions(+), 9 deletions(-) rename magix/magix-service/src/commonMain/kotlin/hep/dataforge/magix/service/{RScocketMagixEndpoint.kt => WebRScocketMagixEndpoint.kt} (96%) diff --git a/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixEndpoint.kt b/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixEndpoint.kt index c0a21fd..132a871 100644 --- a/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixEndpoint.kt +++ b/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixEndpoint.kt @@ -4,26 +4,41 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.flow.Flow import kotlinx.serialization.KSerializer import kotlinx.serialization.json.Json +import kotlinx.serialization.json.JsonElement /** - * Inwards API of magix endpoint used to build plugins + * Inwards API of magix endpoint used to build services */ public interface MagixEndpoint { public val scope: CoroutineScope + /** + * Subscribe to a [Flow] of messages using specific [payloadSerializer] + */ public suspend fun subscribe( payloadSerializer: KSerializer, filter: MagixMessageFilter = MagixMessageFilter.ALL, ): Flow> + + /** + * Send an event using specific [payloadSerializer] + */ public suspend fun send( payloadSerializer: KSerializer, - message: MagixMessage + message: MagixMessage, ) - public companion object{ + public companion object { public const val DEFAULT_MAGIX_WS_PORT: Int = 7777 public const val DEFAULT_MAGIX_RAW_PORT: Int = 7778 public val magixJson: Json = Json } -} \ No newline at end of file +} + +public suspend fun MagixEndpoint.subscribe( + filter: MagixMessageFilter = MagixMessageFilter.ALL, +): Flow> = subscribe(JsonElement.serializer()) + +public suspend fun MagixEndpoint.send(message: MagixMessage): Unit = + send(JsonElement.serializer(), message) \ No newline at end of file diff --git a/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixProcessor.kt b/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixProcessor.kt index 2378f4a..a7afe75 100644 --- a/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixProcessor.kt +++ b/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixProcessor.kt @@ -1,6 +1,7 @@ package hep.dataforge.magix.api import kotlinx.coroutines.Job +import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import kotlinx.serialization.json.JsonElement @@ -9,13 +10,27 @@ public interface MagixProcessor { public fun process(endpoint: MagixEndpoint): Job } +/** + * A converter from one (or several) format to another. It captures all events with the given filter then transforms it + * with given [transformer] and sends back to the loop with given [outputFormat]. + * + * If [newOrigin] is not null, it is used as a replacement for old [MagixMessage.origin] tag. + */ public class MagixConverter( public val filter: MagixMessageFilter, - public val transformer: (JsonElement) -> JsonElement, + public val outputFormat: String, + public val newOrigin: String? = null, + public val transformer: suspend (JsonElement) -> JsonElement, ) : MagixProcessor { override fun process(endpoint: MagixEndpoint): Job = endpoint.scope.launch { - endpoint.subscribe(JsonElement.serializer(), filter).onEach { - TODO() - } + endpoint.subscribe(filter).onEach { message -> + val newPayload = transformer(message.payload) + val transformed = message.copy( + payload = newPayload, + format = outputFormat, + origin = newOrigin ?: message.origin + ) + endpoint.send(transformed) + }.launchIn(this) } } \ No newline at end of file diff --git a/magix/magix-service/src/commonMain/kotlin/hep/dataforge/magix/service/RScocketMagixEndpoint.kt b/magix/magix-service/src/commonMain/kotlin/hep/dataforge/magix/service/WebRScocketMagixEndpoint.kt similarity index 96% rename from magix/magix-service/src/commonMain/kotlin/hep/dataforge/magix/service/RScocketMagixEndpoint.kt rename to magix/magix-service/src/commonMain/kotlin/hep/dataforge/magix/service/WebRScocketMagixEndpoint.kt index d040ff6..265174d 100644 --- a/magix/magix-service/src/commonMain/kotlin/hep/dataforge/magix/service/RScocketMagixEndpoint.kt +++ b/magix/magix-service/src/commonMain/kotlin/hep/dataforge/magix/service/WebRScocketMagixEndpoint.kt @@ -24,7 +24,10 @@ import kotlinx.serialization.encodeToString import kotlin.time.minutes import kotlin.time.seconds -public class RScocketMagixEndpoint( +/** + * An RSocket endpoint which relies on WebSocket transport + */ +public class WebRScocketMagixEndpoint( override val scope: CoroutineScope, public val host: String, public val port: Int,