Dev #8
@ -10,6 +10,7 @@ dependencies {
|
|||||||
implementation(projects.magix.magixApi)
|
implementation(projects.magix.magixApi)
|
||||||
implementation(projects.controlsMagixClient)
|
implementation(projects.controlsMagixClient)
|
||||||
implementation(projects.magix.magixServer)
|
implementation(projects.magix.magixServer)
|
||||||
|
implementation(projects.xodusSerialization)
|
||||||
implementation("org.jetbrains.xodus:xodus-entity-store:$xodusVersion")
|
implementation("org.jetbrains.xodus:xodus-entity-store:$xodusVersion")
|
||||||
implementation("org.jetbrains.xodus:xodus-environment:$xodusVersion")
|
implementation("org.jetbrains.xodus:xodus-environment:$xodusVersion")
|
||||||
implementation("org.jetbrains.xodus:xodus-vfs:$xodusVersion")
|
implementation("org.jetbrains.xodus:xodus-vfs:$xodusVersion")
|
||||||
|
@ -12,6 +12,7 @@ import ru.mipt.npm.controls.api.PropertyChangedMessage
|
|||||||
import ru.mipt.npm.controls.controllers.DeviceManager
|
import ru.mipt.npm.controls.controllers.DeviceManager
|
||||||
import ru.mipt.npm.controls.controllers.hubMessageFlow
|
import ru.mipt.npm.controls.controllers.hubMessageFlow
|
||||||
import ru.mipt.npm.magix.server.GenericMagixMessage
|
import ru.mipt.npm.magix.server.GenericMagixMessage
|
||||||
|
import ru.mipt.npm.xodus.serialization.json.encodeToEntity
|
||||||
import space.kscience.dataforge.context.Context
|
import space.kscience.dataforge.context.Context
|
||||||
import space.kscience.dataforge.context.Factory
|
import space.kscience.dataforge.context.Factory
|
||||||
import space.kscience.dataforge.context.debug
|
import space.kscience.dataforge.context.debug
|
||||||
@ -36,15 +37,13 @@ public object EntityStoreFactory : Factory<PersistentEntityStore> {
|
|||||||
@OptIn(InternalCoroutinesApi::class)
|
@OptIn(InternalCoroutinesApi::class)
|
||||||
public fun DeviceManager.connectXodus(
|
public fun DeviceManager.connectXodus(
|
||||||
factory: Factory<PersistentEntityStore>,
|
factory: Factory<PersistentEntityStore>,
|
||||||
filterCondition: suspend (DeviceMessage) -> Boolean = { it is PropertyChangedMessage }
|
filterCondition: suspend (DeviceMessage) -> Boolean = { true }
|
||||||
): Job {
|
): Job {
|
||||||
val entityStore = factory.invoke(meta, context)
|
val entityStore = factory.invoke(meta, context)
|
||||||
logger.debug { "Device entity store opened" }
|
logger.debug { "Device entity store opened" }
|
||||||
|
|
||||||
return hubMessageFlow(context).filter(filterCondition).onEach { message ->
|
return hubMessageFlow(context).filter(filterCondition).onEach { message ->
|
||||||
entityStore.executeInTransaction {
|
entityStore.encodeToEntity(message, "DeviceMessage")
|
||||||
(message as PropertyChangedMessage).toEntity(it)
|
|
||||||
}
|
|
||||||
}.launchIn(context).apply {
|
}.launchIn(context).apply {
|
||||||
invokeOnCompletion(onCancelling = true) {
|
invokeOnCompletion(onCancelling = true) {
|
||||||
entityStore.close()
|
entityStore.close()
|
||||||
@ -78,10 +77,7 @@ public fun SharedFlow<GenericMagixMessage>.storeInXodus(
|
|||||||
flowFilter: suspend (GenericMagixMessage) -> Boolean = { true },
|
flowFilter: suspend (GenericMagixMessage) -> Boolean = { true },
|
||||||
){
|
){
|
||||||
filter(flowFilter).onEach { message ->
|
filter(flowFilter).onEach { message ->
|
||||||
entityStore.executeInTransaction { txn ->
|
entityStore.encodeToEntity(message, "MagixMessage")
|
||||||
val entity = txn.newEntity("MagixMessage")
|
|
||||||
entity.setProperty("value", message.toString())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user