Compare commits
No commits in common. "58675f72f598aac1fbcdfbadfad0c61d286c5947" and "d91296c47d73358b3d5e0ebcb018240902168cf7" have entirely different histories.
58675f72f5
...
d91296c47d
@ -8,7 +8,6 @@
|
|||||||
|
|
||||||
### Changed
|
### Changed
|
||||||
- Constructor properties return `DeviceStat` in order to be able to subscribe to them
|
- Constructor properties return `DeviceStat` in order to be able to subscribe to them
|
||||||
- Refactored ports. Now we have `AsynchronousPort` as well as `SynchronousPort`
|
|
||||||
|
|
||||||
### Deprecated
|
### Deprecated
|
||||||
|
|
||||||
|
@ -1,40 +0,0 @@
|
|||||||
package space.kscience.controls.api
|
|
||||||
|
|
||||||
import kotlinx.coroutines.flow.Flow
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A generic bidirectional asynchronous sender/receiver object
|
|
||||||
*/
|
|
||||||
public interface AsynchronousSocket<T> : AutoCloseable {
|
|
||||||
/**
|
|
||||||
* Send an object to the socket
|
|
||||||
*/
|
|
||||||
public suspend fun send(data: T)
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Flow of objects received from socket
|
|
||||||
*/
|
|
||||||
public fun subscribe(): Flow<T>
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Start socket operation
|
|
||||||
*/
|
|
||||||
public fun open()
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Check if this socket is open
|
|
||||||
*/
|
|
||||||
public val isOpen: Boolean
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Connect an input to this socket.
|
|
||||||
* Multiple inputs could be connected to the same [AsynchronousSocket].
|
|
||||||
*
|
|
||||||
* This method suspends indefinitely, so it should be started in a separate coroutine.
|
|
||||||
*/
|
|
||||||
public suspend fun <T> AsynchronousSocket<T>.sendFlow(flow: Flow<T>) {
|
|
||||||
flow.collect { send(it) }
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
@ -0,0 +1,32 @@
|
|||||||
|
package space.kscience.controls.api
|
||||||
|
|
||||||
|
import kotlinx.coroutines.CoroutineScope
|
||||||
|
import kotlinx.coroutines.Job
|
||||||
|
import kotlinx.coroutines.flow.Flow
|
||||||
|
import kotlinx.coroutines.launch
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A generic bidirectional sender/receiver object
|
||||||
|
*/
|
||||||
|
public interface Socket<T> : AutoCloseable {
|
||||||
|
/**
|
||||||
|
* Send an object to the socket
|
||||||
|
*/
|
||||||
|
public suspend fun send(data: T)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Flow of objects received from socket
|
||||||
|
*/
|
||||||
|
public fun receiving(): Flow<T>
|
||||||
|
public fun isOpen(): Boolean
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Connect an input to this socket using designated [scope] for it and return a handler [Job].
|
||||||
|
* Multiple inputs could be connected to the same [Socket].
|
||||||
|
*/
|
||||||
|
public fun <T> Socket<T>.connectInput(scope: CoroutineScope, flow: Flow<T>): Job = scope.launch {
|
||||||
|
flow.collect { send(it) }
|
||||||
|
}
|
||||||
|
|
||||||
|
|
@ -1,125 +0,0 @@
|
|||||||
package space.kscience.controls.ports
|
|
||||||
|
|
||||||
import kotlinx.coroutines.*
|
|
||||||
import kotlinx.coroutines.channels.Channel
|
|
||||||
import kotlinx.coroutines.flow.Flow
|
|
||||||
import kotlinx.coroutines.flow.launchIn
|
|
||||||
import kotlinx.coroutines.flow.onEach
|
|
||||||
import kotlinx.coroutines.flow.receiveAsFlow
|
|
||||||
import kotlinx.io.Buffer
|
|
||||||
import kotlinx.io.Source
|
|
||||||
import space.kscience.controls.api.AsynchronousSocket
|
|
||||||
import space.kscience.dataforge.context.*
|
|
||||||
import space.kscience.dataforge.meta.Meta
|
|
||||||
import space.kscience.dataforge.meta.get
|
|
||||||
import space.kscience.dataforge.meta.int
|
|
||||||
import space.kscience.dataforge.meta.string
|
|
||||||
import kotlin.coroutines.CoroutineContext
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Raw [ByteArray] port
|
|
||||||
*/
|
|
||||||
public interface AsynchronousPort : ContextAware, AsynchronousSocket<ByteArray>
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Capture [AsynchronousPort] output as kotlinx-io [Source].
|
|
||||||
* [scope] controls the consummation.
|
|
||||||
* If the scope is canceled, the source stops producing.
|
|
||||||
*/
|
|
||||||
public fun AsynchronousPort.receiveAsSource(scope: CoroutineScope): Source {
|
|
||||||
val buffer = Buffer()
|
|
||||||
|
|
||||||
subscribe().onEach {
|
|
||||||
buffer.write(it)
|
|
||||||
}.launchIn(scope)
|
|
||||||
|
|
||||||
return buffer
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Common abstraction for [AsynchronousPort] based on [Channel]
|
|
||||||
*/
|
|
||||||
public abstract class AbstractAsynchronousPort(
|
|
||||||
override val context: Context,
|
|
||||||
public val meta: Meta,
|
|
||||||
coroutineContext: CoroutineContext = context.coroutineContext,
|
|
||||||
) : AsynchronousPort {
|
|
||||||
|
|
||||||
|
|
||||||
protected val scope: CoroutineScope by lazy {
|
|
||||||
CoroutineScope(
|
|
||||||
coroutineContext +
|
|
||||||
SupervisorJob(coroutineContext[Job]) +
|
|
||||||
CoroutineExceptionHandler { _, throwable -> logger.error(throwable) { throwable.stackTraceToString() } } +
|
|
||||||
CoroutineName(toString())
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
private val outgoing = Channel<ByteArray>(meta["outgoing.capacity"].int?:100)
|
|
||||||
private val incoming = Channel<ByteArray>(meta["incoming.capacity"].int?:100)
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Internal method to synchronously send data
|
|
||||||
*/
|
|
||||||
protected abstract suspend fun write(data: ByteArray)
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Internal method to receive data synchronously
|
|
||||||
*/
|
|
||||||
protected suspend fun receive(data: ByteArray) {
|
|
||||||
logger.debug { "$this RECEIVED: ${data.decodeToString()}" }
|
|
||||||
incoming.send(data)
|
|
||||||
}
|
|
||||||
|
|
||||||
private var sendJob: Job? = null
|
|
||||||
|
|
||||||
protected abstract fun onOpen()
|
|
||||||
|
|
||||||
final override fun open() {
|
|
||||||
if (!isOpen) {
|
|
||||||
sendJob = scope.launch {
|
|
||||||
for (data in outgoing) {
|
|
||||||
try {
|
|
||||||
write(data)
|
|
||||||
logger.debug { "${this@AbstractAsynchronousPort} SENT: ${data.decodeToString()}" }
|
|
||||||
} catch (ex: Exception) {
|
|
||||||
if (ex is CancellationException) throw ex
|
|
||||||
logger.error(ex) { "Error while writing data to the port" }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
onOpen()
|
|
||||||
} else {
|
|
||||||
logger.warn { "$this already opened" }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Send a data packet via the port
|
|
||||||
*/
|
|
||||||
override suspend fun send(data: ByteArray) {
|
|
||||||
outgoing.send(data)
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Raw flow of incoming data chunks. The chunks are not guaranteed to be complete phrases.
|
|
||||||
* To form phrases, some condition should be used on top of it.
|
|
||||||
* For example [stringsDelimitedIncoming] generates phrases with fixed delimiter.
|
|
||||||
*/
|
|
||||||
override fun subscribe(): Flow<ByteArray> = incoming.receiveAsFlow()
|
|
||||||
|
|
||||||
override fun close() {
|
|
||||||
outgoing.close()
|
|
||||||
incoming.close()
|
|
||||||
sendJob?.cancel()
|
|
||||||
}
|
|
||||||
|
|
||||||
override fun toString(): String = meta["name"].string?:"ChannelPort[${hashCode().toString(16)}]"
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Send UTF-8 encoded string
|
|
||||||
*/
|
|
||||||
public suspend fun AsynchronousPort.send(string: String): Unit = send(string.encodeToByteArray())
|
|
@ -0,0 +1,100 @@
|
|||||||
|
package space.kscience.controls.ports
|
||||||
|
|
||||||
|
import kotlinx.coroutines.*
|
||||||
|
import kotlinx.coroutines.channels.Channel
|
||||||
|
import kotlinx.coroutines.flow.Flow
|
||||||
|
import kotlinx.coroutines.flow.receiveAsFlow
|
||||||
|
import space.kscience.controls.api.Socket
|
||||||
|
import space.kscience.dataforge.context.*
|
||||||
|
import space.kscience.dataforge.misc.DfType
|
||||||
|
|
||||||
|
import kotlin.coroutines.CoroutineContext
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Raw [ByteArray] port
|
||||||
|
*/
|
||||||
|
public interface Port : ContextAware, Socket<ByteArray>
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A specialized factory for [Port]
|
||||||
|
*/
|
||||||
|
@DfType(PortFactory.TYPE)
|
||||||
|
public interface PortFactory : Factory<Port> {
|
||||||
|
public val type: String
|
||||||
|
|
||||||
|
public companion object {
|
||||||
|
public const val TYPE: String = "controls.port"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Common abstraction for [Port] based on [Channel]
|
||||||
|
*/
|
||||||
|
public abstract class AbstractPort(
|
||||||
|
override val context: Context,
|
||||||
|
coroutineContext: CoroutineContext = context.coroutineContext,
|
||||||
|
) : Port {
|
||||||
|
|
||||||
|
protected val scope: CoroutineScope = CoroutineScope(coroutineContext + SupervisorJob(coroutineContext[Job]))
|
||||||
|
|
||||||
|
private val outgoing = Channel<ByteArray>(100)
|
||||||
|
private val incoming = Channel<ByteArray>(100)
|
||||||
|
|
||||||
|
init {
|
||||||
|
scope.coroutineContext[Job]?.invokeOnCompletion {
|
||||||
|
close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Internal method to synchronously send data
|
||||||
|
*/
|
||||||
|
protected abstract suspend fun write(data: ByteArray)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Internal method to receive data synchronously
|
||||||
|
*/
|
||||||
|
protected suspend fun receive(data: ByteArray) {
|
||||||
|
logger.debug { "${this@AbstractPort} RECEIVED: ${data.decodeToString()}" }
|
||||||
|
incoming.send(data)
|
||||||
|
}
|
||||||
|
|
||||||
|
private val sendJob = scope.launch {
|
||||||
|
for (data in outgoing) {
|
||||||
|
try {
|
||||||
|
write(data)
|
||||||
|
logger.debug { "${this@AbstractPort} SENT: ${data.decodeToString()}" }
|
||||||
|
} catch (ex: Exception) {
|
||||||
|
if (ex is CancellationException) throw ex
|
||||||
|
logger.error(ex) { "Error while writing data to the port" }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Send a data packet via the port
|
||||||
|
*/
|
||||||
|
override suspend fun send(data: ByteArray) {
|
||||||
|
outgoing.send(data)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Raw flow of incoming data chunks. The chunks are not guaranteed to be complete phrases.
|
||||||
|
* To form phrases, some condition should be used on top of it.
|
||||||
|
* For example [stringsDelimitedIncoming] generates phrases with fixed delimiter.
|
||||||
|
*/
|
||||||
|
override fun receiving(): Flow<ByteArray> = incoming.receiveAsFlow()
|
||||||
|
|
||||||
|
override fun close() {
|
||||||
|
outgoing.close()
|
||||||
|
incoming.close()
|
||||||
|
scope.cancel()
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun isOpen(): Boolean = scope.isActive
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Send UTF-8 encoded string
|
||||||
|
*/
|
||||||
|
public suspend fun Port.send(string: String): Unit = send(string.encodeToByteArray())
|
@ -0,0 +1,64 @@
|
|||||||
|
package space.kscience.controls.ports
|
||||||
|
|
||||||
|
import kotlinx.coroutines.ExperimentalCoroutinesApi
|
||||||
|
import kotlinx.coroutines.flow.Flow
|
||||||
|
import kotlinx.coroutines.flow.flow
|
||||||
|
import kotlinx.coroutines.launch
|
||||||
|
import kotlinx.coroutines.sync.Mutex
|
||||||
|
import kotlinx.coroutines.sync.withLock
|
||||||
|
import space.kscience.dataforge.context.*
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A port that could be closed multiple times and opens automatically on request
|
||||||
|
*/
|
||||||
|
public class PortProxy(override val context: Context = Global, public val factory: suspend () -> Port) : Port, ContextAware {
|
||||||
|
|
||||||
|
private var actualPort: Port? = null
|
||||||
|
private val mutex: Mutex = Mutex()
|
||||||
|
|
||||||
|
private suspend fun port(): Port {
|
||||||
|
return mutex.withLock {
|
||||||
|
if (actualPort?.isOpen() == true) {
|
||||||
|
actualPort!!
|
||||||
|
} else {
|
||||||
|
factory().also {
|
||||||
|
actualPort = it
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
override suspend fun send(data: ByteArray) {
|
||||||
|
port().send(data)
|
||||||
|
}
|
||||||
|
|
||||||
|
@OptIn(ExperimentalCoroutinesApi::class)
|
||||||
|
override fun receiving(): Flow<ByteArray> = flow {
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
//recreate port and Flow on connection problems
|
||||||
|
port().receiving().collect {
|
||||||
|
emit(it)
|
||||||
|
}
|
||||||
|
} catch (t: Throwable) {
|
||||||
|
logger.warn{"Port read failed: ${t.message}. Reconnecting."}
|
||||||
|
mutex.withLock {
|
||||||
|
actualPort?.close()
|
||||||
|
actualPort = null
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// open by default
|
||||||
|
override fun isOpen(): Boolean = true
|
||||||
|
|
||||||
|
override fun close() {
|
||||||
|
context.launch {
|
||||||
|
mutex.withLock {
|
||||||
|
actualPort?.close()
|
||||||
|
actualPort = null
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -11,43 +11,26 @@ public class Ports : AbstractPlugin() {
|
|||||||
|
|
||||||
override val tag: PluginTag get() = Companion.tag
|
override val tag: PluginTag get() = Companion.tag
|
||||||
|
|
||||||
private val synchronousPortFactories by lazy {
|
private val portFactories by lazy {
|
||||||
context.gather<Factory<SynchronousPort>>(SYNCHRONOUS_PORT_TYPE)
|
context.gather<PortFactory>(PortFactory.TYPE)
|
||||||
}
|
}
|
||||||
|
|
||||||
private val asynchronousPortFactories by lazy {
|
private val portCache = mutableMapOf<Meta, Port>()
|
||||||
context.gather<Factory<AsynchronousPort>>(ASYNCHRONOUS_PORT_TYPE)
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a new [AsynchronousPort] according to specification
|
* Create a new [Port] according to specification
|
||||||
*/
|
*/
|
||||||
public fun buildAsynchronousPort(meta: Meta): AsynchronousPort {
|
public fun buildPort(meta: Meta): Port = portCache.getOrPut(meta) {
|
||||||
val type by meta.string { error("Port type is not defined") }
|
val type by meta.string { error("Port type is not defined") }
|
||||||
val factory = asynchronousPortFactories.entries
|
val factory = portFactories.values.firstOrNull { it.type == type }
|
||||||
.firstOrNull { it.key.toString() == type }?.value
|
|
||||||
?: error("Port factory for type $type not found")
|
?: error("Port factory for type $type not found")
|
||||||
return factory.build(context, meta)
|
factory.build(context, meta)
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Create a [SynchronousPort] according to specification or wrap an asynchronous implementation
|
|
||||||
*/
|
|
||||||
public fun buildSynchronousPort(meta: Meta): SynchronousPort {
|
|
||||||
val type by meta.string { error("Port type is not defined") }
|
|
||||||
val factory = synchronousPortFactories.entries
|
|
||||||
.firstOrNull { it.key.toString() == type }?.value
|
|
||||||
?: return buildAsynchronousPort(meta).asSynchronousPort()
|
|
||||||
return factory.build(context, meta)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public companion object : PluginFactory<Ports> {
|
public companion object : PluginFactory<Ports> {
|
||||||
|
|
||||||
override val tag: PluginTag = PluginTag("controls.ports", group = PluginTag.DATAFORGE_GROUP)
|
override val tag: PluginTag = PluginTag("controls.ports", group = PluginTag.DATAFORGE_GROUP)
|
||||||
|
|
||||||
public const val ASYNCHRONOUS_PORT_TYPE: String = "controls.asynchronousPort"
|
|
||||||
public const val SYNCHRONOUS_PORT_TYPE: String = "controls.synchronousPort"
|
|
||||||
|
|
||||||
override fun build(context: Context, meta: Meta): Ports = Ports()
|
override fun build(context: Context, meta: Meta): Ports = Ports()
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -2,86 +2,27 @@ package space.kscience.controls.ports
|
|||||||
|
|
||||||
import kotlinx.coroutines.flow.Flow
|
import kotlinx.coroutines.flow.Flow
|
||||||
import kotlinx.coroutines.flow.first
|
import kotlinx.coroutines.flow.first
|
||||||
import kotlinx.coroutines.flow.takeWhile
|
|
||||||
import kotlinx.coroutines.sync.Mutex
|
import kotlinx.coroutines.sync.Mutex
|
||||||
import kotlinx.coroutines.sync.withLock
|
import kotlinx.coroutines.sync.withLock
|
||||||
import kotlinx.io.Buffer
|
|
||||||
import kotlinx.io.readByteArray
|
|
||||||
import space.kscience.dataforge.context.Context
|
|
||||||
import space.kscience.dataforge.context.ContextAware
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A port handler for synchronous (request-response) communication with a port.
|
* A port handler for synchronous (request-response) communication with a port. Only one request could be active at a time (others are suspended.
|
||||||
* Only one request could be active at a time (others are suspended).
|
* The handler does not guarantee exclusive access to the port so the user mush ensure that no other controller handles port at the moment.
|
||||||
*/
|
*/
|
||||||
public interface SynchronousPort : ContextAware, AutoCloseable {
|
public class SynchronousPort(public val port: Port, private val mutex: Mutex) : Port by port {
|
||||||
|
|
||||||
public fun open()
|
|
||||||
|
|
||||||
public val isOpen: Boolean
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Send a single message and wait for the flow of response chunks.
|
* Send a single message and wait for the flow of respond messages.
|
||||||
* The consumer is responsible for calling a terminal operation on the flow.
|
|
||||||
*/
|
*/
|
||||||
public suspend fun <R> respond(
|
public suspend fun <R> respond(data: ByteArray, transform: suspend Flow<ByteArray>.() -> R): R = mutex.withLock {
|
||||||
request: ByteArray,
|
port.send(data)
|
||||||
transform: suspend Flow<ByteArray>.() -> R,
|
transform(port.receiving())
|
||||||
): R
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Synchronously read fixed size response to a given [request]. Discard additional response bytes.
|
|
||||||
*/
|
|
||||||
public suspend fun respondFixedMessageSize(
|
|
||||||
request: ByteArray,
|
|
||||||
responseSize: Int,
|
|
||||||
): ByteArray = respond(request) {
|
|
||||||
val buffer = Buffer()
|
|
||||||
takeWhile {
|
|
||||||
buffer.size < responseSize
|
|
||||||
}.collect {
|
|
||||||
buffer.write(it)
|
|
||||||
}
|
|
||||||
buffer.readByteArray(responseSize)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private class SynchronousOverAsynchronousPort(
|
|
||||||
val port: AsynchronousPort,
|
|
||||||
val mutex: Mutex,
|
|
||||||
) : SynchronousPort {
|
|
||||||
|
|
||||||
override val context: Context get() = port.context
|
|
||||||
|
|
||||||
override fun open() {
|
|
||||||
if (!port.isOpen) port.open()
|
|
||||||
}
|
|
||||||
|
|
||||||
override val isOpen: Boolean get() = port.isOpen
|
|
||||||
|
|
||||||
override fun close() {
|
|
||||||
if (port.isOpen) port.close()
|
|
||||||
}
|
|
||||||
|
|
||||||
override suspend fun <R> respond(
|
|
||||||
request: ByteArray,
|
|
||||||
transform: suspend Flow<ByteArray>.() -> R,
|
|
||||||
): R = mutex.withLock {
|
|
||||||
port.send(request)
|
|
||||||
transform(port.subscribe())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Provide a synchronous wrapper for an asynchronous port.
|
* Provide a synchronous wrapper for a port
|
||||||
* Optionally provide external [mutex] for operation synchronization.
|
|
||||||
*
|
|
||||||
* If the [AsynchronousPort] is called directly, it could violate [SynchronousPort] contract
|
|
||||||
* of only one request running simultaneously.
|
|
||||||
*/
|
*/
|
||||||
public fun AsynchronousPort.asSynchronousPort(mutex: Mutex = Mutex()): SynchronousPort =
|
public fun Port.synchronous(mutex: Mutex = Mutex()): SynchronousPort = SynchronousPort(this, mutex)
|
||||||
SynchronousOverAsynchronousPort(this, mutex)
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Send request and read incoming data blocks until the delimiter is encountered
|
* Send request and read incoming data blocks until the delimiter is encountered
|
||||||
|
@ -77,9 +77,9 @@ public fun Flow<ByteArray>.withStringDelimiter(delimiter: String): Flow<String>
|
|||||||
/**
|
/**
|
||||||
* A flow of delimited phrases
|
* A flow of delimited phrases
|
||||||
*/
|
*/
|
||||||
public fun AsynchronousPort.delimitedIncoming(delimiter: ByteArray): Flow<ByteArray> = subscribe().withDelimiter(delimiter)
|
public fun Port.delimitedIncoming(delimiter: ByteArray): Flow<ByteArray> = receiving().withDelimiter(delimiter)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A flow of delimited phrases with string content
|
* A flow of delimited phrases with string content
|
||||||
*/
|
*/
|
||||||
public fun AsynchronousPort.stringsDelimitedIncoming(delimiter: String): Flow<String> = subscribe().withStringDelimiter(delimiter)
|
public fun Port.stringsDelimitedIncoming(delimiter: String): Flow<String> = receiving().withStringDelimiter(delimiter)
|
||||||
|
@ -1,7 +1,10 @@
|
|||||||
package space.kscience.controls.ports
|
package space.kscience.controls.ports
|
||||||
|
|
||||||
import kotlinx.coroutines.*
|
import kotlinx.coroutines.*
|
||||||
import space.kscience.dataforge.context.*
|
import space.kscience.dataforge.context.Context
|
||||||
|
import space.kscience.dataforge.context.error
|
||||||
|
import space.kscience.dataforge.context.info
|
||||||
|
import space.kscience.dataforge.context.logger
|
||||||
import space.kscience.dataforge.meta.*
|
import space.kscience.dataforge.meta.*
|
||||||
import java.net.InetSocketAddress
|
import java.net.InetSocketAddress
|
||||||
import java.nio.ByteBuffer
|
import java.nio.ByteBuffer
|
||||||
@ -11,10 +14,7 @@ import java.nio.channels.DatagramChannel
|
|||||||
import java.nio.channels.SocketChannel
|
import java.nio.channels.SocketChannel
|
||||||
import kotlin.coroutines.CoroutineContext
|
import kotlin.coroutines.CoroutineContext
|
||||||
|
|
||||||
/**
|
public fun ByteBuffer.toArray(limit: Int = limit()): ByteArray {
|
||||||
* Copy the contents of this buffer to an array
|
|
||||||
*/
|
|
||||||
public fun ByteBuffer.copyToArray(limit: Int = limit()): ByteArray {
|
|
||||||
rewind()
|
rewind()
|
||||||
val response = ByteArray(limit)
|
val response = ByteArray(limit)
|
||||||
get(response)
|
get(response)
|
||||||
@ -27,31 +27,27 @@ public fun ByteBuffer.copyToArray(limit: Int = limit()): ByteArray {
|
|||||||
*/
|
*/
|
||||||
public class ChannelPort(
|
public class ChannelPort(
|
||||||
context: Context,
|
context: Context,
|
||||||
meta: Meta,
|
|
||||||
coroutineContext: CoroutineContext = context.coroutineContext,
|
coroutineContext: CoroutineContext = context.coroutineContext,
|
||||||
channelBuilder: suspend () -> ByteChannel,
|
channelBuilder: suspend () -> ByteChannel,
|
||||||
) : AbstractAsynchronousPort(context, meta, coroutineContext), AutoCloseable {
|
) : AbstractPort(context, coroutineContext), AutoCloseable {
|
||||||
|
|
||||||
|
private val futureChannel: Deferred<ByteChannel> = scope.async(Dispatchers.IO) {
|
||||||
|
channelBuilder()
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A handler to await port connection
|
* A handler to await port connection
|
||||||
*/
|
*/
|
||||||
private val futureChannel: Deferred<ByteChannel> = scope.async(Dispatchers.IO, start = CoroutineStart.LAZY) {
|
public val startJob: Job get() = futureChannel
|
||||||
channelBuilder()
|
|
||||||
}
|
|
||||||
|
|
||||||
private var listenerJob: Job? = null
|
private val listenerJob = scope.launch(Dispatchers.IO) {
|
||||||
|
|
||||||
override val isOpen: Boolean get() = listenerJob?.isActive == true
|
|
||||||
|
|
||||||
override fun onOpen() {
|
|
||||||
listenerJob = scope.launch(Dispatchers.IO) {
|
|
||||||
val channel = futureChannel.await()
|
val channel = futureChannel.await()
|
||||||
val buffer = ByteBuffer.allocate(1024)
|
val buffer = ByteBuffer.allocate(1024)
|
||||||
while (isActive && channel.isOpen) {
|
while (isActive && channel.isOpen) {
|
||||||
try {
|
try {
|
||||||
val num = channel.read(buffer)
|
val num = channel.read(buffer)
|
||||||
if (num > 0) {
|
if (num > 0) {
|
||||||
receive(buffer.copyToArray(num))
|
receive(buffer.toArray(num))
|
||||||
}
|
}
|
||||||
if (num < 0) cancel("The input channel is exhausted")
|
if (num < 0) cancel("The input channel is exhausted")
|
||||||
} catch (ex: Exception) {
|
} catch (ex: Exception) {
|
||||||
@ -64,7 +60,6 @@ public class ChannelPort(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
override suspend fun write(data: ByteArray): Unit = withContext(Dispatchers.IO) {
|
override suspend fun write(data: ByteArray): Unit = withContext(Dispatchers.IO) {
|
||||||
futureChannel.await().write(ByteBuffer.wrap(data))
|
futureChannel.await().write(ByteBuffer.wrap(data))
|
||||||
@ -72,7 +67,6 @@ public class ChannelPort(
|
|||||||
|
|
||||||
@OptIn(ExperimentalCoroutinesApi::class)
|
@OptIn(ExperimentalCoroutinesApi::class)
|
||||||
override fun close() {
|
override fun close() {
|
||||||
listenerJob?.cancel()
|
|
||||||
if (futureChannel.isCompleted) {
|
if (futureChannel.isCompleted) {
|
||||||
futureChannel.getCompleted().close()
|
futureChannel.getCompleted().close()
|
||||||
}
|
}
|
||||||
@ -81,95 +75,61 @@ public class ChannelPort(
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A [Factory] for TCP connections
|
* A [PortFactory] for TCP connections
|
||||||
*/
|
*/
|
||||||
public object TcpPort : Factory<AsynchronousPort> {
|
public object TcpPort : PortFactory {
|
||||||
|
|
||||||
public fun build(
|
override val type: String = "tcp"
|
||||||
context: Context,
|
|
||||||
host: String,
|
|
||||||
port: Int,
|
|
||||||
coroutineContext: CoroutineContext = context.coroutineContext,
|
|
||||||
): ChannelPort {
|
|
||||||
val meta = Meta {
|
|
||||||
"name" put "tcp://$host:$port"
|
|
||||||
"type" put "tcp"
|
|
||||||
"host" put host
|
|
||||||
"port" put port
|
|
||||||
}
|
|
||||||
return ChannelPort(context, meta, coroutineContext) {
|
|
||||||
SocketChannel.open(InetSocketAddress(host, port))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Create and open TCP port
|
|
||||||
*/
|
|
||||||
public fun open(
|
public fun open(
|
||||||
context: Context,
|
context: Context,
|
||||||
host: String,
|
host: String,
|
||||||
port: Int,
|
port: Int,
|
||||||
coroutineContext: CoroutineContext = context.coroutineContext,
|
coroutineContext: CoroutineContext = context.coroutineContext,
|
||||||
): ChannelPort = build(context, host, port, coroutineContext).apply { open() }
|
): ChannelPort = ChannelPort(context, coroutineContext) {
|
||||||
|
SocketChannel.open(InetSocketAddress(host, port))
|
||||||
|
}
|
||||||
|
|
||||||
override fun build(context: Context, meta: Meta): ChannelPort {
|
override fun build(context: Context, meta: Meta): ChannelPort {
|
||||||
val host = meta["host"].string ?: "localhost"
|
val host = meta["host"].string ?: "localhost"
|
||||||
val port = meta["port"].int ?: error("Port value for TCP port is not defined in $meta")
|
val port = meta["port"].int ?: error("Port value for TCP port is not defined in $meta")
|
||||||
return build(context, host, port)
|
return open(context, host, port)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A [Factory] for UDP connections
|
* A [PortFactory] for UDP connections
|
||||||
*/
|
*/
|
||||||
public object UdpPort : Factory<AsynchronousPort> {
|
public object UdpPort : PortFactory {
|
||||||
|
|
||||||
public fun build(
|
override val type: String = "udp"
|
||||||
context: Context,
|
|
||||||
remoteHost: String,
|
|
||||||
remotePort: Int,
|
|
||||||
localPort: Int? = null,
|
|
||||||
localHost: String? = null,
|
|
||||||
coroutineContext: CoroutineContext = context.coroutineContext,
|
|
||||||
): ChannelPort {
|
|
||||||
val meta = Meta {
|
|
||||||
"name" put "udp://$remoteHost:$remotePort"
|
|
||||||
"type" put "udp"
|
|
||||||
"remoteHost" put remoteHost
|
|
||||||
"remotePort" put remotePort
|
|
||||||
localHost?.let { "localHost" put it }
|
|
||||||
localPort?.let { "localPort" put it }
|
|
||||||
}
|
|
||||||
return ChannelPort(context, meta, coroutineContext) {
|
|
||||||
DatagramChannel.open().apply {
|
|
||||||
//bind the channel to a local port to receive messages
|
|
||||||
localPort?.let { bind(InetSocketAddress(localHost ?: "localhost", it)) }
|
|
||||||
//connect to remote port to send messages
|
|
||||||
connect(InetSocketAddress(remoteHost, remotePort.toInt()))
|
|
||||||
context.logger.info { "Connected to UDP $remotePort on $remoteHost" }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Connect a datagram channel to a remote host/port. If [localPort] is provided, it is used to bind local port for receiving messages.
|
* Connect a datagram channel to a remote host/port. If [localPort] is provided, it is used to bind local port for receiving messages.
|
||||||
*/
|
*/
|
||||||
public fun open(
|
public fun openChannel(
|
||||||
context: Context,
|
context: Context,
|
||||||
remoteHost: String,
|
remoteHost: String,
|
||||||
remotePort: Int,
|
remotePort: Int,
|
||||||
localPort: Int? = null,
|
localPort: Int? = null,
|
||||||
localHost: String = "localhost",
|
localHost: String = "localhost",
|
||||||
): ChannelPort = build(context, remoteHost, remotePort, localPort, localHost).apply { open() }
|
coroutineContext: CoroutineContext = context.coroutineContext,
|
||||||
|
): ChannelPort = ChannelPort(context, coroutineContext) {
|
||||||
|
DatagramChannel.open().apply {
|
||||||
|
//bind the channel to a local port to receive messages
|
||||||
|
localPort?.let { bind(InetSocketAddress(localHost, localPort)) }
|
||||||
|
//connect to remote port to send messages
|
||||||
|
connect(InetSocketAddress(remoteHost, remotePort))
|
||||||
|
context.logger.info { "Connected to UDP $remotePort on $remoteHost" }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
override fun build(context: Context, meta: Meta): ChannelPort {
|
override fun build(context: Context, meta: Meta): ChannelPort {
|
||||||
val remoteHost by meta.string { error("Remote host is not specified") }
|
val remoteHost by meta.string { error("Remote host is not specified") }
|
||||||
val remotePort by meta.number { error("Remote port is not specified") }
|
val remotePort by meta.number { error("Remote port is not specified") }
|
||||||
val localHost: String? by meta.string()
|
val localHost: String? by meta.string()
|
||||||
val localPort: Int? by meta.int()
|
val localPort: Int? by meta.int()
|
||||||
return build(context, remoteHost, remotePort.toInt(), localPort, localHost)
|
return openChannel(context, remoteHost, remotePort.toInt(), localPort, localHost ?: "localhost")
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -6,7 +6,7 @@ import space.kscience.dataforge.context.PluginFactory
|
|||||||
import space.kscience.dataforge.context.PluginTag
|
import space.kscience.dataforge.context.PluginTag
|
||||||
import space.kscience.dataforge.meta.Meta
|
import space.kscience.dataforge.meta.Meta
|
||||||
import space.kscience.dataforge.names.Name
|
import space.kscience.dataforge.names.Name
|
||||||
import space.kscience.dataforge.names.asName
|
import space.kscience.dataforge.names.parseAsName
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A plugin for loading JVM nio-based ports
|
* A plugin for loading JVM nio-based ports
|
||||||
@ -17,9 +17,9 @@ public class JvmPortsPlugin : AbstractPlugin() {
|
|||||||
override val tag: PluginTag get() = Companion.tag
|
override val tag: PluginTag get() = Companion.tag
|
||||||
|
|
||||||
override fun content(target: String): Map<Name, Any> = when(target){
|
override fun content(target: String): Map<Name, Any> = when(target){
|
||||||
Ports.ASYNCHRONOUS_PORT_TYPE -> mapOf(
|
PortFactory.TYPE -> mapOf(
|
||||||
"tcp".asName() to TcpPort,
|
TcpPort.type.parseAsName() to TcpPort,
|
||||||
"udp".asName() to UdpPort
|
UdpPort.type.parseAsName() to UdpPort
|
||||||
)
|
)
|
||||||
else -> emptyMap()
|
else -> emptyMap()
|
||||||
}
|
}
|
||||||
|
@ -1,8 +1,10 @@
|
|||||||
package space.kscience.controls.ports
|
package space.kscience.controls.ports
|
||||||
|
|
||||||
import kotlinx.coroutines.*
|
import kotlinx.coroutines.Dispatchers
|
||||||
|
import kotlinx.coroutines.isActive
|
||||||
|
import kotlinx.coroutines.launch
|
||||||
|
import kotlinx.coroutines.withContext
|
||||||
import space.kscience.dataforge.context.Context
|
import space.kscience.dataforge.context.Context
|
||||||
import space.kscience.dataforge.meta.Meta
|
|
||||||
import java.net.DatagramPacket
|
import java.net.DatagramPacket
|
||||||
import java.net.DatagramSocket
|
import java.net.DatagramSocket
|
||||||
import kotlin.coroutines.CoroutineContext
|
import kotlin.coroutines.CoroutineContext
|
||||||
@ -12,15 +14,11 @@ import kotlin.coroutines.CoroutineContext
|
|||||||
*/
|
*/
|
||||||
public class UdpSocketPort(
|
public class UdpSocketPort(
|
||||||
override val context: Context,
|
override val context: Context,
|
||||||
meta: Meta,
|
|
||||||
private val socket: DatagramSocket,
|
private val socket: DatagramSocket,
|
||||||
coroutineContext: CoroutineContext = context.coroutineContext,
|
coroutineContext: CoroutineContext = context.coroutineContext,
|
||||||
) : AbstractAsynchronousPort(context, meta, coroutineContext) {
|
) : AbstractPort(context, coroutineContext) {
|
||||||
|
|
||||||
private var listenerJob: Job? = null
|
private val listenerJob = context.launch(Dispatchers.IO) {
|
||||||
|
|
||||||
override fun onOpen() {
|
|
||||||
listenerJob = context.launch(Dispatchers.IO) {
|
|
||||||
while (isActive) {
|
while (isActive) {
|
||||||
val buf = ByteArray(socket.receiveBufferSize)
|
val buf = ByteArray(socket.receiveBufferSize)
|
||||||
|
|
||||||
@ -37,14 +35,12 @@ public class UdpSocketPort(
|
|||||||
receive(bytes)
|
receive(bytes)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
override fun close() {
|
override fun close() {
|
||||||
listenerJob?.cancel()
|
listenerJob.cancel()
|
||||||
super.close()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
override val isOpen: Boolean get() = listenerJob?.isActive == true
|
override fun isOpen(): Boolean = listenerJob.isActive
|
||||||
|
|
||||||
|
|
||||||
override suspend fun write(data: ByteArray): Unit = withContext(Dispatchers.IO) {
|
override suspend fun write(data: ByteArray): Unit = withContext(Dispatchers.IO) {
|
||||||
|
@ -12,7 +12,7 @@ import space.kscience.dataforge.context.Global
|
|||||||
import kotlin.test.assertEquals
|
import kotlin.test.assertEquals
|
||||||
|
|
||||||
|
|
||||||
internal class AsynchronousPortIOTest {
|
internal class PortIOTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun testDelimiteredByteArrayFlow() {
|
fun testDelimiteredByteArrayFlow() {
|
||||||
@ -29,8 +29,8 @@ internal class AsynchronousPortIOTest {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun testUdpCommunication() = runTest {
|
fun testUdpCommunication() = runTest {
|
||||||
val receiver = UdpPort.open(Global, "localhost", 8811, localPort = 8812)
|
val receiver = UdpPort.openChannel(Global, "localhost", 8811, localPort = 8812)
|
||||||
val sender = UdpPort.open(Global, "localhost", 8812, localPort = 8811)
|
val sender = UdpPort.openChannel(Global, "localhost", 8812, localPort = 8811)
|
||||||
|
|
||||||
delay(30)
|
delay(30)
|
||||||
repeat(10) {
|
repeat(10) {
|
||||||
@ -38,7 +38,7 @@ internal class AsynchronousPortIOTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
val res = receiver
|
val res = receiver
|
||||||
.subscribe()
|
.receiving()
|
||||||
.withStringDelimiter("\n")
|
.withStringDelimiter("\n")
|
||||||
.take(10)
|
.take(10)
|
||||||
.toList()
|
.toList()
|
@ -1,95 +0,0 @@
|
|||||||
package space.kscience.controls.pi
|
|
||||||
|
|
||||||
import com.pi4j.io.serial.Baud
|
|
||||||
import com.pi4j.io.serial.Serial
|
|
||||||
import com.pi4j.io.serial.SerialConfigBuilder
|
|
||||||
import com.pi4j.ktx.io.serial
|
|
||||||
import kotlinx.coroutines.*
|
|
||||||
import space.kscience.controls.ports.AbstractAsynchronousPort
|
|
||||||
import space.kscience.controls.ports.AsynchronousPort
|
|
||||||
import space.kscience.controls.ports.copyToArray
|
|
||||||
import space.kscience.dataforge.context.*
|
|
||||||
import space.kscience.dataforge.meta.Meta
|
|
||||||
import space.kscience.dataforge.meta.enum
|
|
||||||
import space.kscience.dataforge.meta.get
|
|
||||||
import space.kscience.dataforge.meta.string
|
|
||||||
import java.nio.ByteBuffer
|
|
||||||
import kotlin.coroutines.CoroutineContext
|
|
||||||
|
|
||||||
public class AsynchronousPiPort(
|
|
||||||
context: Context,
|
|
||||||
meta: Meta,
|
|
||||||
private val serial: Serial,
|
|
||||||
coroutineContext: CoroutineContext = context.coroutineContext,
|
|
||||||
) : AbstractAsynchronousPort(context, meta, coroutineContext) {
|
|
||||||
|
|
||||||
|
|
||||||
private var listenerJob: Job? = null
|
|
||||||
override fun onOpen() {
|
|
||||||
serial.open()
|
|
||||||
listenerJob = this.scope.launch(Dispatchers.IO) {
|
|
||||||
val buffer = ByteBuffer.allocate(1024)
|
|
||||||
while (isActive) {
|
|
||||||
try {
|
|
||||||
val num = serial.read(buffer)
|
|
||||||
if (num > 0) {
|
|
||||||
receive(buffer.copyToArray(num))
|
|
||||||
}
|
|
||||||
if (num < 0) cancel("The input channel is exhausted")
|
|
||||||
} catch (ex: Exception) {
|
|
||||||
logger.error(ex) { "Channel read error" }
|
|
||||||
delay(1000)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
override suspend fun write(data: ByteArray): Unit = withContext(Dispatchers.IO) {
|
|
||||||
serial.write(data)
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
override val isOpen: Boolean get() = listenerJob?.isActive == true
|
|
||||||
|
|
||||||
override fun close() {
|
|
||||||
listenerJob?.cancel()
|
|
||||||
serial.close()
|
|
||||||
}
|
|
||||||
|
|
||||||
public companion object : Factory<AsynchronousPort> {
|
|
||||||
|
|
||||||
|
|
||||||
public fun build(
|
|
||||||
context: Context,
|
|
||||||
device: String,
|
|
||||||
block: SerialConfigBuilder.() -> Unit,
|
|
||||||
): AsynchronousPiPort {
|
|
||||||
val meta = Meta {
|
|
||||||
"name" put "pi://$device"
|
|
||||||
"type" put "serial"
|
|
||||||
}
|
|
||||||
val pi = context.request(PiPlugin)
|
|
||||||
|
|
||||||
val serial = pi.piContext.serial(device, block)
|
|
||||||
return AsynchronousPiPort(context, meta, serial)
|
|
||||||
}
|
|
||||||
|
|
||||||
public fun open(
|
|
||||||
context: Context,
|
|
||||||
device: String,
|
|
||||||
block: SerialConfigBuilder.() -> Unit,
|
|
||||||
): AsynchronousPiPort = build(context, device, block).apply { open() }
|
|
||||||
|
|
||||||
override fun build(context: Context, meta: Meta): AsynchronousPort {
|
|
||||||
val device: String = meta["device"].string ?: error("Device name not defined")
|
|
||||||
val baudRate: Baud = meta["baudRate"].enum<Baud>() ?: Baud._9600
|
|
||||||
val pi = context.request(PiPlugin)
|
|
||||||
val serial = pi.piContext.serial(device) {
|
|
||||||
baud8N1(baudRate)
|
|
||||||
}
|
|
||||||
return AsynchronousPiPort(context, meta, serial)
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -2,6 +2,7 @@ package space.kscience.controls.pi
|
|||||||
|
|
||||||
import com.pi4j.Pi4J
|
import com.pi4j.Pi4J
|
||||||
import space.kscience.controls.manager.DeviceManager
|
import space.kscience.controls.manager.DeviceManager
|
||||||
|
import space.kscience.controls.ports.PortFactory
|
||||||
import space.kscience.controls.ports.Ports
|
import space.kscience.controls.ports.Ports
|
||||||
import space.kscience.dataforge.context.AbstractPlugin
|
import space.kscience.dataforge.context.AbstractPlugin
|
||||||
import space.kscience.dataforge.context.Context
|
import space.kscience.dataforge.context.Context
|
||||||
@ -9,7 +10,7 @@ import space.kscience.dataforge.context.PluginFactory
|
|||||||
import space.kscience.dataforge.context.PluginTag
|
import space.kscience.dataforge.context.PluginTag
|
||||||
import space.kscience.dataforge.meta.Meta
|
import space.kscience.dataforge.meta.Meta
|
||||||
import space.kscience.dataforge.names.Name
|
import space.kscience.dataforge.names.Name
|
||||||
import space.kscience.dataforge.names.asName
|
import space.kscience.dataforge.names.parseAsName
|
||||||
import com.pi4j.context.Context as PiContext
|
import com.pi4j.context.Context as PiContext
|
||||||
|
|
||||||
public class PiPlugin : AbstractPlugin() {
|
public class PiPlugin : AbstractPlugin() {
|
||||||
@ -21,11 +22,8 @@ public class PiPlugin : AbstractPlugin() {
|
|||||||
public val piContext: PiContext by lazy { createPiContext(context, meta) }
|
public val piContext: PiContext by lazy { createPiContext(context, meta) }
|
||||||
|
|
||||||
override fun content(target: String): Map<Name, Any> = when (target) {
|
override fun content(target: String): Map<Name, Any> = when (target) {
|
||||||
Ports.ASYNCHRONOUS_PORT_TYPE -> mapOf(
|
PortFactory.TYPE -> mapOf(
|
||||||
"serial".asName() to AsynchronousPiPort,
|
PiSerialPort.type.parseAsName() to PiSerialPort,
|
||||||
)
|
|
||||||
Ports.SYNCHRONOUS_PORT_TYPE -> mapOf(
|
|
||||||
"serial".asName() to SynchronousPiPort,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
else -> super.content(target)
|
else -> super.content(target)
|
||||||
@ -42,7 +40,6 @@ public class PiPlugin : AbstractPlugin() {
|
|||||||
|
|
||||||
override fun build(context: Context, meta: Meta): PiPlugin = PiPlugin()
|
override fun build(context: Context, meta: Meta): PiPlugin = PiPlugin()
|
||||||
|
|
||||||
@Suppress("UNUSED_PARAMETER")
|
|
||||||
public fun createPiContext(context: Context, meta: Meta): PiContext = Pi4J.newAutoContext()
|
public fun createPiContext(context: Context, meta: Meta): PiContext = Pi4J.newAutoContext()
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,82 @@
|
|||||||
|
package space.kscience.controls.pi
|
||||||
|
|
||||||
|
import com.pi4j.io.serial.Baud
|
||||||
|
import com.pi4j.io.serial.Serial
|
||||||
|
import com.pi4j.io.serial.SerialConfigBuilder
|
||||||
|
import com.pi4j.ktx.io.serial
|
||||||
|
import kotlinx.coroutines.*
|
||||||
|
import space.kscience.controls.ports.AbstractPort
|
||||||
|
import space.kscience.controls.ports.Port
|
||||||
|
import space.kscience.controls.ports.PortFactory
|
||||||
|
import space.kscience.controls.ports.toArray
|
||||||
|
import space.kscience.dataforge.context.Context
|
||||||
|
import space.kscience.dataforge.context.error
|
||||||
|
import space.kscience.dataforge.context.logger
|
||||||
|
import space.kscience.dataforge.context.request
|
||||||
|
import space.kscience.dataforge.meta.Meta
|
||||||
|
import space.kscience.dataforge.meta.enum
|
||||||
|
import space.kscience.dataforge.meta.get
|
||||||
|
import space.kscience.dataforge.meta.string
|
||||||
|
import java.nio.ByteBuffer
|
||||||
|
import kotlin.coroutines.CoroutineContext
|
||||||
|
import com.pi4j.context.Context as PiContext
|
||||||
|
|
||||||
|
public class PiSerialPort(
|
||||||
|
context: Context,
|
||||||
|
coroutineContext: CoroutineContext = context.coroutineContext,
|
||||||
|
public val serialBuilder: PiContext.() -> Serial,
|
||||||
|
) : AbstractPort(context, coroutineContext) {
|
||||||
|
|
||||||
|
private val serial: Serial by lazy {
|
||||||
|
val pi = context.request(PiPlugin)
|
||||||
|
pi.piContext.serialBuilder()
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private val listenerJob = this.scope.launch(Dispatchers.IO) {
|
||||||
|
val buffer = ByteBuffer.allocate(1024)
|
||||||
|
while (isActive) {
|
||||||
|
try {
|
||||||
|
val num = serial.read(buffer)
|
||||||
|
if (num > 0) {
|
||||||
|
receive(buffer.toArray(num))
|
||||||
|
}
|
||||||
|
if (num < 0) cancel("The input channel is exhausted")
|
||||||
|
} catch (ex: Exception) {
|
||||||
|
logger.error(ex) { "Channel read error" }
|
||||||
|
delay(1000)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
override suspend fun write(data: ByteArray): Unit = withContext(Dispatchers.IO) {
|
||||||
|
serial.write(data)
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun close() {
|
||||||
|
listenerJob.cancel()
|
||||||
|
serial.close()
|
||||||
|
}
|
||||||
|
|
||||||
|
public companion object : PortFactory {
|
||||||
|
override val type: String get() = "pi"
|
||||||
|
|
||||||
|
public fun open(
|
||||||
|
context: Context,
|
||||||
|
device: String,
|
||||||
|
block: SerialConfigBuilder.() -> Unit,
|
||||||
|
): PiSerialPort = PiSerialPort(context) {
|
||||||
|
serial(device, block)
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun build(context: Context, meta: Meta): Port = PiSerialPort(context) {
|
||||||
|
val device: String = meta["device"].string ?: error("Device name not defined")
|
||||||
|
val baudRate: Baud = meta["baudRate"].enum<Baud>() ?: Baud._9600
|
||||||
|
serial(device) {
|
||||||
|
baud8N1(baudRate)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -1,107 +0,0 @@
|
|||||||
package space.kscience.controls.pi
|
|
||||||
|
|
||||||
import com.pi4j.io.serial.Baud
|
|
||||||
import com.pi4j.io.serial.Serial
|
|
||||||
import com.pi4j.io.serial.SerialConfigBuilder
|
|
||||||
import com.pi4j.ktx.io.serial
|
|
||||||
import kotlinx.coroutines.delay
|
|
||||||
import kotlinx.coroutines.flow.Flow
|
|
||||||
import kotlinx.coroutines.flow.flow
|
|
||||||
import kotlinx.coroutines.runInterruptible
|
|
||||||
import kotlinx.coroutines.sync.Mutex
|
|
||||||
import kotlinx.coroutines.sync.withLock
|
|
||||||
import space.kscience.controls.ports.SynchronousPort
|
|
||||||
import space.kscience.controls.ports.copyToArray
|
|
||||||
import space.kscience.dataforge.context.*
|
|
||||||
import space.kscience.dataforge.meta.Meta
|
|
||||||
import space.kscience.dataforge.meta.enum
|
|
||||||
import space.kscience.dataforge.meta.get
|
|
||||||
import space.kscience.dataforge.meta.string
|
|
||||||
import java.nio.ByteBuffer
|
|
||||||
|
|
||||||
public class SynchronousPiPort(
|
|
||||||
override val context: Context,
|
|
||||||
public val meta: Meta,
|
|
||||||
private val serial: Serial,
|
|
||||||
private val mutex: Mutex = Mutex(),
|
|
||||||
) : SynchronousPort {
|
|
||||||
|
|
||||||
private val pi = context.request(PiPlugin)
|
|
||||||
override fun open() {
|
|
||||||
serial.open()
|
|
||||||
}
|
|
||||||
|
|
||||||
override val isOpen: Boolean get() = serial.isOpen
|
|
||||||
|
|
||||||
override suspend fun <R> respond(
|
|
||||||
request: ByteArray,
|
|
||||||
transform: suspend Flow<ByteArray>.() -> R,
|
|
||||||
): R = mutex.withLock {
|
|
||||||
serial.drain()
|
|
||||||
serial.write(request)
|
|
||||||
flow<ByteArray> {
|
|
||||||
val buffer = ByteBuffer.allocate(1024)
|
|
||||||
while (isOpen) {
|
|
||||||
try {
|
|
||||||
val num = serial.read(buffer)
|
|
||||||
if (num > 0) {
|
|
||||||
emit(buffer.copyToArray(num))
|
|
||||||
}
|
|
||||||
if (num < 0) break
|
|
||||||
} catch (ex: Exception) {
|
|
||||||
logger.error(ex) { "Channel read error" }
|
|
||||||
delay(1000)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}.transform()
|
|
||||||
}
|
|
||||||
|
|
||||||
override suspend fun respondFixedMessageSize(request: ByteArray, responseSize: Int): ByteArray = mutex.withLock {
|
|
||||||
runInterruptible {
|
|
||||||
serial.drain()
|
|
||||||
serial.write(request)
|
|
||||||
serial.readNBytes(responseSize)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
override fun close() {
|
|
||||||
serial.close()
|
|
||||||
}
|
|
||||||
|
|
||||||
public companion object : Factory<SynchronousPort> {
|
|
||||||
|
|
||||||
|
|
||||||
public fun build(
|
|
||||||
context: Context,
|
|
||||||
device: String,
|
|
||||||
block: SerialConfigBuilder.() -> Unit,
|
|
||||||
): SynchronousPiPort {
|
|
||||||
val meta = Meta {
|
|
||||||
"name" put "pi://$device"
|
|
||||||
"type" put "serial"
|
|
||||||
}
|
|
||||||
val pi = context.request(PiPlugin)
|
|
||||||
|
|
||||||
val serial = pi.piContext.serial(device, block)
|
|
||||||
return SynchronousPiPort(context, meta, serial)
|
|
||||||
}
|
|
||||||
|
|
||||||
public fun open(
|
|
||||||
context: Context,
|
|
||||||
device: String,
|
|
||||||
block: SerialConfigBuilder.() -> Unit,
|
|
||||||
): SynchronousPiPort = build(context, device, block).apply { open() }
|
|
||||||
|
|
||||||
override fun build(context: Context, meta: Meta): SynchronousPiPort {
|
|
||||||
val device: String = meta["device"].string ?: error("Device name not defined")
|
|
||||||
val baudRate: Baud = meta["baudRate"].enum<Baud>() ?: Baud._9600
|
|
||||||
val pi = context.request(PiPlugin)
|
|
||||||
val serial = pi.piContext.serial(device) {
|
|
||||||
baud8N1(baudRate)
|
|
||||||
}
|
|
||||||
return SynchronousPiPort(context, meta, serial)
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -13,7 +13,7 @@ public class KtorPortsPlugin : AbstractPlugin() {
|
|||||||
override val tag: PluginTag get() = Companion.tag
|
override val tag: PluginTag get() = Companion.tag
|
||||||
|
|
||||||
override fun content(target: String): Map<Name, Any> = when (target) {
|
override fun content(target: String): Map<Name, Any> = when (target) {
|
||||||
Ports.ASYNCHRONOUS_PORT_TYPE -> mapOf("tcp".asName() to KtorTcpPort, "udp".asName() to KtorUdpPort)
|
PortFactory.TYPE -> mapOf("tcp".asName() to KtorTcpPort, "udp".asName() to KtorUdpPort)
|
||||||
else -> emptyMap()
|
else -> emptyMap()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -8,46 +8,42 @@ import io.ktor.network.sockets.openWriteChannel
|
|||||||
import io.ktor.utils.io.consumeEachBufferRange
|
import io.ktor.utils.io.consumeEachBufferRange
|
||||||
import io.ktor.utils.io.core.Closeable
|
import io.ktor.utils.io.core.Closeable
|
||||||
import io.ktor.utils.io.writeAvailable
|
import io.ktor.utils.io.writeAvailable
|
||||||
import kotlinx.coroutines.*
|
import kotlinx.coroutines.Dispatchers
|
||||||
|
import kotlinx.coroutines.async
|
||||||
|
import kotlinx.coroutines.isActive
|
||||||
|
import kotlinx.coroutines.launch
|
||||||
import space.kscience.dataforge.context.Context
|
import space.kscience.dataforge.context.Context
|
||||||
import space.kscience.dataforge.context.Factory
|
|
||||||
import space.kscience.dataforge.meta.Meta
|
import space.kscience.dataforge.meta.Meta
|
||||||
import space.kscience.dataforge.meta.get
|
import space.kscience.dataforge.meta.get
|
||||||
import space.kscience.dataforge.meta.int
|
import space.kscience.dataforge.meta.int
|
||||||
import space.kscience.dataforge.meta.string
|
import space.kscience.dataforge.meta.string
|
||||||
import java.nio.ByteBuffer
|
|
||||||
import kotlin.coroutines.CoroutineContext
|
import kotlin.coroutines.CoroutineContext
|
||||||
|
|
||||||
public class KtorTcpPort internal constructor(
|
public class KtorTcpPort internal constructor(
|
||||||
context: Context,
|
context: Context,
|
||||||
meta: Meta,
|
|
||||||
public val host: String,
|
public val host: String,
|
||||||
public val port: Int,
|
public val port: Int,
|
||||||
coroutineContext: CoroutineContext = context.coroutineContext,
|
coroutineContext: CoroutineContext = context.coroutineContext,
|
||||||
socketOptions: SocketOptions.TCPClientSocketOptions.() -> Unit = {},
|
socketOptions: SocketOptions.TCPClientSocketOptions.() -> Unit = {}
|
||||||
) : AbstractAsynchronousPort(context, meta, coroutineContext), Closeable {
|
) : AbstractPort(context, coroutineContext), Closeable {
|
||||||
|
|
||||||
override fun toString(): String = "port[tcp:$host:$port]"
|
override fun toString(): String = "port[tcp:$host:$port]"
|
||||||
|
|
||||||
private val futureSocket = scope.async(Dispatchers.IO, start = CoroutineStart.LAZY) {
|
private val futureSocket = scope.async {
|
||||||
aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().connect(host, port, socketOptions)
|
aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().connect(host, port, socketOptions)
|
||||||
}
|
}
|
||||||
|
|
||||||
private val writeChannel = scope.async(Dispatchers.IO, start = CoroutineStart.LAZY) {
|
private val writeChannel = scope.async {
|
||||||
futureSocket.await().openWriteChannel(true)
|
futureSocket.await().openWriteChannel(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
private var listenerJob: Job? = null
|
private val listenerJob = scope.launch {
|
||||||
|
|
||||||
override fun onOpen() {
|
|
||||||
listenerJob = scope.launch {
|
|
||||||
val input = futureSocket.await().openReadChannel()
|
val input = futureSocket.await().openReadChannel()
|
||||||
input.consumeEachBufferRange { buffer: ByteBuffer, last ->
|
input.consumeEachBufferRange { buffer, _ ->
|
||||||
val array = ByteArray(buffer.remaining())
|
val array = ByteArray(buffer.remaining())
|
||||||
buffer.get(array)
|
buffer.get(array)
|
||||||
receive(array)
|
receive(array)
|
||||||
!last && isActive
|
isActive
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -55,45 +51,30 @@ public class KtorTcpPort internal constructor(
|
|||||||
writeChannel.await().writeAvailable(data)
|
writeChannel.await().writeAvailable(data)
|
||||||
}
|
}
|
||||||
|
|
||||||
override val isOpen: Boolean
|
|
||||||
get() = listenerJob?.isActive == true
|
|
||||||
|
|
||||||
override fun close() {
|
override fun close() {
|
||||||
listenerJob?.cancel()
|
listenerJob.cancel()
|
||||||
futureSocket.cancel()
|
futureSocket.cancel()
|
||||||
super.close()
|
super.close()
|
||||||
}
|
}
|
||||||
|
|
||||||
public companion object : Factory<AsynchronousPort> {
|
public companion object : PortFactory {
|
||||||
|
|
||||||
public fun build(
|
override val type: String = "tcp"
|
||||||
context: Context,
|
|
||||||
host: String,
|
|
||||||
port: Int,
|
|
||||||
coroutineContext: CoroutineContext = context.coroutineContext,
|
|
||||||
socketOptions: SocketOptions.TCPClientSocketOptions.() -> Unit = {},
|
|
||||||
): KtorTcpPort {
|
|
||||||
val meta = Meta {
|
|
||||||
"name" put "tcp://$host:$port"
|
|
||||||
"type" put "tcp"
|
|
||||||
"host" put host
|
|
||||||
"port" put port
|
|
||||||
}
|
|
||||||
return KtorTcpPort(context, meta, host, port, coroutineContext, socketOptions)
|
|
||||||
}
|
|
||||||
|
|
||||||
public fun open(
|
public fun open(
|
||||||
context: Context,
|
context: Context,
|
||||||
host: String,
|
host: String,
|
||||||
port: Int,
|
port: Int,
|
||||||
coroutineContext: CoroutineContext = context.coroutineContext,
|
coroutineContext: CoroutineContext = context.coroutineContext,
|
||||||
socketOptions: SocketOptions.TCPClientSocketOptions.() -> Unit = {},
|
socketOptions: SocketOptions.TCPClientSocketOptions.() -> Unit = {}
|
||||||
): KtorTcpPort = build(context, host, port, coroutineContext, socketOptions).apply { open() }
|
): KtorTcpPort {
|
||||||
|
return KtorTcpPort(context, host, port, coroutineContext, socketOptions)
|
||||||
|
}
|
||||||
|
|
||||||
override fun build(context: Context, meta: Meta): AsynchronousPort {
|
override fun build(context: Context, meta: Meta): Port {
|
||||||
val host = meta["host"].string ?: "localhost"
|
val host = meta["host"].string ?: "localhost"
|
||||||
val port = meta["port"].int ?: error("Port value for TCP port is not defined in $meta")
|
val port = meta["port"].int ?: error("Port value for TCP port is not defined in $meta")
|
||||||
return build(context, host, port)
|
return open(context, host, port)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -8,7 +8,6 @@ import io.ktor.utils.io.core.Closeable
|
|||||||
import io.ktor.utils.io.writeAvailable
|
import io.ktor.utils.io.writeAvailable
|
||||||
import kotlinx.coroutines.*
|
import kotlinx.coroutines.*
|
||||||
import space.kscience.dataforge.context.Context
|
import space.kscience.dataforge.context.Context
|
||||||
import space.kscience.dataforge.context.Factory
|
|
||||||
import space.kscience.dataforge.meta.Meta
|
import space.kscience.dataforge.meta.Meta
|
||||||
import space.kscience.dataforge.meta.int
|
import space.kscience.dataforge.meta.int
|
||||||
import space.kscience.dataforge.meta.number
|
import space.kscience.dataforge.meta.number
|
||||||
@ -17,18 +16,17 @@ import kotlin.coroutines.CoroutineContext
|
|||||||
|
|
||||||
public class KtorUdpPort internal constructor(
|
public class KtorUdpPort internal constructor(
|
||||||
context: Context,
|
context: Context,
|
||||||
meta: Meta,
|
|
||||||
public val remoteHost: String,
|
public val remoteHost: String,
|
||||||
public val remotePort: Int,
|
public val remotePort: Int,
|
||||||
public val localPort: Int? = null,
|
public val localPort: Int? = null,
|
||||||
public val localHost: String = "localhost",
|
public val localHost: String = "localhost",
|
||||||
coroutineContext: CoroutineContext = context.coroutineContext,
|
coroutineContext: CoroutineContext = context.coroutineContext,
|
||||||
socketOptions: SocketOptions.UDPSocketOptions.() -> Unit = {},
|
socketOptions: SocketOptions.UDPSocketOptions.() -> Unit = {}
|
||||||
) : AbstractAsynchronousPort(context, meta, coroutineContext), Closeable {
|
) : AbstractPort(context, coroutineContext), Closeable {
|
||||||
|
|
||||||
override fun toString(): String = "port[udp:$remoteHost:$remotePort]"
|
override fun toString(): String = "port[udp:$remoteHost:$remotePort]"
|
||||||
|
|
||||||
private val futureSocket = scope.async(Dispatchers.IO, start = CoroutineStart.LAZY) {
|
private val futureSocket = scope.async {
|
||||||
aSocket(ActorSelectorManager(Dispatchers.IO)).udp().connect(
|
aSocket(ActorSelectorManager(Dispatchers.IO)).udp().connect(
|
||||||
remoteAddress = InetSocketAddress(remoteHost, remotePort),
|
remoteAddress = InetSocketAddress(remoteHost, remotePort),
|
||||||
localAddress = localPort?.let { InetSocketAddress(localHost, localPort) },
|
localAddress = localPort?.let { InetSocketAddress(localHost, localPort) },
|
||||||
@ -36,21 +34,17 @@ public class KtorUdpPort internal constructor(
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
private val writeChannel: Deferred<ByteWriteChannel> = scope.async(Dispatchers.IO, start = CoroutineStart.LAZY) {
|
private val writeChannel: Deferred<ByteWriteChannel> = scope.async {
|
||||||
futureSocket.await().openWriteChannel(true)
|
futureSocket.await().openWriteChannel(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
private var listenerJob: Job? = null
|
private val listenerJob = scope.launch {
|
||||||
|
|
||||||
override fun onOpen() {
|
|
||||||
listenerJob = scope.launch {
|
|
||||||
val input = futureSocket.await().openReadChannel()
|
val input = futureSocket.await().openReadChannel()
|
||||||
input.consumeEachBufferRange { buffer, last ->
|
input.consumeEachBufferRange { buffer, _ ->
|
||||||
val array = ByteArray(buffer.remaining())
|
val array = ByteArray(buffer.remaining())
|
||||||
buffer.get(array)
|
buffer.get(array)
|
||||||
receive(array)
|
receive(array)
|
||||||
!last && isActive
|
isActive
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -58,49 +52,16 @@ public class KtorUdpPort internal constructor(
|
|||||||
writeChannel.await().writeAvailable(data)
|
writeChannel.await().writeAvailable(data)
|
||||||
}
|
}
|
||||||
|
|
||||||
override val isOpen: Boolean
|
|
||||||
get() = listenerJob?.isActive == true
|
|
||||||
|
|
||||||
override fun close() {
|
override fun close() {
|
||||||
listenerJob?.cancel()
|
listenerJob.cancel()
|
||||||
futureSocket.cancel()
|
futureSocket.cancel()
|
||||||
super.close()
|
super.close()
|
||||||
}
|
}
|
||||||
|
|
||||||
public companion object : Factory<AsynchronousPort> {
|
public companion object : PortFactory {
|
||||||
|
|
||||||
public fun build(
|
override val type: String = "udp"
|
||||||
context: Context,
|
|
||||||
remoteHost: String,
|
|
||||||
remotePort: Int,
|
|
||||||
localPort: Int? = null,
|
|
||||||
localHost: String? = null,
|
|
||||||
coroutineContext: CoroutineContext = context.coroutineContext,
|
|
||||||
socketOptions: SocketOptions.UDPSocketOptions.() -> Unit = {},
|
|
||||||
): KtorUdpPort {
|
|
||||||
val meta = Meta {
|
|
||||||
"name" put "udp://$remoteHost:$remotePort"
|
|
||||||
"type" put "udp"
|
|
||||||
"remoteHost" put remoteHost
|
|
||||||
"remotePort" put remotePort
|
|
||||||
localHost?.let { "localHost" put it }
|
|
||||||
localPort?.let { "localPort" put it }
|
|
||||||
}
|
|
||||||
return KtorUdpPort(
|
|
||||||
context = context,
|
|
||||||
meta = meta,
|
|
||||||
remoteHost = remoteHost,
|
|
||||||
remotePort = remotePort,
|
|
||||||
localPort = localPort,
|
|
||||||
localHost = localHost ?: "localhost",
|
|
||||||
coroutineContext = coroutineContext,
|
|
||||||
socketOptions = socketOptions
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Create and open UDP port
|
|
||||||
*/
|
|
||||||
public fun open(
|
public fun open(
|
||||||
context: Context,
|
context: Context,
|
||||||
remoteHost: String,
|
remoteHost: String,
|
||||||
@ -108,23 +69,23 @@ public class KtorUdpPort internal constructor(
|
|||||||
localPort: Int? = null,
|
localPort: Int? = null,
|
||||||
localHost: String = "localhost",
|
localHost: String = "localhost",
|
||||||
coroutineContext: CoroutineContext = context.coroutineContext,
|
coroutineContext: CoroutineContext = context.coroutineContext,
|
||||||
socketOptions: SocketOptions.UDPSocketOptions.() -> Unit = {},
|
socketOptions: SocketOptions.UDPSocketOptions.() -> Unit = {}
|
||||||
): KtorUdpPort = build(
|
): KtorUdpPort = KtorUdpPort(
|
||||||
context,
|
context = context,
|
||||||
remoteHost,
|
remoteHost = remoteHost,
|
||||||
remotePort,
|
remotePort = remotePort,
|
||||||
localPort,
|
localPort = localPort,
|
||||||
localHost,
|
localHost = localHost,
|
||||||
coroutineContext,
|
coroutineContext = coroutineContext,
|
||||||
socketOptions
|
socketOptions = socketOptions
|
||||||
).apply { open() }
|
)
|
||||||
|
|
||||||
override fun build(context: Context, meta: Meta): AsynchronousPort {
|
override fun build(context: Context, meta: Meta): Port {
|
||||||
val remoteHost by meta.string { error("Remote host is not specified") }
|
val remoteHost by meta.string { error("Remote host is not specified") }
|
||||||
val remotePort by meta.number { error("Remote port is not specified") }
|
val remotePort by meta.number { error("Remote port is not specified") }
|
||||||
val localHost: String? by meta.string()
|
val localHost: String? by meta.string()
|
||||||
val localPort: Int? by meta.int()
|
val localPort: Int? by meta.int()
|
||||||
return build(context, remoteHost, remotePort.toInt(), localPort, localHost ?: "localhost")
|
return open(context, remoteHost, remotePort.toInt(), localPort, localHost ?: "localhost")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -1,134 +0,0 @@
|
|||||||
package space.kscience.controls.serial
|
|
||||||
|
|
||||||
import com.fazecast.jSerialComm.SerialPort
|
|
||||||
import com.fazecast.jSerialComm.SerialPortDataListener
|
|
||||||
import com.fazecast.jSerialComm.SerialPortEvent
|
|
||||||
import kotlinx.coroutines.Dispatchers
|
|
||||||
import kotlinx.coroutines.launch
|
|
||||||
import space.kscience.controls.ports.AbstractAsynchronousPort
|
|
||||||
import space.kscience.controls.ports.AsynchronousPort
|
|
||||||
import space.kscience.dataforge.context.Context
|
|
||||||
import space.kscience.dataforge.context.Factory
|
|
||||||
import space.kscience.dataforge.meta.Meta
|
|
||||||
import space.kscience.dataforge.meta.int
|
|
||||||
import space.kscience.dataforge.meta.string
|
|
||||||
import kotlin.coroutines.CoroutineContext
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A port based on JSerialComm
|
|
||||||
*/
|
|
||||||
public class AsynchronousSerialPort(
|
|
||||||
context: Context,
|
|
||||||
meta: Meta,
|
|
||||||
private val comPort: SerialPort,
|
|
||||||
coroutineContext: CoroutineContext = context.coroutineContext,
|
|
||||||
) : AbstractAsynchronousPort(context, meta, coroutineContext) {
|
|
||||||
|
|
||||||
override fun toString(): String = "port[${comPort.descriptivePortName}]"
|
|
||||||
|
|
||||||
private val serialPortListener = object : SerialPortDataListener {
|
|
||||||
override fun getListeningEvents(): Int =
|
|
||||||
SerialPort.LISTENING_EVENT_DATA_RECEIVED and SerialPort.LISTENING_EVENT_DATA_AVAILABLE
|
|
||||||
|
|
||||||
override fun serialEvent(event: SerialPortEvent) {
|
|
||||||
when (event.eventType) {
|
|
||||||
SerialPort.LISTENING_EVENT_DATA_RECEIVED -> {
|
|
||||||
scope.launch { receive(event.receivedData) }
|
|
||||||
}
|
|
||||||
|
|
||||||
SerialPort.LISTENING_EVENT_DATA_AVAILABLE -> {
|
|
||||||
scope.launch(Dispatchers.IO) {
|
|
||||||
val available = comPort.bytesAvailable()
|
|
||||||
if (available > 0) {
|
|
||||||
val buffer = ByteArray(available)
|
|
||||||
comPort.readBytes(buffer, available)
|
|
||||||
receive(buffer)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
override fun onOpen() {
|
|
||||||
comPort.openPort()
|
|
||||||
comPort.addDataListener(serialPortListener)
|
|
||||||
}
|
|
||||||
|
|
||||||
override val isOpen: Boolean get() = comPort.isOpen
|
|
||||||
|
|
||||||
override suspend fun write(data: ByteArray) {
|
|
||||||
comPort.writeBytes(data, data.size)
|
|
||||||
}
|
|
||||||
|
|
||||||
override fun close() {
|
|
||||||
comPort.removeDataListener()
|
|
||||||
if (comPort.isOpen) {
|
|
||||||
comPort.closePort()
|
|
||||||
}
|
|
||||||
super.close()
|
|
||||||
}
|
|
||||||
|
|
||||||
public companion object : Factory<AsynchronousPort> {
|
|
||||||
|
|
||||||
public fun build(
|
|
||||||
context: Context,
|
|
||||||
portName: String,
|
|
||||||
baudRate: Int = 9600,
|
|
||||||
dataBits: Int = 8,
|
|
||||||
stopBits: Int = SerialPort.ONE_STOP_BIT,
|
|
||||||
parity: Int = SerialPort.NO_PARITY,
|
|
||||||
coroutineContext: CoroutineContext = context.coroutineContext,
|
|
||||||
additionalConfig: SerialPort.() -> Unit = {},
|
|
||||||
): AsynchronousSerialPort {
|
|
||||||
val serialPort = SerialPort.getCommPort(portName).apply {
|
|
||||||
setComPortParameters(baudRate, dataBits, stopBits, parity)
|
|
||||||
additionalConfig()
|
|
||||||
}
|
|
||||||
val meta = Meta {
|
|
||||||
"name" put "com://$portName"
|
|
||||||
"type" put "serial"
|
|
||||||
"baudRate" put serialPort.baudRate
|
|
||||||
"dataBits" put serialPort.numDataBits
|
|
||||||
"stopBits" put serialPort.numStopBits
|
|
||||||
"parity" put serialPort.parity
|
|
||||||
}
|
|
||||||
return AsynchronousSerialPort(context, meta, serialPort, coroutineContext)
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Construct ComPort with given parameters
|
|
||||||
*/
|
|
||||||
public fun open(
|
|
||||||
context: Context,
|
|
||||||
portName: String,
|
|
||||||
baudRate: Int = 9600,
|
|
||||||
dataBits: Int = 8,
|
|
||||||
stopBits: Int = SerialPort.ONE_STOP_BIT,
|
|
||||||
parity: Int = SerialPort.NO_PARITY,
|
|
||||||
coroutineContext: CoroutineContext = context.coroutineContext,
|
|
||||||
additionalConfig: SerialPort.() -> Unit = {},
|
|
||||||
): AsynchronousSerialPort = build(
|
|
||||||
context = context,
|
|
||||||
portName = portName,
|
|
||||||
baudRate = baudRate,
|
|
||||||
dataBits = dataBits,
|
|
||||||
stopBits = stopBits,
|
|
||||||
parity = parity,
|
|
||||||
coroutineContext = coroutineContext,
|
|
||||||
additionalConfig = additionalConfig
|
|
||||||
).apply { open() }
|
|
||||||
|
|
||||||
|
|
||||||
override fun build(context: Context, meta: Meta): AsynchronousPort {
|
|
||||||
val name by meta.string { error("Serial port name not defined") }
|
|
||||||
val baudRate by meta.int(9600)
|
|
||||||
val dataBits by meta.int(8)
|
|
||||||
val stopBits by meta.int(SerialPort.ONE_STOP_BIT)
|
|
||||||
val parity by meta.int(SerialPort.NO_PARITY)
|
|
||||||
return build(context, name, baudRate, dataBits, stopBits, parity)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
@ -0,0 +1,87 @@
|
|||||||
|
package space.kscience.controls.serial
|
||||||
|
|
||||||
|
import com.fazecast.jSerialComm.SerialPort
|
||||||
|
import com.fazecast.jSerialComm.SerialPortDataListener
|
||||||
|
import com.fazecast.jSerialComm.SerialPortEvent
|
||||||
|
import kotlinx.coroutines.launch
|
||||||
|
import space.kscience.controls.ports.AbstractPort
|
||||||
|
import space.kscience.controls.ports.Port
|
||||||
|
import space.kscience.controls.ports.PortFactory
|
||||||
|
import space.kscience.dataforge.context.Context
|
||||||
|
import space.kscience.dataforge.meta.Meta
|
||||||
|
import space.kscience.dataforge.meta.int
|
||||||
|
import space.kscience.dataforge.meta.string
|
||||||
|
import kotlin.coroutines.CoroutineContext
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A port based on JSerialComm
|
||||||
|
*/
|
||||||
|
public class JSerialCommPort(
|
||||||
|
context: Context,
|
||||||
|
private val comPort: SerialPort,
|
||||||
|
coroutineContext: CoroutineContext = context.coroutineContext,
|
||||||
|
) : AbstractPort(context, coroutineContext) {
|
||||||
|
|
||||||
|
override fun toString(): String = "port[${comPort.descriptivePortName}]"
|
||||||
|
|
||||||
|
private val serialPortListener = object : SerialPortDataListener {
|
||||||
|
override fun getListeningEvents(): Int = SerialPort.LISTENING_EVENT_DATA_AVAILABLE
|
||||||
|
|
||||||
|
override fun serialEvent(event: SerialPortEvent) {
|
||||||
|
if (event.eventType == SerialPort.LISTENING_EVENT_DATA_AVAILABLE && event.receivedData != null) {
|
||||||
|
scope.launch { receive(event.receivedData) }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
init {
|
||||||
|
comPort.addDataListener(serialPortListener)
|
||||||
|
}
|
||||||
|
|
||||||
|
override suspend fun write(data: ByteArray) {
|
||||||
|
comPort.writeBytes(data, data.size)
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun close() {
|
||||||
|
comPort.removeDataListener()
|
||||||
|
if (comPort.isOpen) {
|
||||||
|
comPort.closePort()
|
||||||
|
}
|
||||||
|
super.close()
|
||||||
|
}
|
||||||
|
|
||||||
|
public companion object : PortFactory {
|
||||||
|
|
||||||
|
override val type: String = "com"
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Construct ComPort with given parameters
|
||||||
|
*/
|
||||||
|
public fun open(
|
||||||
|
context: Context,
|
||||||
|
portName: String,
|
||||||
|
baudRate: Int = 9600,
|
||||||
|
dataBits: Int = 8,
|
||||||
|
stopBits: Int = SerialPort.ONE_STOP_BIT,
|
||||||
|
parity: Int = SerialPort.NO_PARITY,
|
||||||
|
coroutineContext: CoroutineContext = context.coroutineContext,
|
||||||
|
): JSerialCommPort {
|
||||||
|
val serialPort = SerialPort.getCommPort(portName).apply {
|
||||||
|
setComPortParameters(baudRate, dataBits, stopBits, parity)
|
||||||
|
openPort()
|
||||||
|
}
|
||||||
|
return JSerialCommPort(context, serialPort, coroutineContext)
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun build(context: Context, meta: Meta): Port {
|
||||||
|
val name by meta.string { error("Serial port name not defined") }
|
||||||
|
val baudRate by meta.int(9600)
|
||||||
|
val dataBits by meta.int(8)
|
||||||
|
val stopBits by meta.int(SerialPort.ONE_STOP_BIT)
|
||||||
|
val parity by meta.int(SerialPort.NO_PARITY)
|
||||||
|
return open(context, name, baudRate, dataBits, stopBits, parity)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -1,27 +1,19 @@
|
|||||||
package space.kscience.controls.serial
|
package space.kscience.controls.serial
|
||||||
|
|
||||||
import space.kscience.controls.ports.Ports
|
import space.kscience.controls.ports.PortFactory
|
||||||
import space.kscience.dataforge.context.AbstractPlugin
|
import space.kscience.dataforge.context.AbstractPlugin
|
||||||
import space.kscience.dataforge.context.Context
|
import space.kscience.dataforge.context.Context
|
||||||
import space.kscience.dataforge.context.PluginFactory
|
import space.kscience.dataforge.context.PluginFactory
|
||||||
import space.kscience.dataforge.context.PluginTag
|
import space.kscience.dataforge.context.PluginTag
|
||||||
import space.kscience.dataforge.meta.Meta
|
import space.kscience.dataforge.meta.Meta
|
||||||
import space.kscience.dataforge.names.Name
|
import space.kscience.dataforge.names.Name
|
||||||
import space.kscience.dataforge.names.asName
|
|
||||||
|
|
||||||
public class SerialPortPlugin : AbstractPlugin() {
|
public class SerialPortPlugin : AbstractPlugin() {
|
||||||
|
|
||||||
override val tag: PluginTag get() = Companion.tag
|
override val tag: PluginTag get() = Companion.tag
|
||||||
|
|
||||||
override fun content(target: String): Map<Name, Any> = when (target) {
|
override fun content(target: String): Map<Name, Any> = when(target){
|
||||||
Ports.ASYNCHRONOUS_PORT_TYPE -> mapOf(
|
PortFactory.TYPE -> mapOf(Name.EMPTY to JSerialCommPort)
|
||||||
"serial".asName() to AsynchronousSerialPort,
|
|
||||||
)
|
|
||||||
|
|
||||||
Ports.SYNCHRONOUS_PORT_TYPE -> mapOf(
|
|
||||||
"serial".asName() to SynchronousSerialPort,
|
|
||||||
)
|
|
||||||
|
|
||||||
else -> emptyMap()
|
else -> emptyMap()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,140 +0,0 @@
|
|||||||
package space.kscience.controls.serial
|
|
||||||
|
|
||||||
import com.fazecast.jSerialComm.SerialPort
|
|
||||||
import kotlinx.coroutines.delay
|
|
||||||
import kotlinx.coroutines.flow.Flow
|
|
||||||
import kotlinx.coroutines.flow.flow
|
|
||||||
import kotlinx.coroutines.runInterruptible
|
|
||||||
import kotlinx.coroutines.sync.Mutex
|
|
||||||
import kotlinx.coroutines.sync.withLock
|
|
||||||
import space.kscience.controls.ports.SynchronousPort
|
|
||||||
import space.kscience.dataforge.context.Context
|
|
||||||
import space.kscience.dataforge.context.Factory
|
|
||||||
import space.kscience.dataforge.context.error
|
|
||||||
import space.kscience.dataforge.context.logger
|
|
||||||
import space.kscience.dataforge.meta.Meta
|
|
||||||
import space.kscience.dataforge.meta.int
|
|
||||||
import space.kscience.dataforge.meta.string
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A port based on JSerialComm
|
|
||||||
*/
|
|
||||||
public class SynchronousSerialPort(
|
|
||||||
override val context: Context,
|
|
||||||
public val meta: Meta,
|
|
||||||
private val comPort: SerialPort,
|
|
||||||
) : SynchronousPort {
|
|
||||||
|
|
||||||
override fun toString(): String = "port[${comPort.descriptivePortName}]"
|
|
||||||
|
|
||||||
|
|
||||||
override fun open() {
|
|
||||||
if (!isOpen) {
|
|
||||||
comPort.openPort()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
override val isOpen: Boolean get() = comPort.isOpen
|
|
||||||
|
|
||||||
|
|
||||||
override fun close() {
|
|
||||||
if (comPort.isOpen) {
|
|
||||||
comPort.closePort()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private val mutex = Mutex()
|
|
||||||
|
|
||||||
override suspend fun <R> respond(
|
|
||||||
request: ByteArray,
|
|
||||||
transform: suspend Flow<ByteArray>.() -> R,
|
|
||||||
): R = mutex.withLock {
|
|
||||||
comPort.flushIOBuffers()
|
|
||||||
comPort.writeBytes(request, request.size)
|
|
||||||
flow<ByteArray> {
|
|
||||||
while (isOpen) {
|
|
||||||
try {
|
|
||||||
val available = comPort.bytesAvailable()
|
|
||||||
if (available > 0) {
|
|
||||||
val buffer = ByteArray(available)
|
|
||||||
comPort.readBytes(buffer, available)
|
|
||||||
emit(buffer)
|
|
||||||
} else if (available < 0) break
|
|
||||||
} catch (ex: Exception) {
|
|
||||||
logger.error(ex) { "Channel read error" }
|
|
||||||
delay(1000)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}.transform()
|
|
||||||
}
|
|
||||||
|
|
||||||
override suspend fun respondFixedMessageSize(request: ByteArray, responseSize: Int): ByteArray = mutex.withLock {
|
|
||||||
runInterruptible {
|
|
||||||
comPort.flushIOBuffers()
|
|
||||||
comPort.writeBytes(request, request.size)
|
|
||||||
val buffer = ByteArray(responseSize)
|
|
||||||
comPort.readBytes(buffer, responseSize)
|
|
||||||
buffer
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public companion object : Factory<SynchronousPort> {
|
|
||||||
|
|
||||||
public fun build(
|
|
||||||
context: Context,
|
|
||||||
portName: String,
|
|
||||||
baudRate: Int = 9600,
|
|
||||||
dataBits: Int = 8,
|
|
||||||
stopBits: Int = SerialPort.ONE_STOP_BIT,
|
|
||||||
parity: Int = SerialPort.NO_PARITY,
|
|
||||||
additionalConfig: SerialPort.() -> Unit = {},
|
|
||||||
): SynchronousSerialPort {
|
|
||||||
val serialPort = SerialPort.getCommPort(portName).apply {
|
|
||||||
setComPortParameters(baudRate, dataBits, stopBits, parity)
|
|
||||||
additionalConfig()
|
|
||||||
}
|
|
||||||
val meta = Meta {
|
|
||||||
"name" put "com://$portName"
|
|
||||||
"type" put "serial"
|
|
||||||
"baudRate" put serialPort.baudRate
|
|
||||||
"dataBits" put serialPort.numDataBits
|
|
||||||
"stopBits" put serialPort.numStopBits
|
|
||||||
"parity" put serialPort.parity
|
|
||||||
}
|
|
||||||
return SynchronousSerialPort(context, meta, serialPort)
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Construct ComPort with given parameters
|
|
||||||
*/
|
|
||||||
public fun open(
|
|
||||||
context: Context,
|
|
||||||
portName: String,
|
|
||||||
baudRate: Int = 9600,
|
|
||||||
dataBits: Int = 8,
|
|
||||||
stopBits: Int = SerialPort.ONE_STOP_BIT,
|
|
||||||
parity: Int = SerialPort.NO_PARITY,
|
|
||||||
additionalConfig: SerialPort.() -> Unit = {},
|
|
||||||
): SynchronousSerialPort = build(
|
|
||||||
context = context,
|
|
||||||
portName = portName,
|
|
||||||
baudRate = baudRate,
|
|
||||||
dataBits = dataBits,
|
|
||||||
stopBits = stopBits,
|
|
||||||
parity = parity,
|
|
||||||
additionalConfig = additionalConfig
|
|
||||||
).apply { open() }
|
|
||||||
|
|
||||||
|
|
||||||
override fun build(context: Context, meta: Meta): SynchronousPort {
|
|
||||||
val name by meta.string { error("Serial port name not defined") }
|
|
||||||
val baudRate by meta.int(9600)
|
|
||||||
val dataBits by meta.int(8)
|
|
||||||
val stopBits by meta.int(SerialPort.ONE_STOP_BIT)
|
|
||||||
val parity by meta.int(SerialPort.NO_PARITY)
|
|
||||||
return build(context, name, baudRate, dataBits, stopBits, parity)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
@ -4,6 +4,7 @@ import kotlinx.coroutines.withTimeoutOrNull
|
|||||||
import space.kscience.controls.ports.Ports
|
import space.kscience.controls.ports.Ports
|
||||||
import space.kscience.controls.ports.SynchronousPort
|
import space.kscience.controls.ports.SynchronousPort
|
||||||
import space.kscience.controls.ports.respondStringWithDelimiter
|
import space.kscience.controls.ports.respondStringWithDelimiter
|
||||||
|
import space.kscience.controls.ports.synchronous
|
||||||
import space.kscience.controls.spec.*
|
import space.kscience.controls.spec.*
|
||||||
import space.kscience.dataforge.context.Context
|
import space.kscience.dataforge.context.Context
|
||||||
import space.kscience.dataforge.context.Factory
|
import space.kscience.dataforge.context.Factory
|
||||||
@ -21,7 +22,7 @@ class MksPdr900Device(context: Context, meta: Meta) : DeviceBySpec<MksPdr900Devi
|
|||||||
|
|
||||||
private val portDelegate = lazy {
|
private val portDelegate = lazy {
|
||||||
val ports = context.request(Ports)
|
val ports = context.request(Ports)
|
||||||
ports.buildSynchronousPort(meta["port"] ?: error("Port is not defined in device configuration"))
|
ports.buildPort(meta["port"] ?: error("Port is not defined in device configuration")).synchronous()
|
||||||
}
|
}
|
||||||
|
|
||||||
private val port: SynchronousPort by portDelegate
|
private val port: SynchronousPort by portDelegate
|
||||||
|
@ -25,10 +25,10 @@ import kotlin.time.Duration.Companion.milliseconds
|
|||||||
|
|
||||||
class PiMotionMasterDevice(
|
class PiMotionMasterDevice(
|
||||||
context: Context,
|
context: Context,
|
||||||
private val portFactory: Factory<AsynchronousPort> = KtorTcpPort,
|
private val portFactory: PortFactory = KtorTcpPort,
|
||||||
) : DeviceBySpec<PiMotionMasterDevice>(PiMotionMasterDevice, context), DeviceHub {
|
) : DeviceBySpec<PiMotionMasterDevice>(PiMotionMasterDevice, context), DeviceHub {
|
||||||
|
|
||||||
private var port: AsynchronousPort? = null
|
private var port: Port? = null
|
||||||
//TODO make proxy work
|
//TODO make proxy work
|
||||||
//PortProxy { portFactory(address ?: error("The device is not connected"), context) }
|
//PortProxy { portFactory(address ?: error("The device is not connected"), context) }
|
||||||
|
|
||||||
@ -83,7 +83,7 @@ class PiMotionMasterDevice(
|
|||||||
suspend fun getErrorCode(): Int = mutex.withLock {
|
suspend fun getErrorCode(): Int = mutex.withLock {
|
||||||
withTimeout(timeoutValue) {
|
withTimeout(timeoutValue) {
|
||||||
sendCommandInternal("ERR?")
|
sendCommandInternal("ERR?")
|
||||||
val errorString = port?.subscribe()?.withStringDelimiter("\n")?.first() ?: error("Not connected to device")
|
val errorString = port?.receiving()?.withStringDelimiter("\n")?.first() ?: error("Not connected to device")
|
||||||
errorString.trim().toInt()
|
errorString.trim().toInt()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -96,7 +96,7 @@ class PiMotionMasterDevice(
|
|||||||
try {
|
try {
|
||||||
withTimeout(timeoutValue) {
|
withTimeout(timeoutValue) {
|
||||||
sendCommandInternal(command, *arguments)
|
sendCommandInternal(command, *arguments)
|
||||||
val phrases = port?.subscribe()?.withStringDelimiter("\n") ?: error("Not connected to device")
|
val phrases = port?.receiving()?.withStringDelimiter("\n") ?: error("Not connected to device")
|
||||||
phrases.transformWhile { line ->
|
phrases.transformWhile { line ->
|
||||||
emit(line)
|
emit(line)
|
||||||
line.endsWith(" \n")
|
line.endsWith(" \n")
|
||||||
|
@ -5,15 +5,14 @@ import kotlinx.coroutines.channels.Channel
|
|||||||
import kotlinx.coroutines.flow.*
|
import kotlinx.coroutines.flow.*
|
||||||
import kotlinx.coroutines.sync.Mutex
|
import kotlinx.coroutines.sync.Mutex
|
||||||
import kotlinx.coroutines.sync.withLock
|
import kotlinx.coroutines.sync.withLock
|
||||||
import space.kscience.controls.api.AsynchronousSocket
|
import space.kscience.controls.api.Socket
|
||||||
import space.kscience.controls.ports.AbstractAsynchronousPort
|
import space.kscience.controls.ports.AbstractPort
|
||||||
import space.kscience.controls.ports.withDelimiter
|
import space.kscience.controls.ports.withDelimiter
|
||||||
import space.kscience.dataforge.context.*
|
import space.kscience.dataforge.context.*
|
||||||
import space.kscience.dataforge.meta.Meta
|
|
||||||
import kotlin.math.abs
|
import kotlin.math.abs
|
||||||
import kotlin.time.Duration
|
import kotlin.time.Duration
|
||||||
|
|
||||||
abstract class VirtualDevice(val scope: CoroutineScope) : AsynchronousSocket<ByteArray> {
|
abstract class VirtualDevice(val scope: CoroutineScope) : Socket<ByteArray> {
|
||||||
|
|
||||||
protected abstract suspend fun evaluateRequest(request: ByteArray)
|
protected abstract suspend fun evaluateRequest(request: ByteArray)
|
||||||
|
|
||||||
@ -41,42 +40,33 @@ abstract class VirtualDevice(val scope: CoroutineScope) : AsynchronousSocket<Byt
|
|||||||
toRespond.send(response)
|
toRespond.send(response)
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun subscribe(): Flow<ByteArray> = toRespond.receiveAsFlow()
|
override fun receiving(): Flow<ByteArray> = toRespond.receiveAsFlow()
|
||||||
|
|
||||||
protected fun respondInFuture(delay: Duration, response: suspend () -> ByteArray): Job = scope.launch {
|
protected fun respondInFuture(delay: Duration, response: suspend () -> ByteArray): Job = scope.launch {
|
||||||
delay(delay)
|
delay(delay)
|
||||||
respond(response())
|
respond(response())
|
||||||
}
|
}
|
||||||
|
|
||||||
override val isOpen: Boolean
|
override fun isOpen(): Boolean = scope.isActive
|
||||||
get() = scope.isActive
|
|
||||||
|
|
||||||
override fun close() = scope.cancel()
|
override fun close() = scope.cancel()
|
||||||
}
|
}
|
||||||
|
|
||||||
class VirtualPort(private val device: VirtualDevice, context: Context) : AbstractAsynchronousPort(context, Meta.EMPTY) {
|
class VirtualPort(private val device: VirtualDevice, context: Context) : AbstractPort(context) {
|
||||||
|
|
||||||
private var respondJob: Job? = null
|
private val respondJob = device.receiving().onEach {
|
||||||
|
|
||||||
override fun onOpen() {
|
|
||||||
respondJob = device.subscribe().onEach {
|
|
||||||
receive(it)
|
receive(it)
|
||||||
}.catch {
|
}.catch {
|
||||||
it.printStackTrace()
|
it.printStackTrace()
|
||||||
}.launchIn(scope)
|
}.launchIn(scope)
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
override suspend fun write(data: ByteArray) {
|
override suspend fun write(data: ByteArray) {
|
||||||
device.send(data)
|
device.send(data)
|
||||||
}
|
}
|
||||||
|
|
||||||
override val isOpen: Boolean
|
|
||||||
get() = respondJob?.isActive == true
|
|
||||||
|
|
||||||
override fun close() {
|
override fun close() {
|
||||||
respondJob?.cancel()
|
respondJob.cancel()
|
||||||
super.close()
|
super.close()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -88,7 +78,7 @@ class PiMotionMasterVirtualDevice(
|
|||||||
scope: CoroutineScope = context,
|
scope: CoroutineScope = context,
|
||||||
) : VirtualDevice(scope), ContextAware {
|
) : VirtualDevice(scope), ContextAware {
|
||||||
|
|
||||||
override fun open() {
|
init {
|
||||||
//add asynchronous send logic here
|
//add asynchronous send logic here
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -112,11 +102,9 @@ class PiMotionMasterVirtualDevice(
|
|||||||
abs(distance) < proposedStep -> {
|
abs(distance) < proposedStep -> {
|
||||||
position = targetPosition
|
position = targetPosition
|
||||||
}
|
}
|
||||||
|
|
||||||
targetPosition > position -> {
|
targetPosition > position -> {
|
||||||
position += proposedStep
|
position += proposedStep
|
||||||
}
|
}
|
||||||
|
|
||||||
else -> {
|
else -> {
|
||||||
position -= proposedStep
|
position -= proposedStep
|
||||||
}
|
}
|
||||||
@ -192,10 +180,8 @@ class PiMotionMasterVirtualDevice(
|
|||||||
when (command) {
|
when (command) {
|
||||||
"XXX" -> {
|
"XXX" -> {
|
||||||
}
|
}
|
||||||
|
|
||||||
"IDN?", "*IDN?" -> respond("(c)2015 Physik Instrumente(PI) Karlsruhe, C-885.M1 TCP-IP Master,0,1.0.0.1")
|
"IDN?", "*IDN?" -> respond("(c)2015 Physik Instrumente(PI) Karlsruhe, C-885.M1 TCP-IP Master,0,1.0.0.1")
|
||||||
"VER?" -> respond(
|
"VER?" -> respond("""
|
||||||
"""
|
|
||||||
2: (c)2017 Physik Instrumente (PI) GmbH & Co. KG, C-663.12C885, 018550039, 00.039
|
2: (c)2017 Physik Instrumente (PI) GmbH & Co. KG, C-663.12C885, 018550039, 00.039
|
||||||
3: (c)2017 Physik Instrumente (PI) GmbH & Co. KG, C-663.12C885, 018550040, 00.039
|
3: (c)2017 Physik Instrumente (PI) GmbH & Co. KG, C-663.12C885, 018550040, 00.039
|
||||||
4: (c)2017 Physik Instrumente (PI) GmbH & Co. KG, C-663.12C885, 018550041, 00.039
|
4: (c)2017 Physik Instrumente (PI) GmbH & Co. KG, C-663.12C885, 018550041, 00.039
|
||||||
@ -209,11 +195,8 @@ class PiMotionMasterVirtualDevice(
|
|||||||
12: (c)2017 Physik Instrumente (PI) GmbH & Co. KG, C-663.12C885, 018550049, 00.039
|
12: (c)2017 Physik Instrumente (PI) GmbH & Co. KG, C-663.12C885, 018550049, 00.039
|
||||||
13: (c)2017 Physik Instrumente (PI) GmbH & Co. KG, C-663.12C885, 018550051, 00.039
|
13: (c)2017 Physik Instrumente (PI) GmbH & Co. KG, C-663.12C885, 018550051, 00.039
|
||||||
FW_ARM: V1.0.0.1
|
FW_ARM: V1.0.0.1
|
||||||
""".trimIndent()
|
""".trimIndent())
|
||||||
)
|
"HLP?" -> respond("""
|
||||||
|
|
||||||
"HLP?" -> respond(
|
|
||||||
"""
|
|
||||||
The following commands are valid:
|
The following commands are valid:
|
||||||
#4 Request Status Register
|
#4 Request Status Register
|
||||||
#5 Request Motion Status
|
#5 Request Motion Status
|
||||||
@ -252,14 +235,11 @@ class PiMotionMasterVirtualDevice(
|
|||||||
VEL? [{<AxisID>}] Get Closed-Loop Velocity
|
VEL? [{<AxisID>}] Get Closed-Loop Velocity
|
||||||
VER? Get Versions Of Firmware And Drivers
|
VER? Get Versions Of Firmware And Drivers
|
||||||
end of help
|
end of help
|
||||||
""".trimIndent()
|
""".trimIndent())
|
||||||
)
|
|
||||||
|
|
||||||
"ERR?" -> {
|
"ERR?" -> {
|
||||||
respond(errorCode.toString())
|
respond(errorCode.toString())
|
||||||
errorCode = 0
|
errorCode = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
"SAI?" -> respond(axisState.keys.joinToString(separator = " \n"))
|
"SAI?" -> respond(axisState.keys.joinToString(separator = " \n"))
|
||||||
"CST?" -> respondForAllAxis(axisIds) { "L-220.20SG" }
|
"CST?" -> respondForAllAxis(axisIds) { "L-220.20SG" }
|
||||||
"RON?" -> respondForAllAxis(axisIds) { referenceMode }
|
"RON?" -> respondForAllAxis(axisIds) { referenceMode }
|
||||||
@ -275,19 +255,15 @@ class PiMotionMasterVirtualDevice(
|
|||||||
"SVO" -> doForEachAxis(parts) { key, value ->
|
"SVO" -> doForEachAxis(parts) { key, value ->
|
||||||
axisState[key]?.servoMode = value.toInt()
|
axisState[key]?.servoMode = value.toInt()
|
||||||
}
|
}
|
||||||
|
|
||||||
"MOV" -> doForEachAxis(parts) { key, value ->
|
"MOV" -> doForEachAxis(parts) { key, value ->
|
||||||
axisState[key]?.targetPosition = value.toDouble()
|
axisState[key]?.targetPosition = value.toDouble()
|
||||||
}
|
}
|
||||||
|
|
||||||
"VEL" -> doForEachAxis(parts) { key, value ->
|
"VEL" -> doForEachAxis(parts) { key, value ->
|
||||||
axisState[key]?.velocity = value.toDouble()
|
axisState[key]?.velocity = value.toDouble()
|
||||||
}
|
}
|
||||||
|
|
||||||
"INI" -> {
|
"INI" -> {
|
||||||
logger.info { "Axes initialized!" }
|
logger.info { "Axes initialized!" }
|
||||||
}
|
}
|
||||||
|
|
||||||
else -> {
|
else -> {
|
||||||
logger.warn { "Unknown command: $command in message ${String(request)}" }
|
logger.warn { "Unknown command: $command in message ${String(request)}" }
|
||||||
errorCode = 2
|
errorCode = 2
|
||||||
|
@ -29,7 +29,7 @@ fun Context.launchPiDebugServer(port: Int, axes: List<String>): Job = launch(exc
|
|||||||
val output = socket.openWriteChannel()
|
val output = socket.openWriteChannel()
|
||||||
|
|
||||||
val sendJob = launch {
|
val sendJob = launch {
|
||||||
virtualDevice.subscribe().collect {
|
virtualDevice.receiving().collect {
|
||||||
//println("Sending: ${it.decodeToString()}")
|
//println("Sending: ${it.decodeToString()}")
|
||||||
output.writeAvailable(it)
|
output.writeAvailable(it)
|
||||||
output.flush()
|
output.flush()
|
||||||
|
Loading…
Reference in New Issue
Block a user