Dev #6

altavir merged 75 commits from dev into master 2021-10-23 11:02:48 +03:00
22 changed files with 350 additions and 171 deletions
Showing only changes of commit bf213c9814 - Show all commits

@ -35,10 +35,10 @@ import kotlin.coroutines.CoroutineContext
* Communicate with server in [Magix format](
class MagixClient(
val manager: DeviceManager,
val postUrl: Url,
val inbox: Flow<JsonObject>
public class MagixClient(
private val manager: DeviceManager,
private val postUrl: Url,
private val inbox: Flow<JsonObject>
): CoroutineScope {
override val coroutineContext: CoroutineContext = manager.context.coroutineContext + Job(manager.context.coroutineContext[Job])

@ -1,5 +1,6 @@
package hep.dataforge.control.api
import hep.dataforge.context.ContextAware
import hep.dataforge.control.api.Device.Companion.DEVICE_TARGET
@ -14,7 +15,7 @@ import
* General interface describing a managed Device
public interface Device : Closeable {
public interface Device : Closeable, ContextAware {
* List of supported property descriptors
@ -27,7 +28,7 @@ public interface Device : Closeable {
public val actionDescriptors: Collection<ActionDescriptor>
* The scope encompassing all operations on a device. When canceled, cancels all running processes
* The supervisor scope encompassing all operations on a device. When canceled, cancels all running processes
public val scope: CoroutineScope

@ -1,5 +1,6 @@
package hep.dataforge.control.base
import hep.dataforge.context.Context
import hep.dataforge.control.api.ActionDescriptor
import hep.dataforge.control.api.Device
import hep.dataforge.control.api.DeviceListener
@ -14,7 +15,8 @@ import kotlinx.coroutines.sync.withLock
* Baseline implementation of [Device] interface
public abstract class DeviceBase : Device {
public abstract class DeviceBase(override val context: Context) : Device {
private val _properties = HashMap<String, ReadOnlyDeviceProperty>()
public val properties: Map<String, ReadOnlyDeviceProperty> get() = _properties
private val _actions = HashMap<String, Action>()

@ -31,29 +31,29 @@ private class ActionProvider<D : DeviceBase>(
public fun DeviceBase.requesting(
descriptorBuilder: ActionDescriptor.() -> Unit = {},
block: suspend (MetaItem<*>?) -> MetaItem<*>?,
): PropertyDelegateProvider<DeviceBase, ActionDelegate> = ActionProvider(this, descriptorBuilder, block)
action: suspend (MetaItem<*>?) -> MetaItem<*>?,
): PropertyDelegateProvider<DeviceBase, ActionDelegate> = ActionProvider(this, descriptorBuilder, action)
public fun <D : DeviceBase> D.requestingValue(
descriptorBuilder: ActionDescriptor.() -> Unit = {},
block: suspend (MetaItem<*>?) -> Any?,
action: suspend (MetaItem<*>?) -> Any?,
): PropertyDelegateProvider<D, ActionDelegate> = ActionProvider(this, descriptorBuilder) {
val res = block(it)
val res = action(it)
public fun <D : DeviceBase> D.requestingMeta(
descriptorBuilder: ActionDescriptor.() -> Unit = {},
block: suspend MetaBuilder.(MetaItem<*>?) -> Unit,
action: suspend MetaBuilder.(MetaItem<*>?) -> Unit,
): PropertyDelegateProvider<D, ActionDelegate> = ActionProvider(this, descriptorBuilder) {
val res = MetaBuilder().apply { block(it) }
val res = MetaBuilder().apply { action(it) }
public fun DeviceBase.acting(
descriptorBuilder: ActionDescriptor.() -> Unit = {},
block: suspend (MetaItem<*>?) -> Unit,
action: suspend (MetaItem<*>?) -> Unit,
): PropertyDelegateProvider<DeviceBase, ActionDelegate> = ActionProvider(this, descriptorBuilder) {

@ -68,7 +68,7 @@ public fun DeviceBase.readingNumber(
public fun DeviceBase.readingString(
default: Number? = null,
default: String? = null,
descriptorBuilder: PropertyDescriptor.() -> Unit = {},
getter: suspend () -> String,
): PropertyDelegateProvider<DeviceBase, ReadOnlyPropertyDelegate> = ReadOnlyDevicePropertyProvider(
@ -81,6 +81,20 @@ public fun DeviceBase.readingString(
public fun DeviceBase.readingBoolean(
default: Boolean? = null,
descriptorBuilder: PropertyDescriptor.() -> Unit = {},
getter: suspend () -> Boolean,
): PropertyDelegateProvider<DeviceBase, ReadOnlyPropertyDelegate> = ReadOnlyDevicePropertyProvider(
default?.let { MetaItem.ValueItem(it.asValue()) },
getter = {
val boolean = getter()
public fun DeviceBase.readingMeta(
default: Meta? = null,
descriptorBuilder: PropertyDescriptor.() -> Unit = {},

@ -3,4 +3,4 @@ package hep.dataforge.control.base
import hep.dataforge.meta.MetaItem
import hep.dataforge.values.asValue
fun Double.asMetaItem(): MetaItem.ValueItem = MetaItem.ValueItem(asValue())
public fun Double.asMetaItem(): MetaItem.ValueItem = MetaItem.ValueItem(asValue())

@ -9,22 +9,21 @@ import
import hep.dataforge.meta.MetaItem
import hep.dataforge.meta.get
import hep.dataforge.meta.string
import hep.dataforge.meta.wrap
import hep.dataforge.meta.*
import hep.dataforge.names.Name
import hep.dataforge.names.toName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.launch
class DeviceController(
val device: Device,
val deviceTarget: String,
val scope: CoroutineScope = device.scope
public class DeviceController(
public val device: Device,
public val deviceTarget: String,
public val scope: CoroutineScope = device.scope
) : Responder, Consumer, DeviceListener {
init {
@ -33,7 +32,7 @@ class DeviceController(
private val outputChannel = Channel<Envelope>(Channel.CONFLATED)
suspend fun respondMessage(message: DeviceMessage): DeviceMessage = respondMessage(device, deviceTarget, message)
public suspend fun respondMessage(message: DeviceMessage): DeviceMessage = respondMessage(device, deviceTarget, message)
override suspend fun respond(request: Envelope): Envelope = respond(device, deviceTarget, request)
@ -54,8 +53,9 @@ class DeviceController(
fun output() = outputChannel.consumeAsFlow()
public fun recieving(): Flow<Envelope> = outputChannel.consumeAsFlow()
override fun consume(message: Envelope) {
// Fire the respond procedure and forget about the result
scope.launch {
@ -63,12 +63,12 @@ class DeviceController(
companion object {
const val GET_PROPERTY_ACTION = "read"
const val SET_PROPERTY_ACTION = "write"
const val EXECUTE_ACTION = "execute"
const val PROPERTY_LIST_ACTION = "propertyList"
const val ACTION_LIST_ACTION = "actionList"
public companion object {
public const val GET_PROPERTY_ACTION: String = "read"
public const val SET_PROPERTY_ACTION: String = "write"
public const val EXECUTE_ACTION: String = "execute"
public const val PROPERTY_LIST_ACTION: String = "propertyList"
public const val ACTION_LIST_ACTION: String = "actionList"
internal suspend fun respond(device: Device, deviceTarget: String, request: Envelope): Envelope {
val target = request.meta["target"].string
@ -166,7 +166,7 @@ class DeviceController(
suspend fun DeviceHub.respondMessage(request: DeviceMessage): DeviceMessage {
public suspend fun DeviceHub.respondMessage(request: DeviceMessage): DeviceMessage {
return try {
val targetName = ?: Name.EMPTY
val device = this[targetName] ?: error("The device with name $targetName not found in $this")

@ -11,7 +11,7 @@ import hep.dataforge.names.Name
import hep.dataforge.names.NameToken
import kotlin.reflect.KClass
class DeviceManager : AbstractPlugin(), DeviceHub {
public class DeviceManager : AbstractPlugin(), DeviceHub {
override val tag: PluginTag get() = Companion.tag
@ -20,17 +20,17 @@ class DeviceManager : AbstractPlugin(), DeviceHub {
private val top = HashMap<NameToken, Device>()
override val devices: Map<NameToken, Device> get() = top
val controller by lazy {
public val controller: HubController by lazy {
HubController(this, context)
fun registerDevice(name: NameToken, device: Device) {
public fun registerDevice(name: NameToken, device: Device) {
top[name] = device
override fun provideTop(target: String): Map<Name, Any> = super<DeviceHub>.provideTop(target)
companion object : PluginFactory<DeviceManager> {
public companion object : PluginFactory<DeviceManager> {
override val tag: PluginTag = PluginTag("devices", group = PluginTag.DATAFORGE_GROUP)
override val type: KClass<out DeviceManager> = DeviceManager::class
@ -39,5 +39,5 @@ class DeviceManager : AbstractPlugin(), DeviceHub {
val Context.devices: DeviceManager get() = plugins.fetch(DeviceManager)
public val Context.devices: DeviceManager get() = plugins.fetch(DeviceManager)

@ -3,19 +3,20 @@ package hep.dataforge.control.controllers
import hep.dataforge.control.controllers.DeviceController.Companion.GET_PROPERTY_ACTION
import hep.dataforge.meta.*
import hep.dataforge.names.Name
import hep.dataforge.names.asName
import kotlinx.serialization.KSerializer
import kotlinx.serialization.descriptors.SerialDescriptor
import kotlinx.serialization.encoding.Decoder
import kotlinx.serialization.encoding.Encoder
class DeviceMessage : Scheme() {
var source by string(key = SOURCE_KEY)
var target by string(key = TARGET_KEY)
var type by string(default = GET_PROPERTY_ACTION, key = MESSAGE_TYPE_KEY)
var comment by string()
var status by string(RESPONSE_OK_STATUS)
var data: List<MessageData>
public class DeviceMessage : Scheme() {
public var source: String? by string(key = SOURCE_KEY)
public var target: String? by string(key = TARGET_KEY)
public var type: String by string(default = GET_PROPERTY_ACTION, key = MESSAGE_TYPE_KEY)
public var comment: String? by string()
public var status: String by string(RESPONSE_OK_STATUS)
public var data: List<MessageData>
get() = config.getIndexed(MESSAGE_DATA_KEY) { MessageData.wrap(it.node!!) }
set(value) {
config[MESSAGE_DATA_KEY] = { it.config }
@ -24,27 +25,27 @@ class DeviceMessage : Scheme() {
* Append a payload to this message according to the given scheme
fun <T : Configurable> append(spec: Specification<T>, block: T.() -> Unit): T =
public fun <T : Configurable> append(spec: Specification<T>, block: T.() -> Unit): T =
spec.invoke(block).also { config.append(MESSAGE_DATA_KEY, it) }
companion object : SchemeSpec<DeviceMessage>(::DeviceMessage), KSerializer<DeviceMessage> {
val SOURCE_KEY = "source".asName()
val TARGET_KEY = "target".asName()
val MESSAGE_TYPE_KEY = "type".asName()
val MESSAGE_DATA_KEY = "data".asName()
public companion object : SchemeSpec<DeviceMessage>(::DeviceMessage), KSerializer<DeviceMessage> {
public val SOURCE_KEY: Name = "source".asName()
public val TARGET_KEY: Name = "target".asName()
public val MESSAGE_TYPE_KEY: Name = "type".asName()
public val MESSAGE_DATA_KEY: Name = "data".asName()
const val RESPONSE_OK_STATUS = "response.OK"
const val RESPONSE_FAIL_STATUS = "response.FAIL"
const val PROPERTY_CHANGED_ACTION = "event.propertyChange"
public const val RESPONSE_OK_STATUS: String = "response.OK"
public const val RESPONSE_FAIL_STATUS: String = "response.FAIL"
public const val PROPERTY_CHANGED_ACTION: String = "event.propertyChange"
inline fun ok(
public inline fun ok(
request: DeviceMessage? = null,
block: DeviceMessage.() -> Unit = {}
): DeviceMessage = DeviceMessage {
target = request?.source
inline fun fail(
public inline fun fail(
request: DeviceMessage? = null,
block: DeviceMessage.() -> Unit = {}
): DeviceMessage = DeviceMessage {
@ -65,16 +66,16 @@ class DeviceMessage : Scheme() {
class MessageData : Scheme() {
var name by string { error("Property name could not be empty") }
var value by item(key = DATA_VALUE_KEY)
public class MessageData : Scheme() {
public var name: String by string { error("Property name could not be empty") }
public var value: MetaItem<*>? by item(key = DATA_VALUE_KEY)
companion object : SchemeSpec<MessageData>(::MessageData) {
val DATA_VALUE_KEY = "value".asName()
public companion object : SchemeSpec<MessageData>(::MessageData) {
public val DATA_VALUE_KEY: Name = "value".asName()
fun MessageData.() -> Unit): MessageData = append(MessageData, block)
public fun MessageData.() -> Unit): MessageData = append(MessageData, block)
fun DeviceMessage.wrap() = SimpleEnvelope(this.config, null)
public fun DeviceMessage.wrap(): SimpleEnvelope = SimpleEnvelope(this.config, null)

@ -6,31 +6,31 @@ import hep.dataforge.control.api.get
import hep.dataforge.meta.MetaItem
import hep.dataforge.meta.get
import hep.dataforge.meta.string
import hep.dataforge.meta.wrap
import hep.dataforge.meta.*
import hep.dataforge.names.Name
import hep.dataforge.names.NameToken
import hep.dataforge.names.toName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
class HubController(
val hub: DeviceHub,
val scope: CoroutineScope
public class HubController(
public val hub: DeviceHub,
public val scope: CoroutineScope
) : Consumer, Responder {
private val messageOutbox = Channel<DeviceMessage>(Channel.CONFLATED)
private val envelopeOutbox = Channel<Envelope>(Channel.CONFLATED)
fun messageOutput() = messageOutbox.consumeAsFlow()
public fun messageOutput(): Flow<DeviceMessage> = messageOutbox.consumeAsFlow()
fun envelopeOutput() = envelopeOutbox.consumeAsFlow()
public fun envelopeOutput(): Flow<Envelope> = envelopeOutbox.consumeAsFlow()
private val packJob = scope.launch {
while (isActive) {
@ -61,7 +61,7 @@ class HubController(
suspend fun respondMessage(message: DeviceMessage): DeviceMessage = try {
public suspend fun respondMessage(message: DeviceMessage): DeviceMessage = try {
val targetName = ?: Name.EMPTY
val device = hub[targetName] ?: error("The device with name $targetName not found in $hub")
DeviceController.respondMessage(device, targetName.toString(), message)

@ -9,9 +9,8 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.launch
suspend fun Device.flowValues(): Flow<Pair<String, MetaItem<*>>> = callbackFlow {
public suspend fun Device.flowValues(): Flow<Pair<String, MetaItem<*>>> = callbackFlow {
val listener = object : DeviceListener {
override fun propertyChanged(propertyName: String, value: MetaItem<*>?) {
if (value != null) {

@ -2,28 +2,32 @@ package hep.dataforge.control.controllers
import hep.dataforge.control.base.DeviceProperty
import hep.dataforge.control.base.ReadOnlyDeviceProperty
import hep.dataforge.meta.MetaItem
import hep.dataforge.control.base.asMetaItem
import hep.dataforge.meta.*
import hep.dataforge.meta.transformations.MetaConverter
import hep.dataforge.values.Null
import hep.dataforge.values.double
import kotlin.reflect.KProperty
import kotlin.time.Duration
import kotlin.time.DurationUnit
import kotlin.time.toDuration
operator fun ReadOnlyDeviceProperty.getValue(thisRef: Any?, property: KProperty<*>): MetaItem<*> =
public operator fun ReadOnlyDeviceProperty.getValue(thisRef: Any?, property: KProperty<*>): MetaItem<*> =
value ?: MetaItem.ValueItem(Null)
operator fun DeviceProperty.setValue(thisRef: Any?, property: KProperty<*>, value: MetaItem<*>) {
public operator fun DeviceProperty.setValue(thisRef: Any?, property: KProperty<*>, value: MetaItem<*>) {
this.value = value
fun <T : Any> ReadOnlyDeviceProperty.convert(metaConverter: MetaConverter<T>): ReadOnlyProperty<Any?, T> {
public fun <T : Any> ReadOnlyDeviceProperty.convert(metaConverter: MetaConverter<T>): ReadOnlyProperty<Any?, T> {
return ReadOnlyProperty { thisRef, property ->
getValue(thisRef, property).let { metaConverter.itemToObject(it) }
fun <T : Any> DeviceProperty.convert(metaConverter: MetaConverter<T>): ReadWriteProperty<Any?, T> {
public fun <T : Any> DeviceProperty.convert(metaConverter: MetaConverter<T>): ReadWriteProperty<Any?, T> {
return object : ReadWriteProperty<Any?, T> {
override fun getValue(thisRef: Any?, property: KProperty<*>): T {
return this@convert.getValue(thisRef, property).let { metaConverter.itemToObject(it) }
@ -35,14 +39,30 @@ fun <T : Any> DeviceProperty.convert(metaConverter: MetaConverter<T>): ReadWrite
fun ReadOnlyDeviceProperty.double() = convert(MetaConverter.double)
fun DeviceProperty.double() = convert(MetaConverter.double)
public fun ReadOnlyDeviceProperty.double(): ReadOnlyProperty<Any?, Double> = convert(MetaConverter.double)
public fun DeviceProperty.double(): ReadWriteProperty<Any?, Double> = convert(MetaConverter.double)
fun = convert(
fun = convert(
public fun ReadOnlyProperty<Any?, Int> = convert(
public fun ReadWriteProperty<Any?, Int> = convert(
fun ReadOnlyDeviceProperty.string() = convert(MetaConverter.string)
fun DeviceProperty.string() = convert(MetaConverter.string)
public fun ReadOnlyDeviceProperty.string(): ReadOnlyProperty<Any?, String> = convert(MetaConverter.string)
public fun DeviceProperty.string(): ReadWriteProperty<Any?, String> = convert(MetaConverter.string)
fun ReadOnlyDeviceProperty.duration(): ReadOnlyProperty<Any?, Duration> = TODO()
fun DeviceProperty.duration(): ReadWriteProperty<Any?, Duration> = TODO()
//TODO to be moved to DF
private object DurationConverter : MetaConverter<Duration> {
override fun itemToObject(item: MetaItem<*>): Duration = when (item) {
is MetaItem.NodeItem -> {
val unit: DurationUnit = item.node["unit"].enum<DurationUnit>() ?: DurationUnit.SECONDS
val value = item.node[Meta.VALUE_KEY].double ?: error("No value present for Duration")
is MetaItem.ValueItem -> item.value.double.toDuration(DurationUnit.SECONDS)
override fun objectToMetaItem(obj: Duration): MetaItem<*> = obj.toDouble(DurationUnit.SECONDS).asMetaItem()
public val MetaConverter.Companion.duration: MetaConverter<Duration> get() = DurationConverter
public fun ReadOnlyDeviceProperty.duration(): ReadOnlyProperty<Any?, Duration> = convert(DurationConverter)
public fun DeviceProperty.duration(): ReadWriteProperty<Any?, Duration> = convert(DurationConverter)

@ -1,26 +1,26 @@
package hep.dataforge.control.ports
import hep.dataforge.context.Context
import hep.dataforge.context.ContextAware
import hep.dataforge.context.Factory
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.receiveAsFlow
import mu.KLogger
import mu.KotlinLogging
import kotlin.coroutines.CoroutineContext
interface Port: Closeable {
suspend fun send(data: ByteArray)
suspend fun receiving(): Flow<ByteArray>
fun isOpen(): Boolean
public interface Port: Closeable, ContextAware {
public suspend fun send(data: ByteArray)
public suspend fun receiving(): Flow<ByteArray>
public fun isOpen(): Boolean
public typealias PortFactory = Factory<Port>
abstract class AbstractPort(parentContext: CoroutineContext) : Port {
public abstract class AbstractPort(override val context: Context, parentContext: CoroutineContext = context.coroutineContext) : Port {
protected val scope = CoroutineScope(SupervisorJob(parentContext[Job]))
protected val logger: KLogger by lazy { KotlinLogging.logger(toString()) }
protected val scope: CoroutineScope = CoroutineScope(parentContext + SupervisorJob(parentContext[Job]))
private val outgoing = Channel<ByteArray>(100)
private val incoming = Channel<ByteArray>(Channel.CONFLATED)
@ -87,4 +87,4 @@ abstract class AbstractPort(parentContext: CoroutineContext) : Port {
* Send UTF-8 encoded string
suspend fun Port.send(string: String) = send(string.encodeToByteArray())
public suspend fun Port.send(string: String): Unit = send(string.encodeToByteArray())

@ -1,36 +1,70 @@
package hep.dataforge.control.ports
import hep.dataforge.context.Context
import hep.dataforge.context.ContextAware
import hep.dataforge.context.Global
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.isActive
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
class PortProxy(val factory: suspend () -> Port) : Port {
* A port that could be closed multiple times and opens automatically on request
public class PortProxy(override val context: Context = Global, public val factory: suspend () -> Port) : Port, ContextAware {
private var actualPort: Port? = null
private val mutex = Mutex()
private val mutex: Mutex = Mutex()
suspend fun port(): Port{
private suspend fun port(): Port {
return mutex.withLock {
if(actualPort?.isOpen() == true){
if (actualPort?.isOpen() == true) {
} else {
factory().also {
actualPort = it
* Ensure that the port is open. If it is already open, does nothing. Otherwise, open a new port.
public suspend fun open() {
port()//ignore result
override suspend fun send(data: ByteArray) {
override suspend fun receiving(): Flow<ByteArray> = port().receiving()
override suspend fun receiving(): Flow<ByteArray> = channelFlow {
while (isActive) {
try {
//recreate port and Flow on cancel
port().receiving().collect {
} catch (t: Throwable) {
logger.warn(t){"Port read failed. Reconnecting."}
// if (t is CancellationException) {
// cancel(t)
// }
}// port().receiving()
// open by default
override fun isOpen(): Boolean = true
override fun close() {
actualPort = null

@ -10,13 +10,13 @@ import kotlinx.coroutines.sync.withLock
* The handler does not guarantee exclusive access to the port so the user mush ensure that no other controller handles port at the moment.
class SynchronousPortHandler(val port: Port) {
public class SynchronousPortHandler(public val port: Port) {
private val mutex = Mutex()
* Send a single message and wait for the flow of respond messages.
suspend fun <R> respond(data: ByteArray, transform: suspend Flow<ByteArray>.() -> R): R {
public suspend fun <R> respond(data: ByteArray, transform: suspend Flow<ByteArray>.() -> R): R {
return mutex.withLock {
@ -27,7 +27,7 @@ class SynchronousPortHandler(val port: Port) {
* Send request and read incoming data blocks until the delimiter is encountered
suspend fun SynchronousPortHandler.respondWithDelimiter(data: ByteArray, delimiter: ByteArray): ByteArray {
public suspend fun SynchronousPortHandler.respondWithDelimiter(data: ByteArray, delimiter: ByteArray): ByteArray {
return respond(data) {

@ -1,9 +1,15 @@
package hep.dataforge.control.ports
import hep.dataforge.context.Context
import hep.dataforge.meta.Meta
import hep.dataforge.meta.get
import hep.dataforge.meta.string
import io.ktor.util.KtorExperimentalAPI
import kotlinx.coroutines.Dispatchers
@ -12,16 +18,17 @@ import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
class KtorTcpPort internal constructor(
parentContext: CoroutineContext,
val host: String,
val port: Int
) : AbstractPort(parentContext), AutoCloseable {
public class KtorTcpPort internal constructor(
context: Context,
public val host: String,
public val port: Int,
parentContext: CoroutineContext = context.coroutineContext,
) : AbstractPort(context, parentContext), AutoCloseable {
override fun toString() = "port[tcp:$host:$port]"
override fun toString(): String = "port[tcp:$host:$port]"
private val futureSocket = scope.async {
aSocket(ActorSelectorManager(Dispatchers.IO)).tcp().connect(InetSocketAddress(host, port))
@ -50,9 +57,20 @@ class KtorTcpPort internal constructor(
companion object {
suspend fun open(host: String, port: Int): KtorTcpPort {
return KtorTcpPort(coroutineContext, host, port)
public companion object: PortFactory {
public fun open(
context: Context,
host: String,
port: Int,
coroutineContext: CoroutineContext = context.coroutineContext,
): KtorTcpPort {
return KtorTcpPort(context, host, port, coroutineContext)
override fun invoke(meta: Meta, context: Context): Port {
val host = meta["host"].string ?: "localhost"
val port = meta["port"].int ?: error("Port value for TCP port is not defined in $meta")
return open(context, host, port)

@ -1,11 +1,15 @@
package hep.dataforge.control.ports
import hep.dataforge.context.Context
import hep.dataforge.meta.Meta
import hep.dataforge.meta.get
import hep.dataforge.meta.string
import kotlinx.coroutines.*
import java.nio.ByteBuffer
import java.nio.channels.SocketChannel
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
internal fun ByteBuffer.readArray(limit: Int = limit()): ByteArray {
@ -15,11 +19,12 @@ internal fun ByteBuffer.readArray(limit: Int = limit()): ByteArray {
return response
class TcpPort private constructor(
parentContext: CoroutineContext,
val host: String,
val port: Int
) : AbstractPort(parentContext), AutoCloseable {
public class TcpPort private constructor(
context: Context,
public val host: String,
public val port: Int,
parentContext: CoroutineContext = context.coroutineContext,
) : AbstractPort(context, parentContext), AutoCloseable {
override fun toString(): String = "port[tcp:$host:$port]"
@ -32,7 +37,7 @@ class TcpPort private constructor(
* A handler to await port connection
val startJob: Job get() = futureChannel
public val startJob: Job get() = futureChannel
private val listenerJob = this.scope.launch {
val channel = futureChannel.await()
@ -61,9 +66,20 @@ class TcpPort private constructor(
companion object{
suspend fun open(host: String, port: Int): TcpPort{
return TcpPort(coroutineContext, host, port)
public companion object : PortFactory {
public fun open(
context: Context,
host: String,
port: Int,
coroutineContext: CoroutineContext = context.coroutineContext,
): TcpPort {
return TcpPort(context, host, port, coroutineContext)
override fun invoke(meta: Meta, context: Context): Port {
val host = meta["host"].string ?: "localhost"
val port = meta["port"].int ?: error("Port value for TCP port is not defined in $meta")
return open(context, host, port)

@ -1,5 +1,6 @@
package hep.dataforge.control.ports
import hep.dataforge.context.Global
@ -53,9 +54,9 @@ class TcpPortTest {
fun testWithEchoServer() {
try {
runBlocking {
val server = launchEchoServer(22188)
val port ="localhost", 22188)
val port =, "localhost", 22188)
val logJob = launch {
port.receiving().collect {
@ -78,9 +79,9 @@ class TcpPortTest {
fun testKtorWithEchoServer() {
try {
runBlocking {
val server = launchEchoServer(22188)
val port ="localhost", 22188)
val port =,"localhost", 22188)
val logJob = launch {
port.receiving().collect {

@ -1,17 +1,25 @@
package hep.dataforge.control.serial
import hep.dataforge.context.Context
import hep.dataforge.control.ports.AbstractPort
import hep.dataforge.control.ports.Port
import hep.dataforge.control.ports.PortFactory
import hep.dataforge.meta.Meta
import hep.dataforge.meta.string
import jssc.SerialPort.*
import jssc.SerialPortEventListener
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
import jssc.SerialPort as JSSCPort
* COM/USB port
public class SerialPort private constructor(parentContext: CoroutineContext, private val jssc: JSSCPort) :
AbstractPort(parentContext) {
public class SerialPort private constructor(
context: Context,
private val jssc: JSSCPort,
parentContext: CoroutineContext = context.coroutineContext,
) : AbstractPort(context, parentContext) {
override fun toString(): String = "port[${jssc.portName}]"
@ -48,23 +56,34 @@ public class SerialPort private constructor(parentContext: CoroutineContext, pri
public companion object {
public companion object : PortFactory {
* Construct ComPort with given parameters
public suspend fun open(
public fun open(
context: Context,
portName: String,
baudRate: Int = BAUDRATE_9600,
dataBits: Int = DATABITS_8,
stopBits: Int = STOPBITS_1,
parity: Int = PARITY_NONE,
coroutineContext: CoroutineContext = context.coroutineContext,
): SerialPort {
val jssc = JSSCPort(portName).apply {
setParams(baudRate, dataBits, stopBits, parity)
return SerialPort(coroutineContext, jssc)
return SerialPort(context, jssc, coroutineContext)
override fun invoke(meta: Meta, context: Context): Port {
val name by meta.string { error("Serial port name not defined") }
val baudRate by
val dataBits by
val stopBits by
val parity by
return open(context, name, baudRate, dataBits, stopBits, parity)

@ -17,12 +17,12 @@ import kotlin.time.ExperimentalTime
import kotlin.time.seconds
class DemoDevice(parentScope: CoroutineScope) : DeviceBase() {
class DemoDevice(context: Context) : DeviceBase(context) {
private val executor = Executors.newSingleThreadExecutor()
override val scope: CoroutineScope = CoroutineScope(
parentScope.coroutineContext + executor.asCoroutineDispatcher() + Job(parentScope.coroutineContext[Job])
context.coroutineContext + executor.asCoroutineDispatcher() + Job(context.coroutineContext[Job])
val timeScale: DeviceProperty by writingVirtual(5000.0.asValue())

@ -1,6 +1,8 @@
package ru.mipt.npm.devices.pimotionmaster
import hep.dataforge.context.Context
import hep.dataforge.control.api.DeviceHub
import hep.dataforge.control.api.PropertyDescriptor
import hep.dataforge.control.base.*
import hep.dataforge.control.controllers.duration
import hep.dataforge.control.ports.Port
@ -12,6 +14,7 @@ import hep.dataforge.names.NameToken
import hep.dataforge.values.Null
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.flow.toList
@ -22,13 +25,13 @@ import kotlin.time.Duration
public class PiMotionMasterDevice(
parentScope: CoroutineScope,
context: Context,
axes: List<String>,
private val portFactory: suspend (MetaItem<*>?) -> Port,
) : DeviceBase(), DeviceHub {
) : DeviceBase(context), DeviceHub {
override val scope: CoroutineScope = CoroutineScope(
parentScope.coroutineContext + Job(parentScope.coroutineContext[Job])
context.coroutineContext + SupervisorJob(context.coroutineContext[Job])
public val port: DeviceProperty by writingVirtual(Null) {
@ -92,39 +95,90 @@ public class PiMotionMasterDevice(
public inner class Axis(public val axisId: String) : DeviceBase() {
override val scope: CoroutineScope get() = this@PiMotionMasterDevice.scope
public val enabled: DeviceProperty by writingBoolean<Axis>(
getter = {
val eax = requestAndParse("EAX?", axisId)[axisId]?.toIntOrNull()
?: error("Malformed EAX response. Should include integer value for $axisId")
eax != 0
public val stop: Action by acting(
descriptorBuilder = {
info = "Stop all axis"
setter = { _, newValue ->
val value = if (newValue) {
action = { send("STP") }
public inner class Axis(public val axisId: String) : DeviceBase(context) {
override val scope: CoroutineScope get() = this@PiMotionMasterDevice.scope
private suspend fun readAxisBoolean(command: String): Boolean =
requestAndParse(command, axisId)[axisId]?.toIntOrNull()
?: error("Malformed $command response. Should include integer value for $axisId") != 0
private suspend fun writeAxisBoolean(command: String, value: Boolean): Boolean {
val boolean = if (value) {
} else {
send("EAX", axisId, value)
send(command, axisId, boolean)
return value
private fun axisBooleanProperty(command: String, descriptorBuilder: PropertyDescriptor.() -> Unit = {}) =
getter = { readAxisBoolean("$command?") },
setter = { _, newValue -> writeAxisBoolean(command, newValue) },
descriptorBuilder = descriptorBuilder
private fun axisNumberProperty(command: String, descriptorBuilder: PropertyDescriptor.() -> Unit = {}) =
getter = {
requestAndParse("$command?", axisId)[axisId]?.toDoubleOrNull()
?: error("Malformed $command response. Should include float value for $axisId")
setter = { _, newValue ->
send(command, axisId, newValue.toString())
descriptorBuilder = descriptorBuilder
public val enabled: DeviceProperty by axisBooleanProperty("EAX") {
info = "Motor enable state."
public val halt: Action by acting {
send("HLT", axisId)
public val targetPosition: DeviceProperty by writingDouble<Axis>(
getter = {
requestAndParse("MOV?", axisId)[axisId]?.toDoubleOrNull()
?: error("Malformed MOV response. Should include float value for $axisId")
public val targetPosition: DeviceProperty by axisNumberProperty("MOV") {
info = """
Sets a new absolute target position for the specified axis.
Servo mode must be switched on for the commanded axis prior to using this command (closed-loop operation).
public val onTarget: ReadOnlyDeviceProperty by readingBoolean(
descriptorBuilder = {
info = "Queries the on-target state of the specified axis."
setter = { _, newValue ->
send("MOV", axisId, newValue.toString())
getter = {
public val position: DeviceProperty by axisNumberProperty("POS") {
info = "The current axis position."
public val openLoopTarget: DeviceProperty by axisNumberProperty("OMA") {
info = "Position for open-loop operation."
public val closedLoop: DeviceProperty by axisBooleanProperty("SVO") {
info = "Servo closed loop mode"
public val velocity: DeviceProperty by axisNumberProperty("VEL") {
info = "Velocity value for closed-loop operation"
override val devices: Map<NameToken, Axis> = axes.associate { NameToken(it) to Axis(it) }

@ -1,6 +1,6 @@
pluginManagement {
val kotlinVersion = "1.4.0"
val toolsVersion = "0.6.0-dev-4"
val kotlinVersion = "1.4.10"
val toolsVersion = "0.6.0"
repositories {