[WIP] Refactoring to kotlinx-io

This commit is contained in:
Alexander Nozik 2023-07-10 10:06:39 +03:00
parent cfa20eedba
commit 2aba1b48dc
8 changed files with 113 additions and 84 deletions

View File

@ -5,6 +5,7 @@
### Added ### Added
### Changed ### Changed
- Kotlin 1.9
### Deprecated ### Deprecated

View File

@ -1,22 +1,24 @@
import space.kscience.gradle.KScienceVersions
plugins { plugins {
id("space.kscience.gradle.mpp") id("space.kscience.gradle.mpp")
} }
description = "IO module" description = "IO module"
val ioVersion = "0.2.0"
kscience { kscience {
jvm() jvm()
js() js()
native() native()
useSerialization("1.4.1") useSerialization()
useSerialization("1.4.1", sourceSet = space.kscience.gradle.DependencySourceSet.TEST) { useSerialization(sourceSet = space.kscience.gradle.DependencySourceSet.TEST) {
cbor() cbor()
} }
dependencies { dependencies {
api(project(":dataforge-context")) api(projects.dataforgeContext)
api("io.ktor:ktor-io:${KScienceVersions.ktorVersion}") api("org.jetbrains.kotlinx:kotlinx-io-core:$ioVersion")
api("org.jetbrains.kotlinx:kotlinx-io-bytestring:$ioVersion")
//api("io.ktor:ktor-io:${KScienceVersions.ktorVersion}")
} }
} }

View File

