Dev #8
@ -1,6 +1,6 @@
|
||||
plugins {
|
||||
kotlin("jvm")
|
||||
id("org.openjfx.javafxplugin")
|
||||
id("org.openjfx.javafxplugin") version "0.0.10"
|
||||
application
|
||||
}
|
||||
|
||||
@ -19,9 +19,9 @@ dependencies {
|
||||
implementation(projects.magix.magixServer)
|
||||
implementation(projects.magix.magixRsocket)
|
||||
implementation(projects.controlsMagixClient)
|
||||
implementation(projects.controlsXodus)
|
||||
implementation(projects.controlsMongo)
|
||||
implementation(projects.controlsStorage)
|
||||
implementation(projects.controlsStorage.controlsXodus)
|
||||
implementation(projects.magix.magixStorage.magixStorageXodus)
|
||||
// implementation(projects.controlsMongo)
|
||||
|
||||
implementation("io.ktor:ktor-client-cio:$ktorVersion")
|
||||
implementation("org.jetbrains.kotlinx:kotlinx-datetime:0.3.1")
|
||||
@ -31,7 +31,7 @@ dependencies {
|
||||
implementation("org.jetbrains.xodus:xodus-entity-store:1.3.232")
|
||||
implementation("org.jetbrains.xodus:xodus-environment:1.3.232")
|
||||
implementation("org.jetbrains.xodus:xodus-vfs:1.3.232")
|
||||
implementation("org.litote.kmongo:kmongo-coroutine-serialization:4.4.0")
|
||||
// implementation("org.litote.kmongo:kmongo-coroutine-serialization:4.4.0")
|
||||
}
|
||||
|
||||
tasks.withType<org.jetbrains.kotlin.gradle.tasks.KotlinCompile>().configureEach {
|
||||
|
@ -1,10 +1,10 @@
|
||||
package ru.mipt.npm.controls.demo.car
|
||||
|
||||
import kotlinx.coroutines.flow.collect
|
||||
import kotlinx.coroutines.launch
|
||||
import ru.mipt.npm.controls.api.DeviceMessage
|
||||
import ru.mipt.npm.controls.api.PropertyChangedMessage
|
||||
import ru.mipt.npm.controls.client.controlsMagixFormat
|
||||
import ru.mipt.npm.magix.api.MagixEndpoint
|
||||
import ru.mipt.npm.magix.api.subscribe
|
||||
import ru.mipt.npm.magix.rsocket.rSocketWithWebSockets
|
||||
import space.kscience.dataforge.context.Context
|
||||
import space.kscience.dataforge.context.Factory
|
||||
@ -16,31 +16,29 @@ import kotlin.time.ExperimentalTime
|
||||
|
||||
class MagixVirtualCar(context: Context, meta: Meta) : VirtualCar(context, meta) {
|
||||
|
||||
private suspend fun MagixEndpoint<DeviceMessage>.startMagixVirtualCarUpdate() {
|
||||
launch {
|
||||
subscribe().collect { magix ->
|
||||
(magix.payload as? PropertyChangedMessage)?.let { message ->
|
||||
if (message.sourceDevice == Name.parse("virtual-car")) {
|
||||
when (message.property) {
|
||||
"acceleration" -> IVirtualCar.acceleration.write(Vector2D.metaToObject(message.value))
|
||||
}
|
||||
private fun MagixEndpoint.launchMagixVirtualCarUpdate() = launch {
|
||||
subscribe(controlsMagixFormat).collect { (_, payload) ->
|
||||
(payload as? PropertyChangedMessage)?.let { message ->
|
||||
if (message.sourceDevice == Name.parse("virtual-car")) {
|
||||
when (message.property) {
|
||||
"acceleration" -> IVirtualCar.acceleration.write(Vector2D.metaToObject(message.value))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@OptIn(ExperimentalTime::class)
|
||||
override suspend fun open() {
|
||||
super.open()
|
||||
|
||||
val magixEndpoint = MagixEndpoint.rSocketWithWebSockets(
|
||||
meta["magixServerHost"].string ?: "localhost",
|
||||
DeviceMessage.serializer()
|
||||
)
|
||||
|
||||
launch {
|
||||
magixEndpoint.startMagixVirtualCarUpdate()
|
||||
magixEndpoint.launchMagixVirtualCarUpdate()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -8,36 +8,29 @@ import javafx.scene.layout.Priority
|
||||
import javafx.stage.Stage
|
||||
import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.launch
|
||||
import ru.mipt.npm.controls.api.DeviceMessage
|
||||
import ru.mipt.npm.controls.client.connectToMagix
|
||||
import ru.mipt.npm.controls.controllers.DeviceManager
|
||||
import ru.mipt.npm.controls.controllers.install
|
||||
import ru.mipt.npm.controls.demo.car.IVirtualCar.Companion.acceleration
|
||||
import ru.mipt.npm.controls.mongo.DefaultAsynchronousMongoClientFactory
|
||||
import ru.mipt.npm.controls.storage.store
|
||||
import ru.mipt.npm.controls.manager.DeviceManager
|
||||
import ru.mipt.npm.controls.manager.install
|
||||
import ru.mipt.npm.controls.storage.storeMessages
|
||||
import ru.mipt.npm.controls.xodus.XODUS_STORE_PROPERTY
|
||||
import ru.mipt.npm.controls.xodus.XodusEventStorage
|
||||
import ru.mipt.npm.controls.xodus.XodusDeviceMessageStorage
|
||||
import ru.mipt.npm.magix.api.MagixEndpoint
|
||||
import ru.mipt.npm.magix.rsocket.rSocketWithTcp
|
||||
import ru.mipt.npm.magix.server.startMagixServer
|
||||
import ru.mipt.npm.magix.storage.xodus.storeInXodus
|
||||
import space.kscience.dataforge.context.*
|
||||
import space.kscience.dataforge.meta.Meta
|
||||
import tornadofx.*
|
||||
import java.nio.file.Paths
|
||||
|
||||
internal object VirtualCarControllerConfig {
|
||||
val deviceEntityStorePath = Paths.get(".messages")
|
||||
val magixEntityStorePath = Paths.get(".server_messages")
|
||||
}
|
||||
|
||||
class VirtualCarController : Controller(), ContextAware {
|
||||
|
||||
var virtualCar: VirtualCar? = null
|
||||
var magixVirtualCar: MagixVirtualCar? = null
|
||||
var magixServer: ApplicationEngine? = null
|
||||
var xodusStorageJob: Job? = null
|
||||
var mongoStorageJob: Job? = null
|
||||
var storageEndpoint: MagixEndpoint? = null
|
||||
//var mongoStorageJob: Job? = null
|
||||
|
||||
override val context = Context("demoDevice") {
|
||||
plugin(DeviceManager)
|
||||
@ -45,7 +38,7 @@ class VirtualCarController : Controller(), ContextAware {
|
||||
|
||||
private val deviceManager = context.fetch(DeviceManager, Meta {
|
||||
"xodusConfig" put {
|
||||
"entityStorePath" put VirtualCarControllerConfig.deviceEntityStorePath.toString()
|
||||
"entityStorePath" put deviceEntityStorePath.toString()
|
||||
}
|
||||
})
|
||||
|
||||
@ -54,19 +47,19 @@ class VirtualCarController : Controller(), ContextAware {
|
||||
virtualCar = deviceManager.install("virtual-car", VirtualCar)
|
||||
|
||||
//starting magix event loop and connect it to entity store
|
||||
magixServer = startMagixServer(enableRawRSocket = true, enableZmq = true) { flow ->
|
||||
store(flow, XodusEventStorage, Meta {
|
||||
XODUS_STORE_PROPERTY put VirtualCarControllerConfig.magixEntityStorePath.toString()
|
||||
})
|
||||
store(flow, DefaultAsynchronousMongoClientFactory)
|
||||
magixServer = startMagixServer(enableRawRSocket = true, enableZmq = true)
|
||||
|
||||
storageEndpoint = MagixEndpoint.rSocketWithTcp("localhost").apply {
|
||||
storeInXodus(this@launch, magixEntityStorePath)
|
||||
}
|
||||
|
||||
magixVirtualCar = deviceManager.install("magix-virtual-car", MagixVirtualCar)
|
||||
//connect to device entity store
|
||||
xodusStorageJob = deviceManager.storeMessages(XodusEventStorage)
|
||||
xodusStorageJob = deviceManager.storeMessages(XodusDeviceMessageStorage)
|
||||
//Create mongo client and connect to MongoDB
|
||||
mongoStorageJob = deviceManager.storeMessages(DefaultAsynchronousMongoClientFactory)
|
||||
//mongoStorageJob = deviceManager.storeMessages(DefaultAsynchronousMongoClientFactory)
|
||||
//Launch device client and connect it to the server
|
||||
val deviceEndpoint = MagixEndpoint.rSocketWithTcp("localhost", DeviceMessage.serializer())
|
||||
val deviceEndpoint = MagixEndpoint.rSocketWithTcp("localhost")
|
||||
deviceManager.connectToMagix(deviceEndpoint)
|
||||
}
|
||||
}
|
||||
@ -81,6 +74,11 @@ class VirtualCarController : Controller(), ContextAware {
|
||||
logger.info { "Virtual car server stopped" }
|
||||
context.close()
|
||||
}
|
||||
|
||||
companion object {
|
||||
val deviceEntityStorePath = Paths.get(".messages")
|
||||
val magixEntityStorePath = Paths.get(".server_messages")
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -113,8 +111,12 @@ class VirtualCarControllerView : View(title = " Virtual car controller remote")
|
||||
action {
|
||||
controller.virtualCar?.run {
|
||||
launch {
|
||||
acceleration.write(Vector2D(accelerationXProperty.get(),
|
||||
accelerationYProperty.get()))
|
||||
acceleration.write(
|
||||
Vector2D(
|
||||
accelerationXProperty.get(),
|
||||
accelerationYProperty.get()
|
||||
)
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -34,9 +34,9 @@ public suspend fun <T> MagixEndpoint.broadcast(
|
||||
origin: String = format.defaultFormat,
|
||||
) {
|
||||
val message = MagixMessage(
|
||||
origin = origin,
|
||||
payload = magixJson.encodeToJsonElement(format.serializer, payload),
|
||||
format = format.defaultFormat,
|
||||
payload = magixJson.encodeToJsonElement(format.serializer, payload),
|
||||
origin = origin,
|
||||
target = target,
|
||||
id = id,
|
||||
parentId = parentId,
|
||||
|
@ -25,9 +25,9 @@ import kotlinx.serialization.json.JsonElement
|
||||
*/
|
||||
@Serializable
|
||||
public data class MagixMessage(
|
||||
val origin: String,
|
||||
val format: String,
|
||||
val payload: JsonElement,
|
||||
val format: String = origin,
|
||||
val origin: String,
|
||||
val target: String? = null,
|
||||
val id: String? = null,
|
||||
val parentId: String? = null,
|
||||
|
@ -18,9 +18,9 @@ public fun <T, R> CoroutineScope.launchMagixConverter(
|
||||
): Job = endpoint.subscribe(filter).onEach { message->
|
||||
val newPayload = transformer(message.payload)
|
||||
val transformed: MagixMessage = MagixMessage(
|
||||
newOrigin ?: message.origin,
|
||||
newPayload,
|
||||
outputFormat,
|
||||
newPayload,
|
||||
newOrigin ?: message.origin,
|
||||
message.target,
|
||||
message.id,
|
||||
message.parentId,
|
||||
|
@ -23,7 +23,7 @@ suspend fun MagixEndpoint.sendJson(
|
||||
parentId: String? = null,
|
||||
user: JsonElement? = null,
|
||||
builder: JsonObjectBuilder.() -> Unit
|
||||
): Unit = broadcast(MagixMessage(origin, buildJsonObject(builder), format, target, id, parentId, user))
|
||||
): Unit = broadcast(MagixMessage(format, buildJsonObject(builder), origin, target, id, parentId, user))
|
||||
|
||||
internal const val numberOfMessages = 100
|
||||
|
||||
|
@ -34,7 +34,7 @@ public fun CoroutineScope.launchMagixServerRawRSocket(
|
||||
}
|
||||
|
||||
/**
|
||||
* A combined RSocket/TCP server
|
||||
* A combined RSocket/TCP/ZMQ server
|
||||
* @param applicationConfiguration optional additional configuration for magix loop server
|
||||
*/
|
||||
public fun CoroutineScope.startMagixServer(
|
||||
|
@ -1,13 +1,18 @@
|
||||
package ru.mipt.npm.magix.storage.xodus
|
||||
|
||||
import jetbrains.exodus.entitystore.Entity
|
||||
import jetbrains.exodus.entitystore.PersistentEntityStore
|
||||
import jetbrains.exodus.entitystore.PersistentEntityStores
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.flow.launchIn
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import kotlinx.serialization.encodeToString
|
||||
import kotlinx.serialization.json.JsonObject
|
||||
import ru.mipt.npm.magix.api.MagixEndpoint
|
||||
import ru.mipt.npm.magix.api.MagixEndpoint.Companion.magixJson
|
||||
import ru.mipt.npm.magix.api.MagixMessage
|
||||
import ru.mipt.npm.magix.api.MagixMessageFilter
|
||||
import java.nio.file.Path
|
||||
|
||||
public class XodusMagixStorage(
|
||||
scope: CoroutineScope,
|
||||
@ -17,7 +22,7 @@ public class XodusMagixStorage(
|
||||
) : AutoCloseable {
|
||||
|
||||
//TODO consider message buffering
|
||||
private val subscriptionJob = endpoint.subscribe(filter).onEach { message ->
|
||||
internal val subscriptionJob = endpoint.subscribe(filter).onEach { message ->
|
||||
store.executeInTransaction { transaction ->
|
||||
transaction.newEntity(MAGIC_MESSAGE_ENTITY_TYPE).apply {
|
||||
setProperty(MagixMessage::origin.name, message.origin)
|
||||
@ -41,6 +46,43 @@ public class XodusMagixStorage(
|
||||
}
|
||||
}.launchIn(scope)
|
||||
|
||||
private fun Entity.parseMagixMessage(): MagixMessage = MagixMessage(
|
||||
format = getProperty(MagixMessage::format.name).toString(),
|
||||
payload = getBlobString(MagixMessage::payload.name)?.let {
|
||||
magixJson.parseToJsonElement(it)
|
||||
} ?: JsonObject(emptyMap()),
|
||||
origin = getProperty(MagixMessage::origin.name).toString(),
|
||||
target = getProperty(MagixMessage::target.name)?.toString(),
|
||||
id = getProperty(MagixMessage::id.name)?.toString(),
|
||||
parentId = getProperty(MagixMessage::parentId.name)?.toString(),
|
||||
user = getBlobString(MagixMessage::user.name)?.let {
|
||||
magixJson.parseToJsonElement(it)
|
||||
},
|
||||
)
|
||||
|
||||
public fun readByFormat(
|
||||
format: String,
|
||||
block: (Sequence<MagixMessage>) -> Unit,
|
||||
): Unit = store.executeInReadonlyTransaction { transaction ->
|
||||
val sequence = transaction.find(
|
||||
MAGIC_MESSAGE_ENTITY_TYPE,
|
||||
MagixMessage::format.name,
|
||||
format
|
||||
).asSequence().map { entity ->
|
||||
entity.parseMagixMessage()
|
||||
}
|
||||
block(sequence)
|
||||
}
|
||||
|
||||
public fun readAll(
|
||||
block: (Sequence<MagixMessage>) -> Unit,
|
||||
): Unit = store.executeInReadonlyTransaction { transaction ->
|
||||
val sequence = transaction.getAll(MAGIC_MESSAGE_ENTITY_TYPE).asSequence().map { entity ->
|
||||
entity.parseMagixMessage()
|
||||
}
|
||||
block(sequence)
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
subscriptionJob.cancel()
|
||||
}
|
||||
@ -48,4 +90,22 @@ public class XodusMagixStorage(
|
||||
public companion object {
|
||||
public const val MAGIC_MESSAGE_ENTITY_TYPE: String = "magix.message"
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Start writing all incoming messages with given [filter] to [xodusStore]
|
||||
*/
|
||||
public fun MagixEndpoint.storeInXodus(
|
||||
scope: CoroutineScope,
|
||||
xodusStore: PersistentEntityStore,
|
||||
filter: MagixMessageFilter = MagixMessageFilter(),
|
||||
): XodusMagixStorage = XodusMagixStorage(scope, xodusStore, this, filter)
|
||||
|
||||
public fun MagixEndpoint.storeInXodus(
|
||||
scope: CoroutineScope,
|
||||
path: Path,
|
||||
filter: MagixMessageFilter = MagixMessageFilter(),
|
||||
): XodusMagixStorage {
|
||||
val store = PersistentEntityStores.newInstance(path.toFile())
|
||||
return XodusMagixStorage(scope, store, this, filter)
|
||||
}
|
@ -44,9 +44,9 @@ include(
|
||||
":controls-serial",
|
||||
":controls-server",
|
||||
":controls-opcua",
|
||||
":controls-xodus",
|
||||
// ":controls-mongo",
|
||||
":controls-storage",
|
||||
":controls-storage:controls-xodus",
|
||||
":magix",
|
||||
":magix:magix-api",
|
||||
":magix:magix-server",
|
||||
@ -58,6 +58,6 @@ include(
|
||||
":magix:magix-storage:magix-storage-xodus",
|
||||
":controls-magix-client",
|
||||
":motors",
|
||||
":demo",
|
||||
// ":demo:car",
|
||||
":demo:all-things",
|
||||
":demo:car",
|
||||
)
|
||||
|
Loading…
Reference in New Issue
Block a user