Merge branch 'kotlinx-io' into dev

# Conflicts:
#	dataforge-io/build.gradle.kts
#	dataforge-io/src/commonMain/kotlin/space/kscience/dataforge/io/JsonMetaFormat.kt
#	gradle.properties
This commit is contained in:
Alexander Nozik 2023-10-08 16:55:47 +03:00
commit dc2bf5da83
29 changed files with 405 additions and 305 deletions

View File

@ -5,6 +5,7 @@
### Added ### Added
### Changed ### Changed
- Kotlin 1.9
### Deprecated ### Deprecated

View File

@ -1,11 +1,11 @@
import space.kscience.gradle.KScienceVersions
plugins { plugins {
id("space.kscience.gradle.mpp") id("space.kscience.gradle.mpp")
} }
description = "IO module" description = "IO module"
val ioVersion = "0.2.0"
kscience { kscience {
jvm() jvm()
js() js()
@ -15,11 +15,13 @@ kscience {
cbor() cbor()
} }
dependencies { dependencies {
api(project(":dataforge-context")) api(projects.dataforgeContext)
api("io.ktor:ktor-io:${KScienceVersions.ktorVersion}") api("org.jetbrains.kotlinx:kotlinx-io-core:$ioVersion")
api("org.jetbrains.kotlinx:kotlinx-io-bytestring:$ioVersion")
//api("io.ktor:ktor-io:${KScienceVersions.ktorVersion}")
} }
} }
readme{ readme{
maturity = space.kscience.gradle.Maturity.PROTOTYPE maturity = space.kscience.gradle.Maturity.EXPERIMENTAL
} }

View File

