diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/AbstractAction.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/AbstractAction.kt index e8249f0b..af0aab00 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/AbstractAction.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/AbstractAction.kt @@ -35,16 +35,16 @@ public abstract class AbstractAction<T, R>( - * Update part of the data set using provided data + * Update a part of the data set when the item with [updateName] changes * * @param source the source data tree in case we need several data items to update - * @param meta the metadata used for the whole data tree + * @param actionMeta the metadata used for the whole data tree - * @param updatedData an updated item + * @param updateName the name of the updated item */ protected open suspend fun DataSink<R>.update( source: DataTree<T>, - meta: Meta, - updatedData: DataUpdate<T>, + actionMeta: Meta, + updateName: Name, ) { //by default regenerate the whole data set - putAll(generate(source, meta)) + putAll(generate(source, actionMeta)) } @OptIn(UnsafeKType::class) @@ -64,8 +64,8 @@ public abstract class AbstractAction<T, R>( } with(updateSink) { - source.updates.collect { du: DataUpdate<T> -> - update(source, meta, du) + source.updates.collect { + update(source, meta, it) } } } diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/MapAction.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/MapAction.kt index fefa12d1..2e4b2ddc 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/MapAction.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/MapAction.kt @@ -85,8 +85,8 @@ public class MapAction<T, R>( } override fun DataBuilderScope<R>.generate(source: DataTree<T>, meta: Meta): Map<Name, Data<R>> = buildMap { - source.forEach { - val (name, data) = mapOne(it.name, it.data, meta) + source.forEach { namedData -> + val (name, data) = mapOne(namedData.name, namedData, meta) if (data != null) { check(name !in keys) { "Data with key $name already exist in the result" } put(name, data) @@ -96,10 +96,10 @@ override suspend fun DataSink<R>.update( source: DataTree<T>, - meta: Meta, - updatedData: DataUpdate<T>, + actionMeta: Meta, + updateName: Name, ) { - val (name, data) = mapOne(updatedData.name, updatedData.data, meta) + val (name, data) = mapOne(updateName, source.read(updateName), actionMeta) put(name, data) } } diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/ReduceAction.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/ReduceAction.kt index 73388fec..d7bacda5 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/ReduceAction.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/ReduceAction.kt @@ -95,7 +95,7 @@ internal class ReduceAction<T, R>( ReduceGroupBuilder<T, R>(meta, outputType).apply(action).buildGroups(source).forEach { group -> val dataFlow: Map<Name, Data<T>> = group.data.asSequence().fold(HashMap()) { acc, value -> acc.apply { - acc[value.name] = value.data + acc[value.name] = value } } diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/SplitAction.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/SplitAction.kt index bf3284be..6dfbc7c9 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/SplitAction.kt +++ 
b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/actions/SplitAction.kt @@ -80,7 +80,7 @@ internal class SplitAction<T, R>( meta: Meta ): Map<Name, Data<R>> = buildMap { source.forEach { - splitOne(it.name, it.data, meta).forEach { (name, data) -> + splitOne(it.name, it, meta).forEach { (name, data) -> check(name !in keys) { "Data with key $name already exist in the result" } if (data != null) { put(name, data) @@ -91,10 +91,10 @@ internal class SplitAction<T, R>( override suspend fun DataSink<R>.update( source: DataTree<T>, - meta: Meta, - updatedData: DataUpdate<T>, + actionMeta: Meta, + updateName: Name, ) { - putAll(splitOne(updatedData.name, updatedData.data, meta)) + putAll(splitOne(updateName, source.read(updateName), actionMeta)) } } diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataFilter.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataFilter.kt index 38174e50..5538cc28 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataFilter.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataFilter.kt @@ -18,23 +18,25 @@ public fun interface DataFilter { } -public fun DataFilter.accepts(update: DataUpdate<*>): Boolean = accepts(update.name, update.data?.meta, update.type) +//public fun DataFilter.accepts(update: DataUpdate<*>): Boolean = accepts(update.name, update.data?.meta, update.type) -public fun <T, DU : DataUpdate<T>> Sequence<DU>.filterData(predicate: DataFilter): Sequence<DU> = filter { data -> - predicate.accepts(data) -} - -public fun <T, DU : DataUpdate<T>> Flow<DU>.filterData(predicate: DataFilter): Flow<DU> = filter { data -> - predicate.accepts(data) -} +//public fun <T, DU : DataUpdate<T>> Sequence<DU>.filterData(predicate: DataFilter): Sequence<DU> = filter { data -> +// predicate.accepts(data) +//} +// +//public fun <T, DU : DataUpdate<T>> Flow<DU>.filterData(predicate: DataFilter): Flow<DU> = filter { data -> +// predicate.accepts(data) +//} public fun <T> DataSource<T>.filterData( - predicate: DataFilter, + dataFilter: DataFilter, ): DataSource<T> = object : DataSource<T> { override val dataType: KType get() = this@filterData.dataType override fun read(name: Name): Data<T>? 
= - this@filterData.read(name)?.takeIf { predicate.accepts(name, it.meta, it.type) } + this@filterData.read(name)?.takeIf { + dataFilter.accepts(name, it.meta, it.type) + } } /** @@ -43,8 +45,12 @@ public fun <T> DataSource<T>.filterData( public fun <T> ObservableDataSource<T>.filterData( predicate: DataFilter, ): ObservableDataSource<T> = object : ObservableDataSource<T> { - override val updates: Flow<DataUpdate<T>> - get() = this@filterData.updates.filter { predicate.accepts(it) } + + override val updates: Flow<Name> + get() = this@filterData.updates.filter { + val data = read(it) + predicate.accepts(it, data?.meta, data?.type ?: dataType) + } override val dataType: KType get() = this@filterData.dataType @@ -70,8 +76,11 @@ internal class FilteredDataTree<T>( ?.filter { !it.value.isEmpty() } ?: emptyMap() - override val updates: Flow<DataUpdate<T>> - get() = source.updates.filter { filter.accepts(it) } + override val updates: Flow<Name> + get() = source.updates.filter { + val data = read(it) + filter.accepts(it, data?.meta, data?.type ?: dataType) + } } diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSink.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSink.kt index ccd37514..c8a0f2a7 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSink.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSink.kt @@ -8,14 +8,17 @@ import space.kscience.dataforge.names.* import kotlin.reflect.KType import kotlin.reflect.typeOf -public interface DataBuilderScope<in T>{ - public companion object: DataBuilderScope<Nothing> +public interface DataBuilderScope<in T> { + public companion object : DataBuilderScope<Nothing> } @Suppress("UNCHECKED_CAST") public fun <T> DataBuilderScope(): DataBuilderScope<T> = DataBuilderScope as DataBuilderScope<T> -public fun interface DataSink<in T>: DataBuilderScope<T> { +/** + * Asynchronous data sink + */ +public fun interface DataSink<in T> : DataBuilderScope<T> { /** * Put data and notify listeners if needed */ @@ -59,7 +62,7 @@ private class MutableDataTreeRoot<T>( ) : MutableDataTree<T> { override val items = HashMap<NameToken, MutableDataTree<T>>() - override val updates = MutableSharedFlow<DataUpdate<T>>(extraBufferCapacity = 100) + override val updates = MutableSharedFlow<Name>(extraBufferCapacity = 100) inner class MutableDataTreeBranch(val branchName: Name) : MutableDataTree<T> { @@ -67,10 +70,8 @@ private class MutableDataTreeRoot<T>( override val items = HashMap<NameToken, MutableDataTree<T>>() - override val updates: Flow<DataUpdate<T>> = this@MutableDataTreeRoot.updates.mapNotNull { update -> - update.name.removeFirstOrNull(branchName)?.let { - DataUpdate(update.data?.type ?: dataType, it, update.data) - } + override val updates: Flow<Name> = this@MutableDataTreeRoot.updates.mapNotNull { update -> + update.removeFirstOrNull(branchName) } override val dataType: KType get() = this@MutableDataTreeRoot.dataType @@ -80,7 +81,7 @@ private class MutableDataTreeRoot<T>( override suspend fun put(token: NameToken, data: Data<T>?) { this.data = data - this@MutableDataTreeRoot.updates.emit(DataUpdate(data?.type ?: dataType, branchName + token, data)) + this@MutableDataTreeRoot.updates.emit(branchName + token) } } @@ -92,7 +93,7 @@ private class MutableDataTreeRoot<T>( override suspend fun put(token: NameToken, data: Data<T>?) 
{ this.data = data - updates.emit(DataUpdate(data?.type ?: dataType, token.asName(), data)) + updates.emit(token.asName()) } } diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSource.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSource.kt index dfde4e0b..7ee87180 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSource.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/DataSource.kt @@ -1,8 +1,6 @@ package space.kscience.dataforge.data -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.emptyFlow -import kotlinx.coroutines.flow.first +import kotlinx.coroutines.flow.* import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.names.* import kotlin.contracts.contract @@ -31,13 +29,17 @@ public interface DataSource<out T> { public interface ObservableDataSource<out T> : DataSource<T> { /** - * Flow updates made to the data. Updates are considered critical. The producer will suspend unless all updates are consumed. + * Names of updated elements. + * Consecutive updates with the same name may be conflated into a single notification. + * + * Updates are considered critical. + * The producer will suspend until all updates are consumed. */ - public val updates: Flow<DataUpdate<T>> + public val updates: Flow<Name> } public suspend fun <T> ObservableDataSource<T>.awaitData(name: Name): Data<T> { - return read(name) ?: updates.first { it.name == name && it.data != null }.data!! + return read(name) ?: updates.filter { it == name }.mapNotNull { read(name) }.first() } public suspend fun <T> ObservableDataSource<T>.awaitData(name: String): Data<T> = @@ -59,7 +61,7 @@ public interface DataTree<out T> : ObservableDataSource<T> { /** * Flow updates made to the data */ - override val updates: Flow<DataUpdate<T>> + override val updates: Flow<Name> public companion object { private object EmptyDataTree : DataTree<Nothing> { @@ -68,7 +70,7 @@ public interface DataTree<out T> : ObservableDataSource<T> { override val dataType: KType = typeOf<Unit>() override fun read(name: Name): Data<Nothing>? 
= null - override val updates: Flow<DataUpdate<Nothing>> get() = emptyFlow() + override val updates: Flow<Name> get() = emptyFlow() } public val EMPTY: DataTree<Nothing> = EmptyDataTree diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/Goal.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/Goal.kt index 95ddbbf7..e54710b1 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/Goal.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/Goal.kt @@ -32,7 +32,7 @@ public interface Goal<out T> { public companion object } -public fun Goal<*>.launch(coroutineScope: CoroutineScope): Job = async(coroutineScope) +public fun Goal<*>.launchIn(coroutineScope: CoroutineScope): Job = async(coroutineScope) public suspend fun <T> Goal<T>.await(): T = coroutineScope { async(this).await() } diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/NamedData.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/NamedData.kt index 54a9715c..b20736ae 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/NamedData.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/NamedData.kt @@ -3,43 +3,16 @@ package space.kscience.dataforge.data import space.kscience.dataforge.meta.isEmpty import space.kscience.dataforge.misc.Named import space.kscience.dataforge.names.Name -import kotlin.reflect.KType - -/** - * An interface implementing a data update event. - * - * If [data] is null, then corresponding element should be removed. - */ -public interface DataUpdate<out T> : Named { - public val type: KType - override val name: Name - public val data: Data<T>? -} - -public fun <T> DataUpdate(type: KType, name: Name, data: Data<T>?): DataUpdate<T> = object : DataUpdate<T> { - override val type: KType = type - override val name: Name = name - override val data: Data<T>? = data - - override fun toString(): String { - return "DataUpdate(type=$type, name=$name, data=$data)" - } - -} /** * A data coupled to a name. 
*/ -public interface NamedData<out T> : DataUpdate<T>, Data<T> { - override val data: Data<T> -} +public interface NamedData<out T> : Data<T>, Named -public operator fun NamedData<*>.component1(): Name = name -public operator fun <T> NamedData<T>.component2(): Data<T> = data private class NamedDataImpl<T>( override val name: Name, - override val data: Data<T>, + val data: Data<T>, ) : Data<T> by data, NamedData<T> { override fun toString(): String = buildString { append("NamedData(name=\"$name\"") @@ -54,7 +27,7 @@ private class NamedDataImpl<T>( } -public fun <T> Data<T>.named(name: Name): NamedData<T> = if (this is NamedData) { +public fun <T> Data<T>.named(name: Name): NamedData<T> = if (this is NamedDataImpl) { NamedDataImpl(name, this.data) } else { NamedDataImpl(name, this) } diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/StaticDataBuilder.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/StaticDataBuilder.kt new file mode 100644 index 00000000..0c1fe0b9 --- /dev/null +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/StaticDataBuilder.kt @@ -0,0 +1,63 @@ +package space.kscience.dataforge.data + +import space.kscience.dataforge.meta.Meta +import space.kscience.dataforge.meta.MutableMeta +import space.kscience.dataforge.misc.UnsafeKType +import space.kscience.dataforge.names.Name +import space.kscience.dataforge.names.parseAsName +import space.kscience.dataforge.names.plus +import kotlin.reflect.KType +import kotlin.reflect.typeOf + + +public fun interface StaticDataBuilder<T> : DataBuilderScope<T> { + public fun put(name: Name, data: Data<T>) +} + +private class DataMapBuilder<T> : StaticDataBuilder<T> { + val map = mutableMapOf<Name, Data<T>>() + + override fun put(name: Name, data: Data<T>) { + if (map.containsKey(name)) { + error("Duplicate key '$name'") + } else { + map.put(name, data) + } + } +} + +public fun <T> StaticDataBuilder<T>.put(name: String, data: Data<T>) { + put(name.parseAsName(), data) } + +public inline fun <T, reified T1 : T> StaticDataBuilder<T>.putValue( + name: String, + value: T1, + metaBuilder: MutableMeta.() -> Unit = {} +) { + put(name, Data(value, Meta(metaBuilder))) +} + +public fun <T> StaticDataBuilder<T>.putAll(prefix: Name, block: StaticDataBuilder<T>.() -> Unit) { + val map = DataMapBuilder<T>().apply(block).map + map.forEach { (name, data) -> + put(prefix + name, data) + } +} + +public fun <T> StaticDataBuilder<T>.putAll(prefix: String, block: StaticDataBuilder<T>.() -> Unit) = + putAll(prefix.parseAsName(), block) + +public fun <T> StaticDataBuilder<T>.putAll(prefix: String, tree: DataTree<T>) { + tree.forEach { data -> + put(prefix.parseAsName() + data.name, data) + } +} + +@UnsafeKType +public fun <T> DataTree.Companion.static(type: KType, block: StaticDataBuilder<T>.() -> Unit): DataTree<T> = + DataMapBuilder<T>().apply(block).map.asTree(type) + +@OptIn(UnsafeKType::class) +public inline fun <reified T> DataTree.Companion.static(noinline block: StaticDataBuilder<T>.() -> Unit): DataTree<T> = + static(typeOf<T>(), block) \ No newline at end of file diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataBuilders.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataBuilders.kt index b3433616..640e8541 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataBuilders.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataBuilders.kt @@ -9,7 +9,7 @@ import space.kscience.dataforge.names.plus public suspend fun <T> DataSink<T>.put(value: 
NamedData<T>) { - put(value.name, value.data) + put(value.name, value) } public inline fun <T> DataSink<T>.putAll( @@ -89,7 +89,7 @@ public suspend inline fun <reified T> DataSink<T>.putValue( public suspend fun <T> DataSink<T>.putAll(sequence: Sequence<NamedData<T>>) { sequence.forEach { - put(it.name, it.data) + put(it) } } @@ -99,19 +99,27 @@ public suspend fun <T> DataSink<T>.putAll(map: Map<Name, Data<T>?>) { } } -public suspend fun <T> DataSink<T>.putAll(tree: DataTree<T>) { - putAll(tree.asSequence()) -} +//public suspend fun <T> DataSink<T>.putAll(tree: DataTree<T>) { +// putAll(tree.asSequence()) +//} /** - * Copy given data set and mirror its changes to this [DataSink]. Suspends indefinitely. + * Mirror all updates from [source] to this sink, prefixing each name with [prefix]. Suspends indefinitely. */ -public suspend fun <T : Any> DataSink<T>.putAllAndWatch( - source: DataTree<T>, - branchName: Name = Name.EMPTY, +public suspend fun <T : Any> DataSink<T>.watch( + source: ObservableDataSource<T>, + prefix: Name = Name.EMPTY, ) { - putAll(branchName, source) source.updates.collect { - put(branchName + it.name, it.data) + put(prefix + it, source.read(it)) } +} + +public suspend fun <T : Any> MutableDataTree<T>.putAllAndWatch( + source: DataTree<T>, + prefix: Name = Name.EMPTY, +) { + putAll(prefix, source) + watch(source, prefix) } \ No newline at end of file diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataTransform.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataTransform.kt index 80105fc6..a8d5ac20 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataTransform.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataTransform.kt @@ -200,40 +200,44 @@ public inline fun <T, reified R> Iterable<NamedData<T>>.foldNamedToData( @UnsafeKType -public fun <T, R> DataTree<T>.map( +public fun <T, R> DataTree<T>.transformEach( outputType: KType, scope: CoroutineScope, - metaTransform: MutableMeta.() -> Unit = {}, + metaTransform: MutableMeta.(name: Name) -> Unit = {}, compute: suspend (NamedValueWithMeta<T>) -> R, ): DataTree<R> = DataTree<R>( outputType, scope, initialData = asSequence().associate { namedData: NamedData<T> -> - val newMeta = namedData.meta.toMutableMeta().apply(metaTransform).seal() + val newMeta = namedData.meta.toMutableMeta().apply { + metaTransform(namedData.name) + }.seal() val newData = Data(outputType, newMeta, scope.coroutineContext, listOf(namedData)) { compute(namedData.awaitWithMeta()) } namedData.name to newData } ) { - updates.collect { update -> - val data: Data<T>? = update.data - if (data == null) put(update.name, null) else { - val newMeta = data.meta.toMutableMeta().apply(metaTransform).seal() + updates.collect { name -> + val data: Data<T>? 
= read(name) + if (data == null) put(name, null) else { + val newMeta = data.meta.toMutableMeta().apply { + metaTransform(name) + }.seal() val d = Data(outputType, newMeta, scope.coroutineContext, listOf(data)) { - compute(NamedValueWithMeta(update.name, data.await(), data.meta)) + compute(NamedValueWithMeta(name, data.await(), data.meta)) } - put(update.name, d) + put(name, d) } } } @OptIn(UnsafeKType::class) -public inline fun <T, reified R> DataTree<T>.map( +public inline fun <T, reified R> DataTree<T>.transformEach( scope: CoroutineScope, - noinline metaTransform: MutableMeta.() -> Unit = {}, + noinline metaTransform: MutableMeta.(name: Name) -> Unit = {}, noinline block: suspend (NamedValueWithMeta<T>) -> R, -): DataTree<R> = map(typeOf<R>(), scope, metaTransform, block) +): DataTree<R> = transformEach(typeOf<R>(), scope, metaTransform, block) public inline fun <T> DataTree<T>.forEach(block: (NamedData<T>) -> Unit) { asSequence().forEach(block) diff --git a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataTreeBuilder.kt b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataTreeBuilder.kt index 20afcc76..664f9904 100644 --- a/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataTreeBuilder.kt +++ b/dataforge-data/src/commonMain/kotlin/space/kscience/dataforge/data/dataTreeBuilder.kt @@ -17,7 +17,7 @@ import kotlin.reflect.typeOf private class FlatDataTree<T>( override val dataType: KType, private val dataSet: Map<Name, Data<T>>, - private val sourceUpdates: SharedFlow<DataUpdate<T>>, + private val sourceUpdates: SharedFlow<Name>, private val prefix: Name, ) : DataTree<T> { override val data: Data<T>? get() = dataSet[prefix] @@ -29,10 +29,9 @@ private class FlatDataTree<T>( override fun read(name: Name): Data<T>? = dataSet[prefix + name] - override val updates: Flow<DataUpdate<T>> = - sourceUpdates.mapNotNull { update -> - update.name.removeFirstOrNull(prefix)?.let { DataUpdate(dataType, it, update.data) } - } + override val updates: Flow<Name> = sourceUpdates.mapNotNull { update -> + update.removeFirstOrNull(prefix) + } } /** @@ -47,7 +46,7 @@ private class DataTreeBuilder<T>( private val mutex = Mutex() - private val updatesFlow = MutableSharedFlow<DataUpdate<T>>() + private val updatesFlow = MutableSharedFlow<Name>() override suspend fun put(name: Name, data: Data<T>?) 
{ @@ -58,7 +57,7 @@ private class DataTreeBuilder<T>( map[name] = data } } - updatesFlow.emit(DataUpdate(data?.type ?: type, name, data)) + updatesFlow.emit(name) } public fun build(): DataTree<T> = FlatDataTree(type, map, updatesFlow, Name.EMPTY) @@ -74,7 +73,7 @@ public fun <T> DataTree( initialData: Map<Name, Data<T>> = emptyMap(), updater: suspend DataSink<T>.() -> Unit, ): DataTree<T> = DataTreeBuilder<T>(dataType, initialData).apply { - scope.launch{ + scope.launch { updater() } }.build() @@ -89,6 +88,13 @@ public inline fun <reified T> DataTree( noinline updater: suspend DataSink<T>.() -> Unit, ): DataTree<T> = DataTree(typeOf<T>(), scope, initialData, updater) +@UnsafeKType +public fun <T> DataTree(type: KType, data: Map<Name, Data<T>>): DataTree<T> = + DataTreeBuilder(type, data).build() + +@OptIn(UnsafeKType::class) +public inline fun <reified T> DataTree(data: Map<Name, Data<T>>): DataTree<T> = + DataTree(typeOf<T>(), data) /** * Represent this flat data map as a [DataTree] without copying it @@ -106,7 +112,7 @@ public inline fun <reified T> Map<Name, Data<T>>.asTree(): DataTree<T> = asTree( @UnsafeKType public fun <T> Sequence<NamedData<T>>.toTree(type: KType): DataTree<T> = - DataTreeBuilder(type, associate { it.name to it.data }).build() + DataTreeBuilder(type, associate { it.name to it }).build() /** diff --git a/dataforge-data/src/commonTest/kotlin/space/kscience/dataforge/data/DataTreeBuilderTest.kt b/dataforge-data/src/commonTest/kotlin/space/kscience/dataforge/data/DataTreeBuilderTest.kt index 1f78b36c..1d4d2ea4 100644 --- a/dataforge-data/src/commonTest/kotlin/space/kscience/dataforge/data/DataTreeBuilderTest.kt +++ b/dataforge-data/src/commonTest/kotlin/space/kscience/dataforge/data/DataTreeBuilderTest.kt @@ -12,7 +12,7 @@ import kotlin.time.Duration.Companion.milliseconds internal class DataTreeBuilderTest { @Test fun testTreeBuild() = runTest(timeout = 500.milliseconds) { - val node = DataTree<Any> { + val node = DataTree.static<Any> { putAll("primary") { putValue("a", "a") putValue("b", "b") @@ -29,20 +29,18 @@ internal class DataTreeBuilderTest { @Test fun testDataUpdate() = runTest(timeout = 500.milliseconds) { - val updateData = DataTree<Any> { - putAll("update") { - put("a", Data.wrapValue("a")) - put("b", Data.wrapValue("b")) - } + val updateData = DataTree.static<Any> { + put("a", Data.wrapValue("a")) + put("b", Data.wrapValue("b")) } - val node = DataTree<Any> { + val node = DataTree.static<Any> { putAll("primary") { putValue("a", "a") putValue("b", "b") } putValue("root", "root") - putAll(updateData) + putAll("update", updateData) } assertEquals("a", node["update.a"]?.await()) @@ -56,11 +54,11 @@ internal class DataTreeBuilderTest { val subNode = MutableDataTree<Int>() val rootNode = MutableDataTree<Int>() { - job = launch { putAllAndWatch(subNode,"sub".asName())} + job = launch { putAllAndWatch(subNode, "sub".asName()) } } repeat(10) { - subNode.updateValue("value[$it]", it) + subNode.putValue("value[$it]", it) } assertEquals(9, subNode.awaitData("value[9]").await()) diff --git a/dataforge-data/src/jvmMain/kotlin/space/kscience/dataforge/data/dataFilterJvm.kt b/dataforge-data/src/jvmMain/kotlin/space/kscience/dataforge/data/dataFilterJvm.kt index 0cc81f7a..ba7fdadf 100644 --- a/dataforge-data/src/jvmMain/kotlin/space/kscience/dataforge/data/dataFilterJvm.kt +++ b/dataforge-data/src/jvmMain/kotlin/space/kscience/dataforge/data/dataFilterJvm.kt @@ -1,7 +1,5 @@ package space.kscience.dataforge.data -import kotlinx.coroutines.flow.Flow -import 
kotlinx.coroutines.flow.filter import space.kscience.dataforge.misc.DFInternal import space.kscience.dataforge.names.Name import kotlin.reflect.KType @@ -22,15 +20,15 @@ private fun <R> Data<*>.castOrNull(type: KType): Data<R>? = } } -@Suppress("UNCHECKED_CAST") -@DFInternal -public fun <R> Sequence<DataUpdate<*>>.filterByDataType(type: KType): Sequence<NamedData<R>> = - filter { it.type.isSubtypeOf(type) } as Sequence<NamedData<R>> - -@Suppress("UNCHECKED_CAST") -@DFInternal -public fun <R> Flow<DataUpdate<*>>.filterByDataType(type: KType): Flow<NamedData<R>> = - filter { it.type.isSubtypeOf(type) } as Flow<NamedData<R>> +//@Suppress("UNCHECKED_CAST") +//@DFInternal +//public fun <R> Sequence<DataUpdate<*>>.filterByDataType(type: KType): Sequence<NamedData<R>> = +// filter { it.type.isSubtypeOf(type) } as Sequence<NamedData<R>> +// +//@Suppress("UNCHECKED_CAST") +//@DFInternal +//public fun <R> Flow<DataUpdate<*>>.filterByDataType(type: KType): Flow<NamedData<R>> = +// filter { it.type.isSubtypeOf(type) } as Flow<NamedData<R>> /** * Select all data matching given type and filters. Does not modify paths diff --git a/dataforge-data/src/jvmTest/kotlin/space/kscience/dataforge/data/ActionsTest.kt b/dataforge-data/src/jvmTest/kotlin/space/kscience/dataforge/data/ActionsTest.kt index e9ec343c..6828b674 100644 --- a/dataforge-data/src/jvmTest/kotlin/space/kscience/dataforge/data/ActionsTest.kt +++ b/dataforge-data/src/jvmTest/kotlin/space/kscience/dataforge/data/ActionsTest.kt @@ -19,7 +19,7 @@ internal class ActionsTest { result { it + 1 } } - val data: DataTree<Int> = DataTree { + val data: DataTree<Int> = DataTree.static { repeat(10) { putValue(it.toString(), it) } @@ -42,7 +42,7 @@ internal class ActionsTest { val result: DataTree<Int> = plusOne(source) repeat(10) { - source.updateValue(it.toString(), it) + source.putValue(it.toString(), it) } assertEquals(2, result.awaitData("1").await()) diff --git a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/Task.kt b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/Task.kt index a1a754a4..f1e9130a 100644 --- a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/Task.kt +++ b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/Task.kt @@ -1,9 +1,9 @@ package space.kscience.dataforge.workspace import kotlinx.coroutines.withContext -import space.kscience.dataforge.data.DataSink +import space.kscience.dataforge.data.DataBuilderScope +import space.kscience.dataforge.data.DataTree import space.kscience.dataforge.data.GoalExecutionRestriction -import space.kscience.dataforge.data.MutableDataTree import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.MetaReader import space.kscience.dataforge.meta.MetaRepr @@ -62,12 +62,12 @@ public interface TaskWithSpec<T, C : Any> : Task<T> { // block: C.() -> Unit = {}, //): TaskResult<T> = execute(workspace, taskName, spec(block)) -public class TaskResultBuilder<T>( +public class TaskResultScope<T>( + public val resultType: KType, public val workspace: Workspace, public val taskName: Name, public val taskMeta: Meta, - private val dataSink: DataSink<T>, -) : DataSink<T> by dataSink +) : DataBuilderScope<T> /** * Create a [Task] that composes a result using [builder]. Only data from the workspace could be used. 
@@ -77,10 +77,11 @@ public class TaskResultBuilder<T>( * @param descriptor of meta accepted by this task * @param builder for resulting data set */ +@UnsafeKType public fun <T : Any> Task( resultType: KType, descriptor: MetaDescriptor? = null, - builder: suspend TaskResultBuilder<T>.() -> Unit, + builder: suspend TaskResultScope<T>.() -> DataTree<T>, ): Task<T> = object : Task<T> { override val descriptor: MetaDescriptor? = descriptor @@ -89,23 +90,19 @@ public fun <T : Any> Task( workspace: Workspace, taskName: Name, taskMeta: Meta, - ): TaskResult<T> { + ): TaskResult<T> = withContext(GoalExecutionRestriction() + workspace.goalLogger) { //TODO use safe builder and check for external data on add and detects cycles - @OptIn(UnsafeKType::class) - val dataset = MutableDataTree<T>(resultType).apply { - TaskResultBuilder(workspace, taskName, taskMeta, this).apply { - withContext(GoalExecutionRestriction() + workspace.goalLogger) { - builder() - } - } - } - return workspace.wrapResult(dataset, taskName, taskMeta) + val dataset = TaskResultScope<T>(resultType, workspace, taskName, taskMeta).builder() + + + workspace.wrapResult(dataset, taskName, taskMeta) } } +@OptIn(UnsafeKType::class) public inline fun <reified T : Any> Task( descriptor: MetaDescriptor? = null, - noinline builder: suspend TaskResultBuilder<T>.() -> Unit, + noinline builder: suspend TaskResultScope<T>.() -> DataTree<T>, ): Task<T> = Task(typeOf<T>(), descriptor, builder) @@ -117,13 +114,11 @@ public inline fun <reified T : Any> Task( * @param specification a specification for task configuration * @param builder for resulting data set */ - - @Suppress("FunctionName") public fun <T : Any, C : MetaRepr> Task( resultType: KType, specification: MetaReader<C>, - builder: suspend TaskResultBuilder<T>.(C) -> Unit, + builder: suspend TaskResultScope<T>.(C) -> DataTree<T>, ): TaskWithSpec<T, C> = object : TaskWithSpec<T, C> { override val spec: MetaReader<C> = specification @@ -134,15 +129,15 @@ public fun <T : Any, C : MetaRepr> Task( ): TaskResult<T> = withContext(GoalExecutionRestriction() + workspace.goalLogger) { //TODO use safe builder and check for external data on add and detects cycles val taskMeta = configuration.toMeta() + @OptIn(UnsafeKType::class) - val dataset = MutableDataTree<T>(resultType).apply { - TaskResultBuilder(workspace, taskName, taskMeta, this).apply { builder(configuration) } - } + val dataset = TaskResultScope<T>(resultType, workspace, taskName, taskMeta).builder(configuration) + workspace.wrapResult(dataset, taskName, taskMeta) } } public inline fun <reified T : Any, C : MetaRepr> Task( specification: MetaReader<C>, - noinline builder: suspend TaskResultBuilder<T>.(C) -> Unit, + noinline builder: suspend TaskResultScope<T>.(C) -> DataTree<T>, ): Task<T> = Task(typeOf<T>(), specification, builder) \ No newline at end of file diff --git a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/TaskResult.kt b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/TaskResult.kt index d4d4291a..aff438ca 100644 --- a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/TaskResult.kt +++ b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/TaskResult.kt @@ -6,7 +6,7 @@ import kotlinx.coroutines.joinAll import kotlinx.coroutines.launch import space.kscience.dataforge.data.DataTree import space.kscience.dataforge.data.asSequence -import space.kscience.dataforge.data.launch +import space.kscience.dataforge.data.launchIn import 
space.kscience.dataforge.meta.Meta import space.kscience.dataforge.names.Name @@ -33,9 +33,9 @@ public fun <T> Workspace.wrapResult(data: DataTree<T>, taskName: Name, taskMeta: * Start computation for all data elements of this node. * The resulting [Job] is completed only when all of them are completed. */ -public fun TaskResult<*>.launch(scope: CoroutineScope): Job { +public fun TaskResult<*>.launchIn(scope: CoroutineScope): Job { val jobs = asSequence().map { - it.data.launch(scope) + it.launchIn(scope) }.toList() return scope.launch { jobs.joinAll() } } \ No newline at end of file diff --git a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/WorkspaceBuilder.kt b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/WorkspaceBuilder.kt index 013c0171..38d90b31 100644 --- a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/WorkspaceBuilder.kt +++ b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/WorkspaceBuilder.kt @@ -4,20 +4,17 @@ import space.kscience.dataforge.actions.Action import space.kscience.dataforge.context.Context import space.kscience.dataforge.context.ContextBuilder import space.kscience.dataforge.context.Global -import space.kscience.dataforge.data.DataSink import space.kscience.dataforge.data.DataTree -import space.kscience.dataforge.data.MutableDataTree +import space.kscience.dataforge.data.StaticDataBuilder +import space.kscience.dataforge.data.static import space.kscience.dataforge.meta.* import space.kscience.dataforge.meta.descriptors.MetaDescriptor import space.kscience.dataforge.meta.descriptors.MetaDescriptorBuilder import space.kscience.dataforge.misc.DFBuilder -import space.kscience.dataforge.misc.UnsafeKType import space.kscience.dataforge.names.Name import space.kscience.dataforge.names.asName -import kotlin.collections.set import kotlin.properties.PropertyDelegateProvider import kotlin.properties.ReadOnlyProperty -import kotlin.reflect.typeOf public data class TaskReference<T>(public val taskName: Name, public val task: Task<T>) : DataSelector<T> { @@ -42,7 +39,7 @@ public interface TaskContainer { public inline fun <reified T : Any> TaskContainer.registerTask( name: String, descriptorBuilder: MetaDescriptorBuilder.() -> Unit = {}, - noinline builder: suspend TaskResultBuilder<T>.() -> Unit, + noinline builder: suspend TaskResultScope<T>.() -> DataTree<T>, ): Unit = registerTask(Name.parse(name), Task(MetaDescriptor(descriptorBuilder), builder)) /** @@ -51,7 +48,7 @@ public inline fun <reified T : Any> TaskContainer.registerTask( public inline fun <reified T : Any> TaskContainer.buildTask( name: String, descriptorBuilder: MetaDescriptorBuilder.() -> Unit = {}, - noinline builder: suspend TaskResultBuilder<T>.() -> Unit, + noinline builder: suspend TaskResultScope<T>.() -> DataTree<T>, ): TaskReference<T> { val theName = Name.parse(name) val descriptor = MetaDescriptor(descriptorBuilder) @@ -62,7 +59,7 @@ public inline fun <reified T : Any> TaskContainer.buildTask( public inline fun <reified T : Any> TaskContainer.task( descriptor: MetaDescriptor, - noinline builder: suspend TaskResultBuilder<T>.() -> Unit, + noinline builder: suspend TaskResultScope<T>.() -> DataTree<T>, ): PropertyDelegateProvider<Any?, ReadOnlyProperty<Any?, TaskReference<T>>> = PropertyDelegateProvider { _, property -> val taskName = Name.parse(property.name) val task = Task(descriptor, builder) @@ -75,7 +72,7 @@ public inline fun <reified T : Any> TaskContainer.task( */ public inline 
fun <reified T : Any, C : MetaRepr> TaskContainer.task( specification: MetaReader<C>, - noinline builder: suspend TaskResultBuilder<T>.(C) -> Unit, + noinline builder: suspend TaskResultScope<T>.(C) -> DataTree<T>, ): PropertyDelegateProvider<Any?, ReadOnlyProperty<Any?, TaskReference<T>>> = PropertyDelegateProvider { _, property -> val taskName = Name.parse(property.name) val task = Task(specification, builder) @@ -88,7 +85,7 @@ public inline fun <reified T : Any, C : MetaRepr> TaskContainer.task( */ public inline fun <reified T : Any> TaskContainer.task( noinline descriptorBuilder: MetaDescriptorBuilder.() -> Unit = {}, - noinline builder: suspend TaskResultBuilder<T>.() -> Unit, + noinline builder: suspend TaskResultScope<T>.() -> DataTree<T>, ): PropertyDelegateProvider<Any?, ReadOnlyProperty<Any?, TaskReference<T>>> = task(MetaDescriptor(descriptorBuilder), builder) @@ -102,15 +99,15 @@ public inline fun <T : Any, reified R : Any> TaskContainer.action( noinline descriptorBuilder: MetaDescriptorBuilder.() -> Unit = {}, ): PropertyDelegateProvider<Any?, ReadOnlyProperty<Any?, TaskReference<R>>> = task(MetaDescriptor(descriptorBuilder)) { - result(action.execute(from(selector), taskMeta.copy(metaTransform), workspace)) + action.execute(from(selector), taskMeta.copy(metaTransform), workspace) } public class WorkspaceBuilder( private val parentContext: Context = Global, ) : TaskContainer { private var context: Context? = null - @OptIn(UnsafeKType::class) - private val data = MutableDataTree<Any?>(typeOf<Any?>()) + + private var data: DataTree<Any?>? = null private val targets: HashMap<String, Meta> = HashMap() private val tasks = HashMap<Name, Task<*>>() private var cache: WorkspaceCache? = null @@ -125,8 +122,8 @@ public class WorkspaceBuilder( /** * Define intrinsic data for the workspace */ - public fun data(builder: DataSink<Any?>.() -> Unit) { - data.apply(builder) + public fun data(builder: StaticDataBuilder<Any?>.() -> Unit) { + data = DataTree.static(builder) } /** @@ -152,7 +149,7 @@ public class WorkspaceBuilder( val postProcess: suspend (TaskResult<*>) -> TaskResult<*> = { result -> cache?.cache(result) ?: result } - return WorkspaceImpl(context ?: parentContext, data, targets, tasks, postProcess) + return WorkspaceImpl(context ?: parentContext, data ?: DataTree.EMPTY, targets, tasks, postProcess) } } diff --git a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/taskBuilders.kt b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/taskBuilders.kt index 15565995..49b485e5 100644 --- a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/taskBuilders.kt +++ b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/taskBuilders.kt @@ -1,12 +1,13 @@ package space.kscience.dataforge.workspace -import space.kscience.dataforge.actions.Action import space.kscience.dataforge.context.PluginFactory import space.kscience.dataforge.data.DataTree -import space.kscience.dataforge.data.forEach -import space.kscience.dataforge.data.putAll -import space.kscience.dataforge.data.transform -import space.kscience.dataforge.meta.* +import space.kscience.dataforge.data.NamedValueWithMeta +import space.kscience.dataforge.data.transformEach +import space.kscience.dataforge.meta.Meta +import space.kscience.dataforge.meta.MutableMeta +import space.kscience.dataforge.meta.copy +import space.kscience.dataforge.meta.remove import space.kscience.dataforge.misc.DFExperimental import 
space.kscience.dataforge.names.Name import space.kscience.dataforge.names.plus @@ -14,7 +15,7 @@ import space.kscience.dataforge.names.plus /** * A task meta without a node corresponding to the task itself (removing a node with name of the task). */ -public val TaskResultBuilder<*>.defaultDependencyMeta: Meta +public val TaskResultScope<*>.defaultDependencyMeta: Meta get() = taskMeta.copy { remove(taskName) } @@ -25,12 +26,12 @@ public val TaskResultBuilder<*>.defaultDependencyMeta: Meta * @param selector a workspace data selector. Could be either task selector or initial data selector. * @param dependencyMeta meta used for selector. The same meta is used for caching. By default, uses [defaultDependencyMeta]. */ -public suspend fun <T> TaskResultBuilder<*>.from( +public suspend fun <T> TaskResultScope<*>.from( selector: DataSelector<T>, dependencyMeta: Meta = defaultDependencyMeta, ): DataTree<T> = selector.select(workspace, dependencyMeta) -public suspend inline fun <T, reified P : WorkspacePlugin> TaskResultBuilder<*>.from( +public suspend inline fun <T, reified P : WorkspacePlugin> TaskResultScope<*>.from( plugin: P, dependencyMeta: Meta = defaultDependencyMeta, selectorBuilder: P.() -> TaskReference<T>, @@ -50,7 +51,7 @@ public suspend inline fun <T, reified P : WorkspacePlugin> TaskResultBuilder<*>. * @param dependencyMeta meta used for selector. The same meta is used for caching. By default, uses [defaultDependencyMeta]. * @param selectorBuilder a builder of task from the plugin. */ -public suspend inline fun <reified T, reified P : WorkspacePlugin> TaskResultBuilder<*>.from( +public suspend inline fun <reified T, reified P : WorkspacePlugin> TaskResultScope<*>.from( pluginFactory: PluginFactory<P>, dependencyMeta: Meta = defaultDependencyMeta, selectorBuilder: P.() -> TaskReference<T>, @@ -64,7 +65,7 @@ public suspend inline fun <reified T, reified P : WorkspacePlugin> TaskResultBui return res as TaskResult<T> } -public val TaskResultBuilder<*>.allData: DataSelector<*> +public val TaskResultScope<*>.allData: DataSelector<*> get() = DataSelector { workspace, _ -> workspace.data } /** @@ -77,43 +78,38 @@ public val TaskResultBuilder<*>.allData: DataSelector<*> * @param action process individual data asynchronously. */ @DFExperimental -public suspend inline fun <T, reified R> TaskResultBuilder<R>.transformEach( +public suspend inline fun <T, reified R> TaskResultScope<R>.transformEach( selector: DataSelector<T>, dependencyMeta: Meta = defaultDependencyMeta, - dataMetaTransform: MutableMeta.(name: Name) -> Unit = {}, - crossinline action: suspend (arg: T, name: Name, meta: Meta) -> R, -) { - from(selector, dependencyMeta).forEach { data -> - val meta = data.meta.toMutableMeta().apply { - taskMeta[taskName]?.let { taskName.put(it) } - dataMetaTransform(data.name) - } - - val res = data.transform(meta, workspace.context.coroutineContext) { - action(it, data.name, meta) - } - - put(data.name, res) + crossinline dataMetaTransform: MutableMeta.(name: Name) -> Unit = {}, + crossinline action: suspend (NamedValueWithMeta<T>) -> R, +): DataTree<R> = from(selector, dependencyMeta).transformEach<T, R>( + workspace.context, + metaTransform = { name -> + taskMeta[taskName]?.let { taskName put it } + dataMetaTransform(name) } -} - -/** - * Set given [dataSet] as a task result. 
- */ -public fun <T> TaskResultBuilder<T>.result(dataSet: DataTree<T>) { - this.putAll(dataSet) -} - -/** - * Use provided [action] to fill the result - */ -@DFExperimental -public suspend inline fun <T, reified R> TaskResultBuilder<R>.actionFrom( - selector: DataSelector<T>, - action: Action<T, R>, - dependencyMeta: Meta = defaultDependencyMeta, ) { - putAll(action.execute(from(selector, dependencyMeta), dependencyMeta, workspace)) + action(it) } +///** +// * Set given [dataSet] as a task result. +// */ +//public fun <T> TaskResultBuilder<T>.result(dataSet: DataTree<T>) { +// putAll(dataSet) +//} + +///** +// * Use provided [action] to fill the result +// */ +//@DFExperimental +//public suspend inline fun <T, reified R> TaskResultScope<R>.actionFrom( +// selector: DataSelector<T>, +// action: Action<T, R>, +// dependencyMeta: Meta = defaultDependencyMeta, +//) { +// putAll(action.execute(from(selector, dependencyMeta), dependencyMeta, workspace)) +//} + diff --git a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/CachingAction.kt b/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/CachingAction.kt index 5f88ab74..9fc91e33 100644 --- a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/CachingAction.kt +++ b/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/CachingAction.kt @@ -3,17 +3,25 @@ package space.kscience.dataforge.workspace import space.kscience.dataforge.actions.AbstractAction import space.kscience.dataforge.data.* import space.kscience.dataforge.meta.Meta +import space.kscience.dataforge.names.Name import kotlin.reflect.KType -internal class CachingAction<T>(type: KType, private val caching: (NamedData<T>) -> NamedData<T>) : - AbstractAction<T, T>(type) { - override fun DataSink<T>.generate(source: DataTree<T>, meta: Meta) { +internal class CachingAction<T>( + type: KType, private val caching: (NamedData<T>) -> NamedData<T> +) : AbstractAction<T, T>(type) { + + override fun DataBuilderScope<T>.generate( + source: DataTree<T>, + meta: Meta + ): Map<Name, Data<T>> = buildMap { source.forEach { - put(caching(it)) + val cached = caching(it) + put(cached.name, cached) } } - override suspend fun DataSink<T>.update(source: DataTree<T>, meta: Meta, updatedData: DataUpdate<T>) { - put(updatedData.name, updatedData.data?.named(updatedData.name)?.let(caching)) + override suspend fun DataSink<T>.update(source: DataTree<T>, actionMeta: Meta, updateName: Name) { + val updatedData = source.read(updateName) + put(updateName, updatedData?.named(updateName)?.let(caching)) } } \ No newline at end of file diff --git a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/FileDataTree.kt b/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/FileDataTree.kt new file mode 100644 index 00000000..c3cd3a0b --- /dev/null +++ b/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/FileDataTree.kt @@ -0,0 +1,185 @@ +package space.kscience.dataforge.workspace + +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.channels.awaitClose +import kotlinx.coroutines.flow.* +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import space.kscience.dataforge.data.Data +import space.kscience.dataforge.data.DataTree +import space.kscience.dataforge.data.StaticData +import space.kscience.dataforge.io.* +import space.kscience.dataforge.meta.Meta +import space.kscience.dataforge.meta.copy +import 
space.kscience.dataforge.names.Name +import space.kscience.dataforge.names.NameToken +import space.kscience.dataforge.names.asName +import space.kscience.dataforge.names.plus +import java.nio.file.* +import java.nio.file.attribute.BasicFileAttributes +import java.nio.file.spi.FileSystemProvider +import kotlin.io.path.* +import kotlin.reflect.KType +import kotlin.reflect.typeOf + + +public class FileDataTree( + public val io: IOPlugin, + public val path: Path, + private val monitor: Boolean = false +) : DataTree<Binary> { + override val dataType: KType = typeOf<Binary>() + + /** + * Read data with a supported envelope format. If the envelope format is null, read the binary directly from the file. + * The operation is blocking since it must read the meta header. The envelope body is read lazily. + */ + private fun readFileAsData( + path: Path, + ): Data<Binary> { + val envelope = io.readEnvelopeFile(path, true) + val updatedMeta = envelope.meta.copy { + FILE_PATH_KEY put path.toString() + FILE_EXTENSION_KEY put path.extension + + val attributes = path.readAttributes<BasicFileAttributes>() + FILE_UPDATE_TIME_KEY put attributes.lastModifiedTime().toInstant().toString() + FILE_CREATE_TIME_KEY put attributes.creationTime().toInstant().toString() + } + return StaticData( + typeOf<Binary>(), + envelope.data ?: Binary.EMPTY, + updatedMeta + ) + } + + private fun readFilesFromDirectory( + path: Path + ): Map<NameToken, FileDataTree> = path.listDirectoryEntries().filterNot { it.name.startsWith("@") }.associate { + NameToken.parse(it.nameWithoutExtension) to FileDataTree(io, it) + } + + override val data: Data<Binary>? + get() = when { + path.isRegularFile() -> { + //TODO process zip + readFileAsData(path) + } + + path.isDirectory() -> { + val dataBinary: Binary? = path.resolve(IOPlugin.DATA_FILE_NAME).takeIf { it.exists() }?.asBinary() + val meta: Meta? 
= path.listDirectoryEntries().find { it.fileName.toString().startsWith(IOPlugin.META_FILE_NAME) }?.let { + io.readMetaFileOrNull(it) + } + if (dataBinary != null || meta != null) { + StaticData( + typeOf<Binary>(), + dataBinary ?: Binary.EMPTY, + meta ?: Meta.EMPTY + ) + } else { + null + } + } + + else -> { + null + } + } + + + override val items: Map<NameToken, DataTree<Binary>> + get() = when { + path.isDirectory() -> readFilesFromDirectory(path) + path.isRegularFile() && path.extension == "zip" -> { + //Using an explicit Zip file system to avoid bizarre compatibility bugs + val fsProvider = FileSystemProvider.installedProviders().find { it.scheme == "jar" } + ?: error("Zip file system provider not found") + val fs = fsProvider.newFileSystem(path, emptyMap<String, Any>()) + readFilesFromDirectory(fs.rootDirectories.single()) + } + + else -> emptyMap() + } + + + override val updates: Flow<Name> = if (monitor) { + callbackFlow<Name> { + val watchService: WatchService = path.fileSystem.newWatchService() + + fun Path.toName() = Name(map { NameToken.parse(it.nameWithoutExtension) }) + + fun monitor(childPath: Path): Job { + val key: WatchKey = childPath.register( + watchService, arrayOf( + StandardWatchEventKinds.ENTRY_DELETE, + StandardWatchEventKinds.ENTRY_MODIFY, + StandardWatchEventKinds.ENTRY_CREATE, + ) + ) + + return launch { + while (isActive) { + for (event: WatchEvent<*> in key.pollEvents()) { + //the watch event context is relative to the registered directory + val eventPath = childPath.resolve(event.context() as Path) + if (event.kind() === StandardWatchEventKinds.ENTRY_CREATE) { + monitor(eventPath) + } else { + send(eventPath.relativeTo(path).toName()) + } + } + key.reset() + } + } + } + + monitor(path) + + awaitClose { + watchService.close() + } + + }.flowOn(Dispatchers.IO).shareIn(io.context, SharingStarted.WhileSubscribed()) + } else { + emptyFlow() + } + + public companion object { + public val FILE_KEY: Name = "file".asName() + public val FILE_PATH_KEY: Name = FILE_KEY + "path" + public val FILE_EXTENSION_KEY: Name = FILE_KEY + "extension" + public val FILE_CREATE_TIME_KEY: Name = FILE_KEY + "created" + public val FILE_UPDATE_TIME_KEY: Name = FILE_KEY + "updated" + public const val DF_FILE_EXTENSION: String = "df" + public val DEFAULT_IGNORE_EXTENSIONS: Set<String> = setOf(DF_FILE_EXTENSION) + } +} + + +///** +// * @param resources The names of the resources to read. +// * @param classLoader The class loader to use for loading the resources. By default, it uses the current thread's context class loader. 
+// */ +//@DFExperimental +//public fun DataSink<Binary>.resources( +// io: IOPlugin, +// resource: String, +// vararg otherResources: String, +// classLoader: ClassLoader = Thread.currentThread().contextClassLoader, +//) { +// //create a file system if necessary +// val uri = Thread.currentThread().contextClassLoader.getResource("common")!!.toURI() +// try { +// uri.toPath() +// } catch (e: FileSystemNotFoundException) { +// FileSystems.newFileSystem(uri, mapOf("create" to "true")) +// } +// +// listOf(resource, *otherResources).forEach { r -> +// val path = classLoader.getResource(r)?.toURI()?.toPath() ?: error( +// "Resource with name $r is not resolved" +// ) +// io.readAsDataTree(r.asName(), path) +// } +//} diff --git a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/InMemoryWorkspaceCache.kt b/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/InMemoryWorkspaceCache.kt index 8ba39ec1..9e986ba9 100644 --- a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/InMemoryWorkspaceCache.kt +++ b/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/InMemoryWorkspaceCache.kt @@ -28,7 +28,7 @@ public class InMemoryWorkspaceCache : WorkspaceCache { val cachedData = cache.getOrPut(TaskResultId(result.taskName, result.taskMeta)){ HashMap() }.getOrPut(data.name){ - data.data + data } cachedData.checkType<T>(result.dataType).named(data.name) } diff --git a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/readFileData.kt b/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/readFileData.kt deleted file mode 100644 index 37dafab9..00000000 --- a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/readFileData.kt +++ /dev/null @@ -1,188 +0,0 @@ -package space.kscience.dataforge.workspace - -import kotlinx.coroutines.* -import space.kscience.dataforge.data.Data -import space.kscience.dataforge.data.DataSink -import space.kscience.dataforge.data.StaticData -import space.kscience.dataforge.io.* -import space.kscience.dataforge.meta.Meta -import space.kscience.dataforge.meta.copy -import space.kscience.dataforge.misc.DFExperimental -import space.kscience.dataforge.names.Name -import space.kscience.dataforge.names.NameToken -import space.kscience.dataforge.names.asName -import space.kscience.dataforge.names.plus -import java.nio.file.* -import java.nio.file.attribute.BasicFileAttributes -import java.nio.file.spi.FileSystemProvider -import kotlin.io.path.* -import kotlin.reflect.typeOf - - -public object FileData { - public val FILE_KEY: Name = "file".asName() - public val FILE_PATH_KEY: Name = FILE_KEY + "path" - public val FILE_EXTENSION_KEY: Name = FILE_KEY + "extension" - public val FILE_CREATE_TIME_KEY: Name = FILE_KEY + "created" - public val FILE_UPDATE_TIME_KEY: Name = FILE_KEY + "updated" - public const val DF_FILE_EXTENSION: String = "df" - public val DEFAULT_IGNORE_EXTENSIONS: Set<String> = setOf(DF_FILE_EXTENSION) - -} - - -/** - * Read data with supported envelope format and binary format. If the envelope format is null, then read binary directly from file. - * The operation is blocking since it must read the meta header. 
The reading of envelope body is lazy - */ -public fun IOPlugin.readFileData( - path: Path, -): Data<Binary> { - val envelope = readEnvelopeFile(path, true) - val updatedMeta = envelope.meta.copy { - FileData.FILE_PATH_KEY put path.toString() - FileData.FILE_EXTENSION_KEY put path.extension - - val attributes = path.readAttributes<BasicFileAttributes>() - FileData.FILE_UPDATE_TIME_KEY put attributes.lastModifiedTime().toInstant().toString() - FileData.FILE_CREATE_TIME_KEY put attributes.creationTime().toInstant().toString() - } - return StaticData( - typeOf<Binary>(), - envelope.data ?: Binary.EMPTY, - updatedMeta - ) -} - -public fun DataSink<Binary>.file(io: IOPlugin, name: Name, path: Path) { - if (!path.isRegularFile()) error("Only regular files could be handled by this function") - put(name, io.readFileData(path)) -} - -public fun DataSink<Binary>.directory( - io: IOPlugin, - name: Name, - path: Path, -) { - if (!path.isDirectory()) error("Only directories could be handled by this function") - //process root data - - var dataBinary: Binary? = null - var meta: Meta? = null - Files.list(path).forEach { childPath -> - val fileName = childPath.fileName.toString() - if (fileName == IOPlugin.DATA_FILE_NAME) { - dataBinary = childPath.asBinary() - } else if (fileName.startsWith(IOPlugin.META_FILE_NAME)) { - meta = io.readMetaFileOrNull(childPath) - } else if (!fileName.startsWith("@")) { - val token = if (childPath.isRegularFile() && childPath.extension in FileData.DEFAULT_IGNORE_EXTENSIONS) { - NameToken(childPath.nameWithoutExtension) - } else { - NameToken(childPath.name) - } - - files(io, name + token, childPath) - } - } - - //set data if it is relevant - if (dataBinary != null || meta != null) { - put( - name, - StaticData( - typeOf<Binary>(), - dataBinary ?: Binary.EMPTY, - meta ?: Meta.EMPTY - ) - ) - } -} - -public fun DataSink<Binary>.files( - io: IOPlugin, - name: Name, - path: Path, -) { - if (path.isRegularFile() && path.extension == "zip") { - //Using explicit Zip file system to avoid bizarre compatibility bugs - val fsProvider = FileSystemProvider.installedProviders().find { it.scheme == "jar" } - ?: error("Zip file system provider not found") - val fs = fsProvider.newFileSystem(path, emptyMap<String, Any>()) - - files(io, name, fs.rootDirectories.first()) - } - if (path.isRegularFile()) { - file(io, name, path) - } else { - directory(io, name, path) - } -} - - -private fun Path.toName() = Name(map { NameToken.parse(it.nameWithoutExtension) }) - -public fun DataSink<Binary>.monitorFiles( - io: IOPlugin, - name: Name, - path: Path, - scope: CoroutineScope = io.context, -): Job { - files(io, name, path) - return scope.launch(Dispatchers.IO) { - val watchService = path.fileSystem.newWatchService() - - path.register( - watchService, - StandardWatchEventKinds.ENTRY_DELETE, - StandardWatchEventKinds.ENTRY_MODIFY, - StandardWatchEventKinds.ENTRY_CREATE - ) - - do { - val key = watchService.take() - if (key != null) { - for (event: WatchEvent<*> in key.pollEvents()) { - val eventPath = event.context() as Path - if (event.kind() == StandardWatchEventKinds.ENTRY_DELETE) { - put(eventPath.toName(), null) - } else { - val fileName = eventPath.fileName.toString() - if (!fileName.startsWith("@")) { - files(io, name, eventPath) - } - } - } - key.reset() - } - } while (isActive && key != null) - } - -} - -/** - * @param resources The names of the resources to read. - * @param classLoader The class loader to use for loading the resources. 
-
-/**
- * @param resources The names of the resources to read.
- * @param classLoader The class loader to use for loading the resources. By default, it uses the current thread's context class loader.
- */
-@DFExperimental
-public fun DataSink<Binary>.resources(
-    io: IOPlugin,
-    resource: String,
-    vararg otherResources: String,
-    classLoader: ClassLoader = Thread.currentThread().contextClassLoader,
-) {
-    //create a file system if necessary
-    val uri = Thread.currentThread().contextClassLoader.getResource("common")!!.toURI()
-    try {
-        uri.toPath()
-    } catch (e: FileSystemNotFoundException) {
-        FileSystems.newFileSystem(uri, mapOf("create" to "true"))
-    }
-
-    listOf(resource, *otherResources).forEach { r ->
-        val path = classLoader.getResource(r)?.toURI()?.toPath() ?: error(
-            "Resource with name $r is not resolved"
-        )
-        files(io, r.asName(), path)
-    }
-}
diff --git a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/workspaceJvm.kt b/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/workspaceJvm.kt
index 61caf7e0..688b5699 100644
--- a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/workspaceJvm.kt
+++ b/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/workspaceJvm.kt
@@ -15,14 +15,14 @@ import space.kscience.dataforge.names.matches
  * Select the whole data set from the workspace filtered by type.
  */
 @OptIn(DFExperimental::class)
-public inline fun <reified T : Any> TaskResultBuilder<*>.dataByType(namePattern: Name? = null): DataSelector<T> =
+public inline fun <reified T : Any> TaskResultScope<*>.dataByType(namePattern: Name? = null): DataSelector<T> =
     DataSelector<T> { workspace, _ ->
         workspace.data.filterByType { name, _, _ ->
             namePattern == null || name.matches(namePattern)
         }
     }
 
-public suspend inline fun <reified T : Any> TaskResultBuilder<*>.fromTask(
+public suspend inline fun <reified T : Any> TaskResultScope<*>.fromTask(
     task: Name,
     taskMeta: Meta = Meta.EMPTY,
 ): DataTree<T> = workspace.produce(task, taskMeta).filterByType()
\ No newline at end of file
diff --git a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/writeFileData.kt b/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/writeFileData.kt
index 379a79dd..c65570df 100644
--- a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/writeFileData.kt
+++ b/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/writeFileData.kt
@@ -2,7 +2,9 @@ package space.kscience.dataforge.workspace
 
 import kotlinx.coroutines.Dispatchers
 import kotlinx.coroutines.withContext
-import space.kscience.dataforge.data.*
+import space.kscience.dataforge.data.DataTree
+import space.kscience.dataforge.data.forEach
+import space.kscience.dataforge.data.meta
 import space.kscience.dataforge.io.*
 import space.kscience.dataforge.misc.DFExperimental
 import space.kscience.dataforge.names.Name
@@ -32,8 +34,8 @@ public suspend fun <T : Any> IOPlugin.writeDataDirectory(
     } else if (!Files.isDirectory(path)) {
         error("Can't write a node into file")
     }
-    dataSet.forEach { (name, data) ->
-        val childPath = path.resolve(name.tokens.joinToString("/") { token -> token.toStringUnescaped() })
+    dataSet.forEach { data ->
+        val childPath = path.resolve(data.name.tokens.joinToString("/") { token -> token.toStringUnescaped() })
         childPath.parent.createDirectories()
         val envelope = data.toEnvelope(format)
         if (envelopeFormat != null) {
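`writeDataDirectory` (updated above) resolves each item's file path from its name tokens, so an item named `dir.a` lands at `dir/a` on disk. A self-contained sketch of that name-to-path layout and the reverse read (plain text stands in for DataForge envelopes; `writeTree`/`readTree` are hypothetical helpers):

    import java.io.File
    import java.nio.file.Path
    import kotlin.io.path.createDirectories
    import kotlin.io.path.writeText

    // Write each item to root/<token>/<token>/... derived from its dotted name,
    // mirroring path.resolve(name.tokens.joinToString("/")) in the diff above.
    fun writeTree(root: Path, items: Map<String, String>) {
        items.forEach { (name, content) ->
            val child = root.resolve(name.split('.').joinToString("/"))
            child.parent?.createDirectories()
            child.writeText(content)
        }
    }

    // Read the layout back, turning relative paths into dotted names again.
    fun readTree(root: Path): Map<String, String> =
        root.toFile().walkTopDown().filter { it.isFile }.associate { file ->
            file.relativeTo(root.toFile()).path.replace(File.separatorChar, '.') to file.readText()
        }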
diff --git a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/CachingWorkspaceTest.kt b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/CachingWorkspaceTest.kt
index 7a6a8202..1c43fba0 100644
--- a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/CachingWorkspaceTest.kt
+++ b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/CachingWorkspaceTest.kt
@@ -29,7 +29,7 @@ internal class CachingWorkspaceTest {
             inMemoryCache()
 
             val doFirst by task<Any> {
-                transformEach(allData) { _, name, _ ->
+                transformEach(allData) { (name, _, _) ->
                     firstCounter++
                     println("Done first on $name with flag=${taskMeta["flag"].boolean}")
                 }
@@ -39,7 +39,7 @@ internal class CachingWorkspaceTest {
                 transformEach(
                     doFirst,
                     dependencyMeta = if (taskMeta["flag"].boolean == true) taskMeta else Meta.EMPTY
-                ) { _, name, _ ->
+                ) { (name, _, _) ->
                     secondCounter++
                     println("Done second on $name with flag=${taskMeta["flag"].boolean ?: false}")
                 }
@@ -52,11 +52,11 @@ internal class CachingWorkspaceTest {
         val secondC = workspace.produce("doSecond")
         //use coroutineScope to wait for the result
         coroutineScope {
-            first.launch(this)
-            secondA.launch(this)
-            secondB.launch(this)
+            first.launchIn(this)
+            secondA.launchIn(this)
+            secondB.launchIn(this)
             //repeat to check caching
-            secondC.launch(this)
+            secondC.launchIn(this)
         }
 
         assertEquals(10, firstCounter)
diff --git a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/FileDataTest.kt b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/FileDataTest.kt
index f526e194..d9fa9ae4 100644
--- a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/FileDataTest.kt
+++ b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/FileDataTest.kt
@@ -52,7 +52,7 @@ class FileDataTest {
             io.writeDataDirectory(dir, dataNode, StringIOFormat)
             println(dir.toUri().toString())
             val data = DataTree {
-                files(io, Name.EMPTY, dir)
+                io.readAsDataTree(Name.EMPTY, dir)
             }
             val reconstructed = data.map { (_, value) -> value.toByteArray().decodeToString() }
             assertEquals(dataNode["dir.a"]?.meta?.get("content"), reconstructed["dir.a"]?.meta?.get("content"))
@@ -68,7 +68,7 @@ class FileDataTest {
             zip.deleteExisting()
             io.writeZip(zip, dataNode, StringIOFormat)
             println(zip.toUri().toString())
-            val reconstructed = DataTree { files(io, Name.EMPTY, zip) }
+            val reconstructed = DataTree { io.readAsDataTree(Name.EMPTY, zip) }
                 .map { (_, value) -> value.toByteArray().decodeToString() }
             assertEquals(dataNode["dir.a"]?.meta?.get("content"), reconstructed["dir.a"]?.meta?.get("content"))
             assertEquals(dataNode["b"]?.await(), reconstructed["b"]?.await())
diff --git a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/FileWorkspaceCacheTest.kt b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/FileWorkspaceCacheTest.kt
index 0cf4f401..7d07481c 100644
--- a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/FileWorkspaceCacheTest.kt
+++ b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/FileWorkspaceCacheTest.kt
@@ -26,7 +26,7 @@ class FileWorkspaceCacheTest {
             }
         }
 
-        workspace.produce("echo").launch(this)
+        workspace.produce("echo").launchIn(this)
     }
 }
\ No newline at end of file
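The tests now call `launchIn(scope)` instead of `launch(scope)` on task results, matching the `Flow.launchIn` naming convention from kotlinx.coroutines, which makes the receiver/scope relationship explicit at the call site. The real extension lives in dataforge-workspace and is not shown in this diff; a hypothetical sketch of its shape:

    import kotlinx.coroutines.CoroutineScope
    import kotlinx.coroutines.Job
    import kotlinx.coroutines.launch

    // Hypothetical stand-in for a lazily computed task result.
    interface LazyResult<T> {
        suspend fun await(): T
    }

    // Assumed shape of the renamed extension: start computing the result inside
    // the given scope and return the Job so callers can join or cancel it.
    fun <T> LazyResult<T>.launchIn(scope: CoroutineScope): Job = scope.launch { await() }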