Reworked Data and Goal mechanics
This commit is contained in:
parent
43c18bcde7
commit
5921556254
@ -1,15 +1,17 @@
|
|||||||
package hep.dataforge.data
|
package hep.dataforge.data
|
||||||
|
|
||||||
|
import hep.dataforge.meta.EmptyMeta
|
||||||
import hep.dataforge.meta.Meta
|
import hep.dataforge.meta.Meta
|
||||||
import hep.dataforge.meta.MetaRepr
|
import hep.dataforge.meta.MetaRepr
|
||||||
import kotlinx.coroutines.CompletableDeferred
|
import kotlinx.coroutines.CoroutineScope
|
||||||
import kotlinx.coroutines.Deferred
|
import kotlin.coroutines.CoroutineContext
|
||||||
|
import kotlin.coroutines.EmptyCoroutineContext
|
||||||
import kotlin.reflect.KClass
|
import kotlin.reflect.KClass
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A data element characterized by its meta
|
* A data element characterized by its meta
|
||||||
*/
|
*/
|
||||||
interface Data<out T : Any> : MetaRepr {
|
interface Data<out T : Any> : Goal<T>, MetaRepr {
|
||||||
/**
|
/**
|
||||||
* Type marker for the data. The type is known before the calculation takes place so it could be checked.
|
* Type marker for the data. The type is known before the calculation takes place so it could be checked.
|
||||||
*/
|
*/
|
||||||
@ -19,52 +21,148 @@ interface Data<out T : Any> : MetaRepr {
|
|||||||
*/
|
*/
|
||||||
val meta: Meta
|
val meta: Meta
|
||||||
|
|
||||||
/**
|
|
||||||
* Lazy data value
|
|
||||||
*/
|
|
||||||
val task: Deferred<T>
|
|
||||||
|
|
||||||
override fun toMeta(): Meta = meta
|
override fun toMeta(): Meta = meta
|
||||||
|
|
||||||
companion object {
|
companion object {
|
||||||
const val TYPE = "data"
|
const val TYPE = "data"
|
||||||
|
|
||||||
fun <T : Any> of(type: KClass<out T>, goal: Deferred<T>, meta: Meta): Data<T> = DataImpl(type, goal, meta)
|
operator fun <T : Any> invoke(
|
||||||
|
type: KClass<out T>,
|
||||||
|
meta: Meta = EmptyMeta,
|
||||||
|
context: CoroutineContext = EmptyCoroutineContext,
|
||||||
|
dependencies: Collection<Data<*>> = emptyList(),
|
||||||
|
block: suspend CoroutineScope.() -> T
|
||||||
|
): Data<T> = DynamicData(type, meta, context, dependencies, block)
|
||||||
|
|
||||||
inline fun <reified T : Any> of(goal: Deferred<T>, meta: Meta): Data<T> = of(T::class, goal, meta)
|
operator inline fun <reified T : Any> invoke(
|
||||||
|
meta: Meta = EmptyMeta,
|
||||||
|
context: CoroutineContext = EmptyCoroutineContext,
|
||||||
|
dependencies: Collection<Data<*>> = emptyList(),
|
||||||
|
noinline block: suspend CoroutineScope.() -> T
|
||||||
|
): Data<T> = invoke(T::class, meta, context, dependencies, block)
|
||||||
|
|
||||||
fun <T : Any> of(name: String, type: KClass<out T>, goal: Deferred<T>, meta: Meta): Data<T> =
|
operator fun <T : Any> invoke(
|
||||||
NamedData(name, of(type, goal, meta))
|
name: String,
|
||||||
|
type: KClass<out T>,
|
||||||
|
meta: Meta = EmptyMeta,
|
||||||
|
context: CoroutineContext = EmptyCoroutineContext,
|
||||||
|
dependencies: Collection<Data<*>> = emptyList(),
|
||||||
|
block: suspend CoroutineScope.() -> T
|
||||||
|
): Data<T> = NamedData(name, invoke(type, meta, context, dependencies, block))
|
||||||
|
|
||||||
inline fun <reified T : Any> of(name: String, goal: Deferred<T>, meta: Meta): Data<T> =
|
operator inline fun <reified T : Any> invoke(
|
||||||
of(name, T::class, goal, meta)
|
name: String,
|
||||||
|
meta: Meta = EmptyMeta,
|
||||||
|
context: CoroutineContext = EmptyCoroutineContext,
|
||||||
|
dependencies: Collection<Data<*>> = emptyList(),
|
||||||
|
noinline block: suspend CoroutineScope.() -> T
|
||||||
|
): Data<T> =
|
||||||
|
invoke(name, T::class, meta, context, dependencies, block)
|
||||||
|
|
||||||
fun <T : Any> static(value: T, meta: Meta): Data<T> =
|
fun <T : Any> static(value: T, meta: Meta = EmptyMeta): Data<T> =
|
||||||
DataImpl(value::class, CompletableDeferred(value), meta)
|
StaticData(value, meta)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
fun <R : Any, T : R> Data<T>.cast(type: KClass<R>): Data<R> {
|
||||||
|
return object : Data<R> by this {
|
||||||
|
override val type: KClass<out R> = type
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Upcast a [Data] to a supertype
|
* Upcast a [Data] to a supertype
|
||||||
*/
|
*/
|
||||||
inline fun <reified R : Any, reified T : R> Data<T>.cast(): Data<R> {
|
inline fun <reified R : Any, T : R> Data<T>.cast(): Data<R> = cast(R::class)
|
||||||
return Data.of(R::class, task, meta)
|
|
||||||
}
|
|
||||||
|
|
||||||
fun <R : Any, T : R> Data<T>.cast(type: KClass<R>): Data<R> {
|
|
||||||
return Data.of(type, task, meta)
|
|
||||||
}
|
|
||||||
|
|
||||||
suspend fun <T : Any> Data<T>.await(): T = task.await()
|
class DynamicData<T : Any>(
|
||||||
|
|
||||||
/**
|
|
||||||
* Generic Data implementation
|
|
||||||
*/
|
|
||||||
private class DataImpl<out T : Any>(
|
|
||||||
override val type: KClass<out T>,
|
override val type: KClass<out T>,
|
||||||
override val task: Deferred<T>,
|
override val meta: Meta = EmptyMeta,
|
||||||
override val meta: Meta
|
context: CoroutineContext = EmptyCoroutineContext,
|
||||||
) : Data<T>
|
dependencies: Collection<Data<*>> = emptyList(),
|
||||||
|
block: suspend CoroutineScope.() -> T
|
||||||
|
) : Data<T>, DynamicGoal<T>(context, dependencies, block)
|
||||||
|
|
||||||
|
class StaticData<T : Any>(
|
||||||
|
value: T,
|
||||||
|
override val meta: Meta = EmptyMeta
|
||||||
|
) : Data<T>, StaticGoal<T>(value) {
|
||||||
|
override val type: KClass<out T> get() = value::class
|
||||||
|
}
|
||||||
|
|
||||||
class NamedData<out T : Any>(val name: String, data: Data<T>) : Data<T> by data
|
class NamedData<out T : Any>(val name: String, data: Data<T>) : Data<T> by data
|
||||||
|
|
||||||
|
fun <T : Any, R : Any> Data<T>.pipe(
|
||||||
|
outputType: KClass<out R>,
|
||||||
|
coroutineContext: CoroutineContext = EmptyCoroutineContext,
|
||||||
|
meta: Meta = this.meta,
|
||||||
|
block: suspend CoroutineScope.(T) -> R
|
||||||
|
): Data<R> = DynamicData(outputType, meta, coroutineContext, listOf(this)) {
|
||||||
|
block(await(this))
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a data pipe
|
||||||
|
*/
|
||||||
|
inline fun <T : Any, reified R : Any> Data<T>.pipe(
|
||||||
|
coroutineContext: CoroutineContext = EmptyCoroutineContext,
|
||||||
|
meta: Meta = this.meta,
|
||||||
|
noinline block: suspend CoroutineScope.(T) -> R
|
||||||
|
): Data<R> = DynamicData(R::class, meta, coroutineContext, listOf(this)) {
|
||||||
|
block(await(this))
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a joined data.
|
||||||
|
*/
|
||||||
|
inline fun <T : Any, reified R : Any> Collection<Data<T>>.join(
|
||||||
|
coroutineContext: CoroutineContext = EmptyCoroutineContext,
|
||||||
|
meta: Meta,
|
||||||
|
noinline block: suspend CoroutineScope.(Collection<T>) -> R
|
||||||
|
): Data<R> = DynamicData(
|
||||||
|
R::class,
|
||||||
|
meta,
|
||||||
|
coroutineContext,
|
||||||
|
this
|
||||||
|
) {
|
||||||
|
block(map { this.run { it.await(this) } })
|
||||||
|
}
|
||||||
|
|
||||||
|
fun <K, T : Any, R : Any> Map<K, Data<T>>.join(
|
||||||
|
outputType: KClass<out R>,
|
||||||
|
coroutineContext: CoroutineContext = EmptyCoroutineContext,
|
||||||
|
meta: Meta,
|
||||||
|
block: suspend CoroutineScope.(Map<K, T>) -> R
|
||||||
|
): DynamicData<R> = DynamicData(
|
||||||
|
outputType,
|
||||||
|
meta,
|
||||||
|
coroutineContext,
|
||||||
|
this.values
|
||||||
|
) {
|
||||||
|
block(mapValues { it.value.await(this) })
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A joining of multiple data into a single one
|
||||||
|
* @param K type of the map key
|
||||||
|
* @param T type of the input goal
|
||||||
|
* @param R type of the result goal
|
||||||
|
*/
|
||||||
|
inline fun <K, T : Any, reified R : Any> Map<K, Data<T>>.join(
|
||||||
|
coroutineContext: CoroutineContext = EmptyCoroutineContext,
|
||||||
|
meta: Meta,
|
||||||
|
noinline block: suspend CoroutineScope.(Map<K, T>) -> R
|
||||||
|
): DynamicData<R> = DynamicData(
|
||||||
|
R::class,
|
||||||
|
meta,
|
||||||
|
coroutineContext,
|
||||||
|
this.values
|
||||||
|
) {
|
||||||
|
block(mapValues { it.value.await(this) })
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -49,19 +49,19 @@ val <T : Any> DataItem<T>?.data: Data<T>? get() = (this as? DataItem.Leaf<T>)?.v
|
|||||||
/**
|
/**
|
||||||
* Start computation for all goals in data node
|
* Start computation for all goals in data node
|
||||||
*/
|
*/
|
||||||
fun DataNode<*>.startAll(): Unit = items.values.forEach {
|
fun DataNode<*>.startAll(scope: CoroutineScope): Unit = items.values.forEach {
|
||||||
when (it) {
|
when (it) {
|
||||||
is DataItem.Node<*> -> it.value.startAll()
|
is DataItem.Node<*> -> it.value.startAll(scope)
|
||||||
is DataItem.Leaf<*> -> it.value.task.start()
|
is DataItem.Leaf<*> -> it.value.start(scope)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fun DataNode<*>.joinAll(scope: CoroutineScope): Job = scope.launch {
|
fun DataNode<*>.joinAll(scope: CoroutineScope): Job = scope.launch {
|
||||||
startAll()
|
startAll(scope)
|
||||||
items.forEach {
|
items.forEach {
|
||||||
when (val value = it.value) {
|
when (val value = it.value) {
|
||||||
is DataItem.Node -> value.value.joinAll(this).join()
|
is DataItem.Node -> value.value.joinAll(this).join()
|
||||||
is DataItem.Leaf -> value.value.task.await()
|
is DataItem.Leaf -> value.value.await(scope)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
117
dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Goal.kt
Normal file
117
dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Goal.kt
Normal file
@ -0,0 +1,117 @@
|
|||||||
|
package hep.dataforge.data
|
||||||
|
|
||||||
|
import kotlinx.coroutines.*
|
||||||
|
import kotlin.coroutines.CoroutineContext
|
||||||
|
import kotlin.coroutines.EmptyCoroutineContext
|
||||||
|
|
||||||
|
interface Goal<out T> {
|
||||||
|
val dependencies: Collection<Goal<*>>
|
||||||
|
/**
|
||||||
|
* Returns current running coroutine if the goal is started
|
||||||
|
*/
|
||||||
|
val result: Deferred<T>?
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get ongoing computation or start a new one.
|
||||||
|
* Does not guarantee thread safety. In case of multi-thread access, could create orphan computations.
|
||||||
|
*/
|
||||||
|
fun startAsync(scope: CoroutineScope): Deferred<T>
|
||||||
|
|
||||||
|
suspend fun CoroutineScope.await(): T = startAsync(this).await()
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reset the computation
|
||||||
|
*/
|
||||||
|
fun reset()
|
||||||
|
|
||||||
|
companion object {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fun Goal<*>.start(scope: CoroutineScope): Job = startAsync(scope)
|
||||||
|
|
||||||
|
val Goal<*>.isComplete get() = result?.isCompleted ?: false
|
||||||
|
|
||||||
|
suspend fun <T> Goal<T>.await(scope: CoroutineScope): T = scope.await()
|
||||||
|
|
||||||
|
open class StaticGoal<T>(val value: T) : Goal<T> {
|
||||||
|
override val dependencies: Collection<Goal<*>> get() = emptyList()
|
||||||
|
override val result: Deferred<T> = CompletableDeferred(value)
|
||||||
|
|
||||||
|
override fun startAsync(scope: CoroutineScope): Deferred<T> = result
|
||||||
|
|
||||||
|
override fun reset() {
|
||||||
|
//doNothing
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
open class DynamicGoal<T>(
|
||||||
|
val coroutineContext: CoroutineContext = EmptyCoroutineContext,
|
||||||
|
override val dependencies: Collection<Goal<*>> = emptyList(),
|
||||||
|
val block: suspend CoroutineScope.() -> T
|
||||||
|
) : Goal<T> {
|
||||||
|
|
||||||
|
final override var result: Deferred<T>? = null
|
||||||
|
private set
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get ongoing computation or start a new one.
|
||||||
|
* Does not guarantee thread safety. In case of multi-thread access, could create orphan computations.
|
||||||
|
*/
|
||||||
|
override fun startAsync(scope: CoroutineScope): Deferred<T> {
|
||||||
|
val startedDependencies = this.dependencies.map { goal ->
|
||||||
|
goal.startAsync(scope)
|
||||||
|
}
|
||||||
|
return result ?: scope.async(coroutineContext + CoroutineMonitor() + Dependencies(startedDependencies)) {
|
||||||
|
startedDependencies.forEach { deferred ->
|
||||||
|
deferred.invokeOnCompletion { error ->
|
||||||
|
if (error != null) cancel(CancellationException("Dependency $deferred failed with error: ${error.message}"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
block()
|
||||||
|
}.also { result = it }
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reset the computation
|
||||||
|
*/
|
||||||
|
override fun reset() {
|
||||||
|
result?.cancel()
|
||||||
|
result = null
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a one-to-one goal based on existing goal
|
||||||
|
*/
|
||||||
|
fun <T, R> Goal<T>.pipe(
|
||||||
|
coroutineContext: CoroutineContext = EmptyCoroutineContext,
|
||||||
|
block: suspend CoroutineScope.(T) -> R
|
||||||
|
): Goal<R> = DynamicGoal(coroutineContext, listOf(this)) {
|
||||||
|
block(await(this))
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a joining goal.
|
||||||
|
*/
|
||||||
|
fun <T, R> Collection<Goal<T>>.join(
|
||||||
|
coroutineContext: CoroutineContext = EmptyCoroutineContext,
|
||||||
|
block: suspend CoroutineScope.(Collection<T>) -> R
|
||||||
|
): Goal<R> = DynamicGoal(coroutineContext, this) {
|
||||||
|
block(map { this.run { it.await(this) } })
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A joining goal for a map
|
||||||
|
* @param K type of the map key
|
||||||
|
* @param T type of the input goal
|
||||||
|
* @param R type of the result goal
|
||||||
|
*/
|
||||||
|
fun <K, T, R> Map<K, Goal<T>>.join(
|
||||||
|
coroutineContext: CoroutineContext = EmptyCoroutineContext,
|
||||||
|
block: suspend CoroutineScope.(Map<K, T>) -> R
|
||||||
|
): Goal<R> = DynamicGoal(coroutineContext, this.values) {
|
||||||
|
block(mapValues { it.value.await(this) })
|
||||||
|
}
|
||||||
|
|
@ -6,8 +6,6 @@ import hep.dataforge.meta.MetaBuilder
|
|||||||
import hep.dataforge.meta.builder
|
import hep.dataforge.meta.builder
|
||||||
import hep.dataforge.names.Name
|
import hep.dataforge.names.Name
|
||||||
import hep.dataforge.names.toName
|
import hep.dataforge.names.toName
|
||||||
import kotlinx.coroutines.CoroutineScope
|
|
||||||
import kotlinx.coroutines.Deferred
|
|
||||||
import kotlin.reflect.KClass
|
import kotlin.reflect.KClass
|
||||||
|
|
||||||
|
|
||||||
@ -78,7 +76,6 @@ class JoinGroupBuilder<T : Any, R : Any>(val actionMeta: Meta) {
|
|||||||
class JoinAction<T : Any, R : Any>(
|
class JoinAction<T : Any, R : Any>(
|
||||||
val inputType: KClass<T>,
|
val inputType: KClass<T>,
|
||||||
val outputType: KClass<R>,
|
val outputType: KClass<R>,
|
||||||
val scope: CoroutineScope,
|
|
||||||
private val action: JoinGroupBuilder<T, R>.() -> Unit
|
private val action: JoinGroupBuilder<T, R>.() -> Unit
|
||||||
) : Action<T, R> {
|
) : Action<T, R> {
|
||||||
|
|
||||||
@ -89,15 +86,13 @@ class JoinAction<T : Any, R : Any>(
|
|||||||
|
|
||||||
val laminate = Laminate(group.meta, meta)
|
val laminate = Laminate(group.meta, meta)
|
||||||
|
|
||||||
val goalMap: Map<Name, Deferred<T>> = group.node.dataSequence().associate { it.first to it.second.task }
|
val dataMap = group.node.dataSequence().associate { it }
|
||||||
|
|
||||||
val groupName: String = group.name;
|
val groupName: String = group.name;
|
||||||
|
|
||||||
val env = ActionEnv(groupName.toName(), laminate.builder())
|
val env = ActionEnv(groupName.toName(), laminate.builder())
|
||||||
|
|
||||||
val goal = goalMap.join(scope) { group.result.invoke(env, it) }
|
val res: DynamicData<R> = dataMap.join(outputType, meta = laminate) { group.result.invoke(env, it) }
|
||||||
|
|
||||||
val res = Data.of(outputType, goal, env.meta)
|
|
||||||
|
|
||||||
set(env.name, res)
|
set(env.name, res)
|
||||||
}
|
}
|
||||||
|
@ -2,7 +2,6 @@ package hep.dataforge.data
|
|||||||
|
|
||||||
import hep.dataforge.meta.*
|
import hep.dataforge.meta.*
|
||||||
import hep.dataforge.names.Name
|
import hep.dataforge.names.Name
|
||||||
import kotlinx.coroutines.CoroutineScope
|
|
||||||
import kotlin.reflect.KClass
|
import kotlin.reflect.KClass
|
||||||
|
|
||||||
class ActionEnv(val name: Name, val meta: Meta)
|
class ActionEnv(val name: Name, val meta: Meta)
|
||||||
@ -26,7 +25,6 @@ class PipeBuilder<T, R>(var name: Name, var meta: MetaBuilder) {
|
|||||||
class PipeAction<T : Any, R : Any>(
|
class PipeAction<T : Any, R : Any>(
|
||||||
val inputType: KClass<T>,
|
val inputType: KClass<T>,
|
||||||
val outputType: KClass<R>,
|
val outputType: KClass<R>,
|
||||||
val scope: CoroutineScope,
|
|
||||||
private val block: PipeBuilder<T, R>.() -> Unit
|
private val block: PipeBuilder<T, R>.() -> Unit
|
||||||
) : Action<T, R> {
|
) : Action<T, R> {
|
||||||
|
|
||||||
@ -45,10 +43,9 @@ class PipeAction<T : Any, R : Any>(
|
|||||||
val newName = builder.name
|
val newName = builder.name
|
||||||
//getting new meta
|
//getting new meta
|
||||||
val newMeta = builder.meta.seal()
|
val newMeta = builder.meta.seal()
|
||||||
//creating a goal with custom context if provided
|
val newData = data.pipe(outputType, meta = newMeta) { builder.result(env, it) }
|
||||||
val goal = data.task.pipe(scope) { builder.result(env, it) }
|
|
||||||
//setting the data node
|
//setting the data node
|
||||||
this[newName] = Data.of(outputType, goal, newMeta)
|
this[newName] = newData
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -56,9 +53,8 @@ class PipeAction<T : Any, R : Any>(
|
|||||||
|
|
||||||
inline fun <reified T : Any, reified R : Any> DataNode<T>.pipe(
|
inline fun <reified T : Any, reified R : Any> DataNode<T>.pipe(
|
||||||
meta: Meta,
|
meta: Meta,
|
||||||
scope: CoroutineScope,
|
|
||||||
noinline action: PipeBuilder<T, R>.() -> Unit
|
noinline action: PipeBuilder<T, R>.() -> Unit
|
||||||
): DataNode<R> = PipeAction(T::class, R::class, scope, action).invoke(this, meta)
|
): DataNode<R> = PipeAction(T::class, R::class, action).invoke(this, meta)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@ -6,7 +6,6 @@ import hep.dataforge.meta.MetaBuilder
|
|||||||
import hep.dataforge.meta.builder
|
import hep.dataforge.meta.builder
|
||||||
import hep.dataforge.names.Name
|
import hep.dataforge.names.Name
|
||||||
import hep.dataforge.names.toName
|
import hep.dataforge.names.toName
|
||||||
import kotlinx.coroutines.CoroutineScope
|
|
||||||
import kotlin.collections.set
|
import kotlin.collections.set
|
||||||
import kotlin.reflect.KClass
|
import kotlin.reflect.KClass
|
||||||
|
|
||||||
@ -36,7 +35,6 @@ class SplitBuilder<T : Any, R : Any>(val name: Name, val meta: Meta) {
|
|||||||
class SplitAction<T : Any, R : Any>(
|
class SplitAction<T : Any, R : Any>(
|
||||||
val inputType: KClass<T>,
|
val inputType: KClass<T>,
|
||||||
val outputType: KClass<R>,
|
val outputType: KClass<R>,
|
||||||
val scope: CoroutineScope,
|
|
||||||
private val action: SplitBuilder<T, R>.() -> Unit
|
private val action: SplitBuilder<T, R>.() -> Unit
|
||||||
) : Action<T, R> {
|
) : Action<T, R> {
|
||||||
|
|
||||||
@ -57,9 +55,7 @@ class SplitAction<T : Any, R : Any>(
|
|||||||
|
|
||||||
rule(env)
|
rule(env)
|
||||||
|
|
||||||
val goal = data.task.pipe(scope) { env.result(it) }
|
val res = data.pipe(outputType, meta = env.meta) { env.result(it) }
|
||||||
|
|
||||||
val res = Data.of(outputType, goal, env.meta)
|
|
||||||
set(env.name, res)
|
set(env.name, res)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,66 +0,0 @@
|
|||||||
package hep.dataforge.data
|
|
||||||
|
|
||||||
import kotlinx.coroutines.*
|
|
||||||
import kotlin.coroutines.CoroutineContext
|
|
||||||
import kotlin.coroutines.EmptyCoroutineContext
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Create a new [Deferred] with given [dependencies] and execution [block]. The block takes monitor as parameter.
|
|
||||||
*
|
|
||||||
* **Important:** Unlike regular deferred, the [Deferred] is started lazily, so the actual calculation is called only when result is requested.
|
|
||||||
*/
|
|
||||||
fun <T> goal(
|
|
||||||
scope: CoroutineScope,
|
|
||||||
coroutineContext: CoroutineContext = EmptyCoroutineContext,
|
|
||||||
dependencies: Collection<Job> = emptyList(),
|
|
||||||
block: suspend CoroutineScope.() -> T
|
|
||||||
): Deferred<T> = scope.async(
|
|
||||||
coroutineContext + CoroutineMonitor() + Dependencies(dependencies),
|
|
||||||
start = CoroutineStart.LAZY
|
|
||||||
) {
|
|
||||||
dependencies.forEach { job ->
|
|
||||||
job.start()
|
|
||||||
job.invokeOnCompletion { error ->
|
|
||||||
if (error != null) cancel(CancellationException("Dependency $job failed with error: ${error.message}"))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return@async block()
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Create a one-to-one goal based on existing goal
|
|
||||||
*/
|
|
||||||
fun <T, R> Deferred<T>.pipe(
|
|
||||||
scope: CoroutineScope,
|
|
||||||
coroutineContext: CoroutineContext = EmptyCoroutineContext,
|
|
||||||
block: suspend CoroutineScope.(T) -> R
|
|
||||||
): Deferred<R> = goal(scope, coroutineContext, listOf(this)) {
|
|
||||||
block(await())
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Create a joining goal.
|
|
||||||
* @param scope the scope for resulting goal. By default use first goal in list
|
|
||||||
*/
|
|
||||||
fun <T, R> Collection<Deferred<T>>.join(
|
|
||||||
scope: CoroutineScope,
|
|
||||||
coroutineContext: CoroutineContext = EmptyCoroutineContext,
|
|
||||||
block: suspend CoroutineScope.(Collection<T>) -> R
|
|
||||||
): Deferred<R> = goal(scope, coroutineContext, this) {
|
|
||||||
block(map { it.await() })
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A joining goal for a map
|
|
||||||
* @param K type of the map key
|
|
||||||
* @param T type of the input goal
|
|
||||||
* @param R type of the result goal
|
|
||||||
*/
|
|
||||||
fun <K, T, R> Map<K, Deferred<T>>.join(
|
|
||||||
scope: CoroutineScope,
|
|
||||||
coroutineContext: CoroutineContext = EmptyCoroutineContext,
|
|
||||||
block: suspend CoroutineScope.(Map<K, T>) -> R
|
|
||||||
): Deferred<R> = goal(scope, coroutineContext, this.values) {
|
|
||||||
block(mapValues { it.value.await() })
|
|
||||||
}
|
|
||||||
|
|
@ -1,14 +1,23 @@
|
|||||||
package hep.dataforge.data
|
package hep.dataforge.data
|
||||||
|
|
||||||
|
import hep.dataforge.meta.Meta
|
||||||
import hep.dataforge.names.NameToken
|
import hep.dataforge.names.NameToken
|
||||||
|
import kotlinx.coroutines.CoroutineScope
|
||||||
import kotlinx.coroutines.Deferred
|
import kotlinx.coroutines.Deferred
|
||||||
import kotlin.reflect.KClass
|
import kotlin.reflect.KClass
|
||||||
import kotlin.reflect.full.isSubclassOf
|
import kotlin.reflect.full.isSubclassOf
|
||||||
|
|
||||||
fun <T : Any, R : Any> Data<T>.safeCast(type: KClass<R>): Data<R>? {
|
@Suppress("UNCHECKED_CAST")
|
||||||
return if (type.isSubclassOf(type)) {
|
fun <T : Any, R : Any> Data<T>.safeCast(type: KClass<out R>): Data<R>? {
|
||||||
@Suppress("UNCHECKED_CAST")
|
return if (this.type.isSubclassOf(type)) {
|
||||||
Data.of(type, task as Deferred<R>, meta)
|
return object : Data<R> {
|
||||||
|
override val meta: Meta get() = this@safeCast.meta
|
||||||
|
override val dependencies: Collection<Goal<*>> get() = this@safeCast.dependencies
|
||||||
|
override val result: Deferred<R>? get() = this@safeCast.result as Deferred<R>
|
||||||
|
override fun startAsync(scope: CoroutineScope): Deferred<R> = this@safeCast.startAsync(scope) as Deferred<R>
|
||||||
|
override fun reset() = this@safeCast.reset()
|
||||||
|
override val type: KClass<out R> = type
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
null
|
null
|
||||||
}
|
}
|
||||||
|
@ -5,4 +5,4 @@ import kotlinx.coroutines.runBlocking
|
|||||||
/**
|
/**
|
||||||
* Block the thread and get data content
|
* Block the thread and get data content
|
||||||
*/
|
*/
|
||||||
fun <T : Any> Data<T>.get(): T = runBlocking { task.await() }
|
fun <T : Any> Data<T>.get(): T = runBlocking { await() }
|
@ -85,8 +85,7 @@ class TaskBuilder(val name: String) {
|
|||||||
val context = this
|
val context = this
|
||||||
PipeAction(
|
PipeAction(
|
||||||
inputType = T::class,
|
inputType = T::class,
|
||||||
outputType = R::class,
|
outputType = R::class
|
||||||
scope = context
|
|
||||||
) { block(context) }
|
) { block(context) }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -103,8 +102,7 @@ class TaskBuilder(val name: String) {
|
|||||||
val context = this
|
val context = this
|
||||||
PipeAction(
|
PipeAction(
|
||||||
inputType = T::class,
|
inputType = T::class,
|
||||||
outputType = R::class,
|
outputType = R::class
|
||||||
scope = context
|
|
||||||
) {
|
) {
|
||||||
//TODO automatically append task meta
|
//TODO automatically append task meta
|
||||||
result = { data ->
|
result = { data ->
|
||||||
@ -125,8 +123,7 @@ class TaskBuilder(val name: String) {
|
|||||||
action(from, to) {
|
action(from, to) {
|
||||||
JoinAction(
|
JoinAction(
|
||||||
inputType = T::class,
|
inputType = T::class,
|
||||||
outputType = R::class,
|
outputType = R::class
|
||||||
scope = this
|
|
||||||
) { block(this@action) }
|
) { block(this@action) }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -144,7 +141,6 @@ class TaskBuilder(val name: String) {
|
|||||||
JoinAction(
|
JoinAction(
|
||||||
inputType = T::class,
|
inputType = T::class,
|
||||||
outputType = R::class,
|
outputType = R::class,
|
||||||
scope = context,
|
|
||||||
action = {
|
action = {
|
||||||
result(
|
result(
|
||||||
actionMeta[TaskModel.MODEL_TARGET_KEY]?.string ?: "@anonymous"
|
actionMeta[TaskModel.MODEL_TARGET_KEY]?.string ?: "@anonymous"
|
||||||
@ -167,8 +163,7 @@ class TaskBuilder(val name: String) {
|
|||||||
action(from, to) {
|
action(from, to) {
|
||||||
SplitAction(
|
SplitAction(
|
||||||
inputType = T::class,
|
inputType = T::class,
|
||||||
outputType = R::class,
|
outputType = R::class
|
||||||
scope = this
|
|
||||||
) { block(this@action) }
|
) { block(this@action) }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -9,7 +9,6 @@ import hep.dataforge.io.MetaFormat
|
|||||||
import hep.dataforge.meta.EmptyMeta
|
import hep.dataforge.meta.EmptyMeta
|
||||||
import hep.dataforge.meta.Meta
|
import hep.dataforge.meta.Meta
|
||||||
import kotlinx.coroutines.Dispatchers
|
import kotlinx.coroutines.Dispatchers
|
||||||
import kotlinx.coroutines.async
|
|
||||||
import kotlinx.coroutines.coroutineScope
|
import kotlinx.coroutines.coroutineScope
|
||||||
import kotlinx.coroutines.withContext
|
import kotlinx.coroutines.withContext
|
||||||
import kotlinx.io.nio.asInput
|
import kotlinx.io.nio.asInput
|
||||||
@ -58,7 +57,7 @@ suspend fun <T : Any> Context.readData(
|
|||||||
} else {
|
} else {
|
||||||
null
|
null
|
||||||
}
|
}
|
||||||
val goal = async {
|
Data(type, externalMeta ?: EmptyMeta){
|
||||||
withContext(Dispatchers.IO) {
|
withContext(Dispatchers.IO) {
|
||||||
format.run {
|
format.run {
|
||||||
Files.newByteChannel(path, StandardOpenOption.READ)
|
Files.newByteChannel(path, StandardOpenOption.READ)
|
||||||
@ -67,6 +66,5 @@ suspend fun <T : Any> Context.readData(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Data.of(type, goal, externalMeta ?: EmptyMeta)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue
Block a user