Dev #8
@ -95,6 +95,8 @@ public suspend fun DeviceHub.respondHubMessage(request: DeviceMessage): DeviceMe
|
|||||||
* Collect all messages from given [DeviceHub], applying proper relative names
|
* Collect all messages from given [DeviceHub], applying proper relative names
|
||||||
*/
|
*/
|
||||||
public fun DeviceHub.hubMessageFlow(scope: CoroutineScope): Flow<DeviceMessage> {
|
public fun DeviceHub.hubMessageFlow(scope: CoroutineScope): Flow<DeviceMessage> {
|
||||||
|
|
||||||
|
//TODO could we avoid using downstream scope?
|
||||||
val outbox = MutableSharedFlow<DeviceMessage>()
|
val outbox = MutableSharedFlow<DeviceMessage>()
|
||||||
if (this is Device) {
|
if (this is Device) {
|
||||||
messageFlow.onEach {
|
messageFlow.onEach {
|
||||||
|
@ -17,11 +17,5 @@ kotlin {
|
|||||||
implementation(project(":controls-core"))
|
implementation(project(":controls-core"))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
jvmMain {
|
|
||||||
dependencies {
|
|
||||||
implementation("org.jetbrains.xodus:xodus-openAPI:1.3.232")
|
|
||||||
implementation(project(":controls-xodus"))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,24 +1,16 @@
|
|||||||
package ru.mipt.npm.controls.client
|
package ru.mipt.npm.controls.client
|
||||||
|
|
||||||
import jetbrains.exodus.entitystore.PersistentEntityStore
|
///**
|
||||||
import kotlinx.coroutines.Job
|
// * Communicate with server in [Magix format](https://github.com/waltz-controls/rfc/tree/master/1) and dump messages at xodus entity store
|
||||||
import ru.mipt.npm.controls.api.DeviceMessage
|
// */
|
||||||
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
//public fun DeviceManager.connectToMagix(
|
||||||
import ru.mipt.npm.controls.controllers.DeviceManager
|
// endpoint: MagixEndpoint<DeviceMessage>,
|
||||||
import ru.mipt.npm.controls.xodus.toEntity
|
// endpointID: String = DATAFORGE_MAGIX_FORMAT,
|
||||||
import ru.mipt.npm.magix.api.MagixEndpoint
|
// entityStore: PersistentEntityStore
|
||||||
|
//): Job = connectToMagix(endpoint, endpointID) { message ->
|
||||||
/**
|
// if (message.payload is PropertyChangedMessage) {
|
||||||
* Communicate with server in [Magix format](https://github.com/waltz-controls/rfc/tree/master/1) and dump messages at xodus entity store
|
// entityStore.executeInTransaction {
|
||||||
*/
|
// message.toEntity(it)
|
||||||
public fun DeviceManager.connectToMagix(
|
// }
|
||||||
endpoint: MagixEndpoint<DeviceMessage>,
|
// }
|
||||||
endpointID: String = DATAFORGE_MAGIX_FORMAT,
|
//}
|
||||||
entityStore: PersistentEntityStore
|
|
||||||
): Job = connectToMagix(endpoint, endpointID) { message ->
|
|
||||||
if (message.payload is PropertyChangedMessage) {
|
|
||||||
entityStore.executeInTransaction {
|
|
||||||
message.toEntity(it)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
@ -3,11 +3,12 @@ plugins {
|
|||||||
`maven-publish`
|
`maven-publish`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
val xodusVersion = "1.3.232"
|
||||||
|
|
||||||
dependencies {
|
dependencies {
|
||||||
implementation(projects.controlsCore)
|
implementation(projects.controlsCore)
|
||||||
implementation(projects.magix.magixApi)
|
implementation(projects.magix.magixApi)
|
||||||
implementation("org.jetbrains.xodus:xodus-entity-store:1.3.232")
|
implementation("org.jetbrains.xodus:xodus-entity-store:$xodusVersion")
|
||||||
implementation("org.jetbrains.xodus:xodus-environment:1.3.232")
|
implementation("org.jetbrains.xodus:xodus-environment:$xodusVersion")
|
||||||
implementation("org.jetbrains.xodus:xodus-vfs:1.3.232")
|
implementation("org.jetbrains.xodus:xodus-vfs:$xodusVersion")
|
||||||
}
|
}
|
@ -8,12 +8,26 @@ import kotlinx.serialization.json.JsonElement
|
|||||||
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
||||||
import ru.mipt.npm.magix.api.MagixMessage
|
import ru.mipt.npm.magix.api.MagixMessage
|
||||||
import space.kscience.dataforge.meta.MetaSerializer
|
import space.kscience.dataforge.meta.MetaSerializer
|
||||||
|
import space.kscience.dataforge.meta.isLeaf
|
||||||
import space.kscience.dataforge.names.Name
|
import space.kscience.dataforge.names.Name
|
||||||
|
import space.kscience.dataforge.values.Value
|
||||||
|
import space.kscience.dataforge.values.ValueType
|
||||||
|
|
||||||
public fun PropertyChangedMessage.toEntity(transaction: StoreTransaction): Entity {
|
public fun PropertyChangedMessage.toEntity(transaction: StoreTransaction): Entity {
|
||||||
val entity = transaction.newEntity("PropertyChangedMessage")
|
val entity = transaction.newEntity("PropertyChangedMessage")
|
||||||
entity.setProperty("property", property)
|
entity.setProperty("property", property)
|
||||||
|
if (value.isLeaf) {
|
||||||
|
val v: Value = value.value ?: TODO()
|
||||||
|
when(v.type){
|
||||||
|
ValueType.NULL -> TODO()
|
||||||
|
ValueType.LIST -> TODO()
|
||||||
|
ValueType.NUMBER -> TODO()
|
||||||
|
ValueType.STRING -> TODO()
|
||||||
|
ValueType.BOOLEAN -> TODO()
|
||||||
|
}
|
||||||
|
} else {
|
||||||
entity.setProperty("value", value.toString())
|
entity.setProperty("value", value.toString())
|
||||||
|
}
|
||||||
entity.setProperty("sourceDevice", sourceDevice.toString())
|
entity.setProperty("sourceDevice", sourceDevice.toString())
|
||||||
targetDevice?.let { entity.setProperty("targetDevice", it.toString()) }
|
targetDevice?.let { entity.setProperty("targetDevice", it.toString()) }
|
||||||
comment?.let { entity.setProperty("comment", it) }
|
comment?.let { entity.setProperty("comment", it) }
|
||||||
|
@ -0,0 +1,21 @@
|
|||||||
|
package ru.mipt.npm.controls.xodus
|
||||||
|
|
||||||
|
import jetbrains.exodus.entitystore.PersistentEntityStore
|
||||||
|
import kotlinx.coroutines.Job
|
||||||
|
import kotlinx.coroutines.flow.launchIn
|
||||||
|
import kotlinx.coroutines.flow.onEach
|
||||||
|
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
||||||
|
import ru.mipt.npm.controls.controllers.DeviceManager
|
||||||
|
import ru.mipt.npm.controls.controllers.hubMessageFlow
|
||||||
|
|
||||||
|
|
||||||
|
public fun DeviceManager.connectXodus(
|
||||||
|
entityStore: PersistentEntityStore,
|
||||||
|
//filter: (DeviceMessage) -> Boolean = {it is PropertyChangedMessage}
|
||||||
|
): Job = hubMessageFlow(context).onEach { message ->
|
||||||
|
if (message is PropertyChangedMessage) {
|
||||||
|
entityStore.executeInTransaction {
|
||||||
|
message.toEntity(it)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}.launchIn(context)
|
@ -19,6 +19,7 @@ dependencies {
|
|||||||
implementation(projects.magix.magixServer)
|
implementation(projects.magix.magixServer)
|
||||||
implementation(projects.magix.magixRsocket)
|
implementation(projects.magix.magixRsocket)
|
||||||
implementation(projects.controlsMagixClient)
|
implementation(projects.controlsMagixClient)
|
||||||
|
implementation(projects.controlsXodus)
|
||||||
|
|
||||||
implementation("io.ktor:ktor-client-cio:$ktorVersion")
|
implementation("io.ktor:ktor-client-cio:$ktorVersion")
|
||||||
implementation("org.jetbrains.kotlinx:kotlinx-datetime:0.3.1")
|
implementation("org.jetbrains.kotlinx:kotlinx-datetime:0.3.1")
|
||||||
|
@ -14,8 +14,7 @@ import space.kscience.dataforge.meta.string
|
|||||||
import space.kscience.dataforge.names.Name
|
import space.kscience.dataforge.names.Name
|
||||||
import kotlin.time.ExperimentalTime
|
import kotlin.time.ExperimentalTime
|
||||||
|
|
||||||
class MagixVirtualCar(context: Context, meta: Meta)
|
class MagixVirtualCar(context: Context, meta: Meta) : VirtualCar(context, meta) {
|
||||||
: VirtualCar(context, meta) {
|
|
||||||
|
|
||||||
private suspend fun MagixEndpoint<DeviceMessage>.startMagixVirtualCarUpdate() {
|
private suspend fun MagixEndpoint<DeviceMessage>.startMagixVirtualCarUpdate() {
|
||||||
launch {
|
launch {
|
||||||
|
@ -1,20 +1,21 @@
|
|||||||
package ru.mipt.npm.controls.demo.car
|
package ru.mipt.npm.controls.demo.car
|
||||||
|
|
||||||
import io.ktor.server.engine.*
|
import io.ktor.server.engine.ApplicationEngine
|
||||||
import javafx.beans.property.DoubleProperty
|
import javafx.beans.property.DoubleProperty
|
||||||
import javafx.scene.Parent
|
import javafx.scene.Parent
|
||||||
import javafx.scene.control.TextField
|
import javafx.scene.control.TextField
|
||||||
import javafx.scene.layout.Priority
|
import javafx.scene.layout.Priority
|
||||||
import javafx.stage.Stage
|
import javafx.stage.Stage
|
||||||
import jetbrains.exodus.entitystore.PersistentEntityStore
|
import jetbrains.exodus.entitystore.PersistentEntityStore
|
||||||
import jetbrains.exodus.entitystore.PersistentEntityStoreImpl
|
|
||||||
import jetbrains.exodus.entitystore.PersistentEntityStores
|
import jetbrains.exodus.entitystore.PersistentEntityStores
|
||||||
|
import kotlinx.coroutines.Job
|
||||||
import kotlinx.coroutines.launch
|
import kotlinx.coroutines.launch
|
||||||
import ru.mipt.npm.controls.api.DeviceMessage
|
import ru.mipt.npm.controls.api.DeviceMessage
|
||||||
import ru.mipt.npm.controls.client.connectToMagix
|
import ru.mipt.npm.controls.client.connectToMagix
|
||||||
import ru.mipt.npm.controls.controllers.DeviceManager
|
import ru.mipt.npm.controls.controllers.DeviceManager
|
||||||
import ru.mipt.npm.controls.controllers.install
|
import ru.mipt.npm.controls.controllers.install
|
||||||
import ru.mipt.npm.controls.demo.car.IVirtualCar.Companion.acceleration
|
import ru.mipt.npm.controls.demo.car.IVirtualCar.Companion.acceleration
|
||||||
|
import ru.mipt.npm.controls.xodus.connectXodus
|
||||||
import ru.mipt.npm.magix.api.MagixEndpoint
|
import ru.mipt.npm.magix.api.MagixEndpoint
|
||||||
import ru.mipt.npm.magix.rsocket.rSocketWithTcp
|
import ru.mipt.npm.magix.rsocket.rSocketWithTcp
|
||||||
import ru.mipt.npm.magix.server.startMagixServer
|
import ru.mipt.npm.magix.server.startMagixServer
|
||||||
@ -27,6 +28,7 @@ class VirtualCarController : Controller(), ContextAware {
|
|||||||
var magixVirtualCar: MagixVirtualCar? = null
|
var magixVirtualCar: MagixVirtualCar? = null
|
||||||
var magixServer: ApplicationEngine? = null
|
var magixServer: ApplicationEngine? = null
|
||||||
var entityStore: PersistentEntityStore? = null
|
var entityStore: PersistentEntityStore? = null
|
||||||
|
var storageJob: Job? =null
|
||||||
|
|
||||||
override val context = Context("demoDevice") {
|
override val context = Context("demoDevice") {
|
||||||
plugin(DeviceManager)
|
plugin(DeviceManager)
|
||||||
@ -37,19 +39,18 @@ class VirtualCarController : Controller(), ContextAware {
|
|||||||
fun init() {
|
fun init() {
|
||||||
context.launch {
|
context.launch {
|
||||||
virtualCar = deviceManager.install("virtual-car", VirtualCar)
|
virtualCar = deviceManager.install("virtual-car", VirtualCar)
|
||||||
|
|
||||||
//starting magix event loop
|
//starting magix event loop
|
||||||
magixServer = startMagixServer(enableRawRSocket = true, enableZmq = true)
|
magixServer = startMagixServer(enableRawRSocket = true, enableZmq = true)
|
||||||
magixVirtualCar = deviceManager.install("magix-virtual-car", MagixVirtualCar)
|
magixVirtualCar = deviceManager.install("magix-virtual-car", MagixVirtualCar)
|
||||||
entityStore = PersistentEntityStores.newInstance("/home/marvel1337/2021/SCADA/.messages")
|
entityStore = PersistentEntityStores.newInstance("/home/marvel1337/2021/SCADA/.messages")
|
||||||
|
//connect to entity store
|
||||||
|
storageJob = deviceManager.connectXodus(entityStore as PersistentEntityStore)
|
||||||
//Launch device client and connect it to the server
|
//Launch device client and connect it to the server
|
||||||
val deviceEndpoint = MagixEndpoint.rSocketWithTcp("localhost", DeviceMessage.serializer())
|
val deviceEndpoint = MagixEndpoint.rSocketWithTcp("localhost", DeviceMessage.serializer())
|
||||||
if (entityStore != null) {
|
|
||||||
deviceManager.connectToMagix(deviceEndpoint, entityStore = entityStore as PersistentEntityStoreImpl)
|
|
||||||
} else {
|
|
||||||
deviceManager.connectToMagix(deviceEndpoint)
|
deviceManager.connectToMagix(deviceEndpoint)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
fun shutdown() {
|
fun shutdown() {
|
||||||
logger.info { "Shutting down..." }
|
logger.info { "Shutting down..." }
|
||||||
|
Loading…
Reference in New Issue
Block a user