Workspace cache

This commit is contained in:
Alexander Nozik 2023-01-08 12:44:31 +03:00
parent e41fdfc086
commit 82838b6a92
7 changed files with 130 additions and 16 deletions

View File

@ -0,0 +1,46 @@
package space.kscience.dataforge.workspace
import space.kscience.dataforge.data.DataTree.Companion.META_ITEM_NAME_TOKEN
import space.kscience.dataforge.io.Envelope
import space.kscience.dataforge.io.IOReader
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.descriptors.MetaDescriptor
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName
import kotlin.reflect.KType
public abstract class EnvelopeTask<T : Any>(
override val descriptor: MetaDescriptor?,
private val reader: IOReader<T>,
) : Task<T> {
public abstract suspend fun produceEnvelopes(
workspace: Workspace,
taskName: Name,
taskMeta: Meta,
): Map<Name, Envelope>
override suspend fun execute(workspace: Workspace, taskName: Name, taskMeta: Meta): TaskResult<T> =
Result(workspace, taskName, taskMeta, reader, produceEnvelopes(workspace, taskName, taskMeta))
private class Result<T : Any>(
override val workspace: Workspace,
override val taskName: Name,
override val taskMeta: Meta,
val reader: IOReader<T>,
envelopes: Map<Name, Envelope>,
) : TaskResult<T> {
private val dataMap = envelopes.mapValues {
workspace.wrapData(it.value.toData(reader), it.key, taskName, taskMeta)
}
override val meta: Meta get() = dataMap[META_ITEM_NAME_TOKEN.asName()]?.meta ?: Meta.EMPTY
override val dataType: KType get() = reader.type
override fun iterator(): Iterator<TaskData<T>> = dataMap.values.iterator()
override fun get(name: Name): TaskData<T>? = dataMap[name]
}
}

View File

@ -45,7 +45,6 @@ public interface Workspace : ContextAware, Provider {
} }
public suspend fun produce(taskName: Name, taskMeta: Meta): TaskResult<*> { public suspend fun produce(taskName: Name, taskMeta: Meta): TaskResult<*> {
if (taskName == Name.EMPTY) return data
val task = tasks[taskName] ?: error("Task with name $taskName not found in the workspace") val task = tasks[taskName] ?: error("Task with name $taskName not found in the workspace")
return task.execute(this, taskName, taskMeta) return task.execute(this, taskName, taskMeta)
} }

View File

@ -14,14 +14,16 @@ public class WorkspaceBase internal constructor(
override val context: Context, override val context: Context,
data: DataSet<*>, data: DataSet<*>,
override val targets: Map<String, Meta>, override val targets: Map<String, Meta>,
private val externalTasks: Map<Name, Task<*>>, tasks: Map<Name, Task<*>>,
private val postProcess: suspend (TaskResult<*>) -> TaskResult<*>, private val postProcess: suspend (TaskResult<*>) -> TaskResult<*>,
) : Workspace { ) : Workspace {
override val data: TaskResult<*> = wrapResult(data, Name.EMPTY, Meta.EMPTY) override val data: TaskResult<*> = wrapResult(data, Name.EMPTY, Meta.EMPTY)
override val tasks: Map<Name, Task<*>> by lazy { context.gather<Task<*>>(Task.TYPE) + externalTasks } override val tasks: Map<Name, Task<*>> by lazy { context.gather<Task<*>>(Task.TYPE) + tasks }
override suspend fun produce(taskName: Name, taskMeta: Meta): TaskResult<*> = override suspend fun produce(taskName: Name, taskMeta: Meta): TaskResult<*> {
postProcess(super.produce(taskName, taskMeta)) val task = tasks[taskName] ?: error("Task with name $taskName not found in the workspace")
return postProcess(task.execute(this, taskName, taskMeta))
}
} }

View File

@ -16,6 +16,7 @@ public class InMemoryWorkspaceCache : WorkspaceCache {
private val cache = HashMap<TaskResultId, HashMap<Name, TaskData<*>>>() private val cache = HashMap<TaskResultId, HashMap<Name, TaskData<*>>>()
//TODO do actual check //TODO do actual check
@Suppress("UNUSED_PARAMETER")
private fun <T : Any> TaskData<*>.checkType(taskType: KType): TaskData<T> = this as TaskData<T> private fun <T : Any> TaskData<*>.checkType(taskType: KType): TaskData<T> = this as TaskData<T>
override suspend fun <T : Any> evaluate(result: TaskResult<T>): TaskResult<T> { override suspend fun <T : Any> evaluate(result: TaskResult<T>): TaskResult<T> {
@ -36,3 +37,4 @@ public class InMemoryWorkspaceCache : WorkspaceCache {
} }
} }
} }

