diff --git a/build.gradle.kts b/build.gradle.kts index 93a689c..8034b20 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -6,7 +6,7 @@ plugins { id("space.kscience.gradle.project") } -val dataforgeVersion: String by extra("0.6.1") +val dataforgeVersion: String by extra("0.6.2-dev-3") val ktorVersion: String by extra(space.kscience.gradle.KScienceVersions.ktorVersion) val rsocketVersion by extra("0.15.4") val xodusVersion by extra("2.0.1") diff --git a/controls-core/src/commonMain/kotlin/space/kscience/controls/api/DeviceMessage.kt b/controls-core/src/commonMain/kotlin/space/kscience/controls/api/DeviceMessage.kt index 0b4131e..45d7f0b 100644 --- a/controls-core/src/commonMain/kotlin/space/kscience/controls/api/DeviceMessage.kt +++ b/controls-core/src/commonMain/kotlin/space/kscience/controls/api/DeviceMessage.kt @@ -1,7 +1,11 @@ +@file:OptIn(ExperimentalSerializationApi::class) + package space.kscience.controls.api import kotlinx.datetime.Clock import kotlinx.datetime.Instant +import kotlinx.serialization.EncodeDefault +import kotlinx.serialization.ExperimentalSerializationApi import kotlinx.serialization.SerialName import kotlinx.serialization.Serializable import kotlinx.serialization.json.Json @@ -55,9 +59,9 @@ public data class PropertyChangedMessage( override val sourceDevice: Name = Name.EMPTY, override val targetDevice: Name? = null, override val comment: String? = null, - override val time: Instant? = Clock.System.now() -) : DeviceMessage(){ - override fun changeSource(block: (Name) -> Name):DeviceMessage = copy(sourceDevice = block(sourceDevice)) + @EncodeDefault override val time: Instant? = Clock.System.now(), +) : DeviceMessage() { + override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice)) } /** @@ -71,9 +75,9 @@ public data class PropertySetMessage( override val sourceDevice: Name? = null, override val targetDevice: Name, override val comment: String? = null, - override val time: Instant? = Clock.System.now() -) : DeviceMessage(){ - override fun changeSource(block: (Name) -> Name):DeviceMessage = copy(sourceDevice = sourceDevice?.let(block)) + @EncodeDefault override val time: Instant? = Clock.System.now(), +) : DeviceMessage() { + override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block)) } /** @@ -87,9 +91,9 @@ public data class PropertyGetMessage( override val sourceDevice: Name? = null, override val targetDevice: Name, override val comment: String? = null, - override val time: Instant? = Clock.System.now() -) : DeviceMessage(){ - override fun changeSource(block: (Name) -> Name):DeviceMessage = copy(sourceDevice = sourceDevice?.let(block)) + @EncodeDefault override val time: Instant? = Clock.System.now(), +) : DeviceMessage() { + override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block)) } /** @@ -101,9 +105,9 @@ public data class GetDescriptionMessage( override val sourceDevice: Name? = null, override val targetDevice: Name, override val comment: String? = null, - override val time: Instant? = Clock.System.now() -) : DeviceMessage(){ - override fun changeSource(block: (Name) -> Name):DeviceMessage = copy(sourceDevice = sourceDevice?.let(block)) + @EncodeDefault override val time: Instant? = Clock.System.now(), +) : DeviceMessage() { + override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block)) } /** @@ -118,9 +122,9 @@ public data class DescriptionMessage( override val sourceDevice: Name, override val targetDevice: Name? = null, override val comment: String? = null, - override val time: Instant? = Clock.System.now() -) : DeviceMessage(){ - override fun changeSource(block: (Name) -> Name):DeviceMessage = copy(sourceDevice = block(sourceDevice)) + @EncodeDefault override val time: Instant? = Clock.System.now(), +) : DeviceMessage() { + override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice)) } /** @@ -137,9 +141,9 @@ public data class ActionExecuteMessage( override val sourceDevice: Name? = null, override val targetDevice: Name, override val comment: String? = null, - override val time: Instant? = Clock.System.now() -) : DeviceMessage(){ - override fun changeSource(block: (Name) -> Name):DeviceMessage = copy(sourceDevice = sourceDevice?.let(block)) + @EncodeDefault override val time: Instant? = Clock.System.now(), +) : DeviceMessage() { + override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block)) } /** @@ -156,9 +160,9 @@ public data class ActionResultMessage( override val sourceDevice: Name, override val targetDevice: Name? = null, override val comment: String? = null, - override val time: Instant? = Clock.System.now() -) : DeviceMessage(){ - override fun changeSource(block: (Name) -> Name):DeviceMessage = copy(sourceDevice = block(sourceDevice)) + @EncodeDefault override val time: Instant? = Clock.System.now(), +) : DeviceMessage() { + override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice)) } /** @@ -171,9 +175,9 @@ public data class BinaryNotificationMessage( override val sourceDevice: Name, override val targetDevice: Name? = null, override val comment: String? = null, - override val time: Instant? = Clock.System.now() -) : DeviceMessage(){ - override fun changeSource(block: (Name) -> Name):DeviceMessage = copy(sourceDevice = block(sourceDevice)) + @EncodeDefault override val time: Instant? = Clock.System.now(), +) : DeviceMessage() { + override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice)) } /** @@ -186,9 +190,9 @@ public data class EmptyDeviceMessage( override val sourceDevice: Name? = null, override val targetDevice: Name? = null, override val comment: String? = null, - override val time: Instant? = Clock.System.now() -) : DeviceMessage(){ - override fun changeSource(block: (Name) -> Name):DeviceMessage = copy(sourceDevice = sourceDevice?.let(block)) + @EncodeDefault override val time: Instant? = Clock.System.now(), +) : DeviceMessage() { + override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block)) } /** @@ -202,9 +206,9 @@ public data class DeviceLogMessage( override val sourceDevice: Name? = null, override val targetDevice: Name? = null, override val comment: String? = null, - override val time: Instant? = Clock.System.now() -) : DeviceMessage(){ - override fun changeSource(block: (Name) -> Name):DeviceMessage = copy(sourceDevice = sourceDevice?.let(block)) + @EncodeDefault override val time: Instant? = Clock.System.now(), +) : DeviceMessage() { + override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block)) } /** @@ -219,9 +223,9 @@ public data class DeviceErrorMessage( override val sourceDevice: Name, override val targetDevice: Name? = null, override val comment: String? = null, - override val time: Instant? = Clock.System.now() -) : DeviceMessage(){ - override fun changeSource(block: (Name) -> Name):DeviceMessage = copy(sourceDevice = block(sourceDevice)) + @EncodeDefault override val time: Instant? = Clock.System.now(), +) : DeviceMessage() { + override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice)) } diff --git a/demo/echo/src/main/kotlin/space/kscience/controls/demo/echo/main.kt b/demo/echo/src/main/kotlin/space/kscience/controls/demo/echo/main.kt index 881997b..c0ad995 100644 --- a/demo/echo/src/main/kotlin/space/kscience/controls/demo/echo/main.kt +++ b/demo/echo/src/main/kotlin/space/kscience/controls/demo/echo/main.kt @@ -22,7 +22,7 @@ private suspend fun MagixEndpoint.collectEcho(scope: CoroutineScope, n: Int) { scope.launch { subscribe( MagixMessageFilter( - origin = listOf("loop") + source = listOf("loop") ) ).collect { message -> if (message.id?.endsWith(".response") == true) { diff --git a/demo/many-devices/build.gradle.kts b/demo/many-devices/build.gradle.kts index 8c765d6..277d47b 100644 --- a/demo/many-devices/build.gradle.kts +++ b/demo/many-devices/build.gradle.kts @@ -19,7 +19,7 @@ dependencies { implementation(projects.magix.magixZmq) implementation("io.ktor:ktor-client-cio:$ktorVersion") - implementation("space.kscience:plotlykt-server:0.5.3") + implementation("space.kscience:plotlykt-server:0.6.0") implementation(spclibs.logback.classic) } diff --git a/demo/many-devices/src/main/kotlin/space/kscience/controls/demo/MassDevice.kt b/demo/many-devices/src/main/kotlin/space/kscience/controls/demo/MassDevice.kt index 5014a6f..56b66b9 100644 --- a/demo/many-devices/src/main/kotlin/space/kscience/controls/demo/MassDevice.kt +++ b/demo/many-devices/src/main/kotlin/space/kscience/controls/demo/MassDevice.kt @@ -1,11 +1,10 @@ package space.kscience.controls.demo -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.delay +import kotlinx.coroutines.* import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.isActive -import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock import kotlinx.datetime.Clock import space.kscience.controls.client.connectToMagix import space.kscience.controls.client.magixFormat @@ -20,6 +19,7 @@ import space.kscience.dataforge.meta.get import space.kscience.dataforge.meta.int import space.kscience.magix.api.MagixEndpoint import space.kscience.magix.api.subscribe +import space.kscience.magix.rsocket.rSocketWithTcp import space.kscience.magix.rsocket.rSocketWithWebSockets import space.kscience.magix.server.RSocketMagixFlowPlugin import space.kscience.magix.server.startMagixServer @@ -31,7 +31,6 @@ import space.kscience.plotly.server.PlotlyUpdateMode import space.kscience.plotly.server.serve import space.kscience.plotly.server.show import space.kscince.magix.zmq.ZmqMagixFlowPlugin -import java.util.concurrent.ConcurrentHashMap import kotlin.random.Random import kotlin.time.Duration import kotlin.time.Duration.Companion.milliseconds @@ -49,14 +48,15 @@ class MassDevice(context: Context, meta: Meta) : DeviceBySpec(MassDe val value by doubleProperty { randomValue } override suspend fun MassDevice.onOpen() { - doRecurring(50.milliseconds) { + doRecurring(10.milliseconds) { read(value) } } } } -fun main() { +@OptIn(DelicateCoroutinesApi::class) +suspend fun main() { val context = Context("Mass") context.startMagixServer( @@ -67,7 +67,7 @@ fun main() { val numDevices = 100 repeat(numDevices) { - context.launch(Dispatchers.IO) { + context.launch(newFixedThreadPoolContext(2, "Device${it}")) { val deviceContext = Context("Device${it}") { plugin(DeviceManager) } @@ -77,7 +77,7 @@ fun main() { deviceManager.install("device$it", MassDevice) val endpointId = "device$it" - val deviceEndpoint = MagixEndpoint.rSocketWithWebSockets("localhost") + val deviceEndpoint = MagixEndpoint.rSocketWithTcp("localhost") deviceManager.connectToMagix(deviceEndpoint, endpointId) } } @@ -94,18 +94,24 @@ fun main() { launch(Dispatchers.IO) { val monitorEndpoint = MagixEndpoint.rSocketWithWebSockets("localhost") - val latest = ConcurrentHashMap() + val mutex = Mutex() + + val latest = HashMap() monitorEndpoint.subscribe(DeviceManager.magixFormat).onEach { (magixMessage, payload) -> - latest[magixMessage.sourceEndpoint] = Clock.System.now() - payload.time!! + mutex.withLock { + latest[magixMessage.sourceEndpoint] = Clock.System.now() - payload.time!! + } }.launchIn(this) while (isActive) { delay(200) - val sorted = latest.mapKeys { it.key.substring(6).toInt() }.toSortedMap() - latest.clear() - x.numbers = sorted.keys - y.numbers = sorted.values.map { it.inWholeMilliseconds / 1000.0 + 0.0001 } + mutex.withLock { + val sorted = latest.mapKeys { it.key.substring(6).toInt() }.toSortedMap() + latest.clear() + x.numbers = sorted.keys + y.numbers = sorted.values.map { it.inWholeMilliseconds / 1000.0 + 0.0001 } + } } } } diff --git a/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/api/MagixFormat.kt b/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/api/MagixFormat.kt index c68e326..d1a02e8 100644 --- a/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/api/MagixFormat.kt +++ b/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/api/MagixFormat.kt @@ -28,7 +28,7 @@ public fun MagixEndpoint.subscribe( originFilter: Collection? = null, targetFilter: Collection? = null, ): Flow> = subscribe( - MagixMessageFilter(format = format.formats, origin = originFilter, target = targetFilter) + MagixMessageFilter(format = format.formats, source = originFilter, target = targetFilter) ).map { val value: T = magixJson.decodeFromJsonElement(format.serializer, it.payload) it to value diff --git a/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/api/MagixMessageFilter.kt b/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/api/MagixMessageFilter.kt index 12def5b..4bd4746 100644 --- a/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/api/MagixMessageFilter.kt +++ b/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/api/MagixMessageFilter.kt @@ -10,13 +10,13 @@ import kotlinx.serialization.Serializable @Serializable public data class MagixMessageFilter( val format: Collection? = null, - val origin: Collection? = null, + val source: Collection? = null, val target: Collection? = null, ) { public fun accepts(message: MagixMessage): Boolean = format?.contains(message.format) ?: true - && origin?.contains(message.sourceEndpoint) ?: true + && source?.contains(message.sourceEndpoint) ?: true && target?.contains(message.targetEndpoint) ?: true public companion object { diff --git a/magix/magix-rsocket/src/commonMain/kotlin/space/kscience/magix/rsocket/RSocketMagixEndpoint.kt b/magix/magix-rsocket/src/commonMain/kotlin/space/kscience/magix/rsocket/RSocketMagixEndpoint.kt index d3f5a2d..6c4e9dd 100644 --- a/magix/magix-rsocket/src/commonMain/kotlin/space/kscience/magix/rsocket/RSocketMagixEndpoint.kt +++ b/magix/magix-rsocket/src/commonMain/kotlin/space/kscience/magix/rsocket/RSocketMagixEndpoint.kt @@ -36,7 +36,7 @@ public class RSocketMagixEndpoint(private val rSocket: RSocket) : MagixEndpoint, }.filter(filter).flowOn(rSocket.coroutineContext[CoroutineDispatcher] ?: Dispatchers.Unconfined) } - override suspend fun broadcast(message: MagixMessage): Unit { + override suspend fun broadcast(message: MagixMessage): Unit { val payload = buildPayload { data(MagixEndpoint.magixJson.encodeToString(MagixMessage.serializer(), message)) } @@ -51,11 +51,12 @@ public class RSocketMagixEndpoint(private val rSocket: RSocket) : MagixEndpoint, } -internal fun buildConnector(rSocketConfig: RSocketConnectorBuilder.ConnectionConfigBuilder.() -> Unit) = - RSocketConnector { - reconnectable(10) - connectionConfig(rSocketConfig) - } +internal fun buildConnector( + rSocketConfig: RSocketConnectorBuilder.ConnectionConfigBuilder.() -> Unit, +) = RSocketConnector { + reconnectable(5) + connectionConfig(rSocketConfig) +} /** * Build a websocket based endpoint connected to [host], [port] and given routing [path] diff --git a/magix/magix-server/src/main/kotlin/space/kscience/magix/server/RSocketMagixFlowPlugin.kt b/magix/magix-server/src/main/kotlin/space/kscience/magix/server/RSocketMagixFlowPlugin.kt index 9eb9ad6..a00c33f 100644 --- a/magix/magix-server/src/main/kotlin/space/kscience/magix/server/RSocketMagixFlowPlugin.kt +++ b/magix/magix-server/src/main/kotlin/space/kscience/magix/server/RSocketMagixFlowPlugin.kt @@ -35,8 +35,11 @@ public class RSocketMagixFlowPlugin( receive: Flow, sendMessage: suspend (MagixMessage) -> Unit, ): Job { - val tcpTransport = - TcpServerTransport(hostname = serverHost, port = serverPort, configure = transportConfiguration) + val tcpTransport = TcpServerTransport( + hostname = serverHost, + port = serverPort, + configure = transportConfiguration + ) val rSocketJob: TcpServer = RSocketServer(rsocketConfiguration) .bindIn(scope, tcpTransport, acceptor(scope, receive, sendMessage)) @@ -60,7 +63,7 @@ public class RSocketMagixFlowPlugin( MagixMessageFilter.serializer(), request.data.readText() ) - request.close() + receive.filter(filter).map { message -> val string = MagixEndpoint.magixJson.encodeToString(MagixMessage.serializer(), message) buildPayload { data(string) } @@ -72,7 +75,7 @@ public class RSocketMagixFlowPlugin( MagixMessage.serializer(), request.data.readText() ) - request.close() + sendMessage(message) } // bidirectional connection, used for streaming connection @@ -81,12 +84,12 @@ public class RSocketMagixFlowPlugin( sendMessage( MagixEndpoint.magixJson.decodeFromString( MagixMessage.serializer(), - inputPayload.use{ it.data.readText()} + inputPayload.use { it.data.readText() } ) ) }.launchIn(this) - val filterText = request.use { it.data.readText()} + val filterText = request.use { it.data.readText() } val filter = if (filterText.isNotBlank()) { MagixEndpoint.magixJson.decodeFromString(MagixMessageFilter.serializer(), filterText) diff --git a/magix/magix-storage/magix-storage-xodus/src/main/kotlin/space/kscience/magix/storage/xodus/XodusMagixStorage.kt b/magix/magix-storage/magix-storage-xodus/src/main/kotlin/space/kscience/magix/storage/xodus/XodusMagixStorage.kt index 24d35be..9f28b9e 100644 --- a/magix/magix-storage/magix-storage-xodus/src/main/kotlin/space/kscience/magix/storage/xodus/XodusMagixStorage.kt +++ b/magix/magix-storage/magix-storage-xodus/src/main/kotlin/space/kscience/magix/storage/xodus/XodusMagixStorage.kt @@ -19,6 +19,129 @@ import space.kscience.magix.storage.test import java.nio.file.Path import kotlin.sequences.Sequence + +private fun Entity.parseMagixMessage(): MagixMessage = MagixMessage( + format = getProperty(MagixMessage::format.name).toString(), + payload = getBlobString(MagixMessage::payload.name)?.let { + magixJson.parseToJsonElement(it) + } ?: JsonObject(emptyMap()), + sourceEndpoint = getProperty(MagixMessage::sourceEndpoint.name).toString(), + targetEndpoint = getProperty(MagixMessage::targetEndpoint.name)?.toString(), + id = getProperty(MagixMessage::id.name)?.toString(), + parentId = getProperty(MagixMessage::parentId.name)?.toString(), + user = getBlobString(MagixMessage::user.name)?.let { + magixJson.parseToJsonElement(it) + }, +) + +public class XodusMagixHistory(private val store: PersistentEntityStore) : MagixHistory { + + public fun writeMessage(storeTransaction: StoreTransaction, message: MagixMessage) { + storeTransaction.newEntity(XodusMagixStorage.MAGIC_MESSAGE_ENTITY_TYPE).apply { + setProperty(MagixMessage::sourceEndpoint.name, message.sourceEndpoint) + setProperty(MagixMessage::format.name, message.format) + + setBlobString(MagixMessage::payload.name, magixJson.encodeToString(message.payload)) + + message.targetEndpoint?.let { + setProperty(MagixMessage::targetEndpoint.name, it) + } + message.id?.let { + setProperty(MagixMessage::id.name, it) + } + message.parentId?.let { + setProperty(MagixMessage::parentId.name, it) + } + message.user?.let { + setProperty( + MagixMessage::user.name, + when (it) { + is JsonObject -> it["name"]?.jsonPrimitive?.content ?: "@error" + is JsonPrimitive -> it.content + else -> "@error" + } + ) + } + } + } + + public fun sendMessage(message: MagixMessage) { + store.executeInTransaction { transaction -> + writeMessage(transaction, message) + } + } + + override suspend fun useMessages( + magixFilter: MagixMessageFilter?, + payloadFilter: MagixPayloadFilter?, + userFilter: MagixUsernameFilter?, + callback: (Sequence) -> Unit, + ): Unit = store.executeInReadonlyTransaction { transaction -> + val all = transaction.getAll(XodusMagixStorage.MAGIC_MESSAGE_ENTITY_TYPE) + + fun StoreTransaction.findAllIn( + entityType: String, + field: String, + values: Collection?, + ): EntityIterable? { + var union: EntityIterable? = null + values?.forEach { + val filter = transaction.find(entityType, field, it) + union = union?.union(filter) ?: filter + } + return union + } + + // filter by magix filter + val filteredByMagix: EntityIterable = magixFilter?.let { mf -> + var res = all + transaction.findAllIn(XodusMagixStorage.MAGIC_MESSAGE_ENTITY_TYPE, MagixMessage::format.name, mf.format) + ?.let { + res = res.intersect(it) + } + transaction.findAllIn( + XodusMagixStorage.MAGIC_MESSAGE_ENTITY_TYPE, + MagixMessage::sourceEndpoint.name, + mf.source + )?.let { + res = res.intersect(it) + } + transaction.findAllIn( + XodusMagixStorage.MAGIC_MESSAGE_ENTITY_TYPE, + MagixMessage::targetEndpoint.name, + mf.target + )?.let { + res = res.intersect(it) + } + + res + } ?: all + + val filteredByUser: EntityIterable = userFilter?.let { userFilter -> + filteredByMagix.intersect( + transaction.find( + XodusMagixStorage.MAGIC_MESSAGE_ENTITY_TYPE, + MagixMessage::user.name, + userFilter.userName + ) + ) + } ?: filteredByMagix + + + val sequence = filteredByUser.asSequence().map { it.parseMagixMessage() } + + val filteredSequence = if (payloadFilter == null) { + sequence + } else { + sequence.filter { + payloadFilter.test(it.payload) + } + } + + callback(filteredSequence) + } +} + /** * Attach a Xodus storage process to the given endpoint. */ @@ -27,54 +150,15 @@ public class XodusMagixStorage( private val store: PersistentEntityStore, endpoint: MagixEndpoint, filter: MagixMessageFilter = MagixMessageFilter.ALL, -) : MagixHistory, AutoCloseable { +) : AutoCloseable { + + public val history: XodusMagixHistory = XodusMagixHistory(store) //TODO consider message buffering internal val subscriptionJob = endpoint.subscribe(filter).onEach { message -> - store.executeInTransaction { transaction -> - transaction.newEntity(MAGIC_MESSAGE_ENTITY_TYPE).apply { - setProperty(MagixMessage::sourceEndpoint.name, message.sourceEndpoint) - setProperty(MagixMessage::format.name, message.format) - - setBlobString(MagixMessage::payload.name, magixJson.encodeToString(message.payload)) - - message.targetEndpoint?.let { - setProperty(MagixMessage::targetEndpoint.name, it) - } - message.id?.let { - setProperty(MagixMessage::id.name, it) - } - message.parentId?.let { - setProperty(MagixMessage::parentId.name, it) - } - message.user?.let { - setProperty( - MagixMessage::user.name, - when (it) { - is JsonObject -> it["name"]?.jsonPrimitive?.content ?: "@error" - is JsonPrimitive -> it.content - else -> "@error" - } - ) - } - } - } + history.sendMessage(message) }.launchIn(scope) - private fun Entity.parseMagixMessage(): MagixMessage = MagixMessage( - format = getProperty(MagixMessage::format.name).toString(), - payload = getBlobString(MagixMessage::payload.name)?.let { - magixJson.parseToJsonElement(it) - } ?: JsonObject(emptyMap()), - sourceEndpoint = getProperty(MagixMessage::sourceEndpoint.name).toString(), - targetEndpoint = getProperty(MagixMessage::targetEndpoint.name)?.toString(), - id = getProperty(MagixMessage::id.name)?.toString(), - parentId = getProperty(MagixMessage::parentId.name)?.toString(), - user = getBlobString(MagixMessage::user.name)?.let { - magixJson.parseToJsonElement(it) - }, - ) - /** * Access all messages in a given format @@ -105,63 +189,6 @@ public class XodusMagixStorage( block(sequence) } - override suspend fun findMessages( - magixFilter: MagixMessageFilter?, - payloadFilter: MagixPayloadFilter?, - userFilter: MagixUsernameFilter?, - callback: (Sequence) -> Unit, - ): Unit = store.executeInReadonlyTransaction { transaction -> - val all = transaction.getAll(MAGIC_MESSAGE_ENTITY_TYPE) - - fun StoreTransaction.findAllIn( - entityType: String, - field: String, - values: Collection?, - ): EntityIterable? { - var union: EntityIterable? = null - values?.forEach { - val filter = transaction.find(entityType, field, it) - union = union?.union(filter) ?: filter - } - return union - } - - // filter by magix filter - val filteredByMagix: EntityIterable = magixFilter?.let { mf -> - var res = all - transaction.findAllIn(MAGIC_MESSAGE_ENTITY_TYPE, MagixMessage::format.name, mf.format)?.let { - res = res.intersect(it) - } - transaction.findAllIn(MAGIC_MESSAGE_ENTITY_TYPE, MagixMessage::sourceEndpoint.name, mf.origin)?.let { - res = res.intersect(it) - } - transaction.findAllIn(MAGIC_MESSAGE_ENTITY_TYPE, MagixMessage::targetEndpoint.name, mf.target)?.let { - res = res.intersect(it) - } - - res - } ?: all - - val filteredByUser: EntityIterable = userFilter?.let { userFilter -> - filteredByMagix.intersect( - transaction.find(MAGIC_MESSAGE_ENTITY_TYPE, MagixMessage::user.name, userFilter.userName) - ) - } ?: filteredByMagix - - - val sequence = filteredByUser.asSequence().map { it.parseMagixMessage() } - - val filteredSequence = if (payloadFilter == null) { - sequence - } else { - sequence.filter { - payloadFilter.test(it.payload) - } - } - - callback(filteredSequence) - } - override fun close() { subscriptionJob.cancel() } diff --git a/magix/magix-storage/magix-storage-xodus/src/main/kotlin/space/kscience/magix/storage/xodus/storageAPITest.kt b/magix/magix-storage/magix-storage-xodus/src/main/kotlin/space/kscience/magix/storage/xodus/storageAPITest.kt new file mode 100644 index 0000000..dc1d42b --- /dev/null +++ b/magix/magix-storage/magix-storage-xodus/src/main/kotlin/space/kscience/magix/storage/xodus/storageAPITest.kt @@ -0,0 +1,50 @@ +package space.kscience.magix.storage.xodus + +import jetbrains.exodus.entitystore.PersistentEntityStores +import kotlinx.serialization.json.JsonPrimitive +import kotlinx.serialization.json.buildJsonObject +import space.kscience.magix.api.MagixMessage +import space.kscience.magix.api.MagixMessageFilter +import java.nio.file.Files +import kotlin.time.measureTime + +public suspend fun main() { + val storeDirectory = Files.createTempDirectory("controls-xodus").toFile() + println(storeDirectory) + val store = PersistentEntityStores.newInstance(storeDirectory) + val history = XodusMagixHistory(store) + + store.executeInTransaction { transaction -> + for (value in 1..100) { + for (source in 1..100) { + for (target in 1..100) { + history.writeMessage( + transaction, + MagixMessage( + "test", + sourceEndpoint = "source$source", + targetEndpoint = "target$target", + payload = buildJsonObject { + put("value", JsonPrimitive(value)) + } + ) + ) + } + } + } + } + + println("written million messages") + + + val time = measureTime { + history.useMessages( + MagixMessageFilter(source = listOf("source12"), target = listOf("target12")) + ) { sequence -> + println(sequence.count()) + } + } + println("Finished query in $time") + + store.close() +} \ No newline at end of file diff --git a/magix/magix-storage/src/commonMain/kotlin/space/kscience/magix/storage/MagixHistory.kt b/magix/magix-storage/src/commonMain/kotlin/space/kscience/magix/storage/MagixHistory.kt index c64ffc1..e991fa9 100644 --- a/magix/magix-storage/src/commonMain/kotlin/space/kscience/magix/storage/MagixHistory.kt +++ b/magix/magix-storage/src/commonMain/kotlin/space/kscience/magix/storage/MagixHistory.kt @@ -66,7 +66,7 @@ public interface MagixHistory { * @param payloadFilter filter for payload fields. * @param userFilter filters user names ("user.name"). */ - public suspend fun findMessages( + public suspend fun useMessages( magixFilter: MagixMessageFilter? = null, payloadFilter: MagixPayloadFilter? = null, userFilter: MagixUsernameFilter? = null, diff --git a/magix/magix-storage/src/commonMain/kotlin/space/kscience/magix/storage/historyEndpoint.kt b/magix/magix-storage/src/commonMain/kotlin/space/kscience/magix/storage/historyEndpoint.kt index cbd5fd4..4009f0b 100644 --- a/magix/magix-storage/src/commonMain/kotlin/space/kscience/magix/storage/historyEndpoint.kt +++ b/magix/magix-storage/src/commonMain/kotlin/space/kscience/magix/storage/historyEndpoint.kt @@ -57,7 +57,7 @@ public fun MagixEndpoint.launchHistory( if (payload is HistoryRequestPayload) { val realPageSize = payload.pageSize ?: pageSize - history.findMessages(payload.magixFilter, payload.payloadFilter, payload.userFilter) { sequence -> + history.useMessages(payload.magixFilter, payload.payloadFilter, payload.userFilter) { sequence -> // start from -1 because increment always happens first var pageNumber = -1 diff --git a/magix/magix-zmq/src/main/kotlin/space/kscince/magix/zmq/ZmqMagixEndpoint.kt b/magix/magix-zmq/src/main/kotlin/space/kscince/magix/zmq/ZmqMagixEndpoint.kt index 2e9ab8b..7a8aeb7 100644 --- a/magix/magix-zmq/src/main/kotlin/space/kscince/magix/zmq/ZmqMagixEndpoint.kt +++ b/magix/magix-zmq/src/main/kotlin/space/kscince/magix/zmq/ZmqMagixEndpoint.kt @@ -13,6 +13,7 @@ import space.kscience.magix.api.MagixMessage import space.kscience.magix.api.MagixMessageFilter import space.kscience.magix.api.filter import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.coroutineContext public class ZmqMagixEndpoint( private val host: String, @@ -69,8 +70,7 @@ public class ZmqMagixEndpoint( } } -public fun MagixEndpoint.Companion.zmq( - scope: CoroutineScope, +public suspend fun MagixEndpoint.Companion.zmq( host: String, protocol: String = "tcp", pubPort: Int = DEFAULT_MAGIX_ZMQ_PUB_PORT, @@ -80,5 +80,5 @@ public fun MagixEndpoint.Companion.zmq( protocol, pubPort, pullPort, - coroutineContext = scope.coroutineContext + coroutineContext = coroutineContext ) \ No newline at end of file diff --git a/magix/magix-zmq/src/main/kotlin/space/kscince/magix/zmq/ZmqMagixFlowPlugin.kt b/magix/magix-zmq/src/main/kotlin/space/kscince/magix/zmq/ZmqMagixFlowPlugin.kt index c4ec16a..7813b8c 100644 --- a/magix/magix-zmq/src/main/kotlin/space/kscince/magix/zmq/ZmqMagixFlowPlugin.kt +++ b/magix/magix-zmq/src/main/kotlin/space/kscince/magix/zmq/ZmqMagixFlowPlugin.kt @@ -18,6 +18,7 @@ public class ZmqMagixFlowPlugin( public val localHost: String = "tcp://*", public val zmqPubSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PUB_PORT, public val zmqPullSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PULL_PORT, + private val zContext: ZContext = ZContext() ) : MagixFlowPlugin { override fun start( @@ -27,7 +28,7 @@ public class ZmqMagixFlowPlugin( ): Job = scope.launch(Dispatchers.IO) { val logger = LoggerFactory.getLogger("magix-server-zmq") - ZContext().use { context -> + zContext.use { context -> //launch the publishing job val pubSocket = context.createSocket(SocketType.PUB) pubSocket.bind("$localHost:$zmqPubSocketPort")