Compare commits

..

No commits in common. "6524c6c69182b51eddaf7ae73e55b997e7de8b34" and "5e3de7073747d35e0f44e28602be30ba6cdb8c36" have entirely different histories.

55 changed files with 1751 additions and 1261 deletions

2
.gitignore vendored
View File

@ -5,7 +5,5 @@ out/
.gradle
build/
.kotlin
!gradle-wrapper.jar

View File

@ -12,7 +12,7 @@ kscience {
useCoroutines()
useSerialization()
commonMain {
api(projects.dataforgeMeta)
api(project(":dataforge-meta"))
api(spclibs.atomicfu)
}
jvmMain{

View File

@ -1,7 +1,5 @@
package space.kscience.dataforge.actions
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import space.kscience.dataforge.data.*
import space.kscience.dataforge.meta.Meta
@ -21,51 +19,46 @@ internal fun MutableMap<Name, *>.removeWhatStartsWith(name: Name) {
/**
* An action that caches results on-demand and recalculates them on source push
*/
public abstract class AbstractAction<T : Any, R : Any>(
public abstract class AbstractAction<in T : Any, R : Any>(
public val outputType: KType,
) : Action<T, R> {
/**
* Generate initial content of the output
*/
protected abstract fun DataSink<R>.generate(
data: DataTree<T>,
protected abstract fun DataSetBuilder<R>.generate(
data: DataSet<T>,
meta: Meta,
)
/**
* Update part of the data set using provided data
*
* @param source the source data tree in case we need several data items to update
* Update part of the data set when given [updateKey] is triggered by the source
*/
protected open fun DataSink<R>.update(
source: DataTree<T>,
protected open fun DataSourceBuilder<R>.update(
dataSet: DataSet<T>,
meta: Meta,
namedData: NamedData<T>,
){
//by default regenerate the whole data set
generate(source,meta)
updateKey: Name,
) {
// By default, recalculate the whole dataset
generate(dataSet, meta)
}
@OptIn(DFInternal::class)
override fun execute(
dataSet: DataTree<T>,
dataSet: DataSet<T>,
meta: Meta,
): DataTree<R> = if(dataSet.isObservable()) {
MutableDataTree<R>(outputType, dataSet.updatesScope).apply {
): DataSet<R> = if (dataSet is DataSource) {
DataSource(outputType, dataSet){
generate(dataSet, meta)
dataSet.updates().onEach {
update(dataSet, meta, it)
}.launchIn(updatesScope)
//close updates when the source is closed
updatesScope.launch {
dataSet.awaitClose()
close()
launch {
dataSet.updates.collect { name ->
update(dataSet, meta, name)
}
}
}
} else{
DataTree(outputType){
} else {
DataTree<R>(outputType) {
generate(dataSet, meta)
}
}

View File

@ -1,19 +1,19 @@
package space.kscience.dataforge.actions
import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.data.DataSet
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.misc.DFExperimental
/**
* A simple data transformation on a data node. Actions should avoid doing actual dependency evaluation in [execute].
*/
public fun interface Action<T, R> {
public fun interface Action<in T : Any, out R : Any> {
/**
* Transform the data in the node, producing a new node. By default, it is assumed that all calculations are lazy
* so no actual computation is started at this moment.
*/
public fun execute(dataSet: DataTree<T>, meta: Meta): DataTree<R>
public fun execute(dataSet: DataSet<T>, meta: Meta): DataSet<R>
public companion object
}
@ -21,23 +21,18 @@ public fun interface Action<T, R> {
/**
* A convenience method to transform data using given [action]
*/
public fun <T, R> DataTree<T>.transform(
action: Action<T, R>,
meta: Meta = Meta.EMPTY,
): DataTree<R> = action.execute(this, meta)
public fun <T : Any, R : Any> DataSet<T>.transform(action: Action<T, R>, meta: Meta = Meta.EMPTY): DataSet<R> =
action.execute(this, meta)
/**
* Action composition. The result is terminal if one of its parts is terminal
*/
public infix fun <T, I, R> Action<T, I>.then(action: Action<I, R>): Action<T, R> = Action { dataSet, meta ->
action.execute(this@then.execute(dataSet, meta), meta)
}
public infix fun <T : Any, I : Any, R : Any> Action<T, I>.then(action: Action<I, R>): Action<T, R> =
Action<T, R> { dataSet, meta -> action.execute(this@then.execute(dataSet, meta), meta) }
@DFExperimental
public operator fun <T, R> Action<T, R>.invoke(
dataSet: DataTree<T>,
public operator fun <T : Any, R : Any> Action<T, R>.invoke(
dataSet: DataSet<T>,
meta: Meta = Meta.EMPTY,
): DataTree<R> = execute(dataSet, meta)
): DataSet<R> = execute(dataSet, meta)

View File

@ -29,7 +29,6 @@ public class MapActionBuilder<T, R>(
public var name: Name,
public var meta: MutableMeta,
public val actionMeta: Meta,
public val dataType: KType,
@PublishedApi internal var outputType: KType,
) {
@ -46,16 +45,19 @@ public class MapActionBuilder<T, R>(
/**
* Calculate the result of goal
*/
public inline fun <reified R1 : R> result(noinline f: suspend ActionEnv.(T) -> R1): Unit = result(typeOf<R1>(), f)
public inline fun <reified R1 : R> result(noinline f: suspend ActionEnv.(T) -> R1) {
outputType = typeOf<R1>()
result = f;
}
}
@PublishedApi
internal class MapAction<T : Any, R : Any>(
internal class MapAction<in T : Any, R : Any>(
outputType: KType,
private val block: MapActionBuilder<T, R>.() -> Unit,
) : AbstractAction<T, R>(outputType) {
private fun DataSink<R>.mapOne(name: Name, data: Data<T>, meta: Meta) {
private fun DataSetBuilder<R>.mapOne(name: Name, data: Data<T>, meta: Meta) {
// Creating a new environment for action using **old** name, old meta and task meta
val env = ActionEnv(name, data.meta, meta)
@ -64,7 +66,6 @@ internal class MapAction<T : Any, R : Any>(
name,
data.meta.toMutableMeta(), // using data meta
meta,
data.type,
outputType
).apply(block)
@ -79,15 +80,16 @@ internal class MapAction<T : Any, R : Any>(
builder.result(env, data.await())
}
//setting the data node
put(newName, newData)
data(newName, newData)
}
override fun DataSink<R>.generate(data: DataTree<T>, meta: Meta) {
override fun DataSetBuilder<R>.generate(data: DataSet<T>, meta: Meta) {
data.forEach { mapOne(it.name, it.data, meta) }
}
override fun DataSink<R>.update(source: DataTree<T>, meta: Meta, namedData: NamedData<T>) {
mapOne(namedData.name, namedData.data, namedData.meta)
override fun DataSourceBuilder<R>.update(dataSet: DataSet<T>, meta: Meta, updateKey: Name) {
remove(updateKey)
dataSet[updateKey]?.let { mapOne(updateKey, it, meta) }
}
}

View File

@ -14,7 +14,7 @@ import kotlin.reflect.typeOf
public class JoinGroup<T : Any, R : Any>(
public var name: String,
internal val set: DataTree<T>,
internal val set: DataSet<T>,
@PublishedApi internal var outputType: KType,
) {
@ -39,7 +39,7 @@ public class ReduceGroupBuilder<T : Any, R : Any>(
public val actionMeta: Meta,
private val outputType: KType,
) {
private val groupRules: MutableList<(DataTree<T>) -> List<JoinGroup<T, R>>> = ArrayList();
private val groupRules: MutableList<(DataSet<T>) -> List<JoinGroup<T, R>>> = ArrayList();
/**
* introduce grouping by meta value
@ -54,12 +54,12 @@ public class ReduceGroupBuilder<T : Any, R : Any>(
public fun group(
groupName: String,
predicate: DataFilter,
predicate: (Name, Meta) -> Boolean,
action: JoinGroup<T, R>.() -> Unit,
) {
groupRules += { source ->
listOf(
JoinGroup<T, R>(groupName, source.filterData(predicate), outputType).apply(action)
JoinGroup<T, R>(groupName, source.filter(predicate), outputType).apply(action)
)
}
}
@ -73,7 +73,7 @@ public class ReduceGroupBuilder<T : Any, R : Any>(
}
}
internal fun buildGroups(input: DataTree<T>): List<JoinGroup<T, R>> =
internal fun buildGroups(input: DataSet<T>): List<JoinGroup<T, R>> =
groupRules.flatMap { it.invoke(input) }
}
@ -85,7 +85,7 @@ internal class ReduceAction<T : Any, R : Any>(
) : AbstractAction<T, R>(outputType) {
//TODO optimize reduction. Currently, the whole action recalculates on push
override fun DataSink<R>.generate(data: DataTree<T>, meta: Meta) {
override fun DataSetBuilder<R>.generate(data: DataSet<T>, meta: Meta) {
ReduceGroupBuilder<T, R>(meta, outputType).apply(action).buildGroups(data).forEach { group ->
val dataFlow: Map<Name, Data<T>> = group.set.asSequence().fold(HashMap()) { acc, value ->
acc.apply {
@ -103,7 +103,7 @@ internal class ReduceAction<T : Any, R : Any>(
meta = groupMeta
) { group.result.invoke(env, it) }
put(env.name, res)
data(env.name, res)
}
}
}

View File

@ -49,7 +49,7 @@ internal class SplitAction<T : Any, R : Any>(
private val action: SplitBuilder<T, R>.() -> Unit,
) : AbstractAction<T, R>(outputType) {
private fun DataSink<R>.splitOne(name: Name, data: Data<T>, meta: Meta) {
private fun DataSetBuilder<R>.splitOne(name: Name, data: Data<T>, meta: Meta) {
val laminate = Laminate(data.meta, meta)
val split = SplitBuilder<T, R>(name, data.meta).apply(action)
@ -64,7 +64,7 @@ internal class SplitAction<T : Any, R : Any>(
).apply(rule)
//data.map<R>(outputType, meta = env.meta) { env.result(it) }.named(fragmentName)
put(
data(
fragmentName,
@Suppress("OPT_IN_USAGE") Data(outputType, meta = env.meta, dependencies = listOf(data)) {
env.result(data.await())
@ -73,12 +73,13 @@ internal class SplitAction<T : Any, R : Any>(
}
}
override fun DataSink<R>.generate(data: DataTree<T>, meta: Meta) {
override fun DataSetBuilder<R>.generate(data: DataSet<T>, meta: Meta) {
data.forEach { splitOne(it.name, it.data, meta) }
}
override fun DataSink<R>.update(source: DataTree<T>, meta: Meta, namedData: NamedData<T>) {
splitOne(namedData.name, namedData.data, namedData.meta)
override fun DataSourceBuilder<R>.update(dataSet: DataSet<T>, meta: Meta, updateKey: Name) {
remove(updateKey)
dataSet[updateKey]?.let { splitOne(updateKey, it, meta) }
}
}

View File

@ -41,7 +41,7 @@ public interface Data<out T> : Goal<T>, MetaRepr {
*/
internal val TYPE_OF_NOTHING: KType = typeOf<Unit>()
public inline fun <reified T> static(
public inline fun <reified T : Any> static(
value: T,
meta: Meta = Meta.EMPTY,
): Data<T> = StaticData(typeOf<T>(), value, meta)
@ -69,37 +69,37 @@ public interface Data<out T> : Goal<T>, MetaRepr {
* A lazily computed variant of [Data] based on [LazyGoal]
* One must ensure that proper [type] is used so this method should not be used
*/
private class LazyData<T>(
private class LazyData<T : Any>(
override val type: KType,
override val meta: Meta = Meta.EMPTY,
additionalContext: CoroutineContext = EmptyCoroutineContext,
dependencies: Iterable<Goal<*>> = emptyList(),
dependencies: Collection<Goal<*>> = emptyList(),
block: suspend () -> T,
) : Data<T>, LazyGoal<T>(additionalContext, dependencies, block)
public class StaticData<T>(
public class StaticData<T : Any>(
override val type: KType,
value: T,
override val meta: Meta = Meta.EMPTY,
) : Data<T>, StaticGoal<T>(value)
@Suppress("FunctionName")
public inline fun <reified T> Data(value: T, meta: Meta = Meta.EMPTY): StaticData<T> =
public inline fun <reified T : Any> Data(value: T, meta: Meta = Meta.EMPTY): StaticData<T> =
StaticData(typeOf<T>(), value, meta)
@DFInternal
public fun <T> Data(
public fun <T : Any> Data(
type: KType,
meta: Meta = Meta.EMPTY,
context: CoroutineContext = EmptyCoroutineContext,
dependencies: Iterable<Goal<*>> = emptyList(),
dependencies: Collection<Goal<*>> = emptyList(),
block: suspend () -> T,
): Data<T> = LazyData(type, meta, context, dependencies, block)
@OptIn(DFInternal::class)
public inline fun <reified T> Data(
public inline fun <reified T : Any> Data(
meta: Meta = Meta.EMPTY,
context: CoroutineContext = EmptyCoroutineContext,
dependencies: Iterable<Goal<*>> = emptyList(),
dependencies: Collection<Goal<*>> = emptyList(),
noinline block: suspend () -> T,
): Data<T> = Data(typeOf<T>(), meta, context, dependencies, block)

View File

@ -1,89 +0,0 @@
package space.kscience.dataforge.data
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
import kotlin.reflect.KType
/**
* A predicate over data items, evaluated on an item's name, meta and type.
*/
public fun interface DataFilter {
/**
* Return `true` if the item described by [name], [meta] and [type] passes this filter.
*/
public fun accepts(name: Name, meta: Meta, type: KType): Boolean
public companion object {
/**
* A filter whose predicate returns `true` for every item.
*/
public val EMPTY: DataFilter = DataFilter { _, _, _ -> true }
}
}
/**
 * Apply this filter to [data], using the name, meta and type it carries.
 */
public fun DataFilter.accepts(data: NamedData<*>): Boolean {
    return accepts(data.name, data.meta, data.type)
}
/**
 * Lazily keep only those elements of this sequence that [predicate] accepts.
 */
public fun <T> Sequence<NamedData<T>>.filterData(predicate: DataFilter): Sequence<NamedData<T>> =
    filter { item -> predicate.accepts(item.name, item.meta, item.type) }
/**
 * Keep only those elements of this flow that [predicate] accepts.
 */
public fun <T> Flow<NamedData<T>>.filterData(predicate: DataFilter): Flow<NamedData<T>> =
    filter { item -> predicate.accepts(item.name, item.meta, item.type) }
/**
 * Wrap this [DataSource] in a view that reports the same [DataSource.dataType] and returns an item from
 * [DataSource.read] only when [predicate] accepts its name, meta and type. Nothing is copied.
 */
public fun <T> DataSource<T>.filterData(
    predicate: DataFilter,
): DataSource<T> {
    val origin = this
    return object : DataSource<T> {
        override val dataType: KType get() = origin.dataType

        override fun read(name: Name): Data<T>? {
            val candidate = origin.read(name) ?: return null
            return if (predicate.accepts(name, candidate.meta, candidate.type)) candidate else null
        }
    }
}
/**
 * Stateless filtered [ObservableDataSource].
 *
 * Both [ObservableDataSource.updates] and [DataSource.read] delegate to the receiver and pass through
 * only the items accepted by [predicate].
 */
public fun <T> ObservableDataSource<T>.filterData(
    predicate: DataFilter,
): ObservableDataSource<T> {
    val origin = this
    return object : ObservableDataSource<T> {
        override val dataType: KType get() = origin.dataType

        override fun updates(): Flow<NamedData<T>> =
            origin.updates().filter { update -> predicate.accepts(update) }

        override fun read(name: Name): Data<T>? {
            val candidate = origin.read(name) ?: return null
            return if (predicate.accepts(name, candidate.meta, candidate.type)) candidate else null
        }
    }
}
/**
 * Collect the items of this tree that are accepted by [predicate] into a new [DataTree]
 * with the same [DataSource.dataType].
 */
public fun <T> GenericDataTree<T, *>.filterData(
    predicate: DataFilter,
): DataTree<T> {
    val accepted = asSequence().filterData(predicate)
    return accepted.toTree(dataType)
}
/**
 * Build an observable tree from the items of this tree accepted by [predicate], fed afterwards by
 * the accepted part of this tree's updates. [scope] is passed on as the parent scope of the new tree.
 */
public fun <T> GenericObservableDataTree<T, *>.filterData(
    scope: CoroutineScope,
    predicate: DataFilter,
): ObservableDataTree<T> {
    val initialItems = asSequence().filterData(predicate)
    val acceptedUpdates = updates().filterData(predicate)
    return initialItems.toObservableTree(dataType, scope, acceptedUpdates)
}
///**
// * Generate a wrapper data set with a given name prefix appended to all names
// */
//public fun <T : Any> DataTree<T>.withNamePrefix(prefix: Name): DataSet<T> = if (prefix.isEmpty()) {
// this
//} else object : DataSource<T> {
//
// override val dataType: KType get() = this@withNamePrefix.dataType
//
// override val coroutineContext: CoroutineContext
// get() = (this@withNamePrefix as? DataSource)?.coroutineContext ?: EmptyCoroutineContext
//
// override val meta: Meta get() = this@withNamePrefix.meta
//
//
// override fun iterator(): Iterator<NamedData<T>> = iterator {
// for (d in this@withNamePrefix) {
// yield(d.data.named(prefix + d.name))
// }
// }
//
// override fun get(name: Name): Data<T>? =
// name.removeFirstOrNull(name)?.let { this@withNamePrefix.get(it) }
//
// override val updates: Flow<Name> get() = this@withNamePrefix.updates.map { prefix + it }
//}
//

View File

@ -0,0 +1,124 @@
package space.kscience.dataforge.data
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.mapNotNull
import space.kscience.dataforge.data.Data.Companion.TYPE_OF_NOTHING
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName
import space.kscience.dataforge.names.endsWith
import space.kscience.dataforge.names.parseAsName
import kotlin.reflect.KType
/**
 * A typed collection of named [Data] items with attached [Meta].
 */
public interface DataSet<out T : Any> {

    /**
     * The minimal common ancestor to all data in the node
     */
    public val dataType: KType

    /**
     * Meta-data associated with this node. If no meta is provided, returns [Meta.EMPTY].
     */
    public val meta: Meta

    /**
     * Traverse this [DataSet] returning named data instances. The order is not guaranteed.
     */
    public operator fun iterator(): Iterator<NamedData<T>>

    /**
     * Get data with given name.
     */
    public operator fun get(name: Name): Data<T>?

    public companion object {
        /**
         * The name `@meta`; [toMeta] treats item names ending with it specially.
         */
        public val META_KEY: Name = "@meta".asName()

        /**
         * An empty [DataSet] that suits all types
         */
        public val EMPTY: DataSet<Nothing> = object : DataSet<Nothing> {
            override val dataType: KType = TYPE_OF_NOTHING

            override val meta: Meta get() = Meta.EMPTY

            override fun iterator(): Iterator<NamedData<Nothing>> = emptyList<NamedData<Nothing>>().iterator()

            override fun get(name: Name): Data<Nothing>? = null
        }
    }
}
/**
 * View this [DataSet] as a [Sequence]; every traversal requests a fresh [DataSet.iterator].
 */
public fun <T : Any> DataSet<T>.asSequence(): Sequence<NamedData<T>> = Sequence { iterator() }
/**
 * Return the only [NamedData] contained in this [DataSet].
 * Fails (via [Sequence.single]) when the set is empty or holds more than one item.
 */
public fun <T : Any> DataSet<T>.single(): NamedData<T> {
    return asSequence().single()
}
/**
 * View this [DataSet] as an [Iterable]; every traversal requests a fresh [DataSet.iterator].
 */
public fun <T : Any> DataSet<T>.asIterable(): Iterable<NamedData<T>> = Iterable { iterator() }
/**
 * Get data by a string [name], which is parsed with [parseAsName] first.
 */
public operator fun <T : Any> DataSet<T>.get(name: String): Data<T>? {
    val parsed = name.parseAsName()
    return get(parsed)
}
/**
 * A [DataSet] with propagated updates. It is also a [CoroutineScope].
 */
public interface DataSource<out T : Any> : DataSet<T>, CoroutineScope {

    /**
     * A flow of names of updated items. An emitted name may denote a new item or a replacement of an
     * existing one; since a replaced item may differ completely from the previous one, consumers
     * should pull the data for that name again.
     */
    public val updates: Flow<Name>

    /**
     * Stop generating updates from this [DataSource] by cancelling the [Job] of its coroutine context,
     * if there is one.
     */
    public fun close() {
        val job = coroutineContext[Job]
        job?.cancel()
    }
}
/**
 * Updates of this [DataSet]: the [DataSource.updates] flow when the receiver is a [DataSource],
 * otherwise an empty flow.
 */
public val <T : Any> DataSet<T>.updates: Flow<Name>
    get() = when (this) {
        is DataSource -> updates
        else -> emptyFlow()
    }
//
///**
// * Flow all data nodes with names starting with [branchName]
// */
//public fun <T : Any> DataSet<T>.children(branchName: Name): Sequence<NamedData<T>> =
// this@children.asSequence().filter {
// it.name.startsWith(branchName)
// }
/**
 * Launch every item of this [DataSet] inside a coroutine started in [coroutineScope] and
 * return the [Job] of that coroutine; it completes once all launched items are joined.
 */
public fun <T : Any> DataSet<T>.startAll(coroutineScope: CoroutineScope): Job = coroutineScope.launch {
    val jobs = asIterable().map { item -> item.launch(this@launch) }
    jobs.joinAll()
}
/**
 * Start all items of this [DataSet] (see [startAll]) in a fresh [coroutineScope] and suspend until they finish.
 */
public suspend fun <T : Any> DataSet<T>.computeAndJoinAll(): Unit = coroutineScope {
    val all = startAll(this)
    all.join()
}
/**
* Build a [Meta] description of this [DataSet].
*
* For every item: if its name ends with [DataSet.META_KEY], the item's meta is set at that name;
* otherwise a node is written under the item's name holding the string form of its type under
* "type" and its meta under "meta".
*/
public fun DataSet<*>.toMeta(): Meta = Meta {
forEach {
if (it.name.endsWith(DataSet.META_KEY)) {
set(it.name, it.meta)
} else {
it.name put {
"type" put it.type.toString()
"meta" put it.meta
}
}
}
}
/**
 * The [updates] flow with every name resolved to its current data.
 * Names for which [DataSet.get] returns null are dropped.
 */
public val <T : Any> DataSet<T>.updatesWithData: Flow<NamedData<T>>
    get() = updates.mapNotNull { name ->
        val current = get(name)
        current?.named(name)
    }

View File

@ -0,0 +1,165 @@
package space.kscience.dataforge.data
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MutableMeta
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.isEmpty
import space.kscience.dataforge.names.plus
import kotlin.reflect.KType
/**
 * A write-only builder interface for filling a data set with items of type [T].
 */
public interface DataSetBuilder<in T : Any> {

    /**
     * The declared type of data in the set being built.
     */
    public val dataType: KType

    /**
     * Remove all data items starting with [name]
     */
    public fun remove(name: Name)

    /**
     * Put [data] under [name].
     * NOTE(review): the meaning of a null [data] is defined by implementations - confirm against them.
     */
    public fun data(name: Name, data: Data<T>?)

    /**
     * Set a current state of given [dataSet] into a branch [name]. Does not propagate updates
     */
    public fun node(name: Name, dataSet: DataSet<T>) {
        // drop whatever was stored in the branch before; the root branch is not cleared
        if (name != Name.EMPTY) remove(name)

        // copy the current items, prefixing their names with the branch name
        dataSet.forEach { item ->
            data(name + item.name, item.data)
        }
    }

    /**
     * Set meta for the given node
     */
    public fun meta(name: Name, meta: Meta)
}
/**
 * Define meta of the root node ([Name.EMPTY]) of the set being built.
 */
public fun <T : Any> DataSetBuilder<T>.meta(value: Meta) {
    meta(Name.EMPTY, value)
}

/**
 * Define meta of the root node of the set being built, constructing it with the [mutableMeta] builder.
 */
public fun <T : Any> DataSetBuilder<T>.meta(mutableMeta: MutableMeta.() -> Unit) {
    val built = Meta(mutableMeta)
    meta(built)
}
/**
 * A [DataSetBuilder] that forwards every operation to [parent], prefixing all names with [branch].
 */
@PublishedApi
internal class SubSetBuilder<in T : Any>(
    private val parent: DataSetBuilder<T>,
    private val branch: Name,
) : DataSetBuilder<T> {

    override val dataType: KType get() = parent.dataType

    override fun remove(name: Name): Unit = parent.remove(branch + name)

    override fun data(name: Name, data: Data<T>?): Unit = parent.data(branch + name, data)

    override fun node(name: Name, dataSet: DataSet<T>): Unit = parent.node(branch + name, dataSet)

    override fun meta(name: Name, meta: Meta): Unit = parent.meta(branch + name, meta)
}
/**
 * Run [block] against the branch [name] of this builder. For an empty [name] the block runs on this
 * builder directly; otherwise on a wrapper that prefixes all names with [name].
 */
public inline fun <T : Any> DataSetBuilder<T>.node(
    name: Name,
    crossinline block: DataSetBuilder<T>.() -> Unit,
) {
    val target: DataSetBuilder<T> = if (name.isEmpty()) this else SubSetBuilder(this, name)
    target.block()
}
/**
 * Put [value] under a string [name], which is parsed with [Name.parse].
 */
public fun <T : Any> DataSetBuilder<T>.data(name: String, value: Data<T>): Unit =
    data(Name.parse(name), value)
/**
 * Put the current content of [set] into the branch with a string [name], parsed with [Name.parse].
 */
public fun <T : Any> DataSetBuilder<T>.node(name: String, set: DataSet<T>): Unit =
    node(Name.parse(name), set)
/**
 * Run [block] against the branch with a string [name], parsed with [Name.parse].
 */
public inline fun <T : Any> DataSetBuilder<T>.node(
    name: String,
    crossinline block: DataSetBuilder<T>.() -> Unit,
) {
    node(Name.parse(name), block)
}
/**
 * Put [value] into the builder under the name it carries.
 */
public fun <T : Any> DataSetBuilder<T>.set(value: NamedData<T>): Unit =
    data(value.name, value.data)
/**
 * Produce lazy [Data] from [producer] with the given [meta] and emit it into the [DataSetBuilder]
 * under a string [name].
 */
public inline fun <reified T : Any> DataSetBuilder<T>.produce(
    name: String,
    meta: Meta = Meta.EMPTY,
    noinline producer: suspend () -> T,
) {
    data(name, Data(meta, block = producer))
}

/**
 * Produce lazy [Data] from [producer] with the given [meta] and emit it into the [DataSetBuilder]
 * under [name].
 */
public inline fun <reified T : Any> DataSetBuilder<T>.produce(
    name: Name,
    meta: Meta = Meta.EMPTY,
    noinline producer: suspend () -> T,
) {
    data(name, Data(meta, block = producer))
}
/**
 * Emit static data with the fixed value [data] and the given [meta] under a string [name].
 */
public inline fun <reified T : Any> DataSetBuilder<T>.static(
    name: String,
    data: T,
    meta: Meta = Meta.EMPTY,
) {
    data(name, Data.static(data, meta))
}

/**
 * Emit static data with the fixed value [data] and the given [meta] under [name].
 */
public inline fun <reified T : Any> DataSetBuilder<T>.static(
    name: Name,
    data: T,
    meta: Meta = Meta.EMPTY,
) {
    data(name, Data.static(data, meta))
}

/**
 * Emit static data with the fixed value [data] under a string [name] (parsed with [Name.parse]),
 * building its meta with [mutableMeta].
 */
public inline fun <reified T : Any> DataSetBuilder<T>.static(
    name: String,
    data: T,
    mutableMeta: MutableMeta.() -> Unit,
) {
    data(Name.parse(name), Data.static(data, Meta(mutableMeta)))
}
/**
 * Copy every data item of [tree] into this builder under the item's own name.
 */
@DFExperimental
public fun <T : Any> DataSetBuilder<T>.populateFrom(tree: DataSet<T>): Unit {
    tree.forEach { item ->
        //TODO check if the place is occupied
        data(item.name, item.data)
    }
}
//public fun <T : Any> DataSetBuilder<T>.populateFrom(flow: Flow<NamedData<T>>) {
// flow.collect {
// data(it.name, it.data)
// }
//}
/**
 * Copy every element of [sequence] into this builder under the element's own name.
 */
public fun <T : Any> DataSetBuilder<T>.populateFrom(sequence: Sequence<NamedData<T>>) {
    for (item in sequence) {
        data(item.name, item.data)
    }
}

View File

@ -1,330 +0,0 @@
package space.kscience.dataforge.data
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.misc.DFInternal
import space.kscience.dataforge.names.*
import kotlin.contracts.contract
import kotlin.reflect.KType
import kotlin.reflect.typeOf
/**
* A generic read-only data provider: items are looked up by [Name].
*/
public interface DataSource<out T> {
/**
* The minimal common ancestor to all data in the node
*/
public val dataType: KType
/**
* Get the data with the given [name], or null if it is not present.
*/
public fun read(name: Name): Data<T>?
}
/**
* A data provider with possible dynamic updates
*/
public interface ObservableDataSource<out T> : DataSource<T> {
/**
* A flow of updates made to the data; each element carries the updated data together with its name.
*/
public fun updates(): Flow<NamedData<T>>
}
/**
 * A tree-like structure holding data: a node may carry its own [data] and named child nodes ([items]).
 *
 * @param TR the concrete tree type used for child nodes
 */
public interface GenericDataTree<out T, out TR : GenericDataTree<T, TR>> : DataSource<T> {

    /** This node typed as [TR]. */
    public val self: TR

    /** Data attached directly to this node, if any. */
    public val data: Data<T>?

    /** Direct children of this node keyed by name token. */
    public val items: Map<NameToken, TR>

    /**
     * Resolve [name] token by token: an empty name yields this node's [data], otherwise the lookup
     * continues in the child named by the first token. Null if a child on the path is missing.
     */
    override fun read(name: Name): Data<T>? =
        if (name.length == 0) data else items[name.first()]?.read(name.cutFirst())

    public companion object {
        private object EmptyDataTree : GenericDataTree<Nothing, EmptyDataTree> {
            override val self: EmptyDataTree get() = this

            override val data: Data<Nothing>? = null

            override val items: Map<NameToken, EmptyDataTree> = emptyMap()

            override val dataType: KType = typeOf<Unit>()

            override fun read(name: Name): Data<Nothing>? = null
        }

        /** A tree without data and without children. */
        public val EMPTY: GenericDataTree<Nothing, *> = EmptyDataTree
    }
}
public typealias DataTree<T> = GenericDataTree<T, GenericDataTree<T, *>>
/**
 * Return the only data item of this tree.
 * Fails (via [Sequence.single]) when the tree is empty or holds more than one item.
 */
public fun <T> DataTree<T>.single(): NamedData<T> {
    return asSequence().single()
}
/**
 * Indexing alias for [DataSource.read].
 */
public operator fun <T> DataTree<T>.get(name: Name): Data<T>? {
    return read(name)
}

/**
 * Indexing alias for [DataSource.read] taking a string [name], parsed with [parseAsName].
 */
public operator fun <T> DataTree<T>.get(name: String): Data<T>? {
    val parsed = name.parseAsName()
    return read(parsed)
}
/**
 * Return a lazy sequence of all data items in this tree: the node's own data first (named
 * [namePrefix]), then the items of each child with the child's token appended to the prefix.
 * This method does not take updates into account.
 */
public fun <T> DataTree<T>.asSequence(
    namePrefix: Name = Name.EMPTY,
): Sequence<NamedData<T>> = sequence {
    val own = data
    if (own != null) yield(own.named(namePrefix))
    for ((token, subtree) in items) {
        yieldAll(subtree.asSequence(namePrefix + token))
    }
}
/**
 * Meta of the data attached directly to this node, or null if the node has no data.
 */
public val DataTree<*>.meta: Meta?
    get() {
        val own = data ?: return null
        return own.meta
    }
/**
 * Provide the subtree at [name] if it exists: an empty name yields this node itself,
 * otherwise the path is followed one token at a time. Null if a node on the path is missing.
 */
public tailrec fun <T, TR : GenericDataTree<T, TR>> GenericDataTree<T, TR>.branch(name: Name): TR? {
    if (name.length == 0) return self
    val child = items[name.first()]
    return if (name.length == 1) child else child?.branch(name.cutFirst())
}
/**
 * Provide the subtree at a string [name] (parsed with [parseAsName]) if it exists.
 */
public fun <T, TR : GenericDataTree<T, TR>> GenericDataTree<T, TR>.branch(name: String): TR? {
    val parsed = name.parseAsName()
    return branch(parsed)
}
/**
 * True when this node has neither its own data nor any children.
 */
public fun GenericDataTree<*, *>.isEmpty(): Boolean = !(data != null || items.isNotEmpty())
/**
 * A [GenericDataTree] view over a flat map of full names to data. The node represents the
 * position [prefix] inside [dataSet]; the map itself is shared between nodes, not copied.
 */
@PublishedApi
internal class FlatDataTree<T>(
    override val dataType: KType,
    private val dataSet: Map<Name, Data<T>>,
    private val prefix: Name,
) : GenericDataTree<T, FlatDataTree<T>> {

    override val self: FlatDataTree<T> get() = this

    override val data: Data<T>? get() = dataSet[prefix]

    /**
     * Children are recomputed on every access: one child per distinct token that directly follows
     * [prefix] in the keys strictly longer than [prefix].
     */
    override val items: Map<NameToken, FlatDataTree<T>>
        get() {
            val depth = prefix.length
            val children = LinkedHashMap<NameToken, FlatDataTree<T>>()
            for (key in dataSet.keys) {
                if (key.startsWith(prefix) && key.length > depth) {
                    val token = key.tokens[depth]
                    children[token] = FlatDataTree(dataType, dataSet, prefix + token)
                }
            }
            return children
        }

    /** Direct lookup of the full name `prefix + name` in the backing map. */
    override fun read(name: Name): Data<T>? = dataSet[prefix + name]
}
/**
 * Represent this flat data map as a [DataTree] rooted at [Name.EMPTY] without copying it.
 */
public inline fun <reified T> Map<Name, Data<T>>.asTree(): DataTree<T> {
    return FlatDataTree(typeOf<T>(), this, Name.EMPTY)
}
/**
 * Collect a sequence of [NamedData] into a [DataTree] with an explicitly given data [type].
 */
internal fun <T> Sequence<NamedData<T>>.toTree(type: KType): DataTree<T> {
    val byName = associateBy(keySelector = { it.name }, valueTransform = { it.data })
    return FlatDataTree(type, byName, Name.EMPTY)
}

/**
 * Collect a sequence of [NamedData] to a [DataTree]
 */
public inline fun <reified T> Sequence<NamedData<T>>.toTree(): DataTree<T> {
    val byName = associateBy(keySelector = { it.name }, valueTransform = { it.data })
    return FlatDataTree(typeOf<T>(), byName, Name.EMPTY)
}
/**
 * A [GenericDataTree] that also reports updates and can be closed.
 */
public interface GenericObservableDataTree<out T, out TR : GenericObservableDataTree<T, TR>> :
    GenericDataTree<T, TR>, ObservableDataSource<T>, AutoCloseable {

    /**
     * A scope that is used to propagate updates. When this scope is closed, no new updates could arrive.
     */
    public val updatesScope: CoroutineScope

    /**
     * Close this data tree updates channel by cancelling [updatesScope].
     */
    override fun close(): Unit = updatesScope.cancel()
}
public typealias ObservableDataTree<T> = GenericObservableDataTree<T, GenericObservableDataTree<T, *>>
/**
* Check if the [DataTree] is observable, i.e. is a [GenericObservableDataTree].
*
* Declares a contract so that a `true` result lets the compiler treat the receiver
* as a [GenericObservableDataTree] at the call site.
*/
public fun <T> DataTree<T>.isObservable(): Boolean {
contract {
returns(true) implies (this@isObservable is GenericObservableDataTree<T, *>)
}
return this is GenericObservableDataTree<T, *>
}
/**
 * Suspend until this data tree stops spawning updates, i.e. until the [Job] of its updates scope
 * completes. Returns immediately if the tree is not observable or the scope has no [Job].
 */
public suspend fun <T> DataTree<T>.awaitClose() {
    if (isObservable()) {
        val updatesJob = updatesScope.coroutineContext[Job]
        updatesJob?.join()
    }
}
/**
 * Updates of this tree: the tree's own update flow when it is a [GenericObservableDataTree],
 * otherwise an empty flow.
 */
public fun <T> DataTree<T>.updates(): Flow<NamedData<T>> = when (this) {
    is GenericObservableDataTree<T, *> -> updates()
    else -> emptyFlow()
}
/**
* A write-only receiver of named data.
*/
public fun interface DataSink<in T> {
/**
* Put [data] under [name]. A null [data] is allowed; its handling is up to the implementation
* (for example, `DataTreeBuilder` removes the entry with that name).
*/
public fun put(name: Name, data: Data<T>?)
}
/**
 * A [DataSink] that accumulates items in a flat map and turns them into a [DataTree] on [build].
 */
@DFInternal
public class DataTreeBuilder<T>(private val type: KType) : DataSink<T> {
    private val map = HashMap<Name, Data<T>>()

    /**
     * Store [data] under [name]; a null [data] removes the entry with that name.
     */
    override fun put(name: Name, data: Data<T>?) {
        when (data) {
            null -> map.remove(name)
            else -> map[name] = data
        }
    }

    /**
     * Create a tree view over the accumulated map. The map is not copied.
     */
    public fun build(): DataTree<T> = FlatDataTree(type, map, Name.EMPTY)
}
/**
 * Create a data tree with an explicitly given [dataType], filling it with [generator].
 */
@DFInternal
public inline fun <T> DataTree(
    dataType: KType,
    generator: DataSink<T>.() -> Unit,
): DataTree<T> {
    val builder = DataTreeBuilder<T>(dataType)
    builder.generator()
    return builder.build()
}

/**
 * Create a data tree whose type is taken from the reified [T], filling it with [generator].
 */
@OptIn(DFInternal::class)
public inline fun <reified T> DataTree(
    generator: DataSink<T>.() -> Unit,
): DataTree<T> {
    val builder = DataTreeBuilder<T>(typeOf<T>())
    builder.generator()
    return builder.build()
}
/**
 * A mutable version of [GenericDataTree]: observable, and writable as a [DataSink].
 */
public interface MutableDataTree<T> : GenericObservableDataTree<T, MutableDataTree<T>>, DataSink<T> {

    /** Data attached directly to this node; writable. */
    override var data: Data<T>?

    override val items: Map<NameToken, MutableDataTree<T>>

    /** Return the child for [token], creating it if it does not exist yet. */
    public fun getOrCreateItem(token: NameToken): MutableDataTree<T>

    /** Assign [data] to the direct child named [token]. */
    public operator fun set(token: NameToken, data: Data<T>?)

    /** [DataSink] entry point: delegates to the [Name]-based `set`. */
    override fun put(name: Name, data: Data<T>?) {
        set(name, data)
    }
}
/**
* Assign [data] at the position [name] relative to this node.
*
* An empty name sets this node's own data; a single-token name delegates to the token-based `set`;
* a longer name descends into the child for the first token (creating it if needed) and repeats
* with the remaining tokens.
*/
public tailrec operator fun <T> MutableDataTree<T>.set(name: Name, data: Data<T>?): Unit {
when (name.length) {
0 -> this.data = data
1 -> set(name.first(), data)
// indexed assignment is the recursive call to this same operator on the child node
else -> getOrCreateItem(name.first())[name.cutFirst()] = data
}
}
/**
 * Default [MutableDataTree] implementation backed by a hash map of children and a shared flow of updates.
 *
 * All nodes of one tree share the same [updatesScope]. Every child's updates are forwarded into its
 * parent's flow with the child's token prepended to the name. The forwarding collector is installed
 * exactly once, when the child is created, so that
 *  - repeated assignments to the same token do not stack up collectors and duplicate emissions;
 *  - children created through [getOrCreateItem] alone (the path taken for multi-token names)
 *    are wired to the parent as well.
 */
private class MutableDataTreeImpl<T>(
    override val dataType: KType,
    override val updatesScope: CoroutineScope,
) : MutableDataTree<T> {

    private val updates = MutableSharedFlow<NamedData<T>>()

    private val children = HashMap<NameToken, MutableDataTree<T>>()

    /**
     * Data of this node. Assigning a non-null value emits it (named [Name.EMPTY]) into the updates
     * flow from a coroutine launched in [updatesScope]; assigning null emits nothing.
     * Fails if [updatesScope] is no longer active.
     */
    override var data: Data<T>? = null
        set(value) {
            if (!updatesScope.isActive) error("Can't send updates to closed MutableDataTree")
            field = value
            if (value != null) {
                updatesScope.launch {
                    updates.emit(value.named(Name.EMPTY))
                }
            }
        }

    override val items: Map<NameToken, MutableDataTree<T>> get() = children

    /**
     * Return the child for [token]; when it has to be created, also start forwarding its updates
     * into this node's flow (once per child).
     */
    override fun getOrCreateItem(token: NameToken): MutableDataTree<T> = children.getOrPut(token) {
        MutableDataTreeImpl<T>(dataType, updatesScope).also { child ->
            child.updates().onEach {
                updates.emit(it.named(token + it.name))
            }.launchIn(updatesScope)
        }
    }

    override val self: MutableDataTree<T> get() = this

    /**
     * Assign [data] to the direct child named [token], creating the child if needed.
     * Fails if [updatesScope] is no longer active.
     */
    override fun set(token: NameToken, data: Data<T>?) {
        if (!updatesScope.isActive) error("Can't send updates to closed MutableDataTree")
        getOrCreateItem(token).data = data
    }

    override fun updates(): Flow<NamedData<T>> = updates
}
/**
 * Create a new [MutableDataTree]
 *
 * The tree gets its own updates scope: the context of [parentScope] plus a new [Job] that is a child
 * of the parent scope's job (if it has one).
 *
 * @param parentScope a [CoroutineScope] to control data propagation. By default uses [GlobalScope]
 */
@OptIn(DelicateCoroutinesApi::class)
public fun <T> MutableDataTree(
    type: KType,
    parentScope: CoroutineScope = GlobalScope,
): MutableDataTree<T> {
    val parentContext = parentScope.coroutineContext
    val treeJob = Job(parentContext[Job])
    return MutableDataTreeImpl<T>(type, CoroutineScope(parentContext + treeJob))
}
/**
 * Create an observable mutable data tree typed by the reified [T] and initialize it with [generator].
 */
@OptIn(DelicateCoroutinesApi::class)
public inline fun <reified T> MutableDataTree(
    parentScope: CoroutineScope = GlobalScope,
    generator: MutableDataTree<T>.() -> Unit = {},
): MutableDataTree<T> {
    val tree = MutableDataTree<T>(typeOf<T>(), parentScope)
    tree.generator()
    return tree
}
//@DFInternal
//public fun <T> ObservableDataTree(
// type: KType,
// scope: CoroutineScope,
// generator: suspend MutableDataTree<T>.() -> Unit = {},
//): ObservableDataTree<T> = MutableDataTree<T>(type, scope.coroutineContext).apply(generator)
/**
 * Create a mutable tree typed by the reified [T], initialize it with [generator] and return it
 * as an [ObservableDataTree].
 */
public inline fun <reified T> ObservableDataTree(
    parentScope: CoroutineScope,
    generator: MutableDataTree<T>.() -> Unit = {},
): ObservableDataTree<T> {
    val tree = MutableDataTree<T>(typeOf<T>(), parentScope)
    tree.generator()
    return tree
}
/**
 * Collect a [Sequence] into an observable tree with additional [updates]: the sequence provides the
 * initial content, and every element of [updates] is then put into the tree from a collector
 * launched in the tree's updates scope.
 */
public fun <T> Sequence<NamedData<T>>.toObservableTree(
    dataType: KType,
    parentScope: CoroutineScope,
    updates: Flow<NamedData<T>>,
): ObservableDataTree<T> {
    val tree = MutableDataTree<T>(dataType, parentScope)
    tree.putAll(this)
    updates.onEach { update ->
        tree.put(update.name, update.data)
    }.launchIn(tree.updatesScope)
    return tree
}

View File

@ -0,0 +1,119 @@
package space.kscience.dataforge.data
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.misc.DFInternal
import space.kscience.dataforge.misc.DfType
import space.kscience.dataforge.names.*
import kotlin.collections.component1
import kotlin.collections.component2
import kotlin.reflect.KType
import kotlin.reflect.typeOf
/**
* An item of a [DataTree]: either a nested tree ([Node]) or a single piece of data ([Leaf]).
*/
public sealed class DataTreeItem<out T : Any> {
/**
* Meta of the item: the tree's meta for a [Node], the data's meta for a [Leaf].
*/
public abstract val meta: Meta
/**
* An item wrapping a nested [tree].
*/
public class Node<out T : Any>(public val tree: DataTree<T>) : DataTreeItem<T>() {
override val meta: Meta get() = tree.meta
}
/**
* An item wrapping a single [data] value.
*/
public class Leaf<out T : Any>(public val data: Data<T>) : DataTreeItem<T>() {
override val meta: Meta get() = data.meta
}
}
/**
 * The type carried by this item: the data type for a leaf, the declared tree type for a node.
 */
public val <T : Any> DataTreeItem<T>.type: KType
    get() = when (this) {
        is DataTreeItem.Leaf -> data.type
        is DataTreeItem.Node -> tree.dataType
    }
/**
 * A tree-like [DataSet] grouped into the node. All data inside the node must inherit its type.
 *
 * Children are exposed through [items]. Iteration flattens the tree into named data and skips every
 * top-level entry whose token body starts with `@`.
 */
@DfType(DataTree.TYPE)
public interface DataTree<out T : Any> : DataSet<T> {

    /**
     * Top-level children items of this [DataTree]
     */
    public val items: Map<NameToken, DataTreeItem<T>>

    /**
     * Meta of the item stored under [META_ITEM_NAME_TOKEN], or [Meta.EMPTY] if [items] has no such entry.
     */
    override val meta: Meta get() = items[META_ITEM_NAME_TOKEN]?.meta ?: Meta.EMPTY

    /**
     * Iterate over all data in this tree. Leaves are yielded under their own token; data of nested
     * trees is yielded with the child token prepended to its name.
     */
    override fun iterator(): Iterator<NamedData<T>> = iterator {
        items.forEach { (token, childItem: DataTreeItem<T>) ->
            // Tokens starting with '@' are service entries (e.g. node meta), not data
            if (!token.body.startsWith("@")) {
                when (childItem) {
                    is DataTreeItem.Leaf -> yield(childItem.data.named(token.asName()))
                    is DataTreeItem.Node -> yieldAll(childItem.tree.asSequence().map { it.named(token + it.name) })
                }
            }
        }
    }

    /**
     * Get the data with the given [name], or null when the name is empty, nothing is stored under it,
     * or the addressed item is not a leaf.
     */
    override fun get(name: Name): Data<T>? = when (name.length) {
        0 -> null
        1 -> items[name.firstOrNull()!!].data
        // Descend into the child node (if it is a node) with the remaining part of the name
        else -> items[name.firstOrNull()!!].tree?.get(name.cutFirst())
    }

    public companion object {
        public const val TYPE: String = "dataTree"

        /**
         * A name token used to designate tree node meta
         */
        public val META_ITEM_NAME_TOKEN: NameToken = NameToken("@meta")

        /**
         * Create an empty tree with the given [type] and [meta].
         */
        @DFInternal
        public fun <T : Any> emptyWithType(type: KType, meta: Meta = Meta.EMPTY): DataTree<T> {
            // Captured under a distinct name: inside the object body `meta` denotes the overridden
            // property itself, so a getter written as `get() = meta` would call itself.
            val treeMeta = meta
            return object : DataTree<T> {
                override val items: Map<NameToken, DataTreeItem<T>> get() = emptyMap()
                override val dataType: KType get() = type
                override val meta: Meta get() = treeMeta
            }
        }

        /**
         * Create an empty tree for the reified type [T] with the given [meta].
         */
        @OptIn(DFInternal::class)
        public inline fun <reified T : Any> empty(meta: Meta = Meta.EMPTY): DataTree<T> =
            emptyWithType<T>(typeOf<T>(), meta)
    }
}
/**
 * List full names of the direct children of the node at [prefix].
 * Returns an empty list when [prefix] does not address a node.
 */
public fun <T : Any> DataTree<T>.listChildren(prefix: Name): List<Name> {
    val childTokens = getItem(prefix).tree?.items?.keys
    return childTokens?.map { token -> prefix + token }.orEmpty()
}
/**
 * Get a [DataTreeItem] with given [name] or null if the item does not exist.
 *
 * An empty [name] yields a [DataTreeItem.Node] wrapping this tree. A single-token name is looked up
 * directly in [DataTree.items]. A longer name descends into the child stored under the first token
 * and yields null when that child is missing or is not a node.
 */
public tailrec fun <T : Any> DataTree<T>.getItem(name: Name): DataTreeItem<T>? = when (name.length) {
// Empty name addresses the tree itself
0 -> DataTreeItem.Node(this)
// Single token: direct child lookup, null when absent
1 -> items[name.firstOrNull()]
// Longer name: continue the lookup in the child node with the first token cut off
else -> items[name.firstOrNull()!!].tree?.getItem(name.cutFirst())
}
/**
 * The nested tree of this item if it is a [DataTreeItem.Node]; null for leaves and for a null receiver.
 */
public val <T : Any> DataTreeItem<T>?.tree: DataTree<T>?
    get() {
        val node = this as? DataTreeItem.Node<T>
        return node?.tree
    }
/**
 * The data of this item if it is a [DataTreeItem.Leaf]; null for nodes and for a null receiver.
 */
public val <T : Any> DataTreeItem<T>?.data: Data<T>?
    get() {
        val leaf = this as? DataTreeItem.Leaf<T>
        return leaf?.data
    }
/**
 * A lazy depth-first [Sequence] of all items of this tree, nodes included.
 * Every item is paired with its full name relative to this tree; a node is yielded before its content.
 */
public fun <T : Any> DataTree<T>.traverseItems(): Sequence<Pair<Name, DataTreeItem<T>>> = sequence {
    for ((head, item) in items) {
        yield(head.asName() to item)
        if (item is DataTreeItem.Node) {
            // Re-root the names of nested items under the token of their parent node
            val nested = item.tree.traverseItems().map { (tail, nestedItem) ->
                (head.asName() + tail) to nestedItem
            }
            yieldAll(nested)
        }
    }
}
/**
 * Get the sub-tree of this [DataTree] stored under [branchName].
 * Unlike the similar method for [DataSet], the result is itself a [DataTree]. When [branchName]
 * does not address a node, an empty tree with this tree's data type is returned.
 */
@OptIn(DFInternal::class)
public fun <T : Any> DataTree<T>.branch(branchName: Name): DataTree<T> {
    val subTree = getItem(branchName)?.tree
    return subTree ?: DataTree.emptyWithType(dataType)
}
/**
 * Same as [branch] with a [Name] argument; [branchName] is parsed into a name first.
 */
public fun <T : Any> DataTree<T>.branch(branchName: String): DataTree<T> {
    val parsedName = branchName.parseAsName()
    return branch(parsedName)
}

View File

@ -0,0 +1,121 @@
package space.kscience.dataforge.data
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.misc.DFInternal
import space.kscience.dataforge.misc.ThreadSafe
import space.kscience.dataforge.names.*
import kotlin.collections.set
import kotlin.coroutines.CoroutineContext
import kotlin.reflect.KType
import kotlin.reflect.typeOf
/**
 * A [DataSetBuilder] that is also a [DataSource].
 * Its [updates] are declared as a [MutableSharedFlow] of names, so names can be emitted into it.
 */
public interface DataSourceBuilder<T : Any> : DataSetBuilder<T>, DataSource<T> {
// Overridden with a mutable flow type
override val updates: MutableSharedFlow<Name>
}
/**
 * A mutable [DataTree] that propagates updates.
 *
 * Every change emits the changed name into [updates] from a coroutine launched in this builder's
 * own [coroutineContext], which gets a child [Job] of the supplied context.
 */
public class DataTreeBuilder<T : Any> internal constructor(
    override val dataType: KType,
    coroutineContext: CoroutineContext,
) : DataTree<T>, DataSourceBuilder<T> {

    override val coroutineContext: CoroutineContext =
        coroutineContext + Job(coroutineContext[Job]) + GoalExecutionRestriction()

    // Backing storage. It also holds service entries such as the "@meta" leaf.
    private val treeItems = HashMap<NameToken, DataTreeItem<T>>()

    /**
     * Children of this node without service entries (tokens starting with `@`).
     */
    override val items: Map<NameToken, DataTreeItem<T>>
        get() = treeItems.filter { !it.key.body.startsWith("@") }

    /**
     * Meta of this node. It is read from the backing storage directly because [items] hides the
     * `@meta` entry, so a lookup through [items] would always produce [Meta.EMPTY].
     */
    override val meta: Meta
        get() = treeItems[DataTree.META_ITEM_NAME_TOKEN]?.meta ?: Meta.EMPTY

    override val updates: MutableSharedFlow<Name> = MutableSharedFlow<Name>()

    /**
     * Remove a direct child and, if something was actually removed, notify [updates].
     */
    @ThreadSafe
    private fun remove(token: NameToken) {
        if (treeItems.remove(token) != null) {
            launch {
                updates.emit(token.asName())
            }
        }
    }

    /**
     * Remove the item with the given [name]. Only works when the parent of the item is itself
     * a [DataTreeBuilder]; otherwise nothing happens.
     */
    override fun remove(name: Name) {
        if (name.isEmpty()) error("Can't remove the root node")
        (getItem(name.cutLast()).tree as? DataTreeBuilder)?.remove(name.lastOrNull()!!)
    }

    @ThreadSafe
    private fun set(token: NameToken, data: Data<T>) {
        treeItems[token] = DataTreeItem.Leaf(data)
    }

    @ThreadSafe
    private fun set(token: NameToken, node: DataTree<T>) {
        treeItems[token] = DataTreeItem.Node(node)
    }

    /**
     * Get the direct child builder under [token]. Anything else stored there (a leaf, a node that
     * is not a builder, or nothing) is replaced with a fresh builder.
     */
    private fun getOrCreateNode(token: NameToken): DataTreeBuilder<T> =
        (treeItems[token] as? DataTreeItem.Node<T>)?.tree as? DataTreeBuilder<T>
            ?: DataTreeBuilder<T>(dataType, coroutineContext).also { set(token, it) }

    private fun getOrCreateNode(name: Name): DataTreeBuilder<T> = when (name.length) {
        0 -> this
        1 -> getOrCreateNode(name.firstOrNull()!!)
        else -> getOrCreateNode(name.firstOrNull()!!).getOrCreateNode(name.cutFirst())
    }

    /**
     * Put [data] under [name], creating intermediate nodes as needed, or remove the item when
     * [data] is null. The [name] is emitted into [updates] afterwards in both cases.
     */
    override fun data(name: Name, data: Data<T>?) {
        if (data == null) {
            remove(name)
        } else {
            when (name.length) {
                0 -> error("Can't add data with empty name")
                1 -> set(name.firstOrNull()!!, data)
                // Any deeper name: the branch used to match length 2 only,
                // so data with three or more tokens was silently dropped
                else -> getOrCreateNode(name.cutLast()).set(name.lastOrNull()!!, data)
            }
        }
        launch {
            updates.emit(name)
        }
    }

    /**
     * Attach [meta] to the node at [name] by storing an empty data with that meta under the
     * [DataTree.META_ITEM_NAME_TOKEN] child. Fails if [name] addresses an existing leaf.
     */
    override fun meta(name: Name, meta: Meta) {
        val item = getItem(name)
        if (item is DataTreeItem.Leaf) error("TODO: Can't change meta of existing leaf item.")
        data(name + DataTree.META_ITEM_NAME_TOKEN, Data.empty(meta))
    }
}
/**
 * Create a dynamic [DataSource] of the given [type] in the coroutine context of [parent].
 * The [block] is executed synchronously before the builder is returned.
 */
@DFInternal
@Suppress("FunctionName")
public fun <T : Any> DataSource(
    type: KType,
    parent: CoroutineScope,
    block: DataSourceBuilder<T>.() -> Unit = {},
): DataTreeBuilder<T> {
    val builder = DataTreeBuilder<T>(type, parent.coroutineContext)
    builder.block()
    return builder
}
/**
 * Create a dynamic [DataSource] for the reified type [T] in the coroutine context of [parent]
 * and run [block] on it synchronously.
 */
@Suppress("OPT_IN_USAGE", "FunctionName")
public inline fun <reified T : Any> DataSource(
    parent: CoroutineScope,
    crossinline block: DataSourceBuilder<T>.() -> Unit = {},
): DataTreeBuilder<T> {
    val type = typeOf<T>()
    return DataSource(type, parent) { block() }
}
/**
 * Build a new data source in [parent] using [block] and attach it as a node under [name].
 */
public inline fun <reified T : Any> DataSourceBuilder<T>.emit(
    name: Name,
    parent: CoroutineScope,
    noinline block: DataSourceBuilder<T>.() -> Unit,
) {
    val child = DataSource<T>(parent, block)
    node(name, child)
}
/**
 * Build a new data source in [parent] using [block] and attach it as a node under the name
 * parsed from [name].
 */
public inline fun <reified T : Any> DataSourceBuilder<T>.emit(
    name: String,
    parent: CoroutineScope,
    noinline block: DataSourceBuilder<T>.() -> Unit,
) {
    val child = DataSource<T>(parent, block)
    node(Name.parse(name), child)
}

View File

@ -9,7 +9,7 @@ import kotlin.coroutines.EmptyCoroutineContext
* Lazy computation result with its dependencies, which allows starting the computation of dependencies ahead of time
*/
public interface Goal<out T> {
public val dependencies: Iterable<Goal<*>>
public val dependencies: Collection<Goal<*>>
/**
* Returns current running coroutine if the goal is started. Null if the computation is not started.
@ -54,7 +54,7 @@ public open class StaticGoal<T>(public val value: T) : Goal<T> {
*/
public open class LazyGoal<T>(
private val coroutineContext: CoroutineContext = EmptyCoroutineContext,
override val dependencies: Iterable<Goal<*>> = emptyList(),
override val dependencies: Collection<Goal<*>> = emptyList(),
public val block: suspend () -> T,
) : Goal<T> {
@ -82,8 +82,8 @@ public open class LazyGoal<T>(
}
log?.emit { "Starting dependencies computation for ${this@LazyGoal}" }
val startedDependencies = dependencies.map { goal ->
goal.async(coroutineScope)
val startedDependencies = this.dependencies.map { goal ->
goal.run { async(coroutineScope) }
}
return deferred ?: coroutineScope.async(
coroutineContext

View File

@ -15,12 +15,13 @@
*/
package space.kscience.dataforge.data
import kotlinx.coroutines.launch
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.string
import space.kscience.dataforge.misc.DFInternal
public interface GroupRule {
public fun <T : Any> gather(set: DataTree<T>): Map<String, DataTree<T>>
public fun <T : Any> gather(set: DataSet<T>): Map<String, DataSet<T>>
public companion object {
/**
@ -38,17 +39,39 @@ public interface GroupRule {
): GroupRule = object : GroupRule {
override fun <T : Any> gather(
set: DataTree<T>,
): Map<String, DataTree<T>> {
val map = HashMap<String, DataTreeBuilder<T>>()
set: DataSet<T>,
): Map<String, DataSet<T>> {
val map = HashMap<String, DataSet<T>>()
set.forEach { data ->
val tagValue: String = data.meta[key]?.string ?: defaultTagValue
map.getOrPut(tagValue) { DataTreeBuilder(set.dataType) }.put(data.name, data.data)
if (set is DataSource) {
set.forEach { data ->
val tagValue: String = data.meta[key]?.string ?: defaultTagValue
(map.getOrPut(tagValue) { DataTreeBuilder(set.dataType, set.coroutineContext) } as DataTreeBuilder<T>)
.data(data.name, data.data)
set.launch {
set.updates.collect { name ->
val dataUpdate = set[name]
val updateTagValue = dataUpdate?.meta?.get(key)?.string ?: defaultTagValue
map.getOrPut(updateTagValue) {
DataSource(set.dataType, this) {
data(name, dataUpdate)
}
}
}
}
}
} else {
set.forEach { data ->
val tagValue: String = data.meta[key]?.string ?: defaultTagValue
(map.getOrPut(tagValue) { StaticDataTree(set.dataType) } as StaticDataTree<T>)
.data(data.name, data.data)
}
}
return map.mapValues { it.value.build() }
return map
}
}
}

View File

@ -4,7 +4,7 @@ import space.kscience.dataforge.meta.isEmpty
import space.kscience.dataforge.misc.Named
import space.kscience.dataforge.names.Name
public interface NamedData<out T> : Named, Data<T> {
public interface NamedData<out T : Any> : Named, Data<T> {
override val name: Name
public val data: Data<T>
}
@ -12,7 +12,7 @@ public interface NamedData<out T> : Named, Data<T> {
public operator fun NamedData<*>.component1(): Name = name
public operator fun <T: Any> NamedData<T>.component2(): Data<T> = data
private class NamedDataImpl<T>(
private class NamedDataImpl<out T : Any>(
override val name: Name,
override val data: Data<T>,
) : Data<T> by data, NamedData<T> {
@ -28,7 +28,7 @@ private class NamedDataImpl<T>(
}
}
public fun <T> Data<T>.named(name: Name): NamedData<T> = if (this is NamedData) {
public fun <T : Any> Data<T>.named(name: Name): NamedData<T> = if (this is NamedData) {
NamedDataImpl(name, this.data)
} else {
NamedDataImpl(name, this)

View File

@ -0,0 +1,80 @@
package space.kscience.dataforge.data
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.*
import kotlin.reflect.KType
import kotlin.reflect.typeOf
/**
 * A [DataTree] that is also a [DataSetBuilder], backed by a plain [HashMap].
 * It has no update notifications.
 */
@PublishedApi
internal class StaticDataTree<T : Any>(
    override val dataType: KType,
) : DataSetBuilder<T>, DataTree<T> {

    // Backing storage. It also holds service entries such as the "@meta" leaf.
    private val _items: MutableMap<NameToken, DataTreeItem<T>> = HashMap()

    /**
     * Children of this node without service entries (tokens starting with `@`).
     */
    override val items: Map<NameToken, DataTreeItem<T>>
        get() = _items.filter { !it.key.body.startsWith("@") }

    /**
     * Meta of this node. It is read from the backing storage directly because [items] hides the
     * `@meta` entry, so a lookup through [items] would always produce [Meta.EMPTY].
     */
    override val meta: Meta
        get() = _items[DataTree.META_ITEM_NAME_TOKEN]?.meta ?: Meta.EMPTY

    /**
     * Remove the item with the given [name]. Nested removal works only through child nodes that
     * are themselves [StaticDataTree]s; otherwise nothing happens.
     */
    override fun remove(name: Name) {
        when (name.length) {
            0 -> error("Can't remove root tree node")
            1 -> _items.remove(name.firstOrNull()!!)
            else -> (_items[name.firstOrNull()!!].tree as? StaticDataTree<T>)?.remove(name.cutFirst())
        }
    }

    /**
     * Get the [StaticDataTree] node at [name], creating missing nodes along the path.
     * An existing item that is not a [StaticDataTree] node is replaced with a fresh node.
     */
    private fun getOrCreateNode(name: Name): StaticDataTree<T> = when (name.length) {
        0 -> this
        1 -> {
            val itemName = name.firstOrNull()!!
            (_items[itemName].tree as? StaticDataTree<T>) ?: StaticDataTree<T>(dataType).also {
                _items[itemName] = DataTreeItem.Node(it)
            }
        }
        // Resolve the parent first, then the last token inside it
        else -> getOrCreateNode(name.cutLast()).getOrCreateNode(name.lastOrNull()!!.asName())
    }

    /**
     * Store [item] under [name], or remove the existing item when [item] is null.
     */
    private fun set(name: Name, item: DataTreeItem<T>?) {
        if (name.isEmpty()) error("Can't set top level tree node")
        if (item == null) {
            remove(name)
        } else {
            getOrCreateNode(name.cutLast())._items[name.lastOrNull()!!] = item
        }
    }

    override fun data(name: Name, data: Data<T>?) {
        set(name, data?.let { DataTreeItem.Leaf(it) })
    }

    /**
     * Attach [dataSet] under [name]. A [StaticDataTree] is attached as a node as is;
     * any other data set is copied element by element.
     */
    override fun node(name: Name, dataSet: DataSet<T>) {
        if (dataSet is StaticDataTree) {
            set(name, DataTreeItem.Node(dataSet))
        } else {
            dataSet.forEach {
                data(name + it.name, it.data)
            }
        }
    }

    /**
     * Attach [meta] to the node at [name] by storing an empty data with that meta under the
     * [DataTree.META_ITEM_NAME_TOKEN] child. Not supported when [name] addresses an existing leaf.
     */
    override fun meta(name: Name, meta: Meta) {
        val item = getItem(name)
        if (item is DataTreeItem.Leaf) TODO("Can't change meta of existing leaf item.")
        data(name + DataTree.META_ITEM_NAME_TOKEN, Data.empty(meta))
    }
}
/**
 * Build a static [DataTree] of the given [dataType] by running [block] on a fresh builder.
 */
public inline fun <T : Any> DataTree(
    dataType: KType,
    block: DataSetBuilder<T>.() -> Unit,
): DataTree<T> {
    val tree = StaticDataTree<T>(dataType)
    tree.block()
    return tree
}
/**
 * Build a static [DataTree] for the reified type [T] by running [block] on a fresh builder.
 */
public inline fun <reified T : Any> DataTree(
    noinline block: DataSetBuilder<T>.() -> Unit,
): DataTree<T> {
    val type = typeOf<T>()
    return DataTree(type, block)
}
/**
 * Copy the content of this [DataSet] into a new static [DataTree] of the same data type.
 */
@OptIn(DFExperimental::class)
public fun <T : Any> DataSet<T>.seal(): DataTree<T> {
    val source = this
    return DataTree(source.dataType) {
        populateFrom(source)
    }
}

View File

@ -1,132 +0,0 @@
package space.kscience.dataforge.data
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MutableMeta
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName
import space.kscience.dataforge.names.isEmpty
import space.kscience.dataforge.names.plus
/**
 * Put [value] into this sink under its own name.
 */
public fun <T> DataSink<T>.put(value: NamedData<T>): Unit = put(value.name, value.data)
/**
 * Put every element of [dataTree] into this sink, keeping the names unchanged.
 */
public fun <T> DataSink<T>.branch(dataTree: DataTree<T>): Unit = putAll(dataTree.asSequence())
/**
 * Run [block] against a view of this sink in which every name is prepended with [prefix].
 * With an empty [prefix] the [block] runs on this sink directly.
 */
public inline fun <T> DataSink<T>.branch(
    prefix: Name,
    block: DataSink<T>.() -> Unit,
) {
    if (!prefix.isEmpty()) {
        // Proxy that forwards everything to this sink under the prefixed name
        val prefixedSink = DataSink { relativeName, datum ->
            this.put(prefix + relativeName, datum)
        }
        prefixedSink.block()
    } else {
        block()
    }
}
/**
 * Same as [branch] with a [Name] prefix; [prefix] is converted with `asName()`.
 */
public inline fun <T> DataSink<T>.branch(
    prefix: String,
    block: DataSink<T>.() -> Unit,
) {
    branch(prefix.asName(), block)
}
/**
 * Put [value] under the name parsed from [name].
 */
public fun <T> DataSink<T>.put(name: String, value: Data<T>): Unit = put(Name.parse(name), value)
/**
 * Put every element of [set] into this sink with [name] prepended to the element names.
 */
public fun <T> DataSink<T>.branch(name: Name, set: DataTree<T>): Unit = branch(name) {
    putAll(set.asSequence())
}
/**
 * Put every element of [set] into this sink with the name parsed from [name] prepended
 * to the element names.
 */
public fun <T> DataSink<T>.branch(name: String, set: DataTree<T>) {
    val prefix = Name.parse(name)
    branch(prefix) { putAll(set.asSequence()) }
}
/**
 * Wrap [producer] into a lazy [Data] with the given [meta] and put it into this sink
 * under the string [name].
 */
public inline fun <reified T> DataSink<T>.put(
    name: String,
    meta: Meta = Meta.EMPTY,
    noinline producer: suspend () -> T,
) {
    val lazyData: Data<T> = Data(meta, block = producer)
    put(name, lazyData)
}
/**
 * Wrap [producer] into a lazy [Data] with the given [meta] and put it into this sink under [name].
 */
public inline fun <reified T> DataSink<T>.put(
    name: Name,
    meta: Meta = Meta.EMPTY,
    noinline producer: suspend () -> T,
) {
    val lazyData: Data<T> = Data(meta, block = producer)
    put(name, lazyData)
}
/**
 * Put a static data holding the fixed value [data] with the given [meta] under the string [name].
 */
public inline fun <reified T> DataSink<T>.wrap(
    name: String,
    data: T,
    meta: Meta = Meta.EMPTY,
) {
    put(name, Data.static(data, meta))
}
/**
 * Put a static data holding the fixed value [data] with the given [meta] under [name].
 */
public inline fun <reified T> DataSink<T>.wrap(
    name: Name,
    data: T,
    meta: Meta = Meta.EMPTY,
) {
    put(name, Data.static(data, meta))
}
/**
 * Put a static data holding the fixed value [data] under the name parsed from [name].
 * The meta of the data is built with [mutableMeta].
 */
public inline fun <reified T> DataSink<T>.wrap(
    name: String,
    data: T,
    mutableMeta: MutableMeta.() -> Unit,
) {
    put(Name.parse(name), Data.static(data, Meta(mutableMeta)))
}
/**
 * Put every element of [sequence] into this sink under its own name, in iteration order.
 */
public fun <T> DataSink<T>.putAll(sequence: Sequence<NamedData<T>>) {
    for (namedData in sequence) {
        put(namedData.name, namedData.data)
    }
}
/**
 * Put every element of [tree] into this sink under its own name.
 */
public fun <T> DataSink<T>.putAll(tree: DataTree<T>): Unit = putAll(tree.asSequence())
/**
 * Put every data element of [source] into this tree under its own name.
 */
@DFExperimental
public fun <T> MutableDataTree<T>.putAll(source: DataTree<T>) {
    source.forEach { namedData ->
        put(namedData.name, namedData.data)
    }
}
/**
 * Copy [dataSet] into this [DataSink] under the [name] prefix and keep mirroring its updates.
 * The mirroring collector is launched in the `updatesScope` of [dataSet].
 *
 * @return the [Job] of the mirroring collector.
 */
public fun <T : Any> DataSink<T>.watchBranch(
    name: Name,
    dataSet: ObservableDataTree<T>,
): Job {
    // Initial snapshot first, live updates afterwards
    branch(name, dataSet)
    val mirroring = dataSet.updates().onEach { update ->
        put(name + update.name, update.data)
    }
    return mirroring.launchIn(dataSet.updatesScope)
}

View File

@ -0,0 +1,105 @@
package space.kscience.dataforge.data
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.mapNotNull
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.*
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.reflect.KType
/**
 * A stateless filtered view of this [DataSet]: only elements whose name and meta satisfy
 * [predicate] are visible through iteration, lookup and updates.
 */
public fun <T : Any> DataSet<T>.filter(
    predicate: (Name, Meta) -> Boolean,
): DataSource<T> {
    val source = this
    return object : DataSource<T> {

        override val dataType: KType get() = source.dataType

        // Reuse the context of the source when it has one
        override val coroutineContext: CoroutineContext
            get() = (source as? DataSource)?.coroutineContext ?: EmptyCoroutineContext

        override val meta: Meta get() = source.meta

        override fun iterator(): Iterator<NamedData<T>> = iterator {
            for (candidate in source) {
                if (!predicate(candidate.name, candidate.meta)) continue
                yield(candidate)
            }
        }

        override fun get(name: Name): Data<T>? {
            val datum = source.get(name) ?: return null
            return if (predicate(name, datum.meta)) datum else null
        }

        // An update passes only when the updated item currently exists and matches the predicate
        override val updates: Flow<Name> = source.updates.filter { name ->
            source[name]?.let { datum -> predicate(name, datum.meta) } ?: false
        }
    }
}
/**
 * Generate a wrapper data set in which every name is prepended with [prefix].
 * With an empty [prefix] the receiver itself is returned.
 *
 * In the wrapper, iteration and updates report prefixed names, and [DataSet.get] expects a prefixed name.
 */
public fun <T : Any> DataSet<T>.withNamePrefix(prefix: Name): DataSet<T> = if (prefix.isEmpty()) {
    this
} else object : DataSource<T> {

    override val dataType: KType get() = this@withNamePrefix.dataType

    // Reuse the context of the wrapped set when it has one
    override val coroutineContext: CoroutineContext
        get() = (this@withNamePrefix as? DataSource)?.coroutineContext ?: EmptyCoroutineContext

    override val meta: Meta get() = this@withNamePrefix.meta

    override fun iterator(): Iterator<NamedData<T>> = iterator {
        for (d in this@withNamePrefix) {
            yield(d.data.named(prefix + d.name))
        }
    }

    // Strip the prefix (not the name itself) to address the item in the wrapped set.
    // A name that does not start with the prefix addresses nothing.
    override fun get(name: Name): Data<T>? =
        name.removeFirstOrNull(prefix)?.let { this@withNamePrefix.get(it) }

    override val updates: Flow<Name> get() = this@withNamePrefix.updates.map { prefix + it }
}
/**
 * Get the subset of data whose names start with [branchName]; the names in the result have that
 * prefix removed. With an empty [branchName] the receiver itself is returned.
 */
public fun <T : Any> DataSet<T>.branch(branchName: Name): DataSet<T> {
    if (branchName.isEmpty()) return this
    val source = this
    return object : DataSource<T> {

        override val dataType: KType get() = source.dataType

        // Reuse the context of the source when it has one
        override val coroutineContext: CoroutineContext
            get() = (source as? DataSource)?.coroutineContext ?: EmptyCoroutineContext

        override val meta: Meta get() = source.meta

        override fun iterator(): Iterator<NamedData<T>> = iterator {
            for (candidate in source) {
                // Elements outside of the branch are skipped
                val relativeName = candidate.name.removeFirstOrNull(branchName) ?: continue
                yield(candidate.data.named(relativeName))
            }
        }

        override fun get(name: Name): Data<T>? = source.get(branchName + name)

        override val updates: Flow<Name>
            get() = source.updates.mapNotNull { updated -> updated.removeFirstOrNull(branchName) }
    }
}
/**
 * Same as [branch] with a [Name] argument; [branchName] is parsed into a name first.
 */
public fun <T : Any> DataSet<T>.branch(branchName: String): DataSet<T> {
    val parsedName = branchName.parseAsName()
    return this.branch(parsedName)
}
/**
 * Get the data stored under the empty name, if there is one.
 */
@DFExperimental
public suspend fun <T : Any> DataSet<T>.rootData(): Data<T>? {
    return get(Name.EMPTY)
}

View File

@ -9,14 +9,14 @@ import kotlin.coroutines.EmptyCoroutineContext
import kotlin.reflect.KType
import kotlin.reflect.typeOf
public data class ValueWithMeta<T>(val value: T, val meta: Meta)
public data class ValueWithMeta<T>(val meta: Meta, val value: T)
public suspend fun <T> Data<T>.awaitWithMeta(): ValueWithMeta<T> = ValueWithMeta(await(), meta)
public suspend fun <T : Any> Data<T>.awaitWithMeta(): ValueWithMeta<T> = ValueWithMeta(meta, await())
public data class NamedValueWithMeta<T>(val name: Name, val value: T, val meta: Meta)
public data class NamedValueWithMeta<T>(val name: Name, val meta: Meta, val value: T)
public suspend fun <T> NamedData<T>.awaitWithMeta(): NamedValueWithMeta<T> =
NamedValueWithMeta(name, await(), meta)
public suspend fun <T : Any> NamedData<T>.awaitWithMeta(): NamedValueWithMeta<T> =
NamedValueWithMeta(name, meta, await())
/**
@ -25,7 +25,7 @@ public suspend fun <T> NamedData<T>.awaitWithMeta(): NamedValueWithMeta<T> =
* @param meta for the resulting data. By default equals input data.
* @param block the transformation itself
*/
public inline fun <T, reified R> Data<T>.transform(
public inline fun <T : Any, reified R : Any> Data<T>.map(
meta: Meta = this.meta,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
crossinline block: suspend (T) -> R,
@ -36,7 +36,7 @@ public inline fun <T, reified R> Data<T>.transform(
/**
* Combine this data with the other data using [block]. See [Data::map] for other details
*/
public inline fun <T1, T2, reified R> Data<T1>.combine(
public inline fun <T1 : Any, T2 : Any, reified R : Any> Data<T1>.combine(
other: Data<T2>,
meta: Meta = this.meta,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
@ -58,7 +58,20 @@ internal fun Iterable<Data<*>>.joinMeta(): Meta = Meta {
}
}
/**
 * Lazily reduce a collection of [Data] to a single [Data].
 * The elements of the collection are passed as the third argument of the [Data] factory;
 * inside the computation each of them is awaited together with its meta before [block] runs.
 *
 * @param meta meta of the result; by default produced by `joinMeta()`.
 * @param coroutineContext context passed to the resulting data.
 * @param block the reduction applied to the awaited values.
 */
public inline fun <T : Any, reified R : Any> Collection<Data<T>>.reduceToData(
    meta: Meta = joinMeta(),
    coroutineContext: CoroutineContext = EmptyCoroutineContext,
    crossinline block: suspend (List<ValueWithMeta<T>>) -> R,
): Data<R> {
    val sources = this
    return Data(meta, coroutineContext, sources) {
        val resolved = sources.map { datum -> datum.awaitWithMeta() }
        block(resolved)
    }
}
@PublishedApi
internal fun Map<*, Data<*>>.joinMeta(): Meta = Meta {
@ -69,7 +82,7 @@ internal fun Map<*, Data<*>>.joinMeta(): Meta = Meta {
}
@DFInternal
public fun <K, T, R> Map<K, Data<T>>.reduceToData(
public fun <K, T : Any, R : Any> Map<K, Data<T>>.reduceToData(
outputType: KType,
meta: Meta = joinMeta(),
coroutineContext: CoroutineContext = EmptyCoroutineContext,
@ -89,7 +102,7 @@ public fun <K, T, R> Map<K, Data<T>>.reduceToData(
* @param T type of the input goal
* @param R type of the result goal
*/
public inline fun <K, T, reified R> Map<K, Data<T>>.reduceToData(
public inline fun <K, T : Any, reified R : Any> Map<K, Data<T>>.reduceToData(
meta: Meta = joinMeta(),
coroutineContext: CoroutineContext = EmptyCoroutineContext,
crossinline block: suspend (Map<K, ValueWithMeta<T>>) -> R,
@ -104,7 +117,7 @@ public inline fun <K, T, reified R> Map<K, Data<T>>.reduceToData(
//Iterable operations
@DFInternal
public inline fun <T, R> Iterable<Data<T>>.reduceToData(
public inline fun <T : Any, R : Any> Iterable<Data<T>>.reduceToData(
outputType: KType,
meta: Meta = joinMeta(),
coroutineContext: CoroutineContext = EmptyCoroutineContext,
@ -119,7 +132,7 @@ public inline fun <T, R> Iterable<Data<T>>.reduceToData(
}
@OptIn(DFInternal::class)
public inline fun <T, reified R> Iterable<Data<T>>.reduceToData(
public inline fun <T : Any, reified R : Any> Iterable<Data<T>>.reduceToData(
meta: Meta = joinMeta(),
coroutineContext: CoroutineContext = EmptyCoroutineContext,
crossinline transformation: suspend (Collection<ValueWithMeta<T>>) -> R,
@ -127,7 +140,7 @@ public inline fun <T, reified R> Iterable<Data<T>>.reduceToData(
transformation(it)
}
public inline fun <T, reified R> Iterable<Data<T>>.foldToData(
public inline fun <T : Any, reified R : Any> Iterable<Data<T>>.foldToData(
initial: R,
meta: Meta = joinMeta(),
coroutineContext: CoroutineContext = EmptyCoroutineContext,
@ -142,7 +155,7 @@ public inline fun <T, reified R> Iterable<Data<T>>.foldToData(
* Transform an [Iterable] of [NamedData] to a single [Data].
*/
@DFInternal
public inline fun <T, R> Iterable<NamedData<T>>.reduceNamedToData(
public inline fun <T : Any, R : Any> Iterable<NamedData<T>>.reduceNamedToData(
outputType: KType,
meta: Meta = joinMeta(),
coroutineContext: CoroutineContext = EmptyCoroutineContext,
@ -157,7 +170,7 @@ public inline fun <T, R> Iterable<NamedData<T>>.reduceNamedToData(
}
@OptIn(DFInternal::class)
public inline fun <T, reified R> Iterable<NamedData<T>>.reduceNamedToData(
public inline fun <T : Any, reified R : Any> Iterable<NamedData<T>>.reduceNamedToData(
meta: Meta = joinMeta(),
coroutineContext: CoroutineContext = EmptyCoroutineContext,
crossinline transformation: suspend (Collection<NamedValueWithMeta<T>>) -> R,
@ -168,7 +181,7 @@ public inline fun <T, reified R> Iterable<NamedData<T>>.reduceNamedToData(
/**
* Fold a [Iterable] of named data into a single [Data]
*/
public inline fun <T, reified R> Iterable<NamedData<T>>.foldNamedToData(
public inline fun <T : Any, reified R : Any> Iterable<NamedData<T>>.foldNamedToData(
initial: R,
meta: Meta = joinMeta(),
coroutineContext: CoroutineContext = EmptyCoroutineContext,
@ -182,52 +195,53 @@ public inline fun <T, reified R> Iterable<NamedData<T>>.foldNamedToData(
//DataSet operations
@DFInternal
public suspend fun <T, R> DataTree<T>.transform(
public suspend fun <T : Any, R : Any> DataSet<T>.map(
outputType: KType,
metaTransform: MutableMeta.() -> Unit = {},
coroutineContext: CoroutineContext = EmptyCoroutineContext,
block: suspend (NamedValueWithMeta<T>) -> R,
): DataTree<R> = DataTree<R>(outputType){
//quasi-synchronous processing of elements in the tree
asSequence().forEach { namedData: NamedData<T> ->
val newMeta = namedData.meta.toMutableMeta().apply(metaTransform).seal()
val d = Data(outputType, newMeta, coroutineContext, listOf(namedData)) {
block(namedData.awaitWithMeta())
): DataTree<R> = DataTree<R>(outputType) {
forEach {
val newMeta = it.meta.toMutableMeta().apply(metaTransform).seal()
val d = Data(outputType, newMeta, coroutineContext, listOf(it)) {
block(it.awaitWithMeta())
}
put(namedData.name, d)
data(it.name, d)
}
}
@OptIn(DFInternal::class)
public suspend inline fun <T, reified R> DataTree<T>.transform(
public suspend inline fun <T : Any, reified R : Any> DataSet<T>.map(
noinline metaTransform: MutableMeta.() -> Unit = {},
coroutineContext: CoroutineContext = EmptyCoroutineContext,
noinline block: suspend (NamedValueWithMeta<T>) -> R,
): DataTree<R> = this@transform.transform(typeOf<R>(), metaTransform, coroutineContext, block)
): DataTree<R> = map(typeOf<R>(), metaTransform, coroutineContext, block)
public inline fun <T> DataTree<T>.forEach(block: (NamedData<T>) -> Unit) {
asSequence().forEach(block)
public inline fun <T : Any> DataSet<T>.forEach(block: (NamedData<T>) -> Unit) {
for (d in this) {
block(d)
}
}
// DataSet reduction
@PublishedApi
internal fun DataTree<*>.joinMeta(): Meta = Meta {
asSequence().forEach {
val token = NameToken("data", it.name.toString())
set(token, it.meta)
internal fun DataSet<*>.joinMeta(): Meta = Meta {
forEach { (key, data) ->
val token = NameToken("data", key.toString())
set(token, data.meta)
}
}
public inline fun <T, reified R> DataTree<T>.reduceToData(
public inline fun <T : Any, reified R : Any> DataSet<T>.reduceToData(
meta: Meta = joinMeta(),
coroutineContext: CoroutineContext = EmptyCoroutineContext,
crossinline transformation: suspend (Iterable<NamedValueWithMeta<T>>) -> R,
): Data<R> = asSequence().asIterable().reduceNamedToData(meta, coroutineContext, transformation)
): Data<R> = asIterable().reduceNamedToData(meta, coroutineContext, transformation)
public inline fun <T, reified R> DataTree<T>.foldToData(
public inline fun <T : Any, reified R : Any> DataSet<T>.foldToData(
initial: R,
meta: Meta = joinMeta(),
coroutineContext: CoroutineContext = EmptyCoroutineContext,
crossinline block: suspend (result: R, data: NamedValueWithMeta<T>) -> R,
): Data<R> = asSequence().asIterable().foldNamedToData(initial, meta, coroutineContext, block)
): Data<R> = asIterable().foldNamedToData(initial, meta, coroutineContext, block)

View File

@ -1,10 +1,12 @@
package space.kscience.dataforge.data
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import space.kscience.dataforge.misc.DFInternal
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.Name
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.reflect.KType
import kotlin.reflect.full.isSubtypeOf
import kotlin.reflect.typeOf
@ -14,7 +16,7 @@ import kotlin.reflect.typeOf
* Cast the node to given type if the cast is possible or return null
*/
@Suppress("UNCHECKED_CAST")
private fun <R> Data<*>.castOrNull(type: KType): Data<R>? =
private fun <R : Any> Data<*>.castOrNull(type: KType): Data<R>? =
if (!this.type.isSubtypeOf(type)) {
null
} else {
@ -23,65 +25,61 @@ private fun <R> Data<*>.castOrNull(type: KType): Data<R>? =
}
}
@Suppress("UNCHECKED_CAST")
@DFInternal
public fun <R> Sequence<NamedData<*>>.filterByDataType(type: KType): Sequence<NamedData<R>> =
filter { it.type.isSubtypeOf(type) } as Sequence<NamedData<R>>
@Suppress("UNCHECKED_CAST")
@DFInternal
public fun <R> Flow<NamedData<*>>.filterByDataType(type: KType): Flow<NamedData<R>> =
filter { it.type.isSubtypeOf(type) } as Flow<NamedData<R>>
/**
* Select all data matching given type and filters. Does not modify paths
*
* @param predicate additional filtering condition based on item name and meta. By default, accepts all
* @param predicate additional filtering condition based on item name and meta. By default, accepts all
*/
@DFInternal
public fun <R> DataTree<*>.filterByType(
@OptIn(DFExperimental::class)
public fun <R : Any> DataSet<*>.filterByType(
type: KType,
predicate: DataFilter = DataFilter.EMPTY,
): DataTree<R> = asSequence().filterByDataType<R>(type).filterData(predicate).toTree(type)
predicate: (name: Name, meta: Meta) -> Boolean = { _, _ -> true },
): DataSource<R> = object : DataSource<R> {
override val dataType = type
override val coroutineContext: CoroutineContext
get() = (this@filterByType as? DataSource)?.coroutineContext ?: EmptyCoroutineContext
override val meta: Meta get() = this@filterByType.meta
private fun checkDatum(name: Name, datum: Data<*>): Boolean = datum.type.isSubtypeOf(type)
&& predicate(name, datum.meta)
override fun iterator(): Iterator<NamedData<R>> = iterator {
for(d in this@filterByType){
if(checkDatum(d.name,d.data)){
@Suppress("UNCHECKED_CAST")
yield(d as NamedData<R>)
}
}
}
override fun get(name: Name): Data<R>? = this@filterByType[name]?.let { datum ->
if (checkDatum(name, datum)) datum.castOrNull(type) else null
}
override val updates: Flow<Name> = this@filterByType.updates.filter { name ->
get(name)?.let { datum ->
checkDatum(name, datum)
} ?: false
}
}
/**
* Select a single datum of the appropriate type
*/
@OptIn(DFInternal::class)
public inline fun <reified R : Any> DataTree<*>.filterByType(
predicate: DataFilter = DataFilter.EMPTY,
): DataTree<R> = filterByType(typeOf<R>(), predicate)
public inline fun <reified R : Any> DataSet<*>.filterByType(
noinline predicate: (name: Name, meta: Meta) -> Boolean = { _, _ -> true },
): DataSet<R> = filterByType(typeOf<R>(), predicate)
/**
* Select a single datum if it is present and of given [type]
*/
public fun <R> DataTree<*>.getByType(type: KType, name: Name): NamedData<R>? =
public fun <R : Any> DataSet<*>.getByType(type: KType, name: Name): NamedData<R>? =
get(name)?.castOrNull<R>(type)?.named(name)
public inline fun <reified R : Any> DataTree<*>.getByType(name: Name): NamedData<R>? =
public inline fun <reified R : Any> DataSet<*>.getByType(name: Name): NamedData<R>? =
this@getByType.getByType(typeOf<R>(), name)
public inline fun <reified R : Any> DataTree<*>.getByType(name: String): NamedData<R>? =
public inline fun <reified R : Any> DataSet<*>.getByType(name: String): NamedData<R>? =
this@getByType.getByType(typeOf<R>(), Name.parse(name))
/**
* Select all data matching given type and filters. Does not modify paths
*
* @param predicate additional filtering condition based on item name and meta. By default, accepts all
*/
@DFInternal
public fun <R> ObservableDataTree<*>.filterByType(
type: KType,
scope: CoroutineScope,
predicate: DataFilter = DataFilter.EMPTY,
): ObservableDataTree<R> = asSequence()
.filterByDataType<R>(type)
.filterData(predicate)
.toObservableTree(type, scope, updates().filterByDataType<R>(type).filterData(predicate))
@OptIn(DFInternal::class)
public inline fun <reified R> ObservableDataTree<*>.filterByType(
scope: CoroutineScope,
predicate: DataFilter = DataFilter.EMPTY,
): ObservableDataTree<R> = filterByType(typeOf<R>(),scope,predicate)

View File

@ -1,27 +1,40 @@
package space.kscience.dataforge.data
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.plus
/**
 * Put [data] into the [DataSink] in context, using the receiver string parsed as a [Name].
 */
context(DataSink<T>)
public infix fun <T : Any> String.put(data: Data<T>) {
    val targetName = Name.parse(this)
    put(targetName, data)
}
/**
 * Add [data] to the [DataSetBuilder] in context, using the receiver string parsed as a [Name].
 */
context(DataSetBuilder<T>)
public infix fun <T : Any> String.put(data: Data<T>) {
    val targetName = Name.parse(this)
    data(targetName, data)
}
/**
 * Append [dataSet] to the [DataSink] in context by calling `branch` with the receiver string as the branch name.
 */
context(DataSink<T>)
public infix fun <T : Any> String.put(dataSet: DataTree<T>) {
    branch(this, dataSet)
}
/**
 * Append [dataSet] to the [DataSetBuilder] in context as a node named by the receiver string parsed as a [Name].
 */
context(DataSetBuilder<T>)
public infix fun <T : Any> String.put(dataSet: DataSet<T>) {
    val nodeName = Name.parse(this)
    node(nodeName, dataSet)
}
/**
 * Build a branch with [block] and append it to the [DataSink] in context under the receiver string parsed
 * as a [Name].
 */
context(DataSink<T>)
public infix fun <T : Any> String.put(
    block: DataSink<T>.() -> Unit,
) {
    val branchName = Name.parse(this)
    branch(branchName, block)
}
/**
 * Build a node with [block] and append it to the [DataSetBuilder] in context under the receiver string parsed
 * as a [Name].
 */
context(DataSetBuilder<T>)
public infix fun <T : Any> String.put(
    block: DataSetBuilder<T>.() -> Unit,
) {
    val nodeName = Name.parse(this)
    node(nodeName, block)
}
/**
 * Copy [dataSet] into the [DataSetBuilder] in context as a node called [name], then keep collecting
 * `dataSet.updates` and write each updated item under `name + <item name inside dataSet>`.
 *
 * Both steps run inside a coroutine launched in the receiver [CoroutineScope].
 *
 * @param name name of the node that receives the copy
 * @param dataSet source data set to copy and observe
 * @return the [Job] of the launched coroutine
 */
context(DataSetBuilder<T>)
public fun <T : Any> CoroutineScope.setAndWatch(
    name: Name,
    dataSet: DataSet<T>,
): Job = launch {
    // initial snapshot
    node(name, dataSet)
    // mirror every later change reported by the source
    dataSet.updates.collect { updatedName ->
        val updatedData = dataSet[updatedName]
        data(name + updatedName, updatedData)
    }
}

View File

@ -1,7 +1,7 @@
package space.kscience.dataforge.data
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.Test
import space.kscience.dataforge.actions.Action
@ -10,13 +10,13 @@ import space.kscience.dataforge.actions.mapping
import space.kscience.dataforge.misc.DFExperimental
import kotlin.test.assertEquals
@OptIn(DFExperimental::class)
@OptIn(DFExperimental::class, ExperimentalCoroutinesApi::class)
internal class ActionsTest {
@Test
fun testStaticMapAction() = runTest {
val data: DataTree<Int> = DataTree {
repeat(10) {
wrap(it.toString(), it)
static(it.toString(), it)
}
}
@ -28,26 +28,23 @@ internal class ActionsTest {
}
@Test
fun testDynamicMapAction() = runBlocking {
val source: MutableDataTree<Int> = MutableDataTree()
fun testDynamicMapAction() = runTest {
val data: DataSourceBuilder<Int> = DataSource(this)
val plusOne = Action.mapping<Int, Int> {
result { it + 1 }
}
val result = plusOne(source)
val result = plusOne(data)
repeat(10) {
source.wrap(it.toString(), it)
data.static(it.toString(), it)
}
delay(10)
source.close()
result.awaitClose()
delay(20)
assertEquals(2, result["1"]?.await())
data.close()
}
}

View File

@ -1,8 +1,6 @@
package space.kscience.dataforge.data
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.*
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.asName
import kotlin.test.Test
@ -11,26 +9,27 @@ import kotlin.test.assertEquals
internal class DataTreeBuilderTest {
@Test
fun testTreeBuild() = runTest {
fun testTreeBuild() = runBlocking {
val node = DataTree<Any> {
"primary" put {
wrap("a", "a")
wrap("b", "b")
static("a", "a")
static("b", "b")
}
wrap("c.d", "c.d")
wrap("c.f", "c.f")
static("c.d", "c.d")
static("c.f", "c.f")
}
runBlocking {
assertEquals("a", node["primary.a"]?.await())
assertEquals("b", node["primary.b"]?.await())
assertEquals("c.d", node["c.d"]?.await())
assertEquals("c.f", node["c.f"]?.await())
}
assertEquals("a", node["primary.a"]?.await())
assertEquals("b", node["primary.b"]?.await())
assertEquals("c.d", node["c.d"]?.await())
assertEquals("c.f", node["c.f"]?.await())
}
@OptIn(DFExperimental::class)
@Test
fun testDataUpdate() = runTest {
val updateData = DataTree<Any> {
fun testDataUpdate() = runBlocking {
val updateData: DataTree<Any> = DataTree {
"update" put {
"a" put Data.static("a")
"b" put Data.static("b")
@ -39,30 +38,54 @@ internal class DataTreeBuilderTest {
val node = DataTree<Any> {
"primary" put {
wrap("a", "a")
wrap("b", "b")
static("a", "a")
static("b", "b")
}
wrap("root", "root")
putAll(updateData)
static("root", "root")
populateFrom(updateData)
}
assertEquals("a", node["update.a"]?.await())
assertEquals("a", node["primary.a"]?.await())
runBlocking {
assertEquals("a", node["update.a"]?.await())
assertEquals("a", node["primary.a"]?.await())
}
}
@Test
fun testDynamicUpdates() = runBlocking {
val subNode = MutableDataTree<Int>()
try {
lateinit var updateJob: Job
supervisorScope {
val subNode = DataSource<Int>(this) {
updateJob = launch {
repeat(10) {
delay(10)
static("value", it)
}
delay(10)
}
}
launch {
subNode.updatesWithData.collect {
println(it)
}
}
val rootNode = DataSource<Int>(this) {
setAndWatch("sub".asName(), subNode)
}
val rootNode = MutableDataTree<Int> {
watchBranch("sub".asName(), subNode)
launch {
rootNode.updatesWithData.collect {
println(it)
}
}
updateJob.join()
assertEquals(9, rootNode["sub.value"]?.await())
cancel()
}
} catch (t: Throwable) {
if (t !is CancellationException) throw t
}
repeat(10) {
subNode.wrap("value[$it]", it)
}
delay(20)
assertEquals(9, rootNode["sub.value[9]"]?.await())
}
}

View File

@ -4,7 +4,7 @@ plugins {
description = "IO module"
val ioVersion = "0.3.1"
val ioVersion = "0.3.0"
kscience {
jvm()

View File

@ -10,7 +10,10 @@ import space.kscience.dataforge.names.asName
import kotlin.reflect.KType
import kotlin.reflect.typeOf
public interface EnvelopeFormat : IOFormat<Envelope>
public interface EnvelopeFormat : IOFormat<Envelope> {
override val type: KType get() = typeOf<Envelope>()
}
public fun EnvelopeFormat.read(input: Source): Envelope = readFrom(input)

View File

@ -17,7 +17,11 @@ import kotlin.reflect.typeOf
/**
* Reader of a custom object from input
*/
public fun interface IOReader<out T> {
public interface IOReader<out T> {
/**
* The type of object being read
*/
public val type: KType
public fun readFrom(source: Source): T
@ -28,6 +32,7 @@ public fun interface IOReader<out T> {
* no-op reader for binaries.
*/
public val binary: IOReader<Binary> = object : IOReader<Binary> {
override val type: KType = typeOf<Binary>()
override fun readFrom(source: Source): Binary = source.readByteArray().asBinary()
@ -37,6 +42,8 @@ public fun interface IOReader<out T> {
}
public inline fun <reified T> IOReader(crossinline read: Source.() -> T): IOReader<T> = object : IOReader<T> {
override val type: KType = typeOf<T>()
override fun readFrom(source: Source): T = source.read()
}
@ -49,24 +56,24 @@ public fun interface IOWriter<in T> {
*/
public interface IOFormat<T> : IOReader<T>, IOWriter<T>
public fun <T> Source.readWith(format: IOReader<T>): T = format.readFrom(this)
public fun <T : Any> Source.readWith(format: IOReader<T>): T = format.readFrom(this)
/**
* Read given binary as an object using given format
*/
public fun <T> Binary.readWith(format: IOReader<T>): T = read {
this.readWith(format)
public fun <T : Any> Binary.readWith(format: IOReader<T>): T = read {
readWith(format)
}
/**
* Write an object to the [Sink] with given [format]
*/
public fun <T> Sink.writeWith(format: IOWriter<T>, obj: T): Unit =
public fun <T : Any> Sink.writeWith(format: IOWriter<T>, obj: T): Unit =
format.writeTo(this, obj)
@DfType(IO_FORMAT_TYPE)
public interface IOFormatFactory<T> : Factory<IOFormat<T>>, Named {
public interface IOFormatFactory<T : Any> : Factory<IOFormat<T>>, Named {
/**
* Explicit type for dynamic type checks
*/
@ -79,7 +86,7 @@ public interface IOFormatFactory<T> : Factory<IOFormat<T>>, Named {
}
}
public fun <T> Binary(obj: T, format: IOWriter<T>): Binary = Binary { format.writeTo(this, obj) }
public fun <T : Any> Binary(obj: T, format: IOWriter<T>): Binary = Binary { format.writeTo(this, obj) }
public object FloatIOFormat : IOFormat<Float>, IOFormatFactory<Float> {
override fun build(context: Context, meta: Meta): IOFormat<Float> = this

View File

@ -5,6 +5,7 @@ import space.kscience.dataforge.io.EnvelopeFormatFactory.Companion.ENVELOPE_FORM
import space.kscience.dataforge.io.IOFormatFactory.Companion.IO_FORMAT_TYPE
import space.kscience.dataforge.io.MetaFormatFactory.Companion.META_FORMAT_TYPE
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.string
import space.kscience.dataforge.misc.DFInternal
import space.kscience.dataforge.names.Name
@ -20,11 +21,11 @@ public class IOPlugin(meta: Meta) : AbstractPlugin(meta) {
@Suppress("UNCHECKED_CAST")
@DFInternal
public fun <T> resolveIOFormat(type: KType, meta: Meta): IOFormat<T>? =
public fun <T : Any> resolveIOFormat(type: KType, meta: Meta): IOFormat<T>? =
ioFormatFactories.singleOrNull { it.type == type }?.build(context, meta) as? IOFormat<T>
@OptIn(DFInternal::class)
public inline fun <reified T> resolveIOFormat(meta: Meta = Meta.EMPTY): IOFormat<T>? =
public inline fun <reified T : Any> resolveIOFormat(meta: Meta = Meta.EMPTY): IOFormat<T>? =
resolveIOFormat(typeOf<T>(), meta)

View File

@ -21,6 +21,8 @@ import kotlin.reflect.typeOf
*/
public interface MetaFormat : IOFormat<Meta> {
override val type: KType get() = typeOf<Meta>()
override fun writeTo(sink: Sink, obj: Meta) {
writeMeta(sink, obj, null)
}

View File

@ -6,6 +6,8 @@ import kotlinx.serialization.json.encodeToJsonElement
import kotlinx.serialization.serializer
import space.kscience.dataforge.meta.descriptors.MetaDescriptor
import space.kscience.dataforge.misc.DFExperimental
import kotlin.reflect.KType
import kotlin.reflect.typeOf
/**
@ -13,10 +15,15 @@ import space.kscience.dataforge.misc.DFExperimental
*/
public interface MetaConverter<T>: MetaSpec<T> {
/**
* Runtime type of [T]
*/
public val type: KType
/**
* A descriptor for resulting meta
*/
override val descriptor: MetaDescriptor? get() = null
override val descriptor: MetaDescriptor get() = MetaDescriptor.EMPTY
/**
* Attempt conversion of [source] to an object or return null if conversion failed
@ -31,16 +38,22 @@ public interface MetaConverter<T>: MetaSpec<T> {
public companion object {
public val meta: MetaConverter<Meta> = object : MetaConverter<Meta> {
override val type: KType = typeOf<Meta>()
override fun readOrNull(source: Meta): Meta = source
override fun convert(obj: Meta): Meta = obj
}
public val value: MetaConverter<Value> = object : MetaConverter<Value> {
override val type: KType = typeOf<Value>()
override fun readOrNull(source: Meta): Value? = source.value
override fun convert(obj: Value): Meta = Meta(obj)
}
public val string: MetaConverter<String> = object : MetaConverter<String> {
override val type: KType = typeOf<String>()
override val descriptor: MetaDescriptor = MetaDescriptor {
valueType(ValueType.STRING)
}
@ -51,6 +64,8 @@ public interface MetaConverter<T>: MetaSpec<T> {
}
public val boolean: MetaConverter<Boolean> = object : MetaConverter<Boolean> {
override val type: KType = typeOf<Boolean>()
override val descriptor: MetaDescriptor = MetaDescriptor {
valueType(ValueType.BOOLEAN)
}
@ -60,6 +75,8 @@ public interface MetaConverter<T>: MetaSpec<T> {
}
public val number: MetaConverter<Number> = object : MetaConverter<Number> {
override val type: KType = typeOf<Number>()
override val descriptor: MetaDescriptor = MetaDescriptor {
valueType(ValueType.NUMBER)
}
@ -69,6 +86,8 @@ public interface MetaConverter<T>: MetaSpec<T> {
}
public val double: MetaConverter<Double> = object : MetaConverter<Double> {
override val type: KType = typeOf<Double>()
override val descriptor: MetaDescriptor = MetaDescriptor {
valueType(ValueType.NUMBER)
}
@ -78,6 +97,8 @@ public interface MetaConverter<T>: MetaSpec<T> {
}
public val float: MetaConverter<Float> = object : MetaConverter<Float> {
override val type: KType = typeOf<Float>()
override val descriptor: MetaDescriptor = MetaDescriptor {
valueType(ValueType.NUMBER)
}
@ -87,6 +108,8 @@ public interface MetaConverter<T>: MetaSpec<T> {
}
public val int: MetaConverter<Int> = object : MetaConverter<Int> {
override val type: KType = typeOf<Int>()
override val descriptor: MetaDescriptor = MetaDescriptor {
valueType(ValueType.NUMBER)
}
@ -96,6 +119,8 @@ public interface MetaConverter<T>: MetaSpec<T> {
}
public val long: MetaConverter<Long> = object : MetaConverter<Long> {
override val type: KType = typeOf<Long>()
override val descriptor: MetaDescriptor = MetaDescriptor {
valueType(ValueType.NUMBER)
}
@ -105,6 +130,8 @@ public interface MetaConverter<T>: MetaSpec<T> {
}
public inline fun <reified E : Enum<E>> enum(): MetaConverter<E> = object : MetaConverter<E> {
override val type: KType = typeOf<E>()
override val descriptor: MetaDescriptor = MetaDescriptor {
valueType(ValueType.STRING)
allowedValues(enumValues<E>())
@ -120,6 +147,8 @@ public interface MetaConverter<T>: MetaSpec<T> {
writer: (T) -> Value = { Value.of(it) },
reader: (Value) -> T,
): MetaConverter<List<T>> = object : MetaConverter<List<T>> {
override val type: KType = typeOf<List<T>>()
override val descriptor: MetaDescriptor = MetaDescriptor {
valueType(ValueType.LIST)
}
@ -136,6 +165,7 @@ public interface MetaConverter<T>: MetaSpec<T> {
public inline fun <reified T> serializable(
descriptor: MetaDescriptor? = null,
): MetaConverter<T> = object : MetaConverter<T> {
override val type: KType = typeOf<T>()
private val serializer: KSerializer<T> = serializer()
override fun readOrNull(source: Meta): T? {

View File

@ -166,9 +166,9 @@ public inline fun <T : Scheme> T.copy(spec: SchemeSpec<T>, block: T.() -> Unit =
/**
* A specification for simplified generation of wrappers
*/
public open class SchemeSpec<T : Scheme>(
public open class SchemeSpec<out T : Scheme>(
private val builder: () -> T,
) : MetaConverter<T> {
) : MetaSpec<T> {
override val descriptor: MetaDescriptor? get() = null
@ -187,8 +187,6 @@ public open class SchemeSpec<T : Scheme>(
it.initialize(MutableMeta(), Meta.EMPTY, descriptor)
}
override fun convert(obj: T): Meta = obj.meta
/**
* A convenience method to use specifications in builders
*/

View File

@ -113,13 +113,6 @@ public class Name(public val tokens: List<NameToken>) {
}
}
/**
 * Render this [Name] as a string in which tokens are written without escaping special characters.
 *
 * The tokens are joined with [Name.NAME_SEPARATOR]. Parsing the result back will produce a valid,
 * but different name.
 */
public fun Name.toStringUnescaped(): String =
    tokens
        .map { token -> token.toStringUnescaped() }
        .joinToString(separator = Name.NAME_SEPARATOR)
/**
 * Return the token at position [i] in [Name.tokens].
 */
public operator fun Name.get(i: Int): NameToken {
    return tokens[i]
}
/**

View File

@ -0,0 +1,46 @@
package space.kscience.dataforge.workspace
import space.kscience.dataforge.data.DataTree.Companion.META_ITEM_NAME_TOKEN
import space.kscience.dataforge.io.Envelope
import space.kscience.dataforge.io.IOReader
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.descriptors.MetaDescriptor
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName
import kotlin.reflect.KType
/**
 * A [Task] whose result is assembled from a map of [Envelope]s, each converted to data with [reader].
 *
 * @param descriptor descriptor reported by this task
 * @param reader reader used to turn envelope content into [T]; its `type` is reported as the result data type
 */
public abstract class EnvelopeTask<T : Any>(
    override val descriptor: MetaDescriptor?,
    private val reader: IOReader<T>,
) : Task<T> {

    /**
     * Produce the envelopes that make up the result of the task, keyed by item name.
     */
    public abstract suspend fun produceEnvelopes(
        workspace: Workspace,
        taskName: Name,
        taskMeta: Meta,
    ): Map<Name, Envelope>

    override suspend fun execute(workspace: Workspace, taskName: Name, taskMeta: Meta): TaskResult<T> {
        val envelopes = produceEnvelopes(workspace, taskName, taskMeta)
        return Result(workspace, taskName, taskMeta, reader, envelopes)
    }

    /**
     * [TaskResult] backed by a map that is built once from the supplied envelopes.
     */
    private class Result<T : Any>(
        override val workspace: Workspace,
        override val taskName: Name,
        override val taskMeta: Meta,
        val reader: IOReader<T>,
        envelopes: Map<Name, Envelope>,
    ) : TaskResult<T> {

        // each envelope is converted with the reader and adopted into the workspace under its own name
        private val dataMap = envelopes.mapValues { (itemName, envelope) ->
            workspace.wrapData(envelope.toData(reader), itemName, taskName, taskMeta)
        }

        // meta of the item stored under META_ITEM_NAME_TOKEN, or empty meta when there is no such item
        override val meta: Meta
            get() = dataMap[META_ITEM_NAME_TOKEN.asName()]?.meta ?: Meta.EMPTY

        override val dataType: KType
            get() = reader.type

        override fun iterator(): Iterator<TaskData<T>> = dataMap.values.iterator()

        override fun get(name: Name): TaskData<T>? = dataMap[name]
    }
}

View File

@ -1,9 +1,9 @@
package space.kscience.dataforge.workspace
import kotlinx.coroutines.withContext
import space.kscience.dataforge.data.DataSink
import space.kscience.dataforge.data.DataSetBuilder
import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.data.GoalExecutionRestriction
import space.kscience.dataforge.data.MutableDataTree
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MetaRepr
import space.kscience.dataforge.meta.MetaSpec
@ -20,7 +20,7 @@ import kotlin.reflect.typeOf
* In general no computations should be made until the result is called.
*/
@DfType(TYPE)
public interface Task<T> : Described {
public interface Task<out T : Any> : Described {
/**
* A task identification string used to compare tasks and check task body for change
@ -45,7 +45,7 @@ public interface Task<T> : Described {
/**
* A [Task] with [MetaSpec] for wrapping and unwrapping task configuration
*/
public interface TaskWithSpec<T, C : Any> : Task<T> {
public interface TaskWithSpec<out T : Any, C : Any> : Task<T> {
public val spec: MetaSpec<C>
override val descriptor: MetaDescriptor? get() = spec.descriptor
@ -61,12 +61,12 @@ public interface TaskWithSpec<T, C : Any> : Task<T> {
// block: C.() -> Unit = {},
//): TaskResult<T> = execute(workspace, taskName, spec(block))
public class TaskResultBuilder<T>(
public class TaskResultBuilder<in T : Any>(
public val workspace: Workspace,
public val taskName: Name,
public val taskMeta: Meta,
private val dataSink: DataSink<T>,
) : DataSink<T> by dataSink
private val dataDrop: DataSetBuilder<T>,
) : DataSetBuilder<T> by dataDrop
/**
* Create a [Task] that composes a result using [builder]. Only data from the workspace could be used.
@ -88,17 +88,12 @@ public fun <T : Any> Task(
workspace: Workspace,
taskName: Name,
taskMeta: Meta,
): TaskResult<T> {
): TaskResult<T> = withContext(GoalExecutionRestriction() + workspace.goalLogger) {
//TODO use safe builder and check for external data on add and detects cycles
val dataset = MutableDataTree<T>(resultType, workspace.context).apply {
TaskResultBuilder(workspace, taskName, taskMeta, this).apply {
withContext(GoalExecutionRestriction() + workspace.goalLogger) {
builder()
}
}
val dataset = DataTree<T>(resultType) {
TaskResultBuilder(workspace, taskName, taskMeta, this).apply { builder() }
}
return workspace.wrapResult(dataset, taskName, taskMeta)
workspace.wrapResult(dataset, taskName, taskMeta)
}
}
@ -116,7 +111,6 @@ public inline fun <reified T : Any> Task(
* @param specification a specification for task configuration
* @param builder for resulting data set
*/
@Suppress("FunctionName")
public fun <T : Any, C : MetaRepr> Task(
resultType: KType,
@ -132,7 +126,7 @@ public fun <T : Any, C : MetaRepr> Task(
): TaskResult<T> = withContext(GoalExecutionRestriction() + workspace.goalLogger) {
//TODO use safe builder and check for external data on add and detects cycles
val taskMeta = configuration.toMeta()
val dataset = MutableDataTree<T>(resultType, this).apply {
val dataset = DataTree<T>(resultType) {
TaskResultBuilder(workspace, taskName, taskMeta, this).apply { builder(configuration) }
}
workspace.wrapResult(dataset, taskName, taskMeta)

View File

@ -0,0 +1,50 @@
package space.kscience.dataforge.workspace
import space.kscience.dataforge.data.Data
import space.kscience.dataforge.data.NamedData
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
/**
 * A [NamedData] bound to a [Workspace]. It also carries the name and the configuration of the task
 * that produced it and serves as a computation model.
 */
public interface TaskData<out T : Any> : NamedData<T> {
    /**
     * The [Workspace] that owns this data
     */
    public val workspace: Workspace

    /**
     * Name of the stage that produced this data; [Name.EMPTY] when the data is intrinsic to the workspace.
     */
    public val taskName: Name

    /**
     * Configuration of the stage that produced this data.
     */
    public val taskMeta: Meta

    // Disabled: dependencies that would allow computing transitive dependencies as well.
//    override val dependencies: Collection<TaskData<*>>
}
/**
 * Default [TaskData] implementation: every [Data] member is delegated to [data], while the workspace,
 * item name, task name and task meta are stored as given.
 */
private class TaskDataImpl<out T : Any>(
    override val workspace: Workspace,
    override val data: Data<T>,
    override val name: Name,
    override val taskName: Name,
    override val taskMeta: Meta,
) : TaskData<T>, Data<T> by data {
    // Disabled together with the `dependencies` member of TaskData:
//    override val dependencies: Collection<TaskData<*>> = data.dependencies.map {
//        it as? TaskData<*> ?: error("TaskData can't depend on external data")
//    }
}
/**
 * Adopt [data] into this workspace by wrapping it in a [TaskData].
 *
 * @param data the data to wrap
 * @param name name of the resulting item
 * @param taskName name of the task the data is attributed to
 * @param taskMeta configuration of the task the data is attributed to
 */
public fun <T : Any> Workspace.wrapData(data: Data<T>, name: Name, taskName: Name, taskMeta: Meta): TaskData<T> =
    TaskDataImpl(
        workspace = this,
        data = data,
        name = name,
        taskName = taskName,
        taskMeta = taskMeta,
    )

View File

@ -1,41 +1,54 @@
package space.kscience.dataforge.workspace
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import space.kscience.dataforge.data.ObservableDataTree
import space.kscience.dataforge.data.asSequence
import space.kscience.dataforge.data.launch
import space.kscience.dataforge.data.DataSet
import space.kscience.dataforge.data.forEach
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
/**
 * The outcome of running a [Task]. All [ObservableDataTree] members are delegated to [content].
 *
 * @param content the data tree produced by the task
 * @param workspace the [Workspace] that produced the result
 * @param taskName the name of the task that produced the result
 * @param taskMeta the configuration of the task that produced the result
 */
public data class TaskResult<T>(
    public val content: ObservableDataTree<T>,
    public val workspace: Workspace,
    public val taskName: Name,
    public val taskMeta: Meta,
) : ObservableDataTree<T> by content
/**
 * A [DataSet] produced by a task of a [Workspace]. Items are exposed as [TaskData].
 */
public interface TaskResult<out T : Any> : DataSet<T> {
    /**
     * The [Workspace] this [DataSet] belongs to
     */
    public val workspace: Workspace

    /**
     * The [Name] of the stage that produced this [DataSet]
     */
    public val taskName: Name

    /**
     * The configuration of the stage that produced this [DataSet]
     */
    public val taskMeta: Meta

    /**
     * Iterate over the items, each typed as [TaskData].
     */
    override fun iterator(): Iterator<TaskData<T>>

    /**
     * Get the item called [name] as [TaskData]; the return type is nullable.
     */
    override fun get(name: Name): TaskData<T>?
}
/**
 * [TaskResult] implementation that delegates [DataSet] members to [dataSet] and wraps every item it hands out
 * into [TaskData] via [wrapData].
 */
private class TaskResultImpl<out T : Any>(
    override val workspace: Workspace,
    override val taskName: Name,
    override val taskMeta: Meta,
    val dataSet: DataSet<T>,
) : TaskResult<T>, DataSet<T> by dataSet {

    // items are wrapped one by one as the iterator is consumed
    override fun iterator(): Iterator<TaskData<T>> = iterator {
        dataSet.forEach { item ->
            val wrapped = workspace.wrapData(item, item.name, taskName, taskMeta)
            yield(wrapped)
        }
    }

    override fun get(name: Name): TaskData<T>? {
        val found = dataSet[name] ?: return null
        return workspace.wrapData(found, name, taskName, taskMeta)
    }
}
/**
 * Wrap [data] into a [TaskResult] that belongs to this workspace.
 *
 * @param data the tree to wrap
 * @param taskName name of the task the result is attributed to
 * @param taskMeta configuration of the task the result is attributed to
 */
public fun <T> Workspace.wrapResult(data: ObservableDataTree<T>, taskName: Name, taskMeta: Meta): TaskResult<T> =
    TaskResult(
        content = data,
        workspace = this,
        taskName = taskName,
        taskMeta = taskMeta,
    )
/**
 * Start computation for every data element of this result in [scope].
 *
 * @return a [Job] that completes only after all the started element jobs have completed
 */
public fun TaskResult<*>.launch(scope: CoroutineScope): Job {
    // toList() forces the lazy sequence, so every element is started before the joining job is created
    val startedJobs = asSequence()
        .map { item -> item.data.launch(scope) }
        .toList()
    return scope.launch {
        startedJobs.joinAll()
    }
}
/**
 * Wrap [dataSet] into a [TaskResult] that belongs to this workspace.
 *
 * @param dataSet the data set to wrap
 * @param taskName name of the task the result is attributed to
 * @param taskMeta configuration of the task the result is attributed to
 */
public fun <T : Any> Workspace.wrapResult(dataSet: DataSet<T>, taskName: Name, taskMeta: Meta): TaskResult<T> =
    TaskResultImpl(
        workspace = this,
        taskName = taskName,
        taskMeta = taskMeta,
        dataSet = dataSet,
    )

View File

@ -1,32 +1,29 @@
package space.kscience.dataforge.workspace
import kotlinx.coroutines.CoroutineScope
import space.kscience.dataforge.context.ContextAware
import space.kscience.dataforge.data.*
import space.kscience.dataforge.data.Data
import space.kscience.dataforge.data.DataSet
import space.kscience.dataforge.data.asSequence
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MutableMeta
import space.kscience.dataforge.misc.DfType
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.provider.Provider
import kotlin.coroutines.CoroutineContext
public fun interface DataSelector<T> {
public suspend fun select(workspace: Workspace, meta: Meta): DataTree<T>
public interface DataSelector<T: Any>{
public suspend fun select(workspace: Workspace, meta: Meta): DataSet<T>
}
/**
* An environment for pull-mode computation
*/
@DfType(Workspace.TYPE)
public interface Workspace : ContextAware, Provider, CoroutineScope {
override val coroutineContext: CoroutineContext get() = context.coroutineContext
public interface Workspace : ContextAware, Provider {
/**
* The whole data node for current workspace
*/
public val data: ObservableDataTree<*>
public val data: TaskResult<*>
/**
* All targets associated with the workspace
@ -40,7 +37,7 @@ public interface Workspace : ContextAware, Provider, CoroutineScope {
override fun content(target: String): Map<Name, Any> {
return when (target) {
"target", Meta.TYPE -> targets.mapKeys { Name.parse(it.key) }
"target", Meta.TYPE -> targets.mapKeys { Name.parse(it.key)}
Task.TYPE -> tasks
Data.TYPE -> data.asSequence().associateBy { it.name }
else -> emptyMap()
@ -52,7 +49,7 @@ public interface Workspace : ContextAware, Provider, CoroutineScope {
return task.execute(this, taskName, taskMeta)
}
public suspend fun produceData(taskName: Name, taskMeta: Meta, name: Name): Data<*>? =
public suspend fun produceData(taskName: Name, taskMeta: Meta, name: Name): TaskData<*>? =
produce(taskName, taskMeta)[name]
public companion object {

View File

@ -5,9 +5,9 @@ import space.kscience.dataforge.actions.Action
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.ContextBuilder
import space.kscience.dataforge.context.Global
import space.kscience.dataforge.data.DataSink
import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.data.MutableDataTree
import space.kscience.dataforge.data.DataSet
import space.kscience.dataforge.data.DataSource
import space.kscience.dataforge.data.DataSourceBuilder
import space.kscience.dataforge.meta.*
import space.kscience.dataforge.meta.descriptors.MetaDescriptor
import space.kscience.dataforge.meta.descriptors.MetaDescriptorBuilder
@ -17,14 +17,13 @@ import space.kscience.dataforge.names.asName
import kotlin.collections.set
import kotlin.properties.PropertyDelegateProvider
import kotlin.properties.ReadOnlyProperty
import kotlin.reflect.typeOf
public data class TaskReference<T>(public val taskName: Name, public val task: Task<T>) : DataSelector<T> {
public data class TaskReference<T : Any>(public val taskName: Name, public val task: Task<T>) : DataSelector<T> {
@Suppress("UNCHECKED_CAST")
override suspend fun select(workspace: Workspace, meta: Meta): DataTree<T> {
override suspend fun select(workspace: Workspace, meta: Meta): DataSet<T> {
if (workspace.tasks[taskName] == task) {
return workspace.produce(taskName, meta) as DataTree<T>
return workspace.produce(taskName, meta) as TaskResult<T>
} else {
error("Task $taskName does not belong to the workspace")
}
@ -46,7 +45,7 @@ public inline fun <reified T : Any> TaskContainer.registerTask(
): Unit = registerTask(Name.parse(name), Task(MetaDescriptor(descriptorBuilder), builder))
/**
* Create and register a new task
* Create a new task
*/
public inline fun <reified T : Any> TaskContainer.buildTask(
name: String,
@ -110,7 +109,7 @@ public class WorkspaceBuilder(
private val coroutineScope: CoroutineScope = parentContext,
) : TaskContainer {
private var context: Context? = null
private val data = MutableDataTree<Any?>(typeOf<Any?>(), coroutineScope)
private val data = DataSource<Any>(coroutineScope)
private val targets: HashMap<String, Meta> = HashMap()
private val tasks = HashMap<Name, Task<*>>()
private var cache: WorkspaceCache? = null
@ -125,7 +124,7 @@ public class WorkspaceBuilder(
/**
* Define intrinsic data for the workspace
*/
public fun data(builder: DataSink<Any?>.() -> Unit) {
public fun data(builder: DataSourceBuilder<Any>.() -> Unit) {
data.apply(builder)
}
@ -150,7 +149,7 @@ public class WorkspaceBuilder(
public fun build(): Workspace {
val postProcess: suspend (TaskResult<*>) -> TaskResult<*> = { result ->
cache?.cache(result) ?: result
cache?.evaluate(result) ?: result
}
return WorkspaceImpl(context ?: parentContext, data, targets, tasks, postProcess)
}

View File

@ -1,5 +1,5 @@
package space.kscience.dataforge.workspace
public interface WorkspaceCache {
public suspend fun <T> cache(result: TaskResult<T>): TaskResult<T>
public suspend fun <T : Any> evaluate(result: TaskResult<T>): TaskResult<T>
}

View File

@ -2,19 +2,21 @@ package space.kscience.dataforge.workspace
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.gather
import space.kscience.dataforge.data.ObservableDataTree
import space.kscience.dataforge.data.DataSet
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
internal class WorkspaceImpl internal constructor(
override val context: Context,
override val data: ObservableDataTree<*>,
data: DataSet<*>,
override val targets: Map<String, Meta>,
tasks: Map<Name, Task<*>>,
private val postProcess: suspend (TaskResult<*>) -> TaskResult<*>,
) : Workspace {
override val data: TaskResult<*> = wrapResult(data, Name.EMPTY, Meta.EMPTY)
override val tasks: Map<Name, Task<*>> by lazy { context.gather<Task<*>>(Task.TYPE) + tasks }
override suspend fun produce(taskName: Name, taskMeta: Meta): TaskResult<*> {

View File

@ -4,14 +4,13 @@ import space.kscience.dataforge.data.Data
import space.kscience.dataforge.data.await
import space.kscience.dataforge.io.*
import space.kscience.dataforge.misc.DFInternal
import kotlin.reflect.typeOf
/**
* Convert an [Envelope] to a data via given format. The actual parsing is done lazily.
*/
@OptIn(DFInternal::class)
public inline fun <reified T : Any> Envelope.toData(format: IOReader<T>): Data<T> = Data(typeOf<T>(), meta) {
public fun <T : Any> Envelope.toData(format: IOReader<T>): Data<T> = Data(format.type, meta) {
data?.readWith(format) ?: error("Can't convert envelope without data to Data")
}

View File

@ -2,10 +2,9 @@ package space.kscience.dataforge.workspace
import space.kscience.dataforge.actions.Action
import space.kscience.dataforge.context.PluginFactory
import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.data.branch
import space.kscience.dataforge.data.DataSet
import space.kscience.dataforge.data.forEach
import space.kscience.dataforge.data.transform
import space.kscience.dataforge.data.map
import space.kscience.dataforge.meta.*
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.Name
@ -25,22 +24,22 @@ public val TaskResultBuilder<*>.defaultDependencyMeta: Meta
* @param selector a workspace data selector. Could be either task selector or initial data selector.
* @param dependencyMeta meta used for selector. The same meta is used for caching. By default, uses [defaultDependencyMeta].
*/
public suspend fun <T> TaskResultBuilder<*>.from(
public suspend fun <T : Any> TaskResultBuilder<*>.from(
selector: DataSelector<T>,
dependencyMeta: Meta = defaultDependencyMeta,
): DataTree<T> = selector.select(workspace, dependencyMeta)
): DataSet<T> = selector.select(workspace, dependencyMeta)
public suspend inline fun <T, reified P : WorkspacePlugin> TaskResultBuilder<*>.from(
public suspend inline fun <T : Any, reified P : WorkspacePlugin> TaskResultBuilder<*>.from(
plugin: P,
dependencyMeta: Meta = defaultDependencyMeta,
selectorBuilder: P.() -> TaskReference<T>,
): TaskResult<T> {
): DataSet<T> {
require(workspace.context.plugins.contains(plugin)) { "Plugin $plugin is not loaded into $workspace" }
val taskReference: TaskReference<T> = plugin.selectorBuilder()
val res = workspace.produce(plugin.name + taskReference.taskName, dependencyMeta)
//TODO add explicit check after https://youtrack.jetbrains.com/issue/KT-32956
@Suppress("UNCHECKED_CAST")
return res as TaskResult<T>
return res as TaskResult<T>
}
/**
@ -50,11 +49,11 @@ public suspend inline fun <T, reified P : WorkspacePlugin> TaskResultBuilder<*>.
* @param dependencyMeta meta used for selector. The same meta is used for caching. By default, uses [defaultDependencyMeta].
* @param selectorBuilder a builder of task from the plugin.
*/
public suspend inline fun <reified T, reified P : WorkspacePlugin> TaskResultBuilder<*>.from(
public suspend inline fun <reified T : Any, reified P : WorkspacePlugin> TaskResultBuilder<*>.from(
pluginFactory: PluginFactory<P>,
dependencyMeta: Meta = defaultDependencyMeta,
selectorBuilder: P.() -> TaskReference<T>,
): TaskResult<T> {
): DataSet<T> {
val plugin = workspace.context.plugins[pluginFactory]
?: error("Plugin ${pluginFactory.tag} not loaded into workspace context")
val taskReference: TaskReference<T> = plugin.selectorBuilder()
@ -65,7 +64,9 @@ public suspend inline fun <reified T, reified P : WorkspacePlugin> TaskResultBui
}
public val TaskResultBuilder<*>.allData: DataSelector<*>
get() = DataSelector { workspace, _ -> workspace.data }
get() = object : DataSelector<Any> {
override suspend fun select(workspace: Workspace, meta: Meta): DataSet<Any> = workspace.data
}
/**
* Perform a lazy mapping task using given [selector] and one-to-one [action].
@ -77,7 +78,7 @@ public val TaskResultBuilder<*>.allData: DataSelector<*>
* @param action process individual data asynchronously.
*/
@DFExperimental
public suspend inline fun <T, reified R> TaskResultBuilder<R>.transformEach(
public suspend inline fun <T : Any, reified R : Any> TaskResultBuilder<R>.transformEach(
selector: DataSelector<T>,
dependencyMeta: Meta = defaultDependencyMeta,
dataMetaTransform: MutableMeta.(name: Name) -> Unit = {},
@ -89,31 +90,31 @@ public suspend inline fun <T, reified R> TaskResultBuilder<R>.transformEach(
dataMetaTransform(data.name)
}
val res = data.transform(meta, workspace.context.coroutineContext) {
val res = data.map(meta, workspace.context.coroutineContext) {
action(it, data.name, meta)
}
put(data.name, res)
data(data.name, res)
}
}
/**
* Set given [dataSet] as a task result.
*/
public fun <T> TaskResultBuilder<T>.result(dataSet: DataTree<T>) {
branch(dataSet)
public fun <T : Any> TaskResultBuilder<T>.result(dataSet: DataSet<T>) {
node(Name.EMPTY, dataSet)
}
/**
* Use provided [action] to fill the result
*/
@DFExperimental
public suspend inline fun <T, reified R> TaskResultBuilder<R>.actionFrom(
public suspend inline fun <T : Any, reified R : Any> TaskResultBuilder<R>.actionFrom(
selector: DataSelector<T>,
action: Action<T, R>,
action: Action<T,R>,
dependencyMeta: Meta = defaultDependencyMeta,
) {
branch(action.execute(from(selector, dependencyMeta), dependencyMeta))
node(Name.EMPTY, action.execute(from(selector,dependencyMeta), dependencyMeta))
}

View File

@ -1,6 +1,5 @@
package space.kscience.dataforge.workspace
import kotlinx.coroutines.flow.map
import kotlinx.io.*
import kotlinx.serialization.ExperimentalSerializationApi
import kotlinx.serialization.KSerializer
@ -10,10 +9,12 @@ import kotlinx.serialization.serializer
import space.kscience.dataforge.context.error
import space.kscience.dataforge.context.logger
import space.kscience.dataforge.context.request
import space.kscience.dataforge.data.*
import space.kscience.dataforge.data.Data
import space.kscience.dataforge.data.await
import space.kscience.dataforge.io.*
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.misc.DFInternal
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.withIndex
import java.nio.file.Path
import kotlin.io.path.deleteIfExists
@ -21,7 +22,7 @@ import kotlin.io.path.div
import kotlin.io.path.exists
import kotlin.reflect.KType
public class JsonIOFormat<T>(private val type: KType) : IOFormat<T> {
public class JsonIOFormat<T : Any>(override val type: KType) : IOFormat<T> {
@Suppress("UNCHECKED_CAST")
private val serializer: KSerializer<T> = serializer(type) as KSerializer<T>
@ -34,7 +35,7 @@ public class JsonIOFormat<T>(private val type: KType) : IOFormat<T> {
}
@OptIn(ExperimentalSerializationApi::class)
public class ProtobufIOFormat<T>(private val type: KType) : IOFormat<T> {
public class ProtobufIOFormat<T : Any>(override val type: KType) : IOFormat<T> {
@Suppress("UNCHECKED_CAST")
private val serializer: KSerializer<T> = serializer(type) as KSerializer<T>
@ -52,14 +53,14 @@ public class FileWorkspaceCache(public val cacheDirectory: Path) : WorkspaceCach
// private fun <T : Any> TaskData<*>.checkType(taskType: KType): TaskData<T> = this as TaskData<T>
@OptIn(DFExperimental::class, DFInternal::class)
override suspend fun <T> cache(result: TaskResult<T>): TaskResult<T> {
override suspend fun <T : Any> evaluate(result: TaskResult<T>): TaskResult<T> {
val io = result.workspace.context.request(IOPlugin)
val format: IOFormat<T> = io.resolveIOFormat(result.dataType, result.taskMeta)
?: ProtobufIOFormat(result.dataType)
?: error("Can't resolve IOFormat for ${result.dataType}")
fun cacheOne(data: NamedData<T>): NamedData<T> {
fun evaluateDatum(data: TaskData<T>): TaskData<T> {
val path = cacheDirectory /
result.taskName.withIndex(result.taskMeta.hashCode().toString(16)).toString() /
@ -91,14 +92,15 @@ public class FileWorkspaceCache(public val cacheDirectory: Path) : WorkspaceCach
}
}
return datum.named(data.name)
return data.workspace.wrapData(datum, data.name, data.taskName, data.taskMeta)
}
return object : TaskResult<T> by result {
override fun iterator(): Iterator<TaskData<T>> =
result.iterator().asSequence().map { evaluateDatum(it) }.iterator()
val cachedTree = result.asSequence().map { cacheOne(it) }
.toObservableTree(result.dataType, result.workspace, result.updates().map { cacheOne(it) })
return result.workspace.wrapResult(cachedTree, result.taskName, result.taskMeta)
override fun get(name: Name): TaskData<T>? = result[name]?.let { evaluateDatum(it) }
}
}
}

View File

@ -1,39 +1,39 @@
package space.kscience.dataforge.workspace
import kotlinx.coroutines.flow.map
import space.kscience.dataforge.data.*
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
import kotlin.reflect.KType
import kotlin.reflect.full.isSubtypeOf
private data class TaskResultId(val name: Name, val meta: Meta)
private typealias TaskResultId = Pair<Name, Meta>
public class InMemoryWorkspaceCache : WorkspaceCache {
private val cache = HashMap<TaskResultId, HashMap<Name, Data<*>>>()
// never do that at home!
private val cache = HashMap<TaskResultId, HashMap<Name, TaskData<*>>>()
@Suppress("UNCHECKED_CAST")
private fun <T> Data<*>.checkType(taskType: KType): Data<T> =
if (type.isSubtypeOf(taskType)) this as Data<T>
private fun <T : Any> TaskData<*>.checkType(taskType: KType): TaskData<T> =
if (type.isSubtypeOf(taskType)) this as TaskData<T>
else error("Cached data type mismatch: expected $taskType but got $type")
override suspend fun <T> cache(result: TaskResult<T>): TaskResult<T> {
fun cacheOne(data: NamedData<T>): NamedData<T> {
val cachedData = cache.getOrPut(TaskResultId(result.taskName, result.taskMeta)){
HashMap()
}.getOrPut(data.name){
data.data
}
return cachedData.checkType<T>(result.dataType).named(data.name)
override suspend fun <T : Any> evaluate(result: TaskResult<T>): TaskResult<T> {
for (d: TaskData<T> in result) {
cache.getOrPut(result.taskName to result.taskMeta) { HashMap() }.getOrPut(d.name) { d }
}
return object : TaskResult<T> by result {
override fun iterator(): Iterator<TaskData<T>> = (cache[result.taskName to result.taskMeta]
?.values?.map { it.checkType<T>(result.dataType) }
?: emptyList()).iterator()
val cachedTree = result.asSequence().map { cacheOne(it) }
.toObservableTree(result.dataType, result.workspace, result.updates().map { cacheOne(it) })
return result.workspace.wrapResult(cachedTree, result.taskName, result.taskMeta)
override fun get(name: Name): TaskData<T>? {
val cached: TaskData<*> = cache[result.taskName to result.taskMeta]?.get(name) ?: return null
//TODO check types
return cached.checkType(result.dataType)
}
}
}
}

View File

@ -0,0 +1,318 @@
package space.kscience.dataforge.workspace
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import space.kscience.dataforge.context.error
import space.kscience.dataforge.context.logger
import space.kscience.dataforge.context.warn
import space.kscience.dataforge.data.*
import space.kscience.dataforge.io.*
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.copy
import space.kscience.dataforge.meta.string
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.misc.DFInternal
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.NameToken
import space.kscience.dataforge.names.asName
import space.kscience.dataforge.names.plus
import space.kscience.dataforge.workspace.FileData.Companion.DEFAULT_IGNORE_EXTENSIONS
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardWatchEventKinds
import java.nio.file.WatchEvent
import java.nio.file.attribute.BasicFileAttributes
import java.nio.file.spi.FileSystemProvider
import java.time.Instant
import kotlin.io.path.*
import kotlin.reflect.KType
import kotlin.reflect.typeOf
//public typealias FileFormatResolver<T> = (Path, Meta) -> IOFormat<T>
public typealias FileFormatResolver<T> = (path: Path, meta: Meta) -> IOReader<T>?
/**
* A [Data] wrapper that additionally carries the filesystem [path] it was created for.
*
* All [Data] members are delegated to the wrapped `data` instance.
*/
public class FileData<T> internal constructor(private val data: Data<T>, public val path: Path) : Data<T> by data {
// public val path: String? get() = meta[META_FILE_PATH_KEY].string
// public val extension: String? get() = meta[META_FILE_EXTENSION_KEY].string
//
// Both timestamps are parsed on every access from the string stored in meta; `null` when the meta value is absent.
public val createdTime: Instant? get() = meta[FILE_CREATE_TIME_KEY].string?.let { Instant.parse(it) }
public val updatedTime: Instant? get() = meta[FILE_UPDATE_TIME_KEY].string?.let { Instant.parse(it) }
public companion object {
// Meta keys under the common "file" node
public val FILE_KEY: Name = "file".asName()
public val FILE_PATH_KEY: Name = FILE_KEY + "path"
public val FILE_EXTENSION_KEY: Name = FILE_KEY + "extension"
public val FILE_CREATE_TIME_KEY: Name = FILE_KEY + "created"
public val FILE_UPDATE_TIME_KEY: Name = FILE_KEY + "updated"
public const val DF_FILE_EXTENSION: String = "df"
// Extensions that are cut from the default item name when reading files
public val DEFAULT_IGNORE_EXTENSIONS: Set<String> = setOf(DF_FILE_EXTENSION)
}
}
/**
 * Read the file at [path] as a single [FileData] item.
 *
 * The file is loaded with [readEnvelopeFile]; the envelope meta is handed to [formatResolver] to select a reader
 * and is then extended with the file path, extension and creation / last-modification timestamps
 * (see the keys declared in [FileData]).
 * The call is blocking because the meta header must be read; the envelope body is decoded lazily by the
 * returned data.
 *
 * @param path the file to read.
 * @param formatResolver selects a reader from the path and the envelope meta.
 * @return the file data, or `null` when [formatResolver] yields no reader for this file.
 */
@OptIn(DFInternal::class)
@DFExperimental
public fun <T : Any> IOPlugin.readDataFile(
    path: Path,
    formatResolver: FileFormatResolver<T>,
): FileData<T>? {
    val fileEnvelope = readEnvelopeFile(path, true)
    val reader = formatResolver(path, fileEnvelope.meta) ?: return null
    val fileMeta = fileEnvelope.meta.copy {
        FileData.FILE_PATH_KEY put path.toString()
        FileData.FILE_EXTENSION_KEY put path.extension
        val fileAttributes = path.readAttributes<BasicFileAttributes>()
        FileData.FILE_UPDATE_TIME_KEY put fileAttributes.lastModifiedTime().toInstant().toString()
        FileData.FILE_CREATE_TIME_KEY put fileAttributes.creationTime().toInstant().toString()
    }
    val lazyData = Data(reader.type, fileMeta) {
        // A missing envelope body is read as an empty binary
        (fileEnvelope.data ?: Binary.EMPTY).readWith(reader)
    }
    return FileData(lazyData, path)
}
/**
 * Add the content of the directory at [path] to this builder.
 *
 * Entries whose file name starts with [IOPlugin.META_FILE_NAME] are read as meta, entries whose name starts
 * with `@` are skipped, and every other entry is added through [file].
 *
 * The listing is taken with [listDirectoryEntries], which closes the underlying directory stream. The stream
 * returned by `Files.list` has to be closed explicitly, otherwise a directory handle is leaked on every call.
 *
 * @param path the directory to read.
 * @param ignoreExtensions extensions that are cut from the default item names.
 * @param formatResolver selects a reader for each file.
 */
context(IOPlugin) @DFExperimental
public fun <T : Any> DataSetBuilder<T>.directory(
    path: Path,
    ignoreExtensions: Set<String>,
    formatResolver: FileFormatResolver<T>,
) {
    for (childPath in path.listDirectoryEntries()) {
        val fileName = childPath.fileName.toString()
        if (fileName.startsWith(IOPlugin.META_FILE_NAME)) {
            meta(readMetaFile(childPath))
        } else if (!fileName.startsWith("@")) {
            file(childPath, ignoreExtensions, formatResolver)
        }
    }
}
/**
 * Read the directory at [path] as a [DataTree].
 *
 * A path whose file name ends with `.zip` is opened as a zip file system and the first root directory of that
 * file system is read instead. Any other path must be a directory.
 *
 * @param type the data type of the resulting tree.
 * @param path a directory or a zip archive.
 * @param ignoreExtensions extensions that are cut from the default item names.
 * @param formatResolver selects a reader for each file.
 */
@DFExperimental
@DFInternal
public fun <T : Any> IOPlugin.readDataDirectory(
    type: KType,
    path: Path,
    ignoreExtensions: Set<String> = DEFAULT_IGNORE_EXTENSIONS,
    formatResolver: FileFormatResolver<T>,
): DataTree<T> {
    val looksLikeZip = path.fileName != null && path.fileName.toString().endsWith(".zip")
    if (looksLikeZip) {
        // The zip provider (registered under the "jar" scheme) is looked up explicitly to avoid compatibility bugs
        val zipProvider = FileSystemProvider.installedProviders().find { it.scheme == "jar" }
            ?: error("Zip file system provider not found")
        // NOTE(review): the "create" option is passed even though the archive is only read here - confirm intent
        val zipFileSystem = zipProvider.newFileSystem(path, mapOf("create" to "true"))
        return readDataDirectory(type, zipFileSystem.rootDirectories.first(), ignoreExtensions, formatResolver)
    }
    if (!Files.isDirectory(path)) error("Provided path $path is not a directory")
    return DataTree(type) {
        meta {
            FileData.FILE_PATH_KEY put path.toString()
        }
        directory(path, ignoreExtensions, formatResolver)
    }
}
/**
* Reified shortcut for the `readDataDirectory` overload that takes an explicit type:
* the data type is taken from `typeOf<T>()`, all other arguments are passed through unchanged.
*/
@OptIn(DFInternal::class)
@DFExperimental
public inline fun <reified T : Any> IOPlugin.readDataDirectory(
path: Path,
ignoreExtensions: Set<String> = DEFAULT_IGNORE_EXTENSIONS,
noinline formatResolver: FileFormatResolver<T>,
): DataTree<T> = readDataDirectory(typeOf<T>(), path, ignoreExtensions, formatResolver)
/**
* Read a raw binary data tree from the directory. All files are read as-is (save for meta files).
*
* Every file is resolved to the [IOReader.binary] reader regardless of its path or meta.
* Unlike the other directory readers in this file, [ignoreExtensions] defaults to an empty set here.
*/
@DFExperimental
public fun IOPlugin.readRawDirectory(
path: Path,
ignoreExtensions: Set<String> = emptySet(),
): DataTree<Binary> = readDataDirectory(path, ignoreExtensions) { _, _ -> IOReader.binary }
// Build a Name from the path: one token per path segment, parsed from the segment name without its extension.
private fun Path.toName() = Name(map { NameToken.parse(it.nameWithoutExtension) })
/**
 * Read the directory at [path] into a [DataSource] and keep it updated by watching the directory for
 * created, modified and deleted entries.
 *
 * The initial content is loaded with [directory]. A coroutine launched on [Dispatchers.IO] then blocks on the
 * file system watch service and applies each event: deleted entries are removed by name, meta files are
 * re-read as meta, entries starting with `@` are ignored and everything else is re-read through [file].
 *
 * Fixes compared to the previous version:
 * - the path reported by a watch event is relative to the watched directory, so it is resolved against [path]
 *   before the file is read (it used to be resolved against the working directory);
 * - events without a path context (overflow events) are skipped instead of failing on the cast;
 * - the watch service is closed when the watching loop terminates.
 *
 * @param type the data type of the resulting source.
 * @param path the directory to read and watch. Zip archives are not supported.
 * @param ignoreExtensions extensions that are cut from the default item names.
 * @param formatResolver selects a reader for each file.
 */
@DFInternal
@DFExperimental
public fun <T : Any> IOPlugin.monitorDataDirectory(
    type: KType,
    path: Path,
    ignoreExtensions: Set<String> = DEFAULT_IGNORE_EXTENSIONS,
    formatResolver: FileFormatResolver<T>,
): DataSource<T> {
    if (path.fileName.toString().endsWith(".zip")) error("Monitoring not supported for ZipFS")
    if (!Files.isDirectory(path)) error("Provided path $path is not a directory")
    return DataSource(type, context) {
        directory(path, ignoreExtensions, formatResolver)
        launch(Dispatchers.IO) {
            // `use` releases the watch service when the loop finishes or fails
            path.fileSystem.newWatchService().use { watchService ->
                path.register(
                    watchService,
                    StandardWatchEventKinds.ENTRY_DELETE,
                    StandardWatchEventKinds.ENTRY_MODIFY,
                    StandardWatchEventKinds.ENTRY_CREATE
                )
                do {
                    val key = watchService.take()
                    if (key != null) {
                        for (event: WatchEvent<*> in key.pollEvents()) {
                            // Overflow events carry no path; nothing can be updated for them
                            val eventPath = event.context() as? Path ?: continue
                            if (event.kind() == StandardWatchEventKinds.ENTRY_DELETE) {
                                remove(eventPath.toName())
                            } else {
                                // The event context is relative to the watched directory
                                val changedPath = path.resolve(eventPath)
                                val fileName = eventPath.fileName.toString()
                                if (fileName.startsWith(IOPlugin.META_FILE_NAME)) {
                                    meta(readMetaFile(changedPath))
                                } else if (!fileName.startsWith("@")) {
                                    file(changedPath, ignoreExtensions, formatResolver)
                                }
                            }
                        }
                        key.reset()
                    }
                } while (isActive && key != null)
            }
        }
    }
}
/**
* Start monitoring given directory ([path]) as a [DataSource].
*
* Reified shortcut: the data type is taken from `typeOf<T>()`, all other arguments are passed through unchanged
* to the overload that takes an explicit type.
*/
@OptIn(DFInternal::class)
@DFExperimental
public inline fun <reified T : Any> IOPlugin.monitorDataDirectory(
path: Path,
ignoreExtensions: Set<String> = DEFAULT_IGNORE_EXTENSIONS,
noinline formatResolver: FileFormatResolver<T>,
): DataSource<T> = monitorDataDirectory(typeOf<T>(), path, ignoreExtensions, formatResolver)
/**
* Read and monitor raw binary data tree from the directory. All files are read as-is (save for meta files).
*
* Every file is resolved to the [IOReader.binary] reader regardless of its path or meta.
*/
@DFExperimental
public fun IOPlugin.monitorRawDirectory(
path: Path,
ignoreExtensions: Set<String> = DEFAULT_IGNORE_EXTENSIONS,
): DataSource<Binary> = monitorDataDirectory(path, ignoreExtensions) { _, _ -> IOReader.binary }
/**
 * Write [tree] into the directory at [path], creating the directory when it does not exist.
 *
 * Nested nodes are written recursively into sub-directories named after their tokens. Each leaf is converted to
 * an envelope with [format] and written either as a single envelope file (when [envelopeFormat] is given) or as
 * an envelope directory. The meta of the tree is written last. All work is done on [Dispatchers.IO].
 *
 * @param path the target directory; fails when it exists and is not a directory.
 * @param tree the data to write.
 * @param format the writer used to serialize individual data items.
 * @param envelopeFormat the envelope file format, or `null` to write envelope directories.
 */
@DFExperimental
public suspend fun <T : Any> IOPlugin.writeDataDirectory(
    path: Path,
    tree: DataTree<T>,
    format: IOWriter<T>,
    envelopeFormat: EnvelopeFormat? = null,
) {
    withContext(Dispatchers.IO) {
        val targetExists = Files.exists(path)
        if (targetExists && !Files.isDirectory(path)) {
            error("Can't write a node into file")
        }
        if (!targetExists) {
            Files.createDirectories(path)
        }
        for ((token, item) in tree.items) {
            val target = path.resolve(token.toString())
            when (item) {
                is DataTreeItem.Leaf -> {
                    val envelope = item.data.toEnvelope(format)
                    if (envelopeFormat == null) {
                        writeEnvelopeDirectory(target, envelope)
                    } else {
                        writeEnvelopeFile(target, envelope, envelopeFormat)
                    }
                }
                is DataTreeItem.Node -> {
                    writeDataDirectory(target, item.tree, format, envelopeFormat)
                }
            }
        }
        writeMetaFile(path, tree.meta)
    }
}
/**
 * Read each of the given class path resources as a raw binary directory and collect them into one [DataTree],
 * every resource being placed under a node with the resource name.
 *
 * @param resources the names of the resources to read.
 * @param classLoader the class loader used to locate the resources; the context class loader of the current
 * thread by default.
 * @return a tree with one node per resource. Fails when a resource can't be resolved.
 */
@DFExperimental
private fun IOPlugin.readResources(
    vararg resources: String,
    classLoader: ClassLoader = Thread.currentThread().contextClassLoader,
): DataTree<Binary> = DataTree {
    //    require(resource.isNotBlank()) {"Can't mount root resource tree as data root"}
    for (resource in resources) {
        val resourcePath = classLoader.getResource(resource)?.toURI()?.toPath()
            ?: error("Resource with name $resource is not resolved")
        node(resource, readRawDirectory(resourcePath))
    }
}
/**
 * Add file/directory-based data tree item.
 *
 * A regular file, or a directory whose entries all start with `@`, is read as a single datum; any other
 * directory is read as a nested data tree. The item name is taken from the envelope name key in the meta when
 * present, otherwise from the file name. Failures are logged and the item is skipped.
 *
 * The directory listing used for the "special directory" check is taken with [listDirectoryEntries], which closes
 * the underlying directory stream; the previously used `Files.list` stream was never closed.
 *
 * @param path the file or directory to add.
 * @param ignoreExtensions a set of file extensions that are cut from the default item name.
 * @param formatResolver selects a reader for each file.
 */
context(IOPlugin)
@OptIn(DFInternal::class)
@DFExperimental
public fun <T : Any> DataSetBuilder<T>.file(
    path: Path,
    ignoreExtensions: Set<String> = DEFAULT_IGNORE_EXTENSIONS,
    formatResolver: FileFormatResolver<out T>,
) {
    fun defaultPath() = if (path.extension in ignoreExtensions) path.nameWithoutExtension else path.name

    // True when every entry of the directory is a special `@`-prefixed file (also true for an empty directory)
    fun hasOnlySpecialEntries(): Boolean =
        path.listDirectoryEntries().all { it.fileName.toString().startsWith("@") }

    try {
        //If path is a single file or a special directory, read it as single datum
        if (!Files.isDirectory(path) || hasOnlySpecialEntries()) {
            val data = readDataFile(path, formatResolver)
            if (data == null) {
                logger.warn { "File format is not resolved for $path. Skipping." }
                return
            }
            val name: String = data.meta[Envelope.ENVELOPE_NAME_KEY].string ?: defaultPath()
            data(name.asName(), data)
        } else {
            //otherwise, read as directory
            val data: DataTree<T> = readDataDirectory(dataType, path, ignoreExtensions, formatResolver)
            val name = data.meta[Envelope.ENVELOPE_NAME_KEY].string ?: defaultPath()
            node(name.asName(), data)
        }
    } catch (ex: Exception) {
        logger.error { "Failed to read file or directory at $path: ${ex.message}" }
    }
}

View File

@ -1,186 +0,0 @@
package space.kscience.dataforge.workspace
import kotlinx.coroutines.*
import space.kscience.dataforge.data.Data
import space.kscience.dataforge.data.DataSink
import space.kscience.dataforge.data.StaticData
import space.kscience.dataforge.io.*
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.copy
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.misc.DFInternal
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.NameToken
import space.kscience.dataforge.names.asName
import space.kscience.dataforge.names.plus
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardWatchEventKinds
import java.nio.file.WatchEvent
import java.nio.file.attribute.BasicFileAttributes
import java.nio.file.spi.FileSystemProvider
import kotlin.io.path.*
import kotlin.reflect.typeOf
/**
* Meta keys and constants used for file-based data.
*/
public object FileData {
// Meta keys under the common "file" node
public val FILE_KEY: Name = "file".asName()
public val FILE_PATH_KEY: Name = FILE_KEY + "path"
public val FILE_EXTENSION_KEY: Name = FILE_KEY + "extension"
public val FILE_CREATE_TIME_KEY: Name = FILE_KEY + "created"
public val FILE_UPDATE_TIME_KEY: Name = FILE_KEY + "updated"
public const val DF_FILE_EXTENSION: String = "df"
// Extensions that are cut from item names when regular files are read from a directory
public val DEFAULT_IGNORE_EXTENSIONS: Set<String> = setOf(DF_FILE_EXTENSION)
}
/**
 * Read the file at [path] as binary [Data].
 *
 * The file is loaded with [readEnvelopeFile]. The resulting data holds the envelope body (an empty binary when
 * the envelope has none) and the envelope meta extended with the file path, extension and
 * creation / last-modification timestamps (see the keys declared in [FileData]).
 * The operation is blocking since it must read the meta header.
 *
 * @param path the file to read.
 */
@OptIn(DFExperimental::class)
public fun IOPlugin.readFileData(
    path: Path,
): Data<Binary> {
    val fileEnvelope = readEnvelopeFile(path, true)
    val fileMeta = fileEnvelope.meta.copy {
        FileData.FILE_PATH_KEY put path.toString()
        FileData.FILE_EXTENSION_KEY put path.extension
        val fileAttributes = path.readAttributes<BasicFileAttributes>()
        FileData.FILE_UPDATE_TIME_KEY put fileAttributes.lastModifiedTime().toInstant().toString()
        FileData.FILE_CREATE_TIME_KEY put fileAttributes.creationTime().toInstant().toString()
    }
    val payload = fileEnvelope.data ?: Binary.EMPTY
    return StaticData(typeOf<Binary>(), payload, fileMeta)
}
/**
 * Read the regular file at [path] with [IOPlugin.readFileData] and put it into this sink under [name].
 * Fails with [IllegalStateException] when [path] is not a regular file.
 */
public fun DataSink<Binary>.file(io: IOPlugin, name: Name, path: Path) {
    check(path.isRegularFile()) { "Only regular files could be handled by this function" }
    val fileData = io.readFileData(path)
    put(name, fileData)
}
/**
 * Put the content of the directory at [path] into this sink under [name].
 *
 * Directory entries are handled as follows:
 * - a file named [IOPlugin.DATA_FILE_NAME] becomes the binary of the datum stored at [name] itself;
 * - a file whose name starts with [IOPlugin.META_FILE_NAME] becomes the meta of that datum;
 * - other entries starting with `@` are skipped;
 * - everything else is added recursively through [files] under `name + token`, where the token is the file name
 *   (without extension for regular files with an extension from [FileData.DEFAULT_IGNORE_EXTENSIONS]).
 *
 * The listing is taken with [listDirectoryEntries], which closes the underlying directory stream; the previously
 * used `Files.list` stream was never closed and leaked a directory handle per call.
 *
 * Fails with [IllegalStateException] when [path] is not a directory.
 */
public fun DataSink<Binary>.directory(
    io: IOPlugin,
    name: Name,
    path: Path,
) {
    if (!path.isDirectory()) error("Only directories could be handled by this function")
    //process root data
    var dataBinary: Binary? = null
    var meta: Meta? = null
    for (childPath in path.listDirectoryEntries()) {
        val fileName = childPath.fileName.toString()
        if (fileName == IOPlugin.DATA_FILE_NAME) {
            dataBinary = childPath.asBinary()
        } else if (fileName.startsWith(IOPlugin.META_FILE_NAME)) {
            meta = io.readMetaFileOrNull(childPath)
        } else if (!fileName.startsWith("@")) {
            val token = if (childPath.isRegularFile() && childPath.extension in FileData.DEFAULT_IGNORE_EXTENSIONS) {
                NameToken(childPath.nameWithoutExtension)
            } else {
                NameToken(childPath.name)
            }
            files(io, name + token, childPath)
        }
    }
    //set data if it is relevant
    if (dataBinary != null || meta != null) {
        put(
            name,
            StaticData(
                typeOf<Binary>(),
                dataBinary ?: Binary.EMPTY,
                meta ?: Meta.EMPTY
            )
        )
    }
}
/**
 * Put the file system item at [path] into this sink under [name].
 *
 * - A regular file with the `zip` extension is mounted as a zip file system and its first root directory is
 *   added as a directory.
 * - Any other regular file is added through [file].
 * - Everything else is added through [directory].
 *
 * The branches are mutually exclusive. Previously the zip branch fell through to the regular-file branch, so after
 * the archive content had been added the raw zip binary was also stored under [name].
 *
 * NOTE(review): the mounted zip file system is not closed here; presumably the binaries read from it are lazy
 * and still need it - confirm before adding a close.
 */
public fun DataSink<Binary>.files(
    io: IOPlugin,
    name: Name,
    path: Path,
) {
    when {
        path.isRegularFile() && path.extension == "zip" -> {
            //Using explicit Zip file system to avoid bizarre compatibility bugs
            val fsProvider = FileSystemProvider.installedProviders().find { it.scheme == "jar" }
                ?: error("Zip file system provider not found")
            val fs = fsProvider.newFileSystem(path, emptyMap<String, Any>())
            files(io, name, fs.rootDirectories.first())
        }

        path.isRegularFile() -> file(io, name, path)
        else -> directory(io, name, path)
    }
}
// Build a Name from the path: one token per path segment, parsed from the segment name without its extension.
private fun Path.toName() = Name(map { NameToken.parse(it.nameWithoutExtension) })
/**
 * Put the content of [path] into this sink under [name] and keep it updated by watching [path] for created,
 * modified and deleted entries.
 *
 * The initial content is loaded with [files]. A coroutine launched in [scope] on [Dispatchers.IO] then blocks on
 * the file system watch service and applies each event.
 *
 * Fixes compared to the previous version:
 * - the path reported by a watch event is relative to the watched directory, so it is resolved against [path]
 *   before reading (it used to be resolved against the working directory);
 * - a created or modified entry is stored under `name + token`, using the same token rule as [directory], instead
 *   of replacing the item stored at [name] itself;
 * - events without a path context (overflow events) are skipped instead of failing on the cast;
 * - the watch service is closed when the watching loop terminates.
 *
 * @param io the plugin used to read files.
 * @param name the name under which the content of [path] is placed.
 * @param path the directory to read and watch.
 * @param scope the scope of the watching coroutine.
 * @return the job of the watching coroutine; cancel it to stop monitoring.
 */
@DFInternal
@DFExperimental
public fun DataSink<Binary>.monitorFiles(
    io: IOPlugin,
    name: Name,
    path: Path,
    scope: CoroutineScope = io.context,
): Job {
    files(io, name, path)
    return scope.launch(Dispatchers.IO) {
        // `use` releases the watch service when the loop finishes or fails
        path.fileSystem.newWatchService().use { watchService ->
            path.register(
                watchService,
                StandardWatchEventKinds.ENTRY_DELETE,
                StandardWatchEventKinds.ENTRY_MODIFY,
                StandardWatchEventKinds.ENTRY_CREATE
            )
            do {
                val key = watchService.take()
                if (key != null) {
                    for (event: WatchEvent<*> in key.pollEvents()) {
                        // Overflow events carry no path; nothing can be updated for them
                        val eventPath = event.context() as? Path ?: continue
                        if (event.kind() == StandardWatchEventKinds.ENTRY_DELETE) {
                            // NOTE(review): the removed key is not prefixed with [name] and always drops the
                            // extension, which may not match the key used when the item was added - confirm
                            put(eventPath.toName(), null)
                        } else {
                            val fileName = eventPath.fileName.toString()
                            if (!fileName.startsWith("@")) {
                                // The event context is relative to the watched directory
                                val changedPath = path.resolve(eventPath)
                                val ignoreExtension = changedPath.isRegularFile() &&
                                        changedPath.extension in FileData.DEFAULT_IGNORE_EXTENSIONS
                                val token = if (ignoreExtension) {
                                    NameToken(changedPath.nameWithoutExtension)
                                } else {
                                    NameToken(changedPath.name)
                                }
                                files(io, name + token, changedPath)
                            }
                        }
                    }
                    key.reset()
                }
            } while (isActive && key != null)
        }
    }
}
/**
 * Put each of the given class path resources into this sink under a name equal to the resource name.
 * Fails when a resource can't be resolved by [classLoader].
 *
 * @param io the plugin used to read files.
 * @param resources The names of the resources to read.
 * @param classLoader The class loader to use for loading the resources. By default, it uses the current thread's context class loader.
 */
@DFExperimental
public fun DataSink<Binary>.resources(
    io: IOPlugin,
    vararg resources: String,
    classLoader: ClassLoader = Thread.currentThread().contextClassLoader,
) {
    for (resource in resources) {
        val resourcePath = classLoader.getResource(resource)?.toURI()?.toPath()
            ?: error("Resource with name $resource is not resolved")
        files(io, resource.asName(), resourcePath)
    }
}

View File

@ -1,6 +1,6 @@
package space.kscience.dataforge.workspace
import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.data.DataSet
import space.kscience.dataforge.data.filterByType
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.misc.DFExperimental
@ -16,13 +16,14 @@ import space.kscience.dataforge.names.matches
*/
@OptIn(DFExperimental::class)
public inline fun <reified T : Any> TaskResultBuilder<*>.dataByType(namePattern: Name? = null): DataSelector<T> =
DataSelector<T> { workspace, _ ->
workspace.data.filterByType { name, _, _ ->
namePattern == null || name.matches(namePattern)
}
object : DataSelector<T> {
override suspend fun select(workspace: Workspace, meta: Meta): DataSet<T> =
workspace.data.filterByType { name, _ ->
namePattern == null || name.matches(namePattern)
}
}
public suspend inline fun <reified T : Any> TaskResultBuilder<*>.fromTask(
task: Name,
taskMeta: Meta = Meta.EMPTY,
): DataTree<T> = workspace.produce(task, taskMeta).filterByType()
): DataSet<T> = workspace.produce(task, taskMeta).filterByType()

View File

@ -1,72 +0,0 @@
package space.kscience.dataforge.workspace
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import space.kscience.dataforge.data.*
import space.kscience.dataforge.io.*
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.Name
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.spi.FileSystemProvider
import kotlin.io.path.Path
import kotlin.io.path.createDirectories
import kotlin.io.path.exists
import kotlin.io.path.extension
/**
* Write the data tree to existing directory or create a new one using default [java.nio.file.FileSystem] provider
*
* Each data item is written to a path built from its name tokens joined with `/` and resolved against [path];
* missing parent directories are created. The meta of the data set, when present, is written last.
*
* @param path the target directory; fails when it exists and is not a directory
* @param dataSet the data to write
* @param format the writer used to convert each data item to an envelope
* @param envelopeFormat when not null, each item is written as a single envelope file in this format,
* otherwise as an envelope directory
*/
@DFExperimental
public suspend fun <T : Any> IOPlugin.writeDataDirectory(
path: Path,
dataSet: DataTree<T>,
format: IOWriter<T>,
envelopeFormat: EnvelopeFormat? = null,
): Unit = withContext(Dispatchers.IO) {
if (!Files.exists(path)) {
Files.createDirectories(path)
} else if (!Files.isDirectory(path)) {
error("Can't write a node into file")
}
dataSet.forEach { (name, data) ->
// One path segment per name token, using the unescaped token text
val childPath = path.resolve(name.tokens.joinToString("/") { token -> token.toStringUnescaped() })
// NOTE(review): assumes childPath always has a parent - confirm for items with an empty name
childPath.parent.createDirectories()
val envelope = data.toEnvelope(format)
if (envelopeFormat != null) {
writeEnvelopeFile(childPath, envelope, envelopeFormat)
} else {
writeEnvelopeDirectory(childPath, envelope)
}
}
dataSet.meta?.let { writeMetaFile(path, it) }
}
/**
 * Write this [DataTree] as a zip archive.
 *
 * When [path] does not have the `zip` extension, the archive is written to a sibling file with `.zip` appended
 * to the file name. The archive is created through the zip file system provider and filled with
 * [writeDataDirectory].
 *
 * The "already exists" check is performed on the file that is actually written. Previously it was performed on
 * [path] itself, so an existing `<path>.zip` target was silently reopened and written into.
 *
 * @param path the requested archive location.
 * @param dataSet the data to write.
 * @param format the writer used to convert each data item to an envelope.
 * @param envelopeFormat passed through to [writeDataDirectory].
 */
@DFExperimental
public suspend fun <T : Any> IOPlugin.writeZip(
    path: Path,
    dataSet: DataTree<T>,
    format: IOWriter<T>,
    envelopeFormat: EnvelopeFormat? = null,
): Unit = withContext(Dispatchers.IO) {
    val actualFile = if (path.extension == "zip") {
        path
    } else {
        path.resolveSibling(path.fileName.toString() + ".zip")
    }
    if (actualFile.exists()) error("Can't override existing zip data file $actualFile")
    val fsProvider = FileSystemProvider.installedProviders().find { it.scheme == "jar" }
        ?: error("Zip file system provider not found")
    //val fs = FileSystems.newFileSystem(actualFile, mapOf("create" to true))
    val fs = fsProvider.newFileSystem(actualFile, mapOf("create" to true))
    // Closing the file system flushes the archive to disk
    fs.use {
        writeDataDirectory(fs.rootDirectories.first(), dataSet, format, envelopeFormat)
    }
}

View File

@ -0,0 +1,73 @@
package space.kscience.dataforge.workspace
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import space.kscience.dataforge.data.DataTree
import space.kscience.dataforge.data.DataTreeItem
import space.kscience.dataforge.io.*
import space.kscience.dataforge.misc.DFExperimental
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardOpenOption
import java.util.zip.ZipEntry
import java.util.zip.ZipOutputStream
/**
* Write [treeItem] into this zip stream under the entry [name].
*
* A leaf is converted to an envelope with [dataFormat], serialized with [envelopeFormat] and written as one entry.
* A node is written as a directory entry (`name/`) followed by its children, recursively, as `name/token`.
*/
private suspend fun <T : Any> ZipOutputStream.writeNode(
name: String,
treeItem: DataTreeItem<T>,
dataFormat: IOFormat<T>,
envelopeFormat: EnvelopeFormat = TaggedEnvelopeFormat,
): Unit = withContext(Dispatchers.IO) {
when (treeItem) {
is DataTreeItem.Leaf -> {
//TODO add directory-based envelope writer
val envelope = treeItem.data.toEnvelope(dataFormat)
val entry = ZipEntry(name)
putNextEntry(entry)
//TODO remove additional copy
val bytes = ByteArray {
writeWith(envelopeFormat, envelope)
}
// The entry is not closed explicitly here; the next putNextEntry or closing the stream closes it
write(bytes)
}
is DataTreeItem.Node -> {
// NOTE(review): when called with an empty name (as writeZip in this file does) this produces the entry "/"
// and children named "/token" with a leading slash - confirm that readers accept such entry names
val entry = ZipEntry("$name/")
putNextEntry(entry)
closeEntry()
treeItem.tree.items.forEach { (token, item) ->
val childName = "$name/$token"
writeNode(childName, item, dataFormat, envelopeFormat)
}
}
}
}
/**
 * Write this [DataTree] as a zip archive.
 *
 * When [path] does not end with `.zip`, the archive is written to a sibling file with `.zip` appended to the
 * file name. An existing file at the target location is truncated. The whole tree is written as a root node
 * with an empty name.
 *
 * @param path the requested archive location.
 * @param format the format used to convert each data item to an envelope.
 * @param envelopeFormat the format used to serialize envelopes into zip entries.
 */
@DFExperimental
public suspend fun <T : Any> DataTree<T>.writeZip(
    path: Path,
    format: IOFormat<T>,
    envelopeFormat: EnvelopeFormat = TaggedEnvelopeFormat,
): Unit = withContext(Dispatchers.IO) {
    val zipPath = when {
        path.toString().endsWith(".zip") -> path
        else -> path.resolveSibling(path.fileName.toString() + ".zip")
    }
    val fileStream = Files.newOutputStream(
        zipPath,
        StandardOpenOption.WRITE,
        StandardOpenOption.CREATE,
        StandardOpenOption.TRUNCATE_EXISTING
    )
    // Closing the zip stream also closes the underlying file stream
    ZipOutputStream(fileStream).use { zipStream ->
        zipStream.writeNode("", DataTreeItem.Node(this@writeZip), format, envelopeFormat)
    }
}

View File

@ -1,16 +1,18 @@
package space.kscience.dataforge.workspace
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.Test
import space.kscience.dataforge.data.wrap
import space.kscience.dataforge.data.startAll
import space.kscience.dataforge.data.static
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.boolean
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.misc.DFExperimental
import kotlin.test.assertEquals
@OptIn(DFExperimental::class)
@OptIn(ExperimentalCoroutinesApi::class, DFExperimental::class)
internal class CachingWorkspaceTest {
@Test
@ -22,7 +24,7 @@ internal class CachingWorkspaceTest {
data {
//statically initialize data
repeat(5) {
wrap("myData[$it]", it)
static("myData[$it]", it)
}
}
@ -35,10 +37,11 @@ internal class CachingWorkspaceTest {
}
}
@Suppress("UNUSED_VARIABLE")
val doSecond by task<Any> {
transformEach(
doFirst,
dependencyMeta = if (taskMeta["flag"].boolean == true) taskMeta else Meta.EMPTY
dependencyMeta = if(taskMeta["flag"].boolean == true) taskMeta else Meta.EMPTY
) { _, name, _ ->
secondCounter++
println("Done second on $name with flag=${taskMeta["flag"].boolean ?: false}")
@ -50,15 +53,13 @@ internal class CachingWorkspaceTest {
val secondA = workspace.produce("doSecond")
val secondB = workspace.produce("doSecond", Meta { "flag" put true })
val secondC = workspace.produce("doSecond")
//use coroutineScope to wait for the result
coroutineScope {
first.launch(this)
secondA.launch(this)
secondB.launch(this)
first.startAll(this)
secondA.startAll(this)
secondB.startAll(this)
//repeat to check caching
secondC.launch(this)
secondC.startAll(this)
}
assertEquals(10, firstCounter)
assertEquals(10, secondCounter)
}

View File

@ -20,13 +20,13 @@ class DataPropagationTestPlugin : WorkspacePlugin() {
val result: Data<Int> = selectedData.foldToData(0) { result, data ->
result + data.value
}
put("result", result)
data("result", result)
}
val singleData by task<Int> {
workspace.data.filterByType<Int>()["myData[12]"]?.let {
put("result", it)
data("result", it)
}
}
@ -47,7 +47,7 @@ class DataPropagationTest {
}
data {
repeat(100) {
wrap("myData[$it]", it)
static("myData[$it]", it)
}
}
}
@ -55,12 +55,12 @@ class DataPropagationTest {
@Test
fun testAllData() = runTest {
val node = testWorkspace.produce("Test.allData")
assertEquals(4950, node.content.asSequence().single().await())
assertEquals(4950, node.asSequence().single().await())
}
@Test
fun testSingleData() = runTest {
val node = testWorkspace.produce("Test.singleData")
assertEquals(12, node.content.asSequence().single().await())
assertEquals(12, node.asSequence().single().await())
}
}

View File

@ -1,5 +1,6 @@
package space.kscience.dataforge.workspace
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.runTest
import kotlinx.io.Sink
import kotlinx.io.Source
@ -8,34 +9,38 @@ import kotlinx.io.writeString
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.Global
import space.kscience.dataforge.data.*
import space.kscience.dataforge.io.*
import space.kscience.dataforge.io.Envelope
import space.kscience.dataforge.io.IOFormat
import space.kscience.dataforge.io.io
import space.kscience.dataforge.io.readEnvelopeFile
import space.kscience.dataforge.io.yaml.YamlPlugin
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.Name
import java.nio.file.Files
import kotlin.io.path.deleteExisting
import kotlin.io.path.fileSize
import kotlin.io.path.toPath
import kotlin.reflect.KType
import kotlin.reflect.typeOf
import kotlin.test.Test
import kotlin.test.assertEquals
class FileDataTest {
val dataNode = DataTree<String> {
branch("dir") {
wrap("a", "Some string") {
node("dir") {
static("a", "Some string") {
"content" put "Some string"
}
}
wrap("b", "root data")
// meta {
// "content" put "This is root meta node"
// }
static("b", "root data")
meta {
"content" put "This is root meta node"
}
}
object StringIOFormat : IOFormat<String> {
override val type: KType get() = typeOf<String>()
override fun writeTo(sink: Sink, obj: String) {
sink.writeString(obj)
@ -46,33 +51,29 @@ class FileDataTest {
@Test
@DFExperimental
fun testDataWriteRead() = runTest {
val io = Global.io
fun testDataWriteRead() = with(Global.io) {
val dir = Files.createTempDirectory("df_data_node")
io.writeDataDirectory(dir, dataNode, StringIOFormat)
println(dir.toUri().toString())
val data = DataTree {
files(io, Name.EMPTY, dir)
runBlocking {
writeDataDirectory(dir, dataNode, StringIOFormat)
println(dir.toUri().toString())
val reconstructed = readDataDirectory(dir) { _, _ -> StringIOFormat }
assertEquals(dataNode["dir.a"]?.meta?.get("content"), reconstructed["dir.a"]?.meta?.get("content"))
assertEquals(dataNode["b"]?.await(), reconstructed["b"]?.await())
}
val reconstructed = data.transform { (_, value) -> value.toByteArray().decodeToString() }
assertEquals(dataNode["dir.a"]?.meta?.get("content"), reconstructed["dir.a"]?.meta?.get("content"))
assertEquals(dataNode["b"]?.await(), reconstructed["b"]?.await())
}
@Test
@DFExperimental
fun testZipWriteRead() = runTest {
val io = Global.io
val zip = Files.createTempFile("df_data_node", ".zip")
zip.deleteExisting()
io.writeZip(zip, dataNode, StringIOFormat)
println(zip.toUri().toString())
val reconstructed = DataTree { files(io, Name.EMPTY, zip) }
.transform { (_, value) -> value.toByteArray().decodeToString() }
assertEquals(dataNode["dir.a"]?.meta?.get("content"), reconstructed["dir.a"]?.meta?.get("content"))
assertEquals(dataNode["b"]?.await(), reconstructed["b"]?.await())
with(Global.io) {
val zip = Files.createTempFile("df_data_node", ".zip")
dataNode.writeZip(zip, StringIOFormat)
println(zip.toUri().toString())
val reconstructed = readDataDirectory(zip) { _, _ -> StringIOFormat }
assertEquals(dataNode["dir.a"]?.meta?.get("content"), reconstructed["dir.a"]?.meta?.get("content"))
assertEquals(dataNode["b"]?.await(), reconstructed["b"]?.await())
}
}
@OptIn(DFExperimental::class)

View File

@ -3,7 +3,8 @@ package space.kscience.dataforge.workspace
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.Test
import space.kscience.dataforge.data.wrap
import space.kscience.dataforge.data.startAll
import space.kscience.dataforge.data.static
import space.kscience.dataforge.misc.DFExperimental
import java.nio.file.Files
@ -16,17 +17,18 @@ class FileWorkspaceCacheTest {
data {
//statically initialize data
repeat(5) {
wrap("myData[$it]", it)
static("myData[$it]", it)
}
}
fileCache(Files.createTempDirectory("dataforge-temporary-cache"))
@Suppress("UNUSED_VARIABLE")
val echo by task<String> {
transformEach(dataByType<String>()) { arg, _, _ -> arg }
}
}
workspace.produce("echo").launch(this)
workspace.produce("echo").startAll(this)
}
}

View File

@ -27,8 +27,8 @@ public fun <P : Plugin> P.toFactory(): PluginFactory<P> = object : PluginFactory
override val tag: PluginTag = this@toFactory.tag
}
public fun Workspace.produceBlocking(task: String, block: MutableMeta.() -> Unit = {}): DataTree<*> = runBlocking {
produce(task, block).content
public fun Workspace.produceBlocking(task: String, block: MutableMeta.() -> Unit = {}): DataSet<Any> = runBlocking {
produce(task, block)
}
@OptIn(DFExperimental::class)
@ -62,14 +62,14 @@ internal class SimpleWorkspaceTest {
data {
//statically initialize data
repeat(100) {
wrap("myData[$it]", it)
static("myData[$it]", it)
}
}
val filterOne by task<Int> {
val name by taskMeta.string { error("Name field not defined") }
from(testPluginFactory) { test }[name]?.let { source: Data<Int> ->
put(name, source)
from(testPluginFactory) { test }.getByType<Int>(name)?.let { source ->
data(source.name, source.map { it })
}
}
@ -97,7 +97,7 @@ internal class SimpleWorkspaceTest {
val newData: Data<Int> = data.combine(linearData[data.name]!!) { l, r ->
l + r
}
put(data.name, newData)
data(data.name, newData)
}
}
@ -106,23 +106,23 @@ internal class SimpleWorkspaceTest {
val res = from(square).foldToData(0) { l, r ->
l + r.value
}
put("sum", res)
data("sum", res)
}
val averageByGroup by task<Int> {
val evenSum = workspace.data.filterByType<Int> { name, _, _ ->
val evenSum = workspace.data.filterByType<Int> { name, _ ->
name.toString().toInt() % 2 == 0
}.foldToData(0) { l, r ->
l + r.value
}
put("even", evenSum)
val oddSum = workspace.data.filterByType<Int> { name, _, _ ->
data("even", evenSum)
val oddSum = workspace.data.filterByType<Int> { name, _ ->
name.toString().toInt() % 2 == 1
}.foldToData(0) { l, r ->
l + r.value
}
put("odd", oddSum)
data("odd", oddSum)
}
val delta by task<Int> {
@ -132,7 +132,7 @@ internal class SimpleWorkspaceTest {
val res = even.combine(odd) { l, r ->
l - r
}
put("res", res)
data("res", res)
}
val customPipe by task<Int> {
@ -140,7 +140,7 @@ internal class SimpleWorkspaceTest {
val meta = data.meta.toMutableMeta().apply {
"newValue" put 22
}
put(data.name + "new", data.transform { (data.meta["value"].int ?: 0) + it })
data(data.name + "new", data.map { (data.meta["value"].int ?: 0) + it })
}
}
@ -159,7 +159,7 @@ internal class SimpleWorkspaceTest {
@Timeout(1)
fun testMetaPropagation() = runTest {
val node = workspace.produce("sum") { "testFlag" put true }
val res = node.single().await()
val res = node.asSequence().single().await()
}
@Test
@ -170,25 +170,20 @@ internal class SimpleWorkspaceTest {
}
@Test
fun testFullSquare() = runTest {
val result = workspace.produce("fullSquare")
result.forEach {
println(
"""
Name: ${it.name}
Meta: ${it.meta}
Data: ${it.data.await()}
""".trimIndent()
)
fun testFullSquare() {
runBlocking {
val node = workspace.produce("fullSquare")
println(node.toMeta())
}
}
@Test
fun testFilter() = runTest {
val node = workspace.produce("filterOne") {
"name" put "myData[12]"
fun testFilter() {
runBlocking {
val node = workspace.produce("filterOne") {
"name" put "myData[12]"
}
assertEquals(12, node.single().await())
}
assertEquals(12, node.single().await())
}
}