From e5883dc318522155fd92830dfe21f9bca51a710b Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Mon, 2 Nov 2020 17:53:53 +0300 Subject: [PATCH] Add rsocket server --- build.gradle.kts | 7 +- magix/build.gradle.kts | 3 + magix/magix-api/build.gradle.kts | 13 ++ .../hep/dataforge/magix/api/MagixEndpoint.kt | 22 +++ .../hep/dataforge/magix/api/MagixMessage.kt | 31 ++++ .../dataforge/magix/api/MagixMessageFilter.kt | 35 ++++ magix/magix-server/build.gradle.kts | 26 +++ .../hep/dataforge/magix/server/MagixServer.kt | 31 ++++ .../hep/dataforge/magix/server/magixModule.kt | 154 ++++++++++++++++++ settings.gradle.kts | 11 +- 10 files changed, 327 insertions(+), 6 deletions(-) create mode 100644 magix/build.gradle.kts create mode 100644 magix/magix-api/build.gradle.kts create mode 100644 magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixEndpoint.kt create mode 100644 magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixMessage.kt create mode 100644 magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixMessageFilter.kt create mode 100644 magix/magix-server/build.gradle.kts create mode 100644 magix/magix-server/src/main/kotlin/hep/dataforge/magix/server/MagixServer.kt create mode 100644 magix/magix-server/src/main/kotlin/hep/dataforge/magix/server/magixModule.kt diff --git a/build.gradle.kts b/build.gradle.kts index c89c248..48f9907 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -4,8 +4,9 @@ plugins { kotlin("js") apply false } -val dataforgeVersion: String by extra("0.2.0-dev-3") +val dataforgeVersion: String by extra("0.2.0-dev-4") val ktorVersion: String by extra("1.4.1") +val rsocketVersion by extra("0.10.0") allprojects { repositories { @@ -14,10 +15,12 @@ allprojects { maven("http://maven.jzy3d.org/releases") maven("https://kotlin.bintray.com/js-externals") maven("https://maven.pkg.github.com/altavir/kotlin-logging/") + maven("https://dl.bintray.com/rsocket-admin/RSocket") + maven("https://maven.pkg.github.com/altavir/ktor-client-sse") } group = "hep.dataforge" - version = "0.0.1" + version = "0.1.0" } ksciencePublish { diff --git a/magix/build.gradle.kts b/magix/build.gradle.kts new file mode 100644 index 0000000..ae31ca2 --- /dev/null +++ b/magix/build.gradle.kts @@ -0,0 +1,3 @@ +subprojects{ + +} \ No newline at end of file diff --git a/magix/magix-api/build.gradle.kts b/magix/magix-api/build.gradle.kts new file mode 100644 index 0000000..650e08e --- /dev/null +++ b/magix/magix-api/build.gradle.kts @@ -0,0 +1,13 @@ +plugins { + id("ru.mipt.npm.mpp") + id("ru.mipt.npm.publish") +} + +kscience { + useSerialization() + useCoroutines("1.4.0", configuration = ru.mipt.npm.gradle.DependencyConfiguration.API) +} + +val dataforgeVersion: String by rootProject.extra +val ktorVersion: String by rootProject.extra + diff --git a/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixEndpoint.kt b/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixEndpoint.kt new file mode 100644 index 0000000..6f3ad55 --- /dev/null +++ b/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixEndpoint.kt @@ -0,0 +1,22 @@ +package hep.dataforge.magix.api + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.flow.Flow +import kotlinx.serialization.KSerializer + +/** + * Inwards API of magix endpoint used to build plugins + */ +public interface MagixEndpoint { + public val scope: CoroutineScope + + public fun subscribe( + payloadSerializer: KSerializer, + filter: MagixMessageFilter = MagixMessageFilter.ALL, + ): Flow> + + public suspend fun send( + payloadSerializer: KSerializer, + message: MagixMessage + ) +} \ No newline at end of file diff --git a/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixMessage.kt b/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixMessage.kt new file mode 100644 index 0000000..a92275f --- /dev/null +++ b/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixMessage.kt @@ -0,0 +1,31 @@ +package hep.dataforge.magix.api + +import kotlinx.serialization.Serializable + +/** + * + * Magix message according to [magix specification](https://github.com/piazza-controls/rfc/tree/master/1) + * with a [correction](https://github.com/piazza-controls/rfc/issues/12) + * + * { + * "format": "string[required]", + * "id":"string|number[optional, but desired]", + * "parentId": "string|number[optional]", + * "target":"string[optional]", + * "origin":"string[required]", + * "user":"string[optional]", + * "action":"string[optional, default='heartbeat']", + * "payload":"object[optional]" + * } + */ +@Serializable +public data class MagixMessage( + val format: String, + val origin: String, + val payload: T, + val target: String? = null, + val id: String? = null, + val parentId: String? = null, + val user: String? = null, + val action: String? = null +) \ No newline at end of file diff --git a/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixMessageFilter.kt b/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixMessageFilter.kt new file mode 100644 index 0000000..8f9c097 --- /dev/null +++ b/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixMessageFilter.kt @@ -0,0 +1,35 @@ +package hep.dataforge.magix.api + +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.filter +import kotlinx.serialization.Serializable + +@Serializable +public data class MagixMessageFilter( + val format: List? = null, + val origin: List? = null, + val target: List? = null, + val user: List? = null, + val action: List? = null, +) { + public companion object { + public val ALL: MagixMessageFilter = MagixMessageFilter() + } +} + +/** + * Filter a [Flow] of messages based on given filter + */ +public fun Flow>.filter(filter: MagixMessageFilter): Flow> { + if (filter == MagixMessageFilter.ALL) { + return this + } + return filter { message -> + filter.format?.contains(message.format) ?: true + && filter.origin?.contains(message.origin) ?: true + && filter.origin?.contains(message.origin) ?: true + && filter.target?.contains(message.target) ?: true + && filter.user?.contains(message.user) ?: true + && filter.action?.contains(message.action) ?: true + } +} \ No newline at end of file diff --git a/magix/magix-server/build.gradle.kts b/magix/magix-server/build.gradle.kts new file mode 100644 index 0000000..6296fed --- /dev/null +++ b/magix/magix-server/build.gradle.kts @@ -0,0 +1,26 @@ +plugins { + id("ru.mipt.npm.jvm") + id("ru.mipt.npm.publish") + application +} + +kscience { + useSerialization() +} + +val dataforgeVersion: String by rootProject.extra +val ktorVersion: String by rootProject.extra +val rsocketVersion: String by rootProject.extra + +dependencies{ + api(project(":magix:magix-api")) + implementation("io.ktor:ktor-server-cio:$ktorVersion") + implementation("io.ktor:ktor-websockets:$ktorVersion") + implementation("io.ktor:ktor-serialization:$ktorVersion") + implementation("io.ktor:ktor-html-builder:$ktorVersion") + + implementation("io.rsocket.kotlin:rsocket-core:$rsocketVersion") + implementation("io.rsocket.kotlin:rsocket-transport-ktor-server:$rsocketVersion") + + implementation("ru.mipt.npm:ktor-client-sse:0.1.0") +} \ No newline at end of file diff --git a/magix/magix-server/src/main/kotlin/hep/dataforge/magix/server/MagixServer.kt b/magix/magix-server/src/main/kotlin/hep/dataforge/magix/server/MagixServer.kt new file mode 100644 index 0000000..314373c --- /dev/null +++ b/magix/magix-server/src/main/kotlin/hep/dataforge/magix/server/MagixServer.kt @@ -0,0 +1,31 @@ +package hep.dataforge.magix.server + +import io.ktor.network.selector.ActorSelectorManager +import io.ktor.network.sockets.aSocket +import io.ktor.server.cio.CIO +import io.ktor.server.engine.ApplicationEngine +import io.ktor.server.engine.embeddedServer +import io.ktor.util.KtorExperimentalAPI +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.channels.BufferOverflow +import kotlinx.coroutines.flow.MutableSharedFlow + +public const val DEFAULT_MAGIX_SERVER_PORT: Int = 7777 +public const val DEFAULT_MAGIX_RAW_PORT: Int = 7778 + +@OptIn(KtorExperimentalAPI::class) +public fun startMagixServer(port: Int = DEFAULT_MAGIX_SERVER_PORT, host: String = "0.0.0.0", buffer: Int = 100): ApplicationEngine { + + val magixFlow = MutableSharedFlow( + buffer, + onBufferOverflow = BufferOverflow.DROP_OLDEST + ) + //TODO add raw sockets server from https://github.com/rsocket/rsocket-kotlin/blob/master/examples/multiplatform-chat/src/serverJvmMain/kotlin/App.kt#L102 + + // val tcpTransport = aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().serverTransport(port = 8000) + // rSocketServer.bind(tcpTransport, acceptor) + + return embeddedServer(CIO, port = port, host = host){ + magixModule(magixFlow) + } +} \ No newline at end of file diff --git a/magix/magix-server/src/main/kotlin/hep/dataforge/magix/server/magixModule.kt b/magix/magix-server/src/main/kotlin/hep/dataforge/magix/server/magixModule.kt new file mode 100644 index 0000000..4762c99 --- /dev/null +++ b/magix/magix-server/src/main/kotlin/hep/dataforge/magix/server/magixModule.kt @@ -0,0 +1,154 @@ +package hep.dataforge.magix.server + +import hep.dataforge.magix.api.MagixMessage +import hep.dataforge.magix.api.MagixMessageFilter +import hep.dataforge.magix.api.filter +import io.ktor.application.* +import io.ktor.features.CORS +import io.ktor.features.ContentNegotiation +import io.ktor.html.respondHtml +import io.ktor.http.CacheControl +import io.ktor.http.ContentType +import io.ktor.request.receive +import io.ktor.response.cacheControl +import io.ktor.response.respondBytesWriter +import io.ktor.routing.get +import io.ktor.routing.post +import io.ktor.routing.route +import io.ktor.routing.routing +import io.ktor.serialization.json +import io.ktor.util.KtorExperimentalAPI +import io.ktor.util.getValue +import io.ktor.websocket.WebSockets +import io.rsocket.kotlin.RSocketRequestHandler +import io.rsocket.kotlin.core.RSocketServerSupport +import io.rsocket.kotlin.core.rSocket +import io.rsocket.kotlin.payload.Payload +import kotlinx.coroutines.channels.BufferOverflow +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.map +import kotlinx.html.* +import kotlinx.serialization.KSerializer +import kotlinx.serialization.json.Json +import kotlinx.serialization.json.JsonElement +import ru.mipt.npm.ktor.sse.SseEvent +import ru.mipt.npm.ktor.sse.writeSseFlow + +public typealias GenericMagixMessage = MagixMessage + +private val genericMessageSerializer: KSerializer> = + MagixMessage.serializer(JsonElement.serializer()) + +@OptIn(KtorExperimentalAPI::class) +public suspend fun ApplicationCall.respondSse(events: Flow) { + response.cacheControl(CacheControl.NoCache(null)) + respondBytesWriter(contentType = ContentType.Text.EventStream) { + writeSseFlow(events) + } +} + + +/** + * Create a message filter from call parameters + */ +@OptIn(KtorExperimentalAPI::class) +private fun ApplicationCall.buildFilter(): MagixMessageFilter { + val query = request.queryParameters + + if (query.isEmpty()) { + return MagixMessageFilter.ALL + } + + val format: List? by query + val origin: List? by query + return MagixMessageFilter( + format, + origin + ) +} + +public fun Application.magixModule(magixFlow: MutableSharedFlow, route: String = "/") { + if (featureOrNull(WebSockets) == null) { + install(WebSockets) + } + + if (featureOrNull(CORS) == null) { + install(CORS) { + //TODO consider more safe policy + anyHost() + } + } + if (featureOrNull(ContentNegotiation) == null) { + install(ContentNegotiation) { + json() + } + } + + if (featureOrNull(RSocketServerSupport) == null) { + install(RSocketServerSupport) + } + + routing { + route(route) { + post { + val message = call.receive() + magixFlow.emit(message) + } + get { + call.respondHtml { + body { + h1 { +"Magix stream statistics" } + h2 { +"Number of subscribers: ${magixFlow.subscriptionCount.value}" } + h3 { +"Replay cache size: ${magixFlow.replayCache.size}" } + h3 { +"Replay cache:" } + ol { + magixFlow.replayCache.forEach { message -> + li { + code { + +Json.encodeToString(genericMessageSerializer, message) + } + } + } + } + } + } + } + //SSE server. Filter from query + get("sse") { + val filter = call.buildFilter() + var idCounter = 0 + val sseFlow = magixFlow.filter(filter).map { + val data = Json.encodeToString(genericMessageSerializer, it) + SseEvent(data, id = idCounter++.toString()) + } + call.respondSse(sseFlow) + } + //rSocket server. Filter from Payload + rSocket("rsocket") { + RSocketRequestHandler { + //handler for request/stream + requestStream = { request: Payload -> + val filter = Json.decodeFromString(MagixMessageFilter.serializer(), request.data.readText()) + magixFlow.filter(filter).map { message -> + val string = Json.encodeToString(genericMessageSerializer, message) + Payload(string) + } + } + fireAndForget = { request: Payload -> + val message = Json.decodeFromString(genericMessageSerializer, payload.data.readText()) + magixFlow.emit(message) + } + } + } + } + } +} + +public fun Application.magixModule(route: String = "/", buffer: Int = 100) { + val magixFlow = MutableSharedFlow( + buffer, + onBufferOverflow = BufferOverflow.DROP_OLDEST + ) + magixModule(magixFlow, route) +} \ No newline at end of file diff --git a/settings.gradle.kts b/settings.gradle.kts index 62c2f90..ac10a37 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -1,6 +1,6 @@ pluginManagement { - val kotlinVersion = "1.4.20-M1" - val toolsVersion = "0.6.3-dev-1.4.20-M1" + val kotlinVersion = "1.4.20-M2" + val toolsVersion = "0.6.4-dev-1.4.20-M2" repositories { mavenLocal() @@ -32,8 +32,11 @@ include( ":dataforge-device-serial", ":dataforge-device-server", ":dataforge-magix-client", -// ":demo", - ":motors" + ":motors", + ":demo", + ":magix", + ":magix:magix-api", + ":magix:magix-server" ) //includeBuild("../dataforge-core")