diff --git a/controls-xodus/build.gradle.kts b/controls-storage/controls-xodus/build.gradle.kts similarity index 100% rename from controls-xodus/build.gradle.kts rename to controls-storage/controls-xodus/build.gradle.kts diff --git a/controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/XodusDeviceMessageStorage.kt b/controls-storage/controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/XodusDeviceMessageStorage.kt similarity index 100% rename from controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/XodusDeviceMessageStorage.kt rename to controls-storage/controls-xodus/src/main/kotlin/ru/mipt/npm/controls/xodus/XodusDeviceMessageStorage.kt diff --git a/controls-xodus/src/test/kotlin/PropertyHistoryTest.kt b/controls-storage/controls-xodus/src/test/kotlin/PropertyHistoryTest.kt similarity index 100% rename from controls-xodus/src/test/kotlin/PropertyHistoryTest.kt rename to controls-storage/controls-xodus/src/test/kotlin/PropertyHistoryTest.kt diff --git a/demo/build.gradle.kts b/demo/all-things/build.gradle.kts similarity index 100% rename from demo/build.gradle.kts rename to demo/all-things/build.gradle.kts diff --git a/demo/src/main/kotlin/ru/mipt/npm/controls/demo/DemoControllerView.kt b/demo/all-things/src/main/kotlin/ru/mipt/npm/controls/demo/DemoControllerView.kt similarity index 100% rename from demo/src/main/kotlin/ru/mipt/npm/controls/demo/DemoControllerView.kt rename to demo/all-things/src/main/kotlin/ru/mipt/npm/controls/demo/DemoControllerView.kt diff --git a/demo/src/main/kotlin/ru/mipt/npm/controls/demo/DemoDevice.kt b/demo/all-things/src/main/kotlin/ru/mipt/npm/controls/demo/DemoDevice.kt similarity index 100% rename from demo/src/main/kotlin/ru/mipt/npm/controls/demo/DemoDevice.kt rename to demo/all-things/src/main/kotlin/ru/mipt/npm/controls/demo/DemoDevice.kt diff --git a/demo/src/main/kotlin/ru/mipt/npm/controls/demo/demoDeviceServer.kt b/demo/all-things/src/main/kotlin/ru/mipt/npm/controls/demo/demoDeviceServer.kt similarity index 100% rename from demo/src/main/kotlin/ru/mipt/npm/controls/demo/demoDeviceServer.kt rename to demo/all-things/src/main/kotlin/ru/mipt/npm/controls/demo/demoDeviceServer.kt diff --git a/demo/src/main/kotlin/ru/mipt/npm/controls/demo/generateMessageSchema.kt b/demo/all-things/src/main/kotlin/ru/mipt/npm/controls/demo/generateMessageSchema.kt similarity index 100% rename from demo/src/main/kotlin/ru/mipt/npm/controls/demo/generateMessageSchema.kt rename to demo/all-things/src/main/kotlin/ru/mipt/npm/controls/demo/generateMessageSchema.kt diff --git a/demo/car/build.gradle.kts b/demo/car/build.gradle.kts index 5bce76e..63d48ed 100644 --- a/demo/car/build.gradle.kts +++ b/demo/car/build.gradle.kts @@ -1,6 +1,6 @@ plugins { kotlin("jvm") - id("org.openjfx.javafxplugin") + id("org.openjfx.javafxplugin") version "0.0.10" application } @@ -19,9 +19,9 @@ dependencies { implementation(projects.magix.magixServer) implementation(projects.magix.magixRsocket) implementation(projects.controlsMagixClient) - implementation(projects.controlsXodus) - implementation(projects.controlsMongo) - implementation(projects.controlsStorage) + implementation(projects.controlsStorage.controlsXodus) + implementation(projects.magix.magixStorage.magixStorageXodus) +// implementation(projects.controlsMongo) implementation("io.ktor:ktor-client-cio:$ktorVersion") implementation("org.jetbrains.kotlinx:kotlinx-datetime:0.3.1") @@ -31,7 +31,7 @@ dependencies { implementation("org.jetbrains.xodus:xodus-entity-store:1.3.232") implementation("org.jetbrains.xodus:xodus-environment:1.3.232") implementation("org.jetbrains.xodus:xodus-vfs:1.3.232") - implementation("org.litote.kmongo:kmongo-coroutine-serialization:4.4.0") +// implementation("org.litote.kmongo:kmongo-coroutine-serialization:4.4.0") } tasks.withType().configureEach { diff --git a/demo/car/src/main/kotlin/ru/mipt/npm/controls/demo/car/MagixVirtualCar.kt b/demo/car/src/main/kotlin/ru/mipt/npm/controls/demo/car/MagixVirtualCar.kt index bee2464..0aaf7ab 100644 --- a/demo/car/src/main/kotlin/ru/mipt/npm/controls/demo/car/MagixVirtualCar.kt +++ b/demo/car/src/main/kotlin/ru/mipt/npm/controls/demo/car/MagixVirtualCar.kt @@ -1,10 +1,10 @@ package ru.mipt.npm.controls.demo.car -import kotlinx.coroutines.flow.collect import kotlinx.coroutines.launch -import ru.mipt.npm.controls.api.DeviceMessage import ru.mipt.npm.controls.api.PropertyChangedMessage +import ru.mipt.npm.controls.client.controlsMagixFormat import ru.mipt.npm.magix.api.MagixEndpoint +import ru.mipt.npm.magix.api.subscribe import ru.mipt.npm.magix.rsocket.rSocketWithWebSockets import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Factory @@ -16,31 +16,29 @@ import kotlin.time.ExperimentalTime class MagixVirtualCar(context: Context, meta: Meta) : VirtualCar(context, meta) { - private suspend fun MagixEndpoint.startMagixVirtualCarUpdate() { - launch { - subscribe().collect { magix -> - (magix.payload as? PropertyChangedMessage)?.let { message -> - if (message.sourceDevice == Name.parse("virtual-car")) { - when (message.property) { - "acceleration" -> IVirtualCar.acceleration.write(Vector2D.metaToObject(message.value)) - } + private fun MagixEndpoint.launchMagixVirtualCarUpdate() = launch { + subscribe(controlsMagixFormat).collect { (_, payload) -> + (payload as? PropertyChangedMessage)?.let { message -> + if (message.sourceDevice == Name.parse("virtual-car")) { + when (message.property) { + "acceleration" -> IVirtualCar.acceleration.write(Vector2D.metaToObject(message.value)) } } } } } + @OptIn(ExperimentalTime::class) override suspend fun open() { super.open() val magixEndpoint = MagixEndpoint.rSocketWithWebSockets( meta["magixServerHost"].string ?: "localhost", - DeviceMessage.serializer() ) launch { - magixEndpoint.startMagixVirtualCarUpdate() + magixEndpoint.launchMagixVirtualCarUpdate() } } diff --git a/demo/car/src/main/kotlin/ru/mipt/npm/controls/demo/car/VirtualCarController.kt b/demo/car/src/main/kotlin/ru/mipt/npm/controls/demo/car/VirtualCarController.kt index 9b9a61e..3e5d968 100644 --- a/demo/car/src/main/kotlin/ru/mipt/npm/controls/demo/car/VirtualCarController.kt +++ b/demo/car/src/main/kotlin/ru/mipt/npm/controls/demo/car/VirtualCarController.kt @@ -8,36 +8,29 @@ import javafx.scene.layout.Priority import javafx.stage.Stage import kotlinx.coroutines.Job import kotlinx.coroutines.launch -import ru.mipt.npm.controls.api.DeviceMessage import ru.mipt.npm.controls.client.connectToMagix -import ru.mipt.npm.controls.controllers.DeviceManager -import ru.mipt.npm.controls.controllers.install import ru.mipt.npm.controls.demo.car.IVirtualCar.Companion.acceleration -import ru.mipt.npm.controls.mongo.DefaultAsynchronousMongoClientFactory -import ru.mipt.npm.controls.storage.store +import ru.mipt.npm.controls.manager.DeviceManager +import ru.mipt.npm.controls.manager.install import ru.mipt.npm.controls.storage.storeMessages -import ru.mipt.npm.controls.xodus.XODUS_STORE_PROPERTY -import ru.mipt.npm.controls.xodus.XodusEventStorage +import ru.mipt.npm.controls.xodus.XodusDeviceMessageStorage import ru.mipt.npm.magix.api.MagixEndpoint import ru.mipt.npm.magix.rsocket.rSocketWithTcp import ru.mipt.npm.magix.server.startMagixServer +import ru.mipt.npm.magix.storage.xodus.storeInXodus import space.kscience.dataforge.context.* import space.kscience.dataforge.meta.Meta import tornadofx.* import java.nio.file.Paths -internal object VirtualCarControllerConfig { - val deviceEntityStorePath = Paths.get(".messages") - val magixEntityStorePath = Paths.get(".server_messages") -} - class VirtualCarController : Controller(), ContextAware { var virtualCar: VirtualCar? = null var magixVirtualCar: MagixVirtualCar? = null var magixServer: ApplicationEngine? = null var xodusStorageJob: Job? = null - var mongoStorageJob: Job? = null + var storageEndpoint: MagixEndpoint? = null + //var mongoStorageJob: Job? = null override val context = Context("demoDevice") { plugin(DeviceManager) @@ -45,7 +38,7 @@ class VirtualCarController : Controller(), ContextAware { private val deviceManager = context.fetch(DeviceManager, Meta { "xodusConfig" put { - "entityStorePath" put VirtualCarControllerConfig.deviceEntityStorePath.toString() + "entityStorePath" put deviceEntityStorePath.toString() } }) @@ -54,19 +47,19 @@ class VirtualCarController : Controller(), ContextAware { virtualCar = deviceManager.install("virtual-car", VirtualCar) //starting magix event loop and connect it to entity store - magixServer = startMagixServer(enableRawRSocket = true, enableZmq = true) { flow -> - store(flow, XodusEventStorage, Meta { - XODUS_STORE_PROPERTY put VirtualCarControllerConfig.magixEntityStorePath.toString() - }) - store(flow, DefaultAsynchronousMongoClientFactory) + magixServer = startMagixServer(enableRawRSocket = true, enableZmq = true) + + storageEndpoint = MagixEndpoint.rSocketWithTcp("localhost").apply { + storeInXodus(this@launch, magixEntityStorePath) } + magixVirtualCar = deviceManager.install("magix-virtual-car", MagixVirtualCar) //connect to device entity store - xodusStorageJob = deviceManager.storeMessages(XodusEventStorage) + xodusStorageJob = deviceManager.storeMessages(XodusDeviceMessageStorage) //Create mongo client and connect to MongoDB - mongoStorageJob = deviceManager.storeMessages(DefaultAsynchronousMongoClientFactory) + //mongoStorageJob = deviceManager.storeMessages(DefaultAsynchronousMongoClientFactory) //Launch device client and connect it to the server - val deviceEndpoint = MagixEndpoint.rSocketWithTcp("localhost", DeviceMessage.serializer()) + val deviceEndpoint = MagixEndpoint.rSocketWithTcp("localhost") deviceManager.connectToMagix(deviceEndpoint) } } @@ -81,6 +74,11 @@ class VirtualCarController : Controller(), ContextAware { logger.info { "Virtual car server stopped" } context.close() } + + companion object { + val deviceEntityStorePath = Paths.get(".messages") + val magixEntityStorePath = Paths.get(".server_messages") + } } @@ -113,8 +111,12 @@ class VirtualCarControllerView : View(title = " Virtual car controller remote") action { controller.virtualCar?.run { launch { - acceleration.write(Vector2D(accelerationXProperty.get(), - accelerationYProperty.get())) + acceleration.write( + Vector2D( + accelerationXProperty.get(), + accelerationYProperty.get() + ) + ) } } } diff --git a/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixFormat.kt b/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixFormat.kt index d2aa466..a5df654 100644 --- a/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixFormat.kt +++ b/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixFormat.kt @@ -34,9 +34,9 @@ public suspend fun MagixEndpoint.broadcast( origin: String = format.defaultFormat, ) { val message = MagixMessage( - origin = origin, - payload = magixJson.encodeToJsonElement(format.serializer, payload), format = format.defaultFormat, + payload = magixJson.encodeToJsonElement(format.serializer, payload), + origin = origin, target = target, id = id, parentId = parentId, diff --git a/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixMessage.kt b/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixMessage.kt index e593434..44e7d6a 100644 --- a/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixMessage.kt +++ b/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/MagixMessage.kt @@ -25,9 +25,9 @@ import kotlinx.serialization.json.JsonElement */ @Serializable public data class MagixMessage( - val origin: String, + val format: String, val payload: JsonElement, - val format: String = origin, + val origin: String, val target: String? = null, val id: String? = null, val parentId: String? = null, diff --git a/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/converters.kt b/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/converters.kt index dbdea23..6c558ad 100644 --- a/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/converters.kt +++ b/magix/magix-api/src/commonMain/kotlin/ru/mipt/npm/magix/api/converters.kt @@ -18,9 +18,9 @@ public fun CoroutineScope.launchMagixConverter( ): Job = endpoint.subscribe(filter).onEach { message-> val newPayload = transformer(message.payload) val transformed: MagixMessage = MagixMessage( - newOrigin ?: message.origin, - newPayload, outputFormat, + newPayload, + newOrigin ?: message.origin, message.target, message.id, message.parentId, diff --git a/magix/magix-demo/src/main/kotlin/zmq.kt b/magix/magix-demo/src/main/kotlin/zmq.kt index 868b503..4672275 100644 --- a/magix/magix-demo/src/main/kotlin/zmq.kt +++ b/magix/magix-demo/src/main/kotlin/zmq.kt @@ -23,7 +23,7 @@ suspend fun MagixEndpoint.sendJson( parentId: String? = null, user: JsonElement? = null, builder: JsonObjectBuilder.() -> Unit -): Unit = broadcast(MagixMessage(origin, buildJsonObject(builder), format, target, id, parentId, user)) +): Unit = broadcast(MagixMessage(format, buildJsonObject(builder), origin, target, id, parentId, user)) internal const val numberOfMessages = 100 diff --git a/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/server.kt b/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/server.kt index bc84f63..dd4091b 100644 --- a/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/server.kt +++ b/magix/magix-server/src/main/kotlin/ru/mipt/npm/magix/server/server.kt @@ -34,7 +34,7 @@ public fun CoroutineScope.launchMagixServerRawRSocket( } /** - * A combined RSocket/TCP server + * A combined RSocket/TCP/ZMQ server * @param applicationConfiguration optional additional configuration for magix loop server */ public fun CoroutineScope.startMagixServer( diff --git a/magix/magix-storage/magix-storage-xodus/src/main/kotlin/ru/mipt/npm/magix/storage/xodus/XodusMagixStorage.kt b/magix/magix-storage/magix-storage-xodus/src/main/kotlin/ru/mipt/npm/magix/storage/xodus/XodusMagixStorage.kt index 3b98368..e14a1af 100644 --- a/magix/magix-storage/magix-storage-xodus/src/main/kotlin/ru/mipt/npm/magix/storage/xodus/XodusMagixStorage.kt +++ b/magix/magix-storage/magix-storage-xodus/src/main/kotlin/ru/mipt/npm/magix/storage/xodus/XodusMagixStorage.kt @@ -1,13 +1,18 @@ package ru.mipt.npm.magix.storage.xodus +import jetbrains.exodus.entitystore.Entity import jetbrains.exodus.entitystore.PersistentEntityStore +import jetbrains.exodus.entitystore.PersistentEntityStores import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.serialization.encodeToString +import kotlinx.serialization.json.JsonObject import ru.mipt.npm.magix.api.MagixEndpoint +import ru.mipt.npm.magix.api.MagixEndpoint.Companion.magixJson import ru.mipt.npm.magix.api.MagixMessage import ru.mipt.npm.magix.api.MagixMessageFilter +import java.nio.file.Path public class XodusMagixStorage( scope: CoroutineScope, @@ -17,7 +22,7 @@ public class XodusMagixStorage( ) : AutoCloseable { //TODO consider message buffering - private val subscriptionJob = endpoint.subscribe(filter).onEach { message -> + internal val subscriptionJob = endpoint.subscribe(filter).onEach { message -> store.executeInTransaction { transaction -> transaction.newEntity(MAGIC_MESSAGE_ENTITY_TYPE).apply { setProperty(MagixMessage::origin.name, message.origin) @@ -41,6 +46,43 @@ public class XodusMagixStorage( } }.launchIn(scope) + private fun Entity.parseMagixMessage(): MagixMessage = MagixMessage( + format = getProperty(MagixMessage::format.name).toString(), + payload = getBlobString(MagixMessage::payload.name)?.let { + magixJson.parseToJsonElement(it) + } ?: JsonObject(emptyMap()), + origin = getProperty(MagixMessage::origin.name).toString(), + target = getProperty(MagixMessage::target.name)?.toString(), + id = getProperty(MagixMessage::id.name)?.toString(), + parentId = getProperty(MagixMessage::parentId.name)?.toString(), + user = getBlobString(MagixMessage::user.name)?.let { + magixJson.parseToJsonElement(it) + }, + ) + + public fun readByFormat( + format: String, + block: (Sequence) -> Unit, + ): Unit = store.executeInReadonlyTransaction { transaction -> + val sequence = transaction.find( + MAGIC_MESSAGE_ENTITY_TYPE, + MagixMessage::format.name, + format + ).asSequence().map { entity -> + entity.parseMagixMessage() + } + block(sequence) + } + + public fun readAll( + block: (Sequence) -> Unit, + ): Unit = store.executeInReadonlyTransaction { transaction -> + val sequence = transaction.getAll(MAGIC_MESSAGE_ENTITY_TYPE).asSequence().map { entity -> + entity.parseMagixMessage() + } + block(sequence) + } + override fun close() { subscriptionJob.cancel() } @@ -48,4 +90,22 @@ public class XodusMagixStorage( public companion object { public const val MAGIC_MESSAGE_ENTITY_TYPE: String = "magix.message" } +} + +/** + * Start writing all incoming messages with given [filter] to [xodusStore] + */ +public fun MagixEndpoint.storeInXodus( + scope: CoroutineScope, + xodusStore: PersistentEntityStore, + filter: MagixMessageFilter = MagixMessageFilter(), +): XodusMagixStorage = XodusMagixStorage(scope, xodusStore, this, filter) + +public fun MagixEndpoint.storeInXodus( + scope: CoroutineScope, + path: Path, + filter: MagixMessageFilter = MagixMessageFilter(), +): XodusMagixStorage { + val store = PersistentEntityStores.newInstance(path.toFile()) + return XodusMagixStorage(scope, store, this, filter) } \ No newline at end of file diff --git a/settings.gradle.kts b/settings.gradle.kts index e49920d..40e069f 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -44,9 +44,9 @@ include( ":controls-serial", ":controls-server", ":controls-opcua", - ":controls-xodus", // ":controls-mongo", ":controls-storage", + ":controls-storage:controls-xodus", ":magix", ":magix:magix-api", ":magix:magix-server", @@ -58,6 +58,6 @@ include( ":magix:magix-storage:magix-storage-xodus", ":controls-magix-client", ":motors", - ":demo", -// ":demo:car", + ":demo:all-things", + ":demo:car", )