Synchronous port response consumeAsSource

This commit is contained in:
Alexander Nozik 2024-05-28 09:51:30 +03:00
parent 05757aefdc
commit f72d7aa3fa
4 changed files with 48 additions and 21 deletions

View File

@ -13,6 +13,7 @@
- Refactored ports. Now we have `AsynchronousPort` as well as `SynchronousPort`
- `DeviceClient` now initializes property and action descriptors eagerly.
- `DeviceHub` now works with `Name` instead of `NameToken`. Tree-like structure is made using `Path`. Device messages no longer have access to sub-devices.
- Add some utility methods to ports. Synchronous port response could be now consumed as `Source`.
### Deprecated

View File

@ -3,10 +3,7 @@ package space.kscience.controls.ports
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.io.Buffer
import kotlinx.io.Source
import space.kscience.controls.api.AsynchronousSocket
import space.kscience.dataforge.context.*
@ -26,15 +23,7 @@ public interface AsynchronousPort : ContextAware, AsynchronousSocket<ByteArray>
* [scope] controls the consummation.
* If the scope is canceled, the source stops producing.
*/
public fun AsynchronousPort.receiveAsSource(scope: CoroutineScope): Source {
val buffer = Buffer()
subscribe().onEach {
buffer.write(it)
}.launchIn(scope)
return buffer
}
public fun AsynchronousPort.receiveAsSource(scope: CoroutineScope): Source = subscribe().consumeAsSource(scope)
/**
@ -51,13 +40,13 @@ public abstract class AbstractAsynchronousPort(
CoroutineScope(
coroutineContext +
SupervisorJob(coroutineContext[Job]) +
CoroutineExceptionHandler { _, throwable -> logger.error(throwable) { throwable.stackTraceToString() } } +
CoroutineExceptionHandler { _, throwable -> logger.error(throwable) { "Asynchronous port error: " + throwable.stackTraceToString() } } +
CoroutineName(toString())
)
}
private val outgoing = Channel<ByteArray>(meta["outgoing.capacity"].int?:100)
private val incoming = Channel<ByteArray>(meta["incoming.capacity"].int?:100)
private val outgoing = Channel<ByteArray>(meta["outgoing.capacity"].int ?: 100)
private val incoming = Channel<ByteArray>(meta["incoming.capacity"].int ?: 100)
/**
* Internal method to synchronously send data
@ -100,7 +89,7 @@ public abstract class AbstractAsynchronousPort(
* Send a data packet via the port
*/
override suspend fun send(data: ByteArray) {
check(isOpen){"The port is not opened"}
check(isOpen) { "The port is not opened" }
outgoing.send(data)
}
@ -117,7 +106,7 @@ public abstract class AbstractAsynchronousPort(
sendJob?.cancel()
}
override fun toString(): String = meta["name"].string?:"ChannelPort[${hashCode().toString(16)}]"
override fun toString(): String = meta["name"].string ?: "ChannelPort[${hashCode().toString(16)}]"
}
/**

View File

@ -1,11 +1,11 @@
package space.kscience.controls.ports
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.io.Buffer
import kotlinx.io.Source
import kotlinx.io.readByteArray
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.ContextAware
@ -46,6 +46,24 @@ public interface SynchronousPort : ContextAware, AutoCloseable {
}
}
/**
* Read response to a given message using [Source] abstraction
*/
public suspend fun <R> SynchronousPort.respondAsSource(
request: ByteArray,
transform: suspend Source.() -> R,
): R = respond(request) {
//suspend until the response is fully read
coroutineScope {
val buffer = Buffer()
val collectJob = onEach { buffer.write(it) }.launchIn(this)
val res = transform(buffer)
//cancel collection when the result is achieved
collectJob.cancel()
res
}
}
private class SynchronousOverAsynchronousPort(
val port: AsynchronousPort,
val mutex: Mutex,

View File

@ -1,5 +1,24 @@
package space.kscience.controls.ports
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.io.Buffer
import kotlinx.io.Source
import space.kscience.dataforge.io.Binary
public fun Binary.readShort(position: Int): Short = read(position) { readShort() }
public fun Binary.readShort(position: Int): Short = read(position) { readShort() }
/**
* Consume given flow of [ByteArray] as [Source]. The subscription is canceled when [scope] is closed.
*/
public fun Flow<ByteArray>.consumeAsSource(scope: CoroutineScope): Source {
val buffer = Buffer()
//subscription is canceled when the scope is canceled
onEach {
buffer.write(it)
}.launchIn(scope)
return buffer
}