Global update on storages. Adopted connections instead of listeners for them

This commit is contained in:
Alexander Nozik 2017-05-24 21:28:50 +03:00
parent 52e488697f
commit 6aeaf89d69
18 changed files with 158 additions and 208 deletions

View File

@ -20,6 +20,7 @@ import hep.dataforge.control.RoleDef;
import hep.dataforge.control.collectors.RegularPointCollector; import hep.dataforge.control.collectors.RegularPointCollector;
import hep.dataforge.control.connections.Roles; import hep.dataforge.control.connections.Roles;
import hep.dataforge.control.connections.StorageConnection; import hep.dataforge.control.connections.StorageConnection;
import hep.dataforge.control.devices.Device;
import hep.dataforge.control.devices.PortSensor; import hep.dataforge.control.devices.PortSensor;
import hep.dataforge.control.measurements.AbstractMeasurement; import hep.dataforge.control.measurements.AbstractMeasurement;
import hep.dataforge.control.measurements.Measurement; import hep.dataforge.control.measurements.Measurement;
@ -43,6 +44,7 @@ import java.time.Duration;
import java.util.Collection; import java.util.Collection;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors;
/** /**
* A device controller for Dubna PKT 8 cryogenic thermometry device * A device controller for Dubna PKT 8 cryogenic thermometry device
@ -128,10 +130,13 @@ public class PKT8Device extends PortSensor<PKT8Result> {
// setting up the collector // setting up the collector
storageHelper = new StorageHelper(this, this::buildLoader); storageHelper = new StorageHelper(this, this::buildLoader);
Duration duration = Duration.parse(meta().getString("averagingDuration", "PT30S")); Duration duration = Duration.parse(meta().getString("averagingDuration", "PT30S"));
collector = new RegularPointCollector((DataPoint dp) -> { collector = new RegularPointCollector(
duration,
channels.values().stream().map(PKT8Channel::getName).collect(Collectors.toList()),
(DataPoint dp) -> {
getLogger().debug("Point measurement complete. Pushing..."); getLogger().debug("Point measurement complete. Pushing...");
storageHelper.push(dp); storageHelper.push(dp);
}, duration, channels.values().stream().map(PKT8Channel::getName).toArray(String[]::new)); });
} }
@ -335,6 +340,11 @@ public class PKT8Device extends PortSensor<PKT8Result> {
this.handler = handler; this.handler = handler;
} }
@Override
public Device getDevice() {
return PKT8Device.this;
}
@Override @Override
public void start() { public void start() {
if (isStarted()) { if (isStarted()) {

View File

@ -26,15 +26,15 @@ public class PKT8PlotFragment extends FXFragment {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
showingProperty().addListener((observable, oldValue, newValue) -> { // showingProperty().addListener((observable, oldValue, newValue) -> {
if (device.isMeasuring()) { // if (device.isMeasuring()) {
if (newValue) { // if (newValue) {
device.getMeasurement().addListener(plotController); // device.getMeasurement().addListener(plotController);
} else { // } else {
device.getMeasurement().removeListener(plotController); // device.getMeasurement().removeListener(plotController);
} // }
} // }
}); // });
} }
@Override @Override

View File

@ -43,7 +43,7 @@ import java.util.ResourceBundle;
* *
* @author darksnake * @author darksnake
*/ */
public class PKT8PlotView extends DeviceViewConnection<PKT8Device> implements Initializable, MeasurementListener<PKT8Result> { public class PKT8PlotView extends DeviceViewConnection<PKT8Device> implements Initializable, MeasurementListener {
private FXPlotFrame plotFrame; private FXPlotFrame plotFrame;
private TimePlottableGroup plottables; private TimePlottableGroup plottables;
@ -121,12 +121,13 @@ public class PKT8PlotView extends DeviceViewConnection<PKT8Device> implements In
} }
@Override @Override
public synchronized void onMeasurementResult(Measurement<PKT8Result> measurement, PKT8Result result, Instant time) { public synchronized void onMeasurementResult(Measurement measurement, Object result, Instant time) {
PKT8Result res = PKT8Result.class.cast(result);
//PENDING replace by connection? //PENDING replace by connection?
if (rawDataButton.isSelected()) { if (rawDataButton.isSelected()) {
plottables.put(result.channel, result.rawValue); plottables.put(res.channel, res.rawValue);
} else { } else {
plottables.put(result.channel, result.temperature); plottables.put(res.channel, res.temperature);
} }
} }

View File

