Minor change to Goal API

This commit is contained in:
Alexander Nozik 2019-12-03 17:01:56 +03:00
parent c789dabdae
commit 0ec42689d7
6 changed files with 42 additions and 44 deletions

View File

@ -91,7 +91,7 @@ fun <T : Any, R : Any> Data<T>.map(
meta: Meta = this.meta, meta: Meta = this.meta,
block: suspend CoroutineScope.(T) -> R block: suspend CoroutineScope.(T) -> R
): Data<R> = DynamicData(outputType, meta, coroutineContext, listOf(this)) { ): Data<R> = DynamicData(outputType, meta, coroutineContext, listOf(this)) {
block(await(this)) block(await())
} }
@ -103,7 +103,7 @@ inline fun <T : Any, reified R : Any> Data<T>.map(
meta: Meta = this.meta, meta: Meta = this.meta,
noinline block: suspend CoroutineScope.(T) -> R noinline block: suspend CoroutineScope.(T) -> R
): Data<R> = DynamicData(R::class, meta, coroutineContext, listOf(this)) { ): Data<R> = DynamicData(R::class, meta, coroutineContext, listOf(this)) {
block(await(this)) block(await())
} }
/** /**
@ -119,7 +119,7 @@ inline fun <T : Any, reified R : Any> Collection<Data<T>>.reduce(
coroutineContext, coroutineContext,
this this
) { ) {
block(map { run { it.await(this) } }) block(map { run { it.await() } })
} }
fun <K, T : Any, R : Any> Map<K, Data<T>>.reduce( fun <K, T : Any, R : Any> Map<K, Data<T>>.reduce(
@ -133,7 +133,7 @@ fun <K, T : Any, R : Any> Map<K, Data<T>>.reduce(
coroutineContext, coroutineContext,
this.values this.values
) { ) {
block(mapValues { it.value.await(this) }) block(mapValues { it.value.await() })
} }
@ -153,7 +153,7 @@ inline fun <K, T : Any, reified R : Any> Map<K, Data<T>>.reduce(
coroutineContext, coroutineContext,
this.values this.values
) { ) {
block(mapValues { it.value.await(this) }) block(mapValues { it.value.await() })
} }

View File

@ -4,6 +4,7 @@ import hep.dataforge.meta.*
import hep.dataforge.names.* import hep.dataforge.names.*
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job import kotlinx.coroutines.Job
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlin.collections.component1 import kotlin.collections.component1
import kotlin.collections.component2 import kotlin.collections.component2
@ -55,6 +56,19 @@ interface DataNode<out T : Any> : MetaRepr {
} }
} }
/**
* Start computation for all goals in data node and return a job for the whole node
*/
@Suppress("DeferredResultUnused")
fun CoroutineScope.startAll(): Job = launch {
items.values.forEach {
when (it) {
is DataItem.Node<*> -> it.node.run { startAll() }
is DataItem.Leaf<*> -> it.data.run { startAsync() }
}
}
}
companion object { companion object {
const val TYPE = "dataNode" const val TYPE = "dataNode"
@ -68,21 +82,11 @@ interface DataNode<out T : Any> : MetaRepr {
} }
} }
suspend fun <T: Any> DataNode<T>.join(): Unit = coroutineScope { startAll().join() }
val <T : Any> DataItem<T>?.node: DataNode<T>? get() = (this as? DataItem.Node<T>)?.node val <T : Any> DataItem<T>?.node: DataNode<T>? get() = (this as? DataItem.Node<T>)?.node
val <T : Any> DataItem<T>?.data: Data<T>? get() = (this as? DataItem.Leaf<T>)?.data val <T : Any> DataItem<T>?.data: Data<T>? get() = (this as? DataItem.Leaf<T>)?.data
/**
* Start computation for all goals in data node and return a job for the whole node
*/
fun DataNode<*>.launchAll(scope: CoroutineScope): Job = scope.launch {
items.values.forEach {
when (it) {
is DataItem.Node<*> -> it.node.launchAll(scope)
is DataItem.Leaf<*> -> it.data.start(scope)
}
}
}
operator fun <T : Any> DataNode<T>.get(name: Name): DataItem<T>? = when (name.length) { operator fun <T : Any> DataNode<T>.get(name: Name): DataItem<T>? = when (name.length) {
0 -> error("Empty name") 0 -> error("Empty name")
1 -> items[name.first()] 1 -> items[name.first()]

View File

@ -15,9 +15,7 @@ interface Goal<out T> {
* Get ongoing computation or start a new one. * Get ongoing computation or start a new one.
* Does not guarantee thread safety. In case of multi-thread access, could create orphan computations. * Does not guarantee thread safety. In case of multi-thread access, could create orphan computations.
*/ */
fun startAsync(scope: CoroutineScope): Deferred<T> fun CoroutineScope.startAsync(): Deferred<T>
suspend fun CoroutineScope.await(): T = startAsync(this).await()
/** /**
* Reset the computation * Reset the computation
@ -29,17 +27,15 @@ interface Goal<out T> {
} }
} }
fun Goal<*>.start(scope: CoroutineScope): Job = startAsync(scope) suspend fun <T> Goal<T>.await(): T = coroutineScope { startAsync().await() }
val Goal<*>.isComplete get() = result?.isCompleted ?: false val Goal<*>.isComplete get() = result?.isCompleted ?: false
suspend fun <T> Goal<T>.await(scope: CoroutineScope): T = scope.await()
open class StaticGoal<T>(val value: T) : Goal<T> { open class StaticGoal<T>(val value: T) : Goal<T> {
override val dependencies: Collection<Goal<*>> get() = emptyList() override val dependencies: Collection<Goal<*>> get() = emptyList()
override val result: Deferred<T> = CompletableDeferred(value) override val result: Deferred<T> = CompletableDeferred(value)
override fun startAsync(scope: CoroutineScope): Deferred<T> = result override fun CoroutineScope.startAsync(): Deferred<T> = result
override fun reset() { override fun reset() {
//doNothing //doNothing
@ -59,11 +55,12 @@ open class DynamicGoal<T>(
* Get ongoing computation or start a new one. * Get ongoing computation or start a new one.
* Does not guarantee thread safety. In case of multi-thread access, could create orphan computations. * Does not guarantee thread safety. In case of multi-thread access, could create orphan computations.
*/ */
override fun startAsync(scope: CoroutineScope): Deferred<T> { override fun CoroutineScope.startAsync(): Deferred<T> {
val startedDependencies = this.dependencies.map { goal -> val startedDependencies = this@DynamicGoal.dependencies.map { goal ->
goal.startAsync(scope) goal.run { startAsync() }
} }
return result ?: scope.async(coroutineContext + CoroutineMonitor() + Dependencies(startedDependencies)) { return result
?: async(this@DynamicGoal.coroutineContext + CoroutineMonitor() + Dependencies(startedDependencies)) {
startedDependencies.forEach { deferred -> startedDependencies.forEach { deferred ->
deferred.invokeOnCompletion { error -> deferred.invokeOnCompletion { error ->
if (error != null) cancel(CancellationException("Dependency $deferred failed with error: ${error.message}")) if (error != null) cancel(CancellationException("Dependency $deferred failed with error: ${error.message}"))
@ -89,7 +86,7 @@ fun <T, R> Goal<T>.map(
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.(T) -> R block: suspend CoroutineScope.(T) -> R
): Goal<R> = DynamicGoal(coroutineContext, listOf(this)) { ): Goal<R> = DynamicGoal(coroutineContext, listOf(this)) {
block(await(this)) block(await())
} }
/** /**
@ -99,7 +96,7 @@ fun <T, R> Collection<Goal<T>>.reduce(
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.(Collection<T>) -> R block: suspend CoroutineScope.(Collection<T>) -> R
): Goal<R> = DynamicGoal(coroutineContext, this) { ): Goal<R> = DynamicGoal(coroutineContext, this) {
block(map { run { it.await(this) } }) block(map { run { it.await() } })
} }
/** /**
@ -112,6 +109,6 @@ fun <K, T, R> Map<K, Goal<T>>.reduce(
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.(Map<K, T>) -> R block: suspend CoroutineScope.(Map<K, T>) -> R
): Goal<R> = DynamicGoal(coroutineContext, this.values) { ): Goal<R> = DynamicGoal(coroutineContext, this.values) {
block(mapValues { it.value.await(this) }) block(mapValues { it.value.await() })
} }

View File

@ -41,7 +41,7 @@ fun <R : Any> Data<*>.cast(type: KClass<out R>): Data<R> {
override val meta: Meta get() = this@cast.meta override val meta: Meta get() = this@cast.meta
override val dependencies: Collection<Goal<*>> get() = this@cast.dependencies override val dependencies: Collection<Goal<*>> get() = this@cast.dependencies
override val result: Deferred<R>? get() = this@cast.result as Deferred<R> override val result: Deferred<R>? get() = this@cast.result as Deferred<R>
override fun startAsync(scope: CoroutineScope): Deferred<R> = this@cast.startAsync(scope) as Deferred<R> override fun CoroutineScope.startAsync(): Deferred<R> = this@cast.run { startAsync() as Deferred<R> }
override fun reset() = this@cast.reset() override fun reset() = this@cast.reset()
override val type: KClass<out R> = type override val type: KClass<out R> = type
} }

View File

@ -6,7 +6,6 @@ import hep.dataforge.io.Envelope
import hep.dataforge.io.IOFormat import hep.dataforge.io.IOFormat
import hep.dataforge.io.SimpleEnvelope import hep.dataforge.io.SimpleEnvelope
import hep.dataforge.io.readWith import hep.dataforge.io.readWith
import kotlinx.coroutines.coroutineScope
import kotlinx.io.ArrayBinary import kotlinx.io.ArrayBinary
import kotlin.reflect.KClass import kotlin.reflect.KClass
@ -18,9 +17,7 @@ fun <T : Any> Envelope.toData(type: KClass<out T>, format: IOFormat<T>): Data<T>
} }
suspend fun <T : Any> Data<T>.toEnvelope(format: IOFormat<T>): Envelope { suspend fun <T : Any> Data<T>.toEnvelope(format: IOFormat<T>): Envelope {
val obj = coroutineScope { val obj = await()
await(this)
}
val binary = ArrayBinary.write { val binary = ArrayBinary.write {
format.run { writeObject(obj) } format.run { writeObject(obj) }
} }