Compare commits

..

No commits in common. "58675f72f598aac1fbcdfbadfad0c61d286c5947" and "d91296c47d73358b3d5e0ebcb018240902168cf7" have entirely different histories.

28 changed files with 542 additions and 1031 deletions

View File

@ -8,7 +8,6 @@
### Changed
- Constructor properties return `DeviceStat` in order to be able to subscribe to them
- Refactored ports. Now we have `AsynchronousPort` as well as `SynchronousPort`
### Deprecated

View File

@ -1,40 +0,0 @@
package space.kscience.controls.api
import kotlinx.coroutines.flow.Flow
/**
* A generic bidirectional asynchronous sender/receiver object
*/
public interface AsynchronousSocket<T> : AutoCloseable {
/**
* Send an object to the socket
*/
public suspend fun send(data: T)
/**
* Flow of objects received from socket
*/
public fun subscribe(): Flow<T>
/**
* Start socket operation
*/
public fun open()
/**
* Check if this socket is open
*/
public val isOpen: Boolean
}
/**
* Connect an input to this socket.
* Multiple inputs could be connected to the same [AsynchronousSocket].
*
* This method suspends indefinitely, so it should be started in a separate coroutine.
*/
public suspend fun <T> AsynchronousSocket<T>.sendFlow(flow: Flow<T>) {
flow.collect { send(it) }
}

View File

@ -0,0 +1,32 @@
package space.kscience.controls.api
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.launch
/**
* A generic bidirectional sender/receiver object
*/
public interface Socket<T> : AutoCloseable {
/**
* Send an object to the socket
*/
public suspend fun send(data: T)
/**
* Flow of objects received from socket
*/
public fun receiving(): Flow<T>
public fun isOpen(): Boolean
}
/**
* Connect an input to this socket using designated [scope] for it and return a handler [Job].
* Multiple inputs could be connected to the same [Socket].
*/
public fun <T> Socket<T>.connectInput(scope: CoroutineScope, flow: Flow<T>): Job = scope.launch {
flow.collect { send(it) }
}

View File

@ -1,125 +0,0 @@
package space.kscience.controls.ports
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.io.Buffer
import kotlinx.io.Source
import space.kscience.controls.api.AsynchronousSocket
import space.kscience.dataforge.context.*
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.int
import space.kscience.dataforge.meta.string
import kotlin.coroutines.CoroutineContext
/**
* Raw [ByteArray] port
*/
public interface AsynchronousPort : ContextAware, AsynchronousSocket<ByteArray>
/**
* Capture [AsynchronousPort] output as kotlinx-io [Source].
* [scope] controls the consummation.
* If the scope is canceled, the source stops producing.
*/
public fun AsynchronousPort.receiveAsSource(scope: CoroutineScope): Source {
val buffer = Buffer()
subscribe().onEach {
buffer.write(it)
}.launchIn(scope)
return buffer
}
/**
* Common abstraction for [AsynchronousPort] based on [Channel]
*/
public abstract class AbstractAsynchronousPort(
override val context: Context,
public val meta: Meta,
coroutineContext: CoroutineContext = context.coroutineContext,
) : AsynchronousPort {
protected val scope: CoroutineScope by lazy {
CoroutineScope(
coroutineContext +
SupervisorJob(coroutineContext[Job]) +
CoroutineExceptionHandler { _, throwable -> logger.error(throwable) { throwable.stackTraceToString() } } +
CoroutineName(toString())
)
}
private val outgoing = Channel<ByteArray>(meta["outgoing.capacity"].int?:100)
private val incoming = Channel<ByteArray>(meta["incoming.capacity"].int?:100)
/**
* Internal method to synchronously send data
*/
protected abstract suspend fun write(data: ByteArray)
/**
* Internal method to receive data synchronously
*/
protected suspend fun receive(data: ByteArray) {
logger.debug { "$this RECEIVED: ${data.decodeToString()}" }
incoming.send(data)
}
private var sendJob: Job? = null
protected abstract fun onOpen()
final override fun open() {
if (!isOpen) {
sendJob = scope.launch {
for (data in outgoing) {
try {
write(data)
logger.debug { "${this@AbstractAsynchronousPort} SENT: ${data.decodeToString()}" }
} catch (ex: Exception) {
if (ex is CancellationException) throw ex
logger.error(ex) { "Error while writing data to the port" }
}
}
}
onOpen()
} else {
logger.warn { "$this already opened" }
}
}
/**
* Send a data packet via the port
*/
override suspend fun send(data: ByteArray) {
outgoing.send(data)
}
/**
* Raw flow of incoming data chunks. The chunks are not guaranteed to be complete phrases.
* To form phrases, some condition should be used on top of it.
* For example [stringsDelimitedIncoming] generates phrases with fixed delimiter.
*/
override fun subscribe(): Flow<ByteArray> = incoming.receiveAsFlow()
override fun close() {
outgoing.close()
incoming.close()
sendJob?.cancel()
}
override fun toString(): String = meta["name"].string?:"ChannelPort[${hashCode().toString(16)}]"
}
/**
* Send UTF-8 encoded string
*/
public suspend fun AsynchronousPort.send(string: String): Unit = send(string.encodeToByteArray())

View File

@ -0,0 +1,100 @@
package space.kscience.controls.ports
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.receiveAsFlow
import space.kscience.controls.api.Socket
import space.kscience.dataforge.context.*
import space.kscience.dataforge.misc.DfType
import kotlin.coroutines.CoroutineContext
/**
* Raw [ByteArray] port
*/
public interface Port : ContextAware, Socket<ByteArray>
/**
* A specialized factory for [Port]
*/
@DfType(PortFactory.TYPE)
public interface PortFactory : Factory<Port> {
public val type: String
public companion object {
public const val TYPE: String = "controls.port"
}
}
/**
* Common abstraction for [Port] based on [Channel]
*/
public abstract class AbstractPort(
override val context: Context,
coroutineContext: CoroutineContext = context.coroutineContext,
) : Port {
protected val scope: CoroutineScope = CoroutineScope(coroutineContext + SupervisorJob(coroutineContext[Job]))
private val outgoing = Channel<ByteArray>(100)
private val incoming = Channel<ByteArray>(100)
init {
scope.coroutineContext[Job]?.invokeOnCompletion {
close()
}
}
/**
* Internal method to synchronously send data
*/
protected abstract suspend fun write(data: ByteArray)
/**
* Internal method to receive data synchronously
*/
protected suspend fun receive(data: ByteArray) {
logger.debug { "${this@AbstractPort} RECEIVED: ${data.decodeToString()}" }
incoming.send(data)
}
private val sendJob = scope.launch {
for (data in outgoing) {
try {
write(data)
logger.debug { "${this@AbstractPort} SENT: ${data.decodeToString()}" }
} catch (ex: Exception) {
if (ex is CancellationException) throw ex
logger.error(ex) { "Error while writing data to the port" }
}
}
}
/**
* Send a data packet via the port
*/
override suspend fun send(data: ByteArray) {
outgoing.send(data)
}
/**
* Raw flow of incoming data chunks. The chunks are not guaranteed to be complete phrases.
* To form phrases, some condition should be used on top of it.
* For example [stringsDelimitedIncoming] generates phrases with fixed delimiter.
*/
override fun receiving(): Flow<ByteArray> = incoming.receiveAsFlow()
override fun close() {
outgoing.close()
incoming.close()
scope.cancel()
}
override fun isOpen(): Boolean = scope.isActive
}
/**
* Send UTF-8 encoded string
*/
public suspend fun Port.send(string: String): Unit = send(string.encodeToByteArray())

View File

