Refactor asynchronous storage api

This commit is contained in:
Atos1337 2022-01-15 15:42:26 +03:00
parent 89bb132e36
commit cbdd6a477b
7 changed files with 192 additions and 107 deletions

View File

@ -10,5 +10,6 @@ dependencies {
implementation(projects.magix.magixApi)
implementation(projects.controlsMagixClient)
implementation(projects.magix.magixServer)
implementation(projects.controlsStorage)
implementation("org.litote.kmongo:kmongo-coroutine-serialization:$kmongoVersion")
}

View File

@ -0,0 +1,78 @@
package ru.mipt.npm.controls.mongo
import kotlinx.serialization.InternalSerializationApi
import kotlinx.serialization.json.Json
import kotlinx.serialization.serializer
import org.litote.kmongo.coroutine.CoroutineClient
import org.litote.kmongo.coroutine.coroutine
import org.litote.kmongo.coroutine.insertOne
import org.litote.kmongo.reactivestreams.KMongo
import ru.mipt.npm.controls.api.DeviceMessage
import ru.mipt.npm.controls.api.PropertyChangedMessage
import ru.mipt.npm.controls.storage.asynchronous.AsynchronousStorageClient
import ru.mipt.npm.controls.storage.synchronous.StorageKind
import ru.mipt.npm.magix.server.GenericMagixMessage
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.string
import space.kscience.dataforge.context.Factory
import space.kscience.dataforge.names.Name
import kotlin.reflect.KClass
private const val DEFAULT_DEVICE_MESSAGE_DATABASE_NAME: String = "deviceMessage"
private const val DEFAULT_MAGIX_MESSAGE_DATABASE_NAME = "magixMessage"
private const val DEFAULT_MONGO_DATABASE_URL = "mongodb://mongoadmin:secret@localhost:27888"
public val MONGO_DEVICE_MESSAGE_DATABASE_NAME_PROPERTY: Name = Name.of("mongo", "deviceMessageDatabaseName")
public val MONGO_MAGIX_MESSAGE_DATABASE_NAME_PROPERTY: Name = Name.of("mongo", "magixMessageDatabaseName")
public val MONGO_DATABASE_URL_PROPERTY: Name = Name.of("mongo", "databaseUrl")
internal class AsynchronousMongoClient(
private val client: CoroutineClient,
private val meta: Meta = Meta.EMPTY
) : AsynchronousStorageClient {
@OptIn(InternalSerializationApi::class)
override suspend fun <T : Any> storeValue(value: T, storageKind: StorageKind, clazz: KClass<T>) {
when (storageKind) {
StorageKind.DEVICE_HUB -> {
val collection = client
.getDatabase(
meta[MONGO_DEVICE_MESSAGE_DATABASE_NAME_PROPERTY]?.string
?: DEFAULT_DEVICE_MESSAGE_DATABASE_NAME
)
.getCollection<DeviceMessage>()
collection.insertOne(Json.encodeToString(clazz.serializer(), value))
}
StorageKind.MAGIX_SERVER -> {
val collection = client
.getDatabase(meta[MONGO_MAGIX_MESSAGE_DATABASE_NAME_PROPERTY]?.string ?: DEFAULT_MAGIX_MESSAGE_DATABASE_NAME)
.getCollection<GenericMagixMessage>()
collection.insertOne(Json.encodeToString(clazz.serializer(), value))
}
}
}
override suspend fun getPropertyHistory(
sourceDeviceName: String,
propertyName: String
): List<PropertyChangedMessage> {
TODO("Not yet implemented: problems with deserialization")
}
override fun close() {
client.close()
}
}
public object DefaultAsynchronousMongoClientFactory : Factory<AsynchronousStorageClient> {
override fun invoke(meta: Meta, context: Context): AsynchronousStorageClient {
val client = meta[MONGO_DATABASE_URL_PROPERTY]?.string?.let {
KMongo.createClient(it).coroutine
} ?: KMongo.createClient(DEFAULT_MONGO_DATABASE_URL).coroutine
return AsynchronousMongoClient(client, meta)
}
}

View File

