[WIP] Refactoring to kotlinx-io
This commit is contained in:
parent
2aba1b48dc
commit
a699c36f8e
@ -23,5 +23,5 @@ kscience {
|
|||||||
}
|
}
|
||||||
|
|
||||||
readme{
|
readme{
|
||||||
maturity = space.kscience.gradle.Maturity.PROTOTYPE
|
maturity = space.kscience.gradle.Maturity.EXPERIMENTAL
|
||||||
}
|
}
|
@ -1,7 +1,11 @@
|
|||||||
package space.kscience.dataforge.io.yaml
|
package space.kscience.dataforge.io.yaml
|
||||||
|
|
||||||
import io.ktor.utils.io.core.Input
|
import kotlinx.io.Sink
|
||||||
import io.ktor.utils.io.core.Output
|
import kotlinx.io.Source
|
||||||
|
import kotlinx.io.bytestring.ByteString
|
||||||
|
import kotlinx.io.bytestring.encodeToByteString
|
||||||
|
import kotlinx.io.readByteString
|
||||||
|
import kotlinx.io.writeString
|
||||||
import space.kscience.dataforge.context.Context
|
import space.kscience.dataforge.context.Context
|
||||||
import space.kscience.dataforge.context.Global
|
import space.kscience.dataforge.context.Global
|
||||||
import space.kscience.dataforge.io.*
|
import space.kscience.dataforge.io.*
|
||||||
@ -19,18 +23,18 @@ public class FrontMatterEnvelopeFormat(
|
|||||||
var offset = 0
|
var offset = 0
|
||||||
|
|
||||||
offset += discardWithSeparator(
|
offset += discardWithSeparator(
|
||||||
SEPARATOR.encodeToByteArray(),
|
SEPARATOR,
|
||||||
atMost = 1024,
|
atMost = 1024,
|
||||||
)
|
)
|
||||||
|
|
||||||
val line = ByteArray {
|
val line = ByteArray {
|
||||||
offset += readWithSeparatorTo(this, "\n".encodeToByteArray())
|
offset += readWithSeparatorTo(this, "\n".encodeToByteString())
|
||||||
}.decodeToString()
|
}.decodeToString()
|
||||||
|
|
||||||
val readMetaFormat = line.trim().takeIf { it.isNotBlank() }?.let { io.resolveMetaFormat(it) } ?: YamlMetaFormat
|
val readMetaFormat = line.trim().takeIf { it.isNotBlank() }?.let { io.resolveMetaFormat(it) } ?: YamlMetaFormat
|
||||||
|
|
||||||
val packet = ByteArray {
|
val packet = ByteArray {
|
||||||
offset += readWithSeparatorTo(this, SEPARATOR.encodeToByteArray())
|
offset += readWithSeparatorTo(this, SEPARATOR)
|
||||||
}
|
}
|
||||||
|
|
||||||
offset += discardLine()
|
offset += discardLine()
|
||||||
@ -39,25 +43,25 @@ public class FrontMatterEnvelopeFormat(
|
|||||||
Envelope(meta, binary.view(offset))
|
Envelope(meta, binary.view(offset))
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun readObject(input: Input): Envelope = readObject(input.readBinary())
|
override fun readObject(source: Source): Envelope = readObject(source.readBinary())
|
||||||
|
|
||||||
override fun writeObject(
|
override fun writeObject(
|
||||||
output: Output,
|
sink: Sink,
|
||||||
obj: Envelope,
|
obj: Envelope,
|
||||||
) {
|
) {
|
||||||
val metaFormat = metaFormatFactory.build(io.context, meta)
|
val metaFormat = metaFormatFactory.build(io.context, meta)
|
||||||
val formatSuffix = if (metaFormat is YamlMetaFormat) "" else metaFormatFactory.shortName
|
val formatSuffix = if (metaFormat is YamlMetaFormat) "" else metaFormatFactory.shortName
|
||||||
output.writeRawString("$SEPARATOR${formatSuffix}\r\n")
|
sink.writeString("$SEPARATOR${formatSuffix}\r\n")
|
||||||
metaFormat.run { metaFormat.writeObject(output, obj.meta) }
|
metaFormat.run { metaFormat.writeObject(sink, obj.meta) }
|
||||||
output.writeRawString("$SEPARATOR\r\n")
|
sink.writeString("$SEPARATOR\r\n")
|
||||||
//Printing data
|
//Printing data
|
||||||
obj.data?.let { data ->
|
obj.data?.let { data ->
|
||||||
output.writeBinary(data)
|
sink.writeBinary(data)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public companion object : EnvelopeFormatFactory {
|
public companion object : EnvelopeFormatFactory {
|
||||||
public const val SEPARATOR: String = "---"
|
public val SEPARATOR: ByteString = "---".encodeToByteString()
|
||||||
|
|
||||||
private val metaTypeRegex = "---(\\w*)\\s*".toRegex()
|
private val metaTypeRegex = "---(\\w*)\\s*".toRegex()
|
||||||
|
|
||||||
@ -69,8 +73,8 @@ public class FrontMatterEnvelopeFormat(
|
|||||||
|
|
||||||
override fun peekFormat(io: IOPlugin, binary: Binary): EnvelopeFormat? = binary.read {
|
override fun peekFormat(io: IOPlugin, binary: Binary): EnvelopeFormat? = binary.read {
|
||||||
//read raw string to avoid UTF issues
|
//read raw string to avoid UTF issues
|
||||||
val line = readRawString(3)
|
val line = readByteString(3)
|
||||||
return@read if (line == "---") {
|
return@read if (line == "---".encodeToByteString()) {
|
||||||
default
|
default
|
||||||
} else {
|
} else {
|
||||||
null
|
null
|
||||||
@ -82,12 +86,12 @@ public class FrontMatterEnvelopeFormat(
|
|||||||
override fun readObject(binary: Binary): Envelope = default.readObject(binary)
|
override fun readObject(binary: Binary): Envelope = default.readObject(binary)
|
||||||
|
|
||||||
override fun writeObject(
|
override fun writeObject(
|
||||||
output: Output,
|
sink: Sink,
|
||||||
obj: Envelope,
|
obj: Envelope,
|
||||||
): Unit = default.writeObject(output, obj)
|
): Unit = default.writeObject(sink, obj)
|
||||||
|
|
||||||
|
|
||||||
override fun readObject(input: Input): Envelope = default.readObject(input)
|
override fun readObject(source: Source): Envelope = default.readObject(source)
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -1,13 +1,13 @@
|
|||||||
package space.kscience.dataforge.io.yaml
|
package space.kscience.dataforge.io.yaml
|
||||||
|
|
||||||
import io.ktor.utils.io.core.Input
|
import kotlinx.io.Sink
|
||||||
import io.ktor.utils.io.core.Output
|
import kotlinx.io.Source
|
||||||
|
import kotlinx.io.readString
|
||||||
|
import kotlinx.io.writeString
|
||||||
import net.mamoe.yamlkt.*
|
import net.mamoe.yamlkt.*
|
||||||
import space.kscience.dataforge.context.Context
|
import space.kscience.dataforge.context.Context
|
||||||
import space.kscience.dataforge.io.MetaFormat
|
import space.kscience.dataforge.io.MetaFormat
|
||||||
import space.kscience.dataforge.io.MetaFormatFactory
|
import space.kscience.dataforge.io.MetaFormatFactory
|
||||||
import space.kscience.dataforge.io.readUtf8String
|
|
||||||
import space.kscience.dataforge.io.writeUtf8String
|
|
||||||
import space.kscience.dataforge.meta.*
|
import space.kscience.dataforge.meta.*
|
||||||
import space.kscience.dataforge.meta.descriptors.MetaDescriptor
|
import space.kscience.dataforge.meta.descriptors.MetaDescriptor
|
||||||
import space.kscience.dataforge.meta.descriptors.get
|
import space.kscience.dataforge.meta.descriptors.get
|
||||||
@ -89,14 +89,14 @@ public fun YamlMap.toMeta(): Meta = YamlMeta(this)
|
|||||||
*/
|
*/
|
||||||
public class YamlMetaFormat(private val meta: Meta) : MetaFormat {
|
public class YamlMetaFormat(private val meta: Meta) : MetaFormat {
|
||||||
|
|
||||||
override fun writeMeta(output: Output, meta: Meta, descriptor: MetaDescriptor?) {
|
override fun writeMeta(sink: Sink, meta: Meta, descriptor: MetaDescriptor?) {
|
||||||
val yaml: YamlMap = meta.toYaml()
|
val yaml: YamlMap = meta.toYaml()
|
||||||
val string = Yaml.encodeToString(YamlMap.serializer(), yaml)
|
val string = Yaml.encodeToString(YamlMap.serializer(), yaml)
|
||||||
output.writeUtf8String(string)
|
sink.writeString(string)
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun readMeta(input: Input, descriptor: MetaDescriptor?): Meta {
|
override fun readMeta(source: Source, descriptor: MetaDescriptor?): Meta {
|
||||||
val yaml = Yaml.decodeYamlMapFromString(input.readUtf8String())
|
val yaml = Yaml.decodeYamlMapFromString(source.readString())
|
||||||
return yaml.toMeta()
|
return yaml.toMeta()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -109,10 +109,10 @@ public class YamlMetaFormat(private val meta: Meta) : MetaFormat {
|
|||||||
|
|
||||||
private val default = YamlMetaFormat(Meta.EMPTY)
|
private val default = YamlMetaFormat(Meta.EMPTY)
|
||||||
|
|
||||||
override fun writeMeta(output: Output, meta: Meta, descriptor: MetaDescriptor?): Unit =
|
override fun writeMeta(sink: Sink, meta: Meta, descriptor: MetaDescriptor?): Unit =
|
||||||
default.writeMeta(output, meta, descriptor)
|
default.writeMeta(sink, meta, descriptor)
|
||||||
|
|
||||||
override fun readMeta(input: Input, descriptor: MetaDescriptor?): Meta =
|
override fun readMeta(source: Source, descriptor: MetaDescriptor?): Meta =
|
||||||
default.readMeta(input, descriptor)
|
default.readMeta(source, descriptor)
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -1,6 +1,9 @@
|
|||||||
package space.kscience.dataforge.io
|
package space.kscience.dataforge.io
|
||||||
|
|
||||||
import kotlinx.io.*
|
import kotlinx.io.Sink
|
||||||
|
import kotlinx.io.Source
|
||||||
|
import kotlinx.io.buffered
|
||||||
|
import kotlinx.io.readByteArray
|
||||||
import kotlin.math.min
|
import kotlin.math.min
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -32,22 +35,6 @@ public interface Binary {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public class ByteArraySource(
|
|
||||||
public val byteArray: ByteArray,
|
|
||||||
public val offset: Int,
|
|
||||||
public val size: Int,
|
|
||||||
) : RawSource {
|
|
||||||
override fun close() {
|
|
||||||
// Do nothing
|
|
||||||
}
|
|
||||||
|
|
||||||
override fun readAtMostTo(sink: Buffer, byteCount: Long): Long {
|
|
||||||
val byteRead = min(byteCount, size.toLong())
|
|
||||||
sink.write(byteArray, offset, offset + byteRead.toInt())
|
|
||||||
return byteRead
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
internal class ByteArrayBinary(
|
internal class ByteArrayBinary(
|
||||||
internal val array: ByteArray,
|
internal val array: ByteArray,
|
||||||
internal val start: Int = 0,
|
internal val start: Int = 0,
|
||||||
@ -58,17 +45,11 @@ internal class ByteArrayBinary(
|
|||||||
require(offset >= 0) { "Offset must be positive" }
|
require(offset >= 0) { "Offset must be positive" }
|
||||||
require(offset < array.size) { "Offset $offset is larger than array size" }
|
require(offset < array.size) { "Offset $offset is larger than array size" }
|
||||||
|
|
||||||
val input = ByteArraySource(
|
return ByteArraySource(
|
||||||
array,
|
array,
|
||||||
offset + start,
|
offset + start,
|
||||||
min(atMost, size - offset)
|
min(atMost, size - offset)
|
||||||
).buffered()
|
).buffered().use(block)
|
||||||
|
|
||||||
return try {
|
|
||||||
block(input)
|
|
||||||
} finally {
|
|
||||||
input.close()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
override suspend fun <R> readSuspend(offset: Int, atMost: Int, block: suspend Source.() -> R): R {
|
override suspend fun <R> readSuspend(offset: Int, atMost: Int, block: suspend Source.() -> R): R {
|
||||||
|
@ -23,7 +23,7 @@ private class PartDescriptor : Scheme() {
|
|||||||
val PARTS_KEY = MULTIPART_KEY + "parts"
|
val PARTS_KEY = MULTIPART_KEY + "parts"
|
||||||
val SEPARATOR_KEY = MULTIPART_KEY + "separator"
|
val SEPARATOR_KEY = MULTIPART_KEY + "separator"
|
||||||
|
|
||||||
val DEFAULT_MULTIPART_DATA_SEPARATOR = "\r\n#~PART~#\r\n".toACIIByteString()
|
val DEFAULT_MULTIPART_DATA_SEPARATOR = "\r\n#~PART~#\r\n".toAsciiByteString()
|
||||||
|
|
||||||
const val MULTIPART_DATA_TYPE = "envelope.multipart"
|
const val MULTIPART_DATA_TYPE = "envelope.multipart"
|
||||||
}
|
}
|
||||||
|
@ -23,7 +23,7 @@ public interface IOReader<out T> {
|
|||||||
*/
|
*/
|
||||||
public val type: KType
|
public val type: KType
|
||||||
|
|
||||||
public fun readObject(input: Source): T
|
public fun readObject(source: Source): T
|
||||||
|
|
||||||
public fun readObject(binary: Binary): T = binary.read { readObject(this) }
|
public fun readObject(binary: Binary): T = binary.read { readObject(this) }
|
||||||
|
|
||||||
@ -34,7 +34,7 @@ public interface IOReader<out T> {
|
|||||||
public val binary: IOReader<Binary> = object : IOReader<Binary> {
|
public val binary: IOReader<Binary> = object : IOReader<Binary> {
|
||||||
override val type: KType = typeOf<Binary>()
|
override val type: KType = typeOf<Binary>()
|
||||||
|
|
||||||
override fun readObject(input: Source): Binary = input.readByteArray().asBinary()
|
override fun readObject(source: Source): Binary = source.readByteArray().asBinary()
|
||||||
|
|
||||||
override fun readObject(binary: Binary): Binary = binary
|
override fun readObject(binary: Binary): Binary = binary
|
||||||
}
|
}
|
||||||
@ -44,11 +44,11 @@ public interface IOReader<out T> {
|
|||||||
public inline fun <reified T> IOReader(crossinline read: Source.() -> T): IOReader<T> = object : IOReader<T> {
|
public inline fun <reified T> IOReader(crossinline read: Source.() -> T): IOReader<T> = object : IOReader<T> {
|
||||||
override val type: KType = typeOf<T>()
|
override val type: KType = typeOf<T>()
|
||||||
|
|
||||||
override fun readObject(input: Source): T = input.read()
|
override fun readObject(source: Source): T = source.read()
|
||||||
}
|
}
|
||||||
|
|
||||||
public fun interface IOWriter<in T> {
|
public fun interface IOWriter<in T> {
|
||||||
public fun writeObject(output: Sink, obj: T)
|
public fun writeObject(sink: Sink, obj: T)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -96,9 +96,9 @@ public object DoubleIOFormat : IOFormat<Double>, IOFormatFactory<Double> {
|
|||||||
|
|
||||||
override val type: KType get() = typeOf<Double>()
|
override val type: KType get() = typeOf<Double>()
|
||||||
|
|
||||||
override fun writeObject(output: Sink, obj: Double) {
|
override fun writeObject(sink: Sink, obj: Double) {
|
||||||
output.writeLong(obj.toBits())
|
sink.writeLong(obj.toBits())
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun readObject(input: Source): Double = Double.fromBits(input.readLong())
|
override fun readObject(source: Source): Double = Double.fromBits(source.readLong())
|
||||||
}
|
}
|
@ -1,10 +1,10 @@
|
|||||||
@file:Suppress("UNUSED_PARAMETER")
|
|
||||||
|
|
||||||
package space.kscience.dataforge.io
|
package space.kscience.dataforge.io
|
||||||
|
|
||||||
|
|
||||||
import io.ktor.utils.io.core.Input
|
import kotlinx.io.Sink
|
||||||
import io.ktor.utils.io.core.Output
|
import kotlinx.io.Source
|
||||||
|
import kotlinx.io.readString
|
||||||
|
import kotlinx.io.writeString
|
||||||
import kotlinx.serialization.json.Json
|
import kotlinx.serialization.json.Json
|
||||||
import kotlinx.serialization.json.JsonObject
|
import kotlinx.serialization.json.JsonObject
|
||||||
import space.kscience.dataforge.context.Context
|
import space.kscience.dataforge.context.Context
|
||||||
@ -18,13 +18,13 @@ import space.kscience.dataforge.meta.toMeta
|
|||||||
*/
|
*/
|
||||||
public class JsonMetaFormat(private val json: Json = DEFAULT_JSON) : MetaFormat {
|
public class JsonMetaFormat(private val json: Json = DEFAULT_JSON) : MetaFormat {
|
||||||
|
|
||||||
override fun writeMeta(output: Output, meta: Meta, descriptor: MetaDescriptor?) {
|
override fun writeMeta(sink: Sink, meta: Meta, descriptor: MetaDescriptor?) {
|
||||||
val jsonObject = meta.toJson(descriptor)
|
val jsonObject = meta.toJson(descriptor)
|
||||||
output.writeUtf8String(json.encodeToString(JsonObject.serializer(), jsonObject))
|
sink.writeString(json.encodeToString(JsonObject.serializer(), jsonObject))
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun readMeta(input: Input, descriptor: MetaDescriptor?): Meta {
|
override fun readMeta(source: Source, descriptor: MetaDescriptor?): Meta {
|
||||||
val str = input.readUtf8String()//readByteArray().decodeToString()
|
val str = source.readString()
|
||||||
val jsonElement = json.parseToJsonElement(str)
|
val jsonElement = json.parseToJsonElement(str)
|
||||||
return jsonElement.toMeta(descriptor)
|
return jsonElement.toMeta(descriptor)
|
||||||
}
|
}
|
||||||
@ -39,10 +39,10 @@ public class JsonMetaFormat(private val json: Json = DEFAULT_JSON) : MetaFormat
|
|||||||
|
|
||||||
private val default = JsonMetaFormat()
|
private val default = JsonMetaFormat()
|
||||||
|
|
||||||
override fun writeMeta(output: Output, meta: Meta, descriptor: MetaDescriptor?): Unit =
|
override fun writeMeta(sink: Sink, meta: Meta, descriptor: MetaDescriptor?): Unit =
|
||||||
default.run { writeMeta(output, meta, descriptor) }
|
default.run { writeMeta(sink, meta, descriptor) }
|
||||||
|
|
||||||
override fun readMeta(input: Input, descriptor: MetaDescriptor?): Meta =
|
override fun readMeta(source: Source, descriptor: MetaDescriptor?): Meta =
|
||||||
default.run { readMeta(input, descriptor) }
|
default.run { readMeta(source, descriptor) }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,9 +1,9 @@
|
|||||||
package space.kscience.dataforge.io
|
package space.kscience.dataforge.io
|
||||||
|
|
||||||
import io.ktor.utils.io.core.ByteReadPacket
|
|
||||||
import io.ktor.utils.io.core.Input
|
import kotlinx.io.Sink
|
||||||
import io.ktor.utils.io.core.Output
|
import kotlinx.io.Source
|
||||||
import io.ktor.utils.io.core.use
|
import kotlinx.io.buffered
|
||||||
import space.kscience.dataforge.context.Context
|
import space.kscience.dataforge.context.Context
|
||||||
import space.kscience.dataforge.context.Global
|
import space.kscience.dataforge.context.Global
|
||||||
import space.kscience.dataforge.io.MetaFormatFactory.Companion.META_FORMAT_TYPE
|
import space.kscience.dataforge.io.MetaFormatFactory.Companion.META_FORMAT_TYPE
|
||||||
@ -23,19 +23,19 @@ public interface MetaFormat : IOFormat<Meta> {
|
|||||||
|
|
||||||
override val type: KType get() = typeOf<Meta>()
|
override val type: KType get() = typeOf<Meta>()
|
||||||
|
|
||||||
override fun writeObject(output: Output, obj: Meta) {
|
override fun writeObject(sink: Sink, obj: Meta) {
|
||||||
writeMeta(output, obj, null)
|
writeMeta(sink, obj, null)
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun readObject(input: Input): Meta = readMeta(input)
|
override fun readObject(source: Source): Meta = readMeta(source)
|
||||||
|
|
||||||
public fun writeMeta(
|
public fun writeMeta(
|
||||||
output: Output,
|
sink: Sink,
|
||||||
meta: Meta,
|
meta: Meta,
|
||||||
descriptor: MetaDescriptor? = null,
|
descriptor: MetaDescriptor? = null,
|
||||||
)
|
)
|
||||||
|
|
||||||
public fun readMeta(input: Input, descriptor: MetaDescriptor? = null): Meta
|
public fun readMeta(source: Source, descriptor: MetaDescriptor? = null): Meta
|
||||||
}
|
}
|
||||||
|
|
||||||
@Type(META_FORMAT_TYPE)
|
@Type(META_FORMAT_TYPE)
|
||||||
@ -63,9 +63,7 @@ public fun Meta.toString(format: MetaFormat): String = ByteArray {
|
|||||||
|
|
||||||
public fun Meta.toString(formatFactory: MetaFormatFactory): String = toString(formatFactory.build(Global, Meta.EMPTY))
|
public fun Meta.toString(formatFactory: MetaFormatFactory): String = toString(formatFactory.build(Global, Meta.EMPTY))
|
||||||
|
|
||||||
public fun MetaFormat.parse(str: String): Meta {
|
public fun MetaFormat.parse(str: String): Meta = readObject(StringSource(str).buffered())
|
||||||
return ByteReadPacket(str.encodeToByteArray()).use { readObject(it) }
|
|
||||||
}
|
|
||||||
|
|
||||||
public fun MetaFormatFactory.parse(str: String, formatMeta: Meta): Meta = build(Global, formatMeta).parse(str)
|
public fun MetaFormatFactory.parse(str: String, formatMeta: Meta): Meta = build(Global, formatMeta).parse(str)
|
||||||
|
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package space.kscience.dataforge.io
|
package space.kscience.dataforge.io
|
||||||
|
|
||||||
import io.ktor.utils.io.core.*
|
import kotlinx.io.*
|
||||||
|
import kotlinx.io.bytestring.decodeToString
|
||||||
import space.kscience.dataforge.context.Context
|
import space.kscience.dataforge.context.Context
|
||||||
import space.kscience.dataforge.context.Global
|
import space.kscience.dataforge.context.Global
|
||||||
import space.kscience.dataforge.meta.Meta
|
import space.kscience.dataforge.meta.Meta
|
||||||
@ -18,7 +19,7 @@ import space.kscience.dataforge.names.plus
|
|||||||
public class TaggedEnvelopeFormat(
|
public class TaggedEnvelopeFormat(
|
||||||
public val io: IOPlugin,
|
public val io: IOPlugin,
|
||||||
public val version: VERSION = VERSION.DF02,
|
public val version: VERSION = VERSION.DF02,
|
||||||
public val metaFormatFactory: MetaFormatFactory = JsonMetaFormat
|
public val metaFormatFactory: MetaFormatFactory = JsonMetaFormat,
|
||||||
) : EnvelopeFormat {
|
) : EnvelopeFormat {
|
||||||
|
|
||||||
// private val metaFormat = io.metaFormat(metaFormatKey)
|
// private val metaFormat = io.metaFormat(metaFormatKey)
|
||||||
@ -26,59 +27,60 @@ public class TaggedEnvelopeFormat(
|
|||||||
|
|
||||||
|
|
||||||
private fun Tag.toBinary() = Binary {
|
private fun Tag.toBinary() = Binary {
|
||||||
writeRawString(START_SEQUENCE)
|
write(START_SEQUENCE)
|
||||||
writeRawString(version.name)
|
writeString(version.name)
|
||||||
writeShort(metaFormatKey)
|
writeShort(metaFormatKey)
|
||||||
writeUInt(metaSize)
|
writeUInt(metaSize)
|
||||||
when (version) {
|
when (version) {
|
||||||
VERSION.DF02 -> {
|
VERSION.DF02 -> {
|
||||||
writeUInt(dataSize.toUInt())
|
writeUInt(dataSize.toUInt())
|
||||||
}
|
}
|
||||||
|
|
||||||
VERSION.DF03 -> {
|
VERSION.DF03 -> {
|
||||||
writeULong(dataSize)
|
writeULong(dataSize)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
writeRawString(END_SEQUENCE)
|
write(END_SEQUENCE)
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun writeObject(
|
override fun writeObject(
|
||||||
output: Output,
|
sink: Sink,
|
||||||
obj: Envelope,
|
obj: Envelope,
|
||||||
) {
|
) {
|
||||||
val metaFormat = metaFormatFactory.build(io.context, Meta.EMPTY)
|
val metaFormat = metaFormatFactory.build(io.context, Meta.EMPTY)
|
||||||
val metaBytes = Binary(obj.meta,metaFormat)
|
val metaBytes = Binary(obj.meta, metaFormat)
|
||||||
val actualSize: ULong = (obj.data?.size ?: 0).toULong()
|
val actualSize: ULong = (obj.data?.size ?: 0).toULong()
|
||||||
val tag = Tag(metaFormatFactory.key, metaBytes.size.toUInt() + 2u, actualSize)
|
val tag = Tag(metaFormatFactory.key, metaBytes.size.toUInt() + 2u, actualSize)
|
||||||
output.writeBinary(tag.toBinary())
|
sink.writeBinary(tag.toBinary())
|
||||||
output.writeBinary(metaBytes)
|
sink.writeBinary(metaBytes)
|
||||||
output.writeRawString("\r\n")
|
sink.writeString("\r\n")
|
||||||
obj.data?.let {
|
obj.data?.let {
|
||||||
output.writeBinary(it)
|
sink.writeBinary(it)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Read an envelope from input into memory
|
* Read an envelope from input into memory
|
||||||
*
|
*
|
||||||
* @param input an input to read from
|
* @param source an input to read from
|
||||||
* @param formats a collection of meta formats to resolve
|
* @param formats a collection of meta formats to resolve
|
||||||
*/
|
*/
|
||||||
override fun readObject(input: Input): Envelope {
|
override fun readObject(source: Source): Envelope {
|
||||||
val tag = input.readTag(this.version)
|
val tag = source.readTag(this.version)
|
||||||
|
|
||||||
val metaFormat = io.resolveMetaFormat(tag.metaFormatKey)
|
val metaFormat = io.resolveMetaFormat(tag.metaFormatKey)
|
||||||
?: error("Meta format with key ${tag.metaFormatKey} not found")
|
?: error("Meta format with key ${tag.metaFormatKey} not found")
|
||||||
|
|
||||||
val metaBinary = input.readBinary(tag.metaSize.toInt())
|
val metaBinary = source.readBinary(tag.metaSize.toInt())
|
||||||
|
|
||||||
val meta: Meta = metaFormat.readObjectFrom(metaBinary)
|
val meta: Meta = metaFormat.readObjectFrom(metaBinary)
|
||||||
|
|
||||||
val data = input.readBinary(tag.dataSize.toInt())
|
val data = source.readBinary(tag.dataSize.toInt())
|
||||||
|
|
||||||
return SimpleEnvelope(meta, data)
|
return SimpleEnvelope(meta, data)
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun readObject(binary: Binary): Envelope = binary.read{
|
override fun readObject(binary: Binary): Envelope = binary.read {
|
||||||
val tag = readTag(version)
|
val tag = readTag(version)
|
||||||
|
|
||||||
val metaFormat = io.resolveMetaFormat(tag.metaFormatKey)
|
val metaFormat = io.resolveMetaFormat(tag.metaFormatKey)
|
||||||
@ -104,8 +106,8 @@ public class TaggedEnvelopeFormat(
|
|||||||
}
|
}
|
||||||
|
|
||||||
public companion object : EnvelopeFormatFactory {
|
public companion object : EnvelopeFormatFactory {
|
||||||
private const val START_SEQUENCE = "#~"
|
private val START_SEQUENCE = "#~".toAsciiByteString()
|
||||||
private const val END_SEQUENCE = "~#\r\n"
|
private val END_SEQUENCE = "~#\r\n".toAsciiByteString()
|
||||||
|
|
||||||
override val name: Name = EnvelopeFormatFactory.ENVELOPE_FACTORY_NAME + "tagged"
|
override val name: Name = EnvelopeFormatFactory.ENVELOPE_FACTORY_NAME + "tagged"
|
||||||
|
|
||||||
@ -121,27 +123,26 @@ public class TaggedEnvelopeFormat(
|
|||||||
return TaggedEnvelopeFormat(io, version)
|
return TaggedEnvelopeFormat(io, version)
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun Input.readTag(version: VERSION): Tag {
|
private fun Source.readTag(version: VERSION): Tag {
|
||||||
val start = readRawString(2)
|
val start = readByteString(2)
|
||||||
if (start != START_SEQUENCE) error("The input is not an envelope")
|
if (start != START_SEQUENCE) error("The input is not an envelope")
|
||||||
val versionString = readRawString(4)
|
val versionString = readByteString(4)
|
||||||
if (version.name != versionString) error("Wrong version of DataForge: expected $version but found $versionString")
|
if (version.name.toAsciiByteString() != versionString) error("Wrong version of DataForge: expected $version but found $versionString")
|
||||||
val metaFormatKey = readShort()
|
val metaFormatKey = readShort()
|
||||||
val metaLength = readUInt()
|
val metaLength = readUInt()
|
||||||
val dataLength: ULong = when (version) {
|
val dataLength: ULong = when (version) {
|
||||||
VERSION.DF02 -> readUInt().toULong()
|
VERSION.DF02 -> readUInt().toULong()
|
||||||
VERSION.DF03 -> readULong()
|
VERSION.DF03 -> readULong()
|
||||||
}
|
}
|
||||||
val end = readRawString(4)
|
val end = readByteString(4)
|
||||||
if (end != END_SEQUENCE) error("The input is not an envelope")
|
if (end != END_SEQUENCE) error("The input is not an envelope")
|
||||||
return Tag(metaFormatKey, metaLength, dataLength)
|
return Tag(metaFormatKey, metaLength, dataLength)
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun peekFormat(io: IOPlugin, binary: Binary): EnvelopeFormat? {
|
override fun peekFormat(io: IOPlugin, binary: Binary): EnvelopeFormat? = try {
|
||||||
return try {
|
|
||||||
binary.read {
|
binary.read {
|
||||||
val header = readRawString(6)
|
val header = readByteString(6)
|
||||||
return@read when (header.substring(2..5)) {
|
when (header.substring(2, 6).decodeToString()) {
|
||||||
VERSION.DF02.name -> TaggedEnvelopeFormat(io, VERSION.DF02)
|
VERSION.DF02.name -> TaggedEnvelopeFormat(io, VERSION.DF02)
|
||||||
VERSION.DF03.name -> TaggedEnvelopeFormat(io, VERSION.DF03)
|
VERSION.DF03.name -> TaggedEnvelopeFormat(io, VERSION.DF03)
|
||||||
else -> null
|
else -> null
|
||||||
@ -150,7 +151,6 @@ public class TaggedEnvelopeFormat(
|
|||||||
} catch (ex: Exception) {
|
} catch (ex: Exception) {
|
||||||
null
|
null
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
private val default by lazy { build(Global, Meta.EMPTY) }
|
private val default by lazy { build(Global, Meta.EMPTY) }
|
||||||
|
|
||||||
@ -158,16 +158,13 @@ public class TaggedEnvelopeFormat(
|
|||||||
default.run { readObject(binary) }
|
default.run { readObject(binary) }
|
||||||
|
|
||||||
override fun writeObject(
|
override fun writeObject(
|
||||||
output: Output,
|
sink: Sink,
|
||||||
obj: Envelope,
|
obj: Envelope,
|
||||||
): Unit = default.run {
|
): Unit = default.run {
|
||||||
writeObject(
|
writeObject(sink, obj)
|
||||||
output,
|
|
||||||
obj,
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun readObject(input: Input): Envelope = default.readObject(input)
|
override fun readObject(source: Source): Envelope = default.readObject(source)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
@ -1,9 +1,9 @@
|
|||||||
package space.kscience.dataforge.io
|
package space.kscience.dataforge.io
|
||||||
|
|
||||||
import io.ktor.utils.io.core.ByteReadPacket
|
|
||||||
import io.ktor.utils.io.core.Input
|
import kotlinx.io.*
|
||||||
import io.ktor.utils.io.core.Output
|
import kotlinx.io.bytestring.ByteString
|
||||||
import io.ktor.utils.io.core.readUTF8UntilDelimiterTo
|
import kotlinx.io.bytestring.encodeToByteString
|
||||||
import space.kscience.dataforge.context.Context
|
import space.kscience.dataforge.context.Context
|
||||||
import space.kscience.dataforge.context.Global
|
import space.kscience.dataforge.context.Global
|
||||||
import space.kscience.dataforge.meta.Meta
|
import space.kscience.dataforge.meta.Meta
|
||||||
@ -29,34 +29,35 @@ public class TaglessEnvelopeFormat(
|
|||||||
// }
|
// }
|
||||||
|
|
||||||
override fun writeObject(
|
override fun writeObject(
|
||||||
output: Output,
|
sink: Sink,
|
||||||
obj: Envelope,
|
obj: Envelope,
|
||||||
) {
|
) {
|
||||||
val metaFormat = metaFormatFactory.build(this.io.context, meta)
|
val metaFormat = metaFormatFactory.build(this.io.context, meta)
|
||||||
|
|
||||||
//printing header
|
//printing header
|
||||||
output.writeRawString(TAGLESS_ENVELOPE_HEADER + "\r\n")
|
sink.write(TAGLESS_ENVELOPE_HEADER)
|
||||||
|
sink.writeString("\r\n")
|
||||||
|
|
||||||
//Printing meta
|
//Printing meta
|
||||||
if (!obj.meta.isEmpty()) {
|
if (!obj.meta.isEmpty()) {
|
||||||
val metaBinary = Binary(obj.meta, metaFormat)
|
val metaBinary = Binary(obj.meta, metaFormat)
|
||||||
output.writeUtf8String(META_START + "-${metaFormatFactory.shortName}\r\n")
|
sink.writeString(META_START + "-${metaFormatFactory.shortName}\r\n")
|
||||||
output.writeBinary(metaBinary)
|
sink.writeBinary(metaBinary)
|
||||||
output.writeRawString("\r\n")
|
sink.writeString("\r\n")
|
||||||
}
|
}
|
||||||
|
|
||||||
//Printing data
|
//Printing data
|
||||||
obj.data?.let { data ->
|
obj.data?.let { data ->
|
||||||
//val actualSize: Int = envelope.data?.size ?: 0
|
//val actualSize: Int = envelope.data?.size ?: 0
|
||||||
output.writeUtf8String(DATA_START + "\r\n")
|
sink.writeString(DATA_START + "\r\n")
|
||||||
output.writeBinary(data)
|
sink.writeBinary(data)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun readObject(input: Input): Envelope {
|
override fun readObject(source: Source): Envelope {
|
||||||
//read preamble
|
//read preamble
|
||||||
input.discardWithSeparator(
|
source.discardWithSeparator(
|
||||||
TAGLESS_ENVELOPE_HEADER.encodeToByteArray(),
|
TAGLESS_ENVELOPE_HEADER,
|
||||||
atMost = 1024,
|
atMost = 1024,
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -64,22 +65,22 @@ public class TaglessEnvelopeFormat(
|
|||||||
|
|
||||||
var data: Binary? = null
|
var data: Binary? = null
|
||||||
|
|
||||||
input.discardWithSeparator(
|
source.discardWithSeparator(
|
||||||
SEPARATOR_PREFIX,
|
SEPARATOR_PREFIX,
|
||||||
atMost = 1024,
|
atMost = 1024,
|
||||||
)
|
)
|
||||||
|
|
||||||
var header: String = ByteArray {
|
var header: String = ByteArray {
|
||||||
input.readUTF8UntilDelimiterTo(this, "\n")
|
source.readWithSeparatorTo(this, "\n".encodeToByteString())
|
||||||
}.decodeToString()
|
}.decodeToString()
|
||||||
|
|
||||||
while (!input.endOfInput) {
|
while (!source.exhausted()) {
|
||||||
val block = ByteArray {
|
val block = ByteArray {
|
||||||
input.readWithSeparatorTo(this, SEPARATOR_PREFIX)
|
source.readWithSeparatorTo(this, SEPARATOR_PREFIX)
|
||||||
}
|
}
|
||||||
|
|
||||||
val nextHeader = ByteArray {
|
val nextHeader = ByteArray {
|
||||||
input.readWithSeparatorTo(this, "\n".encodeToByteArray())
|
source.readWithSeparatorTo(this, "\n".encodeToByteString())
|
||||||
}.decodeToString()
|
}.decodeToString()
|
||||||
|
|
||||||
//terminate on end
|
//terminate on end
|
||||||
@ -89,7 +90,7 @@ public class TaglessEnvelopeFormat(
|
|||||||
if (header.startsWith("META")) {
|
if (header.startsWith("META")) {
|
||||||
//TODO check format
|
//TODO check format
|
||||||
val metaFormat: MetaFormatFactory = JsonMetaFormat
|
val metaFormat: MetaFormatFactory = JsonMetaFormat
|
||||||
meta = metaFormat.readMeta(ByteReadPacket(block))
|
meta = metaFormat.readMeta(ByteArraySource(block).buffered())
|
||||||
}
|
}
|
||||||
|
|
||||||
if (header.startsWith("DATA")) {
|
if (header.startsWith("DATA")) {
|
||||||
@ -111,9 +112,9 @@ public class TaglessEnvelopeFormat(
|
|||||||
|
|
||||||
public const val TAGLESS_ENVELOPE_TYPE: String = "tagless"
|
public const val TAGLESS_ENVELOPE_TYPE: String = "tagless"
|
||||||
|
|
||||||
public val SEPARATOR_PREFIX: ByteArray = "\n#~".encodeToByteArray()
|
public val SEPARATOR_PREFIX: ByteString = "\n#~".encodeToByteString()
|
||||||
|
|
||||||
public const val TAGLESS_ENVELOPE_HEADER: String = "#~DFTL"
|
public val TAGLESS_ENVELOPE_HEADER: ByteString = "#~DFTL".encodeToByteString()
|
||||||
|
|
||||||
// public const val META_START_PROPERTY: String = "metaSeparator"
|
// public const val META_START_PROPERTY: String = "metaSeparator"
|
||||||
public const val META_START: String = "#~META"
|
public const val META_START: String = "#~META"
|
||||||
@ -134,21 +135,18 @@ public class TaglessEnvelopeFormat(
|
|||||||
override fun readObject(binary: Binary): Envelope = default.run { readObject(binary) }
|
override fun readObject(binary: Binary): Envelope = default.run { readObject(binary) }
|
||||||
|
|
||||||
override fun writeObject(
|
override fun writeObject(
|
||||||
output: Output,
|
sink: Sink,
|
||||||
obj: Envelope,
|
obj: Envelope,
|
||||||
): Unit = default.run {
|
): Unit = default.run {
|
||||||
writeObject(
|
writeObject(sink, obj)
|
||||||
output,
|
|
||||||
obj,
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun readObject(input: Input): Envelope = default.readObject(input)
|
override fun readObject(source: Source): Envelope = default.readObject(source)
|
||||||
|
|
||||||
override fun peekFormat(io: IOPlugin, binary: Binary): EnvelopeFormat? {
|
override fun peekFormat(io: IOPlugin, binary: Binary): EnvelopeFormat? {
|
||||||
return try {
|
return try {
|
||||||
binary.read {
|
binary.read {
|
||||||
val string = readRawString(TAGLESS_ENVELOPE_HEADER.length)
|
val string = readByteString(TAGLESS_ENVELOPE_HEADER.size)
|
||||||
return@read if (string == TAGLESS_ENVELOPE_HEADER) {
|
return@read if (string == TAGLESS_ENVELOPE_HEADER) {
|
||||||
TaglessEnvelopeFormat(io)
|
TaglessEnvelopeFormat(io)
|
||||||
} else {
|
} else {
|
||||||
|
@ -2,14 +2,17 @@ package space.kscience.dataforge.io
|
|||||||
|
|
||||||
import kotlinx.io.*
|
import kotlinx.io.*
|
||||||
import kotlinx.io.bytestring.ByteString
|
import kotlinx.io.bytestring.ByteString
|
||||||
|
import kotlinx.io.bytestring.decodeToString
|
||||||
|
import kotlinx.io.bytestring.encodeToByteString
|
||||||
import space.kscience.dataforge.meta.Meta
|
import space.kscience.dataforge.meta.Meta
|
||||||
import space.kscience.dataforge.misc.DFExperimental
|
import space.kscience.dataforge.misc.DFExperimental
|
||||||
|
import kotlin.math.min
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Convert a string literal, containing only ASCII characters to a [ByteString].
|
* Convert a string literal, containing only ASCII characters to a [ByteString].
|
||||||
* Throws an error if there are non-ASCII characters.
|
* Throws an error if there are non-ASCII characters.
|
||||||
*/
|
*/
|
||||||
public fun String.toACIIByteString(): ByteString {
|
public fun String.toAsciiByteString(): ByteString {
|
||||||
val bytes = ByteArray(length) {
|
val bytes = ByteArray(length) {
|
||||||
val char = get(it)
|
val char = get(it)
|
||||||
val code = char.code
|
val code = char.code
|
||||||
@ -94,24 +97,30 @@ private class RingByteArray(
|
|||||||
else -> inputArray.indices.all { inputArray[it] == get(it) }
|
else -> inputArray.indices.all { inputArray[it] == get(it) }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fun contentEquals(byteString: ByteString): Boolean = when {
|
||||||
|
byteString.size != buffer.size -> false
|
||||||
|
size < buffer.size -> false
|
||||||
|
else -> (0 until byteString.size).all { byteString[it] == get(it) }
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun RingByteArray.toArray(): ByteArray = ByteArray(size) { get(it) }
|
private fun RingByteArray.toArray(): ByteArray = ByteArray(size) { get(it) }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Read [Input] into [output] until designated multibyte [separator] and optionally continues until
|
* Read [Source] into [output] until designated multibyte [separator] and optionally continues until
|
||||||
* the end of the line after it. Throw error if [separator] not found and [atMost] bytes are read.
|
* the end of the line after it. Throw error if [separator] not found and [atMost] bytes are read.
|
||||||
* Also fails if [separator] not found until the end of input.
|
* Also fails if [separator] not found until the end of input.
|
||||||
*
|
*
|
||||||
* Separator itself is not read into Output.
|
* The Separator itself is not read into [Sink].
|
||||||
*
|
*
|
||||||
* @param errorOnEof if true error is thrown if separator is never encountered
|
* @param errorOnEof if true error is thrown if separator is never encountered
|
||||||
*
|
*
|
||||||
* @return bytes actually being read, including separator
|
* @return bytes actually being read, including separator
|
||||||
*/
|
*/
|
||||||
public fun Source.readWithSeparatorTo(
|
public fun Source.readWithSeparatorTo(
|
||||||
output: Sink,
|
output: Sink?,
|
||||||
separator: ByteArray,
|
separator: ByteString,
|
||||||
atMost: Int = Int.MAX_VALUE,
|
atMost: Int = Int.MAX_VALUE,
|
||||||
errorOnEof: Boolean = false,
|
errorOnEof: Boolean = false,
|
||||||
): Int {
|
): Int {
|
||||||
@ -126,7 +135,7 @@ public fun Source.readWithSeparatorTo(
|
|||||||
if (rb.contentEquals(separator)) {
|
if (rb.contentEquals(separator)) {
|
||||||
return counter
|
return counter
|
||||||
} else if (rb.isFull()) {
|
} else if (rb.isFull()) {
|
||||||
output.writeByte(rb[0])
|
output?.writeByte(rb[0])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -134,33 +143,93 @@ public fun Source.readWithSeparatorTo(
|
|||||||
error("Read to the end of input without encountering ${separator.decodeToString()}")
|
error("Read to the end of input without encountering ${separator.decodeToString()}")
|
||||||
} else {
|
} else {
|
||||||
for (i in 1 until rb.size) {
|
for (i in 1 until rb.size) {
|
||||||
output.writeByte(rb[i])
|
output?.writeByte(rb[i])
|
||||||
}
|
}
|
||||||
counter += (rb.size - 1)
|
counter += (rb.size - 1)
|
||||||
return counter
|
return counter
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public fun Source.discardLine(): Int {
|
/**
|
||||||
return discardUntilDelimiter('\n'.code.toByte()).also {
|
* Discard all bytes until [separator] is encountered. Separator is discarded sa well.
|
||||||
discard(1)
|
* Return the total number of bytes read.
|
||||||
}.toInt() + 1
|
*/
|
||||||
}
|
|
||||||
|
|
||||||
public fun Source.discardWithSeparator(
|
public fun Source.discardWithSeparator(
|
||||||
separator: ByteArray,
|
separator: ByteString,
|
||||||
atMost: Int = Int.MAX_VALUE,
|
atMost: Int = Int.MAX_VALUE,
|
||||||
errorOnEof: Boolean = false,
|
errorOnEof: Boolean = false,
|
||||||
): Int {
|
): Int = readWithSeparatorTo(null, separator, atMost, errorOnEof)
|
||||||
val dummy: Sink = object : Sink(ChunkBuffer.Pool) {
|
|
||||||
override fun closeDestination() {
|
/**
|
||||||
|
* Discard all symbol until newline is discovered. Carriage return is not discarded.
|
||||||
|
*/
|
||||||
|
public fun Source.discardLine(
|
||||||
|
atMost: Int = Int.MAX_VALUE,
|
||||||
|
errorOnEof: Boolean = false,
|
||||||
|
): Int = discardWithSeparator("\n".encodeToByteString(), atMost, errorOnEof)
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A [Source] based on [ByteArray]
|
||||||
|
*/
|
||||||
|
public class ByteArraySource(
|
||||||
|
private val byteArray: ByteArray,
|
||||||
|
private val offset: Int = 0,
|
||||||
|
private val size: Int = byteArray.size - offset,
|
||||||
|
) : RawSource {
|
||||||
|
|
||||||
|
init {
|
||||||
|
require(offset >= 0) { "Offset must be positive" }
|
||||||
|
require(offset + size <= byteArray.size) { "End index is ${offset + size}, but the array size is ${byteArray.size}" }
|
||||||
|
}
|
||||||
|
|
||||||
|
private var pointer = offset
|
||||||
|
|
||||||
|
override fun close() {
|
||||||
// Do nothing
|
// Do nothing
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun flush(source: Memory, offset: Int, length: Int) {
|
override fun readAtMostTo(sink: Buffer, byteCount: Long): Long {
|
||||||
// Do nothing
|
if (pointer == offset + size) return -1
|
||||||
|
val byteRead = min(byteCount.toInt(), (size + offset - pointer))
|
||||||
|
sink.write(byteArray, pointer, pointer + byteRead)
|
||||||
|
pointer += byteRead
|
||||||
|
return byteRead.toLong()
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
return readWithSeparatorTo(dummy, separator, atMost, errorOnEof)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A [Source] based on [String]
|
||||||
|
*/
|
||||||
|
public class StringSource(
|
||||||
|
public val string: String,
|
||||||
|
public val offset: Int = 0,
|
||||||
|
public val size: Int = string.length - offset,
|
||||||
|
) : RawSource {
|
||||||
|
|
||||||
|
private var pointer = offset
|
||||||
|
|
||||||
|
override fun close() {
|
||||||
|
// Do nothing
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun readAtMostTo(sink: Buffer, byteCount: Long): Long {
|
||||||
|
if (pointer == offset + size) return -1
|
||||||
|
val byteRead = min(byteCount.toInt(), (size + offset - pointer))
|
||||||
|
sink.writeString(string, pointer, pointer + byteRead)
|
||||||
|
pointer += byteRead
|
||||||
|
return byteRead.toLong()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public fun Sink.writeDouble(value: Double) {
|
||||||
|
writeLong(value.toBits())
|
||||||
|
}
|
||||||
|
|
||||||
|
public fun Source.readDouble(): Double = Double.fromBits(readLong())
|
||||||
|
|
||||||
|
public fun Sink.writeFloat(value: Float) {
|
||||||
|
writeInt(value.toBits())
|
||||||
|
}
|
||||||
|
|
||||||
|
public fun Source.readFloat(): Float = Float.fromBits(readInt())
|
@ -1,6 +1,5 @@
|
|||||||
package space.kscience.dataforge.io
|
package space.kscience.dataforge.io
|
||||||
|
|
||||||
import io.ktor.utils.io.core.readInt
|
|
||||||
import kotlin.test.Test
|
import kotlin.test.Test
|
||||||
import kotlin.test.assertEquals
|
import kotlin.test.assertEquals
|
||||||
|
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package space.kscience.dataforge.io
|
package space.kscience.dataforge.io
|
||||||
|
|
||||||
import io.ktor.utils.io.core.readBytes
|
import kotlinx.io.readByteArray
|
||||||
|
import kotlinx.io.writeString
|
||||||
import kotlin.test.Test
|
import kotlin.test.Test
|
||||||
import kotlin.test.assertEquals
|
import kotlin.test.assertEquals
|
||||||
|
|
||||||
@ -12,7 +13,7 @@ class EnvelopeFormatTest {
|
|||||||
"d" put 22.2
|
"d" put 22.2
|
||||||
}
|
}
|
||||||
data {
|
data {
|
||||||
writeUtf8String("12345678")
|
writeString("12345678")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -22,7 +23,7 @@ class EnvelopeFormatTest {
|
|||||||
val res = readFromByteArray(byteArray)
|
val res = readFromByteArray(byteArray)
|
||||||
assertEquals(envelope.meta, res.meta)
|
assertEquals(envelope.meta, res.meta)
|
||||||
val bytes = res.data?.read {
|
val bytes = res.data?.read {
|
||||||
readBytes()
|
readByteArray()
|
||||||
}
|
}
|
||||||
assertEquals("12345678", bytes?.decodeToString())
|
assertEquals("12345678", bytes?.decodeToString())
|
||||||
}
|
}
|
||||||
@ -34,13 +35,13 @@ class EnvelopeFormatTest {
|
|||||||
val res = readFromByteArray(byteArray)
|
val res = readFromByteArray(byteArray)
|
||||||
assertEquals(envelope.meta, res.meta)
|
assertEquals(envelope.meta, res.meta)
|
||||||
val bytes = res.data?.read {
|
val bytes = res.data?.read {
|
||||||
readBytes()
|
readByteArray()
|
||||||
}
|
}
|
||||||
assertEquals("12345678", bytes?.decodeToString())
|
assertEquals("12345678", bytes?.decodeToString())
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun testManualDftl(){
|
fun testManualDftl() {
|
||||||
val envelopeString = """
|
val envelopeString = """
|
||||||
#~DFTL
|
#~DFTL
|
||||||
#~META
|
#~META
|
||||||
@ -56,7 +57,7 @@ class EnvelopeFormatTest {
|
|||||||
val res = TaglessEnvelopeFormat.readFromByteArray(envelopeString.encodeToByteArray())
|
val res = TaglessEnvelopeFormat.readFromByteArray(envelopeString.encodeToByteArray())
|
||||||
assertEquals(envelope.meta, res.meta)
|
assertEquals(envelope.meta, res.meta)
|
||||||
val bytes = res.data?.read {
|
val bytes = res.data?.read {
|
||||||
readBytes()
|
readByteArray()
|
||||||
}
|
}
|
||||||
assertEquals("12345678", bytes?.decodeToString())
|
assertEquals("12345678", bytes?.decodeToString())
|
||||||
}
|
}
|
||||||
|
@ -1,8 +1,9 @@
|
|||||||
package space.kscience.dataforge.io
|
package space.kscience.dataforge.io
|
||||||
|
|
||||||
import io.ktor.utils.io.core.ByteReadPacket
|
import kotlinx.io.buffered
|
||||||
import io.ktor.utils.io.core.readBytes
|
import kotlinx.io.bytestring.encodeToByteString
|
||||||
import io.ktor.utils.io.core.readUTF8Line
|
import kotlinx.io.readByteArray
|
||||||
|
import kotlinx.io.readLine
|
||||||
import kotlin.test.Test
|
import kotlin.test.Test
|
||||||
import kotlin.test.assertEquals
|
import kotlin.test.assertEquals
|
||||||
import kotlin.test.assertFails
|
import kotlin.test.assertFails
|
||||||
@ -11,9 +12,9 @@ class IOTest {
|
|||||||
@Test
|
@Test
|
||||||
fun readBytes() {
|
fun readBytes() {
|
||||||
val bytes = ByteArray(8) { it.toByte() }
|
val bytes = ByteArray(8) { it.toByte() }
|
||||||
val input = ByteReadPacket(bytes)
|
val input = ByteArraySource(bytes).buffered()
|
||||||
@Suppress("UNUSED_VARIABLE") val first = input.readBytes(4)
|
@Suppress("UNUSED_VARIABLE") val first = input.readByteArray(4)
|
||||||
val second = input.readBytes(4)
|
val second = input.readByteArray(4)
|
||||||
assertEquals(4.toByte(), second[0])
|
assertEquals(4.toByte(), second[0])
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -31,25 +32,25 @@ class IOTest {
|
|||||||
|
|
||||||
binary.read {
|
binary.read {
|
||||||
val array = ByteArray {
|
val array = ByteArray {
|
||||||
val read = readWithSeparatorTo(this, "---".encodeToByteArray()) + discardLine()
|
val read = readWithSeparatorTo(this, "---".encodeToByteString()) + discardLine()
|
||||||
assertEquals(12, read)
|
assertEquals(12, read)
|
||||||
}
|
}
|
||||||
assertEquals("""
|
assertEquals("""
|
||||||
aaa
|
aaa
|
||||||
bbb
|
bbb
|
||||||
""".trimIndent(),array.decodeToString().trim())
|
""".trimIndent(),array.decodeToString().trim())
|
||||||
assertEquals("ccc", readUTF8Line()?.trim())
|
assertEquals("ccc", readLine()?.trim())
|
||||||
}
|
}
|
||||||
|
|
||||||
assertFails {
|
assertFails {
|
||||||
binary.read {
|
binary.read {
|
||||||
discardWithSeparator("---".encodeToByteArray(), atMost = 3 )
|
discardWithSeparator("---".encodeToByteString(), atMost = 3 )
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
assertFails {
|
assertFails {
|
||||||
binary.read{
|
binary.read{
|
||||||
discardWithSeparator("-+-".encodeToByteArray(), errorOnEof = true)
|
discardWithSeparator("-+-".encodeToByteString(), errorOnEof = true)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
package space.kscience.dataforge.io
|
package space.kscience.dataforge.io
|
||||||
|
|
||||||
|
import kotlinx.io.writeString
|
||||||
import space.kscience.dataforge.context.Global
|
import space.kscience.dataforge.context.Global
|
||||||
import space.kscience.dataforge.meta.get
|
import space.kscience.dataforge.meta.get
|
||||||
import space.kscience.dataforge.meta.int
|
import space.kscience.dataforge.meta.int
|
||||||
@ -18,9 +19,9 @@ class MultipartTest {
|
|||||||
"value" put it
|
"value" put it
|
||||||
}
|
}
|
||||||
data {
|
data {
|
||||||
writeUtf8String("Hello World $it")
|
writeString("Hello World $it")
|
||||||
repeat(300) {
|
repeat(300) {
|
||||||
writeRawString("$it ")
|
writeString("$it ")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,12 +1,11 @@
|
|||||||
package space.kscience.dataforge.io
|
package space.kscience.dataforge.io
|
||||||
|
|
||||||
import io.ktor.utils.io.core.ByteReadPacket
|
import kotlinx.io.buffered
|
||||||
import io.ktor.utils.io.core.use
|
|
||||||
|
|
||||||
|
|
||||||
fun <T : Any> IOFormat<T>.writeToByteArray(obj: T): ByteArray = ByteArray {
|
fun <T : Any> IOFormat<T>.writeToByteArray(obj: T): ByteArray = ByteArray {
|
||||||
writeObject(this, obj)
|
writeObject(this, obj)
|
||||||
}
|
}
|
||||||
fun <T : Any> IOFormat<T>.readFromByteArray(array: ByteArray): T = ByteReadPacket(array).use {
|
fun <T : Any> IOFormat<T>.readFromByteArray(array: ByteArray): T = ByteArraySource(array).buffered().use {
|
||||||
readObject(it)
|
readObject(it)
|
||||||
}
|
}
|
@ -1,8 +1,11 @@
|
|||||||
package space.kscience.dataforge.io
|
package space.kscience.dataforge.io
|
||||||
|
|
||||||
import io.ktor.utils.io.core.*
|
|
||||||
import io.ktor.utils.io.streams.asOutput
|
|
||||||
import kotlinx.coroutines.runBlocking
|
import kotlinx.coroutines.runBlocking
|
||||||
|
import kotlinx.io.Sink
|
||||||
|
import kotlinx.io.Source
|
||||||
|
import kotlinx.io.asSink
|
||||||
|
import kotlinx.io.buffered
|
||||||
import space.kscience.dataforge.meta.Meta
|
import space.kscience.dataforge.meta.Meta
|
||||||
import space.kscience.dataforge.meta.descriptors.MetaDescriptor
|
import space.kscience.dataforge.meta.descriptors.MetaDescriptor
|
||||||
import space.kscience.dataforge.meta.isEmpty
|
import space.kscience.dataforge.meta.isEmpty
|
||||||
@ -23,18 +26,18 @@ internal class PathBinary(
|
|||||||
override val size: Int = Files.size(path).toInt() - fileOffset,
|
override val size: Int = Files.size(path).toInt() - fileOffset,
|
||||||
) : Binary {
|
) : Binary {
|
||||||
|
|
||||||
override fun <R> read(offset: Int, atMost: Int, block: Input.() -> R): R = runBlocking {
|
override fun <R> read(offset: Int, atMost: Int, block: Source.() -> R): R = runBlocking {
|
||||||
readSuspend(offset, atMost, block)
|
readSuspend(offset, atMost, block)
|
||||||
}
|
}
|
||||||
|
|
||||||
override suspend fun <R> readSuspend(offset: Int, atMost: Int, block: suspend Input.() -> R): R {
|
override suspend fun <R> readSuspend(offset: Int, atMost: Int, block: suspend Source.() -> R): R {
|
||||||
val actualOffset = offset + fileOffset
|
val actualOffset = offset + fileOffset
|
||||||
val actualSize = min(atMost, size - offset)
|
val actualSize = min(atMost, size - offset)
|
||||||
val array = path.inputStream().use {
|
val array = path.inputStream().use {
|
||||||
it.skip(actualOffset.toLong())
|
it.skip(actualOffset.toLong())
|
||||||
it.readNBytes(actualSize)
|
it.readNBytes(actualSize)
|
||||||
}
|
}
|
||||||
return ByteReadPacket(array).block()
|
return ByteArraySource(array).buffered().use { it.block() }
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun view(offset: Int, binarySize: Int) = PathBinary(path, fileOffset + offset, binarySize)
|
override fun view(offset: Int, binarySize: Int) = PathBinary(path, fileOffset + offset, binarySize)
|
||||||
@ -42,36 +45,36 @@ internal class PathBinary(
|
|||||||
|
|
||||||
public fun Path.asBinary(): Binary = PathBinary(this)
|
public fun Path.asBinary(): Binary = PathBinary(this)
|
||||||
|
|
||||||
public fun <R> Path.read(block: Input.() -> R): R = asBinary().read(block = block)
|
public fun <R> Path.read(block: Source.() -> R): R = asBinary().read(block = block)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Write a live output to a newly created file. If file does not exist, throws error
|
* Write a live output to a newly created file. If file does not exist, throws error
|
||||||
*/
|
*/
|
||||||
public fun Path.write(block: Output.() -> Unit): Unit {
|
public fun Path.write(block: Sink.() -> Unit): Unit {
|
||||||
val stream = Files.newOutputStream(this, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW)
|
val stream = Files.newOutputStream(this, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW)
|
||||||
stream.asOutput().use(block)
|
stream.asSink().buffered().use(block)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a new file or append to exiting one with given output [block]
|
* Create a new file or append to exiting one with given output [block]
|
||||||
*/
|
*/
|
||||||
public fun Path.append(block: Output.() -> Unit): Unit {
|
public fun Path.append(block: Sink.() -> Unit): Unit {
|
||||||
val stream = Files.newOutputStream(
|
val stream = Files.newOutputStream(
|
||||||
this,
|
this,
|
||||||
StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE
|
StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE
|
||||||
)
|
)
|
||||||
stream.asOutput().use(block)
|
stream.asSink().buffered().use(block)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a new file or replace existing one using given output [block]
|
* Create a new file or replace existing one using given output [block]
|
||||||
*/
|
*/
|
||||||
public fun Path.rewrite(block: Output.() -> Unit): Unit {
|
public fun Path.rewrite(block: Sink.() -> Unit): Unit {
|
||||||
val stream = Files.newOutputStream(
|
val stream = Files.newOutputStream(
|
||||||
this,
|
this,
|
||||||
StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.CREATE
|
StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.CREATE
|
||||||
)
|
)
|
||||||
stream.asOutput().use(block)
|
stream.asSink().buffered().use(block)
|
||||||
}
|
}
|
||||||
|
|
||||||
@DFExperimental
|
@DFExperimental
|
||||||
@ -260,7 +263,7 @@ public fun IOPlugin.writeEnvelopeDirectory(
|
|||||||
val dataFile = path.resolve(IOPlugin.DATA_FILE_NAME)
|
val dataFile = path.resolve(IOPlugin.DATA_FILE_NAME)
|
||||||
dataFile.write {
|
dataFile.write {
|
||||||
envelope.data?.read {
|
envelope.data?.read {
|
||||||
val copied = copyTo(this@write)
|
val copied = transferTo(this@write)
|
||||||
if (copied != envelope.data?.size?.toLong()) {
|
if (copied != envelope.data?.size?.toLong()) {
|
||||||
error("The number of copied bytes does not equal data size")
|
error("The number of copied bytes does not equal data size")
|
||||||
}
|
}
|
||||||
|
@ -1,9 +1,11 @@
|
|||||||
package space.kscience.dataforge.io
|
package space.kscience.dataforge.io
|
||||||
|
|
||||||
import io.ktor.utils.io.core.Input
|
import kotlinx.io.Source
|
||||||
import io.ktor.utils.io.streams.asInput
|
import kotlinx.io.asSource
|
||||||
|
import kotlinx.io.buffered
|
||||||
|
|
||||||
public fun IOPlugin.resource(name: String): Binary? = context.javaClass.getResource(name)?.readBytes()?.asBinary()
|
|
||||||
|
|
||||||
public inline fun <R> IOPlugin.readResource(name: String, block: Input.() -> R): R =
|
public fun IOPlugin.resource(name: String): Binary? = { }.javaClass.getResource(name)?.readBytes()?.asBinary()
|
||||||
context.javaClass.getResource(name)?.openStream()?.asInput()?.block() ?: error("Can't read resource $name")
|
|
||||||
|
public inline fun <R> IOPlugin.readResource(name: String, block: Source.() -> R): R =
|
||||||
|
{ }.javaClass.getResource(name)?.openStream()?.asSource()?.buffered()?.block() ?: error("Can't read resource $name")
|
@ -1,6 +1,5 @@
|
|||||||
package space.kscience.dataforge.io
|
package space.kscience.dataforge.io
|
||||||
|
|
||||||
import io.ktor.utils.io.core.writeDouble
|
|
||||||
import space.kscience.dataforge.context.Global
|
import space.kscience.dataforge.context.Global
|
||||||
import space.kscience.dataforge.misc.DFExperimental
|
import space.kscience.dataforge.misc.DFExperimental
|
||||||
import java.nio.file.Files
|
import java.nio.file.Files
|
||||||
|
@ -1,6 +1,5 @@
|
|||||||
package space.kscience.dataforge.io
|
package space.kscience.dataforge.io
|
||||||
|
|
||||||
import io.ktor.utils.io.core.writeDouble
|
|
||||||
import space.kscience.dataforge.context.Global
|
import space.kscience.dataforge.context.Global
|
||||||
import space.kscience.dataforge.misc.DFExperimental
|
import space.kscience.dataforge.misc.DFExperimental
|
||||||
import java.nio.file.Files
|
import java.nio.file.Files
|
||||||
|
@ -1,9 +1,5 @@
|
|||||||
package space.kscience.dataforge.workspace
|
package space.kscience.dataforge.workspace
|
||||||
|
|
||||||
import io.ktor.utils.io.core.Input
|
|
||||||
import io.ktor.utils.io.core.Output
|
|
||||||
import io.ktor.utils.io.core.readBytes
|
|
||||||
import io.ktor.utils.io.core.writeFully
|
|
||||||
import kotlinx.serialization.ExperimentalSerializationApi
|
import kotlinx.serialization.ExperimentalSerializationApi
|
||||||
import kotlinx.serialization.KSerializer
|
import kotlinx.serialization.KSerializer
|
||||||
import kotlinx.serialization.json.Json
|
import kotlinx.serialization.json.Json
|
||||||
|
Loading…
Reference in New Issue
Block a user