Migrate model to MPP

This commit is contained in:
Alexander Nozik 2021-01-29 19:23:16 +03:00
parent edcc3e4956
commit 71e55ed5fc
19 changed files with 174 additions and 271 deletions

View File

@ -8,7 +8,7 @@ allprojects {
} }
group = "ru.inr.mass" group = "ru.inr.mass"
version = "0.1.0" version = "0.1.0-SNAPSHOT"
} }
val dataforgeVersion by extra("0.3.0-dev-2") val dataforgeVersion by extra("0.3.0-dev-2")

View File

@ -1,5 +1,5 @@
plugins { plugins {
kotlin("jvm") kotlin("multiplatform")
id("ru.mipt.npm.kscience") id("ru.mipt.npm.kscience")
} }
@ -9,6 +9,13 @@ kscience{
val dataforgeVersion: String by rootProject.extra val dataforgeVersion: String by rootProject.extra
kotlin.sourceSets {
commonMain {
dependencies { dependencies {
api("hep.dataforge:dataforge-context:$dataforgeVersion") api("hep.dataforge:dataforge-context:$dataforgeVersion")
api("org.jetbrains.kotlinx:kotlinx-datetime:0.1.1")
} }
}
}

View File

@ -2,11 +2,10 @@ package ru.inr.mass.data.api
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flatMap
import kotlinx.coroutines.flow.flatMapConcat import kotlinx.coroutines.flow.flatMapConcat
import java.time.Duration import kotlinx.datetime.Instant
import java.time.Instant import kotlin.time.Duration
import java.util.stream.Stream import kotlin.time.nanoseconds
public interface ParentBlock : NumassBlock { public interface ParentBlock : NumassBlock {
public val blocks: List<NumassBlock> public val blocks: List<NumassBlock>
@ -27,7 +26,7 @@ public class MetaBlock(override val blocks: List<NumassBlock>) : ParentBlock {
get() = blocks.first().startTime get() = blocks.first().startTime
override val length: Duration override val length: Duration
get() = Duration.ofNanos(blocks.stream().mapToLong { block -> block.length.toNanos() }.sum()) get() = blocks.sumOf { it.length.inNanoseconds }.nanoseconds
override val events: Flow<NumassEvent> override val events: Flow<NumassEvent>
get() = blocks.sortedBy { it.startTime }.asFlow().flatMapConcat { it.events } get() = blocks.sortedBy { it.startTime }.asFlow().flatMapConcat { it.events }

View File

@ -19,8 +19,10 @@ package ru.inr.mass.data.api
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.emptyFlow import kotlinx.coroutines.flow.emptyFlow
import java.time.Duration import kotlinx.datetime.DateTimeUnit
import java.time.Instant import kotlinx.datetime.Instant
import kotlinx.datetime.plus
import kotlin.time.Duration
public open class OrphanNumassEvent(public val amplitude: Short, public val timeOffset: Long) : public open class OrphanNumassEvent(public val amplitude: Short, public val timeOffset: Long) :
Comparable<OrphanNumassEvent> { Comparable<OrphanNumassEvent> {
@ -46,7 +48,7 @@ public class NumassEvent(amplitude: Short, timeOffset: Long, public val owner: N
public val channel: Int get() = owner.channel public val channel: Int get() = owner.channel
public val time: Instant get() = owner.startTime.plusNanos(timeOffset) public val time: Instant get() = owner.startTime.plus(timeOffset, DateTimeUnit.NANOSECOND)
} }
@ -103,12 +105,14 @@ public class SimpleBlock(
override val events: Flow<NumassEvent> get() = eventList.asFlow() override val events: Flow<NumassEvent> get() = eventList.asFlow()
public companion object { public companion object {
public suspend fun produce(
}
}
public suspend fun SimpleBlock(
startTime: Instant, startTime: Instant,
length: Duration, length: Duration,
producer: suspend () -> Iterable<OrphanNumassEvent>, producer: suspend () -> Iterable<OrphanNumassEvent>,
): SimpleBlock { ): SimpleBlock {
return SimpleBlock(startTime, length, producer()) return SimpleBlock(startTime, length, producer())
} }
}
}

View File

@ -0,0 +1,19 @@
package ru.inr.mass.data.api
import kotlinx.datetime.Instant
import kotlin.time.Duration
/**
* The continuous frame of digital detector data
* Created by darksnake on 06-Jul-17.
* @param time The absolute start time of the frame
* @param tickSize The time interval per tick
* @param signal The buffered signal shape in ticks
*/
public class NumassFrame(
public val time: Instant,
public val tickSize: Duration,
public val signal: ShortArray,
) {
public val length: Duration get() = tickSize * signal.size
}

View File

@ -22,11 +22,10 @@ import hep.dataforge.names.toName
import hep.dataforge.provider.Provider import hep.dataforge.provider.Provider
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flatMap
import kotlinx.coroutines.flow.flatMapConcat import kotlinx.coroutines.flow.flatMapConcat
import java.time.Duration import kotlinx.datetime.Instant
import java.time.Instant import kotlin.time.Duration
import java.util.stream.Stream import kotlin.time.nanoseconds
/** /**
* Created by darksnake on 06-Jul-17. * Created by darksnake on 06-Jul-17.
@ -76,7 +75,7 @@ public interface NumassPoint : ParentBlock, Provider {
* @return * @return
*/ */
override val startTime: Instant override val startTime: Instant
get() = meta[START_TIME_KEY]?.long?.let { Instant.ofEpochMilli(it) } ?: firstBlock.startTime get() = meta[START_TIME_KEY]?.long?.let { Instant.fromEpochMilliseconds(it) } ?: firstBlock.startTime
/** /**
* Get the length key of meta or calculate length as a sum of block lengths. The latter could be a bit slow * Get the length key of meta or calculate length as a sum of block lengths. The latter could be a bit slow
@ -84,8 +83,7 @@ public interface NumassPoint : ParentBlock, Provider {
* @return * @return
*/ */
override val length: Duration override val length: Duration
get() = Duration.ofNanos(blocks.stream().filter { it.channel == 0 }.mapToLong { it -> it.length.toNanos() } get() = blocks.filter { it.channel == 0 }.sumOf { it.length.inNanoseconds }.nanoseconds
.sum())
/** /**
* Get all events it all blocks as a single sequence * Get all events it all blocks as a single sequence
@ -111,6 +109,8 @@ public interface NumassPoint : ParentBlock, Provider {
override val isSequential: Boolean override val isSequential: Boolean
get() = channels.size == 1 get() = channels.size == 1
override fun toString(): String
public companion object { public companion object {
public const val NUMASS_BLOCK_TARGET: String = "block" public const val NUMASS_BLOCK_TARGET: String = "block"
public const val NUMASS_CHANNEL_TARGET: String = "channel" public const val NUMASS_CHANNEL_TARGET: String = "channel"

View File

@ -11,7 +11,7 @@ import hep.dataforge.meta.long
import hep.dataforge.names.Name import hep.dataforge.names.Name
import hep.dataforge.names.toName import hep.dataforge.names.toName
import hep.dataforge.provider.Provider import hep.dataforge.provider.Provider
import java.time.Instant import kotlinx.datetime.Instant
/** /**
* A single set of numass measurements together with metadata. * A single set of numass measurements together with metadata.
@ -31,7 +31,7 @@ public interface NumassSet : Iterable<NumassPoint>, Provider {
*/ */
public val startTime: Instant public val startTime: Instant
get() = meta[NumassPoint.START_TIME_KEY].long?.let { get() = meta[NumassPoint.START_TIME_KEY].long?.let {
Instant.ofEpochMilli(it) Instant.fromEpochMilliseconds(it)
} ?: firstPoint.startTime } ?: firstPoint.startTime
//suspend fun getHvData(): Table? //suspend fun getHvData(): Table?

View File

@ -1,11 +1,11 @@
package ru.inr.mass.data.api package ru.inr.mass.data.api
import java.util.stream.Stream import kotlinx.coroutines.flow.Flow
/** /**
* An ancestor to numass frame analyzers * An ancestor to numass frame analyzers
* Created by darksnake on 07.07.2017. * Created by darksnake on 07.07.2017.
*/ */
public interface SignalProcessor { public interface SignalProcessor {
public fun analyze(frame: NumassFrame): Stream<NumassEvent> public fun analyze(frame: NumassFrame): Flow<NumassEvent>
} }

View File

@ -0,0 +1,20 @@
package ru.inr.mass.data.api
import hep.dataforge.meta.Meta
/**
* A simple static implementation of NumassPoint
* Created by darksnake on 08.07.2017.
*/
public class SimpleNumassPoint(
override val blocks: List<NumassBlock>,
override val meta: Meta,
override val isSequential: Boolean = true,
) : NumassPoint {
init {
check(blocks.isNotEmpty()){"No blocks in a point"}
}
override fun toString(): String = "SimpleNumassPoint(index = ${index}, hv = $voltage)"
}

View File

@ -1,27 +0,0 @@
package ru.inr.mass.data.api
import java.nio.ShortBuffer
import java.time.Duration
import java.time.Instant
/**
* The continuous frame of digital detector data
* Created by darksnake on 06-Jul-17.
*/
public class NumassFrame(
/**
* The absolute start time of the frame
*/
public val time: Instant,
/**
* The time interval per tick
*/
public val tickSize: Duration,
/**
* The buffered signal shape in ticks
*/
public val signal: ShortBuffer) {
public val length: Duration
get() = tickSize.multipliedBy(signal.capacity().toLong())
}

View File

@ -1,30 +0,0 @@
package ru.inr.mass.data.api
import hep.dataforge.meta.Meta
import hep.dataforge.meta.MetaBuilder
/**
* A simple static implementation of NumassPoint
* Created by darksnake on 08.07.2017.
*/
public class SimpleNumassPoint(
override val blocks: List<NumassBlock>,
override val meta: Meta,
override val isSequential: Boolean = true,
) : NumassPoint {
// /**
// * Input blocks must be sorted
// * @param voltage
// * @param blocks
// */
// constructor(blocks: Collection<NumassBlock>, voltage: Double) :
// this(blocks.sortedBy { it.startTime }, MetaBuilder("point").setValue(NumassPoint.HV_KEY, voltage))
init {
if (blocks.isEmpty()) {
throw IllegalArgumentException("No blocks in collection")
}
}
}

View File

@ -12,8 +12,7 @@ val dataforgeVersion: String by rootProject.extra
dependencies { dependencies {
api(project(":numass-data-model")) api(project(":numass-data-model"))
api("hep.dataforge:dataforge-workspace:$dataforgeVersion") api("hep.dataforge:dataforge-io:$dataforgeVersion")
implementation("javax.annotation:javax.annotation-api:1.3.1")
} }
wire{ wire{

View File

@ -1,52 +0,0 @@
///*
// * Copyright 2018 Alexander Nozik.
// *
// * Licensed under the Apache License, Version 2.0 (the "License");
// * you may not use this file except in compliance with the License.
// * You may obtain a copy of the License at
// *
// * http://www.apache.org/licenses/LICENSE-2.0
// *
// * Unless required by applicable law or agreed to in writing, software
// * distributed under the License is distributed on an "AS IS" BASIS,
// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// * See the License for the specific language governing permissions and
// * limitations under the License.
// */
//
//package ru.inr.mass.data.proto
//
//import hep.dataforge.meta.Meta
//import hep.dataforge.storage.files.MutableFileEnvelope
//import java.nio.ByteBuffer
//import java.nio.file.Files
//import java.nio.file.Path
//import java.nio.file.StandardOpenOption
//
//class NumassFileEnvelope(path: Path) : MutableFileEnvelope(path) {
//
// private val tag by lazy { Files.newByteChannel(path, StandardOpenOption.READ).use { NumassEnvelopeType.LegacyTag().read(it) } }
//
// override val dataOffset: Long by lazy { (tag.length + tag.metaSize).toLong() }
//
// override var dataLength: Int
// get() = tag.dataSize
// set(value) {
// if (value > Int.MAX_VALUE) {
// throw RuntimeException("Too large data block")
// }
// tag.dataSize = value
// if (channel.write(tag.toBytes(), 0L) < tag.length) {
// throw error("Tag is not overwritten.")
// }
// }
//
//
// override val meta: Meta by lazy {
// val buffer = ByteBuffer.allocate(tag.metaSize).also {
// channel.read(it, tag.length.toLong())
// }
// tag.metaType.reader.readBuffer(buffer)
// }
//}
//

View File

@ -20,16 +20,21 @@ import hep.dataforge.io.Envelope
import hep.dataforge.meta.* import hep.dataforge.meta.*
import kotlinx.coroutines.flow.* import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking import kotlinx.coroutines.runBlocking
import kotlinx.datetime.DateTimeUnit
import kotlinx.datetime.Instant
import kotlinx.datetime.plus
import kotlinx.io.asInputStream import kotlinx.io.asInputStream
import kotlinx.io.readByteArray import kotlinx.io.readByteArray
import okio.ByteString
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import ru.inr.mass.data.api.* import ru.inr.mass.data.api.*
import java.io.ByteArrayInputStream import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream import java.io.ByteArrayOutputStream
import java.io.InputStream import java.io.InputStream
import java.time.Duration
import java.time.Instant
import java.util.zip.Inflater import java.util.zip.Inflater
import kotlin.time.Duration
import kotlin.time.milliseconds
import kotlin.time.nanoseconds
/** /**
* Protobuf based numass point * Protobuf based numass point
@ -60,14 +65,15 @@ internal class ProtoNumassPoint(
override val startTime: Instant override val startTime: Instant
get() = meta["start_time"].long?.let { get() = meta["start_time"].long?.let {
Instant.ofEpochMilli(it) Instant.fromEpochMilliseconds(it)
} ?: super.startTime } ?: super.startTime
override val length: Duration override val length: Duration
get() = meta["acquisition_time"].double?.let { get() = meta["acquisition_time"].double?.let {
Duration.ofMillis((it * 1000).toLong()) (it * 1000).toLong().milliseconds
} ?: super.length } ?: super.length
override fun toString(): String = "ProtoNumassPoint(index = ${index}, hv = $voltage)"
public companion object { public companion object {
@ -123,19 +129,19 @@ public class ProtoBlock(
val nanos = block.time val nanos = block.time
val seconds = Math.floorDiv(nanos, 1e9.toInt().toLong()) val seconds = Math.floorDiv(nanos, 1e9.toInt().toLong())
val reminder = (nanos % 1e9).toInt() val reminder = (nanos % 1e9).toInt()
return Instant.ofEpochSecond(seconds, reminder.toLong()) return Instant.fromEpochSeconds(seconds, reminder.toLong())
} }
override val length: Duration = when { override val length: Duration = when {
block.length > 0 -> Duration.ofNanos(block.length) block.length > 0 -> block.length.nanoseconds
parent?.meta["acquisition_time"] != null -> parent?.meta["acquisition_time"] != null ->
Duration.ofMillis((parent?.meta["acquisition_time"].double ?: 0.0 * 1000).toLong()) (parent?.meta["acquisition_time"].double ?: 0.0 * 1000).milliseconds
else -> { else -> {
LoggerFactory.getLogger(javaClass) LoggerFactory.getLogger(javaClass)
.error("No length information on block. Trying to infer from first and last events") .error("No length information on block. Trying to infer from first and last events")
val times = runBlocking { events.map { it.timeOffset }.toList() } val times = runBlocking { events.map { it.timeOffset }.toList() }
val nanos = (times.maxOrNull()!! - times.minOrNull()!!) val nanos = (times.maxOrNull()!! - times.minOrNull()!!)
Duration.ofNanos(nanos) nanos.nanoseconds
} }
} }
@ -158,14 +164,22 @@ public class ProtoBlock(
emptyFlow() emptyFlow()
} }
private fun ByteString.toShortArray(): ShortArray{
val shortBuffer = asByteBuffer().asShortBuffer()
return if(shortBuffer.hasArray()){
shortBuffer.array()
} else {
ShortArray(shortBuffer.limit()){shortBuffer.get(it)}
}
}
override val frames: Flow<NumassFrame> override val frames: Flow<NumassFrame>
get() { get() {
val tickSize = Duration.ofNanos(block.bin_size) val tickSize = block.bin_size.nanoseconds
return block.frames.asFlow().map { frame -> return block.frames.asFlow().map { frame ->
val time = startTime.plusNanos(frame.time) val time = startTime.plus(frame.time, DateTimeUnit.NANOSECOND)
val frameData = frame.data_.asByteBuffer() val frameData = frame.data_
NumassFrame(time, tickSize, frameData.asShortBuffer()) NumassFrame(time, tickSize, frameData.toShortArray())
} }
} }
} }

View File

@ -104,7 +104,7 @@ internal class TaggedNumassEnvelopeFormat(private val io: IOPlugin) : EnvelopeFo
IOFormat.NAME_KEY put name.toString() IOFormat.NAME_KEY put name.toString()
} }
public companion object : EnvelopeFormatFactory { companion object : EnvelopeFormatFactory {
private const val START_SEQUENCE = "#!" private const val START_SEQUENCE = "#!"
private const val END_SEQUENCE = "!#\r\n" private const val END_SEQUENCE = "!#\r\n"
@ -170,118 +170,4 @@ internal class TaggedNumassEnvelopeFormat(private val io: IOPlugin) : EnvelopeFo
override fun readObject(input: Input): Envelope = default.readObject(input) override fun readObject(input: Input): Envelope = default.readObject(input)
} }
} }
///**
// * An envelope type for legacy numass tags. Reads legacy tag and writes DF02 tags
// */
//object NumassEnvelopeType : EnvelopeFormatFactory {
//
// override val code: Int = DefaultEnvelopeType.DEFAULT_ENVELOPE_CODE
//
// override val name: String = "numass"
//
// override fun description(): String = "Numass legacy envelope"
//
// /**
// * Read as legacy
// */
// override fun getReader(properties: Map<String, String>): EnvelopeReader {
// return NumassEnvelopeReader()
// }
//
// /**
// * Write as default
// */
// override fun getWriter(properties: Map<String, String>): EnvelopeWriter {
// return DefaultEnvelopeWriter(this, MetaType.resolve(properties))
// }
//
// class LegacyTag : EnvelopeTag() {
// override val startSequence: ByteArray
// get() = LEGACY_START_SEQUENCE
//
// override val endSequence: ByteArray
// get() = LEGACY_END_SEQUENCE
//
// /**
// * Get the length of tag in bytes. -1 means undefined size in case tag was modified
// *
// * @return
// */
// override val length: Int
// get() = 30
//
// /**
// * Read leagscy version 1 tag without leading tag head
// *
// * @param buffer
// * @return
// * @throws IOException
// */
// override fun readHeader(buffer: ByteBuffer): Map<String, Value> {
// val res = HashMap<String, Value>()
//
// val type = buffer.getInt(2)
// res[Envelope.TYPE_PROPERTY] = Value.of(type)
//
// val metaTypeCode = buffer.getShort(10)
// val metaType = MetaType.resolve(metaTypeCode)
//
// if (metaType != null) {
// res[Envelope.META_TYPE_PROPERTY] = metaType.name.parseValue()
// } else {
// LoggerFactory.getLogger(EnvelopeTag::class.java).warn("Could not resolve meta type. Using default")
// }
//
// val metaLength = Integer.toUnsignedLong(buffer.getInt(14))
// res[Envelope.META_LENGTH_PROPERTY] = Value.of(metaLength)
// val dataLength = Integer.toUnsignedLong(buffer.getInt(22))
// res[Envelope.DATA_LENGTH_PROPERTY] = Value.of(dataLength)
// return res
// }
// }
//
// private class NumassEnvelopeReader : DefaultEnvelopeReader() {
// override fun newTag(): EnvelopeTag {
// return LegacyTag()
// }
// }
//
// companion object {
// val INSTANCE = NumassEnvelopeType()
//
// val LEGACY_START_SEQUENCE = byteArrayOf('#'.toByte(), '!'.toByte())
// val LEGACY_END_SEQUENCE = byteArrayOf('!'.toByte(), '#'.toByte(), '\r'.toByte(), '\n'.toByte())
//
// /**
// * Replacement for standard type infer to include legacy type
// *
// * @param path
// * @return
// */
// fun infer(path: Path): EnvelopeType? {
// return try {
// FileChannel.open(path, StandardOpenOption.READ).use {
// val buffer = it.map(FileChannel.MapMode.READ_ONLY, 0, 6)
// when {
// //TODO use templates from appropriate types
// buffer.get(0) == '#'.toByte() && buffer.get(1) == '!'.toByte() -> INSTANCE
// buffer.get(0) == '#'.toByte() && buffer.get(1) == '!'.toByte() &&
// buffer.get(4) == 'T'.toByte() && buffer.get(5) == 'L'.toByte() -> TaglessEnvelopeType.INSTANCE
// buffer.get(0) == '#'.toByte() && buffer.get(1) == '~'.toByte() -> DefaultEnvelopeType.INSTANCE
// else -> null
// }
// }
// } catch (ex: Exception) {
// LoggerFactory.getLogger(EnvelopeType::class.java).warn("Could not infer envelope type of file {} due to exception: {}", path, ex)
// null
// }
//
// }
//
// }
//
//}

View File

@ -0,0 +1,20 @@
plugins {
kotlin("jvm")
id("ru.mipt.npm.kscience")
}
kscience{
application()
publish()
}
val dataforgeVersion: String by rootProject.extra
val plotlyVersion: String by rootProject.extra("0.3.1-dev-5")
val kmathVersion: String by rootProject.extra("0.2.0-dev-6")
dependencies {
implementation(project(":numass-data-proto"))
implementation("hep.dataforge:dataforge-workspace:$dataforgeVersion")
implementation("kscience.plotlykt:plotlykt-core:$plotlyVersion")
implementation("kscience.kmath:kmath-histograms:$kmathVersion")
}

View File

@ -0,0 +1,33 @@
@file:Suppress("EXPERIMENTAL_API_USAGE")
package ru.inr.mass.workspace
import hep.dataforge.context.logger
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.runBlocking
import kscience.kmath.histogram.UnivariateHistogram
import ru.inr.mass.data.api.NumassPoint
/**
* Build an amplitude spectrum
*/
fun NumassPoint.spectrum(): UnivariateHistogram =
UnivariateHistogram.uniform(1.0) {
runBlocking {
events.collect { put(it.channel.toDouble()) }
}
}
fun Collection<NumassPoint>.spectrum(): UnivariateHistogram {
if (distinctBy { it.voltage }.size != 1) {
numass.logger.warn { "Spectrum is generated from points with different hv value: $this" }
}
return UnivariateHistogram.uniform(1.0) {
runBlocking {
this@spectrum.forEach { point ->
point.events.collect { put(it.channel.toDouble()) }
}
}
}
}

View File

@ -0,0 +1,10 @@
package ru.inr.mass.workspace
import hep.dataforge.workspace.Workspace
import ru.inr.mass.data.proto.NumassProtoPlugin
val numass = Workspace {
context("numass") {
plugin(NumassProtoPlugin)
}
}

View File

@ -36,3 +36,4 @@ pluginManagement {
include("numass-data-model") include("numass-data-model")
include("numass-data-proto") include("numass-data-proto")
include("numass-workspace")