Merge remote-tracking branch 'origin/dev' into dev

# Conflicts:
#	build.gradle.kts
#	dataforge-magix-client/src/commonMain/kotlin/hep/dataforge/control/client/magixClient.kt
#	magix/magix-service/src/commonMain/kotlin/hep/dataforge/magix/service/RSocketMagixEndpoint.kt
This commit is contained in:
Alexander Nozik 2020-12-01 10:46:02 +03:00
commit 7fc27c49a3
8 changed files with 38 additions and 23 deletions

View File

@ -1,14 +1,24 @@
package hep.dataforge.control.client package hep.dataforge.control.client
public data class TangoPayload( public sealed class TangoPayload(
val host: String, val host: String,
val device: String, val device: String,
val name: String, val name: String,
val value: Any? = null,
val timestamp: Long? = null, val timestamp: Long? = null,
val quality: String = "VALID", val quality: String = "VALID",
val event: String? = null, val event: String? = null,
val input: Any? = null, // val input: Any? = null,
val output: Any? = null, // val output: Any? = null,
val errors: Iterable<Any?>?, // val errors: Iterable<Any?>?,
) )
public class TangoAttributePayload(
host: String,
device: String,
name: String,
val value: Any? = null,
timestamp: Long? = null,
quality: String = "VALID",
event: String? = null,
errors: Iterable<Any?>?,
) : TangoPayload(host, device, name, timestamp, quality, event)

View File

@ -55,5 +55,9 @@ public fun DeviceManager.launchMagixClient(
}.launchIn(endpoint.scope) }.launchIn(endpoint.scope)
} }
public fun DeviceManager.asMagixProcessor(endpointID: String = "dataforge"): MagixProcessor = object : MagixProcessor {
override fun process(endpoint: MagixEndpoint): Job = launchMagixClient(endpoint, endpointID)
}

View File

