Dev #6

Merged
altavir merged 75 commits from dev into master 2021-10-23 11:02:48 +03:00
9 changed files with 161 additions and 31 deletions
Showing only changes of commit fed5d55512 - Show all commits
dataforge-device-core/src
commonMain/kotlin/hep/dataforge/control/ports
jvmMain/kotlin/hep/dataforge/control/ports
jvmTest/kotlin/hep/dataforge/control/ports
dataforge-device-serial
build.gradle.kts
src/main/kotlin/hep/dataforge/control/serial
demo
build.gradle.kts
src/main/kotlin/hep/dataforge/control/demo
settings.gradle.kts

@ -43,7 +43,7 @@ abstract class Port(val scope: CoroutineScope) : Closeable {
write(data) write(data)
logger.debug { "SENT: ${data.decodeToString()}" } logger.debug { "SENT: ${data.decodeToString()}" }
} catch (ex: Exception) { } catch (ex: Exception) {
if(ex is CancellationException) throw ex if (ex is CancellationException) throw ex
logger.error(ex) { "Error while writing data to the port" } logger.error(ex) { "Error while writing data to the port" }
} }
} }
@ -53,7 +53,12 @@ abstract class Port(val scope: CoroutineScope) : Closeable {
outgoing.send(data) outgoing.send(data)
} }
fun flow(): Flow<ByteArray> { /**
* Raw flow of incoming data chunks. The chunks are not guaranteed to be complete phrases.
* In order to form phrases some condition should used on top of it.
* For example [delimitedInput] generates phrases with fixed delimiter.
*/
fun input(): Flow<ByteArray> {
return incoming.receiveAsFlow() return incoming.receiveAsFlow()
} }
@ -94,3 +99,6 @@ fun Flow<ByteArray>.withDelimiter(delimiter: ByteArray, expectedMessageSize: Int
} }
} }
} }
fun Port.delimitedInput(delimiter: ByteArray, expectedMessageSize: Int = 32) =
input().withDelimiter(delimiter, expectedMessageSize)

