Complete refactoring of goals and DataNodes

This commit is contained in:
Alexander Nozik 2019-08-07 19:28:44 +03:00
parent 44737faa26
commit 1f0a317cd8
16 changed files with 253 additions and 262 deletions

View File

@ -11,7 +11,6 @@ kotlin {
val commonMain by getting{ val commonMain by getting{
dependencies { dependencies {
api(project(":dataforge-meta")) api(project(":dataforge-meta"))
api(kotlin("reflect"))
api("org.jetbrains.kotlinx:kotlinx-coroutines-core-common:$coroutinesVersion") api("org.jetbrains.kotlinx:kotlinx-coroutines-core-common:$coroutinesVersion")
} }
} }
@ -19,6 +18,7 @@ kotlin {
val jvmMain by getting{ val jvmMain by getting{
dependencies { dependencies {
api("org.jetbrains.kotlinx:kotlinx-coroutines-core:$coroutinesVersion") api("org.jetbrains.kotlinx:kotlinx-coroutines-core:$coroutinesVersion")
api(kotlin("reflect"))
} }
} }

View File

@ -0,0 +1,48 @@
package hep.dataforge.data
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlin.coroutines.CoroutineContext
/**
* A monitor of goal state that could be accessed only form inside the goal
*/
class CoroutineMonitor : CoroutineContext.Element {
override val key: CoroutineContext.Key<*> get() = CoroutineMonitor
var totalWork: Double = 1.0
var workDone: Double = 0.0
var status: String = ""
/**
* Mark the goal as started
*/
fun start() {
}
/**
* Mark the goal as completed
*/
fun finish() {
workDone = totalWork
}
companion object : CoroutineContext.Key<CoroutineMonitor>
}
class Dependencies(val values: Collection<Job>) : CoroutineContext.Element {
override val key: CoroutineContext.Key<*> get() = Dependencies
companion object : CoroutineContext.Key<Dependencies>
}
val CoroutineContext.monitor: CoroutineMonitor? get() = this[CoroutineMonitor]
val CoroutineScope.monitor: CoroutineMonitor? get() = coroutineContext.monitor
val Job.dependencies: Collection<Job> get() = this[Dependencies]?.values ?: emptyList()
val Job.totalWork: Double get() = dependencies.sumByDouble { totalWork } + (monitor?.totalWork ?: 0.0)
val Job.workDone: Double get() = dependencies.sumByDouble { workDone } + (monitor?.workDone ?: 0.0)
val Job.status: String get() = monitor?.status ?: ""
val Job.progress: Double get() = workDone / totalWork

View File

@ -2,7 +2,8 @@ package hep.dataforge.data
import hep.dataforge.meta.Meta import hep.dataforge.meta.Meta
import hep.dataforge.meta.MetaRepr import hep.dataforge.meta.MetaRepr
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Deferred
import kotlin.reflect.KClass import kotlin.reflect.KClass
/** /**
@ -21,25 +22,25 @@ interface Data<out T : Any> : MetaRepr {
/** /**
* Lazy data value * Lazy data value
*/ */
val goal: Goal<T> val task: Deferred<T>
override fun toMeta(): Meta = meta override fun toMeta(): Meta = meta
companion object { companion object {
const val TYPE = "data" const val TYPE = "data"
fun <T : Any> of(type: KClass<out T>, goal: Goal<T>, meta: Meta): Data<T> = DataImpl(type, goal, meta) fun <T : Any> of(type: KClass<out T>, goal: Deferred<T>, meta: Meta): Data<T> = DataImpl(type, goal, meta)
inline fun <reified T : Any> of(goal: Goal<T>, meta: Meta): Data<T> = of(T::class, goal, meta) inline fun <reified T : Any> of(goal: Deferred<T>, meta: Meta): Data<T> = of(T::class, goal, meta)
fun <T : Any> of(name: String, type: KClass<out T>, goal: Goal<T>, meta: Meta): Data<T> = fun <T : Any> of(name: String, type: KClass<out T>, goal: Deferred<T>, meta: Meta): Data<T> =
NamedData(name, of(type, goal, meta)) NamedData(name, of(type, goal, meta))
inline fun <reified T : Any> of(name: String, goal: Goal<T>, meta: Meta): Data<T> = inline fun <reified T : Any> of(name: String, goal: Deferred<T>, meta: Meta): Data<T> =
of(name, T::class, goal, meta) of(name, T::class, goal, meta)
fun <T : Any> static(scope: CoroutineScope, value: T, meta: Meta): Data<T> = fun <T : Any> static(value: T, meta: Meta): Data<T> =
DataImpl(value::class, Goal.static(scope, value), meta) DataImpl(value::class, CompletableDeferred(value), meta)
} }
} }
@ -47,21 +48,21 @@ interface Data<out T : Any> : MetaRepr {
* Upcast a [Data] to a supertype * Upcast a [Data] to a supertype
*/ */
inline fun <reified R : Any, reified T : R> Data<T>.cast(): Data<R> { inline fun <reified R : Any, reified T : R> Data<T>.cast(): Data<R> {
return Data.of(R::class, goal, meta) return Data.of(R::class, task, meta)
} }
fun <R : Any, T : R> Data<T>.cast(type: KClass<R>): Data<R> { fun <R : Any, T : R> Data<T>.cast(type: KClass<R>): Data<R> {
return Data.of(type, goal, meta) return Data.of(type, task, meta)
} }
suspend fun <T : Any> Data<T>.await(): T = goal.await() suspend fun <T : Any> Data<T>.await(): T = task.await()
/** /**
* Generic Data implementation * Generic Data implementation
*/ */
private class DataImpl<out T : Any>( private class DataImpl<out T : Any>(
override val type: KClass<out T>, override val type: KClass<out T>,
override val goal: Goal<T>, override val task: Deferred<T>,
override val meta: Meta override val meta: Meta
) : Data<T> ) : Data<T>

View File

@ -20,10 +20,10 @@ class DataFilter(override val config: Config) : Specific {
* Apply meta-based filter to given data node * Apply meta-based filter to given data node
*/ */
fun <T : Any> DataNode<T>.filter(filter: DataFilter): DataNode<T> { fun <T : Any> DataNode<T>.filter(filter: DataFilter): DataNode<T> {
val sourceNode = filter.from?.let { getNode(it.toName()) } ?: this@filter val sourceNode = filter.from?.let { get(it.toName()).node } ?: this@filter
val regex = filter.pattern.toRegex() val regex = filter.pattern.toRegex()
val targetNode = DataTreeBuilder(type).apply { val targetNode = DataTreeBuilder(type).apply {
sourceNode.data().forEach { (name, data) -> sourceNode.dataSequence().forEach { (name, data) ->
if (name.toString().matches(regex)) { if (name.toString().matches(regex)) {
this[name] = data this[name] = data
} }

View File

@ -1,8 +1,26 @@
package hep.dataforge.data package hep.dataforge.data
import hep.dataforge.names.* import hep.dataforge.names.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import kotlin.collections.component1
import kotlin.collections.component2
import kotlin.collections.set
import kotlin.reflect.KClass import kotlin.reflect.KClass
sealed class DataItem<out T : Any> {
abstract val type: KClass<out T>
class Node<out T : Any>(val value: DataNode<T>) : DataItem<T>() {
override val type: KClass<out T> get() = value.type
}
class Leaf<out T : Any>(val value: Data<T>) : DataItem<T>() {
override val type: KClass<out T> get() = value.type
}
}
/** /**
* A tree-like data structure grouped into the node. All data inside the node must inherit its type * A tree-like data structure grouped into the node. All data inside the node must inherit its type
*/ */
@ -13,93 +31,89 @@ interface DataNode<out T : Any> {
*/ */
val type: KClass<out T> val type: KClass<out T>
/** val items: Map<NameToken, DataItem<T>>
* Get the specific data if it exists
*/
operator fun get(name: Name): Data<T>?
/**
* Get a subnode with given name if it exists.
*/
fun getNode(name: Name): DataNode<T>?
/**
* Walk the tree upside down and provide all data nodes with full names
*/
fun data(): Sequence<Pair<Name, Data<T>>>
/**
* A sequence of all nodes in the tree walking upside down, excluding self
*/
fun nodes(): Sequence<Pair<Name, DataNode<T>>>
operator fun iterator(): Iterator<Pair<Name, Data<T>>> = data().iterator()
companion object { companion object {
const val TYPE = "dataNode" const val TYPE = "dataNode"
fun <T : Any> build(type: KClass<out T>, block: DataTreeBuilder<T>.() -> Unit) = fun <T : Any> build(type: KClass<out T>, block: DataTreeBuilder<T>.() -> Unit) =
DataTreeBuilder<T>(type).apply(block).build() DataTreeBuilder(type).apply(block).build()
fun <T : Any> builder(type: KClass<out T>) = DataTreeBuilder(type) fun <T : Any> builder(type: KClass<out T>) = DataTreeBuilder(type)
} }
} }
internal sealed class DataTreeItem<out T : Any> { val <T : Any> DataItem<T>?.node: DataNode<T>? get() = (this as? DataItem.Node<T>)?.value
class Node<out T : Any>(val tree: DataTree<T>) : DataTreeItem<T>() val <T : Any> DataItem<T>?.data: Data<T>? get() = (this as? DataItem.Leaf<T>)?.value
class Value<out T : Any>(val value: Data<T>) : DataTreeItem<T>()
/**
* Start computation for all goals in data node
*/
fun DataNode<*>.startAll(): Unit = items.values.forEach {
when (it) {
is DataItem.Node<*> -> it.value.startAll()
is DataItem.Leaf<*> -> it.value.task.start()
}
} }
fun DataNode<*>.joinAll(scope: CoroutineScope): Job = scope.launch {
startAll()
items.forEach {
when (val value = it.value) {
is DataItem.Node -> value.value.joinAll(this).join()
is DataItem.Leaf -> value.value.task.await()
}
}
}
operator fun <T : Any> DataNode<T>.get(name: Name): DataItem<T>? = when (name.length) {
0 -> error("Empty name")
1 -> (items[name.first()] as? DataItem.Leaf)
else -> get(name.first()!!.asName()).node?.get(name.cutFirst())
}
/**
* Sequence of all children including nodes
*/
fun <T : Any> DataNode<T>.asSequence(): Sequence<Pair<Name, DataItem<T>>> = sequence {
items.forEach { (head, item) ->
yield(head.asName() to item)
if (item is DataItem.Node) {
val subSequence = item.value.asSequence()
.map { (name, data) -> (head.asName() + name) to data }
yieldAll(subSequence)
}
}
}
/**
* Sequence of data entries
*/
fun <T : Any> DataNode<T>.dataSequence(): Sequence<Pair<Name, Data<T>>> = sequence {
items.forEach { (head, item) ->
when (item) {
is DataItem.Leaf -> yield(head.asName() to item.value)
is DataItem.Node -> {
val subSequence = item.value.dataSequence()
.map { (name, data) -> (head.asName() + name) to data }
yieldAll(subSequence)
}
}
}
}
operator fun <T : Any> DataNode<T>.iterator(): Iterator<Pair<Name, DataItem<T>>> = asSequence().iterator()
class DataTree<out T : Any> internal constructor( class DataTree<out T : Any> internal constructor(
override val type: KClass<out T>, override val type: KClass<out T>,
private val items: Map<NameToken, DataTreeItem<T>> override val items: Map<NameToken, DataItem<T>>
) : DataNode<T> { ) : DataNode<T> {
//TODO add node-level meta? //TODO add node-level meta?
override fun get(name: Name): Data<T>? = when (name.length) {
0 -> error("Empty name")
1 -> (items[name.first()] as? DataTreeItem.Value)?.value
else -> getNode(name.first()!!.asName())?.get(name.cutFirst())
}
override fun getNode(name: Name): DataTree<T>? = when (name.length) {
0 -> this
1 -> (items[name.first()] as? DataTreeItem.Node)?.tree
else -> getNode(name.first()!!.asName())?.getNode(name.cutFirst())
}
override fun data(): Sequence<Pair<Name, Data<T>>> {
return sequence {
items.forEach { (head, tree) ->
when (tree) {
is DataTreeItem.Value -> yield(head.asName() to tree.value)
is DataTreeItem.Node -> {
val subSequence =
tree.tree.data().map { (name, data) -> (head.asName() + name) to data }
yieldAll(subSequence)
}
}
}
}
}
override fun nodes(): Sequence<Pair<Name, DataNode<T>>> {
return sequence {
items.forEach { (head, tree) ->
if (tree is DataTreeItem.Node) {
yield(head.asName() to tree.tree)
val subSequence =
tree.tree.nodes().map { (name, node) -> (head.asName() + name) to node }
yieldAll(subSequence)
}
}
}
}
} }
private sealed class DataTreeBuilderItem<out T : Any> { private sealed class DataTreeBuilderItem<out T : Any> {
class Node<T : Any>(val tree: DataTreeBuilder<T>) : DataTreeBuilderItem<T>() class Node<T : Any>(val tree: DataTreeBuilder<T>) : DataTreeBuilderItem<T>()
class Value<T : Any>(val value: Data<T>) : DataTreeBuilderItem<T>() class Leaf<T : Any>(val value: Data<T>) : DataTreeBuilderItem<T>()
} }
/** /**
@ -115,7 +129,7 @@ class DataTreeBuilder<T : Any>(private val type: KClass<out T>) {
operator fun set(token: NameToken, data: Data<T>) { operator fun set(token: NameToken, data: Data<T>) {
if (map.containsKey(token)) error("Tree entry with name $token is not empty") if (map.containsKey(token)) error("Tree entry with name $token is not empty")
map[token] = DataTreeBuilderItem.Value(data) map[token] = DataTreeBuilderItem.Leaf(data)
} }
private fun buildNode(token: NameToken): DataTreeBuilder<T> { private fun buildNode(token: NameToken): DataTreeBuilder<T> {
@ -152,6 +166,11 @@ class DataTreeBuilder<T : Any>(private val type: KClass<out T>) {
operator fun set(name: Name, node: DataNode<T>) = set(name, node.builder()) operator fun set(name: Name, node: DataNode<T>) = set(name, node.builder())
operator fun set(name: Name, item: DataItem<T>) = when (item) {
is DataItem.Node<T> -> set(name, item.value.builder())
is DataItem.Leaf<T> -> set(name, item.value)
}
/** /**
* Append data to node * Append data to node
*/ */
@ -162,14 +181,16 @@ class DataTreeBuilder<T : Any>(private val type: KClass<out T>) {
*/ */
infix fun String.to(node: DataNode<T>) = set(toName(), node) infix fun String.to(node: DataNode<T>) = set(toName(), node)
infix fun String.to(item: DataItem<T>) = set(toName(), item)
/** /**
* Build and append node * Build and append node
*/ */
infix fun String.to(block: DataTreeBuilder<T>.() -> Unit) = set(toName(), DataTreeBuilder<T>(type).apply(block)) infix fun String.to(block: DataTreeBuilder<T>.() -> Unit) = set(toName(), DataTreeBuilder<T>(type).apply(block))
fun update(node: DataNode<T>){ fun update(node: DataNode<T>) {
node.data().forEach { node.dataSequence().forEach {
//TODO check if the place is occupied //TODO check if the place is occupied
this[it.first] = it.second this[it.first] = it.second
} }
@ -178,8 +199,8 @@ class DataTreeBuilder<T : Any>(private val type: KClass<out T>) {
fun build(): DataTree<T> { fun build(): DataTree<T> {
val resMap = map.mapValues { (_, value) -> val resMap = map.mapValues { (_, value) ->
when (value) { when (value) {
is DataTreeBuilderItem.Value -> DataTreeItem.Value(value.value) is DataTreeBuilderItem.Leaf -> DataItem.Leaf(value.value)
is DataTreeBuilderItem.Node -> DataTreeItem.Node(value.tree.build()) is DataTreeBuilderItem.Node -> DataItem.Node(value.tree.build())
} }
} }
return DataTree(type, resMap) return DataTree(type, resMap)
@ -190,27 +211,20 @@ class DataTreeBuilder<T : Any>(private val type: KClass<out T>) {
* Generate a mutable builder from this node. Node content is not changed * Generate a mutable builder from this node. Node content is not changed
*/ */
fun <T : Any> DataNode<T>.builder(): DataTreeBuilder<T> = DataTreeBuilder(type).apply { fun <T : Any> DataNode<T>.builder(): DataTreeBuilder<T> = DataTreeBuilder(type).apply {
data().forEach { (name, data) -> this[name] = data } dataSequence().forEach { (name, data) -> this[name] = data }
} }
/**
* Start computation for all goals in data node
*/
fun DataNode<*>.startAll() = data().forEach { (_, data) -> data.goal.start() }
fun <T : Any> DataNode<T>.filter(predicate: (Name, Data<T>) -> Boolean): DataNode<T> = DataNode.build(type) { fun <T : Any> DataNode<T>.filter(predicate: (Name, Data<T>) -> Boolean): DataNode<T> = DataNode.build(type) {
data().forEach { (name, data) -> dataSequence().forEach { (name, data) ->
if (predicate(name, data)) { if (predicate(name, data)) {
this[name] = data this[name] = data
} }
} }
} }
fun <T: Any> DataNode<T>.first(): Data<T> = data().first().second fun <T : Any> DataNode<T>.first(): Data<T>? = dataSequence().first().second
/** /**
* Check that node is compatible with given type meaning that each element could be cast to the type * Check that node is compatible with given type meaning that each element could be cast to the type
*/ */
expect fun DataNode<*>.checkType(type: KClass<*>) expect fun DataNode<*>.checkType(type: KClass<*>)
//fun <T : Any, R: T> DataNode<T>.filterIsInstance(type: KClass<R>): DataNode<R> = filter{_,data -> type.}

View File

@ -1,131 +0,0 @@
package hep.dataforge.data
import kotlinx.coroutines.*
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
/**
* A special deferred with explicit dependencies and some additional information like progress and unique id
*/
interface Goal<out T> : Deferred<T>, CoroutineScope {
val scope: CoroutineScope
override val coroutineContext get() = scope.coroutineContext
val dependencies: Collection<Goal<*>>
val totalWork: Double get() = dependencies.sumByDouble { totalWork } + (monitor?.totalWork ?: 0.0)
val workDone: Double get() = dependencies.sumByDouble { workDone } + (monitor?.workDone ?: 0.0)
val status: String get() = monitor?.status ?: ""
val progress: Double get() = workDone / totalWork
companion object {
/**
* Create goal wrapping static value. This goal is always completed
*/
fun <T> static(scope: CoroutineScope, value: T): Goal<T> =
StaticGoalImpl(scope, CompletableDeferred(value))
}
}
/**
* A monitor of goal state that could be accessed only form inside the goal
*/
class GoalMonitor : CoroutineContext.Element {
override val key: CoroutineContext.Key<*> get() = GoalMonitor
var totalWork: Double = 1.0
var workDone: Double = 0.0
var status: String = ""
/**
* Mark the goal as started
*/
fun start() {
}
/**
* Mark the goal as completed
*/
fun finish() {
workDone = totalWork
}
companion object : CoroutineContext.Key<GoalMonitor>
}
val CoroutineScope.monitor: GoalMonitor? get() = coroutineContext[GoalMonitor]
private class GoalImpl<T>(
override val scope: CoroutineScope,
override val dependencies: Collection<Goal<*>>,
deferred: Deferred<T>
) : Goal<T>, Deferred<T> by deferred
private class StaticGoalImpl<T>(override val scope: CoroutineScope, deferred: CompletableDeferred<T>) : Goal<T>,
Deferred<T> by deferred {
override val dependencies: Collection<Goal<*>> get() = emptyList()
override val status: String get() = ""
override val totalWork: Double get() = 0.0
override val workDone: Double get() = 0.0
}
/**
* Create a new [Goal] with given [dependencies] and execution [block]. The block takes monitor as parameter.
* The goal block runs in a supervised scope, meaning that when it fails, it won't affect external scope.
*
* **Important:** Unlike regular deferred, the [Goal] is started lazily, so the actual calculation is called only when result is requested.
*/
fun <R> CoroutineScope.goal(
dependencies: Collection<Goal<*>> = emptyList(),
context: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.() -> R
): Goal<R> {
val deferred = async(context + GoalMonitor(), start = CoroutineStart.LAZY) {
dependencies.forEach { it.start() }
monitor?.start()
//Running in supervisor scope in order to allow manual error handling
return@async supervisorScope {
block().also {
monitor?.finish()
}
}
}
return GoalImpl(this, dependencies, deferred)
}
/**
* Create a one-to-one goal based on existing goal
*/
fun <T, R> Goal<T>.pipe(
context: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.(T) -> R
): Goal<R> = goal(listOf(this), context) { block(await()) }
/**
* Create a joining goal.
* @param scope the scope for resulting goal. By default use first goal in list
*/
fun <T, R> Collection<Goal<T>>.join(
scope: CoroutineScope = first(),
context: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.(Collection<T>) -> R
): Goal<R> = scope.goal(this, context) {
block(map { it.await() })
}
/**
* A joining goal for a map
* @param K type of the map key
* @param T type of the input goal
* @param R type of the result goal
*/
fun <K, T, R> Map<K, Goal<T>>.join(
scope: CoroutineScope = values.first(),
context: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.(Map<K, T>) -> R
): Goal<R> = scope.goal(this.values, context) {
block(mapValues { it.value.await() })
}

View File

@ -44,7 +44,7 @@ object GroupBuilder {
override fun <T : Any> invoke(node: DataNode<T>): Map<String, DataNode<T>> { override fun <T : Any> invoke(node: DataNode<T>): Map<String, DataNode<T>> {
val map = HashMap<String, DataTreeBuilder<T>>() val map = HashMap<String, DataTreeBuilder<T>>()
node.data().forEach { (name, data) -> node.dataSequence().forEach { (name, data) ->
val tagValue = data.meta[key]?.string ?: defaultTagValue val tagValue = data.meta[key]?.string ?: defaultTagValue
map.getOrPut(tagValue) { DataNode.builder(node.type) }[name] = data map.getOrPut(tagValue) { DataNode.builder(node.type) }[name] = data
} }

View File

@ -6,6 +6,7 @@ import hep.dataforge.meta.MetaBuilder
import hep.dataforge.meta.builder import hep.dataforge.meta.builder
import hep.dataforge.names.Name import hep.dataforge.names.Name
import hep.dataforge.names.toName import hep.dataforge.names.toName
import kotlinx.coroutines.Deferred
import kotlin.coroutines.CoroutineContext import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext import kotlin.coroutines.EmptyCoroutineContext
import kotlin.reflect.KClass import kotlin.reflect.KClass
@ -89,15 +90,13 @@ class JoinAction<T : Any, R : Any>(
val laminate = Laminate(group.meta, meta) val laminate = Laminate(group.meta, meta)
val goalMap: Map<Name, Goal<T>> = group.node val goalMap: Map<Name, Deferred<T>> = group.node.dataSequence().associate { it.first to it.second.task }
.data()
.associate { it.first to it.second.goal }
val groupName: String = group.name; val groupName: String = group.name;
val env = ActionEnv(groupName.toName(), laminate.builder()) val env = ActionEnv(groupName.toName(), laminate.builder())
val goal = goalMap.join(context = context) { group.result.invoke(env, it) } val goal = goalMap.join(context) { group.result.invoke(env, it) }
val res = Data.of(outputType, goal, env.meta) val res = Data.of(outputType, goal, env.meta)
@ -108,4 +107,4 @@ class JoinAction<T : Any, R : Any>(
} }
} }
operator fun <T> Map<Name,T>.get(name:String) = get(name.toName()) operator fun <T> Map<Name, T>.get(name: String) = get(name.toName())

View File

@ -35,7 +35,7 @@ class PipeAction<T : Any, R : Any>(
node.checkType(inputType) node.checkType(inputType)
return DataNode.build(outputType) { return DataNode.build(outputType) {
node.data().forEach { (name, data) -> node.dataSequence().forEach { (name, data) ->
//merging data meta with action meta (data meta is primary) //merging data meta with action meta (data meta is primary)
val oldMeta = meta.builder().apply { update(data.meta) } val oldMeta = meta.builder().apply { update(data.meta) }
// creating environment from old meta and name // creating environment from old meta and name
@ -47,7 +47,7 @@ class PipeAction<T : Any, R : Any>(
//getting new meta //getting new meta
val newMeta = builder.meta.seal() val newMeta = builder.meta.seal()
//creating a goal with custom context if provided //creating a goal with custom context if provided
val goal = data.goal.pipe(context) { builder.result(env, it) } val goal = data.task.pipe(context) { builder.result(env, it) }
//setting the data node //setting the data node
this[newName] = Data.of(outputType, goal, newMeta) this[newName] = Data.of(outputType, goal, newMeta)
} }

View File

@ -45,7 +45,7 @@ class SplitAction<T : Any, R : Any>(
node.checkType(inputType) node.checkType(inputType)
return DataNode.build(outputType) { return DataNode.build(outputType) {
node.data().forEach { (name, data) -> node.dataSequence().forEach { (name, data) ->
val laminate = Laminate(data.meta, meta) val laminate = Laminate(data.meta, meta)
@ -58,7 +58,7 @@ class SplitAction<T : Any, R : Any>(
rule(env) rule(env)
val goal = data.goal.pipe(context = context) { env.result(it) } val goal = data.task.pipe(context) { env.result(it) }
val res = Data.of(outputType, goal, env.meta) val res = Data.of(outputType, goal, env.meta)
set(env.name, res) set(env.name, res)

View File

@ -0,0 +1,60 @@
package hep.dataforge.data
import kotlinx.coroutines.*
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
/**
* Create a new [Deferred] with given [dependencies] and execution [block]. The block takes monitor as parameter.
*
* **Important:** Unlike regular deferred, the [Deferred] is started lazily, so the actual calculation is called only when result is requested.
*/
fun <T> CoroutineScope.task(
context: CoroutineContext,
dependencies: Collection<Job> = emptyList(),
block: suspend CoroutineScope.() -> T
): Deferred<T> = async(context + CoroutineMonitor() + Dependencies(dependencies), start = CoroutineStart.LAZY) {
dependencies.forEach { job ->
job.start()
job.invokeOnCompletion { error ->
if (error != null) cancel(CancellationException("Dependency $job failed with error: ${error.message}"))
}
}
return@async block()
}
/**
* Create a one-to-one goal based on existing goal
*/
fun <T, R> Deferred<T>.pipe(
context: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.(T) -> R
): Deferred<R> = CoroutineScope(this + context).task(context, listOf(this)) {
block(await())
}
/**
* Create a joining goal.
* @param scope the scope for resulting goal. By default use first goal in list
*/
fun <T, R> Collection<Deferred<T>>.join(
scope: CoroutineScope,
context: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.(Collection<T>) -> R
): Deferred<R> = scope.task(context, this) {
block(map { it.await() })
}
/**
* A joining goal for a map
* @param K type of the map key
* @param T type of the input goal
* @param R type of the result goal
*/
fun <K, T, R> Map<K, Deferred<T>>.join(
context: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.(Map<K, T>) -> R
): Deferred<R> = CoroutineScope(values.first() + context).task(context, this.values) {
block(mapValues { it.value.await() })
}

View File

@ -1,13 +1,14 @@
package hep.dataforge.data package hep.dataforge.data
import hep.dataforge.names.Name import hep.dataforge.names.NameToken
import kotlinx.coroutines.Deferred
import kotlin.reflect.KClass import kotlin.reflect.KClass
import kotlin.reflect.full.isSubclassOf import kotlin.reflect.full.isSubclassOf
fun <T : Any, R : Any> Data<T>.safeCast(type: KClass<R>): Data<R>? { fun <T : Any, R : Any> Data<T>.safeCast(type: KClass<R>): Data<R>? {
return if (type.isSubclassOf(type)) { return if (type.isSubclassOf(type)) {
@Suppress("UNCHECKED_CAST") @Suppress("UNCHECKED_CAST")
Data.of(type, goal as Goal<R>, meta) Data.of(type, task as Deferred<R>, meta)
} else { } else {
null null
} }
@ -17,7 +18,7 @@ fun <T : Any, R : Any> Data<T>.safeCast(type: KClass<R>): Data<R>? {
* Filter a node by data and node type. Resulting node and its subnodes is guaranteed to have border type [type], * Filter a node by data and node type. Resulting node and its subnodes is guaranteed to have border type [type],
* but could contain empty nodes * but could contain empty nodes
*/ */
fun <T : Any, R : Any> DataNode<T>.cast(type: KClass<R>): DataNode<R> { fun <T : Any, R : Any> DataNode<T>.cast(type: KClass<out R>): DataNode<R> {
return if (this is CastDataNode) { return if (this is CastDataNode) {
origin.cast(type) origin.cast(type)
} else { } else {
@ -28,19 +29,18 @@ fun <T : Any, R : Any> DataNode<T>.cast(type: KClass<R>): DataNode<R> {
inline fun <T : Any, reified R : Any> DataNode<T>.cast(): DataNode<R> = cast(R::class) inline fun <T : Any, reified R : Any> DataNode<T>.cast(): DataNode<R> = cast(R::class)
class CastDataNode<out T : Any>(val origin: DataNode<Any>, override val type: KClass<out T>) : DataNode<T> { class CastDataNode<out T : Any>(val origin: DataNode<Any>, override val type: KClass<out T>) : DataNode<T> {
override val items: Map<NameToken, DataItem<T>> by lazy {
override fun get(name: Name): Data<T>? = origin.items.mapNotNull { (key, item) ->
origin[name]?.safeCast(type) when (item) {
is DataItem.Leaf -> {
override fun getNode(name: Name): DataNode<T>? { (item.value.safeCast(type))?.let {
return origin.getNode(name)?.cast(type) key to DataItem.Leaf(it)
} }
override fun data(): Sequence<Pair<Name, Data<T>>> =
origin.data().mapNotNull { pair ->
pair.second.safeCast(type)?.let { pair.first to it }
} }
is DataItem.Node -> {
override fun nodes(): Sequence<Pair<Name, DataNode<T>>> = key to DataItem.Node(item.value.cast(type))
origin.nodes().map { it.first to it.second.cast(type) } }
}
}.associate { it }
}
} }

View File

@ -5,4 +5,4 @@ import kotlinx.coroutines.runBlocking
/** /**
* Block the thread and get data content * Block the thread and get data content
*/ */
fun <T : Any> Data<T>.get(): T = runBlocking { await() } fun <T : Any> Data<T>.get(): T = runBlocking { task.await() }

View File

@ -51,7 +51,7 @@ data class TaskModel(
*/ */
fun TaskModel.buildInput(workspace: Workspace): DataTree<Any> { fun TaskModel.buildInput(workspace: Workspace): DataTree<Any> {
return DataTreeBuilder(Any::class).apply { return DataTreeBuilder(Any::class).apply {
dependencies.asSequence().flatMap { it.apply(workspace).data() }.forEach { (name, data) -> dependencies.asSequence().flatMap { it.apply(workspace).data }.forEach { (name, data) ->
//TODO add concise error on replacement //TODO add concise error on replacement
this[name] = data this[name] = data
} }

View File

@ -33,8 +33,8 @@ interface Workspace : ContextAware, Provider {
return when (target) { return when (target) {
"target", Meta.TYPE -> targets.mapKeys { it.key.toName() } "target", Meta.TYPE -> targets.mapKeys { it.key.toName() }
Task.TYPE -> tasks Task.TYPE -> tasks
Data.TYPE -> data.data().toMap() Data.TYPE -> data.data.toMap()
DataNode.TYPE -> data.nodes().toMap() DataNode.TYPE -> data.nodes.toMap()
else -> emptyMap() else -> emptyMap()
} }
} }

View File

@ -2,7 +2,7 @@ package hep.dataforge.workspace
import hep.dataforge.context.Context import hep.dataforge.context.Context
import hep.dataforge.data.Data import hep.dataforge.data.Data
import hep.dataforge.data.goal import hep.dataforge.data.task
import hep.dataforge.descriptors.NodeDescriptor import hep.dataforge.descriptors.NodeDescriptor
import hep.dataforge.io.IOFormat import hep.dataforge.io.IOFormat
import hep.dataforge.io.JsonMetaFormat import hep.dataforge.io.JsonMetaFormat
@ -58,7 +58,7 @@ suspend fun <T : Any> Context.readData(
} else { } else {
null null
} }
val goal = goal { val goal = task {
withContext(Dispatchers.IO) { withContext(Dispatchers.IO) {
format.run { format.run {
Files.newByteChannel(path, StandardOpenOption.READ) Files.newByteChannel(path, StandardOpenOption.READ)