Refactor xodus storage for local history
This commit is contained in:
parent
ba4d2abd27
commit
b6f3769529
@ -6,7 +6,7 @@ plugins {
|
||||
val dataforgeVersion: String by rootProject.extra
|
||||
|
||||
kscience {
|
||||
useCoroutines("1.4.1")
|
||||
useCoroutines()
|
||||
useSerialization{
|
||||
json()
|
||||
}
|
||||
|
@ -1,4 +1,4 @@
|
||||
package ru.mipt.npm.controls.controllers
|
||||
package ru.mipt.npm.controls.manager
|
||||
|
||||
import kotlinx.coroutines.launch
|
||||
import ru.mipt.npm.controls.api.Device
|
@ -1,4 +1,4 @@
|
||||
package ru.mipt.npm.controls.controllers
|
||||
package ru.mipt.npm.controls.manager
|
||||
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.flow.Flow
|
@ -6,9 +6,9 @@ import kotlinx.coroutines.flow.launchIn
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import kotlinx.coroutines.launch
|
||||
import ru.mipt.npm.controls.api.DeviceMessage
|
||||
import ru.mipt.npm.controls.controllers.DeviceManager
|
||||
import ru.mipt.npm.controls.controllers.hubMessageFlow
|
||||
import ru.mipt.npm.controls.controllers.respondHubMessage
|
||||
import ru.mipt.npm.controls.manager.DeviceManager
|
||||
import ru.mipt.npm.controls.manager.hubMessageFlow
|
||||
import ru.mipt.npm.controls.manager.respondHubMessage
|
||||
import ru.mipt.npm.magix.api.MagixEndpoint
|
||||
import ru.mipt.npm.magix.api.MagixMessage
|
||||
import space.kscience.dataforge.context.error
|
||||
@ -35,11 +35,11 @@ public fun DeviceManager.connectToMagix(
|
||||
val responsePayload = respondHubMessage(request.payload)
|
||||
if (responsePayload != null) {
|
||||
val response = MagixMessage(
|
||||
origin = endpointID,
|
||||
payload = responsePayload,
|
||||
format = DATAFORGE_MAGIX_FORMAT,
|
||||
id = generateId(request),
|
||||
parentId = request.id,
|
||||
origin = endpointID,
|
||||
payload = responsePayload
|
||||
parentId = request.id
|
||||
)
|
||||
|
||||
endpoint.broadcast(response)
|
||||
@ -50,10 +50,10 @@ public fun DeviceManager.connectToMagix(
|
||||
|
||||
hubMessageFlow(this).onEach { payload ->
|
||||
val magixMessage = MagixMessage(
|
||||
format = DATAFORGE_MAGIX_FORMAT,
|
||||
id = "df[${payload.hashCode()}]",
|
||||
origin = endpointID,
|
||||
payload = payload
|
||||
payload = payload,
|
||||
format = DATAFORGE_MAGIX_FORMAT,
|
||||
id = "df[${payload.hashCode()}]"
|
||||
)
|
||||
preSendAction(magixMessage)
|
||||
endpoint.broadcast(
|
||||
|
@ -7,7 +7,7 @@ import kotlinx.coroutines.launch
|
||||
import kotlinx.serialization.Serializable
|
||||
import ru.mipt.npm.controls.api.get
|
||||
import ru.mipt.npm.controls.api.getOrReadProperty
|
||||
import ru.mipt.npm.controls.controllers.DeviceManager
|
||||
import ru.mipt.npm.controls.manager.DeviceManager
|
||||
import ru.mipt.npm.magix.api.MagixEndpoint
|
||||
import ru.mipt.npm.magix.api.MagixMessage
|
||||
import space.kscience.dataforge.context.error
|
||||
|
@ -3,7 +3,7 @@ plugins {
|
||||
`maven-publish`
|
||||
}
|
||||
|
||||
val kmongoVersion = "4.4.0"
|
||||
val kmongoVersion = "4.5.1"
|
||||
|
||||
dependencies {
|
||||
implementation(projects.controlsStorage)
|
||||
|
@ -22,7 +22,7 @@ import ru.mipt.npm.controls.api.Device
|
||||
import ru.mipt.npm.controls.api.DeviceHub
|
||||
import ru.mipt.npm.controls.api.PropertyDescriptor
|
||||
import ru.mipt.npm.controls.api.onPropertyChange
|
||||
import ru.mipt.npm.controls.controllers.DeviceManager
|
||||
import ru.mipt.npm.controls.manager.DeviceManager
|
||||
import space.kscience.dataforge.meta.Meta
|
||||
import space.kscience.dataforge.meta.MetaSerializer
|
||||
import space.kscience.dataforge.names.Name
|
||||
|
@ -30,8 +30,8 @@ import ru.mipt.npm.controls.api.DeviceMessage
|
||||
import ru.mipt.npm.controls.api.PropertyGetMessage
|
||||
import ru.mipt.npm.controls.api.PropertySetMessage
|
||||
import ru.mipt.npm.controls.api.getOrNull
|
||||
import ru.mipt.npm.controls.controllers.DeviceManager
|
||||
import ru.mipt.npm.controls.controllers.respondHubMessage
|
||||
import ru.mipt.npm.controls.manager.DeviceManager
|
||||
import ru.mipt.npm.controls.manager.respondHubMessage
|
||||
import ru.mipt.npm.magix.api.MagixEndpoint
|
||||
import ru.mipt.npm.magix.server.GenericMagixMessage
|
||||
import ru.mipt.npm.magix.server.launchMagixServerRawRSocket
|
||||
|
@ -0,0 +1,23 @@
|
||||
package ru.mipt.npm.controls.storage
|
||||
|
||||
import kotlinx.datetime.Instant
|
||||
import ru.mipt.npm.controls.api.DeviceMessage
|
||||
import space.kscience.dataforge.names.Name
|
||||
|
||||
/**
|
||||
* A storage for Controls-kt [DeviceMessage]
|
||||
*/
|
||||
public interface DeviceMessageStorage {
|
||||
public suspend fun write(event: DeviceMessage)
|
||||
|
||||
public suspend fun readAll(): List<DeviceMessage>
|
||||
|
||||
public suspend fun read(
|
||||
eventType: String,
|
||||
range: ClosedRange<Instant>? = null,
|
||||
sourceDevice: Name? = null,
|
||||
targetDevice: Name? = null,
|
||||
): List<DeviceMessage>
|
||||
|
||||
public fun close()
|
||||
}
|
@ -1,20 +0,0 @@
|
||||
package ru.mipt.npm.controls.storage
|
||||
|
||||
import io.ktor.utils.io.core.Closeable
|
||||
import kotlinx.serialization.KSerializer
|
||||
import kotlinx.serialization.serializer
|
||||
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
||||
|
||||
public interface EventStorage : Closeable {
|
||||
public suspend fun <T : Any> storeDeviceMessage(value: T, serializer: KSerializer<T>)
|
||||
|
||||
public suspend fun <T : Any> storeMagixMessage(value: T, serializer: KSerializer<T>)
|
||||
|
||||
public suspend fun getPropertyHistory(sourceDeviceName: String, propertyName: String): List<PropertyChangedMessage>
|
||||
}
|
||||
|
||||
public suspend inline fun <reified T : Any> EventStorage.storeDeviceMessage(value: T): Unit =
|
||||
storeDeviceMessage(value, serializer())
|
||||
|
||||
public suspend inline fun <reified T : Any> EventStorage.storeMagixMessage(value: T): Unit =
|
||||
storeMagixMessage(value, serializer())
|
@ -1,20 +1,21 @@
|
||||
package ru.mipt.npm.controls.storage
|
||||
|
||||
import io.ktor.utils.io.core.use
|
||||
import kotlinx.coroutines.InternalCoroutinesApi
|
||||
import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.flow.filter
|
||||
import kotlinx.coroutines.flow.launchIn
|
||||
import kotlinx.coroutines.flow.onCompletion
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import ru.mipt.npm.controls.api.DeviceMessage
|
||||
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
||||
import ru.mipt.npm.controls.controllers.DeviceManager
|
||||
import ru.mipt.npm.controls.controllers.hubMessageFlow
|
||||
import ru.mipt.npm.controls.manager.DeviceManager
|
||||
import ru.mipt.npm.controls.manager.hubMessageFlow
|
||||
import space.kscience.dataforge.context.Factory
|
||||
import space.kscience.dataforge.context.debug
|
||||
import space.kscience.dataforge.context.logger
|
||||
import space.kscience.dataforge.meta.Meta
|
||||
import kotlin.jvm.JvmName
|
||||
|
||||
//TODO replace by plugin?
|
||||
public fun DeviceManager.storage(
|
||||
factory: Factory<DeviceMessageStorage>,
|
||||
): DeviceMessageStorage = factory(meta, context)
|
||||
|
||||
/**
|
||||
* Begin to store DeviceMessages from this DeviceManager
|
||||
@ -23,45 +24,41 @@ import kotlin.jvm.JvmName
|
||||
* @param filterCondition allow you to specify messages which we want to store. Always true by default.
|
||||
* @return Job which responsible for our storage
|
||||
*/
|
||||
@OptIn(InternalCoroutinesApi::class)
|
||||
@JvmName("storeMessagesAsync")
|
||||
public fun DeviceManager.storeMessages(
|
||||
factory: Factory<EventStorage>,
|
||||
factory: Factory<DeviceMessageStorage>,
|
||||
filterCondition: suspend (DeviceMessage) -> Boolean = { true },
|
||||
): Job {
|
||||
val client = factory(meta, context)
|
||||
logger.debug { "Storage client created" }
|
||||
val storage = factory(meta, context)
|
||||
logger.debug { "Message storage with meta = $meta created" }
|
||||
|
||||
return hubMessageFlow(context).filter(filterCondition).onEach { message ->
|
||||
client.storeDeviceMessage(message)
|
||||
}.launchIn(context).apply {
|
||||
invokeOnCompletion(onCancelling = true) {
|
||||
client.close()
|
||||
logger.debug { "Storage client closed" }
|
||||
}
|
||||
}
|
||||
storage.write(message)
|
||||
}.onCompletion {
|
||||
storage.close()
|
||||
logger.debug { "Message storage closed" }
|
||||
}.launchIn(context)
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the list of deviceMessages that describes changes of specified property of specified device sorted by time
|
||||
* @param sourceDeviceName a name of device, history of which property we want to get
|
||||
* @param propertyName a name of property, history of which we want to get
|
||||
* @param factory a factory that produce mongo clients
|
||||
*/
|
||||
public suspend fun getPropertyHistory(
|
||||
sourceDeviceName: String,
|
||||
propertyName: String,
|
||||
factory: Factory<EventStorage>,
|
||||
meta: Meta = Meta.EMPTY,
|
||||
): List<PropertyChangedMessage> {
|
||||
return factory(meta).use {
|
||||
it.getPropertyHistory(sourceDeviceName, propertyName)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public enum class StorageKind {
|
||||
DEVICE_HUB,
|
||||
MAGIX_SERVER
|
||||
}
|
||||
///**
|
||||
// * @return the list of deviceMessages that describes changes of specified property of specified device sorted by time
|
||||
// * @param sourceDeviceName a name of device, history of which property we want to get
|
||||
// * @param propertyName a name of property, history of which we want to get
|
||||
// * @param factory a factory that produce mongo clients
|
||||
// */
|
||||
//public suspend fun getPropertyHistory(
|
||||
// sourceDeviceName: String,
|
||||
// propertyName: String,
|
||||
// factory: Factory<EventStorage>,
|
||||
// meta: Meta = Meta.EMPTY,
|
||||
//): List<PropertyChangedMessage> {
|
||||
// return factory(meta).use {
|
||||
// it.getPropertyHistory(sourceDeviceName, propertyName)
|
||||
// }
|
||||
//}
|
||||
//
|
||||
//
|
||||
//public enum class StorageKind {
|
||||
// DEVICE_HUB,
|
||||
// MAGIX_SERVER
|
||||
//}
|
||||
|
||||
|
@ -0,0 +1,32 @@
|
||||
package ru.mipt.npm.controls.storage
|
||||
|
||||
import space.kscience.dataforge.context.ContextBuilder
|
||||
import space.kscience.dataforge.io.IOPlugin
|
||||
import space.kscience.dataforge.meta.get
|
||||
import space.kscience.dataforge.meta.set
|
||||
import space.kscience.dataforge.meta.string
|
||||
import java.nio.file.Path
|
||||
import kotlin.io.path.Path
|
||||
|
||||
//TODO remove on DF 0.6
|
||||
|
||||
internal val IOPlugin.Companion.WORK_DIRECTORY_KEY: String get() = ".dataforge"
|
||||
|
||||
public val IOPlugin.workDirectory: Path
|
||||
get() {
|
||||
val workDirectoryPath = meta[IOPlugin.WORK_DIRECTORY_KEY].string
|
||||
?: context.properties[IOPlugin.WORK_DIRECTORY_KEY].string
|
||||
?: ".dataforge"
|
||||
|
||||
return Path(workDirectoryPath)
|
||||
}
|
||||
|
||||
public fun ContextBuilder.workDirectory(path: String) {
|
||||
properties {
|
||||
set(IOPlugin.WORK_DIRECTORY_KEY, path)
|
||||
}
|
||||
}
|
||||
|
||||
public fun ContextBuilder.workDirectory(path: Path){
|
||||
workDirectory(path.toAbsolutePath().toString())
|
||||
}
|
@ -3,14 +3,13 @@ plugins {
|
||||
`maven-publish`
|
||||
}
|
||||
|
||||
val xodusVersion = "1.3.232"
|
||||
val xodusVersion = "2.0.1"
|
||||
|
||||
dependencies {
|
||||
api(projects.xodusSerialization)
|
||||
api(projects.controlsStorage)
|
||||
implementation("org.jetbrains.xodus:xodus-entity-store:$xodusVersion")
|
||||
implementation("org.jetbrains.xodus:xodus-environment:$xodusVersion")
|
||||
implementation("org.jetbrains.xodus:xodus-vfs:$xodusVersion")
|
||||
// implementation("org.jetbrains.xodus:xodus-environment:$xodusVersion")
|
||||
// implementation("org.jetbrains.xodus:xodus-vfs:$xodusVersion")
|
||||
|
||||
testImplementation(npmlibs.kotlinx.coroutines.test)
|
||||
}
|
||||
|
@ -0,0 +1,142 @@
|
||||
package ru.mipt.npm.controls.xodus
|
||||
|
||||
import jetbrains.exodus.entitystore.Entity
|
||||
import jetbrains.exodus.entitystore.PersistentEntityStore
|
||||
import jetbrains.exodus.entitystore.PersistentEntityStores
|
||||
import jetbrains.exodus.entitystore.StoreTransaction
|
||||
import kotlinx.datetime.Instant
|
||||
import kotlinx.serialization.ExperimentalSerializationApi
|
||||
import kotlinx.serialization.descriptors.serialDescriptor
|
||||
import kotlinx.serialization.encodeToString
|
||||
import kotlinx.serialization.json.Json
|
||||
import kotlinx.serialization.json.jsonObject
|
||||
import kotlinx.serialization.json.jsonPrimitive
|
||||
import ru.mipt.npm.controls.api.DeviceMessage
|
||||
import ru.mipt.npm.controls.storage.DeviceMessageStorage
|
||||
import ru.mipt.npm.controls.storage.workDirectory
|
||||
import space.kscience.dataforge.context.Context
|
||||
import space.kscience.dataforge.context.Factory
|
||||
import space.kscience.dataforge.context.fetch
|
||||
import space.kscience.dataforge.io.IOPlugin
|
||||
import space.kscience.dataforge.meta.Meta
|
||||
import space.kscience.dataforge.meta.get
|
||||
import space.kscience.dataforge.meta.string
|
||||
import space.kscience.dataforge.misc.DFExperimental
|
||||
import space.kscience.dataforge.names.Name
|
||||
import space.kscience.dataforge.names.matches
|
||||
import space.kscience.dataforge.names.parseAsName
|
||||
|
||||
|
||||
internal fun StoreTransaction.writeMessage(message: DeviceMessage): Entity {
|
||||
val entity: Entity = newEntity(XodusDeviceMessageStorage.DEVICE_MESSAGE_ENTITY_TYPE)
|
||||
val json = Json.encodeToJsonElement(DeviceMessage.serializer(), message).jsonObject
|
||||
val type = json["type"]?.jsonPrimitive?.content ?: error("Message json representation must have type.")
|
||||
entity.setProperty("type", type)
|
||||
|
||||
message.sourceDevice?.let {
|
||||
entity.setProperty(DeviceMessage::sourceDevice.name, it.toString())
|
||||
}
|
||||
message.targetDevice?.let {
|
||||
entity.setProperty(DeviceMessage::targetDevice.name, it.toString())
|
||||
}
|
||||
message.time?.let {
|
||||
entity.setProperty(DeviceMessage::targetDevice.name, it.toString())
|
||||
}
|
||||
entity.setBlobString("json", Json.encodeToString(json))
|
||||
|
||||
return entity
|
||||
}
|
||||
|
||||
|
||||
@OptIn(DFExperimental::class)
|
||||
private fun Entity.propertyMatchesName(propertyName: String, pattern: Name? = null) =
|
||||
pattern == null || getProperty(propertyName).toString().parseAsName().matches(pattern)
|
||||
|
||||
private fun Entity.timeInRange(range: ClosedRange<Instant>?): Boolean {
|
||||
if (range == null) return true
|
||||
val time: Instant? = getProperty(DeviceMessage::time.name)?.let { entityString ->
|
||||
Instant.parse(entityString.toString())
|
||||
}
|
||||
return time != null && time in range
|
||||
}
|
||||
|
||||
public class XodusDeviceMessageStorage(
|
||||
private val entityStore: PersistentEntityStore,
|
||||
) : DeviceMessageStorage, AutoCloseable {
|
||||
|
||||
override suspend fun write(event: DeviceMessage) {
|
||||
//entityStore.encodeToEntity(event, DEVICE_MESSAGE_ENTITY_TYPE, DeviceMessage.serializer())
|
||||
entityStore.computeInTransaction { txn ->
|
||||
txn.writeMessage(event)
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun readAll(): List<DeviceMessage> = entityStore.computeInTransaction { transaction ->
|
||||
transaction.getAll(
|
||||
DEVICE_MESSAGE_ENTITY_TYPE,
|
||||
).map {
|
||||
Json.decodeFromString(
|
||||
DeviceMessage.serializer(),
|
||||
it.getBlobString("json") ?: error("No json content found")
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun read(
|
||||
eventType: String,
|
||||
range: ClosedRange<Instant>?,
|
||||
sourceDevice: Name?,
|
||||
targetDevice: Name?,
|
||||
): List<DeviceMessage> = entityStore.computeInTransaction { transaction ->
|
||||
transaction.find(
|
||||
DEVICE_MESSAGE_ENTITY_TYPE,
|
||||
"type",
|
||||
eventType
|
||||
).mapNotNull {
|
||||
if (it.timeInRange(range) &&
|
||||
it.propertyMatchesName(DeviceMessage::sourceDevice.name, sourceDevice) &&
|
||||
it.propertyMatchesName(DeviceMessage::targetDevice.name, targetDevice)
|
||||
) {
|
||||
Json.decodeFromString(
|
||||
DeviceMessage.serializer(),
|
||||
it.getBlobString("json") ?: error("No json content found")
|
||||
)
|
||||
} else null
|
||||
}
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
entityStore.close()
|
||||
}
|
||||
|
||||
public companion object : Factory<XodusDeviceMessageStorage> {
|
||||
internal const val DEVICE_MESSAGE_ENTITY_TYPE = "DeviceMessage"
|
||||
public val XODUS_STORE_PROPERTY: Name = Name.of("xodus", "storagePath")
|
||||
|
||||
|
||||
override fun invoke(meta: Meta, context: Context): XodusDeviceMessageStorage {
|
||||
val io = context.fetch(IOPlugin)
|
||||
val storePath = io.workDirectory.resolve(
|
||||
meta[XODUS_STORE_PROPERTY]?.string
|
||||
?: context.properties[XODUS_STORE_PROPERTY]?.string ?: "storage"
|
||||
)
|
||||
|
||||
val entityStore = PersistentEntityStores.newInstance(storePath.toFile())
|
||||
|
||||
return XodusDeviceMessageStorage(entityStore)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Query all messages of given type
|
||||
*/
|
||||
@OptIn(ExperimentalSerializationApi::class)
|
||||
public suspend inline fun <reified T : DeviceMessage> XodusDeviceMessageStorage.query(
|
||||
range: ClosedRange<Instant>? = null,
|
||||
sourceDevice: Name? = null,
|
||||
targetDevice: Name? = null,
|
||||
): List<T> = read(serialDescriptor<T>().serialName, range, sourceDevice, targetDevice).map {
|
||||
//Check that all types are correct
|
||||
it as T
|
||||
}
|
@ -1,64 +0,0 @@
|
||||
package ru.mipt.npm.controls.xodus
|
||||
|
||||
import jetbrains.exodus.entitystore.PersistentEntityStore
|
||||
import jetbrains.exodus.entitystore.PersistentEntityStores
|
||||
import kotlinx.datetime.Instant
|
||||
import kotlinx.serialization.KSerializer
|
||||
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
||||
import ru.mipt.npm.controls.storage.EventStorage
|
||||
import ru.mipt.npm.xodus.serialization.json.decodeFromEntity
|
||||
import ru.mipt.npm.xodus.serialization.json.encodeToEntity
|
||||
import space.kscience.dataforge.context.Context
|
||||
import space.kscience.dataforge.context.Factory
|
||||
import space.kscience.dataforge.meta.Meta
|
||||
import space.kscience.dataforge.meta.get
|
||||
import space.kscience.dataforge.meta.string
|
||||
import space.kscience.dataforge.names.Name
|
||||
|
||||
private const val DEFAULT_XODUS_STORE_PATH = ".storage"
|
||||
public val XODUS_STORE_PROPERTY: Name = Name.of("xodus", "entityStorePath")
|
||||
|
||||
private const val DEVICE_HUB_ENTITY_TYPE = "DeviceMessage"
|
||||
private const val MAGIX_SERVER_ENTITY_TYPE = "MagixMessage"
|
||||
|
||||
public class XodusEventStorage(private val entityStore: PersistentEntityStore) : EventStorage {
|
||||
override suspend fun <T : Any> storeDeviceMessage(value: T, serializer: KSerializer<T>) {
|
||||
entityStore.encodeToEntity(value, DEVICE_HUB_ENTITY_TYPE, serializer)
|
||||
}
|
||||
|
||||
override suspend fun <T : Any> storeMagixMessage(value: T, serializer: KSerializer<T>) {
|
||||
entityStore.encodeToEntity(value, MAGIX_SERVER_ENTITY_TYPE, serializer)
|
||||
}
|
||||
|
||||
override suspend fun getPropertyHistory(
|
||||
sourceDeviceName: String,
|
||||
propertyName: String,
|
||||
): List<PropertyChangedMessage> = entityStore.computeInTransaction { txn ->
|
||||
txn.find(DEVICE_HUB_ENTITY_TYPE, "type", "property.changed")
|
||||
.filter {
|
||||
it?.getProperty("sourceDevice") == sourceDeviceName && it.getProperty("property") == propertyName
|
||||
}
|
||||
.sortedByDescending { it?.getProperty("time")?.let { timeStr -> Instant.parse(timeStr as String) } }
|
||||
.map { txn.decodeFromEntity(it, PropertyChangedMessage.serializer()) }
|
||||
.toList()
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
entityStore.close()
|
||||
}
|
||||
|
||||
public companion object : Factory<EventStorage> {
|
||||
override fun invoke(meta: Meta, context: Context): EventStorage {
|
||||
val entityStore = context.getPersistentEntityStore(meta)
|
||||
return XodusEventStorage(entityStore)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun Context.getPersistentEntityStore(meta: Meta = Meta.EMPTY): PersistentEntityStore {
|
||||
val storePath = meta[XODUS_STORE_PROPERTY]?.string
|
||||
?: properties[XODUS_STORE_PROPERTY]?.string
|
||||
?: DEFAULT_XODUS_STORE_PATH
|
||||
|
||||
return PersistentEntityStores.newInstance(storePath)
|
||||
}
|
@ -10,11 +10,11 @@ import org.eclipse.milo.opcua.sdk.server.OpcUaServer
|
||||
import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText
|
||||
import ru.mipt.npm.controls.api.DeviceMessage
|
||||
import ru.mipt.npm.controls.client.connectToMagix
|
||||
import ru.mipt.npm.controls.controllers.DeviceManager
|
||||
import ru.mipt.npm.controls.controllers.install
|
||||
import ru.mipt.npm.controls.demo.DemoDevice.Companion.cosScale
|
||||
import ru.mipt.npm.controls.demo.DemoDevice.Companion.sinScale
|
||||
import ru.mipt.npm.controls.demo.DemoDevice.Companion.timeScale
|
||||
import ru.mipt.npm.controls.manager.DeviceManager
|
||||
import ru.mipt.npm.controls.manager.install
|
||||
import ru.mipt.npm.controls.opcua.server.OpcUaServer
|
||||
import ru.mipt.npm.controls.opcua.server.endpoint
|
||||
import ru.mipt.npm.controls.opcua.server.serveDevices
|
||||
|
@ -8,4 +8,4 @@ org.gradle.parallel=true
|
||||
publishing.github=false
|
||||
publishing.sonatype=false
|
||||
|
||||
toolsVersion=0.11.5-kotlin-1.6.21
|
||||
toolsVersion=0.11.5-kotlin-1.7.0-RC
|
@ -25,9 +25,9 @@ import kotlinx.serialization.json.JsonElement
|
||||
*/
|
||||
@Serializable
|
||||
public data class MagixMessage<T>(
|
||||
val format: String,
|
||||
val origin: String,
|
||||
val payload: T,
|
||||
val format: String = origin,
|
||||
val target: String? = null,
|
||||
val id: String? = null,
|
||||
val parentId: String? = null,
|
||||
@ -39,4 +39,4 @@ public data class MagixMessage<T>(
|
||||
*/
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
public fun <T, R> MagixMessage<T>.replacePayload(payloadTransform: (T) -> R): MagixMessage<R> =
|
||||
MagixMessage(format, origin, payloadTransform(payload), target, id, parentId, user)
|
||||
MagixMessage(origin, payloadTransform(payload), format, target, id, parentId, user)
|
@ -0,0 +1,17 @@
|
||||
package ru.mipt.npm.magix.api
|
||||
|
||||
import kotlinx.serialization.json.JsonElement
|
||||
|
||||
/**
|
||||
* An interface to access distributed Magix property registry
|
||||
*/
|
||||
public interface MagixRegistry {
|
||||
/**
|
||||
* Request a property with name [propertyName] and user authentication data [user].
|
||||
*
|
||||
* Return a property value in its generic form or null if it is not present.
|
||||
*
|
||||
* Throw an exception if property is present but access is denied.
|
||||
*/
|
||||
public suspend fun request(propertyName: String, user: JsonElement? = null): JsonElement?
|
||||
}
|
@ -18,9 +18,9 @@ public fun <T, R> CoroutineScope.launchMagixConverter(
|
||||
): Job = inputEndpoint.subscribe(filter).onEach { message->
|
||||
val newPayload = transformer(message.payload)
|
||||
val transformed: MagixMessage<R> = MagixMessage(
|
||||
outputFormat,
|
||||
newOrigin ?: message.origin,
|
||||
newPayload,
|
||||
outputFormat,
|
||||
message.target,
|
||||
message.id,
|
||||
message.parentId,
|
||||
|
@ -23,7 +23,7 @@ suspend fun MagixEndpoint<JsonObject>.sendJson(
|
||||
parentId: String? = null,
|
||||
user: JsonElement? = null,
|
||||
builder: JsonObjectBuilder.() -> Unit
|
||||
): Unit = broadcast(MagixMessage(format, origin, buildJsonObject(builder), target, id, parentId, user))
|
||||
): Unit = broadcast(MagixMessage(origin, buildJsonObject(builder), format, target, id, parentId, user))
|
||||
|
||||
internal const val numberOfMessages = 100
|
||||
|
||||
|
21
magix/magix-storage/magix-storage-xodus/build.gradle.kts
Normal file
21
magix/magix-storage/magix-storage-xodus/build.gradle.kts
Normal file
@ -0,0 +1,21 @@
|
||||
plugins {
|
||||
id("ru.mipt.npm.gradle.jvm")
|
||||
`maven-publish`
|
||||
}
|
||||
|
||||
val xodusVersion = "2.0.1"
|
||||
|
||||
kscience{
|
||||
useCoroutines()
|
||||
}
|
||||
|
||||
dependencies {
|
||||
api(projects.magix.magixApi)
|
||||
implementation("org.jetbrains.xodus:xodus-entity-store:$xodusVersion")
|
||||
|
||||
testImplementation(npmlibs.kotlinx.coroutines.test)
|
||||
}
|
||||
|
||||
readme{
|
||||
maturity = ru.mipt.npm.gradle.Maturity.PROTOTYPE
|
||||
}
|
@ -0,0 +1,25 @@
|
||||
package ru.mipt.npm.magix.storage.xodus
|
||||
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.flow.launchIn
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import kotlinx.serialization.json.JsonElement
|
||||
import ru.mipt.npm.magix.api.MagixEndpoint
|
||||
import ru.mipt.npm.magix.api.MagixMessageFilter
|
||||
import java.nio.file.Path
|
||||
|
||||
public class XodusMagixStorage(
|
||||
private val scope: CoroutineScope,
|
||||
private val path: Path,
|
||||
private val endpoint: MagixEndpoint<JsonElement>,
|
||||
private val filter: MagixMessageFilter = MagixMessageFilter(),
|
||||
) : AutoCloseable {
|
||||
|
||||
private val subscriptionJob = endpoint.subscribe(filter).onEach {
|
||||
TODO()
|
||||
}.launchIn(scope)
|
||||
|
||||
override fun close() {
|
||||
subscriptionJob.cancel()
|
||||
}
|
||||
}
|
@ -11,8 +11,8 @@ import javafx.scene.layout.VBox
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.launch
|
||||
import ru.mipt.npm.controls.controllers.DeviceManager
|
||||
import ru.mipt.npm.controls.controllers.installing
|
||||
import ru.mipt.npm.controls.manager.DeviceManager
|
||||
import ru.mipt.npm.controls.manager.installing
|
||||
import ru.mipt.npm.devices.pimotionmaster.PiMotionMasterDevice.Axis.Companion.maxPosition
|
||||
import ru.mipt.npm.devices.pimotionmaster.PiMotionMasterDevice.Axis.Companion.minPosition
|
||||
import ru.mipt.npm.devices.pimotionmaster.PiMotionMasterDevice.Axis.Companion.position
|
||||
|
@ -44,8 +44,9 @@ include(
|
||||
":controls-serial",
|
||||
":controls-server",
|
||||
":controls-opcua",
|
||||
":demo",
|
||||
// ":demo:car",
|
||||
":controls-xodus",
|
||||
// ":controls-mongo",
|
||||
":controls-storage",
|
||||
":magix",
|
||||
":magix:magix-api",
|
||||
":magix:magix-server",
|
||||
@ -53,10 +54,10 @@ include(
|
||||
":magix:magix-java-client",
|
||||
":magix:magix-zmq",
|
||||
":magix:magix-demo",
|
||||
// ":magix:magix-storage",
|
||||
":magix:magix-storage:magix-storage-xodus",
|
||||
":controls-magix-client",
|
||||
":motors",
|
||||
":controls-xodus",
|
||||
":controls-mongo",
|
||||
":xodus-serialization",
|
||||
":controls-storage"
|
||||
":demo",
|
||||
// ":demo:car",
|
||||
)
|
||||
|
@ -1,22 +0,0 @@
|
||||
plugins {
|
||||
id("ru.mipt.npm.gradle.jvm")
|
||||
`maven-publish`
|
||||
}
|
||||
|
||||
val xodusVersion = "1.3.232"
|
||||
|
||||
//TODO to be moved to DataForge
|
||||
|
||||
kscience {
|
||||
useSerialization {
|
||||
json()
|
||||
}
|
||||
}
|
||||
|
||||
dependencies {
|
||||
implementation(projects.magix.magixApi)
|
||||
implementation(projects.controlsCore)
|
||||
implementation("org.jetbrains.xodus:xodus-entity-store:$xodusVersion")
|
||||
implementation("org.jetbrains.xodus:xodus-environment:$xodusVersion")
|
||||
implementation("org.jetbrains.xodus:xodus-vfs:$xodusVersion")
|
||||
}
|
@ -1,62 +0,0 @@
|
||||
package ru.mipt.npm.xodus.serialization.json
|
||||
|
||||
import jetbrains.exodus.entitystore.Entity
|
||||
import jetbrains.exodus.entitystore.EntityId
|
||||
import jetbrains.exodus.entitystore.PersistentEntityStore
|
||||
import jetbrains.exodus.entitystore.StoreTransaction
|
||||
import kotlinx.serialization.DeserializationStrategy
|
||||
import kotlinx.serialization.json.*
|
||||
import kotlinx.serialization.serializer
|
||||
|
||||
internal fun StoreTransaction.decodeFromEntity(entity: Entity): JsonElement = buildJsonObject {
|
||||
entity.propertyNames.forEach { property ->
|
||||
entity.getProperty(property).let { value ->
|
||||
when (value) {
|
||||
is Number -> put(property, value)
|
||||
is Boolean -> put(property, value)
|
||||
is String -> put(property, value)
|
||||
else -> throw IllegalStateException("Unsupported type for primitive field")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
entity.linkNames.forEach { link ->
|
||||
entity.getLinks(link).let { entities ->
|
||||
when (entities.size()) {
|
||||
1L -> entities.first?.let { put(link, decodeFromEntity(it)) }
|
||||
else -> {
|
||||
putJsonArray(link) {
|
||||
entities.forEach {
|
||||
add(decodeFromEntity(it))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public fun <T> StoreTransaction.decodeFromEntity(entity: Entity, deserializer: DeserializationStrategy<T>): T {
|
||||
val jsonElement = decodeFromEntity(entity)
|
||||
val json = Json { ignoreUnknownKeys = true }
|
||||
return json.decodeFromJsonElement(deserializer, jsonElement)
|
||||
}
|
||||
|
||||
public inline fun <reified T> StoreTransaction.decodeFromEntity(entity: Entity): T = decodeFromEntity(entity, serializer())
|
||||
|
||||
// First entity with entityType will be decoded
|
||||
public fun <T> PersistentEntityStore.decodeFromEntity(entityType: String, deserializer: DeserializationStrategy<T>): T? {
|
||||
return computeInTransaction { txn ->
|
||||
txn.getAll(entityType).first?.let { txn.decodeFromEntity(it, deserializer) }
|
||||
}
|
||||
}
|
||||
|
||||
public inline fun <reified T> PersistentEntityStore.decodeFromEntity(entityType: String): T? = decodeFromEntity(entityType, serializer())
|
||||
|
||||
public fun <T> PersistentEntityStore.decodeFromEntity(entityId: EntityId, deserializer: DeserializationStrategy<T>): T? {
|
||||
return computeInTransaction { txn ->
|
||||
txn.decodeFromEntity(txn.getEntity(entityId), deserializer)
|
||||
}
|
||||
}
|
||||
|
||||
public inline fun <reified T> PersistentEntityStore.decodeFromEntity(entityId: EntityId): T? = decodeFromEntity(entityId, serializer())
|
@ -1,78 +0,0 @@
|
||||
package ru.mipt.npm.xodus.serialization.json
|
||||
|
||||
import jetbrains.exodus.entitystore.Entity
|
||||
import jetbrains.exodus.entitystore.EntityId
|
||||
import jetbrains.exodus.entitystore.PersistentEntityStore
|
||||
import jetbrains.exodus.entitystore.StoreTransaction
|
||||
import kotlinx.serialization.InternalSerializationApi
|
||||
import kotlinx.serialization.KSerializer
|
||||
import kotlinx.serialization.SerializationStrategy
|
||||
import kotlinx.serialization.json.*
|
||||
import kotlinx.serialization.serializer
|
||||
import kotlin.reflect.KClass
|
||||
|
||||
internal fun StoreTransaction.encodeToEntity(jsonElement: JsonElement, entity: Entity) {
|
||||
when (jsonElement) {
|
||||
is JsonPrimitive -> throw IllegalStateException("Can't serialize primitive value to entity")
|
||||
is JsonArray -> throw IllegalStateException("Can't serialize array value to entity")
|
||||
is JsonObject -> {
|
||||
jsonElement.forEach { entry ->
|
||||
entry.value.let { value ->
|
||||
when(value) {
|
||||
// не сможем десериализовать, если JsonNull (надо ли обрабатывать???) (можно сохранить в отдельный список ключи null-ов)
|
||||
is JsonPrimitive -> {
|
||||
if (value.isString) {
|
||||
entity.setProperty(entry.key, value.content)
|
||||
} else {
|
||||
(value.longOrNull ?: value.doubleOrNull ?: value.booleanOrNull)?.let {
|
||||
entity.setProperty(
|
||||
entry.key,
|
||||
it
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// считаем, что все элементы массива - JsonObject, иначе не можем напрямую сериализовать (надо придывать костыли???)
|
||||
// не сможем десериализовать, если массив пустой (надо ли обрабатывать???) (можно сохранять в отдельный список ключи пустых массивов)
|
||||
is JsonArray -> {
|
||||
value.forEach { element ->
|
||||
val childEntity = newEntity("${entity.type}.${entry.key}")
|
||||
encodeToEntity(element, childEntity)
|
||||
entity.addLink(entry.key, childEntity)
|
||||
}
|
||||
}
|
||||
|
||||
is JsonObject -> {
|
||||
val childEntity = newEntity("${entity.type}.${entry.key}")
|
||||
encodeToEntity(value, childEntity)
|
||||
entity.setLink(entry.key, childEntity)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public fun <T> StoreTransaction.encodeToEntity(serializer: SerializationStrategy<T>, value: T, entityType: String): Entity {
|
||||
val entity: Entity = newEntity(entityType)
|
||||
encodeToEntity(Json.encodeToJsonElement(serializer, value), entity)
|
||||
return entity
|
||||
}
|
||||
|
||||
public inline fun <reified T> StoreTransaction.encodeToEntity(value: T, entityType: String): Entity =
|
||||
encodeToEntity(serializer(), value, entityType)
|
||||
|
||||
public fun <T> PersistentEntityStore.encodeToEntity(serializer: SerializationStrategy<T>, value: T, entityType: String): EntityId {
|
||||
return computeInTransaction { txn ->
|
||||
txn.encodeToEntity(serializer, value, entityType).id
|
||||
}
|
||||
}
|
||||
|
||||
public inline fun <reified T> PersistentEntityStore.encodeToEntity(value: T, entityType: String): EntityId =
|
||||
encodeToEntity(serializer(), value, entityType)
|
||||
|
||||
@OptIn(InternalSerializationApi::class)
|
||||
public fun <T : Any> PersistentEntityStore.encodeToEntity(value: T, entityType: String, serializer: KSerializer<T>): EntityId =
|
||||
encodeToEntity(serializer, value, entityType)
|
@ -1,38 +0,0 @@
|
||||
package ru.mipt.npm.xodus.serialization.json
|
||||
|
||||
import jetbrains.exodus.entitystore.PersistentEntityStores
|
||||
import kotlinx.datetime.Instant
|
||||
import kotlinx.serialization.json.JsonObject
|
||||
import kotlinx.serialization.json.JsonPrimitive
|
||||
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
||||
import ru.mipt.npm.magix.api.MagixMessage
|
||||
import space.kscience.dataforge.meta.Meta
|
||||
import space.kscience.dataforge.names.Name
|
||||
import java.nio.file.Paths
|
||||
|
||||
internal fun main() {
|
||||
val expectedMessage = MagixMessage(
|
||||
"dataforge",
|
||||
"dataforge",
|
||||
PropertyChangedMessage(
|
||||
"acceleration",
|
||||
Meta {
|
||||
"x" put 3.0
|
||||
"y" put 9.0
|
||||
},
|
||||
Name.parse("virtual-car"),
|
||||
Name.parse("magix-virtual-car"),
|
||||
time = Instant.fromEpochMilliseconds(1337)
|
||||
),
|
||||
"magix-virtual-car"
|
||||
)
|
||||
|
||||
val entityStore = PersistentEntityStores.newInstance(Paths.get(".xodus_serialization").toString())
|
||||
entityStore.executeInTransaction { txn ->
|
||||
txn.encodeToEntity(expectedMessage, "MagixMessage")
|
||||
}
|
||||
|
||||
entityStore.executeInTransaction { txn ->
|
||||
txn.getAll("MagixMessage").first?.let { println(txn.decodeFromEntity<MagixMessage<PropertyChangedMessage>>(it) == expectedMessage) }
|
||||
}
|
||||
}
|
@ -1,105 +0,0 @@
|
||||
package ru.mipt.npm.xodus.serialization.json
|
||||
|
||||
import jetbrains.exodus.entitystore.PersistentEntityStores
|
||||
import kotlinx.datetime.Instant
|
||||
import kotlinx.serialization.json.*
|
||||
import org.junit.jupiter.api.AfterAll
|
||||
import org.junit.jupiter.api.AfterEach
|
||||
import org.junit.jupiter.api.Test
|
||||
import org.junit.jupiter.api.assertThrows
|
||||
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
||||
import ru.mipt.npm.magix.api.MagixMessage
|
||||
import space.kscience.dataforge.meta.Meta
|
||||
import space.kscience.dataforge.names.Name
|
||||
import java.nio.file.Paths
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
internal class EncoderDecoderTests {
|
||||
companion object {
|
||||
private val storePath = Paths.get(".xodus_serialization_test")
|
||||
private val entityStore = PersistentEntityStores.newInstance(storePath.toString())
|
||||
|
||||
@AfterAll
|
||||
@JvmStatic
|
||||
fun deleteDatabase() {
|
||||
entityStore.close()
|
||||
storePath.toFile().deleteRecursively()
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
fun clearDatabase() {
|
||||
entityStore.clear()
|
||||
}
|
||||
|
||||
fun checkEncodingDecodingCorrectness(json: JsonObject) {
|
||||
val id = entityStore.encodeToEntity(json, "JsonObject")
|
||||
assertEquals(json, entityStore.decodeFromEntity(id))
|
||||
}
|
||||
|
||||
fun checkEncodingDecodingCorrectness(jsons: List<JsonObject>) = jsons.forEach {
|
||||
checkEncodingDecodingCorrectness(it)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `encoder throw Illegal exception if input is not a JsonObject`() {
|
||||
assertThrows<IllegalStateException> {
|
||||
val json = JsonPrimitive(0)
|
||||
entityStore.encodeToEntity(json, "JsonPrimitive")
|
||||
}
|
||||
|
||||
assertThrows<IllegalStateException> {
|
||||
val json = buildJsonArray {}
|
||||
entityStore.encodeToEntity(json, "JsonArray")
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `correctly work with underlying JsonPrimitive`() {
|
||||
val jsonLong = buildJsonObject { put("value", 0) }
|
||||
val jsonDouble = buildJsonObject { put("value", 0.0) }
|
||||
val jsonBoolean = buildJsonObject { put("value", true) }
|
||||
val jsonString = buildJsonObject { put("value", "") }
|
||||
|
||||
checkEncodingDecodingCorrectness(listOf(jsonLong, jsonDouble, jsonBoolean, jsonString))
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `correctly work with underlying JsonArray`() {
|
||||
checkEncodingDecodingCorrectness(buildJsonObject { putJsonArray("value") {
|
||||
add(buildJsonObject { put("value", 0) })
|
||||
add(buildJsonObject { put("value", 0.0) })
|
||||
} })
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `correctly work with underlying JsonObject`() {
|
||||
checkEncodingDecodingCorrectness(buildJsonObject {
|
||||
putJsonObject("value", { put("value", true) })
|
||||
})
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testMagixMessagePropertyChangedMessage() {
|
||||
val expectedMessage = MagixMessage(
|
||||
"dataforge",
|
||||
"dataforge",
|
||||
PropertyChangedMessage(
|
||||
"acceleration",
|
||||
Meta {
|
||||
"x" put 3.0
|
||||
"y" put 9.0
|
||||
},
|
||||
Name.parse("virtual-car"),
|
||||
Name.parse("magix-virtual-car"),
|
||||
time = Instant.fromEpochMilliseconds(1337)
|
||||
),
|
||||
"magix-virtual-car",
|
||||
user = buildJsonObject { put("name", "SCADA") }
|
||||
)
|
||||
|
||||
val id = entityStore.encodeToEntity(expectedMessage, "MagixMessage")
|
||||
assertEquals(expectedMessage, entityStore.decodeFromEntity<MagixMessage<PropertyChangedMessage>>(id))
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user