Update and finalize message design

This commit is contained in:
Alexander Nozik 2020-10-06 15:33:15 +03:00
parent 8f9bae6462
commit dbf0466c64
12 changed files with 206 additions and 181 deletions

View File

@ -0,0 +1,34 @@
package hep.dataforge.control.api
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
import kotlinx.io.Closeable
/**
* A generic bi-directional sender/receiver object
*/
public interface Socket<T> : Closeable {
/**
* Send an object to the socket
*/
public suspend fun send(data: T)
/**
* Flow of objects received from socket
*/
public fun receiving(): Flow<T>
public fun isOpen(): Boolean
}
/**
* Connect an input to this socket using designated [scope] for it and return a handler [Job].
* Multiple inputs could be connected to the same [Socket].
*/
public fun <T> Socket<T>.connectInput(scope: CoroutineScope, flow: Flow<T>): Job = scope.launch {
flow.collect { send(it) }
}

View File

@ -41,13 +41,11 @@ public class DeviceController(
if (value == null) return
scope.launch {
val change = DeviceMessage.ok {
this.source = deviceTarget
type = PROPERTY_CHANGED_ACTION
data {
name = propertyName
this.sourceName = deviceTarget
this.action = PROPERTY_CHANGED_ACTION
this.key = propertyName
this.value = value
}
}
val envelope = SimpleEnvelope(change.toMeta(), Binary.EMPTY)
outputChannel.send(envelope)
@ -98,62 +96,52 @@ public class DeviceController(
request: DeviceMessage,
): DeviceMessage {
return try {
val result: List<MessageData> = when (val action = request.type) {
DeviceMessage.ok {
targetName = request.sourceName
sourceName = deviceTarget
action ="response.${request.action}"
val requestKey = request.key
val requestValue = request.value
when (val action = request.action) {
GET_PROPERTY_ACTION -> {
request.data.map { property ->
MessageData {
name = property.name
value = device.getProperty(name)
}
}
key = requestKey
value = device.getProperty(requestKey ?: error("Key field is not defined in request"))
}
SET_PROPERTY_ACTION -> {
request.data.map { property ->
val propertyName: String = property.name
val propertyValue = property.value
if (propertyValue == null) {
device.invalidateProperty(propertyName)
require(requestKey != null) { "Key field is not defined in request" }
if (requestValue == null) {
device.invalidateProperty(requestKey)
} else {
device.setProperty(propertyName, propertyValue)
}
MessageData {
name = propertyName
value = device.getProperty(propertyName)
}
device.setProperty(requestKey, requestValue)
}
key = requestKey
value = device.getProperty(requestKey)
}
EXECUTE_ACTION -> {
request.data.map { payload ->
MessageData {
name = payload.name
value = device.execute(payload.name, payload.value)
}
}
require(requestKey != null) { "Key field is not defined in request" }
key = requestKey
value = device.execute(requestKey, requestValue)
}
PROPERTY_LIST_ACTION -> {
value = Meta {
device.propertyDescriptors.map { descriptor ->
MessageData {
name = descriptor.name
value = MetaItem.NodeItem(descriptor.config)
}
descriptor.name put descriptor.config
}
}.asMetaItem()
}
ACTION_LIST_ACTION -> {
value = Meta {
device.actionDescriptors.map { descriptor ->
MessageData {
name = descriptor.name
value = MetaItem.NodeItem(descriptor.config)
}
descriptor.name put descriptor.config
}
}.asMetaItem()
}
else -> {
error("Unrecognized action $action")
}
}
DeviceMessage.ok {
target = request.source
source = deviceTarget
data = result
}
} catch (ex: Exception) {
DeviceMessage.fail(request, cause = ex)
@ -165,7 +153,7 @@ public class DeviceController(
public suspend fun DeviceHub.respondMessage(request: DeviceMessage): DeviceMessage {
return try {
val targetName = request.target?.toName() ?: Name.EMPTY
val targetName = request.targetName?.toName() ?: Name.EMPTY
val device = this[targetName] ?: error("The device with name $targetName not found in $this")
DeviceController.respondMessage(device, targetName.toString(), request)
} catch (ex: Exception) {

View File

@ -1,6 +1,5 @@
package hep.dataforge.control.controllers
import hep.dataforge.control.controllers.DeviceController.Companion.GET_PROPERTY_ACTION
import hep.dataforge.io.SimpleEnvelope
import hep.dataforge.meta.*
import hep.dataforge.names.Name
@ -11,28 +10,20 @@ import kotlinx.serialization.encoding.Decoder
import kotlinx.serialization.encoding.Encoder
public class DeviceMessage : Scheme() {
public var source: String? by string(key = SOURCE_KEY)
public var target: String? by string(key = TARGET_KEY)
public var type: String by string(default = GET_PROPERTY_ACTION, key = MESSAGE_TYPE_KEY)
public var action: String by string { error("Action not defined") }
public var status: String by string(default = RESPONSE_OK_STATUS)
public var sourceName: String? by string()
public var targetName: String? by string()
public var comment: String? by string()
public var status: String by string(RESPONSE_OK_STATUS)
public var data: List<MessageData>
get() = config.getIndexed(MESSAGE_DATA_KEY).values.map { MessageData.wrap(it.node!!) }
set(value) {
config[MESSAGE_DATA_KEY] = value.map { it.config }
}
/**
* Append a payload to this message according to the given scheme
*/
public fun <T : Configurable> append(spec: Specification<T>, block: T.() -> Unit): T =
spec.invoke(block).also { config.append(MESSAGE_DATA_KEY, it) }
public var key: String? by string()
public var value: MetaItem<*>? by item()
public companion object : SchemeSpec<DeviceMessage>(::DeviceMessage), KSerializer<DeviceMessage> {
public val SOURCE_KEY: Name = "source".asName()
public val TARGET_KEY: Name = "target".asName()
public val MESSAGE_TYPE_KEY: Name = "type".asName()
public val MESSAGE_DATA_KEY: Name = "data".asName()
public val SOURCE_KEY: Name = DeviceMessage::sourceName.name.asName()
public val TARGET_KEY: Name = DeviceMessage::targetName.name.asName()
public val MESSAGE_ACTION_KEY: Name = DeviceMessage::action.name.asName()
public val MESSAGE_KEY_KEY: Name = DeviceMessage::key.name.asName()
public val MESSAGE_VALUE_KEY: Name = DeviceMessage::value.name.asName()
public const val RESPONSE_OK_STATUS: String = "response.OK"
public const val RESPONSE_FAIL_STATUS: String = "response.FAIL"
@ -40,19 +31,19 @@ public class DeviceMessage : Scheme() {
public inline fun ok(
request: DeviceMessage? = null,
block: DeviceMessage.() -> Unit = {}
block: DeviceMessage.() -> Unit = {},
): DeviceMessage = DeviceMessage {
target = request?.source
targetName = request?.sourceName
}.apply(block)
public inline fun fail(
request: DeviceMessage? = null,
cause: Throwable? = null,
block: DeviceMessage.() -> Unit = {}
block: DeviceMessage.() -> Unit = {},
): DeviceMessage = DeviceMessage {
target = request?.source
targetName = request?.sourceName
status = RESPONSE_FAIL_STATUS
if(cause!=null){
if (cause != null) {
configure {
set("error.type", cause::class.simpleName)
set("error.message", cause.message)
@ -76,16 +67,4 @@ public class DeviceMessage : Scheme() {
}
}
public class MessageData : Scheme() {
public var name: String by string { error("Property name could not be empty") }
public var value: MetaItem<*>? by item(key = DATA_VALUE_KEY)
public companion object : SchemeSpec<MessageData>(::MessageData) {
public val DATA_VALUE_KEY: Name = "value".asName()
}
}
@DFBuilder
public fun DeviceMessage.data(block: MessageData.() -> Unit): MessageData = append(MessageData, block)
public fun DeviceMessage.wrap(): SimpleEnvelope = SimpleEnvelope(this.config, null)

View File

@ -21,7 +21,7 @@ import kotlinx.coroutines.launch
@OptIn(DFExperimental::class)
public class HubController(
public val hub: DeviceHub,
public val scope: CoroutineScope
public val scope: CoroutineScope,
) : Consumer, Responder {
private val messageOutbox = Channel<DeviceMessage>(Channel.CONFLATED)
@ -45,13 +45,11 @@ public class HubController(
if (value == null) return
scope.launch {
val change = DeviceMessage.ok {
source = name.toString()
type = DeviceMessage.PROPERTY_CHANGED_ACTION
data {
this.name = propertyName
sourceName = name.toString()
action = DeviceMessage.PROPERTY_CHANGED_ACTION
this.key = propertyName
this.value = value
}
}
messageOutbox.send(change)
}
@ -62,7 +60,7 @@ public class HubController(
}
public suspend fun respondMessage(message: DeviceMessage): DeviceMessage = try {
val targetName = message.target?.toName() ?: Name.EMPTY
val targetName = message.targetName?.toName() ?: Name.EMPTY
val device = hub[targetName] ?: error("The device with name $targetName not found in $hub")
DeviceController.respondMessage(device, targetName.toString(), message)
} catch (ex: Exception) {

View File

@ -48,6 +48,9 @@ public fun DeviceProperty.int(): ReadWriteProperty<Any?, Int> = convert(MetaConv
public fun ReadOnlyDeviceProperty.string(): ReadOnlyProperty<Any?, String> = convert(MetaConverter.string)
public fun DeviceProperty.string(): ReadWriteProperty<Any?, String> = convert(MetaConverter.string)
public fun ReadOnlyDeviceProperty.boolean(): ReadOnlyProperty<Any?, Boolean> = convert(MetaConverter.boolean)
public fun DeviceProperty.boolean(): ReadWriteProperty<Any?, Boolean> = convert(MetaConverter.boolean)
//TODO to be moved to DF
private object DurationConverter : MetaConverter<Duration> {
override fun itemToObject(item: MetaItem<*>): Duration = when (item) {

View File

@ -3,22 +3,21 @@ package hep.dataforge.control.ports
import hep.dataforge.context.Context
import hep.dataforge.context.ContextAware
import hep.dataforge.context.Factory
import hep.dataforge.control.api.Socket
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.io.Closeable
import kotlin.coroutines.CoroutineContext
public interface Port: Closeable, ContextAware {
public suspend fun send(data: ByteArray)
public suspend fun receiving(): Flow<ByteArray>
public fun isOpen(): Boolean
}
public interface Port : ContextAware, Socket<ByteArray>
public typealias PortFactory = Factory<Port>
public abstract class AbstractPort(override val context: Context, coroutineContext: CoroutineContext = context.coroutineContext) : Port {
public abstract class AbstractPort(
override val context: Context,
coroutineContext: CoroutineContext = context.coroutineContext,
) : Port {
protected val scope: CoroutineScope = CoroutineScope(coroutineContext + SupervisorJob(coroutineContext[Job]))
@ -70,7 +69,7 @@ public abstract class AbstractPort(override val context: Context, coroutineConte
* In order to form phrases some condition should used on top of it.
* For example [delimitedIncoming] generates phrases with fixed delimiter.
*/
override suspend fun receiving(): Flow<ByteArray> {
override fun receiving(): Flow<ByteArray> {
return incoming.receiveAsFlow()
}

View File

@ -43,7 +43,7 @@ public class PortProxy(override val context: Context = Global, public val factor
}
@OptIn(ExperimentalCoroutinesApi::class)
override suspend fun receiving(): Flow<ByteArray> = channelFlow {
override fun receiving(): Flow<ByteArray> = channelFlow {
while (isActive) {
try {
//recreate port and Flow on cancel

View File

@ -39,7 +39,7 @@ public fun Flow<ByteArray>.withDelimiter(delimiter: ByteArray, expectedMessageSi
* Transform byte fragments into utf-8 phrases using utf-8 delimiter
*/
public fun Flow<ByteArray>.withDelimiter(delimiter: String, expectedMessageSize: Int = 32): Flow<String> {
return withDelimiter(delimiter.encodeToByteArray()).map { it.decodeToString() }
return withDelimiter(delimiter.encodeToByteArray(),expectedMessageSize).map { it.decodeToString() }
}
/**

View File

@ -8,7 +8,6 @@ import hep.dataforge.control.controllers.DeviceController.Companion.GET_PROPERTY
import hep.dataforge.control.controllers.DeviceController.Companion.SET_PROPERTY_ACTION
import hep.dataforge.control.controllers.DeviceManager
import hep.dataforge.control.controllers.DeviceMessage
import hep.dataforge.control.controllers.data
import hep.dataforge.control.controllers.respondMessage
import hep.dataforge.meta.toJson
import hep.dataforge.meta.toMeta
@ -47,7 +46,7 @@ import kotlinx.serialization.json.put
public fun CoroutineScope.startDeviceServer(
manager: DeviceManager,
port: Int = 8111,
host: String = "localhost"
host: String = "localhost",
): ApplicationEngine {
return this.embeddedServer(CIO, port, host) {
@ -80,7 +79,7 @@ public const val WEB_SERVER_TARGET: String = "@webServer"
public fun Application.deviceModule(
manager: DeviceManager,
deviceNames: Collection<String> = manager.devices.keys.map { it.toString() },
route: String = "/"
route: String = "/",
) {
// val controllers = deviceNames.associateWith { name ->
// val device = manager.devices[name.toName()]
@ -115,7 +114,8 @@ public fun Application.deviceModule(
+"Device server dashboard"
}
deviceNames.forEach { deviceName ->
val device = manager[deviceName] ?: error("The device with name $deviceName not found in $manager")
val device =
manager[deviceName] ?: error("The device with name $deviceName not found in $manager")
div {
id = deviceName
h2 { +deviceName }
@ -203,12 +203,10 @@ public fun Application.deviceModule(
val target: String by call.parameters
val property: String by call.parameters
val request = DeviceMessage {
type = GET_PROPERTY_ACTION
source = WEB_SERVER_TARGET
this.target = target
data {
name = property
}
action = GET_PROPERTY_ACTION
sourceName = WEB_SERVER_TARGET
this.targetName = target
key = property
}
val response = manager.respondMessage(request)
@ -221,13 +219,12 @@ public fun Application.deviceModule(
val json = Json.parseToJsonElement(body)
val request = DeviceMessage {
type = SET_PROPERTY_ACTION
source = WEB_SERVER_TARGET
this.target = target
data {
name = property
action = SET_PROPERTY_ACTION
sourceName = WEB_SERVER_TARGET
this.targetName = target
key = property
value = json.toMetaItem()
}
}
val response = manager.respondMessage(request)

View File

@ -4,6 +4,8 @@ import hep.dataforge.context.Context
import hep.dataforge.control.api.DeviceHub
import hep.dataforge.control.api.PropertyDescriptor
import hep.dataforge.control.base.*
import hep.dataforge.control.controllers.boolean
import hep.dataforge.control.controllers.double
import hep.dataforge.control.controllers.duration
import hep.dataforge.control.ports.Port
import hep.dataforge.control.ports.PortProxy
@ -12,15 +14,12 @@ import hep.dataforge.control.ports.withDelimiter
import hep.dataforge.meta.MetaItem
import hep.dataforge.names.NameToken
import hep.dataforge.values.Null
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withTimeout
import kotlin.time.Duration
@ -48,7 +47,7 @@ public class PiMotionMasterDevice(
private val mutex = Mutex()
private suspend fun dispatchError(errorCode: Int){
private suspend fun dispatchError(errorCode: Int) {
}
@ -62,7 +61,7 @@ public class PiMotionMasterDevice(
connector.send(stringToSend)
}
public suspend fun getErrorCode(): Int = mutex.withLock{
public suspend fun getErrorCode(): Int = mutex.withLock {
withTimeout(timeoutValue) {
sendCommandInternal("ERR?")
val errorString = connector.receiving().withDelimiter("\n").first()
@ -81,7 +80,7 @@ public class PiMotionMasterDevice(
val phrases = connector.receiving().withDelimiter("\n")
phrases.takeWhile { it.endsWith(" \n") }.toList() + phrases.first()
}
} catch (ex: Throwable){
} catch (ex: Throwable) {
logger.warn { "Error during PIMotionMaster request. Requesting error code." }
val errorCode = getErrorCode()
dispatchError(errorCode)
@ -185,10 +184,25 @@ public class PiMotionMasterDevice(
}
)
public val reference: ReadOnlyDeviceProperty by readingBoolean(
descriptorBuilder = {
info = "Get Referencing Result"
},
getter = {
readAxisBoolean("FRF?")
}
)
val moveToReference by acting {
send("FRF", axisId)
}
public val position: DeviceProperty by axisNumberProperty("POS") {
info = "The current axis position."
}
var positionValue by position.double()
public val openLoopTarget: DeviceProperty by axisNumberProperty("OMA") {
info = "Position for open-loop operation."
}
@ -197,12 +211,18 @@ public class PiMotionMasterDevice(
info = "Servo closed loop mode"
}
var closedLoopValue by closedLoop.boolean()
public val velocity: DeviceProperty by axisNumberProperty("VEL") {
info = "Velocity value for closed-loop operation"
}
}
override val devices: Map<NameToken, Axis> = axes.associate { NameToken(it) to Axis(it) }
/**
*
*/
val axes: Map<String, Axis> get() = devices.mapKeys { it.toString() }
}

View File

@ -1,51 +1,60 @@
package ru.mipt.npm.devices.pimotionmaster
import hep.dataforge.context.Context
import hep.dataforge.control.api.Socket
import hep.dataforge.control.ports.AbstractPort
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import hep.dataforge.control.ports.withDelimiter
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.flow.*
import kotlin.math.abs
import kotlin.time.Duration
public abstract class VirtualDevice {
protected abstract val scope: CoroutineScope
abstract class VirtualDevice(val scope: CoroutineScope) : Socket<ByteArray> {
public abstract suspend fun evaluateRequest(request: ByteArray)
protected abstract suspend fun evaluateRequest(request: ByteArray)
private val toSend = Channel<ByteArray>(100)
protected open fun Flow<ByteArray>.transformRequests(): Flow<ByteArray> = this
public val responses: Flow<ByteArray> get() = toSend.receiveAsFlow()
private val toReceive = Channel<ByteArray>(100)
private val toRespond = Channel<ByteArray>(100)
protected suspend fun send(response: ByteArray) {
toSend.send(response)
private val receiveJob: Job = toReceive.consumeAsFlow().onEach {
evaluateRequest(it)
}.catch {
it.printStackTrace()
}.launchIn(scope)
override suspend fun send(data: ByteArray) {
toReceive.send(data)
}
//
// protected suspend fun respond(response: String){
// respond(response.encodeToByteArray())
// }
protected suspend fun respond(response: ByteArray) {
toRespond.send(response)
}
override fun receiving(): Flow<ByteArray> = toRespond.receiveAsFlow()
protected fun respondInFuture(delay: Duration, response: suspend () -> ByteArray): Job = scope.launch {
delay(delay)
send(response())
respond(response())
}
override fun isOpen(): Boolean = scope.isActive
override fun close() = scope.cancel()
}
public class VirtualPort(private val device: VirtualDevice, context: Context) : AbstractPort(context) {
class VirtualPort(private val device: VirtualDevice, context: Context) : AbstractPort(context) {
private val respondJob = device.receiving().onEach(::receive).catch {
it.printStackTrace()
}.launchIn(scope)
private val respondJob = scope.launch {
device.responses.collect {
receive(it)
}
}
override suspend fun write(data: ByteArray) {
device.evaluateRequest(data)
device.send(data)
}
override fun close() {
@ -55,13 +64,13 @@ public class VirtualPort(private val device: VirtualDevice, context: Context) :
}
class PiMotionMasterVirtualDevice(override val scope: CoroutineScope, axisIds: List<String>) : VirtualDevice() {
class PiMotionMasterVirtualDevice(scope: CoroutineScope, axisIds: List<String>) : VirtualDevice(scope) {
init {
//add asynchronous send logic here
}
private val axisID = "0"
override fun Flow<ByteArray>.transformRequests(): Flow<ByteArray> = withDelimiter("\n".toByteArray())
private var errorCode: Int = 0
@ -116,7 +125,7 @@ class PiMotionMasterVirtualDevice(override val scope: CoroutineScope, axisIds: L
private fun respond(str: String) = scope.launch {
send((str + "\n").encodeToByteArray())
respond((str + "\n").encodeToByteArray())
}
private fun respondForAllAxis(axisIds: List<String>, extract: VirtualAxisState.(index: String) -> Any) {

View File

@ -28,21 +28,23 @@ fun CoroutineScope.launchPiDebugServer(port: Int, virtualPort: Port): Job = laun
server.accept()
} catch (ex: Exception) {
server.close()
ex.printStackTrace()
return@launch
}
launch {
println("Socket accepted: ${socket.remoteAddress}")
try {
println("Socket accepted: ${socket.remoteAddress}")
supervisorScope {
socket.use { socket ->
val input = socket.openReadChannel()
val output = socket.openWriteChannel(autoFlush = true)
val buffer = ByteBuffer.allocate(1024)
launch {
virtualPort.receiving().collect {
println("Sending: ${it.decodeToString()}")
//println("Sending: ${it.decodeToString()}")
output.writeAvailable(it)
output.flush()
}
}
while (isActive) {
@ -51,14 +53,10 @@ fun CoroutineScope.launchPiDebugServer(port: Int, virtualPort: Port): Job = laun
if (read > 0) {
buffer.flip()
val array = buffer.moveToByteArray()
println("Received: ${array.decodeToString()}")
//println("Received: ${array.decodeToString()}")
virtualPort.send(array)
}
}
} catch (ex: Exception) {
cancel()
} finally {
socket.close()
}
}
}
@ -66,7 +64,7 @@ fun CoroutineScope.launchPiDebugServer(port: Int, virtualPort: Port): Job = laun
fun main() {
val port = 10024
val virtualDevice = PiMotionMasterVirtualDevice(Global, listOf("1","2"))
val virtualDevice = PiMotionMasterVirtualDevice(Global, listOf("1", "2"))
val virtualPort = VirtualPort(virtualDevice, Global)
runBlocking(Dispatchers.Default) {
val serverJob = launchPiDebugServer(port, virtualPort)