More or less working motors

This commit is contained in:
Alexander Nozik 2020-10-11 22:37:39 +03:00
parent 23c66d9703
commit 62dc6ef127
20 changed files with 290 additions and 123 deletions

View File

@ -4,6 +4,7 @@ plugins {
}
val dataforgeVersion: String by rootProject.extra
val ktorVersion: String by rootProject.extra
kscience {
useCoroutines()
@ -17,13 +18,9 @@ kotlin {
api("hep.dataforge:dataforge-io:$dataforgeVersion")
}
}
jvmMain{
dependencies{
api("io.ktor:ktor-network:1.3.2")
}
}
jsMain{
jvmTest{
dependencies{
api("io.ktor:ktor-network:$ktorVersion")
}
}
}

View File

@ -5,9 +5,9 @@ import hep.dataforge.context.ContextAware
import hep.dataforge.context.Global
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.isActive
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
@ -31,40 +31,37 @@ public class PortProxy(override val context: Context = Global, public val factor
}
}
/**
* Ensure that the port is open. If it is already open, does nothing. Otherwise, open a new port.
*/
public suspend fun open() {
port()//ignore result
}
override suspend fun send(data: ByteArray) {
port().send(data)
}
@OptIn(ExperimentalCoroutinesApi::class)
override fun receiving(): Flow<ByteArray> = channelFlow {
while (isActive) {
override fun receiving(): Flow<ByteArray> = flow {
while (true) {
try {
//recreate port and Flow on cancel
//recreate port and Flow on connection problems
port().receiving().collect {
send(it)
emit(it)
}
} catch (t: Throwable) {
logger.warn(t){"Port read failed. Reconnecting."}
//cancel
// if (t is CancellationException) {
// cancel(t)
// }
mutex.withLock {
actualPort?.close()
actualPort = null
}
}
}
}// port().receiving()
}
// open by default
override fun isOpen(): Boolean = true
override fun close() {
actualPort?.close()
actualPort = null
context.launch {
mutex.withLock {
actualPort?.close()
actualPort = null
}
}
}
}

View File

