Documentation update and minor refactoring

This commit is contained in:
Alexander Nozik 2023-07-29 09:45:34 +03:00
parent 405bcd6ba3
commit fcabd9aed4
11 changed files with 62 additions and 31 deletions

View File

@ -6,6 +6,7 @@ import kotlinx.coroutines.newCoroutineContext
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import space.kscience.controls.api.*
import space.kscience.controls.manager.DeviceManager
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.misc.DFExperimental
@ -105,8 +106,8 @@ public class DeviceClient(
* Connect to a remote device via this client.
*/
public fun MagixEndpoint.remoteDevice(context: Context, magixTarget: String, deviceName: Name): DeviceClient {
val subscription = subscribe(controlsMagixFormat, originFilter = listOf(magixTarget)).map { it.second }
val subscription = subscribe(DeviceManager.magixFormat, originFilter = listOf(magixTarget)).map { it.second }
return DeviceClient(context, deviceName, subscription) {
broadcast(controlsMagixFormat, it, magixTarget, id = stringUID())
broadcast(DeviceManager.magixFormat, it, magixTarget, id = stringUID())
}
}

View File

@ -14,15 +14,20 @@ import space.kscience.dataforge.context.logger
import space.kscience.magix.api.*
public val controlsMagixFormat: MagixFormat<DeviceMessage> = MagixFormat(
internal val controlsMagixFormat: MagixFormat<DeviceMessage> = MagixFormat(
DeviceMessage.serializer(),
setOf("controls-kt", "dataforge")
)
/**
* A magix message format to work with Controls-kt data
*/
public val DeviceManager.Companion.magixFormat: MagixFormat<DeviceMessage> get() = controlsMagixFormat
internal fun generateId(request: MagixMessage): String = if (request.id != null) {
"${request.id}.response"
} else {
"df[${request.payload.hashCode()}"
"controls[${request.payload.hashCode().toString(16)}"
}
/**
@ -37,6 +42,7 @@ public fun DeviceManager.connectToMagix(
if (responsePayload != null) {
endpoint.broadcast(
format = controlsMagixFormat,
target = request.origin,
origin = endpointID,
payload = responsePayload,
id = generateId(request),

View File

@ -7,7 +7,8 @@ import kotlinx.coroutines.launch
import kotlinx.html.div
import kotlinx.html.link
import space.kscience.controls.api.PropertyChangedMessage
import space.kscience.controls.client.controlsMagixFormat
import space.kscience.controls.client.magixFormat
import space.kscience.controls.manager.DeviceManager
import space.kscience.controls.spec.name
import space.kscience.dataforge.meta.double
import space.kscience.dataforge.meta.get
@ -54,7 +55,7 @@ suspend fun Trace.updateXYFrom(flow: Flow<Iterable<Pair<Double, Double>>>) {
fun CoroutineScope.startDemoDeviceServer(magixEndpoint: MagixEndpoint): ApplicationEngine {
//share subscription to a parse message only once
val subscription = magixEndpoint.subscribe(controlsMagixFormat).shareIn(this, SharingStarted.Lazily)
val subscription = magixEndpoint.subscribe(DeviceManager.magixFormat).shareIn(this, SharingStarted.Lazily)
val sinFlow = subscription.mapNotNull { (_, payload) ->
(payload as? PropertyChangedMessage)?.takeIf { it.property == DemoDevice.sin.name }

View File

@ -2,7 +2,8 @@ package space.kscience.controls.demo.car
import kotlinx.coroutines.launch
import space.kscience.controls.api.PropertyChangedMessage
import space.kscience.controls.client.controlsMagixFormat
import space.kscience.controls.client.magixFormat
import space.kscience.controls.manager.DeviceManager
import space.kscience.controls.spec.write
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Factory
@ -18,7 +19,7 @@ import kotlin.time.ExperimentalTime
class MagixVirtualCar(context: Context, meta: Meta) : VirtualCar(context, meta) {
private fun MagixEndpoint.launchMagixVirtualCarUpdate() = launch {
subscribe(controlsMagixFormat).collect { (_, payload) ->
subscribe(DeviceManager.magixFormat).collect { (_, payload) ->
(payload as? PropertyChangedMessage)?.let { message ->
if (message.sourceDevice == Name.parse("virtual-car")) {
when (message.property) {

View File

@ -8,7 +8,7 @@ import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.datetime.Clock
import space.kscience.controls.client.connectToMagix
import space.kscience.controls.client.controlsMagixFormat
import space.kscience.controls.client.magixFormat
import space.kscience.controls.manager.DeviceManager
import space.kscience.controls.manager.install
import space.kscience.controls.spec.*
@ -96,7 +96,7 @@ fun main() {
val latest = ConcurrentHashMap<String, Duration>()
monitorEndpoint.subscribe(controlsMagixFormat).onEach { (magixMessage, payload) ->
monitorEndpoint.subscribe(DeviceManager.magixFormat).onEach { (magixMessage, payload) ->
latest[magixMessage.origin] = Clock.System.now() - payload.time!!
}.launchIn(this)

View File

@ -6,6 +6,11 @@ import kotlinx.serialization.KSerializer
import kotlinx.serialization.json.JsonElement
import space.kscience.magix.api.MagixEndpoint.Companion.magixJson
/**
* A format for [MagixMessage] that allows to decode typed payload
*
* @param formats allowed values of the format field that are processed. The first value is the primary format for sending.
*/
public data class MagixFormat<T>(
val serializer: KSerializer<T>,
val formats: Set<String>,
@ -13,10 +18,15 @@ public data class MagixFormat<T>(
val defaultFormat: String get() = formats.firstOrNull() ?: "magix"
}
/**
* Subscribe for messages in given endpoint using [format] to declare format filter as well as automatic decoding.
*
* @return a flow of pairs (raw message, decoded message). Raw messages are to be used to extract headers.
*/
public fun <T> MagixEndpoint.subscribe(
format: MagixFormat<T>,
originFilter: Collection<String?>? = null,
targetFilter: Collection<String?>? = null,
originFilter: Collection<String>? = null,
targetFilter: Collection<String>? = null,
): Flow<Pair<MagixMessage, T>> = subscribe(
MagixMessageFilter(format = format.formats, origin = originFilter, target = targetFilter)
).map {
@ -24,6 +34,10 @@ public fun <T> MagixEndpoint.subscribe(
it to value
}
/**
* Send a message using given [format] to encode the message payload. The format field is also taken from [format].
*
*/
public suspend fun <T> MagixEndpoint.broadcast(
format: MagixFormat<T>,
payload: T,

View File

@ -4,11 +4,14 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import kotlinx.serialization.Serializable
/**
* A filter that allows receiving only messages with format, origin and target in given list.
*/
@Serializable
public data class MagixMessageFilter(
val format: Collection<String?>? = null,
val origin: Collection<String?>? = null,
val target: Collection<String?>? = null,
val format: Collection<String>? = null,
val origin: Collection<String>? = null,
val target: Collection<String>? = null,
) {
public fun accepts(message: MagixMessage): Boolean =

View File

@ -77,6 +77,7 @@ public class MqttMagixEndpoint(
public companion object {
public const val DEFAULT_MAGIX_TOPIC_NAME: String = "magix"
//TODO add target name escaping
internal val defaultBroadcastTopicBuilder: (MagixMessage) -> String = { message ->
message.target?.let { "$DEFAULT_MAGIX_TOPIC_NAME/it" } ?: DEFAULT_MAGIX_TOPIC_NAME

View File

@ -35,7 +35,8 @@ public class RSocketMagixFlowPlugin(
receive: Flow<MagixMessage>,
sendMessage: suspend (MagixMessage) -> Unit,
): Job {
val tcpTransport = TcpServerTransport(hostname = serverHost, port = serverPort, configure = transportConfiguration)
val tcpTransport =
TcpServerTransport(hostname = serverHost, port = serverPort, configure = transportConfiguration)
val rSocketJob: TcpServer = RSocketServer(rsocketConfiguration)
.bindIn(scope, tcpTransport, acceptor(scope, receive, sendMessage))
@ -59,6 +60,7 @@ public class RSocketMagixFlowPlugin(
MagixMessageFilter.serializer(),
request.data.readText()
)
request.close()
receive.filter(filter).map { message ->
val string = MagixEndpoint.magixJson.encodeToString(MagixMessage.serializer(), message)
buildPayload { data(string) }
@ -66,22 +68,25 @@ public class RSocketMagixFlowPlugin(
}
//single send
fireAndForget { request: Payload ->
val message =
MagixEndpoint.magixJson.decodeFromString(MagixMessage.serializer(), request.data.readText())
val message = MagixEndpoint.magixJson.decodeFromString(
MagixMessage.serializer(),
request.data.readText()
)
request.close()
sendMessage(message)
}
// bidirectional connection, used for streaming connection
requestChannel { request: Payload, input: Flow<Payload> ->
input.onEach {
input.onEach { inputPayload ->
sendMessage(
MagixEndpoint.magixJson.decodeFromString(
MagixMessage.serializer(),
it.data.readText()
inputPayload.use{ it.data.readText()}
)
)
}.launchIn(this)
val filterText = request.data.readText()
val filterText = request.use { it.data.readText()}
val filter = if (filterText.isNotBlank()) {
MagixEndpoint.magixJson.decodeFromString(MagixMessageFilter.serializer(), filterText)

View File

@ -47,24 +47,24 @@ public fun Application.magixModule(magixFlow: MutableSharedFlow<MagixMessage>, r
install(WebSockets)
}
if (pluginOrNull(RSocketSupport) == null) {
install(RSocketSupport)
}
// if (pluginOrNull(CORS) == null) {
// install(CORS) {
// //TODO consider more safe policy
// anyHost()
// }
// }
if (pluginOrNull(ContentNegotiation) == null) {
install(ContentNegotiation) {
json()
}
}
if (pluginOrNull(RSocketSupport) == null) {
install(RSocketSupport)
}
routing {
route(route) {
install(ContentNegotiation){
json()
}
get("state") {
call.respondHtml {
head {

View File

@ -58,8 +58,7 @@ include(
":magix:magix-zmq",
":magix:magix-rabbit",
":magix:magix-mqtt",
// ":magix:magix-storage",
":magix:magix-storage",
":magix:magix-storage:magix-storage-xodus",
":controls-magix-client",
":demo:all-things",