Working on new controls for numass
This commit is contained in:
parent
89e8ed10f3
commit
069c5422e0
@ -1,4 +1,4 @@
|
||||
/*
|
||||
/*
|
||||
* Copyright 2015 Alexander Nozik.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
@ -22,10 +22,10 @@ import hep.dataforge.io.envelopes.EnvelopeBuilder;
|
||||
import hep.dataforge.io.messages.Responder;
|
||||
import hep.dataforge.meta.Meta;
|
||||
import hep.dataforge.meta.MetaBuilder;
|
||||
import hep.dataforge.storage.commons.StorageUtils;
|
||||
import hep.dataforge.values.Value;
|
||||
import hep.dataforge.values.Values;
|
||||
import inr.numass.data.storage.NumassStorage;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.zeroturnaround.zip.ZipUtil;
|
||||
|
||||
@ -39,6 +39,9 @@ import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
import static hep.dataforge.io.messages.MessagesKt.*;
|
||||
|
||||
/**
|
||||
* @author darksnake
|
||||
@ -46,7 +49,6 @@ import java.util.Map;
|
||||
public class NumassClient implements AutoCloseable, Responder {
|
||||
|
||||
Socket socket;
|
||||
MessageFactory mf = new MessageFactory();
|
||||
|
||||
public NumassClient(String address, int port) throws IOException {
|
||||
socket = new Socket(address, port);
|
||||
@ -61,7 +63,7 @@ public class NumassClient implements AutoCloseable, Responder {
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
if (!socket.isClosed()) {
|
||||
write(mf.terminator(), socket.getOutputStream());
|
||||
write(getTerminator(), socket.getOutputStream());
|
||||
}
|
||||
socket.close();
|
||||
}
|
||||
@ -73,7 +75,7 @@ public class NumassClient implements AutoCloseable, Responder {
|
||||
return read(socket.getInputStream());
|
||||
} catch (IOException ex) {
|
||||
LoggerFactory.getLogger(getClass()).error("Error in envelope exchange", ex);
|
||||
return mf.errorResponseBase(message, ex).build();
|
||||
return errorResponseBase(message, ex).build();
|
||||
}
|
||||
}
|
||||
|
||||
@ -87,7 +89,7 @@ public class NumassClient implements AutoCloseable, Responder {
|
||||
}
|
||||
|
||||
private EnvelopeBuilder requestActionBase(String type, String action) {
|
||||
return mf.requestBase(type).putMetaValue("action", action);
|
||||
return requestBase(type).setMetaValue("action", action);
|
||||
}
|
||||
|
||||
public Meta getCurrentRun() {
|
||||
@ -111,34 +113,34 @@ public class NumassClient implements AutoCloseable, Responder {
|
||||
ByteBuffer buffer;
|
||||
String zipName = null;
|
||||
if (file.isDirectory()) {
|
||||
File tmpFile = File.createTempFile(file.getName(), NumassStorage.NUMASS_ZIP_EXTENSION);
|
||||
File tmpFile = File.createTempFile(file.getName(), NumassStorage.Companion.getNUMASS_ZIP_EXTENSION());
|
||||
tmpFile.deleteOnExit();
|
||||
ZipUtil.pack(file, tmpFile);
|
||||
zipName = file.getName();
|
||||
file = tmpFile;
|
||||
}
|
||||
|
||||
if (file.toString().endsWith(NumassStorage.NUMASS_ZIP_EXTENSION)) {
|
||||
if (file.toString().endsWith(NumassStorage.Companion.getNUMASS_ZIP_EXTENSION())) {
|
||||
FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
|
||||
buffer = ByteBuffer.allocate((int) channel.size());
|
||||
channel.read(buffer);
|
||||
if (zipName == null) {
|
||||
zipName = file.getName().replace(NumassStorage.NUMASS_ZIP_EXTENSION, "");
|
||||
zipName = file.getName().replace(NumassStorage.Companion.getNUMASS_ZIP_EXTENSION(), "");
|
||||
}
|
||||
} else {
|
||||
return StorageUtils.getErrorMeta(new FileNotFoundException(fileName));
|
||||
return getErrorMeta(new FileNotFoundException(fileName));
|
||||
}
|
||||
|
||||
Envelope bin = mf.requestBase("numass.data")
|
||||
.putMetaValue("action", "push")
|
||||
.putMetaValue("path", path)
|
||||
.putMetaValue("name", zipName)
|
||||
Envelope bin = requestBase("numass.data")
|
||||
.setMetaValue("action", "push")
|
||||
.setMetaValue("path", path)
|
||||
.setMetaValue("name", zipName)
|
||||
.setData(buffer)
|
||||
.build();
|
||||
|
||||
return respond(bin).getMeta();
|
||||
} catch (IOException ex) {
|
||||
return StorageUtils.getErrorMeta(ex);
|
||||
return getErrorMeta(ex);
|
||||
}
|
||||
}
|
||||
|
||||
@ -159,7 +161,7 @@ public class NumassClient implements AutoCloseable, Responder {
|
||||
Meta response = respond(env.build()).getMeta();
|
||||
if (response.getBoolean("success", true)) {
|
||||
Map<String, Value> res = new HashMap<>();
|
||||
response.getMetaList("state").stream().forEach((stateMeta) -> {
|
||||
response.getMetaList("state").forEach((stateMeta) -> {
|
||||
res.put(stateMeta.getString("name"), stateMeta.getValue("value"));
|
||||
});
|
||||
return res;
|
||||
@ -193,7 +195,7 @@ public class NumassClient implements AutoCloseable, Responder {
|
||||
*/
|
||||
public Meta setState(Map<String, Value> stateMap) {
|
||||
EnvelopeBuilder env = requestActionBase("numass.state", "set");
|
||||
stateMap.entrySet().stream().forEach((state) -> {
|
||||
stateMap.entrySet().forEach((state) -> {
|
||||
env.putMetaNode(new MetaBuilder("state")
|
||||
.setValue("name", state.getKey())
|
||||
.setValue("value", state.getValue())
|
||||
@ -254,5 +256,9 @@ public class NumassClient implements AutoCloseable, Responder {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
|
||||
@NotNull
|
||||
@Override
|
||||
public CompletableFuture<Envelope> respondInFuture(@NotNull Envelope message) {
|
||||
return CompletableFuture.supplyAsync(() -> respond(message));
|
||||
}
|
||||
}
|
||||
|
@ -141,8 +141,7 @@ class PKT8Device(context: Context, meta: Meta) : PortSensor<PKT8Result>(context,
|
||||
}
|
||||
|
||||
|
||||
@Throws(ControlException::class)
|
||||
override fun buildPort(portName: String): Port {
|
||||
override fun buildPort(meta: Meta): Port {
|
||||
//setup connection
|
||||
val handler: Port = if ("virtual" == portName) {
|
||||
logger.info("Starting {} using virtual debug port", name)
|
||||
@ -225,67 +224,67 @@ class PKT8Device(context: Context, meta: Meta) : PortSensor<PKT8Result>(context,
|
||||
|
||||
|
||||
override fun startMeasurement(oldMeta: Meta, newMeta: Meta) {
|
||||
if (!oldMeta.isEmpty) {
|
||||
logger.warn("Trying to start measurement which is already started")
|
||||
if (!oldMeta.isEmpty) {
|
||||
logger.warn("Trying to start measurement which is already started")
|
||||
}
|
||||
|
||||
try {
|
||||
logger.info("Starting measurement")
|
||||
|
||||
//add weak stop listener
|
||||
stopListener = controller.onPhrase("[Ss]topped\\s*") {
|
||||
afterPause()
|
||||
updateLogicalState(Sensor.MEASURING_STATE, false)
|
||||
}
|
||||
|
||||
try {
|
||||
logger.info("Starting measurement")
|
||||
//add weak measurement listener
|
||||
valueListener = controller.onPhrase("[a-f].*") {
|
||||
val trimmed = it.trim()
|
||||
val designation = trimmed.substring(0, 1)
|
||||
val rawValue = java.lang.Double.parseDouble(trimmed.substring(1)) / 100
|
||||
|
||||
//add weak stop listener
|
||||
stopListener = controller.onPhrase("[Ss]topped\\s*") {
|
||||
afterPause()
|
||||
updateLogicalState(Sensor.MEASURING_STATE, false)
|
||||
val channel = this@PKT8Device.channels[designation]
|
||||
|
||||
if (channel != null) {
|
||||
result(channel.evaluate(rawValue))
|
||||
collector.put(channel.name, channel.getTemperature(rawValue))
|
||||
} else {
|
||||
result(PKT8Result(designation, rawValue, -1.0))
|
||||
}
|
||||
|
||||
//add weak measurement listener
|
||||
valueListener = controller.onPhrase("[a-f].*") {
|
||||
val trimmed = it.trim()
|
||||
val designation = trimmed.substring(0, 1)
|
||||
val rawValue = java.lang.Double.parseDouble(trimmed.substring(1)) / 100
|
||||
|
||||
val channel = this@PKT8Device.channels[designation]
|
||||
|
||||
if (channel != null) {
|
||||
result(channel.evaluate(rawValue))
|
||||
collector.put(channel.name, channel.getTemperature(rawValue))
|
||||
} else {
|
||||
result(PKT8Result(designation, rawValue, -1.0))
|
||||
}
|
||||
}
|
||||
|
||||
//send start signal
|
||||
controller.send("s")
|
||||
|
||||
afterStart()
|
||||
} catch (ex: ControlException) {
|
||||
onError("Failed to start measurement", ex)
|
||||
}
|
||||
|
||||
//send start signal
|
||||
controller.send("s")
|
||||
|
||||
afterStart()
|
||||
} catch (ex: ControlException) {
|
||||
onError("Failed to start measurement", ex)
|
||||
}
|
||||
}
|
||||
|
||||
override fun stopMeasurement(meta: Meta) {
|
||||
if (isFinished) {
|
||||
logger.warn("Trying to stop measurement which is already stopped")
|
||||
return true
|
||||
} else {
|
||||
if (isFinished) {
|
||||
logger.warn("Trying to stop measurement which is already stopped")
|
||||
return true
|
||||
} else {
|
||||
|
||||
try {
|
||||
logger.info("Stopping measurement")
|
||||
val response = sendAndWait("p").trim()
|
||||
// Должно быть именно с большой буквы!!!
|
||||
return "Stopped" == response || "stopped" == response
|
||||
} catch (ex: Exception) {
|
||||
onError("Failed to stop measurement", ex)
|
||||
return false
|
||||
} finally {
|
||||
afterStop()
|
||||
errorListener?.let { controller.removeErrorListener(it) }
|
||||
stopListener?.let { controller.removePhraseListener(it) }
|
||||
valueListener?.let { controller.removePhraseListener(it) }
|
||||
collector.stop()
|
||||
logger.debug("Collector stopped")
|
||||
}
|
||||
try {
|
||||
logger.info("Stopping measurement")
|
||||
val response = sendAndWait("p").trim()
|
||||
// Должно быть именно с большой буквы!!!
|
||||
return "Stopped" == response || "stopped" == response
|
||||
} catch (ex: Exception) {
|
||||
onError("Failed to stop measurement", ex)
|
||||
return false
|
||||
} finally {
|
||||
afterStop()
|
||||
errorListener?.let { controller.removeErrorListener(it) }
|
||||
stopListener?.let { controller.removePhraseListener(it) }
|
||||
valueListener?.let { controller.removePhraseListener(it) }
|
||||
collector.stop()
|
||||
logger.debug("Collector stopped")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun setSPS(sps: Int) {
|
||||
@ -332,7 +331,7 @@ class PKT8Device(context: Context, meta: Meta) : PortSensor<PKT8Result>(context,
|
||||
// }
|
||||
|
||||
|
||||
// inner class PKT8Measurement(private val controller: GenericPortController) : AbstractMeasurement<PKT8Result>() {
|
||||
// inner class PKT8Measurement(private val controller: GenericPortController) : AbstractMeasurement<PKT8Result>() {
|
||||
//
|
||||
// override fun getDevice(): Device = this@PKT8Device
|
||||
//
|
||||
|
@ -120,7 +120,7 @@ class MspDevice(context: Context, meta: Meta) : PortSensor<Values>(context, meta
|
||||
}
|
||||
}
|
||||
|
||||
override fun acceptError(errorMessage: String?, error: Throwable?) {
|
||||
override fun error(errorMessage: String?, error: Throwable?) {
|
||||
notifyError(errorMessage, error)
|
||||
}
|
||||
|
||||
|
@ -107,7 +107,7 @@ fun findDeviceMeta(config: Meta, criterion: Predicate<Meta>): Optional<Meta> {
|
||||
|
||||
fun setupContext(meta: Meta): Context {
|
||||
val ctx = Global.getContext("NUMASS-CONTROL")
|
||||
ctx.pluginManager.getOrLoad(StorageManager::class.java)
|
||||
ctx.pluginManager.load(StorageManager::class.java)
|
||||
return ctx
|
||||
}
|
||||
|
||||
|
@ -11,9 +11,9 @@ import hep.dataforge.control.devices.PortSensor
|
||||
import hep.dataforge.control.measurements.Measurement
|
||||
import hep.dataforge.control.measurements.SimpleMeasurement
|
||||
import hep.dataforge.control.ports.ComPort
|
||||
import hep.dataforge.control.ports.GenericPortController
|
||||
import hep.dataforge.control.ports.Port
|
||||
import hep.dataforge.control.ports.PortFactory
|
||||
import hep.dataforge.exceptions.ControlException
|
||||
import hep.dataforge.meta.Meta
|
||||
import inr.numass.control.DeviceView
|
||||
|
||||
@ -23,17 +23,21 @@ import inr.numass.control.DeviceView
|
||||
@DeviceView(VacDisplay::class)
|
||||
class CM32Device(context: Context, meta: Meta) : PortSensor<Double>(context, meta) {
|
||||
|
||||
@Throws(ControlException::class)
|
||||
override fun buildPort(portName: String): Port {
|
||||
override fun connect(meta: Meta): GenericPortController {
|
||||
val portName = meta.getString("name")
|
||||
logger.info("Connecting to port {}", portName)
|
||||
val new: Port
|
||||
if (portName.startsWith("com")) {
|
||||
new = ComPort(portName, 2400, 8, 1, 0)
|
||||
val port: Port = if (portName.startsWith("com")) {
|
||||
ComPort.create(portName, 2400, 8, 1, 0)
|
||||
} else {
|
||||
new = PortFactory.build(portName)
|
||||
PortFactory.build(meta)
|
||||
}
|
||||
new.setDelimiter("T--\r")
|
||||
return new
|
||||
return GenericPortController(context, port){it.endsWith("T--\r")}
|
||||
}
|
||||
|
||||
|
||||
|
||||
override fun startMeasurement(oldMeta: Meta, newMeta: Meta) {
|
||||
TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
|
||||
}
|
||||
|
||||
override fun createMeasurement(): Measurement<Double> = CMVacMeasurement()
|
||||
|
@ -29,6 +29,10 @@ protobuf {
|
||||
generatedFilesBaseDir = "$projectDir/gen"
|
||||
}
|
||||
|
||||
sourceSets {
|
||||
main.kotlin.srcDirs += 'gen/main/java'
|
||||
}
|
||||
|
||||
clean {
|
||||
delete protobuf.generatedFilesBaseDir
|
||||
}
|
||||
|
@ -1,101 +0,0 @@
|
||||
package inr.numass;
|
||||
|
||||
import hep.dataforge.io.envelopes.*;
|
||||
import hep.dataforge.values.Value;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* An envelope type for legacy numass tags. Reads legacy tag and writes DF02 tags
|
||||
*/
|
||||
public class NumassEnvelopeType implements EnvelopeType {
|
||||
|
||||
public static final byte[] LEGACY_START_SEQUENCE = {'#', '!'};
|
||||
public static final byte[] LEGACY_END_SEQUENCE = {'!', '#', '\r', '\n'};
|
||||
|
||||
@Override
|
||||
public int getCode() {
|
||||
return DefaultEnvelopeType.DEFAULT_ENVELOPE_TYPE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return "numass";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String description() {
|
||||
return "Numass legacy envelope";
|
||||
}
|
||||
|
||||
@Override
|
||||
public EnvelopeReader getReader(Map<String, String> properties) {
|
||||
return new NumassEnvelopeReader();
|
||||
}
|
||||
|
||||
@Override
|
||||
public EnvelopeWriter getWriter(Map<String, String> properties) {
|
||||
return new DefaultEnvelopeWriter(this, MetaType.Companion.resolve(properties));
|
||||
}
|
||||
|
||||
public static class LegacyTag extends EnvelopeTag {
|
||||
@Override
|
||||
protected byte[] getStartSequence() {
|
||||
return LEGACY_START_SEQUENCE;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected byte[] getEndSequence() {
|
||||
return LEGACY_END_SEQUENCE;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the length of tag in bytes. -1 means undefined size in case tag was modified
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
public int getLength() {
|
||||
return 30;
|
||||
}
|
||||
|
||||
/**
|
||||
* Read leagscy version 1 tag without leading tag head
|
||||
*
|
||||
* @param buffer
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
protected Map<String, Value> readHeader(ByteBuffer buffer) {
|
||||
Map<String, Value> res = new HashMap<>();
|
||||
|
||||
int type = buffer.getInt(2);
|
||||
res.put(Envelope.Companion.TYPE_PROPERTY, Value.of(type));
|
||||
|
||||
short metaTypeCode = buffer.getShort(10);
|
||||
MetaType metaType = MetaType.Companion.resolve(metaTypeCode);
|
||||
|
||||
if (metaType != null) {
|
||||
res.put(Envelope.Companion.META_TYPE_PROPERTY, Value.of(metaType.getName()));
|
||||
} else {
|
||||
LoggerFactory.getLogger(EnvelopeTag.class).warn("Could not resolve meta type. Using default");
|
||||
}
|
||||
|
||||
long metaLength = Integer.toUnsignedLong(buffer.getInt(14));
|
||||
res.put(Envelope.Companion.META_LENGTH_PROPERTY, Value.of(metaLength));
|
||||
long dataLength = Integer.toUnsignedLong(buffer.getInt(22));
|
||||
res.put(Envelope.Companion.DATA_LENGTH_PROPERTY, Value.of(dataLength));
|
||||
return res;
|
||||
}
|
||||
}
|
||||
|
||||
private static class NumassEnvelopeReader extends DefaultEnvelopeReader {
|
||||
@Override
|
||||
protected EnvelopeTag newTag() {
|
||||
return new LegacyTag();
|
||||
}
|
||||
}
|
||||
}
|
@ -1,60 +0,0 @@
|
||||
/*
|
||||
* To change this license header, choose License Headers in Project Properties.
|
||||
* To change this template file, choose Tools | Templates
|
||||
* and open the template in the editor.
|
||||
*/
|
||||
package inr.numass;
|
||||
|
||||
import hep.dataforge.context.Global;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author Alexander Nozik
|
||||
*/
|
||||
public class NumassProperties {
|
||||
|
||||
private static File getNumassPropertiesFile() throws IOException {
|
||||
File file = new File(Global.INSTANCE.getUserDirectory(), "numass");
|
||||
if (!file.exists()) {
|
||||
file.mkdirs();
|
||||
}
|
||||
file = new File(file, "numass.cfg");
|
||||
if(!file.exists()){
|
||||
file.createNewFile();
|
||||
}
|
||||
return file;
|
||||
}
|
||||
|
||||
public static String getNumassProperty(String key) {
|
||||
try {
|
||||
Properties props = new Properties();
|
||||
props.load(new FileInputStream(getNumassPropertiesFile()));
|
||||
return props.getProperty(key);
|
||||
} catch (IOException ex) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized static void setNumassProperty(String key, @Nullable String value) {
|
||||
try {
|
||||
Properties props = new Properties();
|
||||
File store = getNumassPropertiesFile();
|
||||
props.load(new FileInputStream(store));
|
||||
if(value == null){
|
||||
props.remove(key);
|
||||
} else {
|
||||
props.setProperty(key, value);
|
||||
}
|
||||
props.store(new FileOutputStream(store), "");
|
||||
} catch (IOException ex) {
|
||||
Global.INSTANCE.getLogger().error("Failed to save numass properties", ex);
|
||||
}
|
||||
}
|
||||
}
|
@ -32,9 +32,9 @@ public interface NumassSet extends Named, Metoid, Iterable<NumassPoint>, Provide
|
||||
|
||||
Stream<NumassPoint> getPoints();
|
||||
|
||||
default String getDescription() {
|
||||
return getMeta().getString(DESCRIPTION_KEY, "");
|
||||
}
|
||||
// default String getDescription() {
|
||||
// return getMeta().getString(DESCRIPTION_KEY, "");
|
||||
// }
|
||||
|
||||
@NotNull
|
||||
@Override
|
||||
|
@ -10,7 +10,6 @@ import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Arrays;
|
||||
|
||||
import static inr.numass.NumassEnvelopeType.LEGACY_START_SEQUENCE;
|
||||
import static java.nio.file.StandardOpenOption.READ;
|
||||
|
||||
public class NumassFileEnvelope extends FileEnvelope {
|
||||
@ -22,10 +21,10 @@ public class NumassFileEnvelope extends FileEnvelope {
|
||||
try (InputStream stream = Files.newInputStream(path, READ)) {
|
||||
byte[] bytes = new byte[2];
|
||||
stream.read(bytes);
|
||||
if (Arrays.equals(bytes, LEGACY_START_SEQUENCE)) {
|
||||
if (Arrays.equals(bytes, NumassEnvelopeType.Companion.getLEGACY_START_SEQUENCE())) {
|
||||
return new NumassFileEnvelope(path, readOnly);
|
||||
} else {
|
||||
return FileEnvelope.open(path, readOnly);
|
||||
return FileEnvelope.Companion.open(path, readOnly);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("Failed to open file envelope", e);
|
||||
|
@ -1,153 +0,0 @@
|
||||
package inr.numass.data.storage;
|
||||
|
||||
import hep.dataforge.io.envelopes.Envelope;
|
||||
import hep.dataforge.meta.Meta;
|
||||
import inr.numass.data.api.NumassBlock;
|
||||
import inr.numass.data.api.NumassEvent;
|
||||
import inr.numass.data.api.NumassFrame;
|
||||
import inr.numass.data.api.NumassPoint;
|
||||
import inr.numass.data.legacy.NumassFileEnvelope;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.ByteOrder;
|
||||
import java.nio.channels.ReadableByteChannel;
|
||||
import java.nio.file.Path;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.Iterator;
|
||||
import java.util.stream.Stream;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
||||
/**
|
||||
* Created by darksnake on 08.07.2017.
|
||||
*/
|
||||
public class ClassicNumassPoint implements NumassPoint {
|
||||
public static ClassicNumassPoint readFile(Path path) {
|
||||
return new ClassicNumassPoint(NumassFileEnvelope.open(path, true));
|
||||
}
|
||||
|
||||
private final Envelope envelope;
|
||||
|
||||
public ClassicNumassPoint(Envelope envelope) {
|
||||
this.envelope = envelope;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<NumassBlock> getBlocks() {
|
||||
// double u = envelope.meta().getDouble("external_meta.HV1_value", 0);
|
||||
long length;
|
||||
if (envelope.getMeta().hasValue("external_meta.acquisition_time")) {
|
||||
length = envelope.getMeta().getValue("external_meta.acquisition_time").longValue();
|
||||
} else {
|
||||
length = envelope.getMeta().getValue("acquisition_time").longValue();
|
||||
}
|
||||
return Stream.of(new ClassicBlock(getStartTime(), Duration.ofSeconds(length)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Instant getStartTime() {
|
||||
if (getMeta().hasValue("start_time")) {
|
||||
return getMeta().getValue("start_time").timeValue();
|
||||
} else {
|
||||
return Instant.EPOCH;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public double getVoltage() {
|
||||
return getMeta().getDouble("external_meta.HV1_value", 0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getIndex() {
|
||||
return getMeta().getInt("external_meta.point_index", -1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Meta getMeta() {
|
||||
return envelope.getMeta();
|
||||
}
|
||||
|
||||
//TODO split blocks using meta
|
||||
private class ClassicBlock implements NumassBlock, Iterable<NumassEvent> {
|
||||
private final Instant startTime;
|
||||
private final Duration length;
|
||||
// private final long blockOffset;
|
||||
|
||||
public ClassicBlock(Instant startTime, Duration length) {
|
||||
this.startTime = startTime;
|
||||
this.length = length;
|
||||
// this.blockOffset = blockOffset;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Instant getStartTime() {
|
||||
return startTime;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Duration getLength() {
|
||||
return length;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<NumassEvent> getEvents() {
|
||||
return StreamSupport.stream(this.spliterator(), false);
|
||||
}
|
||||
|
||||
@NotNull
|
||||
@Override
|
||||
public Iterator<NumassEvent> iterator() {
|
||||
double timeCoef = envelope.getMeta().getDouble("time_coeff", 50);
|
||||
try {
|
||||
ByteBuffer buffer = ByteBuffer.allocate(7000);
|
||||
buffer.order(ByteOrder.LITTLE_ENDIAN);
|
||||
ReadableByteChannel channel = envelope.getData().getChannel();
|
||||
channel.read(buffer);
|
||||
buffer.flip();
|
||||
return new Iterator<NumassEvent>() {
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
try {
|
||||
if (buffer.hasRemaining()) {
|
||||
return true;
|
||||
} else {
|
||||
buffer.flip();
|
||||
int num = channel.read(buffer);
|
||||
if (num > 0) {
|
||||
buffer.flip();
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LoggerFactory.getLogger(ClassicNumassPoint.this.getClass()).error("Unexpected IOException when reading block", e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public NumassEvent next() {
|
||||
short channel = (short) Short.toUnsignedInt(buffer.getShort());
|
||||
long time = Integer.toUnsignedLong(buffer.getInt());
|
||||
byte status = buffer.get(); // status is ignored
|
||||
return new NumassEvent(channel, startTime, (long) (time * timeCoef));
|
||||
}
|
||||
};
|
||||
} catch (IOException ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Stream<NumassFrame> getFrames() {
|
||||
return Stream.empty();
|
||||
}
|
||||
}
|
||||
}
|
@ -1,37 +0,0 @@
|
||||
package inr.numass.data.storage;
|
||||
|
||||
import hep.dataforge.context.Context;
|
||||
import hep.dataforge.data.DataFactory;
|
||||
import hep.dataforge.data.DataTree;
|
||||
import hep.dataforge.meta.Meta;
|
||||
import hep.dataforge.storage.api.Storage;
|
||||
import hep.dataforge.storage.commons.StorageManager;
|
||||
import hep.dataforge.storage.commons.StorageUtils;
|
||||
import inr.numass.data.api.NumassSet;
|
||||
|
||||
/**
|
||||
* Created by darksnake on 03-Feb-17.
|
||||
*/
|
||||
public class NumassDataFactory extends DataFactory<NumassSet> {
|
||||
|
||||
public NumassDataFactory() {
|
||||
super(NumassSet.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return "numass";
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected void fill(DataTree.Builder<NumassSet> builder, Context context, Meta meta) {
|
||||
Meta newMeta = meta.getBuilder().setValue("type", "numass");
|
||||
Storage storage = context.load(StorageManager.class, Meta.empty()).buildStorage(newMeta);
|
||||
StorageUtils.loaderStream(storage).forEach(loader -> {
|
||||
if (loader instanceof NumassSet) {
|
||||
builder.putStatic(loader.getFullName().toUnescaped(), (NumassSet) loader);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
@ -1,230 +0,0 @@
|
||||
/*
|
||||
* Copyright 2015 Alexander Nozik.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package inr.numass.data.storage;
|
||||
|
||||
import hep.dataforge.exceptions.StorageException;
|
||||
import hep.dataforge.io.ColumnedDataReader;
|
||||
import hep.dataforge.io.envelopes.Envelope;
|
||||
import hep.dataforge.meta.Meta;
|
||||
import hep.dataforge.meta.MetaBuilder;
|
||||
import hep.dataforge.providers.Provider;
|
||||
import hep.dataforge.storage.api.ObjectLoader;
|
||||
import hep.dataforge.storage.api.Storage;
|
||||
import hep.dataforge.storage.filestorage.FileStorage;
|
||||
import hep.dataforge.storage.loaders.AbstractLoader;
|
||||
import hep.dataforge.tables.Table;
|
||||
import hep.dataforge.values.Value;
|
||||
import inr.numass.data.api.NumassPoint;
|
||||
import inr.numass.data.api.NumassSet;
|
||||
import inr.numass.data.legacy.NumassFileEnvelope;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.time.Instant;
|
||||
import java.util.*;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
|
||||
/**
|
||||
* The reader for numass main detector data directory or zip format;
|
||||
*
|
||||
* @author darksnake
|
||||
*/
|
||||
public class NumassDataLoader extends AbstractLoader implements ObjectLoader<Envelope>, NumassSet, Provider {
|
||||
|
||||
|
||||
public static NumassDataLoader fromFile(Storage storage, Path zipFile) throws IOException {
|
||||
throw new UnsupportedOperationException("TODO");
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Construct numass loader from directory
|
||||
*
|
||||
* @param storage
|
||||
* @param directory
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
public static NumassDataLoader fromDir(Storage storage, Path directory, String name) throws IOException {
|
||||
if (!Files.isDirectory(directory)) {
|
||||
throw new IllegalArgumentException("Numass data directory required");
|
||||
}
|
||||
Meta annotation = new MetaBuilder("loader")
|
||||
.putValue("type", "numass")
|
||||
.putValue("numass.loaderFormat", "dir")
|
||||
// .setValue("file.timeCreated", Instant.ofEpochMilli(directory.getContent().getLastModifiedTime()))
|
||||
.build();
|
||||
|
||||
if (name == null || name.isEmpty()) {
|
||||
name = FileStorage.entryName(directory);
|
||||
}
|
||||
|
||||
//FIXME envelopes are lazy do we need to do additional lazy evaluations here?
|
||||
Map<String, Supplier<Envelope>> items = new LinkedHashMap<>();
|
||||
|
||||
Files.list(directory).filter(file -> {
|
||||
String fileName = file.getFileName().toString();
|
||||
return fileName.equals(META_FRAGMENT_NAME)
|
||||
|| fileName.equals(HV_FRAGMENT_NAME)
|
||||
|| fileName.startsWith(POINT_FRAGMENT_NAME);
|
||||
}).forEach(file -> {
|
||||
try {
|
||||
items.put(FileStorage.entryName(file), () -> NumassFileEnvelope.open(file, true));
|
||||
} catch (Exception ex) {
|
||||
LoggerFactory.getLogger(NumassDataLoader.class)
|
||||
.error("Can't load numass data directory " + FileStorage.entryName(directory), ex);
|
||||
}
|
||||
});
|
||||
|
||||
return new NumassDataLoader(storage, name, annotation, items);
|
||||
}
|
||||
|
||||
/**
|
||||
* "start_time": "2016-04-20T04:08:50",
|
||||
*
|
||||
* @param meta
|
||||
* @return
|
||||
*/
|
||||
private static Instant readTime(Meta meta) {
|
||||
if (meta.hasValue("start_time")) {
|
||||
return meta.getValue("start_time").timeValue();
|
||||
} else {
|
||||
return Instant.EPOCH;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The name of informational meta file in numass data directory
|
||||
*/
|
||||
public static final String META_FRAGMENT_NAME = "meta";
|
||||
|
||||
/**
|
||||
* The beginning of point fragment name
|
||||
*/
|
||||
public static final String POINT_FRAGMENT_NAME = "p";
|
||||
|
||||
/**
|
||||
* The beginning of hv fragment name
|
||||
*/
|
||||
public static final String HV_FRAGMENT_NAME = "voltage";
|
||||
private final Map<String, Supplier<Envelope>> itemsProvider;
|
||||
|
||||
private NumassDataLoader(Storage storage, String name, Meta annotation) {
|
||||
super(storage, name, annotation);
|
||||
itemsProvider = new HashMap<>();
|
||||
readOnly = true;
|
||||
}
|
||||
|
||||
private NumassDataLoader(Storage storage, String name, Meta meta, Map<String, Supplier<Envelope>> items) {
|
||||
super(storage, name, meta);
|
||||
this.itemsProvider = items;
|
||||
readOnly = true;
|
||||
}
|
||||
|
||||
private Map<String, Supplier<Envelope>> getItems() {
|
||||
return itemsProvider;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<String> fragmentNames() {
|
||||
return getItems().keySet();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Meta getMeta() {
|
||||
return getItems()
|
||||
.get(META_FRAGMENT_NAME)
|
||||
.get()
|
||||
.getMeta();
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<Table> getHvData() {
|
||||
return getHVEnvelope().map(hvEnvelope -> {
|
||||
try {
|
||||
return new ColumnedDataReader(hvEnvelope.getData().getStream(), "timestamp", "block", "value").toTable();
|
||||
} catch (IOException ex) {
|
||||
LoggerFactory.getLogger(getClass()).error("Failed to load HV data from file", ex);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
}
|
||||
|
||||
private Optional<Envelope> getHVEnvelope() {
|
||||
return Optional.ofNullable(getItems().get(HV_FRAGMENT_NAME)).map(Supplier::get);
|
||||
}
|
||||
|
||||
private Stream<Envelope> getPointEnvelopes() {
|
||||
return getItems().entrySet().stream()
|
||||
.filter(entry -> entry.getKey().startsWith(POINT_FRAGMENT_NAME) && entry.getValue() != null)
|
||||
.map(entry -> entry.getValue().get())
|
||||
.sorted(Comparator.comparing(t -> t.getMeta().getInt("external_meta.point_index", -1)));
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<NumassPoint> getPoints() {
|
||||
return getPointEnvelopes().map(ClassicNumassPoint::new);
|
||||
}
|
||||
|
||||
public boolean isReversed() {
|
||||
return this.getMeta().getBoolean("iteration_info.reverse", false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEmpty() {
|
||||
return getItems().isEmpty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Envelope pull(String header) {
|
||||
//PENDING read data to memory?
|
||||
return getItems().get(header).get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void push(String header, Envelope data) throws StorageException {
|
||||
tryPush();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Envelope respond(Envelope message) {
|
||||
throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
|
||||
}
|
||||
|
||||
@Override
|
||||
public Instant getStartTime() {
|
||||
return getMeta().optValue("start_time").map(Value::timeValue).orElseGet(() -> NumassSet.super.getStartTime());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getDescription() {
|
||||
return this.getMeta().getString("description", "").replace("\\n", "\n");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void open() throws Exception {
|
||||
|
||||
}
|
||||
|
||||
}
|
@ -1,216 +0,0 @@
|
||||
/*
|
||||
* Copyright 2015 Alexander Nozik.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package inr.numass.data.storage;
|
||||
|
||||
import hep.dataforge.context.Context;
|
||||
import hep.dataforge.events.Event;
|
||||
import hep.dataforge.events.EventBuilder;
|
||||
import hep.dataforge.exceptions.StorageException;
|
||||
import hep.dataforge.meta.Meta;
|
||||
import hep.dataforge.storage.filestorage.FileStorage;
|
||||
import inr.numass.data.api.NumassSet;
|
||||
import inr.numass.data.legacy.NumassDatFile;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.ByteChannel;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static java.nio.file.StandardOpenOption.CREATE;
|
||||
import static java.nio.file.StandardOpenOption.WRITE;
|
||||
|
||||
|
||||
/**
|
||||
* The file storage containing numass data directories or zips.
|
||||
* <p>
|
||||
* Any subdirectory is treated as numass data directory. Any zip must have
|
||||
* {@code NUMASS_ZIP_EXTENSION} extension to be recognized. Any other files are
|
||||
* ignored.
|
||||
* </p>
|
||||
*
|
||||
* @author Alexander Nozik
|
||||
*/
|
||||
public class NumassStorage extends FileStorage {
|
||||
|
||||
public static final String NUMASS_ZIP_EXTENSION = ".nm.zip";
|
||||
public static final String NUMASS_DATA_LOADER_TYPE = "numassData";
|
||||
|
||||
protected NumassStorage(FileStorage parent, Meta config, String shelf) throws StorageException {
|
||||
super(parent, config, shelf);
|
||||
super.refresh();
|
||||
}
|
||||
|
||||
public NumassStorage(Context context, Meta config, Path path) throws StorageException {
|
||||
super(context, config, path);
|
||||
super.refresh();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void updateDirectoryLoaders() {
|
||||
try {
|
||||
this.loaders.clear();
|
||||
Files.list(getDataDir()).forEach(file -> {
|
||||
try {
|
||||
if (Files.isDirectory(file)) {
|
||||
Path metaFile = file.resolve(NumassDataLoader.META_FRAGMENT_NAME);
|
||||
if (Files.exists(metaFile)) {
|
||||
this.loaders.put(entryName(file),
|
||||
NumassDataLoader.fromDir(this, file, null));
|
||||
} else {
|
||||
this.shelves.put(entryName(file),
|
||||
new NumassStorage(this, getMeta(), entryName(file)));
|
||||
}
|
||||
} else if (file.getFileName().endsWith(NUMASS_ZIP_EXTENSION)) {
|
||||
this.loaders.put(entryName(file), NumassDataLoader.fromFile(this, file));
|
||||
// } else if (file.getFileName().endsWith(".points")) {
|
||||
// try {
|
||||
// loaders.put(getFileName(file),
|
||||
// FilePointLoader.fromFile(this, file, true));
|
||||
// } catch (Exception ex) {
|
||||
// getLogger().error("Failed to builder numass point loader from file {}", file.getName());
|
||||
// }
|
||||
} else {
|
||||
//updating non-numass loader files
|
||||
updateFile(file);
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
LoggerFactory.getLogger(getClass()).error("Error while creating numass loader", ex);
|
||||
} catch (StorageException ex) {
|
||||
LoggerFactory.getLogger(getClass()).error("Error while creating numass group", ex);
|
||||
}
|
||||
});
|
||||
} catch (IOException ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
public void pushNumassData(String path, String fileName, ByteBuffer data) throws StorageException {
|
||||
if (path == null || path.isEmpty()) {
|
||||
pushNumassData(fileName, data);
|
||||
} else {
|
||||
NumassStorage st = (NumassStorage) buildShelf(path);
|
||||
st.pushNumassData(fileName, data);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Read nm.zip content and write it as a new nm.zip file
|
||||
*
|
||||
* @param fileName
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public void pushNumassData(String fileName, ByteBuffer data) throws StorageException {
|
||||
//FIXME move zip to internal
|
||||
try {
|
||||
Path nmFile = getDataDir().resolve(fileName + NUMASS_ZIP_EXTENSION);
|
||||
if (Files.exists(nmFile)) {
|
||||
LoggerFactory.getLogger(getClass()).warn("Trying to rewrite existing numass data file {}", nmFile.toString());
|
||||
}
|
||||
try (ByteChannel channel = Files.newByteChannel(nmFile, CREATE, WRITE)) {
|
||||
channel.write(data);
|
||||
}
|
||||
|
||||
dispatchEvent(NumassDataPointEvent.build(getName(), fileName, (int) Files.size(nmFile)));
|
||||
} catch (IOException ex) {
|
||||
throw new StorageException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public NumassStorage createShelf(Meta meta, String path) throws StorageException {
|
||||
return new NumassStorage(this, meta, path);
|
||||
}
|
||||
|
||||
/**
|
||||
* A list of legacy DAT files in the directory
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
public List<NumassSet> legacyFiles() {
|
||||
try {
|
||||
List<NumassSet> files = new ArrayList<>();
|
||||
Files.list(getDataDir()).forEach(file -> {
|
||||
if (Files.isRegularFile(file) && file.getFileName().toString().toLowerCase().endsWith(".dat")) {
|
||||
String name = file.getFileName().toString();
|
||||
try {
|
||||
files.add(new NumassDatFile(file, Meta.empty()));
|
||||
} catch (Exception ex) {
|
||||
LoggerFactory.getLogger(getClass()).error("Error while reading legacy numass file " + file.getFileName(), ex);
|
||||
}
|
||||
}
|
||||
});
|
||||
return files;
|
||||
} catch (IOException ex) {
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
public String getDescription() {
|
||||
return getMeta().getString("description", "");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws Exception {
|
||||
super.close();
|
||||
//close remote file system after use
|
||||
try {
|
||||
getDataDir().getFileSystem().close();
|
||||
} catch (UnsupportedOperationException ex) {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
public static class NumassDataPointEvent extends Event {
|
||||
|
||||
public static final String FILE_NAME_KEY = "fileName";
|
||||
public static final String FILE_SIZE_KEY = "fileSize";
|
||||
|
||||
public NumassDataPointEvent(Meta meta) {
|
||||
super(meta);
|
||||
}
|
||||
|
||||
public static NumassDataPointEvent build(String source, String fileName, int fileSize) {
|
||||
return new NumassDataPointEvent(builder(source, fileName, fileSize).buildEventMeta());
|
||||
}
|
||||
|
||||
public static EventBuilder builder(String source, String fileName, int fileSize) {
|
||||
return EventBuilder.make("numass.storage.pushData")
|
||||
.setSource(source)
|
||||
.setMetaValue(FILE_NAME_KEY, fileName)
|
||||
.setMetaValue(FILE_SIZE_KEY, fileSize);
|
||||
}
|
||||
|
||||
public int getFileSize() {
|
||||
return getMeta().getInt(FILE_SIZE_KEY, 0);
|
||||
}
|
||||
|
||||
public String getFileName() {
|
||||
return getMeta().getString(FILE_NAME_KEY);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.format("(%s) [%s] : pushed numass data file with name '%s' and size '%d'",
|
||||
time().toString(), sourceTag(), getFileName(), getFileSize());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
@ -1,83 +0,0 @@
|
||||
package inr.numass.data.storage;
|
||||
|
||||
import com.github.robtimus.filesystems.sftp.SFTPEnvironment;
|
||||
import hep.dataforge.context.Context;
|
||||
import hep.dataforge.meta.Meta;
|
||||
import hep.dataforge.meta.MetaBuilder;
|
||||
import hep.dataforge.storage.api.Storage;
|
||||
import hep.dataforge.storage.api.StorageType;
|
||||
import hep.dataforge.storage.commons.StorageManager;
|
||||
import hep.dataforge.storage.filestorage.FileStorage;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
|
||||
import java.net.URI;
|
||||
import java.nio.file.FileSystem;
|
||||
import java.nio.file.FileSystems;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
|
||||
/**
|
||||
* Created by darksnake on 17-May-17.
|
||||
*/
|
||||
public class NumassStorageFactory implements StorageType {
|
||||
|
||||
/**
|
||||
* Build local storage with Global context. Used for tests.
|
||||
*
|
||||
* @param file
|
||||
* @return
|
||||
*/
|
||||
@NotNull
|
||||
public static FileStorage buildLocal(Context context, Path file, boolean readOnly, boolean monitor) {
|
||||
StorageManager manager = context.load(StorageManager.class, Meta.empty());
|
||||
return (FileStorage) manager.buildStorage(buildStorageMeta(file.toUri(), readOnly, monitor));
|
||||
}
|
||||
|
||||
@NotNull
|
||||
public static FileStorage buildLocal(Context context, String path, boolean readOnly, boolean monitor) {
|
||||
Path file = context.getIo().getDataDir().resolve(path);
|
||||
return buildLocal(context, file, readOnly, monitor);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String type() {
|
||||
return "numass";
|
||||
}
|
||||
|
||||
@NotNull
|
||||
@Override
|
||||
public Storage build(Context context, Meta meta) {
|
||||
if (meta.hasValue("path")) {
|
||||
URI uri = URI.create(meta.getString("path"));
|
||||
Path path;
|
||||
if (uri.getScheme().startsWith("ssh")) {
|
||||
try {
|
||||
String username = meta.getString("userName", uri.getUserInfo());
|
||||
//String host = meta.getString("host", uri.getHost());
|
||||
int port = meta.getInt("port", 22);
|
||||
SFTPEnvironment env = new SFTPEnvironment()
|
||||
.withUsername(username)
|
||||
.withPassword(meta.getString("password", "").toCharArray());
|
||||
FileSystem fs = FileSystems.newFileSystem(uri, env, context.getClassLoader());
|
||||
path = fs.getPath(uri.getPath());
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
} else {
|
||||
path = Paths.get(uri);
|
||||
}
|
||||
return new NumassStorage(context, meta, path);
|
||||
} else {
|
||||
context.getLogger().warn("A storage path not provided. Creating default root storage in the working directory");
|
||||
return new NumassStorage(context, meta, context.getIo().getWorkDir());
|
||||
}
|
||||
}
|
||||
|
||||
public static MetaBuilder buildStorageMeta(URI path, boolean readOnly, boolean monitor) {
|
||||
return new MetaBuilder("storage")
|
||||
.setValue("path", path.toString())
|
||||
.setValue("type", "numass")
|
||||
.setValue("readOnly", readOnly)
|
||||
.setValue("monitor", monitor);
|
||||
}
|
||||
}
|
@ -1,110 +0,0 @@
|
||||
package inr.numass.data.storage;
|
||||
|
||||
import hep.dataforge.io.envelopes.Envelope;
|
||||
import hep.dataforge.meta.Meta;
|
||||
import inr.numass.data.NumassProto;
|
||||
import inr.numass.data.api.NumassBlock;
|
||||
import inr.numass.data.api.NumassEvent;
|
||||
import inr.numass.data.api.NumassFrame;
|
||||
import inr.numass.data.api.NumassPoint;
|
||||
import inr.numass.data.legacy.NumassFileEnvelope;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.file.Path;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.Comparator;
|
||||
import java.util.stream.IntStream;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
/**
|
||||
* Protobuf based numass point
|
||||
* Created by darksnake on 09.07.2017.
|
||||
*/
|
||||
public class ProtoNumassPoint implements NumassPoint {
|
||||
public static ProtoNumassPoint readFile(Path path) {
|
||||
return new ProtoNumassPoint(NumassFileEnvelope.open(path, true));
|
||||
}
|
||||
|
||||
|
||||
private final Envelope envelope;
|
||||
|
||||
public ProtoNumassPoint(Envelope envelope) {
|
||||
this.envelope = envelope;
|
||||
}
|
||||
|
||||
private NumassProto.Point getPoint() {
|
||||
try (InputStream stream = envelope.getData().getStream()) {
|
||||
return NumassProto.Point.parseFrom(stream);
|
||||
} catch (IOException ex) {
|
||||
throw new RuntimeException("Failed to read point via protobuf");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<NumassBlock> getBlocks() {
|
||||
return getPoint().getChannelsList().stream()
|
||||
.flatMap(channel ->
|
||||
channel.getBlocksList().stream()
|
||||
.map(block -> new ProtoBlock((int) channel.getNum(), block))
|
||||
.sorted(Comparator.comparing(ProtoBlock::getStartTime))
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Meta getMeta() {
|
||||
return envelope.getMeta();
|
||||
}
|
||||
|
||||
public static Instant ofEpochNanos(long nanos) {
|
||||
long seconds = Math.floorDiv(nanos, (int) 1e9);
|
||||
int reminder = (int) (nanos % 1e9);
|
||||
return Instant.ofEpochSecond(seconds, reminder);
|
||||
}
|
||||
|
||||
private class ProtoBlock implements NumassBlock {
|
||||
|
||||
final int channel;
|
||||
final NumassProto.Point.Channel.Block block;
|
||||
|
||||
private ProtoBlock(int channel, NumassProto.Point.Channel.Block block) {
|
||||
this.channel = channel;
|
||||
this.block = block;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Instant getStartTime() {
|
||||
return ofEpochNanos(block.getTime());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Duration getLength() {
|
||||
return Duration.ofNanos((long) (getMeta().getDouble("params.b_size") / getMeta().getDouble("params.sample_freq") * 1e9));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<NumassEvent> getEvents() {
|
||||
Instant blockTime = getStartTime();
|
||||
if (block.hasEvents()) {
|
||||
NumassProto.Point.Channel.Block.Events events = block.getEvents();
|
||||
return IntStream.range(0, events.getTimesCount()).mapToObj(i ->
|
||||
new NumassEvent((short) events.getAmplitudes(i), blockTime, events.getTimes(i))
|
||||
);
|
||||
} else {
|
||||
return Stream.empty();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<NumassFrame> getFrames() {
|
||||
Duration tickSize = Duration.ofNanos((long) (1e9 / getMeta().getInt("params.sample_freq")));
|
||||
return block.getFramesList().stream().map(frame -> {
|
||||
Instant time = getStartTime().plusNanos(frame.getTime());
|
||||
ByteBuffer data = frame.getData().asReadOnlyByteBuffer();
|
||||
return new NumassFrame(time, tickSize, data.asShortBuffer());
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
84
numass-core/src/main/kotlin/inr/numass/NumassEnvelopeType.kt
Normal file
84
numass-core/src/main/kotlin/inr/numass/NumassEnvelopeType.kt
Normal file
@ -0,0 +1,84 @@
|
||||
package inr.numass
|
||||
|
||||
import hep.dataforge.io.envelopes.*
|
||||
import hep.dataforge.values.Value
|
||||
import org.slf4j.LoggerFactory
|
||||
import java.io.IOException
|
||||
import java.nio.ByteBuffer
|
||||
import java.util.*
|
||||
|
||||
/**
|
||||
* An envelope type for legacy numass tags. Reads legacy tag and writes DF02 tags
|
||||
*/
|
||||
class NumassEnvelopeType : EnvelopeType {
|
||||
|
||||
override val code: Int = DefaultEnvelopeType.DEFAULT_ENVELOPE_TYPE
|
||||
|
||||
override val name: String = "numass"
|
||||
|
||||
override fun description(): String = "Numass legacy envelope"
|
||||
|
||||
override fun getReader(properties: Map<String, String>): EnvelopeReader {
|
||||
return NumassEnvelopeReader()
|
||||
}
|
||||
|
||||
override fun getWriter(properties: Map<String, String>): EnvelopeWriter {
|
||||
return DefaultEnvelopeWriter(this, MetaType.resolve(properties))
|
||||
}
|
||||
|
||||
class LegacyTag : EnvelopeTag() {
|
||||
override val startSequence: ByteArray
|
||||
get() = LEGACY_START_SEQUENCE
|
||||
|
||||
override val endSequence: ByteArray
|
||||
get() = LEGACY_END_SEQUENCE
|
||||
|
||||
/**
|
||||
* Get the length of tag in bytes. -1 means undefined size in case tag was modified
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
override val length: Int
|
||||
get() = 30
|
||||
|
||||
/**
|
||||
* Read leagscy version 1 tag without leading tag head
|
||||
*
|
||||
* @param buffer
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
override fun readHeader(buffer: ByteBuffer): Map<String, Value> {
|
||||
val res = HashMap<String, Value>()
|
||||
|
||||
val type = buffer.getInt(2)
|
||||
res[Envelope.TYPE_PROPERTY] = Value.of(type)
|
||||
|
||||
val metaTypeCode = buffer.getShort(10)
|
||||
val metaType = MetaType.resolve(metaTypeCode)
|
||||
|
||||
if (metaType != null) {
|
||||
res[Envelope.META_TYPE_PROPERTY] = Value.of(metaType.name)
|
||||
} else {
|
||||
LoggerFactory.getLogger(EnvelopeTag::class.java).warn("Could not resolve meta type. Using default")
|
||||
}
|
||||
|
||||
val metaLength = Integer.toUnsignedLong(buffer.getInt(14))
|
||||
res[Envelope.META_LENGTH_PROPERTY] = Value.of(metaLength)
|
||||
val dataLength = Integer.toUnsignedLong(buffer.getInt(22))
|
||||
res[Envelope.DATA_LENGTH_PROPERTY] = Value.of(dataLength)
|
||||
return res
|
||||
}
|
||||
}
|
||||
|
||||
private class NumassEnvelopeReader : DefaultEnvelopeReader() {
|
||||
override fun newTag(): EnvelopeTag {
|
||||
return LegacyTag()
|
||||
}
|
||||
}
|
||||
|
||||
companion object {
|
||||
val LEGACY_START_SEQUENCE = byteArrayOf('#'.toByte(), '!'.toByte())
|
||||
val LEGACY_END_SEQUENCE = byteArrayOf('!'.toByte(), '#'.toByte(), '\r'.toByte(), '\n'.toByte())
|
||||
}
|
||||
}
|
63
numass-core/src/main/kotlin/inr/numass/NumassProperties.kt
Normal file
63
numass-core/src/main/kotlin/inr/numass/NumassProperties.kt
Normal file
@ -0,0 +1,63 @@
|
||||
/*
|
||||
* To change this license header, choose License Headers in Project Properties.
|
||||
* To change this template file, choose Tools | Templates
|
||||
* and open the template in the editor.
|
||||
*/
|
||||
package inr.numass
|
||||
|
||||
import hep.dataforge.context.Global
|
||||
import java.io.File
|
||||
import java.io.FileInputStream
|
||||
import java.io.FileOutputStream
|
||||
import java.io.IOException
|
||||
import java.util.*
|
||||
|
||||
/**
|
||||
*
|
||||
* @author Alexander Nozik
|
||||
*/
|
||||
object NumassProperties {
|
||||
|
||||
private val numassPropertiesFile: File
|
||||
@Throws(IOException::class)
|
||||
get() {
|
||||
var file = File(Global.userDirectory, "numass")
|
||||
if (!file.exists()) {
|
||||
file.mkdirs()
|
||||
}
|
||||
file = File(file, "numass.cfg")
|
||||
if (!file.exists()) {
|
||||
file.createNewFile()
|
||||
}
|
||||
return file
|
||||
}
|
||||
|
||||
fun getNumassProperty(key: String): String? {
|
||||
try {
|
||||
val props = Properties()
|
||||
props.load(FileInputStream(numassPropertiesFile))
|
||||
return props.getProperty(key)
|
||||
} catch (ex: IOException) {
|
||||
return null
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Synchronized
|
||||
fun setNumassProperty(key: String, value: String?) {
|
||||
try {
|
||||
val props = Properties()
|
||||
val store = numassPropertiesFile
|
||||
props.load(FileInputStream(store))
|
||||
if (value == null) {
|
||||
props.remove(key)
|
||||
} else {
|
||||
props.setProperty(key, value)
|
||||
}
|
||||
props.store(FileOutputStream(store), "")
|
||||
} catch (ex: IOException) {
|
||||
Global.logger.error("Failed to save numass properties", ex)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
@ -0,0 +1,130 @@
|
||||
package inr.numass.data.storage
|
||||
|
||||
import hep.dataforge.io.envelopes.Envelope
|
||||
import hep.dataforge.meta.Meta
|
||||
import inr.numass.data.api.NumassBlock
|
||||
import inr.numass.data.api.NumassEvent
|
||||
import inr.numass.data.api.NumassFrame
|
||||
import inr.numass.data.api.NumassPoint
|
||||
import inr.numass.data.legacy.NumassFileEnvelope
|
||||
import org.slf4j.LoggerFactory
|
||||
import java.io.IOException
|
||||
import java.nio.ByteBuffer
|
||||
import java.nio.ByteOrder
|
||||
import java.nio.file.Path
|
||||
import java.time.Duration
|
||||
import java.time.Instant
|
||||
import java.util.stream.Stream
|
||||
import java.util.stream.StreamSupport
|
||||
|
||||
/**
|
||||
* Created by darksnake on 08.07.2017.
|
||||
*/
|
||||
class ClassicNumassPoint(private val envelope: Envelope) : NumassPoint {
|
||||
|
||||
override fun getBlocks(): Stream<NumassBlock> {
|
||||
// double u = envelope.meta().getDouble("external_meta.HV1_value", 0);
|
||||
val length: Long
|
||||
if (envelope.meta.hasValue("external_meta.acquisition_time")) {
|
||||
length = envelope.meta.getValue("external_meta.acquisition_time").longValue()
|
||||
} else {
|
||||
length = envelope.meta.getValue("acquisition_time").longValue()
|
||||
}
|
||||
return Stream.of(ClassicBlock(startTime, Duration.ofSeconds(length)))
|
||||
}
|
||||
|
||||
override fun getStartTime(): Instant {
|
||||
return if (meta.hasValue("start_time")) {
|
||||
meta.getValue("start_time").timeValue()
|
||||
} else {
|
||||
Instant.EPOCH
|
||||
}
|
||||
}
|
||||
|
||||
override fun getVoltage(): Double {
|
||||
return meta.getDouble("external_meta.HV1_value", 0.0)
|
||||
}
|
||||
|
||||
override fun getIndex(): Int {
|
||||
return meta.getInt("external_meta.point_index", -1)
|
||||
}
|
||||
|
||||
override fun getMeta(): Meta {
|
||||
return envelope.meta
|
||||
}
|
||||
|
||||
//TODO split blocks using meta
|
||||
private inner class ClassicBlock
|
||||
// private final long blockOffset;
|
||||
|
||||
(private val startTime: Instant, private val length: Duration)// this.blockOffset = blockOffset;
|
||||
: NumassBlock, Iterable<NumassEvent> {
|
||||
|
||||
override fun getStartTime(): Instant {
|
||||
return startTime
|
||||
}
|
||||
|
||||
override fun getLength(): Duration {
|
||||
return length
|
||||
}
|
||||
|
||||
override fun getEvents(): Stream<NumassEvent> {
|
||||
return StreamSupport.stream(this.spliterator(), false)
|
||||
}
|
||||
|
||||
override fun iterator(): Iterator<NumassEvent> {
|
||||
val timeCoef = envelope.meta.getDouble("time_coeff", 50.0)
|
||||
try {
|
||||
val buffer = ByteBuffer.allocate(7000)
|
||||
buffer.order(ByteOrder.LITTLE_ENDIAN)
|
||||
val channel = envelope.data.channel
|
||||
channel.read(buffer)
|
||||
buffer.flip()
|
||||
return object : Iterator<NumassEvent> {
|
||||
|
||||
override fun hasNext(): Boolean {
|
||||
try {
|
||||
if (buffer.hasRemaining()) {
|
||||
return true
|
||||
} else {
|
||||
buffer.flip()
|
||||
val num = channel.read(buffer)
|
||||
if (num > 0) {
|
||||
buffer.flip()
|
||||
return true
|
||||
} else {
|
||||
return false
|
||||
}
|
||||
}
|
||||
} catch (e: IOException) {
|
||||
LoggerFactory.getLogger(this@ClassicNumassPoint.javaClass).error("Unexpected IOException when reading block", e)
|
||||
return false
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
override fun next(): NumassEvent {
|
||||
val channel = java.lang.Short.toUnsignedInt(buffer.short).toShort()
|
||||
val time = Integer.toUnsignedLong(buffer.int)
|
||||
val status = buffer.get() // status is ignored
|
||||
return NumassEvent(channel, startTime, (time * timeCoef).toLong())
|
||||
}
|
||||
}
|
||||
} catch (ex: IOException) {
|
||||
throw RuntimeException(ex)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
override fun getFrames(): Stream<NumassFrame> {
|
||||
return Stream.empty()
|
||||
}
|
||||
}
|
||||
|
||||
companion object {
|
||||
fun readFile(path: Path): ClassicNumassPoint {
|
||||
return ClassicNumassPoint(NumassFileEnvelope.open(path, true))
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,30 @@
|
||||
package inr.numass.data.storage
|
||||
|
||||
import hep.dataforge.context.Context
|
||||
import hep.dataforge.data.DataFactory
|
||||
import hep.dataforge.data.DataTree
|
||||
import hep.dataforge.meta.Meta
|
||||
import hep.dataforge.storage.commons.StorageManager
|
||||
import hep.dataforge.storage.commons.StorageUtils
|
||||
import inr.numass.data.api.NumassSet
|
||||
|
||||
/**
|
||||
* Created by darksnake on 03-Feb-17.
|
||||
*/
|
||||
class NumassDataFactory : DataFactory<NumassSet>(NumassSet::class.java) {
|
||||
|
||||
override fun getName(): String {
|
||||
return "numass"
|
||||
}
|
||||
|
||||
|
||||
override fun fill(builder: DataTree.Builder<NumassSet>, context: Context, meta: Meta) {
|
||||
val newMeta = meta.builder.setValue("type", "numass")
|
||||
val storage = context.load(StorageManager::class.java, Meta.empty()).buildStorage(newMeta)
|
||||
StorageUtils.loaderStream(storage).forEach { loader ->
|
||||
if (loader is NumassSet) {
|
||||
builder.putStatic(loader.fullName.toUnescaped(), loader as NumassSet)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,204 @@
|
||||
/*
|
||||
* Copyright 2015 Alexander Nozik.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package inr.numass.data.storage
|
||||
|
||||
import hep.dataforge.exceptions.StorageException
|
||||
import hep.dataforge.io.ColumnedDataReader
|
||||
import hep.dataforge.io.envelopes.Envelope
|
||||
import hep.dataforge.meta.Meta
|
||||
import hep.dataforge.meta.MetaBuilder
|
||||
import hep.dataforge.providers.Provider
|
||||
import hep.dataforge.storage.api.ObjectLoader
|
||||
import hep.dataforge.storage.api.Storage
|
||||
import hep.dataforge.storage.filestorage.FileStorage
|
||||
import hep.dataforge.storage.loaders.AbstractLoader
|
||||
import hep.dataforge.tables.Table
|
||||
import inr.numass.data.api.NumassPoint
|
||||
import inr.numass.data.api.NumassSet
|
||||
import inr.numass.data.legacy.NumassFileEnvelope
|
||||
import org.slf4j.LoggerFactory
|
||||
|
||||
import java.io.IOException
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import java.util.function.Supplier
|
||||
import java.util.stream.Stream
|
||||
|
||||
|
||||
/**
|
||||
* The reader for numass main detector data directory or zip format;
|
||||
*
|
||||
* @author darksnake
|
||||
*/
|
||||
class NumassDataLoader(
|
||||
storage: Storage,
|
||||
name: String,
|
||||
meta: Meta,
|
||||
private val items: Map<String, Supplier<out Envelope>>,
|
||||
override var isReadOnly: Boolean = true
|
||||
) : AbstractLoader(storage, name, meta), ObjectLoader<Envelope>, NumassSet, Provider {
|
||||
|
||||
private val hvEnvelope: Optional<Envelope>
|
||||
get() = Optional.ofNullable(items[HV_FRAGMENT_NAME]).map { it.get() }
|
||||
|
||||
private val pointEnvelopes: Stream<Envelope>
|
||||
get() = items.entries.stream()
|
||||
.filter { entry -> entry.key.startsWith(POINT_FRAGMENT_NAME) }
|
||||
.map { entry -> entry.value.get() }
|
||||
.sorted(Comparator.comparing<Envelope, Int> { t -> t.meta.getInt("external_meta.point_index", -1) })
|
||||
|
||||
val isReversed: Boolean
|
||||
get() = this.meta.getBoolean("iteration_info.reverse", false)
|
||||
|
||||
override val isEmpty: Boolean
|
||||
get() = items.isEmpty()
|
||||
|
||||
override val description: String = this.meta.getString("description", "").replace("\\n", "\n")
|
||||
|
||||
override fun fragmentNames(): Collection<String> {
|
||||
return items.keys
|
||||
}
|
||||
|
||||
override fun getMeta(): Meta {
|
||||
return items[META_FRAGMENT_NAME]?.get()?.meta ?: Meta.empty()
|
||||
|
||||
}
|
||||
|
||||
override fun getHvData(): Optional<Table> {
|
||||
return hvEnvelope.map { hvEnvelope ->
|
||||
try {
|
||||
ColumnedDataReader(hvEnvelope.data.stream, "timestamp", "block", "value").toTable()
|
||||
} catch (ex: IOException) {
|
||||
LoggerFactory.getLogger(javaClass).error("Failed to load HV data from file", ex)
|
||||
null
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
override fun getPoints(): Stream<NumassPoint> {
|
||||
return pointEnvelopes.map { ClassicNumassPoint(it) }
|
||||
}
|
||||
|
||||
override fun pull(fragmentName: String): Envelope {
|
||||
//PENDING read data to memory?
|
||||
return items[fragmentName]?.get()
|
||||
?: throw StorageException("The fragment with name $fragmentName is not found in the loader $name")
|
||||
}
|
||||
|
||||
@Throws(StorageException::class)
|
||||
override fun push(fragmentName: String, data: Envelope) {
|
||||
tryPush()
|
||||
TODO()
|
||||
}
|
||||
|
||||
override fun respond(message: Envelope): Envelope {
|
||||
throw TODO("Not supported yet.")
|
||||
}
|
||||
|
||||
override fun getStartTime(): Instant {
|
||||
return meta.optValue("start_time").map<Instant> { it.timeValue() }.orElseGet { super.getStartTime() }
|
||||
}
|
||||
|
||||
override val isOpen: Boolean
|
||||
get() = true
|
||||
|
||||
override fun close() {
|
||||
//do nothing
|
||||
}
|
||||
|
||||
|
||||
companion object {
|
||||
|
||||
|
||||
@Throws(IOException::class)
|
||||
fun fromFile(storage: Storage, zipFile: Path): NumassDataLoader {
|
||||
throw UnsupportedOperationException("TODO")
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Construct numass loader from directory
|
||||
*
|
||||
* @param storage
|
||||
* @param directory
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
@Throws(IOException::class)
|
||||
fun fromDir(storage: Storage, directory: Path, name: String = FileStorage.entryName(directory)): NumassDataLoader {
|
||||
if (!Files.isDirectory(directory)) {
|
||||
throw IllegalArgumentException("Numass data directory required")
|
||||
}
|
||||
val annotation = MetaBuilder("loader")
|
||||
.putValue("type", "numass")
|
||||
.putValue("numass.loaderFormat", "dir")
|
||||
// .setValue("file.timeCreated", Instant.ofEpochMilli(directory.getContent().getLastModifiedTime()))
|
||||
.build()
|
||||
|
||||
//FIXME envelopes are lazy do we need to do additional lazy evaluations here?
|
||||
val items = LinkedHashMap<String, Supplier<out Envelope>>()
|
||||
|
||||
Files.list(directory).filter { file ->
|
||||
val fileName = file.fileName.toString()
|
||||
(fileName == META_FRAGMENT_NAME
|
||||
|| fileName == HV_FRAGMENT_NAME
|
||||
|| fileName.startsWith(POINT_FRAGMENT_NAME))
|
||||
}.forEach { file ->
|
||||
try {
|
||||
items[FileStorage.entryName(file)] = Supplier{ NumassFileEnvelope.open(file, true) }
|
||||
} catch (ex: Exception) {
|
||||
LoggerFactory.getLogger(NumassDataLoader::class.java)
|
||||
.error("Can't load numass data directory " + FileStorage.entryName(directory), ex)
|
||||
}
|
||||
}
|
||||
|
||||
return NumassDataLoader(storage, name, annotation, items)
|
||||
}
|
||||
|
||||
/**
|
||||
* "start_time": "2016-04-20T04:08:50",
|
||||
*
|
||||
* @param meta
|
||||
* @return
|
||||
*/
|
||||
private fun readTime(meta: Meta): Instant {
|
||||
return if (meta.hasValue("start_time")) {
|
||||
meta.getValue("start_time").timeValue()
|
||||
} else {
|
||||
Instant.EPOCH
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The name of informational meta file in numass data directory
|
||||
*/
|
||||
val META_FRAGMENT_NAME = "meta"
|
||||
|
||||
/**
|
||||
* The beginning of point fragment name
|
||||
*/
|
||||
val POINT_FRAGMENT_NAME = "p"
|
||||
|
||||
/**
|
||||
* The beginning of hv fragment name
|
||||
*/
|
||||
val HV_FRAGMENT_NAME = "voltage"
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,203 @@
|
||||
/*
|
||||
* Copyright 2015 Alexander Nozik.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package inr.numass.data.storage
|
||||
|
||||
import hep.dataforge.context.Context
|
||||
import hep.dataforge.events.Event
|
||||
import hep.dataforge.events.EventBuilder
|
||||
import hep.dataforge.exceptions.StorageException
|
||||
import hep.dataforge.meta.Meta
|
||||
import hep.dataforge.storage.filestorage.FileStorage
|
||||
import inr.numass.data.api.NumassSet
|
||||
import inr.numass.data.legacy.NumassDatFile
|
||||
import org.slf4j.LoggerFactory
|
||||
import java.io.IOException
|
||||
import java.nio.ByteBuffer
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.StandardOpenOption.CREATE
|
||||
import java.nio.file.StandardOpenOption.WRITE
|
||||
import java.util.*
|
||||
|
||||
|
||||
/**
|
||||
* The file storage containing numass data directories or zips.
|
||||
*
|
||||
*
|
||||
* Any subdirectory is treated as numass data directory. Any zip must have
|
||||
* `NUMASS_ZIP_EXTENSION` extension to be recognized. Any other files are
|
||||
* ignored.
|
||||
*
|
||||
*
|
||||
* @author Alexander Nozik
|
||||
*/
|
||||
class NumassStorage : FileStorage {
|
||||
|
||||
val description: String
|
||||
get() = meta.getString("description", "")
|
||||
|
||||
@Throws(StorageException::class)
|
||||
protected constructor(parent: FileStorage, config: Meta, shelf: String) : super(parent, config, shelf) {
|
||||
super.refresh()
|
||||
}
|
||||
|
||||
@Throws(StorageException::class)
|
||||
constructor(context: Context, config: Meta, path: Path) : super(context, config, path) {
|
||||
super.refresh()
|
||||
}
|
||||
|
||||
override fun updateDirectoryLoaders() {
|
||||
try {
|
||||
this.loaders.clear()
|
||||
Files.list(dataDir).forEach { file ->
|
||||
try {
|
||||
if (Files.isDirectory(file)) {
|
||||
val metaFile = file.resolve(NumassDataLoader.META_FRAGMENT_NAME)
|
||||
if (Files.exists(metaFile)) {
|
||||
this.loaders[entryName(file)] = NumassDataLoader.fromDir(this, file)
|
||||
} else {
|
||||
this.shelves[entryName(file)] = NumassStorage(this, meta, entryName(file))
|
||||
}
|
||||
} else if (file.fileName.endsWith(NUMASS_ZIP_EXTENSION)) {
|
||||
this.loaders[entryName(file)] = NumassDataLoader.fromFile(this, file)
|
||||
} else {
|
||||
//updating non-numass loader files
|
||||
updateFile(file)
|
||||
}
|
||||
} catch (ex: IOException) {
|
||||
LoggerFactory.getLogger(javaClass).error("Error while creating numass loader", ex)
|
||||
} catch (ex: StorageException) {
|
||||
LoggerFactory.getLogger(javaClass).error("Error while creating numass group", ex)
|
||||
}
|
||||
}
|
||||
} catch (ex: IOException) {
|
||||
throw RuntimeException(ex)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Throws(StorageException::class)
|
||||
fun pushNumassData(path: String?, fileName: String, data: ByteBuffer) {
|
||||
if (path == null || path.isEmpty()) {
|
||||
pushNumassData(fileName, data)
|
||||
} else {
|
||||
val st = buildShelf(path) as NumassStorage
|
||||
st.pushNumassData(fileName, data)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Read nm.zip content and write it as a new nm.zip file
|
||||
*
|
||||
* @param fileName
|
||||
*/
|
||||
@Throws(StorageException::class)
|
||||
fun pushNumassData(fileName: String, data: ByteBuffer) {
|
||||
//FIXME move zip to internal
|
||||
try {
|
||||
val nmFile = dataDir.resolve(fileName + NUMASS_ZIP_EXTENSION)
|
||||
if (Files.exists(nmFile)) {
|
||||
LoggerFactory.getLogger(javaClass).warn("Trying to rewrite existing numass data file {}", nmFile.toString())
|
||||
}
|
||||
Files.newByteChannel(nmFile, CREATE, WRITE).use { channel -> channel.write(data) }
|
||||
|
||||
dispatchEvent(NumassDataPointEvent.build(name, fileName, Files.size(nmFile).toInt()))
|
||||
} catch (ex: IOException) {
|
||||
throw StorageException(ex)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Throws(StorageException::class)
|
||||
override fun createShelf(meta: Meta, path: String): NumassStorage {
|
||||
return NumassStorage(this, meta, path)
|
||||
}
|
||||
|
||||
/**
|
||||
* A list of legacy DAT files in the directory
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
fun legacyFiles(): List<NumassSet> {
|
||||
try {
|
||||
val files = ArrayList<NumassSet>()
|
||||
Files.list(dataDir).forEach { file ->
|
||||
if (Files.isRegularFile(file) && file.fileName.toString().toLowerCase().endsWith(".dat")) {
|
||||
val name = file.fileName.toString()
|
||||
try {
|
||||
files.add(NumassDatFile(file, Meta.empty()))
|
||||
} catch (ex: Exception) {
|
||||
LoggerFactory.getLogger(javaClass).error("Error while reading legacy numass file " + file.fileName, ex)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
return files
|
||||
} catch (ex: IOException) {
|
||||
throw RuntimeException(ex)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Throws(Exception::class)
|
||||
override fun close() {
|
||||
super.close()
|
||||
//close remote file system after use
|
||||
try {
|
||||
dataDir.fileSystem.close()
|
||||
} catch (ex: UnsupportedOperationException) {
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class NumassDataPointEvent(meta: Meta) : Event(meta) {
|
||||
|
||||
val fileSize: Int = meta.getInt(FILE_SIZE_KEY, 0)
|
||||
|
||||
val fileName: String = meta.getString(FILE_NAME_KEY)
|
||||
|
||||
override fun toString(): String {
|
||||
return String.format("(%s) [%s] : pushed numass data file with name '%s' and size '%d'",
|
||||
time().toString(), sourceTag(), fileName, fileSize)
|
||||
}
|
||||
|
||||
companion object {
|
||||
|
||||
val FILE_NAME_KEY = "fileName"
|
||||
val FILE_SIZE_KEY = "fileSize"
|
||||
|
||||
fun build(source: String, fileName: String, fileSize: Int): NumassDataPointEvent {
|
||||
return NumassDataPointEvent(builder(source, fileName, fileSize).buildEventMeta())
|
||||
}
|
||||
|
||||
fun builder(source: String, fileName: String, fileSize: Int): EventBuilder<*> {
|
||||
return EventBuilder.make("numass.storage.pushData")
|
||||
.setSource(source)
|
||||
.setMetaValue(FILE_NAME_KEY, fileName)
|
||||
.setMetaValue(FILE_SIZE_KEY, fileSize)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
companion object {
|
||||
|
||||
val NUMASS_ZIP_EXTENSION = ".nm.zip"
|
||||
val NUMASS_DATA_LOADER_TYPE = "numassData"
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,79 @@
|
||||
package inr.numass.data.storage
|
||||
|
||||
import com.github.robtimus.filesystems.sftp.SFTPEnvironment
|
||||
import hep.dataforge.context.Context
|
||||
import hep.dataforge.meta.Meta
|
||||
import hep.dataforge.meta.MetaBuilder
|
||||
import hep.dataforge.storage.api.Storage
|
||||
import hep.dataforge.storage.api.StorageType
|
||||
import hep.dataforge.storage.commons.StorageManager
|
||||
import hep.dataforge.storage.filestorage.FileStorage
|
||||
import java.net.URI
|
||||
import java.nio.file.FileSystems
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.Paths
|
||||
|
||||
/**
|
||||
* Created by darksnake on 17-May-17.
|
||||
*/
|
||||
class NumassStorageFactory : StorageType {
|
||||
|
||||
override fun type(): String {
|
||||
return "numass"
|
||||
}
|
||||
|
||||
override fun build(context: Context, meta: Meta): Storage {
|
||||
if (meta.hasValue("path")) {
|
||||
val uri = URI.create(meta.getString("path"))
|
||||
val path: Path
|
||||
if (uri.scheme.startsWith("ssh")) {
|
||||
try {
|
||||
val username = meta.getString("userName", uri.userInfo)
|
||||
//String host = meta.getString("host", uri.getHost());
|
||||
val port = meta.getInt("port", 22)
|
||||
val env = SFTPEnvironment()
|
||||
.withUsername(username)
|
||||
.withPassword(meta.getString("password", "").toCharArray())
|
||||
val fs = FileSystems.newFileSystem(uri, env, context.classLoader)
|
||||
path = fs.getPath(uri.path)
|
||||
} catch (e: Exception) {
|
||||
throw RuntimeException(e)
|
||||
}
|
||||
|
||||
} else {
|
||||
path = Paths.get(uri)
|
||||
}
|
||||
return NumassStorage(context, meta, path)
|
||||
} else {
|
||||
context.logger.warn("A storage path not provided. Creating default root storage in the working directory")
|
||||
return NumassStorage(context, meta, context.io.workDir)
|
||||
}
|
||||
}
|
||||
|
||||
companion object {
|
||||
|
||||
/**
|
||||
* Build local storage with Global context. Used for tests.
|
||||
*
|
||||
* @param file
|
||||
* @return
|
||||
*/
|
||||
fun buildLocal(context: Context, file: Path, readOnly: Boolean, monitor: Boolean): FileStorage {
|
||||
val manager = context.load(StorageManager::class.java, Meta.empty())
|
||||
return manager.buildStorage(buildStorageMeta(file.toUri(), readOnly, monitor)) as FileStorage
|
||||
}
|
||||
|
||||
fun buildLocal(context: Context, path: String, readOnly: Boolean, monitor: Boolean): FileStorage {
|
||||
val file = context.io.dataDir.resolve(path)
|
||||
return buildLocal(context, file, readOnly, monitor)
|
||||
}
|
||||
|
||||
fun buildStorageMeta(path: URI, readOnly: Boolean, monitor: Boolean): MetaBuilder {
|
||||
return MetaBuilder("storage")
|
||||
.setValue("path", path.toString())
|
||||
.setValue("type", "numass")
|
||||
.setValue("readOnly", readOnly)
|
||||
.setValue("monitor", monitor)
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,86 @@
|
||||
package inr.numass.data.storage
|
||||
|
||||
import hep.dataforge.io.envelopes.Envelope
|
||||
import hep.dataforge.meta.Meta
|
||||
import inr.numass.data.NumassProto
|
||||
import inr.numass.data.api.NumassBlock
|
||||
import inr.numass.data.api.NumassEvent
|
||||
import inr.numass.data.api.NumassFrame
|
||||
import inr.numass.data.api.NumassPoint
|
||||
import inr.numass.data.legacy.NumassFileEnvelope
|
||||
import java.io.IOException
|
||||
import java.nio.file.Path
|
||||
import java.time.Duration
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import java.util.stream.IntStream
|
||||
import java.util.stream.Stream
|
||||
|
||||
/**
|
||||
* Protobuf based numass point
|
||||
* Created by darksnake on 09.07.2017.
|
||||
*/
|
||||
class ProtoNumassPoint(private val envelope: Envelope) : NumassPoint {
|
||||
|
||||
private val point: NumassProto.Point
|
||||
get() = try {
|
||||
envelope.data.stream.use { stream -> return NumassProto.Point.parseFrom(stream) }
|
||||
} catch (ex: IOException) {
|
||||
throw RuntimeException("Failed to read point via protobuf")
|
||||
}
|
||||
|
||||
override fun getBlocks(): Stream<NumassBlock> {
|
||||
return point.channelsList.stream()
|
||||
.flatMap { channel ->
|
||||
channel.blocksList.stream()
|
||||
.map { block -> ProtoBlock(channel.num.toInt(), block) }
|
||||
.sorted(Comparator.comparing<ProtoBlock, Instant> { it.startTime })
|
||||
}
|
||||
}
|
||||
|
||||
override fun getMeta(): Meta {
|
||||
return envelope.meta
|
||||
}
|
||||
|
||||
private inner class ProtoBlock(internal val channel: Int, internal val block: NumassProto.Point.Channel.Block) : NumassBlock {
|
||||
|
||||
override fun getStartTime(): Instant {
|
||||
return ofEpochNanos(block.time)
|
||||
}
|
||||
|
||||
override fun getLength(): Duration {
|
||||
return Duration.ofNanos((meta.getDouble("params.b_size") / meta.getDouble("params.sample_freq") * 1e9).toLong())
|
||||
}
|
||||
|
||||
override fun getEvents(): Stream<NumassEvent> {
|
||||
val blockTime = startTime
|
||||
if (block.hasEvents()) {
|
||||
val events = block.events
|
||||
return IntStream.range(0, events.timesCount).mapToObj { i -> NumassEvent(events.getAmplitudes(i).toShort(), blockTime, events.getTimes(i)) }
|
||||
} else {
|
||||
return Stream.empty()
|
||||
}
|
||||
}
|
||||
|
||||
override fun getFrames(): Stream<NumassFrame> {
|
||||
val tickSize = Duration.ofNanos((1e9 / meta.getInt("params.sample_freq")).toLong())
|
||||
return block.framesList.stream().map { frame ->
|
||||
val time = startTime.plusNanos(frame.time)
|
||||
val data = frame.data.asReadOnlyByteBuffer()
|
||||
NumassFrame(time, tickSize, data.asShortBuffer())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
companion object {
|
||||
fun readFile(path: Path): ProtoNumassPoint {
|
||||
return ProtoNumassPoint(NumassFileEnvelope.open(path, true))
|
||||
}
|
||||
|
||||
fun ofEpochNanos(nanos: Long): Instant {
|
||||
val seconds = Math.floorDiv(nanos, 1e9.toInt().toLong())
|
||||
val reminder = (nanos % 1e9).toInt()
|
||||
return Instant.ofEpochSecond(seconds, reminder.toLong())
|
||||
}
|
||||
}
|
||||
}
|
@ -41,7 +41,7 @@ class TransformDataAction : OneToOneAction<Table, Table>() {
|
||||
|
||||
meta.optMeta("corrections").ifPresent { cors ->
|
||||
MetaUtils.nodeStream(cors)
|
||||
.map<Meta> { it.value }
|
||||
.map<Meta> { it.second }
|
||||
.map<Correction> { this.makeCorrection(it) }
|
||||
.forEach { corrections.add(it) }
|
||||
}
|
||||
|
@ -58,10 +58,6 @@ class NumassTransmission(context: Context, meta: Meta) : AbstractParametricBiFun
|
||||
return LossCalculator.instance().getLossProbability(0, getX(eIn, set))
|
||||
}
|
||||
|
||||
private fun getTrapFunction(context: Context, meta: Meta): BivariateFunction {
|
||||
return FunctionLibrary.buildFrom(context).buildBivariateFunction(meta)
|
||||
}
|
||||
|
||||
override fun derivValue(parName: String, eIn: Double, eOut: Double, set: Values): Double {
|
||||
return when (parName) {
|
||||
"trap" -> trapFunc.value(eIn, eOut)
|
||||
|
@ -59,7 +59,7 @@ private fun createSummaryNode(storage: Storage): MetaBuilder {
|
||||
fun calculateStatistics(summary: Meta, hv: Double): Meta {
|
||||
var totalLength = 0.0
|
||||
var totalCount = 0L
|
||||
MetaUtils.nodeStream(summary).map { it.value }.filter { it.name == "point" && it.getDouble("hv") == hv }.forEach {
|
||||
MetaUtils.nodeStream(summary).map { it.second }.filter { it.name == "point" && it.getDouble("hv") == hv }.forEach {
|
||||
totalCount += it.getInt("count")
|
||||
totalLength += it.getDouble("length")
|
||||
}
|
||||
|
@ -63,7 +63,7 @@ private fun oldModel(context: Context, meta: Meta): ParametricFunction {
|
||||
val A = meta.getDouble("resolution", meta.getDouble("resolution.width", 8.3e-5))//8.3e-5
|
||||
val from = meta.getDouble("from", 13900.0)
|
||||
val to = meta.getDouble("to", 18700.0)
|
||||
context.chronicle.report("Setting up tritium model with real transmission function")
|
||||
context.history.report("Setting up tritium model with real transmission function")
|
||||
|
||||
val resolutionTail: BivariateFunction = if (meta.hasValue("resolution.tailAlpha")) {
|
||||
ResolutionFunction.getAngledTail(meta.getDouble("resolution.tailAlpha"), meta.getDouble("resolution.tailBeta", 0.0))
|
||||
@ -73,7 +73,7 @@ private fun oldModel(context: Context, meta: Meta): ParametricFunction {
|
||||
//RangedNamedSetSpectrum beta = new BetaSpectrum(context.io().getFile("FS.txt"));
|
||||
val sp = ModularSpectrum(BetaSpectrum(), ResolutionFunction(A, resolutionTail), from, to)
|
||||
if (meta.getBoolean("caching", false)) {
|
||||
context.chronicle.report("Caching turned on")
|
||||
context.history.report("Caching turned on")
|
||||
sp.setCaching(true)
|
||||
}
|
||||
//Adding trapping energy dependence
|
||||
|
@ -6,7 +6,6 @@
|
||||
package inr.numass.server;
|
||||
|
||||
import hep.dataforge.storage.api.StateLoader;
|
||||
import hep.dataforge.values.Value;
|
||||
|
||||
/**
|
||||
*
|
||||
@ -17,10 +16,9 @@ public class HandlerUtils {
|
||||
public static String renderStates(StateLoader states) {
|
||||
StringBuilder b = new StringBuilder();
|
||||
b.append("<div class=\"shifted\">\n");
|
||||
for (String state : states.getStateNames()) {
|
||||
Value val = states.getValue(state);
|
||||
states.getValueStream().forEach(pair->{
|
||||
String color;
|
||||
switch (val.getType()) {
|
||||
switch (pair.getSecond().getType()) {
|
||||
case NUMBER:
|
||||
color = "blue";
|
||||
break;
|
||||
@ -35,8 +33,8 @@ public class HandlerUtils {
|
||||
|
||||
}
|
||||
b.append(String.format("<p> <strong>%s</strong> : <font color= \"%s\">%s</font> </p>%n",
|
||||
state, color, val.stringValue()));
|
||||
}
|
||||
pair.getFirst(), color, pair.getSecond().stringValue()));
|
||||
});
|
||||
b.append("</div>\n");
|
||||
return b.toString();
|
||||
}
|
||||
|
@ -1,4 +1,4 @@
|
||||
/*
|
||||
/*
|
||||
* Copyright 2015 Alexander Nozik.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
@ -27,13 +27,17 @@ import hep.dataforge.storage.api.Storage;
|
||||
import hep.dataforge.storage.commons.LoaderFactory;
|
||||
import hep.dataforge.values.Value;
|
||||
import inr.numass.data.storage.NumassStorage;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.Instant;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static hep.dataforge.io.messages.MessagesKt.errorResponseBase;
|
||||
import static hep.dataforge.io.messages.MessagesKt.okResponseBase;
|
||||
import static inr.numass.server.NumassServerUtils.getNotes;
|
||||
|
||||
/**
|
||||
@ -59,7 +63,6 @@ public class NumassRun implements Metoid, Responder {
|
||||
private final StateLoader states;
|
||||
|
||||
private final ObjectLoader<NumassNote> noteLoader;
|
||||
private final MessageFactory factory;
|
||||
|
||||
private final Logger logger;
|
||||
|
||||
@ -67,11 +70,10 @@ public class NumassRun implements Metoid, Responder {
|
||||
// * A set with inverted order of elements (last note first)
|
||||
// */
|
||||
// private final Set<NumassNote> notes = new TreeSet<>((NumassNote o1, NumassNote o2) -> -o1.time().compareTo(o2.time()));
|
||||
public NumassRun(String path, Storage workStorage, MessageFactory factory) throws StorageException {
|
||||
public NumassRun(String path, Storage workStorage) throws StorageException {
|
||||
this.storage = workStorage;
|
||||
this.states = LoaderFactory.buildStateLoder(storage, RUN_STATE, null);
|
||||
this.noteLoader = LoaderFactory.buildObjectLoder(storage, RUN_NOTES, null);
|
||||
this.factory = factory;
|
||||
this.states = LoaderFactory.buildStateLoder(storage, RUN_STATE, "");
|
||||
this.noteLoader = (ObjectLoader<NumassNote>) LoaderFactory.buildObjectLoder(storage, RUN_NOTES, "");
|
||||
this.runPath = path;
|
||||
logger = LoggerFactory.getLogger("CURRENT_RUN");
|
||||
}
|
||||
@ -141,15 +143,15 @@ public class NumassRun implements Metoid, Responder {
|
||||
} else {
|
||||
addNote(NumassNote.buildFrom(message.getMeta()));
|
||||
}
|
||||
return factory.okResponseBase(message, false, false).build();
|
||||
return okResponseBase(message, false, false).build();
|
||||
} catch (Exception ex) {
|
||||
logger.error("Failed to push note", ex);
|
||||
return factory.errorResponseBase(message, ex).build();
|
||||
return errorResponseBase(message, ex).build();
|
||||
}
|
||||
}
|
||||
|
||||
private Envelope pullNotes(Envelope message) {
|
||||
EnvelopeBuilder envelope = factory.okResponseBase(message, true, false);
|
||||
EnvelopeBuilder envelope = okResponseBase(message, true, false);
|
||||
int limit = message.getMeta().getInt("limit", -1);
|
||||
//TODO add time window and search conditions here
|
||||
Stream<NumassNote> stream = getNotes(noteLoader);
|
||||
@ -165,17 +167,17 @@ public class NumassRun implements Metoid, Responder {
|
||||
try {
|
||||
String filePath = message.getMeta().getString("path", "");
|
||||
String fileName = message.getMeta().getString("name")
|
||||
.replace(NumassStorage.NUMASS_ZIP_EXTENSION, "");// removing .nm.zip if it is present
|
||||
.replace(NumassStorage.Companion.getNUMASS_ZIP_EXTENSION(), "");// removing .nm.zip if it is present
|
||||
if (storage instanceof NumassStorage) {
|
||||
((NumassStorage) storage).pushNumassData(filePath, fileName, message.getData().getBuffer());
|
||||
} else {
|
||||
throw new StorageException("Storage does not support numass point push");
|
||||
}
|
||||
//TODO add checksum here
|
||||
return factory.okResponseBase("numass.data.push.response", false, false).build();
|
||||
return okResponseBase("numass.data.push.response", false, false).build();
|
||||
} catch (StorageException | IOException ex) {
|
||||
logger.error("Failed to push point", ex);
|
||||
return factory.errorResponseBase("numass.data.push.response", ex).build();
|
||||
return errorResponseBase("numass.data.push.response", ex).build();
|
||||
}
|
||||
}
|
||||
|
||||
@ -196,4 +198,9 @@ public class NumassRun implements Metoid, Responder {
|
||||
return states;
|
||||
}
|
||||
|
||||
@NotNull
|
||||
@Override
|
||||
public CompletableFuture<Envelope> respondInFuture(@NotNull Envelope message) {
|
||||
return CompletableFuture.supplyAsync(() -> respond(message));
|
||||
}
|
||||
}
|
||||
|
@ -34,6 +34,9 @@ import ratpack.server.RatpackServer;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static hep.dataforge.io.messages.MessagesKt.errorResponseBase;
|
||||
import static hep.dataforge.io.messages.MessagesKt.responseBase;
|
||||
|
||||
/**
|
||||
* @author darksnake
|
||||
*/
|
||||
@ -67,7 +70,7 @@ public class NumassServer extends AbstractNetworkListener implements ContextAwar
|
||||
new StorageManager().startGlobal();
|
||||
this.root = storage;
|
||||
try {
|
||||
rootState = LoaderFactory.buildStateLoder(storage, "@numass", null);
|
||||
rootState = LoaderFactory.buildStateLoder(storage, "@numass", "");
|
||||
updateRun();
|
||||
} catch (StorageException ex) {
|
||||
throw new RuntimeException(ex);
|
||||
@ -99,8 +102,8 @@ public class NumassServer extends AbstractNetworkListener implements ContextAwar
|
||||
|
||||
private void startRun(Meta meta) throws StorageException {
|
||||
String path = meta.getString("path", DEFAULT_RUN_PATH);
|
||||
Storage storage = StorageUtils.getOrBuildShelf(root, path, meta);
|
||||
run = new NumassRun(path, storage, getResponseFactory());
|
||||
Storage storage = StorageUtils.INSTANCE.getOrBuildShelf(root, path, meta);
|
||||
run = new NumassRun(path, storage);
|
||||
getRootState().push("numass.current.run", path);
|
||||
}
|
||||
|
||||
@ -125,8 +128,7 @@ public class NumassServer extends AbstractNetworkListener implements ContextAwar
|
||||
case "numass.run.state":
|
||||
return getRun().respond(message);
|
||||
case "numass.control":
|
||||
return getResponseFactory()
|
||||
.errorResponseBase(message,
|
||||
return errorResponseBase(message,
|
||||
new UnknownNumassActionException("numass.control",
|
||||
UnknownNumassActionException.Cause.IN_DEVELOPMENT))
|
||||
.build();
|
||||
@ -144,7 +146,7 @@ public class NumassServer extends AbstractNetworkListener implements ContextAwar
|
||||
private void updateRun() throws StorageException {
|
||||
String currentRun = getRootState().getString("numass.current.run", DEFAULT_RUN_PATH);
|
||||
Storage storage = root.optShelf(currentRun).get();
|
||||
this.run = new NumassRun(currentRun, storage, getResponseFactory());
|
||||
this.run = new NumassRun(currentRun, storage);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -159,7 +161,7 @@ public class NumassServer extends AbstractNetworkListener implements ContextAwar
|
||||
runAn.putNode(getRun().getMeta());
|
||||
}
|
||||
|
||||
return getResponseFactory().responseBase("numass.run.response")
|
||||
return responseBase("numass.run.response")
|
||||
.putMetaNode(runAn.build()).build();
|
||||
}
|
||||
|
||||
@ -187,7 +189,7 @@ public class NumassServer extends AbstractNetworkListener implements ContextAwar
|
||||
throw new UnknownNumassActionException(action, UnknownNumassActionException.Cause.NOT_SUPPORTED);
|
||||
}
|
||||
} catch (StorageException ex) {
|
||||
return getResponseFactory().errorResponseBase("numass.run.response", ex).build();
|
||||
return errorResponseBase("numass.run.response", ex).build();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -21,7 +21,7 @@ public class ServerRunner extends SimpleConfigurable implements AutoCloseable {
|
||||
public static final String SERVER_CONFIG_PATH = "numass-server.xml";
|
||||
private static final String NUMASS_REPO_ELEMENT = "numass.repository";
|
||||
private static final String LISTENER_ELEMENT = "listener";
|
||||
// private static final String NUMASS_REPO_PATH_PROPERTY = "numass.repository.path";
|
||||
// private static final String NUMASS_REPO_PATH_PROPERTY = "numass.repository.path";
|
||||
NumassStorage root;
|
||||
NumassServer listener;
|
||||
Context context = Global.INSTANCE.getContext("NUMASS_SERVER");
|
||||
@ -29,7 +29,7 @@ public class ServerRunner extends SimpleConfigurable implements AutoCloseable {
|
||||
public ServerRunner() throws IOException, ParseException {
|
||||
// Global.instance().getPluginManager().load(StorageManager.class);
|
||||
|
||||
Path configFile = context.getIo().getFile(SERVER_CONFIG_PATH);
|
||||
Path configFile = context.getIo().getFile(SERVER_CONFIG_PATH).getPath();
|
||||
if (Files.exists(configFile)) {
|
||||
context.getLogger().info("Trying to read server configuration from {}", SERVER_CONFIG_PATH);
|
||||
configure(MetaFileReader.read(configFile));
|
||||
@ -56,16 +56,16 @@ public class ServerRunner extends SimpleConfigurable implements AutoCloseable {
|
||||
public ServerRunner start() throws Exception {
|
||||
// String repoPath = meta().getString(NUMASS_REPO_PATH_PROPERTY, ".");
|
||||
|
||||
Meta storageMeta = getMeta().getMetaOrEmpty(NUMASS_REPO_ELEMENT);
|
||||
context.getLogger().info("Initializing file storage with meta: {}",storageMeta);
|
||||
root = (NumassStorage) StorageManager.buildStorage(context,storageMeta);
|
||||
Meta storageMeta = getConfig().getMetaOrEmpty(NUMASS_REPO_ELEMENT);
|
||||
context.getLogger().info("Initializing file storage with meta: {}", storageMeta);
|
||||
root = (NumassStorage) StorageManager.Companion.buildStorage(context, storageMeta);
|
||||
|
||||
context.getLogger().info("Starting numass server");
|
||||
if (root != null) {
|
||||
root.open();
|
||||
Meta listenerConfig = null;
|
||||
if (getMeta().hasMeta(LISTENER_ELEMENT)) {
|
||||
listenerConfig = getMeta().getMeta(LISTENER_ELEMENT);
|
||||
if (getConfig().hasMeta(LISTENER_ELEMENT)) {
|
||||
listenerConfig = getConfig().getMeta(LISTENER_ELEMENT);
|
||||
}
|
||||
|
||||
listener = new NumassServer(root, listenerConfig);
|
||||
|
@ -27,7 +27,7 @@ public class TestServer {
|
||||
* @throws hep.dataforge.exceptions.StorageException
|
||||
*/
|
||||
public static void main(String[] args) throws Exception {
|
||||
Context context = Global.Companion.getContext("NUMASS-SERVER");
|
||||
Context context = Global.INSTANCE.getContext("NUMASS-SERVER");
|
||||
|
||||
StorageManager storageManager = context.getPluginManager().load(StorageManager.class);
|
||||
|
||||
@ -36,7 +36,7 @@ public class TestServer {
|
||||
File path = new File("/D:/temp/test");
|
||||
context.getLogger().info("Starting test numass storage servlet in '{}'", path);
|
||||
|
||||
NumassStorage storage = (NumassStorage) storageManager.buildStorage(NumassStorageFactory.buildStorageMeta(path.toURI(), true, true));
|
||||
NumassStorage storage = (NumassStorage) storageManager.buildStorage(NumassStorageFactory.Companion.buildStorageMeta(path.toURI(), true, true));
|
||||
StorageServerUtils.addStorage(serverManager,storage,"numass-storage");
|
||||
|
||||
serverManager.startServer();
|
||||
|
@ -13,7 +13,7 @@ import java.util.stream.Stream
|
||||
* Created by darksnake on 23-Jun-17.
|
||||
*/
|
||||
class NumassDataCache(val data: NumassSet) : NumassSet {
|
||||
private val cachedDescription: String by lazy { data.description }
|
||||
//private val cachedDescription: String by lazy { data.description }
|
||||
private val cachedMeta: Meta by lazy { data.meta }
|
||||
private val cachedPoints: List<NumassPoint> by lazy { data.points.collect(Collectors.toList()) }
|
||||
private val hv: Optional<Table> by lazy { data.hvData }
|
||||
@ -23,9 +23,9 @@ class NumassDataCache(val data: NumassSet) : NumassSet {
|
||||
return cachedPoints.stream();
|
||||
}
|
||||
|
||||
override fun getDescription(): String {
|
||||
return cachedDescription
|
||||
}
|
||||
// override fun getDescription(): String {
|
||||
// return cachedDescription
|
||||
// }
|
||||
|
||||
override fun getMeta(): Meta {
|
||||
return cachedMeta
|
||||
|
@ -34,7 +34,7 @@ import java.io.File
|
||||
import java.net.URI
|
||||
import kotlin.streams.toList
|
||||
|
||||
class StorageView(private val context: Context = Global.instance()) : View(title = "Numass storage", icon = ImageView(dfIcon)) {
|
||||
class StorageView(private val context: Context = Global) : View(title = "Numass storage", icon = ImageView(dfIcon)) {
|
||||
|
||||
|
||||
val storageProperty = SimpleObjectProperty<Storage?>()
|
||||
|
@ -1,7 +1,7 @@
|
||||
package inr.numass.viewer.test
|
||||
|
||||
import hep.dataforge.context.Global
|
||||
import hep.dataforge.fx.dfIcon
|
||||
import hep.dataforge.kodex.global
|
||||
import hep.dataforge.tables.Table
|
||||
import inr.numass.data.api.NumassPoint
|
||||
import inr.numass.data.api.NumassSet
|
||||
@ -35,7 +35,7 @@ class ViewerComponentsTest : View(title = "Numass viewer test", icon = ImageView
|
||||
button("Click me!") {
|
||||
action {
|
||||
runAsync {
|
||||
val set: NumassSet = NumassStorageFactory.buildLocal(global, "D:\\Work\\Numass\\data\\2017_05\\Fill_2", true, true)
|
||||
val set: NumassSet = NumassStorageFactory.buildLocal(Global, "D:\\Work\\Numass\\data\\2017_05\\Fill_2", true, true)
|
||||
.provide("loader::set_2", NumassSet::class.java)
|
||||
.orElseThrow { RuntimeException("err") }
|
||||
update(set);
|
||||
|
Loading…
Reference in New Issue
Block a user