Add rsocket service

This commit is contained in:
Alexander Nozik 2020-11-03 18:54:52 +03:00
parent e5883dc318
commit f0acbbb8cc
12 changed files with 260 additions and 68 deletions

4
.gitignore vendored
View File

@ -1,7 +1,11 @@
# Created by .ignore support plugin (hsz.mobi)
.idea/
.gradle
*.iws
*.iml
*.ipr
out/
build/
!gradle-wrapper.jar

View File

@ -6,7 +6,7 @@ plugins {
val dataforgeVersion: String by extra("0.2.0-dev-4")
val ktorVersion: String by extra("1.4.1")
val rsocketVersion by extra("0.10.0")
val rsocketVersion by extra("0.11.0-SNAPSHOT")
allprojects {
repositories {
@ -17,6 +17,7 @@ allprojects {
maven("https://maven.pkg.github.com/altavir/kotlin-logging/")
maven("https://dl.bintray.com/rsocket-admin/RSocket")
maven("https://maven.pkg.github.com/altavir/ktor-client-sse")
maven("https://oss.jfrog.org/oss-snapshot-local")
}
group = "hep.dataforge"

View File

@ -4,7 +4,9 @@ plugins {
}
kscience {
useSerialization()
useSerialization{
json()
}
useCoroutines("1.4.0", configuration = ru.mipt.npm.gradle.DependencyConfiguration.API)
}

View File

@ -3,6 +3,7 @@ package hep.dataforge.magix.api
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.serialization.KSerializer
import kotlinx.serialization.json.Json
/**
* Inwards API of magix endpoint used to build plugins
@ -10,7 +11,7 @@ import kotlinx.serialization.KSerializer
public interface MagixEndpoint {
public val scope: CoroutineScope
public fun <T> subscribe(
public suspend fun <T> subscribe(
payloadSerializer: KSerializer<T>,
filter: MagixMessageFilter = MagixMessageFilter.ALL,
): Flow<MagixMessage<T>>
@ -19,4 +20,10 @@ public interface MagixEndpoint {
payloadSerializer: KSerializer<T>,
message: MagixMessage<T>
)
public companion object{
public const val DEFAULT_MAGIX_WS_PORT: Int = 7777
public const val DEFAULT_MAGIX_RAW_PORT: Int = 7778
public val magixJson: Json = Json
}
}

View File

@ -0,0 +1,21 @@
package hep.dataforge.magix.api
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.serialization.json.JsonElement
public interface MagixProcessor {
public fun process(endpoint: MagixEndpoint): Job
}
public class MagixConverter(
public val filter: MagixMessageFilter,
public val transformer: (JsonElement) -> JsonElement,
) : MagixProcessor {
override fun process(endpoint: MagixEndpoint): Job = endpoint.scope.launch {
endpoint.subscribe(JsonElement.serializer(), filter).onEach {
TODO()
}
}
}

View File

@ -5,7 +5,9 @@ plugins {
}
kscience {
useSerialization()
useSerialization{
json()
}
}
val dataforgeVersion: String by rootProject.extra
@ -21,6 +23,4 @@ dependencies{
implementation("io.rsocket.kotlin:rsocket-core:$rsocketVersion")
implementation("io.rsocket.kotlin:rsocket-transport-ktor-server:$rsocketVersion")
implementation("ru.mipt.npm:ktor-client-sse:0.1.0")
}

View File

@ -1,31 +1,34 @@
package hep.dataforge.magix.server
import hep.dataforge.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_RAW_PORT
import hep.dataforge.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_WS_PORT
import io.ktor.network.selector.ActorSelectorManager
import io.ktor.network.sockets.aSocket
import io.ktor.server.cio.CIO
import io.ktor.server.engine.ApplicationEngine
import io.ktor.server.engine.embeddedServer
import io.ktor.util.KtorExperimentalAPI
import io.rsocket.kotlin.core.RSocketServer
import io.rsocket.kotlin.transport.ktor.serverTransport
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow
public const val DEFAULT_MAGIX_SERVER_PORT: Int = 7777
public const val DEFAULT_MAGIX_RAW_PORT: Int = 7778
@OptIn(KtorExperimentalAPI::class)
public fun startMagixServer(port: Int = DEFAULT_MAGIX_SERVER_PORT, host: String = "0.0.0.0", buffer: Int = 100): ApplicationEngine {
public fun CoroutineScope.startMagixServer(
port: Int = DEFAULT_MAGIX_WS_PORT,
rawSocketPort: Int = DEFAULT_MAGIX_RAW_PORT,
buffer: Int = 100,
): ApplicationEngine {
val magixFlow = MutableSharedFlow<GenericMagixMessage>(
buffer,
onBufferOverflow = BufferOverflow.DROP_OLDEST
extraBufferCapacity = buffer
)
//TODO add raw sockets server from https://github.com/rsocket/rsocket-kotlin/blob/master/examples/multiplatform-chat/src/serverJvmMain/kotlin/App.kt#L102
val tcpTransport = aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().serverTransport(port = rawSocketPort)
RSocketServer().bind(tcpTransport, magixAcceptor(magixFlow))
// val tcpTransport = aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().serverTransport(port = 8000)
// rSocketServer.bind(tcpTransport, acceptor)
return embeddedServer(CIO, port = port, host = host){
return embeddedServer(CIO, port = port) {
magixModule(magixFlow)
}
}

View File

@ -1,5 +1,6 @@
package hep.dataforge.magix.server
import hep.dataforge.magix.api.MagixEndpoint.Companion.magixJson
import hep.dataforge.magix.api.MagixMessage
import hep.dataforge.magix.api.MagixMessageFilter
import hep.dataforge.magix.api.filter
@ -7,11 +8,7 @@ import io.ktor.application.*
import io.ktor.features.CORS
import io.ktor.features.ContentNegotiation
import io.ktor.html.respondHtml
import io.ktor.http.CacheControl
import io.ktor.http.ContentType
import io.ktor.request.receive
import io.ktor.response.cacheControl
import io.ktor.response.respondBytesWriter
import io.ktor.routing.get
import io.ktor.routing.post
import io.ktor.routing.route
@ -20,35 +17,51 @@ import io.ktor.serialization.json
import io.ktor.util.KtorExperimentalAPI
import io.ktor.util.getValue
import io.ktor.websocket.WebSockets
import io.rsocket.kotlin.ConnectionAcceptor
import io.rsocket.kotlin.RSocketRequestHandler
import io.rsocket.kotlin.core.RSocketServerSupport
import io.rsocket.kotlin.core.rSocket
import io.rsocket.kotlin.payload.Payload
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.map
import io.rsocket.kotlin.transport.ktor.server.RSocketSupport
import io.rsocket.kotlin.transport.ktor.server.rSocket
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.*
import kotlinx.html.*
import kotlinx.serialization.KSerializer
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonElement
import ru.mipt.npm.ktor.sse.SseEvent
import ru.mipt.npm.ktor.sse.writeSseFlow
public typealias GenericMagixMessage = MagixMessage<JsonElement>
private val genericMessageSerializer: KSerializer<MagixMessage<JsonElement>> =
MagixMessage.serializer(JsonElement.serializer())
@OptIn(KtorExperimentalAPI::class)
public suspend fun ApplicationCall.respondSse(events: Flow<SseEvent>) {
response.cacheControl(CacheControl.NoCache(null))
respondBytesWriter(contentType = ContentType.Text.EventStream) {
writeSseFlow(events)
internal fun CoroutineScope.magixAcceptor(magixFlow: MutableSharedFlow<GenericMagixMessage>) = ConnectionAcceptor {
RSocketRequestHandler {
//handler for request/stream
requestStream { request: Payload ->
val filter = magixJson.decodeFromString(MagixMessageFilter.serializer(), request.data.readText())
magixFlow.filter(filter).map { message ->
val string = magixJson.encodeToString(genericMessageSerializer, message)
Payload(string)
}
}
fireAndForget { request: Payload ->
val message = magixJson.decodeFromString(genericMessageSerializer, request.data.readText())
magixFlow.emit(message)
}
// bi-directional connection
requestChannel { input: Flow<Payload> ->
input.onEach {
magixFlow.emit(magixJson.decodeFromString(genericMessageSerializer,it.data.readText()))
}.launchIn(this@magixAcceptor)
magixFlow.map { message ->
val string = magixJson.encodeToString(genericMessageSerializer, message)
Payload(string)
}
}
}
}
/**
* Create a message filter from call parameters
*/
@ -85,8 +98,8 @@ public fun Application.magixModule(magixFlow: MutableSharedFlow<GenericMagixMess
}
}
if (featureOrNull(RSocketServerSupport) == null) {
install(RSocketServerSupport)
if (featureOrNull(RSocketSupport) == null) {
install(RSocketSupport)
}
routing {
@ -106,7 +119,7 @@ public fun Application.magixModule(magixFlow: MutableSharedFlow<GenericMagixMess
magixFlow.replayCache.forEach { message ->
li {
code {
+Json.encodeToString(genericMessageSerializer, message)
+magixJson.encodeToString(genericMessageSerializer, message)
}
}
}
@ -119,36 +132,18 @@ public fun Application.magixModule(magixFlow: MutableSharedFlow<GenericMagixMess
val filter = call.buildFilter()
var idCounter = 0
val sseFlow = magixFlow.filter(filter).map {
val data = Json.encodeToString(genericMessageSerializer, it)
val data = magixJson.encodeToString(genericMessageSerializer, it)
SseEvent(data, id = idCounter++.toString())
}
call.respondSse(sseFlow)
}
//rSocket server. Filter from Payload
rSocket("rsocket") {
RSocketRequestHandler {
//handler for request/stream
requestStream = { request: Payload ->
val filter = Json.decodeFromString(MagixMessageFilter.serializer(), request.data.readText())
magixFlow.filter(filter).map { message ->
val string = Json.encodeToString(genericMessageSerializer, message)
Payload(string)
}
}
fireAndForget = { request: Payload ->
val message = Json.decodeFromString(genericMessageSerializer, payload.data.readText())
magixFlow.emit(message)
}
}
}
rSocket("rsocket", acceptor = magixAcceptor(magixFlow))
}
}
}
public fun Application.magixModule(route: String = "/", buffer: Int = 100) {
val magixFlow = MutableSharedFlow<GenericMagixMessage>(
buffer,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
val magixFlow = MutableSharedFlow<GenericMagixMessage>(buffer)
magixModule(magixFlow, route)
}

View File

@ -0,0 +1,39 @@
package hep.dataforge.magix.server
import io.ktor.application.ApplicationCall
import io.ktor.http.CacheControl
import io.ktor.http.ContentType
import io.ktor.response.cacheControl
import io.ktor.response.respondBytesWriter
import io.ktor.util.KtorExperimentalAPI
import io.ktor.utils.io.ByteWriteChannel
import io.ktor.utils.io.writeStringUtf8
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
/**
* The data class representing a SSE Event that will be sent to the client.
*/
public data class SseEvent(val data: String, val event: String? = "message", val id: String? = null)
public suspend fun ByteWriteChannel.writeSseFlow(events: Flow<SseEvent>): Unit = events.collect { event ->
if (event.id != null) {
writeStringUtf8("id: ${event.id}\n")
}
if (event.event != null) {
writeStringUtf8("event: ${event.event}\n")
}
for (dataLine in event.data.lines()) {
writeStringUtf8("data: $dataLine\n")
}
writeStringUtf8("\n")
flush()
}
@OptIn(KtorExperimentalAPI::class)
public suspend fun ApplicationCall.respondSse(events: Flow<SseEvent>) {
response.cacheControl(CacheControl.NoCache(null))
respondBytesWriter(contentType = ContentType.Text.EventStream) {
writeSseFlow(events)
}
}

View File

@ -0,0 +1,31 @@
plugins {
id("ru.mipt.npm.mpp")
id("ru.mipt.npm.publish")
}
kscience {
useSerialization{
json()
}
useCoroutines("1.4.0", configuration = ru.mipt.npm.gradle.DependencyConfiguration.API)
}
val dataforgeVersion: String by rootProject.extra
val ktorVersion: String by rootProject.extra
val rsocketVersion: String by rootProject.extra
repositories{
maven("https://maven.pkg.github.com/altavir/ktor-client-sse")
}
kotlin {
sourceSets {
commonMain {
dependencies {
api(project(":magix:magix-api"))
implementation("io.ktor:ktor-client-core:$ktorVersion")
implementation("io.rsocket.kotlin:rsocket-transport-ktor-client:$rsocketVersion")
}
}
}
}

View File

@ -0,0 +1,88 @@
package hep.dataforge.magix.service
import hep.dataforge.magix.api.MagixEndpoint
import hep.dataforge.magix.api.MagixEndpoint.Companion.magixJson
import hep.dataforge.magix.api.MagixMessage
import hep.dataforge.magix.api.MagixMessageFilter
import io.ktor.client.HttpClient
import io.ktor.client.features.websocket.WebSockets
import io.ktor.util.KtorExperimentalAPI
import io.rsocket.kotlin.RSocketRequestHandler
import io.rsocket.kotlin.core.RSocketConnector
import io.rsocket.kotlin.keepalive.KeepAlive
import io.rsocket.kotlin.payload.Payload
import io.rsocket.kotlin.payload.PayloadMimeType
import io.rsocket.kotlin.transport.ktor.client.RSocketSupport
import io.rsocket.kotlin.transport.ktor.client.rSocket
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.launch
import kotlinx.serialization.KSerializer
import kotlinx.serialization.encodeToString
import kotlin.time.minutes
import kotlin.time.seconds
public class RScocketMagixEndpoint(
override val scope: CoroutineScope,
public val host: String,
public val port: Int,
public val path: String = "/rsocket",
) : MagixEndpoint {
//create ktor client
@OptIn(KtorExperimentalAPI::class)
private val client = HttpClient {
install(WebSockets)
install(RSocketSupport) {
connector = RSocketConnector {
reconnectable(10)
//configure rSocket connector (all values have defaults)
connectionConfig {
keepAlive = KeepAlive(
interval = 30.seconds,
maxLifetime = 2.minutes
)
// //payload for setup frame
// setupPayload { Payload("hello world") }
//mime types
payloadMimeType = PayloadMimeType(
data = "application/json",
metadata = "application/json"
)
}
//optional acceptor for server requests
acceptor {
RSocketRequestHandler {
requestResponse { it } //echo request payload
}
}
}
}
}
private val rSocket = scope.async {
client.rSocket(host, port, path)
}
override suspend fun <T> subscribe(
payloadSerializer: KSerializer<T>,
filter: MagixMessageFilter,
): Flow<MagixMessage<T>> {
val serializer = MagixMessage.serializer(payloadSerializer)
val payload = Payload(magixJson.encodeToString(filter))
val flow = rSocket.await().requestStream(payload)
return flow.map { magixJson.decodeFromString(serializer, it.data.readText()) }
}
override suspend fun <T> send(payloadSerializer: KSerializer<T>, message: MagixMessage<T>) {
scope.launch {
val serializer = MagixMessage.serializer(payloadSerializer)
val payload = Payload(magixJson.encodeToString(serializer, message))
rSocket.await().fireAndForget(payload)
}
}
}

View File

@ -27,16 +27,17 @@ pluginManagement {
rootProject.name = "dataforge-control"
include(
":dataforge-device-core",
":dataforge-device-tcp",
":dataforge-device-serial",
":dataforge-device-server",
":dataforge-magix-client",
":motors",
":demo",
// ":dataforge-device-core",
// ":dataforge-device-tcp",
// ":dataforge-device-serial",
// ":dataforge-device-server",
// ":dataforge-magix-client",
// ":motors",
// ":demo",
":magix",
":magix:magix-api",
":magix:magix-server"
":magix:magix-server",
":magix:magix-service"
)
//includeBuild("../dataforge-core")