Add rsocket server

This commit is contained in:
Alexander Nozik 2020-11-02 17:53:53 +03:00
parent 8c8d53b187
commit e5883dc318
10 changed files with 327 additions and 6 deletions

View File

@ -4,8 +4,9 @@ plugins {
kotlin("js") apply false kotlin("js") apply false
} }
val dataforgeVersion: String by extra("0.2.0-dev-3") val dataforgeVersion: String by extra("0.2.0-dev-4")
val ktorVersion: String by extra("1.4.1") val ktorVersion: String by extra("1.4.1")
val rsocketVersion by extra("0.10.0")
allprojects { allprojects {
repositories { repositories {
@ -14,10 +15,12 @@ allprojects {
maven("http://maven.jzy3d.org/releases") maven("http://maven.jzy3d.org/releases")
maven("https://kotlin.bintray.com/js-externals") maven("https://kotlin.bintray.com/js-externals")
maven("https://maven.pkg.github.com/altavir/kotlin-logging/") maven("https://maven.pkg.github.com/altavir/kotlin-logging/")
maven("https://dl.bintray.com/rsocket-admin/RSocket")
maven("https://maven.pkg.github.com/altavir/ktor-client-sse")
} }
group = "hep.dataforge" group = "hep.dataforge"
version = "0.0.1" version = "0.1.0"
} }
ksciencePublish { ksciencePublish {

3
magix/build.gradle.kts Normal file
View File

@ -0,0 +1,3 @@
subprojects{
}

View File

@ -0,0 +1,13 @@
plugins {
id("ru.mipt.npm.mpp")
id("ru.mipt.npm.publish")
}
kscience {
useSerialization()
useCoroutines("1.4.0", configuration = ru.mipt.npm.gradle.DependencyConfiguration.API)
}
val dataforgeVersion: String by rootProject.extra
val ktorVersion: String by rootProject.extra

View File

@ -0,0 +1,22 @@
package hep.dataforge.magix.api
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.serialization.KSerializer
/**
* Inwards API of magix endpoint used to build plugins
*/
public interface MagixEndpoint {
public val scope: CoroutineScope
public fun <T> subscribe(
payloadSerializer: KSerializer<T>,
filter: MagixMessageFilter = MagixMessageFilter.ALL,
): Flow<MagixMessage<T>>
public suspend fun <T> send(
payloadSerializer: KSerializer<T>,
message: MagixMessage<T>
)
}

View File

@ -0,0 +1,31 @@
package hep.dataforge.magix.api
import kotlinx.serialization.Serializable
/**
*
* Magix message according to [magix specification](https://github.com/piazza-controls/rfc/tree/master/1)
* with a [correction](https://github.com/piazza-controls/rfc/issues/12)
*
* {
* "format": "string[required]",
* "id":"string|number[optional, but desired]",
* "parentId": "string|number[optional]",
* "target":"string[optional]",
* "origin":"string[required]",
* "user":"string[optional]",
* "action":"string[optional, default='heartbeat']",
* "payload":"object[optional]"
* }
*/
@Serializable
public data class MagixMessage<T>(
val format: String,
val origin: String,
val payload: T,
val target: String? = null,
val id: String? = null,
val parentId: String? = null,
val user: String? = null,
val action: String? = null
)

View File

@ -0,0 +1,35 @@
package hep.dataforge.magix.api
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import kotlinx.serialization.Serializable
@Serializable
public data class MagixMessageFilter(
val format: List<String>? = null,
val origin: List<String>? = null,
val target: List<String?>? = null,
val user: List<String?>? = null,
val action: List<String?>? = null,
) {
public companion object {
public val ALL: MagixMessageFilter = MagixMessageFilter()
}
}
/**
* Filter a [Flow] of messages based on given filter
*/
public fun <T> Flow<MagixMessage<T>>.filter(filter: MagixMessageFilter): Flow<MagixMessage<T>> {
if (filter == MagixMessageFilter.ALL) {
return this
}
return filter { message ->
filter.format?.contains(message.format) ?: true
&& filter.origin?.contains(message.origin) ?: true
&& filter.origin?.contains(message.origin) ?: true
&& filter.target?.contains(message.target) ?: true
&& filter.user?.contains(message.user) ?: true
&& filter.action?.contains(message.action) ?: true
}
}

View File

@ -0,0 +1,26 @@
plugins {
id("ru.mipt.npm.jvm")
id("ru.mipt.npm.publish")
application
}
kscience {
useSerialization()
}
val dataforgeVersion: String by rootProject.extra
val ktorVersion: String by rootProject.extra
val rsocketVersion: String by rootProject.extra
dependencies{
api(project(":magix:magix-api"))
implementation("io.ktor:ktor-server-cio:$ktorVersion")
implementation("io.ktor:ktor-websockets:$ktorVersion")
implementation("io.ktor:ktor-serialization:$ktorVersion")
implementation("io.ktor:ktor-html-builder:$ktorVersion")
implementation("io.rsocket.kotlin:rsocket-core:$rsocketVersion")
implementation("io.rsocket.kotlin:rsocket-transport-ktor-server:$rsocketVersion")
implementation("ru.mipt.npm:ktor-client-sse:0.1.0")
}

View File

@ -0,0 +1,31 @@
package hep.dataforge.magix.server
import io.ktor.network.selector.ActorSelectorManager
import io.ktor.network.sockets.aSocket
import io.ktor.server.cio.CIO
import io.ktor.server.engine.ApplicationEngine
import io.ktor.server.engine.embeddedServer
import io.ktor.util.KtorExperimentalAPI
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow
public const val DEFAULT_MAGIX_SERVER_PORT: Int = 7777
public const val DEFAULT_MAGIX_RAW_PORT: Int = 7778
@OptIn(KtorExperimentalAPI::class)
public fun startMagixServer(port: Int = DEFAULT_MAGIX_SERVER_PORT, host: String = "0.0.0.0", buffer: Int = 100): ApplicationEngine {
val magixFlow = MutableSharedFlow<GenericMagixMessage>(
buffer,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
//TODO add raw sockets server from https://github.com/rsocket/rsocket-kotlin/blob/master/examples/multiplatform-chat/src/serverJvmMain/kotlin/App.kt#L102
// val tcpTransport = aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().serverTransport(port = 8000)
// rSocketServer.bind(tcpTransport, acceptor)
return embeddedServer(CIO, port = port, host = host){
magixModule(magixFlow)
}
}

View File

@ -0,0 +1,154 @@
package hep.dataforge.magix.server
import hep.dataforge.magix.api.MagixMessage
import hep.dataforge.magix.api.MagixMessageFilter
import hep.dataforge.magix.api.filter
import io.ktor.application.*
import io.ktor.features.CORS
import io.ktor.features.ContentNegotiation
import io.ktor.html.respondHtml
import io.ktor.http.CacheControl
import io.ktor.http.ContentType
import io.ktor.request.receive
import io.ktor.response.cacheControl
import io.ktor.response.respondBytesWriter
import io.ktor.routing.get
import io.ktor.routing.post
import io.ktor.routing.route
import io.ktor.routing.routing
import io.ktor.serialization.json
import io.ktor.util.KtorExperimentalAPI
import io.ktor.util.getValue
import io.ktor.websocket.WebSockets
import io.rsocket.kotlin.RSocketRequestHandler
import io.rsocket.kotlin.core.RSocketServerSupport
import io.rsocket.kotlin.core.rSocket
import io.rsocket.kotlin.payload.Payload
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.map
import kotlinx.html.*
import kotlinx.serialization.KSerializer
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonElement
import ru.mipt.npm.ktor.sse.SseEvent
import ru.mipt.npm.ktor.sse.writeSseFlow
public typealias GenericMagixMessage = MagixMessage<JsonElement>
private val genericMessageSerializer: KSerializer<MagixMessage<JsonElement>> =
MagixMessage.serializer(JsonElement.serializer())
@OptIn(KtorExperimentalAPI::class)
public suspend fun ApplicationCall.respondSse(events: Flow<SseEvent>) {
response.cacheControl(CacheControl.NoCache(null))
respondBytesWriter(contentType = ContentType.Text.EventStream) {
writeSseFlow(events)
}
}
/**
* Create a message filter from call parameters
*/
@OptIn(KtorExperimentalAPI::class)
private fun ApplicationCall.buildFilter(): MagixMessageFilter {
val query = request.queryParameters
if (query.isEmpty()) {
return MagixMessageFilter.ALL
}
val format: List<String>? by query
val origin: List<String>? by query
return MagixMessageFilter(
format,
origin
)
}
public fun Application.magixModule(magixFlow: MutableSharedFlow<GenericMagixMessage>, route: String = "/") {
if (featureOrNull(WebSockets) == null) {
install(WebSockets)
}
if (featureOrNull(CORS) == null) {
install(CORS) {
//TODO consider more safe policy
anyHost()
}
}
if (featureOrNull(ContentNegotiation) == null) {
install(ContentNegotiation) {
json()
}
}
if (featureOrNull(RSocketServerSupport) == null) {
install(RSocketServerSupport)
}
routing {
route(route) {
post {
val message = call.receive<GenericMagixMessage>()
magixFlow.emit(message)
}
get {
call.respondHtml {
body {
h1 { +"Magix stream statistics" }
h2 { +"Number of subscribers: ${magixFlow.subscriptionCount.value}" }
h3 { +"Replay cache size: ${magixFlow.replayCache.size}" }
h3 { +"Replay cache:" }
ol {
magixFlow.replayCache.forEach { message ->
li {
code {
+Json.encodeToString(genericMessageSerializer, message)
}
}
}
}
}
}
}
//SSE server. Filter from query
get("sse") {
val filter = call.buildFilter()
var idCounter = 0
val sseFlow = magixFlow.filter(filter).map {
val data = Json.encodeToString(genericMessageSerializer, it)
SseEvent(data, id = idCounter++.toString())
}
call.respondSse(sseFlow)
}
//rSocket server. Filter from Payload
rSocket("rsocket") {
RSocketRequestHandler {
//handler for request/stream
requestStream = { request: Payload ->
val filter = Json.decodeFromString(MagixMessageFilter.serializer(), request.data.readText())
magixFlow.filter(filter).map { message ->
val string = Json.encodeToString(genericMessageSerializer, message)
Payload(string)
}
}
fireAndForget = { request: Payload ->
val message = Json.decodeFromString(genericMessageSerializer, payload.data.readText())
magixFlow.emit(message)
}
}
}
}
}
}
public fun Application.magixModule(route: String = "/", buffer: Int = 100) {
val magixFlow = MutableSharedFlow<GenericMagixMessage>(
buffer,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
magixModule(magixFlow, route)
}

View File

@ -1,6 +1,6 @@
pluginManagement { pluginManagement {
val kotlinVersion = "1.4.20-M1" val kotlinVersion = "1.4.20-M2"
val toolsVersion = "0.6.3-dev-1.4.20-M1" val toolsVersion = "0.6.4-dev-1.4.20-M2"
repositories { repositories {
mavenLocal() mavenLocal()
@ -32,8 +32,11 @@ include(
":dataforge-device-serial", ":dataforge-device-serial",
":dataforge-device-server", ":dataforge-device-server",
":dataforge-magix-client", ":dataforge-magix-client",
// ":demo", ":motors",
":motors" ":demo",
":magix",
":magix:magix-api",
":magix:magix-server"
) )
//includeBuild("../dataforge-core") //includeBuild("../dataforge-core")