diff --git a/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/api/MagixMessage.kt b/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/api/MagixMessage.kt index 4162593..c1557e0 100644 --- a/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/api/MagixMessage.kt +++ b/magix/magix-api/src/commonMain/kotlin/space/kscience/magix/api/MagixMessage.kt @@ -1,7 +1,7 @@ package space.kscience.magix.api import kotlinx.serialization.Serializable -import kotlinx.serialization.json.JsonElement +import kotlinx.serialization.json.* /* @@ -32,4 +32,15 @@ public data class MagixMessage( val id: String? = null, val parentId: String? = null, val user: JsonElement? = null, -) \ No newline at end of file +) + +/** + * The default accessor for username. If `user` is an object, take it's "name" field. + * If it is primitive, take its content. Return "@error" if it is an array. + */ +public val MagixMessage.userName: String? get() = when(user){ + null, JsonNull -> null + is JsonObject -> user.jsonObject["name"]?.jsonPrimitive?.content + is JsonPrimitive -> user.content + else -> "@error" +} \ No newline at end of file diff --git a/magix/magix-storage/build.gradle.kts b/magix/magix-storage/build.gradle.kts index 43306c4..76bc2df 100644 --- a/magix/magix-storage/build.gradle.kts +++ b/magix/magix-storage/build.gradle.kts @@ -15,6 +15,7 @@ kscience { useSerialization { json() } + useCoroutines() dependencies { api(projects.magix.magixApi) api(spclibs.kotlinx.datetime) diff --git a/magix/magix-storage/magix-storage-xodus/src/main/kotlin/space/kscience/magix/storage/xodus/XodusMagixStorage.kt b/magix/magix-storage/magix-storage-xodus/src/main/kotlin/space/kscience/magix/storage/xodus/XodusMagixStorage.kt index bfd7508..4a3efa3 100644 --- a/magix/magix-storage/magix-storage-xodus/src/main/kotlin/space/kscience/magix/storage/xodus/XodusMagixStorage.kt +++ b/magix/magix-storage/magix-storage-xodus/src/main/kotlin/space/kscience/magix/storage/xodus/XodusMagixStorage.kt @@ -6,12 +6,11 @@ import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.serialization.encodeToString import kotlinx.serialization.json.JsonObject -import kotlinx.serialization.json.JsonPrimitive -import kotlinx.serialization.json.jsonPrimitive import space.kscience.magix.api.MagixEndpoint import space.kscience.magix.api.MagixEndpoint.Companion.magixJson import space.kscience.magix.api.MagixMessage import space.kscience.magix.api.MagixMessageFilter +import space.kscience.magix.api.userName import space.kscience.magix.storage.* import java.nio.file.Path import kotlin.sequences.Sequence @@ -31,7 +30,7 @@ private fun Entity.parseMagixMessage(): MagixMessage = MagixMessage( }, ) -public class XodusMagixHistory(private val store: PersistentEntityStore) : MagixHistory { +public class XodusMagixHistory(private val store: PersistentEntityStore) : WriteableMagixHistory { public fun writeMessage(storeTransaction: StoreTransaction, message: MagixMessage) { storeTransaction.newEntity(XodusMagixStorage.MAGIC_MESSAGE_ENTITY_TYPE).apply { @@ -49,20 +48,13 @@ public class XodusMagixHistory(private val store: PersistentEntityStore) : Magix message.parentId?.let { setProperty(MagixMessage::parentId.name, it) } - message.user?.let { - setProperty( - MagixMessage::user.name, - when (it) { - is JsonObject -> it["name"]?.jsonPrimitive?.content ?: "@error" - is JsonPrimitive -> it.content - else -> "@error" - } - ) + message.userName?.let { + setProperty(MagixMessage::user.name, it) } } } - public fun sendMessage(message: MagixMessage) { + override suspend fun send(message: MagixMessage) { store.executeInTransaction { transaction -> writeMessage(transaction, message) } @@ -139,6 +131,7 @@ public class XodusMagixHistory(private val store: PersistentEntityStore) : Magix } } + /** * Attach a Xodus storage process to the given endpoint. */ @@ -154,7 +147,7 @@ public class XodusMagixStorage( //TODO consider message buffering private val subscriptionJob = endpoint.subscribe(subscriptionFilter).onEach { message -> - history.sendMessage(message) + history.send(message) }.launchIn(scope) private val broadcastJob = endpoint.launchHistory(scope, history, endpointName = endpointName) diff --git a/magix/magix-storage/src/commonMain/kotlin/space/kscience/magix/storage/MagixHistory.kt b/magix/magix-storage/src/commonMain/kotlin/space/kscience/magix/storage/MagixHistory.kt index 3059481..5501dc5 100644 --- a/magix/magix-storage/src/commonMain/kotlin/space/kscience/magix/storage/MagixHistory.kt +++ b/magix/magix-storage/src/commonMain/kotlin/space/kscience/magix/storage/MagixHistory.kt @@ -4,7 +4,9 @@ import kotlinx.datetime.LocalDateTime import kotlinx.serialization.SerialName import kotlinx.serialization.Serializable import kotlinx.serialization.json.JsonElement +import kotlinx.serialization.json.doubleOrNull import kotlinx.serialization.json.jsonObject +import kotlinx.serialization.json.jsonPrimitive import space.kscience.magix.api.MagixFormat import space.kscience.magix.api.MagixMessage import space.kscience.magix.api.MagixMessageFilter @@ -56,8 +58,17 @@ private fun JsonElement.takeElement(path: String): JsonElement? = if (path.isEmp public fun MagixPayloadFilter.test(element: JsonElement): Boolean = when (this) { is MagixPayloadFilter.Equals -> element.takeElement(path) == value - is MagixPayloadFilter.DateTimeInRange -> TODO() - is MagixPayloadFilter.NumberInRange -> TODO() + is MagixPayloadFilter.DateTimeInRange -> { + element.takeElement(path)?.jsonPrimitive?.content?.let { + LocalDateTime.parse(it) in from..to + } ?: false + } + is MagixPayloadFilter.NumberInRange -> { + element.takeElement(path)?.jsonPrimitive?.doubleOrNull?.let { + it in (from.toDouble()..to.toDouble()) + } ?: false + } + is MagixPayloadFilter.Not -> !argument.test(element) is MagixPayloadFilter.And -> left.test(element) && right.test(element) is MagixPayloadFilter.Or -> left.test(element) || right.test(element) @@ -103,3 +114,7 @@ public interface MagixHistory { } } +public interface WriteableMagixHistory: MagixHistory{ + public suspend fun send(message: MagixMessage) +} + diff --git a/magix/magix-storage/src/commonTest/kotlin/InMemoryMagixHistory.kt b/magix/magix-storage/src/commonTest/kotlin/InMemoryMagixHistory.kt new file mode 100644 index 0000000..432c6d8 --- /dev/null +++ b/magix/magix-storage/src/commonTest/kotlin/InMemoryMagixHistory.kt @@ -0,0 +1,32 @@ +package space.kscience.magix.storage + +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import space.kscience.magix.api.MagixMessage +import space.kscience.magix.api.MagixMessageFilter + +class InMemoryMagixHistory() : WriteableMagixHistory { + + private val cache = mutableListOf() + private val mutex = Mutex() + + override suspend fun send(message: MagixMessage) { + mutex.withLock { + cache.add(message) + } + } + + override suspend fun useMessages( + magixFilter: MagixMessageFilter?, + payloadFilter: MagixPayloadFilter?, + userFilter: MagixUsernameFilter?, + callback: (Sequence) -> Unit, + ) = mutex.withLock { + val sequence = cache.asSequence().filter { message -> + (magixFilter?.accepts(message) ?: true) && + (userFilter?.userName?.equals(message.user) ?: true) && + payloadFilter?.test(message.payload) ?: true + } + callback(sequence) + } +} \ No newline at end of file