Add abstraction for writeable magix history
This commit is contained in:
parent
dc2d0094fc
commit
a172da0a3d
@ -1,7 +1,7 @@
|
||||
package space.kscience.magix.api
|
||||
|
||||
import kotlinx.serialization.Serializable
|
||||
import kotlinx.serialization.json.JsonElement
|
||||
import kotlinx.serialization.json.*
|
||||
|
||||
|
||||
/*
|
||||
@ -33,3 +33,14 @@ public data class MagixMessage(
|
||||
val parentId: String? = null,
|
||||
val user: JsonElement? = null,
|
||||
)
|
||||
|
||||
/**
|
||||
* The default accessor for username. If `user` is an object, take it's "name" field.
|
||||
* If it is primitive, take its content. Return "@error" if it is an array.
|
||||
*/
|
||||
public val MagixMessage.userName: String? get() = when(user){
|
||||
null, JsonNull -> null
|
||||
is JsonObject -> user.jsonObject["name"]?.jsonPrimitive?.content
|
||||
is JsonPrimitive -> user.content
|
||||
else -> "@error"
|
||||
}
|
@ -15,6 +15,7 @@ kscience {
|
||||
useSerialization {
|
||||
json()
|
||||
}
|
||||
useCoroutines()
|
||||
dependencies {
|
||||
api(projects.magix.magixApi)
|
||||
api(spclibs.kotlinx.datetime)
|
||||
|
@ -6,12 +6,11 @@ import kotlinx.coroutines.flow.launchIn
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import kotlinx.serialization.encodeToString
|
||||
import kotlinx.serialization.json.JsonObject
|
||||
import kotlinx.serialization.json.JsonPrimitive
|
||||
import kotlinx.serialization.json.jsonPrimitive
|
||||
import space.kscience.magix.api.MagixEndpoint
|
||||
import space.kscience.magix.api.MagixEndpoint.Companion.magixJson
|
||||
import space.kscience.magix.api.MagixMessage
|
||||
import space.kscience.magix.api.MagixMessageFilter
|
||||
import space.kscience.magix.api.userName
|
||||
import space.kscience.magix.storage.*
|
||||
import java.nio.file.Path
|
||||
import kotlin.sequences.Sequence
|
||||
@ -31,7 +30,7 @@ private fun Entity.parseMagixMessage(): MagixMessage = MagixMessage(
|
||||
},
|
||||
)
|
||||
|
||||
public class XodusMagixHistory(private val store: PersistentEntityStore) : MagixHistory {
|
||||
public class XodusMagixHistory(private val store: PersistentEntityStore) : WriteableMagixHistory {
|
||||
|
||||
public fun writeMessage(storeTransaction: StoreTransaction, message: MagixMessage) {
|
||||
storeTransaction.newEntity(XodusMagixStorage.MAGIC_MESSAGE_ENTITY_TYPE).apply {
|
||||
@ -49,20 +48,13 @@ public class XodusMagixHistory(private val store: PersistentEntityStore) : Magix
|
||||
message.parentId?.let {
|
||||
setProperty(MagixMessage::parentId.name, it)
|
||||
}
|
||||
message.user?.let {
|
||||
setProperty(
|
||||
MagixMessage::user.name,
|
||||
when (it) {
|
||||
is JsonObject -> it["name"]?.jsonPrimitive?.content ?: "@error"
|
||||
is JsonPrimitive -> it.content
|
||||
else -> "@error"
|
||||
}
|
||||
)
|
||||
message.userName?.let {
|
||||
setProperty(MagixMessage::user.name, it)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public fun sendMessage(message: MagixMessage) {
|
||||
override suspend fun send(message: MagixMessage) {
|
||||
store.executeInTransaction { transaction ->
|
||||
writeMessage(transaction, message)
|
||||
}
|
||||
@ -139,6 +131,7 @@ public class XodusMagixHistory(private val store: PersistentEntityStore) : Magix
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Attach a Xodus storage process to the given endpoint.
|
||||
*/
|
||||
@ -154,7 +147,7 @@ public class XodusMagixStorage(
|
||||
|
||||
//TODO consider message buffering
|
||||
private val subscriptionJob = endpoint.subscribe(subscriptionFilter).onEach { message ->
|
||||
history.sendMessage(message)
|
||||
history.send(message)
|
||||
}.launchIn(scope)
|
||||
|
||||
private val broadcastJob = endpoint.launchHistory(scope, history, endpointName = endpointName)
|
||||
|
@ -4,7 +4,9 @@ import kotlinx.datetime.LocalDateTime
|
||||
import kotlinx.serialization.SerialName
|
||||
import kotlinx.serialization.Serializable
|
||||
import kotlinx.serialization.json.JsonElement
|
||||
import kotlinx.serialization.json.doubleOrNull
|
||||
import kotlinx.serialization.json.jsonObject
|
||||
import kotlinx.serialization.json.jsonPrimitive
|
||||
import space.kscience.magix.api.MagixFormat
|
||||
import space.kscience.magix.api.MagixMessage
|
||||
import space.kscience.magix.api.MagixMessageFilter
|
||||
@ -56,8 +58,17 @@ private fun JsonElement.takeElement(path: String): JsonElement? = if (path.isEmp
|
||||
|
||||
public fun MagixPayloadFilter.test(element: JsonElement): Boolean = when (this) {
|
||||
is MagixPayloadFilter.Equals -> element.takeElement(path) == value
|
||||
is MagixPayloadFilter.DateTimeInRange -> TODO()
|
||||
is MagixPayloadFilter.NumberInRange -> TODO()
|
||||
is MagixPayloadFilter.DateTimeInRange -> {
|
||||
element.takeElement(path)?.jsonPrimitive?.content?.let {
|
||||
LocalDateTime.parse(it) in from..to
|
||||
} ?: false
|
||||
}
|
||||
is MagixPayloadFilter.NumberInRange -> {
|
||||
element.takeElement(path)?.jsonPrimitive?.doubleOrNull?.let {
|
||||
it in (from.toDouble()..to.toDouble())
|
||||
} ?: false
|
||||
}
|
||||
|
||||
is MagixPayloadFilter.Not -> !argument.test(element)
|
||||
is MagixPayloadFilter.And -> left.test(element) && right.test(element)
|
||||
is MagixPayloadFilter.Or -> left.test(element) || right.test(element)
|
||||
@ -103,3 +114,7 @@ public interface MagixHistory {
|
||||
}
|
||||
}
|
||||
|
||||
public interface WriteableMagixHistory: MagixHistory{
|
||||
public suspend fun send(message: MagixMessage)
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,32 @@
|
||||
package space.kscience.magix.storage
|
||||
|
||||
import kotlinx.coroutines.sync.Mutex
|
||||
import kotlinx.coroutines.sync.withLock
|
||||
import space.kscience.magix.api.MagixMessage
|
||||
import space.kscience.magix.api.MagixMessageFilter
|
||||
|
||||
class InMemoryMagixHistory() : WriteableMagixHistory {
|
||||
|
||||
private val cache = mutableListOf<MagixMessage>()
|
||||
private val mutex = Mutex()
|
||||
|
||||
override suspend fun send(message: MagixMessage) {
|
||||
mutex.withLock {
|
||||
cache.add(message)
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun useMessages(
|
||||
magixFilter: MagixMessageFilter?,
|
||||
payloadFilter: MagixPayloadFilter?,
|
||||
userFilter: MagixUsernameFilter?,
|
||||
callback: (Sequence<MagixMessage>) -> Unit,
|
||||
) = mutex.withLock {
|
||||
val sequence = cache.asSequence().filter { message ->
|
||||
(magixFilter?.accepts(message) ?: true) &&
|
||||
(userFilter?.userName?.equals(message.user) ?: true) &&
|
||||
payloadFilter?.test(message.payload) ?: true
|
||||
}
|
||||
callback(sequence)
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user