diff --git a/magix/magix-rsocket/src/commonMain/kotlin/ru/mipt/npm/magix/rsocket/RSocketMagixEndpoint.kt b/magix/magix-rsocket/src/commonMain/kotlin/ru/mipt/npm/magix/rsocket/RSocketMagixEndpoint.kt index c6d0277..5327e53 100644 --- a/magix/magix-rsocket/src/commonMain/kotlin/ru/mipt/npm/magix/rsocket/RSocketMagixEndpoint.kt +++ b/magix/magix-rsocket/src/commonMain/kotlin/ru/mipt/npm/magix/rsocket/RSocketMagixEndpoint.kt @@ -10,6 +10,7 @@ import io.rsocket.kotlin.payload.data import io.rsocket.kotlin.transport.ktor.client.RSocketSupport import io.rsocket.kotlin.transport.ktor.client.rSocket import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.flow.map import kotlinx.coroutines.withContext import kotlinx.serialization.KSerializer @@ -21,9 +22,9 @@ import kotlin.coroutines.CoroutineContext import kotlin.coroutines.coroutineContext public class RSocketMagixEndpoint( - private val coroutineContext: CoroutineContext, payloadSerializer: KSerializer, private val rSocket: RSocket, + private val coroutineContext: CoroutineContext, ) : MagixEndpoint { private val serializer = MagixMessage.serializer(payloadSerializer) @@ -33,7 +34,9 @@ public class RSocketMagixEndpoint( ): Flow> { val payload = buildPayload { data(MagixEndpoint.magixJson.encodeToString(filter)) } val flow = rSocket.requestStream(payload) - return flow.map { MagixEndpoint.magixJson.decodeFromString(serializer, it.data.readText()) } + return flow.map { + MagixEndpoint.magixJson.decodeFromString(serializer, it.data.readText()) + }.flowOn(coroutineContext) } override suspend fun broadcast(message: MagixMessage) { @@ -77,5 +80,5 @@ public suspend fun MagixEndpoint.Companion.rSocketWithWebSockets( client.close() } - return RSocketMagixEndpoint(coroutineContext, payloadSerializer, rSocket) + return RSocketMagixEndpoint(payloadSerializer, rSocket, coroutineContext) } \ No newline at end of file diff --git a/magix/magix-rsocket/src/jvmMain/kotlin/ru/mipt/npm/magix/rsocket/withTcp.kt b/magix/magix-rsocket/src/jvmMain/kotlin/ru/mipt/npm/magix/rsocket/withTcp.kt index 1efeb13..3642905 100644 --- a/magix/magix-rsocket/src/jvmMain/kotlin/ru/mipt/npm/magix/rsocket/withTcp.kt +++ b/magix/magix-rsocket/src/jvmMain/kotlin/ru/mipt/npm/magix/rsocket/withTcp.kt @@ -24,5 +24,5 @@ public suspend fun MagixEndpoint.Companion.rSocketWithTcp( val transport = aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().clientTransport(host, port, tcpConfig) val rSocket = buildConnector(rSocketConfig).connect(transport) - return RSocketMagixEndpoint(coroutineContext, payloadSerializer, rSocket) + return RSocketMagixEndpoint(payloadSerializer, rSocket, coroutineContext) } diff --git a/magix/magix-zmq/src/main/kotlin/ru/mipt/npm/magix/zmq/ZmqMagixEndpoint.kt b/magix/magix-zmq/src/main/kotlin/ru/mipt/npm/magix/zmq/ZmqMagixEndpoint.kt index 1711a25..76d74c6 100644 --- a/magix/magix-zmq/src/main/kotlin/ru/mipt/npm/magix/zmq/ZmqMagixEndpoint.kt +++ b/magix/magix-zmq/src/main/kotlin/ru/mipt/npm/magix/zmq/ZmqMagixEndpoint.kt @@ -1,11 +1,9 @@ package ru.mipt.npm.magix.zmq -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.* import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.channelFlow import kotlinx.coroutines.flow.flowOn -import kotlinx.coroutines.withContext import kotlinx.serialization.KSerializer import org.zeromq.SocketType import org.zeromq.ZContext @@ -35,12 +33,10 @@ public class ZmqMagixEndpoint( socket.subscribe("") return channelFlow { - var activeFlag = true invokeOnClose { - activeFlag = false socket.close() } - while (activeFlag) { + while (isActive) { try { //This is a blocking call. val string: String? = socket.recvStr() @@ -51,9 +47,9 @@ public class ZmqMagixEndpoint( } catch (t: Throwable) { socket.close() if (t is ZMQException && t.errorCode == ZMQ.Error.ETERM.code) { - activeFlag = false + cancel("ZMQ connection terminated", t) } else { - zmqContext.close() + throw t } } }