Some refactoring in goal mechanics
This commit is contained in:
parent
65633bbd0d
commit
43c18bcde7
@ -3,7 +3,7 @@ plugins {
|
|||||||
id("scientifik.publish") version "0.1.4" apply false
|
id("scientifik.publish") version "0.1.4" apply false
|
||||||
}
|
}
|
||||||
|
|
||||||
val dataforgeVersion by extra("0.1.3-dev-10")
|
val dataforgeVersion by extra("0.1.3-dev-11")
|
||||||
|
|
||||||
val bintrayRepo by extra("dataforge")
|
val bintrayRepo by extra("dataforge")
|
||||||
val githubProject by extra("dataforge-core")
|
val githubProject by extra("dataforge-core")
|
||||||
|
@ -1,7 +1,6 @@
|
|||||||
package hep.dataforge.data
|
package hep.dataforge.data
|
||||||
|
|
||||||
import hep.dataforge.meta.Meta
|
import hep.dataforge.meta.Meta
|
||||||
import hep.dataforge.names.Name
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A simple data transformation on a data node
|
* A simple data transformation on a data node
|
||||||
@ -34,19 +33,3 @@ infix fun <T : Any, I : Any, R : Any> Action<T, I>.then(action: Action<I, R>): A
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
///**
|
|
||||||
// * An action that performs the same transformation on each of input data nodes. Null results are ignored.
|
|
||||||
// * The transformation is non-suspending because it is lazy.
|
|
||||||
// */
|
|
||||||
//class PipeAction<in T : Any, out R : Any>(val transform: (Name, Data<T>, Meta) -> Data<R>?) : Action<T, R> {
|
|
||||||
// override fun invoke(node: DataNode<T>, meta: Meta): DataNode<R> = DataNode.build {
|
|
||||||
// node.data().forEach { (name, data) ->
|
|
||||||
// val res = transform(name, data, meta)
|
|
||||||
// if (res != null) {
|
|
||||||
// set(name, res)
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//}
|
|
||||||
|
|
||||||
|
@ -6,9 +6,8 @@ import hep.dataforge.meta.MetaBuilder
|
|||||||
import hep.dataforge.meta.builder
|
import hep.dataforge.meta.builder
|
||||||
import hep.dataforge.names.Name
|
import hep.dataforge.names.Name
|
||||||
import hep.dataforge.names.toName
|
import hep.dataforge.names.toName
|
||||||
|
import kotlinx.coroutines.CoroutineScope
|
||||||
import kotlinx.coroutines.Deferred
|
import kotlinx.coroutines.Deferred
|
||||||
import kotlin.coroutines.CoroutineContext
|
|
||||||
import kotlin.coroutines.EmptyCoroutineContext
|
|
||||||
import kotlin.reflect.KClass
|
import kotlin.reflect.KClass
|
||||||
|
|
||||||
|
|
||||||
@ -79,7 +78,7 @@ class JoinGroupBuilder<T : Any, R : Any>(val actionMeta: Meta) {
|
|||||||
class JoinAction<T : Any, R : Any>(
|
class JoinAction<T : Any, R : Any>(
|
||||||
val inputType: KClass<T>,
|
val inputType: KClass<T>,
|
||||||
val outputType: KClass<R>,
|
val outputType: KClass<R>,
|
||||||
val context: CoroutineContext = EmptyCoroutineContext,
|
val scope: CoroutineScope,
|
||||||
private val action: JoinGroupBuilder<T, R>.() -> Unit
|
private val action: JoinGroupBuilder<T, R>.() -> Unit
|
||||||
) : Action<T, R> {
|
) : Action<T, R> {
|
||||||
|
|
||||||
@ -96,7 +95,7 @@ class JoinAction<T : Any, R : Any>(
|
|||||||
|
|
||||||
val env = ActionEnv(groupName.toName(), laminate.builder())
|
val env = ActionEnv(groupName.toName(), laminate.builder())
|
||||||
|
|
||||||
val goal = goalMap.join(context) { group.result.invoke(env, it) }
|
val goal = goalMap.join(scope) { group.result.invoke(env, it) }
|
||||||
|
|
||||||
val res = Data.of(outputType, goal, env.meta)
|
val res = Data.of(outputType, goal, env.meta)
|
||||||
|
|
||||||
|
@ -2,8 +2,7 @@ package hep.dataforge.data
|
|||||||
|
|
||||||
import hep.dataforge.meta.*
|
import hep.dataforge.meta.*
|
||||||
import hep.dataforge.names.Name
|
import hep.dataforge.names.Name
|
||||||
import kotlin.coroutines.CoroutineContext
|
import kotlinx.coroutines.CoroutineScope
|
||||||
import kotlin.coroutines.EmptyCoroutineContext
|
|
||||||
import kotlin.reflect.KClass
|
import kotlin.reflect.KClass
|
||||||
|
|
||||||
class ActionEnv(val name: Name, val meta: Meta)
|
class ActionEnv(val name: Name, val meta: Meta)
|
||||||
@ -27,7 +26,7 @@ class PipeBuilder<T, R>(var name: Name, var meta: MetaBuilder) {
|
|||||||
class PipeAction<T : Any, R : Any>(
|
class PipeAction<T : Any, R : Any>(
|
||||||
val inputType: KClass<T>,
|
val inputType: KClass<T>,
|
||||||
val outputType: KClass<R>,
|
val outputType: KClass<R>,
|
||||||
val context: CoroutineContext = EmptyCoroutineContext,
|
val scope: CoroutineScope,
|
||||||
private val block: PipeBuilder<T, R>.() -> Unit
|
private val block: PipeBuilder<T, R>.() -> Unit
|
||||||
) : Action<T, R> {
|
) : Action<T, R> {
|
||||||
|
|
||||||
@ -47,7 +46,7 @@ class PipeAction<T : Any, R : Any>(
|
|||||||
//getting new meta
|
//getting new meta
|
||||||
val newMeta = builder.meta.seal()
|
val newMeta = builder.meta.seal()
|
||||||
//creating a goal with custom context if provided
|
//creating a goal with custom context if provided
|
||||||
val goal = data.task.pipe(context) { builder.result(env, it) }
|
val goal = data.task.pipe(scope) { builder.result(env, it) }
|
||||||
//setting the data node
|
//setting the data node
|
||||||
this[newName] = Data.of(outputType, goal, newMeta)
|
this[newName] = Data.of(outputType, goal, newMeta)
|
||||||
}
|
}
|
||||||
@ -57,9 +56,9 @@ class PipeAction<T : Any, R : Any>(
|
|||||||
|
|
||||||
inline fun <reified T : Any, reified R : Any> DataNode<T>.pipe(
|
inline fun <reified T : Any, reified R : Any> DataNode<T>.pipe(
|
||||||
meta: Meta,
|
meta: Meta,
|
||||||
context: CoroutineContext = EmptyCoroutineContext,
|
scope: CoroutineScope,
|
||||||
noinline action: PipeBuilder<T, R>.() -> Unit
|
noinline action: PipeBuilder<T, R>.() -> Unit
|
||||||
): DataNode<R> = PipeAction(T::class, R::class, context, action).invoke(this, meta)
|
): DataNode<R> = PipeAction(T::class, R::class, scope, action).invoke(this, meta)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@ -6,9 +6,8 @@ import hep.dataforge.meta.MetaBuilder
|
|||||||
import hep.dataforge.meta.builder
|
import hep.dataforge.meta.builder
|
||||||
import hep.dataforge.names.Name
|
import hep.dataforge.names.Name
|
||||||
import hep.dataforge.names.toName
|
import hep.dataforge.names.toName
|
||||||
|
import kotlinx.coroutines.CoroutineScope
|
||||||
import kotlin.collections.set
|
import kotlin.collections.set
|
||||||
import kotlin.coroutines.CoroutineContext
|
|
||||||
import kotlin.coroutines.EmptyCoroutineContext
|
|
||||||
import kotlin.reflect.KClass
|
import kotlin.reflect.KClass
|
||||||
|
|
||||||
|
|
||||||
@ -37,7 +36,7 @@ class SplitBuilder<T : Any, R : Any>(val name: Name, val meta: Meta) {
|
|||||||
class SplitAction<T : Any, R : Any>(
|
class SplitAction<T : Any, R : Any>(
|
||||||
val inputType: KClass<T>,
|
val inputType: KClass<T>,
|
||||||
val outputType: KClass<R>,
|
val outputType: KClass<R>,
|
||||||
val context: CoroutineContext = EmptyCoroutineContext,
|
val scope: CoroutineScope,
|
||||||
private val action: SplitBuilder<T, R>.() -> Unit
|
private val action: SplitBuilder<T, R>.() -> Unit
|
||||||
) : Action<T, R> {
|
) : Action<T, R> {
|
||||||
|
|
||||||
@ -58,7 +57,7 @@ class SplitAction<T : Any, R : Any>(
|
|||||||
|
|
||||||
rule(env)
|
rule(env)
|
||||||
|
|
||||||
val goal = data.task.pipe(context) { env.result(it) }
|
val goal = data.task.pipe(scope) { env.result(it) }
|
||||||
|
|
||||||
val res = Data.of(outputType, goal, env.meta)
|
val res = Data.of(outputType, goal, env.meta)
|
||||||
set(env.name, res)
|
set(env.name, res)
|
||||||
|
@ -10,11 +10,12 @@ import kotlin.coroutines.EmptyCoroutineContext
|
|||||||
* **Important:** Unlike regular deferred, the [Deferred] is started lazily, so the actual calculation is called only when result is requested.
|
* **Important:** Unlike regular deferred, the [Deferred] is started lazily, so the actual calculation is called only when result is requested.
|
||||||
*/
|
*/
|
||||||
fun <T> goal(
|
fun <T> goal(
|
||||||
context: CoroutineContext = EmptyCoroutineContext,
|
scope: CoroutineScope,
|
||||||
|
coroutineContext: CoroutineContext = EmptyCoroutineContext,
|
||||||
dependencies: Collection<Job> = emptyList(),
|
dependencies: Collection<Job> = emptyList(),
|
||||||
block: suspend CoroutineScope.() -> T
|
block: suspend CoroutineScope.() -> T
|
||||||
): Deferred<T> = CoroutineScope(context).async(
|
): Deferred<T> = scope.async(
|
||||||
CoroutineMonitor() + Dependencies(dependencies),
|
coroutineContext + CoroutineMonitor() + Dependencies(dependencies),
|
||||||
start = CoroutineStart.LAZY
|
start = CoroutineStart.LAZY
|
||||||
) {
|
) {
|
||||||
dependencies.forEach { job ->
|
dependencies.forEach { job ->
|
||||||
@ -30,9 +31,10 @@ fun <T> goal(
|
|||||||
* Create a one-to-one goal based on existing goal
|
* Create a one-to-one goal based on existing goal
|
||||||
*/
|
*/
|
||||||
fun <T, R> Deferred<T>.pipe(
|
fun <T, R> Deferred<T>.pipe(
|
||||||
context: CoroutineContext = EmptyCoroutineContext,
|
scope: CoroutineScope,
|
||||||
|
coroutineContext: CoroutineContext = EmptyCoroutineContext,
|
||||||
block: suspend CoroutineScope.(T) -> R
|
block: suspend CoroutineScope.(T) -> R
|
||||||
): Deferred<R> = goal(this + context,listOf(this)) {
|
): Deferred<R> = goal(scope, coroutineContext, listOf(this)) {
|
||||||
block(await())
|
block(await())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -41,9 +43,10 @@ fun <T, R> Deferred<T>.pipe(
|
|||||||
* @param scope the scope for resulting goal. By default use first goal in list
|
* @param scope the scope for resulting goal. By default use first goal in list
|
||||||
*/
|
*/
|
||||||
fun <T, R> Collection<Deferred<T>>.join(
|
fun <T, R> Collection<Deferred<T>>.join(
|
||||||
context: CoroutineContext = EmptyCoroutineContext,
|
scope: CoroutineScope,
|
||||||
|
coroutineContext: CoroutineContext = EmptyCoroutineContext,
|
||||||
block: suspend CoroutineScope.(Collection<T>) -> R
|
block: suspend CoroutineScope.(Collection<T>) -> R
|
||||||
): Deferred<R> = goal(context, this) {
|
): Deferred<R> = goal(scope, coroutineContext, this) {
|
||||||
block(map { it.await() })
|
block(map { it.await() })
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -54,9 +57,10 @@ fun <T, R> Collection<Deferred<T>>.join(
|
|||||||
* @param R type of the result goal
|
* @param R type of the result goal
|
||||||
*/
|
*/
|
||||||
fun <K, T, R> Map<K, Deferred<T>>.join(
|
fun <K, T, R> Map<K, Deferred<T>>.join(
|
||||||
context: CoroutineContext = EmptyCoroutineContext,
|
scope: CoroutineScope,
|
||||||
|
coroutineContext: CoroutineContext = EmptyCoroutineContext,
|
||||||
block: suspend CoroutineScope.(Map<K, T>) -> R
|
block: suspend CoroutineScope.(Map<K, T>) -> R
|
||||||
): Deferred<R> = goal(context, this.values) {
|
): Deferred<R> = goal(scope, coroutineContext, this.values) {
|
||||||
block(mapValues { it.value.await() })
|
block(mapValues { it.value.await() })
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -85,7 +85,8 @@ class TaskBuilder(val name: String) {
|
|||||||
val context = this
|
val context = this
|
||||||
PipeAction(
|
PipeAction(
|
||||||
inputType = T::class,
|
inputType = T::class,
|
||||||
outputType = R::class
|
outputType = R::class,
|
||||||
|
scope = context
|
||||||
) { block(context) }
|
) { block(context) }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -102,7 +103,8 @@ class TaskBuilder(val name: String) {
|
|||||||
val context = this
|
val context = this
|
||||||
PipeAction(
|
PipeAction(
|
||||||
inputType = T::class,
|
inputType = T::class,
|
||||||
outputType = R::class
|
outputType = R::class,
|
||||||
|
scope = context
|
||||||
) {
|
) {
|
||||||
//TODO automatically append task meta
|
//TODO automatically append task meta
|
||||||
result = { data ->
|
result = { data ->
|
||||||
@ -123,7 +125,8 @@ class TaskBuilder(val name: String) {
|
|||||||
action(from, to) {
|
action(from, to) {
|
||||||
JoinAction(
|
JoinAction(
|
||||||
inputType = T::class,
|
inputType = T::class,
|
||||||
outputType = R::class
|
outputType = R::class,
|
||||||
|
scope = this
|
||||||
) { block(this@action) }
|
) { block(this@action) }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -141,6 +144,7 @@ class TaskBuilder(val name: String) {
|
|||||||
JoinAction(
|
JoinAction(
|
||||||
inputType = T::class,
|
inputType = T::class,
|
||||||
outputType = R::class,
|
outputType = R::class,
|
||||||
|
scope = context,
|
||||||
action = {
|
action = {
|
||||||
result(
|
result(
|
||||||
actionMeta[TaskModel.MODEL_TARGET_KEY]?.string ?: "@anonymous"
|
actionMeta[TaskModel.MODEL_TARGET_KEY]?.string ?: "@anonymous"
|
||||||
@ -163,7 +167,8 @@ class TaskBuilder(val name: String) {
|
|||||||
action(from, to) {
|
action(from, to) {
|
||||||
SplitAction(
|
SplitAction(
|
||||||
inputType = T::class,
|
inputType = T::class,
|
||||||
outputType = R::class
|
outputType = R::class,
|
||||||
|
scope = this
|
||||||
) { block(this@action) }
|
) { block(this@action) }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2,7 +2,6 @@ package hep.dataforge.workspace
|
|||||||
|
|
||||||
import hep.dataforge.context.Context
|
import hep.dataforge.context.Context
|
||||||
import hep.dataforge.data.Data
|
import hep.dataforge.data.Data
|
||||||
import hep.dataforge.data.goal
|
|
||||||
import hep.dataforge.descriptors.NodeDescriptor
|
import hep.dataforge.descriptors.NodeDescriptor
|
||||||
import hep.dataforge.io.IOFormat
|
import hep.dataforge.io.IOFormat
|
||||||
import hep.dataforge.io.JsonMetaFormat
|
import hep.dataforge.io.JsonMetaFormat
|
||||||
@ -10,6 +9,7 @@ import hep.dataforge.io.MetaFormat
|
|||||||
import hep.dataforge.meta.EmptyMeta
|
import hep.dataforge.meta.EmptyMeta
|
||||||
import hep.dataforge.meta.Meta
|
import hep.dataforge.meta.Meta
|
||||||
import kotlinx.coroutines.Dispatchers
|
import kotlinx.coroutines.Dispatchers
|
||||||
|
import kotlinx.coroutines.async
|
||||||
import kotlinx.coroutines.coroutineScope
|
import kotlinx.coroutines.coroutineScope
|
||||||
import kotlinx.coroutines.withContext
|
import kotlinx.coroutines.withContext
|
||||||
import kotlinx.io.nio.asInput
|
import kotlinx.io.nio.asInput
|
||||||
@ -58,7 +58,7 @@ suspend fun <T : Any> Context.readData(
|
|||||||
} else {
|
} else {
|
||||||
null
|
null
|
||||||
}
|
}
|
||||||
val goal = goal {
|
val goal = async {
|
||||||
withContext(Dispatchers.IO) {
|
withContext(Dispatchers.IO) {
|
||||||
format.run {
|
format.run {
|
||||||
Files.newByteChannel(path, StandardOpenOption.READ)
|
Files.newByteChannel(path, StandardOpenOption.READ)
|
||||||
|
Loading…
Reference in New Issue
Block a user