diff --git a/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixEndpoint.kt b/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixEndpoint.kt index 132a871..16af886 100644 --- a/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixEndpoint.kt +++ b/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixEndpoint.kt @@ -24,7 +24,7 @@ public interface MagixEndpoint { /** * Send an event using specific [payloadSerializer] */ - public suspend fun send( + public suspend fun broadcast( payloadSerializer: KSerializer, message: MagixMessage, ) @@ -40,5 +40,5 @@ public suspend fun MagixEndpoint.subscribe( filter: MagixMessageFilter = MagixMessageFilter.ALL, ): Flow> = subscribe(JsonElement.serializer()) -public suspend fun MagixEndpoint.send(message: MagixMessage): Unit = - send(JsonElement.serializer(), message) \ No newline at end of file +public suspend fun MagixEndpoint.broadcast(message: MagixMessage): Unit = + broadcast(JsonElement.serializer(), message) \ No newline at end of file diff --git a/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixProcessor.kt b/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixProcessor.kt index a7afe75..8cd025f 100644 --- a/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixProcessor.kt +++ b/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixProcessor.kt @@ -30,7 +30,7 @@ public class MagixConverter( format = outputFormat, origin = newOrigin ?: message.origin ) - endpoint.send(transformed) + endpoint.broadcast(transformed) }.launchIn(this) } } \ No newline at end of file diff --git a/magix/magix-server/src/main/kotlin/hep/dataforge/magix/server/magixModule.kt b/magix/magix-server/src/main/kotlin/hep/dataforge/magix/server/magixModule.kt index e1ebcce..b747771 100644 --- a/magix/magix-server/src/main/kotlin/hep/dataforge/magix/server/magixModule.kt +++ b/magix/magix-server/src/main/kotlin/hep/dataforge/magix/server/magixModule.kt @@ -27,8 +27,6 @@ import kotlinx.coroutines.flow.* import kotlinx.html.* import kotlinx.serialization.KSerializer import kotlinx.serialization.json.JsonElement -import ru.mipt.npm.ktor.sse.SseEvent -import ru.mipt.npm.ktor.sse.writeSseFlow import java.util.* public typealias GenericMagixMessage = MagixMessage diff --git a/magix/magix-service/build.gradle.kts b/magix/magix-service/build.gradle.kts index 8aeb3fd..abf4c03 100644 --- a/magix/magix-service/build.gradle.kts +++ b/magix/magix-service/build.gradle.kts @@ -7,7 +7,7 @@ kscience { useSerialization{ json() } - useCoroutines("1.4.0", configuration = ru.mipt.npm.gradle.DependencyConfiguration.API) + useCoroutines("1.4.1", configuration = ru.mipt.npm.gradle.DependencyConfiguration.API) } val dataforgeVersion: String by rootProject.extra diff --git a/magix/magix-service/src/commonMain/kotlin/hep/dataforge/magix/service/RSocketMagixEndpoint.kt b/magix/magix-service/src/commonMain/kotlin/hep/dataforge/magix/service/RSocketMagixEndpoint.kt new file mode 100644 index 0000000..f7cafdb --- /dev/null +++ b/magix/magix-service/src/commonMain/kotlin/hep/dataforge/magix/service/RSocketMagixEndpoint.kt @@ -0,0 +1,79 @@ +package hep.dataforge.magix.service + +import hep.dataforge.magix.api.MagixEndpoint +import hep.dataforge.magix.api.MagixMessage +import hep.dataforge.magix.api.MagixMessageFilter +import io.ktor.client.HttpClient +import io.ktor.client.features.websocket.WebSockets +import io.ktor.util.KtorExperimentalAPI +import io.rsocket.kotlin.RSocket +import io.rsocket.kotlin.core.RSocketConnector +import io.rsocket.kotlin.core.RSocketConnectorBuilder +import io.rsocket.kotlin.payload.Payload +import io.rsocket.kotlin.transport.ktor.client.RSocketSupport +import io.rsocket.kotlin.transport.ktor.client.rSocket +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.launch +import kotlinx.serialization.KSerializer +import kotlinx.serialization.encodeToString + +public class RSocketMagixEndpoint( + override val scope: CoroutineScope, + public val rSocket: RSocket, +) : MagixEndpoint { + + override suspend fun subscribe( + payloadSerializer: KSerializer, + filter: MagixMessageFilter, + ): Flow> { + val serializer = MagixMessage.serializer(payloadSerializer) + val payload = Payload(MagixEndpoint.magixJson.encodeToString(filter)) + val flow = rSocket.requestStream(payload) + return flow.map { MagixEndpoint.magixJson.decodeFromString(serializer, it.data.readText()) } + } + + override suspend fun broadcast(payloadSerializer: KSerializer, message: MagixMessage) { + scope.launch { + val serializer = MagixMessage.serializer(payloadSerializer) + val payload = Payload(MagixEndpoint.magixJson.encodeToString(serializer, message)) + rSocket.fireAndForget(payload) + } + } + + public companion object { + + internal fun buildConnector(rSocketConfig: RSocketConnectorBuilder.ConnectionConfigBuilder.() -> Unit) = + RSocketConnector { + reconnectable(10) + connectionConfig(rSocketConfig) + } + + @OptIn(KtorExperimentalAPI::class) + public suspend fun withWebSockets( + scope: CoroutineScope, + host: String, + port: Int, + path: String = "/rsocket", + rSocketConfig: RSocketConnectorBuilder.ConnectionConfigBuilder.() -> Unit = {}, + ): RSocketMagixEndpoint { + val client = HttpClient { + install(WebSockets) + install(RSocketSupport) { + connector = buildConnector(rSocketConfig) + } + } + + val rSocket = client.rSocket(host, port, path) + + //Ensure client is closed after rSocket if finished + rSocket.job.invokeOnCompletion { + client.close() + } + + return RSocketMagixEndpoint(scope, rSocket) + } + } +} + diff --git a/magix/magix-service/src/commonMain/kotlin/hep/dataforge/magix/service/WebRScocketMagixEndpoint.kt b/magix/magix-service/src/commonMain/kotlin/hep/dataforge/magix/service/WebRScocketMagixEndpoint.kt deleted file mode 100644 index 265174d..0000000 --- a/magix/magix-service/src/commonMain/kotlin/hep/dataforge/magix/service/WebRScocketMagixEndpoint.kt +++ /dev/null @@ -1,91 +0,0 @@ -package hep.dataforge.magix.service - -import hep.dataforge.magix.api.MagixEndpoint -import hep.dataforge.magix.api.MagixEndpoint.Companion.magixJson -import hep.dataforge.magix.api.MagixMessage -import hep.dataforge.magix.api.MagixMessageFilter -import io.ktor.client.HttpClient -import io.ktor.client.features.websocket.WebSockets -import io.ktor.util.KtorExperimentalAPI -import io.rsocket.kotlin.RSocketRequestHandler -import io.rsocket.kotlin.core.RSocketConnector -import io.rsocket.kotlin.keepalive.KeepAlive -import io.rsocket.kotlin.payload.Payload -import io.rsocket.kotlin.payload.PayloadMimeType -import io.rsocket.kotlin.transport.ktor.client.RSocketSupport -import io.rsocket.kotlin.transport.ktor.client.rSocket -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.async -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.map -import kotlinx.coroutines.launch -import kotlinx.serialization.KSerializer -import kotlinx.serialization.encodeToString -import kotlin.time.minutes -import kotlin.time.seconds - -/** - * An RSocket endpoint which relies on WebSocket transport - */ -public class WebRScocketMagixEndpoint( - override val scope: CoroutineScope, - public val host: String, - public val port: Int, - public val path: String = "/rsocket", -) : MagixEndpoint { - //create ktor client - @OptIn(KtorExperimentalAPI::class) - private val client = HttpClient { - install(WebSockets) - install(RSocketSupport) { - connector = RSocketConnector { - reconnectable(10) - //configure rSocket connector (all values have defaults) - connectionConfig { - keepAlive = KeepAlive( - interval = 30.seconds, - maxLifetime = 2.minutes - ) - -// //payload for setup frame -// setupPayload { Payload("hello world") } - - //mime types - payloadMimeType = PayloadMimeType( - data = "application/json", - metadata = "application/json" - ) - } - - //optional acceptor for server requests - acceptor { - RSocketRequestHandler { - requestResponse { it } //echo request payload - } - } - } - } - } - - private val rSocket = scope.async { - client.rSocket(host, port, path) - } - - override suspend fun subscribe( - payloadSerializer: KSerializer, - filter: MagixMessageFilter, - ): Flow> { - val serializer = MagixMessage.serializer(payloadSerializer) - val payload = Payload(magixJson.encodeToString(filter)) - val flow = rSocket.await().requestStream(payload) - return flow.map { magixJson.decodeFromString(serializer, it.data.readText()) } - } - - override suspend fun send(payloadSerializer: KSerializer, message: MagixMessage) { - scope.launch { - val serializer = MagixMessage.serializer(payloadSerializer) - val payload = Payload(magixJson.encodeToString(serializer, message)) - rSocket.await().fireAndForget(payload) - } - } -} \ No newline at end of file diff --git a/magix/magix-service/src/jvmMain/kotlin/hep/dataforge/magix/service/withTcp.kt b/magix/magix-service/src/jvmMain/kotlin/hep/dataforge/magix/service/withTcp.kt new file mode 100644 index 0000000..131d8ce --- /dev/null +++ b/magix/magix-service/src/jvmMain/kotlin/hep/dataforge/magix/service/withTcp.kt @@ -0,0 +1,28 @@ +package hep.dataforge.magix.service + +import io.ktor.network.selector.ActorSelectorManager +import io.ktor.network.sockets.SocketOptions +import io.ktor.network.sockets.aSocket +import io.ktor.util.KtorExperimentalAPI +import io.rsocket.kotlin.core.RSocketConnectorBuilder +import io.rsocket.kotlin.transport.ktor.clientTransport +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers + + +/** + * Create a plain TCP based [RSocketMagixEndpoint] + */ +@OptIn(KtorExperimentalAPI::class) +public suspend fun RSocketMagixEndpoint.Companion.withTcp( + scope: CoroutineScope, + host: String, + port: Int, + tcpConfig: SocketOptions.TCPClientSocketOptions.() -> Unit = {}, + rSocketConfig: RSocketConnectorBuilder.ConnectionConfigBuilder.() -> Unit = {}, +): RSocketMagixEndpoint { + val transport = aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().clientTransport(host, port, tcpConfig) + val rSocket = buildConnector(rSocketConfig).connect(transport) + + return RSocketMagixEndpoint(scope, rSocket) +}