RSocket and demo device name refactor

This commit is contained in:
Alexander Nozik 2021-06-22 17:37:22 +03:00
parent 3cbabd5d4b
commit 28e6e24cf7
18 changed files with 326 additions and 267 deletions

View File

@ -18,6 +18,91 @@ import space.kscience.dataforge.misc.DFExperimental
@DFExperimental
public data class LogEntry(val content: String, val priority: Int = 0)
@OptIn(ExperimentalCoroutinesApi::class)
private open class BasicReadOnlyDeviceProperty(
val device: DeviceBase,
override val name: String,
default: MetaItem?,
override val descriptor: PropertyDescriptor,
private val getter: suspend (before: MetaItem?) -> MetaItem,
) : ReadOnlyDeviceProperty {
override val scope: CoroutineScope get() = device.scope
private val state: MutableStateFlow<MetaItem?> = MutableStateFlow(default)
override val value: MetaItem? get() = state.value
override suspend fun invalidate() {
state.value = null
}
override fun updateLogical(item: MetaItem) {
state.value = item
scope.launch {
device.sharedPropertyFlow.emit(Pair(name, item))
}
}
override suspend fun read(force: Boolean): MetaItem {
//backup current value
val currentValue = value
return if (force || currentValue == null) {
//all device operations should be run on device context
//propagate error, but do not fail scope
val res = withContext(scope.coroutineContext + SupervisorJob(scope.coroutineContext[Job])) {
getter(currentValue)
}
updateLogical(res)
res
} else {
currentValue
}
}
override fun flow(): StateFlow<MetaItem?> = state
}
@OptIn(ExperimentalCoroutinesApi::class)
private class BasicDeviceProperty(
device: DeviceBase,
name: String,
default: MetaItem?,
descriptor: PropertyDescriptor,
getter: suspend (MetaItem?) -> MetaItem,
private val setter: suspend (oldValue: MetaItem?, newValue: MetaItem) -> MetaItem?,
) : BasicReadOnlyDeviceProperty(device, name, default, descriptor, getter), DeviceProperty {
override var value: MetaItem?
get() = super.value
set(value) {
scope.launch {
if (value == null) {
invalidate()
} else {
write(value)
}
}
}
private val writeLock = Mutex()
override suspend fun write(item: MetaItem) {
writeLock.withLock {
//fast return if value is not changed
if (item == value) return@withLock
val oldValue = value
//all device operations should be run on device context
withContext(scope.coroutineContext + SupervisorJob(scope.coroutineContext[Job])) {
setter(oldValue, item)?.let {
updateLogical(it)
}
}
}
}
}
/**
* Baseline implementation of [Device] interface
*/
@ -33,7 +118,7 @@ public abstract class DeviceBase(override val context: Context) : Device {
private val _actions = HashMap<String, DeviceAction>()
public val actions: Map<String, DeviceAction> get() = _actions
private val sharedPropertyFlow = MutableSharedFlow<Pair<String, MetaItem>>()
internal val sharedPropertyFlow = MutableSharedFlow<Pair<String, MetaItem>>()
override val propertyFlow: SharedFlow<Pair<String, MetaItem>> get() = sharedPropertyFlow
@ -82,49 +167,6 @@ public abstract class DeviceBase(override val context: Context) : Device {
override suspend fun execute(action: String, argument: MetaItem?): MetaItem? =
(_actions[action] ?: error("Request with name $action not defined")).invoke(argument)
@OptIn(ExperimentalCoroutinesApi::class)
private open inner class BasicReadOnlyDeviceProperty(
override val name: String,
default: MetaItem?,
override val descriptor: PropertyDescriptor,
private val getter: suspend (before: MetaItem?) -> MetaItem,
) : ReadOnlyDeviceProperty {
override val scope: CoroutineScope get() = this@DeviceBase.scope
private val state: MutableStateFlow<MetaItem?> = MutableStateFlow(default)
override val value: MetaItem? get() = state.value
override suspend fun invalidate() {
state.value = null
}
override fun updateLogical(item: MetaItem) {
state.value = item
scope.launch {
sharedPropertyFlow.emit(Pair(name, item))
}
}
override suspend fun read(force: Boolean): MetaItem {
//backup current value
val currentValue = value
return if (force || currentValue == null) {
//all device operations should be run on device context
//propagate error, but do not fail scope
val res = withContext(scope.coroutineContext + SupervisorJob(scope.coroutineContext[Job])) {
getter(currentValue)
}
updateLogical(res)
res
} else {
currentValue
}
}
override fun flow(): StateFlow<MetaItem?> = state
}
/**
* Create a bound read-only property with given [getter]
*/
@ -135,6 +177,7 @@ public abstract class DeviceBase(override val context: Context) : Device {
getter: suspend (MetaItem?) -> MetaItem,
): ReadOnlyDeviceProperty {
val property = BasicReadOnlyDeviceProperty(
this,
name,
default,
PropertyDescriptor(name).apply(descriptorBuilder),
@ -144,43 +187,6 @@ public abstract class DeviceBase(override val context: Context) : Device {
return property
}
@OptIn(ExperimentalCoroutinesApi::class)
private inner class BasicDeviceProperty(
name: String,
default: MetaItem?,
descriptor: PropertyDescriptor,
getter: suspend (MetaItem?) -> MetaItem,
private val setter: suspend (oldValue: MetaItem?, newValue: MetaItem) -> MetaItem?,
) : BasicReadOnlyDeviceProperty(name, default, descriptor, getter), DeviceProperty {
override var value: MetaItem?
get() = super.value
set(value) {
scope.launch {
if (value == null) {
invalidate()
} else {
write(value)
}
}
}
private val writeLock = Mutex()
override suspend fun write(item: MetaItem) {
writeLock.withLock {
//fast return if value is not changed
if (item == value) return@withLock
val oldValue = value
//all device operations should be run on device context
withContext(scope.coroutineContext + SupervisorJob(scope.coroutineContext[Job])) {
setter(oldValue, item)?.let {
updateLogical(it)
}
}
}
}
}
/**
* Create a bound mutable property with given [getter] and [setter]
@ -193,6 +199,7 @@ public abstract class DeviceBase(override val context: Context) : Device {
setter: suspend (oldValue: MetaItem?, newValue: MetaItem) -> MetaItem?,
): DeviceProperty {
val property = BasicDeviceProperty(
this,
name,
default,
PropertyDescriptor(name).apply(descriptorBuilder),

View File

@ -40,18 +40,16 @@ public class DeviceManager(override val deviceName: String = "") : AbstractPlugi
}
}
public interface DeviceFactory<D : Device> : Factory<D>
public interface DeviceSpec<D : Device> : Factory<D>
public val Context.devices: DeviceManager get() = plugins.fetch(DeviceManager)
public fun <D : Device> DeviceManager.install(name: String, factory: DeviceFactory<D>, meta: Meta = Meta.EMPTY): D {
public fun <D : Device> DeviceManager.install(name: String, factory: DeviceSpec<D>, meta: Meta = Meta.EMPTY): D {
val device = factory(meta, context)
registerDevice(NameToken(name), device)
return device
}
public fun <D : Device> DeviceManager.installing(
factory: DeviceFactory<D>,
factory: DeviceSpec<D>,
metaBuilder: MetaBuilder.() -> Unit = {},
): ReadOnlyProperty<Any?, D> = ReadOnlyProperty { _, property ->
val name = property.name

View File

@ -3,12 +3,18 @@ plugins {
`maven-publish`
}
description = """
A stand-alone device tree web server which also works as magix event dispatcher.
The server is used to work with stand-alone devices without intermediate control system.
""".trimIndent()
val dataforgeVersion: String by rootProject.extra
val ktorVersion: String = "1.5.3"
dependencies{
dependencies {
implementation(project(":controls-core"))
implementation(project(":controls-tcp"))
implementation(projects.magix.magixServer)
implementation("io.ktor:ktor-server-cio:$ktorVersion")
implementation("io.ktor:ktor-websockets:$ktorVersion")
implementation("io.ktor:ktor-serialization:$ktorVersion")

View File

@ -4,13 +4,11 @@ import io.ktor.application.ApplicationCall
import io.ktor.http.ContentType
import io.ktor.http.cio.websocket.Frame
import io.ktor.response.respondText
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonObjectBuilder
import kotlinx.serialization.json.buildJsonObject
import ru.mipt.npm.controls.api.DeviceMessage
import ru.mipt.npm.controls.api.toMeta
import ru.mipt.npm.magix.api.MagixEndpoint
import space.kscience.dataforge.io.*
import space.kscience.dataforge.meta.MetaSerializer
internal fun Frame.toEnvelope(): Envelope {
@ -29,6 +27,7 @@ internal suspend fun ApplicationCall.respondJson(builder: JsonObjectBuilder.() -
respondText(json.toString(), contentType = ContentType.Application.Json)
}
public suspend fun ApplicationCall.respondMessage(message: DeviceMessage) {
respondText(Json.encodeToString(MetaSerializer, message.toMeta()), contentType = ContentType.Application.Json)
}
public suspend fun ApplicationCall.respondMessage(message: DeviceMessage): Unit = respondText(
MagixEndpoint.magixJson.encodeToString(DeviceMessage.serializer(), message),
contentType = ContentType.Application.Json
)

View File

@ -1,4 +1,3 @@
package ru.mipt.npm.controls.server
@ -20,9 +19,9 @@ import io.ktor.server.engine.embeddedServer
import io.ktor.util.getValue
import io.ktor.websocket.WebSockets
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.html.*
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonObject
import kotlinx.serialization.json.buildJsonArray
import kotlinx.serialization.json.put
import ru.mipt.npm.controls.api.DeviceMessage
@ -31,8 +30,11 @@ import ru.mipt.npm.controls.api.PropertySetMessage
import ru.mipt.npm.controls.api.getOrNull
import ru.mipt.npm.controls.controllers.DeviceManager
import ru.mipt.npm.controls.controllers.respondMessage
import ru.mipt.npm.magix.api.MagixEndpoint
import ru.mipt.npm.magix.server.GenericMagixMessage
import ru.mipt.npm.magix.server.magixModule
import ru.mipt.npm.magix.server.rawMagixServerSocket
import space.kscience.dataforge.meta.toJson
import space.kscience.dataforge.meta.toMeta
import space.kscience.dataforge.meta.toMetaItem
/**
@ -40,7 +42,7 @@ import space.kscience.dataforge.meta.toMetaItem
*/
public fun CoroutineScope.startDeviceServer(
manager: DeviceManager,
port: Int = 8111,
port: Int = MagixEndpoint.DEFAULT_MAGIX_HTTP_PORT,
host: String = "localhost",
): ApplicationEngine {
@ -54,7 +56,7 @@ public fun CoroutineScope.startDeviceServer(
call.respond(HttpStatusCode.BadRequest, cause.message ?: "")
}
}
deviceModule(manager)
deviceManagerModule(manager)
routing {
get("/") {
call.respondRedirect("/dashboard")
@ -70,22 +72,13 @@ public fun ApplicationEngine.whenStarted(callback: Application.() -> Unit) {
public const val WEB_SERVER_TARGET: String = "@webServer"
public fun Application.deviceModule(
public fun Application.deviceManagerModule(
manager: DeviceManager,
deviceNames: Collection<String> = manager.devices.keys.map { it.toString() },
route: String = "/",
rawSocketPort: Int = MagixEndpoint.DEFAULT_MAGIX_RAW_PORT,
buffer: Int = 100,
) {
// val controllers = deviceNames.associateWith { name ->
// val device = manager.devices[name.toName()]
// DeviceController(device, name, manager.context)
// }
//
// fun generateFlow(target: String?) = if (target == null) {
// controllers.values.asFlow().flatMapMerge { it.output() }
// } else {
// controllers[target]?.output() ?: error("The device with target $target not found")
// }
if (featureOrNull(WebSockets) == null) {
install(WebSockets)
}
@ -159,32 +152,11 @@ public fun Application.deviceModule(
}
}
}
// //Check if application supports websockets and if it does add a push channel
// if (this.application.featureOrNull(WebSockets) != null) {
// webSocket("ws") {
// //subscribe on device
// val target: String? by call.request.queryParameters
//
// try {
// application.log.debug("Opened server socket for ${call.request.queryParameters}")
//
// manager.controller.envelopeOutput().collect {
// outgoing.send(it.toFrame())
// }
//
// } catch (ex: Exception) {
// application.log.debug("Closed server socket for ${call.request.queryParameters}")
// }
// }
// }
post("message") {
val body = call.receiveText()
val json = Json.parseToJsonElement(body) as? JsonObject
?: throw IllegalArgumentException("The body is not a json object")
val meta = json.toMeta()
val request = DeviceMessage.fromMeta(meta)
val request: DeviceMessage = MagixEndpoint.magixJson.decodeFromString(DeviceMessage.serializer(), body)
val response = manager.respondMessage(request)
call.respondMessage(response)
@ -226,4 +198,12 @@ public fun Application.deviceModule(
}
}
}
val magixFlow = MutableSharedFlow<GenericMagixMessage>(
buffer,
extraBufferCapacity = buffer
)
rawMagixServerSocket(magixFlow, rawSocketPort)
magixModule(magixFlow)
}

View File

@ -13,9 +13,11 @@ repositories{
}
dependencies{
implementation(project(":controls-core"))
implementation(project(":controls-server"))
implementation(project(":controls-magix-client"))
implementation(projects.controlsCore)
//implementation(projects.controlsServer)
implementation(projects.magix.magixServer)
implementation(projects.controlsMagixClient)
implementation(projects.magix.magixRsocket)
implementation("no.tornado:tornadofx:1.7.20")
implementation("space.kscience:plotlykt-server:0.4.2")
implementation("com.github.Ricky12Awesome:json-schema-serialization:0.6.6")

View File

@ -6,10 +6,14 @@ import javafx.scene.control.Slider
import javafx.scene.layout.Priority
import javafx.stage.Stage
import kotlinx.coroutines.launch
import space.kscience.dataforge.context.ContextAware
import space.kscience.dataforge.context.Global
import space.kscience.dataforge.context.info
import space.kscience.dataforge.context.logger
import ru.mipt.npm.controls.api.DeviceMessage
import ru.mipt.npm.controls.client.launchDfMagix
import ru.mipt.npm.controls.controllers.DeviceManager
import ru.mipt.npm.controls.controllers.install
import ru.mipt.npm.magix.api.MagixEndpoint
import ru.mipt.npm.magix.rsocket.rSocketWithTcp
import ru.mipt.npm.magix.server.startMagixServer
import space.kscience.dataforge.context.*
import tornadofx.*
import java.awt.Desktop
import java.net.URI
@ -17,21 +21,32 @@ import java.net.URI
class DemoController : Controller(), ContextAware {
var device: DemoDevice? = null
var server: ApplicationEngine? = null
override val context = Global.buildContext("demoDevice")
var magixServer: ApplicationEngine? = null
var visualizer: ApplicationEngine? = null
override val context = Context("demoDevice") {
plugin(DeviceManager)
}
private val deviceManager = context.fetch(DeviceManager)
fun init() {
context.launch {
val demo = DemoDevice(context)
device = demo
server = startDemoDeviceServer(context, demo)
device = deviceManager.install("demo", DemoDevice)
//starting magix event loop
magixServer = startMagixServer()
//Launch device client and connect it to the server
deviceManager.launchDfMagix(MagixEndpoint.rSocketWithTcp("localhost", DeviceMessage.serializer()))
visualizer = startDemoDeviceServer()
}
}
fun shutdown() {
logger.info { "Shutting down..." }
server?.stop(1000, 5000)
visualizer?.stop(1000,5000)
logger.info { "Visualization server stopped" }
magixServer?.stop(1000, 5000)
logger.info { "Magix server stopped" }
device?.close()
logger.info { "Device server stopped" }
context.close()
@ -89,7 +104,7 @@ class DemoControllerView : View(title = " Demo controller remote") {
button("Show plots") {
useMaxWidth = true
action {
controller.server?.run {
controller.magixServer?.run {
val host = "localhost"//environment.connectors.first().host
val port = environment.connectors.first().port
val uri = URI("http", null, host, port, "/plots", null, null)

View File

@ -4,9 +4,9 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.asCoroutineDispatcher
import ru.mipt.npm.controls.base.*
import ru.mipt.npm.controls.controllers.DeviceSpec
import ru.mipt.npm.controls.controllers.double
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Factory
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.values.asValue
import java.time.Instant
@ -30,7 +30,7 @@ class DemoDevice(context: Context) : DeviceBase(context) {
val sinScale by writingVirtual(1.0.asValue())
var sinScaleValue by sinScale.double()
val sin by readingNumber {
val sin: TypedReadOnlyDeviceProperty<Number> by readingNumber {
val time = Instant.now()
sin(time.toEpochMilli().toDouble() / timeScaleValue) * sinScaleValue
}
@ -67,7 +67,7 @@ class DemoDevice(context: Context) : DeviceBase(context) {
executor.shutdown()
}
companion object : Factory<DemoDevice> {
companion object : DeviceSpec<DemoDevice> {
override fun invoke(meta: Meta, context: Context): DemoDevice = DemoDevice(context)
}
}

View File

@ -1,16 +1,18 @@
package ru.mipt.npm.controls.demo
import io.ktor.server.cio.CIO
import io.ktor.server.engine.ApplicationEngine
import io.ktor.server.engine.embeddedServer
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.html.div
import kotlinx.html.link
import ru.mipt.npm.controls.controllers.devices
import ru.mipt.npm.controls.server.startDeviceServer
import ru.mipt.npm.controls.server.whenStarted
import space.kscience.dataforge.context.Context
import ru.mipt.npm.controls.api.DeviceMessage
import ru.mipt.npm.controls.api.PropertyChangedMessage
import ru.mipt.npm.magix.api.MagixEndpoint
import ru.mipt.npm.magix.rsocket.rSocketWithWebSockets
import space.kscience.dataforge.meta.MetaItem
import space.kscience.dataforge.meta.double
import space.kscience.dataforge.names.NameToken
import space.kscience.plotly.layout
import space.kscience.plotly.models.Trace
import space.kscience.plotly.plot
@ -49,16 +51,26 @@ suspend fun Trace.updateXYFrom(flow: Flow<Iterable<Pair<Double, Double>>>) {
}
fun startDemoDeviceServer(context: Context, device: DemoDevice): ApplicationEngine {
context.devices.registerDevice(NameToken("demo"), device)
val server = context.startDeviceServer(context.devices)
server.whenStarted {
suspend fun startDemoDeviceServer(magixHost: String = "localhost"): ApplicationEngine = embeddedServer(CIO, 8080) {
val sinFlow = MutableSharedFlow<MetaItem?>()// = device.sin.flow()
val cosFlow = MutableSharedFlow<MetaItem?>()// = device.cos.flow()
launch {
val endpoint = MagixEndpoint.rSocketWithWebSockets(magixHost, DeviceMessage.serializer())
endpoint.subscribe().collect { magix ->
(magix.payload as? PropertyChangedMessage)?.let { message ->
when (message.property) {
"sin" -> sinFlow.emit(message.value)
"cos" -> cosFlow.emit(message.value)
}
}
}
}
plotlyModule("plots").apply {
updateMode = PlotlyUpdateMode.PUSH
updateInterval = 50
}.page { container ->
val sinFlow = device.sin.flow()
val cosFlow = device.cos.flow()
val sinCosFlow = sinFlow.zip(cosFlow) { sin, cos ->
sin.double!! to cos.double!!
}
@ -77,7 +89,7 @@ fun startDemoDeviceServer(context: Context, device: DemoDevice): ApplicationEngi
yaxis.title = "sin"
}
trace {
context.launch {
launch {
val flow: Flow<Iterable<Double>> = sinFlow.mapNotNull { it.double }.windowed(100)
updateFrom(Trace.Y_AXIS, flow)
}
@ -92,7 +104,7 @@ fun startDemoDeviceServer(context: Context, device: DemoDevice): ApplicationEngi
yaxis.title = "cos"
}
trace {
context.launch {
launch {
val flow: Flow<Iterable<Double>> = cosFlow.mapNotNull { it.double }.windowed(100)
updateFrom(Trace.Y_AXIS, flow)
}
@ -110,7 +122,7 @@ fun startDemoDeviceServer(context: Context, device: DemoDevice): ApplicationEngi
}
trace {
name = "non-synchronized"
context.launch {
launch {
val flow: Flow<Iterable<Pair<Double, Double>>> = sinCosFlow.windowed(30)
updateXYFrom(flow)
}
@ -119,7 +131,5 @@ fun startDemoDeviceServer(context: Context, device: DemoDevice): ApplicationEngi
}
}
}
}
return server
}

View File

@ -27,8 +27,16 @@ public interface MagixEndpoint<T> {
)
public companion object {
public const val DEFAULT_MAGIX_WS_PORT: Int = 7777
/**
* A default port for HTTP/WS connections
*/
public const val DEFAULT_MAGIX_HTTP_PORT: Int = 7777
/**
* A default port for raw TCP connections
*/
public const val DEFAULT_MAGIX_RAW_PORT: Int = 7778
public val magixJson: Json = Json {
ignoreUnknownKeys = true
encodeDefaults = false

View File

@ -16,10 +16,23 @@ public interface MagixClient<T> {
Flow.Publisher<MagixMessage<T>> subscribe();
/**
* Create a magix endpoint client using RSocket with raw tcp connection
* @param host host name of magix server event loop
* @param port port of magix server event loop
* @return the client
*/
static MagixClient<JsonElement> rSocketTcp(String host, int port) {
return ControlsMagixClient.Companion.rSocketTcp(host, port, JsonElement.Companion.serializer());
}
/**
*
* @param host host name of magix server event loop
* @param port port of magix server event loop
* @param path
* @return
*/
static MagixClient<JsonElement> rSocketWs(String host, int port, String path) {
return ControlsMagixClient.Companion.rSocketWs(host, port, JsonElement.Companion.serializer(), path);
}

View File

@ -6,11 +6,11 @@ import kotlinx.serialization.KSerializer
import ru.mipt.npm.magix.api.MagixEndpoint
import ru.mipt.npm.magix.api.MagixMessage
import ru.mipt.npm.magix.api.MagixMessageFilter
import ru.mipt.npm.magix.rsocket.RSocketMagixEndpoint
import ru.mipt.npm.magix.rsocket.withTcp
import ru.mipt.npm.magix.rsocket.rSocketWithTcp
import ru.mipt.npm.magix.rsocket.rSocketWithWebSockets
import java.util.concurrent.Flow
public class ControlsMagixClient<T>(
internal class ControlsMagixClient<T>(
private val endpoint: MagixEndpoint<T>,
private val filter: MagixMessageFilter,
) : MagixClient<T> {
@ -21,27 +21,27 @@ public class ControlsMagixClient<T>(
override fun subscribe(): Flow.Publisher<MagixMessage<T>> = endpoint.subscribe(filter).asPublisher()
public companion object {
companion object {
public fun <T> rSocketTcp(
fun <T> rSocketTcp(
host: String,
port: Int,
payloadSerializer: KSerializer<T>
): ControlsMagixClient<T> {
val endpoint = runBlocking {
RSocketMagixEndpoint.withTcp(host, port, payloadSerializer)
MagixEndpoint.rSocketWithTcp(host, payloadSerializer, port)
}
return ControlsMagixClient(endpoint, MagixMessageFilter())
}
public fun <T> rSocketWs(
fun <T> rSocketWs(
host: String,
port: Int,
payloadSerializer: KSerializer<T>,
path: String = "/rsocket"
): ControlsMagixClient<T> {
val endpoint = runBlocking {
RSocketMagixEndpoint.withWebSockets(host, port, payloadSerializer, path)
MagixEndpoint.rSocketWithWebSockets(host, payloadSerializer, port, path)
}
return ControlsMagixClient(endpoint, MagixMessageFilter())
}

View File

@ -43,24 +43,26 @@ public class RSocketMagixEndpoint<T>(
}
}
public companion object {
public companion object
}
internal fun buildConnector(rSocketConfig: RSocketConnectorBuilder.ConnectionConfigBuilder.() -> Unit) =
internal fun buildConnector(rSocketConfig: RSocketConnectorBuilder.ConnectionConfigBuilder.() -> Unit) =
RSocketConnector {
reconnectable(10)
connectionConfig(rSocketConfig)
}
/**
/**
* Build a websocket based endpoint connected to [host], [port] and given routing [path]
*/
public suspend fun <T> withWebSockets(
public suspend fun <T> MagixEndpoint.Companion.rSocketWithWebSockets(
host: String,
port: Int,
payloadSerializer: KSerializer<T>,
port: Int = DEFAULT_MAGIX_HTTP_PORT,
path: String = "/rsocket",
rSocketConfig: RSocketConnectorBuilder.ConnectionConfigBuilder.() -> Unit = {},
): RSocketMagixEndpoint<T> {
): RSocketMagixEndpoint<T> {
val client = HttpClient {
install(WebSockets)
install(RSocketSupport) {
@ -76,6 +78,4 @@ public class RSocketMagixEndpoint<T>(
}
return RSocketMagixEndpoint(coroutineContext, payloadSerializer, rSocket)
}
}
}

View File

@ -7,16 +7,17 @@ import io.rsocket.kotlin.core.RSocketConnectorBuilder
import io.rsocket.kotlin.transport.ktor.clientTransport
import kotlinx.coroutines.Dispatchers
import kotlinx.serialization.KSerializer
import ru.mipt.npm.magix.api.MagixEndpoint
import kotlin.coroutines.coroutineContext
/**
* Create a plain TCP based [RSocketMagixEndpoint] connected to [host] and [port]
*/
public suspend fun <T> RSocketMagixEndpoint.Companion.withTcp(
public suspend fun <T> MagixEndpoint.Companion.rSocketWithTcp(
host: String,
port: Int,
payloadSerializer: KSerializer<T>,
port: Int = DEFAULT_MAGIX_RAW_PORT,
tcpConfig: SocketOptions.TCPClientSocketOptions.() -> Unit = {},
rSocketConfig: RSocketConnectorBuilder.ConnectionConfigBuilder.() -> Unit = {},
): RSocketMagixEndpoint<T> {

View File

@ -4,6 +4,10 @@ plugins {
application
}
description = """
A magix event loop implementation in Kotlin. Includes HTTP/SSE and RSocket routes.
""".trimIndent()
kscience {
useSerialization{
json()

View File

@ -9,12 +9,28 @@ import io.rsocket.kotlin.core.RSocketServer
import io.rsocket.kotlin.transport.ktor.serverTransport
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.MutableSharedFlow
import ru.mipt.npm.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_HTTP_PORT
import ru.mipt.npm.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_RAW_PORT
import ru.mipt.npm.magix.api.MagixEndpoint.Companion.DEFAULT_MAGIX_WS_PORT
/**
*
*/
public fun CoroutineScope.rawMagixServerSocket(
magixFlow: MutableSharedFlow<GenericMagixMessage>,
rawSocketPort: Int = DEFAULT_MAGIX_RAW_PORT
): Job {
val tcpTransport = aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().serverTransport(port = rawSocketPort)
val rSocketJob = RSocketServer().bind(tcpTransport, magixAcceptor(magixFlow))
coroutineContext[Job]?.invokeOnCompletion{
rSocketJob.cancel()
}
return rSocketJob;
}
public fun CoroutineScope.startMagixServer(
port: Int = DEFAULT_MAGIX_WS_PORT,
port: Int = DEFAULT_MAGIX_HTTP_PORT,
rawSocketPort: Int = DEFAULT_MAGIX_RAW_PORT,
buffer: Int = 100,
): ApplicationEngine {
@ -24,8 +40,7 @@ public fun CoroutineScope.startMagixServer(
extraBufferCapacity = buffer
)
val tcpTransport = aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().serverTransport(port = rawSocketPort)
RSocketServer().bind(tcpTransport, magixAcceptor(magixFlow))
rawMagixServerSocket(magixFlow, rawSocketPort)
return embeddedServer(CIO, port = port) {
magixModule(magixFlow)

View File

@ -3,9 +3,9 @@ package ru.mipt.npm.magix.zmq
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.withContext
import kotlinx.serialization.KSerializer
import kotlinx.serialization.encodeToString
import org.zeromq.SocketType
import org.zeromq.ZContext
import org.zeromq.ZMQ
@ -13,6 +13,7 @@ import org.zeromq.ZMQException
import ru.mipt.npm.magix.api.MagixEndpoint
import ru.mipt.npm.magix.api.MagixMessage
import ru.mipt.npm.magix.api.MagixMessageFilter
import ru.mipt.npm.magix.api.filter
import kotlin.coroutines.CoroutineContext
public class ZmqMagixEndpoint<T>(
@ -28,7 +29,7 @@ public class ZmqMagixEndpoint<T>(
val socket = zmqContext.createSocket(SocketType.XSUB)
socket.bind(address)
val topic = MagixEndpoint.magixJson.encodeToString(filter)
val topic = "magix"//MagixEndpoint.magixJson.encodeToString(filter)
socket.subscribe(topic)
return channelFlow {
@ -39,6 +40,7 @@ public class ZmqMagixEndpoint<T>(
}
while (activeFlag) {
try {
//This is a blocking call.
val string = socket.recvStr()
val message = MagixEndpoint.magixJson.decodeFromString(serializer, string)
send(message)
@ -51,7 +53,7 @@ public class ZmqMagixEndpoint<T>(
}
}
}
}
}.filter(filter).flowOn(Dispatchers.IO) //should be flown on IO because of blocking calls
}
private val publishSocket = zmqContext.createSocket(SocketType.XPUB).apply {

View File

@ -4,14 +4,13 @@ package ru.mipt.npm.devices.pimotionmaster
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transformWhile
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import ru.mipt.npm.controls.api.DeviceHub
import ru.mipt.npm.controls.api.PropertyDescriptor
import ru.mipt.npm.controls.base.*
import ru.mipt.npm.controls.controllers.DeviceFactory
import ru.mipt.npm.controls.controllers.DeviceSpec
import ru.mipt.npm.controls.controllers.duration
import ru.mipt.npm.controls.ports.*
import space.kscience.dataforge.context.*
@ -343,7 +342,7 @@ class PiMotionMasterDevice(
}
}
companion object : DeviceFactory<PiMotionMasterDevice> {
companion object : DeviceSpec<PiMotionMasterDevice> {
override fun invoke(meta: Meta, context: Context): PiMotionMasterDevice = PiMotionMasterDevice(context)
}