diff --git a/demo/echo/src/main/kotlin/ru/mipt/npm/controls/demo/echo/main.kt b/demo/echo/src/main/kotlin/ru/mipt/npm/controls/demo/echo/main.kt index 31d33d5..9467c47 100644 --- a/demo/echo/src/main/kotlin/ru/mipt/npm/controls/demo/echo/main.kt +++ b/demo/echo/src/main/kotlin/ru/mipt/npm/controls/demo/echo/main.kt @@ -8,7 +8,7 @@ import kotlinx.serialization.json.JsonObject import ru.mipt.npm.magix.api.MagixEndpoint import ru.mipt.npm.magix.api.MagixMessage import ru.mipt.npm.magix.api.MagixMessageFilter -import ru.mipt.npm.magix.rsocket.rSocketStreamWithTcp +import ru.mipt.npm.magix.rsocket.rSocketStreamWithWebSockets import ru.mipt.npm.magix.server.startMagixServer import kotlin.time.ExperimentalTime import kotlin.time.measureTime @@ -59,7 +59,7 @@ private suspend fun MagixEndpoint.collectEcho(scope: CoroutineScope, n: Int) { @OptIn(ExperimentalTime::class) suspend fun main(): Unit = coroutineScope { launch(Dispatchers.Default) { - val server = startMagixServer(enableRawRSocket = true, enableZmq = true) { flow -> + val server = startMagixServer(enableRawRSocket = false, enableZmq = false) { flow -> //echo each message flow.onEach { message -> if (message.parentId == null) { @@ -72,7 +72,7 @@ suspend fun main(): Unit = coroutineScope { val responseTime = measureTime { - MagixEndpoint.rSocketStreamWithTcp("localhost").use { + MagixEndpoint.rSocketStreamWithWebSockets("localhost").use { it.collectEcho(this, 5000) } } diff --git a/magix/magix-demo/build.gradle.kts b/demo/magix-demo/build.gradle.kts similarity index 100% rename from magix/magix-demo/build.gradle.kts rename to demo/magix-demo/build.gradle.kts diff --git a/magix/magix-demo/src/main/kotlin/zmq.kt b/demo/magix-demo/src/main/kotlin/zmq.kt similarity index 100% rename from magix/magix-demo/src/main/kotlin/zmq.kt rename to demo/magix-demo/src/main/kotlin/zmq.kt diff --git a/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixEndpoint.kt b/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixEndpoint.kt index c531583..52d591f 100644 --- a/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixEndpoint.kt +++ b/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixEndpoint.kt @@ -23,6 +23,11 @@ public interface MagixEndpoint { message: MagixMessage, ) + /** + * Close the endpoint and the associated connection if it exists + */ + public fun close() + public companion object { /** * A default port for HTTP/WS connections diff --git a/magix/magix-rabbit/build.gradle.kts b/magix/magix-rabbit/build.gradle.kts new file mode 100644 index 0000000..c9d4bef --- /dev/null +++ b/magix/magix-rabbit/build.gradle.kts @@ -0,0 +1,13 @@ +plugins { + id("ru.mipt.npm.gradle.jvm") + `maven-publish` +} + +description = """ + RabbitMQ client magix endpoint +""".trimIndent() + +dependencies { + api(projects.magix.magixApi) + implementation("com.rabbitmq:amqp-client:5.14.2") +} diff --git a/magix/magix-rsocket/src/commonMain/kotlin/ru/mipt/npm/magix/rsocket/RSocketMagixEndpoint.kt b/magix/magix-rsocket/src/commonMain/kotlin/ru/mipt/npm/magix/rsocket/RSocketMagixEndpoint.kt index 255b5bb..cb165cf 100644 --- a/magix/magix-rsocket/src/commonMain/kotlin/ru/mipt/npm/magix/rsocket/RSocketMagixEndpoint.kt +++ b/magix/magix-rsocket/src/commonMain/kotlin/ru/mipt/npm/magix/rsocket/RSocketMagixEndpoint.kt @@ -14,7 +14,6 @@ import kotlinx.coroutines.* import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.flow.map -import kotlinx.serialization.encodeToString import ru.mipt.npm.magix.api.MagixEndpoint import ru.mipt.npm.magix.api.MagixMessage import ru.mipt.npm.magix.api.MagixMessageFilter @@ -30,7 +29,7 @@ public class RSocketMagixEndpoint( override fun subscribe( filter: MagixMessageFilter, ): Flow { - val payload = buildPayload { data(MagixEndpoint.magixJson.encodeToString(filter)) } + val payload = buildPayload { data(MagixEndpoint.magixJson.encodeToString(MagixMessageFilter.serializer(), filter)) } val flow = rSocket.requestStream(payload) return flow.map { MagixEndpoint.magixJson.decodeFromString(MagixMessage.serializer(), it.data.readText()) diff --git a/magix/magix-rsocket/src/commonMain/kotlin/ru/mipt/npm/magix/rsocket/RSocketStreamMagixEndpoint.kt b/magix/magix-rsocket/src/commonMain/kotlin/ru/mipt/npm/magix/rsocket/RSocketStreamMagixEndpoint.kt index 4b0745e..25f99bb 100644 --- a/magix/magix-rsocket/src/commonMain/kotlin/ru/mipt/npm/magix/rsocket/RSocketStreamMagixEndpoint.kt +++ b/magix/magix-rsocket/src/commonMain/kotlin/ru/mipt/npm/magix/rsocket/RSocketStreamMagixEndpoint.kt @@ -1,12 +1,18 @@ package ru.mipt.npm.magix.rsocket +import io.ktor.client.HttpClient +import io.ktor.client.plugins.websocket.WebSockets import io.ktor.utils.io.core.Closeable import io.rsocket.kotlin.RSocket +import io.rsocket.kotlin.core.RSocketConnectorBuilder +import io.rsocket.kotlin.ktor.client.RSocketSupport +import io.rsocket.kotlin.ktor.client.rSocket import io.rsocket.kotlin.payload.Payload import io.rsocket.kotlin.payload.buildPayload import io.rsocket.kotlin.payload.data import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job import kotlinx.coroutines.cancel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow @@ -17,20 +23,32 @@ import ru.mipt.npm.magix.api.MagixMessage import ru.mipt.npm.magix.api.MagixMessageFilter import ru.mipt.npm.magix.api.filter import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.coroutineContext /** - * RSocket endpoint based on established channel + * RSocket endpoint based on established channel. This way it works a bit faster than [RSocketMagixEndpoint] + * for sending and receiving, but less flexible in terms of filters. One general [streamFilter] could be set + * in constructor and applied on the loop side. Filters in [subscribe] are applied on the endpoint side on top + * of received data. */ public class RSocketStreamMagixEndpoint( private val rSocket: RSocket, private val coroutineContext: CoroutineContext, + public val streamFilter: MagixMessageFilter = MagixMessageFilter(), ) : MagixEndpoint, Closeable { private val output: MutableSharedFlow = MutableSharedFlow() private val input: Flow by lazy { rSocket.requestChannel( - Payload.Empty, + buildPayload { + data( + MagixEndpoint.magixJson.encodeToString( + MagixMessageFilter.serializer(), + streamFilter + ) + ) + }, output.map { message -> buildPayload { data(MagixEndpoint.magixJson.encodeToString(MagixMessage.serializer(), message)) @@ -54,4 +72,27 @@ public class RSocketStreamMagixEndpoint( override fun close() { rSocket.cancel() } +} + +public suspend fun MagixEndpoint.Companion.rSocketStreamWithWebSockets( + host: String, + port: Int = DEFAULT_MAGIX_HTTP_PORT, + path: String = "/rsocket", + rSocketConfig: RSocketConnectorBuilder.ConnectionConfigBuilder.() -> Unit = {}, +): RSocketStreamMagixEndpoint { + val client = HttpClient { + install(WebSockets) + install(RSocketSupport) { + connector = buildConnector(rSocketConfig) + } + } + + val rSocket = client.rSocket(host, port, path) + + //Ensure client is closed after rSocket if finished + rSocket.coroutineContext[Job]?.invokeOnCompletion { + client.close() + } + + return RSocketStreamMagixEndpoint(rSocket, coroutineContext) } \ No newline at end of file diff --git a/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/server.kt b/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/server.kt index 539f439..e75b594 100644 --- a/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/server.kt +++ b/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/server.kt @@ -42,7 +42,10 @@ public fun CoroutineScope.startMagixServer( port: Int = DEFAULT_MAGIX_HTTP_PORT, buffer: Int = 1000, enableRawRSocket: Boolean = true, + rawRSocketPort: Int = DEFAULT_MAGIX_RAW_PORT, enableZmq: Boolean = true, + zmqPubSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PUB_PORT, + zmqPullSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PULL_PORT, applicationConfiguration: Application.(MutableSharedFlow) -> Unit = {}, ): ApplicationEngine { val logger = LoggerFactory.getLogger("magix-server") @@ -54,14 +57,11 @@ public fun CoroutineScope.startMagixServer( if (enableRawRSocket) { //Start tcpRSocket server - val rawRSocketPort = DEFAULT_MAGIX_RAW_PORT logger.info("Starting magix raw rsocket server on port $rawRSocketPort") launchMagixServerRawRSocket(magixFlow, rawRSocketPort) } if (enableZmq) { //Start ZMQ server socket pair - val zmqPubSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PUB_PORT - val zmqPullSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PULL_PORT logger.info("Starting magix zmq server on pub port $zmqPubSocketPort and pull port $zmqPullSocketPort") launchMagixServerZmqSocket( magixFlow, diff --git a/settings.gradle.kts b/settings.gradle.kts index e587c96..aff8bbd 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -1,4 +1,5 @@ rootProject.name = "controls-kt" + enableFeaturePreview("TYPESAFE_PROJECT_ACCESSORS") enableFeaturePreview("VERSION_CATALOGS") @@ -53,11 +54,13 @@ include( ":magix:magix-rsocket", ":magix:magix-java-client", ":magix:magix-zmq", - ":magix:magix-demo", + ":magix:magix-rabbit", + // ":magix:magix-storage", ":magix:magix-storage:magix-storage-xodus", ":controls-magix-client", ":demo:all-things", + ":demo:magix-demo", ":demo:car", ":demo:motors", ":demo:echo"