diff --git a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/ports/Port.kt b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/ports/Port.kt index 7c7f99b..87ceb67 100644 --- a/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/ports/Port.kt +++ b/dataforge-device-core/src/commonMain/kotlin/hep/dataforge/control/ports/Port.kt @@ -43,7 +43,7 @@ abstract class Port(val scope: CoroutineScope) : Closeable { write(data) logger.debug { "SENT: ${data.decodeToString()}" } } catch (ex: Exception) { - if(ex is CancellationException) throw ex + if (ex is CancellationException) throw ex logger.error(ex) { "Error while writing data to the port" } } } @@ -53,7 +53,12 @@ abstract class Port(val scope: CoroutineScope) : Closeable { outgoing.send(data) } - fun flow(): Flow { + /** + * Raw flow of incoming data chunks. The chunks are not guaranteed to be complete phrases. + * In order to form phrases some condition should used on top of it. + * For example [delimitedInput] generates phrases with fixed delimiter. + */ + fun input(): Flow { return incoming.receiveAsFlow() } @@ -94,3 +99,6 @@ fun Flow.withDelimiter(delimiter: ByteArray, expectedMessageSize: Int } } } + +fun Port.delimitedInput(delimiter: ByteArray, expectedMessageSize: Int = 32) = + input().withDelimiter(delimiter, expectedMessageSize) diff --git a/dataforge-device-core/src/jvmMain/kotlin/hep/dataforge/control/ports/KtorTcpPort.kt b/dataforge-device-core/src/jvmMain/kotlin/hep/dataforge/control/ports/KtorTcpPort.kt index eca8d3f..5c610d5 100644 --- a/dataforge-device-core/src/jvmMain/kotlin/hep/dataforge/control/ports/KtorTcpPort.kt +++ b/dataforge-device-core/src/jvmMain/kotlin/hep/dataforge/control/ports/KtorTcpPort.kt @@ -10,6 +10,7 @@ import kotlinx.coroutines.* import mu.KLogger import mu.KotlinLogging import java.net.InetSocketAddress +import kotlin.coroutines.coroutineContext class KtorTcpPort internal constructor( scope: CoroutineScope, @@ -19,16 +20,16 @@ class KtorTcpPort internal constructor( override val logger: KLogger = KotlinLogging.logger("port[tcp:$host:$port]") - private val socket = scope.async { + private val futureSocket = scope.async { aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().connect(InetSocketAddress(host, port)) } private val writeChannel = scope.async { - socket.await().openWriteChannel(true) + futureSocket.await().openWriteChannel(true) } private val listenerJob = scope.launch { - val input = socket.await().openReadChannel() + val input = futureSocket.await().openReadChannel() input.consumeEachBufferRange { buffer, last -> val array = ByteArray(buffer.remaining()) buffer.get(array) @@ -41,9 +42,16 @@ class KtorTcpPort internal constructor( writeChannel.await().writeAvailable(data) } -} + override fun close() { + listenerJob.cancel() + futureSocket.cancel() + super.close() + } -fun CoroutineScope.openKtorTcpPort(host: String, port: Int): TcpPort { - val scope = CoroutineScope(SupervisorJob(coroutineContext[Job])) - return TcpPort(scope, host, port) + companion object{ + suspend fun open(host: String, port: Int): KtorTcpPort{ + val scope = CoroutineScope(SupervisorJob(coroutineContext[Job])) + return KtorTcpPort(scope, host, port) + } + } } \ No newline at end of file diff --git a/dataforge-device-core/src/jvmMain/kotlin/hep/dataforge/control/ports/TcpPort.kt b/dataforge-device-core/src/jvmMain/kotlin/hep/dataforge/control/ports/TcpPort.kt index 9e1d573..942aea1 100644 --- a/dataforge-device-core/src/jvmMain/kotlin/hep/dataforge/control/ports/TcpPort.kt +++ b/dataforge-device-core/src/jvmMain/kotlin/hep/dataforge/control/ports/TcpPort.kt @@ -6,6 +6,7 @@ import mu.KotlinLogging import java.net.InetSocketAddress import java.nio.ByteBuffer import java.nio.channels.SocketChannel +import kotlin.coroutines.coroutineContext internal fun ByteBuffer.readArray(limit: Int = limit()): ByteArray { rewind() @@ -15,7 +16,7 @@ internal fun ByteBuffer.readArray(limit: Int = limit()): ByteArray { return response } -class TcpPort internal constructor( +class TcpPort private constructor( scope: CoroutineScope, val host: String, val port: Int @@ -24,7 +25,9 @@ class TcpPort internal constructor( override val logger: KLogger = KotlinLogging.logger("port[tcp:$host:$port]") private val futureChannel: Deferred = this.scope.async(Dispatchers.IO) { - SocketChannel.open(InetSocketAddress(host, port)) + SocketChannel.open(InetSocketAddress(host, port)).apply { + configureBlocking(false) + } } /** @@ -52,10 +55,17 @@ class TcpPort internal constructor( override suspend fun write(data: ByteArray) { futureChannel.await().write(ByteBuffer.wrap(data)) } -} -fun CoroutineScope.openTcpPort(host: String, port: Int): TcpPort { - val scope = CoroutineScope(SupervisorJob(coroutineContext[Job])) - return TcpPort(scope, host, port) + override fun close() { + listenerJob.cancel() + futureChannel.cancel() + super.close() + } + companion object{ + suspend fun open(host: String, port: Int): TcpPort{ + val scope = CoroutineScope(SupervisorJob(coroutineContext[Job])) + return TcpPort(scope, host, port) + } + } } \ No newline at end of file diff --git a/dataforge-device-core/src/jvmTest/kotlin/hep/dataforge/control/ports/TcpPortTest.kt b/dataforge-device-core/src/jvmTest/kotlin/hep/dataforge/control/ports/TcpPortTest.kt index 41d97a2..2ffb21a 100644 --- a/dataforge-device-core/src/jvmTest/kotlin/hep/dataforge/control/ports/TcpPortTest.kt +++ b/dataforge-device-core/src/jvmTest/kotlin/hep/dataforge/control/ports/TcpPortTest.kt @@ -55,16 +55,39 @@ class TcpPortTest { try { runBlocking{ val server = launchEchoServer(22188) - val port = openTcpPort("localhost", 22188) + val port = TcpPort.open("localhost", 22188) val logJob = launch { - port.flow().collect { + port.input().collect { println("Flow: ${it.decodeToString()}") } } port.startJob.join() port.send("aaa\n") -// delay(20) + port.send("ddd\n") + + delay(200) + + cancel() + } + } catch (ex: Exception) { + if (ex !is CancellationException) throw ex + } + } + + @Test + fun testKtorWithEchoServer() { + try { + runBlocking{ + val server = launchEchoServer(22188) + val port = KtorTcpPort.open("localhost", 22188) + + val logJob = launch { + port.input().collect { + println("Flow: ${it.decodeToString()}") + } + } + port.send("aaa\n") port.send("ddd\n") delay(200) diff --git a/dataforge-device-serial/build.gradle.kts b/dataforge-device-serial/build.gradle.kts new file mode 100644 index 0000000..b956e43 --- /dev/null +++ b/dataforge-device-serial/build.gradle.kts @@ -0,0 +1,9 @@ +plugins { + id("scientifik.jvm") + id("scientifik.publish") +} + +dependencies{ + api(project(":dataforge-device-core")) + implementation("org.scream3r:jssc:2.8.0") +} \ No newline at end of file diff --git a/dataforge-device-serial/src/main/kotlin/hep/dataforge/control/serial/SerialPort.kt b/dataforge-device-serial/src/main/kotlin/hep/dataforge/control/serial/SerialPort.kt new file mode 100644 index 0000000..0e5ab34 --- /dev/null +++ b/dataforge-device-serial/src/main/kotlin/hep/dataforge/control/serial/SerialPort.kt @@ -0,0 +1,73 @@ +package hep.dataforge.control.serial + +import hep.dataforge.control.ports.Port +import jssc.SerialPort.* +import jssc.SerialPortEventListener +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.SupervisorJob +import mu.KLogger +import mu.KotlinLogging +import kotlin.coroutines.coroutineContext +import jssc.SerialPort as JSSCPort + +/** + * COM/USB port + */ +class SerialPort private constructor(scope: CoroutineScope, val jssc: JSSCPort) : Port(scope) { + override val logger: KLogger = KotlinLogging.logger("port[${jssc.portName}]") + + private val serialPortListener = SerialPortEventListener { event -> + if (event.isRXCHAR) { + val chars = event.eventValue + val bytes = jssc.readBytes(chars) + receive(bytes) + } + } + + init { + jssc.addEventListener(serialPortListener) + } + + /** + * Clear current input and output buffers + */ + fun clearPort() { + jssc.purgePort(PURGE_RXCLEAR or PURGE_TXCLEAR) + } + + override suspend fun write(data: ByteArray) { + jssc.writeBytes(data) + } + + @Throws(Exception::class) + override fun close() { + jssc.removeEventListener() + clearPort() + if (jssc.isOpened) { + jssc.closePort() + } + super.close() + } + + companion object { + + /** + * Construct ComPort with given parameters + */ + suspend fun open( + portName: String, + baudRate: Int = BAUDRATE_9600, + dataBits: Int = DATABITS_8, + stopBits: Int = STOPBITS_1, + parity: Int = PARITY_NONE + ): SerialPort { + val jssc = JSSCPort(portName).apply { + openPort() + setParams(baudRate, dataBits, stopBits, parity) + } + val scope = CoroutineScope(SupervisorJob(coroutineContext[Job])) + return SerialPort(scope, jssc) + } + } +} \ No newline at end of file diff --git a/demo/build.gradle.kts b/demo/build.gradle.kts index 9da83fc..d104cf4 100644 --- a/demo/build.gradle.kts +++ b/demo/build.gradle.kts @@ -4,7 +4,6 @@ plugins { application } -val plotlyVersion by extra("0.2.0-dev-13") repositories{ jcenter() @@ -12,6 +11,7 @@ repositories{ maven("https://dl.bintray.com/kotlin/kotlin-eap") maven("https://dl.bintray.com/mipt-npm/dataforge") maven("https://dl.bintray.com/mipt-npm/scientifik") + maven("https://dl.bintray.com/mipt-npm/kscience") maven("https://dl.bintray.com/mipt-npm/dev") } @@ -21,7 +21,7 @@ dependencies{ implementation(project(":dataforge-device-client")) implementation("no.tornado:tornadofx:1.7.20") implementation(kotlin("stdlib-jdk8")) - implementation("scientifik:plotlykt-server:$plotlyVersion") + implementation("kscience.plotlykt:plotlykt-server:0.2.0") } tasks.withType().configureEach { diff --git a/demo/src/main/kotlin/hep/dataforge/control/demo/demoDeviceServer.kt b/demo/src/main/kotlin/hep/dataforge/control/demo/demoDeviceServer.kt index df553e9..0f5ecc0 100644 --- a/demo/src/main/kotlin/hep/dataforge/control/demo/demoDeviceServer.kt +++ b/demo/src/main/kotlin/hep/dataforge/control/demo/demoDeviceServer.kt @@ -5,18 +5,18 @@ import hep.dataforge.control.controllers.devices import hep.dataforge.control.server.startDeviceServer import hep.dataforge.control.server.whenStarted import hep.dataforge.meta.double -import hep.dataforge.meta.invoke import hep.dataforge.names.asName import io.ktor.server.engine.ApplicationEngine import kotlinx.coroutines.flow.* import kotlinx.coroutines.launch import kotlinx.html.div import kotlinx.html.link -import scientifik.plotly.models.Trace -import scientifik.plotly.plot -import scientifik.plotly.server.PlotlyUpdateMode -import scientifik.plotly.server.plotlyModule -import scientifik.plotly.trace +import kscience.plotly.layout +import kscience.plotly.models.Trace +import kscience.plotly.plot +import kscience.plotly.server.PlotlyUpdateMode +import kscience.plotly.server.plotlyModule +import kscience.plotly.trace import java.util.concurrent.ConcurrentLinkedQueue /** @@ -70,7 +70,7 @@ fun startDemoDeviceServer(context: Context, device: DemoDevice): ApplicationEngi } div("row") { div("col-6") { - plot(container = container) { + plot(renderer = container) { layout { title = "sin property" xaxis.title = "point index" @@ -85,7 +85,7 @@ fun startDemoDeviceServer(context: Context, device: DemoDevice): ApplicationEngi } } div("col-6") { - plot(container = container) { + plot(renderer = container) { layout { title = "cos property" xaxis.title = "point index" @@ -102,7 +102,7 @@ fun startDemoDeviceServer(context: Context, device: DemoDevice): ApplicationEngi } div("row") { div("col-12") { - plot(container = container) { + plot(renderer = container) { layout { title = "cos vs sin" xaxis.title = "sin" diff --git a/settings.gradle.kts b/settings.gradle.kts index d4b3ffd..993fdb1 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -14,8 +14,6 @@ pluginManagement { } plugins { - - kotlin("jvm") version kotlinVersion id("scientifik.mpp") version toolsVersion id("scientifik.jvm") version toolsVersion @@ -37,6 +35,7 @@ rootProject.name = "dataforge-control" include( ":dataforge-device-core", + ":dataforge-device-serial", ":dataforge-device-server", ":dataforge-device-client", ":demo"