Refactor ports

This commit is contained in:
Alexander Nozik 2024-03-31 16:13:02 +03:00
parent d91296c47d
commit 85c2910ee9
28 changed files with 990 additions and 542 deletions

View File

@ -8,6 +8,7 @@
### Changed ### Changed
- Constructor properties return `DeviceStat` in order to be able to subscribe to them - Constructor properties return `DeviceStat` in order to be able to subscribe to them
- Refactored ports. Now we have `AsynchronousPort` as well as `SynchronousPort`
### Deprecated ### Deprecated

View File

@ -0,0 +1,40 @@
package space.kscience.controls.api
import kotlinx.coroutines.flow.Flow
/**
* A generic bidirectional asynchronous sender/receiver object
*/
public interface AsynchronousSocket<T> : AutoCloseable {
/**
* Send an object to the socket
*/
public suspend fun send(data: T)
/**
* Flow of objects received from socket
*/
public fun subscribe(): Flow<T>
/**
* Start socket operation
*/
public fun open()
/**
* Check if this socket is open
*/
public val isOpen: Boolean
}
/**
* Connect an input to this socket.
* Multiple inputs could be connected to the same [AsynchronousSocket].
*
* This method suspends indefinitely, so it should be started in a separate coroutine.
*/
public suspend fun <T> AsynchronousSocket<T>.sendFlow(flow: Flow<T>) {
flow.collect { send(it) }
}

View File

@ -1,32 +0,0 @@
package space.kscience.controls.api
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.launch
/**
* A generic bidirectional sender/receiver object
*/
public interface Socket<T> : AutoCloseable {
/**
* Send an object to the socket
*/
public suspend fun send(data: T)
/**
* Flow of objects received from socket
*/
public fun receiving(): Flow<T>
public fun isOpen(): Boolean
}
/**
* Connect an input to this socket using designated [scope] for it and return a handler [Job].
* Multiple inputs could be connected to the same [Socket].
*/
public fun <T> Socket<T>.connectInput(scope: CoroutineScope, flow: Flow<T>): Job = scope.launch {
flow.collect { send(it) }
}

View File

@ -0,0 +1,125 @@
package space.kscience.controls.ports
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.io.Buffer
import kotlinx.io.Source
import space.kscience.controls.api.AsynchronousSocket
import space.kscience.dataforge.context.*
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.int
import space.kscience.dataforge.meta.string
import kotlin.coroutines.CoroutineContext
/**
* Raw [ByteArray] port
*/
public interface AsynchronousPort : ContextAware, AsynchronousSocket<ByteArray>
/**
* Capture [AsynchronousPort] output as kotlinx-io [Source].
* [scope] controls the consummation.
* If the scope is canceled, the source stops producing.
*/
public fun AsynchronousPort.receiveAsSource(scope: CoroutineScope): Source {
val buffer = Buffer()
subscribe().onEach {
buffer.write(it)
}.launchIn(scope)
return buffer
}
/**
* Common abstraction for [AsynchronousPort] based on [Channel]
*/
public abstract class AbstractAsynchronousPort(
override val context: Context,
public val meta: Meta,
coroutineContext: CoroutineContext = context.coroutineContext,
) : AsynchronousPort {
protected val scope: CoroutineScope by lazy {
CoroutineScope(
coroutineContext +
SupervisorJob(coroutineContext[Job]) +
CoroutineExceptionHandler { _, throwable -> logger.error(throwable) { throwable.stackTraceToString() } } +
CoroutineName(toString())
)
}
private val outgoing = Channel<ByteArray>(meta["outgoing.capacity"].int?:100)
private val incoming = Channel<ByteArray>(meta["incoming.capacity"].int?:100)
/**
* Internal method to synchronously send data
*/
protected abstract suspend fun write(data: ByteArray)
/**
* Internal method to receive data synchronously
*/
protected suspend fun receive(data: ByteArray) {
logger.debug { "$this RECEIVED: ${data.decodeToString()}" }
incoming.send(data)
}
private var sendJob: Job? = null
protected abstract fun onOpen()
final override fun open() {
if (!isOpen) {
sendJob = scope.launch {
for (data in outgoing) {
try {
write(data)
logger.debug { "${this@AbstractAsynchronousPort} SENT: ${data.decodeToString()}" }
} catch (ex: Exception) {
if (ex is CancellationException) throw ex
logger.error(ex) { "Error while writing data to the port" }
}
}
}
onOpen()
} else {
logger.warn { "$this already opened" }
}
}
/**
* Send a data packet via the port
*/
override suspend fun send(data: ByteArray) {
outgoing.send(data)
}
/**
* Raw flow of incoming data chunks. The chunks are not guaranteed to be complete phrases.
* To form phrases, some condition should be used on top of it.
* For example [stringsDelimitedIncoming] generates phrases with fixed delimiter.
*/
override fun subscribe(): Flow<ByteArray> = incoming.receiveAsFlow()
override fun close() {
outgoing.close()
incoming.close()
sendJob?.cancel()
}
override fun toString(): String = meta["name"].string?:"ChannelPort[${hashCode().toString(16)}]"
}
/**
* Send UTF-8 encoded string
*/
public suspend fun AsynchronousPort.send(string: String): Unit = send(string.encodeToByteArray())

View File

@ -1,100 +0,0 @@
package space.kscience.controls.ports
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.receiveAsFlow
import space.kscience.controls.api.Socket
import space.kscience.dataforge.context.*
import space.kscience.dataforge.misc.DfType
import kotlin.coroutines.CoroutineContext
/**
* Raw [ByteArray] port
*/
public interface Port : ContextAware, Socket<ByteArray>
/**
* A specialized factory for [Port]
*/
@DfType(PortFactory.TYPE)
public interface PortFactory : Factory<Port> {
public val type: String
public companion object {
public const val TYPE: String = "controls.port"
}
}
/**
* Common abstraction for [Port] based on [Channel]
*/
public abstract class AbstractPort(
override val context: Context,
coroutineContext: CoroutineContext = context.coroutineContext,
) : Port {
protected val scope: CoroutineScope = CoroutineScope(coroutineContext + SupervisorJob(coroutineContext[Job]))
private val outgoing = Channel<ByteArray>(100)
private val incoming = Channel<ByteArray>(100)
init {
scope.coroutineContext[Job]?.invokeOnCompletion {
close()
}
}
/**
* Internal method to synchronously send data
*/
protected abstract suspend fun write(data: ByteArray)
/**
* Internal method to receive data synchronously
*/
protected suspend fun receive(data: ByteArray) {
logger.debug { "${this@AbstractPort} RECEIVED: ${data.decodeToString()}" }
incoming.send(data)
}
private val sendJob = scope.launch {
for (data in outgoing) {
try {
write(data)
logger.debug { "${this@AbstractPort} SENT: ${data.decodeToString()}" }
} catch (ex: Exception) {
if (ex is CancellationException) throw ex
logger.error(ex) { "Error while writing data to the port" }
}
}
}
/**
* Send a data packet via the port
*/
override suspend fun send(data: ByteArray) {
outgoing.send(data)
}
/**
* Raw flow of incoming data chunks. The chunks are not guaranteed to be complete phrases.
* To form phrases, some condition should be used on top of it.
* For example [stringsDelimitedIncoming] generates phrases with fixed delimiter.
*/
override fun receiving(): Flow<ByteArray> = incoming.receiveAsFlow()
override fun close() {
outgoing.close()
incoming.close()
scope.cancel()
}
override fun isOpen(): Boolean = scope.isActive
}
/**
* Send UTF-8 encoded string
*/
public suspend fun Port.send(string: String): Unit = send(string.encodeToByteArray())

View File

@ -1,64 +0,0 @@
package space.kscience.controls.ports
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import space.kscience.dataforge.context.*
/**
* A port that could be closed multiple times and opens automatically on request
*/
public class PortProxy(override val context: Context = Global, public val factory: suspend () -> Port) : Port, ContextAware {
private var actualPort: Port? = null
private val mutex: Mutex = Mutex()
private suspend fun port(): Port {
return mutex.withLock {
if (actualPort?.isOpen() == true) {
actualPort!!
} else {
factory().also {
actualPort = it
}
}
}
}
override suspend fun send(data: ByteArray) {
port().send(data)
}
@OptIn(ExperimentalCoroutinesApi::class)
override fun receiving(): Flow<ByteArray> = flow {
while (true) {
try {
//recreate port and Flow on connection problems
port().receiving().collect {
emit(it)
}
} catch (t: Throwable) {
logger.warn{"Port read failed: ${t.message}. Reconnecting."}
mutex.withLock {
actualPort?.close()
actualPort = null
}
}
}
}
// open by default
override fun isOpen(): Boolean = true
override fun close() {
context.launch {
mutex.withLock {
actualPort?.close()
actualPort = null
}
}
}
}

