Remove CoroutineScope from Endpoint interface

This commit is contained in:
Alexander Nozik 2020-11-09 19:22:34 +03:00
parent 78ee05371b
commit 356f3e15a6
10 changed files with 58 additions and 41 deletions

View File

@ -6,7 +6,7 @@ plugins {
val dataforgeVersion: String by extra("0.2.0-dev-4") val dataforgeVersion: String by extra("0.2.0-dev-4")
val ktorVersion: String by extra("1.4.1") val ktorVersion: String by extra("1.4.1")
val rsocketVersion by extra("0.11.0-SNAPSHOT") val rsocketVersion by extra("0.11.1")
allprojects { allprojects {
repositories { repositories {
@ -17,7 +17,6 @@ allprojects {
maven("https://maven.pkg.github.com/altavir/kotlin-logging/") maven("https://maven.pkg.github.com/altavir/kotlin-logging/")
maven("https://dl.bintray.com/rsocket-admin/RSocket") maven("https://dl.bintray.com/rsocket-admin/RSocket")
maven("https://maven.pkg.github.com/altavir/ktor-client-sse") maven("https://maven.pkg.github.com/altavir/ktor-client-sse")
maven("https://oss.jfrog.org/oss-snapshot-local")
} }
group = "hep.dataforge" group = "hep.dataforge"

View File

@ -1,14 +1,24 @@
package hep.dataforge.control.client package hep.dataforge.control.client
public data class TangoPayload( public sealed class TangoPayload(
val host: String, val host: String,
val device: String, val device: String,
val name: String, val name: String,
val value: Any? = null,
val timestamp: Long? = null, val timestamp: Long? = null,
val quality: String = "VALID", val quality: String = "VALID",
val event: String? = null, val event: String? = null,
val input: Any? = null, // val input: Any? = null,
val output: Any? = null, // val output: Any? = null,
val errors: Iterable<Any?>?, // val errors: Iterable<Any?>?,
) )
public class TangoAttributePayload(
host: String,
device: String,
name: String,
val value: Any? = null,
timestamp: Long? = null,
quality: String = "VALID",
event: String? = null,
errors: Iterable<Any?>?,
) : TangoPayload(host, device, name, timestamp, quality, event)

View File

@ -5,6 +5,7 @@ import hep.dataforge.control.controllers.DeviceMessage
import hep.dataforge.control.controllers.respondMessage import hep.dataforge.control.controllers.respondMessage
import hep.dataforge.magix.api.MagixEndpoint import hep.dataforge.magix.api.MagixEndpoint
import hep.dataforge.magix.api.MagixMessage import hep.dataforge.magix.api.MagixMessage
import hep.dataforge.magix.api.MagixProcessor
import kotlinx.coroutines.Job import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.onEach
@ -38,7 +39,7 @@ public fun DeviceManager.launchMagixClient(
payload = responsePayload payload = responsePayload
) )
endpoint.broadcast(DeviceMessage.serializer(), response) endpoint.broadcast(DeviceMessage.serializer(), response)
}.launchIn(endpoint.scope) }.launchIn(this)
controller.messageOutput().onEach { payload -> controller.messageOutput().onEach { payload ->
MagixMessage( MagixMessage(
@ -47,8 +48,12 @@ public fun DeviceManager.launchMagixClient(
origin = endpointID, origin = endpointID,
payload = payload payload = payload
) )
}.launchIn(endpoint.scope) }.launchIn(this)
}
public fun DeviceManager.asMagixProcessor(endpointID: String = "dataforge"): MagixProcessor = object : MagixProcessor {
override fun process(endpoint: MagixEndpoint): Job = launchMagixClient(endpoint, endpointID)
} }

View File

