Meta viewer and other minor fixed

This commit is contained in:
Alexander Nozik 2018-04-18 20:10:54 +03:00
parent a425d6cb6c
commit 859b2ec1d4
2 changed files with 153 additions and 286 deletions

View File

@ -1,259 +0,0 @@
/*
* Copyright 2018 Alexander Nozik.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package inr.numass.control.dante
import kotlin.coroutines.experimental.buildSequence
internal val Byte.positive
get() = toInt() and 0xFF
internal val Int.positive
get() = toLong() and 0xFFFFFFFF
internal val Int.byte: Byte
get() {
if (this >= 256) {
throw RuntimeException("Number less than 256 is expected")
} else {
return toByte()
}
}
internal val Long.int: Int
get() {
if (this >= 0xFFFFFFFF) {
throw RuntimeException("Number less than 256 is expected")
} else {
return toInt()
}
}
internal val ByteArray.hex
get() = this.joinToString(separator = "") { it.positive.toString(16).padStart(2, '0') }
object Communications {
val PACKET_HEADER_START_BYTES = arrayOf(0xAA, 0xEE)
enum class Register(val code: Int) {
FIRMWARE_VERSION(0),
DPP_REGISTER_1(1),
DPP_REGISTER_2(2),
DPP_CONFIG_COMMIT_OFFSET(3),
ACQUISITION_STATUS(4),
ACQUISITION_TIME(5),
ELAPSED_TIME(6),
ACQUISITION_SETTINGS(7),
WAVEFORM_LENGTH(8),
MAP_POINTS(9),
TIME_PER_MAP_POINT(10),
ETH_CONFIGURATION_DATA(11),
ETHERNET_COMMIT(13),
CALIB_DONE_SIGNALS(14)
}
enum class CommandType(val byte: Byte) {
READ(0xD0.toByte()),
WRITE(0xD1.toByte()),
SINGLE_SPECTRUM_MODE(0xD2.toByte()),
MAP_MODE(0xD6.toByte()),
LIST_MODE(0xD4.toByte()),
WAVEFORM_MODE(0xD3.toByte()),
}
enum class AcquisitionMode(val byte: Byte) {
SINGLE_SPECTRUM_MODE(2),
MAP_MODE(6),
LIST_MODE(4),
WAVEFORM_MODE(3);
val long = byte.toLong()
}
/**
* Build command header
*/
fun buildHeader(command: CommandType, board: Byte, packet: Byte, start: Byte, length: Byte): ByteArray {
assert(command in listOf(CommandType.READ, CommandType.WRITE))
assert(board in 0..255)
assert(packet in 0..255)
assert(length in 0..255)
val header = ByteArray(8)
header[0] = PACKET_HEADER_START_BYTES[0].toByte()
header[1] = PACKET_HEADER_START_BYTES[1].toByte()
header[2] = command.byte
header[3] = board
header[4] = packet
header[5] = start
header[6] = length
return header
}
/**
* Escape the sequence using DANTE convention
*/
private fun ByteArray.escape(): ByteArray {
return buildSequence {
this@escape.forEach {
yield(it)
if (it == 0xdd.toByte()) {
yield(it)
}
}
}.toList().toByteArray()
}
/**
* Create DANTE command and stuff it.
* @param length size of data array/4
*/
fun wrapCommand(command: CommandType, board: Int, packet: Int, start: Int, length: Int, data: ByteArray): ByteArray {
if (command == CommandType.READ) {
assert(data.isEmpty())
} else {
assert(data.size % 4 == 0)
assert(length == data.size / 4)
}
val header = buildHeader(command, board.byte, packet.byte, start.byte, length.byte)
val res = (header + data).escape()
return byteArrayOf(0xdd.toByte(), 0xaa.toByte()) + res + byteArrayOf(0xdd.toByte(), 0x55.toByte())
}
data class DanteMessage(val command: CommandType, val board: Int, val packet: Int, val payload: ByteArray) {
override fun toString(): String {
return "${command.name}[$board, $packet] of size ${payload.size}"
}
}
// /**
// * Read the stream and return resulting messages in ReceiveChannel
// */
// fun readStream(stream: InputStream, parent: Job): ReceiveChannel<DanteMessage> {
// return produce(capacity = Channel.UNLIMITED, parent = parent) {
// while (true) {
// if (stream.read() == PACKET_HEADER_START_BYTES[0] && stream.read() == PACKET_HEADER_START_BYTES[1]) {
// // second check is not executed unless first one is true
// val header = ByteArray(6)
// stream.read(header)
// val command = CommandType.values().find { it.byte == header[0] }!!
// val board = header[1]
// val packet = header[2]
// val length = header[3].positive * 0x100 + header[4].positive * 0x010 + header[5].positive
// val payload = ByteArray(length)
// stream.read(payload)
// send(DanteMessage(command, board.positive, packet.positive, payload))
// }
// }
// }
// }
// fun readSocket(socket: Socket, parent: Job): ReceiveChannel<DanteMessage> {
// return readStream(socket.getInputStream(), parent)
// }
}
/*
def extract_response(stream):
r"""Extract response messages from stream.
Method will rstrip junk bytes before response (assuming it could be only
\x00 values), extract every complete message and cut them from stream.
- if stream contains uncomlete message, method leaves it untouched.
Returns croped stream and extracted messages. Message structure:
{
packet_num - request packet id
command - command or response code
payload - payload binary data
board - board number
}
"""
messages = []
while True:
idx = stream.find(__PACKET_HEADER_START_BYTES)
if idx != -1:
stream = stream[idx:]
payload_size = struct.unpack('>I', b'\x00%s' % (stream[5:8]))[0]
if len(stream) >= payload_size + 8:
messages.append({
'command': stream[2],
'board': stream[3],
'packet_num': stream[4],
'payload': stream[8: payload_size * 4 + 8]
})
stream = stream[payload_size + 8:]
else:
return stream, messages
else:
return stream, messages
def create_response(
command, board_num=0, packet_num=0, data=None):
"""Create response packet.
For test purposes only!
"""
assert data is None or isinstance(data, (bytes, bytearray))
assert 0 <= board_num <= 255
assert 0 <= packet_num <= 255
if not data:
data = b''
header = bytearray(8)
header[0:2] = __PACKET_HEADER_START_BYTES
header[2] = command[0]
header[3] = board_num
header[4] = packet_num
header[5:8] = struct.pack('>I', len(data))[1:]
return b'%s%s' % (header, data)
def extract_request(stream):
"""Extract stuffed requests from stream.
For test purposes only!
"""
messages = []
while True:
start_idx = stream.find(b'\xdd\xaa')
end_idx = stream.find(b'\xdd\x55')
if start_idx != -1 and end_idx != -1 and end_idx > start_idx:
packet = stream[start_idx + 2: end_idx]
messages.append({
'command': packet[2],
'board': packet[3],
'packet_num': packet[4],
'start_addr': packet[5],
'length': packet[6],
'payload': packet[8:],
})
stream = stream[end_idx + 2:]
else:
return stream, messages
*/