View File

@ -11,26 +11,43 @@ public class Ports : AbstractPlugin() {
override val tag: PluginTag get() = Companion.tag override val tag: PluginTag get() = Companion.tag
private val portFactories by lazy { private val synchronousPortFactories by lazy {
context.gather<PortFactory>(PortFactory.TYPE) context.gather<Factory<SynchronousPort>>(SYNCHRONOUS_PORT_TYPE)
} }
private val portCache = mutableMapOf<Meta, Port>() private val asynchronousPortFactories by lazy {
context.gather<Factory<AsynchronousPort>>(ASYNCHRONOUS_PORT_TYPE)
}
/** /**
* Create a new [Port] according to specification * Create a new [AsynchronousPort] according to specification
*/ */
public fun buildPort(meta: Meta): Port = portCache.getOrPut(meta) { public fun buildAsynchronousPort(meta: Meta): AsynchronousPort {
val type by meta.string { error("Port type is not defined") } val type by meta.string { error("Port type is not defined") }
val factory = portFactories.values.firstOrNull { it.type == type } val factory = asynchronousPortFactories.entries
.firstOrNull { it.key.toString() == type }?.value
?: error("Port factory for type $type not found") ?: error("Port factory for type $type not found")
factory.build(context, meta) return factory.build(context, meta)
}
/**
* Create a [SynchronousPort] according to specification or wrap an asynchronous implementation
*/
public fun buildSynchronousPort(meta: Meta): SynchronousPort {
val type by meta.string { error("Port type is not defined") }
val factory = synchronousPortFactories.entries
.firstOrNull { it.key.toString() == type }?.value
?: return buildAsynchronousPort(meta).asSynchronousPort()
return factory.build(context, meta)
} }
public companion object : PluginFactory<Ports> { public companion object : PluginFactory<Ports> {
override val tag: PluginTag = PluginTag("controls.ports", group = PluginTag.DATAFORGE_GROUP) override val tag: PluginTag = PluginTag("controls.ports", group = PluginTag.DATAFORGE_GROUP)
public const val ASYNCHRONOUS_PORT_TYPE: String = "controls.asynchronousPort"
public const val SYNCHRONOUS_PORT_TYPE: String = "controls.synchronousPort"
override fun build(context: Context, meta: Meta): Ports = Ports() override fun build(context: Context, meta: Meta): Ports = Ports()
} }

View File

@ -4,25 +4,65 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.first
import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.sync.withLock
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.ContextAware
/** /**
* A port handler for synchronous (request-response) communication with a port. Only one request could be active at a time (others are suspended. * A port handler for synchronous (request-response) communication with a port.
* The handler does not guarantee exclusive access to the port so the user mush ensure that no other controller handles port at the moment. * Only one request could be active at a time (others are suspended).
*/ */
public class SynchronousPort(public val port: Port, private val mutex: Mutex) : Port by port { public interface SynchronousPort : ContextAware, AutoCloseable {
public fun open()
public val isOpen: Boolean
/** /**
* Send a single message and wait for the flow of respond messages. * Send a single message and wait for the flow of response chunks.
* The consumer is responsible for calling a terminal operation on the flow.
*/ */
public suspend fun <R> respond(data: ByteArray, transform: suspend Flow<ByteArray>.() -> R): R = mutex.withLock { public suspend fun <R> respond(
port.send(data) request: ByteArray,
transform(port.receiving()) transform: suspend Flow<ByteArray>.() -> R,
): R
}
private class SynchronousOverAsynchronousPort(
val port: AsynchronousPort,
val mutex: Mutex,
) : SynchronousPort {
override val context: Context get() = port.context
override fun open() {
if (!port.isOpen) port.open()
}
override val isOpen: Boolean get() = port.isOpen
override fun close() {
if (port.isOpen) port.close()
}
override suspend fun <R> respond(
request: ByteArray,
transform: suspend Flow<ByteArray>.() -> R,
): R = mutex.withLock {
port.send(request)
transform(port.subscribe())
} }
} }
/** /**
* Provide a synchronous wrapper for a port * Provide a synchronous wrapper for an asynchronous port.
* Optionally provide external [mutex] for operation synchronization.
*
* If the [AsynchronousPort] is called directly, it could violate [SynchronousPort] contract
* of only one request running simultaneously.
*/ */
public fun Port.synchronous(mutex: Mutex = Mutex()): SynchronousPort = SynchronousPort(this, mutex) public fun AsynchronousPort.asSynchronousPort(mutex: Mutex = Mutex()): SynchronousPort =
SynchronousOverAsynchronousPort(this, mutex)
/** /**
* Send request and read incoming data blocks until the delimiter is encountered * Send request and read incoming data blocks until the delimiter is encountered

View File

@ -77,9 +77,9 @@ public fun Flow<ByteArray>.withStringDelimiter(delimiter: String): Flow<String>
/** /**
* A flow of delimited phrases * A flow of delimited phrases
*/ */
public fun Port.delimitedIncoming(delimiter: ByteArray): Flow<ByteArray> = receiving().withDelimiter(delimiter) public fun AsynchronousPort.delimitedIncoming(delimiter: ByteArray): Flow<ByteArray> = subscribe().withDelimiter(delimiter)
/** /**
* A flow of delimited phrases with string content * A flow of delimited phrases with string content
*/ */
public fun Port.stringsDelimitedIncoming(delimiter: String): Flow<String> = receiving().withStringDelimiter(delimiter) public fun AsynchronousPort.stringsDelimitedIncoming(delimiter: String): Flow<String> = subscribe().withStringDelimiter(delimiter)

View File

@ -1,10 +1,7 @@
package space.kscience.controls.ports package space.kscience.controls.ports
import kotlinx.coroutines.* import kotlinx.coroutines.*
import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.*
import space.kscience.dataforge.context.error
import space.kscience.dataforge.context.info
import space.kscience.dataforge.context.logger
import space.kscience.dataforge.meta.* import space.kscience.dataforge.meta.*
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.nio.ByteBuffer import java.nio.ByteBuffer
@ -14,7 +11,10 @@ import java.nio.channels.DatagramChannel
import java.nio.channels.SocketChannel import java.nio.channels.SocketChannel
import kotlin.coroutines.CoroutineContext import kotlin.coroutines.CoroutineContext
public fun ByteBuffer.toArray(limit: Int = limit()): ByteArray { /**
* Copy the contents of this buffer to an array
*/
public fun ByteBuffer.copyToArray(limit: Int = limit()): ByteArray {
rewind() rewind()
val response = ByteArray(limit) val response = ByteArray(limit)
get(response) get(response)
@ -27,27 +27,31 @@ public fun ByteBuffer.toArray(limit: Int = limit()): ByteArray {
*/ */
public class ChannelPort( public class ChannelPort(
context: Context, context: Context,
meta: Meta,
coroutineContext: CoroutineContext = context.coroutineContext, coroutineContext: CoroutineContext = context.coroutineContext,
channelBuilder: suspend () -> ByteChannel, channelBuilder: suspend () -> ByteChannel,
) : AbstractPort(context, coroutineContext), AutoCloseable { ) : AbstractAsynchronousPort(context, meta, coroutineContext), AutoCloseable {
private val futureChannel: Deferred<ByteChannel> = scope.async(Dispatchers.IO) {
channelBuilder()
}
/** /**
* A handler to await port connection * A handler to await port connection
*/ */
public val startJob: Job get() = futureChannel private val futureChannel: Deferred<ByteChannel> = scope.async(Dispatchers.IO, start = CoroutineStart.LAZY) {
channelBuilder()
}
private val listenerJob = scope.launch(Dispatchers.IO) { private var listenerJob: Job? = null
override val isOpen: Boolean get() = listenerJob?.isActive == true
override fun onOpen() {
listenerJob = scope.launch(Dispatchers.IO) {
val channel = futureChannel.await() val channel = futureChannel.await()
val buffer = ByteBuffer.allocate(1024) val buffer = ByteBuffer.allocate(1024)
while (isActive && channel.isOpen) { while (isActive && channel.isOpen) {
try { try {
val num = channel.read(buffer) val num = channel.read(buffer)
if (num > 0) { if (num > 0) {
receive(buffer.toArray(num)) receive(buffer.copyToArray(num))
} }
if (num < 0) cancel("The input channel is exhausted") if (num < 0) cancel("The input channel is exhausted")
} catch (ex: Exception) { } catch (ex: Exception) {
@ -60,6 +64,7 @@ public class ChannelPort(
} }
} }
} }
}
override suspend fun write(data: ByteArray): Unit = withContext(Dispatchers.IO) { override suspend fun write(data: ByteArray): Unit = withContext(Dispatchers.IO) {
futureChannel.await().write(ByteBuffer.wrap(data)) futureChannel.await().write(ByteBuffer.wrap(data))
@ -67,6 +72,7 @@ public class ChannelPort(
@OptIn(ExperimentalCoroutinesApi::class) @OptIn(ExperimentalCoroutinesApi::class)
override fun close() { override fun close() {
listenerJob?.cancel()
if (futureChannel.isCompleted) { if (futureChannel.isCompleted) {
futureChannel.getCompleted().close() futureChannel.getCompleted().close()
} }
@ -75,61 +81,95 @@ public class ChannelPort(
} }
/** /**
* A [PortFactory] for TCP connections * A [Factory] for TCP connections
*/ */
public object TcpPort : PortFactory { public object TcpPort : Factory<AsynchronousPort> {
override val type: String = "tcp" public fun build(
context: Context,
host: String,
port: Int,
coroutineContext: CoroutineContext = context.coroutineContext,
): ChannelPort {
val meta = Meta {
"name" put "tcp://$host:$port"
"type" put "tcp"
"host" put host
"port" put port
}
return ChannelPort(context, meta, coroutineContext) {
SocketChannel.open(InetSocketAddress(host, port))
}
}
/**
* Create and open TCP port
*/
public fun open( public fun open(
context: Context, context: Context,
host: String, host: String,
port: Int, port: Int,
coroutineContext: CoroutineContext = context.coroutineContext, coroutineContext: CoroutineContext = context.coroutineContext,
): ChannelPort = ChannelPort(context, coroutineContext) { ): ChannelPort = build(context, host, port, coroutineContext).apply { open() }
SocketChannel.open(InetSocketAddress(host, port))
}
override fun build(context: Context, meta: Meta): ChannelPort { override fun build(context: Context, meta: Meta): ChannelPort {
val host = meta["host"].string ?: "localhost" val host = meta["host"].string ?: "localhost"
val port = meta["port"].int ?: error("Port value for TCP port is not defined in $meta") val port = meta["port"].int ?: error("Port value for TCP port is not defined in $meta")
return open(context, host, port) return build(context, host, port)
} }
} }
/** /**
* A [PortFactory] for UDP connections * A [Factory] for UDP connections
*/ */
public object UdpPort : PortFactory { public object UdpPort : Factory<AsynchronousPort> {
override val type: String = "udp" public fun build(
context: Context,
remoteHost: String,
remotePort: Int,
localPort: Int? = null,
localHost: String? = null,
coroutineContext: CoroutineContext = context.coroutineContext,
): ChannelPort {
val meta = Meta {
"name" put "udp://$remoteHost:$remotePort"
"type" put "udp"
"remoteHost" put remoteHost
"remotePort" put remotePort
localHost?.let { "localHost" put it }
localPort?.let { "localPort" put it }
}
return ChannelPort(context, meta, coroutineContext) {
DatagramChannel.open().apply {
//bind the channel to a local port to receive messages
localPort?.let { bind(InetSocketAddress(localHost ?: "localhost", it)) }
//connect to remote port to send messages
connect(InetSocketAddress(remoteHost, remotePort.toInt()))
context.logger.info { "Connected to UDP $remotePort on $remoteHost" }
}
}
}
/** /**
* Connect a datagram channel to a remote host/port. If [localPort] is provided, it is used to bind local port for receiving messages. * Connect a datagram channel to a remote host/port. If [localPort] is provided, it is used to bind local port for receiving messages.
*/ */
public fun openChannel( public fun open(
context: Context, context: Context,
remoteHost: String, remoteHost: String,
remotePort: Int, remotePort: Int,
localPort: Int? = null, localPort: Int? = null,
localHost: String = "localhost", localHost: String = "localhost",
coroutineContext: CoroutineContext = context.coroutineContext, ): ChannelPort = build(context, remoteHost, remotePort, localPort, localHost).apply { open() }
): ChannelPort = ChannelPort(context, coroutineContext) {
DatagramChannel.open().apply {
//bind the channel to a local port to receive messages
localPort?.let { bind(InetSocketAddress(localHost, localPort)) }
//connect to remote port to send messages
connect(InetSocketAddress(remoteHost, remotePort))
context.logger.info { "Connected to UDP $remotePort on $remoteHost" }
}
}
override fun build(context: Context, meta: Meta): ChannelPort { override fun build(context: Context, meta: Meta): ChannelPort {
val remoteHost by meta.string { error("Remote host is not specified") } val remoteHost by meta.string { error("Remote host is not specified") }
val remotePort by meta.number { error("Remote port is not specified") } val remotePort by meta.number { error("Remote port is not specified") }
val localHost: String? by meta.string() val localHost: String? by meta.string()
val localPort: Int? by meta.int() val localPort: Int? by meta.int()
return openChannel(context, remoteHost, remotePort.toInt(), localPort, localHost ?: "localhost") return build(context, remoteHost, remotePort.toInt(), localPort, localHost)
} }
} }

View File

@ -6,7 +6,7 @@ import space.kscience.dataforge.context.PluginFactory
import space.kscience.dataforge.context.PluginTag import space.kscience.dataforge.context.PluginTag
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.parseAsName import space.kscience.dataforge.names.asName
/** /**
* A plugin for loading JVM nio-based ports * A plugin for loading JVM nio-based ports
@ -17,9 +17,9 @@ public class JvmPortsPlugin : AbstractPlugin() {
override val tag: PluginTag get() = Companion.tag override val tag: PluginTag get() = Companion.tag
override fun content(target: String): Map<Name, Any> = when(target){ override fun content(target: String): Map<Name, Any> = when(target){
PortFactory.TYPE -> mapOf( Ports.ASYNCHRONOUS_PORT_TYPE -> mapOf(
TcpPort.type.parseAsName() to TcpPort, "tcp".asName() to TcpPort,
UdpPort.type.parseAsName() to UdpPort "udp".asName() to UdpPort
) )
else -> emptyMap() else -> emptyMap()
} }

View File

@ -1,10 +1,8 @@
package space.kscience.controls.ports package space.kscience.controls.ports
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.*
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Context
import space.kscience.dataforge.meta.Meta
import java.net.DatagramPacket import java.net.DatagramPacket
import java.net.DatagramSocket import java.net.DatagramSocket
import kotlin.coroutines.CoroutineContext import kotlin.coroutines.CoroutineContext
@ -14,11 +12,15 @@ import kotlin.coroutines.CoroutineContext
*/ */
public class UdpSocketPort( public class UdpSocketPort(
override val context: Context, override val context: Context,
meta: Meta,
private val socket: DatagramSocket, private val socket: DatagramSocket,
coroutineContext: CoroutineContext = context.coroutineContext, coroutineContext: CoroutineContext = context.coroutineContext,
) : AbstractPort(context, coroutineContext) { ) : AbstractAsynchronousPort(context, meta, coroutineContext) {
private val listenerJob = context.launch(Dispatchers.IO) { private var listenerJob: Job? = null
override fun onOpen() {
listenerJob = context.launch(Dispatchers.IO) {
while (isActive) { while (isActive) {
val buf = ByteArray(socket.receiveBufferSize) val buf = ByteArray(socket.receiveBufferSize)
@ -35,12 +37,14 @@ public class UdpSocketPort(
receive(bytes) receive(bytes)
} }
} }
override fun close() {
listenerJob.cancel()
} }
override fun isOpen(): Boolean = listenerJob.isActive override fun close() {
listenerJob?.cancel()
super.close()
}
override val isOpen: Boolean get() = listenerJob?.isActive == true
override suspend fun write(data: ByteArray): Unit = withContext(Dispatchers.IO) { override suspend fun write(data: ByteArray): Unit = withContext(Dispatchers.IO) {

View File

@ -12,7 +12,7 @@ import space.kscience.dataforge.context.Global
import kotlin.test.assertEquals import kotlin.test.assertEquals
internal class PortIOTest { internal class AsynchronousPortIOTest {
@Test @Test
fun testDelimiteredByteArrayFlow() { fun testDelimiteredByteArrayFlow() {
@ -29,8 +29,8 @@ internal class PortIOTest {
@Test @Test
fun testUdpCommunication() = runTest { fun testUdpCommunication() = runTest {
val receiver = UdpPort.openChannel(Global, "localhost", 8811, localPort = 8812) val receiver = UdpPort.open(Global, "localhost", 8811, localPort = 8812)
val sender = UdpPort.openChannel(Global, "localhost", 8812, localPort = 8811) val sender = UdpPort.open(Global, "localhost", 8812, localPort = 8811)
delay(30) delay(30)
repeat(10) { repeat(10) {
@ -38,7 +38,7 @@ internal class PortIOTest {
} }
val res = receiver val res = receiver
.receiving() .subscribe()
.withStringDelimiter("\n") .withStringDelimiter("\n")
.take(10) .take(10)
.toList() .toList()

View File

@ -0,0 +1,95 @@
package space.kscience.controls.pi
import com.pi4j.io.serial.Baud
import com.pi4j.io.serial.Serial
import com.pi4j.io.serial.SerialConfigBuilder
import com.pi4j.ktx.io.serial
import kotlinx.coroutines.*
import space.kscience.controls.ports.AbstractAsynchronousPort
import space.kscience.controls.ports.AsynchronousPort
import space.kscience.controls.ports.copyToArray
import space.kscience.dataforge.context.*
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.enum
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.string
import java.nio.ByteBuffer
import kotlin.coroutines.CoroutineContext
public class AsynchronousPiPort(
context: Context,
meta: Meta,
private val serial: Serial,
coroutineContext: CoroutineContext = context.coroutineContext,
) : AbstractAsynchronousPort(context, meta, coroutineContext) {
private var listenerJob: Job? = null
override fun onOpen() {
serial.open()
listenerJob = this.scope.launch(Dispatchers.IO) {
val buffer = ByteBuffer.allocate(1024)
while (isActive) {
try {
val num = serial.read(buffer)
if (num > 0) {
receive(buffer.copyToArray(num))
}
if (num < 0) cancel("The input channel is exhausted")
} catch (ex: Exception) {
logger.error(ex) { "Channel read error" }
delay(1000)
}
}
}
}
override suspend fun write(data: ByteArray): Unit = withContext(Dispatchers.IO) {
serial.write(data)
}
override val isOpen: Boolean get() = listenerJob?.isActive == true
override fun close() {
listenerJob?.cancel()
serial.close()
}
public companion object : Factory<AsynchronousPort> {
public fun build(
context: Context,
device: String,
block: SerialConfigBuilder.() -> Unit,
): AsynchronousPiPort {
val meta = Meta {
"name" put "pi://$device"
"type" put "serial"
}
val pi = context.request(PiPlugin)
val serial = pi.piContext.serial(device, block)
return AsynchronousPiPort(context, meta, serial)
}
public fun open(
context: Context,
device: String,
block: SerialConfigBuilder.() -> Unit,
): AsynchronousPiPort = build(context, device, block).apply { open() }
override fun build(context: Context, meta: Meta): AsynchronousPort {
val device: String = meta["device"].string ?: error("Device name not defined")
val baudRate: Baud = meta["baudRate"].enum<Baud>() ?: Baud._9600
val pi = context.request(PiPlugin)
val serial = pi.piContext.serial(device) {
baud8N1(baudRate)
}
return AsynchronousPiPort(context, meta, serial)
}
}
}

View File

@ -2,7 +2,6 @@ package space.kscience.controls.pi
import com.pi4j.Pi4J import com.pi4j.Pi4J
import space.kscience.controls.manager.DeviceManager import space.kscience.controls.manager.DeviceManager
import space.kscience.controls.ports.PortFactory
import space.kscience.controls.ports.Ports import space.kscience.controls.ports.Ports
import space.kscience.dataforge.context.AbstractPlugin import space.kscience.dataforge.context.AbstractPlugin
import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Context
@ -10,7 +9,7 @@ import space.kscience.dataforge.context.PluginFactory
import space.kscience.dataforge.context.PluginTag import space.kscience.dataforge.context.PluginTag
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.parseAsName import space.kscience.dataforge.names.asName
import com.pi4j.context.Context as PiContext import com.pi4j.context.Context as PiContext
public class PiPlugin : AbstractPlugin() { public class PiPlugin : AbstractPlugin() {
@ -22,8 +21,11 @@ public class PiPlugin : AbstractPlugin() {
public val piContext: PiContext by lazy { createPiContext(context, meta) } public val piContext: PiContext by lazy { createPiContext(context, meta) }
override fun content(target: String): Map<Name, Any> = when (target) { override fun content(target: String): Map<Name, Any> = when (target) {
PortFactory.TYPE -> mapOf( Ports.ASYNCHRONOUS_PORT_TYPE -> mapOf(
PiSerialPort.type.parseAsName() to PiSerialPort, "serial".asName() to AsynchronousPiPort,
)
Ports.SYNCHRONOUS_PORT_TYPE -> mapOf(
"serial".asName() to SynchronousPiPort,
) )
else -> super.content(target) else -> super.content(target)
@ -40,6 +42,7 @@ public class PiPlugin : AbstractPlugin() {
override fun build(context: Context, meta: Meta): PiPlugin = PiPlugin() override fun build(context: Context, meta: Meta): PiPlugin = PiPlugin()
@Suppress("UNUSED_PARAMETER")
public fun createPiContext(context: Context, meta: Meta): PiContext = Pi4J.newAutoContext() public fun createPiContext(context: Context, meta: Meta): PiContext = Pi4J.newAutoContext()
} }

View File

@ -1,82 +0,0 @@
package space.kscience.controls.pi
import com.pi4j.io.serial.Baud
import com.pi4j.io.serial.Serial
import com.pi4j.io.serial.SerialConfigBuilder
import com.pi4j.ktx.io.serial
import kotlinx.coroutines.*
import space.kscience.controls.ports.AbstractPort
import space.kscience.controls.ports.Port
import space.kscience.controls.ports.PortFactory
import space.kscience.controls.ports.toArray
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.error
import space.kscience.dataforge.context.logger
import space.kscience.dataforge.context.request
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.enum
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.string
import java.nio.ByteBuffer
import kotlin.coroutines.CoroutineContext
import com.pi4j.context.Context as PiContext
public class PiSerialPort(
context: Context,
coroutineContext: CoroutineContext = context.coroutineContext,
public val serialBuilder: PiContext.() -> Serial,
) : AbstractPort(context, coroutineContext) {
private val serial: Serial by lazy {
val pi = context.request(PiPlugin)
pi.piContext.serialBuilder()
}
private val listenerJob = this.scope.launch(Dispatchers.IO) {
val buffer = ByteBuffer.allocate(1024)
while (isActive) {
try {
val num = serial.read(buffer)
if (num > 0) {
receive(buffer.toArray(num))
}
if (num < 0) cancel("The input channel is exhausted")
} catch (ex: Exception) {
logger.error(ex) { "Channel read error" }
delay(1000)
}
}
}
override suspend fun write(data: ByteArray): Unit = withContext(Dispatchers.IO) {
serial.write(data)
}
override fun close() {
listenerJob.cancel()
serial.close()
}
public companion object : PortFactory {
override val type: String get() = "pi"
public fun open(
context: Context,
device: String,
block: SerialConfigBuilder.() -> Unit,
): PiSerialPort = PiSerialPort(context) {
serial(device, block)
}
override fun build(context: Context, meta: Meta): Port = PiSerialPort(context) {
val device: String = meta["device"].string ?: error("Device name not defined")
val baudRate: Baud = meta["baudRate"].enum<Baud>() ?: Baud._9600
serial(device) {
baud8N1(baudRate)
}
}
}
}

View File

@ -0,0 +1,98 @@
package space.kscience.controls.pi
import com.pi4j.io.serial.Baud
import com.pi4j.io.serial.Serial
import com.pi4j.io.serial.SerialConfigBuilder
import com.pi4j.ktx.io.serial
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import space.kscience.controls.ports.SynchronousPort
import space.kscience.controls.ports.copyToArray
import space.kscience.dataforge.context.*
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.enum
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.string
import java.nio.ByteBuffer
public class SynchronousPiPort(
override val context: Context,
public val meta: Meta,
private val serial: Serial,
private val mutex: Mutex = Mutex(),
) : SynchronousPort {
private val pi = context.request(PiPlugin)
override fun open() {
serial.open()
}
override val isOpen: Boolean get() = serial.isOpen
override suspend fun <R> respond(
request: ByteArray,
transform: suspend Flow<ByteArray>.() -> R,
): R = mutex.withLock {
serial.drain()
serial.write(request)
flow<ByteArray> {
val buffer = ByteBuffer.allocate(1024)
while (isOpen) {
try {
val num = serial.read(buffer)
if (num > 0) {
emit(buffer.copyToArray(num))
}
if (num < 0) break
} catch (ex: Exception) {
logger.error(ex) { "Channel read error" }
delay(1000)
}
}
}.transform()
}
override fun close() {
serial.close()
}
public companion object : Factory<SynchronousPort> {
public fun build(
context: Context,
device: String,
block: SerialConfigBuilder.() -> Unit,
): SynchronousPiPort {
val meta = Meta {
"name" put "pi://$device"
"type" put "serial"
}
val pi = context.request(PiPlugin)
val serial = pi.piContext.serial(device, block)
return SynchronousPiPort(context, meta, serial)
}
public fun open(
context: Context,
device: String,
block: SerialConfigBuilder.() -> Unit,
): SynchronousPiPort = build(context, device, block).apply { open() }
override fun build(context: Context, meta: Meta): SynchronousPiPort {
val device: String = meta["device"].string ?: error("Device name not defined")
val baudRate: Baud = meta["baudRate"].enum<Baud>() ?: Baud._9600
val pi = context.request(PiPlugin)
val serial = pi.piContext.serial(device) {
baud8N1(baudRate)
}
return SynchronousPiPort(context, meta, serial)
}
}
}

View File

@ -13,7 +13,7 @@ public class KtorPortsPlugin : AbstractPlugin() {
override val tag: PluginTag get() = Companion.tag override val tag: PluginTag get() = Companion.tag
override fun content(target: String): Map<Name, Any> = when (target) { override fun content(target: String): Map<Name, Any> = when (target) {
PortFactory.TYPE -> mapOf("tcp".asName() to KtorTcpPort, "udp".asName() to KtorUdpPort) Ports.ASYNCHRONOUS_PORT_TYPE -> mapOf("tcp".asName() to KtorTcpPort, "udp".asName() to KtorUdpPort)
else -> emptyMap() else -> emptyMap()
} }

View File

@ -8,42 +8,46 @@ import io.ktor.network.sockets.openWriteChannel
import io.ktor.utils.io.consumeEachBufferRange import io.ktor.utils.io.consumeEachBufferRange
import io.ktor.utils.io.core.Closeable import io.ktor.utils.io.core.Closeable
import io.ktor.utils.io.writeAvailable import io.ktor.utils.io.writeAvailable
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.*
import kotlinx.coroutines.async
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Factory
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.get import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.int import space.kscience.dataforge.meta.int
import space.kscience.dataforge.meta.string import space.kscience.dataforge.meta.string
import java.nio.ByteBuffer
import kotlin.coroutines.CoroutineContext import kotlin.coroutines.CoroutineContext
public class KtorTcpPort internal constructor( public class KtorTcpPort internal constructor(
context: Context, context: Context,
meta: Meta,
public val host: String, public val host: String,
public val port: Int, public val port: Int,
coroutineContext: CoroutineContext = context.coroutineContext, coroutineContext: CoroutineContext = context.coroutineContext,
socketOptions: SocketOptions.TCPClientSocketOptions.() -> Unit = {} socketOptions: SocketOptions.TCPClientSocketOptions.() -> Unit = {},
) : AbstractPort(context, coroutineContext), Closeable { ) : AbstractAsynchronousPort(context, meta, coroutineContext), Closeable {
override fun toString(): String = "port[tcp:$host:$port]" override fun toString(): String = "port[tcp:$host:$port]"
private val futureSocket = scope.async { private val futureSocket = scope.async(Dispatchers.IO, start = CoroutineStart.LAZY) {
aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().connect(host, port, socketOptions) aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().connect(host, port, socketOptions)
} }
private val writeChannel = scope.async { private val writeChannel = scope.async(Dispatchers.IO, start = CoroutineStart.LAZY) {
futureSocket.await().openWriteChannel(true) futureSocket.await().openWriteChannel(true)
} }
private val listenerJob = scope.launch { private var listenerJob: Job? = null
override fun onOpen() {
listenerJob = scope.launch {
val input = futureSocket.await().openReadChannel() val input = futureSocket.await().openReadChannel()
input.consumeEachBufferRange { buffer, _ -> input.consumeEachBufferRange { buffer: ByteBuffer, last ->
val array = ByteArray(buffer.remaining()) val array = ByteArray(buffer.remaining())
buffer.get(array) buffer.get(array)
receive(array) receive(array)
isActive !last && isActive
}
} }
} }
@ -51,30 +55,45 @@ public class KtorTcpPort internal constructor(
writeChannel.await().writeAvailable(data) writeChannel.await().writeAvailable(data)
} }
override val isOpen: Boolean
get() = listenerJob?.isActive == true
override fun close() { override fun close() {
listenerJob.cancel() listenerJob?.cancel()
futureSocket.cancel() futureSocket.cancel()
super.close() super.close()
} }
public companion object : PortFactory { public companion object : Factory<AsynchronousPort> {
override val type: String = "tcp" public fun build(
context: Context,
host: String,
port: Int,
coroutineContext: CoroutineContext = context.coroutineContext,
socketOptions: SocketOptions.TCPClientSocketOptions.() -> Unit = {},
): KtorTcpPort {
val meta = Meta {
"name" put "tcp://$host:$port"
"type" put "tcp"
"host" put host
"port" put port
}
return KtorTcpPort(context, meta, host, port, coroutineContext, socketOptions)
}
public fun open( public fun open(
context: Context, context: Context,
host: String, host: String,
port: Int, port: Int,
coroutineContext: CoroutineContext = context.coroutineContext, coroutineContext: CoroutineContext = context.coroutineContext,
socketOptions: SocketOptions.TCPClientSocketOptions.() -> Unit = {} socketOptions: SocketOptions.TCPClientSocketOptions.() -> Unit = {},
): KtorTcpPort { ): KtorTcpPort = build(context, host, port, coroutineContext, socketOptions).apply { open() }
return KtorTcpPort(context, host, port, coroutineContext, socketOptions)
}
override fun build(context: Context, meta: Meta): Port { override fun build(context: Context, meta: Meta): AsynchronousPort {
val host = meta["host"].string ?: "localhost" val host = meta["host"].string ?: "localhost"
val port = meta["port"].int ?: error("Port value for TCP port is not defined in $meta") val port = meta["port"].int ?: error("Port value for TCP port is not defined in $meta")
return open(context, host, port) return build(context, host, port)
} }
} }
} }

View File

@ -8,6 +8,7 @@ import io.ktor.utils.io.core.Closeable
import io.ktor.utils.io.writeAvailable import io.ktor.utils.io.writeAvailable
import kotlinx.coroutines.* import kotlinx.coroutines.*
import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Factory
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.int import space.kscience.dataforge.meta.int
import space.kscience.dataforge.meta.number import space.kscience.dataforge.meta.number
@ -16,17 +17,18 @@ import kotlin.coroutines.CoroutineContext
public class KtorUdpPort internal constructor( public class KtorUdpPort internal constructor(
context: Context, context: Context,
meta: Meta,
public val remoteHost: String, public val remoteHost: String,
public val remotePort: Int, public val remotePort: Int,
public val localPort: Int? = null, public val localPort: Int? = null,
public val localHost: String = "localhost", public val localHost: String = "localhost",
coroutineContext: CoroutineContext = context.coroutineContext, coroutineContext: CoroutineContext = context.coroutineContext,
socketOptions: SocketOptions.UDPSocketOptions.() -> Unit = {} socketOptions: SocketOptions.UDPSocketOptions.() -> Unit = {},
) : AbstractPort(context, coroutineContext), Closeable { ) : AbstractAsynchronousPort(context, meta, coroutineContext), Closeable {
override fun toString(): String = "port[udp:$remoteHost:$remotePort]" override fun toString(): String = "port[udp:$remoteHost:$remotePort]"
private val futureSocket = scope.async { private val futureSocket = scope.async(Dispatchers.IO, start = CoroutineStart.LAZY) {
aSocket(ActorSelectorManager(Dispatchers.IO)).udp().connect( aSocket(ActorSelectorManager(Dispatchers.IO)).udp().connect(
remoteAddress = InetSocketAddress(remoteHost, remotePort), remoteAddress = InetSocketAddress(remoteHost, remotePort),
localAddress = localPort?.let { InetSocketAddress(localHost, localPort) }, localAddress = localPort?.let { InetSocketAddress(localHost, localPort) },
@ -34,17 +36,21 @@ public class KtorUdpPort internal constructor(
) )
} }
private val writeChannel: Deferred<ByteWriteChannel> = scope.async { private val writeChannel: Deferred<ByteWriteChannel> = scope.async(Dispatchers.IO, start = CoroutineStart.LAZY) {
futureSocket.await().openWriteChannel(true) futureSocket.await().openWriteChannel(true)
} }
private val listenerJob = scope.launch { private var listenerJob: Job? = null
override fun onOpen() {
listenerJob = scope.launch {
val input = futureSocket.await().openReadChannel() val input = futureSocket.await().openReadChannel()
input.consumeEachBufferRange { buffer, _ -> input.consumeEachBufferRange { buffer, last ->
val array = ByteArray(buffer.remaining()) val array = ByteArray(buffer.remaining())
buffer.get(array) buffer.get(array)
receive(array) receive(array)
isActive !last && isActive
}
} }
} }
@ -52,16 +58,49 @@ public class KtorUdpPort internal constructor(
writeChannel.await().writeAvailable(data) writeChannel.await().writeAvailable(data)
} }
override val isOpen: Boolean
get() = listenerJob?.isActive == true
override fun close() { override fun close() {
listenerJob.cancel() listenerJob?.cancel()
futureSocket.cancel() futureSocket.cancel()
super.close() super.close()
} }
public companion object : PortFactory { public companion object : Factory<AsynchronousPort> {
override val type: String = "udp" public fun build(
context: Context,
remoteHost: String,
remotePort: Int,
localPort: Int? = null,
localHost: String? = null,
coroutineContext: CoroutineContext = context.coroutineContext,
socketOptions: SocketOptions.UDPSocketOptions.() -> Unit = {},
): KtorUdpPort {
val meta = Meta {
"name" put "udp://$remoteHost:$remotePort"
"type" put "udp"
"remoteHost" put remoteHost
"remotePort" put remotePort
localHost?.let { "localHost" put it }
localPort?.let { "localPort" put it }
}
return KtorUdpPort(
context = context,
meta = meta,
remoteHost = remoteHost,
remotePort = remotePort,
localPort = localPort,
localHost = localHost ?: "localhost",
coroutineContext = coroutineContext,
socketOptions = socketOptions
)
}
/**
* Create and open UDP port
*/
public fun open( public fun open(
context: Context, context: Context,
remoteHost: String, remoteHost: String,
@ -69,23 +108,23 @@ public class KtorUdpPort internal constructor(
localPort: Int? = null, localPort: Int? = null,
localHost: String = "localhost", localHost: String = "localhost",
coroutineContext: CoroutineContext = context.coroutineContext, coroutineContext: CoroutineContext = context.coroutineContext,
socketOptions: SocketOptions.UDPSocketOptions.() -> Unit = {} socketOptions: SocketOptions.UDPSocketOptions.() -> Unit = {},
): KtorUdpPort = KtorUdpPort( ): KtorUdpPort = build(
context = context, context,
remoteHost = remoteHost, remoteHost,
remotePort = remotePort, remotePort,
localPort = localPort, localPort,
localHost = localHost, localHost,
coroutineContext = coroutineContext, coroutineContext,
socketOptions = socketOptions socketOptions
) ).apply { open() }
override fun build(context: Context, meta: Meta): Port { override fun build(context: Context, meta: Meta): AsynchronousPort {
val remoteHost by meta.string { error("Remote host is not specified") } val remoteHost by meta.string { error("Remote host is not specified") }
val remotePort by meta.number { error("Remote port is not specified") } val remotePort by meta.number { error("Remote port is not specified") }
val localHost: String? by meta.string() val localHost: String? by meta.string()
val localPort: Int? by meta.int() val localPort: Int? by meta.int()
return open(context, remoteHost, remotePort.toInt(), localPort, localHost ?: "localhost") return build(context, remoteHost, remotePort.toInt(), localPort, localHost ?: "localhost")
} }
} }
} }

View File

@ -0,0 +1,134 @@
package space.kscience.controls.serial
import com.fazecast.jSerialComm.SerialPort
import com.fazecast.jSerialComm.SerialPortDataListener
import com.fazecast.jSerialComm.SerialPortEvent
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import space.kscience.controls.ports.AbstractAsynchronousPort
import space.kscience.controls.ports.AsynchronousPort
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Factory
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.int
import space.kscience.dataforge.meta.string
import kotlin.coroutines.CoroutineContext
/**
* A port based on JSerialComm
*/
public class AsynchronousSerialPort(
context: Context,
meta: Meta,
private val comPort: SerialPort,
coroutineContext: CoroutineContext = context.coroutineContext,
) : AbstractAsynchronousPort(context, meta, coroutineContext) {
override fun toString(): String = "port[${comPort.descriptivePortName}]"
private val serialPortListener = object : SerialPortDataListener {
override fun getListeningEvents(): Int =
SerialPort.LISTENING_EVENT_DATA_RECEIVED and SerialPort.LISTENING_EVENT_DATA_AVAILABLE
override fun serialEvent(event: SerialPortEvent) {
when (event.eventType) {
SerialPort.LISTENING_EVENT_DATA_RECEIVED -> {
scope.launch { receive(event.receivedData) }
}
SerialPort.LISTENING_EVENT_DATA_AVAILABLE -> {
scope.launch(Dispatchers.IO) {
val available = comPort.bytesAvailable()
if (available > 0) {
val buffer = ByteArray(available)
comPort.readBytes(buffer, available)
receive(buffer)
}
}
}
}
}
}
override fun onOpen() {
comPort.openPort()
comPort.addDataListener(serialPortListener)
}
override val isOpen: Boolean get() = comPort.isOpen
override suspend fun write(data: ByteArray) {
comPort.writeBytes(data, data.size)
}
override fun close() {
comPort.removeDataListener()
if (comPort.isOpen) {
comPort.closePort()
}
super.close()
}
public companion object : Factory<AsynchronousPort> {
public fun build(
context: Context,
portName: String,
baudRate: Int = 9600,
dataBits: Int = 8,
stopBits: Int = SerialPort.ONE_STOP_BIT,
parity: Int = SerialPort.NO_PARITY,
coroutineContext: CoroutineContext = context.coroutineContext,
additionalConfig: SerialPort.() -> Unit = {},
): AsynchronousSerialPort {
val serialPort = SerialPort.getCommPort(portName).apply {
setComPortParameters(baudRate, dataBits, stopBits, parity)
additionalConfig()
}
val meta = Meta {
"name" put "com://$portName"
"type" put "serial"
"baudRate" put serialPort.baudRate
"dataBits" put serialPort.numDataBits
"stopBits" put serialPort.numStopBits
"parity" put serialPort.parity
}
return AsynchronousSerialPort(context, meta, serialPort, coroutineContext)
}
/**
* Construct ComPort with given parameters
*/
public fun open(
context: Context,
portName: String,
baudRate: Int = 9600,
dataBits: Int = 8,
stopBits: Int = SerialPort.ONE_STOP_BIT,
parity: Int = SerialPort.NO_PARITY,
coroutineContext: CoroutineContext = context.coroutineContext,
additionalConfig: SerialPort.() -> Unit = {},
): AsynchronousSerialPort = build(
context = context,
portName = portName,
baudRate = baudRate,
dataBits = dataBits,
stopBits = stopBits,
parity = parity,
coroutineContext = coroutineContext,
additionalConfig = additionalConfig
).apply { open() }
override fun build(context: Context, meta: Meta): AsynchronousPort {
val name by meta.string { error("Serial port name not defined") }
val baudRate by meta.int(9600)
val dataBits by meta.int(8)
val stopBits by meta.int(SerialPort.ONE_STOP_BIT)
val parity by meta.int(SerialPort.NO_PARITY)
return build(context, name, baudRate, dataBits, stopBits, parity)
}
}
}

View File

@ -1,87 +0,0 @@
package space.kscience.controls.serial
import com.fazecast.jSerialComm.SerialPort
import com.fazecast.jSerialComm.SerialPortDataListener
import com.fazecast.jSerialComm.SerialPortEvent
import kotlinx.coroutines.launch
import space.kscience.controls.ports.AbstractPort
import space.kscience.controls.ports.Port
import space.kscience.controls.ports.PortFactory
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.int
import space.kscience.dataforge.meta.string
import kotlin.coroutines.CoroutineContext
/**
* A port based on JSerialComm
*/
public class JSerialCommPort(
context: Context,
private val comPort: SerialPort,
coroutineContext: CoroutineContext = context.coroutineContext,
) : AbstractPort(context, coroutineContext) {
override fun toString(): String = "port[${comPort.descriptivePortName}]"
private val serialPortListener = object : SerialPortDataListener {
override fun getListeningEvents(): Int = SerialPort.LISTENING_EVENT_DATA_AVAILABLE
override fun serialEvent(event: SerialPortEvent) {
if (event.eventType == SerialPort.LISTENING_EVENT_DATA_AVAILABLE && event.receivedData != null) {
scope.launch { receive(event.receivedData) }
}
}
}
init {
comPort.addDataListener(serialPortListener)
}
override suspend fun write(data: ByteArray) {
comPort.writeBytes(data, data.size)
}
override fun close() {
comPort.removeDataListener()
if (comPort.isOpen) {
comPort.closePort()
}
super.close()
}
public companion object : PortFactory {
override val type: String = "com"
/**
* Construct ComPort with given parameters
*/
public fun open(
context: Context,
portName: String,
baudRate: Int = 9600,
dataBits: Int = 8,
stopBits: Int = SerialPort.ONE_STOP_BIT,
parity: Int = SerialPort.NO_PARITY,
coroutineContext: CoroutineContext = context.coroutineContext,
): JSerialCommPort {
val serialPort = SerialPort.getCommPort(portName).apply {
setComPortParameters(baudRate, dataBits, stopBits, parity)
openPort()
}
return JSerialCommPort(context, serialPort, coroutineContext)
}
override fun build(context: Context, meta: Meta): Port {
val name by meta.string { error("Serial port name not defined") }
val baudRate by meta.int(9600)
val dataBits by meta.int(8)
val stopBits by meta.int(SerialPort.ONE_STOP_BIT)
val parity by meta.int(SerialPort.NO_PARITY)
return open(context, name, baudRate, dataBits, stopBits, parity)
}
}
}

View File

@ -1,19 +1,27 @@
package space.kscience.controls.serial package space.kscience.controls.serial
import space.kscience.controls.ports.PortFactory import space.kscience.controls.ports.Ports
import space.kscience.dataforge.context.AbstractPlugin import space.kscience.dataforge.context.AbstractPlugin
import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.PluginFactory import space.kscience.dataforge.context.PluginFactory
import space.kscience.dataforge.context.PluginTag import space.kscience.dataforge.context.PluginTag
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName
public class SerialPortPlugin : AbstractPlugin() { public class SerialPortPlugin : AbstractPlugin() {
override val tag: PluginTag get() = Companion.tag override val tag: PluginTag get() = Companion.tag
override fun content(target: String): Map<Name, Any> = when (target) { override fun content(target: String): Map<Name, Any> = when (target) {
PortFactory.TYPE -> mapOf(Name.EMPTY to JSerialCommPort) Ports.ASYNCHRONOUS_PORT_TYPE -> mapOf(
"serial".asName() to AsynchronousSerialPort,
)
Ports.SYNCHRONOUS_PORT_TYPE -> mapOf(
"serial".asName() to SynchronousSerialPort,
)
else -> emptyMap() else -> emptyMap()
} }

View File

@ -0,0 +1,127 @@
package space.kscience.controls.serial
import com.fazecast.jSerialComm.SerialPort
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import space.kscience.controls.ports.SynchronousPort
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Factory
import space.kscience.dataforge.context.error
import space.kscience.dataforge.context.logger
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.int
import space.kscience.dataforge.meta.string
/**
* A port based on JSerialComm
*/
public class SynchronousSerialPort(
override val context: Context,
public val meta: Meta,
private val comPort: SerialPort,
) : SynchronousPort {
override fun toString(): String = "port[${comPort.descriptivePortName}]"
override fun open() {
if (!isOpen) {
comPort.openPort()
}
}
override val isOpen: Boolean get() = comPort.isOpen
override fun close() {
if (comPort.isOpen) {
comPort.closePort()
}
}
private val mutex = Mutex()
override suspend fun <R> respond(request: ByteArray, transform: suspend Flow<ByteArray>.() -> R): R =
mutex.withLock {
comPort.flushIOBuffers()
comPort.writeBytes(request, request.size)
flow<ByteArray> {
while (isOpen) {
try {
val available = comPort.bytesAvailable()
if (available > 0) {
val buffer = ByteArray(available)
comPort.readBytes(buffer, available)
emit(buffer)
} else if (available < 0) break
} catch (ex: Exception) {
logger.error(ex) { "Channel read error" }
delay(1000)
}
}
}.transform()
}
public companion object : Factory<SynchronousPort> {
public fun build(
context: Context,
portName: String,
baudRate: Int = 9600,
dataBits: Int = 8,
stopBits: Int = SerialPort.ONE_STOP_BIT,
parity: Int = SerialPort.NO_PARITY,
additionalConfig: SerialPort.() -> Unit = {},
): SynchronousSerialPort {
val serialPort = SerialPort.getCommPort(portName).apply {
setComPortParameters(baudRate, dataBits, stopBits, parity)
additionalConfig()
}
val meta = Meta {
"name" put "com://$portName"
"type" put "serial"
"baudRate" put serialPort.baudRate
"dataBits" put serialPort.numDataBits
"stopBits" put serialPort.numStopBits
"parity" put serialPort.parity
}
return SynchronousSerialPort(context, meta, serialPort)
}
/**
* Construct ComPort with given parameters
*/
public fun open(
context: Context,
portName: String,
baudRate: Int = 9600,
dataBits: Int = 8,
stopBits: Int = SerialPort.ONE_STOP_BIT,
parity: Int = SerialPort.NO_PARITY,
additionalConfig: SerialPort.() -> Unit = {},
): SynchronousSerialPort = build(
context = context,
portName = portName,
baudRate = baudRate,
dataBits = dataBits,
stopBits = stopBits,
parity = parity,
additionalConfig = additionalConfig
).apply { open() }
override fun build(context: Context, meta: Meta): SynchronousPort {
val name by meta.string { error("Serial port name not defined") }
val baudRate by meta.int(9600)
val dataBits by meta.int(8)
val stopBits by meta.int(SerialPort.ONE_STOP_BIT)
val parity by meta.int(SerialPort.NO_PARITY)
return build(context, name, baudRate, dataBits, stopBits, parity)
}
}
}

View File

@ -4,7 +4,6 @@ import kotlinx.coroutines.withTimeoutOrNull
import space.kscience.controls.ports.Ports import space.kscience.controls.ports.Ports
import space.kscience.controls.ports.SynchronousPort import space.kscience.controls.ports.SynchronousPort
import space.kscience.controls.ports.respondStringWithDelimiter import space.kscience.controls.ports.respondStringWithDelimiter
import space.kscience.controls.ports.synchronous
import space.kscience.controls.spec.* import space.kscience.controls.spec.*
import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Factory import space.kscience.dataforge.context.Factory
@ -22,7 +21,7 @@ class MksPdr900Device(context: Context, meta: Meta) : DeviceBySpec<MksPdr900Devi
private val portDelegate = lazy { private val portDelegate = lazy {
val ports = context.request(Ports) val ports = context.request(Ports)
ports.buildPort(meta["port"] ?: error("Port is not defined in device configuration")).synchronous() ports.buildSynchronousPort(meta["port"] ?: error("Port is not defined in device configuration"))
} }
private val port: SynchronousPort by portDelegate private val port: SynchronousPort by portDelegate

View File

@ -25,10 +25,10 @@ import kotlin.time.Duration.Companion.milliseconds
class PiMotionMasterDevice( class PiMotionMasterDevice(
context: Context, context: Context,
private val portFactory: PortFactory = KtorTcpPort, private val portFactory: Factory<AsynchronousPort> = KtorTcpPort,
) : DeviceBySpec<PiMotionMasterDevice>(PiMotionMasterDevice, context), DeviceHub { ) : DeviceBySpec<PiMotionMasterDevice>(PiMotionMasterDevice, context), DeviceHub {
private var port: Port? = null private var port: AsynchronousPort? = null
//TODO make proxy work //TODO make proxy work
//PortProxy { portFactory(address ?: error("The device is not connected"), context) } //PortProxy { portFactory(address ?: error("The device is not connected"), context) }
@ -83,7 +83,7 @@ class PiMotionMasterDevice(
suspend fun getErrorCode(): Int = mutex.withLock { suspend fun getErrorCode(): Int = mutex.withLock {
withTimeout(timeoutValue) { withTimeout(timeoutValue) {
sendCommandInternal("ERR?") sendCommandInternal("ERR?")
val errorString = port?.receiving()?.withStringDelimiter("\n")?.first() ?: error("Not connected to device") val errorString = port?.subscribe()?.withStringDelimiter("\n")?.first() ?: error("Not connected to device")
errorString.trim().toInt() errorString.trim().toInt()
} }
} }
@ -96,7 +96,7 @@ class PiMotionMasterDevice(
try { try {
withTimeout(timeoutValue) { withTimeout(timeoutValue) {
sendCommandInternal(command, *arguments) sendCommandInternal(command, *arguments)
val phrases = port?.receiving()?.withStringDelimiter("\n") ?: error("Not connected to device") val phrases = port?.subscribe()?.withStringDelimiter("\n") ?: error("Not connected to device")
phrases.transformWhile { line -> phrases.transformWhile { line ->
emit(line) emit(line)
line.endsWith(" \n") line.endsWith(" \n")

View File

@ -5,14 +5,15 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.* import kotlinx.coroutines.flow.*
import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.sync.withLock
import space.kscience.controls.api.Socket import space.kscience.controls.api.AsynchronousSocket
import space.kscience.controls.ports.AbstractPort import space.kscience.controls.ports.AbstractAsynchronousPort
import space.kscience.controls.ports.withDelimiter import space.kscience.controls.ports.withDelimiter
import space.kscience.dataforge.context.* import space.kscience.dataforge.context.*
import space.kscience.dataforge.meta.Meta
import kotlin.math.abs import kotlin.math.abs
import kotlin.time.Duration import kotlin.time.Duration
abstract class VirtualDevice(val scope: CoroutineScope) : Socket<ByteArray> { abstract class VirtualDevice(val scope: CoroutineScope) : AsynchronousSocket<ByteArray> {
protected abstract suspend fun evaluateRequest(request: ByteArray) protected abstract suspend fun evaluateRequest(request: ByteArray)
@ -40,33 +41,42 @@ abstract class VirtualDevice(val scope: CoroutineScope) : Socket<ByteArray> {
toRespond.send(response) toRespond.send(response)
} }
override fun receiving(): Flow<ByteArray> = toRespond.receiveAsFlow() override fun subscribe(): Flow<ByteArray> = toRespond.receiveAsFlow()
protected fun respondInFuture(delay: Duration, response: suspend () -> ByteArray): Job = scope.launch { protected fun respondInFuture(delay: Duration, response: suspend () -> ByteArray): Job = scope.launch {
delay(delay) delay(delay)
respond(response()) respond(response())
} }
override fun isOpen(): Boolean = scope.isActive override val isOpen: Boolean
get() = scope.isActive
override fun close() = scope.cancel() override fun close() = scope.cancel()
} }
class VirtualPort(private val device: VirtualDevice, context: Context) : AbstractPort(context) { class VirtualPort(private val device: VirtualDevice, context: Context) : AbstractAsynchronousPort(context, Meta.EMPTY) {
private val respondJob = device.receiving().onEach { private var respondJob: Job? = null
override fun onOpen() {
respondJob = device.subscribe().onEach {
receive(it) receive(it)
}.catch { }.catch {
it.printStackTrace() it.printStackTrace()
}.launchIn(scope) }.launchIn(scope)
}
override suspend fun write(data: ByteArray) { override suspend fun write(data: ByteArray) {
device.send(data) device.send(data)
} }
override val isOpen: Boolean
get() = respondJob?.isActive == true
override fun close() { override fun close() {
respondJob.cancel() respondJob?.cancel()
super.close() super.close()
} }
} }
@ -78,7 +88,7 @@ class PiMotionMasterVirtualDevice(
scope: CoroutineScope = context, scope: CoroutineScope = context,
) : VirtualDevice(scope), ContextAware { ) : VirtualDevice(scope), ContextAware {
init { override fun open() {
//add asynchronous send logic here //add asynchronous send logic here
} }
@ -102,9 +112,11 @@ class PiMotionMasterVirtualDevice(
abs(distance) < proposedStep -> { abs(distance) < proposedStep -> {
position = targetPosition position = targetPosition
} }
targetPosition > position -> { targetPosition > position -> {
position += proposedStep position += proposedStep
} }
else -> { else -> {
position -= proposedStep position -= proposedStep
} }
@ -180,8 +192,10 @@ class PiMotionMasterVirtualDevice(
when (command) { when (command) {
"XXX" -> { "XXX" -> {
} }
"IDN?", "*IDN?" -> respond("(c)2015 Physik Instrumente(PI) Karlsruhe, C-885.M1 TCP-IP Master,0,1.0.0.1") "IDN?", "*IDN?" -> respond("(c)2015 Physik Instrumente(PI) Karlsruhe, C-885.M1 TCP-IP Master,0,1.0.0.1")
"VER?" -> respond(""" "VER?" -> respond(
"""
2: (c)2017 Physik Instrumente (PI) GmbH & Co. KG, C-663.12C885, 018550039, 00.039 2: (c)2017 Physik Instrumente (PI) GmbH & Co. KG, C-663.12C885, 018550039, 00.039
3: (c)2017 Physik Instrumente (PI) GmbH & Co. KG, C-663.12C885, 018550040, 00.039 3: (c)2017 Physik Instrumente (PI) GmbH & Co. KG, C-663.12C885, 018550040, 00.039
4: (c)2017 Physik Instrumente (PI) GmbH & Co. KG, C-663.12C885, 018550041, 00.039 4: (c)2017 Physik Instrumente (PI) GmbH & Co. KG, C-663.12C885, 018550041, 00.039
@ -195,8 +209,11 @@ class PiMotionMasterVirtualDevice(
12: (c)2017 Physik Instrumente (PI) GmbH & Co. KG, C-663.12C885, 018550049, 00.039 12: (c)2017 Physik Instrumente (PI) GmbH & Co. KG, C-663.12C885, 018550049, 00.039
13: (c)2017 Physik Instrumente (PI) GmbH & Co. KG, C-663.12C885, 018550051, 00.039 13: (c)2017 Physik Instrumente (PI) GmbH & Co. KG, C-663.12C885, 018550051, 00.039
FW_ARM: V1.0.0.1 FW_ARM: V1.0.0.1
""".trimIndent()) """.trimIndent()
"HLP?" -> respond(""" )
"HLP?" -> respond(
"""
The following commands are valid: The following commands are valid:
#4 Request Status Register #4 Request Status Register
#5 Request Motion Status #5 Request Motion Status
@ -235,11 +252,14 @@ class PiMotionMasterVirtualDevice(
VEL? [{<AxisID>}] Get Closed-Loop Velocity VEL? [{<AxisID>}] Get Closed-Loop Velocity
VER? Get Versions Of Firmware And Drivers VER? Get Versions Of Firmware And Drivers
end of help end of help
""".trimIndent()) """.trimIndent()
)
"ERR?" -> { "ERR?" -> {
respond(errorCode.toString()) respond(errorCode.toString())
errorCode = 0 errorCode = 0
} }
"SAI?" -> respond(axisState.keys.joinToString(separator = " \n")) "SAI?" -> respond(axisState.keys.joinToString(separator = " \n"))
"CST?" -> respondForAllAxis(axisIds) { "L-220.20SG" } "CST?" -> respondForAllAxis(axisIds) { "L-220.20SG" }
"RON?" -> respondForAllAxis(axisIds) { referenceMode } "RON?" -> respondForAllAxis(axisIds) { referenceMode }
@ -255,15 +275,19 @@ class PiMotionMasterVirtualDevice(
"SVO" -> doForEachAxis(parts) { key, value -> "SVO" -> doForEachAxis(parts) { key, value ->
axisState[key]?.servoMode = value.toInt() axisState[key]?.servoMode = value.toInt()
} }
"MOV" -> doForEachAxis(parts) { key, value -> "MOV" -> doForEachAxis(parts) { key, value ->
axisState[key]?.targetPosition = value.toDouble() axisState[key]?.targetPosition = value.toDouble()
} }
"VEL" -> doForEachAxis(parts) { key, value -> "VEL" -> doForEachAxis(parts) { key, value ->
axisState[key]?.velocity = value.toDouble() axisState[key]?.velocity = value.toDouble()
} }
"INI" -> { "INI" -> {
logger.info { "Axes initialized!" } logger.info { "Axes initialized!" }
} }
else -> { else -> {
logger.warn { "Unknown command: $command in message ${String(request)}" } logger.warn { "Unknown command: $command in message ${String(request)}" }
errorCode = 2 errorCode = 2

View File

@ -29,7 +29,7 @@ fun Context.launchPiDebugServer(port: Int, axes: List<String>): Job = launch(exc
val output = socket.openWriteChannel() val output = socket.openWriteChannel()
val sendJob = launch { val sendJob = launch {
virtualDevice.receiving().collect { virtualDevice.subscribe().collect {
//println("Sending: ${it.decodeToString()}") //println("Sending: ${it.decodeToString()}")
output.writeAvailable(it) output.writeAvailable(it)
output.flush() output.flush()