Refactor storage
This commit is contained in:
parent
786e1637b4
commit
427ecbf91a
@ -6,10 +6,10 @@ plugins {
|
|||||||
val kmongoVersion = "4.4.0"
|
val kmongoVersion = "4.4.0"
|
||||||
|
|
||||||
dependencies {
|
dependencies {
|
||||||
implementation(projects.controlsCore)
|
|
||||||
implementation(projects.magix.magixApi)
|
|
||||||
implementation(projects.controlsMagixClient)
|
|
||||||
implementation(projects.magix.magixServer)
|
|
||||||
implementation(projects.controlsStorage)
|
implementation(projects.controlsStorage)
|
||||||
implementation("org.litote.kmongo:kmongo-coroutine-serialization:$kmongoVersion")
|
implementation("org.litote.kmongo:kmongo-coroutine-serialization:$kmongoVersion")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
readme{
|
||||||
|
maturity = ru.mipt.npm.gradle.Maturity.PROTOTYPE
|
||||||
|
}
|
||||||
|
@ -1,37 +1,33 @@
|
|||||||
package ru.mipt.npm.controls.mongo
|
package ru.mipt.npm.controls.mongo
|
||||||
|
|
||||||
import kotlinx.serialization.InternalSerializationApi
|
|
||||||
import kotlinx.serialization.KSerializer
|
import kotlinx.serialization.KSerializer
|
||||||
import kotlinx.serialization.json.Json
|
import kotlinx.serialization.json.Json
|
||||||
import kotlinx.serialization.serializer
|
|
||||||
import org.litote.kmongo.coroutine.CoroutineClient
|
import org.litote.kmongo.coroutine.CoroutineClient
|
||||||
import org.litote.kmongo.coroutine.coroutine
|
import org.litote.kmongo.coroutine.coroutine
|
||||||
import org.litote.kmongo.coroutine.insertOne
|
import org.litote.kmongo.coroutine.insertOne
|
||||||
import org.litote.kmongo.reactivestreams.KMongo
|
import org.litote.kmongo.reactivestreams.KMongo
|
||||||
import ru.mipt.npm.controls.api.DeviceMessage
|
import ru.mipt.npm.controls.api.DeviceMessage
|
||||||
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
||||||
import ru.mipt.npm.controls.storage.asynchronous.AsynchronousStorageClient
|
import ru.mipt.npm.controls.storage.EventStorage
|
||||||
import ru.mipt.npm.controls.storage.synchronous.StorageKind
|
|
||||||
import ru.mipt.npm.magix.server.GenericMagixMessage
|
import ru.mipt.npm.magix.server.GenericMagixMessage
|
||||||
import space.kscience.dataforge.context.Context
|
import space.kscience.dataforge.context.Context
|
||||||
|
import space.kscience.dataforge.context.Factory
|
||||||
import space.kscience.dataforge.meta.Meta
|
import space.kscience.dataforge.meta.Meta
|
||||||
import space.kscience.dataforge.meta.get
|
import space.kscience.dataforge.meta.get
|
||||||
import space.kscience.dataforge.meta.string
|
import space.kscience.dataforge.meta.string
|
||||||
import space.kscience.dataforge.context.Factory
|
|
||||||
import space.kscience.dataforge.names.Name
|
import space.kscience.dataforge.names.Name
|
||||||
import kotlin.reflect.KClass
|
|
||||||
|
|
||||||
private const val DEFAULT_DEVICE_MESSAGE_DATABASE_NAME: String = "deviceMessage"
|
private const val DEFAULT_DEVICE_MESSAGE_DATABASE_NAME: String = "deviceMessage"
|
||||||
private const val DEFAULT_MAGIX_MESSAGE_DATABASE_NAME = "magixMessage"
|
private const val DEFAULT_MAGIX_MESSAGE_DATABASE_NAME = "magixMessage"
|
||||||
private const val DEFAULT_MONGO_DATABASE_URL = "mongodb://mongoadmin:secret@localhost:27888"
|
private const val DEFAULT_MONGO_DATABASE_URL = "mongodb://mongoadmin:secret@localhost:27888"
|
||||||
public val MONGO_DEVICE_MESSAGE_DATABASE_NAME_PROPERTY: Name = Name.of("mongo", "deviceMessageDatabaseName")
|
private val MONGO_DEVICE_MESSAGE_DATABASE_NAME_PROPERTY: Name = Name.of("mongo", "deviceMessageDatabaseName")
|
||||||
public val MONGO_MAGIX_MESSAGE_DATABASE_NAME_PROPERTY: Name = Name.of("mongo", "magixMessageDatabaseName")
|
public val MONGO_MAGIX_MESSAGE_DATABASE_NAME_PROPERTY: Name = Name.of("mongo", "magixMessageDatabaseName")
|
||||||
public val MONGO_DATABASE_URL_PROPERTY: Name = Name.of("mongo", "databaseUrl")
|
public val MONGO_DATABASE_URL_PROPERTY: Name = Name.of("mongo", "databaseUrl")
|
||||||
|
|
||||||
internal class AsynchronousMongoClient(
|
internal class MongoEventStorage(
|
||||||
private val client: CoroutineClient,
|
private val client: CoroutineClient,
|
||||||
private val meta: Meta = Meta.EMPTY
|
private val meta: Meta = Meta.EMPTY,
|
||||||
) : AsynchronousStorageClient {
|
) : EventStorage {
|
||||||
override suspend fun <T : Any> storeValueInDeviceHub(value: T, serializer: KSerializer<T>) {
|
override suspend fun <T : Any> storeValueInDeviceHub(value: T, serializer: KSerializer<T>) {
|
||||||
val collection = client
|
val collection = client
|
||||||
.getDatabase(
|
.getDatabase(
|
||||||
@ -45,7 +41,8 @@ internal class AsynchronousMongoClient(
|
|||||||
|
|
||||||
override suspend fun <T : Any> storeValueInMagixServer(value: T, serializer: KSerializer<T>) {
|
override suspend fun <T : Any> storeValueInMagixServer(value: T, serializer: KSerializer<T>) {
|
||||||
val collection = client
|
val collection = client
|
||||||
.getDatabase(meta[MONGO_MAGIX_MESSAGE_DATABASE_NAME_PROPERTY]?.string ?: DEFAULT_MAGIX_MESSAGE_DATABASE_NAME)
|
.getDatabase(meta[MONGO_MAGIX_MESSAGE_DATABASE_NAME_PROPERTY]?.string
|
||||||
|
?: DEFAULT_MAGIX_MESSAGE_DATABASE_NAME)
|
||||||
.getCollection<GenericMagixMessage>()
|
.getCollection<GenericMagixMessage>()
|
||||||
|
|
||||||
collection.insertOne(Json.encodeToString(serializer, value))
|
collection.insertOne(Json.encodeToString(serializer, value))
|
||||||
@ -53,7 +50,7 @@ internal class AsynchronousMongoClient(
|
|||||||
|
|
||||||
override suspend fun getPropertyHistory(
|
override suspend fun getPropertyHistory(
|
||||||
sourceDeviceName: String,
|
sourceDeviceName: String,
|
||||||
propertyName: String
|
propertyName: String,
|
||||||
): List<PropertyChangedMessage> {
|
): List<PropertyChangedMessage> {
|
||||||
TODO("Not yet implemented: problems with deserialization")
|
TODO("Not yet implemented: problems with deserialization")
|
||||||
}
|
}
|
||||||
@ -63,12 +60,12 @@ internal class AsynchronousMongoClient(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public object DefaultAsynchronousMongoClientFactory : Factory<AsynchronousStorageClient> {
|
public object DefaultAsynchronousMongoClientFactory : Factory<EventStorage> {
|
||||||
override fun invoke(meta: Meta, context: Context): AsynchronousStorageClient {
|
override fun invoke(meta: Meta, context: Context): EventStorage {
|
||||||
val client = meta[MONGO_DATABASE_URL_PROPERTY]?.string?.let {
|
val client = meta[MONGO_DATABASE_URL_PROPERTY]?.string?.let {
|
||||||
KMongo.createClient(it).coroutine
|
KMongo.createClient(it).coroutine
|
||||||
} ?: KMongo.createClient(DEFAULT_MONGO_DATABASE_URL).coroutine
|
} ?: KMongo.createClient(DEFAULT_MONGO_DATABASE_URL).coroutine
|
||||||
|
|
||||||
return AsynchronousMongoClient(client, meta)
|
return MongoEventStorage(client, meta)
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -9,16 +9,20 @@ kotlin {
|
|||||||
sourceSets {
|
sourceSets {
|
||||||
commonMain {
|
commonMain {
|
||||||
dependencies {
|
dependencies {
|
||||||
implementation(projects.controlsCore)
|
api(projects.controlsCore)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
jvmMain {
|
jvmMain {
|
||||||
dependencies {
|
dependencies {
|
||||||
implementation(projects.magix.magixApi)
|
api(projects.magix.magixApi)
|
||||||
implementation(projects.controlsMagixClient)
|
api(projects.controlsMagixClient)
|
||||||
implementation(projects.magix.magixServer)
|
api(projects.magix.magixServer)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
readme{
|
||||||
|
maturity = ru.mipt.npm.gradle.Maturity.PROTOTYPE
|
||||||
|
}
|
||||||
|
@ -1,50 +0,0 @@
|
|||||||
package ru.mipt.npm.controls.storage.asynchronous
|
|
||||||
|
|
||||||
import io.ktor.utils.io.core.use
|
|
||||||
import kotlinx.coroutines.InternalCoroutinesApi
|
|
||||||
import kotlinx.coroutines.Job
|
|
||||||
import kotlinx.coroutines.flow.filter
|
|
||||||
import kotlinx.coroutines.flow.launchIn
|
|
||||||
import kotlinx.coroutines.flow.onEach
|
|
||||||
import ru.mipt.npm.controls.api.DeviceMessage
|
|
||||||
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
|
||||||
import ru.mipt.npm.controls.controllers.DeviceManager
|
|
||||||
import ru.mipt.npm.controls.controllers.hubMessageFlow
|
|
||||||
import space.kscience.dataforge.context.Factory
|
|
||||||
import space.kscience.dataforge.context.debug
|
|
||||||
import space.kscience.dataforge.context.logger
|
|
||||||
import space.kscience.dataforge.meta.Meta
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Asynchronous version of synchronous API, so for more details check relative docs
|
|
||||||
*/
|
|
||||||
|
|
||||||
@OptIn(InternalCoroutinesApi::class)
|
|
||||||
public fun DeviceManager.storeMessages(
|
|
||||||
factory: Factory<AsynchronousStorageClient>,
|
|
||||||
filterCondition: suspend (DeviceMessage) -> Boolean = { true },
|
|
||||||
): Job {
|
|
||||||
val client = factory(meta, context)
|
|
||||||
logger.debug { "Storage client created" }
|
|
||||||
|
|
||||||
return hubMessageFlow(context).filter(filterCondition).onEach { message ->
|
|
||||||
client.storeValueInDeviceHub(message)
|
|
||||||
}.launchIn(context).apply {
|
|
||||||
invokeOnCompletion(onCancelling = true) {
|
|
||||||
client.close()
|
|
||||||
logger.debug { "Storage client closed" }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public suspend fun getPropertyHistory(
|
|
||||||
sourceDeviceName: String,
|
|
||||||
propertyName: String,
|
|
||||||
factory: Factory<AsynchronousStorageClient>,
|
|
||||||
meta: Meta = Meta.EMPTY
|
|
||||||
): List<PropertyChangedMessage> {
|
|
||||||
return factory(meta).use {
|
|
||||||
it.getPropertyHistory(sourceDeviceName, propertyName)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -1,20 +0,0 @@
|
|||||||
package ru.mipt.npm.controls.storage.synchronous
|
|
||||||
|
|
||||||
import io.ktor.utils.io.core.Closeable
|
|
||||||
import kotlinx.serialization.KSerializer
|
|
||||||
import kotlinx.serialization.serializer
|
|
||||||
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
|
||||||
|
|
||||||
public interface SynchronousStorageClient : Closeable {
|
|
||||||
public fun <T : Any> storeValueInDeviceHub(value: T, serializer: KSerializer<T>)
|
|
||||||
|
|
||||||
public fun <T : Any> storeValueInMagixServer(value: T, serializer: KSerializer<T>)
|
|
||||||
|
|
||||||
public fun getPropertyHistory(sourceDeviceName: String, propertyName: String): List<PropertyChangedMessage>
|
|
||||||
}
|
|
||||||
|
|
||||||
public inline fun <reified T : Any> SynchronousStorageClient.storeValueInDeviceHub(value: T): Unit =
|
|
||||||
storeValueInDeviceHub(value, serializer())
|
|
||||||
|
|
||||||
public inline fun <reified T : Any> SynchronousStorageClient.storeValueInMagixServer(value: T): Unit =
|
|
||||||
storeValueInMagixServer(value, serializer())
|
|
@ -1,11 +1,11 @@
|
|||||||
package ru.mipt.npm.controls.storage.asynchronous
|
package ru.mipt.npm.controls.storage
|
||||||
|
|
||||||
import io.ktor.utils.io.core.Closeable
|
import io.ktor.utils.io.core.Closeable
|
||||||
import kotlinx.serialization.KSerializer
|
import kotlinx.serialization.KSerializer
|
||||||
import kotlinx.serialization.serializer
|
import kotlinx.serialization.serializer
|
||||||
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
||||||
|
|
||||||
public interface AsynchronousStorageClient : Closeable {
|
public interface EventStorage : Closeable {
|
||||||
public suspend fun <T : Any> storeValueInDeviceHub(value: T, serializer: KSerializer<T>)
|
public suspend fun <T : Any> storeValueInDeviceHub(value: T, serializer: KSerializer<T>)
|
||||||
|
|
||||||
public suspend fun <T : Any> storeValueInMagixServer(value: T, serializer: KSerializer<T>)
|
public suspend fun <T : Any> storeValueInMagixServer(value: T, serializer: KSerializer<T>)
|
||||||
@ -13,8 +13,8 @@ public interface AsynchronousStorageClient : Closeable {
|
|||||||
public suspend fun getPropertyHistory(sourceDeviceName: String, propertyName: String): List<PropertyChangedMessage>
|
public suspend fun getPropertyHistory(sourceDeviceName: String, propertyName: String): List<PropertyChangedMessage>
|
||||||
}
|
}
|
||||||
|
|
||||||
public suspend inline fun <reified T : Any> AsynchronousStorageClient.storeValueInDeviceHub(value: T): Unit =
|
public suspend inline fun <reified T : Any> EventStorage.storeValueInDeviceHub(value: T): Unit =
|
||||||
storeValueInDeviceHub(value, serializer())
|
storeValueInDeviceHub(value, serializer())
|
||||||
|
|
||||||
public suspend inline fun <reified T : Any> AsynchronousStorageClient.storeValueInMagixServer(value: T): Unit =
|
public suspend inline fun <reified T : Any> EventStorage.storeValueInMagixServer(value: T): Unit =
|
||||||
storeValueInMagixServer(value, serializer())
|
storeValueInMagixServer(value, serializer())
|
@ -1,4 +1,4 @@
|
|||||||
package ru.mipt.npm.controls.storage.synchronous
|
package ru.mipt.npm.controls.storage
|
||||||
|
|
||||||
import io.ktor.utils.io.core.use
|
import io.ktor.utils.io.core.use
|
||||||
import kotlinx.coroutines.InternalCoroutinesApi
|
import kotlinx.coroutines.InternalCoroutinesApi
|
||||||
@ -14,11 +14,7 @@ import space.kscience.dataforge.context.Factory
|
|||||||
import space.kscience.dataforge.context.debug
|
import space.kscience.dataforge.context.debug
|
||||||
import space.kscience.dataforge.context.logger
|
import space.kscience.dataforge.context.logger
|
||||||
import space.kscience.dataforge.meta.Meta
|
import space.kscience.dataforge.meta.Meta
|
||||||
|
import kotlin.jvm.JvmName
|
||||||
public enum class StorageKind {
|
|
||||||
DEVICE_HUB,
|
|
||||||
MAGIX_SERVER
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Begin to store DeviceMessages from this DeviceManager
|
* Begin to store DeviceMessages from this DeviceManager
|
||||||
@ -28,8 +24,9 @@ public enum class StorageKind {
|
|||||||
* @return Job which responsible for our storage
|
* @return Job which responsible for our storage
|
||||||
*/
|
*/
|
||||||
@OptIn(InternalCoroutinesApi::class)
|
@OptIn(InternalCoroutinesApi::class)
|
||||||
|
@JvmName("storeMessagesAsync")
|
||||||
public fun DeviceManager.storeMessages(
|
public fun DeviceManager.storeMessages(
|
||||||
factory: Factory<SynchronousStorageClient>,
|
factory: Factory<EventStorage>,
|
||||||
filterCondition: suspend (DeviceMessage) -> Boolean = { true },
|
filterCondition: suspend (DeviceMessage) -> Boolean = { true },
|
||||||
): Job {
|
): Job {
|
||||||
val client = factory(meta, context)
|
val client = factory(meta, context)
|
||||||
@ -51,13 +48,20 @@ public fun DeviceManager.storeMessages(
|
|||||||
* @param propertyName a name of property, history of which we want to get
|
* @param propertyName a name of property, history of which we want to get
|
||||||
* @param factory a factory that produce mongo clients
|
* @param factory a factory that produce mongo clients
|
||||||
*/
|
*/
|
||||||
public fun getPropertyHistory(
|
public suspend fun getPropertyHistory(
|
||||||
sourceDeviceName: String,
|
sourceDeviceName: String,
|
||||||
propertyName: String,
|
propertyName: String,
|
||||||
factory: Factory<SynchronousStorageClient>,
|
factory: Factory<EventStorage>,
|
||||||
meta: Meta = Meta.EMPTY
|
meta: Meta = Meta.EMPTY,
|
||||||
): List<PropertyChangedMessage> {
|
): List<PropertyChangedMessage> {
|
||||||
return factory(meta).use {
|
return factory(meta).use {
|
||||||
it.getPropertyHistory(sourceDeviceName, propertyName)
|
it.getPropertyHistory(sourceDeviceName, propertyName)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public enum class StorageKind {
|
||||||
|
DEVICE_HUB,
|
||||||
|
MAGIX_SERVER
|
||||||
|
}
|
||||||
|
|
@ -1,40 +0,0 @@
|
|||||||
package ru.mipt.npm.controls.storage.asynchronous
|
|
||||||
|
|
||||||
import io.ktor.application.*
|
|
||||||
import kotlinx.coroutines.InternalCoroutinesApi
|
|
||||||
import kotlinx.coroutines.flow.Flow
|
|
||||||
import kotlinx.coroutines.flow.MutableSharedFlow
|
|
||||||
import kotlinx.coroutines.flow.filter
|
|
||||||
import kotlinx.coroutines.flow.onEach
|
|
||||||
import kotlinx.coroutines.job
|
|
||||||
import ru.mipt.npm.magix.server.GenericMagixMessage
|
|
||||||
import space.kscience.dataforge.context.Factory
|
|
||||||
import space.kscience.dataforge.meta.Meta
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Asynchronous version of synchronous API, so for more details check relative docs
|
|
||||||
*/
|
|
||||||
|
|
||||||
internal fun Flow<GenericMagixMessage>.store(
|
|
||||||
client: AsynchronousStorageClient,
|
|
||||||
flowFilter: suspend (GenericMagixMessage) -> Boolean = { true },
|
|
||||||
) {
|
|
||||||
filter(flowFilter).onEach { message ->
|
|
||||||
client.storeValueInMagixServer(message)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@OptIn(InternalCoroutinesApi::class)
|
|
||||||
public fun Application.store(
|
|
||||||
flow: MutableSharedFlow<GenericMagixMessage>,
|
|
||||||
factory: Factory<AsynchronousStorageClient>,
|
|
||||||
meta: Meta = Meta.EMPTY,
|
|
||||||
flowFilter: suspend (GenericMagixMessage) -> Boolean = { true },
|
|
||||||
) {
|
|
||||||
val client = factory(meta)
|
|
||||||
|
|
||||||
flow.store(client, flowFilter)
|
|
||||||
coroutineContext.job.invokeOnCompletion(onCancelling = true) {
|
|
||||||
client.close()
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,6 +1,6 @@
|
|||||||
package ru.mipt.npm.controls.storage.synchronous
|
package ru.mipt.npm.controls.storage
|
||||||
|
|
||||||
import io.ktor.application.*
|
import io.ktor.application.Application
|
||||||
import kotlinx.coroutines.InternalCoroutinesApi
|
import kotlinx.coroutines.InternalCoroutinesApi
|
||||||
import kotlinx.coroutines.flow.Flow
|
import kotlinx.coroutines.flow.Flow
|
||||||
import kotlinx.coroutines.flow.MutableSharedFlow
|
import kotlinx.coroutines.flow.MutableSharedFlow
|
||||||
@ -11,8 +11,12 @@ import ru.mipt.npm.magix.server.GenericMagixMessage
|
|||||||
import space.kscience.dataforge.context.Factory
|
import space.kscience.dataforge.context.Factory
|
||||||
import space.kscience.dataforge.meta.Meta
|
import space.kscience.dataforge.meta.Meta
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Asynchronous version of synchronous API, so for more details check relative docs
|
||||||
|
*/
|
||||||
|
|
||||||
internal fun Flow<GenericMagixMessage>.store(
|
internal fun Flow<GenericMagixMessage>.store(
|
||||||
client: SynchronousStorageClient,
|
client: EventStorage,
|
||||||
flowFilter: suspend (GenericMagixMessage) -> Boolean = { true },
|
flowFilter: suspend (GenericMagixMessage) -> Boolean = { true },
|
||||||
) {
|
) {
|
||||||
filter(flowFilter).onEach { message ->
|
filter(flowFilter).onEach { message ->
|
||||||
@ -29,8 +33,8 @@ internal fun Flow<GenericMagixMessage>.store(
|
|||||||
@OptIn(InternalCoroutinesApi::class)
|
@OptIn(InternalCoroutinesApi::class)
|
||||||
public fun Application.store(
|
public fun Application.store(
|
||||||
flow: MutableSharedFlow<GenericMagixMessage>,
|
flow: MutableSharedFlow<GenericMagixMessage>,
|
||||||
|
factory: Factory<EventStorage>,
|
||||||
meta: Meta = Meta.EMPTY,
|
meta: Meta = Meta.EMPTY,
|
||||||
factory: Factory<SynchronousStorageClient>,
|
|
||||||
flowFilter: suspend (GenericMagixMessage) -> Boolean = { true },
|
flowFilter: suspend (GenericMagixMessage) -> Boolean = { true },
|
||||||
) {
|
) {
|
||||||
val client = factory(meta)
|
val client = factory(meta)
|
@ -6,10 +6,15 @@ plugins {
|
|||||||
val xodusVersion = "1.3.232"
|
val xodusVersion = "1.3.232"
|
||||||
|
|
||||||
dependencies {
|
dependencies {
|
||||||
implementation(projects.xodusSerialization)
|
api(projects.xodusSerialization)
|
||||||
implementation(projects.controlsStorage)
|
api(projects.controlsStorage)
|
||||||
implementation(projects.controlsCore)
|
|
||||||
implementation("org.jetbrains.xodus:xodus-entity-store:$xodusVersion")
|
implementation("org.jetbrains.xodus:xodus-entity-store:$xodusVersion")
|
||||||
implementation("org.jetbrains.xodus:xodus-environment:$xodusVersion")
|
implementation("org.jetbrains.xodus:xodus-environment:$xodusVersion")
|
||||||
implementation("org.jetbrains.xodus:xodus-vfs:$xodusVersion")
|
implementation("org.jetbrains.xodus:xodus-vfs:$xodusVersion")
|
||||||
}
|
|
||||||
|
testImplementation(npmlibs.kotlinx.coroutines.test)
|
||||||
|
}
|
||||||
|
|
||||||
|
readme{
|
||||||
|
maturity = ru.mipt.npm.gradle.Maturity.PROTOTYPE
|
||||||
|
}
|
||||||
|
@ -5,8 +5,7 @@ import jetbrains.exodus.entitystore.PersistentEntityStores
|
|||||||
import kotlinx.datetime.Instant
|
import kotlinx.datetime.Instant
|
||||||
import kotlinx.serialization.KSerializer
|
import kotlinx.serialization.KSerializer
|
||||||
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
||||||
import ru.mipt.npm.controls.storage.synchronous.StorageKind
|
import ru.mipt.npm.controls.storage.EventStorage
|
||||||
import ru.mipt.npm.controls.storage.synchronous.SynchronousStorageClient
|
|
||||||
import ru.mipt.npm.xodus.serialization.json.decodeFromEntity
|
import ru.mipt.npm.xodus.serialization.json.decodeFromEntity
|
||||||
import ru.mipt.npm.xodus.serialization.json.encodeToEntity
|
import ru.mipt.npm.xodus.serialization.json.encodeToEntity
|
||||||
import space.kscience.dataforge.context.Context
|
import space.kscience.dataforge.context.Context
|
||||||
@ -15,7 +14,6 @@ import space.kscience.dataforge.meta.Meta
|
|||||||
import space.kscience.dataforge.meta.get
|
import space.kscience.dataforge.meta.get
|
||||||
import space.kscience.dataforge.meta.string
|
import space.kscience.dataforge.meta.string
|
||||||
import space.kscience.dataforge.names.Name
|
import space.kscience.dataforge.names.Name
|
||||||
import kotlin.reflect.KClass
|
|
||||||
|
|
||||||
private const val DEFAULT_XODUS_STORE_PATH = ".storage"
|
private const val DEFAULT_XODUS_STORE_PATH = ".storage"
|
||||||
public val XODUS_STORE_PROPERTY: Name = Name.of("xodus", "entityStorePath")
|
public val XODUS_STORE_PROPERTY: Name = Name.of("xodus", "entityStorePath")
|
||||||
@ -23,31 +21,38 @@ public val XODUS_STORE_PROPERTY: Name = Name.of("xodus", "entityStorePath")
|
|||||||
private const val DEVICE_HUB_ENTITY_TYPE = "DeviceMessage"
|
private const val DEVICE_HUB_ENTITY_TYPE = "DeviceMessage"
|
||||||
private const val MAGIX_SERVER_ENTITY_TYPE = "MagixMessage"
|
private const val MAGIX_SERVER_ENTITY_TYPE = "MagixMessage"
|
||||||
|
|
||||||
internal class SynchronousXodusClient(private val entityStore: PersistentEntityStore) : SynchronousStorageClient {
|
public class XodusEventStorage(private val entityStore: PersistentEntityStore) : EventStorage {
|
||||||
override fun <T : Any> storeValueInDeviceHub(value: T, serializer: KSerializer<T>) {
|
override suspend fun <T : Any> storeValueInDeviceHub(value: T, serializer: KSerializer<T>) {
|
||||||
entityStore.encodeToEntity(value, DEVICE_HUB_ENTITY_TYPE, serializer)
|
entityStore.encodeToEntity(value, DEVICE_HUB_ENTITY_TYPE, serializer)
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun <T : Any> storeValueInMagixServer(value: T, serializer: KSerializer<T>) {
|
override suspend fun <T : Any> storeValueInMagixServer(value: T, serializer: KSerializer<T>) {
|
||||||
entityStore.encodeToEntity(value, MAGIX_SERVER_ENTITY_TYPE, serializer)
|
entityStore.encodeToEntity(value, MAGIX_SERVER_ENTITY_TYPE, serializer)
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun getPropertyHistory(
|
override suspend fun getPropertyHistory(
|
||||||
sourceDeviceName: String,
|
sourceDeviceName: String,
|
||||||
propertyName: String
|
propertyName: String,
|
||||||
): List<PropertyChangedMessage> {
|
): List<PropertyChangedMessage> = entityStore.computeInTransaction { txn ->
|
||||||
return entityStore.computeInTransaction { txn ->
|
txn.find(DEVICE_HUB_ENTITY_TYPE, "type", "property.changed")
|
||||||
txn.find(DEVICE_HUB_ENTITY_TYPE, "type", "property.changed").asSequence()
|
.filter {
|
||||||
.filter { it?.getProperty("sourceDevice")?.let { it == sourceDeviceName } ?: false &&
|
it?.getProperty("sourceDevice") == sourceDeviceName && it.getProperty("property") == propertyName
|
||||||
it?.getProperty("property")?.let { it == propertyName } ?: false
|
}
|
||||||
}.sortedByDescending { it?.getProperty("time")?.let { timeStr -> Instant.parse(timeStr as String) } }
|
.sortedByDescending { it?.getProperty("time")?.let { timeStr -> Instant.parse(timeStr as String) } }
|
||||||
.toList().map { txn.decodeFromEntity(it) }
|
.map { txn.decodeFromEntity(it, PropertyChangedMessage.serializer()) }
|
||||||
}
|
.toList()
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun close() {
|
override fun close() {
|
||||||
entityStore.close()
|
entityStore.close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public companion object : Factory<EventStorage> {
|
||||||
|
override fun invoke(meta: Meta, context: Context): EventStorage {
|
||||||
|
val entityStore = context.getPersistentEntityStore(meta)
|
||||||
|
return XodusEventStorage(entityStore)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun Context.getPersistentEntityStore(meta: Meta = Meta.EMPTY): PersistentEntityStore {
|
private fun Context.getPersistentEntityStore(meta: Meta = Meta.EMPTY): PersistentEntityStore {
|
||||||
@ -57,10 +62,3 @@ private fun Context.getPersistentEntityStore(meta: Meta = Meta.EMPTY): Persisten
|
|||||||
|
|
||||||
return PersistentEntityStores.newInstance(storePath)
|
return PersistentEntityStores.newInstance(storePath)
|
||||||
}
|
}
|
||||||
|
|
||||||
public object DefaultSynchronousXodusClientFactory : Factory<SynchronousStorageClient> {
|
|
||||||
override fun invoke(meta: Meta, context: Context): SynchronousStorageClient {
|
|
||||||
val entityStore = context.getPersistentEntityStore(meta)
|
|
||||||
return SynchronousXodusClient(entityStore)
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,4 +1,6 @@
|
|||||||
import jetbrains.exodus.entitystore.PersistentEntityStores
|
import jetbrains.exodus.entitystore.PersistentEntityStores
|
||||||
|
import kotlinx.coroutines.ExperimentalCoroutinesApi
|
||||||
|
import kotlinx.coroutines.test.runTest
|
||||||
import kotlinx.datetime.Instant
|
import kotlinx.datetime.Instant
|
||||||
import org.junit.jupiter.api.AfterAll
|
import org.junit.jupiter.api.AfterAll
|
||||||
import org.junit.jupiter.api.Assertions.assertEquals
|
import org.junit.jupiter.api.Assertions.assertEquals
|
||||||
@ -6,9 +8,9 @@ import org.junit.jupiter.api.BeforeAll
|
|||||||
import org.junit.jupiter.api.Test
|
import org.junit.jupiter.api.Test
|
||||||
import ru.mipt.npm.controls.api.DeviceMessage
|
import ru.mipt.npm.controls.api.DeviceMessage
|
||||||
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
||||||
import ru.mipt.npm.controls.storage.synchronous.getPropertyHistory
|
import ru.mipt.npm.controls.storage.getPropertyHistory
|
||||||
import ru.mipt.npm.controls.xodus.DefaultSynchronousXodusClientFactory
|
|
||||||
import ru.mipt.npm.controls.xodus.XODUS_STORE_PROPERTY
|
import ru.mipt.npm.controls.xodus.XODUS_STORE_PROPERTY
|
||||||
|
import ru.mipt.npm.controls.xodus.XodusEventStorage
|
||||||
import ru.mipt.npm.xodus.serialization.json.encodeToEntity
|
import ru.mipt.npm.xodus.serialization.json.encodeToEntity
|
||||||
import space.kscience.dataforge.meta.Meta
|
import space.kscience.dataforge.meta.Meta
|
||||||
import space.kscience.dataforge.names.Name
|
import space.kscience.dataforge.names.Name
|
||||||
@ -56,11 +58,15 @@ internal class PropertyHistoryTest {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@OptIn(ExperimentalCoroutinesApi::class)
|
||||||
@Test
|
@Test
|
||||||
fun getPropertyHistoryTest() {
|
fun getPropertyHistoryTest() = runTest {
|
||||||
assertEquals(listOf(propertyChangedMessages[0]), getPropertyHistory(
|
assertEquals(
|
||||||
"virtual-car", "speed", DefaultSynchronousXodusClientFactory, Meta {
|
listOf(propertyChangedMessages[0]),
|
||||||
|
getPropertyHistory(
|
||||||
|
"virtual-car", "speed", XodusEventStorage, Meta {
|
||||||
XODUS_STORE_PROPERTY put storeName
|
XODUS_STORE_PROPERTY put storeName
|
||||||
}))
|
})
|
||||||
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -14,12 +14,10 @@ import ru.mipt.npm.controls.controllers.DeviceManager
|
|||||||
import ru.mipt.npm.controls.controllers.install
|
import ru.mipt.npm.controls.controllers.install
|
||||||
import ru.mipt.npm.controls.demo.car.IVirtualCar.Companion.acceleration
|
import ru.mipt.npm.controls.demo.car.IVirtualCar.Companion.acceleration
|
||||||
import ru.mipt.npm.controls.mongo.DefaultAsynchronousMongoClientFactory
|
import ru.mipt.npm.controls.mongo.DefaultAsynchronousMongoClientFactory
|
||||||
import ru.mipt.npm.controls.storage.asynchronous.store
|
import ru.mipt.npm.controls.storage.store
|
||||||
import ru.mipt.npm.controls.storage.asynchronous.storeMessages
|
import ru.mipt.npm.controls.storage.storeMessages
|
||||||
import ru.mipt.npm.controls.storage.synchronous.store
|
|
||||||
import ru.mipt.npm.controls.storage.synchronous.storeMessages
|
|
||||||
import ru.mipt.npm.controls.xodus.DefaultSynchronousXodusClientFactory
|
|
||||||
import ru.mipt.npm.controls.xodus.XODUS_STORE_PROPERTY
|
import ru.mipt.npm.controls.xodus.XODUS_STORE_PROPERTY
|
||||||
|
import ru.mipt.npm.controls.xodus.XodusEventStorage
|
||||||
import ru.mipt.npm.magix.api.MagixEndpoint
|
import ru.mipt.npm.magix.api.MagixEndpoint
|
||||||
import ru.mipt.npm.magix.rsocket.rSocketWithTcp
|
import ru.mipt.npm.magix.rsocket.rSocketWithTcp
|
||||||
import ru.mipt.npm.magix.server.startMagixServer
|
import ru.mipt.npm.magix.server.startMagixServer
|
||||||
@ -57,14 +55,14 @@ class VirtualCarController : Controller(), ContextAware {
|
|||||||
|
|
||||||
//starting magix event loop and connect it to entity store
|
//starting magix event loop and connect it to entity store
|
||||||
magixServer = startMagixServer(enableRawRSocket = true, enableZmq = true) { flow ->
|
magixServer = startMagixServer(enableRawRSocket = true, enableZmq = true) { flow ->
|
||||||
store( flow, Meta {
|
store(flow, XodusEventStorage, Meta {
|
||||||
XODUS_STORE_PROPERTY put VirtualCarControllerConfig.magixEntityStorePath.toString()
|
XODUS_STORE_PROPERTY put VirtualCarControllerConfig.magixEntityStorePath.toString()
|
||||||
}, DefaultSynchronousXodusClientFactory)
|
})
|
||||||
store(flow, DefaultAsynchronousMongoClientFactory)
|
store(flow, DefaultAsynchronousMongoClientFactory)
|
||||||
}
|
}
|
||||||
magixVirtualCar = deviceManager.install("magix-virtual-car", MagixVirtualCar)
|
magixVirtualCar = deviceManager.install("magix-virtual-car", MagixVirtualCar)
|
||||||
//connect to device entity store
|
//connect to device entity store
|
||||||
xodusStorageJob = deviceManager.storeMessages(DefaultSynchronousXodusClientFactory)
|
xodusStorageJob = deviceManager.storeMessages(XodusEventStorage)
|
||||||
//Create mongo client and connect to MongoDB
|
//Create mongo client and connect to MongoDB
|
||||||
mongoStorageJob = deviceManager.storeMessages(DefaultAsynchronousMongoClientFactory)
|
mongoStorageJob = deviceManager.storeMessages(DefaultAsynchronousMongoClientFactory)
|
||||||
//Launch device client and connect it to the server
|
//Launch device client and connect it to the server
|
||||||
|
Loading…
Reference in New Issue
Block a user