Simplify DFTL. fix io bugs

This commit is contained in:
Alexander Nozik 2023-01-25 18:56:19 +03:00
parent 82838b6a92
commit 5d7ddb4e00
21 changed files with 216 additions and 372 deletions

View File

@ -9,6 +9,7 @@
- More fine-grained types in Action builders. - More fine-grained types in Action builders.
### Changed ### Changed
- Simplified `DFTL` envelope format. Closing symbols are unnecessary. Properties are discontinued.
- Meta `get` method allows nullable receiver - Meta `get` method allows nullable receiver
- `withDefault` functions do not add new keys to meta children and are consistent. - `withDefault` functions do not add new keys to meta children and are consistent.
- `dataforge.meta.values` package is merged into `dataforge.meta` for better star imports - `dataforge.meta.values` package is merged into `dataforge.meta` for better star imports

View File

@ -9,7 +9,7 @@ plugins {
allprojects { allprojects {
group = "space.kscience" group = "space.kscience"
version = "0.6.0-dev-15" version = "0.6.1-dev-1"
} }
subprojects { subprojects {

View File

@ -1,13 +1,13 @@
plugins { plugins {
id("space.kscience.gradle.mpp") id("space.kscience.gradle.mpp")
// id("space.kscience.gradle.native")
} }
description = "YAML meta IO" description = "YAML meta IO"
kscience { kscience {
native()
useSerialization{ useSerialization{
yamlKt("0.9.0-dev-1") yamlKt()
} }
} }

View File

@ -2,8 +2,6 @@ package space.kscience.dataforge.io.yaml
import io.ktor.utils.io.core.Input import io.ktor.utils.io.core.Input
import io.ktor.utils.io.core.Output import io.ktor.utils.io.core.Output
import io.ktor.utils.io.core.buildPacket
import io.ktor.utils.io.core.readBytes
import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Global import space.kscience.dataforge.context.Global
import space.kscience.dataforge.io.* import space.kscience.dataforge.io.*
@ -14,50 +12,46 @@ import space.kscience.dataforge.names.plus
public class FrontMatterEnvelopeFormat( public class FrontMatterEnvelopeFormat(
private val io: IOPlugin, private val io: IOPlugin,
private val meta: Meta = Meta.EMPTY, private val meta: Meta = Meta.EMPTY,
private val metaFormatFactory: MetaFormatFactory = YamlMetaFormat,
) : EnvelopeFormat { ) : EnvelopeFormat {
override fun readPartial(input: Input): PartialEnvelope { override fun readObject(binary: Binary): Envelope = binary.read {
var offset = 0 var offset = 0
offset += input.discardWithSeparator( offset += discardWithSeparator(
SEPARATOR.encodeToByteArray(), SEPARATOR.encodeToByteArray(),
atMost = 1024, atMost = 1024,
skipUntilEndOfLine = false
) )
val line = input.readSafeUtf8Line() val line = ByteArray {
offset += readWithSeparatorTo(this, "\n".encodeToByteArray())
}.decodeToString()
val readMetaFormat = line.trim().takeIf { it.isNotBlank() }?.let { io.resolveMetaFormat(it) } ?: YamlMetaFormat val readMetaFormat = line.trim().takeIf { it.isNotBlank() }?.let { io.resolveMetaFormat(it) } ?: YamlMetaFormat
//TODO replace by preview val packet = ByteArray {
val packet = buildPacket { offset += readWithSeparatorTo(this, SEPARATOR.encodeToByteArray())
offset += input.readBytesWithSeparatorTo(
this,
SEPARATOR.encodeToByteArray(),
skipUntilEndOfLine = true
)
}
val meta = readMetaFormat.readMeta(packet)
return PartialEnvelope(meta, offset, null)
} }
override fun readObject(input: Input): Envelope { offset += discardLine()
val partial = readPartial(input)
val data = input.readBytes().asBinary() val meta = readMetaFormat.readObject(packet.asBinary())
return SimpleEnvelope(partial.meta, data) Envelope(meta, binary.view(offset))
} }
override fun writeEnvelope( override fun readObject(input: Input): Envelope = readObject(input.readBinary())
override fun writeObject(
output: Output, output: Output,
envelope: Envelope, obj: Envelope,
metaFormatFactory: MetaFormatFactory,
formatMeta: Meta,
) { ) {
val metaFormat = metaFormatFactory.build(this@FrontMatterEnvelopeFormat.io.context, formatMeta) val metaFormat = metaFormatFactory.build(io.context, meta)
output.writeRawString("$SEPARATOR\r\n") val formatSuffix = if (metaFormat is YamlMetaFormat) "" else metaFormatFactory.shortName
metaFormat.run { this.writeObject(output, envelope.meta) } output.writeRawString("$SEPARATOR${formatSuffix}\r\n")
metaFormat.run { metaFormat.writeObject(output, obj.meta) }
output.writeRawString("$SEPARATOR\r\n") output.writeRawString("$SEPARATOR\r\n")
//Printing data //Printing data
envelope.data?.let { data -> obj.data?.let { data ->
output.writeBinary(data) output.writeBinary(data)
} }
} }
@ -84,15 +78,12 @@ public class FrontMatterEnvelopeFormat(
private val default by lazy { build(Global, Meta.EMPTY) } private val default by lazy { build(Global, Meta.EMPTY) }
override fun readPartial(input: Input): PartialEnvelope = override fun readObject(binary: Binary): Envelope = default.readObject(binary)
default.readPartial(input)
override fun writeEnvelope( override fun writeObject(
output: Output, output: Output,
envelope: Envelope, obj: Envelope,
metaFormatFactory: MetaFormatFactory, ): Unit = default.writeObject(output, obj)
formatMeta: Meta,
): Unit = default.writeEnvelope(output, envelope, metaFormatFactory, formatMeta)
override fun readObject(input: Input): Envelope = default.readObject(input) override fun readObject(input: Input): Envelope = default.readObject(input)

View File

@ -21,6 +21,12 @@ public interface Binary {
public suspend fun <R> readSuspend(offset: Int = 0, atMost: Int = size - offset, block: suspend Input.() -> R): R public suspend fun <R> readSuspend(offset: Int = 0, atMost: Int = size - offset, block: suspend Input.() -> R): R
/**
* Read a binary with given [offset] relative to this binary and given [binarySize].
* In general, resulting binary is of the same type as this one, but it is not guaranteed.
*/
public fun view(offset: Int, binarySize: Int = size - offset): Binary
public companion object { public companion object {
public val EMPTY: Binary = ByteArrayBinary(ByteArray(0)) public val EMPTY: Binary = ByteArrayBinary(ByteArray(0))
} }
@ -57,6 +63,9 @@ internal class ByteArrayBinary(
input.close() input.close()
} }
} }
override fun view(offset: Int, binarySize: Int): ByteArrayBinary =
ByteArrayBinary(array, start + offset, binarySize)
} }
public fun ByteArray.asBinary(): Binary = ByteArrayBinary(this) public fun ByteArray.asBinary(): Binary = ByteArrayBinary(this)
@ -65,7 +74,7 @@ public fun ByteArray.asBinary(): Binary = ByteArrayBinary(this)
* Produce a [ByteArray] representing an exact copy of this [Binary] * Produce a [ByteArray] representing an exact copy of this [Binary]
*/ */
public fun Binary.toByteArray(): ByteArray = if (this is ByteArrayBinary) { public fun Binary.toByteArray(): ByteArray = if (this is ByteArrayBinary) {
array.copyOf() // TODO do we need to ensure data safety here? array.copyOfRange(start, start + size) // TODO do we need to ensure data safety here?
} else { } else {
read { read {
readBytes() readBytes()
@ -73,8 +82,8 @@ public fun Binary.toByteArray(): ByteArray = if (this is ByteArrayBinary) {
} }
//TODO optimize for file-based Inputs //TODO optimize for file-based Inputs
public fun Input.readBinary(size: Int): Binary { public fun Input.readBinary(size: Int? = null): Binary {
val array = readBytes(size) val array = if (size == null) readBytes() else readBytes(size)
return ByteArrayBinary(array) return ByteArrayBinary(array)
} }

View File

@ -34,7 +34,7 @@ public interface Envelope {
} }
} }
public class SimpleEnvelope(override val meta: Meta, override val data: Binary?) : Envelope internal class SimpleEnvelope(override val meta: Meta, override val data: Binary?) : Envelope
public fun Envelope(meta: Meta, data: Binary?): Envelope = SimpleEnvelope(meta, data) public fun Envelope(meta: Meta, data: Binary?): Envelope = SimpleEnvelope(meta, data)

View File

@ -1,7 +1,6 @@
package space.kscience.dataforge.io package space.kscience.dataforge.io
import io.ktor.utils.io.core.Input import io.ktor.utils.io.core.Input
import io.ktor.utils.io.core.Output
import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Context
import space.kscience.dataforge.io.EnvelopeFormatFactory.Companion.ENVELOPE_FORMAT_TYPE import space.kscience.dataforge.io.EnvelopeFormatFactory.Companion.ENVELOPE_FORMAT_TYPE
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
@ -11,29 +10,9 @@ import space.kscience.dataforge.names.asName
import kotlin.reflect.KType import kotlin.reflect.KType
import kotlin.reflect.typeOf import kotlin.reflect.typeOf
/**
* A partially read envelope with meta, but without data
*/
public data class PartialEnvelope(val meta: Meta, val dataOffset: Int, val dataSize: ULong?)
public interface EnvelopeFormat : IOFormat<Envelope> { public interface EnvelopeFormat : IOFormat<Envelope> {
override val type: KType get() = typeOf<Envelope>() override val type: KType get() = typeOf<Envelope>()
public val defaultMetaFormat: MetaFormatFactory get() = JsonMetaFormat
public fun readPartial(input: Input): PartialEnvelope
public fun writeEnvelope(
output: Output,
envelope: Envelope,
metaFormatFactory: MetaFormatFactory = defaultMetaFormat,
formatMeta: Meta = Meta.EMPTY,
)
override fun readObject(input: Input): Envelope
override fun writeObject(output: Output, obj: Envelope): Unit = writeEnvelope(output, obj)
} }
public fun EnvelopeFormat.read(input: Input): Envelope = readObject(input) public fun EnvelopeFormat.read(input: Input): Envelope = readObject(input)

View File

@ -1,12 +1,10 @@
package space.kscience.dataforge.io package space.kscience.dataforge.io
import space.kscience.dataforge.context.invoke
import space.kscience.dataforge.io.Envelope.Companion.ENVELOPE_NODE_KEY import space.kscience.dataforge.io.Envelope.Companion.ENVELOPE_NODE_KEY
import space.kscience.dataforge.io.PartDescriptor.Companion.DEFAULT_MULTIPART_DATA_SEPARATOR import space.kscience.dataforge.io.PartDescriptor.Companion.DEFAULT_MULTIPART_DATA_SEPARATOR
import space.kscience.dataforge.io.PartDescriptor.Companion.MULTIPART_DATA_TYPE import space.kscience.dataforge.io.PartDescriptor.Companion.MULTIPART_DATA_TYPE
import space.kscience.dataforge.io.PartDescriptor.Companion.MULTIPART_KEY import space.kscience.dataforge.io.PartDescriptor.Companion.MULTIPART_KEY
import space.kscience.dataforge.io.PartDescriptor.Companion.PARTS_KEY import space.kscience.dataforge.io.PartDescriptor.Companion.PARTS_KEY
import space.kscience.dataforge.io.PartDescriptor.Companion.PART_FORMAT_KEY
import space.kscience.dataforge.io.PartDescriptor.Companion.SEPARATOR_KEY import space.kscience.dataforge.io.PartDescriptor.Companion.SEPARATOR_KEY
import space.kscience.dataforge.meta.* import space.kscience.dataforge.meta.*
import space.kscience.dataforge.names.asName import space.kscience.dataforge.names.asName
@ -24,8 +22,6 @@ private class PartDescriptor : Scheme() {
const val DEFAULT_MULTIPART_DATA_SEPARATOR = "\r\n#~PART~#\r\n" const val DEFAULT_MULTIPART_DATA_SEPARATOR = "\r\n#~PART~#\r\n"
val PART_FORMAT_KEY = "format".asName()
const val MULTIPART_DATA_TYPE = "envelope.multipart" const val MULTIPART_DATA_TYPE = "envelope.multipart"
} }
} }
@ -73,21 +69,12 @@ public fun EnvelopeBuilder.multipart(
*/ */
public fun EnvelopeBuilder.envelopes( public fun EnvelopeBuilder.envelopes(
envelopes: List<Envelope>, envelopes: List<Envelope>,
formatFactory: EnvelopeFormatFactory = TaggedEnvelopeFormat,
formatMeta: Meta? = null,
separator: String = DEFAULT_MULTIPART_DATA_SEPARATOR, separator: String = DEFAULT_MULTIPART_DATA_SEPARATOR,
) { ) {
val parts = envelopes.map { val parts = envelopes.map {
val format = formatMeta?.let { formatFactory(formatMeta) } ?: formatFactory val binary = Binary(it, TaggedEnvelopeFormat)
val binary = Binary(it, format)
EnvelopePart(binary, null) EnvelopePart(binary, null)
} }
meta {
(MULTIPART_KEY + PART_FORMAT_KEY) put {
IOFormatFactory.NAME_KEY put formatFactory.name.toString()
formatMeta?.let { IOFormatFactory.META_KEY put formatMeta }
}
}
multipart(parts, separator) multipart(parts, separator)
} }
@ -115,14 +102,4 @@ public val EnvelopePart.name: String? get() = description?.get("name").string
/** /**
* Represent envelope part by an envelope * Represent envelope part by an envelope
*/ */
public fun EnvelopePart.envelope(plugin: IOPlugin): Envelope { public fun EnvelopePart.envelope(): Envelope = binary.readWith(TaggedEnvelopeFormat)
val formatItem = description?.get(PART_FORMAT_KEY)
return if (formatItem != null) {
val format: EnvelopeFormat = plugin.resolveEnvelopeFormat(formatItem)
?: error("Envelope format for $formatItem is not resolved")
binary.readWith(format)
} else {
error("Envelope description not found")
//SimpleEnvelope(description ?: Meta.EMPTY, binary)
}
}

View File

@ -25,6 +25,8 @@ public interface IOReader<out T> {
public val type: KType public val type: KType
public fun readObject(input: Input): T public fun readObject(input: Input): T
public fun readObject(binary: Binary): T = binary.read { readObject(this) }
} }
public inline fun <reified T> IOReader(crossinline read: Input.() -> T): IOReader<T> = object : IOReader<T> { public inline fun <reified T> IOReader(crossinline read: Input.() -> T): IOReader<T> = object : IOReader<T> {

View File

@ -18,6 +18,7 @@ import space.kscience.dataforge.names.plus
public class TaggedEnvelopeFormat( public class TaggedEnvelopeFormat(
public val io: IOPlugin, public val io: IOPlugin,
public val version: VERSION = VERSION.DF02, public val version: VERSION = VERSION.DF02,
public val metaFormatFactory: MetaFormatFactory = JsonMetaFormat
) : EnvelopeFormat { ) : EnvelopeFormat {
// private val metaFormat = io.metaFormat(metaFormatKey) // private val metaFormat = io.metaFormat(metaFormatKey)
@ -40,20 +41,18 @@ public class TaggedEnvelopeFormat(
writeRawString(END_SEQUENCE) writeRawString(END_SEQUENCE)
} }
override fun writeEnvelope( override fun writeObject(
output: Output, output: Output,
envelope: Envelope, obj: Envelope,
metaFormatFactory: MetaFormatFactory,
formatMeta: Meta,
) { ) {
val metaFormat = metaFormatFactory.build(this@TaggedEnvelopeFormat.io.context, formatMeta) val metaFormat = metaFormatFactory.build(io.context, Meta.EMPTY)
val metaBytes = Binary(envelope.meta,metaFormat) val metaBytes = Binary(obj.meta,metaFormat)
val actualSize: ULong = (envelope.data?.size ?: 0).toULong() val actualSize: ULong = (obj.data?.size ?: 0).toULong()
val tag = Tag(metaFormatFactory.key, metaBytes.size.toUInt() + 2u, actualSize) val tag = Tag(metaFormatFactory.key, metaBytes.size.toUInt() + 2u, actualSize)
output.writeBinary(tag.toBinary()) output.writeBinary(tag.toBinary())
output.writeBinary(metaBytes) output.writeBinary(metaBytes)
output.writeRawString("\r\n") output.writeRawString("\r\n")
envelope.data?.let { obj.data?.let {
output.writeBinary(it) output.writeBinary(it)
} }
} }
@ -79,18 +78,18 @@ public class TaggedEnvelopeFormat(
return SimpleEnvelope(meta, data) return SimpleEnvelope(meta, data)
} }
override fun readPartial(input: Input): PartialEnvelope { override fun readObject(binary: Binary): Envelope = binary.read{
val tag = input.readTag(this.version) val tag = readTag(version)
val metaFormat = io.resolveMetaFormat(tag.metaFormatKey) val metaFormat = io.resolveMetaFormat(tag.metaFormatKey)
?: error("Meta format with key ${tag.metaFormatKey} not found") ?: error("Meta format with key ${tag.metaFormatKey} not found")
val metaBinary = input.readBinary(tag.metaSize.toInt()) val metaBinary = readBinary(tag.metaSize.toInt())
val meta: Meta = metaFormat.readObjectFrom(metaBinary) val meta: Meta = metaFormat.readObjectFrom(metaBinary)
return PartialEnvelope(meta, (version.tagSize + tag.metaSize).toInt(), tag.dataSize) SimpleEnvelope(meta, binary.view((version.tagSize + tag.metaSize).toInt(), tag.dataSize.toInt()))
} }
private data class Tag( private data class Tag(
@ -155,20 +154,16 @@ public class TaggedEnvelopeFormat(
private val default by lazy { build(Global, Meta.EMPTY) } private val default by lazy { build(Global, Meta.EMPTY) }
override fun readPartial(input: Input): PartialEnvelope = override fun readObject(binary: Binary): Envelope =
default.run { readPartial(input) } default.run { readObject(binary) }
override fun writeEnvelope( override fun writeObject(
output: Output, output: Output,
envelope: Envelope, obj: Envelope,
metaFormatFactory: MetaFormatFactory,
formatMeta: Meta,
): Unit = default.run { ): Unit = default.run {
writeEnvelope( writeObject(
output, output,
envelope, obj,
metaFormatFactory,
formatMeta
) )
} }

View File

@ -1,64 +1,54 @@
package space.kscience.dataforge.io package space.kscience.dataforge.io
import io.ktor.utils.io.core.* import io.ktor.utils.io.core.ByteReadPacket
import io.ktor.utils.io.core.Input
import io.ktor.utils.io.core.Output
import io.ktor.utils.io.core.readUTF8UntilDelimiterTo
import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Global import space.kscience.dataforge.context.Global
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.isEmpty import space.kscience.dataforge.meta.isEmpty
import space.kscience.dataforge.meta.string
import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.plus import space.kscience.dataforge.names.plus
import kotlin.collections.set
/** /**
* A text envelope format with human-readable tag. * A text envelope format based on block separators.
* TODO add description * TODO add description
*/ */
public class TaglessEnvelopeFormat( public class TaglessEnvelopeFormat(
public val io: IOPlugin, public val io: IOPlugin,
public val meta: Meta = Meta.EMPTY, public val meta: Meta = Meta.EMPTY,
public val metaFormatFactory: MetaFormatFactory = JsonMetaFormat,
) : EnvelopeFormat { ) : EnvelopeFormat {
private val metaStart = meta[META_START_PROPERTY].string ?: DEFAULT_META_START // private val metaStart = meta[META_START_PROPERTY].string ?: DEFAULT_META_START
private val dataStart = meta[DATA_START_PROPERTY].string ?: DEFAULT_DATA_START // private val dataStart = meta[DATA_START_PROPERTY].string ?: DEFAULT_DATA_START
private fun Output.writeProperty(key: String, value: Any) { // private fun Output.writeProperty(key: String, value: Any) {
writeFully("#? $key: $value;\r\n".encodeToByteArray()) // writeFully("#? $key: $value;\r\n".encodeToByteArray())
} // }
override fun writeEnvelope( override fun writeObject(
output: Output, output: Output,
envelope: Envelope, obj: Envelope,
metaFormatFactory: MetaFormatFactory,
formatMeta: Meta,
) { ) {
val metaFormat = metaFormatFactory.build(this.io.context, formatMeta) val metaFormat = metaFormatFactory.build(this.io.context, meta)
//printing header //printing header
output.writeRawString(TAGLESS_ENVELOPE_HEADER + "\r\n") output.writeRawString(TAGLESS_ENVELOPE_HEADER + "\r\n")
//printing all properties
output.writeProperty(META_TYPE_PROPERTY,
metaFormatFactory.shortName)
//TODO add optional metaFormat properties
val actualSize: Int = envelope.data?.size ?: 0
output.writeProperty(DATA_LENGTH_PROPERTY, actualSize)
//Printing meta //Printing meta
if (!envelope.meta.isEmpty()) { if (!obj.meta.isEmpty()) {
val metaBinary = Binary(envelope.meta, metaFormat) val metaBinary = Binary(obj.meta, metaFormat)
output.writeProperty(META_LENGTH_PROPERTY, output.writeUtf8String(META_START + "-${metaFormatFactory.shortName}\r\n")
metaBinary.size + 2)
output.writeUtf8String(this.metaStart + "\r\n")
output.writeBinary(metaBinary) output.writeBinary(metaBinary)
output.writeRawString("\r\n") output.writeRawString("\r\n")
} }
//Printing data //Printing data
envelope.data?.let { data -> obj.data?.let { data ->
output.writeUtf8String(this.dataStart + "\r\n") //val actualSize: Int = envelope.data?.size ?: 0
output.writeUtf8String(DATA_START + "\r\n")
output.writeBinary(data) output.writeBinary(data)
} }
} }
@ -68,121 +58,46 @@ public class TaglessEnvelopeFormat(
input.discardWithSeparator( input.discardWithSeparator(
TAGLESS_ENVELOPE_HEADER.encodeToByteArray(), TAGLESS_ENVELOPE_HEADER.encodeToByteArray(),
atMost = 1024, atMost = 1024,
skipUntilEndOfLine = true
) )
val properties = HashMap<String, String>()
var line = ""
while (line.isBlank() || line.startsWith("#?")) {
if (line.startsWith("#?")) {
val match = propertyPattern.find(line)
?: error("Line $line does not match property declaration pattern")
val (key, value) = match.destructured
properties[key] = value
}
try {
line = ByteArray {
try {
input.readBytesWithSeparatorTo(this, byteArrayOf('\n'.code.toByte()), 1024)
} catch (ex: BufferLimitExceededException) {
throw IllegalStateException("Property line exceeds maximum line length (1024)", ex)
}
}.decodeToString().trim()
} catch (ex: EOFException) {
return SimpleEnvelope(Meta.EMPTY, Binary.EMPTY)
}
}
var meta: Meta = Meta.EMPTY var meta: Meta = Meta.EMPTY
if (line.startsWith(metaStart)) { var data: Binary? = null
val metaFormat = properties[META_TYPE_PROPERTY]?.let { io.resolveMetaFormat(it) } ?: JsonMetaFormat
val metaSize = properties[META_LENGTH_PROPERTY]?.toInt()
meta = if (metaSize != null) {
metaFormat.readObjectFrom(input.readBinary(metaSize))
} else {
error("Can't partially read an envelope with undefined meta size")
}
}
//skip until data start
input.discardWithSeparator( input.discardWithSeparator(
dataStart.encodeToByteArray(), SEPARATOR_PREFIX,
atMost = 1024, atMost = 1024,
skipUntilEndOfLine = true
) )
val data: Binary = if (properties.containsKey(DATA_LENGTH_PROPERTY)) { var header: String = ByteArray {
input.readBinary(properties[DATA_LENGTH_PROPERTY]!!.toInt()) input.readUTF8UntilDelimiterTo(this, "\n")
// val bytes = ByteArray(properties[DATA_LENGTH_PROPERTY]!!.toInt()) }.decodeToString()
// readByteArray(bytes)
// bytes.asBinary() while (!input.endOfInput) {
} else { val block = ByteArray {
input.readBytes().asBinary() input.readWithSeparatorTo(this, SEPARATOR_PREFIX)
} }
return SimpleEnvelope(meta, data) val nextHeader = ByteArray {
input.readWithSeparatorTo(this, "\n".encodeToByteArray())
}.decodeToString()
//terminate on end
if (header.startsWith("END")) break
if (header.startsWith("META")) {
//TODO check format
val metaFormat: MetaFormatFactory = JsonMetaFormat
meta = metaFormat.readMeta(ByteReadPacket(block))
} }
if (header.startsWith("DATA")) {
override fun readPartial(input: Input): PartialEnvelope { data = block.asBinary()
var offset = 0
//read preamble
offset += input.discardWithSeparator(
TAGLESS_ENVELOPE_HEADER.encodeToByteArray(),
atMost = 1024,
skipUntilEndOfLine = true
)
val properties = HashMap<String, String>()
var line = ""
while (line.isBlank() || line.startsWith("#?")) {
if (line.startsWith("#?")) {
val match = propertyPattern.find(line)
?: error("Line $line does not match property declaration pattern")
val (key, value) = match.destructured
properties[key] = value
} }
try { header = nextHeader
line = ByteArray {
val read = try {
input.readBytesWithSeparatorTo(this, byteArrayOf('\n'.code.toByte()), 1024)
} catch (ex: BufferLimitExceededException) {
throw IllegalStateException("Property line exceeds maximum line length (1024)", ex)
} }
offset += read return Envelope(meta, data)
}.decodeToString().trim()
} catch (ex: EOFException) {
return PartialEnvelope(Meta.EMPTY, offset, 0.toULong())
}
}
var meta: Meta = Meta.EMPTY
if (line.startsWith(metaStart)) {
val metaFormat = properties[META_TYPE_PROPERTY]?.let { io.resolveMetaFormat(it) } ?: JsonMetaFormat
val metaSize = properties[META_LENGTH_PROPERTY]?.toInt()
meta = if (metaSize != null) {
offset += metaSize
metaFormat.readObjectFrom(input.readBinary(metaSize))
} else {
error("Can't partially read an envelope with undefined meta size")
}
}
//skip until data start
offset += input.discardWithSeparator(
dataStart.encodeToByteArray(),
atMost = 1024,
skipUntilEndOfLine = true
)
val dataSize = properties[DATA_LENGTH_PROPERTY]?.toULong()
return PartialEnvelope(meta, offset, dataSize)
} }
public companion object : EnvelopeFormatFactory { public companion object : EnvelopeFormatFactory {
@ -196,11 +111,17 @@ public class TaglessEnvelopeFormat(
public const val TAGLESS_ENVELOPE_TYPE: String = "tagless" public const val TAGLESS_ENVELOPE_TYPE: String = "tagless"
public const val TAGLESS_ENVELOPE_HEADER: String = "#~DFTL~#" public val SEPARATOR_PREFIX: ByteArray = "\n#~".encodeToByteArray()
public const val META_START_PROPERTY: String = "metaSeparator"
public const val DEFAULT_META_START: String = "#~META~#" public const val TAGLESS_ENVELOPE_HEADER: String = "#~DFTL"
public const val DATA_START_PROPERTY: String = "dataSeparator"
public const val DEFAULT_DATA_START: String = "#~DATA~#" // public const val META_START_PROPERTY: String = "metaSeparator"
public const val META_START: String = "#~META"
// public const val DATA_START_PROPERTY: String = "dataSeparator"
public const val DATA_START: String = "#~DATA"
public const val END: String = "#~END"
public const val code: Int = 0x4446544c //DFTL public const val code: Int = 0x4446544c //DFTL
@ -210,20 +131,15 @@ public class TaglessEnvelopeFormat(
private val default by lazy { build(Global, Meta.EMPTY) } private val default by lazy { build(Global, Meta.EMPTY) }
override fun readPartial(input: Input): PartialEnvelope = override fun readObject(binary: Binary): Envelope = default.run { readObject(binary) }
default.run { readPartial(input) }
override fun writeEnvelope( override fun writeObject(
output: Output, output: Output,
envelope: Envelope, obj: Envelope,
metaFormatFactory: MetaFormatFactory,
formatMeta: Meta,
): Unit = default.run { ): Unit = default.run {
writeEnvelope( writeObject(
output, output,
envelope, obj,
metaFormatFactory,
formatMeta
) )
} }

View File

@ -7,7 +7,6 @@ import io.ktor.utils.io.core.*
import io.ktor.utils.io.core.internal.ChunkBuffer import io.ktor.utils.io.core.internal.ChunkBuffer
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.misc.DFExperimental import space.kscience.dataforge.misc.DFExperimental
import kotlin.math.min
public fun Output.writeRawString(str: String) { public fun Output.writeRawString(str: String) {
writeFully(str.toByteArray(Charsets.ISO_8859_1)) writeFully(str.toByteArray(Charsets.ISO_8859_1))
@ -31,26 +30,7 @@ public inline fun ByteArray(block: Output.() -> Unit): ByteArray =
public inline fun Binary(block: Output.() -> Unit): Binary = public inline fun Binary(block: Output.() -> Unit): Binary =
ByteArray(block).asBinary() ByteArray(block).asBinary()
/** public operator fun Binary.get(range: IntRange): Binary = view(range.first, range.last - range.first)
* View section of a [Binary] as an independent binary
*/
public class BinaryView(private val source: Binary, private val start: Int, override val size: Int) : Binary {
init {
require(start > 0)
require(start + size <= source.size) { "View boundary is outside source binary size" }
}
override fun <R> read(offset: Int, atMost: Int, block: Input.() -> R): R =
source.read(start + offset, min(size, atMost), block)
override suspend fun <R> readSuspend(offset: Int, atMost: Int, block: suspend Input.() -> R): R =
source.readSuspend(start + offset, min(size, atMost), block)
}
public fun Binary.view(start: Int, size: Int): BinaryView = BinaryView(this, start, size)
public operator fun Binary.get(range: IntRange): BinaryView = view(range.first, range.last - range.first)
/** /**
* Return inferred [EnvelopeFormat] if only one format could read given file. If no format accepts the binary, return null. If * Return inferred [EnvelopeFormat] if only one format could read given file. If no format accepts the binary, return null. If
@ -68,22 +48,6 @@ public fun IOPlugin.peekBinaryEnvelopeFormat(binary: Binary): EnvelopeFormat? {
} }
} }
/**
* Zero-copy read this binary as an envelope using given [this@toEnvelope]
*/
@DFExperimental
public fun EnvelopeFormat.readBinary(binary: Binary): Envelope {
val partialEnvelope: PartialEnvelope = binary.read {
run {
readPartial(this@read)
}
}
val offset: Int = partialEnvelope.dataOffset.toInt()
val size: Int = partialEnvelope.dataSize?.toInt() ?: (binary.size - offset)
val envelopeBinary = BinaryView(binary, offset, size)
return SimpleEnvelope(partialEnvelope.meta, envelopeBinary)
}
/** /**
* A zero-copy read from * A zero-copy read from
*/ */
@ -92,9 +56,9 @@ public fun IOPlugin.readEnvelope(
binary: Binary, binary: Binary,
readNonEnvelopes: Boolean = false, readNonEnvelopes: Boolean = false,
formatPicker: IOPlugin.(Binary) -> EnvelopeFormat? = IOPlugin::peekBinaryEnvelopeFormat, formatPicker: IOPlugin.(Binary) -> EnvelopeFormat? = IOPlugin::peekBinaryEnvelopeFormat,
): Envelope = formatPicker(binary)?.readBinary(binary) ?: if (readNonEnvelopes) { ): Envelope = formatPicker(binary)?.readObject(binary) ?: if (readNonEnvelopes) {
// if no format accepts file, read it as binary // if no format accepts file, read it as binary
SimpleEnvelope(Meta.EMPTY, binary) Envelope(Meta.EMPTY, binary)
} else error("Can't infer format for $binary") } else error("Can't infer format for $binary")
@DFExperimental @DFExperimental
@ -126,62 +90,70 @@ private class RingByteArray(
private fun Int.forward(n: Int): Int = (this + n) % (buffer.size) private fun Int.forward(n: Int): Int = (this + n) % (buffer.size)
fun compare(inputArray: ByteArray): Boolean = when { fun contentEquals(inputArray: ByteArray): Boolean = when {
inputArray.size != buffer.size -> false inputArray.size != buffer.size -> false
size < buffer.size -> false size < buffer.size -> false
else -> inputArray.indices.all { inputArray[it] == get(it) } else -> inputArray.indices.all { inputArray[it] == get(it) }
} }
} }
private fun RingByteArray.toArray(): ByteArray = ByteArray(size) { get(it) }
/** /**
* Read [Input] into [output] until designated multy-byte [separator] and optionally continues until * Read [Input] into [output] until designated multibyte [separator] and optionally continues until
* the end of the line after it. Throw error if [separator] not found and [atMost] bytes are read. * the end of the line after it. Throw error if [separator] not found and [atMost] bytes are read.
* Also fails if [separator] not found until the end of input. * Also fails if [separator] not found until the end of input.
* *
* Separator itself is not read into Output. * Separator itself is not read into Output.
* *
* @param errorOnEof if true error is thrown if separator is never encountered
*
* @return bytes actually being read, including separator * @return bytes actually being read, including separator
*/ */
public fun Input.readBytesWithSeparatorTo( public fun Input.readWithSeparatorTo(
output: Output, output: Output,
separator: ByteArray, separator: ByteArray,
atMost: Int = Int.MAX_VALUE, atMost: Int = Int.MAX_VALUE,
skipUntilEndOfLine: Boolean = false, errorOnEof: Boolean = false,
): Int { ): Int {
var counter = 0 var counter = 0
val rb = RingByteArray(ByteArray(separator.size)) val rb = RingByteArray(ByteArray(separator.size))
var separatorFound = false
takeWhile { buffer -> takeWhile { buffer ->
while (buffer.canRead()) { while (buffer.canRead()) {
val byte = buffer.readByte() val byte = buffer.readByte()
counter++ counter++
if (counter >= atMost) error("Maximum number of bytes to be read $atMost reached.") if (counter >= atMost) error("Maximum number of bytes to be read $atMost reached.")
//If end-of-line-search is on, terminate
if (separatorFound) {
if (endOfInput || byte == '\n'.code.toByte()) {
return counter
}
} else {
rb.push(byte) rb.push(byte)
if (rb.compare(separator)) { if (rb.contentEquals(separator)) {
separatorFound = true
if (!skipUntilEndOfLine) {
return counter return counter
}
} else if (rb.isFull()) { } else if (rb.isFull()) {
output.writeByte(rb[0]) output.writeByte(rb[0])
} }
} }
}
!endOfInput !endOfInput
} }
if (errorOnEof) {
error("Read to the end of input without encountering ${separator.decodeToString()}") error("Read to the end of input without encountering ${separator.decodeToString()}")
} else {
for(i in 1 until rb.size){
output.writeByte(rb[i])
}
counter += (rb.size - 1)
return counter
}
}
public fun Input.discardLine(): Int {
return discardUntilDelimiter('\n'.code.toByte()).also {
discard(1)
}.toInt() + 1
} }
public fun Input.discardWithSeparator( public fun Input.discardWithSeparator(
separator: ByteArray, separator: ByteArray,
atMost: Int = Int.MAX_VALUE, atMost: Int = Int.MAX_VALUE,
skipUntilEndOfLine: Boolean = false, errorOnEof: Boolean = false,
): Int { ): Int {
val dummy: Output = object : Output(ChunkBuffer.Pool) { val dummy: Output = object : Output(ChunkBuffer.Pool) {
override fun closeDestination() { override fun closeDestination() {
@ -193,5 +165,5 @@ public fun Input.discardWithSeparator(
} }
} }
return readBytesWithSeparatorTo(dummy, separator, atMost, skipUntilEndOfLine) return readWithSeparatorTo(dummy, separator, atMost, errorOnEof)
} }

View File

@ -1,8 +1,6 @@
package space.kscience.dataforge.io package space.kscience.dataforge.io
import io.ktor.utils.io.core.ByteReadPacket import io.ktor.utils.io.core.readBytes
import io.ktor.utils.io.core.readDouble
import io.ktor.utils.io.core.writeDouble
import kotlin.test.Test import kotlin.test.Test
import kotlin.test.assertEquals import kotlin.test.assertEquals
@ -14,40 +12,52 @@ class EnvelopeFormatTest {
"d" put 22.2 "d" put 22.2
} }
data { data {
writeDouble(22.2) writeUtf8String("12345678")
// repeat(2000){
// writeInt(it)
// }
} }
} }
@Test @Test
fun testTaggedFormat() { fun testTaggedFormat() = with(TaggedEnvelopeFormat) {
TaggedEnvelopeFormat.run {
val byteArray = writeToByteArray(envelope) val byteArray = writeToByteArray(envelope)
//println(byteArray.decodeToString())
val res = readFromByteArray(byteArray) val res = readFromByteArray(byteArray)
assertEquals(envelope.meta, res.meta) assertEquals(envelope.meta, res.meta)
val double = res.data?.read { val bytes = res.data?.read {
readDouble() readBytes()
}
assertEquals(22.2, double)
} }
assertEquals("12345678", bytes?.decodeToString())
} }
@Test @Test
fun testTaglessFormat() { fun testTaglessFormat() = with(TaglessEnvelopeFormat) {
TaglessEnvelopeFormat.run {
val byteArray = writeToByteArray(envelope) val byteArray = writeToByteArray(envelope)
//println(byteArray.decodeToString()) println(byteArray.decodeToString())
val partial = readPartial(ByteReadPacket(byteArray))
assertEquals(8, partial.dataSize?.toInt())
val res = readFromByteArray(byteArray) val res = readFromByteArray(byteArray)
assertEquals(envelope.meta, res.meta) assertEquals(envelope.meta, res.meta)
val double = res.data?.read { val bytes = res.data?.read {
readDouble() readBytes()
} }
assertEquals(22.2, double) assertEquals("12345678", bytes?.decodeToString())
} }
@Test
fun testManualDftl(){
val envelopeString = """
#~DFTL
#~META
{
"@envelope": {
"type": "test.format"
},
"d": 22.2
}
#~DATA
12345678
""".trimIndent()
val res = TaglessEnvelopeFormat.readFromByteArray(envelopeString.encodeToByteArray())
assertEquals(envelope.meta, res.meta)
val bytes = res.data?.read {
readBytes()
}
assertEquals("12345678", bytes?.decodeToString())
} }
} }

View File

@ -31,7 +31,7 @@ class IOTest {
binary.read { binary.read {
val array = ByteArray { val array = ByteArray {
val read = readBytesWithSeparatorTo(this, "---".encodeToByteArray(), skipUntilEndOfLine = true) val read = readWithSeparatorTo(this, "---".encodeToByteArray()) + discardLine()
assertEquals(12, read) assertEquals(12, read)
} }
assertEquals(""" assertEquals("""
@ -43,13 +43,13 @@ class IOTest {
assertFails { assertFails {
binary.read { binary.read {
discardWithSeparator("---".encodeToByteArray(), atMost = 3) discardWithSeparator("---".encodeToByteArray(), atMost = 3 )
} }
} }
assertFails { assertFails {
binary.read{ binary.read{
discardWithSeparator("-+-".encodeToByteArray()) discardWithSeparator("-+-".encodeToByteArray(), errorOnEof = true)
} }
} }

View File

@ -27,12 +27,12 @@ class MultipartTest {
} }
val partsEnvelope = Envelope { val partsEnvelope = Envelope {
envelopes(envelopes, TaglessEnvelopeFormat) envelopes(envelopes)
} }
@Test @Test
fun testParts() { fun testParts() {
val format = TaglessEnvelopeFormat val format = TaggedEnvelopeFormat
val singleEnvelopeData = Binary(envelopes[0], format) val singleEnvelopeData = Binary(envelopes[0], format)
val singleEnvelopeSize = singleEnvelopeData.size val singleEnvelopeSize = singleEnvelopeData.size
val bytes = Binary(partsEnvelope, format) val bytes = Binary(partsEnvelope, format)
@ -40,7 +40,7 @@ class MultipartTest {
val reconstructed = bytes.readWith(format) val reconstructed = bytes.readWith(format)
println(reconstructed.meta) println(reconstructed.meta)
val parts = reconstructed.parts() val parts = reconstructed.parts()
val envelope = parts[2].envelope(io) val envelope = parts[2].envelope()
assertEquals(2, envelope.meta["value"].int) assertEquals(2, envelope.meta["value"].int)
println(reconstructed.data!!.size) println(reconstructed.data!!.size)
} }

View File

@ -36,6 +36,8 @@ internal class PathBinary(
} }
return ByteReadPacket(array).block() return ByteReadPacket(array).block()
} }
override fun view(offset: Int, binarySize: Int) = PathBinary(path, fileOffset + offset, binarySize)
} }
public fun Path.asBinary(): Binary = PathBinary(this) public fun Path.asBinary(): Binary = PathBinary(this)
@ -73,15 +75,7 @@ public fun Path.rewrite(block: Output.() -> Unit): Unit {
} }
@DFExperimental @DFExperimental
public fun EnvelopeFormat.readFile(path: Path): Envelope { public fun EnvelopeFormat.readFile(path: Path): Envelope = readObject(path.asBinary())
val partialEnvelope: PartialEnvelope = path.asBinary().read {
readPartial(this@read)
}
val offset: Int = partialEnvelope.dataOffset.toInt()
val size: Int = partialEnvelope.dataSize?.toInt() ?: (Files.size(path).toInt() - offset)
val binary = PathBinary(path, offset, size)
return SimpleEnvelope(partialEnvelope.meta, binary)
}
/** /**
* Resolve IOFormat based on type * Resolve IOFormat based on type
@ -239,10 +233,9 @@ public fun IOPlugin.writeEnvelopeFile(
path: Path, path: Path,
envelope: Envelope, envelope: Envelope,
envelopeFormat: EnvelopeFormat = TaggedEnvelopeFormat, envelopeFormat: EnvelopeFormat = TaggedEnvelopeFormat,
metaFormat: MetaFormatFactory? = null,
) { ) {
path.rewrite { path.rewrite {
envelopeFormat.writeEnvelope(this, envelope, metaFormat ?: envelopeFormat.defaultMetaFormat) envelopeFormat.writeObject(this, envelope)
} }
} }

View File

@ -17,5 +17,5 @@ public fun <T : Any> Envelope.toData(format: IOReader<T>): Data<T> = Data(format
public suspend fun <T : Any> Data<T>.toEnvelope(format: IOWriter<T>): Envelope { public suspend fun <T : Any> Data<T>.toEnvelope(format: IOWriter<T>): Envelope {
val obj = await() val obj = await()
val binary = Binary(obj, format) val binary = Binary(obj, format)
return SimpleEnvelope(meta, binary) return Envelope(meta, binary)
} }

View File

@ -30,7 +30,6 @@ import kotlin.io.path.nameWithoutExtension
import kotlin.io.path.readAttributes import kotlin.io.path.readAttributes
import kotlin.reflect.KType import kotlin.reflect.KType
import kotlin.reflect.typeOf import kotlin.reflect.typeOf
import kotlin.streams.toList
//public typealias FileFormatResolver<T> = (Path, Meta) -> IOFormat<T> //public typealias FileFormatResolver<T> = (Path, Meta) -> IOFormat<T>
@ -193,7 +192,6 @@ public suspend fun <T : Any> IOPlugin.writeDataDirectory(
tree: DataTree<T>, tree: DataTree<T>,
format: IOWriter<T>, format: IOWriter<T>,
envelopeFormat: EnvelopeFormat? = null, envelopeFormat: EnvelopeFormat? = null,
metaFormat: MetaFormatFactory? = null,
) { ) {
withContext(Dispatchers.IO) { withContext(Dispatchers.IO) {
if (!Files.exists(path)) { if (!Files.exists(path)) {
@ -210,15 +208,15 @@ public suspend fun <T : Any> IOPlugin.writeDataDirectory(
is DataTreeItem.Leaf -> { is DataTreeItem.Leaf -> {
val envelope = item.data.toEnvelope(format) val envelope = item.data.toEnvelope(format)
if (envelopeFormat != null) { if (envelopeFormat != null) {
writeEnvelopeFile(childPath, envelope, envelopeFormat, metaFormat) writeEnvelopeFile(childPath, envelope, envelopeFormat)
} else { } else {
writeEnvelopeDirectory(childPath, envelope, metaFormat ?: JsonMetaFormat) writeEnvelopeDirectory(childPath, envelope)
} }
} }
} }
} }
val treeMeta = tree.meta val treeMeta = tree.meta
writeMetaFile(path, treeMeta, metaFormat ?: JsonMetaFormat) writeMetaFile(path, treeMeta)
} }
} }

View File

@ -29,7 +29,7 @@ private suspend fun <T : Any> ZipOutputStream.writeNode(
val entry = ZipEntry(name) val entry = ZipEntry(name)
putNextEntry(entry) putNextEntry(entry)
asOutput().run { asOutput().run {
envelopeFormat.writeEnvelope(this, envelope) envelopeFormat.writeObject(this, envelope)
flush() flush()
} }
} }

View File

@ -23,13 +23,13 @@ internal class CachingWorkspaceTest {
useCache() useCache()
val doFirst by task<Any> { val doFirst by task<Any> {
pipeFrom(data()) { _, name, meta -> pipeFrom(data()) { _, name, _ ->
println("Done first on $name with flag=${taskMeta["flag"].boolean ?: false}") println("Done first on $name with flag=${taskMeta["flag"].boolean ?: false}")
} }
} }
val doSecond by task<Any>{ val doSecond by task<Any>{
pipeFrom(doFirst) { _, name, meta -> pipeFrom(doFirst) { _, name, _ ->
println("Done second on $name with flag=${taskMeta["flag"].boolean ?: false}") println("Done second on $name with flag=${taskMeta["flag"].boolean ?: false}")
} }
} }

View File

@ -4,5 +4,6 @@ org.gradle.jvmargs=-Xmx4096m
kotlin.code.style=official kotlin.code.style=official
kotlin.mpp.stability.nowarn=true kotlin.mpp.stability.nowarn=true
kotlin.incremental.js.ir=true kotlin.incremental.js.ir=true
kotlin.native.ignoreDisabledTargets=true
toolsVersion=0.13.3-kotlin-1.7.20 toolsVersion=0.13.3-kotlin-1.7.20