WIP full data refactor

This commit is contained in:
Alexander Nozik 2021-01-10 14:32:52 +03:00
parent 13c0d189bb
commit 9ed4245d84
74 changed files with 1516 additions and 1161 deletions

View File

@ -18,6 +18,7 @@
- \[Major breaking change\] `MetaItem` renamed to `TypedMetaItem` and `MetaItem` is now an alias for `TypedMetaItem<*>`
- \[Major breaking change\] Moved `NodeItem` and `ValueItem` to a top level
- Plugins are removed from Context constructor and added lazily in ContextBuilder
- \[Major breaking change\] Full refactor of DataTree/DataSource
### Deprecated

View File

@ -1,6 +1,7 @@
package hep.dataforge.context
import hep.dataforge.meta.Meta
import hep.dataforge.misc.Named
import hep.dataforge.names.Name
import kotlin.properties.ReadOnlyProperty
import kotlin.reflect.KClass
@ -8,7 +9,7 @@ import kotlin.reflect.KProperty
public abstract class AbstractPlugin(override val meta: Meta = Meta.EMPTY) : Plugin {
private var _context: Context? = null
private val dependencies = ArrayList<PluginFactory<*>>()
private val dependencies = HashMap<PluginFactory<*>, Meta>()
override val context: Context
get() = _context ?: error("Plugin $tag is not attached")
@ -21,13 +22,13 @@ public abstract class AbstractPlugin(override val meta: Meta = Meta.EMPTY) : Plu
this._context = null
}
final override fun dependsOn(): List<PluginFactory<*>> = dependencies
final override fun dependsOn(): Map<PluginFactory<*>, Meta> = dependencies
/**
* Register plugin dependency and return a delegate which provides lazily initialized reference to dependent plugin
*/
protected fun <P : Plugin> require(factory: PluginFactory<P>): ReadOnlyProperty<AbstractPlugin, P> {
dependencies.add(factory)
protected fun <P : Plugin> require(factory: PluginFactory<P>, meta: Meta = Meta.EMPTY): ReadOnlyProperty<AbstractPlugin, P> {
dependencies[factory] = meta
return PluginDependencyDelegate(factory.type)
}
}

View File

