Data tree refactored to a uniform tree instead of sealed class.

Alexander Nozik 2024-02-03 17:25:49 +03:00
parent 90999f424f
commit 466e460989
23 changed files with 353 additions and 346 deletions

View File

@@ -1,9 +1,11 @@
 package space.kscience.dataforge.actions

-import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.flow.launchIn
+import kotlinx.coroutines.flow.onEach
 import kotlinx.coroutines.launch
 import space.kscience.dataforge.data.*
 import space.kscience.dataforge.meta.Meta
+import space.kscience.dataforge.misc.DFInternal
 import space.kscience.dataforge.names.Name
 import space.kscience.dataforge.names.startsWith
 import kotlin.reflect.KType
@@ -33,26 +35,38 @@ public abstract class AbstractAction<T : Any, R : Any>(
     /**
      * Update part of the data set using provided data
+     *
+     * @param source the source data tree in case we need several data items to update
      */
     protected open fun DataSink<R>.update(
-        allData: DataTree<T>,
+        source: DataTree<T>,
         meta: Meta,
         namedData: NamedData<T>,
     ) {
         //by default regenerate the whole data set
-        generate(allData, meta)
+        generate(source, meta)
     }

+    @OptIn(DFInternal::class)
     override fun execute(
-        scope: CoroutineScope,
         dataSet: DataTree<T>,
         meta: Meta,
-    ): ObservableDataTree<R> = MutableDataTree<R>(outputType, scope).apply {
-        generate(dataSet, meta)
-        scope.launch {
-            dataSet.updates().collect {
-                update(dataSet, meta, it)
-            }
-        }
-    }
+    ): DataTree<R> = if (dataSet.isObservable()) {
+        MutableDataTree<R>(outputType, dataSet.updatesScope).apply {
+            generate(dataSet, meta)
+            dataSet.updates().onEach {
+                update(dataSet, meta, it)
+            }.launchIn(updatesScope)
+
+            //close updates when the source is closed
+            updatesScope.launch {
+                dataSet.awaitClose()
+                close()
+            }
+        }
+    } else {
+        DataTree(outputType) {
+            generate(dataSet, meta)
+        }
+    }
 }
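For reference, a minimal usage sketch of the new branching execute (hypothetical action and data; names follow this diff): a static input yields a plain lazy tree, while an observable input yields a MutableDataTree that mirrors source updates until the source closes.

import space.kscience.dataforge.actions.Action
import space.kscience.dataforge.actions.mapping
import space.kscience.dataforge.data.*
import space.kscience.dataforge.meta.Meta

suspend fun executeDemo() {
    val squared = Action.mapping<Int, Int> { result { it * it } }

    // Static input: the result is a plain, fully lazy DataTree.
    val staticTree: DataTree<Int> = DataTree { wrap("x", 2) }
    val staticResult = squared.execute(staticTree, Meta.EMPTY)
    println(staticResult["x"]?.await()) // 4

    // Observable input: the result mirrors updates until the source closes.
    val live: MutableDataTree<Int> = MutableDataTree()
    val liveResult = squared.execute(live, Meta.EMPTY)
    live.wrap("y", 3) // propagated into liveResult via updatesScope
    live.close()      // the result tree closes after the source does
    liveResult.awaitClose()
}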

View File

@@ -1,9 +1,6 @@
 package space.kscience.dataforge.actions

-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.coroutineScope
 import space.kscience.dataforge.data.DataTree
-import space.kscience.dataforge.data.ObservableDataTree
 import space.kscience.dataforge.meta.Meta
 import space.kscience.dataforge.misc.DFExperimental
@@ -16,7 +13,7 @@ public fun interface Action<T, R> {
      * Transform the data in the node, producing a new node. By default, it is assumed that all calculations are lazy
      * so not actual computation is started at this moment.
      */
-    public fun execute(scope: CoroutineScope, dataSet: DataTree<T>, meta: Meta): ObservableDataTree<R>
+    public fun execute(dataSet: DataTree<T>, meta: Meta): DataTree<R>

     public companion object
 }
@@ -26,20 +23,21 @@ public fun interface Action<T, R> {
  */
 public fun <T, R> DataTree<T>.transform(
     action: Action<T, R>,
-    scope: CoroutineScope,
     meta: Meta = Meta.EMPTY,
-): DataTree<R> = action.execute(scope, this, meta)
+): DataTree<R> = action.execute(this, meta)

 /**
  * Action composition. The result is terminal if one of its parts is terminal
  */
-public infix fun <T , I, R> Action<T, I>.then(action: Action<I, R>): Action<T, R> =
-    Action { scope, dataSet, meta -> action.execute(scope, this@then.execute(scope, dataSet, meta), meta) }
+public infix fun <T, I, R> Action<T, I>.then(action: Action<I, R>): Action<T, R> = Action { dataSet, meta ->
+    action.execute(this@then.execute(dataSet, meta), meta)
+}

 @DFExperimental
-public suspend operator fun <T, R> Action<T, R>.invoke(
+public operator fun <T, R> Action<T, R>.invoke(
     dataSet: DataTree<T>,
     meta: Meta = Meta.EMPTY,
-): DataTree<R> = coroutineScope { execute(this, dataSet, meta) }
+): DataTree<R> = execute(dataSet, meta)
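A short sketch of the scope-less composition API (hypothetical actions; Action.mapping is used the same way in the tests further down):

import space.kscience.dataforge.actions.*
import space.kscience.dataforge.data.*
import space.kscience.dataforge.misc.DFExperimental

val plusOne = Action.mapping<Int, Int> { result { it + 1 } }
val double = Action.mapping<Int, Int> { result { it * 2 } }

// `then` chains lazily: nothing is computed until a result is awaited.
val plusOneThenDouble = plusOne then double

@OptIn(DFExperimental::class)
suspend fun composeDemo(data: DataTree<Int>) {
    val result = plusOneThenDouble(data) // operator invoke with Meta.EMPTY by default
    println(result["a"]?.await())
}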

View File

@@ -79,14 +79,14 @@ internal class MapAction<T : Any, R : Any>(
             builder.result(env, data.await())
         }
         //setting the data node
-        data(newName, newData)
+        put(newName, newData)
     }

     override fun DataSink<R>.generate(data: DataTree<T>, meta: Meta) {
         data.forEach { mapOne(it.name, it.data, meta) }
     }

-    override fun DataSink<R>.update(allData: DataTree<T>, meta: Meta, namedData: NamedData<T>) {
+    override fun DataSink<R>.update(source: DataTree<T>, meta: Meta, namedData: NamedData<T>) {
         mapOne(namedData.name, namedData.data, namedData.meta)
     }
 }

View File

@@ -103,7 +103,7 @@ internal class ReduceAction<T : Any, R : Any>(
                 meta = groupMeta
             ) { group.result.invoke(env, it) }

-            data(env.name, res)
+            put(env.name, res)
         }
     }
 }

View File

@@ -64,7 +64,7 @@ internal class SplitAction<T : Any, R : Any>(
             ).apply(rule)
             //data.map<R>(outputType, meta = env.meta) { env.result(it) }.named(fragmentName)

-            data(
+            put(
                 fragmentName,
                 @Suppress("OPT_IN_USAGE") Data(outputType, meta = env.meta, dependencies = listOf(data)) {
                     env.result(data.await())
@@ -77,7 +77,7 @@ internal class SplitAction<T : Any, R : Any>(
         data.forEach { splitOne(it.name, it.data, meta) }
     }

-    override fun DataSink<R>.update(allData: DataTree<T>, meta: Meta, namedData: NamedData<T>) {
+    override fun DataSink<R>.update(source: DataTree<T>, meta: Meta, namedData: NamedData<T>) {
         splitOne(namedData.name, namedData.data, namedData.meta)
     }
 }

View File

@@ -1,11 +1,11 @@
 package space.kscience.dataforge.data

-import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.*
 import kotlinx.coroutines.flow.*
-import kotlinx.coroutines.launch
 import space.kscience.dataforge.meta.Meta
 import space.kscience.dataforge.misc.DFInternal
 import space.kscience.dataforge.names.*
+import kotlin.contracts.contract
 import kotlin.reflect.KType
 import kotlin.reflect.typeOf
@@ -87,7 +87,7 @@ public operator fun <T> DataTree<T>.get(name: String): Data<T>? = read(name.pars
 public fun <T> DataTree<T>.asSequence(
     namePrefix: Name = Name.EMPTY,
 ): Sequence<NamedData<T>> = sequence {
-    data?.let { yield(it.named(Name.EMPTY)) }
+    data?.let { yield(it.named(namePrefix)) }
     items.forEach { (token, tree) ->
         yieldAll(tree.asSequence(namePrefix + token))
     }
@@ -113,8 +113,8 @@ public fun GenericDataTree<*, *>.isEmpty(): Boolean = data == null && items.isEm
 @PublishedApi
 internal class FlatDataTree<T>(
     override val dataType: KType,
-    val dataSet: Map<Name, Data<T>>,
-    val prefix: Name,
+    private val dataSet: Map<Name, Data<T>>,
+    private val prefix: Name,
 ) : GenericDataTree<T, FlatDataTree<T>> {
     override val self: FlatDataTree<T> get() = this
     override val data: Data<T>? get() = dataSet[prefix]
@@ -141,20 +141,56 @@ internal fun <T> Sequence<NamedData<T>>.toTree(type: KType): DataTree<T> =
 public inline fun <reified T> Sequence<NamedData<T>>.toTree(): DataTree<T> =
     FlatDataTree(typeOf<T>(), associate { it.name to it.data }, Name.EMPTY)

-public interface GenericObservableDataTree<out T, out TR : GenericObservableDataTree<T, TR>> : GenericDataTree<T, TR>,
-    ObservableDataSource<T>
+public interface GenericObservableDataTree<out T, out TR : GenericObservableDataTree<T, TR>> :
+    GenericDataTree<T, TR>, ObservableDataSource<T>, AutoCloseable {
+
+    /**
+     * A scope that is used to propagate updates. When this scope is closed, no new updates could arrive.
+     */
+    public val updatesScope: CoroutineScope
+
+    /**
+     * Close this data tree updates channel
+     */
+    override fun close() {
+        updatesScope.cancel()
+    }
+}

 public typealias ObservableDataTree<T> = GenericObservableDataTree<T, GenericObservableDataTree<T, *>>

-public fun <T> DataTree<T>.updates(): Flow<NamedData<T>> = if (this is GenericObservableDataTree<T,*>) updates() else emptyFlow()
+/**
+ * Check if the [DataTree] is observable
+ */
+public fun <T> DataTree<T>.isObservable(): Boolean {
+    contract {
+        returns(true) implies (this@isObservable is GenericObservableDataTree<T, *>)
+    }
+    return this is GenericObservableDataTree<T, *>
+}

+/**
+ * Wait for this data tree to stop spawning updates (updatesScope is closed).
+ * If this [DataTree] is not observable, return immediately.
+ */
+public suspend fun <T> DataTree<T>.awaitClose() {
+    if (isObservable()) {
+        updatesScope.coroutineContext[Job]?.join()
+    }
+}
+
+public fun <T> DataTree<T>.updates(): Flow<NamedData<T>> =
+    if (this is GenericObservableDataTree<T, *>) updates() else emptyFlow()
+
 public fun interface DataSink<in T> {
-    public fun data(name: Name, data: Data<T>?)
+    public fun put(name: Name, data: Data<T>?)
 }

+@DFInternal
 public class DataTreeBuilder<T>(private val type: KType) : DataSink<T> {
     private val map = HashMap<Name, Data<T>>()

-    override fun data(name: Name, data: Data<T>?) {
+    override fun put(name: Name, data: Data<T>?) {
         if (data == null) {
             map.remove(name)
         } else {
@@ -174,6 +210,7 @@ public inline fun <T> DataTree(
 /**
  * Create and a data tree.
  */
+@OptIn(DFInternal::class)
 public inline fun <reified T> DataTree(
     generator: DataSink<T>.() -> Unit,
 ): DataTree<T> = DataTreeBuilder<T>(typeOf<T>()).apply(generator).build()
@@ -182,77 +219,88 @@ public inline fun <reified T> DataTree(
  * A mutable version of [GenericDataTree]
  */
 public interface MutableDataTree<T> : GenericObservableDataTree<T, MutableDataTree<T>>, DataSink<T> {
-    public val scope: CoroutineScope
-
     override var data: Data<T>?

     override val items: Map<NameToken, MutableDataTree<T>>

+    public fun getOrCreateItem(token: NameToken): MutableDataTree<T>
+
     public operator fun set(token: NameToken, data: Data<T>?)

-    override fun data(name: Name, data: Data<T>?): Unit = set(name, data)
+    override fun put(name: Name, data: Data<T>?): Unit = set(name, data)
 }

 public tailrec operator fun <T> MutableDataTree<T>.set(name: Name, data: Data<T>?): Unit {
     when (name.length) {
         0 -> this.data = data
         1 -> set(name.first(), data)
-        else -> items[name.first()]?.set(name.cutFirst(), data)
+        else -> getOrCreateItem(name.first())[name.cutFirst()] = data
     }
 }

-private class ObservableMutableDataTreeImpl<T>(
+private class MutableDataTreeImpl<T>(
     override val dataType: KType,
-    override val scope: CoroutineScope,
+    override val updatesScope: CoroutineScope,
 ) : MutableDataTree<T> {

     private val updates = MutableSharedFlow<NamedData<T>>()

     private val children = HashMap<NameToken, MutableDataTree<T>>()

     override var data: Data<T>? = null
         set(value) {
+            if (!updatesScope.isActive) error("Can't send updates to closed MutableDataTree")
             field = value
             if (value != null) {
-                scope.launch {
+                updatesScope.launch {
                     updates.emit(value.named(Name.EMPTY))
                 }
             }
         }

     override val items: Map<NameToken, MutableDataTree<T>> get() = children

+    override fun getOrCreateItem(token: NameToken): MutableDataTree<T> = children.getOrPut(token) {
+        MutableDataTreeImpl(dataType, updatesScope)
+    }
+
     override val self: MutableDataTree<T> get() = this

     override fun set(token: NameToken, data: Data<T>?) {
-        children.getOrPut(token) {
-            ObservableMutableDataTreeImpl<T>(dataType, scope).also { subTree ->
-                subTree.updates().onEach {
-                    updates.emit(it.named(token + it.name))
-                }.launchIn(scope)
-            }
-        }.data = data
+        if (!updatesScope.isActive) error("Can't send updates to closed MutableDataTree")
+        val subTree = getOrCreateItem(token)
+        subTree.updates().onEach {
+            updates.emit(it.named(token + it.name))
+        }.launchIn(updatesScope)
+        subTree.data = data
     }

-    override fun updates(): Flow<NamedData<T>> = flow {
-        //emit this node updates
-        updates.collect {
-            emit(it)
-        }
-    }
+    override fun updates(): Flow<NamedData<T>> = updates
 }

+/**
+ * Create a new [MutableDataTree]
+ *
+ * @param parentScope a [CoroutineScope] to control data propagation. By default uses [GlobalScope]
+ */
+@OptIn(DelicateCoroutinesApi::class)
 public fun <T> MutableDataTree(
     type: KType,
-    scope: CoroutineScope
-): MutableDataTree<T> = ObservableMutableDataTreeImpl<T>(type, scope)
+    parentScope: CoroutineScope = GlobalScope,
+): MutableDataTree<T> = MutableDataTreeImpl<T>(
+    type,
+    CoroutineScope(parentScope.coroutineContext + Job(parentScope.coroutineContext[Job]))
+)

 /**
  * Create and initialize a observable mutable data tree.
  */
+@OptIn(DelicateCoroutinesApi::class)
 public inline fun <reified T> MutableDataTree(
-    scope: CoroutineScope,
+    parentScope: CoroutineScope = GlobalScope,
     generator: MutableDataTree<T>.() -> Unit = {},
-): MutableDataTree<T> = MutableDataTree<T>(typeOf<T>(), scope).apply { generator() }
+): MutableDataTree<T> = MutableDataTree<T>(typeOf<T>(), parentScope).apply { generator() }

 //@DFInternal
 //public fun <T> ObservableDataTree(
@@ -262,18 +310,21 @@ public inline fun <reified T> MutableDataTree(
 //): ObservableDataTree<T> = MutableDataTree<T>(type, scope.coroutineContext).apply(generator)

 public inline fun <reified T> ObservableDataTree(
-    scope: CoroutineScope,
+    parentScope: CoroutineScope,
     generator: MutableDataTree<T>.() -> Unit = {},
-): ObservableDataTree<T> = MutableDataTree<T>(typeOf<T>(), scope).apply(generator)
+): ObservableDataTree<T> = MutableDataTree<T>(typeOf<T>(), parentScope).apply(generator)

 /**
  * Collect a [Sequence] into an observable tree with additional [updates]
  */
-public fun <T> Sequence<NamedData<T>>.toObservableTree(dataType: KType, scope: CoroutineScope, updates: Flow<NamedData<T>>): ObservableDataTree<T> =
-    MutableDataTree<T>(dataType, scope).apply {
-        emitAll(this@toObservableTree)
+public fun <T> Sequence<NamedData<T>>.toObservableTree(
+    dataType: KType,
+    parentScope: CoroutineScope,
+    updates: Flow<NamedData<T>>,
+): ObservableDataTree<T> = MutableDataTree<T>(dataType, parentScope).apply {
+    this.putAll(this@toObservableTree)
     updates.onEach {
-        data(it.name, it.data)
-    }.launchIn(scope)
+        put(it.name, it.data)
+    }.launchIn(updatesScope)
 }
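A lifecycle sketch for the reworked MutableDataTree (names as introduced above; the default GlobalScope parent is assumed). The tree owns a child Job of its parent scope, so close() or parent cancellation stops update propagation.

import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach

suspend fun lifecycleDemo() {
    val tree: MutableDataTree<Int> = MutableDataTree() // parentScope defaults to GlobalScope

    tree.updates().onEach { println("updated: ${it.name}") }
        .launchIn(tree.updatesScope)

    tree.wrap("a", 1) // emits an update through updatesScope
    tree.close()      // cancels updatesScope; subsequent writes throw an error
    tree.awaitClose() // joins the scope's Job; a non-observable tree would return at once
}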

View File

@@ -17,6 +17,7 @@ package space.kscience.dataforge.data

 import space.kscience.dataforge.meta.get
 import space.kscience.dataforge.meta.string
+import space.kscience.dataforge.misc.DFInternal

 public interface GroupRule {
     public fun <T : Any> gather(set: DataTree<T>): Map<String, DataTree<T>>
@@ -30,6 +31,7 @@ public interface GroupRule {
      * @param defaultTagValue
      * @return
      */
+    @OptIn(DFInternal::class)
     public fun byMetaValue(
         key: String,
         defaultTagValue: String,
@@ -42,7 +44,7 @@ public interface GroupRule {
         set.forEach { data ->
             val tagValue: String = data.meta[key]?.string ?: defaultTagValue
-            map.getOrPut(tagValue) { DataTreeBuilder(set.dataType) }.data(data.name, data.data)
+            map.getOrPut(tagValue) { DataTreeBuilder(set.dataType) }.put(data.name, data.data)
         }
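A grouping sketch per the signature above (hypothetical key and data, and assuming byMetaValue is reachable on the GroupRule companion as in the library): items are bucketed by the string value of their meta at key, falling back to defaultTagValue.

fun groupDemo(input: DataTree<Int>) {
    val byType = GroupRule.byMetaValue(key = "type", defaultTagValue = "unknown")
    val groups: Map<String, DataTree<Int>> = byType.gather(input)
    groups.forEach { (tag, tree) ->
        println("$tag -> ${tree.asSequence().count()} items")
    }
}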

View File

@@ -12,16 +12,12 @@ import space.kscience.dataforge.names.isEmpty
 import space.kscience.dataforge.names.plus

-public fun <T> DataSink<T>.data(value: NamedData<T>) {
-    data(value.name, value.data)
-}
-
-public fun <T> DataSink<T>.emitAll(sequence: Sequence<NamedData<T>>) {
-    sequence.forEach { data(it) }
+public fun <T> DataSink<T>.put(value: NamedData<T>) {
+    put(value.name, value.data)
 }

 public fun <T> DataSink<T>.branch(dataTree: DataTree<T>) {
-    emitAll(dataTree.asSequence())
+    putAll(dataTree.asSequence())
 }

 public inline fun <T> DataSink<T>.branch(
@@ -32,7 +28,7 @@ public inline fun <T> DataSink<T>.branch(
         apply(block)
     } else {
         val proxyDataSink = DataSink { nameWithoutPrefix, data ->
-            this.data(prefix + nameWithoutPrefix, data)
+            this.put(prefix + nameWithoutPrefix, data)
         }

         proxyDataSink.apply(block)
@@ -45,69 +41,69 @@ public inline fun <T> DataSink<T>.branch(
 ): Unit = branch(prefix.asName(), block)

-public fun <T> DataSink<T>.data(name: String, value: Data<T>) {
-    data(Name.parse(name), value)
+public fun <T> DataSink<T>.put(name: String, value: Data<T>) {
+    put(Name.parse(name), value)
 }

 public fun <T> DataSink<T>.branch(name: Name, set: DataTree<T>) {
-    branch(name) { emitAll(set.asSequence()) }
+    branch(name) { putAll(set.asSequence()) }
 }

 public fun <T> DataSink<T>.branch(name: String, set: DataTree<T>) {
-    branch(Name.parse(name)) { emitAll(set.asSequence()) }
+    branch(Name.parse(name)) { putAll(set.asSequence()) }
 }

 /**
  * Produce lazy [Data] and emit it into the [MutableDataTree]
  */
-public inline fun <reified T> DataSink<T>.data(
+public inline fun <reified T> DataSink<T>.put(
     name: String,
     meta: Meta = Meta.EMPTY,
     noinline producer: suspend () -> T,
 ) {
     val data = Data(meta, block = producer)
-    data(name, data)
+    put(name, data)
 }

-public inline fun <reified T> DataSink<T>.data(
+public inline fun <reified T> DataSink<T>.put(
     name: Name,
     meta: Meta = Meta.EMPTY,
     noinline producer: suspend () -> T,
 ) {
     val data = Data(meta, block = producer)
-    data(name, data)
+    put(name, data)
 }

 /**
  * Emit static data with the fixed value
  */
-public inline fun <reified T> DataSink<T>.static(
+public inline fun <reified T> DataSink<T>.wrap(
     name: String,
     data: T,
     meta: Meta = Meta.EMPTY,
-): Unit = data(name, Data.static(data, meta))
+): Unit = put(name, Data.static(data, meta))

-public inline fun <reified T> DataSink<T>.static(
+public inline fun <reified T> DataSink<T>.wrap(
     name: Name,
     data: T,
     meta: Meta = Meta.EMPTY,
-): Unit = data(name, Data.static(data, meta))
+): Unit = put(name, Data.static(data, meta))

-public inline fun <reified T> DataSink<T>.static(
+public inline fun <reified T> DataSink<T>.wrap(
     name: String,
     data: T,
     mutableMeta: MutableMeta.() -> Unit,
-): Unit = data(Name.parse(name), Data.static(data, Meta(mutableMeta)))
+): Unit = put(Name.parse(name), Data.static(data, Meta(mutableMeta)))

-public fun <T> DataSink<T>.populateFrom(sequence: Sequence<NamedData<T>>) {
+public fun <T> DataSink<T>.putAll(sequence: Sequence<NamedData<T>>) {
     sequence.forEach {
-        data(it.name, it.data)
+        put(it.name, it.data)
     }
 }

-public fun <T> DataSink<T>.populateFrom(tree: DataTree<T>) {
-    populateFrom(tree.asSequence())
+public fun <T> DataSink<T>.putAll(tree: DataTree<T>) {
+    this.putAll(tree.asSequence())
 }
@@ -115,13 +111,22 @@ public fun <T> DataSink<T>.populateFrom(tree: DataTree<T>) {
  * Update data with given node data and meta with node meta.
  */
 @DFExperimental
-public fun <T> MutableDataTree<T>.populateFrom(flow: ObservableDataSource<T>): Job = flow.updates().onEach {
-    //TODO check if the place is occupied
-    data(it.name, it.data)
-}.launchIn(scope)
+public fun <T> MutableDataTree<T>.putAll(source: DataTree<T>) {
+    source.forEach {
+        put(it.name, it.data)
+    }
+}

-//public fun <T > DataSetBuilder<T>.populateFrom(flow: Flow<NamedData<T>>) {
-//    flow.collect {
-//        data(it.name, it.data)
-//    }
-//}
+/**
+ * Copy given data set and mirror its changes to this [DataSink] in [this@setAndObserve]. Returns an update [Job]
+ */
+public fun <T : Any> DataSink<T>.watchBranch(
+    name: Name,
+    dataSet: ObservableDataTree<T>,
+): Job {
+    branch(name, dataSet)
+    return dataSet.updates().onEach {
+        put(name + it.name, it.data)
+    }.launchIn(dataSet.updatesScope)
+}
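A combined sketch of the renamed builders (wrap, putAll) and the new watchBranch (hypothetical scope and values):

import kotlinx.coroutines.CoroutineScope
import space.kscience.dataforge.names.asName

fun mirrorDemo(parent: CoroutineScope) {
    val source: MutableDataTree<Int> = MutableDataTree(parent)

    val mirror: MutableDataTree<Int> = MutableDataTree(parent) {
        wrap("constant", 42)                 // fixed value, formerly `static`
        watchBranch("live".asName(), source) // copies `source` and mirrors its updates
    }

    source.wrap("value", 1) // appears as "live.value" in `mirror`
}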

View File

@@ -194,7 +194,7 @@ public suspend fun <T, R> DataTree<T>.transform(
         val d = Data(outputType, newMeta, coroutineContext, listOf(namedData)) {
             block(namedData.awaitWithMeta())
         }
-        data(namedData.name, d)
+        put(namedData.name, d)
     }
 }

View File

@@ -1,10 +1,6 @@
 package space.kscience.dataforge.data

-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.Job
-import kotlinx.coroutines.launch
 import space.kscience.dataforge.names.Name
-import space.kscience.dataforge.names.plus

 /**
@@ -12,7 +8,7 @@ import space.kscience.dataforge.names.plus
  */
 context(DataSink<T>)
 public infix fun <T : Any> String.put(data: Data<T>): Unit =
-    data(Name.parse(this), data)
+    put(Name.parse(this), data)

 /**
  * Append node
@@ -29,16 +25,3 @@ public infix fun <T : Any> String.put(
     block: DataSink<T>.() -> Unit,
 ): Unit = branch(Name.parse(this), block)

-/**
- * Copy given data set and mirror its changes to this [LegacyDataTreeBuilder] in [this@setAndObserve]. Returns an update [Job]
- */
-context(DataSink<T>)
-public fun <T : Any> CoroutineScope.setAndWatch(
-    name: Name,
-    dataSet: DataTree<T>,
-): Job = launch {
-    branch(name, dataSet)
-    dataSet.updates().collect {
-        data(name + it.name, it.data)
-    }
-}

View File

@@ -1,7 +1,7 @@
 package space.kscience.dataforge.data

-import kotlinx.coroutines.ExperimentalCoroutinesApi
 import kotlinx.coroutines.delay
+import kotlinx.coroutines.runBlocking
 import kotlinx.coroutines.test.runTest
 import org.junit.jupiter.api.Test
 import space.kscience.dataforge.actions.Action
@@ -10,13 +10,13 @@ import space.kscience.dataforge.actions.mapping
 import space.kscience.dataforge.misc.DFExperimental
 import kotlin.test.assertEquals

-@OptIn(DFExperimental::class, ExperimentalCoroutinesApi::class)
+@OptIn(DFExperimental::class)
 internal class ActionsTest {
     @Test
     fun testStaticMapAction() = runTest {
         val data: DataTree<Int> = DataTree {
             repeat(10) {
-                static(it.toString(), it)
+                wrap(it.toString(), it)
             }
         }
@@ -28,20 +28,24 @@ internal class ActionsTest {
     }

     @Test
-    fun testDynamicMapAction() = runTest {
-        val data: MutableDataTree<Int> = MutableDataTree(this)
+    fun testDynamicMapAction() = runBlocking {
+        val source: MutableDataTree<Int> = MutableDataTree()

         val plusOne = Action.mapping<Int, Int> {
             result { it + 1 }
         }

-        val result = plusOne(data)
+        val result = plusOne(source)

         repeat(10) {
-            data.static(it.toString(), it)
+            source.wrap(it.toString(), it)
         }

-        delay(20)
+        delay(10)
+
+        source.close()
+        result.awaitClose()

         assertEquals(2, result["1"]?.await())
     }

View File

@@ -1,6 +1,8 @@
 package space.kscience.dataforge.data

-import kotlinx.coroutines.*
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.test.runTest
 import space.kscience.dataforge.misc.DFExperimental
 import space.kscience.dataforge.names.asName
 import kotlin.test.Test
@@ -9,26 +11,25 @@ import kotlin.test.assertEquals
 internal class DataTreeBuilderTest {
     @Test
-    fun testTreeBuild() = runBlocking {
+    fun testTreeBuild() = runTest {
         val node = DataTree<Any> {
             "primary" put {
-                static("a", "a")
-                static("b", "b")
+                wrap("a", "a")
+                wrap("b", "b")
             }
-            static("c.d", "c.d")
-            static("c.f", "c.f")
+            wrap("c.d", "c.d")
+            wrap("c.f", "c.f")
         }
-        runBlocking {
-            assertEquals("a", node["primary.a"]?.await())
-            assertEquals("b", node["primary.b"]?.await())
-            assertEquals("c.d", node["c.d"]?.await())
-            assertEquals("c.f", node["c.f"]?.await())
-        }
+        assertEquals("a", node["primary.a"]?.await())
+        assertEquals("b", node["primary.b"]?.await())
+        assertEquals("c.d", node["c.d"]?.await())
+        assertEquals("c.f", node["c.f"]?.await())
     }

     @OptIn(DFExperimental::class)
     @Test
-    fun testDataUpdate() = runBlocking {
+    fun testDataUpdate() = runTest {
         val updateData = DataTree<Any> {
             "update" put {
                 "a" put Data.static("a")
@@ -38,54 +39,30 @@ internal class DataTreeBuilderTest {
         val node = DataTree<Any> {
             "primary" put {
-                static("a", "a")
-                static("b", "b")
+                wrap("a", "a")
+                wrap("b", "b")
             }
-            static("root", "root")
-            populateFrom(updateData)
+            wrap("root", "root")
+            putAll(updateData)
         }
-        runBlocking {
-            assertEquals("a", node["update.a"]?.await())
-            assertEquals("a", node["primary.a"]?.await())
-        }
+        assertEquals("a", node["update.a"]?.await())
+        assertEquals("a", node["primary.a"]?.await())
     }

     @Test
     fun testDynamicUpdates() = runBlocking {
-        try {
-            lateinit var updateJob: Job
-            supervisorScope {
-                val subNode = ObservableDataTree<Int>(this) {
-                    updateJob = launch {
-                        repeat(10) {
-                            delay(10)
-                            static("value", it)
-                        }
-                        delay(10)
-                    }
-                }
-                launch {
-                    subNode.updates().collect {
-                        println(it)
-                    }
-                }
-                val rootNode = ObservableDataTree<Int>(this) {
-                    setAndWatch("sub".asName(), subNode)
-                }
-                launch {
-                    rootNode.updates().collect {
-                        println(it)
-                    }
-                }
-                updateJob.join()
-                assertEquals(9, rootNode["sub.value"]?.await())
-                cancel()
-            }
-        } catch (t: Throwable) {
-            if (t !is CancellationException) throw t
+        val subNode = MutableDataTree<Int>()
+
+        val rootNode = MutableDataTree<Int> {
+            watchBranch("sub".asName(), subNode)
+        }
+
+        repeat(10) {
+            subNode.wrap("value[$it]", it)
         }
+        delay(20)
+        assertEquals(9, rootNode["sub.value[9]"]?.await())
     }
 }

View File

@@ -88,12 +88,17 @@ public fun <T : Any> Task(
     workspace: Workspace,
     taskName: Name,
     taskMeta: Meta,
-): TaskResult<T> = withContext(GoalExecutionRestriction() + workspace.goalLogger) {
+): TaskResult<T> {
     //TODO use safe builder and check for external data on add and detects cycles
-    val dataset = MutableDataTree<T>(resultType, this).apply {
-        TaskResultBuilder(workspace, taskName, taskMeta, this).apply { builder() }
+    val dataset = MutableDataTree<T>(resultType, workspace.context).apply {
+        TaskResultBuilder(workspace, taskName, taskMeta, this).apply {
+            withContext(GoalExecutionRestriction() + workspace.goalLogger) {
+                builder()
+            }
+        }
     }
-    workspace.wrapResult(dataset, taskName, taskMeta)
+    return workspace.wrapResult(dataset, taskName, taskMeta)
 }

View File

@@ -2,6 +2,7 @@ package space.kscience.dataforge.workspace

 import kotlinx.coroutines.CoroutineScope
 import kotlinx.coroutines.Job
+import kotlinx.coroutines.joinAll
 import kotlinx.coroutines.launch
 import space.kscience.dataforge.data.ObservableDataTree
 import space.kscience.dataforge.data.asSequence
@@ -32,8 +33,9 @@ public fun <T> Workspace.wrapResult(data: ObservableDataTree<T>, taskName: Name,
  * Start computation for all data elements of this node.
  * The resulting [Job] is completed only when all of them are completed.
  */
-public fun TaskResult<*>.compute(scope: CoroutineScope): Job = scope.launch {
-    asSequence().forEach {
+public fun TaskResult<*>.launch(scope: CoroutineScope): Job {
+    val jobs = asSequence().map {
         it.data.launch(scope)
-    }
+    }.toList()
+    return scope.launch { jobs.joinAll() }
 }
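A sketch of the renamed launch (formerly compute): each element goal is started eagerly and the returned Job completes only when all of them do.

import kotlinx.coroutines.coroutineScope

suspend fun awaitTask(result: TaskResult<*>) = coroutineScope {
    result.launch(this).join() // wait for every data computation of this task result
}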

View File

@@ -102,7 +102,7 @@ public inline fun <T : Any, reified R : Any> TaskContainer.action(
     noinline descriptorBuilder: MetaDescriptorBuilder.() -> Unit = {},
 ): PropertyDelegateProvider<Any?, ReadOnlyProperty<Any?, TaskReference<R>>> =
     task(MetaDescriptor(descriptorBuilder)) {
-        result(action.execute(workspace.context, from(selector), taskMeta.copy(metaTransform)))
+        result(action.execute(from(selector), taskMeta.copy(metaTransform)))
     }

 public class WorkspaceBuilder(

View File

@@ -93,7 +93,7 @@ public suspend inline fun <T, reified R> TaskResultBuilder<R>.transformEach(
             action(it, data.name, meta)
         }
-        data(data.name, res)
+        put(data.name, res)
     }
 }
@@ -113,7 +113,7 @@ public suspend inline fun <T, reified R> TaskResultBuilder<R>.actionFrom(
     action: Action<T, R>,
     dependencyMeta: Meta = defaultDependencyMeta,
 ) {
-    branch(action.execute(workspace.context, from(selector, dependencyMeta), dependencyMeta))
+    branch(action.execute(from(selector, dependencyMeta), dependencyMeta))
 }

View File

@@ -11,7 +11,6 @@ import space.kscience.dataforge.names.Name
 import space.kscience.dataforge.names.NameToken
 import space.kscience.dataforge.names.asName
 import space.kscience.dataforge.names.plus
-import space.kscience.dataforge.workspace.FileData.defaultPathToName
 import java.nio.file.Files
 import java.nio.file.Path
 import java.nio.file.StandardWatchEventKinds
@@ -36,20 +35,6 @@ public object FileData {
     public const val DF_FILE_EXTENSION: String = "df"
     public val DEFAULT_IGNORE_EXTENSIONS: Set<String> = setOf(DF_FILE_EXTENSION)
-
-    /**
-     * Transform file name into DataForg name. Ignores DataForge file extensions.
-     */
-    public val defaultPathToName: (Path) -> Name = { path ->
-        Name(
-            path.map { segment ->
-                if (segment.isRegularFile() && segment.extension in DEFAULT_IGNORE_EXTENSIONS) {
-                    NameToken(path.nameWithoutExtension)
-                } else {
-                    NameToken(path.name)
-                }
-            }
-        )
-    }
 }
@@ -77,51 +62,68 @@ public fun IOPlugin.readFileData(
     )
 }

-public fun DataSink<Binary>.file(io: IOPlugin, path: Path, name: Name) {
+public fun DataSink<Binary>.file(io: IOPlugin, name: Name, path: Path) {
     if (!path.isRegularFile()) error("Only regular files could be handled by this function")
-    data(name, io.readFileData(path))
+    put(name, io.readFileData(path))
 }

 public fun DataSink<Binary>.directory(
     io: IOPlugin,
+    name: Name,
     path: Path,
-    pathToName: (Path) -> Name = defaultPathToName,
 ) {
     if (!path.isDirectory()) error("Only directories could be handled by this function")
-    val metaFile = path.resolve(IOPlugin.META_FILE_NAME)
-    val dataFile = path.resolve(IOPlugin.DATA_FILE_NAME)
+
     //process root data
-    if (metaFile.exists() || dataFile.exists()) {
-        data(
-            Name.EMPTY,
-            StaticData(
-                typeOf<Binary>(),
-                dataFile.takeIf { it.exists() }?.asBinary() ?: Binary.EMPTY,
-                io.readMetaFileOrNull(metaFile) ?: Meta.EMPTY
-            )
-        )
-    }
+    var dataBinary: Binary? = null
+    var meta: Meta? = null

     Files.list(path).forEach { childPath ->
         val fileName = childPath.fileName.toString()
-        if (!fileName.startsWith("@")) {
-            files(io, childPath, pathToName)
+        if (fileName == IOPlugin.DATA_FILE_NAME) {
+            dataBinary = childPath.asBinary()
+        } else if (fileName.startsWith(IOPlugin.META_FILE_NAME)) {
+            meta = io.readMetaFileOrNull(childPath)
+        } else if (!fileName.startsWith("@")) {
+            val token = if (childPath.isRegularFile() && childPath.extension in FileData.DEFAULT_IGNORE_EXTENSIONS) {
+                NameToken(childPath.nameWithoutExtension)
+            } else {
+                NameToken(childPath.name)
+            }
+            files(io, name + token, childPath)
         }
     }
+
+    //set data if it is relevant
+    if (dataBinary != null || meta != null) {
+        put(
+            name,
+            StaticData(
+                typeOf<Binary>(),
+                dataBinary ?: Binary.EMPTY,
+                meta ?: Meta.EMPTY
+            )
+        )
+    }
 }

-public fun DataSink<Binary>.files(io: IOPlugin, path: Path, pathToName: (Path) -> Name = defaultPathToName) {
+public fun DataSink<Binary>.files(
+    io: IOPlugin,
+    name: Name,
+    path: Path,
+) {
     if (path.isRegularFile() && path.extension == "zip") {
         //Using explicit Zip file system to avoid bizarre compatibility bugs
         val fsProvider = FileSystemProvider.installedProviders().find { it.scheme == "jar" }
             ?: error("Zip file system provider not found")
         val fs = fsProvider.newFileSystem(path, mapOf("create" to "true"))

-        return files(io, fs.rootDirectories.first(), pathToName)
+        files(io, name, fs.rootDirectories.first())
     }
     if (path.isRegularFile()) {
-        file(io, path, pathToName(path))
+        file(io, name, path)
     } else {
-        directory(io, path, pathToName)
+        directory(io, name, path)
     }
 }
@@ -132,11 +134,11 @@ private fun Path.toName() = Name(map { NameToken.parse(it.nameWithoutExtension)
 @DFExperimental
 public fun DataSink<Binary>.monitorFiles(
     io: IOPlugin,
+    name: Name,
     path: Path,
-    pathToName: (Path) -> Name = defaultPathToName,
     scope: CoroutineScope = io.context,
 ): Job {
-    files(io, path, pathToName)
+    files(io, name, path)
     return scope.launch(Dispatchers.IO) {
         val watchService = path.fileSystem.newWatchService()
@@ -153,11 +155,11 @@ public fun DataSink<Binary>.monitorFiles(
             for (event: WatchEvent<*> in key.pollEvents()) {
                 val eventPath = event.context() as Path
                 if (event.kind() == StandardWatchEventKinds.ENTRY_DELETE) {
-                    data(eventPath.toName(), null)
+                    put(eventPath.toName(), null)
                 } else {
                     val fileName = eventPath.fileName.toString()
                     if (!fileName.startsWith("@")) {
-                        files(io, eventPath, pathToName)
+                        files(io, name, eventPath)
                     }
                 }
             }
@@ -179,18 +181,14 @@ public suspend fun <T : Any> IOPlugin.writeDataDirectory(
     dataSet: DataTree<T>,
     format: IOWriter<T>,
     envelopeFormat: EnvelopeFormat? = null,
-    nameToPath: (name: Name, data: Data<T>) -> Path = { name, _ ->
-        Path(name.tokens.joinToString("/") { token -> token.toStringUnescaped() })
-    },
-) {
-    withContext(Dispatchers.IO) {
+): Unit = withContext(Dispatchers.IO) {
     if (!Files.exists(path)) {
         Files.createDirectories(path)
     } else if (!Files.isDirectory(path)) {
         error("Can't write a node into file")
     }
     dataSet.forEach { (name, data) ->
-        val childPath = path.resolve(nameToPath(name, data))
+        val childPath = path.resolve(name.tokens.joinToString("/") { token -> token.toStringUnescaped() })
         childPath.parent.createDirectories()
         val envelope = data.toEnvelope(format)
         if (envelopeFormat != null) {
@@ -202,7 +200,6 @@ public suspend fun <T : Any> IOPlugin.writeDataDirectory(
     dataSet.meta?.let { writeMetaFile(path, it) }
 }
-}

 /**
  * @param resources The names of the resources to read.
@@ -212,15 +209,12 @@ public suspend fun <T : Any> IOPlugin.writeDataDirectory(
 public fun DataSink<Binary>.resources(
     io: IOPlugin,
     vararg resources: String,
-    pathToName: (Path) -> Name = defaultPathToName,
     classLoader: ClassLoader = Thread.currentThread().contextClassLoader,
 ) {
     resources.forEach { resource ->
         val path = classLoader.getResource(resource)?.toURI()?.toPath() ?: error(
             "Resource with name $resource is not resolved"
         )
-        branch(resource.asName()) {
-            files(io, path, pathToName)
-        }
+        files(io, resource.asName(), path)
     }
 }
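A reading sketch for the name-first file API (hypothetical paths): a directory or zip is mounted under an explicit tree name rather than through a pathToName transformation.

import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.io.Binary
import space.kscience.dataforge.io.IOPlugin
import space.kscience.dataforge.names.Name
import java.nio.file.Path

fun readDemo(io: IOPlugin, dir: Path): DataTree<Binary> = DataTree {
    files(io, Name.EMPTY, dir)                                // whole directory at the root
    files(io, Name.parse("archive"), dir.resolve("data.zip")) // a zip mounted as a branch
}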

View File

@@ -3,67 +3,36 @@ package space.kscience.dataforge.workspace

 import kotlinx.coroutines.Dispatchers
 import kotlinx.coroutines.withContext
 import space.kscience.dataforge.data.DataTree
-import space.kscience.dataforge.io.*
+import space.kscience.dataforge.io.EnvelopeFormat
+import space.kscience.dataforge.io.IOPlugin
+import space.kscience.dataforge.io.IOWriter
 import space.kscience.dataforge.misc.DFExperimental
-import java.nio.file.Files
 import java.nio.file.Path
-import java.nio.file.StandardOpenOption
-import java.util.zip.ZipEntry
-import java.util.zip.ZipOutputStream
-
-private suspend fun <T : Any> ZipOutputStream.writeNode(
-    name: String,
-    tree: DataTree<T>,
-    dataFormat: IOFormat<T>,
-    envelopeFormat: EnvelopeFormat = TaggedEnvelopeFormat,
-): Unit = withContext(Dispatchers.IO) {
-    //TODO add directory-based envelope writer
-    tree.data?.let {
-        val envelope = it.toEnvelope(dataFormat)
-        val entry = ZipEntry(name)
-        putNextEntry(entry)
-        //TODO remove additional copy
-        val bytes = ByteArray {
-            writeWith(envelopeFormat, envelope)
-        }
-        write(bytes)
-    }
-
-    val entry = ZipEntry("$name/")
-    putNextEntry(entry)
-    closeEntry()
-
-    tree.items.forEach { (token, item) ->
-        val childName = "$name/$token"
-        writeNode(childName, item, dataFormat, envelopeFormat)
-    }
-}
+import java.nio.file.spi.FileSystemProvider
+import kotlin.io.path.exists
+import kotlin.io.path.extension

 /**
  * Write this [DataTree] as a zip archive
  */
 @DFExperimental
-public suspend fun <T : Any> DataTree<T>.writeZip(
+public suspend fun <T : Any> IOPlugin.writeZip(
     path: Path,
-    format: IOFormat<T>,
-    envelopeFormat: EnvelopeFormat = TaggedEnvelopeFormat,
+    dataSet: DataTree<T>,
+    format: IOWriter<T>,
+    envelopeFormat: EnvelopeFormat? = null,
 ): Unit = withContext(Dispatchers.IO) {
-    val actualFile = if (path.toString().endsWith(".zip")) {
+    if (path.exists()) error("Can't override existing zip data file $path")
+    val actualFile = if (path.extension == "zip") {
         path
     } else {
         path.resolveSibling(path.fileName.toString() + ".zip")
     }
-    val fos = Files.newOutputStream(
-        actualFile,
-        StandardOpenOption.WRITE,
-        StandardOpenOption.CREATE,
-        StandardOpenOption.TRUNCATE_EXISTING
-    )
-    val zos = ZipOutputStream(fos)
-    zos.use {
-        it.writeNode("", this@writeZip, format, envelopeFormat)
+    val fsProvider = FileSystemProvider.installedProviders().find { it.scheme == "jar" }
+        ?: error("Zip file system provider not found")
+    //val fs = FileSystems.newFileSystem(actualFile, mapOf("create" to true))
+    val fs = fsProvider.newFileSystem(actualFile, mapOf("create" to true))
+    fs.use {
+        writeDataDirectory(fs.rootDirectories.first(), dataSet, format, envelopeFormat)
     }
 }
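A round-trip sketch for the FileSystemProvider-based writeZip (StringIOFormat and paths follow the tests below; the target must not already exist):

import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.io.*
import space.kscience.dataforge.names.Name
import java.nio.file.Path

suspend fun zipRoundTrip(io: IOPlugin, tree: DataTree<String>, target: Path) {
    io.writeZip(target, tree, StringIOFormat)                 // writes target (or target.zip)
    val restored = DataTree { files(io, Name.EMPTY, target) } // read back through the zip FS
    println(restored.asSequence().count())
}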

View File

@@ -1,17 +1,16 @@
 package space.kscience.dataforge.workspace

-import kotlinx.coroutines.ExperimentalCoroutinesApi
 import kotlinx.coroutines.coroutineScope
 import kotlinx.coroutines.test.runTest
 import org.junit.jupiter.api.Test
-import space.kscience.dataforge.data.static
+import space.kscience.dataforge.data.wrap
 import space.kscience.dataforge.meta.Meta
 import space.kscience.dataforge.meta.boolean
 import space.kscience.dataforge.meta.get
 import space.kscience.dataforge.misc.DFExperimental
 import kotlin.test.assertEquals

-@OptIn(ExperimentalCoroutinesApi::class, DFExperimental::class)
+@OptIn(DFExperimental::class)
 internal class CachingWorkspaceTest {

     @Test
@@ -23,7 +22,7 @@ internal class CachingWorkspaceTest {
             data {
                 //statically initialize data
                 repeat(5) {
-                    static("myData[$it]", it)
+                    wrap("myData[$it]", it)
                 }
             }
@@ -51,13 +50,15 @@ internal class CachingWorkspaceTest {
         val secondA = workspace.produce("doSecond")
         val secondB = workspace.produce("doSecond", Meta { "flag" put true })
         val secondC = workspace.produce("doSecond")
+        //use coroutineScope to wait for the result
         coroutineScope {
-            first.compute(this)
-            secondA.compute(this)
-            secondB.compute(this)
+            first.launch(this)
+            secondA.launch(this)
+            secondB.launch(this)
             //repeat to check caching
-            secondC.compute(this)
+            secondC.launch(this)
         }

         assertEquals(10, firstCounter)
         assertEquals(10, secondCounter)
     }

View File

@@ -20,13 +20,13 @@ class DataPropagationTestPlugin : WorkspacePlugin() {
         val result: Data<Int> = selectedData.foldToData(0) { result, data ->
             result + data.value
         }
-        data("result", result)
+        put("result", result)
     }

     val singleData by task<Int> {
         workspace.data.filterByType<Int>()["myData[12]"]?.let {
-            data("result", it)
+            put("result", it)
         }
     }
@@ -47,7 +47,7 @@ class DataPropagationTest {
         }
         data {
             repeat(100) {
-                static("myData[$it]", it)
+                wrap("myData[$it]", it)
             }
         }
     }

View File

@@ -1,6 +1,5 @@
 package space.kscience.dataforge.workspace

-import kotlinx.coroutines.runBlocking
 import kotlinx.coroutines.test.runTest
 import kotlinx.io.Sink
 import kotlinx.io.Source
@@ -13,7 +12,9 @@ import space.kscience.dataforge.io.*
 import space.kscience.dataforge.io.yaml.YamlPlugin
 import space.kscience.dataforge.meta.get
 import space.kscience.dataforge.misc.DFExperimental
+import space.kscience.dataforge.names.Name
 import java.nio.file.Files
+import kotlin.io.path.deleteExisting
 import kotlin.io.path.fileSize
 import kotlin.io.path.toPath
 import kotlin.test.Test
@@ -23,11 +24,11 @@ import kotlin.test.assertEquals
 class FileDataTest {
     val dataNode = DataTree<String> {
         branch("dir") {
-            static("a", "Some string") {
+            wrap("a", "Some string") {
                 "content" put "Some string"
             }
         }
-        static("b", "root data")
+        wrap("b", "root data")
 //        meta {
 //            "content" put "This is root meta node"
 //        }
@@ -45,18 +46,18 @@ class FileDataTest {
     @Test
     @DFExperimental
-    fun testDataWriteRead() = with(Global.io) {
+    fun testDataWriteRead() = runTest {
+        val io = Global.io
         val dir = Files.createTempDirectory("df_data_node")
-        runBlocking {
-            writeDataDirectory(dir, dataNode, StringIOFormat)
-            println(dir.toUri().toString())
-            val reconstructed = DataTree { files(io, dir) }
-                .transform { (_, value) -> value.toByteArray().decodeToString() }
-            assertEquals(dataNode["dir.a"]?.meta?.get("content"), reconstructed["dir.a"]?.meta?.get("content"))
-            assertEquals(dataNode["b"]?.await(), reconstructed["b"]?.await())
-        }
+        io.writeDataDirectory(dir, dataNode, StringIOFormat)
+        println(dir.toUri().toString())
+        val data = DataTree {
+            files(io, Name.EMPTY, dir)
+        }
+        val reconstructed = data.transform { (_, value) -> value.toByteArray().decodeToString() }
+        assertEquals(dataNode["dir.a"]?.meta?.get("content"), reconstructed["dir.a"]?.meta?.get("content"))
+        assertEquals(dataNode["b"]?.await(), reconstructed["b"]?.await())
     }

     @Test
@@ -64,9 +65,10 @@ class FileDataTest {
     fun testZipWriteRead() = runTest {
         val io = Global.io
         val zip = Files.createTempFile("df_data_node", ".zip")
-        dataNode.writeZip(zip, StringIOFormat)
+        zip.deleteExisting()
+        io.writeZip(zip, dataNode, StringIOFormat)
         println(zip.toUri().toString())
-        val reconstructed = DataTree { files(io, zip) }
+        val reconstructed = DataTree { files(io, Name.EMPTY, zip) }
             .transform { (_, value) -> value.toByteArray().decodeToString() }
         assertEquals(dataNode["dir.a"]?.meta?.get("content"), reconstructed["dir.a"]?.meta?.get("content"))
         assertEquals(dataNode["b"]?.await(), reconstructed["b"]?.await())

View File

@@ -3,7 +3,7 @@ package space.kscience.dataforge.workspace

 import kotlinx.coroutines.ExperimentalCoroutinesApi
 import kotlinx.coroutines.test.runTest
 import org.junit.jupiter.api.Test
-import space.kscience.dataforge.data.static
+import space.kscience.dataforge.data.wrap
 import space.kscience.dataforge.misc.DFExperimental
 import java.nio.file.Files
@@ -16,7 +16,7 @@ class FileWorkspaceCacheTest {
             data {
                 //statically initialize data
                 repeat(5) {
-                    static("myData[$it]", it)
+                    wrap("myData[$it]", it)
                 }
             }
             fileCache(Files.createTempDirectory("dataforge-temporary-cache"))
@@ -26,7 +26,7 @@ class FileWorkspaceCacheTest {
             }
         }

-        workspace.produce("echo").compute(this)
+        workspace.produce("echo").launch(this)
     }
 }

View File

@@ -62,14 +62,14 @@ internal class SimpleWorkspaceTest {
         data {
             //statically initialize data
             repeat(100) {
-                static("myData[$it]", it)
+                wrap("myData[$it]", it)
             }
         }

         val filterOne by task<Int> {
             val name by taskMeta.string { error("Name field not defined") }
             from(testPluginFactory) { test }[name]?.let { source: Data<Int> ->
-                data(name, source)
+                put(name, source)
             }
         }
@@ -97,7 +97,7 @@ internal class SimpleWorkspaceTest {
             val newData: Data<Int> = data.combine(linearData[data.name]!!) { l, r ->
                 l + r
             }
-            data(data.name, newData)
+            put(data.name, newData)
         }
     }
@@ -106,7 +106,7 @@ internal class SimpleWorkspaceTest {
         val res = from(square).foldToData(0) { l, r ->
             l + r.value
         }
-        data("sum", res)
+        put("sum", res)
     }

     val averageByGroup by task<Int> {
@@ -116,13 +116,13 @@ internal class SimpleWorkspaceTest {
             l + r.value
         }
-        data("even", evenSum)
+        put("even", evenSum)

         val oddSum = workspace.data.filterByType<Int> { name, _, _ ->
             name.toString().toInt() % 2 == 1
         }.foldToData(0) { l, r ->
             l + r.value
         }
-        data("odd", oddSum)
+        put("odd", oddSum)
     }

     val delta by task<Int> {
@@ -132,7 +132,7 @@ internal class SimpleWorkspaceTest {
         val res = even.combine(odd) { l, r ->
             l - r
         }
-        data("res", res)
+        put("res", res)
     }

     val customPipe by task<Int> {
@@ -140,7 +140,7 @@ internal class SimpleWorkspaceTest {
         val meta = data.meta.toMutableMeta().apply {
             "newValue" put 22
         }
-        data(data.name + "new", data.transform { (data.meta["value"].int ?: 0) + it })
+        put(data.name + "new", data.transform { (data.meta["value"].int ?: 0) + it })
     }
 }