Fix text envelope formats partial reads

This commit is contained in:
Alexander Nozik 2022-04-29 18:35:44 +03:00
parent 6d396368b7
commit 82d37f4b55
No known key found for this signature in database
GPG Key ID: F7FCF2DD25C71357
13 changed files with 284 additions and 92 deletions

View File

@ -12,6 +12,7 @@
- DataTree `items` call is blocking.
- DataSet `getData` is no longer suspended and renamed to `get`
- DataSet operates with sequences of data instead of flows
- PartialEnvelope uses `Int` instead `UInt`.
### Deprecated
@ -19,6 +20,7 @@
### Fixed
- Meta file name in readMeta from directory
- Tagless and FrontMatter envelope partial readers fix.
### Security

View File

@ -4,7 +4,7 @@ plugins {
allprojects {
group = "space.kscience"
version = "0.6.0-dev-2"
version = "0.6.0-dev-3"
repositories{
mavenCentral()
}

View File

@ -83,6 +83,10 @@ public class StaticData<T : Any>(
override val meta: Meta = Meta.EMPTY,
) : Data<T>, StaticGoal<T>(value)
@Suppress("FunctionName")
public inline fun <reified T : Any> Data(value: T, meta: Meta = Meta.EMPTY): StaticData<T> =
StaticData(typeOf<T>(), value, meta)
@Suppress("FunctionName")
@DFInternal
public fun <T : Any> Data(

View File

@ -2,8 +2,8 @@ package space.kscience.dataforge.io.yaml
import io.ktor.utils.io.core.Input
import io.ktor.utils.io.core.Output
import io.ktor.utils.io.core.buildPacket
import io.ktor.utils.io.core.readBytes
import io.ktor.utils.io.core.readUTF8Line
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Global
import space.kscience.dataforge.io.*
@ -11,6 +11,8 @@ import space.kscience.dataforge.io.IOFormat.Companion.META_KEY
import space.kscience.dataforge.io.IOFormat.Companion.NAME_KEY
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.plus
@DFExperimental
public class FrontMatterEnvelopeFormat(
@ -19,51 +21,33 @@ public class FrontMatterEnvelopeFormat(
) : EnvelopeFormat {
override fun readPartial(input: Input): PartialEnvelope {
var line: String
var offset = 0u
do {
line = input.readUTF8Line() ?: error("Input does not contain front matter separator")
offset += line.encodeToByteArray().size.toUInt()
} while (!line.startsWith(SEPARATOR))
var offset = 0
val readMetaFormat =
metaTypeRegex.matchEntire(line)?.groupValues?.first()
?.let { io.resolveMetaFormat(it) } ?: YamlMetaFormat
offset += input.discardWithSeparator(
SEPARATOR.encodeToByteArray(),
atMost = 1024,
skipUntilEndOfLine = false
)
val line = input.readSafeUtf8Line()
val readMetaFormat = line.trim().takeIf { it.isNotBlank() }?.let { io.resolveMetaFormat(it) } ?: YamlMetaFormat
//TODO replace by preview
val meta = Binary {
do {
line = input.readSafeUtf8Line()
writeUtf8String(line + "\r\n")
offset += line.encodeToByteArray().size.toUInt()
} while (!line.startsWith(SEPARATOR))
}.read {
readMetaFormat.readMeta(input)
val packet = buildPacket {
offset += input.readBytesWithSeparatorTo(
this,
SEPARATOR.encodeToByteArray(),
skipUntilEndOfLine = true
)
}
val meta = readMetaFormat.readMeta(packet)
return PartialEnvelope(meta, offset, null)
}
override fun readObject(input: Input): Envelope {
var line: String
do {
line = input.readSafeUtf8Line() //?: error("Input does not contain front matter separator")
} while (!line.startsWith(SEPARATOR))
val readMetaFormat =
metaTypeRegex.matchEntire(line)?.groupValues?.first()
?.let { io.resolveMetaFormat(it) } ?: YamlMetaFormat
val meta = Binary {
do {
writeUtf8String(input.readSafeUtf8Line() + "\r\n")
} while (!line.startsWith(SEPARATOR))
}.read {
readMetaFormat.readMeta(input)
}
val bytes = input.readBytes()
val data = bytes.asBinary()
return SimpleEnvelope(meta, data)
val partial = readPartial(input)
val data = input.readBytes().asBinary()
return SimpleEnvelope(partial.meta, data)
}
override fun writeEnvelope(
@ -92,6 +76,8 @@ public class FrontMatterEnvelopeFormat(
private val metaTypeRegex = "---(\\w*)\\s*".toRegex()
override val name: Name = EnvelopeFormatFactory.ENVELOPE_FACTORY_NAME + "frontMatter"
override fun build(context: Context, meta: Meta): EnvelopeFormat {
return FrontMatterEnvelopeFormat(context.io, meta)
}

View File

@ -4,6 +4,7 @@ import space.kscience.dataforge.context.AbstractPlugin
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.PluginFactory
import space.kscience.dataforge.context.PluginTag
import space.kscience.dataforge.io.EnvelopeFormatFactory
import space.kscience.dataforge.io.IOPlugin
import space.kscience.dataforge.io.MetaFormatFactory
import space.kscience.dataforge.meta.Meta
@ -20,6 +21,7 @@ public class YamlPlugin(meta: Meta) : AbstractPlugin(meta) {
override fun content(target: String): Map<Name, Any> = when (target) {
MetaFormatFactory.META_FORMAT_TYPE -> mapOf("yaml".asName() to YamlMetaFormat)
EnvelopeFormatFactory.ENVELOPE_FORMAT_TYPE -> mapOf(FrontMatterEnvelopeFormat.name to FrontMatterEnvelopeFormat)
else -> super.content(target)
}

View File

@ -0,0 +1,34 @@
package space.kscience.dataforge.io.yaml
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.io.io
import space.kscience.dataforge.io.readEnvelope
import space.kscience.dataforge.io.toByteArray
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.string
import kotlin.test.Test
import kotlin.test.assertEquals
internal class FrontMatterEnvelopeFormatTest {
val context = Context {
plugin(YamlPlugin)
}
@Test
fun frontMatter(){
val text = """
---
content_type: magprog
magprog_section: contacts
section_title: Контакты
language: ru
---
Some text here
""".trimIndent()
val envelope = context.io.readEnvelope(text)
assertEquals("Some text here", envelope.data!!.toByteArray().decodeToString().trim())
assertEquals("magprog", envelope.meta["content_type"].string)
}
}

View File

@ -14,7 +14,7 @@ import kotlin.reflect.typeOf
/**
* A partially read envelope with meta, but without data
*/
public data class PartialEnvelope(val meta: Meta, val dataOffset: UInt, val dataSize: ULong?)
public data class PartialEnvelope(val meta: Meta, val dataOffset: Int, val dataSize: ULong?)
public interface EnvelopeFormat : IOFormat<Envelope> {
override val type: KType get() = typeOf<Envelope>()
@ -39,7 +39,6 @@ public fun EnvelopeFormat.read(input: Input): Envelope = readObject(input)
@Type(ENVELOPE_FORMAT_TYPE)
public interface EnvelopeFormatFactory : IOFormatFactory<Envelope>, EnvelopeFormat {
override val name: Name get() = "envelope".asName()
override val type: KType get() = typeOf<Envelope>()
override fun build(context: Context, meta: Meta): EnvelopeFormat
@ -51,6 +50,7 @@ public interface EnvelopeFormatFactory : IOFormatFactory<Envelope>, EnvelopeForm
public fun peekFormat(io: IOPlugin, binary: Binary): EnvelopeFormat?
public companion object {
public val ENVELOPE_FACTORY_NAME: Name = "envelope".asName()
public const val ENVELOPE_FORMAT_TYPE: String = "io.format.envelope"
}
}

View File

@ -92,7 +92,7 @@ public class TaggedEnvelopeFormat(
val meta: Meta = metaFormat.readObject(metaBinary)
return PartialEnvelope(meta, version.tagSize + tag.metaSize, tag.dataSize)
return PartialEnvelope(meta, (version.tagSize + tag.metaSize).toInt(), tag.dataSize)
}
private data class Tag(
@ -117,7 +117,7 @@ public class TaggedEnvelopeFormat(
private const val START_SEQUENCE = "#~"
private const val END_SEQUENCE = "~#\r\n"
override val name: Name = super.name + "tagged"
override val name: Name = EnvelopeFormatFactory.ENVELOPE_FACTORY_NAME + "tagged"
override fun build(context: Context, meta: Meta): EnvelopeFormat {
val io = context.io

View File

@ -10,7 +10,7 @@ import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.isEmpty
import space.kscience.dataforge.meta.string
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName
import space.kscience.dataforge.names.plus
import kotlin.collections.set
/**
@ -33,7 +33,7 @@ public class TaglessEnvelopeFormat(
output: Output,
envelope: Envelope,
metaFormatFactory: MetaFormatFactory,
formatMeta: Meta
formatMeta: Meta,
) {
val metaFormat = metaFormatFactory.build(this.io.context, formatMeta)
@ -66,13 +66,16 @@ public class TaglessEnvelopeFormat(
}
override fun readObject(input: Input): Envelope {
var line: String
do {
line = input.readSafeUtf8Line() // ?: error("Input does not contain tagless envelope header")
} while (!line.startsWith(TAGLESS_ENVELOPE_HEADER))
//read preamble
input.discardWithSeparator(
TAGLESS_ENVELOPE_HEADER.encodeToByteArray(),
atMost = 1024,
skipUntilEndOfLine = true
)
val properties = HashMap<String, String>()
line = ""
var line = ""
while (line.isBlank() || line.startsWith("#?")) {
if (line.startsWith("#?")) {
val match = propertyPattern.find(line)
@ -80,9 +83,17 @@ public class TaglessEnvelopeFormat(
val (key, value) = match.destructured
properties[key] = value
}
//If can't read line, return envelope without data
if (input.endOfInput) return SimpleEnvelope(Meta.EMPTY, null)
line = input.readSafeUtf8Line()
try {
line = ByteArray {
try {
input.readBytesWithSeparatorTo(this, byteArrayOf('\n'.code.toByte()), 1024)
} catch (ex: BufferLimitExceededException) {
throw IllegalStateException("Property line exceeds maximum line length (1024)", ex)
}
}.decodeToString().trim()
} catch (ex: EOFException) {
return SimpleEnvelope(Meta.EMPTY, Binary.EMPTY)
}
}
var meta: Meta = Meta.EMPTY
@ -93,18 +104,16 @@ public class TaglessEnvelopeFormat(
meta = if (metaSize != null) {
metaFormat.readObject(input.readBinary(metaSize))
} else {
metaFormat.readObject(input)
error("Can't partially read an envelope with undefined meta size")
}
}
do {
try {
line = input.readSafeUtf8Line()
} catch (ex: EOFException) {
//returning an Envelope without data if end of input is reached
return SimpleEnvelope(meta, null)
}
} while (!line.startsWith(dataStart))
//skip until data start
input.discardWithSeparator(
dataStart.encodeToByteArray(),
atMost = 1024,
skipUntilEndOfLine = true
)
val data: Binary = if (properties.containsKey(DATA_LENGTH_PROPERTY)) {
input.readBinary(properties[DATA_LENGTH_PROPERTY]!!.toInt())
@ -112,24 +121,27 @@ public class TaglessEnvelopeFormat(
// readByteArray(bytes)
// bytes.asBinary()
} else {
Binary {
input.copyTo(this)
}
input.readBytes().asBinary()
}
return SimpleEnvelope(meta, data)
}
override fun readPartial(input: Input): PartialEnvelope {
var offset = 0u
var line: String
do {
line = input.readSafeUtf8Line()// ?: error("Input does not contain tagless envelope header")
offset += line.encodeToByteArray().size.toUInt()
} while (!line.startsWith(TAGLESS_ENVELOPE_HEADER))
var offset = 0
//read preamble
offset += input.discardWithSeparator(
TAGLESS_ENVELOPE_HEADER.encodeToByteArray(),
atMost = 1024,
skipUntilEndOfLine = true
)
val properties = HashMap<String, String>()
line = ""
var line = ""
while (line.isBlank() || line.startsWith("#?")) {
if (line.startsWith("#?")) {
val match = propertyPattern.find(line)
@ -138,10 +150,16 @@ public class TaglessEnvelopeFormat(
properties[key] = value
}
try {
line = input.readSafeUtf8Line()
offset += line.encodeToByteArray().size.toUInt()
line = ByteArray {
val read = try {
input.readBytesWithSeparatorTo(this, byteArrayOf('\n'.code.toByte()), 1024)
} catch (ex: BufferLimitExceededException) {
throw IllegalStateException("Property line exceeds maximum line length (1024)", ex)
}
offset += read
}.decodeToString().trim()
} catch (ex: EOFException) {
return PartialEnvelope(Meta.EMPTY, offset.toUInt(), 0.toULong())
return PartialEnvelope(Meta.EMPTY, offset, 0.toULong())
}
}
@ -151,18 +169,19 @@ public class TaglessEnvelopeFormat(
val metaFormat = properties[META_TYPE_PROPERTY]?.let { io.resolveMetaFormat(it) } ?: JsonMetaFormat
val metaSize = properties[META_LENGTH_PROPERTY]?.toInt()
meta = if (metaSize != null) {
offset += metaSize.toUInt()
offset += metaSize
metaFormat.readObject(input.readBinary(metaSize))
} else {
error("Can't partially read an envelope with undefined meta size")
}
}
do {
line = input.readSafeUtf8Line() //?: return PartialEnvelope(Meta.EMPTY, offset.toUInt(), 0.toULong())
offset += line.encodeToByteArray().size.toUInt()
//returning an Envelope without data if end of input is reached
} while (!line.startsWith(dataStart))
//skip until data start
offset += input.discardWithSeparator(
dataStart.encodeToByteArray(),
atMost = 1024,
skipUntilEndOfLine = true
)
val dataSize = properties[DATA_LENGTH_PROPERTY]?.toULong()
return PartialEnvelope(meta, offset, dataSize)
@ -192,7 +211,7 @@ public class TaglessEnvelopeFormat(
public const val code: Int = 0x4446544c //DFTL
override val name: Name = TAGLESS_ENVELOPE_TYPE.asName()
override val name: Name = EnvelopeFormatFactory.ENVELOPE_FACTORY_NAME + TAGLESS_ENVELOPE_TYPE
override fun build(context: Context, meta: Meta): EnvelopeFormat = TaglessEnvelopeFormat(context.io, meta)

View File

@ -1,8 +1,10 @@
package space.kscience.dataforge.io
import io.ktor.utils.io.bits.Memory
import io.ktor.utils.io.charsets.Charsets
import io.ktor.utils.io.charsets.decodeExactBytes
import io.ktor.utils.io.core.*
import io.ktor.utils.io.core.internal.ChunkBuffer
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.misc.DFExperimental
import kotlin.math.min
@ -86,7 +88,7 @@ public fun EnvelopeFormat.readBinary(binary: Binary): Envelope {
* A zero-copy read from
*/
@DFExperimental
public fun IOPlugin.readEnvelopeBinary(
public fun IOPlugin.readEnvelope(
binary: Binary,
readNonEnvelopes: Boolean = false,
formatPicker: IOPlugin.(Binary) -> EnvelopeFormat? = IOPlugin::peekBinaryEnvelopeFormat,
@ -94,3 +96,102 @@ public fun IOPlugin.readEnvelopeBinary(
// if no format accepts file, read it as binary
SimpleEnvelope(Meta.EMPTY, binary)
} else error("Can't infer format for $binary")
@DFExperimental
public fun IOPlugin.readEnvelope(
string: String,
readNonEnvelopes: Boolean = false,
formatPicker: IOPlugin.(Binary) -> EnvelopeFormat? = IOPlugin::peekBinaryEnvelopeFormat,
): Envelope = readEnvelope(string.encodeToByteArray().asBinary(), readNonEnvelopes, formatPicker)
private class RingByteArray(
private val buffer: ByteArray,
private var startIndex: Int = 0,
var size: Int = 0,
) {
operator fun get(index: Int): Byte {
require(index >= 0) { "Index must be positive" }
require(index < size) { "Index $index is out of circular buffer size $size" }
return buffer[startIndex.forward(index)]
}
fun isFull(): Boolean = size == buffer.size
fun push(element: Byte) {
buffer[startIndex.forward(size)] = element
if (isFull()) startIndex++ else size++
}
private fun Int.forward(n: Int): Int = (this + n) % (buffer.size)
fun compare(inputArray: ByteArray): Boolean = when {
inputArray.size != buffer.size -> false
size < buffer.size -> false
else -> inputArray.indices.all { inputArray[it] == get(it) }
}
}
/**
* Read [Input] into [output] until designated multy-byte [separator] and optionally continues until
* the end of the line after it. Throw error if [separator] not found and [atMost] bytes are read.
* Also fails if [separator] not found until the end of input.
*
* Separator itself is not read into Output.
*
* @return bytes actually being read, including separator
*/
public fun Input.readBytesWithSeparatorTo(
output: Output,
separator: ByteArray,
atMost: Int = Int.MAX_VALUE,
skipUntilEndOfLine: Boolean = false,
): Int {
var counter = 0
val rb = RingByteArray(ByteArray(separator.size))
var separatorFound = false
takeWhile { buffer ->
while (buffer.canRead()) {
val byte = buffer.readByte()
counter++
if (counter >= atMost) error("Maximum number of bytes to be read $atMost reached.")
//If end-of-line-search is on, terminate
if (separatorFound) {
if (endOfInput || byte == '\n'.code.toByte()) {
return counter
}
} else {
rb.push(byte)
if (rb.compare(separator)) {
separatorFound = true
if (!skipUntilEndOfLine) {
return counter
}
} else if (rb.isFull()) {
output.writeByte(rb[0])
}
}
}
!endOfInput
}
error("Read to the end of input without encountering ${separator.decodeToString()}")
}
public fun Input.discardWithSeparator(
separator: ByteArray,
atMost: Int = Int.MAX_VALUE,
skipUntilEndOfLine: Boolean = false,
): Int {
val dummy: Output = object :Output(ChunkBuffer.Pool){
override fun closeDestination() {
// Do nothing
}
override fun flush(source: Memory, offset: Int, length: Int) {
// Do nothing
}
}
return readBytesWithSeparatorTo(dummy, separator, atMost, skipUntilEndOfLine)
}

View File

@ -1,5 +1,6 @@
package space.kscience.dataforge.io
import io.ktor.utils.io.core.ByteReadPacket
import io.ktor.utils.io.core.readDouble
import io.ktor.utils.io.core.writeDouble
import kotlin.test.Test
@ -39,6 +40,8 @@ class EnvelopeFormatTest {
TaglessEnvelopeFormat.run {
val byteArray = writeToByteArray(envelope)
//println(byteArray.decodeToString())
val partial = readPartial(ByteReadPacket(byteArray))
assertEquals(8, partial.dataSize?.toInt())
val res = readFromByteArray(byteArray)
assertEquals(envelope.meta, res.meta)
val double = res.data?.read {

View File

@ -2,8 +2,10 @@ package space.kscience.dataforge.io
import io.ktor.utils.io.core.ByteReadPacket
import io.ktor.utils.io.core.readBytes
import io.ktor.utils.io.core.readUTF8Line
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertFails
class IOTest {
@Test
@ -14,4 +16,42 @@ class IOTest {
val second = input.readBytes(4)
assertEquals(4.toByte(), second[0])
}
@Test
fun readUntilSeparator() {
val source = """
aaa
bbb
---
ccc
ddd
""".trimIndent()
val binary = source.encodeToByteArray().asBinary()
binary.read {
val array = ByteArray {
val read = readBytesWithSeparatorTo(this, "---".encodeToByteArray(), skipUntilEndOfLine = true)
assertEquals(12, read)
}
assertEquals("""
aaa
bbb
""".trimIndent(),array.decodeToString().trim())
assertEquals("ccc", readUTF8Line()?.trim())
}
assertFails {
binary.read {
discardWithSeparator("---".encodeToByteArray(), atMost = 3)
}
}
assertFails {
binary.read{
discardWithSeparator("-+-".encodeToByteArray())
}
}
}
}

View File

@ -1,6 +1,7 @@
package space.kscience.dataforge.workspace
import space.kscience.dataforge.context.ContextAware
import space.kscience.dataforge.data.Data
import space.kscience.dataforge.data.DataSet
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MutableMeta
@ -34,7 +35,7 @@ public interface Workspace : ContextAware, Provider {
return when (target) {
"target", Meta.TYPE -> targets.mapKeys { Name.parse(it.key)}
Task.TYPE -> tasks
//Data.TYPE -> data.flow().toMap()
Data.TYPE -> data.dataSequence().associateBy { it.name }
else -> emptyMap()
}
}
@ -46,7 +47,7 @@ public interface Workspace : ContextAware, Provider {
}
public suspend fun produceData(taskName: Name, taskMeta: Meta, name: Name): TaskData<*>? =
produce(taskName, taskMeta).get(name)
produce(taskName, taskMeta)[name]
public companion object {
public const val TYPE: String = "workspace"