Separated Input and Output management. Input moved to context
This commit is contained in:
parent
caf875da84
commit
11f130d8f5
@ -57,7 +57,7 @@ fun connectStorage(device: Device, config: Meta) {
|
||||
}
|
||||
|
||||
fun readResourceMeta(path: String): Meta {
|
||||
val resource = Global.io.optResource(path).nullable
|
||||
val resource = Global.output.optResource(path).nullable
|
||||
if (resource != null) {
|
||||
return XMLMetaReader().read(resource.stream)
|
||||
} else {
|
||||
|
@ -86,34 +86,34 @@ val NumassBlock.channel: Int
|
||||
}
|
||||
|
||||
|
||||
fun NumassBlock.transformChain(transform: (NumassEvent, NumassEvent) -> Pair<Short, Long>?): NumassBlock {
|
||||
return SimpleBlock(this.startTime, this.length) { owner ->
|
||||
suspend fun NumassBlock.transformChain(transform: (NumassEvent, NumassEvent) -> Pair<Short, Long>?): NumassBlock {
|
||||
return SimpleBlock.produce(this.startTime, this.length) {
|
||||
this.events.asSequence()
|
||||
.sortedBy { it.timeOffset }
|
||||
.zipWithNext(transform)
|
||||
.filterNotNull()
|
||||
.map { NumassEvent(it.first, it.second, owner) }.asIterable()
|
||||
.map { OrphanNumassEvent(it.first, it.second) }.asIterable()
|
||||
}
|
||||
}
|
||||
|
||||
fun NumassBlock.filterChain(condition: (NumassEvent, NumassEvent) -> Boolean): NumassBlock {
|
||||
return SimpleBlock(this.startTime, this.length) { owner ->
|
||||
suspend fun NumassBlock.filterChain(condition: (NumassEvent, NumassEvent) -> Boolean): NumassBlock {
|
||||
return SimpleBlock.produce(this.startTime, this.length) {
|
||||
this.events.asSequence()
|
||||
.sortedBy { it.timeOffset }
|
||||
.zipWithNext().filter { condition.invoke(it.first, it.second) }.map { it.second }.asIterable()
|
||||
}
|
||||
}
|
||||
|
||||
fun NumassBlock.filter(condition: (NumassEvent) -> Boolean): NumassBlock {
|
||||
return SimpleBlock(this.startTime, this.length) { owner ->
|
||||
suspend fun NumassBlock.filter(condition: (NumassEvent) -> Boolean): NumassBlock {
|
||||
return SimpleBlock.produce(this.startTime, this.length) {
|
||||
this.events.asSequence().filter(condition).asIterable()
|
||||
}
|
||||
}
|
||||
|
||||
fun NumassBlock.transform(transform: (NumassEvent) -> OrphanNumassEvent): NumassBlock {
|
||||
return SimpleBlock(this.startTime, this.length) { owner ->
|
||||
suspend fun NumassBlock.transform(transform: (NumassEvent) -> OrphanNumassEvent): NumassBlock {
|
||||
return SimpleBlock.produce(this.startTime, this.length) {
|
||||
this.events.asSequence()
|
||||
.map { transform(it).adopt(owner) }
|
||||
.map { transform(it) }
|
||||
.asIterable()
|
||||
}
|
||||
}
|
@ -1,7 +1,6 @@
|
||||
package inr.numass.data.api
|
||||
|
||||
import inr.numass.data.channel
|
||||
import kotlinx.coroutines.experimental.runBlocking
|
||||
import java.io.Serializable
|
||||
import java.time.Duration
|
||||
import java.time.Instant
|
||||
@ -76,13 +75,20 @@ fun OrphanNumassEvent.adopt(parent: NumassBlock): NumassEvent {
|
||||
class SimpleBlock(
|
||||
override val startTime: Instant,
|
||||
override val length: Duration,
|
||||
producer: suspend (NumassBlock) -> Iterable<NumassEvent>) : NumassBlock, Serializable {
|
||||
rawEvents: Iterable<OrphanNumassEvent>
|
||||
) : NumassBlock, Serializable {
|
||||
|
||||
private val eventList = runBlocking { producer(this@SimpleBlock).toList() }
|
||||
private val eventList by lazy { rawEvents.map { it.adopt(this) } }
|
||||
|
||||
override val frames: Stream<NumassFrame> get() = Stream.empty()
|
||||
|
||||
override val events: Stream<NumassEvent>
|
||||
get() = eventList.stream()
|
||||
|
||||
companion object {
|
||||
suspend fun produce(startTime: Instant, length: Duration, producer: suspend () -> Iterable<OrphanNumassEvent>): SimpleBlock {
|
||||
return SimpleBlock(startTime, length, producer())
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -8,7 +8,7 @@ import hep.dataforge.meta.MetaHolder
|
||||
* A simple static implementation of NumassPoint
|
||||
* Created by darksnake on 08.07.2017.
|
||||
*/
|
||||
class SimpleNumassPoint(override val blocks: List<NumassBlock>, meta: Meta) : MetaHolder(meta), NumassPoint {
|
||||
class SimpleNumassPoint(override val blocks: List<NumassBlock>, meta: Meta, override val isSequential: Boolean = true) : MetaHolder(meta), NumassPoint {
|
||||
|
||||
/**
|
||||
* Input blocks must be sorted
|
||||
|
@ -162,9 +162,7 @@ constructor(override val name: String, private val path: Path, meta: Meta) : Num
|
||||
voltage / 10.0
|
||||
}
|
||||
|
||||
val block = SimpleBlock(absoluteTime.toInstant(ZoneOffset.UTC), Duration.ofSeconds(length.toLong())) { parent ->
|
||||
events.map { it.adopt(parent) }
|
||||
}
|
||||
val block = SimpleBlock(absoluteTime.toInstant(ZoneOffset.UTC), Duration.ofSeconds(length.toLong()), events)
|
||||
|
||||
val pointMeta = MetaBuilder("point")
|
||||
.setValue(HV_KEY, uSet)
|
||||
|
@ -51,7 +51,7 @@ class NumassStorageFactory : StorageType {
|
||||
return NumassStorage(context, meta, path)
|
||||
} else {
|
||||
context.logger.warn("A storage path not provided. Creating default root storage in the working directory")
|
||||
return NumassStorage(context, meta, context.io.workDir)
|
||||
return NumassStorage(context, meta, context.workDir)
|
||||
}
|
||||
}
|
||||
|
||||
@ -69,7 +69,7 @@ class NumassStorageFactory : StorageType {
|
||||
}
|
||||
|
||||
fun buildLocal(context: Context, path: String, readOnly: Boolean, monitor: Boolean): FileStorage {
|
||||
val file = context.io.dataDir.resolve(path)
|
||||
val file = context.dataDir.resolve(path)
|
||||
return buildLocal(context, file, readOnly, monitor)
|
||||
}
|
||||
|
||||
|
@ -70,7 +70,7 @@ class ProtoNumassPoint(override val meta: Meta, val protoBuilder: () -> NumassPr
|
||||
}
|
||||
|
||||
fun readFile(path: String, context: Context = Global): ProtoNumassPoint {
|
||||
return readFile(context.io.getFile(path).absolutePath)
|
||||
return readFile(context.getFile(path).absolutePath)
|
||||
}
|
||||
|
||||
fun ofEpochNanos(nanos: Long): Instant {
|
||||
|
@ -97,6 +97,6 @@ new GrindShell(ctx).eval {
|
||||
def res = fm.runStage(state, "MINUIT", FitStage.TASK_RUN, "N", "bkg", "E0", "U2");
|
||||
|
||||
|
||||
res.printState(ctx.io.out().newPrintWriter());
|
||||
res.printState(ctx.getOutput.out().newPrintWriter());
|
||||
NumassIOKt.display(res, ctx, "fit")
|
||||
}
|
@ -2,7 +2,7 @@ package inr.numass.scripts.workspace
|
||||
|
||||
import hep.dataforge.actions.ActionUtils
|
||||
import hep.dataforge.context.Context
|
||||
import hep.dataforge.io.IOManager
|
||||
import hep.dataforge.io.OutputManager
|
||||
import inr.numass.NumassPlugin
|
||||
|
||||
/**
|
||||
@ -12,5 +12,5 @@ import inr.numass.NumassPlugin
|
||||
|
||||
Context context = new Context("numass");
|
||||
context.loadPlugin(new NumassPlugin());
|
||||
context.setValue(IOManager.ROOT_DIRECTORY_CONTEXT_KEY, "D:\\Work\\Numass\\sterile2016");
|
||||
context.setValue(OutputManager.ROOT_DIRECTORY_CONTEXT_KEY, "D:\\Work\\Numass\\sterile2016");
|
||||
ActionUtils.runConfig(context, "test.xml").computeAll()
|
@ -18,7 +18,6 @@ package inr.numass;
|
||||
import hep.dataforge.actions.ActionUtils;
|
||||
import hep.dataforge.context.Context;
|
||||
import hep.dataforge.context.Global;
|
||||
import hep.dataforge.io.IOManager;
|
||||
import hep.dataforge.io.MetaFileReader;
|
||||
import hep.dataforge.meta.Meta;
|
||||
import org.apache.commons.cli.*;
|
||||
@ -86,7 +85,7 @@ public class Main {
|
||||
return;
|
||||
}
|
||||
|
||||
java.nio.file.Path configFile = context.getIo().getRootDir().resolve(cfgPath);
|
||||
java.nio.file.Path configFile = context.getRootDir().resolve(cfgPath);
|
||||
|
||||
if (!Files.exists(configFile)) {
|
||||
throw new FileNotFoundException("Configuration file not found");
|
||||
@ -94,7 +93,7 @@ public class Main {
|
||||
|
||||
Meta config = MetaFileReader.Companion.read(configFile);
|
||||
|
||||
context.setValue(IOManager.ROOT_DIRECTORY_CONTEXT_KEY, configFile.getParent().toString());
|
||||
context.setValue(Context.ROOT_DIRECTORY_CONTEXT_KEY, configFile.getParent().toString());
|
||||
|
||||
applyCLItoContext(line, context);
|
||||
|
||||
@ -103,11 +102,11 @@ public class Main {
|
||||
}
|
||||
|
||||
public static void applyCLItoContext(CommandLine line, Context context) throws FileNotFoundException {
|
||||
File workDir = new File(context.getString(IOManager.ROOT_DIRECTORY_CONTEXT_KEY));
|
||||
File workDir = new File(context.getString(Context.ROOT_DIRECTORY_CONTEXT_KEY));
|
||||
|
||||
if (line.hasOption("h")) {
|
||||
workDir = new File(line.getOptionValue("h"));
|
||||
context.setValue(IOManager.ROOT_DIRECTORY_CONTEXT_KEY, workDir.toString());
|
||||
context.setValue(Context.ROOT_DIRECTORY_CONTEXT_KEY, workDir.toString());
|
||||
}
|
||||
|
||||
if (line.hasOption("d")) {
|
||||
@ -117,7 +116,7 @@ public class Main {
|
||||
dataDir = new File(workDir, dataPath);
|
||||
}
|
||||
if (dataDir.exists() && dataDir.isDirectory()) {
|
||||
context.setValue(IOManager.DATA_DIRECTORY_CONTEXT_KEY, dataDir.getAbsolutePath());
|
||||
context.setValue(Context.DATA_DIRECTORY_CONTEXT_KEY, dataDir.getAbsolutePath());
|
||||
} else {
|
||||
throw new FileNotFoundException("Data directory not found");
|
||||
}
|
||||
@ -132,7 +131,7 @@ public class Main {
|
||||
if (!outDir.exists()) {
|
||||
outDir.mkdirs();
|
||||
}
|
||||
context.setValue(IOManager.WORK_DIRECTORY_CONTEXT_KEY, outDir.toString());
|
||||
context.setValue(Context.WORK_DIRECTORY_CONTEXT_KEY, outDir.toString());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -129,7 +129,7 @@ public class MonitorCorrectAction extends OneToOneAction<Table, Table> {
|
||||
// }
|
||||
Table res = ListTable.infer(dataList);
|
||||
|
||||
context.getIo().output(name, getName()).render(NumassUtils.INSTANCE.wrap(res, meta), Meta.empty());
|
||||
context.getOutput().get(name, getName()).render(NumassUtils.INSTANCE.wrap(res, meta), Meta.empty());
|
||||
|
||||
return res;
|
||||
}
|
||||
@ -194,7 +194,7 @@ public class MonitorCorrectAction extends OneToOneAction<Table, Table> {
|
||||
String monitorFileName = meta.getString("monitorFile", "monitor");
|
||||
ListTable data = ListTable.infer(monitorPoints);
|
||||
|
||||
context.getIo().output(monitorFileName, getName()).render(NumassUtils.INSTANCE.wrap(data, meta), Meta.empty());
|
||||
context.getOutput().get(monitorFileName, getName()).render(NumassUtils.INSTANCE.wrap(data, meta), Meta.empty());
|
||||
// ColumnedDataWriter.writeTable(stream, TableTransform.sort(data, "Timestamp", true), "Monitor points", monitorNames);
|
||||
}
|
||||
}
|
||||
|
@ -31,7 +31,7 @@ public class SubstractSpectrumAction extends OneToOneAction<Table, Table> {
|
||||
protected Table execute(Context context, String name, Table input, Laminate inputMeta) {
|
||||
try {
|
||||
String referencePath = inputMeta. getString("file", "empty.dat");
|
||||
Path referenceFile = context.getIo().getRootDir().resolve(referencePath);
|
||||
Path referenceFile = context.getRootDir().resolve(referencePath);
|
||||
Table referenceTable = new ColumnedDataReader(referenceFile).toTable();
|
||||
ListTable.Builder builder = new ListTable.Builder(input.getFormat());
|
||||
input.getRows().forEach(point -> {
|
||||
@ -49,7 +49,7 @@ public class SubstractSpectrumAction extends OneToOneAction<Table, Table> {
|
||||
|
||||
Table res = builder.build();
|
||||
|
||||
context.getIo().output(name, getName()).render(NumassUtils.INSTANCE.wrap(res, inputMeta), Meta.empty());
|
||||
context.getOutput().get(name, getName()).render(NumassUtils.INSTANCE.wrap(res, inputMeta), Meta.empty());
|
||||
return res;
|
||||
} catch (IOException ex) {
|
||||
throw new RuntimeException("Could not read reference file", ex);
|
||||
|
@ -40,7 +40,7 @@ public class TransmissionInterpolator implements UnivariateFunction {
|
||||
|
||||
public static TransmissionInterpolator fromFile(Context context, String path, String xName, String yName, int nSmooth, double w, double border) {
|
||||
try {
|
||||
Path dataFile = context.getIo().getRootDir().resolve(path);
|
||||
Path dataFile = context.getRootDir().resolve(path);
|
||||
ColumnedDataReader reader = new ColumnedDataReader(Files.newInputStream(dataFile));
|
||||
return new TransmissionInterpolator(reader, xName, yName, nSmooth, w, border);
|
||||
} catch (IOException ex) {
|
||||
|
@ -40,7 +40,7 @@ public class OldDataReader {
|
||||
public static Table readConfig(String path) throws IOException {
|
||||
String[] list = {"X", "time", "ushift"};
|
||||
ListTable.Builder res = new ListTable.Builder(list);
|
||||
Path file = Global.INSTANCE.getIo().getRootDir().resolve(path);
|
||||
Path file = Global.INSTANCE.getRootDir().resolve(path);
|
||||
Scanner sc = new Scanner(file);
|
||||
sc.nextLine();
|
||||
|
||||
@ -62,7 +62,7 @@ public class OldDataReader {
|
||||
public static Table readData(String path, double Elow) {
|
||||
SpectrumAdapter factory = new SpectrumAdapter(Meta.empty());
|
||||
ListTable.Builder res = new ListTable.Builder(Adapters.getFormat(factory));
|
||||
Path file = Global.INSTANCE.getIo().getRootDir().resolve(path);
|
||||
Path file = Global.INSTANCE.getRootDir().resolve(path);
|
||||
double x;
|
||||
int count;
|
||||
int time;
|
||||
@ -114,7 +114,7 @@ public class OldDataReader {
|
||||
public static Table readDataAsGun(String path, double Elow) {
|
||||
SpectrumAdapter factory = new SpectrumAdapter(Meta.empty());
|
||||
ListTable.Builder res = new ListTable.Builder(Adapters.getFormat(factory));
|
||||
Path file = Global.INSTANCE.getIo().getRootDir().resolve(path);
|
||||
Path file = Global.INSTANCE.getRootDir().resolve(path);
|
||||
double x;
|
||||
long count;
|
||||
int time;
|
||||
@ -147,7 +147,7 @@ public class OldDataReader {
|
||||
public static Table readSpectrumData(String path) {
|
||||
SpectrumAdapter factory = new SpectrumAdapter(Meta.empty());
|
||||
ListTable.Builder res = new ListTable.Builder(Adapters.getFormat(factory));
|
||||
Path file = Global.INSTANCE.getIo().getRootDir().resolve(path);
|
||||
Path file = Global.INSTANCE.getRootDir().resolve(path);
|
||||
double x;
|
||||
double count;
|
||||
double time;
|
||||
|
@ -43,7 +43,7 @@ import org.apache.commons.math3.util.FastMath
|
||||
@PluginDef(
|
||||
group = "inr.numass",
|
||||
name = "numass",
|
||||
dependsOn = arrayOf("hep.dataforge:functions", "hep.dataforge:MINUIT", "hep.dataforge:actions", "hep.dataforge:io.dir"),
|
||||
dependsOn = arrayOf("hep.dataforge:functions", "hep.dataforge:MINUIT", "hep.dataforge:actions", "hep.dataforge:output.dir"),
|
||||
support = false,
|
||||
info = "Numass data analysis tools"
|
||||
)
|
||||
|
@ -177,8 +177,8 @@ object NumassUtils {
|
||||
fun getFSS(context: Context, meta: Meta): FSS? {
|
||||
return if (meta.getBoolean("useFSS", true)) {
|
||||
val fssBinary: Binary? = meta.optString("fssFile")
|
||||
.map { fssFile -> context.io.getFile(fssFile).binary }
|
||||
.orElse(context.io.optResource("data/FS.txt").nullable)
|
||||
.map { fssFile -> context.getFile(fssFile).binary }
|
||||
.orElse(context.optResource("data/FS.txt").nullable)
|
||||
fssBinary?.let { FSS(it.stream) } ?: throw RuntimeException("Could not load FSS file")
|
||||
} else {
|
||||
null
|
||||
|
@ -60,7 +60,7 @@ class MergeDataAction : ManyToOneAction<Table, Table>() {
|
||||
}
|
||||
|
||||
override fun afterGroup(context: Context, groupName: String, outputMeta: Meta, output: Table) {
|
||||
context.io.output(groupName, name).render(NumassUtils.wrap(output, outputMeta))
|
||||
context.output.get(groupName, name).render(NumassUtils.wrap(output, outputMeta))
|
||||
super.afterGroup(context, groupName, outputMeta, output)
|
||||
}
|
||||
|
||||
|
@ -108,7 +108,7 @@ object SummaryAction : ManyToOneAction<FitState, Table>() {
|
||||
}
|
||||
|
||||
override fun afterGroup(context: Context, groupName: String, outputMeta: Meta, output: Table) {
|
||||
context.io.output(groupName, name).render(NumassUtils.wrap(output, outputMeta))
|
||||
context.output.get(groupName, name).render(NumassUtils.wrap(output, outputMeta))
|
||||
super.afterGroup(context, groupName, outputMeta, output)
|
||||
}
|
||||
|
||||
|
@ -103,7 +103,7 @@ class TransformDataAction : OneToOneAction<Table, Table>() {
|
||||
val res = table.addColumn(ListColumn.build(table.getColumn(COUNT_RATE_KEY).format, cr.stream()))
|
||||
.addColumn(ListColumn.build(table.getColumn(COUNT_RATE_ERROR_KEY).format, crErr.stream()))
|
||||
|
||||
context.io.output(name, name).render(NumassUtils.wrap(res, meta))
|
||||
context.output.get(name, name).render(NumassUtils.wrap(res, meta))
|
||||
return res
|
||||
}
|
||||
|
||||
|
@ -7,8 +7,9 @@ import hep.dataforge.stat.defaultGenerator
|
||||
import hep.dataforge.tables.Table
|
||||
import inr.numass.data.analyzers.NumassAnalyzer.Companion.CHANNEL_KEY
|
||||
import inr.numass.data.analyzers.NumassAnalyzer.Companion.COUNT_RATE_KEY
|
||||
import inr.numass.data.api.*
|
||||
import kotlinx.coroutines.experimental.channels.map
|
||||
import inr.numass.data.api.NumassBlock
|
||||
import inr.numass.data.api.OrphanNumassEvent
|
||||
import inr.numass.data.api.SimpleBlock
|
||||
import kotlinx.coroutines.experimental.channels.takeWhile
|
||||
import kotlinx.coroutines.experimental.channels.toList
|
||||
import org.apache.commons.math3.distribution.EnumeratedRealDistribution
|
||||
@ -24,9 +25,9 @@ private fun RandomGenerator.nextDeltaTime(cr: Double): Long {
|
||||
return (nextExp(1.0 / cr) * 1e9).toLong()
|
||||
}
|
||||
|
||||
fun generateBlock(start: Instant, length: Long, chain: Chain<OrphanNumassEvent>): NumassBlock {
|
||||
return SimpleBlock(start, Duration.ofNanos(length)) { parent ->
|
||||
chain.channel.map { it.adopt(parent) }.takeWhile { it.timeOffset < length }.toList()
|
||||
suspend fun Chain<OrphanNumassEvent>.generateBlock(start: Instant, length: Long): NumassBlock {
|
||||
return SimpleBlock.produce(start, Duration.ofNanos(length)) {
|
||||
channel.takeWhile { it.timeOffset < length }.toList()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -25,7 +25,6 @@ import hep.dataforge.maths.chain.Chain
|
||||
import inr.numass.data.api.NumassBlock
|
||||
import inr.numass.data.api.OrphanNumassEvent
|
||||
import inr.numass.data.api.SimpleBlock
|
||||
import inr.numass.data.api.adopt
|
||||
import kotlinx.coroutines.experimental.runBlocking
|
||||
import org.apache.commons.math3.random.RandomGenerator
|
||||
import java.lang.Math.max
|
||||
@ -67,15 +66,15 @@ class PileUpSimulator {
|
||||
// }
|
||||
|
||||
fun generated(): NumassBlock {
|
||||
return SimpleBlock(Instant.EPOCH, Duration.ofNanos(pointLength)) { parent -> generated.map { it.adopt(parent) } }
|
||||
return SimpleBlock(Instant.EPOCH, Duration.ofNanos(pointLength), generated)
|
||||
}
|
||||
|
||||
fun registered(): NumassBlock {
|
||||
return SimpleBlock(Instant.EPOCH, Duration.ofNanos(pointLength)) { parent -> registered.map { it.adopt(parent) } }
|
||||
return SimpleBlock(Instant.EPOCH, Duration.ofNanos(pointLength), registered)
|
||||
}
|
||||
|
||||
fun pileup(): NumassBlock {
|
||||
return SimpleBlock(Instant.EPOCH, Duration.ofNanos(pointLength)) { parent -> pileup.map { it.adopt(parent) } }
|
||||
return SimpleBlock(Instant.EPOCH, Duration.ofNanos(pointLength), pileup)
|
||||
}
|
||||
|
||||
/**
|
||||
@ -126,7 +125,7 @@ class PileUpSimulator {
|
||||
fun generate() {
|
||||
var next: OrphanNumassEvent
|
||||
//var lastRegisteredTime = 0.0 // Time of DAQ closing
|
||||
val last = AtomicReference<OrphanNumassEvent>(OrphanNumassEvent(0,0))
|
||||
val last = AtomicReference<OrphanNumassEvent>(OrphanNumassEvent(0, 0))
|
||||
|
||||
//flag that shows that previous event was pileup
|
||||
var pileupFlag = false
|
||||
|
@ -8,6 +8,9 @@ import inr.numass.data.buildBunchChain
|
||||
import inr.numass.data.generateBlock
|
||||
import inr.numass.data.generateEvents
|
||||
import inr.numass.data.mergeEventChains
|
||||
import kotlinx.coroutines.experimental.channels.produce
|
||||
import kotlinx.coroutines.experimental.channels.toList
|
||||
import kotlinx.coroutines.experimental.runBlocking
|
||||
import java.time.Instant
|
||||
|
||||
fun main(args: Array<String>) {
|
||||
@ -18,13 +21,21 @@ fun main(args: Array<String>) {
|
||||
val length = 1e12.toLong()
|
||||
val num = 60;
|
||||
|
||||
val blocks = (1..num).map {
|
||||
val start = Instant.now()
|
||||
|
||||
val blockchannel = produce {
|
||||
(1..num).forEach {
|
||||
val regularChain = generateEvents(cr)
|
||||
val bunchChain = buildBunchChain(40.0, 0.01, 5.0)
|
||||
|
||||
val generator = mergeEventChains(regularChain, bunchChain)
|
||||
generateBlock(Instant.now().plusNanos(it * length), length, generator)
|
||||
send(mergeEventChains(regularChain, bunchChain).generateBlock(start.plusNanos(it * length), length))
|
||||
}
|
||||
}
|
||||
|
||||
val blocks = runBlocking {
|
||||
blockchannel.toList()
|
||||
}
|
||||
|
||||
|
||||
val point = SimpleNumassPoint(blocks, 10000.0)
|
||||
|
||||
@ -37,7 +48,7 @@ fun main(args: Array<String>) {
|
||||
|
||||
println("actual count rate: ${point.events.count().toDouble() / point.length.seconds}")
|
||||
|
||||
TimeAnalyzerAction().simpleRun(point,meta)
|
||||
TimeAnalyzerAction().simpleRun(point, meta)
|
||||
|
||||
// val res = SmartAnalyzer().analyze(point, meta)
|
||||
// .getDouble(NumassAnalyzer.COUNT_RATE_KEY)
|
||||
|
@ -8,6 +8,9 @@ import inr.numass.actions.TimeAnalyzerAction
|
||||
import inr.numass.data.api.OrphanNumassEvent
|
||||
import inr.numass.data.api.SimpleNumassPoint
|
||||
import inr.numass.data.generateBlock
|
||||
import kotlinx.coroutines.experimental.channels.produce
|
||||
import kotlinx.coroutines.experimental.channels.toList
|
||||
import kotlinx.coroutines.experimental.runBlocking
|
||||
import org.apache.commons.math3.random.JDKRandomGenerator
|
||||
import org.apache.commons.math3.random.RandomGenerator
|
||||
import java.time.Instant
|
||||
@ -18,7 +21,7 @@ fun main(args: Array<String>) {
|
||||
|
||||
val cr = 30e3
|
||||
val length = 30e9.toLong()
|
||||
val num = 20
|
||||
val num = 4
|
||||
val dt = 6.5
|
||||
|
||||
val rnd = JDKRandomGenerator()
|
||||
@ -32,16 +35,26 @@ fun main(args: Array<String>) {
|
||||
}
|
||||
|
||||
|
||||
val chain = MarkovChain(OrphanNumassEvent(1000, 0)) { event ->
|
||||
val start = Instant.now()
|
||||
|
||||
//TODO make parallel
|
||||
val blockChannel = produce {
|
||||
(1..num).forEach {
|
||||
send(
|
||||
MarkovChain(OrphanNumassEvent(1000, 0)) { event ->
|
||||
//val deltaT = rnd.nextDeltaTime(cr * exp(- event.timeOffset / 1e11))
|
||||
val deltaT = rnd.nextDeltaTime(cr)
|
||||
OrphanNumassEvent(1000, event.timeOffset + deltaT)
|
||||
}.generateBlock(start.plusNanos(it * length), length)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
val blocks = (1..num).map {
|
||||
generateBlock(Instant.now().plusNanos(it * length), length, chain)
|
||||
val blocks = runBlocking {
|
||||
blockChannel.toList()
|
||||
}
|
||||
|
||||
|
||||
val point = SimpleNumassPoint(blocks, 12000.0)
|
||||
|
||||
val meta = buildMeta {
|
||||
|
@ -4,6 +4,7 @@ import inr.numass.data.channel
|
||||
import inr.numass.data.plotAmplitudeSpectrum
|
||||
import inr.numass.data.storage.ProtoNumassPoint
|
||||
import inr.numass.data.transformChain
|
||||
import kotlinx.coroutines.experimental.runBlocking
|
||||
import java.io.File
|
||||
|
||||
fun main(args: Array<String>) {
|
||||
@ -26,6 +27,7 @@ fun main(args: Array<String>) {
|
||||
println("Number of events for pixel 4 is ${it.events.count()}")
|
||||
}
|
||||
|
||||
runBlocking {
|
||||
listOf(0, 20, 50, 100, 200).forEach { window ->
|
||||
|
||||
point.transformChain { first, second ->
|
||||
@ -59,4 +61,6 @@ fun main(args: Array<String>) {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
}
|
@ -68,7 +68,7 @@ object NumassFitScanSummaryTask : AbstractTask<Table>() {
|
||||
pars.getValue("trap"))
|
||||
}
|
||||
val res = TableTransform.sort(builder.build(), "m", true)
|
||||
context.io.output(nodeName, stage = name).render(NumassUtils.wrap(res, meta))
|
||||
context.output.get(nodeName, stage = name).render(NumassUtils.wrap(res, meta))
|
||||
return res
|
||||
}
|
||||
|
||||
|
@ -60,7 +60,7 @@ val analyzeTask = task("analyze") {
|
||||
pipe<NumassSet, Table> { set ->
|
||||
SmartAnalyzer().analyzeSet(set, meta).also { res ->
|
||||
val outputMeta = meta.builder.putNode("data", set.meta)
|
||||
context.io.output(name, stage = "numass.analyze").render(NumassUtils.wrap(res, outputMeta))
|
||||
context.output.get(name, stage = "numass.analyze").render(NumassUtils.wrap(res, outputMeta))
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -105,13 +105,13 @@ val monitorTableTask = task("monitor") {
|
||||
//add set markers
|
||||
addSetMarkers(frame, data.values)
|
||||
}
|
||||
context.io.output(name, stage = "numass.monitor", type = "dfp").render(PlotFrame.Wrapper().wrap(frame))
|
||||
context.output.get(name, stage = "numass.monitor", type = "dfp").render(PlotFrame.Wrapper().wrap(frame))
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
context.io.output(name, stage = "numass.monitor").render(NumassUtils.wrap(res, meta))
|
||||
context.output.get(name, stage = "numass.monitor").render(NumassUtils.wrap(res, meta))
|
||||
|
||||
return@join res;
|
||||
}
|
||||
@ -170,7 +170,7 @@ val subtractEmptyTask = task("dif") {
|
||||
|
||||
res.goal.onComplete { r, _ ->
|
||||
if (r != null) {
|
||||
context.io.output(input.name + "_subtract", stage = "numass.merge").render(NumassUtils.wrap(r, resMeta))
|
||||
context.output.get(input.name + "_subtract", stage = "numass.merge").render(NumassUtils.wrap(r, resMeta))
|
||||
}
|
||||
}
|
||||
|
||||
@ -221,7 +221,7 @@ val fitTask = task("fit") {
|
||||
configure(meta.getMeta("fit"))
|
||||
}
|
||||
pipe<Table, FitResult> { data ->
|
||||
context.io.stream(name, "numass.fit").use { out ->
|
||||
context.output.stream(name, "numass.fit").use { out ->
|
||||
val writer = PrintWriter(out)
|
||||
writer.printf("%n*** META ***%n")
|
||||
writer.println(meta.toString())
|
||||
|
@ -29,7 +29,7 @@ public class ServerRunner extends SimpleConfigurable implements AutoCloseable {
|
||||
public ServerRunner() throws IOException, ParseException {
|
||||
// Global.instance().getPluginManager().load(StorageManager.class);
|
||||
|
||||
Path configFile = context.getIo().getFile(SERVER_CONFIG_PATH).getPath();
|
||||
Path configFile = context.getOutput().getFile(SERVER_CONFIG_PATH).getPath();
|
||||
if (Files.exists(configFile)) {
|
||||
context.getLogger().info("Trying to read server configuration from {}", SERVER_CONFIG_PATH);
|
||||
configure(MetaFileReader.Companion.read(configFile));
|
||||
|
Loading…
Reference in New Issue
Block a user