@ -0,0 +1,64 @@
package space.kscience.controls.ports
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import space.kscience.dataforge.context.*
/**
* A port that could be closed multiple times and opens automatically on request
*/
public class PortProxy(override val context: Context = Global, public val factory: suspend () -> Port) : Port, ContextAware {
private var actualPort: Port? = null
private val mutex: Mutex = Mutex()
private suspend fun port(): Port {
return mutex.withLock {
if (actualPort?.isOpen() == true) {
actualPort!!
} else {
factory().also {
actualPort = it
}
}
}
}
override suspend fun send(data: ByteArray) {
port().send(data)
}
@OptIn(ExperimentalCoroutinesApi::class)
override fun receiving(): Flow<ByteArray> = flow {
while (true) {
try {
//recreate port and Flow on connection problems
port().receiving().collect {
emit(it)
}
} catch (t: Throwable) {
logger.warn{"Port read failed: ${t.message}. Reconnecting."}
mutex.withLock {
actualPort?.close()
actualPort = null
}
}
}
}
// open by default
override fun isOpen(): Boolean = true
override fun close() {
context.launch {
mutex.withLock {
actualPort?.close()
actualPort = null
}
}
}
}

View File

@ -11,43 +11,26 @@ public class Ports : AbstractPlugin() {
override val tag: PluginTag get() = Companion.tag
private val synchronousPortFactories by lazy {
context.gather<Factory<SynchronousPort>>(SYNCHRONOUS_PORT_TYPE)
private val portFactories by lazy {
context.gather<PortFactory>(PortFactory.TYPE)
}
private val asynchronousPortFactories by lazy {
context.gather<Factory<AsynchronousPort>>(ASYNCHRONOUS_PORT_TYPE)
}
private val portCache = mutableMapOf<Meta, Port>()
/**
* Create a new [AsynchronousPort] according to specification
* Create a new [Port] according to specification
*/
public fun buildAsynchronousPort(meta: Meta): AsynchronousPort {
public fun buildPort(meta: Meta): Port = portCache.getOrPut(meta) {
val type by meta.string { error("Port type is not defined") }
val factory = asynchronousPortFactories.entries
.firstOrNull { it.key.toString() == type }?.value
val factory = portFactories.values.firstOrNull { it.type == type }
?: error("Port factory for type $type not found")
return factory.build(context, meta)
}
/**
* Create a [SynchronousPort] according to specification or wrap an asynchronous implementation
*/
public fun buildSynchronousPort(meta: Meta): SynchronousPort {
val type by meta.string { error("Port type is not defined") }
val factory = synchronousPortFactories.entries
.firstOrNull { it.key.toString() == type }?.value
?: return buildAsynchronousPort(meta).asSynchronousPort()
return factory.build(context, meta)
factory.build(context, meta)
}
public companion object : PluginFactory<Ports> {
override val tag: PluginTag = PluginTag("controls.ports", group = PluginTag.DATAFORGE_GROUP)
public const val ASYNCHRONOUS_PORT_TYPE: String = "controls.asynchronousPort"
public const val SYNCHRONOUS_PORT_TYPE: String = "controls.synchronousPort"
override fun build(context: Context, meta: Meta): Ports = Ports()
}

View File

@ -2,86 +2,27 @@ package space.kscience.controls.ports
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.io.Buffer
import kotlinx.io.readByteArray
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.ContextAware
/**
* A port handler for synchronous (request-response) communication with a port.
* Only one request could be active at a time (others are suspended).
* A port handler for synchronous (request-response) communication with a port. Only one request could be active at a time (others are suspended.
* The handler does not guarantee exclusive access to the port so the user mush ensure that no other controller handles port at the moment.
*/
public interface SynchronousPort : ContextAware, AutoCloseable {
public fun open()
public val isOpen: Boolean
public class SynchronousPort(public val port: Port, private val mutex: Mutex) : Port by port {
/**
* Send a single message and wait for the flow of response chunks.
* The consumer is responsible for calling a terminal operation on the flow.
* Send a single message and wait for the flow of respond messages.
*/
public suspend fun <R> respond(
request: ByteArray,
transform: suspend Flow<ByteArray>.() -> R,
): R
/**
* Synchronously read fixed size response to a given [request]. Discard additional response bytes.
*/
public suspend fun respondFixedMessageSize(
request: ByteArray,
responseSize: Int,
): ByteArray = respond(request) {
val buffer = Buffer()
takeWhile {
buffer.size < responseSize
}.collect {
buffer.write(it)
}
buffer.readByteArray(responseSize)
public suspend fun <R> respond(data: ByteArray, transform: suspend Flow<ByteArray>.() -> R): R = mutex.withLock {
port.send(data)
transform(port.receiving())
}
}
private class SynchronousOverAsynchronousPort(
val port: AsynchronousPort,
val mutex: Mutex,
) : SynchronousPort {
override val context: Context get() = port.context
override fun open() {
if (!port.isOpen) port.open()
}
override val isOpen: Boolean get() = port.isOpen
override fun close() {
if (port.isOpen) port.close()
}
override suspend fun <R> respond(
request: ByteArray,
transform: suspend Flow<ByteArray>.() -> R,
): R = mutex.withLock {
port.send(request)
transform(port.subscribe())
}
}
/**
* Provide a synchronous wrapper for an asynchronous port.
* Optionally provide external [mutex] for operation synchronization.
*
* If the [AsynchronousPort] is called directly, it could violate [SynchronousPort] contract
* of only one request running simultaneously.
* Provide a synchronous wrapper for a port
*/
public fun AsynchronousPort.asSynchronousPort(mutex: Mutex = Mutex()): SynchronousPort =
SynchronousOverAsynchronousPort(this, mutex)
public fun Port.synchronous(mutex: Mutex = Mutex()): SynchronousPort = SynchronousPort(this, mutex)
/**
* Send request and read incoming data blocks until the delimiter is encountered

View File

@ -77,9 +77,9 @@ public fun Flow<ByteArray>.withStringDelimiter(delimiter: String): Flow<String>
/**
* A flow of delimited phrases
*/
public fun AsynchronousPort.delimitedIncoming(delimiter: ByteArray): Flow<ByteArray> = subscribe().withDelimiter(delimiter)
public fun Port.delimitedIncoming(delimiter: ByteArray): Flow<ByteArray> = receiving().withDelimiter(delimiter)
/**
* A flow of delimited phrases with string content
*/
public fun AsynchronousPort.stringsDelimitedIncoming(delimiter: String): Flow<String> = subscribe().withStringDelimiter(delimiter)
public fun Port.stringsDelimitedIncoming(delimiter: String): Flow<String> = receiving().withStringDelimiter(delimiter)

View File

@ -1,7 +1,10 @@
package space.kscience.controls.ports
import kotlinx.coroutines.*
import space.kscience.dataforge.context.*
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.error
import space.kscience.dataforge.context.info
import space.kscience.dataforge.context.logger
import space.kscience.dataforge.meta.*
import java.net.InetSocketAddress
import java.nio.ByteBuffer
@ -11,10 +14,7 @@ import java.nio.channels.DatagramChannel
import java.nio.channels.SocketChannel
import kotlin.coroutines.CoroutineContext
/**
* Copy the contents of this buffer to an array
*/
public fun ByteBuffer.copyToArray(limit: Int = limit()): ByteArray {
public fun ByteBuffer.toArray(limit: Int = limit()): ByteArray {
rewind()
val response = ByteArray(limit)
get(response)
@ -27,40 +27,35 @@ public fun ByteBuffer.copyToArray(limit: Int = limit()): ByteArray {
*/
public class ChannelPort(
context: Context,
meta: Meta,
coroutineContext: CoroutineContext = context.coroutineContext,
channelBuilder: suspend () -> ByteChannel,
) : AbstractAsynchronousPort(context, meta, coroutineContext), AutoCloseable {
) : AbstractPort(context, coroutineContext), AutoCloseable {
private val futureChannel: Deferred<ByteChannel> = scope.async(Dispatchers.IO) {
channelBuilder()
}
/**
* A handler to await port connection
*/
private val futureChannel: Deferred<ByteChannel> = scope.async(Dispatchers.IO, start = CoroutineStart.LAZY) {
channelBuilder()
}
public val startJob: Job get() = futureChannel
private var listenerJob: Job? = null
override val isOpen: Boolean get() = listenerJob?.isActive == true
override fun onOpen() {
listenerJob = scope.launch(Dispatchers.IO) {
val channel = futureChannel.await()
val buffer = ByteBuffer.allocate(1024)
while (isActive && channel.isOpen) {
try {
val num = channel.read(buffer)
if (num > 0) {
receive(buffer.copyToArray(num))
}
if (num < 0) cancel("The input channel is exhausted")
} catch (ex: Exception) {
if (ex is AsynchronousCloseException) {
logger.info { "Channel $channel closed" }
} else {
logger.error(ex) { "Channel read error, retrying in 1 second" }
delay(1000)
}
private val listenerJob = scope.launch(Dispatchers.IO) {
val channel = futureChannel.await()
val buffer = ByteBuffer.allocate(1024)
while (isActive && channel.isOpen) {
try {
val num = channel.read(buffer)
if (num > 0) {
receive(buffer.toArray(num))
}
if (num < 0) cancel("The input channel is exhausted")
} catch (ex: Exception) {
if (ex is AsynchronousCloseException) {
logger.info { "Channel $channel closed" }
} else {
logger.error(ex) { "Channel read error, retrying in 1 second" }
delay(1000)
}
}
}
@ -72,7 +67,6 @@ public class ChannelPort(
@OptIn(ExperimentalCoroutinesApi::class)
override fun close() {
listenerJob?.cancel()
if (futureChannel.isCompleted) {
futureChannel.getCompleted().close()
}
@ -81,95 +75,61 @@ public class ChannelPort(
}
/**
* A [Factory] for TCP connections
* A [PortFactory] for TCP connections
*/
public object TcpPort : Factory<AsynchronousPort> {
public object TcpPort : PortFactory {
public fun build(
context: Context,
host: String,
port: Int,
coroutineContext: CoroutineContext = context.coroutineContext,
): ChannelPort {
val meta = Meta {
"name" put "tcp://$host:$port"
"type" put "tcp"
"host" put host
"port" put port
}
return ChannelPort(context, meta, coroutineContext) {
SocketChannel.open(InetSocketAddress(host, port))
}
}
override val type: String = "tcp"
/**
* Create and open TCP port
*/
public fun open(
context: Context,
host: String,
port: Int,
coroutineContext: CoroutineContext = context.coroutineContext,
): ChannelPort = build(context, host, port, coroutineContext).apply { open() }
): ChannelPort = ChannelPort(context, coroutineContext) {
SocketChannel.open(InetSocketAddress(host, port))
}
override fun build(context: Context, meta: Meta): ChannelPort {
val host = meta["host"].string ?: "localhost"
val port = meta["port"].int ?: error("Port value for TCP port is not defined in $meta")
return build(context, host, port)
return open(context, host, port)
}
}
/**
* A [Factory] for UDP connections
* A [PortFactory] for UDP connections
*/
public object UdpPort : Factory<AsynchronousPort> {
public object UdpPort : PortFactory {
public fun build(
context: Context,
remoteHost: String,
remotePort: Int,
localPort: Int? = null,
localHost: String? = null,
coroutineContext: CoroutineContext = context.coroutineContext,
): ChannelPort {
val meta = Meta {
"name" put "udp://$remoteHost:$remotePort"
"type" put "udp"
"remoteHost" put remoteHost
"remotePort" put remotePort
localHost?.let { "localHost" put it }
localPort?.let { "localPort" put it }
}
return ChannelPort(context, meta, coroutineContext) {
DatagramChannel.open().apply {
//bind the channel to a local port to receive messages
localPort?.let { bind(InetSocketAddress(localHost ?: "localhost", it)) }
//connect to remote port to send messages
connect(InetSocketAddress(remoteHost, remotePort.toInt()))
context.logger.info { "Connected to UDP $remotePort on $remoteHost" }
}
}
}
override val type: String = "udp"
/**
* Connect a datagram channel to a remote host/port. If [localPort] is provided, it is used to bind local port for receiving messages.
*/
public fun open(
public fun openChannel(
context: Context,
remoteHost: String,
remotePort: Int,
localPort: Int? = null,
localHost: String = "localhost",
): ChannelPort = build(context, remoteHost, remotePort, localPort, localHost).apply { open() }
coroutineContext: CoroutineContext = context.coroutineContext,
): ChannelPort = ChannelPort(context, coroutineContext) {
DatagramChannel.open().apply {
//bind the channel to a local port to receive messages
localPort?.let { bind(InetSocketAddress(localHost, localPort)) }
//connect to remote port to send messages
connect(InetSocketAddress(remoteHost, remotePort))
context.logger.info { "Connected to UDP $remotePort on $remoteHost" }
}
}
override fun build(context: Context, meta: Meta): ChannelPort {
val remoteHost by meta.string { error("Remote host is not specified") }
val remotePort by meta.number { error("Remote port is not specified") }
val localHost: String? by meta.string()
val localPort: Int? by meta.int()
return build(context, remoteHost, remotePort.toInt(), localPort, localHost)
return openChannel(context, remoteHost, remotePort.toInt(), localPort, localHost ?: "localhost")
}
}

View File

@ -6,7 +6,7 @@ import space.kscience.dataforge.context.PluginFactory
import space.kscience.dataforge.context.PluginTag
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName
import space.kscience.dataforge.names.parseAsName
/**
* A plugin for loading JVM nio-based ports
@ -17,9 +17,9 @@ public class JvmPortsPlugin : AbstractPlugin() {
override val tag: PluginTag get() = Companion.tag
override fun content(target: String): Map<Name, Any> = when(target){
Ports.ASYNCHRONOUS_PORT_TYPE -> mapOf(
"tcp".asName() to TcpPort,
"udp".asName() to UdpPort
PortFactory.TYPE -> mapOf(
TcpPort.type.parseAsName() to TcpPort,
UdpPort.type.parseAsName() to UdpPort
)
else -> emptyMap()
}

View File

@ -1,8 +1,10 @@
package space.kscience.controls.ports
import kotlinx.coroutines.*
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.meta.Meta
import java.net.DatagramPacket
import java.net.DatagramSocket
import kotlin.coroutines.CoroutineContext
@ -12,39 +14,33 @@ import kotlin.coroutines.CoroutineContext
*/
public class UdpSocketPort(
override val context: Context,
meta: Meta,
private val socket: DatagramSocket,
coroutineContext: CoroutineContext = context.coroutineContext,
) : AbstractAsynchronousPort(context, meta, coroutineContext) {
) : AbstractPort(context, coroutineContext) {
private var listenerJob: Job? = null
private val listenerJob = context.launch(Dispatchers.IO) {
while (isActive) {
val buf = ByteArray(socket.receiveBufferSize)
override fun onOpen() {
listenerJob = context.launch(Dispatchers.IO) {
while (isActive) {
val buf = ByteArray(socket.receiveBufferSize)
val packet = DatagramPacket(
buf,
buf.size,
)
socket.receive(packet)
val packet = DatagramPacket(
buf,
buf.size,
)
socket.receive(packet)
val bytes = packet.data.copyOfRange(
packet.offset,
packet.offset + packet.length
)
receive(bytes)
}
val bytes = packet.data.copyOfRange(
packet.offset,
packet.offset + packet.length
)
receive(bytes)
}
}
override fun close() {
listenerJob?.cancel()
super.close()
listenerJob.cancel()
}
override val isOpen: Boolean get() = listenerJob?.isActive == true
override fun isOpen(): Boolean = listenerJob.isActive
override suspend fun write(data: ByteArray): Unit = withContext(Dispatchers.IO) {

View File

@ -12,7 +12,7 @@ import space.kscience.dataforge.context.Global
import kotlin.test.assertEquals
internal class AsynchronousPortIOTest {
internal class PortIOTest {
@Test
fun testDelimiteredByteArrayFlow() {
@ -29,8 +29,8 @@ internal class AsynchronousPortIOTest {
@Test
fun testUdpCommunication() = runTest {
val receiver = UdpPort.open(Global, "localhost", 8811, localPort = 8812)
val sender = UdpPort.open(Global, "localhost", 8812, localPort = 8811)
val receiver = UdpPort.openChannel(Global, "localhost", 8811, localPort = 8812)
val sender = UdpPort.openChannel(Global, "localhost", 8812, localPort = 8811)
delay(30)
repeat(10) {
@ -38,7 +38,7 @@ internal class AsynchronousPortIOTest {
}
val res = receiver
.subscribe()
.receiving()
.withStringDelimiter("\n")
.take(10)
.toList()

View File

@ -1,95 +0,0 @@
package space.kscience.controls.pi
import com.pi4j.io.serial.Baud
import com.pi4j.io.serial.Serial
import com.pi4j.io.serial.SerialConfigBuilder
import com.pi4j.ktx.io.serial
import kotlinx.coroutines.*
import space.kscience.controls.ports.AbstractAsynchronousPort
import space.kscience.controls.ports.AsynchronousPort
import space.kscience.controls.ports.copyToArray
import space.kscience.dataforge.context.*
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.enum
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.string
import java.nio.ByteBuffer
import kotlin.coroutines.CoroutineContext
public class AsynchronousPiPort(
context: Context,
meta: Meta,
private val serial: Serial,
coroutineContext: CoroutineContext = context.coroutineContext,
) : AbstractAsynchronousPort(context, meta, coroutineContext) {
private var listenerJob: Job? = null
override fun onOpen() {
serial.open()
listenerJob = this.scope.launch(Dispatchers.IO) {
val buffer = ByteBuffer.allocate(1024)
while (isActive) {
try {
val num = serial.read(buffer)
if (num > 0) {
receive(buffer.copyToArray(num))
}
if (num < 0) cancel("The input channel is exhausted")
} catch (ex: Exception) {
logger.error(ex) { "Channel read error" }
delay(1000)
}
}
}
}
override suspend fun write(data: ByteArray): Unit = withContext(Dispatchers.IO) {
serial.write(data)
}
override val isOpen: Boolean get() = listenerJob?.isActive == true
override fun close() {
listenerJob?.cancel()
serial.close()
}
public companion object : Factory<AsynchronousPort> {
public fun build(
context: Context,
device: String,
block: SerialConfigBuilder.() -> Unit,
): AsynchronousPiPort {
val meta = Meta {
"name" put "pi://$device"
"type" put "serial"
}
val pi = context.request(PiPlugin)
val serial = pi.piContext.serial(device, block)
return AsynchronousPiPort(context, meta, serial)
}
public fun open(
context: Context,
device: String,
block: SerialConfigBuilder.() -> Unit,
): AsynchronousPiPort = build(context, device, block).apply { open() }
override fun build(context: Context, meta: Meta): AsynchronousPort {
val device: String = meta["device"].string ?: error("Device name not defined")
val baudRate: Baud = meta["baudRate"].enum<Baud>() ?: Baud._9600
val pi = context.request(PiPlugin)
val serial = pi.piContext.serial(device) {
baud8N1(baudRate)
}
return AsynchronousPiPort(context, meta, serial)
}
}
}

View File

@ -2,6 +2,7 @@ package space.kscience.controls.pi
import com.pi4j.Pi4J
import space.kscience.controls.manager.DeviceManager
import space.kscience.controls.ports.PortFactory
import space.kscience.controls.ports.Ports
import space.kscience.dataforge.context.AbstractPlugin
import space.kscience.dataforge.context.Context
@ -9,7 +10,7 @@ import space.kscience.dataforge.context.PluginFactory
import space.kscience.dataforge.context.PluginTag
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName
import space.kscience.dataforge.names.parseAsName
import com.pi4j.context.Context as PiContext
public class PiPlugin : AbstractPlugin() {
@ -21,11 +22,8 @@ public class PiPlugin : AbstractPlugin() {
public val piContext: PiContext by lazy { createPiContext(context, meta) }
override fun content(target: String): Map<Name, Any> = when (target) {
Ports.ASYNCHRONOUS_PORT_TYPE -> mapOf(
"serial".asName() to AsynchronousPiPort,
)
Ports.SYNCHRONOUS_PORT_TYPE -> mapOf(
"serial".asName() to SynchronousPiPort,
PortFactory.TYPE -> mapOf(
PiSerialPort.type.parseAsName() to PiSerialPort,
)
else -> super.content(target)
@ -42,7 +40,6 @@ public class PiPlugin : AbstractPlugin() {
override fun build(context: Context, meta: Meta): PiPlugin = PiPlugin()
@Suppress("UNUSED_PARAMETER")
public fun createPiContext(context: Context, meta: Meta): PiContext = Pi4J.newAutoContext()
}

View File

@ -0,0 +1,82 @@
package space.kscience.controls.pi
import com.pi4j.io.serial.Baud
import com.pi4j.io.serial.Serial
import com.pi4j.io.serial.SerialConfigBuilder
import com.pi4j.ktx.io.serial
import kotlinx.coroutines.*
import space.kscience.controls.ports.AbstractPort
import space.kscience.controls.ports.Port
import space.kscience.controls.ports.PortFactory
import space.kscience.controls.ports.toArray
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.error
import space.kscience.dataforge.context.logger
import space.kscience.dataforge.context.request
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.enum
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.string
import java.nio.ByteBuffer
import kotlin.coroutines.CoroutineContext
import com.pi4j.context.Context as PiContext
public class PiSerialPort(
context: Context,
coroutineContext: CoroutineContext = context.coroutineContext,
public val serialBuilder: PiContext.() -> Serial,
) : AbstractPort(context, coroutineContext) {
private val serial: Serial by lazy {
val pi = context.request(PiPlugin)
pi.piContext.serialBuilder()
}
private val listenerJob = this.scope.launch(Dispatchers.IO) {
val buffer = ByteBuffer.allocate(1024)
while (isActive) {
try {
val num = serial.read(buffer)
if (num > 0) {
receive(buffer.toArray(num))
}
if (num < 0) cancel("The input channel is exhausted")
} catch (ex: Exception) {
logger.error(ex) { "Channel read error" }
delay(1000)
}
}
}
override suspend fun write(data: ByteArray): Unit = withContext(Dispatchers.IO) {
serial.write(data)
}
override fun close() {
listenerJob.cancel()
serial.close()
}
public companion object : PortFactory {
override val type: String get() = "pi"
public fun open(
context: Context,
device: String,
block: SerialConfigBuilder.() -> Unit,
): PiSerialPort = PiSerialPort(context) {
serial(device, block)
}
override fun build(context: Context, meta: Meta): Port = PiSerialPort(context) {
val device: String = meta["device"].string ?: error("Device name not defined")
val baudRate: Baud = meta["baudRate"].enum<Baud>() ?: Baud._9600
serial(device) {
baud8N1(baudRate)
}
}
}
}

View File

@ -1,107 +0,0 @@
package space.kscience.controls.pi
import com.pi4j.io.serial.Baud
import com.pi4j.io.serial.Serial
import com.pi4j.io.serial.SerialConfigBuilder
import com.pi4j.ktx.io.serial
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runInterruptible
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import space.kscience.controls.ports.SynchronousPort
import space.kscience.controls.ports.copyToArray
import space.kscience.dataforge.context.*
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.enum
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.string
import java.nio.ByteBuffer
public class SynchronousPiPort(
override val context: Context,
public val meta: Meta,
private val serial: Serial,
private val mutex: Mutex = Mutex(),
) : SynchronousPort {
private val pi = context.request(PiPlugin)
override fun open() {
serial.open()
}
override val isOpen: Boolean get() = serial.isOpen
override suspend fun <R> respond(
request: ByteArray,
transform: suspend Flow<ByteArray>.() -> R,
): R = mutex.withLock {
serial.drain()
serial.write(request)
flow<ByteArray> {
val buffer = ByteBuffer.allocate(1024)
while (isOpen) {
try {
val num = serial.read(buffer)
if (num > 0) {
emit(buffer.copyToArray(num))
}
if (num < 0) break
} catch (ex: Exception) {
logger.error(ex) { "Channel read error" }
delay(1000)
}
}
}.transform()
}
override suspend fun respondFixedMessageSize(request: ByteArray, responseSize: Int): ByteArray = mutex.withLock {
runInterruptible {
serial.drain()
serial.write(request)
serial.readNBytes(responseSize)
}
}
override fun close() {
serial.close()
}
public companion object : Factory<SynchronousPort> {
public fun build(
context: Context,
device: String,
block: SerialConfigBuilder.() -> Unit,
): SynchronousPiPort {
val meta = Meta {
"name" put "pi://$device"
"type" put "serial"
}
val pi = context.request(PiPlugin)
val serial = pi.piContext.serial(device, block)
return SynchronousPiPort(context, meta, serial)
}
public fun open(
context: Context,
device: String,
block: SerialConfigBuilder.() -> Unit,
): SynchronousPiPort = build(context, device, block).apply { open() }
override fun build(context: Context, meta: Meta): SynchronousPiPort {
val device: String = meta["device"].string ?: error("Device name not defined")
val baudRate: Baud = meta["baudRate"].enum<Baud>() ?: Baud._9600
val pi = context.request(PiPlugin)
val serial = pi.piContext.serial(device) {
baud8N1(baudRate)
}
return SynchronousPiPort(context, meta, serial)
}
}
}

View File

@ -13,7 +13,7 @@ public class KtorPortsPlugin : AbstractPlugin() {
override val tag: PluginTag get() = Companion.tag
override fun content(target: String): Map<Name, Any> = when (target) {
Ports.ASYNCHRONOUS_PORT_TYPE -> mapOf("tcp".asName() to KtorTcpPort, "udp".asName() to KtorUdpPort)
PortFactory.TYPE -> mapOf("tcp".asName() to KtorTcpPort, "udp".asName() to KtorUdpPort)
else -> emptyMap()
}

View File

@ -8,46 +8,42 @@ import io.ktor.network.sockets.openWriteChannel
import io.ktor.utils.io.consumeEachBufferRange
import io.ktor.utils.io.core.Closeable
import io.ktor.utils.io.writeAvailable
import kotlinx.coroutines.*
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Factory
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.int
import space.kscience.dataforge.meta.string
import java.nio.ByteBuffer
import kotlin.coroutines.CoroutineContext
public class KtorTcpPort internal constructor(
context: Context,
meta: Meta,
public val host: String,
public val port: Int,
coroutineContext: CoroutineContext = context.coroutineContext,
socketOptions: SocketOptions.TCPClientSocketOptions.() -> Unit = {},
) : AbstractAsynchronousPort(context, meta, coroutineContext), Closeable {
socketOptions: SocketOptions.TCPClientSocketOptions.() -> Unit = {}
) : AbstractPort(context, coroutineContext), Closeable {
override fun toString(): String = "port[tcp:$host:$port]"
private val futureSocket = scope.async(Dispatchers.IO, start = CoroutineStart.LAZY) {
private val futureSocket = scope.async {
aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().connect(host, port, socketOptions)
}
private val writeChannel = scope.async(Dispatchers.IO, start = CoroutineStart.LAZY) {
private val writeChannel = scope.async {
futureSocket.await().openWriteChannel(true)
}
private var listenerJob: Job? = null
override fun onOpen() {
listenerJob = scope.launch {
val input = futureSocket.await().openReadChannel()
input.consumeEachBufferRange { buffer: ByteBuffer, last ->
val array = ByteArray(buffer.remaining())
buffer.get(array)
receive(array)
!last && isActive
}
private val listenerJob = scope.launch {
val input = futureSocket.await().openReadChannel()
input.consumeEachBufferRange { buffer, _ ->
val array = ByteArray(buffer.remaining())
buffer.get(array)
receive(array)
isActive
}
}
@ -55,45 +51,30 @@ public class KtorTcpPort internal constructor(
writeChannel.await().writeAvailable(data)
}
override val isOpen: Boolean
get() = listenerJob?.isActive == true
override fun close() {
listenerJob?.cancel()
listenerJob.cancel()
futureSocket.cancel()
super.close()
}
public companion object : Factory<AsynchronousPort> {
public companion object : PortFactory {
public fun build(
context: Context,
host: String,
port: Int,
coroutineContext: CoroutineContext = context.coroutineContext,
socketOptions: SocketOptions.TCPClientSocketOptions.() -> Unit = {},
): KtorTcpPort {
val meta = Meta {
"name" put "tcp://$host:$port"
"type" put "tcp"
"host" put host
"port" put port
}
return KtorTcpPort(context, meta, host, port, coroutineContext, socketOptions)
}
override val type: String = "tcp"
public fun open(
context: Context,
host: String,
port: Int,
coroutineContext: CoroutineContext = context.coroutineContext,
socketOptions: SocketOptions.TCPClientSocketOptions.() -> Unit = {},
): KtorTcpPort = build(context, host, port, coroutineContext, socketOptions).apply { open() }
socketOptions: SocketOptions.TCPClientSocketOptions.() -> Unit = {}
): KtorTcpPort {
return KtorTcpPort(context, host, port, coroutineContext, socketOptions)
}
override fun build(context: Context, meta: Meta): AsynchronousPort {
override fun build(context: Context, meta: Meta): Port {
val host = meta["host"].string ?: "localhost"
val port = meta["port"].int ?: error("Port value for TCP port is not defined in $meta")
return build(context, host, port)
return open(context, host, port)
}
}
}

View File

@ -8,7 +8,6 @@ import io.ktor.utils.io.core.Closeable
import io.ktor.utils.io.writeAvailable
import kotlinx.coroutines.*
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Factory
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.int
import space.kscience.dataforge.meta.number
@ -17,18 +16,17 @@ import kotlin.coroutines.CoroutineContext
public class KtorUdpPort internal constructor(
context: Context,
meta: Meta,
public val remoteHost: String,
public val remotePort: Int,
public val localPort: Int? = null,
public val localHost: String = "localhost",
coroutineContext: CoroutineContext = context.coroutineContext,
socketOptions: SocketOptions.UDPSocketOptions.() -> Unit = {},
) : AbstractAsynchronousPort(context, meta, coroutineContext), Closeable {
socketOptions: SocketOptions.UDPSocketOptions.() -> Unit = {}
) : AbstractPort(context, coroutineContext), Closeable {
override fun toString(): String = "port[udp:$remoteHost:$remotePort]"
private val futureSocket = scope.async(Dispatchers.IO, start = CoroutineStart.LAZY) {
private val futureSocket = scope.async {
aSocket(ActorSelectorManager(Dispatchers.IO)).udp().connect(
remoteAddress = InetSocketAddress(remoteHost, remotePort),
localAddress = localPort?.let { InetSocketAddress(localHost, localPort) },
@ -36,21 +34,17 @@ public class KtorUdpPort internal constructor(
)
}
private val writeChannel: Deferred<ByteWriteChannel> = scope.async(Dispatchers.IO, start = CoroutineStart.LAZY) {
private val writeChannel: Deferred<ByteWriteChannel> = scope.async {
futureSocket.await().openWriteChannel(true)
}
private var listenerJob: Job? = null
override fun onOpen() {
listenerJob = scope.launch {
val input = futureSocket.await().openReadChannel()
input.consumeEachBufferRange { buffer, last ->
val array = ByteArray(buffer.remaining())
buffer.get(array)
receive(array)
!last && isActive
}
private val listenerJob = scope.launch {
val input = futureSocket.await().openReadChannel()
input.consumeEachBufferRange { buffer, _ ->
val array = ByteArray(buffer.remaining())
buffer.get(array)
receive(array)
isActive
}
}
@ -58,49 +52,16 @@ public class KtorUdpPort internal constructor(
writeChannel.await().writeAvailable(data)
}
override val isOpen: Boolean
get() = listenerJob?.isActive == true
override fun close() {
listenerJob?.cancel()
listenerJob.cancel()
futureSocket.cancel()
super.close()
}
public companion object : Factory<AsynchronousPort> {
public companion object : PortFactory {
public fun build(
context: Context,
remoteHost: String,
remotePort: Int,
localPort: Int? = null,
localHost: String? = null,
coroutineContext: CoroutineContext = context.coroutineContext,
socketOptions: SocketOptions.UDPSocketOptions.() -> Unit = {},
): KtorUdpPort {
val meta = Meta {
"name" put "udp://$remoteHost:$remotePort"
"type" put "udp"
"remoteHost" put remoteHost
"remotePort" put remotePort
localHost?.let { "localHost" put it }
localPort?.let { "localPort" put it }
}
return KtorUdpPort(
context = context,
meta = meta,
remoteHost = remoteHost,
remotePort = remotePort,
localPort = localPort,
localHost = localHost ?: "localhost",
coroutineContext = coroutineContext,
socketOptions = socketOptions
)
}
override val type: String = "udp"
/**
* Create and open UDP port
*/
public fun open(
context: Context,
remoteHost: String,
@ -108,23 +69,23 @@ public class KtorUdpPort internal constructor(
localPort: Int? = null,
localHost: String = "localhost",
coroutineContext: CoroutineContext = context.coroutineContext,
socketOptions: SocketOptions.UDPSocketOptions.() -> Unit = {},
): KtorUdpPort = build(
context,
remoteHost,
remotePort,
localPort,
localHost,
coroutineContext,
socketOptions
).apply { open() }
socketOptions: SocketOptions.UDPSocketOptions.() -> Unit = {}
): KtorUdpPort = KtorUdpPort(
context = context,
remoteHost = remoteHost,
remotePort = remotePort,
localPort = localPort,
localHost = localHost,
coroutineContext = coroutineContext,
socketOptions = socketOptions
)
override fun build(context: Context, meta: Meta): AsynchronousPort {
override fun build(context: Context, meta: Meta): Port {
val remoteHost by meta.string { error("Remote host is not specified") }
val remotePort by meta.number { error("Remote port is not specified") }
val localHost: String? by meta.string()
val localPort: Int? by meta.int()
return build(context, remoteHost, remotePort.toInt(), localPort, localHost ?: "localhost")
return open(context, remoteHost, remotePort.toInt(), localPort, localHost ?: "localhost")
}
}
}

View File

@ -1,134 +0,0 @@
package space.kscience.controls.serial
import com.fazecast.jSerialComm.SerialPort
import com.fazecast.jSerialComm.SerialPortDataListener
import com.fazecast.jSerialComm.SerialPortEvent
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import space.kscience.controls.ports.AbstractAsynchronousPort
import space.kscience.controls.ports.AsynchronousPort
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Factory
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.int
import space.kscience.dataforge.meta.string
import kotlin.coroutines.CoroutineContext
/**
* A port based on JSerialComm
*/
public class AsynchronousSerialPort(
context: Context,
meta: Meta,
private val comPort: SerialPort,
coroutineContext: CoroutineContext = context.coroutineContext,
) : AbstractAsynchronousPort(context, meta, coroutineContext) {
override fun toString(): String = "port[${comPort.descriptivePortName}]"
private val serialPortListener = object : SerialPortDataListener {
override fun getListeningEvents(): Int =
SerialPort.LISTENING_EVENT_DATA_RECEIVED and SerialPort.LISTENING_EVENT_DATA_AVAILABLE
override fun serialEvent(event: SerialPortEvent) {
when (event.eventType) {
SerialPort.LISTENING_EVENT_DATA_RECEIVED -> {
scope.launch { receive(event.receivedData) }
}
SerialPort.LISTENING_EVENT_DATA_AVAILABLE -> {
scope.launch(Dispatchers.IO) {
val available = comPort.bytesAvailable()
if (available > 0) {
val buffer = ByteArray(available)
comPort.readBytes(buffer, available)
receive(buffer)
}
}
}
}
}
}
override fun onOpen() {
comPort.openPort()
comPort.addDataListener(serialPortListener)
}
override val isOpen: Boolean get() = comPort.isOpen
override suspend fun write(data: ByteArray) {
comPort.writeBytes(data, data.size)
}
override fun close() {
comPort.removeDataListener()
if (comPort.isOpen) {
comPort.closePort()
}
super.close()
}
public companion object : Factory<AsynchronousPort> {
public fun build(
context: Context,
portName: String,
baudRate: Int = 9600,
dataBits: Int = 8,
stopBits: Int = SerialPort.ONE_STOP_BIT,
parity: Int = SerialPort.NO_PARITY,
coroutineContext: CoroutineContext = context.coroutineContext,
additionalConfig: SerialPort.() -> Unit = {},
): AsynchronousSerialPort {
val serialPort = SerialPort.getCommPort(portName).apply {
setComPortParameters(baudRate, dataBits, stopBits, parity)
additionalConfig()
}
val meta = Meta {
"name" put "com://$portName"
"type" put "serial"
"baudRate" put serialPort.baudRate
"dataBits" put serialPort.numDataBits
"stopBits" put serialPort.numStopBits
"parity" put serialPort.parity
}
return AsynchronousSerialPort(context, meta, serialPort, coroutineContext)
}
/**
* Construct ComPort with given parameters
*/
public fun open(
context: Context,
portName: String,
baudRate: Int = 9600,
dataBits: Int = 8,
stopBits: Int = SerialPort.ONE_STOP_BIT,
parity: Int = SerialPort.NO_PARITY,
coroutineContext: CoroutineContext = context.coroutineContext,
additionalConfig: SerialPort.() -> Unit = {},
): AsynchronousSerialPort = build(
context = context,
portName = portName,
baudRate = baudRate,
dataBits = dataBits,
stopBits = stopBits,
parity = parity,
coroutineContext = coroutineContext,
additionalConfig = additionalConfig
).apply { open() }
override fun build(context: Context, meta: Meta): AsynchronousPort {
val name by meta.string { error("Serial port name not defined") }
val baudRate by meta.int(9600)
val dataBits by meta.int(8)
val stopBits by meta.int(SerialPort.ONE_STOP_BIT)
val parity by meta.int(SerialPort.NO_PARITY)
return build(context, name, baudRate, dataBits, stopBits, parity)
}
}
}

View File

@ -0,0 +1,87 @@
package space.kscience.controls.serial
import com.fazecast.jSerialComm.SerialPort
import com.fazecast.jSerialComm.SerialPortDataListener
import com.fazecast.jSerialComm.SerialPortEvent
import kotlinx.coroutines.launch
import space.kscience.controls.ports.AbstractPort
import space.kscience.controls.ports.Port
import space.kscience.controls.ports.PortFactory
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.int
import space.kscience.dataforge.meta.string
import kotlin.coroutines.CoroutineContext
/**
* A port based on JSerialComm
*/
public class JSerialCommPort(
context: Context,
private val comPort: SerialPort,
coroutineContext: CoroutineContext = context.coroutineContext,
) : AbstractPort(context, coroutineContext) {
override fun toString(): String = "port[${comPort.descriptivePortName}]"
private val serialPortListener = object : SerialPortDataListener {
override fun getListeningEvents(): Int = SerialPort.LISTENING_EVENT_DATA_AVAILABLE
override fun serialEvent(event: SerialPortEvent) {
if (event.eventType == SerialPort.LISTENING_EVENT_DATA_AVAILABLE && event.receivedData != null) {
scope.launch { receive(event.receivedData) }
}
}
}
init {
comPort.addDataListener(serialPortListener)
}
override suspend fun write(data: ByteArray) {
comPort.writeBytes(data, data.size)
}
override fun close() {
comPort.removeDataListener()
if (comPort.isOpen) {
comPort.closePort()
}
super.close()
}
public companion object : PortFactory {
override val type: String = "com"
/**
* Construct ComPort with given parameters
*/
public fun open(
context: Context,
portName: String,
baudRate: Int = 9600,
dataBits: Int = 8,
stopBits: Int = SerialPort.ONE_STOP_BIT,
parity: Int = SerialPort.NO_PARITY,
coroutineContext: CoroutineContext = context.coroutineContext,
): JSerialCommPort {
val serialPort = SerialPort.getCommPort(portName).apply {
setComPortParameters(baudRate, dataBits, stopBits, parity)
openPort()
}
return JSerialCommPort(context, serialPort, coroutineContext)
}
override fun build(context: Context, meta: Meta): Port {
val name by meta.string { error("Serial port name not defined") }
val baudRate by meta.int(9600)
val dataBits by meta.int(8)
val stopBits by meta.int(SerialPort.ONE_STOP_BIT)
val parity by meta.int(SerialPort.NO_PARITY)
return open(context, name, baudRate, dataBits, stopBits, parity)
}
}
}

View File

@ -1,27 +1,19 @@
package space.kscience.controls.serial
import space.kscience.controls.ports.Ports
import space.kscience.controls.ports.PortFactory
import space.kscience.dataforge.context.AbstractPlugin
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.PluginFactory
import space.kscience.dataforge.context.PluginTag
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName
public class SerialPortPlugin : AbstractPlugin() {
override val tag: PluginTag get() = Companion.tag
override fun content(target: String): Map<Name, Any> = when (target) {
Ports.ASYNCHRONOUS_PORT_TYPE -> mapOf(
"serial".asName() to AsynchronousSerialPort,
)
Ports.SYNCHRONOUS_PORT_TYPE -> mapOf(
"serial".asName() to SynchronousSerialPort,
)
override fun content(target: String): Map<Name, Any> = when(target){
PortFactory.TYPE -> mapOf(Name.EMPTY to JSerialCommPort)
else -> emptyMap()
}

View File

@ -1,140 +0,0 @@
package space.kscience.controls.serial
import com.fazecast.jSerialComm.SerialPort
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runInterruptible
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import space.kscience.controls.ports.SynchronousPort
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Factory
import space.kscience.dataforge.context.error
import space.kscience.dataforge.context.logger
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.int
import space.kscience.dataforge.meta.string
/**
* A port based on JSerialComm
*/
public class SynchronousSerialPort(
override val context: Context,
public val meta: Meta,
private val comPort: SerialPort,
) : SynchronousPort {
override fun toString(): String = "port[${comPort.descriptivePortName}]"
override fun open() {
if (!isOpen) {
comPort.openPort()
}
}
override val isOpen: Boolean get() = comPort.isOpen
override fun close() {
if (comPort.isOpen) {
comPort.closePort()
}
}
private val mutex = Mutex()
override suspend fun <R> respond(
request: ByteArray,
transform: suspend Flow<ByteArray>.() -> R,
): R = mutex.withLock {
comPort.flushIOBuffers()
comPort.writeBytes(request, request.size)
flow<ByteArray> {
while (isOpen) {
try {
val available = comPort.bytesAvailable()
if (available > 0) {
val buffer = ByteArray(available)
comPort.readBytes(buffer, available)
emit(buffer)
} else if (available < 0) break
} catch (ex: Exception) {
logger.error(ex) { "Channel read error" }
delay(1000)
}
}
}.transform()
}
override suspend fun respondFixedMessageSize(request: ByteArray, responseSize: Int): ByteArray = mutex.withLock {
runInterruptible {
comPort.flushIOBuffers()
comPort.writeBytes(request, request.size)
val buffer = ByteArray(responseSize)
comPort.readBytes(buffer, responseSize)
buffer
}
}
public companion object : Factory<SynchronousPort> {
public fun build(
context: Context,
portName: String,
baudRate: Int = 9600,
dataBits: Int = 8,
stopBits: Int = SerialPort.ONE_STOP_BIT,
parity: Int = SerialPort.NO_PARITY,
additionalConfig: SerialPort.() -> Unit = {},
): SynchronousSerialPort {
val serialPort = SerialPort.getCommPort(portName).apply {
setComPortParameters(baudRate, dataBits, stopBits, parity)
additionalConfig()
}
val meta = Meta {
"name" put "com://$portName"
"type" put "serial"
"baudRate" put serialPort.baudRate
"dataBits" put serialPort.numDataBits
"stopBits" put serialPort.numStopBits
"parity" put serialPort.parity
}
return SynchronousSerialPort(context, meta, serialPort)
}
/**
* Construct ComPort with given parameters
*/
public fun open(
context: Context,
portName: String,
baudRate: Int = 9600,
dataBits: Int = 8,
stopBits: Int = SerialPort.ONE_STOP_BIT,
parity: Int = SerialPort.NO_PARITY,
additionalConfig: SerialPort.() -> Unit = {},
): SynchronousSerialPort = build(
context = context,
portName = portName,
baudRate = baudRate,
dataBits = dataBits,
stopBits = stopBits,
parity = parity,
additionalConfig = additionalConfig
).apply { open() }
override fun build(context: Context, meta: Meta): SynchronousPort {
val name by meta.string { error("Serial port name not defined") }
val baudRate by meta.int(9600)
val dataBits by meta.int(8)
val stopBits by meta.int(SerialPort.ONE_STOP_BIT)
val parity by meta.int(SerialPort.NO_PARITY)
return build(context, name, baudRate, dataBits, stopBits, parity)
}
}
}

View File

@ -4,6 +4,7 @@ import kotlinx.coroutines.withTimeoutOrNull
import space.kscience.controls.ports.Ports
import space.kscience.controls.ports.SynchronousPort
import space.kscience.controls.ports.respondStringWithDelimiter
import space.kscience.controls.ports.synchronous
import space.kscience.controls.spec.*
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Factory
@ -21,7 +22,7 @@ class MksPdr900Device(context: Context, meta: Meta) : DeviceBySpec<MksPdr900Devi
private val portDelegate = lazy {
val ports = context.request(Ports)
ports.buildSynchronousPort(meta["port"] ?: error("Port is not defined in device configuration"))
ports.buildPort(meta["port"] ?: error("Port is not defined in device configuration")).synchronous()
}
private val port: SynchronousPort by portDelegate

View File

@ -25,10 +25,10 @@ import kotlin.time.Duration.Companion.milliseconds
class PiMotionMasterDevice(
context: Context,
private val portFactory: Factory<AsynchronousPort> = KtorTcpPort,
private val portFactory: PortFactory = KtorTcpPort,
) : DeviceBySpec<PiMotionMasterDevice>(PiMotionMasterDevice, context), DeviceHub {
private var port: AsynchronousPort? = null
private var port: Port? = null
//TODO make proxy work
//PortProxy { portFactory(address ?: error("The device is not connected"), context) }
@ -83,7 +83,7 @@ class PiMotionMasterDevice(
suspend fun getErrorCode(): Int = mutex.withLock {
withTimeout(timeoutValue) {
sendCommandInternal("ERR?")
val errorString = port?.subscribe()?.withStringDelimiter("\n")?.first() ?: error("Not connected to device")
val errorString = port?.receiving()?.withStringDelimiter("\n")?.first() ?: error("Not connected to device")
errorString.trim().toInt()
}
}
@ -96,7 +96,7 @@ class PiMotionMasterDevice(
try {
withTimeout(timeoutValue) {
sendCommandInternal(command, *arguments)
val phrases = port?.subscribe()?.withStringDelimiter("\n") ?: error("Not connected to device")
val phrases = port?.receiving()?.withStringDelimiter("\n") ?: error("Not connected to device")
phrases.transformWhile { line ->
emit(line)
line.endsWith(" \n")

View File

@ -5,15 +5,14 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import space.kscience.controls.api.AsynchronousSocket
import space.kscience.controls.ports.AbstractAsynchronousPort
import space.kscience.controls.api.Socket
import space.kscience.controls.ports.AbstractPort
import space.kscience.controls.ports.withDelimiter
import space.kscience.dataforge.context.*
import space.kscience.dataforge.meta.Meta
import kotlin.math.abs
import kotlin.time.Duration
abstract class VirtualDevice(val scope: CoroutineScope) : AsynchronousSocket<ByteArray> {
abstract class VirtualDevice(val scope: CoroutineScope) : Socket<ByteArray> {
protected abstract suspend fun evaluateRequest(request: ByteArray)
@ -41,42 +40,33 @@ abstract class VirtualDevice(val scope: CoroutineScope) : AsynchronousSocket<Byt
toRespond.send(response)
}
override fun subscribe(): Flow<ByteArray> = toRespond.receiveAsFlow()
override fun receiving(): Flow<ByteArray> = toRespond.receiveAsFlow()
protected fun respondInFuture(delay: Duration, response: suspend () -> ByteArray): Job = scope.launch {
delay(delay)
respond(response())
}
override val isOpen: Boolean
get() = scope.isActive
override fun isOpen(): Boolean = scope.isActive
override fun close() = scope.cancel()
}
class VirtualPort(private val device: VirtualDevice, context: Context) : AbstractAsynchronousPort(context, Meta.EMPTY) {
class VirtualPort(private val device: VirtualDevice, context: Context) : AbstractPort(context) {
private var respondJob: Job? = null
override fun onOpen() {
respondJob = device.subscribe().onEach {
receive(it)
}.catch {
it.printStackTrace()
}.launchIn(scope)
}
private val respondJob = device.receiving().onEach {
receive(it)
}.catch {
it.printStackTrace()
}.launchIn(scope)
override suspend fun write(data: ByteArray) {
device.send(data)
}
override val isOpen: Boolean
get() = respondJob?.isActive == true
override fun close() {
respondJob?.cancel()
respondJob.cancel()
super.close()
}
}
@ -88,7 +78,7 @@ class PiMotionMasterVirtualDevice(
scope: CoroutineScope = context,
) : VirtualDevice(scope), ContextAware {
override fun open() {
init {
//add asynchronous send logic here
}
@ -112,11 +102,9 @@ class PiMotionMasterVirtualDevice(
abs(distance) < proposedStep -> {
position = targetPosition
}
targetPosition > position -> {
position += proposedStep
}
else -> {
position -= proposedStep
}
@ -192,10 +180,8 @@ class PiMotionMasterVirtualDevice(
when (command) {
"XXX" -> {
}
"IDN?", "*IDN?" -> respond("(c)2015 Physik Instrumente(PI) Karlsruhe, C-885.M1 TCP-IP Master,0,1.0.0.1")
"VER?" -> respond(
"""
"VER?" -> respond("""
2: (c)2017 Physik Instrumente (PI) GmbH & Co. KG, C-663.12C885, 018550039, 00.039
3: (c)2017 Physik Instrumente (PI) GmbH & Co. KG, C-663.12C885, 018550040, 00.039
4: (c)2017 Physik Instrumente (PI) GmbH & Co. KG, C-663.12C885, 018550041, 00.039
@ -209,11 +195,8 @@ class PiMotionMasterVirtualDevice(
12: (c)2017 Physik Instrumente (PI) GmbH & Co. KG, C-663.12C885, 018550049, 00.039
13: (c)2017 Physik Instrumente (PI) GmbH & Co. KG, C-663.12C885, 018550051, 00.039
FW_ARM: V1.0.0.1
""".trimIndent()
)
"HLP?" -> respond(
"""
""".trimIndent())
"HLP?" -> respond("""
The following commands are valid:
#4 Request Status Register
#5 Request Motion Status
@ -252,14 +235,11 @@ class PiMotionMasterVirtualDevice(
VEL? [{<AxisID>}] Get Closed-Loop Velocity
VER? Get Versions Of Firmware And Drivers
end of help
""".trimIndent()
)
""".trimIndent())
"ERR?" -> {
respond(errorCode.toString())
errorCode = 0
}
"SAI?" -> respond(axisState.keys.joinToString(separator = " \n"))
"CST?" -> respondForAllAxis(axisIds) { "L-220.20SG" }
"RON?" -> respondForAllAxis(axisIds) { referenceMode }
@ -275,19 +255,15 @@ class PiMotionMasterVirtualDevice(
"SVO" -> doForEachAxis(parts) { key, value ->
axisState[key]?.servoMode = value.toInt()
}
"MOV" -> doForEachAxis(parts) { key, value ->
axisState[key]?.targetPosition = value.toDouble()
}
"VEL" -> doForEachAxis(parts) { key, value ->
axisState[key]?.velocity = value.toDouble()
}
"INI" -> {
logger.info { "Axes initialized!" }
}
else -> {
logger.warn { "Unknown command: $command in message ${String(request)}" }
errorCode = 2

View File

@ -29,7 +29,7 @@ fun Context.launchPiDebugServer(port: Int, axes: List<String>): Job = launch(exc
val output = socket.openWriteChannel()
val sendJob = launch {
virtualDevice.subscribe().collect {
virtualDevice.receiving().collect {
//println("Sending: ${it.decodeToString()}")
output.writeAvailable(it)
output.flush()