Migrate to K-1.4.0

This commit is contained in:
Alexander Nozik 2020-08-31 12:40:49 +03:00
parent 20079e62da
commit 0e6363048e
32 changed files with 276 additions and 180 deletions

View File

@ -1,4 +1,4 @@
val dataforgeVersion by extra("0.1.8") val dataforgeVersion by extra("0.1.9-dev-2")
allprojects { allprojects {
repositories { repositories {

View File

@ -1,19 +1,18 @@
plugins { plugins {
id("scientifik.mpp") id("kscience.mpp")
id("scientifik.publish") id("kscience.publish")
} }
val ktorVersion: String by extra("1.3.2") val ktorVersion: String by extra("1.4.0")
kotlin { kotlin {
js { // js {
browser { // browser {
dceTask { // dceTask {
keep("ktor-ktor-io.\$\$importsForInline\$\$.ktor-ktor-io.io.ktor.utils.io") // keep("ktor-ktor-io.\$\$importsForInline\$\$.ktor-ktor-io.io.ktor.utils.io")
} // }
} // }
} // }
sourceSets { sourceSets {
commonMain { commonMain {

View File

@ -16,8 +16,7 @@ import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.serialization.json.JsonObject import kotlinx.serialization.json.*
import kotlinx.serialization.json.json
import kotlin.coroutines.CoroutineContext import kotlin.coroutines.CoroutineContext
/* /*
@ -61,14 +60,14 @@ class MagixClient(
} }
} }
private fun wrapMessage(message: DeviceMessage, requestId: String? = null): JsonObject = json { private fun wrapMessage(message: DeviceMessage, requestId: String? = null): JsonObject = buildJsonObject {
"id" to generateId(message, requestId) put("id", generateId(message, requestId))
if (requestId != null) { if (requestId != null) {
"parentId" to requestId put("parentId", requestId)
} }
"target" to "magix" put("target", "magix")
"origin" to "df" put("origin", "df")
"payload" to message.config.toJson() put("payload", message.config.toJson())
} }
@ -81,7 +80,7 @@ class MagixClient(
private val respondJob = launch { private val respondJob = launch {
inbox.collect { json -> inbox.collect { json ->
val requestId = json["id"]?.primitive?.content val requestId = json["id"]?.jsonPrimitive?.content
val payload = json["payload"]?.jsonObject val payload = json["payload"]?.jsonObject
//TODO analyze action //TODO analyze action

View File

@ -1,15 +1,14 @@
import scientifik.useCoroutines
import scientifik.useSerialization
plugins { plugins {
id("scientifik.mpp") id("kscience.mpp")
id("scientifik.publish") id("kscience.publish")
} }
val dataforgeVersion: String by rootProject.extra val dataforgeVersion: String by rootProject.extra
useCoroutines() kscience {
useSerialization() useCoroutines()
useSerialization()
}
kotlin { kotlin {
sourceSets { sourceSets {

View File

@ -3,6 +3,7 @@ package hep.dataforge.control.api
import hep.dataforge.control.api.Device.Companion.DEVICE_TARGET import hep.dataforge.control.api.Device.Companion.DEVICE_TARGET
import hep.dataforge.io.Envelope import hep.dataforge.io.Envelope
import hep.dataforge.io.EnvelopeBuilder import hep.dataforge.io.EnvelopeBuilder
import hep.dataforge.io.Responder
import hep.dataforge.meta.Meta import hep.dataforge.meta.Meta
import hep.dataforge.meta.MetaItem import hep.dataforge.meta.MetaItem
import hep.dataforge.provider.Type import hep.dataforge.provider.Type
@ -10,82 +11,77 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.cancel import kotlinx.coroutines.cancel
import kotlinx.io.Closeable import kotlinx.io.Closeable
interface Consumer {
fun consume(message: Envelope): Unit
}
/** /**
* General interface describing a managed Device * General interface describing a managed Device
*/ */
@Type(DEVICE_TARGET) @Type(DEVICE_TARGET)
interface Device : Closeable { public interface Device : Responder, Closeable {
/** /**
* List of supported property descriptors * List of supported property descriptors
*/ */
val propertyDescriptors: Collection<PropertyDescriptor> public val propertyDescriptors: Collection<PropertyDescriptor>
/** /**
* List of supported action descriptors. Action is a request to the device that * List of supported action descriptors. Action is a request to the device that
* may or may not change the properties * may or may not change the properties
*/ */
val actionDescriptors: Collection<ActionDescriptor> public val actionDescriptors: Collection<ActionDescriptor>
/** /**
* The scope encompassing all operations on a device. When canceled, cancels all running processes * The scope encompassing all operations on a device. When canceled, cancels all running processes
*/ */
val scope: CoroutineScope public val scope: CoroutineScope
/** /**
* Register a new property change listener for this device. * Register a new property change listener for this device.
* [owner] is provided optionally in order for listener to be * [owner] is provided optionally in order for listener to be
* easily removable * easily removable
*/ */
fun registerListener(listener: DeviceListener, owner: Any? = listener) public fun registerListener(listener: DeviceListener, owner: Any? = listener)
/** /**
* Remove all listeners belonging to the specified owner * Remove all listeners belonging to the specified owner
*/ */
fun removeListeners(owner: Any?) public fun removeListeners(owner: Any?)
/** /**
* Get the value of the property or throw error if property in not defined. * Get the value of the property or throw error if property in not defined.
* Suspend if property value is not available * Suspend if property value is not available
*/ */
suspend fun getProperty(propertyName: String): MetaItem<*> public suspend fun getProperty(propertyName: String): MetaItem<*>
/** /**
* Invalidate property and force recalculate * Invalidate property and force recalculate
*/ */
suspend fun invalidateProperty(propertyName: String) public suspend fun invalidateProperty(propertyName: String)
/** /**
* Set property [value] for a property with name [propertyName]. * Set property [value] for a property with name [propertyName].
* In rare cases could suspend if the [Device] supports command queue and it is full at the moment. * In rare cases could suspend if the [Device] supports command queue and it is full at the moment.
*/ */
suspend fun setProperty(propertyName: String, value: MetaItem<*>) public suspend fun setProperty(propertyName: String, value: MetaItem<*>)
/** /**
* Send an action request and suspend caller while request is being processed. * Send an action request and suspend caller while request is being processed.
* Could return null if request does not return a meaningful answer. * Could return null if request does not return a meaningful answer.
*/ */
suspend fun execute(command: String, argument: MetaItem<*>? = null): MetaItem<*>? public suspend fun execute(command: String, argument: MetaItem<*>? = null): MetaItem<*>?
/** /**
* *
* A request with binary data or for binary response (or both). This request does not cover basic functionality like * A request with binary data or for binary response (or both). This request does not cover basic functionality like
* [setProperty], [getProperty] or [execute] and not defined for a generic device. * [setProperty], [getProperty] or [execute] and not defined for a generic device.
* *
* TODO implement Responder after DF 0.1.9
*/ */
suspend fun respond(request: Envelope): EnvelopeBuilder = error("No binary response defined") override suspend fun respond(request: Envelope): EnvelopeBuilder = error("No binary response defined")
override fun close() { override fun close() {
scope.cancel("The device is closed") scope.cancel("The device is closed")
} }
companion object { public companion object {
const val DEVICE_TARGET = "device" public const val DEVICE_TARGET: String = "device"
} }
} }
suspend fun Device.execute(name: String, meta: Meta?): MetaItem<*>? = execute(name, meta?.let { MetaItem.NodeItem(it) }) public suspend fun Device.execute(name: String, meta: Meta?): MetaItem<*>? = execute(name, meta?.let { MetaItem.NodeItem(it) })

View File

@ -1,15 +1,14 @@
package hep.dataforge.control.api package hep.dataforge.control.api
import hep.dataforge.meta.MetaItem import hep.dataforge.meta.MetaItem
import hep.dataforge.names.Name import hep.dataforge.names.*
import hep.dataforge.names.toName
import hep.dataforge.provider.Provider import hep.dataforge.provider.Provider
/** /**
* A hub that could locate multiple devices and redirect actions to them * A hub that could locate multiple devices and redirect actions to them
*/ */
interface DeviceHub : Provider { public interface DeviceHub : Provider {
val devices: Map<Name, Device>//TODO use token instead of Names public val devices: Map<NameToken, Device>
override val defaultTarget: String get() = Device.DEVICE_TARGET override val defaultTarget: String get() = Device.DEVICE_TARGET
@ -17,31 +16,51 @@ interface DeviceHub : Provider {
override fun provideTop(target: String): Map<Name, Any> { override fun provideTop(target: String): Map<Name, Any> {
if (target == Device.DEVICE_TARGET) { if (target == Device.DEVICE_TARGET) {
return devices return buildMap {
fun putAll(prefix: Name, hub: DeviceHub) {
hub.devices.forEach {
put(prefix + it.key, it.value)
}
}
devices.forEach {
val name = it.key.asName()
put(name, it.value)
(it.value as? DeviceHub)?.let { hub ->
putAll(name, hub)
}
}
}
} else { } else {
throw IllegalArgumentException("Target $target is not supported for $this") throw IllegalArgumentException("Target $target is not supported for $this")
} }
} }
companion object { public companion object {
} }
} }
operator fun DeviceHub.get(deviceName: Name) = public operator fun DeviceHub.get(nameToken: NameToken): Device =
devices[deviceName] ?: error("Device with name $deviceName not found in $this") devices[nameToken] ?: error("Device with name $nameToken not found in $this")
operator fun DeviceHub.get(deviceName: String) = get(deviceName.toName()) public operator fun DeviceHub.get(name: Name): Device? = when {
name.isEmpty() -> this as? Device
suspend fun DeviceHub.getProperty(deviceName: Name, propertyName: String): MetaItem<*> = name.length == 1 -> get(name.firstOrNull()!!)
this[deviceName].getProperty(propertyName) else -> (get(name.firstOrNull()!!) as? DeviceHub)?.get(name.cutFirst())
suspend fun DeviceHub.setProperty(deviceName: Name, propertyName: String, value: MetaItem<*>) {
this[deviceName].setProperty(propertyName, value)
} }
suspend fun DeviceHub.execute(deviceName: Name, command: String, argument: MetaItem<*>?): MetaItem<*>? = public operator fun DeviceHub.get(deviceName: String): Device? = get(deviceName.toName())
this[deviceName].execute(command, argument)
public suspend fun DeviceHub.getProperty(deviceName: Name, propertyName: String): MetaItem<*>? =
this[deviceName]?.getProperty(propertyName)
public suspend fun DeviceHub.setProperty(deviceName: Name, propertyName: String, value: MetaItem<*>) {
this[deviceName]?.setProperty(propertyName, value)
}
public suspend fun DeviceHub.execute(deviceName: Name, command: String, argument: MetaItem<*>?): MetaItem<*>? =
this[deviceName]?.execute(command, argument)
//suspend fun DeviceHub.respond(request: Envelope): EnvelopeBuilder { //suspend fun DeviceHub.respond(request: Envelope): EnvelopeBuilder {

View File

@ -8,6 +8,7 @@ import hep.dataforge.meta.string
*/ */
class PropertyDescriptor(name: String) : Scheme() { class PropertyDescriptor(name: String) : Scheme() {
val name by string(name) val name by string(name)
var info by string()
} }
/** /**
@ -15,6 +16,7 @@ class PropertyDescriptor(name: String) : Scheme() {
*/ */
class ActionDescriptor(name: String) : Scheme() { class ActionDescriptor(name: String) : Scheme() {
val name by string(name) val name by string(name)
var info by string()
//var descriptor by spec(ItemDescriptor) //var descriptor by spec(ItemDescriptor)
} }

View File

@ -292,4 +292,4 @@ fun <D : DeviceBase> D.writingDouble(
innerGetter, innerGetter,
innerSetter innerSetter
) )
} }

View File

@ -1,6 +1,9 @@
package hep.dataforge.control.controllers package hep.dataforge.control.controllers
import hep.dataforge.control.api.* import hep.dataforge.control.api.Device
import hep.dataforge.control.api.DeviceHub
import hep.dataforge.control.api.DeviceListener
import hep.dataforge.control.api.get
import hep.dataforge.control.controllers.DeviceMessage.Companion.PROPERTY_CHANGED_ACTION import hep.dataforge.control.controllers.DeviceMessage.Companion.PROPERTY_CHANGED_ACTION
import hep.dataforge.io.Envelope import hep.dataforge.io.Envelope
import hep.dataforge.io.Responder import hep.dataforge.io.Responder
@ -80,7 +83,7 @@ class DeviceController(
"source" put deviceTarget "source" put deviceTarget
} }
} }
return response.build() return response.seal()
} }
} catch (ex: Exception) { } catch (ex: Exception) {
DeviceMessage.fail { DeviceMessage.fail {

View File

@ -4,10 +4,10 @@ import hep.dataforge.control.controllers.DeviceController.Companion.GET_PROPERTY
import hep.dataforge.io.SimpleEnvelope import hep.dataforge.io.SimpleEnvelope
import hep.dataforge.meta.* import hep.dataforge.meta.*
import hep.dataforge.names.asName import hep.dataforge.names.asName
import kotlinx.serialization.Decoder
import kotlinx.serialization.Encoder
import kotlinx.serialization.KSerializer import kotlinx.serialization.KSerializer
import kotlinx.serialization.SerialDescriptor import kotlinx.serialization.descriptors.SerialDescriptor
import kotlinx.serialization.encoding.Decoder
import kotlinx.serialization.encoding.Encoder
class DeviceMessage : Scheme() { class DeviceMessage : Scheme() {
var source by string(key = SOURCE_KEY) var source by string(key = SOURCE_KEY)

View File

@ -38,3 +38,9 @@ fun <T : Any> DeviceProperty.convert(metaConverter: MetaConverter<T>): ReadWrite
fun ReadOnlyDeviceProperty.double() = convert(MetaConverter.double) fun ReadOnlyDeviceProperty.double() = convert(MetaConverter.double)
fun DeviceProperty.double() = convert(MetaConverter.double) fun DeviceProperty.double() = convert(MetaConverter.double)
fun ReadOnlyDeviceProperty.int() = convert(MetaConverter.int)
fun DeviceProperty.int() = convert(MetaConverter.int)
fun ReadOnlyDeviceProperty.string() = convert(MetaConverter.string)
fun DeviceProperty.string() = convert(MetaConverter.string)

View File

@ -1,18 +1,26 @@
package hep.dataforge.control.ports package hep.dataforge.control.ports
import kotlinx.coroutines.CancellationException import kotlinx.coroutines.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.launch
import kotlinx.io.Closeable import kotlinx.io.Closeable
import mu.KLogger import mu.KLogger
import mu.KotlinLogging
import kotlin.coroutines.CoroutineContext
abstract class Port(val scope: CoroutineScope) : Closeable { interface Port: Closeable {
suspend fun send(data: ByteArray)
suspend fun receiving(): Flow<ByteArray>
fun isOpen(): Boolean
}
abstract val logger: KLogger
abstract class AbstractPort(parentContext: CoroutineContext) : Port {
protected val scope = CoroutineScope(SupervisorJob(parentContext[Job]))
protected val logger: KLogger by lazy { KotlinLogging.logger(toString()) }
private val outgoing = Channel<ByteArray>(100) private val outgoing = Channel<ByteArray>(100)
private val incoming = Channel<ByteArray>(Channel.CONFLATED) private val incoming = Channel<ByteArray>(Channel.CONFLATED)
@ -53,7 +61,7 @@ abstract class Port(val scope: CoroutineScope) : Closeable {
/** /**
* Send a data packet via the port * Send a data packet via the port
*/ */
suspend fun send(data: ByteArray) { override suspend fun send(data: ByteArray) {
outgoing.send(data) outgoing.send(data)
} }
@ -62,7 +70,7 @@ abstract class Port(val scope: CoroutineScope) : Closeable {
* In order to form phrases some condition should used on top of it. * In order to form phrases some condition should used on top of it.
* For example [delimitedIncoming] generates phrases with fixed delimiter. * For example [delimitedIncoming] generates phrases with fixed delimiter.
*/ */
fun incoming(): Flow<ByteArray> { override suspend fun receiving(): Flow<ByteArray> {
return incoming.receiveAsFlow() return incoming.receiveAsFlow()
} }
@ -70,7 +78,10 @@ abstract class Port(val scope: CoroutineScope) : Closeable {
outgoing.close() outgoing.close()
incoming.close() incoming.close()
sendJob.cancel() sendJob.cancel()
scope.cancel()
} }
override fun isOpen(): Boolean = scope.isActive
} }
/** /**

View File

@ -0,0 +1,36 @@
package hep.dataforge.control.ports
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
class PortProxy(val factory: suspend () -> Port) : Port {
private var actualPort: Port? = null
private val mutex = Mutex()
suspend fun port(): Port{
return mutex.withLock {
if(actualPort?.isOpen() == true){
actualPort!!
} else {
factory().also{
actualPort = it
}
}
}
}
override suspend fun send(data: ByteArray) {
port().send(data)
}
override suspend fun receiving(): Flow<ByteArray> = port().receiving()
// open by default
override fun isOpen(): Boolean = true
override fun close() {
actualPort?.close()
}
}

View File

@ -19,7 +19,7 @@ class SynchronousPortHandler(val port: Port) {
suspend fun <R> respond(data: ByteArray, transform: suspend Flow<ByteArray>.() -> R): R { suspend fun <R> respond(data: ByteArray, transform: suspend Flow<ByteArray>.() -> R): R {
return mutex.withLock { return mutex.withLock {
port.send(data) port.send(data)
transform(port.incoming()) transform(port.receiving())
} }
} }
} }

View File

@ -3,9 +3,13 @@ package hep.dataforge.control.ports
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.io.ByteArrayOutput import kotlinx.io.ByteArrayOutput
fun Flow<ByteArray>.withDelimiter(delimiter: ByteArray, expectedMessageSize: Int = 32): Flow<ByteArray> = flow { /**
* Transform byte fragments into complete phrases using given delimiter
*/
public fun Flow<ByteArray>.withDelimiter(delimiter: ByteArray, expectedMessageSize: Int = 32): Flow<ByteArray> = flow {
require(delimiter.isNotEmpty()) { "Delimiter must not be empty" } require(delimiter.isNotEmpty()) { "Delimiter must not be empty" }
var output = ByteArrayOutput(expectedMessageSize) var output = ByteArrayOutput(expectedMessageSize)
@ -31,8 +35,15 @@ fun Flow<ByteArray>.withDelimiter(delimiter: ByteArray, expectedMessageSize: Int
} }
} }
/**
* Transform byte fragments into utf-8 phrases using utf-8 delimiter
*/
public fun Flow<ByteArray>.withDelimiter(delimiter: String, expectedMessageSize: Int = 32): Flow<String> {
return withDelimiter(delimiter.encodeToByteArray()).map { it.decodeToString() }
}
/** /**
* A flow of delimited phrases * A flow of delimited phrases
*/ */
fun Port.delimitedIncoming(delimiter: ByteArray, expectedMessageSize: Int = 32) = public suspend fun Port.delimitedIncoming(delimiter: ByteArray, expectedMessageSize: Int = 32): Flow<ByteArray> =
incoming().withDelimiter(delimiter, expectedMessageSize) receiving().withDelimiter(delimiter, expectedMessageSize)

View File

@ -6,19 +6,21 @@ import io.ktor.network.sockets.openReadChannel
import io.ktor.network.sockets.openWriteChannel import io.ktor.network.sockets.openWriteChannel
import io.ktor.utils.io.consumeEachBufferRange import io.ktor.utils.io.consumeEachBufferRange
import io.ktor.utils.io.writeAvailable import io.ktor.utils.io.writeAvailable
import kotlinx.coroutines.* import kotlinx.coroutines.Dispatchers
import mu.KLogger import kotlinx.coroutines.async
import mu.KotlinLogging import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import java.net.InetSocketAddress import java.net.InetSocketAddress
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext import kotlin.coroutines.coroutineContext
class KtorTcpPort internal constructor( class KtorTcpPort internal constructor(
scope: CoroutineScope, parentContext: CoroutineContext,
val host: String, val host: String,
val port: Int val port: Int
) : Port(scope), AutoCloseable { ) : AbstractPort(parentContext), AutoCloseable {
override val logger: KLogger = KotlinLogging.logger("port[tcp:$host:$port]") override fun toString() = "port[tcp:$host:$port]"
private val futureSocket = scope.async { private val futureSocket = scope.async {
aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().connect(InetSocketAddress(host, port)) aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().connect(InetSocketAddress(host, port))
@ -48,10 +50,9 @@ class KtorTcpPort internal constructor(
super.close() super.close()
} }
companion object{ companion object {
suspend fun open(host: String, port: Int): KtorTcpPort{ suspend fun open(host: String, port: Int): KtorTcpPort {
val scope = CoroutineScope(SupervisorJob(coroutineContext[Job])) return KtorTcpPort(coroutineContext, host, port)
return KtorTcpPort(scope, host, port)
} }
} }
} }

View File

@ -1,11 +1,10 @@
package hep.dataforge.control.ports package hep.dataforge.control.ports
import kotlinx.coroutines.* import kotlinx.coroutines.*
import mu.KLogger
import mu.KotlinLogging
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.nio.ByteBuffer import java.nio.ByteBuffer
import java.nio.channels.SocketChannel import java.nio.channels.SocketChannel
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext import kotlin.coroutines.coroutineContext
internal fun ByteBuffer.readArray(limit: Int = limit()): ByteArray { internal fun ByteBuffer.readArray(limit: Int = limit()): ByteArray {
@ -17,12 +16,12 @@ internal fun ByteBuffer.readArray(limit: Int = limit()): ByteArray {
} }
class TcpPort private constructor( class TcpPort private constructor(
scope: CoroutineScope, parentContext: CoroutineContext,
val host: String, val host: String,
val port: Int val port: Int
) : Port(scope), AutoCloseable { ) : AbstractPort(parentContext), AutoCloseable {
override val logger: KLogger = KotlinLogging.logger("port[tcp:$host:$port]") override fun toString(): String = "port[tcp:$host:$port]"
private val futureChannel: Deferred<SocketChannel> = this.scope.async(Dispatchers.IO) { private val futureChannel: Deferred<SocketChannel> = this.scope.async(Dispatchers.IO) {
SocketChannel.open(InetSocketAddress(host, port)).apply { SocketChannel.open(InetSocketAddress(host, port)).apply {
@ -64,8 +63,7 @@ class TcpPort private constructor(
companion object{ companion object{
suspend fun open(host: String, port: Int): TcpPort{ suspend fun open(host: String, port: Int): TcpPort{
val scope = CoroutineScope(SupervisorJob(coroutineContext[Job])) return TcpPort(coroutineContext, host, port)
return TcpPort(scope, host, port)
} }
} }
} }

View File

@ -58,7 +58,7 @@ class TcpPortTest {
val port = TcpPort.open("localhost", 22188) val port = TcpPort.open("localhost", 22188)
val logJob = launch { val logJob = launch {
port.incoming().collect { port.receiving().collect {
println("Flow: ${it.decodeToString()}") println("Flow: ${it.decodeToString()}")
} }
} }
@ -83,7 +83,7 @@ class TcpPortTest {
val port = KtorTcpPort.open("localhost", 22188) val port = KtorTcpPort.open("localhost", 22188)
val logJob = launch { val logJob = launch {
port.incoming().collect { port.receiving().collect {
println("Flow: ${it.decodeToString()}") println("Flow: ${it.decodeToString()}")
} }
} }

View File

@ -1,6 +1,6 @@
plugins { plugins {
id("scientifik.jvm") id("kscience.jvm")
id("scientifik.publish") id("kscience.publish")
} }
dependencies{ dependencies{

View File

@ -1,21 +1,19 @@
package hep.dataforge.control.serial package hep.dataforge.control.serial
import hep.dataforge.control.ports.Port import hep.dataforge.control.ports.AbstractPort
import jssc.SerialPort.* import jssc.SerialPort.*
import jssc.SerialPortEventListener import jssc.SerialPortEventListener
import kotlinx.coroutines.CoroutineScope import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import mu.KLogger
import mu.KotlinLogging
import kotlin.coroutines.coroutineContext import kotlin.coroutines.coroutineContext
import jssc.SerialPort as JSSCPort import jssc.SerialPort as JSSCPort
/** /**
* COM/USB port * COM/USB port
*/ */
class SerialPort private constructor(scope: CoroutineScope, val jssc: JSSCPort) : Port(scope) { public class SerialPort private constructor(parentContext: CoroutineContext, private val jssc: JSSCPort) :
override val logger: KLogger = KotlinLogging.logger("port[${jssc.portName}]") AbstractPort(parentContext) {
override fun toString(): String = "port[${jssc.portName}]"
private val serialPortListener = SerialPortEventListener { event -> private val serialPortListener = SerialPortEventListener { event ->
if (event.isRXCHAR) { if (event.isRXCHAR) {
@ -32,7 +30,7 @@ class SerialPort private constructor(scope: CoroutineScope, val jssc: JSSCPort)
/** /**
* Clear current input and output buffers * Clear current input and output buffers
*/ */
fun clearPort() { internal fun clearPort() {
jssc.purgePort(PURGE_RXCLEAR or PURGE_TXCLEAR) jssc.purgePort(PURGE_RXCLEAR or PURGE_TXCLEAR)
} }
@ -50,24 +48,23 @@ class SerialPort private constructor(scope: CoroutineScope, val jssc: JSSCPort)
super.close() super.close()
} }
companion object { public companion object {
/** /**
* Construct ComPort with given parameters * Construct ComPort with given parameters
*/ */
suspend fun open( public suspend fun open(
portName: String, portName: String,
baudRate: Int = BAUDRATE_9600, baudRate: Int = BAUDRATE_9600,
dataBits: Int = DATABITS_8, dataBits: Int = DATABITS_8,
stopBits: Int = STOPBITS_1, stopBits: Int = STOPBITS_1,
parity: Int = PARITY_NONE parity: Int = PARITY_NONE,
): SerialPort { ): SerialPort {
val jssc = JSSCPort(portName).apply { val jssc = JSSCPort(portName).apply {
openPort() openPort()
setParams(baudRate, dataBits, stopBits, parity) setParams(baudRate, dataBits, stopBits, parity)
} }
val scope = CoroutineScope(SupervisorJob(coroutineContext[Job])) return SerialPort(coroutineContext, jssc)
return SerialPort(scope, jssc)
} }
} }
} }

View File

@ -1,11 +1,11 @@
import scientifik.useSerialization
plugins { plugins {
id("scientifik.jvm") id("kscience.jvm")
id("scientifik.publish") id("kscience.publish")
} }
useSerialization() kscience {
useSerialization()
}
val dataforgeVersion: String by rootProject.extra val dataforgeVersion: String by rootProject.extra
val ktorVersion: String by extra("1.3.2") val ktorVersion: String by extra("1.3.2")

View File

@ -8,36 +8,36 @@ import io.ktor.http.ContentType
import io.ktor.http.cio.websocket.Frame import io.ktor.http.cio.websocket.Frame
import io.ktor.response.respondText import io.ktor.response.respondText
import kotlinx.io.asBinary import kotlinx.io.asBinary
import kotlinx.serialization.UnstableDefault
import kotlinx.serialization.json.Json import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonObjectBuilder import kotlinx.serialization.json.JsonObjectBuilder
import kotlinx.serialization.json.json import kotlinx.serialization.json.buildJsonObject
fun Frame.toEnvelope(): Envelope {
internal fun Frame.toEnvelope(): Envelope {
return data.asBinary().readWith(TaggedEnvelopeFormat) return data.asBinary().readWith(TaggedEnvelopeFormat)
} }
fun Envelope.toFrame(): Frame { internal fun Envelope.toFrame(): Frame {
val data = buildByteArray { val data = buildByteArray {
writeWith(TaggedEnvelopeFormat,this@toFrame) writeWith(TaggedEnvelopeFormat, this@toFrame)
} }
return Frame.Binary(false, data) return Frame.Binary(false, data)
} }
suspend fun ApplicationCall.respondJson(builder: JsonObjectBuilder.() -> Unit) { internal suspend fun ApplicationCall.respondJson(builder: JsonObjectBuilder.() -> Unit) {
val json = json(builder) val json = buildJsonObject(builder)
respondText(json.toString(), contentType = ContentType.Application.Json) respondText(json.toString(), contentType = ContentType.Application.Json)
} }
@OptIn(UnstableDefault::class)
suspend fun ApplicationCall.respondMessage(message: DeviceMessage) { public suspend fun ApplicationCall.respondMessage(message: DeviceMessage) {
respondText(Json.stringify(MetaSerializer,message.toMeta()), contentType = ContentType.Application.Json) respondText(Json.encodeToString(MetaSerializer, message.toMeta()), contentType = ContentType.Application.Json)
} }
suspend fun ApplicationCall.respondMessage(builder: DeviceMessage.() -> Unit) { public suspend fun ApplicationCall.respondMessage(builder: DeviceMessage.() -> Unit) {
respondMessage(DeviceMessage(builder)) respondMessage(DeviceMessage(builder))
} }
suspend fun ApplicationCall.respondFail(builder: DeviceMessage.() -> Unit) { public suspend fun ApplicationCall.respondFail(builder: DeviceMessage.() -> Unit) {
respondMessage(DeviceMessage.fail(null, builder)) respondMessage(DeviceMessage.fail(null, builder))
} }

View File

@ -1,4 +1,4 @@
@file:OptIn(ExperimentalCoroutinesApi::class, KtorExperimentalAPI::class, FlowPreview::class, UnstableDefault::class) @file:OptIn(ExperimentalCoroutinesApi::class, KtorExperimentalAPI::class, FlowPreview::class)
package hep.dataforge.control.server package hep.dataforge.control.server
@ -35,15 +35,16 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.collect
import kotlinx.html.* import kotlinx.html.*
import kotlinx.serialization.UnstableDefault
import kotlinx.serialization.json.Json import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonObject import kotlinx.serialization.json.JsonObject
import kotlinx.serialization.json.jsonArray import kotlinx.serialization.json.buildJsonArray
import kotlinx.serialization.json.put
/** /**
* Create and start a web server for several devices * Create and start a web server for several devices
*/ */
fun CoroutineScope.startDeviceServer( @OptIn(KtorExperimentalAPI::class)
public fun CoroutineScope.startDeviceServer(
manager: DeviceManager, manager: DeviceManager,
port: Int = 8111, port: Int = 8111,
host: String = "localhost" host: String = "localhost"
@ -54,9 +55,6 @@ fun CoroutineScope.startDeviceServer(
install(CORS) { install(CORS) {
anyHost() anyHost()
} }
// install(ContentNegotiation) {
// json()
// }
install(StatusPages) { install(StatusPages) {
exception<IllegalArgumentException> { cause -> exception<IllegalArgumentException> { cause ->
call.respond(HttpStatusCode.BadRequest, cause.message ?: "") call.respond(HttpStatusCode.BadRequest, cause.message ?: "")
@ -71,15 +69,15 @@ fun CoroutineScope.startDeviceServer(
}.start() }.start()
} }
fun ApplicationEngine.whenStarted(callback: Application.() -> Unit) { public fun ApplicationEngine.whenStarted(callback: Application.() -> Unit) {
environment.monitor.subscribe(ApplicationStarted, callback) environment.monitor.subscribe(ApplicationStarted, callback)
} }
const val WEB_SERVER_TARGET = "@webServer" public const val WEB_SERVER_TARGET: String = "@webServer"
@OptIn(KtorExperimentalAPI::class) @OptIn(KtorExperimentalAPI::class)
fun Application.deviceModule( public fun Application.deviceModule(
manager: DeviceManager, manager: DeviceManager,
deviceNames: Collection<String> = manager.devices.keys.map { it.toString() }, deviceNames: Collection<String> = manager.devices.keys.map { it.toString() },
route: String = "/" route: String = "/"
@ -152,17 +150,17 @@ fun Application.deviceModule(
get("list") { get("list") {
call.respondJson { call.respondJson {
manager.devices.forEach { (name, device) -> manager.devices.forEach { (name, device) ->
"target" to name.toString() put("target", name.toString())
"properties" to jsonArray { put("properties", buildJsonArray {
device.propertyDescriptors.forEach { descriptor -> device.propertyDescriptors.forEach { descriptor ->
+descriptor.config.toJson() add(descriptor.config.toJson())
} }
} })
"actions" to jsonArray { put("actions", buildJsonArray {
device.actionDescriptors.forEach { actionDescriptor -> device.actionDescriptors.forEach { actionDescriptor ->
+actionDescriptor.config.toJson() add(actionDescriptor.config.toJson())
} }
} })
} }
} }
} }
@ -187,7 +185,7 @@ fun Application.deviceModule(
post("message") { post("message") {
val body = call.receiveText() val body = call.receiveText()
val json = Json.parseJson(body) as? JsonObject val json = Json.parseToJsonElement(body) as? JsonObject
?: throw IllegalArgumentException("The body is not a json object") ?: throw IllegalArgumentException("The body is not a json object")
val meta = json.toMeta() val meta = json.toMeta()
@ -220,7 +218,7 @@ fun Application.deviceModule(
val target: String by call.parameters val target: String by call.parameters
val property: String by call.parameters val property: String by call.parameters
val body = call.receiveText() val body = call.receiveText()
val json = Json.parseJson(body) val json = Json.parseToJsonElement(body)
val request = DeviceMessage { val request = DeviceMessage {
type = SET_PROPERTY_ACTION type = SET_PROPERTY_ACTION

View File

@ -12,7 +12,7 @@ import kotlinx.coroutines.flow.collect
/** /**
* The data class representing a SSE Event that will be sent to the client. * The data class representing a SSE Event that will be sent to the client.
*/ */
data class SseEvent(val data: String, val event: String? = null, val id: String? = null) public data class SseEvent(val data: String, val event: String? = null, val id: String? = null)
/** /**
* Method that responds an [ApplicationCall] by reading all the [SseEvent]s from the specified [events] [ReceiveChannel] * Method that responds an [ApplicationCall] by reading all the [SseEvent]s from the specified [events] [ReceiveChannel]
@ -21,7 +21,7 @@ data class SseEvent(val data: String, val event: String? = null, val id: String?
* You can read more about it here: https://www.html5rocks.com/en/tutorials/eventsource/basics/ * You can read more about it here: https://www.html5rocks.com/en/tutorials/eventsource/basics/
*/ */
@Suppress("BlockingMethodInNonBlockingContext") @Suppress("BlockingMethodInNonBlockingContext")
suspend fun ApplicationCall.respondSse(events: Flow<SseEvent>) { public suspend fun ApplicationCall.respondSse(events: Flow<SseEvent>) {
response.cacheControl(CacheControl.NoCache(null)) response.cacheControl(CacheControl.NoCache(null))
respondTextWriter(contentType = ContentType.Text.EventStream) { respondTextWriter(contentType = ContentType.Text.EventStream) {
events.collect { event-> events.collect { event->

Binary file not shown.

View File

@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-6.5-bin.zip distributionUrl=https\://services.gradle.org/distributions/gradle-6.6.1-bin.zip
zipStoreBase=GRADLE_USER_HOME zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists zipStorePath=wrapper/dists

2
gradlew vendored
View File

@ -130,7 +130,7 @@ fi
if [ "$cygwin" = "true" -o "$msys" = "true" ] ; then if [ "$cygwin" = "true" -o "$msys" = "true" ] ; then
APP_HOME=`cygpath --path --mixed "$APP_HOME"` APP_HOME=`cygpath --path --mixed "$APP_HOME"`
CLASSPATH=`cygpath --path --mixed "$CLASSPATH"` CLASSPATH=`cygpath --path --mixed "$CLASSPATH"`
JAVACMD=`cygpath --unix "$JAVACMD"` JAVACMD=`cygpath --unix "$JAVACMD"`
# We build the pattern for arguments to be converted via cygpath # We build the pattern for arguments to be converted via cygpath

21
gradlew.bat vendored
View File

@ -40,7 +40,7 @@ if defined JAVA_HOME goto findJavaFromJavaHome
set JAVA_EXE=java.exe set JAVA_EXE=java.exe
%JAVA_EXE% -version >NUL 2>&1 %JAVA_EXE% -version >NUL 2>&1
if "%ERRORLEVEL%" == "0" goto init if "%ERRORLEVEL%" == "0" goto execute
echo. echo.
echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
@ -54,7 +54,7 @@ goto fail
set JAVA_HOME=%JAVA_HOME:"=% set JAVA_HOME=%JAVA_HOME:"=%
set JAVA_EXE=%JAVA_HOME%/bin/java.exe set JAVA_EXE=%JAVA_HOME%/bin/java.exe
if exist "%JAVA_EXE%" goto init if exist "%JAVA_EXE%" goto execute
echo. echo.
echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME%
@ -64,21 +64,6 @@ echo location of your Java installation.
goto fail goto fail
:init
@rem Get command-line arguments, handling Windows variants
if not "%OS%" == "Windows_NT" goto win9xME_args
:win9xME_args
@rem Slurp the command line arguments.
set CMD_LINE_ARGS=
set _SKIP=2
:win9xME_args_slurp
if "x%~1" == "x" goto execute
set CMD_LINE_ARGS=%*
:execute :execute
@rem Setup the command line @rem Setup the command line
@ -86,7 +71,7 @@ set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar
@rem Execute Gradle @rem Execute Gradle
"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS% "%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %*
:end :end
@rem End local scope for the variables with windows NT shell @rem End local scope for the variables with windows NT shell

View File

@ -1,6 +1,6 @@
plugins { plugins {
id("scientifik.jvm") id("kscience.jvm")
id("scientifik.publish") id("kscience.publish")
} }
//TODO to be moved to a separate project //TODO to be moved to a separate project

Binary file not shown.

View File

@ -0,0 +1,36 @@
package ru.mipt.npm.devices.pimotionmaster
import hep.dataforge.control.base.DeviceBase
import hep.dataforge.control.base.DeviceProperty
import hep.dataforge.control.base.writingVirtual
import hep.dataforge.control.ports.Port
import hep.dataforge.control.ports.PortProxy
import hep.dataforge.control.ports.withDelimiter
import hep.dataforge.meta.MetaItem
import hep.dataforge.values.Null
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.first
class PiMotionMasterDevice(parentScope: CoroutineScope, val portFactory: suspend (MetaItem<*>?) -> Port) : DeviceBase() {
override val scope: CoroutineScope = CoroutineScope(
parentScope.coroutineContext + Job(parentScope.coroutineContext[Job])
)
public val port: DeviceProperty by writingVirtual(Null) {
info = "The port for TCP connector"
}
private val connector = PortProxy { portFactory(port.value) }
private suspend fun readPhrase(command: String) {
connector.receiving().withDelimiter("\n").first { it.startsWith(command) }
}
//
// val firmwareVersion by reading {
// connector.r
// }
}

View File

@ -1,6 +1,6 @@
pluginManagement { pluginManagement {
val kotlinVersion = "1.3.72" val kotlinVersion = "1.4.0"
val toolsVersion = "0.5.2" val toolsVersion = "0.6.0"
repositories { repositories {
mavenLocal() mavenLocal()
@ -24,7 +24,7 @@ pluginManagement {
resolutionStrategy { resolutionStrategy {
eachPlugin { eachPlugin {
when (requested.id.id) { when (requested.id.id) {
"scientifik.publish", "scientifik.mpp", "scientifik.jvm", "scientifik.js" -> useModule("scientifik:gradle-tools:${toolsVersion}") "kscience.publish", "kscience.mpp", "kscience.jvm", "kscience.js" -> useModule("ru.mipt.npm:gradle-tools:${toolsVersion}")
"kotlinx-atomicfu" -> useModule("org.jetbrains.kotlinx:atomicfu-gradle-plugin:${requested.version}") "kotlinx-atomicfu" -> useModule("org.jetbrains.kotlinx:atomicfu-gradle-plugin:${requested.version}")
} }
} }