View File

@ -4,20 +4,15 @@ import space.kscience.dataforge.data.Data
import space.kscience.dataforge.data.await import space.kscience.dataforge.data.await
import space.kscience.dataforge.io.* import space.kscience.dataforge.io.*
import space.kscience.dataforge.misc.DFInternal import space.kscience.dataforge.misc.DFInternal
import kotlin.reflect.KType
import kotlin.reflect.typeOf
@DFInternal
public fun <T : Any> Envelope.toData(type: KType, format: IOReader<T>): Data<T> = Data(type, meta) {
data?.readWith(format) ?: error("Can't convert envelope without data to Data")
}
/** /**
* Convert an [Envelope] to a data via given format. The actual parsing is done lazily. * Convert an [Envelope] to a data via given format. The actual parsing is done lazily.
*/ */
@OptIn(DFInternal::class) @OptIn(DFInternal::class)
public inline fun <reified T : Any> Envelope.toData(format: IOReader<T>): Data<T> = toData(typeOf<T>(), format) public fun <T : Any> Envelope.toData(format: IOReader<T>): Data<T> = Data(format.type, meta) {
data?.readWith(format) ?: error("Can't convert envelope without data to Data")
}
public suspend fun <T : Any> Data<T>.toEnvelope(format: IOWriter<T>): Envelope { public suspend fun <T : Any> Data<T>.toEnvelope(format: IOWriter<T>): Envelope {
val obj = await() val obj = await()

View File

@ -0,0 +1,67 @@
package space.kscience.dataforge.workspace
import space.kscience.dataforge.context.request
import space.kscience.dataforge.data.Data
import space.kscience.dataforge.data.await
import space.kscience.dataforge.io.*
import space.kscience.dataforge.names.Name
import java.nio.file.Path
import kotlin.io.path.deleteIfExists
import kotlin.io.path.exists
import kotlin.reflect.KType
public class FileWorkspaceCache : WorkspaceCache {
private fun <T : Any> TaskData<*>.checkType(taskType: KType): TaskData<T> = this as TaskData<T>
override suspend fun <T : Any> evaluate(result: TaskResult<T>): TaskResult<T> {
val io = result.workspace.context.request(IOPlugin)
val format: IOFormat<T> = io.resolveIOFormat<T>(result.dataType, result.taskMeta)
?: error("Can't resolve IOFormat for ${result.dataType}")
fun cachedDataPath(dataName: Name): Path {
TODO()
}
fun cachedData(data: TaskData<T>): TaskData<T> {
val path = cachedDataPath(data.name)
val cachedData: Data<T> = Data<T>(data.type, meta = data.meta, dependencies = data.dependencies) {
if (path.exists()) {
try {
val envelope: Envelope = io.readEnvelopeFile(path)
if (envelope.meta != data.meta) error("Wrong metadata in cached result file")
return@Data envelope.data?.readWith(format)
?: error("Can't convert envelope without data to Data")
} catch (ex: Exception) {
//cleanup cache file
path.deleteIfExists()
}
}
return@Data data.await().also {
val envelope = Envelope {
meta = data.meta
data {
writeObject(format, it)
}
}
io.writeEnvelopeFile(path, envelope)
}
}
return data.workspace.wrapData(cachedData, data.name, data.taskName, data.taskMeta)
}
return object : TaskResult<T> by result {
override fun iterator(): Iterator<TaskData<T>> = iterator {
result.iterator().forEach {
yield(cachedData(it))
}
}
override fun get(name: Name): TaskData<T>? = result[name]?.let { cachedData(it) }
}
}
}

View File

@ -1,5 +1,6 @@
package space.kscience.dataforge.workspace package space.kscience.dataforge.workspace
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.test.runTest import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import space.kscience.dataforge.data.startAll import space.kscience.dataforge.data.startAll
@ -7,9 +8,11 @@ import space.kscience.dataforge.data.static
import space.kscience.dataforge.meta.Meta import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.boolean import space.kscience.dataforge.meta.boolean
import space.kscience.dataforge.meta.get import space.kscience.dataforge.meta.get
import space.kscience.dataforge.misc.DFExperimental
class CachingWorkspaceTest { @OptIn(ExperimentalCoroutinesApi::class, DFExperimental::class)
val workspace = Workspace { internal class CachingWorkspaceTest {
private val workspace = Workspace {
data { data {
//statically initialize data //statically initialize data
repeat(5) { repeat(5) {