Compare commits
11 Commits
3806f97c77
...
706521a6b6
Author | SHA1 | Date | |
---|---|---|---|
706521a6b6 | |||
94000689da | |||
851fdda311 | |||
cbbcd18df3 | |||
dc2bf5da83 | |||
f732b85cc5 | |||
259b882e63 | |||
526f230300 | |||
a136db16ff | |||
a699c36f8e | |||
2aba1b48dc |
@ -3,14 +3,21 @@
|
||||
## Unreleased
|
||||
|
||||
### Added
|
||||
- Added separate `Meta`, `SealedMeta` and `ObservableMutableMeta` builders.
|
||||
|
||||
### Changed
|
||||
- Kotlin 1.9.20.
|
||||
- Migrated from ktor-io to kotlinx-io.
|
||||
- `MutableMeta` builder now returns a simplified version of meta that does not hold listeners.
|
||||
- Ktor-io is replaced with kotlinx-io.
|
||||
- More concise names for read/write methods in IO.
|
||||
|
||||
### Deprecated
|
||||
|
||||
### Removed
|
||||
|
||||
### Fixed
|
||||
- Memory leak in SealedMeta builder
|
||||
|
||||
### Security
|
||||
|
||||
|
@ -1,5 +1,4 @@
|
||||
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
|
||||
import space.kscience.gradle.isInDevelopment
|
||||
import space.kscience.gradle.useApache2Licence
|
||||
import space.kscience.gradle.useSPCTeam
|
||||
|
||||
@ -9,7 +8,7 @@ plugins {
|
||||
|
||||
allprojects {
|
||||
group = "space.kscience"
|
||||
version = "0.6.2"
|
||||
version = "0.6.3-dev-1"
|
||||
}
|
||||
|
||||
subprojects {
|
||||
@ -31,14 +30,7 @@ ksciencePublish {
|
||||
useApache2Licence()
|
||||
useSPCTeam()
|
||||
}
|
||||
github("dataforge-core", "SciProgCentre")
|
||||
space(
|
||||
if (isInDevelopment) {
|
||||
"https://maven.pkg.jetbrains.space/spc/p/sci/dev"
|
||||
} else {
|
||||
"https://maven.pkg.jetbrains.space/spc/p/sci/maven"
|
||||
}
|
||||
)
|
||||
repository("spc","https://maven.sciprog.center/kscience")
|
||||
sonatype()
|
||||
}
|
||||
|
||||
|
@ -1,11 +1,11 @@
|
||||
import space.kscience.gradle.KScienceVersions
|
||||
|
||||
plugins {
|
||||
id("space.kscience.gradle.mpp")
|
||||
}
|
||||
|
||||
description = "IO module"
|
||||
|
||||
val ioVersion = "0.2.1"
|
||||
|
||||
kscience {
|
||||
jvm()
|
||||
js()
|
||||
@ -15,11 +15,12 @@ kscience {
|
||||
cbor()
|
||||
}
|
||||
dependencies {
|
||||
api(project(":dataforge-context"))
|
||||
api("io.ktor:ktor-io:${KScienceVersions.ktorVersion}")
|
||||
api(projects.dataforgeContext)
|
||||
api("org.jetbrains.kotlinx:kotlinx-io-core:$ioVersion")
|
||||
api("org.jetbrains.kotlinx:kotlinx-io-bytestring:$ioVersion")
|
||||
}
|
||||
}
|
||||
|
||||
readme {
|
||||
maturity = space.kscience.gradle.Maturity.PROTOTYPE
|
||||
readme{
|
||||
maturity = space.kscience.gradle.Maturity.EXPERIMENTAL
|
||||
}
|
@ -1,7 +1,11 @@
|
||||
package space.kscience.dataforge.io.yaml
|
||||
|
||||
import io.ktor.utils.io.core.Input
|
||||
import io.ktor.utils.io.core.Output
|
||||
import kotlinx.io.Sink
|
||||
import kotlinx.io.Source
|
||||
import kotlinx.io.bytestring.ByteString
|
||||
import kotlinx.io.bytestring.encodeToByteString
|
||||
import kotlinx.io.readByteString
|
||||
import kotlinx.io.writeString
|
||||
import space.kscience.dataforge.context.Context
|
||||
import space.kscience.dataforge.context.Global
|
||||
import space.kscience.dataforge.io.*
|
||||
@ -15,49 +19,49 @@ public class FrontMatterEnvelopeFormat(
|
||||
private val metaFormatFactory: MetaFormatFactory = YamlMetaFormat,
|
||||
) : EnvelopeFormat {
|
||||
|
||||
override fun readObject(binary: Binary): Envelope = binary.read {
|
||||
override fun readFrom(binary: Binary): Envelope = binary.read {
|
||||
var offset = 0
|
||||
|
||||
offset += discardWithSeparator(
|
||||
SEPARATOR.encodeToByteArray(),
|
||||
SEPARATOR,
|
||||
atMost = 1024,
|
||||
)
|
||||
|
||||
val line = ByteArray {
|
||||
offset += readWithSeparatorTo(this, "\n".encodeToByteArray())
|
||||
offset += readWithSeparatorTo(this, "\n".encodeToByteString())
|
||||
}.decodeToString()
|
||||
|
||||
val readMetaFormat = line.trim().takeIf { it.isNotBlank() }?.let { io.resolveMetaFormat(it) } ?: YamlMetaFormat
|
||||
|
||||
val packet = ByteArray {
|
||||
offset += readWithSeparatorTo(this, SEPARATOR.encodeToByteArray())
|
||||
offset += readWithSeparatorTo(this, SEPARATOR)
|
||||
}
|
||||
|
||||
offset += discardLine()
|
||||
|
||||
val meta = readMetaFormat.readObject(packet.asBinary())
|
||||
val meta = readMetaFormat.readFrom(packet.asBinary())
|
||||
Envelope(meta, binary.view(offset))
|
||||
}
|
||||
|
||||
override fun readObject(input: Input): Envelope = readObject(input.readBinary())
|
||||
override fun readFrom(source: Source): Envelope = readFrom(source.readBinary())
|
||||
|
||||
override fun writeObject(
|
||||
output: Output,
|
||||
override fun writeTo(
|
||||
sink: Sink,
|
||||
obj: Envelope,
|
||||
) {
|
||||
val metaFormat = metaFormatFactory.build(io.context, meta)
|
||||
val formatSuffix = if (metaFormat is YamlMetaFormat) "" else metaFormatFactory.shortName
|
||||
output.writeRawString("$SEPARATOR${formatSuffix}\r\n")
|
||||
metaFormat.run { metaFormat.writeObject(output, obj.meta) }
|
||||
output.writeRawString("$SEPARATOR\r\n")
|
||||
sink.writeString("$SEPARATOR${formatSuffix}\r\n")
|
||||
metaFormat.run { metaFormat.writeTo(sink, obj.meta) }
|
||||
sink.writeString("$SEPARATOR\r\n")
|
||||
//Printing data
|
||||
obj.data?.let { data ->
|
||||
output.writeBinary(data)
|
||||
sink.writeBinary(data)
|
||||
}
|
||||
}
|
||||
|
||||
public companion object : EnvelopeFormatFactory {
|
||||
public const val SEPARATOR: String = "---"
|
||||
public val SEPARATOR: ByteString = "---".encodeToByteString()
|
||||
|
||||
private val metaTypeRegex = "---(\\w*)\\s*".toRegex()
|
||||
|
||||
@ -69,8 +73,8 @@ public class FrontMatterEnvelopeFormat(
|
||||
|
||||
override fun peekFormat(io: IOPlugin, binary: Binary): EnvelopeFormat? = binary.read {
|
||||
//read raw string to avoid UTF issues
|
||||
val line = readRawString(3)
|
||||
return@read if (line == "---") {
|
||||
val line = readByteString(3)
|
||||
return@read if (line == "---".encodeToByteString()) {
|
||||
default
|
||||
} else {
|
||||
null
|
||||
@ -79,15 +83,15 @@ public class FrontMatterEnvelopeFormat(
|
||||
|
||||
private val default by lazy { build(Global, Meta.EMPTY) }
|
||||
|
||||
override fun readObject(binary: Binary): Envelope = default.readObject(binary)
|
||||
override fun readFrom(binary: Binary): Envelope = default.readFrom(binary)
|
||||
|
||||
override fun writeObject(
|
||||
output: Output,
|
||||
override fun writeTo(
|
||||
sink: Sink,
|
||||
obj: Envelope,
|
||||
): Unit = default.writeObject(output, obj)
|
||||
): Unit = default.writeTo(sink, obj)
|
||||
|
||||
|
||||
override fun readObject(input: Input): Envelope = default.readObject(input)
|
||||
override fun readFrom(source: Source): Envelope = default.readFrom(source)
|
||||
|
||||
}
|
||||
}
|
@ -1,13 +1,13 @@
|
||||
package space.kscience.dataforge.io.yaml
|
||||
|
||||
import io.ktor.utils.io.core.Input
|
||||
import io.ktor.utils.io.core.Output
|
||||
import kotlinx.io.Sink
|
||||
import kotlinx.io.Source
|
||||
import kotlinx.io.readString
|
||||
import kotlinx.io.writeString
|
||||
import net.mamoe.yamlkt.*
|
||||
import space.kscience.dataforge.context.Context
|
||||
import space.kscience.dataforge.io.MetaFormat
|
||||
import space.kscience.dataforge.io.MetaFormatFactory
|
||||
import space.kscience.dataforge.io.readUtf8String
|
||||
import space.kscience.dataforge.io.writeUtf8String
|
||||
import space.kscience.dataforge.meta.*
|
||||
import space.kscience.dataforge.meta.descriptors.MetaDescriptor
|
||||
import space.kscience.dataforge.meta.descriptors.get
|
||||
@ -89,14 +89,14 @@ public fun YamlMap.toMeta(): Meta = YamlMeta(this)
|
||||
*/
|
||||
public class YamlMetaFormat(private val meta: Meta) : MetaFormat {
|
||||
|
||||
override fun writeMeta(output: Output, meta: Meta, descriptor: MetaDescriptor?) {
|
||||
override fun writeMeta(sink: Sink, meta: Meta, descriptor: MetaDescriptor?) {
|
||||
val yaml: YamlMap = meta.toYaml()
|
||||
val string = Yaml.encodeToString(YamlMap.serializer(), yaml)
|
||||
output.writeUtf8String(string)
|
||||
sink.writeString(string)
|
||||
}
|
||||
|
||||
override fun readMeta(input: Input, descriptor: MetaDescriptor?): Meta {
|
||||
val yaml = Yaml.decodeYamlMapFromString(input.readUtf8String())
|
||||
override fun readMeta(source: Source, descriptor: MetaDescriptor?): Meta {
|
||||
val yaml = Yaml.decodeYamlMapFromString(source.readString())
|
||||
return yaml.toMeta()
|
||||
}
|
||||
|
||||
@ -109,10 +109,10 @@ public class YamlMetaFormat(private val meta: Meta) : MetaFormat {
|
||||
|
||||
private val default = YamlMetaFormat(Meta.EMPTY)
|
||||
|
||||
override fun writeMeta(output: Output, meta: Meta, descriptor: MetaDescriptor?): Unit =
|
||||
default.writeMeta(output, meta, descriptor)
|
||||
override fun writeMeta(sink: Sink, meta: Meta, descriptor: MetaDescriptor?): Unit =
|
||||
default.writeMeta(sink, meta, descriptor)
|
||||
|
||||
override fun readMeta(input: Input, descriptor: MetaDescriptor?): Meta =
|
||||
default.readMeta(input, descriptor)
|
||||
override fun readMeta(source: Source, descriptor: MetaDescriptor?): Meta =
|
||||
default.readMeta(source, descriptor)
|
||||
}
|
||||
}
|
@ -1,6 +1,9 @@
|
||||
package space.kscience.dataforge.io
|
||||
|
||||
import io.ktor.utils.io.core.*
|
||||
import kotlinx.io.Sink
|
||||
import kotlinx.io.Source
|
||||
import kotlinx.io.buffered
|
||||
import kotlinx.io.readByteArray
|
||||
import kotlin.math.min
|
||||
|
||||
/**
|
||||
@ -17,13 +20,13 @@ public interface Binary {
|
||||
* Read maximum of [atMost] bytes as input from the binary, starting at [offset]. The generated input is always closed
|
||||
* when leaving scope, so it could not be leaked outside of scope of [block].
|
||||
*/
|
||||
public fun <R> read(offset: Int = 0, atMost: Int = size - offset, block: Input.() -> R): R
|
||||
public fun <R> read(offset: Int = 0, atMost: Int = size - offset, block: Source.() -> R): R
|
||||
|
||||
public suspend fun <R> readSuspend(offset: Int = 0, atMost: Int = size - offset, block: suspend Input.() -> R): R
|
||||
public suspend fun <R> readSuspend(offset: Int = 0, atMost: Int = size - offset, block: suspend Source.() -> R): R
|
||||
|
||||
/**
|
||||
* Read a binary with given [offset] relative to this binary and given [binarySize].
|
||||
* In general, resulting binary is of the same type as this one, but it is not guaranteed.
|
||||
* In general, the resulting binary is of the same type as this one, but it is not guaranteed.
|
||||
*/
|
||||
public fun view(offset: Int, binarySize: Int = size - offset): Binary
|
||||
|
||||
@ -38,25 +41,27 @@ internal class ByteArrayBinary(
|
||||
override val size: Int = array.size - start,
|
||||
) : Binary {
|
||||
|
||||
override fun <R> read(offset: Int, atMost: Int, block: Input.() -> R): R {
|
||||
override fun <R> read(offset: Int, atMost: Int, block: Source.() -> R): R {
|
||||
require(offset >= 0) { "Offset must be positive" }
|
||||
require(offset < array.size) { "Offset $offset is larger than array size" }
|
||||
val input = ByteReadPacket(
|
||||
|
||||
return ByteArraySource(
|
||||
array,
|
||||
offset + start,
|
||||
min(atMost, size - offset)
|
||||
)
|
||||
return input.use(block)
|
||||
).buffered().use(block)
|
||||
}
|
||||
|
||||
override suspend fun <R> readSuspend(offset: Int, atMost: Int, block: suspend Input.() -> R): R {
|
||||
override suspend fun <R> readSuspend(offset: Int, atMost: Int, block: suspend Source.() -> R): R {
|
||||
require(offset >= 0) { "Offset must be positive" }
|
||||
require(offset < array.size) { "Offset $offset is larger than array size" }
|
||||
val input = ByteReadPacket(
|
||||
|
||||
val input = ByteArraySource(
|
||||
array,
|
||||
offset + start,
|
||||
min(atMost, size - offset)
|
||||
)
|
||||
).buffered()
|
||||
|
||||
return try {
|
||||
block(input)
|
||||
} finally {
|
||||
@ -77,26 +82,26 @@ public fun Binary.toByteArray(): ByteArray = if (this is ByteArrayBinary) {
|
||||
array.copyOfRange(start, start + size) // TODO do we need to ensure data safety here?
|
||||
} else {
|
||||
read {
|
||||
readBytes()
|
||||
readByteArray()
|
||||
}
|
||||
}
|
||||
|
||||
//TODO optimize for file-based Inputs
|
||||
public fun Input.readBinary(size: Int? = null): Binary {
|
||||
val array = if (size == null) readBytes() else readBytes(size)
|
||||
public fun Source.readBinary(size: Int? = null): Binary {
|
||||
val array = if (size == null) readByteArray() else readByteArray(size)
|
||||
return ByteArrayBinary(array)
|
||||
}
|
||||
|
||||
/**
|
||||
* Direct write of binary to the output. Returns the number of bytes written
|
||||
*/
|
||||
public fun Output.writeBinary(binary: Binary): Int {
|
||||
public fun Sink.writeBinary(binary: Binary): Int {
|
||||
return if (binary is ByteArrayBinary) {
|
||||
writeFully(binary.array, binary.start, binary.start + binary.size)
|
||||
write(binary.array, binary.start, binary.start + binary.size)
|
||||
binary.size
|
||||
} else {
|
||||
binary.read {
|
||||
copyTo(this@writeBinary).toInt()
|
||||
transferTo(this@writeBinary).toInt()
|
||||
}
|
||||
}
|
||||
}
|
@ -1,6 +1,6 @@
|
||||
package space.kscience.dataforge.io
|
||||
|
||||
import io.ktor.utils.io.core.Output
|
||||
import kotlinx.io.Sink
|
||||
import space.kscience.dataforge.meta.*
|
||||
|
||||
public class EnvelopeBuilder : Envelope {
|
||||
@ -33,7 +33,7 @@ public class EnvelopeBuilder : Envelope {
|
||||
/**
|
||||
* Construct a data binary from given builder
|
||||
*/
|
||||
public inline fun data(block: Output.() -> Unit) {
|
||||
public inline fun data(block: Sink.() -> Unit) {
|
||||
data = ByteArray { block() }.asBinary()
|
||||
}
|
||||
|
||||
|
@ -1,6 +1,6 @@
|
||||
package space.kscience.dataforge.io
|
||||
|
||||
import io.ktor.utils.io.core.Input
|
||||
import kotlinx.io.Source
|
||||
import space.kscience.dataforge.context.Context
|
||||
import space.kscience.dataforge.io.EnvelopeFormatFactory.Companion.ENVELOPE_FORMAT_TYPE
|
||||
import space.kscience.dataforge.meta.Meta
|
||||
@ -15,7 +15,7 @@ public interface EnvelopeFormat : IOFormat<Envelope> {
|
||||
override val type: KType get() = typeOf<Envelope>()
|
||||
}
|
||||
|
||||
public fun EnvelopeFormat.read(input: Input): Envelope = readObject(input)
|
||||
public fun EnvelopeFormat.read(input: Source): Envelope = readFrom(input)
|
||||
|
||||
@Type(ENVELOPE_FORMAT_TYPE)
|
||||
public interface EnvelopeFormatFactory : IOFormatFactory<Envelope>, EnvelopeFormat {
|
||||
|
@ -1,5 +1,8 @@
|
||||
package space.kscience.dataforge.io
|
||||
|
||||
import kotlinx.io.bytestring.ByteString
|
||||
import kotlinx.io.bytestring.decodeToString
|
||||
import kotlinx.io.write
|
||||
import space.kscience.dataforge.io.Envelope.Companion.ENVELOPE_NODE_KEY
|
||||
import space.kscience.dataforge.io.PartDescriptor.Companion.DEFAULT_MULTIPART_DATA_SEPARATOR
|
||||
import space.kscience.dataforge.io.PartDescriptor.Companion.MULTIPART_DATA_TYPE
|
||||
@ -20,7 +23,7 @@ private class PartDescriptor : Scheme() {
|
||||
val PARTS_KEY = MULTIPART_KEY + "parts"
|
||||
val SEPARATOR_KEY = MULTIPART_KEY + "separator"
|
||||
|
||||
const val DEFAULT_MULTIPART_DATA_SEPARATOR = "\r\n#~PART~#\r\n"
|
||||
val DEFAULT_MULTIPART_DATA_SEPARATOR = "\r\n#~PART~#\r\n".toAsciiByteString()
|
||||
|
||||
const val MULTIPART_DATA_TYPE = "envelope.multipart"
|
||||
}
|
||||
@ -32,12 +35,12 @@ public typealias EnvelopeParts = List<EnvelopePart>
|
||||
|
||||
public fun EnvelopeBuilder.multipart(
|
||||
parts: EnvelopeParts,
|
||||
separator: String = DEFAULT_MULTIPART_DATA_SEPARATOR,
|
||||
separator: ByteString = DEFAULT_MULTIPART_DATA_SEPARATOR,
|
||||
) {
|
||||
dataType = MULTIPART_DATA_TYPE
|
||||
|
||||
var offsetCounter = 0
|
||||
val separatorSize = separator.length
|
||||
val separatorSize = separator.size
|
||||
val partDescriptors = parts.map { (binary, description) ->
|
||||
offsetCounter += separatorSize
|
||||
PartDescriptor {
|
||||
@ -51,14 +54,14 @@ public fun EnvelopeBuilder.multipart(
|
||||
|
||||
meta {
|
||||
if (separator != DEFAULT_MULTIPART_DATA_SEPARATOR) {
|
||||
SEPARATOR_KEY put separator
|
||||
SEPARATOR_KEY put separator.decodeToString()
|
||||
}
|
||||
setIndexed(PARTS_KEY, partDescriptors.map { it.toMeta() })
|
||||
}
|
||||
|
||||
data {
|
||||
parts.forEach {
|
||||
writeRawString(separator)
|
||||
write(separator)
|
||||
writeBinary(it.binary)
|
||||
}
|
||||
}
|
||||
@ -69,7 +72,7 @@ public fun EnvelopeBuilder.multipart(
|
||||
*/
|
||||
public fun EnvelopeBuilder.envelopes(
|
||||
envelopes: List<Envelope>,
|
||||
separator: String = DEFAULT_MULTIPART_DATA_SEPARATOR,
|
||||
separator: ByteString = DEFAULT_MULTIPART_DATA_SEPARATOR,
|
||||
) {
|
||||
val parts = envelopes.map {
|
||||
val binary = Binary(it, TaggedEnvelopeFormat)
|
||||
|
@ -1,6 +1,8 @@
|
||||
package space.kscience.dataforge.io
|
||||
|
||||
import io.ktor.utils.io.core.*
|
||||
import kotlinx.io.Sink
|
||||
import kotlinx.io.Source
|
||||
import kotlinx.io.readByteArray
|
||||
import space.kscience.dataforge.context.Context
|
||||
import space.kscience.dataforge.context.Factory
|
||||
import space.kscience.dataforge.io.IOFormatFactory.Companion.IO_FORMAT_TYPE
|
||||
@ -21,9 +23,9 @@ public interface IOReader<out T> {
|
||||
*/
|
||||
public val type: KType
|
||||
|
||||
public fun readObject(input: Input): T
|
||||
public fun readFrom(source: Source): T
|
||||
|
||||
public fun readObject(binary: Binary): T = binary.read { readObject(this) }
|
||||
public fun readFrom(binary: Binary): T = binary.read { readFrom(this) }
|
||||
|
||||
public companion object {
|
||||
/**
|
||||
@ -32,21 +34,21 @@ public interface IOReader<out T> {
|
||||
public val binary: IOReader<Binary> = object : IOReader<Binary> {
|
||||
override val type: KType = typeOf<Binary>()
|
||||
|
||||
override fun readObject(input: Input): Binary = input.readBytes().asBinary()
|
||||
override fun readFrom(source: Source): Binary = source.readByteArray().asBinary()
|
||||
|
||||
override fun readObject(binary: Binary): Binary = binary
|
||||
override fun readFrom(binary: Binary): Binary = binary
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public inline fun <reified T> IOReader(crossinline read: Input.() -> T): IOReader<T> = object : IOReader<T> {
|
||||
public inline fun <reified T> IOReader(crossinline read: Source.() -> T): IOReader<T> = object : IOReader<T> {
|
||||
override val type: KType = typeOf<T>()
|
||||
|
||||
override fun readObject(input: Input): T = input.read()
|
||||
override fun readFrom(source: Source): T = source.read()
|
||||
}
|
||||
|
||||
public fun interface IOWriter<in T> {
|
||||
public fun writeObject(output: Output, obj: T)
|
||||
public fun writeTo(sink: Sink, obj: T)
|
||||
}
|
||||
|
||||
/**
|
||||
@ -54,21 +56,20 @@ public fun interface IOWriter<in T> {
|
||||
*/
|
||||
public interface IOFormat<T> : IOReader<T>, IOWriter<T>
|
||||
|
||||
public fun <T : Any> Input.readObject(format: IOReader<T>): T = format.readObject(this@readObject)
|
||||
public fun <T : Any> Source.readWith(format: IOReader<T>): T = format.readFrom(this)
|
||||
|
||||
public fun <T : Any> IOFormat<T>.readObjectFrom(binary: Binary): T = binary.read {
|
||||
readObject(this)
|
||||
/**
|
||||
* Read given binary as an object using given format
|
||||
*/
|
||||
public fun <T : Any> Binary.readWith(format: IOReader<T>): T = read {
|
||||
readWith(format)
|
||||
}
|
||||
|
||||
/**
|
||||
* Read given binary as object using given format
|
||||
* Write an object to the [Sink] with given [format]
|
||||
*/
|
||||
public fun <T : Any> Binary.readWith(format: IOReader<T>): T = read {
|
||||
readObject(format)
|
||||
}
|
||||
|
||||
public fun <T : Any> Output.writeObject(format: IOWriter<T>, obj: T): Unit =
|
||||
format.writeObject(this@writeObject, obj)
|
||||
public fun <T : Any> Sink.writeWith(format: IOWriter<T>, obj: T): Unit =
|
||||
format.writeTo(this, obj)
|
||||
|
||||
|
||||
@Type(IO_FORMAT_TYPE)
|
||||
@ -85,7 +86,7 @@ public interface IOFormatFactory<T : Any> : Factory<IOFormat<T>>, Named {
|
||||
}
|
||||
}
|
||||
|
||||
public fun <T : Any> Binary(obj: T, format: IOWriter<T>): Binary = Binary { format.writeObject(this, obj) }
|
||||
public fun <T : Any> Binary(obj: T, format: IOWriter<T>): Binary = Binary { format.writeTo(this, obj) }
|
||||
|
||||
public object DoubleIOFormat : IOFormat<Double>, IOFormatFactory<Double> {
|
||||
override fun build(context: Context, meta: Meta): IOFormat<Double> = this
|
||||
@ -94,9 +95,9 @@ public object DoubleIOFormat : IOFormat<Double>, IOFormatFactory<Double> {
|
||||
|
||||
override val type: KType get() = typeOf<Double>()
|
||||
|
||||
override fun writeObject(output: Output, obj: Double) {
|
||||
output.writeDouble(obj)
|
||||
override fun writeTo(sink: Sink, obj: Double) {
|
||||
sink.writeLong(obj.toBits())
|
||||
}
|
||||
|
||||
override fun readObject(input: Input): Double = input.readDouble()
|
||||
override fun readFrom(source: Source): Double = Double.fromBits(source.readLong())
|
||||
}
|
@ -1,10 +1,10 @@
|
||||
@file:Suppress("UNUSED_PARAMETER")
|
||||
|
||||
package space.kscience.dataforge.io
|
||||
|
||||
|
||||
import io.ktor.utils.io.core.Input
|
||||
import io.ktor.utils.io.core.Output
|
||||
import kotlinx.io.Sink
|
||||
import kotlinx.io.Source
|
||||
import kotlinx.io.readString
|
||||
import kotlinx.io.writeString
|
||||
import kotlinx.serialization.json.Json
|
||||
import kotlinx.serialization.json.JsonElement
|
||||
import space.kscience.dataforge.context.Context
|
||||
@ -18,13 +18,13 @@ import space.kscience.dataforge.meta.toMeta
|
||||
*/
|
||||
public class JsonMetaFormat(private val json: Json = DEFAULT_JSON) : MetaFormat {
|
||||
|
||||
override fun writeMeta(output: Output, meta: Meta, descriptor: MetaDescriptor?) {
|
||||
override fun writeMeta(sink: Sink, meta: Meta, descriptor: MetaDescriptor?) {
|
||||
val jsonElement = meta.toJson(descriptor)
|
||||
output.writeUtf8String(json.encodeToString(JsonElement.serializer(), jsonElement))
|
||||
sink.writeString(json.encodeToString(JsonElement.serializer(), jsonElement))
|
||||
}
|
||||
|
||||
override fun readMeta(input: Input, descriptor: MetaDescriptor?): Meta {
|
||||
val str = input.readUtf8String()//readByteArray().decodeToString()
|
||||
override fun readMeta(source: Source, descriptor: MetaDescriptor?): Meta {
|
||||
val str = source.readString()
|
||||
val jsonElement = json.parseToJsonElement(str)
|
||||
return jsonElement.toMeta(descriptor)
|
||||
}
|
||||
@ -39,10 +39,10 @@ public class JsonMetaFormat(private val json: Json = DEFAULT_JSON) : MetaFormat
|
||||
|
||||
private val default = JsonMetaFormat()
|
||||
|
||||
override fun writeMeta(output: Output, meta: Meta, descriptor: MetaDescriptor?): Unit =
|
||||
default.run { writeMeta(output, meta, descriptor) }
|
||||
override fun writeMeta(sink: Sink, meta: Meta, descriptor: MetaDescriptor?): Unit =
|
||||
default.run { writeMeta(sink, meta, descriptor) }
|
||||
|
||||
override fun readMeta(input: Input, descriptor: MetaDescriptor?): Meta =
|
||||
default.run { readMeta(input, descriptor) }
|
||||
override fun readMeta(source: Source, descriptor: MetaDescriptor?): Meta =
|
||||
default.run { readMeta(source, descriptor) }
|
||||
}
|
||||
}
|
||||
|
@ -1,9 +1,9 @@
|
||||
package space.kscience.dataforge.io
|
||||
|
||||
import io.ktor.utils.io.core.ByteReadPacket
|
||||
import io.ktor.utils.io.core.Input
|
||||
import io.ktor.utils.io.core.Output
|
||||
import io.ktor.utils.io.core.use
|
||||
|
||||
import kotlinx.io.Sink
|
||||
import kotlinx.io.Source
|
||||
import kotlinx.io.buffered
|
||||
import space.kscience.dataforge.context.Context
|
||||
import space.kscience.dataforge.context.Global
|
||||
import space.kscience.dataforge.io.MetaFormatFactory.Companion.META_FORMAT_TYPE
|
||||
@ -23,19 +23,19 @@ public interface MetaFormat : IOFormat<Meta> {
|
||||
|
||||
override val type: KType get() = typeOf<Meta>()
|
||||
|
||||
override fun writeObject(output: Output, obj: Meta) {
|
||||
writeMeta(output, obj, null)
|
||||
override fun writeTo(sink: Sink, obj: Meta) {
|
||||
writeMeta(sink, obj, null)
|
||||
}
|
||||
|
||||
override fun readObject(input: Input): Meta = readMeta(input)
|
||||
override fun readFrom(source: Source): Meta = readMeta(source)
|
||||
|
||||
public fun writeMeta(
|
||||
output: Output,
|
||||
sink: Sink,
|
||||
meta: Meta,
|
||||
descriptor: MetaDescriptor? = null,
|
||||
)
|
||||
|
||||
public fun readMeta(input: Input, descriptor: MetaDescriptor? = null): Meta
|
||||
public fun readMeta(source: Source, descriptor: MetaDescriptor? = null): Meta
|
||||
}
|
||||
|
||||
@Type(META_FORMAT_TYPE)
|
||||
@ -57,15 +57,13 @@ public interface MetaFormatFactory : IOFormatFactory<Meta>, MetaFormat {
|
||||
|
||||
public fun Meta.toString(format: MetaFormat): String = ByteArray {
|
||||
format.run {
|
||||
writeObject(this@ByteArray, this@toString)
|
||||
writeTo(this@ByteArray, this@toString)
|
||||
}
|
||||
}.decodeToString()
|
||||
|
||||
public fun Meta.toString(formatFactory: MetaFormatFactory): String = toString(formatFactory.build(Global, Meta.EMPTY))
|
||||
|
||||
public fun MetaFormat.parse(str: String): Meta {
|
||||
return ByteReadPacket(str.encodeToByteArray()).use { readObject(it) }
|
||||
}
|
||||
public fun MetaFormat.parse(str: String): Meta = readFrom(StringSource(str).buffered())
|
||||
|
||||
public fun MetaFormatFactory.parse(str: String, formatMeta: Meta): Meta = build(Global, formatMeta).parse(str)
|
||||
|
||||
|
@ -1,6 +1,7 @@
|
||||
package space.kscience.dataforge.io
|
||||
|
||||
import io.ktor.utils.io.core.*
|
||||
import kotlinx.io.*
|
||||
import kotlinx.io.bytestring.decodeToString
|
||||
import space.kscience.dataforge.context.Context
|
||||
import space.kscience.dataforge.context.Global
|
||||
import space.kscience.dataforge.meta.Meta
|
||||
@ -18,7 +19,7 @@ import space.kscience.dataforge.names.plus
|
||||
public class TaggedEnvelopeFormat(
|
||||
public val io: IOPlugin,
|
||||
public val version: VERSION = VERSION.DF02,
|
||||
public val metaFormatFactory: MetaFormatFactory = JsonMetaFormat
|
||||
public val metaFormatFactory: MetaFormatFactory = JsonMetaFormat,
|
||||
) : EnvelopeFormat {
|
||||
|
||||
// private val metaFormat = io.metaFormat(metaFormatKey)
|
||||
@ -26,59 +27,60 @@ public class TaggedEnvelopeFormat(
|
||||
|
||||
|
||||
private fun Tag.toBinary() = Binary {
|
||||
writeRawString(START_SEQUENCE)
|
||||
writeRawString(version.name)
|
||||
write(START_SEQUENCE)
|
||||
writeString(version.name)
|
||||
writeShort(metaFormatKey)
|
||||
writeUInt(metaSize)
|
||||
when (version) {
|
||||
VERSION.DF02 -> {
|
||||
writeUInt(dataSize.toUInt())
|
||||
}
|
||||
|
||||
VERSION.DF03 -> {
|
||||
writeULong(dataSize)
|
||||
}
|
||||
}
|
||||
writeRawString(END_SEQUENCE)
|
||||
write(END_SEQUENCE)
|
||||
}
|
||||
|
||||
override fun writeObject(
|
||||
output: Output,
|
||||
override fun writeTo(
|
||||
sink: Sink,
|
||||
obj: Envelope,
|
||||
) {
|
||||
val metaFormat = metaFormatFactory.build(io.context, Meta.EMPTY)
|
||||
val metaBytes = Binary(obj.meta,metaFormat)
|
||||
val metaBytes = Binary(obj.meta, metaFormat)
|
||||
val actualSize: ULong = (obj.data?.size ?: 0).toULong()
|
||||
val tag = Tag(metaFormatFactory.key, metaBytes.size.toUInt() + 2u, actualSize)
|
||||
output.writeBinary(tag.toBinary())
|
||||
output.writeBinary(metaBytes)
|
||||
output.writeRawString("\r\n")
|
||||
sink.writeBinary(tag.toBinary())
|
||||
sink.writeBinary(metaBytes)
|
||||
sink.writeString("\r\n")
|
||||
obj.data?.let {
|
||||
output.writeBinary(it)
|
||||
sink.writeBinary(it)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Read an envelope from input into memory
|
||||
*
|
||||
* @param input an input to read from
|
||||
* @param source an input to read from
|
||||
* @param formats a collection of meta formats to resolve
|
||||
*/
|
||||
override fun readObject(input: Input): Envelope {
|
||||
val tag = input.readTag(this.version)
|
||||
override fun readFrom(source: Source): Envelope {
|
||||
val tag = source.readTag(this.version)
|
||||
|
||||
val metaFormat = io.resolveMetaFormat(tag.metaFormatKey)
|
||||
?: error("Meta format with key ${tag.metaFormatKey} not found")
|
||||
|
||||
val metaBinary = input.readBinary(tag.metaSize.toInt())
|
||||
val metaBinary = source.readBinary(tag.metaSize.toInt())
|
||||
|
||||
val meta: Meta = metaFormat.readObjectFrom(metaBinary)
|
||||
val meta: Meta = metaFormat.readFrom(metaBinary)
|
||||
|
||||
val data = input.readBinary(tag.dataSize.toInt())
|
||||
val data = source.readBinary(tag.dataSize.toInt())
|
||||
|
||||
return SimpleEnvelope(meta, data)
|
||||
}
|
||||
|
||||
override fun readObject(binary: Binary): Envelope = binary.read{
|
||||
override fun readFrom(binary: Binary): Envelope = binary.read {
|
||||
val tag = readTag(version)
|
||||
|
||||
val metaFormat = io.resolveMetaFormat(tag.metaFormatKey)
|
||||
@ -86,7 +88,7 @@ public class TaggedEnvelopeFormat(
|
||||
|
||||
val metaBinary = readBinary(tag.metaSize.toInt())
|
||||
|
||||
val meta: Meta = metaFormat.readObjectFrom(metaBinary)
|
||||
val meta: Meta = metaFormat.readFrom(metaBinary)
|
||||
|
||||
|
||||
SimpleEnvelope(meta, binary.view((version.tagSize + tag.metaSize).toInt(), tag.dataSize.toInt()))
|
||||
@ -104,8 +106,8 @@ public class TaggedEnvelopeFormat(
|
||||
}
|
||||
|
||||
public companion object : EnvelopeFormatFactory {
|
||||
private const val START_SEQUENCE = "#~"
|
||||
private const val END_SEQUENCE = "~#\r\n"
|
||||
private val START_SEQUENCE = "#~".toAsciiByteString()
|
||||
private val END_SEQUENCE = "~#\r\n".toAsciiByteString()
|
||||
|
||||
override val name: Name = EnvelopeFormatFactory.ENVELOPE_FACTORY_NAME + "tagged"
|
||||
|
||||
@ -121,27 +123,26 @@ public class TaggedEnvelopeFormat(
|
||||
return TaggedEnvelopeFormat(io, version)
|
||||
}
|
||||
|
||||
private fun Input.readTag(version: VERSION): Tag {
|
||||
val start = readRawString(2)
|
||||
private fun Source.readTag(version: VERSION): Tag {
|
||||
val start = readByteString(2)
|
||||
if (start != START_SEQUENCE) error("The input is not an envelope")
|
||||
val versionString = readRawString(4)
|
||||
if (version.name != versionString) error("Wrong version of DataForge: expected $version but found $versionString")
|
||||
val versionString = readByteString(4)
|
||||
if (version.name.toAsciiByteString() != versionString) error("Wrong version of DataForge: expected $version but found $versionString")
|
||||
val metaFormatKey = readShort()
|
||||
val metaLength = readUInt()
|
||||
val dataLength: ULong = when (version) {
|
||||
VERSION.DF02 -> readUInt().toULong()
|
||||
VERSION.DF03 -> readULong()
|
||||
}
|
||||
val end = readRawString(4)
|
||||
val end = readByteString(4)
|
||||
if (end != END_SEQUENCE) error("The input is not an envelope")
|
||||
return Tag(metaFormatKey, metaLength, dataLength)
|
||||
}
|
||||
|
||||
override fun peekFormat(io: IOPlugin, binary: Binary): EnvelopeFormat? {
|
||||
return try {
|
||||
override fun peekFormat(io: IOPlugin, binary: Binary): EnvelopeFormat? = try {
|
||||
binary.read {
|
||||
val header = readRawString(6)
|
||||
return@read when (header.substring(2..5)) {
|
||||
val header = readByteString(6)
|
||||
when (header.substring(2, 6).decodeToString()) {
|
||||
VERSION.DF02.name -> TaggedEnvelopeFormat(io, VERSION.DF02)
|
||||
VERSION.DF03.name -> TaggedEnvelopeFormat(io, VERSION.DF03)
|
||||
else -> null
|
||||
@ -150,24 +151,20 @@ public class TaggedEnvelopeFormat(
|
||||
} catch (ex: Exception) {
|
||||
null
|
||||
}
|
||||
}
|
||||
|
||||
private val default by lazy { build(Global, Meta.EMPTY) }
|
||||
|
||||
override fun readObject(binary: Binary): Envelope =
|
||||
default.run { readObject(binary) }
|
||||
override fun readFrom(binary: Binary): Envelope =
|
||||
default.run { readFrom(binary) }
|
||||
|
||||
override fun writeObject(
|
||||
output: Output,
|
||||
override fun writeTo(
|
||||
sink: Sink,
|
||||
obj: Envelope,
|
||||
): Unit = default.run {
|
||||
writeObject(
|
||||
output,
|
||||
obj,
|
||||
)
|
||||
writeTo(sink, obj)
|
||||
}
|
||||
|
||||
override fun readObject(input: Input): Envelope = default.readObject(input)
|
||||
override fun readFrom(source: Source): Envelope = default.readFrom(source)
|
||||
}
|
||||
|
||||
}
|
@ -1,9 +1,9 @@
|
||||
package space.kscience.dataforge.io
|
||||
|
||||
import io.ktor.utils.io.core.ByteReadPacket
|
||||
import io.ktor.utils.io.core.Input
|
||||
import io.ktor.utils.io.core.Output
|
||||
import io.ktor.utils.io.core.readUTF8UntilDelimiterTo
|
||||
|
||||
import kotlinx.io.*
|
||||
import kotlinx.io.bytestring.ByteString
|
||||
import kotlinx.io.bytestring.encodeToByteString
|
||||
import space.kscience.dataforge.context.Context
|
||||
import space.kscience.dataforge.context.Global
|
||||
import space.kscience.dataforge.meta.Meta
|
||||
@ -28,35 +28,36 @@ public class TaglessEnvelopeFormat(
|
||||
// writeFully("#? $key: $value;\r\n".encodeToByteArray())
|
||||
// }
|
||||
|
||||
override fun writeObject(
|
||||
output: Output,
|
||||
override fun writeTo(
|
||||
sink: Sink,
|
||||
obj: Envelope,
|
||||
) {
|
||||
val metaFormat = metaFormatFactory.build(this.io.context, meta)
|
||||
|
||||
//printing header
|
||||
output.writeRawString(TAGLESS_ENVELOPE_HEADER + "\r\n")
|
||||
sink.write(TAGLESS_ENVELOPE_HEADER)
|
||||
sink.writeString("\r\n")
|
||||
|
||||
//Printing meta
|
||||
if (!obj.meta.isEmpty()) {
|
||||
val metaBinary = Binary(obj.meta, metaFormat)
|
||||
output.writeUtf8String(META_START + "-${metaFormatFactory.shortName}\r\n")
|
||||
output.writeBinary(metaBinary)
|
||||
output.writeRawString("\r\n")
|
||||
sink.writeString(META_START + "-${metaFormatFactory.shortName}\r\n")
|
||||
sink.writeBinary(metaBinary)
|
||||
sink.writeString("\r\n")
|
||||
}
|
||||
|
||||
//Printing data
|
||||
obj.data?.let { data ->
|
||||
//val actualSize: Int = envelope.data?.size ?: 0
|
||||
output.writeUtf8String(DATA_START + "\r\n")
|
||||
output.writeBinary(data)
|
||||
sink.writeString(DATA_START + "\r\n")
|
||||
sink.writeBinary(data)
|
||||
}
|
||||
}
|
||||
|
||||
override fun readObject(input: Input): Envelope {
|
||||
override fun readFrom(source: Source): Envelope {
|
||||
//read preamble
|
||||
input.discardWithSeparator(
|
||||
TAGLESS_ENVELOPE_HEADER.encodeToByteArray(),
|
||||
source.discardWithSeparator(
|
||||
TAGLESS_ENVELOPE_HEADER,
|
||||
atMost = 1024,
|
||||
)
|
||||
|
||||
@ -64,22 +65,22 @@ public class TaglessEnvelopeFormat(
|
||||
|
||||
var data: Binary? = null
|
||||
|
||||
input.discardWithSeparator(
|
||||
source.discardWithSeparator(
|
||||
SEPARATOR_PREFIX,
|
||||
atMost = 1024,
|
||||
)
|
||||
|
||||
var header: String = ByteArray {
|
||||
input.readUTF8UntilDelimiterTo(this, "\n")
|
||||
source.readWithSeparatorTo(this, "\n".encodeToByteString())
|
||||
}.decodeToString()
|
||||
|
||||
while (!input.endOfInput) {
|
||||
while (!source.exhausted()) {
|
||||
val block = ByteArray {
|
||||
input.readWithSeparatorTo(this, SEPARATOR_PREFIX)
|
||||
source.readWithSeparatorTo(this, SEPARATOR_PREFIX)
|
||||
}
|
||||
|
||||
val nextHeader = ByteArray {
|
||||
input.readWithSeparatorTo(this, "\n".encodeToByteArray())
|
||||
source.readWithSeparatorTo(this, "\n".encodeToByteString())
|
||||
}.decodeToString()
|
||||
|
||||
//terminate on end
|
||||
@ -89,7 +90,7 @@ public class TaglessEnvelopeFormat(
|
||||
if (header.startsWith("META")) {
|
||||
//TODO check format
|
||||
val metaFormat: MetaFormatFactory = JsonMetaFormat
|
||||
meta = metaFormat.readMeta(ByteReadPacket(block))
|
||||
meta = metaFormat.readMeta(ByteArraySource(block).buffered())
|
||||
}
|
||||
|
||||
if (header.startsWith("DATA")) {
|
||||
@ -111,9 +112,9 @@ public class TaglessEnvelopeFormat(
|
||||
|
||||
public const val TAGLESS_ENVELOPE_TYPE: String = "tagless"
|
||||
|
||||
public val SEPARATOR_PREFIX: ByteArray = "\n#~".encodeToByteArray()
|
||||
public val SEPARATOR_PREFIX: ByteString = "\n#~".encodeToByteString()
|
||||
|
||||
public const val TAGLESS_ENVELOPE_HEADER: String = "#~DFTL"
|
||||
public val TAGLESS_ENVELOPE_HEADER: ByteString = "#~DFTL".encodeToByteString()
|
||||
|
||||
// public const val META_START_PROPERTY: String = "metaSeparator"
|
||||
public const val META_START: String = "#~META"
|
||||
@ -131,24 +132,21 @@ public class TaglessEnvelopeFormat(
|
||||
|
||||
private val default by lazy { build(Global, Meta.EMPTY) }
|
||||
|
||||
override fun readObject(binary: Binary): Envelope = default.run { readObject(binary) }
|
||||
override fun readFrom(binary: Binary): Envelope = default.run { readFrom(binary) }
|
||||
|
||||
override fun writeObject(
|
||||
output: Output,
|
||||
override fun writeTo(
|
||||
sink: Sink,
|
||||
obj: Envelope,
|
||||
): Unit = default.run {
|
||||
writeObject(
|
||||
output,
|
||||
obj,
|
||||
)
|
||||
writeTo(sink, obj)
|
||||
}
|
||||
|
||||
override fun readObject(input: Input): Envelope = default.readObject(input)
|
||||
override fun readFrom(source: Source): Envelope = default.readFrom(source)
|
||||
|
||||
override fun peekFormat(io: IOPlugin, binary: Binary): EnvelopeFormat? {
|
||||
return try {
|
||||
binary.read {
|
||||
val string = readRawString(TAGLESS_ENVELOPE_HEADER.length)
|
||||
val string = readByteString(TAGLESS_ENVELOPE_HEADER.size)
|
||||
return@read if (string == TAGLESS_ENVELOPE_HEADER) {
|
||||
TaglessEnvelopeFormat(io)
|
||||
} else {
|
||||
|
@ -1,40 +1,41 @@
|
||||
package space.kscience.dataforge.io
|
||||
|
||||
import io.ktor.utils.io.bits.Memory
|
||||
import io.ktor.utils.io.charsets.Charsets
|
||||
import io.ktor.utils.io.charsets.decodeExactBytes
|
||||
import io.ktor.utils.io.core.*
|
||||
import io.ktor.utils.io.core.internal.ChunkBuffer
|
||||
import kotlinx.io.*
|
||||
import kotlinx.io.bytestring.ByteString
|
||||
import kotlinx.io.bytestring.decodeToString
|
||||
import kotlinx.io.bytestring.encodeToByteString
|
||||
import space.kscience.dataforge.meta.Meta
|
||||
import space.kscience.dataforge.misc.DFExperimental
|
||||
import kotlin.math.min
|
||||
|
||||
public fun Output.writeRawString(str: String) {
|
||||
writeFully(str.toByteArray(Charsets.ISO_8859_1))
|
||||
/**
|
||||
* Convert a string literal, containing only ASCII characters to a [ByteString].
|
||||
* Throws an error if there are non-ASCII characters.
|
||||
*/
|
||||
public fun String.toAsciiByteString(): ByteString {
|
||||
val bytes = ByteArray(length) {
|
||||
val char = get(it)
|
||||
val code = char.code
|
||||
if (code > Byte.MAX_VALUE) error("Symbol $char is not ASCII symbol") else code.toByte()
|
||||
}
|
||||
return ByteString(bytes)
|
||||
}
|
||||
|
||||
public fun Output.writeUtf8String(str: String) {
|
||||
writeFully(str.encodeToByteArray())
|
||||
}
|
||||
public inline fun Buffer(block: Sink.() -> Unit): Buffer = Buffer().apply(block)
|
||||
|
||||
public fun Input.readRawString(size: Int): String {
|
||||
return Charsets.ISO_8859_1.newDecoder().decodeExactBytes(this, size)
|
||||
}
|
||||
//public fun Source.readSafeUtf8Line(): String = readUTF8Line() ?: error("Line not found")
|
||||
|
||||
public fun Input.readUtf8String(): String = readBytes().decodeToString()
|
||||
public inline fun ByteArray(block: Sink.() -> Unit): ByteArray =
|
||||
Buffer(block).readByteArray()
|
||||
|
||||
public fun Input.readSafeUtf8Line(): String = readUTF8Line() ?: error("Line not found")
|
||||
|
||||
public inline fun ByteArray(block: Output.() -> Unit): ByteArray =
|
||||
buildPacket(block).readBytes()
|
||||
|
||||
public inline fun Binary(block: Output.() -> Unit): Binary =
|
||||
public inline fun Binary(block: Sink.() -> Unit): Binary =
|
||||
ByteArray(block).asBinary()
|
||||
|
||||
public operator fun Binary.get(range: IntRange): Binary = view(range.first, range.last - range.first)
|
||||
|
||||
/**
|
||||
* Return inferred [EnvelopeFormat] if only one format could read given file. If no format accepts the binary, return null. If
|
||||
* multiple formats accepts binary, throw an error.
|
||||
* multiple formats accept binary, throw an error.
|
||||
*/
|
||||
public fun IOPlugin.peekBinaryEnvelopeFormat(binary: Binary): EnvelopeFormat? {
|
||||
val formats = envelopeFormatFactories.mapNotNull { factory ->
|
||||
@ -56,7 +57,7 @@ public fun IOPlugin.readEnvelope(
|
||||
binary: Binary,
|
||||
readNonEnvelopes: Boolean = false,
|
||||
formatPicker: IOPlugin.(Binary) -> EnvelopeFormat? = IOPlugin::peekBinaryEnvelopeFormat,
|
||||
): Envelope = formatPicker(binary)?.readObject(binary) ?: if (readNonEnvelopes) {
|
||||
): Envelope = formatPicker(binary)?.readFrom(binary) ?: if (readNonEnvelopes) {
|
||||
// if no format accepts file, read it as binary
|
||||
Envelope(Meta.EMPTY, binary)
|
||||
} else error("Can't infer format for $binary")
|
||||
@ -96,74 +97,139 @@ private class RingByteArray(
|
||||
else -> inputArray.indices.all { inputArray[it] == get(it) }
|
||||
}
|
||||
|
||||
fun contentEquals(byteString: ByteString): Boolean = when {
|
||||
byteString.size != buffer.size -> false
|
||||
size < buffer.size -> false
|
||||
else -> (0 until byteString.size).all { byteString[it] == get(it) }
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private fun RingByteArray.toArray(): ByteArray = ByteArray(size) { get(it) }
|
||||
|
||||
/**
|
||||
* Read [Input] into [output] until designated multibyte [separator] and optionally continues until
|
||||
* Read [Source] into [output] until designated multibyte [separator] and optionally continues until
|
||||
* the end of the line after it. Throw error if [separator] not found and [atMost] bytes are read.
|
||||
* Also fails if [separator] not found until the end of input.
|
||||
*
|
||||
* Separator itself is not read into Output.
|
||||
* The Separator itself is not read into [Sink].
|
||||
*
|
||||
* @param errorOnEof if true error is thrown if separator is never encountered
|
||||
*
|
||||
* @return bytes actually being read, including separator
|
||||
*/
|
||||
public fun Input.readWithSeparatorTo(
|
||||
output: Output,
|
||||
separator: ByteArray,
|
||||
public fun Source.readWithSeparatorTo(
|
||||
output: Sink?,
|
||||
separator: ByteString,
|
||||
atMost: Int = Int.MAX_VALUE,
|
||||
errorOnEof: Boolean = false,
|
||||
): Int {
|
||||
var counter = 0
|
||||
val rb = RingByteArray(ByteArray(separator.size))
|
||||
takeWhile { buffer ->
|
||||
while (buffer.canRead()) {
|
||||
val byte = buffer.readByte()
|
||||
|
||||
while (!exhausted()) {
|
||||
val byte = readByte()
|
||||
counter++
|
||||
if (counter >= atMost) error("Maximum number of bytes to be read $atMost reached.")
|
||||
rb.push(byte)
|
||||
if (rb.contentEquals(separator)) {
|
||||
return counter
|
||||
} else if (rb.isFull()) {
|
||||
output.writeByte(rb[0])
|
||||
output?.writeByte(rb[0])
|
||||
}
|
||||
}
|
||||
!endOfInput
|
||||
}
|
||||
|
||||
if (errorOnEof) {
|
||||
error("Read to the end of input without encountering ${separator.decodeToString()}")
|
||||
} else {
|
||||
for(i in 1 until rb.size){
|
||||
output.writeByte(rb[i])
|
||||
for (i in 1 until rb.size) {
|
||||
output?.writeByte(rb[i])
|
||||
}
|
||||
counter += (rb.size - 1)
|
||||
return counter
|
||||
}
|
||||
}
|
||||
|
||||
public fun Input.discardLine(): Int {
|
||||
return discardUntilDelimiter('\n'.code.toByte()).also {
|
||||
discard(1)
|
||||
}.toInt() + 1
|
||||
}
|
||||
|
||||
public fun Input.discardWithSeparator(
|
||||
separator: ByteArray,
|
||||
/**
|
||||
* Discard all bytes until [separator] is encountered. Separator is discarded sa well.
|
||||
* Return the total number of bytes read.
|
||||
*/
|
||||
public fun Source.discardWithSeparator(
|
||||
separator: ByteString,
|
||||
atMost: Int = Int.MAX_VALUE,
|
||||
errorOnEof: Boolean = false,
|
||||
): Int {
|
||||
val dummy: Output = object : Output(ChunkBuffer.Pool) {
|
||||
override fun closeDestination() {
|
||||
): Int = readWithSeparatorTo(null, separator, atMost, errorOnEof)
|
||||
|
||||
/**
|
||||
* Discard all symbol until newline is discovered. Carriage return is not discarded.
|
||||
*/
|
||||
public fun Source.discardLine(
|
||||
atMost: Int = Int.MAX_VALUE,
|
||||
errorOnEof: Boolean = false,
|
||||
): Int = discardWithSeparator("\n".encodeToByteString(), atMost, errorOnEof)
|
||||
|
||||
|
||||
/**
|
||||
* A [Source] based on [ByteArray]
|
||||
*/
|
||||
internal class ByteArraySource(
|
||||
private val byteArray: ByteArray,
|
||||
private val offset: Int = 0,
|
||||
private val size: Int = byteArray.size - offset,
|
||||
) : RawSource {
|
||||
|
||||
init {
|
||||
require(offset >= 0) { "Offset must be positive" }
|
||||
require(offset + size <= byteArray.size) { "End index is ${offset + size}, but the array size is ${byteArray.size}" }
|
||||
}
|
||||
|
||||
private var pointer = offset
|
||||
|
||||
override fun close() {
|
||||
// Do nothing
|
||||
}
|
||||
|
||||
override fun flush(source: Memory, offset: Int, length: Int) {
|
||||
// Do nothing
|
||||
override fun readAtMostTo(sink: Buffer, byteCount: Long): Long {
|
||||
if (pointer == offset + size) return -1
|
||||
val byteRead = min(byteCount.toInt(), (size + offset - pointer))
|
||||
sink.write(byteArray, pointer, pointer + byteRead)
|
||||
pointer += byteRead
|
||||
return byteRead.toLong()
|
||||
}
|
||||
}
|
||||
|
||||
return readWithSeparatorTo(dummy, separator, atMost, errorOnEof)
|
||||
}
|
||||
|
||||
/**
|
||||
* A [Source] based on [String]
|
||||
*/
|
||||
public class StringSource(
|
||||
public val string: String,
|
||||
public val offset: Int = 0,
|
||||
public val size: Int = string.length - offset,
|
||||
) : RawSource {
|
||||
|
||||
private var pointer = offset
|
||||
|
||||
override fun close() {
|
||||
// Do nothing
|
||||
}
|
||||
|
||||
override fun readAtMostTo(sink: Buffer, byteCount: Long): Long {
|
||||
if (pointer == offset + size) return -1
|
||||
val byteRead = min(byteCount.toInt(), (size + offset - pointer))
|
||||
sink.writeString(string, pointer, pointer + byteRead)
|
||||
pointer += byteRead
|
||||
return byteRead.toLong()
|
||||
}
|
||||
}
|
||||
|
||||
public fun Sink.writeDouble(value: Double) {
|
||||
writeLong(value.toBits())
|
||||
}
|
||||
|
||||
public fun Source.readDouble(): Double = Double.fromBits(readLong())
|
||||
|
||||
public fun Sink.writeFloat(value: Float) {
|
||||
writeInt(value.toBits())
|
||||
}
|
||||
|
||||
public fun Source.readFloat(): Float = Float.fromBits(readInt())
|
@ -1,6 +1,5 @@
|
||||
package space.kscience.dataforge.io
|
||||
|
||||
import io.ktor.utils.io.core.readInt
|
||||
import kotlin.test.Test
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
|
@ -1,6 +1,7 @@
|
||||
package space.kscience.dataforge.io
|
||||
|
||||
import io.ktor.utils.io.core.readBytes
|
||||
import kotlinx.io.readByteArray
|
||||
import kotlinx.io.writeString
|
||||
import kotlin.test.Test
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
@ -12,7 +13,7 @@ class EnvelopeFormatTest {
|
||||
"d" put 22.2
|
||||
}
|
||||
data {
|
||||
writeUtf8String("12345678")
|
||||
writeString("12345678")
|
||||
}
|
||||
}
|
||||
|
||||
@ -22,7 +23,7 @@ class EnvelopeFormatTest {
|
||||
val res = readFromByteArray(byteArray)
|
||||
assertEquals(envelope.meta, res.meta)
|
||||
val bytes = res.data?.read {
|
||||
readBytes()
|
||||
readByteArray()
|
||||
}
|
||||
assertEquals("12345678", bytes?.decodeToString())
|
||||
}
|
||||
@ -34,13 +35,13 @@ class EnvelopeFormatTest {
|
||||
val res = readFromByteArray(byteArray)
|
||||
assertEquals(envelope.meta, res.meta)
|
||||
val bytes = res.data?.read {
|
||||
readBytes()
|
||||
readByteArray()
|
||||
}
|
||||
assertEquals("12345678", bytes?.decodeToString())
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testManualDftl(){
|
||||
fun testManualDftl() {
|
||||
val envelopeString = """
|
||||
#~DFTL
|
||||
#~META
|
||||
@ -56,7 +57,7 @@ class EnvelopeFormatTest {
|
||||
val res = TaglessEnvelopeFormat.readFromByteArray(envelopeString.encodeToByteArray())
|
||||
assertEquals(envelope.meta, res.meta)
|
||||
val bytes = res.data?.read {
|
||||
readBytes()
|
||||
readByteArray()
|
||||
}
|
||||
assertEquals("12345678", bytes?.decodeToString())
|
||||
}
|
||||
|
@ -1,8 +1,9 @@
|
||||
package space.kscience.dataforge.io
|
||||
|
||||
import io.ktor.utils.io.core.ByteReadPacket
|
||||
import io.ktor.utils.io.core.readBytes
|
||||
import io.ktor.utils.io.core.readUTF8Line
|
||||
import kotlinx.io.buffered
|
||||
import kotlinx.io.bytestring.encodeToByteString
|
||||
import kotlinx.io.readByteArray
|
||||
import kotlinx.io.readLine
|
||||
import kotlin.test.Test
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertFails
|
||||
@ -11,9 +12,9 @@ class IOTest {
|
||||
@Test
|
||||
fun readBytes() {
|
||||
val bytes = ByteArray(8) { it.toByte() }
|
||||
val input = ByteReadPacket(bytes)
|
||||
@Suppress("UNUSED_VARIABLE") val first = input.readBytes(4)
|
||||
val second = input.readBytes(4)
|
||||
val input = ByteArraySource(bytes).buffered()
|
||||
@Suppress("UNUSED_VARIABLE") val first = input.readByteArray(4)
|
||||
val second = input.readByteArray(4)
|
||||
assertEquals(4.toByte(), second[0])
|
||||
}
|
||||
|
||||
@ -31,25 +32,25 @@ class IOTest {
|
||||
|
||||
binary.read {
|
||||
val array = ByteArray {
|
||||
val read = readWithSeparatorTo(this, "---".encodeToByteArray()) + discardLine()
|
||||
val read = readWithSeparatorTo(this, "---".encodeToByteString()) + discardLine()
|
||||
assertEquals(12, read)
|
||||
}
|
||||
assertEquals("""
|
||||
aaa
|
||||
bbb
|
||||
""".trimIndent(),array.decodeToString().trim())
|
||||
assertEquals("ccc", readUTF8Line()?.trim())
|
||||
assertEquals("ccc", readLine()?.trim())
|
||||
}
|
||||
|
||||
assertFails {
|
||||
binary.read {
|
||||
discardWithSeparator("---".encodeToByteArray(), atMost = 3 )
|
||||
discardWithSeparator("---".encodeToByteString(), atMost = 3 )
|
||||
}
|
||||
}
|
||||
|
||||
assertFails {
|
||||
binary.read{
|
||||
discardWithSeparator("-+-".encodeToByteArray(), errorOnEof = true)
|
||||
discardWithSeparator("-+-".encodeToByteString(), errorOnEof = true)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -7,11 +7,11 @@ import kotlin.test.assertEquals
|
||||
|
||||
|
||||
fun Meta.toByteArray(format: MetaFormat = JsonMetaFormat) = ByteArray {
|
||||
format.writeObject(this@ByteArray, this@toByteArray)
|
||||
format.writeTo(this@ByteArray, this@toByteArray)
|
||||
}
|
||||
|
||||
fun MetaFormat.fromByteArray(packet: ByteArray): Meta {
|
||||
return packet.asBinary().read { readObject(this) }
|
||||
return packet.asBinary().read { readFrom(this) }
|
||||
}
|
||||
|
||||
class MetaFormatTest {
|
||||
|
@ -1,5 +1,6 @@
|
||||
package space.kscience.dataforge.io
|
||||
|
||||
import kotlinx.io.writeString
|
||||
import space.kscience.dataforge.context.Global
|
||||
import space.kscience.dataforge.meta.get
|
||||
import space.kscience.dataforge.meta.int
|
||||
@ -18,9 +19,9 @@ class MultipartTest {
|
||||
"value" put it
|
||||
}
|
||||
data {
|
||||
writeUtf8String("Hello World $it")
|
||||
writeString("Hello World $it")
|
||||
repeat(300) {
|
||||
writeRawString("$it ")
|
||||
writeString("$it ")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,12 +1,11 @@
|
||||
package space.kscience.dataforge.io
|
||||
|
||||
import io.ktor.utils.io.core.ByteReadPacket
|
||||
import io.ktor.utils.io.core.use
|
||||
import kotlinx.io.buffered
|
||||
|
||||
|
||||
fun <T : Any> IOFormat<T>.writeToByteArray(obj: T): ByteArray = ByteArray {
|
||||
writeObject(this, obj)
|
||||
writeTo(this, obj)
|
||||
}
|
||||
fun <T : Any> IOFormat<T>.readFromByteArray(array: ByteArray): T = ByteReadPacket(array).use {
|
||||
readObject(it)
|
||||
fun <T : Any> IOFormat<T>.readFromByteArray(array: ByteArray): T = ByteArraySource(array).buffered().use {
|
||||
readFrom(it)
|
||||
}
|
@ -1,8 +1,11 @@
|
||||
package space.kscience.dataforge.io
|
||||
|
||||
import io.ktor.utils.io.core.*
|
||||
import io.ktor.utils.io.streams.asOutput
|
||||
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import kotlinx.io.Sink
|
||||
import kotlinx.io.Source
|
||||
import kotlinx.io.asSink
|
||||
import kotlinx.io.buffered
|
||||
import space.kscience.dataforge.meta.Meta
|
||||
import space.kscience.dataforge.meta.descriptors.MetaDescriptor
|
||||
import space.kscience.dataforge.meta.isEmpty
|
||||
@ -23,18 +26,18 @@ internal class PathBinary(
|
||||
override val size: Int = Files.size(path).toInt() - fileOffset,
|
||||
) : Binary {
|
||||
|
||||
override fun <R> read(offset: Int, atMost: Int, block: Input.() -> R): R = runBlocking {
|
||||
override fun <R> read(offset: Int, atMost: Int, block: Source.() -> R): R = runBlocking {
|
||||
readSuspend(offset, atMost, block)
|
||||
}
|
||||
|
||||
override suspend fun <R> readSuspend(offset: Int, atMost: Int, block: suspend Input.() -> R): R {
|
||||
override suspend fun <R> readSuspend(offset: Int, atMost: Int, block: suspend Source.() -> R): R {
|
||||
val actualOffset = offset + fileOffset
|
||||
val actualSize = min(atMost, size - offset)
|
||||
val array = path.inputStream().use {
|
||||
it.skip(actualOffset.toLong())
|
||||
it.readNBytes(actualSize)
|
||||
}
|
||||
return ByteReadPacket(array).block()
|
||||
return ByteArraySource(array).buffered().use { it.block() }
|
||||
}
|
||||
|
||||
override fun view(offset: Int, binarySize: Int) = PathBinary(path, fileOffset + offset, binarySize)
|
||||
@ -42,40 +45,40 @@ internal class PathBinary(
|
||||
|
||||
public fun Path.asBinary(): Binary = PathBinary(this)
|
||||
|
||||
public fun <R> Path.read(block: Input.() -> R): R = asBinary().read(block = block)
|
||||
public fun <R> Path.read(block: Source.() -> R): R = asBinary().read(block = block)
|
||||
|
||||
/**
|
||||
* Write a live output to a newly created file. If file does not exist, throws error
|
||||
*/
|
||||
public fun Path.write(block: Output.() -> Unit): Unit {
|
||||
public fun Path.write(block: Sink.() -> Unit): Unit {
|
||||
val stream = Files.newOutputStream(this, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW)
|
||||
stream.asOutput().use(block)
|
||||
stream.asSink().buffered().use(block)
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new file or append to exiting one with given output [block]
|
||||
*/
|
||||
public fun Path.append(block: Output.() -> Unit): Unit {
|
||||
public fun Path.append(block: Sink.() -> Unit): Unit {
|
||||
val stream = Files.newOutputStream(
|
||||
this,
|
||||
StandardOpenOption.WRITE, StandardOpenOption.APPEND, StandardOpenOption.CREATE
|
||||
)
|
||||
stream.asOutput().use(block)
|
||||
stream.asSink().buffered().use(block)
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new file or replace existing one using given output [block]
|
||||
*/
|
||||
public fun Path.rewrite(block: Output.() -> Unit): Unit {
|
||||
public fun Path.rewrite(block: Sink.() -> Unit): Unit {
|
||||
val stream = Files.newOutputStream(
|
||||
this,
|
||||
StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.CREATE
|
||||
)
|
||||
stream.asOutput().use(block)
|
||||
stream.asSink().buffered().use(block)
|
||||
}
|
||||
|
||||
@DFExperimental
|
||||
public fun EnvelopeFormat.readFile(path: Path): Envelope = readObject(path.asBinary())
|
||||
public fun EnvelopeFormat.readFile(path: Path): Envelope = readFrom(path.asBinary())
|
||||
|
||||
/**
|
||||
* Resolve IOFormat based on type
|
||||
@ -235,7 +238,7 @@ public fun IOPlugin.writeEnvelopeFile(
|
||||
envelopeFormat: EnvelopeFormat = TaggedEnvelopeFormat,
|
||||
) {
|
||||
path.rewrite {
|
||||
envelopeFormat.writeObject(this, envelope)
|
||||
envelopeFormat.writeTo(this, envelope)
|
||||
}
|
||||
}
|
||||
|
||||
@ -260,7 +263,7 @@ public fun IOPlugin.writeEnvelopeDirectory(
|
||||
val dataFile = path.resolve(IOPlugin.DATA_FILE_NAME)
|
||||
dataFile.write {
|
||||
envelope.data?.read {
|
||||
val copied = copyTo(this@write)
|
||||
val copied = transferTo(this@write)
|
||||
if (copied != envelope.data?.size?.toLong()) {
|
||||
error("The number of copied bytes does not equal data size")
|
||||
}
|
||||
|
@ -1,9 +1,11 @@
|
||||
package space.kscience.dataforge.io
|
||||
|
||||
import io.ktor.utils.io.core.Input
|
||||
import io.ktor.utils.io.streams.asInput
|
||||
import kotlinx.io.Source
|
||||
import kotlinx.io.asSource
|
||||
import kotlinx.io.buffered
|
||||
|
||||
public fun IOPlugin.resource(name: String): Binary? = context.javaClass.getResource(name)?.readBytes()?.asBinary()
|
||||
|
||||
public inline fun <R> IOPlugin.readResource(name: String, block: Input.() -> R): R =
|
||||
context.javaClass.getResource(name)?.openStream()?.asInput()?.block() ?: error("Can't read resource $name")
|
||||
public fun IOPlugin.resource(name: String): Binary? = { }.javaClass.getResource(name)?.readBytes()?.asBinary()
|
||||
|
||||
public inline fun <R> IOPlugin.readResource(name: String, block: Source.() -> R): R =
|
||||
{ }.javaClass.getResource(name)?.openStream()?.asSource()?.buffered()?.block() ?: error("Can't read resource $name")
|
@ -1,6 +1,5 @@
|
||||
package space.kscience.dataforge.io
|
||||
|
||||
import io.ktor.utils.io.core.writeDouble
|
||||
import space.kscience.dataforge.context.Global
|
||||
import space.kscience.dataforge.misc.DFExperimental
|
||||
import java.nio.file.Files
|
||||
|
@ -1,6 +1,5 @@
|
||||
package space.kscience.dataforge.io
|
||||
|
||||
import io.ktor.utils.io.core.writeDouble
|
||||
import space.kscience.dataforge.context.Global
|
||||
import space.kscience.dataforge.misc.DFExperimental
|
||||
import java.nio.file.Files
|
||||
|
@ -45,9 +45,9 @@ public final class space/kscience/dataforge/meta/False : space/kscience/dataforg
|
||||
|
||||
public final class space/kscience/dataforge/meta/JsonMetaKt {
|
||||
public static final fun getJSON_ARRAY_KEY (Lspace/kscience/dataforge/meta/Meta$Companion;)Ljava/lang/String;
|
||||
public static final fun toJson (Lspace/kscience/dataforge/meta/Meta;Lspace/kscience/dataforge/meta/descriptors/MetaDescriptor;)Lkotlinx/serialization/json/JsonObject;
|
||||
public static final fun toJson (Lspace/kscience/dataforge/meta/Meta;Lspace/kscience/dataforge/meta/descriptors/MetaDescriptor;)Lkotlinx/serialization/json/JsonElement;
|
||||
public static final fun toJson (Lspace/kscience/dataforge/meta/Value;Lspace/kscience/dataforge/meta/descriptors/MetaDescriptor;)Lkotlinx/serialization/json/JsonElement;
|
||||
public static synthetic fun toJson$default (Lspace/kscience/dataforge/meta/Meta;Lspace/kscience/dataforge/meta/descriptors/MetaDescriptor;ILjava/lang/Object;)Lkotlinx/serialization/json/JsonObject;
|
||||
public static synthetic fun toJson$default (Lspace/kscience/dataforge/meta/Meta;Lspace/kscience/dataforge/meta/descriptors/MetaDescriptor;ILjava/lang/Object;)Lkotlinx/serialization/json/JsonElement;
|
||||
public static synthetic fun toJson$default (Lspace/kscience/dataforge/meta/Value;Lspace/kscience/dataforge/meta/descriptors/MetaDescriptor;ILjava/lang/Object;)Lkotlinx/serialization/json/JsonElement;
|
||||
public static final fun toMeta (Lkotlinx/serialization/json/JsonElement;Lspace/kscience/dataforge/meta/descriptors/MetaDescriptor;)Lspace/kscience/dataforge/meta/SealedMeta;
|
||||
public static final fun toMeta (Lkotlinx/serialization/json/JsonObject;Lspace/kscience/dataforge/meta/descriptors/MetaDescriptor;)Lspace/kscience/dataforge/meta/SealedMeta;
|
||||
@ -639,17 +639,6 @@ public final class space/kscience/dataforge/meta/ValueType : java/lang/Enum {
|
||||
public static fun values ()[Lspace/kscience/dataforge/meta/ValueType;
|
||||
}
|
||||
|
||||
public final class space/kscience/dataforge/meta/ValueType$$serializer : kotlinx/serialization/internal/GeneratedSerializer {
|
||||
public static final field INSTANCE Lspace/kscience/dataforge/meta/ValueType$$serializer;
|
||||
public fun childSerializers ()[Lkotlinx/serialization/KSerializer;
|
||||
public synthetic fun deserialize (Lkotlinx/serialization/encoding/Decoder;)Ljava/lang/Object;
|
||||
public fun deserialize (Lkotlinx/serialization/encoding/Decoder;)Lspace/kscience/dataforge/meta/ValueType;
|
||||
public fun getDescriptor ()Lkotlinx/serialization/descriptors/SerialDescriptor;
|
||||
public synthetic fun serialize (Lkotlinx/serialization/encoding/Encoder;Ljava/lang/Object;)V
|
||||
public fun serialize (Lkotlinx/serialization/encoding/Encoder;Lspace/kscience/dataforge/meta/ValueType;)V
|
||||
public fun typeParametersSerializers ()[Lkotlinx/serialization/KSerializer;
|
||||
}
|
||||
|
||||
public final class space/kscience/dataforge/meta/ValueType$Companion {
|
||||
public final fun serializer ()Lkotlinx/serialization/KSerializer;
|
||||
}
|
||||
|
@ -50,7 +50,9 @@ public interface Meta : MetaRepr, MetaProvider {
|
||||
override fun toMeta(): Meta = this
|
||||
|
||||
override fun toString(): String
|
||||
|
||||
override fun equals(other: Any?): Boolean
|
||||
|
||||
override fun hashCode(): Int
|
||||
|
||||
public companion object {
|
||||
|
@ -11,7 +11,7 @@ import kotlin.js.JsName
|
||||
* Mark a meta builder
|
||||
*/
|
||||
@DslMarker
|
||||
public annotation class MetaBuilder
|
||||
public annotation class MetaBuilderMarker
|
||||
|
||||
/**
|
||||
* A generic interface that gives access to getting and setting meta notes and values
|
||||
@ -27,7 +27,7 @@ public interface MutableMetaProvider : MetaProvider, MutableValueProvider {
|
||||
* TODO documentation
|
||||
*/
|
||||
@Serializable(MutableMetaSerializer::class)
|
||||
@MetaBuilder
|
||||
@MetaBuilderMarker
|
||||
public interface MutableMeta : Meta, MutableMetaProvider {
|
||||
|
||||
override val items: Map<NameToken, MutableMeta>
|
||||
@ -90,8 +90,8 @@ public interface MutableMeta : Meta, MutableMetaProvider {
|
||||
setMeta(this, repr.toMeta())
|
||||
}
|
||||
|
||||
public infix fun Name.put(mutableMeta: MutableMeta.() -> Unit) {
|
||||
setMeta(this, Meta(mutableMeta))
|
||||
public infix fun Name.put(builder: MutableMeta.() -> Unit) {
|
||||
getOrCreate(this).apply(builder)
|
||||
}
|
||||
|
||||
public infix fun String.put(meta: Meta) {
|
||||
@ -131,7 +131,7 @@ public interface MutableMeta : Meta, MutableMetaProvider {
|
||||
}
|
||||
|
||||
public infix fun String.put(builder: MutableMeta.() -> Unit) {
|
||||
setMeta(Name.parse(this), MutableMeta(builder))
|
||||
getOrCreate(parseAsName()).apply(builder)
|
||||
}
|
||||
}
|
||||
|
||||
@ -381,16 +381,14 @@ public fun Meta.toMutableMeta(): ObservableMutableMeta = MutableMetaImpl(value,
|
||||
|
||||
public fun Meta.asMutableMeta(): MutableMeta = (this as? MutableMeta) ?: toMutableMeta()
|
||||
|
||||
@Suppress("FunctionName")
|
||||
@JsName("newMutableMeta")
|
||||
public fun MutableMeta(): ObservableMutableMeta = MutableMetaImpl(null)
|
||||
@JsName("newObservableMutableMeta")
|
||||
public fun ObservableMutableMeta(): ObservableMutableMeta = MutableMetaImpl(null)
|
||||
|
||||
/**
|
||||
* Build a [MutableMeta] using given transformation
|
||||
*/
|
||||
@Suppress("FunctionName")
|
||||
public inline fun MutableMeta(builder: MutableMeta.() -> Unit = {}): ObservableMutableMeta =
|
||||
MutableMeta().apply(builder)
|
||||
public inline fun ObservableMutableMeta(builder: MutableMeta.() -> Unit = {}): ObservableMutableMeta =
|
||||
ObservableMutableMeta().apply(builder)
|
||||
|
||||
|
||||
/**
|
||||
|
@ -51,6 +51,8 @@ private class ObservableMetaWrapper(
|
||||
|
||||
override fun setMeta(name: Name, node: Meta?) {
|
||||
val oldMeta = get(name)
|
||||
//don't forget to remove listener
|
||||
oldMeta?.removeListener(this)
|
||||
root.setMeta(absoluteName + name, node)
|
||||
if (oldMeta != node) {
|
||||
invalidate(name)
|
||||
|
@ -1,16 +1,17 @@
|
||||
package space.kscience.dataforge.meta
|
||||
|
||||
import kotlinx.serialization.Serializable
|
||||
import space.kscience.dataforge.names.NameToken
|
||||
import space.kscience.dataforge.names.*
|
||||
import kotlin.js.JsName
|
||||
|
||||
/**
|
||||
* The meta implementation which is guaranteed to be immutable.
|
||||
*
|
||||
*/
|
||||
@Serializable
|
||||
public class SealedMeta internal constructor(
|
||||
public class SealedMeta(
|
||||
override val value: Value?,
|
||||
override val items: Map<NameToken, SealedMeta>
|
||||
override val items: Map<NameToken, SealedMeta>,
|
||||
) : TypedMeta<SealedMeta> {
|
||||
override fun toString(): String = Meta.toString(this)
|
||||
override fun equals(other: Any?): Boolean = Meta.equals(this, other as? Meta)
|
||||
@ -26,7 +27,7 @@ public class SealedMeta internal constructor(
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate sealed node from [this]. If it is already sealed return it as is.
|
||||
* Generate sealed node from [this]. If it is already sealed, return it as is.
|
||||
*/
|
||||
public fun Meta.seal(): SealedMeta = this as? SealedMeta ?: SealedMeta(
|
||||
value,
|
||||
@ -47,7 +48,79 @@ public fun Meta(value: String): SealedMeta = Meta(value.asValue())
|
||||
@Suppress("FunctionName")
|
||||
public fun Meta(value: Boolean): SealedMeta = Meta(value.asValue())
|
||||
|
||||
@Suppress("FunctionName")
|
||||
public inline fun Meta(builder: MutableMeta.() -> Unit): SealedMeta =
|
||||
MutableMeta(builder).seal()
|
||||
|
||||
/**
|
||||
* A lightweight mutable meta without an observability
|
||||
*/
|
||||
@PublishedApi
|
||||
internal class MetaBuilder(
|
||||
override var value: Value? = null,
|
||||
override val items: MutableMap<NameToken, MetaBuilder> = hashMapOf(),
|
||||
) : MutableMeta {
|
||||
|
||||
override fun getOrCreate(name: Name): MetaBuilder {
|
||||
val existing = get(name) as? MetaBuilder
|
||||
return if (existing == null) {
|
||||
val newItem = MetaBuilder()
|
||||
setMeta(name, newItem)
|
||||
newItem
|
||||
} else {
|
||||
existing
|
||||
}
|
||||
}
|
||||
|
||||
private fun wrap(meta: Meta): MetaBuilder = meta as? MetaBuilder ?: MetaBuilder(
|
||||
meta.value,
|
||||
meta.items.mapValuesTo(hashMapOf()) { wrap(it.value) }
|
||||
)
|
||||
|
||||
|
||||
override fun setMeta(name: Name, node: Meta?) {
|
||||
when (name.length) {
|
||||
0 -> error("Can't set a meta with empty name")
|
||||
1 -> {
|
||||
val token = name.first()
|
||||
//remove child and invalidate if argument is null
|
||||
if (node == null) {
|
||||
items.remove(token)
|
||||
} else {
|
||||
items[token] = wrap(node)
|
||||
}
|
||||
}
|
||||
|
||||
else -> {
|
||||
getOrCreate(name.first().asName()).setMeta(name.cutFirst(), node)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun toString(): String = Meta.toString(this)
|
||||
|
||||
override fun equals(other: Any?): Boolean = Meta.equals(this, other as? Meta)
|
||||
|
||||
override fun hashCode(): Int = Meta.hashCode(this)
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a read-only meta.
|
||||
*/
|
||||
public inline fun Meta(builder: MutableMeta.() -> Unit): Meta =
|
||||
MetaBuilder().apply(builder).seal()
|
||||
|
||||
/**
|
||||
* Create an immutable meta.
|
||||
*/
|
||||
public inline fun SealedMeta(builder: MutableMeta.() -> Unit): SealedMeta =
|
||||
MetaBuilder().apply(builder).seal()
|
||||
|
||||
/**
|
||||
* Create an empty meta mutable meta.
|
||||
*/
|
||||
@JsName("newMutableMeta")
|
||||
public fun MutableMeta(): MutableMeta = MetaBuilder()
|
||||
|
||||
/**
|
||||
* Create a mutable meta with given builder.
|
||||
*/
|
||||
public inline fun MutableMeta(builder: MutableMeta.() -> Unit = {}): MutableMeta =
|
||||
MutableMeta().apply(builder)
|
||||
|
@ -105,7 +105,7 @@ public value class MetaTransformation(private val transformations: Collection<Tr
|
||||
* Generate an observable configuration that contains only elements defined by transformation rules and changes with the source
|
||||
*/
|
||||
@DFExperimental
|
||||
public fun generate(source: ObservableMeta): ObservableMeta = MutableMeta().apply {
|
||||
public fun generate(source: ObservableMeta): ObservableMeta = ObservableMutableMeta{
|
||||
transformations.forEach { rule ->
|
||||
rule.selectItems(source).forEach { name ->
|
||||
rule.transformItem(name, source[name], this)
|
||||
|
@ -1,15 +0,0 @@
|
||||
package space.kscience.dataforge.meta
|
||||
|
||||
import org.junit.jupiter.api.Test
|
||||
import kotlin.test.assertFails
|
||||
|
||||
class JvmMutableMetaTest {
|
||||
@Test
|
||||
fun recursiveMeta(){
|
||||
val meta = MutableMeta {
|
||||
"a" put 2
|
||||
}
|
||||
|
||||
assertFails { meta["child.a"] = meta }
|
||||
}
|
||||
}
|
@ -10,17 +10,35 @@ kscience{
|
||||
useSerialization{
|
||||
protobuf()
|
||||
}
|
||||
dependencies {
|
||||
api(projects.dataforgeContext)
|
||||
api(projects.dataforgeData)
|
||||
api(projects.dataforgeIo)
|
||||
}
|
||||
dependencies(jvmTest){
|
||||
implementation(spclibs.logback.classic)
|
||||
implementation(projects.dataforgeIo.dataforgeIoYaml)
|
||||
}
|
||||
// dependencies {
|
||||
// api(projects.dataforgeContext)
|
||||
// api(projects.dataforgeData)
|
||||
// api(projects.dataforgeIo)
|
||||
// }
|
||||
// dependencies(jvmTest){
|
||||
// implementation(spclibs.logback.classic)
|
||||
// implementation(projects.dataforgeIo.dataforgeIoYaml)
|
||||
// }
|
||||
}
|
||||
|
||||
readme{
|
||||
maturity = space.kscience.gradle.Maturity.EXPERIMENTAL
|
||||
}
|
||||
|
||||
kotlin{
|
||||
sourceSets{
|
||||
commonMain{
|
||||
dependencies {
|
||||
api(projects.dataforgeContext)
|
||||
api(projects.dataforgeData)
|
||||
api(projects.dataforgeIo)
|
||||
}
|
||||
}
|
||||
getByName("jvmTest"){
|
||||
dependencies {
|
||||
implementation(spclibs.logback.classic)
|
||||
implementation(projects.dataforgeIo.dataforgeIoYaml)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -33,9 +33,12 @@ public suspend inline fun <T : Any, reified P : WorkspacePlugin> TaskResultBuild
|
||||
dependencyMeta: Meta = defaultDependencyMeta,
|
||||
selectorBuilder: P.() -> TaskReference<T>,
|
||||
): DataSet<T> {
|
||||
require(workspace.context.plugins.contains(plugin)){"Plugin $plugin is not loaded into $workspace"}
|
||||
require(workspace.context.plugins.contains(plugin)) { "Plugin $plugin is not loaded into $workspace" }
|
||||
val taskReference: TaskReference<T> = plugin.selectorBuilder()
|
||||
return workspace.produce(plugin.name + taskReference.taskName, dependencyMeta) as TaskResult<T>
|
||||
val res = workspace.produce(plugin.name + taskReference.taskName, dependencyMeta)
|
||||
//TODO add explicit check after https://youtrack.jetbrains.com/issue/KT-32956
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
return res as TaskResult<T>
|
||||
}
|
||||
|
||||
/**
|
||||
@ -45,7 +48,7 @@ public suspend inline fun <T : Any, reified P : WorkspacePlugin> TaskResultBuild
|
||||
* @param dependencyMeta meta used for selector. The same meta is used for caching. By default, uses [defaultDependencyMeta].
|
||||
* @param selectorBuilder a builder of task from the plugin.
|
||||
*/
|
||||
public suspend inline fun <T : Any, reified P : WorkspacePlugin> TaskResultBuilder<*>.from(
|
||||
public suspend inline fun <reified T : Any, reified P : WorkspacePlugin> TaskResultBuilder<*>.from(
|
||||
pluginFactory: PluginFactory<P>,
|
||||
dependencyMeta: Meta = defaultDependencyMeta,
|
||||
selectorBuilder: P.() -> TaskReference<T>,
|
||||
@ -53,7 +56,10 @@ public suspend inline fun <T : Any, reified P : WorkspacePlugin> TaskResultBuild
|
||||
val plugin = workspace.context.plugins[pluginFactory]
|
||||
?: error("Plugin ${pluginFactory.tag} not loaded into workspace context")
|
||||
val taskReference: TaskReference<T> = plugin.selectorBuilder()
|
||||
return workspace.produce(plugin.name + taskReference.taskName, dependencyMeta) as TaskResult<T>
|
||||
val res = workspace.produce(plugin.name + taskReference.taskName, dependencyMeta)
|
||||
//TODO explicit check after https://youtrack.jetbrains.com/issue/KT-32956
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
return res as TaskResult<T>
|
||||
}
|
||||
|
||||
public val TaskResultBuilder<*>.allData: DataSelector<*>
|
||||
|
@ -1,9 +1,6 @@
|
||||
package space.kscience.dataforge.workspace
|
||||
|
||||
import io.ktor.utils.io.core.Input
|
||||
import io.ktor.utils.io.core.Output
|
||||
import io.ktor.utils.io.core.readBytes
|
||||
import io.ktor.utils.io.core.writeFully
|
||||
import kotlinx.io.*
|
||||
import kotlinx.serialization.ExperimentalSerializationApi
|
||||
import kotlinx.serialization.KSerializer
|
||||
import kotlinx.serialization.json.Json
|
||||
@ -30,10 +27,10 @@ public class JsonIOFormat<T : Any>(override val type: KType) : IOFormat<T> {
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
private val serializer: KSerializer<T> = serializer(type) as KSerializer<T>
|
||||
|
||||
override fun readObject(input: Input): T = Json.decodeFromString(serializer, input.readUtf8String())
|
||||
override fun readFrom(source: Source): T = Json.decodeFromString(serializer, source.readString())
|
||||
|
||||
override fun writeObject(output: Output, obj: T) {
|
||||
output.writeUtf8String(Json.encodeToString(serializer, obj))
|
||||
override fun writeTo(sink: Sink, obj: T) {
|
||||
sink.writeString(Json.encodeToString(serializer, obj))
|
||||
}
|
||||
}
|
||||
|
||||
@ -43,10 +40,10 @@ public class ProtobufIOFormat<T : Any>(override val type: KType) : IOFormat<T> {
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
private val serializer: KSerializer<T> = serializer(type) as KSerializer<T>
|
||||
|
||||
override fun readObject(input: Input): T = ProtoBuf.decodeFromByteArray(serializer, input.readBytes())
|
||||
override fun readFrom(source: Source): T = ProtoBuf.decodeFromByteArray(serializer, source.readByteArray())
|
||||
|
||||
override fun writeObject(output: Output, obj: T) {
|
||||
output.writeFully(ProtoBuf.encodeToByteArray(serializer, obj))
|
||||
override fun writeTo(sink: Sink, obj: T) {
|
||||
sink.write(ProtoBuf.encodeToByteArray(serializer, obj))
|
||||
}
|
||||
}
|
||||
|
||||
@ -88,7 +85,7 @@ public class FileWorkspaceCache(public val cacheDirectory: Path) : WorkspaceCach
|
||||
val envelope = Envelope {
|
||||
meta = data.meta
|
||||
data {
|
||||
writeObject(format, result)
|
||||
writeWith(format, result)
|
||||
}
|
||||
}
|
||||
io.writeEnvelopeFile(path, envelope)
|
||||
|
@ -1,13 +1,10 @@
|
||||
package space.kscience.dataforge.workspace
|
||||
|
||||
import io.ktor.utils.io.streams.asOutput
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.withContext
|
||||
import space.kscience.dataforge.data.DataTree
|
||||
import space.kscience.dataforge.data.DataTreeItem
|
||||
import space.kscience.dataforge.io.EnvelopeFormat
|
||||
import space.kscience.dataforge.io.IOFormat
|
||||
import space.kscience.dataforge.io.TaggedEnvelopeFormat
|
||||
import space.kscience.dataforge.io.*
|
||||
import space.kscience.dataforge.misc.DFExperimental
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
@ -28,11 +25,15 @@ private suspend fun <T : Any> ZipOutputStream.writeNode(
|
||||
val envelope = treeItem.data.toEnvelope(dataFormat)
|
||||
val entry = ZipEntry(name)
|
||||
putNextEntry(entry)
|
||||
asOutput().run {
|
||||
envelopeFormat.writeObject(this, envelope)
|
||||
flush()
|
||||
|
||||
//TODO remove additional copy
|
||||
val bytes = ByteArray {
|
||||
writeWith(envelopeFormat, envelope)
|
||||
}
|
||||
write(bytes)
|
||||
|
||||
}
|
||||
|
||||
is DataTreeItem.Node -> {
|
||||
val entry = ZipEntry("$name/")
|
||||
putNextEntry(entry)
|
||||
|
@ -1,12 +1,18 @@
|
||||
package space.kscience.dataforge.workspace
|
||||
|
||||
import io.ktor.utils.io.core.Input
|
||||
import io.ktor.utils.io.core.Output
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import kotlinx.coroutines.test.runTest
|
||||
import kotlinx.io.Sink
|
||||
import kotlinx.io.Source
|
||||
import kotlinx.io.readString
|
||||
import kotlinx.io.writeString
|
||||
import space.kscience.dataforge.context.Context
|
||||
import space.kscience.dataforge.context.Global
|
||||
import space.kscience.dataforge.data.*
|
||||
import space.kscience.dataforge.io.*
|
||||
import space.kscience.dataforge.io.Envelope
|
||||
import space.kscience.dataforge.io.IOFormat
|
||||
import space.kscience.dataforge.io.io
|
||||
import space.kscience.dataforge.io.readEnvelopeFile
|
||||
import space.kscience.dataforge.io.yaml.YamlPlugin
|
||||
import space.kscience.dataforge.meta.get
|
||||
import space.kscience.dataforge.misc.DFExperimental
|
||||
@ -36,11 +42,11 @@ class FileDataTest {
|
||||
object StringIOFormat : IOFormat<String> {
|
||||
override val type: KType get() = typeOf<String>()
|
||||
|
||||
override fun writeObject(output: Output, obj: String) {
|
||||
output.writeUtf8String(obj)
|
||||
override fun writeTo(sink: Sink, obj: String) {
|
||||
sink.writeString(obj)
|
||||
}
|
||||
|
||||
override fun readObject(input: Input): String = input.readUtf8String()
|
||||
override fun readFrom(source: Source): String = source.readString()
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -59,9 +65,9 @@ class FileDataTest {
|
||||
|
||||
@Test
|
||||
@DFExperimental
|
||||
fun testZipWriteRead() = with(Global.io) {
|
||||
fun testZipWriteRead() = runTest {
|
||||
with(Global.io) {
|
||||
val zip = Files.createTempFile("df_data_node", ".zip")
|
||||
runBlocking {
|
||||
dataNode.writeZip(zip, StringIOFormat)
|
||||
println(zip.toUri().toString())
|
||||
val reconstructed = readDataDirectory(zip) { _, _ -> StringIOFormat }
|
||||
|
@ -6,4 +6,5 @@ kotlin.mpp.stability.nowarn=true
|
||||
kotlin.incremental.js.ir=true
|
||||
kotlin.native.ignoreDisabledTargets=true
|
||||
|
||||
toolsVersion=0.14.9-kotlin-1.8.20
|
||||
toolsVersion=0.15.0-kotlin-1.9.20-RC
|
||||
#kotlin.experimental.tryK2=true
|
2
gradle/wrapper/gradle-wrapper.properties
vendored
2
gradle/wrapper/gradle-wrapper.properties
vendored
@ -1,5 +1,5 @@
|
||||
distributionBase=GRADLE_USER_HOME
|
||||
distributionPath=wrapper/dists
|
||||
distributionUrl=https\://services.gradle.org/distributions/gradle-8.2-bin.zip
|
||||
distributionUrl=https\://services.gradle.org/distributions/gradle-8.3-bin.zip
|
||||
zipStoreBase=GRADLE_USER_HOME
|
||||
zipStorePath=wrapper/dists
|
||||
|
Loading…
Reference in New Issue
Block a user