[WIP] Storage API

This commit is contained in:
Alexander Nozik 2023-07-29 09:46:13 +03:00
parent fcabd9aed4
commit 870cb7ef40
7 changed files with 314 additions and 11 deletions

View File

@ -14,7 +14,7 @@ import space.kscience.magix.api.MagixMessage
import space.kscience.magix.api.MagixMessageFilter import space.kscience.magix.api.MagixMessageFilter
import java.util.* import java.util.*
`/** /**
* MQTT5 endpoint for magix. * MQTT5 endpoint for magix.
* *
* @param broadcastTopicBuilder defines how the topic is constructed from broadcast message structure. * @param broadcastTopicBuilder defines how the topic is constructed from broadcast message structure.

View File

@ -0,0 +1,22 @@
plugins {
id("space.kscience.gradle.mpp")
`maven-publish`
}
description = """
Magix history database API
""".trimIndent()
kscience {
jvm()
js()
native()
useSerialization {
json()
}
dependencies {
api(projects.magix.magixApi)
api(spclibs.kotlinx.datetime)
}
}

View File

@ -5,17 +5,18 @@ plugins {
val xodusVersion: String by rootProject.extra val xodusVersion: String by rootProject.extra
kscience{ kscience {
useCoroutines() useCoroutines()
} }
dependencies { dependencies {
api(projects.magix.magixApi) api(projects.magix.magixStorage)
implementation("org.jetbrains.xodus:xodus-entity-store:$xodusVersion") implementation("org.jetbrains.xodus:xodus-entity-store:$xodusVersion")
// implementation("org.jetbrains.xodus:dnq:2.0.0")
testImplementation(spclibs.kotlinx.coroutines.test) testImplementation(spclibs.kotlinx.coroutines.test)
} }
readme{ readme {
maturity = space.kscience.gradle.Maturity.PROTOTYPE maturity = space.kscience.gradle.Maturity.PROTOTYPE
} }

View File

@ -1,25 +1,30 @@
package space.kscience.magix.storage.xodus package space.kscience.magix.storage.xodus
import jetbrains.exodus.entitystore.Entity import jetbrains.exodus.entitystore.*
import jetbrains.exodus.entitystore.PersistentEntityStore
import jetbrains.exodus.entitystore.PersistentEntityStores
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.onEach
import kotlinx.serialization.encodeToString import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.JsonObject import kotlinx.serialization.json.*
import space.kscience.magix.api.MagixEndpoint import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.api.MagixEndpoint.Companion.magixJson import space.kscience.magix.api.MagixEndpoint.Companion.magixJson
import space.kscience.magix.api.MagixMessage import space.kscience.magix.api.MagixMessage
import space.kscience.magix.api.MagixMessageFilter import space.kscience.magix.api.MagixMessageFilter
import space.kscience.magix.storage.MagixHistory
import space.kscience.magix.storage.MagixPayloadFilter
import space.kscience.magix.storage.MagixUsernameFilter
import java.nio.file.Path import java.nio.file.Path
import kotlin.sequences.Sequence
/**
* Attach a Xodus storage process to the given endpoint.
*/
public class XodusMagixStorage( public class XodusMagixStorage(
scope: CoroutineScope, scope: CoroutineScope,
private val store: PersistentEntityStore, private val store: PersistentEntityStore,
endpoint: MagixEndpoint, endpoint: MagixEndpoint,
filter: MagixMessageFilter = MagixMessageFilter(), filter: MagixMessageFilter = MagixMessageFilter.ALL,
) : AutoCloseable { ) : MagixHistory, AutoCloseable {
//TODO consider message buffering //TODO consider message buffering
internal val subscriptionJob = endpoint.subscribe(filter).onEach { message -> internal val subscriptionJob = endpoint.subscribe(filter).onEach { message ->
@ -40,7 +45,14 @@ public class XodusMagixStorage(
setProperty(MagixMessage::parentId.name, it) setProperty(MagixMessage::parentId.name, it)
} }
message.user?.let { message.user?.let {
setBlobString(MagixMessage::user.name, MagixEndpoint.magixJson.encodeToString(it)) setProperty(
MagixMessage::user.name,
when (it) {
is JsonObject -> it["name"]?.jsonPrimitive?.content ?: "@error"
is JsonPrimitive -> it.content
else -> "@error"
}
)
} }
} }
} }
@ -60,6 +72,10 @@ public class XodusMagixStorage(
}, },
) )
/**
* Access all messages in a given format
*/
public fun readByFormat( public fun readByFormat(
format: String, format: String,
block: (Sequence<MagixMessage>) -> Unit, block: (Sequence<MagixMessage>) -> Unit,
@ -74,6 +90,9 @@ public class XodusMagixStorage(
block(sequence) block(sequence)
} }
/**
* Access all messages as
*/
public fun readAll( public fun readAll(
block: (Sequence<MagixMessage>) -> Unit, block: (Sequence<MagixMessage>) -> Unit,
): Unit = store.executeInReadonlyTransaction { transaction -> ): Unit = store.executeInReadonlyTransaction { transaction ->
@ -83,6 +102,56 @@ public class XodusMagixStorage(
block(sequence) block(sequence)
} }
override suspend fun findMessages(
magixFilter: MagixMessageFilter?,
payloadFilters: List<MagixPayloadFilter>,
userFilter: MagixUsernameFilter?,
callback: (Sequence<MagixMessage>) -> Unit,
): Unit = store.executeInReadonlyTransaction { transaction ->
val all = transaction.getAll(MAGIC_MESSAGE_ENTITY_TYPE)
fun StoreTransaction.findAllIn(
entityType: String,
field: String,
values: Collection<String>?,
): EntityIterable? {
var union: EntityIterable? = null
values?.forEach {
val filter = transaction.find(entityType, field, it)
union = union?.union(filter) ?: filter
}
return union
}
// filter by magix filter
val filteredByMagix: EntityIterable = magixFilter?.let { mf ->
var res = all
transaction.findAllIn(MAGIC_MESSAGE_ENTITY_TYPE, MagixMessage::format.name, mf.format)?.let {
res = res.intersect(it)
}
transaction.findAllIn(MAGIC_MESSAGE_ENTITY_TYPE, MagixMessage::origin.name, mf.origin)?.let {
res = res.intersect(it)
}
transaction.findAllIn(MAGIC_MESSAGE_ENTITY_TYPE, MagixMessage::target.name, mf.target)?.let {
res = res.intersect(it)
}
res
} ?: all
val filteredByUser: EntityIterable = userFilter?.let { userFilter->
filteredByMagix.intersect(
transaction.find(MAGIC_MESSAGE_ENTITY_TYPE, MagixMessage::user.name, userFilter.userName)
)
} ?: filteredByMagix
filteredByUser.se
block(sequence)
}
override fun close() { override fun close() {
subscriptionJob.cancel() subscriptionJob.cancel()
} }

View File

@ -0,0 +1,85 @@
package space.kscience.magix.storage
import kotlinx.datetime.LocalDateTime
import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.JsonElement
import space.kscience.magix.api.MagixFormat
import space.kscience.magix.api.MagixMessage
import space.kscience.magix.api.MagixMessageFilter
import kotlin.jvm.JvmInline
@Serializable
public sealed class MagixPayloadFilter {
@SerialName("eq")
public class Equals(public val path: String, public val value: JsonElement) : MagixPayloadFilter()
@SerialName("not")
public class Not(public val argument: MagixPayloadFilter) : MagixPayloadFilter()
// @SerialName("like")
// public class Like(public val path: String, public val value: String) : MagixPayloadFilter()
@SerialName("numberInRange")
public class NumberInRange(public val path: String, public val from: Number, public val to: Number) :
MagixPayloadFilter()
@SerialName("dateTimeInRange")
public class DateTimeInRange(
public val path: String,
public val from: LocalDateTime,
public val to: LocalDateTime,
) : MagixPayloadFilter()
}
public fun MagixPayloadFilter.test(element: JsonElement): Boolean {
TODO()
// when (this) {
// is MagixPayloadFilter.DateTimeInRange -> TODO()
// is MagixPayloadFilter.Equals -> TODO()
// is MagixPayloadFilter.Not -> !(argument.test(element))
// is MagixPayloadFilter.NumberInRange -> element.jsonObject[path]
// }
}
public fun Sequence<JsonElement>.filter(magixPayloadFilter: MagixPayloadFilter): Sequence<JsonElement> = filter {
magixPayloadFilter.test(it)
}
@Serializable
@JvmInline
public value class MagixUsernameFilter(public val userName: String)
/**
* An interface for history access to magix messages
*/
public interface MagixHistory {
/**
* Find messages using intersection of given filters. If filters are not defined, get all messages.
*
* The result is supplied as a callback with [Sequence] of messages. If backing storage uses transactions, the function
* closes all transactions after use.
*
* @param magixFilter magix header filter.
* @param payloadFilters filters for payload fields.
* @param userFilter filters user names ("user.name").
*/
public suspend fun findMessages(
magixFilter: MagixMessageFilter? = null,
payloadFilters: List<MagixPayloadFilter> = emptyList(),
userFilter: MagixUsernameFilter? = null,
callback: (Sequence<MagixMessage>) -> Unit,
)
public companion object {
public const val HISTORY_PAYLOAD_FORMAT: String = "magix.history"
public val magixFormat: MagixFormat<MagixHistoryPayload> = MagixFormat(
MagixHistoryPayload.serializer(),
setOf(HISTORY_PAYLOAD_FORMAT)
)
}
}

View File

@ -0,0 +1,44 @@
package space.kscience.magix.storage
import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable
import space.kscience.magix.api.MagixMessage
import space.kscience.magix.api.MagixMessageFilter
/**
* Base class for history API request and response messages
*/
@Serializable
public sealed class MagixHistoryPayload
/**
* Message to request history information from the storage
*
* @param magixFilter filter for magix headers
* @param payloadFilters filter for payload fields
* @param userFilter filter for user name
* @param pageSize if defined, defines the maximum number of messages per response message. If not defined, uses history provider default.
*/
@Serializable
@SerialName("history.request")
public data class HistoryRequestPayload(
val magixFilter: MagixMessageFilter? = null,
val payloadFilters: List<MagixPayloadFilter> = emptyList(),
val userFilter: MagixUsernameFilter? = null,
val pageSize: Int? = null
) : MagixHistoryPayload()
/**
* A response to a [HistoryRequestPayload]. Contains a list of messages.
*
* @param messages the list of messages.
* @param page the index of current page for multiple page messages. Page indexing starts with 0.
* @param lastPage true if this page is the last.
*/
@Serializable
@SerialName("history.response")
public data class HistoryResponsePayload(
val messages: List<MagixMessage>,
val page: Int = 0,
val lastPage: Boolean = true
) : MagixHistoryPayload()

View File

@ -0,0 +1,82 @@
package space.kscience.magix.storage
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.serialization.json.JsonObject
import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.api.MagixMessage
import space.kscience.magix.api.broadcast
import space.kscience.magix.api.subscribe
internal fun generateId(request: MagixMessage): String = if (request.id != null) {
"${request.id}.response"
} else {
"history[${request.payload.hashCode().toString(16)}"
}
/**
* Launch responding history messages on this [MagixEndpoint]. The process does not store messages, only responds to history requests.
*
* @param scope the [CoroutineScope] in which the responding process runs.
* @param history the history database.
* @param targetFilter filters the request messages by target if defined.
* @param pageSize maximum messages per page in the response. The default is 100.
* @param user user block for outgoing messages if defined.
* @param origin tag for outgoing messages if defined.
*/
public fun MagixEndpoint.launchHistory(
scope: CoroutineScope,
history: MagixHistory,
targetFilter: Collection<String>? = null,
pageSize: Int = 100,
user: JsonObject? = null,
origin: String = MagixHistory.HISTORY_PAYLOAD_FORMAT,
): Job = subscribe(MagixHistory.magixFormat, targetFilter = targetFilter).onEach { (request, payload) ->
fun send(chunk: List<MagixMessage>, pageNumber: Int, end: Boolean) {
scope.launch {
val sendPayload = HistoryResponsePayload(
chunk,
pageNumber
)
broadcast(
format = MagixHistory.magixFormat,
payload = sendPayload,
target = request.origin,
id = generateId(request),
parentId = request.id,
user = user,
origin = origin,
)
}
}
if (payload is HistoryRequestPayload) {
val realPageSize = payload.pageSize ?: pageSize
history.findMessages(payload.magixFilter, payload.payloadFilters, payload.userFilter) { sequence ->
// start from -1 because increment always happens first
var pageNumber = -1
//remember the last chunk to determine which is last
var chunk: List<MagixMessage>? = null
sequence.chunked(realPageSize).forEach {
//If the last chunk was not final, send it
chunk?.let { chunk ->
send(chunk, pageNumber, false)
}
pageNumber++
// update last chunk
chunk = it
}
// send the final chunk
chunk?.let {
send(it, pageNumber, true)
}
}
}
}.launchIn(scope)