@ -4,6 +4,7 @@ import hep.dataforge.meta.Laminate
import hep.dataforge.meta.Meta
import hep.dataforge.meta.MetaRepr
import hep.dataforge.meta.itemSequence
import hep.dataforge.misc.Named
import hep.dataforge.names.Name
import hep.dataforge.provider.Provider
import kotlinx.coroutines.CoroutineScope
@ -21,7 +22,7 @@ import kotlin.coroutines.CoroutineContext
* be overridden by plugin implementation.
*
*/
public open class Context(
public open class Context internal constructor(
final override val name: Name,
public val parent: Context?,
meta: Meta,
@ -39,7 +40,7 @@ public open class Context(
/**
* A [PluginManager] for current context
*/
public val plugins: PluginManager by lazy { PluginManager(this)}
public val plugins: PluginManager by lazy { PluginManager(this) }
override val defaultTarget: String get() = Plugin.TARGET
@ -86,6 +87,9 @@ public open class Context(
}
}
public fun Context(name: String, parent: Context = Global, block: ContextBuilder.() -> Unit = {}): Context =
Global.context(name, parent, block)
/**
* The interface for something that encapsulated in context
*

View File

@ -3,10 +3,11 @@ package hep.dataforge.context
import hep.dataforge.context.Plugin.Companion.TARGET
import hep.dataforge.meta.Meta
import hep.dataforge.meta.MetaRepr
import hep.dataforge.misc.Named
import hep.dataforge.misc.Type
import hep.dataforge.names.Name
import hep.dataforge.names.toName
import hep.dataforge.provider.Provider
import hep.dataforge.type.Type
/**
* The interface to define a Context plugin. A plugin stores all runtime features of a context.
@ -37,7 +38,7 @@ public interface Plugin : Named, ContextAware, Provider, MetaRepr {
* dependencies must be initialized and enabled in the Context before this
* plugin is enabled.
*/
public fun dependsOn(): Collection<PluginFactory<*>>
public fun dependsOn(): Map<PluginFactory<*>, Meta>
/**
* Start this plugin and attach registration info to the context. This method

View File

@ -1,6 +1,6 @@
package hep.dataforge.context
import hep.dataforge.type.Type
import hep.dataforge.misc.Type
import kotlin.reflect.KClass
@Type(PluginFactory.TYPE)

View File

@ -89,8 +89,8 @@ public class PluginManager(override val context: Context) : ContextAware, Iterab
if (get(plugin::class, plugin.tag, recursive = false) != null) {
error("Plugin with tag ${plugin.tag} already exists in ${context.name}")
} else {
for (tag in plugin.dependsOn()) {
fetch(tag, true)
for ((factory, meta) in plugin.dependsOn()) {
fetch(factory, meta, true)
}
logger.info { "Loading plugin ${plugin.name} into ${context.name}" }
@ -123,7 +123,7 @@ public class PluginManager(override val context: Context) : ContextAware, Iterab
/**
* Get an existing plugin with given meta or load new one using provided factory
*/
public fun <T : Plugin> fetch(factory: PluginFactory<T>, recursive: Boolean = true, meta: Meta = Meta.EMPTY): T {
public fun <T : Plugin> fetch(factory: PluginFactory<T>, meta: Meta = Meta.EMPTY, recursive: Boolean = true): T {
val loaded = get(factory.type, factory.tag, recursive)
return when {
loaded == null -> load(factory(meta, context))
@ -136,7 +136,7 @@ public class PluginManager(override val context: Context) : ContextAware, Iterab
factory: PluginFactory<T>,
recursive: Boolean = true,
metaBuilder: MetaBuilder.() -> Unit,
): T = fetch(factory, recursive, Meta(metaBuilder))
): T = fetch(factory, Meta(metaBuilder), recursive)
override fun iterator(): Iterator<Plugin> = plugins.iterator()

View File

@ -1,5 +1,6 @@
package hep.dataforge.context
import hep.dataforge.misc.Named
import hep.dataforge.provider.Path
import mu.KLogger
import mu.KotlinLogging

View File

@ -48,8 +48,9 @@ public fun <T : Any> Context.gather(
putAll(top(target, type))
plugins.forEach { plugin ->
plugin.top(target, type).forEach { (name, value) ->
if (containsKey(name)) error("Name conflict during gather. An item with name $name could not be gathered from $plugin because key is already present.")
put(plugin.name + name, value)
val itemName = plugin.name + name
if (containsKey(itemName)) error("Name conflict during gather. An item with name $name could not be gathered from $plugin because key is already present.")
put(itemName, value)
}
}
if (inherit) {

View File

@ -3,8 +3,8 @@ package hep.dataforge.provider
import hep.dataforge.context.Context
import hep.dataforge.context.gather
import hep.dataforge.meta.DFExperimental
import hep.dataforge.misc.Type
import hep.dataforge.names.Name
import hep.dataforge.type.Type
import kotlin.reflect.KClass
import kotlin.reflect.full.findAnnotation

View File

@ -20,4 +20,4 @@ kotlin {
}
}
}
}
}

View File

@ -1,21 +1,22 @@
package hep.dataforge.data
import hep.dataforge.meta.Meta
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.*
/**
* A simple data transformation on a data node
* A simple data transformation on a data node. Actions should avoid doing actual dependency evaluation in [run].
*/
public interface Action<in T : Any, out R : Any> {
/**
* Transform the data in the node, producing a new node. By default it is assumed that all calculations are lazy
* so no actual computation is started at this moment
* so no actual computation is started at this moment.
*
* [scope] context used to compute the initial result, also it is used for updates propagation
*/
public operator fun invoke(node: DataNode<T>, meta: Meta): DataNode<R>
public suspend fun run(set: DataSet<T>, meta: Meta, scope: CoroutineScope): DataSet<R>
/**
* Terminal action is the one that cannot be invoked lazily and requires some kind of blocking computation to invoke
*/
public val isTerminal: Boolean get() = false
public companion object
}
/**
@ -24,12 +25,9 @@ public interface Action<in T : Any, out R : Any> {
public infix fun <T : Any, I : Any, R : Any> Action<T, I>.then(action: Action<I, R>): Action<T, R> {
// TODO introduce composite action and add optimize by adding action to the list
return object : Action<T, R> {
override fun invoke(node: DataNode<T>, meta: Meta): DataNode<R> {
return action(this@then.invoke(node, meta), meta)
override suspend fun run(set: DataSet<T>, meta: Meta, scope: CoroutineScope): DataSet<R> {
return action.run(this@then.run(set, meta, scope), meta, scope)
}
override val isTerminal: Boolean
get() = this@then.isTerminal || action.isTerminal
}
}

View File

@ -0,0 +1,48 @@
package hep.dataforge.data
import hep.dataforge.meta.Meta
import hep.dataforge.names.Name
import hep.dataforge.names.startsWith
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlin.reflect.KType
/**
 * Drop every entry whose key starts with [name] (including an exact match of [name] itself).
 */
internal fun MutableMap<Name, *>.removeWhatStartsWith(name: Name) {
    // Snapshot the matching keys first so the map is never mutated while its key set is iterated.
    val matching = keys.filter { key -> key.startsWith(name) }
    for (key in matching) remove(key)
}
/**
 * An action that caches results on-demand and recalculates them on source push.
 *
 * The result set is backed by a dynamic [DataTree]; after the initial transformation it keeps
 * listening to [DataSet.updates] of the source and re-runs [transform] for updated subtrees only.
 */
public abstract class CachingAction<in T : Any, out R : Any>(
    public val outputType: KType,
) : Action<T, R> {

    /**
     * Produce transformed items for [set] with the given [meta].
     * [key] restricts the transformation to a subtree (defaults to the whole set);
     * it is used for incremental recomputation when the source pushes an update.
     */
    protected abstract fun CoroutineScope.transform(
        set: DataSet<T>,
        meta: Meta,
        key: Name = Name.EMPTY,
    ): Flow<NamedData<R>>

    override suspend fun run(
        set: DataSet<T>,
        meta: Meta,
        scope: CoroutineScope,
    ): DataSet<R> = DataTree.dynamic(outputType, scope) {
        // Initial full transformation of the source set.
        collectFrom(scope.transform(set, meta))
        // NOTE(review): scope.let only introduces a block here — the lambda parameter is unused.
        scope.let {
            set.updates.collect {
                //clear old nodes
                remove(it)
                //collect new items
                collectFrom(scope.transform(set, meta, it))
                //FIXME if the target is data, updates are fired twice
            }
        }
    }
}

View File

@ -3,8 +3,10 @@ package hep.dataforge.data
import hep.dataforge.meta.Meta
import hep.dataforge.meta.MetaRepr
import hep.dataforge.meta.isEmpty
import hep.dataforge.type.Type
import kotlinx.coroutines.CoroutineScope
import hep.dataforge.misc.Named
import hep.dataforge.misc.Type
import hep.dataforge.names.Name
import kotlinx.coroutines.*
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.reflect.KClass
@ -34,52 +36,34 @@ public interface Data<out T : Any> : Goal<T>, MetaRepr {
public companion object {
public const val TYPE: String = "data"
public operator fun <T : Any> invoke(
type: KClass<out T>,
public fun <T : Any> static(
value: T,
meta: Meta = Meta.EMPTY,
context: CoroutineContext = EmptyCoroutineContext,
dependencies: Collection<Data<*>> = emptyList(),
block: suspend CoroutineScope.() -> T,
): Data<T> = ComputationData(type, meta, context, dependencies, block)
): Data<T> = StaticData(value, meta)
public inline operator fun <reified T : Any> invoke(
meta: Meta = Meta.EMPTY,
context: CoroutineContext = EmptyCoroutineContext,
dependencies: Collection<Data<*>> = emptyList(),
noinline block: suspend CoroutineScope.() -> T,
): Data<T> = invoke(T::class, meta, context, dependencies, block)
public operator fun <T : Any> invoke(
name: String,
type: KClass<out T>,
meta: Meta = Meta.EMPTY,
context: CoroutineContext = EmptyCoroutineContext,
dependencies: Collection<Data<*>> = emptyList(),
block: suspend CoroutineScope.() -> T,
): Data<T> = NamedData(name, invoke(type, meta, context, dependencies, block))
public inline operator fun <reified T : Any> invoke(
name: String,
meta: Meta = Meta.EMPTY,
context: CoroutineContext = EmptyCoroutineContext,
dependencies: Collection<Data<*>> = emptyList(),
noinline block: suspend CoroutineScope.() -> T,
): Data<T> =
invoke(name, T::class, meta, context, dependencies, block)
public fun <T : Any> static(value: T, meta: Meta = Meta.EMPTY): Data<T> =
StaticData(value, meta)
/**
* An empty data containing only meta
*/
public fun empty(meta: Meta): Data<Nothing> = object : Data<Nothing> {
override val type: KClass<out Nothing> = Nothing::class
override val meta: Meta = meta
override val dependencies: Collection<Goal<*>> = emptyList()
override val deferred: Deferred<Nothing> get() = GlobalScope.async(start = CoroutineStart.LAZY) {
error("The Data is empty and could not be computed")
}
override fun async(coroutineScope: CoroutineScope): Deferred<Nothing> = deferred
override fun reset() {}
}
}
}
public class ComputationData<T : Any>(
public class LazyData<T : Any>(
override val type: KClass<out T>,
override val meta: Meta = Meta.EMPTY,
context: CoroutineContext = EmptyCoroutineContext,
dependencies: Collection<Data<*>> = emptyList(),
block: suspend CoroutineScope.() -> T,
) : Data<T>, ComputationGoal<T>(context, dependencies, block)
) : Data<T>, LazyGoal<T>(context, dependencies, block)
public class StaticData<T : Any>(
value: T,
@ -88,14 +72,40 @@ public class StaticData<T : Any>(
override val type: KClass<out T> get() = value::class
}
public class NamedData<out T : Any>(public val name: String, data: Data<T>) : Data<T> by data
@Suppress("FunctionName")
public fun <T : Any> Data(
type: KClass<out T>,
meta: Meta = Meta.EMPTY,
context: CoroutineContext = EmptyCoroutineContext,
dependencies: Collection<Data<*>> = emptyList(),
block: suspend CoroutineScope.() -> T,
): Data<T> = LazyData(type, meta, context, dependencies, block)
@Suppress("FunctionName")
public inline fun <reified T : Any> Data(
meta: Meta = Meta.EMPTY,
context: CoroutineContext = EmptyCoroutineContext,
dependencies: Collection<Data<*>> = emptyList(),
noinline block: suspend CoroutineScope.() -> T,
): Data<T> = Data(T::class, meta, context, dependencies, block)
public class NamedData<out T : Any> internal constructor(
override val name: Name,
public val data: Data<T>,
) : Data<T> by data, Named
public fun <T : Any> Data<T>.named(name: Name): NamedData<T> = if (this is NamedData) {
NamedData(name, this.data)
} else {
NamedData(name, this)
}
public fun <T : Any, R : Any> Data<T>.map(
outputType: KClass<out R>,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = this.meta,
block: suspend CoroutineScope.(T) -> R,
): Data<R> = ComputationData(outputType, meta, coroutineContext, listOf(this)) {
): Data<R> = LazyData(outputType, meta, coroutineContext, listOf(this)) {
block(await())
}
@ -107,7 +117,7 @@ public inline fun <T : Any, reified R : Any> Data<T>.map(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = this.meta,
noinline block: suspend CoroutineScope.(T) -> R,
): Data<R> = ComputationData(R::class, meta, coroutineContext, listOf(this)) {
): Data<R> = LazyData(R::class, meta, coroutineContext, listOf(this)) {
block(await())
}
@ -118,7 +128,7 @@ public inline fun <T : Any, reified R : Any> Collection<Data<T>>.reduce(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta,
noinline block: suspend CoroutineScope.(Collection<T>) -> R,
): Data<R> = ComputationData(
): Data<R> = LazyData(
R::class,
meta,
coroutineContext,
@ -132,7 +142,7 @@ public fun <K, T : Any, R : Any> Map<K, Data<T>>.reduce(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta,
block: suspend CoroutineScope.(Map<K, T>) -> R,
): ComputationData<R> = ComputationData(
): LazyData<R> = LazyData(
outputType,
meta,
coroutineContext,
@ -152,7 +162,7 @@ public inline fun <K, T : Any, reified R : Any> Map<K, Data<T>>.reduce(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta,
noinline block: suspend CoroutineScope.(Map<K, T>) -> R,
): ComputationData<R> = ComputationData(
): LazyData<R> = LazyData(
R::class,
meta,
coroutineContext,

View File

@ -1,53 +0,0 @@
package hep.dataforge.data
import hep.dataforge.meta.*
import hep.dataforge.names.toName
public class DataFilter : Scheme() {
/**
* A source node for the filter
*/
public var from: String? by string()
/**
* A target placement for the filtered node
*/
public var to: String? by string()
/**
* A regular expression pattern for the filter
*/
public var pattern: String by string(".*")
// val prefix by string()
// val suffix by string()
public companion object : SchemeSpec<DataFilter>(::DataFilter)
}
/**
* Apply meta-based filter to given data node
*/
public fun <T : Any> DataNode<T>.filter(filter: DataFilter): DataNode<T> {
val sourceNode = filter.from?.let { get(it.toName()).node } ?: this@filter
val regex = filter.pattern.toRegex()
val targetNode = DataTreeBuilder(type).apply {
sourceNode.dataSequence().forEach { (name, data) ->
if (name.toString().matches(regex)) {
this[name] = data
}
}
}
return filter.to?.let {
DataTreeBuilder(type).apply { this[it.toName()] = targetNode }.build()
} ?: targetNode.build()
}
/**
* Filter data using [DataFilter] specification
*/
public fun <T : Any> DataNode<T>.filter(filter: Meta): DataNode<T> = filter(DataFilter.read(filter))
/**
* Filter data using [DataFilter] builder
*/
public fun <T : Any> DataNode<T>.filter(filterBuilder: DataFilter.() -> Unit): DataNode<T> =
filter(DataFilter(filterBuilder))

View File

@ -1,142 +0,0 @@
package hep.dataforge.data
import hep.dataforge.meta.*
import hep.dataforge.names.*
import hep.dataforge.type.Type
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlin.collections.component1
import kotlin.collections.component2
import kotlin.reflect.KClass
public sealed class DataItem<out T : Any> : MetaRepr {
public abstract val type: KClass<out T>
public abstract val meta: Meta
public class Node<out T : Any>(public val node: DataNode<T>) : DataItem<T>() {
override val type: KClass<out T> get() = node.type
override fun toMeta(): Meta = node.toMeta()
override val meta: Meta get() = node.meta
}
public class Leaf<out T : Any>(public val data: Data<T>) : DataItem<T>() {
override val type: KClass<out T> get() = data.type
override fun toMeta(): Meta = data.toMeta()
override val meta: Meta get() = data.meta
}
}
/**
* A tree-like data structure grouped into the node. All data inside the node must inherit its type
*/
@Type(DataNode.TYPE)
public interface DataNode<out T : Any> : MetaRepr {
/**
* The minimal common ancestor to all data in the node
*/
public val type: KClass<out T>
/**
* Children items of this data node
*/
public val items: Map<NameToken, DataItem<T>>
/**
* Meta for this node
*/
public val meta: Meta
override fun toMeta(): Meta = Meta {
"type" put (type.simpleName ?: "undefined")
"meta" put meta
"items" put {
this@DataNode.items.forEach {
it.key.toString() put it.value.toMeta()
}
}
}
public companion object {
public const val TYPE: String = "dataNode"
public fun <T : Any> builder(type: KClass<out T>): DataTreeBuilder<T> = DataTreeBuilder(type)
}
}
/**
* Start computation for all goals in data node and return a job for the whole node
*/
@Suppress("DeferredResultUnused")
public fun <T : Any> DataNode<T>.startAll(coroutineScope: CoroutineScope): Job = coroutineScope.launch {
items.values.forEach {
when (it) {
is DataItem.Node<*> -> it.node.run { startAll(this@launch) }
is DataItem.Leaf<*> -> it.data.run { this.startAsync(this@launch) }
}
}
}
public suspend fun <T: Any> DataNode<T>.join(): Unit = coroutineScope { startAll(this).join() }
public val <T : Any> DataItem<T>?.node: DataNode<T>? get() = (this as? DataItem.Node<T>)?.node
public val <T : Any> DataItem<T>?.data: Data<T>? get() = (this as? DataItem.Leaf<T>)?.data
public operator fun <T : Any> DataNode<T>.get(name: Name): DataItem<T>? = when (name.length) {
0 -> DataItem.Node(this)
1 -> items[name.firstOrNull()]
else -> get(name.firstOrNull()!!.asName()).node?.get(name.cutFirst())
}
public operator fun <T : Any> DataNode<T>.get(name: String): DataItem<T>? = get(name.toName())
/**
* Sequence of all children including nodes
*/
public fun <T : Any> DataNode<T>.itemSequence(): Sequence<Pair<Name, DataItem<T>>> = sequence {
items.forEach { (head, item) ->
yield(head.asName() to item)
if (item is DataItem.Node) {
val subSequence = item.node.itemSequence()
.map { (name, data) -> (head.asName() + name) to data }
yieldAll(subSequence)
}
}
}
/**
* Sequence of data entries
*/
public fun <T : Any> DataNode<T>.dataSequence(): Sequence<Pair<Name, Data<T>>> = sequence {
items.forEach { (head, item) ->
when (item) {
is DataItem.Leaf -> yield(head.asName() to item.data)
is DataItem.Node -> {
val subSequence = item.node.dataSequence()
.map { (name, data) -> (head.asName() + name) to data }
yieldAll(subSequence)
}
}
}
}
@DFExperimental
public fun <T : Any> DataNode<T>.filter(predicate: (Name, Data<T>) -> Boolean): DataNode<T> = DataTree(type) {
dataSequence().forEach { (name, data) ->
if (predicate(name, data)) {
this[name] = data
}
}
}
public fun <T : Any> DataNode<T>.first(): Data<T>? = dataSequence().firstOrNull()?.second
public operator fun <T : Any> DataNode<T>.iterator(): Iterator<Pair<Name, DataItem<T>>> = itemSequence().iterator()

View File

@ -0,0 +1,121 @@
package hep.dataforge.data
import hep.dataforge.meta.DFExperimental
import hep.dataforge.names.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.reflect.KClass
/**
 * A read-only set of [Data] items addressed by [Name].
 */
public interface DataSet<out T : Any> {

    /**
     * The minimal common ancestor to all data in the node
     */
    public val dataType: KClass<out T>

    /**
     * Traverse all data items in this set as a [Flow] of [NamedData]. The order is not guaranteed.
     */
    public fun flow(): Flow<NamedData<T>>

    /**
     * Get data with given name, or null if it is not present.
     */
    public suspend fun getData(name: Name): Data<T>?

    /**
     * Get a snapshot of names of children of given node. Empty if node does not exist or is a leaf.
     *
     * By default traverses the whole tree. Could be optimized in descendants
     */
    public suspend fun listChildren(prefix: Name = Name.EMPTY): List<Name> =
        flow().map { it.name }.filter { it.startsWith(prefix) && (it.length == prefix.length + 1) }.toList()

    /**
     * A flow of updated item names. Updates are propagated in a form of [Flow] of names of updated nodes.
     * Those can include new data items and replacement of existing ones. The replaced items could update existing data content
     * and replace it completely, so they should be pulled again.
     *
     */
    public val updates: Flow<Name>

    public companion object {
        // Reserved name for node meta — presumably consumed by implementations; not referenced in this file.
        public val META_KEY: Name = "@meta".asName()
    }
}
/**
 * A stateless filtered view of this [DataSet]: only items for which [predicate] holds are visible.
 * The predicate is re-evaluated on every access (the view holds no state of its own).
 */
@DFExperimental
public fun <T : Any> DataSet<T>.filter(
    predicate: suspend (Name, Data<T>) -> Boolean,
): DataSet<T> = object : DataSet<T> {
    // The view shares the element type of the source set.
    override val dataType: KClass<out T> get() = this@filter.dataType

    override fun flow(): Flow<NamedData<T>> =
        this@filter.flow().filter { predicate(it.name, it.data) }

    // Re-check the predicate on direct access so getData agrees with flow().
    override suspend fun getData(name: Name): Data<T>? = this@filter.getData(name)?.takeIf {
        predicate(name, it)
    }

    // Propagate only updates whose item currently exists in the source and passes the predicate.
    override val updates: Flow<Name> = this@filter.updates.filter flowFilter@{ name ->
        val theData = this@filter.getData(name) ?: return@flowFilter false
        predicate(name, theData)
    }
}
/**
 * Flow all data items whose names start with [branchName].
 */
public fun <T : Any> DataSet<T>.flowChildren(branchName: Name): Flow<NamedData<T>> =
    flow().filter { item -> item.name.startsWith(branchName) }
/**
 * Get a subset of data starting with a given [branchName].
 * For an empty [branchName] this set itself is returned.
 */
public fun <T : Any> DataSet<T>.branch(branchName: Name): DataSet<T> = if (branchName.isEmpty()) this
else object : DataSet<T> {
    override val dataType: KClass<out T> get() = this@branch.dataType

    // Expose only items under branchName, with the branch prefix stripped from their names.
    override fun flow(): Flow<NamedData<T>> = this@branch.flow().mapNotNull {
        it.name.removeHeadOrNull(branchName)?.let { name ->
            it.data.named(name)
        }
    }

    // Direct access re-adds the branch prefix before delegating to the source.
    override suspend fun getData(name: Name): Data<T>? = this@branch.getData(branchName + name)

    // Updates outside the branch are dropped; names inside it are relativized.
    override val updates: Flow<Name> get() = this@branch.updates.mapNotNull { it.removeHeadOrNull(branchName) }
}
/**
 * Generate a wrapper data set with a given name [prefix] appended to all names.
 * For an empty [prefix] this set itself is returned.
 */
public fun <T : Any> DataSet<T>.withNamePrefix(prefix: Name): DataSet<T> = if (prefix.isEmpty()) this
else object : DataSet<T> {
    override val dataType: KClass<out T> get() = this@withNamePrefix.dataType

    override fun flow(): Flow<NamedData<T>> = this@withNamePrefix.flow().map { it.data.named(prefix + it.name) }

    /**
     * Strip [prefix] from the requested [name] and look the remainder up in the wrapped set.
     * Names that do not start with [prefix] resolve to null, matching the names exposed by [flow].
     */
    override suspend fun getData(name: Name): Data<T>? =
        // BUG FIX: the original called name.removeHeadOrNull(name) — stripping the name from
        // itself — so lookups never matched the prefixed names; the prefix must be stripped instead.
        name.removeHeadOrNull(prefix)?.let { this@withNamePrefix.getData(it) }

    override val updates: Flow<Name> get() = this@withNamePrefix.updates.map { prefix + it }
}
/**
 * Start computation for every data item in this set and return a [Job] that completes
 * once all of them have finished.
 */
public fun <T : Any> DataSet<T>.startAll(coroutineScope: CoroutineScope): Job = coroutineScope.launch {
    // Launch every item first, then wait for the whole batch.
    val childJobs = flow().map { data -> data.launch(this@launch) }.toList()
    childJobs.joinAll()
}
public suspend fun <T : Any> DataSet<T>.join(): Unit = coroutineScope { startAll(this).join() }

View File

@ -0,0 +1,71 @@
package hep.dataforge.data
import hep.dataforge.meta.DFExperimental
import hep.dataforge.meta.Meta
import hep.dataforge.meta.MetaBuilder
import hep.dataforge.names.Name
import hep.dataforge.names.toName
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.collect
/**
 * A mutable builder for a [DataSet].
 */
public interface DataSetBuilder<in T : Any> {

    /** Remove the item (data or whole subtree) under [name]. */
    public fun remove(name: Name)

    // NOTE(review): the semantics of a null data value (remove vs. no-op) are defined by
    // implementations not visible here — confirm before relying on it.
    public operator fun set(name: Name, data: Data<T>?)

    /** Append the content of [dataSet] under [name]. */
    public suspend fun set(name: Name, dataSet: DataSet<T>)

    /** Build a child node under [name] using [block]. */
    public operator fun set(name: Name, block: DataSetBuilder<T>.() -> Unit)

    /**
     * Append data to node
     */
    public infix fun String.put(data: Data<T>): Unit = set(toName(), data)

    /**
     * Append node
     */
    public suspend infix fun String.put(tree: DataSet<T>): Unit = set(toName(), tree)

    /**
     * Build and append node
     */
    public infix fun String.put(block: DataSetBuilder<T>.() -> Unit): Unit = set(toName(), block)
}
/** String-keyed variant of [DataSetBuilder.set] for a single [Data] item. */
public operator fun <T : Any> DataSetBuilder<T>.set(name: String, data: Data<T>) {
    this@set[name.toName()] = data
}

/** Put a static (already computed) value under [name] with optional [meta]. */
public fun <T : Any> DataSetBuilder<T>.data(name: Name, data: T, meta: Meta = Meta.EMPTY) {
    set(name, Data.static(data, meta))
}

// NOTE(review): this overload is ambiguous with the meta-based one when called as data(name, value)
// with all trailing arguments defaulted — callers must pass meta or a builder block explicitly.
/** Put a static value under [name], building its meta with [block]. */
public fun <T : Any> DataSetBuilder<T>.data(name: Name, data: T, block: MetaBuilder.() -> Unit = {}) {
    set(name, Data.static(data, Meta(block)))
}

/** String-keyed variant of the builder-meta data setter. */
public fun <T : Any> DataSetBuilder<T>.data(name: String, data: T, block: MetaBuilder.() -> Unit = {}) {
    set(name.toName(), Data.static(data, Meta(block)))
}

/** String-keyed variant of [DataSetBuilder.set] for a whole [DataSet]. */
public suspend fun <T : Any> DataSetBuilder<T>.set(name: String, set: DataSet<T>) {
    this.set(name.toName(), set)
}

/** String-keyed variant of the node-building [DataSetBuilder.set]. */
public operator fun <T : Any> DataSetBuilder<T>.set(name: String, block: DataSetBuilder<T>.() -> Unit): Unit =
    this@set.set(name.toName(), block)
/**
 * Update data with given node data and meta with node meta.
 * Copies every item of [tree] into this builder under its own name.
 */
@DFExperimental
public suspend fun <T: Any> DataSetBuilder<T>.update(tree: DataSet<T>): Unit = coroutineScope{
    tree.flow().collect {
        //TODO check if the place is occupied
        set(it.name, it.data)
    }
}

View File

@ -0,0 +1,106 @@
package hep.dataforge.data
import hep.dataforge.meta.*
import hep.dataforge.misc.Type
import hep.dataforge.names.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.collections.component1
import kotlin.collections.component2
import kotlin.reflect.KClass
/**
 * A single item of a [DataTree]: either a subtree ([Node]) or a terminal [Leaf] holding data.
 */
public sealed class DataTreeItem<out T : Any> {
    public class Node<out T : Any>(public val tree: DataTree<T>) : DataTreeItem<T>()
    public class Leaf<out T : Any>(public val data: Data<T>) : DataTreeItem<T>()
}

/**
 * The runtime type of the item's content: the tree's [DataTree.dataType] for nodes,
 * the data's own type for leaves.
 */
public val <T : Any> DataTreeItem<T>.type: KClass<out T>
    get() = when (this) {
        is DataTreeItem.Node -> tree.dataType
        is DataTreeItem.Leaf -> data.type
    }
/**
 * A tree-like [DataSet] grouped into the node. All data inside the node must inherit its type
 */
@Type(DataTree.TYPE)
public interface DataTree<out T : Any> : DataSet<T> {

    /**
     * Children items of this [DataTree] provided asynchronously
     */
    public suspend fun items(): Map<NameToken, DataTreeItem<T>>

    // Flatten the tree: leaves are emitted under their token name, subtrees are
    // recursively flattened with the token prepended to child names.
    override fun flow(): Flow<NamedData<T>> = flow {
        items().forEach { (token, childItem: DataTreeItem<T>) ->
            when (childItem) {
                is DataTreeItem.Leaf -> emit(childItem.data.named(token.asName()))
                is DataTreeItem.Node -> emitAll(childItem.tree.flow().map { it.named(token + it.name) })
            }
        }
    }

    // Optimization over the DataSet default: read only the children of the prefix node
    // instead of scanning the whole tree.
    override suspend fun listChildren(prefix: Name): List<Name> =
        getItem(prefix).tree?.items()?.keys?.map { prefix + it } ?: emptyList()

    // NOTE(review): an empty name resolves to null here, i.e. the root is never a data item — confirm intended.
    override suspend fun getData(name: Name): Data<T>? = when (name.length) {
        0 -> null
        1 -> items()[name.firstOrNull()!!].data
        else -> items()[name.firstOrNull()!!].tree?.getData(name.cutFirst())
    }

    public companion object {
        public const val TYPE: String = "dataTree"
    }
}
/**
 * Get a [DataTreeItem] with given [name] or null if the item does not exist.
 * An empty name resolves to the tree itself wrapped in a [DataTreeItem.Node].
 */
public tailrec suspend fun <T : Any> DataTree<T>.getItem(name: Name): DataTreeItem<T>? = when (name.length) {
    0 -> DataTreeItem.Node(this)
    1 -> items()[name.firstOrNull()]
    else -> items()[name.firstOrNull()!!].tree?.getItem(name.cutFirst())
}

/** The subtree of this item, or null if it is a leaf (or the item itself is null). */
public val <T : Any> DataTreeItem<T>?.tree: DataTree<T>? get() = (this as? DataTreeItem.Node<T>)?.tree

/** The data of this item, or null if it is a node (or the item itself is null). */
public val <T : Any> DataTreeItem<T>?.data: Data<T>? get() = (this as? DataTreeItem.Leaf<T>)?.data
/**
 * Flow of all items in this tree, including intermediate nodes, paired with their full names.
 */
public fun <T : Any> DataTree<T>.itemFlow(): Flow<Pair<Name, DataTreeItem<T>>> = flow {
    for ((token, item) in items()) {
        val itemName = token.asName()
        emit(itemName to item)
        if (item is DataTreeItem.Node) {
            // Recurse into the subtree, prefixing child names with this node's token.
            emitAll(item.tree.itemFlow().map { (childName, child) -> (itemName + childName) to child })
        }
    }
}
/**
 * Get a branch of this [DataTree] with a given [branchName].
 * The difference from similar method for [DataSet] is that internal logic is more simple and the return value is a [DataTree]
 */
public fun <T : Any> DataTree<T>.branch(branchName: Name): DataTree<T> = object : DataTree<T> {
    override val dataType: KClass<out T> get() = this@branch.dataType

    // Only updates under the branch are visible, with the branch prefix stripped.
    override val updates: Flow<Name> = this@branch.updates.mapNotNull { it.removeHeadOrNull(branchName) }

    // A missing branch behaves as an empty tree rather than an error.
    override suspend fun items(): Map<NameToken, DataTreeItem<T>> = getItem(branchName).tree?.items() ?: emptyMap()
}

View File

@ -1,172 +0,0 @@
package hep.dataforge.data
import hep.dataforge.meta.*
import hep.dataforge.names.*
import kotlin.reflect.KClass
public class DataTree<out T : Any> internal constructor(
override val type: KClass<out T>,
override val items: Map<NameToken, DataItem<T>>,
override val meta: Meta
) : DataNode<T>
private sealed class DataTreeBuilderItem<out T : Any> {
class Node<T : Any>(val tree: DataTreeBuilder<T>) : DataTreeBuilderItem<T>()
class Leaf<T : Any>(val value: Data<T>) : DataTreeBuilderItem<T>()
}
/**
 * A builder for a [DataTree]. Entries are accumulated in a mutable token map and
 * sealed into an immutable [DataTree] by [build].
 *
 * Assigning to an already occupied token fails with an error; [update] overwrites
 * existing entries instead (see the TODO there).
 */
@DFBuilder
public class DataTreeBuilder<T : Any>(public val type: KClass<out T>) {
    private val map = HashMap<NameToken, DataTreeBuilderItem<T>>()

    private var meta = MetaBuilder()

    public operator fun set(token: NameToken, node: DataTreeBuilder<out T>) {
        if (map.containsKey(token)) error("Tree entry with name $token is not empty")
        map[token] = DataTreeBuilderItem.Node(node)
    }

    public operator fun set(token: NameToken, data: Data<T>) {
        if (map.containsKey(token)) error("Tree entry with name $token is not empty")
        map[token] = DataTreeBuilderItem.Leaf(data)
    }

    /**
     * Get the child builder for [token], creating it if absent.
     * Fails if the slot is already occupied by a leaf.
     */
    private fun buildNode(token: NameToken): DataTreeBuilder<T> {
        return if (!map.containsKey(token)) {
            DataTreeBuilder(type).also { map[token] = DataTreeBuilderItem.Node(it) }
        } else {
            (map[token] as? DataTreeBuilderItem.Node<T> ?: error("The node with name $token is occupied by leaf")).tree
        }
    }

    /** Recursively descend (creating nodes as needed) to the builder addressed by [name]. */
    private fun buildNode(name: Name): DataTreeBuilder<T> {
        return when (name.length) {
            0 -> this
            1 -> buildNode(name.firstOrNull()!!)
            else -> buildNode(name.firstOrNull()!!).buildNode(name.cutFirst())
        }
    }

    public operator fun set(name: Name, data: Data<T>) {
        when (name.length) {
            0 -> error("Can't add data with empty name")
            1 -> set(name.firstOrNull()!!, data)
            // BUG FIX: this branch was `2 ->`, so names with three or more tokens
            // were silently ignored. buildNode handles arbitrary depth, so `else` is correct.
            else -> buildNode(name.cutLast())[name.lastOrNull()!!] = data
        }
    }

    public operator fun set(name: Name, node: DataTreeBuilder<out T>) {
        when (name.length) {
            0 -> error("Can't add data with empty name")
            1 -> set(name.firstOrNull()!!, node)
            // BUG FIX: was `2 ->`; deep names were silently dropped (see set(Name, Data) above).
            else -> buildNode(name.cutLast())[name.lastOrNull()!!] = node
        }
    }

    public operator fun set(name: Name, node: DataNode<T>): Unit = set(name, node.builder())

    public operator fun set(name: Name, item: DataItem<T>): Unit = when (item) {
        is DataItem.Node<T> -> set(name, item.node.builder())
        is DataItem.Leaf<T> -> set(name, item.data)
    }

    /**
     * Append data to node
     */
    public infix fun String.put(data: Data<T>): Unit = set(toName(), data)

    /**
     * Append node
     */
    public infix fun String.put(node: DataNode<T>): Unit = set(toName(), node)

    public infix fun String.put(item: DataItem<T>): Unit = set(toName(), item)

    /**
     * Build and append node
     */
    public infix fun String.put(block: DataTreeBuilder<T>.() -> Unit): Unit =
        set(toName(), DataTreeBuilder(type).apply(block))

    /**
     * Update data with given node data and meta with node meta.
     */
    public fun update(node: DataNode<T>) {
        node.dataSequence().forEach {
            //TODO check if the place is occupied
            this[it.first] = it.second
        }
        meta.update(node.meta)
    }

    /** Mutate the tree meta in place and return the builder for further configuration. */
    public fun meta(block: MetaBuilder.() -> Unit): MetaBuilder = meta.apply(block)

    /** Replace the tree meta with a mutable copy of [meta]. */
    public fun meta(meta: Meta) {
        this.meta = meta.builder()
    }

    /** Seal the accumulated entries and meta into an immutable [DataTree]. */
    public fun build(): DataTree<T> {
        val resMap = map.mapValues { (_, value) ->
            when (value) {
                is DataTreeBuilderItem.Leaf -> DataItem.Leaf(value.value)
                is DataTreeBuilderItem.Node -> DataItem.Node(value.tree.build())
            }
        }
        return DataTree(type, resMap, meta.seal())
    }
}
/**
 * Build an immutable [DataTree] of the given [type] by applying [block] to a fresh [DataTreeBuilder].
 */
@Suppress("FunctionName")
public fun <T : Any> DataTree(type: KClass<out T>, block: DataTreeBuilder<T>.() -> Unit): DataTree<T> {
    val builder = DataTreeBuilder(type)
    builder.block()
    return builder.build()
}
/**
 * Build an immutable [DataTree] with the element type taken from the reified parameter [T].
 */
@Suppress("FunctionName")
public inline fun <reified T : Any> DataTree(noinline block: DataTreeBuilder<T>.() -> Unit): DataTree<T> =
    DataTree(T::class, block)
/** Put a single [Data] item under [name]. */
public fun <T : Any> DataTreeBuilder<T>.datum(name: Name, data: Data<T>): Unit = set(name, data)
/** Put a single [Data] item under the dot-separated [name]. */
public fun <T : Any> DataTreeBuilder<T>.datum(name: String, data: Data<T>): Unit = set(name.toName(), data)
/**
 * Put a precomputed value wrapped in [Data.static] under [name].
 *
 * NOTE(review): both `static(Name, …)` overloads have fully-defaulted trailing
 * parameters, so a two-argument call `static(name, data)` may be ambiguous — confirm
 * call sites resolve as intended.
 */
public fun <T : Any> DataTreeBuilder<T>.static(name: Name, data: T, meta: Meta = Meta.EMPTY): Unit =
    set(name, Data.static(data, meta))
/** Put a precomputed value under [name] with meta assembled from [block]. */
public fun <T : Any> DataTreeBuilder<T>.static(name: Name, data: T, block: MetaBuilder.() -> Unit = {}): Unit =
    set(name, Data.static(data, Meta(block)))
/** Put a precomputed value under the dot-separated [name] with meta assembled from [block]. */
public fun <T : Any> DataTreeBuilder<T>.static(name: String, data: T, block: MetaBuilder.() -> Unit = {}): Unit =
    set(name.toName(), Data.static(data, Meta(block)))
/** Attach an existing [DataNode] under [name]. */
public fun <T : Any> DataTreeBuilder<T>.node(name: Name, node: DataNode<T>): Unit = set(name, node)
/** Attach an existing [DataNode] under the dot-separated [name]. */
public fun <T : Any> DataTreeBuilder<T>.node(name: String, node: DataNode<T>): Unit = set(name.toName(), node)
/** Build a subtree via [block] and attach it under [name]. */
public inline fun <reified T : Any> DataTreeBuilder<T>.node(name: Name, noinline block: DataTreeBuilder<T>.() -> Unit): Unit =
    set(name, DataTree(T::class, block))
/** Build a subtree via [block] and attach it under the dot-separated [name]. */
public inline fun <reified T : Any> DataTreeBuilder<T>.node(name: String, noinline block: DataTreeBuilder<T>.() -> Unit): Unit =
    set(name.toName(), DataTree(T::class, block))
/**
 * Generate a mutable builder from this node. Node content is not changed.
 */
public fun <T : Any> DataNode<T>.builder(): DataTreeBuilder<T> {
    val builder = DataTreeBuilder(type)
    for ((name, data) in dataSequence()) {
        builder[name] = data
    }
    return builder
}

View File

@ -12,15 +12,17 @@ public interface Goal<out T> {
public val dependencies: Collection<Goal<*>>
/**
* Returns current running coroutine if the goal is started
* Returns current running coroutine if the goal is started. Null if the computation is not started.
*/
public val result: Deferred<T>?
public val deferred: Deferred<T>?
/**
* Get ongoing computation or start a new one.
* Does not guarantee thread safety. In case of multi-thread access, could create orphan computations.
*
* If the computation is already running, the scope is not used.
*/
public fun startAsync(coroutineScope: CoroutineScope): Deferred<T>
public fun async(coroutineScope: CoroutineScope): Deferred<T>
/**
* Reset the computation
@ -30,28 +32,30 @@ public interface Goal<out T> {
public companion object
}
public suspend fun <T> Goal<T>.await(): T = coroutineScope { startAsync(this).await() }
public fun Goal<*>.launch(coroutineScope: CoroutineScope): Job = async(coroutineScope)
public val Goal<*>.isComplete: Boolean get() = result?.isCompleted ?: false
public suspend fun <T> Goal<T>.await(): T = coroutineScope { async(this).await() }
public val Goal<*>.isComplete: Boolean get() = deferred?.isCompleted ?: false
public open class StaticGoal<T>(public val value: T) : Goal<T> {
override val dependencies: Collection<Goal<*>> get() = emptyList()
override val result: Deferred<T> = CompletableDeferred(value)
override val deferred: Deferred<T> = CompletableDeferred(value)
override fun startAsync(coroutineScope: CoroutineScope): Deferred<T> = result
override fun async(coroutineScope: CoroutineScope): Deferred<T> = deferred
override fun reset() {
//doNothing
}
}
public open class ComputationGoal<T>(
public open class LazyGoal<T>(
private val coroutineContext: CoroutineContext = EmptyCoroutineContext,
override val dependencies: Collection<Goal<*>> = emptyList(),
public val block: suspend CoroutineScope.() -> T,
) : Goal<T> {
final override var result: Deferred<T>? = null
final override var deferred: Deferred<T>? = null
private set
/**
@ -59,11 +63,11 @@ public open class ComputationGoal<T>(
* Does not guarantee thread safety. In case of multi-thread access, could create orphan computations.
*/
@DFExperimental
override fun startAsync(coroutineScope: CoroutineScope): Deferred<T> {
override fun async(coroutineScope: CoroutineScope): Deferred<T> {
val startedDependencies = this.dependencies.map { goal ->
goal.run { startAsync(coroutineScope) }
goal.run { async(coroutineScope) }
}
return result ?: coroutineScope.async(
return deferred ?: coroutineScope.async(
this.coroutineContext + CoroutineMonitor() + Dependencies(startedDependencies)
) {
startedDependencies.forEach { deferred ->
@ -72,15 +76,15 @@ public open class ComputationGoal<T>(
}
}
block()
}.also { result = it }
}.also { deferred = it }
}
/**
* Reset the computation
*/
override fun reset() {
result?.cancel()
result = null
deferred?.cancel()
deferred = null
}
}
@ -90,7 +94,7 @@ public open class ComputationGoal<T>(
public fun <T, R> Goal<T>.map(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.(T) -> R,
): Goal<R> = ComputationGoal(coroutineContext, listOf(this)) {
): Goal<R> = LazyGoal(coroutineContext, listOf(this)) {
block(await())
}
@ -100,7 +104,7 @@ public fun <T, R> Goal<T>.map(
public fun <T, R> Collection<Goal<T>>.reduce(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.(Collection<T>) -> R,
): Goal<R> = ComputationGoal(coroutineContext, this) {
): Goal<R> = LazyGoal(coroutineContext, this) {
block(map { run { it.await() } })
}
@ -113,7 +117,7 @@ public fun <T, R> Collection<Goal<T>>.reduce(
public fun <K, T, R> Map<K, Goal<T>>.reduce(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.(Map<K, T>) -> R,
): Goal<R> = ComputationGoal(coroutineContext, this.values) {
): Goal<R> = LazyGoal(coroutineContext, this.values) {
block(mapValues { it.value.await() })
}

View File

@ -15,14 +15,16 @@
*/
package hep.dataforge.data
import hep.dataforge.meta.Meta
import hep.dataforge.meta.get
import hep.dataforge.meta.string
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.collect
import kotlin.reflect.KClass
public interface GroupRule {
public operator fun <T : Any> invoke(node: DataNode<T>): Map<String, DataNode<T>>
public suspend fun <T : Any> gather(dataType: KClass<out T>, set: DataSet<T>): Map<String, DataSet<T>>
public companion object{
public companion object {
/**
* Create grouping rule that creates groups for different values of value
* field with name [key]
@ -31,19 +33,20 @@ public interface GroupRule {
* @param defaultTagValue
* @return
*/
public fun byValue(key: String, defaultTagValue: String): GroupRule = object :
GroupRule {
override fun <T : Any> invoke(node: DataNode<T>): Map<String, DataNode<T>> {
val map = HashMap<String, DataTreeBuilder<T>>()
public fun byValue(scope: CoroutineScope, key: String, defaultTagValue: String): GroupRule =
object : GroupRule {
node.dataSequence().forEach { (name, data) ->
val tagValue = data.meta[key]?.string ?: defaultTagValue
map.getOrPut(tagValue) { DataNode.builder(node.type) }[name] = data
override suspend fun <T : Any> gather(dataType: KClass<out T>, set: DataSet<T>): Map<String, DataSet<T>> {
val map = HashMap<String, MutableDataTree<T>>()
set.flow().collect { data ->
val tagValue = data.meta[key]?.string ?: defaultTagValue
map.getOrPut(tagValue) { MutableDataTree(dataType, scope) }.set(data.name, data.data)
}
return map
}
return map.mapValues { it.value.build() }
}
}
// @ValueDef(key = "byValue", required = true, info = "The name of annotation value by which grouping should be made")
@ -52,17 +55,20 @@ public interface GroupRule {
// def = "default",
// info = "Default value which should be used for content in which the grouping value is not presented"
// )
public fun byMeta(config: Meta): GroupRule {
//TODO expand grouping options
return config["byValue"]?.string?.let {
byValue(
it,
config["defaultValue"]?.string ?: "default"
)
}
?: object : GroupRule {
override fun <T : Any> invoke(node: DataNode<T>): Map<String, DataNode<T>> = mapOf("" to node)
}
}
// public fun byMeta(scope: CoroutineScope, config: Meta): GroupRule {
// //TODO expand grouping options
// return config["byValue"]?.string?.let {
// byValue(
// scope,
// it,
// config["defaultValue"]?.string ?: "default"
// )
// } ?: object : GroupRule {
// override suspend fun <T : Any> gather(
// dataType: KClass<T>,
// source: DataSource<T>,
// ): Map<String, DataSource<T>> = mapOf("" to source)
// }
// }
}
}

View File

@ -2,6 +2,10 @@ package hep.dataforge.data
import hep.dataforge.meta.*
import hep.dataforge.names.Name
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.launch
import kotlin.reflect.KClass
/**
@ -10,7 +14,7 @@ import kotlin.reflect.KClass
public data class ActionEnv(
val name: Name,
val meta: Meta,
val actionMeta: Meta
val actionMeta: Meta,
)
/**
@ -30,20 +34,22 @@ public class MapActionBuilder<T, R>(public var name: Name, public var meta: Meta
public class MapAction<T : Any, out R : Any>(
private val outputType: KClass<out R>,
private val block: MapActionBuilder<T, R>.() -> Unit
public val outputType: KClass<out R>,
private val block: MapActionBuilder<T, R>.() -> Unit,
) : Action<T, R> {
override fun invoke(node: DataNode<T>, meta: Meta): DataNode<R> = DataTree(outputType) {
node.dataSequence().forEach { (name, data) ->
/*
* Creating a new environment for action using **old** name, old meta and task meta
*/
val env = ActionEnv(name, data.meta, meta)
override suspend fun run(
set: DataSet<T>,
meta: Meta,
scope: CoroutineScope,
): DataSet<R> = DataTree.dynamic(outputType, scope) {
suspend fun mapOne(data: NamedData<T>): NamedData<R> {
// Creating a new environment for action using **old** name, old meta and task meta
val env = ActionEnv(data.name, data.meta, meta)
//applying transformation from builder
val builder = MapActionBuilder<T, R>(
name,
data.name,
data.meta.builder(), // using data meta
meta
).apply(block)
@ -56,15 +62,26 @@ public class MapAction<T : Any, out R : Any>(
val newData = data.map(outputType, meta = newMeta) { builder.result(env, it) }
//setting the data node
this[newName] = newData
return newData.named(newName)
}
collectFrom(set.flow().map(::mapOne))
scope.launch {
set.updates.collect { name ->
//clear old nodes
remove(name)
//collect new items
collectFrom(set.flowChildren(name).map(::mapOne))
}
}
}
}
public inline fun <T : Any, reified R : Any> DataNode<T>.map(
public suspend inline fun <T : Any, reified R : Any> DataSet<T>.map(
meta: Meta,
noinline action: MapActionBuilder<in T, out R>.() -> Unit
): DataNode<R> = MapAction(R::class, action).invoke(this, meta)
updatesScope: CoroutineScope,
noinline action: MapActionBuilder<in T, out R>.() -> Unit,
): DataSet<R> = MapAction(R::class, action).run(this, meta, updatesScope)

View File

@ -0,0 +1,159 @@
package hep.dataforge.data
import hep.dataforge.meta.*
import hep.dataforge.names.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlin.reflect.KClass
/**
* A mutable [DataTree.Companion.dynamic]. It
*/
public class MutableDataTree<T : Any>(
override val dataType: KClass<out T>,
public val scope: CoroutineScope,
) : DataTree<T>, DataSetBuilder<T> {
private val mutex = Mutex()
private val treeItems = HashMap<NameToken, DataTreeItem<T>>()
override suspend fun items(): Map<NameToken, DataTreeItem<T>> = mutex.withLock { treeItems }
private val _updates = MutableSharedFlow<Name>()
override val updates: Flow<Name>
get() = _updates
private suspend fun remove(token: NameToken) {
mutex.withLock {
if (treeItems.remove(token) != null) {
_updates.emit(token.asName())
}
}
}
override fun remove(name: Name) {
scope.launch {
if (name.isEmpty()) error("Can't remove the root node")
(getItem(name.cutLast()).tree as? MutableDataTree)?.remove(name.lastOrNull()!!)
}
}
private suspend fun set(token: NameToken, node: DataSet<T>) {
//if (_map.containsKey(token)) error("Tree entry with name $token is not empty")
mutex.withLock {
treeItems[token] = DataTreeItem.Node(node.toMutableTree(scope))
coroutineScope {
node.updates.onEach {
_updates.emit(token + it)
}.launchIn(this)
}
_updates.emit(token.asName())
}
}
private suspend fun set(token: NameToken, data: Data<T>) {
mutex.withLock {
treeItems[token] = DataTreeItem.Leaf(data)
_updates.emit(token.asName())
}
}
private suspend fun getOrCreateNode(token: NameToken): MutableDataTree<T> =
(treeItems[token] as? DataTreeItem.Node<T>)?.tree as? MutableDataTree<T>
?: MutableDataTree(dataType, scope).also { set(token, it) }
private suspend fun getOrCreateNode(name: Name): MutableDataTree<T> {
return when (name.length) {
0 -> this
1 -> getOrCreateNode(name.firstOrNull()!!)
else -> getOrCreateNode(name.firstOrNull()!!).getOrCreateNode(name.cutFirst())
}
}
override fun set(name: Name, data: Data<T>?) {
if (data == null) {
remove(name)
} else {
scope.launch {
when (name.length) {
0 -> error("Can't add data with empty name")
1 -> set(name.firstOrNull()!!, data)
2 -> getOrCreateNode(name.cutLast()).set(name.lastOrNull()!!, data)
}
}
}
}
private suspend fun setTree(name: Name, node: MutableDataTree<out T>) {
when (name.length) {
0 -> error("Can't add data with empty name")
1 -> set(name.firstOrNull()!!, node)
2 -> getOrCreateNode(name.cutLast()).set(name.lastOrNull()!!, node)
}
}
override suspend fun set(name: Name, dataSet: DataSet<T>): Unit {
if (dataSet is MutableDataTree) {
setTree(name, dataSet)
} else {
setTree(name, dataSet.toMutableTree(scope))
}
}
override fun set(name: Name, block: DataSetBuilder<T>.() -> Unit) {
scope.launch {
setTree(name, MutableDataTree(dataType, scope).apply(block))
}
}
public fun collectFrom(flow: Flow<NamedData<T>>) {
flow.onEach {
set(it.name, it.data)
}.launchIn(scope)
}
}
public suspend fun <T : Any> DataTree.Companion.dynamic(
type: KClass<out T>,
updatesScope: CoroutineScope,
block: suspend MutableDataTree<T>.() -> Unit,
): DataTree<T> {
val tree = MutableDataTree(type, updatesScope)
tree.block()
return tree
}
public suspend inline fun <reified T : Any> DataTree.Companion.dynamic(
updatesScope: CoroutineScope,
crossinline block: suspend MutableDataTree<T>.() -> Unit,
): DataTree<T> = MutableDataTree(T::class, updatesScope).apply { block() }
public suspend inline fun <reified T : Any> MutableDataTree<T>.set(
name: Name,
noinline block: suspend MutableDataTree<T>.() -> Unit,
): Unit = set(name, DataTree.dynamic(T::class, scope, block))
public suspend inline fun <reified T : Any> MutableDataTree<T>.set(
name: String,
noinline block: suspend MutableDataTree<T>.() -> Unit,
): Unit = set(name.toName(), DataTree.dynamic(T::class, scope, block))
/**
* Generate a mutable builder from this node. Node content is not changed
*/
public suspend fun <T : Any> DataSet<T>.toMutableTree(
scope: CoroutineScope,
): MutableDataTree<T> = MutableDataTree(dataType, scope).apply {
flow().collect { set(it.name, it.data) }
this@toMutableTree.updates.onEach {
set(it, getData(it))
}.launchIn(scope)
}
public fun <T : Any> MutableDataTree<T>.branch(branchName: Name): MutableDataTree<T> =
(this as DataTree<T>).branch(branchName) as MutableDataTree<T>

View File

@ -1,13 +1,19 @@
package hep.dataforge.data
import hep.dataforge.meta.DFExperimental
import hep.dataforge.meta.Meta
import hep.dataforge.meta.MetaBuilder
import hep.dataforge.names.Name
import hep.dataforge.names.toName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.fold
import kotlin.reflect.KClass
public class JoinGroup<T : Any, R : Any>(public var name: String, internal val node: DataNode<T>) {
@DFExperimental
public class JoinGroup<T : Any, R : Any>(public var name: String, internal val set: DataSet<T>) {
public var meta: MetaBuilder = MetaBuilder()
@ -19,35 +25,44 @@ public class JoinGroup<T : Any, R : Any>(public var name: String, internal val n
}
public class ReduceGroupBuilder<T : Any, R : Any>(public val actionMeta: Meta) {
private val groupRules: MutableList<(DataNode<T>) -> List<JoinGroup<T, R>>> = ArrayList();
@DFExperimental
public class ReduceGroupBuilder<T : Any, R : Any>(
private val inputType: KClass<out T>,
private val scope: CoroutineScope,
public val actionMeta: Meta,
) {
private val groupRules: MutableList<suspend (DataSet<T>) -> List<JoinGroup<T, R>>> = ArrayList();
/**
* introduce grouping by value name
* introduce grouping by meta value
*/
public fun byValue(tag: String, defaultTag: String = "@default", action: JoinGroup<T, R>.() -> Unit) {
groupRules += { node ->
GroupRule.byValue(tag, defaultTag).invoke(node).map {
GroupRule.byValue(scope, tag, defaultTag).gather(inputType, node).map {
JoinGroup<T, R>(it.key, it.value).apply(action)
}
}
}
/**
* Add a single fixed group to grouping rules
*/
public fun group(groupName: String, filter: DataFilter, action: JoinGroup<T, R>.() -> Unit) {
groupRules += { node ->
listOf(
JoinGroup<T, R>(groupName, node.filter(filter)).apply(action)
)
}
}
// /**
// * Add a single fixed group to grouping rules
// */
// public fun group(groupName: String, filter: DataMapper, action: JoinGroup<T, R>.() -> Unit) {
// groupRules += { node ->
// listOf(
// JoinGroup<T, R>(groupName, node.filter(filter)).apply(action)
// )
// }
// }
public fun group(groupName: String, filter: (Name, Data<T>) -> Boolean, action: JoinGroup<T, R>.() -> Unit) {
groupRules += { node ->
public fun group(
groupName: String,
filter: suspend (Name, Data<T>) -> Boolean,
action: JoinGroup<T, R>.() -> Unit,
) {
groupRules += { source ->
listOf(
JoinGroup<T, R>(groupName, node.filter(filter)).apply(action)
JoinGroup<T, R>(groupName, source.filter(filter)).apply(action)
)
}
}
@ -61,27 +76,27 @@ public class ReduceGroupBuilder<T : Any, R : Any>(public val actionMeta: Meta) {
}
}
internal fun buildGroups(input: DataNode<T>): List<JoinGroup<T, R>> {
internal suspend fun buildGroups(input: DataSet<T>): List<JoinGroup<T, R>> {
return groupRules.flatMap { it.invoke(input) }
}
}
/**
* The same rules as for KPipe
*/
@DFExperimental
public class ReduceAction<T : Any, R : Any>(
private val outputType: KClass<out R>,
private val action: ReduceGroupBuilder<T, R>.() -> Unit
) : Action<T, R> {
private val inputType: KClass<out T>,
outputType: KClass<out R>,
private val action: ReduceGroupBuilder<T, R>.() -> Unit,
) : CachingAction<T, R>(outputType) {
//TODO optimize reduction. Currently the whole action recalculates on push
override fun invoke(node: DataNode<T>, meta: Meta): DataNode<R> = DataTree(outputType) {
ReduceGroupBuilder<T, R>(meta).apply(action).buildGroups(node).forEach { group ->
//val laminate = Laminate(group.meta, meta)
val dataMap = group.node.dataSequence().associate { it }
override fun CoroutineScope.transform(set: DataSet<T>, meta: Meta, key: Name): Flow<NamedData<R>> = flow {
ReduceGroupBuilder<T, R>(inputType,this@transform, meta).apply(action).buildGroups(set).forEach { group ->
val dataFlow: Map<Name, Data<T>> = group.set.flow().fold(HashMap()) { acc, value ->
acc.apply {
acc[value.name] = value.data
}
}
val groupName: String = group.name
@ -89,14 +104,13 @@ public class ReduceAction<T : Any, R : Any>(
val env = ActionEnv(groupName.toName(), groupMeta, meta)
val res: ComputationData<R> = dataMap.reduce(
val res: LazyData<R> = dataFlow.reduce(
outputType,
meta = groupMeta
) { group.result.invoke(env, it) }
set(env.name, res)
emit(res.named(env.name))
}
}
}

View File

@ -6,20 +6,24 @@ import hep.dataforge.meta.MetaBuilder
import hep.dataforge.meta.builder
import hep.dataforge.names.Name
import hep.dataforge.names.toName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlin.collections.set
import kotlin.reflect.KClass
public class FragmentRule<T : Any, R : Any>(public val name: Name, public var meta: MetaBuilder) {
public lateinit var result: suspend (T) -> R
public fun result(f: suspend (T) -> R) {
result = f;
}
}
public class SplitBuilder<T : Any, R : Any>(public val name: Name, public val meta: Meta) {
public class FragmentRule<T : Any, R : Any>(public val name: Name, public var meta: MetaBuilder) {
public lateinit var result: suspend (T) -> R
public fun result(f: suspend (T) -> R) {
result = f;
}
}
internal val fragments: MutableMap<Name, FragmentRule<T, R>.() -> Unit> = HashMap()
/**
@ -32,27 +36,40 @@ public class SplitBuilder<T : Any, R : Any>(public val name: Name, public val me
}
}
/**
* Action that splits each incoming element into a number of fragments defined in builder
*/
public class SplitAction<T : Any, R : Any>(
private val outputType: KClass<out R>,
private val action: SplitBuilder<T, R>.() -> Unit
private val action: SplitBuilder<T, R>.() -> Unit,
) : Action<T, R> {
override fun invoke(node: DataNode<T>, meta: Meta): DataNode<R> = DataTree(outputType) {
node.dataSequence().forEach { (name, data) ->
override suspend fun run(
set: DataSet<T>,
meta: Meta,
scope: CoroutineScope,
): DataSet<R> = DataTree.dynamic(outputType, scope) {
suspend fun splitOne(data: NamedData<T>): Flow<NamedData<R>> {
val laminate = Laminate(data.meta, meta)
val split = SplitBuilder<T, R>(name, data.meta).apply(action)
val split = SplitBuilder<T, R>(data.name, data.meta).apply(action)
// apply individual fragment rules to result
split.fragments.forEach { (fragmentName, rule) ->
val env = FragmentRule<T, R>(fragmentName, laminate.builder())
return split.fragments.entries.asFlow().map { (fragmentName, rule) ->
val env = SplitBuilder.FragmentRule<T, R>(fragmentName, laminate.builder()).apply(rule)
data.map(outputType, meta = env.meta) { env.result(it) }.named(fragmentName)
}
}
rule(env)
val res = data.map(outputType, meta = env.meta) { env.result(it) }
set(env.name, res)
collectFrom(set.flow().flatMapConcat(transform = ::splitOne))
scope.launch {
set.updates.collect { name ->
//clear old nodes
remove(name)
//collect new items
collectFrom(set.flowChildren(name).flatMapConcat(transform = ::splitOne))
}
}
}

View File

@ -0,0 +1,79 @@
package hep.dataforge.data
import hep.dataforge.names.*
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.*
import kotlin.reflect.KClass
private class StaticDataTree<T : Any>(
override val dataType: KClass<out T>,
) : DataSetBuilder<T>, DataTree<T> {
private val items: MutableMap<NameToken, DataTreeItem<T>> = HashMap()
override val updates: Flow<Name> = emptyFlow()
override suspend fun items(): Map<NameToken, DataTreeItem<T>> = items
override fun remove(name: Name) {
when (name.length) {
0 -> error("Can't remove root tree node")
1 -> items.remove(name.firstOrNull()!!)
else -> (items[name.firstOrNull()!!].tree as? StaticDataTree<T>)?.remove(name.cutFirst())
}
}
fun getOrCreateNode(name: Name): StaticDataTree<T> = when (name.length) {
0 -> this
1 -> {
val itemName = name.firstOrNull()!!
(items[itemName].tree as? StaticDataTree<T>) ?: StaticDataTree(dataType).also {
items[itemName] = DataTreeItem.Node(it)
}
}
else -> getOrCreateNode(name.cutLast()).getOrCreateNode(name.lastOrNull()!!.asName())
}
private operator fun set(name: Name, item: DataTreeItem<T>?) {
if (name.isEmpty()) error("Can't set top level tree node")
if (item == null) {
remove(name)
} else {
getOrCreateNode(name.cutLast()).items[name.lastOrNull()!!] = item
}
}
override fun set(name: Name, data: Data<T>?) {
set(name, data?.let { DataTreeItem.Leaf(it) })
}
override suspend fun set(name: Name, dataSet: DataSet<T>) {
if (dataSet is StaticDataTree) {
set(name, DataTreeItem.Node(dataSet))
} else {
coroutineScope {
dataSet.flow().collect {
set(name + it.name, it.data)
}
}
}
}
override fun set(name: Name, block: DataSetBuilder<T>.() -> Unit) {
val tree = StaticDataTree(dataType).apply(block)
set(name, DataTreeItem.Node(tree))
}
}
public fun <T : Any> DataTree.Companion.static(
dataType: KClass<out T>,
block: DataSetBuilder<T>.() -> Unit,
): DataTree<T> = StaticDataTree(dataType).apply(block)
public inline fun <reified T : Any> DataTree.Companion.static(
noinline block: DataSetBuilder<T>.() -> Unit,
): DataTree<T> = static(T::class, block)
public suspend fun <T : Any> DataSet<T>.toStaticTree(): DataTree<T> = StaticDataTree(dataType).apply {
update(this@toStaticTree)
}

View File

@ -0,0 +1,20 @@
package hep.dataforge.data
import hep.dataforge.meta.Meta
import hep.dataforge.meta.MetaBuilder
/**
* Get a metadata node for this set if it is present
*/
public suspend fun DataSet<*>.getMeta(): Meta? = getData(DataSet.META_KEY)?.meta
/**
* Add meta-data node to a [DataSet]
*/
public fun DataSetBuilder<*>.meta(meta: Meta): Unit = set(DataSet.META_KEY, Data.empty(meta))
/**
* Add meta-data node to a [DataSet]
*/
public fun DataSetBuilder<*>.meta(metaBuilder: MetaBuilder.() -> Unit): Unit = meta(Meta(metaBuilder))

View File

@ -1,27 +0,0 @@
package hep.dataforge.data
import hep.dataforge.meta.Meta
import hep.dataforge.names.NameToken
import kotlin.reflect.KClass
/**
* A zero-copy data node wrapper that returns only children with appropriate type.
*/
public class TypeFilteredDataNode<out T : Any>(public val origin: DataNode<*>, override val type: KClass<out T>) : DataNode<T> {
override val meta: Meta get() = origin.meta
override val items: Map<NameToken, DataItem<T>> by lazy {
origin.items.mapNotNull { (key, item) ->
when (item) {
is DataItem.Leaf -> {
(item.data.filterIsInstance(type))?.let {
key to DataItem.Leaf(it)
}
}
is DataItem.Node -> {
key to DataItem.Node(item.node.filterIsInstance(type))
}
}
}.associate { it }
}
}

View File

@ -1,9 +1,7 @@
package hep.dataforge.data
import hep.dataforge.meta.Meta
import hep.dataforge.names.NameToken
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import hep.dataforge.names.Name
import hep.dataforge.names.toName
import kotlinx.coroutines.runBlocking
import kotlin.reflect.KClass
import kotlin.reflect.full.isSubclassOf
@ -11,13 +9,8 @@ import kotlin.reflect.full.isSubclassOf
/**
* Block the thread and get data content
*/
public fun <T : Any> Data<T>.get(): T = runBlocking { await() }
public fun <T : Any> Data<T>.value(): T = runBlocking { await() }
/**
* Check that node is compatible with given type meaning that each element could be cast to the type
*/
internal fun <R : Any> DataNode<*>.canCast(type: KClass<out R>): Boolean =
type.isSubclassOf(this.type)
/**
* Check if data could be safely cast to given class
*/
@ -25,88 +18,53 @@ internal fun <R : Any> Data<*>.canCast(type: KClass<out R>): Boolean =
this.type.isSubclassOf(type)
public fun <R : Any, T : R> Data<T>.upcast(type: KClass<out R>): Data<R> {
return object : Data<R> by this {
override val type: KClass<out R> = type
}
}
/**
* Safe upcast a [Data] to a supertype
*/
public inline fun <reified R : Any, T : R> Data<T>.upcast(): Data<R> = upcast(R::class)
public fun <R : Any> DataItem<*>.canCast(type: KClass<out R>): Boolean = when (this) {
is DataItem.Node -> node.canCast(type)
is DataItem.Leaf -> data.canCast(type)
}
/**
* Unsafe cast of data node
*/
@Suppress("UNCHECKED_CAST")
public fun <R : Any> Data<*>.cast(type: KClass<out R>): Data<R> {
if(!canCast(type)) error("Can't cast ${this.type} to $type")
return object : Data<R> {
override val meta: Meta get() = this@cast.meta
override val dependencies: Collection<Goal<*>> get() = this@cast.dependencies
override val result: Deferred<R>? get() = this@cast.result as Deferred<R>?
override fun startAsync(coroutineScope: CoroutineScope): Deferred<R> = this@cast.run {
startAsync(coroutineScope) as Deferred<R>
}
override fun reset() = this@cast.reset()
override val type: KClass<out R> = type
}
}
public inline fun <reified R : Any> Data<*>.cast(): Data<R> = cast(R::class)
@Suppress("UNCHECKED_CAST")
public fun <R : Any> DataNode<*>.cast(type: KClass<out R>): DataNode<R> {
return object : DataNode<R> {
override val meta: Meta get() = this@cast.meta
override val type: KClass<out R> = type
override val items: Map<NameToken, DataItem<R>> get() = this@cast.items as Map<NameToken, DataItem<R>>
}
}
public inline fun <reified R : Any> DataNode<*>.cast(): DataNode<R> = cast(R::class)
/**
* Check that node is compatible with given type meaning that each element could be cast to the type
*/
public fun <T : Any> DataNode<*>.ensureType(type: KClass<out T>) {
if (!canCast(type)) {
error("$type expected, but $type received")
}
}
//public fun <R : Any, T : R> Data<T>.upcast(type: KClass<out R>): Data<R> {
// return object : Data<R> by this {
// override val type: KClass<out R> = type
// }
//}
//
///**
// * Safe upcast a [Data] to a supertype
// */
//public inline fun <reified R : Any, T : R> Data<T>.upcast(): Data<R> = upcast(R::class)
/**
* Cast the node to given type if the cast is possible or return null
*/
public fun <R : Any> Data<*>.filterIsInstance(type: KClass<out R>): Data<R>? =
if (canCast(type)) cast(type) else null
/**
* Filter a node by data and node type. Resulting node and its subnodes is guaranteed to have border type [type],
* but could contain empty nodes
*/
public fun <R : Any> DataNode<*>.filterIsInstance(type: KClass<out R>): DataNode<R> {
return when {
canCast(type) -> cast(type)
this is TypeFilteredDataNode -> origin.filterIsInstance(type)
else -> TypeFilteredDataNode(this, type)
@Suppress("UNCHECKED_CAST")
public fun <R : Any> Data<*>.castOrNull(type: KClass<out R>): Data<R>? =
if (!canCast(type)) null else object : Data<R> by (this as Data<R>) {
override val type: KClass<out R> = type
}
}
/**
* Filter all elements of given data item that could be cast to given type. If no elements are available, return null.
* Unsafe cast of data node
*/
public fun <R : Any> DataItem<*>?.filterIsInstance(type: KClass<out R>): DataItem<R>? = when (this) {
null -> null
is DataItem.Node -> DataItem.Node(this.node.filterIsInstance(type))
is DataItem.Leaf -> this.data.filterIsInstance(type)?.let { DataItem.Leaf(it) }
public fun <R : Any> Data<*>.cast(type: KClass<out R>): Data<R> =
castOrNull(type) ?: error("Can't cast ${this.type} to $type")
public inline fun <reified R : Any> Data<*>.cast(): Data<R> = cast(R::class)
@Suppress("UNCHECKED_CAST")
public fun <R : Any> DataSet<*>.castOrNull(type: KClass<out R>): DataSet<R>? =
if (!canCast(type)) null else object : DataSet<R> by (this as DataSet<R>) {
override val dataType: KClass<out R> = type
}
public fun <R : Any> DataSet<*>.cast(type: KClass<out R>): DataSet<R> =
castOrNull(type) ?: error("Can't cast ${this.dataType} to $type")
/**
* Check that node is compatible with given type meaning that each element could be cast to the type
*/
internal fun <R : Any> DataSet<*>.canCast(type: KClass<out R>): Boolean =
type.isSubclassOf(this.dataType)
public operator fun <T : Any> DataTree<T>.get(name: Name): DataTreeItem<T>? = runBlocking {
getItem(name)
}
public inline fun <reified R : Any> DataItem<*>?.filterIsInstance(): DataItem<R>? = this@filterIsInstance.filterIsInstance(R::class)
public operator fun <T : Any> DataTree<T>.get(name: String): DataTreeItem<T>? = get(name.toName())

View File

@ -0,0 +1,29 @@
package hep.dataforge.data
import hep.dataforge.names.Name
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
import kotlin.reflect.KClass
/**
 * A dynamic view of this [DataSet] exposing only the elements that can be cast to [type].
 *
 * The view is lazy: filtering happens on each [DataSet.flow]/[DataSet.getData] call, and
 * [DataSet.updates] only forwards names whose current datum passes the type check.
 */
public fun <R : Any> DataSet<*>.filterIsInstance(type: KClass<out R>): DataSet<R> = object : DataSet<R> {
    override val dataType: KClass<out R> = type

    @Suppress("UNCHECKED_CAST")
    override fun flow(): Flow<NamedData<R>> = this@filterIsInstance.flow()
        .filter { datum -> datum.canCast(type) }
        .map { datum -> datum as NamedData<R> }

    override suspend fun getData(name: Name): Data<R>? =
        this@filterIsInstance.getData(name)?.castOrNull(type)

    // Only propagate update notifications for names whose datum currently matches [type].
    override val updates: Flow<Name> = this@filterIsInstance.updates.filter { name ->
        this@filterIsInstance.getData(name)?.canCast(type) ?: false
    }
}
/**
 * Select all elements of this [DataSet] that can be cast to the reified type [R];
 * delegates to the [KClass] overload.
 */
public inline fun <reified R : Any> DataSet<*>.filterIsInstance(): DataSet<R> {
    return filterIsInstance(R::class)
}

View File

@ -1,5 +1,6 @@
package hep.dataforge.data
import kotlinx.coroutines.runBlocking
import kotlin.test.Test
import kotlin.test.assertTrue
@ -7,23 +8,24 @@ import kotlin.test.assertTrue
internal class DataTreeBuilderTest{
@Test
fun testDataUpdate(){
val updateData = DataTree<Any>{
val updateData: DataTree<Any> = DataTree.static{
"update" put {
"a" put Data.static("a")
"b" put Data.static("b")
}
}
val node = DataTree<Any>{
node("primary"){
static("a","a")
static("b","b")
val node = DataTree.static<Any>{
set("primary"){
data("a","a")
data("b","b")
}
data("root","root")
runBlocking {
update(updateData)
}
static("root","root")
update(updateData)
}
println(node.toMeta())
assertTrue { node["update.a"] != null }
assertTrue { node["primary.a"] != null }

View File

@ -22,10 +22,10 @@ import net.mamoe.yamlkt.*
public fun Meta.toYaml(): YamlMap {
val map: Map<String, Any?> = items.entries.associate { (key, item) ->
key.toString() to when (item) {
is ValueItem -> {
is MetaItemValue -> {
item.value.value
}
is NodeItem -> {
is MetaItemNode -> {
item.node.toYaml()
}
}
@ -53,7 +53,7 @@ private class YamlMeta(private val yamlMap: YamlMap, private val descriptor: Nod
(it as YamlLiteral).content.parseValue()
}
)
map[token] = ValueItem(listValue)
map[token] = MetaItemValue(listValue)
} else value.forEachIndexed { index, yamlElement ->
val indexKey = (itemDescriptor as? NodeDescriptor)?.indexKey ?: ItemDescriptor.DEFAULT_INDEX_KEY
val indexValue: String = (yamlElement as? YamlMap)?.getStringOrNull(indexKey)

View File

@ -19,7 +19,7 @@ public object BinaryMetaFormat : MetaFormat, MetaFormatFactory {
override fun invoke(meta: Meta, context: Context): MetaFormat = this
override fun readMeta(input: Input, descriptor: NodeDescriptor?): Meta {
return (input.readMetaItem() as NodeItem).node
return (input.readMetaItem() as MetaItemNode).node
}
private fun Output.writeChar(char: Char) = writeByte(char.toByte())
@ -85,10 +85,10 @@ public object BinaryMetaFormat : MetaFormat, MetaFormatFactory {
meta.items.forEach { (key, item) ->
output.writeString(key.toString())
when (item) {
is ValueItem -> {
is MetaItemValue -> {
output.writeValue(item.value)
}
is NodeItem -> {
is MetaItemNode -> {
writeObject(output, item.node)
}
}
@ -103,19 +103,19 @@ public object BinaryMetaFormat : MetaFormat, MetaFormatFactory {
@Suppress("UNCHECKED_CAST")
public fun Input.readMetaItem(): TypedMetaItem<MetaBuilder> {
return when (val keyChar = readByte().toChar()) {
'S' -> ValueItem(StringValue(readString()))
'N' -> ValueItem(Null)
'+' -> ValueItem(True)
'-' -> ValueItem(True)
's' -> ValueItem(NumberValue(readShort()))
'i' -> ValueItem(NumberValue(readInt()))
'l' -> ValueItem(NumberValue(readInt()))
'f' -> ValueItem(NumberValue(readFloat()))
'd' -> ValueItem(NumberValue(readDouble()))
'S' -> MetaItemValue(StringValue(readString()))
'N' -> MetaItemValue(Null)
'+' -> MetaItemValue(True)
'-' -> MetaItemValue(True)
's' -> MetaItemValue(NumberValue(readShort()))
'i' -> MetaItemValue(NumberValue(readInt()))
'l' -> MetaItemValue(NumberValue(readInt()))
'f' -> MetaItemValue(NumberValue(readFloat()))
'd' -> MetaItemValue(NumberValue(readDouble()))
'L' -> {
val length = readInt()
val list = (1..length).map { (readMetaItem() as ValueItem).value }
ValueItem(Value.of(list))
val list = (1..length).map { (readMetaItem() as MetaItemValue).value }
MetaItemValue(Value.of(list))
}
'M' -> {
val length = readInt()
@ -126,7 +126,7 @@ public object BinaryMetaFormat : MetaFormat, MetaFormatFactory {
set(name, item)
}
}
NodeItem(meta)
MetaItemNode(meta)
}
else -> error("Unknown serialization key character: $keyChar")
}

View File

@ -3,12 +3,13 @@ package hep.dataforge.io
import hep.dataforge.context.Context
import hep.dataforge.io.EnvelopeFormatFactory.Companion.ENVELOPE_FORMAT_TYPE
import hep.dataforge.meta.Meta
import hep.dataforge.misc.Type
import hep.dataforge.names.Name
import hep.dataforge.names.asName
import hep.dataforge.type.Type
import kotlinx.io.Input
import kotlinx.io.Output
import kotlin.reflect.KClass
import kotlin.reflect.KType
import kotlin.reflect.typeOf
/**
* A partially read envelope with meta, but without data
@ -16,6 +17,8 @@ import kotlin.reflect.KClass
public data class PartialEnvelope(val meta: Meta, val dataOffset: UInt, val dataSize: ULong?)
public interface EnvelopeFormat : IOFormat<Envelope> {
override val type: KType get() = typeOf<Envelope>()
public val defaultMetaFormat: MetaFormatFactory get() = JsonMetaFormat
public fun readPartial(input: Input): PartialEnvelope
@ -37,7 +40,7 @@ public fun EnvelopeFormat.read(input: Input): Envelope = readObject(input)
@Type(ENVELOPE_FORMAT_TYPE)
public interface EnvelopeFormatFactory : IOFormatFactory<Envelope>, EnvelopeFormat {
override val name: Name get() = "envelope".asName()
override val type: KClass<out Envelope> get() = Envelope::class
override val type: KType get() = typeOf<Envelope>()
override fun invoke(meta: Meta, context: Context): EnvelopeFormat

View File

@ -2,25 +2,28 @@ package hep.dataforge.io
import hep.dataforge.context.Context
import hep.dataforge.context.Factory
import hep.dataforge.context.Named
import hep.dataforge.io.IOFormat.Companion.NAME_KEY
import hep.dataforge.io.IOFormatFactory.Companion.IO_FORMAT_TYPE
import hep.dataforge.meta.Meta
import hep.dataforge.meta.MetaItemValue
import hep.dataforge.meta.MetaRepr
import hep.dataforge.meta.ValueItem
import hep.dataforge.misc.Named
import hep.dataforge.misc.Type
import hep.dataforge.names.Name
import hep.dataforge.names.asName
import hep.dataforge.type.Type
import hep.dataforge.values.Value
import kotlinx.io.*
import kotlinx.io.buffer.Buffer
import kotlinx.io.pool.ObjectPool
import kotlin.reflect.KClass
import kotlin.reflect.KType
import kotlin.reflect.typeOf
/**
* And interface for reading and writing objects into with IO streams
*/
public interface IOFormat<T : Any> : MetaRepr {
public val type: KType
public fun writeObject(output: Output, obj: T)
public fun readObject(input: Input): T
@ -42,10 +45,14 @@ public fun <T : Any> Binary.readWith(format: IOFormat<T>): T = read {
public fun <T : Any> Output.writeWith(format: IOFormat<T>, obj: T): Unit =
format.run { writeObject(this@writeWith, obj) }
public class ListIOFormat<T : Any>(public val format: IOFormat<T>) : IOFormat<List<T>> {
public inline fun <reified T : Any> IOFormat.Companion.listOf(
format: IOFormat<T>,
): IOFormat<List<T>> = object : IOFormat<List<T>> {
override val type: KType = typeOf<List<T>>()
override fun writeObject(output: Output, obj: List<T>) {
output.writeInt(obj.size)
this.format.run {
format.run {
obj.forEach {
writeObject(output, it)
}
@ -63,9 +70,8 @@ public class ListIOFormat<T : Any>(public val format: IOFormat<T>) : IOFormat<Li
NAME_KEY put "list"
"contentFormat" put format.toMeta()
}
}
//public val <T : Any> IOFormat<T>.list: ListIOFormat<T> get() = ListIOFormat(this)
}
public fun ObjectPool<Buffer>.fill(block: Buffer.() -> Unit): Buffer {
val buffer = borrow()
@ -82,7 +88,7 @@ public interface IOFormatFactory<T : Any> : Factory<IOFormat<T>>, Named, MetaRep
/**
* Explicit type for dynamic type checks
*/
public val type: KClass<out T>
public val type: KType
override fun toMeta(): Meta = Meta {
NAME_KEY put name.toString()
@ -100,7 +106,7 @@ public object DoubleIOFormat : IOFormat<Double>, IOFormatFactory<Double> {
override val name: Name = "double".asName()
override val type: KClass<out Double> get() = Double::class
override val type: KType get() = typeOf<Double>()
override fun writeObject(output: Output, obj: kotlin.Double) {
output.writeDouble(obj)
@ -114,14 +120,14 @@ public object ValueIOFormat : IOFormat<Value>, IOFormatFactory<Value> {
override val name: Name = "value".asName()
override val type: KClass<out Value> get() = Value::class
override val type: KType get() = typeOf<Value>()
override fun writeObject(output: Output, obj: Value) {
BinaryMetaFormat.run { output.writeValue(obj) }
}
override fun readObject(input: Input): Value {
return (BinaryMetaFormat.run { input.readMetaItem() } as? ValueItem)?.value
return (BinaryMetaFormat.run { input.readMetaItem() } as? MetaItemValue)?.value
?: error("The item is not a value")
}
}

View File

@ -16,12 +16,16 @@ import kotlinx.io.text.readUtf8String
import kotlinx.io.text.writeUtf8String
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonObject
import kotlin.reflect.KType
import kotlin.reflect.typeOf
/**
* A Json format for Meta representation
*/
public class JsonMetaFormat(private val json: Json = DEFAULT_JSON) : MetaFormat {
override val type: KType get() = typeOf<Meta>()
override fun writeMeta(output: Output, meta: Meta, descriptor: NodeDescriptor?) {
val jsonObject = meta.toJson(descriptor)
output.writeUtf8String(json.encodeToString(JsonObject.serializer(), jsonObject))

View File

@ -4,20 +4,22 @@ import hep.dataforge.context.Context
import hep.dataforge.io.MetaFormatFactory.Companion.META_FORMAT_TYPE
import hep.dataforge.meta.Meta
import hep.dataforge.meta.descriptors.NodeDescriptor
import hep.dataforge.misc.Type
import hep.dataforge.names.Name
import hep.dataforge.names.asName
import hep.dataforge.names.plus
import hep.dataforge.type.Type
import kotlinx.io.ByteArrayInput
import kotlinx.io.Input
import kotlinx.io.Output
import kotlinx.io.use
import kotlin.reflect.KClass
import kotlin.reflect.KType
import kotlin.reflect.typeOf
/**
* A format for meta serialization
*/
public interface MetaFormat : IOFormat<Meta> {
override val type: KType get() = typeOf<Meta>()
override fun writeObject(output: Output, obj: Meta) {
writeMeta(output, obj, null)
@ -40,7 +42,7 @@ public interface MetaFormatFactory : IOFormatFactory<Meta>, MetaFormat {
override val name: Name get() = "meta".asName() + shortName
override val type: KClass<out Meta> get() = Meta::class
override val type: KType get() = typeOf<Meta>()
public val key: Short get() = name.hashCode().toShort()

View File

@ -8,7 +8,8 @@ import kotlinx.io.*
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardOpenOption
import kotlin.reflect.full.isSuperclassOf
import kotlin.reflect.full.isSupertypeOf
import kotlin.reflect.typeOf
import kotlin.streams.asSequence
public fun <R> Path.read(block: Input.() -> R): R = asBinary().read(block = block)
@ -59,7 +60,7 @@ public fun Path.readEnvelope(format: EnvelopeFormat): Envelope {
@Suppress("UNCHECKED_CAST")
@DFExperimental
public inline fun <reified T : Any> IOPlugin.resolveIOFormat(): IOFormat<T>? {
return ioFormatFactories.find { it.type.isSuperclassOf(T::class) } as IOFormat<T>?
return ioFormatFactories.find { it.type.isSupertypeOf(typeOf<T>())} as IOFormat<T>?
}
/**
@ -115,18 +116,18 @@ public fun IOPlugin.writeMetaFile(
* Return inferred [EnvelopeFormat] if only one format could read given file. If no format accepts file, return null. If
* multiple formats accepts file, throw an error.
*/
public fun IOPlugin.peekBinaryFormat(path: Path): EnvelopeFormat? {
public fun IOPlugin.peekFileEnvelopeFormat(path: Path): EnvelopeFormat? {
val binary = path.asBinary()
val formats = envelopeFormatFactories.mapNotNull { factory ->
binary.read {
factory.peekFormat(this@peekBinaryFormat, this@read)
factory.peekFormat(this@peekFileEnvelopeFormat, this@read)
}
}
return when (formats.size) {
0 -> null
1 -> formats.first()
else -> error("Envelope format binary recognition clash")
else -> error("Envelope format binary recognition clash: $formats")
}
}
@ -137,10 +138,10 @@ public val IOPlugin.Companion.DATA_FILE_NAME: String get() = "@data"
* Read and envelope from file if the file exists, return null if file does not exist.
*
* If file is directory, then expect two files inside:
* * **meta.<format name>** for meta
* * **meta.<meta format extension>** for meta
* * **data** for data
*
* If the file is envelope read it using [EnvelopeFormatFactory.peekFormat] functionality to infer format.
* If the file is envelope read it using [EnvelopeFormatFactory.peekFormat] functionality to infer format (if not overridden with [formatPicker]).
*
* If the file is not an envelope and [readNonEnvelopes] is true, return an Envelope without meta, using file as binary.
*
@ -150,7 +151,7 @@ public val IOPlugin.Companion.DATA_FILE_NAME: String get() = "@data"
public fun IOPlugin.readEnvelopeFile(
path: Path,
readNonEnvelopes: Boolean = false,
formatPeeker: IOPlugin.(Path) -> EnvelopeFormat? = IOPlugin::peekBinaryFormat,
formatPicker: IOPlugin.(Path) -> EnvelopeFormat? = IOPlugin::peekFileEnvelopeFormat,
): Envelope? {
if (!Files.exists(path)) return null
@ -177,7 +178,7 @@ public fun IOPlugin.readEnvelopeFile(
return SimpleEnvelope(meta, data)
}
return formatPeeker(path)?.let { format ->
return formatPicker(path)?.let { format ->
path.readEnvelope(format)
} ?: if (readNonEnvelopes) { // if no format accepts file, read it as binary
SimpleEnvelope(Meta.EMPTY, path.asBinary())

View File

@ -56,12 +56,12 @@ public class Config() : AbstractMutableMeta<Config>(), ObservableItemProvider {
override fun replaceItem(key: NameToken, oldItem: TypedMetaItem<Config>?, newItem: TypedMetaItem<Config>?) {
if (newItem == null) {
children.remove(key)
if (oldItem != null && oldItem is NodeItem<Config>) {
if (oldItem != null && oldItem is MetaItemNode<Config>) {
oldItem.node.removeListener(this)
}
} else {
children[key] = newItem
if (newItem is NodeItem) {
if (newItem is MetaItemNode) {
newItem.node.onChange(this) { name, oldChild, newChild ->
itemChanged(key + name, oldChild, newChild)
}
@ -102,8 +102,8 @@ public fun Meta.toConfig(): Config = Config().also { builder ->
this.items.mapValues { entry ->
val item = entry.value
builder[entry.key.asName()] = when (item) {
is ValueItem -> item.value
is NodeItem -> NodeItem(item.node.asConfig())
is MetaItemValue -> item.value
is MetaItemNode -> MetaItemNode(item.node.asConfig())
}
}
}

View File

@ -17,7 +17,7 @@ public fun interface ItemProvider {
/**
* Perform recursive item search using given [name]. Each [NameToken] is treated as a name in [Meta.items] of a parent node.
*
* If [name] is empty return current [Meta] as a [NodeItem]
* If [name] is empty return current [Meta] as a [MetaItemNode]
*/
public operator fun ItemProvider?.get(name: Name): MetaItem? = this?.getItem(name)

View File

@ -38,10 +38,10 @@ private fun Meta.toJsonWithIndex(descriptor: NodeDescriptor?, indexValue: String
val elementMap = HashMap<String, JsonElement>()
fun MetaItem.toJsonElement(itemDescriptor: ItemDescriptor?, index: String?): JsonElement = when (this) {
is ValueItem -> {
is MetaItemValue -> {
value.toJson(itemDescriptor as? ValueDescriptor)
}
is NodeItem -> {
is MetaItemNode -> {
node.toJsonWithIndex(itemDescriptor as? NodeDescriptor, index)
}
}
@ -99,11 +99,11 @@ public fun JsonPrimitive.toValue(descriptor: ValueDescriptor?): Value {
public fun JsonElement.toMetaItem(descriptor: ItemDescriptor? = null): TypedMetaItem<JsonMeta> = when (this) {
is JsonPrimitive -> {
val value = this.toValue(descriptor as? ValueDescriptor)
ValueItem(value)
MetaItemValue(value)
}
is JsonObject -> {
val meta = JsonMeta(this, descriptor as? NodeDescriptor)
NodeItem(meta)
MetaItemNode(meta)
}
is JsonArray -> {
if (this.all { it is JsonPrimitive }) {
@ -115,7 +115,7 @@ public fun JsonElement.toMetaItem(descriptor: ItemDescriptor? = null): TypedMeta
(it as JsonPrimitive).toValue(descriptor as? ValueDescriptor)
}.asValue()
}
ValueItem(value)
MetaItemValue(value)
} else {
//We can't return multiple items therefore we create top level node
buildJsonObject { put(JSON_ARRAY_KEY, this@toMetaItem) }.toMetaItem(descriptor)
@ -136,10 +136,10 @@ public class JsonMeta(private val json: JsonObject, private val descriptor: Node
val itemDescriptor = descriptor?.items?.get(jsonKey)
when (value) {
is JsonPrimitive -> {
map[key] = ValueItem(value.toValue(itemDescriptor as? ValueDescriptor))
map[key] = MetaItemValue(value.toValue(itemDescriptor as? ValueDescriptor))
}
is JsonObject -> {
map[key] = NodeItem(
map[key] = MetaItemNode(
JsonMeta(
value,
itemDescriptor as? NodeDescriptor
@ -153,7 +153,7 @@ public class JsonMeta(private val json: JsonObject, private val descriptor: Node
(it as JsonPrimitive).toValue(itemDescriptor as? ValueDescriptor)
}
)
map[key] = ValueItem(listValue)
map[key] = MetaItemValue(listValue)
} else value.forEachIndexed { index, jsonElement ->
val indexKey = (itemDescriptor as? NodeDescriptor)?.indexKey ?: DEFAULT_INDEX_KEY
val indexValue: String = (jsonElement as? JsonObject)

View File

@ -44,11 +44,11 @@ public class Laminate(layers: List<Meta>) : MetaBase() {
private fun Sequence<MetaItem>.merge(): TypedMetaItem<SealedMeta> {
return when {
all { it is ValueItem } -> //If all items are values, take first
all { it is MetaItemValue } -> //If all items are values, take first
first().seal()
all { it is NodeItem } -> {
all { it is MetaItemNode } -> {
//list nodes in item
val nodes = map { (it as NodeItem).node }
val nodes = map { (it as MetaItemNode).node }
//represent as key->value entries
val entries = nodes.flatMap { it.items.entries.asSequence() }
//group by keys
@ -57,13 +57,13 @@ public class Laminate(layers: List<Meta>) : MetaBase() {
val items = groups.mapValues { entry ->
entry.value.asSequence().map { it.value }.merge()
}
NodeItem(SealedMeta(items))
MetaItemNode(SealedMeta(items))
}
else -> map {
when (it) {
is ValueItem -> NodeItem(Meta { Meta.VALUE_KEY put it.value })
is NodeItem -> it
is MetaItemValue -> MetaItemNode(Meta { Meta.VALUE_KEY put it.value })
is MetaItemNode -> it
}
}.merge()
}

View File

@ -16,8 +16,8 @@ public interface MetaRepr {
/**
* Generic meta tree representation. Elements are [TypedMetaItem] objects that could be represented by three different entities:
* * [ValueItem] (leaf)
* * [NodeItem] single node
* * [MetaItemValue] (leaf)
* * [MetaItemNode] single node
*
* * Same name siblings are supported via elements with the same [Name] but different queries
*/
@ -28,7 +28,7 @@ public interface Meta : MetaRepr, ItemProvider {
public val items: Map<NameToken, MetaItem>
override fun getItem(name: Name): MetaItem? {
if (name.isEmpty()) return NodeItem(this)
if (name.isEmpty()) return MetaItemNode(this)
return name.firstOrNull()?.let { token ->
val tail = name.cutFirst()
when (tail.length) {
@ -68,8 +68,8 @@ public operator fun Meta.get(token: NameToken): MetaItem? = items.get(token)
public fun Meta.valueSequence(): Sequence<Pair<Name, Value>> {
return items.asSequence().flatMap { (key, item) ->
when (item) {
is ValueItem -> sequenceOf(key.asName() to item.value)
is NodeItem -> item.node.valueSequence().map { pair -> (key.asName() + pair.first) to pair.second }
is MetaItemValue -> sequenceOf(key.asName() to item.value)
is MetaItemNode -> item.node.valueSequence().map { pair -> (key.asName() + pair.first) to pair.second }
}
}
}
@ -80,7 +80,7 @@ public fun Meta.valueSequence(): Sequence<Pair<Name, Value>> {
public fun Meta.itemSequence(): Sequence<Pair<Name, MetaItem>> = sequence {
items.forEach { (key, item) ->
yield(key.asName() to item)
if (item is NodeItem) {
if (item is MetaItemNode) {
yieldAll(item.node.itemSequence().map { (innerKey, innerItem) ->
(key + innerKey) to innerItem
})

View File

@ -71,7 +71,7 @@ public class MetaBuilder : AbstractMutableMeta<MetaBuilder>() {
set(this,value.toList())
}
public infix fun String.put(metaBuilder: MetaBuilder.() -> Unit) {
public inline infix fun String.put(metaBuilder: MetaBuilder.() -> Unit) {
this@MetaBuilder[this] = MetaBuilder().apply(metaBuilder)
}
@ -126,8 +126,8 @@ public fun Meta.builder(): MetaBuilder {
items.mapValues { entry ->
val item = entry.value
builder[entry.key.asName()] = when (item) {
is ValueItem -> item.value
is NodeItem -> NodeItem(item.node.builder())
is MetaItemValue -> item.value
is MetaItemNode -> MetaItemNode(item.node.builder())
}
}
}

View File

@ -5,8 +5,8 @@ import kotlinx.serialization.Serializable
/**
* A member of the meta tree. Could be represented as one of following:
* * a [ValueItem] (leaf)
* * a [NodeItem] (node)
* * a [MetaItemValue] (leaf)
* * a [MetaItemNode] (node)
*/
@Serializable(MetaItemSerializer::class)
public sealed class TypedMetaItem<out M : Meta>() {
@ -31,11 +31,11 @@ public sealed class TypedMetaItem<out M : Meta>() {
public typealias MetaItem = TypedMetaItem<*>
@Serializable(MetaItemSerializer::class)
public class ValueItem(public val value: Value) : TypedMetaItem<Nothing>() {
public class MetaItemValue(public val value: Value) : TypedMetaItem<Nothing>() {
override fun toString(): String = value.toString()
override fun equals(other: Any?): Boolean {
return this.value == (other as? ValueItem)?.value
return this.value == (other as? MetaItemValue)?.value
}
override fun hashCode(): Int {
@ -44,25 +44,25 @@ public class ValueItem(public val value: Value) : TypedMetaItem<Nothing>() {
}
@Serializable(MetaItemSerializer::class)
public class NodeItem<M : Meta>(public val node: M) : TypedMetaItem<M>() {
public class MetaItemNode<M : Meta>(public val node: M) : TypedMetaItem<M>() {
//Fixing serializer for node could cause class cast problems, but it should not since Meta descendants are not serializable
override fun toString(): String = node.toString()
override fun equals(other: Any?): Boolean = node == (other as? NodeItem<*>)?.node
override fun equals(other: Any?): Boolean = node == (other as? MetaItemNode<*>)?.node
override fun hashCode(): Int = node.hashCode()
}
public fun Value.asMetaItem(): ValueItem = ValueItem(this)
public fun <M : Meta> M.asMetaItem(): NodeItem<M> = NodeItem(this)
public fun Value.asMetaItem(): MetaItemValue = MetaItemValue(this)
public fun <M : Meta> M.asMetaItem(): MetaItemNode<M> = MetaItemNode(this)
/**
* Unsafe methods to access values and nodes directly from [TypedMetaItem]
*/
public val MetaItem?.value: Value?
get() = (this as? ValueItem)?.value
?: (this?.node?.get(Meta.VALUE_KEY) as? ValueItem)?.value
get() = (this as? MetaItemValue)?.value
?: (this?.node?.get(Meta.VALUE_KEY) as? MetaItemValue)?.value
public val MetaItem?.string: String? get() = value?.string
public val MetaItem?.boolean: Boolean? get() = value?.boolean
@ -73,17 +73,18 @@ public val MetaItem?.int: Int? get() = number?.toInt()
public val MetaItem?.long: Long? get() = number?.toLong()
public val MetaItem?.short: Short? get() = number?.toShort()
public inline fun <reified E : Enum<E>> MetaItem?.enum(): E? = if (this is ValueItem && this.value is EnumValue<*>) {
this.value.value as E
} else {
string?.let { enumValueOf<E>(it) }
}
public inline fun <reified E : Enum<E>> MetaItem?.enum(): E? =
if (this is MetaItemValue && this.value is EnumValue<*>) {
this.value.value as E
} else {
string?.let { enumValueOf<E>(it) }
}
public val MetaItem.stringList: List<String>? get() = value?.list?.map { it.string }
public val <M : Meta> TypedMetaItem<M>?.node: M?
get() = when (this) {
null -> null
is ValueItem -> null//error("Trying to interpret value meta item as node item")
is NodeItem -> node
is MetaItemValue -> null//error("Trying to interpret value meta item as node item")
is MetaItemNode -> node
}

View File

@ -38,10 +38,10 @@ public object MetaItemSerializer : KSerializer<MetaItem> {
override fun serialize(encoder: Encoder, value: MetaItem) {
encoder.encodeStructure(descriptor) {
encodeBooleanElement(descriptor, 0, value is NodeItem)
encodeBooleanElement(descriptor, 0, value is MetaItemNode)
when (value) {
is ValueItem -> encodeSerializableElement(descriptor, 1, ValueSerializer, value.value)
is NodeItem -> encodeSerializableElement(descriptor, 1, MetaSerializer, value.node)
is MetaItemValue -> encodeSerializableElement(descriptor, 1, ValueSerializer, value.value)
is MetaItemNode -> encodeSerializableElement(descriptor, 1, MetaSerializer, value.node)
}
}
}

View File

@ -119,7 +119,7 @@ public fun MutableItemProvider.node(key: Name? = null): ReadWriteProperty<Any?,
)
public inline fun <reified M : MutableMeta<M>> M.node(key: Name? = null): ReadWriteProperty<Any?, M?> =
item(key).convert(reader = { it?.let { it.node as M } }, writer = { it?.let { NodeItem(it) } })
item(key).convert(reader = { it?.let { it.node as M } }, writer = { it?.let { MetaItemNode(it) } })
/* Number delegates */

View File

@ -64,7 +64,7 @@ public fun MutableItemProvider.setIndexed(
metas: Iterable<Meta>,
indexFactory: (Meta, index: Int) -> String = { _, index -> index.toString() },
) {
setIndexedItems(name, metas.map { NodeItem(it) }) { item, index -> indexFactory(item.node!!, index) }
setIndexedItems(name, metas.map { MetaItemNode(it) }) { item, index -> indexFactory(item.node!!, index) }
}
public operator fun MutableItemProvider.set(name: Name, metas: Iterable<Meta>): Unit = setIndexed(name, metas)

View File

@ -30,8 +30,8 @@ public abstract class AbstractMutableMeta<M : MutableMeta<M>> : AbstractTypedMet
private fun wrapItem(item: MetaItem?): TypedMetaItem<M>? = when (item) {
null -> null
is ValueItem -> item
is NodeItem -> NodeItem(wrapNode(item.node))
is MetaItemValue -> item
is MetaItemNode -> MetaItemNode(wrapNode(item.node))
}
/**
@ -56,7 +56,7 @@ public abstract class AbstractMutableMeta<M : MutableMeta<M>> : AbstractTypedMet
val token = name.firstOrNull()!!
//get existing or create new node. Query is ignored for new node
if (items[token] == null) {
replaceItem(token, null, NodeItem(empty()))
replaceItem(token, null, MetaItemNode(empty()))
}
items[token]?.node!!.set(name.cutFirst(), item)
}
@ -87,7 +87,7 @@ public fun MutableItemProvider.append(name: String, value: Any?): Unit = append(
public fun <M : AbstractMutableMeta<M>> M.edit(name: Name, builder: M.() -> Unit) {
val item = when (val existingItem = get(name)) {
null -> empty().also { set(name, it) }
is NodeItem<M> -> existingItem.node
is MetaItemNode<M> -> existingItem.node
else -> error("Can't edit value meta item")
}
item.apply(builder)

View File

@ -18,6 +18,6 @@ public fun Meta.seal(): SealedMeta = this as? SealedMeta ?: SealedMeta(items.map
@Suppress("UNCHECKED_CAST")
public fun MetaItem.seal(): TypedMetaItem<SealedMeta> = when (this) {
is ValueItem -> this
is NodeItem -> NodeItem(node.seal())
is MetaItemValue -> this
is MetaItemNode -> MetaItemNode(node.seal())
}

View File

@ -24,16 +24,16 @@ private class DescriptorMeta(val descriptor: NodeDescriptor) : Meta, MetaBase()
public fun NodeDescriptor.defaultMeta(): Laminate = Laminate(default, DescriptorMeta(this))
/**
* Build a default [NodeItem] from this node descriptor
* Build a default [MetaItemNode] from this node descriptor
*/
internal fun NodeDescriptor.defaultItem(): NodeItem<*> =
NodeItem(defaultMeta())
internal fun NodeDescriptor.defaultItem(): MetaItemNode<*> =
MetaItemNode(defaultMeta())
/**
* Build a default [ValueItem] from this descriptor
* Build a default [MetaItemValue] from this descriptor
*/
internal fun ValueDescriptor.defaultItem(): ValueItem? {
return ValueItem(default ?: return null)
internal fun ValueDescriptor.defaultItem(): MetaItemValue? {
return MetaItemValue(default ?: return null)
}
/**

View File

@ -279,7 +279,7 @@ public class ValueDescriptor(config: Config = Config()) : ItemDescriptor(config)
}
},
writer = {
ValueItem(it.asValue())
MetaItemValue(it.asValue())
}
)

View File

@ -11,8 +11,8 @@ import hep.dataforge.values.Value
public fun Meta.toMap(descriptor: NodeDescriptor? = null): Map<String, Any?> {
return items.entries.associate { (token, item) ->
token.toString() to when (item) {
is NodeItem -> item.node.toMap()
is ValueItem -> item.value.value
is MetaItemNode -> item.node.toMap()
is MetaItemValue -> item.value.value
}
}
}
@ -26,15 +26,15 @@ public fun Map<String, Any?>.toMeta(descriptor: NodeDescriptor? = null): Meta =
@Suppress("UNCHECKED_CAST")
fun toItem(value: Any?): MetaItem = when (value) {
is MetaItem -> value
is Meta -> NodeItem(value)
is Map<*, *> -> NodeItem((value as Map<String, Any?>).toMeta())
else -> ValueItem(Value.of(value))
is Meta -> MetaItemNode(value)
is Map<*, *> -> MetaItemNode((value as Map<String, Any?>).toMeta())
else -> MetaItemValue(Value.of(value))
}
entries.forEach { (key, value) ->
if (value is List<*>) {
val items = value.map { toItem(it) }
if (items.all { it is ValueItem }) {
if (items.all { it is MetaItemValue }) {
set(key, ListValue(items.map { it.value!! }))
} else {
setIndexedItems(key.toName(), value.map { toItem(it) })

View File

@ -19,90 +19,90 @@ public interface MetaConverter<T : Any> {
public val meta: MetaConverter<Meta> = object : MetaConverter<Meta> {
override fun itemToObject(item: MetaItem): Meta = when (item) {
is NodeItem -> item.node
is ValueItem -> item.value.toMeta()
is MetaItemNode -> item.node
is MetaItemValue -> item.value.toMeta()
}
override fun objectToMetaItem(obj: Meta): MetaItem = NodeItem(obj)
override fun objectToMetaItem(obj: Meta): MetaItem = MetaItemNode(obj)
}
public val value: MetaConverter<Value> = object : MetaConverter<Value> {
override fun itemToObject(item: MetaItem): Value = when (item) {
is NodeItem -> item.node[Meta.VALUE_KEY].value ?: error("Can't convert node to a value")
is ValueItem -> item.value
is MetaItemNode -> item.node[Meta.VALUE_KEY].value ?: error("Can't convert node to a value")
is MetaItemValue -> item.value
}
override fun objectToMetaItem(obj: Value): MetaItem = ValueItem(obj)
override fun objectToMetaItem(obj: Value): MetaItem = MetaItemValue(obj)
}
public val string: MetaConverter<String> = object : MetaConverter<String> {
override fun itemToObject(item: MetaItem): String = when (item) {
is NodeItem -> item.node[Meta.VALUE_KEY].value ?: error("Can't convert node to a value")
is ValueItem -> item.value
is MetaItemNode -> item.node[Meta.VALUE_KEY].value ?: error("Can't convert node to a value")
is MetaItemValue -> item.value
}.string
override fun objectToMetaItem(obj: String): MetaItem = ValueItem(obj.asValue())
override fun objectToMetaItem(obj: String): MetaItem = MetaItemValue(obj.asValue())
}
public val boolean: MetaConverter<Boolean> = object : MetaConverter<Boolean> {
override fun itemToObject(item: MetaItem): Boolean = when (item) {
is NodeItem -> item.node[Meta.VALUE_KEY].value ?: error("Can't convert node to a value")
is ValueItem -> item.value
is MetaItemNode -> item.node[Meta.VALUE_KEY].value ?: error("Can't convert node to a value")
is MetaItemValue -> item.value
}.boolean
override fun objectToMetaItem(obj: Boolean): MetaItem = ValueItem(obj.asValue())
override fun objectToMetaItem(obj: Boolean): MetaItem = MetaItemValue(obj.asValue())
}
public val number: MetaConverter<Number> = object : MetaConverter<Number> {
override fun itemToObject(item: MetaItem): Number = when (item) {
is NodeItem -> item.node[Meta.VALUE_KEY].value ?: error("Can't convert node to a value")
is ValueItem -> item.value
is MetaItemNode -> item.node[Meta.VALUE_KEY].value ?: error("Can't convert node to a value")
is MetaItemValue -> item.value
}.number
override fun objectToMetaItem(obj: Number): MetaItem = ValueItem(obj.asValue())
override fun objectToMetaItem(obj: Number): MetaItem = MetaItemValue(obj.asValue())
}
public val double: MetaConverter<Double> = object : MetaConverter<Double> {
override fun itemToObject(item: MetaItem): Double = when (item) {
is NodeItem -> item.node[Meta.VALUE_KEY].value ?: error("Can't convert node to a value")
is ValueItem -> item.value
is MetaItemNode -> item.node[Meta.VALUE_KEY].value ?: error("Can't convert node to a value")
is MetaItemValue -> item.value
}.double
override fun objectToMetaItem(obj: Double): MetaItem = ValueItem(obj.asValue())
override fun objectToMetaItem(obj: Double): MetaItem = MetaItemValue(obj.asValue())
}
public val float: MetaConverter<Float> = object : MetaConverter<Float> {
override fun itemToObject(item: MetaItem): Float = when (item) {
is NodeItem -> item.node[Meta.VALUE_KEY].value ?: error("Can't convert node to a value")
is ValueItem -> item.value
is MetaItemNode -> item.node[Meta.VALUE_KEY].value ?: error("Can't convert node to a value")
is MetaItemValue -> item.value
}.float
override fun objectToMetaItem(obj: Float): MetaItem = ValueItem(obj.asValue())
override fun objectToMetaItem(obj: Float): MetaItem = MetaItemValue(obj.asValue())
}
public val int: MetaConverter<Int> = object : MetaConverter<Int> {
override fun itemToObject(item: MetaItem): Int = when (item) {
is NodeItem -> item.node[Meta.VALUE_KEY].value ?: error("Can't convert node to a value")
is ValueItem -> item.value
is MetaItemNode -> item.node[Meta.VALUE_KEY].value ?: error("Can't convert node to a value")
is MetaItemValue -> item.value
}.int
override fun objectToMetaItem(obj: Int): MetaItem = ValueItem(obj.asValue())
override fun objectToMetaItem(obj: Int): MetaItem = MetaItemValue(obj.asValue())
}
public val long: MetaConverter<Long> = object : MetaConverter<Long> {
override fun itemToObject(item: MetaItem): Long = when (item) {
is NodeItem -> item.node[Meta.VALUE_KEY].value ?: error("Can't convert node to a value")
is ValueItem -> item.value
is MetaItemNode -> item.node[Meta.VALUE_KEY].value ?: error("Can't convert node to a value")
is MetaItemValue -> item.value
}.long
override fun objectToMetaItem(obj: Long): MetaItem = ValueItem(obj.asValue())
override fun objectToMetaItem(obj: Long): MetaItem = MetaItemValue(obj.asValue())
}
public inline fun <reified E : Enum<E>> enum(): MetaConverter<E> = object : MetaConverter<E> {
@Suppress("USELESS_CAST")
override fun itemToObject(item: MetaItem): E = item.enum<E>() as? E ?: error("The Item is not a Enum")
override fun objectToMetaItem(obj: E): MetaItem = ValueItem(obj.asValue())
override fun objectToMetaItem(obj: E): MetaItem = MetaItemValue(obj.asValue())
}
public fun <T> valueList(writer: (T) -> Value = {Value.of(it)}, reader: (Value) -> T): MetaConverter<List<T>> =
@ -111,7 +111,7 @@ public interface MetaConverter<T : Any> {
item.value?.list?.map(reader) ?: error("The item is not a value list")
override fun objectToMetaItem(obj: List<T>): MetaItem =
ValueItem(obj.map(writer).asValue())
MetaItemValue(obj.map(writer).asValue())
}
}
@ -120,5 +120,5 @@ public interface MetaConverter<T : Any> {
public fun <T : Any> MetaConverter<T>.nullableItemToObject(item: MetaItem?): T? = item?.let { itemToObject(it) }
public fun <T : Any> MetaConverter<T>.nullableObjectToMetaItem(obj: T?): MetaItem? = obj?.let { objectToMetaItem(it) }
public fun <T : Any> MetaConverter<T>.metaToObject(meta: Meta): T = itemToObject(NodeItem(meta))
public fun <T : Any> MetaConverter<T>.valueToObject(value: Value): T = itemToObject(ValueItem(value))
public fun <T : Any> MetaConverter<T>.metaToObject(meta: Meta): T = itemToObject(MetaItemNode(meta))
public fun <T : Any> MetaConverter<T>.valueToObject(value: Value): T = itemToObject(MetaItemValue(value))

View File

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package hep.dataforge.context
package hep.dataforge.misc
import hep.dataforge.names.Name
import hep.dataforge.names.asName

View File

@ -1,4 +1,4 @@
package hep.dataforge.type
package hep.dataforge.misc
/**
* A text label for internal DataForge type classification. Alternative for mime container type.

View File

@ -114,7 +114,7 @@ public fun String.toName(): Name {
}
else -> when (it) {
'.' -> {
val query = if(queryBuilder.isEmpty()) null else queryBuilder.toString()
val query = if (queryBuilder.isEmpty()) null else queryBuilder.toString()
yield(NameToken(bodyBuilder.toString(), query))
bodyBuilder = StringBuilder()
queryBuilder = StringBuilder()
@ -128,7 +128,7 @@ public fun String.toName(): Name {
}
}
}
val query = if(queryBuilder.isEmpty()) null else queryBuilder.toString()
val query = if (queryBuilder.isEmpty()) null else queryBuilder.toString()
yield(NameToken(bodyBuilder.toString(), query))
}
return Name(tokens.toList())
@ -184,7 +184,16 @@ public fun Name.startsWith(token: NameToken): Boolean = firstOrNull() == token
public fun Name.endsWith(token: NameToken): Boolean = lastOrNull() == token
public fun Name.startsWith(name: Name): Boolean =
this.length >= name.length && tokens.subList(0, name.length) == name.tokens
this.length >= name.length && (this == name || tokens.subList(0, name.length) == name.tokens)
public fun Name.endsWith(name: Name): Boolean =
this.length >= name.length && tokens.subList(length - name.length, length) == name.tokens
this.length >= name.length && (this == name || tokens.subList(length - name.length, length) == name.tokens)
/**
 * If [this] starts with the given [head] name, returns the remainder of the name
 * (could be empty). Otherwise returns null.
 */
public fun Name.removeHeadOrNull(head: Name): Name? = if (startsWith(head)) {
    // Keep the tokens *after* the head. The previous `subList(head.length, head.length)`
    // always produced an empty list, silently discarding the remainder of the name.
    Name(tokens.subList(head.length, length))
} else {
    null
}

View File

@ -23,8 +23,8 @@ public fun Meta.toDynamic(): dynamic {
if (this is DynamicMeta) return this.obj
fun MetaItem.toDynamic(): dynamic = when (this) {
is ValueItem -> this.value.toDynamic()
is NodeItem -> this.node.toDynamic()
is MetaItemValue -> this.value.toDynamic()
is MetaItemNode -> this.node.toDynamic()
}
val res = js("{}")
@ -50,13 +50,13 @@ public class DynamicMeta(internal val obj: dynamic) : MetaBase() {
@Suppress("UNCHECKED_CAST", "USELESS_CAST")
private fun asItem(obj: dynamic): TypedMetaItem<DynamicMeta>? {
return when {
obj == null -> ValueItem(Null)
isArray(obj) && (obj as Array<Any?>).all { isPrimitive(it) } -> ValueItem(Value.of(obj as Array<Any?>))
obj == null -> MetaItemValue(Null)
isArray(obj) && (obj as Array<Any?>).all { isPrimitive(it) } -> MetaItemValue(Value.of(obj as Array<Any?>))
else -> when (jsTypeOf(obj)) {
"boolean" -> ValueItem(Value.of(obj as Boolean))
"number" -> ValueItem(Value.of(obj as Number))
"string" -> ValueItem(Value.of(obj as String))
"object" -> NodeItem(DynamicMeta(obj))
"boolean" -> MetaItemValue(Value.of(obj as Boolean))
"number" -> MetaItemValue(Value.of(obj as Number))
"string" -> MetaItemValue(Value.of(obj as String))
"object" -> MetaItemNode(DynamicMeta(obj))
else -> null
}
}
@ -68,7 +68,7 @@ public class DynamicMeta(internal val obj: dynamic) : MetaBase() {
if (isArray(value)) {
val array = value as Array<Any?>
return@flatMap if (array.all { isPrimitive(it) }) {
listOf(NameToken(key) to ValueItem(Value.of(array)))
listOf(NameToken(key) to MetaItemValue(Value.of(array)))
} else {
array.mapIndexedNotNull { index, it ->
val item = asItem(it) ?: return@mapIndexedNotNull null

View File

@ -0,0 +1,96 @@
package hep.dataforge.workspace
import hep.dataforge.data.*
import hep.dataforge.meta.*
import hep.dataforge.names.Name
import hep.dataforge.names.plus
import hep.dataforge.names.removeHeadOrNull
import hep.dataforge.names.toName
import kotlinx.coroutines.flow.*
import kotlin.reflect.KClass
/**
 * A rule deciding where (and whether) a datum is placed inside a target data tree.
 */
public interface DataPlacement : MetaRepr {
    /**
     * Select a placement for a data with given [name] and [meta].
     * The result is null if the data should be ignored.
     */
    public fun place(name: Name, meta: Meta, dataType: KClass<*>): Name?

    public companion object {
        /** A pass-through placement: every datum keeps its original name. */
        public val ALL: DataPlacement = object : DataPlacement {
            override fun place(name: Name, meta: Meta, dataType: KClass<*>): Name = name

            override fun toMeta(): Meta = Meta { "from" put "*" }
        }
    }
}
/**
 * Resolve the placement of a [NamedData] from its name, meta and runtime type.
 * Returns null when the datum should be ignored.
 */
public fun DataPlacement.place(datum: NamedData<*>): Name? = place(datum.name, datum.meta, datum.type)
/**
 * A [DataSet] view that renames entries of [source] according to [placement],
 * dropping entries for which the placement returns null.
 */
private class ArrangedDataSet<T : Any>(
    private val source: DataSet<T>,
    private val placement: DataPlacement,
) : DataSet<T> {
    override val dataType: KClass<out T> get() = source.dataType

    // Entries rejected by the placement (null name) are filtered out of the flow.
    override fun flow(): Flow<NamedData<T>> = source.flow().mapNotNull {
        val newName = placement.place(it) ?: return@mapNotNull null
        it.data.named(newName)
    }

    // NOTE(review): linear scan of the whole rearranged flow per lookup;
    // consider an inverse-placement delegation to `source` if this becomes hot.
    override suspend fun getData(name: Name): Data<T>? = flow().filter { it.name == name }.firstOrNull()

    // Update names are translated through the placement.
    // NOTE(review): assumes `flowChildren` yields the affected child data of the
    // source for an updated name — confirm against its definition.
    override val updates: Flow<Name> = source.updates.flatMapConcat {
        flowChildren(it).mapNotNull(placement::place)
    }
}
/**
 * A configurable [DataPlacement]: moves data from the [from] branch to the [to]
 * branch, optionally filtering the remainder of the name by a [pattern] regex.
 */
public class DataPlacementScheme : Scheme(), DataPlacement {
    /**
     * A source node for the filter
     */
    public var from: String? by string()

    /**
     * A target placement for the filtered node
     */
    public var to: String? by string()

    /**
     * A regular expression pattern for the filter
     */
    public var pattern: String? by string()

//    val prefix by string()
//    val suffix by string()

    override fun place(name: Name, meta: Meta, dataType: KClass<*>): Name? {
        val sourcePrefix = from?.toName() ?: Name.EMPTY
        // Names outside the source branch are ignored entirely.
        val reminder = name.removeHeadOrNull(sourcePrefix) ?: return null
        // No pattern means "accept everything".
        val accepted = pattern?.toRegex()?.let { reminder.toString().matches(it) } ?: true
        return if (accepted) {
            (to?.toName() ?: Name.EMPTY) + reminder
        } else {
            null
        }
    }

    public companion object : SchemeSpec<DataPlacementScheme>(::DataPlacementScheme)
}
/**
 * Apply data node rearrangement: a lazy view that renames or filters data
 * according to the given [placement].
 */
public fun <T : Any> DataSet<T>.rearrange(placement: DataPlacement): DataSet<T> = ArrangedDataSet(this, placement)

///**
// * Mask data using [DataPlacementScheme] specification
// */
//public fun <T : Any> DataSet<T>.rearrange(placement: Meta): DataSet<T> =
//    rearrange(DataPlacementScheme.read(placement))

/**
 * Mask data using a [DataPlacementScheme] configured by [placementBuilder].
 */
public fun <T : Any> DataSet<T>.rearrange(placementBuilder: DataPlacementScheme.() -> Unit): DataSet<T> =
    rearrange(DataPlacementScheme(placementBuilder))

View File

@ -1,56 +1,29 @@
package hep.dataforge.workspace
import hep.dataforge.data.DataFilter
import hep.dataforge.data.DataNode
import hep.dataforge.data.DataTree
import hep.dataforge.data.filter
import hep.dataforge.data.DataSet
import hep.dataforge.meta.Meta
import hep.dataforge.meta.MetaBuilder
import hep.dataforge.meta.MetaRepr
import hep.dataforge.meta.builder
import hep.dataforge.names.Name
import hep.dataforge.names.asName
import hep.dataforge.names.isEmpty
import hep.dataforge.names.plus
/**
* A dependency of the task which allows to lazily create a data tree for single dependency
*/
public sealed class Dependency : MetaRepr {
public abstract fun apply(workspace: Workspace): DataNode<Any>
public abstract suspend fun apply(workspace: Workspace): DataSet<Any>
}
public class DataDependency(private val filter: DataFilter, private val placement: Name = Name.EMPTY) : Dependency() {
override fun apply(workspace: Workspace): DataNode<Any> {
val result = workspace.data.filter(filter)
return if (placement.isEmpty()) {
result
} else {
DataTree(Any::class) { this[placement] = result }
}
}
public class DataDependency(private val placement: DataPlacement = DataPlacement.ALL) : Dependency() {
override suspend fun apply(workspace: Workspace): DataSet<Any> = workspace.data.rearrange(placement)
override fun toMeta(): Meta = Meta {
"data" put filter.toMeta()
"to" put placement.toString()
}
}
public class AllDataDependency(private val placement: Name = Name.EMPTY) : Dependency() {
override fun apply(workspace: Workspace): DataNode<Any> = if (placement.isEmpty()) {
workspace.data
} else {
DataTree(Any::class) { this[placement] = workspace.data }
}
override fun toMeta(): MetaBuilder = Meta {
"data" put "@all"
"to" put placement.toString()
}
override fun toMeta(): Meta = placement.toMeta()
}
public abstract class TaskDependency<out T : Any>(
public val meta: Meta,
public val placement: Name = Name.EMPTY
protected val placement: DataPlacement,
) : Dependency() {
public abstract fun resolveTask(workspace: Workspace): Task<T>
@ -59,43 +32,42 @@ public abstract class TaskDependency<out T : Any>(
*/
public abstract val name: Name
override fun apply(workspace: Workspace): DataNode<T> {
override suspend fun apply(workspace: Workspace): DataSet<T> {
val task = resolveTask(workspace)
if (task.isTerminal) TODO("Support terminal task")
val result = workspace.run(task, meta)
return if (placement.isEmpty()) {
result
} else {
DataTree(task.type) { this[placement] = result }
}
}
override fun toMeta(): Meta = Meta {
"task" put name.toString()
"meta" put meta
"to" put placement.toString()
return result.rearrange(placement)
}
}
public class DirectTaskDependency<T : Any>(
public class ExternalTaskDependency<T : Any>(
public val task: Task<T>,
meta: Meta,
placement: Name
placement: DataPlacement,
) : TaskDependency<T>(meta, placement) {
override fun resolveTask(workspace: Workspace): Task<T> = task
override val name: Name get() = DIRECT_TASK_NAME + task.name
override val name: Name get() = EXTERNAL_TASK_NAME + task.name
override fun toMeta(): Meta = placement.toMeta().builder().apply {
"name" put name.toString()
"task" put task.toString()
"meta" put meta
}
public companion object {
public val DIRECT_TASK_NAME: Name = "@direct".asName()
public val EXTERNAL_TASK_NAME: Name = "@external".asName()
}
}
public class WorkspaceTaskDependency(
override val name: Name,
meta: Meta,
placement: Name
placement: DataPlacement,
) : TaskDependency<Any>(meta, placement) {
override fun resolveTask(workspace: Workspace): Task<*> =
workspace.tasks[name] ?: error("Task with name $name is not found in the workspace")
override fun resolveTask(workspace: Workspace): Task<*> = workspace.tasks[name]
?: error("Task with name $name is not found in the workspace")
override fun toMeta(): Meta {
TODO("Not yet implemented")
}
}

View File

@ -1,7 +1,7 @@
package hep.dataforge.workspace
import hep.dataforge.context.logger
import hep.dataforge.data.DataNode
import hep.dataforge.data.DataSet
import hep.dataforge.meta.Meta
import hep.dataforge.meta.descriptors.NodeDescriptor
import hep.dataforge.meta.get
@ -17,10 +17,10 @@ public class GenericTask<R : Any>(
override val type: KClass<out R>,
override val descriptor: NodeDescriptor,
private val modelTransform: TaskModelBuilder.(Meta) -> Unit,
private val dataTransform: Workspace.() -> TaskModel.(DataNode<Any>) -> DataNode<R>
private val dataTransform: Workspace.() -> suspend TaskModel.(DataSet<Any>) -> DataSet<R>
) : Task<R> {
override fun run(workspace: Workspace, model: TaskModel): DataNode<R> {
override suspend fun run(workspace: Workspace, model: TaskModel): DataSet<R> {
//validate model
validate(model)
@ -42,11 +42,11 @@ public class GenericTask<R : Any>(
* task. By default model uses the meta node with the same node as the name of the task.
*
* @param workspace
* @param taskConfig
* @param taskMeta
* @return
*/
override fun build(workspace: Workspace, taskConfig: Meta): TaskModel {
val taskMeta = taskConfig[name]?.node ?: taskConfig
override fun build(workspace: Workspace, taskMeta: Meta): TaskModel {
val taskMeta = taskMeta[name]?.node ?: taskMeta
val builder = TaskModelBuilder(name, taskMeta)
builder.modelTransform(taskMeta)
return builder.build()

View File

@ -3,7 +3,7 @@ package hep.dataforge.workspace
import hep.dataforge.context.Context
import hep.dataforge.context.gather
import hep.dataforge.context.toMap
import hep.dataforge.data.DataNode
import hep.dataforge.data.DataTree
import hep.dataforge.meta.Meta
import hep.dataforge.names.Name
@ -13,7 +13,7 @@ import hep.dataforge.names.Name
*/
public class SimpleWorkspace(
override val context: Context,
override val data: DataNode<Any>,
override val data: DataTree<Any>,
override val targets: Map<String, Meta>,
tasks: Collection<Task<Any>>
) : Workspace {

View File

@ -1,19 +1,15 @@
package hep.dataforge.workspace
import hep.dataforge.context.Named
import hep.dataforge.data.DataNode
import hep.dataforge.data.DataSet
import hep.dataforge.meta.Meta
import hep.dataforge.meta.descriptors.Described
import hep.dataforge.type.Type
import hep.dataforge.misc.Named
import hep.dataforge.misc.Type
import hep.dataforge.workspace.Task.Companion.TYPE
import kotlin.reflect.KClass
@Type(TYPE)
public interface Task<out R : Any> : Named, Described {
/**
* Terminal task is the one that could not build model lazily
*/
public val isTerminal: Boolean get() = false
/**
* The explicit type of the node returned by the task
@ -21,13 +17,13 @@ public interface Task<out R : Any> : Named, Described {
public val type: KClass<out R>
/**
* Build a model for this task
* Build a model for this task. Does not run any computations unless task [isEager]
*
* @param workspace
* @param taskConfig
* @param taskMeta
* @return
*/
public fun build(workspace: Workspace, taskConfig: Meta): TaskModel
public fun build(workspace: Workspace, taskMeta: Meta): TaskModel
/**
* Check if the model is valid and is acceptable by the task. Throw exception if not.
@ -46,7 +42,7 @@ public interface Task<out R : Any> : Named, Described {
* @param model - a model to be executed
* @return
*/
public fun run(workspace: Workspace, model: TaskModel): DataNode<R>
public suspend fun run(workspace: Workspace, model: TaskModel): DataSet<R>
public companion object {
public const val TYPE: String = "task"

View File

@ -5,15 +5,16 @@
*/
package hep.dataforge.workspace
import hep.dataforge.data.DataFilter
import hep.dataforge.data.DataTree
import hep.dataforge.data.DataTreeBuilder
import hep.dataforge.data.dynamic
import hep.dataforge.data.update
import hep.dataforge.meta.*
import hep.dataforge.names.Name
import hep.dataforge.names.asName
import hep.dataforge.names.toName
import hep.dataforge.workspace.TaskModel.Companion.MODEL_TARGET_KEY
//FIXME TaskModel should store individual propagation of all data elements, not just nodes
/**
* A model for task execution
@ -24,7 +25,7 @@ import hep.dataforge.workspace.TaskModel.Companion.MODEL_TARGET_KEY
public data class TaskModel(
val name: Name,
val meta: Meta,
val dependencies: Collection<Dependency>
val dependencies: Collection<Dependency>,
) : MetaRepr {
//TODO provide a way to get task descriptor
//TODO add pre-run check of task result type?
@ -35,12 +36,10 @@ public data class TaskModel(
"dependsOn" put {
val dataDependencies = dependencies.filterIsInstance<DataDependency>()
val taskDependencies = dependencies.filterIsInstance<TaskDependency<*>>()
setIndexed("data".toName(), dataDependencies.map { it.toMeta() })
setIndexed("data".toName(), dataDependencies.map { it.toMeta() }) //Should list all data here
setIndexed(
"task".toName(),
taskDependencies.map { it.toMeta() }) { _, index ->
taskDependencies[index].name.toString()
}
taskDependencies.map { it.toMeta() }) { _, index -> taskDependencies[index].name.toString() }
//TODO ensure all dependencies are listed
}
}
@ -53,12 +52,10 @@ public data class TaskModel(
/**
* Build input for the task
*/
public fun TaskModel.buildInput(workspace: Workspace): DataTree<Any> {
return DataTreeBuilder(Any::class).apply {
dependencies.forEach { dep ->
update(dep.apply(workspace))
}
}.build()
public suspend fun TaskModel.buildInput(workspace: Workspace): DataTree<Any> = DataTree.dynamic(workspace.context) {
dependencies.forEach { dep ->
update(dep.apply(workspace))
}
}
public interface TaskDependencyContainer {
@ -71,59 +68,53 @@ public interface TaskDependencyContainer {
*/
public fun TaskDependencyContainer.dependsOn(
name: Name,
placement: Name = Name.EMPTY,
meta: Meta = defaultMeta
): WorkspaceTaskDependency =
WorkspaceTaskDependency(name, meta, placement).also { add(it) }
placement: DataPlacement = DataPlacement.ALL,
meta: Meta = defaultMeta,
): WorkspaceTaskDependency = WorkspaceTaskDependency(name, meta, placement).also { add(it) }
public fun TaskDependencyContainer.dependsOn(
name: String,
placement: Name = Name.EMPTY,
meta: Meta = defaultMeta
): WorkspaceTaskDependency =
dependsOn(name.toName(), placement, meta)
placement: DataPlacement = DataPlacement.ALL,
meta: Meta = defaultMeta,
): WorkspaceTaskDependency = dependsOn(name.toName(), placement, meta)
public fun <T : Any> TaskDependencyContainer.dependsOn(
task: Task<T>,
placement: Name = Name.EMPTY,
meta: Meta = defaultMeta
): DirectTaskDependency<T> =
DirectTaskDependency(task, meta, placement).also { add(it) }
placement: DataPlacement = DataPlacement.ALL,
meta: Meta = defaultMeta,
): ExternalTaskDependency<T> = ExternalTaskDependency(task, meta, placement).also { add(it) }
public fun <T : Any> TaskDependencyContainer.dependsOn(
task: Task<T>,
placement: String,
meta: Meta = defaultMeta
): DirectTaskDependency<T> =
DirectTaskDependency(task, meta, placement.toName()).also { add(it) }
public fun <T : Any> TaskDependencyContainer.dependsOn(
task: Task<T>,
placement: Name = Name.EMPTY,
metaBuilder: MetaBuilder.() -> Unit
): DirectTaskDependency<T> =
dependsOn(task, placement, Meta(metaBuilder))
placement: DataPlacement = DataPlacement.ALL,
metaBuilder: MetaBuilder.() -> Unit,
): ExternalTaskDependency<T> = dependsOn(task, placement, Meta(metaBuilder))
/**
* Add custom data dependency
*/
public fun TaskDependencyContainer.data(action: DataFilter.() -> Unit): DataDependency =
DataDependency(DataFilter(action)).also { add(it) }
public fun TaskDependencyContainer.data(action: DataPlacementScheme.() -> Unit): DataDependency =
DataDependency(DataPlacementScheme(action)).also { add(it) }
/**
* User-friendly way to add data dependency
*/
public fun TaskDependencyContainer.data(pattern: String? = null, from: String? = null, to: String? = null): DataDependency =
public fun TaskDependencyContainer.data(
pattern: String? = null,
from: String? = null,
to: String? = null,
): DataDependency =
data {
pattern?.let { this.pattern = it }
from?.let { this.from = it }
to?.let { this.to = it }
}
/**
* Add all data as root node
*/
public fun TaskDependencyContainer.allData(to: Name = Name.EMPTY): AllDataDependency = AllDataDependency(to).also { add(it) }
///**
// * Add all data as root node
// */
//public fun TaskDependencyContainer.allData(to: Name = Name.EMPTY): AllDataDependency = AllDataDependency(to).also { add(it) }
/**
* A builder for [TaskModel]

View File

@ -1,15 +1,13 @@
package hep.dataforge.workspace
import hep.dataforge.context.ContextAware
import hep.dataforge.data.Data
import hep.dataforge.data.DataNode
import hep.dataforge.data.dataSequence
import hep.dataforge.data.DataSet
import hep.dataforge.meta.Meta
import hep.dataforge.meta.MetaBuilder
import hep.dataforge.misc.Type
import hep.dataforge.names.Name
import hep.dataforge.names.toName
import hep.dataforge.provider.Provider
import hep.dataforge.type.Type
@Type(Workspace.TYPE)
@ -17,7 +15,7 @@ public interface Workspace : ContextAware, Provider {
/**
* The whole data node for current workspace
*/
public val data: DataNode<Any>
public val data: DataSet<Any>
/**
* All targets associated with the workspace
@ -33,8 +31,7 @@ public interface Workspace : ContextAware, Provider {
return when (target) {
"target", Meta.TYPE -> targets.mapKeys { it.key.toName() }
Task.TYPE -> tasks
Data.TYPE -> data.dataSequence().toMap()
//DataNode.TYPE -> data.nodes.toMap()
//Data.TYPE -> data.flow().toMap()
else -> emptyMap()
}
}
@ -42,7 +39,7 @@ public interface Workspace : ContextAware, Provider {
/**
* Invoke a task in the workspace utilizing caching if possible
*/
public fun <R : Any> run(task: Task<R>, config: Meta): DataNode<R> {
public suspend fun <R : Any> run(task: Task<R>, config: Meta): DataSet<R> {
val model = task.build(this, config)
task.validate(model)
return task.run(this, model)
@ -53,20 +50,20 @@ public interface Workspace : ContextAware, Provider {
}
}
public fun Workspace.run(task: Task<*>, target: String): DataNode<Any> {
public suspend fun Workspace.run(task: Task<*>, target: String): DataSet<Any> {
val meta = targets[target] ?: error("A target with name $target not found in $this")
return run(task, meta)
}
public fun Workspace.run(task: String, target: String): DataNode<Any> =
public suspend fun Workspace.run(task: String, target: String): DataSet<Any> =
tasks[task.toName()]?.let { run(it, target) } ?: error("Task with name $task not found")
public fun Workspace.run(task: String, meta: Meta): DataNode<Any> =
public suspend fun Workspace.run(task: String, meta: Meta): DataSet<Any> =
tasks[task.toName()]?.let { run(it, meta) } ?: error("Task with name $task not found")
public fun Workspace.run(task: String, block: MetaBuilder.() -> Unit = {}): DataNode<Any> =
public suspend fun Workspace.run(task: String, block: MetaBuilder.() -> Unit = {}): DataSet<Any> =
run(task, Meta(block))
public fun <T : Any> Workspace.run(task: Task<T>, metaBuilder: MetaBuilder.() -> Unit = {}): DataNode<T> =
public suspend fun <T : Any> Workspace.run(task: Task<T>, metaBuilder: MetaBuilder.() -> Unit = {}): DataSet<T> =
run(task, Meta(metaBuilder))

View File

@ -8,8 +8,12 @@ import kotlin.reflect.KClass
/**
* Convert an [Envelope] to a data via given format. The actual parsing is done lazily.
*/
public fun <T : Any> Envelope.toData(type: KClass<out T>, format: IOFormat<T>): Data<T> = Data(type, meta) {
data?.readWith(format) ?: error("Can't convert envelope without data to Data")
public fun <T : Any> Envelope.toData(format: IOFormat<T>): Data<T> {
@Suppress("UNCHECKED_CAST")
val kclass: KClass<T> = format.type.classifier as? KClass<T> ?: error("IOFormat type is not a class")
return Data(kclass, meta) {
data?.readWith(format) ?: error("Can't convert envelope without data to Data")
}
}
public suspend fun <T : Any> Data<T>.toEnvelope(format: IOFormat<T>): Envelope {

View File

@ -3,41 +3,23 @@ package hep.dataforge.workspace
import hep.dataforge.context.Context
import hep.dataforge.context.logger
import hep.dataforge.data.*
import hep.dataforge.meta.DFBuilder
import hep.dataforge.meta.Meta
import hep.dataforge.meta.*
import hep.dataforge.meta.descriptors.NodeDescriptor
import hep.dataforge.meta.get
import hep.dataforge.meta.string
import hep.dataforge.names.Name
import hep.dataforge.names.isEmpty
import hep.dataforge.names.toName
import kotlin.reflect.KClass
private typealias DataTransformation<R> = suspend (context: Context, model: TaskModel, data: DataSet<Any>) -> DataSet<R>
@DFBuilder
public class TaskBuilder<R : Any>(public val name: Name, public val type: KClass<out R>) {
private var modelTransform: TaskModelBuilder.(Meta) -> Unit = { allData() }
@DFExperimental
public class TaskBuilder<R : Any>(private val name: Name, public val type: KClass<out R>) {
private var modelTransform: TaskModelBuilder.(Meta) -> Unit = {
data()
}
// private val additionalDependencies = HashSet<Dependency>()
private var descriptor: NodeDescriptor? = null
private val dataTransforms: MutableList<DataTransformation> = ArrayList()
/**
* TODO will look better as extension class
*/
private inner class DataTransformation(
val from: String = "",
val to: String = "",
val transform: (Context, TaskModel, DataNode<Any>) -> DataNode<R>,
) {
operator fun invoke(workspace: Workspace, model: TaskModel, node: DataNode<Any>): DataNode<R>? {
val localData = if (from.isEmpty()) {
node
} else {
node[from].node ?: return null
}
return transform(workspace.context, model, localData)
}
}
private val dataTransforms: MutableList<DataTransformation<R>> = ArrayList()
// override fun add(dependency: Dependency) {
// additionalDependencies.add(dependency)
@ -47,75 +29,74 @@ public class TaskBuilder<R : Any>(public val name: Name, public val type: KClass
this.modelTransform = modelTransform
}
/**
* Add a transformation on untyped data
*/
@JvmName("rawTransform")
public fun transform(
from: String = "",
to: String = "",
block: TaskEnv.(DataNode<*>) -> DataNode<R>,
) {
dataTransforms += DataTransformation(from, to) { context, model, data ->
val env = TaskEnv(Name.EMPTY, model.meta, context, data)
env.block(data)
}
}
public fun <T : Any> transform(
inputType: KClass<out T>,
from: String = "",
to: String = "",
block: TaskEnv.(DataNode<T>) -> DataNode<R>,
) {
dataTransforms += DataTransformation(from, to) { context, model, data ->
data.ensureType(inputType)
val env = TaskEnv(Name.EMPTY, model.meta, context, data)
env.block(data.cast(inputType))
}
}
public inline fun <reified T : Any> transform(
from: String = "",
to: String = "",
noinline block: TaskEnv.(DataNode<T>) -> DataNode<R>,
) {
transform(T::class, from, to, block)
}
/**
* Perform given action on data elements in `from` node in input and put the result to `to` node
*/
public inline fun <reified T : Any> action(
from: String = "",
to: String = "",
crossinline block: TaskEnv.() -> Action<T, R>,
) {
transform(from, to) { data: DataNode<T> ->
block().invoke(data, meta)
}
}
public class TaskEnv(
public val name: Name,
public val meta: Meta,
public val context: Context,
public val data: DataNode<Any>,
public val data: DataSet<Any>,
) {
public operator fun <T : Any> DirectTaskDependency<T>.invoke(): DataNode<T> = if (placement.isEmpty()) {
data.cast(task.type)
} else {
data[placement].node?.cast(task.type)
?: error("Could not find results of direct task dependency $this at \"$placement\"")
// public operator fun <T : Any> DirectTaskDependency<T>.invoke(): DataSet<T> = if (placement.isEmpty()) {
// data.cast(task.type)
// } else {
// data[placement].tree?.cast(task.type)
// ?: error("Could not find results of direct task dependency $this at \"$placement\"")
// }
}
/**
* Add a transformation on untyped data
* @param from the prefix for root node in data
* @param to the prefix for the target node.
*/
@JvmName("rawTransform")
public fun transform(
from: Name = Name.EMPTY,
to: Name = Name.EMPTY,
block: TaskEnv.(DataSet<*>) -> DataSet<R>,
) {
dataTransforms += { context, model, data ->
val env = TaskEnv(Name.EMPTY, model.meta, context, data)
val startData = data.branch(from)
env.block(startData).withNamePrefix(to)
}
}
public fun <T : Any> transform(
inputType: KClass<out T>,
block: suspend TaskEnv.(DataSet<T>) -> DataSet<R>,
) {
dataTransforms += { context, model, data ->
val env = TaskEnv(Name.EMPTY, model.meta, context, data)
env.block(data.filterIsInstance(inputType))
}
}
public inline fun <reified T : Any> transform(
noinline block: suspend TaskEnv.(DataSet<T>) -> DataSet<R>,
): Unit = transform(T::class, block)
/**
* Perform given action on data elements in `from` node in input and put the result to `to` node
*/
public inline fun <reified T : Any> action(
from: Name = Name.EMPTY,
to: Name = Name.EMPTY,
crossinline block: TaskEnv.() -> Action<T, R>,
) {
transform { data: DataSet<T> ->
block().run(data, meta, context)
}
}
/**
* A customized map action with ability to change meta and name
*/
public inline fun <reified T : Any> mapAction(
from: String = "",
to: String = "",
from: Name = Name.EMPTY,
to: Name = Name.EMPTY,
crossinline block: MapActionBuilder<T, R>.(TaskEnv) -> Unit,
) {
action(from, to) {
@ -130,12 +111,12 @@ public class TaskBuilder<R : Any>(public val name: Name, public val type: KClass
* A simple map action without changing meta or name
*/
public inline fun <reified T : Any> map(
from: String = "",
to: String = "",
from: Name = Name.EMPTY,
to: Name = Name.EMPTY,
crossinline block: suspend TaskEnv.(T) -> R,
) {
action(from, to) {
MapAction<T,R>(type) {
MapAction<T, R>(type) {
//TODO automatically append task meta
result = { data ->
block(data)
@ -148,13 +129,13 @@ public class TaskBuilder<R : Any>(public val name: Name, public val type: KClass
* Join elements in gathered data by multiple groups
*/
public inline fun <reified T : Any> reduceByGroup(
from: String = "",
to: String = "",
from: Name = Name.EMPTY,
to: Name = Name.EMPTY,
crossinline block: ReduceGroupBuilder<T, R>.(TaskEnv) -> Unit, //TODO needs KEEP-176
) {
action(from, to) {
val env = this
ReduceAction<T, R>(type) { block(env) }
ReduceAction(inputType = T::class, outputType = type) { block(env) }
}
}
@ -162,12 +143,12 @@ public class TaskBuilder<R : Any>(public val name: Name, public val type: KClass
* Join all elements in gathered data matching the input type
*/
public inline fun <reified T : Any> reduce(
from: String = "",
to: String = "",
from: Name = Name.EMPTY,
to: Name = Name.EMPTY,
crossinline block: suspend TaskEnv.(Map<Name, T>) -> R,
) {
action(from, to) {
ReduceAction<T, R>(type) {
ReduceAction(inputType = T::class, outputType = type) {
result(
actionMeta[TaskModel.MODEL_TARGET_KEY]?.string ?: "@anonymous"
) { data ->
@ -181,8 +162,8 @@ public class TaskBuilder<R : Any>(public val name: Name, public val type: KClass
* Split each element in gathered data into fixed number of fragments
*/
public inline fun <reified T : Any> split(
from: String = "",
to: String = "",
from: Name = Name.EMPTY,
to: Name = Name.EMPTY,
crossinline block: SplitBuilder<T, R>.(TaskEnv) -> Unit, //TODO needs KEEP-176
) {
action(from, to) {
@ -206,26 +187,19 @@ public class TaskBuilder<R : Any>(public val name: Name, public val type: KClass
modelTransform
) {
val workspace = this
return@GenericTask { data ->
{ dataSet ->
val model = this
if (dataTransforms.isEmpty()) {
//return data node as is
logger.warn { "No transformation present, returning input data" }
data.ensureType(type)
data.cast(type)
dataSet.castOrNull(type) ?: error("$type expected, but $type received")
} else {
val builder = DataTreeBuilder(type)
val builder = MutableDataTree(type, workspace.context)
dataTransforms.forEach { transformation ->
val res = transformation(workspace, model, data)
if (res != null) {
if (transformation.to.isEmpty()) {
builder.update(res)
} else {
builder[transformation.to.toName()] = res
}
}
val res = transformation(workspace.context, model, dataSet)
builder.update(res)
}
builder.build()
builder
}
}
}

View File

@ -3,11 +3,10 @@ package hep.dataforge.workspace
import hep.dataforge.context.Context
import hep.dataforge.context.ContextBuilder
import hep.dataforge.context.Global
import hep.dataforge.data.DataNode
import hep.dataforge.data.DataTreeBuilder
import hep.dataforge.data.DataTree
import hep.dataforge.data.MutableDataTree
import hep.dataforge.meta.*
import hep.dataforge.names.Name
import hep.dataforge.names.isEmpty
import hep.dataforge.names.toName
import kotlin.reflect.KClass
@ -15,7 +14,7 @@ import kotlin.reflect.KClass
public interface WorkspaceBuilder {
public val parentContext: Context
public var context: Context
public var data: DataTreeBuilder<Any>
public var data: MutableDataTree<Any>
public var tasks: MutableSet<Task<Any>>
public var targets: MutableMap<String, Meta>
@ -31,23 +30,17 @@ public fun WorkspaceBuilder.context(name: String = "WORKSPACE", block: ContextBu
public inline fun <reified T : Any> WorkspaceBuilder.data(
name: Name = Name.EMPTY,
noinline block: DataTreeBuilder<T>.() -> Unit
): DataNode<T> {
val node = DataTreeBuilder(T::class).apply(block)
if (name.isEmpty()) {
@Suppress("UNCHECKED_CAST")
data = node as DataTreeBuilder<Any>
} else {
data[name] = node
}
return node.build()
noinline block: MutableDataTree<T>.() -> Unit,
): DataTree<T> {
TODO()
//data.branch(name).apply(block)
}
@JvmName("rawData")
public fun WorkspaceBuilder.data(
name: Name = Name.EMPTY,
block: DataTreeBuilder<Any>.() -> Unit
): DataNode<Any> = data<Any>(name, block)
block: MutableDataTree<Any>.() -> Unit,
): DataTree<Any> = data<Any>(name, block)
public fun WorkspaceBuilder.target(name: String, block: MetaBuilder.() -> Unit) {
@ -68,18 +61,18 @@ public fun WorkspaceBuilder.target(name: String, base: String, block: MetaBuilde
public fun <T : Any> WorkspaceBuilder.task(
name: String,
type: KClass<out T>,
builder: TaskBuilder<T>.() -> Unit
builder: TaskBuilder<T>.() -> Unit,
): Task<T> = TaskBuilder(name.toName(), type).apply(builder).build().also { tasks.add(it) }
public inline fun <reified T : Any> WorkspaceBuilder.task(
name: String,
noinline builder: TaskBuilder<T>.() -> Unit
noinline builder: TaskBuilder<T>.() -> Unit,
): Task<T> = task(name, T::class, builder)
@JvmName("rawTask")
public fun WorkspaceBuilder.task(
name: String,
builder: TaskBuilder<Any>.() -> Unit
builder: TaskBuilder<Any>.() -> Unit,
): Task<Any> = task(name, Any::class, builder)
/**
@ -87,12 +80,12 @@ public fun WorkspaceBuilder.task(
*/
public class SimpleWorkspaceBuilder(override val parentContext: Context) : WorkspaceBuilder {
override var context: Context = parentContext
override var data: DataTreeBuilder<Any> = DataTreeBuilder(Any::class)
override var data: MutableDataTree<Any> = MutableDataTree(Any::class,context)
override var tasks: MutableSet<Task<Any>> = HashSet()
override var targets: MutableMap<String, Meta> = HashMap()
override fun build(): SimpleWorkspace {
return SimpleWorkspace(context, data.build(), targets, tasks)
return SimpleWorkspace(context, data, targets, tasks)
}
}

View File

@ -5,6 +5,7 @@ import hep.dataforge.data.*
import hep.dataforge.io.*
import hep.dataforge.meta.*
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import kotlinx.io.asOutput
import java.nio.file.FileSystem
@ -14,9 +15,15 @@ import java.nio.file.StandardOpenOption
import java.nio.file.spi.FileSystemProvider
import java.util.zip.ZipEntry
import java.util.zip.ZipOutputStream
import kotlin.reflect.KClass
import kotlin.reflect.KType
import kotlin.streams.toList
public typealias FileFormatResolver<T> = (Path, Meta) -> IOFormat<T>
//public typealias FileFormatResolver<T> = (Path, Meta) -> IOFormat<T>
public interface FileFormatResolver<T: Any>{
public val type: KType
public operator fun invoke (path: Path, meta: Meta): IOFormat<T>
}
private fun newZFS(path: Path): FileSystem {
val fsProvider = FileSystemProvider.installedProviders().find { it.scheme == "jar" }
@ -36,25 +43,23 @@ private fun newZFS(path: Path): FileSystem {
@DFExperimental
public fun <T : Any> IOPlugin.readDataFile(
path: Path,
type: KClass<out T>,
formatResolver: FileFormatResolver<T>,
): Data<T> {
val envelope = readEnvelopeFile(path, true) ?: error("Can't read data from $path")
val format = formatResolver(path, envelope.meta)
return envelope.toData(type, format)
return envelope.toData(format)
}
@DFExperimental
public inline fun <reified T : Any> IOPlugin.readDataFile(path: Path): Data<T> =
readDataFile(path, T::class) { _, _ ->
resolveIOFormat<T>() ?: error("Can't resolve IO format for ${T::class}")
}
public inline fun <reified T : Any> IOPlugin.readDataFile(path: Path): Data<T> = readDataFile(path) { _, _ ->
resolveIOFormat<T>() ?: error("Can't resolve IO format for ${T::class}")
}
/**
* Add file/directory-based data tree item
*/
@DFExperimental
public fun <T : Any> DataTreeBuilder<T>.file(
public suspend fun <T : Any> DataSetBuilder<T>.file(
plugin: IOPlugin,
path: Path,
formatResolver: FileFormatResolver<T>,
@ -62,18 +67,18 @@ public fun <T : Any> DataTreeBuilder<T>.file(
//If path is a single file or a special directory, read it as single datum
if (!Files.isDirectory(path) || Files.list(path).allMatch { it.fileName.toString().startsWith("@") }) {
plugin.run {
val data = readDataFile(path, type, formatResolver)
val data = readDataFile(path, formatResolver)
val name = data.meta[Envelope.ENVELOPE_NAME_KEY].string
?: path.fileName.toString().replace(".df", "")
datum(name, data)
set(name, data)
}
} else {
//otherwise, read as directory
plugin.run {
val data = readDataDirectory(path, type, formatResolver)
val name = data.meta[Envelope.ENVELOPE_NAME_KEY].string
val data = readDataDirectory(path, formatResolver)
val name = data.getMeta()[Envelope.ENVELOPE_NAME_KEY].string
?: path.fileName.toString().replace(".df", "")
node(name, data)
set(name, data)
}
}
}
@ -84,31 +89,32 @@ public fun <T : Any> DataTreeBuilder<T>.file(
@DFExperimental
public fun <T : Any> IOPlugin.readDataDirectory(
path: Path,
type: KClass<out T>,
formatResolver: FileFormatResolver<T>,
): DataNode<T> {
): DataTree<T> {
//read zipped data node
if (path.fileName != null && path.fileName.toString().endsWith(".zip")) {
//Using explicit Zip file system to avoid bizarre compatibility bugs
val fs = newZFS(path)
return readDataDirectory(fs.rootDirectories.first(), type, formatResolver)
return readDataDirectory(fs.rootDirectories.first(), formatResolver)
}
if (!Files.isDirectory(path)) error("Provided path $path is not a directory")
return DataTree(type) {
Files.list(path).forEach { path ->
return DataTree.static(formatResolver.type) {
Files.list(path).toList().forEach { path ->
val fileName = path.fileName.toString()
if (fileName.startsWith(IOPlugin.META_FILE_NAME)) {
meta(readMetaFile(path))
} else if (!fileName.startsWith("@")) {
file(this@readDataDirectory, path, formatResolver)
runBlocking {
file(this@readDataDirectory, path, formatResolver)
}
}
}
}
}
@DFExperimental
public inline fun <reified T : Any> IOPlugin.readDataDirectory(path: Path): DataNode<T> =
readDataDirectory(path, T::class) { _, _ ->
public inline fun <reified T : Any> IOPlugin.readDataDirectory(path: Path): DataTree<T> =
readDataDirectory(path) { _, _ ->
resolveIOFormat<T>() ?: error("Can't resolve IO format for ${T::class}")
}
@ -118,7 +124,7 @@ public inline fun <reified T : Any> IOPlugin.readDataDirectory(path: Path): Data
@DFExperimental
public suspend fun <T : Any> IOPlugin.writeDataDirectory(
path: Path,
node: DataNode<T>,
tree: DataTree<T>,
format: IOFormat<T>,
envelopeFormat: EnvelopeFormat? = null,
metaFormat: MetaFormatFactory? = null,
@ -129,13 +135,13 @@ public suspend fun <T : Any> IOPlugin.writeDataDirectory(
} else if (!Files.isDirectory(path)) {
error("Can't write a node into file")
}
node.items.forEach { (token, item) ->
tree.items().forEach { (token, item) ->
val childPath = path.resolve(token.toString())
when (item) {
is DataItem.Node -> {
writeDataDirectory(childPath, item.node, format, envelopeFormat)
writeDataDirectory(childPath, item.tree, format, envelopeFormat)
}
is DataItem.Leaf -> {
is DataTreeItem.Leaf -> {
val envelope = item.data.toEnvelope(format)
if (envelopeFormat != null) {
writeEnvelopeFile(childPath, envelope, envelopeFormat, metaFormat)
@ -145,8 +151,9 @@ public suspend fun <T : Any> IOPlugin.writeDataDirectory(
}
}
}
if (!node.meta.isEmpty()) {
writeMetaFile(path, node.meta, metaFormat ?: JsonMetaFormat)
val treeMeta = tree.getMeta()
if (treeMeta != null) {
writeMetaFile(path, treeMeta, metaFormat ?: JsonMetaFormat)
}
}
}
@ -154,26 +161,26 @@ public suspend fun <T : Any> IOPlugin.writeDataDirectory(
private suspend fun <T : Any> ZipOutputStream.writeNode(
name: String,
item: DataItem<T>,
treeItem: DataTreeItem<T>,
dataFormat: IOFormat<T>,
envelopeFormat: EnvelopeFormat = TaggedEnvelopeFormat,
) {
withContext(Dispatchers.IO) {
when (item) {
is DataItem.Leaf -> {
when (treeItem) {
is DataTreeItem.Leaf -> {
//TODO add directory-based envelope writer
val envelope = item.data.toEnvelope(dataFormat)
val envelope = treeItem.data.toEnvelope(dataFormat)
val entry = ZipEntry(name)
putNextEntry(entry)
envelopeFormat.run {
writeObject(asOutput(), envelope)
}
}
is DataItem.Node -> {
is DataTreeItem.Node -> {
val entry = ZipEntry("$name/")
putNextEntry(entry)
closeEntry()
item.node.items.forEach { (token, item) ->
treeItem.tree.items().forEach { (token, item) ->
val childName = "$name/$token"
writeNode(childName, item, dataFormat, envelopeFormat)
}
@ -185,7 +192,7 @@ private suspend fun <T : Any> ZipOutputStream.writeNode(
@DFExperimental
suspend fun <T : Any> IOPlugin.writeZip(
path: Path,
node: DataNode<T>,
tree: DataTree<T>,
format: IOFormat<T>,
envelopeFormat: EnvelopeFormat = TaggedEnvelopeFormat,
) {
@ -201,7 +208,7 @@ suspend fun <T : Any> IOPlugin.writeZip(
StandardOpenOption.TRUNCATE_EXISTING)
val zos = ZipOutputStream(fos)
zos.use {
it.writeNode("", DataItem.Node(node), format, envelopeFormat)
it.writeNode("", DataTreeItem.Node(tree), format, envelopeFormat)
}
// if (Files.exists(actualFile) && Files.size(path) == 0.toLong()) {

View File

@ -5,11 +5,15 @@ import hep.dataforge.context.PluginFactory
import hep.dataforge.context.PluginTag
import hep.dataforge.data.*
import hep.dataforge.meta.Meta
import hep.dataforge.names.asName
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.reduce
import kotlinx.coroutines.runBlocking
import kotlin.reflect.KClass
import kotlin.test.Test
import kotlin.test.assertEquals
public fun <T : Any> DataSet<T>.first(): NamedData<T>? = runBlocking { flow().firstOrNull() }
class DataPropagationTestPlugin : WorkspacePlugin() {
override val tag: PluginTag = Companion.tag
@ -19,9 +23,9 @@ class DataPropagationTestPlugin : WorkspacePlugin() {
allData()
}
transform<Int> { data ->
DataTree {
val result = data.dataSequence().map { it.second.get() }.reduce { acc, pair -> acc + pair }
set("result".asName(), Data { result })
DataTree.dynamic {
val result = data.flow().map { it.value() }.reduce { acc, pair -> acc + pair }
data("result", result)
}
}
}
@ -32,9 +36,9 @@ class DataPropagationTestPlugin : WorkspacePlugin() {
data("myData\\[12\\]")
}
transform<Int> { data ->
DataTree {
val result = data.dataSequence().map { it.second.get() }.reduce { acc, pair -> acc + pair }
set("result".asName(), Data { result })
DataTree.dynamic {
val result = data.flow().map { it.value() }.reduce { acc, pair -> acc + pair }
data("result", result)
}
}
}
@ -44,9 +48,9 @@ class DataPropagationTestPlugin : WorkspacePlugin() {
data(pattern = "myData.*")
}
transform<Int> { data ->
DataTree{
val result = data.dataSequence().map { it.second.get() }.reduce { acc, pair -> acc + pair }
set("result".asName(), Data { result })
DataTree.dynamic {
val result = data.flow().map { it.value() }.reduce { acc, pair -> acc + pair }
data("result", result)
}
}
}
@ -69,7 +73,7 @@ class DataPropagationTest {
}
data {
repeat(100) {
static("myData[$it]", it)
data("myData[$it]", it)
}
}
}
@ -77,18 +81,18 @@ class DataPropagationTest {
@Test
fun testAllData() {
val node = testWorkspace.run("Test.allData")
assertEquals(4950, node.first()!!.get())
assertEquals(4950, node.first()!!.value())
}
@Test
fun testAllRegexData() {
val node = testWorkspace.run("Test.allRegexData")
assertEquals(4950, node.first()!!.get())
assertEquals(4950, node.first()!!.value())
}
@Test
fun testSingleData() {
val node = testWorkspace.run("Test.singleData")
assertEquals(12, node.first()!!.get())
assertEquals(12, node.first()!!.value())
}
}

View File

@ -12,24 +12,29 @@ import kotlinx.io.Output
import kotlinx.io.text.readUtf8String
import kotlinx.io.text.writeUtf8String
import java.nio.file.Files
import kotlin.reflect.KType
import kotlin.reflect.typeOf
import kotlin.test.Test
import kotlin.test.assertEquals
class FileDataTest {
val dataNode = DataTree<String> {
node("dir") {
static("a", "Some string") {
val dataNode = DataTree.static<String> {
set("dir") {
data("a", "Some string") {
"content" put "Some string"
}
}
static("b", "root data")
data("b", "root data")
meta {
"content" put "This is root meta node"
}
}
object StringIOFormat : IOFormat<String> {
override val type: KType = typeOf<String>()
override fun writeObject(output: Output, obj: String) {
output.writeUtf8String(obj)
}
@ -55,7 +60,7 @@ class FileDataTest {
println(dir.toUri().toString())
val reconstructed = readDataDirectory(dir, String::class) { _, _ -> StringIOFormat }
assertEquals(dataNode["dir.a"]?.meta, reconstructed["dir.a"]?.meta)
assertEquals(dataNode["b"]?.data?.get(), reconstructed["b"]?.data?.get())
assertEquals(dataNode["b"]?.data?.get(), reconstructed["b"]?.data?.value())
}
}
@ -71,7 +76,7 @@ class FileDataTest {
println(zip.toUri().toString())
val reconstructed = readDataDirectory(zip, String::class) { _, _ -> StringIOFormat }
assertEquals(dataNode["dir.a"]?.meta, reconstructed["dir.a"]?.meta)
assertEquals(dataNode["b"]?.data?.get(), reconstructed["b"]?.data?.get())
assertEquals(dataNode["b"]?.data?.get(), reconstructed["b"]?.data?.value())
}
}
}

View File

@ -4,6 +4,8 @@ import hep.dataforge.context.*
import hep.dataforge.data.*
import hep.dataforge.meta.*
import hep.dataforge.names.plus
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.runBlocking
import kotlin.reflect.KClass
import kotlin.test.Test
import kotlin.test.assertEquals
@ -12,13 +14,25 @@ import kotlin.test.assertTrue
/**
* Make a fake-factory for a one single plugin. Useful for unique or test plugins
*/
public inline fun <reified P: Plugin> P.toFactory(): PluginFactory<P> = object : PluginFactory<P> {
public inline fun <reified P : Plugin> P.toFactory(): PluginFactory<P> = object : PluginFactory<P> {
override fun invoke(meta: Meta, context: Context): P = this@toFactory
override val tag: PluginTag = this@toFactory.tag
override val type: KClass<out P> = P::class
}
public fun DataTree<*>.toMeta(): Meta = Meta {
"type" put (dataType.simpleName ?: "undefined")
"items" put {
runBlocking {
flow().collect {
it.name.toString() put it.data.meta
}
}
}
}
class SimpleWorkspaceTest {
val testPlugin = object : WorkspacePlugin() {
override val tag: PluginTag = PluginTag("test")
@ -39,7 +53,7 @@ class SimpleWorkspaceTest {
data {
repeat(100) {
static("myData[$it]", it)
data("myData[$it]", it)
}
}
@ -47,7 +61,7 @@ class SimpleWorkspaceTest {
model {
data("myData\\[12\\]")
}
map<Int>{
map<Int> {
it
}
}
@ -75,13 +89,13 @@ class SimpleWorkspaceTest {
val linearDep = dependsOn(linear, placement = "linear")
}
transform<Int> { data ->
val squareNode = data["square"].node!!.cast<Int>()//squareDep()
val linearNode = data["linear"].node!!.cast<Int>()//linearDep()
DataTree<Int> {
squareNode.dataSequence().forEach { (name, _) ->
val newData = Data {
val squareValue = squareNode[name].data!!.get()
val linearValue = linearNode[name].data!!.get()
val squareNode = data["square"].tree!!.filterIsInstance<Int>() //squareDep()
val linearNode = data["linear"].tree!!.filterIsInstance<Int>() //linearDep()
DataTree.dynamic<Int> {
squareNode.flow().collect {
val newData: Data<Int> = Data {
val squareValue = squareNode.getData(it.name)!!.value()
val linearValue = linearNode.getData(it.name)!!.value()
squareValue + linearValue
}
set(name, newData)
@ -145,13 +159,13 @@ class SimpleWorkspaceTest {
fun testWorkspace() {
val node = workspace.run("sum")
val res = node.first()
assertEquals(328350, res?.get())
assertEquals(328350, res?.value())
}
@Test
fun testMetaPropagation() {
val node = workspace.run("sum") { "testFlag" put true }
val res = node.first()?.get()
val res = node.first()?.value()
}
@Test
@ -170,6 +184,8 @@ class SimpleWorkspaceTest {
@Test
fun testGather() {
val node = workspace.run("filterOne")
assertEquals(12, node.first()?.get())
runBlocking {
assertEquals(12, node.first()?.value())
}
}
}

View File

@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-6.7.1-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-6.8-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists

View File

@ -10,8 +10,8 @@ pluginManagement {
maven("https://dl.bintray.com/mipt-npm/dev")
}
val toolsVersion = "0.7.0"
val kotlinVersion = "1.4.20"
val toolsVersion = "0.7.1"
val kotlinVersion = "1.4.21"
plugins {
id("ru.mipt.npm.project") version toolsVersion