diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/ActiveDataTree.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/ActiveDataTree.kt index 82563eac..b8200fa5 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/ActiveDataTree.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/ActiveDataTree.kt @@ -63,7 +63,7 @@ public class ActiveDataTree( } } - override suspend fun set(name: Name, data: Data?) { + override suspend fun emit(name: Name, data: Data?) { if (data == null) { remove(name) } else { @@ -80,9 +80,9 @@ public class ActiveDataTree( * Copy given data set and mirror its changes to this [ActiveDataTree] in [this@setAndObserve]. Returns an update [Job] */ public fun CoroutineScope.setAndObserve(name: Name, dataSet: DataSet): Job = launch { - set(name, dataSet) + emit(name, dataSet) dataSet.updates.collect { nameInBranch -> - set(name + nameInBranch, dataSet.getData(nameInBranch)) + emit(name + nameInBranch, dataSet.getData(nameInBranch)) } } } @@ -90,7 +90,8 @@ public class ActiveDataTree( /** * Create a dynamic tree. Initial data is placed synchronously. 
Updates are propagated via [updatesScope] */ -public suspend fun DataTree.Companion.active( +@Suppress("FunctionName") +public suspend fun ActiveDataTree( type: KClass, block: suspend ActiveDataTree.() -> Unit, ): DataTree { @@ -99,17 +100,18 @@ public suspend fun DataTree.Companion.active( return tree } -public suspend inline fun DataTree.Companion.active( +@Suppress("FunctionName") +public suspend inline fun ActiveDataTree( crossinline block: suspend ActiveDataTree.() -> Unit, ): DataTree = ActiveDataTree(T::class).apply { block() } -public suspend inline fun ActiveDataTree.set( +public suspend inline fun ActiveDataTree.emit( name: Name, noinline block: suspend ActiveDataTree.() -> Unit, -): Unit = set(name, DataTree.active(T::class, block)) +): Unit = emit(name, ActiveDataTree(T::class, block)) -public suspend inline fun ActiveDataTree.set( +public suspend inline fun ActiveDataTree.emit( name: String, noinline block: suspend ActiveDataTree.() -> Unit, -): Unit = set(name.toName(), DataTree.active(T::class, block)) +): Unit = emit(name.toName(), ActiveDataTree(T::class, block)) diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/CachingAction.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/CachingAction.kt index e440a765..5f8f8704 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/CachingAction.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/CachingAction.kt @@ -34,16 +34,16 @@ public abstract class CachingAction( dataSet: DataSet, meta: Meta, scope: CoroutineScope?, - ): DataSet = DataTree.active(outputType) { + ): DataSet = ActiveDataTree(outputType) { coroutineScope { - collectFrom(transform(dataSet, meta)) + populate(transform(dataSet, meta)) } scope?.let { dataSet.updates.collect { //clear old nodes remove(it) //collect new items - collectFrom(scope.transform(dataSet, meta, it)) + populate(scope.transform(dataSet, meta, it)) //FIXME if the target is data, updates are fired twice } } diff --git 
a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Data.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Data.kt index 57647c03..6d0c4c38 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Data.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Data.kt @@ -62,7 +62,7 @@ public class LazyData( override val meta: Meta = Meta.EMPTY, context: CoroutineContext = EmptyCoroutineContext, dependencies: Collection> = emptyList(), - block: suspend CoroutineScope.() -> T, + block: suspend () -> T, ) : Data, LazyGoal(context, dependencies, block) public class StaticData( @@ -78,7 +78,7 @@ public fun Data( meta: Meta = Meta.EMPTY, context: CoroutineContext = EmptyCoroutineContext, dependencies: Collection> = emptyList(), - block: suspend CoroutineScope.() -> T, + block: suspend () -> T, ): Data = LazyData(type, meta, context, dependencies, block) @Suppress("FunctionName") @@ -86,80 +86,5 @@ public inline fun Data( meta: Meta = Meta.EMPTY, context: CoroutineContext = EmptyCoroutineContext, dependencies: Collection> = emptyList(), - noinline block: suspend CoroutineScope.() -> T, + noinline block: suspend () -> T, ): Data = Data(T::class, meta, context, dependencies, block) - - - -public fun Data.map( - outputType: KClass, - coroutineContext: CoroutineContext = EmptyCoroutineContext, - meta: Meta = this.meta, - block: suspend CoroutineScope.(T) -> R, -): Data = LazyData(outputType, meta, coroutineContext, listOf(this)) { - block(await()) -} - - -/** - * Create a data pipe - */ -public inline fun Data.map( - coroutineContext: CoroutineContext = EmptyCoroutineContext, - meta: Meta = this.meta, - noinline block: suspend CoroutineScope.(T) -> R, -): Data = LazyData(R::class, meta, coroutineContext, listOf(this)) { - block(await()) -} - -/** - * Create a joined data. 
- */ -public inline fun Collection>.reduce( - coroutineContext: CoroutineContext = EmptyCoroutineContext, - meta: Meta, - noinline block: suspend CoroutineScope.(Collection) -> R, -): Data = LazyData( - R::class, - meta, - coroutineContext, - this -) { - block(map { run { it.await() } }) -} - -public fun Map>.reduce( - outputType: KClass, - coroutineContext: CoroutineContext = EmptyCoroutineContext, - meta: Meta, - block: suspend CoroutineScope.(Map) -> R, -): LazyData = LazyData( - outputType, - meta, - coroutineContext, - this.values -) { - block(mapValues { it.value.await() }) -} - - -/** - * A joining of multiple data into a single one - * @param K type of the map key - * @param T type of the input goal - * @param R type of the result goal - */ -public inline fun Map>.reduce( - coroutineContext: CoroutineContext = EmptyCoroutineContext, - meta: Meta, - noinline block: suspend CoroutineScope.(Map) -> R, -): LazyData = LazyData( - R::class, - meta, - coroutineContext, - this.values -) { - block(mapValues { it.value.await() }) -} - - diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataSet.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataSet.kt index 1c68c02a..db20fb16 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataSet.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataSet.kt @@ -36,10 +36,21 @@ public interface DataSet { public companion object { public val META_KEY: Name = "@meta".asName() + + /** + * An empty [DataSet] that suits all types + */ + public val EMPTY: DataSet = object : DataSet { + override val dataType: KClass = Nothing::class + + override fun flow(): Flow> = emptyFlow() + + override suspend fun getData(name: Name): Data? = null + } } } -public interface ActiveDataSet: DataSet{ +public interface ActiveDataSet : DataSet { /** * A flow of updated item names. Updates are propagated in a form of [Flow] of names of updated nodes. 
* Those can include new data items and replacement of existing ones. The replaced items could update existing data content @@ -49,7 +60,7 @@ public interface ActiveDataSet: DataSet{ public val updates: Flow } -public val DataSet.updates: Flow get() = if(this is ActiveDataSet) updates else emptyFlow() +public val DataSet.updates: Flow get() = if (this is ActiveDataSet) updates else emptyFlow() /** * Flow all data nodes with names starting with [branchName] diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataSetBuilder.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataSetBuilder.kt index 17120270..5a0a3b55 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataSetBuilder.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataSetBuilder.kt @@ -16,34 +16,37 @@ public interface DataSetBuilder { */ public suspend fun remove(name: Name) - public suspend fun set(name: Name, data: Data?) + public suspend fun emit(name: Name, data: Data?) /** * Set a current state of given [dataSet] into a branch [name]. 
Does not propagate updates */ - public suspend fun set(name: Name, dataSet: DataSet){ + public suspend fun emit(name: Name, dataSet: DataSet) { //remove previous items - remove(name) + if (name != Name.EMPTY) { + remove(name) + } + //Set new items dataSet.flow().collect { - set(name + it.name, it.data) + emit(name + it.name, it.data) } } /** * Append data to node */ - public suspend infix fun String.put(data: Data): Unit = set(toName(), data) + public suspend infix fun String.put(data: Data): Unit = emit(toName(), data) /** * Append node */ - public suspend infix fun String.put(dataSet: DataSet): Unit = set(toName(), dataSet) + public suspend infix fun String.put(dataSet: DataSet): Unit = emit(toName(), dataSet) /** * Build and append node */ - public suspend infix fun String.put(block: suspend DataSetBuilder.() -> Unit): Unit = set(toName(), block) + public suspend infix fun String.put(block: suspend DataSetBuilder.() -> Unit): Unit = emit(toName(), block) } private class SubSetBuilder(private val parent: DataSetBuilder, private val branch: Name) : @@ -52,57 +55,60 @@ private class SubSetBuilder(private val parent: DataSetBuilder, p parent.remove(branch + name) } - override suspend fun set(name: Name, data: Data?) { - parent.set(branch + name, data) + override suspend fun emit(name: Name, data: Data?) 
{ + parent.emit(branch + name, data) } - override suspend fun set(name: Name, dataSet: DataSet) { - parent.set(branch + name, dataSet) + override suspend fun emit(name: Name, dataSet: DataSet) { + parent.emit(branch + name, dataSet) } } -public suspend fun DataSetBuilder.set(name: Name, block: suspend DataSetBuilder.() -> Unit){ - SubSetBuilder(this,name).apply { block() } +public suspend fun DataSetBuilder.emit(name: Name, block: suspend DataSetBuilder.() -> Unit) { + SubSetBuilder(this, name).apply { block() } } -public suspend fun DataSetBuilder.set(name: String, data: Data) { - set(name.toName(), data) +public suspend fun DataSetBuilder.emit(name: String, data: Data) { + emit(name.toName(), data) } public suspend fun DataSetBuilder.data(name: Name, data: T, meta: Meta = Meta.EMPTY) { - set(name, Data.static(data, meta)) + emit(name, Data.static(data, meta)) } public suspend fun DataSetBuilder.data(name: Name, data: T, block: MetaBuilder.() -> Unit = {}) { - set(name, Data.static(data, Meta(block))) + emit(name, Data.static(data, Meta(block))) } public suspend fun DataSetBuilder.data(name: String, data: T, block: MetaBuilder.() -> Unit = {}) { - set(name.toName(), Data.static(data, Meta(block))) + emit(name.toName(), Data.static(data, Meta(block))) } -public suspend fun DataSetBuilder.set(name: String, set: DataSet) { - this.set(name.toName(), set) +public suspend fun DataSetBuilder.emit(name: String, set: DataSet) { + this.emit(name.toName(), set) } -public suspend fun DataSetBuilder.set(name: String, block: suspend DataSetBuilder.() -> Unit): Unit = - this@set.set(name.toName(), block) +public suspend fun DataSetBuilder.emit(name: String, block: suspend DataSetBuilder.() -> Unit): Unit = + this@emit.emit(name.toName(), block) +public suspend fun DataSetBuilder.emit(data: NamedData) { + emit(data.name, data.data) +} /** * Update data with given node data and meta with node meta. 
*/ @DFExperimental -public suspend fun DataSetBuilder.update(tree: DataSet): Unit = coroutineScope { +public suspend fun DataSetBuilder.populate(tree: DataSet): Unit = coroutineScope { tree.flow().collect { //TODO check if the place is occupied - set(it.name, it.data) + emit(it.name, it.data) } } -public suspend fun DataSetBuilder.collectFrom(flow: Flow>) { +public suspend fun DataSetBuilder.populate(flow: Flow>) { flow.collect { - set(it.name, it.data) + emit(it.name, it.data) } } diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Goal.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Goal.kt index 3bde5971..eef9e7cb 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Goal.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Goal.kt @@ -49,10 +49,13 @@ public open class StaticGoal(public val value: T) : Goal { } } +/** + * @param coroutineContext additional context information + */ public open class LazyGoal( private val coroutineContext: CoroutineContext = EmptyCoroutineContext, override val dependencies: Collection> = emptyList(), - public val block: suspend CoroutineScope.() -> T, + public val block: suspend () -> T, ) : Goal { final override var deferred: Deferred? = null @@ -61,20 +64,40 @@ public open class LazyGoal( /** * Get ongoing computation or start a new one. * Does not guarantee thread safety. In case of multi-thread access, could create orphan computations. + * If [GoalExecutionRestriction] is present in the [coroutineScope] context, the call could produce an error or a warning + depending on the settings. 
*/ @DFExperimental override fun async(coroutineScope: CoroutineScope): Deferred { + val log = coroutineScope.coroutineContext[GoalLogger] + // Check if context restricts goal computation + coroutineScope.coroutineContext[GoalExecutionRestriction]?.let { restriction -> + when (restriction.policy) { + GoalExecutionRestrictionPolicy.WARNING -> log?.emit(GoalLogger.WARNING_TAG) { "Goal eager execution is prohibited by the coroutine scope policy" } + GoalExecutionRestrictionPolicy.ERROR -> error("Goal eager execution is prohibited by the coroutine scope policy") + else -> { + /*do nothing*/ + } + } + } + + log?.emit { "Starting dependencies computation for ${this@LazyGoal}" } val startedDependencies = this.dependencies.map { goal -> goal.run { async(coroutineScope) } } return deferred ?: coroutineScope.async( - this.coroutineContext + CoroutineMonitor() + Dependencies(startedDependencies) + coroutineContext + + CoroutineMonitor() + + Dependencies(startedDependencies) + + GoalExecutionRestriction(GoalExecutionRestrictionPolicy.NONE) // Remove restrictions on goal execution ) { + //cancel execution if error encountered in one of dependencies startedDependencies.forEach { deferred -> deferred.invokeOnCompletion { error -> if (error != null) this.cancel(CancellationException("Dependency $deferred failed with error: ${error.message}")) } } + coroutineContext[GoalLogger]?.emit { "Starting computation of ${this@LazyGoal}" } block() }.also { deferred = it } } @@ -86,38 +109,4 @@ public open class LazyGoal( deferred?.cancel() deferred = null } -} - -/** - * Create a one-to-one goal based on existing goal - */ -public fun Goal.map( - coroutineContext: CoroutineContext = EmptyCoroutineContext, - block: suspend CoroutineScope.(T) -> R, -): Goal = LazyGoal(coroutineContext, listOf(this)) { - block(await()) -} - -/** - * Create a joining goal. 
- */ -public fun Collection>.reduce( - coroutineContext: CoroutineContext = EmptyCoroutineContext, - block: suspend CoroutineScope.(Collection) -> R, -): Goal = LazyGoal(coroutineContext, this) { - block(map { run { it.await() } }) -} - -/** - * A joining goal for a map - * @param K type of the map key - * @param T type of the input goal - * @param R type of the result goal - */ -public fun Map>.reduce( - coroutineContext: CoroutineContext = EmptyCoroutineContext, - block: suspend CoroutineScope.(Map) -> R, -): Goal = LazyGoal(coroutineContext, this.values) { - block(mapValues { it.value.await() }) -} - +} \ No newline at end of file diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/GoalExecutionRestriction.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/GoalExecutionRestriction.kt new file mode 100644 index 00000000..38e439d5 --- /dev/null +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/GoalExecutionRestriction.kt @@ -0,0 +1,17 @@ +package hep.dataforge.data + +import kotlin.coroutines.CoroutineContext + +public enum class GoalExecutionRestrictionPolicy { + NONE, + WARNING, + ERROR +} + +public class GoalExecutionRestriction( + public val policy: GoalExecutionRestrictionPolicy = GoalExecutionRestrictionPolicy.ERROR, +) : CoroutineContext.Element { + override val key: CoroutineContext.Key<*> get() = Companion + + public companion object : CoroutineContext.Key +} \ No newline at end of file diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/GoalLogger.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/GoalLogger.kt new file mode 100644 index 00000000..f0520578 --- /dev/null +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/GoalLogger.kt @@ -0,0 +1,13 @@ +package hep.dataforge.data + +import kotlin.coroutines.CoroutineContext + +public interface GoalLogger : CoroutineContext.Element { + override val key: CoroutineContext.Key<*> get() = GoalLogger + + public fun emit(vararg tags: 
String, message: suspend () -> String) + + public companion object : CoroutineContext.Key{ + public const val WARNING_TAG: String = "WARNING" + } +} \ No newline at end of file diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/GroupRule.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/GroupRule.kt index fcd22eae..41e1de53 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/GroupRule.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/GroupRule.kt @@ -33,20 +33,26 @@ public interface GroupRule { * @param defaultTagValue * @return */ - public fun byValue(scope: CoroutineScope, key: String, defaultTagValue: String): GroupRule = - object : GroupRule { + public fun byValue( + scope: CoroutineScope, + key: String, + defaultTagValue: String, + ): GroupRule = object : GroupRule { - override suspend fun gather(dataType: KClass, set: DataSet): Map> { - val map = HashMap>() + override suspend fun gather( + dataType: KClass, + set: DataSet, + ): Map> { + val map = HashMap>() - set.flow().collect { data -> - val tagValue = data.meta[key]?.string ?: defaultTagValue - map.getOrPut(tagValue) { ActiveDataTree(dataType) }.set(data.name, data.data) - } - - return map + set.flow().collect { data -> + val tagValue = data.meta[key]?.string ?: defaultTagValue + map.getOrPut(tagValue) { ActiveDataTree(dataType) }.emit(data.name, data.data) } + + return map } + } // @ValueDef(key = "byValue", required = true, info = "The name of annotation value by which grouping should be made") diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/MapAction.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/MapAction.kt index ec6442d9..cfd37f0a 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/MapAction.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/MapAction.kt @@ -67,14 +67,14 @@ public class MapAction( val flow = dataSet.flow().map(::mapOne) - return DataTree.active(outputType) { 
- collectFrom(flow) + return ActiveDataTree(outputType) { + populate(flow) scope?.launch { dataSet.updates.collect { name -> //clear old nodes remove(name) //collect new items - collectFrom(dataSet.flowChildren(name).map(::mapOne)) + populate(dataSet.flowChildren(name).map(::mapOne)) } } } diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/ReduceAction.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/ReduceAction.kt index 1cc4ec10..b326b344 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/ReduceAction.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/ReduceAction.kt @@ -104,7 +104,7 @@ public class ReduceAction( val env = ActionEnv(groupName.toName(), groupMeta, meta) - val res: LazyData = dataFlow.reduce( + val res: LazyData = dataFlow.reduceToData( outputType, meta = groupMeta ) { group.result.invoke(env, it) } diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/SplitAction.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/SplitAction.kt index 48f4fc93..3d8fca5a 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/SplitAction.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/SplitAction.kt @@ -62,14 +62,14 @@ public class SplitAction( } } - return DataTree.active(outputType) { - collectFrom(dataSet.flow().flatMapConcat(transform = ::splitOne)) + return ActiveDataTree(outputType) { + populate(dataSet.flow().flatMapConcat(transform = ::splitOne)) scope?.launch { dataSet.updates.collect { name -> //clear old nodes remove(name) //collect new items - collectFrom(dataSet.flowChildren(name).flatMapConcat(transform = ::splitOne)) + populate(dataSet.flowChildren(name).flatMapConcat(transform = ::splitOne)) } } } diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/StaticDataTree.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/StaticDataTree.kt index c3191ef9..05cd85de 100644 --- 
a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/StaticDataTree.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/StaticDataTree.kt @@ -42,32 +42,34 @@ internal class StaticDataTree( } } - override suspend fun set(name: Name, data: Data?) { + override suspend fun emit(name: Name, data: Data?) { set(name, data?.let { DataTreeItem.Leaf(it) }) } - override suspend fun set(name: Name, dataSet: DataSet) { + override suspend fun emit(name: Name, dataSet: DataSet) { if (dataSet is StaticDataTree) { set(name, DataTreeItem.Node(dataSet)) } else { coroutineScope { dataSet.flow().collect { - set(name + it.name, it.data) + emit(name + it.name, it.data) } } } } } +@Suppress("FunctionName") public suspend fun DataTree( dataType: KClass, block: suspend DataSetBuilder.() -> Unit, ): DataTree = StaticDataTree(dataType).apply { block() } +@Suppress("FunctionName") public suspend inline fun DataTree( noinline block: suspend DataSetBuilder.() -> Unit, ): DataTree = DataTree(T::class, block) public suspend fun DataSet.seal(): DataTree = DataTree(dataType){ - update(this@seal) + populate(this@seal) } \ No newline at end of file diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/dataSetOperations.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/dataFilter.kt similarity index 93% rename from dataforge-data/src/commonMain/kotlin/hep/dataforge/data/dataSetOperations.kt rename to dataforge-data/src/commonMain/kotlin/hep/dataforge/data/dataFilter.kt index 9db5f963..5f2b2a8a 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/dataSetOperations.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/dataFilter.kt @@ -12,10 +12,9 @@ import kotlin.reflect.KClass /** * A stateless filtered [DataSet] */ -@DFExperimental public fun DataSet.filter( predicate: suspend (Name, Data) -> Boolean, -): DataSet = object : DataSet { +): ActiveDataSet = object : ActiveDataSet { override val dataType: KClass get() = 
this@filter.dataType override fun flow(): Flow> = @@ -36,7 +35,7 @@ public fun DataSet.filter( * Generate a wrapper data set with a given name prefix appended to all names */ public fun DataSet.withNamePrefix(prefix: Name): DataSet = if (prefix.isEmpty()) this -else object : DataSet { +else object : ActiveDataSet { override val dataType: KClass get() = this@withNamePrefix.dataType override fun flow(): Flow> = this@withNamePrefix.flow().map { it.data.named(prefix + it.name) } @@ -51,8 +50,9 @@ else object : DataSet { /** * Get a subset of data starting with a given [branchName] */ -public fun DataSet.branch(branchName: Name): DataSet = if (branchName.isEmpty()) this -else object : DataSet { +public fun DataSet.branch(branchName: Name): DataSet = if (branchName.isEmpty()) { + this +} else object : ActiveDataSet { override val dataType: KClass get() = this@branch.dataType override fun flow(): Flow> = this@branch.flow().mapNotNull { @@ -70,3 +70,4 @@ public fun DataSet.branch(branchName: String): DataSet = this@br @DFExperimental public suspend fun DataSet.rootData(): Data? = getData(Name.EMPTY) + diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/dataSetMeta.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/dataSetMeta.kt index 4b833e10..5ca07aa5 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/dataSetMeta.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/dataSetMeta.kt @@ -12,7 +12,7 @@ public suspend fun DataSet<*>.getMeta(): Meta? 
= getData(DataSet.META_KEY)?.meta /** * Add meta-data node to a [DataSet] */ -public suspend fun DataSetBuilder<*>.meta(meta: Meta): Unit = set(DataSet.META_KEY, Data.empty(meta)) +public suspend fun DataSetBuilder<*>.meta(meta: Meta): Unit = emit(DataSet.META_KEY, Data.empty(meta)) /** * Add meta-data node to a [DataSet] diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/dataTransform.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/dataTransform.kt new file mode 100644 index 00000000..39f5f306 --- /dev/null +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/dataTransform.kt @@ -0,0 +1,181 @@ +package hep.dataforge.data + +import hep.dataforge.meta.Meta +import hep.dataforge.meta.MetaBuilder +import hep.dataforge.meta.seal +import hep.dataforge.meta.toMutableMeta +import kotlinx.coroutines.flow.* +import kotlin.contracts.InvocationKind +import kotlin.contracts.contract +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext +import kotlin.reflect.KClass + + +public fun Data.map( + outputType: KClass, + coroutineContext: CoroutineContext = EmptyCoroutineContext, + meta: Meta = this.meta, + block: suspend (T) -> R, +): Data = LazyData(outputType, meta, coroutineContext, listOf(this)) { + block(await()) +} + +/** + * Create a data mapping + */ +public inline fun Data.map( + coroutineContext: CoroutineContext = EmptyCoroutineContext, + meta: Meta = this.meta, + crossinline block: suspend (T) -> R, +): Data = LazyData(R::class, meta, coroutineContext, listOf(this)) { + block(await()) +} + +/** + * Combine this data with the other data using [block] + */ +public inline fun Data.combine( + other: Data, + coroutineContext: CoroutineContext = EmptyCoroutineContext, + meta: Meta = this.meta, + crossinline block: suspend (left: T1, right: T2) -> R, +): Data = LazyData(R::class, meta, coroutineContext, listOf(this,other)) { + block(await(), other.await()) +} + + +//data collection operations + +/** + 
* Create a joined data. + */ +public inline fun Collection>.reduceToData( + coroutineContext: CoroutineContext = EmptyCoroutineContext, + meta: Meta = Meta.EMPTY, + noinline block: suspend (Collection) -> R, +): Data = LazyData( + R::class, + meta, + coroutineContext, + this +) { + block(map { run { it.await() } }) +} + +public fun Map>.reduceToData( + outputType: KClass, + coroutineContext: CoroutineContext = EmptyCoroutineContext, + meta: Meta = Meta.EMPTY, + block: suspend (Map) -> R, +): LazyData = LazyData( + outputType, + meta, + coroutineContext, + this.values +) { + block(mapValues { it.value.await() }) +} + + +/** + * A joining of multiple data into a single one + * @param K type of the map key + * @param T type of the input goal + * @param R type of the result goal + */ +public inline fun Map>.reduceToData( + coroutineContext: CoroutineContext = EmptyCoroutineContext, + meta: Meta = Meta.EMPTY, + noinline block: suspend (Map) -> R, +): LazyData = LazyData( + R::class, + meta, + coroutineContext, + this.values +) { + block(mapValues { it.value.await() }) +} + +/** + * Transform a [Flow] of [NamedData] to a single [Data]. 
Execution restrictions are removed for inner [Flow] + */ +public suspend fun Flow>.reduceToData( + outputType: KClass, + coroutineContext: CoroutineContext = EmptyCoroutineContext, + meta: Meta = Meta.EMPTY, + transformation: suspend (Flow>) -> R, +): LazyData = LazyData( + outputType, + meta, + coroutineContext, + toList() +) { + transformation(this) +} + +//flow operations + +public suspend inline fun Flow>.reduceToData( + coroutineContext: CoroutineContext = EmptyCoroutineContext, + meta: Meta = Meta.EMPTY, + noinline transformation: suspend (Flow>) -> R, +): LazyData = reduceToData(R::class, coroutineContext, meta) { + transformation(it) +} + +/** + * Fold a flow of named data into a single [Data] + */ +public suspend inline fun Flow>.foldToData( + initial: R, + coroutineContext: CoroutineContext = EmptyCoroutineContext, + meta: Meta = Meta.EMPTY, + noinline block: suspend (result: R, data: NamedData) -> R, +): LazyData = reduceToData( + coroutineContext, meta +) { + it.fold(initial, block) +} + +//DataSet operations + +public suspend fun DataSet.map( + outputType: KClass, + coroutineContext: CoroutineContext = EmptyCoroutineContext, + metaTransform: MetaBuilder.() -> Unit = {}, + block: suspend (T) -> R, +): DataSet = DataTree(outputType) { + populate( + flow().map { + val newMeta = it.meta.toMutableMeta().apply(metaTransform).seal() + it.map(outputType, coroutineContext, newMeta, block).named(it.name) + } + ) +} + +public suspend inline fun DataSet.map( + coroutineContext: CoroutineContext = EmptyCoroutineContext, + noinline metaTransform: MetaBuilder.() -> Unit = {}, + noinline block: suspend (T) -> R, +): DataSet = map(R::class, coroutineContext, metaTransform, block) + +public suspend fun DataSet.forEach(block: suspend (NamedData) -> Unit) { + contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) } + flow().collect { + block(it) + } +} + +public suspend inline fun DataSet.reduceToData( + coroutineContext: CoroutineContext = EmptyCoroutineContext, + 
meta: Meta = Meta.EMPTY, + noinline transformation: suspend (Flow>) -> R, +): LazyData = flow().reduceToData(coroutineContext, meta, transformation) + +public suspend inline fun DataSet.foldToData( + initial: R, + coroutineContext: CoroutineContext = EmptyCoroutineContext, + meta: Meta = Meta.EMPTY, + noinline block: suspend (result: R, data: NamedData) -> R, +): LazyData = flow().foldToData(initial, coroutineContext, meta, block) \ No newline at end of file diff --git a/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/dataJVM.kt b/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/dataJVM.kt index 775adfc1..4178766a 100644 --- a/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/dataJVM.kt +++ b/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/dataJVM.kt @@ -1,31 +1,12 @@ package hep.dataforge.data -import kotlinx.coroutines.runBlocking import kotlin.reflect.KClass import kotlin.reflect.full.isSubclassOf -/** - * Block the thread and get data content - */ -public fun Data.value(): T = runBlocking { await() } - /** * Check if data could be safely cast to given class */ -internal fun Data<*>.canCast(type: KClass): Boolean = - this.type.isSubclassOf(type) - - -//public fun Data.upcast(type: KClass): Data { -// return object : Data by this { -// override val type: KClass = type -// } -//} -// -///** -// * Safe upcast a [Data] to a supertype -// */ -//public inline fun Data.upcast(): Data = upcast(R::class) +internal fun Data<*>.canCast(type: KClass): Boolean = this.type.isSubclassOf(type) /** * Cast the node to given type if the cast is possible or return null diff --git a/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/filterIsInstance.kt b/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/filterIsInstance.kt deleted file mode 100644 index 4997573a..00000000 --- a/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/filterIsInstance.kt +++ /dev/null @@ -1,29 +0,0 @@ -package hep.dataforge.data - -import hep.dataforge.names.Name -import 
kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.filter -import kotlinx.coroutines.flow.map -import kotlin.reflect.KClass - - -public fun DataSet<*>.filterIsInstance(type: KClass): DataSet = object : DataSet { - override val dataType: KClass = type - - @Suppress("UNCHECKED_CAST") - override fun flow(): Flow> = this@filterIsInstance.flow().filter { - it.canCast(type) - }.map { - it as NamedData - } - - override suspend fun getData(name: Name): Data? = this@filterIsInstance.getData(name)?.castOrNull(type) - - override val updates: Flow = this@filterIsInstance.updates.filter { - val datum = this@filterIsInstance.getData(it) - datum?.canCast(type) ?: false - } - -} - -public inline fun DataSet<*>.filterIsInstance(): DataSet = filterIsInstance(R::class) \ No newline at end of file diff --git a/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/select.kt b/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/select.kt new file mode 100644 index 00000000..5b30a0d3 --- /dev/null +++ b/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/select.kt @@ -0,0 +1,49 @@ +package hep.dataforge.data + +import hep.dataforge.meta.DFExperimental +import hep.dataforge.names.* +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.map +import kotlin.reflect.KClass + + +/** + * Select all data matching given type and filters. Does not modify paths + */ +@OptIn(DFExperimental::class) +public fun DataSet<*>.select( + type: KClass, + namePattern: Name? = null, +): ActiveDataSet = object : ActiveDataSet { + override val dataType: KClass = type + + @Suppress("UNCHECKED_CAST") + override fun flow(): Flow> = this@select.flow().filter { + it.canCast(type) && (namePattern == null || it.name.matches(namePattern)) + }.map { + it as NamedData + } + + override suspend fun getData(name: Name): Data? 
= this@select.getData(name)?.castOrNull(type) + + override val updates: Flow = this@select.updates.filter { + val datum = this@select.getData(it) + datum?.canCast(type) ?: false + } + +} + +/** + * Select a single datum of the appropriate type + */ +public inline fun DataSet<*>.select(namePattern: Name? = null): DataSet = + select(R::class, namePattern) + +public suspend fun DataSet<*>.selectOne(type: KClass, name: Name): NamedData? = + getData(name)?.castOrNull(type)?.named(name) + +public suspend inline fun DataSet<*>.selectOne(name: Name): NamedData? = selectOne(R::class, name) + +public suspend inline fun DataSet<*>.selectOne(name: String): NamedData? = + selectOne(R::class, name.toName()) \ No newline at end of file diff --git a/dataforge-data/src/jvmTest/kotlin/hep/dataforge/data/ActionsTest.kt b/dataforge-data/src/jvmTest/kotlin/hep/dataforge/data/ActionsTest.kt index d1944785..50240b28 100644 --- a/dataforge-data/src/jvmTest/kotlin/hep/dataforge/data/ActionsTest.kt +++ b/dataforge-data/src/jvmTest/kotlin/hep/dataforge/data/ActionsTest.kt @@ -4,9 +4,14 @@ import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.Test import kotlin.test.assertEquals +/** + * Block the thread and get data content + */ +public fun Data.value(): T = runBlocking { await() } + class ActionsTest { val data: DataTree = runBlocking { - DataTree.static { + DataTree { repeat(10) { data(it.toString(), it) } diff --git a/dataforge-data/src/jvmTest/kotlin/hep/dataforge/data/DataTreeBuilderTest.kt b/dataforge-data/src/jvmTest/kotlin/hep/dataforge/data/DataTreeBuilderTest.kt index 98feae57..4050e731 100644 --- a/dataforge-data/src/jvmTest/kotlin/hep/dataforge/data/DataTreeBuilderTest.kt +++ b/dataforge-data/src/jvmTest/kotlin/hep/dataforge/data/DataTreeBuilderTest.kt @@ -10,20 +10,20 @@ import kotlin.test.assertEquals internal class DataTreeBuilderTest { @Test fun testDataUpdate() = runBlocking { - val updateData: DataTree = DataTree.static { + val updateData: DataTree = DataTree { 
"update" put { "a" put Data.static("a") "b" put Data.static("b") } } - val node = DataTree.static { - set("primary") { + val node = DataTree { + emit("primary") { data("a", "a") data("b", "b") } data("root", "root") - update(updateData) + populate(updateData) } @@ -34,13 +34,15 @@ internal class DataTreeBuilderTest { @Test fun testDynamicUpdates() = runBlocking { try { + lateinit var updateJob: Job supervisorScope { - val subNode = DataTree.active { - launch { + val subNode = ActiveDataTree { + updateJob = launch { repeat(10) { delay(10) data("value", it) } + delay(10) } } launch { @@ -48,7 +50,7 @@ internal class DataTreeBuilderTest { println(it) } } - val rootNode = DataTree.active { + val rootNode = ActiveDataTree { setAndObserve("sub".toName(), subNode) } @@ -57,7 +59,7 @@ internal class DataTreeBuilderTest { println(it) } } - delay(200) + updateJob.join() assertEquals(9, rootNode.getData("sub.value")?.value()) cancel() } diff --git a/dataforge-meta/src/commonMain/kotlin/hep/dataforge/meta/descriptors/ItemDescriptor.kt b/dataforge-meta/src/commonMain/kotlin/hep/dataforge/meta/descriptors/ItemDescriptor.kt index 7dd5f905..e03c832a 100644 --- a/dataforge-meta/src/commonMain/kotlin/hep/dataforge/meta/descriptors/ItemDescriptor.kt +++ b/dataforge-meta/src/commonMain/kotlin/hep/dataforge/meta/descriptors/ItemDescriptor.kt @@ -192,13 +192,13 @@ public class NodeDescriptor(config: Config = Config()) : ItemDescriptor(config) internal val ITEM_KEY: Name = "item".asName() internal val IS_NODE_KEY: Name = "@isNode".asName() - public inline operator fun invoke(block: NodeDescriptor.() -> Unit): NodeDescriptor = - NodeDescriptor().apply(block) - //TODO infer descriptor from spec } } +public inline fun NodeDescriptor(block: NodeDescriptor.() -> Unit): NodeDescriptor = + NodeDescriptor().apply(block) + /** * Get a descriptor item associated with given name or null if item for given name not provided */ diff --git 
a/dataforge-meta/src/commonTest/kotlin/hep/dataforge/meta/MetaDelegateTest.kt b/dataforge-meta/src/commonTest/kotlin/hep/dataforge/meta/MetaDelegateTest.kt index a6b514d4..c460c602 100644 --- a/dataforge-meta/src/commonTest/kotlin/hep/dataforge/meta/MetaDelegateTest.kt +++ b/dataforge-meta/src/commonTest/kotlin/hep/dataforge/meta/MetaDelegateTest.kt @@ -38,7 +38,7 @@ class MetaDelegateTest { assertEquals("theString", testObject.myValue) assertEquals(TestEnum.NO, testObject.enumValue) assertEquals(2.2, testObject.safeValue) - assertEquals("ddd", testObject.inner?.innerValue) + assertEquals("ddd", testObject.inner.innerValue) } diff --git a/dataforge-meta/src/commonTest/kotlin/hep/dataforge/names/NameMatchTest.kt b/dataforge-meta/src/commonTest/kotlin/hep/dataforge/names/NameMatchTest.kt new file mode 100644 index 00000000..b91d94a6 --- /dev/null +++ b/dataforge-meta/src/commonTest/kotlin/hep/dataforge/names/NameMatchTest.kt @@ -0,0 +1,32 @@ +package hep.dataforge.names + +import hep.dataforge.meta.DFExperimental +import kotlin.test.Test +import kotlin.test.assertFails +import kotlin.test.assertFalse +import kotlin.test.assertTrue + +@OptIn(DFExperimental::class) +class NameMatchTest { + @Test + fun matchWildCards() { + val theName = "a.b.c.d".toName() + assertTrue { theName.matches("a.b.**") } + assertTrue { theName.matches("a.*.c.**") } + assertTrue { theName.matches("**.d") } + assertTrue { theName.matches("**.b.**") } + assertTrue { theName.matches("a.*.*.d") } + assertFails { theName.matches("a.**.d") } + assertFalse { theName.matches("a.b.c.d.**") } + } + + @Test + fun matchPattern() { + val theName = "a[dd+2].b[13].c.d[\"d\"]".toName() + assertTrue { theName.matches("a[.*].b[.*].c[.*].d[.*]") } + assertTrue { theName.matches("a[.*].b[.*].c.d[.*]") } + assertFalse { theName.matches("a[.*].b[.*].*.d") } + assertTrue { theName.matches("""\\w[dd\\+2].b[.*].c[.*].d[.*]""") } + assertFalse { theName.matches("""\\s[dd\\+2].b[.*].c[.*].d[.*]""") } + } +} \ No newline 
at end of file diff --git a/dataforge-scripting/src/jvmMain/kotlin/hep/dataforge/scripting/Builders.kt b/dataforge-scripting/src/jvmMain/kotlin/hep/dataforge/scripting/Builders.kt index 1e325595..1842ba45 100644 --- a/dataforge-scripting/src/jvmMain/kotlin/hep/dataforge/scripting/Builders.kt +++ b/dataforge-scripting/src/jvmMain/kotlin/hep/dataforge/scripting/Builders.kt @@ -3,7 +3,6 @@ package hep.dataforge.scripting import hep.dataforge.context.Context import hep.dataforge.context.Global import hep.dataforge.context.logger -import hep.dataforge.workspace.SimpleWorkspaceBuilder import hep.dataforge.workspace.Workspace import hep.dataforge.workspace.WorkspaceBuilder import java.io.File @@ -17,7 +16,7 @@ import kotlin.script.experimental.jvmhost.BasicJvmScriptingHost public object Builders { private fun buildWorkspace(source: SourceCode, context: Context = Global): Workspace { - val builder = SimpleWorkspaceBuilder(context) + val builder = WorkspaceBuilder(context) val workspaceScriptConfiguration = ScriptCompilationConfiguration { // baseClass(Any::class) @@ -30,6 +29,7 @@ public object Builders { dependenciesFromCurrentContext(wholeClasspath = true) } hostConfiguration(defaultJvmScriptingHostConfiguration) + compilerOptions("-jvm-target", Runtime.version().feature().toString()) } val evaluationConfiguration = ScriptEvaluationConfiguration { diff --git a/dataforge-scripting/src/jvmTest/kotlin/hep/dataforge/scripting/BuildersKtTest.kt b/dataforge-scripting/src/jvmTest/kotlin/hep/dataforge/scripting/BuildersKtTest.kt index 7d96c168..cd0985a7 100644 --- a/dataforge-scripting/src/jvmTest/kotlin/hep/dataforge/scripting/BuildersKtTest.kt +++ b/dataforge-scripting/src/jvmTest/kotlin/hep/dataforge/scripting/BuildersKtTest.kt @@ -3,8 +3,9 @@ package hep.dataforge.scripting import hep.dataforge.context.Global import hep.dataforge.meta.get import hep.dataforge.meta.int -import hep.dataforge.workspace.SimpleWorkspaceBuilder -import hep.dataforge.workspace.context +import 
hep.dataforge.workspace.WorkspaceBuilder + +import hep.dataforge.workspace.target import kotlin.test.Test import kotlin.test.assertEquals @@ -12,7 +13,7 @@ import kotlin.test.assertEquals class BuildersKtTest { @Test fun checkBuilder(){ - val workspace = SimpleWorkspaceBuilder(Global).apply { + val workspace = WorkspaceBuilder(Global).apply { println("I am working") context("test") diff --git a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/ContextGoalLogger.kt b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/ContextGoalLogger.kt new file mode 100644 index 00000000..aae36ac6 --- /dev/null +++ b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/ContextGoalLogger.kt @@ -0,0 +1,17 @@ +package hep.dataforge.workspace + +import hep.dataforge.context.Context +import hep.dataforge.context.logger +import hep.dataforge.data.GoalLogger +import kotlinx.coroutines.launch + +public class ContextGoalLogger(public val context: Context) : GoalLogger { + override fun emit(vararg tags: String, message: suspend () -> String) { + context.launch { + val text = message() + context.logger.info { text } + } + } +} + +public val Workspace.goalLogger: GoalLogger get() = ContextGoalLogger(context) \ No newline at end of file diff --git a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/DataPlacement.kt b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/DataPlacement.kt deleted file mode 100644 index 04e3ecb4..00000000 --- a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/DataPlacement.kt +++ /dev/null @@ -1,104 +0,0 @@ -package hep.dataforge.workspace - -import hep.dataforge.data.* -import hep.dataforge.meta.* -import hep.dataforge.names.Name -import hep.dataforge.names.plus -import hep.dataforge.names.removeHeadOrNull -import hep.dataforge.names.toName -import kotlinx.coroutines.flow.* -import kotlin.reflect.KClass - -public interface DataPlacement: MetaRepr { - /** - * Select a 
placement for a data with given [name] and [meta]. The result is null if data should be ignored. - */ - public fun place(name: Name, meta: Meta, dataType: KClass<*>): Name? - - public companion object { - public val ALL: DataPlacement = object : DataPlacement { - override fun place(name: Name, meta: Meta, dataType: KClass<*>): Name = name - - override fun toMeta(): Meta = Meta{"from" put "*"} - } - - public fun into(target: Name): DataPlacement = DataPlacementScheme{ - to = target.toString() - } - - public fun into(target: String): DataPlacement = DataPlacementScheme{ - to = target - } - - } -} - -public fun DataPlacement.place(datum: NamedData<*>): Name? = place(datum.name, datum.meta, datum.type) - -private class ArrangedDataSet( - private val source: DataSet, - private val placement: DataPlacement, -) : DataSet { - override val dataType: KClass get() = source.dataType - - override fun flow(): Flow> = source.flow().mapNotNull { - val newName = placement.place(it) ?: return@mapNotNull null - it.data.named(newName) - } - - override suspend fun getData(name: Name): Data? = flow().filter { it.name == name }.firstOrNull() - - override val updates: Flow = source.updates.flatMapConcat { - flowChildren(it).mapNotNull(placement::place) - } -} - -public class DataPlacementScheme : Scheme(), DataPlacement { - /** - * A source node for the filter - */ - public var from: String? by string() - - /** - * A target placement for the filtered node - */ - public var to: String? by string() - - /** - * A regular expression pattern for the filter - */ - public var pattern: String? by string() -// val prefix by string() -// val suffix by string() - - override fun place(name: Name, meta: Meta, dataType: KClass<*>): Name? 
{ - val fromName = from?.toName() ?: Name.EMPTY - val nameReminder = name.removeHeadOrNull(fromName) ?: return null - val regex = pattern?.toRegex() - return if (regex == null || nameReminder.toString().matches(regex)) { - (to?.toName() ?: Name.EMPTY) + nameReminder - } else { - null - } - } - - public companion object : SchemeSpec(::DataPlacementScheme) -} - - -/** - * Apply data node rearrangement - */ -public fun DataSet.rearrange(placement: DataPlacement): DataSet = ArrangedDataSet(this, placement) - -///** -// * Mask data using [DataPlacementScheme] specification -// */ -//public fun DataSet.rearrange(placement: Meta): DataSet = -// rearrange(DataPlacementScheme.read(placement)) - -/** - * Mask data using [DataPlacementScheme] builder - */ -public fun DataSet.rearrange(placementBuilder: DataPlacementScheme.() -> Unit): DataSet = - rearrange(DataPlacementScheme(placementBuilder)) \ No newline at end of file diff --git a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/Dependency.kt b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/Dependency.kt deleted file mode 100644 index 3c342e90..00000000 --- a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/Dependency.kt +++ /dev/null @@ -1,73 +0,0 @@ -package hep.dataforge.workspace - -import hep.dataforge.data.DataSet -import hep.dataforge.meta.Meta -import hep.dataforge.meta.MetaRepr -import hep.dataforge.meta.toMutableMeta -import hep.dataforge.names.Name -import hep.dataforge.names.asName -import hep.dataforge.names.plus - -/** - * A dependency of the task which allows to lazily create a data tree for single dependency - */ -public sealed class Dependency : MetaRepr { - public abstract suspend fun apply(workspace: Workspace): DataSet -} - -public class DataDependency(private val placement: DataPlacement = DataPlacement.ALL) : Dependency() { - override suspend fun apply(workspace: Workspace): DataSet = workspace.data.rearrange(placement) - - override fun toMeta(): 
Meta = placement.toMeta() -} - -public abstract class TaskDependency( - public val meta: Meta, - protected val placement: DataPlacement, -) : Dependency() { - public abstract fun resolveTask(workspace: Workspace): WorkStage - - /** - * A name of the dependency for logging and serialization - */ - public abstract val name: Name - - override suspend fun apply(workspace: Workspace): DataSet { - val task = resolveTask(workspace) - val result = workspace.run(task, meta) - return result.rearrange(placement) - } -} - -public class ExternalTaskDependency( - public val task: WorkStage, - meta: Meta, - placement: DataPlacement, -) : TaskDependency(meta, placement) { - override fun resolveTask(workspace: Workspace): WorkStage = task - - override val name: Name get() = EXTERNAL_TASK_NAME + task.name - - override fun toMeta(): Meta = placement.toMeta().toMutableMeta().apply { - "name" put name.toString() - "task" put task.toString() - "meta" put meta - } - - public companion object { - public val EXTERNAL_TASK_NAME: Name = "@external".asName() - } -} - -public class WorkspaceTaskDependency( - override val name: Name, - meta: Meta, - placement: DataPlacement, -) : TaskDependency(meta, placement) { - override fun resolveTask(workspace: Workspace): WorkStage<*> = workspace.stages[name] - ?: error("Task with name $name is not found in the workspace") - - override fun toMeta(): Meta { - TODO("Not yet implemented") - } -} \ No newline at end of file diff --git a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/SimpleWorkspace.kt b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/SimpleWorkspace.kt index 442e52bf..ab8d0465 100644 --- a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/SimpleWorkspace.kt +++ b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/SimpleWorkspace.kt @@ -12,16 +12,14 @@ import hep.dataforge.names.Name */ public class SimpleWorkspace( override val context: Context, - override val data: DataSet, 
+ data: DataSet<*>, override val targets: Map, - stages: Map> + private val externalTasks: Map>, ) : Workspace { - override val stages: Map> by lazy { - context.gather>(WorkStage.TYPE) + stages - } + override val data: TaskResult<*> = internalize(data, Name.EMPTY, Meta.EMPTY) - public companion object { + override val tasks: Map> + get() = context.gather>(Task.TYPE) + externalTasks - } } \ No newline at end of file diff --git a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/StageDataSet.kt b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/StageDataSet.kt deleted file mode 100644 index c0bf3784..00000000 --- a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/StageDataSet.kt +++ /dev/null @@ -1,46 +0,0 @@ -package hep.dataforge.workspace - -import hep.dataforge.data.DataSet -import hep.dataforge.meta.Meta -import hep.dataforge.names.Name -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.map - -/** - * A result of a [WorkStage] - */ -public interface StageDataSet : DataSet { - /** - * The [Workspace] this [DataSet] belongs to - */ - public val workspace: Workspace - - /** - * The [Name] of the stage that produced this [DataSet] - */ - public val stageName: Name - - /** - * The configuration of the stage that produced this [DataSet] - */ - public val stageMeta: Meta - - override fun flow(): Flow> - override suspend fun getData(name: Name): StageData? -} - -private class StageDataSetImpl( - override val workspace: Workspace, - val dataSet: DataSet, - override val stageName: Name, - override val stageMeta: Meta, -) : StageDataSet, DataSet by dataSet { - - override fun flow(): Flow> = dataSet.flow().map { - workspace.internalize(it, it.name, stageName, stageMeta) - } - - override suspend fun getData(name: Name): StageData? 
= dataSet.getData(name)?.let { - workspace.internalize(it, name, stageName, stageMeta) - } -} \ No newline at end of file diff --git a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/Task.kt b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/Task.kt new file mode 100644 index 00000000..f2808cd5 --- /dev/null +++ b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/Task.kt @@ -0,0 +1,74 @@ +package hep.dataforge.workspace + +import hep.dataforge.data.DataSetBuilder +import hep.dataforge.data.DataTree +import hep.dataforge.data.GoalExecutionRestriction +import hep.dataforge.meta.Meta +import hep.dataforge.meta.descriptors.Described +import hep.dataforge.meta.descriptors.ItemDescriptor +import hep.dataforge.misc.Type +import hep.dataforge.names.Name +import hep.dataforge.workspace.Task.Companion.TYPE +import kotlinx.coroutines.withContext +import kotlin.reflect.KClass + +@Type(TYPE) +public interface Task : Described { + + public val type: KClass + + /** + * Compute a [TaskResult] using given meta. In general, the result is lazy and represents both computation model + * and a handler for actual result + * + * @param workspace a workspace to run task in + * @param taskName the name of the task in this workspace + * @param taskMeta configuration for current stage computation + */ + public suspend fun execute(workspace: Workspace, taskName: Name, taskMeta: Meta): TaskResult + + public companion object { + public const val TYPE: String = "workspace.stage" + } +} + +public class TaskResultBuilder( + public val workspace: Workspace, + public val taskName: Name, + public val taskMeta: Meta, + private val dataSync: DataSetBuilder, +) : DataSetBuilder by dataSync + +/** + * Create a [Task] that composes a result using [builder]. Only data from the workspace could be used. + * Data dependency cycles are not allowed. + */ +@Suppress("FunctionName") +public fun Task( + resultType: KClass, + descriptor: ItemDescriptor? 
= null, + builder: suspend TaskResultBuilder.() -> Unit, +): Task = object : Task { + + override val type: KClass = resultType + + override val descriptor: ItemDescriptor? = descriptor + + override suspend fun execute( + workspace: Workspace, + taskName: Name, + taskMeta: Meta, + ): TaskResult = withContext(GoalExecutionRestriction() + workspace.goalLogger) { + //TODO use safe builder and check for external data on add and detects cycles + val dataset = DataTree(type) { + TaskResultBuilder(workspace,taskName, taskMeta, this).apply { builder() } + } + workspace.internalize(dataset, taskName, taskMeta) + } +} + +@Suppress("FunctionName") +public inline fun Task( + descriptor: ItemDescriptor? = null, + noinline builder: suspend TaskResultBuilder.() -> Unit, +): Task = Task(T::class, descriptor, builder) \ No newline at end of file diff --git a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/StageData.kt b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/TaskData.kt similarity index 58% rename from dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/StageData.kt rename to dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/TaskData.kt index f17cb09a..f317b00a 100644 --- a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/StageData.kt +++ b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/TaskData.kt @@ -8,7 +8,7 @@ import hep.dataforge.names.Name /** * A [Workspace]-locked [NamedData], that serves as a computation model. */ -public interface StageData : NamedData { +public interface TaskData : NamedData { /** * The [Workspace] this data belongs to */ @@ -17,31 +17,31 @@ public interface StageData : NamedData { /** * The name of the stage that produced this data. [Name.EMPTY] if the workspace intrinsic data is used. */ - public val stage: Name + public val task: Name /** * Stage configuration used to produce this data. 
*/ - public val stageMeta: Meta + public val taskMeta: Meta /** * Dependencies that allow to compute transitive dependencies as well. */ - override val dependencies: Collection> +// override val dependencies: Collection> } -private class StageDataImpl( +private class TaskDataImpl( override val workspace: Workspace, override val data: Data, override val name: Name, - override val stage: Name, - override val stageMeta: Meta, -) : StageData, Data by data { - override val dependencies: Collection> = data.dependencies.map { - it as? StageData<*> ?: error("StageData can't depend on external data") - } + override val task: Name, + override val taskMeta: Meta, +) : TaskData, Data by data { +// override val dependencies: Collection> = data.dependencies.map { +// it as? TaskData<*> ?: error("TaskData can't depend on external data") +// } } -internal fun Workspace.internalize(data: Data, name: Name, stage: Name, stageMeta: Meta): StageData = - StageDataImpl(this, data, name, stage, stageMeta) +internal fun Workspace.internalize(data: Data, name: Name, stage: Name, stageMeta: Meta): TaskData = + TaskDataImpl(this, data, name, stage, stageMeta) diff --git a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/TaskResult.kt b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/TaskResult.kt new file mode 100644 index 00000000..c8adcd9a --- /dev/null +++ b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/TaskResult.kt @@ -0,0 +1,49 @@ +package hep.dataforge.workspace + +import hep.dataforge.data.DataSet +import hep.dataforge.meta.Meta +import hep.dataforge.names.Name +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.map + +/** + * A result of a [Task] + */ +public interface TaskResult : DataSet { + /** + * The [Workspace] this [DataSet] belongs to + */ + public val workspace: Workspace + + /** + * The [Name] of the stage that produced this [DataSet] + */ + public val taskName: Name + + /** + * The configuration of the 
stage that produced this [DataSet] + */ + public val taskMeta: Meta + + override fun flow(): Flow> + override suspend fun getData(name: Name): TaskData? +} + +private class TaskResultImpl( + override val workspace: Workspace, + val dataSet: DataSet, + override val taskName: Name, + override val taskMeta: Meta, +) : TaskResult, DataSet by dataSet { + + override fun flow(): Flow> = dataSet.flow().map { + workspace.internalize(it, it.name, taskName, taskMeta) + } + + override suspend fun getData(name: Name): TaskData? = dataSet.getData(name)?.let { + workspace.internalize(it, name, taskName, taskMeta) + } +} + +internal fun Workspace.internalize(dataSet: DataSet, stage: Name, stageMeta: Meta): TaskResult = + TaskResultImpl(this, dataSet, stage, stageMeta) \ No newline at end of file diff --git a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/WorkStage.kt b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/WorkStage.kt deleted file mode 100644 index 663e7d4b..00000000 --- a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/WorkStage.kt +++ /dev/null @@ -1,23 +0,0 @@ -package hep.dataforge.workspace - -import hep.dataforge.meta.Meta -import hep.dataforge.meta.descriptors.Described -import hep.dataforge.misc.Type -import hep.dataforge.workspace.WorkStage.Companion.TYPE - -@Type(TYPE) -public interface WorkStage : Described { - - /** - * Compute a [StageDataSet] using given meta. 
In general, the result is lazy and represents both computation model - * and a handler for actual result - * - * @param workspace a workspace to run task model in - * @param meta configuration for current stage computation - */ - public suspend fun execute(workspace: Workspace, meta: Meta): StageDataSet - - public companion object { - public const val TYPE: String = "workspace.stage" - } -} \ No newline at end of file diff --git a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/Workspace.kt b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/Workspace.kt index 738e574b..af5ee317 100644 --- a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/Workspace.kt +++ b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/Workspace.kt @@ -2,6 +2,7 @@ package hep.dataforge.workspace import hep.dataforge.context.ContextAware import hep.dataforge.meta.Meta +import hep.dataforge.meta.MetaBuilder import hep.dataforge.misc.Type import hep.dataforge.names.Name import hep.dataforge.names.toName @@ -13,7 +14,7 @@ public interface Workspace : ContextAware, Provider { /** * The whole data node for current workspace */ - public val data: StageDataSet<*> + public val data: TaskResult<*> /** * All targets associated with the workspace @@ -23,44 +24,36 @@ public interface Workspace : ContextAware, Provider { /** * All stages associated with the workspace */ - public val stages: Map> + public val tasks: Map> override fun content(target: String): Map { return when (target) { "target", Meta.TYPE -> targets.mapKeys { it.key.toName() } - WorkStage.TYPE -> stages + Task.TYPE -> tasks //Data.TYPE -> data.flow().toMap() else -> emptyMap() } } + public suspend fun produce(taskName: Name, taskMeta: Meta): TaskResult<*> { + if (taskName == Name.EMPTY) return data + val task = tasks[taskName] ?: error("Task with name $taskName not found in the workspace") + return task.execute(this, taskName, taskMeta) + } + + public suspend fun 
produceData(taskName: Name, taskMeta: Meta, name: Name): TaskData<*>? = + produce(taskName, taskMeta).getData(name) + public companion object { public const val TYPE: String = "workspace" } } -public suspend fun Workspace.stage(taskName: Name, taskMeta: Meta): StageDataSet<*> { - val task = stages[taskName] ?: error("Task with name $taskName not found in the workspace") - return task.execute(this, taskMeta) -} +public suspend fun Workspace.produce(task: String, target: String): TaskResult<*> = + produce(task.toName(), targets[target] ?: error("Target with key $target not found in $this")) -public suspend fun Workspace.getData(taskName: Name, taskMeta: Meta, name: Name): StageData<*>? = - stage(taskName, taskMeta).getData(name) +public suspend fun Workspace.produce(task: String, meta: Meta): TaskResult<*> = + produce(task.toName(), meta) -//public suspend fun Workspace.execute(task: WorkStage<*>, target: String): DataSet { -// val meta = targets[target] ?: error("A target with name $target not found in $this") -// return run(task, meta) -//} -// -// -//public suspend fun Workspace.execute(task: String, target: String): DataSet = -// stages[task.toName()]?.let { execute(it, target) } ?: error("Task with name $task not found") -// -//public suspend fun Workspace.execute(task: String, meta: Meta): DataSet = -// stages[task.toName()]?.let { run(it, meta) } ?: error("Task with name $task not found") -// -//public suspend fun Workspace.execute(task: String, block: MetaBuilder.() -> Unit = {}): DataSet = -// execute(task, Meta(block)) -// -//public suspend fun Workspace.execute(task: WorkStage, metaBuilder: MetaBuilder.() -> Unit = {}): DataSet = -// run(task, Meta(metaBuilder)) \ No newline at end of file +public suspend fun Workspace.produce(task: String, block: MetaBuilder.() -> Unit = {}): TaskResult<*> = + produce(task, Meta(block)) diff --git a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/WorkspaceBuilder.kt 
b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/WorkspaceBuilder.kt new file mode 100644 index 00000000..aa3519a3 --- /dev/null +++ b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/WorkspaceBuilder.kt @@ -0,0 +1,103 @@ +package hep.dataforge.workspace + +import hep.dataforge.context.Context +import hep.dataforge.context.ContextBuilder +import hep.dataforge.context.Global +import hep.dataforge.data.ActiveDataTree +import hep.dataforge.data.DataSet +import hep.dataforge.data.DataSetBuilder +import hep.dataforge.data.DataTree +import hep.dataforge.meta.DFBuilder +import hep.dataforge.meta.DFExperimental +import hep.dataforge.meta.Meta +import hep.dataforge.meta.MetaBuilder +import hep.dataforge.meta.descriptors.NodeDescriptor +import hep.dataforge.names.Name +import hep.dataforge.names.toName +import kotlin.properties.PropertyDelegateProvider +import kotlin.properties.ReadOnlyProperty +import kotlin.reflect.KClass + +public data class TaskReference(public val taskName: Name, public val task: Task) + +public interface TaskContainer { + public fun registerTask(taskName: Name, task: Task<*>) +} + +public fun TaskContainer.registerTask( + resultType: KClass, + name: String, + descriptorBuilder: NodeDescriptor.() -> Unit = {}, + builder: suspend TaskResultBuilder.() -> Unit, +): Unit = registerTask(name.toName(), Task(resultType, NodeDescriptor(descriptorBuilder), builder)) + +public inline fun TaskContainer.registerTask( + name: String, + noinline descriptorBuilder: NodeDescriptor.() -> Unit = {}, + noinline builder: suspend TaskResultBuilder.() -> Unit, +): Unit = registerTask(T::class, name, descriptorBuilder, builder) + +public inline fun TaskContainer.task( + noinline descriptorBuilder: NodeDescriptor.() -> Unit = {}, + noinline builder: suspend TaskResultBuilder.() -> Unit, +): PropertyDelegateProvider>> = PropertyDelegateProvider { _, property -> + val taskName = property.name.toName() + val task = Task(T::class, 
NodeDescriptor(descriptorBuilder), builder) + registerTask(taskName, task) + ReadOnlyProperty { _, _ -> TaskReference(taskName, task) } +} + + +public class WorkspaceBuilder(private val parentContext: Context = Global) : TaskContainer { + private var context: Context? = null + private var data: DataSet<*>? = null + private val targets: HashMap = HashMap() + private val tasks = HashMap>() + + /** + * Define a context for the workspace + */ + public fun context(name: String = "workspace", block: ContextBuilder.() -> Unit = {}) { + this.context = ContextBuilder(parentContext, name).apply(block).build() + } + + /** + * Define intrinsic data for the workspace + */ + public suspend fun buildData(builder: suspend DataSetBuilder.() -> Unit) { + data = DataTree(builder) + } + + @DFExperimental + public suspend fun buildActiveData(builder: suspend ActiveDataTree.() -> Unit) { + data = ActiveDataTree(builder) + } + + /** + * Define a new target + */ + public fun target(name: String, meta: Meta?) { + if (meta == null) { + targets.remove(name) + } else { + targets[name] = meta + } + } + + override fun registerTask(taskName: Name, task: Task<*>) { + tasks[taskName] = task + } + + public fun build(): Workspace = SimpleWorkspace(context ?: parentContext, data ?: DataSet.EMPTY, targets, tasks) +} + +/** + * Define a new target with a builder + */ +public inline fun WorkspaceBuilder.target(name: String, metaBuilder: MetaBuilder.() -> Unit): Unit = + target(name, Meta(metaBuilder)) + +@DFBuilder +public fun Workspace(parentContext: Context = Global, builder: WorkspaceBuilder.() -> Unit): Workspace { + return WorkspaceBuilder(parentContext).apply(builder).build() +} \ No newline at end of file diff --git a/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/TaskBuilder.kt b/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/TaskBuilder.kt deleted file mode 100644 index 4a8e0acd..00000000 --- 
a/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/TaskBuilder.kt +++ /dev/null @@ -1,208 +0,0 @@ -package hep.dataforge.workspace - -import hep.dataforge.context.Context -import hep.dataforge.data.* -import hep.dataforge.meta.* -import hep.dataforge.meta.descriptors.NodeDescriptor -import hep.dataforge.names.Name -import hep.dataforge.workspace.old.GenericTask -import hep.dataforge.workspace.old.TaskModel -import hep.dataforge.workspace.old.TaskModelBuilder -import hep.dataforge.workspace.old.data -import kotlin.reflect.KClass - -private typealias DataTransformation = suspend (context: Context, model: TaskModel, data: DataSet) -> DataSet - -@DFBuilder -@DFExperimental -public class TaskBuilder(private val name: Name, public val type: KClass) { - private var modelTransform: TaskModelBuilder.(Meta) -> Unit = { - data() - } - - // private val additionalDependencies = HashSet() - private var descriptor: NodeDescriptor? = null - private val dataTransforms: MutableList> = ArrayList() - -// override fun add(dependency: Dependency) { -// additionalDependencies.add(dependency) -// } - - public fun model(modelTransform: TaskModelBuilder.(Meta) -> Unit) { - this.modelTransform = modelTransform - } - - - public class TaskEnv( - public val name: Name, - public val meta: Meta, - public val context: Context, - public val data: DataSet, - ) - - /** - * Add a transformation on untyped data - * @param from the prefix for root node in data - * @param to the prefix for the target node. 
- */ - @JvmName("rawTransform") - public fun transform( - from: Name = Name.EMPTY, - to: Name = Name.EMPTY, - block: TaskEnv.(DataSet<*>) -> DataSet, - ) { - dataTransforms += { context, model, data -> - val env = TaskEnv(Name.EMPTY, model.meta, context, data) - val startData = data.branch(from) - env.block(startData).withNamePrefix(to) - } - } - - public fun transform( - inputType: KClass, - block: suspend TaskEnv.(DataSet) -> DataSet, - ) { - dataTransforms += { context, model, data -> - val env = TaskEnv(Name.EMPTY, model.meta, context, data) - env.block(data.filterIsInstance(inputType)) - } - } - - public inline fun transform( - noinline block: suspend TaskEnv.(DataSet) -> DataSet, - ): Unit = transform(T::class, block) - - - /** - * Perform given action on data elements in `from` node in input and put the result to `to` node - */ - public inline fun action( - from: Name = Name.EMPTY, - to: Name = Name.EMPTY, - crossinline block: TaskEnv.() -> Action, - ) { - transform { data: DataSet -> - block().execute(data, meta, context) - } - } - - - /** - * A customized map action with ability to change meta and name - */ - public inline fun mapAction( - from: Name = Name.EMPTY, - to: Name = Name.EMPTY, - crossinline block: MapActionBuilder.(TaskEnv) -> Unit, - ) { - action(from, to) { - val env = this - MapAction(type) { - block(env) - } - } - } - - /** - * A simple map action without changing meta or name - */ - public inline fun map( - from: Name = Name.EMPTY, - to: Name = Name.EMPTY, - crossinline block: suspend TaskEnv.(T) -> R, - ) { - action(from, to) { - MapAction(type) { - //TODO automatically append task meta - result = { data -> - block(data) - } - } - } - } - - /** - * Join elements in gathered data by multiple groups - */ - public inline fun reduceByGroup( - from: Name = Name.EMPTY, - to: Name = Name.EMPTY, - crossinline block: ReduceGroupBuilder.(TaskEnv) -> Unit, //TODO needs KEEP-176 - ) { - action(from, to) { - val env = this - ReduceAction(inputType = 
T::class, outputType = type) { block(env) } - } - } - - /** - * Join all elemlents in gathered data matching input type - */ - public inline fun reduce( - from: Name = Name.EMPTY, - to: Name = Name.EMPTY, - crossinline block: suspend TaskEnv.(Map) -> R, - ) { - action(from, to) { - ReduceAction(inputType = T::class, outputType = type) { - result( - actionMeta[TaskModel.MODEL_TARGET_KEY]?.string ?: "@anonymous" - ) { data -> - block(data) - } - } - } - } - - /** - * Split each element in gathered data into fixed number of fragments - */ - public inline fun split( - from: Name = Name.EMPTY, - to: Name = Name.EMPTY, - crossinline block: SplitBuilder.(TaskEnv) -> Unit, //TODO needs KEEP-176 - ) { - action(from, to) { - val env = this - SplitAction(type) { block(this, env) } - } - } - - /** - * Use DSL to create a descriptor for this task - */ - public fun description(transform: NodeDescriptor.() -> Unit) { - this.descriptor = NodeDescriptor().apply(transform) - } - - internal fun build(): GenericTask { - return GenericTask( - name, - type, - descriptor ?: NodeDescriptor(), - modelTransform - ) { - val workspace = this - { dataSet -> - val model = this - if (dataTransforms.isEmpty()) { - //return data node as is - logger.warn { "No transformation present, returning input data" } - dataSet.castOrNull(type) ?: error("$type expected, but $type received") - } else { - DataTree.active(type, workspace.context){ - dataTransforms.forEach { transformation -> - val res = transformation(workspace.context, model, dataSet) - update(res) - } - } - } - } - } - } -} - -@DFExperimental -public suspend inline fun TaskBuilder.TaskEnv.dataTree( - crossinline block: suspend ActiveDataTree.() -> Unit, -): DataTree = DataTree.active(context, block) \ No newline at end of file diff --git a/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/WorkspaceBuilder.kt b/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/WorkspaceBuilder.kt deleted file mode 100644 index 
e92950ff..00000000 --- a/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/WorkspaceBuilder.kt +++ /dev/null @@ -1,87 +0,0 @@ -package hep.dataforge.workspace - -import hep.dataforge.context.Context -import hep.dataforge.context.ContextBuilder -import hep.dataforge.context.Global -import hep.dataforge.data.ActiveDataTree -import hep.dataforge.meta.* -import hep.dataforge.names.toName -import kotlin.reflect.KClass - -@DFBuilder -public interface WorkspaceBuilder { - public val parentContext: Context - public var context: Context - public var data: ActiveDataTree - public var tasks: MutableSet> - public var targets: MutableMap - - public fun build(): Workspace -} - -/** - * Set the context for future workspcace - */ -public fun WorkspaceBuilder.context(name: String = "WORKSPACE", block: ContextBuilder.() -> Unit = {}) { - context = ContextBuilder(parentContext, name).apply(block).build() -} - -public inline fun WorkspaceBuilder.data( - block: ActiveDataTree.() -> Unit, -): Unit{ - data.apply(block) -} - - -public fun WorkspaceBuilder.target(name: String, block: MetaBuilder.() -> Unit) { - targets[name] = Meta(block).seal() -} - -class WorkspaceTask(val workspace: Workspace, val name: String) - -/** - * Use existing target as a base updating it with the block - */ -public fun WorkspaceBuilder.target(name: String, base: String, block: MetaBuilder.() -> Unit) { - val parentTarget = targets[base] ?: error("Base target with name $base not found") - targets[name] = parentTarget.toMutableMeta() - .apply { "@baseTarget" put base } - .apply(block) - .seal() -} - -public fun WorkspaceBuilder.task( - name: String, - type: KClass, - builder: TaskBuilder.() -> Unit, -): WorkspaceTask = TaskBuilder(name.toName(), type).apply(builder).build().also { tasks.add(it) } - -public inline fun WorkspaceBuilder.task( - name: String, - noinline builder: TaskBuilder.() -> Unit, -): WorkStage = task(name, T::class, builder) - -@JvmName("rawTask") -public fun WorkspaceBuilder.task( 
- name: String, - builder: TaskBuilder.() -> Unit, -): WorkStage = task(name, Any::class, builder) - -/** - * A builder for a simple workspace - */ -public class SimpleWorkspaceBuilder(override val parentContext: Context) : WorkspaceBuilder { - override var context: Context = parentContext - override var data: ActiveDataTree = ActiveDataTree(Any::class, context) - override var tasks: MutableSet> = HashSet() - override var targets: MutableMap = HashMap() - - override fun build(): SimpleWorkspace { - return SimpleWorkspace(context, data, targets, tasks) - } -} - -public fun Workspace( - parent: Context = Global, - block: SimpleWorkspaceBuilder.() -> Unit, -): Workspace = SimpleWorkspaceBuilder(parent).apply(block).build() \ No newline at end of file diff --git a/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/WorkspacePlugin.kt b/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/WorkspacePlugin.kt index 0ccaccf3..b30d141b 100644 --- a/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/WorkspacePlugin.kt +++ b/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/WorkspacePlugin.kt @@ -2,41 +2,21 @@ package hep.dataforge.workspace import hep.dataforge.context.AbstractPlugin import hep.dataforge.names.Name -import hep.dataforge.names.toName -import hep.dataforge.workspace.old.GenericTask -import kotlin.reflect.KClass /** * An abstract plugin with some additional boilerplate to effectively work with workspace context */ -public abstract class WorkspacePlugin : AbstractPlugin() { - private val _tasks = HashSet>() - public val tasks: Collection> get() = _tasks +public abstract class WorkspacePlugin : AbstractPlugin(), TaskContainer { + private val tasks = HashMap>() override fun content(target: String): Map { return when (target) { - WorkStage.TYPE -> tasks.toMap() + Task.TYPE -> tasks else -> emptyMap() } } - public fun task(task: WorkStage<*>){ - _tasks.add(task) + override fun registerTask(taskName: Name, task: Task<*>) 
{ + tasks[taskName] = task } - - public fun task( - name: String, - type: KClass, - builder: TaskBuilder.() -> Unit - ): GenericTask = TaskBuilder(name.toName(), type).apply(builder).build().also { - _tasks.add(it) - } - - public inline fun task( - name: String, - noinline builder: TaskBuilder.() -> Unit - ): GenericTask = task(name, T::class, builder) - -// -////TODO add delegates to build gradle-like tasks } \ No newline at end of file diff --git a/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/fileData.kt b/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/fileData.kt index d1601c91..29ff0c62 100644 --- a/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/fileData.kt +++ b/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/fileData.kt @@ -82,7 +82,7 @@ public suspend fun DataSetBuilder.file( val data = readDataFile(path, formatResolver) val name = data.meta[Envelope.ENVELOPE_NAME_KEY].string ?: path.fileName.toString().replace(".df", "") - set(name, data) + emit(name, data) } } else { //otherwise, read as directory @@ -90,7 +90,7 @@ public suspend fun DataSetBuilder.file( val data = readDataDirectory(path, formatResolver) val name = data.getMeta()[Envelope.ENVELOPE_NAME_KEY].string ?: path.fileName.toString().replace(".df", "") - set(name, data) + emit(name, data) } } } @@ -99,7 +99,7 @@ public suspend fun DataSetBuilder.file( * Read the directory as a data node. 
If [path] is a zip archive, read it as directory */ @DFExperimental -public fun IOPlugin.readDataDirectory( +public suspend fun IOPlugin.readDataDirectory( path: Path, formatResolver: FileFormatResolver, ): DataTree { @@ -110,7 +110,7 @@ public suspend fun IOPlugin.readDataDirectory( return readDataDirectory(fs.rootDirectories.first(), formatResolver) } if (!Files.isDirectory(path)) error("Provided path $path is not a directory") - return DataTree.static(formatResolver.kClass) { + return DataTree(formatResolver.kClass) { Files.list(path).toList().forEach { path -> val fileName = path.fileName.toString() if (fileName.startsWith(IOPlugin.META_FILE_NAME)) { @@ -125,7 +125,7 @@ public fun IOPlugin.readDataDirectory( } @DFExperimental -public inline fun IOPlugin.readDataDirectory(path: Path): DataTree = +public suspend inline fun IOPlugin.readDataDirectory(path: Path): DataTree = readDataDirectory(path, formatResolver()) /** diff --git a/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/taskBuilders.kt b/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/taskBuilders.kt new file mode 100644 index 00000000..fce1a372 --- /dev/null +++ b/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/taskBuilders.kt @@ -0,0 +1,24 @@ +package hep.dataforge.workspace + +import hep.dataforge.data.DataSet +import hep.dataforge.data.select +import hep.dataforge.meta.Meta +import hep.dataforge.names.Name + +public suspend inline fun TaskResultBuilder.from( + task: Name, + taskMeta: Meta = Meta.EMPTY, +): DataSet = workspace.produce(task, taskMeta).select() + + +@Suppress("UNCHECKED_CAST") +public suspend fun TaskResultBuilder<*>.from( + reference: TaskReference, + taskMeta: Meta = Meta.EMPTY, +): DataSet { + if (workspace.tasks[reference.taskName] == reference.task) { + return workspace.produce(reference.taskName, taskMeta) as TaskResult + } else { + error("Task ${reference.taskName} does not belong to the workspace") + } +} diff --git 
a/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/workspaceExtensions.kt b/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/workspaceExtensions.kt new file mode 100644 index 00000000..00f213f5 --- /dev/null +++ b/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/workspaceExtensions.kt @@ -0,0 +1,8 @@ +package hep.dataforge.workspace + +import hep.dataforge.data.DataSetBuilder +import kotlinx.coroutines.runBlocking + +public fun WorkspaceBuilder.data(builder: suspend DataSetBuilder.() -> Unit): Unit = runBlocking { + buildData(builder) +} \ No newline at end of file diff --git a/dataforge-workspace/src/jvmTest/kotlin/hep/dataforge/workspace/DataPropagationTest.kt b/dataforge-workspace/src/jvmTest/kotlin/hep/dataforge/workspace/DataPropagationTest.kt index ba8db797..41b08d2a 100644 --- a/dataforge-workspace/src/jvmTest/kotlin/hep/dataforge/workspace/DataPropagationTest.kt +++ b/dataforge-workspace/src/jvmTest/kotlin/hep/dataforge/workspace/DataPropagationTest.kt @@ -5,54 +5,28 @@ import hep.dataforge.context.PluginFactory import hep.dataforge.context.PluginTag import hep.dataforge.data.* import hep.dataforge.meta.Meta -import hep.dataforge.workspace.old.data -import kotlinx.coroutines.flow.firstOrNull -import kotlinx.coroutines.flow.map -import kotlinx.coroutines.flow.reduce +import hep.dataforge.names.toName +import kotlinx.coroutines.flow.single import kotlinx.coroutines.runBlocking import kotlin.reflect.KClass import kotlin.test.Test import kotlin.test.assertEquals -fun DataSet.first(): NamedData? 
= runBlocking { flow().firstOrNull() } - class DataPropagationTestPlugin : WorkspacePlugin() { override val tag: PluginTag = Companion.tag - val testAllData = task("allData", Int::class) { - model { - data() - } - transform { data -> - DataTree.active(context) { - val result = data.flow().map { it.value() }.reduce { acc, pair -> acc + pair } - data("result", result) - } + val allData by task { + val selectedData = workspace.data.select() + val result: Data = selectedData.flow().foldToData(0) { result, data -> + result + data.await() } + emit("result", result) } - val testSingleData = task("singleData", Int::class) { - model { - data(pattern = "myData\\[12\\]") - } - transform { data -> - DataTree.active(context) { - val result = data.flow().map { it.value() }.reduce { acc, pair -> acc + pair } - data("result", result) - } - } - } - - val testAllRegexData = task("allRegexData", Int::class) { - model { - data(pattern = "myData.*") - } - transform { data -> - DataTree.active(context) { - val result = data.flow().map { it.value() }.reduce { acc, pair -> acc + pair } - data("result", result) - } + val singleData by task { + workspace.data.select().getData("myData[12]".toName())?.let { + emit("result", it) } } @@ -72,9 +46,11 @@ class DataPropagationTest { context { plugin(DataPropagationTestPlugin) } - data { - repeat(100) { - data("myData[$it]", it) + runBlocking { + data { + repeat(100) { + data("myData[$it]", it) + } } } } @@ -82,24 +58,16 @@ class DataPropagationTest { @Test fun testAllData() { runBlocking { - val node = testWorkspace.execute("Test.allData") - assertEquals(4950, node.first()!!.value()) - } - } - - @Test - fun testAllRegexData() { - runBlocking { - val node = testWorkspace.execute("Test.allRegexData") - assertEquals(4950, node.first()!!.value()) + val node = testWorkspace.produce("Test.allData") + assertEquals(4950, node.flow().single().await()) } } @Test fun testSingleData() { runBlocking { - val node = testWorkspace.execute("Test.singleData") - 
assertEquals(12, node.first()!!.value()) + val node = testWorkspace.produce("Test.singleData") + assertEquals(12, node.flow().single().await()) } } } \ No newline at end of file diff --git a/dataforge-workspace/src/jvmTest/kotlin/hep/dataforge/workspace/FileDataTest.kt b/dataforge-workspace/src/jvmTest/kotlin/hep/dataforge/workspace/FileDataTest.kt index b5a7c445..ff22054d 100644 --- a/dataforge-workspace/src/jvmTest/kotlin/hep/dataforge/workspace/FileDataTest.kt +++ b/dataforge-workspace/src/jvmTest/kotlin/hep/dataforge/workspace/FileDataTest.kt @@ -20,15 +20,17 @@ import kotlin.test.assertEquals class FileDataTest { - val dataNode = DataTree.static { - set("dir") { - data("a", "Some string") { - "content" put "Some string" + val dataNode = runBlocking { + DataTree { + emit("dir") { + data("a", "Some string") { + "content" put "Some string" + } + } + data("b", "root data") + meta { + "content" put "This is root meta node" } - } - data("b", "root data") - meta { - "content" put "This is root meta node" } } @@ -50,10 +52,10 @@ class FileDataTest { } - object StringFormatResolver: FileFormatResolver{ + object StringFormatResolver : FileFormatResolver { override val type: KType = typeOf() - override fun invoke(path: Path, meta: Meta): IOFormat =StringIOFormat + override fun invoke(path: Path, meta: Meta): IOFormat = StringIOFormat } @@ -64,12 +66,10 @@ class FileDataTest { val dir = Files.createTempDirectory("df_data_node") runBlocking { writeDataDirectory(dir, dataNode, StringIOFormat) - } - println(dir.toUri().toString()) - val reconstructed = readDataDirectory(dir,StringFormatResolver) - runBlocking { + println(dir.toUri().toString()) + val reconstructed = readDataDirectory(dir, StringFormatResolver) assertEquals(dataNode.getData("dir.a")?.meta, reconstructed.getData("dir.a")?.meta) - assertEquals(dataNode.getData("b")?.value(), reconstructed.getData("b")?.value()) + assertEquals(dataNode.getData("b")?.await(), reconstructed.getData("b")?.await()) } } } @@ -82,12 
+82,10 @@ class FileDataTest { val zip = Files.createTempFile("df_data_node", ".zip") runBlocking { writeZip(zip, dataNode, StringIOFormat) - } - println(zip.toUri().toString()) - val reconstructed = readDataDirectory(zip, StringFormatResolver) - runBlocking { + println(zip.toUri().toString()) + val reconstructed = readDataDirectory(zip, StringFormatResolver) assertEquals(dataNode.getData("dir.a")?.meta, reconstructed.getData("dir.a")?.meta) - assertEquals(dataNode.getData("b")?.value(), reconstructed.getData("b")?.value()) + assertEquals(dataNode.getData("b")?.await(), reconstructed.getData("b")?.await()) } } } diff --git a/dataforge-workspace/src/jvmTest/kotlin/hep/dataforge/workspace/SimpleWorkspaceTest.kt b/dataforge-workspace/src/jvmTest/kotlin/hep/dataforge/workspace/SimpleWorkspaceTest.kt index 11562a31..2147294b 100644 --- a/dataforge-workspace/src/jvmTest/kotlin/hep/dataforge/workspace/SimpleWorkspaceTest.kt +++ b/dataforge-workspace/src/jvmTest/kotlin/hep/dataforge/workspace/SimpleWorkspaceTest.kt @@ -4,9 +4,8 @@ import hep.dataforge.context.* import hep.dataforge.data.* import hep.dataforge.meta.* import hep.dataforge.names.plus -import hep.dataforge.workspace.old.data -import hep.dataforge.workspace.old.dependsOn -import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.first +import kotlinx.coroutines.flow.single import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.Timeout import kotlin.reflect.KClass @@ -14,6 +13,7 @@ import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertTrue + /** * Make a fake-factory for a one single plugin. Useful for unique or test plugins */ @@ -24,8 +24,8 @@ public inline fun P.toFactory(): PluginFactory

= object override val type: KClass = P::class } -public fun Workspace.runBlocking(task: String, block: MetaBuilder.() -> Unit = {}): DataSet = runBlocking{ - execute(task, block) +public fun Workspace.runBlocking(task: String, block: MetaBuilder.() -> Unit = {}): DataSet = runBlocking { + produce(task, block) } @@ -33,12 +33,18 @@ class SimpleWorkspaceTest { val testPlugin = object : WorkspacePlugin() { override val tag: PluginTag = PluginTag("test") - val contextTask = task("test", Any::class) { - map { - context.logger.info { "Test: $it" } - } + val test by task { + populate( + workspace.data.map { + it.also { + logger.info { "Test: $it" } + } + } + ) } } + + val testPluginFactory = testPlugin.toFactory() val workspace = Workspace { @@ -53,98 +59,82 @@ class SimpleWorkspaceTest { } } - val filterTask = task("filterOne") { - model { - data("myData\\[12\\]") - } - map { - it + val filterOne by task { + workspace.data.selectOne("myData[12]")?.let { source -> + emit(source.name, source.map { it }) } } - val square = task("square") { - map { data -> - if (meta["testFlag"].boolean == true) { + val square by task { + workspace.data.select().forEach { data -> + if (data.meta["testFlag"].boolean == true) { println("flag") } - context.logger.info { "Starting square on $data" } - data * data + val value = data.await() + workspace.logger.info { "Starting square on $value" } + emit(data.name, data.map { it * it }) } } - val linear = task("linear") { - map { data -> - context.logger.info { "Starting linear on $data" } - data * 2 + 1 + val linear by task { + workspace.data.select().forEach { data -> + workspace.logger.info { "Starting linear on $data" } + emit(data.name, data.data.map { it * 2 + 1 }) } } - val fullSquare = task("fullsquare") { - model { - val squareDep = dependsOn(square, placement = DataPlacement.into("square")) - val linearDep = dependsOn(linear, placement = DataPlacement.into("linear")) - } - transform { data -> - val squareNode = 
data.branch("square").filterIsInstance() //squareDep() - val linearNode = data.branch("linear").filterIsInstance() //linearDep() - dataTree { - squareNode.flow().collect { - val newData: Data = Data { - val squareValue = squareNode.getData(it.name)!!.value() - val linearValue = linearNode.getData(it.name)!!.value() - squareValue + linearValue - } - set(name, newData) - } + val fullSquare by task { + val squareData = from(square) + val linearData = from(linear) + squareData.forEach { data -> + val newData: Data = data.combine(linearData.getData(data.name)!!) { l, r -> + l + r } + emit(data.name, newData) } } - task("sum") { - model { - dependsOn(square) - } - reduce { data -> - context.logger.info { "Starting sum" } - data.values.sum() + val sum by task { + workspace.logger.info { "Starting sum" } + val res = from(square).foldToData(0) { l, r -> + l + r.await() } + emit("sum", res) } - val average = task("average") { - reduceByGroup { env -> - group("even", filter = { name, _ -> name.toString().toInt() % 2 == 0 }) { - result { data -> - env.context.logger.info { "Starting even" } - data.values.average() - } - } - group("odd", filter = { name, _ -> name.toString().toInt() % 2 == 1 }) { - result { data -> - env.context.logger.info { "Starting odd" } - data.values.average() - } - } + val averageByGroup by task { + val evenSum = workspace.data.filter { name, _ -> + name.toString().toInt() % 2 == 0 + }.select().foldToData(0) { l, r -> + l + r.await() } + + emit("even", evenSum) + val oddSum = workspace.data.filter { name, _ -> + name.toString().toInt() % 2 == 1 + }.select().foldToData(0) { l, r -> + l + r.await() + } + emit("odd", oddSum) } - task("delta") { - model { - dependsOn(average) - } - reduce { data -> - data["even"]!! - data["odd"]!! + val delta by task { + val averaged = from(averageByGroup) + val even = averaged.getData("even")!! + val odd = averaged.getData("odd")!! 
+ val res = even.combine(odd) { l, r -> + l - r } + emit("res", res) } - val customPipeTask = task("custom") { - mapAction { - meta = meta.toMutableMeta().apply { + val customPipe by task { + workspace.data.select().forEach { data -> + val meta = data.meta.toMutableMeta().apply { "newValue" put 22 } - name += "new" - result { - meta["value"].int ?: 0 + it - } + emit(data.name + "new", data.map { (data.meta["value"].int ?: 0) + it }) + } } @@ -154,21 +144,25 @@ class SimpleWorkspaceTest { @Test @Timeout(1) fun testWorkspace() { - val node = workspace.runBlocking("sum") - val res = node.first() - assertEquals(328350, res?.value()) + runBlocking { + val node = workspace.runBlocking("sum") + val res = node.flow().single() + assertEquals(328350, res.await()) + } } @Test @Timeout(1) fun testMetaPropagation() { - val node = workspace.runBlocking("sum") { "testFlag" put true } - val res = node.first()?.value() + runBlocking { + val node = workspace.produce("sum") { "testFlag" put true } + val res = node.flow().single().await() + } } @Test fun testPluginTask() { - val tasks = workspace.stages + val tasks = workspace.tasks assertTrue { tasks["test.test"] != null } //val node = workspace.run("test.test", "empty") } @@ -176,16 +170,16 @@ class SimpleWorkspaceTest { @Test fun testFullSquare() { runBlocking { - val node = workspace.execute("fullsquare") + val node = workspace.produce("fullSquare") println(node.toMeta()) } } @Test fun testFilter() { - val node = workspace.runBlocking("filterOne") runBlocking { - assertEquals(12, node.first()?.value()) + val node = workspace.produce("filterOne") + assertEquals(12, node.flow().first().await()) } } } \ No newline at end of file