From 069c5422e02d88bf1e1c9be5cb76f7e32be15e5b Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Tue, 6 Mar 2018 16:59:09 +0300 Subject: [PATCH] Working on new controls for numass --- .../java/inr/numass/client/NumassClient.java | 42 ++-- .../inr/numass/control/cryotemp/PKT8Device.kt | 107 ++++---- .../inr/numass/control/msp/MspDevice.kt | 2 +- .../inr/numass/control/NumassControlUtils.kt | 2 +- .../inr/numass/control/readvac/CM32Device.kt | 22 +- numass-core/build.gradle | 4 + .../java/inr/numass/NumassEnvelopeType.java | 101 -------- .../java/inr/numass/NumassProperties.java | 60 ----- .../java/inr/numass/data/api/NumassSet.java | 6 +- .../data/legacy/NumassFileEnvelope.java | 5 +- .../data/storage/ClassicNumassPoint.java | 153 ------------ .../data/storage/NumassDataFactory.java | 37 --- .../numass/data/storage/NumassDataLoader.java | 230 ------------------ .../numass/data/storage/NumassStorage.java | 216 ---------------- .../data/storage/NumassStorageFactory.java | 83 ------- .../numass/data/storage/ProtoNumassPoint.java | 110 --------- .../kotlin/inr/numass/NumassEnvelopeType.kt | 84 +++++++ .../kotlin/inr/numass/NumassProperties.kt | 63 +++++ .../numass/data/storage/ClassicNumassPoint.kt | 130 ++++++++++ .../numass/data/storage/NumassDataFactory.kt | 30 +++ .../numass/data/storage/NumassDataLoader.kt | 204 ++++++++++++++++ .../inr/numass/data/storage/NumassStorage.kt | 203 ++++++++++++++++ .../data/storage/NumassStorageFactory.kt | 79 ++++++ .../numass/data/storage/ProtoNumassPoint.kt | 86 +++++++ .../inr/numass/actions/TransformDataAction.kt | 2 +- .../models/sterile/NumassTransmission.kt | 4 - .../inr/numass/scripts/utils/ScanTree.kt | 2 +- .../test/java/inr/numass/models/TestModels.kt | 4 +- .../java/inr/numass/server/HandlerUtils.java | 10 +- .../java/inr/numass/server/NumassRun.java | 31 ++- .../java/inr/numass/server/NumassServer.java | 18 +- .../java/inr/numass/server/ServerRunner.java | 14 +- .../java/inr/numass/server/TestServer.java | 4 +- .../inr/numass/viewer/NumassDataCache.kt | 8 +- .../kotlin/inr/numass/viewer/StorageView.kt | 2 +- .../inr/numass/viewer/test/ComponentTest.kt | 4 +- 36 files changed, 1033 insertions(+), 1129 deletions(-) delete mode 100644 numass-core/src/main/java/inr/numass/NumassEnvelopeType.java delete mode 100644 numass-core/src/main/java/inr/numass/NumassProperties.java delete mode 100644 numass-core/src/main/java/inr/numass/data/storage/ClassicNumassPoint.java delete mode 100644 numass-core/src/main/java/inr/numass/data/storage/NumassDataFactory.java delete mode 100644 numass-core/src/main/java/inr/numass/data/storage/NumassDataLoader.java delete mode 100644 numass-core/src/main/java/inr/numass/data/storage/NumassStorage.java delete mode 100644 numass-core/src/main/java/inr/numass/data/storage/NumassStorageFactory.java delete mode 100644 numass-core/src/main/java/inr/numass/data/storage/ProtoNumassPoint.java create mode 100644 numass-core/src/main/kotlin/inr/numass/NumassEnvelopeType.kt create mode 100644 numass-core/src/main/kotlin/inr/numass/NumassProperties.kt create mode 100644 numass-core/src/main/kotlin/inr/numass/data/storage/ClassicNumassPoint.kt create mode 100644 numass-core/src/main/kotlin/inr/numass/data/storage/NumassDataFactory.kt create mode 100644 numass-core/src/main/kotlin/inr/numass/data/storage/NumassDataLoader.kt create mode 100644 numass-core/src/main/kotlin/inr/numass/data/storage/NumassStorage.kt create mode 100644 numass-core/src/main/kotlin/inr/numass/data/storage/NumassStorageFactory.kt create mode 100644 numass-core/src/main/kotlin/inr/numass/data/storage/ProtoNumassPoint.kt diff --git a/numass-client/src/main/java/inr/numass/client/NumassClient.java b/numass-client/src/main/java/inr/numass/client/NumassClient.java index 354dafcc..1786a00c 100644 --- a/numass-client/src/main/java/inr/numass/client/NumassClient.java +++ b/numass-client/src/main/java/inr/numass/client/NumassClient.java @@ -1,4 +1,4 @@ -/* +/* * Copyright 2015 Alexander Nozik. * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -22,10 +22,10 @@ import hep.dataforge.io.envelopes.EnvelopeBuilder; import hep.dataforge.io.messages.Responder; import hep.dataforge.meta.Meta; import hep.dataforge.meta.MetaBuilder; -import hep.dataforge.storage.commons.StorageUtils; import hep.dataforge.values.Value; import hep.dataforge.values.Values; import inr.numass.data.storage.NumassStorage; +import org.jetbrains.annotations.NotNull; import org.slf4j.LoggerFactory; import org.zeroturnaround.zip.ZipUtil; @@ -39,6 +39,9 @@ import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CompletableFuture; + +import static hep.dataforge.io.messages.MessagesKt.*; /** * @author darksnake @@ -46,7 +49,6 @@ import java.util.Map; public class NumassClient implements AutoCloseable, Responder { Socket socket; - MessageFactory mf = new MessageFactory(); public NumassClient(String address, int port) throws IOException { socket = new Socket(address, port); @@ -61,7 +63,7 @@ public class NumassClient implements AutoCloseable, Responder { @Override public void close() throws IOException { if (!socket.isClosed()) { - write(mf.terminator(), socket.getOutputStream()); + write(getTerminator(), socket.getOutputStream()); } socket.close(); } @@ -73,7 +75,7 @@ public class NumassClient implements AutoCloseable, Responder { return read(socket.getInputStream()); } catch (IOException ex) { LoggerFactory.getLogger(getClass()).error("Error in envelope exchange", ex); - return mf.errorResponseBase(message, ex).build(); + return errorResponseBase(message, ex).build(); } } @@ -87,7 +89,7 @@ public class NumassClient implements AutoCloseable, Responder { } private EnvelopeBuilder requestActionBase(String type, String action) { - return mf.requestBase(type).putMetaValue("action", action); + return requestBase(type).setMetaValue("action", action); } public Meta getCurrentRun() { @@ -111,34 +113,34 @@ public class NumassClient implements AutoCloseable, Responder { ByteBuffer buffer; String zipName = null; if (file.isDirectory()) { - File tmpFile = File.createTempFile(file.getName(), NumassStorage.NUMASS_ZIP_EXTENSION); + File tmpFile = File.createTempFile(file.getName(), NumassStorage.Companion.getNUMASS_ZIP_EXTENSION()); tmpFile.deleteOnExit(); ZipUtil.pack(file, tmpFile); zipName = file.getName(); file = tmpFile; } - if (file.toString().endsWith(NumassStorage.NUMASS_ZIP_EXTENSION)) { + if (file.toString().endsWith(NumassStorage.Companion.getNUMASS_ZIP_EXTENSION())) { FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.READ); buffer = ByteBuffer.allocate((int) channel.size()); channel.read(buffer); if (zipName == null) { - zipName = file.getName().replace(NumassStorage.NUMASS_ZIP_EXTENSION, ""); + zipName = file.getName().replace(NumassStorage.Companion.getNUMASS_ZIP_EXTENSION(), ""); } } else { - return StorageUtils.getErrorMeta(new FileNotFoundException(fileName)); + return getErrorMeta(new FileNotFoundException(fileName)); } - Envelope bin = mf.requestBase("numass.data") - .putMetaValue("action", "push") - .putMetaValue("path", path) - .putMetaValue("name", zipName) + Envelope bin = requestBase("numass.data") + .setMetaValue("action", "push") + .setMetaValue("path", path) + .setMetaValue("name", zipName) .setData(buffer) .build(); return respond(bin).getMeta(); } catch (IOException ex) { - return StorageUtils.getErrorMeta(ex); + return getErrorMeta(ex); } } @@ -159,7 +161,7 @@ public class NumassClient implements AutoCloseable, Responder { Meta response = respond(env.build()).getMeta(); if (response.getBoolean("success", true)) { Map res = new HashMap<>(); - response.getMetaList("state").stream().forEach((stateMeta) -> { + response.getMetaList("state").forEach((stateMeta) -> { res.put(stateMeta.getString("name"), stateMeta.getValue("value")); }); return res; @@ -193,7 +195,7 @@ public class NumassClient implements AutoCloseable, Responder { */ public Meta setState(Map stateMap) { EnvelopeBuilder env = requestActionBase("numass.state", "set"); - stateMap.entrySet().stream().forEach((state) -> { + stateMap.entrySet().forEach((state) -> { env.putMetaNode(new MetaBuilder("state") .setValue("name", state.getKey()) .setValue("value", state.getValue()) @@ -254,5 +256,9 @@ public class NumassClient implements AutoCloseable, Responder { throw new UnsupportedOperationException(); } - + @NotNull + @Override + public CompletableFuture respondInFuture(@NotNull Envelope message) { + return CompletableFuture.supplyAsync(() -> respond(message)); + } } diff --git a/numass-control/cryotemp/src/main/kotlin/inr/numass/control/cryotemp/PKT8Device.kt b/numass-control/cryotemp/src/main/kotlin/inr/numass/control/cryotemp/PKT8Device.kt index d43daecb..dc4b20ae 100644 --- a/numass-control/cryotemp/src/main/kotlin/inr/numass/control/cryotemp/PKT8Device.kt +++ b/numass-control/cryotemp/src/main/kotlin/inr/numass/control/cryotemp/PKT8Device.kt @@ -141,8 +141,7 @@ class PKT8Device(context: Context, meta: Meta) : PortSensor(context, } - @Throws(ControlException::class) - override fun buildPort(portName: String): Port { + override fun buildPort(meta: Meta): Port { //setup connection val handler: Port = if ("virtual" == portName) { logger.info("Starting {} using virtual debug port", name) @@ -225,67 +224,67 @@ class PKT8Device(context: Context, meta: Meta) : PortSensor(context, override fun startMeasurement(oldMeta: Meta, newMeta: Meta) { - if (!oldMeta.isEmpty) { - logger.warn("Trying to start measurement which is already started") + if (!oldMeta.isEmpty) { + logger.warn("Trying to start measurement which is already started") + } + + try { + logger.info("Starting measurement") + + //add weak stop listener + stopListener = controller.onPhrase("[Ss]topped\\s*") { + afterPause() + updateLogicalState(Sensor.MEASURING_STATE, false) } - try { - logger.info("Starting measurement") + //add weak measurement listener + valueListener = controller.onPhrase("[a-f].*") { + val trimmed = it.trim() + val designation = trimmed.substring(0, 1) + val rawValue = java.lang.Double.parseDouble(trimmed.substring(1)) / 100 - //add weak stop listener - stopListener = controller.onPhrase("[Ss]topped\\s*") { - afterPause() - updateLogicalState(Sensor.MEASURING_STATE, false) + val channel = this@PKT8Device.channels[designation] + + if (channel != null) { + result(channel.evaluate(rawValue)) + collector.put(channel.name, channel.getTemperature(rawValue)) + } else { + result(PKT8Result(designation, rawValue, -1.0)) } - - //add weak measurement listener - valueListener = controller.onPhrase("[a-f].*") { - val trimmed = it.trim() - val designation = trimmed.substring(0, 1) - val rawValue = java.lang.Double.parseDouble(trimmed.substring(1)) / 100 - - val channel = this@PKT8Device.channels[designation] - - if (channel != null) { - result(channel.evaluate(rawValue)) - collector.put(channel.name, channel.getTemperature(rawValue)) - } else { - result(PKT8Result(designation, rawValue, -1.0)) - } - } - - //send start signal - controller.send("s") - - afterStart() - } catch (ex: ControlException) { - onError("Failed to start measurement", ex) } + + //send start signal + controller.send("s") + + afterStart() + } catch (ex: ControlException) { + onError("Failed to start measurement", ex) + } } override fun stopMeasurement(meta: Meta) { - if (isFinished) { - logger.warn("Trying to stop measurement which is already stopped") - return true - } else { + if (isFinished) { + logger.warn("Trying to stop measurement which is already stopped") + return true + } else { - try { - logger.info("Stopping measurement") - val response = sendAndWait("p").trim() - // Должно быть именно с большой буквы!!! - return "Stopped" == response || "stopped" == response - } catch (ex: Exception) { - onError("Failed to stop measurement", ex) - return false - } finally { - afterStop() - errorListener?.let { controller.removeErrorListener(it) } - stopListener?.let { controller.removePhraseListener(it) } - valueListener?.let { controller.removePhraseListener(it) } - collector.stop() - logger.debug("Collector stopped") - } + try { + logger.info("Stopping measurement") + val response = sendAndWait("p").trim() + // Должно быть именно с большой буквы!!! + return "Stopped" == response || "stopped" == response + } catch (ex: Exception) { + onError("Failed to stop measurement", ex) + return false + } finally { + afterStop() + errorListener?.let { controller.removeErrorListener(it) } + stopListener?.let { controller.removePhraseListener(it) } + valueListener?.let { controller.removePhraseListener(it) } + collector.stop() + logger.debug("Collector stopped") } + } } private fun setSPS(sps: Int) { @@ -332,7 +331,7 @@ class PKT8Device(context: Context, meta: Meta) : PortSensor(context, // } -// inner class PKT8Measurement(private val controller: GenericPortController) : AbstractMeasurement() { + // inner class PKT8Measurement(private val controller: GenericPortController) : AbstractMeasurement() { // // override fun getDevice(): Device = this@PKT8Device // diff --git a/numass-control/msp/src/main/kotlin/inr/numass/control/msp/MspDevice.kt b/numass-control/msp/src/main/kotlin/inr/numass/control/msp/MspDevice.kt index c592c58e..b4807f25 100644 --- a/numass-control/msp/src/main/kotlin/inr/numass/control/msp/MspDevice.kt +++ b/numass-control/msp/src/main/kotlin/inr/numass/control/msp/MspDevice.kt @@ -120,7 +120,7 @@ class MspDevice(context: Context, meta: Meta) : PortSensor(context, meta } } - override fun acceptError(errorMessage: String?, error: Throwable?) { + override fun error(errorMessage: String?, error: Throwable?) { notifyError(errorMessage, error) } diff --git a/numass-control/src/main/kotlin/inr/numass/control/NumassControlUtils.kt b/numass-control/src/main/kotlin/inr/numass/control/NumassControlUtils.kt index 3f04078d..bb082d83 100644 --- a/numass-control/src/main/kotlin/inr/numass/control/NumassControlUtils.kt +++ b/numass-control/src/main/kotlin/inr/numass/control/NumassControlUtils.kt @@ -107,7 +107,7 @@ fun findDeviceMeta(config: Meta, criterion: Predicate): Optional { fun setupContext(meta: Meta): Context { val ctx = Global.getContext("NUMASS-CONTROL") - ctx.pluginManager.getOrLoad(StorageManager::class.java) + ctx.pluginManager.load(StorageManager::class.java) return ctx } diff --git a/numass-control/vac/src/main/kotlin/inr/numass/control/readvac/CM32Device.kt b/numass-control/vac/src/main/kotlin/inr/numass/control/readvac/CM32Device.kt index 4e9b5ff2..65058d93 100644 --- a/numass-control/vac/src/main/kotlin/inr/numass/control/readvac/CM32Device.kt +++ b/numass-control/vac/src/main/kotlin/inr/numass/control/readvac/CM32Device.kt @@ -11,9 +11,9 @@ import hep.dataforge.control.devices.PortSensor import hep.dataforge.control.measurements.Measurement import hep.dataforge.control.measurements.SimpleMeasurement import hep.dataforge.control.ports.ComPort +import hep.dataforge.control.ports.GenericPortController import hep.dataforge.control.ports.Port import hep.dataforge.control.ports.PortFactory -import hep.dataforge.exceptions.ControlException import hep.dataforge.meta.Meta import inr.numass.control.DeviceView @@ -23,17 +23,21 @@ import inr.numass.control.DeviceView @DeviceView(VacDisplay::class) class CM32Device(context: Context, meta: Meta) : PortSensor(context, meta) { - @Throws(ControlException::class) - override fun buildPort(portName: String): Port { + override fun connect(meta: Meta): GenericPortController { + val portName = meta.getString("name") logger.info("Connecting to port {}", portName) - val new: Port - if (portName.startsWith("com")) { - new = ComPort(portName, 2400, 8, 1, 0) + val port: Port = if (portName.startsWith("com")) { + ComPort.create(portName, 2400, 8, 1, 0) } else { - new = PortFactory.build(portName) + PortFactory.build(meta) } - new.setDelimiter("T--\r") - return new + return GenericPortController(context, port){it.endsWith("T--\r")} + } + + + + override fun startMeasurement(oldMeta: Meta, newMeta: Meta) { + TODO("not implemented") //To change body of created functions use File | Settings | File Templates. } override fun createMeasurement(): Measurement = CMVacMeasurement() diff --git a/numass-core/build.gradle b/numass-core/build.gradle index bede3cd2..dd25157c 100644 --- a/numass-core/build.gradle +++ b/numass-core/build.gradle @@ -29,6 +29,10 @@ protobuf { generatedFilesBaseDir = "$projectDir/gen" } +sourceSets { + main.kotlin.srcDirs += 'gen/main/java' +} + clean { delete protobuf.generatedFilesBaseDir } diff --git a/numass-core/src/main/java/inr/numass/NumassEnvelopeType.java b/numass-core/src/main/java/inr/numass/NumassEnvelopeType.java deleted file mode 100644 index 40e64246..00000000 --- a/numass-core/src/main/java/inr/numass/NumassEnvelopeType.java +++ /dev/null @@ -1,101 +0,0 @@ -package inr.numass; - -import hep.dataforge.io.envelopes.*; -import hep.dataforge.values.Value; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.Map; - -/** - * An envelope type for legacy numass tags. Reads legacy tag and writes DF02 tags - */ -public class NumassEnvelopeType implements EnvelopeType { - - public static final byte[] LEGACY_START_SEQUENCE = {'#', '!'}; - public static final byte[] LEGACY_END_SEQUENCE = {'!', '#', '\r', '\n'}; - - @Override - public int getCode() { - return DefaultEnvelopeType.DEFAULT_ENVELOPE_TYPE; - } - - @Override - public String getName() { - return "numass"; - } - - @Override - public String description() { - return "Numass legacy envelope"; - } - - @Override - public EnvelopeReader getReader(Map properties) { - return new NumassEnvelopeReader(); - } - - @Override - public EnvelopeWriter getWriter(Map properties) { - return new DefaultEnvelopeWriter(this, MetaType.Companion.resolve(properties)); - } - - public static class LegacyTag extends EnvelopeTag { - @Override - protected byte[] getStartSequence() { - return LEGACY_START_SEQUENCE; - } - - @Override - protected byte[] getEndSequence() { - return LEGACY_END_SEQUENCE; - } - - /** - * Get the length of tag in bytes. -1 means undefined size in case tag was modified - * - * @return - */ - public int getLength() { - return 30; - } - - /** - * Read leagscy version 1 tag without leading tag head - * - * @param buffer - * @return - * @throws IOException - */ - protected Map readHeader(ByteBuffer buffer) { - Map res = new HashMap<>(); - - int type = buffer.getInt(2); - res.put(Envelope.Companion.TYPE_PROPERTY, Value.of(type)); - - short metaTypeCode = buffer.getShort(10); - MetaType metaType = MetaType.Companion.resolve(metaTypeCode); - - if (metaType != null) { - res.put(Envelope.Companion.META_TYPE_PROPERTY, Value.of(metaType.getName())); - } else { - LoggerFactory.getLogger(EnvelopeTag.class).warn("Could not resolve meta type. Using default"); - } - - long metaLength = Integer.toUnsignedLong(buffer.getInt(14)); - res.put(Envelope.Companion.META_LENGTH_PROPERTY, Value.of(metaLength)); - long dataLength = Integer.toUnsignedLong(buffer.getInt(22)); - res.put(Envelope.Companion.DATA_LENGTH_PROPERTY, Value.of(dataLength)); - return res; - } - } - - private static class NumassEnvelopeReader extends DefaultEnvelopeReader { - @Override - protected EnvelopeTag newTag() { - return new LegacyTag(); - } - } -} diff --git a/numass-core/src/main/java/inr/numass/NumassProperties.java b/numass-core/src/main/java/inr/numass/NumassProperties.java deleted file mode 100644 index cd6e733d..00000000 --- a/numass-core/src/main/java/inr/numass/NumassProperties.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * To change this license header, choose License Headers in Project Properties. - * To change this template file, choose Tools | Templates - * and open the template in the editor. - */ -package inr.numass; - -import hep.dataforge.context.Global; -import org.jetbrains.annotations.Nullable; - -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.util.Properties; - -/** - * - * @author Alexander Nozik - */ -public class NumassProperties { - - private static File getNumassPropertiesFile() throws IOException { - File file = new File(Global.INSTANCE.getUserDirectory(), "numass"); - if (!file.exists()) { - file.mkdirs(); - } - file = new File(file, "numass.cfg"); - if(!file.exists()){ - file.createNewFile(); - } - return file; - } - - public static String getNumassProperty(String key) { - try { - Properties props = new Properties(); - props.load(new FileInputStream(getNumassPropertiesFile())); - return props.getProperty(key); - } catch (IOException ex) { - return null; - } - } - - public synchronized static void setNumassProperty(String key, @Nullable String value) { - try { - Properties props = new Properties(); - File store = getNumassPropertiesFile(); - props.load(new FileInputStream(store)); - if(value == null){ - props.remove(key); - } else { - props.setProperty(key, value); - } - props.store(new FileOutputStream(store), ""); - } catch (IOException ex) { - Global.INSTANCE.getLogger().error("Failed to save numass properties", ex); - } - } -} diff --git a/numass-core/src/main/java/inr/numass/data/api/NumassSet.java b/numass-core/src/main/java/inr/numass/data/api/NumassSet.java index 2d415f3e..49771ec4 100644 --- a/numass-core/src/main/java/inr/numass/data/api/NumassSet.java +++ b/numass-core/src/main/java/inr/numass/data/api/NumassSet.java @@ -32,9 +32,9 @@ public interface NumassSet extends Named, Metoid, Iterable, Provide Stream getPoints(); - default String getDescription() { - return getMeta().getString(DESCRIPTION_KEY, ""); - } +// default String getDescription() { +// return getMeta().getString(DESCRIPTION_KEY, ""); +// } @NotNull @Override diff --git a/numass-core/src/main/java/inr/numass/data/legacy/NumassFileEnvelope.java b/numass-core/src/main/java/inr/numass/data/legacy/NumassFileEnvelope.java index 10bcc7e4..ec1ff3a7 100644 --- a/numass-core/src/main/java/inr/numass/data/legacy/NumassFileEnvelope.java +++ b/numass-core/src/main/java/inr/numass/data/legacy/NumassFileEnvelope.java @@ -10,7 +10,6 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.Arrays; -import static inr.numass.NumassEnvelopeType.LEGACY_START_SEQUENCE; import static java.nio.file.StandardOpenOption.READ; public class NumassFileEnvelope extends FileEnvelope { @@ -22,10 +21,10 @@ public class NumassFileEnvelope extends FileEnvelope { try (InputStream stream = Files.newInputStream(path, READ)) { byte[] bytes = new byte[2]; stream.read(bytes); - if (Arrays.equals(bytes, LEGACY_START_SEQUENCE)) { + if (Arrays.equals(bytes, NumassEnvelopeType.Companion.getLEGACY_START_SEQUENCE())) { return new NumassFileEnvelope(path, readOnly); } else { - return FileEnvelope.open(path, readOnly); + return FileEnvelope.Companion.open(path, readOnly); } } catch (IOException e) { throw new RuntimeException("Failed to open file envelope", e); diff --git a/numass-core/src/main/java/inr/numass/data/storage/ClassicNumassPoint.java b/numass-core/src/main/java/inr/numass/data/storage/ClassicNumassPoint.java deleted file mode 100644 index 39f6561c..00000000 --- a/numass-core/src/main/java/inr/numass/data/storage/ClassicNumassPoint.java +++ /dev/null @@ -1,153 +0,0 @@ -package inr.numass.data.storage; - -import hep.dataforge.io.envelopes.Envelope; -import hep.dataforge.meta.Meta; -import inr.numass.data.api.NumassBlock; -import inr.numass.data.api.NumassEvent; -import inr.numass.data.api.NumassFrame; -import inr.numass.data.api.NumassPoint; -import inr.numass.data.legacy.NumassFileEnvelope; -import org.jetbrains.annotations.NotNull; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import java.nio.channels.ReadableByteChannel; -import java.nio.file.Path; -import java.time.Duration; -import java.time.Instant; -import java.util.Iterator; -import java.util.stream.Stream; -import java.util.stream.StreamSupport; - -/** - * Created by darksnake on 08.07.2017. - */ -public class ClassicNumassPoint implements NumassPoint { - public static ClassicNumassPoint readFile(Path path) { - return new ClassicNumassPoint(NumassFileEnvelope.open(path, true)); - } - - private final Envelope envelope; - - public ClassicNumassPoint(Envelope envelope) { - this.envelope = envelope; - } - - @Override - public Stream getBlocks() { -// double u = envelope.meta().getDouble("external_meta.HV1_value", 0); - long length; - if (envelope.getMeta().hasValue("external_meta.acquisition_time")) { - length = envelope.getMeta().getValue("external_meta.acquisition_time").longValue(); - } else { - length = envelope.getMeta().getValue("acquisition_time").longValue(); - } - return Stream.of(new ClassicBlock(getStartTime(), Duration.ofSeconds(length))); - } - - @Override - public Instant getStartTime() { - if (getMeta().hasValue("start_time")) { - return getMeta().getValue("start_time").timeValue(); - } else { - return Instant.EPOCH; - } - } - - @Override - public double getVoltage() { - return getMeta().getDouble("external_meta.HV1_value", 0); - } - - @Override - public int getIndex() { - return getMeta().getInt("external_meta.point_index", -1); - } - - @Override - public Meta getMeta() { - return envelope.getMeta(); - } - - //TODO split blocks using meta - private class ClassicBlock implements NumassBlock, Iterable { - private final Instant startTime; - private final Duration length; -// private final long blockOffset; - - public ClassicBlock(Instant startTime, Duration length) { - this.startTime = startTime; - this.length = length; -// this.blockOffset = blockOffset; - } - - @Override - public Instant getStartTime() { - return startTime; - } - - @Override - public Duration getLength() { - return length; - } - - @Override - public Stream getEvents() { - return StreamSupport.stream(this.spliterator(), false); - } - - @NotNull - @Override - public Iterator iterator() { - double timeCoef = envelope.getMeta().getDouble("time_coeff", 50); - try { - ByteBuffer buffer = ByteBuffer.allocate(7000); - buffer.order(ByteOrder.LITTLE_ENDIAN); - ReadableByteChannel channel = envelope.getData().getChannel(); - channel.read(buffer); - buffer.flip(); - return new Iterator() { - - @Override - public boolean hasNext() { - try { - if (buffer.hasRemaining()) { - return true; - } else { - buffer.flip(); - int num = channel.read(buffer); - if (num > 0) { - buffer.flip(); - return true; - } else { - return false; - } - } - } catch (IOException e) { - LoggerFactory.getLogger(ClassicNumassPoint.this.getClass()).error("Unexpected IOException when reading block", e); - return false; - } - } - - @Override - public NumassEvent next() { - short channel = (short) Short.toUnsignedInt(buffer.getShort()); - long time = Integer.toUnsignedLong(buffer.getInt()); - byte status = buffer.get(); // status is ignored - return new NumassEvent(channel, startTime, (long) (time * timeCoef)); - } - }; - } catch (IOException ex) { - throw new RuntimeException(ex); - } - } - - - @Override - public Stream getFrames() { - return Stream.empty(); - } - } -} diff --git a/numass-core/src/main/java/inr/numass/data/storage/NumassDataFactory.java b/numass-core/src/main/java/inr/numass/data/storage/NumassDataFactory.java deleted file mode 100644 index 23b295c6..00000000 --- a/numass-core/src/main/java/inr/numass/data/storage/NumassDataFactory.java +++ /dev/null @@ -1,37 +0,0 @@ -package inr.numass.data.storage; - -import hep.dataforge.context.Context; -import hep.dataforge.data.DataFactory; -import hep.dataforge.data.DataTree; -import hep.dataforge.meta.Meta; -import hep.dataforge.storage.api.Storage; -import hep.dataforge.storage.commons.StorageManager; -import hep.dataforge.storage.commons.StorageUtils; -import inr.numass.data.api.NumassSet; - -/** - * Created by darksnake on 03-Feb-17. - */ -public class NumassDataFactory extends DataFactory { - - public NumassDataFactory() { - super(NumassSet.class); - } - - @Override - public String getName() { - return "numass"; - } - - - @Override - protected void fill(DataTree.Builder builder, Context context, Meta meta) { - Meta newMeta = meta.getBuilder().setValue("type", "numass"); - Storage storage = context.load(StorageManager.class, Meta.empty()).buildStorage(newMeta); - StorageUtils.loaderStream(storage).forEach(loader -> { - if (loader instanceof NumassSet) { - builder.putStatic(loader.getFullName().toUnescaped(), (NumassSet) loader); - } - }); - } -} diff --git a/numass-core/src/main/java/inr/numass/data/storage/NumassDataLoader.java b/numass-core/src/main/java/inr/numass/data/storage/NumassDataLoader.java deleted file mode 100644 index 13f80eef..00000000 --- a/numass-core/src/main/java/inr/numass/data/storage/NumassDataLoader.java +++ /dev/null @@ -1,230 +0,0 @@ -/* - * Copyright 2015 Alexander Nozik. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package inr.numass.data.storage; - -import hep.dataforge.exceptions.StorageException; -import hep.dataforge.io.ColumnedDataReader; -import hep.dataforge.io.envelopes.Envelope; -import hep.dataforge.meta.Meta; -import hep.dataforge.meta.MetaBuilder; -import hep.dataforge.providers.Provider; -import hep.dataforge.storage.api.ObjectLoader; -import hep.dataforge.storage.api.Storage; -import hep.dataforge.storage.filestorage.FileStorage; -import hep.dataforge.storage.loaders.AbstractLoader; -import hep.dataforge.tables.Table; -import hep.dataforge.values.Value; -import inr.numass.data.api.NumassPoint; -import inr.numass.data.api.NumassSet; -import inr.numass.data.legacy.NumassFileEnvelope; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.time.Instant; -import java.util.*; -import java.util.function.Supplier; -import java.util.stream.Stream; - - -/** - * The reader for numass main detector data directory or zip format; - * - * @author darksnake - */ -public class NumassDataLoader extends AbstractLoader implements ObjectLoader, NumassSet, Provider { - - - public static NumassDataLoader fromFile(Storage storage, Path zipFile) throws IOException { - throw new UnsupportedOperationException("TODO"); - } - - - /** - * Construct numass loader from directory - * - * @param storage - * @param directory - * @return - * @throws IOException - */ - public static NumassDataLoader fromDir(Storage storage, Path directory, String name) throws IOException { - if (!Files.isDirectory(directory)) { - throw new IllegalArgumentException("Numass data directory required"); - } - Meta annotation = new MetaBuilder("loader") - .putValue("type", "numass") - .putValue("numass.loaderFormat", "dir") -// .setValue("file.timeCreated", Instant.ofEpochMilli(directory.getContent().getLastModifiedTime())) - .build(); - - if (name == null || name.isEmpty()) { - name = FileStorage.entryName(directory); - } - - //FIXME envelopes are lazy do we need to do additional lazy evaluations here? - Map> items = new LinkedHashMap<>(); - - Files.list(directory).filter(file -> { - String fileName = file.getFileName().toString(); - return fileName.equals(META_FRAGMENT_NAME) - || fileName.equals(HV_FRAGMENT_NAME) - || fileName.startsWith(POINT_FRAGMENT_NAME); - }).forEach(file -> { - try { - items.put(FileStorage.entryName(file), () -> NumassFileEnvelope.open(file, true)); - } catch (Exception ex) { - LoggerFactory.getLogger(NumassDataLoader.class) - .error("Can't load numass data directory " + FileStorage.entryName(directory), ex); - } - }); - - return new NumassDataLoader(storage, name, annotation, items); - } - - /** - * "start_time": "2016-04-20T04:08:50", - * - * @param meta - * @return - */ - private static Instant readTime(Meta meta) { - if (meta.hasValue("start_time")) { - return meta.getValue("start_time").timeValue(); - } else { - return Instant.EPOCH; - } - } - - /** - * The name of informational meta file in numass data directory - */ - public static final String META_FRAGMENT_NAME = "meta"; - - /** - * The beginning of point fragment name - */ - public static final String POINT_FRAGMENT_NAME = "p"; - - /** - * The beginning of hv fragment name - */ - public static final String HV_FRAGMENT_NAME = "voltage"; - private final Map> itemsProvider; - - private NumassDataLoader(Storage storage, String name, Meta annotation) { - super(storage, name, annotation); - itemsProvider = new HashMap<>(); - readOnly = true; - } - - private NumassDataLoader(Storage storage, String name, Meta meta, Map> items) { - super(storage, name, meta); - this.itemsProvider = items; - readOnly = true; - } - - private Map> getItems() { - return itemsProvider; - } - - @Override - public Collection fragmentNames() { - return getItems().keySet(); - } - - @Override - public Meta getMeta() { - return getItems() - .get(META_FRAGMENT_NAME) - .get() - .getMeta(); - - } - - @Override - public Optional getHvData() { - return getHVEnvelope().map(hvEnvelope -> { - try { - return new ColumnedDataReader(hvEnvelope.getData().getStream(), "timestamp", "block", "value").toTable(); - } catch (IOException ex) { - LoggerFactory.getLogger(getClass()).error("Failed to load HV data from file", ex); - return null; - } - } - ); - - } - - private Optional getHVEnvelope() { - return Optional.ofNullable(getItems().get(HV_FRAGMENT_NAME)).map(Supplier::get); - } - - private Stream getPointEnvelopes() { - return getItems().entrySet().stream() - .filter(entry -> entry.getKey().startsWith(POINT_FRAGMENT_NAME) && entry.getValue() != null) - .map(entry -> entry.getValue().get()) - .sorted(Comparator.comparing(t -> t.getMeta().getInt("external_meta.point_index", -1))); - - } - - @Override - public Stream getPoints() { - return getPointEnvelopes().map(ClassicNumassPoint::new); - } - - public boolean isReversed() { - return this.getMeta().getBoolean("iteration_info.reverse", false); - } - - @Override - public boolean isEmpty() { - return getItems().isEmpty(); - } - - @Override - public Envelope pull(String header) { - //PENDING read data to memory? - return getItems().get(header).get(); - } - - @Override - public void push(String header, Envelope data) throws StorageException { - tryPush(); - } - - @Override - public Envelope respond(Envelope message) { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. - } - - @Override - public Instant getStartTime() { - return getMeta().optValue("start_time").map(Value::timeValue).orElseGet(() -> NumassSet.super.getStartTime()); - } - - @Override - public String getDescription() { - return this.getMeta().getString("description", "").replace("\\n", "\n"); - } - - @Override - public void open() throws Exception { - - } - -} diff --git a/numass-core/src/main/java/inr/numass/data/storage/NumassStorage.java b/numass-core/src/main/java/inr/numass/data/storage/NumassStorage.java deleted file mode 100644 index 6f8c7b95..00000000 --- a/numass-core/src/main/java/inr/numass/data/storage/NumassStorage.java +++ /dev/null @@ -1,216 +0,0 @@ -/* - * Copyright 2015 Alexander Nozik. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package inr.numass.data.storage; - -import hep.dataforge.context.Context; -import hep.dataforge.events.Event; -import hep.dataforge.events.EventBuilder; -import hep.dataforge.exceptions.StorageException; -import hep.dataforge.meta.Meta; -import hep.dataforge.storage.filestorage.FileStorage; -import inr.numass.data.api.NumassSet; -import inr.numass.data.legacy.NumassDatFile; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.ByteChannel; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.List; - -import static java.nio.file.StandardOpenOption.CREATE; -import static java.nio.file.StandardOpenOption.WRITE; - - -/** - * The file storage containing numass data directories or zips. - *

- * Any subdirectory is treated as numass data directory. Any zip must have - * {@code NUMASS_ZIP_EXTENSION} extension to be recognized. Any other files are - * ignored. - *

- * - * @author Alexander Nozik - */ -public class NumassStorage extends FileStorage { - - public static final String NUMASS_ZIP_EXTENSION = ".nm.zip"; - public static final String NUMASS_DATA_LOADER_TYPE = "numassData"; - - protected NumassStorage(FileStorage parent, Meta config, String shelf) throws StorageException { - super(parent, config, shelf); - super.refresh(); - } - - public NumassStorage(Context context, Meta config, Path path) throws StorageException { - super(context, config, path); - super.refresh(); - } - - @Override - protected void updateDirectoryLoaders() { - try { - this.loaders.clear(); - Files.list(getDataDir()).forEach(file -> { - try { - if (Files.isDirectory(file)) { - Path metaFile = file.resolve(NumassDataLoader.META_FRAGMENT_NAME); - if (Files.exists(metaFile)) { - this.loaders.put(entryName(file), - NumassDataLoader.fromDir(this, file, null)); - } else { - this.shelves.put(entryName(file), - new NumassStorage(this, getMeta(), entryName(file))); - } - } else if (file.getFileName().endsWith(NUMASS_ZIP_EXTENSION)) { - this.loaders.put(entryName(file), NumassDataLoader.fromFile(this, file)); -// } else if (file.getFileName().endsWith(".points")) { -// try { -// loaders.put(getFileName(file), -// FilePointLoader.fromFile(this, file, true)); -// } catch (Exception ex) { -// getLogger().error("Failed to builder numass point loader from file {}", file.getName()); -// } - } else { - //updating non-numass loader files - updateFile(file); - } - } catch (IOException ex) { - LoggerFactory.getLogger(getClass()).error("Error while creating numass loader", ex); - } catch (StorageException ex) { - LoggerFactory.getLogger(getClass()).error("Error while creating numass group", ex); - } - }); - } catch (IOException ex) { - throw new RuntimeException(ex); - } - } - - public void pushNumassData(String path, String fileName, ByteBuffer data) throws StorageException { - if (path == null || path.isEmpty()) { - pushNumassData(fileName, data); - } else { - NumassStorage st = (NumassStorage) buildShelf(path); - st.pushNumassData(fileName, data); - } - } - - /** - * Read nm.zip content and write it as a new nm.zip file - * - * @param fileName - */ - @SuppressWarnings("unchecked") - public void pushNumassData(String fileName, ByteBuffer data) throws StorageException { - //FIXME move zip to internal - try { - Path nmFile = getDataDir().resolve(fileName + NUMASS_ZIP_EXTENSION); - if (Files.exists(nmFile)) { - LoggerFactory.getLogger(getClass()).warn("Trying to rewrite existing numass data file {}", nmFile.toString()); - } - try (ByteChannel channel = Files.newByteChannel(nmFile, CREATE, WRITE)) { - channel.write(data); - } - - dispatchEvent(NumassDataPointEvent.build(getName(), fileName, (int) Files.size(nmFile))); - } catch (IOException ex) { - throw new StorageException(ex); - } - } - - @Override - public NumassStorage createShelf(Meta meta, String path) throws StorageException { - return new NumassStorage(this, meta, path); - } - - /** - * A list of legacy DAT files in the directory - * - * @return - */ - public List legacyFiles() { - try { - List files = new ArrayList<>(); - Files.list(getDataDir()).forEach(file -> { - if (Files.isRegularFile(file) && file.getFileName().toString().toLowerCase().endsWith(".dat")) { - String name = file.getFileName().toString(); - try { - files.add(new NumassDatFile(file, Meta.empty())); - } catch (Exception ex) { - LoggerFactory.getLogger(getClass()).error("Error while reading legacy numass file " + file.getFileName(), ex); - } - } - }); - return files; - } catch (IOException ex) { - throw new RuntimeException(ex); - } - } - - public String getDescription() { - return getMeta().getString("description", ""); - } - - @Override - public void close() throws Exception { - super.close(); - //close remote file system after use - try { - getDataDir().getFileSystem().close(); - } catch (UnsupportedOperationException ex) { - - } - } - - public static class NumassDataPointEvent extends Event { - - public static final String FILE_NAME_KEY = "fileName"; - public static final String FILE_SIZE_KEY = "fileSize"; - - public NumassDataPointEvent(Meta meta) { - super(meta); - } - - public static NumassDataPointEvent build(String source, String fileName, int fileSize) { - return new NumassDataPointEvent(builder(source, fileName, fileSize).buildEventMeta()); - } - - public static EventBuilder builder(String source, String fileName, int fileSize) { - return EventBuilder.make("numass.storage.pushData") - .setSource(source) - .setMetaValue(FILE_NAME_KEY, fileName) - .setMetaValue(FILE_SIZE_KEY, fileSize); - } - - public int getFileSize() { - return getMeta().getInt(FILE_SIZE_KEY, 0); - } - - public String getFileName() { - return getMeta().getString(FILE_NAME_KEY); - } - - @Override - public String toString() { - return String.format("(%s) [%s] : pushed numass data file with name '%s' and size '%d'", - time().toString(), sourceTag(), getFileName(), getFileSize()); - } - - } - -} diff --git a/numass-core/src/main/java/inr/numass/data/storage/NumassStorageFactory.java b/numass-core/src/main/java/inr/numass/data/storage/NumassStorageFactory.java deleted file mode 100644 index 34964130..00000000 --- a/numass-core/src/main/java/inr/numass/data/storage/NumassStorageFactory.java +++ /dev/null @@ -1,83 +0,0 @@ -package inr.numass.data.storage; - -import com.github.robtimus.filesystems.sftp.SFTPEnvironment; -import hep.dataforge.context.Context; -import hep.dataforge.meta.Meta; -import hep.dataforge.meta.MetaBuilder; -import hep.dataforge.storage.api.Storage; -import hep.dataforge.storage.api.StorageType; -import hep.dataforge.storage.commons.StorageManager; -import hep.dataforge.storage.filestorage.FileStorage; -import org.jetbrains.annotations.NotNull; - -import java.net.URI; -import java.nio.file.FileSystem; -import java.nio.file.FileSystems; -import java.nio.file.Path; -import java.nio.file.Paths; - -/** - * Created by darksnake on 17-May-17. - */ -public class NumassStorageFactory implements StorageType { - - /** - * Build local storage with Global context. Used for tests. - * - * @param file - * @return - */ - @NotNull - public static FileStorage buildLocal(Context context, Path file, boolean readOnly, boolean monitor) { - StorageManager manager = context.load(StorageManager.class, Meta.empty()); - return (FileStorage) manager.buildStorage(buildStorageMeta(file.toUri(), readOnly, monitor)); - } - - @NotNull - public static FileStorage buildLocal(Context context, String path, boolean readOnly, boolean monitor) { - Path file = context.getIo().getDataDir().resolve(path); - return buildLocal(context, file, readOnly, monitor); - } - - @Override - public String type() { - return "numass"; - } - - @NotNull - @Override - public Storage build(Context context, Meta meta) { - if (meta.hasValue("path")) { - URI uri = URI.create(meta.getString("path")); - Path path; - if (uri.getScheme().startsWith("ssh")) { - try { - String username = meta.getString("userName", uri.getUserInfo()); - //String host = meta.getString("host", uri.getHost()); - int port = meta.getInt("port", 22); - SFTPEnvironment env = new SFTPEnvironment() - .withUsername(username) - .withPassword(meta.getString("password", "").toCharArray()); - FileSystem fs = FileSystems.newFileSystem(uri, env, context.getClassLoader()); - path = fs.getPath(uri.getPath()); - } catch (Exception e) { - throw new RuntimeException(e); - } - } else { - path = Paths.get(uri); - } - return new NumassStorage(context, meta, path); - } else { - context.getLogger().warn("A storage path not provided. Creating default root storage in the working directory"); - return new NumassStorage(context, meta, context.getIo().getWorkDir()); - } - } - - public static MetaBuilder buildStorageMeta(URI path, boolean readOnly, boolean monitor) { - return new MetaBuilder("storage") - .setValue("path", path.toString()) - .setValue("type", "numass") - .setValue("readOnly", readOnly) - .setValue("monitor", monitor); - } -} diff --git a/numass-core/src/main/java/inr/numass/data/storage/ProtoNumassPoint.java b/numass-core/src/main/java/inr/numass/data/storage/ProtoNumassPoint.java deleted file mode 100644 index 298d68ce..00000000 --- a/numass-core/src/main/java/inr/numass/data/storage/ProtoNumassPoint.java +++ /dev/null @@ -1,110 +0,0 @@ -package inr.numass.data.storage; - -import hep.dataforge.io.envelopes.Envelope; -import hep.dataforge.meta.Meta; -import inr.numass.data.NumassProto; -import inr.numass.data.api.NumassBlock; -import inr.numass.data.api.NumassEvent; -import inr.numass.data.api.NumassFrame; -import inr.numass.data.api.NumassPoint; -import inr.numass.data.legacy.NumassFileEnvelope; - -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; -import java.nio.file.Path; -import java.time.Duration; -import java.time.Instant; -import java.util.Comparator; -import java.util.stream.IntStream; -import java.util.stream.Stream; - -/** - * Protobuf based numass point - * Created by darksnake on 09.07.2017. - */ -public class ProtoNumassPoint implements NumassPoint { - public static ProtoNumassPoint readFile(Path path) { - return new ProtoNumassPoint(NumassFileEnvelope.open(path, true)); - } - - - private final Envelope envelope; - - public ProtoNumassPoint(Envelope envelope) { - this.envelope = envelope; - } - - private NumassProto.Point getPoint() { - try (InputStream stream = envelope.getData().getStream()) { - return NumassProto.Point.parseFrom(stream); - } catch (IOException ex) { - throw new RuntimeException("Failed to read point via protobuf"); - } - } - - @Override - public Stream getBlocks() { - return getPoint().getChannelsList().stream() - .flatMap(channel -> - channel.getBlocksList().stream() - .map(block -> new ProtoBlock((int) channel.getNum(), block)) - .sorted(Comparator.comparing(ProtoBlock::getStartTime)) - ); - } - - @Override - public Meta getMeta() { - return envelope.getMeta(); - } - - public static Instant ofEpochNanos(long nanos) { - long seconds = Math.floorDiv(nanos, (int) 1e9); - int reminder = (int) (nanos % 1e9); - return Instant.ofEpochSecond(seconds, reminder); - } - - private class ProtoBlock implements NumassBlock { - - final int channel; - final NumassProto.Point.Channel.Block block; - - private ProtoBlock(int channel, NumassProto.Point.Channel.Block block) { - this.channel = channel; - this.block = block; - } - - @Override - public Instant getStartTime() { - return ofEpochNanos(block.getTime()); - } - - @Override - public Duration getLength() { - return Duration.ofNanos((long) (getMeta().getDouble("params.b_size") / getMeta().getDouble("params.sample_freq") * 1e9)); - } - - @Override - public Stream getEvents() { - Instant blockTime = getStartTime(); - if (block.hasEvents()) { - NumassProto.Point.Channel.Block.Events events = block.getEvents(); - return IntStream.range(0, events.getTimesCount()).mapToObj(i -> - new NumassEvent((short) events.getAmplitudes(i), blockTime, events.getTimes(i)) - ); - } else { - return Stream.empty(); - } - } - - @Override - public Stream getFrames() { - Duration tickSize = Duration.ofNanos((long) (1e9 / getMeta().getInt("params.sample_freq"))); - return block.getFramesList().stream().map(frame -> { - Instant time = getStartTime().plusNanos(frame.getTime()); - ByteBuffer data = frame.getData().asReadOnlyByteBuffer(); - return new NumassFrame(time, tickSize, data.asShortBuffer()); - }); - } - } -} diff --git a/numass-core/src/main/kotlin/inr/numass/NumassEnvelopeType.kt b/numass-core/src/main/kotlin/inr/numass/NumassEnvelopeType.kt new file mode 100644 index 00000000..86774f25 --- /dev/null +++ b/numass-core/src/main/kotlin/inr/numass/NumassEnvelopeType.kt @@ -0,0 +1,84 @@ +package inr.numass + +import hep.dataforge.io.envelopes.* +import hep.dataforge.values.Value +import org.slf4j.LoggerFactory +import java.io.IOException +import java.nio.ByteBuffer +import java.util.* + +/** + * An envelope type for legacy numass tags. Reads legacy tag and writes DF02 tags + */ +class NumassEnvelopeType : EnvelopeType { + + override val code: Int = DefaultEnvelopeType.DEFAULT_ENVELOPE_TYPE + + override val name: String = "numass" + + override fun description(): String = "Numass legacy envelope" + + override fun getReader(properties: Map): EnvelopeReader { + return NumassEnvelopeReader() + } + + override fun getWriter(properties: Map): EnvelopeWriter { + return DefaultEnvelopeWriter(this, MetaType.resolve(properties)) + } + + class LegacyTag : EnvelopeTag() { + override val startSequence: ByteArray + get() = LEGACY_START_SEQUENCE + + override val endSequence: ByteArray + get() = LEGACY_END_SEQUENCE + + /** + * Get the length of tag in bytes. -1 means undefined size in case tag was modified + * + * @return + */ + override val length: Int + get() = 30 + + /** + * Read leagscy version 1 tag without leading tag head + * + * @param buffer + * @return + * @throws IOException + */ + override fun readHeader(buffer: ByteBuffer): Map { + val res = HashMap() + + val type = buffer.getInt(2) + res[Envelope.TYPE_PROPERTY] = Value.of(type) + + val metaTypeCode = buffer.getShort(10) + val metaType = MetaType.resolve(metaTypeCode) + + if (metaType != null) { + res[Envelope.META_TYPE_PROPERTY] = Value.of(metaType.name) + } else { + LoggerFactory.getLogger(EnvelopeTag::class.java).warn("Could not resolve meta type. Using default") + } + + val metaLength = Integer.toUnsignedLong(buffer.getInt(14)) + res[Envelope.META_LENGTH_PROPERTY] = Value.of(metaLength) + val dataLength = Integer.toUnsignedLong(buffer.getInt(22)) + res[Envelope.DATA_LENGTH_PROPERTY] = Value.of(dataLength) + return res + } + } + + private class NumassEnvelopeReader : DefaultEnvelopeReader() { + override fun newTag(): EnvelopeTag { + return LegacyTag() + } + } + + companion object { + val LEGACY_START_SEQUENCE = byteArrayOf('#'.toByte(), '!'.toByte()) + val LEGACY_END_SEQUENCE = byteArrayOf('!'.toByte(), '#'.toByte(), '\r'.toByte(), '\n'.toByte()) + } +} diff --git a/numass-core/src/main/kotlin/inr/numass/NumassProperties.kt b/numass-core/src/main/kotlin/inr/numass/NumassProperties.kt new file mode 100644 index 00000000..74b1ce07 --- /dev/null +++ b/numass-core/src/main/kotlin/inr/numass/NumassProperties.kt @@ -0,0 +1,63 @@ +/* + * To change this license header, choose License Headers in Project Properties. + * To change this template file, choose Tools | Templates + * and open the template in the editor. + */ +package inr.numass + +import hep.dataforge.context.Global +import java.io.File +import java.io.FileInputStream +import java.io.FileOutputStream +import java.io.IOException +import java.util.* + +/** + * + * @author Alexander Nozik + */ +object NumassProperties { + + private val numassPropertiesFile: File + @Throws(IOException::class) + get() { + var file = File(Global.userDirectory, "numass") + if (!file.exists()) { + file.mkdirs() + } + file = File(file, "numass.cfg") + if (!file.exists()) { + file.createNewFile() + } + return file + } + + fun getNumassProperty(key: String): String? { + try { + val props = Properties() + props.load(FileInputStream(numassPropertiesFile)) + return props.getProperty(key) + } catch (ex: IOException) { + return null + } + + } + + @Synchronized + fun setNumassProperty(key: String, value: String?) { + try { + val props = Properties() + val store = numassPropertiesFile + props.load(FileInputStream(store)) + if (value == null) { + props.remove(key) + } else { + props.setProperty(key, value) + } + props.store(FileOutputStream(store), "") + } catch (ex: IOException) { + Global.logger.error("Failed to save numass properties", ex) + } + + } +} diff --git a/numass-core/src/main/kotlin/inr/numass/data/storage/ClassicNumassPoint.kt b/numass-core/src/main/kotlin/inr/numass/data/storage/ClassicNumassPoint.kt new file mode 100644 index 00000000..7ffacf8d --- /dev/null +++ b/numass-core/src/main/kotlin/inr/numass/data/storage/ClassicNumassPoint.kt @@ -0,0 +1,130 @@ +package inr.numass.data.storage + +import hep.dataforge.io.envelopes.Envelope +import hep.dataforge.meta.Meta +import inr.numass.data.api.NumassBlock +import inr.numass.data.api.NumassEvent +import inr.numass.data.api.NumassFrame +import inr.numass.data.api.NumassPoint +import inr.numass.data.legacy.NumassFileEnvelope +import org.slf4j.LoggerFactory +import java.io.IOException +import java.nio.ByteBuffer +import java.nio.ByteOrder +import java.nio.file.Path +import java.time.Duration +import java.time.Instant +import java.util.stream.Stream +import java.util.stream.StreamSupport + +/** + * Created by darksnake on 08.07.2017. + */ +class ClassicNumassPoint(private val envelope: Envelope) : NumassPoint { + + override fun getBlocks(): Stream { + // double u = envelope.meta().getDouble("external_meta.HV1_value", 0); + val length: Long + if (envelope.meta.hasValue("external_meta.acquisition_time")) { + length = envelope.meta.getValue("external_meta.acquisition_time").longValue() + } else { + length = envelope.meta.getValue("acquisition_time").longValue() + } + return Stream.of(ClassicBlock(startTime, Duration.ofSeconds(length))) + } + + override fun getStartTime(): Instant { + return if (meta.hasValue("start_time")) { + meta.getValue("start_time").timeValue() + } else { + Instant.EPOCH + } + } + + override fun getVoltage(): Double { + return meta.getDouble("external_meta.HV1_value", 0.0) + } + + override fun getIndex(): Int { + return meta.getInt("external_meta.point_index", -1) + } + + override fun getMeta(): Meta { + return envelope.meta + } + + //TODO split blocks using meta + private inner class ClassicBlock + // private final long blockOffset; + + (private val startTime: Instant, private val length: Duration)// this.blockOffset = blockOffset; + : NumassBlock, Iterable { + + override fun getStartTime(): Instant { + return startTime + } + + override fun getLength(): Duration { + return length + } + + override fun getEvents(): Stream { + return StreamSupport.stream(this.spliterator(), false) + } + + override fun iterator(): Iterator { + val timeCoef = envelope.meta.getDouble("time_coeff", 50.0) + try { + val buffer = ByteBuffer.allocate(7000) + buffer.order(ByteOrder.LITTLE_ENDIAN) + val channel = envelope.data.channel + channel.read(buffer) + buffer.flip() + return object : Iterator { + + override fun hasNext(): Boolean { + try { + if (buffer.hasRemaining()) { + return true + } else { + buffer.flip() + val num = channel.read(buffer) + if (num > 0) { + buffer.flip() + return true + } else { + return false + } + } + } catch (e: IOException) { + LoggerFactory.getLogger(this@ClassicNumassPoint.javaClass).error("Unexpected IOException when reading block", e) + return false + } + + } + + override fun next(): NumassEvent { + val channel = java.lang.Short.toUnsignedInt(buffer.short).toShort() + val time = Integer.toUnsignedLong(buffer.int) + val status = buffer.get() // status is ignored + return NumassEvent(channel, startTime, (time * timeCoef).toLong()) + } + } + } catch (ex: IOException) { + throw RuntimeException(ex) + } + + } + + + override fun getFrames(): Stream { + return Stream.empty() + } + } + + companion object { + fun readFile(path: Path): ClassicNumassPoint { + return ClassicNumassPoint(NumassFileEnvelope.open(path, true)) + } + } +} diff --git a/numass-core/src/main/kotlin/inr/numass/data/storage/NumassDataFactory.kt b/numass-core/src/main/kotlin/inr/numass/data/storage/NumassDataFactory.kt new file mode 100644 index 00000000..2118634d --- /dev/null +++ b/numass-core/src/main/kotlin/inr/numass/data/storage/NumassDataFactory.kt @@ -0,0 +1,30 @@ +package inr.numass.data.storage + +import hep.dataforge.context.Context +import hep.dataforge.data.DataFactory +import hep.dataforge.data.DataTree +import hep.dataforge.meta.Meta +import hep.dataforge.storage.commons.StorageManager +import hep.dataforge.storage.commons.StorageUtils +import inr.numass.data.api.NumassSet + +/** + * Created by darksnake on 03-Feb-17. + */ +class NumassDataFactory : DataFactory(NumassSet::class.java) { + + override fun getName(): String { + return "numass" + } + + + override fun fill(builder: DataTree.Builder, context: Context, meta: Meta) { + val newMeta = meta.builder.setValue("type", "numass") + val storage = context.load(StorageManager::class.java, Meta.empty()).buildStorage(newMeta) + StorageUtils.loaderStream(storage).forEach { loader -> + if (loader is NumassSet) { + builder.putStatic(loader.fullName.toUnescaped(), loader as NumassSet) + } + } + } +} diff --git a/numass-core/src/main/kotlin/inr/numass/data/storage/NumassDataLoader.kt b/numass-core/src/main/kotlin/inr/numass/data/storage/NumassDataLoader.kt new file mode 100644 index 00000000..0c98c92b --- /dev/null +++ b/numass-core/src/main/kotlin/inr/numass/data/storage/NumassDataLoader.kt @@ -0,0 +1,204 @@ +/* + * Copyright 2015 Alexander Nozik. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package inr.numass.data.storage + +import hep.dataforge.exceptions.StorageException +import hep.dataforge.io.ColumnedDataReader +import hep.dataforge.io.envelopes.Envelope +import hep.dataforge.meta.Meta +import hep.dataforge.meta.MetaBuilder +import hep.dataforge.providers.Provider +import hep.dataforge.storage.api.ObjectLoader +import hep.dataforge.storage.api.Storage +import hep.dataforge.storage.filestorage.FileStorage +import hep.dataforge.storage.loaders.AbstractLoader +import hep.dataforge.tables.Table +import inr.numass.data.api.NumassPoint +import inr.numass.data.api.NumassSet +import inr.numass.data.legacy.NumassFileEnvelope +import org.slf4j.LoggerFactory + +import java.io.IOException +import java.nio.file.Files +import java.nio.file.Path +import java.time.Instant +import java.util.* +import java.util.function.Supplier +import java.util.stream.Stream + + +/** + * The reader for numass main detector data directory or zip format; + * + * @author darksnake + */ +class NumassDataLoader( + storage: Storage, + name: String, + meta: Meta, + private val items: Map>, + override var isReadOnly: Boolean = true +) : AbstractLoader(storage, name, meta), ObjectLoader, NumassSet, Provider { + + private val hvEnvelope: Optional + get() = Optional.ofNullable(items[HV_FRAGMENT_NAME]).map { it.get() } + + private val pointEnvelopes: Stream + get() = items.entries.stream() + .filter { entry -> entry.key.startsWith(POINT_FRAGMENT_NAME) } + .map { entry -> entry.value.get() } + .sorted(Comparator.comparing { t -> t.meta.getInt("external_meta.point_index", -1) }) + + val isReversed: Boolean + get() = this.meta.getBoolean("iteration_info.reverse", false) + + override val isEmpty: Boolean + get() = items.isEmpty() + + override val description: String = this.meta.getString("description", "").replace("\\n", "\n") + + override fun fragmentNames(): Collection { + return items.keys + } + + override fun getMeta(): Meta { + return items[META_FRAGMENT_NAME]?.get()?.meta ?: Meta.empty() + + } + + override fun getHvData(): Optional
{ + return hvEnvelope.map { hvEnvelope -> + try { + ColumnedDataReader(hvEnvelope.data.stream, "timestamp", "block", "value").toTable() + } catch (ex: IOException) { + LoggerFactory.getLogger(javaClass).error("Failed to load HV data from file", ex) + null + } + } + + } + + override fun getPoints(): Stream { + return pointEnvelopes.map { ClassicNumassPoint(it) } + } + + override fun pull(fragmentName: String): Envelope { + //PENDING read data to memory? + return items[fragmentName]?.get() + ?: throw StorageException("The fragment with name $fragmentName is not found in the loader $name") + } + + @Throws(StorageException::class) + override fun push(fragmentName: String, data: Envelope) { + tryPush() + TODO() + } + + override fun respond(message: Envelope): Envelope { + throw TODO("Not supported yet.") + } + + override fun getStartTime(): Instant { + return meta.optValue("start_time").map { it.timeValue() }.orElseGet { super.getStartTime() } + } + + override val isOpen: Boolean + get() = true + + override fun close() { + //do nothing + } + + + companion object { + + + @Throws(IOException::class) + fun fromFile(storage: Storage, zipFile: Path): NumassDataLoader { + throw UnsupportedOperationException("TODO") + } + + + /** + * Construct numass loader from directory + * + * @param storage + * @param directory + * @return + * @throws IOException + */ + @Throws(IOException::class) + fun fromDir(storage: Storage, directory: Path, name: String = FileStorage.entryName(directory)): NumassDataLoader { + if (!Files.isDirectory(directory)) { + throw IllegalArgumentException("Numass data directory required") + } + val annotation = MetaBuilder("loader") + .putValue("type", "numass") + .putValue("numass.loaderFormat", "dir") + // .setValue("file.timeCreated", Instant.ofEpochMilli(directory.getContent().getLastModifiedTime())) + .build() + + //FIXME envelopes are lazy do we need to do additional lazy evaluations here? + val items = LinkedHashMap>() + + Files.list(directory).filter { file -> + val fileName = file.fileName.toString() + (fileName == META_FRAGMENT_NAME + || fileName == HV_FRAGMENT_NAME + || fileName.startsWith(POINT_FRAGMENT_NAME)) + }.forEach { file -> + try { + items[FileStorage.entryName(file)] = Supplier{ NumassFileEnvelope.open(file, true) } + } catch (ex: Exception) { + LoggerFactory.getLogger(NumassDataLoader::class.java) + .error("Can't load numass data directory " + FileStorage.entryName(directory), ex) + } + } + + return NumassDataLoader(storage, name, annotation, items) + } + + /** + * "start_time": "2016-04-20T04:08:50", + * + * @param meta + * @return + */ + private fun readTime(meta: Meta): Instant { + return if (meta.hasValue("start_time")) { + meta.getValue("start_time").timeValue() + } else { + Instant.EPOCH + } + } + + /** + * The name of informational meta file in numass data directory + */ + val META_FRAGMENT_NAME = "meta" + + /** + * The beginning of point fragment name + */ + val POINT_FRAGMENT_NAME = "p" + + /** + * The beginning of hv fragment name + */ + val HV_FRAGMENT_NAME = "voltage" + } + +} diff --git a/numass-core/src/main/kotlin/inr/numass/data/storage/NumassStorage.kt b/numass-core/src/main/kotlin/inr/numass/data/storage/NumassStorage.kt new file mode 100644 index 00000000..ff08b2b5 --- /dev/null +++ b/numass-core/src/main/kotlin/inr/numass/data/storage/NumassStorage.kt @@ -0,0 +1,203 @@ +/* + * Copyright 2015 Alexander Nozik. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package inr.numass.data.storage + +import hep.dataforge.context.Context +import hep.dataforge.events.Event +import hep.dataforge.events.EventBuilder +import hep.dataforge.exceptions.StorageException +import hep.dataforge.meta.Meta +import hep.dataforge.storage.filestorage.FileStorage +import inr.numass.data.api.NumassSet +import inr.numass.data.legacy.NumassDatFile +import org.slf4j.LoggerFactory +import java.io.IOException +import java.nio.ByteBuffer +import java.nio.file.Files +import java.nio.file.Path +import java.nio.file.StandardOpenOption.CREATE +import java.nio.file.StandardOpenOption.WRITE +import java.util.* + + +/** + * The file storage containing numass data directories or zips. + * + * + * Any subdirectory is treated as numass data directory. Any zip must have + * `NUMASS_ZIP_EXTENSION` extension to be recognized. Any other files are + * ignored. + * + * + * @author Alexander Nozik + */ +class NumassStorage : FileStorage { + + val description: String + get() = meta.getString("description", "") + + @Throws(StorageException::class) + protected constructor(parent: FileStorage, config: Meta, shelf: String) : super(parent, config, shelf) { + super.refresh() + } + + @Throws(StorageException::class) + constructor(context: Context, config: Meta, path: Path) : super(context, config, path) { + super.refresh() + } + + override fun updateDirectoryLoaders() { + try { + this.loaders.clear() + Files.list(dataDir).forEach { file -> + try { + if (Files.isDirectory(file)) { + val metaFile = file.resolve(NumassDataLoader.META_FRAGMENT_NAME) + if (Files.exists(metaFile)) { + this.loaders[entryName(file)] = NumassDataLoader.fromDir(this, file) + } else { + this.shelves[entryName(file)] = NumassStorage(this, meta, entryName(file)) + } + } else if (file.fileName.endsWith(NUMASS_ZIP_EXTENSION)) { + this.loaders[entryName(file)] = NumassDataLoader.fromFile(this, file) + } else { + //updating non-numass loader files + updateFile(file) + } + } catch (ex: IOException) { + LoggerFactory.getLogger(javaClass).error("Error while creating numass loader", ex) + } catch (ex: StorageException) { + LoggerFactory.getLogger(javaClass).error("Error while creating numass group", ex) + } + } + } catch (ex: IOException) { + throw RuntimeException(ex) + } + + } + + @Throws(StorageException::class) + fun pushNumassData(path: String?, fileName: String, data: ByteBuffer) { + if (path == null || path.isEmpty()) { + pushNumassData(fileName, data) + } else { + val st = buildShelf(path) as NumassStorage + st.pushNumassData(fileName, data) + } + } + + /** + * Read nm.zip content and write it as a new nm.zip file + * + * @param fileName + */ + @Throws(StorageException::class) + fun pushNumassData(fileName: String, data: ByteBuffer) { + //FIXME move zip to internal + try { + val nmFile = dataDir.resolve(fileName + NUMASS_ZIP_EXTENSION) + if (Files.exists(nmFile)) { + LoggerFactory.getLogger(javaClass).warn("Trying to rewrite existing numass data file {}", nmFile.toString()) + } + Files.newByteChannel(nmFile, CREATE, WRITE).use { channel -> channel.write(data) } + + dispatchEvent(NumassDataPointEvent.build(name, fileName, Files.size(nmFile).toInt())) + } catch (ex: IOException) { + throw StorageException(ex) + } + + } + + @Throws(StorageException::class) + override fun createShelf(meta: Meta, path: String): NumassStorage { + return NumassStorage(this, meta, path) + } + + /** + * A list of legacy DAT files in the directory + * + * @return + */ + fun legacyFiles(): List { + try { + val files = ArrayList() + Files.list(dataDir).forEach { file -> + if (Files.isRegularFile(file) && file.fileName.toString().toLowerCase().endsWith(".dat")) { + val name = file.fileName.toString() + try { + files.add(NumassDatFile(file, Meta.empty())) + } catch (ex: Exception) { + LoggerFactory.getLogger(javaClass).error("Error while reading legacy numass file " + file.fileName, ex) + } + + } + } + return files + } catch (ex: IOException) { + throw RuntimeException(ex) + } + + } + + @Throws(Exception::class) + override fun close() { + super.close() + //close remote file system after use + try { + dataDir.fileSystem.close() + } catch (ex: UnsupportedOperationException) { + + } + + } + + class NumassDataPointEvent(meta: Meta) : Event(meta) { + + val fileSize: Int = meta.getInt(FILE_SIZE_KEY, 0) + + val fileName: String = meta.getString(FILE_NAME_KEY) + + override fun toString(): String { + return String.format("(%s) [%s] : pushed numass data file with name '%s' and size '%d'", + time().toString(), sourceTag(), fileName, fileSize) + } + + companion object { + + val FILE_NAME_KEY = "fileName" + val FILE_SIZE_KEY = "fileSize" + + fun build(source: String, fileName: String, fileSize: Int): NumassDataPointEvent { + return NumassDataPointEvent(builder(source, fileName, fileSize).buildEventMeta()) + } + + fun builder(source: String, fileName: String, fileSize: Int): EventBuilder<*> { + return EventBuilder.make("numass.storage.pushData") + .setSource(source) + .setMetaValue(FILE_NAME_KEY, fileName) + .setMetaValue(FILE_SIZE_KEY, fileSize) + } + } + + } + + companion object { + + val NUMASS_ZIP_EXTENSION = ".nm.zip" + val NUMASS_DATA_LOADER_TYPE = "numassData" + } + +} diff --git a/numass-core/src/main/kotlin/inr/numass/data/storage/NumassStorageFactory.kt b/numass-core/src/main/kotlin/inr/numass/data/storage/NumassStorageFactory.kt new file mode 100644 index 00000000..b84be66f --- /dev/null +++ b/numass-core/src/main/kotlin/inr/numass/data/storage/NumassStorageFactory.kt @@ -0,0 +1,79 @@ +package inr.numass.data.storage + +import com.github.robtimus.filesystems.sftp.SFTPEnvironment +import hep.dataforge.context.Context +import hep.dataforge.meta.Meta +import hep.dataforge.meta.MetaBuilder +import hep.dataforge.storage.api.Storage +import hep.dataforge.storage.api.StorageType +import hep.dataforge.storage.commons.StorageManager +import hep.dataforge.storage.filestorage.FileStorage +import java.net.URI +import java.nio.file.FileSystems +import java.nio.file.Path +import java.nio.file.Paths + +/** + * Created by darksnake on 17-May-17. + */ +class NumassStorageFactory : StorageType { + + override fun type(): String { + return "numass" + } + + override fun build(context: Context, meta: Meta): Storage { + if (meta.hasValue("path")) { + val uri = URI.create(meta.getString("path")) + val path: Path + if (uri.scheme.startsWith("ssh")) { + try { + val username = meta.getString("userName", uri.userInfo) + //String host = meta.getString("host", uri.getHost()); + val port = meta.getInt("port", 22) + val env = SFTPEnvironment() + .withUsername(username) + .withPassword(meta.getString("password", "").toCharArray()) + val fs = FileSystems.newFileSystem(uri, env, context.classLoader) + path = fs.getPath(uri.path) + } catch (e: Exception) { + throw RuntimeException(e) + } + + } else { + path = Paths.get(uri) + } + return NumassStorage(context, meta, path) + } else { + context.logger.warn("A storage path not provided. Creating default root storage in the working directory") + return NumassStorage(context, meta, context.io.workDir) + } + } + + companion object { + + /** + * Build local storage with Global context. Used for tests. + * + * @param file + * @return + */ + fun buildLocal(context: Context, file: Path, readOnly: Boolean, monitor: Boolean): FileStorage { + val manager = context.load(StorageManager::class.java, Meta.empty()) + return manager.buildStorage(buildStorageMeta(file.toUri(), readOnly, monitor)) as FileStorage + } + + fun buildLocal(context: Context, path: String, readOnly: Boolean, monitor: Boolean): FileStorage { + val file = context.io.dataDir.resolve(path) + return buildLocal(context, file, readOnly, monitor) + } + + fun buildStorageMeta(path: URI, readOnly: Boolean, monitor: Boolean): MetaBuilder { + return MetaBuilder("storage") + .setValue("path", path.toString()) + .setValue("type", "numass") + .setValue("readOnly", readOnly) + .setValue("monitor", monitor) + } + } +} diff --git a/numass-core/src/main/kotlin/inr/numass/data/storage/ProtoNumassPoint.kt b/numass-core/src/main/kotlin/inr/numass/data/storage/ProtoNumassPoint.kt new file mode 100644 index 00000000..30c5a703 --- /dev/null +++ b/numass-core/src/main/kotlin/inr/numass/data/storage/ProtoNumassPoint.kt @@ -0,0 +1,86 @@ +package inr.numass.data.storage + +import hep.dataforge.io.envelopes.Envelope +import hep.dataforge.meta.Meta +import inr.numass.data.NumassProto +import inr.numass.data.api.NumassBlock +import inr.numass.data.api.NumassEvent +import inr.numass.data.api.NumassFrame +import inr.numass.data.api.NumassPoint +import inr.numass.data.legacy.NumassFileEnvelope +import java.io.IOException +import java.nio.file.Path +import java.time.Duration +import java.time.Instant +import java.util.* +import java.util.stream.IntStream +import java.util.stream.Stream + +/** + * Protobuf based numass point + * Created by darksnake on 09.07.2017. + */ +class ProtoNumassPoint(private val envelope: Envelope) : NumassPoint { + + private val point: NumassProto.Point + get() = try { + envelope.data.stream.use { stream -> return NumassProto.Point.parseFrom(stream) } + } catch (ex: IOException) { + throw RuntimeException("Failed to read point via protobuf") + } + + override fun getBlocks(): Stream { + return point.channelsList.stream() + .flatMap { channel -> + channel.blocksList.stream() + .map { block -> ProtoBlock(channel.num.toInt(), block) } + .sorted(Comparator.comparing { it.startTime }) + } + } + + override fun getMeta(): Meta { + return envelope.meta + } + + private inner class ProtoBlock(internal val channel: Int, internal val block: NumassProto.Point.Channel.Block) : NumassBlock { + + override fun getStartTime(): Instant { + return ofEpochNanos(block.time) + } + + override fun getLength(): Duration { + return Duration.ofNanos((meta.getDouble("params.b_size") / meta.getDouble("params.sample_freq") * 1e9).toLong()) + } + + override fun getEvents(): Stream { + val blockTime = startTime + if (block.hasEvents()) { + val events = block.events + return IntStream.range(0, events.timesCount).mapToObj { i -> NumassEvent(events.getAmplitudes(i).toShort(), blockTime, events.getTimes(i)) } + } else { + return Stream.empty() + } + } + + override fun getFrames(): Stream { + val tickSize = Duration.ofNanos((1e9 / meta.getInt("params.sample_freq")).toLong()) + return block.framesList.stream().map { frame -> + val time = startTime.plusNanos(frame.time) + val data = frame.data.asReadOnlyByteBuffer() + NumassFrame(time, tickSize, data.asShortBuffer()) + } + } + } + + companion object { + fun readFile(path: Path): ProtoNumassPoint { + return ProtoNumassPoint(NumassFileEnvelope.open(path, true)) + } + + fun ofEpochNanos(nanos: Long): Instant { + val seconds = Math.floorDiv(nanos, 1e9.toInt().toLong()) + val reminder = (nanos % 1e9).toInt() + return Instant.ofEpochSecond(seconds, reminder.toLong()) + } + } +} diff --git a/numass-main/src/main/kotlin/inr/numass/actions/TransformDataAction.kt b/numass-main/src/main/kotlin/inr/numass/actions/TransformDataAction.kt index b6ffd6dd..b6b64fdc 100644 --- a/numass-main/src/main/kotlin/inr/numass/actions/TransformDataAction.kt +++ b/numass-main/src/main/kotlin/inr/numass/actions/TransformDataAction.kt @@ -41,7 +41,7 @@ class TransformDataAction : OneToOneAction() { meta.optMeta("corrections").ifPresent { cors -> MetaUtils.nodeStream(cors) - .map { it.value } + .map { it.second } .map { this.makeCorrection(it) } .forEach { corrections.add(it) } } diff --git a/numass-main/src/main/kotlin/inr/numass/models/sterile/NumassTransmission.kt b/numass-main/src/main/kotlin/inr/numass/models/sterile/NumassTransmission.kt index 61b03b91..dd9d03f1 100644 --- a/numass-main/src/main/kotlin/inr/numass/models/sterile/NumassTransmission.kt +++ b/numass-main/src/main/kotlin/inr/numass/models/sterile/NumassTransmission.kt @@ -58,10 +58,6 @@ class NumassTransmission(context: Context, meta: Meta) : AbstractParametricBiFun return LossCalculator.instance().getLossProbability(0, getX(eIn, set)) } - private fun getTrapFunction(context: Context, meta: Meta): BivariateFunction { - return FunctionLibrary.buildFrom(context).buildBivariateFunction(meta) - } - override fun derivValue(parName: String, eIn: Double, eOut: Double, set: Values): Double { return when (parName) { "trap" -> trapFunc.value(eIn, eOut) diff --git a/numass-main/src/main/kotlin/inr/numass/scripts/utils/ScanTree.kt b/numass-main/src/main/kotlin/inr/numass/scripts/utils/ScanTree.kt index 411fa0c8..3934c4eb 100644 --- a/numass-main/src/main/kotlin/inr/numass/scripts/utils/ScanTree.kt +++ b/numass-main/src/main/kotlin/inr/numass/scripts/utils/ScanTree.kt @@ -59,7 +59,7 @@ private fun createSummaryNode(storage: Storage): MetaBuilder { fun calculateStatistics(summary: Meta, hv: Double): Meta { var totalLength = 0.0 var totalCount = 0L - MetaUtils.nodeStream(summary).map { it.value }.filter { it.name == "point" && it.getDouble("hv") == hv }.forEach { + MetaUtils.nodeStream(summary).map { it.second }.filter { it.name == "point" && it.getDouble("hv") == hv }.forEach { totalCount += it.getInt("count") totalLength += it.getDouble("length") } diff --git a/numass-main/src/test/java/inr/numass/models/TestModels.kt b/numass-main/src/test/java/inr/numass/models/TestModels.kt index 47996a54..3b395b7d 100644 --- a/numass-main/src/test/java/inr/numass/models/TestModels.kt +++ b/numass-main/src/test/java/inr/numass/models/TestModels.kt @@ -63,7 +63,7 @@ private fun oldModel(context: Context, meta: Meta): ParametricFunction { val A = meta.getDouble("resolution", meta.getDouble("resolution.width", 8.3e-5))//8.3e-5 val from = meta.getDouble("from", 13900.0) val to = meta.getDouble("to", 18700.0) - context.chronicle.report("Setting up tritium model with real transmission function") + context.history.report("Setting up tritium model with real transmission function") val resolutionTail: BivariateFunction = if (meta.hasValue("resolution.tailAlpha")) { ResolutionFunction.getAngledTail(meta.getDouble("resolution.tailAlpha"), meta.getDouble("resolution.tailBeta", 0.0)) @@ -73,7 +73,7 @@ private fun oldModel(context: Context, meta: Meta): ParametricFunction { //RangedNamedSetSpectrum beta = new BetaSpectrum(context.io().getFile("FS.txt")); val sp = ModularSpectrum(BetaSpectrum(), ResolutionFunction(A, resolutionTail), from, to) if (meta.getBoolean("caching", false)) { - context.chronicle.report("Caching turned on") + context.history.report("Caching turned on") sp.setCaching(true) } //Adding trapping energy dependence diff --git a/numass-server/src/main/java/inr/numass/server/HandlerUtils.java b/numass-server/src/main/java/inr/numass/server/HandlerUtils.java index 328a8416..89d457ea 100644 --- a/numass-server/src/main/java/inr/numass/server/HandlerUtils.java +++ b/numass-server/src/main/java/inr/numass/server/HandlerUtils.java @@ -6,7 +6,6 @@ package inr.numass.server; import hep.dataforge.storage.api.StateLoader; -import hep.dataforge.values.Value; /** * @@ -17,10 +16,9 @@ public class HandlerUtils { public static String renderStates(StateLoader states) { StringBuilder b = new StringBuilder(); b.append("
\n"); - for (String state : states.getStateNames()) { - Value val = states.getValue(state); + states.getValueStream().forEach(pair->{ String color; - switch (val.getType()) { + switch (pair.getSecond().getType()) { case NUMBER: color = "blue"; break; @@ -35,8 +33,8 @@ public class HandlerUtils { } b.append(String.format("

%s : %s

%n", - state, color, val.stringValue())); - } + pair.getFirst(), color, pair.getSecond().stringValue())); + }); b.append("
\n"); return b.toString(); } diff --git a/numass-server/src/main/java/inr/numass/server/NumassRun.java b/numass-server/src/main/java/inr/numass/server/NumassRun.java index f77af2ef..bcbcaf75 100644 --- a/numass-server/src/main/java/inr/numass/server/NumassRun.java +++ b/numass-server/src/main/java/inr/numass/server/NumassRun.java @@ -1,4 +1,4 @@ -/* +/* * Copyright 2015 Alexander Nozik. * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -27,13 +27,17 @@ import hep.dataforge.storage.api.Storage; import hep.dataforge.storage.commons.LoaderFactory; import hep.dataforge.values.Value; import inr.numass.data.storage.NumassStorage; +import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.time.Instant; +import java.util.concurrent.CompletableFuture; import java.util.stream.Stream; +import static hep.dataforge.io.messages.MessagesKt.errorResponseBase; +import static hep.dataforge.io.messages.MessagesKt.okResponseBase; import static inr.numass.server.NumassServerUtils.getNotes; /** @@ -59,7 +63,6 @@ public class NumassRun implements Metoid, Responder { private final StateLoader states; private final ObjectLoader noteLoader; - private final MessageFactory factory; private final Logger logger; @@ -67,11 +70,10 @@ public class NumassRun implements Metoid, Responder { // * A set with inverted order of elements (last note first) // */ // private final Set notes = new TreeSet<>((NumassNote o1, NumassNote o2) -> -o1.time().compareTo(o2.time())); - public NumassRun(String path, Storage workStorage, MessageFactory factory) throws StorageException { + public NumassRun(String path, Storage workStorage) throws StorageException { this.storage = workStorage; - this.states = LoaderFactory.buildStateLoder(storage, RUN_STATE, null); - this.noteLoader = LoaderFactory.buildObjectLoder(storage, RUN_NOTES, null); - this.factory = factory; + this.states = LoaderFactory.buildStateLoder(storage, RUN_STATE, ""); + this.noteLoader = (ObjectLoader) LoaderFactory.buildObjectLoder(storage, RUN_NOTES, ""); this.runPath = path; logger = LoggerFactory.getLogger("CURRENT_RUN"); } @@ -141,15 +143,15 @@ public class NumassRun implements Metoid, Responder { } else { addNote(NumassNote.buildFrom(message.getMeta())); } - return factory.okResponseBase(message, false, false).build(); + return okResponseBase(message, false, false).build(); } catch (Exception ex) { logger.error("Failed to push note", ex); - return factory.errorResponseBase(message, ex).build(); + return errorResponseBase(message, ex).build(); } } private Envelope pullNotes(Envelope message) { - EnvelopeBuilder envelope = factory.okResponseBase(message, true, false); + EnvelopeBuilder envelope = okResponseBase(message, true, false); int limit = message.getMeta().getInt("limit", -1); //TODO add time window and search conditions here Stream stream = getNotes(noteLoader); @@ -165,17 +167,17 @@ public class NumassRun implements Metoid, Responder { try { String filePath = message.getMeta().getString("path", ""); String fileName = message.getMeta().getString("name") - .replace(NumassStorage.NUMASS_ZIP_EXTENSION, "");// removing .nm.zip if it is present + .replace(NumassStorage.Companion.getNUMASS_ZIP_EXTENSION(), "");// removing .nm.zip if it is present if (storage instanceof NumassStorage) { ((NumassStorage) storage).pushNumassData(filePath, fileName, message.getData().getBuffer()); } else { throw new StorageException("Storage does not support numass point push"); } //TODO add checksum here - return factory.okResponseBase("numass.data.push.response", false, false).build(); + return okResponseBase("numass.data.push.response", false, false).build(); } catch (StorageException | IOException ex) { logger.error("Failed to push point", ex); - return factory.errorResponseBase("numass.data.push.response", ex).build(); + return errorResponseBase("numass.data.push.response", ex).build(); } } @@ -196,4 +198,9 @@ public class NumassRun implements Metoid, Responder { return states; } + @NotNull + @Override + public CompletableFuture respondInFuture(@NotNull Envelope message) { + return CompletableFuture.supplyAsync(() -> respond(message)); + } } diff --git a/numass-server/src/main/java/inr/numass/server/NumassServer.java b/numass-server/src/main/java/inr/numass/server/NumassServer.java index ee0bab18..e2a07769 100644 --- a/numass-server/src/main/java/inr/numass/server/NumassServer.java +++ b/numass-server/src/main/java/inr/numass/server/NumassServer.java @@ -34,6 +34,9 @@ import ratpack.server.RatpackServer; import java.io.IOException; +import static hep.dataforge.io.messages.MessagesKt.errorResponseBase; +import static hep.dataforge.io.messages.MessagesKt.responseBase; + /** * @author darksnake */ @@ -67,7 +70,7 @@ public class NumassServer extends AbstractNetworkListener implements ContextAwar new StorageManager().startGlobal(); this.root = storage; try { - rootState = LoaderFactory.buildStateLoder(storage, "@numass", null); + rootState = LoaderFactory.buildStateLoder(storage, "@numass", ""); updateRun(); } catch (StorageException ex) { throw new RuntimeException(ex); @@ -99,8 +102,8 @@ public class NumassServer extends AbstractNetworkListener implements ContextAwar private void startRun(Meta meta) throws StorageException { String path = meta.getString("path", DEFAULT_RUN_PATH); - Storage storage = StorageUtils.getOrBuildShelf(root, path, meta); - run = new NumassRun(path, storage, getResponseFactory()); + Storage storage = StorageUtils.INSTANCE.getOrBuildShelf(root, path, meta); + run = new NumassRun(path, storage); getRootState().push("numass.current.run", path); } @@ -125,8 +128,7 @@ public class NumassServer extends AbstractNetworkListener implements ContextAwar case "numass.run.state": return getRun().respond(message); case "numass.control": - return getResponseFactory() - .errorResponseBase(message, + return errorResponseBase(message, new UnknownNumassActionException("numass.control", UnknownNumassActionException.Cause.IN_DEVELOPMENT)) .build(); @@ -144,7 +146,7 @@ public class NumassServer extends AbstractNetworkListener implements ContextAwar private void updateRun() throws StorageException { String currentRun = getRootState().getString("numass.current.run", DEFAULT_RUN_PATH); Storage storage = root.optShelf(currentRun).get(); - this.run = new NumassRun(currentRun, storage, getResponseFactory()); + this.run = new NumassRun(currentRun, storage); } /** @@ -159,7 +161,7 @@ public class NumassServer extends AbstractNetworkListener implements ContextAwar runAn.putNode(getRun().getMeta()); } - return getResponseFactory().responseBase("numass.run.response") + return responseBase("numass.run.response") .putMetaNode(runAn.build()).build(); } @@ -187,7 +189,7 @@ public class NumassServer extends AbstractNetworkListener implements ContextAwar throw new UnknownNumassActionException(action, UnknownNumassActionException.Cause.NOT_SUPPORTED); } } catch (StorageException ex) { - return getResponseFactory().errorResponseBase("numass.run.response", ex).build(); + return errorResponseBase("numass.run.response", ex).build(); } } diff --git a/numass-server/src/main/java/inr/numass/server/ServerRunner.java b/numass-server/src/main/java/inr/numass/server/ServerRunner.java index 4a244df4..36eb2c20 100644 --- a/numass-server/src/main/java/inr/numass/server/ServerRunner.java +++ b/numass-server/src/main/java/inr/numass/server/ServerRunner.java @@ -21,7 +21,7 @@ public class ServerRunner extends SimpleConfigurable implements AutoCloseable { public static final String SERVER_CONFIG_PATH = "numass-server.xml"; private static final String NUMASS_REPO_ELEMENT = "numass.repository"; private static final String LISTENER_ELEMENT = "listener"; -// private static final String NUMASS_REPO_PATH_PROPERTY = "numass.repository.path"; + // private static final String NUMASS_REPO_PATH_PROPERTY = "numass.repository.path"; NumassStorage root; NumassServer listener; Context context = Global.INSTANCE.getContext("NUMASS_SERVER"); @@ -29,7 +29,7 @@ public class ServerRunner extends SimpleConfigurable implements AutoCloseable { public ServerRunner() throws IOException, ParseException { // Global.instance().getPluginManager().load(StorageManager.class); - Path configFile = context.getIo().getFile(SERVER_CONFIG_PATH); + Path configFile = context.getIo().getFile(SERVER_CONFIG_PATH).getPath(); if (Files.exists(configFile)) { context.getLogger().info("Trying to read server configuration from {}", SERVER_CONFIG_PATH); configure(MetaFileReader.read(configFile)); @@ -56,16 +56,16 @@ public class ServerRunner extends SimpleConfigurable implements AutoCloseable { public ServerRunner start() throws Exception { // String repoPath = meta().getString(NUMASS_REPO_PATH_PROPERTY, "."); - Meta storageMeta = getMeta().getMetaOrEmpty(NUMASS_REPO_ELEMENT); - context.getLogger().info("Initializing file storage with meta: {}",storageMeta); - root = (NumassStorage) StorageManager.buildStorage(context,storageMeta); + Meta storageMeta = getConfig().getMetaOrEmpty(NUMASS_REPO_ELEMENT); + context.getLogger().info("Initializing file storage with meta: {}", storageMeta); + root = (NumassStorage) StorageManager.Companion.buildStorage(context, storageMeta); context.getLogger().info("Starting numass server"); if (root != null) { root.open(); Meta listenerConfig = null; - if (getMeta().hasMeta(LISTENER_ELEMENT)) { - listenerConfig = getMeta().getMeta(LISTENER_ELEMENT); + if (getConfig().hasMeta(LISTENER_ELEMENT)) { + listenerConfig = getConfig().getMeta(LISTENER_ELEMENT); } listener = new NumassServer(root, listenerConfig); diff --git a/numass-server/src/main/java/inr/numass/server/TestServer.java b/numass-server/src/main/java/inr/numass/server/TestServer.java index a8914979..be4a13d4 100644 --- a/numass-server/src/main/java/inr/numass/server/TestServer.java +++ b/numass-server/src/main/java/inr/numass/server/TestServer.java @@ -27,7 +27,7 @@ public class TestServer { * @throws hep.dataforge.exceptions.StorageException */ public static void main(String[] args) throws Exception { - Context context = Global.Companion.getContext("NUMASS-SERVER"); + Context context = Global.INSTANCE.getContext("NUMASS-SERVER"); StorageManager storageManager = context.getPluginManager().load(StorageManager.class); @@ -36,7 +36,7 @@ public class TestServer { File path = new File("/D:/temp/test"); context.getLogger().info("Starting test numass storage servlet in '{}'", path); - NumassStorage storage = (NumassStorage) storageManager.buildStorage(NumassStorageFactory.buildStorageMeta(path.toURI(), true, true)); + NumassStorage storage = (NumassStorage) storageManager.buildStorage(NumassStorageFactory.Companion.buildStorageMeta(path.toURI(), true, true)); StorageServerUtils.addStorage(serverManager,storage,"numass-storage"); serverManager.startServer(); diff --git a/numass-viewer/src/main/kotlin/inr/numass/viewer/NumassDataCache.kt b/numass-viewer/src/main/kotlin/inr/numass/viewer/NumassDataCache.kt index c9988efb..cf3327cb 100644 --- a/numass-viewer/src/main/kotlin/inr/numass/viewer/NumassDataCache.kt +++ b/numass-viewer/src/main/kotlin/inr/numass/viewer/NumassDataCache.kt @@ -13,7 +13,7 @@ import java.util.stream.Stream * Created by darksnake on 23-Jun-17. */ class NumassDataCache(val data: NumassSet) : NumassSet { - private val cachedDescription: String by lazy { data.description } + //private val cachedDescription: String by lazy { data.description } private val cachedMeta: Meta by lazy { data.meta } private val cachedPoints: List by lazy { data.points.collect(Collectors.toList()) } private val hv: Optional
by lazy { data.hvData } @@ -23,9 +23,9 @@ class NumassDataCache(val data: NumassSet) : NumassSet { return cachedPoints.stream(); } - override fun getDescription(): String { - return cachedDescription - } +// override fun getDescription(): String { +// return cachedDescription +// } override fun getMeta(): Meta { return cachedMeta diff --git a/numass-viewer/src/main/kotlin/inr/numass/viewer/StorageView.kt b/numass-viewer/src/main/kotlin/inr/numass/viewer/StorageView.kt index 43f6be87..b74109f5 100644 --- a/numass-viewer/src/main/kotlin/inr/numass/viewer/StorageView.kt +++ b/numass-viewer/src/main/kotlin/inr/numass/viewer/StorageView.kt @@ -34,7 +34,7 @@ import java.io.File import java.net.URI import kotlin.streams.toList -class StorageView(private val context: Context = Global.instance()) : View(title = "Numass storage", icon = ImageView(dfIcon)) { +class StorageView(private val context: Context = Global) : View(title = "Numass storage", icon = ImageView(dfIcon)) { val storageProperty = SimpleObjectProperty() diff --git a/numass-viewer/src/main/kotlin/inr/numass/viewer/test/ComponentTest.kt b/numass-viewer/src/main/kotlin/inr/numass/viewer/test/ComponentTest.kt index cd687710..a93d233b 100644 --- a/numass-viewer/src/main/kotlin/inr/numass/viewer/test/ComponentTest.kt +++ b/numass-viewer/src/main/kotlin/inr/numass/viewer/test/ComponentTest.kt @@ -1,7 +1,7 @@ package inr.numass.viewer.test +import hep.dataforge.context.Global import hep.dataforge.fx.dfIcon -import hep.dataforge.kodex.global import hep.dataforge.tables.Table import inr.numass.data.api.NumassPoint import inr.numass.data.api.NumassSet @@ -35,7 +35,7 @@ class ViewerComponentsTest : View(title = "Numass viewer test", icon = ImageView button("Click me!") { action { runAsync { - val set: NumassSet = NumassStorageFactory.buildLocal(global, "D:\\Work\\Numass\\data\\2017_05\\Fill_2", true, true) + val set: NumassSet = NumassStorageFactory.buildLocal(Global, "D:\\Work\\Numass\\data\\2017_05\\Fill_2", true, true) .provide("loader::set_2", NumassSet::class.java) .orElseThrow { RuntimeException("err") } update(set);