Refactored IO logic
This commit is contained in:
parent
23a2d29e84
commit
44a34eee40
@ -3,7 +3,7 @@ plugins {
|
||||
id("scientifik.publish") version "0.2.1" apply false
|
||||
}
|
||||
|
||||
val dataforgeVersion by extra("0.1.4-dev-5")
|
||||
val dataforgeVersion by extra("0.1.4-dev-6")
|
||||
|
||||
val bintrayRepo by extra("dataforge")
|
||||
val githubProject by extra("dataforge-core")
|
||||
|
@ -0,0 +1,8 @@
|
||||
package hep.dataforge.context
|
||||
|
||||
import hep.dataforge.meta.EmptyMeta
|
||||
import hep.dataforge.meta.Meta
|
||||
|
||||
interface Factory<out T : Any> {
|
||||
operator fun invoke(meta: Meta = EmptyMeta, context: Context = Global): T
|
||||
}
|
@ -38,7 +38,7 @@ class PluginManager(override val context: Context) : ContextAware, Iterable<Plug
|
||||
* @param recursive search for parent [PluginManager] plugins
|
||||
* @param predicate condition for the plugin
|
||||
*/
|
||||
fun get(recursive: Boolean = true, predicate: (Plugin) -> Boolean): Plugin? = sequence(recursive).find(predicate)
|
||||
fun find(recursive: Boolean = true, predicate: (Plugin) -> Boolean): Plugin? = sequence(recursive).find(predicate)
|
||||
|
||||
|
||||
/**
|
||||
@ -47,7 +47,7 @@ class PluginManager(override val context: Context) : ContextAware, Iterable<Plug
|
||||
* @param tag
|
||||
* @return
|
||||
*/
|
||||
operator fun get(tag: PluginTag, recursive: Boolean = true): Plugin? = get(recursive) { tag.matches(it.tag) }
|
||||
operator fun get(tag: PluginTag, recursive: Boolean = true): Plugin? = find(recursive) { tag.matches(it.tag) }
|
||||
|
||||
|
||||
/**
|
||||
@ -63,11 +63,13 @@ class PluginManager(override val context: Context) : ContextAware, Iterable<Plug
|
||||
*/
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
operator fun <T : Any> get(type: KClass<T>, tag: PluginTag? = null, recursive: Boolean = true): T? =
|
||||
get(recursive) { type.isInstance(it) && (tag == null || tag.matches(it.tag)) } as T?
|
||||
find(recursive) { type.isInstance(it) && (tag == null || tag.matches(it.tag)) } as T?
|
||||
|
||||
inline fun <reified T : Any> get(tag: PluginTag? = null, recursive: Boolean = true): T? =
|
||||
inline operator fun <reified T : Any> get(tag: PluginTag? = null, recursive: Boolean = true): T? =
|
||||
get(T::class, tag, recursive)
|
||||
|
||||
inline operator fun <reified T : Plugin> get(factory: PluginFactory<T>, recursive: Boolean = true): T? =
|
||||
get(factory.type, factory.tag, recursive)
|
||||
|
||||
/**
|
||||
* Load given plugin into this manager and return loaded instance.
|
||||
@ -97,7 +99,7 @@ class PluginManager(override val context: Context) : ContextAware, Iterable<Plug
|
||||
* Load a plugin using its factory
|
||||
*/
|
||||
fun <T : Plugin> load(factory: PluginFactory<T>, meta: Meta = EmptyMeta): T =
|
||||
load(factory(meta))
|
||||
load(factory(meta, context))
|
||||
|
||||
fun <T : Plugin> load(factory: PluginFactory<T>, metaBuilder: MetaBuilder.() -> Unit): T =
|
||||
load(factory, buildMeta(metaBuilder))
|
||||
@ -122,7 +124,7 @@ class PluginManager(override val context: Context) : ContextAware, Iterable<Plug
|
||||
fun <T : Plugin> fetch(factory: PluginFactory<T>, recursive: Boolean = true, meta: Meta = EmptyMeta): T {
|
||||
val loaded = get(factory.type, factory.tag, recursive)
|
||||
return when {
|
||||
loaded == null -> load(factory(meta))
|
||||
loaded == null -> load(factory(meta, context))
|
||||
loaded.meta == meta -> loaded // if meta is the same, return existing plugin
|
||||
else -> throw RuntimeException("Can't load plugin with tag ${factory.tag}. Plugin with this tag and different configuration already exists in context.")
|
||||
}
|
||||
|
@ -4,10 +4,9 @@ import hep.dataforge.meta.EmptyMeta
|
||||
import hep.dataforge.meta.Meta
|
||||
import kotlin.reflect.KClass
|
||||
|
||||
interface PluginFactory<T : Plugin> {
|
||||
interface PluginFactory<T : Plugin> : Factory<T> {
|
||||
val tag: PluginTag
|
||||
val type: KClass<out T>
|
||||
operator fun invoke(meta: Meta = EmptyMeta): T
|
||||
}
|
||||
|
||||
expect object PluginRepository {
|
||||
@ -25,25 +24,26 @@ expect object PluginRepository {
|
||||
* Fetch specific plugin and instantiate it with given meta
|
||||
*/
|
||||
fun PluginRepository.fetch(tag: PluginTag, meta: Meta = EmptyMeta): Plugin =
|
||||
list().find { it.tag.matches(tag) }?.invoke(meta) ?: error("Plugin with tag $tag not found in the repository")
|
||||
list().find { it.tag.matches(tag) }?.invoke(meta = meta)
|
||||
?: error("Plugin with tag $tag not found in the repository")
|
||||
|
||||
fun <T : Plugin> PluginRepository.register(
|
||||
tag: PluginTag,
|
||||
type: KClass<out T>,
|
||||
constructor: (Meta) -> T
|
||||
constructor: (Context, Meta) -> T
|
||||
): PluginFactory<T> {
|
||||
val factory = object : PluginFactory<T> {
|
||||
override val tag: PluginTag = tag
|
||||
override val type: KClass<out T> = type
|
||||
|
||||
override fun invoke(meta: Meta): T = constructor(meta)
|
||||
override fun invoke(meta: Meta, context: Context): T = constructor(context, meta)
|
||||
|
||||
}
|
||||
register(factory)
|
||||
return factory
|
||||
}
|
||||
|
||||
inline fun <reified T : Plugin> PluginRepository.register(tag: PluginTag, noinline constructor: (Meta) -> T) =
|
||||
inline fun <reified T : Plugin> PluginRepository.register(tag: PluginTag, noinline constructor: (Context, Meta) -> T) =
|
||||
register(tag, T::class, constructor)
|
||||
|
||||
fun PluginRepository.register(plugin: Plugin) = register(plugin.tag, plugin::class) { plugin }
|
||||
fun PluginRepository.register(plugin: Plugin) = register(plugin.tag, plugin::class) { _, _ -> plugin }
|
@ -1,5 +1,6 @@
|
||||
package hep.dataforge.io
|
||||
|
||||
import hep.dataforge.context.Context
|
||||
import hep.dataforge.descriptors.NodeDescriptor
|
||||
import hep.dataforge.meta.*
|
||||
import hep.dataforge.names.Name
|
||||
@ -10,10 +11,12 @@ import kotlinx.io.core.Output
|
||||
import kotlinx.io.core.readText
|
||||
import kotlinx.io.core.writeText
|
||||
|
||||
object BinaryMetaFormat : MetaFormat {
|
||||
object BinaryMetaFormat : MetaFormat, MetaFormatFactory {
|
||||
override val name: Name = super.name + "bin"
|
||||
override val key: Short = 0x4249//BI
|
||||
|
||||
override fun invoke(meta: Meta, context: Context): MetaFormat = this
|
||||
|
||||
override fun Input.readMeta(descriptor: NodeDescriptor?): Meta {
|
||||
return (readMetaItem() as MetaItem.NodeItem).node
|
||||
}
|
||||
|
@ -1,7 +1,7 @@
|
||||
package hep.dataforge.io
|
||||
|
||||
import hep.dataforge.context.Named
|
||||
import hep.dataforge.io.EnvelopeFormat.Companion.ENVELOPE_FORMAT_TYPE
|
||||
import hep.dataforge.context.Context
|
||||
import hep.dataforge.io.EnvelopeFormatFactory.Companion.ENVELOPE_FORMAT_TYPE
|
||||
import hep.dataforge.meta.Meta
|
||||
import hep.dataforge.names.Name
|
||||
import hep.dataforge.names.asName
|
||||
@ -16,22 +16,23 @@ import kotlin.reflect.KClass
|
||||
@ExperimentalUnsignedTypes
|
||||
data class PartialEnvelope(val meta: Meta, val dataOffset: UInt, val dataSize: ULong?)
|
||||
|
||||
|
||||
interface EnvelopeFormat : IOFormat<Envelope> {
|
||||
fun Input.readPartial(): PartialEnvelope
|
||||
|
||||
override fun Input.readThis(): Envelope
|
||||
|
||||
override fun Output.writeThis(obj: Envelope)
|
||||
}
|
||||
|
||||
@Type(ENVELOPE_FORMAT_TYPE)
|
||||
interface EnvelopeFormat : IOFormat<Envelope>, Named {
|
||||
interface EnvelopeFormatFactory : IOFormatFactory<Envelope> {
|
||||
override val name: Name get() = "envelope".asName()
|
||||
override val type: KClass<out Envelope> get() = Envelope::class
|
||||
|
||||
fun Input.readPartial(formats: Collection<MetaFormat> = IOPlugin.defaultMetaFormats): PartialEnvelope
|
||||
|
||||
fun Input.readEnvelope(formats: Collection<MetaFormat> = IOPlugin.defaultMetaFormats): Envelope
|
||||
|
||||
override fun Input.readThis(): Envelope = readEnvelope()
|
||||
|
||||
fun Output.writeEnvelope(envelope: Envelope, format: MetaFormat = JsonMetaFormat)
|
||||
|
||||
override fun Output.writeThis(obj: Envelope) = writeEnvelope(obj)
|
||||
override fun invoke(meta: Meta, context: Context): EnvelopeFormat
|
||||
|
||||
companion object {
|
||||
const val ENVELOPE_FORMAT_TYPE = "envelopeFormat"
|
||||
const val ENVELOPE_FORMAT_TYPE = "io.format.envelope"
|
||||
}
|
||||
}
|
@ -1,12 +1,13 @@
|
||||
package hep.dataforge.io
|
||||
|
||||
import hep.dataforge.context.Context
|
||||
import hep.dataforge.context.Factory
|
||||
import hep.dataforge.context.Named
|
||||
import hep.dataforge.io.IOFormat.Companion.TYPE
|
||||
import hep.dataforge.io.IOFormatFactory.Companion.IO_FORMAT_TYPE
|
||||
import hep.dataforge.meta.Meta
|
||||
import hep.dataforge.meta.MetaItem
|
||||
import hep.dataforge.names.EmptyName
|
||||
import hep.dataforge.names.Name
|
||||
import hep.dataforge.names.asName
|
||||
import hep.dataforge.names.toName
|
||||
import hep.dataforge.provider.Type
|
||||
import hep.dataforge.values.Value
|
||||
import kotlinx.io.core.*
|
||||
@ -19,27 +20,33 @@ import kotlin.reflect.KClass
|
||||
/**
|
||||
* And interface for serialization facilities
|
||||
*/
|
||||
@Type(TYPE)
|
||||
interface IOFormat<T : Any> : Named {
|
||||
|
||||
interface IOFormat<T : Any> {
|
||||
fun Output.writeThis(obj: T)
|
||||
fun Input.readThis(): T
|
||||
|
||||
|
||||
}
|
||||
|
||||
fun <T : Any> IOFormat<T>.writePacket(obj: T): ByteReadPacket = buildPacket { writeThis(obj) }
|
||||
fun <T : Any> IOFormat<T>.writeBytes(obj: T): ByteArray = buildPacket { writeThis(obj) }.readBytes()
|
||||
fun <T : Any> IOFormat<T>.readBytes(array: ByteArray): T = ByteReadPacket(array).readThis()
|
||||
|
||||
@Type(IO_FORMAT_TYPE)
|
||||
interface IOFormatFactory<T : Any> : Factory<IOFormat<T>>, Named {
|
||||
/**
|
||||
* Explicit type for dynamic type checks
|
||||
*/
|
||||
val type: KClass<out T>
|
||||
|
||||
fun Output.writeThis(obj: T)
|
||||
fun Input.readThis(): T
|
||||
|
||||
companion object {
|
||||
const val TYPE = "ioFormat"
|
||||
const val IO_FORMAT_TYPE = "io.format"
|
||||
}
|
||||
}
|
||||
|
||||
fun <T : Any> IOFormat<T>.writePacket(obj: T): ByteReadPacket = buildPacket { writeThis(obj) }
|
||||
fun <T : Any> IOFormat<T>.writeBytes(obj: T): ByteArray = buildPacket { writeThis(obj) }.readBytes()
|
||||
fun <T: Any> IOFormat<T>.readBytes(array: ByteArray): T = ByteReadPacket(array).readThis()
|
||||
|
||||
|
||||
object DoubleIOFormat : IOFormat<Double> {
|
||||
object DoubleIOFormat : IOFormat<Double>, IOFormatFactory<Double> {
|
||||
override fun invoke(meta: Meta, context: Context): IOFormat<Double> = this
|
||||
|
||||
override val name: Name = "double".asName()
|
||||
|
||||
@ -52,7 +59,8 @@ object DoubleIOFormat : IOFormat<Double> {
|
||||
override fun Input.readThis(): Double = readDouble()
|
||||
}
|
||||
|
||||
object ValueIOFormat : IOFormat<Value> {
|
||||
object ValueIOFormat : IOFormat<Value>, IOFormatFactory<Value> {
|
||||
override fun invoke(meta: Meta, context: Context): IOFormat<Value> = this
|
||||
|
||||
override val name: Name = "value".asName()
|
||||
|
||||
@ -73,11 +81,11 @@ object ValueIOFormat : IOFormat<Value> {
|
||||
*/
|
||||
@ImplicitReflectionSerializer
|
||||
class SerializerIOFormat<T : Any>(
|
||||
override val type: KClass<T>,
|
||||
type: KClass<T>,
|
||||
val serializer: KSerializer<T> = type.serializer()
|
||||
) : IOFormat<T> {
|
||||
|
||||
override val name: Name = type.simpleName?.toName() ?: EmptyName
|
||||
//override val name: Name = type.simpleName?.toName() ?: EmptyName
|
||||
|
||||
|
||||
override fun Output.writeThis(obj: T) {
|
||||
|
@ -1,9 +1,10 @@
|
||||
package hep.dataforge.io
|
||||
|
||||
import hep.dataforge.context.*
|
||||
import hep.dataforge.meta.Meta
|
||||
import hep.dataforge.meta.MetaItem
|
||||
import hep.dataforge.meta.string
|
||||
import hep.dataforge.io.EnvelopeFormatFactory.Companion.ENVELOPE_FORMAT_TYPE
|
||||
import hep.dataforge.io.IOFormatFactory.Companion.IO_FORMAT_TYPE
|
||||
import hep.dataforge.io.MetaFormatFactory.Companion.META_FORMAT_TYPE
|
||||
import hep.dataforge.meta.*
|
||||
import hep.dataforge.names.Name
|
||||
import hep.dataforge.names.get
|
||||
import kotlin.reflect.KClass
|
||||
@ -11,47 +12,49 @@ import kotlin.reflect.KClass
|
||||
class IOPlugin(meta: Meta) : AbstractPlugin(meta) {
|
||||
override val tag: PluginTag get() = Companion.tag
|
||||
|
||||
val metaFormats by lazy {
|
||||
context.content<MetaFormat>(MetaFormat.META_FORMAT_TYPE).values
|
||||
val metaFormatFactories by lazy {
|
||||
context.content<MetaFormatFactory>(META_FORMAT_TYPE).values
|
||||
}
|
||||
|
||||
fun metaFormat(key: Short): MetaFormat? = metaFormats.find { it.key == key }
|
||||
fun metaFormat(name: String): MetaFormat? = metaFormats.find { it.name.toString() == name }
|
||||
fun metaFormat(key: Short, meta: Meta = EmptyMeta): MetaFormat? =
|
||||
metaFormatFactories.find { it.key == key }?.invoke(meta)
|
||||
|
||||
fun metaFormat(name: String, meta: Meta = EmptyMeta): MetaFormat? =
|
||||
metaFormatFactories.find { it.name.toString() == name }?.invoke(meta)
|
||||
|
||||
val envelopeFormatFactories by lazy {
|
||||
context.content<EnvelopeFormatFactory>(ENVELOPE_FORMAT_TYPE).values
|
||||
}
|
||||
|
||||
override fun provideTop(target: String): Map<Name, Any> {
|
||||
return when (target) {
|
||||
MetaFormat.META_FORMAT_TYPE -> defaultMetaFormats.toMap()
|
||||
EnvelopeFormat.ENVELOPE_FORMAT_TYPE -> defaultEnvelopeFormats.toMap()
|
||||
IOFormat.TYPE -> defaultIOFormats.toMap()
|
||||
META_FORMAT_TYPE -> defaultMetaFormats.toMap()
|
||||
ENVELOPE_FORMAT_TYPE -> defaultEnvelopeFormats.toMap()
|
||||
else -> super.provideTop(target)
|
||||
}
|
||||
}
|
||||
|
||||
val ioFormats: Map<Name, IOFormat<*>> by lazy {
|
||||
context.content<IOFormat<*>>(IOFormat.TYPE)
|
||||
val ioFormats: Map<Name, IOFormatFactory<*>> by lazy {
|
||||
context.content<IOFormatFactory<*>>(IO_FORMAT_TYPE)
|
||||
}
|
||||
|
||||
fun <T: Any> resolveIOFormat(item: MetaItem<*>, type: KClass<out T>): IOFormat<T>? {
|
||||
val key = item.string ?: error("Not a string value!")
|
||||
fun <T : Any> resolveIOFormat(item: MetaItem<*>, type: KClass<out T>): IOFormat<T>? {
|
||||
val key = item.string ?: item.node["name"]?.string ?: error("Format name not defined")
|
||||
return ioFormats[key]?.let {
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
if(it.type!= type) error("Format type ${it.type} is not the same as requested type ${type}")
|
||||
else it as IOFormat<T>
|
||||
if (it.type != type) error("Format type ${it.type} is not the same as requested type $type")
|
||||
else it.invoke(item.node["meta"].node ?: EmptyMeta, context) as IOFormat<T>
|
||||
}
|
||||
}
|
||||
|
||||
companion object : PluginFactory<IOPlugin> {
|
||||
val defaultMetaFormats: List<MetaFormat> = listOf(JsonMetaFormat, BinaryMetaFormat)
|
||||
val defaultMetaFormats: List<MetaFormatFactory> = listOf(JsonMetaFormat, BinaryMetaFormat)
|
||||
val defaultEnvelopeFormats = listOf(TaggedEnvelopeFormat)
|
||||
|
||||
val defaultIOFormats = listOf(
|
||||
DoubleIOFormat,
|
||||
ValueIOFormat,
|
||||
BinaryMetaFormat
|
||||
)
|
||||
|
||||
override val tag: PluginTag = PluginTag("io", group = PluginTag.DATAFORGE_GROUP)
|
||||
override val type: KClass<out IOPlugin> = IOPlugin::class
|
||||
override fun invoke(meta: Meta): IOPlugin = IOPlugin(meta)
|
||||
override fun invoke(meta: Meta, context: Context): IOPlugin = IOPlugin(meta)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
val Context.io: IOPlugin get() = plugins.fetch(IOPlugin)
|
@ -2,6 +2,7 @@
|
||||
|
||||
package hep.dataforge.io
|
||||
|
||||
import hep.dataforge.context.Context
|
||||
import hep.dataforge.descriptors.ItemDescriptor
|
||||
import hep.dataforge.descriptors.NodeDescriptor
|
||||
import hep.dataforge.descriptors.ValueDescriptor
|
||||
@ -23,26 +24,32 @@ import kotlin.collections.component2
|
||||
import kotlin.collections.set
|
||||
|
||||
|
||||
object JsonMetaFormat : MetaFormat {
|
||||
|
||||
override val name: Name = super.name + "json"
|
||||
override val key: Short = 0x4a53//"JS"
|
||||
class JsonMetaFormat(private val json: Json = Json.plain) : MetaFormat {
|
||||
|
||||
override fun Output.writeMeta(meta: Meta, descriptor: NodeDescriptor?) {
|
||||
val json = meta.toJson(descriptor)
|
||||
writeText(json.toString())
|
||||
val jsonObject = meta.toJson(descriptor)
|
||||
writeText(json.stringify(JsonObjectSerializer, jsonObject))
|
||||
}
|
||||
|
||||
override fun Input.readMeta(descriptor: NodeDescriptor?): Meta {
|
||||
val str = readText()
|
||||
val json = Json.plain.parseJson(str)
|
||||
val jsonElement = json.parseJson(str)
|
||||
|
||||
if (json is JsonObject) {
|
||||
return json.toMeta()
|
||||
if (jsonElement is JsonObject) {
|
||||
return jsonElement.toMeta()
|
||||
} else {
|
||||
TODO("Non-object root not supported")
|
||||
}
|
||||
}
|
||||
|
||||
companion object : MetaFormatFactory {
|
||||
val default = JsonMetaFormat()
|
||||
|
||||
override fun invoke(meta: Meta, context: Context): MetaFormat = default
|
||||
|
||||
override val name: Name = super.name + "json"
|
||||
override val key: Short = 0x4a53//"JS"
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1,8 +1,8 @@
|
||||
package hep.dataforge.io
|
||||
|
||||
import hep.dataforge.context.Named
|
||||
import hep.dataforge.context.Context
|
||||
import hep.dataforge.descriptors.NodeDescriptor
|
||||
import hep.dataforge.io.MetaFormat.Companion.META_FORMAT_TYPE
|
||||
import hep.dataforge.io.MetaFormatFactory.Companion.META_FORMAT_TYPE
|
||||
import hep.dataforge.meta.Meta
|
||||
import hep.dataforge.names.Name
|
||||
import hep.dataforge.names.asName
|
||||
@ -13,12 +13,8 @@ import kotlin.reflect.KClass
|
||||
/**
|
||||
* A format for meta serialization
|
||||
*/
|
||||
@Type(META_FORMAT_TYPE)
|
||||
interface MetaFormat : IOFormat<Meta>, Named {
|
||||
override val name: Name get() = "meta".asName()
|
||||
val key: Short
|
||||
|
||||
override val type: KClass<out Meta> get() = Meta::class
|
||||
interface MetaFormat : IOFormat<Meta> {
|
||||
|
||||
override fun Output.writeThis(obj: Meta) {
|
||||
writeMeta(obj, null)
|
||||
@ -28,9 +24,20 @@ interface MetaFormat : IOFormat<Meta>, Named {
|
||||
|
||||
fun Output.writeMeta(meta: Meta, descriptor: NodeDescriptor? = null)
|
||||
fun Input.readMeta(descriptor: NodeDescriptor? = null): Meta
|
||||
}
|
||||
|
||||
@Type(META_FORMAT_TYPE)
|
||||
interface MetaFormatFactory : IOFormatFactory<Meta> {
|
||||
override val name: Name get() = "meta".asName()
|
||||
|
||||
override val type: KClass<out Meta> get() = Meta::class
|
||||
|
||||
val key: Short
|
||||
|
||||
override operator fun invoke(meta: Meta, context: Context): MetaFormat
|
||||
|
||||
companion object {
|
||||
const val META_FORMAT_TYPE = "metaFormat"
|
||||
const val META_FORMAT_TYPE = "io.format.meta"
|
||||
}
|
||||
}
|
||||
|
||||
@ -38,15 +45,18 @@ fun Meta.toString(format: MetaFormat): String = buildPacket {
|
||||
format.run { writeThis(this@toString) }
|
||||
}.readText()
|
||||
|
||||
fun Meta.toBytes(format: MetaFormat = JsonMetaFormat): ByteReadPacket = buildPacket {
|
||||
fun Meta.toString(formatFactory: MetaFormatFactory): String = toString(formatFactory())
|
||||
|
||||
fun Meta.toBytes(format: MetaFormat = JsonMetaFormat.default): ByteReadPacket = buildPacket {
|
||||
format.run { writeThis(this@toBytes) }
|
||||
}
|
||||
|
||||
|
||||
fun MetaFormat.parse(str: String): Meta {
|
||||
return ByteReadPacket(str.toByteArray()).readThis()
|
||||
}
|
||||
|
||||
fun MetaFormatFactory.parse(str: String): Meta = invoke().parse(str)
|
||||
|
||||
fun MetaFormat.fromBytes(packet: ByteReadPacket): Meta {
|
||||
return packet.readThis()
|
||||
}
|
||||
|
@ -1,17 +1,29 @@
|
||||
package hep.dataforge.io
|
||||
|
||||
import hep.dataforge.context.Context
|
||||
import hep.dataforge.meta.Meta
|
||||
import hep.dataforge.meta.get
|
||||
import hep.dataforge.meta.string
|
||||
import hep.dataforge.names.Name
|
||||
import hep.dataforge.names.plus
|
||||
import hep.dataforge.names.toName
|
||||
import kotlinx.io.core.*
|
||||
|
||||
@ExperimentalUnsignedTypes
|
||||
object TaggedEnvelopeFormat : EnvelopeFormat {
|
||||
const val VERSION = "DF03"
|
||||
private const val START_SEQUENCE = "#~"
|
||||
private const val END_SEQUENCE = "~#\r\n"
|
||||
private const val TAG_SIZE = 24u
|
||||
class TaggedEnvelopeFormat(val io: IOPlugin, meta: Meta) : EnvelopeFormat {
|
||||
|
||||
override val name: Name = super.name + VERSION
|
||||
private val metaFormat: MetaFormat
|
||||
|
||||
private val metaFormatKey: Short
|
||||
|
||||
init {
|
||||
val metaName = meta["name"].string?.toName() ?: JsonMetaFormat.name
|
||||
val metaFormatFactory = io.metaFormatFactories.find { it.name == metaName }
|
||||
?: error("Meta format could not be resolved")
|
||||
|
||||
metaFormat = metaFormatFactory(meta, io.context)
|
||||
metaFormatKey = metaFormatFactory.key
|
||||
}
|
||||
|
||||
private fun Tag.toBytes(): ByteReadPacket = buildPacket(24) {
|
||||
writeText(START_SEQUENCE)
|
||||
@ -35,12 +47,12 @@ object TaggedEnvelopeFormat : EnvelopeFormat {
|
||||
return Tag(metaFormatKey, metaLength, dataLength)
|
||||
}
|
||||
|
||||
override fun Output.writeEnvelope(envelope: Envelope, format: MetaFormat) {
|
||||
val metaBytes = format.writeBytes(envelope.meta)
|
||||
val tag = Tag(format.key, metaBytes.size.toUInt(), envelope.data?.size ?: 0.toULong())
|
||||
override fun Output.writeThis(obj: Envelope) {
|
||||
val metaBytes = metaFormat.writeBytes(obj.meta)
|
||||
val tag = Tag(metaFormatKey, metaBytes.size.toUInt(), obj.data?.size ?: 0.toULong())
|
||||
writePacket(tag.toBytes())
|
||||
writeFully(metaBytes)
|
||||
envelope.data?.read { copyTo(this@writeEnvelope) }
|
||||
obj.data?.read { copyTo(this@writeThis) }
|
||||
}
|
||||
|
||||
/**
|
||||
@ -49,10 +61,10 @@ object TaggedEnvelopeFormat : EnvelopeFormat {
|
||||
* @param input an input to read from
|
||||
* @param formats a collection of meta formats to resolve
|
||||
*/
|
||||
override fun Input.readEnvelope(formats: Collection<MetaFormat>): Envelope {
|
||||
override fun Input.readThis(): Envelope {
|
||||
val tag = readTag()
|
||||
|
||||
val metaFormat = formats.find { it.key == tag.metaFormatKey }
|
||||
val metaFormat = io.metaFormat(tag.metaFormatKey)
|
||||
?: error("Meta format with key ${tag.metaFormatKey} not found")
|
||||
|
||||
val metaPacket = ByteReadPacket(readBytes(tag.metaSize.toInt()))
|
||||
@ -62,10 +74,10 @@ object TaggedEnvelopeFormat : EnvelopeFormat {
|
||||
return SimpleEnvelope(meta, ArrayBinary(dataBytes))
|
||||
}
|
||||
|
||||
override fun Input.readPartial(formats: Collection<MetaFormat>): PartialEnvelope {
|
||||
override fun Input.readPartial(): PartialEnvelope {
|
||||
val tag = readTag()
|
||||
|
||||
val metaFormat = formats.find { it.key == tag.metaFormatKey }
|
||||
val metaFormat = io.metaFormat(tag.metaFormatKey)
|
||||
?: error("Meta format with key ${tag.metaFormatKey} not found")
|
||||
|
||||
val metaPacket = ByteReadPacket(readBytes(tag.metaSize.toInt()))
|
||||
@ -80,4 +92,20 @@ object TaggedEnvelopeFormat : EnvelopeFormat {
|
||||
val dataSize: ULong
|
||||
)
|
||||
|
||||
companion object : EnvelopeFormatFactory {
|
||||
const val VERSION = "DF03"
|
||||
private const val START_SEQUENCE = "#~"
|
||||
private const val END_SEQUENCE = "~#\r\n"
|
||||
private const val TAG_SIZE = 24u
|
||||
|
||||
override val name: Name = super.name + VERSION
|
||||
|
||||
override fun invoke(meta: Meta, context: Context): EnvelopeFormat {
|
||||
val plugin = context.plugins.fetch(IOPlugin)
|
||||
return TaggedEnvelopeFormat(plugin, meta)
|
||||
}
|
||||
|
||||
val default = invoke()
|
||||
}
|
||||
|
||||
}
|
@ -18,7 +18,7 @@ class EnvelopeFormatTest {
|
||||
@ExperimentalStdlibApi
|
||||
@Test
|
||||
fun testTaggedFormat(){
|
||||
TaggedEnvelopeFormat.run {
|
||||
TaggedEnvelopeFormat().run {
|
||||
val bytes = writeBytes(envelope)
|
||||
println(bytes.decodeToString())
|
||||
val res = readBytes(bytes)
|
||||
|
@ -1,5 +1,6 @@
|
||||
package hep.dataforge.io
|
||||
|
||||
import hep.dataforge.meta.EmptyMeta
|
||||
import hep.dataforge.meta.Meta
|
||||
import kotlinx.io.nio.asInput
|
||||
import kotlinx.io.nio.asOutput
|
||||
@ -22,22 +23,30 @@ class FileEnvelope internal constructor(val path: Path, val format: EnvelopeForm
|
||||
override val data: Binary? = FileBinary(path, partialEnvelope.dataOffset, partialEnvelope.dataSize)
|
||||
}
|
||||
|
||||
fun Path.readEnvelope(format: EnvelopeFormat = TaggedEnvelopeFormat) = FileEnvelope(this, format)
|
||||
fun IOPlugin.readEnvelopeFile(
|
||||
path: Path,
|
||||
formatFactory: EnvelopeFormatFactory = TaggedEnvelopeFormat,
|
||||
formatMeta: Meta = EmptyMeta
|
||||
): FileEnvelope {
|
||||
val format = formatFactory(formatMeta, context)
|
||||
return FileEnvelope(path, format)
|
||||
}
|
||||
|
||||
fun Path.writeEnvelope(
|
||||
fun IOPlugin.writeEnvelopeFile(
|
||||
path: Path,
|
||||
envelope: Envelope,
|
||||
format: EnvelopeFormat = TaggedEnvelopeFormat,
|
||||
metaFormat: MetaFormat = JsonMetaFormat
|
||||
formatFactory: EnvelopeFormatFactory = TaggedEnvelopeFormat,
|
||||
formatMeta: Meta = EmptyMeta
|
||||
) {
|
||||
val output = Files.newByteChannel(
|
||||
this,
|
||||
path,
|
||||
StandardOpenOption.WRITE,
|
||||
StandardOpenOption.CREATE,
|
||||
StandardOpenOption.TRUNCATE_EXISTING
|
||||
).asOutput()
|
||||
|
||||
with(format) {
|
||||
output.writeEnvelope(envelope, metaFormat)
|
||||
with(formatFactory(formatMeta, context)) {
|
||||
output.writeThis(envelope)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3,6 +3,8 @@ package hep.dataforge.io.tcp
|
||||
import hep.dataforge.context.Context
|
||||
import hep.dataforge.context.ContextAware
|
||||
import hep.dataforge.io.*
|
||||
import hep.dataforge.meta.EmptyMeta
|
||||
import hep.dataforge.meta.Meta
|
||||
import kotlinx.coroutines.asCoroutineDispatcher
|
||||
import kotlinx.coroutines.withContext
|
||||
import kotlinx.io.streams.writePacket
|
||||
@ -15,11 +17,14 @@ class EnvelopeClient(
|
||||
override val context: Context,
|
||||
val host: String,
|
||||
val port: Int,
|
||||
val format: EnvelopeFormat = TaggedEnvelopeFormat
|
||||
formatFactory: EnvelopeFormatFactory = TaggedEnvelopeFormat,
|
||||
formatMeta: Meta = EmptyMeta
|
||||
) : Responder, ContextAware {
|
||||
|
||||
private val dispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
|
||||
|
||||
private val format = formatFactory(formatMeta, context = context)
|
||||
|
||||
// private var socket: SocketChannel? = null
|
||||
//
|
||||
// private fun getSocket(): Socket {
|
||||
|
@ -2,10 +2,12 @@ package hep.dataforge.io.tcp
|
||||
|
||||
import hep.dataforge.context.Context
|
||||
import hep.dataforge.context.ContextAware
|
||||
import hep.dataforge.io.EnvelopeFormat
|
||||
import hep.dataforge.io.EnvelopeFormatFactory
|
||||
import hep.dataforge.io.Responder
|
||||
import hep.dataforge.io.TaggedEnvelopeFormat
|
||||
import hep.dataforge.io.type
|
||||
import hep.dataforge.meta.EmptyMeta
|
||||
import hep.dataforge.meta.Meta
|
||||
import kotlinx.coroutines.*
|
||||
import kotlinx.io.streams.writePacket
|
||||
import java.net.ServerSocket
|
||||
@ -17,11 +19,14 @@ class EnvelopeServer(
|
||||
val port: Int,
|
||||
val responder: Responder,
|
||||
val scope: CoroutineScope,
|
||||
val format: EnvelopeFormat = TaggedEnvelopeFormat
|
||||
formatFactory: EnvelopeFormatFactory = TaggedEnvelopeFormat,
|
||||
formatMeta: Meta = EmptyMeta
|
||||
) : ContextAware {
|
||||
|
||||
private var job: Job? = null
|
||||
|
||||
private val format = formatFactory(formatMeta, context = context)
|
||||
|
||||
fun start() {
|
||||
if (job == null) {
|
||||
logger.info { "Starting envelope server on port $port" }
|
||||
|
@ -1,5 +1,6 @@
|
||||
package hep.dataforge.io
|
||||
|
||||
import hep.dataforge.context.Global
|
||||
import java.nio.file.Files
|
||||
import kotlin.test.Ignore
|
||||
import kotlin.test.Test
|
||||
@ -24,9 +25,9 @@ class FileEnvelopeTest {
|
||||
@Ignore
|
||||
fun testFileWriteRead() {
|
||||
val tmpPath = Files.createTempFile("dataforge_test", ".df")
|
||||
tmpPath.writeEnvelope(envelope)
|
||||
Global.io.writeEnvelopeFile(tmpPath,envelope)
|
||||
println(tmpPath.toUri())
|
||||
val restored: Envelope = tmpPath.readEnvelope()
|
||||
val restored: Envelope = Global.io.readEnvelopeFile(tmpPath)
|
||||
assertTrue { envelope.contentEquals(restored) }
|
||||
}
|
||||
}
|
@ -17,7 +17,7 @@ import kotlin.time.ExperimentalTime
|
||||
@ExperimentalStdlibApi
|
||||
object EchoResponder : Responder {
|
||||
override suspend fun respond(request: Envelope): Envelope {
|
||||
val string = TaggedEnvelopeFormat.run { writeBytes(request).decodeToString() }
|
||||
val string = TaggedEnvelopeFormat().run { writeBytes(request).decodeToString() }
|
||||
println("ECHO:")
|
||||
println(string)
|
||||
return request
|
||||
|
@ -59,6 +59,8 @@ interface Meta : MetaRepr {
|
||||
* A key for single value node
|
||||
*/
|
||||
const val VALUE_KEY = "@value"
|
||||
|
||||
val empty: EmptyMeta = EmptyMeta
|
||||
}
|
||||
}
|
||||
|
||||
@ -183,15 +185,15 @@ operator fun <M : MetaNode<M>> MetaNode<M>?.get(key: NameToken): MetaItem<M>? =
|
||||
/**
|
||||
* Equals, hashcode and to string for any meta
|
||||
*/
|
||||
abstract class MetaBase: Meta{
|
||||
abstract class MetaBase : Meta {
|
||||
|
||||
override fun equals(other: Any?): Boolean = if(other is Meta) {
|
||||
override fun equals(other: Any?): Boolean = if (other is Meta) {
|
||||
this.items == other.items
|
||||
} else {
|
||||
false
|
||||
}
|
||||
|
||||
override fun hashCode(): Int = items.hashCode()
|
||||
override fun hashCode(): Int = items.hashCode()
|
||||
|
||||
override fun toString(): String = items.toString()
|
||||
}
|
||||
@ -228,7 +230,7 @@ object EmptyMeta : MetaBase() {
|
||||
* Unsafe methods to access values and nodes directly from [MetaItem]
|
||||
*/
|
||||
|
||||
val MetaItem<*>?.value
|
||||
val MetaItem<*>?.value: Value?
|
||||
get() = (this as? ValueItem)?.value
|
||||
?: (this?.node?.get(VALUE_KEY) as? ValueItem)?.value
|
||||
|
||||
@ -252,7 +254,7 @@ val MetaItem<*>?.stringList get() = value?.list?.map { it.string } ?: emptyList(
|
||||
val <M : Meta> MetaItem<M>?.node: M?
|
||||
get() = when (this) {
|
||||
null -> null
|
||||
is ValueItem -> error("Trying to interpret value meta item as node item")
|
||||
is ValueItem -> null//error("Trying to interpret value meta item as node item")
|
||||
is NodeItem -> node
|
||||
}
|
||||
|
||||
|
@ -70,7 +70,7 @@ class ConsoleOutputManager : AbstractPlugin(), OutputManager {
|
||||
|
||||
override val type = ConsoleOutputManager::class
|
||||
|
||||
override fun invoke(meta:Meta) = ConsoleOutputManager()
|
||||
override fun invoke(meta: Meta, context: Context) = ConsoleOutputManager()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -11,83 +11,101 @@ import hep.dataforge.meta.Meta
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.withContext
|
||||
import kotlinx.io.nio.asInput
|
||||
import kotlinx.io.nio.asOutput
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Path
|
||||
import java.nio.file.StandardOpenOption
|
||||
import kotlin.reflect.KClass
|
||||
|
||||
/**
|
||||
* Read meta from file in a given [format]
|
||||
* Read meta from file in a given [MetaFormat]
|
||||
*/
|
||||
fun Path.readMeta(format: MetaFormat, descriptor: NodeDescriptor? = null): Meta {
|
||||
return format.run {
|
||||
Files.newByteChannel(this@readMeta, StandardOpenOption.READ)
|
||||
.asInput()
|
||||
.readMeta(descriptor)
|
||||
}
|
||||
fun MetaFormat.readMetaFile(path: Path, descriptor: NodeDescriptor? = null): Meta {
|
||||
return Files.newByteChannel(path, StandardOpenOption.READ)
|
||||
.asInput()
|
||||
.readMeta(descriptor)
|
||||
}
|
||||
|
||||
/**
|
||||
* Write meta to file using given [MetaFormat]
|
||||
*/
|
||||
fun MetaFormat.writeMetaFile(path: Path, meta: Meta, descriptor: NodeDescriptor? = null) {
|
||||
return Files.newByteChannel(path, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW)
|
||||
.asOutput()
|
||||
.writeMeta(meta, descriptor)
|
||||
}
|
||||
|
||||
/**
|
||||
* Read data with supported envelope format and binary format. If envelope format is null, then read binary directly from file.
|
||||
* The operation is blocking since it must read meta header. The reading of envelope body is lazy
|
||||
* @param type explicit type of data read
|
||||
* @param format binary format
|
||||
* @param envelopeFormat the format of envelope. If null, file is read directly
|
||||
* @param dataFormat binary format
|
||||
* @param envelopeFormatFactory the format of envelope. If null, file is read directly
|
||||
* @param metaFile the relative file for optional meta override
|
||||
* @param metaFileFormat the meta format for override
|
||||
*/
|
||||
fun <T : Any> Path.readData(
|
||||
fun <T : Any> IOPlugin.readData(
|
||||
path: Path,
|
||||
type: KClass<out T>,
|
||||
format: IOFormat<T>,
|
||||
envelopeFormat: EnvelopeFormat? = null,
|
||||
metaFile: Path = resolveSibling("$fileName.meta"),
|
||||
metaFileFormat: MetaFormat = JsonMetaFormat
|
||||
dataFormat: IOFormat<T>,
|
||||
envelopeFormatFactory: EnvelopeFormatFactory? = null,
|
||||
metaFile: Path = path.resolveSibling("${path.fileName}.meta"),
|
||||
metaFileFormat: MetaFormat = JsonMetaFormat.default
|
||||
): Data<T> {
|
||||
val externalMeta = if (Files.exists(metaFile)) {
|
||||
metaFile.readMeta(metaFileFormat)
|
||||
metaFileFormat.readMetaFile(metaFile)
|
||||
} else {
|
||||
null
|
||||
}
|
||||
return if (envelopeFormat == null) {
|
||||
return if (envelopeFormatFactory == null) {
|
||||
Data(type, externalMeta ?: EmptyMeta) {
|
||||
withContext(Dispatchers.IO) {
|
||||
format.run {
|
||||
Files.newByteChannel(this@readData, StandardOpenOption.READ)
|
||||
dataFormat.run {
|
||||
Files.newByteChannel(path, StandardOpenOption.READ)
|
||||
.asInput()
|
||||
.readThis()
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
readEnvelope(envelopeFormat).let {
|
||||
readEnvelopeFile(path, envelopeFormatFactory).let {
|
||||
if (externalMeta == null) {
|
||||
it
|
||||
} else {
|
||||
it.withMetaLayers(externalMeta)
|
||||
}
|
||||
}.toData(type, format)
|
||||
}.toData(type, dataFormat)
|
||||
}
|
||||
}
|
||||
|
||||
fun <T : Any> DataTreeBuilder<T>.file(path: Path, format: IOFormat<T>, envelopeFormat: EnvelopeFormat? = null) {
|
||||
val data = path.readData(type, format, envelopeFormat)
|
||||
val name = path.fileName.toString().replace(".df", "")
|
||||
datum(name, data)
|
||||
//TODO wants multi-receiver
|
||||
fun <T : Any> DataTreeBuilder<T>.file(
|
||||
plugin: IOPlugin,
|
||||
path: Path,
|
||||
dataFormat: IOFormat<T>,
|
||||
envelopeFormatFactory: EnvelopeFormatFactory? = null
|
||||
) {
|
||||
plugin.run {
|
||||
val data = readData(path, type, dataFormat, envelopeFormatFactory)
|
||||
val name = path.fileName.toString().replace(".df", "")
|
||||
datum(name, data)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Read the directory as a data node
|
||||
*/
|
||||
fun <T : Any> Path.readDataNode(
|
||||
fun <T : Any> IOPlugin.readDataNode(
|
||||
path: Path,
|
||||
type: KClass<out T>,
|
||||
format: IOFormat<T>,
|
||||
envelopeFormat: EnvelopeFormat? = null
|
||||
dataFormat: IOFormat<T>,
|
||||
envelopeFormatFactory: EnvelopeFormatFactory? = null
|
||||
): DataNode<T> {
|
||||
if (!Files.isDirectory(this)) error("Provided path $this is not a directory")
|
||||
if (!Files.isDirectory(path)) error("Provided path $this is not a directory")
|
||||
return DataNode(type) {
|
||||
Files.list(this@readDataNode).forEach { path ->
|
||||
Files.list(path).forEach { path ->
|
||||
if (!path.fileName.toString().endsWith(".meta")) {
|
||||
file(path, format, envelopeFormat)
|
||||
file(this@readDataNode,path, dataFormat, envelopeFormatFactory)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user