Refactor xodus storage for local history

This commit is contained in:
Alexander Nozik 2022-06-02 09:21:07 +03:00
parent b6f3769529
commit eb7507191e
No known key found for this signature in database
GPG Key ID: F7FCF2DD25C71357
4 changed files with 122 additions and 92 deletions

View File

@ -1,46 +1,46 @@
package ru.mipt.npm.controls.storage
import io.ktor.server.application.Application
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.job
import ru.mipt.npm.magix.server.GenericMagixMessage
import space.kscience.dataforge.context.Factory
import space.kscience.dataforge.meta.Meta
/**
* Asynchronous version of synchronous API, so for more details check relative docs
*/
internal fun Flow<GenericMagixMessage>.store(
client: EventStorage,
flowFilter: suspend (GenericMagixMessage) -> Boolean = { true },
) {
filter(flowFilter).onEach { message ->
client.storeMagixMessage(message)
}
}
/** Begin to store MagixMessages from certain flow
* @param flow flow of messages which we will store
* @param meta Meta which may have some configuration parameters for our storage and will be used in invoke method of factory
* @param factory factory that will be used for creating persistent entity store instance. DefaultPersistentStoreFactory by default.
* @param flowFilter allow you to specify messages which we want to store. Always true by default.
*/
@OptIn(InternalCoroutinesApi::class)
public fun Application.store(
flow: MutableSharedFlow<GenericMagixMessage>,
factory: Factory<EventStorage>,
meta: Meta = Meta.EMPTY,
flowFilter: suspend (GenericMagixMessage) -> Boolean = { true },
) {
val client = factory(meta)
flow.store(client, flowFilter)
coroutineContext.job.invokeOnCompletion(onCancelling = true) {
client.close()
}
}
//package ru.mipt.npm.controls.storage
//
//import io.ktor.server.application.Application
//import kotlinx.coroutines.InternalCoroutinesApi
//import kotlinx.coroutines.flow.Flow
//import kotlinx.coroutines.flow.MutableSharedFlow
//import kotlinx.coroutines.flow.filter
//import kotlinx.coroutines.flow.onEach
//import kotlinx.coroutines.job
//import ru.mipt.npm.magix.server.GenericMagixMessage
//import space.kscience.dataforge.context.Factory
//import space.kscience.dataforge.meta.Meta
//
///**
// * Asynchronous version of synchronous API, so for more details check relative docs
// */
//
//internal fun Flow<GenericMagixMessage>.store(
// client: EventStorage,
// flowFilter: suspend (GenericMagixMessage) -> Boolean = { true },
//) {
// filter(flowFilter).onEach { message ->
// client.storeMagixMessage(message)
// }
//}
//
///** Begin to store MagixMessages from certain flow
// * @param flow flow of messages which we will store
// * @param meta Meta which may have some configuration parameters for our storage and will be used in invoke method of factory
// * @param factory factory that will be used for creating persistent entity store instance. DefaultPersistentStoreFactory by default.
// * @param flowFilter allow you to specify messages which we want to store. Always true by default.
// */
//@OptIn(InternalCoroutinesApi::class)
//public fun Application.store(
// flow: MutableSharedFlow<GenericMagixMessage>,
// factory: Factory<EventStorage>,
// meta: Meta = Meta.EMPTY,
// flowFilter: suspend (GenericMagixMessage) -> Boolean = { true },
//) {
// val client = factory(meta)
//
// flow.store(client, flowFilter)
// coroutineContext.job.invokeOnCompletion(onCancelling = true) {
// client.close()
// }
//}

View File