@ -1,102 +0,0 @@
package ru.mipt.npm.controls.mongo
import io.ktor.application.*
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.job
import kotlinx.coroutines.launch
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
import org.litote.kmongo.coroutine.CoroutineClient
import org.litote.kmongo.coroutine.CoroutineCollection
import org.litote.kmongo.coroutine.coroutine
import org.litote.kmongo.coroutine.insertOne
import org.litote.kmongo.reactivestreams.KMongo
import ru.mipt.npm.controls.api.DeviceMessage
import ru.mipt.npm.controls.controllers.DeviceManager
import ru.mipt.npm.controls.controllers.hubMessageFlow
import ru.mipt.npm.magix.server.GenericMagixMessage
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Factory
import space.kscience.dataforge.context.debug
import space.kscience.dataforge.context.logger
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.string
import space.kscience.dataforge.names.Name
private const val DEFAULT_MONGO_DATABASE_URL = "mongodb://mongoadmin:secret@localhost:27888"
private const val DEFAULT_DEVICE_MESSAGE_DATABASE_NAME = "deviceMessage"
private const val DEFAULT_MAGIX_MESSAGE_DATABASE_NAME = "magixMessage"
public val MONGO_DATABASE_URL_PROPERTY: Name = Name.of("mongo", "databaseUrl")
public val MONGO_DEVICE_MESSAGE_DATABASE_NAME_PROPERTY: Name = Name.of("mongo", "deviceMessageDatabaseName")
public val MONGO_MAGIX_MESSAGE_DATABASE_NAME_PROPERTY: Name = Name.of("mongo", "magixMessageDatabaseName")
public object DefaultMongoClientFactory : Factory<CoroutineClient> {
override fun invoke(meta: Meta, context: Context): CoroutineClient = meta[MONGO_DATABASE_URL_PROPERTY]?.string?.let {
KMongo.createClient(it).coroutine
} ?: KMongo.createClient(DEFAULT_MONGO_DATABASE_URL).coroutine
}
/**
* Begin to store DeviceMessages from this DeviceManager
* @param factory factory that will be used for creating persistent entity store instance. DefaultPersistentStoreFactory by default.
* DeviceManager's meta and context will be used for in invoke method.
* @param filterCondition allow you to specify messages which we want to store. Always true by default.
* @return Job which responsible for our storage
*/
@OptIn(InternalCoroutinesApi::class)
public fun DeviceManager.connectMongo(
factory: Factory<CoroutineClient>,
filterCondition: suspend (DeviceMessage) -> Boolean = { true }
): Job {
val client = factory.invoke(meta, context)
logger.debug { "Mongo client opened" }
val collection = client
.getDatabase(meta[MONGO_DEVICE_MESSAGE_DATABASE_NAME_PROPERTY]?.string ?: DEFAULT_DEVICE_MESSAGE_DATABASE_NAME)
.getCollection<DeviceMessage>()
return hubMessageFlow(context).filter(filterCondition).onEach { message ->
context.launch {
collection.insertOne(Json.encodeToString(message))
}
}.launchIn(context).apply {
invokeOnCompletion(onCancelling = true) {
logger.debug { "Mongo client closed" }
client.close()
}
}
}
internal fun Flow<GenericMagixMessage>.storeInMongo(
collection: CoroutineCollection<GenericMagixMessage>,
flowFilter: suspend (GenericMagixMessage) -> Boolean = { true },
) {
filter(flowFilter).onEach { message ->
collection.insertOne(Json.encodeToString(message))
}
}
/** Begin to store MagixMessages from certain flow
* @param flow flow of messages which we will store
* @param meta Meta which may have some configuration parameters for our storage and will be used in invoke method of factory
* @param factory factory that will be used for creating persistent entity store instance. DefaultPersistentStoreFactory by default.
* @param flowFilter allow you to specify messages which we want to store. Always true by default.
*/
@OptIn(InternalCoroutinesApi::class)
public fun Application.storeInMongo(
flow: MutableSharedFlow<GenericMagixMessage>,
meta: Meta = Meta.EMPTY,
factory: Factory<CoroutineClient> = DefaultMongoClientFactory,
flowFilter: suspend (GenericMagixMessage) -> Boolean = { true },
) {
val client = factory.invoke(meta)
val collection = client
.getDatabase(meta[MONGO_MAGIX_MESSAGE_DATABASE_NAME_PROPERTY]?.string ?: DEFAULT_MAGIX_MESSAGE_DATABASE_NAME)
.getCollection<GenericMagixMessage>()
flow.storeInMongo(collection, flowFilter)
coroutineContext.job.invokeOnCompletion(onCancelling = true) {
client.close()
}
}

View File

@ -0,0 +1,16 @@
package ru.mipt.npm.controls.storage.asynchronous
import kotlinx.io.core.Closeable
import ru.mipt.npm.controls.api.PropertyChangedMessage
import ru.mipt.npm.controls.storage.synchronous.StorageKind
import ru.mipt.npm.controls.storage.synchronous.SynchronousStorageClient
import kotlin.reflect.KClass
public interface AsynchronousStorageClient : Closeable {
public suspend fun <T : Any> storeValue(value: T, storageKind: StorageKind, clazz: KClass<T>)
public suspend fun getPropertyHistory(sourceDeviceName: String, propertyName: String): List<PropertyChangedMessage>
}
public suspend inline fun <reified T : Any> AsynchronousStorageClient.storeValue(value: T, storageKind: StorageKind): Unit =
storeValue(value, storageKind, T::class)

View File

