diff --git a/magix/magix-storage/magix-storage-xodus/src/main/kotlin/space/kscience/magix/storage/xodus/XodusMagixStorage.kt b/magix/magix-storage/magix-storage-xodus/src/main/kotlin/space/kscience/magix/storage/xodus/XodusMagixStorage.kt index 7a8ead1..24d35be 100644 --- a/magix/magix-storage/magix-storage-xodus/src/main/kotlin/space/kscience/magix/storage/xodus/XodusMagixStorage.kt +++ b/magix/magix-storage/magix-storage-xodus/src/main/kotlin/space/kscience/magix/storage/xodus/XodusMagixStorage.kt @@ -5,7 +5,9 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.serialization.encodeToString -import kotlinx.serialization.json.* +import kotlinx.serialization.json.JsonObject +import kotlinx.serialization.json.JsonPrimitive +import kotlinx.serialization.json.jsonPrimitive import space.kscience.magix.api.MagixEndpoint import space.kscience.magix.api.MagixEndpoint.Companion.magixJson import space.kscience.magix.api.MagixMessage @@ -13,6 +15,7 @@ import space.kscience.magix.api.MagixMessageFilter import space.kscience.magix.storage.MagixHistory import space.kscience.magix.storage.MagixPayloadFilter import space.kscience.magix.storage.MagixUsernameFilter +import space.kscience.magix.storage.test import java.nio.file.Path import kotlin.sequences.Sequence @@ -33,7 +36,7 @@ public class XodusMagixStorage( setProperty(MagixMessage::sourceEndpoint.name, message.sourceEndpoint) setProperty(MagixMessage::format.name, message.format) - setBlobString(MagixMessage::payload.name, MagixEndpoint.magixJson.encodeToString(message.payload)) + setBlobString(MagixMessage::payload.name, magixJson.encodeToString(message.payload)) message.targetEndpoint?.let { setProperty(MagixMessage::targetEndpoint.name, it) @@ -104,7 +107,7 @@ public class XodusMagixStorage( override suspend fun findMessages( magixFilter: MagixMessageFilter?, - payloadFilters: List, + payloadFilter: MagixPayloadFilter?, userFilter: MagixUsernameFilter?, callback: (Sequence) -> Unit, ): Unit = store.executeInReadonlyTransaction { transaction -> @@ -139,17 +142,24 @@ public class XodusMagixStorage( res } ?: all - val filteredByUser: EntityIterable = userFilter?.let { userFilter-> + val filteredByUser: EntityIterable = userFilter?.let { userFilter -> filteredByMagix.intersect( transaction.find(MAGIC_MESSAGE_ENTITY_TYPE, MagixMessage::user.name, userFilter.userName) ) } ?: filteredByMagix - filteredByUser.se + val sequence = filteredByUser.asSequence().map { it.parseMagixMessage() } + val filteredSequence = if (payloadFilter == null) { + sequence + } else { + sequence.filter { + payloadFilter.test(it.payload) + } + } - block(sequence) + callback(filteredSequence) } override fun close() { diff --git a/magix/magix-storage/src/commonMain/kotlin/space/kscience/magix/storage/MagixHistory.kt b/magix/magix-storage/src/commonMain/kotlin/space/kscience/magix/storage/MagixHistory.kt index 615fb95..c64ffc1 100644 --- a/magix/magix-storage/src/commonMain/kotlin/space/kscience/magix/storage/MagixHistory.kt +++ b/magix/magix-storage/src/commonMain/kotlin/space/kscience/magix/storage/MagixHistory.kt @@ -14,10 +14,6 @@ public sealed class MagixPayloadFilter { @SerialName("eq") public class Equals(public val path: String, public val value: JsonElement) : MagixPayloadFilter() - @SerialName("not") - public class Not(public val argument: MagixPayloadFilter) : MagixPayloadFilter() - - // @SerialName("like") // public class Like(public val path: String, public val value: String) : MagixPayloadFilter() @@ -31,16 +27,20 @@ public sealed class MagixPayloadFilter { public val from: LocalDateTime, public val to: LocalDateTime, ) : MagixPayloadFilter() + + + @SerialName("not") + public class Not(public val argument: MagixPayloadFilter) : MagixPayloadFilter() + + @SerialName("and") + public class And(public val left: MagixPayloadFilter, public val right: MagixPayloadFilter) : MagixPayloadFilter() + + @SerialName("or") + public class Or(public val left: MagixPayloadFilter, public val right: MagixPayloadFilter) : MagixPayloadFilter() } public fun MagixPayloadFilter.test(element: JsonElement): Boolean { TODO() -// when (this) { -// is MagixPayloadFilter.DateTimeInRange -> TODO() -// is MagixPayloadFilter.Equals -> TODO() -// is MagixPayloadFilter.Not -> !(argument.test(element)) -// is MagixPayloadFilter.NumberInRange -> element.jsonObject[path] -// } } public fun Sequence.filter(magixPayloadFilter: MagixPayloadFilter): Sequence = filter { @@ -63,12 +63,12 @@ public interface MagixHistory { * closes all transactions after use. * * @param magixFilter magix header filter. - * @param payloadFilters filters for payload fields. + * @param payloadFilter filter for payload fields. * @param userFilter filters user names ("user.name"). */ public suspend fun findMessages( magixFilter: MagixMessageFilter? = null, - payloadFilters: List = emptyList(), + payloadFilter: MagixPayloadFilter? = null, userFilter: MagixUsernameFilter? = null, callback: (Sequence) -> Unit, ) diff --git a/magix/magix-storage/src/commonMain/kotlin/space/kscience/magix/storage/MagixHistoryPayload.kt b/magix/magix-storage/src/commonMain/kotlin/space/kscience/magix/storage/MagixHistoryPayload.kt index a2af500..5438fa4 100644 --- a/magix/magix-storage/src/commonMain/kotlin/space/kscience/magix/storage/MagixHistoryPayload.kt +++ b/magix/magix-storage/src/commonMain/kotlin/space/kscience/magix/storage/MagixHistoryPayload.kt @@ -23,7 +23,7 @@ public sealed class MagixHistoryPayload @SerialName("history.request") public data class HistoryRequestPayload( val magixFilter: MagixMessageFilter? = null, - val payloadFilters: List = emptyList(), + val payloadFilter: MagixPayloadFilter? = null, val userFilter: MagixUsernameFilter? = null, val pageSize: Int? = null ) : MagixHistoryPayload() diff --git a/magix/magix-storage/src/commonMain/kotlin/space/kscience/magix/storage/historyEndpoint.kt b/magix/magix-storage/src/commonMain/kotlin/space/kscience/magix/storage/historyEndpoint.kt index 45a7f81..cbd5fd4 100644 --- a/magix/magix-storage/src/commonMain/kotlin/space/kscience/magix/storage/historyEndpoint.kt +++ b/magix/magix-storage/src/commonMain/kotlin/space/kscience/magix/storage/historyEndpoint.kt @@ -57,7 +57,7 @@ public fun MagixEndpoint.launchHistory( if (payload is HistoryRequestPayload) { val realPageSize = payload.pageSize ?: pageSize - history.findMessages(payload.magixFilter, payload.payloadFilters, payload.userFilter) { sequence -> + history.findMessages(payload.magixFilter, payload.payloadFilter, payload.userFilter) { sequence -> // start from -1 because increment always happens first var pageNumber = -1 diff --git a/magix/magix-zmq/build.gradle.kts b/magix/magix-zmq/build.gradle.kts index e7c9f77..3258e17 100644 --- a/magix/magix-zmq/build.gradle.kts +++ b/magix/magix-zmq/build.gradle.kts @@ -10,5 +10,5 @@ description = """ dependencies { api(projects.magix.magixApi) api("org.slf4j:slf4j-api:2.0.6") - implementation("org.zeromq:jeromq:0.5.2") + api("org.zeromq:jeromq:0.5.2") } diff --git a/magix/magix-zmq/src/main/kotlin/space/kscince/magix/zmq/ZmqMagixEndpoint.kt b/magix/magix-zmq/src/main/kotlin/space/kscince/magix/zmq/ZmqMagixEndpoint.kt index 376208d..2e9ab8b 100644 --- a/magix/magix-zmq/src/main/kotlin/space/kscince/magix/zmq/ZmqMagixEndpoint.kt +++ b/magix/magix-zmq/src/main/kotlin/space/kscince/magix/zmq/ZmqMagixEndpoint.kt @@ -21,7 +21,6 @@ public class ZmqMagixEndpoint( private val pullPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PULL_PORT, private val coroutineContext: CoroutineContext = Dispatchers.IO, private val zmqContext: ZContext = ZContext() - ) : MagixEndpoint, AutoCloseable { override fun subscribe(filter: MagixMessageFilter): Flow {