diff --git a/controls-constructor/src/commonMain/kotlin/space/kscience/controls/constructor/DeviceState.kt b/controls-constructor/src/commonMain/kotlin/space/kscience/controls/constructor/DeviceState.kt index ed7027a..a9cff51 100644 --- a/controls-constructor/src/commonMain/kotlin/space/kscience/controls/constructor/DeviceState.kt +++ b/controls-constructor/src/commonMain/kotlin/space/kscience/controls/constructor/DeviceState.kt @@ -40,7 +40,7 @@ public interface MutableDeviceState : DeviceState { public var MutableDeviceState.valueAsMeta: Meta get() = converter.objectToMeta(value) set(arg) { - value = converter.metaToObject(arg) ?: error("Conversion for meta $arg to property type with $converter failed") + value = converter.metaToObject(arg) } /** diff --git a/controls-core/build.gradle.kts b/controls-core/build.gradle.kts index bbe32eb..859b067 100644 --- a/controls-core/build.gradle.kts +++ b/controls-core/build.gradle.kts @@ -20,10 +20,14 @@ kscience { json() } useContextReceivers() - dependencies { + commonMain { api("space.kscience:dataforge-io:$dataforgeVersion") api(spclibs.kotlinx.datetime) } + + jvmTest{ + implementation(spclibs.logback.classic) + } } diff --git a/controls-core/src/commonMain/kotlin/space/kscience/controls/ports/Port.kt b/controls-core/src/commonMain/kotlin/space/kscience/controls/ports/Port.kt index 83044ec..b17de92 100644 --- a/controls-core/src/commonMain/kotlin/space/kscience/controls/ports/Port.kt +++ b/controls-core/src/commonMain/kotlin/space/kscience/controls/ports/Port.kt @@ -38,7 +38,7 @@ public abstract class AbstractPort( protected val scope: CoroutineScope = CoroutineScope(coroutineContext + SupervisorJob(coroutineContext[Job])) private val outgoing = Channel(100) - private val incoming = Channel(Channel.CONFLATED) + private val incoming = Channel(100) init { scope.coroutineContext[Job]?.invokeOnCompletion { @@ -88,7 +88,6 @@ public abstract class AbstractPort( override fun close() { outgoing.close() incoming.close() - sendJob.cancel() scope.cancel() } diff --git a/controls-core/src/commonMain/kotlin/space/kscience/controls/ports/ioExtensions.kt b/controls-core/src/commonMain/kotlin/space/kscience/controls/ports/ioExtensions.kt new file mode 100644 index 0000000..dbe7256 --- /dev/null +++ b/controls-core/src/commonMain/kotlin/space/kscience/controls/ports/ioExtensions.kt @@ -0,0 +1,5 @@ +package space.kscience.controls.ports + +import space.kscience.dataforge.io.Binary + +public fun Binary.readShort(position: Int): Short = read(position) { readShort() } \ No newline at end of file diff --git a/controls-core/src/commonMain/kotlin/space/kscience/controls/ports/phrases.kt b/controls-core/src/commonMain/kotlin/space/kscience/controls/ports/phrases.kt index 0f2b9bd..732e5d4 100644 --- a/controls-core/src/commonMain/kotlin/space/kscience/controls/ports/phrases.kt +++ b/controls-core/src/commonMain/kotlin/space/kscience/controls/ports/phrases.kt @@ -5,6 +5,7 @@ import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.onCompletion import kotlinx.coroutines.flow.transform import kotlinx.io.Buffer +import kotlinx.io.readByteArray /** * Transform byte fragments into complete phrases using given delimiter. Not thread safe. @@ -27,9 +28,8 @@ public fun Flow.withDelimiter(delimiter: ByteArray): Flow matcherPosition++ if (matcherPosition == delimiter.size) { //full match achieved, sending result - val bytes = output.build() - emit(bytes.readBytes()) - output.reset() + emit(output.readByteArray()) + output.clear() matcherPosition = 0 } } else if (matcherPosition > 0) { diff --git a/controls-core/src/jvmMain/kotlin/space/kscience/controls/ports/ChannelPort.kt b/controls-core/src/jvmMain/kotlin/space/kscience/controls/ports/ChannelPort.kt index d7983f2..1e187b6 100644 --- a/controls-core/src/jvmMain/kotlin/space/kscience/controls/ports/ChannelPort.kt +++ b/controls-core/src/jvmMain/kotlin/space/kscience/controls/ports/ChannelPort.kt @@ -8,6 +8,7 @@ import space.kscience.dataforge.context.logger import space.kscience.dataforge.meta.* import java.net.InetSocketAddress import java.nio.ByteBuffer +import java.nio.channels.AsynchronousCloseException import java.nio.channels.ByteChannel import java.nio.channels.DatagramChannel import java.nio.channels.SocketChannel @@ -30,7 +31,7 @@ public class ChannelPort( channelBuilder: suspend () -> ByteChannel, ) : AbstractPort(context, coroutineContext), AutoCloseable { - private val futureChannel: Deferred = this.scope.async(Dispatchers.IO) { + private val futureChannel: Deferred = scope.async(Dispatchers.IO) { channelBuilder() } @@ -39,10 +40,10 @@ public class ChannelPort( */ public val startJob: Job get() = futureChannel - private val listenerJob = this.scope.launch(Dispatchers.IO) { + private val listenerJob = scope.launch(Dispatchers.IO) { val channel = futureChannel.await() val buffer = ByteBuffer.allocate(1024) - while (isActive) { + while (isActive && channel.isOpen) { try { val num = channel.read(buffer) if (num > 0) { @@ -50,8 +51,12 @@ public class ChannelPort( } if (num < 0) cancel("The input channel is exhausted") } catch (ex: Exception) { - logger.error(ex) { "Channel read error" } - delay(1000) + if(ex is AsynchronousCloseException){ + logger.info { "Channel closed" } + } else { + logger.error(ex) { "Channel read error, retrying in 1 second" } + delay(1000) + } } } } @@ -62,11 +67,8 @@ public class ChannelPort( @OptIn(ExperimentalCoroutinesApi::class) override fun close() { - listenerJob.cancel() if (futureChannel.isCompleted) { futureChannel.getCompleted().close() - } else { - futureChannel.cancel() } super.close() } diff --git a/controls-core/src/jvmTest/kotlin/space/kscience/controls/ports/PortIOTest.kt b/controls-core/src/jvmTest/kotlin/space/kscience/controls/ports/PortIOTest.kt index bdf6891..3f92500 100644 --- a/controls-core/src/jvmTest/kotlin/space/kscience/controls/ports/PortIOTest.kt +++ b/controls-core/src/jvmTest/kotlin/space/kscience/controls/ports/PortIOTest.kt @@ -1,25 +1,46 @@ package space.kscience.controls.ports -import kotlinx.coroutines.flow.flowOf -import kotlinx.coroutines.flow.map -import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.flow.* import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.test.runTest import org.junit.jupiter.api.Test +import space.kscience.dataforge.context.Global import kotlin.test.assertEquals -internal class PortIOTest{ +internal class PortIOTest { @Test - fun testDelimiteredByteArrayFlow(){ - val flow = flowOf("bb?b","ddd?",":defgb?:ddf","34fb?:--").map { it.encodeToByteArray() } + fun testDelimiteredByteArrayFlow() { + val flow = flowOf("bb?b", "ddd?", ":defgb?:ddf", "34fb?:--").map { it.encodeToByteArray() } val chunked = flow.withDelimiter("?:".encodeToByteArray()) runBlocking { val result = chunked.toList() assertEquals(3, result.size) - assertEquals("bb?bddd?:",result[0].decodeToString()) + assertEquals("bb?bddd?:", result[0].decodeToString()) assertEquals("defgb?:", result[1].decodeToString()) assertEquals("ddf34fb?:", result[2].decodeToString()) } } + + @Test + fun testUdpCommunication() = runTest { + val receiver = UdpPort.open(Global, "localhost", 8811, localPort = 8812) + val sender = UdpPort.open(Global, "localhost", 8812, localPort = 8811) + + repeat(10) { + sender.send("Line number $it\n") + } + + val res = receiver + .receiving() + .onEach { println("ARRAY: ${it.decodeToString()}") } + .withStringDelimiter("\n") + .onEach { println("LINE: $it") } + .take(10).toList() + + assertEquals("Line number 3", res[3].trim()) + receiver.close() + sender.close() + } } \ No newline at end of file diff --git a/controls-modbus/src/main/kotlin/space/kscience/controls/modbus/DeviceProcessImage.kt b/controls-modbus/src/main/kotlin/space/kscience/controls/modbus/DeviceProcessImage.kt index 8f4a2b4..3b3f91b 100644 --- a/controls-modbus/src/main/kotlin/space/kscience/controls/modbus/DeviceProcessImage.kt +++ b/controls-modbus/src/main/kotlin/space/kscience/controls/modbus/DeviceProcessImage.kt @@ -1,12 +1,14 @@ package space.kscience.controls.modbus import com.ghgande.j2mod.modbus.procimg.* -import io.ktor.utils.io.core.* import kotlinx.coroutines.delay import kotlinx.coroutines.isActive import kotlinx.coroutines.launch +import kotlinx.io.Buffer import space.kscience.controls.api.Device +import space.kscience.controls.ports.readShort import space.kscience.controls.spec.* +import space.kscience.dataforge.io.Binary public class DeviceProcessImageBuilder internal constructor( @@ -106,11 +108,11 @@ public class DeviceProcessImageBuilder internal constructor( } device.useProperty(propertySpec) { value -> - val packet = buildPacket { - key.format.writeObject(this, value) - }.readByteBuffer() + val binary = Binary { + key.format.writeTo(this, value) + } registers.forEachIndexed { index, register -> - register.setValue(packet.getShort(index * 2)) + register.setValue(binary.readShort(index * 2)) } } } @@ -118,7 +120,7 @@ public class DeviceProcessImageBuilder internal constructor( /** * Trigger [block] if one of register changes. */ - private fun List.onChange(block: suspend (ByteReadPacket) -> Unit) { + private fun List.onChange(block: suspend (Buffer) -> Unit) { var ready = false forEach { register -> @@ -128,7 +130,7 @@ public class DeviceProcessImageBuilder internal constructor( } device.launch { - val builder = BytePacketBuilder() + val builder = Buffer() while (isActive) { delay(1) if (ready) { @@ -136,7 +138,7 @@ public class DeviceProcessImageBuilder internal constructor( forEach { value -> writeShort(value.toShort()) } - }.build() + } block(packet) ready = false } @@ -154,15 +156,15 @@ public class DeviceProcessImageBuilder internal constructor( } registers.onChange { packet -> - device.write(propertySpec, key.format.readObject(packet)) + device.write(propertySpec, key.format.readFrom(packet)) } device.useProperty(propertySpec) { value -> - val packet = buildPacket { - key.format.writeObject(this, value) - }.readByteBuffer() + val binary = Binary { + key.format.writeTo(this, value) + } registers.forEachIndexed { index, observableRegister -> - observableRegister.setValue(packet.getShort(index * 2)) + observableRegister.setValue(binary.readShort(index * 2)) } } } @@ -212,7 +214,7 @@ public class DeviceProcessImageBuilder internal constructor( registers.onChange { packet -> device.launch { - device.action(key.format.readObject(packet)) + device.action(key.format.readFrom(packet)) } } diff --git a/controls-modbus/src/main/kotlin/space/kscience/controls/modbus/ModbusDevice.kt b/controls-modbus/src/main/kotlin/space/kscience/controls/modbus/ModbusDevice.kt index 7297616..2585302 100644 --- a/controls-modbus/src/main/kotlin/space/kscience/controls/modbus/ModbusDevice.kt +++ b/controls-modbus/src/main/kotlin/space/kscience/controls/modbus/ModbusDevice.kt @@ -5,11 +5,10 @@ import com.ghgande.j2mod.modbus.procimg.InputRegister import com.ghgande.j2mod.modbus.procimg.Register import com.ghgande.j2mod.modbus.procimg.SimpleInputRegister import com.ghgande.j2mod.modbus.util.BitVector -import io.ktor.utils.io.core.ByteReadPacket -import io.ktor.utils.io.core.buildPacket -import io.ktor.utils.io.core.readByteBuffer -import io.ktor.utils.io.core.writeShort +import kotlinx.io.Buffer import space.kscience.controls.api.Device +import space.kscience.dataforge.io.Buffer +import space.kscience.dataforge.io.ByteArray import java.nio.ByteBuffer import kotlin.properties.ReadWriteProperty import kotlin.reflect.KProperty @@ -45,7 +44,7 @@ public interface ModbusDevice : Device { public operator fun ModbusRegistryKey.InputRange.getValue(thisRef: Any?, property: KProperty<*>): T { val packet = readInputRegistersToPacket(address, count) - return format.readObject(packet) + return format.readFrom(packet) } @@ -62,7 +61,7 @@ public interface ModbusDevice : Device { public operator fun ModbusRegistryKey.HoldingRange.getValue(thisRef: Any?, property: KProperty<*>): T { val packet = readHoldingRegistersToPacket(address, count) - return format.readObject(packet) + return format.readFrom(packet) } public operator fun ModbusRegistryKey.HoldingRange.setValue( @@ -70,9 +69,9 @@ public interface ModbusDevice : Device { property: KProperty<*>, value: T, ) { - val buffer = buildPacket { - format.writeObject(this, value) - }.readByteBuffer() + val buffer = ByteArray { + format.writeTo(this, value) + } writeHoldingRegisters(address, buffer) } @@ -122,7 +121,7 @@ private fun Array.toBuffer(): ByteBuffer { return buffer } -private fun Array.toPacket(): ByteReadPacket = buildPacket { +private fun Array.toPacket(): Buffer = Buffer { forEach { value -> writeShort(value.toShort()) } @@ -131,7 +130,7 @@ private fun Array.toPacket(): ByteReadPacket = buildPacket { public fun ModbusDevice.readInputRegistersToBuffer(address: Int, count: Int): ByteBuffer = master.readInputRegisters(unitId, address, count).toBuffer() -public fun ModbusDevice.readInputRegistersToPacket(address: Int, count: Int): ByteReadPacket = +public fun ModbusDevice.readInputRegistersToPacket(address: Int, count: Int): Buffer = master.readInputRegisters(unitId, address, count).toPacket() public fun ModbusDevice.readDoubleInput(address: Int): Double = @@ -151,7 +150,7 @@ public fun ModbusDevice.readHoldingRegisters(address: Int, count: Int): List = object : ReadWriteProperty { diff --git a/controls-opcua/src/main/kotlin/space/kscience/controls/opcua/client/MetaBsdParser.kt b/controls-opcua/src/main/kotlin/space/kscience/controls/opcua/client/MetaBsdParser.kt index b760184..27330bb 100644 --- a/controls-opcua/src/main/kotlin/space/kscience/controls/opcua/client/MetaBsdParser.kt +++ b/controls-opcua/src/main/kotlin/space/kscience/controls/opcua/client/MetaBsdParser.kt @@ -107,7 +107,7 @@ internal class MetaStructureCodec( override fun createStructure(name: String, members: LinkedHashMap): Meta = Meta { members.forEach { (property: String, value: Meta?) -> - setMeta(Name.parse(property), value) + set(Name.parse(property), value) } } diff --git a/controls-opcua/src/main/kotlin/space/kscience/controls/opcua/client/OpcUaDevice.kt b/controls-opcua/src/main/kotlin/space/kscience/controls/opcua/client/OpcUaDevice.kt index dd83e58..51e6032 100644 --- a/controls-opcua/src/main/kotlin/space/kscience/controls/opcua/client/OpcUaDevice.kt +++ b/controls-opcua/src/main/kotlin/space/kscience/controls/opcua/client/OpcUaDevice.kt @@ -43,7 +43,7 @@ public suspend inline fun OpcUaDevice.readOpcWithTime( else -> error("Incompatible OPC property value $content") } - val res: T = converter.metaToObject(meta) ?: error("Meta $meta could not be converted to ${T::class}") + val res: T = converter.metaToObject(meta) return res to time } diff --git a/controls-vision/src/jsMain/kotlin/ControlsVisionPlugin.js.kt b/controls-vision/src/jsMain/kotlin/ControlsVisionPlugin.js.kt index 0d928ac..b34cc44 100644 --- a/controls-vision/src/jsMain/kotlin/ControlsVisionPlugin.js.kt +++ b/controls-vision/src/jsMain/kotlin/ControlsVisionPlugin.js.kt @@ -9,6 +9,7 @@ import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.names.Name import space.kscience.visionforge.ElementVisionRenderer import space.kscience.visionforge.Vision +import space.kscience.visionforge.VisionClient import space.kscience.visionforge.VisionPlugin public actual class ControlVisionPlugin : VisionPlugin(), ElementVisionRenderer { @@ -20,7 +21,7 @@ public actual class ControlVisionPlugin : VisionPlugin(), ElementVisionRenderer TODO("Not yet implemented") } - override fun render(element: Element, name: Name, vision: Vision, meta: Meta) { + override fun render(element: Element, client: VisionClient, name: Name, vision: Vision, meta: Meta) { TODO("Not yet implemented") } diff --git a/demo/all-things/src/main/kotlin/space/kscience/controls/demo/DemoDevice.kt b/demo/all-things/src/main/kotlin/space/kscience/controls/demo/DemoDevice.kt index eaa1bff..067792c 100644 --- a/demo/all-things/src/main/kotlin/space/kscience/controls/demo/DemoDevice.kt +++ b/demo/all-things/src/main/kotlin/space/kscience/controls/demo/DemoDevice.kt @@ -42,7 +42,7 @@ class DemoDevice(context: Context, meta: Meta) : DeviceBySpec(Compa // register virtual properties based on actual object state val timeScale by mutableProperty(MetaConverter.double, IDemoDevice::timeScaleState) { metaDescriptor { - type(ValueType.NUMBER) + valueType(ValueType.NUMBER) } description = "Real to virtual time scale" } diff --git a/demo/car/src/main/kotlin/space/kscience/controls/demo/car/VirtualCar.kt b/demo/car/src/main/kotlin/space/kscience/controls/demo/car/VirtualCar.kt index 8ccb8a4..05bb2f0 100644 --- a/demo/car/src/main/kotlin/space/kscience/controls/demo/car/VirtualCar.kt +++ b/demo/car/src/main/kotlin/space/kscience/controls/demo/car/VirtualCar.kt @@ -17,6 +17,8 @@ import space.kscience.dataforge.meta.double import space.kscience.dataforge.meta.get import space.kscience.dataforge.meta.transformations.MetaConverter import kotlin.math.pow +import kotlin.reflect.KType +import kotlin.reflect.typeOf import kotlin.time.Duration import kotlin.time.Duration.Companion.milliseconds import kotlin.time.ExperimentalTime @@ -28,7 +30,10 @@ data class Vector2D(var x: Double = 0.0, var y: Double = 0.0) : MetaRepr { operator fun div(arg: Double): Vector2D = Vector2D(x / arg, y / arg) companion object CoordinatesMetaConverter : MetaConverter { - override fun metaToObject(meta: Meta): Vector2D = Vector2D( + + override val type: KType = typeOf() + + override fun metaToObjectOrNull(meta: Meta): Vector2D = Vector2D( meta["x"].double ?: 0.0, meta["y"].double ?: 0.0 ) @@ -40,7 +45,8 @@ data class Vector2D(var x: Double = 0.0, var y: Double = 0.0) : MetaRepr { } } -open class VirtualCar(context: Context, meta: Meta) : DeviceBySpec(IVirtualCar, context, meta), IVirtualCar { +open class VirtualCar(context: Context, meta: Meta) : DeviceBySpec(IVirtualCar, context, meta), + IVirtualCar { private val clock = context.clock private val timeScale = 1e-3 diff --git a/demo/mks-pdr900/src/main/kotlin/center/sciprog/devices/mks/NullableStringMetaConverter.kt b/demo/mks-pdr900/src/main/kotlin/center/sciprog/devices/mks/NullableStringMetaConverter.kt deleted file mode 100644 index 40c20ed..0000000 --- a/demo/mks-pdr900/src/main/kotlin/center/sciprog/devices/mks/NullableStringMetaConverter.kt +++ /dev/null @@ -1,10 +0,0 @@ -package center.sciprog.devices.mks - -import space.kscience.dataforge.meta.Meta -import space.kscience.dataforge.meta.string -import space.kscience.dataforge.meta.transformations.MetaConverter - -object NullableStringMetaConverter : MetaConverter { - override fun metaToObject(meta: Meta): String? = meta.string - override fun objectToMeta(obj: String?): Meta = Meta {} -} \ No newline at end of file diff --git a/magix/magix-api/build.gradle.kts b/magix/magix-api/build.gradle.kts index 159989d..68151c3 100644 --- a/magix/magix-api/build.gradle.kts +++ b/magix/magix-api/build.gradle.kts @@ -17,6 +17,10 @@ kscience { useSerialization{ json() } + + commonMain{ + implementation(spclibs.atomicfu) + } } readme{