Dev zelenyy 2 #29
@ -6,7 +6,7 @@ plugins {
|
||||
id("scientifik.publish") version "0.2.2" apply false
|
||||
}
|
||||
|
||||
val dataforgeVersion by extra("0.1.5-dev-1")
|
||||
val dataforgeVersion by extra("0.1.5-dev-2")
|
||||
|
||||
val bintrayRepo by extra("dataforge")
|
||||
val githubProject by extra("dataforge-core")
|
||||
|
@ -13,16 +13,22 @@ import kotlin.reflect.KClass
|
||||
sealed class DataItem<out T : Any> : MetaRepr {
|
||||
abstract val type: KClass<out T>
|
||||
|
||||
abstract val meta: Meta
|
||||
|
||||
class Node<out T : Any>(val value: DataNode<T>) : DataItem<T>() {
|
||||
override val type: KClass<out T> get() = value.type
|
||||
|
||||
override fun toMeta(): Meta = value.toMeta()
|
||||
|
||||
override val meta: Meta get() = value.meta
|
||||
}
|
||||
|
||||
class Leaf<out T : Any>(val value: Data<T>) : DataItem<T>() {
|
||||
override val type: KClass<out T> get() = value.type
|
||||
|
||||
override fun toMeta(): Meta = value.toMeta()
|
||||
|
||||
override val meta: Meta get() = value.meta
|
||||
}
|
||||
}
|
||||
|
||||
@ -38,6 +44,8 @@ interface DataNode<out T : Any> : MetaRepr {
|
||||
|
||||
val items: Map<NameToken, DataItem<T>>
|
||||
|
||||
val meta: Meta
|
||||
|
||||
override fun toMeta(): Meta = buildMeta {
|
||||
"type" put (type.simpleName ?: "undefined")
|
||||
"items" put {
|
||||
@ -64,21 +72,13 @@ val <T : Any> DataItem<T>?.node: DataNode<T>? get() = (this as? DataItem.Node<T>
|
||||
val <T : Any> DataItem<T>?.data: Data<T>? get() = (this as? DataItem.Leaf<T>)?.value
|
||||
|
||||
/**
|
||||
* Start computation for all goals in data node
|
||||
* Start computation for all goals in data node and return a job for the whole node
|
||||
*/
|
||||
fun DataNode<*>.startAll(scope: CoroutineScope): Unit = items.values.forEach {
|
||||
when (it) {
|
||||
is DataItem.Node<*> -> it.value.startAll(scope)
|
||||
is DataItem.Leaf<*> -> it.value.start(scope)
|
||||
}
|
||||
}
|
||||
|
||||
fun DataNode<*>.joinAll(scope: CoroutineScope): Job = scope.launch {
|
||||
startAll(scope)
|
||||
items.forEach {
|
||||
when (val value = it.value) {
|
||||
is DataItem.Node -> value.value.joinAll(this).join()
|
||||
is DataItem.Leaf -> value.value.await(scope)
|
||||
fun DataNode<*>.launchAll(scope: CoroutineScope): Job = scope.launch {
|
||||
items.values.forEach {
|
||||
when (it) {
|
||||
is DataItem.Node<*> -> it.value.launchAll(scope)
|
||||
is DataItem.Leaf<*> -> it.value.start(scope)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -125,12 +125,9 @@ operator fun <T : Any> DataNode<T>.iterator(): Iterator<Pair<Name, DataItem<T>>>
|
||||
|
||||
class DataTree<out T : Any> internal constructor(
|
||||
override val type: KClass<out T>,
|
||||
override val items: Map<NameToken, DataItem<T>>
|
||||
) : DataNode<T> {
|
||||
override fun toString(): String {
|
||||
return super.toString()
|
||||
}
|
||||
}
|
||||
override val items: Map<NameToken, DataItem<T>>,
|
||||
override val meta: Meta
|
||||
) : DataNode<T>
|
||||
|
||||
private sealed class DataTreeBuilderItem<out T : Any> {
|
||||
class Node<T : Any>(val tree: DataTreeBuilder<T>) : DataTreeBuilderItem<T>()
|
||||
@ -144,6 +141,8 @@ private sealed class DataTreeBuilderItem<out T : Any> {
|
||||
class DataTreeBuilder<T : Any>(val type: KClass<out T>) {
|
||||
private val map = HashMap<NameToken, DataTreeBuilderItem<T>>()
|
||||
|
||||
private var meta = MetaBuilder()
|
||||
|
||||
operator fun set(token: NameToken, node: DataTreeBuilder<out T>) {
|
||||
if (map.containsKey(token)) error("Tree entry with name $token is not empty")
|
||||
map[token] = DataTreeBuilderItem.Node(node)
|
||||
@ -211,13 +210,19 @@ class DataTreeBuilder<T : Any>(val type: KClass<out T>) {
|
||||
infix fun String.put(block: DataTreeBuilder<T>.() -> Unit) = set(toName(), DataTreeBuilder(type).apply(block))
|
||||
|
||||
|
||||
/**
|
||||
* Update data with given node data and meta with node meta.
|
||||
*/
|
||||
fun update(node: DataNode<T>) {
|
||||
node.dataSequence().forEach {
|
||||
//TODO check if the place is occupied
|
||||
this[it.first] = it.second
|
||||
}
|
||||
meta.update(node.meta)
|
||||
}
|
||||
|
||||
fun meta(block: MetaBuilder.() -> Unit) = meta.apply(block)
|
||||
|
||||
fun build(): DataTree<T> {
|
||||
val resMap = map.mapValues { (_, value) ->
|
||||
when (value) {
|
||||
@ -225,7 +230,7 @@ class DataTreeBuilder<T : Any>(val type: KClass<out T>) {
|
||||
is DataTreeBuilderItem.Node -> DataItem.Node(value.tree.build())
|
||||
}
|
||||
}
|
||||
return DataTree(type, resMap)
|
||||
return DataTree(type, resMap, meta.seal())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -52,6 +52,7 @@ inline fun <reified R : Any> Data<*>.cast(): Data<R> = cast(R::class)
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
fun <R : Any> DataNode<*>.cast(type: KClass<out R>): DataNode<R> {
|
||||
return object : DataNode<R> {
|
||||
override val meta: Meta get() = this@cast.meta
|
||||
override val type: KClass<out R> = type
|
||||
override val items: Map<NameToken, DataItem<R>> get() = this@cast.items as Map<NameToken, DataItem<R>>
|
||||
}
|
||||
|
@ -1,5 +1,6 @@
|
||||
package hep.dataforge.data
|
||||
|
||||
import hep.dataforge.meta.Meta
|
||||
import hep.dataforge.names.NameToken
|
||||
import kotlin.reflect.KClass
|
||||
|
||||
@ -8,6 +9,7 @@ import kotlin.reflect.KClass
|
||||
* A zero-copy data node wrapper that returns only children with appropriate type.
|
||||
*/
|
||||
class TypeFilteredDataNode<out T : Any>(val origin: DataNode<*>, override val type: KClass<out T>) : DataNode<T> {
|
||||
override val meta: Meta get() = origin.meta
|
||||
override val items: Map<NameToken, DataItem<T>> by lazy {
|
||||
origin.items.mapNotNull { (key, item) ->
|
||||
when (item) {
|
||||
|
@ -30,12 +30,10 @@ fun <R : Any> Data<*>.filterIsInstance(type: KClass<out R>): Data<R>? =
|
||||
* but could contain empty nodes
|
||||
*/
|
||||
fun <R : Any> DataNode<*>.filterIsInstance(type: KClass<out R>): DataNode<R> {
|
||||
return if (canCast(type)) {
|
||||
cast(type)
|
||||
} else if (this is TypeFilteredDataNode) {
|
||||
origin.filterIsInstance(type)
|
||||
} else {
|
||||
TypeFilteredDataNode(this, type)
|
||||
return when {
|
||||
canCast(type) -> cast(type)
|
||||
this is TypeFilteredDataNode -> origin.filterIsInstance(type)
|
||||
else -> TypeFilteredDataNode(this, type)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -24,7 +24,7 @@ class FrontMatterEnvelopeFormat(
|
||||
|
||||
val readMetaFormat =
|
||||
metaTypeRegex.matchEntire(line)?.groupValues?.first()
|
||||
?.let { io.metaFormat(it) } ?: YamlMetaFormat.default
|
||||
?.let { io.metaFormat(it) } ?: YamlMetaFormat
|
||||
|
||||
val metaBlock = buildPacket {
|
||||
do {
|
||||
@ -45,7 +45,7 @@ class FrontMatterEnvelopeFormat(
|
||||
|
||||
val readMetaFormat =
|
||||
metaTypeRegex.matchEntire(line)?.groupValues?.first()
|
||||
?.let { io.metaFormat(it) } ?: YamlMetaFormat.default
|
||||
?.let { io.metaFormat(it) } ?: YamlMetaFormat
|
||||
|
||||
val metaBlock = buildPacket {
|
||||
do {
|
||||
@ -72,7 +72,7 @@ class FrontMatterEnvelopeFormat(
|
||||
private val metaTypeRegex = "---(\\w*)\\s*".toRegex()
|
||||
|
||||
override fun invoke(meta: Meta, context: Context): EnvelopeFormat {
|
||||
return FrontMatterEnvelopeFormat(context.io, meta)
|
||||
return FrontMatterEnvelopeFormat(context.io, meta)
|
||||
}
|
||||
|
||||
override fun peekFormat(io: IOPlugin, input: Input): EnvelopeFormat? {
|
||||
@ -84,5 +84,16 @@ class FrontMatterEnvelopeFormat(
|
||||
}
|
||||
}
|
||||
|
||||
private val default by lazy { invoke() }
|
||||
|
||||
override fun Input.readPartial(): PartialEnvelope =
|
||||
default.run { readPartial() }
|
||||
|
||||
override fun Output.writeEnvelope(envelope: Envelope, metaFormatFactory: MetaFormatFactory, formatMeta: Meta) =
|
||||
default.run { writeEnvelope(envelope, metaFormatFactory, formatMeta) }
|
||||
|
||||
override fun Input.readObject(): Envelope =
|
||||
default.run { readObject() }
|
||||
|
||||
}
|
||||
}
|
@ -45,12 +45,18 @@ class YamlMetaFormat(val meta: Meta) : MetaFormat {
|
||||
}
|
||||
|
||||
companion object : MetaFormatFactory {
|
||||
val default = YamlMetaFormat()
|
||||
|
||||
override fun invoke(meta: Meta, context: Context): MetaFormat = YamlMetaFormat(meta)
|
||||
|
||||
override val name: Name = super.name + "yaml"
|
||||
|
||||
override val key: Short = 0x594d //YM
|
||||
|
||||
private val default = YamlMetaFormat()
|
||||
|
||||
override fun Output.writeMeta(meta: Meta, descriptor: NodeDescriptor?) =
|
||||
default.run { writeMeta(meta, descriptor) }
|
||||
|
||||
override fun Input.readMeta(descriptor: NodeDescriptor?): Meta =
|
||||
default.run { readMeta(descriptor) }
|
||||
}
|
||||
}
|
@ -8,7 +8,7 @@ import kotlin.math.min
|
||||
*/
|
||||
interface Binary {
|
||||
/**
|
||||
* The size of binary in bytes
|
||||
* The size of binary in bytes. [ULong.MAX_VALUE] if size is not defined and input should be read until its end is reached
|
||||
*/
|
||||
val size: ULong
|
||||
|
||||
@ -18,6 +18,10 @@ interface Binary {
|
||||
* Some implementation may forbid this to be called twice. In this case second call will throw an exception.
|
||||
*/
|
||||
fun <R> read(block: Input.() -> R): R
|
||||
|
||||
companion object {
|
||||
val EMPTY = EmptyBinary
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -48,12 +52,11 @@ fun RandomAccessBinary.readPacket(from: UInt, size: UInt): ByteReadPacket = read
|
||||
@ExperimentalUnsignedTypes
|
||||
object EmptyBinary : RandomAccessBinary {
|
||||
|
||||
override val size: ULong = 0.toULong()
|
||||
override val size: ULong = 0u
|
||||
|
||||
override fun <R> read(from: UInt, size: UInt, block: Input.() -> R): R {
|
||||
error("The binary is empty")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ExperimentalUnsignedTypes
|
||||
@ -79,9 +82,9 @@ fun <T : Any> Binary.readWith(format: IOFormat<T>): T = format.run {
|
||||
}
|
||||
}
|
||||
|
||||
fun <T : Any> IOFormat<T>.writeBinary(obj: T): Binary {
|
||||
val packet = buildPacket {
|
||||
writeObject(obj)
|
||||
}
|
||||
return ArrayBinary(packet.readBytes())
|
||||
}
|
||||
//fun <T : Any> IOFormat<T>.writeBinary(obj: T): Binary {
|
||||
// val packet = buildPacket {
|
||||
// writeObject(obj)
|
||||
// }
|
||||
// return ArrayBinary(packet.readBytes())
|
||||
//}
|
@ -21,6 +21,7 @@ interface Envelope {
|
||||
val ENVELOPE_DATA_TYPE_KEY = ENVELOPE_NODE_KEY + "dataType"
|
||||
val ENVELOPE_DATA_ID_KEY = ENVELOPE_NODE_KEY + "dataID"
|
||||
val ENVELOPE_DESCRIPTION_KEY = ENVELOPE_NODE_KEY + "description"
|
||||
val ENVELOPE_NAME_KEY = ENVELOPE_NODE_KEY + "name"
|
||||
//const val ENVELOPE_TIME_KEY = "@envelope.time"
|
||||
|
||||
/**
|
||||
|
@ -21,6 +21,7 @@ class EnvelopeBuilder {
|
||||
var dataType by metaBuilder.string(key = Envelope.ENVELOPE_DATA_TYPE_KEY)
|
||||
var dataID by metaBuilder.string(key = Envelope.ENVELOPE_DATA_ID_KEY)
|
||||
var description by metaBuilder.string(key = Envelope.ENVELOPE_DESCRIPTION_KEY)
|
||||
var name by metaBuilder.string(key = Envelope.ENVELOPE_NAME_KEY)
|
||||
|
||||
/**
|
||||
* Construct a binary and transform it into byte-array based buffer
|
||||
|
@ -31,7 +31,7 @@ interface EnvelopeFormat : IOFormat<Envelope> {
|
||||
}
|
||||
|
||||
@Type(ENVELOPE_FORMAT_TYPE)
|
||||
interface EnvelopeFormatFactory : IOFormatFactory<Envelope> {
|
||||
interface EnvelopeFormatFactory : IOFormatFactory<Envelope>, EnvelopeFormat {
|
||||
override val name: Name get() = "envelope".asName()
|
||||
override val type: KClass<out Envelope> get() = Envelope::class
|
||||
|
||||
|
@ -3,27 +3,31 @@ package hep.dataforge.io
|
||||
import hep.dataforge.context.Global
|
||||
import hep.dataforge.io.EnvelopeParts.FORMAT_META_KEY
|
||||
import hep.dataforge.io.EnvelopeParts.FORMAT_NAME_KEY
|
||||
import hep.dataforge.io.EnvelopeParts.PARTS_DATA_TYPE
|
||||
import hep.dataforge.io.EnvelopeParts.MULTIPART_DATA_TYPE
|
||||
import hep.dataforge.io.EnvelopeParts.SIZE_KEY
|
||||
import hep.dataforge.meta.*
|
||||
import hep.dataforge.names.asName
|
||||
import hep.dataforge.names.plus
|
||||
import hep.dataforge.names.toName
|
||||
|
||||
object EnvelopeParts {
|
||||
val SIZE_KEY = Envelope.ENVELOPE_NODE_KEY + "parts" + "size"
|
||||
val FORMAT_NAME_KEY = Envelope.ENVELOPE_NODE_KEY + "parts" + "format"
|
||||
val FORMAT_META_KEY = Envelope.ENVELOPE_NODE_KEY + "parts" + "meta"
|
||||
val MULTIPART_KEY = "multipart".asName()
|
||||
val SIZE_KEY = Envelope.ENVELOPE_NODE_KEY + MULTIPART_KEY + "size"
|
||||
val FORMAT_NAME_KEY = Envelope.ENVELOPE_NODE_KEY + MULTIPART_KEY + "format"
|
||||
val FORMAT_META_KEY = Envelope.ENVELOPE_NODE_KEY + MULTIPART_KEY + "meta"
|
||||
|
||||
const val PARTS_DATA_TYPE = "envelope.parts"
|
||||
const val MULTIPART_DATA_TYPE = "envelope.multipart"
|
||||
}
|
||||
|
||||
fun EnvelopeBuilder.parts(formatFactory: EnvelopeFormatFactory, envelopes: Collection<Envelope>) {
|
||||
dataType = PARTS_DATA_TYPE
|
||||
/**
|
||||
* Append multiple serialized envelopes to the data block. Previous data is erased if it was present
|
||||
*/
|
||||
fun EnvelopeBuilder.multipart(format: EnvelopeFormatFactory, envelopes: Collection<Envelope>) {
|
||||
dataType = MULTIPART_DATA_TYPE
|
||||
meta {
|
||||
SIZE_KEY put envelopes.size
|
||||
FORMAT_NAME_KEY put formatFactory.name.toString()
|
||||
FORMAT_NAME_KEY put format.name.toString()
|
||||
}
|
||||
val format = formatFactory()
|
||||
data {
|
||||
format.run {
|
||||
envelopes.forEach {
|
||||
@ -33,12 +37,15 @@ fun EnvelopeBuilder.parts(formatFactory: EnvelopeFormatFactory, envelopes: Colle
|
||||
}
|
||||
}
|
||||
|
||||
fun EnvelopeBuilder.parts(formatFactory: EnvelopeFormatFactory, builder: suspend SequenceScope<Envelope>.() -> Unit) =
|
||||
parts(formatFactory, sequence(builder).toList())
|
||||
fun EnvelopeBuilder.multipart(formatFactory: EnvelopeFormatFactory, builder: suspend SequenceScope<Envelope>.() -> Unit) =
|
||||
multipart(formatFactory, sequence(builder).toList())
|
||||
|
||||
fun Envelope.parts(io: IOPlugin = Global.plugins.fetch(IOPlugin)): Sequence<Envelope> {
|
||||
/**
|
||||
* If given envelope supports multipart data, return a sequence of those parts (could be empty). Otherwise return null.
|
||||
*/
|
||||
fun Envelope.parts(io: IOPlugin = Global.plugins.fetch(IOPlugin)): Sequence<Envelope>? {
|
||||
return when (dataType) {
|
||||
PARTS_DATA_TYPE -> {
|
||||
MULTIPART_DATA_TYPE -> {
|
||||
val size = meta[SIZE_KEY].int ?: error("Unsized parts not supported yet")
|
||||
val formatName = meta[FORMAT_NAME_KEY].string?.toName()
|
||||
?: error("Inferring parts format is not supported at the moment")
|
||||
@ -55,6 +62,6 @@ fun Envelope.parts(io: IOPlugin = Global.plugins.fetch(IOPlugin)): Sequence<Enve
|
||||
} ?: emptySequence()
|
||||
}
|
||||
}
|
||||
else -> emptySequence()
|
||||
else -> null
|
||||
}
|
||||
}
|
||||
|
@ -16,7 +16,6 @@ import kotlinx.serialization.ImplicitReflectionSerializer
|
||||
import kotlinx.serialization.KSerializer
|
||||
import kotlinx.serialization.cbor.Cbor
|
||||
import kotlinx.serialization.serializer
|
||||
import kotlin.math.min
|
||||
import kotlin.reflect.KClass
|
||||
|
||||
/**
|
||||
@ -80,33 +79,9 @@ inline fun buildPacketWithoutPool(headerSizeHint: Int = 0, block: BytePacketBuil
|
||||
}
|
||||
|
||||
fun <T : Any> IOFormat<T>.writePacket(obj: T): ByteReadPacket = buildPacket { writeObject(obj) }
|
||||
//TODO Double buffer copy. fix all that with IO-2
|
||||
fun <T : Any> IOFormat<T>.writeBytes(obj: T): ByteArray = buildPacket { writeObject(obj) }.readBytes()
|
||||
fun <T : Any> IOFormat<T>.readBytes(array: ByteArray): T {
|
||||
//= ByteReadPacket(array).readThis()
|
||||
val byteArrayInput: Input = object : AbstractInput(
|
||||
IoBuffer.Pool.borrow(),
|
||||
remaining = array.size.toLong(),
|
||||
pool = IoBuffer.Pool
|
||||
) {
|
||||
var written = 0
|
||||
override fun closeSource() {
|
||||
// do nothing
|
||||
}
|
||||
|
||||
override fun fill(): IoBuffer? {
|
||||
if (array.size - written <= 0) return null
|
||||
|
||||
return IoBuffer.Pool.fill {
|
||||
reserveEndGap(IoBuffer.ReservedSize)
|
||||
val toWrite = min(capacity, array.size - written)
|
||||
writeFully(array, written, toWrite)
|
||||
written += toWrite
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
return byteArrayInput.readObject()
|
||||
}
|
||||
fun <T : Any> IOFormat<T>.readBytes(array: ByteArray): T = buildPacket { writeFully(array) }.readObject()
|
||||
|
||||
object DoubleIOFormat : IOFormat<Double>, IOFormatFactory<Double> {
|
||||
override fun invoke(meta: Meta, context: Context): IOFormat<Double> = this
|
||||
|
@ -38,12 +38,18 @@ class JsonMetaFormat(private val json: Json = Json.indented) : MetaFormat {
|
||||
}
|
||||
|
||||
companion object : MetaFormatFactory {
|
||||
val default = JsonMetaFormat()
|
||||
|
||||
override fun invoke(meta: Meta, context: Context): MetaFormat = default
|
||||
|
||||
override val name: Name = super.name + "json"
|
||||
override val key: Short = 0x4a53//"JS"
|
||||
|
||||
private val default = JsonMetaFormat()
|
||||
|
||||
override fun Output.writeMeta(meta: Meta, descriptor: NodeDescriptor?) =
|
||||
default.run { writeMeta(meta,descriptor) }
|
||||
|
||||
override fun Input.readMeta(descriptor: NodeDescriptor?): Meta =
|
||||
default.run { readMeta(descriptor) }
|
||||
}
|
||||
}
|
||||
|
||||
@ -90,7 +96,7 @@ fun Meta.toJson(descriptor: NodeDescriptor? = null): JsonObject {
|
||||
fun JsonElement.toMeta(descriptor: NodeDescriptor? = null): Meta {
|
||||
return when (val item = toMetaItem(descriptor)) {
|
||||
is MetaItem.NodeItem<*> -> item.node
|
||||
is MetaItem.ValueItem ->item.value.toMeta()
|
||||
is MetaItem.ValueItem -> item.value.toMeta()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -27,7 +27,7 @@ interface MetaFormat : IOFormat<Meta> {
|
||||
}
|
||||
|
||||
@Type(META_FORMAT_TYPE)
|
||||
interface MetaFormatFactory : IOFormatFactory<Meta> {
|
||||
interface MetaFormatFactory : IOFormatFactory<Meta>, MetaFormat {
|
||||
override val name: Name get() = "meta".asName()
|
||||
|
||||
override val type: KClass<out Meta> get() = Meta::class
|
||||
@ -47,7 +47,7 @@ fun Meta.toString(format: MetaFormat): String = buildPacket {
|
||||
|
||||
fun Meta.toString(formatFactory: MetaFormatFactory): String = toString(formatFactory())
|
||||
|
||||
fun Meta.toBytes(format: MetaFormat = JsonMetaFormat.default): ByteReadPacket = buildPacket {
|
||||
fun Meta.toBytes(format: MetaFormat = JsonMetaFormat): ByteReadPacket = buildPacket {
|
||||
format.run { writeObject(this@toBytes) }
|
||||
}
|
||||
|
||||
|
@ -39,7 +39,12 @@ class TaggedEnvelopeFormat(
|
||||
override fun Output.writeEnvelope(envelope: Envelope, metaFormatFactory: MetaFormatFactory, formatMeta: Meta) {
|
||||
val metaFormat = metaFormatFactory.invoke(formatMeta, io.context)
|
||||
val metaBytes = metaFormat.writeBytes(envelope.meta)
|
||||
val tag = Tag(metaFormatFactory.key, metaBytes.size.toUInt() + 2u, envelope.data?.size ?: 0.toULong())
|
||||
val actualSize: ULong = if (envelope.data == null) {
|
||||
0u
|
||||
} else {
|
||||
envelope.data?.size ?: ULong.MAX_VALUE
|
||||
}
|
||||
val tag = Tag(metaFormatFactory.key, metaBytes.size.toUInt() + 2u, actualSize)
|
||||
writePacket(tag.toBytes())
|
||||
writeFully(metaBytes)
|
||||
writeText("\r\n")
|
||||
@ -59,7 +64,10 @@ class TaggedEnvelopeFormat(
|
||||
val metaFormat = io.metaFormat(tag.metaFormatKey)
|
||||
?: error("Meta format with key ${tag.metaFormatKey} not found")
|
||||
|
||||
val metaPacket = ByteReadPacket(readBytes(tag.metaSize.toInt()))
|
||||
val metaBytes = readBytes(tag.metaSize.toInt())
|
||||
val metaPacket = buildPacket {
|
||||
writeFully(metaBytes)
|
||||
}
|
||||
val dataBytes = readBytes(tag.dataSize.toInt())
|
||||
|
||||
val meta = metaFormat.run { metaPacket.readObject() }
|
||||
@ -134,7 +142,16 @@ class TaggedEnvelopeFormat(
|
||||
}
|
||||
}
|
||||
|
||||
val default by lazy { invoke() }
|
||||
private val default by lazy { invoke() }
|
||||
|
||||
override fun Input.readPartial(): PartialEnvelope =
|
||||
default.run { readPartial() }
|
||||
|
||||
override fun Output.writeEnvelope(envelope: Envelope, metaFormatFactory: MetaFormatFactory, formatMeta: Meta) =
|
||||
default.run { writeEnvelope(envelope, metaFormatFactory, formatMeta) }
|
||||
|
||||
override fun Input.readObject(): Envelope =
|
||||
default.run { readObject() }
|
||||
}
|
||||
|
||||
}
|
@ -27,7 +27,13 @@ class TaglessEnvelopeFormat(
|
||||
//printing all properties
|
||||
writeProperty(META_TYPE_PROPERTY, metaFormatFactory.type)
|
||||
//TODO add optional metaFormat properties
|
||||
writeProperty(DATA_LENGTH_PROPERTY, envelope.data?.size ?: 0)
|
||||
val actualSize: ULong = if (envelope.data == null) {
|
||||
0u
|
||||
} else {
|
||||
envelope.data?.size ?: ULong.MAX_VALUE
|
||||
}
|
||||
|
||||
writeProperty(DATA_LENGTH_PROPERTY, actualSize)
|
||||
|
||||
//Printing meta
|
||||
if (!envelope.meta.isEmpty()) {
|
||||
@ -67,7 +73,7 @@ class TaglessEnvelopeFormat(
|
||||
var meta: Meta = EmptyMeta
|
||||
|
||||
if (line.startsWith(metaStart)) {
|
||||
val metaFormat = properties[META_TYPE_PROPERTY]?.let { io.metaFormat(it) } ?: JsonMetaFormat.default
|
||||
val metaFormat = properties[META_TYPE_PROPERTY]?.let { io.metaFormat(it) } ?: JsonMetaFormat
|
||||
val metaSize = properties.get(META_LENGTH_PROPERTY)?.toInt()
|
||||
meta = if (metaSize != null) {
|
||||
val metaPacket = buildPacket {
|
||||
@ -122,7 +128,7 @@ class TaglessEnvelopeFormat(
|
||||
var meta: Meta = EmptyMeta
|
||||
|
||||
if (line.startsWith(metaStart)) {
|
||||
val metaFormat = properties[META_TYPE_PROPERTY]?.let { io.metaFormat(it) } ?: JsonMetaFormat.default
|
||||
val metaFormat = properties[META_TYPE_PROPERTY]?.let { io.metaFormat(it) } ?: JsonMetaFormat
|
||||
|
||||
val metaSize = properties.get(META_LENGTH_PROPERTY)?.toInt()
|
||||
meta = if (metaSize != null) {
|
||||
@ -171,7 +177,16 @@ class TaglessEnvelopeFormat(
|
||||
return TaglessEnvelopeFormat(context.io, meta)
|
||||
}
|
||||
|
||||
val default by lazy { invoke() }
|
||||
private val default by lazy { invoke() }
|
||||
|
||||
override fun Input.readPartial(): PartialEnvelope =
|
||||
default.run { readPartial() }
|
||||
|
||||
override fun Output.writeEnvelope(envelope: Envelope, metaFormatFactory: MetaFormatFactory, formatMeta: Meta) =
|
||||
default.run { writeEnvelope(envelope, metaFormatFactory, formatMeta) }
|
||||
|
||||
override fun Input.readObject(): Envelope =
|
||||
default.run { readObject() }
|
||||
|
||||
override fun peekFormat(io: IOPlugin, input: Input): EnvelopeFormat? {
|
||||
return try {
|
||||
|
@ -18,7 +18,7 @@ class EnvelopeFormatTest {
|
||||
@ExperimentalStdlibApi
|
||||
@Test
|
||||
fun testTaggedFormat(){
|
||||
TaggedEnvelopeFormat.default.run {
|
||||
TaggedEnvelopeFormat.run {
|
||||
val bytes = writeBytes(envelope)
|
||||
println(bytes.decodeToString())
|
||||
val res = readBytes(bytes)
|
||||
@ -32,7 +32,7 @@ class EnvelopeFormatTest {
|
||||
|
||||
@Test
|
||||
fun testTaglessFormat(){
|
||||
TaglessEnvelopeFormat.default.run {
|
||||
TaglessEnvelopeFormat.run {
|
||||
val bytes = writeBytes(envelope)
|
||||
println(bytes.decodeToString())
|
||||
val res = readBytes(bytes)
|
||||
|
@ -14,19 +14,23 @@ class EnvelopePartsTest {
|
||||
}
|
||||
data {
|
||||
writeText("Hello World $it")
|
||||
repeat(200){
|
||||
writeInt(it)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
val partsEnvelope = Envelope {
|
||||
parts(TaggedEnvelopeFormat, envelopes)
|
||||
multipart(TaggedEnvelopeFormat, envelopes)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun testParts() {
|
||||
val bytes = TaggedEnvelopeFormat.default.writeBytes(partsEnvelope)
|
||||
val reconstructed = TaggedEnvelopeFormat.default.readBytes(bytes)
|
||||
val parts = reconstructed.parts().toList()
|
||||
val bytes = TaggedEnvelopeFormat.writeBytes(partsEnvelope)
|
||||
val reconstructed = TaggedEnvelopeFormat.readBytes(bytes)
|
||||
val parts = reconstructed.parts()?.toList() ?: emptyList()
|
||||
assertEquals(2, parts[2].meta["value"].int)
|
||||
println(reconstructed.data!!.size)
|
||||
}
|
||||
|
||||
}
|
@ -1,9 +1,7 @@
|
||||
package hep.dataforge.io
|
||||
|
||||
import hep.dataforge.meta.EmptyMeta
|
||||
import hep.dataforge.meta.Meta
|
||||
import kotlinx.io.nio.asInput
|
||||
import kotlinx.io.nio.asOutput
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.StandardOpenOption
|
||||
@ -15,7 +13,7 @@ class FileEnvelope internal constructor(val path: Path, val format: EnvelopeForm
|
||||
|
||||
init {
|
||||
val input = Files.newByteChannel(path, StandardOpenOption.READ).asInput()
|
||||
partialEnvelope = format.run { input.use { it.readPartial()} }
|
||||
partialEnvelope = format.run { input.use { it.readPartial() } }
|
||||
}
|
||||
|
||||
override val meta: Meta get() = partialEnvelope.meta
|
||||
@ -23,30 +21,3 @@ class FileEnvelope internal constructor(val path: Path, val format: EnvelopeForm
|
||||
override val data: Binary? = FileBinary(path, partialEnvelope.dataOffset, partialEnvelope.dataSize)
|
||||
}
|
||||
|
||||
fun IOPlugin.readEnvelopeFile(
|
||||
path: Path,
|
||||
formatFactory: EnvelopeFormatFactory = TaggedEnvelopeFormat,
|
||||
formatMeta: Meta = EmptyMeta
|
||||
): FileEnvelope {
|
||||
val format = formatFactory(formatMeta, context)
|
||||
return FileEnvelope(path, format)
|
||||
}
|
||||
|
||||
fun IOPlugin.writeEnvelopeFile(
|
||||
path: Path,
|
||||
envelope: Envelope,
|
||||
formatFactory: EnvelopeFormatFactory = TaggedEnvelopeFormat,
|
||||
formatMeta: Meta = EmptyMeta
|
||||
) {
|
||||
val output = Files.newByteChannel(
|
||||
path,
|
||||
StandardOpenOption.WRITE,
|
||||
StandardOpenOption.CREATE,
|
||||
StandardOpenOption.TRUNCATE_EXISTING
|
||||
).asOutput()
|
||||
|
||||
with(formatFactory(formatMeta, context)) {
|
||||
output.writeObject(envelope)
|
||||
}
|
||||
}
|
||||
|
||||
|
137
dataforge-io/src/jvmMain/kotlin/hep/dataforge/io/fileIO.kt
Normal file
137
dataforge-io/src/jvmMain/kotlin/hep/dataforge/io/fileIO.kt
Normal file
@ -0,0 +1,137 @@
|
||||
package hep.dataforge.io
|
||||
|
||||
import hep.dataforge.descriptors.NodeDescriptor
|
||||
import hep.dataforge.meta.DFExperimental
|
||||
import hep.dataforge.meta.EmptyMeta
|
||||
import hep.dataforge.meta.Meta
|
||||
import kotlinx.io.nio.asInput
|
||||
import kotlinx.io.nio.asOutput
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.StandardOpenOption
|
||||
import kotlin.reflect.full.isSuperclassOf
|
||||
import kotlin.streams.asSequence
|
||||
|
||||
inline fun <reified T : Any> IOPlugin.resolveIOFormat(): IOFormat<T>? {
|
||||
return ioFormats.values.find { it.type.isSuperclassOf(T::class) } as IOFormat<T>?
|
||||
}
|
||||
|
||||
/**
|
||||
* Read file containing meta using given [formatOverride] or file extension to infer meta type.
|
||||
* If [path] is a directory search for file starting with `meta` in it
|
||||
*/
|
||||
fun IOPlugin.readMetaFile(path: Path, formatOverride: MetaFormat? = null, descriptor: NodeDescriptor? = null): Meta {
|
||||
if (!Files.exists(path)) error("Meta file $path does not exist")
|
||||
|
||||
val actualPath: Path = if (Files.isDirectory(path)) {
|
||||
Files.list(path).asSequence().find { it.fileName.startsWith("meta") }
|
||||
?: error("The directory $path does not contain meta file")
|
||||
} else {
|
||||
path
|
||||
}
|
||||
val extension = actualPath.fileName.toString().substringAfterLast('.')
|
||||
|
||||
val metaFormat = formatOverride ?: metaFormat(extension) ?: error("Can't resolve meta format $extension")
|
||||
return metaFormat.run {
|
||||
Files.newByteChannel(actualPath, StandardOpenOption.READ).asInput().use { it.readMeta(descriptor) }
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Write meta to file using [metaFormat]. If [path] is a directory, write a file with name equals name of [metaFormat].
|
||||
* Like "meta.json"
|
||||
*/
|
||||
fun IOPlugin.writeMetaFile(
|
||||
path: Path,
|
||||
metaFormat: MetaFormatFactory = JsonMetaFormat,
|
||||
descriptor: NodeDescriptor? = null
|
||||
) {
|
||||
val actualPath = if (Files.isDirectory(path)) {
|
||||
path.resolve(metaFormat.name.toString())
|
||||
} else {
|
||||
path
|
||||
}
|
||||
metaFormat.run {
|
||||
Files.newByteChannel(actualPath, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW).asOutput().use {
|
||||
it.writeMeta(meta, descriptor)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Read and envelope from file if the file exists, return null if file does not exist.
|
||||
*
|
||||
* If file is directory, then expect two files inside:
|
||||
* * **meta.<format name>** for meta
|
||||
* * **data** for data
|
||||
*
|
||||
* If the file is envelope read it using [EnvelopeFormatFactory.peekFormat] functionality to infer format.
|
||||
*
|
||||
* If the file is not an envelope and [readNonEnvelopes] is true, return an Envelope without meta, using file as binary.
|
||||
*
|
||||
* Return null otherwise.
|
||||
*/
|
||||
@DFExperimental
|
||||
fun IOPlugin.readEnvelopeFile(path: Path, readNonEnvelopes: Boolean = false): Envelope? {
|
||||
if (!Files.exists(path)) return null
|
||||
|
||||
//read two-files directory
|
||||
if (Files.isDirectory(path)) {
|
||||
val metaFile = Files.list(path).asSequence()
|
||||
.singleOrNull { it.fileName.toString().startsWith("meta") }
|
||||
|
||||
val meta = if (metaFile == null) {
|
||||
EmptyMeta
|
||||
} else {
|
||||
readMetaFile(metaFile)
|
||||
}
|
||||
|
||||
val dataFile = path.resolve("data")
|
||||
|
||||
val data: Binary? = if (Files.exists(dataFile)) {
|
||||
dataFile.asBinary()
|
||||
} else {
|
||||
null
|
||||
}
|
||||
|
||||
return SimpleEnvelope(meta, data)
|
||||
}
|
||||
|
||||
val binary = path.asBinary()
|
||||
|
||||
val formats = envelopeFormatFactories.mapNotNull { factory ->
|
||||
binary.read {
|
||||
factory.peekFormat(this@readEnvelopeFile, this@read)
|
||||
}
|
||||
}
|
||||
return when (formats.size) {
|
||||
0 -> if (readNonEnvelopes) {
|
||||
SimpleEnvelope(Meta.empty, binary)
|
||||
} else {
|
||||
null
|
||||
}
|
||||
1 -> formats.first().run {
|
||||
binary.read {
|
||||
readObject()
|
||||
}
|
||||
}
|
||||
else -> error("Envelope format file recognition clash")
|
||||
}
|
||||
}
|
||||
|
||||
fun IOPlugin.writeEnvelopeFile(
|
||||
path: Path,
|
||||
envelope: Envelope,
|
||||
format: EnvelopeFormat = TaggedEnvelopeFormat
|
||||
) {
|
||||
val output = Files.newByteChannel(
|
||||
path,
|
||||
StandardOpenOption.WRITE,
|
||||
StandardOpenOption.CREATE,
|
||||
StandardOpenOption.TRUNCATE_EXISTING
|
||||
).asOutput()
|
||||
|
||||
with(format) {
|
||||
output.writeObject(envelope)
|
||||
}
|
||||
}
|
@ -0,0 +1,29 @@
|
||||
package hep.dataforge.io
|
||||
|
||||
import hep.dataforge.io.functions.FunctionServer
|
||||
import hep.dataforge.io.functions.function
|
||||
import hep.dataforge.meta.Meta
|
||||
import hep.dataforge.meta.buildMeta
|
||||
import hep.dataforge.names.Name
|
||||
import kotlin.reflect.KClass
|
||||
import kotlin.reflect.full.isSuperclassOf
|
||||
|
||||
|
||||
fun IOPlugin.resolveIOFormatName(type: KClass<*>): Name {
|
||||
return ioFormats.entries.find { it.value.type.isSuperclassOf(type) }?.key
|
||||
?: error("Can't resolve IOFormat for type $type")
|
||||
}
|
||||
|
||||
inline fun <reified T : Any, reified R : Any> IOPlugin.generateFunctionMeta(functionName: String): Meta = buildMeta {
|
||||
FunctionServer.FUNCTION_NAME_KEY put functionName
|
||||
FunctionServer.INPUT_FORMAT_KEY put resolveIOFormatName(T::class).toString()
|
||||
FunctionServer.OUTPUT_FORMAT_KEY put resolveIOFormatName(R::class).toString()
|
||||
}
|
||||
|
||||
inline fun <reified T : Any, reified R : Any> FunctionServer.function(
|
||||
functionName: String
|
||||
): (suspend (T) -> R) {
|
||||
val plugin = context.plugins.get<IOPlugin>() ?: error("IO plugin not loaded")
|
||||
val meta = plugin.generateFunctionMeta<T, R>(functionName)
|
||||
return function(meta)
|
||||
}
|
@ -1,51 +0,0 @@
|
||||
package hep.dataforge.io
|
||||
|
||||
import hep.dataforge.descriptors.NodeDescriptor
|
||||
import hep.dataforge.io.functions.FunctionServer
|
||||
import hep.dataforge.io.functions.FunctionServer.Companion.FUNCTION_NAME_KEY
|
||||
import hep.dataforge.io.functions.FunctionServer.Companion.INPUT_FORMAT_KEY
|
||||
import hep.dataforge.io.functions.FunctionServer.Companion.OUTPUT_FORMAT_KEY
|
||||
import hep.dataforge.io.functions.function
|
||||
import hep.dataforge.meta.Meta
|
||||
import hep.dataforge.meta.buildMeta
|
||||
import hep.dataforge.names.Name
|
||||
import kotlinx.io.nio.asOutput
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.StandardOpenOption
|
||||
import kotlin.reflect.KClass
|
||||
import kotlin.reflect.full.isSuperclassOf
|
||||
|
||||
inline fun <reified T : Any> IOPlugin.resolveIOFormat(): IOFormat<T>? {
|
||||
return ioFormats.values.find { it.type.isSuperclassOf(T::class) } as IOFormat<T>?
|
||||
}
|
||||
|
||||
fun IOPlugin.resolveIOFormatName(type: KClass<*>): Name {
|
||||
return ioFormats.entries.find { it.value.type.isSuperclassOf(type) }?.key
|
||||
?: error("Can't resolve IOFormat for type $type")
|
||||
}
|
||||
|
||||
inline fun <reified T : Any, reified R : Any> IOPlugin.generateFunctionMeta(functionName: String): Meta = buildMeta {
|
||||
FUNCTION_NAME_KEY put functionName
|
||||
INPUT_FORMAT_KEY put resolveIOFormatName(T::class).toString()
|
||||
OUTPUT_FORMAT_KEY put resolveIOFormatName(R::class).toString()
|
||||
}
|
||||
|
||||
inline fun <reified T : Any, reified R : Any> FunctionServer.function(
|
||||
functionName: String
|
||||
): (suspend (T) -> R) {
|
||||
val plugin = context.plugins.get<IOPlugin>() ?: error("IO plugin not loaded")
|
||||
val meta = plugin.generateFunctionMeta<T, R>(functionName)
|
||||
return function(meta)
|
||||
}
|
||||
|
||||
/**
|
||||
* Write meta to file in a given [format]
|
||||
*/
|
||||
fun Meta.write(path: Path, format: MetaFormat, descriptor: NodeDescriptor? = null) {
|
||||
format.run {
|
||||
Files.newByteChannel(path, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW)
|
||||
.asOutput()
|
||||
.writeMeta(this@write, descriptor)
|
||||
}
|
||||
}
|
@ -3,7 +3,6 @@ package hep.dataforge.io.tcp
|
||||
import kotlinx.io.core.AbstractInput
|
||||
import kotlinx.io.core.Input
|
||||
import kotlinx.io.core.IoBuffer
|
||||
import kotlinx.io.core.IoBuffer.Companion.NoPool
|
||||
import kotlinx.io.core.writePacket
|
||||
import kotlinx.io.streams.readPacketAtMost
|
||||
import java.io.InputStream
|
||||
@ -13,7 +12,7 @@ import java.io.InputStream
|
||||
*/
|
||||
internal class InputStreamAsInput(
|
||||
private val stream: InputStream
|
||||
) : AbstractInput(pool = NoPool) {
|
||||
) : AbstractInput(pool = IoBuffer.Pool) {
|
||||
|
||||
|
||||
override fun fill(): IoBuffer? {
|
||||
|
@ -39,7 +39,7 @@ class FileBinaryTest {
|
||||
}
|
||||
val binary = envelopeFromFile.data!!
|
||||
println(binary.toBytes().size)
|
||||
assertEquals(binary.size.toInt(), binary.toBytes().size)
|
||||
assertEquals(binary.size?.toInt(), binary.toBytes().size)
|
||||
|
||||
}
|
||||
|
||||
@ -49,7 +49,7 @@ class FileBinaryTest {
|
||||
val tmpPath = Files.createTempFile("dataforge_test", ".df")
|
||||
Global.io.writeEnvelopeFile(tmpPath, envelope)
|
||||
|
||||
val binary = Global.io.readEnvelopeFile(tmpPath).data!!
|
||||
val binary = Global.io.readEnvelopeFile(tmpPath)?.data!!
|
||||
assertEquals(binary.size.toInt(), binary.toBytes().size)
|
||||
}
|
||||
|
||||
|
@ -29,7 +29,7 @@ class FileEnvelopeTest {
|
||||
val tmpPath = Files.createTempFile("dataforge_test", ".df")
|
||||
Global.io.writeEnvelopeFile(tmpPath,envelope)
|
||||
println(tmpPath.toUri())
|
||||
val restored: Envelope = Global.io.readEnvelopeFile(tmpPath)
|
||||
val restored: Envelope = Global.io.readEnvelopeFile(tmpPath)!!
|
||||
assertTrue { envelope.contentEquals(restored) }
|
||||
}
|
||||
|
||||
|
@ -1,93 +1,47 @@
|
||||
package hep.dataforge.workspace
|
||||
|
||||
import hep.dataforge.data.Data
|
||||
import hep.dataforge.data.DataNode
|
||||
import hep.dataforge.data.DataTreeBuilder
|
||||
import hep.dataforge.data.datum
|
||||
import hep.dataforge.descriptors.NodeDescriptor
|
||||
import hep.dataforge.io.*
|
||||
import hep.dataforge.meta.EmptyMeta
|
||||
import hep.dataforge.data.*
|
||||
import hep.dataforge.io.Envelope
|
||||
import hep.dataforge.io.IOFormat
|
||||
import hep.dataforge.io.IOPlugin
|
||||
import hep.dataforge.io.readEnvelopeFile
|
||||
import hep.dataforge.meta.Meta
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.withContext
|
||||
import kotlinx.io.nio.asInput
|
||||
import kotlinx.io.nio.asOutput
|
||||
import hep.dataforge.meta.get
|
||||
import hep.dataforge.meta.string
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.StandardOpenOption
|
||||
import kotlin.reflect.KClass
|
||||
|
||||
/**
|
||||
* Read meta from file in a given [MetaFormat]
|
||||
*/
|
||||
fun MetaFormat.readMetaFile(path: Path, descriptor: NodeDescriptor? = null): Meta {
|
||||
return Files.newByteChannel(path, StandardOpenOption.READ)
|
||||
.asInput()
|
||||
.readMeta(descriptor)
|
||||
}
|
||||
|
||||
/**
|
||||
* Write meta to file using given [MetaFormat]
|
||||
*/
|
||||
fun MetaFormat.writeMetaFile(path: Path, meta: Meta, descriptor: NodeDescriptor? = null) {
|
||||
return Files.newByteChannel(path, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW)
|
||||
.asOutput()
|
||||
.writeMeta(meta, descriptor)
|
||||
}
|
||||
|
||||
/**
|
||||
* Read data with supported envelope format and binary format. If envelope format is null, then read binary directly from file.
|
||||
* The operation is blocking since it must read meta header. The reading of envelope body is lazy
|
||||
* @param type explicit type of data read
|
||||
* @param dataFormat binary format
|
||||
* @param envelopeFormatFactory the format of envelope. If null, file is read directly
|
||||
* @param envelopeFormat the format of envelope. If null, file is read directly
|
||||
* @param metaFile the relative file for optional meta override
|
||||
* @param metaFileFormat the meta format for override
|
||||
*/
|
||||
fun <T : Any> IOPlugin.readData(
|
||||
fun <T : Any> IOPlugin.readDataFile(
|
||||
path: Path,
|
||||
type: KClass<out T>,
|
||||
dataFormat: IOFormat<T>,
|
||||
envelopeFormatFactory: EnvelopeFormatFactory? = null,
|
||||
metaFile: Path = path.resolveSibling("${path.fileName}.meta"),
|
||||
metaFileFormat: MetaFormat = JsonMetaFormat.default
|
||||
formatResolver: (Meta) -> IOFormat<T>
|
||||
): Data<T> {
|
||||
val externalMeta = if (Files.exists(metaFile)) {
|
||||
metaFileFormat.readMetaFile(metaFile)
|
||||
} else {
|
||||
null
|
||||
}
|
||||
return if (envelopeFormatFactory == null) {
|
||||
Data(type, externalMeta ?: EmptyMeta) {
|
||||
withContext(Dispatchers.IO) {
|
||||
dataFormat.run {
|
||||
Files.newByteChannel(path, StandardOpenOption.READ)
|
||||
.asInput()
|
||||
.readObject()
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
readEnvelopeFile(path, envelopeFormatFactory).let {
|
||||
if (externalMeta == null) {
|
||||
it
|
||||
} else {
|
||||
it.withMetaLayers(externalMeta)
|
||||
}
|
||||
}.toData(type, dataFormat)
|
||||
}
|
||||
val envelope = readEnvelopeFile(path, true) ?: error("Can't read data from $path")
|
||||
val format = formatResolver(envelope.meta)
|
||||
return envelope.toData(type, format)
|
||||
}
|
||||
|
||||
//TODO wants multi-receiver
|
||||
fun <T : Any> DataTreeBuilder<T>.file(
|
||||
plugin: IOPlugin,
|
||||
path: Path,
|
||||
dataFormat: IOFormat<T>,
|
||||
envelopeFormatFactory: EnvelopeFormatFactory? = null
|
||||
formatResolver: (Meta) -> IOFormat<T>
|
||||
) {
|
||||
plugin.run {
|
||||
val data = readData(path, type, dataFormat, envelopeFormatFactory)
|
||||
val name = path.fileName.toString().replace(".df", "")
|
||||
val data = readDataFile(path, type, formatResolver)
|
||||
val name = data.meta[Envelope.ENVELOPE_NAME_KEY].string
|
||||
?: path.fileName.toString().replace(".df", "")
|
||||
datum(name, data)
|
||||
}
|
||||
}
|
||||
@ -95,23 +49,35 @@ fun <T : Any> DataTreeBuilder<T>.file(
|
||||
/**
|
||||
* Read the directory as a data node
|
||||
*/
|
||||
fun <T : Any> IOPlugin.readDataNode(
|
||||
fun <T : Any> IOPlugin.readDataDirectory(
|
||||
path: Path,
|
||||
type: KClass<out T>,
|
||||
dataFormat: IOFormat<T>,
|
||||
envelopeFormatFactory: EnvelopeFormatFactory? = null
|
||||
formatResolver: (Meta) -> IOFormat<T>
|
||||
): DataNode<T> {
|
||||
if (!Files.isDirectory(path)) error("Provided path $this is not a directory")
|
||||
return DataNode(type) {
|
||||
Files.list(path).forEach { path ->
|
||||
if (!path.fileName.toString().endsWith(".meta")) {
|
||||
file(this@readDataNode,path, dataFormat, envelopeFormatFactory)
|
||||
file(this@readDataDirectory, path, formatResolver)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//suspend fun <T : Any> Path.writeData(
|
||||
// data: Data<T>,
|
||||
// format: IOFormat<T>,
|
||||
// )
|
||||
fun <T : Any> DataTreeBuilder<T>.directory(
|
||||
plugin: IOPlugin,
|
||||
path: Path,
|
||||
formatResolver: (Meta) -> IOFormat<T>
|
||||
) {
|
||||
plugin.run {
|
||||
val data = readDataDirectory(path, type, formatResolver)
|
||||
val name = data.meta[Envelope.ENVELOPE_NAME_KEY].string
|
||||
?: path.fileName.toString().replace(".df", "")
|
||||
node(name, data)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user