Remove unnecessary active flag in ZMQ client

This commit is contained in:
Alexander Nozik 2021-07-19 09:41:41 +03:00
parent 64d3f04469
commit b068403429
3 changed files with 11 additions and 12 deletions

View File

@ -10,6 +10,7 @@ import io.rsocket.kotlin.payload.data
import io.rsocket.kotlin.transport.ktor.client.RSocketSupport import io.rsocket.kotlin.transport.ktor.client.RSocketSupport
import io.rsocket.kotlin.transport.ktor.client.rSocket import io.rsocket.kotlin.transport.ktor.client.rSocket
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.map
import kotlinx.coroutines.withContext import kotlinx.coroutines.withContext
import kotlinx.serialization.KSerializer import kotlinx.serialization.KSerializer
@ -21,9 +22,9 @@ import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext import kotlin.coroutines.coroutineContext
public class RSocketMagixEndpoint<T>( public class RSocketMagixEndpoint<T>(
private val coroutineContext: CoroutineContext,
payloadSerializer: KSerializer<T>, payloadSerializer: KSerializer<T>,
private val rSocket: RSocket, private val rSocket: RSocket,
private val coroutineContext: CoroutineContext,
) : MagixEndpoint<T> { ) : MagixEndpoint<T> {
private val serializer = MagixMessage.serializer(payloadSerializer) private val serializer = MagixMessage.serializer(payloadSerializer)
@ -33,7 +34,9 @@ public class RSocketMagixEndpoint<T>(
): Flow<MagixMessage<T>> { ): Flow<MagixMessage<T>> {
val payload = buildPayload { data(MagixEndpoint.magixJson.encodeToString(filter)) } val payload = buildPayload { data(MagixEndpoint.magixJson.encodeToString(filter)) }
val flow = rSocket.requestStream(payload) val flow = rSocket.requestStream(payload)
return flow.map { MagixEndpoint.magixJson.decodeFromString(serializer, it.data.readText()) } return flow.map {
MagixEndpoint.magixJson.decodeFromString(serializer, it.data.readText())
}.flowOn(coroutineContext)
} }
override suspend fun broadcast(message: MagixMessage<T>) { override suspend fun broadcast(message: MagixMessage<T>) {
@ -77,5 +80,5 @@ public suspend fun <T> MagixEndpoint.Companion.rSocketWithWebSockets(
client.close() client.close()
} }
return RSocketMagixEndpoint(coroutineContext, payloadSerializer, rSocket) return RSocketMagixEndpoint(payloadSerializer, rSocket, coroutineContext)
} }

View File

@ -24,5 +24,5 @@ public suspend fun <T> MagixEndpoint.Companion.rSocketWithTcp(
val transport = aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().clientTransport(host, port, tcpConfig) val transport = aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().clientTransport(host, port, tcpConfig)
val rSocket = buildConnector(rSocketConfig).connect(transport) val rSocket = buildConnector(rSocketConfig).connect(transport)
return RSocketMagixEndpoint(coroutineContext, payloadSerializer, rSocket) return RSocketMagixEndpoint(payloadSerializer, rSocket, coroutineContext)
} }

View File

@ -1,11 +1,9 @@
package ru.mipt.npm.magix.zmq package ru.mipt.npm.magix.zmq
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.*
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.withContext
import kotlinx.serialization.KSerializer import kotlinx.serialization.KSerializer
import org.zeromq.SocketType import org.zeromq.SocketType
import org.zeromq.ZContext import org.zeromq.ZContext
@ -35,12 +33,10 @@ public class ZmqMagixEndpoint<T>(
socket.subscribe("") socket.subscribe("")
return channelFlow { return channelFlow {
var activeFlag = true
invokeOnClose { invokeOnClose {
activeFlag = false
socket.close() socket.close()
} }
while (activeFlag) { while (isActive) {
try { try {
//This is a blocking call. //This is a blocking call.
val string: String? = socket.recvStr() val string: String? = socket.recvStr()
@ -51,9 +47,9 @@ public class ZmqMagixEndpoint<T>(
} catch (t: Throwable) { } catch (t: Throwable) {
socket.close() socket.close()
if (t is ZMQException && t.errorCode == ZMQ.Error.ETERM.code) { if (t is ZMQException && t.errorCode == ZMQ.Error.ETERM.code) {
activeFlag = false cancel("ZMQ connection terminated", t)
} else { } else {
zmqContext.close() throw t
} }
} }
} }