@ -0,0 +1,51 @@
package ru.mipt.npm.controls.storage.asynchronous
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.io.core.use
import ru.mipt.npm.controls.api.DeviceMessage
import ru.mipt.npm.controls.api.PropertyChangedMessage
import ru.mipt.npm.controls.controllers.DeviceManager
import ru.mipt.npm.controls.controllers.hubMessageFlow
import ru.mipt.npm.controls.storage.synchronous.StorageKind
import space.kscience.dataforge.context.Factory
import space.kscience.dataforge.context.debug
import space.kscience.dataforge.context.logger
import space.kscience.dataforge.meta.Meta
/**
* Asynchronous version of synchronous API, so for more details check relative docs
*/
@OptIn(InternalCoroutinesApi::class)
public fun DeviceManager.storeMessages(
factory: Factory<AsynchronousStorageClient>,
filterCondition: suspend (DeviceMessage) -> Boolean = { true },
): Job {
val client = factory(meta, context)
logger.debug { "Storage client created" }
return hubMessageFlow(context).filter(filterCondition).onEach { message ->
client.storeValue(message, StorageKind.DEVICE_HUB)
}.launchIn(context).apply {
invokeOnCompletion(onCancelling = true) {
client.close()
logger.debug { "Storage client closed" }
}
}
}
public suspend fun getPropertyHistory(
sourceDeviceName: String,
propertyName: String,
factory: Factory<AsynchronousStorageClient>,
meta: Meta = Meta.EMPTY
): List<PropertyChangedMessage> {
return factory(meta).use {
it.getPropertyHistory(sourceDeviceName, propertyName)
}
}

View File

@ -0,0 +1,41 @@
package ru.mipt.npm.controls.storage.asynchronous
import io.ktor.application.*
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.job
import ru.mipt.npm.controls.storage.synchronous.StorageKind
import ru.mipt.npm.magix.server.GenericMagixMessage
import space.kscience.dataforge.context.Factory
import space.kscience.dataforge.meta.Meta
/**
* Asynchronous version of synchronous API, so for more details check relative docs
*/
internal fun Flow<GenericMagixMessage>.store(
client: AsynchronousStorageClient,
flowFilter: suspend (GenericMagixMessage) -> Boolean = { true },
) {
filter(flowFilter).onEach { message ->
client.storeValue(message, StorageKind.MAGIX_SERVER)
}
}
@OptIn(InternalCoroutinesApi::class)
public fun Application.store(
flow: MutableSharedFlow<GenericMagixMessage>,
factory: Factory<AsynchronousStorageClient>,
meta: Meta = Meta.EMPTY,
flowFilter: suspend (GenericMagixMessage) -> Boolean = { true },
) {
val client = factory(meta)
flow.store(client, flowFilter)
coroutineContext.job.invokeOnCompletion(onCancelling = true) {
client.close()
}
}

View File

@ -13,9 +13,9 @@ import ru.mipt.npm.controls.client.connectToMagix
import ru.mipt.npm.controls.controllers.DeviceManager
import ru.mipt.npm.controls.controllers.install
import ru.mipt.npm.controls.demo.car.IVirtualCar.Companion.acceleration
import ru.mipt.npm.controls.mongo.DefaultMongoClientFactory
import ru.mipt.npm.controls.mongo.connectMongo
import ru.mipt.npm.controls.mongo.storeInMongo
import ru.mipt.npm.controls.mongo.DefaultAsynchronousMongoClientFactory
import ru.mipt.npm.controls.storage.asynchronous.store
import ru.mipt.npm.controls.storage.asynchronous.storeMessages
import ru.mipt.npm.controls.storage.synchronous.store
import ru.mipt.npm.controls.storage.synchronous.storeMessages
import ru.mipt.npm.controls.xodus.DefaultSynchronousXodusClientFactory
@ -60,13 +60,13 @@ class VirtualCarController : Controller(), ContextAware {
store( flow, Meta {
XODUS_STORE_PROPERTY put VirtualCarControllerConfig.magixEntityStorePath.toString()
}, DefaultSynchronousXodusClientFactory)
storeInMongo(flow)
store(flow, DefaultAsynchronousMongoClientFactory)
}
magixVirtualCar = deviceManager.install("magix-virtual-car", MagixVirtualCar)
//connect to device entity store
xodusStorageJob = deviceManager.storeMessages(DefaultSynchronousXodusClientFactory)
//Create mongo client and connect to MongoDB
mongoStorageJob = deviceManager.connectMongo(DefaultMongoClientFactory)
mongoStorageJob = deviceManager.storeMessages(DefaultAsynchronousMongoClientFactory)
//Launch device client and connect it to the server
val deviceEndpoint = MagixEndpoint.rSocketWithTcp("localhost", DeviceMessage.serializer())
deviceManager.connectToMagix(deviceEndpoint)