@ -1,6 +1,5 @@
package hep.dataforge.magix.api package hep.dataforge.magix.api
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.serialization.KSerializer import kotlinx.serialization.KSerializer
import kotlinx.serialization.json.Json import kotlinx.serialization.json.Json
@ -10,8 +9,6 @@ import kotlinx.serialization.json.JsonElement
* Inwards API of magix endpoint used to build services * Inwards API of magix endpoint used to build services
*/ */
public interface MagixEndpoint { public interface MagixEndpoint {
public val scope: CoroutineScope
/** /**
* Subscribe to a [Flow] of messages using specific [payloadSerializer] * Subscribe to a [Flow] of messages using specific [payloadSerializer]
*/ */
@ -32,13 +29,16 @@ public interface MagixEndpoint {
public companion object { public companion object {
public const val DEFAULT_MAGIX_WS_PORT: Int = 7777 public const val DEFAULT_MAGIX_WS_PORT: Int = 7777
public const val DEFAULT_MAGIX_RAW_PORT: Int = 7778 public const val DEFAULT_MAGIX_RAW_PORT: Int = 7778
public val magixJson: Json = Json public val magixJson: Json = Json{
ignoreUnknownKeys = true
encodeDefaults = false
}
} }
} }
public fun MagixEndpoint.subscribe( public fun MagixEndpoint.subscribe(
filter: MagixMessageFilter = MagixMessageFilter.ALL, filter: MagixMessageFilter = MagixMessageFilter.ALL,
): Flow<MagixMessage<JsonElement>> = subscribe(JsonElement.serializer()) ): Flow<MagixMessage<JsonElement>> = subscribe(JsonElement.serializer(),filter)
public suspend fun MagixEndpoint.broadcast(message: MagixMessage<JsonElement>): Unit = public suspend fun MagixEndpoint.broadcast(message: MagixMessage<JsonElement>): Unit =
broadcast(JsonElement.serializer(), message) broadcast(JsonElement.serializer(), message)

View File

@ -1,6 +1,7 @@
package hep.dataforge.magix.api package hep.dataforge.magix.api
import kotlinx.serialization.Serializable import kotlinx.serialization.Serializable
import kotlinx.serialization.json.JsonElement
/** /**
* *
@ -26,6 +27,6 @@ public data class MagixMessage<T>(
val target: String? = null, val target: String? = null,
val id: String? = null, val id: String? = null,
val parentId: String? = null, val parentId: String? = null,
val user: String? = null, val user: JsonElement? = null,
val action: String? = null val action: String? = null
) )

View File

@ -9,7 +9,6 @@ public data class MagixMessageFilter(
val format: List<String>? = null, val format: List<String>? = null,
val origin: List<String>? = null, val origin: List<String>? = null,
val target: List<String?>? = null, val target: List<String?>? = null,
val user: List<String?>? = null,
val action: List<String?>? = null, val action: List<String?>? = null,
) { ) {
public companion object { public companion object {
@ -29,7 +28,6 @@ public fun <T> Flow<MagixMessage<T>>.filter(filter: MagixMessageFilter): Flow<Ma
&& filter.origin?.contains(message.origin) ?: true && filter.origin?.contains(message.origin) ?: true
&& filter.origin?.contains(message.origin) ?: true && filter.origin?.contains(message.origin) ?: true
&& filter.target?.contains(message.target) ?: true && filter.target?.contains(message.target) ?: true
&& filter.user?.contains(message.user) ?: true
&& filter.action?.contains(message.action) ?: true && filter.action?.contains(message.action) ?: true
} }
} }

View File

@ -1,5 +1,6 @@
package hep.dataforge.magix.api package hep.dataforge.magix.api
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.onEach
@ -17,12 +18,13 @@ public interface MagixProcessor {
* If [newOrigin] is not null, it is used as a replacement for old [MagixMessage.origin] tag. * If [newOrigin] is not null, it is used as a replacement for old [MagixMessage.origin] tag.
*/ */
public class MagixConverter( public class MagixConverter(
public val filter: MagixMessageFilter, private val scope: CoroutineScope,
public val outputFormat: String, private val filter: MagixMessageFilter,
public val newOrigin: String? = null, private val outputFormat: String,
public val transformer: suspend (JsonElement) -> JsonElement, private val newOrigin: String? = null,
private val transformer: suspend (JsonElement) -> JsonElement,
) : MagixProcessor { ) : MagixProcessor {
override fun process(endpoint: MagixEndpoint): Job = endpoint.scope.launch { override fun process(endpoint: MagixEndpoint): Job = scope.launch {
endpoint.subscribe(filter).onEach { message -> endpoint.subscribe(filter).onEach { message ->
val newPayload = transformer(message.payload) val newPayload = transformer(message.payload)
val transformed = message.copy( val transformed = message.copy(
@ -32,5 +34,6 @@ public class MagixConverter(
) )
endpoint.broadcast(transformed) endpoint.broadcast(transformed)
}.launchIn(this) }.launchIn(this)
//TODO add catch logic here
} }
} }

View File

@ -20,6 +20,8 @@ import io.ktor.websocket.WebSockets
import io.rsocket.kotlin.ConnectionAcceptor import io.rsocket.kotlin.ConnectionAcceptor
import io.rsocket.kotlin.RSocketRequestHandler import io.rsocket.kotlin.RSocketRequestHandler
import io.rsocket.kotlin.payload.Payload import io.rsocket.kotlin.payload.Payload
import io.rsocket.kotlin.payload.buildPayload
import io.rsocket.kotlin.payload.data
import io.rsocket.kotlin.transport.ktor.server.RSocketSupport import io.rsocket.kotlin.transport.ktor.server.RSocketSupport
import io.rsocket.kotlin.transport.ktor.server.rSocket import io.rsocket.kotlin.transport.ktor.server.rSocket
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
@ -42,7 +44,7 @@ internal fun CoroutineScope.magixAcceptor(magixFlow: MutableSharedFlow<GenericMa
val filter = magixJson.decodeFromString(MagixMessageFilter.serializer(), request.data.readText()) val filter = magixJson.decodeFromString(MagixMessageFilter.serializer(), request.data.readText())
magixFlow.filter(filter).map { message -> magixFlow.filter(filter).map { message ->
val string = magixJson.encodeToString(genericMessageSerializer, message) val string = magixJson.encodeToString(genericMessageSerializer, message)
Payload(string) buildPayload { data(string) }
} }
} }
fireAndForget { request: Payload -> fireAndForget { request: Payload ->
@ -52,12 +54,12 @@ internal fun CoroutineScope.magixAcceptor(magixFlow: MutableSharedFlow<GenericMa
// bi-directional connection // bi-directional connection
requestChannel { input: Flow<Payload> -> requestChannel { input: Flow<Payload> ->
input.onEach { input.onEach {
magixFlow.emit(magixJson.decodeFromString(genericMessageSerializer,it.data.readText())) magixFlow.emit(magixJson.decodeFromString(genericMessageSerializer, it.data.readText()))
}.launchIn(this@magixAcceptor) }.launchIn(this@magixAcceptor)
magixFlow.map { message -> magixFlow.map { message ->
val string = magixJson.encodeToString(genericMessageSerializer, message) val string = magixJson.encodeToString(genericMessageSerializer, message)
Payload(string) buildPayload { data(string) }
} }
} }
} }

View File

@ -9,18 +9,20 @@ import io.ktor.util.KtorExperimentalAPI
import io.rsocket.kotlin.RSocket import io.rsocket.kotlin.RSocket
import io.rsocket.kotlin.core.RSocketConnector import io.rsocket.kotlin.core.RSocketConnector
import io.rsocket.kotlin.core.RSocketConnectorBuilder import io.rsocket.kotlin.core.RSocketConnectorBuilder
import io.rsocket.kotlin.payload.Payload import io.rsocket.kotlin.payload.buildPayload
import io.rsocket.kotlin.payload.data
import io.rsocket.kotlin.transport.ktor.client.RSocketSupport import io.rsocket.kotlin.transport.ktor.client.RSocketSupport
import io.rsocket.kotlin.transport.ktor.client.rSocket import io.rsocket.kotlin.transport.ktor.client.rSocket
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.map
import kotlinx.coroutines.launch import kotlinx.coroutines.withContext
import kotlinx.serialization.KSerializer import kotlinx.serialization.KSerializer
import kotlinx.serialization.encodeToString import kotlinx.serialization.encodeToString
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
public class RSocketMagixEndpoint( public class RSocketMagixEndpoint(
override val scope: CoroutineScope, val coroutineContext: CoroutineContext,
public val rSocket: RSocket, public val rSocket: RSocket,
) : MagixEndpoint { ) : MagixEndpoint {
@ -29,15 +31,15 @@ public class RSocketMagixEndpoint(
filter: MagixMessageFilter, filter: MagixMessageFilter,
): Flow<MagixMessage<T>> { ): Flow<MagixMessage<T>> {
val serializer = MagixMessage.serializer(payloadSerializer) val serializer = MagixMessage.serializer(payloadSerializer)
val payload = Payload(MagixEndpoint.magixJson.encodeToString(filter)) val payload = buildPayload { data(MagixEndpoint.magixJson.encodeToString(filter)) }
val flow = rSocket.requestStream(payload) val flow = rSocket.requestStream(payload)
return flow.map { MagixEndpoint.magixJson.decodeFromString(serializer, it.data.readText()) } return flow.map { MagixEndpoint.magixJson.decodeFromString(serializer, it.data.readText()) }
} }
override suspend fun <T> broadcast(payloadSerializer: KSerializer<T>, message: MagixMessage<T>) { override suspend fun <T> broadcast(payloadSerializer: KSerializer<T>, message: MagixMessage<T>) {
scope.launch { withContext(coroutineContext) {
val serializer = MagixMessage.serializer(payloadSerializer) val serializer = MagixMessage.serializer(payloadSerializer)
val payload = Payload(MagixEndpoint.magixJson.encodeToString(serializer, message)) val payload = buildPayload { data(MagixEndpoint.magixJson.encodeToString(serializer, message)) }
rSocket.fireAndForget(payload) rSocket.fireAndForget(payload)
} }
} }
@ -52,7 +54,6 @@ public class RSocketMagixEndpoint(
@OptIn(KtorExperimentalAPI::class) @OptIn(KtorExperimentalAPI::class)
public suspend fun withWebSockets( public suspend fun withWebSockets(
scope: CoroutineScope,
host: String, host: String,
port: Int, port: Int,
path: String = "/rsocket", path: String = "/rsocket",
@ -72,8 +73,7 @@ public class RSocketMagixEndpoint(
client.close() client.close()
} }
return RSocketMagixEndpoint(scope, rSocket) return RSocketMagixEndpoint(coroutineContext, rSocket)
} }
} }
} }