@ -30,7 +30,7 @@ import java.util.ResourceBundle;
/** /**
* Created by darksnake on 07-Oct-16. * Created by darksnake on 07-Oct-16.
*/ */
public class PKT8View extends DeviceViewConnection<PKT8Device> implements Initializable, MeasurementListener<PKT8Result> { public class PKT8View extends DeviceViewConnection<PKT8Device> implements Initializable, MeasurementListener {
public static PKT8View build(){ public static PKT8View build(){
try { try {
@ -96,11 +96,12 @@ public class PKT8View extends DeviceViewConnection<PKT8Device> implements Initia
} }
@Override @Override
public void onMeasurementResult(Measurement<PKT8Result> measurement, PKT8Result result, Instant time) { public void onMeasurementResult(Measurement<?> measurement, Object result, Instant time) {
PKT8Result res = PKT8Result.class.cast(result);
Platform.runLater(() -> { Platform.runLater(() -> {
lastUpdateLabel.setText(time.toString()); lastUpdateLabel.setText(time.toString());
table.getItems().removeIf(it -> it.channel.equals(result.channel)); table.getItems().removeIf(it -> it.channel.equals(res.channel));
table.getItems().add(result); table.getItems().add(res);
table.getItems().sort(Comparator.comparing(o -> o.channel)); table.getItems().sort(Comparator.comparing(o -> o.channel));
}); });
} }
@ -112,12 +113,11 @@ public class PKT8View extends DeviceViewConnection<PKT8Device> implements Initia
private void startMeasurement() throws MeasurementException { private void startMeasurement() throws MeasurementException {
getDevice().startMeasurement().addListener(this); getDevice().startMeasurement();
} }
private void stopMeasurement() throws MeasurementException { private void stopMeasurement() throws MeasurementException {
if (getDevice().isMeasuring()) { if (getDevice().isMeasuring()) {
getDevice().getMeasurement().removeListener(this);
getDevice().stopMeasurement(false); getDevice().stopMeasurement(false);
} }
} }

View File

@ -17,8 +17,11 @@ package inr.numass.control.msp;
import hep.dataforge.context.Context; import hep.dataforge.context.Context;
import hep.dataforge.control.RoleDef; import hep.dataforge.control.RoleDef;
import hep.dataforge.control.collectors.RegularPointCollector;
import hep.dataforge.control.collectors.ValueCollector;
import hep.dataforge.control.connections.Roles; import hep.dataforge.control.connections.Roles;
import hep.dataforge.control.connections.StorageConnection; import hep.dataforge.control.connections.StorageConnection;
import hep.dataforge.control.devices.Device;
import hep.dataforge.control.devices.PortSensor; import hep.dataforge.control.devices.PortSensor;
import hep.dataforge.control.devices.SingleMeasurementDevice; import hep.dataforge.control.devices.SingleMeasurementDevice;
import hep.dataforge.control.devices.StateDef; import hep.dataforge.control.devices.StateDef;
@ -35,16 +38,15 @@ import hep.dataforge.storage.api.PointLoader;
import hep.dataforge.storage.api.Storage; import hep.dataforge.storage.api.Storage;
import hep.dataforge.storage.commons.LoaderFactory; import hep.dataforge.storage.commons.LoaderFactory;
import hep.dataforge.tables.DataPoint; import hep.dataforge.tables.DataPoint;
import hep.dataforge.tables.MapPoint;
import hep.dataforge.tables.TableFormat; import hep.dataforge.tables.TableFormat;
import hep.dataforge.tables.TableFormatBuilder; import hep.dataforge.tables.TableFormatBuilder;
import hep.dataforge.utils.DateTimeUtils; import hep.dataforge.utils.DateTimeUtils;
import hep.dataforge.values.Value; import hep.dataforge.values.Value;
import inr.numass.control.StorageHelper; import inr.numass.control.StorageHelper;
import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.Consumer; import java.util.function.Consumer;
/** /**
@ -99,13 +101,8 @@ public class MspDevice extends SingleMeasurementDevice implements PortHandler.Po
} }
@Override @Override
protected Meta getMetaForMeasurement(String name) { protected Meta getMeasurementMeta() {
switch (name) {
case "peakJump":
return meta().getMeta("peakJump"); return meta().getMeta("peakJump");
default:
return super.getMetaForMeasurement(name);
}
} }
@Override @Override
@ -347,10 +344,14 @@ public class MspDevice extends SingleMeasurementDevice implements PortHandler.Po
return handler; return handler;
} }
private Duration getAveragingDuration() {
return Duration.parse(meta().getString("averagingDuration", "PT60S"));
}
/** /**
* The MKS response as two-dimensional array of strings * The MKS response as two-dimensional array of strings
*/ */
public static class MspResponse { static class MspResponse {
private final List<List<String>> data = new ArrayList<>(); private final List<List<String>> data = new ArrayList<>();
@ -400,7 +401,7 @@ public class MspDevice extends SingleMeasurementDevice implements PortHandler.Po
private class PeakJumpMeasurement extends AbstractMeasurement<DataPoint> { private class PeakJumpMeasurement extends AbstractMeasurement<DataPoint> {
private final Map<Integer, Double> measurement = new ConcurrentSkipListMap<>(); private ValueCollector collector = new RegularPointCollector(getAveragingDuration(), this::result);
private StorageHelper helper = new StorageHelper(MspDevice.this, this::makeLoader); private StorageHelper helper = new StorageHelper(MspDevice.this, this::makeLoader);
private final Meta meta; private final Meta meta;
private Map<Integer, String> peakMap; private Map<Integer, String> peakMap;
@ -433,6 +434,11 @@ public class MspDevice extends SingleMeasurementDevice implements PortHandler.Po
} }
} }
@Override
public Device getDevice() {
return MspDevice.this;
}
@Override @Override
public void start() { public void start() {
responseDelegate = this::eval; responseDelegate = this::eval;
@ -484,6 +490,12 @@ public class MspDevice extends SingleMeasurementDevice implements PortHandler.Po
} }
} }
@Override
protected synchronized void result(DataPoint result, Instant time) {
super.result(result, time);
helper.push(result);
}
public void eval(MspResponse response) { public void eval(MspResponse response) {
//Evaluating device state change //Evaluating device state change
@ -493,36 +505,12 @@ public class MspDevice extends SingleMeasurementDevice implements PortHandler.Po
case "MassReading": case "MassReading":
double mass = Double.parseDouble(response.get(0, 1)); double mass = Double.parseDouble(response.get(0, 1));
double value = Double.parseDouble(response.get(0, 2)) / 100d; double value = Double.parseDouble(response.get(0, 2)) / 100d;
measurement.put((int) Math.floor(mass + 0.5), value); String massName = Integer.toString((int) Math.floor(mass + 0.5));
collector.put(massName, value);
break; break;
case "ZeroReading": case "ZeroReading":
zero = Double.parseDouble(response.get(0, 2)) / 100d; zero = Double.parseDouble(response.get(0, 2)) / 100d;
case "StartingScan": case "StartingScan":
if (mspListener != null && !measurement.isEmpty()) {
if (peakMap == null) {
throw new IllegalStateException("Peak map is not initialized");
}
if (isFilamentOn()) {
Instant time = DateTimeUtils.now();
MapPoint.Builder point = new MapPoint.Builder();
point.putValue("timestamp", time);
measurement.forEach((key, value1) -> {
double val = value1;
point.putValue(peakMap.get(key), val);
});
mspListener.acceptScan(measurement);
//pushing data to storage
helper.push(point.build());
}
measurement.clear();
int numScans = Integer.parseInt(response.get(0, 3)); int numScans = Integer.parseInt(response.get(0, 3));
if (numScans == 0) { if (numScans == 0) {
@ -533,11 +521,9 @@ public class MspDevice extends SingleMeasurementDevice implements PortHandler.Po
error(null, ex); error(null, ex);
} }
} }
break; break;
} }
} }
}
public void error(String errorMessage, Throwable error) { public void error(String errorMessage, Throwable error) {
if (error == null) { if (error == null) {

View File

@ -15,8 +15,6 @@
*/ */
package inr.numass.control.msp; package inr.numass.control.msp;
import java.util.Map;
/** /**
* *
* @author darksnake * @author darksnake
@ -25,9 +23,8 @@ public interface MspListener {
void error(String errorMessage, Throwable error); void error(String errorMessage, Throwable error);
void acceptScan(Map<Integer, Double> point);
void acceptMessage(String message); void acceptMessage(String message);
void acceptRequest(String message); void acceptRequest(String message);
default void acceptFilamentStateChange(String fillamentState){ default void acceptFilamentStateChange(String fillamentState){

View File

@ -16,6 +16,8 @@
package inr.numass.control.msp.fx; package inr.numass.control.msp.fx;
import hep.dataforge.control.devices.DeviceListener; import hep.dataforge.control.devices.DeviceListener;
import hep.dataforge.control.measurements.Measurement;
import hep.dataforge.control.measurements.MeasurementListener;
import hep.dataforge.exceptions.ControlException; import hep.dataforge.exceptions.ControlException;
import hep.dataforge.exceptions.PortException; import hep.dataforge.exceptions.PortException;
import hep.dataforge.fx.fragments.FragmentWindow; import hep.dataforge.fx.fragments.FragmentWindow;
@ -27,6 +29,7 @@ import hep.dataforge.plots.data.TimePlottable;
import hep.dataforge.plots.data.TimePlottableGroup; import hep.dataforge.plots.data.TimePlottableGroup;
import hep.dataforge.plots.fx.PlotContainer; import hep.dataforge.plots.fx.PlotContainer;
import hep.dataforge.plots.jfreechart.JFreeChartFrame; import hep.dataforge.plots.jfreechart.JFreeChartFrame;
import hep.dataforge.tables.DataPoint;
import hep.dataforge.values.Value; import hep.dataforge.values.Value;
import inr.numass.control.DeviceViewConnection; import inr.numass.control.DeviceViewConnection;
import inr.numass.control.msp.MspDevice; import inr.numass.control.msp.MspDevice;
@ -51,8 +54,8 @@ import org.controlsfx.control.ToggleSwitch;
import java.io.IOException; import java.io.IOException;
import java.net.URL; import java.net.URL;
import java.time.Instant;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.ResourceBundle; import java.util.ResourceBundle;
/** /**
@ -60,7 +63,7 @@ import java.util.ResourceBundle;
* *
* @author darksnake * @author darksnake
*/ */
public class MspViewController extends DeviceViewConnection<MspDevice> implements DeviceListener, Initializable, MspListener { public class MspViewController extends DeviceViewConnection<MspDevice> implements DeviceListener, Initializable, MspListener, MeasurementListener {
public static MspViewController build() { public static MspViewController build() {
try { try {
@ -166,15 +169,6 @@ public class MspViewController extends DeviceViewConnection<MspDevice> implement
bindBooleanToState("connected", connectButton.selectedProperty()); bindBooleanToState("connected", connectButton.selectedProperty());
} }
// public void setDeviceConfig(Context context, File cfgFile) {
// try {
// Meta deviceConfig = MetaFileReader.instance().read(context, cfgFile, null);
// setDeviceConfig(context, deviceConfig);
// } catch (IOException | ParseException ex) {
// showError("Can't load configuration file");
// }
// }
private void initPlot() { private void initPlot() {
Meta plotConfig = new MetaBuilder("plotFrame") Meta plotConfig = new MetaBuilder("plotFrame")
.setNode(new MetaBuilder("yAxis") .setNode(new MetaBuilder("yAxis")
@ -187,9 +181,6 @@ public class MspViewController extends DeviceViewConnection<MspDevice> implement
this.plot = new JFreeChartFrame(plotConfig); this.plot = new JFreeChartFrame(plotConfig);
PlotContainer container = PlotContainer.centerIn(plotPane); PlotContainer container = PlotContainer.centerIn(plotPane);
container.setPlot(plot); container.setPlot(plot);
// updatePlot();
// this.plot = DynamicPlot.attachToFX(plotPane, new AnnotationBuilder("plot-config").putValue("logY", true).build());
// plot.setAutoRange(30 * 60);
} }
public void updatePlot() { public void updatePlot() {
@ -220,21 +211,6 @@ public class MspViewController extends DeviceViewConnection<MspDevice> implement
} }
} }
@Override
public void acceptScan(Map<Integer, Double> measurement) {
// MapPoint.Builder point = new MapPoint.Builder();
for (Map.Entry<Integer, Double> entry : measurement.entrySet()) {
Double val = entry.getValue();
if (val <= 0) {
val = Double.NaN;
}
TimePlottable pl = plottables.get(Integer.toString(entry.getKey()));
if (pl != null) {
pl.put(Value.of(val));
}
}
}
@Override @Override
public void acceptMessage(String message) { public void acceptMessage(String message) {
Platform.runLater(() -> { Platform.runLater(() -> {
@ -261,7 +237,7 @@ public class MspViewController extends DeviceViewConnection<MspDevice> implement
@FXML @FXML
private void onPlotToggle(ActionEvent event) throws ControlException { private void onPlotToggle(ActionEvent event) throws ControlException {
if (measureButton.isSelected()) { if (measureButton.isSelected()) {
getDevice().startMeasurement("peakJump"); getDevice().startMeasurement();
} else { } else {
getDevice().stopMeasurement(false); getDevice().stopMeasurement(false);
} }
@ -302,50 +278,6 @@ public class MspViewController extends DeviceViewConnection<MspDevice> implement
@FXML @FXML
private void onStoreButtonClick(ActionEvent event) { private void onStoreButtonClick(ActionEvent event) {
getDevice().setState("storing", storeButton.isSelected()); getDevice().setState("storing", storeButton.isSelected());
// if (storeButton.isSelected()) {
//
// if (!getDevice().meta().hasMeta("storage")) {
// getDevice().getLogger().info("Storage not defined. Starting storage selection dialog");
// DirectoryChooser chooser = new DirectoryChooser();
// File storageDir = chooser.showDialog(this.plotPane.getScene().getWindow());
// if (storageDir == null) {
// storeButton.setSelected(false);
// throw new RuntimeException("User canceled directory selection");
// }
// getDevice().getConfig().putNode(new MetaBuilder("storage")
// .putValue("path", storageDir.getAbsolutePath()));
// }
// Meta storageConfig = getDevice().meta().getMeta("storage");
// Storage localStorage = StorageManager.buildFrom(getDevice().getContext())
// .buildStorage(storageConfig);
//
// String runName = getDevice().meta().getString("numass.run", "");
// Meta meta = getDevice().meta();
// if (meta.hasMeta("numass")) {
// try {
// getDevice().getLogger().info("Obtaining run information from cetral server...");
// NumassClient client = new NumassClient(meta.getString("numass.ip", "192.168.111.1"),
// meta.getInt("numass.port", 8335));
// runName = client.getCurrentRun().getString("path", "");
// getDevice().getLogger().info("Run name is '{}'", runName);
// } catch (Exception ex) {
// getDevice().getLogger().warn("Failed to download current run information", ex);
// }
// }
//
// if (!runName.isEmpty()) {
// try {
// localStorage = localStorage.buildShelf(runName, null);
// } catch (StorageException ex) {
// getDevice().getLogger().error("Failed to create storage shelf. Using root storage instead");
// }
// }
//
// connection = new StorageConnection(localStorage);
// getDevice().connect(connection, Roles.STORAGE_ROLE);
// } else if (connection != null) {
// getDevice().disconnect(connection);
// }
} }
@ -353,4 +285,21 @@ public class MspViewController extends DeviceViewConnection<MspDevice> implement
public Node getFXNode() { public Node getFXNode() {
return root; return root;
} }
@Override
public void onMeasurementResult(Measurement<?> measurement, Object res, Instant time) {
DataPoint result = DataPoint.class.cast(res);
for (String valueName : result.names()) {
TimePlottable pl = plottables.get(valueName);
if (pl != null) {
pl.put(Value.of(result.getValue(valueName)));
}
}
}
@Override
public void onMeasurementFailed(Measurement<?> measurement, Throwable exception) {
}
} }

View File

@ -6,6 +6,7 @@
package inr.numass.readvac; package inr.numass.readvac;
import hep.dataforge.context.Context; import hep.dataforge.context.Context;
import hep.dataforge.control.devices.Device;
import hep.dataforge.control.devices.PortSensor; import hep.dataforge.control.devices.PortSensor;
import hep.dataforge.control.measurements.Measurement; import hep.dataforge.control.measurements.Measurement;
import hep.dataforge.control.measurements.SimpleMeasurement; import hep.dataforge.control.measurements.SimpleMeasurement;
@ -81,6 +82,11 @@ public class CM32Device extends PortSensor<Double> {
return Double.parseDouble(answer.substring(14, 17) + answer.substring(19, 23)); return Double.parseDouble(answer.substring(14, 17) + answer.substring(19, 23));
} }
} }
@Override
public Device getDevice() {
return CM32Device.this;
}
} }
} }

View File

@ -6,6 +6,7 @@
package inr.numass.readvac; package inr.numass.readvac;
import hep.dataforge.context.Context; import hep.dataforge.context.Context;
import hep.dataforge.control.devices.Device;
import hep.dataforge.control.devices.PortSensor; import hep.dataforge.control.devices.PortSensor;
import hep.dataforge.control.measurements.Measurement; import hep.dataforge.control.measurements.Measurement;
import hep.dataforge.control.measurements.SimpleMeasurement; import hep.dataforge.control.measurements.SimpleMeasurement;
@ -52,6 +53,11 @@ public class MKSBaratronDevice extends PortSensor<Double> {
private class BaratronMeasurement extends SimpleMeasurement<Double> { private class BaratronMeasurement extends SimpleMeasurement<Double> {
@Override
public Device getDevice() {
return MKSBaratronDevice.this;
}
@Override @Override
protected synchronized Double doMeasure() throws Exception { protected synchronized Double doMeasure() throws Exception {
String answer = getHandler().sendAndWait("AV" + getChannel() + "\r", timeout()); String answer = getHandler().sendAndWait("AV" + getChannel() + "\r", timeout());

View File

@ -6,6 +6,7 @@
package inr.numass.readvac; package inr.numass.readvac;
import hep.dataforge.context.Context; import hep.dataforge.context.Context;
import hep.dataforge.control.devices.Device;
import hep.dataforge.control.devices.PortSensor; import hep.dataforge.control.devices.PortSensor;
import hep.dataforge.control.devices.StateDef; import hep.dataforge.control.devices.StateDef;
import hep.dataforge.control.measurements.Measurement; import hep.dataforge.control.measurements.Measurement;
@ -165,5 +166,9 @@ public class MKSVacDevice extends PortSensor<Double> {
} }
} }
@Override
public Device getDevice() {
return MKSVacDevice.this;
}
} }
} }

View File

@ -6,6 +6,7 @@
package inr.numass.readvac; package inr.numass.readvac;
import hep.dataforge.context.Context; import hep.dataforge.context.Context;
import hep.dataforge.control.devices.Device;
import hep.dataforge.control.devices.PortSensor; import hep.dataforge.control.devices.PortSensor;
import hep.dataforge.control.measurements.Measurement; import hep.dataforge.control.measurements.Measurement;
import hep.dataforge.control.measurements.SimpleMeasurement; import hep.dataforge.control.measurements.SimpleMeasurement;
@ -114,6 +115,11 @@ public class MeradatVacDevice extends PortSensor<Double> {
} }
} }
} }
@Override
public Device getDevice() {
return MeradatVacDevice.this;
}
} }
} }

