diff --git a/controls-magix-client/src/commonMain/kotlin/space/kscience/controls/client/controlsMagix.kt b/controls-magix-client/src/commonMain/kotlin/space/kscience/controls/client/controlsMagix.kt index af223e3..c129c79 100644 --- a/controls-magix-client/src/commonMain/kotlin/space/kscience/controls/client/controlsMagix.kt +++ b/controls-magix-client/src/commonMain/kotlin/space/kscience/controls/client/controlsMagix.kt @@ -16,7 +16,7 @@ import space.kscience.magix.api.* internal val controlsMagixFormat: MagixFormat = MagixFormat( DeviceMessage.serializer(), - setOf("controls-kt", "dataforge") + setOf("controls-kt") ) /** diff --git a/magix/magix-storage/magix-storage-xodus/src/main/kotlin/space/kscience/magix/storage/xodus/XodusMagixStorage.kt b/magix/magix-storage/magix-storage-xodus/src/main/kotlin/space/kscience/magix/storage/xodus/XodusMagixStorage.kt index 9f28b9e..bfd7508 100644 --- a/magix/magix-storage/magix-storage-xodus/src/main/kotlin/space/kscience/magix/storage/xodus/XodusMagixStorage.kt +++ b/magix/magix-storage/magix-storage-xodus/src/main/kotlin/space/kscience/magix/storage/xodus/XodusMagixStorage.kt @@ -12,10 +12,7 @@ import space.kscience.magix.api.MagixEndpoint import space.kscience.magix.api.MagixEndpoint.Companion.magixJson import space.kscience.magix.api.MagixMessage import space.kscience.magix.api.MagixMessageFilter -import space.kscience.magix.storage.MagixHistory -import space.kscience.magix.storage.MagixPayloadFilter -import space.kscience.magix.storage.MagixUsernameFilter -import space.kscience.magix.storage.test +import space.kscience.magix.storage.* import java.nio.file.Path import kotlin.sequences.Sequence @@ -149,16 +146,19 @@ public class XodusMagixStorage( scope: CoroutineScope, private val store: PersistentEntityStore, endpoint: MagixEndpoint, - filter: MagixMessageFilter = MagixMessageFilter.ALL, + endpointName: String? = null, + subscriptionFilter: MagixMessageFilter = MagixMessageFilter.ALL, ) : AutoCloseable { public val history: XodusMagixHistory = XodusMagixHistory(store) //TODO consider message buffering - internal val subscriptionJob = endpoint.subscribe(filter).onEach { message -> + private val subscriptionJob = endpoint.subscribe(subscriptionFilter).onEach { message -> history.sendMessage(message) }.launchIn(scope) + private val broadcastJob = endpoint.launchHistory(scope, history, endpointName = endpointName) + /** * Access all messages in a given format @@ -204,14 +204,16 @@ public class XodusMagixStorage( public fun MagixEndpoint.storeInXodus( scope: CoroutineScope, xodusStore: PersistentEntityStore, + endpointName: String? = null, filter: MagixMessageFilter = MagixMessageFilter(), -): XodusMagixStorage = XodusMagixStorage(scope, xodusStore, this, filter) +): XodusMagixStorage = XodusMagixStorage(scope, xodusStore, this, endpointName, filter) public fun MagixEndpoint.storeInXodus( scope: CoroutineScope, path: Path, + endpointName: String? = null, filter: MagixMessageFilter = MagixMessageFilter(), ): XodusMagixStorage { val store = PersistentEntityStores.newInstance(path.toFile()) - return XodusMagixStorage(scope, store, this, filter) + return XodusMagixStorage(scope, store, this, endpointName, filter) } \ No newline at end of file diff --git a/magix/magix-storage/src/commonMain/kotlin/space/kscience/magix/storage/MagixHistory.kt b/magix/magix-storage/src/commonMain/kotlin/space/kscience/magix/storage/MagixHistory.kt index e991fa9..3059481 100644 --- a/magix/magix-storage/src/commonMain/kotlin/space/kscience/magix/storage/MagixHistory.kt +++ b/magix/magix-storage/src/commonMain/kotlin/space/kscience/magix/storage/MagixHistory.kt @@ -4,6 +4,7 @@ import kotlinx.datetime.LocalDateTime import kotlinx.serialization.SerialName import kotlinx.serialization.Serializable import kotlinx.serialization.json.JsonElement +import kotlinx.serialization.json.jsonObject import space.kscience.magix.api.MagixFormat import space.kscience.magix.api.MagixMessage import space.kscience.magix.api.MagixMessageFilter @@ -39,8 +40,27 @@ public sealed class MagixPayloadFilter { public class Or(public val left: MagixPayloadFilter, public val right: MagixPayloadFilter) : MagixPayloadFilter() } -public fun MagixPayloadFilter.test(element: JsonElement): Boolean { - TODO() +private fun JsonElement.takeElement(path: String): JsonElement? = if (path.isEmpty()) { + this +} else { + val separatorIndex = path.indexOf(".") + if (separatorIndex == -1) { + jsonObject[path] + } else { + val firstSegment = path.substring(0, separatorIndex) + val remaining = path.substring(separatorIndex + 1, path.length) + jsonObject[firstSegment]?.takeElement(remaining) + } + +} + +public fun MagixPayloadFilter.test(element: JsonElement): Boolean = when (this) { + is MagixPayloadFilter.Equals -> element.takeElement(path) == value + is MagixPayloadFilter.DateTimeInRange -> TODO() + is MagixPayloadFilter.NumberInRange -> TODO() + is MagixPayloadFilter.Not -> !argument.test(element) + is MagixPayloadFilter.And -> left.test(element) && right.test(element) + is MagixPayloadFilter.Or -> left.test(element) || right.test(element) } public fun Sequence.filter(magixPayloadFilter: MagixPayloadFilter): Sequence = filter { diff --git a/magix/magix-storage/src/commonMain/kotlin/space/kscience/magix/storage/historyEndpoint.kt b/magix/magix-storage/src/commonMain/kotlin/space/kscience/magix/storage/historyEndpoint.kt index 4009f0b..67b2cd3 100644 --- a/magix/magix-storage/src/commonMain/kotlin/space/kscience/magix/storage/historyEndpoint.kt +++ b/magix/magix-storage/src/commonMain/kotlin/space/kscience/magix/storage/historyEndpoint.kt @@ -5,7 +5,8 @@ import kotlinx.coroutines.Job import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch -import kotlinx.serialization.json.JsonObject +import kotlinx.serialization.json.JsonElement +import kotlinx.serialization.json.JsonPrimitive import space.kscience.magix.api.MagixEndpoint import space.kscience.magix.api.MagixMessage import space.kscience.magix.api.broadcast @@ -22,7 +23,7 @@ internal fun generateId(request: MagixMessage): String = if (request.id != null) * * @param scope the [CoroutineScope] in which the responding process runs. * @param history the history database. - * @param targetFilter filters the request messages by target if defined. + * @param endpointName the name of this endpoint that is used as a filter. * @param pageSize maximum messages per page in the response. The default is 100. * @param user user block for outgoing messages if defined. * @param origin tag for outgoing messages if defined. @@ -30,17 +31,21 @@ internal fun generateId(request: MagixMessage): String = if (request.id != null) public fun MagixEndpoint.launchHistory( scope: CoroutineScope, history: MagixHistory, - targetFilter: Collection? = null, + endpointName: String? = null, pageSize: Int = 100, - user: JsonObject? = null, + user: JsonElement? = endpointName?.let { JsonPrimitive(endpointName) }, origin: String = MagixHistory.HISTORY_PAYLOAD_FORMAT, -): Job = subscribe(MagixHistory.magixFormat, targetFilter = targetFilter).onEach { (request, payload) -> +): Job = subscribe( + MagixHistory.magixFormat, + targetFilter = endpointName?.let { setOf(it) } +).onEach { (request, payload) -> fun send(chunk: List, pageNumber: Int, end: Boolean) { scope.launch { val sendPayload = HistoryResponsePayload( chunk, - pageNumber + pageNumber, + lastPage = end ) broadcast( format = MagixHistory.magixFormat,