diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/api/Socket.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/api/Socket.kt new file mode 100644 index 0000000..36c3d85 --- /dev/null +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/api/Socket.kt @@ -0,0 +1,34 @@ +package hep.dataforge.control.api + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.launch +import kotlinx.io.Closeable + +/** + * A generic bi-directional sender/receiver object + */ +public interface Socket : Closeable { + /** + * Send an object to the socket + */ + public suspend fun send(data: T) + + /** + * Flow of objects received from socket + */ + public fun receiving(): Flow + public fun isOpen(): Boolean +} + +/** + * Connect an input to this socket using designated [scope] for it and return a handler [Job]. + * Multiple inputs could be connected to the same [Socket]. + */ +public fun Socket.connectInput(scope: CoroutineScope, flow: Flow): Job = scope.launch { + flow.collect { send(it) } +} + + diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceController.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceController.kt index 99964cc..fc00df8 100644 --- a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceController.kt +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceController.kt @@ -41,12 +41,10 @@ public class DeviceController( if (value == null) return scope.launch { val change = DeviceMessage.ok { - this.source = deviceTarget - type = PROPERTY_CHANGED_ACTION - data { - name = propertyName - this.value = value - } + this.sourceName = deviceTarget + this.action = PROPERTY_CHANGED_ACTION + this.key = propertyName + this.value = value } val envelope = SimpleEnvelope(change.toMeta(), Binary.EMPTY) @@ -98,62 +96,52 @@ public class DeviceController( request: DeviceMessage, ): DeviceMessage { return try { - val result: List = when (val action = request.type) { - GET_PROPERTY_ACTION -> { - request.data.map { property -> - MessageData { - name = property.name - value = device.getProperty(name) - } - } - } - SET_PROPERTY_ACTION -> { - request.data.map { property -> - val propertyName: String = property.name - val propertyValue = property.value - if (propertyValue == null) { - device.invalidateProperty(propertyName) - } else { - device.setProperty(propertyName, propertyValue) - } - MessageData { - name = propertyName - value = device.getProperty(propertyName) - } - } - } - EXECUTE_ACTION -> { - request.data.map { payload -> - MessageData { - name = payload.name - value = device.execute(payload.name, payload.value) - } - } - } - PROPERTY_LIST_ACTION -> { - device.propertyDescriptors.map { descriptor -> - MessageData { - name = descriptor.name - value = MetaItem.NodeItem(descriptor.config) - } - } - } - ACTION_LIST_ACTION -> { - device.actionDescriptors.map { descriptor -> - MessageData { - name = descriptor.name - value = MetaItem.NodeItem(descriptor.config) - } - } - } - else -> { - error("Unrecognized action $action") - } - } DeviceMessage.ok { - target = request.source - source = deviceTarget - data = result + targetName = request.sourceName + sourceName = deviceTarget + action ="response.${request.action}" + val requestKey = request.key + val requestValue = request.value + + when (val action = request.action) { + GET_PROPERTY_ACTION -> { + key = requestKey + value = device.getProperty(requestKey ?: error("Key field is not defined in request")) + } + SET_PROPERTY_ACTION -> { + require(requestKey != null) { "Key field is not defined in request" } + if (requestValue == null) { + device.invalidateProperty(requestKey) + } else { + device.setProperty(requestKey, requestValue) + } + key = requestKey + value = device.getProperty(requestKey) + } + EXECUTE_ACTION -> { + require(requestKey != null) { "Key field is not defined in request" } + key = requestKey + value = device.execute(requestKey, requestValue) + + } + PROPERTY_LIST_ACTION -> { + value = Meta { + device.propertyDescriptors.map { descriptor -> + descriptor.name put descriptor.config + } + }.asMetaItem() + } + ACTION_LIST_ACTION -> { + value = Meta { + device.actionDescriptors.map { descriptor -> + descriptor.name put descriptor.config + } + }.asMetaItem() + } + else -> { + error("Unrecognized action $action") + } + } } } catch (ex: Exception) { DeviceMessage.fail(request, cause = ex) @@ -165,7 +153,7 @@ public class DeviceController( public suspend fun DeviceHub.respondMessage(request: DeviceMessage): DeviceMessage { return try { - val targetName = request.target?.toName() ?: Name.EMPTY + val targetName = request.targetName?.toName() ?: Name.EMPTY val device = this[targetName] ?: error("The device with name $targetName not found in $this") DeviceController.respondMessage(device, targetName.toString(), request) } catch (ex: Exception) { diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceMessage.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceMessage.kt index 6e26f05..c67a319 100644 --- a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceMessage.kt +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/DeviceMessage.kt @@ -1,6 +1,5 @@ package hep.dataforge.control.controllers -import hep.dataforge.control.controllers.DeviceController.Companion.GET_PROPERTY_ACTION import hep.dataforge.io.SimpleEnvelope import hep.dataforge.meta.* import hep.dataforge.names.Name @@ -11,28 +10,20 @@ import kotlinx.serialization.encoding.Decoder import kotlinx.serialization.encoding.Encoder public class DeviceMessage : Scheme() { - public var source: String? by string(key = SOURCE_KEY) - public var target: String? by string(key = TARGET_KEY) - public var type: String by string(default = GET_PROPERTY_ACTION, key = MESSAGE_TYPE_KEY) + public var action: String by string { error("Action not defined") } + public var status: String by string(default = RESPONSE_OK_STATUS) + public var sourceName: String? by string() + public var targetName: String? by string() public var comment: String? by string() - public var status: String by string(RESPONSE_OK_STATUS) - public var data: List - get() = config.getIndexed(MESSAGE_DATA_KEY).values.map { MessageData.wrap(it.node!!) } - set(value) { - config[MESSAGE_DATA_KEY] = value.map { it.config } - } - - /** - * Append a payload to this message according to the given scheme - */ - public fun append(spec: Specification, block: T.() -> Unit): T = - spec.invoke(block).also { config.append(MESSAGE_DATA_KEY, it) } + public var key: String? by string() + public var value: MetaItem<*>? by item() public companion object : SchemeSpec(::DeviceMessage), KSerializer { - public val SOURCE_KEY: Name = "source".asName() - public val TARGET_KEY: Name = "target".asName() - public val MESSAGE_TYPE_KEY: Name = "type".asName() - public val MESSAGE_DATA_KEY: Name = "data".asName() + public val SOURCE_KEY: Name = DeviceMessage::sourceName.name.asName() + public val TARGET_KEY: Name = DeviceMessage::targetName.name.asName() + public val MESSAGE_ACTION_KEY: Name = DeviceMessage::action.name.asName() + public val MESSAGE_KEY_KEY: Name = DeviceMessage::key.name.asName() + public val MESSAGE_VALUE_KEY: Name = DeviceMessage::value.name.asName() public const val RESPONSE_OK_STATUS: String = "response.OK" public const val RESPONSE_FAIL_STATUS: String = "response.FAIL" @@ -40,19 +31,19 @@ public class DeviceMessage : Scheme() { public inline fun ok( request: DeviceMessage? = null, - block: DeviceMessage.() -> Unit = {} + block: DeviceMessage.() -> Unit = {}, ): DeviceMessage = DeviceMessage { - target = request?.source + targetName = request?.sourceName }.apply(block) public inline fun fail( request: DeviceMessage? = null, cause: Throwable? = null, - block: DeviceMessage.() -> Unit = {} + block: DeviceMessage.() -> Unit = {}, ): DeviceMessage = DeviceMessage { - target = request?.source + targetName = request?.sourceName status = RESPONSE_FAIL_STATUS - if(cause!=null){ + if (cause != null) { configure { set("error.type", cause::class.simpleName) set("error.message", cause.message) @@ -76,16 +67,4 @@ public class DeviceMessage : Scheme() { } } -public class MessageData : Scheme() { - public var name: String by string { error("Property name could not be empty") } - public var value: MetaItem<*>? by item(key = DATA_VALUE_KEY) - - public companion object : SchemeSpec(::MessageData) { - public val DATA_VALUE_KEY: Name = "value".asName() - } -} - -@DFBuilder -public fun DeviceMessage.data(block: MessageData.() -> Unit): MessageData = append(MessageData, block) - public fun DeviceMessage.wrap(): SimpleEnvelope = SimpleEnvelope(this.config, null) diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/HubController.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/HubController.kt index e31d7ed..755562e 100644 --- a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/HubController.kt +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/HubController.kt @@ -21,7 +21,7 @@ import kotlinx.coroutines.launch @OptIn(DFExperimental::class) public class HubController( public val hub: DeviceHub, - public val scope: CoroutineScope + public val scope: CoroutineScope, ) : Consumer, Responder { private val messageOutbox = Channel(Channel.CONFLATED) @@ -45,12 +45,10 @@ public class HubController( if (value == null) return scope.launch { val change = DeviceMessage.ok { - source = name.toString() - type = DeviceMessage.PROPERTY_CHANGED_ACTION - data { - this.name = propertyName - this.value = value - } + sourceName = name.toString() + action = DeviceMessage.PROPERTY_CHANGED_ACTION + this.key = propertyName + this.value = value } messageOutbox.send(change) @@ -62,7 +60,7 @@ public class HubController( } public suspend fun respondMessage(message: DeviceMessage): DeviceMessage = try { - val targetName = message.target?.toName() ?: Name.EMPTY + val targetName = message.targetName?.toName() ?: Name.EMPTY val device = hub[targetName] ?: error("The device with name $targetName not found in $hub") DeviceController.respondMessage(device, targetName.toString(), message) } catch (ex: Exception) { diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/delegates.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/delegates.kt index 1472f27..ac5b5d6 100644 --- a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/delegates.kt +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/controllers/delegates.kt @@ -48,6 +48,9 @@ public fun DeviceProperty.int(): ReadWriteProperty = convert(MetaConv public fun ReadOnlyDeviceProperty.string(): ReadOnlyProperty = convert(MetaConverter.string) public fun DeviceProperty.string(): ReadWriteProperty = convert(MetaConverter.string) +public fun ReadOnlyDeviceProperty.boolean(): ReadOnlyProperty = convert(MetaConverter.boolean) +public fun DeviceProperty.boolean(): ReadWriteProperty = convert(MetaConverter.boolean) + //TODO to be moved to DF private object DurationConverter : MetaConverter { override fun itemToObject(item: MetaItem<*>): Duration = when (item) { diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/ports/Port.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/ports/Port.kt index e51f4e5..2887483 100644 --- a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/ports/Port.kt +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/ports/Port.kt @@ -3,22 +3,21 @@ package hep.dataforge.control.ports import hep.dataforge.context.Context import hep.dataforge.context.ContextAware import hep.dataforge.context.Factory +import hep.dataforge.control.api.Socket import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.receiveAsFlow -import kotlinx.io.Closeable import kotlin.coroutines.CoroutineContext -public interface Port: Closeable, ContextAware { - public suspend fun send(data: ByteArray) - public suspend fun receiving(): Flow - public fun isOpen(): Boolean -} +public interface Port : ContextAware, Socket public typealias PortFactory = Factory -public abstract class AbstractPort(override val context: Context, coroutineContext: CoroutineContext = context.coroutineContext) : Port { +public abstract class AbstractPort( + override val context: Context, + coroutineContext: CoroutineContext = context.coroutineContext, +) : Port { protected val scope: CoroutineScope = CoroutineScope(coroutineContext + SupervisorJob(coroutineContext[Job])) @@ -70,7 +69,7 @@ public abstract class AbstractPort(override val context: Context, coroutineConte * In order to form phrases some condition should used on top of it. * For example [delimitedIncoming] generates phrases with fixed delimiter. */ - override suspend fun receiving(): Flow { + override fun receiving(): Flow { return incoming.receiveAsFlow() } diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/ports/PortProxy.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/ports/PortProxy.kt index 91b474f..ee8c42d 100644 --- a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/ports/PortProxy.kt +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/ports/PortProxy.kt @@ -43,7 +43,7 @@ public class PortProxy(override val context: Context = Global, public val factor } @OptIn(ExperimentalCoroutinesApi::class) - override suspend fun receiving(): Flow = channelFlow { + override fun receiving(): Flow = channelFlow { while (isActive) { try { //recreate port and Flow on cancel diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/ports/phrases.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/ports/phrases.kt index b0d14fb..2f21757 100644 --- a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/ports/phrases.kt +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/ports/phrases.kt @@ -39,7 +39,7 @@ public fun Flow.withDelimiter(delimiter: ByteArray, expectedMessageSi * Transform byte fragments into utf-8 phrases using utf-8 delimiter */ public fun Flow.withDelimiter(delimiter: String, expectedMessageSize: Int = 32): Flow { - return withDelimiter(delimiter.encodeToByteArray()).map { it.decodeToString() } + return withDelimiter(delimiter.encodeToByteArray(),expectedMessageSize).map { it.decodeToString() } } /** diff --git a/dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/deviceWebServer.kt b/dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/deviceWebServer.kt index 1ddb1f2..f84de29 100644 --- a/dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/deviceWebServer.kt +++ b/dataforge-device-server/src/main/kotlin/hep/dataforge/control/server/deviceWebServer.kt @@ -8,7 +8,6 @@ import hep.dataforge.control.controllers.DeviceController.Companion.GET_PROPERTY import hep.dataforge.control.controllers.DeviceController.Companion.SET_PROPERTY_ACTION import hep.dataforge.control.controllers.DeviceManager import hep.dataforge.control.controllers.DeviceMessage -import hep.dataforge.control.controllers.data import hep.dataforge.control.controllers.respondMessage import hep.dataforge.meta.toJson import hep.dataforge.meta.toMeta @@ -47,7 +46,7 @@ import kotlinx.serialization.json.put public fun CoroutineScope.startDeviceServer( manager: DeviceManager, port: Int = 8111, - host: String = "localhost" + host: String = "localhost", ): ApplicationEngine { return this.embeddedServer(CIO, port, host) { @@ -80,7 +79,7 @@ public const val WEB_SERVER_TARGET: String = "@webServer" public fun Application.deviceModule( manager: DeviceManager, deviceNames: Collection = manager.devices.keys.map { it.toString() }, - route: String = "/" + route: String = "/", ) { // val controllers = deviceNames.associateWith { name -> // val device = manager.devices[name.toName()] @@ -115,7 +114,8 @@ public fun Application.deviceModule( +"Device server dashboard" } deviceNames.forEach { deviceName -> - val device = manager[deviceName] ?: error("The device with name $deviceName not found in $manager") + val device = + manager[deviceName] ?: error("The device with name $deviceName not found in $manager") div { id = deviceName h2 { +deviceName } @@ -203,12 +203,10 @@ public fun Application.deviceModule( val target: String by call.parameters val property: String by call.parameters val request = DeviceMessage { - type = GET_PROPERTY_ACTION - source = WEB_SERVER_TARGET - this.target = target - data { - name = property - } + action = GET_PROPERTY_ACTION + sourceName = WEB_SERVER_TARGET + this.targetName = target + key = property } val response = manager.respondMessage(request) @@ -221,13 +219,12 @@ public fun Application.deviceModule( val json = Json.parseToJsonElement(body) val request = DeviceMessage { - type = SET_PROPERTY_ACTION - source = WEB_SERVER_TARGET - this.target = target - data { - name = property - value = json.toMetaItem() - } + action = SET_PROPERTY_ACTION + sourceName = WEB_SERVER_TARGET + this.targetName = target + key = property + value = json.toMetaItem() + } val response = manager.respondMessage(request) diff --git a/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterDevice.kt b/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterDevice.kt index 9992d7a..c3f5ad9 100644 --- a/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterDevice.kt +++ b/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterDevice.kt @@ -4,6 +4,8 @@ import hep.dataforge.context.Context import hep.dataforge.control.api.DeviceHub import hep.dataforge.control.api.PropertyDescriptor import hep.dataforge.control.base.* +import hep.dataforge.control.controllers.boolean +import hep.dataforge.control.controllers.double import hep.dataforge.control.controllers.duration import hep.dataforge.control.ports.Port import hep.dataforge.control.ports.PortProxy @@ -12,15 +14,12 @@ import hep.dataforge.control.ports.withDelimiter import hep.dataforge.meta.MetaItem import hep.dataforge.names.NameToken import hep.dataforge.values.Null -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Job -import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.* import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.takeWhile import kotlinx.coroutines.flow.toList import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock -import kotlinx.coroutines.withTimeout import kotlin.time.Duration @@ -48,7 +47,7 @@ public class PiMotionMasterDevice( private val mutex = Mutex() - private suspend fun dispatchError(errorCode: Int){ + private suspend fun dispatchError(errorCode: Int) { } @@ -62,7 +61,7 @@ public class PiMotionMasterDevice( connector.send(stringToSend) } - public suspend fun getErrorCode(): Int = mutex.withLock{ + public suspend fun getErrorCode(): Int = mutex.withLock { withTimeout(timeoutValue) { sendCommandInternal("ERR?") val errorString = connector.receiving().withDelimiter("\n").first() @@ -81,7 +80,7 @@ public class PiMotionMasterDevice( val phrases = connector.receiving().withDelimiter("\n") phrases.takeWhile { it.endsWith(" \n") }.toList() + phrases.first() } - } catch (ex: Throwable){ + } catch (ex: Throwable) { logger.warn { "Error during PIMotionMaster request. Requesting error code." } val errorCode = getErrorCode() dispatchError(errorCode) @@ -185,10 +184,25 @@ public class PiMotionMasterDevice( } ) + public val reference: ReadOnlyDeviceProperty by readingBoolean( + descriptorBuilder = { + info = "Get Referencing Result" + }, + getter = { + readAxisBoolean("FRF?") + } + ) + + val moveToReference by acting { + send("FRF", axisId) + } + public val position: DeviceProperty by axisNumberProperty("POS") { info = "The current axis position." } + var positionValue by position.double() + public val openLoopTarget: DeviceProperty by axisNumberProperty("OMA") { info = "Position for open-loop operation." } @@ -197,12 +211,18 @@ public class PiMotionMasterDevice( info = "Servo closed loop mode" } + var closedLoopValue by closedLoop.boolean() + public val velocity: DeviceProperty by axisNumberProperty("VEL") { info = "Velocity value for closed-loop operation" } - } override val devices: Map = axes.associate { NameToken(it) to Axis(it) } + /** + * + */ + val axes: Map get() = devices.mapKeys { it.toString() } + } \ No newline at end of file diff --git a/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterVirtualDevice.kt b/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterVirtualDevice.kt index 695009e..6cd10d4 100644 --- a/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterVirtualDevice.kt +++ b/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/PiMotionMasterVirtualDevice.kt @@ -1,51 +1,60 @@ package ru.mipt.npm.devices.pimotionmaster import hep.dataforge.context.Context +import hep.dataforge.control.api.Socket import hep.dataforge.control.ports.AbstractPort -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Job +import hep.dataforge.control.ports.withDelimiter +import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.collect -import kotlinx.coroutines.flow.receiveAsFlow -import kotlinx.coroutines.launch +import kotlinx.coroutines.flow.* import kotlin.math.abs import kotlin.time.Duration -public abstract class VirtualDevice { - protected abstract val scope: CoroutineScope +abstract class VirtualDevice(val scope: CoroutineScope) : Socket { - public abstract suspend fun evaluateRequest(request: ByteArray) + protected abstract suspend fun evaluateRequest(request: ByteArray) - private val toSend = Channel(100) + protected open fun Flow.transformRequests(): Flow = this - public val responses: Flow get() = toSend.receiveAsFlow() + private val toReceive = Channel(100) + private val toRespond = Channel(100) - protected suspend fun send(response: ByteArray) { - toSend.send(response) + private val receiveJob: Job = toReceive.consumeAsFlow().onEach { + evaluateRequest(it) + }.catch { + it.printStackTrace() + }.launchIn(scope) + + + override suspend fun send(data: ByteArray) { + toReceive.send(data) } -// -// protected suspend fun respond(response: String){ -// respond(response.encodeToByteArray()) -// } + + protected suspend fun respond(response: ByteArray) { + toRespond.send(response) + } + + override fun receiving(): Flow = toRespond.receiveAsFlow() protected fun respondInFuture(delay: Duration, response: suspend () -> ByteArray): Job = scope.launch { delay(delay) - send(response()) + respond(response()) } + + override fun isOpen(): Boolean = scope.isActive + + override fun close() = scope.cancel() } -public class VirtualPort(private val device: VirtualDevice, context: Context) : AbstractPort(context) { +class VirtualPort(private val device: VirtualDevice, context: Context) : AbstractPort(context) { + + private val respondJob = device.receiving().onEach(::receive).catch { + it.printStackTrace() + }.launchIn(scope) - private val respondJob = scope.launch { - device.responses.collect { - receive(it) - } - } override suspend fun write(data: ByteArray) { - device.evaluateRequest(data) + device.send(data) } override fun close() { @@ -55,13 +64,13 @@ public class VirtualPort(private val device: VirtualDevice, context: Context) : } -class PiMotionMasterVirtualDevice(override val scope: CoroutineScope, axisIds: List) : VirtualDevice() { +class PiMotionMasterVirtualDevice(scope: CoroutineScope, axisIds: List) : VirtualDevice(scope) { init { //add asynchronous send logic here } - private val axisID = "0" + override fun Flow.transformRequests(): Flow = withDelimiter("\n".toByteArray()) private var errorCode: Int = 0 @@ -116,7 +125,7 @@ class PiMotionMasterVirtualDevice(override val scope: CoroutineScope, axisIds: L private fun respond(str: String) = scope.launch { - send((str + "\n").encodeToByteArray()) + respond((str + "\n").encodeToByteArray()) } private fun respondForAllAxis(axisIds: List, extract: VirtualAxisState.(index: String) -> Any) { diff --git a/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/piDebugServer.kt b/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/piDebugServer.kt index 4d4306d..abf5206 100644 --- a/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/piDebugServer.kt +++ b/motors/src/main/kotlin/ru/mipt/npm/devices/pimotionmaster/piDebugServer.kt @@ -28,21 +28,23 @@ fun CoroutineScope.launchPiDebugServer(port: Int, virtualPort: Port): Job = laun server.accept() } catch (ex: Exception) { server.close() + ex.printStackTrace() return@launch } - launch { - println("Socket accepted: ${socket.remoteAddress}") - try { + println("Socket accepted: ${socket.remoteAddress}") + supervisorScope { + socket.use { socket -> val input = socket.openReadChannel() val output = socket.openWriteChannel(autoFlush = true) val buffer = ByteBuffer.allocate(1024) launch { virtualPort.receiving().collect { - println("Sending: ${it.decodeToString()}") + //println("Sending: ${it.decodeToString()}") output.writeAvailable(it) + output.flush() } } while (isActive) { @@ -51,14 +53,10 @@ fun CoroutineScope.launchPiDebugServer(port: Int, virtualPort: Port): Job = laun if (read > 0) { buffer.flip() val array = buffer.moveToByteArray() - println("Received: ${array.decodeToString()}") + //println("Received: ${array.decodeToString()}") virtualPort.send(array) } } - } catch (ex: Exception) { - cancel() - } finally { - socket.close() } } } @@ -66,7 +64,7 @@ fun CoroutineScope.launchPiDebugServer(port: Int, virtualPort: Port): Job = laun fun main() { val port = 10024 - val virtualDevice = PiMotionMasterVirtualDevice(Global, listOf("1","2")) + val virtualDevice = PiMotionMasterVirtualDevice(Global, listOf("1", "2")) val virtualPort = VirtualPort(virtualDevice, Global) runBlocking(Dispatchers.Default) { val serverJob = launchPiDebugServer(port, virtualPort)