@ -27,7 +27,7 @@ import space.kscience.dataforge.names.matches
import space.kscience.dataforge.names.parseAsName
internal fun StoreTransaction.writeMessage(message: DeviceMessage): Entity {
internal fun StoreTransaction.writeMessage(message: DeviceMessage): Unit {
val entity: Entity = newEntity(XodusDeviceMessageStorage.DEVICE_MESSAGE_ENTITY_TYPE)
val json = Json.encodeToJsonElement(DeviceMessage.serializer(), message).jsonObject
val type = json["type"]?.jsonPrimitive?.content ?: error("Message json representation must have type.")
@ -43,8 +43,6 @@ internal fun StoreTransaction.writeMessage(message: DeviceMessage): Entity {
entity.setProperty(DeviceMessage::targetDevice.name, it.toString())
}
entity.setBlobString("json", Json.encodeToString(json))
return entity
}
@ -65,15 +63,16 @@ public class XodusDeviceMessageStorage(
) : DeviceMessageStorage, AutoCloseable {
override suspend fun write(event: DeviceMessage) {
//entityStore.encodeToEntity(event, DEVICE_MESSAGE_ENTITY_TYPE, DeviceMessage.serializer())
entityStore.computeInTransaction { txn ->
entityStore.executeInTransaction { txn ->
txn.writeMessage(event)
}
}
override suspend fun readAll(): List<DeviceMessage> = entityStore.computeInTransaction { transaction ->
transaction.getAll(
override suspend fun readAll(): List<DeviceMessage> = entityStore.computeInReadonlyTransaction { transaction ->
transaction.sort(
DEVICE_MESSAGE_ENTITY_TYPE,
DeviceMessage::time.name,
true
).map {
Json.decodeFromString(
DeviceMessage.serializer(),
@ -87,22 +86,21 @@ public class XodusDeviceMessageStorage(
range: ClosedRange<Instant>?,
sourceDevice: Name?,
targetDevice: Name?,
): List<DeviceMessage> = entityStore.computeInTransaction { transaction ->
): List<DeviceMessage> = entityStore.computeInReadonlyTransaction { transaction ->
transaction.find(
DEVICE_MESSAGE_ENTITY_TYPE,
"type",
eventType
).mapNotNull {
if (it.timeInRange(range) &&
it.propertyMatchesName(DeviceMessage::sourceDevice.name, sourceDevice) &&
it.propertyMatchesName(DeviceMessage::targetDevice.name, targetDevice)
) {
Json.decodeFromString(
DeviceMessage.serializer(),
it.getBlobString("json") ?: error("No json content found")
)
} else null
}
).asSequence().filter {
it.timeInRange(range) &&
it.propertyMatchesName(DeviceMessage::sourceDevice.name, sourceDevice) &&
it.propertyMatchesName(DeviceMessage::targetDevice.name, targetDevice)
}.map {
Json.decodeFromString(
DeviceMessage.serializer(),
it.getBlobString("json") ?: error("No json content found")
)
}.sortedBy { it.time }.toList()
}
override fun close() {
@ -110,7 +108,7 @@ public class XodusDeviceMessageStorage(
}
public companion object : Factory<XodusDeviceMessageStorage> {
internal const val DEVICE_MESSAGE_ENTITY_TYPE = "DeviceMessage"
internal const val DEVICE_MESSAGE_ENTITY_TYPE = "controls-kt.message"
public val XODUS_STORE_PROPERTY: Name = Name.of("xodus", "storagePath")

View File

@ -6,20 +6,19 @@ import org.junit.jupiter.api.AfterAll
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Test
import ru.mipt.npm.controls.api.DeviceMessage
import ru.mipt.npm.controls.api.PropertyChangedMessage
import ru.mipt.npm.controls.storage.getPropertyHistory
import ru.mipt.npm.controls.xodus.XODUS_STORE_PROPERTY
import ru.mipt.npm.controls.xodus.XodusEventStorage
import ru.mipt.npm.xodus.serialization.json.encodeToEntity
import ru.mipt.npm.controls.xodus.XodusDeviceMessageStorage
import ru.mipt.npm.controls.xodus.query
import ru.mipt.npm.controls.xodus.writeMessage
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
import java.io.File
import space.kscience.dataforge.names.asName
import java.nio.file.Files
internal class PropertyHistoryTest {
companion object {
private val storeName = ".property_history_test"
private val entityStore = PersistentEntityStores.newInstance(storeName)
val storeFile = Files.createTempDirectory("controls-xodus").toFile()
private val propertyChangedMessages = listOf(
PropertyChangedMessage(
@ -45,28 +44,34 @@ internal class PropertyHistoryTest {
@BeforeAll
@JvmStatic
fun createEntities() {
propertyChangedMessages.forEach {
entityStore.encodeToEntity<DeviceMessage>(it, "DeviceMessage")
PersistentEntityStores.newInstance(storeFile).use {
it.executeInTransaction { transaction ->
propertyChangedMessages.forEach { message ->
transaction.writeMessage(message)
}
}
}
entityStore.close()
}
@AfterAll
@JvmStatic
fun deleteDatabase() {
File(storeName).deleteRecursively()
storeFile.deleteRecursively()
}
}
@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun getPropertyHistoryTest() = runTest {
assertEquals(
listOf(propertyChangedMessages[0]),
getPropertyHistory(
"virtual-car", "speed", XodusEventStorage, Meta {
XODUS_STORE_PROPERTY put storeName
})
)
PersistentEntityStores.newInstance(storeFile).use { entityStore ->
XodusDeviceMessageStorage(entityStore).use { storage ->
assertEquals(
propertyChangedMessages[0],
storage.query<PropertyChangedMessage>(
sourceDevice = "virtual-car".asName()
).first { it.property == "speed" }
)
}
}
}
}

View File

@ -1,25 +1,52 @@
package ru.mipt.npm.magix.storage.xodus
import jetbrains.exodus.entitystore.PersistentEntityStore
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.JsonElement
import ru.mipt.npm.magix.api.MagixEndpoint
import ru.mipt.npm.magix.api.MagixMessage
import ru.mipt.npm.magix.api.MagixMessageFilter
import java.nio.file.Path
public class XodusMagixStorage(
private val scope: CoroutineScope,
private val path: Path,
private val endpoint: MagixEndpoint<JsonElement>,
private val filter: MagixMessageFilter = MagixMessageFilter(),
scope: CoroutineScope,
private val store: PersistentEntityStore,
endpoint: MagixEndpoint<JsonElement>,
filter: MagixMessageFilter = MagixMessageFilter(),
) : AutoCloseable {
private val subscriptionJob = endpoint.subscribe(filter).onEach {
TODO()
//TODO consider message buffering
private val subscriptionJob = endpoint.subscribe(filter).onEach { message ->
store.executeInTransaction { transaction ->
transaction.newEntity(MAGIC_MESSAGE_ENTITY_TYPE).apply {
setProperty(MagixMessage<*>::origin.name, message.origin)
setProperty(MagixMessage<*>::format.name, message.format)
setBlobString(MagixMessage<*>::payload.name, MagixEndpoint.magixJson.encodeToString(message.payload))
message.target?.let {
setProperty(MagixMessage<*>::target.name, it)
}
message.id?.let {
setProperty(MagixMessage<*>::id.name, it)
}
message.parentId?.let {
setProperty(MagixMessage<*>::parentId.name, it)
}
message.user?.let {
setBlobString(MagixMessage<*>::user.name, MagixEndpoint.magixJson.encodeToString(it))
}
}
}
}.launchIn(scope)
override fun close() {
subscriptionJob.cancel()
}
public companion object {
public const val MAGIC_MESSAGE_ENTITY_TYPE: String = "magix.message"
}
}