From 4117a05df40e743cca2bb7a2c827646c802d4438 Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Mon, 3 Oct 2022 20:36:28 +0300 Subject: [PATCH] Add in-memory caching for workspaces --- build.gradle.kts | 21 +++++-- dataforge-context/build.gradle.kts | 2 +- dataforge-data/build.gradle.kts | 2 +- .../space/kscience/dataforge/data/DataSet.kt | 5 ++ .../kscience/dataforge/data/dataTransform.kt | 3 +- dataforge-io/build.gradle.kts | 2 +- dataforge-meta/build.gradle.kts | 2 +- .../dataforge/meta/ObservableMetaWrapper.kt | 4 +- .../kscience/dataforge/meta/SealedMeta.kt | 10 +++- dataforge-workspace/build.gradle.kts | 7 ++- .../kscience/dataforge/workspace/Task.kt | 5 ++ .../kscience/dataforge/workspace/TaskData.kt | 4 +- .../kscience/dataforge/workspace/Workspace.kt | 5 +- .../{SimpleWorkspace.kt => WorkspaceBase.kt} | 8 ++- .../dataforge/workspace/WorkspaceBuilder.kt | 12 +++- .../dataforge/workspace/WorkspaceCache.kt | 38 +++++++++++++ .../dataforge/workspace/WorkspacePlugin.kt | 0 .../dataforge/workspace/taskBuilders.kt | 28 +++++++++- .../dataforge/workspace/workspaceJvm.kt | 3 + .../workspace/CachingWorkspaceTest.kt | 46 +++++++++++++++ .../workspace/SimpleWorkspaceTest.kt | 56 +++++++++---------- gradle.properties | 2 +- gradle/wrapper/gradle-wrapper.properties | 2 +- 23 files changed, 210 insertions(+), 57 deletions(-) rename dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/{SimpleWorkspace.kt => WorkspaceBase.kt} (60%) create mode 100644 dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/WorkspaceCache.kt rename dataforge-workspace/src/{jvmMain => commonMain}/kotlin/space/kscience/dataforge/workspace/WorkspacePlugin.kt (100%) create mode 100644 dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/CachingWorkspaceTest.kt diff --git a/build.gradle.kts b/build.gradle.kts index 6ec0833c..7ffa0076 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -1,4 +1,7 @@ import org.jetbrains.kotlin.gradle.tasks.KotlinCompile +import space.kscience.gradle.isInDevelopment +import space.kscience.gradle.useApache2Licence +import space.kscience.gradle.useSPCTeam plugins { id("space.kscience.gradle.project") @@ -12,8 +15,8 @@ allprojects { subprojects { apply(plugin = "maven-publish") - tasks.withType{ - kotlinOptions{ + tasks.withType { + kotlinOptions { freeCompilerArgs = freeCompilerArgs + "-Xcontext-receivers" } } @@ -24,8 +27,18 @@ readme { } ksciencePublish { - github("dataforge-core") - space("https://maven.pkg.jetbrains.space/mipt-npm/p/sci/maven") + pom("https://github.com/SciProgCentre/kmath") { + useApache2Licence() + useSPCTeam() + } + github("dataforge-core", "SciProgCentre") + space( + if (isInDevelopment) { + "https://maven.pkg.jetbrains.space/mipt-npm/p/sci/dev" + } else { + "https://maven.pkg.jetbrains.space/mipt-npm/p/sci/release" + } + ) sonatype() } diff --git a/dataforge-context/build.gradle.kts b/dataforge-context/build.gradle.kts index 056d5590..b0e8f402 100644 --- a/dataforge-context/build.gradle.kts +++ b/dataforge-context/build.gradle.kts @@ -1,11 +1,11 @@ plugins { id("space.kscience.gradle.mpp") - id("space.kscience.gradle.native") } description = "Context and provider definitions" kscience { + native() useCoroutines() } diff --git a/dataforge-data/build.gradle.kts b/dataforge-data/build.gradle.kts index 9af661f9..b3b6583b 100644 --- a/dataforge-data/build.gradle.kts +++ b/dataforge-data/build.gradle.kts @@ -1,9 +1,9 @@ plugins { id("space.kscience.gradle.mpp") - id("space.kscience.gradle.native") } kscience{ + native() useCoroutines() } diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSet.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSet.kt index cf327c9b..72e4eb38 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSet.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSet.kt @@ -56,6 +56,11 @@ public fun DataSet.asSequence(): Sequence> = object : override fun iterator(): Iterator> = this@asSequence.iterator() } +/** + * Return a single [Data] in this [DataSet]. Throw error if it is not single. + */ +public fun DataSet.single(): NamedData = asSequence().single() + public fun DataSet.asIterable(): Iterable> = object : Iterable> { override fun iterator(): Iterator> = this@asIterable.iterator() } diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataTransform.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataTransform.kt index 6d683f12..76577346 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataTransform.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataTransform.kt @@ -1,6 +1,5 @@ package space.kscience.dataforge.data -import kotlinx.coroutines.flow.map import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.MutableMeta import space.kscience.dataforge.meta.seal @@ -37,7 +36,7 @@ public inline fun Data.map( } /** - * Combine this data with the other data using [block]. See [map] for other details + * Combine this data with the other data using [block]. See [Data::map] for other details */ public inline fun Data.combine( other: Data, diff --git a/dataforge-io/build.gradle.kts b/dataforge-io/build.gradle.kts index 3d6ff062..1ee69734 100644 --- a/dataforge-io/build.gradle.kts +++ b/dataforge-io/build.gradle.kts @@ -2,12 +2,12 @@ import space.kscience.gradle.KScienceVersions plugins { id("space.kscience.gradle.mpp") - id("space.kscience.gradle.native") } description = "IO module" kscience { + native() useSerialization(sourceSet = space.kscience.gradle.DependencySourceSet.TEST) { cbor() } diff --git a/dataforge-meta/build.gradle.kts b/dataforge-meta/build.gradle.kts index 4252de77..95c062ec 100644 --- a/dataforge-meta/build.gradle.kts +++ b/dataforge-meta/build.gradle.kts @@ -1,9 +1,9 @@ plugins { id("space.kscience.gradle.mpp") - id("space.kscience.gradle.native") } kscience { + native() useSerialization{ json() } diff --git a/dataforge-meta/src/commonMain/kotlin/space/kscience/dataforge/meta/ObservableMetaWrapper.kt b/dataforge-meta/src/commonMain/kotlin/space/kscience/dataforge/meta/ObservableMetaWrapper.kt index c5b2a082..0e5f3ae4 100644 --- a/dataforge-meta/src/commonMain/kotlin/space/kscience/dataforge/meta/ObservableMetaWrapper.kt +++ b/dataforge-meta/src/commonMain/kotlin/space/kscience/dataforge/meta/ObservableMetaWrapper.kt @@ -13,8 +13,8 @@ private class ObservableMetaWrapper( val listeners: MutableSet ) : ObservableMutableMeta { override val items: Map - get() = root.items.mapValues { - ObservableMetaWrapper(root, absoluteName + it.key, listeners) + get() = root.items.keys.associateWith { + ObservableMetaWrapper(root, absoluteName + it, listeners) } override fun getMeta(name: Name): ObservableMutableMeta? = diff --git a/dataforge-meta/src/commonMain/kotlin/space/kscience/dataforge/meta/SealedMeta.kt b/dataforge-meta/src/commonMain/kotlin/space/kscience/dataforge/meta/SealedMeta.kt index 31608af6..6c8ab3e9 100644 --- a/dataforge-meta/src/commonMain/kotlin/space/kscience/dataforge/meta/SealedMeta.kt +++ b/dataforge-meta/src/commonMain/kotlin/space/kscience/dataforge/meta/SealedMeta.kt @@ -14,7 +14,15 @@ public class SealedMeta internal constructor( ) : TypedMeta { override fun toString(): String = Meta.toString(this) override fun equals(other: Any?): Boolean = Meta.equals(this, other as? Meta) - override fun hashCode(): Int = Meta.hashCode(this) + + /** + * Compute hash code once to optimize later access + */ + private val cachedHashCode by lazy { + Meta.hashCode(this) + } + + override fun hashCode(): Int = cachedHashCode } /** diff --git a/dataforge-workspace/build.gradle.kts b/dataforge-workspace/build.gradle.kts index 1ab9f957..7784bc35 100644 --- a/dataforge-workspace/build.gradle.kts +++ b/dataforge-workspace/build.gradle.kts @@ -1,9 +1,9 @@ plugins { id("space.kscience.gradle.mpp") - id("space.kscience.gradle.native") } kscience{ + native() useCoroutines() } @@ -16,6 +16,11 @@ kotlin { api(project(":dataforge-io")) } } + jvmTest{ + dependencies{ + implementation("ch.qos.logback:logback-classic:1.4.1") + } + } } } diff --git a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/Task.kt b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/Task.kt index 75c411ce..cc83c988 100644 --- a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/Task.kt +++ b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/Task.kt @@ -22,6 +22,11 @@ import kotlin.reflect.typeOf @Type(TYPE) public interface Task : Described { + /** + * A task identification string used to compare tasks and check task body for change + */ + public val fingerprint: String get() = hashCode().toString() + /** * Compute a [TaskResult] using given meta. In general, the result is lazy and represents both computation model * and a handler for actual result diff --git a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/TaskData.kt b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/TaskData.kt index bd2bdb48..4a3bb620 100644 --- a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/TaskData.kt +++ b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/TaskData.kt @@ -17,7 +17,7 @@ public interface TaskData : NamedData { /** * The name of the stage that produced this data. [Name.EMPTY] if the workspace intrinsic data is used. */ - public val task: Name + public val taskName: Name /** * Stage configuration used to produce this data. @@ -34,7 +34,7 @@ private class TaskDataImpl( override val workspace: Workspace, override val data: Data, override val name: Name, - override val task: Name, + override val taskName: Name, override val taskMeta: Meta, ) : TaskData, Data by data { // override val dependencies: Collection> = data.dependencies.map { diff --git a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/Workspace.kt b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/Workspace.kt index 0053850d..b53f20f1 100644 --- a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/Workspace.kt +++ b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/Workspace.kt @@ -15,6 +15,9 @@ public interface DataSelector{ public suspend fun select(workspace: Workspace, meta: Meta): DataSet } +/** + * An environment for pull-mode computation + */ @Type(Workspace.TYPE) public interface Workspace : ContextAware, Provider { /** @@ -28,7 +31,7 @@ public interface Workspace : ContextAware, Provider { public val targets: Map /** - * All stages associated with the workspace + * All tasks associated with the workspace */ public val tasks: Map> diff --git a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/SimpleWorkspace.kt b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/WorkspaceBase.kt similarity index 60% rename from dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/SimpleWorkspace.kt rename to dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/WorkspaceBase.kt index e800d8b3..e9422703 100644 --- a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/SimpleWorkspace.kt +++ b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/WorkspaceBase.kt @@ -10,16 +10,18 @@ import space.kscience.dataforge.names.Name /** * A simple workspace without caching */ -public class SimpleWorkspace( +public class WorkspaceBase internal constructor( override val context: Context, data: DataSet<*>, override val targets: Map, private val externalTasks: Map>, + private val postProcess: suspend (TaskResult<*>) -> TaskResult<*>, ) : Workspace { override val data: TaskResult<*> = wrapResult(data, Name.EMPTY, Meta.EMPTY) - override val tasks: Map> - get() = context.gather>(Task.TYPE) + externalTasks + override val tasks: Map> by lazy { context.gather>(Task.TYPE) + externalTasks } + override suspend fun produce(taskName: Name, taskMeta: Meta): TaskResult<*> = + postProcess(super.produce(taskName, taskMeta)) } \ No newline at end of file diff --git a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/WorkspaceBuilder.kt b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/WorkspaceBuilder.kt index 4a847e64..7784bfcc 100644 --- a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/WorkspaceBuilder.kt +++ b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/WorkspaceBuilder.kt @@ -88,6 +88,7 @@ public class WorkspaceBuilder(private val parentContext: Context = Global) : Tas private var data: DataSet<*>? = null private val targets: HashMap = HashMap() private val tasks = HashMap>() + private var cache: WorkspaceCache? = null /** * Define a context for the workspace @@ -123,7 +124,16 @@ public class WorkspaceBuilder(private val parentContext: Context = Global) : Tas tasks[taskName] = task } - public fun build(): Workspace = SimpleWorkspace(context ?: parentContext, data ?: DataSet.EMPTY, targets, tasks) + public fun useCache() { + cache = InMemoryWorkspaceCache() + } + + public fun build(): Workspace { + val postProcess: suspend (TaskResult<*>) -> TaskResult<*> = { result -> + cache?.evaluate(result) ?: result + } + return WorkspaceBase(context ?: parentContext, data ?: DataSet.EMPTY, targets, tasks, postProcess) + } } /** diff --git a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/WorkspaceCache.kt b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/WorkspaceCache.kt new file mode 100644 index 00000000..0f37d4e9 --- /dev/null +++ b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/WorkspaceCache.kt @@ -0,0 +1,38 @@ +package space.kscience.dataforge.workspace + +import space.kscience.dataforge.meta.Meta +import space.kscience.dataforge.names.Name +import kotlin.reflect.KType + +public interface WorkspaceCache { + public suspend fun evaluate(result: TaskResult): TaskResult +} + +private typealias TaskResultId = Pair + +public class InMemoryWorkspaceCache : WorkspaceCache { + + // never do that at home! + private val cache = HashMap>>() + + //TODO do actual check + private fun TaskData<*>.checkType(taskType: KType): TaskData = this as TaskData + + override suspend fun evaluate(result: TaskResult): TaskResult { + for (d: TaskData in result) { + cache.getOrPut(result.taskName to result.taskMeta) { HashMap() }.getOrPut(d.name) { d } + } + + return object : TaskResult by result { + override fun iterator(): Iterator> = (cache[result.taskName to result.taskMeta] + ?.values?.map { it.checkType(result.dataType) } + ?: emptyList()).iterator() + + override fun get(name: Name): TaskData? { + val cached: TaskData<*> = cache[result.taskName to result.taskMeta]?.get(name) ?: return null + //TODO check types + return cached.checkType(result.dataType) + } + } + } +} \ No newline at end of file diff --git a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/WorkspacePlugin.kt b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/WorkspacePlugin.kt similarity index 100% rename from dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/WorkspacePlugin.kt rename to dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/WorkspacePlugin.kt diff --git a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/taskBuilders.kt b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/taskBuilders.kt index 9fd31d1c..baf17ee1 100644 --- a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/taskBuilders.kt +++ b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/taskBuilders.kt @@ -1,19 +1,38 @@ package space.kscience.dataforge.workspace +import space.kscience.dataforge.context.PluginFactory import space.kscience.dataforge.data.DataSet import space.kscience.dataforge.data.forEach import space.kscience.dataforge.data.map import space.kscience.dataforge.meta.Meta +import space.kscience.dataforge.meta.MutableMeta import space.kscience.dataforge.meta.toMutableMeta import space.kscience.dataforge.misc.DFExperimental import space.kscience.dataforge.names.Name +import space.kscience.dataforge.names.plus /** * Select data using given [selector] */ public suspend fun TaskResultBuilder<*>.from( selector: DataSelector, -): DataSet = selector.select(workspace, taskMeta) + meta: Meta = taskMeta +): DataSet = selector.select(workspace, meta) + + +/** + * Select data from a [WorkspacePlugin] attached to this [Workspace] context. + */ +public suspend inline fun TaskResultBuilder<*>.from( + pluginFactory: PluginFactory

, + meta: Meta = taskMeta, + selectorBuilder: P.() -> TaskReference, +): DataSet { + val plugin = workspace.context.plugins[pluginFactory] + ?: error("Plugin ${pluginFactory.tag} not loaded into workspace context") + val taskReference: TaskReference = plugin.selectorBuilder() + return workspace.produce(plugin.name + taskReference.taskName, meta) as TaskResult +} public val TaskResultBuilder<*>.allData: DataSelector<*> get() = object : DataSelector { @@ -27,11 +46,14 @@ public val TaskResultBuilder<*>.allData: DataSelector<*> @DFExperimental public suspend inline fun TaskResultBuilder.pipeFrom( selector: DataSelector, - crossinline action: suspend (arg: T, name: Name, meta: Meta) -> R + selectorMeta: Meta = taskMeta, + dataMetaTransform: MutableMeta.() -> Unit = {}, + crossinline action: suspend (arg: T, name: Name, meta: Meta) -> R, ) { - from(selector).forEach { data -> + from(selector, selectorMeta).forEach { data -> val meta = data.meta.toMutableMeta().apply { taskName put taskMeta + dataMetaTransform() } val res = data.map(workspace.context.coroutineContext, meta) { diff --git a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/workspaceJvm.kt b/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/workspaceJvm.kt index 02bf9001..2d368d09 100644 --- a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/workspaceJvm.kt +++ b/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/workspaceJvm.kt @@ -11,6 +11,9 @@ import space.kscience.dataforge.names.matches // data(builder) //} +/** + * Select the whole data set from the workspace filtered by type. + */ @OptIn(DFExperimental::class) public inline fun TaskResultBuilder<*>.data(namePattern: Name? = null): DataSelector = object : DataSelector { diff --git a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/CachingWorkspaceTest.kt b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/CachingWorkspaceTest.kt new file mode 100644 index 00000000..9bf72aec --- /dev/null +++ b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/CachingWorkspaceTest.kt @@ -0,0 +1,46 @@ +package space.kscience.dataforge.workspace + +import kotlinx.coroutines.test.runTest +import org.junit.jupiter.api.Test +import space.kscience.dataforge.data.startAll +import space.kscience.dataforge.data.static +import space.kscience.dataforge.meta.Meta +import space.kscience.dataforge.meta.boolean +import space.kscience.dataforge.meta.get + +class CachingWorkspaceTest { + val workspace = Workspace { + data { + //statically initialize data + repeat(5) { + static("myData[$it]", it) + } + } + + useCache() + + val doFirst by task { + pipeFrom(data()) { _, name, meta -> + println("Done first on $name with flag=${taskMeta["flag"].boolean ?: false}") + } + } + + val doSecond by task{ + pipeFrom(doFirst) { _, name, meta -> + println("Done second on $name with flag=${taskMeta["flag"].boolean ?: false}") + } + } + } + + + @Test + fun testMetaPropagation() = runTest { + val first = workspace.produce("doFirst") + val secondA = workspace.produce("doSecond") + val secondB = workspace.produce("doSecond", Meta { "flag" put true }) + first.startAll(this) + secondA.startAll(this) + secondB.startAll(this) + } + +} \ No newline at end of file diff --git a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/SimpleWorkspaceTest.kt b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/SimpleWorkspaceTest.kt index db231c80..5e06f45c 100644 --- a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/SimpleWorkspaceTest.kt +++ b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/SimpleWorkspaceTest.kt @@ -33,40 +33,44 @@ public fun Workspace.produceBlocking(task: String, block: MutableMeta.() -> Unit produce(task, block) } +@OptIn(DFExperimental::class) +internal object TestPlugin : WorkspacePlugin() { + override val tag: PluginTag = PluginTag("test") -@DFExperimental -class SimpleWorkspaceTest { - val testPlugin = object : WorkspacePlugin() { - override val tag: PluginTag = PluginTag("test") - - val test by task { - populateFrom( - workspace.data.map { - it.also { - logger.info { "Test: $it" } - } - } - ) + val test by task { + // type is inferred + pipeFrom(data()) { arg, _, _ -> + logger.info { "Test: $arg" } + arg } + } +} - val testPluginFactory = testPlugin.toFactory() + +@DFExperimental +internal class SimpleWorkspaceTest { + + val testPluginFactory = TestPlugin.toFactory() val workspace = Workspace { context { + //load existing task via plugin into the workspace plugin(testPluginFactory) } data { + //statically initialize data repeat(100) { static("myData[$it]", it) } } val filterOne by task { - workspace.data.getByType("myData[12]")?.let { source -> + val name by taskMeta.string { error("Name field not defined") } + from(testPluginFactory) { test }.getByType(name)?.let { source -> data(source.name, source.map { it }) } } @@ -74,19 +78,11 @@ class SimpleWorkspaceTest { val square by task { pipeFrom(data()) { arg, name, meta -> if (meta["testFlag"].boolean == true) { - println("flag") + println("Side effect") } workspace.logger.info { "Starting square on $name" } arg * arg } -// workspace.data.select().forEach { data -> -// if (data.meta["testFlag"].boolean == true) { -// println("flag") -// } -// val value = data.await() -// workspace.logger.info { "Starting square on $value" } -// emit(data.name, data.map { it * it }) -// } } val linear by task { @@ -94,17 +90,13 @@ class SimpleWorkspaceTest { workspace.logger.info { "Starting linear on $name" } arg * 2 + 1 } -// workspace.data.select().forEach { data -> -// workspace.logger.info { "Starting linear on $data" } -// emit(data.name, data.data.map { it * 2 + 1 }) -// } } val fullSquare by task { val squareData = from(square) val linearData = from(linear) squareData.forEach { data -> - val newData: Data = data.combine(linearData.get(data.name)!!) { l, r -> + val newData: Data = data.combine(linearData[data.name]!!) { l, r -> l + r } data(data.name, newData) @@ -190,8 +182,10 @@ class SimpleWorkspaceTest { @Test fun testFilter() { runBlocking { - val node = workspace.produce("filterOne") - assertEquals(12, node.asSequence().first().await()) + val node = workspace.produce("filterOne") { + "name" put "myData[12]" + } + assertEquals(12, node.single().await()) } } } \ No newline at end of file diff --git a/gradle.properties b/gradle.properties index 4ec08ab4..a289b498 100644 --- a/gradle.properties +++ b/gradle.properties @@ -5,4 +5,4 @@ kotlin.code.style=official kotlin.mpp.stability.nowarn=true kotlin.incremental.js.ir=true -toolsVersion=0.12.0-kotlin-1.7.20-Beta +toolsVersion=0.13.0-kotlin-1.7.20-Beta diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index aa991fce..ae04661e 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-7.4.2-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-7.5.1-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists