From 87ae41886b01ab73fbd0846930d96ab59aa1d607 Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Sun, 20 Oct 2019 19:30:51 +0300 Subject: [PATCH] Partial TaglessEnvelopeFormat --- .../hep/dataforge/io/TaglessEnvelopeFormat.kt | 166 ++++++------------ 1 file changed, 50 insertions(+), 116 deletions(-) diff --git a/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/TaglessEnvelopeFormat.kt b/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/TaglessEnvelopeFormat.kt index 4bdeca78..af01acc4 100644 --- a/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/TaglessEnvelopeFormat.kt +++ b/dataforge-io/src/commonMain/kotlin/hep/dataforge/io/TaglessEnvelopeFormat.kt @@ -4,6 +4,7 @@ import hep.dataforge.context.Context import hep.dataforge.meta.* import hep.dataforge.names.asName import kotlinx.io.core.* +import kotlinx.serialization.toUtf8Bytes class TaglessEnvelopeFormat( val io: IOPlugin, @@ -11,6 +12,9 @@ class TaglessEnvelopeFormat( meta: Meta = EmptyMeta ) : EnvelopeFormat { + private val metaStart = meta[META_START_PROPERTY].string ?: DEFAULT_META_START + private val dataStart = meta[DATA_START_PROPERTY].string ?: DEFAULT_DATA_START + val metaFormat = io.metaFormat(metaType, meta) ?: error("Meta format with type $metaType could not be resolved in $io") @@ -32,14 +36,14 @@ class TaglessEnvelopeFormat( if (!obj.meta.isEmpty()) { val metaBytes = metaFormat.writeBytes(obj.meta) writeProperty(META_LENGTH_PROPERTY, metaBytes.size) - writeText(DEFAULT_META_START + "\r\n") + writeText(metaStart + "\r\n") writeFully(metaBytes) writeText("\r\n") } //Printing data obj.data?.let { data -> - writeText(DEFAULT_DATA_START + "\r\n") + writeText(dataStart + "\r\n") writeFully(data.toBytes()) } } @@ -64,7 +68,7 @@ class TaglessEnvelopeFormat( var meta: Meta = EmptyMeta - if (line.startsWith(DEFAULT_META_START)) { + if (line.startsWith(metaStart)) { val metaFormat = properties[META_TYPE_PROPERTY]?.let { io.metaFormat(it) } ?: JsonMetaFormat.default meta = if (properties.containsKey(META_LENGTH_PROPERTY)) { val bytes = ByteArray(properties[META_LENGTH_PROPERTY]!!.toInt()) @@ -80,7 +84,7 @@ class TaglessEnvelopeFormat( do { line = readUTF8Line() ?: return SimpleEnvelope(meta, null) //returning an Envelope without data if end of input is reached - } while (!line.startsWith(DEFAULT_DATA_START)) + } while (!line.startsWith(dataStart)) val data: Binary? = if (properties.containsKey(DATA_LENGTH_PROPERTY)) { val bytes = ByteArray(properties[DATA_LENGTH_PROPERTY]!!.toInt()) @@ -95,119 +99,49 @@ class TaglessEnvelopeFormat( } override fun Input.readPartial(): PartialEnvelope { - TODO("not implemented") //To change body of created functions use File | Settings | File Templates. - } + var offset = 0u + var line: String = "" + do { + line = readUTF8Line() ?: error("Input does not contain tagless envelope header") + offset += line.toUtf8Bytes().size.toUInt() + } while (!line.startsWith(TAGLESS_ENVELOPE_HEADER)) + val properties = HashMap() -// class TaglessReader(private val override: Map) : EnvelopeReader { -// -// private val BUFFER_SIZE = 1024 -// -// @Throws(IOException::class) -// override fun read(stream: InputStream): Envelope { -// return read(Channels.newChannel(stream)) -// } -// -// override fun read(channel: ReadableByteChannel): Envelope { -// val properties = HashMap(override) -// val buffer = ByteBuffer.allocate(BUFFER_SIZE).apply { position(BUFFER_SIZE) } -// val meta = readMeta(channel, buffer, properties) -// return LazyEnvelope(meta) { BufferedBinary(readData(channel, buffer, properties)) } -// } -// -// -// /** -// * Read lines using provided channel and buffer. Buffer state is changed by this operation -// */ -// private fun readLines(channel: ReadableByteChannel, buffer: ByteBuffer): Sequence { -// return sequence { -// val builder = ByteArrayOutputStream() -// while (true) { -// if (!buffer.hasRemaining()) { -// if (!channel.isOpen) { -// break -// } -// buffer.flip() -// val count = channel.read(buffer) -// buffer.flip() -// if (count < BUFFER_SIZE) { -// channel.close() -// } -// } -// val b = buffer.get() -// builder.write(b.toInt()) -// if (b == '\n'.toByte()) { -// yield(String(builder.toByteArray(), Charsets.UTF_8)) -// builder.reset() -// } -// } -// } -// } -// -// @Throws(IOException::class) -// private fun readMeta( -// channel: ReadableByteChannel, -// buffer: ByteBuffer, -// properties: MutableMap -// ): Meta { -// val sb = StringBuilder() -// val metaEnd = properties.getOrDefault(DATA_START_PROPERTY, DEFAULT_DATA_START) -// readLines(channel, buffer).takeWhile { it.trim { it <= ' ' } != metaEnd }.forEach { line -> -// if (line.startsWith("#?")) { -// readProperty(line.trim(), properties) -// } else if (line.isEmpty() || line.startsWith("#~")) { -// //Ignore headings, do nothing -// } else { -// sb.append(line).append("\r\n") -// } -// } -// -// -// return if (sb.isEmpty()) { -// Meta.empty() -// } else { -// val metaType = MetaType.resolve(properties) -// try { -// metaType.reader.readString(sb.toString()) -// } catch (e: ParseException) { -// throw RuntimeException("Failed to parse meta", e) -// } -// -// } -// } -// -// -// @Throws(IOException::class) -// private fun readData( -// channel: ReadableByteChannel, -// buffer: ByteBuffer, -// properties: Map -// ): ByteBuffer { -// val array = ByteArray(buffer.remaining()); -// buffer.get(array) -// if (properties.containsKey(DATA_LENGTH_PROPERTY)) { -// val result = ByteBuffer.allocate(Integer.parseInt(properties[DATA_LENGTH_PROPERTY])) -// result.put(array)//TODO fix it to not use direct array access -// channel.read(result) -// return result -// } else { -// val baos = ByteArrayOutputStream() -// baos.write(array) -// while (channel.isOpen) { -// val read = channel.read(buffer) -// buffer.flip() -// if (read < BUFFER_SIZE) { -// channel.close() -// } -// -// baos.write(buffer.array()) -// } -// val remainingArray: ByteArray = ByteArray(buffer.remaining()) -// buffer.get(remainingArray) -// baos.write(remainingArray) -// return ByteBuffer.wrap(baos.toByteArray()) -// } -// } -// } + line = "" + while (line.isBlank() || line.startsWith("#?")) { + if (line.startsWith("#?")) { + val match = propertyPattern.find(line) + ?: error("Line $line does not match property declaration pattern") + val (key, value) = match.destructured + properties[key] = value + } + line = readUTF8Line() ?: return PartialEnvelope(Meta.empty, offset.toUInt(), 0.toULong()) + offset += line.toUtf8Bytes().size.toUInt() + } + + var meta: Meta = EmptyMeta + + if (line.startsWith(metaStart)) { + val metaFormat = properties[META_TYPE_PROPERTY]?.let { io.metaFormat(it) } ?: JsonMetaFormat.default + meta = if (properties.containsKey(META_LENGTH_PROPERTY)) { + val bytes = ByteArray(properties[META_LENGTH_PROPERTY]!!.toInt()) + readFully(bytes) + offset += bytes.size.toUInt() + metaFormat.readBytes(bytes) + } else { + error("Can't partially read an envelope with undefined meta size") + } + } + + do { + line = readUTF8Line() ?: return PartialEnvelope(Meta.empty, offset.toUInt(), 0.toULong()) + offset += line.toUtf8Bytes().size.toUInt() + //returning an Envelope without data if end of input is reached + } while (!line.startsWith(dataStart)) + + val dataSize = properties[DATA_LENGTH_PROPERTY]?.toULong() + return PartialEnvelope(meta, offset, dataSize) + } companion object : EnvelopeFormatFactory {