Refactor connection to magix
This commit is contained in:
parent
a1c3902b92
commit
1d8cc33c91
@ -29,6 +29,7 @@ internal fun generateId(request: MagixMessage<*>): String = if (request.id != nu
|
||||
public fun DeviceManager.connectToMagix(
|
||||
endpoint: MagixEndpoint<DeviceMessage>,
|
||||
endpointID: String = DATAFORGE_MAGIX_FORMAT,
|
||||
preSendAction: (MagixMessage<*>) -> Unit = {}
|
||||
): Job = context.launch {
|
||||
endpoint.subscribe().onEach { request ->
|
||||
val responsePayload = respondHubMessage(request.payload)
|
||||
@ -48,13 +49,15 @@ public fun DeviceManager.connectToMagix(
|
||||
}.launchIn(this)
|
||||
|
||||
hubMessageFlow(this).onEach { payload ->
|
||||
val magixMessage = MagixMessage(
|
||||
format = DATAFORGE_MAGIX_FORMAT,
|
||||
id = "df[${payload.hashCode()}]",
|
||||
origin = endpointID,
|
||||
payload = payload
|
||||
)
|
||||
preSendAction(magixMessage)
|
||||
endpoint.broadcast(
|
||||
MagixMessage(
|
||||
format = DATAFORGE_MAGIX_FORMAT,
|
||||
id = "df[${payload.hashCode()}]",
|
||||
origin = endpointID,
|
||||
payload = payload
|
||||
)
|
||||
magixMessage
|
||||
)
|
||||
}.catch { error ->
|
||||
logger.error(error) { "Error while sending a message" }
|
||||
|
@ -2,20 +2,11 @@ package ru.mipt.npm.controls.client
|
||||
|
||||
import jetbrains.exodus.entitystore.PersistentEntityStore
|
||||
import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.flow.catch
|
||||
import kotlinx.coroutines.flow.launchIn
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import kotlinx.coroutines.launch
|
||||
import ru.mipt.npm.controls.api.DeviceMessage
|
||||
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
||||
import ru.mipt.npm.controls.controllers.DeviceManager
|
||||
import ru.mipt.npm.controls.controllers.hubMessageFlow
|
||||
import ru.mipt.npm.controls.controllers.respondHubMessage
|
||||
import ru.mipt.npm.controls.xodus.toEntity
|
||||
import ru.mipt.npm.magix.api.MagixEndpoint
|
||||
import ru.mipt.npm.magix.api.MagixMessage
|
||||
import space.kscience.dataforge.context.error
|
||||
import space.kscience.dataforge.context.logger
|
||||
|
||||
/**
|
||||
* Communicate with server in [Magix format](https://github.com/waltz-controls/rfc/tree/master/1) and dump messages at xodus entity store
|
||||
@ -24,38 +15,10 @@ public fun DeviceManager.connectToMagix(
|
||||
endpoint: MagixEndpoint<DeviceMessage>,
|
||||
endpointID: String = DATAFORGE_MAGIX_FORMAT,
|
||||
entityStore: PersistentEntityStore
|
||||
): Job = context.launch {
|
||||
endpoint.subscribe().onEach { request ->
|
||||
val responsePayload = respondHubMessage(request.payload)
|
||||
if (responsePayload != null) {
|
||||
val response = MagixMessage(
|
||||
format = DATAFORGE_MAGIX_FORMAT,
|
||||
id = generateId(request),
|
||||
parentId = request.id,
|
||||
origin = endpointID,
|
||||
payload = responsePayload
|
||||
)
|
||||
|
||||
endpoint.broadcast(response)
|
||||
): Job = connectToMagix(endpoint, endpointID) { message ->
|
||||
if (message.payload is PropertyChangedMessage) {
|
||||
entityStore.executeInTransaction {
|
||||
message.toEntity(it)
|
||||
}
|
||||
}.catch { error ->
|
||||
logger.error(error) { "Error while responding to message" }
|
||||
}.launchIn(this)
|
||||
|
||||
hubMessageFlow(this).onEach { payload ->
|
||||
val magixMessage = MagixMessage(
|
||||
format = DATAFORGE_MAGIX_FORMAT,
|
||||
id = "df[${payload.hashCode()}]",
|
||||
origin = endpointID,
|
||||
payload = payload
|
||||
)
|
||||
endpoint.broadcast(magixMessage)
|
||||
if (payload is PropertyChangedMessage) {
|
||||
entityStore.executeInTransaction {
|
||||
magixMessage.toEntity(it)
|
||||
}
|
||||
}
|
||||
}.catch { error ->
|
||||
logger.error(error) { "Error while sending a message" }
|
||||
}.launchIn(this)
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user