@ -1,21 +1,20 @@
package hep.dataforge.control.ports
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.transform
import kotlinx.io.ByteArrayOutput
/**
* Transform byte fragments into complete phrases using given delimiter
* Transform byte fragments into complete phrases using given delimiter. Not thread safe.
*/
public fun Flow<ByteArray>.withDelimiter(delimiter: ByteArray, expectedMessageSize: Int = 32): Flow<ByteArray> = flow {
public fun Flow<ByteArray>.withDelimiter(delimiter: ByteArray, expectedMessageSize: Int = 32): Flow<ByteArray> {
require(delimiter.isNotEmpty()) { "Delimiter must not be empty" }
var output = ByteArrayOutput(expectedMessageSize)
var matcherPosition = 0
collect { chunk ->
return transform { chunk ->
chunk.forEach { byte ->
output.writeByte(byte)
//matching current symbol in delimiter
@ -39,7 +38,7 @@ public fun Flow<ByteArray>.withDelimiter(delimiter: ByteArray, expectedMessageSi
* Transform byte fragments into utf-8 phrases using utf-8 delimiter
*/
public fun Flow<ByteArray>.withDelimiter(delimiter: String, expectedMessageSize: Int = 32): Flow<String> {
return withDelimiter(delimiter.encodeToByteArray(),expectedMessageSize).map { it.decodeToString() }
return withDelimiter(delimiter.encodeToByteArray(), expectedMessageSize).map { it.decodeToString() }
}
/**

View File

@ -34,7 +34,7 @@ public fun <T : Any> ReadOnlyDeviceProperty.convert(
metaConverter: MetaConverter<T>,
forceRead: Boolean,
): ReadOnlyProperty<Any?, T> {
return ReadOnlyProperty { thisRef, property ->
return ReadOnlyProperty { _, _ ->
runBlocking(scope.coroutineContext) {
read(forceRead).let { metaConverter.itemToObject(it) }
}

View File

@ -62,7 +62,11 @@ public class TcpPort private constructor(
override fun close() {
listenerJob.cancel()
futureChannel.cancel()
if(futureChannel.isCompleted){
futureChannel.getCompleted().close()
} else {
futureChannel.cancel()
}
super.close()
}

View File

@ -11,8 +11,8 @@ val dataforgeVersion: String by rootProject.extra
val ktorVersion: String by rootProject.extra
dependencies{
implementation(project(":ktor-sse"))
implementation(project(":dataforge-device-core"))
implementation(project(":dataforge-device-tcp"))
implementation("io.ktor:ktor-server-cio:$ktorVersion")
implementation("io.ktor:ktor-websockets:$ktorVersion")
implementation("io.ktor:ktor-serialization:$ktorVersion")

View File

@ -1,5 +1,7 @@
package hep.dataforge.control.server
import hep.dataforge.control.sse.SseEvent
import hep.dataforge.control.sse.writeSseFlow
import io.ktor.application.ApplicationCall
import io.ktor.http.CacheControl
import io.ktor.http.ContentType
@ -8,8 +10,6 @@ import io.ktor.response.respondBytesWriter
import io.ktor.util.KtorExperimentalAPI
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.flow.Flow
import ru.mipt.npm.io.sse.SseEvent
import ru.mipt.npm.io.sse.writeSseFlow
/**
* Method that responds an [ApplicationCall] by reading all the [SseEvent]s from the specified [events] [ReceiveChannel]

View File

@ -2,7 +2,6 @@ plugins {
id("ru.mipt.npm.mpp")
}
group = "ru.mipt.npm"
val ktorVersion: String by rootProject.extra
@ -14,7 +13,8 @@ kotlin {
sourceSets {
commonMain {
dependencies {
api("io.ktor:ktor-io:$ktorVersion")
api(project(":dataforge-device-core"))
api("io.ktor:ktor-network:$ktorVersion")
}
}
jvmTest{

View File

@ -1,4 +1,4 @@
package ru.mipt.npm.io.sse
package hep.dataforge.control.sse
import io.ktor.utils.io.*
import kotlinx.coroutines.ExperimentalCoroutinesApi

View File

@ -11,6 +11,7 @@ import io.ktor.network.sockets.openReadChannel
import io.ktor.network.sockets.openWriteChannel
import io.ktor.util.KtorExperimentalAPI
import io.ktor.utils.io.consumeEachBufferRange
import io.ktor.utils.io.core.Closeable
import io.ktor.utils.io.writeAvailable
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
@ -24,7 +25,7 @@ public class KtorTcpPort internal constructor(
public val host: String,
public val port: Int,
coroutineContext: CoroutineContext = context.coroutineContext,
) : AbstractPort(context, coroutineContext), AutoCloseable {
) : AbstractPort(context, coroutineContext), Closeable {
override fun toString(): String = "port[tcp:$host:$port]"

View File

@ -1,4 +1,4 @@
package ru.mipt.npm.io.sse
package hep.dataforge.control.sse
import io.ktor.application.ApplicationCall
import io.ktor.application.call

View File

@ -10,7 +10,7 @@ kotlin {
commonMain {
dependencies {
implementation(project(":dataforge-device-core"))
implementation(project(":ktor-sse"))
implementation(project(":dataforge-device-tcp"))
implementation("io.ktor:ktor-client-core:$ktorVersion")
}
}

View File

@ -3,6 +3,8 @@ package hep.dataforge.control.client
import hep.dataforge.control.controllers.DeviceManager
import hep.dataforge.control.controllers.DeviceMessage
import hep.dataforge.control.controllers.respondMessage
import hep.dataforge.control.sse.SseEvent
import hep.dataforge.control.sse.readSseFlow
import hep.dataforge.meta.toJson
import hep.dataforge.meta.toMeta
import hep.dataforge.meta.wrap
@ -21,8 +23,6 @@ import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
import kotlinx.serialization.json.*
import ru.mipt.npm.io.sse.SseEvent
import ru.mipt.npm.io.sse.readSseFlow
import kotlin.coroutines.CoroutineContext

View File

@ -15,7 +15,7 @@ kotlin{
val ktorVersion: String by rootProject.extra
dependencies {
implementation(project(":dataforge-device-core"))
implementation(project(":dataforge-device-tcp"))
implementation(project(":dataforge-magix-client"))
implementation("no.tornado:tornadofx:1.7.20")
}

View File

@ -3,10 +3,15 @@ package ru.mipt.npm.devices.pimotionmaster
import hep.dataforge.context.Global
import hep.dataforge.control.controllers.DeviceManager
import hep.dataforge.control.controllers.installing
import javafx.beans.property.SimpleBooleanProperty
import javafx.beans.property.ReadOnlyProperty
import javafx.beans.property.SimpleIntegerProperty
import javafx.beans.property.SimpleObjectProperty
import javafx.beans.property.SimpleStringProperty
import javafx.collections.FXCollections
import javafx.scene.Parent
import javafx.scene.layout.Priority
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import tornadofx.*
class PiMotionMasterApp : App(PiMotionMasterView::class)
@ -25,33 +30,98 @@ class PiMotionMasterController : Controller() {
class PiMotionMasterView : View() {
private val controller: PiMotionMasterController by inject()
val device = controller.motionMaster
private val connectedProperty: ReadOnlyProperty<Boolean> = device.connected.fxProperty(device)
private val debugServerJobProperty = SimpleObjectProperty<Job>()
private val debugServerStarted = debugServerJobProperty.booleanBinding { it != null }
private val axisList = FXCollections.observableArrayList<Map.Entry<String, PiMotionMasterDevice.Axis>>()
override val root: Parent = borderpane {
top {
form {
val host = SimpleStringProperty("127.0.0.1")
val port = SimpleIntegerProperty(10024)
val virtual = SimpleBooleanProperty(false)
fieldset("Address:") {
field("Host:") {
textfield(host){
enableWhen(virtual.not())
textfield(host) {
enableWhen(debugServerStarted.not())
}
}
field("Port:") {
textfield(port)
}
field("Virtual device:") {
checkbox(property = virtual)
textfield(port){
stripNonNumeric()
}
button {
hgrow = Priority.ALWAYS
textProperty().bind(debugServerStarted.stringBinding {
if (it != true) {
"Start debug server"
} else {
"Stop debug server"
}
})
action {
if (!debugServerStarted.get()) {
debugServerJobProperty.value =
controller.context.launchPiDebugServer(port.get(), listOf("1", "2"))
} else {
debugServerJobProperty.get().cancel()
debugServerJobProperty.value = null
}
}
}
}
}
button("Connect") {
action {
if(virtual.get()){
controller.context.launchPiDebugServer(port.get(), listOf("1", "2"))
button {
hgrow = Priority.ALWAYS
textProperty().bind(connectedProperty.stringBinding {
if (it == false) {
"Connect"
} else {
"Disconnect"
}
})
action {
if (!connectedProperty.value) {
device.connect(host.get(), port.get())
axisList.addAll(device.axes.entries)
} else {
axisList.removeAll()
device.disconnect()
}
}
}
}
}
center {
listview(axisList) {
cellFormat { (name, axis) ->
hbox {
minHeight = 40.0
label(name)
controller.context.launch {
val min = axis.minPosition.readTyped(true)
val max = axis.maxPosition.readTyped(true)
runLater {
slider(min.toDouble()..max.toDouble()){
hgrow = Priority.ALWAYS
valueProperty().onChange {
isDisable = true
launch {
axis.move(value)
runLater {
isDisable = false
}
}
}
}
}
}
controller.motionMaster.connect(host.get(), port.get())
}
}
}

View File

@ -22,19 +22,68 @@ import kotlin.time.Duration
class PiMotionMasterDevice(
context: Context,
private val portFactory: PortFactory = TcpPort,
private val portFactory: PortFactory = KtorTcpPort,
) : DeviceBase(context), DeviceHub {
override val scope: CoroutineScope = CoroutineScope(
context.coroutineContext + SupervisorJob(context.coroutineContext[Job])
)
private var address: Meta? = null
private var port: Port? = null
//TODO make proxy work
//PortProxy { portFactory(address ?: error("The device is not connected"), context) }
val connected by readingBoolean(false, descriptorBuilder = {
info = "True if the connection address is defined and the device is initialized"
}) {
address != null
port != null
}
val connect: DeviceAction by acting({
info = "Connect to specific port and initialize axis"
}) { portSpec ->
//Clear current actions if present
if (port != null) {
disconnect()
}
//Update port
//address = portSpec.node
port = portFactory(portSpec.node!!, context)
connected.updateLogical(true)
// connector.open()
//Initialize axes
if (portSpec != null) {
val idn = identity.read()
failIfError { "Can't connect to $portSpec. Error code: $it" }
logger.info { "Connected to $idn on $portSpec" }
val ids = request("SAI?").map { it.trim() }
if (ids != axes.keys.toList()) {
//re-define axes if needed
axes = ids.associateWith { Axis(it) }
}
ids.map { it.asValue() }.asValue().asMetaItem()
initialize()
failIfError()
}
}
val disconnect: DeviceAction by acting({
info = "Disconnect the program from the device if it is connected"
}) {
if (port != null) {
stop()
port?.close()
}
port = null
connected.updateLogical(false)
}
fun disconnect() {
runBlocking{
disconnect.invoke()
}
}
val timeout: DeviceProperty by writingVirtual(200.asValue()) {
@ -43,8 +92,6 @@ class PiMotionMasterDevice(
var timeoutValue: Duration by timeout.duration()
private val connector = PortProxy { portFactory(address ?: error("The device is not connected"), context) }
/**
* Name-friendly accessor for axis
*/
@ -58,46 +105,8 @@ class PiMotionMasterDevice(
if (errorCode != 0) error(message(errorCode))
}
val connect: DeviceAction by acting({
info = "Connect to specific port and initialize axis"
}) { portSpec ->
//Clear current actions if present
if (address != null) {
stop()
}
//Update port
address = portSpec.node
connected.invalidate()
connector.open()
//Initialize axes
if (portSpec != null) {
val idn = identity.read()
failIfError { "Can't connect to $portSpec. Error code: $it" }
logger.info { "Connected to $idn on $portSpec" }
val ids = request("SAI?")
if (ids != axes.keys.toList()) {
//re-define axes if needed
axes = ids.associateWith { Axis(it) }
}
ids.map { it.asValue() }.asValue().asMetaItem()
initialize()
failIfError()
}
}
val disconnect: DeviceAction by acting({
info = "Disconnect the program from the device if it is connected"
}) {
connector.close()
if (address != null) {
stop()
}
address = null
connected.invalidate()
}
fun connect(host: String, port: Int) {
scope.launch {
runBlocking {
connect(Meta {
"host" put host
"port" put port
@ -119,14 +128,14 @@ class PiMotionMasterDevice(
arguments.joinToString(prefix = " ", separator = " ", postfix = "")
}
val stringToSend = "$command$joinedArguments\n"
connector.send(stringToSend)
port?.send(stringToSend) ?: error("Not connected to device")
}
suspend fun getErrorCode(): Int = mutex.withLock {
withTimeout(timeoutValue) {
sendCommandInternal("ERR?")
val errorString = connector.receiving().withDelimiter("\n").first()
errorString.toInt()
val errorString = port?.receiving()?.withDelimiter("\n")?.first() ?: error("Not connected to device")
errorString.trim().toInt()
}
}
@ -137,17 +146,12 @@ class PiMotionMasterDevice(
try {
withTimeout(timeoutValue) {
sendCommandInternal(command, *arguments)
val phrases = connector.receiving().withDelimiter("\n")
var lastLineFlag = false
phrases.transformWhile { line ->
if (lastLineFlag) {
false
} else {
emit(line)
lastLineFlag = !line.endsWith(" \n")
true
}
val phrases = port?.receiving()?.withDelimiter("\n") ?: error("Not connected to device")
val list = phrases.transformWhile { line ->
emit(line)
line.endsWith(" \n")
}.toList()
list
}
} catch (ex: Throwable) {
logger.warn { "Error during PIMotionMaster request. Requesting error code." }
@ -273,6 +277,26 @@ class PiMotionMasterDevice(
send("FRF", axisId)
}
val minPosition by readingNumber(
descriptorBuilder = {
info = "Minimal position value for the axis"
},
getter = {
requestAndParse("TMN?", axisId)[axisId]?.toDoubleOrNull()
?: error("Malformed `TMN?` response. Should include float value for $axisId")
}
)
val maxPosition by readingNumber(
descriptorBuilder = {
info = "Maximal position value for the axis"
},
getter = {
requestAndParse("TMX?", axisId)[axisId]?.toDoubleOrNull()
?: error("Malformed `TMX?` response. Should include float value for $axisId")
}
)
val position: TypedDeviceProperty<Double> by axisNumberProperty("POS") {
info = "The current axis position."
}
@ -303,6 +327,10 @@ class PiMotionMasterDevice(
delay(200)
}
}
suspend fun move(target: Double) {
move(target.asMetaItem())
}
}
companion object : DeviceFactory<PiMotionMasterDevice> {

View File

@ -148,6 +148,14 @@ class PiMotionMasterVirtualDevice(
respond(response)
}
private suspend fun doForEachAxis(parts: List<String>, action: suspend (key: String, value: String) -> Unit) {
var i = 0
while (parts.size > 2 * i + 1) {
action(parts[2 * i + 1], parts[2 * i + 2])
i++
}
}
override suspend fun evaluateRequest(request: ByteArray) {
assert(request.last() == '\n'.toByte())
val string = request.decodeToString().substringBefore("\n")
@ -222,7 +230,7 @@ class PiMotionMasterVirtualDevice(
respond(errorCode.toString())
errorCode = 0
}
"SAI?" -> respond(axisIds.joinToString(separator = " \n"))
"SAI?" -> respond(axisState.keys.joinToString(separator = " \n"))
"CST?" -> respondForAllAxis(axisIds) { "L-220.20SG" }
"RON?" -> respondForAllAxis(axisIds) { referenceMode }
"FRF?" -> respondForAllAxis(axisIds) { "1" } // WAT?
@ -233,10 +241,17 @@ class PiMotionMasterVirtualDevice(
"TMX?" -> respondForAllAxis(axisIds) { maxPosition }
"VEL?" -> respondForAllAxis(axisIds) { velocity }
"SRG?" -> respond(WAT)
"SVO" -> {
val requestAxis = parts[1]
val servoMode = parts.last()
axisState[requestAxis]?.servoMode = servoMode.toInt()
"SVO" -> doForEachAxis(parts) { key, value ->
axisState[key]?.servoMode = value.toInt()
}
"MOV" -> doForEachAxis(parts) { key, value ->
axisState[key]?.targetPosition = value.toDouble()
}
"VEL"-> doForEachAxis(parts){key, value ->
axisState[key]?.velocity = value.toDouble()
}
"INI" -> {
logger.info { "Axes initialized!" }
}
else -> {
logger.warn { "Unknown command: $command in message ${String(request)}" }

View File

@ -0,0 +1,56 @@
package ru.mipt.npm.devices.pimotionmaster
import hep.dataforge.control.api.Device
import hep.dataforge.control.base.TypedDeviceProperty
import hep.dataforge.control.base.TypedReadOnlyDeviceProperty
import javafx.beans.property.*
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import tornadofx.*
fun <T : Any> TypedReadOnlyDeviceProperty<T>.fxProperty(ownerDevice: Device?): ReadOnlyProperty<T> =
object : ObjectPropertyBase<T>() {
override fun getBean(): Any? = ownerDevice
override fun getName(): String = this@fxProperty.name
init {
//Read incoming changes
flowTyped().onEach {
if (it != null) {
runLater {
set(it)
}
} else {
invalidated()
}
}.catch {
ownerDevice?.logger?.info { "Failed to set property $name to $it" }
}.launchIn(scope)
}
}
fun <T : Any> TypedDeviceProperty<T>.fxProperty(ownerDevice: Device?): Property<T> =
object : ObjectPropertyBase<T>() {
override fun getBean(): Any? = ownerDevice
override fun getName(): String = this@fxProperty.name
init {
//Read incoming changes
flowTyped().onEach {
if (it != null) {
runLater {
set(it)
}
} else {
invalidated()
}
}.catch {
ownerDevice?.logger?.info { "Failed to set property $name to $it" }
}.launchIn(scope)
onChange {
typedValue = it
}
}
}

View File

@ -3,7 +3,7 @@ pluginManagement {
val toolsVersion = "0.6.3-dev-1.4.20-M1"
repositories {
mavenLocal()
//mavenLocal()
jcenter()
gradlePluginPortal()
maven("https://kotlin.bintray.com/kotlinx")
@ -27,8 +27,8 @@ pluginManagement {
rootProject.name = "dataforge-control"
include(
":ktor-sse",
":dataforge-device-core",
":dataforge-device-tcp",
":dataforge-device-serial",
":dataforge-device-server",
":dataforge-magix-client",