Control update

This commit is contained in:
Alexander Nozik 2018-03-09 10:09:55 +03:00
parent 069c5422e0
commit 0fe0c1f1e1
7 changed files with 197 additions and 262 deletions

View File

@ -223,7 +223,7 @@ class PKT8Device(context: Context, meta: Meta) : PortSensor<PKT8Result>(context,
} }
override fun startMeasurement(oldMeta: Meta, newMeta: Meta) { fun setMeasurement(oldMeta: Meta, newMeta: Meta) {
if (!oldMeta.isEmpty) { if (!oldMeta.isEmpty) {
logger.warn("Trying to start measurement which is already started") logger.warn("Trying to start measurement which is already started")
} }
@ -317,7 +317,7 @@ class PKT8Device(context: Context, meta: Meta) : PortSensor<PKT8Result>(context,
// } // }
// //
// @Throws(MeasurementException::class) // @Throws(MeasurementException::class)
// override fun startMeasurement(): Measurement<PKT8Result> { // override fun setMeasurement(): Measurement<PKT8Result> {
// //clearing PKT queue // //clearing PKT queue
// try { // try {
// send("p") // send("p")
@ -327,7 +327,7 @@ class PKT8Device(context: Context, meta: Meta) : PortSensor<PKT8Result>(context,
// // throw new MeasurementException(e); // // throw new MeasurementException(e);
// } // }
// //
// return super.startMeasurement() // return super.setMeasurement()
// } // }

View File

@ -1,6 +1,7 @@
package inr.numass.control package inr.numass.control
import hep.dataforge.control.devices.AbstractDevice import hep.dataforge.control.devices.AbstractDevice
import hep.dataforge.kodex.nullable
import hep.dataforge.storage.api.TableLoader import hep.dataforge.storage.api.TableLoader
import hep.dataforge.storage.commons.StorageConnection import hep.dataforge.storage.commons.StorageConnection
import hep.dataforge.values.Values import hep.dataforge.values.Values
@ -14,7 +15,7 @@ class StorageHelper(private val device: AbstractDevice, private val loaderFactor
private val loaderMap = HashMap<StorageConnection, TableLoader>() private val loaderMap = HashMap<StorageConnection, TableLoader>()
fun push(point: Values) { fun push(point: Values) {
if (!device.hasState("storing") || device.getState("storing").booleanValue()) { if (device.optBooleanState("storing").nullable == true) {
device.forEachConnection("storage", StorageConnection::class.java) { connection -> device.forEachConnection("storage", StorageConnection::class.java) { connection ->
try { try {
val pl = loaderMap.computeIfAbsent(connection, loaderFactory) val pl = loaderMap.computeIfAbsent(connection, loaderFactory)

View File

@ -6,10 +6,7 @@
package inr.numass.control.readvac package inr.numass.control.readvac
import hep.dataforge.context.Context import hep.dataforge.context.Context
import hep.dataforge.control.devices.Device
import hep.dataforge.control.devices.PortSensor import hep.dataforge.control.devices.PortSensor
import hep.dataforge.control.measurements.Measurement
import hep.dataforge.control.measurements.SimpleMeasurement
import hep.dataforge.control.ports.ComPort import hep.dataforge.control.ports.ComPort
import hep.dataforge.control.ports.GenericPortController import hep.dataforge.control.ports.GenericPortController
import hep.dataforge.control.ports.Port import hep.dataforge.control.ports.Port
@ -31,51 +28,36 @@ class CM32Device(context: Context, meta: Meta) : PortSensor<Double>(context, met
} else { } else {
PortFactory.build(meta) PortFactory.build(meta)
} }
return GenericPortController(context, port){it.endsWith("T--\r")} return GenericPortController(context, port) { it.endsWith("T--\r") }
} }
override fun setMeasurement(oldMeta: Meta?, newMeta: Meta) {
override fun startMeasurement(oldMeta: Meta, newMeta: Meta) { startMeasurement {
TODO("not implemented") //To change body of created functions use File | Settings | File Templates. doMeasure()
}
} }
override fun createMeasurement(): Measurement<Double> = CMVacMeasurement() private fun doMeasure(): Meta{
override fun getType(): String {
return getMeta().getString("type", "Leibold CM32")
}
private inner class CMVacMeasurement : SimpleMeasurement<Double>() {
@Synchronized
@Throws(Exception::class)
override fun doMeasure(): Double? {
val answer = sendAndWait("MES R PM 1\r\n") val answer = sendAndWait("MES R PM 1\r\n")
if (answer.isEmpty()) { return if (answer.isEmpty()) {
this.updateMessage("No signal")
updateLogicalState(PortSensor.CONNECTED_STATE, false) updateLogicalState(PortSensor.CONNECTED_STATE, false)
return null produceError("No signal")
} else if (!answer.contains("PM1:mbar")) { } else if (!answer.contains("PM1:mbar")) {
this.updateMessage("Wrong answer: " + answer)
updateLogicalState(PortSensor.CONNECTED_STATE, false) updateLogicalState(PortSensor.CONNECTED_STATE, false)
return null produceError("Wrong answer: $answer")
} else if (answer.substring(14, 17) == "OFF") { } else if (answer.substring(14, 17) == "OFF") {
this.updateMessage("Off")
updateLogicalState(PortSensor.CONNECTED_STATE, true) updateLogicalState(PortSensor.CONNECTED_STATE, true)
return null produceError("Off")
} else { } else {
this.updateMessage("OK")
updateLogicalState(PortSensor.CONNECTED_STATE, true) updateLogicalState(PortSensor.CONNECTED_STATE, true)
return java.lang.Double.parseDouble(answer.substring(14, 17) + answer.substring(19, 23)) produceResult(answer.substring(14, 17) + answer.substring(19, 23))
} }
} }
override fun getDevice(): Device = this@CM32Device override fun getType(): String {
return meta.getString("type", "numass.vac.cm32")
} }
} }

View File

@ -6,14 +6,15 @@
package inr.numass.control.readvac package inr.numass.control.readvac
import hep.dataforge.context.Context import hep.dataforge.context.Context
import hep.dataforge.control.devices.Device
import hep.dataforge.control.devices.PortSensor import hep.dataforge.control.devices.PortSensor
import hep.dataforge.control.measurements.Measurement import hep.dataforge.control.devices.intState
import hep.dataforge.control.measurements.SimpleMeasurement import hep.dataforge.control.ports.GenericPortController
import hep.dataforge.control.ports.Port import hep.dataforge.control.ports.Port
import hep.dataforge.control.ports.PortFactory
import hep.dataforge.description.ValueDef import hep.dataforge.description.ValueDef
import hep.dataforge.exceptions.ControlException
import hep.dataforge.meta.Meta import hep.dataforge.meta.Meta
import hep.dataforge.states.StateDef
import hep.dataforge.values.ValueType
import inr.numass.control.DeviceView import inr.numass.control.DeviceView
/** /**
@ -21,52 +22,41 @@ import inr.numass.control.DeviceView
*/ */
@ValueDef(name = "channel") @ValueDef(name = "channel")
@DeviceView(VacDisplay::class) @DeviceView(VacDisplay::class)
@StateDef(value = ValueDef(name = "channel", type = [ValueType.NUMBER], def = "2"), writable = true)
class MKSBaratronDevice(context: Context, meta: Meta) : PortSensor<Double>(context, meta) { class MKSBaratronDevice(context: Context, meta: Meta) : PortSensor<Double>(context, meta) {
private val channel: Int = getMeta().getInt("channel", 2) var channel by intState("channel")
override fun createMeasurement(): Measurement<Double> = BaratronMeasurement()
override fun getType(): String { override fun getType(): String {
return getMeta().getString("type", "MKS baratron") return meta.getString("type", "numass.vac.baratron")
} }
@Throws(ControlException::class) override fun connect(meta: Meta): GenericPortController {
override fun buildPort(portName: String): Port { val port: Port = PortFactory.build(meta)
val handler = super.buildPort(portName) logger.info("Connecting to port {}", port.name)
handler.setDelimiter("\r") return GenericPortController(context, port) { it.endsWith("\r") }
return handler
} }
private inner class BaratronMeasurement : SimpleMeasurement<Double>() { override fun setMeasurement(oldMeta: Meta?, newMeta: Meta) {
startMeasurement(newMeta) {
override fun getDevice(): Device { doMeasure()
return this@MKSBaratronDevice }
} }
@Synchronized private fun doMeasure(): Meta {
@Throws(Exception::class) val answer = sendAndWait("AV$channel\r")
override fun doMeasure(): Double? { if (answer.isEmpty()) {
val answer = sendAndWait("AV" + channel + "\r")
if (answer == null || answer.isEmpty()) {
// invalidateState("connection"); // invalidateState("connection");
updateLogicalState(PortSensor.CONNECTED_STATE, false) updateLogicalState(PortSensor.CONNECTED_STATE, false)
this.updateMessage("No connection") return produceError("No connection")
return null
} else { } else {
updateLogicalState(PortSensor.CONNECTED_STATE, true) updateLogicalState(PortSensor.CONNECTED_STATE, true)
} }
val res = java.lang.Double.parseDouble(answer) val res = java.lang.Double.parseDouble(answer)
if (res <= 0) { return if (res <= 0) {
this.updateMessage("Non positive") produceError("Non positive")
// invalidateState("power");
return null
} else { } else {
this.updateMessage("OK") produceResult(res)
return res
} }
} }
}
} }

View File

@ -6,11 +6,11 @@
package inr.numass.control.readvac package inr.numass.control.readvac
import hep.dataforge.context.Context import hep.dataforge.context.Context
import hep.dataforge.control.devices.Device
import hep.dataforge.control.devices.PortSensor import hep.dataforge.control.devices.PortSensor
import hep.dataforge.control.measurements.Measurement import hep.dataforge.control.devices.booleanState
import hep.dataforge.control.measurements.SimpleMeasurement import hep.dataforge.control.ports.GenericPortController
import hep.dataforge.control.ports.Port import hep.dataforge.control.ports.Port
import hep.dataforge.control.ports.PortFactory
import hep.dataforge.description.ValueDef import hep.dataforge.description.ValueDef
import hep.dataforge.description.ValueDefs import hep.dataforge.description.ValueDefs
import hep.dataforge.exceptions.ControlException import hep.dataforge.exceptions.ControlException
@ -40,13 +40,7 @@ class MKSVacDevice(context: Context, meta: Meta) : PortSensor<Double>(context, m
private val deviceAddress: String = meta.getString("address", "253") private val deviceAddress: String = meta.getString("address", "253")
var power: Boolean by booleanState("power")
// val isPowerOnProperty = SimpleBooleanProperty()
// var isPowerOn by isPowerOnProperty
// val channelProperty = SimpleIntegerProperty(meta.getInt("channel", 5))
// var channel by channelProperty
@Throws(ControlException::class) @Throws(ControlException::class)
@ -61,15 +55,12 @@ class MKSVacDevice(context: Context, meta: Meta) : PortSensor<Double>(context, m
} }
} }
@Throws(ControlException::class) override fun connect(meta: Meta): GenericPortController {
override fun buildPort(portName: String): Port { val port: Port = PortFactory.build(meta)
val handler = super.buildPort(portName) logger.info("Connecting to port {}", port.name)
handler.setDelimiter(";FF") return GenericPortController(context, port) { it.endsWith(";FF") }
return handler
} }
override fun createMeasurement(): Measurement<Double> = MKSVacMeasurement()
@Throws(ControlException::class) @Throws(ControlException::class)
override fun computeState(stateName: String): Any = when (stateName) { override fun computeState(stateName: String): Any = when (stateName) {
"power" -> talk("FP?") == "ON" "power" -> talk("FP?") == "ON"
@ -79,7 +70,7 @@ class MKSVacDevice(context: Context, meta: Meta) : PortSensor<Double>(context, m
@Throws(ControlException::class) @Throws(ControlException::class)
override fun requestStateChange(stateName: String, value: Value) { override fun requestStateChange(stateName: String, value: Value) {
when (stateName) { when (stateName) {
"power" -> setPower(value.booleanValue()) "power" -> setPowerOn(value.booleanValue())
else -> super.requestStateChange(stateName, value) else -> super.requestStateChange(stateName, value)
} }
@ -93,18 +84,8 @@ class MKSVacDevice(context: Context, meta: Meta) : PortSensor<Double>(context, m
super.shutdown() super.shutdown()
} }
// fun powerOnProperty(): BooleanProperty { private fun setPowerOn(powerOn: Boolean) {
// try { if (powerOn != power) {
// return JavaBeanBooleanPropertyBuilder().bean(this)
// .name("powerOn").getter("isPowerOn").setter("setPower").build()
// } catch (ex: NoSuchMethodException) {
// throw Error(ex)
// }
//
// }
private fun setPower(powerOn: Boolean) {
if (powerOn != getLogicalState("power").booleanValue()) {
if (powerOn) { if (powerOn) {
// String ans = talkMKS(p1Port, "@253ENC!OFF;FF"); // String ans = talkMKS(p1Port, "@253ENC!OFF;FF");
// if (!ans.equals("OFF")) { // if (!ans.equals("OFF")) {
@ -112,47 +93,44 @@ class MKSVacDevice(context: Context, meta: Meta) : PortSensor<Double>(context, m
// } // }
val ans = talk("FP!ON") val ans = talk("FP!ON")
if (ans == "ON") { if (ans == "ON") {
setLogicalState("power", true) updateLogicalState("power", true)
} else { } else {
this.notifyError("Failed to set power state", null) this.notifyError("Failed to set power state")
} }
} else { } else {
val ans = talk("FP!OFF") val ans = talk("FP!OFF")
if (ans == "OFF") { if (ans == "OFF") {
setLogicalState("power", false) updateLogicalState("power", false)
} else { } else {
this.notifyError("Failed to set power state", null) this.notifyError("Failed to set power state")
} }
} }
} }
} }
override fun getType(): String = meta.getString("type", "MKS vacuumeter") override fun getType(): String = meta.getString("type", "numass.vac.mks")
private inner class MKSVacMeasurement : SimpleMeasurement<Double>() { override fun setMeasurement(oldMeta: Meta?, newMeta: Meta) {
startMeasurement(newMeta) {
doMeasure()
}
}
@Synchronized private fun doMeasure(): Meta {
@Throws(Exception::class)
override fun doMeasure(): Double? {
// if (getState("power").booleanValue()) { // if (getState("power").booleanValue()) {
val channel = meta.getInt("channel", 5) val channel = meta.getInt("channel", 5)
val answer = talk("PR$channel?") val answer = talk("PR$channel?")
if (answer == null || answer.isEmpty()) { if (answer == null || answer.isEmpty()) {
updateLogicalState(PortSensor.CONNECTED_STATE, false) updateLogicalState(PortSensor.CONNECTED_STATE, false)
this.updateMessage("No connection") return produceError("No connection")
return null
} }
val res = parseDouble(answer) val res = parseDouble(answer)
return if (res <= 0) { return if (res <= 0) {
this.updateMessage("No power")
updateLogicalState("power", false) updateLogicalState("power", false)
null produceError("No power")
} else { } else {
this.updateMessage("OK") this.updateMessage("OK")
res produceResult(res)
} }
} }
override fun getDevice(): Device = this@MKSVacDevice
}
} }

View File

@ -6,14 +6,14 @@
package inr.numass.control.readvac package inr.numass.control.readvac
import hep.dataforge.context.Context import hep.dataforge.context.Context
import hep.dataforge.control.devices.Device
import hep.dataforge.control.devices.PortSensor import hep.dataforge.control.devices.PortSensor
import hep.dataforge.control.measurements.Measurement import hep.dataforge.control.devices.intState
import hep.dataforge.control.measurements.SimpleMeasurement import hep.dataforge.control.ports.GenericPortController
import hep.dataforge.control.ports.Port import hep.dataforge.control.ports.Port
import hep.dataforge.control.ports.PortFactory
import hep.dataforge.description.ValueDef import hep.dataforge.description.ValueDef
import hep.dataforge.exceptions.ControlException
import hep.dataforge.meta.Meta import hep.dataforge.meta.Meta
import hep.dataforge.states.StateDef
import hep.dataforge.values.ValueType.NUMBER import hep.dataforge.values.ValueType.NUMBER
import java.math.BigDecimal import java.math.BigDecimal
import java.math.BigInteger import java.math.BigInteger
@ -23,50 +23,44 @@ import java.util.regex.Pattern
/** /**
* @author Alexander Nozik * @author Alexander Nozik
*/ */
@ValueDef(name = "address", type = arrayOf(NUMBER), def = "1", info = "A modbus address") @StateDef(value = ValueDef(name = "address", type = [NUMBER], def = "1", info = "A modbus address"), writable = true)
class MeradatVacDevice(context: Context, meta: Meta) : PortSensor<Double>(context, meta) { class MeradatVacDevice(context: Context, meta: Meta) : PortSensor<Double>(context, meta) {
@Throws(ControlException::class) var address by intState("address")
override fun buildPort(portName: String): Port {
val newHandler = super.buildPort(portName)
newHandler.setDelimiter("\r\n")
return newHandler
}
override fun createMeasurement(): Measurement<Double> = MeradatMeasurement() override fun connect(meta: Meta): GenericPortController {
val port: Port = PortFactory.build(meta)
logger.info("Connecting to port {}", port.name)
return GenericPortController(context, port) { it.endsWith("\r\n") }
}
override fun getType(): String { override fun getType(): String {
return getMeta().getString("type", "Vit vacuumeter") return meta.getString("type", "numass.vac.vit")
}
override fun setMeasurement(oldMeta: Meta?, newMeta: Meta) {
startMeasurement(newMeta) {
doMeasure()
}
} }
private inner class MeradatMeasurement : SimpleMeasurement<Double>() { private fun doMeasure(): Meta {
val requestBase: String = String.format(":%02d", address)
val dataStr = requestBase.substring(1) + REQUEST
val query = requestBase + REQUEST + calculateLRC(dataStr) + "\r\n" // ":010300000002FA\r\n";
val response: Pattern = Pattern.compile(requestBase + "0304(\\w{4})(\\w{4})..\r\n")
private val query: String // ":010300000002FA\r\n"; val answer = sendAndWait(query) { phrase -> phrase.startsWith(requestBase) }
private val response: Pattern
private val base: String
init {
base = String.format(":%02d", getMeta().getInt("address", 1))
val dataStr = base.substring(1) + REQUEST
query = base + REQUEST + calculateLRC(dataStr) + "\r\n"
response = Pattern.compile(base + "0304(\\w{4})(\\w{4})..\r\n")
}
@Synchronized
@Throws(Exception::class)
override fun doMeasure(): Double? {
val answer = sendAndWait(query) { phrase -> phrase.startsWith(base) }
if (answer.isEmpty()) { if (answer.isEmpty()) {
this.updateMessage("No signal")
updateLogicalState(PortSensor.CONNECTED_STATE, false) updateLogicalState(PortSensor.CONNECTED_STATE, false)
return null return produceError("No signal")
} else { } else {
val match = response.matcher(answer) val match = response.matcher(answer)
if (match.matches()) { return if (match.matches()) {
val base = Integer.parseInt(match.group(1), 16).toDouble() / 10.0 val base = Integer.parseInt(match.group(1), 16).toDouble() / 10.0
var exp = Integer.parseInt(match.group(2), 16) var exp = Integer.parseInt(match.group(2), 16)
if (exp > 32766) { if (exp > 32766) {
@ -74,22 +68,18 @@ class MeradatVacDevice(context: Context, meta: Meta) : PortSensor<Double>(contex
} }
var res = BigDecimal.valueOf(base * Math.pow(10.0, exp.toDouble())) var res = BigDecimal.valueOf(base * Math.pow(10.0, exp.toDouble()))
res = res.setScale(4, RoundingMode.CEILING) res = res.setScale(4, RoundingMode.CEILING)
this.updateMessage("OK")
updateLogicalState(PortSensor.CONNECTED_STATE, true) updateLogicalState(PortSensor.CONNECTED_STATE, true)
return res.toDouble() produceResult(res)
} else { } else {
this.updateMessage("Wrong answer: " + answer)
updateLogicalState(PortSensor.CONNECTED_STATE, false) updateLogicalState(PortSensor.CONNECTED_STATE, false)
return null produceError("Wrong answer: $answer")
} }
} }
} }
override fun getDevice(): Device = this@MeradatVacDevice
}
companion object { companion object {
private val REQUEST = "0300000002" private const val REQUEST = "0300000002"
fun calculateLRC(inputString: String): String { fun calculateLRC(inputString: String): String {
/* /*
@ -100,7 +90,7 @@ class MeradatVacDevice(context: Context, meta: Meta) : PortSensor<Double>(contex
var value = Integer.toHexString(-checksum) var value = Integer.toHexString(-checksum)
value = value.substring(value.length - 2).toUpperCase() value = value.substring(value.length - 2).toUpperCase()
if (value.length < 2) { if (value.length < 2) {
value = "0" + value value = "0$value"
} }
return value return value

View File

@ -12,9 +12,9 @@ import hep.dataforge.control.collectors.RegularPointCollector
import hep.dataforge.control.connections.Roles import hep.dataforge.control.connections.Roles
import hep.dataforge.control.devices.Device import hep.dataforge.control.devices.Device
import hep.dataforge.control.devices.DeviceHub import hep.dataforge.control.devices.DeviceHub
import hep.dataforge.control.devices.DeviceListener
import hep.dataforge.control.devices.PortSensor.Companion.CONNECTED_STATE
import hep.dataforge.control.devices.Sensor import hep.dataforge.control.devices.Sensor
import hep.dataforge.control.measurements.AbstractMeasurement
import hep.dataforge.control.measurements.Measurement
import hep.dataforge.description.ValueDef import hep.dataforge.description.ValueDef
import hep.dataforge.exceptions.ControlException import hep.dataforge.exceptions.ControlException
import hep.dataforge.meta.Meta import hep.dataforge.meta.Meta
@ -31,13 +31,10 @@ import hep.dataforge.values.ValueType
import hep.dataforge.values.Values import hep.dataforge.values.Values
import inr.numass.control.DeviceView import inr.numass.control.DeviceView
import inr.numass.control.StorageHelper import inr.numass.control.StorageHelper
import kotlinx.coroutines.experimental.launch
import kotlinx.coroutines.experimental.time.delay
import java.time.Duration import java.time.Duration
import java.time.Instant
import java.util.* import java.util.*
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
import java.util.stream.Stream import java.util.stream.Stream
/** /**
@ -50,8 +47,22 @@ class VacCollectorDevice(context: Context, meta: Meta, val sensors: Collection<S
private val helper = StorageHelper(this, this::buildLoader) private val helper = StorageHelper(this, this::buildLoader)
private val averagingDuration: Duration private val collector = object : DeviceListener {
get() = Duration.parse(getMeta().getString("averagingDuration", "PT30S")) val averagingDuration: Duration = Duration.parse(meta.getString("averagingDuration", "PT30S"))
private val collector = RegularPointCollector(averagingDuration) {
notifyResult(it)
}
override fun notifyDeviceStateChanged(device: Device, name: String, state: Value) {
}
override fun notifyDeviceStateChanged(device: Device, name: String, state: Meta) {
if (name == MEASUREMENT_RESULT_STATE) {
collector.put(device.name, state.getValue("value"))
}
}
}
override fun optDevice(name: Name): Optional<Device> = override fun optDevice(name: Name): Optional<Device> =
@ -66,8 +77,6 @@ class VacCollectorDevice(context: Context, meta: Meta, val sensors: Collection<S
s.init() s.init()
} }
} }
//TODO use meta
override fun createMeasurement(): Measurement<Values> = VacuumMeasurement()
override fun getType(): String = "numass.vac.collector" override fun getType(): String = "numass.vac.collector"
@ -87,7 +96,7 @@ class VacCollectorDevice(context: Context, meta: Meta, val sensors: Collection<S
val suffix = DateTimeUtils.fileSuffix() val suffix = DateTimeUtils.fileSuffix()
return LoaderFactory.buildPointLoader(connection.storage, "vactms_" + suffix, "", "timestamp", format.build()) return LoaderFactory.buildPointLoader(connection.storage, "vactms_$suffix", "", "timestamp", format.build())
} }
override fun connectAll(connection: Connection, vararg roles: String) { override fun connectAll(connection: Connection, vararg roles: String) {
@ -100,61 +109,46 @@ class VacCollectorDevice(context: Context, meta: Meta, val sensors: Collection<S
this.sensors.forEach { it.connectionHelper.connect(context, meta) } this.sensors.forEach { it.connectionHelper.connect(context, meta) }
} }
private inner class VacuumMeasurement : AbstractMeasurement<Values>() {
private val collector = RegularPointCollector(averagingDuration) { this.result(it) } private fun notifyResult(values: Values) {
private var executor: ScheduledExecutorService? = null notifyResult(produceResult(values.toMeta()))
private var currentTask: ScheduledFuture<*>? = null helper.push(values)
override fun getDevice(): Device {
return this@VacCollectorDevice
} }
override fun onStateChange(stateName: String, oldState: Value?, newState: Value) {
if (stateName == MEASURING_STATE) {
if (!newState.booleanValue()) {
notifyResult(terminator())
}
}
}
override fun start() { override fun setMeasurement(oldMeta: Meta?, newMeta: Meta) {
executor = Executors.newSingleThreadScheduledExecutor { r: Runnable -> Thread(r, "VacuumMeasurement thread") } oldMeta?.let {
val delay = getMeta().getInt("delay", 5)!! * 1000 stopMeasurement()
currentTask = executor!!.scheduleWithFixedDelay({ }
val interval = Duration.ofSeconds(meta.getInt("delay", 5).toLong())
job = launch {
while (true) {
notifyMeasurementState(MeasurementState.IN_PROGRESS)
sensors.forEach { sensor -> sensors.forEach { sensor ->
try { if (sensor.optBooleanState(CONNECTED_STATE).orElse(false)) {
val value: Any? sensor.measure()
value = if (sensor.optBooleanState(CONNECTED_STATE).orElse(false)) {
sensor.read()
} else {
null
}
collector.put(sensor.name, value)
} catch (ex: Exception) {
collector.put(sensor.name, Value.NULL)
} }
} }
}, 0, delay.toLong(), TimeUnit.MILLISECONDS) notifyMeasurementState(MeasurementState.WAITING)
delay(interval)
}
} }
@Synchronized override fun result(result: Values, time: Instant) {
super.result(result, time)
helper.push(result)
} }
private fun terminator(): Values { private fun terminator(): Values {
val p = ValueMap.Builder() val p = ValueMap.Builder()
p.putValue("timestamp", DateTimeUtils.now()) deviceNames.forEach { n -> p.putValue(n.toUnescaped(), null) }
getDeviceNames().forEach { n -> p.putValue(n.toUnescaped(), null) }
return p.build() return p.build()
} }
override fun stop(force: Boolean): Boolean {
val isRunning = currentTask != null
if (isRunning) {
logger.debug("Stopping vacuum collector measurement. Writing terminator point")
result(terminator())
currentTask!!.cancel(force)
executor!!.shutdown()
currentTask = null
afterStop()
}
return isRunning
}
}
} }