diff --git a/controls-magix-client/src/commonMain/kotlin/ru/mipt/npm/controls/client/dfMagix.kt b/controls-magix-client/src/commonMain/kotlin/ru/mipt/npm/controls/client/dfMagix.kt index 2e92205..8faed41 100644 --- a/controls-magix-client/src/commonMain/kotlin/ru/mipt/npm/controls/client/dfMagix.kt +++ b/controls-magix-client/src/commonMain/kotlin/ru/mipt/npm/controls/client/dfMagix.kt @@ -29,6 +29,7 @@ internal fun generateId(request: MagixMessage<*>): String = if (request.id != nu public fun DeviceManager.connectToMagix( endpoint: MagixEndpoint, endpointID: String = DATAFORGE_MAGIX_FORMAT, + preSendAction: (MagixMessage<*>) -> Unit = {} ): Job = context.launch { endpoint.subscribe().onEach { request -> val responsePayload = respondHubMessage(request.payload) @@ -48,13 +49,15 @@ public fun DeviceManager.connectToMagix( }.launchIn(this) hubMessageFlow(this).onEach { payload -> + val magixMessage = MagixMessage( + format = DATAFORGE_MAGIX_FORMAT, + id = "df[${payload.hashCode()}]", + origin = endpointID, + payload = payload + ) + preSendAction(magixMessage) endpoint.broadcast( - MagixMessage( - format = DATAFORGE_MAGIX_FORMAT, - id = "df[${payload.hashCode()}]", - origin = endpointID, - payload = payload - ) + magixMessage ) }.catch { error -> logger.error(error) { "Error while sending a message" } diff --git a/controls-magix-client/src/jvmMain/kotlin/ru/mipt/npm/controls/client/xodusDfMagix.kt b/controls-magix-client/src/jvmMain/kotlin/ru/mipt/npm/controls/client/xodusDfMagix.kt index 680b042..4a8cdc4 100644 --- a/controls-magix-client/src/jvmMain/kotlin/ru/mipt/npm/controls/client/xodusDfMagix.kt +++ b/controls-magix-client/src/jvmMain/kotlin/ru/mipt/npm/controls/client/xodusDfMagix.kt @@ -2,20 +2,11 @@ package ru.mipt.npm.controls.client import jetbrains.exodus.entitystore.PersistentEntityStore import kotlinx.coroutines.Job -import kotlinx.coroutines.flow.catch -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.launch import ru.mipt.npm.controls.api.DeviceMessage import ru.mipt.npm.controls.api.PropertyChangedMessage import ru.mipt.npm.controls.controllers.DeviceManager -import ru.mipt.npm.controls.controllers.hubMessageFlow -import ru.mipt.npm.controls.controllers.respondHubMessage import ru.mipt.npm.controls.xodus.toEntity import ru.mipt.npm.magix.api.MagixEndpoint -import ru.mipt.npm.magix.api.MagixMessage -import space.kscience.dataforge.context.error -import space.kscience.dataforge.context.logger /** * Communicate with server in [Magix format](https://github.com/waltz-controls/rfc/tree/master/1) and dump messages at xodus entity store @@ -24,38 +15,10 @@ public fun DeviceManager.connectToMagix( endpoint: MagixEndpoint, endpointID: String = DATAFORGE_MAGIX_FORMAT, entityStore: PersistentEntityStore -): Job = context.launch { - endpoint.subscribe().onEach { request -> - val responsePayload = respondHubMessage(request.payload) - if (responsePayload != null) { - val response = MagixMessage( - format = DATAFORGE_MAGIX_FORMAT, - id = generateId(request), - parentId = request.id, - origin = endpointID, - payload = responsePayload - ) - - endpoint.broadcast(response) +): Job = connectToMagix(endpoint, endpointID) { message -> + if (message.payload is PropertyChangedMessage) { + entityStore.executeInTransaction { + message.toEntity(it) } - }.catch { error -> - logger.error(error) { "Error while responding to message" } - }.launchIn(this) - - hubMessageFlow(this).onEach { payload -> - val magixMessage = MagixMessage( - format = DATAFORGE_MAGIX_FORMAT, - id = "df[${payload.hashCode()}]", - origin = endpointID, - payload = payload - ) - endpoint.broadcast(magixMessage) - if (payload is PropertyChangedMessage) { - entityStore.executeInTransaction { - magixMessage.toEntity(it) - } - } - }.catch { error -> - logger.error(error) { "Error while sending a message" } - }.launchIn(this) + } }