Add port configuration to magix server

This commit is contained in:
Alexander Nozik 2022-06-06 10:57:24 +03:00
parent cfaeb964e7
commit 535e44286c
No known key found for this signature in database
GPG Key ID: F7FCF2DD25C71357
9 changed files with 72 additions and 11 deletions

View File

@ -8,7 +8,7 @@ import kotlinx.serialization.json.JsonObject
import ru.mipt.npm.magix.api.MagixEndpoint
import ru.mipt.npm.magix.api.MagixMessage
import ru.mipt.npm.magix.api.MagixMessageFilter
import ru.mipt.npm.magix.rsocket.rSocketStreamWithTcp
import ru.mipt.npm.magix.rsocket.rSocketStreamWithWebSockets
import ru.mipt.npm.magix.server.startMagixServer
import kotlin.time.ExperimentalTime
import kotlin.time.measureTime
@ -59,7 +59,7 @@ private suspend fun MagixEndpoint.collectEcho(scope: CoroutineScope, n: Int) {
@OptIn(ExperimentalTime::class)
suspend fun main(): Unit = coroutineScope {
launch(Dispatchers.Default) {
val server = startMagixServer(enableRawRSocket = true, enableZmq = true) { flow ->
val server = startMagixServer(enableRawRSocket = false, enableZmq = false) { flow ->
//echo each message
flow.onEach { message ->
if (message.parentId == null) {
@ -72,7 +72,7 @@ suspend fun main(): Unit = coroutineScope {
val responseTime = measureTime {
MagixEndpoint.rSocketStreamWithTcp("localhost").use {
MagixEndpoint.rSocketStreamWithWebSockets("localhost").use {
it.collectEcho(this, 5000)
}
}

View File

@ -23,6 +23,11 @@ public interface MagixEndpoint {
message: MagixMessage,
)
/**
* Close the endpoint and the associated connection if it exists
*/
public fun close()
public companion object {
/**
* A default port for HTTP/WS connections

View File

@ -0,0 +1,13 @@
plugins {
id("ru.mipt.npm.gradle.jvm")
`maven-publish`
}
description = """
RabbitMQ client magix endpoint
""".trimIndent()
dependencies {
api(projects.magix.magixApi)
implementation("com.rabbitmq:amqp-client:5.14.2")
}

View File

@ -14,7 +14,6 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.serialization.encodeToString
import ru.mipt.npm.magix.api.MagixEndpoint
import ru.mipt.npm.magix.api.MagixMessage
import ru.mipt.npm.magix.api.MagixMessageFilter
@ -30,7 +29,7 @@ public class RSocketMagixEndpoint(
override fun subscribe(
filter: MagixMessageFilter,
): Flow<MagixMessage> {
val payload = buildPayload { data(MagixEndpoint.magixJson.encodeToString(filter)) }
val payload = buildPayload { data(MagixEndpoint.magixJson.encodeToString(MagixMessageFilter.serializer(), filter)) }
val flow = rSocket.requestStream(payload)
return flow.map {
MagixEndpoint.magixJson.decodeFromString(MagixMessage.serializer(), it.data.readText())

View File

@ -1,12 +1,18 @@
package ru.mipt.npm.magix.rsocket
import io.ktor.client.HttpClient
import io.ktor.client.plugins.websocket.WebSockets
import io.ktor.utils.io.core.Closeable
import io.rsocket.kotlin.RSocket
import io.rsocket.kotlin.core.RSocketConnectorBuilder
import io.rsocket.kotlin.ktor.client.RSocketSupport
import io.rsocket.kotlin.ktor.client.rSocket
import io.rsocket.kotlin.payload.Payload
import io.rsocket.kotlin.payload.buildPayload
import io.rsocket.kotlin.payload.data
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
@ -17,20 +23,32 @@ import ru.mipt.npm.magix.api.MagixMessage
import ru.mipt.npm.magix.api.MagixMessageFilter
import ru.mipt.npm.magix.api.filter
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
/**
* RSocket endpoint based on established channel
* RSocket endpoint based on established channel. This way it works a bit faster than [RSocketMagixEndpoint]
* for sending and receiving, but less flexible in terms of filters. One general [streamFilter] could be set
* in constructor and applied on the loop side. Filters in [subscribe] are applied on the endpoint side on top
* of received data.
*/
public class RSocketStreamMagixEndpoint(
private val rSocket: RSocket,
private val coroutineContext: CoroutineContext,
public val streamFilter: MagixMessageFilter = MagixMessageFilter(),
) : MagixEndpoint, Closeable {
private val output: MutableSharedFlow<MagixMessage> = MutableSharedFlow()
private val input: Flow<Payload> by lazy {
rSocket.requestChannel(
Payload.Empty,
buildPayload {
data(
MagixEndpoint.magixJson.encodeToString(
MagixMessageFilter.serializer(),
streamFilter
)
)
},
output.map { message ->
buildPayload {
data(MagixEndpoint.magixJson.encodeToString(MagixMessage.serializer(), message))
@ -54,4 +72,27 @@ public class RSocketStreamMagixEndpoint(
override fun close() {
rSocket.cancel()
}
}
public suspend fun MagixEndpoint.Companion.rSocketStreamWithWebSockets(
host: String,
port: Int = DEFAULT_MAGIX_HTTP_PORT,
path: String = "/rsocket",
rSocketConfig: RSocketConnectorBuilder.ConnectionConfigBuilder.() -> Unit = {},
): RSocketStreamMagixEndpoint {
val client = HttpClient {
install(WebSockets)
install(RSocketSupport) {
connector = buildConnector(rSocketConfig)
}
}
val rSocket = client.rSocket(host, port, path)
//Ensure client is closed after rSocket if finished
rSocket.coroutineContext[Job]?.invokeOnCompletion {
client.close()
}
return RSocketStreamMagixEndpoint(rSocket, coroutineContext)
}

View File

@ -42,7 +42,10 @@ public fun CoroutineScope.startMagixServer(
port: Int = DEFAULT_MAGIX_HTTP_PORT,
buffer: Int = 1000,
enableRawRSocket: Boolean = true,
rawRSocketPort: Int = DEFAULT_MAGIX_RAW_PORT,
enableZmq: Boolean = true,
zmqPubSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PUB_PORT,
zmqPullSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PULL_PORT,
applicationConfiguration: Application.(MutableSharedFlow<MagixMessage>) -> Unit = {},
): ApplicationEngine {
val logger = LoggerFactory.getLogger("magix-server")
@ -54,14 +57,11 @@ public fun CoroutineScope.startMagixServer(
if (enableRawRSocket) {
//Start tcpRSocket server
val rawRSocketPort = DEFAULT_MAGIX_RAW_PORT
logger.info("Starting magix raw rsocket server on port $rawRSocketPort")
launchMagixServerRawRSocket(magixFlow, rawRSocketPort)
}
if (enableZmq) {
//Start ZMQ server socket pair
val zmqPubSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PUB_PORT
val zmqPullSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PULL_PORT
logger.info("Starting magix zmq server on pub port $zmqPubSocketPort and pull port $zmqPullSocketPort")
launchMagixServerZmqSocket(
magixFlow,

View File

@ -1,4 +1,5 @@
rootProject.name = "controls-kt"
enableFeaturePreview("TYPESAFE_PROJECT_ACCESSORS")
enableFeaturePreview("VERSION_CATALOGS")
@ -53,11 +54,13 @@ include(
":magix:magix-rsocket",
":magix:magix-java-client",
":magix:magix-zmq",
":magix:magix-demo",
":magix:magix-rabbit",
// ":magix:magix-storage",
":magix:magix-storage:magix-storage-xodus",
":controls-magix-client",
":demo:all-things",
":demo:magix-demo",
":demo:car",
":demo:motors",
":demo:echo"