Working on remote function call
This commit is contained in:
parent
418e1c2134
commit
8bac218715
@ -64,6 +64,16 @@ class ArrayBinary(val array: ByteArray) : RandomAccessBinary {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class PacketBinary(val packet: ByteReadPacket): Binary{
|
||||||
|
override val size: ULong
|
||||||
|
get() = TODO("not implemented") //To change initializer of created properties use File | Settings | File Templates.
|
||||||
|
|
||||||
|
override fun <R> read(block: Input.() -> R): R {
|
||||||
|
TODO("not implemented") //To change body of created functions use File | Settings | File Templates.
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Read given binary as object using given format
|
* Read given binary as object using given format
|
||||||
*/
|
*/
|
||||||
|
@ -1,9 +1,9 @@
|
|||||||
package hep.dataforge.io
|
package hep.dataforge.io
|
||||||
|
|
||||||
import hep.dataforge.meta.Laminate
|
import hep.dataforge.meta.*
|
||||||
import hep.dataforge.meta.Meta
|
import kotlinx.io.core.Output
|
||||||
import hep.dataforge.meta.get
|
import kotlinx.io.core.buildPacket
|
||||||
import hep.dataforge.meta.string
|
import kotlinx.io.core.readBytes
|
||||||
|
|
||||||
interface Envelope {
|
interface Envelope {
|
||||||
val meta: Meta
|
val meta: Meta
|
||||||
@ -20,6 +20,10 @@ interface Envelope {
|
|||||||
const val ENVELOPE_DESCRIPTION_KEY = "$ENVELOPE_NODE.description"
|
const val ENVELOPE_DESCRIPTION_KEY = "$ENVELOPE_NODE.description"
|
||||||
//const val ENVELOPE_TIME_KEY = "@envelope.time"
|
//const val ENVELOPE_TIME_KEY = "@envelope.time"
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Build a static envelope using provided builder
|
||||||
|
*/
|
||||||
|
fun build(block: EnvelopeBuilder.() -> Unit) = EnvelopeBuilder().apply(block).build()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -63,4 +67,29 @@ fun Envelope.withMetaLayers(vararg layers: Meta): Envelope {
|
|||||||
this is ProxyEnvelope -> ProxyEnvelope(source, *layers, *this.meta.layers.toTypedArray())
|
this is ProxyEnvelope -> ProxyEnvelope(source, *layers, *this.meta.layers.toTypedArray())
|
||||||
else -> ProxyEnvelope(this, *layers)
|
else -> ProxyEnvelope(this, *layers)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class EnvelopeBuilder {
|
||||||
|
private val metaBuilder = MetaBuilder()
|
||||||
|
var data: Binary? = null
|
||||||
|
|
||||||
|
fun meta(block: MetaBuilder.() -> Unit) {
|
||||||
|
metaBuilder.apply(block)
|
||||||
|
}
|
||||||
|
|
||||||
|
var type by metaBuilder.string(key = Envelope.ENVELOPE_TYPE_KEY)
|
||||||
|
var dataType by metaBuilder.string(key = Envelope.ENVELOPE_DATA_TYPE_KEY)
|
||||||
|
var description by metaBuilder.string(key = Envelope.ENVELOPE_DESCRIPTION_KEY)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Construct a binary and transform it into byte-array based buffer
|
||||||
|
*/
|
||||||
|
fun data(block: Output.() -> Unit) {
|
||||||
|
val bytes = buildPacket {
|
||||||
|
block()
|
||||||
|
}
|
||||||
|
data = ArrayBinary(bytes.readBytes())
|
||||||
|
}
|
||||||
|
|
||||||
|
internal fun build() = SimpleEnvelope(metaBuilder.seal(), data)
|
||||||
}
|
}
|
@ -1,38 +0,0 @@
|
|||||||
package hep.dataforge.io
|
|
||||||
|
|
||||||
import kotlin.reflect.KClass
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A descriptor for specific type of functions
|
|
||||||
*/
|
|
||||||
interface FunctionSpec<T : Any, R : Any> {
|
|
||||||
val inputType: KClass<T>
|
|
||||||
val outputType: KClass<R>
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A server that could produce asynchronous function values
|
|
||||||
*/
|
|
||||||
interface FunctionServer {
|
|
||||||
/**
|
|
||||||
* Call a function with given name and descriptor
|
|
||||||
*/
|
|
||||||
suspend fun <T : Any, R : Any, D : FunctionSpec<T, R>> call(name: String, descriptor: D, arg: T): R
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Resolve a function descriptor for given types
|
|
||||||
*/
|
|
||||||
fun <T : Any, R : Any> resolveType(inputType: KClass<out T>, outputType: KClass<out R>): FunctionSpec<T, R>
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get a generic suspended function with given name and descriptor
|
|
||||||
*/
|
|
||||||
operator fun <T : Any, R : Any, D : FunctionSpec<T, R>> get(name: String, descriptor: D): (suspend (T) -> R) =
|
|
||||||
{ call(name, descriptor, it) }
|
|
||||||
}
|
|
||||||
|
|
||||||
suspend inline fun <reified T : Any, reified R : Any> FunctionServer.call(name: String, arg: T): R =
|
|
||||||
call(name, resolveType(T::class, R::class), arg)
|
|
||||||
|
|
||||||
inline operator fun <reified T : Any, reified R : Any> FunctionServer.get(name: String): (suspend (T) -> R) =
|
|
||||||
get(name, resolveType(T::class, R::class))
|
|
@ -1,14 +1,47 @@
|
|||||||
package hep.dataforge.io
|
package hep.dataforge.io
|
||||||
|
|
||||||
|
import hep.dataforge.io.IOFormat.Companion.TYPE
|
||||||
|
import hep.dataforge.provider.Type
|
||||||
import kotlinx.io.core.*
|
import kotlinx.io.core.*
|
||||||
|
import kotlinx.serialization.KSerializer
|
||||||
|
import kotlinx.serialization.cbor.Cbor
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* And interface for serialization facilities
|
* And interface for serialization facilities
|
||||||
*/
|
*/
|
||||||
|
@Type(TYPE)
|
||||||
interface IOFormat<T : Any> {
|
interface IOFormat<T : Any> {
|
||||||
fun Output.writeThis(obj: T)
|
fun Output.writeThis(obj: T)
|
||||||
fun Input.readThis(): T
|
fun Input.readThis(): T
|
||||||
|
|
||||||
|
companion object {
|
||||||
|
const val TYPE = "ioFormat"
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fun <T : Any> IOFormat<T>.writePacket(obj: T): ByteReadPacket = buildPacket { writeThis(obj) }
|
fun <T : Any> IOFormat<T>.writePacket(obj: T): ByteReadPacket = buildPacket { writeThis(obj) }
|
||||||
fun <T : Any> IOFormat<T>.writeBytes(obj: T): ByteArray = buildPacket { writeThis(obj) }.readBytes()
|
fun <T : Any> IOFormat<T>.writeBytes(obj: T): ByteArray = buildPacket { writeThis(obj) }.readBytes()
|
||||||
|
|
||||||
|
|
||||||
|
object DoubleIOFormat: IOFormat<Double>{
|
||||||
|
override fun Output.writeThis(obj: Double) {
|
||||||
|
writeDouble(obj)
|
||||||
|
}
|
||||||
|
override fun Input.readThis(): Double = readDouble()
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Experimental
|
||||||
|
*/
|
||||||
|
class SerializerIOFormat<T : Any>(val serializer: KSerializer<T>) : IOFormat<T> {
|
||||||
|
override fun Output.writeThis(obj: T) {
|
||||||
|
val bytes = Cbor.plain.dump(serializer, obj)
|
||||||
|
writeFully(bytes)
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun Input.readThis(): T {
|
||||||
|
//FIXME reads the whole input
|
||||||
|
val bytes = readBytes()
|
||||||
|
return Cbor.plain.load(serializer, bytes)
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,8 @@
|
|||||||
|
package hep.dataforge.io
|
||||||
|
|
||||||
|
interface Responder {
|
||||||
|
/**
|
||||||
|
* Send a request and wait for response for this specific request
|
||||||
|
*/
|
||||||
|
suspend fun respond(request: Envelope): Envelope
|
||||||
|
}
|
@ -0,0 +1,34 @@
|
|||||||
|
package hep.dataforge.io
|
||||||
|
|
||||||
|
import hep.dataforge.context.ContextAware
|
||||||
|
import hep.dataforge.io.functions.FunctionSpec
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A server that could produce asynchronous function values
|
||||||
|
*/
|
||||||
|
interface FunctionServer : ContextAware {
|
||||||
|
/**
|
||||||
|
* Call a function with given name and descriptor
|
||||||
|
*/
|
||||||
|
suspend fun <T : Any, R : Any> call(name: String, spec: FunctionSpec<T, R>, arg: T): R
|
||||||
|
|
||||||
|
suspend fun <T : Any, R : Any> callMany(
|
||||||
|
name: String,
|
||||||
|
spec: FunctionSpec<T, R>,
|
||||||
|
arg: List<T>
|
||||||
|
): List<R> = List(arg.size) {
|
||||||
|
call(name, spec, arg[it])
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a generic suspended function with given name and descriptor
|
||||||
|
*/
|
||||||
|
fun <T : Any, R : Any> get(
|
||||||
|
name: String,
|
||||||
|
spec: FunctionSpec<T, R>
|
||||||
|
): (suspend (T) -> R) =
|
||||||
|
{ call(name, spec, it) }
|
||||||
|
}
|
||||||
|
|
||||||
|
|
@ -0,0 +1,9 @@
|
|||||||
|
package hep.dataforge.io.functions
|
||||||
|
|
||||||
|
import hep.dataforge.io.IOFormat
|
||||||
|
import hep.dataforge.meta.MetaRepr
|
||||||
|
|
||||||
|
interface FunctionSpec<T : Any, R : Any>: MetaRepr {
|
||||||
|
val inputFormat: IOFormat<T>
|
||||||
|
val outputFormat: IOFormat<R>
|
||||||
|
}
|
@ -0,0 +1,58 @@
|
|||||||
|
package hep.dataforge.io.functions
|
||||||
|
|
||||||
|
import hep.dataforge.context.AbstractPlugin
|
||||||
|
import hep.dataforge.context.PluginFactory
|
||||||
|
import hep.dataforge.context.PluginTag
|
||||||
|
import hep.dataforge.io.DoubleIOFormat
|
||||||
|
import hep.dataforge.io.IOFormat
|
||||||
|
import hep.dataforge.io.IOPlugin
|
||||||
|
import hep.dataforge.meta.Meta
|
||||||
|
import hep.dataforge.meta.buildMeta
|
||||||
|
import kotlin.reflect.KClass
|
||||||
|
|
||||||
|
class FunctionsPlugin(meta: Meta) : AbstractPlugin(meta) {
|
||||||
|
override val tag: PluginTag get() = Companion.tag
|
||||||
|
|
||||||
|
override fun dependsOn(): List<PluginFactory<*>> = listOf(IOPlugin)
|
||||||
|
|
||||||
|
private val specs: Collection<FunctionSpec<out Any, out Any>> = listOf(
|
||||||
|
DoubleToDoubleFunctionSpec
|
||||||
|
)
|
||||||
|
|
||||||
|
fun resolve(meta: Meta): FunctionSpec<*, *>? {
|
||||||
|
return specs.find { it.toMeta() == meta }
|
||||||
|
}
|
||||||
|
|
||||||
|
// fun <T : Any, R : Any> resolve(inputType: KClass<out T>, outputType: KClass<out R>): FunctionSpec<T, R> {
|
||||||
|
//
|
||||||
|
// }
|
||||||
|
|
||||||
|
companion object : PluginFactory<FunctionsPlugin> {
|
||||||
|
|
||||||
|
override val tag: PluginTag = PluginTag("io.functions", group = PluginTag.DATAFORGE_GROUP)
|
||||||
|
override val type: KClass<out FunctionsPlugin> = FunctionsPlugin::class
|
||||||
|
override fun invoke(meta: Meta): FunctionsPlugin = FunctionsPlugin(meta)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
object DoubleToDoubleFunctionSpec : FunctionSpec<Double, Double> {
|
||||||
|
override val inputFormat: IOFormat<Double> get() = DoubleIOFormat
|
||||||
|
override val outputFormat: IOFormat<Double> get() = DoubleIOFormat
|
||||||
|
|
||||||
|
override fun toMeta(): Meta = buildMeta {
|
||||||
|
"input" to "Double"
|
||||||
|
"output" to "Double"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//suspend inline fun <reified T : Any, reified R : Any> FunctionServer.call(name: String, arg: T): R {
|
||||||
|
// val plugin = context.plugins.load(FunctionsPlugin)
|
||||||
|
// val spec = plugin.resolve(T::class, R::class)
|
||||||
|
// return call(name, spec, arg)
|
||||||
|
//}
|
||||||
|
//
|
||||||
|
//inline operator fun <reified T : Any, reified R : Any> FunctionServer.get(name: String): (suspend (T) -> R) {
|
||||||
|
// val plugin = context.plugins.load(FunctionsPlugin)
|
||||||
|
// val spec = plugin.resolve(T::class, R::class)
|
||||||
|
// return get(name, spec)
|
||||||
|
//}
|
@ -0,0 +1,100 @@
|
|||||||
|
package hep.dataforge.io
|
||||||
|
|
||||||
|
import hep.dataforge.context.Context
|
||||||
|
import hep.dataforge.context.ContextAware
|
||||||
|
import hep.dataforge.io.functions.FunctionSpec
|
||||||
|
import hep.dataforge.io.functions.FunctionsPlugin
|
||||||
|
import hep.dataforge.meta.get
|
||||||
|
import hep.dataforge.meta.int
|
||||||
|
import kotlin.reflect.KClass
|
||||||
|
|
||||||
|
class RemoteFunctionClient(override val context: Context, val responder: Responder) : FunctionServer, ContextAware {
|
||||||
|
|
||||||
|
private fun <T : Any> encodeOne(name: String, spec: FunctionSpec<T, *>, value: T): Envelope =
|
||||||
|
Envelope.build {
|
||||||
|
type = REQUEST_TYPE
|
||||||
|
meta {
|
||||||
|
FUNCTION_NAME_KEY to name
|
||||||
|
FUNCTION_SPEC_KEY to spec.toMeta()
|
||||||
|
}
|
||||||
|
data {
|
||||||
|
spec.inputFormat.run {
|
||||||
|
writeThis(value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun <T : Any> encodeMany(name: String, spec: FunctionSpec<T, *>, values: List<T>): Envelope =
|
||||||
|
Envelope.build {
|
||||||
|
type = REQUEST_TYPE
|
||||||
|
meta {
|
||||||
|
FUNCTION_NAME_KEY to name
|
||||||
|
FUNCTION_SPEC_KEY to spec.toMeta()
|
||||||
|
SIZE_KEY to values.size
|
||||||
|
}
|
||||||
|
data {
|
||||||
|
spec.inputFormat.run {
|
||||||
|
values.forEach {
|
||||||
|
writeThis(it)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun <R : Any> decode(spec: FunctionSpec<*, R>, envelope: Envelope): List<R> {
|
||||||
|
require(envelope.type == RESPONSE_TYPE) { "Unexpected message type: ${envelope.type}" }
|
||||||
|
val size = envelope.meta[SIZE_KEY].int ?: 1
|
||||||
|
|
||||||
|
return if (size == 0) {
|
||||||
|
emptyList()
|
||||||
|
} else {
|
||||||
|
envelope.data?.read {
|
||||||
|
List<R>(size) {
|
||||||
|
spec.outputFormat.run {
|
||||||
|
readThis()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} ?: error("Message does not contain data")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
override suspend fun <T : Any, R : Any> call(
|
||||||
|
name: String,
|
||||||
|
spec: FunctionSpec<T, R>,
|
||||||
|
arg: T
|
||||||
|
): R {
|
||||||
|
val request = encodeOne(name, spec, arg)
|
||||||
|
val response = responder.respond(request)
|
||||||
|
return decode(spec, response).first()
|
||||||
|
}
|
||||||
|
|
||||||
|
override suspend fun <T : Any, R : Any> callMany(
|
||||||
|
name: String,
|
||||||
|
spec: FunctionSpec<T, R>,
|
||||||
|
arg: List<T>
|
||||||
|
): List<R> {
|
||||||
|
val request = encodeMany(name, spec, arg)
|
||||||
|
val response = responder.respond(request)
|
||||||
|
return decode(spec, response)
|
||||||
|
}
|
||||||
|
|
||||||
|
private val plugin by lazy {
|
||||||
|
context.plugins.load(FunctionsPlugin)
|
||||||
|
}
|
||||||
|
|
||||||
|
fun <T : Any, R : Any> resolveSpec(
|
||||||
|
inputType: KClass<out T>,
|
||||||
|
outputType: KClass<out R>
|
||||||
|
): FunctionSpec<T, R> {
|
||||||
|
return plugin.resolve(inputType, outputType)
|
||||||
|
}
|
||||||
|
|
||||||
|
companion object {
|
||||||
|
const val REQUEST_TYPE = "function.request"
|
||||||
|
const val RESPONSE_TYPE = "function.response"
|
||||||
|
|
||||||
|
const val FUNCTION_NAME_KEY = "function"
|
||||||
|
const val SIZE_KEY = "size"
|
||||||
|
const val FUNCTION_SPEC_KEY = "spec"
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,58 @@
|
|||||||
|
package hep.dataforge.io.functions
|
||||||
|
|
||||||
|
import hep.dataforge.context.Context
|
||||||
|
import hep.dataforge.context.ContextAware
|
||||||
|
import hep.dataforge.io.*
|
||||||
|
import hep.dataforge.meta.get
|
||||||
|
import hep.dataforge.meta.int
|
||||||
|
import hep.dataforge.meta.node
|
||||||
|
import hep.dataforge.meta.string
|
||||||
|
|
||||||
|
class RemoteFunctionServer(
|
||||||
|
override val context: Context,
|
||||||
|
val functionServer: FunctionServer
|
||||||
|
) : ContextAware, Responder {
|
||||||
|
|
||||||
|
private val plugin by lazy {
|
||||||
|
context.plugins.load(FunctionsPlugin)
|
||||||
|
}
|
||||||
|
|
||||||
|
override suspend fun respond(request: Envelope): Envelope {
|
||||||
|
require(request.type == RemoteFunctionClient.REQUEST_TYPE) { "Unexpected message type: ${request.type}" }
|
||||||
|
val functionName = request.meta[RemoteFunctionClient.FUNCTION_NAME_KEY].string ?: ""
|
||||||
|
|
||||||
|
@Suppress("UNCHECKED_CAST") val spec = request.meta[RemoteFunctionClient.FUNCTION_SPEC_KEY].node?.let {
|
||||||
|
plugin.resolve(it) as FunctionSpec<Any, Any>
|
||||||
|
} ?: error("Function specification not found")
|
||||||
|
|
||||||
|
val size = request
|
||||||
|
.meta[RemoteFunctionClient.SIZE_KEY].int ?: 1
|
||||||
|
|
||||||
|
val input = request.data?.read {
|
||||||
|
spec.inputFormat.run {
|
||||||
|
List(size) {
|
||||||
|
readThis()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} ?: error("Input is empty")
|
||||||
|
|
||||||
|
val output = functionServer.callMany<Any, Any>(functionName, spec, input)
|
||||||
|
|
||||||
|
return Envelope.build {
|
||||||
|
type = RemoteFunctionClient.RESPONSE_TYPE
|
||||||
|
meta {
|
||||||
|
RemoteFunctionClient.FUNCTION_NAME_KEY to functionName
|
||||||
|
RemoteFunctionClient.FUNCTION_SPEC_KEY to spec.toMeta()
|
||||||
|
RemoteFunctionClient.SIZE_KEY to output.size
|
||||||
|
}
|
||||||
|
data {
|
||||||
|
spec.outputFormat.run {
|
||||||
|
output.forEach {
|
||||||
|
writeThis(it)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -6,10 +6,7 @@ import hep.dataforge.data.filter
|
|||||||
import hep.dataforge.meta.Meta
|
import hep.dataforge.meta.Meta
|
||||||
import hep.dataforge.meta.MetaRepr
|
import hep.dataforge.meta.MetaRepr
|
||||||
import hep.dataforge.meta.buildMeta
|
import hep.dataforge.meta.buildMeta
|
||||||
import hep.dataforge.names.EmptyName
|
import hep.dataforge.names.*
|
||||||
import hep.dataforge.names.Name
|
|
||||||
import hep.dataforge.names.get
|
|
||||||
import hep.dataforge.names.isEmpty
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A dependency of the task which allows to lazily create a data tree for single dependency
|
* A dependency of the task which allows to lazily create a data tree for single dependency
|
||||||
@ -45,12 +42,18 @@ class AllDataDependency(val placement: Name = EmptyName) : Dependency() {
|
|||||||
"data" to "*"
|
"data" to "*"
|
||||||
"to" to placement
|
"to" to placement
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
class TaskModelDependency(val name: String, val meta: Meta, val placement: Name = EmptyName) : Dependency() {
|
abstract class TaskDependency(val meta: Meta, val placement: Name = EmptyName) : Dependency() {
|
||||||
|
abstract fun resolveTask(workspace: Workspace): Task<*>
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A name of the dependency for logging and serialization
|
||||||
|
*/
|
||||||
|
abstract val name: Name
|
||||||
|
|
||||||
override fun apply(workspace: Workspace): DataNode<Any> {
|
override fun apply(workspace: Workspace): DataNode<Any> {
|
||||||
val task = workspace.tasks[name] ?: error("Task with name $name is not found in the workspace")
|
val task = resolveTask(workspace)
|
||||||
if (task.isTerminal) TODO("Support terminal task")
|
if (task.isTerminal) TODO("Support terminal task")
|
||||||
val result = workspace.run(task, meta)
|
val result = workspace.run(task, meta)
|
||||||
return if (placement.isEmpty()) {
|
return if (placement.isEmpty()) {
|
||||||
@ -65,4 +68,19 @@ class TaskModelDependency(val name: String, val meta: Meta, val placement: Name
|
|||||||
"meta" to meta
|
"meta" to meta
|
||||||
"to" to placement
|
"to" to placement
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class DirectTaskDependency(val task: Task<*>, meta: Meta, placement: Name) : TaskDependency(meta, placement) {
|
||||||
|
override fun resolveTask(workspace: Workspace): Task<*> = task
|
||||||
|
|
||||||
|
override val name: Name get() = DIRECT_TASK_NAME + task.name
|
||||||
|
|
||||||
|
companion object {
|
||||||
|
val DIRECT_TASK_NAME = "@direct".asName()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class WorkspaceTaskDependency(override val name: Name, meta: Meta, placement: Name) : TaskDependency(meta, placement) {
|
||||||
|
override fun resolveTask(workspace: Workspace): Task<*> =
|
||||||
|
workspace.tasks[name] ?: error("Task with name $name is not found in the workspace")
|
||||||
}
|
}
|
@ -35,9 +35,9 @@ data class TaskModel(
|
|||||||
"meta" to meta
|
"meta" to meta
|
||||||
"dependsOn" to {
|
"dependsOn" to {
|
||||||
val dataDependencies = dependencies.filterIsInstance<DataDependency>()
|
val dataDependencies = dependencies.filterIsInstance<DataDependency>()
|
||||||
val taskDependencies = dependencies.filterIsInstance<TaskModelDependency>()
|
val taskDependencies = dependencies.filterIsInstance<TaskDependency>()
|
||||||
setIndexed("data".toName(), dataDependencies.map { it.toMeta() })
|
setIndexed("data".toName(), dataDependencies.map { it.toMeta() })
|
||||||
setIndexed("task".toName(), taskDependencies.map { it.toMeta() }) { taskDependencies[it].name }
|
setIndexed("task".toName(), taskDependencies.map { it.toMeta() }) { taskDependencies[it].name.toString() }
|
||||||
//TODO ensure all dependencies are listed
|
//TODO ensure all dependencies are listed
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -76,14 +76,15 @@ class TaskModelBuilder(val name: String, meta: Meta = EmptyMeta) {
|
|||||||
var target: String by this.meta.string(key = MODEL_TARGET_KEY, default = "")
|
var target: String by this.meta.string(key = MODEL_TARGET_KEY, default = "")
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add dependency for
|
* Add dependency for a task defined in a workspace and resolved by
|
||||||
*/
|
*/
|
||||||
fun dependsOn(name: String, meta: Meta = this.meta, placement: Name = EmptyName) {
|
fun dependsOn(name: String, meta: Meta = this.meta, placement: Name = EmptyName) {
|
||||||
dependencies.add(TaskModelDependency(name, meta, placement))
|
dependencies.add(WorkspaceTaskDependency(name.toName(), meta, placement))
|
||||||
}
|
}
|
||||||
|
|
||||||
fun dependsOn(task: Task<*>, meta: Meta = this.meta, placement: Name = EmptyName) =
|
fun dependsOn(task: Task<*>, meta: Meta = this.meta, placement: Name = EmptyName) {
|
||||||
dependsOn(task.name, meta, placement)
|
dependencies.add(DirectTaskDependency(task, meta, placement))
|
||||||
|
}
|
||||||
|
|
||||||
fun dependsOn(task: Task<*>, placement: Name = EmptyName, metaBuilder: MetaBuilder.() -> Unit) =
|
fun dependsOn(task: Task<*>, placement: Name = EmptyName, metaBuilder: MetaBuilder.() -> Unit) =
|
||||||
dependsOn(task.name, buildMeta(metaBuilder), placement)
|
dependsOn(task.name, buildMeta(metaBuilder), placement)
|
||||||
|
@ -81,13 +81,13 @@ class TaskBuilder(val name: String) {
|
|||||||
inline fun <reified T : Any, reified R : Any> customPipe(
|
inline fun <reified T : Any, reified R : Any> customPipe(
|
||||||
from: String = "",
|
from: String = "",
|
||||||
to: String = "",
|
to: String = "",
|
||||||
crossinline block: PipeBuilder<T, R>.(Context) -> Unit
|
crossinline block: PipeBuilder<T, R>.(TaskEnv) -> Unit
|
||||||
) {
|
) {
|
||||||
action(from, to) {
|
action(from, to) {
|
||||||
PipeAction(
|
PipeAction(
|
||||||
inputType = T::class,
|
inputType = T::class,
|
||||||
outputType = R::class
|
outputType = R::class
|
||||||
) { block(context) }
|
) { block(this@action) }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -118,13 +118,13 @@ class TaskBuilder(val name: String) {
|
|||||||
inline fun <reified T : Any, reified R : Any> joinByGroup(
|
inline fun <reified T : Any, reified R : Any> joinByGroup(
|
||||||
from: String = "",
|
from: String = "",
|
||||||
to: String = "",
|
to: String = "",
|
||||||
crossinline block: JoinGroupBuilder<T, R>.(Context) -> Unit
|
crossinline block: JoinGroupBuilder<T, R>.(TaskEnv) -> Unit
|
||||||
) {
|
) {
|
||||||
action(from, to) {
|
action(from, to) {
|
||||||
JoinAction(
|
JoinAction(
|
||||||
inputType = T::class,
|
inputType = T::class,
|
||||||
outputType = R::class
|
outputType = R::class
|
||||||
) { block(context) }
|
) { block(this@action) }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -157,13 +157,13 @@ class TaskBuilder(val name: String) {
|
|||||||
inline fun <reified T : Any, reified R : Any> split(
|
inline fun <reified T : Any, reified R : Any> split(
|
||||||
from: String = "",
|
from: String = "",
|
||||||
to: String = "",
|
to: String = "",
|
||||||
crossinline block: SplitBuilder<T, R>.(Context) -> Unit
|
crossinline block: SplitBuilder<T, R>.(TaskEnv) -> Unit
|
||||||
) {
|
) {
|
||||||
action(from, to) {
|
action(from, to) {
|
||||||
SplitAction(
|
SplitAction(
|
||||||
inputType = T::class,
|
inputType = T::class,
|
||||||
outputType = R::class
|
outputType = R::class
|
||||||
) { block(context) }
|
) { block(this@action) }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -60,16 +60,16 @@ class SimpleWorkspaceTest {
|
|||||||
model {
|
model {
|
||||||
allData()
|
allData()
|
||||||
}
|
}
|
||||||
joinByGroup<Int, Double> { context ->
|
joinByGroup<Int, Double> {env->
|
||||||
group("even", filter = { name, _ -> name.toString().toInt() % 2 == 0 }) {
|
group("even", filter = { name, _ -> name.toString().toInt() % 2 == 0 }) {
|
||||||
result { data ->
|
result { data ->
|
||||||
context.logger.info { "Starting even" }
|
env.context.logger.info { "Starting even" }
|
||||||
data.values.average()
|
data.values.average()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
group("odd", filter = { name, _ -> name.toString().toInt() % 2 == 1 }) {
|
group("odd", filter = { name, _ -> name.toString().toInt() % 2 == 1 }) {
|
||||||
result { data ->
|
result { data ->
|
||||||
context.logger.info { "Starting odd" }
|
env.context.logger.info { "Starting odd" }
|
||||||
data.values.average()
|
data.values.average()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user