diff --git a/numass-control/dante/src/main/kotlin/inr/numass/control/dante/Communications.kt b/numass-control/dante/src/main/kotlin/inr/numass/control/dante/Communications.kt index 65b19f0e..92ca716d 100644 --- a/numass-control/dante/src/main/kotlin/inr/numass/control/dante/Communications.kt +++ b/numass-control/dante/src/main/kotlin/inr/numass/control/dante/Communications.kt @@ -16,21 +16,39 @@ package inr.numass.control.dante -import kotlinx.coroutines.experimental.Job -import kotlinx.coroutines.experimental.channels.Channel -import kotlinx.coroutines.experimental.channels.ReceiveChannel -import kotlinx.coroutines.experimental.channels.produce -import java.io.InputStream import kotlin.coroutines.experimental.buildSequence +internal val Byte.positive + get() = toInt() and 0xFF + +internal val Int.positive + get() = toLong() and 0xFFFFFFFF + +internal val Int.byte: Byte + get() { + if (this >= 256) { + throw RuntimeException("Number less than 256 is expected") + } else { + return toByte() + } + } + +internal val Long.int: Int + get() { + if (this >= 0xFFFFFFFF) { + throw RuntimeException("Number less than 256 is expected") + } else { + return toInt() + } + } + +internal val ByteArray.hex + get() = this.joinToString(separator = "") { it.positive.toString(16).padStart(2, '0') } object Communications { val PACKET_HEADER_START_BYTES = arrayOf(0xAA, 0xEE) - private val Byte.positive - get() = toInt() and 0xFF - enum class Register(val code: Int) { FIRMWARE_VERSION(0), DPP_REGISTER_1(1), @@ -61,14 +79,16 @@ object Communications { SINGLE_SPECTRUM_MODE(2), MAP_MODE(6), LIST_MODE(4), - WAVEFORM_MODE(3) + WAVEFORM_MODE(3); + + val long = byte.toLong() } /** * Build command header */ - fun buildHeader(command: CommandType, board: Byte, packet: Byte, start: Register, length: Byte): ByteArray { + fun buildHeader(command: CommandType, board: Byte, packet: Byte, start: Byte, length: Byte): ByteArray { assert(command in listOf(CommandType.READ, CommandType.WRITE)) assert(board in 0..255) assert(packet in 0..255) @@ -79,7 +99,7 @@ object Communications { header[2] = command.byte header[3] = board header[4] = packet - header[5] = start.code.toByte() + header[5] = start header[6] = length return header } @@ -101,49 +121,53 @@ object Communications { /** * Create DANTE command and stuff it. + * @param length size of data array/4 */ - fun wrapCommand(command: CommandType, board: Byte, packet: Byte, start: Register, data: ByteArray): ByteArray { - when (command) { - CommandType.READ -> assert(data.isEmpty()) - CommandType.WRITE -> assert(data.size % 4 == 0) - else -> throw RuntimeException("Command $command not expected") + fun wrapCommand(command: CommandType, board: Int, packet: Int, start: Int, length: Int, data: ByteArray): ByteArray { + if (command == CommandType.READ) { + assert(data.isEmpty()) + } else { + assert(data.size % 4 == 0) + assert(length == data.size / 4) } - val length: Byte = (data.size / 4).toByte() - val header = buildHeader(command, board, packet, start, length) + val header = buildHeader(command, board.byte, packet.byte, start.byte, length.byte) val res = (header + data).escape() return byteArrayOf(0xdd.toByte(), 0xaa.toByte()) + res + byteArrayOf(0xdd.toByte(), 0x55.toByte()) } - data class DanteMessage(val command: CommandType, val board: Byte, val packet: Byte, val payload: ByteArray) { + data class DanteMessage(val command: CommandType, val board: Int, val packet: Int, val payload: ByteArray) { override fun toString(): String { return "${command.name}[$board, $packet]: ${payload.size}" } } - /** - * Read the stream and return resulting messages in ReceiveChannel - */ - fun readStream(stream: InputStream, parent: Job): ReceiveChannel { - return produce(capacity = Channel.UNLIMITED, parent = parent) { - while (true) { - val first = stream.read() - if (stream.read() == PACKET_HEADER_START_BYTES[0] && stream.read() == PACKET_HEADER_START_BYTES[1]) { - // second check is not executed unless first one is false - val header = ByteArray(6) - stream.read(header) - val command = CommandType.values().find { it.byte == header[0] }!! - val board = header[1] - val packet = header[2] - val length = header[3].positive * 0x100 + header[4].positive * 0x010 + header[5].positive - val payload = ByteArray(length) - stream.read(payload) - send(DanteMessage(command, board, packet, payload)) - } - } - } - } +// /** +// * Read the stream and return resulting messages in ReceiveChannel +// */ +// fun readStream(stream: InputStream, parent: Job): ReceiveChannel { +// return produce(capacity = Channel.UNLIMITED, parent = parent) { +// while (true) { +// if (stream.read() == PACKET_HEADER_START_BYTES[0] && stream.read() == PACKET_HEADER_START_BYTES[1]) { +// // second check is not executed unless first one is true +// val header = ByteArray(6) +// stream.read(header) +// val command = CommandType.values().find { it.byte == header[0] }!! +// val board = header[1] +// val packet = header[2] +// val length = header[3].positive * 0x100 + header[4].positive * 0x010 + header[5].positive +// val payload = ByteArray(length) +// stream.read(payload) +// send(DanteMessage(command, board.positive, packet.positive, payload)) +// } +// } +// } +// } + +// fun readSocket(socket: Socket, parent: Job): ReceiveChannel { +// return readStream(socket.getInputStream(), parent) +// } } /* diff --git a/numass-control/dante/src/main/kotlin/inr/numass/control/dante/DanteClient.kt b/numass-control/dante/src/main/kotlin/inr/numass/control/dante/DanteClient.kt index c127cd00..90466545 100644 --- a/numass-control/dante/src/main/kotlin/inr/numass/control/dante/DanteClient.kt +++ b/numass-control/dante/src/main/kotlin/inr/numass/control/dante/DanteClient.kt @@ -16,40 +16,99 @@ package inr.numass.control.dante +import hep.dataforge.kodex.buildMeta +import hep.dataforge.kodex.orElse +import hep.dataforge.meta.Meta import inr.numass.control.dante.Communications.CommandType.* -import kotlinx.coroutines.experimental.CancellationException -import kotlinx.coroutines.experimental.Job +import inr.numass.control.dante.Communications.Register.* +import inr.numass.data.NumassProto +import inr.numass.data.api.NumassPoint +import inr.numass.data.storage.ProtoNumassPoint +import kotlinx.coroutines.experimental.* import kotlinx.coroutines.experimental.channels.Channel -import kotlinx.coroutines.experimental.launch -import kotlinx.coroutines.experimental.runBlocking import org.slf4j.LoggerFactory import java.io.OutputStream +import java.lang.Math.pow import java.net.Socket +import java.nio.ByteBuffer +import java.nio.ByteOrder +import java.time.Instant +import java.util.* import java.util.concurrent.atomic.AtomicLong +import kotlin.collections.HashMap +import kotlin.coroutines.experimental.buildSequence +import kotlin.math.ceil //TODO implement using Device -class DanteClient(val ip: String) : AutoCloseable { - private val RESET_COMMAND = arrayOf(0xDD, 0x55, 0xDD, 0xEE) +class DanteClient(val ip: String, chainLength: Int) : AutoCloseable { + private val RESET_COMMAND = byteArrayOf(0xDD.toByte(), 0x55, 0xDD.toByte(), 0xEE.toByte()) private val logger = LoggerFactory.getLogger(javaClass) private val packetNumber = AtomicLong(0) - private var parentJob = Job() + private lateinit var parentJob: Job + private val pool = newFixedThreadPoolContext(8, "Dante") private val connections: MutableMap> = HashMap() - private val commandChannel = Channel(capacity = Channel.UNLIMITED) + private val sendChannel = Channel(capacity = Channel.UNLIMITED) private lateinit var output: OutputStream private lateinit var outputJob: Job + /** + * Synchronous reading and writing of registers + */ + private val comChannel = Channel(capacity = Channel.UNLIMITED) + private val dataChannel = Channel(capacity = Channel.UNLIMITED) + //private val statisticsChannel = Channel(capacity = Channel.UNLIMITED) + + /** + * @param num number + * @param name the name of the board + */ + private data class BoardState(val num: Int, var meta: Meta? = null) + + private val boards = (0 until chainLength).map { BoardState(it) } + fun open() { + //create a new parent job + this.parentJob = Job() (0..3).forEach { openPort(it) } } + override fun close() { + //TODO send termination signal + connections.values.forEach { + it.first.close() + it.second.cancel() + } + parentJob.cancel(CancellationException("Server stopped")) + } + + /** + * Disconnect and reconnect without clearing configuration + */ + fun reconnect() { + close() + runBlocking { + clearData() + } + open() + } + + /** + * Reset everything + */ + fun reset() { + async { + sendChannel.send(RESET_COMMAND) + } + } + private fun openPort(port: Int) { //closing existing connection connections[port]?.let { @@ -59,102 +118,315 @@ class DanteClient(val ip: String) : AutoCloseable { val socket = Socket(ip, 8000 + port) + logger.info("Opened socket {}", socket.inetAddress) + //Create command queue on port 0 if (port == 0) { outputJob.cancel() output = socket.getOutputStream() - outputJob = launch(parent = parentJob) { - output.write(commandChannel.receive()) + outputJob = launch(context = pool, parent = parentJob) { + val command = sendChannel.receive() + output.write(command) + logger.debug("Sent {}", command.hex) } } - val channel = Communications.readStream(socket.getInputStream(), parentJob) - val job = launch { + val job = launch(context = pool, parent = parentJob) { + val stream = socket.getInputStream() while (true) { - handle(channel.receive()) - //TODO handle errors and reconnect + if (stream.read() == Communications.PACKET_HEADER_START_BYTES[0] && stream.read() == Communications.PACKET_HEADER_START_BYTES[1]) { + // second check is not executed unless first one is true + val header = ByteArray(6) + stream.read(header) + val command = Communications.CommandType.values().find { it.byte == header[0] }!! + val board = header[1] + val packet = header[2] + val length = header[3].positive * 0x100 + header[4].positive * 0x010 + header[5].positive + val payload = ByteArray(length) + stream.read(payload) + handle(Communications.DanteMessage(command, board.positive, packet.positive, payload)) + } } + //TODO handle errors and reconnect } connections[port] = Pair(socket, job) } - fun send(command: ByteArray) { - runBlocking { - commandChannel.send(command) - } + private suspend fun send(command: Communications.CommandType, board: Int, packet: Int, register: Int, data: ByteArray = ByteArray(0), length: Int = (data.size / 4)) { + logger.debug("Sending {}[{}, {}] of size {}*4 to {}", command.name, board, packet, length, register) + sendChannel.send(Communications.wrapCommand(command, board, packet, register, length, data)) } - fun send(command: Communications.CommandType, board: Byte, packet: Byte, start: Communications.Register, data: ByteArray) { - send(Communications.wrapCommand(command, board, packet, start, data)) - } - - suspend fun handle(response: Communications.DanteMessage) { + private suspend fun handle(response: Communications.DanteMessage) { logger.debug("Received {}", response.toString()) when (response.command) { - READ -> TODO() - WRITE -> TODO() - SINGLE_SPECTRUM_MODE -> TODO() - MAP_MODE -> TODO() - LIST_MODE -> TODO() - WAVEFORM_MODE -> TODO() + READ, WRITE -> comChannel.send(response) + SINGLE_SPECTRUM_MODE, MAP_MODE, LIST_MODE, WAVEFORM_MODE -> dataChannel.send(response) } } - /* - async def __write_config(self, board, register, data, mask=None): - assert isinstance(data, (bytes, bytearray)) - data_len = len(data) - if mask: - assert isinstance(mask, (bytes, bytearray)) - assert len(mask) == data_len - resp = await self.__send_message( - COMMANDS['READ'], board, register, data_len // 4) - - data_bin = bitarray(endian='big') - data_bin.frombytes(data) - - resp_payload = bitarray(endian='big') - resp_payload.frombytes(resp['payload']) - - mask_bin = bitarray(endian='big') - mask_bin.frombytes(mask) - - data_masked = data_bin & mask_bin - - mask_bin.invert() - - data_out = (data_masked | (resp_payload & mask_bin)).tobytes() - else: - data_out = data - - return await self.__send_message( - COMMANDS['WRITE'], board, register, data_len // 4, data_out) + /** + * Generate next packet number */ - - private fun writeParameter(board: Byte, register: Byte, data: ByteArray) { - val packet = packetNumber.incrementAndGet() % 256 - send(WRITE, board, register, data) + private fun nextPacket(): Int { + return (packetNumber.incrementAndGet() % 256 - 128).toInt() } + private fun List.asBuffer(): ByteBuffer { + val buffer = ByteBuffer.allocate(this.size * 4) + buffer.order(ByteOrder.BIG_ENDIAN) + this.forEach { buffer.putInt(it.int) } //safe transform to 4-byte integer + return buffer + } - override fun close() { - //TODO send termination signal - connections.values.forEach { - it.first.close() - it.second.cancel() + private val ByteBuffer.bits: BitSet + get() = BitSet.valueOf(this) + + /** + * Write single or multiple registers + * + */ + private suspend fun writeRegister(board: Int, register: Int, data: List, mask: Long? = null) { + //representing data as byte buffer + val buffer = if (mask != null) { + val oldData = readRegister(board, register, data.size) + //(oldData & not_mask) | (newData & mask); + (0..data.size).map { (oldData[it] and mask.inv()).or(data[it] and mask) }.asBuffer(); + } else { + data.asBuffer() } - parentJob.cancel(CancellationException("Server stopped")) + + send(WRITE, board, nextPacket(), register, buffer.array()) } - fun reconnect() { - close() - //create a new parent job - parentJob = Job() - open() + private suspend fun writeRegister(board: Int, register: Int, data: Long, mask: Long? = null) { + writeRegister(board, register, listOf(data), mask) } + + private suspend fun readRegister(board: Int, register: Int, length: Int = 1): List { + val packet = nextPacket() + send(READ, board, packet, register, length = length) + var message: Communications.DanteMessage? = null + //skip other responses + while (message == null || !(message.command == READ && message.packet == packet)) { + message = comChannel.receive() + } + + return buildSequence { + val intBuffer = ByteBuffer.wrap(message.payload).asIntBuffer() + while (intBuffer.hasRemaining()) { + yield(intBuffer.get()) + } + }.map { it.positive }.toList() + } + + /** + * Write a single DPP parameter + * @param register number of parameter register + * @param value value of the parameter as unsigned integer + */ + suspend fun writeDPP(board: Int, register: Int, value: Long) { + //sending register number and value in the same command + writeRegister(board, DPP_REGISTER_1.code, listOf(register.toLong(), value)) + writeRegister(board, DPP_CONFIG_COMMIT_OFFSET.code, 0x00000001, 0x00000001) + } + +// suspend fun writeDPP(board: Int, register: Int, value: Int) { +// writeDPP(board, register, value.toLong()) +// } + + /** + * Configure single board using provided meta + */ + suspend fun configureBoard(board: Int, meta: Meta) { + val gain = meta.getDouble("gain") + val det_thresh = meta.getInt("detection_thresold") + val pileup_thr = meta.getInt("pileup_thresold") + val en_fil_peak_time = meta.getInt("energy_filter.peaking_time") + val en_fil_flattop = meta.getInt("energy_filter.flat_top") + val fast_peak_time = meta.getInt("fast_filter.peaking_time") + val fast_flattop = meta.getInt("fast_filter.flat_top") + val recovery_time = meta.getValue("recovery_time").longValue() + val zero_peak_rate = meta.getInt("zero_peak_rate") + val inverted_input = meta.getInt("inverted_input") + + assert(en_fil_peak_time in 1..511) + assert(gain in 0.01..(en_fil_peak_time * 2 - 0.01)) + assert(det_thresh in 0..4096) + assert(pileup_thr in 0..4096) + assert(en_fil_flattop in 1..15) + assert(fast_peak_time in 1..31) + assert(fast_flattop in 1..31) + assert(recovery_time in (0.0..pow(2.0, 24.0) - 1)) + assert(zero_peak_rate in 0..500) + assert(inverted_input in listOf(0, 1)) + assert((en_fil_peak_time + en_fil_flattop) * 2 < 1023) + + logger.info("Starting {} board configuration", board) + + writeDPP(board, 128, inverted_input * (1L shl 24)) + writeDPP(board, 162, recovery_time) + writeDPP(board, 181, 0) + writeDPP(board, 185, 0) + writeDPP(board, 175, 1) + writeDPP(board, 160, (pileup_thr / gain).toLong() + (1 shl 31)) + writeDPP(board, 160, (pileup_thr / gain * 2).toLong() and (1 shl 31).inv()) //set bit 2 to zero + writeDPP(board, 152, (det_thresh / gain * fast_peak_time).toLong()) + + if (fast_flattop == 1) { + writeDPP(board, 154, 0) + } else { + writeDPP(board, 154, ceil(fast_flattop.toDouble() / 2).toLong()) // TODO check this + } + + if (zero_peak_rate == 0) { + writeDPP(board, 142, 0) + } else { + writeDPP(board, 142, (1.0 / zero_peak_rate / 10 * 1e6).toLong() + (1 shl 31)) + } + + writeDPP(board, 180, (fast_flattop + 1).toLong()) + + if ((2 * fast_peak_time + fast_flattop) > 4 * en_fil_flattop) { + writeDPP(board, 140, ((2 * fast_peak_time + fast_flattop) * 4 + 1).toLong()) + } else { + writeDPP(board, 140, en_fil_flattop.toLong()) + } + + writeDPP(board, 141, (fast_peak_time + fast_flattop + 4).toLong()) + writeDPP(board, 156, fast_peak_time + 0 * (1L shl 28)) + writeDPP(board, 150, fast_flattop + 0 * (1L shl 28)) + writeDPP(board, 149, en_fil_peak_time + 1 * (1L shl 28)) + writeDPP(board, 150, en_fil_flattop + 1 * (1L shl 28)) + writeDPP(board, 153, en_fil_peak_time * 2 + en_fil_flattop * 2 + 1 * (1L shl 28)) + writeDPP(board, 184, (gain * (1 shl 24) / en_fil_peak_time).toLong() + 1 * (1L shl 28)) + writeDPP(board, 148, 0b11) + writeDPP(board, 128, 0b100 + inverted_input * (1L shl 24)) + writeDPP(board, 128, 1 + inverted_input * (1L shl 24)) + + logger.info("Finished {} board configuration", board) + } + + /** + * Configure all boards + */ + suspend fun configureAll(meta: Meta) { + boards.forEach { + configureBoard(it.num, meta) + } + logger.info("Finished configuration of all actibe boards") + } + + /** + * Clear unused data + */ + private suspend fun clearData() { + for (element in dataChannel) { + logger.warn("Dumping residual data packet {}", element.toString()) + } + } + + private suspend fun clearCommunications() { + for (element in comChannel) { + logger.debug("Dumping residual communication packet {}", element.toString()) + } + } + + /** + * Handle statistics asynchronously + */ + fun handleStatistics(channel: Int, message: ByteBuffer) { + logger.info("Received statistics packet from board {}", channel) + //TODO + } + + /** + * Gather data in list mode + * @param length measurement time in milliseconds + * @param statisticsPackets number of statistics packets per measurement + */ + suspend fun readPoint(length: Int, statisticsInterval: Int = 1000): NumassPoint { + clearData() + + logger.info("Starting list point acquisition {} ms", length) + boards.forEach { + writeRegister(it.num, ACQUISITION_SETTINGS.code, Communications.AcquisitionMode.LIST_MODE.long, 0x00000007) + writeRegister(it.num, ACQUISITION_TIME.code, length.toLong()) + writeRegister(it.num, TIME_PER_MAP_POINT.code, statisticsInterval.toLong(), 0x00FFFFFF) + writeRegister(it.num, MAP_POINTS.code, (length.toDouble() / statisticsInterval).toLong(), 0x00FFFFFF) + } + writeRegister(0, ACQUISITION_STATUS.code, 0x00000001, 0x00000001) + + val start = Instant.now() + val builder = NumassProto.Point.newBuilder() + + + val dataCollectorJob = launch(context = pool, parent = parentJob) { + while (true) { + val packet = dataChannel.receive() + val channel = packet.board + // get or create new channel block + val channelBuilder = builder.channelsBuilderList.find { it.id.toInt() == channel } + .orElse { + builder.addChannelsBuilder().setId(channel.toLong()) + .also { + //initializing single block + it.addBlocksBuilder().also { + it.time = (start.epochSecond * 1e9 + start.nano).toLong() + it.binSize = 8 // tick in nanos + it.length = length.toLong() * 1000 //block length in nanos + } + } + } + val blockBuilder = channelBuilder.getBlocksBuilder(0) + val eventsBuilder = blockBuilder.eventsBuilder + + val buffer = ByteBuffer.wrap(packet.payload) + while (buffer.hasRemaining()) { + val firstWord = buffer.getInt() + val secondWord = buffer.getInt() + if (firstWord == STATISTIC_HEADER && secondWord == STATISTIC_HEADER) { + val statistics = ByteArray(128) + buffer.get(statistics) + //TODO use statistics for meta + handleStatistics(channel, ByteBuffer.wrap(statistics)) + } else if (firstWord == 0) { + //TODO handle zeros + } else { + val time: Long = secondWord.positive shl 14 + firstWord ushr 18 + val amp: Short = (firstWord and 0x0000FFFF).toShort() + eventsBuilder.addTimes(time * 8) + eventsBuilder.addAmplitudes(amp.toLong()) + } + } + } + } + + /** + * Cancel data collection after specific time passed + */ + val schedulerJob = launch(context = pool, parent = parentJob) { + delay(length + 2000L) + dataCollectorJob.cancel(CancellationException("Expired")) + } + + val meta = buildMeta { + setNode("dpp", boards.first().meta) + } + + dataCollectorJob.join() + return ProtoNumassPoint(builder.build(), meta) + } + + companion object { + const val STATISTIC_HEADER: Int = 0x0C000000 + } + } + + /* def __make_binary_base(): point = dfparser.Point() @@ -163,157 +435,6 @@ class DanteClient(val ip: String) : AutoCloseable { channel.blocks.add() return point - async def __write_dpp( - self, board: int, register: int, value: bytes): - await self.__write_config( - board, ORDER['DPP_REGISTER_1'], struct.pack('>I', register)) - await self.__write_config(board, ORDER['DPP_REGISTER_2'], value) - await self.__write_config( - board, ORDER['DPP_CONFIG_COMMIT_OFFSET'], - b'\x00\x00\x00\x01', b'\x00\x00\x00\x01') - - async def __initialize_boards(self): - dpp_params = SETTINGS['DANTE']['dpp'] - - gain = np.double(dpp_params['gain']) - det_thresh = np.uint32(dpp_params['detection_thresold']) - pileup_thr = np.uint32(dpp_params['pileup_thresold']) - en_fil_peak_time = np.uint32( - dpp_params['energy_filter']['peaking_time']) - en_fil_flattop = np.uint32(dpp_params['energy_filter']['flat_top']) - fast_peak_time = np.uint32(dpp_params['fast_filter']['peaking_time']) - fast_flattop = np.uint32(dpp_params['fast_filter']['flat_top']) - recovery_time = np.uint32(dpp_params['recovery_time']) - zero_peak_rate = np.uint32(dpp_params['zero_peak_rate']) - inverted_input = np.uint32(dpp_params['inverted_input']) - - assert 1 <= en_fil_peak_time <= 511 - assert 0.01 <= gain <= en_fil_peak_time * 2 - 0.01 - assert 0 <= det_thresh <= 4096 - assert 0 <= pileup_thr <= 4096 - assert 1 <= en_fil_flattop <= 15 - assert 1 <= fast_peak_time <= 31 - assert 1 <= fast_flattop <= 31 - assert 0 <= recovery_time <= 2**24 - 1 - assert 0 <= zero_peak_rate <= 500 - assert inverted_input in [0, 1] - assert (en_fil_peak_time + en_fil_flattop) * 2 < 1023 - - for board in BOARDS: - logger.info('start %s board initialisation', board) - - val_128 = struct.pack('>I', 0 + inverted_input * (1 << 24)) - await self.__write_dpp(board, 128, val_128) - - val_162 = struct.pack('>I', recovery_time) - await self.__write_dpp(board, 162, val_162) - - await self.__write_dpp(board, 181, b'\x00\x00\x00\x00') - - await self.__write_dpp(board, 185, b'\x00\x00\x00\x00') - - await self.__write_dpp(board, 175, b'\x00\x00\x00\x01') - - val_160 = struct.pack('>I', int(pileup_thr / gain) + 1 * (1 << 31)) - await self.__write_dpp(board, 160, val_160) - - val_160 = struct.pack( - '>I', int(pileup_thr / gain * 2) + 0 * (1 << 31)) - await self.__write_dpp(board, 160, val_160) - - val_152 = struct.pack( - '>I', int(det_thresh / gain * fast_peak_time)) - await self.__write_dpp(board, 152, val_152) - - if fast_flattop == 1: - await self.__write_dpp(board, 154, b'\x00\x00\x00\x00') - else: - val_154 = np.uint32(np.ceil(fast_flattop) / 2) - await self.__write_dpp(board, 154, val_154) - - if zero_peak_rate == 0: - await self.__write_dpp(board, 142, b'\x00\x00\x00\x00') - else: - val_142 = np.uint32(1 / zero_peak_rate / 10 * 1e6 + (1 << 31)) - await self.__write_dpp(board, 142, val_142) - - val_180 = struct.pack('>I', fast_flattop + 1) - await self.__write_dpp(board, 180, val_180) - - if (2 * fast_peak_time + fast_flattop) > 4 * en_fil_flattop: - val_140 = struct.pack('>I', np.uint32(np.ceil( - (2 * fast_peak_time + fast_flattop) * 4) + 1)) - await self.__write_dpp(board, 140, val_140) - else: - await self.__write_dpp(board, 140, en_fil_flattop) - - val_141 = struct.pack('>I', fast_peak_time + fast_flattop + 4) - await self.__write_dpp(board, 141, val_141) - - val_156 = struct.pack('>I', fast_peak_time + 0 * (1 << 28)) - await self.__write_dpp(board, 156, val_156) - - val_150 = struct.pack('>I', fast_flattop + 0 * (1 << 28)) - await self.__write_dpp(board, 150, val_150) - - val_149 = struct.pack('>I', en_fil_peak_time + 1 * (1 << 28)) - await self.__write_dpp(board, 149, val_149) - - val_150 = struct.pack('>I', en_fil_flattop + 1 * (1 << 28)) - await self.__write_dpp(board, 150, val_150) - - val_153 = struct.pack( - '>I', - en_fil_peak_time * 2 + en_fil_flattop * 2 + 1 * (1 << 28)) - await self.__write_dpp(board, 153, val_153) - - val_184 = struct.pack( - '>I', int(gain * (1 << 24) / en_fil_peak_time) + 1 * (1 << 28)) - await self.__write_dpp(board, 184, val_184) - - await self.__write_dpp(board, 148, struct.pack('>I', 1 + (1 << 1))) - - val_128 = struct.pack( - '>I', 1 * (1 << 2) + inverted_input * (1 << 24)) - await self.__write_dpp(board, 128, val_128) - - val_128 = struct.pack('>I', 1 + inverted_input * (1 << 24)) - await self.__write_dpp(board, 128, val_128) - - self.__send_reply_ok() - - async def __start_acquisition(self, acq_time_ms, map_points=10): - acq_time_ms = acq_time_ms - logger.info('start point acquisition %s ms', acq_time_ms) - for board in BOARDS: - await self.__write_config( - board, ORDER['ACQUISITION_SETTINGS'], - b'\x00\x00\x00\x04', b'\x00\x00\x00\x07') - await self.__write_config( - board, ORDER['ACQUISITION_TIME'], - struct.pack('>I', acq_time_ms)) - await self.__write_config( - board, ORDER['MAP_POINTS'], - struct.pack('>I', acq_time_ms // map_points)) - await self.__write_config( - board, ORDER['TIME_PER_MAP_POINT'], - struct.pack('>I', map_points)) - await self.__write_config( - 0, ORDER['ACQUISITION_STATUS'], - b'\x00\x00\x00\x01', b'\x00\x00\x00\x01') - - def __get_packet_number(self): - """Generate available packet number for board.""" - packet_num = ALL_PACKETS.difference(PACKET_NUMBERS).pop() - PACKET_NUMBERS.add(packet_num) - EVENTS[packet_num] = asyncio.Event() - return packet_num - - def __put_packet_number(self, packet_num): - """Release packet number to available.""" - PACKET_NUMBERS.remove(packet_num) - del EVENTS[packet_num] - async def __send_message( self, command, board_num=0, start_addr=0, length=0, data=b''): """Send message and wait for response.""" diff --git a/numass-control/dante/src/main/kotlin/inr/numass/control/dante/DanteTest.kt b/numass-control/dante/src/main/kotlin/inr/numass/control/dante/DanteTest.kt new file mode 100644 index 00000000..2b655132 --- /dev/null +++ b/numass-control/dante/src/main/kotlin/inr/numass/control/dante/DanteTest.kt @@ -0,0 +1,54 @@ +/* + * Copyright 2018 Alexander Nozik. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package inr.numass.control.dante + +import hep.dataforge.kodex.buildMeta +import inr.numass.data.channel +import kotlinx.coroutines.experimental.runBlocking + +fun main(args: Array) { + val client = DanteClient("192.168.111.123", 7) + val meta = buildMeta { + + /* + val gain = meta.getDouble("gain") + val det_thresh = meta.getInt("detection_thresold") + val pileup_thr = meta.getInt("pileup_thresold") + val en_fil_peak_time = meta.getInt("energy_filter.peaking_time") + val en_fil_flattop = meta.getInt("energy_filter.flat_top") + val fast_peak_time = meta.getInt("fast_filter.peaking_time") + val fast_flattop = meta.getInt("fast_filter.flat_top") + val recovery_time = meta.getValue("recovery_time").longValue() + val zero_peak_rate = meta.getInt("zero_peak_rate") + val inverted_input = meta.getInt("inverted_input") + */ + } + runBlocking { + client.configureAll(meta) + } + val point = runBlocking { + client.readPoint(10*1000) + } + println("***META***") + println(point.meta) + println("***BLOCKS***") + point.blocks.forEach { + println("channel: ${it.channel}") + println("\tlength: ${it.length}") + println("\tevents: ${it.events.count()}") + } +} \ No newline at end of file diff --git a/numass-core/src/main/kotlin/inr/numass/data/api/NumassPoint.kt b/numass-core/src/main/kotlin/inr/numass/data/api/NumassPoint.kt index 4dbf4d57..e63b117b 100644 --- a/numass-core/src/main/kotlin/inr/numass/data/api/NumassPoint.kt +++ b/numass-core/src/main/kotlin/inr/numass/data/api/NumassPoint.kt @@ -89,7 +89,7 @@ interface NumassPoint : Metoid, NumassBlock { return if (envelope.dataType?.startsWith("numass.point.classic") ?: envelope.meta.hasValue("split")) { ClassicNumassPoint(envelope) } else { - ProtoNumassPoint(envelope) + ProtoNumassPoint.fromEnvelope(envelope) } } } diff --git a/numass-core/src/main/kotlin/inr/numass/data/storage/ProtoNumassPoint.kt b/numass-core/src/main/kotlin/inr/numass/data/storage/ProtoNumassPoint.kt index 2f74d070..4aca7c95 100644 --- a/numass-core/src/main/kotlin/inr/numass/data/storage/ProtoNumassPoint.kt +++ b/numass-core/src/main/kotlin/inr/numass/data/storage/ProtoNumassPoint.kt @@ -15,7 +15,6 @@ import inr.numass.data.api.NumassPoint import inr.numass.data.dataStream import inr.numass.data.legacy.NumassFileEnvelope import org.slf4j.LoggerFactory -import java.io.IOException import java.nio.file.Path import java.time.Duration import java.time.Instant @@ -27,29 +26,16 @@ import java.util.stream.Stream * Protobuf based numass point * Created by darksnake on 09.07.2017. */ -class ProtoNumassPoint(private val envelope: Envelope) : NumassPoint { - - private val point: NumassProto.Point - get() = try { - envelope.dataStream.use { - NumassProto.Point.parseFrom(it) - } - } catch (ex: IOException) { - throw RuntimeException("Failed to read point via protobuf", ex) - } +class ProtoNumassPoint(val proto: NumassProto.Point, override val meta: Meta) : NumassPoint { override val blocks: Stream - get() = point.channelsList.stream() + get() = proto.channelsList.stream() .flatMap { channel -> channel.blocksList.stream() .map { block -> ProtoBlock(channel.id.toInt(), block, this) } .sorted(Comparator.comparing { it.startTime }) } - - override val meta: Meta = envelope.meta - - override val voltage: Double = meta.getDouble("external_meta.HV1_value", super.voltage) override val index: Int = meta.getInt("external_meta.point_index", super.index) @@ -63,7 +49,14 @@ class ProtoNumassPoint(private val envelope: Envelope) : NumassPoint { companion object { fun readFile(path: Path): ProtoNumassPoint { - return ProtoNumassPoint(NumassFileEnvelope.open(path, true)) + return fromEnvelope(NumassFileEnvelope.open(path, true)) + } + + fun fromEnvelope(envelope: Envelope): ProtoNumassPoint { + val proto = envelope.dataStream.use { + NumassProto.Point.parseFrom(it) + } + return ProtoNumassPoint(proto, envelope.meta) } fun readFile(path: String, context: Context = Global): ProtoNumassPoint {