[no commit message]

This commit is contained in:
Alexander Nozik 2016-02-03 20:51:26 +03:00
parent ac7577a430
commit 2503e22b38
7 changed files with 358 additions and 219 deletions

View File

@ -17,21 +17,25 @@ package inr.numass.control.msp;
import hep.dataforge.context.Context;
import hep.dataforge.context.GlobalContext;
import hep.dataforge.control.measurements.DataDevice;
import hep.dataforge.control.devices.SingleMeasurementDevice;
import hep.dataforge.control.measurements.AbstractMeasurement;
import hep.dataforge.control.measurements.Measurement;
import hep.dataforge.control.ports.PortHandler;
import hep.dataforge.control.ports.TcpPortHandler;
import hep.dataforge.data.DataFormat;
import hep.dataforge.data.DataFormatBuilder;
import hep.dataforge.data.DataPoint;
import hep.dataforge.data.MapDataPoint;
import hep.dataforge.exceptions.ControlException;
import hep.dataforge.exceptions.MeasurementException;
import hep.dataforge.exceptions.PortException;
import hep.dataforge.exceptions.StorageException;
import hep.dataforge.meta.Meta;
import hep.dataforge.storage.api.PointLoader;
import hep.dataforge.storage.api.Storage;
import hep.dataforge.storage.commons.LoaderFactory;
import hep.dataforge.storage.commons.StoragePlugin;
import hep.dataforge.storage.loaders.ChainPointLoader;
import hep.dataforge.values.Value;
import java.time.Instant;
import java.util.ArrayList;
import java.util.LinkedHashMap;
@ -39,13 +43,14 @@ import java.util.List;
import java.util.Map;
import java.util.Scanner;
import java.util.concurrent.ConcurrentSkipListMap;
import org.slf4j.LoggerFactory;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
*
* @author Alexander Nozik
*/
public class MspDevice extends DataDevice implements PortHandler.PortController {
public class MspDevice extends SingleMeasurementDevice {
// private static final String PEAK_SET_PATH = "peakJump.peak";
private static final int TIMEOUT = 200;
@ -69,9 +74,6 @@ public class MspDevice extends DataDevice implements PortHandler.PortController
private boolean isFilamentOn = false;
// private boolean isScanning = false;
private final Map<Integer, Double> measurement = new ConcurrentSkipListMap<>();
private Map<Integer, String> peakMap;
public MspDevice(String name, Meta annotation) {
super(name, GlobalContext.instance(), annotation);
}
@ -90,46 +92,76 @@ public class MspDevice extends DataDevice implements PortHandler.PortController
handler.setDelimeter("\r\r");
}
@Override
protected Measurement createMeasurement(Meta meta) throws ControlException {
switch (meta.getString("type", "peakJump")) {
case "peakJump":
return new PeakJumpMeasurement(meta);
default:
throw new ControlException("Unknown measurement type");
}
}
@Override
protected Object calculateState(String stateName) throws ControlException {
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
}
@Override
public String type() {
return "MKS E-Vision";
}
@Override
protected boolean applyState(String stateName, Value stateValue) throws ControlException {
}
/**
* Startup MSP: get available sensors, select sensor and control.
*
* @param measurement
* @throws hep.dataforge.exceptions.PortException
*/
@Override
public void doStart(Meta measurement) throws ControlException {
if (!isControlled) {
if (handler.isLocked()) {
LoggerFactory.getLogger(getClass()).error("Trying to init MSP controller on locked port. Breaking the lock.");
handler.breakHold();
}
handler.holdBy(this);
MspResponse response = sendAndWait("Sensors");
if (response.isOK()) {
this.sensorName = response.get(2, 1);
} else {
error(response.errorDescription(), null);
return;
}
//PENDING определеить в конфиге номер прибора
public void setConnected(boolean connected) throws ControlException {
if (getState("connection").booleanValue() != connected) {
// if (handler.isLocked()) {
// LoggerFactory.getLogger(getClass()).error("Trying to init MSP controller on locked port. Breaking the lock.");
// handler.breakHold();
// }
// handler.holdBy(this);
if (connected) {
MspResponse response = sendAndWait("Sensors");
if (response.isOK()) {
this.sensorName = response.get(2, 1);
} else {
evaluateError(response.errorDescription(), null);
return;
}
//PENDING определеить в конфиге номер прибора
response = sendAndWait("Select", sensorName);
if (response.isOK()) {
this.isSelected = true;
response = sendAndWait("Select", sensorName);
if (response.isOK()) {
this.isSelected = true;
} else {
evaluateError(response.errorDescription(), null);
return;
}
response = sendAndWait("Control", "inr.numass.msp", "1.0");
if (response.isOK()) {
this.isControlled = true;
} else {
evaluateError(response.errorDescription(), null);
}
updateState("", TIMEOUT);
} else {
error(response.errorDescription(), null);
return;
}
response = sendAndWait("Control", "inr.numass.msp", "1.0");
if (response.isOK()) {
this.isControlled = true;
} else {
error(response.errorDescription(), null);
}
}
createPeakJumpMeasurement(buildMeasurementLaminate(measurement).getNode("peakJump"));
this.peakJumpLoader = getPeakJumpLoader(measurement);
// createPeakJumpMeasurement(buildMeasurementLaminate(measurement).getNode("peakJump"));
// this.peakJumpLoader = getPeakJumpLoader(measurement);
}
// public void setStorageConfig(Meta storageConfig, List<String> peaks) throws StorageException {
@ -138,7 +170,6 @@ public class MspDevice extends DataDevice implements PortHandler.PortController
//
// this.peakJumpLoader = LoaderFactory.buildPointLoder(storage, "msp" + suffix, "", "timestamp", DataFormat.forNames(10, peaks));
// }
private PointLoader getPeakJumpLoader(Meta measurement) {
if (peakJumpLoader == null) {
try {
@ -158,13 +189,11 @@ public class MspDevice extends DataDevice implements PortHandler.PortController
DataFormat format = builder.build();
//TODO Переделать!!!
String run = meta().getString("numass.run","");
String run = meta().getString("numass.run", "");
String suffix = Integer.toString((int) Instant.now().toEpochMilli());
peakJumpLoader = LoaderFactory
.buildPointLoder(primaryStorage, "msp" + suffix, run, "timestamp", format);
try {
Storage secondaryStorage = getSecondaryStorage(measurement);
@ -190,6 +219,13 @@ public class MspDevice extends DataDevice implements PortHandler.PortController
this.mspListener = listener;
}
/**
* Send request to the msp
*
* @param command
* @param parameters
* @throws PortException
*/
private void send(String command, Object... parameters) throws PortException {
String request = buildCommand(command, parameters);
if (mspListener != null) {
@ -198,6 +234,13 @@ public class MspDevice extends DataDevice implements PortHandler.PortController
handler.send(request);
}
/**
* A helper method to build msp command string
*
* @param command
* @param parameters
* @return
*/
private String buildCommand(String command, Object... parameters) {
StringBuilder builder = new StringBuilder(command);
for (Object par : parameters) {
@ -208,7 +251,7 @@ public class MspDevice extends DataDevice implements PortHandler.PortController
}
/**
* Send specific command and wait for its results (the result must begin
* Send specific command and wait for its results (the onResult must begin
* with command name)
*
* @param commandName
@ -231,72 +274,6 @@ public class MspDevice extends DataDevice implements PortHandler.PortController
return new MspResponse(response);
}
@Override
public synchronized void accept(String message) {
if (mspListener != null) {
mspListener.acceptMessage(message.trim());
}
MspResponse response = new MspResponse(message);
switch (response.getCommandName()) {
// all possible async messages
case "FilamentStatus":
String status = response.get(0, 2);
isFilamentOn = status.equals("ON");
if (this.mspListener != null) {
this.mspListener.acceptFillamentStateChange(status);
}
break;
case "MassReading":
double mass = Double.parseDouble(response.get(0, 1));
double value = Double.parseDouble(response.get(0, 2));
this.measurement.put((int) Math.floor(mass + 0.5), value);
break;
case "StartingScan":
if (this.mspListener != null && !measurement.isEmpty() && isFilamentOn) {
if (peakMap == null) {
throw new IllegalStateException("Peal map is not initialized");
}
mspListener.acceptMeasurement(measurement);
Instant time = Instant.now();
MapDataPoint point = new MapDataPoint();
point.putValue("timestamp", time);
for (Map.Entry<Integer, Double> entry : measurement.entrySet()) {
double val = entry.getValue();
point.putValue(peakMap.get(entry.getKey()), val);
}
if (peakJumpLoader != null) {
try {
peakJumpLoader.push(point);
} catch (StorageException ex) {
getLogger().error("Push to repo failed", ex);
}
}
}
measurement.clear();
int numScans = Integer.parseInt(response.get(0, 3));
if (numScans == 0) {
try {
send("ScanResume", 2);
//FIXME обработать ошибку связи
} catch (PortException ex) {
error(null, ex);
}
}
break;
}
}
public boolean isSelected() {
return isSelected;
}
@ -328,67 +305,65 @@ public class MspDevice extends DataDevice implements PortHandler.PortController
}
}
/**
* Create measurement with parameters and return its name
*
* @param an
* @return
* @throws hep.dataforge.exceptions.PortException
*/
private void createPeakJumpMeasurement(Meta an) throws ControlException {
String name = "peakJump";//an.getString("measurementNAmname", "default");
String filterMode = an.getString("filterMode", "PeakAverage");
int accuracy = an.getInt("accuracy", 5);
//PENDING вставить остальные параметры?
sendAndWait("MeasurementRemove", name);
if (sendAndWait("AddPeakJump", name, filterMode, accuracy, 0, 0, 0).isOK()) {
peakMap = new LinkedHashMap<>();
for (Meta peak : an.getNodes("peak")) {
peakMap.put(peak.getInt("mass"), peak.getString("name", peak.getString("mass")));
if (!sendAndWait("MeasurementAddMass", peak.getString("mass")).isOK()) {
throw new ControlException("Can't add mass to measurement measurement for msp");
}
}
} else {
throw new ControlException("Can't create measurement for msp");
}
}
public boolean startPeakJumpMeasurement() throws PortException {
if (!isFilamentOn()) {
error("Can't start measurement. Filament is not turned on.", null);
}
if (!sendAndWait("ScanAdd", "peakJump").isOK()) {
return false;
}
return sendAndWait("ScanStart", 2).isOK();
}
public boolean stopMeasurement() throws PortException {
return sendAndWait("ScanStop").isOK();
}
// /**
// * Create measurement with parameters and return its name
// *
// * @param an
// * @return
// * @throws hep.dataforge.exceptions.PortException
// */
// private void createPeakJumpMeasurement(Meta an) throws ControlException {
// String name = "peakJump";//an.getString("measurementNAmname", "default");
// String filterMode = an.getString("filterMode", "PeakAverage");
// int accuracy = an.getInt("accuracy", 5);
// //PENDING вставить остальные параметры?
// sendAndWait("MeasurementRemove", name);
// if (sendAndWait("AddPeakJump", name, filterMode, accuracy, 0, 0, 0).isOK()) {
// peakMap = new LinkedHashMap<>();
// for (Meta peak : an.getNodes("peak")) {
// peakMap.put(peak.getInt("mass"), peak.getString("name", peak.getString("mass")));
// if (!sendAndWait("MeasurementAddMass", peak.getString("mass")).isOK()) {
// throw new ControlException("Can't add mass to measurement measurement for msp");
// }
// }
// } else {
// throw new ControlException("Can't create measurement for msp");
// }
// }
@Override
public void doStop() throws PortException {
stopMeasurement();
public void shutdown() throws ControlException {
super.shutdown();
super.stopMeasurement(true);
setFileamentOn(false);
sendAndWait("Release");
handler.unholdBy(this);
handler.close();
}
@Override
public void error(String errorMessage, Throwable error) {
/**
* Evaluate general async messages
*
* @param response
*/
private void evaluateResponse(MspResponse response) {
switch (response.getCommandName()) {
// all possible async messages
case "FilamentStatus":
String status = response.get(0, 2);
isFilamentOn = status.equals("ON");
if (mspListener != null) {
mspListener.acceptFillamentStateChange(status);
}
break;
}
}
private void evaluateError(String errorMessage, Throwable error) {
if (mspListener != null) {
mspListener.error(errorMessage, error);
} else if (error != null) {
throw new RuntimeException(error);
} else {
if (error != null) {
throw new RuntimeException(error);
} else {
throw new RuntimeException(errorMessage);
}
throw new RuntimeException(errorMessage);
}
}
@ -443,4 +418,138 @@ public class MspDevice extends DataDevice implements PortHandler.PortController
}
}
private class PeakJumpMeasurement extends AbstractMeasurement<DataPoint> implements PortHandler.PortController {
private final Map<Integer, Double> measurement = new ConcurrentSkipListMap<>();
private Map<Integer, String> peakMap;
private final Meta meta;
public PeakJumpMeasurement(Meta meta) {
this.meta = meta;
}
@Override
public void start() {
try {
//Take control of port
handler.holdBy(this);
} catch (PortException ex) {
Logger.getLogger(MspDevice.class.getName()).log(Level.SEVERE, null, ex);
}
try {
String name = "peakJump";//an.getString("measurementNAmname", "default");
String filterMode = meta.getString("filterMode", "PeakAverage");
int accuracy = meta.getInt("accuracy", 5);
//PENDING вставить остальные параметры?
sendAndWait("MeasurementRemove", name);
if (sendAndWait("AddPeakJump", name, filterMode, accuracy, 0, 0, 0).isOK()) {
peakMap = new LinkedHashMap<>();
for (Meta peak : meta.getNodes("peak")) {
peakMap.put(peak.getInt("mass"), peak.getString("name", peak.getString("mass")));
if (!sendAndWait("MeasurementAddMass", peak.getString("mass")).isOK()) {
throw new ControlException("Can't add mass to measurement measurement for msp");
}
}
} else {
throw new ControlException("Can't create measurement for msp");
}
if (!isFilamentOn()) {
this.error("Can't start measurement. Filament is not turned on.", null);
}
if (!sendAndWait("ScanAdd", "peakJump").isOK()) {
this.error("Failed to add scan", null);
}
if (!sendAndWait("ScanStart", 2).isOK()) {
this.error("Failed to start scan", null);
}
} catch (ControlException ex) {
onError(ex);
}
}
@Override
public boolean stop(boolean force) {
handler.unholdBy(this);
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
}
@Override
public void accept(String message) {
if (mspListener != null) {
mspListener.acceptMessage(message.trim());
}
MspResponse response = new MspResponse(message);
//Evaluating device state change
evaluateResponse(response);
//Evaluating measurement information
switch (response.getCommandName()) {
case "MassReading":
double mass = Double.parseDouble(response.get(0, 1));
double value = Double.parseDouble(response.get(0, 2));
measurement.put((int) Math.floor(mass + 0.5), value);
break;
case "StartingScan":
if (mspListener != null && !measurement.isEmpty() && isFilamentOn) {
if (peakMap == null) {
throw new IllegalStateException("Peal map is not initialized");
}
mspListener.acceptScan(measurement);
Instant time = Instant.now();
MapDataPoint point = new MapDataPoint();
point.putValue("timestamp", time);
measurement.entrySet().stream().forEach((entry) -> {
double val = entry.getValue();
point.putValue(peakMap.get(entry.getKey()), val);
});
if (peakJumpLoader != null) {
try {
peakJumpLoader.push(point);
} catch (StorageException ex) {
getLogger().error("Push to repo failed", ex);
}
}
}
measurement.clear();
int numScans = Integer.parseInt(response.get(0, 3));
if (numScans == 0) {
try {
send("ScanResume", 2);
//FIXME обработать ошибку связи
} catch (PortException ex) {
error(null, ex);
}
}
break;
}
}
public boolean stopMeasurement() throws PortException {
return sendAndWait("ScanStop").isOK();
}
@Override
public void error(String errorMessage, Throwable error) {
if (error == null) {
onError(new MeasurementException(errorMessage));
} else {
onError(error);
}
}
}
}

View File

@ -13,20 +13,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package inr.numass.control.msp;
import java.util.Map;
/**
*
* @author darksnake
*/
public interface MspListener {
void error(String errorMessage, Throwable error);
void acceptMeasurement(Map<Integer, Double> point);
void acceptMessage(String message);
void acceptRequest(String message);
default void acceptFillamentStateChange(String fillamentState){
}
}
package inr.numass.control.msp;
import java.util.Map;
/**
*
* @author darksnake
*/
public interface MspListener {
void error(String errorMessage, Throwable error);
void acceptScan(Map<Integer, Double> point);
void acceptMessage(String message);
void acceptRequest(String message);
default void acceptFillamentStateChange(String fillamentState){
}
}

View File

@ -155,7 +155,7 @@ public class MspViewController implements Initializable, MspListener {
try {
getDevice().setListener(this);
getDevice().init();
getDevice().start();
getDevice().startMeasurement("peakJump");
} catch (ControlException ex) {
showError(String.format("Can't connect to %s:%d. The port is either busy or not the MKS mass-spectrometer port",
config.getString("connection.ip", "127.0.0.1"),
@ -224,7 +224,7 @@ public class MspViewController implements Initializable, MspListener {
}
@Override
public void acceptMeasurement(Map<Integer, Double> measurement) {
public void acceptScan(Map<Integer, Double> measurement) {
MapDataPoint point = new MapDataPoint();
for (Map.Entry<Integer, Double> entry : measurement.entrySet()) {
Double val = entry.getValue();
@ -294,11 +294,11 @@ public class MspViewController implements Initializable, MspListener {
}
@FXML
private void onPlotToggle(ActionEvent event) throws PortException {
private void onPlotToggle(ActionEvent event) throws ControlException {
if (plotButton.isSelected()) {
getDevice().startPeakJumpMeasurement();
getDevice().startMeasurement("peakJump");
} else {
getDevice().stopMeasurement();
getDevice().stopMeasurement(false);
}
}
@ -316,7 +316,7 @@ public class MspViewController implements Initializable, MspListener {
}
public void disconnect() throws IOException, PortException, ControlException {
getDevice().stop();
getDevice().shutdown();
}
@Override

View File

@ -6,9 +6,9 @@
package inr.numass.readvac.devices;
import hep.dataforge.context.Context;
import hep.dataforge.control.measurements.SimpletMeasurement;
import hep.dataforge.control.measurements.Measurement;
import hep.dataforge.control.measurements.Sensor;
import hep.dataforge.control.measurements.SimpleMeasurement;
import hep.dataforge.control.ports.ComPortHandler;
import hep.dataforge.control.ports.PortHandler;
import hep.dataforge.description.ValueDef;
@ -30,6 +30,10 @@ public class CM32Device extends Sensor<Double> {
super(name, context, meta);
}
public void setHandler(PortHandler handler){
this.handler = handler;
}
/**
* @return the handler
*/
@ -81,29 +85,29 @@ public class CM32Device extends Sensor<Double> {
return meta().getInt("timeout", 400);
}
private class CMVacMeasurement extends SimpleMeasurement<Double> {
private class CMVacMeasurement extends SimpletMeasurement<Double> {
private static final String CM32_QUERY = "MES R PM 1\r\n";
@Override
protected Double doMeasurement() throws Exception {
protected synchronized Double doMeasure() throws Exception {
String answer = handler.sendAndWait(CM32_QUERY, timeout());
if (answer.isEmpty()) {
this.progressUpdate("No signal");
this.onProgressUpdate("No signal");
updateState("connection", false);
return null;
} else if (answer.indexOf("PM1:mbar") < -1) {
this.progressUpdate("Wrong answer: " + answer);
this.onProgressUpdate("Wrong answer: " + answer);
updateState("connection", false);
return null;
} else if (answer.substring(14, 17).equals("OFF")) {
this.progressUpdate("Off");
this.onProgressUpdate("Off");
updateState("connection", true);
return null;
} else {
this.progressUpdate("OK");
this.onProgressUpdate("OK");
updateState("connection", true);
return Double.parseDouble(answer.substring(14, 17) + answer.substring(19, 23));
}

View File

@ -6,9 +6,9 @@
package inr.numass.readvac.devices;
import hep.dataforge.context.Context;
import hep.dataforge.control.measurements.SimpletMeasurement;
import hep.dataforge.control.measurements.Measurement;
import hep.dataforge.control.measurements.Sensor;
import hep.dataforge.control.measurements.SimpleMeasurement;
import hep.dataforge.control.ports.ComPortHandler;
import hep.dataforge.control.ports.PortHandler;
import hep.dataforge.description.ValueDef;
@ -37,6 +37,10 @@ public class MKSVacDevice extends Sensor<Double> {
public MKSVacDevice(String name, Context context, Meta meta) {
super(name, context, meta);
}
public void setHandler(PortHandler handler){
this.handler = handler;
}
private String talk(String requestContent) throws ControlException {
String answer = getHandler().sendAndWait(String.format("@%s%s;FF", getDeviceAddress(), requestContent), timeout());
@ -171,23 +175,23 @@ public class MKSVacDevice extends Sensor<Double> {
return handler;
}
private class MKSVacMeasurement extends SimpleMeasurement<Double> {
private class MKSVacMeasurement extends SimpletMeasurement<Double> {
@Override
protected Double doMeasurement() throws Exception {
protected synchronized Double doMeasure() throws Exception {
String answer = talk("PR" + getChannel() + "?");
if (answer == null || answer.isEmpty()) {
invalidateState("connection");
this.progressUpdate("No connection");
this.onProgressUpdate("No connection");
return null;
}
double res = Double.parseDouble(answer);
if (res <= 0) {
this.progressUpdate("Non positive");
this.onProgressUpdate("Non positive");
invalidateState("power");
return null;
} else {
this.progressUpdate("OK");
this.onProgressUpdate("OK");
return res;
}
}

View File

@ -6,9 +6,9 @@
package inr.numass.readvac.devices;
import hep.dataforge.context.Context;
import hep.dataforge.control.measurements.SimpletMeasurement;
import hep.dataforge.control.measurements.Measurement;
import hep.dataforge.control.measurements.Sensor;
import hep.dataforge.control.measurements.SimpleMeasurement;
import hep.dataforge.control.ports.ComPortHandler;
import hep.dataforge.control.ports.PortHandler;
import hep.dataforge.description.ValueDef;
@ -34,6 +34,10 @@ public class VITVacDevice extends Sensor<Double> {
super(name, context, meta);
}
public void setHandler(PortHandler handler) {
this.handler = handler;
}
/**
* @return the handler
*/
@ -85,17 +89,17 @@ public class VITVacDevice extends Sensor<Double> {
return meta().getInt("timeout", 400);
}
private class CMVacMeasurement extends SimpleMeasurement<Double> {
private class CMVacMeasurement extends SimpletMeasurement<Double> {
private static final String VIT_QUERY = ":010300000002FA\r\n";
@Override
protected Double doMeasurement() throws Exception {
protected synchronized Double doMeasure() throws Exception {
String answer = handler.sendAndWait(VIT_QUERY, timeout());
if (answer.isEmpty()) {
this.progressUpdate("No signal");
this.onProgressUpdate("No signal");
updateState("connection", false);
return null;
} else {
@ -109,11 +113,11 @@ public class VITVacDevice extends Sensor<Double> {
}
BigDecimal res = BigDecimal.valueOf(base * Math.pow(10, exp));
res = res.setScale(4, RoundingMode.CEILING);
this.progressUpdate("OK");
this.onProgressUpdate("OK");
updateState("connection", true);
return res.doubleValue();
} else {
this.progressUpdate("Wrong answer: " + answer);
this.onProgressUpdate("Wrong answer: " + answer);
updateState("connection", false);
return null;
}

View File

@ -13,11 +13,15 @@ import hep.dataforge.control.measurements.Measurement;
import hep.dataforge.control.measurements.Sensor;
import hep.dataforge.data.DataPoint;
import hep.dataforge.exceptions.ControlException;
import hep.dataforge.exceptions.MeasurementException;
import hep.dataforge.meta.Meta;
import java.time.Instant;
import hep.dataforge.values.Value;
import java.util.HashMap;
import java.util.Map;
import javafx.util.Pair;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
*
@ -44,7 +48,7 @@ public class VacCollectorDevice extends Sensor<DataPoint> {
@Override
protected Measurement<DataPoint> createMeasurement() {
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
return new VacuumMeasurement();
}
@Override
@ -53,29 +57,39 @@ public class VacCollectorDevice extends Sensor<DataPoint> {
}
private class VacuumMeasurement extends AbstractMeasurement<DataPoint> {
ValueCollector collector = new PointCollector(this::result, sensorMap.keySet());
@Override
protected Pair<DataPoint, Instant> doGet() throws Exception {
sensorMap.values().stream().parallel().forEach(action);
}
@Override
public boolean isFinished() {
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
}
private final ValueCollector collector = new PointCollector(this::onResult, sensorMap.keySet());
private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
private ScheduledFuture<?> currentTask;
@Override
public void start() {
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
currentTask = executor.scheduleWithFixedDelay(() -> {
sensorMap.entrySet().stream().parallel().forEach((entry) -> {
try {
collector.put(entry.getKey(), entry.getValue().getMeasurement().getResult());
} catch (MeasurementException ex) {
onError(ex);
collector.put(entry.getKey(), Value.NULL);
}
});
}, 0, getDelay(), TimeUnit.MILLISECONDS);
}
private int getDelay() {
return meta().getInt("delay", 5000);
}
@Override
public boolean stop(boolean force) {
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
boolean isRunning = currentTask != null;
if (isRunning) {
currentTask.cancel(force);
isFinished = true;
currentTask = null;
}
return isRunning;
}
}
}