Rsocket tcp client

This commit is contained in:
Alexander Nozik 2020-11-04 14:49:57 +03:00
parent 5c90e8e07b
commit f3cfe9c6db
7 changed files with 112 additions and 98 deletions

View File

@ -24,7 +24,7 @@ public interface MagixEndpoint {
/** /**
* Send an event using specific [payloadSerializer] * Send an event using specific [payloadSerializer]
*/ */
public suspend fun <T> send( public suspend fun <T> broadcast(
payloadSerializer: KSerializer<T>, payloadSerializer: KSerializer<T>,
message: MagixMessage<T>, message: MagixMessage<T>,
) )
@ -40,5 +40,5 @@ public suspend fun MagixEndpoint.subscribe(
filter: MagixMessageFilter = MagixMessageFilter.ALL, filter: MagixMessageFilter = MagixMessageFilter.ALL,
): Flow<MagixMessage<JsonElement>> = subscribe(JsonElement.serializer()) ): Flow<MagixMessage<JsonElement>> = subscribe(JsonElement.serializer())
public suspend fun MagixEndpoint.send(message: MagixMessage<JsonElement>): Unit = public suspend fun MagixEndpoint.broadcast(message: MagixMessage<JsonElement>): Unit =
send(JsonElement.serializer(), message) broadcast(JsonElement.serializer(), message)

View File

@ -30,7 +30,7 @@ public class MagixConverter(
format = outputFormat, format = outputFormat,
origin = newOrigin ?: message.origin origin = newOrigin ?: message.origin
) )
endpoint.send(transformed) endpoint.broadcast(transformed)
}.launchIn(this) }.launchIn(this)
} }
} }

View File

@ -27,8 +27,6 @@ import kotlinx.coroutines.flow.*
import kotlinx.html.* import kotlinx.html.*
import kotlinx.serialization.KSerializer import kotlinx.serialization.KSerializer
import kotlinx.serialization.json.JsonElement import kotlinx.serialization.json.JsonElement
import ru.mipt.npm.ktor.sse.SseEvent
import ru.mipt.npm.ktor.sse.writeSseFlow
import java.util.* import java.util.*
public typealias GenericMagixMessage = MagixMessage<JsonElement> public typealias GenericMagixMessage = MagixMessage<JsonElement>

View File

@ -7,7 +7,7 @@ kscience {
useSerialization{ useSerialization{
json() json()
} }
useCoroutines("1.4.0", configuration = ru.mipt.npm.gradle.DependencyConfiguration.API) useCoroutines("1.4.1", configuration = ru.mipt.npm.gradle.DependencyConfiguration.API)
} }
val dataforgeVersion: String by rootProject.extra val dataforgeVersion: String by rootProject.extra

View File

@ -0,0 +1,79 @@
package hep.dataforge.magix.service
import hep.dataforge.magix.api.MagixEndpoint
import hep.dataforge.magix.api.MagixMessage
import hep.dataforge.magix.api.MagixMessageFilter
import io.ktor.client.HttpClient
import io.ktor.client.features.websocket.WebSockets
import io.ktor.util.KtorExperimentalAPI
import io.rsocket.kotlin.RSocket
import io.rsocket.kotlin.core.RSocketConnector
import io.rsocket.kotlin.core.RSocketConnectorBuilder
import io.rsocket.kotlin.payload.Payload
import io.rsocket.kotlin.transport.ktor.client.RSocketSupport
import io.rsocket.kotlin.transport.ktor.client.rSocket
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.launch
import kotlinx.serialization.KSerializer
import kotlinx.serialization.encodeToString
public class RSocketMagixEndpoint(
override val scope: CoroutineScope,
public val rSocket: RSocket,
) : MagixEndpoint {
override suspend fun <T> subscribe(
payloadSerializer: KSerializer<T>,
filter: MagixMessageFilter,
): Flow<MagixMessage<T>> {
val serializer = MagixMessage.serializer(payloadSerializer)
val payload = Payload(MagixEndpoint.magixJson.encodeToString(filter))
val flow = rSocket.requestStream(payload)
return flow.map { MagixEndpoint.magixJson.decodeFromString(serializer, it.data.readText()) }
}
override suspend fun <T> broadcast(payloadSerializer: KSerializer<T>, message: MagixMessage<T>) {
scope.launch {
val serializer = MagixMessage.serializer(payloadSerializer)
val payload = Payload(MagixEndpoint.magixJson.encodeToString(serializer, message))
rSocket.fireAndForget(payload)
}
}
public companion object {
internal fun buildConnector(rSocketConfig: RSocketConnectorBuilder.ConnectionConfigBuilder.() -> Unit) =
RSocketConnector {
reconnectable(10)
connectionConfig(rSocketConfig)
}
@OptIn(KtorExperimentalAPI::class)
public suspend fun withWebSockets(
scope: CoroutineScope,
host: String,
port: Int,
path: String = "/rsocket",
rSocketConfig: RSocketConnectorBuilder.ConnectionConfigBuilder.() -> Unit = {},
): RSocketMagixEndpoint {
val client = HttpClient {
install(WebSockets)
install(RSocketSupport) {
connector = buildConnector(rSocketConfig)
}
}
val rSocket = client.rSocket(host, port, path)
//Ensure client is closed after rSocket if finished
rSocket.job.invokeOnCompletion {
client.close()
}
return RSocketMagixEndpoint(scope, rSocket)
}
}
}

