Add connection to xodus from magix server

This commit is contained in:
Atos1337 2021-11-26 17:26:41 +03:00
parent 0b14c7ed7f
commit 3639783b4e
4 changed files with 63 additions and 30 deletions

View File

@ -9,6 +9,7 @@ dependencies {
implementation(projects.controlsCore)
implementation(projects.magix.magixApi)
implementation(projects.controlsMagixClient)
implementation(projects.magix.magixServer)
implementation("org.jetbrains.xodus:xodus-entity-store:$xodusVersion")
implementation("org.jetbrains.xodus:xodus-environment:$xodusVersion")
implementation("org.jetbrains.xodus:xodus-vfs:$xodusVersion")

View File

@ -0,0 +1,48 @@
package ru.mipt.npm.controls.xodus
import io.ktor.application.*
import io.ktor.server.engine.*
import jetbrains.exodus.entitystore.PersistentEntityStore
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import ru.mipt.npm.controls.api.DeviceMessage
import ru.mipt.npm.controls.api.PropertyChangedMessage
import ru.mipt.npm.controls.controllers.DeviceManager
import ru.mipt.npm.controls.controllers.hubMessageFlow
import ru.mipt.npm.magix.api.MagixEndpoint
import ru.mipt.npm.magix.server.GenericMagixMessage
import ru.mipt.npm.magix.server.startMagixServer
public fun DeviceManager.connectXodus(
entityStore: PersistentEntityStore,
filterCondition: suspend (DeviceMessage) -> Boolean = { it is PropertyChangedMessage }
): Job = hubMessageFlow(context).filter(filterCondition).onEach { message ->
entityStore.executeInTransaction {
(message as PropertyChangedMessage).toEntity(it)
}
}.launchIn(context)
public fun CoroutineScope.startMagixServer(
entityStore: PersistentEntityStore,
flowFilter: suspend (GenericMagixMessage) -> Boolean = { true },
port: Int = MagixEndpoint.DEFAULT_MAGIX_HTTP_PORT,
buffer: Int = 100,
enableRawRSocket: Boolean = true,
enableZmq: Boolean = true,
applicationConfiguration: Application.(MutableSharedFlow<GenericMagixMessage>) -> Unit = {},
): ApplicationEngine = startMagixServer(
port, buffer, enableRawRSocket, enableZmq
) { flow ->
applicationConfiguration(flow)
flow.filter(flowFilter).onEach { message ->
entityStore.executeInTransaction { txn ->
val entity = txn.newEntity("MagixMessage")
entity.setProperty("value", message.toString())
}
}
}

View File

@ -1,21 +0,0 @@
package ru.mipt.npm.controls.xodus
import jetbrains.exodus.entitystore.PersistentEntityStore
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import ru.mipt.npm.controls.api.DeviceMessage
import ru.mipt.npm.controls.api.PropertyChangedMessage
import ru.mipt.npm.controls.controllers.DeviceManager
import ru.mipt.npm.controls.controllers.hubMessageFlow
public fun DeviceManager.connectXodus(
entityStore: PersistentEntityStore,
filterCondition: suspend (DeviceMessage) -> Boolean = { it is PropertyChangedMessage }
): Job = hubMessageFlow(context).filter(filterCondition).onEach { message ->
entityStore.executeInTransaction {
(message as PropertyChangedMessage).toEntity(it)
}
}.launchIn(context)

View File

@ -18,7 +18,7 @@ import ru.mipt.npm.controls.demo.car.IVirtualCar.Companion.acceleration
import ru.mipt.npm.controls.xodus.connectXodus
import ru.mipt.npm.magix.api.MagixEndpoint
import ru.mipt.npm.magix.rsocket.rSocketWithTcp
import ru.mipt.npm.magix.server.startMagixServer
import ru.mipt.npm.controls.xodus.startMagixServer
import space.kscience.dataforge.context.*
import tornadofx.*
@ -27,7 +27,8 @@ class VirtualCarController : Controller(), ContextAware {
var virtualCar: VirtualCar? = null
var magixVirtualCar: MagixVirtualCar? = null
var magixServer: ApplicationEngine? = null
var entityStore: PersistentEntityStore? = null
var deviceEntityStore: PersistentEntityStore? = null
var magixEntityStore: PersistentEntityStore? = null
var storageJob: Job? =null
override val context = Context("demoDevice") {
@ -40,12 +41,14 @@ class VirtualCarController : Controller(), ContextAware {
context.launch {
virtualCar = deviceManager.install("virtual-car", VirtualCar)
//starting magix event loop
magixServer = startMagixServer(enableRawRSocket = true, enableZmq = true)
//starting magix event loop and connect it to entity store
magixEntityStore = PersistentEntityStores.newInstance("/home/marvel1337/2021/SCADA/.server_messages")
magixServer = startMagixServer(
entityStore = magixEntityStore as PersistentEntityStore, enableRawRSocket = true, enableZmq = true)
magixVirtualCar = deviceManager.install("magix-virtual-car", MagixVirtualCar)
entityStore = PersistentEntityStores.newInstance("/home/marvel1337/2021/SCADA/.messages")
//connect to entity store
storageJob = deviceManager.connectXodus(entityStore as PersistentEntityStore)
deviceEntityStore = PersistentEntityStores.newInstance("/home/marvel1337/2021/SCADA/.messages")
//connect to device entity store
storageJob = deviceManager.connectXodus(deviceEntityStore as PersistentEntityStore)
//Launch device client and connect it to the server
val deviceEndpoint = MagixEndpoint.rSocketWithTcp("localhost", DeviceMessage.serializer())
deviceManager.connectToMagix(deviceEndpoint)
@ -60,8 +63,10 @@ class VirtualCarController : Controller(), ContextAware {
logger.info { "Magix virtual car server stopped" }
virtualCar?.close()
logger.info { "Virtual car server stopped" }
entityStore?.close()
logger.info { "Entity store closed" }
deviceEntityStore?.close()
logger.info { "Device entity store closed" }
magixEntityStore?.close()
logger.info { "Magix entity store closed" }
context.close()
}
}