Add lifecycle to ports. Suspended device start

This commit is contained in:
Alexander Nozik 2024-08-03 21:11:59 +03:00
parent 47327aef19
commit c12f1ce1cd
24 changed files with 185 additions and 142 deletions

View File

@ -9,6 +9,7 @@
- `DeviceClient` properly evaluates lifecycle and logs - `DeviceClient` properly evaluates lifecycle and logs
- `PeerConnection` API for direct device-device binary sharing - `PeerConnection` API for direct device-device binary sharing
- DeviceDrawable2D intermediate visualization implementation - DeviceDrawable2D intermediate visualization implementation
- New interface `WithLifeCycle`. Change Port API to adhere to it.
### Changed ### Changed
- Constructor properties return `DeviceState` in order to be able to subscribe to them - Constructor properties return `DeviceState` in order to be able to subscribe to them
@ -16,6 +17,8 @@
- `DeviceClient` now initializes property and action descriptors eagerly. - `DeviceClient` now initializes property and action descriptors eagerly.
- `DeviceHub` now works with `Name` instead of `NameToken`. Tree-like structure is made using `Path`. Device messages no longer have access to sub-devices. - `DeviceHub` now works with `Name` instead of `NameToken`. Tree-like structure is made using `Path`. Device messages no longer have access to sub-devices.
- Add some utility methods to ports. Synchronous port response could be now consumed as `Source`. - Add some utility methods to ports. Synchronous port response could be now consumed as `Source`.
- `DeviceLifecycleState` is replaced by `LifecycleState`.
### Deprecated ### Deprecated

View File

@ -3,7 +3,7 @@ package space.kscience.controls.constructor
import kotlinx.coroutines.* import kotlinx.coroutines.*
import kotlinx.coroutines.flow.* import kotlinx.coroutines.flow.*
import space.kscience.controls.api.* import space.kscience.controls.api.*
import space.kscience.controls.api.DeviceLifecycleState.* import space.kscience.controls.api.LifecycleState.*
import space.kscience.controls.manager.DeviceManager import space.kscience.controls.manager.DeviceManager
import space.kscience.controls.manager.install import space.kscience.controls.manager.install
import space.kscience.controls.spec.DevicePropertySpec import space.kscience.controls.spec.DevicePropertySpec
@ -165,11 +165,11 @@ public open class DeviceGroup(
return action(argument) return action(argument)
} }
final override var lifecycleState: DeviceLifecycleState = DeviceLifecycleState.STOPPED final override var lifecycleState: LifecycleState = LifecycleState.STOPPED
private set private set
private suspend fun setLifecycleState(lifecycleState: DeviceLifecycleState) { private suspend fun setLifecycleState(lifecycleState: LifecycleState) {
this.lifecycleState = lifecycleState this.lifecycleState = lifecycleState
sharedMessageFlow.emit( sharedMessageFlow.emit(
DeviceLifeCycleMessage(lifecycleState) DeviceLifeCycleMessage(lifecycleState)

View File

@ -4,7 +4,7 @@ import kotlinx.coroutines.flow.first
import kotlinx.coroutines.test.runTest import kotlinx.coroutines.test.runTest
import space.kscience.controls.api.Device import space.kscience.controls.api.Device
import space.kscience.controls.api.DeviceLifeCycleMessage import space.kscience.controls.api.DeviceLifeCycleMessage
import space.kscience.controls.api.DeviceLifecycleState import space.kscience.controls.api.LifecycleState
import space.kscience.controls.manager.DeviceManager import space.kscience.controls.manager.DeviceManager
import space.kscience.controls.manager.install import space.kscience.controls.manager.install
import space.kscience.controls.spec.doRecurring import space.kscience.controls.spec.doRecurring
@ -37,7 +37,7 @@ class DeviceGroupTest {
} }
error("Error!") error("Error!")
} }
testDevice.messageFlow.first { it is DeviceLifeCycleMessage && it.state == DeviceLifecycleState.STOPPED } testDevice.messageFlow.first { it is DeviceLifeCycleMessage && it.state == LifecycleState.STOPPED }
println("stopped") println("stopped")
} }
} }

View File

