diff --git a/numass-core/src/main/kotlin/inr/numass/data/analyzers/TimeAnalyzer.kt b/numass-core/src/main/kotlin/inr/numass/data/analyzers/TimeAnalyzer.kt index faab3a07..aa3d37c0 100644 --- a/numass-core/src/main/kotlin/inr/numass/data/analyzers/TimeAnalyzer.kt +++ b/numass-core/src/main/kotlin/inr/numass/data/analyzers/TimeAnalyzer.kt @@ -34,6 +34,7 @@ import java.util.stream.Stream import kotlin.streams.asSequence import kotlin.streams.asStream + /** * An analyzer which uses time information from events * Created by darksnake on 11.07.2017. @@ -51,11 +52,29 @@ class TimeAnalyzer @JvmOverloads constructor(private val processor: SignalProces val upChannel = config.getInt("window.up", Integer.MAX_VALUE) val t0 = getT0(block, config).toLong() + val chunkSize = config.getInt("chunkSize", 1000) + + val res = getEventsWithDelay(block, config).asSequence().chunked(chunkSize) { + analyzeSequence(it.asSequence(), t0) + }.reduce(this::combineBlockResults) ?: ValueMap.ofPairs( + NumassAnalyzer.LENGTH_KEY to 0, + NumassAnalyzer.COUNT_KEY to 0, + NumassAnalyzer.COUNT_RATE_KEY to 0, + NumassAnalyzer.COUNT_RATE_ERROR_KEY to 0 + ) + + return ValueMap.Builder(res) + .putValue(NumassAnalyzer.WINDOW_KEY, arrayOf(loChannel, upChannel)) + .putValue(NumassAnalyzer.TIME_KEY, block.startTime) + .putValue(T0_KEY, t0.toDouble() / 1000.0) + .build() + } + + + private fun analyzeSequence(sequence: Sequence>, t0: Long): Values { val totalN = AtomicLong(0) val totalT = AtomicLong(0) - - getEventsWithDelay(block, config) - .filter { pair -> pair.second >= t0 } + sequence.filter { pair -> pair.second >= t0 } .forEach { pair -> totalN.incrementAndGet() //TODO add progress listener here @@ -67,15 +86,21 @@ class TimeAnalyzer @JvmOverloads constructor(private val processor: SignalProces val length = totalT.get() / 1e9 val count = (length * countRate).toLong() - return ValueMap.of(NAME_LIST, - length, - count, - countRate, - countRateError, - arrayOf(loChannel, upChannel), - block.startTime, - t0.toDouble() / 1000.0 + return ValueMap.ofPairs( + NumassAnalyzer.LENGTH_KEY to length, + NumassAnalyzer.COUNT_KEY to count, + NumassAnalyzer.COUNT_RATE_KEY to countRate, + NumassAnalyzer.COUNT_RATE_ERROR_KEY to countRateError ) +// ValueMap.of(NAME_LIST, +// length, +// count, +// countRate, +// countRateError, +// arrayOf(loChannel, upChannel), +// block.startTime, +// t0.toDouble() / 1000.0 +// ) } override fun analyzeParent(point: ParentBlock, config: Meta): Values { @@ -86,7 +111,7 @@ class TimeAnalyzer @JvmOverloads constructor(private val processor: SignalProces .reduce(null) { v1, v2 -> this.combineBlockResults(v1, v2) } val map = HashMap(res.asMap()) - if(point is NumassPoint) { + if (point is NumassPoint) { map[HV_KEY] = Value.of(point.voltage) } return ValueMap(map) @@ -120,15 +145,12 @@ class TimeAnalyzer @JvmOverloads constructor(private val processor: SignalProces val countRateErr = Math.sqrt(Math.pow(t1 * err1 / (t1 + t2), 2.0) + Math.pow(t2 * err2 / (t1 + t2), 2.0)) - return ValueMap.of(NAME_LIST, - v1.getDouble(NumassAnalyzer.LENGTH_KEY) + v2.getDouble(NumassAnalyzer.LENGTH_KEY), - v1.getInt(NumassAnalyzer.COUNT_KEY) + v2.getInt(NumassAnalyzer.COUNT_KEY), - countRate, - countRateErr, - v1.getValue(NumassAnalyzer.WINDOW_KEY), - v1.getValue(NumassAnalyzer.TIME_KEY), - v1.getDouble(T0_KEY) - ) + return ValueMap.Builder(v1) + .putValue(NumassAnalyzer.LENGTH_KEY, t1 + t2) + .putValue(NumassAnalyzer.COUNT_KEY, v1.getInt(NumassAnalyzer.COUNT_KEY) + v2.getInt(NumassAnalyzer.COUNT_KEY)) + .putValue(NumassAnalyzer.COUNT_RATE_KEY, countRate) + .putValue(NumassAnalyzer.COUNT_RATE_ERROR_KEY, countRateErr) + .build() } @ValueDefs(