Full Workspace and data refactor

This commit is contained in:
Alexander Nozik 2021-01-26 15:11:12 +03:00
parent 366d32a04a
commit 9dacd98f64
46 changed files with 880 additions and 1021 deletions

View File

@ -63,7 +63,7 @@ public class ActiveDataTree<T : Any>(
}
}
override suspend fun set(name: Name, data: Data<T>?) {
override suspend fun emit(name: Name, data: Data<T>?) {
if (data == null) {
remove(name)
} else {
@ -80,9 +80,9 @@ public class ActiveDataTree<T : Any>(
* Copy the given data set and mirror its changes to this [ActiveDataTree] in [this@setAndObserve]. Returns the update [Job]
*/
public fun CoroutineScope.setAndObserve(name: Name, dataSet: DataSet<T>): Job = launch {
set(name, dataSet)
emit(name, dataSet)
dataSet.updates.collect { nameInBranch ->
set(name + nameInBranch, dataSet.getData(nameInBranch))
emit(name + nameInBranch, dataSet.getData(nameInBranch))
}
}
}
@ -90,7 +90,8 @@ public class ActiveDataTree<T : Any>(
/**
* Create a dynamic tree. Initial data is placed synchronously. Updates are propagated via [updatesScope]
*/
public suspend fun <T : Any> DataTree.Companion.active(
@Suppress("FunctionName")
public suspend fun <T : Any> ActiveDataTree(
type: KClass<out T>,
block: suspend ActiveDataTree<T>.() -> Unit,
): DataTree<T> {
@ -99,17 +100,18 @@ public suspend fun <T : Any> DataTree.Companion.active(
return tree
}
public suspend inline fun <reified T : Any> DataTree.Companion.active(
@Suppress("FunctionName")
public suspend inline fun <reified T : Any> ActiveDataTree(
crossinline block: suspend ActiveDataTree<T>.() -> Unit,
): DataTree<T> = ActiveDataTree(T::class).apply { block() }
public suspend inline fun <reified T : Any> ActiveDataTree<T>.set(
public suspend inline fun <reified T : Any> ActiveDataTree<T>.emit(
name: Name,
noinline block: suspend ActiveDataTree<T>.() -> Unit,
): Unit = set(name, DataTree.active(T::class, block))
): Unit = emit(name, ActiveDataTree(T::class, block))
public suspend inline fun <reified T : Any> ActiveDataTree<T>.set(
public suspend inline fun <reified T : Any> ActiveDataTree<T>.emit(
name: String,
noinline block: suspend ActiveDataTree<T>.() -> Unit,
): Unit = set(name.toName(), DataTree.active(T::class, block))
): Unit = emit(name.toName(), ActiveDataTree(T::class, block))

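A minimal usage sketch of the renamed builder API (not part of the commit): `set` becomes `emit`, and trees are created through the `ActiveDataTree` factory function instead of `DataTree.Companion.active`. Assumes the `Data.await()` extension used elsewhere in this diff.

import hep.dataforge.data.*
import hep.dataforge.names.toName
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val tree = ActiveDataTree<Int> {
        emit("root") {
            data("a", 1)
            data("b", 2)
        }
    }
    println(tree.getData("root.a".toName())?.await()) // prints 1
}
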
View File

@ -34,16 +34,16 @@ public abstract class CachingAction<in T : Any, out R : Any>(
dataSet: DataSet<T>,
meta: Meta,
scope: CoroutineScope?,
): DataSet<R> = DataTree.active(outputType) {
): DataSet<R> = ActiveDataTree(outputType) {
coroutineScope {
collectFrom(transform(dataSet, meta))
populate(transform(dataSet, meta))
}
scope?.let {
dataSet.updates.collect {
//clear old nodes
remove(it)
//collect new items
collectFrom(scope.transform(dataSet, meta, it))
populate(scope.transform(dataSet, meta, it))
//FIXME if the target is data, updates are fired twice
}
}

View File

@ -62,7 +62,7 @@ public class LazyData<T : Any>(
override val meta: Meta = Meta.EMPTY,
context: CoroutineContext = EmptyCoroutineContext,
dependencies: Collection<Data<*>> = emptyList(),
block: suspend CoroutineScope.() -> T,
block: suspend () -> T,
) : Data<T>, LazyGoal<T>(context, dependencies, block)
public class StaticData<T : Any>(
@ -78,7 +78,7 @@ public fun <T : Any> Data(
meta: Meta = Meta.EMPTY,
context: CoroutineContext = EmptyCoroutineContext,
dependencies: Collection<Data<*>> = emptyList(),
block: suspend CoroutineScope.() -> T,
block: suspend () -> T,
): Data<T> = LazyData(type, meta, context, dependencies, block)
@Suppress("FunctionName")
@ -86,80 +86,5 @@ public inline fun <reified T : Any> Data(
meta: Meta = Meta.EMPTY,
context: CoroutineContext = EmptyCoroutineContext,
dependencies: Collection<Data<*>> = emptyList(),
noinline block: suspend CoroutineScope.() -> T,
noinline block: suspend () -> T,
): Data<T> = Data(T::class, meta, context, dependencies, block)
public fun <T : Any, R : Any> Data<T>.map(
outputType: KClass<out R>,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = this.meta,
block: suspend CoroutineScope.(T) -> R,
): Data<R> = LazyData(outputType, meta, coroutineContext, listOf(this)) {
block(await())
}
/**
* Create a data pipe
*/
public inline fun <T : Any, reified R : Any> Data<T>.map(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = this.meta,
noinline block: suspend CoroutineScope.(T) -> R,
): Data<R> = LazyData(R::class, meta, coroutineContext, listOf(this)) {
block(await())
}
/**
* Create a joined data.
*/
public inline fun <T : Any, reified R : Any> Collection<Data<T>>.reduce(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta,
noinline block: suspend CoroutineScope.(Collection<T>) -> R,
): Data<R> = LazyData(
R::class,
meta,
coroutineContext,
this
) {
block(map { run { it.await() } })
}
public fun <K, T : Any, R : Any> Map<K, Data<T>>.reduce(
outputType: KClass<out R>,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta,
block: suspend CoroutineScope.(Map<K, T>) -> R,
): LazyData<R> = LazyData(
outputType,
meta,
coroutineContext,
this.values
) {
block(mapValues { it.value.await() })
}
/**
* A joining of multiple data into a single one
* @param K type of the map key
* @param T type of the input goal
* @param R type of the result goal
*/
public inline fun <K, T : Any, reified R : Any> Map<K, Data<T>>.reduce(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta,
noinline block: suspend CoroutineScope.(Map<K, T>) -> R,
): LazyData<R> = LazyData(
R::class,
meta,
coroutineContext,
this.values
) {
block(mapValues { it.value.await() })
}

View File

@ -36,10 +36,21 @@ public interface DataSet<out T : Any> {
public companion object {
public val META_KEY: Name = "@meta".asName()
/**
* An empty [DataSet] that suits all types
*/
public val EMPTY: DataSet<Nothing> = object : DataSet<Nothing> {
override val dataType: KClass<out Nothing> = Nothing::class
override fun flow(): Flow<NamedData<Nothing>> = emptyFlow()
override suspend fun getData(name: Name): Data<Nothing>? = null
}
}
}
public interface ActiveDataSet<T: Any>: DataSet<T>{
public interface ActiveDataSet<T : Any> : DataSet<T> {
/**
* A flow of updated item names. Updates are propagated as a [Flow] of names of updated nodes.
* Those can include new data items and replacements of existing ones. The replaced items could update existing data content
@ -49,7 +60,7 @@ public interface ActiveDataSet<T: Any>: DataSet<T>{
public val updates: Flow<Name>
}
public val <T: Any> DataSet<T>.updates: Flow<Name> get() = if(this is ActiveDataSet) updates else emptyFlow()
public val <T : Any> DataSet<T>.updates: Flow<Name> get() = if (this is ActiveDataSet) updates else emptyFlow()
/**
* Flow all data nodes with names starting with [branchName]

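For consumers, the `updates` extension makes the active/static split transparent: a plain [DataSet] yields an empty flow, while an [ActiveDataSet] emits the names of changed nodes. A hedged sketch, assuming the single-argument `ActiveDataTree` constructor shown in this diff:

import hep.dataforge.data.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.collect

fun main() = runBlocking {
    val tree = ActiveDataTree(String::class)
    val watcher = launch {
        tree.updates.collect { name -> println("updated: $name") }
    }
    delay(10) // sketch only: give the collector time to subscribe
    tree.data("later", "value") // the watcher prints "updated: later"
    delay(10)
    watcher.cancel()
}
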
View File

@ -16,34 +16,37 @@ public interface DataSetBuilder<in T : Any> {
*/
public suspend fun remove(name: Name)
public suspend fun set(name: Name, data: Data<T>?)
public suspend fun emit(name: Name, data: Data<T>?)
/**
* Set the current state of the given [dataSet] into branch [name]. Does not propagate updates
*/
public suspend fun set(name: Name, dataSet: DataSet<T>){
public suspend fun emit(name: Name, dataSet: DataSet<T>) {
//remove previous items
remove(name)
if (name != Name.EMPTY) {
remove(name)
}
//Set new items
dataSet.flow().collect {
set(name + it.name, it.data)
emit(name + it.name, it.data)
}
}
/**
* Append data to node
*/
public suspend infix fun String.put(data: Data<T>): Unit = set(toName(), data)
public suspend infix fun String.put(data: Data<T>): Unit = emit(toName(), data)
/**
* Append node
*/
public suspend infix fun String.put(dataSet: DataSet<T>): Unit = set(toName(), dataSet)
public suspend infix fun String.put(dataSet: DataSet<T>): Unit = emit(toName(), dataSet)
/**
* Build and append node
*/
public suspend infix fun String.put(block: suspend DataSetBuilder<T>.() -> Unit): Unit = set(toName(), block)
public suspend infix fun String.put(block: suspend DataSetBuilder<T>.() -> Unit): Unit = emit(toName(), block)
}
private class SubSetBuilder<in T : Any>(private val parent: DataSetBuilder<T>, private val branch: Name) :
@ -52,57 +55,60 @@ private class SubSetBuilder<in T : Any>(private val parent: DataSetBuilder<T>, p
parent.remove(branch + name)
}
override suspend fun set(name: Name, data: Data<T>?) {
parent.set(branch + name, data)
override suspend fun emit(name: Name, data: Data<T>?) {
parent.emit(branch + name, data)
}
override suspend fun set(name: Name, dataSet: DataSet<T>) {
parent.set(branch + name, dataSet)
override suspend fun emit(name: Name, dataSet: DataSet<T>) {
parent.emit(branch + name, dataSet)
}
}
public suspend fun <T : Any> DataSetBuilder<T>.set(name: Name, block: suspend DataSetBuilder<T>.() -> Unit){
SubSetBuilder(this,name).apply { block() }
public suspend fun <T : Any> DataSetBuilder<T>.emit(name: Name, block: suspend DataSetBuilder<T>.() -> Unit) {
SubSetBuilder(this, name).apply { block() }
}
public suspend fun <T : Any> DataSetBuilder<T>.set(name: String, data: Data<T>) {
set(name.toName(), data)
public suspend fun <T : Any> DataSetBuilder<T>.emit(name: String, data: Data<T>) {
emit(name.toName(), data)
}
public suspend fun <T : Any> DataSetBuilder<T>.data(name: Name, data: T, meta: Meta = Meta.EMPTY) {
set(name, Data.static(data, meta))
emit(name, Data.static(data, meta))
}
public suspend fun <T : Any> DataSetBuilder<T>.data(name: Name, data: T, block: MetaBuilder.() -> Unit = {}) {
set(name, Data.static(data, Meta(block)))
emit(name, Data.static(data, Meta(block)))
}
public suspend fun <T : Any> DataSetBuilder<T>.data(name: String, data: T, block: MetaBuilder.() -> Unit = {}) {
set(name.toName(), Data.static(data, Meta(block)))
emit(name.toName(), Data.static(data, Meta(block)))
}
public suspend fun <T : Any> DataSetBuilder<T>.set(name: String, set: DataSet<T>) {
this.set(name.toName(), set)
public suspend fun <T : Any> DataSetBuilder<T>.emit(name: String, set: DataSet<T>) {
this.emit(name.toName(), set)
}
public suspend fun <T : Any> DataSetBuilder<T>.set(name: String, block: suspend DataSetBuilder<T>.() -> Unit): Unit =
this@set.set(name.toName(), block)
public suspend fun <T : Any> DataSetBuilder<T>.emit(name: String, block: suspend DataSetBuilder<T>.() -> Unit): Unit =
this@emit.emit(name.toName(), block)
public suspend fun <T : Any> DataSetBuilder<T>.emit(data: NamedData<T>) {
emit(data.name, data.data)
}
/**
* Populate this builder's data with the given node's data and its meta with the node meta.
*/
@DFExperimental
public suspend fun <T : Any> DataSetBuilder<T>.update(tree: DataSet<T>): Unit = coroutineScope {
public suspend fun <T : Any> DataSetBuilder<T>.populate(tree: DataSet<T>): Unit = coroutineScope {
tree.flow().collect {
//TODO check if the place is occupied
set(it.name, it.data)
emit(it.name, it.data)
}
}
public suspend fun <T : Any> DataSetBuilder<T>.collectFrom(flow: Flow<NamedData<T>>) {
public suspend fun <T : Any> DataSetBuilder<T>.populate(flow: Flow<NamedData<T>>) {
flow.collect {
set(it.name, it.data)
emit(it.name, it.data)
}
}

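The builder vocabulary after this rename, in one hedged sketch (not from the commit): `emit` places data or whole nodes, the `put` infix helpers delegate to it, and `populate` merges a flow of named data:

import hep.dataforge.data.*
import hep.dataforge.names.toName
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val base = DataTree<Int> {
        data("a", 1)
    }
    val combined = DataTree<Int> {
        emit("branch", base)       // copy `base` under the "branch" node
        populate(base.flow())      // merge its content at the root
        "extra" put Data.static(3) // the infix helpers now delegate to emit
    }
    println(combined.getData("branch.a".toName()) != null) // true
}
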
View File

@ -49,10 +49,13 @@ public open class StaticGoal<T>(public val value: T) : Goal<T> {
}
}
/**
* @param coroutineContext additional context information
*/
public open class LazyGoal<T>(
private val coroutineContext: CoroutineContext = EmptyCoroutineContext,
override val dependencies: Collection<Goal<*>> = emptyList(),
public val block: suspend CoroutineScope.() -> T,
public val block: suspend () -> T,
) : Goal<T> {
final override var deferred: Deferred<T>? = null
@ -61,20 +64,40 @@ public open class LazyGoal<T>(
/**
* Get ongoing computation or start a new one.
* Does not guarantee thread safety. In case of multi-thread access, could create orphan computations.
* If [GoalExecutionRestriction] is present in the [coroutineScope] context, the call could produce an error or a warning
* depending on the settings.
*/
@DFExperimental
override fun async(coroutineScope: CoroutineScope): Deferred<T> {
val log = coroutineScope.coroutineContext[GoalLogger]
// Check if context restricts goal computation
coroutineScope.coroutineContext[GoalExecutionRestriction]?.let { restriction ->
when (restriction.policy) {
GoalExecutionRestrictionPolicy.WARNING -> log?.emit(GoalLogger.WARNING_TAG) { "Goal eager execution is prohibited by the coroutine scope policy" }
GoalExecutionRestrictionPolicy.ERROR -> error("Goal eager execution is prohibited by the coroutine scope policy")
else -> {
/*do nothing*/
}
}
}
log?.emit { "Starting dependencies computation for ${this@LazyGoal}" }
val startedDependencies = this.dependencies.map { goal ->
goal.run { async(coroutineScope) }
}
return deferred ?: coroutineScope.async(
this.coroutineContext + CoroutineMonitor() + Dependencies(startedDependencies)
coroutineContext
+ CoroutineMonitor()
+ Dependencies(startedDependencies)
+ GoalExecutionRestriction(GoalExecutionRestrictionPolicy.NONE) // Remove restrictions on goal execution
) {
//cancel execution if error encountered in one of dependencies
startedDependencies.forEach { deferred ->
deferred.invokeOnCompletion { error ->
if (error != null) this.cancel(CancellationException("Dependency $deferred failed with error: ${error.message}"))
}
}
coroutineContext[GoalLogger]?.emit { "Starting computation of ${this@LazyGoal}" }
block()
}.also { deferred = it }
}
@ -86,38 +109,4 @@ public open class LazyGoal<T>(
deferred?.cancel()
deferred = null
}
}
/**
* Create a one-to-one goal based on existing goal
*/
public fun <T, R> Goal<T>.map(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.(T) -> R,
): Goal<R> = LazyGoal(coroutineContext, listOf(this)) {
block(await())
}
/**
* Create a joining goal.
*/
public fun <T, R> Collection<Goal<T>>.reduce(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.(Collection<T>) -> R,
): Goal<R> = LazyGoal(coroutineContext, this) {
block(map { run { it.await() } })
}
/**
* A joining goal for a map
* @param K type of the map key
* @param T type of the input goal
* @param R type of the result goal
*/
public fun <K, T, R> Map<K, Goal<T>>.reduce(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.(Map<K, T>) -> R,
): Goal<R> = LazyGoal(coroutineContext, this.values) {
block(mapValues { it.value.await() })
}
}

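The new `async` wiring in one hedged sketch (assuming the `Goal.await()` extension used elsewhere in this file): dependencies are started before the goal body, a failing dependency cancels the goal through the `invokeOnCompletion` hook, and any execution restriction is stripped from the goal's own context.

import hep.dataforge.data.LazyGoal
import hep.dataforge.meta.DFExperimental
import kotlinx.coroutines.runBlocking

@OptIn(DFExperimental::class)
fun main() = runBlocking {
    val first = LazyGoal { 21 }
    val second = LazyGoal(dependencies = listOf(first)) { first.await() * 2 }
    // Starting `second` starts `first` as well; a failure in `first` would cancel `second`.
    println(second.async(this).await()) // prints 42
}
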
View File

@ -0,0 +1,17 @@
package hep.dataforge.data
import kotlin.coroutines.CoroutineContext
public enum class GoalExecutionRestrictionPolicy {
NONE,
WARNING,
ERROR
}
public class GoalExecutionRestriction(
public val policy: GoalExecutionRestrictionPolicy = GoalExecutionRestrictionPolicy.ERROR,
) : CoroutineContext.Element {
override val key: CoroutineContext.Key<*> get() = Companion
public companion object : CoroutineContext.Key<GoalExecutionRestriction>
}

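The restriction travels as an ordinary [CoroutineContext] element; the `Task` factory later in this commit installs it so that eager goal execution inside a task body fails fast. A hedged sketch:

import hep.dataforge.data.*
import hep.dataforge.meta.DFExperimental
import kotlinx.coroutines.*

@OptIn(DFExperimental::class)
fun main() = runBlocking {
    val goal = LazyGoal { 42 }
    withContext(GoalExecutionRestriction(GoalExecutionRestrictionPolicy.ERROR)) {
        // goal.async(this) here would fail:
        // "Goal eager execution is prohibited by the coroutine scope policy"
    }
    println(goal.async(this).await()) // allowed outside the restricted scope
}
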
View File

@ -0,0 +1,13 @@
package hep.dataforge.data
import kotlin.coroutines.CoroutineContext
public interface GoalLogger : CoroutineContext.Element {
override val key: CoroutineContext.Key<*> get() = GoalLogger
public fun emit(vararg tags: String, message: suspend () -> String)
public companion object : CoroutineContext.Key<GoalLogger>{
public const val WARNING_TAG: String = "WARNING"
}
}

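Any [CoroutineContext] element implementing this interface can observe goal execution; `ContextGoalLogger` later in this commit is the workspace-backed implementation. A hypothetical stdout variant for illustration:

import hep.dataforge.data.GoalLogger
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch

object PrintGoalLogger : GoalLogger {
    override fun emit(vararg tags: String, message: suspend () -> String) {
        // messages are suspending, so evaluate them off the caller's thread
        GlobalScope.launch { println("[${tags.joinToString()}] ${message()}") }
    }
}

It would be installed like any context element, e.g. `CoroutineScope(PrintGoalLogger)` passed to `Goal.async`.
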
View File

@ -33,20 +33,26 @@ public interface GroupRule {
* @param defaultTagValue
* @return
*/
public fun byValue(scope: CoroutineScope, key: String, defaultTagValue: String): GroupRule =
object : GroupRule {
public fun byValue(
scope: CoroutineScope,
key: String,
defaultTagValue: String,
): GroupRule = object : GroupRule {
override suspend fun <T : Any> gather(dataType: KClass<out T>, set: DataSet<T>): Map<String, DataSet<T>> {
val map = HashMap<String, ActiveDataTree<T>>()
override suspend fun <T : Any> gather(
dataType: KClass<out T>,
set: DataSet<T>,
): Map<String, DataSet<T>> {
val map = HashMap<String, ActiveDataTree<T>>()
set.flow().collect { data ->
val tagValue = data.meta[key]?.string ?: defaultTagValue
map.getOrPut(tagValue) { ActiveDataTree(dataType) }.set(data.name, data.data)
}
return map
set.flow().collect { data ->
val tagValue = data.meta[key]?.string ?: defaultTagValue
map.getOrPut(tagValue) { ActiveDataTree(dataType) }.emit(data.name, data.data)
}
return map
}
}
// @ValueDef(key = "byValue", required = true, info = "The name of annotation value by which grouping should be made")

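`byValue` partitions a [DataSet] into named groups keyed by a meta value, now using the `emit` builder API. A hedged usage sketch (assuming `byValue` lives on the companion, as the hunk context suggests):

import hep.dataforge.data.*
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val data = DataTree<Int> {
        data("a", 1) { "group" put "left" }
        data("b", 2) { "group" put "right" }
    }
    val rule = GroupRule.byValue(this, key = "group", defaultTagValue = "default")
    val groups = rule.gather(Int::class, data)
    println(groups.keys) // [left, right]
}
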
View File

@ -67,14 +67,14 @@ public class MapAction<in T : Any, out R : Any>(
val flow = dataSet.flow().map(::mapOne)
return DataTree.active(outputType) {
collectFrom(flow)
return ActiveDataTree(outputType) {
populate(flow)
scope?.launch {
dataSet.updates.collect { name ->
//clear old nodes
remove(name)
//collect new items
collectFrom(dataSet.flowChildren(name).map(::mapOne))
populate(dataSet.flowChildren(name).map(::mapOne))
}
}
}

View File

@ -104,7 +104,7 @@ public class ReduceAction<T : Any, R : Any>(
val env = ActionEnv(groupName.toName(), groupMeta, meta)
val res: LazyData<R> = dataFlow.reduce(
val res: LazyData<R> = dataFlow.reduceToData(
outputType,
meta = groupMeta
) { group.result.invoke(env, it) }

View File

@ -62,14 +62,14 @@ public class SplitAction<T : Any, R : Any>(
}
}
return DataTree.active(outputType) {
collectFrom(dataSet.flow().flatMapConcat(transform = ::splitOne))
return ActiveDataTree(outputType) {
populate(dataSet.flow().flatMapConcat(transform = ::splitOne))
scope?.launch {
dataSet.updates.collect { name ->
//clear old nodes
remove(name)
//collect new items
collectFrom(dataSet.flowChildren(name).flatMapConcat(transform = ::splitOne))
populate(dataSet.flowChildren(name).flatMapConcat(transform = ::splitOne))
}
}
}

View File

@ -42,32 +42,34 @@ internal class StaticDataTree<T : Any>(
}
}
override suspend fun set(name: Name, data: Data<T>?) {
override suspend fun emit(name: Name, data: Data<T>?) {
set(name, data?.let { DataTreeItem.Leaf(it) })
}
override suspend fun set(name: Name, dataSet: DataSet<T>) {
override suspend fun emit(name: Name, dataSet: DataSet<T>) {
if (dataSet is StaticDataTree) {
set(name, DataTreeItem.Node(dataSet))
} else {
coroutineScope {
dataSet.flow().collect {
set(name + it.name, it.data)
emit(name + it.name, it.data)
}
}
}
}
}
@Suppress("FunctionName")
public suspend fun <T : Any> DataTree(
dataType: KClass<out T>,
block: suspend DataSetBuilder<T>.() -> Unit,
): DataTree<T> = StaticDataTree(dataType).apply { block() }
@Suppress("FunctionName")
public suspend inline fun <reified T : Any> DataTree(
noinline block: suspend DataSetBuilder<T>.() -> Unit,
): DataTree<T> = DataTree(T::class, block)
public suspend fun <T : Any> DataSet<T>.seal(): DataTree<T> = DataTree(dataType){
update(this@seal)
populate(this@seal)
}

View File

@ -12,10 +12,9 @@ import kotlin.reflect.KClass
/**
* A stateless filtered [DataSet]
*/
@DFExperimental
public fun <T : Any> DataSet<T>.filter(
predicate: suspend (Name, Data<T>) -> Boolean,
): DataSet<T> = object : DataSet<T> {
): ActiveDataSet<T> = object : ActiveDataSet<T> {
override val dataType: KClass<out T> get() = this@filter.dataType
override fun flow(): Flow<NamedData<T>> =
@ -36,7 +35,7 @@ public fun <T : Any> DataSet<T>.filter(
* Generate a wrapper data set with a given name prefix appended to all names
*/
public fun <T : Any> DataSet<T>.withNamePrefix(prefix: Name): DataSet<T> = if (prefix.isEmpty()) this
else object : DataSet<T> {
else object : ActiveDataSet<T> {
override val dataType: KClass<out T> get() = this@withNamePrefix.dataType
override fun flow(): Flow<NamedData<T>> = this@withNamePrefix.flow().map { it.data.named(prefix + it.name) }
@ -51,8 +50,9 @@ else object : DataSet<T> {
/**
* Get a subset of data starting with a given [branchName]
*/
public fun <T : Any> DataSet<T>.branch(branchName: Name): DataSet<T> = if (branchName.isEmpty()) this
else object : DataSet<T> {
public fun <T : Any> DataSet<T>.branch(branchName: Name): DataSet<T> = if (branchName.isEmpty()) {
this
} else object : ActiveDataSet<T> {
override val dataType: KClass<out T> get() = this@branch.dataType
override fun flow(): Flow<NamedData<T>> = this@branch.flow().mapNotNull {
@ -70,3 +70,4 @@ public fun <T : Any> DataSet<T>.branch(branchName: String): DataSet<T> = this@br
@DFExperimental
public suspend fun <T : Any> DataSet<T>.rootData(): Data<T>? = getData(Name.EMPTY)

View File

@ -12,7 +12,7 @@ public suspend fun DataSet<*>.getMeta(): Meta? = getData(DataSet.META_KEY)?.meta
/**
* Add meta-data node to a [DataSet]
*/
public suspend fun DataSetBuilder<*>.meta(meta: Meta): Unit = set(DataSet.META_KEY, Data.empty(meta))
public suspend fun DataSetBuilder<*>.meta(meta: Meta): Unit = emit(DataSet.META_KEY, Data.empty(meta))
/**
* Add meta-data node to a [DataSet]

View File

@ -0,0 +1,181 @@
package hep.dataforge.data
import hep.dataforge.meta.Meta
import hep.dataforge.meta.MetaBuilder
import hep.dataforge.meta.seal
import hep.dataforge.meta.toMutableMeta
import kotlinx.coroutines.flow.*
import kotlin.contracts.InvocationKind
import kotlin.contracts.contract
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.reflect.KClass
public fun <T : Any, R : Any> Data<T>.map(
outputType: KClass<out R>,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = this.meta,
block: suspend (T) -> R,
): Data<R> = LazyData(outputType, meta, coroutineContext, listOf(this)) {
block(await())
}
/**
* Create a data mapping
*/
public inline fun <T : Any, reified R : Any> Data<T>.map(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = this.meta,
crossinline block: suspend (T) -> R,
): Data<R> = LazyData(R::class, meta, coroutineContext, listOf(this)) {
block(await())
}
/**
* Combine this data with the other data using [block]
*/
public inline fun <T1 : Any, T2: Any, reified R : Any> Data<T1>.combine(
other: Data<T2>,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = this.meta,
crossinline block: suspend (left: T1, right: T2) -> R,
): Data<R> = LazyData(R::class, meta, coroutineContext, listOf(this,other)) {
block(await(), other.await())
}
//data collection operations
/**
* Create a joined data.
*/
public inline fun <T : Any, reified R : Any> Collection<Data<T>>.reduceToData(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = Meta.EMPTY,
noinline block: suspend (Collection<T>) -> R,
): Data<R> = LazyData(
R::class,
meta,
coroutineContext,
this
) {
block(map { run { it.await() } })
}
public fun <K, T : Any, R : Any> Map<K, Data<T>>.reduceToData(
outputType: KClass<out R>,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = Meta.EMPTY,
block: suspend (Map<K, T>) -> R,
): LazyData<R> = LazyData(
outputType,
meta,
coroutineContext,
this.values
) {
block(mapValues { it.value.await() })
}
/**
* A joining of multiple data into a single one
* @param K type of the map key
* @param T type of the input goal
* @param R type of the result goal
*/
public inline fun <K, T : Any, reified R : Any> Map<K, Data<T>>.reduceToData(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = Meta.EMPTY,
noinline block: suspend (Map<K, T>) -> R,
): LazyData<R> = LazyData(
R::class,
meta,
coroutineContext,
this.values
) {
block(mapValues { it.value.await() })
}
/**
* Transform a [Flow] of [NamedData] to a single [Data]. Execution restrictions are removed for inner [Flow]
*/
public suspend fun <T : Any, R : Any> Flow<NamedData<T>>.reduceToData(
outputType: KClass<out R>,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = Meta.EMPTY,
transformation: suspend (Flow<NamedData<T>>) -> R,
): LazyData<R> = LazyData(
outputType,
meta,
coroutineContext,
toList()
) {
transformation(this)
}
//flow operations
public suspend inline fun <T : Any, reified R : Any> Flow<NamedData<T>>.reduceToData(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = Meta.EMPTY,
noinline transformation: suspend (Flow<NamedData<T>>) -> R,
): LazyData<R> = reduceToData(R::class, coroutineContext, meta) {
transformation(it)
}
/**
* Fold a flow of named data into a single [Data]
*/
public suspend inline fun <T : Any, reified R : Any> Flow<NamedData<T>>.foldToData(
initial: R,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = Meta.EMPTY,
noinline block: suspend (result: R, data: NamedData<T>) -> R,
): LazyData<R> = reduceToData(
coroutineContext, meta
) {
it.fold(initial, block)
}
//DataSet operations
public suspend fun <T : Any, R : Any> DataSet<T>.map(
outputType: KClass<out R>,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
metaTransform: MetaBuilder.() -> Unit = {},
block: suspend (T) -> R,
): DataSet<R> = DataTree(outputType) {
populate(
flow().map {
val newMeta = it.meta.toMutableMeta().apply(metaTransform).seal()
it.map(outputType, coroutineContext, newMeta, block).named(it.name)
}
)
}
public suspend inline fun <T : Any, reified R : Any> DataSet<T>.map(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
noinline metaTransform: MetaBuilder.() -> Unit = {},
noinline block: suspend (T) -> R,
): DataSet<R> = map(R::class, coroutineContext, metaTransform, block)
public suspend fun <T : Any> DataSet<T>.forEach(block: suspend (NamedData<T>) -> Unit) {
contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
flow().collect {
block(it)
}
}
public suspend inline fun <T : Any, reified R : Any> DataSet<T>.reduceToData(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = Meta.EMPTY,
noinline transformation: suspend (Flow<NamedData<T>>) -> R,
): LazyData<R> = flow().reduceToData(coroutineContext, meta, transformation)
public suspend inline fun <T : Any, reified R : Any> DataSet<T>.foldToData(
initial: R,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = Meta.EMPTY,
noinline block: suspend (result: R, data: NamedData<T>) -> R,
): LazyData<R> = flow().foldToData(initial, coroutineContext, meta, block)

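The operators in this new file compose lazily: nothing is computed until the resulting [Data] is awaited. A hedged sketch of the one-to-one, two-to-one, and many-to-one forms (assuming the `Data.await()` extension used throughout this diff):

import hep.dataforge.data.*
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val a = Data.static(2)
    val b = Data.static(3)

    val doubled = a.map { it * 2 }                                   // one-to-one
    val sum = a.combine(b) { left, right -> left + right }           // two-to-one
    val total = listOf(a, b).reduceToData { values -> values.sum() } // many-to-one

    println(doubled.await()) // 4
    println(sum.await())     // 5
    println(total.await())   // 5
}
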
View File

@ -1,31 +1,12 @@
package hep.dataforge.data
import kotlinx.coroutines.runBlocking
import kotlin.reflect.KClass
import kotlin.reflect.full.isSubclassOf
/**
* Block the thread and get data content
*/
public fun <T : Any> Data<T>.value(): T = runBlocking { await() }
/**
* Check if data could be safely cast to given class
*/
internal fun <R : Any> Data<*>.canCast(type: KClass<out R>): Boolean =
this.type.isSubclassOf(type)
//public fun <R : Any, T : R> Data<T>.upcast(type: KClass<out R>): Data<R> {
// return object : Data<R> by this {
// override val type: KClass<out R> = type
// }
//}
//
///**
// * Safe upcast a [Data] to a supertype
// */
//public inline fun <reified R : Any, T : R> Data<T>.upcast(): Data<R> = upcast(R::class)
internal fun <R : Any> Data<*>.canCast(type: KClass<out R>): Boolean = this.type.isSubclassOf(type)
/**
* Cast the node to given type if the cast is possible or return null

View File

@ -1,29 +0,0 @@
package hep.dataforge.data
import hep.dataforge.names.Name
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
import kotlin.reflect.KClass
public fun <R : Any> DataSet<*>.filterIsInstance(type: KClass<out R>): DataSet<R> = object : DataSet<R> {
override val dataType: KClass<out R> = type
@Suppress("UNCHECKED_CAST")
override fun flow(): Flow<NamedData<R>> = this@filterIsInstance.flow().filter {
it.canCast(type)
}.map {
it as NamedData<R>
}
override suspend fun getData(name: Name): Data<R>? = this@filterIsInstance.getData(name)?.castOrNull(type)
override val updates: Flow<Name> = this@filterIsInstance.updates.filter {
val datum = this@filterIsInstance.getData(it)
datum?.canCast(type) ?: false
}
}
public inline fun <reified R : Any> DataSet<*>.filterIsInstance(): DataSet<R> = filterIsInstance(R::class)

View File

@ -0,0 +1,49 @@
package hep.dataforge.data
import hep.dataforge.meta.DFExperimental
import hep.dataforge.names.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
import kotlin.reflect.KClass
/**
* Select all data matching given type and filters. Does not modify paths
*/
@OptIn(DFExperimental::class)
public fun <R : Any> DataSet<*>.select(
type: KClass<out R>,
namePattern: Name? = null,
): ActiveDataSet<R> = object : ActiveDataSet<R> {
override val dataType: KClass<out R> = type
@Suppress("UNCHECKED_CAST")
override fun flow(): Flow<NamedData<R>> = this@select.flow().filter {
it.canCast(type) && (namePattern == null || it.name.matches(namePattern))
}.map {
it as NamedData<R>
}
override suspend fun getData(name: Name): Data<R>? = this@select.getData(name)?.castOrNull(type)
override val updates: Flow<Name> = this@select.updates.filter {
val datum = this@select.getData(it)
datum?.canCast(type) ?: false
}
}
/**
* Select all data matching the given type, with an optional name pattern
*/
public inline fun <reified R : Any> DataSet<*>.select(namePattern: Name? = null): DataSet<R> =
select(R::class, namePattern)
public suspend fun <R : Any> DataSet<*>.selectOne(type: KClass<out R>, name: Name): NamedData<R>? =
getData(name)?.castOrNull(type)?.named(name)
public suspend inline fun <reified R : Any> DataSet<*>.selectOne(name: Name): NamedData<R>? = selectOne(R::class, name)
public suspend inline fun <reified R : Any> DataSet<*>.selectOne(name: String): NamedData<R>? =
selectOne(R::class, name.toName())

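`select` narrows an untyped [DataSet] by type (and optionally by name pattern) without copying or renaming anything, while `selectOne` fetches a single typed datum. A hedged sketch:

import hep.dataforge.data.*
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val mixed = DataTree<Any> {
        data("numbers.a", 1)
        data("strings.b", "two")
    }
    val ints: DataSet<Int> = mixed.select()     // by type only
    val one = mixed.selectOne<Int>("numbers.a") // single datum by name
    println(one?.await()) // 1
}
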
View File

@ -4,9 +4,14 @@ import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Test
import kotlin.test.assertEquals
/**
* Block the thread and get data content
*/
public fun <T : Any> Data<T>.value(): T = runBlocking { await() }
class ActionsTest {
val data: DataTree<Int> = runBlocking {
DataTree.static {
DataTree {
repeat(10) {
data(it.toString(), it)
}

View File

@ -10,20 +10,20 @@ import kotlin.test.assertEquals
internal class DataTreeBuilderTest {
@Test
fun testDataUpdate() = runBlocking {
val updateData: DataTree<Any> = DataTree.static {
val updateData: DataTree<Any> = DataTree {
"update" put {
"a" put Data.static("a")
"b" put Data.static("b")
}
}
val node = DataTree.static<Any> {
set("primary") {
val node = DataTree<Any> {
emit("primary") {
data("a", "a")
data("b", "b")
}
data("root", "root")
update(updateData)
populate(updateData)
}
@ -34,13 +34,15 @@ internal class DataTreeBuilderTest {
@Test
fun testDynamicUpdates() = runBlocking {
try {
lateinit var updateJob: Job
supervisorScope {
val subNode = DataTree.active<Int> {
launch {
val subNode = ActiveDataTree<Int> {
updateJob = launch {
repeat(10) {
delay(10)
data("value", it)
}
delay(10)
}
}
launch {
@ -48,7 +50,7 @@ internal class DataTreeBuilderTest {
println(it)
}
}
val rootNode = DataTree.active<Int> {
val rootNode = ActiveDataTree<Int> {
setAndObserve("sub".toName(), subNode)
}
@ -57,7 +59,7 @@ internal class DataTreeBuilderTest {
println(it)
}
}
delay(200)
updateJob.join()
assertEquals(9, rootNode.getData("sub.value")?.value())
cancel()
}

View File

@ -192,13 +192,13 @@ public class NodeDescriptor(config: Config = Config()) : ItemDescriptor(config)
internal val ITEM_KEY: Name = "item".asName()
internal val IS_NODE_KEY: Name = "@isNode".asName()
public inline operator fun invoke(block: NodeDescriptor.() -> Unit): NodeDescriptor =
NodeDescriptor().apply(block)
//TODO infer descriptor from spec
}
}
public inline fun NodeDescriptor(block: NodeDescriptor.() -> Unit): NodeDescriptor =
NodeDescriptor().apply(block)
/**
* Get a descriptor item associated with given name or null if item for given name not provided
*/

View File

@ -38,7 +38,7 @@ class MetaDelegateTest {
assertEquals("theString", testObject.myValue)
assertEquals(TestEnum.NO, testObject.enumValue)
assertEquals(2.2, testObject.safeValue)
assertEquals("ddd", testObject.inner?.innerValue)
assertEquals("ddd", testObject.inner.innerValue)
}

View File

@ -0,0 +1,32 @@
package hep.dataforge.names
import hep.dataforge.meta.DFExperimental
import kotlin.test.Test
import kotlin.test.assertFails
import kotlin.test.assertFalse
import kotlin.test.assertTrue
@OptIn(DFExperimental::class)
class NameMatchTest {
@Test
fun matchWildCards() {
val theName = "a.b.c.d".toName()
assertTrue { theName.matches("a.b.**") }
assertTrue { theName.matches("a.*.c.**") }
assertTrue { theName.matches("**.d") }
assertTrue { theName.matches("**.b.**") }
assertTrue { theName.matches("a.*.*.d") }
assertFails { theName.matches("a.**.d") }
assertFalse { theName.matches("a.b.c.d.**") }
}
@Test
fun matchPattern() {
val theName = "a[dd+2].b[13].c.d[\"d\"]".toName()
assertTrue { theName.matches("a[.*].b[.*].c[.*].d[.*]") }
assertTrue { theName.matches("a[.*].b[.*].c.d[.*]") }
assertFalse { theName.matches("a[.*].b[.*].*.d") }
assertTrue { theName.matches("""\\w[dd\\+2].b[.*].c[.*].d[.*]""") }
assertFalse { theName.matches("""\\s[dd\\+2].b[.*].c[.*].d[.*]""") }
}
}

View File

@ -3,7 +3,6 @@ package hep.dataforge.scripting
import hep.dataforge.context.Context
import hep.dataforge.context.Global
import hep.dataforge.context.logger
import hep.dataforge.workspace.SimpleWorkspaceBuilder
import hep.dataforge.workspace.Workspace
import hep.dataforge.workspace.WorkspaceBuilder
import java.io.File
@ -17,7 +16,7 @@ import kotlin.script.experimental.jvmhost.BasicJvmScriptingHost
public object Builders {
private fun buildWorkspace(source: SourceCode, context: Context = Global): Workspace {
val builder = SimpleWorkspaceBuilder(context)
val builder = WorkspaceBuilder(context)
val workspaceScriptConfiguration = ScriptCompilationConfiguration {
// baseClass(Any::class)
@ -30,6 +29,7 @@ public object Builders {
dependenciesFromCurrentContext(wholeClasspath = true)
}
hostConfiguration(defaultJvmScriptingHostConfiguration)
compilerOptions("-jvm-target", Runtime.version().feature().toString())
}
val evaluationConfiguration = ScriptEvaluationConfiguration {

View File

@ -3,8 +3,9 @@ package hep.dataforge.scripting
import hep.dataforge.context.Global
import hep.dataforge.meta.get
import hep.dataforge.meta.int
import hep.dataforge.workspace.SimpleWorkspaceBuilder
import hep.dataforge.workspace.context
import hep.dataforge.workspace.WorkspaceBuilder
import hep.dataforge.workspace.target
import kotlin.test.Test
import kotlin.test.assertEquals
@ -12,7 +13,7 @@ import kotlin.test.assertEquals
class BuildersKtTest {
@Test
fun checkBuilder(){
val workspace = SimpleWorkspaceBuilder(Global).apply {
val workspace = WorkspaceBuilder(Global).apply {
println("I am working")
context("test")

View File

@ -0,0 +1,17 @@
package hep.dataforge.workspace
import hep.dataforge.context.Context
import hep.dataforge.context.logger
import hep.dataforge.data.GoalLogger
import kotlinx.coroutines.launch
public class ContextGoalLogger(public val context: Context) : GoalLogger {
override fun emit(vararg tags: String, message: suspend () -> String) {
context.launch {
val text = message()
context.logger.info { text }
}
}
}
public val Workspace.goalLogger: GoalLogger get() = ContextGoalLogger(context)

View File

@ -1,104 +0,0 @@
package hep.dataforge.workspace
import hep.dataforge.data.*
import hep.dataforge.meta.*
import hep.dataforge.names.Name
import hep.dataforge.names.plus
import hep.dataforge.names.removeHeadOrNull
import hep.dataforge.names.toName
import kotlinx.coroutines.flow.*
import kotlin.reflect.KClass
public interface DataPlacement: MetaRepr {
/**
* Select a placement for data with the given [name] and [meta]. The result is null if the data should be ignored.
*/
public fun place(name: Name, meta: Meta, dataType: KClass<*>): Name?
public companion object {
public val ALL: DataPlacement = object : DataPlacement {
override fun place(name: Name, meta: Meta, dataType: KClass<*>): Name = name
override fun toMeta(): Meta = Meta{"from" put "*"}
}
public fun into(target: Name): DataPlacement = DataPlacementScheme{
to = target.toString()
}
public fun into(target: String): DataPlacement = DataPlacementScheme{
to = target
}
}
}
public fun DataPlacement.place(datum: NamedData<*>): Name? = place(datum.name, datum.meta, datum.type)
private class ArrangedDataSet<T : Any>(
private val source: DataSet<T>,
private val placement: DataPlacement,
) : DataSet<T> {
override val dataType: KClass<out T> get() = source.dataType
override fun flow(): Flow<NamedData<T>> = source.flow().mapNotNull {
val newName = placement.place(it) ?: return@mapNotNull null
it.data.named(newName)
}
override suspend fun getData(name: Name): Data<T>? = flow().filter { it.name == name }.firstOrNull()
override val updates: Flow<Name> = source.updates.flatMapConcat {
flowChildren(it).mapNotNull(placement::place)
}
}
public class DataPlacementScheme : Scheme(), DataPlacement {
/**
* A source node for the filter
*/
public var from: String? by string()
/**
* A target placement for the filtered node
*/
public var to: String? by string()
/**
* A regular expression pattern for the filter
*/
public var pattern: String? by string()
// val prefix by string()
// val suffix by string()
override fun place(name: Name, meta: Meta, dataType: KClass<*>): Name? {
val fromName = from?.toName() ?: Name.EMPTY
val nameReminder = name.removeHeadOrNull(fromName) ?: return null
val regex = pattern?.toRegex()
return if (regex == null || nameReminder.toString().matches(regex)) {
(to?.toName() ?: Name.EMPTY) + nameReminder
} else {
null
}
}
public companion object : SchemeSpec<DataPlacementScheme>(::DataPlacementScheme)
}
/**
* Apply data node rearrangement
*/
public fun <T : Any> DataSet<T>.rearrange(placement: DataPlacement): DataSet<T> = ArrangedDataSet(this, placement)
///**
// * Mask data using [DataPlacementScheme] specification
// */
//public fun <T : Any> DataSet<T>.rearrange(placement: Meta): DataSet<T> =
// rearrange(DataPlacementScheme.read(placement))
/**
* Mask data using [DataPlacementScheme] builder
*/
public fun <T : Any> DataSet<T>.rearrange(placementBuilder: DataPlacementScheme.() -> Unit): DataSet<T> =
rearrange(DataPlacementScheme(placementBuilder))

View File

@ -1,73 +0,0 @@
package hep.dataforge.workspace
import hep.dataforge.data.DataSet
import hep.dataforge.meta.Meta
import hep.dataforge.meta.MetaRepr
import hep.dataforge.meta.toMutableMeta
import hep.dataforge.names.Name
import hep.dataforge.names.asName
import hep.dataforge.names.plus
/**
* A dependency of a task that allows lazily creating a data tree for a single dependency
*/
public sealed class Dependency : MetaRepr {
public abstract suspend fun apply(workspace: Workspace): DataSet<Any>
}
public class DataDependency(private val placement: DataPlacement = DataPlacement.ALL) : Dependency() {
override suspend fun apply(workspace: Workspace): DataSet<Any> = workspace.data.rearrange(placement)
override fun toMeta(): Meta = placement.toMeta()
}
public abstract class TaskDependency<out T : Any>(
public val meta: Meta,
protected val placement: DataPlacement,
) : Dependency() {
public abstract fun resolveTask(workspace: Workspace): WorkStage<T>
/**
* A name of the dependency for logging and serialization
*/
public abstract val name: Name
override suspend fun apply(workspace: Workspace): DataSet<T> {
val task = resolveTask(workspace)
val result = workspace.run(task, meta)
return result.rearrange(placement)
}
}
public class ExternalTaskDependency<T : Any>(
public val task: WorkStage<T>,
meta: Meta,
placement: DataPlacement,
) : TaskDependency<T>(meta, placement) {
override fun resolveTask(workspace: Workspace): WorkStage<T> = task
override val name: Name get() = EXTERNAL_TASK_NAME + task.name
override fun toMeta(): Meta = placement.toMeta().toMutableMeta().apply {
"name" put name.toString()
"task" put task.toString()
"meta" put meta
}
public companion object {
public val EXTERNAL_TASK_NAME: Name = "@external".asName()
}
}
public class WorkspaceTaskDependency(
override val name: Name,
meta: Meta,
placement: DataPlacement,
) : TaskDependency<Any>(meta, placement) {
override fun resolveTask(workspace: Workspace): WorkStage<*> = workspace.stages[name]
?: error("Task with name $name is not found in the workspace")
override fun toMeta(): Meta {
TODO("Not yet implemented")
}
}

View File

@ -12,16 +12,14 @@ import hep.dataforge.names.Name
*/
public class SimpleWorkspace(
override val context: Context,
override val data: DataSet<Any>,
data: DataSet<*>,
override val targets: Map<String, Meta>,
stages: Map<Name, WorkStage<Any>>
private val externalTasks: Map<Name, Task<*>>,
) : Workspace {
override val stages: Map<Name, WorkStage<*>> by lazy {
context.gather<WorkStage<*>>(WorkStage.TYPE) + stages
}
override val data: TaskResult<*> = internalize(data, Name.EMPTY, Meta.EMPTY)
public companion object {
override val tasks: Map<Name, Task<*>>
get() = context.gather<Task<*>>(Task.TYPE) + externalTasks
}
}

View File

@ -1,46 +0,0 @@
package hep.dataforge.workspace
import hep.dataforge.data.DataSet
import hep.dataforge.meta.Meta
import hep.dataforge.names.Name
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
/**
* A result of a [WorkStage]
*/
public interface StageDataSet<out T : Any> : DataSet<T> {
/**
* The [Workspace] this [DataSet] belongs to
*/
public val workspace: Workspace
/**
* The [Name] of the stage that produced this [DataSet]
*/
public val stageName: Name
/**
* The configuration of the stage that produced this [DataSet]
*/
public val stageMeta: Meta
override fun flow(): Flow<StageData<T>>
override suspend fun getData(name: Name): StageData<T>?
}
private class StageDataSetImpl<out T : Any>(
override val workspace: Workspace,
val dataSet: DataSet<T>,
override val stageName: Name,
override val stageMeta: Meta,
) : StageDataSet<T>, DataSet<T> by dataSet {
override fun flow(): Flow<StageData<T>> = dataSet.flow().map {
workspace.internalize(it, it.name, stageName, stageMeta)
}
override suspend fun getData(name: Name): StageData<T>? = dataSet.getData(name)?.let {
workspace.internalize(it, name, stageName, stageMeta)
}
}

View File

@ -0,0 +1,74 @@
package hep.dataforge.workspace
import hep.dataforge.data.DataSetBuilder
import hep.dataforge.data.DataTree
import hep.dataforge.data.GoalExecutionRestriction
import hep.dataforge.meta.Meta
import hep.dataforge.meta.descriptors.Described
import hep.dataforge.meta.descriptors.ItemDescriptor
import hep.dataforge.misc.Type
import hep.dataforge.names.Name
import hep.dataforge.workspace.Task.Companion.TYPE
import kotlinx.coroutines.withContext
import kotlin.reflect.KClass
@Type(TYPE)
public interface Task<out T : Any> : Described {
public val type: KClass<out T>
/**
* Compute a [TaskResult] using the given meta. In general, the result is lazy and represents both the computation model
* and a handler for the actual result
*
* @param workspace a workspace to run task in
* @param taskName the name of the task in this workspace
* @param taskMeta configuration for current stage computation
*/
public suspend fun execute(workspace: Workspace, taskName: Name, taskMeta: Meta): TaskResult<T>
public companion object {
public const val TYPE: String = "workspace.stage"
}
}
public class TaskResultBuilder<T : Any>(
public val workspace: Workspace,
public val taskName: Name,
public val taskMeta: Meta,
private val dataSync: DataSetBuilder<T>,
) : DataSetBuilder<T> by dataSync
/**
* Create a [Task] that composes a result using [builder]. Only data from the workspace can be used.
* Data dependency cycles are not allowed.
*/
@Suppress("FunctionName")
public fun <T : Any> Task(
resultType: KClass<out T>,
descriptor: ItemDescriptor? = null,
builder: suspend TaskResultBuilder<T>.() -> Unit,
): Task<T> = object : Task<T> {
override val type: KClass<out T> = resultType
override val descriptor: ItemDescriptor? = descriptor
override suspend fun execute(
workspace: Workspace,
taskName: Name,
taskMeta: Meta,
): TaskResult<T> = withContext(GoalExecutionRestriction() + workspace.goalLogger) {
//TODO use a safe builder, check for external data on add, and detect cycles
val dataset = DataTree(type) {
TaskResultBuilder(workspace, taskName, taskMeta, this).apply { builder() }
}
workspace.internalize(dataset, taskName, taskMeta)
}
}
@Suppress("FunctionName")
public inline fun <reified T : Any> Task(
descriptor: ItemDescriptor? = null,
noinline builder: suspend TaskResultBuilder<T>.() -> Unit,
): Task<T> = Task(T::class, descriptor, builder)

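A hedged sketch of a task defined through the new factory (the task body and names are illustrative): the builder receives the workspace, the task name, and the task meta, and writes results through the [DataSetBuilder] it delegates to.

import hep.dataforge.data.*
import hep.dataforge.workspace.*

val squareTask: Task<Int> = Task<Int> {
    workspace.data.select<Int>().forEach { datum ->
        val value = datum.await()
        data(datum.name, value * value) // emit the squared value under the same name
    }
}
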
View File

@ -8,7 +8,7 @@ import hep.dataforge.names.Name
/**
* A [Workspace]-locked [NamedData], that serves as a computation model.
*/
public interface StageData<out T : Any> : NamedData<T> {
public interface TaskData<out T : Any> : NamedData<T> {
/**
* The [Workspace] this data belongs to
*/
@ -17,31 +17,31 @@ public interface StageData<out T : Any> : NamedData<T> {
/**
* The name of the stage that produced this data. [Name.EMPTY] if the workspace intrinsic data is used.
*/
public val stage: Name
public val task: Name
/**
* Stage configuration used to produce this data.
*/
public val stageMeta: Meta
public val taskMeta: Meta
/**
* Dependencies that allow to compute transitive dependencies as well.
*/
override val dependencies: Collection<StageData<*>>
// override val dependencies: Collection<TaskData<*>>
}
private class StageDataImpl<out T : Any>(
private class TaskDataImpl<out T : Any>(
override val workspace: Workspace,
override val data: Data<T>,
override val name: Name,
override val stage: Name,
override val stageMeta: Meta,
) : StageData<T>, Data<T> by data {
override val dependencies: Collection<StageData<*>> = data.dependencies.map {
it as? StageData<*> ?: error("StageData can't depend on external data")
}
override val task: Name,
override val taskMeta: Meta,
) : TaskData<T>, Data<T> by data {
// override val dependencies: Collection<TaskData<*>> = data.dependencies.map {
// it as? TaskData<*> ?: error("TaskData can't depend on external data")
// }
}
internal fun <T : Any> Workspace.internalize(data: Data<T>, name: Name, stage: Name, stageMeta: Meta): StageData<T> =
StageDataImpl(this, data, name, stage, stageMeta)
internal fun <T : Any> Workspace.internalize(data: Data<T>, name: Name, stage: Name, stageMeta: Meta): TaskData<T> =
TaskDataImpl(this, data, name, stage, stageMeta)

View File

@ -0,0 +1,49 @@
package hep.dataforge.workspace
import hep.dataforge.data.DataSet
import hep.dataforge.meta.Meta
import hep.dataforge.names.Name
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
/**
* A result of a [Task]
*/
public interface TaskResult<out T : Any> : DataSet<T> {
/**
* The [Workspace] this [DataSet] belongs to
*/
public val workspace: Workspace
/**
* The [Name] of the stage that produced this [DataSet]
*/
public val taskName: Name
/**
* The configuration of the stage that produced this [DataSet]
*/
public val taskMeta: Meta
override fun flow(): Flow<TaskData<T>>
override suspend fun getData(name: Name): TaskData<T>?
}
private class TaskResultImpl<out T : Any>(
override val workspace: Workspace,
val dataSet: DataSet<T>,
override val taskName: Name,
override val taskMeta: Meta,
) : TaskResult<T>, DataSet<T> by dataSet {
override fun flow(): Flow<TaskData<T>> = dataSet.flow().map {
workspace.internalize(it, it.name, taskName, taskMeta)
}
override suspend fun getData(name: Name): TaskData<T>? = dataSet.getData(name)?.let {
workspace.internalize(it, name, taskName, taskMeta)
}
}
internal fun <T : Any> Workspace.internalize(dataSet: DataSet<T>, stage: Name, stageMeta: Meta): TaskResult<T> =
TaskResultImpl(this, dataSet, stage, stageMeta)

View File

@ -1,23 +0,0 @@
package hep.dataforge.workspace
import hep.dataforge.meta.Meta
import hep.dataforge.meta.descriptors.Described
import hep.dataforge.misc.Type
import hep.dataforge.workspace.WorkStage.Companion.TYPE
@Type(TYPE)
public interface WorkStage<out R : Any> : Described {
/**
* Compute a [StageDataSet] using given meta. In general, the result is lazy and represents both computation model
* and a handler for actual result
*
* @param workspace a workspace to run task model in
* @param meta configuration for current stage computation
*/
public suspend fun execute(workspace: Workspace, meta: Meta): StageDataSet<R>
public companion object {
public const val TYPE: String = "workspace.stage"
}
}

View File

@ -2,6 +2,7 @@ package hep.dataforge.workspace
import hep.dataforge.context.ContextAware
import hep.dataforge.meta.Meta
import hep.dataforge.meta.MetaBuilder
import hep.dataforge.misc.Type
import hep.dataforge.names.Name
import hep.dataforge.names.toName
@ -13,7 +14,7 @@ public interface Workspace : ContextAware, Provider {
/**
* The whole data node for current workspace
*/
public val data: StageDataSet<*>
public val data: TaskResult<*>
/**
* All targets associated with the workspace
@ -23,44 +24,36 @@ public interface Workspace : ContextAware, Provider {
/**
* All stages associated with the workspace
*/
public val stages: Map<Name, WorkStage<*>>
public val tasks: Map<Name, Task<*>>
override fun content(target: String): Map<Name, Any> {
return when (target) {
"target", Meta.TYPE -> targets.mapKeys { it.key.toName() }
WorkStage.TYPE -> stages
Task.TYPE -> tasks
//Data.TYPE -> data.flow().toMap()
else -> emptyMap()
}
}
public suspend fun produce(taskName: Name, taskMeta: Meta): TaskResult<*> {
if (taskName == Name.EMPTY) return data
val task = tasks[taskName] ?: error("Task with name $taskName not found in the workspace")
return task.execute(this, taskName, taskMeta)
}
public suspend fun produceData(taskName: Name, taskMeta: Meta, name: Name): TaskData<*>? =
produce(taskName, taskMeta).getData(name)
public companion object {
public const val TYPE: String = "workspace"
}
}
public suspend fun Workspace.stage(taskName: Name, taskMeta: Meta): StageDataSet<*> {
val task = stages[taskName] ?: error("Task with name $taskName not found in the workspace")
return task.execute(this, taskMeta)
}
public suspend fun Workspace.produce(task: String, target: String): TaskResult<*> =
produce(task.toName(), targets[target] ?: error("Target with key $target not found in $this"))
public suspend fun Workspace.getData(taskName: Name, taskMeta: Meta, name: Name): StageData<*>? =
stage(taskName, taskMeta).getData(name)
public suspend fun Workspace.produce(task: String, meta: Meta): TaskResult<*> =
produce(task.toName(), meta)
//public suspend fun Workspace.execute(task: WorkStage<*>, target: String): DataSet<Any> {
// val meta = targets[target] ?: error("A target with name $target not found in $this")
// return run(task, meta)
//}
//
//
//public suspend fun Workspace.execute(task: String, target: String): DataSet<Any> =
// stages[task.toName()]?.let { execute(it, target) } ?: error("Task with name $task not found")
//
//public suspend fun Workspace.execute(task: String, meta: Meta): DataSet<Any> =
// stages[task.toName()]?.let { run(it, meta) } ?: error("Task with name $task not found")
//
//public suspend fun Workspace.execute(task: String, block: MetaBuilder.() -> Unit = {}): DataSet<Any> =
// execute(task, Meta(block))
//
//public suspend fun <T : Any> Workspace.execute(task: WorkStage<T>, metaBuilder: MetaBuilder.() -> Unit = {}): DataSet<T> =
// run(task, Meta(metaBuilder))
public suspend fun Workspace.produce(task: String, block: MetaBuilder.() -> Unit = {}): TaskResult<*> =
produce(task, Meta(block))

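Consumption is pull-based: `produce` resolves a task by name and executes it against the workspace, and `produceData` drills down to a single datum. A hedged sketch against a workspace built as shown below (the "square" task name is illustrative):

import hep.dataforge.meta.Meta
import hep.dataforge.names.toName
import hep.dataforge.workspace.*

suspend fun demo(workspace: Workspace) {
    val result: TaskResult<*> = workspace.produce("square")
    val datum: TaskData<*>? = workspace.produceData("square".toName(), Meta.EMPTY, "a".toName())
    println(datum?.meta)
}
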
View File

@ -0,0 +1,103 @@
package hep.dataforge.workspace
import hep.dataforge.context.Context
import hep.dataforge.context.ContextBuilder
import hep.dataforge.context.Global
import hep.dataforge.data.ActiveDataTree
import hep.dataforge.data.DataSet
import hep.dataforge.data.DataSetBuilder
import hep.dataforge.data.DataTree
import hep.dataforge.meta.DFBuilder
import hep.dataforge.meta.DFExperimental
import hep.dataforge.meta.Meta
import hep.dataforge.meta.MetaBuilder
import hep.dataforge.meta.descriptors.NodeDescriptor
import hep.dataforge.names.Name
import hep.dataforge.names.toName
import kotlin.properties.PropertyDelegateProvider
import kotlin.properties.ReadOnlyProperty
import kotlin.reflect.KClass
public data class TaskReference<T: Any>(public val taskName: Name, public val task: Task<T>)
public interface TaskContainer {
public fun registerTask(taskName: Name, task: Task<*>)
}
public fun <T : Any> TaskContainer.registerTask(
resultType: KClass<out T>,
name: String,
descriptorBuilder: NodeDescriptor.() -> Unit = {},
builder: suspend TaskResultBuilder<T>.() -> Unit,
): Unit = registerTask(name.toName(), Task(resultType, NodeDescriptor(descriptorBuilder), builder))
public inline fun <reified T : Any> TaskContainer.registerTask(
name: String,
noinline descriptorBuilder: NodeDescriptor.() -> Unit = {},
noinline builder: suspend TaskResultBuilder<T>.() -> Unit,
): Unit = registerTask(T::class, name, descriptorBuilder, builder)
public inline fun <reified T : Any> TaskContainer.task(
noinline descriptorBuilder: NodeDescriptor.() -> Unit = {},
noinline builder: suspend TaskResultBuilder<T>.() -> Unit,
): PropertyDelegateProvider<Any?, ReadOnlyProperty<Any?, TaskReference<T>>> = PropertyDelegateProvider { _, property ->
val taskName = property.name.toName()
val task = Task(T::class, NodeDescriptor(descriptorBuilder), builder)
registerTask(taskName, task)
ReadOnlyProperty { _, _ -> TaskReference(taskName, task) }
}
public class WorkspaceBuilder(private val parentContext: Context = Global) : TaskContainer {
private var context: Context? = null
private var data: DataSet<*>? = null
private val targets: HashMap<String, Meta> = HashMap()
private val tasks = HashMap<Name, Task<*>>()
/**
* Define a context for the workspace
*/
public fun context(name: String = "workspace", block: ContextBuilder.() -> Unit = {}) {
this.context = ContextBuilder(parentContext, name).apply(block).build()
}
/**
* Define intrinsic data for the workspace
*/
public suspend fun buildData(builder: suspend DataSetBuilder<Any>.() -> Unit) {
data = DataTree(builder)
}
@DFExperimental
public suspend fun buildActiveData(builder: suspend ActiveDataTree<Any>.() -> Unit) {
data = ActiveDataTree(builder)
}
/**
* Define a new target
*/
public fun target(name: String, meta: Meta?) {
if (meta == null) {
targets.remove(name)
} else {
targets[name] = meta
}
}
override fun registerTask(taskName: Name, task: Task<*>) {
tasks[taskName] = task
}
public fun build(): Workspace = SimpleWorkspace(context ?: parentContext, data ?: DataSet.EMPTY, targets, tasks)
}
/**
* Define a new target with a builder
*/
public inline fun WorkspaceBuilder.target(name: String, metaBuilder: MetaBuilder.() -> Unit): Unit =
target(name, Meta(metaBuilder))
@DFBuilder
public fun Workspace(parentContext: Context = Global, builder: WorkspaceBuilder.() -> Unit): Workspace {
return WorkspaceBuilder(parentContext).apply(builder).build()
}

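An end-to-end hedged sketch of the new builder (task and target names are illustrative). Note that `buildData` is a suspend function, so it is called outside the non-suspending `apply` block:

import hep.dataforge.context.Global
import hep.dataforge.data.*
import hep.dataforge.names.toName
import hep.dataforge.workspace.*
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val builder = WorkspaceBuilder(Global).apply {
        context("demo")
        target("fast") { "threads" put 4 }
        registerTask<Int>("double") {
            workspace.data.select<Int>().forEach { data(it.name, it.await() * 2) }
        }
    }
    builder.buildData { data("a", 21) }
    val workspace = builder.build()

    println(workspace.produce("double").getData("a".toName())?.await()) // 42
}
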
View File

@ -1,208 +0,0 @@
package hep.dataforge.workspace
import hep.dataforge.context.Context
import hep.dataforge.data.*
import hep.dataforge.meta.*
import hep.dataforge.meta.descriptors.NodeDescriptor
import hep.dataforge.names.Name
import hep.dataforge.workspace.old.GenericTask
import hep.dataforge.workspace.old.TaskModel
import hep.dataforge.workspace.old.TaskModelBuilder
import hep.dataforge.workspace.old.data
import kotlin.reflect.KClass
private typealias DataTransformation<R> = suspend (context: Context, model: TaskModel, data: DataSet<Any>) -> DataSet<R>
@DFBuilder
@DFExperimental
public class TaskBuilder<R : Any>(private val name: Name, public val type: KClass<out R>) {
private var modelTransform: TaskModelBuilder.(Meta) -> Unit = {
data()
}
// private val additionalDependencies = HashSet<Dependency>()
private var descriptor: NodeDescriptor? = null
private val dataTransforms: MutableList<DataTransformation<R>> = ArrayList()
// override fun add(dependency: Dependency) {
// additionalDependencies.add(dependency)
// }
public fun model(modelTransform: TaskModelBuilder.(Meta) -> Unit) {
this.modelTransform = modelTransform
}
public class TaskEnv(
public val name: Name,
public val meta: Meta,
public val context: Context,
public val data: DataSet<Any>,
)
/**
* Add a transformation on untyped data
* @param from the prefix for root node in data
* @param to the prefix for the target node.
*/
@JvmName("rawTransform")
public fun transform(
from: Name = Name.EMPTY,
to: Name = Name.EMPTY,
block: TaskEnv.(DataSet<*>) -> DataSet<R>,
) {
dataTransforms += { context, model, data ->
val env = TaskEnv(Name.EMPTY, model.meta, context, data)
val startData = data.branch(from)
env.block(startData).withNamePrefix(to)
}
}
public fun <T : Any> transform(
inputType: KClass<out T>,
block: suspend TaskEnv.(DataSet<T>) -> DataSet<R>,
) {
dataTransforms += { context, model, data ->
val env = TaskEnv(Name.EMPTY, model.meta, context, data)
env.block(data.filterIsInstance(inputType))
}
}
public inline fun <reified T : Any> transform(
noinline block: suspend TaskEnv.(DataSet<T>) -> DataSet<R>,
): Unit = transform(T::class, block)
/**
* Perform given action on data elements in `from` node in input and put the result to `to` node
*/
public inline fun <reified T : Any> action(
from: Name = Name.EMPTY,
to: Name = Name.EMPTY,
crossinline block: TaskEnv.() -> Action<T, R>,
) {
transform { data: DataSet<T> ->
block().execute(data, meta, context)
}
}
/**
* A customized map action with ability to change meta and name
*/
public inline fun <reified T : Any> mapAction(
from: Name = Name.EMPTY,
to: Name = Name.EMPTY,
crossinline block: MapActionBuilder<T, R>.(TaskEnv) -> Unit,
) {
action(from, to) {
val env = this
MapAction<T, R>(type) {
block(env)
}
}
}
/**
* A simple map action without changing meta or name
*/
public inline fun <reified T : Any> map(
from: Name = Name.EMPTY,
to: Name = Name.EMPTY,
crossinline block: suspend TaskEnv.(T) -> R,
) {
action(from, to) {
MapAction<T, R>(type) {
//TODO automatically append task meta
result = { data ->
block(data)
}
}
}
}
/**
* Join elements in the gathered data by multiple groups
*/
public inline fun <reified T : Any> reduceByGroup(
from: Name = Name.EMPTY,
to: Name = Name.EMPTY,
crossinline block: ReduceGroupBuilder<T, R>.(TaskEnv) -> Unit, //TODO needs KEEP-176
) {
action(from, to) {
val env = this
ReduceAction(inputType = T::class, outputType = type) { block(env) }
}
}
/**
* Join all elements in the gathered data that match the input type
*/
public inline fun <reified T : Any> reduce(
from: Name = Name.EMPTY,
to: Name = Name.EMPTY,
crossinline block: suspend TaskEnv.(Map<Name, T>) -> R,
) {
action(from, to) {
ReduceAction(inputType = T::class, outputType = type) {
result(
actionMeta[TaskModel.MODEL_TARGET_KEY]?.string ?: "@anonymous"
) { data ->
block(data)
}
}
}
}
/**
* Split each element in the gathered data into a fixed number of fragments
*/
public inline fun <reified T : Any> split(
from: Name = Name.EMPTY,
to: Name = Name.EMPTY,
crossinline block: SplitBuilder<T, R>.(TaskEnv) -> Unit, //TODO needs KEEP-176
) {
action(from, to) {
val env = this
SplitAction<T, R>(type) { block(this, env) }
}
}
/**
* Use DSL to create a descriptor for this task
*/
public fun description(transform: NodeDescriptor.() -> Unit) {
this.descriptor = NodeDescriptor().apply(transform)
}
internal fun build(): GenericTask<R> {
return GenericTask(
name,
type,
descriptor ?: NodeDescriptor(),
modelTransform
) {
val workspace = this
{ dataSet ->
val model = this
if (dataTransforms.isEmpty()) {
//return data node as is
logger.warn { "No transformation present, returning input data" }
dataSet.castOrNull(type) ?: error("$type expected, but a dataset of a different type was received")
} else {
DataTree.active(type, workspace.context){
dataTransforms.forEach { transformation ->
val res = transformation(workspace.context, model, dataSet)
update(res)
}
}
}
}
}
}
}
@DFExperimental
public suspend inline fun <reified T : Any> TaskBuilder.TaskEnv.dataTree(
crossinline block: suspend ActiveDataTree<T>.() -> Unit,
): DataTree<T> = DataTree.active(context, block)
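
For context, the DSL removed here combined a model block with typed transformations; a representative usage, mirroring the removed tests further down in this diff:

    // old-style task definitions (API deleted by this commit)
    val square = task<Int>("square") {
        map<Int> { data -> data * data }
    }

    val sum = task<Int>("sum") {
        model { dependsOn(square) }
        reduce<Int> { data -> data.values.sum() }
    }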

View File

@ -1,87 +0,0 @@
package hep.dataforge.workspace
import hep.dataforge.context.Context
import hep.dataforge.context.ContextBuilder
import hep.dataforge.context.Global
import hep.dataforge.data.ActiveDataTree
import hep.dataforge.meta.*
import hep.dataforge.names.toName
import kotlin.reflect.KClass
@DFBuilder
public interface WorkspaceBuilder {
public val parentContext: Context
public var context: Context
public var data: ActiveDataTree<Any>
public var tasks: MutableSet<WorkStage<Any>>
public var targets: MutableMap<String, Meta>
public fun build(): Workspace
}
/**
* Set the context for the future workspace
*/
public fun WorkspaceBuilder.context(name: String = "WORKSPACE", block: ContextBuilder.() -> Unit = {}) {
context = ContextBuilder(parentContext, name).apply(block).build()
}
public inline fun WorkspaceBuilder.data(
block: ActiveDataTree<Any>.() -> Unit,
): Unit{
data.apply(block)
}
public fun WorkspaceBuilder.target(name: String, block: MetaBuilder.() -> Unit) {
targets[name] = Meta(block).seal()
}
class WorkspaceTask(val workspace: Workspace, val name: String)
/**
* Use an existing target as a base, updating it with the block
*/
public fun WorkspaceBuilder.target(name: String, base: String, block: MetaBuilder.() -> Unit) {
val parentTarget = targets[base] ?: error("Base target with name $base not found")
targets[name] = parentTarget.toMutableMeta()
.apply { "@baseTarget" put base }
.apply(block)
.seal()
}
public fun <T : Any> WorkspaceBuilder.task(
name: String,
type: KClass<out T>,
builder: TaskBuilder<T>.() -> Unit,
): WorkspaceTask = TaskBuilder(name.toName(), type).apply(builder).build().also { tasks.add(it) }
public inline fun <reified T : Any> WorkspaceBuilder.task(
name: String,
noinline builder: TaskBuilder<T>.() -> Unit,
): WorkStage<T> = task(name, T::class, builder)
@JvmName("rawTask")
public fun WorkspaceBuilder.task(
name: String,
builder: TaskBuilder<Any>.() -> Unit,
): WorkStage<Any> = task(name, Any::class, builder)
/**
* A builder for a simple workspace
*/
public class SimpleWorkspaceBuilder(override val parentContext: Context) : WorkspaceBuilder {
override var context: Context = parentContext
override var data: ActiveDataTree<Any> = ActiveDataTree(Any::class, context)
override var tasks: MutableSet<WorkStage<Any>> = HashSet()
override var targets: MutableMap<String, Meta> = HashMap()
override fun build(): SimpleWorkspace {
return SimpleWorkspace(context, data, targets, tasks)
}
}
public fun Workspace(
parent: Context = Global,
block: SimpleWorkspaceBuilder.() -> Unit,
): Workspace = SimpleWorkspaceBuilder(parent).apply(block).build()

View File

@ -2,41 +2,21 @@ package hep.dataforge.workspace
import hep.dataforge.context.AbstractPlugin
import hep.dataforge.names.Name
import hep.dataforge.names.toName
import hep.dataforge.workspace.old.GenericTask
import kotlin.reflect.KClass
/**
* An abstract plugin with some additional boilerplate to work effectively with the workspace context
*/
public abstract class WorkspacePlugin : AbstractPlugin() {
private val _tasks = HashSet<WorkStage<*>>()
public val tasks: Collection<WorkStage<*>> get() = _tasks
public abstract class WorkspacePlugin : AbstractPlugin(), TaskContainer {
private val tasks = HashMap<Name, Task<*>>()
override fun content(target: String): Map<Name, Any> {
return when (target) {
WorkStage.TYPE -> tasks.toMap()
Task.TYPE -> tasks
else -> emptyMap()
}
}
public fun task(task: WorkStage<*>){
_tasks.add(task)
override fun registerTask(taskName: Name, task: Task<*>) {
tasks[taskName] = task
}
public fun <T : Any> task(
name: String,
type: KClass<out T>,
builder: TaskBuilder<T>.() -> Unit
): GenericTask<T> = TaskBuilder(name.toName(), type).apply(builder).build().also {
_tasks.add(it)
}
public inline fun <reified T : Any> task(
name: String,
noinline builder: TaskBuilder<T>.() -> Unit
): GenericTask<T> = task(name, T::class, builder)
//
////TODO add delegates to build gradle-like tasks
}
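
A sketch of a plugin written against the new TaskContainer contract, using the same `by task` delegate the tests below rely on; the plugin tag and the task body are illustrative:

    class SquarePlugin : WorkspacePlugin() {
        override val tag: PluginTag = PluginTag("square")

        // the delegate registers the task via TaskContainer.registerTask
        val square by task<Int> {
            workspace.data.select<Int>().forEach { data ->
                emit(data.name, data.map { it * it })
            }
        }
    }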

View File

@ -82,7 +82,7 @@ public suspend fun <T : Any> DataSetBuilder<T>.file(
val data = readDataFile(path, formatResolver)
val name = data.meta[Envelope.ENVELOPE_NAME_KEY].string
?: path.fileName.toString().replace(".df", "")
set(name, data)
emit(name, data)
}
} else {
//otherwise, read as directory
@ -90,7 +90,7 @@ public suspend fun <T : Any> DataSetBuilder<T>.file(
val data = readDataDirectory(path, formatResolver)
val name = data.getMeta()[Envelope.ENVELOPE_NAME_KEY].string
?: path.fileName.toString().replace(".df", "")
set(name, data)
emit(name, data)
}
}
}
@ -99,7 +99,7 @@ public suspend fun <T : Any> DataSetBuilder<T>.file(
* Read the directory as a data node. If [path] is a zip archive, read it as a directory
*/
@DFExperimental
public fun <T : Any> IOPlugin.readDataDirectory(
public suspend fun <T : Any> IOPlugin.readDataDirectory(
path: Path,
formatResolver: FileFormatResolver<T>,
): DataTree<T> {
@ -110,7 +110,7 @@ public fun <T : Any> IOPlugin.readDataDirectory(
return readDataDirectory(fs.rootDirectories.first(), formatResolver)
}
if (!Files.isDirectory(path)) error("Provided path $path is not a directory")
return DataTree.static(formatResolver.kClass) {
return DataTree(formatResolver.kClass) {
Files.list(path).toList().forEach { path ->
val fileName = path.fileName.toString()
if (fileName.startsWith(IOPlugin.META_FILE_NAME)) {
@ -125,7 +125,7 @@ public fun <T : Any> IOPlugin.readDataDirectory(
}
@DFExperimental
public inline fun <reified T : Any> IOPlugin.readDataDirectory(path: Path): DataTree<T> =
public suspend inline fun <reified T : Any> IOPlugin.readDataDirectory(path: Path): DataTree<T> =
readDataDirectory(path, formatResolver())
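
Since directory reading is now suspending, call sites must run inside a coroutine. A sketch reusing the String resolver from the tests below; `io` stands for a hypothetical IOPlugin instance:

    runBlocking {
        // readDataDirectory is an IOPlugin extension, suspend after this commit
        val reconstructed: DataTree<String> = io.readDataDirectory(dir, StringFormatResolver)
        println(reconstructed.getData("dir.a")?.meta)
    }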
/**

View File

@ -0,0 +1,24 @@
package hep.dataforge.workspace
import hep.dataforge.data.DataSet
import hep.dataforge.data.select
import hep.dataforge.meta.Meta
import hep.dataforge.names.Name
public suspend inline fun <reified T : Any> TaskResultBuilder<T>.from(
task: Name,
taskMeta: Meta = Meta.EMPTY,
): DataSet<T> = workspace.produce(task, taskMeta).select()
@Suppress("UNCHECKED_CAST")
public suspend fun <R : Any> TaskResultBuilder<*>.from(
reference: TaskReference<R>,
taskMeta: Meta = Meta.EMPTY,
): DataSet<R> {
if (workspace.tasks[reference.taskName] == reference.task) {
return workspace.produce(reference.taskName, taskMeta) as TaskResult<R>
} else {
error("Task ${reference.taskName} does not belong to the workspace")
}
}
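
A sketch of these helpers inside a task body; `square` is assumed to be a TaskReference<Int> defined elsewhere, as in the tests below:

    val fullSquare by task<Int> {
        val squareData = from(square) // typed access to another task's result
        squareData.forEach { data ->
            emit(data.name, data.map { it + 1 })
        }
    }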

View File

@ -0,0 +1,8 @@
package hep.dataforge.workspace
import hep.dataforge.data.DataSetBuilder
import kotlinx.coroutines.runBlocking
public fun WorkspaceBuilder.data(builder: suspend DataSetBuilder<Any>.() -> Unit): Unit = runBlocking {
buildData(builder)
}
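
A sketch of this blocking helper in use, mirroring the test setup later in this diff:

    val workspace = Workspace {
        data {
            repeat(100) {
                data("myData[$it]", it)
            }
        }
    }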

View File

@ -5,54 +5,28 @@ import hep.dataforge.context.PluginFactory
import hep.dataforge.context.PluginTag
import hep.dataforge.data.*
import hep.dataforge.meta.Meta
import hep.dataforge.workspace.old.data
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.reduce
import hep.dataforge.names.toName
import kotlinx.coroutines.flow.single
import kotlinx.coroutines.runBlocking
import kotlin.reflect.KClass
import kotlin.test.Test
import kotlin.test.assertEquals
fun <T : Any> DataSet<T>.first(): NamedData<T>? = runBlocking { flow().firstOrNull() }
class DataPropagationTestPlugin : WorkspacePlugin() {
override val tag: PluginTag = Companion.tag
val testAllData = task("allData", Int::class) {
model {
data()
}
transform<Int> { data ->
DataTree.active(context) {
val result = data.flow().map { it.value() }.reduce { acc, pair -> acc + pair }
data("result", result)
}
val allData by task<Int> {
val selectedData = workspace.data.select<Int>()
val result: Data<Int> = selectedData.flow().foldToData(0) { result, data ->
result + data.await()
}
emit("result", result)
}
val testSingleData = task("singleData", Int::class) {
model {
data(pattern = "myData\\[12\\]")
}
transform<Int> { data ->
DataTree.active(context) {
val result = data.flow().map { it.value() }.reduce { acc, pair -> acc + pair }
data("result", result)
}
}
}
val testAllRegexData = task("allRegexData", Int::class) {
model {
data(pattern = "myData.*")
}
transform<Int> { data ->
DataTree.active(context) {
val result = data.flow().map { it.value() }.reduce { acc, pair -> acc + pair }
data("result", result)
}
val singleData by task<Int> {
workspace.data.select<Int>().getData("myData[12]".toName())?.let {
emit("result", it)
}
}
@ -72,9 +46,11 @@ class DataPropagationTest {
context {
plugin(DataPropagationTestPlugin)
}
data {
repeat(100) {
data("myData[$it]", it)
runBlocking {
data {
repeat(100) {
data("myData[$it]", it)
}
}
}
}
@ -82,24 +58,16 @@ class DataPropagationTest {
@Test
fun testAllData() {
runBlocking {
val node = testWorkspace.execute("Test.allData")
assertEquals(4950, node.first()!!.value())
}
}
@Test
fun testAllRegexData() {
runBlocking {
val node = testWorkspace.execute("Test.allRegexData")
assertEquals(4950, node.first()!!.value())
val node = testWorkspace.produce("Test.allData")
assertEquals(4950, node.flow().single().await())
}
}
@Test
fun testSingleData() {
runBlocking {
val node = testWorkspace.execute("Test.singleData")
assertEquals(12, node.first()!!.value())
val node = testWorkspace.produce("Test.singleData")
assertEquals(12, node.flow().single().await())
}
}
}

View File

@ -20,15 +20,17 @@ import kotlin.test.assertEquals
class FileDataTest {
val dataNode = DataTree.static<String> {
set("dir") {
data("a", "Some string") {
"content" put "Some string"
val dataNode = runBlocking {
DataTree<String> {
emit("dir") {
data("a", "Some string") {
"content" put "Some string"
}
}
data("b", "root data")
meta {
"content" put "This is root meta node"
}
}
data("b", "root data")
meta {
"content" put "This is root meta node"
}
}
@ -50,10 +52,10 @@ class FileDataTest {
}
object StringFormatResolver: FileFormatResolver<String>{
object StringFormatResolver : FileFormatResolver<String> {
override val type: KType = typeOf<String>()
override fun invoke(path: Path, meta: Meta): IOFormat<String> =StringIOFormat
override fun invoke(path: Path, meta: Meta): IOFormat<String> = StringIOFormat
}
@ -64,12 +66,10 @@ class FileDataTest {
val dir = Files.createTempDirectory("df_data_node")
runBlocking {
writeDataDirectory(dir, dataNode, StringIOFormat)
}
println(dir.toUri().toString())
val reconstructed = readDataDirectory(dir,StringFormatResolver)
runBlocking {
println(dir.toUri().toString())
val reconstructed = readDataDirectory(dir, StringFormatResolver)
assertEquals(dataNode.getData("dir.a")?.meta, reconstructed.getData("dir.a")?.meta)
assertEquals(dataNode.getData("b")?.value(), reconstructed.getData("b")?.value())
assertEquals(dataNode.getData("b")?.await(), reconstructed.getData("b")?.await())
}
}
}
@ -82,12 +82,10 @@ class FileDataTest {
val zip = Files.createTempFile("df_data_node", ".zip")
runBlocking {
writeZip(zip, dataNode, StringIOFormat)
}
println(zip.toUri().toString())
val reconstructed = readDataDirectory(zip, StringFormatResolver)
runBlocking {
println(zip.toUri().toString())
val reconstructed = readDataDirectory(zip, StringFormatResolver)
assertEquals(dataNode.getData("dir.a")?.meta, reconstructed.getData("dir.a")?.meta)
assertEquals(dataNode.getData("b")?.value(), reconstructed.getData("b")?.value())
assertEquals(dataNode.getData("b")?.await(), reconstructed.getData("b")?.await())
}
}
}

View File

@ -4,9 +4,8 @@ import hep.dataforge.context.*
import hep.dataforge.data.*
import hep.dataforge.meta.*
import hep.dataforge.names.plus
import hep.dataforge.workspace.old.data
import hep.dataforge.workspace.old.dependsOn
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.single
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Timeout
import kotlin.reflect.KClass
@ -14,6 +13,7 @@ import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertTrue
/**
* Make a fake factory for a single plugin. Useful for one-off or test plugins
*/
@ -24,8 +24,8 @@ public inline fun <reified P : Plugin> P.toFactory(): PluginFactory<P> = object
override val type: KClass<out P> = P::class
}
public fun Workspace.runBlocking(task: String, block: MetaBuilder.() -> Unit = {}): DataSet<Any> = runBlocking{
execute(task, block)
public fun Workspace.runBlocking(task: String, block: MetaBuilder.() -> Unit = {}): DataSet<Any> = runBlocking {
produce(task, block)
}
@ -33,12 +33,18 @@ class SimpleWorkspaceTest {
val testPlugin = object : WorkspacePlugin() {
override val tag: PluginTag = PluginTag("test")
val contextTask = task("test", Any::class) {
map<Any> {
context.logger.info { "Test: $it" }
}
val test by task<Any> {
populate(
workspace.data.map {
it.also {
logger.info { "Test: $it" }
}
}
)
}
}
val testPluginFactory = testPlugin.toFactory()
val workspace = Workspace {
@ -53,98 +59,82 @@ class SimpleWorkspaceTest {
}
}
val filterTask = task<Int>("filterOne") {
model {
data("myData\\[12\\]")
}
map<Int> {
it
val filterOne by task<Int> {
workspace.data.selectOne<Int>("myData[12]")?.let { source ->
emit(source.name, source.map { it })
}
}
val square = task<Int>("square") {
map<Int> { data ->
if (meta["testFlag"].boolean == true) {
val square by task<Int> {
workspace.data.select<Int>().forEach { data ->
if (data.meta["testFlag"].boolean == true) {
println("flag")
}
context.logger.info { "Starting square on $data" }
data * data
val value = data.await()
workspace.logger.info { "Starting square on $value" }
emit(data.name, data.map { it * it })
}
}
val linear = task<Int>("linear") {
map<Int> { data ->
context.logger.info { "Starting linear on $data" }
data * 2 + 1
val linear by task<Int> {
workspace.data.select<Int>().forEach { data ->
workspace.logger.info { "Starting linear on $data" }
emit(data.name, data.data.map { it * 2 + 1 })
}
}
val fullSquare = task<Int>("fullsquare") {
model {
val squareDep = dependsOn(square, placement = DataPlacement.into("square"))
val linearDep = dependsOn(linear, placement = DataPlacement.into("linear"))
}
transform<Int> { data ->
val squareNode = data.branch("square").filterIsInstance<Int>() //squareDep()
val linearNode = data.branch("linear").filterIsInstance<Int>() //linearDep()
dataTree {
squareNode.flow().collect {
val newData: Data<Int> = Data {
val squareValue = squareNode.getData(it.name)!!.value()
val linearValue = linearNode.getData(it.name)!!.value()
squareValue + linearValue
}
set(name, newData)
}
val fullSquare by task<Int> {
val squareData = from(square)
val linearData = from(linear)
squareData.forEach { data ->
val newData: Data<Int> = data.combine(linearData.getData(data.name)!!) { l, r ->
l + r
}
emit(data.name, newData)
}
}
task<Int>("sum") {
model {
dependsOn(square)
}
reduce<Int> { data ->
context.logger.info { "Starting sum" }
data.values.sum()
val sum by task<Int> {
workspace.logger.info { "Starting sum" }
val res = from(square).foldToData(0) { l, r ->
l + r.await()
}
emit("sum", res)
}
val average = task<Double>("average") {
reduceByGroup<Int> { env ->
group("even", filter = { name, _ -> name.toString().toInt() % 2 == 0 }) {
result { data ->
env.context.logger.info { "Starting even" }
data.values.average()
}
}
group("odd", filter = { name, _ -> name.toString().toInt() % 2 == 1 }) {
result { data ->
env.context.logger.info { "Starting odd" }
data.values.average()
}
}
val averageByGroup by task<Int> {
val evenSum = workspace.data.filter { name, _ ->
name.toString().toInt() % 2 == 0
}.select<Int>().foldToData(0) { l, r ->
l + r.await()
}
emit("even", evenSum)
val oddSum = workspace.data.filter { name, _ ->
name.toString().toInt() % 2 == 1
}.select<Int>().foldToData(0) { l, r ->
l + r.await()
}
emit("odd", oddSum)
}
task("delta") {
model {
dependsOn(average)
}
reduce<Double> { data ->
data["even"]!! - data["odd"]!!
val delta by task<Int> {
val averaged = from(averageByGroup)
val even = averaged.getData("even")!!
val odd = averaged.getData("odd")!!
val res = even.combine(odd) { l, r ->
l - r
}
emit("res", res)
}
val customPipeTask = task<Int>("custom") {
mapAction<Int> {
meta = meta.toMutableMeta().apply {
val customPipe by task<Int> {
workspace.data.select<Int>().forEach { data ->
val meta = data.meta.toMutableMeta().apply {
"newValue" put 22
}
name += "new"
result {
meta["value"].int ?: 0 + it
}
emit(data.name + "new", data.map { (data.meta["value"].int ?: 0) + it })
}
}
@ -154,21 +144,25 @@ class SimpleWorkspaceTest {
@Test
@Timeout(1)
fun testWorkspace() {
val node = workspace.runBlocking("sum")
val res = node.first()
assertEquals(328350, res?.value())
runBlocking {
val node = workspace.runBlocking("sum")
val res = node.flow().single()
assertEquals(328350, res.await())
}
}
@Test
@Timeout(1)
fun testMetaPropagation() {
val node = workspace.runBlocking("sum") { "testFlag" put true }
val res = node.first()?.value()
runBlocking {
val node = workspace.produce("sum") { "testFlag" put true }
val res = node.flow().single().await()
}
}
@Test
fun testPluginTask() {
val tasks = workspace.stages
val tasks = workspace.tasks
assertTrue { tasks["test.test"] != null }
//val node = workspace.run("test.test", "empty")
}
@ -176,16 +170,16 @@ class SimpleWorkspaceTest {
@Test
fun testFullSquare() {
runBlocking {
val node = workspace.execute("fullsquare")
val node = workspace.produce("fullSquare")
println(node.toMeta())
}
}
@Test
fun testFilter() {
val node = workspace.runBlocking("filterOne")
runBlocking {
assertEquals(12, node.first()?.value())
val node = workspace.produce("filterOne")
assertEquals(12, node.flow().first().await())
}
}
}