@ -1,6 +1,6 @@
package space.kscience.dataforge.io package space.kscience.dataforge.io
import io.ktor.utils.io.core.* import kotlinx.io.*
import kotlin.math.min import kotlin.math.min
/** /**
@ -17,13 +17,13 @@ public interface Binary {
* Read maximum of [atMost] bytes as input from the binary, starting at [offset]. The generated input is always closed * Read maximum of [atMost] bytes as input from the binary, starting at [offset]. The generated input is always closed
* when leaving scope, so it could not be leaked outside of scope of [block]. * when leaving scope, so it could not be leaked outside of scope of [block].
*/ */
public fun <R> read(offset: Int = 0, atMost: Int = size - offset, block: Input.() -> R): R public fun <R> read(offset: Int = 0, atMost: Int = size - offset, block: Source.() -> R): R
public suspend fun <R> readSuspend(offset: Int = 0, atMost: Int = size - offset, block: suspend Input.() -> R): R public suspend fun <R> readSuspend(offset: Int = 0, atMost: Int = size - offset, block: suspend Source.() -> R): R
/** /**
* Read a binary with given [offset] relative to this binary and given [binarySize]. * Read a binary with given [offset] relative to this binary and given [binarySize].
* In general, resulting binary is of the same type as this one, but it is not guaranteed. * In general, the resulting binary is of the same type as this one, but it is not guaranteed.
*/ */
public fun view(offset: Int, binarySize: Int = size - offset): Binary public fun view(offset: Int, binarySize: Int = size - offset): Binary
@ -32,31 +32,55 @@ public interface Binary {
} }
} }
public class ByteArraySource(
public val byteArray: ByteArray,
public val offset: Int,
public val size: Int,
) : RawSource {
override fun close() {
// Do nothing
}
override fun readAtMostTo(sink: Buffer, byteCount: Long): Long {
val byteRead = min(byteCount, size.toLong())
sink.write(byteArray, offset, offset + byteRead.toInt())
return byteRead
}
}
internal class ByteArrayBinary( internal class ByteArrayBinary(
internal val array: ByteArray, internal val array: ByteArray,
internal val start: Int = 0, internal val start: Int = 0,
override val size: Int = array.size - start, override val size: Int = array.size - start,
) : Binary { ) : Binary {
override fun <R> read(offset: Int, atMost: Int, block: Input.() -> R): R { override fun <R> read(offset: Int, atMost: Int, block: Source.() -> R): R {
require(offset >= 0) { "Offset must be positive" } require(offset >= 0) { "Offset must be positive" }
require(offset < array.size) { "Offset $offset is larger than array size" } require(offset < array.size) { "Offset $offset is larger than array size" }
val input = ByteReadPacket(
val input = ByteArraySource(
array, array,
offset + start, offset + start,
min(atMost, size - offset) min(atMost, size - offset)
) ).buffered()
return input.use(block)
return try {
block(input)
} finally {
input.close()
}
} }
override suspend fun <R> readSuspend(offset: Int, atMost: Int, block: suspend Input.() -> R): R { override suspend fun <R> readSuspend(offset: Int, atMost: Int, block: suspend Source.() -> R): R {
require(offset >= 0) { "Offset must be positive" } require(offset >= 0) { "Offset must be positive" }
require(offset < array.size) { "Offset $offset is larger than array size" } require(offset < array.size) { "Offset $offset is larger than array size" }
val input = ByteReadPacket(
val input = ByteArraySource(
array, array,
offset + start, offset + start,
min(atMost, size - offset) min(atMost, size - offset)
) ).buffered()
return try { return try {
block(input) block(input)
} finally { } finally {
@ -77,26 +101,26 @@ public fun Binary.toByteArray(): ByteArray = if (this is ByteArrayBinary) {
array.copyOfRange(start, start + size) // TODO do we need to ensure data safety here? array.copyOfRange(start, start + size) // TODO do we need to ensure data safety here?
} else { } else {
read { read {
readBytes() readByteArray()
} }
} }
//TODO optimize for file-based Inputs //TODO optimize for file-based Inputs
public fun Input.readBinary(size: Int? = null): Binary { public fun Source.readBinary(size: Int? = null): Binary {
val array = if (size == null) readBytes() else readBytes(size) val array = if (size == null) readByteArray() else readByteArray(size)
return ByteArrayBinary(array) return ByteArrayBinary(array)
} }
/** /**
* Direct write of binary to the output. Returns the number of bytes written * Direct write of binary to the output. Returns the number of bytes written
*/ */
public fun Output.writeBinary(binary: Binary): Int { public fun Sink.writeBinary(binary: Binary): Int {
return if (binary is ByteArrayBinary) { return if (binary is ByteArrayBinary) {
writeFully(binary.array, binary.start, binary.start + binary.size) write(binary.array, binary.start, binary.start + binary.size)
binary.size binary.size
} else { } else {
binary.read { binary.read {
copyTo(this@writeBinary).toInt() transferTo(this@writeBinary).toInt()
} }
} }
} }

View File

@ -1,6 +1,6 @@
package space.kscience.dataforge.io package space.kscience.dataforge.io
import io.ktor.utils.io.core.Output import kotlinx.io.Sink
import space.kscience.dataforge.meta.* import space.kscience.dataforge.meta.*
public class EnvelopeBuilder : Envelope { public class EnvelopeBuilder : Envelope {
@ -33,7 +33,7 @@ public class EnvelopeBuilder : Envelope {
/** /**
* Construct a data binary from given builder * Construct a data binary from given builder
*/ */
public inline fun data(block: Output.() -> Unit) { public inline fun data(block: Sink.() -> Unit) {
data = ByteArray { block() }.asBinary() data = ByteArray { block() }.asBinary()
} }

View File

@ -1,6 +1,6 @@
package space.kscience.dataforge.io package space.kscience.dataforge.io
import io.ktor.utils.io.core.Input import kotlinx.io.Source
import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Context
import space.kscience.dataforge.io.EnvelopeFormatFactory.Companion.ENVELOPE_FORMAT_TYPE import space.kscience.dataforge.io.EnvelopeFormatFactory.Companion.ENVELOPE_FORMAT_TYPE
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
@ -15,7 +15,7 @@ public interface EnvelopeFormat : IOFormat<Envelope> {
override val type: KType get() = typeOf<Envelope>() override val type: KType get() = typeOf<Envelope>()
} }
public fun EnvelopeFormat.read(input: Input): Envelope = readObject(input) public fun EnvelopeFormat.read(input: Source): Envelope = readObject(input)
@Type(ENVELOPE_FORMAT_TYPE) @Type(ENVELOPE_FORMAT_TYPE)
public interface EnvelopeFormatFactory : IOFormatFactory<Envelope>, EnvelopeFormat { public interface EnvelopeFormatFactory : IOFormatFactory<Envelope>, EnvelopeFormat {

View File

@ -1,5 +1,8 @@
package space.kscience.dataforge.io package space.kscience.dataforge.io
import kotlinx.io.bytestring.ByteString
import kotlinx.io.bytestring.decodeToString
import kotlinx.io.write
import space.kscience.dataforge.io.Envelope.Companion.ENVELOPE_NODE_KEY import space.kscience.dataforge.io.Envelope.Companion.ENVELOPE_NODE_KEY
import space.kscience.dataforge.io.PartDescriptor.Companion.DEFAULT_MULTIPART_DATA_SEPARATOR import space.kscience.dataforge.io.PartDescriptor.Companion.DEFAULT_MULTIPART_DATA_SEPARATOR
import space.kscience.dataforge.io.PartDescriptor.Companion.MULTIPART_DATA_TYPE import space.kscience.dataforge.io.PartDescriptor.Companion.MULTIPART_DATA_TYPE
@ -20,7 +23,7 @@ private class PartDescriptor : Scheme() {
val PARTS_KEY = MULTIPART_KEY + "parts" val PARTS_KEY = MULTIPART_KEY + "parts"
val SEPARATOR_KEY = MULTIPART_KEY + "separator" val SEPARATOR_KEY = MULTIPART_KEY + "separator"
const val DEFAULT_MULTIPART_DATA_SEPARATOR = "\r\n#~PART~#\r\n" val DEFAULT_MULTIPART_DATA_SEPARATOR = "\r\n#~PART~#\r\n".toACIIByteString()
const val MULTIPART_DATA_TYPE = "envelope.multipart" const val MULTIPART_DATA_TYPE = "envelope.multipart"
} }
@ -32,12 +35,12 @@ public typealias EnvelopeParts = List<EnvelopePart>
public fun EnvelopeBuilder.multipart( public fun EnvelopeBuilder.multipart(
parts: EnvelopeParts, parts: EnvelopeParts,
separator: String = DEFAULT_MULTIPART_DATA_SEPARATOR, separator: ByteString = DEFAULT_MULTIPART_DATA_SEPARATOR,
) { ) {
dataType = MULTIPART_DATA_TYPE dataType = MULTIPART_DATA_TYPE
var offsetCounter = 0 var offsetCounter = 0
val separatorSize = separator.length val separatorSize = separator.size
val partDescriptors = parts.map { (binary, description) -> val partDescriptors = parts.map { (binary, description) ->
offsetCounter += separatorSize offsetCounter += separatorSize
PartDescriptor { PartDescriptor {
@ -51,14 +54,14 @@ public fun EnvelopeBuilder.multipart(
meta { meta {
if (separator != DEFAULT_MULTIPART_DATA_SEPARATOR) { if (separator != DEFAULT_MULTIPART_DATA_SEPARATOR) {
SEPARATOR_KEY put separator SEPARATOR_KEY put separator.decodeToString()
} }
setIndexed(PARTS_KEY, partDescriptors.map { it.toMeta() }) setIndexed(PARTS_KEY, partDescriptors.map { it.toMeta() })
} }
data { data {
parts.forEach { parts.forEach {
writeRawString(separator) write(separator)
writeBinary(it.binary) writeBinary(it.binary)
} }
} }
@ -69,7 +72,7 @@ public fun EnvelopeBuilder.multipart(
*/ */
public fun EnvelopeBuilder.envelopes( public fun EnvelopeBuilder.envelopes(
envelopes: List<Envelope>, envelopes: List<Envelope>,
separator: String = DEFAULT_MULTIPART_DATA_SEPARATOR, separator: ByteString = DEFAULT_MULTIPART_DATA_SEPARATOR,
) { ) {
val parts = envelopes.map { val parts = envelopes.map {
val binary = Binary(it, TaggedEnvelopeFormat) val binary = Binary(it, TaggedEnvelopeFormat)

View File

@ -1,6 +1,8 @@
package space.kscience.dataforge.io package space.kscience.dataforge.io
import io.ktor.utils.io.core.* import kotlinx.io.Sink
import kotlinx.io.Source
import kotlinx.io.readByteArray
import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Factory import space.kscience.dataforge.context.Factory
import space.kscience.dataforge.io.IOFormatFactory.Companion.IO_FORMAT_TYPE import space.kscience.dataforge.io.IOFormatFactory.Companion.IO_FORMAT_TYPE
@ -21,7 +23,7 @@ public interface IOReader<out T> {
*/ */
public val type: KType public val type: KType
public fun readObject(input: Input): T public fun readObject(input: Source): T
public fun readObject(binary: Binary): T = binary.read { readObject(this) } public fun readObject(binary: Binary): T = binary.read { readObject(this) }
@ -32,21 +34,21 @@ public interface IOReader<out T> {
public val binary: IOReader<Binary> = object : IOReader<Binary> { public val binary: IOReader<Binary> = object : IOReader<Binary> {
override val type: KType = typeOf<Binary>() override val type: KType = typeOf<Binary>()
override fun readObject(input: Input): Binary = input.readBytes().asBinary() override fun readObject(input: Source): Binary = input.readByteArray().asBinary()
override fun readObject(binary: Binary): Binary = binary override fun readObject(binary: Binary): Binary = binary
} }
} }
} }
public inline fun <reified T> IOReader(crossinline read: Input.() -> T): IOReader<T> = object : IOReader<T> { public inline fun <reified T> IOReader(crossinline read: Source.() -> T): IOReader<T> = object : IOReader<T> {
override val type: KType = typeOf<T>() override val type: KType = typeOf<T>()
override fun readObject(input: Input): T = input.read() override fun readObject(input: Source): T = input.read()
} }
public fun interface IOWriter<in T> { public fun interface IOWriter<in T> {
public fun writeObject(output: Output, obj: T) public fun writeObject(output: Sink, obj: T)
} }
/** /**
@ -54,20 +56,20 @@ public fun interface IOWriter<in T> {
*/ */
public interface IOFormat<T> : IOReader<T>, IOWriter<T> public interface IOFormat<T> : IOReader<T>, IOWriter<T>
public fun <T : Any> Input.readObject(format: IOReader<T>): T = format.readObject(this@readObject) public fun <T : Any> Source.readObject(format: IOReader<T>): T = format.readObject(this@readObject)
public fun <T : Any> IOFormat<T>.readObjectFrom(binary: Binary): T = binary.read { public fun <T : Any> IOFormat<T>.readObjectFrom(binary: Binary): T = binary.read {
readObject(this) readObject(this)
} }
/** /**
* Read given binary as object using given format * Read given binary as an object using given format
*/ */
public fun <T : Any> Binary.readWith(format: IOReader<T>): T = read { public fun <T : Any> Binary.readWith(format: IOReader<T>): T = read {
readObject(format) readObject(format)
} }
public fun <T : Any> Output.writeObject(format: IOWriter<T>, obj: T): Unit = public fun <T : Any> Sink.writeObject(format: IOWriter<T>, obj: T): Unit =
format.writeObject(this@writeObject, obj) format.writeObject(this@writeObject, obj)
@ -94,9 +96,9 @@ public object DoubleIOFormat : IOFormat<Double>, IOFormatFactory<Double> {
override val type: KType get() = typeOf<Double>() override val type: KType get() = typeOf<Double>()
override fun writeObject(output: Output, obj: Double) { override fun writeObject(output: Sink, obj: Double) {
output.writeDouble(obj) output.writeLong(obj.toBits())
} }
override fun readObject(input: Input): Double = input.readDouble() override fun readObject(input: Source): Double = Double.fromBits(input.readLong())
} }

View File

@ -1,40 +1,38 @@
package space.kscience.dataforge.io package space.kscience.dataforge.io
import io.ktor.utils.io.bits.Memory import kotlinx.io.*
import io.ktor.utils.io.charsets.Charsets import kotlinx.io.bytestring.ByteString
import io.ktor.utils.io.charsets.decodeExactBytes
import io.ktor.utils.io.core.*
import io.ktor.utils.io.core.internal.ChunkBuffer
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.misc.DFExperimental import space.kscience.dataforge.misc.DFExperimental
public fun Output.writeRawString(str: String) { /**
writeFully(str.toByteArray(Charsets.ISO_8859_1)) * Convert a string literal, containing only ASCII characters to a [ByteString].
* Throws an error if there are non-ASCII characters.
*/
public fun String.toACIIByteString(): ByteString {
val bytes = ByteArray(length) {
val char = get(it)
val code = char.code
if (code > Byte.MAX_VALUE) error("Symbol $char is not ASCII symbol") else code.toByte()
}
return ByteString(bytes)
} }
public fun Output.writeUtf8String(str: String) { public inline fun Buffer(block: Sink.() -> Unit): Buffer = Buffer().apply(block)
writeFully(str.encodeToByteArray())
}
public fun Input.readRawString(size: Int): String { //public fun Source.readSafeUtf8Line(): String = readUTF8Line() ?: error("Line not found")
return Charsets.ISO_8859_1.newDecoder().decodeExactBytes(this, size)
}
public fun Input.readUtf8String(): String = readBytes().decodeToString() public inline fun ByteArray(block: Sink.() -> Unit): ByteArray =
Buffer(block).readByteArray()
public fun Input.readSafeUtf8Line(): String = readUTF8Line() ?: error("Line not found") public inline fun Binary(block: Sink.() -> Unit): Binary =
public inline fun ByteArray(block: Output.() -> Unit): ByteArray =
buildPacket(block).readBytes()
public inline fun Binary(block: Output.() -> Unit): Binary =
ByteArray(block).asBinary() ByteArray(block).asBinary()
public operator fun Binary.get(range: IntRange): Binary = view(range.first, range.last - range.first) public operator fun Binary.get(range: IntRange): Binary = view(range.first, range.last - range.first)
/** /**
* Return inferred [EnvelopeFormat] if only one format could read given file. If no format accepts the binary, return null. If * Return inferred [EnvelopeFormat] if only one format could read given file. If no format accepts the binary, return null. If
* multiple formats accepts binary, throw an error. * multiple formats accept binary, throw an error.
*/ */
public fun IOPlugin.peekBinaryEnvelopeFormat(binary: Binary): EnvelopeFormat? { public fun IOPlugin.peekBinaryEnvelopeFormat(binary: Binary): EnvelopeFormat? {
val formats = envelopeFormatFactories.mapNotNull { factory -> val formats = envelopeFormatFactories.mapNotNull { factory ->
@ -111,32 +109,31 @@ private fun RingByteArray.toArray(): ByteArray = ByteArray(size) { get(it) }
* *
* @return bytes actually being read, including separator * @return bytes actually being read, including separator
*/ */
public fun Input.readWithSeparatorTo( public fun Source.readWithSeparatorTo(
output: Output, output: Sink,
separator: ByteArray, separator: ByteArray,
atMost: Int = Int.MAX_VALUE, atMost: Int = Int.MAX_VALUE,
errorOnEof: Boolean = false, errorOnEof: Boolean = false,
): Int { ): Int {
var counter = 0 var counter = 0
val rb = RingByteArray(ByteArray(separator.size)) val rb = RingByteArray(ByteArray(separator.size))
takeWhile { buffer ->
while (buffer.canRead()) { while (!exhausted()) {
val byte = buffer.readByte() val byte = readByte()
counter++ counter++
if (counter >= atMost) error("Maximum number of bytes to be read $atMost reached.") if (counter >= atMost) error("Maximum number of bytes to be read $atMost reached.")
rb.push(byte) rb.push(byte)
if (rb.contentEquals(separator)) { if (rb.contentEquals(separator)) {
return counter return counter
} else if (rb.isFull()) { } else if (rb.isFull()) {
output.writeByte(rb[0]) output.writeByte(rb[0])
}
} }
!endOfInput
} }
if (errorOnEof) { if (errorOnEof) {
error("Read to the end of input without encountering ${separator.decodeToString()}") error("Read to the end of input without encountering ${separator.decodeToString()}")
} else { } else {
for(i in 1 until rb.size){ for (i in 1 until rb.size) {
output.writeByte(rb[i]) output.writeByte(rb[i])
} }
counter += (rb.size - 1) counter += (rb.size - 1)
@ -144,18 +141,18 @@ public fun Input.readWithSeparatorTo(
} }
} }
public fun Input.discardLine(): Int { public fun Source.discardLine(): Int {
return discardUntilDelimiter('\n'.code.toByte()).also { return discardUntilDelimiter('\n'.code.toByte()).also {
discard(1) discard(1)
}.toInt() + 1 }.toInt() + 1
} }
public fun Input.discardWithSeparator( public fun Source.discardWithSeparator(
separator: ByteArray, separator: ByteArray,
atMost: Int = Int.MAX_VALUE, atMost: Int = Int.MAX_VALUE,
errorOnEof: Boolean = false, errorOnEof: Boolean = false,
): Int { ): Int {
val dummy: Output = object : Output(ChunkBuffer.Pool) { val dummy: Sink = object : Sink(ChunkBuffer.Pool) {
override fun closeDestination() { override fun closeDestination() {
// Do nothing // Do nothing
} }