diff --git a/numass-control/dante/src/main/kotlin/inr/numass/control/dante/Communications.kt b/numass-control/dante/src/main/kotlin/inr/numass/control/dante/Communications.kt deleted file mode 100644 index 32317fb8..00000000 --- a/numass-control/dante/src/main/kotlin/inr/numass/control/dante/Communications.kt +++ /dev/null @@ -1,259 +0,0 @@ -/* - * Copyright 2018 Alexander Nozik. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package inr.numass.control.dante - -import kotlin.coroutines.experimental.buildSequence - -internal val Byte.positive - get() = toInt() and 0xFF - -internal val Int.positive - get() = toLong() and 0xFFFFFFFF - -internal val Int.byte: Byte - get() { - if (this >= 256) { - throw RuntimeException("Number less than 256 is expected") - } else { - return toByte() - } - } - -internal val Long.int: Int - get() { - if (this >= 0xFFFFFFFF) { - throw RuntimeException("Number less than 256 is expected") - } else { - return toInt() - } - } - -internal val ByteArray.hex - get() = this.joinToString(separator = "") { it.positive.toString(16).padStart(2, '0') } - -object Communications { - - val PACKET_HEADER_START_BYTES = arrayOf(0xAA, 0xEE) - - enum class Register(val code: Int) { - FIRMWARE_VERSION(0), - DPP_REGISTER_1(1), - DPP_REGISTER_2(2), - DPP_CONFIG_COMMIT_OFFSET(3), - ACQUISITION_STATUS(4), - ACQUISITION_TIME(5), - ELAPSED_TIME(6), - ACQUISITION_SETTINGS(7), - WAVEFORM_LENGTH(8), - MAP_POINTS(9), - TIME_PER_MAP_POINT(10), - ETH_CONFIGURATION_DATA(11), - ETHERNET_COMMIT(13), - CALIB_DONE_SIGNALS(14) - } - - enum class CommandType(val byte: Byte) { - READ(0xD0.toByte()), - WRITE(0xD1.toByte()), - SINGLE_SPECTRUM_MODE(0xD2.toByte()), - MAP_MODE(0xD6.toByte()), - LIST_MODE(0xD4.toByte()), - WAVEFORM_MODE(0xD3.toByte()), - } - - enum class AcquisitionMode(val byte: Byte) { - SINGLE_SPECTRUM_MODE(2), - MAP_MODE(6), - LIST_MODE(4), - WAVEFORM_MODE(3); - - val long = byte.toLong() - } - - - /** - * Build command header - */ - fun buildHeader(command: CommandType, board: Byte, packet: Byte, start: Byte, length: Byte): ByteArray { - assert(command in listOf(CommandType.READ, CommandType.WRITE)) - assert(board in 0..255) - assert(packet in 0..255) - assert(length in 0..255) - val header = ByteArray(8) - header[0] = PACKET_HEADER_START_BYTES[0].toByte() - header[1] = PACKET_HEADER_START_BYTES[1].toByte() - header[2] = command.byte - header[3] = board - header[4] = packet - header[5] = start - header[6] = length - return header - } - - /** - * Escape the sequence using DANTE convention - */ - private fun ByteArray.escape(): ByteArray { - return buildSequence { - this@escape.forEach { - yield(it) - if (it == 0xdd.toByte()) { - yield(it) - } - } - }.toList().toByteArray() - } - - - /** - * Create DANTE command and stuff it. - * @param length size of data array/4 - */ - fun wrapCommand(command: CommandType, board: Int, packet: Int, start: Int, length: Int, data: ByteArray): ByteArray { - if (command == CommandType.READ) { - assert(data.isEmpty()) - } else { - assert(data.size % 4 == 0) - assert(length == data.size / 4) - } - - val header = buildHeader(command, board.byte, packet.byte, start.byte, length.byte) - - val res = (header + data).escape() - return byteArrayOf(0xdd.toByte(), 0xaa.toByte()) + res + byteArrayOf(0xdd.toByte(), 0x55.toByte()) - } - - data class DanteMessage(val command: CommandType, val board: Int, val packet: Int, val payload: ByteArray) { - override fun toString(): String { - return "${command.name}[$board, $packet] of size ${payload.size}" - } - } - -// /** -// * Read the stream and return resulting messages in ReceiveChannel -// */ -// fun readStream(stream: InputStream, parent: Job): ReceiveChannel { -// return produce(capacity = Channel.UNLIMITED, parent = parent) { -// while (true) { -// if (stream.read() == PACKET_HEADER_START_BYTES[0] && stream.read() == PACKET_HEADER_START_BYTES[1]) { -// // second check is not executed unless first one is true -// val header = ByteArray(6) -// stream.read(header) -// val command = CommandType.values().find { it.byte == header[0] }!! -// val board = header[1] -// val packet = header[2] -// val length = header[3].positive * 0x100 + header[4].positive * 0x010 + header[5].positive -// val payload = ByteArray(length) -// stream.read(payload) -// send(DanteMessage(command, board.positive, packet.positive, payload)) -// } -// } -// } -// } - -// fun readSocket(socket: Socket, parent: Job): ReceiveChannel { -// return readStream(socket.getInputStream(), parent) -// } -} -/* - -def extract_response(stream): - r"""Extract response messages from stream. - - Method will rstrip junk bytes before response (assuming it could be only - \x00 values), extract every complete message and cut them from stream. - - if stream contains uncomlete message, method leaves it untouched. - - Returns croped stream and extracted messages. Message structure: - { - packet_num - request packet id - command - command or response code - payload - payload binary data - board - board number - } - """ - messages = [] - while True: - idx = stream.find(__PACKET_HEADER_START_BYTES) - if idx != -1: - stream = stream[idx:] - payload_size = struct.unpack('>I', b'\x00%s' % (stream[5:8]))[0] - if len(stream) >= payload_size + 8: - messages.append({ - 'command': stream[2], - 'board': stream[3], - 'packet_num': stream[4], - 'payload': stream[8: payload_size * 4 + 8] - }) - stream = stream[payload_size + 8:] - else: - return stream, messages - else: - return stream, messages - - - - -def create_response( - command, board_num=0, packet_num=0, data=None): - """Create response packet. - - For test purposes only! - """ - assert data is None or isinstance(data, (bytes, bytearray)) - assert 0 <= board_num <= 255 - assert 0 <= packet_num <= 255 - - if not data: - data = b'' - - header = bytearray(8) - header[0:2] = __PACKET_HEADER_START_BYTES - header[2] = command[0] - header[3] = board_num - header[4] = packet_num - header[5:8] = struct.pack('>I', len(data))[1:] - - return b'%s%s' % (header, data) - - -def extract_request(stream): - """Extract stuffed requests from stream. - - For test purposes only! - """ - messages = [] - while True: - start_idx = stream.find(b'\xdd\xaa') - end_idx = stream.find(b'\xdd\x55') - if start_idx != -1 and end_idx != -1 and end_idx > start_idx: - packet = stream[start_idx + 2: end_idx] - messages.append({ - 'command': packet[2], - 'board': packet[3], - 'packet_num': packet[4], - 'start_addr': packet[5], - 'length': packet[6], - 'payload': packet[8:], - }) - stream = stream[end_idx + 2:] - else: - return stream, messages - - - - */ \ No newline at end of file diff --git a/numass-control/dante/src/main/kotlin/inr/numass/control/dante/DanteClient.kt b/numass-control/dante/src/main/kotlin/inr/numass/control/dante/DanteClient.kt index 9c8b3f07..052dde81 100644 --- a/numass-control/dante/src/main/kotlin/inr/numass/control/dante/DanteClient.kt +++ b/numass-control/dante/src/main/kotlin/inr/numass/control/dante/DanteClient.kt @@ -19,8 +19,8 @@ package inr.numass.control.dante import hep.dataforge.kodex.buildMeta import hep.dataforge.kodex.orElse import hep.dataforge.meta.Meta -import inr.numass.control.dante.Communications.CommandType.* -import inr.numass.control.dante.Communications.Register.* +import inr.numass.control.dante.DanteClient.Companion.CommandType.* +import inr.numass.control.dante.DanteClient.Companion.Register.* import inr.numass.data.NumassProto import inr.numass.data.api.NumassPoint import inr.numass.data.storage.ProtoNumassPoint @@ -40,6 +40,34 @@ import kotlin.collections.HashMap import kotlin.coroutines.experimental.buildSequence import kotlin.math.ceil +internal val Byte.positive + get() = toInt() and 0xFF + +internal val Int.positive + get() = toLong() and 0xFFFFFFFF + +internal val Int.byte: Byte + get() { + if (this >= 256) { + throw RuntimeException("Number less than 256 is expected") + } else { + return toByte() + } + } + +internal val Long.int: Int + get() { + if (this >= 0xFFFFFFFF) { + throw RuntimeException("Number less than ${Int.MAX_VALUE*2L} is expected") + } else { + return toInt() + } + } + +internal val ByteArray.hex + get() = this.joinToString(separator = "") { it.positive.toString(16).padStart(2, '0') } + + //TODO implement using Device class DanteClient(val ip: String, chainLength: Int) : AutoCloseable { private val RESET_COMMAND = byteArrayOf(0xDD.toByte(), 0x55, 0xDD.toByte(), 0xEE.toByte()) @@ -60,8 +88,12 @@ class DanteClient(val ip: String, chainLength: Int) : AutoCloseable { /** * Synchronous reading and writing of registers */ - private val comChannel = Channel(capacity = Channel.UNLIMITED) - private val dataChannel = Channel(capacity = Channel.UNLIMITED) + private val comChannel = Channel(capacity = Channel.UNLIMITED) + + /** + * Data packets channel + */ + private val dataChannel = Channel(capacity = Channel.UNLIMITED) //private val statisticsChannel = Channel(capacity = Channel.UNLIMITED) /** @@ -138,18 +170,18 @@ class DanteClient(val ip: String, chainLength: Int) : AutoCloseable { val job = launch(context = pool, parent = parentJob) { val stream = socket.getInputStream() while (true) { - if (stream.read() == Communications.PACKET_HEADER_START_BYTES[0] && stream.read() == Communications.PACKET_HEADER_START_BYTES[1]) { + if (stream.read() == PACKET_HEADER_START_BYTES[0] && stream.read() == PACKET_HEADER_START_BYTES[1]) { // second check is not executed unless first one is true val header = ByteArray(6) stream.read(header) - val command = Communications.CommandType.values().find { it.byte == header[0] } + val command = CommandType.values().find { it.byte == header[0] } ?: throw RuntimeException("Unknown command code: ${header[0]}") val board = header[1] val packet = header[2] val length = (header[3].positive * 0x100 + header[4].positive * 0x010 + header[5].positive) * 4 val payload = ByteArray(length) stream.read(payload) - handle(Communications.DanteMessage(command, board.positive, packet.positive, payload)) + handle(DanteMessage(command, board.positive, packet.positive, payload)) } } //TODO handle errors and reconnect @@ -158,12 +190,12 @@ class DanteClient(val ip: String, chainLength: Int) : AutoCloseable { connections[port] = Pair(socket, job) } - private suspend fun send(command: Communications.CommandType, board: Int, packet: Int, register: Int, data: ByteArray = ByteArray(0), length: Int = (data.size / 4)) { + private suspend fun send(command: CommandType, board: Int, packet: Int, register: Int, data: ByteArray = ByteArray(0), length: Int = (data.size / 4)) { logger.debug("Sending {}[{}, {}] of size {}*4 to {}", command.name, board, packet, length, register) - sendChannel.send(Communications.wrapCommand(command, board, packet, register, length, data)) + sendChannel.send(wrapCommand(command, board, packet, register, length, data)) } - private suspend fun handle(response: Communications.DanteMessage) { + private suspend fun handle(response: DanteMessage) { logger.debug("Received {}", response.toString()) when (response.command) { READ, WRITE -> comChannel.send(response) @@ -210,10 +242,9 @@ class DanteClient(val ip: String, chainLength: Int) : AutoCloseable { } private suspend fun readRegister(board: Int, register: Int, length: Int = 1): List { - val packet = nextPacket() send(READ, board, packet, register, length = length) - var message: Communications.DanteMessage? = null + var message: DanteMessage? = null //skip other responses while (message == null || !(message.command == READ && message.packet == packet)) { message = comChannel.receive() @@ -228,7 +259,7 @@ class DanteClient(val ip: String, chainLength: Int) : AutoCloseable { } suspend fun getFirmwareVersion(): Long { - return readRegister(0, Communications.Register.FIRMWARE_VERSION.code)[0] + return readRegister(0, Register.FIRMWARE_VERSION.code)[0] } /** @@ -242,10 +273,6 @@ class DanteClient(val ip: String, chainLength: Int) : AutoCloseable { writeRegister(board, DPP_CONFIG_COMMIT_OFFSET.code, 0x00000001, 0x00000001) } -// suspend fun writeDPP(board: Int, register: Int, value: Int) { -// writeDPP(board, register, value.toLong()) -// } - /** * Configure single board using provided meta */ @@ -363,25 +390,26 @@ class DanteClient(val ip: String, chainLength: Int) : AutoCloseable { logger.info("Starting list point acquisition {} ms", length) boards.forEach { - writeRegister(it.num, ACQUISITION_SETTINGS.code, Communications.AcquisitionMode.LIST_MODE.long, 0x00000007) + writeRegister(it.num, ACQUISITION_SETTINGS.code, AcquisitionMode.LIST_MODE.long, 0x00000007) writeRegister(it.num, ACQUISITION_TIME.code, length.toLong()) writeRegister(it.num, TIME_PER_MAP_POINT.code, statisticsInterval.toLong(), 0x00FFFFFF) writeRegister(it.num, MAP_POINTS.code, (length.toDouble() / statisticsInterval).toLong(), 0x00FFFFFF) - if (readRegister(it.num, ACQUISITION_SETTINGS.code)[0] != Communications.AcquisitionMode.LIST_MODE.long) { + if (readRegister(it.num, ACQUISITION_SETTINGS.code)[0] != AcquisitionMode.LIST_MODE.long) { throw RuntimeException("Set list mode failed") } } -// logger.info("Waiting for configuration to settle") -// delay(500) + writeRegister(0, ACQUISITION_STATUS.code, 0x00000001, 0x00000001) val start = Instant.now() val builder = NumassProto.Point.newBuilder() + //collecting packages - val dataCollectorJob = launch(context = pool, parent = parentJob) { - while (Duration.between(start, Instant.now()) < Duration.ofMillis(length + 4000L)) { + while (Duration.between(start, Instant.now()) < Duration.ofMillis(length + 2000L)) { + try { + //Waiting for a packet for a second. If it is not there, returning and checking time condition val packet = withTimeout(1000) { dataChannel.receive() } if (packet.command != LIST_MODE) { logger.warn("Unexpected packet type: {}", packet.command.name) @@ -422,23 +450,121 @@ class DanteClient(val ip: String, chainLength: Int) : AutoCloseable { eventsBuilder.addAmplitudes(amp.toLong()) } } + } catch (ex: Exception) { + if (ex !is CancellationException) { + logger.error("Exception raised during packet gathering", ex) + } } - writeRegister(0, ACQUISITION_STATUS.code, 0x00000001, 0x00000000) } + //Stopping acquisition just in case + writeRegister(0, ACQUISITION_STATUS.code, 0x00000001, 0x00000000) val meta = buildMeta { boards.first().meta?.let { putNode("dpp", it) } } - - - dataCollectorJob.join() return ProtoNumassPoint(builder.build(), meta) } companion object { const val STATISTIC_HEADER: Int = 0x0C000000 + + val PACKET_HEADER_START_BYTES = arrayOf(0xAA, 0xEE) + + enum class Register(val code: Int) { + FIRMWARE_VERSION(0), + DPP_REGISTER_1(1), + DPP_REGISTER_2(2), + DPP_CONFIG_COMMIT_OFFSET(3), + ACQUISITION_STATUS(4), + ACQUISITION_TIME(5), + ELAPSED_TIME(6), + ACQUISITION_SETTINGS(7), + WAVEFORM_LENGTH(8), + MAP_POINTS(9), + TIME_PER_MAP_POINT(10), + ETH_CONFIGURATION_DATA(11), + ETHERNET_COMMIT(13), + CALIB_DONE_SIGNALS(14) + } + + enum class CommandType(val byte: Byte) { + READ(0xD0.toByte()), + WRITE(0xD1.toByte()), + SINGLE_SPECTRUM_MODE(0xD2.toByte()), + MAP_MODE(0xD6.toByte()), + LIST_MODE(0xD4.toByte()), + WAVEFORM_MODE(0xD3.toByte()), + } + + enum class AcquisitionMode(val byte: Byte) { + SINGLE_SPECTRUM_MODE(2), + MAP_MODE(6), + LIST_MODE(4), + WAVEFORM_MODE(3); + + val long = byte.toLong() + } + + + /** + * Build command header + */ + fun buildHeader(command: CommandType, board: Byte, packet: Byte, start: Byte, length: Byte): ByteArray { + assert(command in listOf(CommandType.READ, CommandType.WRITE)) + assert(board in 0..255) + assert(packet in 0..255) + assert(length in 0..255) + val header = ByteArray(8) + header[0] = PACKET_HEADER_START_BYTES[0].toByte() + header[1] = PACKET_HEADER_START_BYTES[1].toByte() + header[2] = command.byte + header[3] = board + header[4] = packet + header[5] = start + header[6] = length + return header + } + + /** + * Escape the sequence using DANTE convention + */ + private fun ByteArray.escape(): ByteArray { + return buildSequence { + this@escape.forEach { + yield(it) + if (it == 0xdd.toByte()) { + yield(it) + } + } + }.toList().toByteArray() + } + + + /** + * Create DANTE command and stuff it. + * @param length size of data array/4 + */ + fun wrapCommand(command: CommandType, board: Int, packet: Int, start: Int, length: Int, data: ByteArray): ByteArray { + if (command == CommandType.READ) { + assert(data.isEmpty()) + } else { + assert(data.size % 4 == 0) + assert(length == data.size / 4) + } + + val header = buildHeader(command, board.byte, packet.byte, start.byte, length.byte) + + val res = (header + data).escape() + return byteArrayOf(0xdd.toByte(), 0xaa.toByte()) + res + byteArrayOf(0xdd.toByte(), 0x55.toByte()) + } + + data class DanteMessage(val command: CommandType, val board: Int, val packet: Int, val payload: ByteArray) { + override fun toString(): String { + return "${command.name}[$board, $packet] of size ${payload.size}" + } + } } }