add filter implementation for Json
This commit is contained in:
parent
d3d8413837
commit
dc2d0094fc
@ -16,7 +16,7 @@ import space.kscience.magix.api.*
|
|||||||
|
|
||||||
internal val controlsMagixFormat: MagixFormat<DeviceMessage> = MagixFormat(
|
internal val controlsMagixFormat: MagixFormat<DeviceMessage> = MagixFormat(
|
||||||
DeviceMessage.serializer(),
|
DeviceMessage.serializer(),
|
||||||
setOf("controls-kt", "dataforge")
|
setOf("controls-kt")
|
||||||
)
|
)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -12,10 +12,7 @@ import space.kscience.magix.api.MagixEndpoint
|
|||||||
import space.kscience.magix.api.MagixEndpoint.Companion.magixJson
|
import space.kscience.magix.api.MagixEndpoint.Companion.magixJson
|
||||||
import space.kscience.magix.api.MagixMessage
|
import space.kscience.magix.api.MagixMessage
|
||||||
import space.kscience.magix.api.MagixMessageFilter
|
import space.kscience.magix.api.MagixMessageFilter
|
||||||
import space.kscience.magix.storage.MagixHistory
|
import space.kscience.magix.storage.*
|
||||||
import space.kscience.magix.storage.MagixPayloadFilter
|
|
||||||
import space.kscience.magix.storage.MagixUsernameFilter
|
|
||||||
import space.kscience.magix.storage.test
|
|
||||||
import java.nio.file.Path
|
import java.nio.file.Path
|
||||||
import kotlin.sequences.Sequence
|
import kotlin.sequences.Sequence
|
||||||
|
|
||||||
@ -149,16 +146,19 @@ public class XodusMagixStorage(
|
|||||||
scope: CoroutineScope,
|
scope: CoroutineScope,
|
||||||
private val store: PersistentEntityStore,
|
private val store: PersistentEntityStore,
|
||||||
endpoint: MagixEndpoint,
|
endpoint: MagixEndpoint,
|
||||||
filter: MagixMessageFilter = MagixMessageFilter.ALL,
|
endpointName: String? = null,
|
||||||
|
subscriptionFilter: MagixMessageFilter = MagixMessageFilter.ALL,
|
||||||
) : AutoCloseable {
|
) : AutoCloseable {
|
||||||
|
|
||||||
public val history: XodusMagixHistory = XodusMagixHistory(store)
|
public val history: XodusMagixHistory = XodusMagixHistory(store)
|
||||||
|
|
||||||
//TODO consider message buffering
|
//TODO consider message buffering
|
||||||
internal val subscriptionJob = endpoint.subscribe(filter).onEach { message ->
|
private val subscriptionJob = endpoint.subscribe(subscriptionFilter).onEach { message ->
|
||||||
history.sendMessage(message)
|
history.sendMessage(message)
|
||||||
}.launchIn(scope)
|
}.launchIn(scope)
|
||||||
|
|
||||||
|
private val broadcastJob = endpoint.launchHistory(scope, history, endpointName = endpointName)
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Access all messages in a given format
|
* Access all messages in a given format
|
||||||
@ -204,14 +204,16 @@ public class XodusMagixStorage(
|
|||||||
public fun MagixEndpoint.storeInXodus(
|
public fun MagixEndpoint.storeInXodus(
|
||||||
scope: CoroutineScope,
|
scope: CoroutineScope,
|
||||||
xodusStore: PersistentEntityStore,
|
xodusStore: PersistentEntityStore,
|
||||||
|
endpointName: String? = null,
|
||||||
filter: MagixMessageFilter = MagixMessageFilter(),
|
filter: MagixMessageFilter = MagixMessageFilter(),
|
||||||
): XodusMagixStorage = XodusMagixStorage(scope, xodusStore, this, filter)
|
): XodusMagixStorage = XodusMagixStorage(scope, xodusStore, this, endpointName, filter)
|
||||||
|
|
||||||
public fun MagixEndpoint.storeInXodus(
|
public fun MagixEndpoint.storeInXodus(
|
||||||
scope: CoroutineScope,
|
scope: CoroutineScope,
|
||||||
path: Path,
|
path: Path,
|
||||||
|
endpointName: String? = null,
|
||||||
filter: MagixMessageFilter = MagixMessageFilter(),
|
filter: MagixMessageFilter = MagixMessageFilter(),
|
||||||
): XodusMagixStorage {
|
): XodusMagixStorage {
|
||||||
val store = PersistentEntityStores.newInstance(path.toFile())
|
val store = PersistentEntityStores.newInstance(path.toFile())
|
||||||
return XodusMagixStorage(scope, store, this, filter)
|
return XodusMagixStorage(scope, store, this, endpointName, filter)
|
||||||
}
|
}
|
@ -4,6 +4,7 @@ import kotlinx.datetime.LocalDateTime
|
|||||||
import kotlinx.serialization.SerialName
|
import kotlinx.serialization.SerialName
|
||||||
import kotlinx.serialization.Serializable
|
import kotlinx.serialization.Serializable
|
||||||
import kotlinx.serialization.json.JsonElement
|
import kotlinx.serialization.json.JsonElement
|
||||||
|
import kotlinx.serialization.json.jsonObject
|
||||||
import space.kscience.magix.api.MagixFormat
|
import space.kscience.magix.api.MagixFormat
|
||||||
import space.kscience.magix.api.MagixMessage
|
import space.kscience.magix.api.MagixMessage
|
||||||
import space.kscience.magix.api.MagixMessageFilter
|
import space.kscience.magix.api.MagixMessageFilter
|
||||||
@ -39,8 +40,27 @@ public sealed class MagixPayloadFilter {
|
|||||||
public class Or(public val left: MagixPayloadFilter, public val right: MagixPayloadFilter) : MagixPayloadFilter()
|
public class Or(public val left: MagixPayloadFilter, public val right: MagixPayloadFilter) : MagixPayloadFilter()
|
||||||
}
|
}
|
||||||
|
|
||||||
public fun MagixPayloadFilter.test(element: JsonElement): Boolean {
|
private fun JsonElement.takeElement(path: String): JsonElement? = if (path.isEmpty()) {
|
||||||
TODO()
|
this
|
||||||
|
} else {
|
||||||
|
val separatorIndex = path.indexOf(".")
|
||||||
|
if (separatorIndex == -1) {
|
||||||
|
jsonObject[path]
|
||||||
|
} else {
|
||||||
|
val firstSegment = path.substring(0, separatorIndex)
|
||||||
|
val remaining = path.substring(separatorIndex + 1, path.length)
|
||||||
|
jsonObject[firstSegment]?.takeElement(remaining)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public fun MagixPayloadFilter.test(element: JsonElement): Boolean = when (this) {
|
||||||
|
is MagixPayloadFilter.Equals -> element.takeElement(path) == value
|
||||||
|
is MagixPayloadFilter.DateTimeInRange -> TODO()
|
||||||
|
is MagixPayloadFilter.NumberInRange -> TODO()
|
||||||
|
is MagixPayloadFilter.Not -> !argument.test(element)
|
||||||
|
is MagixPayloadFilter.And -> left.test(element) && right.test(element)
|
||||||
|
is MagixPayloadFilter.Or -> left.test(element) || right.test(element)
|
||||||
}
|
}
|
||||||
|
|
||||||
public fun Sequence<JsonElement>.filter(magixPayloadFilter: MagixPayloadFilter): Sequence<JsonElement> = filter {
|
public fun Sequence<JsonElement>.filter(magixPayloadFilter: MagixPayloadFilter): Sequence<JsonElement> = filter {
|
||||||
|
@ -5,7 +5,8 @@ import kotlinx.coroutines.Job
|
|||||||
import kotlinx.coroutines.flow.launchIn
|
import kotlinx.coroutines.flow.launchIn
|
||||||
import kotlinx.coroutines.flow.onEach
|
import kotlinx.coroutines.flow.onEach
|
||||||
import kotlinx.coroutines.launch
|
import kotlinx.coroutines.launch
|
||||||
import kotlinx.serialization.json.JsonObject
|
import kotlinx.serialization.json.JsonElement
|
||||||
|
import kotlinx.serialization.json.JsonPrimitive
|
||||||
import space.kscience.magix.api.MagixEndpoint
|
import space.kscience.magix.api.MagixEndpoint
|
||||||
import space.kscience.magix.api.MagixMessage
|
import space.kscience.magix.api.MagixMessage
|
||||||
import space.kscience.magix.api.broadcast
|
import space.kscience.magix.api.broadcast
|
||||||
@ -22,7 +23,7 @@ internal fun generateId(request: MagixMessage): String = if (request.id != null)
|
|||||||
*
|
*
|
||||||
* @param scope the [CoroutineScope] in which the responding process runs.
|
* @param scope the [CoroutineScope] in which the responding process runs.
|
||||||
* @param history the history database.
|
* @param history the history database.
|
||||||
* @param targetFilter filters the request messages by target if defined.
|
* @param endpointName the name of this endpoint that is used as a filter.
|
||||||
* @param pageSize maximum messages per page in the response. The default is 100.
|
* @param pageSize maximum messages per page in the response. The default is 100.
|
||||||
* @param user user block for outgoing messages if defined.
|
* @param user user block for outgoing messages if defined.
|
||||||
* @param origin tag for outgoing messages if defined.
|
* @param origin tag for outgoing messages if defined.
|
||||||
@ -30,17 +31,21 @@ internal fun generateId(request: MagixMessage): String = if (request.id != null)
|
|||||||
public fun MagixEndpoint.launchHistory(
|
public fun MagixEndpoint.launchHistory(
|
||||||
scope: CoroutineScope,
|
scope: CoroutineScope,
|
||||||
history: MagixHistory,
|
history: MagixHistory,
|
||||||
targetFilter: Collection<String>? = null,
|
endpointName: String? = null,
|
||||||
pageSize: Int = 100,
|
pageSize: Int = 100,
|
||||||
user: JsonObject? = null,
|
user: JsonElement? = endpointName?.let { JsonPrimitive(endpointName) },
|
||||||
origin: String = MagixHistory.HISTORY_PAYLOAD_FORMAT,
|
origin: String = MagixHistory.HISTORY_PAYLOAD_FORMAT,
|
||||||
): Job = subscribe(MagixHistory.magixFormat, targetFilter = targetFilter).onEach { (request, payload) ->
|
): Job = subscribe(
|
||||||
|
MagixHistory.magixFormat,
|
||||||
|
targetFilter = endpointName?.let { setOf(it) }
|
||||||
|
).onEach { (request, payload) ->
|
||||||
|
|
||||||
fun send(chunk: List<MagixMessage>, pageNumber: Int, end: Boolean) {
|
fun send(chunk: List<MagixMessage>, pageNumber: Int, end: Boolean) {
|
||||||
scope.launch {
|
scope.launch {
|
||||||
val sendPayload = HistoryResponsePayload(
|
val sendPayload = HistoryResponsePayload(
|
||||||
chunk,
|
chunk,
|
||||||
pageNumber
|
pageNumber,
|
||||||
|
lastPage = end
|
||||||
)
|
)
|
||||||
broadcast(
|
broadcast(
|
||||||
format = MagixHistory.magixFormat,
|
format = MagixHistory.magixFormat,
|
||||||
|
Loading…
Reference in New Issue
Block a user