Build passes, but a lot of tests fails. Need more work.

This commit is contained in:
Alexander Nozik 2019-08-07 21:14:20 +03:00
parent 72954a8370
commit 65633bbd0d
8 changed files with 31 additions and 24 deletions

View File

@ -9,11 +9,14 @@ import kotlin.coroutines.EmptyCoroutineContext
* *
* **Important:** Unlike regular deferred, the [Deferred] is started lazily, so the actual calculation is called only when result is requested. * **Important:** Unlike regular deferred, the [Deferred] is started lazily, so the actual calculation is called only when result is requested.
*/ */
fun <T> CoroutineScope.task( fun <T> goal(
context: CoroutineContext, context: CoroutineContext = EmptyCoroutineContext,
dependencies: Collection<Job> = emptyList(), dependencies: Collection<Job> = emptyList(),
block: suspend CoroutineScope.() -> T block: suspend CoroutineScope.() -> T
): Deferred<T> = async(context + CoroutineMonitor() + Dependencies(dependencies), start = CoroutineStart.LAZY) { ): Deferred<T> = CoroutineScope(context).async(
CoroutineMonitor() + Dependencies(dependencies),
start = CoroutineStart.LAZY
) {
dependencies.forEach { job -> dependencies.forEach { job ->
job.start() job.start()
job.invokeOnCompletion { error -> job.invokeOnCompletion { error ->
@ -29,7 +32,7 @@ fun <T> CoroutineScope.task(
fun <T, R> Deferred<T>.pipe( fun <T, R> Deferred<T>.pipe(
context: CoroutineContext = EmptyCoroutineContext, context: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.(T) -> R block: suspend CoroutineScope.(T) -> R
): Deferred<R> = CoroutineScope(this + context).task(context, listOf(this)) { ): Deferred<R> = goal(this + context,listOf(this)) {
block(await()) block(await())
} }
@ -38,10 +41,9 @@ fun <T, R> Deferred<T>.pipe(
* @param scope the scope for resulting goal. By default use first goal in list * @param scope the scope for resulting goal. By default use first goal in list
*/ */
fun <T, R> Collection<Deferred<T>>.join( fun <T, R> Collection<Deferred<T>>.join(
scope: CoroutineScope,
context: CoroutineContext = EmptyCoroutineContext, context: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.(Collection<T>) -> R block: suspend CoroutineScope.(Collection<T>) -> R
): Deferred<R> = scope.task(context, this) { ): Deferred<R> = goal(context, this) {
block(map { it.await() }) block(map { it.await() })
} }
@ -54,7 +56,7 @@ fun <T, R> Collection<Deferred<T>>.join(
fun <K, T, R> Map<K, Deferred<T>>.join( fun <K, T, R> Map<K, Deferred<T>>.join(
context: CoroutineContext = EmptyCoroutineContext, context: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.(Map<K, T>) -> R block: suspend CoroutineScope.(Map<K, T>) -> R
): Deferred<R> = CoroutineScope(values.first() + context).task(context, this.values) { ): Deferred<R> = goal(context, this.values) {
block(mapValues { it.value.await() }) block(mapValues { it.value.await() })
} }

View File

@ -186,7 +186,12 @@ operator fun <M : MetaNode<M>> MetaNode<M>?.get(key: NameToken): MetaItem<M>? =
abstract class MetaBase: Meta{ abstract class MetaBase: Meta{
override fun equals(other: Any?): Boolean = if(other is Meta) { override fun equals(other: Any?): Boolean = if(other is Meta) {
items == other.items this.items == other.items
// val items = items
// val otherItems = other.items
// (items.keys == otherItems.keys) && items.keys.all {
// items[it] == otherItems[it]
// }
} else { } else {
false false
} }

View File

@ -8,6 +8,7 @@ package hep.dataforge.workspace
import hep.dataforge.data.DataFilter import hep.dataforge.data.DataFilter
import hep.dataforge.data.DataTree import hep.dataforge.data.DataTree
import hep.dataforge.data.DataTreeBuilder import hep.dataforge.data.DataTreeBuilder
import hep.dataforge.data.dataSequence
import hep.dataforge.meta.* import hep.dataforge.meta.*
import hep.dataforge.names.EmptyName import hep.dataforge.names.EmptyName
import hep.dataforge.names.Name import hep.dataforge.names.Name
@ -51,7 +52,7 @@ data class TaskModel(
*/ */
fun TaskModel.buildInput(workspace: Workspace): DataTree<Any> { fun TaskModel.buildInput(workspace: Workspace): DataTree<Any> {
return DataTreeBuilder(Any::class).apply { return DataTreeBuilder(Any::class).apply {
dependencies.asSequence().flatMap { it.apply(workspace).data }.forEach { (name, data) -> dependencies.asSequence().flatMap { it.apply(workspace).dataSequence() }.forEach { (name, data) ->
//TODO add concise error on replacement //TODO add concise error on replacement
this[name] = data this[name] = data
} }

View File

@ -3,6 +3,7 @@ package hep.dataforge.workspace
import hep.dataforge.context.ContextAware import hep.dataforge.context.ContextAware
import hep.dataforge.data.Data import hep.dataforge.data.Data
import hep.dataforge.data.DataNode import hep.dataforge.data.DataNode
import hep.dataforge.data.dataSequence
import hep.dataforge.meta.Meta import hep.dataforge.meta.Meta
import hep.dataforge.meta.MetaBuilder import hep.dataforge.meta.MetaBuilder
import hep.dataforge.meta.buildMeta import hep.dataforge.meta.buildMeta
@ -33,8 +34,8 @@ interface Workspace : ContextAware, Provider {
return when (target) { return when (target) {
"target", Meta.TYPE -> targets.mapKeys { it.key.toName() } "target", Meta.TYPE -> targets.mapKeys { it.key.toName() }
Task.TYPE -> tasks Task.TYPE -> tasks
Data.TYPE -> data.data.toMap() Data.TYPE -> data.dataSequence().toMap()
DataNode.TYPE -> data.nodes.toMap() //DataNode.TYPE -> data.nodes.toMap()
else -> emptyMap() else -> emptyMap()
} }
} }

View File

@ -8,8 +8,6 @@ import hep.dataforge.data.DataTreeBuilder
import hep.dataforge.meta.* import hep.dataforge.meta.*
import hep.dataforge.names.Name import hep.dataforge.names.Name
import hep.dataforge.names.toName import hep.dataforge.names.toName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.GlobalScope
@TaskBuildScope @TaskBuildScope
interface WorkspaceBuilder { interface WorkspaceBuilder {
@ -36,14 +34,14 @@ fun WorkspaceBuilder.data(name: Name, data: Data<Any>) {
fun WorkspaceBuilder.data(name: String, data: Data<Any>) = data(name.toName(), data) fun WorkspaceBuilder.data(name: String, data: Data<Any>) = data(name.toName(), data)
fun WorkspaceBuilder.static(name: Name, data: Any, scope: CoroutineScope = GlobalScope, meta: Meta = EmptyMeta) = fun WorkspaceBuilder.static(name: Name, data: Any, meta: Meta = EmptyMeta) =
data(name, Data.static(scope, data, meta)) data(name, Data.static(data, meta))
fun WorkspaceBuilder.static(name: Name, data: Any, scope: CoroutineScope = GlobalScope, block: MetaBuilder.() -> Unit = {}) = fun WorkspaceBuilder.static(name: Name, data: Any, block: MetaBuilder.() -> Unit = {}) =
data(name, Data.static(scope, data, buildMeta(block))) data(name, Data.static(data, buildMeta(block)))
fun WorkspaceBuilder.static(name: String, data: Any, scope: CoroutineScope = GlobalScope, block: MetaBuilder.() -> Unit = {}) = fun WorkspaceBuilder.static(name: String, data: Any, block: MetaBuilder.() -> Unit = {}) =
data(name, Data.static(scope, data, buildMeta(block))) data(name, Data.static(data, buildMeta(block)))
fun WorkspaceBuilder.data(name: Name, node: DataNode<Any>) { fun WorkspaceBuilder.data(name: Name, node: DataNode<Any>) {
this.data[name] = node this.data[name] = node

View File

@ -27,7 +27,7 @@ class TaskBuilder(val name: String) {
val localData = if (from.isEmpty()) { val localData = if (from.isEmpty()) {
node node
} else { } else {
node.getNode(from.toName()) ?: return null node[from.toName()].node ?: return null
} }
return transform(workspace.context, model, localData) return transform(workspace.context, model, localData)
} }

View File

@ -2,7 +2,7 @@ package hep.dataforge.workspace
import hep.dataforge.context.Context import hep.dataforge.context.Context
import hep.dataforge.data.Data import hep.dataforge.data.Data
import hep.dataforge.data.task import hep.dataforge.data.goal
import hep.dataforge.descriptors.NodeDescriptor import hep.dataforge.descriptors.NodeDescriptor
import hep.dataforge.io.IOFormat import hep.dataforge.io.IOFormat
import hep.dataforge.io.JsonMetaFormat import hep.dataforge.io.JsonMetaFormat
@ -58,7 +58,7 @@ suspend fun <T : Any> Context.readData(
} else { } else {
null null
} }
val goal = task { val goal = goal {
withContext(Dispatchers.IO) { withContext(Dispatchers.IO) {
format.run { format.run {
Files.newByteChannel(path, StandardOpenOption.READ) Files.newByteChannel(path, StandardOpenOption.READ)

View File

@ -92,13 +92,13 @@ class SimpleWorkspaceTest {
fun testWorkspace() { fun testWorkspace() {
val node = workspace.run("sum") val node = workspace.run("sum")
val res = node.first() val res = node.first()
assertEquals(328350, res.get()) assertEquals(328350, res?.get())
} }
@Test @Test
fun testMetaPropagation() { fun testMetaPropagation() {
val node = workspace.run("sum") { "testFlag" to true } val node = workspace.run("sum") { "testFlag" to true }
val res = node.first().get() val res = node.first()?.get()
} }
@Test @Test