diff --git a/dataforge-device-core/build.gradle.kts b/dataforge-device-core/build.gradle.kts index 6ad2390..58f616c 100644 --- a/dataforge-device-core/build.gradle.kts +++ b/dataforge-device-core/build.gradle.kts @@ -8,7 +8,9 @@ val ktorVersion: String by rootProject.extra kscience { useCoroutines() - useSerialization() + useSerialization{ + json() + } } kotlin { diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceController.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceController.kt index 6a80602..5b670cd 100644 --- a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceController.kt +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceController.kt @@ -37,19 +37,19 @@ public class DeviceController( override fun propertyChanged(propertyName: String, value: MetaItem<*>?) { if (value == null) return scope.launch { - val change = DeviceMessage.ok { - this.sourceName = deviceTarget - this.action = PROPERTY_CHANGED_ACTION - this.key = propertyName - this.value = value - } + val change = DeviceMessage( + sourceName = deviceTarget, + action = PROPERTY_CHANGED_ACTION, + key = propertyName, + value = value, + ) val envelope = SimpleEnvelope(change.toMeta(), Binary.EMPTY) outputChannel.send(envelope) } } - public fun recieving(): Flow = outputChannel.consumeAsFlow() + public fun receiving(): Flow = outputChannel.consumeAsFlow() @DFExperimental override fun consume(message: Envelope) { @@ -70,7 +70,7 @@ public class DeviceController( val target = request.meta["target"].string return try { if (request.data == null) { - respondMessage(device, deviceTarget, DeviceMessage.wrap(request.meta)).wrap() + respondMessage(device, deviceTarget, DeviceMessage.fromMeta(request.meta)).toEnvelope() } else if (target != null && target != deviceTarget) { error("Wrong target name $deviceTarget expected but $target found") } else { @@ -85,7 +85,7 @@ public class DeviceController( } else error("Device does not support binary response") } } catch (ex: Exception) { - DeviceMessage.fail(cause = ex).wrap() + DeviceMessage.fail(ex).toEnvelope() } } @@ -93,58 +93,59 @@ public class DeviceController( device: Device, deviceTarget: String, request: DeviceMessage, - ): DeviceMessage { - return try { - DeviceMessage.ok { - targetName = request.sourceName - sourceName = deviceTarget - action = "response.${request.action}" - val requestKey = request.key - val requestValue = request.value - - when (val action = request.action) { - GET_PROPERTY_ACTION -> { - key = requestKey - value = device.getProperty(requestKey ?: error("Key field is not defined in request")) - } - SET_PROPERTY_ACTION -> { - require(requestKey != null) { "Key field is not defined in request" } - if (requestValue == null) { - device.invalidateProperty(requestKey) - } else { - device.setProperty(requestKey, requestValue) - } - key = requestKey - value = device.getProperty(requestKey) - } - EXECUTE_ACTION -> { - require(requestKey != null) { "Key field is not defined in request" } - key = requestKey - value = device.execute(requestKey, requestValue) - - } - PROPERTY_LIST_ACTION -> { - value = Meta { - device.propertyDescriptors.map { descriptor -> - descriptor.name put descriptor.config - } - }.asMetaItem() - } - ACTION_LIST_ACTION -> { - value = Meta { - device.actionDescriptors.map { descriptor -> - descriptor.name put descriptor.config - } - }.asMetaItem() - } - else -> { - error("Unrecognized action $action") - } - } + ): DeviceMessage = try { + val requestKey = request.key + val requestValue = request.value + var key: String? = null + var value: MetaItem<*>? = null + when (val action = request.action) { + GET_PROPERTY_ACTION -> { + key = requestKey + value = device.getProperty(requestKey ?: error("Key field is not defined in request")) + } + SET_PROPERTY_ACTION -> { + require(requestKey != null) { "Key field is not defined in request" } + if (requestValue == null) { + device.invalidateProperty(requestKey) + } else { + device.setProperty(requestKey, requestValue) + } + key = requestKey + value = device.getProperty(requestKey) + } + EXECUTE_ACTION -> { + require(requestKey != null) { "Key field is not defined in request" } + key = requestKey + value = device.execute(requestKey, requestValue) + + } + PROPERTY_LIST_ACTION -> { + value = Meta { + device.propertyDescriptors.map { descriptor -> + descriptor.name put descriptor.config + } + }.asMetaItem() + } + ACTION_LIST_ACTION -> { + value = Meta { + device.actionDescriptors.map { descriptor -> + descriptor.name put descriptor.config + } + }.asMetaItem() + } + else -> { + error("Unrecognized action $action") } - } catch (ex: Exception) { - DeviceMessage.fail(request, cause = ex) } + DeviceMessage( + targetName = request.sourceName, + sourceName = deviceTarget, + action = "response.${request.action}", + key = key, + value = value + ) + } catch (ex: Exception) { + DeviceMessage.fail(ex, request.action).respondsTo(request) } } } @@ -156,6 +157,6 @@ public suspend fun DeviceHub.respondMessage(request: DeviceMessage): DeviceMessa val device = this[targetName] ?: error("The device with name $targetName not found in $this") DeviceController.respondMessage(device, targetName.toString(), request) } catch (ex: Exception) { - DeviceMessage.fail(request, cause = ex) + DeviceMessage.fail(ex, request.action).respondsTo(request) } } diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceMessage.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceMessage.kt index 082b904..1af7bcd 100644 --- a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceMessage.kt +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceMessage.kt @@ -4,21 +4,22 @@ import hep.dataforge.io.SimpleEnvelope import hep.dataforge.meta.* import hep.dataforge.names.Name import hep.dataforge.names.asName -import kotlinx.serialization.KSerializer -import kotlinx.serialization.descriptors.SerialDescriptor -import kotlinx.serialization.encoding.Decoder -import kotlinx.serialization.encoding.Encoder +import kotlinx.serialization.Serializable +import kotlinx.serialization.json.Json +import kotlinx.serialization.json.decodeFromJsonElement +import kotlinx.serialization.json.encodeToJsonElement -public class DeviceMessage : Scheme() { - public var action: String by string { error("Action not defined") } - public var status: String by string(default = OK_STATUS) - public var sourceName: String? by string() - public var targetName: String? by string() - public var comment: String? by string() - public var key: String? by string() - public var value: MetaItem<*>? by item() - - public companion object : SchemeSpec(::DeviceMessage), KSerializer { +@Serializable +public data class DeviceMessage( + public val action: String, + public val status: String = OK_STATUS, + public val sourceName: String? = null, + public val targetName: String? = null, + public val comment: String? = null, + public val key: String? = null, + public val value: MetaItem<*>? = null, +) { + public companion object { public val SOURCE_KEY: Name = DeviceMessage::sourceName.name.asName() public val TARGET_KEY: Name = DeviceMessage::targetName.name.asName() public val MESSAGE_ACTION_KEY: Name = DeviceMessage::action.name.asName() @@ -29,42 +30,32 @@ public class DeviceMessage : Scheme() { public const val FAIL_STATUS: String = "FAIL" public const val PROPERTY_CHANGED_ACTION: String = "event.propertyChanged" - public inline fun ok( - request: DeviceMessage? = null, - block: DeviceMessage.() -> Unit = {}, - ): DeviceMessage = DeviceMessage { - targetName = request?.sourceName - }.apply(block) - - public inline fun fail( - request: DeviceMessage? = null, - cause: Throwable? = null, - block: DeviceMessage.() -> Unit = {}, - ): DeviceMessage = DeviceMessage { - targetName = request?.sourceName - status = FAIL_STATUS - if (cause != null) { - configure { - set("error.type", cause::class.simpleName) - set("error.message", cause.message) - //set("error.trace", ex.stackTraceToString()) - } - comment = cause.message - } - }.apply(block) - - - override val descriptor: SerialDescriptor = MetaSerializer.descriptor - - override fun deserialize(decoder: Decoder): DeviceMessage { - val meta = MetaSerializer.deserialize(decoder) - return wrap(meta) + private fun Throwable.toMeta(): Meta = Meta { + "type" put this::class.simpleName + "message" put message + "trace" put stackTraceToString() } - override fun serialize(encoder: Encoder, value: DeviceMessage) { - MetaSerializer.serialize(encoder, value.toMeta()) - } + public fun fail( + cause: Throwable, + action: String = "undefined", + ): DeviceMessage = DeviceMessage( + action = action, + status = FAIL_STATUS, + value = cause.toMeta().asMetaItem() + ) + + public fun fromMeta(meta: Meta): DeviceMessage = Json.decodeFromJsonElement(meta.toJson()) } } -public fun DeviceMessage.wrap(): SimpleEnvelope = SimpleEnvelope(this.config, null) + +public fun DeviceMessage.ok(): DeviceMessage = + copy(status = DeviceMessage.OK_STATUS) + +public fun DeviceMessage.respondsTo(request: DeviceMessage): DeviceMessage = + copy(sourceName = request.targetName, targetName = request.sourceName) + +public fun DeviceMessage.toMeta(): JsonMeta = Json.encodeToJsonElement(this).toMetaItem().node!! + +public fun DeviceMessage.toEnvelope(): SimpleEnvelope = SimpleEnvelope(toMeta(), null) diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/HubController.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/HubController.kt index 755562e..c61d43a 100644 --- a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/HubController.kt +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/HubController.kt @@ -6,7 +6,10 @@ import hep.dataforge.control.api.get import hep.dataforge.io.Consumer import hep.dataforge.io.Envelope import hep.dataforge.io.Responder -import hep.dataforge.meta.* +import hep.dataforge.meta.DFExperimental +import hep.dataforge.meta.MetaItem +import hep.dataforge.meta.get +import hep.dataforge.meta.string import hep.dataforge.names.Name import hep.dataforge.names.NameToken import hep.dataforge.names.toName @@ -35,7 +38,7 @@ public class HubController( private val packJob = scope.launch { while (isActive) { val message = messageOutbox.receive() - envelopeOutbox.send(message.wrap()) + envelopeOutbox.send(message.toEnvelope()) } } @@ -44,12 +47,12 @@ public class HubController( override fun propertyChanged(propertyName: String, value: MetaItem<*>?) { if (value == null) return scope.launch { - val change = DeviceMessage.ok { - sourceName = name.toString() - action = DeviceMessage.PROPERTY_CHANGED_ACTION - this.key = propertyName - this.value = value - } + val change = DeviceMessage( + sourceName = name.toString(), + action = DeviceMessage.PROPERTY_CHANGED_ACTION, + key = propertyName, + value = value + ) messageOutbox.send(change) } @@ -64,23 +67,19 @@ public class HubController( val device = hub[targetName] ?: error("The device with name $targetName not found in $hub") DeviceController.respondMessage(device, targetName.toString(), message) } catch (ex: Exception) { - DeviceMessage.fail { - comment = ex.message - } + DeviceMessage.fail(ex, message.action).respondsTo(message) } override suspend fun respond(request: Envelope): Envelope = try { val targetName = request.meta[DeviceMessage.TARGET_KEY].string?.toName() ?: Name.EMPTY val device = hub[targetName] ?: error("The device with name $targetName not found in $hub") if (request.data == null) { - DeviceController.respondMessage(device, targetName.toString(), DeviceMessage.wrap(request.meta)).wrap() + DeviceController.respondMessage(device, targetName.toString(), DeviceMessage.fromMeta(request.meta)).toEnvelope() } else { DeviceController.respond(device, targetName.toString(), request) } } catch (ex: Exception) { - DeviceMessage.fail { - comment = ex.message - }.wrap() + DeviceMessage.fail(ex).toEnvelope() } override fun consume(message: Envelope) { diff --git a/dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/conversions.kt b/dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/conversions.kt index 552aa0a..2b86058 100644 --- a/dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/conversions.kt +++ b/dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/conversions.kt @@ -1,6 +1,7 @@ package hep.dataforge.control.server import hep.dataforge.control.controllers.DeviceMessage +import hep.dataforge.control.controllers.toMeta import hep.dataforge.io.* import hep.dataforge.meta.MetaSerializer import io.ktor.application.ApplicationCall @@ -34,10 +35,10 @@ public suspend fun ApplicationCall.respondMessage(message: DeviceMessage) { respondText(Json.encodeToString(MetaSerializer, message.toMeta()), contentType = ContentType.Application.Json) } -public suspend fun ApplicationCall.respondMessage(builder: DeviceMessage.() -> Unit) { - respondMessage(DeviceMessage(builder)) -} - -public suspend fun ApplicationCall.respondFail(builder: DeviceMessage.() -> Unit) { - respondMessage(DeviceMessage.fail(null, block = builder)) -} \ No newline at end of file +//public suspend fun ApplicationCall.respondMessage(builder: DeviceMessage.() -> Unit) { +// respondMessage(DeviceMessage(builder)) +//} +// +//public suspend fun ApplicationCall.respondFail(builder: DeviceMessage.() -> Unit) { +// respondMessage(DeviceMessage.fail(null, block = builder)) +//} \ No newline at end of file diff --git a/dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/deviceWebServer.kt b/dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/deviceWebServer.kt index f84de29..f9561a1 100644 --- a/dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/deviceWebServer.kt +++ b/dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/deviceWebServer.kt @@ -12,7 +12,6 @@ import hep.dataforge.control.controllers.respondMessage import hep.dataforge.meta.toJson import hep.dataforge.meta.toMeta import hep.dataforge.meta.toMetaItem -import hep.dataforge.meta.wrap import io.ktor.application.* import io.ktor.features.CORS import io.ktor.features.StatusPages @@ -189,7 +188,7 @@ public fun Application.deviceModule( ?: throw IllegalArgumentException("The body is not a json object") val meta = json.toMeta() - val request = DeviceMessage.wrap(meta) + val request = DeviceMessage.fromMeta(meta) val response = manager.respondMessage(request) call.respondMessage(response) @@ -202,12 +201,12 @@ public fun Application.deviceModule( get("get") { val target: String by call.parameters val property: String by call.parameters - val request = DeviceMessage { - action = GET_PROPERTY_ACTION - sourceName = WEB_SERVER_TARGET - this.targetName = target - key = property - } + val request = DeviceMessage( + action = GET_PROPERTY_ACTION, + sourceName = WEB_SERVER_TARGET, + targetName = target, + key = property, + ) val response = manager.respondMessage(request) call.respondMessage(response) @@ -218,14 +217,13 @@ public fun Application.deviceModule( val body = call.receiveText() val json = Json.parseToJsonElement(body) - val request = DeviceMessage { - action = SET_PROPERTY_ACTION - sourceName = WEB_SERVER_TARGET - this.targetName = target - key = property + val request = DeviceMessage( + action = SET_PROPERTY_ACTION, + sourceName = WEB_SERVER_TARGET, + targetName = target, + key = property, value = json.toMetaItem() - - } + ) val response = manager.respondMessage(request) call.respondMessage(response) diff --git a/dataforge-magix-client/build.gradle.kts b/dataforge-magix-client/build.gradle.kts index 1518204..beefe3a 100644 --- a/dataforge-magix-client/build.gradle.kts +++ b/dataforge-magix-client/build.gradle.kts @@ -5,28 +5,12 @@ plugins { val ktorVersion: String by rootProject.extra -repositories{ - maven("https://maven.pkg.github.com/altavir/ktor-client-sse") -} - kotlin { sourceSets { commonMain { dependencies { + implementation(project(":magix:magix-service")) implementation(project(":dataforge-device-core")) - implementation(project(":dataforge-device-tcp")) - implementation("io.ktor:ktor-client-core:$ktorVersion") - implementation("ru.mipt.npm:ktor-client-sse:0.1.0") - } - } - jvmMain { - dependencies { - - } - } - jsMain { - dependencies { - } } } diff --git a/dataforge-magix-client/src/commonMain/kotlin/hep/dataforge/control/client/MagixClient.kt b/dataforge-magix-client/src/commonMain/kotlin/hep/dataforge/control/client/MagixClient.kt deleted file mode 100644 index 18ecf36..0000000 --- a/dataforge-magix-client/src/commonMain/kotlin/hep/dataforge/control/client/MagixClient.kt +++ /dev/null @@ -1,104 +0,0 @@ -package hep.dataforge.control.client - -import hep.dataforge.control.controllers.DeviceManager -import hep.dataforge.control.controllers.DeviceMessage -import hep.dataforge.control.controllers.respondMessage -import hep.dataforge.meta.toJson -import hep.dataforge.meta.toMeta -import hep.dataforge.meta.wrap -import io.ktor.client.HttpClient -import io.ktor.client.request.post -import io.ktor.http.ContentType -import io.ktor.http.Url -import io.ktor.http.contentType -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Job -import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.launch -import kotlinx.serialization.json.* -import ru.mipt.npm.ktor.sse.readSse -import kotlin.coroutines.CoroutineContext - - -/* -{ - "id":"string|number[optional, but desired]", - "parentId": "string|number[optional]", - "target":"string[optional]", - "origin":"string[required]", - "user":"string[optional]", - "action":"string[optional, default='heartbeat']", - "payload":"object[optional]" -} - */ - -/** - * Communicate with server in [Magix format](https://github.com/waltz-controls/rfc/tree/master/1) - */ -public class MagixClient( - private val manager: DeviceManager, - private val postUrl: Url, - private val sseUrl: Url, - //private val inbox: Flow -) : CoroutineScope { - - override val coroutineContext: CoroutineContext = - manager.context.coroutineContext + Job(manager.context.coroutineContext[Job]) - - private val client = HttpClient() - - protected fun generateId(message: DeviceMessage, requestId: String?): String = if (requestId != null) { - "$requestId.response" - } else { - "df[${message.hashCode()}" - } - - private fun send(json: JsonObject) { - launch { - client.post(postUrl) { - this.contentType(ContentType.Application.Json) - body = json.toString() - } - } - } - - private fun wrapMessage(message: DeviceMessage, requestId: String? = null): JsonObject = buildJsonObject { - put("id", generateId(message, requestId)) - if (requestId != null) { - put("parentId", requestId) - } - put("target", "magix") - put("origin", "df") - put("payload", message.config.toJson()) - } - - - private val listenJob = launch { - manager.controller.messageOutput().collect { message -> - val json = wrapMessage(message) - send(json) - } - } - - private val respondJob = launch { - client.readSse(sseUrl.toString()) { - val json = Json.parseToJsonElement(it.data) as JsonObject - - val requestId = json["id"]?.jsonPrimitive?.content - val payload = json["payload"]?.jsonObject - //TODO analyze action - - if (payload != null) { - val meta = payload.toMeta() - val request = DeviceMessage.wrap(meta) - val response = manager.respondMessage(request) - send(wrapMessage(response, requestId)) - } else { - TODO("process heartbeat and other system messages") - } - } - } -} - - - diff --git a/dataforge-magix-client/src/commonMain/kotlin/hep/dataforge/control/client/TangoPayload.kt b/dataforge-magix-client/src/commonMain/kotlin/hep/dataforge/control/client/TangoPayload.kt new file mode 100644 index 0000000..5baed0c --- /dev/null +++ b/dataforge-magix-client/src/commonMain/kotlin/hep/dataforge/control/client/TangoPayload.kt @@ -0,0 +1,14 @@ +package hep.dataforge.control.client + +public data class TangoPayload( + val host: String, + val device: String, + val name: String, + val value: Any? = null, + val timestamp: Long? = null, + val quality: String = "VALID", + val event: String? = null, + val input: Any? = null, + val output: Any? = null, + val errors: Iterable?, +) \ No newline at end of file diff --git a/dataforge-magix-client/src/commonMain/kotlin/hep/dataforge/control/client/magixClient.kt b/dataforge-magix-client/src/commonMain/kotlin/hep/dataforge/control/client/magixClient.kt new file mode 100644 index 0000000..8815c5c --- /dev/null +++ b/dataforge-magix-client/src/commonMain/kotlin/hep/dataforge/control/client/magixClient.kt @@ -0,0 +1,54 @@ +package hep.dataforge.control.client + +import hep.dataforge.control.controllers.DeviceManager +import hep.dataforge.control.controllers.DeviceMessage +import hep.dataforge.control.controllers.respondMessage +import hep.dataforge.magix.api.MagixEndpoint +import hep.dataforge.magix.api.MagixMessage +import kotlinx.coroutines.Job +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.launch + + +public const val DATAFORGE_FORMAT: String = "dataforge" + +private fun generateId(request: MagixMessage): String = if (request.id != null) { + "${request.id}.response" +} else { + "df[${request.payload.hashCode()}" +} + +/** + * Communicate with server in [Magix format](https://github.com/waltz-controls/rfc/tree/master/1) + */ +public fun DeviceManager.launchMagixClient( + endpoint: MagixEndpoint, + endpointID: String = "dataforge", +): Job = context.launch { + endpoint.subscribe(DeviceMessage.serializer()).onEach { request -> + //TODO analyze action + + val responsePayload = respondMessage(request.payload) + val response = MagixMessage( + format = DATAFORGE_FORMAT, + id = generateId(request), + parentId = request.id, + origin = endpointID, + payload = responsePayload + ) + endpoint.broadcast(DeviceMessage.serializer(), response) + }.launchIn(endpoint.scope) + + controller.messageOutput().onEach { payload -> + MagixMessage( + format = DATAFORGE_FORMAT, + id = "df[${payload.hashCode()}]", + origin = endpointID, + payload = payload + ) + }.launchIn(endpoint.scope) +} + + + diff --git a/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixEndpoint.kt b/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixEndpoint.kt index 16af886..42aa8a3 100644 --- a/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixEndpoint.kt +++ b/magix/magix-api/src/commonMain/kotlin/hep/dataforge/magix/api/MagixEndpoint.kt @@ -15,7 +15,7 @@ public interface MagixEndpoint { /** * Subscribe to a [Flow] of messages using specific [payloadSerializer] */ - public suspend fun subscribe( + public fun subscribe( payloadSerializer: KSerializer, filter: MagixMessageFilter = MagixMessageFilter.ALL, ): Flow> @@ -36,7 +36,7 @@ public interface MagixEndpoint { } } -public suspend fun MagixEndpoint.subscribe( +public fun MagixEndpoint.subscribe( filter: MagixMessageFilter = MagixMessageFilter.ALL, ): Flow> = subscribe(JsonElement.serializer()) diff --git a/magix/magix-service/src/commonMain/kotlin/hep/dataforge/magix/service/RSocketMagixEndpoint.kt b/magix/magix-service/src/commonMain/kotlin/hep/dataforge/magix/service/RSocketMagixEndpoint.kt index f7cafdb..05b7cbf 100644 --- a/magix/magix-service/src/commonMain/kotlin/hep/dataforge/magix/service/RSocketMagixEndpoint.kt +++ b/magix/magix-service/src/commonMain/kotlin/hep/dataforge/magix/service/RSocketMagixEndpoint.kt @@ -24,7 +24,7 @@ public class RSocketMagixEndpoint( public val rSocket: RSocket, ) : MagixEndpoint { - override suspend fun subscribe( + override fun subscribe( payloadSerializer: KSerializer, filter: MagixMessageFilter, ): Flow> { diff --git a/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterDevice.kt b/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterDevice.kt index 733398d..ed4a53f 100644 --- a/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterDevice.kt +++ b/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterDevice.kt @@ -142,6 +142,7 @@ class PiMotionMasterDevice( /** * Send a synchronous request and receive a list of lines as a response */ + @OptIn(ExperimentalCoroutinesApi::class) private suspend fun request(command: String, vararg arguments: String): List = mutex.withLock { try { withTimeout(timeoutValue) {