Refactor Numass API

This commit is contained in:
Alexander Nozik 2021-02-01 21:48:03 +03:00
parent 792670c301
commit 1df14dd2fa
14 changed files with 139 additions and 120 deletions

View File

@ -11,7 +11,7 @@ allprojects {
version = "0.1.0-SNAPSHOT"
}
val dataforgeVersion by extra("0.3.0-dev-2")
val dataforgeVersion by extra("0.3.0-dev-3")
apiValidation{
validationDisabled = true

View File

@ -13,6 +13,7 @@ kotlin.sourceSets {
commonMain {
dependencies {
api("hep.dataforge:dataforge-context:$dataforgeVersion")
api("hep.dataforge:dataforge-data:$dataforgeVersion")
api("org.jetbrains.kotlinx:kotlinx-datetime:0.1.1")
}
}

View File

@ -1,35 +1,37 @@
package ru.inr.mass.data.api
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flatMapConcat
import kotlinx.coroutines.flow.*
import kotlinx.datetime.Instant
import kotlin.time.Duration
import kotlin.time.nanoseconds
public interface ParentBlock : NumassBlock {
public val blocks: List<NumassBlock>
public fun flowBlocks(): Flow<NumassBlock>
/**
* If true, the sub-blocks a considered to be isSequential, if not, the sub-blocks are parallel
* If true, the sub-blocks a considered to be sequential, if not, the sub-blocks are parallel
*/
public val isSequential: Boolean get() = true
public val sequential: Boolean get() = true
}
/**
* A block constructed from a set of other blocks. Internal blocks are not necessary subsequent. Blocks are automatically sorted.
* Created by darksnake on 16.07.2017.
*/
public class MetaBlock(override val blocks: List<NumassBlock>) : ParentBlock {
public class MetaBlock(private val blocks: List<NumassBlock>) : ParentBlock {
override fun flowBlocks(): Flow<NumassBlock> = blocks.asFlow()
override val startTime: Instant
get() = blocks.first().startTime
override val length: Duration
get() = blocks.sumOf { it.length.inNanoseconds }.nanoseconds
override suspend fun getLength(): Duration = blocks.sumOf { it.getLength().inNanoseconds }.nanoseconds
override val events: Flow<NumassEvent>
get() = blocks.sortedBy { it.startTime }.asFlow().flatMapConcat { it.events }
get() = flow {
blocks.sortedBy { it.startTime }.forEach { emitAll(it.events) }
}
override val frames: Flow<NumassFrame>
get() = blocks.sortedBy { it.startTime }.asFlow().flatMapConcat { it.frames }

View File

@ -24,8 +24,10 @@ import kotlinx.datetime.Instant
import kotlinx.datetime.plus
import kotlin.time.Duration
public open class OrphanNumassEvent(public val amplitude: Short, public val timeOffset: Long) :
Comparable<OrphanNumassEvent> {
public open class OrphanNumassEvent(
public val amplitude: Short,
public val timeOffset: Long,
) : Comparable<OrphanNumassEvent> {
public operator fun component1(): Short = amplitude
public operator fun component2(): Long = timeOffset
@ -43,14 +45,15 @@ public open class OrphanNumassEvent(public val amplitude: Short, public val time
* @property owner an owner block for this event
*
*/
public class NumassEvent(amplitude: Short, timeOffset: Long, public val owner: NumassBlock) :
OrphanNumassEvent(amplitude, timeOffset) {
public class NumassEvent(
amplitude: Short,
timeOffset: Long,
public val owner: NumassBlock,
) : OrphanNumassEvent(amplitude, timeOffset)
public val channel: Int get() = owner.channel
public val NumassEvent.channel: Int get() = owner.channel
public val time: Instant get() = owner.startTime.plus(timeOffset, DateTimeUnit.NANOSECOND)
}
public fun NumassEvent.getTime(): Instant = owner.startTime.plus(timeOffset, DateTimeUnit.NANOSECOND)
/**
@ -69,7 +72,7 @@ public interface NumassBlock {
/**
* The length of the block
*/
public val length: Duration
public suspend fun getLength(): Duration
/**
* Stream of isolated events. Could be empty
@ -94,10 +97,12 @@ public fun OrphanNumassEvent.adopt(parent: NumassBlock): NumassEvent {
*/
public class SimpleBlock(
override val startTime: Instant,
override val length: Duration,
private val length: Duration,
rawEvents: Iterable<OrphanNumassEvent>,
) : NumassBlock {
override suspend fun getLength(): Duration = length
private val eventList by lazy { rawEvents.map { it.adopt(this) } }
override val frames: Flow<NumassFrame> get() = emptyFlow()

View File

@ -16,31 +16,26 @@
package ru.inr.mass.data.api
import hep.dataforge.meta.*
import hep.dataforge.names.Name
import hep.dataforge.names.toName
import hep.dataforge.provider.Provider
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flatMapConcat
import kotlinx.datetime.Instant
import hep.dataforge.meta.Meta
import hep.dataforge.meta.double
import hep.dataforge.meta.get
import hep.dataforge.meta.int
import kotlinx.coroutines.flow.*
import kotlin.time.Duration
import kotlin.time.nanoseconds
/**
* Created by darksnake on 06-Jul-17.
*/
public interface NumassPoint : ParentBlock, Provider {
public interface NumassPoint : ParentBlock {
public val meta: Meta
override val blocks: List<NumassBlock>
/**
* Distinct map of channel number to corresponding grouping block
*/
public val channels: Map<Int, NumassBlock>
get() = blocks.toList().groupBy { it.channel }.mapValues { entry ->
public suspend fun getChannels(): Map<Int, NumassBlock> =
flowBlocks().toList().groupBy { it.channel }.mapValues { entry ->
if (entry.value.size == 1) {
entry.value.first()
} else {
@ -48,66 +43,35 @@ public interface NumassPoint : ParentBlock, Provider {
}
}
override fun content(target: String): Map<Name, Any> = when (target) {
NUMASS_BLOCK_TARGET -> blocks.mapIndexed { index, numassBlock ->
"block[$index]".toName() to numassBlock
}.toMap()
NUMASS_CHANNEL_TARGET -> channels.mapKeys { "channel[${it.key}]".toName() }
else -> super.content(target)
}
/**
* Get the voltage setting for the point
*
* @return
*/
public val voltage: Double get() = meta[HV_KEY].double ?: 0.0
/**
* Get the index for this point in the set
* @return
*/
public val index: Int get() = meta[INDEX_KEY].int ?: -1
/**
* Get the starting time from meta or from first block
*
* @return
*/
override val startTime: Instant
get() = meta[START_TIME_KEY]?.long?.let { Instant.fromEpochMilliseconds(it) } ?: firstBlock.startTime
/**
* Get the length key of meta or calculate length as a sum of block lengths. The latter could be a bit slow
*
* @return
*/
override val length: Duration
get() = blocks.filter { it.channel == 0 }.sumOf { it.length.inNanoseconds }.nanoseconds
override suspend fun getLength(): Duration =
flowBlocks().filter { it.channel == 0 }.toList().sumOf { it.getLength().inNanoseconds }.nanoseconds
/**
* Get all events it all blocks as a single sequence
*
*
* Some performance analysis of different stream concatenation approaches is given here: https://www.techempower.com/blog/2016/10/19/efficient-multiple-stream-concatenation-in-java/
*
*
* @return
*/
override val events: Flow<NumassEvent>
get() = blocks.asFlow().flatMapConcat { it.events }
override val events: Flow<NumassEvent> get() = flowBlocks().flatMapConcat { it.events }
/**
* Get all frames in all blocks as a single sequence
*
* @return
*/
override val frames: Flow<NumassFrame>
get() = blocks.asFlow().flatMapConcat { it.frames }
override val frames: Flow<NumassFrame> get() = flowBlocks().flatMapConcat { it.frames }
override val isSequential: Boolean
get() = channels.size == 1
public suspend fun isSequential(): Boolean = getChannels().size == 1
override fun toString(): String
@ -126,5 +90,5 @@ public interface NumassPoint : ParentBlock, Provider {
* Get the first block if it exists. Throw runtime exception otherwise.
*
*/
public val NumassPoint.firstBlock: NumassBlock
get() = blocks.firstOrNull() ?: throw RuntimeException("The point is empty")
public suspend fun NumassPoint.getFirstBlock(): NumassBlock =
flowBlocks().firstOrNull() ?: throw RuntimeException("The point is empty")

View File

@ -29,8 +29,7 @@ public interface NumassSet : Iterable<NumassPoint>, Provider {
*
* @return
*/
public val startTime: Instant
get() = meta[NumassPoint.START_TIME_KEY].long?.let {
public suspend fun getStartTime(): Instant = meta[NumassPoint.START_TIME_KEY].long?.let {
Instant.fromEpochMilliseconds(it)
} ?: firstPoint.startTime

View File

@ -1,20 +1,26 @@
package ru.inr.mass.data.api
import hep.dataforge.meta.Meta
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.datetime.Instant
/**
* A simple static implementation of NumassPoint
* Created by darksnake on 08.07.2017.
*/
public class SimpleNumassPoint(
override val blocks: List<NumassBlock>,
private val blocks: List<NumassBlock>,
override val meta: Meta,
override val isSequential: Boolean = true,
override val startTime: Instant = Instant.DISTANT_PAST,
override val sequential: Boolean = true,
) : NumassPoint {
init {
check(blocks.isNotEmpty()){"No blocks in a point"}
check(blocks.isNotEmpty()) { "No blocks in a point" }
}
override fun toString(): String = "SimpleNumassPoint(index = ${index}, hv = $voltage)"
override fun flowBlocks(): Flow<NumassBlock> = blocks.asFlow()
override fun toString(): String = "SimpleNumassPoint(index = ${index}, hv = $voltage)"
}

View File

@ -45,18 +45,17 @@ internal class ProtoNumassPoint(
private val protoBuilder: () -> Point,
) : NumassPoint {
private val proto: Point get() = protoBuilder()
val point by lazy(protoBuilder)
override val blocks: List<NumassBlock>
get() = proto.channels.flatMap { channel ->
override fun flowBlocks() = point.channels.flatMap { channel ->
channel.blocks
.map { block -> ProtoBlock(channel.id.toInt(), block, this) }
.map { block -> ProtoNumassBlock(channel.id.toInt(), block, this) }
.sortedBy { it.startTime }
}
}.asFlow()
override val channels: Map<Int, NumassBlock>
get() = proto.channels.groupBy { it.id.toInt() }.mapValues { entry ->
MetaBlock(entry.value.flatMap { it.blocks }.map { ProtoBlock(entry.key, it, this) })
override suspend fun getChannels(): Map<Int, NumassBlock> =
point.channels.groupBy { it.id.toInt() }.mapValues { entry ->
MetaBlock(entry.value.flatMap { it.blocks }.map { ProtoNumassBlock(entry.key, it, this) })
}
override val voltage: Double get() = meta["external_meta.HV1_value"].double ?: super.voltage
@ -66,12 +65,11 @@ internal class ProtoNumassPoint(
override val startTime: Instant
get() = meta["start_time"].long?.let {
Instant.fromEpochMilliseconds(it)
} ?: super.startTime
} ?: Instant.DISTANT_PAST
override val length: Duration
get() = meta["acquisition_time"].double?.let {
override suspend fun getLength(): Duration = meta["acquisition_time"].double?.let {
(it * 1000).toLong().milliseconds
} ?: super.length
} ?: super.getLength()
override fun toString(): String = "ProtoNumassPoint(index = ${index}, hv = $voltage)"
@ -118,10 +116,10 @@ internal class ProtoNumassPoint(
}
public class ProtoBlock(
public class ProtoNumassBlock(
override val channel: Int,
private val block: Point.Channel.Block,
parent: NumassPoint? = null,
private val parent: NumassPoint? = null,
) : NumassBlock {
override val startTime: Instant
@ -132,7 +130,7 @@ public class ProtoBlock(
return Instant.fromEpochSeconds(seconds, reminder.toLong())
}
override val length: Duration = when {
override suspend fun getLength(): Duration = when {
block.length > 0 -> block.length.nanoseconds
parent?.meta["acquisition_time"] != null ->
(parent?.meta["acquisition_time"].double ?: 0.0 * 1000).milliseconds
@ -164,12 +162,12 @@ public class ProtoBlock(
emptyFlow()
}
private fun ByteString.toShortArray(): ShortArray{
private fun ByteString.toShortArray(): ShortArray {
val shortBuffer = asByteBuffer().asShortBuffer()
return if(shortBuffer.hasArray()){
return if (shortBuffer.hasArray()) {
shortBuffer.array()
} else {
ShortArray(shortBuffer.limit()){shortBuffer.get(it)}
ShortArray(shortBuffer.limit()) { shortBuffer.get(it) }
}
}

View File

@ -22,7 +22,7 @@ class TestNumassDirectory {
assertEquals(ListValue.EMPTY, testSet.meta["comments"].value)
assertEquals(31, testSet.points.size)
val point22 = testSet.points.find { it.index == 22 }!!
point22.blocks
point22.flowBlocks()
assertEquals("2018-04-13T21:56:09", point22.meta["end_time"].string)
}
}

View File

@ -1,13 +1,17 @@
plugins {
kotlin("jvm")
id("ru.mipt.npm.kscience")
id("com.github.johnrengelman.shadow") version "6.1.0"
}
kscience{
application()
kscience {
publish()
}
kotlin {
explicitApi = null
}
val dataforgeVersion: String by rootProject.extra
val plotlyVersion: String by rootProject.extra("0.3.1-dev-5")
val kmathVersion: String by rootProject.extra("0.2.0-dev-6")
@ -17,4 +21,5 @@ dependencies {
implementation("hep.dataforge:dataforge-workspace:$dataforgeVersion")
implementation("kscience.plotlykt:plotlykt-core:$plotlyVersion")
implementation("kscience.kmath:kmath-histograms:$kmathVersion")
implementation("kscience.kmath:kmath-for-real:$kmathVersion")
}

View File

@ -1,11 +1,14 @@
package ru.inr.mass.workspace
import hep.dataforge.data.await
import hep.dataforge.names.toName
import kscience.plotly.Plotly
import kscience.plotly.makeFile
import ru.inr.mass.data.proto.readNumassDirectory
import java.nio.file.Path
fun main() {
val dataPath = Path.of("D:\\Work\\Numass\\data\\2018_04\\Adiabacity_19\\set_4\\")
val testSet = NUMASS.context.readNumassDirectory(dataPath)
testSet.plotlyPage().makeFile()
suspend fun main() {
val repo = readNumassRepository("D:\\Work\\Numass\\data\\2018_04")
//val dataPath = Path.of("D:\\Work\\Numass\\data\\2018_04\\Adiabacity_19\\set_4\\")
//val testSet = NUMASS.context.readNumassDirectory(dataPath)
val testSet = repo.getData("Adiabacity_19.set_4".toName())!!.await()
Plotly.numassDirectory(testSet).makeFile()
}

View File

@ -6,17 +6,21 @@ import hep.dataforge.context.logger
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.runBlocking
import kscience.kmath.histogram.UnivariateHistogram
import kscience.kmath.structures.RealBuffer
import kscience.kmath.structures.asBuffer
import ru.inr.mass.data.api.NumassPoint
/**
* Build an amplitude spectrum
*/
fun NumassPoint.spectrum(): UnivariateHistogram =
UnivariateHistogram.uniform(1.0) {
fun NumassPoint.spectrum(): UnivariateHistogram = UnivariateHistogram.uniform(1.0) {
runBlocking {
events.collect { put(it.amplitude.toDouble()) }
}
}
}
operator fun UnivariateHistogram.component1(): RealBuffer = map {it.position}.toDoubleArray().asBuffer()
operator fun UnivariateHistogram.component2(): RealBuffer = map { it.value }.toDoubleArray().asBuffer()
fun Collection<NumassPoint>.spectrum(): UnivariateHistogram {
if (distinctBy { it.voltage }.size != 1) {

View File

@ -1,6 +1,38 @@
package ru.inr.mass.workspace
import hep.dataforge.data.ActiveDataTree
import hep.dataforge.data.DataTree
import hep.dataforge.data.emitStatic
import hep.dataforge.names.Name
import hep.dataforge.names.NameToken
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import ru.inr.mass.data.proto.NumassDirectorySet
import ru.inr.mass.data.proto.readNumassDirectory
import java.nio.file.Files
import java.nio.file.Path
import kotlin.io.path.ExperimentalPathApi
import kotlin.io.path.exists
import kotlin.io.path.isDirectory
import kotlin.io.path.relativeTo
import kotlin.streams.toList
fun readNumassDirectory(path: String): NumassDirectorySet = NUMASS.context.readNumassDirectory(path)
@OptIn(ExperimentalPathApi::class)
suspend fun readNumassRepository(path: String): DataTree<NumassDirectorySet> = ActiveDataTree {
val basePath = Path.of(path)
@Suppress("BlockingMethodInNonBlockingContext")
withContext(Dispatchers.IO) {
Files.walk(Path.of(path)).filter {
it.isDirectory() && it.resolve("meta").exists()
}.toList().forEach { childPath ->
val name = Name(childPath.relativeTo(basePath).map { segment ->
NameToken(segment.fileName.toString())
})
val value = NUMASS.context.readNumassDirectory(childPath)
emitStatic(name, value, value.meta)
}
}
//TODO add file watcher
}

View File

@ -39,19 +39,19 @@ fun Plot.hvData(data: List<HVEntry>): Trace = scatter {
y.numbers = data.map { it.value }
}
fun NumassDirectorySet.plotlyPage(binSize: Int = 20, range: IntRange = 0..2000): PlotlyPage = Plotly.page {
fun Plotly.numassDirectory(set: NumassDirectorySet, binSize: Int = 20, range: IntRange = 0..2000): PlotlyPage = Plotly.page {
h1 {
+"Numass point set $path"
+"Numass point set ${set.path}"
}
h2 {
+"Amplitude spectrum"
}
plot {
points.sortedBy { it.index }.forEach {
set.points.sortedBy { it.index }.forEach {
amplitudeSpectrum(it, binSize, range)
}
}
getHvData()?.let { entries ->
set.getHvData()?.let { entries ->
h2 {
+"HV"
}