From 11f130d8f5392844a4773f5948abb2f9abe31f41 Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Thu, 17 May 2018 19:05:10 +0300 Subject: [PATCH] Separated Input and Output management. Input moved to context --- .../inr/numass/control/NumassControlUtils.kt | 2 +- .../kotlin/inr/numass/data/NumassDataUtils.kt | 20 +++---- .../kotlin/inr/numass/data/api/NumassBlock.kt | 12 +++-- .../inr/numass/data/api/SimpleNumassPoint.kt | 2 +- .../inr/numass/data/legacy/NumassDatFile.kt | 4 +- .../data/storage/NumassStorageFactory.kt | 4 +- .../numass/data/storage/ProtoNumassPoint.kt | 2 +- .../numass/scripts/models/TristanModel.groovy | 2 +- .../scripts/workspace/TestActions.groovy | 4 +- .../src/main/java/inr/numass/Main.java | 13 +++-- .../numass/actions/MonitorCorrectAction.java | 4 +- .../actions/SubstractSpectrumAction.java | 4 +- .../models/TransmissionInterpolator.java | 2 +- .../java/inr/numass/utils/OldDataReader.java | 8 +-- .../main/kotlin/inr/numass/NumassPlugin.kt | 2 +- .../src/main/kotlin/inr/numass/NumassUtils.kt | 4 +- .../inr/numass/actions/MergeDataAction.kt | 2 +- .../inr/numass/actions/SummaryAction.kt | 2 +- .../inr/numass/actions/TransformDataAction.kt | 2 +- .../main/kotlin/inr/numass/data/Generator.kt | 11 ++-- .../kotlin/inr/numass/data/PileUpSimulator.kt | 9 ++-- .../main/kotlin/inr/numass/scripts/Bunches.kt | 23 +++++--- .../scripts/timeanalysis/TestAnalyzer.kt | 27 +++++++--- .../numass/scripts/tristan/AnalyzeTristan.kt | 52 ++++++++++--------- .../numass/tasks/NumassFitScanSummaryTask.kt | 2 +- .../kotlin/inr/numass/tasks/NumassTasks.kt | 10 ++-- .../java/inr/numass/server/ServerRunner.java | 2 +- 27 files changed, 131 insertions(+), 100 deletions(-) diff --git a/numass-control/src/main/kotlin/inr/numass/control/NumassControlUtils.kt b/numass-control/src/main/kotlin/inr/numass/control/NumassControlUtils.kt index ae4cfb00..fcef9df5 100644 --- a/numass-control/src/main/kotlin/inr/numass/control/NumassControlUtils.kt +++ b/numass-control/src/main/kotlin/inr/numass/control/NumassControlUtils.kt @@ -57,7 +57,7 @@ fun connectStorage(device: Device, config: Meta) { } fun readResourceMeta(path: String): Meta { - val resource = Global.io.optResource(path).nullable + val resource = Global.output.optResource(path).nullable if (resource != null) { return XMLMetaReader().read(resource.stream) } else { diff --git a/numass-core/src/main/kotlin/inr/numass/data/NumassDataUtils.kt b/numass-core/src/main/kotlin/inr/numass/data/NumassDataUtils.kt index 0d7dbb3c..f49dd92f 100644 --- a/numass-core/src/main/kotlin/inr/numass/data/NumassDataUtils.kt +++ b/numass-core/src/main/kotlin/inr/numass/data/NumassDataUtils.kt @@ -86,34 +86,34 @@ val NumassBlock.channel: Int } -fun NumassBlock.transformChain(transform: (NumassEvent, NumassEvent) -> Pair?): NumassBlock { - return SimpleBlock(this.startTime, this.length) { owner -> +suspend fun NumassBlock.transformChain(transform: (NumassEvent, NumassEvent) -> Pair?): NumassBlock { + return SimpleBlock.produce(this.startTime, this.length) { this.events.asSequence() .sortedBy { it.timeOffset } .zipWithNext(transform) .filterNotNull() - .map { NumassEvent(it.first, it.second, owner) }.asIterable() + .map { OrphanNumassEvent(it.first, it.second) }.asIterable() } } -fun NumassBlock.filterChain(condition: (NumassEvent, NumassEvent) -> Boolean): NumassBlock { - return SimpleBlock(this.startTime, this.length) { owner -> +suspend fun NumassBlock.filterChain(condition: (NumassEvent, NumassEvent) -> Boolean): NumassBlock { + return SimpleBlock.produce(this.startTime, this.length) { this.events.asSequence() .sortedBy { it.timeOffset } .zipWithNext().filter { condition.invoke(it.first, it.second) }.map { it.second }.asIterable() } } -fun NumassBlock.filter(condition: (NumassEvent) -> Boolean): NumassBlock { - return SimpleBlock(this.startTime, this.length) { owner -> +suspend fun NumassBlock.filter(condition: (NumassEvent) -> Boolean): NumassBlock { + return SimpleBlock.produce(this.startTime, this.length) { this.events.asSequence().filter(condition).asIterable() } } -fun NumassBlock.transform(transform: (NumassEvent) -> OrphanNumassEvent): NumassBlock { - return SimpleBlock(this.startTime, this.length) { owner -> +suspend fun NumassBlock.transform(transform: (NumassEvent) -> OrphanNumassEvent): NumassBlock { + return SimpleBlock.produce(this.startTime, this.length) { this.events.asSequence() - .map { transform(it).adopt(owner) } + .map { transform(it) } .asIterable() } } \ No newline at end of file diff --git a/numass-core/src/main/kotlin/inr/numass/data/api/NumassBlock.kt b/numass-core/src/main/kotlin/inr/numass/data/api/NumassBlock.kt index c004642f..ac0afcce 100644 --- a/numass-core/src/main/kotlin/inr/numass/data/api/NumassBlock.kt +++ b/numass-core/src/main/kotlin/inr/numass/data/api/NumassBlock.kt @@ -1,7 +1,6 @@ package inr.numass.data.api import inr.numass.data.channel -import kotlinx.coroutines.experimental.runBlocking import java.io.Serializable import java.time.Duration import java.time.Instant @@ -76,13 +75,20 @@ fun OrphanNumassEvent.adopt(parent: NumassBlock): NumassEvent { class SimpleBlock( override val startTime: Instant, override val length: Duration, - producer: suspend (NumassBlock) -> Iterable) : NumassBlock, Serializable { + rawEvents: Iterable +) : NumassBlock, Serializable { - private val eventList = runBlocking { producer(this@SimpleBlock).toList() } + private val eventList by lazy { rawEvents.map { it.adopt(this) } } override val frames: Stream get() = Stream.empty() override val events: Stream get() = eventList.stream() + companion object { + suspend fun produce(startTime: Instant, length: Duration, producer: suspend () -> Iterable): SimpleBlock { + return SimpleBlock(startTime, length, producer()) + } + } + } \ No newline at end of file diff --git a/numass-core/src/main/kotlin/inr/numass/data/api/SimpleNumassPoint.kt b/numass-core/src/main/kotlin/inr/numass/data/api/SimpleNumassPoint.kt index b3cda470..6b757912 100644 --- a/numass-core/src/main/kotlin/inr/numass/data/api/SimpleNumassPoint.kt +++ b/numass-core/src/main/kotlin/inr/numass/data/api/SimpleNumassPoint.kt @@ -8,7 +8,7 @@ import hep.dataforge.meta.MetaHolder * A simple static implementation of NumassPoint * Created by darksnake on 08.07.2017. */ -class SimpleNumassPoint(override val blocks: List, meta: Meta) : MetaHolder(meta), NumassPoint { +class SimpleNumassPoint(override val blocks: List, meta: Meta, override val isSequential: Boolean = true) : MetaHolder(meta), NumassPoint { /** * Input blocks must be sorted diff --git a/numass-core/src/main/kotlin/inr/numass/data/legacy/NumassDatFile.kt b/numass-core/src/main/kotlin/inr/numass/data/legacy/NumassDatFile.kt index 48515232..3db91de6 100644 --- a/numass-core/src/main/kotlin/inr/numass/data/legacy/NumassDatFile.kt +++ b/numass-core/src/main/kotlin/inr/numass/data/legacy/NumassDatFile.kt @@ -162,9 +162,7 @@ constructor(override val name: String, private val path: Path, meta: Meta) : Num voltage / 10.0 } - val block = SimpleBlock(absoluteTime.toInstant(ZoneOffset.UTC), Duration.ofSeconds(length.toLong())) { parent -> - events.map { it.adopt(parent) } - } + val block = SimpleBlock(absoluteTime.toInstant(ZoneOffset.UTC), Duration.ofSeconds(length.toLong()), events) val pointMeta = MetaBuilder("point") .setValue(HV_KEY, uSet) diff --git a/numass-core/src/main/kotlin/inr/numass/data/storage/NumassStorageFactory.kt b/numass-core/src/main/kotlin/inr/numass/data/storage/NumassStorageFactory.kt index 94201562..790528db 100644 --- a/numass-core/src/main/kotlin/inr/numass/data/storage/NumassStorageFactory.kt +++ b/numass-core/src/main/kotlin/inr/numass/data/storage/NumassStorageFactory.kt @@ -51,7 +51,7 @@ class NumassStorageFactory : StorageType { return NumassStorage(context, meta, path) } else { context.logger.warn("A storage path not provided. Creating default root storage in the working directory") - return NumassStorage(context, meta, context.io.workDir) + return NumassStorage(context, meta, context.workDir) } } @@ -69,7 +69,7 @@ class NumassStorageFactory : StorageType { } fun buildLocal(context: Context, path: String, readOnly: Boolean, monitor: Boolean): FileStorage { - val file = context.io.dataDir.resolve(path) + val file = context.dataDir.resolve(path) return buildLocal(context, file, readOnly, monitor) } diff --git a/numass-core/src/main/kotlin/inr/numass/data/storage/ProtoNumassPoint.kt b/numass-core/src/main/kotlin/inr/numass/data/storage/ProtoNumassPoint.kt index 8c12f6fa..ba1e01df 100644 --- a/numass-core/src/main/kotlin/inr/numass/data/storage/ProtoNumassPoint.kt +++ b/numass-core/src/main/kotlin/inr/numass/data/storage/ProtoNumassPoint.kt @@ -70,7 +70,7 @@ class ProtoNumassPoint(override val meta: Meta, val protoBuilder: () -> NumassPr } fun readFile(path: String, context: Context = Global): ProtoNumassPoint { - return readFile(context.io.getFile(path).absolutePath) + return readFile(context.getFile(path).absolutePath) } fun ofEpochNanos(nanos: Long): Instant { diff --git a/numass-main/src/main/groovy/inr/numass/scripts/models/TristanModel.groovy b/numass-main/src/main/groovy/inr/numass/scripts/models/TristanModel.groovy index d21db35b..af21810a 100644 --- a/numass-main/src/main/groovy/inr/numass/scripts/models/TristanModel.groovy +++ b/numass-main/src/main/groovy/inr/numass/scripts/models/TristanModel.groovy @@ -97,6 +97,6 @@ new GrindShell(ctx).eval { def res = fm.runStage(state, "MINUIT", FitStage.TASK_RUN, "N", "bkg", "E0", "U2"); - res.printState(ctx.io.out().newPrintWriter()); + res.printState(ctx.getOutput.out().newPrintWriter()); NumassIOKt.display(res, ctx, "fit") } \ No newline at end of file diff --git a/numass-main/src/main/groovy/inr/numass/scripts/workspace/TestActions.groovy b/numass-main/src/main/groovy/inr/numass/scripts/workspace/TestActions.groovy index 7173ec42..e089b1dd 100644 --- a/numass-main/src/main/groovy/inr/numass/scripts/workspace/TestActions.groovy +++ b/numass-main/src/main/groovy/inr/numass/scripts/workspace/TestActions.groovy @@ -2,7 +2,7 @@ package inr.numass.scripts.workspace import hep.dataforge.actions.ActionUtils import hep.dataforge.context.Context -import hep.dataforge.io.IOManager +import hep.dataforge.io.OutputManager import inr.numass.NumassPlugin /** @@ -12,5 +12,5 @@ import inr.numass.NumassPlugin Context context = new Context("numass"); context.loadPlugin(new NumassPlugin()); -context.setValue(IOManager.ROOT_DIRECTORY_CONTEXT_KEY, "D:\\Work\\Numass\\sterile2016"); +context.setValue(OutputManager.ROOT_DIRECTORY_CONTEXT_KEY, "D:\\Work\\Numass\\sterile2016"); ActionUtils.runConfig(context, "test.xml").computeAll() \ No newline at end of file diff --git a/numass-main/src/main/java/inr/numass/Main.java b/numass-main/src/main/java/inr/numass/Main.java index de556851..591e44a7 100644 --- a/numass-main/src/main/java/inr/numass/Main.java +++ b/numass-main/src/main/java/inr/numass/Main.java @@ -18,7 +18,6 @@ package inr.numass; import hep.dataforge.actions.ActionUtils; import hep.dataforge.context.Context; import hep.dataforge.context.Global; -import hep.dataforge.io.IOManager; import hep.dataforge.io.MetaFileReader; import hep.dataforge.meta.Meta; import org.apache.commons.cli.*; @@ -86,7 +85,7 @@ public class Main { return; } - java.nio.file.Path configFile = context.getIo().getRootDir().resolve(cfgPath); + java.nio.file.Path configFile = context.getRootDir().resolve(cfgPath); if (!Files.exists(configFile)) { throw new FileNotFoundException("Configuration file not found"); @@ -94,7 +93,7 @@ public class Main { Meta config = MetaFileReader.Companion.read(configFile); - context.setValue(IOManager.ROOT_DIRECTORY_CONTEXT_KEY, configFile.getParent().toString()); + context.setValue(Context.ROOT_DIRECTORY_CONTEXT_KEY, configFile.getParent().toString()); applyCLItoContext(line, context); @@ -103,11 +102,11 @@ public class Main { } public static void applyCLItoContext(CommandLine line, Context context) throws FileNotFoundException { - File workDir = new File(context.getString(IOManager.ROOT_DIRECTORY_CONTEXT_KEY)); + File workDir = new File(context.getString(Context.ROOT_DIRECTORY_CONTEXT_KEY)); if (line.hasOption("h")) { workDir = new File(line.getOptionValue("h")); - context.setValue(IOManager.ROOT_DIRECTORY_CONTEXT_KEY, workDir.toString()); + context.setValue(Context.ROOT_DIRECTORY_CONTEXT_KEY, workDir.toString()); } if (line.hasOption("d")) { @@ -117,7 +116,7 @@ public class Main { dataDir = new File(workDir, dataPath); } if (dataDir.exists() && dataDir.isDirectory()) { - context.setValue(IOManager.DATA_DIRECTORY_CONTEXT_KEY, dataDir.getAbsolutePath()); + context.setValue(Context.DATA_DIRECTORY_CONTEXT_KEY, dataDir.getAbsolutePath()); } else { throw new FileNotFoundException("Data directory not found"); } @@ -132,7 +131,7 @@ public class Main { if (!outDir.exists()) { outDir.mkdirs(); } - context.setValue(IOManager.WORK_DIRECTORY_CONTEXT_KEY, outDir.toString()); + context.setValue(Context.WORK_DIRECTORY_CONTEXT_KEY, outDir.toString()); } } diff --git a/numass-main/src/main/java/inr/numass/actions/MonitorCorrectAction.java b/numass-main/src/main/java/inr/numass/actions/MonitorCorrectAction.java index a085b635..2bb75fef 100644 --- a/numass-main/src/main/java/inr/numass/actions/MonitorCorrectAction.java +++ b/numass-main/src/main/java/inr/numass/actions/MonitorCorrectAction.java @@ -129,7 +129,7 @@ public class MonitorCorrectAction extends OneToOneAction { // } Table res = ListTable.infer(dataList); - context.getIo().output(name, getName()).render(NumassUtils.INSTANCE.wrap(res, meta), Meta.empty()); + context.getOutput().get(name, getName()).render(NumassUtils.INSTANCE.wrap(res, meta), Meta.empty()); return res; } @@ -194,7 +194,7 @@ public class MonitorCorrectAction extends OneToOneAction { String monitorFileName = meta.getString("monitorFile", "monitor"); ListTable data = ListTable.infer(monitorPoints); - context.getIo().output(monitorFileName, getName()).render(NumassUtils.INSTANCE.wrap(data, meta), Meta.empty()); + context.getOutput().get(monitorFileName, getName()).render(NumassUtils.INSTANCE.wrap(data, meta), Meta.empty()); // ColumnedDataWriter.writeTable(stream, TableTransform.sort(data, "Timestamp", true), "Monitor points", monitorNames); } } diff --git a/numass-main/src/main/java/inr/numass/actions/SubstractSpectrumAction.java b/numass-main/src/main/java/inr/numass/actions/SubstractSpectrumAction.java index 8b84bc2c..380858df 100644 --- a/numass-main/src/main/java/inr/numass/actions/SubstractSpectrumAction.java +++ b/numass-main/src/main/java/inr/numass/actions/SubstractSpectrumAction.java @@ -31,7 +31,7 @@ public class SubstractSpectrumAction extends OneToOneAction { protected Table execute(Context context, String name, Table input, Laminate inputMeta) { try { String referencePath = inputMeta. getString("file", "empty.dat"); - Path referenceFile = context.getIo().getRootDir().resolve(referencePath); + Path referenceFile = context.getRootDir().resolve(referencePath); Table referenceTable = new ColumnedDataReader(referenceFile).toTable(); ListTable.Builder builder = new ListTable.Builder(input.getFormat()); input.getRows().forEach(point -> { @@ -49,7 +49,7 @@ public class SubstractSpectrumAction extends OneToOneAction { Table res = builder.build(); - context.getIo().output(name, getName()).render(NumassUtils.INSTANCE.wrap(res, inputMeta), Meta.empty()); + context.getOutput().get(name, getName()).render(NumassUtils.INSTANCE.wrap(res, inputMeta), Meta.empty()); return res; } catch (IOException ex) { throw new RuntimeException("Could not read reference file", ex); diff --git a/numass-main/src/main/java/inr/numass/models/TransmissionInterpolator.java b/numass-main/src/main/java/inr/numass/models/TransmissionInterpolator.java index 7b4c108d..205894ea 100644 --- a/numass-main/src/main/java/inr/numass/models/TransmissionInterpolator.java +++ b/numass-main/src/main/java/inr/numass/models/TransmissionInterpolator.java @@ -40,7 +40,7 @@ public class TransmissionInterpolator implements UnivariateFunction { public static TransmissionInterpolator fromFile(Context context, String path, String xName, String yName, int nSmooth, double w, double border) { try { - Path dataFile = context.getIo().getRootDir().resolve(path); + Path dataFile = context.getRootDir().resolve(path); ColumnedDataReader reader = new ColumnedDataReader(Files.newInputStream(dataFile)); return new TransmissionInterpolator(reader, xName, yName, nSmooth, w, border); } catch (IOException ex) { diff --git a/numass-main/src/main/java/inr/numass/utils/OldDataReader.java b/numass-main/src/main/java/inr/numass/utils/OldDataReader.java index 28a893f3..a023bafa 100644 --- a/numass-main/src/main/java/inr/numass/utils/OldDataReader.java +++ b/numass-main/src/main/java/inr/numass/utils/OldDataReader.java @@ -40,7 +40,7 @@ public class OldDataReader { public static Table readConfig(String path) throws IOException { String[] list = {"X", "time", "ushift"}; ListTable.Builder res = new ListTable.Builder(list); - Path file = Global.INSTANCE.getIo().getRootDir().resolve(path); + Path file = Global.INSTANCE.getRootDir().resolve(path); Scanner sc = new Scanner(file); sc.nextLine(); @@ -62,7 +62,7 @@ public class OldDataReader { public static Table readData(String path, double Elow) { SpectrumAdapter factory = new SpectrumAdapter(Meta.empty()); ListTable.Builder res = new ListTable.Builder(Adapters.getFormat(factory)); - Path file = Global.INSTANCE.getIo().getRootDir().resolve(path); + Path file = Global.INSTANCE.getRootDir().resolve(path); double x; int count; int time; @@ -114,7 +114,7 @@ public class OldDataReader { public static Table readDataAsGun(String path, double Elow) { SpectrumAdapter factory = new SpectrumAdapter(Meta.empty()); ListTable.Builder res = new ListTable.Builder(Adapters.getFormat(factory)); - Path file = Global.INSTANCE.getIo().getRootDir().resolve(path); + Path file = Global.INSTANCE.getRootDir().resolve(path); double x; long count; int time; @@ -147,7 +147,7 @@ public class OldDataReader { public static Table readSpectrumData(String path) { SpectrumAdapter factory = new SpectrumAdapter(Meta.empty()); ListTable.Builder res = new ListTable.Builder(Adapters.getFormat(factory)); - Path file = Global.INSTANCE.getIo().getRootDir().resolve(path); + Path file = Global.INSTANCE.getRootDir().resolve(path); double x; double count; double time; diff --git a/numass-main/src/main/kotlin/inr/numass/NumassPlugin.kt b/numass-main/src/main/kotlin/inr/numass/NumassPlugin.kt index 8e209bc9..837939b0 100644 --- a/numass-main/src/main/kotlin/inr/numass/NumassPlugin.kt +++ b/numass-main/src/main/kotlin/inr/numass/NumassPlugin.kt @@ -43,7 +43,7 @@ import org.apache.commons.math3.util.FastMath @PluginDef( group = "inr.numass", name = "numass", - dependsOn = arrayOf("hep.dataforge:functions", "hep.dataforge:MINUIT", "hep.dataforge:actions", "hep.dataforge:io.dir"), + dependsOn = arrayOf("hep.dataforge:functions", "hep.dataforge:MINUIT", "hep.dataforge:actions", "hep.dataforge:output.dir"), support = false, info = "Numass data analysis tools" ) diff --git a/numass-main/src/main/kotlin/inr/numass/NumassUtils.kt b/numass-main/src/main/kotlin/inr/numass/NumassUtils.kt index a87a5c6f..ae44231d 100644 --- a/numass-main/src/main/kotlin/inr/numass/NumassUtils.kt +++ b/numass-main/src/main/kotlin/inr/numass/NumassUtils.kt @@ -177,8 +177,8 @@ object NumassUtils { fun getFSS(context: Context, meta: Meta): FSS? { return if (meta.getBoolean("useFSS", true)) { val fssBinary: Binary? = meta.optString("fssFile") - .map { fssFile -> context.io.getFile(fssFile).binary } - .orElse(context.io.optResource("data/FS.txt").nullable) + .map { fssFile -> context.getFile(fssFile).binary } + .orElse(context.optResource("data/FS.txt").nullable) fssBinary?.let { FSS(it.stream) } ?: throw RuntimeException("Could not load FSS file") } else { null diff --git a/numass-main/src/main/kotlin/inr/numass/actions/MergeDataAction.kt b/numass-main/src/main/kotlin/inr/numass/actions/MergeDataAction.kt index 0d06c85e..71740d86 100644 --- a/numass-main/src/main/kotlin/inr/numass/actions/MergeDataAction.kt +++ b/numass-main/src/main/kotlin/inr/numass/actions/MergeDataAction.kt @@ -60,7 +60,7 @@ class MergeDataAction : ManyToOneAction() { } override fun afterGroup(context: Context, groupName: String, outputMeta: Meta, output: Table) { - context.io.output(groupName, name).render(NumassUtils.wrap(output, outputMeta)) + context.output.get(groupName, name).render(NumassUtils.wrap(output, outputMeta)) super.afterGroup(context, groupName, outputMeta, output) } diff --git a/numass-main/src/main/kotlin/inr/numass/actions/SummaryAction.kt b/numass-main/src/main/kotlin/inr/numass/actions/SummaryAction.kt index 5bd45e1f..2e6c26c2 100644 --- a/numass-main/src/main/kotlin/inr/numass/actions/SummaryAction.kt +++ b/numass-main/src/main/kotlin/inr/numass/actions/SummaryAction.kt @@ -108,7 +108,7 @@ object SummaryAction : ManyToOneAction() { } override fun afterGroup(context: Context, groupName: String, outputMeta: Meta, output: Table) { - context.io.output(groupName, name).render(NumassUtils.wrap(output, outputMeta)) + context.output.get(groupName, name).render(NumassUtils.wrap(output, outputMeta)) super.afterGroup(context, groupName, outputMeta, output) } diff --git a/numass-main/src/main/kotlin/inr/numass/actions/TransformDataAction.kt b/numass-main/src/main/kotlin/inr/numass/actions/TransformDataAction.kt index 9cfc634c..e8c1d679 100644 --- a/numass-main/src/main/kotlin/inr/numass/actions/TransformDataAction.kt +++ b/numass-main/src/main/kotlin/inr/numass/actions/TransformDataAction.kt @@ -103,7 +103,7 @@ class TransformDataAction : OneToOneAction() { val res = table.addColumn(ListColumn.build(table.getColumn(COUNT_RATE_KEY).format, cr.stream())) .addColumn(ListColumn.build(table.getColumn(COUNT_RATE_ERROR_KEY).format, crErr.stream())) - context.io.output(name, name).render(NumassUtils.wrap(res, meta)) + context.output.get(name, name).render(NumassUtils.wrap(res, meta)) return res } diff --git a/numass-main/src/main/kotlin/inr/numass/data/Generator.kt b/numass-main/src/main/kotlin/inr/numass/data/Generator.kt index f5a53edd..4f3c8b78 100644 --- a/numass-main/src/main/kotlin/inr/numass/data/Generator.kt +++ b/numass-main/src/main/kotlin/inr/numass/data/Generator.kt @@ -7,8 +7,9 @@ import hep.dataforge.stat.defaultGenerator import hep.dataforge.tables.Table import inr.numass.data.analyzers.NumassAnalyzer.Companion.CHANNEL_KEY import inr.numass.data.analyzers.NumassAnalyzer.Companion.COUNT_RATE_KEY -import inr.numass.data.api.* -import kotlinx.coroutines.experimental.channels.map +import inr.numass.data.api.NumassBlock +import inr.numass.data.api.OrphanNumassEvent +import inr.numass.data.api.SimpleBlock import kotlinx.coroutines.experimental.channels.takeWhile import kotlinx.coroutines.experimental.channels.toList import org.apache.commons.math3.distribution.EnumeratedRealDistribution @@ -24,9 +25,9 @@ private fun RandomGenerator.nextDeltaTime(cr: Double): Long { return (nextExp(1.0 / cr) * 1e9).toLong() } -fun generateBlock(start: Instant, length: Long, chain: Chain): NumassBlock { - return SimpleBlock(start, Duration.ofNanos(length)) { parent -> - chain.channel.map { it.adopt(parent) }.takeWhile { it.timeOffset < length }.toList() +suspend fun Chain.generateBlock(start: Instant, length: Long): NumassBlock { + return SimpleBlock.produce(start, Duration.ofNanos(length)) { + channel.takeWhile { it.timeOffset < length }.toList() } } diff --git a/numass-main/src/main/kotlin/inr/numass/data/PileUpSimulator.kt b/numass-main/src/main/kotlin/inr/numass/data/PileUpSimulator.kt index ef01a0ab..bd9ee8c8 100644 --- a/numass-main/src/main/kotlin/inr/numass/data/PileUpSimulator.kt +++ b/numass-main/src/main/kotlin/inr/numass/data/PileUpSimulator.kt @@ -25,7 +25,6 @@ import hep.dataforge.maths.chain.Chain import inr.numass.data.api.NumassBlock import inr.numass.data.api.OrphanNumassEvent import inr.numass.data.api.SimpleBlock -import inr.numass.data.api.adopt import kotlinx.coroutines.experimental.runBlocking import org.apache.commons.math3.random.RandomGenerator import java.lang.Math.max @@ -67,15 +66,15 @@ class PileUpSimulator { // } fun generated(): NumassBlock { - return SimpleBlock(Instant.EPOCH, Duration.ofNanos(pointLength)) { parent -> generated.map { it.adopt(parent) } } + return SimpleBlock(Instant.EPOCH, Duration.ofNanos(pointLength), generated) } fun registered(): NumassBlock { - return SimpleBlock(Instant.EPOCH, Duration.ofNanos(pointLength)) { parent -> registered.map { it.adopt(parent) } } + return SimpleBlock(Instant.EPOCH, Duration.ofNanos(pointLength), registered) } fun pileup(): NumassBlock { - return SimpleBlock(Instant.EPOCH, Duration.ofNanos(pointLength)) { parent -> pileup.map { it.adopt(parent) } } + return SimpleBlock(Instant.EPOCH, Duration.ofNanos(pointLength), pileup) } /** @@ -126,7 +125,7 @@ class PileUpSimulator { fun generate() { var next: OrphanNumassEvent //var lastRegisteredTime = 0.0 // Time of DAQ closing - val last = AtomicReference(OrphanNumassEvent(0,0)) + val last = AtomicReference(OrphanNumassEvent(0, 0)) //flag that shows that previous event was pileup var pileupFlag = false diff --git a/numass-main/src/main/kotlin/inr/numass/scripts/Bunches.kt b/numass-main/src/main/kotlin/inr/numass/scripts/Bunches.kt index 1ccedf95..2731159e 100644 --- a/numass-main/src/main/kotlin/inr/numass/scripts/Bunches.kt +++ b/numass-main/src/main/kotlin/inr/numass/scripts/Bunches.kt @@ -8,6 +8,9 @@ import inr.numass.data.buildBunchChain import inr.numass.data.generateBlock import inr.numass.data.generateEvents import inr.numass.data.mergeEventChains +import kotlinx.coroutines.experimental.channels.produce +import kotlinx.coroutines.experimental.channels.toList +import kotlinx.coroutines.experimental.runBlocking import java.time.Instant fun main(args: Array) { @@ -18,14 +21,22 @@ fun main(args: Array) { val length = 1e12.toLong() val num = 60; - val blocks = (1..num).map { - val regularChain = generateEvents(cr) - val bunchChain = buildBunchChain(40.0, 0.01, 5.0) + val start = Instant.now() - val generator = mergeEventChains(regularChain, bunchChain) - generateBlock(Instant.now().plusNanos(it * length), length, generator) + val blockchannel = produce { + (1..num).forEach { + val regularChain = generateEvents(cr) + val bunchChain = buildBunchChain(40.0, 0.01, 5.0) + + send(mergeEventChains(regularChain, bunchChain).generateBlock(start.plusNanos(it * length), length)) + } } + val blocks = runBlocking { + blockchannel.toList() + } + + val point = SimpleNumassPoint(blocks, 10000.0) val meta = buildMeta { @@ -37,7 +48,7 @@ fun main(args: Array) { println("actual count rate: ${point.events.count().toDouble() / point.length.seconds}") - TimeAnalyzerAction().simpleRun(point,meta) + TimeAnalyzerAction().simpleRun(point, meta) // val res = SmartAnalyzer().analyze(point, meta) // .getDouble(NumassAnalyzer.COUNT_RATE_KEY) diff --git a/numass-main/src/main/kotlin/inr/numass/scripts/timeanalysis/TestAnalyzer.kt b/numass-main/src/main/kotlin/inr/numass/scripts/timeanalysis/TestAnalyzer.kt index 0c9d75b2..308f6659 100644 --- a/numass-main/src/main/kotlin/inr/numass/scripts/timeanalysis/TestAnalyzer.kt +++ b/numass-main/src/main/kotlin/inr/numass/scripts/timeanalysis/TestAnalyzer.kt @@ -8,6 +8,9 @@ import inr.numass.actions.TimeAnalyzerAction import inr.numass.data.api.OrphanNumassEvent import inr.numass.data.api.SimpleNumassPoint import inr.numass.data.generateBlock +import kotlinx.coroutines.experimental.channels.produce +import kotlinx.coroutines.experimental.channels.toList +import kotlinx.coroutines.experimental.runBlocking import org.apache.commons.math3.random.JDKRandomGenerator import org.apache.commons.math3.random.RandomGenerator import java.time.Instant @@ -18,7 +21,7 @@ fun main(args: Array) { val cr = 30e3 val length = 30e9.toLong() - val num = 20 + val num = 4 val dt = 6.5 val rnd = JDKRandomGenerator() @@ -32,16 +35,26 @@ fun main(args: Array) { } - val chain = MarkovChain(OrphanNumassEvent(1000, 0)) { event -> - //val deltaT = rnd.nextDeltaTime(cr * exp(- event.timeOffset / 1e11)) - val deltaT = rnd.nextDeltaTime(cr) - OrphanNumassEvent(1000, event.timeOffset + deltaT) + val start = Instant.now() + + //TODO make parallel + val blockChannel = produce { + (1..num).forEach { + send( + MarkovChain(OrphanNumassEvent(1000, 0)) { event -> + //val deltaT = rnd.nextDeltaTime(cr * exp(- event.timeOffset / 1e11)) + val deltaT = rnd.nextDeltaTime(cr) + OrphanNumassEvent(1000, event.timeOffset + deltaT) + }.generateBlock(start.plusNanos(it * length), length) + ) + } } - val blocks = (1..num).map { - generateBlock(Instant.now().plusNanos(it * length), length, chain) + val blocks = runBlocking { + blockChannel.toList() } + val point = SimpleNumassPoint(blocks, 12000.0) val meta = buildMeta { diff --git a/numass-main/src/main/kotlin/inr/numass/scripts/tristan/AnalyzeTristan.kt b/numass-main/src/main/kotlin/inr/numass/scripts/tristan/AnalyzeTristan.kt index bdb22354..6fc1364c 100644 --- a/numass-main/src/main/kotlin/inr/numass/scripts/tristan/AnalyzeTristan.kt +++ b/numass-main/src/main/kotlin/inr/numass/scripts/tristan/AnalyzeTristan.kt @@ -4,6 +4,7 @@ import inr.numass.data.channel import inr.numass.data.plotAmplitudeSpectrum import inr.numass.data.storage.ProtoNumassPoint import inr.numass.data.transformChain +import kotlinx.coroutines.experimental.runBlocking import java.io.File fun main(args: Array) { @@ -26,36 +27,39 @@ fun main(args: Array) { println("Number of events for pixel 4 is ${it.events.count()}") } - listOf(0, 20, 50, 100, 200).forEach { window -> + runBlocking { + listOf(0, 20, 50, 100, 200).forEach { window -> - point.transformChain { first, second -> - val dt = second.timeOffset - first.timeOffset - if (second.channel == 4 && first.channel == 0 && dt > window && dt < 1000) { - Pair((first.amplitude + second.amplitude).toShort(), second.timeOffset) - } else { - null + point.transformChain { first, second -> + val dt = second.timeOffset - first.timeOffset + if (second.channel == 4 && first.channel == 0 && dt > window && dt < 1000) { + Pair((first.amplitude + second.amplitude).toShort(), second.timeOffset) + } else { + null + } + }.also { + println("Number of events for $window is ${it.events.count()}") + }.plotAmplitudeSpectrum(plotName = "filtered.before.$window") { + "binning" to 50 } - }.also { - println("Number of events for $window is ${it.events.count()}") - }.plotAmplitudeSpectrum(plotName = "filtered.before.$window") { - "binning" to 50 + } - } + listOf(0, 20, 50, 100, 200).forEach { window -> - listOf(0, 20, 50, 100, 200).forEach { window -> - - point.transformChain { first, second -> - val dt = second.timeOffset - first.timeOffset - if (second.channel == 0 && first.channel == 4 && dt > window && dt < 1000) { - Pair((first.amplitude + second.amplitude).toShort(), second.timeOffset) - } else { - null + point.transformChain { first, second -> + val dt = second.timeOffset - first.timeOffset + if (second.channel == 0 && first.channel == 4 && dt > window && dt < 1000) { + Pair((first.amplitude + second.amplitude).toShort(), second.timeOffset) + } else { + null + } + }.also { + println("Number of events for $window is ${it.events.count()}") + }.plotAmplitudeSpectrum(plotName = "filtered.after.$window") { + "binning" to 50 } - }.also { - println("Number of events for $window is ${it.events.count()}") - }.plotAmplitudeSpectrum(plotName = "filtered.after.$window") { - "binning" to 50 + } } diff --git a/numass-main/src/main/kotlin/inr/numass/tasks/NumassFitScanSummaryTask.kt b/numass-main/src/main/kotlin/inr/numass/tasks/NumassFitScanSummaryTask.kt index 7eda317e..98e8d881 100644 --- a/numass-main/src/main/kotlin/inr/numass/tasks/NumassFitScanSummaryTask.kt +++ b/numass-main/src/main/kotlin/inr/numass/tasks/NumassFitScanSummaryTask.kt @@ -68,7 +68,7 @@ object NumassFitScanSummaryTask : AbstractTask() { pars.getValue("trap")) } val res = TableTransform.sort(builder.build(), "m", true) - context.io.output(nodeName, stage = name).render(NumassUtils.wrap(res, meta)) + context.output.get(nodeName, stage = name).render(NumassUtils.wrap(res, meta)) return res } diff --git a/numass-main/src/main/kotlin/inr/numass/tasks/NumassTasks.kt b/numass-main/src/main/kotlin/inr/numass/tasks/NumassTasks.kt index 528074b9..f4d03762 100644 --- a/numass-main/src/main/kotlin/inr/numass/tasks/NumassTasks.kt +++ b/numass-main/src/main/kotlin/inr/numass/tasks/NumassTasks.kt @@ -60,7 +60,7 @@ val analyzeTask = task("analyze") { pipe { set -> SmartAnalyzer().analyzeSet(set, meta).also { res -> val outputMeta = meta.builder.putNode("data", set.meta) - context.io.output(name, stage = "numass.analyze").render(NumassUtils.wrap(res, outputMeta)) + context.output.get(name, stage = "numass.analyze").render(NumassUtils.wrap(res, outputMeta)) } } } @@ -105,13 +105,13 @@ val monitorTableTask = task("monitor") { //add set markers addSetMarkers(frame, data.values) } - context.io.output(name, stage = "numass.monitor", type = "dfp").render(PlotFrame.Wrapper().wrap(frame)) + context.output.get(name, stage = "numass.monitor", type = "dfp").render(PlotFrame.Wrapper().wrap(frame)) } } } - context.io.output(name, stage = "numass.monitor").render(NumassUtils.wrap(res, meta)) + context.output.get(name, stage = "numass.monitor").render(NumassUtils.wrap(res, meta)) return@join res; } @@ -170,7 +170,7 @@ val subtractEmptyTask = task("dif") { res.goal.onComplete { r, _ -> if (r != null) { - context.io.output(input.name + "_subtract", stage = "numass.merge").render(NumassUtils.wrap(r, resMeta)) + context.output.get(input.name + "_subtract", stage = "numass.merge").render(NumassUtils.wrap(r, resMeta)) } } @@ -221,7 +221,7 @@ val fitTask = task("fit") { configure(meta.getMeta("fit")) } pipe { data -> - context.io.stream(name, "numass.fit").use { out -> + context.output.stream(name, "numass.fit").use { out -> val writer = PrintWriter(out) writer.printf("%n*** META ***%n") writer.println(meta.toString()) diff --git a/numass-server/src/main/java/inr/numass/server/ServerRunner.java b/numass-server/src/main/java/inr/numass/server/ServerRunner.java index 19a88a97..ae9acb4e 100644 --- a/numass-server/src/main/java/inr/numass/server/ServerRunner.java +++ b/numass-server/src/main/java/inr/numass/server/ServerRunner.java @@ -29,7 +29,7 @@ public class ServerRunner extends SimpleConfigurable implements AutoCloseable { public ServerRunner() throws IOException, ParseException { // Global.instance().getPluginManager().load(StorageManager.class); - Path configFile = context.getIo().getFile(SERVER_CONFIG_PATH).getPath(); + Path configFile = context.getOutput().getFile(SERVER_CONFIG_PATH).getPath(); if (Files.exists(configFile)) { context.getLogger().info("Trying to read server configuration from {}", SERVER_CONFIG_PATH); configure(MetaFileReader.Companion.read(configFile));