diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Data.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Data.kt index 718fb46f..ea25e070 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Data.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Data.kt @@ -91,7 +91,7 @@ fun Data.map( meta: Meta = this.meta, block: suspend CoroutineScope.(T) -> R ): Data = DynamicData(outputType, meta, coroutineContext, listOf(this)) { - block(await(this)) + block(await()) } @@ -103,7 +103,7 @@ inline fun Data.map( meta: Meta = this.meta, noinline block: suspend CoroutineScope.(T) -> R ): Data = DynamicData(R::class, meta, coroutineContext, listOf(this)) { - block(await(this)) + block(await()) } /** @@ -119,7 +119,7 @@ inline fun Collection>.reduce( coroutineContext, this ) { - block(map { run { it.await(this) } }) + block(map { run { it.await() } }) } fun Map>.reduce( @@ -133,7 +133,7 @@ fun Map>.reduce( coroutineContext, this.values ) { - block(mapValues { it.value.await(this) }) + block(mapValues { it.value.await() }) } @@ -153,7 +153,7 @@ inline fun Map>.reduce( coroutineContext, this.values ) { - block(mapValues { it.value.await(this) }) + block(mapValues { it.value.await() }) } diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataNode.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataNode.kt index a673f0b7..764aea62 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataNode.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataNode.kt @@ -4,6 +4,7 @@ import hep.dataforge.meta.* import hep.dataforge.names.* import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job +import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch import kotlin.collections.component1 import kotlin.collections.component2 @@ -55,6 +56,19 @@ interface DataNode : MetaRepr { } } + /** + * Start computation for all goals in data node and return a job for the whole node + */ + @Suppress("DeferredResultUnused") + fun CoroutineScope.startAll(): Job = launch { + items.values.forEach { + when (it) { + is DataItem.Node<*> -> it.node.run { startAll() } + is DataItem.Leaf<*> -> it.data.run { startAsync() } + } + } + } + companion object { const val TYPE = "dataNode" @@ -68,21 +82,11 @@ interface DataNode : MetaRepr { } } +suspend fun DataNode.join(): Unit = coroutineScope { startAll().join() } + val DataItem?.node: DataNode? get() = (this as? DataItem.Node)?.node val DataItem?.data: Data? get() = (this as? DataItem.Leaf)?.data -/** - * Start computation for all goals in data node and return a job for the whole node - */ -fun DataNode<*>.launchAll(scope: CoroutineScope): Job = scope.launch { - items.values.forEach { - when (it) { - is DataItem.Node<*> -> it.node.launchAll(scope) - is DataItem.Leaf<*> -> it.data.start(scope) - } - } -} - operator fun DataNode.get(name: Name): DataItem? = when (name.length) { 0 -> error("Empty name") 1 -> items[name.first()] diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Goal.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Goal.kt index 8275d31e..8c0eeec7 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Goal.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Goal.kt @@ -15,9 +15,7 @@ interface Goal { * Get ongoing computation or start a new one. * Does not guarantee thread safety. In case of multi-thread access, could create orphan computations. */ - fun startAsync(scope: CoroutineScope): Deferred - - suspend fun CoroutineScope.await(): T = startAsync(this).await() + fun CoroutineScope.startAsync(): Deferred /** * Reset the computation @@ -29,17 +27,15 @@ interface Goal { } } -fun Goal<*>.start(scope: CoroutineScope): Job = startAsync(scope) +suspend fun Goal.await(): T = coroutineScope { startAsync().await() } val Goal<*>.isComplete get() = result?.isCompleted ?: false -suspend fun Goal.await(scope: CoroutineScope): T = scope.await() - open class StaticGoal(val value: T) : Goal { override val dependencies: Collection> get() = emptyList() override val result: Deferred = CompletableDeferred(value) - override fun startAsync(scope: CoroutineScope): Deferred = result + override fun CoroutineScope.startAsync(): Deferred = result override fun reset() { //doNothing @@ -59,18 +55,19 @@ open class DynamicGoal( * Get ongoing computation or start a new one. * Does not guarantee thread safety. In case of multi-thread access, could create orphan computations. */ - override fun startAsync(scope: CoroutineScope): Deferred { - val startedDependencies = this.dependencies.map { goal -> - goal.startAsync(scope) + override fun CoroutineScope.startAsync(): Deferred { + val startedDependencies = this@DynamicGoal.dependencies.map { goal -> + goal.run { startAsync() } } - return result ?: scope.async(coroutineContext + CoroutineMonitor() + Dependencies(startedDependencies)) { - startedDependencies.forEach { deferred -> - deferred.invokeOnCompletion { error -> - if (error != null) cancel(CancellationException("Dependency $deferred failed with error: ${error.message}")) + return result + ?: async(this@DynamicGoal.coroutineContext + CoroutineMonitor() + Dependencies(startedDependencies)) { + startedDependencies.forEach { deferred -> + deferred.invokeOnCompletion { error -> + if (error != null) cancel(CancellationException("Dependency $deferred failed with error: ${error.message}")) + } } - } - block() - }.also { result = it } + block() + }.also { result = it } } /** @@ -89,7 +86,7 @@ fun Goal.map( coroutineContext: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.(T) -> R ): Goal = DynamicGoal(coroutineContext, listOf(this)) { - block(await(this)) + block(await()) } /** @@ -99,7 +96,7 @@ fun Collection>.reduce( coroutineContext: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.(Collection) -> R ): Goal = DynamicGoal(coroutineContext, this) { - block(map { run { it.await(this) } }) + block(map { run { it.await() } }) } /** @@ -112,6 +109,6 @@ fun Map>.reduce( coroutineContext: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.(Map) -> R ): Goal = DynamicGoal(coroutineContext, this.values) { - block(mapValues { it.value.await(this) }) + block(mapValues { it.value.await() }) } diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/dataCast.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/dataCast.kt index 2bf8adde..21301dd4 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/dataCast.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/dataCast.kt @@ -41,7 +41,7 @@ fun Data<*>.cast(type: KClass): Data { override val meta: Meta get() = this@cast.meta override val dependencies: Collection> get() = this@cast.dependencies override val result: Deferred? get() = this@cast.result as Deferred - override fun startAsync(scope: CoroutineScope): Deferred = this@cast.startAsync(scope) as Deferred + override fun CoroutineScope.startAsync(): Deferred = this@cast.run { startAsync() as Deferred } override fun reset() = this@cast.reset() override val type: KClass = type } diff --git a/dataforge-io/dataforge-io-yaml/src/test/kotlin/hep/dataforge/io/yaml/YamlMetaFormatTest.kt b/dataforge-io/dataforge-io-yaml/src/test/kotlin/hep/dataforge/io/yaml/YamlMetaFormatTest.kt index 2be83509..b330e080 100644 --- a/dataforge-io/dataforge-io-yaml/src/test/kotlin/hep/dataforge/io/yaml/YamlMetaFormatTest.kt +++ b/dataforge-io/dataforge-io-yaml/src/test/kotlin/hep/dataforge/io/yaml/YamlMetaFormatTest.kt @@ -10,9 +10,9 @@ import kotlin.test.Test import kotlin.test.assertEquals -class YamlMetaFormatTest{ +class YamlMetaFormatTest { @Test - fun testYamlMetaFormat(){ + fun testYamlMetaFormat() { val meta = buildMeta { "a" put 22 "node" put { diff --git a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/envelopeData.kt b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/envelopeData.kt index 1a713e37..d378726f 100644 --- a/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/envelopeData.kt +++ b/dataforge-workspace/src/commonMain/kotlin/hep/dataforge/workspace/envelopeData.kt @@ -6,7 +6,6 @@ import hep.dataforge.io.Envelope import hep.dataforge.io.IOFormat import hep.dataforge.io.SimpleEnvelope import hep.dataforge.io.readWith -import kotlinx.coroutines.coroutineScope import kotlinx.io.ArrayBinary import kotlin.reflect.KClass @@ -18,9 +17,7 @@ fun Envelope.toData(type: KClass, format: IOFormat): Data } suspend fun Data.toEnvelope(format: IOFormat): Envelope { - val obj = coroutineScope { - await(this) - } + val obj = await() val binary = ArrayBinary.write { format.run { writeObject(obj) } }