Refactor ports

This commit is contained in:
Alexander Nozik 2024-03-31 16:33:22 +03:00
parent 85c2910ee9
commit 58675f72f5
3 changed files with 58 additions and 17 deletions

View File

@ -2,8 +2,11 @@ package space.kscience.controls.ports
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.sync.withLock
import kotlinx.io.Buffer
import kotlinx.io.readByteArray
import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.ContextAware import space.kscience.dataforge.context.ContextAware
@ -25,6 +28,22 @@ public interface SynchronousPort : ContextAware, AutoCloseable {
request: ByteArray, request: ByteArray,
transform: suspend Flow<ByteArray>.() -> R, transform: suspend Flow<ByteArray>.() -> R,
): R ): R
/**
* Synchronously read fixed size response to a given [request]. Discard additional response bytes.
*/
public suspend fun respondFixedMessageSize(
request: ByteArray,
responseSize: Int,
): ByteArray = respond(request) {
val buffer = Buffer()
takeWhile {
buffer.size < responseSize
}.collect {
buffer.write(it)
}
buffer.readByteArray(responseSize)
}
} }
private class SynchronousOverAsynchronousPort( private class SynchronousOverAsynchronousPort(

View File

@ -7,6 +7,7 @@ import com.pi4j.ktx.io.serial
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runInterruptible
import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.sync.withLock
import space.kscience.controls.ports.SynchronousPort import space.kscience.controls.ports.SynchronousPort
@ -55,6 +56,14 @@ public class SynchronousPiPort(
}.transform() }.transform()
} }
override suspend fun respondFixedMessageSize(request: ByteArray, responseSize: Int): ByteArray = mutex.withLock {
runInterruptible {
serial.drain()
serial.write(request)
serial.readNBytes(responseSize)
}
}
override fun close() { override fun close() {
serial.close() serial.close()
} }

View File

@ -4,6 +4,7 @@ import com.fazecast.jSerialComm.SerialPort
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runInterruptible
import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.sync.withLock
import space.kscience.controls.ports.SynchronousPort import space.kscience.controls.ports.SynchronousPort
@ -44,26 +45,38 @@ public class SynchronousSerialPort(
private val mutex = Mutex() private val mutex = Mutex()
override suspend fun <R> respond(request: ByteArray, transform: suspend Flow<ByteArray>.() -> R): R = override suspend fun <R> respond(
mutex.withLock { request: ByteArray,
transform: suspend Flow<ByteArray>.() -> R,
): R = mutex.withLock {
comPort.flushIOBuffers()
comPort.writeBytes(request, request.size)
flow<ByteArray> {
while (isOpen) {
try {
val available = comPort.bytesAvailable()
if (available > 0) {
val buffer = ByteArray(available)
comPort.readBytes(buffer, available)
emit(buffer)
} else if (available < 0) break
} catch (ex: Exception) {
logger.error(ex) { "Channel read error" }
delay(1000)
}
}
}.transform()
}
override suspend fun respondFixedMessageSize(request: ByteArray, responseSize: Int): ByteArray = mutex.withLock {
runInterruptible {
comPort.flushIOBuffers() comPort.flushIOBuffers()
comPort.writeBytes(request, request.size) comPort.writeBytes(request, request.size)
flow<ByteArray> { val buffer = ByteArray(responseSize)
while (isOpen) { comPort.readBytes(buffer, responseSize)
try { buffer
val available = comPort.bytesAvailable()
if (available > 0) {
val buffer = ByteArray(available)
comPort.readBytes(buffer, available)
emit(buffer)
} else if (available < 0) break
} catch (ex: Exception) {
logger.error(ex) { "Channel read error" }
delay(1000)
}
}
}.transform()
} }
}
public companion object : Factory<SynchronousPort> { public companion object : Factory<SynchronousPort> {