Major rework of IO
This commit is contained in:
parent
5921556254
commit
56679cff23
@ -1,8 +0,0 @@
|
||||
package hep.dataforge.data
|
||||
|
||||
import kotlinx.coroutines.runBlocking
|
||||
|
||||
/**
|
||||
* Block the thread and get data content
|
||||
*/
|
||||
fun <T : Any> Data<T>.get(): T = runBlocking { await() }
|
@ -1,8 +1,14 @@
|
||||
package hep.dataforge.data
|
||||
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import kotlin.reflect.KClass
|
||||
import kotlin.reflect.full.isSuperclassOf
|
||||
|
||||
/**
|
||||
* Block the thread and get data content
|
||||
*/
|
||||
fun <T : Any> Data<T>.get(): T = runBlocking { await() }
|
||||
|
||||
/**
|
||||
* Check that node is compatible with given type meaning that each element could be cast to the type
|
||||
*/
|
@ -80,7 +80,7 @@ object BinaryMetaFormat : MetaFormat {
|
||||
writeValue(item.value)
|
||||
}
|
||||
is MetaItem.NodeItem -> {
|
||||
writeObject(item.node)
|
||||
writeThis(item.node)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,6 +1,8 @@
|
||||
package hep.dataforge.io
|
||||
|
||||
import hep.dataforge.context.Named
|
||||
import hep.dataforge.io.EnvelopeFormat.Companion.ENVELOPE_FORMAT_TYPE
|
||||
import hep.dataforge.io.IOPlugin.Companion.defaultMetaFormats
|
||||
import hep.dataforge.meta.Meta
|
||||
import hep.dataforge.meta.get
|
||||
import hep.dataforge.meta.string
|
||||
@ -49,15 +51,23 @@ val Envelope.dataType: String? get() = meta[Envelope.ENVELOPE_DATA_TYPE_KEY].str
|
||||
*/
|
||||
val Envelope.description: String? get() = meta[Envelope.ENVELOPE_DESCRIPTION_KEY].string
|
||||
|
||||
/**
|
||||
* A partially read envelope with meta, but without data
|
||||
*/
|
||||
@ExperimentalUnsignedTypes
|
||||
data class PartialEnvelope(val meta: Meta, val dataOffset: UInt, val dataSize: ULong?)
|
||||
|
||||
@Type(ENVELOPE_FORMAT_TYPE)
|
||||
interface EnvelopeFormat : IOFormat<Envelope>{
|
||||
fun readPartial(input: Input): PartialEnvelope
|
||||
interface EnvelopeFormat : IOFormat<Envelope>, Named {
|
||||
fun Input.readPartial(formats: Collection<MetaFormat> = defaultMetaFormats): PartialEnvelope
|
||||
|
||||
fun Output.writeEnvelope(envelope: Envelope, format: MetaFormat)
|
||||
fun Input.readEnvelope(formats: Collection<MetaFormat> = defaultMetaFormats): Envelope
|
||||
|
||||
override fun Output.writeObject(obj: Envelope) = writeEnvelope(obj, JsonMetaFormat)
|
||||
override fun Input.readThis(): Envelope = readEnvelope()
|
||||
|
||||
fun Output.writeEnvelope(envelope: Envelope, format: MetaFormat = JsonMetaFormat)
|
||||
|
||||
override fun Output.writeThis(obj: Envelope) = writeEnvelope(obj)
|
||||
|
||||
companion object {
|
||||
const val ENVELOPE_FORMAT_TYPE = "envelopeFormat"
|
||||
|
@ -6,9 +6,9 @@ import kotlinx.io.core.*
|
||||
* And interface for serialization facilities
|
||||
*/
|
||||
interface IOFormat<T : Any> {
|
||||
fun Output.writeObject(obj: T)
|
||||
fun Input.readObject(): T
|
||||
fun Output.writeThis(obj: T)
|
||||
fun Input.readThis(): T
|
||||
}
|
||||
|
||||
fun <T : Any> IOFormat<T>.writePacket(obj: T): ByteReadPacket = buildPacket { writeObject(obj) }
|
||||
fun <T : Any> IOFormat<T>.writeBytes(obj: T): ByteArray = buildPacket { writeObject(obj) }.readBytes()
|
||||
fun <T : Any> IOFormat<T>.writePacket(obj: T): ByteReadPacket = buildPacket { writeThis(obj) }
|
||||
fun <T : Any> IOFormat<T>.writeBytes(obj: T): ByteArray = buildPacket { writeThis(obj) }.readBytes()
|
@ -6,7 +6,6 @@ import hep.dataforge.context.PluginTag
|
||||
import hep.dataforge.context.content
|
||||
import hep.dataforge.meta.Meta
|
||||
import hep.dataforge.names.Name
|
||||
import hep.dataforge.names.asName
|
||||
import kotlin.reflect.KClass
|
||||
|
||||
class IOPlugin(meta: Meta) : AbstractPlugin(meta) {
|
||||
@ -21,16 +20,15 @@ class IOPlugin(meta: Meta) : AbstractPlugin(meta) {
|
||||
|
||||
override fun provideTop(target: String): Map<Name, Any> {
|
||||
return when (target) {
|
||||
MetaFormat.META_FORMAT_TYPE -> internalMetaFormats
|
||||
EnvelopeFormat.ENVELOPE_FORMAT_TYPE -> mapOf(
|
||||
TaggedEnvelopeFormat.VERSION.asName() to TaggedEnvelopeFormat(metaFormats)
|
||||
)
|
||||
MetaFormat.META_FORMAT_TYPE -> defaultMetaFormats.toMap()
|
||||
EnvelopeFormat.ENVELOPE_FORMAT_TYPE -> defaultEnvelopeFormats.toMap()
|
||||
else -> super.provideTop(target)
|
||||
}
|
||||
}
|
||||
|
||||
companion object : PluginFactory<IOPlugin> {
|
||||
private val internalMetaFormats = listOf(JsonMetaFormat, BinaryMetaFormat).toMap()
|
||||
val defaultMetaFormats: List<MetaFormat> = listOf(JsonMetaFormat, BinaryMetaFormat)
|
||||
val defaultEnvelopeFormats = listOf(TaggedEnvelopeFormat)
|
||||
|
||||
override val tag: PluginTag = PluginTag("io", group = PluginTag.DATAFORGE_GROUP)
|
||||
override val type: KClass<out IOPlugin> = IOPlugin::class
|
||||
|
@ -56,6 +56,7 @@ fun Value.toJson(descriptor: ValueDescriptor? = null): JsonElement {
|
||||
|
||||
//Use theese methods to customize JSON key mapping
|
||||
private fun NameToken.toJsonKey(descriptor: ItemDescriptor?) = toString()
|
||||
|
||||
private fun NodeDescriptor?.getDescriptor(key: String) = this?.items?.get(key)
|
||||
|
||||
fun Meta.toJson(descriptor: NodeDescriptor? = null): JsonObject {
|
||||
|
@ -15,11 +15,11 @@ interface MetaFormat : IOFormat<Meta>, Named {
|
||||
override val name: String
|
||||
val key: Short
|
||||
|
||||
override fun Output.writeObject(obj: Meta) {
|
||||
override fun Output.writeThis(obj: Meta) {
|
||||
writeMeta(obj, null)
|
||||
}
|
||||
|
||||
override fun Input.readObject(): Meta = readMeta(null)
|
||||
override fun Input.readThis(): Meta = readMeta(null)
|
||||
|
||||
fun Output.writeMeta(meta: Meta, descriptor: NodeDescriptor? = null)
|
||||
fun Input.readMeta(descriptor: NodeDescriptor? = null): Meta
|
||||
@ -30,20 +30,20 @@ interface MetaFormat : IOFormat<Meta>, Named {
|
||||
}
|
||||
|
||||
fun Meta.toString(format: MetaFormat = JsonMetaFormat): String = buildPacket {
|
||||
format.run { writeObject(this@toString) }
|
||||
format.run { writeThis(this@toString) }
|
||||
}.readText()
|
||||
|
||||
fun Meta.toBytes(format: MetaFormat = JsonMetaFormat): ByteReadPacket = buildPacket {
|
||||
format.run { writeObject(this@toBytes) }
|
||||
format.run { writeThis(this@toBytes) }
|
||||
}
|
||||
|
||||
|
||||
fun MetaFormat.parse(str: String): Meta {
|
||||
return ByteReadPacket(str.toByteArray()).readObject()
|
||||
return ByteReadPacket(str.toByteArray()).readThis()
|
||||
}
|
||||
|
||||
fun MetaFormat.fromBytes(packet: ByteReadPacket): Meta {
|
||||
return packet.readObject()
|
||||
return packet.readThis()
|
||||
}
|
||||
|
||||
|
||||
|
@ -2,34 +2,16 @@ package hep.dataforge.io
|
||||
|
||||
import kotlinx.io.core.*
|
||||
|
||||
class TaggedEnvelopeFormat(val metaFormats: Collection<MetaFormat>) : EnvelopeFormat {
|
||||
|
||||
override fun Output.writeEnvelope(envelope: Envelope, format: MetaFormat) {
|
||||
write(this, envelope, format)
|
||||
}
|
||||
|
||||
/**
|
||||
* Read an envelope from input into memory
|
||||
*
|
||||
* @param input an input to read from
|
||||
* @param metaFormats a collection of meta formats to resolve
|
||||
*/
|
||||
override fun Input.readObject(): Envelope = read(this, metaFormats)
|
||||
|
||||
override fun readPartial(input: Input): PartialEnvelope = Companion.readPartial(input, metaFormats)
|
||||
|
||||
private data class Tag(
|
||||
val metaFormatKey: Short,
|
||||
val metaSize: UInt,
|
||||
val dataSize: ULong
|
||||
)
|
||||
|
||||
companion object {
|
||||
@ExperimentalUnsignedTypes
|
||||
object TaggedEnvelopeFormat : EnvelopeFormat {
|
||||
const val VERSION = "DF03"
|
||||
private const val START_SEQUENCE = "#~"
|
||||
private const val END_SEQUENCE = "~#\r\n"
|
||||
private const val TAG_SIZE = 26u
|
||||
|
||||
override val name: String get() = VERSION
|
||||
|
||||
private fun Tag.toBytes(): ByteReadPacket = buildPacket(24) {
|
||||
writeText(START_SEQUENCE)
|
||||
writeText(VERSION)
|
||||
@ -50,38 +32,50 @@ class TaggedEnvelopeFormat(val metaFormats: Collection<MetaFormat>) : EnvelopeFo
|
||||
return Tag(metaFormatKey, metaLength, dataLength)
|
||||
}
|
||||
|
||||
fun read(input: Input, metaFormats: Collection<MetaFormat>): Envelope {
|
||||
val tag = input.readTag()
|
||||
override fun Output.writeEnvelope(envelope: Envelope, format: MetaFormat) {
|
||||
val metaBytes = format.writeBytes(envelope.meta)
|
||||
val tag = Tag(format.key, metaBytes.size.toUInt(), envelope.data?.size ?: 0.toULong())
|
||||
writePacket(tag.toBytes())
|
||||
writeFully(metaBytes)
|
||||
envelope.data?.read { copyTo(this@writeEnvelope) }
|
||||
}
|
||||
|
||||
val metaFormat = metaFormats.find { it.key == tag.metaFormatKey }
|
||||
/**
|
||||
* Read an envelope from input into memory
|
||||
*
|
||||
* @param input an input to read from
|
||||
* @param formats a collection of meta formats to resolve
|
||||
*/
|
||||
override fun Input.readEnvelope(formats: Collection<MetaFormat>): Envelope {
|
||||
val tag = readTag()
|
||||
|
||||
val metaFormat = formats.find { it.key == tag.metaFormatKey }
|
||||
?: error("Meta format with key ${tag.metaFormatKey} not found")
|
||||
|
||||
val metaPacket = ByteReadPacket(input.readBytes(tag.metaSize.toInt()))
|
||||
val meta = metaFormat.run { metaPacket.readObject() }
|
||||
val metaPacket = ByteReadPacket(readBytes(tag.metaSize.toInt()))
|
||||
val meta = metaFormat.run { metaPacket.readThis() }
|
||||
|
||||
val dataBytes = input.readBytes(tag.dataSize.toInt())
|
||||
val dataBytes = readBytes(tag.dataSize.toInt())
|
||||
|
||||
return SimpleEnvelope(meta, ArrayBinary(dataBytes))
|
||||
}
|
||||
|
||||
fun readPartial(input: Input, metaFormats: Collection<MetaFormat>): PartialEnvelope {
|
||||
val tag = input.readTag()
|
||||
override fun Input.readPartial(formats: Collection<MetaFormat>): PartialEnvelope {
|
||||
val tag = readTag()
|
||||
|
||||
val metaFormat = metaFormats.find { it.key == tag.metaFormatKey }
|
||||
val metaFormat = formats.find { it.key == tag.metaFormatKey }
|
||||
?: error("Meta format with key ${tag.metaFormatKey} not found")
|
||||
|
||||
val metaPacket = ByteReadPacket(input.readBytes(tag.metaSize.toInt()))
|
||||
val meta = metaFormat.run { metaPacket.readObject() }
|
||||
val metaPacket = ByteReadPacket(readBytes(tag.metaSize.toInt()))
|
||||
val meta = metaFormat.run { metaPacket.readThis() }
|
||||
|
||||
return PartialEnvelope(meta, TAG_SIZE + tag.metaSize, tag.dataSize)
|
||||
}
|
||||
|
||||
fun write(out: Output, envelope: Envelope, metaFormat: MetaFormat) {
|
||||
val metaBytes = metaFormat.writeBytes(envelope.meta)
|
||||
val tag = Tag(metaFormat.key, metaBytes.size.toUInt(), envelope.data?.size ?: 0.toULong())
|
||||
out.writePacket(tag.toBytes())
|
||||
out.writeFully(metaBytes)
|
||||
envelope.data?.read { copyTo(out) }
|
||||
}
|
||||
}
|
||||
private data class Tag(
|
||||
val metaFormatKey: Short,
|
||||
val metaSize: UInt,
|
||||
val dataSize: ULong
|
||||
)
|
||||
|
||||
}
|
@ -7,6 +7,7 @@ import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.StandardOpenOption
|
||||
|
||||
@ExperimentalUnsignedTypes
|
||||
class FileBinary(val path: Path, private val offset: UInt = 0u, size: ULong? = null) : RandomAccessBinary {
|
||||
|
||||
override val size: ULong = size ?: (Files.size(path).toULong() - offset).toULong()
|
||||
|
@ -2,6 +2,7 @@ package hep.dataforge.io
|
||||
|
||||
import hep.dataforge.meta.Meta
|
||||
import kotlinx.io.nio.asInput
|
||||
import kotlinx.io.nio.asOutput
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.StandardOpenOption
|
||||
@ -13,7 +14,7 @@ class FileEnvelope internal constructor(val path: Path, val format: EnvelopeForm
|
||||
|
||||
init {
|
||||
val input = Files.newByteChannel(path, StandardOpenOption.READ).asInput()
|
||||
partialEnvelope = format.readPartial(input)
|
||||
partialEnvelope = format.run { input.readPartial() }
|
||||
}
|
||||
|
||||
override val meta: Meta get() = partialEnvelope.meta
|
||||
@ -22,3 +23,20 @@ class FileEnvelope internal constructor(val path: Path, val format: EnvelopeForm
|
||||
}
|
||||
|
||||
fun Path.readEnvelope(format: EnvelopeFormat) = FileEnvelope(this, format)
|
||||
|
||||
fun Path.writeEnvelope(
|
||||
envelope: Envelope,
|
||||
format: EnvelopeFormat = TaggedEnvelopeFormat,
|
||||
metaFormat: MetaFormat = JsonMetaFormat
|
||||
) {
|
||||
val output = Files.newByteChannel(
|
||||
this,
|
||||
StandardOpenOption.WRITE,
|
||||
StandardOpenOption.CREATE,
|
||||
StandardOpenOption.TRUNCATE_EXISTING
|
||||
).asOutput()
|
||||
|
||||
with(format) {
|
||||
output.writeEnvelope(envelope, metaFormat)
|
||||
}
|
||||
}
|
@ -62,7 +62,7 @@ suspend fun <T : Any> Context.readData(
|
||||
format.run {
|
||||
Files.newByteChannel(path, StandardOpenOption.READ)
|
||||
.asInput()
|
||||
.readObject()
|
||||
.readThis()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user