diff --git a/magix/magix-mqtt/src/main/kotlin/space/ksceince/magix/mqtt/MqttMagixEndpoint.kt b/magix/magix-mqtt/src/main/kotlin/space/ksceince/magix/mqtt/MqttMagixEndpoint.kt index b5f7c0a..296ddb0 100644 --- a/magix/magix-mqtt/src/main/kotlin/space/ksceince/magix/mqtt/MqttMagixEndpoint.kt +++ b/magix/magix-mqtt/src/main/kotlin/space/ksceince/magix/mqtt/MqttMagixEndpoint.kt @@ -14,7 +14,7 @@ import space.kscience.magix.api.MagixMessage import space.kscience.magix.api.MagixMessageFilter import java.util.* -`/** +/** * MQTT5 endpoint for magix. * * @param broadcastTopicBuilder defines how the topic is constructed from broadcast message structure. diff --git a/magix/magix-storage/build.gradle.kts b/magix/magix-storage/build.gradle.kts new file mode 100644 index 0000000..43306c4 --- /dev/null +++ b/magix/magix-storage/build.gradle.kts @@ -0,0 +1,22 @@ +plugins { + id("space.kscience.gradle.mpp") + `maven-publish` +} + +description = """ + Magix history database API +""".trimIndent() + + +kscience { + jvm() + js() + native() + useSerialization { + json() + } + dependencies { + api(projects.magix.magixApi) + api(spclibs.kotlinx.datetime) + } +} diff --git a/magix/magix-storage/magix-storage-xodus/build.gradle.kts b/magix/magix-storage/magix-storage-xodus/build.gradle.kts index 8869867..ed1dc32 100644 --- a/magix/magix-storage/magix-storage-xodus/build.gradle.kts +++ b/magix/magix-storage/magix-storage-xodus/build.gradle.kts @@ -5,17 +5,18 @@ plugins { val xodusVersion: String by rootProject.extra -kscience{ +kscience { useCoroutines() } dependencies { - api(projects.magix.magixApi) + api(projects.magix.magixStorage) implementation("org.jetbrains.xodus:xodus-entity-store:$xodusVersion") +// implementation("org.jetbrains.xodus:dnq:2.0.0") testImplementation(spclibs.kotlinx.coroutines.test) } -readme{ +readme { maturity = space.kscience.gradle.Maturity.PROTOTYPE } diff --git a/magix/magix-storage/magix-storage-xodus/src/main/kotlin/space/kscience/magix/storage/xodus/XodusMagixStorage.kt b/magix/magix-storage/magix-storage-xodus/src/main/kotlin/space/kscience/magix/storage/xodus/XodusMagixStorage.kt index 39eb4ea..1c9e636 100644 --- a/magix/magix-storage/magix-storage-xodus/src/main/kotlin/space/kscience/magix/storage/xodus/XodusMagixStorage.kt +++ b/magix/magix-storage/magix-storage-xodus/src/main/kotlin/space/kscience/magix/storage/xodus/XodusMagixStorage.kt @@ -1,25 +1,30 @@ package space.kscience.magix.storage.xodus -import jetbrains.exodus.entitystore.Entity -import jetbrains.exodus.entitystore.PersistentEntityStore -import jetbrains.exodus.entitystore.PersistentEntityStores +import jetbrains.exodus.entitystore.* import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.serialization.encodeToString -import kotlinx.serialization.json.JsonObject +import kotlinx.serialization.json.* import space.kscience.magix.api.MagixEndpoint import space.kscience.magix.api.MagixEndpoint.Companion.magixJson import space.kscience.magix.api.MagixMessage import space.kscience.magix.api.MagixMessageFilter +import space.kscience.magix.storage.MagixHistory +import space.kscience.magix.storage.MagixPayloadFilter +import space.kscience.magix.storage.MagixUsernameFilter import java.nio.file.Path +import kotlin.sequences.Sequence +/** + * Attach a Xodus storage process to the given endpoint. + */ public class XodusMagixStorage( scope: CoroutineScope, private val store: PersistentEntityStore, endpoint: MagixEndpoint, - filter: MagixMessageFilter = MagixMessageFilter(), -) : AutoCloseable { + filter: MagixMessageFilter = MagixMessageFilter.ALL, +) : MagixHistory, AutoCloseable { //TODO consider message buffering internal val subscriptionJob = endpoint.subscribe(filter).onEach { message -> @@ -40,7 +45,14 @@ public class XodusMagixStorage( setProperty(MagixMessage::parentId.name, it) } message.user?.let { - setBlobString(MagixMessage::user.name, MagixEndpoint.magixJson.encodeToString(it)) + setProperty( + MagixMessage::user.name, + when (it) { + is JsonObject -> it["name"]?.jsonPrimitive?.content ?: "@error" + is JsonPrimitive -> it.content + else -> "@error" + } + ) } } } @@ -60,6 +72,10 @@ public class XodusMagixStorage( }, ) + + /** + * Access all messages in a given format + */ public fun readByFormat( format: String, block: (Sequence) -> Unit, @@ -74,6 +90,9 @@ public class XodusMagixStorage( block(sequence) } + /** + * Access all messages as + */ public fun readAll( block: (Sequence) -> Unit, ): Unit = store.executeInReadonlyTransaction { transaction -> @@ -83,6 +102,56 @@ public class XodusMagixStorage( block(sequence) } + override suspend fun findMessages( + magixFilter: MagixMessageFilter?, + payloadFilters: List, + userFilter: MagixUsernameFilter?, + callback: (Sequence) -> Unit, + ): Unit = store.executeInReadonlyTransaction { transaction -> + val all = transaction.getAll(MAGIC_MESSAGE_ENTITY_TYPE) + + fun StoreTransaction.findAllIn( + entityType: String, + field: String, + values: Collection?, + ): EntityIterable? { + var union: EntityIterable? = null + values?.forEach { + val filter = transaction.find(entityType, field, it) + union = union?.union(filter) ?: filter + } + return union + } + + // filter by magix filter + val filteredByMagix: EntityIterable = magixFilter?.let { mf -> + var res = all + transaction.findAllIn(MAGIC_MESSAGE_ENTITY_TYPE, MagixMessage::format.name, mf.format)?.let { + res = res.intersect(it) + } + transaction.findAllIn(MAGIC_MESSAGE_ENTITY_TYPE, MagixMessage::origin.name, mf.origin)?.let { + res = res.intersect(it) + } + transaction.findAllIn(MAGIC_MESSAGE_ENTITY_TYPE, MagixMessage::target.name, mf.target)?.let { + res = res.intersect(it) + } + + res + } ?: all + + val filteredByUser: EntityIterable = userFilter?.let { userFilter-> + filteredByMagix.intersect( + transaction.find(MAGIC_MESSAGE_ENTITY_TYPE, MagixMessage::user.name, userFilter.userName) + ) + } ?: filteredByMagix + + + filteredByUser.se + + + block(sequence) + } + override fun close() { subscriptionJob.cancel() } diff --git a/magix/magix-storage/src/commonMain/kotlin/space/kscience/magix/storage/MagixHistory.kt b/magix/magix-storage/src/commonMain/kotlin/space/kscience/magix/storage/MagixHistory.kt new file mode 100644 index 0000000..615fb95 --- /dev/null +++ b/magix/magix-storage/src/commonMain/kotlin/space/kscience/magix/storage/MagixHistory.kt @@ -0,0 +1,85 @@ +package space.kscience.magix.storage + +import kotlinx.datetime.LocalDateTime +import kotlinx.serialization.SerialName +import kotlinx.serialization.Serializable +import kotlinx.serialization.json.JsonElement +import space.kscience.magix.api.MagixFormat +import space.kscience.magix.api.MagixMessage +import space.kscience.magix.api.MagixMessageFilter +import kotlin.jvm.JvmInline + +@Serializable +public sealed class MagixPayloadFilter { + @SerialName("eq") + public class Equals(public val path: String, public val value: JsonElement) : MagixPayloadFilter() + + @SerialName("not") + public class Not(public val argument: MagixPayloadFilter) : MagixPayloadFilter() + + +// @SerialName("like") +// public class Like(public val path: String, public val value: String) : MagixPayloadFilter() + + @SerialName("numberInRange") + public class NumberInRange(public val path: String, public val from: Number, public val to: Number) : + MagixPayloadFilter() + + @SerialName("dateTimeInRange") + public class DateTimeInRange( + public val path: String, + public val from: LocalDateTime, + public val to: LocalDateTime, + ) : MagixPayloadFilter() +} + +public fun MagixPayloadFilter.test(element: JsonElement): Boolean { + TODO() +// when (this) { +// is MagixPayloadFilter.DateTimeInRange -> TODO() +// is MagixPayloadFilter.Equals -> TODO() +// is MagixPayloadFilter.Not -> !(argument.test(element)) +// is MagixPayloadFilter.NumberInRange -> element.jsonObject[path] +// } +} + +public fun Sequence.filter(magixPayloadFilter: MagixPayloadFilter): Sequence = filter { + magixPayloadFilter.test(it) +} + + +@Serializable +@JvmInline +public value class MagixUsernameFilter(public val userName: String) + +/** + * An interface for history access to magix messages + */ +public interface MagixHistory { + /** + * Find messages using intersection of given filters. If filters are not defined, get all messages. + * + * The result is supplied as a callback with [Sequence] of messages. If backing storage uses transactions, the function + * closes all transactions after use. + * + * @param magixFilter magix header filter. + * @param payloadFilters filters for payload fields. + * @param userFilter filters user names ("user.name"). + */ + public suspend fun findMessages( + magixFilter: MagixMessageFilter? = null, + payloadFilters: List = emptyList(), + userFilter: MagixUsernameFilter? = null, + callback: (Sequence) -> Unit, + ) + + public companion object { + public const val HISTORY_PAYLOAD_FORMAT: String = "magix.history" + + public val magixFormat: MagixFormat = MagixFormat( + MagixHistoryPayload.serializer(), + setOf(HISTORY_PAYLOAD_FORMAT) + ) + } +} + diff --git a/magix/magix-storage/src/commonMain/kotlin/space/kscience/magix/storage/MagixHistoryPayload.kt b/magix/magix-storage/src/commonMain/kotlin/space/kscience/magix/storage/MagixHistoryPayload.kt new file mode 100644 index 0000000..a2af500 --- /dev/null +++ b/magix/magix-storage/src/commonMain/kotlin/space/kscience/magix/storage/MagixHistoryPayload.kt @@ -0,0 +1,44 @@ +package space.kscience.magix.storage + +import kotlinx.serialization.SerialName +import kotlinx.serialization.Serializable +import space.kscience.magix.api.MagixMessage +import space.kscience.magix.api.MagixMessageFilter + +/** + * Base class for history API request and response messages + */ +@Serializable +public sealed class MagixHistoryPayload + +/** + * Message to request history information from the storage + * + * @param magixFilter filter for magix headers + * @param payloadFilters filter for payload fields + * @param userFilter filter for user name + * @param pageSize if defined, defines the maximum number of messages per response message. If not defined, uses history provider default. + */ +@Serializable +@SerialName("history.request") +public data class HistoryRequestPayload( + val magixFilter: MagixMessageFilter? = null, + val payloadFilters: List = emptyList(), + val userFilter: MagixUsernameFilter? = null, + val pageSize: Int? = null +) : MagixHistoryPayload() + +/** + * A response to a [HistoryRequestPayload]. Contains a list of messages. + * + * @param messages the list of messages. + * @param page the index of current page for multiple page messages. Page indexing starts with 0. + * @param lastPage true if this page is the last. + */ +@Serializable +@SerialName("history.response") +public data class HistoryResponsePayload( + val messages: List, + val page: Int = 0, + val lastPage: Boolean = true +) : MagixHistoryPayload() \ No newline at end of file diff --git a/magix/magix-storage/src/commonMain/kotlin/space/kscience/magix/storage/historyEndpoint.kt b/magix/magix-storage/src/commonMain/kotlin/space/kscience/magix/storage/historyEndpoint.kt new file mode 100644 index 0000000..cfee042 --- /dev/null +++ b/magix/magix-storage/src/commonMain/kotlin/space/kscience/magix/storage/historyEndpoint.kt @@ -0,0 +1,82 @@ +package space.kscience.magix.storage + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.launch +import kotlinx.serialization.json.JsonObject +import space.kscience.magix.api.MagixEndpoint +import space.kscience.magix.api.MagixMessage +import space.kscience.magix.api.broadcast +import space.kscience.magix.api.subscribe + +internal fun generateId(request: MagixMessage): String = if (request.id != null) { + "${request.id}.response" +} else { + "history[${request.payload.hashCode().toString(16)}" +} + +/** + * Launch responding history messages on this [MagixEndpoint]. The process does not store messages, only responds to history requests. + * + * @param scope the [CoroutineScope] in which the responding process runs. + * @param history the history database. + * @param targetFilter filters the request messages by target if defined. + * @param pageSize maximum messages per page in the response. The default is 100. + * @param user user block for outgoing messages if defined. + * @param origin tag for outgoing messages if defined. + */ +public fun MagixEndpoint.launchHistory( + scope: CoroutineScope, + history: MagixHistory, + targetFilter: Collection? = null, + pageSize: Int = 100, + user: JsonObject? = null, + origin: String = MagixHistory.HISTORY_PAYLOAD_FORMAT, +): Job = subscribe(MagixHistory.magixFormat, targetFilter = targetFilter).onEach { (request, payload) -> + + fun send(chunk: List, pageNumber: Int, end: Boolean) { + scope.launch { + val sendPayload = HistoryResponsePayload( + chunk, + pageNumber + ) + broadcast( + format = MagixHistory.magixFormat, + payload = sendPayload, + target = request.origin, + id = generateId(request), + parentId = request.id, + user = user, + origin = origin, + ) + } + } + + + if (payload is HistoryRequestPayload) { + val realPageSize = payload.pageSize ?: pageSize + history.findMessages(payload.magixFilter, payload.payloadFilters, payload.userFilter) { sequence -> + // start from -1 because increment always happens first + var pageNumber = -1 + + //remember the last chunk to determine which is last + var chunk: List? = null + + sequence.chunked(realPageSize).forEach { + //If the last chunk was not final, send it + chunk?.let { chunk -> + send(chunk, pageNumber, false) + } + pageNumber++ + // update last chunk + chunk = it + } + // send the final chunk + chunk?.let { + send(it, pageNumber, true) + } + } + } +}.launchIn(scope) \ No newline at end of file