From 5921556254b800a86bcfe9f4d1547e6aafe2a7f4 Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Fri, 9 Aug 2019 16:31:59 +0300 Subject: [PATCH] Reworked Data and Goal mechanics --- .../kotlin/hep/dataforge/data/Data.kt | 160 ++++++++++++++---- .../kotlin/hep/dataforge/data/DataNode.kt | 10 +- .../kotlin/hep/dataforge/data/Goal.kt | 117 +++++++++++++ .../kotlin/hep/dataforge/data/JoinAction.kt | 9 +- .../kotlin/hep/dataforge/data/PipeAction.kt | 10 +- .../kotlin/hep/dataforge/data/SplitAction.kt | 6 +- .../kotlin/hep/dataforge/data/goals.kt | 66 -------- .../kotlin/hep/dataforge/data/CastDataNode.kt | 17 +- .../kotlin/hep/dataforge/data/_data.kt | 2 +- .../hep/dataforge/workspace/TaskBuilder.kt | 13 +- .../hep/dataforge/workspace/fileData.kt | 4 +- 11 files changed, 276 insertions(+), 138 deletions(-) create mode 100644 dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Goal.kt delete mode 100644 dataforge-data/src/commonMain/kotlin/hep/dataforge/data/goals.kt diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Data.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Data.kt index 6bbd06f8..9b0d9027 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Data.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Data.kt @@ -1,15 +1,17 @@ package hep.dataforge.data +import hep.dataforge.meta.EmptyMeta import hep.dataforge.meta.Meta import hep.dataforge.meta.MetaRepr -import kotlinx.coroutines.CompletableDeferred -import kotlinx.coroutines.Deferred +import kotlinx.coroutines.CoroutineScope +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext import kotlin.reflect.KClass /** * A data element characterized by its meta */ -interface Data : MetaRepr { +interface Data : Goal, MetaRepr { /** * Type marker for the data. The type is known before the calculation takes place so it could be checked. */ @@ -19,52 +21,148 @@ interface Data : MetaRepr { */ val meta: Meta - /** - * Lazy data value - */ - val task: Deferred - override fun toMeta(): Meta = meta companion object { const val TYPE = "data" - fun of(type: KClass, goal: Deferred, meta: Meta): Data = DataImpl(type, goal, meta) + operator fun invoke( + type: KClass, + meta: Meta = EmptyMeta, + context: CoroutineContext = EmptyCoroutineContext, + dependencies: Collection> = emptyList(), + block: suspend CoroutineScope.() -> T + ): Data = DynamicData(type, meta, context, dependencies, block) - inline fun of(goal: Deferred, meta: Meta): Data = of(T::class, goal, meta) + operator inline fun invoke( + meta: Meta = EmptyMeta, + context: CoroutineContext = EmptyCoroutineContext, + dependencies: Collection> = emptyList(), + noinline block: suspend CoroutineScope.() -> T + ): Data = invoke(T::class, meta, context, dependencies, block) - fun of(name: String, type: KClass, goal: Deferred, meta: Meta): Data = - NamedData(name, of(type, goal, meta)) + operator fun invoke( + name: String, + type: KClass, + meta: Meta = EmptyMeta, + context: CoroutineContext = EmptyCoroutineContext, + dependencies: Collection> = emptyList(), + block: suspend CoroutineScope.() -> T + ): Data = NamedData(name, invoke(type, meta, context, dependencies, block)) - inline fun of(name: String, goal: Deferred, meta: Meta): Data = - of(name, T::class, goal, meta) + operator inline fun invoke( + name: String, + meta: Meta = EmptyMeta, + context: CoroutineContext = EmptyCoroutineContext, + dependencies: Collection> = emptyList(), + noinline block: suspend CoroutineScope.() -> T + ): Data = + invoke(name, T::class, meta, context, dependencies, block) - fun static(value: T, meta: Meta): Data = - DataImpl(value::class, CompletableDeferred(value), meta) + fun static(value: T, meta: Meta = EmptyMeta): Data = + StaticData(value, meta) + } +} + + +fun Data.cast(type: KClass): Data { + return object : Data by this { + override val type: KClass = type } } /** * Upcast a [Data] to a supertype */ -inline fun Data.cast(): Data { - return Data.of(R::class, task, meta) -} +inline fun Data.cast(): Data = cast(R::class) -fun Data.cast(type: KClass): Data { - return Data.of(type, task, meta) -} -suspend fun Data.await(): T = task.await() - -/** - * Generic Data implementation - */ -private class DataImpl( +class DynamicData( override val type: KClass, - override val task: Deferred, - override val meta: Meta -) : Data + override val meta: Meta = EmptyMeta, + context: CoroutineContext = EmptyCoroutineContext, + dependencies: Collection> = emptyList(), + block: suspend CoroutineScope.() -> T +) : Data, DynamicGoal(context, dependencies, block) + +class StaticData( + value: T, + override val meta: Meta = EmptyMeta +) : Data, StaticGoal(value) { + override val type: KClass get() = value::class +} class NamedData(val name: String, data: Data) : Data by data +fun Data.pipe( + outputType: KClass, + coroutineContext: CoroutineContext = EmptyCoroutineContext, + meta: Meta = this.meta, + block: suspend CoroutineScope.(T) -> R +): Data = DynamicData(outputType, meta, coroutineContext, listOf(this)) { + block(await(this)) +} + + +/** + * Create a data pipe + */ +inline fun Data.pipe( + coroutineContext: CoroutineContext = EmptyCoroutineContext, + meta: Meta = this.meta, + noinline block: suspend CoroutineScope.(T) -> R +): Data = DynamicData(R::class, meta, coroutineContext, listOf(this)) { + block(await(this)) +} + +/** + * Create a joined data. + */ +inline fun Collection>.join( + coroutineContext: CoroutineContext = EmptyCoroutineContext, + meta: Meta, + noinline block: suspend CoroutineScope.(Collection) -> R +): Data = DynamicData( + R::class, + meta, + coroutineContext, + this +) { + block(map { this.run { it.await(this) } }) +} + +fun Map>.join( + outputType: KClass, + coroutineContext: CoroutineContext = EmptyCoroutineContext, + meta: Meta, + block: suspend CoroutineScope.(Map) -> R +): DynamicData = DynamicData( + outputType, + meta, + coroutineContext, + this.values +) { + block(mapValues { it.value.await(this) }) +} + + +/** + * A joining of multiple data into a single one + * @param K type of the map key + * @param T type of the input goal + * @param R type of the result goal + */ +inline fun Map>.join( + coroutineContext: CoroutineContext = EmptyCoroutineContext, + meta: Meta, + noinline block: suspend CoroutineScope.(Map) -> R +): DynamicData = DynamicData( + R::class, + meta, + coroutineContext, + this.values +) { + block(mapValues { it.value.await(this) }) +} + + diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataNode.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataNode.kt index 08c5af0c..a407b512 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataNode.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/DataNode.kt @@ -49,19 +49,19 @@ val DataItem?.data: Data? get() = (this as? DataItem.Leaf)?.v /** * Start computation for all goals in data node */ -fun DataNode<*>.startAll(): Unit = items.values.forEach { +fun DataNode<*>.startAll(scope: CoroutineScope): Unit = items.values.forEach { when (it) { - is DataItem.Node<*> -> it.value.startAll() - is DataItem.Leaf<*> -> it.value.task.start() + is DataItem.Node<*> -> it.value.startAll(scope) + is DataItem.Leaf<*> -> it.value.start(scope) } } fun DataNode<*>.joinAll(scope: CoroutineScope): Job = scope.launch { - startAll() + startAll(scope) items.forEach { when (val value = it.value) { is DataItem.Node -> value.value.joinAll(this).join() - is DataItem.Leaf -> value.value.task.await() + is DataItem.Leaf -> value.value.await(scope) } } } diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Goal.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Goal.kt new file mode 100644 index 00000000..54bb743e --- /dev/null +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/Goal.kt @@ -0,0 +1,117 @@ +package hep.dataforge.data + +import kotlinx.coroutines.* +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext + +interface Goal { + val dependencies: Collection> + /** + * Returns current running coroutine if the goal is started + */ + val result: Deferred? + + /** + * Get ongoing computation or start a new one. + * Does not guarantee thread safety. In case of multi-thread access, could create orphan computations. + */ + fun startAsync(scope: CoroutineScope): Deferred + + suspend fun CoroutineScope.await(): T = startAsync(this).await() + + /** + * Reset the computation + */ + fun reset() + + companion object { + + } +} + +fun Goal<*>.start(scope: CoroutineScope): Job = startAsync(scope) + +val Goal<*>.isComplete get() = result?.isCompleted ?: false + +suspend fun Goal.await(scope: CoroutineScope): T = scope.await() + +open class StaticGoal(val value: T) : Goal { + override val dependencies: Collection> get() = emptyList() + override val result: Deferred = CompletableDeferred(value) + + override fun startAsync(scope: CoroutineScope): Deferred = result + + override fun reset() { + //doNothing + } +} + +open class DynamicGoal( + val coroutineContext: CoroutineContext = EmptyCoroutineContext, + override val dependencies: Collection> = emptyList(), + val block: suspend CoroutineScope.() -> T +) : Goal { + + final override var result: Deferred? = null + private set + + /** + * Get ongoing computation or start a new one. + * Does not guarantee thread safety. In case of multi-thread access, could create orphan computations. + */ + override fun startAsync(scope: CoroutineScope): Deferred { + val startedDependencies = this.dependencies.map { goal -> + goal.startAsync(scope) + } + return result ?: scope.async(coroutineContext + CoroutineMonitor() + Dependencies(startedDependencies)) { + startedDependencies.forEach { deferred -> + deferred.invokeOnCompletion { error -> + if (error != null) cancel(CancellationException("Dependency $deferred failed with error: ${error.message}")) + } + } + block() + }.also { result = it } + } + + /** + * Reset the computation + */ + override fun reset() { + result?.cancel() + result = null + } +} + +/** + * Create a one-to-one goal based on existing goal + */ +fun Goal.pipe( + coroutineContext: CoroutineContext = EmptyCoroutineContext, + block: suspend CoroutineScope.(T) -> R +): Goal = DynamicGoal(coroutineContext, listOf(this)) { + block(await(this)) +} + +/** + * Create a joining goal. + */ +fun Collection>.join( + coroutineContext: CoroutineContext = EmptyCoroutineContext, + block: suspend CoroutineScope.(Collection) -> R +): Goal = DynamicGoal(coroutineContext, this) { + block(map { this.run { it.await(this) } }) +} + +/** + * A joining goal for a map + * @param K type of the map key + * @param T type of the input goal + * @param R type of the result goal + */ +fun Map>.join( + coroutineContext: CoroutineContext = EmptyCoroutineContext, + block: suspend CoroutineScope.(Map) -> R +): Goal = DynamicGoal(coroutineContext, this.values) { + block(mapValues { it.value.await(this) }) +} + diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/JoinAction.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/JoinAction.kt index 636d4a87..5f0f5845 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/JoinAction.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/JoinAction.kt @@ -6,8 +6,6 @@ import hep.dataforge.meta.MetaBuilder import hep.dataforge.meta.builder import hep.dataforge.names.Name import hep.dataforge.names.toName -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Deferred import kotlin.reflect.KClass @@ -78,7 +76,6 @@ class JoinGroupBuilder(val actionMeta: Meta) { class JoinAction( val inputType: KClass, val outputType: KClass, - val scope: CoroutineScope, private val action: JoinGroupBuilder.() -> Unit ) : Action { @@ -89,15 +86,13 @@ class JoinAction( val laminate = Laminate(group.meta, meta) - val goalMap: Map> = group.node.dataSequence().associate { it.first to it.second.task } + val dataMap = group.node.dataSequence().associate { it } val groupName: String = group.name; val env = ActionEnv(groupName.toName(), laminate.builder()) - val goal = goalMap.join(scope) { group.result.invoke(env, it) } - - val res = Data.of(outputType, goal, env.meta) + val res: DynamicData = dataMap.join(outputType, meta = laminate) { group.result.invoke(env, it) } set(env.name, res) } diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/PipeAction.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/PipeAction.kt index 7e2099bb..c84e5a13 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/PipeAction.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/PipeAction.kt @@ -2,7 +2,6 @@ package hep.dataforge.data import hep.dataforge.meta.* import hep.dataforge.names.Name -import kotlinx.coroutines.CoroutineScope import kotlin.reflect.KClass class ActionEnv(val name: Name, val meta: Meta) @@ -26,7 +25,6 @@ class PipeBuilder(var name: Name, var meta: MetaBuilder) { class PipeAction( val inputType: KClass, val outputType: KClass, - val scope: CoroutineScope, private val block: PipeBuilder.() -> Unit ) : Action { @@ -45,10 +43,9 @@ class PipeAction( val newName = builder.name //getting new meta val newMeta = builder.meta.seal() - //creating a goal with custom context if provided - val goal = data.task.pipe(scope) { builder.result(env, it) } + val newData = data.pipe(outputType, meta = newMeta) { builder.result(env, it) } //setting the data node - this[newName] = Data.of(outputType, goal, newMeta) + this[newName] = newData } } } @@ -56,9 +53,8 @@ class PipeAction( inline fun DataNode.pipe( meta: Meta, - scope: CoroutineScope, noinline action: PipeBuilder.() -> Unit -): DataNode = PipeAction(T::class, R::class, scope, action).invoke(this, meta) +): DataNode = PipeAction(T::class, R::class, action).invoke(this, meta) diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/SplitAction.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/SplitAction.kt index d2545973..be9764a6 100644 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/SplitAction.kt +++ b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/SplitAction.kt @@ -6,7 +6,6 @@ import hep.dataforge.meta.MetaBuilder import hep.dataforge.meta.builder import hep.dataforge.names.Name import hep.dataforge.names.toName -import kotlinx.coroutines.CoroutineScope import kotlin.collections.set import kotlin.reflect.KClass @@ -36,7 +35,6 @@ class SplitBuilder(val name: Name, val meta: Meta) { class SplitAction( val inputType: KClass, val outputType: KClass, - val scope: CoroutineScope, private val action: SplitBuilder.() -> Unit ) : Action { @@ -57,9 +55,7 @@ class SplitAction( rule(env) - val goal = data.task.pipe(scope) { env.result(it) } - - val res = Data.of(outputType, goal, env.meta) + val res = data.pipe(outputType, meta = env.meta) { env.result(it) } set(env.name, res) } } diff --git a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/goals.kt b/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/goals.kt deleted file mode 100644 index 82693c96..00000000 --- a/dataforge-data/src/commonMain/kotlin/hep/dataforge/data/goals.kt +++ /dev/null @@ -1,66 +0,0 @@ -package hep.dataforge.data - -import kotlinx.coroutines.* -import kotlin.coroutines.CoroutineContext -import kotlin.coroutines.EmptyCoroutineContext - -/** - * Create a new [Deferred] with given [dependencies] and execution [block]. The block takes monitor as parameter. - * - * **Important:** Unlike regular deferred, the [Deferred] is started lazily, so the actual calculation is called only when result is requested. - */ -fun goal( - scope: CoroutineScope, - coroutineContext: CoroutineContext = EmptyCoroutineContext, - dependencies: Collection = emptyList(), - block: suspend CoroutineScope.() -> T -): Deferred = scope.async( - coroutineContext + CoroutineMonitor() + Dependencies(dependencies), - start = CoroutineStart.LAZY -) { - dependencies.forEach { job -> - job.start() - job.invokeOnCompletion { error -> - if (error != null) cancel(CancellationException("Dependency $job failed with error: ${error.message}")) - } - } - return@async block() -} - -/** - * Create a one-to-one goal based on existing goal - */ -fun Deferred.pipe( - scope: CoroutineScope, - coroutineContext: CoroutineContext = EmptyCoroutineContext, - block: suspend CoroutineScope.(T) -> R -): Deferred = goal(scope, coroutineContext, listOf(this)) { - block(await()) -} - -/** - * Create a joining goal. - * @param scope the scope for resulting goal. By default use first goal in list - */ -fun Collection>.join( - scope: CoroutineScope, - coroutineContext: CoroutineContext = EmptyCoroutineContext, - block: suspend CoroutineScope.(Collection) -> R -): Deferred = goal(scope, coroutineContext, this) { - block(map { it.await() }) -} - -/** - * A joining goal for a map - * @param K type of the map key - * @param T type of the input goal - * @param R type of the result goal - */ -fun Map>.join( - scope: CoroutineScope, - coroutineContext: CoroutineContext = EmptyCoroutineContext, - block: suspend CoroutineScope.(Map) -> R -): Deferred = goal(scope, coroutineContext, this.values) { - block(mapValues { it.value.await() }) -} - diff --git a/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/CastDataNode.kt b/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/CastDataNode.kt index 512af4a1..324864fc 100644 --- a/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/CastDataNode.kt +++ b/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/CastDataNode.kt @@ -1,14 +1,23 @@ package hep.dataforge.data +import hep.dataforge.meta.Meta import hep.dataforge.names.NameToken +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Deferred import kotlin.reflect.KClass import kotlin.reflect.full.isSubclassOf -fun Data.safeCast(type: KClass): Data? { - return if (type.isSubclassOf(type)) { - @Suppress("UNCHECKED_CAST") - Data.of(type, task as Deferred, meta) +@Suppress("UNCHECKED_CAST") +fun Data.safeCast(type: KClass): Data? { + return if (this.type.isSubclassOf(type)) { + return object : Data { + override val meta: Meta get() = this@safeCast.meta + override val dependencies: Collection> get() = this@safeCast.dependencies + override val result: Deferred? get() = this@safeCast.result as Deferred + override fun startAsync(scope: CoroutineScope): Deferred = this@safeCast.startAsync(scope) as Deferred + override fun reset() = this@safeCast.reset() + override val type: KClass = type + } } else { null } diff --git a/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/_data.kt b/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/_data.kt index df6cc33e..00c9e656 100644 --- a/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/_data.kt +++ b/dataforge-data/src/jvmMain/kotlin/hep/dataforge/data/_data.kt @@ -5,4 +5,4 @@ import kotlinx.coroutines.runBlocking /** * Block the thread and get data content */ -fun Data.get(): T = runBlocking { task.await() } \ No newline at end of file +fun Data.get(): T = runBlocking { await() } \ No newline at end of file diff --git a/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/TaskBuilder.kt b/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/TaskBuilder.kt index 0b439032..175c4d87 100644 --- a/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/TaskBuilder.kt +++ b/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/TaskBuilder.kt @@ -85,8 +85,7 @@ class TaskBuilder(val name: String) { val context = this PipeAction( inputType = T::class, - outputType = R::class, - scope = context + outputType = R::class ) { block(context) } } } @@ -103,8 +102,7 @@ class TaskBuilder(val name: String) { val context = this PipeAction( inputType = T::class, - outputType = R::class, - scope = context + outputType = R::class ) { //TODO automatically append task meta result = { data -> @@ -125,8 +123,7 @@ class TaskBuilder(val name: String) { action(from, to) { JoinAction( inputType = T::class, - outputType = R::class, - scope = this + outputType = R::class ) { block(this@action) } } } @@ -144,7 +141,6 @@ class TaskBuilder(val name: String) { JoinAction( inputType = T::class, outputType = R::class, - scope = context, action = { result( actionMeta[TaskModel.MODEL_TARGET_KEY]?.string ?: "@anonymous" @@ -167,8 +163,7 @@ class TaskBuilder(val name: String) { action(from, to) { SplitAction( inputType = T::class, - outputType = R::class, - scope = this + outputType = R::class ) { block(this@action) } } } diff --git a/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/fileData.kt b/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/fileData.kt index 9d2efc95..25350006 100644 --- a/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/fileData.kt +++ b/dataforge-workspace/src/jvmMain/kotlin/hep/dataforge/workspace/fileData.kt @@ -9,7 +9,6 @@ import hep.dataforge.io.MetaFormat import hep.dataforge.meta.EmptyMeta import hep.dataforge.meta.Meta import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.async import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.withContext import kotlinx.io.nio.asInput @@ -58,7 +57,7 @@ suspend fun Context.readData( } else { null } - val goal = async { + Data(type, externalMeta ?: EmptyMeta){ withContext(Dispatchers.IO) { format.run { Files.newByteChannel(path, StandardOpenOption.READ) @@ -67,6 +66,5 @@ suspend fun Context.readData( } } } - Data.of(type, goal, externalMeta ?: EmptyMeta) } } \ No newline at end of file