Make it buildable again
This commit is contained in:
parent
cf70339a9f
commit
f9e20f8766
@ -5,7 +5,9 @@ import kotlinx.coroutines.CoroutineScope
|
|||||||
import kotlinx.coroutines.flow.launchIn
|
import kotlinx.coroutines.flow.launchIn
|
||||||
import kotlinx.coroutines.flow.onEach
|
import kotlinx.coroutines.flow.onEach
|
||||||
import kotlinx.serialization.encodeToString
|
import kotlinx.serialization.encodeToString
|
||||||
import kotlinx.serialization.json.*
|
import kotlinx.serialization.json.JsonObject
|
||||||
|
import kotlinx.serialization.json.JsonPrimitive
|
||||||
|
import kotlinx.serialization.json.jsonPrimitive
|
||||||
import space.kscience.magix.api.MagixEndpoint
|
import space.kscience.magix.api.MagixEndpoint
|
||||||
import space.kscience.magix.api.MagixEndpoint.Companion.magixJson
|
import space.kscience.magix.api.MagixEndpoint.Companion.magixJson
|
||||||
import space.kscience.magix.api.MagixMessage
|
import space.kscience.magix.api.MagixMessage
|
||||||
@ -13,6 +15,7 @@ import space.kscience.magix.api.MagixMessageFilter
|
|||||||
import space.kscience.magix.storage.MagixHistory
|
import space.kscience.magix.storage.MagixHistory
|
||||||
import space.kscience.magix.storage.MagixPayloadFilter
|
import space.kscience.magix.storage.MagixPayloadFilter
|
||||||
import space.kscience.magix.storage.MagixUsernameFilter
|
import space.kscience.magix.storage.MagixUsernameFilter
|
||||||
|
import space.kscience.magix.storage.test
|
||||||
import java.nio.file.Path
|
import java.nio.file.Path
|
||||||
import kotlin.sequences.Sequence
|
import kotlin.sequences.Sequence
|
||||||
|
|
||||||
@ -33,7 +36,7 @@ public class XodusMagixStorage(
|
|||||||
setProperty(MagixMessage::sourceEndpoint.name, message.sourceEndpoint)
|
setProperty(MagixMessage::sourceEndpoint.name, message.sourceEndpoint)
|
||||||
setProperty(MagixMessage::format.name, message.format)
|
setProperty(MagixMessage::format.name, message.format)
|
||||||
|
|
||||||
setBlobString(MagixMessage::payload.name, MagixEndpoint.magixJson.encodeToString(message.payload))
|
setBlobString(MagixMessage::payload.name, magixJson.encodeToString(message.payload))
|
||||||
|
|
||||||
message.targetEndpoint?.let {
|
message.targetEndpoint?.let {
|
||||||
setProperty(MagixMessage::targetEndpoint.name, it)
|
setProperty(MagixMessage::targetEndpoint.name, it)
|
||||||
@ -104,7 +107,7 @@ public class XodusMagixStorage(
|
|||||||
|
|
||||||
override suspend fun findMessages(
|
override suspend fun findMessages(
|
||||||
magixFilter: MagixMessageFilter?,
|
magixFilter: MagixMessageFilter?,
|
||||||
payloadFilters: List<MagixPayloadFilter>,
|
payloadFilter: MagixPayloadFilter?,
|
||||||
userFilter: MagixUsernameFilter?,
|
userFilter: MagixUsernameFilter?,
|
||||||
callback: (Sequence<MagixMessage>) -> Unit,
|
callback: (Sequence<MagixMessage>) -> Unit,
|
||||||
): Unit = store.executeInReadonlyTransaction { transaction ->
|
): Unit = store.executeInReadonlyTransaction { transaction ->
|
||||||
@ -139,17 +142,24 @@ public class XodusMagixStorage(
|
|||||||
res
|
res
|
||||||
} ?: all
|
} ?: all
|
||||||
|
|
||||||
val filteredByUser: EntityIterable = userFilter?.let { userFilter->
|
val filteredByUser: EntityIterable = userFilter?.let { userFilter ->
|
||||||
filteredByMagix.intersect(
|
filteredByMagix.intersect(
|
||||||
transaction.find(MAGIC_MESSAGE_ENTITY_TYPE, MagixMessage::user.name, userFilter.userName)
|
transaction.find(MAGIC_MESSAGE_ENTITY_TYPE, MagixMessage::user.name, userFilter.userName)
|
||||||
)
|
)
|
||||||
} ?: filteredByMagix
|
} ?: filteredByMagix
|
||||||
|
|
||||||
|
|
||||||
filteredByUser.se
|
val sequence = filteredByUser.asSequence().map { it.parseMagixMessage() }
|
||||||
|
|
||||||
|
val filteredSequence = if (payloadFilter == null) {
|
||||||
|
sequence
|
||||||
|
} else {
|
||||||
|
sequence.filter {
|
||||||
|
payloadFilter.test(it.payload)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
block(sequence)
|
callback(filteredSequence)
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun close() {
|
override fun close() {
|
||||||
|
@ -14,10 +14,6 @@ public sealed class MagixPayloadFilter {
|
|||||||
@SerialName("eq")
|
@SerialName("eq")
|
||||||
public class Equals(public val path: String, public val value: JsonElement) : MagixPayloadFilter()
|
public class Equals(public val path: String, public val value: JsonElement) : MagixPayloadFilter()
|
||||||
|
|
||||||
@SerialName("not")
|
|
||||||
public class Not(public val argument: MagixPayloadFilter) : MagixPayloadFilter()
|
|
||||||
|
|
||||||
|
|
||||||
// @SerialName("like")
|
// @SerialName("like")
|
||||||
// public class Like(public val path: String, public val value: String) : MagixPayloadFilter()
|
// public class Like(public val path: String, public val value: String) : MagixPayloadFilter()
|
||||||
|
|
||||||
@ -31,16 +27,20 @@ public sealed class MagixPayloadFilter {
|
|||||||
public val from: LocalDateTime,
|
public val from: LocalDateTime,
|
||||||
public val to: LocalDateTime,
|
public val to: LocalDateTime,
|
||||||
) : MagixPayloadFilter()
|
) : MagixPayloadFilter()
|
||||||
|
|
||||||
|
|
||||||
|
@SerialName("not")
|
||||||
|
public class Not(public val argument: MagixPayloadFilter) : MagixPayloadFilter()
|
||||||
|
|
||||||
|
@SerialName("and")
|
||||||
|
public class And(public val left: MagixPayloadFilter, public val right: MagixPayloadFilter) : MagixPayloadFilter()
|
||||||
|
|
||||||
|
@SerialName("or")
|
||||||
|
public class Or(public val left: MagixPayloadFilter, public val right: MagixPayloadFilter) : MagixPayloadFilter()
|
||||||
}
|
}
|
||||||
|
|
||||||
public fun MagixPayloadFilter.test(element: JsonElement): Boolean {
|
public fun MagixPayloadFilter.test(element: JsonElement): Boolean {
|
||||||
TODO()
|
TODO()
|
||||||
// when (this) {
|
|
||||||
// is MagixPayloadFilter.DateTimeInRange -> TODO()
|
|
||||||
// is MagixPayloadFilter.Equals -> TODO()
|
|
||||||
// is MagixPayloadFilter.Not -> !(argument.test(element))
|
|
||||||
// is MagixPayloadFilter.NumberInRange -> element.jsonObject[path]
|
|
||||||
// }
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public fun Sequence<JsonElement>.filter(magixPayloadFilter: MagixPayloadFilter): Sequence<JsonElement> = filter {
|
public fun Sequence<JsonElement>.filter(magixPayloadFilter: MagixPayloadFilter): Sequence<JsonElement> = filter {
|
||||||
@ -63,12 +63,12 @@ public interface MagixHistory {
|
|||||||
* closes all transactions after use.
|
* closes all transactions after use.
|
||||||
*
|
*
|
||||||
* @param magixFilter magix header filter.
|
* @param magixFilter magix header filter.
|
||||||
* @param payloadFilters filters for payload fields.
|
* @param payloadFilter filter for payload fields.
|
||||||
* @param userFilter filters user names ("user.name").
|
* @param userFilter filters user names ("user.name").
|
||||||
*/
|
*/
|
||||||
public suspend fun findMessages(
|
public suspend fun findMessages(
|
||||||
magixFilter: MagixMessageFilter? = null,
|
magixFilter: MagixMessageFilter? = null,
|
||||||
payloadFilters: List<MagixPayloadFilter> = emptyList(),
|
payloadFilter: MagixPayloadFilter? = null,
|
||||||
userFilter: MagixUsernameFilter? = null,
|
userFilter: MagixUsernameFilter? = null,
|
||||||
callback: (Sequence<MagixMessage>) -> Unit,
|
callback: (Sequence<MagixMessage>) -> Unit,
|
||||||
)
|
)
|
||||||
|
@ -23,7 +23,7 @@ public sealed class MagixHistoryPayload
|
|||||||
@SerialName("history.request")
|
@SerialName("history.request")
|
||||||
public data class HistoryRequestPayload(
|
public data class HistoryRequestPayload(
|
||||||
val magixFilter: MagixMessageFilter? = null,
|
val magixFilter: MagixMessageFilter? = null,
|
||||||
val payloadFilters: List<MagixPayloadFilter> = emptyList(),
|
val payloadFilter: MagixPayloadFilter? = null,
|
||||||
val userFilter: MagixUsernameFilter? = null,
|
val userFilter: MagixUsernameFilter? = null,
|
||||||
val pageSize: Int? = null
|
val pageSize: Int? = null
|
||||||
) : MagixHistoryPayload()
|
) : MagixHistoryPayload()
|
||||||
|
@ -57,7 +57,7 @@ public fun MagixEndpoint.launchHistory(
|
|||||||
|
|
||||||
if (payload is HistoryRequestPayload) {
|
if (payload is HistoryRequestPayload) {
|
||||||
val realPageSize = payload.pageSize ?: pageSize
|
val realPageSize = payload.pageSize ?: pageSize
|
||||||
history.findMessages(payload.magixFilter, payload.payloadFilters, payload.userFilter) { sequence ->
|
history.findMessages(payload.magixFilter, payload.payloadFilter, payload.userFilter) { sequence ->
|
||||||
// start from -1 because increment always happens first
|
// start from -1 because increment always happens first
|
||||||
var pageNumber = -1
|
var pageNumber = -1
|
||||||
|
|
||||||
|
@ -10,5 +10,5 @@ description = """
|
|||||||
dependencies {
|
dependencies {
|
||||||
api(projects.magix.magixApi)
|
api(projects.magix.magixApi)
|
||||||
api("org.slf4j:slf4j-api:2.0.6")
|
api("org.slf4j:slf4j-api:2.0.6")
|
||||||
implementation("org.zeromq:jeromq:0.5.2")
|
api("org.zeromq:jeromq:0.5.2")
|
||||||
}
|
}
|
||||||
|
@ -21,7 +21,6 @@ public class ZmqMagixEndpoint(
|
|||||||
private val pullPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PULL_PORT,
|
private val pullPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PULL_PORT,
|
||||||
private val coroutineContext: CoroutineContext = Dispatchers.IO,
|
private val coroutineContext: CoroutineContext = Dispatchers.IO,
|
||||||
private val zmqContext: ZContext = ZContext()
|
private val zmqContext: ZContext = ZContext()
|
||||||
|
|
||||||
) : MagixEndpoint, AutoCloseable {
|
) : MagixEndpoint, AutoCloseable {
|
||||||
|
|
||||||
override fun subscribe(filter: MagixMessageFilter): Flow<MagixMessage> {
|
override fun subscribe(filter: MagixMessageFilter): Flow<MagixMessage> {
|
||||||
|
Loading…
Reference in New Issue
Block a user