From c12f1ce1cdd7f1423fd33e6037447bd9c8285d51 Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Sat, 3 Aug 2024 21:11:59 +0300 Subject: [PATCH] Add lifecycle to ports. Suspended device start --- CHANGELOG.md | 3 + .../controls/constructor/DeviceGroup.kt | 6 +- .../controls/constructor/DeviceGroupTest.kt | 4 +- .../controls/api/AsynchronousSocket.kt | 12 +--- .../space/kscience/controls/api/Device.kt | 47 ++++----------- .../kscience/controls/api/DeviceMessage.kt | 2 +- .../kscience/controls/api/WithLifeCycle.kt | 59 +++++++++++++++++++ .../controls/ports/AsynchronousPort.kt | 11 ++-- .../controls/ports/SynchronousPort.kt | 18 +++--- .../kscience/controls/spec/DeviceBase.kt | 13 ++-- .../kscience/controls/spec/DeviceSpec.kt | 2 +- .../kscience/controls/ports/ChannelPort.kt | 18 +++--- .../kscience/controls/ports/UdpSocketPort.kt | 9 +-- .../controls/ports/AsynchronousPortIOTest.kt | 8 +-- .../kscience/controls/client/DeviceClient.kt | 4 +- .../controls/pi/AsynchronousPiPort.kt | 10 ++-- .../kscience/controls/pi/SynchronousPiPort.kt | 15 +++-- .../kscience/controls/ports/KtorTcpPort.kt | 16 ++--- .../kscience/controls/ports/KtorUdpPort.kt | 16 ++--- .../controls/serial/AsynchronousSerialPort.kt | 13 ++-- .../controls/serial/SynchronousSerialPort.kt | 16 ++--- .../sciprog/devices/mks/MksPdr900Device.kt | 4 +- .../pimotionmaster/PiMotionMasterDevice.kt | 4 +- .../PiMotionMasterVirtualDevice.kt | 17 +++--- 24 files changed, 185 insertions(+), 142 deletions(-) create mode 100644 controls-core/src/commonMain/kotlin/space/kscience/controls/api/WithLifeCycle.kt diff --git a/CHANGELOG.md b/CHANGELOG.md index 29b1a5c..26cb25f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ - `DeviceClient` properly evaluates lifecycle and logs - `PeerConnection` API for direct device-device binary sharing - DeviceDrawable2D intermediate visualization implementation +- New interface `WithLifeCycle`. Change Port API to adhere to it. ### Changed - Constructor properties return `DeviceState` in order to be able to subscribe to them @@ -16,6 +17,8 @@ - `DeviceClient` now initializes property and action descriptors eagerly. - `DeviceHub` now works with `Name` instead of `NameToken`. Tree-like structure is made using `Path`. Device messages no longer have access to sub-devices. - Add some utility methods to ports. Synchronous port response could be now consumed as `Source`. +- `DeviceLifecycleState` is replaced by `LifecycleState`. + ### Deprecated diff --git a/controls-constructor/src/commonMain/kotlin/space/kscience/controls/constructor/DeviceGroup.kt b/controls-constructor/src/commonMain/kotlin/space/kscience/controls/constructor/DeviceGroup.kt index a41899a..79ff2ed 100644 --- a/controls-constructor/src/commonMain/kotlin/space/kscience/controls/constructor/DeviceGroup.kt +++ b/controls-constructor/src/commonMain/kotlin/space/kscience/controls/constructor/DeviceGroup.kt @@ -3,7 +3,7 @@ package space.kscience.controls.constructor import kotlinx.coroutines.* import kotlinx.coroutines.flow.* import space.kscience.controls.api.* -import space.kscience.controls.api.DeviceLifecycleState.* +import space.kscience.controls.api.LifecycleState.* import space.kscience.controls.manager.DeviceManager import space.kscience.controls.manager.install import space.kscience.controls.spec.DevicePropertySpec @@ -165,11 +165,11 @@ public open class DeviceGroup( return action(argument) } - final override var lifecycleState: DeviceLifecycleState = DeviceLifecycleState.STOPPED + final override var lifecycleState: LifecycleState = LifecycleState.STOPPED private set - private suspend fun setLifecycleState(lifecycleState: DeviceLifecycleState) { + private suspend fun setLifecycleState(lifecycleState: LifecycleState) { this.lifecycleState = lifecycleState sharedMessageFlow.emit( DeviceLifeCycleMessage(lifecycleState) diff --git a/controls-constructor/src/commonTest/kotlin/space/kscience/controls/constructor/DeviceGroupTest.kt b/controls-constructor/src/commonTest/kotlin/space/kscience/controls/constructor/DeviceGroupTest.kt index 40f00dc..aa46133 100644 --- a/controls-constructor/src/commonTest/kotlin/space/kscience/controls/constructor/DeviceGroupTest.kt +++ b/controls-constructor/src/commonTest/kotlin/space/kscience/controls/constructor/DeviceGroupTest.kt @@ -4,7 +4,7 @@ import kotlinx.coroutines.flow.first import kotlinx.coroutines.test.runTest import space.kscience.controls.api.Device import space.kscience.controls.api.DeviceLifeCycleMessage -import space.kscience.controls.api.DeviceLifecycleState +import space.kscience.controls.api.LifecycleState import space.kscience.controls.manager.DeviceManager import space.kscience.controls.manager.install import space.kscience.controls.spec.doRecurring @@ -37,7 +37,7 @@ class DeviceGroupTest { } error("Error!") } - testDevice.messageFlow.first { it is DeviceLifeCycleMessage && it.state == DeviceLifecycleState.STOPPED } + testDevice.messageFlow.first { it is DeviceLifeCycleMessage && it.state == LifecycleState.STOPPED } println("stopped") } } \ No newline at end of file diff --git a/controls-core/src/commonMain/kotlin/space/kscience/controls/api/AsynchronousSocket.kt b/controls-core/src/commonMain/kotlin/space/kscience/controls/api/AsynchronousSocket.kt index defefb2..7c56084 100644 --- a/controls-core/src/commonMain/kotlin/space/kscience/controls/api/AsynchronousSocket.kt +++ b/controls-core/src/commonMain/kotlin/space/kscience/controls/api/AsynchronousSocket.kt @@ -5,7 +5,7 @@ import kotlinx.coroutines.flow.Flow /** * A generic bidirectional asynchronous sender/receiver object */ -public interface AsynchronousSocket : AutoCloseable { +public interface AsynchronousSocket : WithLifeCycle { /** * Send an object to the socket */ @@ -15,16 +15,6 @@ public interface AsynchronousSocket : AutoCloseable { * Flow of objects received from socket */ public fun subscribe(): Flow - - /** - * Start socket operation - */ - public fun open() - - /** - * Check if this socket is open - */ - public val isOpen: Boolean } /** diff --git a/controls-core/src/commonMain/kotlin/space/kscience/controls/api/Device.kt b/controls-core/src/commonMain/kotlin/space/kscience/controls/api/Device.kt index 240eaa0..1e941bb 100644 --- a/controls-core/src/commonMain/kotlin/space/kscience/controls/api/Device.kt +++ b/controls-core/src/commonMain/kotlin/space/kscience/controls/api/Device.kt @@ -4,7 +4,6 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job import kotlinx.coroutines.cancel import kotlinx.coroutines.flow.* -import kotlinx.serialization.Serializable import space.kscience.controls.api.Device.Companion.DEVICE_TARGET import space.kscience.dataforge.context.ContextAware import space.kscience.dataforge.context.info @@ -15,40 +14,13 @@ import space.kscience.dataforge.meta.string import space.kscience.dataforge.misc.DfType import space.kscience.dataforge.names.parseAsName -/** - * A lifecycle state of a device - */ -@Serializable -public enum class DeviceLifecycleState { - - /** - * Device is initializing - */ - STARTING, - - /** - * The Device is initialized and running - */ - STARTED, - - /** - * The Device is closed - */ - STOPPED, - - /** - * The device encountered irrecoverable error - */ - ERROR -} - /** * General interface describing a managed Device. * [Device] is a supervisor scope encompassing all operations on a device. * When canceled, cancels all running processes. */ @DfType(DEVICE_TARGET) -public interface Device : ContextAware, CoroutineScope { +public interface Device : ContextAware, WithLifeCycle, CoroutineScope { /** * Initial configuration meta for the device @@ -94,18 +66,16 @@ public interface Device : ContextAware, CoroutineScope { * Initialize the device. This function suspends until the device is finished initialization. * Does nothing if the device is started or is starting */ - public suspend fun start(): Unit = Unit + override suspend fun start(): Unit = Unit /** * Close and terminate the device. This function does not wait for the device to be closed. */ - public suspend fun stop() { + override suspend fun stop() { coroutineContext[Job]?.cancel("The device is closed") logger.info { "Device $this is closed" } } - public val lifecycleState: DeviceLifecycleState - public companion object { public const val DEVICE_TARGET: String = "device" } @@ -114,7 +84,7 @@ public interface Device : ContextAware, CoroutineScope { /** * Inner id of a device. Not necessary corresponds to the name in the parent container */ -public val Device.id: String get() = meta["id"].string?: "device[${hashCode().toString(16)}]" +public val Device.id: String get() = meta["id"].string ?: "device[${hashCode().toString(16)}]" /** * Device that caches properties values @@ -167,3 +137,12 @@ public fun Device.onPropertyChange( public fun Device.propertyMessageFlow(propertyName: String): Flow = messageFlow .filterIsInstance() .filter { it.property == propertyName } + +/** + * React on device lifecycle events + */ +public fun Device.onLifecycleEvent( + block: suspend (LifecycleState) -> Unit +): Job = messageFlow.filterIsInstance().onEach { + block(it.state) +}.launchIn(this) \ No newline at end of file diff --git a/controls-core/src/commonMain/kotlin/space/kscience/controls/api/DeviceMessage.kt b/controls-core/src/commonMain/kotlin/space/kscience/controls/api/DeviceMessage.kt index 1aeabf6..e93fe7d 100644 --- a/controls-core/src/commonMain/kotlin/space/kscience/controls/api/DeviceMessage.kt +++ b/controls-core/src/commonMain/kotlin/space/kscience/controls/api/DeviceMessage.kt @@ -240,7 +240,7 @@ public data class DeviceErrorMessage( @Serializable @SerialName("lifecycle") public data class DeviceLifeCycleMessage( - val state: DeviceLifecycleState, + val state: LifecycleState, override val sourceDevice: Name = Name.EMPTY, override val targetDevice: Name? = null, override val comment: String? = null, diff --git a/controls-core/src/commonMain/kotlin/space/kscience/controls/api/WithLifeCycle.kt b/controls-core/src/commonMain/kotlin/space/kscience/controls/api/WithLifeCycle.kt new file mode 100644 index 0000000..631a66d --- /dev/null +++ b/controls-core/src/commonMain/kotlin/space/kscience/controls/api/WithLifeCycle.kt @@ -0,0 +1,59 @@ +package space.kscience.controls.api + +import kotlinx.serialization.Serializable + +/** + * A lifecycle state of a device + */ +@Serializable +public enum class LifecycleState { + + /** + * Device is initializing + */ + STARTING, + + /** + * The Device is initialized and running + */ + STARTED, + + /** + * The Device is closed + */ + STOPPED, + + /** + * The device encountered irrecoverable error + */ + ERROR +} + + +/** + * An object that could be started or stopped functioning + */ +public interface WithLifeCycle { + + public suspend fun start() + + public suspend fun stop() + + public val lifecycleState: LifecycleState +} + +/** + * Bind this object lifecycle to a device lifecycle + * + * The starting and stopping are done in device scope + */ +public fun WithLifeCycle.bindToDeviceLifecycle(device: Device){ + device.onLifecycleEvent { + when(it){ + LifecycleState.STARTING -> start() + LifecycleState.STARTED -> {/*ignore*/} + LifecycleState.STOPPED -> stop() + LifecycleState.ERROR -> stop() + } + } +} \ No newline at end of file diff --git a/controls-core/src/commonMain/kotlin/space/kscience/controls/ports/AsynchronousPort.kt b/controls-core/src/commonMain/kotlin/space/kscience/controls/ports/AsynchronousPort.kt index 9b37fd3..29502f9 100644 --- a/controls-core/src/commonMain/kotlin/space/kscience/controls/ports/AsynchronousPort.kt +++ b/controls-core/src/commonMain/kotlin/space/kscience/controls/ports/AsynchronousPort.kt @@ -6,6 +6,7 @@ import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.io.Source import space.kscience.controls.api.AsynchronousSocket +import space.kscience.controls.api.LifecycleState import space.kscience.dataforge.context.* import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.get @@ -65,8 +66,8 @@ public abstract class AbstractAsynchronousPort( protected abstract fun onOpen() - final override fun open() { - if (!isOpen) { + final override suspend fun start() { + if (lifecycleState == LifecycleState.STOPPED) { sendJob = scope.launch { for (data in outgoing) { try { @@ -80,7 +81,7 @@ public abstract class AbstractAsynchronousPort( } onOpen() } else { - logger.warn { "$this already opened" } + logger.warn { "$this already started" } } } @@ -89,7 +90,7 @@ public abstract class AbstractAsynchronousPort( * Send a data packet via the port */ override suspend fun send(data: ByteArray) { - check(isOpen) { "The port is not opened" } + check(lifecycleState == LifecycleState.STARTED) { "The port is not opened" } outgoing.send(data) } @@ -100,7 +101,7 @@ public abstract class AbstractAsynchronousPort( */ override fun subscribe(): Flow = incoming.receiveAsFlow() - override fun close() { + override suspend fun stop() { outgoing.close() incoming.close() sendJob?.cancel() diff --git a/controls-core/src/commonMain/kotlin/space/kscience/controls/ports/SynchronousPort.kt b/controls-core/src/commonMain/kotlin/space/kscience/controls/ports/SynchronousPort.kt index ac368c5..8427760 100644 --- a/controls-core/src/commonMain/kotlin/space/kscience/controls/ports/SynchronousPort.kt +++ b/controls-core/src/commonMain/kotlin/space/kscience/controls/ports/SynchronousPort.kt @@ -7,6 +7,8 @@ import kotlinx.coroutines.sync.withLock import kotlinx.io.Buffer import kotlinx.io.Source import kotlinx.io.readByteArray +import space.kscience.controls.api.LifecycleState +import space.kscience.controls.api.WithLifeCycle import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.ContextAware @@ -14,11 +16,7 @@ import space.kscience.dataforge.context.ContextAware * A port handler for synchronous (request-response) communication with a port. * Only one request could be active at a time (others are suspended). */ -public interface SynchronousPort : ContextAware, AutoCloseable { - - public fun open() - - public val isOpen: Boolean +public interface SynchronousPort : ContextAware, WithLifeCycle { /** * Send a single message and wait for the flow of response chunks. @@ -71,14 +69,14 @@ private class SynchronousOverAsynchronousPort( override val context: Context get() = port.context - override fun open() { - if (!port.isOpen) port.open() + override suspend fun start() { + if (port.lifecycleState == LifecycleState.STOPPED) port.start() } - override val isOpen: Boolean get() = port.isOpen + override val lifecycleState: LifecycleState get() = port.lifecycleState - override fun close() { - if (port.isOpen) port.close() + override suspend fun stop() { + if (port.lifecycleState == LifecycleState.STARTED) port.stop() } override suspend fun respond( diff --git a/controls-core/src/commonMain/kotlin/space/kscience/controls/spec/DeviceBase.kt b/controls-core/src/commonMain/kotlin/space/kscience/controls/spec/DeviceBase.kt index 941eda9..a060f46 100644 --- a/controls-core/src/commonMain/kotlin/space/kscience/controls/spec/DeviceBase.kt +++ b/controls-core/src/commonMain/kotlin/space/kscience/controls/spec/DeviceBase.kt @@ -72,7 +72,6 @@ public abstract class DeviceBase( onBufferOverflow = BufferOverflow.DROP_OLDEST ) - @OptIn(ExperimentalCoroutinesApi::class) override val coroutineContext: CoroutineContext = context.newCoroutineContext( SupervisorJob(context.coroutineContext[Job]) + CoroutineName("Device $id") + @@ -188,11 +187,11 @@ public abstract class DeviceBase( return spec.executeWithMeta(self, argument ?: Meta.EMPTY) } - final override var lifecycleState: DeviceLifecycleState = DeviceLifecycleState.STOPPED + final override var lifecycleState: LifecycleState = LifecycleState.STOPPED private set - private suspend fun setLifecycleState(lifecycleState: DeviceLifecycleState) { + private suspend fun setLifecycleState(lifecycleState: LifecycleState) { this.lifecycleState = lifecycleState sharedMessageFlow.emit( DeviceLifeCycleMessage(lifecycleState) @@ -204,11 +203,11 @@ public abstract class DeviceBase( } final override suspend fun start() { - if (lifecycleState == DeviceLifecycleState.STOPPED) { + if (lifecycleState == LifecycleState.STOPPED) { super.start() - setLifecycleState(DeviceLifecycleState.STARTING) + setLifecycleState(LifecycleState.STARTING) onStart() - setLifecycleState(DeviceLifecycleState.STARTED) + setLifecycleState(LifecycleState.STARTED) } else { logger.debug { "Device $this is already started" } } @@ -220,7 +219,7 @@ public abstract class DeviceBase( final override suspend fun stop() { onStop() - setLifecycleState(DeviceLifecycleState.STOPPED) + setLifecycleState(LifecycleState.STOPPED) super.stop() } diff --git a/controls-core/src/commonMain/kotlin/space/kscience/controls/spec/DeviceSpec.kt b/controls-core/src/commonMain/kotlin/space/kscience/controls/spec/DeviceSpec.kt index 503df83..c6f8b0a 100644 --- a/controls-core/src/commonMain/kotlin/space/kscience/controls/spec/DeviceSpec.kt +++ b/controls-core/src/commonMain/kotlin/space/kscience/controls/spec/DeviceSpec.kt @@ -33,7 +33,7 @@ public abstract class DeviceSpec { public open suspend fun D.onOpen() { } - public open fun D.onClose() { + public open suspend fun D.onClose() { } diff --git a/controls-core/src/jvmMain/kotlin/space/kscience/controls/ports/ChannelPort.kt b/controls-core/src/jvmMain/kotlin/space/kscience/controls/ports/ChannelPort.kt index 85c3d5c..e5430db 100644 --- a/controls-core/src/jvmMain/kotlin/space/kscience/controls/ports/ChannelPort.kt +++ b/controls-core/src/jvmMain/kotlin/space/kscience/controls/ports/ChannelPort.kt @@ -1,6 +1,7 @@ package space.kscience.controls.ports import kotlinx.coroutines.* +import space.kscience.controls.api.LifecycleState import space.kscience.dataforge.context.* import space.kscience.dataforge.meta.* import java.net.InetSocketAddress @@ -30,7 +31,7 @@ public class ChannelPort( meta: Meta, coroutineContext: CoroutineContext = context.coroutineContext, channelBuilder: suspend () -> ByteChannel, -) : AbstractAsynchronousPort(context, meta, coroutineContext), AutoCloseable { +) : AbstractAsynchronousPort(context, meta, coroutineContext) { /** * A handler to await port connection @@ -41,7 +42,8 @@ public class ChannelPort( private var listenerJob: Job? = null - override val isOpen: Boolean get() = listenerJob?.isActive == true + override val lifecycleState: LifecycleState + get() = if(listenerJob?.isActive == true) LifecycleState.STARTED else LifecycleState.STOPPED override fun onOpen() { listenerJob = scope.launch(Dispatchers.IO) { @@ -71,12 +73,12 @@ public class ChannelPort( } @OptIn(ExperimentalCoroutinesApi::class) - override fun close() { + override suspend fun stop() { listenerJob?.cancel() if (futureChannel.isCompleted) { futureChannel.getCompleted().close() } - super.close() + super.stop() } } @@ -105,12 +107,12 @@ public object TcpPort : Factory { /** * Create and open TCP port */ - public fun open( + public suspend fun start( context: Context, host: String, port: Int, coroutineContext: CoroutineContext = context.coroutineContext, - ): ChannelPort = build(context, host, port, coroutineContext).apply { open() } + ): ChannelPort = build(context, host, port, coroutineContext).apply { start() } override fun build(context: Context, meta: Meta): ChannelPort { val host = meta["host"].string ?: "localhost" @@ -156,13 +158,13 @@ public object UdpPort : Factory { /** * Connect a datagram channel to a remote host/port. If [localPort] is provided, it is used to bind local port for receiving messages. */ - public fun open( + public suspend fun start( context: Context, remoteHost: String, remotePort: Int, localPort: Int? = null, localHost: String = "localhost", - ): ChannelPort = build(context, remoteHost, remotePort, localPort, localHost).apply { open() } + ): ChannelPort = build(context, remoteHost, remotePort, localPort, localHost).apply { start() } override fun build(context: Context, meta: Meta): ChannelPort { diff --git a/controls-core/src/jvmMain/kotlin/space/kscience/controls/ports/UdpSocketPort.kt b/controls-core/src/jvmMain/kotlin/space/kscience/controls/ports/UdpSocketPort.kt index ae65c64..39d4c13 100644 --- a/controls-core/src/jvmMain/kotlin/space/kscience/controls/ports/UdpSocketPort.kt +++ b/controls-core/src/jvmMain/kotlin/space/kscience/controls/ports/UdpSocketPort.kt @@ -1,6 +1,7 @@ package space.kscience.controls.ports import kotlinx.coroutines.* +import space.kscience.controls.api.LifecycleState import space.kscience.dataforge.context.Context import space.kscience.dataforge.meta.Meta import java.net.DatagramPacket @@ -39,13 +40,13 @@ public class UdpSocketPort( } } - override fun close() { + override suspend fun stop() { listenerJob?.cancel() - super.close() + super.stop() } - override val isOpen: Boolean get() = listenerJob?.isActive == true - + override val lifecycleState: LifecycleState + get() = if(listenerJob?.isActive == true) LifecycleState.STARTED else LifecycleState.STOPPED override suspend fun write(data: ByteArray): Unit = withContext(Dispatchers.IO) { val packet = DatagramPacket( diff --git a/controls-core/src/jvmTest/kotlin/space/kscience/controls/ports/AsynchronousPortIOTest.kt b/controls-core/src/jvmTest/kotlin/space/kscience/controls/ports/AsynchronousPortIOTest.kt index b19cfad..6709f8c 100644 --- a/controls-core/src/jvmTest/kotlin/space/kscience/controls/ports/AsynchronousPortIOTest.kt +++ b/controls-core/src/jvmTest/kotlin/space/kscience/controls/ports/AsynchronousPortIOTest.kt @@ -29,8 +29,8 @@ internal class AsynchronousPortIOTest { @Test fun testUdpCommunication() = runTest { - val receiver = UdpPort.open(Global, "localhost", 8811, localPort = 8812) - val sender = UdpPort.open(Global, "localhost", 8812, localPort = 8811) + val receiver = UdpPort.start(Global, "localhost", 8811, localPort = 8812) + val sender = UdpPort.start(Global, "localhost", 8812, localPort = 8811) delay(30) repeat(10) { @@ -44,7 +44,7 @@ internal class AsynchronousPortIOTest { .toList() assertEquals("Line number 3", res[3].trim()) - receiver.close() - sender.close() + receiver.stop() + sender.stop() } } \ No newline at end of file diff --git a/controls-magix/src/commonMain/kotlin/space/kscience/controls/client/DeviceClient.kt b/controls-magix/src/commonMain/kotlin/space/kscience/controls/client/DeviceClient.kt index f4a232e..045ee02 100644 --- a/controls-magix/src/commonMain/kotlin/space/kscience/controls/client/DeviceClient.kt +++ b/controls-magix/src/commonMain/kotlin/space/kscience/controls/client/DeviceClient.kt @@ -99,10 +99,10 @@ public class DeviceClient internal constructor( } private val lifecycleStateFlow = messageFlow.filterIsInstance() - .map { it.state }.stateIn(this, started = SharingStarted.Eagerly, DeviceLifecycleState.STARTED) + .map { it.state }.stateIn(this, started = SharingStarted.Eagerly, LifecycleState.STARTED) @DFExperimental - override val lifecycleState: DeviceLifecycleState get() = lifecycleStateFlow.value + override val lifecycleState: LifecycleState get() = lifecycleStateFlow.value } /** diff --git a/controls-pi/src/jvmMain/kotlin/space/kscience/controls/pi/AsynchronousPiPort.kt b/controls-pi/src/jvmMain/kotlin/space/kscience/controls/pi/AsynchronousPiPort.kt index 1aee657..019ac9e 100644 --- a/controls-pi/src/jvmMain/kotlin/space/kscience/controls/pi/AsynchronousPiPort.kt +++ b/controls-pi/src/jvmMain/kotlin/space/kscience/controls/pi/AsynchronousPiPort.kt @@ -5,6 +5,7 @@ import com.pi4j.io.serial.Serial import com.pi4j.io.serial.SerialConfigBuilder import com.pi4j.ktx.io.serial import kotlinx.coroutines.* +import space.kscience.controls.api.LifecycleState import space.kscience.controls.ports.AbstractAsynchronousPort import space.kscience.controls.ports.AsynchronousPort import space.kscience.controls.ports.copyToArray @@ -49,9 +50,10 @@ public class AsynchronousPiPort( } - override val isOpen: Boolean get() = listenerJob?.isActive == true + override val lifecycleState: LifecycleState + get() = if(listenerJob?.isActive == true) LifecycleState.STARTED else LifecycleState.STOPPED - override fun close() { + override suspend fun stop() { listenerJob?.cancel() serial.close() } @@ -74,11 +76,11 @@ public class AsynchronousPiPort( return AsynchronousPiPort(context, meta, serial) } - public fun open( + public suspend fun start( context: Context, device: String, block: SerialConfigBuilder.() -> Unit, - ): AsynchronousPiPort = build(context, device, block).apply { open() } + ): AsynchronousPiPort = build(context, device, block).apply { start() } override fun build(context: Context, meta: Meta): AsynchronousPort { val device: String = meta["device"].string ?: error("Device name not defined") diff --git a/controls-pi/src/jvmMain/kotlin/space/kscience/controls/pi/SynchronousPiPort.kt b/controls-pi/src/jvmMain/kotlin/space/kscience/controls/pi/SynchronousPiPort.kt index 6bc590c..d91cd6c 100644 --- a/controls-pi/src/jvmMain/kotlin/space/kscience/controls/pi/SynchronousPiPort.kt +++ b/controls-pi/src/jvmMain/kotlin/space/kscience/controls/pi/SynchronousPiPort.kt @@ -10,6 +10,7 @@ import kotlinx.coroutines.flow.flow import kotlinx.coroutines.runInterruptible import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock +import space.kscience.controls.api.LifecycleState import space.kscience.controls.ports.SynchronousPort import space.kscience.controls.ports.copyToArray import space.kscience.dataforge.context.* @@ -27,11 +28,13 @@ public class SynchronousPiPort( ) : SynchronousPort { private val pi = context.request(PiPlugin) - override fun open() { + + override suspend fun start() { serial.open() } - override val isOpen: Boolean get() = serial.isOpen + override val lifecycleState: LifecycleState + get() = if(serial.isOpen) LifecycleState.STARTED else LifecycleState.STOPPED override suspend fun respond( request: ByteArray, @@ -41,7 +44,7 @@ public class SynchronousPiPort( serial.write(request) flow { val buffer = ByteBuffer.allocate(1024) - while (isOpen) { + while (serial.isOpen) { try { val num = serial.read(buffer) if (num > 0) { @@ -64,7 +67,7 @@ public class SynchronousPiPort( } } - override fun close() { + override suspend fun stop() { serial.close() } @@ -86,11 +89,11 @@ public class SynchronousPiPort( return SynchronousPiPort(context, meta, serial) } - public fun open( + public suspend fun start( context: Context, device: String, block: SerialConfigBuilder.() -> Unit, - ): SynchronousPiPort = build(context, device, block).apply { open() } + ): SynchronousPiPort = build(context, device, block).apply { start() } override fun build(context: Context, meta: Meta): SynchronousPiPort { val device: String = meta["device"].string ?: error("Device name not defined") diff --git a/controls-ports-ktor/src/jvmMain/kotlin/space/kscience/controls/ports/KtorTcpPort.kt b/controls-ports-ktor/src/jvmMain/kotlin/space/kscience/controls/ports/KtorTcpPort.kt index 463e922..d853e46 100644 --- a/controls-ports-ktor/src/jvmMain/kotlin/space/kscience/controls/ports/KtorTcpPort.kt +++ b/controls-ports-ktor/src/jvmMain/kotlin/space/kscience/controls/ports/KtorTcpPort.kt @@ -6,9 +6,9 @@ import io.ktor.network.sockets.aSocket import io.ktor.network.sockets.openReadChannel import io.ktor.network.sockets.openWriteChannel import io.ktor.utils.io.consumeEachBufferRange -import io.ktor.utils.io.core.Closeable import io.ktor.utils.io.writeAvailable import kotlinx.coroutines.* +import space.kscience.controls.api.LifecycleState import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Factory import space.kscience.dataforge.meta.Meta @@ -25,7 +25,7 @@ public class KtorTcpPort internal constructor( public val port: Int, coroutineContext: CoroutineContext = context.coroutineContext, socketOptions: SocketOptions.TCPClientSocketOptions.() -> Unit = {}, -) : AbstractAsynchronousPort(context, meta, coroutineContext), Closeable { +) : AbstractAsynchronousPort(context, meta, coroutineContext) { override fun toString(): String = "port[tcp:$host:$port]" @@ -55,13 +55,13 @@ public class KtorTcpPort internal constructor( writeChannel.await().writeAvailable(data) } - override val isOpen: Boolean - get() = listenerJob?.isActive == true + override val lifecycleState: LifecycleState + get() = if(listenerJob?.isActive == true) LifecycleState.STARTED else LifecycleState.STOPPED - override fun close() { + override suspend fun stop() { listenerJob?.cancel() futureSocket.cancel() - super.close() + super.stop() } public companion object : Factory { @@ -82,13 +82,13 @@ public class KtorTcpPort internal constructor( return KtorTcpPort(context, meta, host, port, coroutineContext, socketOptions) } - public fun open( + public suspend fun start( context: Context, host: String, port: Int, coroutineContext: CoroutineContext = context.coroutineContext, socketOptions: SocketOptions.TCPClientSocketOptions.() -> Unit = {}, - ): KtorTcpPort = build(context, host, port, coroutineContext, socketOptions).apply { open() } + ): KtorTcpPort = build(context, host, port, coroutineContext, socketOptions).apply { start() } override fun build(context: Context, meta: Meta): AsynchronousPort { val host = meta["host"].string ?: "localhost" diff --git a/controls-ports-ktor/src/jvmMain/kotlin/space/kscience/controls/ports/KtorUdpPort.kt b/controls-ports-ktor/src/jvmMain/kotlin/space/kscience/controls/ports/KtorUdpPort.kt index 48096cf..267daa4 100644 --- a/controls-ports-ktor/src/jvmMain/kotlin/space/kscience/controls/ports/KtorUdpPort.kt +++ b/controls-ports-ktor/src/jvmMain/kotlin/space/kscience/controls/ports/KtorUdpPort.kt @@ -4,9 +4,9 @@ import io.ktor.network.selector.ActorSelectorManager import io.ktor.network.sockets.* import io.ktor.utils.io.ByteWriteChannel import io.ktor.utils.io.consumeEachBufferRange -import io.ktor.utils.io.core.Closeable import io.ktor.utils.io.writeAvailable import kotlinx.coroutines.* +import space.kscience.controls.api.LifecycleState import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Factory import space.kscience.dataforge.meta.Meta @@ -24,7 +24,7 @@ public class KtorUdpPort internal constructor( public val localHost: String = "localhost", coroutineContext: CoroutineContext = context.coroutineContext, socketOptions: SocketOptions.UDPSocketOptions.() -> Unit = {}, -) : AbstractAsynchronousPort(context, meta, coroutineContext), Closeable { +) : AbstractAsynchronousPort(context, meta, coroutineContext) { override fun toString(): String = "port[udp:$remoteHost:$remotePort]" @@ -58,13 +58,13 @@ public class KtorUdpPort internal constructor( writeChannel.await().writeAvailable(data) } - override val isOpen: Boolean - get() = listenerJob?.isActive == true + override val lifecycleState: LifecycleState + get() = if(listenerJob?.isActive == true) LifecycleState.STARTED else LifecycleState.STOPPED - override fun close() { + override suspend fun stop() { listenerJob?.cancel() futureSocket.cancel() - super.close() + super.stop() } public companion object : Factory { @@ -101,7 +101,7 @@ public class KtorUdpPort internal constructor( /** * Create and open UDP port */ - public fun open( + public suspend fun start( context: Context, remoteHost: String, remotePort: Int, @@ -117,7 +117,7 @@ public class KtorUdpPort internal constructor( localHost, coroutineContext, socketOptions - ).apply { open() } + ).apply { start() } override fun build(context: Context, meta: Meta): AsynchronousPort { val remoteHost by meta.string { error("Remote host is not specified") } diff --git a/controls-serial/src/jvmMain/kotlin/space/kscience/controls/serial/AsynchronousSerialPort.kt b/controls-serial/src/jvmMain/kotlin/space/kscience/controls/serial/AsynchronousSerialPort.kt index b6e83bc..a9c4ede 100644 --- a/controls-serial/src/jvmMain/kotlin/space/kscience/controls/serial/AsynchronousSerialPort.kt +++ b/controls-serial/src/jvmMain/kotlin/space/kscience/controls/serial/AsynchronousSerialPort.kt @@ -5,6 +5,7 @@ import com.fazecast.jSerialComm.SerialPortDataListener import com.fazecast.jSerialComm.SerialPortEvent import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.launch +import space.kscience.controls.api.LifecycleState import space.kscience.controls.ports.AbstractAsynchronousPort import space.kscience.controls.ports.AsynchronousPort import space.kscience.dataforge.context.Context @@ -55,18 +56,20 @@ public class AsynchronousSerialPort( comPort.addDataListener(serialPortListener) } - override val isOpen: Boolean get() = comPort.isOpen + override val lifecycleState: LifecycleState + get() = if(comPort.isOpen) LifecycleState.STARTED else LifecycleState.STOPPED + override suspend fun write(data: ByteArray) { comPort.writeBytes(data, data.size) } - override fun close() { + override suspend fun stop() { comPort.removeDataListener() if (comPort.isOpen) { comPort.closePort() } - super.close() + super.stop() } public companion object : Factory { @@ -100,7 +103,7 @@ public class AsynchronousSerialPort( /** * Construct ComPort with given parameters */ - public fun open( + public suspend fun start( context: Context, portName: String, baudRate: Int = 9600, @@ -118,7 +121,7 @@ public class AsynchronousSerialPort( parity = parity, coroutineContext = coroutineContext, additionalConfig = additionalConfig - ).apply { open() } + ).apply { start() } override fun build(context: Context, meta: Meta): AsynchronousPort { diff --git a/controls-serial/src/jvmMain/kotlin/space/kscience/controls/serial/SynchronousSerialPort.kt b/controls-serial/src/jvmMain/kotlin/space/kscience/controls/serial/SynchronousSerialPort.kt index 1a9b4a5..b9fc091 100644 --- a/controls-serial/src/jvmMain/kotlin/space/kscience/controls/serial/SynchronousSerialPort.kt +++ b/controls-serial/src/jvmMain/kotlin/space/kscience/controls/serial/SynchronousSerialPort.kt @@ -7,6 +7,7 @@ import kotlinx.coroutines.flow.flow import kotlinx.coroutines.runInterruptible import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock +import space.kscience.controls.api.LifecycleState import space.kscience.controls.ports.SynchronousPort import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Factory @@ -28,16 +29,17 @@ public class SynchronousSerialPort( override fun toString(): String = "port[${comPort.descriptivePortName}]" - override fun open() { - if (!isOpen) { + override suspend fun start() { + if (!comPort.isOpen) { comPort.openPort() } } - override val isOpen: Boolean get() = comPort.isOpen + override val lifecycleState: LifecycleState + get() = if(comPort.isOpen) LifecycleState.STARTED else LifecycleState.STOPPED - override fun close() { + override suspend fun stop() { if (comPort.isOpen) { comPort.closePort() } @@ -52,7 +54,7 @@ public class SynchronousSerialPort( comPort.flushIOBuffers() comPort.writeBytes(request, request.size) flow { - while (isOpen) { + while (comPort.isOpen) { try { val available = comPort.bytesAvailable() if (available > 0) { @@ -108,7 +110,7 @@ public class SynchronousSerialPort( /** * Construct ComPort with given parameters */ - public fun open( + public suspend fun start( context: Context, portName: String, baudRate: Int = 9600, @@ -124,7 +126,7 @@ public class SynchronousSerialPort( stopBits = stopBits, parity = parity, additionalConfig = additionalConfig - ).apply { open() } + ).apply { start() } override fun build(context: Context, meta: Meta): SynchronousPort { diff --git a/demo/mks-pdr900/src/main/kotlin/center/sciprog/devices/mks/MksPdr900Device.kt b/demo/mks-pdr900/src/main/kotlin/center/sciprog/devices/mks/MksPdr900Device.kt index b0fd562..d28acc0 100644 --- a/demo/mks-pdr900/src/main/kotlin/center/sciprog/devices/mks/MksPdr900Device.kt +++ b/demo/mks-pdr900/src/main/kotlin/center/sciprog/devices/mks/MksPdr900Device.kt @@ -99,9 +99,9 @@ class MksPdr900Device(context: Context, meta: Meta) : DeviceBySpec