diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/AbstractAction.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/AbstractAction.kt index 33d4ca05..7cd1ced5 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/AbstractAction.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/AbstractAction.kt @@ -1,9 +1,11 @@ package space.kscience.dataforge.actions -import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import space.kscience.dataforge.data.* import space.kscience.dataforge.meta.Meta +import space.kscience.dataforge.misc.DFInternal import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.startsWith import kotlin.reflect.KType @@ -33,26 +35,38 @@ public abstract class AbstractAction( /** * Update part of the data set using provided data + * + * @param source the source data tree in case we need several data items to update */ protected open fun DataSink.update( - allData: DataTree, + source: DataTree, meta: Meta, namedData: NamedData, ){ //by default regenerate the whole data set - generate(allData,meta) + generate(source,meta) } + @OptIn(DFInternal::class) override fun execute( - scope: CoroutineScope, dataSet: DataTree, meta: Meta, - ): ObservableDataTree = MutableDataTree(outputType, scope).apply { - generate(dataSet, meta) - scope.launch { - dataSet.updates().collect { + ): DataTree = if(dataSet.isObservable()) { + MutableDataTree(outputType, dataSet.updatesScope).apply { + generate(dataSet, meta) + dataSet.updates().onEach { update(dataSet, meta, it) + }.launchIn(updatesScope) + + //close updates when the source is closed + updatesScope.launch { + dataSet.awaitClose() + close() } } + } else{ + DataTree(outputType){ + generate(dataSet, meta) + } } } diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/Action.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/Action.kt index ac903aee..5ed60db9 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/Action.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/Action.kt @@ -1,9 +1,6 @@ package space.kscience.dataforge.actions -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.coroutineScope import space.kscience.dataforge.data.DataTree -import space.kscience.dataforge.data.ObservableDataTree import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.misc.DFExperimental @@ -16,7 +13,7 @@ public fun interface Action { * Transform the data in the node, producing a new node. By default, it is assumed that all calculations are lazy * so not actual computation is started at this moment. */ - public fun execute(scope: CoroutineScope, dataSet: DataTree, meta: Meta): ObservableDataTree + public fun execute(dataSet: DataTree, meta: Meta): DataTree public companion object } @@ -26,20 +23,21 @@ public fun interface Action { */ public fun DataTree.transform( action: Action, - scope: CoroutineScope, meta: Meta = Meta.EMPTY, -): DataTree = action.execute(scope, this, meta) +): DataTree = action.execute(this, meta) /** * Action composition. The result is terminal if one of its parts is terminal */ -public infix fun Action.then(action: Action): Action = - Action { scope, dataSet, meta -> action.execute(scope, this@then.execute(scope, dataSet, meta), meta) } +public infix fun Action.then(action: Action): Action = Action { dataSet, meta -> + action.execute(this@then.execute(dataSet, meta), meta) +} @DFExperimental -public suspend operator fun Action.invoke( +public operator fun Action.invoke( dataSet: DataTree, meta: Meta = Meta.EMPTY, -): DataTree = coroutineScope { execute(this, dataSet, meta) } +): DataTree = execute(dataSet, meta) + diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/MapAction.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/MapAction.kt index 3cf7c788..1f40ed73 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/MapAction.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/MapAction.kt @@ -79,14 +79,14 @@ internal class MapAction( builder.result(env, data.await()) } //setting the data node - data(newName, newData) + put(newName, newData) } override fun DataSink.generate(data: DataTree, meta: Meta) { data.forEach { mapOne(it.name, it.data, meta) } } - override fun DataSink.update(allData: DataTree, meta: Meta, namedData: NamedData) { + override fun DataSink.update(source: DataTree, meta: Meta, namedData: NamedData) { mapOne(namedData.name, namedData.data, namedData.meta) } } diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/ReduceAction.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/ReduceAction.kt index 5b862046..9440be55 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/ReduceAction.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/ReduceAction.kt @@ -103,7 +103,7 @@ internal class ReduceAction( meta = groupMeta ) { group.result.invoke(env, it) } - data(env.name, res) + put(env.name, res) } } } diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/SplitAction.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/SplitAction.kt index 7926ce0b..057419a7 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/SplitAction.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/SplitAction.kt @@ -64,7 +64,7 @@ internal class SplitAction( ).apply(rule) //data.map(outputType, meta = env.meta) { env.result(it) }.named(fragmentName) - data( + put( fragmentName, @Suppress("OPT_IN_USAGE") Data(outputType, meta = env.meta, dependencies = listOf(data)) { env.result(data.await()) @@ -77,7 +77,7 @@ internal class SplitAction( data.forEach { splitOne(it.name, it.data, meta) } } - override fun DataSink.update(allData: DataTree, meta: Meta, namedData: NamedData) { + override fun DataSink.update(source: DataTree, meta: Meta, namedData: NamedData) { splitOne(namedData.name, namedData.data, namedData.meta) } } diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSource.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSource.kt index d6011839..d379d027 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSource.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSource.kt @@ -1,11 +1,11 @@ package space.kscience.dataforge.data -import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.* import kotlinx.coroutines.flow.* -import kotlinx.coroutines.launch import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.misc.DFInternal import space.kscience.dataforge.names.* +import kotlin.contracts.contract import kotlin.reflect.KType import kotlin.reflect.typeOf @@ -66,7 +66,7 @@ public interface GenericDataTree> : DataS } } -public typealias DataTree = GenericDataTree> +public typealias DataTree = GenericDataTree> /** * Return a single data in this tree. Throw error if it is not single. @@ -87,7 +87,7 @@ public operator fun DataTree.get(name: String): Data? = read(name.pars public fun DataTree.asSequence( namePrefix: Name = Name.EMPTY, ): Sequence> = sequence { - data?.let { yield(it.named(Name.EMPTY)) } + data?.let { yield(it.named(namePrefix)) } items.forEach { (token, tree) -> yieldAll(tree.asSequence(namePrefix + token)) } @@ -113,8 +113,8 @@ public fun GenericDataTree<*, *>.isEmpty(): Boolean = data == null && items.isEm @PublishedApi internal class FlatDataTree( override val dataType: KType, - val dataSet: Map>, - val prefix: Name, + private val dataSet: Map>, + private val prefix: Name, ) : GenericDataTree> { override val self: FlatDataTree get() = this override val data: Data? get() = dataSet[prefix] @@ -141,20 +141,56 @@ internal fun Sequence>.toTree(type: KType): DataTree = public inline fun Sequence>.toTree(): DataTree = FlatDataTree(typeOf(), associate { it.name to it.data }, Name.EMPTY) -public interface GenericObservableDataTree> : GenericDataTree, - ObservableDataSource +public interface GenericObservableDataTree> : + GenericDataTree, ObservableDataSource, AutoCloseable { + + /** + * A scope that is used to propagate updates. When this scope is closed, no new updates could arrive. + */ + public val updatesScope: CoroutineScope + + /** + * Close this data tree updates channel + */ + override fun close() { + updatesScope.cancel() + } + +} public typealias ObservableDataTree = GenericObservableDataTree> -public fun DataTree.updates(): Flow> = if (this is GenericObservableDataTree) updates() else emptyFlow() - -public fun interface DataSink { - public fun data(name: Name, data: Data?) +/** + * Check if the [DataTree] is observable + */ +public fun DataTree.isObservable(): Boolean { + contract { + returns(true) implies (this@isObservable is GenericObservableDataTree) + } + return this is GenericObservableDataTree } +/** + * Wait for this data tree to stop spawning updates (updatesScope is closed). + * If this [DataTree] is not observable, return immediately. + */ +public suspend fun DataTree.awaitClose() { + if (isObservable()) { + updatesScope.coroutineContext[Job]?.join() + } +} + +public fun DataTree.updates(): Flow> = + if (this is GenericObservableDataTree) updates() else emptyFlow() + +public fun interface DataSink { + public fun put(name: Name, data: Data?) +} + +@DFInternal public class DataTreeBuilder(private val type: KType) : DataSink { private val map = HashMap>() - override fun data(name: Name, data: Data?) { + override fun put(name: Name, data: Data?) { if (data == null) { map.remove(name) } else { @@ -174,6 +210,7 @@ public inline fun DataTree( /** * Create and a data tree. */ +@OptIn(DFInternal::class) public inline fun DataTree( generator: DataSink.() -> Unit, ): DataTree = DataTreeBuilder(typeOf()).apply(generator).build() @@ -182,77 +219,88 @@ public inline fun DataTree( * A mutable version of [GenericDataTree] */ public interface MutableDataTree : GenericObservableDataTree>, DataSink { - public val scope: CoroutineScope - override var data: Data? override val items: Map> + public fun getOrCreateItem(token: NameToken): MutableDataTree + public operator fun set(token: NameToken, data: Data?) - override fun data(name: Name, data: Data?): Unit = set(name, data) + override fun put(name: Name, data: Data?): Unit = set(name, data) } public tailrec operator fun MutableDataTree.set(name: Name, data: Data?): Unit { when (name.length) { 0 -> this.data = data 1 -> set(name.first(), data) - else -> items[name.first()]?.set(name.cutFirst(), data) + else -> getOrCreateItem(name.first())[name.cutFirst()] = data } } -private class ObservableMutableDataTreeImpl( +private class MutableDataTreeImpl( override val dataType: KType, - override val scope: CoroutineScope, + override val updatesScope: CoroutineScope, ) : MutableDataTree { + private val updates = MutableSharedFlow>() private val children = HashMap>() override var data: Data? = null set(value) { + if (!updatesScope.isActive) error("Can't send updates to closed MutableDataTree") field = value if (value != null) { - scope.launch { + updatesScope.launch { updates.emit(value.named(Name.EMPTY)) } } } override val items: Map> get() = children + + override fun getOrCreateItem(token: NameToken): MutableDataTree = children.getOrPut(token){ + MutableDataTreeImpl(dataType, updatesScope) + } + override val self: MutableDataTree get() = this override fun set(token: NameToken, data: Data?) { - children.getOrPut(token) { - ObservableMutableDataTreeImpl(dataType, scope).also { subTree -> - subTree.updates().onEach { - updates.emit(it.named(token + it.name)) - }.launchIn(scope) - } - }.data = data + if (!updatesScope.isActive) error("Can't send updates to closed MutableDataTree") + val subTree = getOrCreateItem(token) + subTree.updates().onEach { + updates.emit(it.named(token + it.name)) + }.launchIn(updatesScope) + subTree.data = data } - override fun updates(): Flow> = flow { - //emit this node updates - updates.collect { - emit(it) - } - } + override fun updates(): Flow> = updates } +/** + * Create a new [MutableDataTree] + * + * @param parentScope a [CoroutineScope] to control data propagation. By default uses [GlobalScope] + */ +@OptIn(DelicateCoroutinesApi::class) public fun MutableDataTree( type: KType, - scope: CoroutineScope -): MutableDataTree = ObservableMutableDataTreeImpl(type, scope) + parentScope: CoroutineScope = GlobalScope, +): MutableDataTree = MutableDataTreeImpl( + type, + CoroutineScope(parentScope.coroutineContext + Job(parentScope.coroutineContext[Job])) +) /** * Create and initialize a observable mutable data tree. */ +@OptIn(DelicateCoroutinesApi::class) public inline fun MutableDataTree( - scope: CoroutineScope, + parentScope: CoroutineScope = GlobalScope, generator: MutableDataTree.() -> Unit = {}, -): MutableDataTree = MutableDataTree(typeOf(), scope).apply { generator() } +): MutableDataTree = MutableDataTree(typeOf(), parentScope).apply { generator() } //@DFInternal //public fun ObservableDataTree( @@ -262,18 +310,21 @@ public inline fun MutableDataTree( //): ObservableDataTree = MutableDataTree(type, scope.coroutineContext).apply(generator) public inline fun ObservableDataTree( - scope: CoroutineScope, + parentScope: CoroutineScope, generator: MutableDataTree.() -> Unit = {}, -): ObservableDataTree = MutableDataTree(typeOf(), scope).apply(generator) +): ObservableDataTree = MutableDataTree(typeOf(), parentScope).apply(generator) /** * Collect a [Sequence] into an observable tree with additional [updates] */ -public fun Sequence>.toObservableTree(dataType: KType, scope: CoroutineScope, updates: Flow>): ObservableDataTree = - MutableDataTree(dataType, scope).apply { - emitAll(this@toObservableTree) - updates.onEach { - data(it.name, it.data) - }.launchIn(scope) - } +public fun Sequence>.toObservableTree( + dataType: KType, + parentScope: CoroutineScope, + updates: Flow>, +): ObservableDataTree = MutableDataTree(dataType, parentScope).apply { + this.putAll(this@toObservableTree) + updates.onEach { + put(it.name, it.data) + }.launchIn(updatesScope) +} diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/GroupRule.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/GroupRule.kt index 1a1c66b2..90486d85 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/GroupRule.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/GroupRule.kt @@ -17,6 +17,7 @@ package space.kscience.dataforge.data import space.kscience.dataforge.meta.get import space.kscience.dataforge.meta.string +import space.kscience.dataforge.misc.DFInternal public interface GroupRule { public fun gather(set: DataTree): Map> @@ -30,6 +31,7 @@ public interface GroupRule { * @param defaultTagValue * @return */ + @OptIn(DFInternal::class) public fun byMetaValue( key: String, defaultTagValue: String, @@ -42,7 +44,7 @@ public interface GroupRule { set.forEach { data -> val tagValue: String = data.meta[key]?.string ?: defaultTagValue - map.getOrPut(tagValue) { DataTreeBuilder(set.dataType) }.data(data.name,data.data) + map.getOrPut(tagValue) { DataTreeBuilder(set.dataType) }.put(data.name, data.data) } diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataBuilders.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataBuilders.kt index 72b1bb33..8e8b6eaa 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataBuilders.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataBuilders.kt @@ -12,16 +12,12 @@ import space.kscience.dataforge.names.isEmpty import space.kscience.dataforge.names.plus -public fun DataSink.data(value: NamedData) { - data(value.name, value.data) -} - -public fun DataSink.emitAll(sequence: Sequence>) { - sequence.forEach { data(it) } +public fun DataSink.put(value: NamedData) { + put(value.name, value.data) } public fun DataSink.branch(dataTree: DataTree) { - emitAll(dataTree.asSequence()) + putAll(dataTree.asSequence()) } public inline fun DataSink.branch( @@ -32,7 +28,7 @@ public inline fun DataSink.branch( apply(block) } else { val proxyDataSink = DataSink { nameWithoutPrefix, data -> - this.data(prefix + nameWithoutPrefix, data) + this.put(prefix + nameWithoutPrefix, data) } proxyDataSink.apply(block) @@ -45,69 +41,69 @@ public inline fun DataSink.branch( ): Unit = branch(prefix.asName(), block) -public fun DataSink.data(name: String, value: Data) { - data(Name.parse(name), value) +public fun DataSink.put(name: String, value: Data) { + put(Name.parse(name), value) } public fun DataSink.branch(name: Name, set: DataTree) { - branch(name) { emitAll(set.asSequence()) } + branch(name) { putAll(set.asSequence()) } } public fun DataSink.branch(name: String, set: DataTree) { - branch(Name.parse(name)) { emitAll(set.asSequence()) } + branch(Name.parse(name)) { putAll(set.asSequence()) } } /** * Produce lazy [Data] and emit it into the [MutableDataTree] */ -public inline fun DataSink.data( +public inline fun DataSink.put( name: String, meta: Meta = Meta.EMPTY, noinline producer: suspend () -> T, ) { val data = Data(meta, block = producer) - data(name, data) + put(name, data) } -public inline fun DataSink.data( +public inline fun DataSink.put( name: Name, meta: Meta = Meta.EMPTY, noinline producer: suspend () -> T, ) { val data = Data(meta, block = producer) - data(name, data) + put(name, data) } /** * Emit static data with the fixed value */ -public inline fun DataSink.static( +public inline fun DataSink.wrap( name: String, data: T, meta: Meta = Meta.EMPTY, -): Unit = data(name, Data.static(data, meta)) +): Unit = put(name, Data.static(data, meta)) -public inline fun DataSink.static( +public inline fun DataSink.wrap( name: Name, data: T, meta: Meta = Meta.EMPTY, -): Unit = data(name, Data.static(data, meta)) +): Unit = put(name, Data.static(data, meta)) -public inline fun DataSink.static( +public inline fun DataSink.wrap( name: String, data: T, mutableMeta: MutableMeta.() -> Unit, -): Unit = data(Name.parse(name), Data.static(data, Meta(mutableMeta))) +): Unit = put(Name.parse(name), Data.static(data, Meta(mutableMeta))) -public fun DataSink.populateFrom(sequence: Sequence>) { +public fun DataSink.putAll(sequence: Sequence>) { sequence.forEach { - data(it.name, it.data) + put(it.name, it.data) } } -public fun DataSink.populateFrom(tree: DataTree) { - populateFrom(tree.asSequence()) +public fun DataSink.putAll(tree: DataTree) { + this.putAll(tree.asSequence()) } @@ -115,13 +111,22 @@ public fun DataSink.populateFrom(tree: DataTree) { * Update data with given node data and meta with node meta. */ @DFExperimental -public fun MutableDataTree.populateFrom(flow: ObservableDataSource): Job = flow.updates().onEach { - //TODO check if the place is occupied - data(it.name, it.data) -}.launchIn(scope) +public fun MutableDataTree.putAll(source: DataTree) { + source.forEach { + put(it.name, it.data) + } +} -//public fun DataSetBuilder.populateFrom(flow: Flow>) { -// flow.collect { -// data(it.name, it.data) -// } -//} +/** + * Copy given data set and mirror its changes to this [DataSink] in [this@setAndObserve]. Returns an update [Job] + */ +public fun DataSink.watchBranch( + name: Name, + dataSet: ObservableDataTree, +): Job { + branch(name, dataSet) + return dataSet.updates().onEach { + put(name + it.name, it.data) + }.launchIn(dataSet.updatesScope) + +} \ No newline at end of file diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataTransform.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataTransform.kt index f33ec1f0..8c7ce70e 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataTransform.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataTransform.kt @@ -194,7 +194,7 @@ public suspend fun DataTree.transform( val d = Data(outputType, newMeta, coroutineContext, listOf(namedData)) { block(namedData.awaitWithMeta()) } - data(namedData.name, d) + put(namedData.name, d) } } diff --git a/dataforge-data/src/jvmMain/kotlin/space/kscience/dataforge/data/dataSetBuilderInContext.kt b/dataforge-data/src/jvmMain/kotlin/space/kscience/dataforge/data/dataSetBuilderInContext.kt index 5f8791d0..cfccb02b 100644 --- a/dataforge-data/src/jvmMain/kotlin/space/kscience/dataforge/data/dataSetBuilderInContext.kt +++ b/dataforge-data/src/jvmMain/kotlin/space/kscience/dataforge/data/dataSetBuilderInContext.kt @@ -1,10 +1,6 @@ package space.kscience.dataforge.data -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Job -import kotlinx.coroutines.launch import space.kscience.dataforge.names.Name -import space.kscience.dataforge.names.plus /** @@ -12,7 +8,7 @@ import space.kscience.dataforge.names.plus */ context(DataSink) public infix fun String.put(data: Data): Unit = - data(Name.parse(this), data) + put(Name.parse(this), data) /** * Append node @@ -29,16 +25,3 @@ public infix fun String.put( block: DataSink.() -> Unit, ): Unit = branch(Name.parse(this), block) -/** - * Copy given data set and mirror its changes to this [LegacyDataTreeBuilder] in [this@setAndObserve]. Returns an update [Job] - */ -context(DataSink) -public fun CoroutineScope.setAndWatch( - name: Name, - dataSet: DataTree, -): Job = launch { - branch(name, dataSet) - dataSet.updates().collect { - data(name + it.name, it.data) - } -} \ No newline at end of file diff --git a/dataforge-data/src/jvmTest/kotlin/space/kscience/dataforge/data/ActionsTest.kt b/dataforge-data/src/jvmTest/kotlin/space/kscience/dataforge/data/ActionsTest.kt index 17549b6b..4aa6e6d4 100644 --- a/dataforge-data/src/jvmTest/kotlin/space/kscience/dataforge/data/ActionsTest.kt +++ b/dataforge-data/src/jvmTest/kotlin/space/kscience/dataforge/data/ActionsTest.kt @@ -1,7 +1,7 @@ package space.kscience.dataforge.data -import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay +import kotlinx.coroutines.runBlocking import kotlinx.coroutines.test.runTest import org.junit.jupiter.api.Test import space.kscience.dataforge.actions.Action @@ -10,13 +10,13 @@ import space.kscience.dataforge.actions.mapping import space.kscience.dataforge.misc.DFExperimental import kotlin.test.assertEquals -@OptIn(DFExperimental::class, ExperimentalCoroutinesApi::class) +@OptIn(DFExperimental::class) internal class ActionsTest { @Test fun testStaticMapAction() = runTest { val data: DataTree = DataTree { repeat(10) { - static(it.toString(), it) + wrap(it.toString(), it) } } @@ -28,20 +28,24 @@ internal class ActionsTest { } @Test - fun testDynamicMapAction() = runTest { - val data: MutableDataTree = MutableDataTree(this) + fun testDynamicMapAction() = runBlocking { + val source: MutableDataTree = MutableDataTree() val plusOne = Action.mapping { result { it + 1 } } - val result = plusOne(data) + val result = plusOne(source) + repeat(10) { - data.static(it.toString(), it) + source.wrap(it.toString(), it) } - delay(20) + delay(10) + + source.close() + result.awaitClose() assertEquals(2, result["1"]?.await()) } diff --git a/dataforge-data/src/jvmTest/kotlin/space/kscience/dataforge/data/DataTreeBuilderTest.kt b/dataforge-data/src/jvmTest/kotlin/space/kscience/dataforge/data/DataTreeBuilderTest.kt index c5c450f7..95b7a7bd 100644 --- a/dataforge-data/src/jvmTest/kotlin/space/kscience/dataforge/data/DataTreeBuilderTest.kt +++ b/dataforge-data/src/jvmTest/kotlin/space/kscience/dataforge/data/DataTreeBuilderTest.kt @@ -1,6 +1,8 @@ package space.kscience.dataforge.data -import kotlinx.coroutines.* +import kotlinx.coroutines.delay +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.test.runTest import space.kscience.dataforge.misc.DFExperimental import space.kscience.dataforge.names.asName import kotlin.test.Test @@ -9,26 +11,25 @@ import kotlin.test.assertEquals internal class DataTreeBuilderTest { @Test - fun testTreeBuild() = runBlocking { + fun testTreeBuild() = runTest { val node = DataTree { "primary" put { - static("a", "a") - static("b", "b") + wrap("a", "a") + wrap("b", "b") } - static("c.d", "c.d") - static("c.f", "c.f") - } - runBlocking { - assertEquals("a", node["primary.a"]?.await()) - assertEquals("b", node["primary.b"]?.await()) - assertEquals("c.d", node["c.d"]?.await()) - assertEquals("c.f", node["c.f"]?.await()) + wrap("c.d", "c.d") + wrap("c.f", "c.f") } + assertEquals("a", node["primary.a"]?.await()) + assertEquals("b", node["primary.b"]?.await()) + assertEquals("c.d", node["c.d"]?.await()) + assertEquals("c.f", node["c.f"]?.await()) + } @OptIn(DFExperimental::class) @Test - fun testDataUpdate() = runBlocking { + fun testDataUpdate() = runTest { val updateData = DataTree { "update" put { "a" put Data.static("a") @@ -38,54 +39,30 @@ internal class DataTreeBuilderTest { val node = DataTree { "primary" put { - static("a", "a") - static("b", "b") + wrap("a", "a") + wrap("b", "b") } - static("root", "root") - populateFrom(updateData) + wrap("root", "root") + putAll(updateData) } - runBlocking { - assertEquals("a", node["update.a"]?.await()) - assertEquals("a", node["primary.a"]?.await()) - } + assertEquals("a", node["update.a"]?.await()) + assertEquals("a", node["primary.a"]?.await()) } @Test fun testDynamicUpdates() = runBlocking { - try { - lateinit var updateJob: Job - supervisorScope { - val subNode = ObservableDataTree(this) { - updateJob = launch { - repeat(10) { - delay(10) - static("value", it) - } - delay(10) - } - } - launch { - subNode.updates().collect { - println(it) - } - } - val rootNode = ObservableDataTree(this) { - setAndWatch("sub".asName(), subNode) - } + val subNode = MutableDataTree() - launch { - rootNode.updates().collect { - println(it) - } - } - updateJob.join() - assertEquals(9, rootNode["sub.value"]?.await()) - cancel() - } - } catch (t: Throwable) { - if (t !is CancellationException) throw t + val rootNode = MutableDataTree { + watchBranch("sub".asName(), subNode) } + repeat(10) { + subNode.wrap("value[$it]", it) + } + + delay(20) + assertEquals(9, rootNode["sub.value[9]"]?.await()) } } \ No newline at end of file diff --git a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/Task.kt b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/Task.kt index af2520eb..e636de49 100644 --- a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/Task.kt +++ b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/Task.kt @@ -88,12 +88,17 @@ public fun Task( workspace: Workspace, taskName: Name, taskMeta: Meta, - ): TaskResult = withContext(GoalExecutionRestriction() + workspace.goalLogger) { + ): TaskResult { //TODO use safe builder and check for external data on add and detects cycles - val dataset = MutableDataTree(resultType, this).apply { - TaskResultBuilder(workspace, taskName, taskMeta, this).apply { builder() } + val dataset = MutableDataTree(resultType, workspace.context).apply { + TaskResultBuilder(workspace, taskName, taskMeta, this).apply { + withContext(GoalExecutionRestriction() + workspace.goalLogger) { + builder() + } + } } - workspace.wrapResult(dataset, taskName, taskMeta) + return workspace.wrapResult(dataset, taskName, taskMeta) + } } diff --git a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/TaskResult.kt b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/TaskResult.kt index 94da4383..7aa94101 100644 --- a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/TaskResult.kt +++ b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/TaskResult.kt @@ -2,6 +2,7 @@ package space.kscience.dataforge.workspace import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job +import kotlinx.coroutines.joinAll import kotlinx.coroutines.launch import space.kscience.dataforge.data.ObservableDataTree import space.kscience.dataforge.data.asSequence @@ -20,7 +21,7 @@ public data class TaskResult( public val workspace: Workspace, public val taskName: Name, public val taskMeta: Meta, -): ObservableDataTree by content +) : ObservableDataTree by content /** * Wrap data into [TaskResult] @@ -32,8 +33,9 @@ public fun Workspace.wrapResult(data: ObservableDataTree, taskName: Name, * Start computation for all data elements of this node. * The resulting [Job] is completed only when all of them are completed. */ -public fun TaskResult<*>.compute(scope: CoroutineScope): Job = scope.launch { - asSequence().forEach { +public fun TaskResult<*>.launch(scope: CoroutineScope): Job { + val jobs = asSequence().map { it.data.launch(scope) - } + }.toList() + return scope.launch { jobs.joinAll() } } \ No newline at end of file diff --git a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/WorkspaceBuilder.kt b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/WorkspaceBuilder.kt index 6ccdcb1c..4705c3b0 100644 --- a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/WorkspaceBuilder.kt +++ b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/WorkspaceBuilder.kt @@ -102,7 +102,7 @@ public inline fun TaskContainer.action( noinline descriptorBuilder: MetaDescriptorBuilder.() -> Unit = {}, ): PropertyDelegateProvider>> = task(MetaDescriptor(descriptorBuilder)) { - result(action.execute(workspace.context, from(selector), taskMeta.copy(metaTransform))) + result(action.execute(from(selector), taskMeta.copy(metaTransform))) } public class WorkspaceBuilder( diff --git a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/taskBuilders.kt b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/taskBuilders.kt index 95e7d481..1900ff23 100644 --- a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/taskBuilders.kt +++ b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/taskBuilders.kt @@ -93,7 +93,7 @@ public suspend inline fun TaskResultBuilder.transformEach( action(it, data.name, meta) } - data(data.name, res) + put(data.name, res) } } @@ -113,7 +113,7 @@ public suspend inline fun TaskResultBuilder.actionFrom( action: Action, dependencyMeta: Meta = defaultDependencyMeta, ) { - branch(action.execute(workspace.context, from(selector, dependencyMeta), dependencyMeta)) + branch(action.execute(from(selector, dependencyMeta), dependencyMeta)) } diff --git a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/fileData.kt b/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/fileData.kt index 4eba363b..51c9a5e8 100644 --- a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/fileData.kt +++ b/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/fileData.kt @@ -11,7 +11,6 @@ import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.NameToken import space.kscience.dataforge.names.asName import space.kscience.dataforge.names.plus -import space.kscience.dataforge.workspace.FileData.defaultPathToName import java.nio.file.Files import java.nio.file.Path import java.nio.file.StandardWatchEventKinds @@ -36,20 +35,6 @@ public object FileData { public const val DF_FILE_EXTENSION: String = "df" public val DEFAULT_IGNORE_EXTENSIONS: Set = setOf(DF_FILE_EXTENSION) - /** - * Transform file name into DataForg name. Ignores DataForge file extensions. - */ - public val defaultPathToName: (Path) -> Name = { path -> - Name( - path.map { segment -> - if (segment.isRegularFile() && segment.extension in DEFAULT_IGNORE_EXTENSIONS) { - NameToken(path.nameWithoutExtension) - } else { - NameToken(path.name) - } - } - ) - } } @@ -77,51 +62,68 @@ public fun IOPlugin.readFileData( ) } -public fun DataSink.file(io: IOPlugin, path: Path, name: Name) { +public fun DataSink.file(io: IOPlugin, name: Name, path: Path) { if (!path.isRegularFile()) error("Only regular files could be handled by this function") - data(name, io.readFileData(path)) + put(name, io.readFileData(path)) } public fun DataSink.directory( io: IOPlugin, + name: Name, path: Path, - pathToName: (Path) -> Name = defaultPathToName, ) { if (!path.isDirectory()) error("Only directories could be handled by this function") - val metaFile = path.resolve(IOPlugin.META_FILE_NAME) - val dataFile = path.resolve(IOPlugin.DATA_FILE_NAME) //process root data - if (metaFile.exists() || dataFile.exists()) { - data( - Name.EMPTY, + + var dataBinary: Binary? = null + var meta: Meta? = null + Files.list(path).forEach { childPath -> + val fileName = childPath.fileName.toString() + if (fileName == IOPlugin.DATA_FILE_NAME) { + dataBinary = childPath.asBinary() + } else if (fileName.startsWith(IOPlugin.META_FILE_NAME)) { + meta = io.readMetaFileOrNull(childPath) + } else if (!fileName.startsWith("@")) { + val token = if (childPath.isRegularFile() && childPath.extension in FileData.DEFAULT_IGNORE_EXTENSIONS) { + NameToken(childPath.nameWithoutExtension) + } else { + NameToken(childPath.name) + } + + files(io, name + token, childPath) + } + } + + //set data if it is relevant + if (dataBinary != null || meta != null) { + put( + name, StaticData( typeOf(), - dataFile.takeIf { it.exists() }?.asBinary() ?: Binary.EMPTY, - io.readMetaFileOrNull(metaFile) ?: Meta.EMPTY + dataBinary ?: Binary.EMPTY, + meta ?: Meta.EMPTY ) ) } - Files.list(path).forEach { childPath -> - val fileName = childPath.fileName.toString() - if (!fileName.startsWith("@")) { - files(io, childPath, pathToName) - } - } } -public fun DataSink.files(io: IOPlugin, path: Path, pathToName: (Path) -> Name = defaultPathToName) { +public fun DataSink.files( + io: IOPlugin, + name: Name, + path: Path, +) { if (path.isRegularFile() && path.extension == "zip") { //Using explicit Zip file system to avoid bizarre compatibility bugs val fsProvider = FileSystemProvider.installedProviders().find { it.scheme == "jar" } ?: error("Zip file system provider not found") val fs = fsProvider.newFileSystem(path, mapOf("create" to "true")) - return files(io, fs.rootDirectories.first(), pathToName) + files(io, name, fs.rootDirectories.first()) } if (path.isRegularFile()) { - file(io, path, pathToName(path)) + file(io, name, path) } else { - directory(io, path, pathToName) + directory(io, name, path) } } @@ -132,11 +134,11 @@ private fun Path.toName() = Name(map { NameToken.parse(it.nameWithoutExtension) @DFExperimental public fun DataSink.monitorFiles( io: IOPlugin, + name: Name, path: Path, - pathToName: (Path) -> Name = defaultPathToName, scope: CoroutineScope = io.context, ): Job { - files(io, path, pathToName) + files(io, name, path) return scope.launch(Dispatchers.IO) { val watchService = path.fileSystem.newWatchService() @@ -153,11 +155,11 @@ public fun DataSink.monitorFiles( for (event: WatchEvent<*> in key.pollEvents()) { val eventPath = event.context() as Path if (event.kind() == StandardWatchEventKinds.ENTRY_DELETE) { - data(eventPath.toName(), null) + put(eventPath.toName(), null) } else { val fileName = eventPath.fileName.toString() if (!fileName.startsWith("@")) { - files(io, eventPath, pathToName) + files(io, name, eventPath) } } } @@ -179,29 +181,24 @@ public suspend fun IOPlugin.writeDataDirectory( dataSet: DataTree, format: IOWriter, envelopeFormat: EnvelopeFormat? = null, - nameToPath: (name: Name, data: Data) -> Path = { name, _ -> - Path(name.tokens.joinToString("/") { token -> token.toStringUnescaped() }) - }, -) { - withContext(Dispatchers.IO) { - if (!Files.exists(path)) { - Files.createDirectories(path) - } else if (!Files.isDirectory(path)) { - error("Can't write a node into file") - } - dataSet.forEach { (name, data) -> - val childPath = path.resolve(nameToPath(name, data)) - childPath.parent.createDirectories() - val envelope = data.toEnvelope(format) - if (envelopeFormat != null) { - writeEnvelopeFile(childPath, envelope, envelopeFormat) - } else { - writeEnvelopeDirectory(childPath, envelope) - } - } - dataSet.meta?.let { writeMetaFile(path, it) } - +): Unit = withContext(Dispatchers.IO) { + if (!Files.exists(path)) { + Files.createDirectories(path) + } else if (!Files.isDirectory(path)) { + error("Can't write a node into file") } + dataSet.forEach { (name, data) -> + val childPath = path.resolve(name.tokens.joinToString("/") { token -> token.toStringUnescaped() }) + childPath.parent.createDirectories() + val envelope = data.toEnvelope(format) + if (envelopeFormat != null) { + writeEnvelopeFile(childPath, envelope, envelopeFormat) + } else { + writeEnvelopeDirectory(childPath, envelope) + } + } + dataSet.meta?.let { writeMetaFile(path, it) } + } /** @@ -212,15 +209,12 @@ public suspend fun IOPlugin.writeDataDirectory( public fun DataSink.resources( io: IOPlugin, vararg resources: String, - pathToName: (Path) -> Name = defaultPathToName, classLoader: ClassLoader = Thread.currentThread().contextClassLoader, ) { resources.forEach { resource -> val path = classLoader.getResource(resource)?.toURI()?.toPath() ?: error( "Resource with name $resource is not resolved" ) - branch(resource.asName()) { - files(io, path, pathToName) - } + files(io, resource.asName(), path) } } diff --git a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/zipData.kt b/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/zipData.kt index 83b617cb..9d175bd3 100644 --- a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/zipData.kt +++ b/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/zipData.kt @@ -3,67 +3,36 @@ package space.kscience.dataforge.workspace import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.withContext import space.kscience.dataforge.data.DataTree -import space.kscience.dataforge.io.* +import space.kscience.dataforge.io.EnvelopeFormat +import space.kscience.dataforge.io.IOPlugin +import space.kscience.dataforge.io.IOWriter import space.kscience.dataforge.misc.DFExperimental -import java.nio.file.Files import java.nio.file.Path -import java.nio.file.StandardOpenOption -import java.util.zip.ZipEntry -import java.util.zip.ZipOutputStream - - -private suspend fun ZipOutputStream.writeNode( - name: String, - tree: DataTree, - dataFormat: IOFormat, - envelopeFormat: EnvelopeFormat = TaggedEnvelopeFormat, -): Unit = withContext(Dispatchers.IO) { - //TODO add directory-based envelope writer - tree.data?.let { - val envelope = it.toEnvelope(dataFormat) - val entry = ZipEntry(name) - putNextEntry(entry) - - //TODO remove additional copy - val bytes = ByteArray { - writeWith(envelopeFormat, envelope) - } - write(bytes) - } - - - val entry = ZipEntry("$name/") - putNextEntry(entry) - closeEntry() - tree.items.forEach { (token, item) -> - val childName = "$name/$token" - writeNode(childName, item, dataFormat, envelopeFormat) - } - -} +import java.nio.file.spi.FileSystemProvider +import kotlin.io.path.exists +import kotlin.io.path.extension /** * Write this [DataTree] as a zip archive */ @DFExperimental -public suspend fun DataTree.writeZip( +public suspend fun IOPlugin.writeZip( path: Path, - format: IOFormat, - envelopeFormat: EnvelopeFormat = TaggedEnvelopeFormat, + dataSet: DataTree, + format: IOWriter, + envelopeFormat: EnvelopeFormat? = null, ): Unit = withContext(Dispatchers.IO) { - val actualFile = if (path.toString().endsWith(".zip")) { + if (path.exists()) error("Can't override existing zip data file $path") + val actualFile = if (path.extension == "zip") { path } else { path.resolveSibling(path.fileName.toString() + ".zip") } - val fos = Files.newOutputStream( - actualFile, - StandardOpenOption.WRITE, - StandardOpenOption.CREATE, - StandardOpenOption.TRUNCATE_EXISTING - ) - val zos = ZipOutputStream(fos) - zos.use { - it.writeNode("", this@writeZip, format, envelopeFormat) + val fsProvider = FileSystemProvider.installedProviders().find { it.scheme == "jar" } + ?: error("Zip file system provider not found") + //val fs = FileSystems.newFileSystem(actualFile, mapOf("create" to true)) + val fs = fsProvider.newFileSystem(actualFile, mapOf("create" to true)) + fs.use { + writeDataDirectory(fs.rootDirectories.first(), dataSet, format, envelopeFormat) } -} \ No newline at end of file +} diff --git a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/CachingWorkspaceTest.kt b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/CachingWorkspaceTest.kt index 78ce853e..e5c2c230 100644 --- a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/CachingWorkspaceTest.kt +++ b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/CachingWorkspaceTest.kt @@ -1,17 +1,16 @@ package space.kscience.dataforge.workspace -import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.test.runTest import org.junit.jupiter.api.Test -import space.kscience.dataforge.data.static +import space.kscience.dataforge.data.wrap import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.boolean import space.kscience.dataforge.meta.get import space.kscience.dataforge.misc.DFExperimental import kotlin.test.assertEquals -@OptIn(ExperimentalCoroutinesApi::class, DFExperimental::class) +@OptIn(DFExperimental::class) internal class CachingWorkspaceTest { @Test @@ -23,7 +22,7 @@ internal class CachingWorkspaceTest { data { //statically initialize data repeat(5) { - static("myData[$it]", it) + wrap("myData[$it]", it) } } @@ -39,7 +38,7 @@ internal class CachingWorkspaceTest { val doSecond by task { transformEach( doFirst, - dependencyMeta = if(taskMeta["flag"].boolean == true) taskMeta else Meta.EMPTY + dependencyMeta = if (taskMeta["flag"].boolean == true) taskMeta else Meta.EMPTY ) { _, name, _ -> secondCounter++ println("Done second on $name with flag=${taskMeta["flag"].boolean ?: false}") @@ -51,13 +50,15 @@ internal class CachingWorkspaceTest { val secondA = workspace.produce("doSecond") val secondB = workspace.produce("doSecond", Meta { "flag" put true }) val secondC = workspace.produce("doSecond") + //use coroutineScope to wait for the result coroutineScope { - first.compute(this) - secondA.compute(this) - secondB.compute(this) + first.launch(this) + secondA.launch(this) + secondB.launch(this) //repeat to check caching - secondC.compute(this) + secondC.launch(this) } + assertEquals(10, firstCounter) assertEquals(10, secondCounter) } diff --git a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/DataPropagationTest.kt b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/DataPropagationTest.kt index 403b1c51..d611b1c8 100644 --- a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/DataPropagationTest.kt +++ b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/DataPropagationTest.kt @@ -20,13 +20,13 @@ class DataPropagationTestPlugin : WorkspacePlugin() { val result: Data = selectedData.foldToData(0) { result, data -> result + data.value } - data("result", result) + put("result", result) } val singleData by task { workspace.data.filterByType()["myData[12]"]?.let { - data("result", it) + put("result", it) } } @@ -47,7 +47,7 @@ class DataPropagationTest { } data { repeat(100) { - static("myData[$it]", it) + wrap("myData[$it]", it) } } } diff --git a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/FileDataTest.kt b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/FileDataTest.kt index 1a21fb17..10a1c268 100644 --- a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/FileDataTest.kt +++ b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/FileDataTest.kt @@ -1,6 +1,5 @@ package space.kscience.dataforge.workspace -import kotlinx.coroutines.runBlocking import kotlinx.coroutines.test.runTest import kotlinx.io.Sink import kotlinx.io.Source @@ -13,7 +12,9 @@ import space.kscience.dataforge.io.* import space.kscience.dataforge.io.yaml.YamlPlugin import space.kscience.dataforge.meta.get import space.kscience.dataforge.misc.DFExperimental +import space.kscience.dataforge.names.Name import java.nio.file.Files +import kotlin.io.path.deleteExisting import kotlin.io.path.fileSize import kotlin.io.path.toPath import kotlin.test.Test @@ -23,11 +24,11 @@ import kotlin.test.assertEquals class FileDataTest { val dataNode = DataTree { branch("dir") { - static("a", "Some string") { + wrap("a", "Some string") { "content" put "Some string" } } - static("b", "root data") + wrap("b", "root data") // meta { // "content" put "This is root meta node" // } @@ -45,17 +46,17 @@ class FileDataTest { @Test @DFExperimental - fun testDataWriteRead() = with(Global.io) { + fun testDataWriteRead() = runTest { val io = Global.io val dir = Files.createTempDirectory("df_data_node") - runBlocking { - writeDataDirectory(dir, dataNode, StringIOFormat) - println(dir.toUri().toString()) - val reconstructed = DataTree { files(io, dir) } - .transform { (_, value) -> value.toByteArray().decodeToString() } - assertEquals(dataNode["dir.a"]?.meta?.get("content"), reconstructed["dir.a"]?.meta?.get("content")) - assertEquals(dataNode["b"]?.await(), reconstructed["b"]?.await()) + io.writeDataDirectory(dir, dataNode, StringIOFormat) + println(dir.toUri().toString()) + val data = DataTree { + files(io, Name.EMPTY, dir) } + val reconstructed = data.transform { (_, value) -> value.toByteArray().decodeToString() } + assertEquals(dataNode["dir.a"]?.meta?.get("content"), reconstructed["dir.a"]?.meta?.get("content")) + assertEquals(dataNode["b"]?.await(), reconstructed["b"]?.await()) } @@ -64,9 +65,10 @@ class FileDataTest { fun testZipWriteRead() = runTest { val io = Global.io val zip = Files.createTempFile("df_data_node", ".zip") - dataNode.writeZip(zip, StringIOFormat) + zip.deleteExisting() + io.writeZip(zip, dataNode, StringIOFormat) println(zip.toUri().toString()) - val reconstructed = DataTree { files(io, zip) } + val reconstructed = DataTree { files(io, Name.EMPTY, zip) } .transform { (_, value) -> value.toByteArray().decodeToString() } assertEquals(dataNode["dir.a"]?.meta?.get("content"), reconstructed["dir.a"]?.meta?.get("content")) assertEquals(dataNode["b"]?.await(), reconstructed["b"]?.await()) diff --git a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/FileWorkspaceCacheTest.kt b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/FileWorkspaceCacheTest.kt index ee497e1b..0f16b1c8 100644 --- a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/FileWorkspaceCacheTest.kt +++ b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/FileWorkspaceCacheTest.kt @@ -3,7 +3,7 @@ package space.kscience.dataforge.workspace import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.test.runTest import org.junit.jupiter.api.Test -import space.kscience.dataforge.data.static +import space.kscience.dataforge.data.wrap import space.kscience.dataforge.misc.DFExperimental import java.nio.file.Files @@ -16,7 +16,7 @@ class FileWorkspaceCacheTest { data { //statically initialize data repeat(5) { - static("myData[$it]", it) + wrap("myData[$it]", it) } } fileCache(Files.createTempDirectory("dataforge-temporary-cache")) @@ -26,7 +26,7 @@ class FileWorkspaceCacheTest { } } - workspace.produce("echo").compute(this) + workspace.produce("echo").launch(this) } } \ No newline at end of file diff --git a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/SimpleWorkspaceTest.kt b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/SimpleWorkspaceTest.kt index 91fa2f6c..b49b9d54 100644 --- a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/SimpleWorkspaceTest.kt +++ b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/SimpleWorkspaceTest.kt @@ -62,14 +62,14 @@ internal class SimpleWorkspaceTest { data { //statically initialize data repeat(100) { - static("myData[$it]", it) + wrap("myData[$it]", it) } } val filterOne by task { val name by taskMeta.string { error("Name field not defined") } from(testPluginFactory) { test }[name]?.let { source: Data -> - data(name, source) + put(name, source) } } @@ -97,7 +97,7 @@ internal class SimpleWorkspaceTest { val newData: Data = data.combine(linearData[data.name]!!) { l, r -> l + r } - data(data.name, newData) + put(data.name, newData) } } @@ -106,7 +106,7 @@ internal class SimpleWorkspaceTest { val res = from(square).foldToData(0) { l, r -> l + r.value } - data("sum", res) + put("sum", res) } val averageByGroup by task { @@ -116,13 +116,13 @@ internal class SimpleWorkspaceTest { l + r.value } - data("even", evenSum) + put("even", evenSum) val oddSum = workspace.data.filterByType { name, _, _ -> name.toString().toInt() % 2 == 1 }.foldToData(0) { l, r -> l + r.value } - data("odd", oddSum) + put("odd", oddSum) } val delta by task { @@ -132,7 +132,7 @@ internal class SimpleWorkspaceTest { val res = even.combine(odd) { l, r -> l - r } - data("res", res) + put("res", res) } val customPipe by task { @@ -140,7 +140,7 @@ internal class SimpleWorkspaceTest { val meta = data.meta.toMutableMeta().apply { "newValue" put 22 } - data(data.name + "new", data.transform { (data.meta["value"].int ?: 0) + it }) + put(data.name + "new", data.transform { (data.meta["value"].int ?: 0) + it }) } }