Move serializer to endpoint parameter

This commit is contained in:
Alexander Nozik 2021-06-20 14:46:02 +03:00
parent e3ea2304ef
commit 6e4bf51a6f
22 changed files with 172 additions and 200 deletions

View File

@ -1,29 +1,22 @@
plugins {
id("ru.mipt.npm.gradle.project")
kotlin("jvm") apply false
kotlin("js") apply false
}
val dataforgeVersion: String by extra("0.4.0-dev-8")
val dataforgeVersion: String by extra("0.4.3")
val ktorVersion: String by extra(ru.mipt.npm.gradle.KScienceVersions.ktorVersion)
val rsocketVersion by extra("0.12.0")
allprojects {
repositories {
mavenLocal()
//maven("http://maven.jzy3d.org/releases")
maven(url = "https://maven.pkg.jetbrains.space/public/p/kotlinx-html/maven")
maven("https://kotlin.bintray.com/js-externals")
maven("https://dl.bintray.com/rsocket-admin/RSocket")
//maven("https://maven.pkg.github.com/altavir/ktor-client-sse")
}
group = "ru.mipt.npm"
version = "0.1.0"
repositories{
jcenter()
}
}
ksciencePublish {
github("controls.kt")
space()
}
apiValidation {

View File

@ -5,14 +5,13 @@ import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import ru.mipt.npm.magix.api.MagixEndpoint
import space.kscience.dataforge.context.error
import space.kscience.dataforge.context.logger
import space.kscience.dataforge.control.controllers.DeviceManager
import space.kscience.dataforge.control.controllers.respondMessage
import space.kscience.dataforge.control.messages.DeviceMessage
import space.kscience.dataforge.magix.api.MagixEndpoint
import space.kscience.dataforge.magix.api.MagixMessage
import space.kscience.dataforge.magix.api.MagixProcessor
public const val DATAFORGE_MAGIX_FORMAT: String = "dataforge"
@ -27,10 +26,10 @@ private fun generateId(request: MagixMessage<DeviceMessage>): String = if (reque
* Communicate with server in [Magix format](https://github.com/waltz-controls/rfc/tree/master/1)
*/
public fun DeviceManager.launchMagixClient(
endpoint: MagixEndpoint,
endpoint: MagixEndpoint<DeviceMessage>,
endpointID: String = DATAFORGE_MAGIX_FORMAT,
): Job = context.launch {
endpoint.subscribe(DeviceMessage.serializer()).onEach { request ->
endpoint.subscribe().onEach { request ->
//TODO analyze action
val responsePayload = respondMessage(request.payload)
@ -41,9 +40,9 @@ public fun DeviceManager.launchMagixClient(
origin = endpointID,
payload = responsePayload
)
endpoint.broadcast(DeviceMessage.serializer(), response)
endpoint.broadcast(response)
}.catch { error ->
logger.error(error){"Error while responding to message"}
logger.error(error) { "Error while responding to message" }
}.launchIn(this)
controller.messageOutput().onEach { payload ->
@ -54,13 +53,8 @@ public fun DeviceManager.launchMagixClient(
payload = payload
)
}.catch { error ->
logger.error(error){"Error while sending a message"}
logger.error(error) { "Error while sending a message" }
}.launchIn(this)
}
public fun DeviceManager.asMagixProcessor(endpointID: String = "dataforge"): MagixProcessor = object : MagixProcessor {
override fun process(endpoint: MagixEndpoint): Job = launchMagixClient(endpoint, endpointID)
}

View File

@ -1,21 +0,0 @@
plugins {
id("ru.mipt.npm.gradle.mpp")
`maven-publish`
}
kscience{
useSerialization {
json()
}
}
kotlin {
sourceSets {
commonMain {
dependencies {
implementation(project(":magix:magix-server"))
implementation(project(":controls-core"))
}
}
}
}

View File

@ -17,13 +17,14 @@ dependencies{
implementation(project(":controls-server"))
implementation(project(":controls-magix-client"))
implementation("no.tornado:tornadofx:1.7.20")
implementation("space.kscience:plotlykt-server:0.4.0-dev-2")
implementation("space.kscience:plotlykt-server:0.4.2")
implementation("com.github.Ricky12Awesome:json-schema-serialization:0.6.6")
}
tasks.withType<org.jetbrains.kotlin.gradle.tasks.KotlinCompile>().configureEach {
kotlinOptions {
jvmTarget = "11"
freeCompilerArgs = freeCompilerArgs + "-Xjvm-default=all"
}
}

View File

@ -1,4 +1,4 @@
package space.kscience.dataforge.control.demo
package ru.mipt.npm.controls.demo
import io.ktor.server.engine.ApplicationEngine
import javafx.scene.Parent

View File

@ -1,4 +1,4 @@
package space.kscience.dataforge.control.demo
package ru.mipt.npm.controls.demo
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job

View File

@ -1,4 +1,4 @@
package space.kscience.dataforge.control.demo
package ru.mipt.npm.controls.demo
import io.ktor.server.engine.ApplicationEngine
import kotlinx.coroutines.flow.*

View File

@ -1,4 +1,4 @@
package space.kscience.dataforge.control.demo
package ru.mipt.npm.controls.demo
import com.github.ricky12awesome.jss.encodeToSchema
import com.github.ricky12awesome.jss.globalJson

View File

@ -1,44 +0,0 @@
package space.kscience.dataforge.magix.api
import kotlinx.coroutines.flow.Flow
import kotlinx.serialization.KSerializer
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonElement
/**
* Inwards API of magix endpoint used to build services
*/
public interface MagixEndpoint {
/**
* Subscribe to a [Flow] of messages using specific [payloadSerializer]
*/
public fun <T> subscribe(
payloadSerializer: KSerializer<T>,
filter: MagixMessageFilter = MagixMessageFilter.ALL,
): Flow<MagixMessage<T>>
/**
* Send an event using specific [payloadSerializer]
*/
public suspend fun <T> broadcast(
payloadSerializer: KSerializer<T>,
message: MagixMessage<T>,
)
public companion object {
public const val DEFAULT_MAGIX_WS_PORT: Int = 7777
public const val DEFAULT_MAGIX_RAW_PORT: Int = 7778
public val magixJson: Json = Json{
ignoreUnknownKeys = true
encodeDefaults = false
}
}
}
public fun MagixEndpoint.subscribe(
filter: MagixMessageFilter = MagixMessageFilter.ALL,
): Flow<MagixMessage<JsonElement>> = subscribe(JsonElement.serializer(),filter)
public suspend fun MagixEndpoint.broadcast(message: MagixMessage<JsonElement>): Unit =
broadcast(JsonElement.serializer(), message)

View File

@ -1,60 +0,0 @@
package space.kscience.dataforge.magix.api
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.serialization.KSerializer
import kotlinx.serialization.json.JsonElement
public fun interface MagixProcessor {
public fun process(endpoint: MagixEndpoint): Job
public companion object {
/**
* A converter from one (or several) format to another. It captures all events with the given filter then transforms it
* with given [transformer] and sends back to the loop with given [outputFormat].
*
* If [newOrigin] is not null, it is used as a replacement for old [MagixMessage.origin] tag.
*/
public fun <T : Any, R : Any> convert(
scope: CoroutineScope,
filter: MagixMessageFilter,
outputFormat: String,
inputSerializer: KSerializer<T>,
outputSerializer: KSerializer<R>,
newOrigin: String? = null,
transformer: suspend (T) -> R,
): MagixProcessor = MagixProcessor { endpoint ->
endpoint.subscribe(inputSerializer, filter).onEach { message ->
val newPayload = transformer(message.payload)
val transformed: MagixMessage<R> = MagixMessage(
outputFormat,
newOrigin ?: message.origin,
newPayload,
message.target,
message.id,
message.parentId,
message.user
)
endpoint.broadcast(outputSerializer, transformed)
}.launchIn(scope)
}
}
public fun convert(
scope: CoroutineScope,
filter: MagixMessageFilter,
outputFormat: String,
newOrigin: String? = null,
transformer: suspend (JsonElement) -> JsonElement,
): MagixProcessor = convert(
scope,
filter,
outputFormat,
JsonElement.serializer(),
JsonElement.serializer(),
newOrigin,
transformer
)
}

View File

@ -0,0 +1,67 @@
package ru.mipt.npm.magix.api
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import kotlinx.serialization.KSerializer
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonElement
import space.kscience.dataforge.magix.api.MagixMessage
import space.kscience.dataforge.magix.api.MagixMessageFilter
import space.kscience.dataforge.magix.api.replacePayload
/**
* Inwards API of magix endpoint used to build services
*/
public interface MagixEndpoint<T> {
/**
* Subscribe to a [Flow] of messages
*/
public fun subscribe(
filter: MagixMessageFilter = MagixMessageFilter.ALL,
): Flow<MagixMessage<T>>
/**
* Send an event
*/
public suspend fun broadcast(
message: MagixMessage<T>,
)
public companion object {
public const val DEFAULT_MAGIX_WS_PORT: Int = 7777
public const val DEFAULT_MAGIX_RAW_PORT: Int = 7778
public val magixJson: Json = Json {
ignoreUnknownKeys = true
encodeDefaults = false
}
}
}
/**
* Specialize this raw json endpoint to use specific serializer
*/
public fun <T : Any> MagixEndpoint<JsonElement>.specialize(
payloadSerializer: KSerializer<T>
): MagixEndpoint<T> = object : MagixEndpoint<T> {
override fun subscribe(
filter: MagixMessageFilter
): Flow<MagixMessage<T>> = this@specialize.subscribe(filter).map { message ->
message.replacePayload { payload ->
MagixEndpoint.magixJson.decodeFromJsonElement(payloadSerializer, payload)
}
}
override suspend fun broadcast(message: MagixMessage<T>) {
this@specialize.broadcast(
message.replacePayload { payload ->
MagixEndpoint.magixJson.encodeToJsonElement(
payloadSerializer,
payload
)
}
)
}
}

View File

@ -25,8 +25,15 @@ public data class MagixMessage<T>(
val origin: String,
val payload: T,
val target: String? = null,
val id: String? = null,
val id: String? = null,
val parentId: String? = null,
val user: JsonElement? = null,
val action: String? = null
)
/**
* Create message with same field but replaced payload
*/
@Suppress("UNCHECKED_CAST")
public fun <T, R> MagixMessage<T>.replacePayload(payloadTransform: (T) -> R): MagixMessage<R> =
MagixMessage(format, origin, payloadTransform(payload), target, id, parentId, user, action)

View File

@ -0,0 +1,31 @@
package space.kscience.dataforge.magix.api
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import ru.mipt.npm.magix.api.MagixEndpoint
/**
* Launch magix message converter service
*/
public fun <T, R> CoroutineScope.launchMagixConverter(
inputEndpoint: MagixEndpoint<T>,
outputEndpoint: MagixEndpoint<R>,
filter: MagixMessageFilter,
outputFormat: String,
newOrigin: String? = null,
transformer: suspend (T) -> R,
): Job = inputEndpoint.subscribe(filter).onEach { message->
val newPayload = transformer(message.payload)
val transformed: MagixMessage<R> = MagixMessage(
outputFormat,
newOrigin ?: message.origin,
newPayload,
message.target,
message.id,
message.parentId,
message.user
)
outputEndpoint.broadcast(transformed)
}.launchIn(this)

View File

@ -17,10 +17,10 @@ public interface MagixClient<T> {
Flow.Publisher<MagixMessage<T>> subscribe();
static MagixClient<JsonElement> rSocketTcp(String host, int port) {
return ControlsMagixClient.Companion.rSocketTcp(host, port);
return ControlsMagixClient.Companion.rSocketTcp(host, port, JsonElement.Companion.serializer());
}
static MagixClient<JsonElement> rSocketWs(String host, int port, String path) {
return ControlsMagixClient.Companion.rSocketWs(host, port, path);
return ControlsMagixClient.Companion.rSocketWs(host, port, JsonElement.Companion.serializer(), path);
}
}

View File

@ -3,8 +3,7 @@ package ru.mipt.npm.magix.client
import kotlinx.coroutines.jdk9.asPublisher
import kotlinx.coroutines.runBlocking
import kotlinx.serialization.KSerializer
import kotlinx.serialization.json.JsonElement
import space.kscience.dataforge.magix.api.MagixEndpoint
import ru.mipt.npm.magix.api.MagixEndpoint
import space.kscience.dataforge.magix.api.MagixMessage
import space.kscience.dataforge.magix.api.MagixMessageFilter
import space.kscience.dataforge.magix.service.RSocketMagixEndpoint
@ -12,31 +11,39 @@ import space.kscience.dataforge.magix.service.withTcp
import java.util.concurrent.Flow
public class ControlsMagixClient<T>(
private val endpoint: MagixEndpoint,
private val endpoint: MagixEndpoint<T>,
private val filter: MagixMessageFilter,
private val serializer: KSerializer<T>,
) : MagixClient<T> {
override fun broadcast(msg: MagixMessage<T>): Unit = runBlocking {
endpoint.broadcast(serializer, msg)
endpoint.broadcast(msg)
}
override fun subscribe(): Flow.Publisher<MagixMessage<T>> = endpoint.subscribe(serializer, filter).asPublisher()
override fun subscribe(): Flow.Publisher<MagixMessage<T>> = endpoint.subscribe(filter).asPublisher()
public companion object {
public fun rSocketTcp(host: String, port: Int): ControlsMagixClient<JsonElement> {
public fun <T> rSocketTcp(
host: String,
port: Int,
payloadSerializer: KSerializer<T>
): ControlsMagixClient<T> {
val endpoint = runBlocking {
RSocketMagixEndpoint.withTcp(host, port)
RSocketMagixEndpoint.withTcp(host, port, payloadSerializer)
}
return ControlsMagixClient(endpoint, MagixMessageFilter(), JsonElement.serializer())
return ControlsMagixClient(endpoint, MagixMessageFilter())
}
public fun rSocketWs(host: String, port: Int, path: String = "/rsocket"): ControlsMagixClient<JsonElement> {
public fun <T> rSocketWs(
host: String,
port: Int,
payloadSerializer: KSerializer<T>,
path: String = "/rsocket"
): ControlsMagixClient<T> {
val endpoint = runBlocking {
RSocketMagixEndpoint.withWebSockets(host, port, path)
RSocketMagixEndpoint.withWebSockets(host, port, payloadSerializer, path)
}
return ControlsMagixClient(endpoint, MagixMessageFilter(), JsonElement.serializer())
return ControlsMagixClient(endpoint, MagixMessageFilter())
}
}
}

View File

@ -11,8 +11,11 @@ import io.rsocket.kotlin.transport.ktor.serverTransport
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.MutableSharedFlow
import space.kscience.dataforge.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_RAW_PORT
import space.kscience.dataforge.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_WS_PORT
import ru.mipt.npm.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_RAW_PORT
import ru.mipt.npm.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_WS_PORT
import ru.mipt.npm.magix.server.GenericMagixMessage
import ru.mipt.npm.magix.server.magixAcceptor
import ru.mipt.npm.magix.server.magixModule
@OptIn(KtorExperimentalAPI::class)
public fun CoroutineScope.startMagixServer(

View File

@ -1,4 +1,4 @@
package space.kscience.dataforge.magix.server
package ru.mipt.npm.magix.server
import io.ktor.application.*
import io.ktor.features.CORS
@ -25,10 +25,12 @@ import kotlinx.coroutines.flow.*
import kotlinx.html.*
import kotlinx.serialization.KSerializer
import kotlinx.serialization.json.JsonElement
import space.kscience.dataforge.magix.api.MagixEndpoint.Companion.magixJson
import ru.mipt.npm.magix.api.MagixEndpoint.Companion.magixJson
import space.kscience.dataforge.magix.api.MagixMessage
import space.kscience.dataforge.magix.api.MagixMessageFilter
import space.kscience.dataforge.magix.api.filter
import space.kscience.dataforge.magix.server.SseEvent
import space.kscience.dataforge.magix.server.respondSse
import java.util.*
public typealias GenericMagixMessage = MagixMessage<JsonElement>

View File

@ -2,7 +2,6 @@ package space.kscience.dataforge.magix.service
import io.ktor.client.HttpClient
import io.ktor.client.features.websocket.WebSockets
import io.ktor.util.KtorExperimentalAPI
import io.rsocket.kotlin.RSocket
import io.rsocket.kotlin.core.RSocketConnector
import io.rsocket.kotlin.core.RSocketConnectorBuilder
@ -15,19 +14,19 @@ import kotlinx.coroutines.flow.map
import kotlinx.coroutines.withContext
import kotlinx.serialization.KSerializer
import kotlinx.serialization.encodeToString
import space.kscience.dataforge.magix.api.MagixEndpoint
import ru.mipt.npm.magix.api.MagixEndpoint
import space.kscience.dataforge.magix.api.MagixMessage
import space.kscience.dataforge.magix.api.MagixMessageFilter
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
public class RSocketMagixEndpoint(
public class RSocketMagixEndpoint<T>(
private val coroutineContext: CoroutineContext,
private val payloadSerializer: KSerializer<T>,
private val rSocket: RSocket,
) : MagixEndpoint {
) : MagixEndpoint<T> {
override fun <T> subscribe(
payloadSerializer: KSerializer<T>,
override fun subscribe(
filter: MagixMessageFilter,
): Flow<MagixMessage<T>> {
val serializer = MagixMessage.serializer(payloadSerializer)
@ -36,7 +35,7 @@ public class RSocketMagixEndpoint(
return flow.map { MagixEndpoint.magixJson.decodeFromString(serializer, it.data.readText()) }
}
override suspend fun <T> broadcast(payloadSerializer: KSerializer<T>, message: MagixMessage<T>) {
override suspend fun broadcast(message: MagixMessage<T>) {
withContext(coroutineContext) {
val serializer = MagixMessage.serializer(payloadSerializer)
val payload = buildPayload { data(MagixEndpoint.magixJson.encodeToString(serializer, message)) }
@ -52,13 +51,13 @@ public class RSocketMagixEndpoint(
connectionConfig(rSocketConfig)
}
@OptIn(KtorExperimentalAPI::class)
public suspend fun withWebSockets(
public suspend fun <T> withWebSockets(
host: String,
port: Int,
payloadSerializer: KSerializer<T>,
path: String = "/rsocket",
rSocketConfig: RSocketConnectorBuilder.ConnectionConfigBuilder.() -> Unit = {},
): RSocketMagixEndpoint {
): RSocketMagixEndpoint<T> {
val client = HttpClient {
install(WebSockets)
install(RSocketSupport) {
@ -73,7 +72,7 @@ public class RSocketMagixEndpoint(
client.close()
}
return RSocketMagixEndpoint(coroutineContext, rSocket)
return RSocketMagixEndpoint(coroutineContext, payloadSerializer, rSocket)
}
}
}

View File

@ -3,25 +3,25 @@ package space.kscience.dataforge.magix.service
import io.ktor.network.selector.ActorSelectorManager
import io.ktor.network.sockets.SocketOptions
import io.ktor.network.sockets.aSocket
import io.ktor.util.KtorExperimentalAPI
import io.rsocket.kotlin.core.RSocketConnectorBuilder
import io.rsocket.kotlin.transport.ktor.clientTransport
import kotlinx.coroutines.Dispatchers
import kotlinx.serialization.KSerializer
import kotlin.coroutines.coroutineContext
/**
* Create a plain TCP based [RSocketMagixEndpoint]
*/
@OptIn(KtorExperimentalAPI::class)
public suspend fun RSocketMagixEndpoint.Companion.withTcp(
public suspend fun <T> RSocketMagixEndpoint.Companion.withTcp(
host: String,
port: Int,
payloadSerializer: KSerializer<T>,
tcpConfig: SocketOptions.TCPClientSocketOptions.() -> Unit = {},
rSocketConfig: RSocketConnectorBuilder.ConnectionConfigBuilder.() -> Unit = {},
): RSocketMagixEndpoint {
): RSocketMagixEndpoint<T> {
val transport = aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().clientTransport(host, port, tcpConfig)
val rSocket = buildConnector(rSocketConfig).connect(transport)
return RSocketMagixEndpoint(coroutineContext, rSocket)
return RSocketMagixEndpoint(coroutineContext, payloadSerializer, rSocket)
}

View File

@ -1,9 +1,9 @@
rootProject.name = "controls.kt"
pluginManagement {
val kotlinVersion = "1.5.0-M2"
val toolsVersion = "0.9.5-dev"
val toolsVersion = "0.10.0"
repositories {
mavenLocal()
maven("https://repo.kotlin.link")
mavenCentral()
gradlePluginPortal()
@ -14,15 +14,9 @@ pluginManagement {
id("ru.mipt.npm.gradle.mpp") version toolsVersion
id("ru.mipt.npm.gradle.jvm") version toolsVersion
id("ru.mipt.npm.gradle.js") version toolsVersion
id("ru.mipt.npm.gradle.publish") version toolsVersion
kotlin("jvm") version kotlinVersion
kotlin("js") version kotlinVersion
kotlin("multiplatform") version kotlinVersion
}
}
rootProject.name = "controls.kt"
include(
":controls-core",
":controls-tcp",
@ -35,7 +29,6 @@ include(
":magix:magix-service",
":magix:magix-java-client",
":controls-magix-client",
":controls-magix-server",
":motors"
)