Dante update

Alexander Nozik 2018-04-23 12:51:50 +03:00
parent 804b4c6d4f
commit 97f534646d
3 changed files with 23 additions and 60 deletions

View File

@@ -27,6 +27,7 @@ import inr.numass.data.storage.ProtoNumassPoint
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.Channel
import org.slf4j.LoggerFactory
import java.io.DataInputStream
import java.io.OutputStream
import java.lang.Math.pow
import java.net.Socket
@@ -58,7 +59,7 @@ internal val Int.byte: Byte
internal val Long.int: Int
get() {
if (this >= 0xFFFFFFFF) {
throw RuntimeException("Number less than ${Int.MAX_VALUE*2L} is expected")
throw RuntimeException("Number less than ${Int.MAX_VALUE * 2L} is expected")
} else {
return toInt()
}
@@ -102,7 +103,7 @@ class DanteClient(val ip: String, chainLength: Int) : AutoCloseable {
*/
private data class BoardState(val num: Int, var meta: Meta? = null)
private val boards = (0 until chainLength).map { BoardState(it) }
fun open() {
@@ -180,7 +181,7 @@ class DanteClient(val ip: String, chainLength: Int) : AutoCloseable {
val packet = header[2]
val length = (header[3].positive * 0x100 + header[4].positive * 0x010 + header[5].positive) * 4
val payload = ByteArray(length)
stream.read(payload)
DataInputStream(stream).readFully(payload)
handle(DanteMessage(command, board.positive, packet.positive, payload))
}
}
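A note on the payload read above: InputStream.read() may return after filling only part of the array when the socket delivers a partial segment, while DataInputStream.readFully() blocks until the whole payload has arrived (or throws EOFException). A minimal standalone sketch of the same idea, with an illustrative helper name that is not part of the client code:

import java.io.DataInputStream
import java.io.InputStream

// Read exactly `length` bytes from a stream. A plain read() could stop short on a
// partial segment; readFully() keeps reading until the array is filled or the stream ends.
fun readExactly(stream: InputStream, length: Int): ByteArray {
    val payload = ByteArray(length)
    DataInputStream(stream).readFully(payload)
    return payload
}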
@@ -227,7 +228,7 @@ class DanteClient(val ip: String, chainLength: Int) : AutoCloseable {
private suspend fun writeRegister(board: Int, register: Int, data: List<Long>, mask: Long? = null) {
//representing data as byte buffer
val buffer = if (mask != null) {
val oldData = withTimeout(1000) { readRegister(board, register, data.size) }
val oldData = withTimeout(5000) { readRegister(board, register, data.size) }
//(oldData & not_mask) | (newData & mask);
(0 until data.size).map { (oldData[it] and mask.inv()).or(data[it] and mask) }.asBuffer();
} else {
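The masked write above is the usual read-modify-write pattern: bits selected by the mask come from the new data, all other bits keep the value read back from the register (now with a longer 5000 ms timeout for that read-back). A standalone sketch of the bit arithmetic, with illustrative names only:

// (old and mask.inv()) keeps the untouched bits, (new and mask) injects the new ones.
fun applyMask(old: Long, new: Long, mask: Long): Long =
    (old and mask.inv()) or (new and mask)

// e.g. applyMask(0b1010_1010L, 0b0000_1111L, 0b0000_1111L) == 0b1010_1111L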
@@ -437,12 +438,18 @@ class DanteClient(val ip: String, chainLength: Int) : AutoCloseable {
val firstWord = buffer.getInt()
val secondWord = buffer.getInt()
if (firstWord == STATISTIC_HEADER && secondWord == STATISTIC_HEADER) {
val statistics = ByteArray(128)
buffer.get(statistics)
//TODO use statistics for meta
handleStatistics(channel, ByteBuffer.wrap(statistics))
if (buffer.remaining() < 128) {
logger.error("Can't read statistics from list message, 128 bytes expected, but {} found", buffer.remaining())
break
} else {
val statistics = ByteArray(32 * 4)
buffer.get(statistics)
//TODO use statistics for meta
handleStatistics(channel, ByteBuffer.wrap(statistics))
}
} else if (firstWord == 0) {
//TODO handle zeros
logger.info("Received zero packet from board {}", channel)
} else {
val time: Long = secondWord.positive shl 14 + firstWord ushr 18
val amp: Short = (firstWord and 0x0000FFFF).toShort()
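The new branch above guards the 128-byte (32 words x 4 bytes) statistics block with a remaining() check, so a truncated list message is logged instead of letting ByteBuffer.get() throw BufferUnderflowException. A sketch of the same guard as a standalone helper (the name is illustrative, not the client API):

import java.nio.ByteBuffer

// Pull a fixed-size block out of a buffer only if enough bytes are left; otherwise
// return null so the caller can log and skip instead of crashing on underflow.
fun readBlockOrNull(buffer: ByteBuffer, size: Int): ByteBuffer? {
    if (buffer.remaining() < size) return null
    val block = ByteArray(size)
    buffer.get(block)
    return ByteBuffer.wrap(block)
}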
@@ -464,11 +471,13 @@ class DanteClient(val ip: String, chainLength: Int) : AutoCloseable {
putNode("dpp", it)
}
}
return ProtoNumassPoint(builder.build(), meta)
val proto = builder.build()
return ProtoNumassPoint(meta) { proto }
}
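The ProtoNumassPoint change above switches from passing the built protobuf directly to passing it through a lambda, i.e. a provider-style constructor. A rough sketch of that shape with made-up names (not the numass API): here the payload is built eagerly and merely captured, but the provider signature also allows deferring construction until first access.

// Illustrative provider-lambda constructor; the payload is materialized lazily.
class LazyPoint(val meta: Any?, provider: () -> ByteArray) {
    val payload: ByteArray by lazy(provider)
}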
companion object {
const val STATISTIC_HEADER: Int = 0x0C000000
const val STATISTIC_HEADER: Int = 0xC0000000.toInt()
val PACKET_HEADER_START_BYTES = arrayOf(0xAA, 0xEE)
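On the STATISTIC_HEADER change above: 0x0C000000 and 0xC0000000 are different bit patterns, and in Kotlin the literal 0xC0000000 does not fit in Int, so it is typed as Long and needs an explicit .toInt() to compare against the Int produced by buffer.getInt(). A small illustration (names are made up):

// 0xC0000000 as a Long is 3221225472; reinterpreted as a 32-bit Int it becomes -1073741824,
// which is what ByteBuffer.getInt() yields for the big-endian bytes C0 00 00 00.
val headerLong: Long = 0xC0000000            // typed as Long, exceeds Int.MAX_VALUE
val headerInt: Int = 0xC0000000.toInt()      // same bit pattern, negative Int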
@@ -567,50 +576,4 @@ class DanteClient(val ip: String, chainLength: Int) : AutoCloseable {
}
}
}
/*
def __make_binary_base():
point = dfparser.Point()
for board in BOARDS:
channel = point.channels.add(id=board)
channel.blocks.add()
return point
async def __send_message(
self, command, board_num=0, start_addr=0, length=0, data=b''):
"""Send message and wait for response."""
msg_id = self.__get_packet_number()
cmd = create_command(
command, board_num, msg_id, start_addr, length, data)
logger.debug(
'Send %s %s %s %s', command[0], board_num, start_addr, length)
await SEND_QUEUE.put(cmd)
logger.debug('[%s, %s] waiting for response', board_num, msg_id)
await EVENTS[msg_id].wait()
resp = PACKET_DATA[msg_id]
self.__put_packet_number(msg_id)
logger.debug('[%s, %s] get response: %s', board_num, msg_id, resp)
return resp
async def __send_acquired_point(self):
while self.recv_time is None or (datetime.now() - self.recv_time)\
.total_seconds() < WAIT_TIME_S:
await asyncio.sleep(SETTINGS['params']['print_count_s'])
logger.info('%s events acquired', self.events_count)
logger.info(
"%s seconds elapsed since acq data. Dump point.",
(datetime.now() - self.recv_time).total_seconds())
end_time = self.recv_time.replace(microsecond=0).isoformat()
data = self.point.SerializeToString()
self.point_meta['binary_size'] = len(data)
self.point_meta['end_time'] = end_time
self.send_message(
meta=self.point_meta,
data=data)
*/
}

View File

@@ -163,8 +163,8 @@ fun getAmplitudeSpectrum(events: Sequence<NumassEvent>, length: Double, config:
}
val minChannel = config.getInt("window.lo") { spectrum.keys.min() }
val maxChannel = config.getInt("window.up") { spectrum.keys.max() }
val minChannel = config.getInt("window.lo") { spectrum.keys.min()?:0 }
val maxChannel = config.getInt("window.up") { spectrum.keys.max()?: 4096 }
return ListTable.Builder(format)
.rows(IntStream.range(minChannel, maxChannel)
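The window defaults added above handle an empty spectrum: in the Kotlin of that time keys.min() and keys.max() returned null for an empty collection (the modern spelling is minOrNull()/maxOrNull()), so the elvis operator substitutes 0 and 4096, presumably the full ADC channel range. A minimal standalone sketch with an illustrative helper name:

// Fall back to an assumed full channel range when the spectrum has no entries.
fun channelWindow(channels: Set<Int>): IntRange {
    val lo = channels.minOrNull() ?: 0
    val hi = channels.maxOrNull() ?: 4096
    return lo..hi
}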

View File

@@ -76,7 +76,7 @@ allPars.setParDomain("trap", 0d, Double.POSITIVE_INFINITY);
// ListTable config = OldDataReader.readConfig(configName);
SpectrumGenerator generator = new SpectrumGenerator(model, allPars, 12316);
def data = generator.generateData(DataModelUtils.getUniformSpectrumConfiguration(13000, 18500, 604800 / 100 * 100, 100));
def data = generator.generateData(DataModelUtils.getUniformSpectrumConfiguration(12000, 18500, 604800 / 100 * 100, 130));
//data = TritiumUtils.correctForDeadTime(data, new SpectrumAdapter(), 10e-9);
// data = data.filter("X", Value.of(15510.0), Value.of(18610.0));
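A back-of-the-envelope reading of the changed generator call, assuming the arguments mean getUniformSpectrumConfiguration(from, to, totalTime, numPoints) with the range given in volts of retarding potential: the scan widens from 13000-18500 to 12000-18500 and the point count grows from 100 to 130, so a one-week run (604800 s) gives roughly 4652 s per point at an approximately 50 V step. Sketch in Kotlin (the parameter meaning is an assumption, not taken from the script):

// Assumed meaning: uniform scan from 12 kV to 18.5 kV, one week of total time, 130 points.
val fromU = 12_000.0
val toU = 18_500.0
val totalTime = 604_800          // seconds in a week; 604800 / 100 * 100 == 604800
val numPoints = 130
val stepU = (toU - fromU) / numPoints       // ~= 50 V between set points (exact spacing depends on endpoint convention)
val timePerPoint = totalTime / numPoints    // = 4652 s per point (integer division)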