Cleanup after task refactoring

Alexander Nozik 2021-01-26 16:13:31 +03:00
parent 80d3a64cdf
commit 2291072e26
16 changed files with 60 additions and 41 deletions

View File

@@ -1,5 +1,7 @@
package hep.dataforge.data
package hep.dataforge.actions
import hep.dataforge.data.DataSet
import hep.dataforge.meta.DFExperimental
import hep.dataforge.meta.Meta
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.*
@@ -31,3 +33,10 @@ public infix fun <T : Any, I : Any, R : Any> Action<T, I>.then(action: Action<I,
}
}
@DFExperimental
public suspend fun <T : Any, R : Any> DataSet<T>.transformWith(
action: Action<T, R>,
meta: Meta = Meta.EMPTY,
scope: CoroutineScope? = null,
): DataSet<R> = action.execute(this, meta, scope)
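
For illustration only (not part of this diff): the new transformWith extension pairs naturally with the then composition from the same file. A minimal sketch, assuming two compatible Action instances, that then returns the composed Action<T, R>, and imports following the post-refactoring hep.dataforge.actions package layout:

import hep.dataforge.actions.Action
import hep.dataforge.actions.then
import hep.dataforge.actions.transformWith
import hep.dataforge.data.DataSet
import hep.dataforge.meta.DFExperimental

@DFExperimental
suspend fun <T : Any, I : Any, R : Any> pipeline(
    input: DataSet<T>,
    first: Action<T, I>,
    second: Action<I, R>,
): DataSet<R> {
    // Compose the two actions; `then` chains them into a single action.
    val composite = first then second
    // Apply the composite action to the whole data set with the default (empty) meta.
    return input.transformWith(composite)
}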

View File

@@ -1,5 +1,6 @@
package hep.dataforge.data
package hep.dataforge.actions
import hep.dataforge.data.*
import hep.dataforge.meta.*
import hep.dataforge.names.Name
import kotlinx.coroutines.CoroutineScope

View File

@@ -1,5 +1,7 @@
package hep.dataforge.data
package hep.dataforge.actions
import hep.dataforge.data.Data
import hep.dataforge.data.StaticData
import hep.dataforge.meta.isEmpty
import hep.dataforge.misc.Named
import hep.dataforge.names.Name

View File

@@ -1,5 +1,6 @@
package hep.dataforge.data
package hep.dataforge.actions
import hep.dataforge.data.*
import hep.dataforge.meta.DFExperimental
import hep.dataforge.meta.Meta
import hep.dataforge.meta.MetaBuilder
@@ -44,17 +45,6 @@ public class ReduceGroupBuilder<T : Any, R : Any>(
}
}
// /**
// * Add a single fixed group to grouping rules
// */
// public fun group(groupName: String, filter: DataMapper, action: JoinGroup<T, R>.() -> Unit) {
// groupRules += { node ->
// listOf(
// JoinGroup<T, R>(groupName, node.filter(filter)).apply(action)
// )
// }
// }
public fun group(
groupName: String,
filter: suspend (Name, Data<T>) -> Boolean,

View File

@@ -1,5 +1,6 @@
package hep.dataforge.data
package hep.dataforge.actions
import hep.dataforge.data.*
import hep.dataforge.meta.Laminate
import hep.dataforge.meta.Meta
import hep.dataforge.meta.MetaBuilder

View File

@@ -1,5 +1,7 @@
package hep.dataforge.data
import hep.dataforge.actions.Action
import hep.dataforge.actions.NamedData
import hep.dataforge.meta.Meta
import hep.dataforge.names.Name
import hep.dataforge.names.startsWith

View File

@@ -1,5 +1,7 @@
package hep.dataforge.data
import hep.dataforge.actions.NamedData
import hep.dataforge.actions.named
import hep.dataforge.meta.Meta
import hep.dataforge.meta.set
import hep.dataforge.names.*

View File

@@ -1,5 +1,6 @@
package hep.dataforge.data
import hep.dataforge.actions.NamedData
import hep.dataforge.meta.DFExperimental
import hep.dataforge.meta.Meta
import hep.dataforge.meta.MetaBuilder

View File

@@ -1,5 +1,7 @@
package hep.dataforge.data
import hep.dataforge.actions.NamedData
import hep.dataforge.actions.named
import hep.dataforge.meta.*
import hep.dataforge.misc.Type
import hep.dataforge.names.*

View File

@@ -1,5 +1,7 @@
package hep.dataforge.data
import hep.dataforge.actions.NamedData
import hep.dataforge.actions.named
import hep.dataforge.meta.DFExperimental
import hep.dataforge.names.*
import kotlinx.coroutines.flow.Flow

View File

@@ -1,5 +1,7 @@
package hep.dataforge.data
import hep.dataforge.actions.NamedData
import hep.dataforge.actions.named
import hep.dataforge.meta.Meta
import hep.dataforge.meta.MetaBuilder
import hep.dataforge.meta.seal
@@ -11,36 +13,41 @@ import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.reflect.KClass
/**
* Lazily transform this data to another data. By convention [block] should not use external data (be pure).
* @param coroutineContext additional [CoroutineContext] elements used for data computation.
* @param meta for the resulting data. By default equals input data.
* @param block the transformation itself
*/
public fun <T : Any, R : Any> Data<T>.map(
outputType: KClass<out R>,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = this.meta,
block: suspend (T) -> R,
): Data<R> = LazyData(outputType, meta, coroutineContext, listOf(this)) {
): LazyData<R> = LazyData(outputType, meta, coroutineContext, listOf(this)) {
block(await())
}
/**
* Create a data mapping
* See [map]
*/
public inline fun <T : Any, reified R : Any> Data<T>.map(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = this.meta,
crossinline block: suspend (T) -> R,
): Data<R> = LazyData(R::class, meta, coroutineContext, listOf(this)) {
): LazyData<R> = LazyData(R::class, meta, coroutineContext, listOf(this)) {
block(await())
}
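
A minimal usage sketch for the lazy map above (illustrative, not part of the diff), assuming only that a Data<Int> instance is available:

import hep.dataforge.data.*

suspend fun doubled(data: Data<Int>): Int {
    // map only records the transformation; nothing is computed yet.
    val result = data.map { it * 2 }
    // The lazy chain is evaluated on await().
    return result.await()
}
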
/**
* Combine this data with the other data using [block]
* Combine this data with the other data using [block]. See [map] for other details
*/
public inline fun <T1 : Any, T2: Any, reified R : Any> Data<T1>.combine(
public inline fun <T1 : Any, T2 : Any, reified R : Any> Data<T1>.combine(
other: Data<T2>,
coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = this.meta,
crossinline block: suspend (left: T1, right: T2) -> R,
): Data<R> = LazyData(R::class, meta, coroutineContext, listOf(this,other)) {
): LazyData<R> = LazyData(R::class, meta, coroutineContext, listOf(this, other)) {
block(await(), other.await())
}
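
A matching sketch for combine (again illustrative), assuming two Data instances:

import hep.dataforge.data.*

suspend fun sum(a: Data<Int>, b: Data<Int>): Int {
    // Both inputs become dependencies of the resulting lazy data.
    val combined = a.combine(b) { left, right -> left + right }
    return combined.await()
}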
@@ -48,19 +55,19 @@ public inline fun <T1 : Any, T2: Any, reified R : Any> Data<T1>.combine(
//data collection operations
/**
* Create a joined data.
* Lazily reduce a collection of [Data] to a single data.
*/
public inline fun <T : Any, reified R : Any> Collection<Data<T>>.reduceToData(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = Meta.EMPTY,
noinline block: suspend (Collection<T>) -> R,
): Data<R> = LazyData(
crossinline block: suspend (Collection<T>) -> R,
): LazyData<R> = LazyData(
R::class,
meta,
coroutineContext,
this
) {
block(map { run { it.await() } })
block(map { it.await() })
}
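
A usage sketch for the collection reduction above (illustrative, not from the commit):

import hep.dataforge.data.*

suspend fun total(parts: List<Data<Int>>): Int {
    // Every element of the collection is registered as a dependency of the result.
    val reduced = parts.reduceToData { values -> values.sum() }
    return reduced.await()
}
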
public fun <K, T : Any, R : Any> Map<K, Data<T>>.reduceToData(
@@ -79,7 +86,7 @@ public fun <K, T : Any, R : Any> Map<K, Data<T>>.reduceToData(
/**
* A joining of multiple data into a single one
* Lazily reduce a [Map] of [Data] with any static key.
* @param K type of the map key
* @param T type of the input goal
* @param R type of the result goal
@@ -97,8 +104,10 @@ public inline fun <K, T : Any, reified R : Any> Map<K, Data<T>>.reduceToData(
block(mapValues { it.value.await() })
}
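
Similarly for the keyed variant, as a sketch that assumes the coroutineContext and meta parameters keep defaults analogous to the collection overload:

import hep.dataforge.data.*

suspend fun keyedTotal(parts: Map<String, Data<Int>>): Int {
    // The block receives the awaited values keyed exactly as in the source map.
    val reduced = parts.reduceToData { values -> values.values.sum() }
    return reduced.await()
}
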
//flow operations
/**
* Transform a [Flow] of [NamedData] to a single [Data]. Execution restrictions are removed for inner [Flow]
* Transform a [Flow] of [NamedData] to a single [Data].
*/
public suspend fun <T : Any, R : Any> Flow<NamedData<T>>.reduceToData(
outputType: KClass<out R>,
@@ -114,8 +123,6 @@ public suspend fun <T : Any, R : Any> Flow<NamedData<T>>.reduceToData(
transformation(this)
}
//flow operations
public suspend inline fun <T : Any, reified R : Any> Flow<NamedData<T>>.reduceToData(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = Meta.EMPTY,
@@ -145,7 +152,7 @@ public suspend fun <T : Any, R : Any> DataSet<T>.map(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
metaTransform: MetaBuilder.() -> Unit = {},
block: suspend (T) -> R,
): DataSet<R> = DataTree(outputType) {
): DataTree<R> = DataTree(outputType) {
populate(
flow().map {
val newMeta = it.meta.toMutableMeta().apply(metaTransform).seal()
@@ -158,7 +165,7 @@ public suspend inline fun <T : Any, reified R : Any> DataSet<T>.map(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
noinline metaTransform: MetaBuilder.() -> Unit = {},
noinline block: suspend (T) -> R,
): DataSet<R> = map(R::class, coroutineContext, metaTransform, block)
): DataTree<R> = map(R::class, coroutineContext, metaTransform, block)
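
Finally, a sketch of the DataSet-level map whose return type this commit narrows from DataSet to DataTree (illustrative; metaTransform is left at its default):

import hep.dataforge.data.*

suspend fun doubledSet(input: DataSet<Int>): DataTree<Int> =
    // Each item is mapped lazily; names are preserved and meta passes through metaTransform.
    input.map { it * 2 }
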
public suspend fun <T : Any> DataSet<T>.forEach(block: suspend (NamedData<T>) -> Unit) {
contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }

View File

@@ -1,5 +1,7 @@
package hep.dataforge.data
import hep.dataforge.actions.NamedData
import hep.dataforge.actions.named
import hep.dataforge.meta.DFExperimental
import hep.dataforge.names.*
import kotlinx.coroutines.flow.Flow

View File

@@ -1,5 +1,6 @@
package hep.dataforge.data
import hep.dataforge.actions.MapAction
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Test
import kotlin.test.assertEquals

View File

@@ -15,8 +15,6 @@ import kotlin.reflect.KClass
@Type(TYPE)
public interface Task<out T : Any> : Described {
public val type: KClass<out T>
/**
* Compute a [TaskResult] using given meta. In general, the result is lazy and represents both computation model
* and a handler for actual result
@@ -36,8 +34,8 @@ public class TaskResultBuilder<T : Any>(
public val workspace: Workspace,
public val taskName: Name,
public val taskMeta: Meta,
private val dataSync: DataSetBuilder<T>,
) : DataSetBuilder<T> by dataSync
private val dataDrop: DataSetBuilder<T>,
) : DataSetBuilder<T> by dataDrop
/**
* Create a [Task] that composes a result using [builder]. Only data from the workspace could be used.
@@ -50,8 +48,6 @@ public fun <T : Any> Task(
builder: suspend TaskResultBuilder<T>.() -> Unit,
): Task<T> = object : Task<T> {
override val type: KClass<out T> = resultType
override val descriptor: ItemDescriptor? = descriptor
override suspend fun execute(
@@ -60,7 +56,7 @@ public fun <T : Any> Task(
taskMeta: Meta,
): TaskResult<T> = withContext(GoalExecutionRestriction() + workspace.goalLogger) {
//TODO use safe builder and check for external data on add and detects cycles
val dataset = DataTree(type) {
val dataset = DataTree(resultType) {
TaskResultBuilder(workspace,taskName, taskMeta, this).apply { builder() }
}
workspace.internalize(dataset, taskName, taskMeta)

View File

@@ -1,7 +1,7 @@
package hep.dataforge.workspace
import hep.dataforge.actions.NamedData
import hep.dataforge.data.Data
import hep.dataforge.data.NamedData
import hep.dataforge.meta.Meta
import hep.dataforge.names.Name

View File

@@ -1,5 +1,6 @@
package hep.dataforge.workspace
import hep.dataforge.actions.get
import hep.dataforge.context.*
import hep.dataforge.data.*
import hep.dataforge.meta.*