diff --git a/build.gradle.kts b/build.gradle.kts index ecaee7a6..758279ab 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -3,7 +3,7 @@ plugins { id("scientifik.publish") version "0.1.4" apply false } -val dataforgeVersion by extra("0.1.3-dev-10") +val dataforgeVersion by extra("0.1.3-dev-11") val bintrayRepo by extra("dataforge") val githubProject by extra("dataforge-core") diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Action.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Action.kt index 228522dc..cf030c75 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Action.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Action.kt @@ -1,7 +1,6 @@ package hep.dataforge.data import hep.dataforge.meta.Meta -import hep.dataforge.names.Name /** * A simple data transformation on a data node @@ -34,19 +33,3 @@ infix fun Action.then(action: Action): A } } - -///** -// * An action that performs the same transformation on each of input data nodes. Null results are ignored. -// * The transformation is non-suspending because it is lazy. -// */ -//class PipeAction(val transform: (Name, Data, Meta) -> Data?) : Action { -// override fun invoke(node: DataNode, meta: Meta): DataNode = DataNode.build { -// node.data().forEach { (name, data) -> -// val res = transform(name, data, meta) -// if (res != null) { -// set(name, res) -// } -// } -// } -//} - diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/JoinAction.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/JoinAction.kt index 0a78fa7e..636d4a87 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/JoinAction.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/JoinAction.kt @@ -6,9 +6,8 @@ import hep.dataforge.meta.MetaBuilder import hep.dataforge.meta.builder import hep.dataforge.names.Name import hep.dataforge.names.toName +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Deferred -import kotlin.coroutines.CoroutineContext -import kotlin.coroutines.EmptyCoroutineContext import kotlin.reflect.KClass @@ -79,7 +78,7 @@ class JoinGroupBuilder(val actionMeta: Meta) { class JoinAction( val inputType: KClass, val outputType: KClass, - val context: CoroutineContext = EmptyCoroutineContext, + val scope: CoroutineScope, private val action: JoinGroupBuilder.() -> Unit ) : Action { @@ -96,7 +95,7 @@ class JoinAction( val env = ActionEnv(groupName.toName(), laminate.builder()) - val goal = goalMap.join(context) { group.result.invoke(env, it) } + val goal = goalMap.join(scope) { group.result.invoke(env, it) } val res = Data.of(outputType, goal, env.meta) diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/PipeAction.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/PipeAction.kt index 17f22911..7e2099bb 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/PipeAction.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/PipeAction.kt @@ -2,8 +2,7 @@ package hep.dataforge.data import hep.dataforge.meta.* import hep.dataforge.names.Name -import kotlin.coroutines.CoroutineContext -import kotlin.coroutines.EmptyCoroutineContext +import kotlinx.coroutines.CoroutineScope import kotlin.reflect.KClass class ActionEnv(val name: Name, val meta: Meta) @@ -27,7 +26,7 @@ class PipeBuilder(var name: Name, var meta: MetaBuilder) { class PipeAction( val inputType: KClass, val outputType: KClass, - val context: CoroutineContext = EmptyCoroutineContext, + val scope: CoroutineScope, private val block: PipeBuilder.() -> Unit ) : Action { @@ -47,7 +46,7 @@ class PipeAction( //getting new meta val newMeta = builder.meta.seal() //creating a goal with custom context if provided - val goal = data.task.pipe(context) { builder.result(env, it) } + val goal = data.task.pipe(scope) { builder.result(env, it) } //setting the data node this[newName] = Data.of(outputType, goal, newMeta) } @@ -57,9 +56,9 @@ class PipeAction( inline fun DataNode.pipe( meta: Meta, - context: CoroutineContext = EmptyCoroutineContext, + scope: CoroutineScope, noinline action: PipeBuilder.() -> Unit -): DataNode = PipeAction(T::class, R::class, context, action).invoke(this, meta) +): DataNode = PipeAction(T::class, R::class, scope, action).invoke(this, meta) diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/SplitAction.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/SplitAction.kt index 5606bae5..d2545973 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/SplitAction.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/SplitAction.kt @@ -6,9 +6,8 @@ import hep.dataforge.meta.MetaBuilder import hep.dataforge.meta.builder import hep.dataforge.names.Name import hep.dataforge.names.toName +import kotlinx.coroutines.CoroutineScope import kotlin.collections.set -import kotlin.coroutines.CoroutineContext -import kotlin.coroutines.EmptyCoroutineContext import kotlin.reflect.KClass @@ -37,7 +36,7 @@ class SplitBuilder(val name: Name, val meta: Meta) { class SplitAction( val inputType: KClass, val outputType: KClass, - val context: CoroutineContext = EmptyCoroutineContext, + val scope: CoroutineScope, private val action: SplitBuilder.() -> Unit ) : Action { @@ -58,7 +57,7 @@ class SplitAction( rule(env) - val goal = data.task.pipe(context) { env.result(it) } + val goal = data.task.pipe(scope) { env.result(it) } val res = Data.of(outputType, goal, env.meta) set(env.name, res) diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/goals.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/goals.kt index f4f200f0..82693c96 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/goals.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/goals.kt @@ -10,11 +10,12 @@ import kotlin.coroutines.EmptyCoroutineContext * **Important:** Unlike regular deferred, the [Deferred] is started lazily, so the actual calculation is called only when result is requested. */ fun goal( - context: CoroutineContext = EmptyCoroutineContext, + scope: CoroutineScope, + coroutineContext: CoroutineContext = EmptyCoroutineContext, dependencies: Collection = emptyList(), block: suspend CoroutineScope.() -> T -): Deferred = CoroutineScope(context).async( - CoroutineMonitor() + Dependencies(dependencies), +): Deferred = scope.async( + coroutineContext + CoroutineMonitor() + Dependencies(dependencies), start = CoroutineStart.LAZY ) { dependencies.forEach { job -> @@ -30,9 +31,10 @@ fun goal( * Create a one-to-one goal based on existing goal */ fun Deferred.pipe( - context: CoroutineContext = EmptyCoroutineContext, + scope: CoroutineScope, + coroutineContext: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.(T) -> R -): Deferred = goal(this + context,listOf(this)) { +): Deferred = goal(scope, coroutineContext, listOf(this)) { block(await()) } @@ -41,9 +43,10 @@ fun Deferred.pipe( * @param scope the scope for resulting goal. By default use first goal in list */ fun Collection>.join( - context: CoroutineContext = EmptyCoroutineContext, + scope: CoroutineScope, + coroutineContext: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.(Collection) -> R -): Deferred = goal(context, this) { +): Deferred = goal(scope, coroutineContext, this) { block(map { it.await() }) } @@ -54,9 +57,10 @@ fun Collection>.join( * @param R type of the result goal */ fun Map>.join( - context: CoroutineContext = EmptyCoroutineContext, + scope: CoroutineScope, + coroutineContext: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.(Map) -> R -): Deferred = goal(context, this.values) { +): Deferred = goal(scope, coroutineContext, this.values) { block(mapValues { it.value.await() }) } diff --git a/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/TaskBuilder.kt b/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/TaskBuilder.kt index 175c4d87..0b439032 100644 --- a/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/TaskBuilder.kt +++ b/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/TaskBuilder.kt @@ -85,7 +85,8 @@ class TaskBuilder(val name: String) { val context = this PipeAction( inputType = T::class, - outputType = R::class + outputType = R::class, + scope = context ) { block(context) } } } @@ -102,7 +103,8 @@ class TaskBuilder(val name: String) { val context = this PipeAction( inputType = T::class, - outputType = R::class + outputType = R::class, + scope = context ) { //TODO automatically append task meta result = { data -> @@ -123,7 +125,8 @@ class TaskBuilder(val name: String) { action(from, to) { JoinAction( inputType = T::class, - outputType = R::class + outputType = R::class, + scope = this ) { block(this@action) } } } @@ -141,6 +144,7 @@ class TaskBuilder(val name: String) { JoinAction( inputType = T::class, outputType = R::class, + scope = context, action = { result( actionMeta[TaskModel.MODEL_TARGET_KEY]?.string ?: "@anonymous" @@ -163,7 +167,8 @@ class TaskBuilder(val name: String) { action(from, to) { SplitAction( inputType = T::class, - outputType = R::class + outputType = R::class, + scope = this ) { block(this@action) } } } diff --git a/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/fileData.kt b/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/fileData.kt index 62fc97bd..9d2efc95 100644 --- a/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/fileData.kt +++ b/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/fileData.kt @@ -2,7 +2,6 @@ package hep.dataforge.workspace import hep.dataforge.context.Context import hep.dataforge.data.Data -import hep.dataforge.data.goal import hep.dataforge.descriptors.NodeDescriptor import hep.dataforge.io.IOFormat import hep.dataforge.io.JsonMetaFormat @@ -10,6 +9,7 @@ import hep.dataforge.io.MetaFormat import hep.dataforge.meta.EmptyMeta import hep.dataforge.meta.Meta import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.async import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.withContext import kotlinx.io.nio.asInput @@ -58,7 +58,7 @@ suspend fun Context.readData( } else { null } - val goal = goal { + val goal = async { withContext(Dispatchers.IO) { format.run { Files.newByteChannel(path, StandardOpenOption.READ)