View File

@ -1,91 +0,0 @@
package hep.dataforge.magix.service
import hep.dataforge.magix.api.MagixEndpoint
import hep.dataforge.magix.api.MagixEndpoint.Companion.magixJson
import hep.dataforge.magix.api.MagixMessage
import hep.dataforge.magix.api.MagixMessageFilter
import io.ktor.client.HttpClient
import io.ktor.client.features.websocket.WebSockets
import io.ktor.util.KtorExperimentalAPI
import io.rsocket.kotlin.RSocketRequestHandler
import io.rsocket.kotlin.core.RSocketConnector
import io.rsocket.kotlin.keepalive.KeepAlive
import io.rsocket.kotlin.payload.Payload
import io.rsocket.kotlin.payload.PayloadMimeType
import io.rsocket.kotlin.transport.ktor.client.RSocketSupport
import io.rsocket.kotlin.transport.ktor.client.rSocket
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.launch
import kotlinx.serialization.KSerializer
import kotlinx.serialization.encodeToString
import kotlin.time.minutes
import kotlin.time.seconds
/**
* An RSocket endpoint which relies on WebSocket transport
*/
public class WebRScocketMagixEndpoint(
override val scope: CoroutineScope,
public val host: String,
public val port: Int,
public val path: String = "/rsocket",
) : MagixEndpoint {
//create ktor client
@OptIn(KtorExperimentalAPI::class)
private val client = HttpClient {
install(WebSockets)
install(RSocketSupport) {
connector = RSocketConnector {
reconnectable(10)
//configure rSocket connector (all values have defaults)
connectionConfig {
keepAlive = KeepAlive(
interval = 30.seconds,
maxLifetime = 2.minutes
)
// //payload for setup frame
// setupPayload { Payload("hello world") }
//mime types
payloadMimeType = PayloadMimeType(
data = "application/json",
metadata = "application/json"
)
}
//optional acceptor for server requests
acceptor {
RSocketRequestHandler {
requestResponse { it } //echo request payload
}
}
}
}
}
private val rSocket = scope.async {
client.rSocket(host, port, path)
}
override suspend fun <T> subscribe(
payloadSerializer: KSerializer<T>,
filter: MagixMessageFilter,
): Flow<MagixMessage<T>> {
val serializer = MagixMessage.serializer(payloadSerializer)
val payload = Payload(magixJson.encodeToString(filter))
val flow = rSocket.await().requestStream(payload)
return flow.map { magixJson.decodeFromString(serializer, it.data.readText()) }
}
override suspend fun <T> send(payloadSerializer: KSerializer<T>, message: MagixMessage<T>) {
scope.launch {
val serializer = MagixMessage.serializer(payloadSerializer)
val payload = Payload(magixJson.encodeToString(serializer, message))
rSocket.await().fireAndForget(payload)
}
}
}

View File

@ -0,0 +1,28 @@
package hep.dataforge.magix.service
import io.ktor.network.selector.ActorSelectorManager
import io.ktor.network.sockets.SocketOptions
import io.ktor.network.sockets.aSocket
import io.ktor.util.KtorExperimentalAPI
import io.rsocket.kotlin.core.RSocketConnectorBuilder
import io.rsocket.kotlin.transport.ktor.clientTransport
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
/**
* Create a plain TCP based [RSocketMagixEndpoint]
*/
@OptIn(KtorExperimentalAPI::class)
public suspend fun RSocketMagixEndpoint.Companion.withTcp(
scope: CoroutineScope,
host: String,
port: Int,
tcpConfig: SocketOptions.TCPClientSocketOptions.() -> Unit = {},
rSocketConfig: RSocketConnectorBuilder.ConnectionConfigBuilder.() -> Unit = {},
): RSocketMagixEndpoint {
val transport = aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().clientTransport(host, port, tcpConfig)
val rSocket = buildConnector(rSocketConfig).connect(transport)
return RSocketMagixEndpoint(scope, rSocket)
}