Add storage API test (inclomplete)

This commit is contained in:
Alexander Nozik 2023-07-29 18:08:25 +03:00
parent f9e20f8766
commit d3d8413837
15 changed files with 264 additions and 172 deletions

View File

@ -6,7 +6,7 @@ plugins {
id("space.kscience.gradle.project") id("space.kscience.gradle.project")
} }
val dataforgeVersion: String by extra("0.6.1") val dataforgeVersion: String by extra("0.6.2-dev-3")
val ktorVersion: String by extra(space.kscience.gradle.KScienceVersions.ktorVersion) val ktorVersion: String by extra(space.kscience.gradle.KScienceVersions.ktorVersion)
val rsocketVersion by extra("0.15.4") val rsocketVersion by extra("0.15.4")
val xodusVersion by extra("2.0.1") val xodusVersion by extra("2.0.1")

View File

@ -1,7 +1,11 @@
@file:OptIn(ExperimentalSerializationApi::class)
package space.kscience.controls.api package space.kscience.controls.api
import kotlinx.datetime.Clock import kotlinx.datetime.Clock
import kotlinx.datetime.Instant import kotlinx.datetime.Instant
import kotlinx.serialization.EncodeDefault
import kotlinx.serialization.ExperimentalSerializationApi
import kotlinx.serialization.SerialName import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json import kotlinx.serialization.json.Json
@ -55,9 +59,9 @@ public data class PropertyChangedMessage(
override val sourceDevice: Name = Name.EMPTY, override val sourceDevice: Name = Name.EMPTY,
override val targetDevice: Name? = null, override val targetDevice: Name? = null,
override val comment: String? = null, override val comment: String? = null,
override val time: Instant? = Clock.System.now() @EncodeDefault override val time: Instant? = Clock.System.now(),
) : DeviceMessage(){ ) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name):DeviceMessage = copy(sourceDevice = block(sourceDevice)) override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
} }
/** /**
@ -71,9 +75,9 @@ public data class PropertySetMessage(
override val sourceDevice: Name? = null, override val sourceDevice: Name? = null,
override val targetDevice: Name, override val targetDevice: Name,
override val comment: String? = null, override val comment: String? = null,
override val time: Instant? = Clock.System.now() @EncodeDefault override val time: Instant? = Clock.System.now(),
) : DeviceMessage(){ ) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name):DeviceMessage = copy(sourceDevice = sourceDevice?.let(block)) override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
} }
/** /**
@ -87,9 +91,9 @@ public data class PropertyGetMessage(
override val sourceDevice: Name? = null, override val sourceDevice: Name? = null,
override val targetDevice: Name, override val targetDevice: Name,
override val comment: String? = null, override val comment: String? = null,
override val time: Instant? = Clock.System.now() @EncodeDefault override val time: Instant? = Clock.System.now(),
) : DeviceMessage(){ ) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name):DeviceMessage = copy(sourceDevice = sourceDevice?.let(block)) override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
} }
/** /**
@ -101,9 +105,9 @@ public data class GetDescriptionMessage(
override val sourceDevice: Name? = null, override val sourceDevice: Name? = null,
override val targetDevice: Name, override val targetDevice: Name,
override val comment: String? = null, override val comment: String? = null,
override val time: Instant? = Clock.System.now() @EncodeDefault override val time: Instant? = Clock.System.now(),
) : DeviceMessage(){ ) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name):DeviceMessage = copy(sourceDevice = sourceDevice?.let(block)) override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
} }
/** /**
@ -118,9 +122,9 @@ public data class DescriptionMessage(
override val sourceDevice: Name, override val sourceDevice: Name,
override val targetDevice: Name? = null, override val targetDevice: Name? = null,
override val comment: String? = null, override val comment: String? = null,
override val time: Instant? = Clock.System.now() @EncodeDefault override val time: Instant? = Clock.System.now(),
) : DeviceMessage(){ ) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name):DeviceMessage = copy(sourceDevice = block(sourceDevice)) override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
} }
/** /**
@ -137,9 +141,9 @@ public data class ActionExecuteMessage(
override val sourceDevice: Name? = null, override val sourceDevice: Name? = null,
override val targetDevice: Name, override val targetDevice: Name,
override val comment: String? = null, override val comment: String? = null,
override val time: Instant? = Clock.System.now() @EncodeDefault override val time: Instant? = Clock.System.now(),
) : DeviceMessage(){ ) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name):DeviceMessage = copy(sourceDevice = sourceDevice?.let(block)) override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
} }
/** /**
@ -156,9 +160,9 @@ public data class ActionResultMessage(
override val sourceDevice: Name, override val sourceDevice: Name,
override val targetDevice: Name? = null, override val targetDevice: Name? = null,
override val comment: String? = null, override val comment: String? = null,
override val time: Instant? = Clock.System.now() @EncodeDefault override val time: Instant? = Clock.System.now(),
) : DeviceMessage(){ ) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name):DeviceMessage = copy(sourceDevice = block(sourceDevice)) override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
} }
/** /**
@ -171,9 +175,9 @@ public data class BinaryNotificationMessage(
override val sourceDevice: Name, override val sourceDevice: Name,
override val targetDevice: Name? = null, override val targetDevice: Name? = null,
override val comment: String? = null, override val comment: String? = null,
override val time: Instant? = Clock.System.now() @EncodeDefault override val time: Instant? = Clock.System.now(),
) : DeviceMessage(){ ) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name):DeviceMessage = copy(sourceDevice = block(sourceDevice)) override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
} }
/** /**
@ -186,9 +190,9 @@ public data class EmptyDeviceMessage(
override val sourceDevice: Name? = null, override val sourceDevice: Name? = null,
override val targetDevice: Name? = null, override val targetDevice: Name? = null,
override val comment: String? = null, override val comment: String? = null,
override val time: Instant? = Clock.System.now() @EncodeDefault override val time: Instant? = Clock.System.now(),
) : DeviceMessage(){ ) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name):DeviceMessage = copy(sourceDevice = sourceDevice?.let(block)) override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
} }
/** /**
@ -202,9 +206,9 @@ public data class DeviceLogMessage(
override val sourceDevice: Name? = null, override val sourceDevice: Name? = null,
override val targetDevice: Name? = null, override val targetDevice: Name? = null,
override val comment: String? = null, override val comment: String? = null,
override val time: Instant? = Clock.System.now() @EncodeDefault override val time: Instant? = Clock.System.now(),
) : DeviceMessage(){ ) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name):DeviceMessage = copy(sourceDevice = sourceDevice?.let(block)) override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = sourceDevice?.let(block))
} }
/** /**
@ -219,9 +223,9 @@ public data class DeviceErrorMessage(
override val sourceDevice: Name, override val sourceDevice: Name,
override val targetDevice: Name? = null, override val targetDevice: Name? = null,
override val comment: String? = null, override val comment: String? = null,
override val time: Instant? = Clock.System.now() @EncodeDefault override val time: Instant? = Clock.System.now(),
) : DeviceMessage(){ ) : DeviceMessage() {
override fun changeSource(block: (Name) -> Name):DeviceMessage = copy(sourceDevice = block(sourceDevice)) override fun changeSource(block: (Name) -> Name): DeviceMessage = copy(sourceDevice = block(sourceDevice))
} }

View File

@ -22,7 +22,7 @@ private suspend fun MagixEndpoint.collectEcho(scope: CoroutineScope, n: Int) {
scope.launch { scope.launch {
subscribe( subscribe(
MagixMessageFilter( MagixMessageFilter(
origin = listOf("loop") source = listOf("loop")
) )
).collect { message -> ).collect { message ->
if (message.id?.endsWith(".response") == true) { if (message.id?.endsWith(".response") == true) {

View File

@ -19,7 +19,7 @@ dependencies {
implementation(projects.magix.magixZmq) implementation(projects.magix.magixZmq)
implementation("io.ktor:ktor-client-cio:$ktorVersion") implementation("io.ktor:ktor-client-cio:$ktorVersion")
implementation("space.kscience:plotlykt-server:0.5.3") implementation("space.kscience:plotlykt-server:0.6.0")
implementation(spclibs.logback.classic) implementation(spclibs.logback.classic)
} }

View File

@ -1,11 +1,10 @@
package space.kscience.controls.demo package space.kscience.controls.demo
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.*
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.isActive import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.launch import kotlinx.coroutines.sync.withLock
import kotlinx.datetime.Clock import kotlinx.datetime.Clock
import space.kscience.controls.client.connectToMagix import space.kscience.controls.client.connectToMagix
import space.kscience.controls.client.magixFormat import space.kscience.controls.client.magixFormat
@ -20,6 +19,7 @@ import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.int import space.kscience.dataforge.meta.int
import space.kscience.magix.api.MagixEndpoint import space.kscience.magix.api.MagixEndpoint
import space.kscience.magix.api.subscribe import space.kscience.magix.api.subscribe
import space.kscience.magix.rsocket.rSocketWithTcp
import space.kscience.magix.rsocket.rSocketWithWebSockets import space.kscience.magix.rsocket.rSocketWithWebSockets
import space.kscience.magix.server.RSocketMagixFlowPlugin import space.kscience.magix.server.RSocketMagixFlowPlugin
import space.kscience.magix.server.startMagixServer import space.kscience.magix.server.startMagixServer
@ -31,7 +31,6 @@ import space.kscience.plotly.server.PlotlyUpdateMode
import space.kscience.plotly.server.serve import space.kscience.plotly.server.serve
import space.kscience.plotly.server.show import space.kscience.plotly.server.show
import space.kscince.magix.zmq.ZmqMagixFlowPlugin import space.kscince.magix.zmq.ZmqMagixFlowPlugin
import java.util.concurrent.ConcurrentHashMap
import kotlin.random.Random import kotlin.random.Random
import kotlin.time.Duration import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds import kotlin.time.Duration.Companion.milliseconds
@ -49,14 +48,15 @@ class MassDevice(context: Context, meta: Meta) : DeviceBySpec<MassDevice>(MassDe
val value by doubleProperty { randomValue } val value by doubleProperty { randomValue }
override suspend fun MassDevice.onOpen() { override suspend fun MassDevice.onOpen() {
doRecurring(50.milliseconds) { doRecurring(10.milliseconds) {
read(value) read(value)
} }
} }
} }
} }
fun main() { @OptIn(DelicateCoroutinesApi::class)
suspend fun main() {
val context = Context("Mass") val context = Context("Mass")
context.startMagixServer( context.startMagixServer(
@ -67,7 +67,7 @@ fun main() {
val numDevices = 100 val numDevices = 100
repeat(numDevices) { repeat(numDevices) {
context.launch(Dispatchers.IO) { context.launch(newFixedThreadPoolContext(2, "Device${it}")) {
val deviceContext = Context("Device${it}") { val deviceContext = Context("Device${it}") {
plugin(DeviceManager) plugin(DeviceManager)
} }
@ -77,7 +77,7 @@ fun main() {
deviceManager.install("device$it", MassDevice) deviceManager.install("device$it", MassDevice)
val endpointId = "device$it" val endpointId = "device$it"
val deviceEndpoint = MagixEndpoint.rSocketWithWebSockets("localhost") val deviceEndpoint = MagixEndpoint.rSocketWithTcp("localhost")
deviceManager.connectToMagix(deviceEndpoint, endpointId) deviceManager.connectToMagix(deviceEndpoint, endpointId)
} }
} }
@ -94,14 +94,19 @@ fun main() {
launch(Dispatchers.IO) { launch(Dispatchers.IO) {
val monitorEndpoint = MagixEndpoint.rSocketWithWebSockets("localhost") val monitorEndpoint = MagixEndpoint.rSocketWithWebSockets("localhost")
val latest = ConcurrentHashMap<String, Duration>() val mutex = Mutex()
val latest = HashMap<String, Duration>()
monitorEndpoint.subscribe(DeviceManager.magixFormat).onEach { (magixMessage, payload) -> monitorEndpoint.subscribe(DeviceManager.magixFormat).onEach { (magixMessage, payload) ->
mutex.withLock {
latest[magixMessage.sourceEndpoint] = Clock.System.now() - payload.time!! latest[magixMessage.sourceEndpoint] = Clock.System.now() - payload.time!!
}
}.launchIn(this) }.launchIn(this)
while (isActive) { while (isActive) {
delay(200) delay(200)
mutex.withLock {
val sorted = latest.mapKeys { it.key.substring(6).toInt() }.toSortedMap() val sorted = latest.mapKeys { it.key.substring(6).toInt() }.toSortedMap()
latest.clear() latest.clear()
x.numbers = sorted.keys x.numbers = sorted.keys
@ -112,6 +117,7 @@ fun main() {
} }
} }
} }
}
application.show() application.show()

View File

@ -28,7 +28,7 @@ public fun <T> MagixEndpoint.subscribe(
originFilter: Collection<String>? = null, originFilter: Collection<String>? = null,
targetFilter: Collection<String>? = null, targetFilter: Collection<String>? = null,
): Flow<Pair<MagixMessage, T>> = subscribe( ): Flow<Pair<MagixMessage, T>> = subscribe(
MagixMessageFilter(format = format.formats, origin = originFilter, target = targetFilter) MagixMessageFilter(format = format.formats, source = originFilter, target = targetFilter)
).map { ).map {
val value: T = magixJson.decodeFromJsonElement(format.serializer, it.payload) val value: T = magixJson.decodeFromJsonElement(format.serializer, it.payload)
it to value it to value

View File

@ -10,13 +10,13 @@ import kotlinx.serialization.Serializable
@Serializable @Serializable
public data class MagixMessageFilter( public data class MagixMessageFilter(
val format: Collection<String>? = null, val format: Collection<String>? = null,
val origin: Collection<String>? = null, val source: Collection<String>? = null,
val target: Collection<String>? = null, val target: Collection<String>? = null,
) { ) {
public fun accepts(message: MagixMessage): Boolean = public fun accepts(message: MagixMessage): Boolean =
format?.contains(message.format) ?: true format?.contains(message.format) ?: true
&& origin?.contains(message.sourceEndpoint) ?: true && source?.contains(message.sourceEndpoint) ?: true
&& target?.contains(message.targetEndpoint) ?: true && target?.contains(message.targetEndpoint) ?: true
public companion object { public companion object {

View File

@ -51,11 +51,12 @@ public class RSocketMagixEndpoint(private val rSocket: RSocket) : MagixEndpoint,
} }
internal fun buildConnector(rSocketConfig: RSocketConnectorBuilder.ConnectionConfigBuilder.() -> Unit) = internal fun buildConnector(
RSocketConnector { rSocketConfig: RSocketConnectorBuilder.ConnectionConfigBuilder.() -> Unit,
reconnectable(10) ) = RSocketConnector {
reconnectable(5)
connectionConfig(rSocketConfig) connectionConfig(rSocketConfig)
} }
/** /**
* Build a websocket based endpoint connected to [host], [port] and given routing [path] * Build a websocket based endpoint connected to [host], [port] and given routing [path]

View File

@ -35,8 +35,11 @@ public class RSocketMagixFlowPlugin(
receive: Flow<MagixMessage>, receive: Flow<MagixMessage>,
sendMessage: suspend (MagixMessage) -> Unit, sendMessage: suspend (MagixMessage) -> Unit,
): Job { ): Job {
val tcpTransport = val tcpTransport = TcpServerTransport(
TcpServerTransport(hostname = serverHost, port = serverPort, configure = transportConfiguration) hostname = serverHost,
port = serverPort,
configure = transportConfiguration
)
val rSocketJob: TcpServer = RSocketServer(rsocketConfiguration) val rSocketJob: TcpServer = RSocketServer(rsocketConfiguration)
.bindIn(scope, tcpTransport, acceptor(scope, receive, sendMessage)) .bindIn(scope, tcpTransport, acceptor(scope, receive, sendMessage))
@ -60,7 +63,7 @@ public class RSocketMagixFlowPlugin(
MagixMessageFilter.serializer(), MagixMessageFilter.serializer(),
request.data.readText() request.data.readText()
) )
request.close()
receive.filter(filter).map { message -> receive.filter(filter).map { message ->
val string = MagixEndpoint.magixJson.encodeToString(MagixMessage.serializer(), message) val string = MagixEndpoint.magixJson.encodeToString(MagixMessage.serializer(), message)
buildPayload { data(string) } buildPayload { data(string) }
@ -72,7 +75,7 @@ public class RSocketMagixFlowPlugin(
MagixMessage.serializer(), MagixMessage.serializer(),
request.data.readText() request.data.readText()
) )
request.close()
sendMessage(message) sendMessage(message)
} }
// bidirectional connection, used for streaming connection // bidirectional connection, used for streaming connection
@ -81,12 +84,12 @@ public class RSocketMagixFlowPlugin(
sendMessage( sendMessage(
MagixEndpoint.magixJson.decodeFromString( MagixEndpoint.magixJson.decodeFromString(
MagixMessage.serializer(), MagixMessage.serializer(),
inputPayload.use{ it.data.readText()} inputPayload.use { it.data.readText() }
) )
) )
}.launchIn(this) }.launchIn(this)
val filterText = request.use { it.data.readText()} val filterText = request.use { it.data.readText() }
val filter = if (filterText.isNotBlank()) { val filter = if (filterText.isNotBlank()) {
MagixEndpoint.magixJson.decodeFromString(MagixMessageFilter.serializer(), filterText) MagixEndpoint.magixJson.decodeFromString(MagixMessageFilter.serializer(), filterText)

View File

@ -19,20 +19,25 @@ import space.kscience.magix.storage.test
import java.nio.file.Path import java.nio.file.Path
import kotlin.sequences.Sequence import kotlin.sequences.Sequence
/**
* Attach a Xodus storage process to the given endpoint.
*/
public class XodusMagixStorage(
scope: CoroutineScope,
private val store: PersistentEntityStore,
endpoint: MagixEndpoint,
filter: MagixMessageFilter = MagixMessageFilter.ALL,
) : MagixHistory, AutoCloseable {
//TODO consider message buffering private fun Entity.parseMagixMessage(): MagixMessage = MagixMessage(
internal val subscriptionJob = endpoint.subscribe(filter).onEach { message -> format = getProperty(MagixMessage::format.name).toString(),
store.executeInTransaction { transaction -> payload = getBlobString(MagixMessage::payload.name)?.let {
transaction.newEntity(MAGIC_MESSAGE_ENTITY_TYPE).apply { magixJson.parseToJsonElement(it)
} ?: JsonObject(emptyMap()),
sourceEndpoint = getProperty(MagixMessage::sourceEndpoint.name).toString(),
targetEndpoint = getProperty(MagixMessage::targetEndpoint.name)?.toString(),
id = getProperty(MagixMessage::id.name)?.toString(),
parentId = getProperty(MagixMessage::parentId.name)?.toString(),
user = getBlobString(MagixMessage::user.name)?.let {
magixJson.parseToJsonElement(it)
},
)
public class XodusMagixHistory(private val store: PersistentEntityStore) : MagixHistory {
public fun writeMessage(storeTransaction: StoreTransaction, message: MagixMessage) {
storeTransaction.newEntity(XodusMagixStorage.MAGIC_MESSAGE_ENTITY_TYPE).apply {
setProperty(MagixMessage::sourceEndpoint.name, message.sourceEndpoint) setProperty(MagixMessage::sourceEndpoint.name, message.sourceEndpoint)
setProperty(MagixMessage::format.name, message.format) setProperty(MagixMessage::format.name, message.format)
@ -59,21 +64,100 @@ public class XodusMagixStorage(
} }
} }
} }
}.launchIn(scope)
private fun Entity.parseMagixMessage(): MagixMessage = MagixMessage( public fun sendMessage(message: MagixMessage) {
format = getProperty(MagixMessage::format.name).toString(), store.executeInTransaction { transaction ->
payload = getBlobString(MagixMessage::payload.name)?.let { writeMessage(transaction, message)
magixJson.parseToJsonElement(it) }
} ?: JsonObject(emptyMap()), }
sourceEndpoint = getProperty(MagixMessage::sourceEndpoint.name).toString(),
targetEndpoint = getProperty(MagixMessage::targetEndpoint.name)?.toString(), override suspend fun useMessages(
id = getProperty(MagixMessage::id.name)?.toString(), magixFilter: MagixMessageFilter?,
parentId = getProperty(MagixMessage::parentId.name)?.toString(), payloadFilter: MagixPayloadFilter?,
user = getBlobString(MagixMessage::user.name)?.let { userFilter: MagixUsernameFilter?,
magixJson.parseToJsonElement(it) callback: (Sequence<MagixMessage>) -> Unit,
}, ): Unit = store.executeInReadonlyTransaction { transaction ->
val all = transaction.getAll(XodusMagixStorage.MAGIC_MESSAGE_ENTITY_TYPE)
fun StoreTransaction.findAllIn(
entityType: String,
field: String,
values: Collection<String>?,
): EntityIterable? {
var union: EntityIterable? = null
values?.forEach {
val filter = transaction.find(entityType, field, it)
union = union?.union(filter) ?: filter
}
return union
}
// filter by magix filter
val filteredByMagix: EntityIterable = magixFilter?.let { mf ->
var res = all
transaction.findAllIn(XodusMagixStorage.MAGIC_MESSAGE_ENTITY_TYPE, MagixMessage::format.name, mf.format)
?.let {
res = res.intersect(it)
}
transaction.findAllIn(
XodusMagixStorage.MAGIC_MESSAGE_ENTITY_TYPE,
MagixMessage::sourceEndpoint.name,
mf.source
)?.let {
res = res.intersect(it)
}
transaction.findAllIn(
XodusMagixStorage.MAGIC_MESSAGE_ENTITY_TYPE,
MagixMessage::targetEndpoint.name,
mf.target
)?.let {
res = res.intersect(it)
}
res
} ?: all
val filteredByUser: EntityIterable = userFilter?.let { userFilter ->
filteredByMagix.intersect(
transaction.find(
XodusMagixStorage.MAGIC_MESSAGE_ENTITY_TYPE,
MagixMessage::user.name,
userFilter.userName
) )
)
} ?: filteredByMagix
val sequence = filteredByUser.asSequence().map { it.parseMagixMessage() }
val filteredSequence = if (payloadFilter == null) {
sequence
} else {
sequence.filter {
payloadFilter.test(it.payload)
}
}
callback(filteredSequence)
}
}
/**
* Attach a Xodus storage process to the given endpoint.
*/
public class XodusMagixStorage(
scope: CoroutineScope,
private val store: PersistentEntityStore,
endpoint: MagixEndpoint,
filter: MagixMessageFilter = MagixMessageFilter.ALL,
) : AutoCloseable {
public val history: XodusMagixHistory = XodusMagixHistory(store)
//TODO consider message buffering
internal val subscriptionJob = endpoint.subscribe(filter).onEach { message ->
history.sendMessage(message)
}.launchIn(scope)
/** /**
@ -105,63 +189,6 @@ public class XodusMagixStorage(
block(sequence) block(sequence)
} }
override suspend fun findMessages(
magixFilter: MagixMessageFilter?,
payloadFilter: MagixPayloadFilter?,
userFilter: MagixUsernameFilter?,
callback: (Sequence<MagixMessage>) -> Unit,
): Unit = store.executeInReadonlyTransaction { transaction ->
val all = transaction.getAll(MAGIC_MESSAGE_ENTITY_TYPE)
fun StoreTransaction.findAllIn(
entityType: String,
field: String,
values: Collection<String>?,
): EntityIterable? {
var union: EntityIterable? = null
values?.forEach {
val filter = transaction.find(entityType, field, it)
union = union?.union(filter) ?: filter
}
return union
}
// filter by magix filter
val filteredByMagix: EntityIterable = magixFilter?.let { mf ->
var res = all
transaction.findAllIn(MAGIC_MESSAGE_ENTITY_TYPE, MagixMessage::format.name, mf.format)?.let {
res = res.intersect(it)
}
transaction.findAllIn(MAGIC_MESSAGE_ENTITY_TYPE, MagixMessage::sourceEndpoint.name, mf.origin)?.let {
res = res.intersect(it)
}
transaction.findAllIn(MAGIC_MESSAGE_ENTITY_TYPE, MagixMessage::targetEndpoint.name, mf.target)?.let {
res = res.intersect(it)
}
res
} ?: all
val filteredByUser: EntityIterable = userFilter?.let { userFilter ->
filteredByMagix.intersect(
transaction.find(MAGIC_MESSAGE_ENTITY_TYPE, MagixMessage::user.name, userFilter.userName)
)
} ?: filteredByMagix
val sequence = filteredByUser.asSequence().map { it.parseMagixMessage() }
val filteredSequence = if (payloadFilter == null) {
sequence
} else {
sequence.filter {
payloadFilter.test(it.payload)
}
}
callback(filteredSequence)
}
override fun close() { override fun close() {
subscriptionJob.cancel() subscriptionJob.cancel()
} }

View File

@ -0,0 +1,50 @@
package space.kscience.magix.storage.xodus
import jetbrains.exodus.entitystore.PersistentEntityStores
import kotlinx.serialization.json.JsonPrimitive
import kotlinx.serialization.json.buildJsonObject
import space.kscience.magix.api.MagixMessage
import space.kscience.magix.api.MagixMessageFilter
import java.nio.file.Files
import kotlin.time.measureTime
public suspend fun main() {
val storeDirectory = Files.createTempDirectory("controls-xodus").toFile()
println(storeDirectory)
val store = PersistentEntityStores.newInstance(storeDirectory)
val history = XodusMagixHistory(store)
store.executeInTransaction { transaction ->
for (value in 1..100) {
for (source in 1..100) {
for (target in 1..100) {
history.writeMessage(
transaction,
MagixMessage(
"test",
sourceEndpoint = "source$source",
targetEndpoint = "target$target",
payload = buildJsonObject {
put("value", JsonPrimitive(value))
}
)
)
}
}
}
}
println("written million messages")
val time = measureTime {
history.useMessages(
MagixMessageFilter(source = listOf("source12"), target = listOf("target12"))
) { sequence ->
println(sequence.count())
}
}
println("Finished query in $time")
store.close()
}

View File

@ -66,7 +66,7 @@ public interface MagixHistory {
* @param payloadFilter filter for payload fields. * @param payloadFilter filter for payload fields.
* @param userFilter filters user names ("user.name"). * @param userFilter filters user names ("user.name").
*/ */
public suspend fun findMessages( public suspend fun useMessages(
magixFilter: MagixMessageFilter? = null, magixFilter: MagixMessageFilter? = null,
payloadFilter: MagixPayloadFilter? = null, payloadFilter: MagixPayloadFilter? = null,
userFilter: MagixUsernameFilter? = null, userFilter: MagixUsernameFilter? = null,

View File

@ -57,7 +57,7 @@ public fun MagixEndpoint.launchHistory(
if (payload is HistoryRequestPayload) { if (payload is HistoryRequestPayload) {
val realPageSize = payload.pageSize ?: pageSize val realPageSize = payload.pageSize ?: pageSize
history.findMessages(payload.magixFilter, payload.payloadFilter, payload.userFilter) { sequence -> history.useMessages(payload.magixFilter, payload.payloadFilter, payload.userFilter) { sequence ->
// start from -1 because increment always happens first // start from -1 because increment always happens first
var pageNumber = -1 var pageNumber = -1

View File

@ -13,6 +13,7 @@ import space.kscience.magix.api.MagixMessage
import space.kscience.magix.api.MagixMessageFilter import space.kscience.magix.api.MagixMessageFilter
import space.kscience.magix.api.filter import space.kscience.magix.api.filter
import kotlin.coroutines.CoroutineContext import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
public class ZmqMagixEndpoint( public class ZmqMagixEndpoint(
private val host: String, private val host: String,
@ -69,8 +70,7 @@ public class ZmqMagixEndpoint(
} }
} }
public fun MagixEndpoint.Companion.zmq( public suspend fun MagixEndpoint.Companion.zmq(
scope: CoroutineScope,
host: String, host: String,
protocol: String = "tcp", protocol: String = "tcp",
pubPort: Int = DEFAULT_MAGIX_ZMQ_PUB_PORT, pubPort: Int = DEFAULT_MAGIX_ZMQ_PUB_PORT,
@ -80,5 +80,5 @@ public fun MagixEndpoint.Companion.zmq(
protocol, protocol,
pubPort, pubPort,
pullPort, pullPort,
coroutineContext = scope.coroutineContext coroutineContext = coroutineContext
) )

View File

@ -18,6 +18,7 @@ public class ZmqMagixFlowPlugin(
public val localHost: String = "tcp://*", public val localHost: String = "tcp://*",
public val zmqPubSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PUB_PORT, public val zmqPubSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PUB_PORT,
public val zmqPullSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PULL_PORT, public val zmqPullSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_ZMQ_PULL_PORT,
private val zContext: ZContext = ZContext()
) : MagixFlowPlugin { ) : MagixFlowPlugin {
override fun start( override fun start(
@ -27,7 +28,7 @@ public class ZmqMagixFlowPlugin(
): Job = scope.launch(Dispatchers.IO) { ): Job = scope.launch(Dispatchers.IO) {
val logger = LoggerFactory.getLogger("magix-server-zmq") val logger = LoggerFactory.getLogger("magix-server-zmq")
ZContext().use { context -> zContext.use { context ->
//launch the publishing job //launch the publishing job
val pubSocket = context.createSocket(SocketType.PUB) val pubSocket = context.createSocket(SocketType.PUB)
pubSocket.bind("$localHost:$zmqPubSocketPort") pubSocket.bind("$localHost:$zmqPubSocketPort")