diff --git a/magix/magix-rsocket/src/commonMain/kotlin/ru/mipt/npm/magix/rsocket/RSocketMagixEndpoint.kt b/magix/magix-rsocket/src/commonMain/kotlin/ru/mipt/npm/magix/rsocket/RSocketMagixEndpoint.kt index 42639ab..490ddaf 100644 --- a/magix/magix-rsocket/src/commonMain/kotlin/ru/mipt/npm/magix/rsocket/RSocketMagixEndpoint.kt +++ b/magix/magix-rsocket/src/commonMain/kotlin/ru/mipt/npm/magix/rsocket/RSocketMagixEndpoint.kt @@ -20,6 +20,7 @@ import kotlinx.serialization.encodeToString import ru.mipt.npm.magix.api.MagixEndpoint import ru.mipt.npm.magix.api.MagixMessage import ru.mipt.npm.magix.api.MagixMessageFilter +import ru.mipt.npm.magix.api.filter import kotlin.coroutines.CoroutineContext import kotlin.coroutines.coroutineContext @@ -38,7 +39,7 @@ public class RSocketMagixEndpoint( val flow = rSocket.requestStream(payload) return flow.map { MagixEndpoint.magixJson.decodeFromString(serializer, it.data.readText()) - }.flowOn(coroutineContext[CoroutineDispatcher]?:Dispatchers.Unconfined) + }.filter(filter).flowOn(coroutineContext[CoroutineDispatcher] ?: Dispatchers.Unconfined) } override suspend fun broadcast(message: MagixMessage) { diff --git a/magix/magix-zmq/src/main/kotlin/ru/mipt/npm/magix/zmq/ZmqMagixEndpoint.kt b/magix/magix-zmq/src/main/kotlin/ru/mipt/npm/magix/zmq/ZmqMagixEndpoint.kt index d9d6be5..99f209e 100644 --- a/magix/magix-zmq/src/main/kotlin/ru/mipt/npm/magix/zmq/ZmqMagixEndpoint.kt +++ b/magix/magix-zmq/src/main/kotlin/ru/mipt/npm/magix/zmq/ZmqMagixEndpoint.kt @@ -21,7 +21,7 @@ public class ZmqMagixEndpoint( payloadSerializer: KSerializer, private val pubPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PUB_PORT, private val pullPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PULL_PORT, - private val coroutineContext: CoroutineContext = Dispatchers.IO + private val coroutineContext: CoroutineContext = Dispatchers.IO, ) : MagixEndpoint, AutoCloseable { private val zmqContext by lazy { ZContext() } @@ -54,9 +54,8 @@ public class ZmqMagixEndpoint( } } } - }.filter(filter).flowOn( - coroutineContext[CoroutineDispatcher] ?: Dispatchers.IO - ) //should be flown on IO because of blocking calls + }.filter(filter).flowOn(coroutineContext[CoroutineDispatcher] ?: Dispatchers.IO) + //should be flown on IO because of blocking calls } private val publishSocket by lazy {