Refactor synchronous storage api
This commit is contained in:
parent
3ebbec35fe
commit
89bb132e36
27
controls-storage/build.gradle.kts
Normal file
27
controls-storage/build.gradle.kts
Normal file
@ -0,0 +1,27 @@
|
|||||||
|
plugins {
|
||||||
|
id("ru.mipt.npm.gradle.mpp")
|
||||||
|
`maven-publish`
|
||||||
|
}
|
||||||
|
|
||||||
|
val dataforgeVersion: String by rootProject.extra
|
||||||
|
val kotlinx_io_version = "0.1.1"
|
||||||
|
|
||||||
|
kotlin {
|
||||||
|
sourceSets {
|
||||||
|
commonMain {
|
||||||
|
dependencies {
|
||||||
|
implementation(projects.controlsCore)
|
||||||
|
implementation("org.jetbrains.kotlinx:kotlinx-io:$kotlinx_io_version")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
jvmMain {
|
||||||
|
dependencies {
|
||||||
|
implementation(projects.magix.magixApi)
|
||||||
|
implementation(projects.controlsMagixClient)
|
||||||
|
implementation(projects.magix.magixServer)
|
||||||
|
implementation("org.jetbrains.kotlinx:kotlinx-io-jvm:$kotlinx_io_version")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,14 @@
|
|||||||
|
package ru.mipt.npm.controls.storage.synchronous
|
||||||
|
|
||||||
|
import kotlinx.io.core.Closeable
|
||||||
|
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
||||||
|
import kotlin.reflect.KClass
|
||||||
|
|
||||||
|
public interface SynchronousStorageClient : Closeable {
|
||||||
|
public fun <T : Any> storeValue(value: T, storageKind: StorageKind, clazz: KClass<T>)
|
||||||
|
|
||||||
|
public fun getPropertyHistory(sourceDeviceName: String, propertyName: String): List<PropertyChangedMessage>
|
||||||
|
}
|
||||||
|
|
||||||
|
public inline fun <reified T : Any> SynchronousStorageClient.storeValue(value: T, storageKind: StorageKind): Unit =
|
||||||
|
storeValue(value, storageKind, T::class)
|
@ -0,0 +1,61 @@
|
|||||||
|
package ru.mipt.npm.controls.storage.synchronous
|
||||||
|
|
||||||
|
import kotlinx.coroutines.InternalCoroutinesApi
|
||||||
|
import kotlinx.coroutines.Job
|
||||||
|
import kotlinx.coroutines.flow.*
|
||||||
|
import kotlinx.io.core.use
|
||||||
|
import ru.mipt.npm.controls.api.DeviceMessage
|
||||||
|
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
||||||
|
import ru.mipt.npm.controls.controllers.DeviceManager
|
||||||
|
import ru.mipt.npm.controls.controllers.hubMessageFlow
|
||||||
|
import space.kscience.dataforge.context.Factory
|
||||||
|
import space.kscience.dataforge.context.debug
|
||||||
|
import space.kscience.dataforge.context.logger
|
||||||
|
import space.kscience.dataforge.meta.Meta
|
||||||
|
|
||||||
|
public enum class StorageKind {
|
||||||
|
DEVICE_HUB,
|
||||||
|
MAGIX_SERVER
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Begin to store DeviceMessages from this DeviceManager
|
||||||
|
* @param factory factory that will be used for creating persistent entity store instance. DefaultPersistentStoreFactory by default.
|
||||||
|
* DeviceManager's meta and context will be used for in invoke method.
|
||||||
|
* @param filterCondition allow you to specify messages which we want to store. Always true by default.
|
||||||
|
* @return Job which responsible for our storage
|
||||||
|
*/
|
||||||
|
@OptIn(InternalCoroutinesApi::class)
|
||||||
|
public fun DeviceManager.storeMessages(
|
||||||
|
factory: Factory<SynchronousStorageClient>,
|
||||||
|
filterCondition: suspend (DeviceMessage) -> Boolean = { true },
|
||||||
|
): Job {
|
||||||
|
val client = factory(meta, context)
|
||||||
|
logger.debug { "Storage client created" }
|
||||||
|
|
||||||
|
return hubMessageFlow(context).filter(filterCondition).onEach { message ->
|
||||||
|
client.storeValue(message, StorageKind.DEVICE_HUB)
|
||||||
|
}.launchIn(context).apply {
|
||||||
|
invokeOnCompletion(onCancelling = true) {
|
||||||
|
client.close()
|
||||||
|
logger.debug { "Storage client closed" }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the list of deviceMessages that describes changes of specified property of specified device sorted by time
|
||||||
|
* @param sourceDeviceName a name of device, history of which property we want to get
|
||||||
|
* @param propertyName a name of property, history of which we want to get
|
||||||
|
* @param factory a factory that produce mongo clients
|
||||||
|
*/
|
||||||
|
public fun getPropertyHistory(
|
||||||
|
sourceDeviceName: String,
|
||||||
|
propertyName: String,
|
||||||
|
factory: Factory<SynchronousStorageClient>,
|
||||||
|
meta: Meta = Meta.EMPTY
|
||||||
|
): List<PropertyChangedMessage> {
|
||||||
|
return factory(meta).use {
|
||||||
|
it.getPropertyHistory(sourceDeviceName, propertyName)
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,42 @@
|
|||||||
|
package ru.mipt.npm.controls.storage.synchronous
|
||||||
|
|
||||||
|
import io.ktor.application.*
|
||||||
|
import kotlinx.coroutines.InternalCoroutinesApi
|
||||||
|
import kotlinx.coroutines.flow.Flow
|
||||||
|
import kotlinx.coroutines.flow.MutableSharedFlow
|
||||||
|
import kotlinx.coroutines.flow.filter
|
||||||
|
import kotlinx.coroutines.flow.onEach
|
||||||
|
import kotlinx.coroutines.job
|
||||||
|
import ru.mipt.npm.magix.server.GenericMagixMessage
|
||||||
|
import space.kscience.dataforge.context.Factory
|
||||||
|
import space.kscience.dataforge.meta.Meta
|
||||||
|
|
||||||
|
internal fun Flow<GenericMagixMessage>.store(
|
||||||
|
client: SynchronousStorageClient,
|
||||||
|
flowFilter: suspend (GenericMagixMessage) -> Boolean = { true },
|
||||||
|
) {
|
||||||
|
filter(flowFilter).onEach { message ->
|
||||||
|
client.storeValue(message, StorageKind.MAGIX_SERVER)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Begin to store MagixMessages from certain flow
|
||||||
|
* @param flow flow of messages which we will store
|
||||||
|
* @param meta Meta which may have some configuration parameters for our storage and will be used in invoke method of factory
|
||||||
|
* @param factory factory that will be used for creating persistent entity store instance. DefaultPersistentStoreFactory by default.
|
||||||
|
* @param flowFilter allow you to specify messages which we want to store. Always true by default.
|
||||||
|
*/
|
||||||
|
@OptIn(InternalCoroutinesApi::class)
|
||||||
|
public fun Application.store(
|
||||||
|
flow: MutableSharedFlow<GenericMagixMessage>,
|
||||||
|
meta: Meta = Meta.EMPTY,
|
||||||
|
factory: Factory<SynchronousStorageClient>,
|
||||||
|
flowFilter: suspend (GenericMagixMessage) -> Boolean = { true },
|
||||||
|
) {
|
||||||
|
val client = factory(meta)
|
||||||
|
|
||||||
|
flow.store(client, flowFilter)
|
||||||
|
coroutineContext.job.invokeOnCompletion(onCancelling = true) {
|
||||||
|
client.close()
|
||||||
|
}
|
||||||
|
}
|
@ -1,26 +0,0 @@
|
|||||||
# Description
|
|
||||||
This module allows you to store [DeviceMessages](/controls-core/src/commonMain/kotlin/ru/mipt/npm/controls/api/DeviceMessage.kt)
|
|
||||||
from certain [DeviceManager](/controls-core/src/commonMain/kotlin/ru/mipt/npm/controls/controllers/DeviceManager.kt)
|
|
||||||
or [MagixMessages](magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixMessage.kt)
|
|
||||||
from [magix server](/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/server.kt)
|
|
||||||
in [xodus database](https://github.com/JetBrains/xodus).
|
|
||||||
|
|
||||||
# Usage
|
|
||||||
|
|
||||||
All usage examples can be found in [VirtualCarController](/demo/car/src/main/kotlin/ru/mipt/npm/controls/demo/car/VirtualCarController.kt).
|
|
||||||
|
|
||||||
## Storage from Device Manager
|
|
||||||
|
|
||||||
Just call connectMongo. For more details, you can see comments in [source code](/controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/connections.kt)
|
|
||||||
|
|
||||||
## Storage from Magix Server
|
|
||||||
|
|
||||||
Just pass such lambda as parameter to startMagixServer:
|
|
||||||
```kotlin
|
|
||||||
{ flow ->
|
|
||||||
// some code
|
|
||||||
storeInXodus(flow)
|
|
||||||
// some code
|
|
||||||
}
|
|
||||||
```
|
|
||||||
For more details, you can see comments in [source code](/controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/connections.kt)
|
|
@ -6,11 +6,9 @@ plugins {
|
|||||||
val xodusVersion = "1.3.232"
|
val xodusVersion = "1.3.232"
|
||||||
|
|
||||||
dependencies {
|
dependencies {
|
||||||
implementation(projects.controlsCore)
|
|
||||||
implementation(projects.magix.magixApi)
|
|
||||||
implementation(projects.controlsMagixClient)
|
|
||||||
implementation(projects.magix.magixServer)
|
|
||||||
implementation(projects.xodusSerialization)
|
implementation(projects.xodusSerialization)
|
||||||
|
implementation(projects.controlsStorage)
|
||||||
|
implementation(projects.controlsCore)
|
||||||
implementation("org.jetbrains.xodus:xodus-entity-store:$xodusVersion")
|
implementation("org.jetbrains.xodus:xodus-entity-store:$xodusVersion")
|
||||||
implementation("org.jetbrains.xodus:xodus-environment:$xodusVersion")
|
implementation("org.jetbrains.xodus:xodus-environment:$xodusVersion")
|
||||||
implementation("org.jetbrains.xodus:xodus-vfs:$xodusVersion")
|
implementation("org.jetbrains.xodus:xodus-vfs:$xodusVersion")
|
||||||
|
@ -0,0 +1,66 @@
|
|||||||
|
package ru.mipt.npm.controls.xodus
|
||||||
|
|
||||||
|
import jetbrains.exodus.entitystore.PersistentEntityStore
|
||||||
|
import jetbrains.exodus.entitystore.PersistentEntityStores
|
||||||
|
import kotlinx.datetime.Instant
|
||||||
|
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
||||||
|
import ru.mipt.npm.controls.storage.synchronous.StorageKind
|
||||||
|
import ru.mipt.npm.controls.storage.synchronous.SynchronousStorageClient
|
||||||
|
import ru.mipt.npm.xodus.serialization.json.decodeFromEntity
|
||||||
|
import ru.mipt.npm.xodus.serialization.json.encodeToEntity
|
||||||
|
import space.kscience.dataforge.context.Context
|
||||||
|
import space.kscience.dataforge.context.Factory
|
||||||
|
import space.kscience.dataforge.meta.Meta
|
||||||
|
import space.kscience.dataforge.meta.get
|
||||||
|
import space.kscience.dataforge.meta.string
|
||||||
|
import space.kscience.dataforge.names.Name
|
||||||
|
import kotlin.reflect.KClass
|
||||||
|
|
||||||
|
private const val DEFAULT_XODUS_STORE_PATH = ".storage"
|
||||||
|
public val XODUS_STORE_PROPERTY: Name = Name.of("xodus", "entityStorePath")
|
||||||
|
|
||||||
|
private const val DEVICE_HUB_ENTITY_TYPE = "DeviceMessage"
|
||||||
|
private const val MAGIX_SERVER_ENTITY_TYPE = "MagixMessage"
|
||||||
|
|
||||||
|
internal class SynchronousXodusClient(private val entityStore: PersistentEntityStore) : SynchronousStorageClient {
|
||||||
|
override fun <T : Any> storeValue(value: T, storageKind: StorageKind, clazz: KClass<T>) {
|
||||||
|
val entityType = when (storageKind) {
|
||||||
|
StorageKind.DEVICE_HUB -> DEVICE_HUB_ENTITY_TYPE
|
||||||
|
StorageKind.MAGIX_SERVER -> MAGIX_SERVER_ENTITY_TYPE
|
||||||
|
}
|
||||||
|
|
||||||
|
entityStore.encodeToEntity(value, entityType, clazz)
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun getPropertyHistory(
|
||||||
|
sourceDeviceName: String,
|
||||||
|
propertyName: String
|
||||||
|
): List<PropertyChangedMessage> {
|
||||||
|
return entityStore.computeInTransaction { txn ->
|
||||||
|
txn.find(DEVICE_HUB_ENTITY_TYPE, "type", "property.changed").asSequence()
|
||||||
|
.filter { it?.getProperty("sourceDevice")?.let { it == sourceDeviceName } ?: false &&
|
||||||
|
it?.getProperty("property")?.let { it == propertyName } ?: false
|
||||||
|
}.sortedByDescending { it?.getProperty("time")?.let { timeStr -> Instant.parse(timeStr as String) } }
|
||||||
|
.toList().map { txn.decodeFromEntity(it) }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun close() {
|
||||||
|
entityStore.close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun Context.getPersistentEntityStore(meta: Meta = Meta.EMPTY): PersistentEntityStore {
|
||||||
|
val storePath = meta[XODUS_STORE_PROPERTY]?.string
|
||||||
|
?: properties[XODUS_STORE_PROPERTY]?.string
|
||||||
|
?: DEFAULT_XODUS_STORE_PATH
|
||||||
|
|
||||||
|
return PersistentEntityStores.newInstance(storePath)
|
||||||
|
}
|
||||||
|
|
||||||
|
public object DefaultSynchronousXodusClientFactory : Factory<SynchronousStorageClient> {
|
||||||
|
override fun invoke(meta: Meta, context: Context): SynchronousStorageClient {
|
||||||
|
val entityStore = context.getPersistentEntityStore(meta)
|
||||||
|
return SynchronousXodusClient(entityStore)
|
||||||
|
}
|
||||||
|
}
|
@ -1,112 +0,0 @@
|
|||||||
package ru.mipt.npm.controls.xodus
|
|
||||||
|
|
||||||
import io.ktor.application.Application
|
|
||||||
import jetbrains.exodus.entitystore.PersistentEntityStore
|
|
||||||
import jetbrains.exodus.entitystore.PersistentEntityStores
|
|
||||||
import kotlinx.coroutines.InternalCoroutinesApi
|
|
||||||
import kotlinx.coroutines.Job
|
|
||||||
import kotlinx.coroutines.flow.*
|
|
||||||
import kotlinx.coroutines.job
|
|
||||||
import ru.mipt.npm.controls.api.DeviceMessage
|
|
||||||
import ru.mipt.npm.controls.controllers.DeviceManager
|
|
||||||
import ru.mipt.npm.controls.controllers.hubMessageFlow
|
|
||||||
import ru.mipt.npm.magix.server.GenericMagixMessage
|
|
||||||
import ru.mipt.npm.xodus.serialization.json.encodeToEntity
|
|
||||||
import space.kscience.dataforge.context.Context
|
|
||||||
import space.kscience.dataforge.context.Factory
|
|
||||||
import space.kscience.dataforge.context.debug
|
|
||||||
import space.kscience.dataforge.context.logger
|
|
||||||
import space.kscience.dataforge.meta.Meta
|
|
||||||
import space.kscience.dataforge.meta.get
|
|
||||||
import space.kscience.dataforge.meta.string
|
|
||||||
import space.kscience.dataforge.names.Name
|
|
||||||
|
|
||||||
private const val DEFAULT_XODUS_STORE_PATH = ".storage"
|
|
||||||
public val XODUS_STORE_PROPERTY: Name = Name.of("xodus", "entityStorePath")
|
|
||||||
|
|
||||||
private fun Context.getPersistentEntityStore(meta: Meta = Meta.EMPTY): PersistentEntityStore {
|
|
||||||
val storePath = meta[XODUS_STORE_PROPERTY]?.string
|
|
||||||
?: properties[XODUS_STORE_PROPERTY]?.string
|
|
||||||
?: DEFAULT_XODUS_STORE_PATH
|
|
||||||
|
|
||||||
return PersistentEntityStores.newInstance(storePath)
|
|
||||||
}
|
|
||||||
|
|
||||||
internal val defaultPersistentStoreFactory = object : Factory<PersistentEntityStore> {
|
|
||||||
override fun invoke(meta: Meta, context: Context): PersistentEntityStore = context.getPersistentEntityStore(meta)
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Begin to store DeviceMessages from this DeviceManager
|
|
||||||
* @param factory factory that will be used for creating persistent entity store instance. DefaultPersistentStoreFactory by default.
|
|
||||||
* DeviceManager's meta and context will be used for in invoke method.
|
|
||||||
* @param filterCondition allow you to specify messages which we want to store. Always true by default.
|
|
||||||
* @return Job which responsible for our storage
|
|
||||||
*/
|
|
||||||
@OptIn(InternalCoroutinesApi::class)
|
|
||||||
public fun DeviceManager.storeMessagesInXodus(
|
|
||||||
factory: Factory<PersistentEntityStore> = defaultPersistentStoreFactory,
|
|
||||||
filterCondition: suspend (DeviceMessage) -> Boolean = { true },
|
|
||||||
): Job {
|
|
||||||
val entityStore = factory(meta, context)
|
|
||||||
logger.debug { "Device entity store opened" }
|
|
||||||
|
|
||||||
return hubMessageFlow(context).filter(filterCondition).onEach { message ->
|
|
||||||
entityStore.encodeToEntity(message, "DeviceMessage")
|
|
||||||
}.launchIn(context).apply {
|
|
||||||
invokeOnCompletion(onCancelling = true) {
|
|
||||||
entityStore.close()
|
|
||||||
logger.debug { "Device entity store closed" }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
//public fun CoroutineScope.startMagixServer(
|
|
||||||
// entityStore: PersistentEntityStore,
|
|
||||||
// flowFilter: suspend (GenericMagixMessage) -> Boolean = { true },
|
|
||||||
// port: Int = MagixEndpoint.DEFAULT_MAGIX_HTTP_PORT,
|
|
||||||
// buffer: Int = 100,
|
|
||||||
// enableRawRSocket: Boolean = true,
|
|
||||||
// enableZmq: Boolean = true,
|
|
||||||
// applicationConfiguration: Application.(MutableSharedFlow<GenericMagixMessage>) -> Unit = {},
|
|
||||||
//): ApplicationEngine = startMagixServer(
|
|
||||||
// port, buffer, enableRawRSocket, enableZmq
|
|
||||||
//) { flow ->
|
|
||||||
// applicationConfiguration(flow)
|
|
||||||
// flow.filter(flowFilter).onEach { message ->
|
|
||||||
// entityStore.executeInTransaction { txn ->
|
|
||||||
// val entity = txn.newEntity("MagixMessage")
|
|
||||||
// entity.setProperty("value", message.toString())
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//}
|
|
||||||
|
|
||||||
internal fun Flow<GenericMagixMessage>.storeInXodus(
|
|
||||||
entityStore: PersistentEntityStore,
|
|
||||||
flowFilter: suspend (GenericMagixMessage) -> Boolean = { true },
|
|
||||||
) {
|
|
||||||
filter(flowFilter).onEach { message ->
|
|
||||||
entityStore.encodeToEntity(message, "MagixMessage")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Begin to store MagixMessages from certain flow
|
|
||||||
* @param flow flow of messages which we will store
|
|
||||||
* @param meta Meta which may have some configuration parameters for our storage and will be used in invoke method of factory
|
|
||||||
* @param factory factory that will be used for creating persistent entity store instance. DefaultPersistentStoreFactory by default.
|
|
||||||
* @param flowFilter allow you to specify messages which we want to store. Always true by default.
|
|
||||||
*/
|
|
||||||
@OptIn(InternalCoroutinesApi::class)
|
|
||||||
public fun Application.storeInXodus(
|
|
||||||
flow: MutableSharedFlow<GenericMagixMessage>,
|
|
||||||
meta: Meta = Meta.EMPTY,
|
|
||||||
factory: Factory<PersistentEntityStore> = defaultPersistentStoreFactory,
|
|
||||||
flowFilter: suspend (GenericMagixMessage) -> Boolean = { true },
|
|
||||||
) {
|
|
||||||
val entityStore = factory(meta)
|
|
||||||
|
|
||||||
flow.storeInXodus(entityStore, flowFilter)
|
|
||||||
coroutineContext.job.invokeOnCompletion(onCancelling = true) {
|
|
||||||
entityStore.close()
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,73 +0,0 @@
|
|||||||
package ru.mipt.npm.controls.xodus
|
|
||||||
|
|
||||||
import jetbrains.exodus.entitystore.Entity
|
|
||||||
import jetbrains.exodus.entitystore.StoreTransaction
|
|
||||||
import kotlinx.datetime.Instant
|
|
||||||
import kotlinx.serialization.json.Json
|
|
||||||
import kotlinx.serialization.json.JsonElement
|
|
||||||
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
|
||||||
import ru.mipt.npm.magix.api.MagixMessage
|
|
||||||
import space.kscience.dataforge.meta.MetaSerializer
|
|
||||||
import space.kscience.dataforge.meta.isLeaf
|
|
||||||
import space.kscience.dataforge.names.Name
|
|
||||||
import space.kscience.dataforge.values.Value
|
|
||||||
import space.kscience.dataforge.values.ValueType
|
|
||||||
|
|
||||||
internal fun PropertyChangedMessage.toEntity(transaction: StoreTransaction): Entity {
|
|
||||||
val entity = transaction.newEntity("PropertyChangedMessage")
|
|
||||||
entity.setProperty("property", property)
|
|
||||||
entity.setProperty("value", value.toString())
|
|
||||||
entity.setProperty("sourceDevice", sourceDevice.toString())
|
|
||||||
targetDevice?.let { entity.setProperty("targetDevice", it.toString()) }
|
|
||||||
comment?.let { entity.setProperty("comment", it) }
|
|
||||||
time?.let { entity.setProperty("time", it.toEpochMilliseconds()) }
|
|
||||||
return entity
|
|
||||||
}
|
|
||||||
|
|
||||||
internal fun Entity.toPropertyChangedMessage(): PropertyChangedMessage? {
|
|
||||||
if (getProperty("property") == null || getProperty("value") == null || getProperty("sourceDevice") == null) {
|
|
||||||
return null
|
|
||||||
}
|
|
||||||
|
|
||||||
return PropertyChangedMessage(
|
|
||||||
getProperty("property") as String,
|
|
||||||
Json.decodeFromString(MetaSerializer, getProperty("value") as String),
|
|
||||||
Name.parse(getProperty("sourceDevice") as String),
|
|
||||||
getProperty("targetDevice")?.let { Name.parse(it as String) },
|
|
||||||
getProperty("comment")?.let { it as String },
|
|
||||||
getProperty("time")?.let { Instant.fromEpochMilliseconds(it as Long) }
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
internal fun <T> MagixMessage<T>.toEntity(transaction: StoreTransaction): Entity {
|
|
||||||
val entity = transaction.newEntity("MagixMessage")
|
|
||||||
entity.setProperty("format", format)
|
|
||||||
entity.setProperty("origin", origin)
|
|
||||||
if (payload is PropertyChangedMessage) {
|
|
||||||
val payloadEntity = (payload as PropertyChangedMessage).toEntity(transaction)
|
|
||||||
entity.setLink("payload", payloadEntity)
|
|
||||||
}
|
|
||||||
target?.let { entity.setProperty("target", it) }
|
|
||||||
id?.let { entity.setProperty("id", it) }
|
|
||||||
parentId?.let { entity.setProperty("parentId", it) }
|
|
||||||
user?.let { entity.setProperty("user", it.toString()) }
|
|
||||||
return entity
|
|
||||||
}
|
|
||||||
|
|
||||||
internal fun Entity.toMagixMessage(): MagixMessage<PropertyChangedMessage>? {
|
|
||||||
if (getProperty("format") == null || getProperty("origin") == null) {
|
|
||||||
return null
|
|
||||||
}
|
|
||||||
|
|
||||||
return getLink("payload")?.toPropertyChangedMessage()?.let { propertyChangedMessage ->
|
|
||||||
MagixMessage(
|
|
||||||
getProperty("format") as String,
|
|
||||||
getProperty("origin") as String,
|
|
||||||
propertyChangedMessage,
|
|
||||||
getProperty("target")?.let { it as String },
|
|
||||||
getProperty("id")?.let { it as String },
|
|
||||||
getProperty("parentId")?.let { it as String },
|
|
||||||
getProperty("user")?.let { Json.decodeFromString(JsonElement.serializer(), it as String) }
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,43 +0,0 @@
|
|||||||
package ru.mipt.npm.controls.xodus.util
|
|
||||||
|
|
||||||
import jetbrains.exodus.entitystore.PersistentEntityStore
|
|
||||||
import jetbrains.exodus.entitystore.StoreTransaction
|
|
||||||
import kotlinx.datetime.Instant
|
|
||||||
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
|
||||||
import ru.mipt.npm.controls.xodus.defaultPersistentStoreFactory
|
|
||||||
import ru.mipt.npm.controls.xodus.toPropertyChangedMessage
|
|
||||||
import ru.mipt.npm.xodus.serialization.json.decodeFromEntity
|
|
||||||
import space.kscience.dataforge.context.Factory
|
|
||||||
import space.kscience.dataforge.meta.Meta
|
|
||||||
|
|
||||||
public fun StoreTransaction.selectPropertyChangedMessagesFromRange(
|
|
||||||
range: ClosedRange<Instant>
|
|
||||||
): List<PropertyChangedMessage> = find(
|
|
||||||
"PropertyChangedMessage",
|
|
||||||
"time",
|
|
||||||
range.start.toEpochMilliseconds(),
|
|
||||||
range.endInclusive.toEpochMilliseconds()
|
|
||||||
).mapNotNull { it.toPropertyChangedMessage() }
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @return the list of deviceMessages that describes changes of specified property of specified device sorted by time
|
|
||||||
* @param sourceDeviceName a name of device, history of which property we want to get
|
|
||||||
* @param propertyName a name of property, history of which we want to get
|
|
||||||
* @param factory a factory that produce mongo clients
|
|
||||||
*/
|
|
||||||
public fun getPropertyHistory(
|
|
||||||
sourceDeviceName: String,
|
|
||||||
propertyName: String,
|
|
||||||
factory: Factory<PersistentEntityStore> = defaultPersistentStoreFactory,
|
|
||||||
meta: Meta = Meta.EMPTY
|
|
||||||
): List<PropertyChangedMessage> {
|
|
||||||
return factory(meta).use { store ->
|
|
||||||
store.computeInTransaction { txn ->
|
|
||||||
txn.find("DeviceMessage", "type", "property.changed").asSequence()
|
|
||||||
.filter { it?.getProperty("sourceDevice")?.let { it == sourceDeviceName } ?: false &&
|
|
||||||
it?.getProperty("property")?.let { it == propertyName } ?: false
|
|
||||||
}.sortedByDescending { it?.getProperty("time")?.let { timeStr -> Instant.parse(timeStr as String) } }
|
|
||||||
.toList().map { txn.decodeFromEntity(it) }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,60 +0,0 @@
|
|||||||
package ru.mipt.npm.controls.xodus
|
|
||||||
|
|
||||||
import jetbrains.exodus.entitystore.PersistentEntityStores
|
|
||||||
import kotlinx.datetime.Instant
|
|
||||||
import kotlinx.serialization.json.JsonObject
|
|
||||||
import kotlinx.serialization.json.JsonPrimitive
|
|
||||||
import org.junit.jupiter.api.AfterAll
|
|
||||||
import org.junit.jupiter.api.BeforeAll
|
|
||||||
import org.junit.jupiter.api.Test
|
|
||||||
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
|
||||||
import ru.mipt.npm.magix.api.MagixMessage
|
|
||||||
import space.kscience.dataforge.meta.Meta
|
|
||||||
import space.kscience.dataforge.names.Name
|
|
||||||
import java.io.File
|
|
||||||
import kotlin.test.assertEquals
|
|
||||||
|
|
||||||
internal class ConvertersTest {
|
|
||||||
companion object {
|
|
||||||
private val storeName = ".converters_test"
|
|
||||||
private val entityStore = PersistentEntityStores.newInstance(storeName)
|
|
||||||
private val expectedMessage = MagixMessage(
|
|
||||||
"dataforge",
|
|
||||||
"dataforge",
|
|
||||||
PropertyChangedMessage(
|
|
||||||
"acceleration",
|
|
||||||
Meta {
|
|
||||||
"x" put 3.0
|
|
||||||
"y" put 9.0
|
|
||||||
},
|
|
||||||
Name.parse("virtual-car"),
|
|
||||||
Name.parse("magix-virtual-car"),
|
|
||||||
time = Instant.fromEpochMilliseconds(1337)
|
|
||||||
),
|
|
||||||
"magix-virtual-car",
|
|
||||||
user = JsonObject(content = mapOf(Pair("name", JsonPrimitive("SCADA"))))
|
|
||||||
)
|
|
||||||
|
|
||||||
@BeforeAll
|
|
||||||
@JvmStatic
|
|
||||||
fun createEntities() {
|
|
||||||
entityStore.executeInTransaction {
|
|
||||||
expectedMessage.toEntity(it)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@AfterAll
|
|
||||||
@JvmStatic
|
|
||||||
fun deleteDatabase() {
|
|
||||||
entityStore.close()
|
|
||||||
File(storeName).deleteRecursively()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
fun testMagixMessageAndPropertyChangedMessageConverters() {
|
|
||||||
assertEquals(expectedMessage, entityStore.computeInReadonlyTransaction {
|
|
||||||
it.getAll("MagixMessage").first?.toMagixMessage()
|
|
||||||
}!!)
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,62 +0,0 @@
|
|||||||
package ru.mipt.npm.controls.xodus.util
|
|
||||||
|
|
||||||
import jetbrains.exodus.entitystore.PersistentEntityStores
|
|
||||||
import kotlinx.datetime.Instant
|
|
||||||
import org.junit.jupiter.api.AfterAll
|
|
||||||
import org.junit.jupiter.api.Assertions.assertEquals
|
|
||||||
import org.junit.jupiter.api.BeforeAll
|
|
||||||
import org.junit.jupiter.api.Test
|
|
||||||
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
|
||||||
import ru.mipt.npm.controls.xodus.toEntity
|
|
||||||
import space.kscience.dataforge.meta.Meta
|
|
||||||
import java.io.File
|
|
||||||
|
|
||||||
internal class QueriesTest {
|
|
||||||
companion object {
|
|
||||||
private val storeName = ".queries_test"
|
|
||||||
private val entityStore = PersistentEntityStores.newInstance(storeName)
|
|
||||||
|
|
||||||
private val propertyChangedMessages = listOf(
|
|
||||||
PropertyChangedMessage(
|
|
||||||
"",
|
|
||||||
Meta.EMPTY,
|
|
||||||
time = Instant.fromEpochMilliseconds(1000)
|
|
||||||
),
|
|
||||||
PropertyChangedMessage(
|
|
||||||
"",
|
|
||||||
Meta.EMPTY,
|
|
||||||
time = Instant.fromEpochMilliseconds(1500)
|
|
||||||
),
|
|
||||||
PropertyChangedMessage(
|
|
||||||
"",
|
|
||||||
Meta.EMPTY,
|
|
||||||
time = Instant.fromEpochMilliseconds(2000)
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
@BeforeAll
|
|
||||||
@JvmStatic
|
|
||||||
fun createEntities() {
|
|
||||||
entityStore.executeInTransaction { transaction ->
|
|
||||||
propertyChangedMessages.forEach {
|
|
||||||
it.toEntity(transaction)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@AfterAll
|
|
||||||
@JvmStatic
|
|
||||||
fun deleteDatabase() {
|
|
||||||
entityStore.close()
|
|
||||||
File(storeName).deleteRecursively()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
fun testFromTo() {
|
|
||||||
assertEquals(propertyChangedMessages.subList(0, 2).toSet(), entityStore.computeInReadonlyTransaction {
|
|
||||||
it.selectPropertyChangedMessagesFromRange( Instant.fromEpochMilliseconds(1000)..Instant.fromEpochMilliseconds(1500))
|
|
||||||
}.toSet())
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
@ -21,6 +21,7 @@ dependencies {
|
|||||||
implementation(projects.controlsMagixClient)
|
implementation(projects.controlsMagixClient)
|
||||||
implementation(projects.controlsXodus)
|
implementation(projects.controlsXodus)
|
||||||
implementation(projects.controlsMongo)
|
implementation(projects.controlsMongo)
|
||||||
|
implementation(projects.controlsStorage)
|
||||||
|
|
||||||
implementation("io.ktor:ktor-client-cio:$ktorVersion")
|
implementation("io.ktor:ktor-client-cio:$ktorVersion")
|
||||||
implementation("org.jetbrains.kotlinx:kotlinx-datetime:0.3.1")
|
implementation("org.jetbrains.kotlinx:kotlinx-datetime:0.3.1")
|
||||||
|
@ -16,9 +16,10 @@ import ru.mipt.npm.controls.demo.car.IVirtualCar.Companion.acceleration
|
|||||||
import ru.mipt.npm.controls.mongo.DefaultMongoClientFactory
|
import ru.mipt.npm.controls.mongo.DefaultMongoClientFactory
|
||||||
import ru.mipt.npm.controls.mongo.connectMongo
|
import ru.mipt.npm.controls.mongo.connectMongo
|
||||||
import ru.mipt.npm.controls.mongo.storeInMongo
|
import ru.mipt.npm.controls.mongo.storeInMongo
|
||||||
|
import ru.mipt.npm.controls.storage.synchronous.store
|
||||||
|
import ru.mipt.npm.controls.storage.synchronous.storeMessages
|
||||||
|
import ru.mipt.npm.controls.xodus.DefaultSynchronousXodusClientFactory
|
||||||
import ru.mipt.npm.controls.xodus.XODUS_STORE_PROPERTY
|
import ru.mipt.npm.controls.xodus.XODUS_STORE_PROPERTY
|
||||||
import ru.mipt.npm.controls.xodus.storeInXodus
|
|
||||||
import ru.mipt.npm.controls.xodus.storeMessagesInXodus
|
|
||||||
import ru.mipt.npm.magix.api.MagixEndpoint
|
import ru.mipt.npm.magix.api.MagixEndpoint
|
||||||
import ru.mipt.npm.magix.rsocket.rSocketWithTcp
|
import ru.mipt.npm.magix.rsocket.rSocketWithTcp
|
||||||
import ru.mipt.npm.magix.server.startMagixServer
|
import ru.mipt.npm.magix.server.startMagixServer
|
||||||
@ -56,14 +57,14 @@ class VirtualCarController : Controller(), ContextAware {
|
|||||||
|
|
||||||
//starting magix event loop and connect it to entity store
|
//starting magix event loop and connect it to entity store
|
||||||
magixServer = startMagixServer(enableRawRSocket = true, enableZmq = true) { flow ->
|
magixServer = startMagixServer(enableRawRSocket = true, enableZmq = true) { flow ->
|
||||||
storeInXodus( flow, Meta {
|
store( flow, Meta {
|
||||||
XODUS_STORE_PROPERTY put VirtualCarControllerConfig.magixEntityStorePath.toString()
|
XODUS_STORE_PROPERTY put VirtualCarControllerConfig.magixEntityStorePath.toString()
|
||||||
})
|
}, DefaultSynchronousXodusClientFactory)
|
||||||
storeInMongo(flow)
|
storeInMongo(flow)
|
||||||
}
|
}
|
||||||
magixVirtualCar = deviceManager.install("magix-virtual-car", MagixVirtualCar)
|
magixVirtualCar = deviceManager.install("magix-virtual-car", MagixVirtualCar)
|
||||||
//connect to device entity store
|
//connect to device entity store
|
||||||
xodusStorageJob = deviceManager.storeMessagesInXodus()
|
xodusStorageJob = deviceManager.storeMessages(DefaultSynchronousXodusClientFactory)
|
||||||
//Create mongo client and connect to MongoDB
|
//Create mongo client and connect to MongoDB
|
||||||
mongoStorageJob = deviceManager.connectMongo(DefaultMongoClientFactory)
|
mongoStorageJob = deviceManager.connectMongo(DefaultMongoClientFactory)
|
||||||
//Launch device client and connect it to the server
|
//Launch device client and connect it to the server
|
||||||
|
@ -53,6 +53,6 @@ include(
|
|||||||
":motors",
|
":motors",
|
||||||
":controls-xodus",
|
":controls-xodus",
|
||||||
":controls-mongo",
|
":controls-mongo",
|
||||||
":xodus-serialization"
|
":xodus-serialization",
|
||||||
|
":controls-storage"
|
||||||
)
|
)
|
||||||
include("xodus-serialization")
|
|
||||||
|
@ -38,7 +38,8 @@ internal fun StoreTransaction.decodeFromEntity(entity: Entity): JsonElement = bu
|
|||||||
|
|
||||||
public fun <T> StoreTransaction.decodeFromEntity(entity: Entity, deserializer: DeserializationStrategy<T>): T {
|
public fun <T> StoreTransaction.decodeFromEntity(entity: Entity, deserializer: DeserializationStrategy<T>): T {
|
||||||
val jsonElement = decodeFromEntity(entity)
|
val jsonElement = decodeFromEntity(entity)
|
||||||
return Json.decodeFromJsonElement(deserializer, jsonElement)
|
val json = Json { ignoreUnknownKeys = true }
|
||||||
|
return json.decodeFromJsonElement(deserializer, jsonElement)
|
||||||
}
|
}
|
||||||
|
|
||||||
public inline fun <reified T> StoreTransaction.decodeFromEntity(entity: Entity): T = decodeFromEntity(entity, serializer())
|
public inline fun <reified T> StoreTransaction.decodeFromEntity(entity: Entity): T = decodeFromEntity(entity, serializer())
|
||||||
|
@ -4,9 +4,11 @@ import jetbrains.exodus.entitystore.Entity
|
|||||||
import jetbrains.exodus.entitystore.EntityId
|
import jetbrains.exodus.entitystore.EntityId
|
||||||
import jetbrains.exodus.entitystore.PersistentEntityStore
|
import jetbrains.exodus.entitystore.PersistentEntityStore
|
||||||
import jetbrains.exodus.entitystore.StoreTransaction
|
import jetbrains.exodus.entitystore.StoreTransaction
|
||||||
|
import kotlinx.serialization.InternalSerializationApi
|
||||||
import kotlinx.serialization.SerializationStrategy
|
import kotlinx.serialization.SerializationStrategy
|
||||||
import kotlinx.serialization.json.*
|
import kotlinx.serialization.json.*
|
||||||
import kotlinx.serialization.serializer
|
import kotlinx.serialization.serializer
|
||||||
|
import kotlin.reflect.KClass
|
||||||
|
|
||||||
internal fun StoreTransaction.encodeToEntity(jsonElement: JsonElement, entity: Entity) {
|
internal fun StoreTransaction.encodeToEntity(jsonElement: JsonElement, entity: Entity) {
|
||||||
when (jsonElement) {
|
when (jsonElement) {
|
||||||
@ -69,3 +71,7 @@ public fun <T> PersistentEntityStore.encodeToEntity(serializer: SerializationStr
|
|||||||
|
|
||||||
public inline fun <reified T> PersistentEntityStore.encodeToEntity(value: T, entityType: String): EntityId =
|
public inline fun <reified T> PersistentEntityStore.encodeToEntity(value: T, entityType: String): EntityId =
|
||||||
encodeToEntity(serializer(), value, entityType)
|
encodeToEntity(serializer(), value, entityType)
|
||||||
|
|
||||||
|
@OptIn(InternalSerializationApi::class)
|
||||||
|
public fun <T : Any> PersistentEntityStore.encodeToEntity(value: T, entityType: String, clazz: KClass<T>): EntityId = encodeToEntity(
|
||||||
|
clazz.serializer(), value, entityType)
|
||||||
|
Loading…
Reference in New Issue
Block a user