Finish migration to kotlinx-io

This commit is contained in:
Alexander Nozik 2023-12-13 12:29:06 +03:00
parent cf129b6242
commit fb03fcc982
16 changed files with 104 additions and 64 deletions

View File

@ -40,7 +40,7 @@ public interface MutableDeviceState<T> : DeviceState<T> {
public var <T : Any> MutableDeviceState<T>.valueAsMeta: Meta public var <T : Any> MutableDeviceState<T>.valueAsMeta: Meta
get() = converter.objectToMeta(value) get() = converter.objectToMeta(value)
set(arg) { set(arg) {
value = converter.metaToObject(arg) ?: error("Conversion for meta $arg to property type with $converter failed") value = converter.metaToObject(arg)
} }
/** /**

View File

@ -20,10 +20,14 @@ kscience {
json() json()
} }
useContextReceivers() useContextReceivers()
dependencies { commonMain {
api("space.kscience:dataforge-io:$dataforgeVersion") api("space.kscience:dataforge-io:$dataforgeVersion")
api(spclibs.kotlinx.datetime) api(spclibs.kotlinx.datetime)
} }
jvmTest{
implementation(spclibs.logback.classic)
}
} }

View File

@ -38,7 +38,7 @@ public abstract class AbstractPort(
protected val scope: CoroutineScope = CoroutineScope(coroutineContext + SupervisorJob(coroutineContext[Job])) protected val scope: CoroutineScope = CoroutineScope(coroutineContext + SupervisorJob(coroutineContext[Job]))
private val outgoing = Channel<ByteArray>(100) private val outgoing = Channel<ByteArray>(100)
private val incoming = Channel<ByteArray>(Channel.CONFLATED) private val incoming = Channel<ByteArray>(100)
init { init {
scope.coroutineContext[Job]?.invokeOnCompletion { scope.coroutineContext[Job]?.invokeOnCompletion {
@ -88,7 +88,6 @@ public abstract class AbstractPort(
override fun close() { override fun close() {
outgoing.close() outgoing.close()
incoming.close() incoming.close()
sendJob.cancel()
scope.cancel() scope.cancel()
} }

View File

@ -0,0 +1,5 @@
package space.kscience.controls.ports
import space.kscience.dataforge.io.Binary
public fun Binary.readShort(position: Int): Short = read(position) { readShort() }

View File

@ -5,6 +5,7 @@ import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onCompletion import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.transform import kotlinx.coroutines.flow.transform
import kotlinx.io.Buffer import kotlinx.io.Buffer
import kotlinx.io.readByteArray
/** /**
* Transform byte fragments into complete phrases using given delimiter. Not thread safe. * Transform byte fragments into complete phrases using given delimiter. Not thread safe.
@ -27,9 +28,8 @@ public fun Flow<ByteArray>.withDelimiter(delimiter: ByteArray): Flow<ByteArray>
matcherPosition++ matcherPosition++
if (matcherPosition == delimiter.size) { if (matcherPosition == delimiter.size) {
//full match achieved, sending result //full match achieved, sending result
val bytes = output.build() emit(output.readByteArray())
emit(bytes.readBytes()) output.clear()
output.reset()
matcherPosition = 0 matcherPosition = 0
} }
} else if (matcherPosition > 0) { } else if (matcherPosition > 0) {

View File

@ -8,6 +8,7 @@ import space.kscience.dataforge.context.logger
import space.kscience.dataforge.meta.* import space.kscience.dataforge.meta.*
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.nio.ByteBuffer import java.nio.ByteBuffer
import java.nio.channels.AsynchronousCloseException
import java.nio.channels.ByteChannel import java.nio.channels.ByteChannel
import java.nio.channels.DatagramChannel import java.nio.channels.DatagramChannel
import java.nio.channels.SocketChannel import java.nio.channels.SocketChannel
@ -30,7 +31,7 @@ public class ChannelPort(
channelBuilder: suspend () -> ByteChannel, channelBuilder: suspend () -> ByteChannel,
) : AbstractPort(context, coroutineContext), AutoCloseable { ) : AbstractPort(context, coroutineContext), AutoCloseable {
private val futureChannel: Deferred<ByteChannel> = this.scope.async(Dispatchers.IO) { private val futureChannel: Deferred<ByteChannel> = scope.async(Dispatchers.IO) {
channelBuilder() channelBuilder()
} }
@ -39,10 +40,10 @@ public class ChannelPort(
*/ */
public val startJob: Job get() = futureChannel public val startJob: Job get() = futureChannel
private val listenerJob = this.scope.launch(Dispatchers.IO) { private val listenerJob = scope.launch(Dispatchers.IO) {
val channel = futureChannel.await() val channel = futureChannel.await()
val buffer = ByteBuffer.allocate(1024) val buffer = ByteBuffer.allocate(1024)
while (isActive) { while (isActive && channel.isOpen) {
try { try {
val num = channel.read(buffer) val num = channel.read(buffer)
if (num > 0) { if (num > 0) {
@ -50,8 +51,12 @@ public class ChannelPort(
} }
if (num < 0) cancel("The input channel is exhausted") if (num < 0) cancel("The input channel is exhausted")
} catch (ex: Exception) { } catch (ex: Exception) {
logger.error(ex) { "Channel read error" } if(ex is AsynchronousCloseException){
delay(1000) logger.info { "Channel closed" }
} else {
logger.error(ex) { "Channel read error, retrying in 1 second" }
delay(1000)
}
} }
} }
} }
@ -62,11 +67,8 @@ public class ChannelPort(
@OptIn(ExperimentalCoroutinesApi::class) @OptIn(ExperimentalCoroutinesApi::class)
override fun close() { override fun close() {
listenerJob.cancel()
if (futureChannel.isCompleted) { if (futureChannel.isCompleted) {
futureChannel.getCompleted().close() futureChannel.getCompleted().close()
} else {
futureChannel.cancel()
} }
super.close() super.close()
} }

View File

@ -1,25 +1,46 @@
package space.kscience.controls.ports package space.kscience.controls.ports
import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.flow.*
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import space.kscience.dataforge.context.Global
import kotlin.test.assertEquals import kotlin.test.assertEquals
internal class PortIOTest{ internal class PortIOTest {
@Test @Test
fun testDelimiteredByteArrayFlow(){ fun testDelimiteredByteArrayFlow() {
val flow = flowOf("bb?b","ddd?",":defgb?:ddf","34fb?:--").map { it.encodeToByteArray() } val flow = flowOf("bb?b", "ddd?", ":defgb?:ddf", "34fb?:--").map { it.encodeToByteArray() }
val chunked = flow.withDelimiter("?:".encodeToByteArray()) val chunked = flow.withDelimiter("?:".encodeToByteArray())
runBlocking { runBlocking {
val result = chunked.toList() val result = chunked.toList()
assertEquals(3, result.size) assertEquals(3, result.size)
assertEquals("bb?bddd?:",result[0].decodeToString()) assertEquals("bb?bddd?:", result[0].decodeToString())
assertEquals("defgb?:", result[1].decodeToString()) assertEquals("defgb?:", result[1].decodeToString())
assertEquals("ddf34fb?:", result[2].decodeToString()) assertEquals("ddf34fb?:", result[2].decodeToString())
} }
} }
@Test
fun testUdpCommunication() = runTest {
val receiver = UdpPort.open(Global, "localhost", 8811, localPort = 8812)
val sender = UdpPort.open(Global, "localhost", 8812, localPort = 8811)
repeat(10) {
sender.send("Line number $it\n")
}
val res = receiver
.receiving()
.onEach { println("ARRAY: ${it.decodeToString()}") }
.withStringDelimiter("\n")
.onEach { println("LINE: $it") }
.take(10).toList()
assertEquals("Line number 3", res[3].trim())
receiver.close()
sender.close()
}
} }

View File

@ -1,12 +1,14 @@
package space.kscience.controls.modbus package space.kscience.controls.modbus
import com.ghgande.j2mod.modbus.procimg.* import com.ghgande.j2mod.modbus.procimg.*
import io.ktor.utils.io.core.*
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.io.Buffer
import space.kscience.controls.api.Device import space.kscience.controls.api.Device
import space.kscience.controls.ports.readShort
import space.kscience.controls.spec.* import space.kscience.controls.spec.*
import space.kscience.dataforge.io.Binary
public class DeviceProcessImageBuilder<D : Device> internal constructor( public class DeviceProcessImageBuilder<D : Device> internal constructor(
@ -106,11 +108,11 @@ public class DeviceProcessImageBuilder<D : Device> internal constructor(
} }
device.useProperty(propertySpec) { value -> device.useProperty(propertySpec) { value ->
val packet = buildPacket { val binary = Binary {
key.format.writeObject(this, value) key.format.writeTo(this, value)
}.readByteBuffer() }
registers.forEachIndexed { index, register -> registers.forEachIndexed { index, register ->
register.setValue(packet.getShort(index * 2)) register.setValue(binary.readShort(index * 2))
} }
} }
} }
@ -118,7 +120,7 @@ public class DeviceProcessImageBuilder<D : Device> internal constructor(
/** /**
* Trigger [block] if one of register changes. * Trigger [block] if one of register changes.
*/ */
private fun List<ObservableRegister>.onChange(block: suspend (ByteReadPacket) -> Unit) { private fun List<ObservableRegister>.onChange(block: suspend (Buffer) -> Unit) {
var ready = false var ready = false
forEach { register -> forEach { register ->
@ -128,7 +130,7 @@ public class DeviceProcessImageBuilder<D : Device> internal constructor(
} }
device.launch { device.launch {
val builder = BytePacketBuilder() val builder = Buffer()
while (isActive) { while (isActive) {
delay(1) delay(1)
if (ready) { if (ready) {
@ -136,7 +138,7 @@ public class DeviceProcessImageBuilder<D : Device> internal constructor(
forEach { value -> forEach { value ->
writeShort(value.toShort()) writeShort(value.toShort())
} }
}.build() }
block(packet) block(packet)
ready = false ready = false
} }
@ -154,15 +156,15 @@ public class DeviceProcessImageBuilder<D : Device> internal constructor(
} }
registers.onChange { packet -> registers.onChange { packet ->
device.write(propertySpec, key.format.readObject(packet)) device.write(propertySpec, key.format.readFrom(packet))
} }
device.useProperty(propertySpec) { value -> device.useProperty(propertySpec) { value ->
val packet = buildPacket { val binary = Binary {
key.format.writeObject(this, value) key.format.writeTo(this, value)
}.readByteBuffer() }
registers.forEachIndexed { index, observableRegister -> registers.forEachIndexed { index, observableRegister ->
observableRegister.setValue(packet.getShort(index * 2)) observableRegister.setValue(binary.readShort(index * 2))
} }
} }
} }
@ -212,7 +214,7 @@ public class DeviceProcessImageBuilder<D : Device> internal constructor(
registers.onChange { packet -> registers.onChange { packet ->
device.launch { device.launch {
device.action(key.format.readObject(packet)) device.action(key.format.readFrom(packet))
} }
} }

View File

@ -5,11 +5,10 @@ import com.ghgande.j2mod.modbus.procimg.InputRegister
import com.ghgande.j2mod.modbus.procimg.Register import com.ghgande.j2mod.modbus.procimg.Register
import com.ghgande.j2mod.modbus.procimg.SimpleInputRegister import com.ghgande.j2mod.modbus.procimg.SimpleInputRegister
import com.ghgande.j2mod.modbus.util.BitVector import com.ghgande.j2mod.modbus.util.BitVector
import io.ktor.utils.io.core.ByteReadPacket import kotlinx.io.Buffer
import io.ktor.utils.io.core.buildPacket
import io.ktor.utils.io.core.readByteBuffer
import io.ktor.utils.io.core.writeShort
import space.kscience.controls.api.Device import space.kscience.controls.api.Device
import space.kscience.dataforge.io.Buffer
import space.kscience.dataforge.io.ByteArray
import java.nio.ByteBuffer import java.nio.ByteBuffer
import kotlin.properties.ReadWriteProperty import kotlin.properties.ReadWriteProperty
import kotlin.reflect.KProperty import kotlin.reflect.KProperty
@ -45,7 +44,7 @@ public interface ModbusDevice : Device {
public operator fun <T> ModbusRegistryKey.InputRange<T>.getValue(thisRef: Any?, property: KProperty<*>): T { public operator fun <T> ModbusRegistryKey.InputRange<T>.getValue(thisRef: Any?, property: KProperty<*>): T {
val packet = readInputRegistersToPacket(address, count) val packet = readInputRegistersToPacket(address, count)
return format.readObject(packet) return format.readFrom(packet)
} }
@ -62,7 +61,7 @@ public interface ModbusDevice : Device {
public operator fun <T> ModbusRegistryKey.HoldingRange<T>.getValue(thisRef: Any?, property: KProperty<*>): T { public operator fun <T> ModbusRegistryKey.HoldingRange<T>.getValue(thisRef: Any?, property: KProperty<*>): T {
val packet = readHoldingRegistersToPacket(address, count) val packet = readHoldingRegistersToPacket(address, count)
return format.readObject(packet) return format.readFrom(packet)
} }
public operator fun <T> ModbusRegistryKey.HoldingRange<T>.setValue( public operator fun <T> ModbusRegistryKey.HoldingRange<T>.setValue(
@ -70,9 +69,9 @@ public interface ModbusDevice : Device {
property: KProperty<*>, property: KProperty<*>,
value: T, value: T,
) { ) {
val buffer = buildPacket { val buffer = ByteArray {
format.writeObject(this, value) format.writeTo(this, value)
}.readByteBuffer() }
writeHoldingRegisters(address, buffer) writeHoldingRegisters(address, buffer)
} }
@ -122,7 +121,7 @@ private fun Array<out InputRegister>.toBuffer(): ByteBuffer {
return buffer return buffer
} }
private fun Array<out InputRegister>.toPacket(): ByteReadPacket = buildPacket { private fun Array<out InputRegister>.toPacket(): Buffer = Buffer {
forEach { value -> forEach { value ->
writeShort(value.toShort()) writeShort(value.toShort())
} }
@ -131,7 +130,7 @@ private fun Array<out InputRegister>.toPacket(): ByteReadPacket = buildPacket {
public fun ModbusDevice.readInputRegistersToBuffer(address: Int, count: Int): ByteBuffer = public fun ModbusDevice.readInputRegistersToBuffer(address: Int, count: Int): ByteBuffer =
master.readInputRegisters(unitId, address, count).toBuffer() master.readInputRegisters(unitId, address, count).toBuffer()
public fun ModbusDevice.readInputRegistersToPacket(address: Int, count: Int): ByteReadPacket = public fun ModbusDevice.readInputRegistersToPacket(address: Int, count: Int): Buffer =
master.readInputRegisters(unitId, address, count).toPacket() master.readInputRegisters(unitId, address, count).toPacket()
public fun ModbusDevice.readDoubleInput(address: Int): Double = public fun ModbusDevice.readDoubleInput(address: Int): Double =
@ -151,7 +150,7 @@ public fun ModbusDevice.readHoldingRegisters(address: Int, count: Int): List<Reg
public fun ModbusDevice.readHoldingRegistersToBuffer(address: Int, count: Int): ByteBuffer = public fun ModbusDevice.readHoldingRegistersToBuffer(address: Int, count: Int): ByteBuffer =
master.readMultipleRegisters(unitId, address, count).toBuffer() master.readMultipleRegisters(unitId, address, count).toBuffer()
public fun ModbusDevice.readHoldingRegistersToPacket(address: Int, count: Int): ByteReadPacket = public fun ModbusDevice.readHoldingRegistersToPacket(address: Int, count: Int): Buffer =
master.readMultipleRegisters(unitId, address, count).toPacket() master.readMultipleRegisters(unitId, address, count).toPacket()
public fun ModbusDevice.readDoubleRegister(address: Int): Double = public fun ModbusDevice.readDoubleRegister(address: Int): Double =
@ -183,6 +182,13 @@ public fun ModbusDevice.writeHoldingRegisters(address: Int, buffer: ByteBuffer):
return writeHoldingRegisters(address, array) return writeHoldingRegisters(address, array)
} }
public fun ModbusDevice.writeHoldingRegisters(address: Int, byteArray: ByteArray): Int {
val buffer = ByteBuffer.wrap(byteArray)
val array: ShortArray = ShortArray(buffer.limit().floorDiv(2)) { buffer.getShort(it * 2) }
return writeHoldingRegisters(address, array)
}
public fun ModbusDevice.modbusRegister( public fun ModbusDevice.modbusRegister(
address: Int, address: Int,
): ReadWriteProperty<ModbusDevice, Short> = object : ReadWriteProperty<ModbusDevice, Short> { ): ReadWriteProperty<ModbusDevice, Short> = object : ReadWriteProperty<ModbusDevice, Short> {

View File

@ -107,7 +107,7 @@ internal class MetaStructureCodec(
override fun createStructure(name: String, members: LinkedHashMap<String, Meta>): Meta = Meta { override fun createStructure(name: String, members: LinkedHashMap<String, Meta>): Meta = Meta {
members.forEach { (property: String, value: Meta?) -> members.forEach { (property: String, value: Meta?) ->
setMeta(Name.parse(property), value) set(Name.parse(property), value)
} }
} }

View File

@ -43,7 +43,7 @@ public suspend inline fun <reified T: Any> OpcUaDevice.readOpcWithTime(
else -> error("Incompatible OPC property value $content") else -> error("Incompatible OPC property value $content")
} }
val res: T = converter.metaToObject(meta) ?: error("Meta $meta could not be converted to ${T::class}") val res: T = converter.metaToObject(meta)
return res to time return res to time
} }

View File

@ -9,6 +9,7 @@ import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.Name
import space.kscience.visionforge.ElementVisionRenderer import space.kscience.visionforge.ElementVisionRenderer
import space.kscience.visionforge.Vision import space.kscience.visionforge.Vision
import space.kscience.visionforge.VisionClient
import space.kscience.visionforge.VisionPlugin import space.kscience.visionforge.VisionPlugin
public actual class ControlVisionPlugin : VisionPlugin(), ElementVisionRenderer { public actual class ControlVisionPlugin : VisionPlugin(), ElementVisionRenderer {
@ -20,7 +21,7 @@ public actual class ControlVisionPlugin : VisionPlugin(), ElementVisionRenderer
TODO("Not yet implemented") TODO("Not yet implemented")
} }
override fun render(element: Element, name: Name, vision: Vision, meta: Meta) { override fun render(element: Element, client: VisionClient, name: Name, vision: Vision, meta: Meta) {
TODO("Not yet implemented") TODO("Not yet implemented")
} }

View File

@ -42,7 +42,7 @@ class DemoDevice(context: Context, meta: Meta) : DeviceBySpec<IDemoDevice>(Compa
// register virtual properties based on actual object state // register virtual properties based on actual object state
val timeScale by mutableProperty(MetaConverter.double, IDemoDevice::timeScaleState) { val timeScale by mutableProperty(MetaConverter.double, IDemoDevice::timeScaleState) {
metaDescriptor { metaDescriptor {
type(ValueType.NUMBER) valueType(ValueType.NUMBER)
} }
description = "Real to virtual time scale" description = "Real to virtual time scale"
} }

View File

@ -17,6 +17,8 @@ import space.kscience.dataforge.meta.double
import space.kscience.dataforge.meta.get import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.transformations.MetaConverter import space.kscience.dataforge.meta.transformations.MetaConverter
import kotlin.math.pow import kotlin.math.pow
import kotlin.reflect.KType
import kotlin.reflect.typeOf
import kotlin.time.Duration import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.ExperimentalTime import kotlin.time.ExperimentalTime
@ -28,7 +30,10 @@ data class Vector2D(var x: Double = 0.0, var y: Double = 0.0) : MetaRepr {
operator fun div(arg: Double): Vector2D = Vector2D(x / arg, y / arg) operator fun div(arg: Double): Vector2D = Vector2D(x / arg, y / arg)
companion object CoordinatesMetaConverter : MetaConverter<Vector2D> { companion object CoordinatesMetaConverter : MetaConverter<Vector2D> {
override fun metaToObject(meta: Meta): Vector2D = Vector2D(
override val type: KType = typeOf<Vector2D>()
override fun metaToObjectOrNull(meta: Meta): Vector2D = Vector2D(
meta["x"].double ?: 0.0, meta["x"].double ?: 0.0,
meta["y"].double ?: 0.0 meta["y"].double ?: 0.0
) )
@ -40,7 +45,8 @@ data class Vector2D(var x: Double = 0.0, var y: Double = 0.0) : MetaRepr {
} }
} }
open class VirtualCar(context: Context, meta: Meta) : DeviceBySpec<VirtualCar>(IVirtualCar, context, meta), IVirtualCar { open class VirtualCar(context: Context, meta: Meta) : DeviceBySpec<VirtualCar>(IVirtualCar, context, meta),
IVirtualCar {
private val clock = context.clock private val clock = context.clock
private val timeScale = 1e-3 private val timeScale = 1e-3

View File

@ -1,10 +0,0 @@
package center.sciprog.devices.mks
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.string
import space.kscience.dataforge.meta.transformations.MetaConverter
object NullableStringMetaConverter : MetaConverter<String?> {
override fun metaToObject(meta: Meta): String? = meta.string
override fun objectToMeta(obj: String?): Meta = Meta {}
}

View File

@ -17,6 +17,10 @@ kscience {
useSerialization{ useSerialization{
json() json()
} }
commonMain{
implementation(spclibs.atomicfu)
}
} }
readme{ readme{