diff --git a/CHANGELOG.md b/CHANGELOG.md index 5a1dad9..92b2765 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ - Refactored ports. Now we have `AsynchronousPort` as well as `SynchronousPort` - `DeviceClient` now initializes property and action descriptors eagerly. - `DeviceHub` now works with `Name` instead of `NameToken`. Tree-like structure is made using `Path`. Device messages no longer have access to sub-devices. +- Add some utility methods to ports. Synchronous port response could be now consumed as `Source`. ### Deprecated diff --git a/controls-core/src/commonMain/kotlin/space/kscience/controls/ports/AsynchronousPort.kt b/controls-core/src/commonMain/kotlin/space/kscience/controls/ports/AsynchronousPort.kt index 3f89e75..9b37fd3 100644 --- a/controls-core/src/commonMain/kotlin/space/kscience/controls/ports/AsynchronousPort.kt +++ b/controls-core/src/commonMain/kotlin/space/kscience/controls/ports/AsynchronousPort.kt @@ -3,10 +3,7 @@ package space.kscience.controls.ports import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.receiveAsFlow -import kotlinx.io.Buffer import kotlinx.io.Source import space.kscience.controls.api.AsynchronousSocket import space.kscience.dataforge.context.* @@ -26,15 +23,7 @@ public interface AsynchronousPort : ContextAware, AsynchronousSocket * [scope] controls the consummation. * If the scope is canceled, the source stops producing. */ -public fun AsynchronousPort.receiveAsSource(scope: CoroutineScope): Source { - val buffer = Buffer() - - subscribe().onEach { - buffer.write(it) - }.launchIn(scope) - - return buffer -} +public fun AsynchronousPort.receiveAsSource(scope: CoroutineScope): Source = subscribe().consumeAsSource(scope) /** @@ -51,13 +40,13 @@ public abstract class AbstractAsynchronousPort( CoroutineScope( coroutineContext + SupervisorJob(coroutineContext[Job]) + - CoroutineExceptionHandler { _, throwable -> logger.error(throwable) { throwable.stackTraceToString() } } + + CoroutineExceptionHandler { _, throwable -> logger.error(throwable) { "Asynchronous port error: " + throwable.stackTraceToString() } } + CoroutineName(toString()) ) } - private val outgoing = Channel(meta["outgoing.capacity"].int?:100) - private val incoming = Channel(meta["incoming.capacity"].int?:100) + private val outgoing = Channel(meta["outgoing.capacity"].int ?: 100) + private val incoming = Channel(meta["incoming.capacity"].int ?: 100) /** * Internal method to synchronously send data @@ -100,7 +89,7 @@ public abstract class AbstractAsynchronousPort( * Send a data packet via the port */ override suspend fun send(data: ByteArray) { - check(isOpen){"The port is not opened"} + check(isOpen) { "The port is not opened" } outgoing.send(data) } @@ -117,7 +106,7 @@ public abstract class AbstractAsynchronousPort( sendJob?.cancel() } - override fun toString(): String = meta["name"].string?:"ChannelPort[${hashCode().toString(16)}]" + override fun toString(): String = meta["name"].string ?: "ChannelPort[${hashCode().toString(16)}]" } /** diff --git a/controls-core/src/commonMain/kotlin/space/kscience/controls/ports/SynchronousPort.kt b/controls-core/src/commonMain/kotlin/space/kscience/controls/ports/SynchronousPort.kt index 49c2b06..ac368c5 100644 --- a/controls-core/src/commonMain/kotlin/space/kscience/controls/ports/SynchronousPort.kt +++ b/controls-core/src/commonMain/kotlin/space/kscience/controls/ports/SynchronousPort.kt @@ -1,11 +1,11 @@ package space.kscience.controls.ports -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.first -import kotlinx.coroutines.flow.takeWhile +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.flow.* import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import kotlinx.io.Buffer +import kotlinx.io.Source import kotlinx.io.readByteArray import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.ContextAware @@ -46,6 +46,24 @@ public interface SynchronousPort : ContextAware, AutoCloseable { } } +/** + * Read response to a given message using [Source] abstraction + */ +public suspend fun SynchronousPort.respondAsSource( + request: ByteArray, + transform: suspend Source.() -> R, +): R = respond(request) { + //suspend until the response is fully read + coroutineScope { + val buffer = Buffer() + val collectJob = onEach { buffer.write(it) }.launchIn(this) + val res = transform(buffer) + //cancel collection when the result is achieved + collectJob.cancel() + res + } +} + private class SynchronousOverAsynchronousPort( val port: AsynchronousPort, val mutex: Mutex, diff --git a/controls-core/src/commonMain/kotlin/space/kscience/controls/ports/ioExtensions.kt b/controls-core/src/commonMain/kotlin/space/kscience/controls/ports/ioExtensions.kt index dbe7256..5d7d3bf 100644 --- a/controls-core/src/commonMain/kotlin/space/kscience/controls/ports/ioExtensions.kt +++ b/controls-core/src/commonMain/kotlin/space/kscience/controls/ports/ioExtensions.kt @@ -1,5 +1,24 @@ package space.kscience.controls.ports +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import kotlinx.io.Buffer +import kotlinx.io.Source import space.kscience.dataforge.io.Binary -public fun Binary.readShort(position: Int): Short = read(position) { readShort() } \ No newline at end of file +public fun Binary.readShort(position: Int): Short = read(position) { readShort() } + +/** + * Consume given flow of [ByteArray] as [Source]. The subscription is canceled when [scope] is closed. + */ +public fun Flow.consumeAsSource(scope: CoroutineScope): Source { + val buffer = Buffer() + //subscription is canceled when the scope is canceled + onEach { + buffer.write(it) + }.launchIn(scope) + + return buffer +} \ No newline at end of file