View File

@ -11,6 +11,7 @@ import hep.dataforge.control.collectors.PointCollector;
import hep.dataforge.control.collectors.ValueCollector; import hep.dataforge.control.collectors.ValueCollector;
import hep.dataforge.control.connections.Roles; import hep.dataforge.control.connections.Roles;
import hep.dataforge.control.connections.StorageConnection; import hep.dataforge.control.connections.StorageConnection;
import hep.dataforge.control.devices.Device;
import hep.dataforge.control.devices.StateDef; import hep.dataforge.control.devices.StateDef;
import hep.dataforge.control.measurements.AbstractMeasurement; import hep.dataforge.control.measurements.AbstractMeasurement;
import hep.dataforge.control.measurements.Measurement; import hep.dataforge.control.measurements.Measurement;
@ -22,7 +23,6 @@ import hep.dataforge.storage.api.PointLoader;
import hep.dataforge.storage.commons.LoaderFactory; import hep.dataforge.storage.commons.LoaderFactory;
import hep.dataforge.tables.DataPoint; import hep.dataforge.tables.DataPoint;
import hep.dataforge.tables.MapPoint; import hep.dataforge.tables.MapPoint;
import hep.dataforge.tables.PointListener;
import hep.dataforge.tables.TableFormatBuilder; import hep.dataforge.tables.TableFormatBuilder;
import hep.dataforge.utils.DateTimeUtils; import hep.dataforge.utils.DateTimeUtils;
import hep.dataforge.values.Value; import hep.dataforge.values.Value;
@ -122,12 +122,6 @@ public class VacCollectorDevice extends Sensor<DataPoint> {
return LoaderFactory.buildPointLoder(connection.getStorage(), "vactms_" + suffix, "", "timestamp", format.build()); return LoaderFactory.buildPointLoder(connection.getStorage(), "vactms_" + suffix, "", "timestamp", format.build());
} }
@Override
public void onMeasurementResult(Measurement<DataPoint> measurement, DataPoint result, Instant time) {
super.onMeasurementResult(measurement, result, time);
helper.push(result);
}
public Collection<Sensor<Double>> getSensors() { public Collection<Sensor<Double>> getSensors() {
return sensorMap.values(); return sensorMap.values();
} }
@ -138,6 +132,13 @@ public class VacCollectorDevice extends Sensor<DataPoint> {
private ScheduledExecutorService executor; private ScheduledExecutorService executor;
private ScheduledFuture<?> currentTask; private ScheduledFuture<?> currentTask;
@Override
public Device getDevice() {
return VacCollectorDevice.this;
}
@Override @Override
public void start() { public void start() {
executor = Executors executor = Executors
@ -163,9 +164,7 @@ public class VacCollectorDevice extends Sensor<DataPoint> {
@Override @Override
protected synchronized void result(DataPoint result, Instant time) { protected synchronized void result(DataPoint result, Instant time) {
super.result(result, time); super.result(result, time);
forEachConnection(Roles.STORAGE_ROLE, PointListener.class, (PointListener listener) -> { helper.push(result);
listener.accept(result);
});
} }
private DataPoint terminator() { private DataPoint terminator() {

View File

@ -58,7 +58,7 @@ import java.util.ResourceBundle;
* *
* @author <a href="mailto:altavir@gmail.com">Alexander Nozik</a> * @author <a href="mailto:altavir@gmail.com">Alexander Nozik</a>
*/ */
public class VacCollectorView extends DeviceViewConnection<VacCollectorDevice> implements Initializable, MeasurementListener<DataPoint> { public class VacCollectorView extends DeviceViewConnection<VacCollectorDevice> implements Initializable, MeasurementListener {
public static VacCollectorView build() { public static VacCollectorView build() {
try { try {
@ -154,9 +154,9 @@ public class VacCollectorView extends DeviceViewConnection<VacCollectorDevice> i
} }
@Override @Override
public void onMeasurementResult(Measurement<DataPoint> measurement, DataPoint result, Instant time) { public void onMeasurementResult(Measurement measurement, Object res, Instant time) {
if (plottables != null) { if (plottables != null) {
plottables.put(result); plottables.put(DataPoint.class.cast(res));
} }
Platform.runLater(() -> timeLabel.setText(TIME_FORMAT.format(LocalDateTime.ofInstant(time, ZoneOffset.UTC)))); Platform.runLater(() -> timeLabel.setText(TIME_FORMAT.format(LocalDateTime.ofInstant(time, ZoneOffset.UTC))));
} }
@ -190,7 +190,7 @@ public class VacCollectorView extends DeviceViewConnection<VacCollectorDevice> i
} }
private void startMeasurement() throws ControlException { private void startMeasurement() throws ControlException {
getDevice().startMeasurement().addListener(this); getDevice().startMeasurement();
startStopButton.setSelected(true); startStopButton.setSelected(true);
} }

View File

@ -5,7 +5,6 @@
*/ */
package inr.numass.readvac.fx; package inr.numass.readvac.fx;
import hep.dataforge.control.connections.MeasurementConsumer;
import hep.dataforge.control.devices.Device; import hep.dataforge.control.devices.Device;
import hep.dataforge.control.measurements.Measurement; import hep.dataforge.control.measurements.Measurement;
import hep.dataforge.control.measurements.MeasurementListener; import hep.dataforge.control.measurements.MeasurementListener;
@ -36,7 +35,7 @@ import static hep.dataforge.control.devices.PortSensor.CONNECTED_STATE;
/** /**
* @author <a href="mailto:altavir@gmail.com">Alexander Nozik</a> * @author <a href="mailto:altavir@gmail.com">Alexander Nozik</a>
*/ */
public class VacuumeterView extends DeviceViewConnection<Sensor<Double>> implements MeasurementConsumer, MeasurementListener<Double>, Initializable { public class VacuumeterView extends DeviceViewConnection<Sensor<Double>> implements MeasurementListener, Initializable {
private static final DecimalFormat FORMAT = new DecimalFormat("0.###E0"); private static final DecimalFormat FORMAT = new DecimalFormat("0.###E0");
private static final DateTimeFormatter TIME_FORMAT = DateTimeFormatter.ISO_LOCAL_TIME; private static final DateTimeFormatter TIME_FORMAT = DateTimeFormatter.ISO_LOCAL_TIME;
@ -56,14 +55,6 @@ public class VacuumeterView extends DeviceViewConnection<Sensor<Double>> impleme
@FXML @FXML
private ToggleSwitch disableButton; private ToggleSwitch disableButton;
@Override
@SuppressWarnings("unchecked")
public void accept(Device device, String measurementName, Measurement measurement) {
measurement.addListener(this);
getDevice().meta().optValue("color").ifPresent(colorValue -> valueLabel.setTextFill(Color.valueOf(colorValue.stringValue())));
}
@Override @Override
public void evaluateDeviceException(Device device, String message, Throwable exception) { public void evaluateDeviceException(Device device, String message, Throwable exception) {
Platform.runLater(() -> setStatus("ERROR: " + message)); Platform.runLater(() -> setStatus("ERROR: " + message));
@ -120,7 +111,13 @@ public class VacuumeterView extends DeviceViewConnection<Sensor<Double>> impleme
} }
@Override @Override
public void onMeasurementResult(Measurement<Double> measurement, Double result, Instant time) { public void onMeasurementStarted(Measurement<?> measurement) {
getDevice().meta().optValue("color").ifPresent(colorValue -> valueLabel.setTextFill(Color.valueOf(colorValue.stringValue())));
}
@Override
public void onMeasurementResult(Measurement measurement, Object res, Instant time) {
Double result = Double.class.cast(res);
String resString = FORMAT.format(result); String resString = FORMAT.format(result);
Platform.runLater(() -> { Platform.runLater(() -> {
valueLabel.setText(resString); valueLabel.setText(resString);

View File

@ -37,7 +37,6 @@ import java.net.URL;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.ByteOrder; import java.nio.ByteOrder;
import java.nio.channels.ReadableByteChannel; import java.nio.channels.ReadableByteChannel;
import java.text.ParseException;
import java.time.Instant; import java.time.Instant;
import java.util.*; import java.util.*;
import java.util.function.Function; import java.util.function.Function;
@ -140,12 +139,7 @@ public class NumassDataLoader extends AbstractLoader implements ObjectLoader<Env
if (fileName.equals(META_FRAGMENT_NAME) if (fileName.equals(META_FRAGMENT_NAME)
|| fileName.equals(HV_FRAGMENT_NAME) || fileName.equals(HV_FRAGMENT_NAME)
|| fileName.startsWith(POINT_FRAGMENT_NAME)) { || fileName.startsWith(POINT_FRAGMENT_NAME)) {
try {
return new FileEnvelope(file.getPublicURIString(), true); return new FileEnvelope(file.getPublicURIString(), true);
} catch (IOException | ParseException ex) {
LoggerFactory.getLogger(NumassDataLoader.class).error("Can't read file envelope", ex);
return null;
}
} else { } else {
return null; return null;
} }

View File

@ -16,7 +16,7 @@
package inr.numass.storage; package inr.numass.storage;
import hep.dataforge.context.Context; import hep.dataforge.context.Context;
import hep.dataforge.events.BasicEvent; import hep.dataforge.events.Event;
import hep.dataforge.events.EventBuilder; import hep.dataforge.events.EventBuilder;
import hep.dataforge.exceptions.StorageException; import hep.dataforge.exceptions.StorageException;
import hep.dataforge.meta.Meta; import hep.dataforge.meta.Meta;
@ -30,7 +30,6 @@ import org.apache.commons.vfs2.FileSystemException;
import org.apache.commons.vfs2.FileType; import org.apache.commons.vfs2.FileType;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
@ -131,19 +130,15 @@ public class NumassStorage extends FileStorage {
try (OutputStream os = nmFile.getContent().getOutputStream(false)) { try (OutputStream os = nmFile.getContent().getOutputStream(false)) {
os.write(data.array()); os.write(data.array());
} }
getDefaultEventLoader().push(NumassDataPointEvent.build(getName(), fileName, (int) nmFile.getContent().getSize())); dispatchEvent(NumassDataPointEvent.build(getName(), fileName, (int) nmFile.getContent().getSize()));
} catch (IOException ex) { } catch (IOException ex) {
throw new StorageException(ex); throw new StorageException(ex);
} }
} }
@Override @Override
public NumassStorage buildShelf(String path, Meta an) throws StorageException { public NumassStorage createShelf(String path, Meta meta) throws StorageException {
return new NumassStorage(this, path, meta);
//TODO add recusive shelves builders for composite paths
//converting dataforge paths to file paths
path = path.replace('.', File.separatorChar);
return new NumassStorage(this, path, an);
} }
/** /**
@ -175,7 +170,7 @@ public class NumassStorage extends FileStorage {
return meta().getString("description", ""); return meta().getString("description", "");
} }
public static class NumassDataPointEvent extends BasicEvent { public static class NumassDataPointEvent extends Event {
public static final String FILE_NAME_KEY = "fileName"; public static final String FILE_NAME_KEY = "fileName";
public static final String FILE_SIZE_KEY = "fileSize"; public static final String FILE_SIZE_KEY = "fileSize";

View File

@ -83,11 +83,11 @@ public class NumassRun implements Metoid, Responder {
} }
public void setState(String name, Value value) throws StorageException { public void setState(String name, Value value) throws StorageException {
states.setValue(name, value); states.pushState(name, value);
} }
public void setState(String name, Object value) throws StorageException { public void setState(String name, Object value) throws StorageException {
states.setValue(name, Value.of(value)); states.pushState(name, Value.of(value));
} }
public boolean hasState(String name) { public boolean hasState(String name) {

View File

@ -26,6 +26,7 @@ import hep.dataforge.storage.api.Storage;
import hep.dataforge.storage.commons.AbstractNetworkListener; import hep.dataforge.storage.commons.AbstractNetworkListener;
import hep.dataforge.storage.commons.LoaderFactory; import hep.dataforge.storage.commons.LoaderFactory;
import hep.dataforge.storage.commons.StorageManager; import hep.dataforge.storage.commons.StorageManager;
import hep.dataforge.storage.commons.StorageUtils;
import inr.numass.storage.NumassStorage; import inr.numass.storage.NumassStorage;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -34,7 +35,6 @@ import ratpack.server.RatpackServer;
import java.io.IOException; import java.io.IOException;
/** /**
*
* @author darksnake * @author darksnake
*/ */
public class NumassServer extends AbstractNetworkListener implements Encapsulated { public class NumassServer extends AbstractNetworkListener implements Encapsulated {
@ -99,13 +99,12 @@ public class NumassServer extends AbstractNetworkListener implements Encapsulate
private void startRun(Meta meta) throws StorageException { private void startRun(Meta meta) throws StorageException {
String path = meta.getString("path", DEFAULT_RUN_PATH); String path = meta.getString("path", DEFAULT_RUN_PATH);
NumassStorage storage = root.buildShelf(path,meta); Storage storage = StorageUtils.getOrBuildShelf(root, path, meta);
run = new NumassRun(path, storage, getResponseFactory()); run = new NumassRun(path, storage, getResponseFactory());
getRootState().setValue("numass.current.run", path); getRootState().pushState("numass.current.run", path);
} }
/** /**
*
* @param message * @param message
* @return * @return
*/ */
@ -168,7 +167,7 @@ public class NumassServer extends AbstractNetworkListener implements Encapsulate
* Reset run to default * Reset run to default
*/ */
public void resetRun() throws StorageException { public void resetRun() throws StorageException {
getRootState().setValue("numass.current.run", DEFAULT_RUN_PATH); getRootState().pushState("numass.current.run", DEFAULT_RUN_PATH);
updateRun(); updateRun();
} }