[WIP] change data structure

This commit is contained in:
Alexander Nozik 2024-01-29 22:10:06 +03:00
parent 5fec0518d4
commit 90999f424f
30 changed files with 316 additions and 398 deletions

View File

@ -10,7 +10,7 @@ import space.kscience.dataforge.misc.DFExperimental
/** /**
* A simple data transformation on a data node. Actions should avoid doing actual dependency evaluation in [execute]. * A simple data transformation on a data node. Actions should avoid doing actual dependency evaluation in [execute].
*/ */
public fun interface Action<T : Any, R : Any> { public fun interface Action<T, R> {
/** /**
* Transform the data in the node, producing a new node. By default, it is assumed that all calculations are lazy * Transform the data in the node, producing a new node. By default, it is assumed that all calculations are lazy
@ -24,7 +24,7 @@ public fun interface Action<T : Any, R : Any> {
/** /**
* A convenience method to transform data using given [action] * A convenience method to transform data using given [action]
*/ */
public fun <T : Any, R : Any> DataTree<T>.transform( public fun <T, R> DataTree<T>.transform(
action: Action<T, R>, action: Action<T, R>,
scope: CoroutineScope, scope: CoroutineScope,
meta: Meta = Meta.EMPTY, meta: Meta = Meta.EMPTY,
@ -33,11 +33,11 @@ public fun <T : Any, R : Any> DataTree<T>.transform(
/** /**
* Action composition. The result is terminal if one of its parts is terminal * Action composition. The result is terminal if one of its parts is terminal
*/ */
public infix fun <T : Any, I : Any, R : Any> Action<T, I>.then(action: Action<I, R>): Action<T, R> = public infix fun <T , I, R> Action<T, I>.then(action: Action<I, R>): Action<T, R> =
Action { scope, dataSet, meta -> action.execute(scope, this@then.execute(scope, dataSet, meta), meta) } Action { scope, dataSet, meta -> action.execute(scope, this@then.execute(scope, dataSet, meta), meta) }
@DFExperimental @DFExperimental
public suspend operator fun <T : Any, R : Any> Action<T, R>.invoke( public suspend operator fun <T, R> Action<T, R>.invoke(
dataSet: DataTree<T>, dataSet: DataTree<T>,
meta: Meta = Meta.EMPTY, meta: Meta = Meta.EMPTY,
): DataTree<R> = coroutineScope { execute(this, dataSet, meta) } ): DataTree<R> = coroutineScope { execute(this, dataSet, meta) }

View File

@ -79,7 +79,7 @@ internal class MapAction<T : Any, R : Any>(
builder.result(env, data.await()) builder.result(env, data.await())
} }
//setting the data node //setting the data node
emit(newName, newData) data(newName, newData)
} }
override fun DataSink<R>.generate(data: DataTree<T>, meta: Meta) { override fun DataSink<R>.generate(data: DataTree<T>, meta: Meta) {

View File

@ -103,7 +103,7 @@ internal class ReduceAction<T : Any, R : Any>(
meta = groupMeta meta = groupMeta
) { group.result.invoke(env, it) } ) { group.result.invoke(env, it) }
emit(env.name, res) data(env.name, res)
} }
} }
} }

View File

