Updates in FX output

This commit is contained in:
Alexander Nozik 2018-05-13 18:25:51 +03:00
parent 122c71c51d
commit c57969b392

View File

@ -34,6 +34,7 @@ import java.util.stream.Stream
import kotlin.streams.asSequence
import kotlin.streams.asStream
/**
* An analyzer which uses time information from events
* Created by darksnake on 11.07.2017.
@ -51,11 +52,29 @@ class TimeAnalyzer @JvmOverloads constructor(private val processor: SignalProces
val upChannel = config.getInt("window.up", Integer.MAX_VALUE)
val t0 = getT0(block, config).toLong()
val chunkSize = config.getInt("chunkSize", 1000)
val res = getEventsWithDelay(block, config).asSequence().chunked(chunkSize) {
analyzeSequence(it.asSequence(), t0)
}.reduce(this::combineBlockResults) ?: ValueMap.ofPairs(
NumassAnalyzer.LENGTH_KEY to 0,
NumassAnalyzer.COUNT_KEY to 0,
NumassAnalyzer.COUNT_RATE_KEY to 0,
NumassAnalyzer.COUNT_RATE_ERROR_KEY to 0
)
return ValueMap.Builder(res)
.putValue(NumassAnalyzer.WINDOW_KEY, arrayOf(loChannel, upChannel))
.putValue(NumassAnalyzer.TIME_KEY, block.startTime)
.putValue(T0_KEY, t0.toDouble() / 1000.0)
.build()
}
private fun analyzeSequence(sequence: Sequence<Pair<NumassEvent, Long>>, t0: Long): Values {
val totalN = AtomicLong(0)
val totalT = AtomicLong(0)
getEventsWithDelay(block, config)
.filter { pair -> pair.second >= t0 }
sequence.filter { pair -> pair.second >= t0 }
.forEach { pair ->
totalN.incrementAndGet()
//TODO add progress listener here
@ -67,15 +86,21 @@ class TimeAnalyzer @JvmOverloads constructor(private val processor: SignalProces
val length = totalT.get() / 1e9
val count = (length * countRate).toLong()
return ValueMap.of(NAME_LIST,
length,
count,
countRate,
countRateError,
arrayOf(loChannel, upChannel),
block.startTime,
t0.toDouble() / 1000.0
return ValueMap.ofPairs(
NumassAnalyzer.LENGTH_KEY to length,
NumassAnalyzer.COUNT_KEY to count,
NumassAnalyzer.COUNT_RATE_KEY to countRate,
NumassAnalyzer.COUNT_RATE_ERROR_KEY to countRateError
)
// ValueMap.of(NAME_LIST,
// length,
// count,
// countRate,
// countRateError,
// arrayOf(loChannel, upChannel),
// block.startTime,
// t0.toDouble() / 1000.0
// )
}
override fun analyzeParent(point: ParentBlock, config: Meta): Values {
@ -120,15 +145,12 @@ class TimeAnalyzer @JvmOverloads constructor(private val processor: SignalProces
val countRateErr = Math.sqrt(Math.pow(t1 * err1 / (t1 + t2), 2.0) + Math.pow(t2 * err2 / (t1 + t2), 2.0))
return ValueMap.of(NAME_LIST,
v1.getDouble(NumassAnalyzer.LENGTH_KEY) + v2.getDouble(NumassAnalyzer.LENGTH_KEY),
v1.getInt(NumassAnalyzer.COUNT_KEY) + v2.getInt(NumassAnalyzer.COUNT_KEY),
countRate,
countRateErr,
v1.getValue(NumassAnalyzer.WINDOW_KEY),
v1.getValue(NumassAnalyzer.TIME_KEY),
v1.getDouble(T0_KEY)
)
return ValueMap.Builder(v1)
.putValue(NumassAnalyzer.LENGTH_KEY, t1 + t2)
.putValue(NumassAnalyzer.COUNT_KEY, v1.getInt(NumassAnalyzer.COUNT_KEY) + v2.getInt(NumassAnalyzer.COUNT_KEY))
.putValue(NumassAnalyzer.COUNT_RATE_KEY, countRate)
.putValue(NumassAnalyzer.COUNT_RATE_ERROR_KEY, countRateErr)
.build()
}
@ValueDefs(