Magix format converter

This commit is contained in:
Alexander Nozik 2020-11-03 22:33:13 +03:00
parent 32c29240d2
commit 5c90e8e07b
3 changed files with 42 additions and 9 deletions

View File

@ -4,21 +4,29 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.serialization.KSerializer import kotlinx.serialization.KSerializer
import kotlinx.serialization.json.Json import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonElement
/** /**
* Inwards API of magix endpoint used to build plugins * Inwards API of magix endpoint used to build services
*/ */
public interface MagixEndpoint { public interface MagixEndpoint {
public val scope: CoroutineScope public val scope: CoroutineScope
/**
* Subscribe to a [Flow] of messages using specific [payloadSerializer]
*/
public suspend fun <T> subscribe( public suspend fun <T> subscribe(
payloadSerializer: KSerializer<T>, payloadSerializer: KSerializer<T>,
filter: MagixMessageFilter = MagixMessageFilter.ALL, filter: MagixMessageFilter = MagixMessageFilter.ALL,
): Flow<MagixMessage<T>> ): Flow<MagixMessage<T>>
/**
* Send an event using specific [payloadSerializer]
*/
public suspend fun <T> send( public suspend fun <T> send(
payloadSerializer: KSerializer<T>, payloadSerializer: KSerializer<T>,
message: MagixMessage<T> message: MagixMessage<T>,
) )
public companion object { public companion object {
@ -27,3 +35,10 @@ public interface MagixEndpoint {
public val magixJson: Json = Json public val magixJson: Json = Json
} }
} }
public suspend fun MagixEndpoint.subscribe(
filter: MagixMessageFilter = MagixMessageFilter.ALL,
): Flow<MagixMessage<JsonElement>> = subscribe(JsonElement.serializer())
public suspend fun MagixEndpoint.send(message: MagixMessage<JsonElement>): Unit =
send(JsonElement.serializer(), message)

View File

@ -1,6 +1,7 @@
package hep.dataforge.magix.api package hep.dataforge.magix.api
import kotlinx.coroutines.Job import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.serialization.json.JsonElement import kotlinx.serialization.json.JsonElement
@ -9,13 +10,27 @@ public interface MagixProcessor {
public fun process(endpoint: MagixEndpoint): Job public fun process(endpoint: MagixEndpoint): Job
} }
/**
* A converter from one (or several) format to another. It captures all events with the given filter then transforms it
* with given [transformer] and sends back to the loop with given [outputFormat].
*
* If [newOrigin] is not null, it is used as a replacement for old [MagixMessage.origin] tag.
*/
public class MagixConverter( public class MagixConverter(
public val filter: MagixMessageFilter, public val filter: MagixMessageFilter,
public val transformer: (JsonElement) -> JsonElement, public val outputFormat: String,
public val newOrigin: String? = null,
public val transformer: suspend (JsonElement) -> JsonElement,
) : MagixProcessor { ) : MagixProcessor {
override fun process(endpoint: MagixEndpoint): Job = endpoint.scope.launch { override fun process(endpoint: MagixEndpoint): Job = endpoint.scope.launch {
endpoint.subscribe(JsonElement.serializer(), filter).onEach { endpoint.subscribe(filter).onEach { message ->
TODO() val newPayload = transformer(message.payload)
} val transformed = message.copy(
payload = newPayload,
format = outputFormat,
origin = newOrigin ?: message.origin
)
endpoint.send(transformed)
}.launchIn(this)
} }
} }

View File

@ -24,7 +24,10 @@ import kotlinx.serialization.encodeToString
import kotlin.time.minutes import kotlin.time.minutes
import kotlin.time.seconds import kotlin.time.seconds
public class RScocketMagixEndpoint( /**
* An RSocket endpoint which relies on WebSocket transport
*/
public class WebRScocketMagixEndpoint(
override val scope: CoroutineScope, override val scope: CoroutineScope,
public val host: String, public val host: String,
public val port: Int, public val port: Int,