diff --git a/controls-mongo/build.gradle.kts b/controls-mongo/build.gradle.kts index 624b71f..d5c81f0 100644 --- a/controls-mongo/build.gradle.kts +++ b/controls-mongo/build.gradle.kts @@ -10,5 +10,6 @@ dependencies { implementation(projects.magix.magixApi) implementation(projects.controlsMagixClient) implementation(projects.magix.magixServer) + implementation(projects.controlsStorage) implementation("org.litote.kmongo:kmongo-coroutine-serialization:$kmongoVersion") } diff --git a/controls-mongo/src/main/kotlin/ru/mipt/npm/controls/mongo/AsynchronousMongoClient.kt b/controls-mongo/src/main/kotlin/ru/mipt/npm/controls/mongo/AsynchronousMongoClient.kt new file mode 100644 index 0000000..eb4eb1d --- /dev/null +++ b/controls-mongo/src/main/kotlin/ru/mipt/npm/controls/mongo/AsynchronousMongoClient.kt @@ -0,0 +1,78 @@ +package ru.mipt.npm.controls.mongo + +import kotlinx.serialization.InternalSerializationApi +import kotlinx.serialization.json.Json +import kotlinx.serialization.serializer +import org.litote.kmongo.coroutine.CoroutineClient +import org.litote.kmongo.coroutine.coroutine +import org.litote.kmongo.coroutine.insertOne +import org.litote.kmongo.reactivestreams.KMongo +import ru.mipt.npm.controls.api.DeviceMessage +import ru.mipt.npm.controls.api.PropertyChangedMessage +import ru.mipt.npm.controls.storage.asynchronous.AsynchronousStorageClient +import ru.mipt.npm.controls.storage.synchronous.StorageKind +import ru.mipt.npm.magix.server.GenericMagixMessage +import space.kscience.dataforge.context.Context +import space.kscience.dataforge.meta.Meta +import space.kscience.dataforge.meta.get +import space.kscience.dataforge.meta.string +import space.kscience.dataforge.context.Factory +import space.kscience.dataforge.names.Name +import kotlin.reflect.KClass + +private const val DEFAULT_DEVICE_MESSAGE_DATABASE_NAME: String = "deviceMessage" +private const val DEFAULT_MAGIX_MESSAGE_DATABASE_NAME = "magixMessage" +private const val DEFAULT_MONGO_DATABASE_URL = "mongodb://mongoadmin:secret@localhost:27888" +public val MONGO_DEVICE_MESSAGE_DATABASE_NAME_PROPERTY: Name = Name.of("mongo", "deviceMessageDatabaseName") +public val MONGO_MAGIX_MESSAGE_DATABASE_NAME_PROPERTY: Name = Name.of("mongo", "magixMessageDatabaseName") +public val MONGO_DATABASE_URL_PROPERTY: Name = Name.of("mongo", "databaseUrl") + +internal class AsynchronousMongoClient( + private val client: CoroutineClient, + private val meta: Meta = Meta.EMPTY +) : AsynchronousStorageClient { + @OptIn(InternalSerializationApi::class) + override suspend fun storeValue(value: T, storageKind: StorageKind, clazz: KClass) { + when (storageKind) { + StorageKind.DEVICE_HUB -> { + val collection = client + .getDatabase( + meta[MONGO_DEVICE_MESSAGE_DATABASE_NAME_PROPERTY]?.string + ?: DEFAULT_DEVICE_MESSAGE_DATABASE_NAME + ) + .getCollection() + + collection.insertOne(Json.encodeToString(clazz.serializer(), value)) + } + + StorageKind.MAGIX_SERVER -> { + val collection = client + .getDatabase(meta[MONGO_MAGIX_MESSAGE_DATABASE_NAME_PROPERTY]?.string ?: DEFAULT_MAGIX_MESSAGE_DATABASE_NAME) + .getCollection() + + collection.insertOne(Json.encodeToString(clazz.serializer(), value)) + } + } + } + + override suspend fun getPropertyHistory( + sourceDeviceName: String, + propertyName: String + ): List { + TODO("Not yet implemented: problems with deserialization") + } + + override fun close() { + client.close() + } +} + +public object DefaultAsynchronousMongoClientFactory : Factory { + override fun invoke(meta: Meta, context: Context): AsynchronousStorageClient { + val client = meta[MONGO_DATABASE_URL_PROPERTY]?.string?.let { + KMongo.createClient(it).coroutine + } ?: KMongo.createClient(DEFAULT_MONGO_DATABASE_URL).coroutine + + return AsynchronousMongoClient(client, meta) + } +} diff --git a/controls-mongo/src/main/kotlin/ru/mipt/npm/controls/mongo/connections.kt b/controls-mongo/src/main/kotlin/ru/mipt/npm/controls/mongo/connections.kt deleted file mode 100644 index e041171..0000000 --- a/controls-mongo/src/main/kotlin/ru/mipt/npm/controls/mongo/connections.kt +++ /dev/null @@ -1,102 +0,0 @@ -package ru.mipt.npm.controls.mongo - -import io.ktor.application.* -import kotlinx.coroutines.InternalCoroutinesApi -import kotlinx.coroutines.Job -import kotlinx.coroutines.flow.* -import kotlinx.coroutines.job -import kotlinx.coroutines.launch -import kotlinx.serialization.encodeToString -import kotlinx.serialization.json.Json -import org.litote.kmongo.coroutine.CoroutineClient -import org.litote.kmongo.coroutine.CoroutineCollection -import org.litote.kmongo.coroutine.coroutine -import org.litote.kmongo.coroutine.insertOne -import org.litote.kmongo.reactivestreams.KMongo -import ru.mipt.npm.controls.api.DeviceMessage -import ru.mipt.npm.controls.controllers.DeviceManager -import ru.mipt.npm.controls.controllers.hubMessageFlow -import ru.mipt.npm.magix.server.GenericMagixMessage -import space.kscience.dataforge.context.Context -import space.kscience.dataforge.context.Factory -import space.kscience.dataforge.context.debug -import space.kscience.dataforge.context.logger -import space.kscience.dataforge.meta.Meta -import space.kscience.dataforge.meta.get -import space.kscience.dataforge.meta.string -import space.kscience.dataforge.names.Name - -private const val DEFAULT_MONGO_DATABASE_URL = "mongodb://mongoadmin:secret@localhost:27888" -private const val DEFAULT_DEVICE_MESSAGE_DATABASE_NAME = "deviceMessage" -private const val DEFAULT_MAGIX_MESSAGE_DATABASE_NAME = "magixMessage" -public val MONGO_DATABASE_URL_PROPERTY: Name = Name.of("mongo", "databaseUrl") -public val MONGO_DEVICE_MESSAGE_DATABASE_NAME_PROPERTY: Name = Name.of("mongo", "deviceMessageDatabaseName") -public val MONGO_MAGIX_MESSAGE_DATABASE_NAME_PROPERTY: Name = Name.of("mongo", "magixMessageDatabaseName") - -public object DefaultMongoClientFactory : Factory { - override fun invoke(meta: Meta, context: Context): CoroutineClient = meta[MONGO_DATABASE_URL_PROPERTY]?.string?.let { - KMongo.createClient(it).coroutine - } ?: KMongo.createClient(DEFAULT_MONGO_DATABASE_URL).coroutine -} - -/** - * Begin to store DeviceMessages from this DeviceManager - * @param factory factory that will be used for creating persistent entity store instance. DefaultPersistentStoreFactory by default. - * DeviceManager's meta and context will be used for in invoke method. - * @param filterCondition allow you to specify messages which we want to store. Always true by default. - * @return Job which responsible for our storage - */ -@OptIn(InternalCoroutinesApi::class) -public fun DeviceManager.connectMongo( - factory: Factory, - filterCondition: suspend (DeviceMessage) -> Boolean = { true } -): Job { - val client = factory.invoke(meta, context) - logger.debug { "Mongo client opened" } - val collection = client - .getDatabase(meta[MONGO_DEVICE_MESSAGE_DATABASE_NAME_PROPERTY]?.string ?: DEFAULT_DEVICE_MESSAGE_DATABASE_NAME) - .getCollection() - return hubMessageFlow(context).filter(filterCondition).onEach { message -> - context.launch { - collection.insertOne(Json.encodeToString(message)) - } - }.launchIn(context).apply { - invokeOnCompletion(onCancelling = true) { - logger.debug { "Mongo client closed" } - client.close() - } - } -} - -internal fun Flow.storeInMongo( - collection: CoroutineCollection, - flowFilter: suspend (GenericMagixMessage) -> Boolean = { true }, -) { - filter(flowFilter).onEach { message -> - collection.insertOne(Json.encodeToString(message)) - } -} - -/** Begin to store MagixMessages from certain flow - * @param flow flow of messages which we will store - * @param meta Meta which may have some configuration parameters for our storage and will be used in invoke method of factory - * @param factory factory that will be used for creating persistent entity store instance. DefaultPersistentStoreFactory by default. - * @param flowFilter allow you to specify messages which we want to store. Always true by default. - */ -@OptIn(InternalCoroutinesApi::class) -public fun Application.storeInMongo( - flow: MutableSharedFlow, - meta: Meta = Meta.EMPTY, - factory: Factory = DefaultMongoClientFactory, - flowFilter: suspend (GenericMagixMessage) -> Boolean = { true }, -) { - val client = factory.invoke(meta) - val collection = client - .getDatabase(meta[MONGO_MAGIX_MESSAGE_DATABASE_NAME_PROPERTY]?.string ?: DEFAULT_MAGIX_MESSAGE_DATABASE_NAME) - .getCollection() - - flow.storeInMongo(collection, flowFilter) - coroutineContext.job.invokeOnCompletion(onCancelling = true) { - client.close() - } -} diff --git a/controls-storage/src/commonMain/kotlin/ru.mipt.npm.controls.storage/asynchronous/AsynchronousStorageClient.kt b/controls-storage/src/commonMain/kotlin/ru.mipt.npm.controls.storage/asynchronous/AsynchronousStorageClient.kt new file mode 100644 index 0000000..b14f1f8 --- /dev/null +++ b/controls-storage/src/commonMain/kotlin/ru.mipt.npm.controls.storage/asynchronous/AsynchronousStorageClient.kt @@ -0,0 +1,16 @@ +package ru.mipt.npm.controls.storage.asynchronous + +import kotlinx.io.core.Closeable +import ru.mipt.npm.controls.api.PropertyChangedMessage +import ru.mipt.npm.controls.storage.synchronous.StorageKind +import ru.mipt.npm.controls.storage.synchronous.SynchronousStorageClient +import kotlin.reflect.KClass + +public interface AsynchronousStorageClient : Closeable { + public suspend fun storeValue(value: T, storageKind: StorageKind, clazz: KClass) + + public suspend fun getPropertyHistory(sourceDeviceName: String, propertyName: String): List +} + +public suspend inline fun AsynchronousStorageClient.storeValue(value: T, storageKind: StorageKind): Unit = + storeValue(value, storageKind, T::class) diff --git a/controls-storage/src/commonMain/kotlin/ru.mipt.npm.controls.storage/asynchronous/commonApi.kt b/controls-storage/src/commonMain/kotlin/ru.mipt.npm.controls.storage/asynchronous/commonApi.kt new file mode 100644 index 0000000..3877daf --- /dev/null +++ b/controls-storage/src/commonMain/kotlin/ru.mipt.npm.controls.storage/asynchronous/commonApi.kt @@ -0,0 +1,51 @@ +package ru.mipt.npm.controls.storage.asynchronous + +import kotlinx.coroutines.InternalCoroutinesApi +import kotlinx.coroutines.Job +import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import kotlinx.io.core.use +import ru.mipt.npm.controls.api.DeviceMessage +import ru.mipt.npm.controls.api.PropertyChangedMessage +import ru.mipt.npm.controls.controllers.DeviceManager +import ru.mipt.npm.controls.controllers.hubMessageFlow +import ru.mipt.npm.controls.storage.synchronous.StorageKind +import space.kscience.dataforge.context.Factory +import space.kscience.dataforge.context.debug +import space.kscience.dataforge.context.logger +import space.kscience.dataforge.meta.Meta + +/** + * Asynchronous version of synchronous API, so for more details check relative docs + */ + +@OptIn(InternalCoroutinesApi::class) +public fun DeviceManager.storeMessages( + factory: Factory, + filterCondition: suspend (DeviceMessage) -> Boolean = { true }, +): Job { + val client = factory(meta, context) + logger.debug { "Storage client created" } + + return hubMessageFlow(context).filter(filterCondition).onEach { message -> + client.storeValue(message, StorageKind.DEVICE_HUB) + }.launchIn(context).apply { + invokeOnCompletion(onCancelling = true) { + client.close() + logger.debug { "Storage client closed" } + } + } +} + +public suspend fun getPropertyHistory( + sourceDeviceName: String, + propertyName: String, + factory: Factory, + meta: Meta = Meta.EMPTY +): List { + return factory(meta).use { + it.getPropertyHistory(sourceDeviceName, propertyName) + } +} + diff --git a/controls-storage/src/jvmMain/kotlin/ru/mipt/npm/controls/storage/asynchronous/jvmApi.kt b/controls-storage/src/jvmMain/kotlin/ru/mipt/npm/controls/storage/asynchronous/jvmApi.kt new file mode 100644 index 0000000..61c634c --- /dev/null +++ b/controls-storage/src/jvmMain/kotlin/ru/mipt/npm/controls/storage/asynchronous/jvmApi.kt @@ -0,0 +1,41 @@ +package ru.mipt.npm.controls.storage.asynchronous + +import io.ktor.application.* +import kotlinx.coroutines.InternalCoroutinesApi +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.job +import ru.mipt.npm.controls.storage.synchronous.StorageKind +import ru.mipt.npm.magix.server.GenericMagixMessage +import space.kscience.dataforge.context.Factory +import space.kscience.dataforge.meta.Meta + +/** + * Asynchronous version of synchronous API, so for more details check relative docs + */ + +internal fun Flow.store( + client: AsynchronousStorageClient, + flowFilter: suspend (GenericMagixMessage) -> Boolean = { true }, +) { + filter(flowFilter).onEach { message -> + client.storeValue(message, StorageKind.MAGIX_SERVER) + } +} + +@OptIn(InternalCoroutinesApi::class) +public fun Application.store( + flow: MutableSharedFlow, + factory: Factory, + meta: Meta = Meta.EMPTY, + flowFilter: suspend (GenericMagixMessage) -> Boolean = { true }, +) { + val client = factory(meta) + + flow.store(client, flowFilter) + coroutineContext.job.invokeOnCompletion(onCancelling = true) { + client.close() + } +} diff --git a/demo/car/src/main/kotlin/ru/mipt/npm/controls/demo/car/VirtualCarController.kt b/demo/car/src/main/kotlin/ru/mipt/npm/controls/demo/car/VirtualCarController.kt index 3796bc7..d13c8ef 100644 --- a/demo/car/src/main/kotlin/ru/mipt/npm/controls/demo/car/VirtualCarController.kt +++ b/demo/car/src/main/kotlin/ru/mipt/npm/controls/demo/car/VirtualCarController.kt @@ -13,9 +13,9 @@ import ru.mipt.npm.controls.client.connectToMagix import ru.mipt.npm.controls.controllers.DeviceManager import ru.mipt.npm.controls.controllers.install import ru.mipt.npm.controls.demo.car.IVirtualCar.Companion.acceleration -import ru.mipt.npm.controls.mongo.DefaultMongoClientFactory -import ru.mipt.npm.controls.mongo.connectMongo -import ru.mipt.npm.controls.mongo.storeInMongo +import ru.mipt.npm.controls.mongo.DefaultAsynchronousMongoClientFactory +import ru.mipt.npm.controls.storage.asynchronous.store +import ru.mipt.npm.controls.storage.asynchronous.storeMessages import ru.mipt.npm.controls.storage.synchronous.store import ru.mipt.npm.controls.storage.synchronous.storeMessages import ru.mipt.npm.controls.xodus.DefaultSynchronousXodusClientFactory @@ -60,13 +60,13 @@ class VirtualCarController : Controller(), ContextAware { store( flow, Meta { XODUS_STORE_PROPERTY put VirtualCarControllerConfig.magixEntityStorePath.toString() }, DefaultSynchronousXodusClientFactory) - storeInMongo(flow) + store(flow, DefaultAsynchronousMongoClientFactory) } magixVirtualCar = deviceManager.install("magix-virtual-car", MagixVirtualCar) //connect to device entity store xodusStorageJob = deviceManager.storeMessages(DefaultSynchronousXodusClientFactory) //Create mongo client and connect to MongoDB - mongoStorageJob = deviceManager.connectMongo(DefaultMongoClientFactory) + mongoStorageJob = deviceManager.storeMessages(DefaultAsynchronousMongoClientFactory) //Launch device client and connect it to the server val deviceEndpoint = MagixEndpoint.rSocketWithTcp("localhost", DeviceMessage.serializer()) deviceManager.connectToMagix(deviceEndpoint)