Fix large buffers IO. A lot of refactoring.

This commit is contained in:
Alexander Nozik 2019-11-13 18:08:48 +03:00
parent 3e9cb3915c
commit 41d0cdb2b1
17 changed files with 147 additions and 148 deletions

View File

@ -6,7 +6,7 @@ plugins {
id("scientifik.publish") version "0.2.2" apply false id("scientifik.publish") version "0.2.2" apply false
} }
val dataforgeVersion by extra("0.1.5-dev-1") val dataforgeVersion by extra("0.1.5-dev-2")
val bintrayRepo by extra("dataforge") val bintrayRepo by extra("dataforge")
val githubProject by extra("dataforge-core") val githubProject by extra("dataforge-core")

View File

@ -13,16 +13,22 @@ import kotlin.reflect.KClass
sealed class DataItem<out T : Any> : MetaRepr { sealed class DataItem<out T : Any> : MetaRepr {
abstract val type: KClass<out T> abstract val type: KClass<out T>
abstract val meta: Meta
class Node<out T : Any>(val value: DataNode<T>) : DataItem<T>() { class Node<out T : Any>(val value: DataNode<T>) : DataItem<T>() {
override val type: KClass<out T> get() = value.type override val type: KClass<out T> get() = value.type
override fun toMeta(): Meta = value.toMeta() override fun toMeta(): Meta = value.toMeta()
override val meta: Meta get() = value.meta
} }
class Leaf<out T : Any>(val value: Data<T>) : DataItem<T>() { class Leaf<out T : Any>(val value: Data<T>) : DataItem<T>() {
override val type: KClass<out T> get() = value.type override val type: KClass<out T> get() = value.type
override fun toMeta(): Meta = value.toMeta() override fun toMeta(): Meta = value.toMeta()
override val meta: Meta get() = value.meta
} }
} }
@ -38,6 +44,8 @@ interface DataNode<out T : Any> : MetaRepr {
val items: Map<NameToken, DataItem<T>> val items: Map<NameToken, DataItem<T>>
val meta: Meta
override fun toMeta(): Meta = buildMeta { override fun toMeta(): Meta = buildMeta {
"type" put (type.simpleName ?: "undefined") "type" put (type.simpleName ?: "undefined")
"items" put { "items" put {
@ -117,12 +125,9 @@ operator fun <T : Any> DataNode<T>.iterator(): Iterator<Pair<Name, DataItem<T>>>
class DataTree<out T : Any> internal constructor( class DataTree<out T : Any> internal constructor(
override val type: KClass<out T>, override val type: KClass<out T>,
override val items: Map<NameToken, DataItem<T>> override val items: Map<NameToken, DataItem<T>>,
) : DataNode<T> { override val meta: Meta
override fun toString(): String { ) : DataNode<T>
return super.toString()
}
}
private sealed class DataTreeBuilderItem<out T : Any> { private sealed class DataTreeBuilderItem<out T : Any> {
class Node<T : Any>(val tree: DataTreeBuilder<T>) : DataTreeBuilderItem<T>() class Node<T : Any>(val tree: DataTreeBuilder<T>) : DataTreeBuilderItem<T>()
@ -136,6 +141,8 @@ private sealed class DataTreeBuilderItem<out T : Any> {
class DataTreeBuilder<T : Any>(val type: KClass<out T>) { class DataTreeBuilder<T : Any>(val type: KClass<out T>) {
private val map = HashMap<NameToken, DataTreeBuilderItem<T>>() private val map = HashMap<NameToken, DataTreeBuilderItem<T>>()
private var meta = MetaBuilder()
operator fun set(token: NameToken, node: DataTreeBuilder<out T>) { operator fun set(token: NameToken, node: DataTreeBuilder<out T>) {
if (map.containsKey(token)) error("Tree entry with name $token is not empty") if (map.containsKey(token)) error("Tree entry with name $token is not empty")
map[token] = DataTreeBuilderItem.Node(node) map[token] = DataTreeBuilderItem.Node(node)
@ -203,13 +210,19 @@ class DataTreeBuilder<T : Any>(val type: KClass<out T>) {
infix fun String.put(block: DataTreeBuilder<T>.() -> Unit) = set(toName(), DataTreeBuilder(type).apply(block)) infix fun String.put(block: DataTreeBuilder<T>.() -> Unit) = set(toName(), DataTreeBuilder(type).apply(block))
/**
* Update data with given node data and meta with node meta.
*/
fun update(node: DataNode<T>) { fun update(node: DataNode<T>) {
node.dataSequence().forEach { node.dataSequence().forEach {
//TODO check if the place is occupied //TODO check if the place is occupied
this[it.first] = it.second this[it.first] = it.second
} }
meta.update(node.meta)
} }
fun meta(block: MetaBuilder.() -> Unit) = meta.apply(block)
fun build(): DataTree<T> { fun build(): DataTree<T> {
val resMap = map.mapValues { (_, value) -> val resMap = map.mapValues { (_, value) ->
when (value) { when (value) {
@ -217,7 +230,7 @@ class DataTreeBuilder<T : Any>(val type: KClass<out T>) {
is DataTreeBuilderItem.Node -> DataItem.Node(value.tree.build()) is DataTreeBuilderItem.Node -> DataItem.Node(value.tree.build())
} }
} }
return DataTree(type, resMap) return DataTree(type, resMap, meta.seal())
} }
} }

View File

@ -52,6 +52,7 @@ inline fun <reified R : Any> Data<*>.cast(): Data<R> = cast(R::class)
@Suppress("UNCHECKED_CAST") @Suppress("UNCHECKED_CAST")
fun <R : Any> DataNode<*>.cast(type: KClass<out R>): DataNode<R> { fun <R : Any> DataNode<*>.cast(type: KClass<out R>): DataNode<R> {
return object : DataNode<R> { return object : DataNode<R> {
override val meta: Meta get() = this@cast.meta
override val type: KClass<out R> = type override val type: KClass<out R> = type
override val items: Map<NameToken, DataItem<R>> get() = this@cast.items as Map<NameToken, DataItem<R>> override val items: Map<NameToken, DataItem<R>> get() = this@cast.items as Map<NameToken, DataItem<R>>
} }

View File

@ -1,5 +1,6 @@
package hep.dataforge.data package hep.dataforge.data
import hep.dataforge.meta.Meta
import hep.dataforge.names.NameToken import hep.dataforge.names.NameToken
import kotlin.reflect.KClass import kotlin.reflect.KClass
@ -8,6 +9,7 @@ import kotlin.reflect.KClass
* A zero-copy data node wrapper that returns only children with appropriate type. * A zero-copy data node wrapper that returns only children with appropriate type.
*/ */
class TypeFilteredDataNode<out T : Any>(val origin: DataNode<*>, override val type: KClass<out T>) : DataNode<T> { class TypeFilteredDataNode<out T : Any>(val origin: DataNode<*>, override val type: KClass<out T>) : DataNode<T> {
override val meta: Meta get() = origin.meta
override val items: Map<NameToken, DataItem<T>> by lazy { override val items: Map<NameToken, DataItem<T>> by lazy {
origin.items.mapNotNull { (key, item) -> origin.items.mapNotNull { (key, item) ->
when (item) { when (item) {

View File

@ -30,12 +30,10 @@ fun <R : Any> Data<*>.filterIsInstance(type: KClass<out R>): Data<R>? =
* but could contain empty nodes * but could contain empty nodes
*/ */
fun <R : Any> DataNode<*>.filterIsInstance(type: KClass<out R>): DataNode<R> { fun <R : Any> DataNode<*>.filterIsInstance(type: KClass<out R>): DataNode<R> {
return if (canCast(type)) { return when {
cast(type) canCast(type) -> cast(type)
} else if (this is TypeFilteredDataNode) { this is TypeFilteredDataNode -> origin.filterIsInstance(type)
origin.filterIsInstance(type) else -> TypeFilteredDataNode(this, type)
} else {
TypeFilteredDataNode(this, type)
} }
} }

View File

@ -21,6 +21,7 @@ interface Envelope {
val ENVELOPE_DATA_TYPE_KEY = ENVELOPE_NODE_KEY + "dataType" val ENVELOPE_DATA_TYPE_KEY = ENVELOPE_NODE_KEY + "dataType"
val ENVELOPE_DATA_ID_KEY = ENVELOPE_NODE_KEY + "dataID" val ENVELOPE_DATA_ID_KEY = ENVELOPE_NODE_KEY + "dataID"
val ENVELOPE_DESCRIPTION_KEY = ENVELOPE_NODE_KEY + "description" val ENVELOPE_DESCRIPTION_KEY = ENVELOPE_NODE_KEY + "description"
val ENVELOPE_NAME_KEY = ENVELOPE_NODE_KEY + "name"
//const val ENVELOPE_TIME_KEY = "@envelope.time" //const val ENVELOPE_TIME_KEY = "@envelope.time"
/** /**

View File

@ -21,6 +21,7 @@ class EnvelopeBuilder {
var dataType by metaBuilder.string(key = Envelope.ENVELOPE_DATA_TYPE_KEY) var dataType by metaBuilder.string(key = Envelope.ENVELOPE_DATA_TYPE_KEY)
var dataID by metaBuilder.string(key = Envelope.ENVELOPE_DATA_ID_KEY) var dataID by metaBuilder.string(key = Envelope.ENVELOPE_DATA_ID_KEY)
var description by metaBuilder.string(key = Envelope.ENVELOPE_DESCRIPTION_KEY) var description by metaBuilder.string(key = Envelope.ENVELOPE_DESCRIPTION_KEY)
var name by metaBuilder.string(key = Envelope.ENVELOPE_NAME_KEY)
/** /**
* Construct a binary and transform it into byte-array based buffer * Construct a binary and transform it into byte-array based buffer

View File

@ -3,30 +3,31 @@ package hep.dataforge.io
import hep.dataforge.context.Global import hep.dataforge.context.Global
import hep.dataforge.io.EnvelopeParts.FORMAT_META_KEY import hep.dataforge.io.EnvelopeParts.FORMAT_META_KEY
import hep.dataforge.io.EnvelopeParts.FORMAT_NAME_KEY import hep.dataforge.io.EnvelopeParts.FORMAT_NAME_KEY
import hep.dataforge.io.EnvelopeParts.PARTS_DATA_TYPE import hep.dataforge.io.EnvelopeParts.MULTIPART_DATA_TYPE
import hep.dataforge.io.EnvelopeParts.SIZE_KEY import hep.dataforge.io.EnvelopeParts.SIZE_KEY
import hep.dataforge.meta.* import hep.dataforge.meta.*
import hep.dataforge.names.asName
import hep.dataforge.names.plus import hep.dataforge.names.plus
import hep.dataforge.names.toName import hep.dataforge.names.toName
object EnvelopeParts { object EnvelopeParts {
val SIZE_KEY = Envelope.ENVELOPE_NODE_KEY + "parts" + "size" val MULTIPART_KEY = "multipart".asName()
val FORMAT_NAME_KEY = Envelope.ENVELOPE_NODE_KEY + "parts" + "format" val SIZE_KEY = Envelope.ENVELOPE_NODE_KEY + MULTIPART_KEY + "size"
val FORMAT_META_KEY = Envelope.ENVELOPE_NODE_KEY + "parts" + "meta" val FORMAT_NAME_KEY = Envelope.ENVELOPE_NODE_KEY + MULTIPART_KEY + "format"
val FORMAT_META_KEY = Envelope.ENVELOPE_NODE_KEY + MULTIPART_KEY + "meta"
const val PARTS_DATA_TYPE = "envelope.parts" const val MULTIPART_DATA_TYPE = "envelope.multipart"
} }
/** /**
* Append multiple serialized envelopes to the data block. Previous data is erased if it was present * Append multiple serialized envelopes to the data block. Previous data is erased if it was present
*/ */
fun EnvelopeBuilder.parts(formatFactory: EnvelopeFormatFactory, envelopes: Collection<Envelope>) { fun EnvelopeBuilder.multipart(format: EnvelopeFormatFactory, envelopes: Collection<Envelope>) {
dataType = PARTS_DATA_TYPE dataType = MULTIPART_DATA_TYPE
meta { meta {
SIZE_KEY put envelopes.size SIZE_KEY put envelopes.size
FORMAT_NAME_KEY put formatFactory.name.toString() FORMAT_NAME_KEY put format.name.toString()
} }
val format = formatFactory()
data { data {
format.run { format.run {
envelopes.forEach { envelopes.forEach {
@ -36,15 +37,15 @@ fun EnvelopeBuilder.parts(formatFactory: EnvelopeFormatFactory, envelopes: Colle
} }
} }
fun EnvelopeBuilder.parts(formatFactory: EnvelopeFormatFactory, builder: suspend SequenceScope<Envelope>.() -> Unit) = fun EnvelopeBuilder.multipart(formatFactory: EnvelopeFormatFactory, builder: suspend SequenceScope<Envelope>.() -> Unit) =
parts(formatFactory, sequence(builder).toList()) multipart(formatFactory, sequence(builder).toList())
/** /**
* If given envelope supports multipart data, return a sequence of those parts (could be empty). Otherwise return null. * If given envelope supports multipart data, return a sequence of those parts (could be empty). Otherwise return null.
*/ */
fun Envelope.parts(io: IOPlugin = Global.plugins.fetch(IOPlugin)): Sequence<Envelope>? { fun Envelope.parts(io: IOPlugin = Global.plugins.fetch(IOPlugin)): Sequence<Envelope>? {
return when (dataType) { return when (dataType) {
PARTS_DATA_TYPE -> { MULTIPART_DATA_TYPE -> {
val size = meta[SIZE_KEY].int ?: error("Unsized parts not supported yet") val size = meta[SIZE_KEY].int ?: error("Unsized parts not supported yet")
val formatName = meta[FORMAT_NAME_KEY].string?.toName() val formatName = meta[FORMAT_NAME_KEY].string?.toName()
?: error("Inferring parts format is not supported at the moment") ?: error("Inferring parts format is not supported at the moment")

View File

@ -16,7 +16,6 @@ import kotlinx.serialization.ImplicitReflectionSerializer
import kotlinx.serialization.KSerializer import kotlinx.serialization.KSerializer
import kotlinx.serialization.cbor.Cbor import kotlinx.serialization.cbor.Cbor
import kotlinx.serialization.serializer import kotlinx.serialization.serializer
import kotlin.math.min
import kotlin.reflect.KClass import kotlin.reflect.KClass
/** /**
@ -80,33 +79,9 @@ inline fun buildPacketWithoutPool(headerSizeHint: Int = 0, block: BytePacketBuil
} }
fun <T : Any> IOFormat<T>.writePacket(obj: T): ByteReadPacket = buildPacket { writeObject(obj) } fun <T : Any> IOFormat<T>.writePacket(obj: T): ByteReadPacket = buildPacket { writeObject(obj) }
//TODO Double buffer copy. fix all that with IO-2
fun <T : Any> IOFormat<T>.writeBytes(obj: T): ByteArray = buildPacket { writeObject(obj) }.readBytes() fun <T : Any> IOFormat<T>.writeBytes(obj: T): ByteArray = buildPacket { writeObject(obj) }.readBytes()
fun <T : Any> IOFormat<T>.readBytes(array: ByteArray): T { fun <T : Any> IOFormat<T>.readBytes(array: ByteArray): T = buildPacket { writeFully(array) }.readObject()
//= ByteReadPacket(array).readThis()
val byteArrayInput: Input = object : AbstractInput(
IoBuffer.Pool.borrow(),
remaining = array.size.toLong(),
pool = IoBuffer.Pool
) {
var written = 0
override fun closeSource() {
// do nothing
}
override fun fill(): IoBuffer? {
if (array.size - written <= 0) return null
return IoBuffer.Pool.fill {
reserveEndGap(IoBuffer.ReservedSize)
val toWrite = min(capacity, array.size - written)
writeFully(array, written, toWrite)
written += toWrite
}
}
}
return byteArrayInput.readObject()
}
object DoubleIOFormat : IOFormat<Double>, IOFormatFactory<Double> { object DoubleIOFormat : IOFormat<Double>, IOFormatFactory<Double> {
override fun invoke(meta: Meta, context: Context): IOFormat<Double> = this override fun invoke(meta: Meta, context: Context): IOFormat<Double> = this

View File

@ -64,7 +64,10 @@ class TaggedEnvelopeFormat(
val metaFormat = io.metaFormat(tag.metaFormatKey) val metaFormat = io.metaFormat(tag.metaFormatKey)
?: error("Meta format with key ${tag.metaFormatKey} not found") ?: error("Meta format with key ${tag.metaFormatKey} not found")
val metaPacket = ByteReadPacket(readBytes(tag.metaSize.toInt())) val metaBytes = readBytes(tag.metaSize.toInt())
val metaPacket = buildPacket {
writeFully(metaBytes)
}
val dataBytes = readBytes(tag.dataSize.toInt()) val dataBytes = readBytes(tag.dataSize.toInt())
val meta = metaFormat.run { metaPacket.readObject() } val meta = metaFormat.run { metaPacket.readObject() }

View File

@ -14,11 +14,14 @@ class EnvelopePartsTest {
} }
data { data {
writeText("Hello World $it") writeText("Hello World $it")
repeat(200){
writeInt(it)
}
} }
} }
} }
val partsEnvelope = Envelope { val partsEnvelope = Envelope {
parts(TaggedEnvelopeFormat, envelopes) multipart(TaggedEnvelopeFormat, envelopes)
} }
@Test @Test
@ -27,6 +30,7 @@ class EnvelopePartsTest {
val reconstructed = TaggedEnvelopeFormat.readBytes(bytes) val reconstructed = TaggedEnvelopeFormat.readBytes(bytes)
val parts = reconstructed.parts()?.toList() ?: emptyList() val parts = reconstructed.parts()?.toList() ?: emptyList()
assertEquals(2, parts[2].meta["value"].int) assertEquals(2, parts[2].meta["value"].int)
println(reconstructed.data!!.size)
} }
} }

View File

@ -1,9 +1,7 @@
package hep.dataforge.io package hep.dataforge.io
import hep.dataforge.meta.EmptyMeta
import hep.dataforge.meta.Meta import hep.dataforge.meta.Meta
import kotlinx.io.nio.asInput import kotlinx.io.nio.asInput
import kotlinx.io.nio.asOutput
import java.nio.file.Files import java.nio.file.Files
import java.nio.file.Path import java.nio.file.Path
import java.nio.file.StandardOpenOption import java.nio.file.StandardOpenOption
@ -15,7 +13,7 @@ class FileEnvelope internal constructor(val path: Path, val format: EnvelopeForm
init { init {
val input = Files.newByteChannel(path, StandardOpenOption.READ).asInput() val input = Files.newByteChannel(path, StandardOpenOption.READ).asInput()
partialEnvelope = format.run { input.use { it.readPartial()} } partialEnvelope = format.run { input.use { it.readPartial() } }
} }
override val meta: Meta get() = partialEnvelope.meta override val meta: Meta get() = partialEnvelope.meta
@ -23,30 +21,3 @@ class FileEnvelope internal constructor(val path: Path, val format: EnvelopeForm
override val data: Binary? = FileBinary(path, partialEnvelope.dataOffset, partialEnvelope.dataSize) override val data: Binary? = FileBinary(path, partialEnvelope.dataOffset, partialEnvelope.dataSize)
} }
fun IOPlugin.readEnvelopeFile(
path: Path,
formatFactory: EnvelopeFormatFactory = TaggedEnvelopeFormat,
formatMeta: Meta = EmptyMeta
): FileEnvelope {
val format = formatFactory(formatMeta, context)
return FileEnvelope(path, format)
}
fun IOPlugin.writeEnvelopeFile(
path: Path,
envelope: Envelope,
formatFactory: EnvelopeFormatFactory = TaggedEnvelopeFormat,
formatMeta: Meta = EmptyMeta
) {
val output = Files.newByteChannel(
path,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE,
StandardOpenOption.TRUNCATE_EXISTING
).asOutput()
with(formatFactory(formatMeta, context)) {
output.writeObject(envelope)
}
}

View File

@ -18,24 +18,41 @@ inline fun <reified T : Any> IOPlugin.resolveIOFormat(): IOFormat<T>? {
/** /**
* Read file containing meta using given [formatOverride] or file extension to infer meta type. * Read file containing meta using given [formatOverride] or file extension to infer meta type.
* If [path] is a directory search for file starting with `meta` in it
*/ */
fun IOPlugin.readMetaFile(path: Path, formatOverride: MetaFormat? = null, descriptor: NodeDescriptor? = null): Meta { fun IOPlugin.readMetaFile(path: Path, formatOverride: MetaFormat? = null, descriptor: NodeDescriptor? = null): Meta {
if (!Files.exists(path)) error("Meta file $path does not exist") if (!Files.exists(path)) error("Meta file $path does not exist")
val extension = path.fileName.toString().substringAfterLast('.')
val actualPath: Path = if (Files.isDirectory(path)) {
Files.list(path).asSequence().find { it.fileName.startsWith("meta") }
?: error("The directory $path does not contain meta file")
} else {
path
}
val extension = actualPath.fileName.toString().substringAfterLast('.')
val metaFormat = formatOverride ?: metaFormat(extension) ?: error("Can't resolve meta format $extension") val metaFormat = formatOverride ?: metaFormat(extension) ?: error("Can't resolve meta format $extension")
return metaFormat.run { return metaFormat.run {
Files.newByteChannel(path, StandardOpenOption.READ).asInput().use { it.readMeta(descriptor) } Files.newByteChannel(actualPath, StandardOpenOption.READ).asInput().use { it.readMeta(descriptor) }
} }
} }
/**
* Write meta to file using [metaFormat]. If [path] is a directory, write a file with name equals name of [metaFormat].
* Like "meta.json"
*/
fun IOPlugin.writeMetaFile( fun IOPlugin.writeMetaFile(
path: Path, path: Path,
metaFormat: MetaFormat = JsonMetaFormat, metaFormat: MetaFormatFactory = JsonMetaFormat,
descriptor: NodeDescriptor? = null descriptor: NodeDescriptor? = null
) { ) {
val actualPath = if (Files.isDirectory(path)) {
path.resolve(metaFormat.name.toString())
} else {
path
}
metaFormat.run { metaFormat.run {
Files.newByteChannel(path, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW).asOutput().use { Files.newByteChannel(actualPath, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW).asOutput().use {
it.writeMeta(meta, descriptor) it.writeMeta(meta, descriptor)
} }
} }
@ -43,9 +60,19 @@ fun IOPlugin.writeMetaFile(
/** /**
* Read and envelope from file if the file exists, return null if file does not exist. * Read and envelope from file if the file exists, return null if file does not exist.
*
* If file is directory, then expect two files inside:
* * **meta.<format name>** for meta
* * **data** for data
*
* If the file is envelope read it using [EnvelopeFormatFactory.peekFormat] functionality to infer format.
*
* If the file is not an envelope and [readNonEnvelopes] is true, return an Envelope without meta, using file as binary.
*
* Return null otherwise.
*/ */
@DFExperimental @DFExperimental
fun IOPlugin.readEnvelopeFromFile(path: Path, readNonEnvelopes: Boolean = false): Envelope? { fun IOPlugin.readEnvelopeFile(path: Path, readNonEnvelopes: Boolean = false): Envelope? {
if (!Files.exists(path)) return null if (!Files.exists(path)) return null
//read two-files directory //read two-files directory
@ -74,7 +101,7 @@ fun IOPlugin.readEnvelopeFromFile(path: Path, readNonEnvelopes: Boolean = false)
val formats = envelopeFormatFactories.mapNotNull { factory -> val formats = envelopeFormatFactories.mapNotNull { factory ->
binary.read { binary.read {
factory.peekFormat(this@readEnvelopeFromFile, this@read) factory.peekFormat(this@readEnvelopeFile, this@read)
} }
} }
return when (formats.size) { return when (formats.size) {
@ -91,3 +118,20 @@ fun IOPlugin.readEnvelopeFromFile(path: Path, readNonEnvelopes: Boolean = false)
else -> error("Envelope format file recognition clash") else -> error("Envelope format file recognition clash")
} }
} }
fun IOPlugin.writeEnvelopeFile(
path: Path,
envelope: Envelope,
format: EnvelopeFormat = TaggedEnvelopeFormat
) {
val output = Files.newByteChannel(
path,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE,
StandardOpenOption.TRUNCATE_EXISTING
).asOutput()
with(format) {
output.writeObject(envelope)
}
}

View File

@ -3,7 +3,6 @@ package hep.dataforge.io.tcp
import kotlinx.io.core.AbstractInput import kotlinx.io.core.AbstractInput
import kotlinx.io.core.Input import kotlinx.io.core.Input
import kotlinx.io.core.IoBuffer import kotlinx.io.core.IoBuffer
import kotlinx.io.core.IoBuffer.Companion.NoPool
import kotlinx.io.core.writePacket import kotlinx.io.core.writePacket
import kotlinx.io.streams.readPacketAtMost import kotlinx.io.streams.readPacketAtMost
import java.io.InputStream import java.io.InputStream
@ -13,7 +12,7 @@ import java.io.InputStream
*/ */
internal class InputStreamAsInput( internal class InputStreamAsInput(
private val stream: InputStream private val stream: InputStream
) : AbstractInput(pool = NoPool) { ) : AbstractInput(pool = IoBuffer.Pool) {
override fun fill(): IoBuffer? { override fun fill(): IoBuffer? {

View File

@ -49,8 +49,8 @@ class FileBinaryTest {
val tmpPath = Files.createTempFile("dataforge_test", ".df") val tmpPath = Files.createTempFile("dataforge_test", ".df")
Global.io.writeEnvelopeFile(tmpPath, envelope) Global.io.writeEnvelopeFile(tmpPath, envelope)
val binary = Global.io.readEnvelopeFile(tmpPath).data!! val binary = Global.io.readEnvelopeFile(tmpPath)?.data!!
assertEquals(binary.size?.toInt(), binary.toBytes().size) assertEquals(binary.size.toInt(), binary.toBytes().size)
} }
} }

View File

@ -25,7 +25,7 @@ class FileEnvelopeTest {
val tmpPath = Files.createTempFile("dataforge_test", ".df") val tmpPath = Files.createTempFile("dataforge_test", ".df")
Global.io.writeEnvelopeFile(tmpPath,envelope) Global.io.writeEnvelopeFile(tmpPath,envelope)
println(tmpPath.toUri()) println(tmpPath.toUri())
val restored: Envelope = Global.io.readEnvelopeFile(tmpPath) val restored: Envelope = Global.io.readEnvelopeFile(tmpPath)!!
assertTrue { envelope.contentEquals(restored) } assertTrue { envelope.contentEquals(restored) }
} }
} }

View File

@ -1,17 +1,15 @@
package hep.dataforge.workspace package hep.dataforge.workspace
import hep.dataforge.data.Data import hep.dataforge.data.*
import hep.dataforge.data.DataNode import hep.dataforge.io.Envelope
import hep.dataforge.data.DataTreeBuilder import hep.dataforge.io.IOFormat
import hep.dataforge.data.datum import hep.dataforge.io.IOPlugin
import hep.dataforge.io.* import hep.dataforge.io.readEnvelopeFile
import hep.dataforge.meta.EmptyMeta import hep.dataforge.meta.Meta
import kotlinx.coroutines.Dispatchers import hep.dataforge.meta.get
import kotlinx.coroutines.withContext import hep.dataforge.meta.string
import kotlinx.io.nio.asInput
import java.nio.file.Files import java.nio.file.Files
import java.nio.file.Path import java.nio.file.Path
import java.nio.file.StandardOpenOption
import kotlin.reflect.KClass import kotlin.reflect.KClass
@ -20,54 +18,30 @@ import kotlin.reflect.KClass
* The operation is blocking since it must read meta header. The reading of envelope body is lazy * The operation is blocking since it must read meta header. The reading of envelope body is lazy
* @param type explicit type of data read * @param type explicit type of data read
* @param dataFormat binary format * @param dataFormat binary format
* @param envelopeFormatFactory the format of envelope. If null, file is read directly * @param envelopeFormat the format of envelope. If null, file is read directly
* @param metaFile the relative file for optional meta override * @param metaFile the relative file for optional meta override
* @param metaFileFormat the meta format for override * @param metaFileFormat the meta format for override
*/ */
fun <T : Any> IOPlugin.readData( fun <T : Any> IOPlugin.readDataFile(
path: Path, path: Path,
type: KClass<out T>, type: KClass<out T>,
dataFormat: IOFormat<T>, formatResolver: (Meta) -> IOFormat<T>
envelopeFormatFactory: EnvelopeFormatFactory? = null,
metaFile: Path = path.resolveSibling("${path.fileName}.meta"),
metaFileFormat: MetaFormat = JsonMetaFormat
): Data<T> { ): Data<T> {
val externalMeta = if (Files.exists(metaFile)) { val envelope = readEnvelopeFile(path, true) ?: error("Can't read data from $path")
readMetaFile(metaFile) val format = formatResolver(envelope.meta)
} else { return envelope.toData(type, format)
null
}
return if (envelopeFormatFactory == null) {
Data(type, externalMeta ?: EmptyMeta) {
withContext(Dispatchers.IO) {
dataFormat.run {
Files.newByteChannel(path, StandardOpenOption.READ)
.asInput()
.readObject()
}
}
}
} else {
readEnvelopeFile(path, envelopeFormatFactory).let {
if (externalMeta == null) {
it
} else {
it.withMetaLayers(externalMeta)
}
}.toData(type, dataFormat)
}
} }
//TODO wants multi-receiver //TODO wants multi-receiver
fun <T : Any> DataTreeBuilder<T>.file( fun <T : Any> DataTreeBuilder<T>.file(
plugin: IOPlugin, plugin: IOPlugin,
path: Path, path: Path,
dataFormat: IOFormat<T>, formatResolver: (Meta) -> IOFormat<T>
envelopeFormatFactory: EnvelopeFormatFactory? = null
) { ) {
plugin.run { plugin.run {
val data = readData(path, type, dataFormat, envelopeFormatFactory) val data = readDataFile(path, type, formatResolver)
val name = path.fileName.toString().replace(".df", "") val name = data.meta[Envelope.ENVELOPE_NAME_KEY].string
?: path.fileName.toString().replace(".df", "")
datum(name, data) datum(name, data)
} }
} }
@ -75,23 +49,35 @@ fun <T : Any> DataTreeBuilder<T>.file(
/** /**
* Read the directory as a data node * Read the directory as a data node
*/ */
fun <T : Any> IOPlugin.readDataNode( fun <T : Any> IOPlugin.readDataDirectory(
path: Path, path: Path,
type: KClass<out T>, type: KClass<out T>,
dataFormat: IOFormat<T>, formatResolver: (Meta) -> IOFormat<T>
envelopeFormatFactory: EnvelopeFormatFactory? = null
): DataNode<T> { ): DataNode<T> {
if (!Files.isDirectory(path)) error("Provided path $this is not a directory") if (!Files.isDirectory(path)) error("Provided path $this is not a directory")
return DataNode(type) { return DataNode(type) {
Files.list(path).forEach { path -> Files.list(path).forEach { path ->
if (!path.fileName.toString().endsWith(".meta")) { if (!path.fileName.toString().endsWith(".meta")) {
file(this@readDataNode,path, dataFormat, envelopeFormatFactory) file(this@readDataDirectory, path, formatResolver)
} }
} }
} }
} }
//suspend fun <T : Any> Path.writeData( fun <T : Any> DataTreeBuilder<T>.directory(
// data: Data<T>, plugin: IOPlugin,
// format: IOFormat<T>, path: Path,
// ) formatResolver: (Meta) -> IOFormat<T>
) {
plugin.run {
val data = readDataDirectory(path, type, formatResolver)
val name = data.meta[Envelope.ENVELOPE_NAME_KEY].string
?: path.fileName.toString().replace(".df", "")
node(name, data)
}
}