@ -1,6 +1,5 @@
package hep.dataforge.magix.api package hep.dataforge.magix.api
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.serialization.KSerializer import kotlinx.serialization.KSerializer
import kotlinx.serialization.json.Json import kotlinx.serialization.json.Json
@ -10,8 +9,6 @@ import kotlinx.serialization.json.JsonElement
* Inwards API of magix endpoint used to build services * Inwards API of magix endpoint used to build services
*/ */
public interface MagixEndpoint { public interface MagixEndpoint {
public val scope: CoroutineScope
/** /**
* Subscribe to a [Flow] of messages using specific [payloadSerializer] * Subscribe to a [Flow] of messages using specific [payloadSerializer]
*/ */
@ -32,13 +29,16 @@ public interface MagixEndpoint {
public companion object { public companion object {
public const val DEFAULT_MAGIX_WS_PORT: Int = 7777 public const val DEFAULT_MAGIX_WS_PORT: Int = 7777
public const val DEFAULT_MAGIX_RAW_PORT: Int = 7778 public const val DEFAULT_MAGIX_RAW_PORT: Int = 7778
public val magixJson: Json = Json public val magixJson: Json = Json{
ignoreUnknownKeys = true
encodeDefaults = false
}
} }
} }
public fun MagixEndpoint.subscribe( public fun MagixEndpoint.subscribe(
filter: MagixMessageFilter = MagixMessageFilter.ALL, filter: MagixMessageFilter = MagixMessageFilter.ALL,
): Flow<MagixMessage<JsonElement>> = subscribe(JsonElement.serializer()) ): Flow<MagixMessage<JsonElement>> = subscribe(JsonElement.serializer(),filter)
public suspend fun MagixEndpoint.broadcast(message: MagixMessage<JsonElement>): Unit = public suspend fun MagixEndpoint.broadcast(message: MagixMessage<JsonElement>): Unit =
broadcast(JsonElement.serializer(), message) broadcast(JsonElement.serializer(), message)

View File

@ -1,6 +1,7 @@
package hep.dataforge.magix.api package hep.dataforge.magix.api
import kotlinx.serialization.Serializable import kotlinx.serialization.Serializable
import kotlinx.serialization.json.JsonElement
/** /**
* *
@ -26,6 +27,6 @@ public data class MagixMessage<T>(
val target: String? = null, val target: String? = null,
val id: String? = null, val id: String? = null,
val parentId: String? = null, val parentId: String? = null,
val user: String? = null, val user: JsonElement? = null,
val action: String? = null val action: String? = null
) )

View File

@ -9,7 +9,6 @@ public data class MagixMessageFilter(
val format: List<String>? = null, val format: List<String>? = null,
val origin: List<String>? = null, val origin: List<String>? = null,
val target: List<String?>? = null, val target: List<String?>? = null,
val user: List<String?>? = null,
val action: List<String?>? = null, val action: List<String?>? = null,
) { ) {
public companion object { public companion object {
@ -29,7 +28,6 @@ public fun <T> Flow<MagixMessage<T>>.filter(filter: MagixMessageFilter): Flow<Ma
&& filter.origin?.contains(message.origin) ?: true && filter.origin?.contains(message.origin) ?: true
&& filter.origin?.contains(message.origin) ?: true && filter.origin?.contains(message.origin) ?: true
&& filter.target?.contains(message.target) ?: true && filter.target?.contains(message.target) ?: true
&& filter.user?.contains(message.user) ?: true
&& filter.action?.contains(message.action) ?: true && filter.action?.contains(message.action) ?: true
} }
} }

View File

@ -1,5 +1,6 @@
package hep.dataforge.magix.api package hep.dataforge.magix.api
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.onEach
@ -17,12 +18,13 @@ public interface MagixProcessor {
* If [newOrigin] is not null, it is used as a replacement for old [MagixMessage.origin] tag. * If [newOrigin] is not null, it is used as a replacement for old [MagixMessage.origin] tag.
*/ */
public class MagixConverter( public class MagixConverter(
public val filter: MagixMessageFilter, private val scope: CoroutineScope,
public val outputFormat: String, private val filter: MagixMessageFilter,
public val newOrigin: String? = null, private val outputFormat: String,
public val transformer: suspend (JsonElement) -> JsonElement, private val newOrigin: String? = null,
private val transformer: suspend (JsonElement) -> JsonElement,
) : MagixProcessor { ) : MagixProcessor {
override fun process(endpoint: MagixEndpoint): Job = endpoint.scope.launch { override fun process(endpoint: MagixEndpoint): Job = scope.launch {
endpoint.subscribe(filter).onEach { message -> endpoint.subscribe(filter).onEach { message ->
val newPayload = transformer(message.payload) val newPayload = transformer(message.payload)
val transformed = message.copy( val transformed = message.copy(
@ -32,5 +34,6 @@ public class MagixConverter(
) )
endpoint.broadcast(transformed) endpoint.broadcast(transformed)
}.launchIn(this) }.launchIn(this)
//TODO add catch logic here
} }
} }

View File

@ -54,7 +54,7 @@ internal fun CoroutineScope.magixAcceptor(magixFlow: MutableSharedFlow<GenericMa
// bi-directional connection // bi-directional connection
requestChannel { input: Flow<Payload> -> requestChannel { input: Flow<Payload> ->
input.onEach { input.onEach {
magixFlow.emit(magixJson.decodeFromString(genericMessageSerializer,it.data.readText())) magixFlow.emit(magixJson.decodeFromString(genericMessageSerializer, it.data.readText()))
}.launchIn(this@magixAcceptor) }.launchIn(this@magixAcceptor)
magixFlow.map { message -> magixFlow.map { message ->

View File

@ -6,8 +6,8 @@ import io.ktor.network.sockets.aSocket
import io.ktor.util.KtorExperimentalAPI import io.ktor.util.KtorExperimentalAPI
import io.rsocket.kotlin.core.RSocketConnectorBuilder import io.rsocket.kotlin.core.RSocketConnectorBuilder
import io.rsocket.kotlin.transport.ktor.clientTransport import io.rsocket.kotlin.transport.ktor.clientTransport
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlin.coroutines.coroutineContext
/** /**
@ -15,7 +15,6 @@ import kotlinx.coroutines.Dispatchers
*/ */
@OptIn(KtorExperimentalAPI::class) @OptIn(KtorExperimentalAPI::class)
public suspend fun RSocketMagixEndpoint.Companion.withTcp( public suspend fun RSocketMagixEndpoint.Companion.withTcp(
scope: CoroutineScope,
host: String, host: String,
port: Int, port: Int,
tcpConfig: SocketOptions.TCPClientSocketOptions.() -> Unit = {}, tcpConfig: SocketOptions.TCPClientSocketOptions.() -> Unit = {},
@ -24,5 +23,5 @@ public suspend fun RSocketMagixEndpoint.Companion.withTcp(
val transport = aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().clientTransport(host, port, tcpConfig) val transport = aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().clientTransport(host, port, tcpConfig)
val rSocket = buildConnector(rSocketConfig).connect(transport) val rSocket = buildConnector(rSocketConfig).connect(transport)
return RSocketMagixEndpoint(scope, rSocket) return RSocketMagixEndpoint(coroutineContext, rSocket)
} }