refactoring of data transformations

This commit is contained in:
Alexander Nozik 2024-01-04 18:59:57 +03:00
parent 6ba189fa34
commit 8f3c2f3950
13 changed files with 136 additions and 69 deletions

View File

@ -6,10 +6,12 @@
- Wasm artifacts
- Add automatic MetaConverter for serializable objects
- Add Meta and MutableMeta delegates for convertible and serializable
- Meta mapping for data.
### Changed
- Descriptor `children` renamed to `nodes`
- `MetaConverter` now inherits `MetaSpec` (formerly `Specification`), so `MetaConverter` can be used more universally.
- Meta copy and modification now use lightweight non-observable meta builders.
### Deprecated
- `node(key,converter)` in favor of `serializable` delegate

View File

@ -12,7 +12,7 @@ import kotlin.reflect.full.findAnnotation
@DFExperimental
public val KClass<*>.dfId: String
public val KClass<*>.dfType: String
get() = findAnnotation<DfType>()?.id ?: simpleName ?: ""
/**
@ -20,13 +20,13 @@ public val KClass<*>.dfId: String
*/
@DFExperimental
public inline fun <reified T : Any> Provider.provideByType(name: String): T? {
val target = T::class.dfId
val target = T::class.dfType
return provide(target, name)
}
@DFExperimental
public inline fun <reified T : Any> Provider.top(): Map<Name, T> {
val target = T::class.dfId
val target = T::class.dfType
return top(target)
}
@ -35,15 +35,15 @@ public inline fun <reified T : Any> Provider.top(): Map<Name, T> {
*/
@DFExperimental
public inline fun <reified T : Any> Context.gather(inherit: Boolean = true): Map<Name, T> =
gather<T>(T::class.dfId, inherit)
gather<T>(T::class.dfType, inherit)
@DFExperimental
public inline fun <reified T : Any> PluginBuilder.provides(items: Map<Name, T>) {
provides(T::class.dfId, items)
provides(T::class.dfType, items)
}
@DFExperimental
public inline fun <reified T : Any> PluginBuilder.provides(vararg items: Named) {
provides(T::class.dfId, *items)
provides(T::class.dfType, *items)
}
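
For illustration, a minimal sketch of how the renamed `dfType` property resolves the provider target; the `Device` class, its "device" id, and the import path for the `dfType` extension are assumptions:

import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.misc.DfType
import space.kscience.dataforge.provider.dfType // assumed location of the extension

// Hypothetical annotated class: the @DfType id becomes the provider target string.
@DfType("device")
class Device

@OptIn(DFExperimental::class)
fun main() {
    // dfType reads the annotation id, falling back to simpleName and then to "".
    println(Device::class.dfType) // "device"
    println(String::class.dfType) // "String" (no annotation)
}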

View File

@ -7,30 +7,28 @@ import space.kscience.dataforge.misc.DFExperimental
/**
* A simple data transformation on a data node. Actions should avoid doing actual dependency evaluation in [execute].
*/
public interface Action<in T : Any, out R : Any> {
public fun interface Action<in T : Any, out R : Any> {
/**
* Transform the data in the node, producing a new node. By default, it is assumed that all calculations are lazy
* so no actual computation is started at this moment.
*/
public fun execute(dataSet: DataSet<T>, meta: Meta = Meta.EMPTY): DataSet<R>
public fun execute(dataSet: DataSet<T>, meta: Meta): DataSet<R>
public companion object
}
/**
* A convenience method to transform data using given [action]
*/
public fun <T : Any, R : Any> DataSet<T>.transform(action: Action<T, R>, meta: Meta = Meta.EMPTY): DataSet<R> =
action.execute(this, meta)
/**
* Action composition. The result is terminal if one of its parts is terminal
*/
public infix fun <T : Any, I : Any, R : Any> Action<T, I>.then(action: Action<I, R>): Action<T, R> {
// TODO introduce a composite action and optimize by adding actions to a list
return object : Action<T, R> {
override fun execute(
dataSet: DataSet<T>,
meta: Meta,
): DataSet<R> = action.execute(this@then.execute(dataSet, meta), meta)
}
}
public infix fun <T : Any, I : Any, R : Any> Action<T, I>.then(action: Action<I, R>): Action<T, R> =
Action<T, R> { dataSet, meta -> action.execute(this@then.execute(dataSet, meta), meta) }
@DFExperimental
public operator fun <T : Any, R : Any> Action<T, R>.invoke(
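
With `Action` now a `fun` interface, one-off actions can be written as SAM lambdas and chained with `then`; a sketch with placeholder identity actions (import paths are assumed to match this file's package):

import space.kscience.dataforge.actions.Action
import space.kscience.dataforge.actions.then
import space.kscience.dataforge.actions.transform
import space.kscience.dataforge.data.DataSet

// Two trivial placeholder actions created via SAM conversion.
val first = Action<String, String> { dataSet, _ -> dataSet }
val second = Action<String, String> { dataSet, _ -> dataSet }

// Composition stays lazy: no computation happens until the resulting DataSet is evaluated.
val pipeline: Action<String, String> = first then second

fun apply(input: DataSet<String>): DataSet<String> = input.transform(pipeline)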

View File

@ -87,7 +87,6 @@ public class StaticData<T : Any>(
public inline fun <reified T : Any> Data(value: T, meta: Meta = Meta.EMPTY): StaticData<T> =
StaticData(typeOf<T>(), value, meta)
@Suppress("FunctionName")
@DFInternal
public fun <T : Any> Data(
type: KType,
@ -98,7 +97,6 @@ public fun <T : Any> Data(
): Data<T> = LazyData(type, meta, context, dependencies, block)
@OptIn(DFInternal::class)
@Suppress("FunctionName")
public inline fun <reified T : Any> Data(
meta: Meta = Meta.EMPTY,
context: CoroutineContext = EmptyCoroutineContext,
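
For context, a small sketch of the two public `Data` factories shown above; the values are arbitrary:

import space.kscience.dataforge.data.Data
import space.kscience.dataforge.data.StaticData
import space.kscience.dataforge.meta.Meta

// Eager wrapper around an existing value.
val static: StaticData<Int> = Data(42)

// Lazy data: the block runs only when the data is awaited.
val deferred: Data<Int> = Data(Meta.EMPTY) { 21 + 21 }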

View File

@ -0,0 +1,23 @@
package space.kscience.dataforge.data
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MutableMeta
import space.kscience.dataforge.meta.copy
private class MetaMaskData<T>(val origin: Data<T>, override val meta: Meta) : Data<T> by origin
/**
* A data with overridden meta. It reflects the computed state of the original data.
*/
public fun <T> Data<T>.withMeta(newMeta: Meta): Data<T> = if (this is MetaMaskData) {
MetaMaskData(origin, newMeta)
} else {
MetaMaskData(this, newMeta)
}
/**
* Create a new [Data] with the same computation, but different meta. The meta is created by applying [block] to
* the existing data meta.
*/
public inline fun <T> Data<T>.mapMeta(block: MutableMeta.() -> Unit): Data<T> = withMeta(meta.copy(block))
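
A brief usage sketch of the new mask helpers; the sample value and meta keys are made up, and the `put` calls use DataForge's meta builder DSL:

import space.kscience.dataforge.data.Data
import space.kscience.dataforge.data.mapMeta
import space.kscience.dataforge.data.withMeta
import space.kscience.dataforge.meta.Meta

val sample = Data(42.0, Meta { "source" put "generator" })

// Replace the meta entirely; the wrapped computation and value are shared with `sample`.
val replaced = sample.withMeta(Meta { "source" put "override" })

// Or derive the new meta from the existing one.
val tagged = sample.mapMeta { "validated" put true }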

View File

@ -1,11 +1,9 @@
package space.kscience.dataforge.data
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MutableMeta
import space.kscience.dataforge.meta.seal
import space.kscience.dataforge.meta.toMutableMeta
import space.kscience.dataforge.meta.*
import space.kscience.dataforge.misc.DFInternal
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.NameToken
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.reflect.KType
@ -28,8 +26,8 @@ public suspend fun <T : Any> NamedData<T>.awaitWithMeta(): NamedValueWithMeta<T>
* @param block the transformation itself
*/
public inline fun <T : Any, reified R : Any> Data<T>.map(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = this.meta,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
crossinline block: suspend (T) -> R,
): Data<R> = Data(meta, coroutineContext, listOf(this)) {
block(await())
@ -40,8 +38,8 @@ public inline fun <T : Any, reified R : Any> Data<T>.map(
*/
public inline fun <T1 : Any, T2 : Any, reified R : Any> Data<T1>.combine(
other: Data<T2>,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = this.meta,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
crossinline block: suspend (left: T1, right: T2) -> R,
): Data<R> = Data(meta, coroutineContext, listOf(this, other)) {
block(await(), other.await())
@ -50,12 +48,22 @@ public inline fun <T1 : Any, T2 : Any, reified R : Any> Data<T1>.combine(
//data collection operations
@PublishedApi
internal fun Iterable<Data<*>>.joinMeta(): Meta = Meta {
var counter = 0
forEach { data ->
val inputIndex = (data as? NamedData)?.name?.toString() ?: (counter++).toString()
val token = NameToken("data", inputIndex)
set(token, data.meta)
}
}
/**
* Lazily reduce a collection of [Data] to a single data.
*/
public inline fun <T : Any, reified R : Any> Collection<Data<T>>.reduceToData(
meta: Meta = joinMeta(),
coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = Meta.EMPTY,
crossinline block: suspend (List<ValueWithMeta<T>>) -> R,
): Data<R> = Data(
meta,
@ -65,11 +73,19 @@ public inline fun <T : Any, reified R : Any> Collection<Data<T>>.reduceToData(
block(map { it.awaitWithMeta() })
}
@PublishedApi
internal fun Map<*, Data<*>>.joinMeta(): Meta = Meta {
forEach { (key, data) ->
val token = NameToken("data", key.toString())
set(token, data.meta)
}
}
@DFInternal
public fun <K, T : Any, R : Any> Map<K, Data<T>>.reduceToData(
outputType: KType,
meta: Meta = joinMeta(),
coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = Meta.EMPTY,
block: suspend (Map<K, ValueWithMeta<T>>) -> R,
): Data<R> = Data(
outputType,
@ -87,8 +103,8 @@ public fun <K, T : Any, R : Any> Map<K, Data<T>>.reduceToData(
* @param R type of the result goal
*/
public inline fun <K, T : Any, reified R : Any> Map<K, Data<T>>.reduceToData(
meta: Meta = joinMeta(),
coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = Meta.EMPTY,
crossinline block: suspend (Map<K, ValueWithMeta<T>>) -> R,
): Data<R> = Data(
meta,
@ -103,8 +119,8 @@ public inline fun <K, T : Any, reified R : Any> Map<K, Data<T>>.reduceToData(
@DFInternal
public inline fun <T : Any, R : Any> Iterable<Data<T>>.reduceToData(
outputType: KType,
meta: Meta = joinMeta(),
coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = Meta.EMPTY,
crossinline transformation: suspend (Collection<ValueWithMeta<T>>) -> R,
): Data<R> = Data(
outputType,
@ -117,20 +133,20 @@ public inline fun <T : Any, R : Any> Iterable<Data<T>>.reduceToData(
@OptIn(DFInternal::class)
public inline fun <T : Any, reified R : Any> Iterable<Data<T>>.reduceToData(
meta: Meta = joinMeta(),
coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = Meta.EMPTY,
crossinline transformation: suspend (Collection<ValueWithMeta<T>>) -> R,
): Data<R> = reduceToData(typeOf<R>(), coroutineContext, meta) {
): Data<R> = reduceToData(typeOf<R>(), meta, coroutineContext) {
transformation(it)
}
public inline fun <T : Any, reified R : Any> Iterable<Data<T>>.foldToData(
initial: R,
meta: Meta = joinMeta(),
coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = Meta.EMPTY,
crossinline block: suspend (result: R, data: ValueWithMeta<T>) -> R,
): Data<R> = reduceToData(
coroutineContext, meta
meta, coroutineContext
) {
it.fold(initial) { acc, t -> block(acc, t) }
}
@ -141,8 +157,8 @@ public inline fun <T : Any, reified R : Any> Iterable<Data<T>>.foldToData(
@DFInternal
public inline fun <T : Any, R : Any> Iterable<NamedData<T>>.reduceNamedToData(
outputType: KType,
meta: Meta = joinMeta(),
coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = Meta.EMPTY,
crossinline transformation: suspend (Collection<NamedValueWithMeta<T>>) -> R,
): Data<R> = Data(
outputType,
@ -155,10 +171,10 @@ public inline fun <T : Any, R : Any> Iterable<NamedData<T>>.reduceNamedToData(
@OptIn(DFInternal::class)
public inline fun <T : Any, reified R : Any> Iterable<NamedData<T>>.reduceNamedToData(
meta: Meta = joinMeta(),
coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = Meta.EMPTY,
crossinline transformation: suspend (Collection<NamedValueWithMeta<T>>) -> R,
): Data<R> = reduceNamedToData(typeOf<R>(), coroutineContext, meta) {
): Data<R> = reduceNamedToData(typeOf<R>(), meta, coroutineContext) {
transformation(it)
}
@ -167,11 +183,11 @@ public inline fun <T : Any, reified R : Any> Iterable<NamedData<T>>.reduceNamedT
*/
public inline fun <T : Any, reified R : Any> Iterable<NamedData<T>>.foldNamedToData(
initial: R,
meta: Meta = joinMeta(),
coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = Meta.EMPTY,
crossinline block: suspend (result: R, data: NamedValueWithMeta<T>) -> R,
): Data<R> = reduceNamedToData(
coroutineContext, meta
meta, coroutineContext
) {
it.fold(initial) { acc, t -> block(acc, t) }
}
@ -181,8 +197,8 @@ public inline fun <T : Any, reified R : Any> Iterable<NamedData<T>>.foldNamedToD
@DFInternal
public suspend fun <T : Any, R : Any> DataSet<T>.map(
outputType: KType,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
metaTransform: MutableMeta.() -> Unit = {},
coroutineContext: CoroutineContext = EmptyCoroutineContext,
block: suspend (NamedValueWithMeta<T>) -> R,
): DataTree<R> = DataTree<R>(outputType) {
forEach {
@ -196,10 +212,10 @@ public suspend fun <T : Any, R : Any> DataSet<T>.map(
@OptIn(DFInternal::class)
public suspend inline fun <T : Any, reified R : Any> DataSet<T>.map(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
noinline metaTransform: MutableMeta.() -> Unit = {},
coroutineContext: CoroutineContext = EmptyCoroutineContext,
noinline block: suspend (NamedValueWithMeta<T>) -> R,
): DataTree<R> = map(typeOf<R>(), coroutineContext, metaTransform, block)
): DataTree<R> = map(typeOf<R>(), metaTransform, coroutineContext, block)
public inline fun <T : Any> DataSet<T>.forEach(block: (NamedData<T>) -> Unit) {
for (d in this) {
@ -207,15 +223,25 @@ public inline fun <T : Any> DataSet<T>.forEach(block: (NamedData<T>) -> Unit) {
}
}
// DataSet reduction
@PublishedApi
internal fun DataSet<*>.joinMeta(): Meta = Meta {
forEach { (key, data) ->
val token = NameToken("data", key.toString())
set(token, data.meta)
}
}
public inline fun <T : Any, reified R : Any> DataSet<T>.reduceToData(
meta: Meta = joinMeta(),
coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = Meta.EMPTY,
crossinline transformation: suspend (Iterable<NamedValueWithMeta<T>>) -> R,
): Data<R> = asIterable().reduceNamedToData(coroutineContext, meta, transformation)
): Data<R> = asIterable().reduceNamedToData(meta, coroutineContext, transformation)
public inline fun <T : Any, reified R : Any> DataSet<T>.foldToData(
initial: R,
meta: Meta = joinMeta(),
coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = Meta.EMPTY,
crossinline block: suspend (result: R, data: NamedValueWithMeta<T>) -> R,
): Data<R> = asIterable().foldNamedToData(initial, coroutineContext, meta, block)
): Data<R> = asIterable().foldNamedToData(initial, meta, coroutineContext, block)
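
Because several signatures above swapped the `meta` and `coroutineContext` parameters (and `meta` now defaults to a `joinMeta()` of the inputs), positional call sites change; a hedged sketch of the new order, with an assumed input list:

import space.kscience.dataforge.data.Data
import space.kscience.dataforge.data.foldToData
import space.kscience.dataforge.data.reduceToData
import space.kscience.dataforge.meta.Meta

val parts: List<Data<Int>> = listOf(Data(1), Data(2), Data(3))

// meta is now the first optional parameter; if omitted, it defaults to joinMeta() of the inputs.
val sum: Data<Int> = parts.reduceToData(Meta { "op" put "sum" }) { values ->
    values.sumOf { it.value } // ValueWithMeta exposes the awaited value and its meta
}

// foldToData follows the same order: initial, meta, coroutineContext, block.
val product: Data<Int> = parts.foldToData(1) { acc, item -> acc * item.value }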

View File

@ -370,7 +370,8 @@ public fun MutableMeta.append(key: String, value: Value): Unit = append(Name.par
/**
* Create a mutable copy of this meta. The copy is created even if the Meta is already mutable
*/
public fun Meta.toMutableMeta(): ObservableMutableMeta = MutableMetaImpl(value, items)
public fun Meta.toMutableMeta(): MutableMeta =
MutableMeta { update(this@toMutableMeta) } //MutableMetaImpl(value, items)
public fun Meta.asMutableMeta(): MutableMeta = (this as? MutableMeta) ?: toMutableMeta()
@ -385,12 +386,14 @@ public inline fun ObservableMutableMeta(builder: MutableMeta.() -> Unit = {}): O
/**
* Create a copy of this [Meta], optionally applying the given [block].
* The listeners of the original Config are not retained.
* Create a read-only copy of this [Meta]. [modification] is an optional modification applied to [Meta] on copy.
*
* The copy does not reflect changes of the initial Meta.
*/
public inline fun Meta.copy(block: MutableMeta.() -> Unit = {}): Meta =
toMutableMeta().apply(block)
public inline fun Meta.copy(modification: MutableMeta.() -> Unit = {}): Meta = Meta {
update(this@copy)
modification()
}
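
An illustrative snippet of the new copy semantics; the keys are arbitrary:

import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.copy

val base = Meta { "a" put 1 }

// Built via a lightweight non-observable builder and sealed: a detached, read-only snapshot.
val extended = base.copy { "b" put 2 }
// `extended` contains both "a" and "b"; later changes to a mutable origin are not reflected.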
private class MutableMetaWithDefault(
val source: MutableMeta, val default: MetaProvider, val rootName: Name,

View File

@ -101,11 +101,6 @@ internal class MetaBuilder(
override fun hashCode(): Int = Meta.hashCode(this)
}
/**
* Create a read-only meta.
*/
public inline fun Meta(builder: MutableMeta.() -> Unit): Meta =
MetaBuilder().apply(builder).seal()
/**
* Create an immutable meta.
@ -113,6 +108,11 @@ public inline fun Meta(builder: MutableMeta.() -> Unit): Meta =
public inline fun SealedMeta(builder: MutableMeta.() -> Unit): SealedMeta =
MetaBuilder().apply(builder).seal()
/**
* Create a read-only meta.
*/
public inline fun Meta(builder: MutableMeta.() -> Unit): Meta = SealedMeta(builder)
/**
* Create an empty mutable meta.
*/

View File

@ -6,6 +6,3 @@ package space.kscience.dataforge.misc
@MustBeDocumented
@Target(AnnotationTarget.CLASS)
public annotation class DfType(val id: String)
@Deprecated("Replace with DfType", replaceWith = ReplaceWith("DfType"))
public typealias DfId = DfType

View File

@ -1,5 +1,6 @@
package space.kscience.dataforge.workspace
import space.kscience.dataforge.actions.Action
import space.kscience.dataforge.context.PluginFactory
import space.kscience.dataforge.data.DataSet
import space.kscience.dataforge.data.forEach
@ -68,7 +69,7 @@ public val TaskResultBuilder<*>.allData: DataSelector<*>
}
/**
* Perform a lazy mapping task using given [selector] and [action]. The meta of resulting
* Perform a lazy mapping task using the given [selector] and a one-to-one [action].
* TODO move selector to receiver with multi-receivers
*
* @param selector a workspace data selector. Could be either task selector or initial data selector.
@ -77,7 +78,7 @@ public val TaskResultBuilder<*>.allData: DataSelector<*>
* @param action process individual data asynchronously.
*/
@DFExperimental
public suspend inline fun <T : Any, reified R : Any> TaskResultBuilder<R>.pipeFrom(
public suspend inline fun <T : Any, reified R : Any> TaskResultBuilder<R>.transformEach(
selector: DataSelector<T>,
dependencyMeta: Meta = defaultDependencyMeta,
dataMetaTransform: MutableMeta.(name: Name) -> Unit = {},
@ -89,7 +90,7 @@ public suspend inline fun <T : Any, reified R : Any> TaskResultBuilder<R>.pipeFr
dataMetaTransform(data.name)
}
val res = data.map(workspace.context.coroutineContext, meta) {
val res = data.map(meta, workspace.context.coroutineContext) {
action(it, data.name, meta)
}
@ -97,4 +98,23 @@ public suspend inline fun <T : Any, reified R : Any> TaskResultBuilder<R>.pipeFr
}
}
/**
* Set given [dataSet] as a task result.
*/
public fun <T : Any> TaskResultBuilder<T>.result(dataSet: DataSet<T>) {
node(Name.EMPTY, dataSet)
}
/**
* Use provided [action] to fill the result
*/
@DFExperimental
public suspend inline fun <T : Any, reified R : Any> TaskResultBuilder<R>.actionFrom(
selector: DataSelector<T>,
action: Action<T, R>,
dependencyMeta: Meta = defaultDependencyMeta,
) {
node(Name.EMPTY, action.execute(from(selector, dependencyMeta), dependencyMeta))
}
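
A rough sketch of how the renamed and new task helpers might be used inside a workspace plugin; the plugin object, its tag, the task names, and the selectors are assumptions modeled on the tests below:

import space.kscience.dataforge.actions.Action
import space.kscience.dataforge.context.PluginTag
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.workspace.WorkspacePlugin
import space.kscience.dataforge.workspace.actionFrom
import space.kscience.dataforge.workspace.dataByType
import space.kscience.dataforge.workspace.transformEach

@OptIn(DFExperimental::class)
internal object ExamplePlugin : WorkspacePlugin() {
    override val tag: PluginTag = PluginTag("example")

    val doubled by task<Int> {
        // one-to-one mapping over the selected data (formerly pipeFrom)
        transformEach(dataByType<Int>()) { value, _, _ -> value * 2 }
    }

    val viaAction by task<Int> {
        // apply a whole Action to the selection and set it as the task result
        actionFrom(dataByType<Int>(), Action { dataSet, _ -> dataSet })
    }
}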

View File

@ -31,7 +31,7 @@ internal class CachingWorkspaceTest {
inMemoryCache()
val doFirst by task<Any> {
pipeFrom(allData) { _, name, _ ->
transformEach(allData) { _, name, _ ->
firstCounter++
println("Done first on $name with flag=${taskMeta["flag"].boolean}")
}
@ -39,7 +39,7 @@ internal class CachingWorkspaceTest {
@Suppress("UNUSED_VARIABLE")
val doSecond by task<Any> {
pipeFrom(
transformEach(
doFirst,
dependencyMeta = if(taskMeta["flag"].boolean == true) taskMeta else Meta.EMPTY
) { _, name, _ ->

View File

@ -24,7 +24,7 @@ class FileWorkspaceCacheTest {
@Suppress("UNUSED_VARIABLE")
val echo by task<String> {
pipeFrom(dataByType<String>()) { arg, _, _ -> arg }
transformEach(dataByType<String>()) { arg, _, _ -> arg }
}
}

View File

@ -37,7 +37,7 @@ internal object TestPlugin : WorkspacePlugin() {
val test by task {
// type is inferred
pipeFrom(dataByType<Int>()) { arg, _, _ ->
transformEach(dataByType<Int>()) { arg, _, _ ->
logger.info { "Test: $arg" }
arg
}
@ -74,7 +74,7 @@ internal class SimpleWorkspaceTest {
}
val square by task<Int> {
pipeFrom(dataByType<Int>()) { arg, name, meta ->
transformEach(dataByType<Int>()) { arg, name, meta ->
if (meta["testFlag"].boolean == true) {
println("Side effect")
}
@ -84,7 +84,7 @@ internal class SimpleWorkspaceTest {
}
val linear by task<Int> {
pipeFrom(dataByType<Int>()) { arg, name, _ ->
transformEach(dataByType<Int>()) { arg, name, _ ->
workspace.logger.info { "Starting linear on $name" }
arg * 2 + 1
}