@ -5,7 +5,7 @@ import kotlinx.coroutines.flow.Flow
/** /**
* A generic bidirectional asynchronous sender/receiver object * A generic bidirectional asynchronous sender/receiver object
*/ */
public interface AsynchronousSocket<T> : AutoCloseable { public interface AsynchronousSocket<T> : WithLifeCycle {
/** /**
* Send an object to the socket * Send an object to the socket
*/ */
@ -15,16 +15,6 @@ public interface AsynchronousSocket<T> : AutoCloseable {
* Flow of objects received from socket * Flow of objects received from socket
*/ */
public fun subscribe(): Flow<T> public fun subscribe(): Flow<T>
/**
* Start socket operation
*/
public fun open()
/**
* Check if this socket is open
*/
public val isOpen: Boolean
} }
/** /**

View File

@ -4,7 +4,6 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.* import kotlinx.coroutines.flow.*
import kotlinx.serialization.Serializable
import space.kscience.controls.api.Device.Companion.DEVICE_TARGET import space.kscience.controls.api.Device.Companion.DEVICE_TARGET
import space.kscience.dataforge.context.ContextAware import space.kscience.dataforge.context.ContextAware
import space.kscience.dataforge.context.info import space.kscience.dataforge.context.info
@ -15,40 +14,13 @@ import space.kscience.dataforge.meta.string
import space.kscience.dataforge.misc.DfType import space.kscience.dataforge.misc.DfType
import space.kscience.dataforge.names.parseAsName import space.kscience.dataforge.names.parseAsName
/**
* A lifecycle state of a device
*/
@Serializable
public enum class DeviceLifecycleState {
/**
* Device is initializing
*/
STARTING,
/**
* The Device is initialized and running
*/
STARTED,
/**
* The Device is closed
*/
STOPPED,
/**
* The device encountered irrecoverable error
*/
ERROR
}
/** /**
* General interface describing a managed Device. * General interface describing a managed Device.
* [Device] is a supervisor scope encompassing all operations on a device. * [Device] is a supervisor scope encompassing all operations on a device.
* When canceled, cancels all running processes. * When canceled, cancels all running processes.
*/ */
@DfType(DEVICE_TARGET) @DfType(DEVICE_TARGET)
public interface Device : ContextAware, CoroutineScope { public interface Device : ContextAware, WithLifeCycle, CoroutineScope {
/** /**
* Initial configuration meta for the device * Initial configuration meta for the device
@ -94,18 +66,16 @@ public interface Device : ContextAware, CoroutineScope {
* Initialize the device. This function suspends until the device is finished initialization. * Initialize the device. This function suspends until the device is finished initialization.
* Does nothing if the device is started or is starting * Does nothing if the device is started or is starting
*/ */
public suspend fun start(): Unit = Unit override suspend fun start(): Unit = Unit
/** /**
* Close and terminate the device. This function does not wait for the device to be closed. * Close and terminate the device. This function does not wait for the device to be closed.
*/ */
public suspend fun stop() { override suspend fun stop() {
coroutineContext[Job]?.cancel("The device is closed") coroutineContext[Job]?.cancel("The device is closed")
logger.info { "Device $this is closed" } logger.info { "Device $this is closed" }
} }
public val lifecycleState: DeviceLifecycleState
public companion object { public companion object {
public const val DEVICE_TARGET: String = "device" public const val DEVICE_TARGET: String = "device"
} }
@ -167,3 +137,12 @@ public fun Device.onPropertyChange(
public fun Device.propertyMessageFlow(propertyName: String): Flow<PropertyChangedMessage> = messageFlow public fun Device.propertyMessageFlow(propertyName: String): Flow<PropertyChangedMessage> = messageFlow
.filterIsInstance<PropertyChangedMessage>() .filterIsInstance<PropertyChangedMessage>()
.filter { it.property == propertyName } .filter { it.property == propertyName }
/**
* React on device lifecycle events
*/
public fun Device.onLifecycleEvent(
block: suspend (LifecycleState) -> Unit
): Job = messageFlow.filterIsInstance<DeviceLifeCycleMessage>().onEach {
block(it.state)
}.launchIn(this)

View File

@ -240,7 +240,7 @@ public data class DeviceErrorMessage(
@Serializable @Serializable
@SerialName("lifecycle") @SerialName("lifecycle")
public data class DeviceLifeCycleMessage( public data class DeviceLifeCycleMessage(
val state: DeviceLifecycleState, val state: LifecycleState,
override val sourceDevice: Name = Name.EMPTY, override val sourceDevice: Name = Name.EMPTY,
override val targetDevice: Name? = null, override val targetDevice: Name? = null,
override val comment: String? = null, override val comment: String? = null,

View File

@ -0,0 +1,59 @@
package space.kscience.controls.api
import kotlinx.serialization.Serializable
/**
* A lifecycle state of a device
*/
@Serializable
public enum class LifecycleState {
/**
* Device is initializing
*/
STARTING,
/**
* The Device is initialized and running
*/
STARTED,
/**
* The Device is closed
*/
STOPPED,
/**
* The device encountered irrecoverable error
*/
ERROR
}
/**
* An object that could be started or stopped functioning
*/
public interface WithLifeCycle {
public suspend fun start()
public suspend fun stop()
public val lifecycleState: LifecycleState
}
/**
* Bind this object lifecycle to a device lifecycle
*
* The starting and stopping are done in device scope
*/
public fun WithLifeCycle.bindToDeviceLifecycle(device: Device){
device.onLifecycleEvent {
when(it){
LifecycleState.STARTING -> start()
LifecycleState.STARTED -> {/*ignore*/}
LifecycleState.STOPPED -> stop()
LifecycleState.ERROR -> stop()
}
}
}

View File

@ -6,6 +6,7 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.io.Source import kotlinx.io.Source
import space.kscience.controls.api.AsynchronousSocket import space.kscience.controls.api.AsynchronousSocket
import space.kscience.controls.api.LifecycleState
import space.kscience.dataforge.context.* import space.kscience.dataforge.context.*
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.get import space.kscience.dataforge.meta.get
@ -65,8 +66,8 @@ public abstract class AbstractAsynchronousPort(
protected abstract fun onOpen() protected abstract fun onOpen()
final override fun open() { final override suspend fun start() {
if (!isOpen) { if (lifecycleState == LifecycleState.STOPPED) {
sendJob = scope.launch { sendJob = scope.launch {
for (data in outgoing) { for (data in outgoing) {
try { try {
@ -80,7 +81,7 @@ public abstract class AbstractAsynchronousPort(
} }
onOpen() onOpen()
} else { } else {
logger.warn { "$this already opened" } logger.warn { "$this already started" }
} }
} }
@ -89,7 +90,7 @@ public abstract class AbstractAsynchronousPort(
* Send a data packet via the port * Send a data packet via the port
*/ */
override suspend fun send(data: ByteArray) { override suspend fun send(data: ByteArray) {
check(isOpen) { "The port is not opened" } check(lifecycleState == LifecycleState.STARTED) { "The port is not opened" }
outgoing.send(data) outgoing.send(data)
} }
@ -100,7 +101,7 @@ public abstract class AbstractAsynchronousPort(
*/ */
override fun subscribe(): Flow<ByteArray> = incoming.receiveAsFlow() override fun subscribe(): Flow<ByteArray> = incoming.receiveAsFlow()
override fun close() { override suspend fun stop() {
outgoing.close() outgoing.close()
incoming.close() incoming.close()
sendJob?.cancel() sendJob?.cancel()

View File

@ -7,6 +7,8 @@ import kotlinx.coroutines.sync.withLock
import kotlinx.io.Buffer import kotlinx.io.Buffer
import kotlinx.io.Source import kotlinx.io.Source
import kotlinx.io.readByteArray import kotlinx.io.readByteArray
import space.kscience.controls.api.LifecycleState
import space.kscience.controls.api.WithLifeCycle
import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.ContextAware import space.kscience.dataforge.context.ContextAware
@ -14,11 +16,7 @@ import space.kscience.dataforge.context.ContextAware
* A port handler for synchronous (request-response) communication with a port. * A port handler for synchronous (request-response) communication with a port.
* Only one request could be active at a time (others are suspended). * Only one request could be active at a time (others are suspended).
*/ */
public interface SynchronousPort : ContextAware, AutoCloseable { public interface SynchronousPort : ContextAware, WithLifeCycle {
public fun open()
public val isOpen: Boolean
/** /**
* Send a single message and wait for the flow of response chunks. * Send a single message and wait for the flow of response chunks.
@ -71,14 +69,14 @@ private class SynchronousOverAsynchronousPort(
override val context: Context get() = port.context override val context: Context get() = port.context
override fun open() { override suspend fun start() {
if (!port.isOpen) port.open() if (port.lifecycleState == LifecycleState.STOPPED) port.start()
} }
override val isOpen: Boolean get() = port.isOpen override val lifecycleState: LifecycleState get() = port.lifecycleState
override fun close() { override suspend fun stop() {
if (port.isOpen) port.close() if (port.lifecycleState == LifecycleState.STARTED) port.stop()
} }
override suspend fun <R> respond( override suspend fun <R> respond(

View File

@ -72,7 +72,6 @@ public abstract class DeviceBase<D : Device>(
onBufferOverflow = BufferOverflow.DROP_OLDEST onBufferOverflow = BufferOverflow.DROP_OLDEST
) )
@OptIn(ExperimentalCoroutinesApi::class)
override val coroutineContext: CoroutineContext = context.newCoroutineContext( override val coroutineContext: CoroutineContext = context.newCoroutineContext(
SupervisorJob(context.coroutineContext[Job]) + SupervisorJob(context.coroutineContext[Job]) +
CoroutineName("Device $id") + CoroutineName("Device $id") +
@ -188,11 +187,11 @@ public abstract class DeviceBase<D : Device>(
return spec.executeWithMeta(self, argument ?: Meta.EMPTY) return spec.executeWithMeta(self, argument ?: Meta.EMPTY)
} }
final override var lifecycleState: DeviceLifecycleState = DeviceLifecycleState.STOPPED final override var lifecycleState: LifecycleState = LifecycleState.STOPPED
private set private set
private suspend fun setLifecycleState(lifecycleState: DeviceLifecycleState) { private suspend fun setLifecycleState(lifecycleState: LifecycleState) {
this.lifecycleState = lifecycleState this.lifecycleState = lifecycleState
sharedMessageFlow.emit( sharedMessageFlow.emit(
DeviceLifeCycleMessage(lifecycleState) DeviceLifeCycleMessage(lifecycleState)
@ -204,11 +203,11 @@ public abstract class DeviceBase<D : Device>(
} }
final override suspend fun start() { final override suspend fun start() {
if (lifecycleState == DeviceLifecycleState.STOPPED) { if (lifecycleState == LifecycleState.STOPPED) {
super.start() super.start()
setLifecycleState(DeviceLifecycleState.STARTING) setLifecycleState(LifecycleState.STARTING)
onStart() onStart()
setLifecycleState(DeviceLifecycleState.STARTED) setLifecycleState(LifecycleState.STARTED)
} else { } else {
logger.debug { "Device $this is already started" } logger.debug { "Device $this is already started" }
} }
@ -220,7 +219,7 @@ public abstract class DeviceBase<D : Device>(
final override suspend fun stop() { final override suspend fun stop() {
onStop() onStop()
setLifecycleState(DeviceLifecycleState.STOPPED) setLifecycleState(LifecycleState.STOPPED)
super.stop() super.stop()
} }

View File

@ -33,7 +33,7 @@ public abstract class DeviceSpec<D : Device> {
public open suspend fun D.onOpen() { public open suspend fun D.onOpen() {
} }
public open fun D.onClose() { public open suspend fun D.onClose() {
} }

View File

@ -1,6 +1,7 @@
package space.kscience.controls.ports package space.kscience.controls.ports
import kotlinx.coroutines.* import kotlinx.coroutines.*
import space.kscience.controls.api.LifecycleState
import space.kscience.dataforge.context.* import space.kscience.dataforge.context.*
import space.kscience.dataforge.meta.* import space.kscience.dataforge.meta.*
import java.net.InetSocketAddress import java.net.InetSocketAddress
@ -30,7 +31,7 @@ public class ChannelPort(
meta: Meta, meta: Meta,
coroutineContext: CoroutineContext = context.coroutineContext, coroutineContext: CoroutineContext = context.coroutineContext,
channelBuilder: suspend () -> ByteChannel, channelBuilder: suspend () -> ByteChannel,
) : AbstractAsynchronousPort(context, meta, coroutineContext), AutoCloseable { ) : AbstractAsynchronousPort(context, meta, coroutineContext) {
/** /**
* A handler to await port connection * A handler to await port connection
@ -41,7 +42,8 @@ public class ChannelPort(
private var listenerJob: Job? = null private var listenerJob: Job? = null
override val isOpen: Boolean get() = listenerJob?.isActive == true override val lifecycleState: LifecycleState
get() = if(listenerJob?.isActive == true) LifecycleState.STARTED else LifecycleState.STOPPED
override fun onOpen() { override fun onOpen() {
listenerJob = scope.launch(Dispatchers.IO) { listenerJob = scope.launch(Dispatchers.IO) {
@ -71,12 +73,12 @@ public class ChannelPort(
} }
@OptIn(ExperimentalCoroutinesApi::class) @OptIn(ExperimentalCoroutinesApi::class)
override fun close() { override suspend fun stop() {
listenerJob?.cancel() listenerJob?.cancel()
if (futureChannel.isCompleted) { if (futureChannel.isCompleted) {
futureChannel.getCompleted().close() futureChannel.getCompleted().close()
} }
super.close() super.stop()
} }
} }
@ -105,12 +107,12 @@ public object TcpPort : Factory<AsynchronousPort> {
/** /**
* Create and open TCP port * Create and open TCP port
*/ */
public fun open( public suspend fun start(
context: Context, context: Context,
host: String, host: String,
port: Int, port: Int,
coroutineContext: CoroutineContext = context.coroutineContext, coroutineContext: CoroutineContext = context.coroutineContext,
): ChannelPort = build(context, host, port, coroutineContext).apply { open() } ): ChannelPort = build(context, host, port, coroutineContext).apply { start() }
override fun build(context: Context, meta: Meta): ChannelPort { override fun build(context: Context, meta: Meta): ChannelPort {
val host = meta["host"].string ?: "localhost" val host = meta["host"].string ?: "localhost"
@ -156,13 +158,13 @@ public object UdpPort : Factory<AsynchronousPort> {
/** /**
* Connect a datagram channel to a remote host/port. If [localPort] is provided, it is used to bind local port for receiving messages. * Connect a datagram channel to a remote host/port. If [localPort] is provided, it is used to bind local port for receiving messages.
*/ */
public fun open( public suspend fun start(
context: Context, context: Context,
remoteHost: String, remoteHost: String,
remotePort: Int, remotePort: Int,
localPort: Int? = null, localPort: Int? = null,
localHost: String = "localhost", localHost: String = "localhost",
): ChannelPort = build(context, remoteHost, remotePort, localPort, localHost).apply { open() } ): ChannelPort = build(context, remoteHost, remotePort, localPort, localHost).apply { start() }
override fun build(context: Context, meta: Meta): ChannelPort { override fun build(context: Context, meta: Meta): ChannelPort {

View File

@ -1,6 +1,7 @@
package space.kscience.controls.ports package space.kscience.controls.ports
import kotlinx.coroutines.* import kotlinx.coroutines.*
import space.kscience.controls.api.LifecycleState
import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Context
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import java.net.DatagramPacket import java.net.DatagramPacket
@ -39,13 +40,13 @@ public class UdpSocketPort(
} }
} }
override fun close() { override suspend fun stop() {
listenerJob?.cancel() listenerJob?.cancel()
super.close() super.stop()
} }
override val isOpen: Boolean get() = listenerJob?.isActive == true override val lifecycleState: LifecycleState
get() = if(listenerJob?.isActive == true) LifecycleState.STARTED else LifecycleState.STOPPED
override suspend fun write(data: ByteArray): Unit = withContext(Dispatchers.IO) { override suspend fun write(data: ByteArray): Unit = withContext(Dispatchers.IO) {
val packet = DatagramPacket( val packet = DatagramPacket(

View File

@ -29,8 +29,8 @@ internal class AsynchronousPortIOTest {
@Test @Test
fun testUdpCommunication() = runTest { fun testUdpCommunication() = runTest {
val receiver = UdpPort.open(Global, "localhost", 8811, localPort = 8812) val receiver = UdpPort.start(Global, "localhost", 8811, localPort = 8812)
val sender = UdpPort.open(Global, "localhost", 8812, localPort = 8811) val sender = UdpPort.start(Global, "localhost", 8812, localPort = 8811)
delay(30) delay(30)
repeat(10) { repeat(10) {
@ -44,7 +44,7 @@ internal class AsynchronousPortIOTest {
.toList() .toList()
assertEquals("Line number 3", res[3].trim()) assertEquals("Line number 3", res[3].trim())
receiver.close() receiver.stop()
sender.close() sender.stop()
} }
} }

View File

@ -99,10 +99,10 @@ public class DeviceClient internal constructor(
} }
private val lifecycleStateFlow = messageFlow.filterIsInstance<DeviceLifeCycleMessage>() private val lifecycleStateFlow = messageFlow.filterIsInstance<DeviceLifeCycleMessage>()
.map { it.state }.stateIn(this, started = SharingStarted.Eagerly, DeviceLifecycleState.STARTED) .map { it.state }.stateIn(this, started = SharingStarted.Eagerly, LifecycleState.STARTED)
@DFExperimental @DFExperimental
override val lifecycleState: DeviceLifecycleState get() = lifecycleStateFlow.value override val lifecycleState: LifecycleState get() = lifecycleStateFlow.value
} }
/** /**

View File

@ -5,6 +5,7 @@ import com.pi4j.io.serial.Serial
import com.pi4j.io.serial.SerialConfigBuilder import com.pi4j.io.serial.SerialConfigBuilder
import com.pi4j.ktx.io.serial import com.pi4j.ktx.io.serial
import kotlinx.coroutines.* import kotlinx.coroutines.*
import space.kscience.controls.api.LifecycleState
import space.kscience.controls.ports.AbstractAsynchronousPort import space.kscience.controls.ports.AbstractAsynchronousPort
import space.kscience.controls.ports.AsynchronousPort import space.kscience.controls.ports.AsynchronousPort
import space.kscience.controls.ports.copyToArray import space.kscience.controls.ports.copyToArray
@ -49,9 +50,10 @@ public class AsynchronousPiPort(
} }
override val isOpen: Boolean get() = listenerJob?.isActive == true override val lifecycleState: LifecycleState
get() = if(listenerJob?.isActive == true) LifecycleState.STARTED else LifecycleState.STOPPED
override fun close() { override suspend fun stop() {
listenerJob?.cancel() listenerJob?.cancel()
serial.close() serial.close()
} }
@ -74,11 +76,11 @@ public class AsynchronousPiPort(
return AsynchronousPiPort(context, meta, serial) return AsynchronousPiPort(context, meta, serial)
} }
public fun open( public suspend fun start(
context: Context, context: Context,
device: String, device: String,
block: SerialConfigBuilder.() -> Unit, block: SerialConfigBuilder.() -> Unit,
): AsynchronousPiPort = build(context, device, block).apply { open() } ): AsynchronousPiPort = build(context, device, block).apply { start() }
override fun build(context: Context, meta: Meta): AsynchronousPort { override fun build(context: Context, meta: Meta): AsynchronousPort {
val device: String = meta["device"].string ?: error("Device name not defined") val device: String = meta["device"].string ?: error("Device name not defined")

View File

@ -10,6 +10,7 @@ import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runInterruptible import kotlinx.coroutines.runInterruptible
import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.sync.withLock
import space.kscience.controls.api.LifecycleState
import space.kscience.controls.ports.SynchronousPort import space.kscience.controls.ports.SynchronousPort
import space.kscience.controls.ports.copyToArray import space.kscience.controls.ports.copyToArray
import space.kscience.dataforge.context.* import space.kscience.dataforge.context.*
@ -27,11 +28,13 @@ public class SynchronousPiPort(
) : SynchronousPort { ) : SynchronousPort {
private val pi = context.request(PiPlugin) private val pi = context.request(PiPlugin)
override fun open() {
override suspend fun start() {
serial.open() serial.open()
} }
override val isOpen: Boolean get() = serial.isOpen override val lifecycleState: LifecycleState
get() = if(serial.isOpen) LifecycleState.STARTED else LifecycleState.STOPPED
override suspend fun <R> respond( override suspend fun <R> respond(
request: ByteArray, request: ByteArray,
@ -41,7 +44,7 @@ public class SynchronousPiPort(
serial.write(request) serial.write(request)
flow<ByteArray> { flow<ByteArray> {
val buffer = ByteBuffer.allocate(1024) val buffer = ByteBuffer.allocate(1024)
while (isOpen) { while (serial.isOpen) {
try { try {
val num = serial.read(buffer) val num = serial.read(buffer)
if (num > 0) { if (num > 0) {
@ -64,7 +67,7 @@ public class SynchronousPiPort(
} }
} }
override fun close() { override suspend fun stop() {
serial.close() serial.close()
} }
@ -86,11 +89,11 @@ public class SynchronousPiPort(
return SynchronousPiPort(context, meta, serial) return SynchronousPiPort(context, meta, serial)
} }
public fun open( public suspend fun start(
context: Context, context: Context,
device: String, device: String,
block: SerialConfigBuilder.() -> Unit, block: SerialConfigBuilder.() -> Unit,
): SynchronousPiPort = build(context, device, block).apply { open() } ): SynchronousPiPort = build(context, device, block).apply { start() }
override fun build(context: Context, meta: Meta): SynchronousPiPort { override fun build(context: Context, meta: Meta): SynchronousPiPort {
val device: String = meta["device"].string ?: error("Device name not defined") val device: String = meta["device"].string ?: error("Device name not defined")

View File

@ -6,9 +6,9 @@ import io.ktor.network.sockets.aSocket
import io.ktor.network.sockets.openReadChannel import io.ktor.network.sockets.openReadChannel
import io.ktor.network.sockets.openWriteChannel import io.ktor.network.sockets.openWriteChannel
import io.ktor.utils.io.consumeEachBufferRange import io.ktor.utils.io.consumeEachBufferRange
import io.ktor.utils.io.core.Closeable
import io.ktor.utils.io.writeAvailable import io.ktor.utils.io.writeAvailable
import kotlinx.coroutines.* import kotlinx.coroutines.*
import space.kscience.controls.api.LifecycleState
import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Factory import space.kscience.dataforge.context.Factory
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
@ -25,7 +25,7 @@ public class KtorTcpPort internal constructor(
public val port: Int, public val port: Int,
coroutineContext: CoroutineContext = context.coroutineContext, coroutineContext: CoroutineContext = context.coroutineContext,
socketOptions: SocketOptions.TCPClientSocketOptions.() -> Unit = {}, socketOptions: SocketOptions.TCPClientSocketOptions.() -> Unit = {},
) : AbstractAsynchronousPort(context, meta, coroutineContext), Closeable { ) : AbstractAsynchronousPort(context, meta, coroutineContext) {
override fun toString(): String = "port[tcp:$host:$port]" override fun toString(): String = "port[tcp:$host:$port]"
@ -55,13 +55,13 @@ public class KtorTcpPort internal constructor(
writeChannel.await().writeAvailable(data) writeChannel.await().writeAvailable(data)
} }
override val isOpen: Boolean override val lifecycleState: LifecycleState
get() = listenerJob?.isActive == true get() = if(listenerJob?.isActive == true) LifecycleState.STARTED else LifecycleState.STOPPED
override fun close() { override suspend fun stop() {
listenerJob?.cancel() listenerJob?.cancel()
futureSocket.cancel() futureSocket.cancel()
super.close() super.stop()
} }
public companion object : Factory<AsynchronousPort> { public companion object : Factory<AsynchronousPort> {
@ -82,13 +82,13 @@ public class KtorTcpPort internal constructor(
return KtorTcpPort(context, meta, host, port, coroutineContext, socketOptions) return KtorTcpPort(context, meta, host, port, coroutineContext, socketOptions)
} }
public fun open( public suspend fun start(
context: Context, context: Context,
host: String, host: String,
port: Int, port: Int,
coroutineContext: CoroutineContext = context.coroutineContext, coroutineContext: CoroutineContext = context.coroutineContext,
socketOptions: SocketOptions.TCPClientSocketOptions.() -> Unit = {}, socketOptions: SocketOptions.TCPClientSocketOptions.() -> Unit = {},
): KtorTcpPort = build(context, host, port, coroutineContext, socketOptions).apply { open() } ): KtorTcpPort = build(context, host, port, coroutineContext, socketOptions).apply { start() }
override fun build(context: Context, meta: Meta): AsynchronousPort { override fun build(context: Context, meta: Meta): AsynchronousPort {
val host = meta["host"].string ?: "localhost" val host = meta["host"].string ?: "localhost"

View File

@ -4,9 +4,9 @@ import io.ktor.network.selector.ActorSelectorManager
import io.ktor.network.sockets.* import io.ktor.network.sockets.*
import io.ktor.utils.io.ByteWriteChannel import io.ktor.utils.io.ByteWriteChannel
import io.ktor.utils.io.consumeEachBufferRange import io.ktor.utils.io.consumeEachBufferRange
import io.ktor.utils.io.core.Closeable
import io.ktor.utils.io.writeAvailable import io.ktor.utils.io.writeAvailable
import kotlinx.coroutines.* import kotlinx.coroutines.*
import space.kscience.controls.api.LifecycleState
import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Factory import space.kscience.dataforge.context.Factory
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
@ -24,7 +24,7 @@ public class KtorUdpPort internal constructor(
public val localHost: String = "localhost", public val localHost: String = "localhost",
coroutineContext: CoroutineContext = context.coroutineContext, coroutineContext: CoroutineContext = context.coroutineContext,
socketOptions: SocketOptions.UDPSocketOptions.() -> Unit = {}, socketOptions: SocketOptions.UDPSocketOptions.() -> Unit = {},
) : AbstractAsynchronousPort(context, meta, coroutineContext), Closeable { ) : AbstractAsynchronousPort(context, meta, coroutineContext) {
override fun toString(): String = "port[udp:$remoteHost:$remotePort]" override fun toString(): String = "port[udp:$remoteHost:$remotePort]"
@ -58,13 +58,13 @@ public class KtorUdpPort internal constructor(
writeChannel.await().writeAvailable(data) writeChannel.await().writeAvailable(data)
} }
override val isOpen: Boolean override val lifecycleState: LifecycleState
get() = listenerJob?.isActive == true get() = if(listenerJob?.isActive == true) LifecycleState.STARTED else LifecycleState.STOPPED
override fun close() { override suspend fun stop() {
listenerJob?.cancel() listenerJob?.cancel()
futureSocket.cancel() futureSocket.cancel()
super.close() super.stop()
} }
public companion object : Factory<AsynchronousPort> { public companion object : Factory<AsynchronousPort> {
@ -101,7 +101,7 @@ public class KtorUdpPort internal constructor(
/** /**
* Create and open UDP port * Create and open UDP port
*/ */
public fun open( public suspend fun start(
context: Context, context: Context,
remoteHost: String, remoteHost: String,
remotePort: Int, remotePort: Int,
@ -117,7 +117,7 @@ public class KtorUdpPort internal constructor(
localHost, localHost,
coroutineContext, coroutineContext,
socketOptions socketOptions
).apply { open() } ).apply { start() }
override fun build(context: Context, meta: Meta): AsynchronousPort { override fun build(context: Context, meta: Meta): AsynchronousPort {
val remoteHost by meta.string { error("Remote host is not specified") } val remoteHost by meta.string { error("Remote host is not specified") }

View File

@ -5,6 +5,7 @@ import com.fazecast.jSerialComm.SerialPortDataListener
import com.fazecast.jSerialComm.SerialPortEvent import com.fazecast.jSerialComm.SerialPortEvent
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import space.kscience.controls.api.LifecycleState
import space.kscience.controls.ports.AbstractAsynchronousPort import space.kscience.controls.ports.AbstractAsynchronousPort
import space.kscience.controls.ports.AsynchronousPort import space.kscience.controls.ports.AsynchronousPort
import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Context
@ -55,18 +56,20 @@ public class AsynchronousSerialPort(
comPort.addDataListener(serialPortListener) comPort.addDataListener(serialPortListener)
} }
override val isOpen: Boolean get() = comPort.isOpen override val lifecycleState: LifecycleState
get() = if(comPort.isOpen) LifecycleState.STARTED else LifecycleState.STOPPED
override suspend fun write(data: ByteArray) { override suspend fun write(data: ByteArray) {
comPort.writeBytes(data, data.size) comPort.writeBytes(data, data.size)
} }
override fun close() { override suspend fun stop() {
comPort.removeDataListener() comPort.removeDataListener()
if (comPort.isOpen) { if (comPort.isOpen) {
comPort.closePort() comPort.closePort()
} }
super.close() super.stop()
} }
public companion object : Factory<AsynchronousPort> { public companion object : Factory<AsynchronousPort> {
@ -100,7 +103,7 @@ public class AsynchronousSerialPort(
/** /**
* Construct ComPort with given parameters * Construct ComPort with given parameters
*/ */
public fun open( public suspend fun start(
context: Context, context: Context,
portName: String, portName: String,
baudRate: Int = 9600, baudRate: Int = 9600,
@ -118,7 +121,7 @@ public class AsynchronousSerialPort(
parity = parity, parity = parity,
coroutineContext = coroutineContext, coroutineContext = coroutineContext,
additionalConfig = additionalConfig additionalConfig = additionalConfig
).apply { open() } ).apply { start() }
override fun build(context: Context, meta: Meta): AsynchronousPort { override fun build(context: Context, meta: Meta): AsynchronousPort {

View File

@ -7,6 +7,7 @@ import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runInterruptible import kotlinx.coroutines.runInterruptible
import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.sync.withLock
import space.kscience.controls.api.LifecycleState
import space.kscience.controls.ports.SynchronousPort import space.kscience.controls.ports.SynchronousPort
import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Factory import space.kscience.dataforge.context.Factory
@ -28,16 +29,17 @@ public class SynchronousSerialPort(
override fun toString(): String = "port[${comPort.descriptivePortName}]" override fun toString(): String = "port[${comPort.descriptivePortName}]"
override fun open() { override suspend fun start() {
if (!isOpen) { if (!comPort.isOpen) {
comPort.openPort() comPort.openPort()
} }
} }
override val isOpen: Boolean get() = comPort.isOpen override val lifecycleState: LifecycleState
get() = if(comPort.isOpen) LifecycleState.STARTED else LifecycleState.STOPPED
override fun close() { override suspend fun stop() {
if (comPort.isOpen) { if (comPort.isOpen) {
comPort.closePort() comPort.closePort()
} }
@ -52,7 +54,7 @@ public class SynchronousSerialPort(
comPort.flushIOBuffers() comPort.flushIOBuffers()
comPort.writeBytes(request, request.size) comPort.writeBytes(request, request.size)
flow<ByteArray> { flow<ByteArray> {
while (isOpen) { while (comPort.isOpen) {
try { try {
val available = comPort.bytesAvailable() val available = comPort.bytesAvailable()
if (available > 0) { if (available > 0) {
@ -108,7 +110,7 @@ public class SynchronousSerialPort(
/** /**
* Construct ComPort with given parameters * Construct ComPort with given parameters
*/ */
public fun open( public suspend fun start(
context: Context, context: Context,
portName: String, portName: String,
baudRate: Int = 9600, baudRate: Int = 9600,
@ -124,7 +126,7 @@ public class SynchronousSerialPort(
stopBits = stopBits, stopBits = stopBits,
parity = parity, parity = parity,
additionalConfig = additionalConfig additionalConfig = additionalConfig
).apply { open() } ).apply { start() }
override fun build(context: Context, meta: Meta): SynchronousPort { override fun build(context: Context, meta: Meta): SynchronousPort {

View File

@ -99,9 +99,9 @@ class MksPdr900Device(context: Context, meta: Meta) : DeviceBySpec<MksPdr900Devi
val error by logicalProperty(MetaConverter.string) val error by logicalProperty(MetaConverter.string)
override fun MksPdr900Device.onClose() { override suspend fun MksPdr900Device.onClose() {
if (portDelegate.isInitialized()) { if (portDelegate.isInitialized()) {
port.close() port.stop()
} }
} }
} }

View File

@ -168,7 +168,7 @@ class PiMotionMasterDevice(
} }
//Update port //Update port
//address = portSpec.node //address = portSpec.node
port = portFactory(portSpec, context).apply { open() } port = portFactory(portSpec, context).apply { start() }
// connector.open() // connector.open()
//Initialize axes //Initialize axes
val idn = read(identity) val idn = read(identity)
@ -190,7 +190,7 @@ class PiMotionMasterDevice(
}) { }) {
port?.let { port?.let {
execute(stop) execute(stop)
it.close() it.stop()
} }
port = null port = null
propertyChanged(connected, false) propertyChanged(connected, false)

View File

@ -6,6 +6,7 @@ import kotlinx.coroutines.flow.*
import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.sync.withLock
import space.kscience.controls.api.AsynchronousSocket import space.kscience.controls.api.AsynchronousSocket
import space.kscience.controls.api.LifecycleState
import space.kscience.controls.ports.AbstractAsynchronousPort import space.kscience.controls.ports.AbstractAsynchronousPort
import space.kscience.controls.ports.withDelimiter import space.kscience.controls.ports.withDelimiter
import space.kscience.dataforge.context.* import space.kscience.dataforge.context.*
@ -48,10 +49,10 @@ abstract class VirtualDevice(val scope: CoroutineScope) : AsynchronousSocket<Byt
respond(response()) respond(response())
} }
override val isOpen: Boolean override val lifecycleState: LifecycleState
get() = scope.isActive get() = if(scope.isActive) LifecycleState.STARTED else LifecycleState.STOPPED
override fun close() = scope.cancel() override suspend fun stop() = scope.cancel()
} }
class VirtualPort(private val device: VirtualDevice, context: Context) : AbstractAsynchronousPort(context, Meta.EMPTY) { class VirtualPort(private val device: VirtualDevice, context: Context) : AbstractAsynchronousPort(context, Meta.EMPTY) {
@ -72,12 +73,12 @@ class VirtualPort(private val device: VirtualDevice, context: Context) : Abstrac
device.send(data) device.send(data)
} }
override val isOpen: Boolean override val lifecycleState: LifecycleState
get() = respondJob?.isActive == true get() = if(respondJob?.isActive == true) LifecycleState.STARTED else LifecycleState.STOPPED
override fun close() { override suspend fun stop() {
respondJob?.cancel() respondJob?.cancel()
super.close() super.stop()
} }
} }
@ -88,7 +89,7 @@ class PiMotionMasterVirtualDevice(
scope: CoroutineScope = context, scope: CoroutineScope = context,
) : VirtualDevice(scope), ContextAware { ) : VirtualDevice(scope), ContextAware {
override fun open() { override suspend fun start() {
//add asynchronous send logic here //add asynchronous send logic here
} }