View File

@ -19,8 +19,8 @@ package inr.numass.control.dante
import hep.dataforge.kodex.buildMeta import hep.dataforge.kodex.buildMeta
import hep.dataforge.kodex.orElse import hep.dataforge.kodex.orElse
import hep.dataforge.meta.Meta import hep.dataforge.meta.Meta
import inr.numass.control.dante.Communications.CommandType.* import inr.numass.control.dante.DanteClient.Companion.CommandType.*
import inr.numass.control.dante.Communications.Register.* import inr.numass.control.dante.DanteClient.Companion.Register.*
import inr.numass.data.NumassProto import inr.numass.data.NumassProto
import inr.numass.data.api.NumassPoint import inr.numass.data.api.NumassPoint
import inr.numass.data.storage.ProtoNumassPoint import inr.numass.data.storage.ProtoNumassPoint
@ -40,6 +40,34 @@ import kotlin.collections.HashMap
import kotlin.coroutines.experimental.buildSequence import kotlin.coroutines.experimental.buildSequence
import kotlin.math.ceil import kotlin.math.ceil
internal val Byte.positive
get() = toInt() and 0xFF
internal val Int.positive
get() = toLong() and 0xFFFFFFFF
internal val Int.byte: Byte
get() {
if (this >= 256) {
throw RuntimeException("Number less than 256 is expected")
} else {
return toByte()
}
}
internal val Long.int: Int
get() {
if (this >= 0xFFFFFFFF) {
throw RuntimeException("Number less than ${Int.MAX_VALUE*2L} is expected")
} else {
return toInt()
}
}
internal val ByteArray.hex
get() = this.joinToString(separator = "") { it.positive.toString(16).padStart(2, '0') }
//TODO implement using Device //TODO implement using Device
class DanteClient(val ip: String, chainLength: Int) : AutoCloseable { class DanteClient(val ip: String, chainLength: Int) : AutoCloseable {
private val RESET_COMMAND = byteArrayOf(0xDD.toByte(), 0x55, 0xDD.toByte(), 0xEE.toByte()) private val RESET_COMMAND = byteArrayOf(0xDD.toByte(), 0x55, 0xDD.toByte(), 0xEE.toByte())
@ -60,8 +88,12 @@ class DanteClient(val ip: String, chainLength: Int) : AutoCloseable {
/** /**
* Synchronous reading and writing of registers * Synchronous reading and writing of registers
*/ */
private val comChannel = Channel<Communications.DanteMessage>(capacity = Channel.UNLIMITED) private val comChannel = Channel<DanteMessage>(capacity = Channel.UNLIMITED)
private val dataChannel = Channel<Communications.DanteMessage>(capacity = Channel.UNLIMITED)
/**
* Data packets channel
*/
private val dataChannel = Channel<DanteMessage>(capacity = Channel.UNLIMITED)
//private val statisticsChannel = Channel<Communications.DanteMessage>(capacity = Channel.UNLIMITED) //private val statisticsChannel = Channel<Communications.DanteMessage>(capacity = Channel.UNLIMITED)
/** /**
@ -138,18 +170,18 @@ class DanteClient(val ip: String, chainLength: Int) : AutoCloseable {
val job = launch(context = pool, parent = parentJob) { val job = launch(context = pool, parent = parentJob) {
val stream = socket.getInputStream() val stream = socket.getInputStream()
while (true) { while (true) {
if (stream.read() == Communications.PACKET_HEADER_START_BYTES[0] && stream.read() == Communications.PACKET_HEADER_START_BYTES[1]) { if (stream.read() == PACKET_HEADER_START_BYTES[0] && stream.read() == PACKET_HEADER_START_BYTES[1]) {
// second check is not executed unless first one is true // second check is not executed unless first one is true
val header = ByteArray(6) val header = ByteArray(6)
stream.read(header) stream.read(header)
val command = Communications.CommandType.values().find { it.byte == header[0] } val command = CommandType.values().find { it.byte == header[0] }
?: throw RuntimeException("Unknown command code: ${header[0]}") ?: throw RuntimeException("Unknown command code: ${header[0]}")
val board = header[1] val board = header[1]
val packet = header[2] val packet = header[2]
val length = (header[3].positive * 0x100 + header[4].positive * 0x010 + header[5].positive) * 4 val length = (header[3].positive * 0x100 + header[4].positive * 0x010 + header[5].positive) * 4
val payload = ByteArray(length) val payload = ByteArray(length)
stream.read(payload) stream.read(payload)
handle(Communications.DanteMessage(command, board.positive, packet.positive, payload)) handle(DanteMessage(command, board.positive, packet.positive, payload))
} }
} }
//TODO handle errors and reconnect //TODO handle errors and reconnect
@ -158,12 +190,12 @@ class DanteClient(val ip: String, chainLength: Int) : AutoCloseable {
connections[port] = Pair(socket, job) connections[port] = Pair(socket, job)
} }
private suspend fun send(command: Communications.CommandType, board: Int, packet: Int, register: Int, data: ByteArray = ByteArray(0), length: Int = (data.size / 4)) { private suspend fun send(command: CommandType, board: Int, packet: Int, register: Int, data: ByteArray = ByteArray(0), length: Int = (data.size / 4)) {
logger.debug("Sending {}[{}, {}] of size {}*4 to {}", command.name, board, packet, length, register) logger.debug("Sending {}[{}, {}] of size {}*4 to {}", command.name, board, packet, length, register)
sendChannel.send(Communications.wrapCommand(command, board, packet, register, length, data)) sendChannel.send(wrapCommand(command, board, packet, register, length, data))
} }
private suspend fun handle(response: Communications.DanteMessage) { private suspend fun handle(response: DanteMessage) {
logger.debug("Received {}", response.toString()) logger.debug("Received {}", response.toString())
when (response.command) { when (response.command) {
READ, WRITE -> comChannel.send(response) READ, WRITE -> comChannel.send(response)
@ -210,10 +242,9 @@ class DanteClient(val ip: String, chainLength: Int) : AutoCloseable {
} }
private suspend fun readRegister(board: Int, register: Int, length: Int = 1): List<Long> { private suspend fun readRegister(board: Int, register: Int, length: Int = 1): List<Long> {
val packet = nextPacket() val packet = nextPacket()
send(READ, board, packet, register, length = length) send(READ, board, packet, register, length = length)
var message: Communications.DanteMessage? = null var message: DanteMessage? = null
//skip other responses //skip other responses
while (message == null || !(message.command == READ && message.packet == packet)) { while (message == null || !(message.command == READ && message.packet == packet)) {
message = comChannel.receive() message = comChannel.receive()
@ -228,7 +259,7 @@ class DanteClient(val ip: String, chainLength: Int) : AutoCloseable {
} }
suspend fun getFirmwareVersion(): Long { suspend fun getFirmwareVersion(): Long {
return readRegister(0, Communications.Register.FIRMWARE_VERSION.code)[0] return readRegister(0, Register.FIRMWARE_VERSION.code)[0]
} }
/** /**
@ -242,10 +273,6 @@ class DanteClient(val ip: String, chainLength: Int) : AutoCloseable {
writeRegister(board, DPP_CONFIG_COMMIT_OFFSET.code, 0x00000001, 0x00000001) writeRegister(board, DPP_CONFIG_COMMIT_OFFSET.code, 0x00000001, 0x00000001)
} }
// suspend fun writeDPP(board: Int, register: Int, value: Int) {
// writeDPP(board, register, value.toLong())
// }
/** /**
* Configure single board using provided meta * Configure single board using provided meta
*/ */
@ -363,25 +390,26 @@ class DanteClient(val ip: String, chainLength: Int) : AutoCloseable {
logger.info("Starting list point acquisition {} ms", length) logger.info("Starting list point acquisition {} ms", length)
boards.forEach { boards.forEach {
writeRegister(it.num, ACQUISITION_SETTINGS.code, Communications.AcquisitionMode.LIST_MODE.long, 0x00000007) writeRegister(it.num, ACQUISITION_SETTINGS.code, AcquisitionMode.LIST_MODE.long, 0x00000007)
writeRegister(it.num, ACQUISITION_TIME.code, length.toLong()) writeRegister(it.num, ACQUISITION_TIME.code, length.toLong())
writeRegister(it.num, TIME_PER_MAP_POINT.code, statisticsInterval.toLong(), 0x00FFFFFF) writeRegister(it.num, TIME_PER_MAP_POINT.code, statisticsInterval.toLong(), 0x00FFFFFF)
writeRegister(it.num, MAP_POINTS.code, (length.toDouble() / statisticsInterval).toLong(), 0x00FFFFFF) writeRegister(it.num, MAP_POINTS.code, (length.toDouble() / statisticsInterval).toLong(), 0x00FFFFFF)
if (readRegister(it.num, ACQUISITION_SETTINGS.code)[0] != Communications.AcquisitionMode.LIST_MODE.long) { if (readRegister(it.num, ACQUISITION_SETTINGS.code)[0] != AcquisitionMode.LIST_MODE.long) {
throw RuntimeException("Set list mode failed") throw RuntimeException("Set list mode failed")
} }
} }
// logger.info("Waiting for configuration to settle")
// delay(500)
writeRegister(0, ACQUISITION_STATUS.code, 0x00000001, 0x00000001) writeRegister(0, ACQUISITION_STATUS.code, 0x00000001, 0x00000001)
val start = Instant.now() val start = Instant.now()
val builder = NumassProto.Point.newBuilder() val builder = NumassProto.Point.newBuilder()
//collecting packages
val dataCollectorJob = launch(context = pool, parent = parentJob) { while (Duration.between(start, Instant.now()) < Duration.ofMillis(length + 2000L)) {
while (Duration.between(start, Instant.now()) < Duration.ofMillis(length + 4000L)) { try {
//Waiting for a packet for a second. If it is not there, returning and checking time condition
val packet = withTimeout(1000) { dataChannel.receive() } val packet = withTimeout(1000) { dataChannel.receive() }
if (packet.command != LIST_MODE) { if (packet.command != LIST_MODE) {
logger.warn("Unexpected packet type: {}", packet.command.name) logger.warn("Unexpected packet type: {}", packet.command.name)
@ -422,23 +450,121 @@ class DanteClient(val ip: String, chainLength: Int) : AutoCloseable {
eventsBuilder.addAmplitudes(amp.toLong()) eventsBuilder.addAmplitudes(amp.toLong())
} }
} }
} catch (ex: Exception) {
if (ex !is CancellationException) {
logger.error("Exception raised during packet gathering", ex)
} }
}
}
//Stopping acquisition just in case
writeRegister(0, ACQUISITION_STATUS.code, 0x00000001, 0x00000000) writeRegister(0, ACQUISITION_STATUS.code, 0x00000001, 0x00000000)
}
val meta = buildMeta { val meta = buildMeta {
boards.first().meta?.let { boards.first().meta?.let {
putNode("dpp", it) putNode("dpp", it)
} }
} }
dataCollectorJob.join()
return ProtoNumassPoint(builder.build(), meta) return ProtoNumassPoint(builder.build(), meta)
} }
companion object { companion object {
const val STATISTIC_HEADER: Int = 0x0C000000 const val STATISTIC_HEADER: Int = 0x0C000000
val PACKET_HEADER_START_BYTES = arrayOf(0xAA, 0xEE)
enum class Register(val code: Int) {
FIRMWARE_VERSION(0),
DPP_REGISTER_1(1),
DPP_REGISTER_2(2),
DPP_CONFIG_COMMIT_OFFSET(3),
ACQUISITION_STATUS(4),
ACQUISITION_TIME(5),
ELAPSED_TIME(6),
ACQUISITION_SETTINGS(7),
WAVEFORM_LENGTH(8),
MAP_POINTS(9),
TIME_PER_MAP_POINT(10),
ETH_CONFIGURATION_DATA(11),
ETHERNET_COMMIT(13),
CALIB_DONE_SIGNALS(14)
}
enum class CommandType(val byte: Byte) {
READ(0xD0.toByte()),
WRITE(0xD1.toByte()),
SINGLE_SPECTRUM_MODE(0xD2.toByte()),
MAP_MODE(0xD6.toByte()),
LIST_MODE(0xD4.toByte()),
WAVEFORM_MODE(0xD3.toByte()),
}
enum class AcquisitionMode(val byte: Byte) {
SINGLE_SPECTRUM_MODE(2),
MAP_MODE(6),
LIST_MODE(4),
WAVEFORM_MODE(3);
val long = byte.toLong()
}
/**
* Build command header
*/
fun buildHeader(command: CommandType, board: Byte, packet: Byte, start: Byte, length: Byte): ByteArray {
assert(command in listOf(CommandType.READ, CommandType.WRITE))
assert(board in 0..255)
assert(packet in 0..255)
assert(length in 0..255)
val header = ByteArray(8)
header[0] = PACKET_HEADER_START_BYTES[0].toByte()
header[1] = PACKET_HEADER_START_BYTES[1].toByte()
header[2] = command.byte
header[3] = board
header[4] = packet
header[5] = start
header[6] = length
return header
}
/**
* Escape the sequence using DANTE convention
*/
private fun ByteArray.escape(): ByteArray {
return buildSequence {
this@escape.forEach {
yield(it)
if (it == 0xdd.toByte()) {
yield(it)
}
}
}.toList().toByteArray()
}
/**
* Create DANTE command and stuff it.
* @param length size of data array/4
*/
fun wrapCommand(command: CommandType, board: Int, packet: Int, start: Int, length: Int, data: ByteArray): ByteArray {
if (command == CommandType.READ) {
assert(data.isEmpty())
} else {
assert(data.size % 4 == 0)
assert(length == data.size / 4)
}
val header = buildHeader(command, board.byte, packet.byte, start.byte, length.byte)
val res = (header + data).escape()
return byteArrayOf(0xdd.toByte(), 0xaa.toByte()) + res + byteArrayOf(0xdd.toByte(), 0x55.toByte())
}
data class DanteMessage(val command: CommandType, val board: Int, val packet: Int, val payload: ByteArray) {
override fun toString(): String {
return "${command.name}[$board, $packet] of size ${payload.size}"
}
}
} }
} }