Fix issues
This commit is contained in:
parent
23e821d2c2
commit
0c4c2e4cc0
@ -1,26 +0,0 @@
|
|||||||
# Description
|
|
||||||
This module allows you to store [DeviceMessages](/controls-core/src/commonMain/kotlin/ru/mipt/npm/controls/api/DeviceMessage.kt)
|
|
||||||
from certain [DeviceManager](/controls-core/src/commonMain/kotlin/ru/mipt/npm/controls/controllers/DeviceManager.kt)
|
|
||||||
or [MagixMessages](magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixMessage.kt)
|
|
||||||
from [magix server](/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/server.kt)
|
|
||||||
in [mongoDB](https://www.mongodb.com/).
|
|
||||||
|
|
||||||
# Usage
|
|
||||||
|
|
||||||
All usage examples can be found in [VirtualCarController](/demo/car/src/main/kotlin/ru/mipt/npm/controls/demo/car/VirtualCarController.kt).
|
|
||||||
|
|
||||||
## Storage from Device Manager
|
|
||||||
|
|
||||||
Just call storeMessagesInXodus. For more details, you can see comments in [source code](/controls-mongo/src/main/kotlin/ru/mipt/npm/controls/mongo/connections.kt)
|
|
||||||
|
|
||||||
## Storage from Magix Server
|
|
||||||
|
|
||||||
Just pass such lambda as parameter to startMagixServer:
|
|
||||||
```kotlin
|
|
||||||
{ flow ->
|
|
||||||
// some code
|
|
||||||
storeInMongo(flow)
|
|
||||||
// some code
|
|
||||||
}
|
|
||||||
```
|
|
||||||
For more details, you can see comments in [source code](/controls-mongo/src/main/kotlin/ru/mipt/npm/controls/mongo/connections.kt)
|
|
@ -1,6 +1,7 @@
|
|||||||
package ru.mipt.npm.controls.mongo
|
package ru.mipt.npm.controls.mongo
|
||||||
|
|
||||||
import kotlinx.serialization.InternalSerializationApi
|
import kotlinx.serialization.InternalSerializationApi
|
||||||
|
import kotlinx.serialization.KSerializer
|
||||||
import kotlinx.serialization.json.Json
|
import kotlinx.serialization.json.Json
|
||||||
import kotlinx.serialization.serializer
|
import kotlinx.serialization.serializer
|
||||||
import org.litote.kmongo.coroutine.CoroutineClient
|
import org.litote.kmongo.coroutine.CoroutineClient
|
||||||
@ -31,10 +32,7 @@ internal class AsynchronousMongoClient(
|
|||||||
private val client: CoroutineClient,
|
private val client: CoroutineClient,
|
||||||
private val meta: Meta = Meta.EMPTY
|
private val meta: Meta = Meta.EMPTY
|
||||||
) : AsynchronousStorageClient {
|
) : AsynchronousStorageClient {
|
||||||
@OptIn(InternalSerializationApi::class)
|
override suspend fun <T : Any> storeValueInDeviceHub(value: T, serializer: KSerializer<T>) {
|
||||||
override suspend fun <T : Any> storeValue(value: T, storageKind: StorageKind, clazz: KClass<T>) {
|
|
||||||
when (storageKind) {
|
|
||||||
StorageKind.DEVICE_HUB -> {
|
|
||||||
val collection = client
|
val collection = client
|
||||||
.getDatabase(
|
.getDatabase(
|
||||||
meta[MONGO_DEVICE_MESSAGE_DATABASE_NAME_PROPERTY]?.string
|
meta[MONGO_DEVICE_MESSAGE_DATABASE_NAME_PROPERTY]?.string
|
||||||
@ -42,17 +40,15 @@ internal class AsynchronousMongoClient(
|
|||||||
)
|
)
|
||||||
.getCollection<DeviceMessage>()
|
.getCollection<DeviceMessage>()
|
||||||
|
|
||||||
collection.insertOne(Json.encodeToString(clazz.serializer(), value))
|
collection.insertOne(Json.encodeToString(serializer, value))
|
||||||
}
|
}
|
||||||
|
|
||||||
StorageKind.MAGIX_SERVER -> {
|
override suspend fun <T : Any> storeValueInMagixServer(value: T, serializer: KSerializer<T>) {
|
||||||
val collection = client
|
val collection = client
|
||||||
.getDatabase(meta[MONGO_MAGIX_MESSAGE_DATABASE_NAME_PROPERTY]?.string ?: DEFAULT_MAGIX_MESSAGE_DATABASE_NAME)
|
.getDatabase(meta[MONGO_MAGIX_MESSAGE_DATABASE_NAME_PROPERTY]?.string ?: DEFAULT_MAGIX_MESSAGE_DATABASE_NAME)
|
||||||
.getCollection<GenericMagixMessage>()
|
.getCollection<GenericMagixMessage>()
|
||||||
|
|
||||||
collection.insertOne(Json.encodeToString(clazz.serializer(), value))
|
collection.insertOne(Json.encodeToString(serializer, value))
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
override suspend fun getPropertyHistory(
|
override suspend fun getPropertyHistory(
|
||||||
|
12
controls-storage/README.md
Normal file
12
controls-storage/README.md
Normal file
@ -0,0 +1,12 @@
|
|||||||
|
# Description
|
||||||
|
|
||||||
|
This module provides API to store [DeviceMessages](/controls-core/src/commonMain/kotlin/ru/mipt/npm/controls/api/DeviceMessage.kt)
|
||||||
|
from certain [DeviceManager](/controls-core/src/commonMain/kotlin/ru/mipt/npm/controls/controllers/DeviceManager.kt)
|
||||||
|
or [MagixMessages](magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixMessage.kt)
|
||||||
|
from certain [magix server](/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/server.kt).
|
||||||
|
|
||||||
|
# Usage
|
||||||
|
|
||||||
|
All usage examples can be found in [VirtualCarController](/demo/car/src/main/kotlin/ru/mipt/npm/controls/demo/car/VirtualCarController.kt).
|
||||||
|
|
||||||
|
For more details, you can see comments in source code of this module.
|
@ -1,16 +1,20 @@
|
|||||||
package ru.mipt.npm.controls.storage.asynchronous
|
package ru.mipt.npm.controls.storage.asynchronous
|
||||||
|
|
||||||
import kotlinx.io.core.Closeable
|
import kotlinx.io.core.Closeable
|
||||||
|
import kotlinx.serialization.KSerializer
|
||||||
|
import kotlinx.serialization.serializer
|
||||||
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
||||||
import ru.mipt.npm.controls.storage.synchronous.StorageKind
|
|
||||||
import ru.mipt.npm.controls.storage.synchronous.SynchronousStorageClient
|
|
||||||
import kotlin.reflect.KClass
|
|
||||||
|
|
||||||
public interface AsynchronousStorageClient : Closeable {
|
public interface AsynchronousStorageClient : Closeable {
|
||||||
public suspend fun <T : Any> storeValue(value: T, storageKind: StorageKind, clazz: KClass<T>)
|
public suspend fun <T : Any> storeValueInDeviceHub(value: T, serializer: KSerializer<T>)
|
||||||
|
|
||||||
|
public suspend fun <T : Any> storeValueInMagixServer(value: T, serializer: KSerializer<T>)
|
||||||
|
|
||||||
public suspend fun getPropertyHistory(sourceDeviceName: String, propertyName: String): List<PropertyChangedMessage>
|
public suspend fun getPropertyHistory(sourceDeviceName: String, propertyName: String): List<PropertyChangedMessage>
|
||||||
}
|
}
|
||||||
|
|
||||||
public suspend inline fun <reified T : Any> AsynchronousStorageClient.storeValue(value: T, storageKind: StorageKind): Unit =
|
public suspend inline fun <reified T : Any> AsynchronousStorageClient.storeValueInDeviceHub(value: T): Unit =
|
||||||
storeValue(value, storageKind, T::class)
|
storeValueInDeviceHub(value, serializer())
|
||||||
|
|
||||||
|
public suspend inline fun <reified T : Any> AsynchronousStorageClient.storeValueInMagixServer(value: T): Unit =
|
||||||
|
storeValueInMagixServer(value, serializer())
|
||||||
|
@ -10,7 +10,6 @@ import ru.mipt.npm.controls.api.DeviceMessage
|
|||||||
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
||||||
import ru.mipt.npm.controls.controllers.DeviceManager
|
import ru.mipt.npm.controls.controllers.DeviceManager
|
||||||
import ru.mipt.npm.controls.controllers.hubMessageFlow
|
import ru.mipt.npm.controls.controllers.hubMessageFlow
|
||||||
import ru.mipt.npm.controls.storage.synchronous.StorageKind
|
|
||||||
import space.kscience.dataforge.context.Factory
|
import space.kscience.dataforge.context.Factory
|
||||||
import space.kscience.dataforge.context.debug
|
import space.kscience.dataforge.context.debug
|
||||||
import space.kscience.dataforge.context.logger
|
import space.kscience.dataforge.context.logger
|
||||||
@ -29,7 +28,7 @@ public fun DeviceManager.storeMessages(
|
|||||||
logger.debug { "Storage client created" }
|
logger.debug { "Storage client created" }
|
||||||
|
|
||||||
return hubMessageFlow(context).filter(filterCondition).onEach { message ->
|
return hubMessageFlow(context).filter(filterCondition).onEach { message ->
|
||||||
client.storeValue(message, StorageKind.DEVICE_HUB)
|
client.storeValueInDeviceHub(message)
|
||||||
}.launchIn(context).apply {
|
}.launchIn(context).apply {
|
||||||
invokeOnCompletion(onCancelling = true) {
|
invokeOnCompletion(onCancelling = true) {
|
||||||
client.close()
|
client.close()
|
||||||
|
@ -1,14 +1,20 @@
|
|||||||
package ru.mipt.npm.controls.storage.synchronous
|
package ru.mipt.npm.controls.storage.synchronous
|
||||||
|
|
||||||
import kotlinx.io.core.Closeable
|
import kotlinx.io.core.Closeable
|
||||||
|
import kotlinx.serialization.KSerializer
|
||||||
|
import kotlinx.serialization.serializer
|
||||||
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
||||||
import kotlin.reflect.KClass
|
|
||||||
|
|
||||||
public interface SynchronousStorageClient : Closeable {
|
public interface SynchronousStorageClient : Closeable {
|
||||||
public fun <T : Any> storeValue(value: T, storageKind: StorageKind, clazz: KClass<T>)
|
public fun <T : Any> storeValueInDeviceHub(value: T, serializer: KSerializer<T>)
|
||||||
|
|
||||||
|
public fun <T : Any> storeValueInMagixServer(value: T, serializer: KSerializer<T>)
|
||||||
|
|
||||||
public fun getPropertyHistory(sourceDeviceName: String, propertyName: String): List<PropertyChangedMessage>
|
public fun getPropertyHistory(sourceDeviceName: String, propertyName: String): List<PropertyChangedMessage>
|
||||||
}
|
}
|
||||||
|
|
||||||
public inline fun <reified T : Any> SynchronousStorageClient.storeValue(value: T, storageKind: StorageKind): Unit =
|
public inline fun <reified T : Any> SynchronousStorageClient.storeValueInDeviceHub(value: T): Unit =
|
||||||
storeValue(value, storageKind, T::class)
|
storeValueInDeviceHub(value, serializer())
|
||||||
|
|
||||||
|
public inline fun <reified T : Any> SynchronousStorageClient.storeValueInMagixServer(value: T): Unit =
|
||||||
|
storeValueInMagixServer(value, serializer())
|
||||||
|
@ -34,7 +34,7 @@ public fun DeviceManager.storeMessages(
|
|||||||
logger.debug { "Storage client created" }
|
logger.debug { "Storage client created" }
|
||||||
|
|
||||||
return hubMessageFlow(context).filter(filterCondition).onEach { message ->
|
return hubMessageFlow(context).filter(filterCondition).onEach { message ->
|
||||||
client.storeValue(message, StorageKind.DEVICE_HUB)
|
client.storeValueInDeviceHub(message)
|
||||||
}.launchIn(context).apply {
|
}.launchIn(context).apply {
|
||||||
invokeOnCompletion(onCancelling = true) {
|
invokeOnCompletion(onCancelling = true) {
|
||||||
client.close()
|
client.close()
|
||||||
|
@ -7,7 +7,6 @@ import kotlinx.coroutines.flow.MutableSharedFlow
|
|||||||
import kotlinx.coroutines.flow.filter
|
import kotlinx.coroutines.flow.filter
|
||||||
import kotlinx.coroutines.flow.onEach
|
import kotlinx.coroutines.flow.onEach
|
||||||
import kotlinx.coroutines.job
|
import kotlinx.coroutines.job
|
||||||
import ru.mipt.npm.controls.storage.synchronous.StorageKind
|
|
||||||
import ru.mipt.npm.magix.server.GenericMagixMessage
|
import ru.mipt.npm.magix.server.GenericMagixMessage
|
||||||
import space.kscience.dataforge.context.Factory
|
import space.kscience.dataforge.context.Factory
|
||||||
import space.kscience.dataforge.meta.Meta
|
import space.kscience.dataforge.meta.Meta
|
||||||
@ -21,7 +20,7 @@ internal fun Flow<GenericMagixMessage>.store(
|
|||||||
flowFilter: suspend (GenericMagixMessage) -> Boolean = { true },
|
flowFilter: suspend (GenericMagixMessage) -> Boolean = { true },
|
||||||
) {
|
) {
|
||||||
filter(flowFilter).onEach { message ->
|
filter(flowFilter).onEach { message ->
|
||||||
client.storeValue(message, StorageKind.MAGIX_SERVER)
|
client.storeValueInMagixServer(message)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -16,7 +16,7 @@ internal fun Flow<GenericMagixMessage>.store(
|
|||||||
flowFilter: suspend (GenericMagixMessage) -> Boolean = { true },
|
flowFilter: suspend (GenericMagixMessage) -> Boolean = { true },
|
||||||
) {
|
) {
|
||||||
filter(flowFilter).onEach { message ->
|
filter(flowFilter).onEach { message ->
|
||||||
client.storeValue(message, StorageKind.MAGIX_SERVER)
|
client.storeValueInMagixServer(message)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,26 +0,0 @@
|
|||||||
# Description
|
|
||||||
This module allows you to store [DeviceMessages](/controls-core/src/commonMain/kotlin/ru/mipt/npm/controls/api/DeviceMessage.kt)
|
|
||||||
from certain [DeviceManager](/controls-core/src/commonMain/kotlin/ru/mipt/npm/controls/controllers/DeviceManager.kt)
|
|
||||||
or [MagixMessages](magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixMessage.kt)
|
|
||||||
from [magix server](/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/server.kt)
|
|
||||||
in [xodus database](https://github.com/JetBrains/xodus).
|
|
||||||
|
|
||||||
# Usage
|
|
||||||
|
|
||||||
All usage examples can be found in [VirtualCarController](/demo/car/src/main/kotlin/ru/mipt/npm/controls/demo/car/VirtualCarController.kt).
|
|
||||||
|
|
||||||
## Storage from Device Manager
|
|
||||||
|
|
||||||
Just call connectMongo. For more details, you can see comments in [source code](/controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/connections.kt)
|
|
||||||
|
|
||||||
## Storage from Magix Server
|
|
||||||
|
|
||||||
Just pass such lambda as parameter to startMagixServer:
|
|
||||||
```kotlin
|
|
||||||
{ flow ->
|
|
||||||
// some code
|
|
||||||
storeInXodus(flow)
|
|
||||||
// some code
|
|
||||||
}
|
|
||||||
```
|
|
||||||
For more details, you can see comments in [source code](/controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/connections.kt)
|
|
@ -3,6 +3,7 @@ package ru.mipt.npm.controls.xodus
|
|||||||
import jetbrains.exodus.entitystore.PersistentEntityStore
|
import jetbrains.exodus.entitystore.PersistentEntityStore
|
||||||
import jetbrains.exodus.entitystore.PersistentEntityStores
|
import jetbrains.exodus.entitystore.PersistentEntityStores
|
||||||
import kotlinx.datetime.Instant
|
import kotlinx.datetime.Instant
|
||||||
|
import kotlinx.serialization.KSerializer
|
||||||
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
||||||
import ru.mipt.npm.controls.storage.synchronous.StorageKind
|
import ru.mipt.npm.controls.storage.synchronous.StorageKind
|
||||||
import ru.mipt.npm.controls.storage.synchronous.SynchronousStorageClient
|
import ru.mipt.npm.controls.storage.synchronous.SynchronousStorageClient
|
||||||
@ -23,13 +24,12 @@ private const val DEVICE_HUB_ENTITY_TYPE = "DeviceMessage"
|
|||||||
private const val MAGIX_SERVER_ENTITY_TYPE = "MagixMessage"
|
private const val MAGIX_SERVER_ENTITY_TYPE = "MagixMessage"
|
||||||
|
|
||||||
internal class SynchronousXodusClient(private val entityStore: PersistentEntityStore) : SynchronousStorageClient {
|
internal class SynchronousXodusClient(private val entityStore: PersistentEntityStore) : SynchronousStorageClient {
|
||||||
override fun <T : Any> storeValue(value: T, storageKind: StorageKind, clazz: KClass<T>) {
|
override fun <T : Any> storeValueInDeviceHub(value: T, serializer: KSerializer<T>) {
|
||||||
val entityType = when (storageKind) {
|
entityStore.encodeToEntity(value, DEVICE_HUB_ENTITY_TYPE, serializer)
|
||||||
StorageKind.DEVICE_HUB -> DEVICE_HUB_ENTITY_TYPE
|
|
||||||
StorageKind.MAGIX_SERVER -> MAGIX_SERVER_ENTITY_TYPE
|
|
||||||
}
|
}
|
||||||
|
|
||||||
entityStore.encodeToEntity(value, entityType, clazz)
|
override fun <T : Any> storeValueInMagixServer(value: T, serializer: KSerializer<T>) {
|
||||||
|
entityStore.encodeToEntity(value, MAGIX_SERVER_ENTITY_TYPE, serializer)
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun getPropertyHistory(
|
override fun getPropertyHistory(
|
||||||
|
@ -5,6 +5,7 @@ import jetbrains.exodus.entitystore.EntityId
|
|||||||
import jetbrains.exodus.entitystore.PersistentEntityStore
|
import jetbrains.exodus.entitystore.PersistentEntityStore
|
||||||
import jetbrains.exodus.entitystore.StoreTransaction
|
import jetbrains.exodus.entitystore.StoreTransaction
|
||||||
import kotlinx.serialization.InternalSerializationApi
|
import kotlinx.serialization.InternalSerializationApi
|
||||||
|
import kotlinx.serialization.KSerializer
|
||||||
import kotlinx.serialization.SerializationStrategy
|
import kotlinx.serialization.SerializationStrategy
|
||||||
import kotlinx.serialization.json.*
|
import kotlinx.serialization.json.*
|
||||||
import kotlinx.serialization.serializer
|
import kotlinx.serialization.serializer
|
||||||
@ -73,5 +74,5 @@ public inline fun <reified T> PersistentEntityStore.encodeToEntity(value: T, ent
|
|||||||
encodeToEntity(serializer(), value, entityType)
|
encodeToEntity(serializer(), value, entityType)
|
||||||
|
|
||||||
@OptIn(InternalSerializationApi::class)
|
@OptIn(InternalSerializationApi::class)
|
||||||
public fun <T : Any> PersistentEntityStore.encodeToEntity(value: T, entityType: String, clazz: KClass<T>): EntityId = encodeToEntity(
|
public fun <T : Any> PersistentEntityStore.encodeToEntity(value: T, entityType: String, serializer: KSerializer<T>): EntityId =
|
||||||
clazz.serializer(), value, entityType)
|
encodeToEntity(serializer, value, entityType)
|
||||||
|
Loading…
Reference in New Issue
Block a user