Moving to io-2
This commit is contained in:
parent
45a5b6fe28
commit
2e643287ef
@ -1,9 +1,9 @@
|
||||
import scientifik.ScientifikExtension
|
||||
|
||||
plugins {
|
||||
id("scientifik.mpp") version "0.2.4" apply false
|
||||
id("scientifik.jvm") version "0.2.4" apply false
|
||||
id("scientifik.publish") version "0.2.4" apply false
|
||||
id("scientifik.mpp") version "0.2.5" apply false
|
||||
id("scientifik.jvm") version "0.2.5" apply false
|
||||
id("scientifik.publish") version "0.2.5" apply false
|
||||
}
|
||||
|
||||
val dataforgeVersion by extra("0.1.5-dev-3")
|
||||
@ -14,6 +14,10 @@ val githubProject by extra("dataforge-core")
|
||||
allprojects {
|
||||
group = "hep.dataforge"
|
||||
version = dataforgeVersion
|
||||
|
||||
repositories {
|
||||
mavenLocal()
|
||||
}
|
||||
}
|
||||
|
||||
subprojects {
|
||||
|
@ -4,27 +4,30 @@ plugins {
|
||||
|
||||
description = "IO module"
|
||||
|
||||
scientifik{
|
||||
scientifik {
|
||||
withSerialization()
|
||||
withIO()
|
||||
//withIO()
|
||||
}
|
||||
|
||||
val ioVersion by rootProject.extra("0.2.0-npm-dev-2")
|
||||
|
||||
kotlin {
|
||||
sourceSets {
|
||||
commonMain{
|
||||
commonMain {
|
||||
dependencies {
|
||||
api(project(":dataforge-context"))
|
||||
api("org.jetbrains.kotlinx:kotlinx-io:$ioVersion")
|
||||
//api("org.jetbrains.kotlinx:kotlinx-io-metadata:$ioVersion")
|
||||
}
|
||||
}
|
||||
jvmMain{
|
||||
jvmMain {
|
||||
dependencies {
|
||||
|
||||
//api("org.jetbrains.kotlinx:kotlinx-io-jvm:$ioVersion")
|
||||
}
|
||||
}
|
||||
jsMain{
|
||||
dependencies{
|
||||
api(npm("text-encoding"))
|
||||
jsMain {
|
||||
dependencies {
|
||||
//api("org.jetbrains.kotlinx:kotlinx-io-js:$ioVersion")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -5,7 +5,8 @@ import hep.dataforge.io.*
|
||||
import hep.dataforge.meta.DFExperimental
|
||||
import hep.dataforge.meta.EmptyMeta
|
||||
import hep.dataforge.meta.Meta
|
||||
import kotlinx.io.core.*
|
||||
import kotlinx.io.Input
|
||||
import kotlinx.io.Output
|
||||
import kotlinx.serialization.toUtf8Bytes
|
||||
|
||||
@DFExperimental
|
||||
@ -18,7 +19,7 @@ class FrontMatterEnvelopeFormat(
|
||||
var line: String = ""
|
||||
var offset = 0u
|
||||
do {
|
||||
line = readUTF8Line() ?: error("Input does not contain front matter separator")
|
||||
line = readUtf8Line() ?: error("Input does not contain front matter separator")
|
||||
offset += line.toUtf8Bytes().size.toUInt()
|
||||
} while (!line.startsWith(SEPARATOR))
|
||||
|
||||
@ -28,7 +29,7 @@ class FrontMatterEnvelopeFormat(
|
||||
|
||||
val metaBlock = buildPacket {
|
||||
do {
|
||||
line = readUTF8Line() ?: error("Input does not contain closing front matter separator")
|
||||
line = readUtf8Line() ?: error("Input does not contain closing front matter separator")
|
||||
appendln(line)
|
||||
offset += line.toUtf8Bytes().size.toUInt()
|
||||
} while (!line.startsWith(SEPARATOR))
|
||||
@ -40,7 +41,7 @@ class FrontMatterEnvelopeFormat(
|
||||
override fun Input.readObject(): Envelope {
|
||||
var line: String = ""
|
||||
do {
|
||||
line = readUTF8Line() ?: error("Input does not contain front matter separator")
|
||||
line = readUtf8Line() ?: error("Input does not contain front matter separator")
|
||||
} while (!line.startsWith(SEPARATOR))
|
||||
|
||||
val readMetaFormat =
|
||||
@ -49,7 +50,7 @@ class FrontMatterEnvelopeFormat(
|
||||
|
||||
val metaBlock = buildPacket {
|
||||
do {
|
||||
appendln(readUTF8Line() ?: error("Input does not contain closing front matter separator"))
|
||||
appendln(readUtf8Line() ?: error("Input does not contain closing front matter separator"))
|
||||
} while (!line.startsWith(SEPARATOR))
|
||||
}
|
||||
val meta = readMetaFormat.fromBytes(metaBlock)
|
||||
@ -76,7 +77,7 @@ class FrontMatterEnvelopeFormat(
|
||||
}
|
||||
|
||||
override fun peekFormat(io: IOPlugin, input: Input): EnvelopeFormat? {
|
||||
val line = input.readUTF8Line(3, 30)
|
||||
val line = input.readUtf8Line(3, 30)
|
||||
return if (line != null && line.startsWith("---")) {
|
||||
invoke()
|
||||
} else {
|
||||
|
@ -8,12 +8,10 @@ import hep.dataforge.meta.DFExperimental
|
||||
import hep.dataforge.meta.Meta
|
||||
import hep.dataforge.meta.toMap
|
||||
import hep.dataforge.meta.toMeta
|
||||
import hep.dataforge.names.Name
|
||||
import hep.dataforge.names.plus
|
||||
import kotlinx.io.core.Input
|
||||
import kotlinx.io.core.Output
|
||||
import kotlinx.io.core.readUByte
|
||||
import kotlinx.io.core.writeText
|
||||
import kotlinx.io.Input
|
||||
import kotlinx.io.Output
|
||||
import kotlinx.io.readUByte
|
||||
import kotlinx.io.writeText
|
||||
import org.yaml.snakeyaml.Yaml
|
||||
import java.io.InputStream
|
||||
|
||||
@ -47,7 +45,7 @@ class YamlMetaFormat(val meta: Meta) : MetaFormat {
|
||||
companion object : MetaFormatFactory {
|
||||
override fun invoke(meta: Meta, context: Context): MetaFormat = YamlMetaFormat(meta)
|
||||
|
||||
override val name: Name = super.name + "yaml"
|
||||
override val shortName = "yaml"
|
||||
|
||||
override val key: Short = 0x594d //YM
|
||||
|
||||
|
@ -1,94 +0,0 @@
|
||||
package hep.dataforge.io
|
||||
|
||||
import kotlinx.io.core.*
|
||||
import kotlin.math.min
|
||||
|
||||
/**
|
||||
* A source of binary data
|
||||
*/
|
||||
interface Binary {
|
||||
/**
|
||||
* The size of binary in bytes. [ULong.MAX_VALUE] if size is not defined and input should be read until its end is reached
|
||||
*/
|
||||
val size: ULong get() = ULong.MAX_VALUE
|
||||
|
||||
/**
|
||||
* Read continuous [Input] from this binary stating from the beginning.
|
||||
* The input is automatically closed on scope close.
|
||||
* Some implementation may forbid this to be called twice. In this case second call will throw an exception.
|
||||
*/
|
||||
fun <R> read(block: Input.() -> R): R
|
||||
|
||||
companion object {
|
||||
val EMPTY = EmptyBinary
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A [Binary] with addition random access functionality. It by default allows multiple [read] operations.
|
||||
*/
|
||||
@ExperimentalUnsignedTypes
|
||||
interface RandomAccessBinary : Binary {
|
||||
/**
|
||||
* Read at most [size] of bytes starting at [from] offset from the beginning of the binary.
|
||||
* This method could be called multiple times simultaneously.
|
||||
*
|
||||
* If size
|
||||
*/
|
||||
fun <R> read(from: UInt, size: UInt = UInt.MAX_VALUE, block: Input.() -> R): R
|
||||
|
||||
override fun <R> read(block: Input.() -> R): R = read(0.toUInt(), UInt.MAX_VALUE, block)
|
||||
}
|
||||
|
||||
fun Binary.toBytes(): ByteArray = read {
|
||||
readBytes()
|
||||
}
|
||||
|
||||
fun Binary.contentToString(): String = read {
|
||||
readText()
|
||||
}
|
||||
|
||||
@ExperimentalUnsignedTypes
|
||||
fun RandomAccessBinary.readPacket(from: UInt, size: UInt): ByteReadPacket = read(from, size) {
|
||||
buildPacket { copyTo(this) }
|
||||
}
|
||||
|
||||
@ExperimentalUnsignedTypes
|
||||
object EmptyBinary : RandomAccessBinary {
|
||||
|
||||
override val size: ULong = 0u
|
||||
|
||||
override fun <R> read(from: UInt, size: UInt, block: Input.() -> R): R {
|
||||
error("The binary is empty")
|
||||
}
|
||||
}
|
||||
|
||||
@ExperimentalUnsignedTypes
|
||||
inline class ArrayBinary(val array: ByteArray) : RandomAccessBinary {
|
||||
override val size: ULong get() = array.size.toULong()
|
||||
|
||||
override fun <R> read(from: UInt, size: UInt, block: Input.() -> R): R {
|
||||
val theSize = min(size, array.size.toUInt() - from)
|
||||
return buildPacket {
|
||||
writeFully(array, from.toInt(), theSize.toInt())
|
||||
}.block()
|
||||
}
|
||||
}
|
||||
|
||||
fun ByteArray.asBinary() = ArrayBinary(this)
|
||||
|
||||
/**
|
||||
* Read given binary as object using given format
|
||||
*/
|
||||
fun <T : Any> Binary.readWith(format: IOFormat<T>): T = format.run {
|
||||
read {
|
||||
readObject()
|
||||
}
|
||||
}
|
||||
|
||||
//fun <T : Any> IOFormat<T>.writeBinary(obj: T): Binary {
|
||||
// val packet = buildPacket {
|
||||
// writeObject(obj)
|
||||
// }
|
||||
// return ArrayBinary(packet.readBytes())
|
||||
//}
|
@ -3,16 +3,13 @@ package hep.dataforge.io
|
||||
import hep.dataforge.context.Context
|
||||
import hep.dataforge.descriptors.NodeDescriptor
|
||||
import hep.dataforge.meta.*
|
||||
import hep.dataforge.names.Name
|
||||
import hep.dataforge.names.plus
|
||||
import hep.dataforge.values.*
|
||||
import kotlinx.io.core.Input
|
||||
import kotlinx.io.core.Output
|
||||
import kotlinx.io.core.readText
|
||||
import kotlinx.io.core.writeText
|
||||
import kotlinx.io.*
|
||||
import kotlinx.io.text.readUtf8String
|
||||
import kotlinx.io.text.writeUtf8String
|
||||
|
||||
object BinaryMetaFormat : MetaFormat, MetaFormatFactory {
|
||||
override val name: Name = super.name + "bin"
|
||||
override val shortName: String = "bin"
|
||||
override val key: Short = 0x4249//BI
|
||||
|
||||
override fun invoke(meta: Meta, context: Context): MetaFormat = this
|
||||
@ -25,7 +22,7 @@ object BinaryMetaFormat : MetaFormat, MetaFormatFactory {
|
||||
|
||||
private fun Output.writeString(str: String) {
|
||||
writeInt(str.length)
|
||||
writeText(str)
|
||||
writeUtf8String(str)
|
||||
}
|
||||
|
||||
fun Output.writeValue(value: Value) {
|
||||
@ -93,7 +90,7 @@ object BinaryMetaFormat : MetaFormat, MetaFormatFactory {
|
||||
|
||||
private fun Input.readString(): String {
|
||||
val length = readInt()
|
||||
return readText(max = length)
|
||||
return readUtf8String(length)
|
||||
}
|
||||
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
|
@ -6,6 +6,7 @@ import hep.dataforge.meta.get
|
||||
import hep.dataforge.meta.string
|
||||
import hep.dataforge.names.asName
|
||||
import hep.dataforge.names.plus
|
||||
import kotlinx.io.Binary
|
||||
|
||||
interface Envelope {
|
||||
val meta: Meta
|
||||
|
@ -1,9 +1,7 @@
|
||||
package hep.dataforge.io
|
||||
|
||||
import hep.dataforge.meta.*
|
||||
import kotlinx.io.core.Output
|
||||
import kotlinx.io.core.buildPacket
|
||||
import kotlinx.io.core.readBytes
|
||||
import kotlinx.io.*
|
||||
|
||||
class EnvelopeBuilder {
|
||||
private val metaBuilder = MetaBuilder()
|
||||
@ -27,10 +25,10 @@ class EnvelopeBuilder {
|
||||
* Construct a binary and transform it into byte-array based buffer
|
||||
*/
|
||||
fun data(block: Output.() -> Unit) {
|
||||
val bytes = buildPacket {
|
||||
val bytes = buildBytes {
|
||||
block()
|
||||
}
|
||||
data = ArrayBinary(bytes.readBytes())
|
||||
data = ArrayBinary(bytes.toByteArray())
|
||||
}
|
||||
|
||||
internal fun build() = SimpleEnvelope(metaBuilder.seal(), data)
|
||||
|
@ -7,8 +7,8 @@ import hep.dataforge.meta.Meta
|
||||
import hep.dataforge.names.Name
|
||||
import hep.dataforge.names.asName
|
||||
import hep.dataforge.provider.Type
|
||||
import kotlinx.io.core.Input
|
||||
import kotlinx.io.core.Output
|
||||
import kotlinx.io.Input
|
||||
import kotlinx.io.Output
|
||||
import kotlin.reflect.KClass
|
||||
|
||||
/**
|
||||
|
@ -24,16 +24,24 @@ object EnvelopeParts {
|
||||
/**
|
||||
* Append multiple serialized envelopes to the data block. Previous data is erased if it was present
|
||||
*/
|
||||
fun EnvelopeBuilder.multipart(format: EnvelopeFormatFactory, envelopes: Collection<Envelope>) {
|
||||
@DFExperimental
|
||||
fun EnvelopeBuilder.multipart(
|
||||
envelopes: Collection<Envelope>,
|
||||
format: EnvelopeFormatFactory,
|
||||
formatMeta: Meta = EmptyMeta
|
||||
) {
|
||||
dataType = MULTIPART_DATA_TYPE
|
||||
meta {
|
||||
SIZE_KEY put envelopes.size
|
||||
FORMAT_NAME_KEY put format.name.toString()
|
||||
if (!formatMeta.isEmpty()) {
|
||||
FORMAT_META_KEY put formatMeta
|
||||
}
|
||||
}
|
||||
data {
|
||||
format.run {
|
||||
format(formatMeta).run {
|
||||
envelopes.forEach {
|
||||
writeObject(it)
|
||||
writeEnvelope(it)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -43,18 +51,25 @@ fun EnvelopeBuilder.multipart(format: EnvelopeFormatFactory, envelopes: Collecti
|
||||
* Create a multipart partition in the envelope adding additional name-index mapping in meta
|
||||
*/
|
||||
@DFExperimental
|
||||
fun EnvelopeBuilder.multipart(format: EnvelopeFormatFactory, envelopes: Map<String, Envelope>) {
|
||||
fun EnvelopeBuilder.multipart(
|
||||
envelopes: Map<String, Envelope>,
|
||||
format: EnvelopeFormatFactory,
|
||||
formatMeta: Meta = EmptyMeta
|
||||
) {
|
||||
dataType = MULTIPART_DATA_TYPE
|
||||
meta {
|
||||
SIZE_KEY put envelopes.size
|
||||
FORMAT_NAME_KEY put format.name.toString()
|
||||
if (!formatMeta.isEmpty()) {
|
||||
FORMAT_META_KEY put formatMeta
|
||||
}
|
||||
}
|
||||
data {
|
||||
format.run {
|
||||
var counter = 0
|
||||
envelopes.forEach {(key, envelope)->
|
||||
envelopes.forEach { (key, envelope) ->
|
||||
writeObject(envelope)
|
||||
meta{
|
||||
meta {
|
||||
append(INDEX_KEY, buildMeta {
|
||||
"key" put key
|
||||
"index" put counter
|
||||
@ -66,14 +81,17 @@ fun EnvelopeBuilder.multipart(format: EnvelopeFormatFactory, envelopes: Map<Stri
|
||||
}
|
||||
}
|
||||
|
||||
@DFExperimental
|
||||
fun EnvelopeBuilder.multipart(
|
||||
formatFactory: EnvelopeFormatFactory,
|
||||
formatMeta: Meta = EmptyMeta,
|
||||
builder: suspend SequenceScope<Envelope>.() -> Unit
|
||||
) = multipart(formatFactory, sequence(builder).toList())
|
||||
) = multipart(sequence(builder).toList(), formatFactory, formatMeta)
|
||||
|
||||
/**
|
||||
* If given envelope supports multipart data, return a sequence of those parts (could be empty). Otherwise return null.
|
||||
*/
|
||||
@DFExperimental
|
||||
fun Envelope.parts(io: IOPlugin = Global.plugins.fetch(IOPlugin)): Sequence<Envelope>? {
|
||||
return when (dataType) {
|
||||
MULTIPART_DATA_TYPE -> {
|
||||
|
@ -10,12 +10,9 @@ import hep.dataforge.names.Name
|
||||
import hep.dataforge.names.asName
|
||||
import hep.dataforge.provider.Type
|
||||
import hep.dataforge.values.Value
|
||||
import kotlinx.io.core.*
|
||||
import kotlinx.io.*
|
||||
import kotlinx.io.buffer.Buffer
|
||||
import kotlinx.io.pool.ObjectPool
|
||||
import kotlinx.serialization.ImplicitReflectionSerializer
|
||||
import kotlinx.serialization.KSerializer
|
||||
import kotlinx.serialization.cbor.Cbor
|
||||
import kotlinx.serialization.serializer
|
||||
import kotlin.reflect.KClass
|
||||
|
||||
/**
|
||||
@ -49,7 +46,7 @@ class ListIOFormat<T : Any>(val format: IOFormat<T>) : IOFormat<List<T>> {
|
||||
|
||||
val <T : Any> IOFormat<T>.list get() = ListIOFormat(this)
|
||||
|
||||
fun ObjectPool<IoBuffer>.fill(block: IoBuffer.() -> Unit): IoBuffer {
|
||||
fun ObjectPool<Buffer>.fill(block: Buffer.() -> Unit): Buffer {
|
||||
val buffer = borrow()
|
||||
return try {
|
||||
buffer.apply(block)
|
||||
@ -71,19 +68,11 @@ interface IOFormatFactory<T : Any> : Factory<IOFormat<T>>, Named {
|
||||
}
|
||||
}
|
||||
|
||||
@Deprecated("To be removed in io-2")
|
||||
inline fun buildPacketWithoutPool(headerSizeHint: Int = 0, block: BytePacketBuilder.() -> Unit): ByteReadPacket {
|
||||
val builder = BytePacketBuilder(headerSizeHint, IoBuffer.NoPool)
|
||||
block(builder)
|
||||
return builder.build()
|
||||
}
|
||||
fun <T : Any> IOFormat<T>.writeBytes(obj: T): Bytes = buildBytes { writeObject(obj) }
|
||||
|
||||
fun <T : Any> IOFormat<T>.writePacket(obj: T): ByteReadPacket = buildPacket { writeObject(obj) }
|
||||
|
||||
@Deprecated("Not to be used outside tests due to double buffer write")
|
||||
fun <T : Any> IOFormat<T>.writeBytes(obj: T): ByteArray = buildPacket { writeObject(obj) }.readBytes()
|
||||
@Deprecated("Not to be used outside tests due to double buffer write")
|
||||
fun <T : Any> IOFormat<T>.readBytes(array: ByteArray): T = buildPacket { writeFully(array) }.readObject()
|
||||
fun <T : Any> IOFormat<T>.writeByteArray(obj: T): ByteArray = buildBytes { writeObject(obj) }.toByteArray()
|
||||
fun <T : Any> IOFormat<T>.readByteArray(array: ByteArray): T = array.asBinary().read { readObject() }
|
||||
|
||||
object DoubleIOFormat : IOFormat<Double>, IOFormatFactory<Double> {
|
||||
override fun invoke(meta: Meta, context: Context): IOFormat<Double> = this
|
||||
@ -117,25 +106,10 @@ object ValueIOFormat : IOFormat<Value>, IOFormatFactory<Value> {
|
||||
}
|
||||
|
||||
/**
|
||||
* Experimental
|
||||
* Read given binary as object using given format
|
||||
*/
|
||||
@ImplicitReflectionSerializer
|
||||
class SerializerIOFormat<T : Any>(
|
||||
type: KClass<T>,
|
||||
val serializer: KSerializer<T> = type.serializer()
|
||||
) : IOFormat<T> {
|
||||
|
||||
//override val name: Name = type.simpleName?.toName() ?: EmptyName
|
||||
|
||||
|
||||
override fun Output.writeObject(obj: T) {
|
||||
val bytes = Cbor.plain.dump(serializer, obj)
|
||||
writeFully(bytes)
|
||||
}
|
||||
|
||||
override fun Input.readObject(): T {
|
||||
//FIXME reads the whole input
|
||||
val bytes = readBytes()
|
||||
return Cbor.plain.load(serializer, bytes)
|
||||
fun <T : Any> Binary.readWith(format: IOFormat<T>): T = format.run {
|
||||
read {
|
||||
readObject()
|
||||
}
|
||||
}
|
@ -20,7 +20,7 @@ class IOPlugin(meta: Meta) : AbstractPlugin(meta) {
|
||||
metaFormatFactories.find { it.key == key }?.invoke(meta)
|
||||
|
||||
fun metaFormat(name: String, meta: Meta = EmptyMeta): MetaFormat? =
|
||||
metaFormatFactories.find { it.name.last().toString() == name }?.invoke(meta)
|
||||
metaFormatFactories.find { it.shortName == name }?.invoke(meta)
|
||||
|
||||
val envelopeFormatFactories by lazy {
|
||||
context.content<EnvelopeFormatFactory>(ENVELOPE_FORMAT_TYPE).values
|
||||
|
@ -9,15 +9,15 @@ import hep.dataforge.descriptors.ValueDescriptor
|
||||
import hep.dataforge.meta.Meta
|
||||
import hep.dataforge.meta.MetaBase
|
||||
import hep.dataforge.meta.MetaItem
|
||||
import hep.dataforge.names.Name
|
||||
import hep.dataforge.names.NameToken
|
||||
import hep.dataforge.names.plus
|
||||
import hep.dataforge.names.toName
|
||||
import hep.dataforge.values.*
|
||||
import kotlinx.io.core.Input
|
||||
import kotlinx.io.core.Output
|
||||
import kotlinx.io.core.readText
|
||||
import kotlinx.io.core.writeText
|
||||
import kotlinx.io.Input
|
||||
import kotlinx.io.Output
|
||||
import kotlinx.io.text.readUtf8String
|
||||
import kotlinx.io.text.writeUtf8String
|
||||
|
||||
|
||||
import kotlinx.serialization.json.*
|
||||
import kotlin.collections.component1
|
||||
import kotlin.collections.component2
|
||||
@ -28,11 +28,11 @@ class JsonMetaFormat(private val json: Json = Json.indented) : MetaFormat {
|
||||
|
||||
override fun Output.writeMeta(meta: Meta, descriptor: NodeDescriptor?) {
|
||||
val jsonObject = meta.toJson(descriptor)
|
||||
writeText(json.stringify(JsonObjectSerializer, jsonObject))
|
||||
writeUtf8String(json.stringify(JsonObjectSerializer, jsonObject))
|
||||
}
|
||||
|
||||
override fun Input.readMeta(descriptor: NodeDescriptor?): Meta {
|
||||
val str = readText()
|
||||
val str = readUtf8String()
|
||||
val jsonElement = json.parseJson(str)
|
||||
return jsonElement.toMeta()
|
||||
}
|
||||
@ -40,13 +40,13 @@ class JsonMetaFormat(private val json: Json = Json.indented) : MetaFormat {
|
||||
companion object : MetaFormatFactory {
|
||||
override fun invoke(meta: Meta, context: Context): MetaFormat = default
|
||||
|
||||
override val name: Name = super.name + "json"
|
||||
override val shortName = "json"
|
||||
override val key: Short = 0x4a53//"JS"
|
||||
|
||||
private val default = JsonMetaFormat()
|
||||
|
||||
override fun Output.writeMeta(meta: Meta, descriptor: NodeDescriptor?) =
|
||||
default.run { writeMeta(meta,descriptor) }
|
||||
default.run { writeMeta(meta, descriptor) }
|
||||
|
||||
override fun Input.readMeta(descriptor: NodeDescriptor?): Meta =
|
||||
default.run { readMeta(descriptor) }
|
||||
|
@ -6,8 +6,9 @@ import hep.dataforge.io.MetaFormatFactory.Companion.META_FORMAT_TYPE
|
||||
import hep.dataforge.meta.Meta
|
||||
import hep.dataforge.names.Name
|
||||
import hep.dataforge.names.asName
|
||||
import hep.dataforge.names.plus
|
||||
import hep.dataforge.provider.Type
|
||||
import kotlinx.io.core.*
|
||||
import kotlinx.io.*
|
||||
import kotlin.reflect.KClass
|
||||
|
||||
/**
|
||||
@ -28,11 +29,13 @@ interface MetaFormat : IOFormat<Meta> {
|
||||
|
||||
@Type(META_FORMAT_TYPE)
|
||||
interface MetaFormatFactory : IOFormatFactory<Meta>, MetaFormat {
|
||||
override val name: Name get() = "meta".asName()
|
||||
val shortName: String
|
||||
|
||||
override val name: Name get() = "meta".asName() + shortName
|
||||
|
||||
override val type: KClass<out Meta> get() = Meta::class
|
||||
|
||||
val key: Short
|
||||
val key: Short get() = name.hashCode().toShort()
|
||||
|
||||
override operator fun invoke(meta: Meta, context: Context): MetaFormat
|
||||
|
||||
@ -41,24 +44,16 @@ interface MetaFormatFactory : IOFormatFactory<Meta>, MetaFormat {
|
||||
}
|
||||
}
|
||||
|
||||
fun Meta.toString(format: MetaFormat): String = buildPacket {
|
||||
fun Meta.toString(format: MetaFormat): String = buildBytes {
|
||||
format.run { writeObject(this@toString) }
|
||||
}.readText()
|
||||
}.toByteArray().decodeToString()
|
||||
|
||||
fun Meta.toString(formatFactory: MetaFormatFactory): String = toString(formatFactory())
|
||||
|
||||
fun Meta.toBytes(format: MetaFormat = JsonMetaFormat): ByteReadPacket = buildPacket {
|
||||
format.run { writeObject(this@toBytes) }
|
||||
}
|
||||
|
||||
fun MetaFormat.parse(str: String): Meta {
|
||||
return buildPacket { writeText(str) }.readObject()
|
||||
return str.encodeToByteArray().read { readObject() }
|
||||
}
|
||||
|
||||
fun MetaFormatFactory.parse(str: String): Meta = invoke().parse(str)
|
||||
|
||||
fun MetaFormat.fromBytes(packet: ByteReadPacket): Meta {
|
||||
return packet.readObject()
|
||||
}
|
||||
fun MetaFormatFactory.parse(str: String, formatMeta: Meta): Meta = invoke(formatMeta).parse(str)
|
||||
|
||||
|
||||
|
@ -7,48 +7,51 @@ import hep.dataforge.meta.string
|
||||
import hep.dataforge.names.Name
|
||||
import hep.dataforge.names.plus
|
||||
import hep.dataforge.names.toName
|
||||
import kotlinx.io.charsets.Charsets
|
||||
import kotlinx.io.core.*
|
||||
import kotlinx.io.*
|
||||
import kotlinx.io.text.readRawString
|
||||
import kotlinx.io.text.writeRawString
|
||||
|
||||
@ExperimentalUnsignedTypes
|
||||
@ExperimentalIoApi
|
||||
class TaggedEnvelopeFormat(
|
||||
val io: IOPlugin,
|
||||
val version: VERSION = TaggedEnvelopeFormat.VERSION.DF02
|
||||
val version: VERSION = VERSION.DF02
|
||||
) : EnvelopeFormat {
|
||||
|
||||
// private val metaFormat = io.metaFormat(metaFormatKey)
|
||||
// ?: error("Meta format with key $metaFormatKey could not be resolved in $io")
|
||||
|
||||
|
||||
private fun Tag.toBytes(): ByteReadPacket = buildPacket(24) {
|
||||
writeText(START_SEQUENCE)
|
||||
writeText(version.name)
|
||||
private fun Tag.toBytes() = buildBytes(24) {
|
||||
writeRawString(START_SEQUENCE)
|
||||
writeRawString(version.name)
|
||||
writeShort(metaFormatKey)
|
||||
writeUInt(metaSize)
|
||||
when (version) {
|
||||
TaggedEnvelopeFormat.VERSION.DF02 -> {
|
||||
VERSION.DF02 -> {
|
||||
writeUInt(dataSize.toUInt())
|
||||
}
|
||||
TaggedEnvelopeFormat.VERSION.DF03 -> {
|
||||
VERSION.DF03 -> {
|
||||
writeULong(dataSize)
|
||||
}
|
||||
}
|
||||
writeText(END_SEQUENCE)
|
||||
writeRawString(END_SEQUENCE)
|
||||
}
|
||||
|
||||
override fun Output.writeEnvelope(envelope: Envelope, metaFormatFactory: MetaFormatFactory, formatMeta: Meta) {
|
||||
val metaFormat = metaFormatFactory.invoke(formatMeta, io.context)
|
||||
val metaBytes = metaFormat.writePacket(envelope.meta)
|
||||
val metaBytes = metaFormat.writeBytes(envelope.meta)
|
||||
val actualSize: ULong = if (envelope.data == null) {
|
||||
0u
|
||||
0
|
||||
} else {
|
||||
envelope.data?.size ?: ULong.MAX_VALUE
|
||||
envelope.data?.size ?: Binary.INFINITE
|
||||
}.toULong()
|
||||
val tag = Tag(metaFormatFactory.key, metaBytes.size.toUInt() + 2u, actualSize)
|
||||
writeBinary(tag.toBytes())
|
||||
writeBinary(metaBytes)
|
||||
writeRawString("\r\n")
|
||||
envelope.data?.let {
|
||||
writeBinary(it)
|
||||
}
|
||||
val tag = Tag(metaFormatFactory.key, metaBytes.remaining.toUInt() + 2u, actualSize)
|
||||
writePacket(tag.toBytes())
|
||||
writePacket(metaBytes)
|
||||
writeText("\r\n")
|
||||
envelope.data?.read { copyTo(this@writeEnvelope) }
|
||||
flush()
|
||||
}
|
||||
|
||||
@ -64,14 +67,17 @@ class TaggedEnvelopeFormat(
|
||||
val metaFormat = io.metaFormat(tag.metaFormatKey)
|
||||
?: error("Meta format with key ${tag.metaFormatKey} not found")
|
||||
|
||||
val metaBytes = readBytes(tag.metaSize.toInt())
|
||||
val metaPacket = buildPacket {
|
||||
writeFully(metaBytes)
|
||||
val meta: Meta = limit(tag.metaSize.toInt()).use {
|
||||
metaFormat.run {
|
||||
readObject()
|
||||
}
|
||||
}
|
||||
val dataBytes = readBytes(tag.dataSize.toInt())
|
||||
|
||||
val meta = metaFormat.run { metaPacket.readObject() }
|
||||
return SimpleEnvelope(meta, ArrayBinary(dataBytes))
|
||||
val data = buildBytes {
|
||||
writeInput(this@readObject, tag.dataSize.toInt())
|
||||
}
|
||||
|
||||
return SimpleEnvelope(meta, data)
|
||||
}
|
||||
|
||||
override fun Input.readPartial(): PartialEnvelope {
|
||||
@ -80,8 +86,11 @@ class TaggedEnvelopeFormat(
|
||||
val metaFormat = io.metaFormat(tag.metaFormatKey)
|
||||
?: error("Meta format with key ${tag.metaFormatKey} not found")
|
||||
|
||||
val metaPacket = ByteReadPacket(readBytes(tag.metaSize.toInt()))
|
||||
val meta = metaFormat.run { metaPacket.readObject() }
|
||||
val meta: Meta = limit(tag.metaSize.toInt()).run {
|
||||
metaFormat.run {
|
||||
readObject()
|
||||
}
|
||||
}
|
||||
|
||||
return PartialEnvelope(meta, version.tagSize + tag.metaSize, tag.dataSize)
|
||||
}
|
||||
@ -107,16 +116,16 @@ class TaggedEnvelopeFormat(
|
||||
val io = context.io
|
||||
|
||||
val metaFormatName = meta["name"].string?.toName() ?: JsonMetaFormat.name
|
||||
val metaFormatFactory = io.metaFormatFactories.find { it.name == metaFormatName }
|
||||
?: error("Meta format could not be resolved")
|
||||
//Check if appropriate factory exists
|
||||
io.metaFormatFactories.find { it.name == metaFormatName } ?: error("Meta format could not be resolved")
|
||||
|
||||
return TaggedEnvelopeFormat(io)
|
||||
}
|
||||
|
||||
private fun Input.readTag(version: VERSION): Tag {
|
||||
val start = readTextExactBytes(2, charset = Charsets.ISO_8859_1)
|
||||
val start = readRawString(2)
|
||||
if (start != START_SEQUENCE) error("The input is not an envelope")
|
||||
val versionString = readTextExactBytes(4, charset = Charsets.ISO_8859_1)
|
||||
val versionString = readRawString(4)
|
||||
if (version.name != versionString) error("Wrong version of DataForge: expected $version but found $versionString")
|
||||
val metaFormatKey = readShort()
|
||||
val metaLength = readUInt()
|
||||
@ -124,14 +133,14 @@ class TaggedEnvelopeFormat(
|
||||
VERSION.DF02 -> readUInt().toULong()
|
||||
VERSION.DF03 -> readULong()
|
||||
}
|
||||
val end = readTextExactBytes(4, charset = Charsets.ISO_8859_1)
|
||||
val end = readRawString(4)
|
||||
if (end != END_SEQUENCE) error("The input is not an envelope")
|
||||
return Tag(metaFormatKey, metaLength, dataLength)
|
||||
}
|
||||
|
||||
override fun peekFormat(io: IOPlugin, input: Input): EnvelopeFormat? {
|
||||
return try {
|
||||
val header = input.readTextExactBytes(6)
|
||||
val header = input.readRawString(6)
|
||||
when (header.substring(2..5)) {
|
||||
VERSION.DF02.name -> TaggedEnvelopeFormat(io, VERSION.DF02)
|
||||
VERSION.DF03.name -> TaggedEnvelopeFormat(io, VERSION.DF03)
|
||||
|
@ -3,9 +3,14 @@ package hep.dataforge.io
|
||||
import hep.dataforge.context.Context
|
||||
import hep.dataforge.meta.*
|
||||
import hep.dataforge.names.asName
|
||||
import kotlinx.io.core.*
|
||||
import kotlinx.io.*
|
||||
import kotlinx.io.text.readRawString
|
||||
import kotlinx.io.text.readUtf8Line
|
||||
import kotlinx.io.text.writeRawString
|
||||
import kotlinx.io.text.writeUtf8String
|
||||
import kotlinx.serialization.toUtf8Bytes
|
||||
|
||||
@ExperimentalIoApi
|
||||
class TaglessEnvelopeFormat(
|
||||
val io: IOPlugin,
|
||||
meta: Meta = EmptyMeta
|
||||
@ -15,47 +20,46 @@ class TaglessEnvelopeFormat(
|
||||
private val dataStart = meta[DATA_START_PROPERTY].string ?: DEFAULT_DATA_START
|
||||
|
||||
private fun Output.writeProperty(key: String, value: Any) {
|
||||
writeText("#? $key: $value;\r\n")
|
||||
writeUtf8String("#? $key: $value;\r\n")
|
||||
}
|
||||
|
||||
override fun Output.writeEnvelope(envelope: Envelope, metaFormatFactory: MetaFormatFactory, formatMeta: Meta) {
|
||||
val metaFormat = metaFormatFactory(formatMeta, io.context)
|
||||
|
||||
//printing header
|
||||
writeText(TAGLESS_ENVELOPE_HEADER + "\r\n")
|
||||
writeRawString(TAGLESS_ENVELOPE_HEADER + "\r\n")
|
||||
|
||||
//printing all properties
|
||||
writeProperty(META_TYPE_PROPERTY, metaFormatFactory.type)
|
||||
writeProperty(META_TYPE_PROPERTY, metaFormatFactory.shortName)
|
||||
//TODO add optional metaFormat properties
|
||||
val actualSize: ULong = if (envelope.data == null) {
|
||||
0u
|
||||
val actualSize: Int = if (envelope.data == null) {
|
||||
0
|
||||
} else {
|
||||
envelope.data?.size ?: ULong.MAX_VALUE
|
||||
envelope.data?.size ?: Binary.INFINITE
|
||||
}
|
||||
|
||||
writeProperty(DATA_LENGTH_PROPERTY, actualSize)
|
||||
|
||||
//Printing meta
|
||||
if (!envelope.meta.isEmpty()) {
|
||||
val metaBytes = metaFormat.writePacket(envelope.meta)
|
||||
writeProperty(META_LENGTH_PROPERTY, metaBytes.remaining)
|
||||
writeText(metaStart + "\r\n")
|
||||
writePacket(metaBytes)
|
||||
writeText("\r\n")
|
||||
val metaBytes = metaFormat.writeBytes(envelope.meta)
|
||||
writeProperty(META_LENGTH_PROPERTY, metaBytes.size + 2)
|
||||
writeUtf8String(metaStart + "\r\n")
|
||||
writeBinary(metaBytes)
|
||||
writeUtf8String("\r\n")
|
||||
}
|
||||
|
||||
//Printing data
|
||||
envelope.data?.let { data ->
|
||||
writeText(dataStart + "\r\n")
|
||||
writeFully(data.toBytes())
|
||||
writeUtf8String(dataStart + "\r\n")
|
||||
writeBinary(data)
|
||||
}
|
||||
flush()
|
||||
}
|
||||
|
||||
override fun Input.readObject(): Envelope {
|
||||
var line: String = ""
|
||||
var line: String
|
||||
do {
|
||||
line = readUTF8Line() ?: error("Input does not contain tagless envelope header")
|
||||
line = readUtf8Line() // ?: error("Input does not contain tagless envelope header")
|
||||
} while (!line.startsWith(TAGLESS_ENVELOPE_HEADER))
|
||||
val properties = HashMap<String, String>()
|
||||
|
||||
@ -67,19 +71,23 @@ class TaglessEnvelopeFormat(
|
||||
val (key, value) = match.destructured
|
||||
properties[key] = value
|
||||
}
|
||||
line = readUTF8Line() ?: return SimpleEnvelope(Meta.empty, null)
|
||||
try {
|
||||
line = readUtf8Line()
|
||||
} catch (ex: EOFException) {
|
||||
//If can't read line, return envelope without data
|
||||
return SimpleEnvelope(Meta.empty, null)
|
||||
}
|
||||
}
|
||||
|
||||
var meta: Meta = EmptyMeta
|
||||
|
||||
if (line.startsWith(metaStart)) {
|
||||
val metaFormat = properties[META_TYPE_PROPERTY]?.let { io.metaFormat(it) } ?: JsonMetaFormat
|
||||
val metaSize = properties.get(META_LENGTH_PROPERTY)?.toInt()
|
||||
val metaSize = properties[META_LENGTH_PROPERTY]?.toInt()
|
||||
meta = if (metaSize != null) {
|
||||
val metaPacket = buildPacket {
|
||||
writeFully(readBytes(metaSize))
|
||||
limit(metaSize).run {
|
||||
metaFormat.run { readObject() }
|
||||
}
|
||||
metaFormat.run { metaPacket.readObject() }
|
||||
} else {
|
||||
metaFormat.run {
|
||||
readObject()
|
||||
@ -88,17 +96,22 @@ class TaglessEnvelopeFormat(
|
||||
}
|
||||
|
||||
do {
|
||||
line = readUTF8Line() ?: return SimpleEnvelope(meta, null)
|
||||
//returning an Envelope without data if end of input is reached
|
||||
try {
|
||||
line = readUtf8Line()
|
||||
} catch (ex: EOFException) {
|
||||
//returning an Envelope without data if end of input is reached
|
||||
return SimpleEnvelope(meta, null)
|
||||
}
|
||||
} while (!line.startsWith(dataStart))
|
||||
|
||||
val data: Binary? = if (properties.containsKey(DATA_LENGTH_PROPERTY)) {
|
||||
val bytes = ByteArray(properties[DATA_LENGTH_PROPERTY]!!.toInt())
|
||||
readFully(bytes)
|
||||
readArray(bytes)
|
||||
bytes.asBinary()
|
||||
} else {
|
||||
val bytes = readBytes()
|
||||
bytes.asBinary()
|
||||
buildBytes {
|
||||
writeInput(this@readObject)
|
||||
}
|
||||
}
|
||||
|
||||
return SimpleEnvelope(meta, data)
|
||||
@ -106,9 +119,9 @@ class TaglessEnvelopeFormat(
|
||||
|
||||
override fun Input.readPartial(): PartialEnvelope {
|
||||
var offset = 0u
|
||||
var line: String = ""
|
||||
var line: String
|
||||
do {
|
||||
line = readUTF8Line() ?: error("Input does not contain tagless envelope header")
|
||||
line = readUtf8Line()// ?: error("Input does not contain tagless envelope header")
|
||||
offset += line.toUtf8Bytes().size.toUInt()
|
||||
} while (!line.startsWith(TAGLESS_ENVELOPE_HEADER))
|
||||
val properties = HashMap<String, String>()
|
||||
@ -121,29 +134,31 @@ class TaglessEnvelopeFormat(
|
||||
val (key, value) = match.destructured
|
||||
properties[key] = value
|
||||
}
|
||||
line = readUTF8Line() ?: return PartialEnvelope(Meta.empty, offset.toUInt(), 0.toULong())
|
||||
offset += line.toUtf8Bytes().size.toUInt()
|
||||
try {
|
||||
line = readUtf8Line()
|
||||
offset += line.toUtf8Bytes().size.toUInt()
|
||||
} catch (ex: EOFException) {
|
||||
return PartialEnvelope(Meta.empty, offset.toUInt(), 0.toULong())
|
||||
}
|
||||
}
|
||||
|
||||
var meta: Meta = EmptyMeta
|
||||
|
||||
if (line.startsWith(metaStart)) {
|
||||
val metaFormat = properties[META_TYPE_PROPERTY]?.let { io.metaFormat(it) } ?: JsonMetaFormat
|
||||
|
||||
val metaSize = properties.get(META_LENGTH_PROPERTY)?.toInt()
|
||||
val metaSize = properties[META_LENGTH_PROPERTY]?.toInt()
|
||||
meta = if (metaSize != null) {
|
||||
val metaPacket = buildPacket {
|
||||
writeFully(readBytes(metaSize))
|
||||
}
|
||||
offset += metaSize.toUInt()
|
||||
metaFormat.run { metaPacket.readObject() }
|
||||
limit(metaSize).run {
|
||||
metaFormat.run { readObject() }
|
||||
}
|
||||
} else {
|
||||
error("Can't partially read an envelope with undefined meta size")
|
||||
}
|
||||
}
|
||||
|
||||
do {
|
||||
line = readUTF8Line() ?: return PartialEnvelope(Meta.empty, offset.toUInt(), 0.toULong())
|
||||
line = readUtf8Line() ?: return PartialEnvelope(Meta.empty, offset.toUInt(), 0.toULong())
|
||||
offset += line.toUtf8Bytes().size.toUInt()
|
||||
//returning an Envelope without data if end of input is reached
|
||||
} while (!line.startsWith(dataStart))
|
||||
@ -190,9 +205,8 @@ class TaglessEnvelopeFormat(
|
||||
|
||||
override fun peekFormat(io: IOPlugin, input: Input): EnvelopeFormat? {
|
||||
return try {
|
||||
val buffer = ByteArray(TAGLESS_ENVELOPE_HEADER.length)
|
||||
input.readFully(buffer)
|
||||
return if (String(buffer) == TAGLESS_ENVELOPE_HEADER) {
|
||||
val string = input.readRawString(TAGLESS_ENVELOPE_HEADER.length)
|
||||
return if (string == TAGLESS_ENVELOPE_HEADER) {
|
||||
TaglessEnvelopeFormat(io)
|
||||
} else {
|
||||
null
|
||||
|
@ -13,6 +13,7 @@ import kotlinx.serialization.json.JsonOutput
|
||||
|
||||
|
||||
@Serializer(Value::class)
|
||||
@UseExperimental(InternalSerializationApi::class)
|
||||
object ValueSerializer : KSerializer<Value> {
|
||||
private val valueTypeSerializer = EnumSerializer(ValueType::class)
|
||||
private val listSerializer by lazy { ArrayListSerializer(ValueSerializer) }
|
||||
|
@ -55,6 +55,7 @@ inline class SerialDescriptorBuilder(private val impl: SerialClassDescImpl) {
|
||||
fun doubleArray(name: String, isOptional: Boolean = false, vararg annotations: Annotation) =
|
||||
element(name, DoubleArraySerializer.descriptor, isOptional, *annotations)
|
||||
|
||||
@UseExperimental(InternalSerializationApi::class)
|
||||
inline fun <reified E : Enum<E>> enum(name: String, isOptional: Boolean = false, vararg annotations: Annotation) =
|
||||
element(name, EnumSerializer(E::class).descriptor, isOptional, *annotations)
|
||||
|
||||
|
@ -1,5 +1,7 @@
|
||||
package hep.dataforge.io
|
||||
|
||||
import kotlinx.io.readDouble
|
||||
import kotlinx.io.writeDouble
|
||||
import kotlin.test.Test
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
@ -12,16 +14,18 @@ class EnvelopeFormatTest {
|
||||
}
|
||||
data{
|
||||
writeDouble(22.2)
|
||||
// repeat(2000){
|
||||
// writeInt(it)
|
||||
// }
|
||||
}
|
||||
}
|
||||
|
||||
@ExperimentalStdlibApi
|
||||
@Test
|
||||
fun testTaggedFormat(){
|
||||
TaggedEnvelopeFormat.run {
|
||||
val bytes = writeBytes(envelope)
|
||||
println(bytes.decodeToString())
|
||||
val res = readBytes(bytes)
|
||||
val byteArray = this.writeByteArray(envelope)
|
||||
println(byteArray.decodeToString())
|
||||
val res = readByteArray(byteArray)
|
||||
assertEquals(envelope.meta,res.meta)
|
||||
val double = res.data?.read {
|
||||
readDouble()
|
||||
@ -33,9 +37,9 @@ class EnvelopeFormatTest {
|
||||
@Test
|
||||
fun testTaglessFormat(){
|
||||
TaglessEnvelopeFormat.run {
|
||||
val bytes = writeBytes(envelope)
|
||||
println(bytes.decodeToString())
|
||||
val res = readBytes(bytes)
|
||||
val byteArray = writeByteArray(envelope)
|
||||
println(byteArray.decodeToString())
|
||||
val res = readByteArray(byteArray)
|
||||
assertEquals(envelope.meta,res.meta)
|
||||
val double = res.data?.read {
|
||||
readDouble()
|
||||
|
@ -1,12 +1,22 @@
|
||||
package hep.dataforge.io
|
||||
|
||||
import hep.dataforge.meta.*
|
||||
import kotlinx.io.Bytes
|
||||
import kotlinx.io.buildBytes
|
||||
import kotlinx.serialization.json.JsonPrimitive
|
||||
import kotlinx.serialization.json.json
|
||||
import kotlinx.serialization.json.jsonArray
|
||||
import kotlin.test.Test
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
fun Meta.toBytes(format: MetaFormat = JsonMetaFormat): Bytes = buildBytes {
|
||||
format.run { writeObject(this@toBytes) }
|
||||
}
|
||||
|
||||
fun MetaFormat.fromBytes(packet: Bytes): Meta {
|
||||
return packet.read { readObject() }
|
||||
}
|
||||
|
||||
class MetaFormatTest {
|
||||
@Test
|
||||
fun testBinaryMetaFormat() {
|
||||
|
@ -5,8 +5,6 @@ import hep.dataforge.io.serialization.MetaSerializer
|
||||
import hep.dataforge.io.serialization.NameSerializer
|
||||
import hep.dataforge.meta.buildMeta
|
||||
import hep.dataforge.names.toName
|
||||
import kotlinx.io.charsets.Charsets
|
||||
import kotlinx.io.core.String
|
||||
import kotlinx.serialization.cbor.Cbor
|
||||
import kotlinx.serialization.json.Json
|
||||
import kotlin.test.Test
|
||||
@ -41,7 +39,7 @@ class MetaSerializerTest {
|
||||
}
|
||||
|
||||
val bytes = Cbor.dump(MetaSerializer, meta)
|
||||
println(String(bytes, charset = Charsets.ISO_8859_1))
|
||||
println(bytes.contentToString())
|
||||
val restored = Cbor.load(MetaSerializer, bytes)
|
||||
assertEquals(restored, meta)
|
||||
}
|
||||
|
@ -1,33 +1,38 @@
|
||||
package hep.dataforge.io
|
||||
|
||||
import hep.dataforge.meta.DFExperimental
|
||||
import hep.dataforge.meta.get
|
||||
import hep.dataforge.meta.int
|
||||
import kotlinx.io.core.writeText
|
||||
import kotlinx.io.text.writeUtf8String
|
||||
|
||||
import kotlin.test.Test
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertTrue
|
||||
|
||||
class EnvelopePartsTest {
|
||||
@DFExperimental
|
||||
class MultipartTest {
|
||||
val envelopes = (0..5).map {
|
||||
Envelope {
|
||||
meta {
|
||||
"value" put it
|
||||
}
|
||||
data {
|
||||
writeText("Hello World $it")
|
||||
repeat(200){
|
||||
writeUtf8String("Hello World $it")
|
||||
repeat(2000) {
|
||||
writeInt(it)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
val partsEnvelope = Envelope {
|
||||
multipart(TaggedEnvelopeFormat, envelopes)
|
||||
multipart(envelopes, TaggedEnvelopeFormat)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testParts() {
|
||||
val bytes = TaggedEnvelopeFormat.writeBytes(partsEnvelope)
|
||||
val reconstructed = TaggedEnvelopeFormat.readBytes(bytes)
|
||||
val bytes = TaggedEnvelopeFormat.writeByteArray(partsEnvelope)
|
||||
assertTrue { bytes.size > envelopes.sumBy { it.data!!.size.toInt() } }
|
||||
val reconstructed = TaggedEnvelopeFormat.readByteArray(bytes)
|
||||
val parts = reconstructed.parts()?.toList() ?: emptyList()
|
||||
assertEquals(2, parts[2].meta["value"].int)
|
||||
println(reconstructed.data!!.size)
|
@ -1,31 +0,0 @@
|
||||
package hep.dataforge.io
|
||||
|
||||
import kotlinx.io.core.Input
|
||||
import kotlinx.io.core.buildPacket
|
||||
import java.nio.channels.FileChannel
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.StandardOpenOption
|
||||
import kotlin.math.min
|
||||
|
||||
@ExperimentalUnsignedTypes
|
||||
class FileBinary(val path: Path, private val offset: UInt = 0u, size: ULong? = null) : RandomAccessBinary {
|
||||
|
||||
override val size: ULong = size ?: (Files.size(path).toULong() - offset).toULong()
|
||||
|
||||
init {
|
||||
if( size != null && Files.size(path) < offset.toLong() + size.toLong()){
|
||||
error("Can't read binary from file. File is to short.")
|
||||
}
|
||||
}
|
||||
|
||||
override fun <R> read(from: UInt, size: UInt, block: Input.() -> R): R {
|
||||
FileChannel.open(path, StandardOpenOption.READ).use {
|
||||
val theSize: UInt = min(size, Files.size(path).toUInt() - offset)
|
||||
val buffer = it.map(FileChannel.MapMode.READ_ONLY, (from + offset).toLong(), theSize.toLong())
|
||||
return buildPacket { writeFully(buffer) }.block()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun Path.asBinary(offset: UInt = 0u, size: ULong? = null): FileBinary = FileBinary(this, offset, size)
|
@ -1,23 +1,20 @@
|
||||
package hep.dataforge.io
|
||||
|
||||
import hep.dataforge.meta.Meta
|
||||
import kotlinx.io.nio.asInput
|
||||
import java.nio.file.Files
|
||||
import kotlinx.io.Binary
|
||||
import kotlinx.io.FileBinary
|
||||
import kotlinx.io.read
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.StandardOpenOption
|
||||
|
||||
class FileEnvelope internal constructor(val path: Path, val format: EnvelopeFormat) : Envelope {
|
||||
//TODO do not like this constructor. Hope to replace it later
|
||||
|
||||
private val partialEnvelope: PartialEnvelope
|
||||
|
||||
init {
|
||||
val input = Files.newByteChannel(path, StandardOpenOption.READ).asInput()
|
||||
partialEnvelope = format.run { input.use { it.readPartial() } }
|
||||
private val partialEnvelope: PartialEnvelope = path.read {
|
||||
format.run { readPartial() }
|
||||
}
|
||||
|
||||
override val meta: Meta get() = partialEnvelope.meta
|
||||
|
||||
override val data: Binary? = FileBinary(path, partialEnvelope.dataOffset, partialEnvelope.dataSize)
|
||||
override val data: Binary? = FileBinary(path, partialEnvelope.dataOffset.toInt(), partialEnvelope.dataSize?.toInt())
|
||||
}
|
||||
|
||||
|
@ -5,13 +5,9 @@ import hep.dataforge.meta.DFExperimental
|
||||
import hep.dataforge.meta.EmptyMeta
|
||||
import hep.dataforge.meta.Meta
|
||||
import hep.dataforge.meta.isEmpty
|
||||
import kotlinx.io.core.Output
|
||||
import kotlinx.io.core.copyTo
|
||||
import kotlinx.io.nio.asInput
|
||||
import kotlinx.io.nio.asOutput
|
||||
import kotlinx.io.*
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.StandardOpenOption
|
||||
import kotlin.reflect.full.isSuperclassOf
|
||||
import kotlin.streams.asSequence
|
||||
|
||||
@ -23,7 +19,6 @@ inline fun <reified T : Any> IOPlugin.resolveIOFormat(): IOFormat<T>? {
|
||||
return ioFormats.values.find { it.type.isSuperclassOf(T::class) } as IOFormat<T>?
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Read file containing meta using given [formatOverride] or file extension to infer meta type.
|
||||
* If [path] is a directory search for file starting with `meta` in it
|
||||
@ -41,7 +36,9 @@ fun IOPlugin.readMetaFile(path: Path, formatOverride: MetaFormat? = null, descri
|
||||
|
||||
val metaFormat = formatOverride ?: metaFormat(extension) ?: error("Can't resolve meta format $extension")
|
||||
return metaFormat.run {
|
||||
Files.newByteChannel(actualPath, StandardOpenOption.READ).asInput().use { it.readMeta(descriptor) }
|
||||
actualPath.read{
|
||||
readMeta(descriptor)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -61,8 +58,8 @@ fun IOPlugin.writeMetaFile(
|
||||
path
|
||||
}
|
||||
metaFormat.run {
|
||||
Files.newByteChannel(actualPath, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW).asOutput().use {
|
||||
it.writeMeta(meta, descriptor)
|
||||
actualPath.write{
|
||||
writeMeta(meta, descriptor)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -139,24 +136,12 @@ fun IOPlugin.readEnvelopeFile(
|
||||
} else null
|
||||
}
|
||||
|
||||
private fun Path.useOutput(consumer: Output.() -> Unit) {
|
||||
//TODO forbid rewrite?
|
||||
Files.newByteChannel(
|
||||
this,
|
||||
StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING
|
||||
).asOutput().use {
|
||||
it.consumer()
|
||||
it.flush()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Write a binary into file. Throws an error if file already exists
|
||||
*/
|
||||
fun <T : Any> IOFormat<T>.writeToFile(path: Path, obj: T) {
|
||||
path.useOutput {
|
||||
path.write {
|
||||
writeObject(obj)
|
||||
flush()
|
||||
}
|
||||
}
|
||||
|
||||
@ -170,7 +155,7 @@ fun IOPlugin.writeEnvelopeFile(
|
||||
envelopeFormat: EnvelopeFormat = TaggedEnvelopeFormat,
|
||||
metaFormat: MetaFormatFactory? = null
|
||||
) {
|
||||
path.useOutput {
|
||||
path.write {
|
||||
with(envelopeFormat) {
|
||||
writeEnvelope(envelope, metaFormat ?: envelopeFormat.defaultMetaFormat)
|
||||
}
|
||||
@ -196,10 +181,10 @@ fun IOPlugin.writeEnvelopeDirectory(
|
||||
writeMetaFile(path, envelope.meta, metaFormat)
|
||||
}
|
||||
val dataFile = path.resolve(IOPlugin.DATA_FILE_NAME)
|
||||
dataFile.useOutput {
|
||||
dataFile.write {
|
||||
envelope.data?.read {
|
||||
val copied = copyTo(this@useOutput)
|
||||
if (envelope.data?.size != ULong.MAX_VALUE && copied != envelope.data?.size?.toLong()) {
|
||||
val copied = writeInput(this)
|
||||
if (envelope.data?.size != Binary.INFINITE && copied != envelope.data?.size) {
|
||||
error("The number of copied bytes does not equal data size")
|
||||
}
|
||||
}
|
||||
|
@ -7,7 +7,6 @@ import hep.dataforge.meta.EmptyMeta
|
||||
import hep.dataforge.meta.Meta
|
||||
import kotlinx.coroutines.asCoroutineDispatcher
|
||||
import kotlinx.coroutines.withContext
|
||||
import kotlinx.io.streams.writePacket
|
||||
import java.net.Socket
|
||||
import java.util.concurrent.Executors
|
||||
import kotlin.time.ExperimentalTime
|
||||
@ -52,14 +51,14 @@ class EnvelopeClient(
|
||||
override suspend fun respond(request: Envelope): Envelope = withContext(dispatcher) {
|
||||
//val address = InetSocketAddress(host,port)
|
||||
val socket = Socket(host, port)
|
||||
val input = socket.getInputStream().asInput()
|
||||
val output = socket.getOutputStream()
|
||||
val inputStream = socket.getInputStream()
|
||||
val outputStream = socket.getOutputStream()
|
||||
format.run {
|
||||
output.writePacket {
|
||||
outputStream.write {
|
||||
writeObject(request)
|
||||
}
|
||||
logger.debug { "Sent request with type ${request.type} to ${socket.remoteSocketAddress}" }
|
||||
val res = input.readObject()
|
||||
val res = inputStream.readBlocking { readObject() }
|
||||
logger.debug { "Received response with type ${res.type} from ${socket.remoteSocketAddress}" }
|
||||
return@withContext res
|
||||
}
|
||||
|
@ -9,7 +9,6 @@ import hep.dataforge.io.type
|
||||
import hep.dataforge.meta.EmptyMeta
|
||||
import hep.dataforge.meta.Meta
|
||||
import kotlinx.coroutines.*
|
||||
import kotlinx.io.streams.writePacket
|
||||
import java.net.ServerSocket
|
||||
import java.net.Socket
|
||||
import kotlin.concurrent.thread
|
||||
@ -71,11 +70,11 @@ class EnvelopeServer(
|
||||
|
||||
private fun readSocket(socket: Socket) {
|
||||
thread {
|
||||
val input = socket.getInputStream().asInput()
|
||||
val inputStream = socket.getInputStream()
|
||||
val outputStream = socket.getOutputStream()
|
||||
format.run {
|
||||
while (socket.isConnected) {
|
||||
val request = input.readObject()
|
||||
val request = inputStream.readBlocking { readObject() }
|
||||
logger.debug { "Accepted request with type ${request.type} from ${socket.remoteSocketAddress}" }
|
||||
if (request.type == SHUTDOWN_ENVELOPE_TYPE) {
|
||||
//Echo shutdown command
|
||||
@ -86,7 +85,7 @@ class EnvelopeServer(
|
||||
}
|
||||
runBlocking {
|
||||
val response = responder.respond(request)
|
||||
outputStream.writePacket {
|
||||
outputStream.write {
|
||||
writeObject(response)
|
||||
}
|
||||
logger.debug { "Sent response with type ${response.type} to ${socket.remoteSocketAddress}" }
|
||||
|
@ -1,32 +0,0 @@
|
||||
package hep.dataforge.io.tcp
|
||||
|
||||
import kotlinx.io.core.AbstractInput
|
||||
import kotlinx.io.core.Input
|
||||
import kotlinx.io.core.IoBuffer
|
||||
import kotlinx.io.core.writePacket
|
||||
import kotlinx.io.streams.readPacketAtMost
|
||||
import java.io.InputStream
|
||||
|
||||
/**
|
||||
* Modified version of InputStream to Input converter that supports waiting for input
|
||||
*/
|
||||
internal class InputStreamAsInput(
|
||||
private val stream: InputStream
|
||||
) : AbstractInput(pool = IoBuffer.Pool) {
|
||||
|
||||
|
||||
override fun fill(): IoBuffer? {
|
||||
val packet = stream.readPacketAtMost(4096)
|
||||
return pool.borrow().apply {
|
||||
resetForWrite(4096)
|
||||
writePacket(packet)
|
||||
}
|
||||
}
|
||||
|
||||
override fun closeSource() {
|
||||
stream.close()
|
||||
}
|
||||
}
|
||||
|
||||
fun InputStream.asInput(): Input =
|
||||
InputStreamAsInput(this)
|
@ -0,0 +1,62 @@
|
||||
package hep.dataforge.io.tcp
|
||||
|
||||
import kotlinx.io.Input
|
||||
import kotlinx.io.Output
|
||||
import kotlinx.io.asBinary
|
||||
import kotlinx.io.buffer.Buffer
|
||||
import kotlinx.io.buffer.get
|
||||
import kotlinx.io.buffer.set
|
||||
import java.io.InputStream
|
||||
import java.io.OutputStream
|
||||
|
||||
private class InputStreamInput(val source: InputStream, val waitForInput: Boolean = false) : Input() {
|
||||
override fun closeSource() {
|
||||
source.close()
|
||||
}
|
||||
|
||||
override fun fill(buffer: Buffer): Int {
|
||||
if (waitForInput) {
|
||||
while (source.available() == 0) {
|
||||
//block until input is available
|
||||
}
|
||||
}
|
||||
var bufferPos = 0
|
||||
do {
|
||||
val byte = source.read()
|
||||
buffer[bufferPos] = byte.toByte()
|
||||
bufferPos++
|
||||
} while (byte > 0 && bufferPos < buffer.size && source.available() > 0)
|
||||
return bufferPos
|
||||
}
|
||||
}
|
||||
|
||||
private class OutputStreamOutput(val out: OutputStream) : Output() {
|
||||
override fun flush(source: Buffer, length: Int) {
|
||||
for (i in 0..length) {
|
||||
out.write(source[i].toInt())
|
||||
}
|
||||
out.flush()
|
||||
}
|
||||
|
||||
override fun closeSource() {
|
||||
out.flush()
|
||||
out.close()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
fun <R> InputStream.read(size: Int, block: Input.() -> R): R {
|
||||
val buffer = ByteArray(size)
|
||||
read(buffer)
|
||||
return buffer.asBinary().read(block)
|
||||
}
|
||||
|
||||
fun <R> InputStream.read(block: Input.() -> R): R =
|
||||
InputStreamInput(this, false).block()
|
||||
|
||||
fun <R> InputStream.readBlocking(block: Input.() -> R): R =
|
||||
InputStreamInput(this, true).block()
|
||||
|
||||
fun OutputStream.write(block: Output.() -> Unit) {
|
||||
OutputStreamOutput(this).block()
|
||||
}
|
@ -1,6 +1,9 @@
|
||||
package hep.dataforge.io
|
||||
|
||||
import hep.dataforge.context.Global
|
||||
import kotlinx.io.asBinary
|
||||
import kotlinx.io.toByteArray
|
||||
import kotlinx.io.writeDouble
|
||||
import java.nio.file.Files
|
||||
import kotlin.test.Test
|
||||
import kotlin.test.assertEquals
|
||||
@ -21,11 +24,11 @@ class FileBinaryTest {
|
||||
@Test
|
||||
fun testSize() {
|
||||
val binary = envelope.data
|
||||
assertEquals(binary?.size?.toInt(), binary?.toBytes()?.size)
|
||||
assertEquals(binary?.size?.toInt(), binary?.toByteArray()?.size)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testFileData(){
|
||||
fun testFileData() {
|
||||
val dataFile = Files.createTempFile("dataforge_test_bin", ".bin")
|
||||
dataFile.toFile().writeText("This is my binary")
|
||||
val envelopeFromFile = Envelope {
|
||||
@ -34,12 +37,12 @@ class FileBinaryTest {
|
||||
"b" put 22.2
|
||||
}
|
||||
dataType = "hep.dataforge.satellite"
|
||||
dataID = "cellDepositTest" // добавил только что
|
||||
dataID = "cellDepositTest"
|
||||
data = dataFile.asBinary()
|
||||
}
|
||||
val binary = envelopeFromFile.data!!
|
||||
println(binary.toBytes().size)
|
||||
assertEquals(binary.size?.toInt(), binary.toBytes().size)
|
||||
println(binary.toByteArray().size)
|
||||
assertEquals(binary.size.toInt(), binary.toByteArray().size)
|
||||
|
||||
}
|
||||
|
||||
@ -50,7 +53,6 @@ class FileBinaryTest {
|
||||
Global.io.writeEnvelopeFile(tmpPath, envelope)
|
||||
|
||||
val binary = Global.io.readEnvelopeFile(tmpPath)?.data!!
|
||||
assertEquals(binary.size.toInt(), binary.toBytes().size)
|
||||
assertEquals(binary.size.toInt(), binary.toByteArray().size)
|
||||
}
|
||||
|
||||
}
|
@ -1,6 +1,7 @@
|
||||
package hep.dataforge.io
|
||||
|
||||
import hep.dataforge.context.Global
|
||||
import kotlinx.io.writeDouble
|
||||
import java.nio.file.Files
|
||||
import kotlin.test.Test
|
||||
import kotlin.test.assertTrue
|
||||
|
@ -4,11 +4,12 @@ import hep.dataforge.context.Global
|
||||
import hep.dataforge.io.Envelope
|
||||
import hep.dataforge.io.Responder
|
||||
import hep.dataforge.io.TaggedEnvelopeFormat
|
||||
import hep.dataforge.io.writeBytes
|
||||
import hep.dataforge.io.writeByteArray
|
||||
import kotlinx.coroutines.GlobalScope
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import org.junit.jupiter.api.AfterAll
|
||||
import org.junit.jupiter.api.BeforeAll
|
||||
import kotlinx.io.writeDouble
|
||||
import org.junit.AfterClass
|
||||
import org.junit.BeforeClass
|
||||
import kotlin.test.Test
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.time.ExperimentalTime
|
||||
@ -16,7 +17,7 @@ import kotlin.time.ExperimentalTime
|
||||
@ExperimentalStdlibApi
|
||||
object EchoResponder : Responder {
|
||||
override suspend fun respond(request: Envelope): Envelope {
|
||||
val string = TaggedEnvelopeFormat().run { writeBytes(request).decodeToString() }
|
||||
val string = TaggedEnvelopeFormat().run { writeByteArray(request).decodeToString() }
|
||||
println("ECHO:")
|
||||
println(string)
|
||||
return request
|
||||
@ -30,20 +31,20 @@ class EnvelopeServerTest {
|
||||
@JvmStatic
|
||||
val echoEnvelopeServer = EnvelopeServer(Global, 7778, EchoResponder, GlobalScope)
|
||||
|
||||
@BeforeAll
|
||||
@BeforeClass
|
||||
@JvmStatic
|
||||
fun start() {
|
||||
echoEnvelopeServer.start()
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
@AfterClass
|
||||
@JvmStatic
|
||||
fun close() {
|
||||
echoEnvelopeServer.stop()
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@Test(timeout = 1000)
|
||||
fun doEchoTest() {
|
||||
val request = Envelope.invoke {
|
||||
type = "test.echo"
|
||||
|
@ -9,7 +9,7 @@ import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlin.reflect.KClass
|
||||
|
||||
class TextOutput(override val context: Context, private val output: kotlinx.io.core.Output) : Output<Any> {
|
||||
class TextOutput(override val context: Context, private val output: kotlinx.io.Output) : Output<Any> {
|
||||
private val cache = HashMap<KClass<*>, TextRenderer>()
|
||||
|
||||
/**
|
||||
@ -40,7 +40,7 @@ class TextOutput(override val context: Context, private val output: kotlinx.io.c
|
||||
}
|
||||
|
||||
/**
|
||||
* A text or binary renderer based on [kotlinx.io.core.Output]
|
||||
* A text or binary renderer based on [kotlinx.io.Output]
|
||||
*/
|
||||
@Type(TEXT_RENDERER_TYPE)
|
||||
interface TextRenderer {
|
||||
@ -53,7 +53,7 @@ interface TextRenderer {
|
||||
*/
|
||||
val type: KClass<*>
|
||||
|
||||
suspend fun kotlinx.io.core.Output.render(obj: Any)
|
||||
suspend fun kotlinx.io.Output.render(obj: Any)
|
||||
|
||||
companion object {
|
||||
const val TEXT_RENDERER_TYPE = "dataforge.textRenderer"
|
||||
@ -64,7 +64,7 @@ object DefaultTextRenderer : TextRenderer {
|
||||
override val priority: Int = Int.MAX_VALUE
|
||||
override val type: KClass<*> = Any::class
|
||||
|
||||
override suspend fun kotlinx.io.core.Output.render(obj: Any) {
|
||||
override suspend fun kotlinx.io.Output.render(obj: Any) {
|
||||
append(obj.toString())
|
||||
append('\n')
|
||||
}
|
||||
|
@ -2,10 +2,13 @@ package hep.dataforge.workspace
|
||||
|
||||
import hep.dataforge.data.Data
|
||||
import hep.dataforge.data.await
|
||||
import hep.dataforge.io.*
|
||||
import hep.dataforge.io.Envelope
|
||||
import hep.dataforge.io.IOFormat
|
||||
import hep.dataforge.io.SimpleEnvelope
|
||||
import hep.dataforge.io.readWith
|
||||
import kotlinx.coroutines.coroutineScope
|
||||
import kotlinx.io.core.Input
|
||||
import kotlinx.io.core.buildPacket
|
||||
import kotlinx.io.Input
|
||||
import kotlinx.io.buildPacket
|
||||
import kotlin.reflect.KClass
|
||||
|
||||
/**
|
||||
|
@ -6,10 +6,10 @@ import hep.dataforge.io.IOFormat
|
||||
import hep.dataforge.io.io
|
||||
import hep.dataforge.meta.DFExperimental
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import kotlinx.io.core.Input
|
||||
import kotlinx.io.core.Output
|
||||
import kotlinx.io.core.readText
|
||||
import kotlinx.io.core.writeText
|
||||
import kotlinx.io.Input
|
||||
import kotlinx.io.Output
|
||||
import kotlinx.io.readText
|
||||
import kotlinx.io.writeText
|
||||
import java.nio.file.Files
|
||||
import kotlin.test.Ignore
|
||||
import kotlin.test.Test
|
||||
|
@ -32,3 +32,5 @@ include(
|
||||
":dataforge-workspace",
|
||||
":dataforge-scripting"
|
||||
)
|
||||
|
||||
//includeBuild("../kotlinx-io")
|
Loading…
Reference in New Issue
Block a user