Architecture rework
This commit is contained in:
parent
312cd06706
commit
5da1559882
@ -23,11 +23,9 @@ import space.kscience.dataforge.meta.string
|
|||||||
public object MongoClientFactory : Factory<CoroutineClient> {
|
public object MongoClientFactory : Factory<CoroutineClient> {
|
||||||
public const val connectionString: String = "mongodb://mongoadmin:secret@localhost:27888"
|
public const val connectionString: String = "mongodb://mongoadmin:secret@localhost:27888"
|
||||||
|
|
||||||
override fun invoke(meta: Meta, context: Context): CoroutineClient {
|
override fun invoke(meta: Meta, context: Context): CoroutineClient = meta["connectionString"]?.string?.let {
|
||||||
return meta["connectionString"]?.string?.let {
|
|
||||||
KMongo.createClient(it).coroutine
|
KMongo.createClient(it).coroutine
|
||||||
} ?: KMongo.createClient(connectionString).coroutine
|
} ?: KMongo.createClient(connectionString).coroutine
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public fun DeviceManager.connectMongo(
|
public fun DeviceManager.connectMongo(
|
||||||
@ -40,4 +38,8 @@ public fun DeviceManager.connectMongo(
|
|||||||
.getCollection<DeviceMessage>()
|
.getCollection<DeviceMessage>()
|
||||||
.insertOne(Json.encodeToString(message))
|
.insertOne(Json.encodeToString(message))
|
||||||
}
|
}
|
||||||
}.launchIn(context)
|
}.launchIn(context).apply {
|
||||||
|
invokeOnCompletion {
|
||||||
|
client.close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -1,11 +1,8 @@
|
|||||||
package ru.mipt.npm.controls.xodus
|
package ru.mipt.npm.controls.xodus
|
||||||
|
|
||||||
import io.ktor.application.*
|
|
||||||
import io.ktor.server.engine.*
|
|
||||||
import jetbrains.exodus.entitystore.PersistentEntityStore
|
import jetbrains.exodus.entitystore.PersistentEntityStore
|
||||||
import kotlinx.coroutines.CoroutineScope
|
|
||||||
import kotlinx.coroutines.Job
|
import kotlinx.coroutines.Job
|
||||||
import kotlinx.coroutines.flow.MutableSharedFlow
|
import kotlinx.coroutines.flow.SharedFlow
|
||||||
import kotlinx.coroutines.flow.filter
|
import kotlinx.coroutines.flow.filter
|
||||||
import kotlinx.coroutines.flow.launchIn
|
import kotlinx.coroutines.flow.launchIn
|
||||||
import kotlinx.coroutines.flow.onEach
|
import kotlinx.coroutines.flow.onEach
|
||||||
@ -13,9 +10,7 @@ import ru.mipt.npm.controls.api.DeviceMessage
|
|||||||
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
||||||
import ru.mipt.npm.controls.controllers.DeviceManager
|
import ru.mipt.npm.controls.controllers.DeviceManager
|
||||||
import ru.mipt.npm.controls.controllers.hubMessageFlow
|
import ru.mipt.npm.controls.controllers.hubMessageFlow
|
||||||
import ru.mipt.npm.magix.api.MagixEndpoint
|
|
||||||
import ru.mipt.npm.magix.server.GenericMagixMessage
|
import ru.mipt.npm.magix.server.GenericMagixMessage
|
||||||
import ru.mipt.npm.magix.server.startMagixServer
|
|
||||||
|
|
||||||
|
|
||||||
public fun DeviceManager.connectXodus(
|
public fun DeviceManager.connectXodus(
|
||||||
@ -27,19 +22,31 @@ public fun DeviceManager.connectXodus(
|
|||||||
}
|
}
|
||||||
}.launchIn(context)
|
}.launchIn(context)
|
||||||
|
|
||||||
public fun CoroutineScope.startMagixServer(
|
//public fun CoroutineScope.startMagixServer(
|
||||||
|
// entityStore: PersistentEntityStore,
|
||||||
|
// flowFilter: suspend (GenericMagixMessage) -> Boolean = { true },
|
||||||
|
// port: Int = MagixEndpoint.DEFAULT_MAGIX_HTTP_PORT,
|
||||||
|
// buffer: Int = 100,
|
||||||
|
// enableRawRSocket: Boolean = true,
|
||||||
|
// enableZmq: Boolean = true,
|
||||||
|
// applicationConfiguration: Application.(MutableSharedFlow<GenericMagixMessage>) -> Unit = {},
|
||||||
|
//): ApplicationEngine = startMagixServer(
|
||||||
|
// port, buffer, enableRawRSocket, enableZmq
|
||||||
|
//) { flow ->
|
||||||
|
// applicationConfiguration(flow)
|
||||||
|
// flow.filter(flowFilter).onEach { message ->
|
||||||
|
// entityStore.executeInTransaction { txn ->
|
||||||
|
// val entity = txn.newEntity("MagixMessage")
|
||||||
|
// entity.setProperty("value", message.toString())
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
//}
|
||||||
|
|
||||||
|
public fun SharedFlow<GenericMagixMessage>.storeInXodus(
|
||||||
entityStore: PersistentEntityStore,
|
entityStore: PersistentEntityStore,
|
||||||
flowFilter: suspend (GenericMagixMessage) -> Boolean = { true },
|
flowFilter: suspend (GenericMagixMessage) -> Boolean = { true },
|
||||||
port: Int = MagixEndpoint.DEFAULT_MAGIX_HTTP_PORT,
|
){
|
||||||
buffer: Int = 100,
|
filter(flowFilter).onEach { message ->
|
||||||
enableRawRSocket: Boolean = true,
|
|
||||||
enableZmq: Boolean = true,
|
|
||||||
applicationConfiguration: Application.(MutableSharedFlow<GenericMagixMessage>) -> Unit = {},
|
|
||||||
): ApplicationEngine = startMagixServer(
|
|
||||||
port, buffer, enableRawRSocket, enableZmq
|
|
||||||
) { flow ->
|
|
||||||
applicationConfiguration(flow)
|
|
||||||
flow.filter(flowFilter).onEach { message ->
|
|
||||||
entityStore.executeInTransaction { txn ->
|
entityStore.executeInTransaction { txn ->
|
||||||
val entity = txn.newEntity("MagixMessage")
|
val entity = txn.newEntity("MagixMessage")
|
||||||
entity.setProperty("value", message.toString())
|
entity.setProperty("value", message.toString())
|
||||||
|
@ -5,11 +5,14 @@ import kotlinx.datetime.Instant
|
|||||||
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
||||||
import ru.mipt.npm.controls.xodus.toPropertyChangedMessage
|
import ru.mipt.npm.controls.xodus.toPropertyChangedMessage
|
||||||
|
|
||||||
public fun StoreTransaction.fromTo(from: Instant, to: Instant): List<PropertyChangedMessage> {
|
//selectDeviceMessagesInRange
|
||||||
return find(
|
public fun StoreTransaction.fromTo(
|
||||||
|
range: ClosedRange<Instant>,
|
||||||
|
// from: Instant,
|
||||||
|
// to: Instant,
|
||||||
|
): List<PropertyChangedMessage> = find(
|
||||||
"PropertyChangedMessage",
|
"PropertyChangedMessage",
|
||||||
"time",
|
"time",
|
||||||
from.toEpochMilliseconds(),
|
range.start.toEpochMilliseconds(),
|
||||||
to.toEpochMilliseconds()
|
range.endInclusive.toEpochMilliseconds()
|
||||||
).map { it -> it.toPropertyChangedMessage() }.filterNotNull()
|
).mapNotNull { it.toPropertyChangedMessage() }
|
||||||
}
|
|
||||||
|
@ -55,7 +55,7 @@ internal class QueriesTest {
|
|||||||
@Test
|
@Test
|
||||||
fun testFromTo() {
|
fun testFromTo() {
|
||||||
assertEquals(propertyChangedMessages.subList(0, 2).toSet(), entityStore.computeInReadonlyTransaction {
|
assertEquals(propertyChangedMessages.subList(0, 2).toSet(), entityStore.computeInReadonlyTransaction {
|
||||||
it.fromTo(Instant.fromEpochMilliseconds(1000), Instant.fromEpochMilliseconds(1500))
|
it.fromTo( Instant.fromEpochMilliseconds(1000)..Instant.fromEpochMilliseconds(1500))
|
||||||
}.toSet())
|
}.toSet())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -19,9 +19,10 @@ import ru.mipt.npm.controls.demo.car.IVirtualCar.Companion.acceleration
|
|||||||
import ru.mipt.npm.controls.mongo.MongoClientFactory
|
import ru.mipt.npm.controls.mongo.MongoClientFactory
|
||||||
import ru.mipt.npm.controls.mongo.connectMongo
|
import ru.mipt.npm.controls.mongo.connectMongo
|
||||||
import ru.mipt.npm.controls.xodus.connectXodus
|
import ru.mipt.npm.controls.xodus.connectXodus
|
||||||
|
import ru.mipt.npm.controls.xodus.storeInXodus
|
||||||
import ru.mipt.npm.magix.api.MagixEndpoint
|
import ru.mipt.npm.magix.api.MagixEndpoint
|
||||||
import ru.mipt.npm.magix.rsocket.rSocketWithTcp
|
import ru.mipt.npm.magix.rsocket.rSocketWithTcp
|
||||||
import ru.mipt.npm.controls.xodus.startMagixServer
|
import ru.mipt.npm.magix.server.startMagixServer
|
||||||
import space.kscience.dataforge.context.*
|
import space.kscience.dataforge.context.*
|
||||||
import space.kscience.dataforge.meta.Meta
|
import space.kscience.dataforge.meta.Meta
|
||||||
import tornadofx.*
|
import tornadofx.*
|
||||||
@ -49,15 +50,17 @@ class VirtualCarController : Controller(), ContextAware {
|
|||||||
|
|
||||||
//starting magix event loop and connect it to entity store
|
//starting magix event loop and connect it to entity store
|
||||||
magixEntityStore = PersistentEntityStores.newInstance("/home/marvel1337/2021/SCADA/.server_messages")
|
magixEntityStore = PersistentEntityStores.newInstance("/home/marvel1337/2021/SCADA/.server_messages")
|
||||||
magixServer = startMagixServer(
|
magixServer = startMagixServer(enableRawRSocket = true, enableZmq = true) { flow ->
|
||||||
entityStore = magixEntityStore as PersistentEntityStore, enableRawRSocket = true, enableZmq = true)
|
flow.storeInXodus(magixEntityStore as PersistentEntityStore)
|
||||||
|
}
|
||||||
magixVirtualCar = deviceManager.install("magix-virtual-car", MagixVirtualCar)
|
magixVirtualCar = deviceManager.install("magix-virtual-car", MagixVirtualCar)
|
||||||
deviceEntityStore = PersistentEntityStores.newInstance("/home/marvel1337/2021/SCADA/.messages")
|
deviceEntityStore = PersistentEntityStores.newInstance("/home/marvel1337/2021/SCADA/.messages")
|
||||||
//connect to device entity store
|
//connect to device entity store
|
||||||
xodusStorageJob = deviceManager.connectXodus(deviceEntityStore as PersistentEntityStore)
|
xodusStorageJob = deviceManager.connectXodus(deviceEntityStore as PersistentEntityStore)
|
||||||
//Create mongo client and connect to MongoDB
|
//Create mongo client and connect to MongoDB
|
||||||
mongoClient = MongoClientFactory.invoke(meta = Meta.EMPTY, context)
|
val mongoClient = MongoClientFactory.invoke(meta = Meta.EMPTY, context)
|
||||||
mongoStorageJob = deviceManager.connectMongo(mongoClient as CoroutineClient)
|
mongoStorageJob = deviceManager.connectMongo(mongoClient)
|
||||||
|
this@VirtualCarController.mongoClient = mongoClient
|
||||||
//Launch device client and connect it to the server
|
//Launch device client and connect it to the server
|
||||||
val deviceEndpoint = MagixEndpoint.rSocketWithTcp("localhost", DeviceMessage.serializer())
|
val deviceEndpoint = MagixEndpoint.rSocketWithTcp("localhost", DeviceMessage.serializer())
|
||||||
deviceManager.connectToMagix(deviceEndpoint)
|
deviceManager.connectToMagix(deviceEndpoint)
|
||||||
|
@ -19,6 +19,7 @@ kscience{
|
|||||||
}
|
}
|
||||||
|
|
||||||
val ktorVersion: String by rootProject.extra
|
val ktorVersion: String by rootProject.extra
|
||||||
|
val dataforgeVersion: String by extra
|
||||||
|
|
||||||
dependencies {
|
dependencies {
|
||||||
implementation(project(":controls-tcp"))
|
implementation(project(":controls-tcp"))
|
||||||
|
@ -16,7 +16,7 @@ import ru.mipt.npm.controls.controllers.installing
|
|||||||
import ru.mipt.npm.devices.pimotionmaster.PiMotionMasterDevice.Axis.Companion.maxPosition
|
import ru.mipt.npm.devices.pimotionmaster.PiMotionMasterDevice.Axis.Companion.maxPosition
|
||||||
import ru.mipt.npm.devices.pimotionmaster.PiMotionMasterDevice.Axis.Companion.minPosition
|
import ru.mipt.npm.devices.pimotionmaster.PiMotionMasterDevice.Axis.Companion.minPosition
|
||||||
import ru.mipt.npm.devices.pimotionmaster.PiMotionMasterDevice.Axis.Companion.position
|
import ru.mipt.npm.devices.pimotionmaster.PiMotionMasterDevice.Axis.Companion.position
|
||||||
import space.kscience.dataforge.context.Global
|
import space.kscience.dataforge.context.Context
|
||||||
import space.kscience.dataforge.context.fetch
|
import space.kscience.dataforge.context.fetch
|
||||||
import tornadofx.*
|
import tornadofx.*
|
||||||
|
|
||||||
@ -24,7 +24,7 @@ class PiMotionMasterApp : App(PiMotionMasterView::class)
|
|||||||
|
|
||||||
class PiMotionMasterController : Controller() {
|
class PiMotionMasterController : Controller() {
|
||||||
//initialize context
|
//initialize context
|
||||||
val context = Global.buildContext("piMotionMaster"){
|
val context = Context("piMotionMaster"){
|
||||||
plugin(DeviceManager)
|
plugin(DeviceManager)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user