This commit is contained in:
Alexander Nozik 2019-10-29 20:21:38 +03:00
parent 10de05240c
commit dce9199b78
7 changed files with 44 additions and 44 deletions

View File

@ -85,7 +85,7 @@ class StaticData<T : Any>(
class NamedData<out T : Any>(val name: String, data: Data<T>) : Data<T> by data class NamedData<out T : Any>(val name: String, data: Data<T>) : Data<T> by data
fun <T : Any, R : Any> Data<T>.pipe( fun <T : Any, R : Any> Data<T>.map(
outputType: KClass<out R>, outputType: KClass<out R>,
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = this.meta, meta: Meta = this.meta,
@ -98,7 +98,7 @@ fun <T : Any, R : Any> Data<T>.pipe(
/** /**
* Create a data pipe * Create a data pipe
*/ */
inline fun <T : Any, reified R : Any> Data<T>.pipe( inline fun <T : Any, reified R : Any> Data<T>.map(
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta = this.meta, meta: Meta = this.meta,
noinline block: suspend CoroutineScope.(T) -> R noinline block: suspend CoroutineScope.(T) -> R
@ -109,7 +109,7 @@ inline fun <T : Any, reified R : Any> Data<T>.pipe(
/** /**
* Create a joined data. * Create a joined data.
*/ */
inline fun <T : Any, reified R : Any> Collection<Data<T>>.join( inline fun <T : Any, reified R : Any> Collection<Data<T>>.reduce(
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta, meta: Meta,
noinline block: suspend CoroutineScope.(Collection<T>) -> R noinline block: suspend CoroutineScope.(Collection<T>) -> R
@ -119,10 +119,10 @@ inline fun <T : Any, reified R : Any> Collection<Data<T>>.join(
coroutineContext, coroutineContext,
this this
) { ) {
block(map { this.run { it.await(this) } }) block(map { run { it.await(this) } })
} }
fun <K, T : Any, R : Any> Map<K, Data<T>>.join( fun <K, T : Any, R : Any> Map<K, Data<T>>.reduce(
outputType: KClass<out R>, outputType: KClass<out R>,
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta, meta: Meta,
@ -143,7 +143,7 @@ fun <K, T : Any, R : Any> Map<K, Data<T>>.join(
* @param T type of the input goal * @param T type of the input goal
* @param R type of the result goal * @param R type of the result goal
*/ */
inline fun <K, T : Any, reified R : Any> Map<K, Data<T>>.join( inline fun <K, T : Any, reified R : Any> Map<K, Data<T>>.reduce(
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
meta: Meta, meta: Meta,
noinline block: suspend CoroutineScope.(Map<K, T>) -> R noinline block: suspend CoroutineScope.(Map<K, T>) -> R

View File

@ -85,7 +85,7 @@ open class DynamicGoal<T>(
/** /**
* Create a one-to-one goal based on existing goal * Create a one-to-one goal based on existing goal
*/ */
fun <T, R> Goal<T>.pipe( fun <T, R> Goal<T>.map(
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.(T) -> R block: suspend CoroutineScope.(T) -> R
): Goal<R> = DynamicGoal(coroutineContext, listOf(this)) { ): Goal<R> = DynamicGoal(coroutineContext, listOf(this)) {
@ -95,11 +95,11 @@ fun <T, R> Goal<T>.pipe(
/** /**
* Create a joining goal. * Create a joining goal.
*/ */
fun <T, R> Collection<Goal<T>>.join( fun <T, R> Collection<Goal<T>>.reduce(
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.(Collection<T>) -> R block: suspend CoroutineScope.(Collection<T>) -> R
): Goal<R> = DynamicGoal(coroutineContext, this) { ): Goal<R> = DynamicGoal(coroutineContext, this) {
block(map { this.run { it.await(this) } }) block(map { run { it.await(this) } })
} }
/** /**
@ -108,7 +108,7 @@ fun <T, R> Collection<Goal<T>>.join(
* @param T type of the input goal * @param T type of the input goal
* @param R type of the result goal * @param R type of the result goal
*/ */
fun <K, T, R> Map<K, Goal<T>>.join( fun <K, T, R> Map<K, Goal<T>>.reduce(
coroutineContext: CoroutineContext = EmptyCoroutineContext, coroutineContext: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.(Map<K, T>) -> R block: suspend CoroutineScope.(Map<K, T>) -> R
): Goal<R> = DynamicGoal(coroutineContext, this.values) { ): Goal<R> = DynamicGoal(coroutineContext, this.values) {

View File

@ -10,7 +10,7 @@ class ActionEnv(val name: Name, val meta: Meta)
/** /**
* Action environment * Action environment
*/ */
class PipeBuilder<T, R>(var name: Name, var meta: MetaBuilder) { class MapActionBuilder<T, R>(var name: Name, var meta: MetaBuilder) {
lateinit var result: suspend ActionEnv.(T) -> R lateinit var result: suspend ActionEnv.(T) -> R
/** /**
@ -22,10 +22,10 @@ class PipeBuilder<T, R>(var name: Name, var meta: MetaBuilder) {
} }
class PipeAction<T : Any, out R : Any>( class MapAction<T : Any, out R : Any>(
val inputType: KClass<out T>, val inputType: KClass<out T>,
val outputType: KClass<out R>, val outputType: KClass<out R>,
private val block: PipeBuilder<T, R>.() -> Unit private val block: MapActionBuilder<T, R>.() -> Unit
) : Action<T, R> { ) : Action<T, R> {
override fun invoke(node: DataNode<T>, meta: Meta): DataNode<R> { override fun invoke(node: DataNode<T>, meta: Meta): DataNode<R> {
@ -38,12 +38,12 @@ class PipeAction<T : Any, out R : Any>(
// creating environment from old meta and name // creating environment from old meta and name
val env = ActionEnv(name, oldMeta) val env = ActionEnv(name, oldMeta)
//applying transformation from builder //applying transformation from builder
val builder = PipeBuilder<T, R>(name, oldMeta).apply(block) val builder = MapActionBuilder<T, R>(name, oldMeta).apply(block)
//getting new name //getting new name
val newName = builder.name val newName = builder.name
//getting new meta //getting new meta
val newMeta = builder.meta.seal() val newMeta = builder.meta.seal()
val newData = data.pipe(outputType, meta = newMeta) { builder.result(env, it) } val newData = data.map(outputType, meta = newMeta) { builder.result(env, it) }
//setting the data node //setting the data node
this[newName] = newData this[newName] = newData
} }
@ -51,10 +51,10 @@ class PipeAction<T : Any, out R : Any>(
} }
} }
inline fun <reified T : Any, reified R : Any> DataNode<T>.pipe( inline fun <reified T : Any, reified R : Any> DataNode<T>.map(
meta: Meta, meta: Meta,
noinline action: PipeBuilder<in T, out R>.() -> Unit noinline action: MapActionBuilder<in T, out R>.() -> Unit
): DataNode<R> = PipeAction(T::class, R::class, action).invoke(this, meta) ): DataNode<R> = MapAction(T::class, R::class, action).invoke(this, meta)

View File

@ -21,7 +21,7 @@ class JoinGroup<T : Any, R : Any>(var name: String, internal val node: DataNode<
} }
class JoinGroupBuilder<T : Any, R : Any>(val actionMeta: Meta) { class ReduceGroupBuilder<T : Any, R : Any>(val actionMeta: Meta) {
private val groupRules: MutableList<(DataNode<T>) -> List<JoinGroup<T, R>>> = ArrayList(); private val groupRules: MutableList<(DataNode<T>) -> List<JoinGroup<T, R>>> = ArrayList();
/** /**
@ -73,16 +73,16 @@ class JoinGroupBuilder<T : Any, R : Any>(val actionMeta: Meta) {
/** /**
* The same rules as for KPipe * The same rules as for KPipe
*/ */
class JoinAction<T : Any, R : Any>( class ReduceAction<T : Any, R : Any>(
val inputType: KClass<out T>, val inputType: KClass<out T>,
val outputType: KClass<out R>, val outputType: KClass<out R>,
private val action: JoinGroupBuilder<T, R>.() -> Unit private val action: ReduceGroupBuilder<T, R>.() -> Unit
) : Action<T, R> { ) : Action<T, R> {
override fun invoke(node: DataNode<T>, meta: Meta): DataNode<R> { override fun invoke(node: DataNode<T>, meta: Meta): DataNode<R> {
node.ensureType(inputType) node.ensureType(inputType)
return DataNode.invoke(outputType) { return DataNode.invoke(outputType) {
JoinGroupBuilder<T, R>(meta).apply(action).buildGroups(node).forEach { group -> ReduceGroupBuilder<T, R>(meta).apply(action).buildGroups(node).forEach { group ->
val laminate = Laminate(group.meta, meta) val laminate = Laminate(group.meta, meta)
@ -92,7 +92,7 @@ class JoinAction<T : Any, R : Any>(
val env = ActionEnv(groupName.toName(), laminate.builder()) val env = ActionEnv(groupName.toName(), laminate.builder())
val res: DynamicData<R> = dataMap.join(outputType, meta = laminate) { group.result.invoke(env, it) } val res: DynamicData<R> = dataMap.reduce(outputType, meta = laminate) { group.result.invoke(env, it) }
set(env.name, res) set(env.name, res)
} }

View File

@ -55,7 +55,7 @@ class SplitAction<T : Any, R : Any>(
rule(env) rule(env)
val res = data.pipe(outputType, meta = env.meta) { env.result(it) } val res = data.map(outputType, meta = env.meta) { env.result(it) }
set(env.name, res) set(env.name, res)
} }
} }

View File

@ -105,15 +105,15 @@ class TaskBuilder<R : Any>(val name: Name, val type: KClass<out R>) {
} }
/** /**
* A customized pipe action with ability to change meta and name * A customized map action with ability to change meta and name
*/ */
inline fun <reified T : Any> customPipe( inline fun <reified T : Any> mapAction(
from: String = "", from: String = "",
to: String = "", to: String = "",
crossinline block: PipeBuilder<T, R>.(TaskEnv) -> Unit crossinline block: MapActionBuilder<T, R>.(TaskEnv) -> Unit
) { ) {
action(from, to) { action(from, to) {
PipeAction( MapAction(
inputType = T::class, inputType = T::class,
outputType = type outputType = type
) { block(this@action) } ) { block(this@action) }
@ -121,15 +121,15 @@ class TaskBuilder<R : Any>(val name: Name, val type: KClass<out R>) {
} }
/** /**
* A simple pipe action without changing meta or name * A simple map action without changing meta or name
*/ */
inline fun <reified T : Any> pipe( inline fun <reified T : Any> map(
from: String = "", from: String = "",
to: String = "", to: String = "",
crossinline block: suspend TaskEnv.(T) -> R crossinline block: suspend TaskEnv.(T) -> R
) { ) {
action(from, to) { action(from, to) {
PipeAction( MapAction(
inputType = T::class, inputType = T::class,
outputType = type outputType = type
) { ) {
@ -144,13 +144,13 @@ class TaskBuilder<R : Any>(val name: Name, val type: KClass<out R>) {
/** /**
* Join elements in gathered data by multiple groups * Join elements in gathered data by multiple groups
*/ */
inline fun <reified T : Any> joinByGroup( inline fun <reified T : Any> reduceByGroup(
from: String = "", from: String = "",
to: String = "", to: String = "",
crossinline block: JoinGroupBuilder<T, R>.(TaskEnv) -> Unit //TODO needs KEEP-176 crossinline block: ReduceGroupBuilder<T, R>.(TaskEnv) -> Unit //TODO needs KEEP-176
) { ) {
action(from, to) { action(from, to) {
JoinAction( ReduceAction(
inputType = T::class, inputType = T::class,
outputType = type outputType = type
) { block(this@action) } ) { block(this@action) }
@ -160,13 +160,13 @@ class TaskBuilder<R : Any>(val name: Name, val type: KClass<out R>) {
/** /**
* Join all elemlents in gathered data matching input type * Join all elemlents in gathered data matching input type
*/ */
inline fun <reified T : Any> join( inline fun <reified T : Any> reduce(
from: String = "", from: String = "",
to: String = "", to: String = "",
crossinline block: suspend TaskEnv.(Map<Name, T>) -> R crossinline block: suspend TaskEnv.(Map<Name, T>) -> R
) { ) {
action(from, to) { action(from, to) {
JoinAction( ReduceAction(
inputType = T::class, inputType = T::class,
outputType = type, outputType = type,
action = { action = {

View File

@ -17,7 +17,7 @@ class SimpleWorkspaceTest {
override val tag: PluginTag = PluginTag("test") override val tag: PluginTag = PluginTag("test")
val contextTask = task("test", Any::class) { val contextTask = task("test", Any::class) {
pipe<Any> { map<Any> {
context.logger.info { "Test: $it" } context.logger.info { "Test: $it" }
} }
} }
@ -39,13 +39,13 @@ class SimpleWorkspaceTest {
model { model {
data("myData\\[12\\]") data("myData\\[12\\]")
} }
pipe<Int>{ map<Int>{
it it
} }
} }
val square = task<Int>("square") { val square = task<Int>("square") {
pipe<Int> { data -> map<Int> { data ->
if (meta["testFlag"].boolean == true) { if (meta["testFlag"].boolean == true) {
println("flag") println("flag")
} }
@ -55,7 +55,7 @@ class SimpleWorkspaceTest {
} }
val linear = task<Int>("linear") { val linear = task<Int>("linear") {
pipe<Int> { data -> map<Int> { data ->
context.logger.info { "Starting linear on $data" } context.logger.info { "Starting linear on $data" }
data * 2 + 1 data * 2 + 1
} }
@ -86,14 +86,14 @@ class SimpleWorkspaceTest {
model { model {
dependsOn(square) dependsOn(square)
} }
join<Int> { data -> reduce<Int> { data ->
context.logger.info { "Starting sum" } context.logger.info { "Starting sum" }
data.values.sum() data.values.sum()
} }
} }
val average = task<Double>("average") { val average = task<Double>("average") {
joinByGroup<Int> { env -> reduceByGroup<Int> { env ->
group("even", filter = { name, _ -> name.toString().toInt() % 2 == 0 }) { group("even", filter = { name, _ -> name.toString().toInt() % 2 == 0 }) {
result { data -> result { data ->
env.context.logger.info { "Starting even" } env.context.logger.info { "Starting even" }
@ -113,13 +113,13 @@ class SimpleWorkspaceTest {
model { model {
dependsOn(average) dependsOn(average)
} }
join<Double> { data -> reduce<Double> { data ->
data["even"]!! - data["odd"]!! data["even"]!! - data["odd"]!!
} }
} }
val customPipeTask = task<Int>("custom") { val customPipeTask = task<Int>("custom") {
customPipe<Int> { mapAction<Int> {
meta = meta.builder().apply { meta = meta.builder().apply {
"newValue" to 22 "newValue" to 22
} }