@ -10,6 +10,7 @@ import kotlinx.coroutines.*
import mu.KLogger import mu.KLogger
import mu.KotlinLogging import mu.KotlinLogging
import java.net.InetSocketAddress import java.net.InetSocketAddress
import kotlin.coroutines.coroutineContext
class KtorTcpPort internal constructor( class KtorTcpPort internal constructor(
scope: CoroutineScope, scope: CoroutineScope,
@ -19,16 +20,16 @@ class KtorTcpPort internal constructor(
override val logger: KLogger = KotlinLogging.logger("port[tcp:$host:$port]") override val logger: KLogger = KotlinLogging.logger("port[tcp:$host:$port]")
private val socket = scope.async { private val futureSocket = scope.async {
aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().connect(InetSocketAddress(host, port)) aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().connect(InetSocketAddress(host, port))
} }
private val writeChannel = scope.async { private val writeChannel = scope.async {
socket.await().openWriteChannel(true) futureSocket.await().openWriteChannel(true)
} }
private val listenerJob = scope.launch { private val listenerJob = scope.launch {
val input = socket.await().openReadChannel() val input = futureSocket.await().openReadChannel()
input.consumeEachBufferRange { buffer, last -> input.consumeEachBufferRange { buffer, last ->
val array = ByteArray(buffer.remaining()) val array = ByteArray(buffer.remaining())
buffer.get(array) buffer.get(array)
@ -41,9 +42,16 @@ class KtorTcpPort internal constructor(
writeChannel.await().writeAvailable(data) writeChannel.await().writeAvailable(data)
} }
} override fun close() {
listenerJob.cancel()
futureSocket.cancel()
super.close()
}
fun CoroutineScope.openKtorTcpPort(host: String, port: Int): TcpPort { companion object{
suspend fun open(host: String, port: Int): KtorTcpPort{
val scope = CoroutineScope(SupervisorJob(coroutineContext[Job])) val scope = CoroutineScope(SupervisorJob(coroutineContext[Job]))
return TcpPort(scope, host, port) return KtorTcpPort(scope, host, port)
}
}
} }

@ -6,6 +6,7 @@ import mu.KotlinLogging
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.nio.ByteBuffer import java.nio.ByteBuffer
import java.nio.channels.SocketChannel import java.nio.channels.SocketChannel
import kotlin.coroutines.coroutineContext
internal fun ByteBuffer.readArray(limit: Int = limit()): ByteArray { internal fun ByteBuffer.readArray(limit: Int = limit()): ByteArray {
rewind() rewind()
@ -15,7 +16,7 @@ internal fun ByteBuffer.readArray(limit: Int = limit()): ByteArray {
return response return response
} }
class TcpPort internal constructor( class TcpPort private constructor(
scope: CoroutineScope, scope: CoroutineScope,
val host: String, val host: String,
val port: Int val port: Int
@ -24,7 +25,9 @@ class TcpPort internal constructor(
override val logger: KLogger = KotlinLogging.logger("port[tcp:$host:$port]") override val logger: KLogger = KotlinLogging.logger("port[tcp:$host:$port]")
private val futureChannel: Deferred<SocketChannel> = this.scope.async(Dispatchers.IO) { private val futureChannel: Deferred<SocketChannel> = this.scope.async(Dispatchers.IO) {
SocketChannel.open(InetSocketAddress(host, port)) SocketChannel.open(InetSocketAddress(host, port)).apply {
configureBlocking(false)
}
} }
/** /**
@ -52,10 +55,17 @@ class TcpPort internal constructor(
override suspend fun write(data: ByteArray) { override suspend fun write(data: ByteArray) {
futureChannel.await().write(ByteBuffer.wrap(data)) futureChannel.await().write(ByteBuffer.wrap(data))
} }
}
fun CoroutineScope.openTcpPort(host: String, port: Int): TcpPort { override fun close() {
listenerJob.cancel()
futureChannel.cancel()
super.close()
}
companion object{
suspend fun open(host: String, port: Int): TcpPort{
val scope = CoroutineScope(SupervisorJob(coroutineContext[Job])) val scope = CoroutineScope(SupervisorJob(coroutineContext[Job]))
return TcpPort(scope, host, port) return TcpPort(scope, host, port)
}
}
} }

@ -55,16 +55,39 @@ class TcpPortTest {
try { try {
runBlocking{ runBlocking{
val server = launchEchoServer(22188) val server = launchEchoServer(22188)
val port = openTcpPort("localhost", 22188) val port = TcpPort.open("localhost", 22188)
val logJob = launch { val logJob = launch {
port.flow().collect { port.input().collect {
println("Flow: ${it.decodeToString()}") println("Flow: ${it.decodeToString()}")
} }
} }
port.startJob.join() port.startJob.join()
port.send("aaa\n") port.send("aaa\n")
// delay(20) port.send("ddd\n")
delay(200)
cancel()
}
} catch (ex: Exception) {
if (ex !is CancellationException) throw ex
}
}
@Test
fun testKtorWithEchoServer() {
try {
runBlocking{
val server = launchEchoServer(22188)
val port = KtorTcpPort.open("localhost", 22188)
val logJob = launch {
port.input().collect {
println("Flow: ${it.decodeToString()}")
}
}
port.send("aaa\n")
port.send("ddd\n") port.send("ddd\n")
delay(200) delay(200)

@ -0,0 +1,9 @@
plugins {
id("scientifik.jvm")
id("scientifik.publish")
}
dependencies{
api(project(":dataforge-device-core"))
implementation("org.scream3r:jssc:2.8.0")
}

@ -0,0 +1,73 @@
package hep.dataforge.control.serial
import hep.dataforge.control.ports.Port
import jssc.SerialPort.*
import jssc.SerialPortEventListener
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import mu.KLogger
import mu.KotlinLogging
import kotlin.coroutines.coroutineContext
import jssc.SerialPort as JSSCPort
/**
* COM/USB port
*/
class SerialPort private constructor(scope: CoroutineScope, val jssc: JSSCPort) : Port(scope) {
override val logger: KLogger = KotlinLogging.logger("port[${jssc.portName}]")
private val serialPortListener = SerialPortEventListener { event ->
if (event.isRXCHAR) {
val chars = event.eventValue
val bytes = jssc.readBytes(chars)
receive(bytes)
}
}
init {
jssc.addEventListener(serialPortListener)
}
/**
* Clear current input and output buffers
*/
fun clearPort() {
jssc.purgePort(PURGE_RXCLEAR or PURGE_TXCLEAR)
}
override suspend fun write(data: ByteArray) {
jssc.writeBytes(data)
}
@Throws(Exception::class)
override fun close() {
jssc.removeEventListener()
clearPort()
if (jssc.isOpened) {
jssc.closePort()
}
super.close()
}
companion object {
/**
* Construct ComPort with given parameters
*/
suspend fun open(
portName: String,
baudRate: Int = BAUDRATE_9600,
dataBits: Int = DATABITS_8,
stopBits: Int = STOPBITS_1,
parity: Int = PARITY_NONE
): SerialPort {
val jssc = JSSCPort(portName).apply {
openPort()
setParams(baudRate, dataBits, stopBits, parity)
}
val scope = CoroutineScope(SupervisorJob(coroutineContext[Job]))
return SerialPort(scope, jssc)
}
}
}

@ -4,7 +4,6 @@ plugins {
application application
} }
val plotlyVersion by extra("0.2.0-dev-13")
repositories{ repositories{
jcenter() jcenter()
@ -12,6 +11,7 @@ repositories{
maven("https://dl.bintray.com/kotlin/kotlin-eap") maven("https://dl.bintray.com/kotlin/kotlin-eap")
maven("https://dl.bintray.com/mipt-npm/dataforge") maven("https://dl.bintray.com/mipt-npm/dataforge")
maven("https://dl.bintray.com/mipt-npm/scientifik") maven("https://dl.bintray.com/mipt-npm/scientifik")
maven("https://dl.bintray.com/mipt-npm/kscience")
maven("https://dl.bintray.com/mipt-npm/dev") maven("https://dl.bintray.com/mipt-npm/dev")
} }
@ -21,7 +21,7 @@ dependencies{
implementation(project(":dataforge-device-client")) implementation(project(":dataforge-device-client"))
implementation("no.tornado:tornadofx:1.7.20") implementation("no.tornado:tornadofx:1.7.20")
implementation(kotlin("stdlib-jdk8")) implementation(kotlin("stdlib-jdk8"))
implementation("scientifik:plotlykt-server:$plotlyVersion") implementation("kscience.plotlykt:plotlykt-server:0.2.0")
} }
tasks.withType<org.jetbrains.kotlin.gradle.tasks.KotlinCompile>().configureEach { tasks.withType<org.jetbrains.kotlin.gradle.tasks.KotlinCompile>().configureEach {

@ -5,18 +5,18 @@ import hep.dataforge.control.controllers.devices
import hep.dataforge.control.server.startDeviceServer import hep.dataforge.control.server.startDeviceServer
import hep.dataforge.control.server.whenStarted import hep.dataforge.control.server.whenStarted
import hep.dataforge.meta.double import hep.dataforge.meta.double
import hep.dataforge.meta.invoke
import hep.dataforge.names.asName import hep.dataforge.names.asName
import io.ktor.server.engine.ApplicationEngine import io.ktor.server.engine.ApplicationEngine
import kotlinx.coroutines.flow.* import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.html.div import kotlinx.html.div
import kotlinx.html.link import kotlinx.html.link
import scientifik.plotly.models.Trace import kscience.plotly.layout
import scientifik.plotly.plot import kscience.plotly.models.Trace
import scientifik.plotly.server.PlotlyUpdateMode import kscience.plotly.plot
import scientifik.plotly.server.plotlyModule import kscience.plotly.server.PlotlyUpdateMode
import scientifik.plotly.trace import kscience.plotly.server.plotlyModule
import kscience.plotly.trace
import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.ConcurrentLinkedQueue
/** /**
@ -70,7 +70,7 @@ fun startDemoDeviceServer(context: Context, device: DemoDevice): ApplicationEngi
} }
div("row") { div("row") {
div("col-6") { div("col-6") {
plot(container = container) { plot(renderer = container) {
layout { layout {
title = "sin property" title = "sin property"
xaxis.title = "point index" xaxis.title = "point index"
@ -85,7 +85,7 @@ fun startDemoDeviceServer(context: Context, device: DemoDevice): ApplicationEngi
} }
} }
div("col-6") { div("col-6") {
plot(container = container) { plot(renderer = container) {
layout { layout {
title = "cos property" title = "cos property"
xaxis.title = "point index" xaxis.title = "point index"
@ -102,7 +102,7 @@ fun startDemoDeviceServer(context: Context, device: DemoDevice): ApplicationEngi
} }
div("row") { div("row") {
div("col-12") { div("col-12") {
plot(container = container) { plot(renderer = container) {
layout { layout {
title = "cos vs sin" title = "cos vs sin"
xaxis.title = "sin" xaxis.title = "sin"

@ -14,8 +14,6 @@ pluginManagement {
} }
plugins { plugins {
kotlin("jvm") version kotlinVersion kotlin("jvm") version kotlinVersion
id("scientifik.mpp") version toolsVersion id("scientifik.mpp") version toolsVersion
id("scientifik.jvm") version toolsVersion id("scientifik.jvm") version toolsVersion
@ -37,6 +35,7 @@ rootProject.name = "dataforge-control"
include( include(
":dataforge-device-core", ":dataforge-device-core",
":dataforge-device-serial",
":dataforge-device-server", ":dataforge-device-server",
":dataforge-device-client", ":dataforge-device-client",
":demo" ":demo"