From 2aba1b48dce011906231ba5ab67353f9901cadfa Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Mon, 10 Jul 2023 10:06:39 +0300 Subject: [PATCH] [WIP] Refactoring to kotlinx-io --- CHANGELOG.md | 1 + dataforge-io/build.gradle.kts | 14 ++-- .../space/kscience/dataforge/io/Binary.kt | 58 +++++++++----- .../kscience/dataforge/io/EnvelopeBuilder.kt | 4 +- .../kscience/dataforge/io/EnvelopeFormat.kt | 4 +- .../kscience/dataforge/io/EnvelopeParts.kt | 15 ++-- .../space/kscience/dataforge/io/IOFormat.kt | 26 ++++--- .../space/kscience/dataforge/io/ioMisc.kt | 75 +++++++++---------- 8 files changed, 113 insertions(+), 84 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ac4a5574..ac3db029 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ ### Added ### Changed +- Kotlin 1.9 ### Deprecated diff --git a/dataforge-io/build.gradle.kts b/dataforge-io/build.gradle.kts index 91319d4d..94aefa51 100644 --- a/dataforge-io/build.gradle.kts +++ b/dataforge-io/build.gradle.kts @@ -1,22 +1,24 @@ -import space.kscience.gradle.KScienceVersions - plugins { id("space.kscience.gradle.mpp") } description = "IO module" +val ioVersion = "0.2.0" + kscience { jvm() js() native() - useSerialization("1.4.1") - useSerialization("1.4.1", sourceSet = space.kscience.gradle.DependencySourceSet.TEST) { + useSerialization() + useSerialization(sourceSet = space.kscience.gradle.DependencySourceSet.TEST) { cbor() } dependencies { - api(project(":dataforge-context")) - api("io.ktor:ktor-io:${KScienceVersions.ktorVersion}") + api(projects.dataforgeContext) + api("org.jetbrains.kotlinx:kotlinx-io-core:$ioVersion") + api("org.jetbrains.kotlinx:kotlinx-io-bytestring:$ioVersion") + //api("io.ktor:ktor-io:${KScienceVersions.ktorVersion}") } } diff --git a/dataforge-io/src/commonMain/kotlin/space/kscience/dataforge/io/Binary.kt b/dataforge-io/src/commonMain/kotlin/space/kscience/dataforge/io/Binary.kt index bcc20064..c4e3bfbe 100644 --- a/dataforge-io/src/commonMain/kotlin/space/kscience/dataforge/io/Binary.kt +++ b/dataforge-io/src/commonMain/kotlin/space/kscience/dataforge/io/Binary.kt @@ -1,6 +1,6 @@ package space.kscience.dataforge.io -import io.ktor.utils.io.core.* +import kotlinx.io.* import kotlin.math.min /** @@ -17,13 +17,13 @@ public interface Binary { * Read maximum of [atMost] bytes as input from the binary, starting at [offset]. The generated input is always closed * when leaving scope, so it could not be leaked outside of scope of [block]. */ - public fun read(offset: Int = 0, atMost: Int = size - offset, block: Input.() -> R): R + public fun read(offset: Int = 0, atMost: Int = size - offset, block: Source.() -> R): R - public suspend fun readSuspend(offset: Int = 0, atMost: Int = size - offset, block: suspend Input.() -> R): R + public suspend fun readSuspend(offset: Int = 0, atMost: Int = size - offset, block: suspend Source.() -> R): R /** * Read a binary with given [offset] relative to this binary and given [binarySize]. - * In general, resulting binary is of the same type as this one, but it is not guaranteed. + * In general, the resulting binary is of the same type as this one, but it is not guaranteed. */ public fun view(offset: Int, binarySize: Int = size - offset): Binary @@ -32,31 +32,55 @@ public interface Binary { } } +public class ByteArraySource( + public val byteArray: ByteArray, + public val offset: Int, + public val size: Int, +) : RawSource { + override fun close() { + // Do nothing + } + + override fun readAtMostTo(sink: Buffer, byteCount: Long): Long { + val byteRead = min(byteCount, size.toLong()) + sink.write(byteArray, offset, offset + byteRead.toInt()) + return byteRead + } +} + internal class ByteArrayBinary( internal val array: ByteArray, internal val start: Int = 0, override val size: Int = array.size - start, ) : Binary { - override fun read(offset: Int, atMost: Int, block: Input.() -> R): R { + override fun read(offset: Int, atMost: Int, block: Source.() -> R): R { require(offset >= 0) { "Offset must be positive" } require(offset < array.size) { "Offset $offset is larger than array size" } - val input = ByteReadPacket( + + val input = ByteArraySource( array, offset + start, min(atMost, size - offset) - ) - return input.use(block) + ).buffered() + + return try { + block(input) + } finally { + input.close() + } } - override suspend fun readSuspend(offset: Int, atMost: Int, block: suspend Input.() -> R): R { + override suspend fun readSuspend(offset: Int, atMost: Int, block: suspend Source.() -> R): R { require(offset >= 0) { "Offset must be positive" } require(offset < array.size) { "Offset $offset is larger than array size" } - val input = ByteReadPacket( + + val input = ByteArraySource( array, offset + start, min(atMost, size - offset) - ) + ).buffered() + return try { block(input) } finally { @@ -77,26 +101,26 @@ public fun Binary.toByteArray(): ByteArray = if (this is ByteArrayBinary) { array.copyOfRange(start, start + size) // TODO do we need to ensure data safety here? } else { read { - readBytes() + readByteArray() } } //TODO optimize for file-based Inputs -public fun Input.readBinary(size: Int? = null): Binary { - val array = if (size == null) readBytes() else readBytes(size) +public fun Source.readBinary(size: Int? = null): Binary { + val array = if (size == null) readByteArray() else readByteArray(size) return ByteArrayBinary(array) } /** * Direct write of binary to the output. Returns the number of bytes written */ -public fun Output.writeBinary(binary: Binary): Int { +public fun Sink.writeBinary(binary: Binary): Int { return if (binary is ByteArrayBinary) { - writeFully(binary.array, binary.start, binary.start + binary.size) + write(binary.array, binary.start, binary.start + binary.size) binary.size } else { binary.read { - copyTo(this@writeBinary).toInt() + transferTo(this@writeBinary).toInt() } } } \ No newline at end of file diff --git a/dataforge-io/src/commonMain/kotlin/space/kscience/dataforge/io/EnvelopeBuilder.kt b/dataforge-io/src/commonMain/kotlin/space/kscience/dataforge/io/EnvelopeBuilder.kt index eedd2075..e38df146 100644 --- a/dataforge-io/src/commonMain/kotlin/space/kscience/dataforge/io/EnvelopeBuilder.kt +++ b/dataforge-io/src/commonMain/kotlin/space/kscience/dataforge/io/EnvelopeBuilder.kt @@ -1,6 +1,6 @@ package space.kscience.dataforge.io -import io.ktor.utils.io.core.Output +import kotlinx.io.Sink import space.kscience.dataforge.meta.* public class EnvelopeBuilder : Envelope { @@ -33,7 +33,7 @@ public class EnvelopeBuilder : Envelope { /** * Construct a data binary from given builder */ - public inline fun data(block: Output.() -> Unit) { + public inline fun data(block: Sink.() -> Unit) { data = ByteArray { block() }.asBinary() } diff --git a/dataforge-io/src/commonMain/kotlin/space/kscience/dataforge/io/EnvelopeFormat.kt b/dataforge-io/src/commonMain/kotlin/space/kscience/dataforge/io/EnvelopeFormat.kt index 28b59abb..d748d9f8 100644 --- a/dataforge-io/src/commonMain/kotlin/space/kscience/dataforge/io/EnvelopeFormat.kt +++ b/dataforge-io/src/commonMain/kotlin/space/kscience/dataforge/io/EnvelopeFormat.kt @@ -1,6 +1,6 @@ package space.kscience.dataforge.io -import io.ktor.utils.io.core.Input +import kotlinx.io.Source import space.kscience.dataforge.context.Context import space.kscience.dataforge.io.EnvelopeFormatFactory.Companion.ENVELOPE_FORMAT_TYPE import space.kscience.dataforge.meta.Meta @@ -15,7 +15,7 @@ public interface EnvelopeFormat : IOFormat { override val type: KType get() = typeOf() } -public fun EnvelopeFormat.read(input: Input): Envelope = readObject(input) +public fun EnvelopeFormat.read(input: Source): Envelope = readObject(input) @Type(ENVELOPE_FORMAT_TYPE) public interface EnvelopeFormatFactory : IOFormatFactory, EnvelopeFormat { diff --git a/dataforge-io/src/commonMain/kotlin/space/kscience/dataforge/io/EnvelopeParts.kt b/dataforge-io/src/commonMain/kotlin/space/kscience/dataforge/io/EnvelopeParts.kt index 248ecf19..0d3017d6 100644 --- a/dataforge-io/src/commonMain/kotlin/space/kscience/dataforge/io/EnvelopeParts.kt +++ b/dataforge-io/src/commonMain/kotlin/space/kscience/dataforge/io/EnvelopeParts.kt @@ -1,5 +1,8 @@ package space.kscience.dataforge.io +import kotlinx.io.bytestring.ByteString +import kotlinx.io.bytestring.decodeToString +import kotlinx.io.write import space.kscience.dataforge.io.Envelope.Companion.ENVELOPE_NODE_KEY import space.kscience.dataforge.io.PartDescriptor.Companion.DEFAULT_MULTIPART_DATA_SEPARATOR import space.kscience.dataforge.io.PartDescriptor.Companion.MULTIPART_DATA_TYPE @@ -20,7 +23,7 @@ private class PartDescriptor : Scheme() { val PARTS_KEY = MULTIPART_KEY + "parts" val SEPARATOR_KEY = MULTIPART_KEY + "separator" - const val DEFAULT_MULTIPART_DATA_SEPARATOR = "\r\n#~PART~#\r\n" + val DEFAULT_MULTIPART_DATA_SEPARATOR = "\r\n#~PART~#\r\n".toACIIByteString() const val MULTIPART_DATA_TYPE = "envelope.multipart" } @@ -32,12 +35,12 @@ public typealias EnvelopeParts = List public fun EnvelopeBuilder.multipart( parts: EnvelopeParts, - separator: String = DEFAULT_MULTIPART_DATA_SEPARATOR, + separator: ByteString = DEFAULT_MULTIPART_DATA_SEPARATOR, ) { dataType = MULTIPART_DATA_TYPE var offsetCounter = 0 - val separatorSize = separator.length + val separatorSize = separator.size val partDescriptors = parts.map { (binary, description) -> offsetCounter += separatorSize PartDescriptor { @@ -51,14 +54,14 @@ public fun EnvelopeBuilder.multipart( meta { if (separator != DEFAULT_MULTIPART_DATA_SEPARATOR) { - SEPARATOR_KEY put separator + SEPARATOR_KEY put separator.decodeToString() } setIndexed(PARTS_KEY, partDescriptors.map { it.toMeta() }) } data { parts.forEach { - writeRawString(separator) + write(separator) writeBinary(it.binary) } } @@ -69,7 +72,7 @@ public fun EnvelopeBuilder.multipart( */ public fun EnvelopeBuilder.envelopes( envelopes: List, - separator: String = DEFAULT_MULTIPART_DATA_SEPARATOR, + separator: ByteString = DEFAULT_MULTIPART_DATA_SEPARATOR, ) { val parts = envelopes.map { val binary = Binary(it, TaggedEnvelopeFormat) diff --git a/dataforge-io/src/commonMain/kotlin/space/kscience/dataforge/io/IOFormat.kt b/dataforge-io/src/commonMain/kotlin/space/kscience/dataforge/io/IOFormat.kt index 12ea587b..623260e9 100644 --- a/dataforge-io/src/commonMain/kotlin/space/kscience/dataforge/io/IOFormat.kt +++ b/dataforge-io/src/commonMain/kotlin/space/kscience/dataforge/io/IOFormat.kt @@ -1,6 +1,8 @@ package space.kscience.dataforge.io -import io.ktor.utils.io.core.* +import kotlinx.io.Sink +import kotlinx.io.Source +import kotlinx.io.readByteArray import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Factory import space.kscience.dataforge.io.IOFormatFactory.Companion.IO_FORMAT_TYPE @@ -21,7 +23,7 @@ public interface IOReader { */ public val type: KType - public fun readObject(input: Input): T + public fun readObject(input: Source): T public fun readObject(binary: Binary): T = binary.read { readObject(this) } @@ -32,21 +34,21 @@ public interface IOReader { public val binary: IOReader = object : IOReader { override val type: KType = typeOf() - override fun readObject(input: Input): Binary = input.readBytes().asBinary() + override fun readObject(input: Source): Binary = input.readByteArray().asBinary() override fun readObject(binary: Binary): Binary = binary } } } -public inline fun IOReader(crossinline read: Input.() -> T): IOReader = object : IOReader { +public inline fun IOReader(crossinline read: Source.() -> T): IOReader = object : IOReader { override val type: KType = typeOf() - override fun readObject(input: Input): T = input.read() + override fun readObject(input: Source): T = input.read() } public fun interface IOWriter { - public fun writeObject(output: Output, obj: T) + public fun writeObject(output: Sink, obj: T) } /** @@ -54,20 +56,20 @@ public fun interface IOWriter { */ public interface IOFormat : IOReader, IOWriter -public fun Input.readObject(format: IOReader): T = format.readObject(this@readObject) +public fun Source.readObject(format: IOReader): T = format.readObject(this@readObject) public fun IOFormat.readObjectFrom(binary: Binary): T = binary.read { readObject(this) } /** - * Read given binary as object using given format + * Read given binary as an object using given format */ public fun Binary.readWith(format: IOReader): T = read { readObject(format) } -public fun Output.writeObject(format: IOWriter, obj: T): Unit = +public fun Sink.writeObject(format: IOWriter, obj: T): Unit = format.writeObject(this@writeObject, obj) @@ -94,9 +96,9 @@ public object DoubleIOFormat : IOFormat, IOFormatFactory { override val type: KType get() = typeOf() - override fun writeObject(output: Output, obj: Double) { - output.writeDouble(obj) + override fun writeObject(output: Sink, obj: Double) { + output.writeLong(obj.toBits()) } - override fun readObject(input: Input): Double = input.readDouble() + override fun readObject(input: Source): Double = Double.fromBits(input.readLong()) } \ No newline at end of file diff --git a/dataforge-io/src/commonMain/kotlin/space/kscience/dataforge/io/ioMisc.kt b/dataforge-io/src/commonMain/kotlin/space/kscience/dataforge/io/ioMisc.kt index ad03ba6e..c0132895 100644 --- a/dataforge-io/src/commonMain/kotlin/space/kscience/dataforge/io/ioMisc.kt +++ b/dataforge-io/src/commonMain/kotlin/space/kscience/dataforge/io/ioMisc.kt @@ -1,40 +1,38 @@ package space.kscience.dataforge.io -import io.ktor.utils.io.bits.Memory -import io.ktor.utils.io.charsets.Charsets -import io.ktor.utils.io.charsets.decodeExactBytes -import io.ktor.utils.io.core.* -import io.ktor.utils.io.core.internal.ChunkBuffer +import kotlinx.io.* +import kotlinx.io.bytestring.ByteString import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.misc.DFExperimental -public fun Output.writeRawString(str: String) { - writeFully(str.toByteArray(Charsets.ISO_8859_1)) +/** + * Convert a string literal, containing only ASCII characters to a [ByteString]. + * Throws an error if there are non-ASCII characters. + */ +public fun String.toACIIByteString(): ByteString { + val bytes = ByteArray(length) { + val char = get(it) + val code = char.code + if (code > Byte.MAX_VALUE) error("Symbol $char is not ASCII symbol") else code.toByte() + } + return ByteString(bytes) } -public fun Output.writeUtf8String(str: String) { - writeFully(str.encodeToByteArray()) -} +public inline fun Buffer(block: Sink.() -> Unit): Buffer = Buffer().apply(block) -public fun Input.readRawString(size: Int): String { - return Charsets.ISO_8859_1.newDecoder().decodeExactBytes(this, size) -} +//public fun Source.readSafeUtf8Line(): String = readUTF8Line() ?: error("Line not found") -public fun Input.readUtf8String(): String = readBytes().decodeToString() +public inline fun ByteArray(block: Sink.() -> Unit): ByteArray = + Buffer(block).readByteArray() -public fun Input.readSafeUtf8Line(): String = readUTF8Line() ?: error("Line not found") - -public inline fun ByteArray(block: Output.() -> Unit): ByteArray = - buildPacket(block).readBytes() - -public inline fun Binary(block: Output.() -> Unit): Binary = +public inline fun Binary(block: Sink.() -> Unit): Binary = ByteArray(block).asBinary() public operator fun Binary.get(range: IntRange): Binary = view(range.first, range.last - range.first) /** * Return inferred [EnvelopeFormat] if only one format could read given file. If no format accepts the binary, return null. If - * multiple formats accepts binary, throw an error. + * multiple formats accept binary, throw an error. */ public fun IOPlugin.peekBinaryEnvelopeFormat(binary: Binary): EnvelopeFormat? { val formats = envelopeFormatFactories.mapNotNull { factory -> @@ -111,32 +109,31 @@ private fun RingByteArray.toArray(): ByteArray = ByteArray(size) { get(it) } * * @return bytes actually being read, including separator */ -public fun Input.readWithSeparatorTo( - output: Output, +public fun Source.readWithSeparatorTo( + output: Sink, separator: ByteArray, atMost: Int = Int.MAX_VALUE, errorOnEof: Boolean = false, ): Int { var counter = 0 val rb = RingByteArray(ByteArray(separator.size)) - takeWhile { buffer -> - while (buffer.canRead()) { - val byte = buffer.readByte() - counter++ - if (counter >= atMost) error("Maximum number of bytes to be read $atMost reached.") - rb.push(byte) - if (rb.contentEquals(separator)) { - return counter - } else if (rb.isFull()) { - output.writeByte(rb[0]) - } + + while (!exhausted()) { + val byte = readByte() + counter++ + if (counter >= atMost) error("Maximum number of bytes to be read $atMost reached.") + rb.push(byte) + if (rb.contentEquals(separator)) { + return counter + } else if (rb.isFull()) { + output.writeByte(rb[0]) } - !endOfInput } + if (errorOnEof) { error("Read to the end of input without encountering ${separator.decodeToString()}") } else { - for(i in 1 until rb.size){ + for (i in 1 until rb.size) { output.writeByte(rb[i]) } counter += (rb.size - 1) @@ -144,18 +141,18 @@ public fun Input.readWithSeparatorTo( } } -public fun Input.discardLine(): Int { +public fun Source.discardLine(): Int { return discardUntilDelimiter('\n'.code.toByte()).also { discard(1) }.toInt() + 1 } -public fun Input.discardWithSeparator( +public fun Source.discardWithSeparator( separator: ByteArray, atMost: Int = Int.MAX_VALUE, errorOnEof: Boolean = false, ): Int { - val dummy: Output = object : Output(ChunkBuffer.Pool) { + val dummy: Sink = object : Sink(ChunkBuffer.Pool) { override fun closeDestination() { // Do nothing }