From 82838b6a92fc4432c4642e437bc5664d84c874a3 Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Sun, 8 Jan 2023 12:44:31 +0300 Subject: [PATCH] Workspace cache --- .../dataforge/workspace/EnvelopeTask.kt | 46 +++++++++++++ .../kscience/dataforge/workspace/Workspace.kt | 1 - .../dataforge/workspace/WorkspaceBase.kt | 10 +-- .../dataforge/workspace/WorkspaceCache.kt | 4 +- .../dataforge/workspace/envelopeData.kt | 11 +-- .../dataforge/workspace/FileWorkspaceCache.kt | 67 +++++++++++++++++++ .../workspace/CachingWorkspaceTest.kt | 7 +- 7 files changed, 130 insertions(+), 16 deletions(-) create mode 100644 dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/EnvelopeTask.kt create mode 100644 dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/FileWorkspaceCache.kt diff --git a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/EnvelopeTask.kt b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/EnvelopeTask.kt new file mode 100644 index 00000000..a1588a54 --- /dev/null +++ b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/EnvelopeTask.kt @@ -0,0 +1,46 @@ +package space.kscience.dataforge.workspace + +import space.kscience.dataforge.data.DataTree.Companion.META_ITEM_NAME_TOKEN +import space.kscience.dataforge.io.Envelope +import space.kscience.dataforge.io.IOReader +import space.kscience.dataforge.meta.Meta +import space.kscience.dataforge.meta.descriptors.MetaDescriptor +import space.kscience.dataforge.names.Name +import space.kscience.dataforge.names.asName +import kotlin.reflect.KType + +public abstract class EnvelopeTask( + override val descriptor: MetaDescriptor?, + private val reader: IOReader, +) : Task { + + public abstract suspend fun produceEnvelopes( + workspace: Workspace, + taskName: Name, + taskMeta: Meta, + ): Map + + override suspend fun execute(workspace: Workspace, taskName: Name, taskMeta: Meta): TaskResult = + Result(workspace, taskName, taskMeta, reader, produceEnvelopes(workspace, taskName, taskMeta)) + + private class Result( + override val workspace: Workspace, + override val taskName: Name, + override val taskMeta: Meta, + val reader: IOReader, + envelopes: Map, + ) : TaskResult { + + private val dataMap = envelopes.mapValues { + workspace.wrapData(it.value.toData(reader), it.key, taskName, taskMeta) + } + override val meta: Meta get() = dataMap[META_ITEM_NAME_TOKEN.asName()]?.meta ?: Meta.EMPTY + + override val dataType: KType get() = reader.type + + override fun iterator(): Iterator> = dataMap.values.iterator() + + override fun get(name: Name): TaskData? = dataMap[name] + } +} + diff --git a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/Workspace.kt b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/Workspace.kt index b53f20f1..52f92037 100644 --- a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/Workspace.kt +++ b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/Workspace.kt @@ -45,7 +45,6 @@ public interface Workspace : ContextAware, Provider { } public suspend fun produce(taskName: Name, taskMeta: Meta): TaskResult<*> { - if (taskName == Name.EMPTY) return data val task = tasks[taskName] ?: error("Task with name $taskName not found in the workspace") return task.execute(this, taskName, taskMeta) } diff --git a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/WorkspaceBase.kt b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/WorkspaceBase.kt index e9422703..411605a3 100644 --- a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/WorkspaceBase.kt +++ b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/WorkspaceBase.kt @@ -14,14 +14,16 @@ public class WorkspaceBase internal constructor( override val context: Context, data: DataSet<*>, override val targets: Map, - private val externalTasks: Map>, + tasks: Map>, private val postProcess: suspend (TaskResult<*>) -> TaskResult<*>, ) : Workspace { override val data: TaskResult<*> = wrapResult(data, Name.EMPTY, Meta.EMPTY) - override val tasks: Map> by lazy { context.gather>(Task.TYPE) + externalTasks } + override val tasks: Map> by lazy { context.gather>(Task.TYPE) + tasks } - override suspend fun produce(taskName: Name, taskMeta: Meta): TaskResult<*> = - postProcess(super.produce(taskName, taskMeta)) + override suspend fun produce(taskName: Name, taskMeta: Meta): TaskResult<*> { + val task = tasks[taskName] ?: error("Task with name $taskName not found in the workspace") + return postProcess(task.execute(this, taskName, taskMeta)) + } } \ No newline at end of file diff --git a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/WorkspaceCache.kt b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/WorkspaceCache.kt index 0f37d4e9..af0dc0bf 100644 --- a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/WorkspaceCache.kt +++ b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/WorkspaceCache.kt @@ -16,6 +16,7 @@ public class InMemoryWorkspaceCache : WorkspaceCache { private val cache = HashMap>>() //TODO do actual check + @Suppress("UNUSED_PARAMETER") private fun TaskData<*>.checkType(taskType: KType): TaskData = this as TaskData override suspend fun evaluate(result: TaskResult): TaskResult { @@ -35,4 +36,5 @@ public class InMemoryWorkspaceCache : WorkspaceCache { } } } -} \ No newline at end of file +} + diff --git a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/envelopeData.kt b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/envelopeData.kt index d53a8979..d88a7333 100644 --- a/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/envelopeData.kt +++ b/dataforge-workspace/src/commonMain/kotlin/space/kscience/dataforge/workspace/envelopeData.kt @@ -4,20 +4,15 @@ import space.kscience.dataforge.data.Data import space.kscience.dataforge.data.await import space.kscience.dataforge.io.* import space.kscience.dataforge.misc.DFInternal -import kotlin.reflect.KType -import kotlin.reflect.typeOf -@DFInternal -public fun Envelope.toData(type: KType, format: IOReader): Data = Data(type, meta) { - data?.readWith(format) ?: error("Can't convert envelope without data to Data") -} - /** * Convert an [Envelope] to a data via given format. The actual parsing is done lazily. */ @OptIn(DFInternal::class) -public inline fun Envelope.toData(format: IOReader): Data = toData(typeOf(), format) +public fun Envelope.toData(format: IOReader): Data = Data(format.type, meta) { + data?.readWith(format) ?: error("Can't convert envelope without data to Data") +} public suspend fun Data.toEnvelope(format: IOWriter): Envelope { val obj = await() diff --git a/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/FileWorkspaceCache.kt b/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/FileWorkspaceCache.kt new file mode 100644 index 00000000..acda84e4 --- /dev/null +++ b/dataforge-workspace/src/jvmMain/kotlin/space/kscience/dataforge/workspace/FileWorkspaceCache.kt @@ -0,0 +1,67 @@ +package space.kscience.dataforge.workspace + +import space.kscience.dataforge.context.request +import space.kscience.dataforge.data.Data +import space.kscience.dataforge.data.await +import space.kscience.dataforge.io.* +import space.kscience.dataforge.names.Name +import java.nio.file.Path +import kotlin.io.path.deleteIfExists +import kotlin.io.path.exists +import kotlin.reflect.KType + +public class FileWorkspaceCache : WorkspaceCache { + + private fun TaskData<*>.checkType(taskType: KType): TaskData = this as TaskData + + + override suspend fun evaluate(result: TaskResult): TaskResult { + val io = result.workspace.context.request(IOPlugin) + + val format: IOFormat = io.resolveIOFormat(result.dataType, result.taskMeta) + ?: error("Can't resolve IOFormat for ${result.dataType}") + + fun cachedDataPath(dataName: Name): Path { + TODO() + } + + fun cachedData(data: TaskData): TaskData { + val path = cachedDataPath(data.name) + val cachedData: Data = Data(data.type, meta = data.meta, dependencies = data.dependencies) { + if (path.exists()) { + try { + val envelope: Envelope = io.readEnvelopeFile(path) + if (envelope.meta != data.meta) error("Wrong metadata in cached result file") + return@Data envelope.data?.readWith(format) + ?: error("Can't convert envelope without data to Data") + } catch (ex: Exception) { + //cleanup cache file + path.deleteIfExists() + } + } + + return@Data data.await().also { + val envelope = Envelope { + meta = data.meta + data { + writeObject(format, it) + } + } + io.writeEnvelopeFile(path, envelope) + } + + } + return data.workspace.wrapData(cachedData, data.name, data.taskName, data.taskMeta) + } + + return object : TaskResult by result { + override fun iterator(): Iterator> = iterator { + result.iterator().forEach { + yield(cachedData(it)) + } + } + + override fun get(name: Name): TaskData? = result[name]?.let { cachedData(it) } + } + } +} \ No newline at end of file diff --git a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/CachingWorkspaceTest.kt b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/CachingWorkspaceTest.kt index 9bf72aec..49265ca0 100644 --- a/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/CachingWorkspaceTest.kt +++ b/dataforge-workspace/src/jvmTest/kotlin/space/kscience/dataforge/workspace/CachingWorkspaceTest.kt @@ -1,5 +1,6 @@ package space.kscience.dataforge.workspace +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.test.runTest import org.junit.jupiter.api.Test import space.kscience.dataforge.data.startAll @@ -7,9 +8,11 @@ import space.kscience.dataforge.data.static import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.boolean import space.kscience.dataforge.meta.get +import space.kscience.dataforge.misc.DFExperimental -class CachingWorkspaceTest { - val workspace = Workspace { +@OptIn(ExperimentalCoroutinesApi::class, DFExperimental::class) +internal class CachingWorkspaceTest { + private val workspace = Workspace { data { //statically initialize data repeat(5) {