@ -1,7 +1,11 @@
package space.kscience.dataforge.io.yaml package space.kscience.dataforge.io.yaml
import io.ktor.utils.io.core.Input import kotlinx.io.Sink
import io.ktor.utils.io.core.Output import kotlinx.io.Source
import kotlinx.io.bytestring.ByteString
import kotlinx.io.bytestring.encodeToByteString
import kotlinx.io.readByteString
import kotlinx.io.writeString
import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Global import space.kscience.dataforge.context.Global
import space.kscience.dataforge.io.* import space.kscience.dataforge.io.*
@ -19,18 +23,18 @@ public class FrontMatterEnvelopeFormat(
var offset = 0 var offset = 0
offset += discardWithSeparator( offset += discardWithSeparator(
SEPARATOR.encodeToByteArray(), SEPARATOR,
atMost = 1024, atMost = 1024,
) )
val line = ByteArray { val line = ByteArray {
offset += readWithSeparatorTo(this, "\n".encodeToByteArray()) offset += readWithSeparatorTo(this, "\n".encodeToByteString())
}.decodeToString() }.decodeToString()
val readMetaFormat = line.trim().takeIf { it.isNotBlank() }?.let { io.resolveMetaFormat(it) } ?: YamlMetaFormat val readMetaFormat = line.trim().takeIf { it.isNotBlank() }?.let { io.resolveMetaFormat(it) } ?: YamlMetaFormat
val packet = ByteArray { val packet = ByteArray {
offset += readWithSeparatorTo(this, SEPARATOR.encodeToByteArray()) offset += readWithSeparatorTo(this, SEPARATOR)
} }
offset += discardLine() offset += discardLine()
@ -39,25 +43,25 @@ public class FrontMatterEnvelopeFormat(
Envelope(meta, binary.view(offset)) Envelope(meta, binary.view(offset))
} }
override fun readObject(input: Input): Envelope = readObject(input.readBinary()) override fun readObject(source: Source): Envelope = readObject(source.readBinary())
override fun writeObject( override fun writeObject(
output: Output, sink: Sink,
obj: Envelope, obj: Envelope,
) { ) {
val metaFormat = metaFormatFactory.build(io.context, meta) val metaFormat = metaFormatFactory.build(io.context, meta)
val formatSuffix = if (metaFormat is YamlMetaFormat) "" else metaFormatFactory.shortName val formatSuffix = if (metaFormat is YamlMetaFormat) "" else metaFormatFactory.shortName
output.writeRawString("$SEPARATOR${formatSuffix}\r\n") sink.writeString("$SEPARATOR${formatSuffix}\r\n")
metaFormat.run { metaFormat.writeObject(output, obj.meta) } metaFormat.run { metaFormat.writeObject(sink, obj.meta) }
output.writeRawString("$SEPARATOR\r\n") sink.writeString("$SEPARATOR\r\n")
//Printing data //Printing data
obj.data?.let { data -> obj.data?.let { data ->
output.writeBinary(data) sink.writeBinary(data)
} }
} }
public companion object : EnvelopeFormatFactory { public companion object : EnvelopeFormatFactory {
public const val SEPARATOR: String = "---" public val SEPARATOR: ByteString = "---".encodeToByteString()
private val metaTypeRegex = "---(\\w*)\\s*".toRegex() private val metaTypeRegex = "---(\\w*)\\s*".toRegex()
@ -69,8 +73,8 @@ public class FrontMatterEnvelopeFormat(
override fun peekFormat(io: IOPlugin, binary: Binary): EnvelopeFormat? = binary.read { override fun peekFormat(io: IOPlugin, binary: Binary): EnvelopeFormat? = binary.read {
//read raw string to avoid UTF issues //read raw string to avoid UTF issues
val line = readRawString(3) val line = readByteString(3)
return@read if (line == "---") { return@read if (line == "---".encodeToByteString()) {
default default
} else { } else {
null null
@ -82,12 +86,12 @@ public class FrontMatterEnvelopeFormat(
override fun readObject(binary: Binary): Envelope = default.readObject(binary) override fun readObject(binary: Binary): Envelope = default.readObject(binary)
override fun writeObject( override fun writeObject(
output: Output, sink: Sink,
obj: Envelope, obj: Envelope,
): Unit = default.writeObject(output, obj) ): Unit = default.writeObject(sink, obj)
override fun readObject(input: Input): Envelope = default.readObject(input) override fun readObject(source: Source): Envelope = default.readObject(source)
} }
} }

View File

@ -1,13 +1,13 @@
package space.kscience.dataforge.io.yaml package space.kscience.dataforge.io.yaml
import io.ktor.utils.io.core.Input import kotlinx.io.Sink
import io.ktor.utils.io.core.Output import kotlinx.io.Source
import kotlinx.io.readString
import kotlinx.io.writeString
import net.mamoe.yamlkt.* import net.mamoe.yamlkt.*
import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Context
import space.kscience.dataforge.io.MetaFormat import space.kscience.dataforge.io.MetaFormat
import space.kscience.dataforge.io.MetaFormatFactory import space.kscience.dataforge.io.MetaFormatFactory
import space.kscience.dataforge.io.readUtf8String
import space.kscience.dataforge.io.writeUtf8String
import space.kscience.dataforge.meta.* import space.kscience.dataforge.meta.*
import space.kscience.dataforge.meta.descriptors.MetaDescriptor import space.kscience.dataforge.meta.descriptors.MetaDescriptor
import space.kscience.dataforge.meta.descriptors.get import space.kscience.dataforge.meta.descriptors.get
@ -89,14 +89,14 @@ public fun YamlMap.toMeta(): Meta = YamlMeta(this)
*/ */
public class YamlMetaFormat(private val meta: Meta) : MetaFormat { public class YamlMetaFormat(private val meta: Meta) : MetaFormat {
override fun writeMeta(output: Output, meta: Meta, descriptor: MetaDescriptor?) { override fun writeMeta(sink: Sink, meta: Meta, descriptor: MetaDescriptor?) {
val yaml: YamlMap = meta.toYaml() val yaml: YamlMap = meta.toYaml()
val string = Yaml.encodeToString(YamlMap.serializer(), yaml) val string = Yaml.encodeToString(YamlMap.serializer(), yaml)
output.writeUtf8String(string) sink.writeString(string)
} }
override fun readMeta(input: Input, descriptor: MetaDescriptor?): Meta { override fun readMeta(source: Source, descriptor: MetaDescriptor?): Meta {
val yaml = Yaml.decodeYamlMapFromString(input.readUtf8String()) val yaml = Yaml.decodeYamlMapFromString(source.readString())
return yaml.toMeta() return yaml.toMeta()
} }
@ -109,10 +109,10 @@ public class YamlMetaFormat(private val meta: Meta) : MetaFormat {
private val default = YamlMetaFormat(Meta.EMPTY) private val default = YamlMetaFormat(Meta.EMPTY)
override fun writeMeta(output: Output, meta: Meta, descriptor: MetaDescriptor?): Unit = override fun writeMeta(sink: Sink, meta: Meta, descriptor: MetaDescriptor?): Unit =
default.writeMeta(output, meta, descriptor) default.writeMeta(sink, meta, descriptor)
override fun readMeta(input: Input, descriptor: MetaDescriptor?): Meta = override fun readMeta(source: Source, descriptor: MetaDescriptor?): Meta =
default.readMeta(input, descriptor) default.readMeta(source, descriptor)
} }
} }

View File

@ -1,6 +1,9 @@
package space.kscience.dataforge.io package space.kscience.dataforge.io
import io.ktor.utils.io.core.* import kotlinx.io.Sink
import kotlinx.io.Source
import kotlinx.io.buffered
import kotlinx.io.readByteArray
import kotlin.math.min import kotlin.math.min
/** /**
@ -17,13 +20,13 @@ public interface Binary {
* Read maximum of [atMost] bytes as input from the binary, starting at [offset]. The generated input is always closed * Read maximum of [atMost] bytes as input from the binary, starting at [offset]. The generated input is always closed
* when leaving scope, so it could not be leaked outside of scope of [block]. * when leaving scope, so it could not be leaked outside of scope of [block].
*/ */
public fun <R> read(offset: Int = 0, atMost: Int = size - offset, block: Input.() -> R): R public fun <R> read(offset: Int = 0, atMost: Int = size - offset, block: Source.() -> R): R
public suspend fun <R> readSuspend(offset: Int = 0, atMost: Int = size - offset, block: suspend Input.() -> R): R public suspend fun <R> readSuspend(offset: Int = 0, atMost: Int = size - offset, block: suspend Source.() -> R): R
/** /**
* Read a binary with given [offset] relative to this binary and given [binarySize]. * Read a binary with given [offset] relative to this binary and given [binarySize].
* In general, resulting binary is of the same type as this one, but it is not guaranteed. * In general, the resulting binary is of the same type as this one, but it is not guaranteed.
*/ */
public fun view(offset: Int, binarySize: Int = size - offset): Binary public fun view(offset: Int, binarySize: Int = size - offset): Binary
@ -38,25 +41,27 @@ internal class ByteArrayBinary(
override val size: Int = array.size - start, override val size: Int = array.size - start,
) : Binary { ) : Binary {
override fun <R> read(offset: Int, atMost: Int, block: Input.() -> R): R { override fun <R> read(offset: Int, atMost: Int, block: Source.() -> R): R {
require(offset >= 0) { "Offset must be positive" } require(offset >= 0) { "Offset must be positive" }
require(offset < array.size) { "Offset $offset is larger than array size" } require(offset < array.size) { "Offset $offset is larger than array size" }
val input = ByteReadPacket(
return ByteArraySource(
array, array,
offset + start, offset + start,
min(atMost, size - offset) min(atMost, size - offset)
) ).buffered().use(block)
return input.use(block)
} }
override suspend fun <R> readSuspend(offset: Int, atMost: Int, block: suspend Input.() -> R): R { override suspend fun <R> readSuspend(offset: Int, atMost: Int, block: suspend Source.() -> R): R {
require(offset >= 0) { "Offset must be positive" } require(offset >= 0) { "Offset must be positive" }
require(offset < array.size) { "Offset $offset is larger than array size" } require(offset < array.size) { "Offset $offset is larger than array size" }
val input = ByteReadPacket(
val input = ByteArraySource(
array, array,
offset + start, offset + start,
min(atMost, size - offset) min(atMost, size - offset)
) ).buffered()
return try { return try {
block(input) block(input)
} finally { } finally {
@ -77,26 +82,26 @@ public fun Binary.toByteArray(): ByteArray = if (this is ByteArrayBinary) {
array.copyOfRange(start, start + size) // TODO do we need to ensure data safety here? array.copyOfRange(start, start + size) // TODO do we need to ensure data safety here?
} else { } else {
read { read {
readBytes() readByteArray()
} }
} }
//TODO optimize for file-based Inputs //TODO optimize for file-based Inputs
public fun Input.readBinary(size: Int? = null): Binary { public fun Source.readBinary(size: Int? = null): Binary {
val array = if (size == null) readBytes() else readBytes(size) val array = if (size == null) readByteArray() else readByteArray(size)
return ByteArrayBinary(array) return ByteArrayBinary(array)
} }
/** /**
* Direct write of binary to the output. Returns the number of bytes written * Direct write of binary to the output. Returns the number of bytes written
*/ */
public fun Output.writeBinary(binary: Binary): Int { public fun Sink.writeBinary(binary: Binary): Int {
return if (binary is ByteArrayBinary) { return if (binary is ByteArrayBinary) {
writeFully(binary.array, binary.start, binary.start + binary.size) write(binary.array, binary.start, binary.start + binary.size)
binary.size binary.size
} else { } else {
binary.read { binary.read {
copyTo(this@writeBinary).toInt() transferTo(this@writeBinary).toInt()
} }
} }
} }

View File

@ -1,6 +1,6 @@
package space.kscience.dataforge.io package space.kscience.dataforge.io
import io.ktor.utils.io.core.Output import kotlinx.io.Sink
import space.kscience.dataforge.meta.* import space.kscience.dataforge.meta.*
public class EnvelopeBuilder : Envelope { public class EnvelopeBuilder : Envelope {
@ -33,7 +33,7 @@ public class EnvelopeBuilder : Envelope {
/** /**
* Construct a data binary from given builder * Construct a data binary from given builder
*/ */
public inline fun data(block: Output.() -> Unit) { public inline fun data(block: Sink.() -> Unit) {
data = ByteArray { block() }.asBinary() data = ByteArray { block() }.asBinary()
} }

View File

@ -1,6 +1,6 @@
package space.kscience.dataforge.io package space.kscience.dataforge.io
import io.ktor.utils.io.core.Input import kotlinx.io.Source
import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Context
import space.kscience.dataforge.io.EnvelopeFormatFactory.Companion.ENVELOPE_FORMAT_TYPE import space.kscience.dataforge.io.EnvelopeFormatFactory.Companion.ENVELOPE_FORMAT_TYPE
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
@ -15,7 +15,7 @@ public interface EnvelopeFormat : IOFormat<Envelope> {
override val type: KType get() = typeOf<Envelope>() override val type: KType get() = typeOf<Envelope>()
} }
public fun EnvelopeFormat.read(input: Input): Envelope = readObject(input) public fun EnvelopeFormat.read(input: Source): Envelope = readObject(input)
@Type(ENVELOPE_FORMAT_TYPE) @Type(ENVELOPE_FORMAT_TYPE)
public interface EnvelopeFormatFactory : IOFormatFactory<Envelope>, EnvelopeFormat { public interface EnvelopeFormatFactory : IOFormatFactory<Envelope>, EnvelopeFormat {

View File

@ -1,5 +1,8 @@
package space.kscience.dataforge.io package space.kscience.dataforge.io
import kotlinx.io.bytestring.ByteString
import kotlinx.io.bytestring.decodeToString
import kotlinx.io.write
import space.kscience.dataforge.io.Envelope.Companion.ENVELOPE_NODE_KEY import space.kscience.dataforge.io.Envelope.Companion.ENVELOPE_NODE_KEY
import space.kscience.dataforge.io.PartDescriptor.Companion.DEFAULT_MULTIPART_DATA_SEPARATOR import space.kscience.dataforge.io.PartDescriptor.Companion.DEFAULT_MULTIPART_DATA_SEPARATOR
import space.kscience.dataforge.io.PartDescriptor.Companion.MULTIPART_DATA_TYPE import space.kscience.dataforge.io.PartDescriptor.Companion.MULTIPART_DATA_TYPE
@ -20,7 +23,7 @@ private class PartDescriptor : Scheme() {
val PARTS_KEY = MULTIPART_KEY + "parts" val PARTS_KEY = MULTIPART_KEY + "parts"
val SEPARATOR_KEY = MULTIPART_KEY + "separator" val SEPARATOR_KEY = MULTIPART_KEY + "separator"
const val DEFAULT_MULTIPART_DATA_SEPARATOR = "\r\n#~PART~#\r\n" val DEFAULT_MULTIPART_DATA_SEPARATOR = "\r\n#~PART~#\r\n".toAsciiByteString()
const val MULTIPART_DATA_TYPE = "envelope.multipart" const val MULTIPART_DATA_TYPE = "envelope.multipart"
} }
@ -32,12 +35,12 @@ public typealias EnvelopeParts = List<EnvelopePart>
public fun EnvelopeBuilder.multipart( public fun EnvelopeBuilder.multipart(
parts: EnvelopeParts, parts: EnvelopeParts,
separator: String = DEFAULT_MULTIPART_DATA_SEPARATOR, separator: ByteString = DEFAULT_MULTIPART_DATA_SEPARATOR,
) { ) {
dataType = MULTIPART_DATA_TYPE dataType = MULTIPART_DATA_TYPE
var offsetCounter = 0 var offsetCounter = 0
val separatorSize = separator.length val separatorSize = separator.size
val partDescriptors = parts.map { (binary, description) -> val partDescriptors = parts.map { (binary, description) ->
offsetCounter += separatorSize offsetCounter += separatorSize
PartDescriptor { PartDescriptor {
@ -51,14 +54,14 @@ public fun EnvelopeBuilder.multipart(
meta { meta {
if (separator != DEFAULT_MULTIPART_DATA_SEPARATOR) { if (separator != DEFAULT_MULTIPART_DATA_SEPARATOR) {
SEPARATOR_KEY put separator SEPARATOR_KEY put separator.decodeToString()
} }
setIndexed(PARTS_KEY, partDescriptors.map { it.toMeta() }) setIndexed(PARTS_KEY, partDescriptors.map { it.toMeta() })
} }
data { data {
parts.forEach { parts.forEach {
writeRawString(separator) write(separator)
writeBinary(it.binary) writeBinary(it.binary)
} }
} }
@ -69,7 +72,7 @@ public fun EnvelopeBuilder.multipart(
*/ */
public fun EnvelopeBuilder.envelopes( public fun EnvelopeBuilder.envelopes(
envelopes: List<Envelope>, envelopes: List<Envelope>,
separator: String = DEFAULT_MULTIPART_DATA_SEPARATOR, separator: ByteString = DEFAULT_MULTIPART_DATA_SEPARATOR,
) { ) {
val parts = envelopes.map { val parts = envelopes.map {
val binary = Binary(it, TaggedEnvelopeFormat) val binary = Binary(it, TaggedEnvelopeFormat)

View File

@ -1,6 +1,8 @@
package space.kscience.dataforge.io package space.kscience.dataforge.io
import io.ktor.utils.io.core.* import kotlinx.io.Sink
import kotlinx.io.Source
import kotlinx.io.readByteArray
import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Factory import space.kscience.dataforge.context.Factory
import space.kscience.dataforge.io.IOFormatFactory.Companion.IO_FORMAT_TYPE import space.kscience.dataforge.io.IOFormatFactory.Companion.IO_FORMAT_TYPE
@ -21,7 +23,7 @@ public interface IOReader<out T> {
*/ */
public val type: KType public val type: KType
public fun readObject(input: Input): T public fun readObject(source: Source): T
public fun readObject(binary: Binary): T = binary.read { readObject(this) } public fun readObject(binary: Binary): T = binary.read { readObject(this) }
@ -32,21 +34,21 @@ public interface IOReader<out T> {
public val binary: IOReader<Binary> = object : IOReader<Binary> { public val binary: IOReader<Binary> = object : IOReader<Binary> {
override val type: KType = typeOf<Binary>() override val type: KType = typeOf<Binary>()
override fun readObject(input: Input): Binary = input.readBytes().asBinary() override fun readObject(source: Source): Binary = source.readByteArray().asBinary()
override fun readObject(binary: Binary): Binary = binary override fun readObject(binary: Binary): Binary = binary
} }
} }
} }
public inline fun <reified T> IOReader(crossinline read: Input.() -> T): IOReader<T> = object : IOReader<T> { public inline fun <reified T> IOReader(crossinline read: Source.() -> T): IOReader<T> = object : IOReader<T> {
override val type: KType = typeOf<T>() override val type: KType = typeOf<T>()
override fun readObject(input: Input): T = input.read() override fun readObject(source: Source): T = source.read()
} }
public fun interface IOWriter<in T> { public fun interface IOWriter<in T> {
public fun writeObject(output: Output, obj: T) public fun writeObject(sink: Sink, obj: T)
} }
/** /**
@ -54,20 +56,20 @@ public fun interface IOWriter<in T> {
*/ */
public interface IOFormat<T> : IOReader<T>, IOWriter<T> public interface IOFormat<T> : IOReader<T>, IOWriter<T>
public fun <T : Any> Input.readObject(format: IOReader<T>): T = format.readObject(this@readObject) public fun <T : Any> Source.readObject(format: IOReader<T>): T = format.readObject(this@readObject)
public fun <T : Any> IOFormat<T>.readObjectFrom(binary: Binary): T = binary.read { public fun <T : Any> IOFormat<T>.readObjectFrom(binary: Binary): T = binary.read {
readObject(this) readObject(this)
} }
/** /**
* Read given binary as object using given format * Read given binary as an object using given format
*/ */
public fun <T : Any> Binary.readWith(format: IOReader<T>): T = read { public fun <T : Any> Binary.readWith(format: IOReader<T>): T = read {
readObject(format) readObject(format)
} }
public fun <T : Any> Output.writeObject(format: IOWriter<T>, obj: T): Unit = public fun <T : Any> Sink.writeObject(format: IOWriter<T>, obj: T): Unit =
format.writeObject(this@writeObject, obj) format.writeObject(this@writeObject, obj)
@ -94,9 +96,9 @@ public object DoubleIOFormat : IOFormat<Double>, IOFormatFactory<Double> {
override val type: KType get() = typeOf<Double>() override val type: KType get() = typeOf<Double>()
override fun writeObject(output: Output, obj: Double) { override fun writeObject(sink: Sink, obj: Double) {
output.writeDouble(obj) sink.writeLong(obj.toBits())
} }
override fun readObject(input: Input): Double = input.readDouble() override fun readObject(source: Source): Double = Double.fromBits(source.readLong())
} }

View File

@ -1,10 +1,10 @@
@file:Suppress("UNUSED_PARAMETER")
package space.kscience.dataforge.io package space.kscience.dataforge.io
import io.ktor.utils.io.core.Input import kotlinx.io.Sink
import io.ktor.utils.io.core.Output import kotlinx.io.Source
import kotlinx.io.readString
import kotlinx.io.writeString
import kotlinx.serialization.json.Json import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonElement import kotlinx.serialization.json.JsonElement
import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Context
@ -18,13 +18,13 @@ import space.kscience.dataforge.meta.toMeta
*/ */
public class JsonMetaFormat(private val json: Json = DEFAULT_JSON) : MetaFormat { public class JsonMetaFormat(private val json: Json = DEFAULT_JSON) : MetaFormat {
override fun writeMeta(output: Output, meta: Meta, descriptor: MetaDescriptor?) { override fun writeMeta(sink: Sink, meta: Meta, descriptor: MetaDescriptor?) {
val jsonElement = meta.toJson(descriptor) val jsonElement = meta.toJson(descriptor)
output.writeUtf8String(json.encodeToString(JsonElement.serializer(), jsonElement)) sink.writeString(json.encodeToString(JsonElement.serializer(), jsonElement))
} }
override fun readMeta(input: Input, descriptor: MetaDescriptor?): Meta { override fun readMeta(source: Source, descriptor: MetaDescriptor?): Meta {
val str = input.readUtf8String()//readByteArray().decodeToString() val str = source.readString()
val jsonElement = json.parseToJsonElement(str) val jsonElement = json.parseToJsonElement(str)
return jsonElement.toMeta(descriptor) return jsonElement.toMeta(descriptor)
} }
@ -39,10 +39,10 @@ public class JsonMetaFormat(private val json: Json = DEFAULT_JSON) : MetaFormat
private val default = JsonMetaFormat() private val default = JsonMetaFormat()
override fun writeMeta(output: Output, meta: Meta, descriptor: MetaDescriptor?): Unit = override fun writeMeta(sink: Sink, meta: Meta, descriptor: MetaDescriptor?): Unit =
default.run { writeMeta(output, meta, descriptor) } default.run { writeMeta(sink, meta, descriptor) }
override fun readMeta(input: Input, descriptor: MetaDescriptor?): Meta = override fun readMeta(source: Source, descriptor: MetaDescriptor?): Meta =
default.run { readMeta(input, descriptor) } default.run { readMeta(source, descriptor) }
} }
} }

View File

@ -1,9 +1,9 @@
package space.kscience.dataforge.io package space.kscience.dataforge.io
import io.ktor.utils.io.core.ByteReadPacket
import io.ktor.utils.io.core.Input import kotlinx.io.Sink
import io.ktor.utils.io.core.Output import kotlinx.io.Source
import io.ktor.utils.io.core.use import kotlinx.io.buffered
import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Global import space.kscience.dataforge.context.Global
import space.kscience.dataforge.io.MetaFormatFactory.Companion.META_FORMAT_TYPE import space.kscience.dataforge.io.MetaFormatFactory.Companion.META_FORMAT_TYPE
@ -23,19 +23,19 @@ public interface MetaFormat : IOFormat<Meta> {
override val type: KType get() = typeOf<Meta>() override val type: KType get() = typeOf<Meta>()
override fun writeObject(output: Output, obj: Meta) { override fun writeObject(sink: Sink, obj: Meta) {
writeMeta(output, obj, null) writeMeta(sink, obj, null)
} }
override fun readObject(input: Input): Meta = readMeta(input) override fun readObject(source: Source): Meta = readMeta(source)
public fun writeMeta( public fun writeMeta(
output: Output, sink: Sink,
meta: Meta, meta: Meta,
descriptor: MetaDescriptor? = null, descriptor: MetaDescriptor? = null,
) )
public fun readMeta(input: Input, descriptor: MetaDescriptor? = null): Meta public fun readMeta(source: Source, descriptor: MetaDescriptor? = null): Meta
} }
@Type(META_FORMAT_TYPE) @Type(META_FORMAT_TYPE)
@ -63,9 +63,7 @@ public fun Meta.toString(format: MetaFormat): String = ByteArray {
public fun Meta.toString(formatFactory: MetaFormatFactory): String = toString(formatFactory.build(Global, Meta.EMPTY)) public fun Meta.toString(formatFactory: MetaFormatFactory): String = toString(formatFactory.build(Global, Meta.EMPTY))
public fun MetaFormat.parse(str: String): Meta { public fun MetaFormat.parse(str: String): Meta = readObject(StringSource(str).buffered())
return ByteReadPacket(str.encodeToByteArray()).use { readObject(it) }
}
public fun MetaFormatFactory.parse(str: String, formatMeta: Meta): Meta = build(Global, formatMeta).parse(str) public fun MetaFormatFactory.parse(str: String, formatMeta: Meta): Meta = build(Global, formatMeta).parse(str)

View File

@ -1,6 +1,7 @@
package space.kscience.dataforge.io package space.kscience.dataforge.io
import io.ktor.utils.io.core.* import kotlinx.io.*
import kotlinx.io.bytestring.decodeToString
import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Global import space.kscience.dataforge.context.Global
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
@ -18,7 +19,7 @@ import space.kscience.dataforge.names.plus
public class TaggedEnvelopeFormat( public class TaggedEnvelopeFormat(
public val io: IOPlugin, public val io: IOPlugin,
public val version: VERSION = VERSION.DF02, public val version: VERSION = VERSION.DF02,
public val metaFormatFactory: MetaFormatFactory = JsonMetaFormat public val metaFormatFactory: MetaFormatFactory = JsonMetaFormat,
) : EnvelopeFormat { ) : EnvelopeFormat {
// private val metaFormat = io.metaFormat(metaFormatKey) // private val metaFormat = io.metaFormat(metaFormatKey)
@ -26,54 +27,55 @@ public class TaggedEnvelopeFormat(
private fun Tag.toBinary() = Binary { private fun Tag.toBinary() = Binary {
writeRawString(START_SEQUENCE) write(START_SEQUENCE)
writeRawString(version.name) writeString(version.name)
writeShort(metaFormatKey) writeShort(metaFormatKey)
writeUInt(metaSize) writeUInt(metaSize)
when (version) { when (version) {
VERSION.DF02 -> { VERSION.DF02 -> {
writeUInt(dataSize.toUInt()) writeUInt(dataSize.toUInt())
} }
VERSION.DF03 -> { VERSION.DF03 -> {
writeULong(dataSize) writeULong(dataSize)
} }
} }
writeRawString(END_SEQUENCE) write(END_SEQUENCE)
} }
override fun writeObject( override fun writeObject(
output: Output, sink: Sink,
obj: Envelope, obj: Envelope,
) { ) {
val metaFormat = metaFormatFactory.build(io.context, Meta.EMPTY) val metaFormat = metaFormatFactory.build(io.context, Meta.EMPTY)
val metaBytes = Binary(obj.meta, metaFormat) val metaBytes = Binary(obj.meta, metaFormat)
val actualSize: ULong = (obj.data?.size ?: 0).toULong() val actualSize: ULong = (obj.data?.size ?: 0).toULong()
val tag = Tag(metaFormatFactory.key, metaBytes.size.toUInt() + 2u, actualSize) val tag = Tag(metaFormatFactory.key, metaBytes.size.toUInt() + 2u, actualSize)
output.writeBinary(tag.toBinary()) sink.writeBinary(tag.toBinary())
output.writeBinary(metaBytes) sink.writeBinary(metaBytes)
output.writeRawString("\r\n") sink.writeString("\r\n")
obj.data?.let { obj.data?.let {
output.writeBinary(it) sink.writeBinary(it)
} }
} }
/** /**
* Read an envelope from input into memory * Read an envelope from input into memory
* *
* @param input an input to read from * @param source an input to read from
* @param formats a collection of meta formats to resolve * @param formats a collection of meta formats to resolve
*/ */
override fun readObject(input: Input): Envelope { override fun readObject(source: Source): Envelope {
val tag = input.readTag(this.version) val tag = source.readTag(this.version)
val metaFormat = io.resolveMetaFormat(tag.metaFormatKey) val metaFormat = io.resolveMetaFormat(tag.metaFormatKey)
?: error("Meta format with key ${tag.metaFormatKey} not found") ?: error("Meta format with key ${tag.metaFormatKey} not found")
val metaBinary = input.readBinary(tag.metaSize.toInt()) val metaBinary = source.readBinary(tag.metaSize.toInt())
val meta: Meta = metaFormat.readObjectFrom(metaBinary) val meta: Meta = metaFormat.readObjectFrom(metaBinary)
val data = input.readBinary(tag.dataSize.toInt()) val data = source.readBinary(tag.dataSize.toInt())
return SimpleEnvelope(meta, data) return SimpleEnvelope(meta, data)
} }
@ -104,8 +106,8 @@ public class TaggedEnvelopeFormat(
} }
public companion object : EnvelopeFormatFactory { public companion object : EnvelopeFormatFactory {
private const val START_SEQUENCE = "#~" private val START_SEQUENCE = "#~".toAsciiByteString()
private const val END_SEQUENCE = "~#\r\n" private val END_SEQUENCE = "~#\r\n".toAsciiByteString()
override val name: Name = EnvelopeFormatFactory.ENVELOPE_FACTORY_NAME + "tagged" override val name: Name = EnvelopeFormatFactory.ENVELOPE_FACTORY_NAME + "tagged"
@ -121,27 +123,26 @@ public class TaggedEnvelopeFormat(
return TaggedEnvelopeFormat(io, version) return TaggedEnvelopeFormat(io, version)
} }
private fun Input.readTag(version: VERSION): Tag { private fun Source.readTag(version: VERSION): Tag {
val start = readRawString(2) val start = readByteString(2)
if (start != START_SEQUENCE) error("The input is not an envelope") if (start != START_SEQUENCE) error("The input is not an envelope")
val versionString = readRawString(4) val versionString = readByteString(4)
if (version.name != versionString) error("Wrong version of DataForge: expected $version but found $versionString") if (version.name.toAsciiByteString() != versionString) error("Wrong version of DataForge: expected $version but found $versionString")
val metaFormatKey = readShort() val metaFormatKey = readShort()
val metaLength = readUInt() val metaLength = readUInt()
val dataLength: ULong = when (version) { val dataLength: ULong = when (version) {
VERSION.DF02 -> readUInt().toULong() VERSION.DF02 -> readUInt().toULong()
VERSION.DF03 -> readULong() VERSION.DF03 -> readULong()
} }
val end = readRawString(4) val end = readByteString(4)
if (end != END_SEQUENCE) error("The input is not an envelope") if (end != END_SEQUENCE) error("The input is not an envelope")
return Tag(metaFormatKey, metaLength, dataLength) return Tag(metaFormatKey, metaLength, dataLength)
} }
override fun peekFormat(io: IOPlugin, binary: Binary): EnvelopeFormat? { override fun peekFormat(io: IOPlugin, binary: Binary): EnvelopeFormat? = try {
return try {
binary.read { binary.read {
val header = readRawString(6) val header = readByteString(6)
return@read when (header.substring(2..5)) { when (header.substring(2, 6).decodeToString()) {
VERSION.DF02.name -> TaggedEnvelopeFormat(io, VERSION.DF02) VERSION.DF02.name -> TaggedEnvelopeFormat(io, VERSION.DF02)
VERSION.DF03.name -> TaggedEnvelopeFormat(io, VERSION.DF03) VERSION.DF03.name -> TaggedEnvelopeFormat(io, VERSION.DF03)
else -> null else -> null
@ -150,7 +151,6 @@ public class TaggedEnvelopeFormat(
} catch (ex: Exception) { } catch (ex: Exception) {
null null
} }
}
private val default by lazy { build(Global, Meta.EMPTY) } private val default by lazy { build(Global, Meta.EMPTY) }
@ -158,16 +158,13 @@ public class TaggedEnvelopeFormat(
default.run { readObject(binary) } default.run { readObject(binary) }
override fun writeObject( override fun writeObject(
output: Output, sink: Sink,
obj: Envelope, obj: Envelope,
): Unit = default.run { ): Unit = default.run {
writeObject( writeObject(sink, obj)
output,
obj,
)
} }
override fun readObject(input: Input): Envelope = default.readObject(input) override fun readObject(source: Source): Envelope = default.readObject(source)
} }
} }

View File

@ -1,9 +1,9 @@
package space.kscience.dataforge.io package space.kscience.dataforge.io
import io.ktor.utils.io.core.ByteReadPacket
import io.ktor.utils.io.core.Input import kotlinx.io.*
import io.ktor.utils.io.core.Output import kotlinx.io.bytestring.ByteString
import io.ktor.utils.io.core.readUTF8UntilDelimiterTo import kotlinx.io.bytestring.encodeToByteString
import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Global import space.kscience.dataforge.context.Global
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
@ -29,34 +29,35 @@ public class TaglessEnvelopeFormat(
// } // }
override fun writeObject( override fun writeObject(
output: Output, sink: Sink,
obj: Envelope, obj: Envelope,
) { ) {
val metaFormat = metaFormatFactory.build(this.io.context, meta) val metaFormat = metaFormatFactory.build(this.io.context, meta)
//printing header //printing header
output.writeRawString(TAGLESS_ENVELOPE_HEADER + "\r\n") sink.write(TAGLESS_ENVELOPE_HEADER)
sink.writeString("\r\n")
//Printing meta //Printing meta
if (!obj.meta.isEmpty()) { if (!obj.meta.isEmpty()) {
val metaBinary = Binary(obj.meta, metaFormat) val metaBinary = Binary(obj.meta, metaFormat)
output.writeUtf8String(META_START + "-${metaFormatFactory.shortName}\r\n") sink.writeString(META_START + "-${metaFormatFactory.shortName}\r\n")
output.writeBinary(metaBinary) sink.writeBinary(metaBinary)
output.writeRawString("\r\n") sink.writeString("\r\n")
} }
//Printing data //Printing data
obj.data?.let { data -> obj.data?.let { data ->
//val actualSize: Int = envelope.data?.size ?: 0 //val actualSize: Int = envelope.data?.size ?: 0
output.writeUtf8String(DATA_START + "\r\n") sink.writeString(DATA_START + "\r\n")
output.writeBinary(data) sink.writeBinary(data)
} }
} }
override fun readObject(input: Input): Envelope { override fun readObject(source: Source): Envelope {
//read preamble //read preamble
input.discardWithSeparator( source.discardWithSeparator(
TAGLESS_ENVELOPE_HEADER.encodeToByteArray(), TAGLESS_ENVELOPE_HEADER,
atMost = 1024, atMost = 1024,
) )
@ -64,22 +65,22 @@ public class TaglessEnvelopeFormat(
var data: Binary? = null var data: Binary? = null
input.discardWithSeparator( source.discardWithSeparator(
SEPARATOR_PREFIX, SEPARATOR_PREFIX,
atMost = 1024, atMost = 1024,
) )
var header: String = ByteArray { var header: String = ByteArray {
input.readUTF8UntilDelimiterTo(this, "\n") source.readWithSeparatorTo(this, "\n".encodeToByteString())
}.decodeToString() }.decodeToString()
while (!input.endOfInput) { while (!source.exhausted()) {
val block = ByteArray { val block = ByteArray {
input.readWithSeparatorTo(this, SEPARATOR_PREFIX) source.readWithSeparatorTo(this, SEPARATOR_PREFIX)
} }
val nextHeader = ByteArray { val nextHeader = ByteArray {
input.readWithSeparatorTo(this, "\n".encodeToByteArray()) source.readWithSeparatorTo(this, "\n".encodeToByteString())
}.decodeToString() }.decodeToString()
//terminate on end //terminate on end
@ -89,7 +90,7 @@ public class TaglessEnvelopeFormat(
if (header.startsWith("META")) { if (header.startsWith("META")) {
//TODO check format //TODO check format
val metaFormat: MetaFormatFactory = JsonMetaFormat val metaFormat: MetaFormatFactory = JsonMetaFormat
meta = metaFormat.readMeta(ByteReadPacket(block)) meta = metaFormat.readMeta(ByteArraySource(block).buffered())
} }
if (header.startsWith("DATA")) { if (header.startsWith("DATA")) {
@ -111,9 +112,9 @@ public class TaglessEnvelopeFormat(
public const val TAGLESS_ENVELOPE_TYPE: String = "tagless" public const val TAGLESS_ENVELOPE_TYPE: String = "tagless"
public val SEPARATOR_PREFIX: ByteArray = "\n#~".encodeToByteArray() public val SEPARATOR_PREFIX: ByteString = "\n#~".encodeToByteString()
public const val TAGLESS_ENVELOPE_HEADER: String = "#~DFTL" public val TAGLESS_ENVELOPE_HEADER: ByteString = "#~DFTL".encodeToByteString()
// public const val META_START_PROPERTY: String = "metaSeparator" // public const val META_START_PROPERTY: String = "metaSeparator"
public const val META_START: String = "#~META" public const val META_START: String = "#~META"
@ -134,21 +135,18 @@ public class TaglessEnvelopeFormat(
override fun readObject(binary: Binary): Envelope = default.run { readObject(binary) } override fun readObject(binary: Binary): Envelope = default.run { readObject(binary) }
override fun writeObject( override fun writeObject(
output: Output, sink: Sink,
obj: Envelope, obj: Envelope,
): Unit = default.run { ): Unit = default.run {
writeObject( writeObject(sink, obj)
output,
obj,
)
} }
override fun readObject(input: Input): Envelope = default.readObject(input) override fun readObject(source: Source): Envelope = default.readObject(source)
override fun peekFormat(io: IOPlugin, binary: Binary): EnvelopeFormat? { override fun peekFormat(io: IOPlugin, binary: Binary): EnvelopeFormat? {
return try { return try {
binary.read { binary.read {
val string = readRawString(TAGLESS_ENVELOPE_HEADER.length) val string = readByteString(TAGLESS_ENVELOPE_HEADER.size)
return@read if (string == TAGLESS_ENVELOPE_HEADER) { return@read if (string == TAGLESS_ENVELOPE_HEADER) {
TaglessEnvelopeFormat(io) TaglessEnvelopeFormat(io)
} else { } else {

View File

@ -1,40 +1,41 @@
package space.kscience.dataforge.io package space.kscience.dataforge.io
import io.ktor.utils.io.bits.Memory import kotlinx.io.*
import io.ktor.utils.io.charsets.Charsets import kotlinx.io.bytestring.ByteString
import io.ktor.utils.io.charsets.decodeExactBytes import kotlinx.io.bytestring.decodeToString
import io.ktor.utils.io.core.* import kotlinx.io.bytestring.encodeToByteString
import io.ktor.utils.io.core.internal.ChunkBuffer
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.misc.DFExperimental import space.kscience.dataforge.misc.DFExperimental
import kotlin.math.min
public fun Output.writeRawString(str: String) { /**
writeFully(str.toByteArray(Charsets.ISO_8859_1)) * Convert a string literal, containing only ASCII characters to a [ByteString].
* Throws an error if there are non-ASCII characters.
*/
public fun String.toAsciiByteString(): ByteString {
val bytes = ByteArray(length) {
val char = get(it)
val code = char.code
if (code > Byte.MAX_VALUE) error("Symbol $char is not ASCII symbol") else code.toByte()
}
return ByteString(bytes)
} }
public fun Output.writeUtf8String(str: String) { public inline fun Buffer(block: Sink.() -> Unit): Buffer = Buffer().apply(block)
writeFully(str.encodeToByteArray())
}
public fun Input.readRawString(size: Int): String { //public fun Source.readSafeUtf8Line(): String = readUTF8Line() ?: error("Line not found")
return Charsets.ISO_8859_1.newDecoder().decodeExactBytes(this, size)
}
public fun Input.readUtf8String(): String = readBytes().decodeToString() public inline fun ByteArray(block: Sink.() -> Unit): ByteArray =
Buffer(block).readByteArray()
public fun Input.readSafeUtf8Line(): String = readUTF8Line() ?: error("Line not found") public inline fun Binary(block: Sink.() -> Unit): Binary =
public inline fun ByteArray(block: Output.() -> Unit): ByteArray =
buildPacket(block).readBytes()
public inline fun Binary(block: Output.() -> Unit): Binary =
ByteArray(block).asBinary() ByteArray(block).asBinary()
public operator fun Binary.get(range: IntRange): Binary = view(range.first, range.last - range.first) public operator fun Binary.get(range: IntRange): Binary = view(range.first, range.last - range.first)
/** /**
* Return inferred [EnvelopeFormat] if only one format could read given file. If no format accepts the binary, return null. If * Return inferred [EnvelopeFormat] if only one format could read given file. If no format accepts the binary, return null. If
* multiple formats accepts binary, throw an error. * multiple formats accept binary, throw an error.
*/ */
public fun IOPlugin.peekBinaryEnvelopeFormat(binary: Binary): EnvelopeFormat? { public fun IOPlugin.peekBinaryEnvelopeFormat(binary: Binary): EnvelopeFormat? {
val formats = envelopeFormatFactories.mapNotNull { factory -> val formats = envelopeFormatFactories.mapNotNull { factory ->
@ -96,74 +97,139 @@ private class RingByteArray(
else -> inputArray.indices.all { inputArray[it] == get(it) } else -> inputArray.indices.all { inputArray[it] == get(it) }
} }
fun contentEquals(byteString: ByteString): Boolean = when {
byteString.size != buffer.size -> false
size < buffer.size -> false
else -> (0 until byteString.size).all { byteString[it] == get(it) }
}
} }
private fun RingByteArray.toArray(): ByteArray = ByteArray(size) { get(it) } private fun RingByteArray.toArray(): ByteArray = ByteArray(size) { get(it) }
/** /**
* Read [Input] into [output] until designated multibyte [separator] and optionally continues until * Read [Source] into [output] until designated multibyte [separator] and optionally continues until
* the end of the line after it. Throw error if [separator] not found and [atMost] bytes are read. * the end of the line after it. Throw error if [separator] not found and [atMost] bytes are read.
* Also fails if [separator] not found until the end of input. * Also fails if [separator] not found until the end of input.
* *
* Separator itself is not read into Output. * The Separator itself is not read into [Sink].
* *
* @param errorOnEof if true error is thrown if separator is never encountered * @param errorOnEof if true error is thrown if separator is never encountered
* *
* @return bytes actually being read, including separator * @return bytes actually being read, including separator
*/ */
public fun Input.readWithSeparatorTo( public fun Source.readWithSeparatorTo(
output: Output, output: Sink?,
separator: ByteArray, separator: ByteString,
atMost: Int = Int.MAX_VALUE, atMost: Int = Int.MAX_VALUE,
errorOnEof: Boolean = false, errorOnEof: Boolean = false,
): Int { ): Int {
var counter = 0 var counter = 0
val rb = RingByteArray(ByteArray(separator.size)) val rb = RingByteArray(ByteArray(separator.size))
takeWhile { buffer ->
while (buffer.canRead()) { while (!exhausted()) {
val byte = buffer.readByte() val byte = readByte()
counter++ counter++
if (counter >= atMost) error("Maximum number of bytes to be read $atMost reached.") if (counter >= atMost) error("Maximum number of bytes to be read $atMost reached.")
rb.push(byte) rb.push(byte)
if (rb.contentEquals(separator)) { if (rb.contentEquals(separator)) {
return counter return counter
} else if (rb.isFull()) { } else if (rb.isFull()) {
output.writeByte(rb[0]) output?.writeByte(rb[0])
} }
} }
!endOfInput
}
if (errorOnEof) { if (errorOnEof) {
error("Read to the end of input without encountering ${separator.decodeToString()}") error("Read to the end of input without encountering ${separator.decodeToString()}")
} else { } else {
for (i in 1 until rb.size) { for (i in 1 until rb.size) {
output.writeByte(rb[i]) output?.writeByte(rb[i])
} }
counter += (rb.size - 1) counter += (rb.size - 1)
return counter return counter
} }
} }
public fun Input.discardLine(): Int { /**
return discardUntilDelimiter('\n'.code.toByte()).also { * Discard all bytes until [separator] is encountered. Separator is discarded sa well.
discard(1) * Return the total number of bytes read.
}.toInt() + 1 */
} public fun Source.discardWithSeparator(
separator: ByteString,
public fun Input.discardWithSeparator(
separator: ByteArray,
atMost: Int = Int.MAX_VALUE, atMost: Int = Int.MAX_VALUE,
errorOnEof: Boolean = false, errorOnEof: Boolean = false,
): Int { ): Int = readWithSeparatorTo(null, separator, atMost, errorOnEof)
val dummy: Output = object : Output(ChunkBuffer.Pool) {
override fun closeDestination() { /**
* Discard all symbol until newline is discovered. Carriage return is not discarded.
*/
public fun Source.discardLine(
atMost: Int = Int.MAX_VALUE,
errorOnEof: Boolean = false,
): Int = discardWithSeparator("\n".encodeToByteString(), atMost, errorOnEof)
/**
* A [Source] based on [ByteArray]
*/
public class ByteArraySource(
private val byteArray: ByteArray,
private val offset: Int = 0,
private val size: Int = byteArray.size - offset,
) : RawSource {
init {
require(offset >= 0) { "Offset must be positive" }
require(offset + size <= byteArray.size) { "End index is ${offset + size}, but the array size is ${byteArray.size}" }
}
private var pointer = offset
override fun close() {
// Do nothing // Do nothing
} }
override fun flush(source: Memory, offset: Int, length: Int) { override fun readAtMostTo(sink: Buffer, byteCount: Long): Long {
// Do nothing if (pointer == offset + size) return -1
val byteRead = min(byteCount.toInt(), (size + offset - pointer))
sink.write(byteArray, pointer, pointer + byteRead)
pointer += byteRead
return byteRead.toLong()
} }
} }
return readWithSeparatorTo(dummy, separator, atMost, errorOnEof) /**
* A [Source] based on [String]
*/
public class StringSource(
public val string: String,
public val offset: Int = 0,
public val size: Int = string.length - offset,
) : RawSource {
private var pointer = offset
override fun close() {
// Do nothing
} }
override fun readAtMostTo(sink: Buffer, byteCount: Long): Long {
if (pointer == offset + size) return -1
val byteRead = min(byteCount.toInt(), (size + offset - pointer))
sink.writeString(string, pointer, pointer + byteRead)
pointer += byteRead
return byteRead.toLong()
}
}
public fun Sink.writeDouble(value: Double) {
writeLong(value.toBits())
}
public fun Source.readDouble(): Double = Double.fromBits(readLong())
public fun Sink.writeFloat(value: Float) {
writeInt(value.toBits())
}
public fun Source.readFloat(): Float = Float.fromBits(readInt())

View File

@ -1,6 +1,5 @@
package space.kscience.dataforge.io package space.kscience.dataforge.io
import io.ktor.utils.io.core.readInt
import kotlin.test.Test import kotlin.test.Test
import kotlin.test.assertEquals import kotlin.test.assertEquals

View File

@ -1,6 +1,7 @@
package space.kscience.dataforge.io package space.kscience.dataforge.io
import io.ktor.utils.io.core.readBytes import kotlinx.io.readByteArray
import kotlinx.io.writeString
import kotlin.test.Test import kotlin.test.Test
import kotlin.test.assertEquals import kotlin.test.assertEquals
@ -12,7 +13,7 @@ class EnvelopeFormatTest {
"d" put 22.2 "d" put 22.2
} }
data { data {
writeUtf8String("12345678") writeString("12345678")
} }
} }
@ -22,7 +23,7 @@ class EnvelopeFormatTest {
val res = readFromByteArray(byteArray) val res = readFromByteArray(byteArray)
assertEquals(envelope.meta, res.meta) assertEquals(envelope.meta, res.meta)
val bytes = res.data?.read { val bytes = res.data?.read {
readBytes() readByteArray()
} }
assertEquals("12345678", bytes?.decodeToString()) assertEquals("12345678", bytes?.decodeToString())
} }
@ -34,7 +35,7 @@ class EnvelopeFormatTest {
val res = readFromByteArray(byteArray) val res = readFromByteArray(byteArray)
assertEquals(envelope.meta, res.meta) assertEquals(envelope.meta, res.meta)
val bytes = res.data?.read { val bytes = res.data?.read {
readBytes() readByteArray()
} }
assertEquals("12345678", bytes?.decodeToString()) assertEquals("12345678", bytes?.decodeToString())
} }
@ -56,7 +57,7 @@ class EnvelopeFormatTest {
val res = TaglessEnvelopeFormat.readFromByteArray(envelopeString.encodeToByteArray()) val res = TaglessEnvelopeFormat.readFromByteArray(envelopeString.encodeToByteArray())
assertEquals(envelope.meta, res.meta) assertEquals(envelope.meta, res.meta)
val bytes = res.data?.read { val bytes = res.data?.read {
readBytes() readByteArray()
} }
assertEquals("12345678", bytes?.decodeToString()) assertEquals("12345678", bytes?.decodeToString())
} }

View File

@ -1,8 +1,9 @@
package space.kscience.dataforge.io package space.kscience.dataforge.io
import io.ktor.utils.io.core.ByteReadPacket import kotlinx.io.buffered
import io.ktor.utils.io.core.readBytes import kotlinx.io.bytestring.encodeToByteString
import io.ktor.utils.io.core.readUTF8Line import kotlinx.io.readByteArray
import kotlinx.io.readLine
import kotlin.test.Test import kotlin.test.Test
import kotlin.test.assertEquals import kotlin.test.assertEquals
import kotlin.test.assertFails import kotlin.test.assertFails
@ -11,9 +12,9 @@ class IOTest {
@Test @Test
fun readBytes() { fun readBytes() {
val bytes = ByteArray(8) { it.toByte() } val bytes = ByteArray(8) { it.toByte() }
val input = ByteReadPacket(bytes) val input = ByteArraySource(bytes).buffered()
@Suppress("UNUSED_VARIABLE") val first = input.readBytes(4) @Suppress("UNUSED_VARIABLE") val first = input.readByteArray(4)
val second = input.readBytes(4) val second = input.readByteArray(4)
assertEquals(4.toByte(), second[0]) assertEquals(4.toByte(), second[0])
} }
@ -31,25 +32,25 @@ class IOTest {
binary.read { binary.read {
val array = ByteArray { val array = ByteArray {
val read = readWithSeparatorTo(this, "---".encodeToByteArray()) + discardLine() val read = readWithSeparatorTo(this, "---".encodeToByteString()) + discardLine()
assertEquals(12, read) assertEquals(12, read)
} }
assertEquals(""" assertEquals("""
aaa aaa
bbb bbb
""".trimIndent(),array.decodeToString().trim()) """.trimIndent(),array.decodeToString().trim())
assertEquals("ccc", readUTF8Line()?.trim()) assertEquals("ccc", readLine()?.trim())
} }
assertFails { assertFails {
binary.read { binary.read {
discardWithSeparator("---".encodeToByteArray(), atMost = 3 ) discardWithSeparator("---".encodeToByteString(), atMost = 3 )
} }
} }
assertFails { assertFails {
binary.read{ binary.read{
discardWithSeparator("-+-".encodeToByteArray(), errorOnEof = true) discardWithSeparator("-+-".encodeToByteString(), errorOnEof = true)
} }
} }

View File

@ -1,5 +1,6 @@
package space.kscience.dataforge.io package space.kscience.dataforge.io
import kotlinx.io.writeString
import space.kscience.dataforge.context.Global import space.kscience.dataforge.context.Global
import space.kscience.dataforge.meta.get import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.int import space.kscience.dataforge.meta.int
@ -18,9 +19,9 @@ class MultipartTest {
"value" put it "value" put it
} }
data { data {
writeUtf8String("Hello World $it") writeString("Hello World $it")
repeat(300) { repeat(300) {
writeRawString("$it ") writeString("$it ")
} }
} }
} }

View File

@ -1,12 +1,11 @@
package space.kscience.dataforge.io package space.kscience.dataforge.io
import io.ktor.utils.io.core.ByteReadPacket import kotlinx.io.buffered
import io.ktor.utils.io.core.use
fun <T : Any> IOFormat<T>.writeToByteArray(obj: T): ByteArray = ByteArray { fun <T : Any> IOFormat<T>.writeToByteArray(obj: T): ByteArray = ByteArray {
writeObject(this, obj) writeObject(this, obj)
} }
fun <T : Any> IOFormat<T>.readFromByteArray(array: ByteArray): T = ByteReadPacket(array).use { fun <T : Any> IOFormat<T>.readFromByteArray(array: ByteArray): T = ByteArraySource(array).buffered().use {
readObject(it) readObject(it)
} }

View File

@ -1,8 +1,11 @@
package space.kscience.dataforge.io package space.kscience.dataforge.io
import io.ktor.utils.io.core.*
import io.ktor.utils.io.streams.asOutput
import kotlinx.coroutines.runBlocking import kotlinx.coroutines.runBlocking
import kotlinx.io.Sink
import kotlinx.io.Source
import kotlinx.io.asSink
import kotlinx.io.buffered
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.descriptors.MetaDescriptor import space.kscience.dataforge.meta.descriptors.MetaDescriptor
import space.kscience.dataforge.meta.isEmpty import space.kscience.dataforge.meta.isEmpty
@ -23,18 +26,18 @@ internal class PathBinary(
override val size: Int = Files.size(path).toInt() - fileOffset, override val size: Int = Files.size(path).toInt() - fileOffset,
) : Binary { ) : Binary {
override fun <R> read(offset: Int, atMost: Int, block: Input.() -> R): R = runBlocking { override fun <R> read(offset: Int, atMost: Int, block: Source.() -> R): R = runBlocking {
readSuspend(offset, atMost, block) readSuspend(offset, atMost, block)
} }
override suspend fun <R> readSuspend(offset: Int, atMost: Int, block: suspend Input.() -> R): R { override suspend fun <R> readSuspend(offset: Int, atMost: Int, block: suspend Source.() -> R): R {
val actualOffset = offset + fileOffset val actualOffset = offset + fileOffset
val actualSize = min(atMost, size - offset) val actualSize = min(atMost, size - offset)
val array = path.inputStream().use { val array = path.inputStream().use {
it.skip(actualOffset.toLong()) it.skip(actualOffset.toLong())
it.readNBytes(actualSize) it.readNBytes(actualSize)
} }
return ByteReadPacket(array).block() return ByteArraySource(array).buffered().use { it.block() }
} }
override fun view(offset: Int, binarySize: Int) = PathBinary(path, fileOffset + offset, binarySize) override fun view(offset: Int, binarySize: Int) = PathBinary(path, fileOffset + offset, binarySize)
@ -42,36 +45,36 @@ internal class PathBinary(
public fun Path.asBinary(): Binary = PathBinary(this) public fun Path.asBinary(): Binary = PathBinary(this)
public fun <R> Path.read(block: Input.() -> R): R = asBinary().read(block = block) public fun <R> Path.read(block: Source.() -> R): R = asBinary().read(block = block)
/** /**
* Write a live output to a newly created file. If file does not exist, throws error * Write a live output to a newly created file. If file does not exist, throws error
*/ */
public fun Path.write(block: Output.() -> Unit): Unit { public fun Path.write(block: Sink.() -> Unit): Unit {
val stream = Files.newOutputStream(this, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW) val stream = Files.newOutputStream(this, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW)
stream.asOutput().use(block) stream.asSink().buffered().use(block)
} }
/** /**
* Create a new file or append to exiting one with given output [block] * Create a new file or append to exiting one with given output [block]
*/ */
public fun Path.append(block: Output.() -> Unit): Unit { public fun Path.append(block: Sink.() -> Unit): Unit {
val stream = Files.newOutputStream( val stream = Files.newOutputStream(
this, this,
StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE
) )
stream.asOutput().use(block) stream.asSink().buffered().use(block)
} }
/** /**
* Create a new file or replace existing one using given output [block] * Create a new file or replace existing one using given output [block]
*/ */
public fun Path.rewrite(block: Output.() -> Unit): Unit { public fun Path.rewrite(block: Sink.() -> Unit): Unit {
val stream = Files.newOutputStream( val stream = Files.newOutputStream(
this, this,
StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.CREATE StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.CREATE
) )
stream.asOutput().use(block) stream.asSink().buffered().use(block)
} }
@DFExperimental @DFExperimental
@ -260,7 +263,7 @@ public fun IOPlugin.writeEnvelopeDirectory(
val dataFile = path.resolve(IOPlugin.DATA_FILE_NAME) val dataFile = path.resolve(IOPlugin.DATA_FILE_NAME)
dataFile.write { dataFile.write {
envelope.data?.read { envelope.data?.read {
val copied = copyTo(this@write) val copied = transferTo(this@write)
if (copied != envelope.data?.size?.toLong()) { if (copied != envelope.data?.size?.toLong()) {
error("The number of copied bytes does not equal data size") error("The number of copied bytes does not equal data size")
} }

View File

@ -1,9 +1,11 @@
package space.kscience.dataforge.io package space.kscience.dataforge.io
import io.ktor.utils.io.core.Input import kotlinx.io.Source
import io.ktor.utils.io.streams.asInput import kotlinx.io.asSource
import kotlinx.io.buffered
public fun IOPlugin.resource(name: String): Binary? = context.javaClass.getResource(name)?.readBytes()?.asBinary()
public inline fun <R> IOPlugin.readResource(name: String, block: Input.() -> R): R = public fun IOPlugin.resource(name: String): Binary? = { }.javaClass.getResource(name)?.readBytes()?.asBinary()
context.javaClass.getResource(name)?.openStream()?.asInput()?.block() ?: error("Can't read resource $name")
public inline fun <R> IOPlugin.readResource(name: String, block: Source.() -> R): R =
{ }.javaClass.getResource(name)?.openStream()?.asSource()?.buffered()?.block() ?: error("Can't read resource $name")

View File

@ -1,6 +1,5 @@
package space.kscience.dataforge.io package space.kscience.dataforge.io
import io.ktor.utils.io.core.writeDouble
import space.kscience.dataforge.context.Global import space.kscience.dataforge.context.Global
import space.kscience.dataforge.misc.DFExperimental import space.kscience.dataforge.misc.DFExperimental
import java.nio.file.Files import java.nio.file.Files

View File

@ -1,6 +1,5 @@
package space.kscience.dataforge.io package space.kscience.dataforge.io
import io.ktor.utils.io.core.writeDouble
import space.kscience.dataforge.context.Global import space.kscience.dataforge.context.Global
import space.kscience.dataforge.misc.DFExperimental import space.kscience.dataforge.misc.DFExperimental
import java.nio.file.Files import java.nio.file.Files

View File

@ -10,17 +10,35 @@ kscience{
useSerialization{ useSerialization{
protobuf() protobuf()
} }
dependencies { // dependencies {
api(projects.dataforgeContext) // api(projects.dataforgeContext)
api(projects.dataforgeData) // api(projects.dataforgeData)
api(projects.dataforgeIo) // api(projects.dataforgeIo)
} // }
dependencies(jvmTest){ // dependencies(jvmTest){
implementation(spclibs.logback.classic) // implementation(spclibs.logback.classic)
implementation(projects.dataforgeIo.dataforgeIoYaml) // implementation(projects.dataforgeIo.dataforgeIoYaml)
} // }
} }
readme{ readme{
maturity = space.kscience.gradle.Maturity.EXPERIMENTAL maturity = space.kscience.gradle.Maturity.EXPERIMENTAL
} }
kotlin{
sourceSets{
commonMain{
dependencies {
api(projects.dataforgeContext)
api(projects.dataforgeData)
api(projects.dataforgeIo)
}
}
getByName("jvmTest"){
dependencies {
implementation(spclibs.logback.classic)
implementation(projects.dataforgeIo.dataforgeIoYaml)
}
}
}
}

View File

@ -1,9 +1,6 @@
package space.kscience.dataforge.workspace package space.kscience.dataforge.workspace
import io.ktor.utils.io.core.Input import kotlinx.io.*
import io.ktor.utils.io.core.Output
import io.ktor.utils.io.core.readBytes
import io.ktor.utils.io.core.writeFully
import kotlinx.serialization.ExperimentalSerializationApi import kotlinx.serialization.ExperimentalSerializationApi
import kotlinx.serialization.KSerializer import kotlinx.serialization.KSerializer
import kotlinx.serialization.json.Json import kotlinx.serialization.json.Json
@ -30,10 +27,10 @@ public class JsonIOFormat<T : Any>(override val type: KType) : IOFormat<T> {
@Suppress("UNCHECKED_CAST") @Suppress("UNCHECKED_CAST")
private val serializer: KSerializer<T> = serializer(type) as KSerializer<T> private val serializer: KSerializer<T> = serializer(type) as KSerializer<T>
override fun readObject(input: Input): T = Json.decodeFromString(serializer, input.readUtf8String()) override fun readObject(input: Source): T = Json.decodeFromString(serializer, input.readString())
override fun writeObject(output: Output, obj: T) { override fun writeObject(output: Sink, obj: T) {
output.writeUtf8String(Json.encodeToString(serializer, obj)) output.writeString(Json.encodeToString(serializer, obj))
} }
} }
@ -43,10 +40,10 @@ public class ProtobufIOFormat<T : Any>(override val type: KType) : IOFormat<T> {
@Suppress("UNCHECKED_CAST") @Suppress("UNCHECKED_CAST")
private val serializer: KSerializer<T> = serializer(type) as KSerializer<T> private val serializer: KSerializer<T> = serializer(type) as KSerializer<T>
override fun readObject(input: Input): T = ProtoBuf.decodeFromByteArray(serializer, input.readBytes()) override fun readObject(input: Source): T = ProtoBuf.decodeFromByteArray(serializer, input.readByteArray())
override fun writeObject(output: Output, obj: T) { override fun writeObject(output: Sink, obj: T) {
output.writeFully(ProtoBuf.encodeToByteArray(serializer, obj)) output.write(ProtoBuf.encodeToByteArray(serializer, obj))
} }
} }

View File

@ -1,13 +1,10 @@
package space.kscience.dataforge.workspace package space.kscience.dataforge.workspace
import io.ktor.utils.io.streams.asOutput
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext import kotlinx.coroutines.withContext
import space.kscience.dataforge.data.DataTree import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.data.DataTreeItem import space.kscience.dataforge.data.DataTreeItem
import space.kscience.dataforge.io.EnvelopeFormat import space.kscience.dataforge.io.*
import space.kscience.dataforge.io.IOFormat
import space.kscience.dataforge.io.TaggedEnvelopeFormat
import space.kscience.dataforge.misc.DFExperimental import space.kscience.dataforge.misc.DFExperimental
import java.nio.file.Files import java.nio.file.Files
import java.nio.file.Path import java.nio.file.Path
@ -28,11 +25,15 @@ private suspend fun <T : Any> ZipOutputStream.writeNode(
val envelope = treeItem.data.toEnvelope(dataFormat) val envelope = treeItem.data.toEnvelope(dataFormat)
val entry = ZipEntry(name) val entry = ZipEntry(name)
putNextEntry(entry) putNextEntry(entry)
asOutput().run {
envelopeFormat.writeObject(this, envelope) //TODO remove additional copy
flush() val bytes = ByteArray {
writeObject(envelopeFormat, envelope)
} }
write(bytes)
} }
is DataTreeItem.Node -> { is DataTreeItem.Node -> {
val entry = ZipEntry("$name/") val entry = ZipEntry("$name/")
putNextEntry(entry) putNextEntry(entry)

View File

@ -1,8 +1,11 @@
package space.kscience.dataforge.workspace package space.kscience.dataforge.workspace
import io.ktor.utils.io.core.Input
import io.ktor.utils.io.core.Output
import kotlinx.coroutines.runBlocking import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.runTest
import kotlinx.io.Sink
import kotlinx.io.Source
import kotlinx.io.readString
import kotlinx.io.writeString
import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Global import space.kscience.dataforge.context.Global
import space.kscience.dataforge.data.* import space.kscience.dataforge.data.*
@ -36,11 +39,11 @@ class FileDataTest {
object StringIOFormat : IOFormat<String> { object StringIOFormat : IOFormat<String> {
override val type: KType get() = typeOf<String>() override val type: KType get() = typeOf<String>()
override fun writeObject(output: Output, obj: String) { override fun writeObject(output: Sink, obj: String) {
output.writeUtf8String(obj) output.writeString(obj)
} }
override fun readObject(input: Input): String = input.readUtf8String() override fun readObject(input: Source): String = input.readString()
} }
@Test @Test
@ -59,9 +62,9 @@ class FileDataTest {
@Test @Test
@DFExperimental @DFExperimental
fun testZipWriteRead() = with(Global.io) { fun testZipWriteRead() = runTest {
with(Global.io) {
val zip = Files.createTempFile("df_data_node", ".zip") val zip = Files.createTempFile("df_data_node", ".zip")
runBlocking {
dataNode.writeZip(zip, StringIOFormat) dataNode.writeZip(zip, StringIOFormat)
println(zip.toUri().toString()) println(zip.toUri().toString())
val reconstructed = readDataDirectory(zip) { _, _ -> StringIOFormat } val reconstructed = readDataDirectory(zip) { _, _ -> StringIOFormat }

View File

@ -6,4 +6,5 @@ kotlin.mpp.stability.nowarn=true
kotlin.incremental.js.ir=true kotlin.incremental.js.ir=true
kotlin.native.ignoreDisabledTargets=true kotlin.native.ignoreDisabledTargets=true
toolsVersion=0.14.9-kotlin-1.8.20 toolsVersion=0.14.9-kotlin-1.9.0
kotlin.experimental.tryK2=true

View File

@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.2-bin.zip distributionUrl=https\://services.gradle.org/distributions/gradle-8.2.1-bin.zip
zipStoreBase=GRADLE_USER_HOME zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists zipStorePath=wrapper/dists