Minor update to workspace cache
This commit is contained in:
parent
707b59e6fc
commit
61c8df9eb0
@ -42,6 +42,9 @@ private class TaskDataImpl<out T : Any>(
|
|||||||
// }
|
// }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adopt data into this workspace
|
||||||
|
*/
|
||||||
public fun <T : Any> Workspace.wrapData(data: Data<T>, name: Name, taskName: Name, taskMeta: Meta): TaskData<T> =
|
public fun <T : Any> Workspace.wrapData(data: Data<T>, name: Name, taskName: Name, taskMeta: Meta): TaskData<T> =
|
||||||
TaskDataImpl(this, data, name, taskName, taskMeta)
|
TaskDataImpl(this, data, name, taskName, taskMeta)
|
||||||
|
|
||||||
|
@ -4,30 +4,36 @@ import space.kscience.dataforge.context.request
|
|||||||
import space.kscience.dataforge.data.Data
|
import space.kscience.dataforge.data.Data
|
||||||
import space.kscience.dataforge.data.await
|
import space.kscience.dataforge.data.await
|
||||||
import space.kscience.dataforge.io.*
|
import space.kscience.dataforge.io.*
|
||||||
|
import space.kscience.dataforge.misc.DFExperimental
|
||||||
|
import space.kscience.dataforge.misc.DFInternal
|
||||||
import space.kscience.dataforge.names.Name
|
import space.kscience.dataforge.names.Name
|
||||||
|
import space.kscience.dataforge.names.withIndex
|
||||||
import java.nio.file.Path
|
import java.nio.file.Path
|
||||||
import kotlin.io.path.deleteIfExists
|
import kotlin.io.path.deleteIfExists
|
||||||
|
import kotlin.io.path.div
|
||||||
import kotlin.io.path.exists
|
import kotlin.io.path.exists
|
||||||
import kotlin.reflect.KType
|
import kotlin.reflect.KType
|
||||||
|
|
||||||
public class FileWorkspaceCache : WorkspaceCache {
|
public class FileWorkspaceCache(public val cacheDirectory: Path) : WorkspaceCache {
|
||||||
|
|
||||||
private fun <T : Any> TaskData<*>.checkType(taskType: KType): TaskData<T> = this as TaskData<T>
|
private fun <T : Any> TaskData<*>.checkType(taskType: KType): TaskData<T> = this as TaskData<T>
|
||||||
|
|
||||||
|
|
||||||
|
@OptIn(DFExperimental::class, DFInternal::class)
|
||||||
override suspend fun <T : Any> evaluate(result: TaskResult<T>): TaskResult<T> {
|
override suspend fun <T : Any> evaluate(result: TaskResult<T>): TaskResult<T> {
|
||||||
val io = result.workspace.context.request(IOPlugin)
|
val io = result.workspace.context.request(IOPlugin)
|
||||||
|
|
||||||
val format: IOFormat<T> = io.resolveIOFormat<T>(result.dataType, result.taskMeta)
|
val format: IOFormat<T> = io.resolveIOFormat(result.dataType, result.taskMeta)
|
||||||
?: error("Can't resolve IOFormat for ${result.dataType}")
|
?: error("Can't resolve IOFormat for ${result.dataType}")
|
||||||
|
|
||||||
fun cachedDataPath(dataName: Name): Path {
|
fun cachedDataPath(dataName: Name): Path = cacheDirectory /
|
||||||
TODO()
|
result.taskName.withIndex(result.taskMeta.hashCode().toString(16)).toString() /
|
||||||
}
|
dataName.toString()
|
||||||
|
|
||||||
fun cachedData(data: TaskData<T>): TaskData<T> {
|
fun evaluateDatum(data: TaskData<T>): TaskData<T> {
|
||||||
val path = cachedDataPath(data.name)
|
val path = cachedDataPath(data.name)
|
||||||
val cachedData: Data<T> = Data<T>(data.type, meta = data.meta, dependencies = data.dependencies) {
|
val datum: Data<T> = Data<T>(data.type, meta = data.meta, dependencies = data.dependencies) {
|
||||||
|
// return cached data if it is present
|
||||||
if (path.exists()) {
|
if (path.exists()) {
|
||||||
try {
|
try {
|
||||||
val envelope: Envelope = io.readEnvelopeFile(path)
|
val envelope: Envelope = io.readEnvelopeFile(path)
|
||||||
@ -40,28 +46,26 @@ public class FileWorkspaceCache : WorkspaceCache {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return@Data data.await().also {
|
//waiting for data in current scope because Envelope is synchronous
|
||||||
|
return@Data data.await().also { result ->
|
||||||
val envelope = Envelope {
|
val envelope = Envelope {
|
||||||
meta = data.meta
|
meta = data.meta
|
||||||
data {
|
data {
|
||||||
writeObject(format, it)
|
writeObject(format, result)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
io.writeEnvelopeFile(path, envelope)
|
io.writeEnvelopeFile(path, envelope)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
return data.workspace.wrapData(cachedData, data.name, data.taskName, data.taskMeta)
|
return data.workspace.wrapData(datum, data.name, data.taskName, data.taskMeta)
|
||||||
}
|
}
|
||||||
|
|
||||||
return object : TaskResult<T> by result {
|
return object : TaskResult<T> by result {
|
||||||
override fun iterator(): Iterator<TaskData<T>> = iterator {
|
override fun iterator(): Iterator<TaskData<T>> =
|
||||||
result.iterator().forEach {
|
iterator().asSequence().map { evaluateDatum(it) }.iterator()
|
||||||
yield(cachedData(it))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
override fun get(name: Name): TaskData<T>? = result[name]?.let { cachedData(it) }
|
override fun get(name: Name): TaskData<T>? = result[name]?.let { evaluateDatum(it) }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -28,7 +28,7 @@ internal class CachingWorkspaceTest {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
val doSecond by task<Any>{
|
@Suppress("UNUSED_VARIABLE") val doSecond by task<Any>{
|
||||||
pipeFrom(doFirst) { _, name, _ ->
|
pipeFrom(doFirst) { _, name, _ ->
|
||||||
println("Done second on $name with flag=${taskMeta["flag"].boolean ?: false}")
|
println("Done second on $name with flag=${taskMeta["flag"].boolean ?: false}")
|
||||||
}
|
}
|
||||||
|
@ -6,4 +6,4 @@ kotlin.mpp.stability.nowarn=true
|
|||||||
kotlin.incremental.js.ir=true
|
kotlin.incremental.js.ir=true
|
||||||
kotlin.native.ignoreDisabledTargets=true
|
kotlin.native.ignoreDisabledTargets=true
|
||||||
|
|
||||||
toolsVersion=0.14.2-kotlin-1.8.10
|
toolsVersion=0.14.3-kotlin-1.8.20-RC
|
||||||
|
Loading…
Reference in New Issue
Block a user