@ -64,7 +64,7 @@ internal class SplitAction<T : Any, R : Any>(
).apply(rule) ).apply(rule)
//data.map<R>(outputType, meta = env.meta) { env.result(it) }.named(fragmentName) //data.map<R>(outputType, meta = env.meta) { env.result(it) }.named(fragmentName)
emit( data(
fragmentName, fragmentName,
@Suppress("OPT_IN_USAGE") Data(outputType, meta = env.meta, dependencies = listOf(data)) { @Suppress("OPT_IN_USAGE") Data(outputType, meta = env.meta, dependencies = listOf(data)) {
env.result(data.await()) env.result(data.await())

View File

@ -15,7 +15,7 @@ import kotlin.reflect.typeOf
* A data element characterized by its meta * A data element characterized by its meta
*/ */
@DfType(Data.TYPE) @DfType(Data.TYPE)
public interface Data<T> : Goal<T>, MetaRepr { public interface Data<out T> : Goal<T>, MetaRepr {
/** /**
* Type marker for the data. The type is known before the calculation takes place so it could be checked. * Type marker for the data. The type is known before the calculation takes place so it could be checked.
*/ */

View File

@ -12,7 +12,7 @@ import kotlin.reflect.typeOf
/** /**
* A generic data provider * A generic data provider
*/ */
public interface DataSource<T> { public interface DataSource<out T> {
/** /**
* The minimal common ancestor to all data in the node * The minimal common ancestor to all data in the node
@ -28,7 +28,7 @@ public interface DataSource<T> {
/** /**
* A data provider with possible dynamic updates * A data provider with possible dynamic updates
*/ */
public interface ObservableDataSource<T> : DataSource<T> { public interface ObservableDataSource<out T> : DataSource<T> {
/** /**
* Flow updates made to the data * Flow updates made to the data
@ -39,7 +39,7 @@ public interface ObservableDataSource<T> : DataSource<T> {
/** /**
* A tree like structure for data holding * A tree like structure for data holding
*/ */
public interface GenericDataTree<T, out TR : GenericDataTree<T, TR>> : DataSource<T> { public interface GenericDataTree<out T, out TR : GenericDataTree<T, TR>> : DataSource<T> {
public val self: TR public val self: TR
public val data: Data<T>? public val data: Data<T>?
@ -66,7 +66,12 @@ public interface GenericDataTree<T, out TR : GenericDataTree<T, TR>> : DataSourc
} }
} }
public typealias DataTree<T> = GenericDataTree<T, *> public typealias DataTree<T> = GenericDataTree<T, GenericDataTree<T,*>>
/**
* Return a single data in this tree. Throw error if it is not single.
*/
public fun <T> DataTree<T>.single(): NamedData<T> = asSequence().single()
/** /**
* An alias for easier access to tree values * An alias for easier access to tree values
@ -79,7 +84,7 @@ public operator fun <T> DataTree<T>.get(name: String): Data<T>? = read(name.pars
* Return a sequence of all data items in this tree. * Return a sequence of all data items in this tree.
* This method does not take updates into account. * This method does not take updates into account.
*/ */
public fun <T> GenericDataTree<T, DataTree<T>>.asSequence( public fun <T> DataTree<T>.asSequence(
namePrefix: Name = Name.EMPTY, namePrefix: Name = Name.EMPTY,
): Sequence<NamedData<T>> = sequence { ): Sequence<NamedData<T>> = sequence {
data?.let { yield(it.named(Name.EMPTY)) } data?.let { yield(it.named(Name.EMPTY)) }
@ -100,6 +105,9 @@ public tailrec fun <T, TR : GenericDataTree<T, TR>> GenericDataTree<T, TR>.branc
else -> items[name.first()]?.branch(name.cutFirst()) else -> items[name.first()]?.branch(name.cutFirst())
} }
public fun <T, TR : GenericDataTree<T, TR>> GenericDataTree<T, TR>.branch(name: String): TR? =
branch(name.parseAsName())
public fun GenericDataTree<*, *>.isEmpty(): Boolean = data == null && items.isEmpty() public fun GenericDataTree<*, *>.isEmpty(): Boolean = data == null && items.isEmpty()
@PublishedApi @PublishedApi
@ -113,7 +121,7 @@ internal class FlatDataTree<T>(
override val items: Map<NameToken, FlatDataTree<T>> override val items: Map<NameToken, FlatDataTree<T>>
get() = dataSet.keys get() = dataSet.keys
.filter { it.startsWith(prefix) && it.length > prefix.length } .filter { it.startsWith(prefix) && it.length > prefix.length }
.map { it.tokens[prefix.length + 1] } .map { it.tokens[prefix.length] }
.associateWith { FlatDataTree(dataType, dataSet, prefix + it) } .associateWith { FlatDataTree(dataType, dataSet, prefix + it) }
override fun read(name: Name): Data<T>? = dataSet[prefix + name] override fun read(name: Name): Data<T>? = dataSet[prefix + name]
@ -133,20 +141,20 @@ internal fun <T> Sequence<NamedData<T>>.toTree(type: KType): DataTree<T> =
public inline fun <reified T> Sequence<NamedData<T>>.toTree(): DataTree<T> = public inline fun <reified T> Sequence<NamedData<T>>.toTree(): DataTree<T> =
FlatDataTree(typeOf<T>(), associate { it.name to it.data }, Name.EMPTY) FlatDataTree(typeOf<T>(), associate { it.name to it.data }, Name.EMPTY)
public interface GenericObservableDataTree<T, TR : GenericObservableDataTree<T, TR>> : GenericDataTree<T, TR>, public interface GenericObservableDataTree<out T, out TR : GenericObservableDataTree<T, TR>> : GenericDataTree<T, TR>,
ObservableDataSource<T> ObservableDataSource<T>
public typealias ObservableDataTree<T> = GenericObservableDataTree<T, *> public typealias ObservableDataTree<T> = GenericObservableDataTree<T, GenericObservableDataTree<T, *>>
public fun <T> DataTree<T>.updates(): Flow<NamedData<T>> = if (this is ObservableDataTree<T>) updates() else emptyFlow() public fun <T> DataTree<T>.updates(): Flow<NamedData<T>> = if (this is GenericObservableDataTree<T,*>) updates() else emptyFlow()
public fun interface DataSink<T> { public fun interface DataSink<in T> {
public fun emit(name: Name, data: Data<T>?) public fun data(name: Name, data: Data<T>?)
} }
public class DataTreeBuilder<T>(private val type: KType) : DataSink<T> { public class DataTreeBuilder<T>(private val type: KType) : DataSink<T> {
private val map = HashMap<Name, Data<T>>() private val map = HashMap<Name, Data<T>>()
override fun emit(name: Name, data: Data<T>?) { override fun data(name: Name, data: Data<T>?) {
if (data == null) { if (data == null) {
map.remove(name) map.remove(name)
} else { } else {
@ -182,7 +190,7 @@ public interface MutableDataTree<T> : GenericObservableDataTree<T, MutableDataTr
public operator fun set(token: NameToken, data: Data<T>?) public operator fun set(token: NameToken, data: Data<T>?)
override fun emit(name: Name, data: Data<T>?): Unit = set(name, data) override fun data(name: Name, data: Data<T>?): Unit = set(name, data)
} }
public tailrec operator fun <T> MutableDataTree<T>.set(name: Name, data: Data<T>?): Unit { public tailrec operator fun <T> MutableDataTree<T>.set(name: Name, data: Data<T>?): Unit {
@ -266,6 +274,6 @@ public fun <T> Sequence<NamedData<T>>.toObservableTree(dataType: KType, scope: C
MutableDataTree<T>(dataType, scope).apply { MutableDataTree<T>(dataType, scope).apply {
emitAll(this@toObservableTree) emitAll(this@toObservableTree)
updates.onEach { updates.onEach {
emit(it.name, it.data) data(it.name, it.data)
}.launchIn(scope) }.launchIn(scope)
} }

View File

@ -42,7 +42,7 @@ public interface GroupRule {
set.forEach { data -> set.forEach { data ->
val tagValue: String = data.meta[key]?.string ?: defaultTagValue val tagValue: String = data.meta[key]?.string ?: defaultTagValue
map.getOrPut(tagValue) { DataTreeBuilder(set.dataType) }.emit(data.name,data.data) map.getOrPut(tagValue) { DataTreeBuilder(set.dataType) }.data(data.name,data.data)
} }

View File

@ -4,7 +4,7 @@ import space.kscience.dataforge.meta.isEmpty
import space.kscience.dataforge.misc.Named import space.kscience.dataforge.misc.Named
import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.Name
public interface NamedData<T> : Named, Data<T> { public interface NamedData<out T> : Named, Data<T> {
override val name: Name override val name: Name
public val data: Data<T> public val data: Data<T>
} }

View File

@ -7,23 +7,24 @@ import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MutableMeta import space.kscience.dataforge.meta.MutableMeta
import space.kscience.dataforge.misc.DFExperimental import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName
import space.kscience.dataforge.names.isEmpty import space.kscience.dataforge.names.isEmpty
import space.kscience.dataforge.names.plus import space.kscience.dataforge.names.plus
public fun <T> DataSink<T>.emit(value: NamedData<T>) { public fun <T> DataSink<T>.data(value: NamedData<T>) {
emit(value.name, value.data) data(value.name, value.data)
} }
public fun <T> DataSink<T>.emitAll(sequence: Sequence<NamedData<T>>) { public fun <T> DataSink<T>.emitAll(sequence: Sequence<NamedData<T>>) {
sequence.forEach { emit(it) } sequence.forEach { data(it) }
} }
public fun <T> DataSink<T>.emitAll(dataTree: DataTree<T>) { public fun <T> DataSink<T>.branch(dataTree: DataTree<T>) {
emitAll(dataTree.asSequence()) emitAll(dataTree.asSequence())
} }
public inline fun <T> DataSink<T>.emitAll( public inline fun <T> DataSink<T>.branch(
prefix: Name, prefix: Name,
block: DataSink<T>.() -> Unit, block: DataSink<T>.() -> Unit,
) { ) {
@ -31,45 +32,50 @@ public inline fun <T> DataSink<T>.emitAll(
apply(block) apply(block)
} else { } else {
val proxyDataSink = DataSink { nameWithoutPrefix, data -> val proxyDataSink = DataSink { nameWithoutPrefix, data ->
this.emit(prefix + nameWithoutPrefix, data) this.data(prefix + nameWithoutPrefix, data)
} }
proxyDataSink.apply(block) proxyDataSink.apply(block)
} }
} }
public inline fun <T> DataSink<T>.branch(
prefix: String,
block: DataSink<T>.() -> Unit,
): Unit = branch(prefix.asName(), block)
public fun <T> DataSink<T>.emit(name: String, value: Data<T>) {
emit(Name.parse(name), value) public fun <T> DataSink<T>.data(name: String, value: Data<T>) {
data(Name.parse(name), value)
} }
public fun <T> DataSink<T>.emitAll(name: Name, set: DataTree<T>) { public fun <T> DataSink<T>.branch(name: Name, set: DataTree<T>) {
emitAll(name) { emitAll(set.asSequence()) } branch(name) { emitAll(set.asSequence()) }
} }
public fun <T> DataSink<T>.emitAll(name: String, set: DataTree<T>) { public fun <T> DataSink<T>.branch(name: String, set: DataTree<T>) {
emitAll(Name.parse(name)) { emitAll(set.asSequence()) } branch(Name.parse(name)) { emitAll(set.asSequence()) }
} }
/** /**
* Produce lazy [Data] and emit it into the [MutableDataTree] * Produce lazy [Data] and emit it into the [MutableDataTree]
*/ */
public inline fun <reified T> DataSink<T>.produce( public inline fun <reified T> DataSink<T>.data(
name: String, name: String,
meta: Meta = Meta.EMPTY, meta: Meta = Meta.EMPTY,
noinline producer: suspend () -> T, noinline producer: suspend () -> T,
) { ) {
val data = Data(meta, block = producer) val data = Data(meta, block = producer)
emit(name, data) data(name, data)
} }
public inline fun <reified T> DataSink<T>.produce( public inline fun <reified T> DataSink<T>.data(
name: Name, name: Name,
meta: Meta = Meta.EMPTY, meta: Meta = Meta.EMPTY,
noinline producer: suspend () -> T, noinline producer: suspend () -> T,
) { ) {
val data = Data(meta, block = producer) val data = Data(meta, block = producer)
emit(name, data) data(name, data)
} }
/** /**
@ -79,24 +85,24 @@ public inline fun <reified T> DataSink<T>.static(
name: String, name: String,
data: T, data: T,
meta: Meta = Meta.EMPTY, meta: Meta = Meta.EMPTY,
): Unit = emit(name, Data.static(data, meta)) ): Unit = data(name, Data.static(data, meta))
public inline fun <reified T> DataSink<T>.static( public inline fun <reified T> DataSink<T>.static(
name: Name, name: Name,
data: T, data: T,
meta: Meta = Meta.EMPTY, meta: Meta = Meta.EMPTY,
): Unit = emit(name, Data.static(data, meta)) ): Unit = data(name, Data.static(data, meta))
public inline fun <reified T> DataSink<T>.static( public inline fun <reified T> DataSink<T>.static(
name: String, name: String,
data: T, data: T,
mutableMeta: MutableMeta.() -> Unit, mutableMeta: MutableMeta.() -> Unit,
): Unit = emit(Name.parse(name), Data.static(data, Meta(mutableMeta))) ): Unit = data(Name.parse(name), Data.static(data, Meta(mutableMeta)))
public fun <T> DataSink<T>.populateFrom(sequence: Sequence<NamedData<T>>) { public fun <T> DataSink<T>.populateFrom(sequence: Sequence<NamedData<T>>) {
sequence.forEach { sequence.forEach {
emit(it.name, it.data) data(it.name, it.data)
} }
} }
@ -111,7 +117,7 @@ public fun <T> DataSink<T>.populateFrom(tree: DataTree<T>) {
@DFExperimental @DFExperimental
public fun <T> MutableDataTree<T>.populateFrom(flow: ObservableDataSource<T>): Job = flow.updates().onEach { public fun <T> MutableDataTree<T>.populateFrom(flow: ObservableDataSource<T>): Job = flow.updates().onEach {
//TODO check if the place is occupied //TODO check if the place is occupied
emit(it.name, it.data) data(it.name, it.data)
}.launchIn(scope) }.launchIn(scope)
//public fun <T > DataSetBuilder<T>.populateFrom(flow: Flow<NamedData<T>>) { //public fun <T > DataSetBuilder<T>.populateFrom(flow: Flow<NamedData<T>>) {

View File

@ -9,14 +9,14 @@ import kotlin.coroutines.EmptyCoroutineContext
import kotlin.reflect.KType import kotlin.reflect.KType
import kotlin.reflect.typeOf import kotlin.reflect.typeOf
public data class ValueWithMeta<T>(val meta: Meta, val value: T) public data class ValueWithMeta<T>(val value: T, val meta: Meta)
public suspend fun <T> Data<T>.awaitWithMeta(): ValueWithMeta<T> = ValueWithMeta(meta, await()) public suspend fun <T> Data<T>.awaitWithMeta(): ValueWithMeta<T> = ValueWithMeta(await(), meta)
public data class NamedValueWithMeta<T>(val name: Name, val meta: Meta, val value: T) public data class NamedValueWithMeta<T>(val name: Name, val value: T, val meta: Meta)
public suspend fun <T> NamedData<T>.awaitWithMeta(): NamedValueWithMeta<T> = public suspend fun <T> NamedData<T>.awaitWithMeta(): NamedValueWithMeta<T> =
NamedValueWithMeta(name, meta, await()) NamedValueWithMeta(name, await(), meta)
/** /**
@ -25,7 +25,7 @@ public suspend fun <T> NamedData<T>.awaitWithMeta(): NamedValueWithMeta<T> =
* @param meta for the resulting data. By default equals input data. * @param meta for the resulting data. By default equals input data.
* @param block the transformation itself * @param block the transformation itself
*/ */
public inline fun <T : Any, reified R : Any> Data<T>.transform( public inline fun <T, reified R> Data<T>.transform(
meta: Meta = this.meta, meta: Meta = this.meta,
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
crossinline block: suspend (T) -> R, crossinline block: suspend (T) -> R,
@ -36,7 +36,7 @@ public inline fun <T : Any, reified R : Any> Data<T>.transform(
/** /**
* Combine this data with the other data using [block]. See [Data::map] for other details * Combine this data with the other data using [block]. See [Data::map] for other details
*/ */
public inline fun <T1 : Any, T2 : Any, reified R : Any> Data<T1>.combine( public inline fun <T1, T2, reified R> Data<T1>.combine(
other: Data<T2>, other: Data<T2>,
meta: Meta = this.meta, meta: Meta = this.meta,
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
@ -69,7 +69,7 @@ internal fun Map<*, Data<*>>.joinMeta(): Meta = Meta {
} }
@DFInternal @DFInternal
public fun <K, T : Any, R : Any> Map<K, Data<T>>.reduceToData( public fun <K, T, R> Map<K, Data<T>>.reduceToData(
outputType: KType, outputType: KType,
meta: Meta = joinMeta(), meta: Meta = joinMeta(),
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
@ -89,7 +89,7 @@ public fun <K, T : Any, R : Any> Map<K, Data<T>>.reduceToData(
* @param T type of the input goal * @param T type of the input goal
* @param R type of the result goal * @param R type of the result goal
*/ */
public inline fun <K, T : Any, reified R : Any> Map<K, Data<T>>.reduceToData( public inline fun <K, T, reified R> Map<K, Data<T>>.reduceToData(
meta: Meta = joinMeta(), meta: Meta = joinMeta(),
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
crossinline block: suspend (Map<K, ValueWithMeta<T>>) -> R, crossinline block: suspend (Map<K, ValueWithMeta<T>>) -> R,
@ -104,7 +104,7 @@ public inline fun <K, T : Any, reified R : Any> Map<K, Data<T>>.reduceToData(
//Iterable operations //Iterable operations
@DFInternal @DFInternal
public inline fun <T : Any, R : Any> Iterable<Data<T>>.reduceToData( public inline fun <T, R> Iterable<Data<T>>.reduceToData(
outputType: KType, outputType: KType,
meta: Meta = joinMeta(), meta: Meta = joinMeta(),
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
@ -119,7 +119,7 @@ public inline fun <T : Any, R : Any> Iterable<Data<T>>.reduceToData(
} }
@OptIn(DFInternal::class) @OptIn(DFInternal::class)
public inline fun <T : Any, reified R : Any> Iterable<Data<T>>.reduceToData( public inline fun <T, reified R> Iterable<Data<T>>.reduceToData(
meta: Meta = joinMeta(), meta: Meta = joinMeta(),
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
crossinline transformation: suspend (Collection<ValueWithMeta<T>>) -> R, crossinline transformation: suspend (Collection<ValueWithMeta<T>>) -> R,
@ -127,7 +127,7 @@ public inline fun <T : Any, reified R : Any> Iterable<Data<T>>.reduceToData(
transformation(it) transformation(it)
} }
public inline fun <T : Any, reified R : Any> Iterable<Data<T>>.foldToData( public inline fun <T, reified R> Iterable<Data<T>>.foldToData(
initial: R, initial: R,
meta: Meta = joinMeta(), meta: Meta = joinMeta(),
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
@ -142,7 +142,7 @@ public inline fun <T : Any, reified R : Any> Iterable<Data<T>>.foldToData(
* Transform an [Iterable] of [NamedData] to a single [Data]. * Transform an [Iterable] of [NamedData] to a single [Data].
*/ */
@DFInternal @DFInternal
public inline fun <T : Any, R : Any> Iterable<NamedData<T>>.reduceNamedToData( public inline fun <T, R> Iterable<NamedData<T>>.reduceNamedToData(
outputType: KType, outputType: KType,
meta: Meta = joinMeta(), meta: Meta = joinMeta(),
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
@ -157,7 +157,7 @@ public inline fun <T : Any, R : Any> Iterable<NamedData<T>>.reduceNamedToData(
} }
@OptIn(DFInternal::class) @OptIn(DFInternal::class)
public inline fun <T : Any, reified R : Any> Iterable<NamedData<T>>.reduceNamedToData( public inline fun <T, reified R> Iterable<NamedData<T>>.reduceNamedToData(
meta: Meta = joinMeta(), meta: Meta = joinMeta(),
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
crossinline transformation: suspend (Collection<NamedValueWithMeta<T>>) -> R, crossinline transformation: suspend (Collection<NamedValueWithMeta<T>>) -> R,
@ -168,7 +168,7 @@ public inline fun <T : Any, reified R : Any> Iterable<NamedData<T>>.reduceNamedT
/** /**
* Fold a [Iterable] of named data into a single [Data] * Fold a [Iterable] of named data into a single [Data]
*/ */
public inline fun <T : Any, reified R : Any> Iterable<NamedData<T>>.foldNamedToData( public inline fun <T, reified R> Iterable<NamedData<T>>.foldNamedToData(
initial: R, initial: R,
meta: Meta = joinMeta(), meta: Meta = joinMeta(),
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
@ -194,18 +194,18 @@ public suspend fun <T, R> DataTree<T>.transform(
val d = Data(outputType, newMeta, coroutineContext, listOf(namedData)) { val d = Data(outputType, newMeta, coroutineContext, listOf(namedData)) {
block(namedData.awaitWithMeta()) block(namedData.awaitWithMeta())
} }
emit(namedData.name, d) data(namedData.name, d)
} }
} }
@OptIn(DFInternal::class) @OptIn(DFInternal::class)
public suspend inline fun <T : Any, reified R : Any> DataTree<T>.transform( public suspend inline fun <T, reified R> DataTree<T>.transform(
noinline metaTransform: MutableMeta.() -> Unit = {}, noinline metaTransform: MutableMeta.() -> Unit = {},
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
noinline block: suspend (NamedValueWithMeta<T>) -> R, noinline block: suspend (NamedValueWithMeta<T>) -> R,
): DataTree<R> = this@transform.transform(typeOf<R>(), metaTransform, coroutineContext, block) ): DataTree<R> = this@transform.transform(typeOf<R>(), metaTransform, coroutineContext, block)
public inline fun <T : Any> DataTree<T>.forEach(block: (NamedData<T>) -> Unit) { public inline fun <T> DataTree<T>.forEach(block: (NamedData<T>) -> Unit) {
asSequence().forEach(block) asSequence().forEach(block)
} }
@ -219,13 +219,13 @@ internal fun DataTree<*>.joinMeta(): Meta = Meta {
} }
} }
public inline fun <T : Any, reified R : Any> DataTree<T>.reduceToData( public inline fun <T, reified R> DataTree<T>.reduceToData(
meta: Meta = joinMeta(), meta: Meta = joinMeta(),
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
crossinline transformation: suspend (Iterable<NamedValueWithMeta<T>>) -> R, crossinline transformation: suspend (Iterable<NamedValueWithMeta<T>>) -> R,
): Data<R> = asSequence().asIterable().reduceNamedToData(meta, coroutineContext, transformation) ): Data<R> = asSequence().asIterable().reduceNamedToData(meta, coroutineContext, transformation)
public inline fun <T : Any, reified R : Any> DataTree<T>.foldToData( public inline fun <T, reified R> DataTree<T>.foldToData(
initial: R, initial: R,
meta: Meta = joinMeta(), meta: Meta = joinMeta(),
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,

View File

@ -14,7 +14,7 @@ import kotlin.reflect.typeOf
* Cast the node to given type if the cast is possible or return null * Cast the node to given type if the cast is possible or return null
*/ */
@Suppress("UNCHECKED_CAST") @Suppress("UNCHECKED_CAST")
private fun <R : Any> Data<*>.castOrNull(type: KType): Data<R>? = private fun <R> Data<*>.castOrNull(type: KType): Data<R>? =
if (!this.type.isSubtypeOf(type)) { if (!this.type.isSubtypeOf(type)) {
null null
} else { } else {
@ -55,7 +55,7 @@ public inline fun <reified R : Any> DataTree<*>.filterByType(
/** /**
* Select a single datum if it is present and of given [type] * Select a single datum if it is present and of given [type]
*/ */
public fun <R : Any> DataTree<*>.getByType(type: KType, name: Name): NamedData<R>? = public fun <R> DataTree<*>.getByType(type: KType, name: Name): NamedData<R>? =
get(name)?.castOrNull<R>(type)?.named(name) get(name)?.castOrNull<R>(type)?.named(name)
public inline fun <reified R : Any> DataTree<*>.getByType(name: Name): NamedData<R>? = public inline fun <reified R : Any> DataTree<*>.getByType(name: Name): NamedData<R>? =

View File

@ -12,14 +12,14 @@ import space.kscience.dataforge.names.plus
*/ */
context(DataSink<T>) context(DataSink<T>)
public infix fun <T : Any> String.put(data: Data<T>): Unit = public infix fun <T : Any> String.put(data: Data<T>): Unit =
emit(Name.parse(this), data) data(Name.parse(this), data)
/** /**
* Append node * Append node
*/ */
context(DataSink<T>) context(DataSink<T>)
public infix fun <T : Any> String.put(dataSet: DataTree<T>): Unit = public infix fun <T : Any> String.put(dataSet: DataTree<T>): Unit =
emitAll(this, dataSet) branch(this, dataSet)
/** /**
* Build and append node * Build and append node
@ -27,7 +27,7 @@ public infix fun <T : Any> String.put(dataSet: DataTree<T>): Unit =
context(DataSink<T>) context(DataSink<T>)
public infix fun <T : Any> String.put( public infix fun <T : Any> String.put(
block: DataSink<T>.() -> Unit, block: DataSink<T>.() -> Unit,
): Unit = emitAll(Name.parse(this), block) ): Unit = branch(Name.parse(this), block)
/** /**
* Copy given data set and mirror its changes to this [LegacyDataTreeBuilder] in [this@setAndObserve]. Returns an update [Job] * Copy given data set and mirror its changes to this [LegacyDataTreeBuilder] in [this@setAndObserve]. Returns an update [Job]
@ -37,8 +37,8 @@ public fun <T : Any> CoroutineScope.setAndWatch(
name: Name, name: Name,
dataSet: DataTree<T>, dataSet: DataTree<T>,
): Job = launch { ): Job = launch {
emitAll(name, dataSet) branch(name, dataSet)
dataSet.updates().collect { dataSet.updates().collect {
emit(name + it.name, it.data) data(name + it.name, it.data)
} }
} }

View File

@ -7,7 +7,7 @@ import kotlin.test.Test
import kotlin.test.assertEquals import kotlin.test.assertEquals
internal class LegacyGenericDataTreeBuilderTest { internal class DataTreeBuilderTest {
@Test @Test
fun testTreeBuild() = runBlocking { fun testTreeBuild() = runBlocking {
val node = DataTree<Any> { val node = DataTree<Any> {

View File

@ -10,10 +10,7 @@ import space.kscience.dataforge.names.asName
import kotlin.reflect.KType import kotlin.reflect.KType
import kotlin.reflect.typeOf import kotlin.reflect.typeOf
public interface EnvelopeFormat : IOFormat<Envelope> { public interface EnvelopeFormat : IOFormat<Envelope>
override val type: KType get() = typeOf<Envelope>()
}
public fun EnvelopeFormat.read(input: Source): Envelope = readFrom(input) public fun EnvelopeFormat.read(input: Source): Envelope = readFrom(input)

View File

@ -49,13 +49,13 @@ public fun interface IOWriter<in T> {
*/ */
public interface IOFormat<T> : IOReader<T>, IOWriter<T> public interface IOFormat<T> : IOReader<T>, IOWriter<T>
public fun <T : Any> Source.readWith(format: IOReader<T>): T = format.readFrom(this) public fun <T> Source.readWith(format: IOReader<T>): T = format.readFrom(this)
/** /**
* Read given binary as an object using given format * Read given binary as an object using given format
*/ */
public fun <T> Binary.readWith(format: IOReader<T>): T = read { public fun <T> Binary.readWith(format: IOReader<T>): T = read {
readWith(format) this.readWith(format)
} }
/** /**

View File

@ -21,8 +21,6 @@ import kotlin.reflect.typeOf
*/ */
public interface MetaFormat : IOFormat<Meta> { public interface MetaFormat : IOFormat<Meta> {
override val type: KType get() = typeOf<Meta>()
override fun writeTo(sink: Sink, obj: Meta) { override fun writeTo(sink: Sink, obj: Meta) {
writeMeta(sink, obj, null) writeMeta(sink, obj, null)
} }

View File

@ -1,9 +1,13 @@
package space.kscience.dataforge.workspace package space.kscience.dataforge.workspace
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import space.kscience.dataforge.data.ObservableDataTree import space.kscience.dataforge.data.ObservableDataTree
import space.kscience.dataforge.data.asSequence
import space.kscience.dataforge.data.launch
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.Name
import kotlin.reflect.KType
/** /**
* A result of a [Task] * A result of a [Task]
@ -12,16 +16,24 @@ import kotlin.reflect.KType
* @param taskMeta The configuration of the task that produced the result * @param taskMeta The configuration of the task that produced the result
*/ */
public data class TaskResult<T>( public data class TaskResult<T>(
public val data: ObservableDataTree<T>, public val content: ObservableDataTree<T>,
public val workspace: Workspace, public val workspace: Workspace,
public val taskName: Name, public val taskName: Name,
public val taskMeta: Meta, public val taskMeta: Meta,
) { ): ObservableDataTree<T> by content
val dataType: KType get() = data.dataType
}
/** /**
* Wrap data into [TaskResult] * Wrap data into [TaskResult]
*/ */
public fun <T> Workspace.wrapResult(data: ObservableDataTree<T>, taskName: Name, taskMeta: Meta): TaskResult<T> = public fun <T> Workspace.wrapResult(data: ObservableDataTree<T>, taskName: Name, taskMeta: Meta): TaskResult<T> =
TaskResult(data, this, taskName, taskMeta) TaskResult(data, this, taskName, taskMeta)
/**
* Start computation for all data elements of this node.
* The resulting [Job] is completed only when all of them are completed.
*/
public fun TaskResult<*>.compute(scope: CoroutineScope): Job = scope.launch {
asSequence().forEach {
it.data.launch(scope)
}
}

View File

@ -53,7 +53,7 @@ public interface Workspace : ContextAware, Provider, CoroutineScope {
} }
public suspend fun produceData(taskName: Name, taskMeta: Meta, name: Name): Data<*>? = public suspend fun produceData(taskName: Name, taskMeta: Meta, name: Name): Data<*>? =
produce(taskName, taskMeta).data[name] produce(taskName, taskMeta)[name]
public companion object { public companion object {
public const val TYPE: String = "workspace" public const val TYPE: String = "workspace"

View File

@ -19,12 +19,12 @@ import kotlin.properties.PropertyDelegateProvider
import kotlin.properties.ReadOnlyProperty import kotlin.properties.ReadOnlyProperty
import kotlin.reflect.typeOf import kotlin.reflect.typeOf
public data class TaskReference<T : Any>(public val taskName: Name, public val task: Task<T>) : DataSelector<T> { public data class TaskReference<T>(public val taskName: Name, public val task: Task<T>) : DataSelector<T> {
@Suppress("UNCHECKED_CAST") @Suppress("UNCHECKED_CAST")
override suspend fun select(workspace: Workspace, meta: Meta): DataTree<T> { override suspend fun select(workspace: Workspace, meta: Meta): DataTree<T> {
if (workspace.tasks[taskName] == task) { if (workspace.tasks[taskName] == task) {
return workspace.produce(taskName, meta).data as DataTree<T> return workspace.produce(taskName, meta) as DataTree<T>
} else { } else {
error("Task $taskName does not belong to the workspace") error("Task $taskName does not belong to the workspace")
} }
@ -125,7 +125,7 @@ public class WorkspaceBuilder(
/** /**
* Define intrinsic data for the workspace * Define intrinsic data for the workspace
*/ */
public fun data(builder: DataSink<*>.() -> Unit) { public fun data(builder: DataSink<Any?>.() -> Unit) {
data.apply(builder) data.apply(builder)
} }

View File

@ -3,7 +3,7 @@ package space.kscience.dataforge.workspace
import space.kscience.dataforge.actions.Action import space.kscience.dataforge.actions.Action
import space.kscience.dataforge.context.PluginFactory import space.kscience.dataforge.context.PluginFactory
import space.kscience.dataforge.data.DataTree import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.data.emitAll import space.kscience.dataforge.data.branch
import space.kscience.dataforge.data.forEach import space.kscience.dataforge.data.forEach
import space.kscience.dataforge.data.transform import space.kscience.dataforge.data.transform
import space.kscience.dataforge.meta.* import space.kscience.dataforge.meta.*
@ -25,12 +25,12 @@ public val TaskResultBuilder<*>.defaultDependencyMeta: Meta
* @param selector a workspace data selector. Could be either task selector or initial data selector. * @param selector a workspace data selector. Could be either task selector or initial data selector.
* @param dependencyMeta meta used for selector. The same meta is used for caching. By default, uses [defaultDependencyMeta]. * @param dependencyMeta meta used for selector. The same meta is used for caching. By default, uses [defaultDependencyMeta].
*/ */
public suspend fun <T : Any> TaskResultBuilder<*>.from( public suspend fun <T> TaskResultBuilder<*>.from(
selector: DataSelector<T>, selector: DataSelector<T>,
dependencyMeta: Meta = defaultDependencyMeta, dependencyMeta: Meta = defaultDependencyMeta,
): DataTree<T> = selector.select(workspace, dependencyMeta) ): DataTree<T> = selector.select(workspace, dependencyMeta)
public suspend inline fun <T : Any, reified P : WorkspacePlugin> TaskResultBuilder<*>.from( public suspend inline fun <T, reified P : WorkspacePlugin> TaskResultBuilder<*>.from(
plugin: P, plugin: P,
dependencyMeta: Meta = defaultDependencyMeta, dependencyMeta: Meta = defaultDependencyMeta,
selectorBuilder: P.() -> TaskReference<T>, selectorBuilder: P.() -> TaskReference<T>,
@ -50,7 +50,7 @@ public suspend inline fun <T : Any, reified P : WorkspacePlugin> TaskResultBuild
* @param dependencyMeta meta used for selector. The same meta is used for caching. By default, uses [defaultDependencyMeta]. * @param dependencyMeta meta used for selector. The same meta is used for caching. By default, uses [defaultDependencyMeta].
* @param selectorBuilder a builder of task from the plugin. * @param selectorBuilder a builder of task from the plugin.
*/ */
public suspend inline fun <reified T : Any, reified P : WorkspacePlugin> TaskResultBuilder<*>.from( public suspend inline fun <reified T, reified P : WorkspacePlugin> TaskResultBuilder<*>.from(
pluginFactory: PluginFactory<P>, pluginFactory: PluginFactory<P>,
dependencyMeta: Meta = defaultDependencyMeta, dependencyMeta: Meta = defaultDependencyMeta,
selectorBuilder: P.() -> TaskReference<T>, selectorBuilder: P.() -> TaskReference<T>,
@ -77,7 +77,7 @@ public val TaskResultBuilder<*>.allData: DataSelector<*>
* @param action process individual data asynchronously. * @param action process individual data asynchronously.
*/ */
@DFExperimental @DFExperimental
public suspend inline fun <T : Any, reified R : Any> TaskResultBuilder<R>.transformEach( public suspend inline fun <T, reified R> TaskResultBuilder<R>.transformEach(
selector: DataSelector<T>, selector: DataSelector<T>,
dependencyMeta: Meta = defaultDependencyMeta, dependencyMeta: Meta = defaultDependencyMeta,
dataMetaTransform: MutableMeta.(name: Name) -> Unit = {}, dataMetaTransform: MutableMeta.(name: Name) -> Unit = {},
@ -93,27 +93,27 @@ public suspend inline fun <T : Any, reified R : Any> TaskResultBuilder<R>.transf
action(it, data.name, meta) action(it, data.name, meta)
} }
emit(data.name, res) data(data.name, res)
} }
} }
/** /**
* Set given [dataSet] as a task result. * Set given [dataSet] as a task result.
*/ */
public fun <T : Any> TaskResultBuilder<T>.result(dataSet: DataTree<T>) { public fun <T> TaskResultBuilder<T>.result(dataSet: DataTree<T>) {
emitAll(dataSet) branch(dataSet)
} }
/** /**
* Use provided [action] to fill the result * Use provided [action] to fill the result
*/ */
@DFExperimental @DFExperimental
public suspend inline fun <T : Any, reified R : Any> TaskResultBuilder<R>.actionFrom( public suspend inline fun <T, reified R> TaskResultBuilder<R>.actionFrom(
selector: DataSelector<T>, selector: DataSelector<T>,
action: Action<T, R>, action: Action<T, R>,
dependencyMeta: Meta = defaultDependencyMeta, dependencyMeta: Meta = defaultDependencyMeta,
) { ) {
emitAll(action.execute(workspace.context, from(selector, dependencyMeta), dependencyMeta)) branch(action.execute(workspace.context, from(selector, dependencyMeta), dependencyMeta))
} }

View File

@ -95,8 +95,8 @@ public class FileWorkspaceCache(public val cacheDirectory: Path) : WorkspaceCach
} }
val cachedTree = result.data.asSequence().map { cacheOne(it) } val cachedTree = result.asSequence().map { cacheOne(it) }
.toObservableTree(result.dataType, result.workspace, result.data.updates().map { cacheOne(it) }) .toObservableTree(result.dataType, result.workspace, result.updates().map { cacheOne(it) })
return result.workspace.wrapResult(cachedTree, result.taskName, result.taskMeta) return result.workspace.wrapResult(cachedTree, result.taskName, result.taskMeta)
} }

View File

@ -1,39 +1,39 @@
package space.kscience.dataforge.workspace package space.kscience.dataforge.workspace
import kotlinx.coroutines.flow.map
import space.kscience.dataforge.data.*
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.Name
import kotlin.reflect.KType import kotlin.reflect.KType
import kotlin.reflect.full.isSubtypeOf import kotlin.reflect.full.isSubtypeOf
private typealias TaskResultId = Pair<Name, Meta> private data class TaskResultId(val name: Name, val meta: Meta)
public class InMemoryWorkspaceCache : WorkspaceCache { public class InMemoryWorkspaceCache : WorkspaceCache {
// never do that at home! private val cache = HashMap<TaskResultId, HashMap<Name, Data<*>>>()
private val cache = HashMap<TaskResultId, HashMap<Name, TaskData<*>>>()
@Suppress("UNCHECKED_CAST") @Suppress("UNCHECKED_CAST")
private fun <T : Any> TaskData<*>.checkType(taskType: KType): TaskData<T> = private fun <T> Data<*>.checkType(taskType: KType): Data<T> =
if (type.isSubtypeOf(taskType)) this as TaskData<T> if (type.isSubtypeOf(taskType)) this as Data<T>
else error("Cached data type mismatch: expected $taskType but got $type") else error("Cached data type mismatch: expected $taskType but got $type")
override suspend fun <T : Any> cache(result: TaskResult<T>): TaskResult<T> { override suspend fun <T> cache(result: TaskResult<T>): TaskResult<T> {
for (d: TaskData<T> in result) { fun cacheOne(data: NamedData<T>): NamedData<T> {
cache.getOrPut(result.taskName to result.taskMeta) { HashMap() }.getOrPut(d.name) { d } val cachedData = cache.getOrPut(TaskResultId(result.taskName, result.taskMeta)){
HashMap()
}.getOrPut(data.name){
data.data
}
return cachedData.checkType<T>(result.dataType).named(data.name)
} }
return object : TaskResult<T> by result {
override fun iterator(): Iterator<TaskData<T>> = (cache[result.taskName to result.taskMeta]
?.values?.map { it.checkType<T>(result.dataType) }
?: emptyList()).iterator()
override fun get(name: Name): TaskData<T>? { val cachedTree = result.asSequence().map { cacheOne(it) }
val cached: TaskData<*> = cache[result.taskName to result.taskMeta]?.get(name) ?: return null .toObservableTree(result.dataType, result.workspace, result.updates().map { cacheOne(it) })
//TODO check types
return cached.checkType(result.dataType) return result.workspace.wrapResult(cachedTree, result.taskName, result.taskMeta)
}
}
} }
} }

View File

@ -1,30 +1,24 @@
package space.kscience.dataforge.workspace package space.kscience.dataforge.workspace
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.*
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import space.kscience.dataforge.context.error
import space.kscience.dataforge.context.logger
import space.kscience.dataforge.context.warn
import space.kscience.dataforge.data.* import space.kscience.dataforge.data.*
import space.kscience.dataforge.io.* import space.kscience.dataforge.io.*
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.copy import space.kscience.dataforge.meta.copy
import space.kscience.dataforge.meta.string
import space.kscience.dataforge.misc.DFExperimental import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.misc.DFInternal import space.kscience.dataforge.misc.DFInternal
import space.kscience.dataforge.names.* import space.kscience.dataforge.names.Name
import space.kscience.dataforge.workspace.FileData.Companion.defaultPathToName import space.kscience.dataforge.names.NameToken
import space.kscience.dataforge.names.asName
import space.kscience.dataforge.names.plus
import space.kscience.dataforge.workspace.FileData.defaultPathToName
import java.nio.file.Files import java.nio.file.Files
import java.nio.file.Path import java.nio.file.Path
import java.nio.file.StandardWatchEventKinds import java.nio.file.StandardWatchEventKinds
import java.nio.file.WatchEvent import java.nio.file.WatchEvent
import java.nio.file.attribute.BasicFileAttributes import java.nio.file.attribute.BasicFileAttributes
import java.nio.file.spi.FileSystemProvider import java.nio.file.spi.FileSystemProvider
import java.time.Instant
import kotlin.io.path.* import kotlin.io.path.*
import kotlin.reflect.KType
import kotlin.reflect.typeOf import kotlin.reflect.typeOf
@ -33,18 +27,7 @@ import kotlin.reflect.typeOf
public typealias FileFormatResolver<T> = (path: Path, meta: Meta) -> IOReader<T>? public typealias FileFormatResolver<T> = (path: Path, meta: Meta) -> IOReader<T>?
/** public object FileData {
* A data based on a filesystem [Path]
*/
public class FileData<T> internal constructor(private val data: Data<T>, public val path: Path) : Data<T> by data {
// public val path: String? get() = meta[META_FILE_PATH_KEY].string
// public val extension: String? get() = meta[META_FILE_EXTENSION_KEY].string
//
public val createdTime: Instant? get() = meta[FILE_CREATE_TIME_KEY].string?.let { Instant.parse(it) }
public val updatedTime: Instant? get() = meta[FILE_UPDATE_TIME_KEY].string?.let { Instant.parse(it) }
public companion object {
public val FILE_KEY: Name = "file".asName() public val FILE_KEY: Name = "file".asName()
public val FILE_PATH_KEY: Name = FILE_KEY + "path" public val FILE_PATH_KEY: Name = FILE_KEY + "path"
public val FILE_EXTENSION_KEY: Name = FILE_KEY + "extension" public val FILE_EXTENSION_KEY: Name = FILE_KEY + "extension"
@ -67,7 +50,6 @@ public class FileData<T> internal constructor(private val data: Data<T>, public
} }
) )
} }
}
} }
@ -75,14 +57,11 @@ public class FileData<T> internal constructor(private val data: Data<T>, public
* Read data with supported envelope format and binary format. If the envelope format is null, then read binary directly from file. * Read data with supported envelope format and binary format. If the envelope format is null, then read binary directly from file.
* The operation is blocking since it must read the meta header. The reading of envelope body is lazy * The operation is blocking since it must read the meta header. The reading of envelope body is lazy
*/ */
@OptIn(DFInternal::class) @OptIn(DFExperimental::class)
@DFExperimental public fun IOPlugin.readFileData(
public fun <T : Any> IOPlugin.readDataFile(
path: Path, path: Path,
formatResolver: FileFormatResolver<T>, ): Data<Binary> {
): FileData<T>? {
val envelope = readEnvelopeFile(path, true) val envelope = readEnvelopeFile(path, true)
val format = formatResolver(path, envelope.meta) ?: return null
val updatedMeta = envelope.meta.copy { val updatedMeta = envelope.meta.copy {
FileData.FILE_PATH_KEY put path.toString() FileData.FILE_PATH_KEY put path.toString()
FileData.FILE_EXTENSION_KEY put path.extension FileData.FILE_EXTENSION_KEY put path.extension
@ -91,93 +70,74 @@ public fun <T : Any> IOPlugin.readDataFile(
FileData.FILE_UPDATE_TIME_KEY put attributes.lastModifiedTime().toInstant().toString() FileData.FILE_UPDATE_TIME_KEY put attributes.lastModifiedTime().toInstant().toString()
FileData.FILE_CREATE_TIME_KEY put attributes.creationTime().toInstant().toString() FileData.FILE_CREATE_TIME_KEY put attributes.creationTime().toInstant().toString()
} }
return FileData( return StaticData(
Data(format.type, updatedMeta) { typeOf<Binary>(),
(envelope.data ?: Binary.EMPTY).readWith(format) envelope.data ?: Binary.EMPTY,
}, updatedMeta
path
) )
} }
public fun DataSink<Binary>.file(io: IOPlugin, path: Path, name: Name) {
if (!path.isRegularFile()) error("Only regular files could be handled by this function")
data(name, io.readFileData(path))
}
context(IOPlugin) @DFExperimental public fun DataSink<Binary>.directory(
public fun <T : Any> DataSetBuilder<T>.directory( io: IOPlugin,
path: Path, path: Path,
pathToName: (Path) -> Name = defaultPathToName, pathToName: (Path) -> Name = defaultPathToName,
formatResolver: FileFormatResolver<T>,
) { ) {
if (!path.isDirectory()) error("Only directories could be handled by this function")
val metaFile = path.resolve(IOPlugin.META_FILE_NAME)
val dataFile = path.resolve(IOPlugin.DATA_FILE_NAME)
//process root data
if (metaFile.exists() || dataFile.exists()) {
data(
Name.EMPTY,
StaticData(
typeOf<Binary>(),
dataFile.takeIf { it.exists() }?.asBinary() ?: Binary.EMPTY,
io.readMetaFileOrNull(metaFile) ?: Meta.EMPTY
)
)
}
Files.list(path).forEach { childPath -> Files.list(path).forEach { childPath ->
val fileName = childPath.fileName.toString() val fileName = childPath.fileName.toString()
if (fileName.startsWith(IOPlugin.META_FILE_NAME)) { if (!fileName.startsWith("@")) {
meta(readMetaFile(childPath)) files(io, childPath, pathToName)
} else if (!fileName.startsWith("@")) {
file(childPath, pathToName, formatResolver)
} }
} }
} }
/** public fun DataSink<Binary>.files(io: IOPlugin, path: Path, pathToName: (Path) -> Name = defaultPathToName) {
* Read the directory as a data node. If [path] is a zip archive, read it as directory if (path.isRegularFile() && path.extension == "zip") {
*/
@DFExperimental
@DFInternal
public fun <T : Any> IOPlugin.readDataDirectory(
type: KType,
path: Path,
pathToName: (Path) -> Name = defaultPathToName,
formatResolver: FileFormatResolver<T>,
): LegacyDataTree<T> {
//read zipped data node
if (path.fileName != null && path.fileName.toString().endsWith(".zip")) {
//Using explicit Zip file system to avoid bizarre compatibility bugs //Using explicit Zip file system to avoid bizarre compatibility bugs
val fsProvider = FileSystemProvider.installedProviders().find { it.scheme == "jar" } val fsProvider = FileSystemProvider.installedProviders().find { it.scheme == "jar" }
?: error("Zip file system provider not found") ?: error("Zip file system provider not found")
val fs = fsProvider.newFileSystem(path, mapOf("create" to "true")) val fs = fsProvider.newFileSystem(path, mapOf("create" to "true"))
return readDataDirectory(type, fs.rootDirectories.first(), pathToName, formatResolver) return files(io, fs.rootDirectories.first(), pathToName)
} }
if (!Files.isDirectory(path)) error("Provided path $path is not a directory") if (path.isRegularFile()) {
return DataTree(type) { file(io, path, pathToName(path))
meta { } else {
FileData.FILE_PATH_KEY put path.toString() directory(io, path, pathToName)
}
directory(path, pathToName, formatResolver)
} }
} }
@OptIn(DFInternal::class)
@DFExperimental
public inline fun <reified T : Any> IOPlugin.readDataDirectory(
path: Path,
noinline pathToName: (Path) -> Name = defaultPathToName,
noinline formatResolver: FileFormatResolver<T>,
): LegacyDataTree<T> = readDataDirectory(typeOf<T>(), path, pathToName, formatResolver)
/**
* Read a raw binary data tree from the directory. All files are read as-is (save for meta files).
*/
@DFExperimental
public fun IOPlugin.readRawDirectory(
path: Path,
pathToName: (Path) -> Name = defaultPathToName,
): LegacyDataTree<Binary> = readDataDirectory(path, pathToName) { _, _ -> IOReader.binary }
private fun Path.toName() = Name(map { NameToken.parse(it.nameWithoutExtension) }) private fun Path.toName() = Name(map { NameToken.parse(it.nameWithoutExtension) })
@DFInternal @DFInternal
@DFExperimental @DFExperimental
public fun <T : Any> IOPlugin.monitorDataDirectory( public fun DataSink<Binary>.monitorFiles(
type: KType, io: IOPlugin,
path: Path, path: Path,
pathToName: (Path) -> Name = defaultPathToName, pathToName: (Path) -> Name = defaultPathToName,
formatResolver: FileFormatResolver<T>, scope: CoroutineScope = io.context,
): DataSource<T> { ): Job {
if (path.fileName.toString().endsWith(".zip")) error("Monitoring not supported for ZipFS") files(io, path, pathToName)
if (!Files.isDirectory(path)) error("Provided path $path is not a directory") return scope.launch(Dispatchers.IO) {
return DataSource(type, context) {
directory(path, pathToName, formatResolver)
launch(Dispatchers.IO) {
val watchService = path.fileSystem.newWatchService() val watchService = path.fileSystem.newWatchService()
path.register( path.register(
@ -193,13 +153,11 @@ public fun <T : Any> IOPlugin.monitorDataDirectory(
for (event: WatchEvent<*> in key.pollEvents()) { for (event: WatchEvent<*> in key.pollEvents()) {
val eventPath = event.context() as Path val eventPath = event.context() as Path
if (event.kind() == StandardWatchEventKinds.ENTRY_DELETE) { if (event.kind() == StandardWatchEventKinds.ENTRY_DELETE) {
remove(eventPath.toName()) data(eventPath.toName(), null)
} else { } else {
val fileName = eventPath.fileName.toString() val fileName = eventPath.fileName.toString()
if (fileName.startsWith(IOPlugin.META_FILE_NAME)) { if (!fileName.startsWith("@")) {
meta(readMetaFile(eventPath)) files(io, eventPath, pathToName)
} else if (!fileName.startsWith("@")) {
file(eventPath, pathToName, formatResolver)
} }
} }
} }
@ -207,30 +165,9 @@ public fun <T : Any> IOPlugin.monitorDataDirectory(
} }
} while (isActive && key != null) } while (isActive && key != null)
} }
}
} }
/**
* Start monitoring given directory ([path]) as a [DataSource].
*/
@OptIn(DFInternal::class)
@DFExperimental
public inline fun <reified T : Any> IOPlugin.monitorDataDirectory(
path: Path,
noinline pathToName: (Path) -> Name = defaultPathToName,
noinline formatResolver: FileFormatResolver<T>,
): DataSource<T> = monitorDataDirectory(typeOf<T>(), path, pathToName, formatResolver)
/**
* Read and monitor raw binary data tree from the directory. All files are read as-is (save for meta files).
*/
@DFExperimental
public fun IOPlugin.monitorRawDirectory(
path: Path,
pathToName: (Path) -> Name = defaultPathToName,
): DataSource<Binary> = monitorDataDirectory(path, pathToName) { _, _ -> IOReader.binary }
/** /**
* Write the data tree to existing directory or create a new one using default [java.nio.file.FileSystem] provider * Write the data tree to existing directory or create a new one using default [java.nio.file.FileSystem] provider
* *
@ -239,7 +176,7 @@ public fun IOPlugin.monitorRawDirectory(
@DFExperimental @DFExperimental
public suspend fun <T : Any> IOPlugin.writeDataDirectory( public suspend fun <T : Any> IOPlugin.writeDataDirectory(
path: Path, path: Path,
dataSet: DataSet<T>, dataSet: DataTree<T>,
format: IOWriter<T>, format: IOWriter<T>,
envelopeFormat: EnvelopeFormat? = null, envelopeFormat: EnvelopeFormat? = null,
nameToPath: (name: Name, data: Data<T>) -> Path = { name, _ -> nameToPath: (name: Name, data: Data<T>) -> Path = { name, _ ->
@ -262,62 +199,28 @@ public suspend fun <T : Any> IOPlugin.writeDataDirectory(
writeEnvelopeDirectory(childPath, envelope) writeEnvelopeDirectory(childPath, envelope)
} }
} }
val directoryMeta = dataSet.meta dataSet.meta?.let { writeMetaFile(path, it) }
writeMetaFile(path, directoryMeta)
} }
} }
/** /**
* Reads the specified resources and returns a [LegacyDataTree] containing the data.
*
* @param resources The names of the resources to read. * @param resources The names of the resources to read.
* @param classLoader The class loader to use for loading the resources. By default, it uses the current thread's context class loader. * @param classLoader The class loader to use for loading the resources. By default, it uses the current thread's context class loader.
* @return A DataTree containing the data read from the resources.
*/ */
@DFExperimental @DFExperimental
public fun IOPlugin.readResources( public fun DataSink<Binary>.resources(
io: IOPlugin,
vararg resources: String, vararg resources: String,
pathToName: (Path) -> Name = defaultPathToName, pathToName: (Path) -> Name = defaultPathToName,
classLoader: ClassLoader = Thread.currentThread().contextClassLoader, classLoader: ClassLoader = Thread.currentThread().contextClassLoader,
): LegacyDataTree<Binary> = GenericDataTree { ) {
resources.forEach { resource -> resources.forEach { resource ->
val path = classLoader.getResource(resource)?.toURI()?.toPath() ?: error( val path = classLoader.getResource(resource)?.toURI()?.toPath() ?: error(
"Resource with name $resource is not resolved" "Resource with name $resource is not resolved"
) )
node(resource, readRawDirectory(path, pathToName)) branch(resource.asName()) {
files(io, path, pathToName)
}
} }
} }
/**
* Add file/directory-based data tree item
*/
context(IOPlugin)
@OptIn(DFInternal::class)
@DFExperimental
public fun <T : Any> DataSetBuilder<T>.file(
path: Path,
pathToName: (Path) -> Name = defaultPathToName,
formatResolver: FileFormatResolver<out T>,
) {
try {
//If path is a single file or a special directory, read it as single datum
if (!Files.isDirectory(path) || Files.list(path).allMatch { it.fileName.toString().startsWith("@") }) {
val data = readDataFile(path, formatResolver)
if (data == null) {
logger.warn { "File format is not resolved for $path. Skipping." }
return
}
val name: Name = data.meta[Envelope.ENVELOPE_NAME_KEY].string?.parseAsName() ?: pathToName(path.last())
data(name, data)
} else {
//otherwise, read as directory
val data: LegacyDataTree<T> = readDataDirectory(dataType, path, pathToName, formatResolver)
val name = data.meta[Envelope.ENVELOPE_NAME_KEY].string?.parseAsName() ?: pathToName(path.last())
node(name, data)
}
} catch (ex: Exception) {
logger.error(ex) { "Failed to read file or directory at $path: ${ex.message}" }
}
}

View File

@ -16,8 +16,8 @@ import space.kscience.dataforge.names.matches
*/ */
@OptIn(DFExperimental::class) @OptIn(DFExperimental::class)
public inline fun <reified T : Any> TaskResultBuilder<*>.dataByType(namePattern: Name? = null): DataSelector<T> = public inline fun <reified T : Any> TaskResultBuilder<*>.dataByType(namePattern: Name? = null): DataSelector<T> =
DataSelector<T> { workspace, meta -> DataSelector<T> { workspace, _ ->
workspace.data.filterByType { name, _ -> workspace.data.filterByType { name, _, _ ->
namePattern == null || name.matches(namePattern) namePattern == null || name.matches(namePattern)
} }
} }
@ -25,4 +25,4 @@ public inline fun <reified T : Any> TaskResultBuilder<*>.dataByType(namePattern:
public suspend inline fun <reified T : Any> TaskResultBuilder<*>.fromTask( public suspend inline fun <reified T : Any> TaskResultBuilder<*>.fromTask(
task: Name, task: Name,
taskMeta: Meta = Meta.EMPTY, taskMeta: Meta = Meta.EMPTY,
): DataTree<T> = workspace.produce(task, taskMeta).data.filterByType() ): DataTree<T> = workspace.produce(task, taskMeta).filterByType()

View File

@ -2,8 +2,7 @@ package space.kscience.dataforge.workspace
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext import kotlinx.coroutines.withContext
import space.kscience.dataforge.data.DataTreeItem import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.data.LegacyDataTree
import space.kscience.dataforge.io.* import space.kscience.dataforge.io.*
import space.kscience.dataforge.misc.DFExperimental import space.kscience.dataforge.misc.DFExperimental
import java.nio.file.Files import java.nio.file.Files
@ -15,14 +14,13 @@ import java.util.zip.ZipOutputStream
private suspend fun <T : Any> ZipOutputStream.writeNode( private suspend fun <T : Any> ZipOutputStream.writeNode(
name: String, name: String,
treeItem: DataTreeItem<T>, tree: DataTree<T>,
dataFormat: IOFormat<T>, dataFormat: IOFormat<T>,
envelopeFormat: EnvelopeFormat = TaggedEnvelopeFormat, envelopeFormat: EnvelopeFormat = TaggedEnvelopeFormat,
): Unit = withContext(Dispatchers.IO) { ): Unit = withContext(Dispatchers.IO) {
when (treeItem) {
is DataTreeItem.Leaf -> {
//TODO add directory-based envelope writer //TODO add directory-based envelope writer
val envelope = treeItem.data.toEnvelope(dataFormat) tree.data?.let {
val envelope = it.toEnvelope(dataFormat)
val entry = ZipEntry(name) val entry = ZipEntry(name)
putNextEntry(entry) putNextEntry(entry)
@ -31,26 +29,24 @@ private suspend fun <T : Any> ZipOutputStream.writeNode(
writeWith(envelopeFormat, envelope) writeWith(envelopeFormat, envelope)
} }
write(bytes) write(bytes)
} }
is DataTreeItem.Node -> {
val entry = ZipEntry("$name/") val entry = ZipEntry("$name/")
putNextEntry(entry) putNextEntry(entry)
closeEntry() closeEntry()
treeItem.tree.items.forEach { (token, item) -> tree.items.forEach { (token, item) ->
val childName = "$name/$token" val childName = "$name/$token"
writeNode(childName, item, dataFormat, envelopeFormat) writeNode(childName, item, dataFormat, envelopeFormat)
} }
}
}
} }
/** /**
* Write this [LegacyDataTree] as a zip archive * Write this [DataTree] as a zip archive
*/ */
@DFExperimental @DFExperimental
public suspend fun <T : Any> LegacyDataTree<T>.writeZip( public suspend fun <T : Any> DataTree<T>.writeZip(
path: Path, path: Path,
format: IOFormat<T>, format: IOFormat<T>,
envelopeFormat: EnvelopeFormat = TaggedEnvelopeFormat, envelopeFormat: EnvelopeFormat = TaggedEnvelopeFormat,
@ -68,6 +64,6 @@ public suspend fun <T : Any> LegacyDataTree<T>.writeZip(
) )
val zos = ZipOutputStream(fos) val zos = ZipOutputStream(fos)
zos.use { zos.use {
it.writeNode("", DataTreeItem.Node(this@writeZip), format, envelopeFormat) it.writeNode("", this@writeZip, format, envelopeFormat)
} }
} }

View File

@ -4,7 +4,6 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.test.runTest import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import space.kscience.dataforge.data.startAll
import space.kscience.dataforge.data.static import space.kscience.dataforge.data.static
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.boolean import space.kscience.dataforge.meta.boolean
@ -37,7 +36,6 @@ internal class CachingWorkspaceTest {
} }
} }
@Suppress("UNUSED_VARIABLE")
val doSecond by task<Any> { val doSecond by task<Any> {
transformEach( transformEach(
doFirst, doFirst,
@ -54,11 +52,11 @@ internal class CachingWorkspaceTest {
val secondB = workspace.produce("doSecond", Meta { "flag" put true }) val secondB = workspace.produce("doSecond", Meta { "flag" put true })
val secondC = workspace.produce("doSecond") val secondC = workspace.produce("doSecond")
coroutineScope { coroutineScope {
first.startAll(this) first.compute(this)
secondA.startAll(this) secondA.compute(this)
secondB.startAll(this) secondB.compute(this)
//repeat to check caching //repeat to check caching
secondC.startAll(this) secondC.compute(this)
} }
assertEquals(10, firstCounter) assertEquals(10, firstCounter)
assertEquals(10, secondCounter) assertEquals(10, secondCounter)

View File

@ -20,13 +20,13 @@ class DataPropagationTestPlugin : WorkspacePlugin() {
val result: Data<Int> = selectedData.foldToData(0) { result, data -> val result: Data<Int> = selectedData.foldToData(0) { result, data ->
result + data.value result + data.value
} }
emit("result", result) data("result", result)
} }
val singleData by task<Int> { val singleData by task<Int> {
workspace.data.filterByType<Int>()["myData[12]"]?.let { workspace.data.filterByType<Int>()["myData[12]"]?.let {
emit("result", it) data("result", it)
} }
} }
@ -55,12 +55,12 @@ class DataPropagationTest {
@Test @Test
fun testAllData() = runTest { fun testAllData() = runTest {
val node = testWorkspace.produce("Test.allData") val node = testWorkspace.produce("Test.allData")
assertEquals(4950, node.asSequence().single().await()) assertEquals(4950, node.content.asSequence().single().await())
} }
@Test @Test
fun testSingleData() = runTest { fun testSingleData() = runTest {
val node = testWorkspace.produce("Test.singleData") val node = testWorkspace.produce("Test.singleData")
assertEquals(12, node.asSequence().single().await()) assertEquals(12, node.content.asSequence().single().await())
} }
} }

View File

@ -9,38 +9,32 @@ import kotlinx.io.writeString
import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Global import space.kscience.dataforge.context.Global
import space.kscience.dataforge.data.* import space.kscience.dataforge.data.*
import space.kscience.dataforge.io.Envelope import space.kscience.dataforge.io.*
import space.kscience.dataforge.io.IOFormat
import space.kscience.dataforge.io.io
import space.kscience.dataforge.io.readEnvelopeFile
import space.kscience.dataforge.io.yaml.YamlPlugin import space.kscience.dataforge.io.yaml.YamlPlugin
import space.kscience.dataforge.meta.get import space.kscience.dataforge.meta.get
import space.kscience.dataforge.misc.DFExperimental import space.kscience.dataforge.misc.DFExperimental
import java.nio.file.Files import java.nio.file.Files
import kotlin.io.path.fileSize import kotlin.io.path.fileSize
import kotlin.io.path.toPath import kotlin.io.path.toPath
import kotlin.reflect.KType
import kotlin.reflect.typeOf
import kotlin.test.Test import kotlin.test.Test
import kotlin.test.assertEquals import kotlin.test.assertEquals
class FileDataTest { class FileDataTest {
val dataNode = GenericDataTree<String> { val dataNode = DataTree<String> {
node("dir") { branch("dir") {
static("a", "Some string") { static("a", "Some string") {
"content" put "Some string" "content" put "Some string"
} }
} }
static("b", "root data") static("b", "root data")
meta { // meta {
"content" put "This is root meta node" // "content" put "This is root meta node"
} // }
} }
object StringIOFormat : IOFormat<String> { object StringIOFormat : IOFormat<String> {
override val type: KType get() = typeOf<String>()
override fun writeTo(sink: Sink, obj: String) { override fun writeTo(sink: Sink, obj: String) {
sink.writeString(obj) sink.writeString(obj)
@ -52,11 +46,13 @@ class FileDataTest {
@Test @Test
@DFExperimental @DFExperimental
fun testDataWriteRead() = with(Global.io) { fun testDataWriteRead() = with(Global.io) {
val io = Global.io
val dir = Files.createTempDirectory("df_data_node") val dir = Files.createTempDirectory("df_data_node")
runBlocking { runBlocking {
writeDataDirectory(dir, dataNode, StringIOFormat) writeDataDirectory(dir, dataNode, StringIOFormat)
println(dir.toUri().toString()) println(dir.toUri().toString())
val reconstructed = readDataDirectory(dir) { _, _ -> StringIOFormat } val reconstructed = DataTree { files(io, dir) }
.transform { (_, value) -> value.toByteArray().decodeToString() }
assertEquals(dataNode["dir.a"]?.meta?.get("content"), reconstructed["dir.a"]?.meta?.get("content")) assertEquals(dataNode["dir.a"]?.meta?.get("content"), reconstructed["dir.a"]?.meta?.get("content"))
assertEquals(dataNode["b"]?.await(), reconstructed["b"]?.await()) assertEquals(dataNode["b"]?.await(), reconstructed["b"]?.await())
} }
@ -66,14 +62,15 @@ class FileDataTest {
@Test @Test
@DFExperimental @DFExperimental
fun testZipWriteRead() = runTest { fun testZipWriteRead() = runTest {
with(Global.io) { val io = Global.io
val zip = Files.createTempFile("df_data_node", ".zip") val zip = Files.createTempFile("df_data_node", ".zip")
dataNode.writeZip(zip, StringIOFormat) dataNode.writeZip(zip, StringIOFormat)
println(zip.toUri().toString()) println(zip.toUri().toString())
val reconstructed = readDataDirectory(zip) { _, _ -> StringIOFormat } val reconstructed = DataTree { files(io, zip) }
.transform { (_, value) -> value.toByteArray().decodeToString() }
assertEquals(dataNode["dir.a"]?.meta?.get("content"), reconstructed["dir.a"]?.meta?.get("content")) assertEquals(dataNode["dir.a"]?.meta?.get("content"), reconstructed["dir.a"]?.meta?.get("content"))
assertEquals(dataNode["b"]?.await(), reconstructed["b"]?.await()) assertEquals(dataNode["b"]?.await(), reconstructed["b"]?.await())
}
} }
@OptIn(DFExperimental::class) @OptIn(DFExperimental::class)

View File

@ -3,7 +3,6 @@ package space.kscience.dataforge.workspace
import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.test.runTest import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import space.kscience.dataforge.data.startAll
import space.kscience.dataforge.data.static import space.kscience.dataforge.data.static
import space.kscience.dataforge.misc.DFExperimental import space.kscience.dataforge.misc.DFExperimental
import java.nio.file.Files import java.nio.file.Files
@ -22,13 +21,12 @@ class FileWorkspaceCacheTest {
} }
fileCache(Files.createTempDirectory("dataforge-temporary-cache")) fileCache(Files.createTempDirectory("dataforge-temporary-cache"))
@Suppress("UNUSED_VARIABLE")
val echo by task<String> { val echo by task<String> {
transformEach(dataByType<String>()) { arg, _, _ -> arg } transformEach(dataByType<String>()) { arg, _, _ -> arg }
} }
} }
workspace.produce("echo").startAll(this) workspace.produce("echo").compute(this)
} }
} }

View File

@ -27,8 +27,8 @@ public fun <P : Plugin> P.toFactory(): PluginFactory<P> = object : PluginFactory
override val tag: PluginTag = this@toFactory.tag override val tag: PluginTag = this@toFactory.tag
} }
public fun Workspace.produceBlocking(task: String, block: MutableMeta.() -> Unit = {}): DataSet<Any> = runBlocking { public fun Workspace.produceBlocking(task: String, block: MutableMeta.() -> Unit = {}): DataTree<*> = runBlocking {
produce(task, block) produce(task, block).content
} }
@OptIn(DFExperimental::class) @OptIn(DFExperimental::class)
@ -68,8 +68,8 @@ internal class SimpleWorkspaceTest {
val filterOne by task<Int> { val filterOne by task<Int> {
val name by taskMeta.string { error("Name field not defined") } val name by taskMeta.string { error("Name field not defined") }
from(testPluginFactory) { test }.getByType<Int>(name)?.let { source -> from(testPluginFactory) { test }[name]?.let { source: Data<Int> ->
data(source.name, source.map { it }) data(name, source)
} }
} }
@ -110,14 +110,14 @@ internal class SimpleWorkspaceTest {
} }
val averageByGroup by task<Int> { val averageByGroup by task<Int> {
val evenSum = workspace.data.filterByType<Int> { name, _ -> val evenSum = workspace.data.filterByType<Int> { name, _, _ ->
name.toString().toInt() % 2 == 0 name.toString().toInt() % 2 == 0
}.foldToData(0) { l, r -> }.foldToData(0) { l, r ->
l + r.value l + r.value
} }
data("even", evenSum) data("even", evenSum)
val oddSum = workspace.data.filterByType<Int> { name, _ -> val oddSum = workspace.data.filterByType<Int> { name, _, _ ->
name.toString().toInt() % 2 == 1 name.toString().toInt() % 2 == 1
}.foldToData(0) { l, r -> }.foldToData(0) { l, r ->
l + r.value l + r.value
@ -159,7 +159,7 @@ internal class SimpleWorkspaceTest {
@Timeout(1) @Timeout(1)
fun testMetaPropagation() = runTest { fun testMetaPropagation() = runTest {
val node = workspace.produce("sum") { "testFlag" put true } val node = workspace.produce("sum") { "testFlag" put true }
val res = node.asSequence().single().await() val res = node.single().await()
} }
@Test @Test
@ -170,20 +170,25 @@ internal class SimpleWorkspaceTest {
} }
@Test @Test
fun testFullSquare() { fun testFullSquare() = runTest {
runBlocking { val result = workspace.produce("fullSquare")
val node = workspace.produce("fullSquare") result.forEach {
println(node.toMeta()) println(
"""
Name: ${it.name}
Meta: ${it.meta}
Data: ${it.data.await()}
""".trimIndent()
)
} }
} }
@Test @Test
fun testFilter() { fun testFilter() = runTest {
runBlocking {
val node = workspace.produce("filterOne") { val node = workspace.produce("filterOne") {
"name" put "myData[12]" "name" put "myData[12]"
} }
assertEquals(12, node.single().await()) assertEquals(12, node.single().await())
} }
}
} }