From a425d6cb6cda87a255af7c962e98944d20aff57a Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Wed, 18 Apr 2018 14:22:03 +0300 Subject: [PATCH] Fix for number started xml name --- .../numass/control/dante/Communications.kt | 2 +- .../inr/numass/control/dante/DanteClient.kt | 77 +++++++++------ .../inr/numass/control/dante/DanteTest.kt | 98 ++++++++++++++----- 3 files changed, 122 insertions(+), 55 deletions(-) diff --git a/numass-control/dante/src/main/kotlin/inr/numass/control/dante/Communications.kt b/numass-control/dante/src/main/kotlin/inr/numass/control/dante/Communications.kt index 92ca716d..32317fb8 100644 --- a/numass-control/dante/src/main/kotlin/inr/numass/control/dante/Communications.kt +++ b/numass-control/dante/src/main/kotlin/inr/numass/control/dante/Communications.kt @@ -139,7 +139,7 @@ object Communications { data class DanteMessage(val command: CommandType, val board: Int, val packet: Int, val payload: ByteArray) { override fun toString(): String { - return "${command.name}[$board, $packet]: ${payload.size}" + return "${command.name}[$board, $packet] of size ${payload.size}" } } diff --git a/numass-control/dante/src/main/kotlin/inr/numass/control/dante/DanteClient.kt b/numass-control/dante/src/main/kotlin/inr/numass/control/dante/DanteClient.kt index 90466545..9c8b3f07 100644 --- a/numass-control/dante/src/main/kotlin/inr/numass/control/dante/DanteClient.kt +++ b/numass-control/dante/src/main/kotlin/inr/numass/control/dante/DanteClient.kt @@ -32,6 +32,7 @@ import java.lang.Math.pow import java.net.Socket import java.nio.ByteBuffer import java.nio.ByteOrder +import java.time.Duration import java.time.Instant import java.util.* import java.util.concurrent.atomic.AtomicLong @@ -69,7 +70,7 @@ class DanteClient(val ip: String, chainLength: Int) : AutoCloseable { */ private data class BoardState(val num: Int, var meta: Meta? = null) - private val boards = (0 until chainLength).map { BoardState(it) } + private val boards = (0..chainLength).map { BoardState(it) } fun open() { @@ -118,16 +119,18 @@ class DanteClient(val ip: String, chainLength: Int) : AutoCloseable { val socket = Socket(ip, 8000 + port) - logger.info("Opened socket {}", socket.inetAddress) + logger.info("Opened socket {}", "${socket.inetAddress}:${socket.port}") //Create command queue on port 0 if (port == 0) { - outputJob.cancel() + //outputJob.cancel() output = socket.getOutputStream() outputJob = launch(context = pool, parent = parentJob) { - val command = sendChannel.receive() - output.write(command) - logger.debug("Sent {}", command.hex) + while (true) { + val command = sendChannel.receive() + output.write(command) + logger.trace("Sent {}", command.hex) + } } } @@ -139,10 +142,11 @@ class DanteClient(val ip: String, chainLength: Int) : AutoCloseable { // second check is not executed unless first one is true val header = ByteArray(6) stream.read(header) - val command = Communications.CommandType.values().find { it.byte == header[0] }!! + val command = Communications.CommandType.values().find { it.byte == header[0] } + ?: throw RuntimeException("Unknown command code: ${header[0]}") val board = header[1] val packet = header[2] - val length = header[3].positive * 0x100 + header[4].positive * 0x010 + header[5].positive + val length = (header[3].positive * 0x100 + header[4].positive * 0x010 + header[5].positive) * 4 val payload = ByteArray(length) stream.read(payload) handle(Communications.DanteMessage(command, board.positive, packet.positive, payload)) @@ -171,7 +175,7 @@ class DanteClient(val ip: String, chainLength: Int) : AutoCloseable { * Generate next packet number */ private fun nextPacket(): Int { - return (packetNumber.incrementAndGet() % 256 - 128).toInt() + return (packetNumber.incrementAndGet() % 256).toInt() } private fun List.asBuffer(): ByteBuffer { @@ -191,9 +195,9 @@ class DanteClient(val ip: String, chainLength: Int) : AutoCloseable { private suspend fun writeRegister(board: Int, register: Int, data: List, mask: Long? = null) { //representing data as byte buffer val buffer = if (mask != null) { - val oldData = readRegister(board, register, data.size) + val oldData = withTimeout(1000) { readRegister(board, register, data.size) } //(oldData & not_mask) | (newData & mask); - (0..data.size).map { (oldData[it] and mask.inv()).or(data[it] and mask) }.asBuffer(); + (0 until data.size).map { (oldData[it] and mask.inv()).or(data[it] and mask) }.asBuffer(); } else { data.asBuffer() } @@ -206,6 +210,7 @@ class DanteClient(val ip: String, chainLength: Int) : AutoCloseable { } private suspend fun readRegister(board: Int, register: Int, length: Int = 1): List { + val packet = nextPacket() send(READ, board, packet, register, length = length) var message: Communications.DanteMessage? = null @@ -222,6 +227,10 @@ class DanteClient(val ip: String, chainLength: Int) : AutoCloseable { }.map { it.positive }.toList() } + suspend fun getFirmwareVersion(): Long { + return readRegister(0, Communications.Register.FIRMWARE_VERSION.code)[0] + } + /** * Write a single DPP parameter * @param register number of parameter register @@ -242,15 +251,15 @@ class DanteClient(val ip: String, chainLength: Int) : AutoCloseable { */ suspend fun configureBoard(board: Int, meta: Meta) { val gain = meta.getDouble("gain") - val det_thresh = meta.getInt("detection_thresold") - val pileup_thr = meta.getInt("pileup_thresold") + val det_thresh = meta.getInt("detection_threshold") + val pileup_thr = meta.getInt("pileup_threshold") val en_fil_peak_time = meta.getInt("energy_filter.peaking_time") val en_fil_flattop = meta.getInt("energy_filter.flat_top") val fast_peak_time = meta.getInt("fast_filter.peaking_time") val fast_flattop = meta.getInt("fast_filter.flat_top") val recovery_time = meta.getValue("recovery_time").longValue() val zero_peak_rate = meta.getInt("zero_peak_rate") - val inverted_input = meta.getInt("inverted_input") + val inverted_input = meta.getInt("inverted_input", 0) assert(en_fil_peak_time in 1..511) assert(gain in 0.01..(en_fil_peak_time * 2 - 0.01)) @@ -306,7 +315,9 @@ class DanteClient(val ip: String, chainLength: Int) : AutoCloseable { writeDPP(board, 128, 0b100 + inverted_input * (1L shl 24)) writeDPP(board, 128, 1 + inverted_input * (1L shl 24)) + logger.info("Finished {} board configuration", board) + boards.find { it.num == board }?.meta = meta } /** @@ -323,14 +334,14 @@ class DanteClient(val ip: String, chainLength: Int) : AutoCloseable { * Clear unused data */ private suspend fun clearData() { - for (element in dataChannel) { - logger.warn("Dumping residual data packet {}", element.toString()) + while (!dataChannel.isEmpty) { + logger.warn("Dumping residual data packet {}", dataChannel.receive().toString()) } } private suspend fun clearCommunications() { - for (element in comChannel) { - logger.debug("Dumping residual communication packet {}", element.toString()) + while (!dataChannel.isEmpty) { + logger.debug("Dumping residual communication packet {}", dataChannel.receive().toString()) } } @@ -356,7 +367,13 @@ class DanteClient(val ip: String, chainLength: Int) : AutoCloseable { writeRegister(it.num, ACQUISITION_TIME.code, length.toLong()) writeRegister(it.num, TIME_PER_MAP_POINT.code, statisticsInterval.toLong(), 0x00FFFFFF) writeRegister(it.num, MAP_POINTS.code, (length.toDouble() / statisticsInterval).toLong(), 0x00FFFFFF) + + if (readRegister(it.num, ACQUISITION_SETTINGS.code)[0] != Communications.AcquisitionMode.LIST_MODE.long) { + throw RuntimeException("Set list mode failed") + } } +// logger.info("Waiting for configuration to settle") +// delay(500) writeRegister(0, ACQUISITION_STATUS.code, 0x00000001, 0x00000001) val start = Instant.now() @@ -364,8 +381,12 @@ class DanteClient(val ip: String, chainLength: Int) : AutoCloseable { val dataCollectorJob = launch(context = pool, parent = parentJob) { - while (true) { - val packet = dataChannel.receive() + while (Duration.between(start, Instant.now()) < Duration.ofMillis(length + 4000L)) { + val packet = withTimeout(1000) { dataChannel.receive() } + if (packet.command != LIST_MODE) { + logger.warn("Unexpected packet type: {}", packet.command.name) + continue + } val channel = packet.board // get or create new channel block val channelBuilder = builder.channelsBuilderList.find { it.id.toInt() == channel } @@ -376,7 +397,7 @@ class DanteClient(val ip: String, chainLength: Int) : AutoCloseable { it.addBlocksBuilder().also { it.time = (start.epochSecond * 1e9 + start.nano).toLong() it.binSize = 8 // tick in nanos - it.length = length.toLong() * 1000 //block length in nanos + it.length = (length * 1e6).toLong() //block length in nanos } } } @@ -402,20 +423,16 @@ class DanteClient(val ip: String, chainLength: Int) : AutoCloseable { } } } - } - - /** - * Cancel data collection after specific time passed - */ - val schedulerJob = launch(context = pool, parent = parentJob) { - delay(length + 2000L) - dataCollectorJob.cancel(CancellationException("Expired")) + writeRegister(0, ACQUISITION_STATUS.code, 0x00000001, 0x00000000) } val meta = buildMeta { - setNode("dpp", boards.first().meta) + boards.first().meta?.let { + putNode("dpp", it) + } } + dataCollectorJob.join() return ProtoNumassPoint(builder.build(), meta) } diff --git a/numass-control/dante/src/main/kotlin/inr/numass/control/dante/DanteTest.kt b/numass-control/dante/src/main/kotlin/inr/numass/control/dante/DanteTest.kt index 2b655132..32507db5 100644 --- a/numass-control/dante/src/main/kotlin/inr/numass/control/dante/DanteTest.kt +++ b/numass-control/dante/src/main/kotlin/inr/numass/control/dante/DanteTest.kt @@ -16,39 +16,89 @@ package inr.numass.control.dante +import hep.dataforge.context.Context +import hep.dataforge.context.Global +import hep.dataforge.fx.plots.FXPlotManager +import hep.dataforge.kodex.KMetaBuilder import hep.dataforge.kodex.buildMeta +import hep.dataforge.kodex.configure +import hep.dataforge.kodex.nullable +import hep.dataforge.plots.data.DataPlot +import hep.dataforge.tables.Adapters +import inr.numass.data.analyzers.NumassAnalyzer +import inr.numass.data.analyzers.SimpleAnalyzer +import inr.numass.data.analyzers.withBinning +import inr.numass.data.api.NumassBlock import inr.numass.data.channel import kotlinx.coroutines.experimental.runBlocking fun main(args: Array) { - val client = DanteClient("192.168.111.123", 7) + val client = DanteClient("192.168.111.120", 7) + client.open() val meta = buildMeta { - - /* - val gain = meta.getDouble("gain") - val det_thresh = meta.getInt("detection_thresold") - val pileup_thr = meta.getInt("pileup_thresold") - val en_fil_peak_time = meta.getInt("energy_filter.peaking_time") - val en_fil_flattop = meta.getInt("energy_filter.flat_top") - val fast_peak_time = meta.getInt("fast_filter.peaking_time") - val fast_flattop = meta.getInt("fast_filter.flat_top") - val recovery_time = meta.getValue("recovery_time").longValue() - val zero_peak_rate = meta.getInt("zero_peak_rate") - val inverted_input = meta.getInt("inverted_input") - */ + "gain" to 1.0 + "detection_threshold" to 150 + "pileup_threshold" to 1 + "energy_filter" to { + "peaking_time" to 63 + "flat_top" to 2 + } + "fast_filter" to { + "peaking_time" to 4 + "flat_top" to 1 + } + "recovery_time" to 100 + "zero_peak_rate" to 0 } runBlocking { + println("Firmware version: ${client.getFirmwareVersion()}") + +// client.reset() +// delay(500) + client.configureAll(meta) + + val point = client.readPoint(10 * 1000) + + println("***META***") + println(point.meta) + println("***BLOCKS***") + point.blocks.forEach { + println("channel: ${it.channel}") + println("\tlength: ${it.length}") + println("\tevents: ${it.events.count()}") + it.plotAmplitudeSpectrum(plotName = it.channel.toString()) + } } - val point = runBlocking { - client.readPoint(10*1000) - } - println("***META***") - println(point.meta) - println("***BLOCKS***") - point.blocks.forEach { - println("channel: ${it.channel}") - println("\tlength: ${it.length}") - println("\tevents: ${it.events.count()}") +} + +fun NumassBlock.plotAmplitudeSpectrum(plotName: String = "spectrum", frameName: String = "", context: Context = Global, metaAction: KMetaBuilder.() -> Unit = {}) { + val meta = buildMeta("meta", metaAction) + val plotManager = context.load(FXPlotManager::class) + val binning = meta.getInt("binning", 20) + val lo = meta.optNumber("window.lo").nullable?.toInt() + val up = meta.optNumber("window.up").nullable?.toInt() + val data = SimpleAnalyzer().getAmplitudeSpectrum(this, meta.getMetaOrEmpty("spectrum")).withBinning(binning, lo, up) + plotManager.display(name = frameName) { + val valueAxis = if (meta.getBoolean("normalize", false)) { + NumassAnalyzer.COUNT_RATE_KEY + } else { + NumassAnalyzer.COUNT_KEY + } + plots.configure { + "connectionType" to "step" + "thickness" to 2 + "showLine" to true + "showSymbol" to false + "showErrors" to false + }.setType(DataPlot::class) + + val plot = DataPlot.plot( + plotName, + Adapters.buildXYAdapter(NumassAnalyzer.CHANNEL_KEY, valueAxis), + data + ) + plot.configure(meta) + add(plot) } } \ No newline at end of file