io tweaks
This commit is contained in:
parent
72b046abfe
commit
a699a56f66
@ -78,8 +78,17 @@ public class NumassIO extends BasicIOManager {
|
||||
});
|
||||
}
|
||||
|
||||
public String getExtension(String type) {
|
||||
switch (type){
|
||||
case DEFAULT_OUTPUT_TYPE:
|
||||
return ".out";
|
||||
default:
|
||||
return "." + type;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public OutputStream out(Name stage, Name name) {
|
||||
public OutputStream out(Name stage, Name name, String type) {
|
||||
List<String> tokens = new ArrayList<>();
|
||||
if (getContext().hasValue("numass.path")) {
|
||||
String path = getContext().getString("numass.path");
|
||||
@ -95,7 +104,7 @@ public class NumassIO extends BasicIOManager {
|
||||
}
|
||||
|
||||
String dirName = String.join(File.separator, tokens);
|
||||
String fileName = name.removeNameSpace().toString() + ".out";
|
||||
String fileName = name.removeNameSpace().toString() + getExtension(type);
|
||||
OutputStream out = buildOut(getWorkDirectory(), dirName, fileName);
|
||||
registry.add(out);
|
||||
return out;
|
||||
|
@ -17,6 +17,8 @@ package inr.numass.utils;
|
||||
|
||||
import hep.dataforge.data.DataNode;
|
||||
import hep.dataforge.data.DataSet;
|
||||
import hep.dataforge.io.envelopes.DefaultEnvelopeType;
|
||||
import hep.dataforge.io.envelopes.Envelope;
|
||||
import hep.dataforge.io.envelopes.EnvelopeBuilder;
|
||||
import hep.dataforge.io.envelopes.TaglessEnvelopeType;
|
||||
import hep.dataforge.io.markup.Markedup;
|
||||
@ -102,7 +104,6 @@ public class NumassUtils {
|
||||
* @throws IOException
|
||||
*/
|
||||
public static void writeEnvelope(OutputStream stream, Meta meta, Consumer<OutputStream> dataWriter) {
|
||||
//TODO replace by text envelope when it is ready
|
||||
try {
|
||||
TaglessEnvelopeType.instance.getWriter().write(
|
||||
stream,
|
||||
@ -117,6 +118,15 @@ public class NumassUtils {
|
||||
}
|
||||
}
|
||||
|
||||
public static void writeEnvelope(OutputStream stream, Envelope envelope) {
|
||||
try {
|
||||
DefaultEnvelopeType.instance.getWriter().write(stream, envelope);
|
||||
stream.flush();
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public static void write(OutputStream stream, Meta meta, Markedup something) {
|
||||
writeEnvelope(stream, meta, out -> new SimpleMarkupRenderer(out).render(something.markup(meta)));
|
||||
}
|
||||
@ -133,10 +143,10 @@ public class NumassUtils {
|
||||
set.getPoints().forEach(point -> {
|
||||
Meta pointMeta = new MetaBuilder("point")
|
||||
.putValue("voltage", point.getVoltage())
|
||||
.putValue("index", point.meta().getInt("external_meta.point_index",-1))
|
||||
.putValue("run", point.meta().getString("external_meta.session",""))
|
||||
.putValue("group", point.meta().getString("external_meta.group",""));
|
||||
String pointName = "point_" + point.meta().getInt("external_meta.point_index",point.hashCode());
|
||||
.putValue("index", point.meta().getInt("external_meta.point_index", -1))
|
||||
.putValue("run", point.meta().getString("external_meta.session", ""))
|
||||
.putValue("group", point.meta().getString("external_meta.group", ""));
|
||||
String pointName = "point_" + point.meta().getInt("external_meta.point_index", point.hashCode());
|
||||
builder.putData(pointName, point, pointMeta);
|
||||
});
|
||||
set.getHvData().ifPresent(hv -> builder.putData("hv", hv, Meta.empty()));
|
||||
@ -145,10 +155,11 @@ public class NumassUtils {
|
||||
|
||||
/**
|
||||
* Convert numass set to uniform node which consists of points
|
||||
*
|
||||
* @param set
|
||||
* @return
|
||||
*/
|
||||
public static DataNode<NumassPoint> pointsToNode(NumassSet set){
|
||||
public static DataNode<NumassPoint> pointsToNode(NumassSet set) {
|
||||
return setToNode(set).checked(NumassPoint.class);
|
||||
}
|
||||
|
||||
|
@ -1,9 +1,15 @@
|
||||
package inr.numass.tasks
|
||||
|
||||
import hep.dataforge.data.CustomDataFilter
|
||||
import hep.dataforge.kodex.configure
|
||||
import hep.dataforge.kodex.fx.plots.PlotManager
|
||||
import hep.dataforge.kodex.fx.plots.plus
|
||||
import hep.dataforge.kodex.task
|
||||
import hep.dataforge.plots.PlotFrame
|
||||
import hep.dataforge.plots.data.DataPlot
|
||||
import hep.dataforge.tables.ListTable
|
||||
import hep.dataforge.tables.Table
|
||||
import hep.dataforge.tables.XYAdapter
|
||||
import inr.numass.data.analyzers.SmartAnalyzer
|
||||
import inr.numass.data.api.NumassSet
|
||||
import inr.numass.utils.NumassUtils
|
||||
@ -27,19 +33,36 @@ val monitorTableTask = task("monitor") {
|
||||
val monitorVoltage = meta.getDouble("monitorVoltage", 16000.0);
|
||||
val analyzer = SmartAnalyzer()
|
||||
val analyzerMeta = meta.getMetaOrEmpty("analyzer")
|
||||
val builder = ListTable.Builder("timestamp", "count", "cr", "crErr")
|
||||
//TODO add separator labels
|
||||
val res = ListTable.Builder("timestamp", "count", "cr", "crErr")
|
||||
.rows(
|
||||
data.values.stream().parallel()
|
||||
.flatMap { it.points }
|
||||
.filter { it.voltage == monitorVoltage }
|
||||
.map { it -> analyzer.analyzePoint(it, analyzerMeta) }
|
||||
)
|
||||
).build()
|
||||
|
||||
context.io().out("numass.monitor", name).use {
|
||||
NumassUtils.write(it, meta, builder.build())
|
||||
context.provide("plots", PlotManager::class.java).ifPresent {
|
||||
it.display(stage = "monitor") {
|
||||
configure {
|
||||
"xAxis.title" to "time"
|
||||
"xAxis.type" to "time"
|
||||
"yAxis.title" to "Count rate"
|
||||
"yAxis.units" to "Hz"
|
||||
}
|
||||
plots + DataPlot.plot(name, XYAdapter("timestamp", "cr", "crErr"), res)
|
||||
}.also { frame ->
|
||||
context.io().out("numass.monitor", name, "dfp").use {
|
||||
NumassUtils.writeEnvelope(it, PlotFrame.Wrapper().wrap(frame))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return@result builder.build();
|
||||
context.io().out("numass.monitor", name).use {
|
||||
NumassUtils.write(it, meta, res)
|
||||
}
|
||||
|
||||
return@result res;
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user