Add in-memory caching for workspaces

This commit is contained in:
Alexander Nozik 2022-10-03 20:36:28 +03:00
parent 5406a6a64c
commit 4117a05df4
No known key found for this signature in database
GPG Key ID: F7FCF2DD25C71357
23 changed files with 210 additions and 57 deletions
build.gradle.kts
dataforge-context
dataforge-data
build.gradle.kts
src/commonMain/kotlin/space/kscience/dataforge/data
dataforge-io
dataforge-meta
build.gradle.kts
src/commonMain/kotlin/space/kscience/dataforge/meta
dataforge-workspace
build.gradle.kts
src
commonMain/kotlin/space/kscience/dataforge/workspace
jvmMain/kotlin/space/kscience/dataforge/workspace
jvmTest/kotlin/space/kscience/dataforge/workspace
gradle.properties
gradle/wrapper

@ -1,4 +1,7 @@
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
import space.kscience.gradle.isInDevelopment
import space.kscience.gradle.useApache2Licence
import space.kscience.gradle.useSPCTeam
plugins {
id("space.kscience.gradle.project")
@ -12,8 +15,8 @@ allprojects {
subprojects {
apply(plugin = "maven-publish")
tasks.withType<KotlinCompile>{
kotlinOptions{
tasks.withType<KotlinCompile> {
kotlinOptions {
freeCompilerArgs = freeCompilerArgs + "-Xcontext-receivers"
}
}
@ -24,8 +27,18 @@ readme {
}
ksciencePublish {
github("dataforge-core")
space("https://maven.pkg.jetbrains.space/mipt-npm/p/sci/maven")
pom("https://github.com/SciProgCentre/kmath") {
useApache2Licence()
useSPCTeam()
}
github("dataforge-core", "SciProgCentre")
space(
if (isInDevelopment) {
"https://maven.pkg.jetbrains.space/mipt-npm/p/sci/dev"
} else {
"https://maven.pkg.jetbrains.space/mipt-npm/p/sci/release"
}
)
sonatype()
}

@ -1,11 +1,11 @@
plugins {
id("space.kscience.gradle.mpp")
id("space.kscience.gradle.native")
}
description = "Context and provider definitions"
kscience {
native()
useCoroutines()
}

@ -1,9 +1,9 @@
plugins {
id("space.kscience.gradle.mpp")
id("space.kscience.gradle.native")
}
kscience{
native()
useCoroutines()
}

@ -56,6 +56,11 @@ public fun <T : Any> DataSet<T>.asSequence(): Sequence<NamedData<T>> = object :
override fun iterator(): Iterator<NamedData<T>> = this@asSequence.iterator()
}
/**
 * Get the only [NamedData] element contained in this [DataSet].
 * Fails if the set is empty or holds more than one element.
 */
public fun <T : Any> DataSet<T>.single(): NamedData<T> {
    val elements: Sequence<NamedData<T>> = asSequence()
    return elements.single()
}
/**
 * Present this [DataSet] as an [Iterable] view. Each call to [Iterable.iterator] asks the data set for a new iterator.
 */
public fun <T : Any> DataSet<T>.asIterable(): Iterable<NamedData<T>> =
    Iterable { this@asIterable.iterator() }

@ -1,6 +1,5 @@
package space.kscience.dataforge.data
import kotlinx.coroutines.flow.map
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MutableMeta
import space.kscience.dataforge.meta.seal
@ -37,7 +36,7 @@ public inline fun <T : Any, reified R : Any> Data<T>.map(
}
/**
* Combine this data with the other data using [block]. See [map] for other details
* Combine this data with the other data using [block]. See [Data.map] for other details
*/
public inline fun <T1 : Any, T2 : Any, reified R : Any> Data<T1>.combine(
other: Data<T2>,

@ -2,12 +2,12 @@ import space.kscience.gradle.KScienceVersions
plugins {
id("space.kscience.gradle.mpp")
id("space.kscience.gradle.native")
}
description = "IO module"
kscience {
native()
useSerialization(sourceSet = space.kscience.gradle.DependencySourceSet.TEST) {
cbor()
}

@ -1,9 +1,9 @@
plugins {
id("space.kscience.gradle.mpp")
id("space.kscience.gradle.native")
}
kscience {
native()
useSerialization{
json()
}

@ -13,8 +13,8 @@ private class ObservableMetaWrapper(
val listeners: MutableSet<MetaListener>
) : ObservableMutableMeta {
override val items: Map<NameToken, ObservableMutableMeta>
get() = root.items.mapValues {
ObservableMetaWrapper(root, absoluteName + it.key, listeners)
get() = root.items.keys.associateWith {
ObservableMetaWrapper(root, absoluteName + it, listeners)
}
override fun getMeta(name: Name): ObservableMutableMeta? =

@ -14,7 +14,15 @@ public class SealedMeta internal constructor(
) : TypedMeta<SealedMeta> {
override fun toString(): String = Meta.toString(this)
override fun equals(other: Any?): Boolean = Meta.equals(this, other as? Meta)
override fun hashCode(): Int = Meta.hashCode(this)
/**
* Compute hash code once to optimize later access
*/
private val cachedHashCode by lazy {
Meta.hashCode(this)
}
override fun hashCode(): Int = cachedHashCode
}
/**

@ -1,9 +1,9 @@
plugins {
id("space.kscience.gradle.mpp")
id("space.kscience.gradle.native")
}
kscience{
native()
useCoroutines()
}
@ -16,6 +16,11 @@ kotlin {
api(project(":dataforge-io"))
}
}
jvmTest{
dependencies{
implementation("ch.qos.logback:logback-classic:1.4.1")
}
}
}
}

@ -22,6 +22,11 @@ import kotlin.reflect.typeOf
@Type(TYPE)
public interface Task<out T : Any> : Described {
/**
* A task identification string used to compare tasks and check task body for change
*/
public val fingerprint: String get() = hashCode().toString()
/**
* Compute a [TaskResult] using given meta. In general, the result is lazy and represents both computation model
* and a handler for actual result

@ -17,7 +17,7 @@ public interface TaskData<out T : Any> : NamedData<T> {
/**
* The name of the task that produced this data. [Name.EMPTY] if the workspace intrinsic data is used.
*/
public val task: Name
public val taskName: Name
/**
* Task configuration used to produce this data.
@ -34,7 +34,7 @@ private class TaskDataImpl<out T : Any>(
override val workspace: Workspace,
override val data: Data<T>,
override val name: Name,
override val task: Name,
override val taskName: Name,
override val taskMeta: Meta,
) : TaskData<T>, Data<T> by data {
// override val dependencies: Collection<TaskData<*>> = data.dependencies.map {

@ -15,6 +15,9 @@ public interface DataSelector<T: Any>{
public suspend fun select(workspace: Workspace, meta: Meta): DataSet<T>
}
/**
* An environment for pull-mode computation
*/
@Type(Workspace.TYPE)
public interface Workspace : ContextAware, Provider {
/**
@ -28,7 +31,7 @@ public interface Workspace : ContextAware, Provider {
public val targets: Map<String, Meta>
/**
* All stages associated with the workspace
* All tasks associated with the workspace
*/
public val tasks: Map<Name, Task<*>>

@ -10,16 +10,18 @@ import space.kscience.dataforge.names.Name
/**
* A simple workspace. It does no caching by itself; every produced result is passed through the `postProcess` hook, which may add it.
*/
public class SimpleWorkspace(
public class WorkspaceBase internal constructor(
override val context: Context,
data: DataSet<*>,
override val targets: Map<String, Meta>,
private val externalTasks: Map<Name, Task<*>>,
private val postProcess: suspend (TaskResult<*>) -> TaskResult<*>,
) : Workspace {
override val data: TaskResult<*> = wrapResult(data, Name.EMPTY, Meta.EMPTY)
override val tasks: Map<Name, Task<*>>
get() = context.gather<Task<*>>(Task.TYPE) + externalTasks
override val tasks: Map<Name, Task<*>> by lazy { context.gather<Task<*>>(Task.TYPE) + externalTasks }
override suspend fun produce(taskName: Name, taskMeta: Meta): TaskResult<*> =
postProcess(super.produce(taskName, taskMeta))
}

@ -88,6 +88,7 @@ public class WorkspaceBuilder(private val parentContext: Context = Global) : Tas
private var data: DataSet<*>? = null
private val targets: HashMap<String, Meta> = HashMap()
private val tasks = HashMap<Name, Task<*>>()
private var cache: WorkspaceCache? = null
/**
* Define a context for the workspace
@ -123,7 +124,16 @@ public class WorkspaceBuilder(private val parentContext: Context = Global) : Tas
tasks[taskName] = task
}
public fun build(): Workspace = SimpleWorkspace(context ?: parentContext, data ?: DataSet.EMPTY, targets, tasks)
/**
 * Turn on result caching for the workspace being built by installing a fresh [InMemoryWorkspaceCache].
 */
public fun useCache() {
cache = InMemoryWorkspaceCache()
}
/**
 * Build a [Workspace] from the current state of this builder.
 *
 * The workspace uses the explicitly defined context (or the parent context when none was defined),
 * the configured data (or an empty data set), targets and tasks. Every produced [TaskResult] is passed
 * through the cache when one was installed with [useCache]; otherwise results are returned unchanged.
 */
public fun build(): Workspace {
    // Snapshot the cache: the built workspace must not change its behaviour
    // if the builder is mutated (e.g. useCache() is called) after build().
    val workspaceCache: WorkspaceCache? = cache
    val postProcess: suspend (TaskResult<*>) -> TaskResult<*> = { result ->
        workspaceCache?.evaluate(result) ?: result
    }
    return WorkspaceBase(context ?: parentContext, data ?: DataSet.EMPTY, targets, tasks, postProcess)
}
}
/**

@ -0,0 +1,38 @@
package space.kscience.dataforge.workspace
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
import kotlin.reflect.KType
/**
 * A cache that task results can be passed through.
 *
 * NOTE(review): the caching policy (keys, eviction, thread-safety) is left to implementations — confirm the intended contract.
 */
public interface WorkspaceCache {
/**
 * Process [result] and return a [TaskResult] of the same data type that should be used in its place.
 */
public suspend fun <T : Any> evaluate(result: TaskResult<T>): TaskResult<T>
}
/** Key of a cached task result: the task name paired with the meta the task was produced with. */
private typealias TaskResultId = Pair<Name, Meta>

/**
 * A [WorkspaceCache] that stores every [TaskData] it has seen in memory for as long as the cache instance lives.
 *
 * Data are grouped first by the (task name, task meta) pair of the result they came from and then by data name.
 * The first [TaskData] stored under a given name is kept; later data with the same name do not replace it.
 *
 * NOTE(review): the storage is a plain [HashMap] with no synchronization and no eviction —
 * confirm this is acceptable if results are evaluated concurrently.
 */
public class InMemoryWorkspaceCache : WorkspaceCache {

    // never do that at home!
    private val cache = HashMap<TaskResultId, HashMap<Name, TaskData<*>>>()

    //TODO do actual check
    @Suppress("UNCHECKED_CAST")
    private fun <T : Any> TaskData<*>.checkType(taskType: KType): TaskData<T> = this as TaskData<T>

    /**
     * Store the data of [result] that are not cached yet and return a view of [result]
     * whose [TaskResult.iterator] and [TaskResult.get] are served from the cache.
     * All other members are delegated to [result].
     */
    override suspend fun <T : Any> evaluate(result: TaskResult<T>): TaskResult<T> {
        // The key does not depend on the data element, so resolve the bucket once
        // instead of rebuilding the key and looking it up for every element and every access.
        val bucket: HashMap<Name, TaskData<*>> = cache.getOrPut(result.taskName to result.taskMeta) { HashMap() }

        for (d: TaskData<T> in result) {
            // keep the first data stored under this name
            bucket.getOrPut(d.name) { d }
        }

        return object : TaskResult<T> by result {
            override fun iterator(): Iterator<TaskData<T>> =
                bucket.values.map { it.checkType<T>(result.dataType) }.iterator()

            //TODO check types
            override fun get(name: Name): TaskData<T>? = bucket[name]?.checkType<T>(result.dataType)
        }
    }
}

@ -1,19 +1,38 @@
package space.kscience.dataforge.workspace
import space.kscience.dataforge.context.PluginFactory
import space.kscience.dataforge.data.DataSet
import space.kscience.dataforge.data.forEach
import space.kscience.dataforge.data.map
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MutableMeta
import space.kscience.dataforge.meta.toMutableMeta
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.plus
/**
* Select data using given [selector]
*/
public suspend fun <T : Any> TaskResultBuilder<*>.from(
selector: DataSelector<T>,
): DataSet<T> = selector.select(workspace, taskMeta)
meta: Meta = taskMeta
): DataSet<T> = selector.select(workspace, meta)
/**
 * Use the result of a task declared in a [WorkspacePlugin] as a data source.
 *
 * The plugin is looked up by [pluginFactory] in the context of this [Workspace]; [selectorBuilder] picks
 * the task reference on it. The task is then produced under the plugin name joined with the task name,
 * using [meta] (the current task meta by default).
 *
 * Fails with an error if the plugin is not loaded into the workspace context.
 */
public suspend inline fun <T : Any, reified P : WorkspacePlugin> TaskResultBuilder<*>.from(
    pluginFactory: PluginFactory<P>,
    meta: Meta = taskMeta,
    selectorBuilder: P.() -> TaskReference<T>,
): DataSet<T> {
    val plugin: P = checkNotNull(workspace.context.plugins[pluginFactory]) {
        "Plugin ${pluginFactory.tag} not loaded into workspace context"
    }
    val reference: TaskReference<T> = selectorBuilder(plugin)
    val fullTaskName = plugin.name + reference.taskName
    return workspace.produce(fullTaskName, meta) as TaskResult<T>
}
public val TaskResultBuilder<*>.allData: DataSelector<*>
get() = object : DataSelector<Any> {
@ -27,11 +46,14 @@ public val TaskResultBuilder<*>.allData: DataSelector<*>
@DFExperimental
public suspend inline fun <T : Any, reified R : Any> TaskResultBuilder<R>.pipeFrom(
selector: DataSelector<T>,
crossinline action: suspend (arg: T, name: Name, meta: Meta) -> R
selectorMeta: Meta = taskMeta,
dataMetaTransform: MutableMeta.() -> Unit = {},
crossinline action: suspend (arg: T, name: Name, meta: Meta) -> R,
) {
from(selector).forEach { data ->
from(selector, selectorMeta).forEach { data ->
val meta = data.meta.toMutableMeta().apply {
taskName put taskMeta
dataMetaTransform()
}
val res = data.map(workspace.context.coroutineContext, meta) {

@ -11,6 +11,9 @@ import space.kscience.dataforge.names.matches
// data(builder)
//}
/**
* Select the whole data set from the workspace filtered by type.
*/
@OptIn(DFExperimental::class)
public inline fun <reified T : Any> TaskResultBuilder<*>.data(namePattern: Name? = null): DataSelector<T> =
object : DataSelector<T> {

@ -0,0 +1,46 @@
package space.kscience.dataforge.workspace
import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.Test
import space.kscience.dataforge.data.startAll
import space.kscience.dataforge.data.static
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.boolean
import space.kscience.dataforge.meta.get
/**
 * Smoke test for a workspace built with caching switched on.
 *
 * The workspace holds five static integers and two chained tasks that only print a line per data item.
 * The test makes no assertions; how often each task body runs is visible in the printed output.
 */
class CachingWorkspaceTest {
    val workspace = Workspace {
        data {
            //statically initialize data
            for (index in 0 until 5) {
                static("myData[$index]", index)
            }
        }

        useCache()

        // first stage: consumes the raw workspace data
        val doFirst by task<Any> {
            pipeFrom(data()) { _, name, _ ->
                val flag = taskMeta["flag"].boolean ?: false
                println("Done first on $name with flag=$flag")
            }
        }

        // second stage: consumes the output of the first one
        val doSecond by task<Any> {
            pipeFrom(doFirst) { _, name, _ ->
                val flag = taskMeta["flag"].boolean ?: false
                println("Done second on $name with flag=$flag")
            }
        }
    }

    @Test
    fun testMetaPropagation() = runTest {
        // produce everything first, then start the computations in the same order
        val results = listOf(
            workspace.produce("doFirst"),
            workspace.produce("doSecond"),
            workspace.produce("doSecond", Meta { "flag" put true }),
        )
        results.forEach { it.startAll(this) }
    }
}

@ -33,40 +33,44 @@ public fun Workspace.produceBlocking(task: String, block: MutableMeta.() -> Unit
produce(task, block)
}
@OptIn(DFExperimental::class)
internal object TestPlugin : WorkspacePlugin() {
override val tag: PluginTag = PluginTag("test")
@DFExperimental
class SimpleWorkspaceTest {
val testPlugin = object : WorkspacePlugin() {
override val tag: PluginTag = PluginTag("test")
val test by task<Any> {
populateFrom(
workspace.data.map {
it.also {
logger.info { "Test: $it" }
}
}
)
val test by task {
// type is inferred
pipeFrom(data<Int>()) { arg, _, _ ->
logger.info { "Test: $arg" }
arg
}
}
}
val testPluginFactory = testPlugin.toFactory()
@DFExperimental
internal class SimpleWorkspaceTest {
val testPluginFactory = TestPlugin.toFactory()
val workspace = Workspace {
context {
//load existing task via plugin into the workspace
plugin(testPluginFactory)
}
data {
//statically initialize data
repeat(100) {
static("myData[$it]", it)
}
}
val filterOne by task<Int> {
workspace.data.getByType<Int>("myData[12]")?.let { source ->
val name by taskMeta.string { error("Name field not defined") }
from(testPluginFactory) { test }.getByType<Int>(name)?.let { source ->
data(source.name, source.map { it })
}
}
@ -74,19 +78,11 @@ class SimpleWorkspaceTest {
val square by task<Int> {
pipeFrom(data<Int>()) { arg, name, meta ->
if (meta["testFlag"].boolean == true) {
println("flag")
println("Side effect")
}
workspace.logger.info { "Starting square on $name" }
arg * arg
}
// workspace.data.select<Int>().forEach { data ->
// if (data.meta["testFlag"].boolean == true) {
// println("flag")
// }
// val value = data.await()
// workspace.logger.info { "Starting square on $value" }
// emit(data.name, data.map { it * it })
// }
}
val linear by task<Int> {
@ -94,17 +90,13 @@ class SimpleWorkspaceTest {
workspace.logger.info { "Starting linear on $name" }
arg * 2 + 1
}
// workspace.data.select<Int>().forEach { data ->
// workspace.logger.info { "Starting linear on $data" }
// emit(data.name, data.data.map { it * 2 + 1 })
// }
}
val fullSquare by task<Int> {
val squareData = from(square)
val linearData = from(linear)
squareData.forEach { data ->
val newData: Data<Int> = data.combine(linearData.get(data.name)!!) { l, r ->
val newData: Data<Int> = data.combine(linearData[data.name]!!) { l, r ->
l + r
}
data(data.name, newData)
@ -190,8 +182,10 @@ class SimpleWorkspaceTest {
@Test
fun testFilter() {
runBlocking {
val node = workspace.produce("filterOne")
assertEquals(12, node.asSequence().first().await())
val node = workspace.produce("filterOne") {
"name" put "myData[12]"
}
assertEquals(12, node.single().await())
}
}
}

@ -5,4 +5,4 @@ kotlin.code.style=official
kotlin.mpp.stability.nowarn=true
kotlin.incremental.js.ir=true
toolsVersion=0.12.0-kotlin-1.7.20-Beta
toolsVersion=0.13.0-kotlin-1.7.20-Beta

@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-7.4.2-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-7.5.1-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists