From f0acbbb8cc7613cef54f5f175740c7636627bded Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Tue, 3 Nov 2020 18:54:52 +0300 Subject: [PATCH] Add rsocket service --- .gitignore | 4 + build.gradle.kts | 3 +- magix/magix-api/build.gradle.kts | 4 +- .../hep/dataforge/magix/api/MagixEndpoint.kt | 9 +- .../hep/dataforge/magix/api/MagixProcessor.kt | 21 +++++ magix/magix-server/build.gradle.kts | 6 +- .../hep/dataforge/magix/server/MagixServer.kt | 25 +++--- .../hep/dataforge/magix/server/magixModule.kt | 81 ++++++++--------- .../kotlin/hep/dataforge/magix/server/sse.kt | 39 ++++++++ magix/magix-service/build.gradle.kts | 31 +++++++ .../magix/service/RScocketMagixEndpoint.kt | 88 +++++++++++++++++++ settings.gradle.kts | 17 ++-- 12 files changed, 260 insertions(+), 68 deletions(-) create mode 100644 magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixProcessor.kt create mode 100644 magix/magix-server/src/main/kotlin/hep/dataforge/magix/server/sse.kt create mode 100644 magix/magix-service/build.gradle.kts create mode 100644 magix/magix-service/src/commonMain/kotlin/hep/dataforge/magix/service/RScocketMagixEndpoint.kt diff --git a/.gitignore b/.gitignore index ea7eb83..3bf252e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,11 @@ # Created by .ignore support plugin (hsz.mobi) .idea/ .gradle + *.iws +*.iml +*.ipr + out/ build/ !gradle-wrapper.jar \ No newline at end of file diff --git a/build.gradle.kts b/build.gradle.kts index 48f9907..ac98e5b 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -6,7 +6,7 @@ plugins { val dataforgeVersion: String by extra("0.2.0-dev-4") val ktorVersion: String by extra("1.4.1") -val rsocketVersion by extra("0.10.0") +val rsocketVersion by extra("0.11.0-SNAPSHOT") allprojects { repositories { @@ -17,6 +17,7 @@ allprojects { maven("https://maven.pkg.github.com/altavir/kotlin-logging/") maven("https://dl.bintray.com/rsocket-admin/RSocket") maven("https://maven.pkg.github.com/altavir/ktor-client-sse") + maven("https://oss.jfrog.org/oss-snapshot-local") } group = "hep.dataforge" diff --git a/magix/magix-api/build.gradle.kts b/magix/magix-api/build.gradle.kts index 650e08e..dd6269b 100644 --- a/magix/magix-api/build.gradle.kts +++ b/magix/magix-api/build.gradle.kts @@ -4,7 +4,9 @@ plugins { } kscience { - useSerialization() + useSerialization{ + json() + } useCoroutines("1.4.0", configuration = ru.mipt.npm.gradle.DependencyConfiguration.API) } diff --git a/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixEndpoint.kt b/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixEndpoint.kt index 6f3ad55..c0a21fd 100644 --- a/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixEndpoint.kt +++ b/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixEndpoint.kt @@ -3,6 +3,7 @@ package hep.dataforge.magix.api import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.flow.Flow import kotlinx.serialization.KSerializer +import kotlinx.serialization.json.Json /** * Inwards API of magix endpoint used to build plugins @@ -10,7 +11,7 @@ import kotlinx.serialization.KSerializer public interface MagixEndpoint { public val scope: CoroutineScope - public fun subscribe( + public suspend fun subscribe( payloadSerializer: KSerializer, filter: MagixMessageFilter = MagixMessageFilter.ALL, ): Flow> @@ -19,4 +20,10 @@ public interface MagixEndpoint { payloadSerializer: KSerializer, message: MagixMessage ) + + public companion object{ + public const val DEFAULT_MAGIX_WS_PORT: Int = 7777 + public const val DEFAULT_MAGIX_RAW_PORT: Int = 7778 + public val magixJson: Json = Json + } } \ No newline at end of file diff --git a/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixProcessor.kt b/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixProcessor.kt new file mode 100644 index 0000000..2378f4a --- /dev/null +++ b/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixProcessor.kt @@ -0,0 +1,21 @@ +package hep.dataforge.magix.api + +import kotlinx.coroutines.Job +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.launch +import kotlinx.serialization.json.JsonElement + +public interface MagixProcessor { + public fun process(endpoint: MagixEndpoint): Job +} + +public class MagixConverter( + public val filter: MagixMessageFilter, + public val transformer: (JsonElement) -> JsonElement, +) : MagixProcessor { + override fun process(endpoint: MagixEndpoint): Job = endpoint.scope.launch { + endpoint.subscribe(JsonElement.serializer(), filter).onEach { + TODO() + } + } +} \ No newline at end of file diff --git a/magix/magix-server/build.gradle.kts b/magix/magix-server/build.gradle.kts index 6296fed..674fde4 100644 --- a/magix/magix-server/build.gradle.kts +++ b/magix/magix-server/build.gradle.kts @@ -5,7 +5,9 @@ plugins { } kscience { - useSerialization() + useSerialization{ + json() + } } val dataforgeVersion: String by rootProject.extra @@ -21,6 +23,4 @@ dependencies{ implementation("io.rsocket.kotlin:rsocket-core:$rsocketVersion") implementation("io.rsocket.kotlin:rsocket-transport-ktor-server:$rsocketVersion") - - implementation("ru.mipt.npm:ktor-client-sse:0.1.0") } \ No newline at end of file diff --git a/magix/magix-server/src/main/kotlin/hep/dataforge/magix/server/MagixServer.kt b/magix/magix-server/src/main/kotlin/hep/dataforge/magix/server/MagixServer.kt index 314373c..96a6e5d 100644 --- a/magix/magix-server/src/main/kotlin/hep/dataforge/magix/server/MagixServer.kt +++ b/magix/magix-server/src/main/kotlin/hep/dataforge/magix/server/MagixServer.kt @@ -1,31 +1,34 @@ package hep.dataforge.magix.server +import hep.dataforge.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_RAW_PORT +import hep.dataforge.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_WS_PORT import io.ktor.network.selector.ActorSelectorManager import io.ktor.network.sockets.aSocket import io.ktor.server.cio.CIO import io.ktor.server.engine.ApplicationEngine import io.ktor.server.engine.embeddedServer import io.ktor.util.KtorExperimentalAPI +import io.rsocket.kotlin.core.RSocketServer +import io.rsocket.kotlin.transport.ktor.serverTransport +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.flow.MutableSharedFlow -public const val DEFAULT_MAGIX_SERVER_PORT: Int = 7777 -public const val DEFAULT_MAGIX_RAW_PORT: Int = 7778 - @OptIn(KtorExperimentalAPI::class) -public fun startMagixServer(port: Int = DEFAULT_MAGIX_SERVER_PORT, host: String = "0.0.0.0", buffer: Int = 100): ApplicationEngine { +public fun CoroutineScope.startMagixServer( + port: Int = DEFAULT_MAGIX_WS_PORT, + rawSocketPort: Int = DEFAULT_MAGIX_RAW_PORT, + buffer: Int = 100, +): ApplicationEngine { val magixFlow = MutableSharedFlow( buffer, - onBufferOverflow = BufferOverflow.DROP_OLDEST + extraBufferCapacity = buffer ) - //TODO add raw sockets server from https://github.com/rsocket/rsocket-kotlin/blob/master/examples/multiplatform-chat/src/serverJvmMain/kotlin/App.kt#L102 + val tcpTransport = aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().serverTransport(port = rawSocketPort) + RSocketServer().bind(tcpTransport, magixAcceptor(magixFlow)) - // val tcpTransport = aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().serverTransport(port = 8000) - // rSocketServer.bind(tcpTransport, acceptor) - - return embeddedServer(CIO, port = port, host = host){ + return embeddedServer(CIO, port = port) { magixModule(magixFlow) } } \ No newline at end of file diff --git a/magix/magix-server/src/main/kotlin/hep/dataforge/magix/server/magixModule.kt b/magix/magix-server/src/main/kotlin/hep/dataforge/magix/server/magixModule.kt index 4762c99..7549f79 100644 --- a/magix/magix-server/src/main/kotlin/hep/dataforge/magix/server/magixModule.kt +++ b/magix/magix-server/src/main/kotlin/hep/dataforge/magix/server/magixModule.kt @@ -1,5 +1,6 @@ package hep.dataforge.magix.server +import hep.dataforge.magix.api.MagixEndpoint.Companion.magixJson import hep.dataforge.magix.api.MagixMessage import hep.dataforge.magix.api.MagixMessageFilter import hep.dataforge.magix.api.filter @@ -7,11 +8,7 @@ import io.ktor.application.* import io.ktor.features.CORS import io.ktor.features.ContentNegotiation import io.ktor.html.respondHtml -import io.ktor.http.CacheControl -import io.ktor.http.ContentType import io.ktor.request.receive -import io.ktor.response.cacheControl -import io.ktor.response.respondBytesWriter import io.ktor.routing.get import io.ktor.routing.post import io.ktor.routing.route @@ -20,35 +17,51 @@ import io.ktor.serialization.json import io.ktor.util.KtorExperimentalAPI import io.ktor.util.getValue import io.ktor.websocket.WebSockets +import io.rsocket.kotlin.ConnectionAcceptor import io.rsocket.kotlin.RSocketRequestHandler -import io.rsocket.kotlin.core.RSocketServerSupport -import io.rsocket.kotlin.core.rSocket import io.rsocket.kotlin.payload.Payload -import kotlinx.coroutines.channels.BufferOverflow -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.MutableSharedFlow -import kotlinx.coroutines.flow.map +import io.rsocket.kotlin.transport.ktor.server.RSocketSupport +import io.rsocket.kotlin.transport.ktor.server.rSocket +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.flow.* import kotlinx.html.* import kotlinx.serialization.KSerializer -import kotlinx.serialization.json.Json import kotlinx.serialization.json.JsonElement -import ru.mipt.npm.ktor.sse.SseEvent -import ru.mipt.npm.ktor.sse.writeSseFlow public typealias GenericMagixMessage = MagixMessage private val genericMessageSerializer: KSerializer> = MagixMessage.serializer(JsonElement.serializer()) -@OptIn(KtorExperimentalAPI::class) -public suspend fun ApplicationCall.respondSse(events: Flow) { - response.cacheControl(CacheControl.NoCache(null)) - respondBytesWriter(contentType = ContentType.Text.EventStream) { - writeSseFlow(events) + +internal fun CoroutineScope.magixAcceptor(magixFlow: MutableSharedFlow) = ConnectionAcceptor { + RSocketRequestHandler { + //handler for request/stream + requestStream { request: Payload -> + val filter = magixJson.decodeFromString(MagixMessageFilter.serializer(), request.data.readText()) + magixFlow.filter(filter).map { message -> + val string = magixJson.encodeToString(genericMessageSerializer, message) + Payload(string) + } + } + fireAndForget { request: Payload -> + val message = magixJson.decodeFromString(genericMessageSerializer, request.data.readText()) + magixFlow.emit(message) + } + // bi-directional connection + requestChannel { input: Flow -> + input.onEach { + magixFlow.emit(magixJson.decodeFromString(genericMessageSerializer,it.data.readText())) + }.launchIn(this@magixAcceptor) + + magixFlow.map { message -> + val string = magixJson.encodeToString(genericMessageSerializer, message) + Payload(string) + } + } } } - /** * Create a message filter from call parameters */ @@ -85,8 +98,8 @@ public fun Application.magixModule(magixFlow: MutableSharedFlow li { code { - +Json.encodeToString(genericMessageSerializer, message) + +magixJson.encodeToString(genericMessageSerializer, message) } } } @@ -119,36 +132,18 @@ public fun Application.magixModule(magixFlow: MutableSharedFlow - val filter = Json.decodeFromString(MagixMessageFilter.serializer(), request.data.readText()) - magixFlow.filter(filter).map { message -> - val string = Json.encodeToString(genericMessageSerializer, message) - Payload(string) - } - } - fireAndForget = { request: Payload -> - val message = Json.decodeFromString(genericMessageSerializer, payload.data.readText()) - magixFlow.emit(message) - } - } - } + rSocket("rsocket", acceptor = magixAcceptor(magixFlow)) } } } public fun Application.magixModule(route: String = "/", buffer: Int = 100) { - val magixFlow = MutableSharedFlow( - buffer, - onBufferOverflow = BufferOverflow.DROP_OLDEST - ) + val magixFlow = MutableSharedFlow(buffer) magixModule(magixFlow, route) } \ No newline at end of file diff --git a/magix/magix-server/src/main/kotlin/hep/dataforge/magix/server/sse.kt b/magix/magix-server/src/main/kotlin/hep/dataforge/magix/server/sse.kt new file mode 100644 index 0000000..1bf8598 --- /dev/null +++ b/magix/magix-server/src/main/kotlin/hep/dataforge/magix/server/sse.kt @@ -0,0 +1,39 @@ +package hep.dataforge.magix.server + +import io.ktor.application.ApplicationCall +import io.ktor.http.CacheControl +import io.ktor.http.ContentType +import io.ktor.response.cacheControl +import io.ktor.response.respondBytesWriter +import io.ktor.util.KtorExperimentalAPI +import io.ktor.utils.io.ByteWriteChannel +import io.ktor.utils.io.writeStringUtf8 +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.collect + +/** + * The data class representing a SSE Event that will be sent to the client. + */ +public data class SseEvent(val data: String, val event: String? = "message", val id: String? = null) + +public suspend fun ByteWriteChannel.writeSseFlow(events: Flow): Unit = events.collect { event -> + if (event.id != null) { + writeStringUtf8("id: ${event.id}\n") + } + if (event.event != null) { + writeStringUtf8("event: ${event.event}\n") + } + for (dataLine in event.data.lines()) { + writeStringUtf8("data: $dataLine\n") + } + writeStringUtf8("\n") + flush() +} + +@OptIn(KtorExperimentalAPI::class) +public suspend fun ApplicationCall.respondSse(events: Flow) { + response.cacheControl(CacheControl.NoCache(null)) + respondBytesWriter(contentType = ContentType.Text.EventStream) { + writeSseFlow(events) + } +} \ No newline at end of file diff --git a/magix/magix-service/build.gradle.kts b/magix/magix-service/build.gradle.kts new file mode 100644 index 0000000..8aeb3fd --- /dev/null +++ b/magix/magix-service/build.gradle.kts @@ -0,0 +1,31 @@ +plugins { + id("ru.mipt.npm.mpp") + id("ru.mipt.npm.publish") +} + +kscience { + useSerialization{ + json() + } + useCoroutines("1.4.0", configuration = ru.mipt.npm.gradle.DependencyConfiguration.API) +} + +val dataforgeVersion: String by rootProject.extra +val ktorVersion: String by rootProject.extra +val rsocketVersion: String by rootProject.extra + +repositories{ + maven("https://maven.pkg.github.com/altavir/ktor-client-sse") +} + +kotlin { + sourceSets { + commonMain { + dependencies { + api(project(":magix:magix-api")) + implementation("io.ktor:ktor-client-core:$ktorVersion") + implementation("io.rsocket.kotlin:rsocket-transport-ktor-client:$rsocketVersion") + } + } + } +} \ No newline at end of file diff --git a/magix/magix-service/src/commonMain/kotlin/hep/dataforge/magix/service/RScocketMagixEndpoint.kt b/magix/magix-service/src/commonMain/kotlin/hep/dataforge/magix/service/RScocketMagixEndpoint.kt new file mode 100644 index 0000000..d040ff6 --- /dev/null +++ b/magix/magix-service/src/commonMain/kotlin/hep/dataforge/magix/service/RScocketMagixEndpoint.kt @@ -0,0 +1,88 @@ +package hep.dataforge.magix.service + +import hep.dataforge.magix.api.MagixEndpoint +import hep.dataforge.magix.api.MagixEndpoint.Companion.magixJson +import hep.dataforge.magix.api.MagixMessage +import hep.dataforge.magix.api.MagixMessageFilter +import io.ktor.client.HttpClient +import io.ktor.client.features.websocket.WebSockets +import io.ktor.util.KtorExperimentalAPI +import io.rsocket.kotlin.RSocketRequestHandler +import io.rsocket.kotlin.core.RSocketConnector +import io.rsocket.kotlin.keepalive.KeepAlive +import io.rsocket.kotlin.payload.Payload +import io.rsocket.kotlin.payload.PayloadMimeType +import io.rsocket.kotlin.transport.ktor.client.RSocketSupport +import io.rsocket.kotlin.transport.ktor.client.rSocket +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.async +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.launch +import kotlinx.serialization.KSerializer +import kotlinx.serialization.encodeToString +import kotlin.time.minutes +import kotlin.time.seconds + +public class RScocketMagixEndpoint( + override val scope: CoroutineScope, + public val host: String, + public val port: Int, + public val path: String = "/rsocket", +) : MagixEndpoint { + //create ktor client + @OptIn(KtorExperimentalAPI::class) + private val client = HttpClient { + install(WebSockets) + install(RSocketSupport) { + connector = RSocketConnector { + reconnectable(10) + //configure rSocket connector (all values have defaults) + connectionConfig { + keepAlive = KeepAlive( + interval = 30.seconds, + maxLifetime = 2.minutes + ) + +// //payload for setup frame +// setupPayload { Payload("hello world") } + + //mime types + payloadMimeType = PayloadMimeType( + data = "application/json", + metadata = "application/json" + ) + } + + //optional acceptor for server requests + acceptor { + RSocketRequestHandler { + requestResponse { it } //echo request payload + } + } + } + } + } + + private val rSocket = scope.async { + client.rSocket(host, port, path) + } + + override suspend fun subscribe( + payloadSerializer: KSerializer, + filter: MagixMessageFilter, + ): Flow> { + val serializer = MagixMessage.serializer(payloadSerializer) + val payload = Payload(magixJson.encodeToString(filter)) + val flow = rSocket.await().requestStream(payload) + return flow.map { magixJson.decodeFromString(serializer, it.data.readText()) } + } + + override suspend fun send(payloadSerializer: KSerializer, message: MagixMessage) { + scope.launch { + val serializer = MagixMessage.serializer(payloadSerializer) + val payload = Payload(magixJson.encodeToString(serializer, message)) + rSocket.await().fireAndForget(payload) + } + } +} \ No newline at end of file diff --git a/settings.gradle.kts b/settings.gradle.kts index ac10a37..55fc746 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -27,16 +27,17 @@ pluginManagement { rootProject.name = "dataforge-control" include( - ":dataforge-device-core", - ":dataforge-device-tcp", - ":dataforge-device-serial", - ":dataforge-device-server", - ":dataforge-magix-client", - ":motors", - ":demo", +// ":dataforge-device-core", +// ":dataforge-device-tcp", +// ":dataforge-device-serial", +// ":dataforge-device-server", +// ":dataforge-magix-client", +// ":motors", +// ":demo", ":magix", ":magix:magix-api", - ":magix:magix-server" + ":magix:magix-server", + ":magix:magix-service" ) //includeBuild("../dataforge-core")