Migration to new Storage API

Alexander Nozik 2018-11-25 18:03:31 +03:00
parent 32732c9de6
commit da0b896d29
40 changed files with 400 additions and 255 deletions

View File

@@ -13,5 +13,4 @@ out/*
.idea/
*.iml
numass-core/gen/
*/gen/*

View File

@@ -31,7 +31,8 @@ import hep.dataforge.meta.Meta
import hep.dataforge.states.StateDef
import hep.dataforge.states.valueState
import hep.dataforge.storage.StorageConnection
import hep.dataforge.storage.TableLoader
import hep.dataforge.storage.tables.TableLoader
import hep.dataforge.storage.tables.createTable
import hep.dataforge.tables.TableFormat
import hep.dataforge.tables.TableFormatBuilder
import hep.dataforge.utils.DateTimeUtils
@@ -91,13 +92,7 @@ class PKT8Device(context: Context, meta: Meta) : PortSensor(context, meta) {
private fun buildLoader(connection: StorageConnection): TableLoader {
val storage = connection.storage
val suffix = DateTimeUtils.fileSuffix()
try {
return LoaderFactory.buildPointLoader(storage, "cryotemp_$suffix", "", "timestamp", tableFormat)
} catch (e: StorageException) {
throw RuntimeException("Failed to build loader from storage", e)
}
return storage.createTable("cryotemp_$suffix", tableFormat)
}
@Throws(ControlException::class)

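The hunk above shows the core pattern of this migration: the checked StorageException plumbing around LoaderFactory collapses into a single createTable extension on the connected storage. A minimal sketch of the new-style call, assuming only the hep.dataforge.storage.tables API imported above (the helper name is illustrative):

    import hep.dataforge.storage.StorageConnection
    import hep.dataforge.storage.tables.TableLoader
    import hep.dataforge.storage.tables.createTable
    import hep.dataforge.tables.TableFormat

    // Sketch: no checked exceptions, no factory; the extension builds
    // (or opens) a table loader directly on the connected storage.
    fun buildLoader(connection: StorageConnection, name: String, format: TableFormat): TableLoader =
        connection.storage.createTable(name, format)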
View File

@@ -16,12 +16,19 @@
package inr.numass.control.dante
import hep.dataforge.context.Context
import hep.dataforge.context.ContextAware
import hep.dataforge.context.launch
import hep.dataforge.meta.Meta
import hep.dataforge.meta.buildMeta
import hep.dataforge.orElse
import inr.numass.control.dante.DanteClient.Companion.CommandType.*
import inr.numass.control.dante.DanteClient.Companion.Register.*
import inr.numass.data.NumassProto
import inr.numass.data.ProtoNumassPoint
import inr.numass.data.api.NumassPoint
import inr.numass.data.storage.ProtoNumassPoint
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import java.io.DataInputStream
import java.io.OutputStream
import java.lang.Math.pow
@@ -64,15 +71,13 @@ internal val ByteArray.hex
//TODO implement using Device
class DanteClient(val ip: String, chainLength: Int) : AutoCloseable {
class DanteClient(override val context: Context, val ip: String, chainLength: Int) : AutoCloseable, ContextAware {
private val RESET_COMMAND = byteArrayOf(0xDD.toByte(), 0x55, 0xDD.toByte(), 0xEE.toByte())
private val logger = LoggerFactory.getLogger(javaClass)
private val packetNumber = AtomicLong(0)
private lateinit var parentJob: Job
private val pool = newFixedThreadPoolContext(8, "Dante")
private val parentJob: Job = SupervisorJob()
private val pool = newFixedThreadPoolContext(8, "Dante") + parentJob
private val connections: MutableMap<Int, Pair<Socket, Job>> = HashMap()
@@ -101,8 +106,8 @@ class DanteClient(val ip: String, chainLength: Int) : AutoCloseable {
fun open() {
//create a new parent job
this.parentJob = Job()
//start supervisor job
parentJob.start()
(0..3).forEach {
openPort(it)
}
@@ -114,7 +119,7 @@ class DanteClient(val ip: String, chainLength: Int) : AutoCloseable {
it.first.close()
it.second.cancel()
}
parentJob.cancel(CancellationException("Server stopped"))
parentJob.cancel()
}
/**
@@ -132,7 +137,7 @@ class DanteClient(val ip: String, chainLength: Int) : AutoCloseable {
* Reset everything
*/
fun reset() {
async {
launch {
sendChannel.send(RESET_COMMAND)
}
}
@@ -152,7 +157,7 @@ class DanteClient(val ip: String, chainLength: Int) : AutoCloseable {
if (port == 0) {
//outputJob.cancel()
output = socket.getOutputStream()
outputJob = launch(context = pool, parent = parentJob) {
outputJob = launch(pool) {
while (true) {
val command = sendChannel.receive()
output.write(command)
@@ -162,7 +167,7 @@ class DanteClient(val ip: String, chainLength: Int) : AutoCloseable {
}
val job = launch(context = pool, parent = parentJob) {
val job = launch(pool) {
val stream = socket.getInputStream()
while (true) {
if (stream.read() == PACKET_HEADER_START_BYTES[0] && stream.read() == PACKET_HEADER_START_BYTES[1]) {
@@ -249,7 +254,7 @@ class DanteClient(val ip: String, chainLength: Int) : AutoCloseable {
}
return sequence {
val intBuffer = ByteBuffer.wrap(message.payload).asIntBuffer()
val intBuffer = ByteBuffer.wrap(message!!.payload).asIntBuffer()
while (intBuffer.hasRemaining()) {
yield(intBuffer.get())
}

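The coroutine changes in this file replace the pre-1.0 parent = parentJob launch parameter with structured concurrency: a SupervisorJob is mixed into the thread-pool context once, so every coroutine launched with that context becomes its child, and a single cancel() stops them all. A standalone sketch of the same scoping, assuming plain kotlinx.coroutines (the real class goes through the dataforge Context.launch extension):

    import kotlinx.coroutines.*

    fun main() = runBlocking {
        val parentJob = SupervisorJob()
        val dispatcher = newFixedThreadPoolContext(2, "Demo")
        // Everything launched with `pool` is a child of parentJob.
        val pool = dispatcher + parentJob
        val scope = CoroutineScope(pool)
        val reader = scope.launch { while (isActive) delay(100) }
        delay(300)
        parentJob.cancel() // cancels reader and any sibling jobs in one call
        reader.join()
        dispatcher.close()
    }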
View File

@@ -29,10 +29,11 @@ import inr.numass.data.analyzers.NumassAnalyzer
import inr.numass.data.analyzers.SimpleAnalyzer
import inr.numass.data.analyzers.withBinning
import inr.numass.data.api.NumassBlock
import inr.numass.data.channel
import kotlinx.coroutines.runBlocking
fun main(args: Array<String>) {
val client = DanteClient("192.168.111.120", 8)
val client = DanteClient(Global, "192.168.111.120", 8)
client.open()
val meta = buildMeta {
"gain" to 1.0

View File

@@ -15,13 +15,23 @@
*/
package inr.numass.control.magnet
import hep.dataforge.context.Context
import hep.dataforge.control.devices.AbstractDevice
import hep.dataforge.control.ports.Port
import hep.dataforge.control.ports.PortFactory
import hep.dataforge.description.ValueDef
import hep.dataforge.exceptions.ControlException
import hep.dataforge.exceptions.PortException
import hep.dataforge.meta.Meta
import hep.dataforge.meta.buildMeta
import hep.dataforge.states.StateDef
import hep.dataforge.states.StateDefs
import hep.dataforge.states.valueState
import hep.dataforge.utils.DateTimeUtils
import hep.dataforge.values.ValueType.*
import inr.numass.control.DeviceView
import inr.numass.control.magnet.fx.MagnetDisplay
import kotlinx.coroutines.runBlocking
import java.time.Duration
import java.time.Instant
import java.time.temporal.ChronoUnit

View File

@@ -16,6 +16,8 @@
package inr.numass.control.magnet.fx
import hep.dataforge.exceptions.PortException
import hep.dataforge.fx.asDoubleProperty
import hep.dataforge.states.ValueState
import inr.numass.control.DeviceDisplayFX
import inr.numass.control.magnet.LambdaMagnet
import javafx.application.Platform
@@ -24,6 +26,7 @@ import javafx.beans.value.ObservableValue
import javafx.scene.control.*
import javafx.scene.layout.AnchorPane
import javafx.scene.paint.Color
import tornadofx.*
/**
* FXML Controller class

View File

@@ -33,7 +33,7 @@ import hep.dataforge.meta.Meta
import hep.dataforge.states.StateDef
import hep.dataforge.states.StateDefs
import hep.dataforge.states.valueState
import hep.dataforge.storage.commons.StorageConnection
import hep.dataforge.storage.StorageConnection
import hep.dataforge.tables.TableFormatBuilder
import hep.dataforge.tables.ValuesListener
import hep.dataforge.useMeta

View File

@@ -4,11 +4,10 @@ import hep.dataforge.control.connections.DeviceConnection
import hep.dataforge.control.connections.Roles
import hep.dataforge.control.devices.Device
import hep.dataforge.nullable
import hep.dataforge.storage.MutableStorage
import hep.dataforge.storage.MutableTableLoader
import hep.dataforge.storage.Storage
import hep.dataforge.storage.StorageConnection
import hep.dataforge.storage.files.createTable
import hep.dataforge.storage.tables.MutableTableLoader
import hep.dataforge.storage.tables.createTable
import hep.dataforge.tables.TableFormat
import hep.dataforge.tables.ValuesListener
import hep.dataforge.utils.DateTimeUtils
@@ -29,11 +28,12 @@ class NumassStorageConnection(private val loaderName: String? = null, private va
val loaderName = "${loaderName ?: device.name}_$suffix"
device.forEachConnection(Roles.STORAGE_ROLE, StorageConnection::class.java) { connection ->
try {
connection.context.launch(Dispatchers.IO) {
//create a loader instance for each connected storage
val pl = loaderMap.getOrPut(connection.storage) {
(connection.storage as MutableStorage).createTable(loaderName, format)
connection.storage.createTable(loaderName, format)
}
connection.context.launch(Dispatchers.IO) {
pl.append(point)
}
} catch (ex: Exception) {

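With this change the connection keeps one loader per connected storage and performs both table creation and appends off the caller's thread, on Dispatchers.IO. A sketch of the pattern in isolation, assuming the same dataforge extensions imported above (class and property names here are illustrative):

    import hep.dataforge.context.launch
    import hep.dataforge.storage.Storage
    import hep.dataforge.storage.StorageConnection
    import hep.dataforge.storage.tables.MutableTableLoader
    import hep.dataforge.storage.tables.createTable
    import hep.dataforge.tables.TableFormat
    import hep.dataforge.values.Values
    import kotlinx.coroutines.Dispatchers

    // Sketch: cache one loader per storage, append asynchronously.
    class StoragePusher(private val name: String, private val format: TableFormat) {
        private val loaderMap = HashMap<Storage, MutableTableLoader>()

        fun push(connection: StorageConnection, point: Values) {
            connection.context.launch(Dispatchers.IO) {
                val loader = loaderMap.getOrPut(connection.storage) {
                    connection.storage.createTable(name, format)
                }
                loader.append(point)
            }
        }
    }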
View File

@@ -3,7 +3,7 @@ package inr.numass.control
import hep.dataforge.control.devices.AbstractDevice
import hep.dataforge.nullable
import hep.dataforge.storage.StorageConnection
import hep.dataforge.storage.TableLoader
import hep.dataforge.storage.tables.TableLoader
import hep.dataforge.values.Values
import kotlinx.coroutines.Dispatchers

View File

@@ -2,6 +2,7 @@ package inr.numass.control.readvac
import hep.dataforge.control.devices.Sensor
import hep.dataforge.control.devices.Sensor.Companion.RESULT_VALUE
import kotlinx.coroutines.runBlocking
import org.apache.commons.cli.DefaultParser
import org.apache.commons.cli.HelpFormatter
import org.apache.commons.cli.Options

View File

@@ -5,6 +5,10 @@
*/
package inr.numass.control.readvac
import hep.dataforge.connections.Connection
import hep.dataforge.connections.RoleDef
import hep.dataforge.context.Context
import hep.dataforge.context.launch
import hep.dataforge.control.collectors.RegularPointCollector
import hep.dataforge.control.connections.Roles
import hep.dataforge.control.devices.Device
@@ -12,9 +16,23 @@ import hep.dataforge.control.devices.DeviceHub
import hep.dataforge.control.devices.DeviceListener
import hep.dataforge.control.devices.PortSensor.Companion.CONNECTED_STATE
import hep.dataforge.control.devices.Sensor
import hep.dataforge.description.ValueDef
import hep.dataforge.exceptions.ControlException
import hep.dataforge.meta.Meta
import hep.dataforge.names.Name
import hep.dataforge.states.StateDef
import hep.dataforge.storage.StorageConnection
import hep.dataforge.storage.tables.TableLoader
import hep.dataforge.storage.tables.createTable
import hep.dataforge.tables.TableFormatBuilder
import hep.dataforge.utils.DateTimeUtils
import hep.dataforge.values.Value
import hep.dataforge.values.ValueMap
import hep.dataforge.values.ValueType
import hep.dataforge.values.Values
import inr.numass.control.DeviceView
import inr.numass.control.StorageHelper
import kotlinx.coroutines.time.delay
import java.time.Duration
import java.time.Instant
import java.util.*
@@ -77,7 +95,8 @@ class VacCollectorDevice(context: Context, meta: Meta, val sensors: Collection<S
val suffix = DateTimeUtils.fileSuffix()
return LoaderFactory.buildPointLoader(connection.storage, "vactms_$suffix", "", "timestamp", format.build())
return connection.storage.createTable("vactms_$suffix", format.build())
//LoaderFactory.buildPointLoader(connection.storage, "vactms_$suffix", "", "timestamp", format.build())
}
override fun connectAll(connection: Connection, vararg roles: String) {
@@ -92,7 +111,7 @@ class VacCollectorDevice(context: Context, meta: Meta, val sensors: Collection<S
private fun notifyResult(values: Values, timestamp: Instant = Instant.now()) {
super.notifyResult(values,timestamp)
super.notifyResult(values, timestamp)
helper.push(values)
}
@@ -112,7 +131,7 @@ class VacCollectorDevice(context: Context, meta: Meta, val sensors: Collection<S
while (true) {
notifyMeasurementState(MeasurementState.IN_PROGRESS)
sensors.forEach { sensor ->
if (sensor.states.getBoolean(CONNECTED_STATE,false)) {
if (sensor.states.getBoolean(CONNECTED_STATE, false)) {
sensor.measure()
}
}

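The measurement loop above polls only the sensors whose CONNECTED_STATE is set, then sleeps; note the kotlinx.coroutines.time.delay import added in this file, which accepts a java.time.Duration directly. A standalone sketch of that polling shape (function name and interval handling are illustrative):

    import hep.dataforge.control.devices.PortSensor.Companion.CONNECTED_STATE
    import hep.dataforge.control.devices.Sensor
    import kotlinx.coroutines.CoroutineScope
    import kotlinx.coroutines.isActive
    import kotlinx.coroutines.time.delay
    import java.time.Duration

    // Sketch: measure every connected sensor on a fixed interval.
    suspend fun CoroutineScope.pollSensors(sensors: Collection<Sensor>, interval: Duration) {
        while (isActive) {
            sensors.forEach { sensor ->
                if (sensor.states.getBoolean(CONNECTED_STATE, false)) {
                    sensor.measure()
                }
            }
            delay(interval)
        }
    }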
View File

@@ -1,50 +1,12 @@
buildscript {
repositories {
mavenCentral()
}
dependencies {
classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.3'
}
}
apply plugin: 'com.google.protobuf'
description = "A base package with minimal dependencies for numass"
dependencies {
compile project(":numass-core:numass-data-api")
compile project(":numass-core:numass-data-proto")
compile "hep.dataforge:dataforge-storage"
compile "hep.dataforge:dataforge-json"
compile 'com.google.protobuf:protobuf-java:3.5.0'
// https://mvnrepository.com/artifact/com.github.robtimus/sftp-fs
compile group: 'com.github.robtimus', name: 'sftp-fs', version: '1.1.3'
}
protobuf {
// Configure the protoc executable
protoc {
// Download from repositories
artifact = 'com.google.protobuf:protoc:3.5.0'
}
generatedFilesBaseDir = "$projectDir/gen"
}
compileKotlin {
dependsOn(':numass-core:generateProto')
}
sourceSets {
main.kotlin.srcDirs += 'gen/main/java'
}
clean {
delete protobuf.generatedFilesBaseDir
}
idea {
module {
sourceDirs += file("${protobuf.generatedFilesBaseDir}/main/java");
sourceDirs += file("src/main/proto")
}
}

View File

@@ -0,0 +1,20 @@
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
plugins {
idea
kotlin("jvm")
}
repositories {
mavenCentral()
}
dependencies {
compile(kotlin("stdlib-jdk8"))
compile("hep.dataforge:dataforge-core")
}
tasks.withType<KotlinCompile> {
kotlinOptions.jvmTarget = "1.8"
}

View File

@@ -1,6 +1,21 @@
/*
* Copyright 2018 Alexander Nozik.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package inr.numass.data.api
import inr.numass.data.channel
import java.io.Serializable
import java.time.Duration
import java.time.Instant
@@ -62,6 +77,8 @@ interface NumassBlock
* Stream of frames. Could be empty
*/
val frames: Stream<NumassFrame>
val channel: Int get() = 0
}
fun OrphanNumassEvent.adopt(parent: NumassBlock): NumassEvent {

View File

@@ -1,12 +1,24 @@
/*
* Copyright 2018 Alexander Nozik.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package inr.numass.data.api
import hep.dataforge.io.envelopes.Envelope
import hep.dataforge.meta.Metoid
import hep.dataforge.providers.Provider
import hep.dataforge.providers.Provides
import inr.numass.data.channel
import inr.numass.data.storage.ClassicNumassPoint
import inr.numass.data.storage.ProtoNumassPoint
import java.time.Duration
import java.time.Instant
import java.util.stream.Stream
@@ -119,13 +131,5 @@ interface NumassPoint : Metoid, ParentBlock, Provider {
const val LENGTH_KEY = "length"
const val HV_KEY = "voltage"
const val INDEX_KEY = "index"
fun read(envelope: Envelope): NumassPoint {
return if (envelope.dataType?.startsWith("numass.point.classic") ?: envelope.meta.hasValue("split")) {
ClassicNumassPoint(envelope)
} else {
ProtoNumassPoint.fromEnvelope(envelope)
}
}
}
}

View File

@@ -0,0 +1,48 @@
import com.google.protobuf.gradle.GenerateProtoTask
import com.google.protobuf.gradle.protobuf
import com.google.protobuf.gradle.protoc
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
plugins {
idea
kotlin("jvm")
id("com.google.protobuf") version "0.8.7"
}
repositories {
mavenCentral()
}
dependencies {
compile(kotlin("stdlib-jdk8"))
compile("com.google.protobuf:protobuf-java:3.6.1")
compile(project(":numass-core:numass-data-api"))
compile("hep.dataforge:dataforge-storage")
}
tasks.withType<KotlinCompile> {
kotlinOptions.jvmTarget = "1.8"
dependsOn(":numass-core:numass-data-proto:generateProto")
}
sourceSets {
create("proto") {
proto {
srcDir("src/main/proto")
}
}
}
protobuf {
// Configure the protoc executable
protoc {
// Download from repositories
artifact = "com.google.protobuf:protoc:3.6.1"
}
generatedFilesBaseDir = "$projectDir/gen"
}
//tasks.getByName("clean").doLast{
// delete(protobuf.protobuf.generatedFilesBaseDir)
//}

View File

@@ -1,4 +1,20 @@
package inr.numass
/*
* Copyright 2018 Alexander Nozik.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package inr.numass.data
import hep.dataforge.io.envelopes.*
import hep.dataforge.values.Value
@@ -105,7 +121,7 @@ class NumassEnvelopeType : EnvelopeType {
val buffer = it.map(FileChannel.MapMode.READ_ONLY, 0, 6)
when {
//TODO use templates from appropriate types
buffer.get(0) == '#'.toByte() && buffer.get(1) == '!'.toByte() -> NumassEnvelopeType.INSTANCE
buffer.get(0) == '#'.toByte() && buffer.get(1) == '!'.toByte() -> INSTANCE
buffer.get(0) == '#'.toByte() && buffer.get(1) == '!'.toByte() &&
buffer.get(4) == 'T'.toByte() && buffer.get(5) == 'L'.toByte() -> TaglessEnvelopeType.INSTANCE
buffer.get(0) == '#'.toByte() && buffer.get(1) == '~'.toByte() -> DefaultEnvelopeType.INSTANCE

View File

@@ -0,0 +1,52 @@
/*
* Copyright 2018 Alexander Nozik.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package inr.numass.data
import hep.dataforge.meta.Meta
import hep.dataforge.storage.files.MutableFileEnvelope
import java.nio.ByteBuffer
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardOpenOption
class NumassFileEnvelope(path: Path) : MutableFileEnvelope(path) {
private val tag by lazy { Files.newByteChannel(path, StandardOpenOption.READ).use { NumassEnvelopeType.LegacyTag().read(it) } }
override val dataOffset: Long by lazy { (tag.length + tag.metaSize).toLong() }
override var dataLength: Int
get() = tag.dataSize
set(value) {
if (value > Int.MAX_VALUE) {
throw RuntimeException("Too large data block")
}
tag.dataSize = value
if (channel.write(tag.toBytes(), 0L) < tag.length) {
throw error("Tag is not overwritten.")
}
}
override val meta: Meta by lazy {
val buffer = ByteBuffer.allocate(tag.metaSize).also {
channel.read(it, tag.length.toLong())
}
tag.metaType.reader.readBuffer(buffer)
}
}

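The relocated NumassFileEnvelope reads the legacy tag lazily, derives the data offset as tag.length + tag.metaSize, and parses the meta block with the reader named in the tag. A short usage sketch, assuming a legacy Numass file on disk (the path is illustrative):

    import inr.numass.data.NumassFileEnvelope
    import java.nio.file.Paths

    fun main() {
        val envelope = NumassFileEnvelope(Paths.get("scan_01.df"))
        println(envelope.meta) // read lazily via the legacy tag
    }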
View File

@@ -1,19 +1,34 @@
package inr.numass.data.storage
/*
* Copyright 2018 Alexander Nozik.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package inr.numass.data
import hep.dataforge.context.Context
import hep.dataforge.context.Global
import hep.dataforge.io.envelopes.Envelope
import hep.dataforge.meta.Meta
import inr.numass.data.NumassProto
import inr.numass.data.api.*
import inr.numass.data.dataStream
import inr.numass.data.legacy.NumassFileEnvelope
import org.slf4j.LoggerFactory
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.InputStream
import java.nio.file.Path
import java.time.Duration
import java.time.Instant
import java.util.stream.IntStream
import java.util.stream.Stream
import java.util.zip.Inflater
import kotlin.streams.toList
/**
@@ -61,17 +76,48 @@ class ProtoNumassPoint(override val meta: Meta, val protoBuilder: () -> NumassPr
return fromEnvelope(NumassFileEnvelope(path))
}
/**
* Get a valid data stream, decompressing it if compression is present
*/
private fun Envelope.dataStream(): InputStream = if (this.meta.getString("compression", "none") == "zlib") {
//TODO move to new type of data
val inflater = Inflater()
val array: ByteArray = with(data.buffer) {
if (hasArray()) {
array()
} else {
ByteArray(this.limit()).also {
this.position(0)
get(it)
}
}
}
inflater.setInput(array)
val bos = ByteArrayOutputStream()
val buffer = ByteArray(8192)
while (!inflater.finished()) {
val size = inflater.inflate(buffer)
bos.write(buffer, 0, size)
}
val unzippedData = bos.toByteArray()
inflater.end()
ByteArrayInputStream(unzippedData)
} else {
this.data.stream
}
fun fromEnvelope(envelope: Envelope): ProtoNumassPoint {
return ProtoNumassPoint(envelope.meta) {
envelope.dataStream.use {
envelope.dataStream().use {
NumassProto.Point.parseFrom(it)
}
}
}
fun readFile(path: String, context: Context = Global): ProtoNumassPoint {
return readFile(context.getFile(path).absolutePath)
}
// fun readFile(path: String, context: Context = Global): ProtoNumassPoint {
// return readFile(context.getFile(path).absolutePath)
// }
fun ofEpochNanos(nanos: Long): Instant {
val seconds = Math.floorDiv(nanos, 1e9.toInt().toLong())
@@ -81,7 +127,7 @@ class ProtoNumassPoint(override val meta: Meta, val protoBuilder: () -> NumassPr
}
}
class ProtoBlock(val channel: Int, private val block: NumassProto.Point.Channel.Block, val parent: NumassPoint? = null) : NumassBlock {
class ProtoBlock(override val channel: Int, private val block: NumassProto.Point.Channel.Block, val parent: NumassPoint? = null) : NumassBlock {
override val startTime: Instant
get() = ProtoNumassPoint.ofEpochNanos(block.time)

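The dataStream() helper added in this file inflates the whole payload into memory with a manual loop; an equivalent, arguably more compact form could lean on java.util.zip.InflaterInputStream. A sketch under the same Envelope API assumptions (the helper name is changed to avoid clashing with the original):

    import hep.dataforge.io.envelopes.Envelope
    import java.io.ByteArrayInputStream
    import java.io.InputStream
    import java.util.zip.InflaterInputStream

    // Sketch: same zlib check as dataStream(), but the inflate loop is
    // delegated to InflaterInputStream.
    fun Envelope.decompressedStream(): InputStream =
        if (meta.getString("compression", "none") == "zlib") {
            val buffer = data.buffer
            val bytes = ByteArray(buffer.limit()).also {
                buffer.position(0)
                buffer.get(it)
            }
            InflaterInputStream(ByteArrayInputStream(bytes))
        } else {
            data.stream
        }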
View File

@@ -0,0 +1,33 @@
syntax = "proto3";
package inr.numass.data;
message Point {
// A single channel for multichannel detector readout
message Channel {
//A continuous measurement block
message Block {
// Raw data frame
message Frame {
uint64 time = 1; // Time in nanos from the beginning of the block
bytes data = 2; // Frame data as an array of int16 measured in arbitrary channels
}
// Event block obtained directly from the device or from frame analysis
// In order to save space, times and amplitudes are in separate arrays.
// Amplitude and time with the same index correspond to the same event
message Events {
repeated uint64 times = 1; // Array of time in nanos from the beginning of the block
repeated uint64 amplitudes = 2; // Array of amplitudes of events in channels
}
uint64 time = 1; // Block start in epoch nanos
repeated Frame frames = 2; // Frames array
Events events = 3; // Events array
uint64 length = 4; // block size in nanos. If missing, take from meta.
uint64 bin_size = 5; // tick size in nanos. Obsolete, to be removed
}
uint64 id = 1; // The number of measuring channel
repeated Block blocks = 2; // Blocks
}
repeated Channel channels = 1; // Array of measuring channels
}

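The message layout above nests Point → Channel → Block, with events stored as parallel time and amplitude arrays. A hedged sketch of walking it through the generated protobuf-java bindings (accessor names follow the standard proto3 Java conventions; the input path is illustrative):

    import inr.numass.data.NumassProto
    import java.nio.file.Files
    import java.nio.file.Paths

    // Sketch: print every stored event of every channel and block.
    fun main() {
        val point = Files.newInputStream(Paths.get("point.proto")).use {
            NumassProto.Point.parseFrom(it)
        }
        for (channel in point.channelsList) {
            for (block in channel.blocksList) {
                val events = block.events
                for (i in 0 until events.timesCount) {
                    println("ch=${channel.id} t=${events.getTimes(i)} amp=${events.getAmplitudes(i)}")
                }
            }
        }
    }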
View File

@@ -1,15 +1,27 @@
/*
* Copyright 2018 Alexander Nozik.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package inr.numass.data
import hep.dataforge.io.envelopes.Envelope
import hep.dataforge.meta.Meta
import hep.dataforge.meta.MetaBuilder
import inr.numass.data.api.*
import inr.numass.data.storage.ProtoBlock
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.InputStream
import inr.numass.data.storage.ClassicNumassPoint
import java.util.stream.Collectors
import java.util.zip.Inflater
import kotlin.streams.asSequence
import kotlin.streams.toList
@@ -42,47 +54,16 @@ object NumassDataUtils {
fun adapter(): SpectrumAdapter {
return SpectrumAdapter("Uset", "CR", "CRerr", "Time")
}
fun read(envelope: Envelope): NumassPoint {
return if (envelope.dataType?.startsWith("numass.point.classic") ?: envelope.meta.hasValue("split")) {
ClassicNumassPoint(envelope)
} else {
ProtoNumassPoint.fromEnvelope(envelope)
}
}
}
/**
* Get a valid data stream, decompressing it if compression is present
*/
val Envelope.dataStream: InputStream
get() = if (this.meta.getString("compression", "none") == "zlib") {
//TODO move to new type of data
val inflater = Inflater()
val array: ByteArray = with(data.buffer) {
if (hasArray()) {
array()
} else {
ByteArray(this.limit()).also {
this.position(0)
get(it)
}
}
}
inflater.setInput(array)
val bos = ByteArrayOutputStream()
val buffer = ByteArray(8192)
while (!inflater.finished()) {
val size = inflater.inflate(buffer)
bos.write(buffer, 0, size)
}
val unzippedData = bos.toByteArray()
inflater.end()
ByteArrayInputStream(unzippedData)
} else {
this.data.stream
}
val NumassBlock.channel: Int
get() = if (this is ProtoBlock) {
this.channel
} else {
0
}
suspend fun NumassBlock.transformChain(transform: (NumassEvent, NumassEvent) -> Pair<Short, Long>?): NumassBlock {
return SimpleBlock.produce(this.startTime, this.length) {
this.events.asSequence()

View File

@@ -1,70 +0,0 @@
package inr.numass.data.legacy
import hep.dataforge.meta.Meta
import hep.dataforge.storage.files.MutableFileEnvelope
import inr.numass.NumassEnvelopeType
import java.nio.ByteBuffer
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardOpenOption
class NumassFileEnvelope(path: Path) : MutableFileEnvelope(path) {
private val tag by lazy { Files.newByteChannel(path, StandardOpenOption.READ).use { NumassEnvelopeType.LegacyTag().read(it) } }
override val dataOffset: Long by lazy { (tag.length + tag.metaSize).toLong() }
override var dataLength: Int
get() = tag.dataSize
set(value) {
if (value > Int.MAX_VALUE) {
throw RuntimeException("Too large data block")
}
tag.dataSize = value
if (channel.write(tag.toBytes(), 0L) < tag.length) {
throw error("Tag is not overwritten.")
}
}
override val meta: Meta by lazy {
val buffer = ByteBuffer.allocate(tag.metaSize).also {
channel.read(it, tag.length.toLong())
}
tag.metaType.reader.readBuffer(buffer)
}
}
//
// override fun buildTag(): EnvelopeTag {
// return NumassEnvelopeType.LegacyTag()
// }
//
// companion object {
//
// val LEGACY_START_SEQUENCE = byteArrayOf('#'.toByte(), '!'.toByte())
// val LEGACY_END_SEQUENCE = byteArrayOf('!'.toByte(), '#'.toByte(), '\r'.toByte(), '\n'.toByte())
//
// fun open(path: Path, readOnly: Boolean): MutableFileEnvelope {
// // if (!Files.exists(path)) {
// // throw new RuntimeException("File envelope does not exist");
// // }
//
// try {
// FileChannel.open(path, READ).use { channel ->
// val buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, 2)
// return if (buffer.compareTo(ByteBuffer.wrap(LEGACY_START_SEQUENCE)) == 0) {
// NumassFileEnvelope(path, readOnly)
// } else {
// MutableFileEnvelope.open(path, readOnly)
// }
// }
// } catch (e: IOException) {
// throw RuntimeException("Failed to open file envelope", e)
// }
//
// }
// }

View File

@@ -2,11 +2,11 @@ package inr.numass.data.storage
import hep.dataforge.io.envelopes.Envelope
import hep.dataforge.meta.Meta
import inr.numass.data.NumassFileEnvelope
import inr.numass.data.api.NumassBlock
import inr.numass.data.api.NumassEvent
import inr.numass.data.api.NumassFrame
import inr.numass.data.api.NumassPoint
import inr.numass.data.legacy.NumassFileEnvelope
import org.slf4j.LoggerFactory
import java.io.IOException
import java.nio.ByteBuffer

View File

@@ -25,7 +25,8 @@ import hep.dataforge.storage.Loader
import hep.dataforge.storage.StorageElement
import hep.dataforge.storage.files.FileStorageElement
import hep.dataforge.tables.Table
import inr.numass.NumassEnvelopeType
import inr.numass.data.NumassDataUtils
import inr.numass.data.NumassEnvelopeType
import inr.numass.data.api.NumassPoint
import inr.numass.data.api.NumassSet
import org.slf4j.LoggerFactory
@@ -93,7 +94,7 @@ class NumassDataLoader(
override val points: List<NumassPoint>
get() = pointEnvelopes.map {
NumassPoint.read(it)
NumassDataUtils.read(it)
}

View File

@@ -23,7 +23,7 @@ import hep.dataforge.meta.Meta
import hep.dataforge.storage.StorageElement
import hep.dataforge.storage.files.FileStorage
import hep.dataforge.storage.files.FileStorageElement
import inr.numass.NumassEnvelopeType
import inr.numass.data.NumassEnvelopeType
import kotlinx.coroutines.runBlocking
import java.nio.file.Files
import java.nio.file.Path

View File

@@ -1 +1 @@
inr.numass.NumassEnvelopeType
inr.numass.data.NumassEnvelopeType

View File

@@ -21,10 +21,10 @@ import hep.dataforge.description.Descriptors
import hep.dataforge.meta.buildMeta
import hep.dataforge.plots.data.DataPlot
import inr.numass.NumassPlugin
import inr.numass.data.ProtoNumassPoint
import inr.numass.data.analyzers.NumassAnalyzer.Companion.AMPLITUDE_ADAPTER
import inr.numass.data.analyzers.SmartAnalyzer
import inr.numass.data.analyzers.withBinning
import inr.numass.data.storage.ProtoNumassPoint
import inr.numass.displayChart
import java.nio.file.Paths

View File

@@ -12,7 +12,6 @@ import inr.numass.data.analyzers.TimeAnalyzer
import inr.numass.data.api.NumassPoint
import inr.numass.data.api.NumassSet
import inr.numass.data.api.SimpleNumassPoint
import inr.numass.data.channel
import inr.numass.data.storage.NumassDirectory
fun main(args: Array<String>) {

View File

@@ -1,8 +1,7 @@
package inr.numass.scripts.tristan
import inr.numass.data.channel
import inr.numass.data.ProtoNumassPoint
import inr.numass.data.plotAmplitudeSpectrum
import inr.numass.data.storage.ProtoNumassPoint
import inr.numass.data.transformChain
import kotlinx.coroutines.runBlocking
import java.io.File

View File

@@ -20,7 +20,6 @@ import hep.dataforge.context.Global
import hep.dataforge.storage.files.FileStorage
import hep.dataforge.toList
import inr.numass.data.api.NumassPoint
import inr.numass.data.channel
import inr.numass.data.storage.NumassDataLoader
import inr.numass.data.storage.NumassDirectory

View File

@@ -1,11 +1,10 @@
package inr.numass.scripts.tristan
import inr.numass.data.ProtoNumassPoint
import inr.numass.data.api.MetaBlock
import inr.numass.data.api.NumassBlock
import inr.numass.data.api.NumassPoint
import inr.numass.data.channel
import inr.numass.data.plotAmplitudeSpectrum
import inr.numass.data.storage.ProtoNumassPoint
import java.io.File

View File

@@ -8,8 +8,8 @@ import hep.dataforge.fx.runGoal
import hep.dataforge.fx.ui
import hep.dataforge.storage.Storage
import inr.numass.NumassProperties
import inr.numass.data.api.NumassPoint
import inr.numass.data.legacy.NumassFileEnvelope
import inr.numass.data.NumassDataUtils
import inr.numass.data.NumassFileEnvelope
import inr.numass.data.storage.NumassDataLoader
import inr.numass.data.storage.NumassDirectory
import javafx.beans.property.SimpleObjectProperty
@@ -183,7 +183,7 @@ class MainView(val context: Context = Global.getContext("viewer")) : View(title
envelope?.let {
if (it.meta.hasMeta("external_meta")) {
//try to read as point
val point = NumassPoint.read(it)
val point = NumassDataUtils.read(it)
runLater {
contentView = AmplitudeView().apply {
set(path.fileName.toString(), CachedPoint(point))

View File

@@ -8,7 +8,8 @@ import hep.dataforge.fx.ui
import hep.dataforge.plots.PlotGroup
import hep.dataforge.plots.data.DataPlot
import hep.dataforge.plots.jfreechart.JFreeChartFrame
import hep.dataforge.storage.TableLoader
import hep.dataforge.storage.tables.TableLoader
import hep.dataforge.storage.tables.asTable
import hep.dataforge.tables.Adapters
import hep.dataforge.tables.Table
import javafx.collections.FXCollections
@@ -68,7 +69,7 @@ class SlowControlView : View(title = "Numass slow control view", icon = ImageVie
private suspend fun getData(loader: TableLoader): Table {
//TODO add query
return loader.asTable()
return loader.asTable().await()
}
operator fun set(id: String, loader: TableLoader) {

View File

@@ -6,8 +6,8 @@ import hep.dataforge.meta.Meta
import hep.dataforge.meta.Metoid
import hep.dataforge.names.AlphanumComparator
import hep.dataforge.storage.Storage
import hep.dataforge.storage.TableLoader
import hep.dataforge.storage.files.FileTableLoader
import hep.dataforge.storage.tables.TableLoader
import inr.numass.data.api.NumassPoint
import inr.numass.data.api.NumassSet
import inr.numass.data.storage.NumassDataLoader

View File

@@ -11,10 +11,15 @@ include ":numass-control:dante"
include ":numass-main"
//
include ":numass-core"
include 'numass-core:numass-data-api'
include 'numass-core:numass-data-proto'
//include ":numass-server"
//
include ":numass-viewer"
if(file("../dataforge").exists()){
if (file("../dataforge").exists()) {
includeBuild('../dataforge')
}