Meta viewer and other minor fixes
This commit is contained in:
parent cb2969a337
commit 32b9a44a7f
@@ -16,21 +16,39 @@

package inr.numass.control.dante

-import kotlinx.coroutines.experimental.Job
-import kotlinx.coroutines.experimental.channels.Channel
-import kotlinx.coroutines.experimental.channels.ReceiveChannel
-import kotlinx.coroutines.experimental.channels.produce
-import java.io.InputStream
import kotlin.coroutines.experimental.buildSequence

+internal val Byte.positive
+    get() = toInt() and 0xFF
+
+internal val Int.positive
+    get() = toLong() and 0xFFFFFFFF
+
+internal val Int.byte: Byte
+    get() {
+        if (this >= 256) {
+            throw RuntimeException("Number less than 256 is expected")
+        } else {
+            return toByte()
+        }
+    }
+
+internal val Long.int: Int
+    get() {
+        if (this >= 0xFFFFFFFF) {
+            throw RuntimeException("Number less than 0xFFFFFFFF is expected")
+        } else {
+            return toInt()
+        }
+    }
+
+internal val ByteArray.hex
+    get() = this.joinToString(separator = "") { it.positive.toString(16).padStart(2, '0') }

object Communications {

    val PACKET_HEADER_START_BYTES = arrayOf(0xAA, 0xEE)

-    private val Byte.positive
-        get() = toInt() and 0xFF

    enum class Register(val code: Int) {
        FIRMWARE_VERSION(0),
        DPP_REGISTER_1(1),
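Note: the extension properties added above exist because Kotlin's Byte and Int are signed, so raw protocol bytes such as 0xEE read back negative unless they are widened and masked. A minimal standalone sketch of how they behave (the two extensions are restated here only so the snippet compiles on its own; in the commit they live in Communications.kt as shown above):

    // Unsigned views of signed Kotlin numbers, mirroring the extensions in the hunk above
    internal val Byte.positive get() = toInt() and 0xFF
    internal val ByteArray.hex
        get() = joinToString(separator = "") { it.positive.toString(16).padStart(2, '0') }

    fun main(args: Array<String>) {
        val raw = byteArrayOf(0xAA.toByte(), 0xEE.toByte(), 0x03, 0x00)
        println(raw[1].toInt())  // -18: sign-extended value of the byte 0xEE
        println(raw[1].positive) // 238: unsigned value used by the protocol code
        println(raw.hex)         // "aaee0300": hex dump of the whole array, as used for logging
    }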
@@ -61,14 +79,16 @@ object Communications {
        SINGLE_SPECTRUM_MODE(2),
        MAP_MODE(6),
        LIST_MODE(4),
-        WAVEFORM_MODE(3)
+        WAVEFORM_MODE(3);

+        val long = byte.toLong()
    }

    /**
     * Build command header
     */
-    fun buildHeader(command: CommandType, board: Byte, packet: Byte, start: Register, length: Byte): ByteArray {
+    fun buildHeader(command: CommandType, board: Byte, packet: Byte, start: Byte, length: Byte): ByteArray {
        assert(command in listOf(CommandType.READ, CommandType.WRITE))
        assert(board in 0..255)
        assert(packet in 0..255)
@@ -79,7 +99,7 @@ object Communications {
        header[2] = command.byte
        header[3] = board
        header[4] = packet
-        header[5] = start.code.toByte()
+        header[5] = start
        header[6] = length
        return header
    }
@@ -101,49 +121,53 @@ object Communications {

    /**
     * Create DANTE command and stuff it.
+     * @param length size of data array/4
     */
-    fun wrapCommand(command: CommandType, board: Byte, packet: Byte, start: Register, data: ByteArray): ByteArray {
+    fun wrapCommand(command: CommandType, board: Int, packet: Int, start: Int, length: Int, data: ByteArray): ByteArray {
-        when (command) {
-            CommandType.READ -> assert(data.isEmpty())
-            CommandType.WRITE -> assert(data.size % 4 == 0)
-            else -> throw RuntimeException("Command $command not expected")
+        if (command == CommandType.READ) {
+            assert(data.isEmpty())
+        } else {
+            assert(data.size % 4 == 0)
+            assert(length == data.size / 4)
        }

-        val length: Byte = (data.size / 4).toByte()
-        val header = buildHeader(command, board, packet, start, length)
+        val header = buildHeader(command, board.byte, packet.byte, start.byte, length.byte)

        val res = (header + data).escape()
        return byteArrayOf(0xdd.toByte(), 0xaa.toByte()) + res + byteArrayOf(0xdd.toByte(), 0x55.toByte())
    }

-    data class DanteMessage(val command: CommandType, val board: Byte, val packet: Byte, val payload: ByteArray) {
+    data class DanteMessage(val command: CommandType, val board: Int, val packet: Int, val payload: ByteArray) {
        override fun toString(): String {
            return "${command.name}[$board, $packet]: ${payload.size}"
        }
    }

-    /**
-     * Read the stream and return resulting messages in ReceiveChannel
-     */
-    fun readStream(stream: InputStream, parent: Job): ReceiveChannel<DanteMessage> {
-        return produce(capacity = Channel.UNLIMITED, parent = parent) {
-            while (true) {
-                val first = stream.read()
-                if (stream.read() == PACKET_HEADER_START_BYTES[0] && stream.read() == PACKET_HEADER_START_BYTES[1]) {
-                    // second check is not executed unless first one is false
-                    val header = ByteArray(6)
-                    stream.read(header)
-                    val command = CommandType.values().find { it.byte == header[0] }!!
-                    val board = header[1]
-                    val packet = header[2]
-                    val length = header[3].positive * 0x100 + header[4].positive * 0x010 + header[5].positive
-                    val payload = ByteArray(length)
-                    stream.read(payload)
-                    send(DanteMessage(command, board, packet, payload))
-                }
-            }
-        }
-    }
+//    /**
+//     * Read the stream and return resulting messages in ReceiveChannel
+//     */
+//    fun readStream(stream: InputStream, parent: Job): ReceiveChannel<DanteMessage> {
+//        return produce(capacity = Channel.UNLIMITED, parent = parent) {
+//            while (true) {
+//                if (stream.read() == PACKET_HEADER_START_BYTES[0] && stream.read() == PACKET_HEADER_START_BYTES[1]) {
+//                    // second check is not executed unless first one is true
+//                    val header = ByteArray(6)
+//                    stream.read(header)
+//                    val command = CommandType.values().find { it.byte == header[0] }!!
+//                    val board = header[1]
+//                    val packet = header[2]
+//                    val length = header[3].positive * 0x100 + header[4].positive * 0x010 + header[5].positive
+//                    val payload = ByteArray(length)
+//                    stream.read(payload)
+//                    send(DanteMessage(command, board.positive, packet.positive, payload))
+//                }
+//            }
+//        }
+//    }
+
+//    fun readSocket(socket: Socket, parent: Job): ReceiveChannel<DanteMessage> {
+//        return readStream(socket.getInputStream(), parent)
+//    }
}
/*

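Note: for orientation, the frame produced by buildHeader and wrapCommand above can be sketched as below. This is an illustrative sketch only: header bytes 0 and 1 are assumed to carry the 0xAA 0xEE start marker (the hunks only show indices 2 to 6), and the escape() step applied by the real code is not shown in this diff, so it is skipped here.

    // Sketch of the DANTE command frame: DD AA | escaped(7-byte header + data) | DD 55
    fun frameSketch(command: Byte, board: Byte, packet: Byte, register: Byte, data: ByteArray): ByteArray {
        val header = ByteArray(7)
        header[0] = 0xAA.toByte()             // assumed start byte (not shown in the diff)
        header[1] = 0xEE.toByte()             // assumed start byte (not shown in the diff)
        header[2] = command                   // command code (READ, WRITE, ...)
        header[3] = board                     // board number in the chain
        header[4] = packet                    // packet counter used to match replies
        header[5] = register                  // start register
        header[6] = (data.size / 4).toByte()  // payload length in 32-bit words
        val body = header + data              // the real code also runs escape() here
        return byteArrayOf(0xDD.toByte(), 0xAA.toByte()) + body + byteArrayOf(0xDD.toByte(), 0x55.toByte())
    }

    fun main(args: Array<String>) {
        val frame = frameSketch(command = 0, board = 0, packet = 1, register = 0, data = ByteArray(0))
        println(frame.joinToString(" ") { String.format("%02X", it) }) // DD AA AA EE 00 00 01 00 00 DD 55
    }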
@@ -16,40 +16,99 @@

package inr.numass.control.dante

+import hep.dataforge.kodex.buildMeta
+import hep.dataforge.kodex.orElse
+import hep.dataforge.meta.Meta
import inr.numass.control.dante.Communications.CommandType.*
-import kotlinx.coroutines.experimental.CancellationException
-import kotlinx.coroutines.experimental.Job
+import inr.numass.control.dante.Communications.Register.*
+import inr.numass.data.NumassProto
+import inr.numass.data.api.NumassPoint
+import inr.numass.data.storage.ProtoNumassPoint
+import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.Channel
-import kotlinx.coroutines.experimental.launch
-import kotlinx.coroutines.experimental.runBlocking
import org.slf4j.LoggerFactory
import java.io.OutputStream
+import java.lang.Math.pow
import java.net.Socket
+import java.nio.ByteBuffer
+import java.nio.ByteOrder
+import java.time.Instant
+import java.util.*
import java.util.concurrent.atomic.AtomicLong
+import kotlin.collections.HashMap
+import kotlin.coroutines.experimental.buildSequence
+import kotlin.math.ceil

//TODO implement using Device
-class DanteClient(val ip: String) : AutoCloseable {
-    private val RESET_COMMAND = arrayOf(0xDD, 0x55, 0xDD, 0xEE)
+class DanteClient(val ip: String, chainLength: Int) : AutoCloseable {
+    private val RESET_COMMAND = byteArrayOf(0xDD.toByte(), 0x55, 0xDD.toByte(), 0xEE.toByte())

    private val logger = LoggerFactory.getLogger(javaClass)

    private val packetNumber = AtomicLong(0)

-    private var parentJob = Job()
+    private lateinit var parentJob: Job
+    private val pool = newFixedThreadPoolContext(8, "Dante")

    private val connections: MutableMap<Int, Pair<Socket, Job>> = HashMap()

-    private val commandChannel = Channel<ByteArray>(capacity = Channel.UNLIMITED)
+    private val sendChannel = Channel<ByteArray>(capacity = Channel.UNLIMITED)
    private lateinit var output: OutputStream
    private lateinit var outputJob: Job

+    /**
+     * Synchronous reading and writing of registers
+     */
+    private val comChannel = Channel<Communications.DanteMessage>(capacity = Channel.UNLIMITED)
+    private val dataChannel = Channel<Communications.DanteMessage>(capacity = Channel.UNLIMITED)
+    //private val statisticsChannel = Channel<Communications.DanteMessage>(capacity = Channel.UNLIMITED)

+    /**
+     * @param num the number of the board in the chain
+     * @param meta configuration meta of the board, if any
+     */
+    private data class BoardState(val num: Int, var meta: Meta? = null)
+
+    private val boards = (0 until chainLength).map { BoardState(it) }

    fun open() {
+        //create a new parent job
+        this.parentJob = Job()
        (0..3).forEach {
            openPort(it)
        }
    }

+    override fun close() {
+        //TODO send termination signal
+        connections.values.forEach {
+            it.first.close()
+            it.second.cancel()
+        }
+        parentJob.cancel(CancellationException("Server stopped"))
+    }
+
+    /**
+     * Disconnect and reconnect without clearing configuration
+     */
+    fun reconnect() {
+        close()
+        runBlocking {
+            clearData()
+        }
+        open()
+    }
+
+    /**
+     * Reset everything
+     */
+    fun reset() {
+        async {
+            sendChannel.send(RESET_COMMAND)
+        }
+    }

    private fun openPort(port: Int) {
        //closing existing connection
        connections[port]?.let {
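Note: the client declares three channels above: sendChannel carries ready-to-send command frames, comChannel carries READ/WRITE replies for the synchronous register helpers, and dataChannel carries acquisition payloads. The dispatch itself is implemented by handle() later in this diff; a condensed sketch of that routing rule, with hypothetical names, purely to summarize the design:

    // Sketch of the message routing performed by DanteClient.handle():
    // register replies go to the command channel, acquisition payloads to the data channel.
    enum class SketchCommand { READ, WRITE, SINGLE_SPECTRUM_MODE, MAP_MODE, LIST_MODE, WAVEFORM_MODE }

    fun routeSketch(command: SketchCommand): String = when (command) {
        SketchCommand.READ, SketchCommand.WRITE -> "comChannel"
        SketchCommand.SINGLE_SPECTRUM_MODE, SketchCommand.MAP_MODE,
        SketchCommand.LIST_MODE, SketchCommand.WAVEFORM_MODE -> "dataChannel"
    }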
@@ -59,102 +118,315 @@ class DanteClient(val ip: String) : AutoCloseable {

        val socket = Socket(ip, 8000 + port)

+        logger.info("Opened socket {}", socket.inetAddress)
+
        //Create command queue on port 0
        if (port == 0) {
            outputJob.cancel()
            output = socket.getOutputStream()
-            outputJob = launch(parent = parentJob) {
-                output.write(commandChannel.receive())
+            outputJob = launch(context = pool, parent = parentJob) {
+                val command = sendChannel.receive()
+                output.write(command)
+                logger.debug("Sent {}", command.hex)
            }
        }

-        val channel = Communications.readStream(socket.getInputStream(), parentJob)

-        val job = launch {
+        val job = launch(context = pool, parent = parentJob) {
+            val stream = socket.getInputStream()
            while (true) {
-                handle(channel.receive())
-                //TODO handle errors and reconnect
+                if (stream.read() == Communications.PACKET_HEADER_START_BYTES[0] && stream.read() == Communications.PACKET_HEADER_START_BYTES[1]) {
+                    // second check is not executed unless first one is true
+                    val header = ByteArray(6)
+                    stream.read(header)
+                    val command = Communications.CommandType.values().find { it.byte == header[0] }!!
+                    val board = header[1]
+                    val packet = header[2]
+                    val length = header[3].positive * 0x100 + header[4].positive * 0x010 + header[5].positive
+                    val payload = ByteArray(length)
+                    stream.read(payload)
+                    handle(Communications.DanteMessage(command, board.positive, packet.positive, payload))
                }
            }
+            //TODO handle errors and reconnect
+        }

        connections[port] = Pair(socket, job)
    }

-    fun send(command: ByteArray) {
-        runBlocking {
-            commandChannel.send(command)
-        }
+    private suspend fun send(command: Communications.CommandType, board: Int, packet: Int, register: Int, data: ByteArray = ByteArray(0), length: Int = (data.size / 4)) {
+        logger.debug("Sending {}[{}, {}] of size {}*4 to {}", command.name, board, packet, length, register)
+        sendChannel.send(Communications.wrapCommand(command, board, packet, register, length, data))
    }

-    fun send(command: Communications.CommandType, board: Byte, packet: Byte, start: Communications.Register, data: ByteArray) {
-        send(Communications.wrapCommand(command, board, packet, start, data))
-    }
-
-    suspend fun handle(response: Communications.DanteMessage) {
+    private suspend fun handle(response: Communications.DanteMessage) {
        logger.debug("Received {}", response.toString())
        when (response.command) {
-            READ -> TODO()
-            WRITE -> TODO()
-            SINGLE_SPECTRUM_MODE -> TODO()
-            MAP_MODE -> TODO()
-            LIST_MODE -> TODO()
-            WAVEFORM_MODE -> TODO()
+            READ, WRITE -> comChannel.send(response)
+            SINGLE_SPECTRUM_MODE, MAP_MODE, LIST_MODE, WAVEFORM_MODE -> dataChannel.send(response)
        }
    }

-    /*
-    async def __write_config(self, board, register, data, mask=None):
-        assert isinstance(data, (bytes, bytearray))
-        data_len = len(data)
-        if mask:
-            assert isinstance(mask, (bytes, bytearray))
-            assert len(mask) == data_len
-            resp = await self.__send_message(
-                COMMANDS['READ'], board, register, data_len // 4)
-
-            data_bin = bitarray(endian='big')
-            data_bin.frombytes(data)
-
-            resp_payload = bitarray(endian='big')
-            resp_payload.frombytes(resp['payload'])
-
-            mask_bin = bitarray(endian='big')
-            mask_bin.frombytes(mask)
-
-            data_masked = data_bin & mask_bin
-
-            mask_bin.invert()
-
-            data_out = (data_masked | (resp_payload & mask_bin)).tobytes()
-        else:
-            data_out = data
-
-        return await self.__send_message(
-            COMMANDS['WRITE'], board, register, data_len // 4, data_out)
-    */
+    /**
+     * Generate next packet number
+     */
+    private fun nextPacket(): Int {
+        return (packetNumber.incrementAndGet() % 256 - 128).toInt()
+    }
+
+    private fun List<Long>.asBuffer(): ByteBuffer {
+        val buffer = ByteBuffer.allocate(this.size * 4)
+        buffer.order(ByteOrder.BIG_ENDIAN)
+        this.forEach { buffer.putInt(it.int) } //safe transform to 4-byte integer
+        return buffer
+    }
+
+    private val ByteBuffer.bits: BitSet
+        get() = BitSet.valueOf(this)
+
+    /**
+     * Write single or multiple registers
+     *
+     */
+    private suspend fun writeRegister(board: Int, register: Int, data: List<Long>, mask: Long? = null) {
+        //representing data as byte buffer
+        val buffer = if (mask != null) {
+            val oldData = readRegister(board, register, data.size)
+            //(oldData & not_mask) | (newData & mask);
+            (0 until data.size).map { (oldData[it] and mask.inv()).or(data[it] and mask) }.asBuffer()
+        } else {
+            data.asBuffer()
+        }
+
+        send(WRITE, board, nextPacket(), register, buffer.array())
+    }
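Note: the masked branch of writeRegister above is a read-modify-write: bits selected by the mask come from the new value and every other bit keeps the current register content. A standalone sketch of that merge rule:

    // Merge rule used by writeRegister when a mask is supplied:
    // (old and mask.inv()) keeps the unmasked bits, (new and mask) injects the new ones.
    fun maskedMerge(old: Long, new: Long, mask: Long): Long = (old and mask.inv()) or (new and mask)

    fun main(args: Array<String>) {
        val merged = maskedMerge(old = 0x12345678L, new = 0x0000ABCDL, mask = 0x0000FFFFL)
        println(java.lang.Long.toHexString(merged)) // 1234abcd
    }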

+    private suspend fun writeRegister(board: Int, register: Int, data: Long, mask: Long? = null) {
+        writeRegister(board, register, listOf(data), mask)
+    }
+
+    private suspend fun readRegister(board: Int, register: Int, length: Int = 1): List<Long> {
+        val packet = nextPacket()
+        send(READ, board, packet, register, length = length)
+        var message: Communications.DanteMessage? = null
+        //skip other responses
+        while (message == null || !(message.command == READ && message.packet == packet)) {
+            message = comChannel.receive()
+        }
+
+        return buildSequence {
+            val intBuffer = ByteBuffer.wrap(message.payload).asIntBuffer()
+            while (intBuffer.hasRemaining()) {
+                yield(intBuffer.get())
+            }
+        }.map { it.positive }.toList()
+    }
+
+    /**
+     * Write a single DPP parameter
+     * @param register number of parameter register
+     * @param value value of the parameter as unsigned integer
+     */
+    suspend fun writeDPP(board: Int, register: Int, value: Long) {
+        //sending register number and value in the same command
+        writeRegister(board, DPP_REGISTER_1.code, listOf(register.toLong(), value))
+        writeRegister(board, DPP_CONFIG_COMMIT_OFFSET.code, 0x00000001, 0x00000001)
+    }
+
+//    suspend fun writeDPP(board: Int, register: Int, value: Int) {
+//        writeDPP(board, register, value.toLong())
+//    }
+
+    /**
+     * Configure single board using provided meta
+     */
+    suspend fun configureBoard(board: Int, meta: Meta) {
+        val gain = meta.getDouble("gain")
+        val det_thresh = meta.getInt("detection_thresold")
+        val pileup_thr = meta.getInt("pileup_thresold")
+        val en_fil_peak_time = meta.getInt("energy_filter.peaking_time")
+        val en_fil_flattop = meta.getInt("energy_filter.flat_top")
+        val fast_peak_time = meta.getInt("fast_filter.peaking_time")
+        val fast_flattop = meta.getInt("fast_filter.flat_top")
+        val recovery_time = meta.getValue("recovery_time").longValue()
+        val zero_peak_rate = meta.getInt("zero_peak_rate")
+        val inverted_input = meta.getInt("inverted_input")
+
+        assert(en_fil_peak_time in 1..511)
+        assert(gain in 0.01..(en_fil_peak_time * 2 - 0.01))
+        assert(det_thresh in 0..4096)
+        assert(pileup_thr in 0..4096)
+        assert(en_fil_flattop in 1..15)
+        assert(fast_peak_time in 1..31)
+        assert(fast_flattop in 1..31)
+        assert(recovery_time in (0.0..pow(2.0, 24.0) - 1))
+        assert(zero_peak_rate in 0..500)
+        assert(inverted_input in listOf(0, 1))
+        assert((en_fil_peak_time + en_fil_flattop) * 2 < 1023)
+
+        logger.info("Starting {} board configuration", board)
+
+        writeDPP(board, 128, inverted_input * (1L shl 24))
+        writeDPP(board, 162, recovery_time)
+        writeDPP(board, 181, 0)
+        writeDPP(board, 185, 0)
+        writeDPP(board, 175, 1)
+        writeDPP(board, 160, (pileup_thr / gain).toLong() + (1 shl 31))
+        writeDPP(board, 160, (pileup_thr / gain * 2).toLong() and (1 shl 31).inv()) //set bit 2 to zero
+        writeDPP(board, 152, (det_thresh / gain * fast_peak_time).toLong())
+
+        if (fast_flattop == 1) {
+            writeDPP(board, 154, 0)
+        } else {
+            writeDPP(board, 154, ceil(fast_flattop.toDouble() / 2).toLong()) // TODO check this
+        }
+
+        if (zero_peak_rate == 0) {
+            writeDPP(board, 142, 0)
+        } else {
+            writeDPP(board, 142, (1.0 / zero_peak_rate / 10 * 1e6).toLong() + (1 shl 31))
+        }
+
+        writeDPP(board, 180, (fast_flattop + 1).toLong())
+
+        if ((2 * fast_peak_time + fast_flattop) > 4 * en_fil_flattop) {
+            writeDPP(board, 140, ((2 * fast_peak_time + fast_flattop) * 4 + 1).toLong())
+        } else {
+            writeDPP(board, 140, en_fil_flattop.toLong())
+        }
+
+        writeDPP(board, 141, (fast_peak_time + fast_flattop + 4).toLong())
+        writeDPP(board, 156, fast_peak_time + 0 * (1L shl 28))
+        writeDPP(board, 150, fast_flattop + 0 * (1L shl 28))
+        writeDPP(board, 149, en_fil_peak_time + 1 * (1L shl 28))
+        writeDPP(board, 150, en_fil_flattop + 1 * (1L shl 28))
+        writeDPP(board, 153, en_fil_peak_time * 2 + en_fil_flattop * 2 + 1 * (1L shl 28))
+        writeDPP(board, 184, (gain * (1 shl 24) / en_fil_peak_time).toLong() + 1 * (1L shl 28))
+        writeDPP(board, 148, 0b11)
+        writeDPP(board, 128, 0b100 + inverted_input * (1L shl 24))
+        writeDPP(board, 128, 1 + inverted_input * (1L shl 24))
+
+        logger.info("Finished {} board configuration", board)
+    }
+
+    /**
+     * Configure all boards
+     */
+    suspend fun configureAll(meta: Meta) {
+        boards.forEach {
+            configureBoard(it.num, meta)
+        }
+        logger.info("Finished configuration of all active boards")
+    }
+
+    /**
+     * Clear unused data
+     */
+    private suspend fun clearData() {
+        for (element in dataChannel) {
+            logger.warn("Dumping residual data packet {}", element.toString())
+        }
+    }
+
+    private suspend fun clearCommunications() {
+        for (element in comChannel) {
+            logger.debug("Dumping residual communication packet {}", element.toString())
+        }
+    }
+
+    /**
+     * Handle statistics asynchronously
+     */
+    fun handleStatistics(channel: Int, message: ByteBuffer) {
+        logger.info("Received statistics packet from board {}", channel)
+        //TODO
+    }
+
+    /**
+     * Gather data in list mode
+     * @param length measurement time in milliseconds
+     * @param statisticsInterval interval between statistics packets in milliseconds
+     */
+    suspend fun readPoint(length: Int, statisticsInterval: Int = 1000): NumassPoint {
+        clearData()
+
+        logger.info("Starting list point acquisition {} ms", length)
+        boards.forEach {
+            writeRegister(it.num, ACQUISITION_SETTINGS.code, Communications.AcquisitionMode.LIST_MODE.long, 0x00000007)
+            writeRegister(it.num, ACQUISITION_TIME.code, length.toLong())
+            writeRegister(it.num, TIME_PER_MAP_POINT.code, statisticsInterval.toLong(), 0x00FFFFFF)
+            writeRegister(it.num, MAP_POINTS.code, (length.toDouble() / statisticsInterval).toLong(), 0x00FFFFFF)
+        }
+        writeRegister(0, ACQUISITION_STATUS.code, 0x00000001, 0x00000001)
+
+        val start = Instant.now()
+        val builder = NumassProto.Point.newBuilder()
+
+        val dataCollectorJob = launch(context = pool, parent = parentJob) {
+            while (true) {
+                val packet = dataChannel.receive()
+                val channel = packet.board
+                // get or create new channel block
+                val channelBuilder = builder.channelsBuilderList.find { it.id.toInt() == channel }
+                        .orElse {
+                            builder.addChannelsBuilder().setId(channel.toLong())
+                                    .also {
+                                        //initializing single block
+                                        it.addBlocksBuilder().also {
+                                            it.time = (start.epochSecond * 1e9 + start.nano).toLong()
+                                            it.binSize = 8 // tick in nanos
+                                            it.length = length.toLong() * 1000 //block length in nanos
+                                        }
+                                    }
+                        }
+                val blockBuilder = channelBuilder.getBlocksBuilder(0)
+                val eventsBuilder = blockBuilder.eventsBuilder
+
+                val buffer = ByteBuffer.wrap(packet.payload)
+                while (buffer.hasRemaining()) {
+                    val firstWord = buffer.getInt()
+                    val secondWord = buffer.getInt()
+                    if (firstWord == STATISTIC_HEADER && secondWord == STATISTIC_HEADER) {
+                        val statistics = ByteArray(128)
+                        buffer.get(statistics)
+                        //TODO use statistics for meta
+                        handleStatistics(channel, ByteBuffer.wrap(statistics))
+                    } else if (firstWord == 0) {
+                        //TODO handle zeros
+                    } else {
+                        val time: Long = (secondWord.positive shl 14) + (firstWord ushr 18)
+                        val amp: Short = (firstWord and 0x0000FFFF).toShort()
+                        eventsBuilder.addTimes(time * 8)
+                        eventsBuilder.addAmplitudes(amp.toLong())
+                    }
+                }
+            }
+        }
+
+        /**
+         * Cancel data collection after specific time passed
+         */
+        val schedulerJob = launch(context = pool, parent = parentJob) {
+            delay(length + 2000L)
+            dataCollectorJob.cancel(CancellationException("Expired"))
+        }
+
+        val meta = buildMeta {
+            setNode("dpp", boards.first().meta)
+        }
+
+        dataCollectorJob.join()
+        return ProtoNumassPoint(builder.build(), meta)
+    }
+
+    companion object {
+        const val STATISTIC_HEADER: Int = 0x0C000000
+    }

-    private fun writeParameter(board: Byte, register: Byte, data: ByteArray) {
-        val packet = packetNumber.incrementAndGet() % 256
-        send(WRITE, board, register, data)
-    }
-
-    override fun close() {
-        //TODO send termination signal
-        connections.values.forEach {
-            it.first.close()
-            it.second.cancel()
-        }
-        parentJob.cancel(CancellationException("Server stopped"))
-    }
-
-    fun reconnect() {
-        close()
-        //create a new parent job
-        parentJob = Job()
-        open()
-    }
-}
+}

/*
def __make_binary_base():
    point = dfparser.Point()
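Note: each list-mode event in the collector loop above occupies two 32-bit words: the low 16 bits of the first word hold the amplitude, while the top 14 bits of the first word together with the second word form a timestamp counted in 8 ns ticks (binSize = 8). A standalone sketch of that unpacking, assuming the grouping used in the loop:

    // Sketch of list-mode event unpacking (two 32-bit words per event)
    data class ListEventSketch(val timeNanos: Long, val amplitude: Int)

    fun unpackEventSketch(firstWord: Int, secondWord: Int): ListEventSketch {
        val second = secondWord.toLong() and 0xFFFFFFFFL  // unsigned view of the second word
        val ticks = (second shl 14) + (firstWord ushr 18) // 46-bit time in 8 ns ticks
        val amplitude = firstWord and 0x0000FFFF          // low 16 bits of the first word
        return ListEventSketch(timeNanos = ticks * 8, amplitude = amplitude)
    }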
@@ -163,157 +435,6 @@ class DanteClient(val ip: String) : AutoCloseable {
    channel.blocks.add()
    return point

-async def __write_dpp(
-        self, board: int, register: int, value: bytes):
-    await self.__write_config(
-        board, ORDER['DPP_REGISTER_1'], struct.pack('>I', register))
-    await self.__write_config(board, ORDER['DPP_REGISTER_2'], value)
-    await self.__write_config(
-        board, ORDER['DPP_CONFIG_COMMIT_OFFSET'],
-        b'\x00\x00\x00\x01', b'\x00\x00\x00\x01')
-
-async def __initialize_boards(self):
-    dpp_params = SETTINGS['DANTE']['dpp']
-
-    gain = np.double(dpp_params['gain'])
-    det_thresh = np.uint32(dpp_params['detection_thresold'])
-    pileup_thr = np.uint32(dpp_params['pileup_thresold'])
-    en_fil_peak_time = np.uint32(
-        dpp_params['energy_filter']['peaking_time'])
-    en_fil_flattop = np.uint32(dpp_params['energy_filter']['flat_top'])
-    fast_peak_time = np.uint32(dpp_params['fast_filter']['peaking_time'])
-    fast_flattop = np.uint32(dpp_params['fast_filter']['flat_top'])
-    recovery_time = np.uint32(dpp_params['recovery_time'])
-    zero_peak_rate = np.uint32(dpp_params['zero_peak_rate'])
-    inverted_input = np.uint32(dpp_params['inverted_input'])
-
-    assert 1 <= en_fil_peak_time <= 511
-    assert 0.01 <= gain <= en_fil_peak_time * 2 - 0.01
-    assert 0 <= det_thresh <= 4096
-    assert 0 <= pileup_thr <= 4096
-    assert 1 <= en_fil_flattop <= 15
-    assert 1 <= fast_peak_time <= 31
-    assert 1 <= fast_flattop <= 31
-    assert 0 <= recovery_time <= 2**24 - 1
-    assert 0 <= zero_peak_rate <= 500
-    assert inverted_input in [0, 1]
-    assert (en_fil_peak_time + en_fil_flattop) * 2 < 1023
-
-    for board in BOARDS:
-        logger.info('start %s board initialisation', board)
-
-        val_128 = struct.pack('>I', 0 + inverted_input * (1 << 24))
-        await self.__write_dpp(board, 128, val_128)
-
-        val_162 = struct.pack('>I', recovery_time)
-        await self.__write_dpp(board, 162, val_162)
-
-        await self.__write_dpp(board, 181, b'\x00\x00\x00\x00')
-
-        await self.__write_dpp(board, 185, b'\x00\x00\x00\x00')
-
-        await self.__write_dpp(board, 175, b'\x00\x00\x00\x01')
-
-        val_160 = struct.pack('>I', int(pileup_thr / gain) + 1 * (1 << 31))
-        await self.__write_dpp(board, 160, val_160)
-
-        val_160 = struct.pack(
-            '>I', int(pileup_thr / gain * 2) + 0 * (1 << 31))
-        await self.__write_dpp(board, 160, val_160)
-
-        val_152 = struct.pack(
-            '>I', int(det_thresh / gain * fast_peak_time))
-        await self.__write_dpp(board, 152, val_152)
-
-        if fast_flattop == 1:
-            await self.__write_dpp(board, 154, b'\x00\x00\x00\x00')
-        else:
-            val_154 = np.uint32(np.ceil(fast_flattop) / 2)
-            await self.__write_dpp(board, 154, val_154)
-
-        if zero_peak_rate == 0:
-            await self.__write_dpp(board, 142, b'\x00\x00\x00\x00')
-        else:
-            val_142 = np.uint32(1 / zero_peak_rate / 10 * 1e6 + (1 << 31))
-            await self.__write_dpp(board, 142, val_142)
-
-        val_180 = struct.pack('>I', fast_flattop + 1)
-        await self.__write_dpp(board, 180, val_180)
-
-        if (2 * fast_peak_time + fast_flattop) > 4 * en_fil_flattop:
-            val_140 = struct.pack('>I', np.uint32(np.ceil(
-                (2 * fast_peak_time + fast_flattop) * 4) + 1))
-            await self.__write_dpp(board, 140, val_140)
-        else:
-            await self.__write_dpp(board, 140, en_fil_flattop)
-
-        val_141 = struct.pack('>I', fast_peak_time + fast_flattop + 4)
-        await self.__write_dpp(board, 141, val_141)
-
-        val_156 = struct.pack('>I', fast_peak_time + 0 * (1 << 28))
-        await self.__write_dpp(board, 156, val_156)
-
-        val_150 = struct.pack('>I', fast_flattop + 0 * (1 << 28))
-        await self.__write_dpp(board, 150, val_150)
-
-        val_149 = struct.pack('>I', en_fil_peak_time + 1 * (1 << 28))
-        await self.__write_dpp(board, 149, val_149)
-
-        val_150 = struct.pack('>I', en_fil_flattop + 1 * (1 << 28))
-        await self.__write_dpp(board, 150, val_150)
-
-        val_153 = struct.pack(
-            '>I',
-            en_fil_peak_time * 2 + en_fil_flattop * 2 + 1 * (1 << 28))
-        await self.__write_dpp(board, 153, val_153)
-
-        val_184 = struct.pack(
-            '>I', int(gain * (1 << 24) / en_fil_peak_time) + 1 * (1 << 28))
-        await self.__write_dpp(board, 184, val_184)
-
-        await self.__write_dpp(board, 148, struct.pack('>I', 1 + (1 << 1)))
-
-        val_128 = struct.pack(
-            '>I', 1 * (1 << 2) + inverted_input * (1 << 24))
-        await self.__write_dpp(board, 128, val_128)
-
-        val_128 = struct.pack('>I', 1 + inverted_input * (1 << 24))
-        await self.__write_dpp(board, 128, val_128)
-
-    self.__send_reply_ok()
-
-async def __start_acquisition(self, acq_time_ms, map_points=10):
-    acq_time_ms = acq_time_ms
-    logger.info('start point acquisition %s ms', acq_time_ms)
-    for board in BOARDS:
-        await self.__write_config(
-            board, ORDER['ACQUISITION_SETTINGS'],
-            b'\x00\x00\x00\x04', b'\x00\x00\x00\x07')
-        await self.__write_config(
-            board, ORDER['ACQUISITION_TIME'],
-            struct.pack('>I', acq_time_ms))
-        await self.__write_config(
-            board, ORDER['MAP_POINTS'],
-            struct.pack('>I', acq_time_ms // map_points))
-        await self.__write_config(
-            board, ORDER['TIME_PER_MAP_POINT'],
-            struct.pack('>I', map_points))
-    await self.__write_config(
-        0, ORDER['ACQUISITION_STATUS'],
-        b'\x00\x00\x00\x01', b'\x00\x00\x00\x01')
-
-def __get_packet_number(self):
-    """Generate available packet number for board."""
-    packet_num = ALL_PACKETS.difference(PACKET_NUMBERS).pop()
-    PACKET_NUMBERS.add(packet_num)
-    EVENTS[packet_num] = asyncio.Event()
-    return packet_num
-
-def __put_packet_number(self, packet_num):
-    """Release packet number to available."""
-    PACKET_NUMBERS.remove(packet_num)
-    del EVENTS[packet_num]
-
async def __send_message(
        self, command, board_num=0, start_addr=0, length=0, data=b''):
    """Send message and wait for response."""
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2018 Alexander Nozik.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package inr.numass.control.dante
+
+import hep.dataforge.kodex.buildMeta
+import inr.numass.data.channel
+import kotlinx.coroutines.experimental.runBlocking
+
+fun main(args: Array<String>) {
+    val client = DanteClient("192.168.111.123", 7)
+    val meta = buildMeta {
+
+        /*
+        val gain = meta.getDouble("gain")
+        val det_thresh = meta.getInt("detection_thresold")
+        val pileup_thr = meta.getInt("pileup_thresold")
+        val en_fil_peak_time = meta.getInt("energy_filter.peaking_time")
+        val en_fil_flattop = meta.getInt("energy_filter.flat_top")
+        val fast_peak_time = meta.getInt("fast_filter.peaking_time")
+        val fast_flattop = meta.getInt("fast_filter.flat_top")
+        val recovery_time = meta.getValue("recovery_time").longValue()
+        val zero_peak_rate = meta.getInt("zero_peak_rate")
+        val inverted_input = meta.getInt("inverted_input")
+        */
+    }
+    runBlocking {
+        client.configureAll(meta)
+    }
+    val point = runBlocking {
+        client.readPoint(10*1000)
+    }
+    println("***META***")
+    println(point.meta)
+    println("***BLOCKS***")
+    point.blocks.forEach {
+        println("channel: ${it.channel}")
+        println("\tlength: ${it.length}")
+        println("\tevents: ${it.events.count()}")
+    }
+}
@@ -89,7 +89,7 @@ interface NumassPoint : Metoid, NumassBlock {
            return if (envelope.dataType?.startsWith("numass.point.classic") ?: envelope.meta.hasValue("split")) {
                ClassicNumassPoint(envelope)
            } else {
-                ProtoNumassPoint(envelope)
+                ProtoNumassPoint.fromEnvelope(envelope)
            }
        }
    }
@@ -15,7 +15,6 @@ import inr.numass.data.api.NumassPoint
import inr.numass.data.dataStream
import inr.numass.data.legacy.NumassFileEnvelope
import org.slf4j.LoggerFactory
-import java.io.IOException
import java.nio.file.Path
import java.time.Duration
import java.time.Instant
@@ -27,29 +26,16 @@ import java.util.stream.Stream
 * Protobuf based numass point
 * Created by darksnake on 09.07.2017.
 */
-class ProtoNumassPoint(private val envelope: Envelope) : NumassPoint {
-
-    private val point: NumassProto.Point
-        get() = try {
-            envelope.dataStream.use {
-                NumassProto.Point.parseFrom(it)
-            }
-        } catch (ex: IOException) {
-            throw RuntimeException("Failed to read point via protobuf", ex)
-        }
+class ProtoNumassPoint(val proto: NumassProto.Point, override val meta: Meta) : NumassPoint {

    override val blocks: Stream<NumassBlock>
-        get() = point.channelsList.stream()
+        get() = proto.channelsList.stream()
                .flatMap { channel ->
                    channel.blocksList.stream()
                            .map { block -> ProtoBlock(channel.id.toInt(), block, this) }
                            .sorted(Comparator.comparing<ProtoBlock, Instant> { it.startTime })
                }

-    override val meta: Meta = envelope.meta
-
    override val voltage: Double = meta.getDouble("external_meta.HV1_value", super.voltage)

    override val index: Int = meta.getInt("external_meta.point_index", super.index)
@@ -63,7 +49,14 @@ class ProtoNumassPoint(private val envelope: Envelope) : NumassPoint {

    companion object {
        fun readFile(path: Path): ProtoNumassPoint {
-            return ProtoNumassPoint(NumassFileEnvelope.open(path, true))
+            return fromEnvelope(NumassFileEnvelope.open(path, true))
+        }
+
+        fun fromEnvelope(envelope: Envelope): ProtoNumassPoint {
+            val proto = envelope.dataStream.use {
+                NumassProto.Point.parseFrom(it)
+            }
+            return ProtoNumassPoint(proto, envelope.meta)
        }

        fun readFile(path: String, context: Context = Global): ProtoNumassPoint {