Partial TaglessEnvelopeFormat

This commit is contained in:
Alexander Nozik 2019-10-20 19:30:51 +03:00
parent e686501662
commit 87ae41886b

View File

@ -4,6 +4,7 @@ import hep.dataforge.context.Context
import hep.dataforge.meta.* import hep.dataforge.meta.*
import hep.dataforge.names.asName import hep.dataforge.names.asName
import kotlinx.io.core.* import kotlinx.io.core.*
import kotlinx.serialization.toUtf8Bytes
class TaglessEnvelopeFormat( class TaglessEnvelopeFormat(
val io: IOPlugin, val io: IOPlugin,
@ -11,6 +12,9 @@ class TaglessEnvelopeFormat(
meta: Meta = EmptyMeta meta: Meta = EmptyMeta
) : EnvelopeFormat { ) : EnvelopeFormat {
private val metaStart = meta[META_START_PROPERTY].string ?: DEFAULT_META_START
private val dataStart = meta[DATA_START_PROPERTY].string ?: DEFAULT_DATA_START
val metaFormat = io.metaFormat(metaType, meta) val metaFormat = io.metaFormat(metaType, meta)
?: error("Meta format with type $metaType could not be resolved in $io") ?: error("Meta format with type $metaType could not be resolved in $io")
@ -32,14 +36,14 @@ class TaglessEnvelopeFormat(
if (!obj.meta.isEmpty()) { if (!obj.meta.isEmpty()) {
val metaBytes = metaFormat.writeBytes(obj.meta) val metaBytes = metaFormat.writeBytes(obj.meta)
writeProperty(META_LENGTH_PROPERTY, metaBytes.size) writeProperty(META_LENGTH_PROPERTY, metaBytes.size)
writeText(DEFAULT_META_START + "\r\n") writeText(metaStart + "\r\n")
writeFully(metaBytes) writeFully(metaBytes)
writeText("\r\n") writeText("\r\n")
} }
//Printing data //Printing data
obj.data?.let { data -> obj.data?.let { data ->
writeText(DEFAULT_DATA_START + "\r\n") writeText(dataStart + "\r\n")
writeFully(data.toBytes()) writeFully(data.toBytes())
} }
} }
@ -64,7 +68,7 @@ class TaglessEnvelopeFormat(
var meta: Meta = EmptyMeta var meta: Meta = EmptyMeta
if (line.startsWith(DEFAULT_META_START)) { if (line.startsWith(metaStart)) {
val metaFormat = properties[META_TYPE_PROPERTY]?.let { io.metaFormat(it) } ?: JsonMetaFormat.default val metaFormat = properties[META_TYPE_PROPERTY]?.let { io.metaFormat(it) } ?: JsonMetaFormat.default
meta = if (properties.containsKey(META_LENGTH_PROPERTY)) { meta = if (properties.containsKey(META_LENGTH_PROPERTY)) {
val bytes = ByteArray(properties[META_LENGTH_PROPERTY]!!.toInt()) val bytes = ByteArray(properties[META_LENGTH_PROPERTY]!!.toInt())
@ -80,7 +84,7 @@ class TaglessEnvelopeFormat(
do { do {
line = readUTF8Line() ?: return SimpleEnvelope(meta, null) line = readUTF8Line() ?: return SimpleEnvelope(meta, null)
//returning an Envelope without data if end of input is reached //returning an Envelope without data if end of input is reached
} while (!line.startsWith(DEFAULT_DATA_START)) } while (!line.startsWith(dataStart))
val data: Binary? = if (properties.containsKey(DATA_LENGTH_PROPERTY)) { val data: Binary? = if (properties.containsKey(DATA_LENGTH_PROPERTY)) {
val bytes = ByteArray(properties[DATA_LENGTH_PROPERTY]!!.toInt()) val bytes = ByteArray(properties[DATA_LENGTH_PROPERTY]!!.toInt())
@ -95,119 +99,49 @@ class TaglessEnvelopeFormat(
} }
override fun Input.readPartial(): PartialEnvelope { override fun Input.readPartial(): PartialEnvelope {
TODO("not implemented") //To change body of created functions use File | Settings | File Templates. var offset = 0u
} var line: String = ""
do {
line = readUTF8Line() ?: error("Input does not contain tagless envelope header")
offset += line.toUtf8Bytes().size.toUInt()
} while (!line.startsWith(TAGLESS_ENVELOPE_HEADER))
val properties = HashMap<String, String>()
// class TaglessReader(private val override: Map<String, String>) : EnvelopeReader { line = ""
// while (line.isBlank() || line.startsWith("#?")) {
// private val BUFFER_SIZE = 1024 if (line.startsWith("#?")) {
// val match = propertyPattern.find(line)
// @Throws(IOException::class) ?: error("Line $line does not match property declaration pattern")
// override fun read(stream: InputStream): Envelope { val (key, value) = match.destructured
// return read(Channels.newChannel(stream)) properties[key] = value
// } }
// line = readUTF8Line() ?: return PartialEnvelope(Meta.empty, offset.toUInt(), 0.toULong())
// override fun read(channel: ReadableByteChannel): Envelope { offset += line.toUtf8Bytes().size.toUInt()
// val properties = HashMap(override) }
// val buffer = ByteBuffer.allocate(BUFFER_SIZE).apply { position(BUFFER_SIZE) }
// val meta = readMeta(channel, buffer, properties) var meta: Meta = EmptyMeta
// return LazyEnvelope(meta) { BufferedBinary(readData(channel, buffer, properties)) }
// } if (line.startsWith(metaStart)) {
// val metaFormat = properties[META_TYPE_PROPERTY]?.let { io.metaFormat(it) } ?: JsonMetaFormat.default
// meta = if (properties.containsKey(META_LENGTH_PROPERTY)) {
// /** val bytes = ByteArray(properties[META_LENGTH_PROPERTY]!!.toInt())
// * Read lines using provided channel and buffer. Buffer state is changed by this operation readFully(bytes)
// */ offset += bytes.size.toUInt()
// private fun readLines(channel: ReadableByteChannel, buffer: ByteBuffer): Sequence<String> { metaFormat.readBytes(bytes)
// return sequence { } else {
// val builder = ByteArrayOutputStream() error("Can't partially read an envelope with undefined meta size")
// while (true) { }
// if (!buffer.hasRemaining()) { }
// if (!channel.isOpen) {
// break do {
// } line = readUTF8Line() ?: return PartialEnvelope(Meta.empty, offset.toUInt(), 0.toULong())
// buffer.flip() offset += line.toUtf8Bytes().size.toUInt()
// val count = channel.read(buffer) //returning an Envelope without data if end of input is reached
// buffer.flip() } while (!line.startsWith(dataStart))
// if (count < BUFFER_SIZE) {
// channel.close() val dataSize = properties[DATA_LENGTH_PROPERTY]?.toULong()
// } return PartialEnvelope(meta, offset, dataSize)
// } }
// val b = buffer.get()
// builder.write(b.toInt())
// if (b == '\n'.toByte()) {
// yield(String(builder.toByteArray(), Charsets.UTF_8))
// builder.reset()
// }
// }
// }
// }
//
// @Throws(IOException::class)
// private fun readMeta(
// channel: ReadableByteChannel,
// buffer: ByteBuffer,
// properties: MutableMap<String, String>
// ): Meta {
// val sb = StringBuilder()
// val metaEnd = properties.getOrDefault(DATA_START_PROPERTY, DEFAULT_DATA_START)
// readLines(channel, buffer).takeWhile { it.trim { it <= ' ' } != metaEnd }.forEach { line ->
// if (line.startsWith("#?")) {
// readProperty(line.trim(), properties)
// } else if (line.isEmpty() || line.startsWith("#~")) {
// //Ignore headings, do nothing
// } else {
// sb.append(line).append("\r\n")
// }
// }
//
//
// return if (sb.isEmpty()) {
// Meta.empty()
// } else {
// val metaType = MetaType.resolve(properties)
// try {
// metaType.reader.readString(sb.toString())
// } catch (e: ParseException) {
// throw RuntimeException("Failed to parse meta", e)
// }
//
// }
// }
//
//
// @Throws(IOException::class)
// private fun readData(
// channel: ReadableByteChannel,
// buffer: ByteBuffer,
// properties: Map<String, String>
// ): ByteBuffer {
// val array = ByteArray(buffer.remaining());
// buffer.get(array)
// if (properties.containsKey(DATA_LENGTH_PROPERTY)) {
// val result = ByteBuffer.allocate(Integer.parseInt(properties[DATA_LENGTH_PROPERTY]))
// result.put(array)//TODO fix it to not use direct array access
// channel.read(result)
// return result
// } else {
// val baos = ByteArrayOutputStream()
// baos.write(array)
// while (channel.isOpen) {
// val read = channel.read(buffer)
// buffer.flip()
// if (read < BUFFER_SIZE) {
// channel.close()
// }
//
// baos.write(buffer.array())
// }
// val remainingArray: ByteArray = ByteArray(buffer.remaining())
// buffer.get(remainingArray)
// baos.write(remainingArray)
// return ByteBuffer.wrap(baos.toByteArray())
// }
// }
// }
companion object : EnvelopeFormatFactory { companion object : EnvelopeFormatFactory {