A lot of small refactoring in html

This commit is contained in:
Alexander Nozik 2020-12-12 10:44:41 +03:00
parent 17beb29217
commit d68f5a9840
14 changed files with 146 additions and 237 deletions

View File

@ -4,7 +4,7 @@ plugins {
kotlin("js") apply false
}
val dataforgeVersion: String by extra("0.2.0")
val dataforgeVersion: String by extra("0.2.1-dev-2")
val ktorVersion: String by extra("1.4.3")
val rsocketVersion by extra("0.11.1")

View File

@ -2,13 +2,12 @@ package hep.dataforge.control.api
import hep.dataforge.context.ContextAware
import hep.dataforge.control.api.Device.Companion.DEVICE_TARGET
import hep.dataforge.io.Envelope
import hep.dataforge.io.EnvelopeBuilder
import hep.dataforge.meta.Meta
import hep.dataforge.meta.MetaItem
import hep.dataforge.provider.Type
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.io.Closeable
/**
@ -28,27 +27,15 @@ public interface Device : Closeable, ContextAware {
public val actionDescriptors: Collection<ActionDescriptor>
/**
* The supervisor scope encompassing all operations on a device. When canceled, cancels all running processes
* The supervisor scope encompassing all operations on a device. When canceled, cancels all running processes.
*/
public val scope: CoroutineScope
/**
* Register a new property change listener for this device.
* [owner] is provided optionally in order for listener to be
* easily removable
*/
public fun registerListener(listener: DeviceListener, owner: Any? = listener)
/**
* Remove all listeners belonging to the specified owner
*/
public fun removeListeners(owner: Any?)
/**
* Get the value of the property or throw error if property in not defined.
* Suspend if property value is not available
*/
public suspend fun getProperty(propertyName: String): MetaItem<*>
public suspend fun getProperty(propertyName: String): MetaItem<*>?
/**
* Invalidate property and force recalculate
@ -61,11 +48,16 @@ public interface Device : Closeable, ContextAware {
*/
public suspend fun setProperty(propertyName: String, value: MetaItem<*>)
/**
* The [SharedFlow] of property changes
*/
public val propertyFlow: SharedFlow<Pair<String, MetaItem<*>>>
/**
* Send an action request and suspend caller while request is being processed.
* Could return null if request does not return a meaningful answer.
*/
public suspend fun execute(command: String, argument: MetaItem<*>? = null): MetaItem<*>?
public suspend fun execute(action: String, argument: MetaItem<*>? = null): MetaItem<*>?
override fun close() {
scope.cancel("The device is closed")
@ -76,14 +68,10 @@ public interface Device : Closeable, ContextAware {
}
}
public interface ResponderDevice{
/**
*
* A request with binary data or for binary response (or both). This request does not cover basic functionality like
* [setProperty], [getProperty] or [execute] and not defined for a generic device.
*
*/
public suspend fun respondWithData(request: Envelope): EnvelopeBuilder
public suspend fun Device.getState(): Meta = Meta{
for(descriptor in propertyDescriptors) {
descriptor.name put getProperty(descriptor.name)
}
}
public suspend fun Device.execute(name: String, meta: Meta?): MetaItem<*>? = execute(name, meta?.let { MetaItem.NodeItem(it) })
//public suspend fun Device.execute(name: String, meta: Meta?): MetaItem<*>? = execute(name, meta?.let { MetaItem.NodeItem(it) })

View File

@ -1,14 +0,0 @@
package hep.dataforge.control.api
import hep.dataforge.meta.MetaItem
/**
* PropertyChangeListener Interface
* [value] is a new value that property has after a change; null is for invalid state.
*/
public interface DeviceListener {
public fun propertyChanged(propertyName: String, value: MetaItem<*>?)
public fun actionExecuted(action: String, argument: MetaItem<*>?, result: MetaItem<*>?) {}
//TODO add general message listener method
}

View File

@ -3,18 +3,25 @@ package hep.dataforge.control.base
import hep.dataforge.context.Context
import hep.dataforge.control.api.ActionDescriptor
import hep.dataforge.control.api.Device
import hep.dataforge.control.api.DeviceListener
import hep.dataforge.control.api.PropertyDescriptor
import hep.dataforge.meta.DFExperimental
import hep.dataforge.meta.MetaItem
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
//TODO move to DataForge-core
@DFExperimental
public data class LogEntry(val content: String, val priority: Int = 0)
/**
* Baseline implementation of [Device] interface
*/
@Suppress("EXPERIMENTAL_API_USAGE")
public abstract class DeviceBase(override val context: Context) : Device {
private val _properties = HashMap<String, ReadOnlyDeviceProperty>()
@ -22,25 +29,21 @@ public abstract class DeviceBase(override val context: Context) : Device {
private val _actions = HashMap<String, DeviceAction>()
public val actions: Map<String, DeviceAction> get() = _actions
private val listeners = ArrayList<Pair<Any?, DeviceListener>>(4)
private val sharedPropertyFlow = MutableSharedFlow<Pair<String, MetaItem<*>>>()
override fun registerListener(listener: DeviceListener, owner: Any?) {
listeners.add(owner to listener)
}
override val propertyFlow: SharedFlow<Pair<String, MetaItem<*>>> get() = sharedPropertyFlow
override fun removeListeners(owner: Any?) {
listeners.removeAll { it.first == owner }
}
private val sharedLogFlow = MutableSharedFlow<LogEntry>()
internal fun notifyListeners(block: DeviceListener.() -> Unit) {
listeners.forEach { it.second.block() }
}
/**
* The [SharedFlow] of log messages
*/
@DFExperimental
public val logFlow: SharedFlow<LogEntry>
get() = sharedLogFlow
public fun notifyPropertyChanged(propertyName: String) {
scope.launch {
val value = getProperty(propertyName)
notifyListeners { propertyChanged(propertyName, value) }
}
protected suspend fun log(message: String, priority: Int = 0) {
sharedLogFlow.emit(LogEntry(message, priority))
}
override val propertyDescriptors: Collection<PropertyDescriptor>
@ -72,8 +75,8 @@ public abstract class DeviceBase(override val context: Context) : Device {
)
}
override suspend fun execute(command: String, argument: MetaItem<*>?): MetaItem<*>? =
(_actions[command] ?: error("Request with name $command not defined")).invoke(argument)
override suspend fun execute(action: String, argument: MetaItem<*>?): MetaItem<*>? =
(_actions[action] ?: error("Request with name $action not defined")).invoke(argument)
@OptIn(ExperimentalCoroutinesApi::class)
private open inner class BasicReadOnlyDeviceProperty(
@ -94,8 +97,8 @@ public abstract class DeviceBase(override val context: Context) : Device {
override fun updateLogical(item: MetaItem<*>) {
state.value = item
notifyListeners {
propertyChanged(name, item)
scope.launch {
sharedPropertyFlow.emit(Pair(name, item))
}
}
@ -206,11 +209,7 @@ public abstract class DeviceBase(override val context: Context) : Device {
) : DeviceAction {
override suspend fun invoke(arg: MetaItem<*>?): MetaItem<*>? =
withContext(scope.coroutineContext + SupervisorJob(scope.coroutineContext[Job])) {
block(arg).also {
notifyListeners {
actionExecuted(name, arg, it)
}
}
block(arg)
}
}

View File

@ -1,62 +1,42 @@
package hep.dataforge.control.controllers
import hep.dataforge.control.api.*
import hep.dataforge.control.api.Device
import hep.dataforge.control.api.DeviceHub
import hep.dataforge.control.api.get
import hep.dataforge.control.messages.*
import hep.dataforge.io.Consumer
import hep.dataforge.io.Envelope
import hep.dataforge.io.Responder
import hep.dataforge.io.SimpleEnvelope
import hep.dataforge.meta.*
import hep.dataforge.meta.DFExperimental
import hep.dataforge.meta.Meta
import hep.dataforge.meta.MetaItem
import hep.dataforge.names.Name
import hep.dataforge.names.toName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.launch
import kotlinx.io.Binary
import kotlinx.coroutines.flow.map
/**
* The [DeviceController] wraps device operations in [DeviceMessage]
*/
@OptIn(DFExperimental::class)
public class DeviceController(
public val device: Device,
public val deviceName: String,
public val scope: CoroutineScope = device.scope,
) : Responder, Consumer, DeviceListener {
) {
init {
device.registerListener(this, this)
private val propertyChanges = device.propertyFlow.map { (propertyName: String, value: MetaItem<*>) ->
PropertyChangedMessage(
sourceDevice = deviceName,
key = propertyName,
value = value,
)
}
private val outputChannel = Channel<Envelope>(Channel.CONFLATED)
/**
* The flow of outgoing messages
*/
public val messages: Flow<DeviceMessage> get() = propertyChanges
public suspend fun respondMessage(message: DeviceMessage): DeviceMessage =
respondMessage(device, deviceName, message)
override suspend fun respond(request: Envelope): Envelope = respond(device, deviceName, request)
override fun propertyChanged(propertyName: String, value: MetaItem<*>?) {
if (value == null) return
scope.launch {
val change = PropertyChangedMessage(
sourceDevice = deviceName,
key = propertyName,
value = value,
)
val envelope = SimpleEnvelope(change.toMeta(), Binary.EMPTY)
outputChannel.send(envelope)
}
}
public fun receiving(): Flow<Envelope> = outputChannel.consumeAsFlow()
@DFExperimental
override fun consume(message: Envelope) {
// Fire the respond procedure and forget about the result
scope.launch {
respond(message)
}
}
public companion object {
public const val GET_PROPERTY_ACTION: String = "read"
@ -65,29 +45,21 @@ public class DeviceController(
public const val PROPERTY_LIST_ACTION: String = "propertyList"
public const val ACTION_LIST_ACTION: String = "actionList"
internal suspend fun respond(device: Device, deviceTarget: String, request: Envelope): Envelope {
val target = request.meta["target"].string
return try {
if (request.data == null) {
respondMessage(device, deviceTarget, DeviceMessage.fromMeta(request.meta)).toEnvelope()
} else if (target != null && target != deviceTarget) {
error("Wrong target name $deviceTarget expected but $target found")
} else {
if (device is ResponderDevice) {
val response = device.respondWithData(request).apply {
meta {
"target" put request.meta["source"].string
"source" put deviceTarget
}
}
response.seal()
} else error("Device does not support binary response")
}
} catch (ex: Exception) {
val requestSourceName = request.meta[DeviceMessage.SOURCE_KEY].string
DeviceMessage.error(ex, sourceDevice = deviceTarget, targetDevice = requestSourceName).toEnvelope()
}
}
// internal suspend fun respond(device: Device, deviceTarget: String, request: Envelope): Envelope {
// val target = request.meta["target"].string
// return try {
// if (device is Responder) {
// device.respond(request)
// } else if (request.data == null) {
// respondMessage(device, deviceTarget, DeviceMessage.fromMeta(request.meta)).toEnvelope()
// } else if (target != null && target != deviceTarget) {
// error("Wrong target name $deviceTarget expected but $target found")
// } else error("Device does not support binary response")
// } catch (ex: Exception) {
// val requestSourceName = request.meta[DeviceMessage.SOURCE_KEY].string
// DeviceMessage.error(ex, sourceDevice = deviceTarget, targetDevice = requestSourceName).toEnvelope()
// }
// }
internal suspend fun respondMessage(
device: Device,

View File

@ -1,67 +1,53 @@
package hep.dataforge.control.controllers
import hep.dataforge.control.api.DeviceHub
import hep.dataforge.control.api.DeviceListener
import hep.dataforge.control.api.get
import hep.dataforge.control.messages.DeviceMessage
import hep.dataforge.control.messages.PropertyChangedMessage
import hep.dataforge.control.messages.toEnvelope
import hep.dataforge.io.Consumer
import hep.dataforge.io.Envelope
import hep.dataforge.io.Responder
import hep.dataforge.meta.DFExperimental
import hep.dataforge.meta.MetaItem
import hep.dataforge.meta.get
import hep.dataforge.meta.string
import hep.dataforge.names.Name
import hep.dataforge.names.NameToken
import hep.dataforge.names.toName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
@OptIn(DFExperimental::class)
public class HubController(
public val hub: DeviceHub,
public val scope: CoroutineScope,
) : Consumer, Responder {
) {
private val messageOutbox = Channel<DeviceMessage>(Channel.CONFLATED)
private val envelopeOutbox = Channel<Envelope>(Channel.CONFLATED)
// private val envelopeOutbox = Channel<Envelope>(Channel.CONFLATED)
public fun messageOutput(): Flow<DeviceMessage> = messageOutbox.consumeAsFlow()
public fun envelopeOutput(): Flow<Envelope> = envelopeOutbox.consumeAsFlow()
// public fun envelopeOutput(): Flow<Envelope> = envelopeOutbox.consumeAsFlow()
private val packJob = scope.launch {
while (isActive) {
val message = messageOutbox.receive()
envelopeOutbox.send(message.toEnvelope())
}
}
// private val packJob = scope.launch {
// while (isActive) {
// val message = messageOutbox.receive()
// envelopeOutbox.send(message.toEnvelope())
// }
// }
private val listeners: Map<NameToken, DeviceListener> = hub.devices.mapValues { (name, device) ->
object : DeviceListener {
override fun propertyChanged(propertyName: String, value: MetaItem<*>?) {
if (value == null) return
scope.launch {
val change = PropertyChangedMessage(
sourceDevice = name.toString(),
key = propertyName,
value = value
)
messageOutbox.send(change)
}
}
}.also {
device.registerListener(it)
}
}
// private val listeners: Map<NameToken, DeviceListener> = hub.devices.mapValues { (deviceNameToken, device) ->
// object : DeviceListener {
// override fun propertyChanged(propertyName: String, value: MetaItem<*>?) {
// if (value == null) return
// scope.launch {
// val change = PropertyChangedMessage(
// sourceDevice = deviceNameToken.toString(),
// key = propertyName,
// value = value
// )
// messageOutbox.send(change)
// }
// }
// }.also {
// device.registerListener(it)
// }
// }
public suspend fun respondMessage(message: DeviceMessage): DeviceMessage = try {
val targetName = message.targetDevice?.toName() ?: Name.EMPTY
@ -70,24 +56,24 @@ public class HubController(
} catch (ex: Exception) {
DeviceMessage.error(ex, sourceDevice = null, targetDevice = message.sourceDevice)
}
override suspend fun respond(request: Envelope): Envelope = try {
val targetName = request.meta[DeviceMessage.TARGET_KEY].string?.toName() ?: Name.EMPTY
val device = hub[targetName] ?: error("The device with name $targetName not found in $hub")
if (request.data == null) {
DeviceController.respondMessage(device, targetName.toString(), DeviceMessage.fromMeta(request.meta))
.toEnvelope()
} else {
DeviceController.respond(device, targetName.toString(), request)
}
} catch (ex: Exception) {
DeviceMessage.error(ex, sourceDevice = null).toEnvelope()
}
override fun consume(message: Envelope) {
// Fire the respond procedure and forget about the result
scope.launch {
respond(message)
}
}
//
// override suspend fun respond(request: Envelope): Envelope = try {
// val targetName = request.meta[DeviceMessage.TARGET_KEY].string?.toName() ?: Name.EMPTY
// val device = hub[targetName] ?: error("The device with name $targetName not found in $hub")
// if (request.data == null) {
// DeviceController.respondMessage(device, targetName.toString(), DeviceMessage.fromMeta(request.meta))
// .toEnvelope()
// } else {
// DeviceController.respond(device, targetName.toString(), request)
// }
// } catch (ex: Exception) {
// DeviceMessage.error(ex, sourceDevice = null).toEnvelope()
// }
//
// override fun consume(message: Envelope) {
// // Fire the respond procedure and forget about the result
// scope.launch {
// respond(message)
// }
// }
}

View File

@ -1,30 +0,0 @@
package hep.dataforge.control.controllers
import hep.dataforge.control.api.Device
import hep.dataforge.control.api.DeviceListener
import hep.dataforge.meta.MetaItem
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.launch
/**
* Flow changes of all properties of a given device ignoring invalidation events
*/
@OptIn(ExperimentalCoroutinesApi::class)
public suspend fun Device.flowPropertyChanges(): Flow<Pair<String, MetaItem<*>>> = callbackFlow {
val listener = object : DeviceListener {
override fun propertyChanged(propertyName: String, value: MetaItem<*>?) {
if (value != null) {
launch {
send(propertyName to value)
}
}
}
}
registerListener(listener)
awaitClose {
removeListeners(listener)
}
}

View File

@ -2,8 +2,6 @@ package hep.dataforge.control.messages
import hep.dataforge.io.SimpleEnvelope
import hep.dataforge.meta.*
import hep.dataforge.names.Name
import hep.dataforge.names.asName
import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json
@ -18,9 +16,6 @@ public sealed class DeviceMessage{
public companion object {
public val SOURCE_KEY: Name = DeviceMessage::sourceDevice.name.asName()
public val TARGET_KEY: Name = DeviceMessage::targetDevice.name.asName()
public fun error(
cause: Throwable,
sourceDevice: String?,
@ -131,6 +126,18 @@ public data class EmptyDeviceMessage(
override val comment: String? = null,
) : DeviceMessage()
/**
* Information log message
*/
@Serializable
@SerialName("log")
public data class DeviceLogMessage(
val message: String,
override val sourceDevice: String? = null,
override val targetDevice: String? = null,
override val comment: String? = null,
) : DeviceMessage()
/**
* The evaluation of the message produced a service error
*/

View File

@ -3,10 +3,6 @@ plugins {
id("ru.mipt.npm.publish")
}
kscience {
useSerialization()
}
val dataforgeVersion: String by rootProject.extra
val ktorVersion: String by rootProject.extra

View File

@ -2,13 +2,8 @@ plugins {
id("ru.mipt.npm.mpp")
}
val ktorVersion: String by rootProject.extra
kscience{
useCoroutines()
}
kotlin {
sourceSets {
commonMain {

View File

@ -23,6 +23,8 @@ dependencies{
implementation("no.tornado:tornadofx:1.7.20")
implementation(kotlin("stdlib-jdk8"))
implementation("kscience.plotlykt:plotlykt-server:0.3.0")
implementation("com.github.Ricky12Awesome:json-schema-serialization:0.6.6")
}
tasks.withType<org.jetbrains.kotlin.gradle.tasks.KotlinCompile>().configureEach {
@ -37,5 +39,5 @@ javafx{
}
application{
mainClassName = "hep.dataforge.control.demo.DemoControllerViewKt"
mainClass.set("hep.dataforge.control.demo.DemoControllerViewKt")
}

View File

@ -0,0 +1,10 @@
package hep.dataforge.control.demo
import com.github.ricky12awesome.jss.encodeToSchema
import com.github.ricky12awesome.jss.globalJson
import hep.dataforge.control.messages.DeviceMessage
fun main() {
val schema = globalJson.encodeToSchema(DeviceMessage.serializer(), generateDefinitions = false)
println(schema)
}

View File

@ -7,7 +7,6 @@ kscience {
useSerialization{
json()
}
useCoroutines("1.4.0", configuration = ru.mipt.npm.gradle.DependencyConfiguration.API)
}
val dataforgeVersion: String by rootProject.extra

View File

@ -7,7 +7,6 @@ kscience {
useSerialization{
json()
}
useCoroutines("1.4.1", configuration = ru.mipt.npm.gradle.DependencyConfiguration.API)
}
val dataforgeVersion: String by rootProject.extra