Dev #8
@ -15,7 +15,12 @@ kotlin {
|
||||
dependencies {
|
||||
implementation(project(":magix:magix-rsocket"))
|
||||
implementation(project(":controls-core"))
|
||||
}
|
||||
}
|
||||
jvmMain {
|
||||
dependencies {
|
||||
implementation("org.jetbrains.xodus:xodus-openAPI:1.3.232")
|
||||
implementation(project(":controls-xodus"))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -13,7 +13,6 @@ import ru.mipt.npm.magix.api.MagixEndpoint
|
||||
import ru.mipt.npm.magix.api.MagixMessage
|
||||
import space.kscience.dataforge.context.error
|
||||
import space.kscience.dataforge.context.logger
|
||||
import jetbrains.exodus.entitystore.Entity
|
||||
|
||||
|
||||
public const val DATAFORGE_MAGIX_FORMAT: String = "dataforge"
|
||||
|
@ -0,0 +1,61 @@
|
||||
package ru.mipt.npm.controls.client
|
||||
|
||||
import jetbrains.exodus.entitystore.PersistentEntityStore
|
||||
import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.flow.catch
|
||||
import kotlinx.coroutines.flow.launchIn
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import kotlinx.coroutines.launch
|
||||
import ru.mipt.npm.controls.api.DeviceMessage
|
||||
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
||||
import ru.mipt.npm.controls.controllers.DeviceManager
|
||||
import ru.mipt.npm.controls.controllers.hubMessageFlow
|
||||
import ru.mipt.npm.controls.controllers.respondHubMessage
|
||||
import ru.mipt.npm.controls.xodus.toEntity
|
||||
import ru.mipt.npm.magix.api.MagixEndpoint
|
||||
import ru.mipt.npm.magix.api.MagixMessage
|
||||
import space.kscience.dataforge.context.error
|
||||
import space.kscience.dataforge.context.logger
|
||||
|
||||
/**
|
||||
* Communicate with server in [Magix format](https://github.com/waltz-controls/rfc/tree/master/1) and dump messages at xodus entity store
|
||||
*/
|
||||
public fun DeviceManager.connectToMagix(
|
||||
endpoint: MagixEndpoint<DeviceMessage>,
|
||||
endpointID: String = DATAFORGE_MAGIX_FORMAT,
|
||||
entityStore: PersistentEntityStore
|
||||
): Job = context.launch {
|
||||
endpoint.subscribe().onEach { request ->
|
||||
val responsePayload = respondHubMessage(request.payload)
|
||||
if (responsePayload != null) {
|
||||
val response = MagixMessage(
|
||||
format = DATAFORGE_MAGIX_FORMAT,
|
||||
id = generateId(request),
|
||||
parentId = request.id,
|
||||
origin = endpointID,
|
||||
payload = responsePayload
|
||||
)
|
||||
|
||||
endpoint.broadcast(response)
|
||||
}
|
||||
}.catch { error ->
|
||||
logger.error(error) { "Error while responding to message" }
|
||||
}.launchIn(this)
|
||||
|
||||
hubMessageFlow(this).onEach { payload ->
|
||||
val magixMessage = MagixMessage(
|
||||
format = DATAFORGE_MAGIX_FORMAT,
|
||||
id = "df[${payload.hashCode()}]",
|
||||
origin = endpointID,
|
||||
payload = payload
|
||||
)
|
||||
endpoint.broadcast(magixMessage)
|
||||
if (payload is PropertyChangedMessage) {
|
||||
entityStore.executeInTransaction {
|
||||
magixMessage.toEntity(it)
|
||||
}
|
||||
}
|
||||
}.catch { error ->
|
||||
logger.error(error) { "Error while sending a message" }
|
||||
}.launchIn(this)
|
||||
}
|
13
controls-xodus/build.gradle.kts
Normal file
13
controls-xodus/build.gradle.kts
Normal file
@ -0,0 +1,13 @@
|
||||
plugins {
|
||||
id("ru.mipt.npm.gradle.jvm")
|
||||
`maven-publish`
|
||||
}
|
||||
|
||||
|
||||
dependencies {
|
||||
implementation(projects.controlsCore)
|
||||
implementation(projects.magix.magixApi)
|
||||
implementation("org.jetbrains.xodus:xodus-entity-store:1.3.232")
|
||||
implementation("org.jetbrains.xodus:xodus-environment:1.3.232")
|
||||
implementation("org.jetbrains.xodus:xodus-vfs:1.3.232")
|
||||
}
|
@ -0,0 +1,33 @@
|
||||
package ru.mipt.npm.controls.xodus
|
||||
|
||||
import jetbrains.exodus.entitystore.Entity
|
||||
import jetbrains.exodus.entitystore.StoreTransaction
|
||||
import ru.mipt.npm.controls.api.DeviceMessage
|
||||
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
||||
import ru.mipt.npm.magix.api.MagixMessage
|
||||
|
||||
public fun PropertyChangedMessage.toEntity(transaction: StoreTransaction): Entity {
|
||||
val entity = transaction.newEntity("PropertyChangedMessage")
|
||||
entity.setProperty("property", property)
|
||||
entity.setProperty("value", value.toString())
|
||||
entity.setProperty("sourceDevice", sourceDevice.toString())
|
||||
targetDevice?.let { entity.setProperty("targetDevice", it.toString()) }
|
||||
comment?.let { entity.setProperty("comment", it) }
|
||||
time?.let { entity.setProperty("time", it.toEpochMilliseconds()) }
|
||||
return entity
|
||||
}
|
||||
|
||||
public fun MagixMessage<DeviceMessage>.toEntity(transaction: StoreTransaction): Entity {
|
||||
val entity = transaction.newEntity("MagixMessage")
|
||||
entity.setProperty("format", format)
|
||||
entity.setProperty("origin", origin)
|
||||
if (payload is PropertyChangedMessage) {
|
||||
val payloadEntity = (payload as PropertyChangedMessage).toEntity(transaction)
|
||||
entity.setLink("payload", payloadEntity)
|
||||
}
|
||||
target?.let { entity.setProperty("target", it) }
|
||||
id?.let { entity.setProperty("id", it) }
|
||||
parentId?.let { entity.setProperty("parentId", it) }
|
||||
user?.let { entity.setProperty("user", it.toString()) }
|
||||
return entity
|
||||
}
|
@ -50,5 +50,6 @@ include(
|
||||
":magix:magix-zmq",
|
||||
":magix:magix-demo",
|
||||
":controls-magix-client",
|
||||
":motors"
|
||||
":motors",
|
||||
":controls-xodus"
|
||||
)
|
Loading…
Reference in New Issue
Block a user