View File

@ -6,8 +6,8 @@ import io.ktor.network.sockets.aSocket
import io.ktor.util.KtorExperimentalAPI import io.ktor.util.KtorExperimentalAPI
import io.rsocket.kotlin.core.RSocketConnectorBuilder import io.rsocket.kotlin.core.RSocketConnectorBuilder
import io.rsocket.kotlin.transport.ktor.clientTransport import io.rsocket.kotlin.transport.ktor.clientTransport
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlin.coroutines.coroutineContext
/** /**
@ -15,7 +15,6 @@ import kotlinx.coroutines.Dispatchers
*/ */
@OptIn(KtorExperimentalAPI::class) @OptIn(KtorExperimentalAPI::class)
public suspend fun RSocketMagixEndpoint.Companion.withTcp( public suspend fun RSocketMagixEndpoint.Companion.withTcp(
scope: CoroutineScope,
host: String, host: String,
port: Int, port: Int,
tcpConfig: SocketOptions.TCPClientSocketOptions.() -> Unit = {}, tcpConfig: SocketOptions.TCPClientSocketOptions.() -> Unit = {},
@ -24,5 +23,5 @@ public suspend fun RSocketMagixEndpoint.Companion.withTcp(
val transport = aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().clientTransport(host, port, tcpConfig) val transport = aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().clientTransport(host, port, tcpConfig)
val rSocket = buildConnector(rSocketConfig).connect(transport) val rSocket = buildConnector(rSocketConfig).connect(transport)
return RSocketMagixEndpoint(scope, rSocket) return RSocketMagixEndpoint(coroutineContext, rSocket)
} }