Dev #8
@ -2,7 +2,7 @@ plugins {
|
||||
id("ru.mipt.npm.gradle.project")
|
||||
}
|
||||
|
||||
val dataforgeVersion: String by extra("0.5.1")
|
||||
val dataforgeVersion: String by extra("0.5.2")
|
||||
val ktorVersion: String by extra(ru.mipt.npm.gradle.KScienceVersions.ktorVersion)
|
||||
val rsocketVersion by extra("0.13.1")
|
||||
|
||||
|
@ -80,6 +80,8 @@ public suspend fun DeviceHub.respondHubMessage(request: DeviceMessage): DeviceMe
|
||||
* Collect all messages from given [DeviceHub], applying proper relative names
|
||||
*/
|
||||
public fun DeviceHub.hubMessageFlow(scope: CoroutineScope): Flow<DeviceMessage> {
|
||||
|
||||
//TODO could we avoid using downstream scope?
|
||||
val outbox = MutableSharedFlow<DeviceMessage>()
|
||||
if (this is Device) {
|
||||
messageFlow.onEach {
|
||||
|
@ -18,4 +18,4 @@ kotlin {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -29,6 +29,7 @@ internal fun generateId(request: MagixMessage<*>): String = if (request.id != nu
|
||||
public fun DeviceManager.connectToMagix(
|
||||
endpoint: MagixEndpoint<DeviceMessage>,
|
||||
endpointID: String = DATAFORGE_MAGIX_FORMAT,
|
||||
preSendAction: (MagixMessage<*>) -> Unit = {}
|
||||
): Job = context.launch {
|
||||
endpoint.subscribe().onEach { request ->
|
||||
val responsePayload = respondHubMessage(request.payload)
|
||||
@ -48,13 +49,15 @@ public fun DeviceManager.connectToMagix(
|
||||
}.launchIn(this)
|
||||
|
||||
hubMessageFlow(this).onEach { payload ->
|
||||
val magixMessage = MagixMessage(
|
||||
format = DATAFORGE_MAGIX_FORMAT,
|
||||
id = "df[${payload.hashCode()}]",
|
||||
origin = endpointID,
|
||||
payload = payload
|
||||
)
|
||||
preSendAction(magixMessage)
|
||||
endpoint.broadcast(
|
||||
MagixMessage(
|
||||
format = DATAFORGE_MAGIX_FORMAT,
|
||||
id = "df[${payload.hashCode()}]",
|
||||
origin = endpointID,
|
||||
payload = payload
|
||||
)
|
||||
magixMessage
|
||||
)
|
||||
}.catch { error ->
|
||||
logger.error(error) { "Error while sending a message" }
|
||||
|
13
controls-mongo/build.gradle.kts
Normal file
13
controls-mongo/build.gradle.kts
Normal file
@ -0,0 +1,13 @@
|
||||
plugins {
|
||||
id("ru.mipt.npm.gradle.jvm")
|
||||
`maven-publish`
|
||||
}
|
||||
|
||||
val kmongoVersion = "4.4.0"
|
||||
|
||||
dependencies {
|
||||
implementation(projects.controlsCore)
|
||||
implementation(projects.magix.magixApi)
|
||||
implementation(projects.controlsMagixClient)
|
||||
implementation("org.litote.kmongo:kmongo-coroutine-serialization:$kmongoVersion")
|
||||
}
|
@ -0,0 +1,58 @@
|
||||
package ru.mipt.npm.controls.mongo
|
||||
|
||||
import kotlinx.coroutines.InternalCoroutinesApi
|
||||
import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.flow.filter
|
||||
import kotlinx.coroutines.flow.launchIn
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.serialization.encodeToString
|
||||
import kotlinx.serialization.json.Json
|
||||
import org.litote.kmongo.coroutine.CoroutineClient
|
||||
import org.litote.kmongo.coroutine.coroutine
|
||||
import org.litote.kmongo.coroutine.insertOne
|
||||
import org.litote.kmongo.reactivestreams.KMongo
|
||||
import ru.mipt.npm.controls.api.DeviceMessage
|
||||
import ru.mipt.npm.controls.controllers.DeviceManager
|
||||
import ru.mipt.npm.controls.controllers.hubMessageFlow
|
||||
import space.kscience.dataforge.context.Context
|
||||
import space.kscience.dataforge.context.Factory
|
||||
import space.kscience.dataforge.context.debug
|
||||
import space.kscience.dataforge.context.logger
|
||||
import space.kscience.dataforge.meta.Meta
|
||||
import space.kscience.dataforge.meta.get
|
||||
import space.kscience.dataforge.meta.string
|
||||
|
||||
internal object DefaultMongoConfig {
|
||||
const val databaseName = "deviceMessage"
|
||||
}
|
||||
|
||||
public object MongoClientFactory : Factory<CoroutineClient> {
|
||||
private const val connectionString: String = "mongodb://mongoadmin:secret@localhost:27888"
|
||||
|
||||
override fun invoke(meta: Meta, context: Context): CoroutineClient = meta["mongoConfig"]?.get("connectionString")?.string?.let {
|
||||
KMongo.createClient(it).coroutine
|
||||
} ?: KMongo.createClient(connectionString).coroutine
|
||||
}
|
||||
|
||||
@OptIn(InternalCoroutinesApi::class)
|
||||
public fun DeviceManager.connectMongo(
|
||||
factory: Factory<CoroutineClient>,
|
||||
filterCondition: suspend (DeviceMessage) -> Boolean = { true }
|
||||
): Job {
|
||||
val client = factory.invoke(meta, context)
|
||||
logger.debug { "Mongo client opened" }
|
||||
val collection = client
|
||||
.getDatabase(meta["mongoConfig"]?.get("databaseName")?.string ?: DefaultMongoConfig.databaseName)
|
||||
.getCollection<DeviceMessage>()
|
||||
return hubMessageFlow(context).filter(filterCondition).onEach { message ->
|
||||
context.launch {
|
||||
collection.insertOne(Json.encodeToString(message))
|
||||
}
|
||||
}.launchIn(context).apply {
|
||||
invokeOnCompletion(onCancelling = true) {
|
||||
logger.debug { "Mongo client closed" }
|
||||
client.close()
|
||||
}
|
||||
}
|
||||
}
|
17
controls-xodus/build.gradle.kts
Normal file
17
controls-xodus/build.gradle.kts
Normal file
@ -0,0 +1,17 @@
|
||||
plugins {
|
||||
id("ru.mipt.npm.gradle.jvm")
|
||||
`maven-publish`
|
||||
}
|
||||
|
||||
val xodusVersion = "1.3.232"
|
||||
|
||||
dependencies {
|
||||
implementation(projects.controlsCore)
|
||||
implementation(projects.magix.magixApi)
|
||||
implementation(projects.controlsMagixClient)
|
||||
implementation(projects.magix.magixServer)
|
||||
implementation(projects.xodusSerialization)
|
||||
implementation("org.jetbrains.xodus:xodus-entity-store:$xodusVersion")
|
||||
implementation("org.jetbrains.xodus:xodus-environment:$xodusVersion")
|
||||
implementation("org.jetbrains.xodus:xodus-vfs:$xodusVersion")
|
||||
}
|
@ -0,0 +1,99 @@
|
||||
package ru.mipt.npm.controls.xodus
|
||||
|
||||
import io.ktor.application.Application
|
||||
import jetbrains.exodus.entitystore.PersistentEntityStore
|
||||
import jetbrains.exodus.entitystore.PersistentEntityStores
|
||||
import kotlinx.coroutines.InternalCoroutinesApi
|
||||
import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.flow.*
|
||||
import kotlinx.coroutines.job
|
||||
import ru.mipt.npm.controls.api.DeviceMessage
|
||||
import ru.mipt.npm.controls.controllers.DeviceManager
|
||||
import ru.mipt.npm.controls.controllers.hubMessageFlow
|
||||
import ru.mipt.npm.magix.server.GenericMagixMessage
|
||||
import ru.mipt.npm.xodus.serialization.json.encodeToEntity
|
||||
import space.kscience.dataforge.context.Context
|
||||
import space.kscience.dataforge.context.Factory
|
||||
import space.kscience.dataforge.context.debug
|
||||
import space.kscience.dataforge.context.logger
|
||||
import space.kscience.dataforge.meta.Meta
|
||||
import space.kscience.dataforge.meta.get
|
||||
import space.kscience.dataforge.meta.string
|
||||
import space.kscience.dataforge.names.Name
|
||||
|
||||
private const val DEFAULT_XODUS_STORE_PATH = ".storage"
|
||||
private val XODUS_STORE_PROPERTY = Name.of("xodus", "entityStorePath")
|
||||
|
||||
private fun Context.getPersistentEntityStore(meta: Meta = Meta.EMPTY): PersistentEntityStore {
|
||||
val storePath = meta[XODUS_STORE_PROPERTY]?.string
|
||||
?: properties[XODUS_STORE_PROPERTY]?.string
|
||||
?: DEFAULT_XODUS_STORE_PATH
|
||||
|
||||
return PersistentEntityStores.newInstance(storePath)
|
||||
}
|
||||
|
||||
internal val defaultPersistentStoreFactory = object : Factory<PersistentEntityStore> {
|
||||
override fun invoke(meta: Meta, context: Context): PersistentEntityStore = context.getPersistentEntityStore(meta)
|
||||
}
|
||||
|
||||
@OptIn(InternalCoroutinesApi::class)
|
||||
public fun DeviceManager.storeMessagesInXodus(
|
||||
factory: Factory<PersistentEntityStore> = defaultPersistentStoreFactory,
|
||||
filterCondition: suspend (DeviceMessage) -> Boolean = { true },
|
||||
): Job {
|
||||
val entityStore = factory(Meta.EMPTY, context)
|
||||
logger.debug { "Device entity store opened" }
|
||||
|
||||
return hubMessageFlow(context).filter(filterCondition).onEach { message ->
|
||||
entityStore.encodeToEntity(message, "DeviceMessage")
|
||||
}.launchIn(context).apply {
|
||||
invokeOnCompletion(onCancelling = true) {
|
||||
entityStore.close()
|
||||
logger.debug { "Device entity store closed" }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//public fun CoroutineScope.startMagixServer(
|
||||
// entityStore: PersistentEntityStore,
|
||||
// flowFilter: suspend (GenericMagixMessage) -> Boolean = { true },
|
||||
// port: Int = MagixEndpoint.DEFAULT_MAGIX_HTTP_PORT,
|
||||
// buffer: Int = 100,
|
||||
// enableRawRSocket: Boolean = true,
|
||||
// enableZmq: Boolean = true,
|
||||
// applicationConfiguration: Application.(MutableSharedFlow<GenericMagixMessage>) -> Unit = {},
|
||||
//): ApplicationEngine = startMagixServer(
|
||||
// port, buffer, enableRawRSocket, enableZmq
|
||||
//) { flow ->
|
||||
// applicationConfiguration(flow)
|
||||
// flow.filter(flowFilter).onEach { message ->
|
||||
// entityStore.executeInTransaction { txn ->
|
||||
// val entity = txn.newEntity("MagixMessage")
|
||||
// entity.setProperty("value", message.toString())
|
||||
// }
|
||||
// }
|
||||
//}
|
||||
|
||||
internal fun Flow<GenericMagixMessage>.storeInXodus(
|
||||
entityStore: PersistentEntityStore,
|
||||
flowFilter: (GenericMagixMessage) -> Boolean = { true },
|
||||
) {
|
||||
filter(flowFilter).onEach { message ->
|
||||
entityStore.encodeToEntity(message, "MagixMessage")
|
||||
}
|
||||
}
|
||||
|
||||
@OptIn(InternalCoroutinesApi::class)
|
||||
public fun Application.storeInXodus(
|
||||
flow: MutableSharedFlow<GenericMagixMessage>,
|
||||
meta: Meta = Meta.EMPTY,
|
||||
factory: Factory<PersistentEntityStore> = defaultPersistentStoreFactory,
|
||||
flowFilter: (GenericMagixMessage) -> Boolean = { true },
|
||||
) {
|
||||
val entityStore = factory(meta)
|
||||
|
||||
flow.storeInXodus(entityStore, flowFilter)
|
||||
coroutineContext.job.invokeOnCompletion(onCancelling = true) {
|
||||
entityStore.close()
|
||||
}
|
||||
}
|
@ -0,0 +1,73 @@
|
||||
package ru.mipt.npm.controls.xodus
|
||||
|
||||
import jetbrains.exodus.entitystore.Entity
|
||||
import jetbrains.exodus.entitystore.StoreTransaction
|
||||
import kotlinx.datetime.Instant
|
||||
import kotlinx.serialization.json.Json
|
||||
import kotlinx.serialization.json.JsonElement
|
||||
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
||||
import ru.mipt.npm.magix.api.MagixMessage
|
||||
import space.kscience.dataforge.meta.MetaSerializer
|
||||
import space.kscience.dataforge.meta.isLeaf
|
||||
import space.kscience.dataforge.names.Name
|
||||
import space.kscience.dataforge.values.Value
|
||||
import space.kscience.dataforge.values.ValueType
|
||||
|
||||
internal fun PropertyChangedMessage.toEntity(transaction: StoreTransaction): Entity {
|
||||
val entity = transaction.newEntity("PropertyChangedMessage")
|
||||
entity.setProperty("property", property)
|
||||
entity.setProperty("value", value.toString())
|
||||
entity.setProperty("sourceDevice", sourceDevice.toString())
|
||||
targetDevice?.let { entity.setProperty("targetDevice", it.toString()) }
|
||||
comment?.let { entity.setProperty("comment", it) }
|
||||
time?.let { entity.setProperty("time", it.toEpochMilliseconds()) }
|
||||
return entity
|
||||
}
|
||||
|
||||
internal fun Entity.toPropertyChangedMessage(): PropertyChangedMessage? {
|
||||
if (getProperty("property") == null || getProperty("value") == null || getProperty("sourceDevice") == null) {
|
||||
return null
|
||||
}
|
||||
|
||||
return PropertyChangedMessage(
|
||||
getProperty("property") as String,
|
||||
Json.decodeFromString(MetaSerializer, getProperty("value") as String),
|
||||
Name.parse(getProperty("sourceDevice") as String),
|
||||
getProperty("targetDevice")?.let { Name.parse(it as String) },
|
||||
getProperty("comment")?.let { it as String },
|
||||
getProperty("time")?.let { Instant.fromEpochMilliseconds(it as Long) }
|
||||
)
|
||||
}
|
||||
|
||||
internal fun <T> MagixMessage<T>.toEntity(transaction: StoreTransaction): Entity {
|
||||
val entity = transaction.newEntity("MagixMessage")
|
||||
entity.setProperty("format", format)
|
||||
entity.setProperty("origin", origin)
|
||||
if (payload is PropertyChangedMessage) {
|
||||
val payloadEntity = (payload as PropertyChangedMessage).toEntity(transaction)
|
||||
entity.setLink("payload", payloadEntity)
|
||||
}
|
||||
target?.let { entity.setProperty("target", it) }
|
||||
id?.let { entity.setProperty("id", it) }
|
||||
parentId?.let { entity.setProperty("parentId", it) }
|
||||
user?.let { entity.setProperty("user", it.toString()) }
|
||||
return entity
|
||||
}
|
||||
|
||||
internal fun Entity.toMagixMessage(): MagixMessage<PropertyChangedMessage>? {
|
||||
if (getProperty("format") == null || getProperty("origin") == null) {
|
||||
return null
|
||||
}
|
||||
|
||||
return getLink("payload")?.toPropertyChangedMessage()?.let { propertyChangedMessage ->
|
||||
MagixMessage(
|
||||
getProperty("format") as String,
|
||||
getProperty("origin") as String,
|
||||
propertyChangedMessage,
|
||||
getProperty("target")?.let { it as String },
|
||||
getProperty("id")?.let { it as String },
|
||||
getProperty("parentId")?.let { it as String },
|
||||
getProperty("user")?.let { Json.decodeFromString(JsonElement.serializer(), it as String) }
|
||||
)
|
||||
}
|
||||
}
|
@ -0,0 +1,15 @@
|
||||
package ru.mipt.npm.controls.xodus.util
|
||||
|
||||
import jetbrains.exodus.entitystore.StoreTransaction
|
||||
import kotlinx.datetime.Instant
|
||||
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
||||
import ru.mipt.npm.controls.xodus.toPropertyChangedMessage
|
||||
|
||||
public fun StoreTransaction.selectPropertyChangedMessagesFromRange(
|
||||
range: ClosedRange<Instant>
|
||||
): List<PropertyChangedMessage> = find(
|
||||
"PropertyChangedMessage",
|
||||
"time",
|
||||
range.start.toEpochMilliseconds(),
|
||||
range.endInclusive.toEpochMilliseconds()
|
||||
).mapNotNull { it.toPropertyChangedMessage() }
|
@ -0,0 +1,60 @@
|
||||
package ru.mipt.npm.controls.xodus
|
||||
|
||||
import jetbrains.exodus.entitystore.PersistentEntityStores
|
||||
import kotlinx.datetime.Instant
|
||||
import kotlinx.serialization.json.JsonObject
|
||||
import kotlinx.serialization.json.JsonPrimitive
|
||||
import org.junit.jupiter.api.AfterAll
|
||||
import org.junit.jupiter.api.BeforeAll
|
||||
import org.junit.jupiter.api.Test
|
||||
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
||||
import ru.mipt.npm.magix.api.MagixMessage
|
||||
import space.kscience.dataforge.meta.Meta
|
||||
import space.kscience.dataforge.names.Name
|
||||
import java.io.File
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
internal class ConvertersTest {
|
||||
companion object {
|
||||
private val storeName = ".converters_test"
|
||||
private val entityStore = PersistentEntityStores.newInstance(storeName)
|
||||
private val expectedMessage = MagixMessage(
|
||||
"dataforge",
|
||||
"dataforge",
|
||||
PropertyChangedMessage(
|
||||
"acceleration",
|
||||
Meta {
|
||||
"x" put 3.0
|
||||
"y" put 9.0
|
||||
},
|
||||
Name.parse("virtual-car"),
|
||||
Name.parse("magix-virtual-car"),
|
||||
time = Instant.fromEpochMilliseconds(1337)
|
||||
),
|
||||
"magix-virtual-car",
|
||||
user = JsonObject(content = mapOf(Pair("name", JsonPrimitive("SCADA"))))
|
||||
)
|
||||
|
||||
@BeforeAll
|
||||
@JvmStatic
|
||||
fun createEntities() {
|
||||
entityStore.executeInTransaction {
|
||||
expectedMessage.toEntity(it)
|
||||
}
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
@JvmStatic
|
||||
fun deleteDatabase() {
|
||||
entityStore.close()
|
||||
File(storeName).deleteRecursively()
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testMagixMessageAndPropertyChangedMessageConverters() {
|
||||
assertEquals(expectedMessage, entityStore.computeInReadonlyTransaction {
|
||||
it.getAll("MagixMessage").first?.toMagixMessage()
|
||||
}!!)
|
||||
}
|
||||
}
|
@ -0,0 +1,62 @@
|
||||
package ru.mipt.npm.controls.xodus.util
|
||||
|
||||
import jetbrains.exodus.entitystore.PersistentEntityStores
|
||||
import kotlinx.datetime.Instant
|
||||
import org.junit.jupiter.api.AfterAll
|
||||
import org.junit.jupiter.api.Assertions.assertEquals
|
||||
import org.junit.jupiter.api.BeforeAll
|
||||
import org.junit.jupiter.api.Test
|
||||
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
||||
import ru.mipt.npm.controls.xodus.toEntity
|
||||
import space.kscience.dataforge.meta.Meta
|
||||
import java.io.File
|
||||
|
||||
internal class QueriesTest {
|
||||
companion object {
|
||||
private val storeName = ".queries_test"
|
||||
private val entityStore = PersistentEntityStores.newInstance(storeName)
|
||||
|
||||
private val propertyChangedMessages = listOf(
|
||||
PropertyChangedMessage(
|
||||
"",
|
||||
Meta.EMPTY,
|
||||
time = Instant.fromEpochMilliseconds(1000)
|
||||
),
|
||||
PropertyChangedMessage(
|
||||
"",
|
||||
Meta.EMPTY,
|
||||
time = Instant.fromEpochMilliseconds(1500)
|
||||
),
|
||||
PropertyChangedMessage(
|
||||
"",
|
||||
Meta.EMPTY,
|
||||
time = Instant.fromEpochMilliseconds(2000)
|
||||
)
|
||||
)
|
||||
|
||||
@BeforeAll
|
||||
@JvmStatic
|
||||
fun createEntities() {
|
||||
entityStore.executeInTransaction { transaction ->
|
||||
propertyChangedMessages.forEach {
|
||||
it.toEntity(transaction)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
@JvmStatic
|
||||
fun deleteDatabase() {
|
||||
entityStore.close()
|
||||
File(storeName).deleteRecursively()
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testFromTo() {
|
||||
assertEquals(propertyChangedMessages.subList(0, 2).toSet(), entityStore.computeInReadonlyTransaction {
|
||||
it.selectPropertyChangedMessagesFromRange( Instant.fromEpochMilliseconds(1000)..Instant.fromEpochMilliseconds(1500))
|
||||
}.toSet())
|
||||
}
|
||||
|
||||
}
|
@ -19,12 +19,18 @@ dependencies {
|
||||
implementation(projects.magix.magixServer)
|
||||
implementation(projects.magix.magixRsocket)
|
||||
implementation(projects.controlsMagixClient)
|
||||
implementation(projects.controlsXodus)
|
||||
implementation(projects.controlsMongo)
|
||||
|
||||
implementation("io.ktor:ktor-client-cio:$ktorVersion")
|
||||
implementation("org.jetbrains.kotlinx:kotlinx-datetime:0.3.1")
|
||||
implementation("no.tornado:tornadofx:1.7.20")
|
||||
implementation("space.kscience:plotlykt-server:0.5.0-dev-1")
|
||||
implementation("ch.qos.logback:logback-classic:1.2.3")
|
||||
implementation("org.jetbrains.xodus:xodus-entity-store:1.3.232")
|
||||
implementation("org.jetbrains.xodus:xodus-environment:1.3.232")
|
||||
implementation("org.jetbrains.xodus:xodus-vfs:1.3.232")
|
||||
implementation("org.litote.kmongo:kmongo-coroutine-serialization:4.4.0")
|
||||
}
|
||||
|
||||
tasks.withType<org.jetbrains.kotlin.gradle.tasks.KotlinCompile>().configureEach {
|
||||
|
@ -14,8 +14,7 @@ import space.kscience.dataforge.meta.string
|
||||
import space.kscience.dataforge.names.Name
|
||||
import kotlin.time.ExperimentalTime
|
||||
|
||||
class MagixVirtualCar(context: Context, meta: Meta)
|
||||
: VirtualCar(context, meta) {
|
||||
class MagixVirtualCar(context: Context, meta: Meta) : VirtualCar(context, meta) {
|
||||
|
||||
private suspend fun MagixEndpoint<DeviceMessage>.startMagixVirtualCarUpdate() {
|
||||
launch {
|
||||
|
@ -1,42 +1,70 @@
|
||||
package ru.mipt.npm.controls.demo.car
|
||||
|
||||
import io.ktor.server.engine.*
|
||||
import io.ktor.server.engine.ApplicationEngine
|
||||
import javafx.beans.property.DoubleProperty
|
||||
import javafx.scene.Parent
|
||||
import javafx.scene.control.TextField
|
||||
import javafx.scene.layout.Priority
|
||||
import javafx.stage.Stage
|
||||
import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.launch
|
||||
import ru.mipt.npm.controls.api.DeviceMessage
|
||||
import ru.mipt.npm.controls.client.connectToMagix
|
||||
import ru.mipt.npm.controls.controllers.DeviceManager
|
||||
import ru.mipt.npm.controls.controllers.install
|
||||
import ru.mipt.npm.controls.demo.car.IVirtualCar.Companion.acceleration
|
||||
import ru.mipt.npm.controls.mongo.MongoClientFactory
|
||||
import ru.mipt.npm.controls.mongo.connectMongo
|
||||
import ru.mipt.npm.controls.xodus.storeInXodus
|
||||
import ru.mipt.npm.controls.xodus.storeMessagesInXodus
|
||||
import ru.mipt.npm.magix.api.MagixEndpoint
|
||||
import ru.mipt.npm.magix.rsocket.rSocketWithTcp
|
||||
import ru.mipt.npm.magix.rsocket.rSocketWithWebSockets
|
||||
import ru.mipt.npm.magix.server.startMagixServer
|
||||
import space.kscience.dataforge.context.*
|
||||
import space.kscience.dataforge.meta.Meta
|
||||
import tornadofx.*
|
||||
import java.nio.file.Paths
|
||||
|
||||
internal object VirtualCarControllerConfig {
|
||||
val deviceEntityStorePath = Paths.get(".messages")
|
||||
val magixEntityStorePath = Paths.get(".server_messages")
|
||||
}
|
||||
|
||||
class VirtualCarController : Controller(), ContextAware {
|
||||
|
||||
var virtualCar: VirtualCar? = null
|
||||
var magixVirtualCar: MagixVirtualCar? = null
|
||||
var magixServer: ApplicationEngine? = null
|
||||
var xodusStorageJob: Job? = null
|
||||
var mongoStorageJob: Job? = null
|
||||
|
||||
override val context = Context("demoDevice") {
|
||||
plugin(DeviceManager)
|
||||
}
|
||||
|
||||
private val deviceManager = context.fetch(DeviceManager)
|
||||
private val deviceManager = context.fetch(DeviceManager, Meta {
|
||||
"xodusConfig" put {
|
||||
"entityStorePath" put VirtualCarControllerConfig.deviceEntityStorePath.toString()
|
||||
}
|
||||
})
|
||||
|
||||
fun init() {
|
||||
context.launch {
|
||||
virtualCar = deviceManager.install("virtual-car", VirtualCar)
|
||||
//starting magix event loop
|
||||
magixServer = startMagixServer(enableRawRSocket = true, enableZmq = true)
|
||||
|
||||
//starting magix event loop and connect it to entity store
|
||||
magixServer = startMagixServer(enableRawRSocket = true, enableZmq = true) { flow ->
|
||||
storeInXodus( flow, Meta {
|
||||
"xodusConfig" put {
|
||||
"entityStorePath" put VirtualCarControllerConfig.magixEntityStorePath.toString()
|
||||
}
|
||||
})
|
||||
}
|
||||
magixVirtualCar = deviceManager.install("magix-virtual-car", MagixVirtualCar)
|
||||
//connect to device entity store
|
||||
xodusStorageJob = deviceManager.storeMessagesInXodus()
|
||||
//Create mongo client and connect to MongoDB
|
||||
mongoStorageJob = deviceManager.connectMongo(MongoClientFactory)
|
||||
//Launch device client and connect it to the server
|
||||
val deviceEndpoint = MagixEndpoint.rSocketWithTcp("localhost", DeviceMessage.serializer())
|
||||
deviceManager.connectToMagix(deviceEndpoint)
|
||||
|
@ -19,6 +19,7 @@ kscience{
|
||||
}
|
||||
|
||||
val ktorVersion: String by rootProject.extra
|
||||
val dataforgeVersion: String by extra
|
||||
|
||||
dependencies {
|
||||
implementation(project(":controls-tcp"))
|
||||
|
@ -16,7 +16,7 @@ import ru.mipt.npm.controls.controllers.installing
|
||||
import ru.mipt.npm.devices.pimotionmaster.PiMotionMasterDevice.Axis.Companion.maxPosition
|
||||
import ru.mipt.npm.devices.pimotionmaster.PiMotionMasterDevice.Axis.Companion.minPosition
|
||||
import ru.mipt.npm.devices.pimotionmaster.PiMotionMasterDevice.Axis.Companion.position
|
||||
import space.kscience.dataforge.context.Global
|
||||
import space.kscience.dataforge.context.Context
|
||||
import space.kscience.dataforge.context.fetch
|
||||
import tornadofx.*
|
||||
|
||||
@ -24,7 +24,7 @@ class PiMotionMasterApp : App(PiMotionMasterView::class)
|
||||
|
||||
class PiMotionMasterController : Controller() {
|
||||
//initialize context
|
||||
val context = Global.buildContext("piMotionMaster"){
|
||||
val context = Context("piMotionMaster"){
|
||||
plugin(DeviceManager)
|
||||
}
|
||||
|
||||
|
@ -50,5 +50,9 @@ include(
|
||||
":magix:magix-zmq",
|
||||
":magix:magix-demo",
|
||||
":controls-magix-client",
|
||||
":motors"
|
||||
)
|
||||
":motors",
|
||||
":controls-xodus",
|
||||
":controls-mongo",
|
||||
":xodus-serialization"
|
||||
)
|
||||
include("xodus-serialization")
|
||||
|
22
xodus-serialization/build.gradle.kts
Normal file
22
xodus-serialization/build.gradle.kts
Normal file
@ -0,0 +1,22 @@
|
||||
plugins {
|
||||
id("ru.mipt.npm.gradle.jvm")
|
||||
`maven-publish`
|
||||
}
|
||||
|
||||
val xodusVersion = "1.3.232"
|
||||
|
||||
//TODO to be moved to DataForge
|
||||
|
||||
kscience {
|
||||
useSerialization {
|
||||
json()
|
||||
}
|
||||
}
|
||||
|
||||
dependencies {
|
||||
implementation(projects.magix.magixApi)
|
||||
implementation(projects.controlsCore)
|
||||
implementation("org.jetbrains.xodus:xodus-entity-store:$xodusVersion")
|
||||
implementation("org.jetbrains.xodus:xodus-environment:$xodusVersion")
|
||||
implementation("org.jetbrains.xodus:xodus-vfs:$xodusVersion")
|
||||
}
|
@ -0,0 +1,61 @@
|
||||
package ru.mipt.npm.xodus.serialization.json
|
||||
|
||||
import jetbrains.exodus.entitystore.Entity
|
||||
import jetbrains.exodus.entitystore.EntityId
|
||||
import jetbrains.exodus.entitystore.PersistentEntityStore
|
||||
import jetbrains.exodus.entitystore.StoreTransaction
|
||||
import kotlinx.serialization.DeserializationStrategy
|
||||
import kotlinx.serialization.json.*
|
||||
import kotlinx.serialization.serializer
|
||||
|
||||
internal fun StoreTransaction.decodeFromEntity(entity: Entity): JsonElement = buildJsonObject {
|
||||
entity.propertyNames.forEach { property ->
|
||||
entity.getProperty(property).let { value ->
|
||||
when (value) {
|
||||
is Number -> put(property, value)
|
||||
is Boolean -> put(property, value)
|
||||
is String -> put(property, value)
|
||||
else -> throw IllegalStateException("Unsupported type for primitive field")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
entity.linkNames.forEach { link ->
|
||||
entity.getLinks(link).let { entities ->
|
||||
when (entities.size()) {
|
||||
1L -> entities.first?.let { put(link, decodeFromEntity(it)) }
|
||||
else -> {
|
||||
putJsonArray(link) {
|
||||
entities.forEach {
|
||||
add(decodeFromEntity(it))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public fun <T> StoreTransaction.decodeFromEntity(entity: Entity, deserializer: DeserializationStrategy<T>): T {
|
||||
val jsonElement = decodeFromEntity(entity)
|
||||
return Json.decodeFromJsonElement(deserializer, jsonElement)
|
||||
}
|
||||
|
||||
public inline fun <reified T> StoreTransaction.decodeFromEntity(entity: Entity): T = decodeFromEntity(entity, serializer())
|
||||
|
||||
// First entity with entityType will be decoded
|
||||
public fun <T> PersistentEntityStore.decodeFromEntity(entityType: String, deserializer: DeserializationStrategy<T>): T? {
|
||||
return computeInTransaction { txn ->
|
||||
txn.getAll(entityType).first?.let { txn.decodeFromEntity(it, deserializer) }
|
||||
}
|
||||
}
|
||||
|
||||
public inline fun <reified T> PersistentEntityStore.decodeFromEntity(entityType: String): T? = decodeFromEntity(entityType, serializer())
|
||||
|
||||
public fun <T> PersistentEntityStore.decodeFromEntity(entityId: EntityId, deserializer: DeserializationStrategy<T>): T? {
|
||||
return computeInTransaction { txn ->
|
||||
txn.decodeFromEntity(txn.getEntity(entityId), deserializer)
|
||||
}
|
||||
}
|
||||
|
||||
public inline fun <reified T> PersistentEntityStore.decodeFromEntity(entityId: EntityId): T? = decodeFromEntity(entityId, serializer())
|
@ -0,0 +1,71 @@
|
||||
package ru.mipt.npm.xodus.serialization.json
|
||||
|
||||
import jetbrains.exodus.entitystore.Entity
|
||||
import jetbrains.exodus.entitystore.EntityId
|
||||
import jetbrains.exodus.entitystore.PersistentEntityStore
|
||||
import jetbrains.exodus.entitystore.StoreTransaction
|
||||
import kotlinx.serialization.SerializationStrategy
|
||||
import kotlinx.serialization.json.*
|
||||
import kotlinx.serialization.serializer
|
||||
|
||||
internal fun StoreTransaction.encodeToEntity(jsonElement: JsonElement, entity: Entity) {
|
||||
when (jsonElement) {
|
||||
is JsonPrimitive -> throw IllegalStateException("Can't serialize primitive value to entity")
|
||||
is JsonArray -> throw IllegalStateException("Can't serialize array value to entity")
|
||||
is JsonObject -> {
|
||||
jsonElement.forEach { entry ->
|
||||
entry.value.let { value ->
|
||||
when(value) {
|
||||
// не сможем десериализовать, если JsonNull (надо ли обрабатывать???) (можно сохранить в отдельный список ключи null-ов)
|
||||
is JsonPrimitive -> {
|
||||
if (value.isString) {
|
||||
entity.setProperty(entry.key, value.content)
|
||||
} else {
|
||||
(value.longOrNull ?: value.doubleOrNull ?: value.booleanOrNull)?.let {
|
||||
entity.setProperty(
|
||||
entry.key,
|
||||
it
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// считаем, что все элементы массива - JsonObject, иначе не можем напрямую сериализовать (надо придывать костыли???)
|
||||
// не сможем десериализовать, если массив пустой (надо ли обрабатывать???) (можно сохранять в отдельный список ключи пустых массивов)
|
||||
is JsonArray -> {
|
||||
value.forEach { element ->
|
||||
val childEntity = newEntity("${entity.type}.${entry.key}")
|
||||
encodeToEntity(element, childEntity)
|
||||
entity.addLink(entry.key, childEntity)
|
||||
}
|
||||
}
|
||||
|
||||
is JsonObject -> {
|
||||
val childEntity = newEntity("${entity.type}.${entry.key}")
|
||||
encodeToEntity(value, childEntity)
|
||||
entity.setLink(entry.key, childEntity)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public fun <T> StoreTransaction.encodeToEntity(serializer: SerializationStrategy<T>, value: T, entityType: String): Entity {
|
||||
val entity: Entity = newEntity(entityType)
|
||||
encodeToEntity(Json.encodeToJsonElement(serializer, value), entity)
|
||||
return entity
|
||||
}
|
||||
|
||||
public inline fun <reified T> StoreTransaction.encodeToEntity(value: T, entityType: String): Entity =
|
||||
encodeToEntity(serializer(), value, entityType)
|
||||
|
||||
public fun <T> PersistentEntityStore.encodeToEntity(serializer: SerializationStrategy<T>, value: T, entityType: String): EntityId {
|
||||
return computeInTransaction { txn ->
|
||||
txn.encodeToEntity(serializer, value, entityType).id
|
||||
}
|
||||
}
|
||||
|
||||
public inline fun <reified T> PersistentEntityStore.encodeToEntity(value: T, entityType: String): EntityId =
|
||||
encodeToEntity(serializer(), value, entityType)
|
@ -0,0 +1,38 @@
|
||||
package ru.mipt.npm.xodus.serialization.json
|
||||
|
||||
import jetbrains.exodus.entitystore.PersistentEntityStores
|
||||
import kotlinx.datetime.Instant
|
||||
import kotlinx.serialization.json.JsonObject
|
||||
import kotlinx.serialization.json.JsonPrimitive
|
||||
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
||||
import ru.mipt.npm.magix.api.MagixMessage
|
||||
import space.kscience.dataforge.meta.Meta
|
||||
import space.kscience.dataforge.names.Name
|
||||
import java.nio.file.Paths
|
||||
|
||||
internal fun main() {
|
||||
val expectedMessage = MagixMessage(
|
||||
"dataforge",
|
||||
"dataforge",
|
||||
PropertyChangedMessage(
|
||||
"acceleration",
|
||||
Meta {
|
||||
"x" put 3.0
|
||||
"y" put 9.0
|
||||
},
|
||||
Name.parse("virtual-car"),
|
||||
Name.parse("magix-virtual-car"),
|
||||
time = Instant.fromEpochMilliseconds(1337)
|
||||
),
|
||||
"magix-virtual-car"
|
||||
)
|
||||
|
||||
val entityStore = PersistentEntityStores.newInstance(Paths.get(".xodus_serialization").toString())
|
||||
entityStore.executeInTransaction { txn ->
|
||||
txn.encodeToEntity(expectedMessage, "MagixMessage")
|
||||
}
|
||||
|
||||
entityStore.executeInTransaction { txn ->
|
||||
txn.getAll("MagixMessage").first?.let { println(txn.decodeFromEntity<MagixMessage<PropertyChangedMessage>>(it) == expectedMessage) }
|
||||
}
|
||||
}
|
@ -0,0 +1,105 @@
|
||||
package ru.mipt.npm.xodus.serialization.json
|
||||
|
||||
import jetbrains.exodus.entitystore.PersistentEntityStores
|
||||
import kotlinx.datetime.Instant
|
||||
import kotlinx.serialization.json.*
|
||||
import org.junit.jupiter.api.AfterAll
|
||||
import org.junit.jupiter.api.AfterEach
|
||||
import org.junit.jupiter.api.Test
|
||||
import org.junit.jupiter.api.assertThrows
|
||||
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
||||
import ru.mipt.npm.magix.api.MagixMessage
|
||||
import space.kscience.dataforge.meta.Meta
|
||||
import space.kscience.dataforge.names.Name
|
||||
import java.nio.file.Paths
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
internal class EncoderDecoderTests {
|
||||
companion object {
|
||||
private val storePath = Paths.get(".xodus_serialization_test")
|
||||
private val entityStore = PersistentEntityStores.newInstance(storePath.toString())
|
||||
|
||||
@AfterAll
|
||||
@JvmStatic
|
||||
fun deleteDatabase() {
|
||||
entityStore.close()
|
||||
storePath.toFile().deleteRecursively()
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
fun clearDatabase() {
|
||||
entityStore.clear()
|
||||
}
|
||||
|
||||
fun checkEncodingDecodingCorrectness(json: JsonObject) {
|
||||
val id = entityStore.encodeToEntity(json, "JsonObject")
|
||||
assertEquals(json, entityStore.decodeFromEntity(id))
|
||||
}
|
||||
|
||||
fun checkEncodingDecodingCorrectness(jsons: List<JsonObject>) = jsons.forEach {
|
||||
checkEncodingDecodingCorrectness(it)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `encoder throw Illegal exception if input is not a JsonObject`() {
|
||||
assertThrows<IllegalStateException> {
|
||||
val json = JsonPrimitive(0)
|
||||
entityStore.encodeToEntity(json, "JsonPrimitive")
|
||||
}
|
||||
|
||||
assertThrows<IllegalStateException> {
|
||||
val json = buildJsonArray {}
|
||||
entityStore.encodeToEntity(json, "JsonArray")
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `correctly work with underlying JsonPrimitive`() {
|
||||
val jsonLong = buildJsonObject { put("value", 0) }
|
||||
val jsonDouble = buildJsonObject { put("value", 0.0) }
|
||||
val jsonBoolean = buildJsonObject { put("value", true) }
|
||||
val jsonString = buildJsonObject { put("value", "") }
|
||||
|
||||
checkEncodingDecodingCorrectness(listOf(jsonLong, jsonDouble, jsonBoolean, jsonString))
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `correctly work with underlying JsonArray`() {
|
||||
checkEncodingDecodingCorrectness(buildJsonObject { putJsonArray("value") {
|
||||
add(buildJsonObject { put("value", 0) })
|
||||
add(buildJsonObject { put("value", 0.0) })
|
||||
} })
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `correctly work with underlying JsonObject`() {
|
||||
checkEncodingDecodingCorrectness(buildJsonObject {
|
||||
putJsonObject("value", { put("value", true) })
|
||||
})
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testMagixMessagePropertyChangedMessage() {
|
||||
val expectedMessage = MagixMessage(
|
||||
"dataforge",
|
||||
"dataforge",
|
||||
PropertyChangedMessage(
|
||||
"acceleration",
|
||||
Meta {
|
||||
"x" put 3.0
|
||||
"y" put 9.0
|
||||
},
|
||||
Name.parse("virtual-car"),
|
||||
Name.parse("magix-virtual-car"),
|
||||
time = Instant.fromEpochMilliseconds(1337)
|
||||
),
|
||||
"magix-virtual-car",
|
||||
user = buildJsonObject { put("name", "SCADA") }
|
||||
)
|
||||
|
||||
val id = entityStore.encodeToEntity(expectedMessage, "MagixMessage")
|
||||
assertEquals(expectedMessage, entityStore.decodeFromEntity<MagixMessage<PropertyChangedMessage>>(id))
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user