diff --git a/controls-magix-client/src/commonMain/kotlin/ru/mipt/npm/controls/client/dfMagix.kt b/controls-magix-client/src/commonMain/kotlin/ru/mipt/npm/controls/client/controlsMagix.kt similarity index 64% rename from controls-magix-client/src/commonMain/kotlin/ru/mipt/npm/controls/client/dfMagix.kt rename to controls-magix-client/src/commonMain/kotlin/ru/mipt/npm/controls/client/controlsMagix.kt index 87972c9..d5fa09d 100644 --- a/controls-magix-client/src/commonMain/kotlin/ru/mipt/npm/controls/client/dfMagix.kt +++ b/controls-magix-client/src/commonMain/kotlin/ru/mipt/npm/controls/client/controlsMagix.kt @@ -9,15 +9,17 @@ import ru.mipt.npm.controls.api.DeviceMessage import ru.mipt.npm.controls.manager.DeviceManager import ru.mipt.npm.controls.manager.hubMessageFlow import ru.mipt.npm.controls.manager.respondHubMessage -import ru.mipt.npm.magix.api.MagixEndpoint -import ru.mipt.npm.magix.api.MagixMessage +import ru.mipt.npm.magix.api.* import space.kscience.dataforge.context.error import space.kscience.dataforge.context.logger -public const val DATAFORGE_MAGIX_FORMAT: String = "dataforge" +public val controlsMagixFormat: MagixFormat = MagixFormat( + DeviceMessage.serializer(), + setOf("controls-kt", "dataforge") +) -internal fun generateId(request: MagixMessage<*>): String = if (request.id != null) { +internal fun generateId(request: MagixMessage): String = if (request.id != null) { "${request.id}.response" } else { "df[${request.payload.hashCode()}" @@ -27,38 +29,31 @@ internal fun generateId(request: MagixMessage<*>): String = if (request.id != nu * Communicate with server in [Magix format](https://github.com/waltz-controls/rfc/tree/master/1) */ public fun DeviceManager.connectToMagix( - endpoint: MagixEndpoint, - endpointID: String = DATAFORGE_MAGIX_FORMAT, - preSendAction: (MagixMessage<*>) -> Unit = {} + endpoint: MagixEndpoint, + endpointID: String = controlsMagixFormat.defaultFormat, ): Job = context.launch { - endpoint.subscribe().onEach { request -> - val responsePayload = respondHubMessage(request.payload) + endpoint.subscribe(controlsMagixFormat).onEach { (request, payload) -> + val responsePayload = respondHubMessage(payload) if (responsePayload != null) { - val response = MagixMessage( + endpoint.broadcast( + format = controlsMagixFormat, origin = endpointID, payload = responsePayload, - format = DATAFORGE_MAGIX_FORMAT, id = generateId(request), parentId = request.id ) - - endpoint.broadcast(response) } }.catch { error -> logger.error(error) { "Error while responding to message" } }.launchIn(this) hubMessageFlow(this).onEach { payload -> - val magixMessage = MagixMessage( + endpoint.broadcast( + format = controlsMagixFormat, origin = endpointID, payload = payload, - format = DATAFORGE_MAGIX_FORMAT, id = "df[${payload.hashCode()}]" ) - preSendAction(magixMessage) - endpoint.broadcast( - magixMessage - ) }.catch { error -> logger.error(error) { "Error while sending a message" } }.launchIn(this) diff --git a/controls-magix-client/src/commonMain/kotlin/ru/mipt/npm/controls/client/tangoMagix.kt b/controls-magix-client/src/commonMain/kotlin/ru/mipt/npm/controls/client/tangoMagix.kt index b1d9e2d..8a88868 100644 --- a/controls-magix-client/src/commonMain/kotlin/ru/mipt/npm/controls/client/tangoMagix.kt +++ b/controls-magix-client/src/commonMain/kotlin/ru/mipt/npm/controls/client/tangoMagix.kt @@ -8,8 +8,7 @@ import kotlinx.serialization.Serializable import ru.mipt.npm.controls.api.get import ru.mipt.npm.controls.api.getOrReadProperty import ru.mipt.npm.controls.manager.DeviceManager -import ru.mipt.npm.magix.api.MagixEndpoint -import ru.mipt.npm.magix.api.MagixMessage +import ru.mipt.npm.magix.api.* import space.kscience.dataforge.context.error import space.kscience.dataforge.context.logger import space.kscience.dataforge.meta.Meta @@ -59,33 +58,39 @@ public data class TangoPayload( val argin: Meta? = null, val argout: Meta? = null, val data: Meta? = null, - val errors: List? = null + val errors: List? = null, ) +internal val tangoMagixFormat = MagixFormat( + TangoPayload.serializer(), + setOf("tango") +) + + public fun DeviceManager.launchTangoMagix( - endpoint: MagixEndpoint, + endpoint: MagixEndpoint, endpointID: String = TANGO_MAGIX_FORMAT, ): Job { - suspend fun respond(request: MagixMessage, payloadBuilder: (TangoPayload) -> TangoPayload) { + + suspend fun respond(request: MagixMessage, payload: TangoPayload, payloadBuilder: (TangoPayload) -> TangoPayload) { endpoint.broadcast( - request.copy( - id = generateId(request), - parentId = request.id, - origin = endpointID, - payload = payloadBuilder(request.payload) - ) + tangoMagixFormat, + id = generateId(request), + parentId = request.id, + origin = endpointID, + payload = payloadBuilder(payload) ) } return context.launch { - endpoint.subscribe().onEach { request -> + endpoint.subscribe(tangoMagixFormat).onEach { (request, payload) -> try { - val device = get(request.payload.device) - when (request.payload.action) { + val device = get(payload.device) + when (payload.action) { TangoAction.read -> { - val value = device.getOrReadProperty(request.payload.name) - respond(request) { requestPayload -> + val value = device.getOrReadProperty(payload.name) + respond(request, payload) { requestPayload -> requestPayload.copy( value = value, quality = TangoQuality.VALID @@ -93,12 +98,12 @@ public fun DeviceManager.launchTangoMagix( } } TangoAction.write -> { - request.payload.value?.let { value -> - device.writeProperty(request.payload.name, value) + payload.value?.let { value -> + device.writeProperty(payload.name, value) } //wait for value to be written and return final state - val value = device.getOrReadProperty(request.payload.name) - respond(request) { requestPayload -> + val value = device.getOrReadProperty(payload.name) + respond(request, payload) { requestPayload -> requestPayload.copy( value = value, quality = TangoQuality.VALID @@ -106,8 +111,8 @@ public fun DeviceManager.launchTangoMagix( } } TangoAction.exec -> { - val result = device.execute(request.payload.name, request.payload.argin) - respond(request) { requestPayload -> + val result = device.execute(payload.name, payload.argin) + respond(request, payload) { requestPayload -> requestPayload.copy( argout = result, quality = TangoQuality.VALID @@ -119,12 +124,11 @@ public fun DeviceManager.launchTangoMagix( } catch (ex: Exception) { logger.error(ex) { "Error while responding to message" } endpoint.broadcast( - request.copy( - id = generateId(request), - parentId = request.id, - origin = endpointID, - payload = request.payload.copy(quality = TangoQuality.WARNING) - ) + tangoMagixFormat, + id = generateId(request), + parentId = request.id, + origin = endpointID, + payload = payload.copy(quality = TangoQuality.WARNING) ) } }.launchIn(this) diff --git a/controls-server/src/main/kotlin/ru/mipt/npm/controls/server/deviceWebServer.kt b/controls-server/src/main/kotlin/ru/mipt/npm/controls/server/deviceWebServer.kt index ca1b738..a2c67dd 100644 --- a/controls-server/src/main/kotlin/ru/mipt/npm/controls/server/deviceWebServer.kt +++ b/controls-server/src/main/kotlin/ru/mipt/npm/controls/server/deviceWebServer.kt @@ -33,7 +33,7 @@ import ru.mipt.npm.controls.api.getOrNull import ru.mipt.npm.controls.manager.DeviceManager import ru.mipt.npm.controls.manager.respondHubMessage import ru.mipt.npm.magix.api.MagixEndpoint -import ru.mipt.npm.magix.server.GenericMagixMessage +import ru.mipt.npm.magix.api.MagixMessage import ru.mipt.npm.magix.server.launchMagixServerRawRSocket import ru.mipt.npm.magix.server.magixModule import space.kscience.dataforge.meta.toMeta @@ -212,7 +212,7 @@ public fun Application.deviceManagerModule( } } - val magixFlow = MutableSharedFlow( + val magixFlow = MutableSharedFlow( buffer, extraBufferCapacity = buffer ) diff --git a/demo/src/main/kotlin/ru/mipt/npm/controls/demo/DemoControllerView.kt b/demo/src/main/kotlin/ru/mipt/npm/controls/demo/DemoControllerView.kt index 64e0fa0..ba6ff8e 100644 --- a/demo/src/main/kotlin/ru/mipt/npm/controls/demo/DemoControllerView.kt +++ b/demo/src/main/kotlin/ru/mipt/npm/controls/demo/DemoControllerView.kt @@ -8,7 +8,6 @@ import javafx.stage.Stage import kotlinx.coroutines.launch import org.eclipse.milo.opcua.sdk.server.OpcUaServer import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText -import ru.mipt.npm.controls.api.DeviceMessage import ru.mipt.npm.controls.client.connectToMagix import ru.mipt.npm.controls.demo.DemoDevice.Companion.cosScale import ru.mipt.npm.controls.demo.DemoDevice.Companion.sinScale @@ -52,9 +51,9 @@ class DemoController : Controller(), ContextAware { //starting magix event loop magixServer = startMagixServer(enableRawRSocket = true, enableZmq = true) //Launch device client and connect it to the server - val deviceEndpoint = MagixEndpoint.rSocketWithTcp("localhost", DeviceMessage.serializer()) + val deviceEndpoint = MagixEndpoint.rSocketWithTcp("localhost") deviceManager.connectToMagix(deviceEndpoint) - val visualEndpoint = MagixEndpoint.rSocketWithWebSockets("localhost", DeviceMessage.serializer()) + val visualEndpoint = MagixEndpoint.rSocketWithWebSockets("localhost") visualizer = visualEndpoint.startDemoDeviceServer() opcUaServer.startup() diff --git a/demo/src/main/kotlin/ru/mipt/npm/controls/demo/demoDeviceServer.kt b/demo/src/main/kotlin/ru/mipt/npm/controls/demo/demoDeviceServer.kt index 7f65add..9cf7589 100644 --- a/demo/src/main/kotlin/ru/mipt/npm/controls/demo/demoDeviceServer.kt +++ b/demo/src/main/kotlin/ru/mipt/npm/controls/demo/demoDeviceServer.kt @@ -11,9 +11,10 @@ import kotlinx.coroutines.flow.* import kotlinx.coroutines.launch import kotlinx.html.div import kotlinx.html.link -import ru.mipt.npm.controls.api.DeviceMessage import ru.mipt.npm.controls.api.PropertyChangedMessage +import ru.mipt.npm.controls.client.controlsMagixFormat import ru.mipt.npm.magix.api.MagixEndpoint +import ru.mipt.npm.magix.api.subscribe import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.double import space.kscience.plotly.layout @@ -54,7 +55,7 @@ suspend fun Trace.updateXYFrom(flow: Flow>>) { } -suspend fun MagixEndpoint.startDemoDeviceServer(): ApplicationEngine = embeddedServer(CIO, 9090) { +suspend fun MagixEndpoint.startDemoDeviceServer(): ApplicationEngine = embeddedServer(CIO, 9090) { install(WebSockets) install(RSocketSupport) @@ -66,8 +67,8 @@ suspend fun MagixEndpoint.startDemoDeviceServer(): ApplicationEng val cosFlow = MutableSharedFlow()// = device.cos.flow() launch { - subscribe().collect { magix -> - (magix.payload as? PropertyChangedMessage)?.let { message -> + subscribe(controlsMagixFormat).collect { (magix, payload) -> + (payload as? PropertyChangedMessage)?.let { message -> when (message.property) { "sin" -> sinFlow.emit(message.value) "cos" -> cosFlow.emit(message.value) diff --git a/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixEndpoint.kt b/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixEndpoint.kt index 78ab54d..c531583 100644 --- a/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixEndpoint.kt +++ b/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixEndpoint.kt @@ -1,29 +1,26 @@ package ru.mipt.npm.magix.api import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.map -import kotlinx.serialization.KSerializer import kotlinx.serialization.json.Json -import kotlinx.serialization.json.JsonElement /** * Inwards API of magix endpoint used to build services */ -public interface MagixEndpoint { +public interface MagixEndpoint { /** * Subscribe to a [Flow] of messages */ public fun subscribe( filter: MagixMessageFilter = MagixMessageFilter.ALL, - ): Flow> + ): Flow /** * Send an event */ public suspend fun broadcast( - message: MagixMessage, + message: MagixMessage, ) public companion object { @@ -53,31 +50,4 @@ public interface MagixEndpoint { encodeDefaults = false } } -} - -/** - * Specialize this raw json endpoint to use specific serializer - */ -public fun MagixEndpoint.specialize( - payloadSerializer: KSerializer -): MagixEndpoint = object : MagixEndpoint { - override fun subscribe( - filter: MagixMessageFilter - ): Flow> = this@specialize.subscribe(filter).map { message -> - message.replacePayload { payload -> - MagixEndpoint.magixJson.decodeFromJsonElement(payloadSerializer, payload) - } - } - - override suspend fun broadcast(message: MagixMessage) { - this@specialize.broadcast( - message.replacePayload { payload -> - MagixEndpoint.magixJson.encodeToJsonElement( - payloadSerializer, - payload - ) - } - ) - } - } \ No newline at end of file diff --git a/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixFormat.kt b/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixFormat.kt new file mode 100644 index 0000000..d2aa466 --- /dev/null +++ b/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixFormat.kt @@ -0,0 +1,47 @@ +package ru.mipt.npm.magix.api + +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.map +import kotlinx.serialization.KSerializer +import kotlinx.serialization.json.JsonElement +import ru.mipt.npm.magix.api.MagixEndpoint.Companion.magixJson + +public data class MagixFormat( + val serializer: KSerializer, + val formats: Set, +) { + val defaultFormat: String get() = formats.firstOrNull() ?: "magix" +} + +public fun MagixEndpoint.subscribe( + format: MagixFormat, + originFilter: Collection? = null, + targetFilter: Collection? = null, +): Flow> = subscribe( + MagixMessageFilter(format = format.formats, origin = originFilter, target = targetFilter) +).map { + val value: T = magixJson.decodeFromJsonElement(format.serializer, it.payload) + it to value +} + +public suspend fun MagixEndpoint.broadcast( + format: MagixFormat, + payload: T, + target: String? = null, + id: String? = null, + parentId: String? = null, + user: JsonElement? = null, + origin: String = format.defaultFormat, +) { + val message = MagixMessage( + origin = origin, + payload = magixJson.encodeToJsonElement(format.serializer, payload), + format = format.defaultFormat, + target = target, + id = id, + parentId = parentId, + user = user + ) + broadcast(message) +} + diff --git a/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixMessage.kt b/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixMessage.kt index e804a69..e593434 100644 --- a/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixMessage.kt +++ b/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixMessage.kt @@ -24,19 +24,12 @@ import kotlinx.serialization.json.JsonElement * */ @Serializable -public data class MagixMessage( +public data class MagixMessage( val origin: String, - val payload: T, + val payload: JsonElement, val format: String = origin, val target: String? = null, val id: String? = null, val parentId: String? = null, val user: JsonElement? = null, -) - -/** - * Create message with same field but replaced payload - */ -@Suppress("UNCHECKED_CAST") -public fun MagixMessage.replacePayload(payloadTransform: (T) -> R): MagixMessage = - MagixMessage(origin, payloadTransform(payload), format, target, id, parentId, user) \ No newline at end of file +) \ No newline at end of file diff --git a/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixMessageFilter.kt b/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixMessageFilter.kt index 6083ac4..fb6ab4e 100644 --- a/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixMessageFilter.kt +++ b/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixMessageFilter.kt @@ -6,9 +6,9 @@ import kotlinx.serialization.Serializable @Serializable public data class MagixMessageFilter( - val format: List? = null, - val origin: List? = null, - val target: List? = null, + val format: Collection? = null, + val origin: Collection? = null, + val target: Collection? = null, ) { public companion object { public val ALL: MagixMessageFilter = MagixMessageFilter() @@ -18,7 +18,7 @@ public data class MagixMessageFilter( /** * Filter a [Flow] of messages based on given filter */ -public fun Flow>.filter(filter: MagixMessageFilter): Flow> { +public fun Flow.filter(filter: MagixMessageFilter): Flow { if (filter == MagixMessageFilter.ALL) { return this } diff --git a/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/converters.kt b/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/converters.kt index 1ab2de7..dbdea23 100644 --- a/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/converters.kt +++ b/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/converters.kt @@ -4,20 +4,20 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach +import kotlinx.serialization.json.JsonElement /** * Launch magix message converter service */ public fun CoroutineScope.launchMagixConverter( - inputEndpoint: MagixEndpoint, - outputEndpoint: MagixEndpoint, + endpoint: MagixEndpoint, filter: MagixMessageFilter, outputFormat: String, newOrigin: String? = null, - transformer: suspend (T) -> R, -): Job = inputEndpoint.subscribe(filter).onEach { message-> + transformer: suspend (JsonElement) -> JsonElement, +): Job = endpoint.subscribe(filter).onEach { message-> val newPayload = transformer(message.payload) - val transformed: MagixMessage = MagixMessage( + val transformed: MagixMessage = MagixMessage( newOrigin ?: message.origin, newPayload, outputFormat, @@ -26,5 +26,5 @@ public fun CoroutineScope.launchMagixConverter( message.parentId, message.user ) - outputEndpoint.broadcast(transformed) + endpoint.broadcast(transformed) }.launchIn(this) diff --git a/magix/magix-demo/src/main/kotlin/zmq.kt b/magix/magix-demo/src/main/kotlin/zmq.kt index 44bafc9..868b503 100644 --- a/magix/magix-demo/src/main/kotlin/zmq.kt +++ b/magix/magix-demo/src/main/kotlin/zmq.kt @@ -15,7 +15,7 @@ import java.awt.Desktop import java.net.URI -suspend fun MagixEndpoint.sendJson( +suspend fun MagixEndpoint.sendJson( origin: String, format: String = "json", target: String? = null, @@ -44,11 +44,11 @@ suspend fun main(): Unit = coroutineScope { logger.info("Starting client") //Create zmq magix endpoint and wait for to finish - ZmqMagixEndpoint("tcp://localhost", JsonObject.serializer()).use { client -> + ZmqMagixEndpoint("tcp://localhost").use { client -> logger.info("Starting subscription") client.subscribe().onEach { println(it.payload) - if (it.payload["index"]?.jsonPrimitive?.int == numberOfMessages) { + if (it.payload.jsonObject["index"]?.jsonPrimitive?.int == numberOfMessages) { logger.info("Index $numberOfMessages reached. Terminating") cancel() } diff --git a/magix/magix-java-client/src/main/java/ru/mipt/npm/magix/client/MagixClient.java b/magix/magix-java-client/src/main/java/ru/mipt/npm/magix/client/MagixClient.java index 50d6f09..58af6d2 100644 --- a/magix/magix-java-client/src/main/java/ru/mipt/npm/magix/client/MagixClient.java +++ b/magix/magix-java-client/src/main/java/ru/mipt/npm/magix/client/MagixClient.java @@ -12,9 +12,9 @@ import java.util.concurrent.Flow; * @param */ public interface MagixClient { - void broadcast(MagixMessage msg) throws IOException; + void broadcast(MagixMessage msg) throws IOException; - Flow.Publisher> subscribe(); + Flow.Publisher subscribe(); /** * Create a magix endpoint client using RSocket with raw tcp connection @@ -23,7 +23,7 @@ public interface MagixClient { * @return the client */ static MagixClient rSocketTcp(String host, int port) { - return ControlsMagixClient.Companion.rSocketTcp(host, port, JsonElement.Companion.serializer()); + return ControlsMagixClient.Companion.rSocketTcp(host, port); } /** @@ -31,7 +31,6 @@ public interface MagixClient { * @param host host name of magix server event loop * @param port port of magix server event loop * @param path - * @return */ static MagixClient rSocketWs(String host, int port, String path) { return ControlsMagixClient.Companion.rSocketWs(host, port, JsonElement.Companion.serializer(), path); diff --git a/magix/magix-java-client/src/main/kotlin/ru/mipt/npm/magix/client/ControlsMagixClient.kt b/magix/magix-java-client/src/main/kotlin/ru/mipt/npm/magix/client/ControlsMagixClient.kt index 476a73d..11c2681 100644 --- a/magix/magix-java-client/src/main/kotlin/ru/mipt/npm/magix/client/ControlsMagixClient.kt +++ b/magix/magix-java-client/src/main/kotlin/ru/mipt/npm/magix/client/ControlsMagixClient.kt @@ -11,25 +11,24 @@ import ru.mipt.npm.magix.rsocket.rSocketWithWebSockets import java.util.concurrent.Flow internal class ControlsMagixClient( - private val endpoint: MagixEndpoint, + private val endpoint: MagixEndpoint, private val filter: MagixMessageFilter, ) : MagixClient { - override fun broadcast(msg: MagixMessage): Unit = runBlocking { + override fun broadcast(msg: MagixMessage): Unit = runBlocking { endpoint.broadcast(msg) } - override fun subscribe(): Flow.Publisher> = endpoint.subscribe(filter).asPublisher() + override fun subscribe(): Flow.Publisher = endpoint.subscribe(filter).asPublisher() companion object { fun rSocketTcp( host: String, port: Int, - payloadSerializer: KSerializer ): ControlsMagixClient { val endpoint = runBlocking { - MagixEndpoint.rSocketWithTcp(host, payloadSerializer, port) + MagixEndpoint.rSocketWithTcp(host, port) } return ControlsMagixClient(endpoint, MagixMessageFilter()) } @@ -41,7 +40,7 @@ internal class ControlsMagixClient( path: String = "/rsocket" ): ControlsMagixClient { val endpoint = runBlocking { - MagixEndpoint.rSocketWithWebSockets(host, payloadSerializer, port, path) + MagixEndpoint.rSocketWithWebSockets(host, port, path) } return ControlsMagixClient(endpoint, MagixMessageFilter()) } diff --git a/magix/magix-rsocket/src/commonMain/kotlin/ru/mipt/npm/magix/rsocket/RSocketMagixEndpoint.kt b/magix/magix-rsocket/src/commonMain/kotlin/ru/mipt/npm/magix/rsocket/RSocketMagixEndpoint.kt index 7fdcee4..4b05e6f 100644 --- a/magix/magix-rsocket/src/commonMain/kotlin/ru/mipt/npm/magix/rsocket/RSocketMagixEndpoint.kt +++ b/magix/magix-rsocket/src/commonMain/kotlin/ru/mipt/npm/magix/rsocket/RSocketMagixEndpoint.kt @@ -16,7 +16,6 @@ import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.flow.map import kotlinx.coroutines.withContext -import kotlinx.serialization.KSerializer import kotlinx.serialization.encodeToString import ru.mipt.npm.magix.api.MagixEndpoint import ru.mipt.npm.magix.api.MagixMessage @@ -25,27 +24,24 @@ import ru.mipt.npm.magix.api.filter import kotlin.coroutines.CoroutineContext import kotlin.coroutines.coroutineContext -public class RSocketMagixEndpoint( - payloadSerializer: KSerializer, +public class RSocketMagixEndpoint( private val rSocket: RSocket, private val coroutineContext: CoroutineContext, -) : MagixEndpoint { - - private val serializer = MagixMessage.serializer(payloadSerializer) +) : MagixEndpoint { override fun subscribe( filter: MagixMessageFilter, - ): Flow> { + ): Flow { val payload = buildPayload { data(MagixEndpoint.magixJson.encodeToString(filter)) } val flow = rSocket.requestStream(payload) return flow.map { - MagixEndpoint.magixJson.decodeFromString(serializer, it.data.readText()) + MagixEndpoint.magixJson.decodeFromString(MagixMessage.serializer(), it.data.readText()) }.filter(filter).flowOn(coroutineContext[CoroutineDispatcher] ?: Dispatchers.Unconfined) } - override suspend fun broadcast(message: MagixMessage) { + override suspend fun broadcast(message: MagixMessage) { withContext(coroutineContext) { - val payload = buildPayload { data(MagixEndpoint.magixJson.encodeToString(serializer, message)) } + val payload = buildPayload { data(MagixEndpoint.magixJson.encodeToString(MagixMessage.serializer(), message)) } rSocket.fireAndForget(payload) } } @@ -63,13 +59,12 @@ internal fun buildConnector(rSocketConfig: RSocketConnectorBuilder.ConnectionCon /** * Build a websocket based endpoint connected to [host], [port] and given routing [path] */ -public suspend fun MagixEndpoint.Companion.rSocketWithWebSockets( +public suspend fun MagixEndpoint.Companion.rSocketWithWebSockets( host: String, - payloadSerializer: KSerializer, port: Int = DEFAULT_MAGIX_HTTP_PORT, path: String = "/rsocket", rSocketConfig: RSocketConnectorBuilder.ConnectionConfigBuilder.() -> Unit = {}, -): RSocketMagixEndpoint { +): RSocketMagixEndpoint { val client = HttpClient { install(WebSockets) install(RSocketSupport) { @@ -84,5 +79,5 @@ public suspend fun MagixEndpoint.Companion.rSocketWithWebSockets( client.close() } - return RSocketMagixEndpoint(payloadSerializer, rSocket, coroutineContext) + return RSocketMagixEndpoint(rSocket, coroutineContext) } \ No newline at end of file diff --git a/magix/magix-rsocket/src/jvmMain/kotlin/ru/mipt/npm/magix/rsocket/withTcp.kt b/magix/magix-rsocket/src/jvmMain/kotlin/ru/mipt/npm/magix/rsocket/withTcp.kt index d3c0719..05a596b 100644 --- a/magix/magix-rsocket/src/jvmMain/kotlin/ru/mipt/npm/magix/rsocket/withTcp.kt +++ b/magix/magix-rsocket/src/jvmMain/kotlin/ru/mipt/npm/magix/rsocket/withTcp.kt @@ -4,7 +4,6 @@ import io.ktor.network.sockets.SocketOptions import io.ktor.util.InternalAPI import io.rsocket.kotlin.core.RSocketConnectorBuilder import io.rsocket.kotlin.transport.ktor.tcp.TcpClientTransport -import kotlinx.serialization.KSerializer import ru.mipt.npm.magix.api.MagixEndpoint import kotlin.coroutines.coroutineContext @@ -13,13 +12,12 @@ import kotlin.coroutines.coroutineContext * Create a plain TCP based [RSocketMagixEndpoint] connected to [host] and [port] */ @OptIn(InternalAPI::class) -public suspend fun MagixEndpoint.Companion.rSocketWithTcp( +public suspend fun MagixEndpoint.Companion.rSocketWithTcp( host: String, - payloadSerializer: KSerializer, port: Int = DEFAULT_MAGIX_RAW_PORT, tcpConfig: SocketOptions.TCPClientSocketOptions.() -> Unit = {}, rSocketConfig: RSocketConnectorBuilder.ConnectionConfigBuilder.() -> Unit = {}, -): RSocketMagixEndpoint { +): RSocketMagixEndpoint { val transport = TcpClientTransport( hostname = host, port = port, @@ -27,5 +25,5 @@ public suspend fun MagixEndpoint.Companion.rSocketWithTcp( ) val rSocket = buildConnector(rSocketConfig).connect(transport) - return RSocketMagixEndpoint(payloadSerializer, rSocket, coroutineContext) + return RSocketMagixEndpoint(rSocket, coroutineContext) } diff --git a/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/magixModule.kt b/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/magixModule.kt index a19c8a4..70d683f 100644 --- a/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/magixModule.kt +++ b/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/magixModule.kt @@ -21,44 +21,39 @@ import io.rsocket.kotlin.payload.data import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.flow.* import kotlinx.html.* -import kotlinx.serialization.KSerializer -import kotlinx.serialization.json.JsonElement +import kotlinx.serialization.decodeFromString +import kotlinx.serialization.encodeToString import ru.mipt.npm.magix.api.MagixEndpoint.Companion.magixJson import ru.mipt.npm.magix.api.MagixMessage import ru.mipt.npm.magix.api.MagixMessageFilter import ru.mipt.npm.magix.api.filter import java.util.* -public typealias GenericMagixMessage = MagixMessage -internal val genericMessageSerializer: KSerializer> = - MagixMessage.serializer(JsonElement.serializer()) - - -internal fun CoroutineScope.magixAcceptor(magixFlow: MutableSharedFlow) = ConnectionAcceptor { +internal fun CoroutineScope.magixAcceptor(magixFlow: MutableSharedFlow) = ConnectionAcceptor { RSocketRequestHandler { //handler for request/stream requestStream { request: Payload -> val filter = magixJson.decodeFromString(MagixMessageFilter.serializer(), request.data.readText()) magixFlow.filter(filter).map { message -> - val string = magixJson.encodeToString(genericMessageSerializer, message) + val string = magixJson.encodeToString(message) buildPayload { data(string) } } } fireAndForget { request: Payload -> - val message = magixJson.decodeFromString(genericMessageSerializer, request.data.readText()) + val message = magixJson.decodeFromString(request.data.readText()) magixFlow.emit(message) } // bi-directional connection requestChannel { request: Payload, input: Flow -> input.onEach { - magixFlow.emit(magixJson.decodeFromString(genericMessageSerializer, it.data.readText())) + magixFlow.emit(magixJson.decodeFromString(it.data.readText())) }.launchIn(this@magixAcceptor) val filter = magixJson.decodeFromString(MagixMessageFilter.serializer(), request.data.readText()) magixFlow.filter(filter).map { message -> - val string = magixJson.encodeToString(genericMessageSerializer, message) + val string = magixJson.encodeToString(message) buildPayload { data(string) } } } @@ -86,7 +81,7 @@ private fun ApplicationCall.buildFilter(): MagixMessageFilter { /** * Attache magix http/sse and websocket-based rsocket event loop + statistics page to existing [MutableSharedFlow] */ -public fun Application.magixModule(magixFlow: MutableSharedFlow, route: String = "/") { +public fun Application.magixModule(magixFlow: MutableSharedFlow, route: String = "/") { if (pluginOrNull(WebSockets) == null) { install(WebSockets) } @@ -126,7 +121,7 @@ public fun Application.magixModule(magixFlow: MutableSharedFlow li { code { - +magixJson.encodeToString(genericMessageSerializer, message) + +magixJson.encodeToString(message) } } } @@ -138,14 +133,14 @@ public fun Application.magixModule(magixFlow: MutableSharedFlow() + val message = call.receive() magixFlow.emit(message) } //rSocket server. Filter from Payload @@ -158,6 +153,6 @@ public fun Application.magixModule(magixFlow: MutableSharedFlow(buffer) + val magixFlow = MutableSharedFlow(buffer) magixModule(magixFlow, route) } \ No newline at end of file diff --git a/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/server.kt b/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/server.kt index 2a59c11..bc84f63 100644 --- a/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/server.kt +++ b/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/server.kt @@ -14,12 +14,13 @@ import org.slf4j.LoggerFactory import ru.mipt.npm.magix.api.MagixEndpoint import ru.mipt.npm.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_HTTP_PORT import ru.mipt.npm.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_RAW_PORT +import ru.mipt.npm.magix.api.MagixMessage /** * Raw TCP magix server */ public fun CoroutineScope.launchMagixServerRawRSocket( - magixFlow: MutableSharedFlow, + magixFlow: MutableSharedFlow, rawSocketPort: Int = DEFAULT_MAGIX_RAW_PORT, ): TcpServer { val tcpTransport = TcpServerTransport(port = rawSocketPort) @@ -41,10 +42,10 @@ public fun CoroutineScope.startMagixServer( buffer: Int = 100, enableRawRSocket: Boolean = true, enableZmq: Boolean = true, - applicationConfiguration: Application.(MutableSharedFlow) -> Unit = {}, + applicationConfiguration: Application.(MutableSharedFlow) -> Unit = {}, ): ApplicationEngine { val logger = LoggerFactory.getLogger("magix-server") - val magixFlow = MutableSharedFlow( + val magixFlow = MutableSharedFlow( buffer, extraBufferCapacity = buffer ) diff --git a/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/zmqMagixServerSocket.kt b/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/zmqMagixServerSocket.kt index e62acd7..f0c62b8 100644 --- a/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/zmqMagixServerSocket.kt +++ b/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/zmqMagixServerSocket.kt @@ -4,13 +4,16 @@ import kotlinx.coroutines.* import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach +import kotlinx.serialization.decodeFromString +import kotlinx.serialization.encodeToString import org.slf4j.LoggerFactory import org.zeromq.SocketType import org.zeromq.ZContext import ru.mipt.npm.magix.api.MagixEndpoint +import ru.mipt.npm.magix.api.MagixMessage public fun CoroutineScope.launchMagixServerZmqSocket( - magixFlow: MutableSharedFlow, + magixFlow: MutableSharedFlow, localHost: String = "tcp://*", zmqPubSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PUB_PORT, zmqPullSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PULL_PORT, @@ -22,7 +25,7 @@ public fun CoroutineScope.launchMagixServerZmqSocket( val pubSocket = context.createSocket(SocketType.PUB) pubSocket.bind("$localHost:$zmqPubSocketPort") magixFlow.onEach { message -> - val string = MagixEndpoint.magixJson.encodeToString(genericMessageSerializer, message) + val string = MagixEndpoint.magixJson.encodeToString(message) pubSocket.send(string) logger.debug("Published: $string") }.launchIn(this) @@ -36,7 +39,7 @@ public fun CoroutineScope.launchMagixServerZmqSocket( val string: String? = pullSocket.recvStr() if (string != null) { logger.debug("Received: $string") - val message = MagixEndpoint.magixJson.decodeFromString(genericMessageSerializer, string) + val message = MagixEndpoint.magixJson.decodeFromString(string) magixFlow.emit(message) } } diff --git a/magix/magix-storage/magix-storage-xodus/src/main/kotlin/ru/mipt/npm/magix/storage/xodus/XodusMagixStorage.kt b/magix/magix-storage/magix-storage-xodus/src/main/kotlin/ru/mipt/npm/magix/storage/xodus/XodusMagixStorage.kt index 6c76ee7..3b98368 100644 --- a/magix/magix-storage/magix-storage-xodus/src/main/kotlin/ru/mipt/npm/magix/storage/xodus/XodusMagixStorage.kt +++ b/magix/magix-storage/magix-storage-xodus/src/main/kotlin/ru/mipt/npm/magix/storage/xodus/XodusMagixStorage.kt @@ -5,7 +5,6 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.serialization.encodeToString -import kotlinx.serialization.json.JsonElement import ru.mipt.npm.magix.api.MagixEndpoint import ru.mipt.npm.magix.api.MagixMessage import ru.mipt.npm.magix.api.MagixMessageFilter @@ -13,7 +12,7 @@ import ru.mipt.npm.magix.api.MagixMessageFilter public class XodusMagixStorage( scope: CoroutineScope, private val store: PersistentEntityStore, - endpoint: MagixEndpoint, + endpoint: MagixEndpoint, filter: MagixMessageFilter = MagixMessageFilter(), ) : AutoCloseable { @@ -21,22 +20,22 @@ public class XodusMagixStorage( private val subscriptionJob = endpoint.subscribe(filter).onEach { message -> store.executeInTransaction { transaction -> transaction.newEntity(MAGIC_MESSAGE_ENTITY_TYPE).apply { - setProperty(MagixMessage<*>::origin.name, message.origin) - setProperty(MagixMessage<*>::format.name, message.format) + setProperty(MagixMessage::origin.name, message.origin) + setProperty(MagixMessage::format.name, message.format) - setBlobString(MagixMessage<*>::payload.name, MagixEndpoint.magixJson.encodeToString(message.payload)) + setBlobString(MagixMessage::payload.name, MagixEndpoint.magixJson.encodeToString(message.payload)) message.target?.let { - setProperty(MagixMessage<*>::target.name, it) + setProperty(MagixMessage::target.name, it) } message.id?.let { - setProperty(MagixMessage<*>::id.name, it) + setProperty(MagixMessage::id.name, it) } message.parentId?.let { - setProperty(MagixMessage<*>::parentId.name, it) + setProperty(MagixMessage::parentId.name, it) } message.user?.let { - setBlobString(MagixMessage<*>::user.name, MagixEndpoint.magixJson.encodeToString(it)) + setBlobString(MagixMessage::user.name, MagixEndpoint.magixJson.encodeToString(it)) } } } diff --git a/magix/magix-zmq/src/main/kotlin/ru/mipt/npm/magix/zmq/ZmqMagixEndpoint.kt b/magix/magix-zmq/src/main/kotlin/ru/mipt/npm/magix/zmq/ZmqMagixEndpoint.kt index 99f209e..56846e9 100644 --- a/magix/magix-zmq/src/main/kotlin/ru/mipt/npm/magix/zmq/ZmqMagixEndpoint.kt +++ b/magix/magix-zmq/src/main/kotlin/ru/mipt/npm/magix/zmq/ZmqMagixEndpoint.kt @@ -4,7 +4,8 @@ import kotlinx.coroutines.* import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.channelFlow import kotlinx.coroutines.flow.flowOn -import kotlinx.serialization.KSerializer +import kotlinx.serialization.decodeFromString +import kotlinx.serialization.encodeToString import org.zeromq.SocketType import org.zeromq.ZContext import org.zeromq.ZMQ @@ -16,19 +17,16 @@ import ru.mipt.npm.magix.api.filter import kotlin.coroutines.CoroutineContext import kotlin.coroutines.coroutineContext -public class ZmqMagixEndpoint( +public class ZmqMagixEndpoint( private val host: String, - payloadSerializer: KSerializer, private val pubPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PUB_PORT, private val pullPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PULL_PORT, private val coroutineContext: CoroutineContext = Dispatchers.IO, -) : MagixEndpoint, AutoCloseable { +) : MagixEndpoint, AutoCloseable { private val zmqContext by lazy { ZContext() } - private val serializer = MagixMessage.serializer(payloadSerializer) - @OptIn(ExperimentalCoroutinesApi::class) - override fun subscribe(filter: MagixMessageFilter): Flow> { + override fun subscribe(filter: MagixMessageFilter): Flow { val socket = zmqContext.createSocket(SocketType.SUB) socket.connect("$host:$pubPort") socket.subscribe("") @@ -42,7 +40,7 @@ public class ZmqMagixEndpoint( //This is a blocking call. val string: String? = socket.recvStr() if (string != null) { - val message = MagixEndpoint.magixJson.decodeFromString(serializer, string) + val message = MagixEndpoint.magixJson.decodeFromString(string) send(message) } } catch (t: Throwable) { @@ -64,8 +62,8 @@ public class ZmqMagixEndpoint( } } - override suspend fun broadcast(message: MagixMessage): Unit = withContext(coroutineContext) { - val string = MagixEndpoint.magixJson.encodeToString(serializer, message) + override suspend fun broadcast(message: MagixMessage): Unit = withContext(coroutineContext) { + val string = MagixEndpoint.magixJson.encodeToString(message) publishSocket.send(string) } @@ -76,12 +74,10 @@ public class ZmqMagixEndpoint( public suspend fun MagixEndpoint.Companion.zmq( host: String, - payloadSerializer: KSerializer, pubPort: Int = DEFAULT_MAGIX_ZMQ_PUB_PORT, pullPort: Int = DEFAULT_MAGIX_ZMQ_PULL_PORT, -): ZmqMagixEndpoint = ZmqMagixEndpoint( +): ZmqMagixEndpoint = ZmqMagixEndpoint( host, - payloadSerializer, pubPort, pullPort, coroutineContext = coroutineContext