File-based workspace caching

This commit is contained in:
Alexander Nozik 2023-03-20 17:53:40 +03:00
parent 61c8df9eb0
commit f3afb5e9fe
38 changed files with 357 additions and 159 deletions

View File

@ -2,6 +2,8 @@
## [Unreleased]
### Added
- File cache for workspace
- Smart task metadata transformation for workspace
- Add `readOnly` property to descriptors
- Add `specOrNull` delegate to meta and Scheme
- Suspended read methods to the `Binary`
@ -9,11 +11,13 @@
- More fine-grained types in Action builders.
### Changed
- `PluginFactory` no longer requires plugin class
- Collection<Named> toMap -> associateByName
- Simplified `DFTL` envelope format. Closing symbols are unnecessary. Properties are discontinued.
- Meta `get` method allows nullable receiver
- `withDefault` functions do not add new keys to meta children and are consistent.
- `dataforge.meta.values` package is merged into `dataforge.meta` for better star imports
- Kotlin 1.7.20
- Kotlin 1.8.20
- `Factory` is now `fun interface` and uses `build` instead of `invoke`. `invoke` moved to an extension.
- KTor 2.0
- DataTree `items` call is blocking.

View File

@ -9,7 +9,7 @@ plugins {
allprojects {
group = "space.kscience"
version = "0.6.1-dev-4"
version = "0.6.1-dev-5"
}
subprojects {

View File

@ -9,6 +9,7 @@ kscience {
js()
native()
useCoroutines()
useSerialization()
dependencies {
api(project(":dataforge-meta"))
}

View File

@ -1,6 +1,7 @@
package space.kscience.dataforge.context
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.misc.DFInternal
import space.kscience.dataforge.misc.Named
import space.kscience.dataforge.names.Name
import kotlin.properties.ReadOnlyProperty
@ -24,25 +25,33 @@ public abstract class AbstractPlugin(override val meta: Meta = Meta.EMPTY) : Plu
this._context = null
}
final override fun dependsOn(): Map<PluginFactory<*>, Meta> = dependencies
override fun dependsOn(): Map<PluginFactory<*>, Meta> = dependencies
protected fun <P : Plugin> require(
factory: PluginFactory<P>,
type: KClass<P>,
meta: Meta = Meta.EMPTY,
): ReadOnlyProperty<AbstractPlugin, P> {
dependencies[factory] = meta
return PluginDependencyDelegate(factory, type)
}
/**
* Register plugin dependency and return a delegate which provides lazily initialized reference to dependent plugin
*/
protected fun <P : Plugin> require(
protected inline fun <reified P : Plugin> require(
factory: PluginFactory<P>,
meta: Meta = Meta.EMPTY,
): ReadOnlyProperty<AbstractPlugin, P> {
dependencies[factory] = meta
return PluginDependencyDelegate(factory.type)
}
): ReadOnlyProperty<AbstractPlugin, P> = require(factory, P::class, meta)
}
public fun <T : Named> Collection<T>.toMap(): Map<Name, T> = associate { it.name to it }
public fun <T : Named> Collection<T>.associateByName(): Map<Name, T> = associate { it.name to it }
private class PluginDependencyDelegate<P : Plugin>(val type: KClass<out P>) : ReadOnlyProperty<AbstractPlugin, P> {
private class PluginDependencyDelegate<P : Plugin>(val factory: PluginFactory<P>, val type: KClass<P>) :
ReadOnlyProperty<AbstractPlugin, P> {
@OptIn(DFInternal::class)
override fun getValue(thisRef: AbstractPlugin, property: KProperty<*>): P {
if (!thisRef.isAttached) error("Plugin dependency must not be called eagerly during initialization.")
return thisRef.context.plugins[type] ?: error("Plugin with type $type not found")
return thisRef.context.plugins.getByType(type, factory.tag) ?: error("Plugin ${factory.tag} not found")
}
}

View File

@ -6,10 +6,10 @@ import kotlinx.coroutines.SupervisorJob
import space.kscience.dataforge.meta.*
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.misc.Named
import space.kscience.dataforge.misc.ThreadSafe
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.provider.Provider
import kotlin.coroutines.CoroutineContext
import kotlin.jvm.Synchronized
/**
* The local environment for anything being done in DataForge framework. Contexts are organized into tree structure with [Global] at the top.
@ -76,10 +76,10 @@ public open class Context internal constructor(
* @param name the relative (tail) name of the new context. If null, uses context hash code as a marker.
*/
@OptIn(DFExperimental::class)
@Synchronized
@ThreadSafe
public fun buildContext(name: Name? = null, block: ContextBuilder.() -> Unit = {}): Context {
val existing = name?.let { childrenContexts[name] }
return existing?.modify(block)?: ContextBuilder(this, name).apply(block).build().also {
return existing?.modify(block) ?: ContextBuilder(this, name).apply(block).build().also {
childrenContexts[it.name] = it
}
}

View File

@ -4,7 +4,6 @@ import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.misc.Named
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.plus
import kotlin.reflect.KClass
public fun interface Logger {
public fun log(tag: String, body: () -> String)
@ -66,7 +65,6 @@ public class DefaultLogManager : AbstractPlugin(), LogManager {
override fun build(context: Context, meta: Meta): DefaultLogManager = DefaultLogManager()
override val tag: PluginTag = PluginTag(group = PluginTag.DATAFORGE_GROUP, name = "log.default")
override val type: KClass<out DefaultLogManager> = DefaultLogManager::class
}
}

View File

@ -0,0 +1,57 @@
package space.kscience.dataforge.context
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.misc.Named
import space.kscience.dataforge.names.Name
/**
 * A convenience builder for simple plugins that only declare dependencies and expose static content.
 *
 * Dependencies are registered with [requires], content with [provides]; [build] then produces a
 * [PluginFactory] whose plugins report exactly what was registered up to that point.
 *
 * @param name forwarded to [PluginTag] together with [group] and [version] to form [tag].
 * @param group forwarded to [PluginTag].
 * @param version forwarded to [PluginTag].
 */
public class PluginBuilder(
    name: String,
    group: String = "",
    version: String = "",
) {
    /** The tag shared by the built factory and by every plugin it creates. */
    public val tag: PluginTag = PluginTag(name, group, version)

    private val content = HashMap<String, MutableMap<Name, Any>>()
    private val dependencies = HashMap<PluginFactory<*>, Meta>()

    /**
     * Register a dependency on the plugin produced by [factory], configured with [meta].
     * Registering the same factory again replaces the previously stored meta.
     */
    public fun requires(
        factory: PluginFactory<*>,
        meta: Meta = Meta.EMPTY,
    ) {
        dependencies[factory] = meta
    }

    /**
     * Add [items] to the content exposed for [target]. Items with an already registered name are replaced.
     */
    public fun provides(target: String, items: Map<Name, Any>) {
        content.getOrPut(target) { HashMap() }.putAll(items)
    }

    /**
     * Add [items] to the content exposed for [target], keyed by their own [Named.name].
     */
    public fun provides(target: String, vararg items: Named) {
        provides(target, items.associateBy { it.name })
    }

    /**
     * Create a [PluginFactory] from the current state of the builder.
     *
     * The registered content and dependencies are copied here, so calling [requires] or [provides]
     * on this builder afterwards does not leak into factories (and plugins) that were already built,
     * and the maps returned by the plugins are not backed by the builder's mutable state.
     */
    public fun build(): PluginFactory<*> {
        val builtTag: PluginTag = tag
        // Immutable snapshots: one copy per build() call, shared by all plugins of the resulting factory.
        val builtContent: Map<String, Map<Name, Any>> = content.mapValues { (_, items) -> items.toMap() }
        val builtDependencies: Map<PluginFactory<*>, Meta> = dependencies.toMap()

        return object : PluginFactory<Plugin> {
            override val tag: PluginTag get() = builtTag

            // NOTE(review): the `meta` argument is not forwarded to the created plugin (same as before) — confirm this is intended.
            override fun build(context: Context, meta: Meta): Plugin = object : AbstractPlugin() {
                override val tag: PluginTag get() = builtTag

                override fun content(target: String): Map<Name, Any> = builtContent[target] ?: emptyMap()

                override fun dependsOn(): Map<PluginFactory<*>, Meta> = builtDependencies
            }
        }
    }
}
/**
 * Create a [PluginFactory] by configuring a fresh [PluginBuilder] with [block].
 *
 * [name], [group] and [version] are passed to the [PluginBuilder] constructor unchanged.
 */
public fun PluginFactory(
    name: String,
    group: String = "",
    version: String = "",
    block: PluginBuilder.() -> Unit,
): PluginFactory<*> {
    val builder = PluginBuilder(name, group, version)
    builder.block()
    return builder.build()
}

View File

@ -2,12 +2,10 @@ package space.kscience.dataforge.context
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.misc.Type
import kotlin.reflect.KClass
@Type(PluginFactory.TYPE)
public interface PluginFactory<T : Plugin> : Factory<T> {
public val tag: PluginTag
public val type: KClass<out T>
public companion object {
public const val TYPE: String = "pluginFactory"
@ -20,5 +18,4 @@ public interface PluginFactory<T : Plugin> : Factory<T> {
internal class DeFactoPluginFactory<T : Plugin>(val plugin: T) : PluginFactory<T> {
override fun build(context: Context, meta: Meta): T = plugin
override val tag: PluginTag get() = plugin.tag
override val type: KClass<out T> get() = plugin::class
}

View File

@ -1,8 +1,10 @@
package space.kscience.dataforge.context
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.misc.DFInternal
import space.kscience.dataforge.names.plus
import kotlin.reflect.KClass
import kotlin.reflect.cast
/**
@ -64,15 +66,17 @@ public class PluginManager internal constructor(
* @param <T>
* @return
*/
@Suppress("UNCHECKED_CAST")
public operator fun <T : Any> get(type: KClass<out T>, tag: PluginTag? = null, recursive: Boolean = true): T? =
find(recursive) { type.isInstance(it) && (tag == null || tag.matches(it.tag)) } as T?
@DFInternal
public fun <T : Any> getByType(type: KClass<T>, tag: PluginTag? = null, inherit: Boolean = true): T? =
find(inherit) { type.isInstance(it) && (tag == null || tag.matches(it.tag)) }?.let { type.cast(it) }
@OptIn(DFInternal::class)
public inline operator fun <reified T : Any> get(tag: PluginTag? = null, recursive: Boolean = true): T? =
get(T::class, tag, recursive)
getByType(T::class, tag, recursive)
@OptIn(DFInternal::class)
public inline operator fun <reified T : Plugin> get(factory: PluginFactory<T>, recursive: Boolean = true): T? =
get(factory.type, factory.tag, recursive)
getByType(T::class, factory.tag, recursive)
override fun iterator(): Iterator<Plugin> = plugins.iterator()
}

View File

@ -1,5 +1,6 @@
package space.kscience.dataforge.context
import kotlinx.serialization.Serializable
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MetaRepr
@ -9,6 +10,7 @@ import space.kscience.dataforge.meta.MetaRepr
*
* @author Alexander Nozik
*/
@Serializable
public data class PluginTag(
val name: String,
val group: String = "",

View File

@ -2,7 +2,6 @@ package space.kscience.dataforge.context
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
import kotlin.reflect.KClass
public class ConsoleLogManager : AbstractPlugin(), LogManager {
@ -27,7 +26,6 @@ public class ConsoleLogManager : AbstractPlugin(), LogManager {
override fun build(context: Context, meta: Meta): ConsoleLogManager = ConsoleLogManager()
override val tag: PluginTag = PluginTag(group = PluginTag.DATAFORGE_GROUP, name = "log.jsConsole")
override val type: KClass<out ConsoleLogManager> = ConsoleLogManager::class
}
}

View File

@ -3,7 +3,6 @@ package space.kscience.dataforge.context
import org.slf4j.LoggerFactory
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
import kotlin.reflect.KClass
public class SlfLogManager : AbstractPlugin(), LogManager {
@ -27,7 +26,6 @@ public class SlfLogManager : AbstractPlugin(), LogManager {
override fun build(context: Context, meta: Meta): SlfLogManager = SlfLogManager()
override val tag: PluginTag = PluginTag(group = PluginTag.DATAFORGE_GROUP, name = "log.kotlinLogging")
override val type: KClass<out SlfLogManager> = SlfLogManager::class
}
}

View File

@ -1,8 +1,10 @@
package space.kscience.dataforge.provider
import space.kscience.dataforge.context.Context
import space.kscience.dataforge.context.PluginBuilder
import space.kscience.dataforge.context.gather
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.misc.Named
import space.kscience.dataforge.misc.Type
import space.kscience.dataforge.names.Name
import kotlin.reflect.KClass
@ -35,3 +37,13 @@ public inline fun <reified T : Any> Provider.top(): Map<Name, T> {
public inline fun <reified T : Any> Context.gather(inherit: Boolean = true): Map<Name, T> =
gather<T>(T::class.dfType, inherit)
/**
 * Register [items] as plugin content under the target derived from the type [T] (`T::class.dfType`).
 */
@DFExperimental
public inline fun <reified T : Any> PluginBuilder.provides(items: Map<Name, T>) {
    val target = T::class.dfType
    provides(target, items)
}
/**
 * Register named [items] as plugin content under the target derived from the type [T] (`T::class.dfType`).
 *
 * NOTE(review): [T] only selects the target; the items themselves are typed as [Named] and are not
 * checked against [T], so [T] has to be given explicitly at the call site.
 */
@DFExperimental
public inline fun <reified T : Any> PluginBuilder.provides(vararg items: Named) {
    val target = T::class.dfType
    provides(target, *items)
}

View File

@ -6,11 +6,11 @@ import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.misc.DFInternal
import space.kscience.dataforge.misc.ThreadSafe
import space.kscience.dataforge.names.*
import kotlin.collections.set
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
import kotlin.jvm.Synchronized
import kotlin.reflect.KType
import kotlin.reflect.typeOf
@ -36,7 +36,7 @@ public class DataTreeBuilder<T : Any>(
override val updates: MutableSharedFlow<Name> = MutableSharedFlow<Name>()
@Synchronized
@ThreadSafe
private fun remove(token: NameToken) {
if (treeItems.remove(token) != null) {
launch {
@ -50,12 +50,12 @@ public class DataTreeBuilder<T : Any>(
(getItem(name.cutLast()).tree as? DataTreeBuilder)?.remove(name.lastOrNull()!!)
}
@Synchronized
@ThreadSafe
private fun set(token: NameToken, data: Data<T>) {
treeItems[token] = DataTreeItem.Leaf(data)
}
@Synchronized
@ThreadSafe
private fun set(token: NameToken, node: DataTree<T>) {
treeItems[token] = DataTreeItem.Node(node)
}
@ -103,7 +103,7 @@ public fun <T : Any> DataSource(
block: DataSourceBuilder<T>.() -> Unit,
): DataTreeBuilder<T> = DataTreeBuilder<T>(type, parent.coroutineContext).apply(block)
@Suppress("OPT_IN_USAGE","FunctionName")
@Suppress("OPT_IN_USAGE", "FunctionName")
public inline fun <reified T : Any> DataSource(
parent: CoroutineScope,
crossinline block: DataSourceBuilder<T>.() -> Unit,

View File

@ -8,10 +8,8 @@ import space.kscience.dataforge.io.EnvelopeFormatFactory
import space.kscience.dataforge.io.IOPlugin
import space.kscience.dataforge.io.MetaFormatFactory
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.asName
import kotlin.reflect.KClass
public class YamlPlugin(meta: Meta) : AbstractPlugin(meta) {
public val io: IOPlugin by require(IOPlugin)
@ -27,7 +25,6 @@ public class YamlPlugin(meta: Meta) : AbstractPlugin(meta) {
public companion object : PluginFactory<YamlPlugin> {
override val tag: PluginTag = PluginTag("io.yaml", group = PluginTag.DATAFORGE_GROUP)
override val type: KClass<out YamlPlugin> = YamlPlugin::class
override fun build(context: Context, meta: Meta): YamlPlugin = YamlPlugin(meta)
}
}

View File

@ -9,7 +9,6 @@ import space.kscience.dataforge.meta.get
import space.kscience.dataforge.meta.string
import space.kscience.dataforge.misc.DFInternal
import space.kscience.dataforge.names.Name
import kotlin.reflect.KClass
import kotlin.reflect.KType
import kotlin.reflect.typeOf
@ -54,8 +53,8 @@ public class IOPlugin(meta: Meta) : AbstractPlugin(meta) {
}
override fun content(target: String): Map<Name, Any> = when (target) {
META_FORMAT_TYPE -> defaultMetaFormats.toMap()
ENVELOPE_FORMAT_TYPE -> defaultEnvelopeFormats.toMap()
META_FORMAT_TYPE -> defaultMetaFormats.associateByName()
ENVELOPE_FORMAT_TYPE -> defaultEnvelopeFormats.associateByName()
IO_FORMAT_TYPE -> content(META_FORMAT_TYPE) + content(ENVELOPE_FORMAT_TYPE)
else -> super.content(target)
}
@ -69,7 +68,6 @@ public class IOPlugin(meta: Meta) : AbstractPlugin(meta) {
override val tag: PluginTag = PluginTag("io", group = PluginTag.DATAFORGE_GROUP)
override val type: KClass<out IOPlugin> = IOPlugin::class
override fun build(context: Context, meta: Meta): IOPlugin = IOPlugin(meta)
public val WORK_DIRECTORY_KEY: Name = Name.of("io", "workDirectory")

View File

@ -2,9 +2,9 @@ package space.kscience.dataforge.meta
import kotlinx.serialization.Serializable
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.misc.ThreadSafe
import space.kscience.dataforge.names.*
import kotlin.js.JsName
import kotlin.jvm.Synchronized
/**
@ -248,10 +248,10 @@ private fun ObservableMeta.adoptBy(parent: MutableMetaImpl, key: NameToken) {
*/
private class MutableMetaImpl(
value: Value?,
children: Map<NameToken, Meta> = emptyMap()
children: Map<NameToken, Meta> = emptyMap(),
) : AbstractObservableMeta(), ObservableMutableMeta {
override var value = value
@Synchronized set(value) {
@ThreadSafe set(value) {
val oldValue = field
field = value
if (oldValue != value) {
@ -292,11 +292,11 @@ private class MutableMetaImpl(
override fun getOrCreate(name: Name): ObservableMutableMeta =
if (name.isEmpty()) this else get(name) ?: createNode(name)
@Synchronized
@ThreadSafe
private fun replaceItem(
key: NameToken,
oldItem: ObservableMutableMeta?,
newItem: ObservableMutableMeta?
newItem: ObservableMutableMeta?,
) {
if (oldItem != newItem) {
if (newItem == null) {
@ -318,7 +318,7 @@ private class MutableMetaImpl(
}
)
@Synchronized
@ThreadSafe
override fun setMeta(name: Name, node: Meta?) {
val oldItem: ObservableMutableMeta? = get(name)
if (oldItem != node) {
@ -337,6 +337,7 @@ private class MutableMetaImpl(
children[token] = newNode
}
}
else -> {
val token = name.firstOrNull()!!
//get existing or create new node.
@ -401,7 +402,7 @@ public inline fun Meta.copy(block: MutableMeta.() -> Unit = {}): Meta =
private class MutableMetaWithDefault(
val source: MutableMeta, val default: MetaProvider, val rootName: Name
val source: MutableMeta, val default: MetaProvider, val rootName: Name,
) : MutableMeta by source {
override val items: Map<NameToken, MutableMeta>
get() {

View File

@ -1,7 +1,7 @@
package space.kscience.dataforge.meta
import space.kscience.dataforge.misc.ThreadSafe
import space.kscience.dataforge.names.*
import kotlin.jvm.Synchronized
import kotlin.reflect.KProperty1
@ -54,12 +54,12 @@ internal abstract class AbstractObservableMeta : ObservableMeta {
listeners.forEach { it.callback(this, name) }
}
@Synchronized
@ThreadSafe
override fun onChange(owner: Any?, callback: Meta.(name: Name) -> Unit) {
listeners.add(MetaListener(owner, callback))
}
@Synchronized
@ThreadSafe
override fun removeListener(owner: Any?) {
listeners.removeAll { it.owner === owner }
}

View File

@ -1,8 +1,8 @@
package space.kscience.dataforge.meta
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.misc.ThreadSafe
import space.kscience.dataforge.names.*
import kotlin.jvm.Synchronized
/**
* A class that takes [MutableMeta] provider and adds observability on top of that
@ -10,7 +10,7 @@ import kotlin.jvm.Synchronized
private class ObservableMetaWrapper(
val root: MutableMeta,
val absoluteName: Name,
val listeners: MutableSet<MetaListener>
val listeners: MutableSet<MetaListener>,
) : ObservableMutableMeta {
override val items: Map<NameToken, ObservableMutableMeta>
get() = root.items.keys.associateWith {
@ -20,7 +20,7 @@ private class ObservableMetaWrapper(
override fun getMeta(name: Name): ObservableMutableMeta? =
root.getMeta(name)?.let { ObservableMetaWrapper(root, this.absoluteName + name, listeners) }
@Synchronized
@ThreadSafe
override fun onChange(owner: Any?, callback: Meta.(name: Name) -> Unit) {
listeners.add(
MetaListener(Pair(owner, absoluteName)) { name ->

View File

@ -5,8 +5,8 @@ import space.kscience.dataforge.meta.descriptors.MetaDescriptor
import space.kscience.dataforge.meta.descriptors.get
import space.kscience.dataforge.meta.descriptors.validate
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.misc.ThreadSafe
import space.kscience.dataforge.names.*
import kotlin.jvm.Synchronized
/**
* A base for delegate-based or descriptor-based scheme. [Scheme] has an empty constructor to simplify usage from [Specification].
@ -92,7 +92,7 @@ public open class Scheme : Described, MetaRepr, MutableMetaProvider, Configurabl
listeners.forEach { it.callback(this@Scheme.meta, pathName + name) }
}
@Synchronized
@ThreadSafe
override fun onChange(owner: Any?, callback: Meta.(name: Name) -> Unit) {
listeners.add(MetaListener(owner) { changedName ->
if (changedName.startsWith(pathName)) {
@ -101,7 +101,7 @@ public open class Scheme : Described, MetaRepr, MutableMetaProvider, Configurabl
})
}
@Synchronized
@ThreadSafe
override fun removeListener(owner: Any?) {
listeners.removeAll { it.owner === owner }
}

View File

@ -0,0 +1,7 @@
package space.kscience.dataforge.misc
/**
 * Multiplatform marker that can be put on functions and property getters/setters.
 *
 * The annotation is declared as an optional expectation, so a platform is allowed to provide no
 * `actual` counterpart for it. In this commit the only visible `actual` aliases it to `Synchronized`.
 * NOTE(review): on platforms without an actual declaration the annotation presumably has no effect — confirm.
 */
@OptIn(ExperimentalMultiplatform::class)
@Target(AnnotationTarget.FUNCTION, AnnotationTarget.PROPERTY_GETTER, AnnotationTarget.PROPERTY_SETTER)
@MustBeDocumented
@OptionalExpectation
public expect annotation class ThreadSafe()

View File

@ -0,0 +1,3 @@
package space.kscience.dataforge.misc
/**
 * Platform implementation of the expected `ThreadSafe` annotation: an alias for `Synchronized`,
 * so members marked with it are treated as `Synchronized` members on this platform.
 */
public actual typealias ThreadSafe = Synchronized

View File

@ -4,7 +4,6 @@ plugins {
kscience{
jvm()
js()
dependencies {
api(projects.dataforgeWorkspace)
implementation(kotlin("scripting-common"))

View File

@ -7,6 +7,9 @@ kscience{
js()
native()
useCoroutines()
useSerialization{
protobuf()
}
dependencies {
api(projects.dataforgeContext)
api(projects.dataforgeData)

View File

@ -0,0 +1,38 @@
package space.kscience.dataforge.workspace
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
import kotlin.reflect.KType
private typealias TaskResultId = Pair<Name, Meta>
/**
 * A [WorkspaceCache] that keeps task data in an in-process map.
 *
 * Entries are grouped by the pair of task name and task meta; inside a group data is stored by its name.
 * NOTE(review): the backing maps are plain [HashMap]s and no synchronization is visible here.
 */
public class InMemoryWorkspaceCache : WorkspaceCache {

    // never do that at home!
    private val cache = HashMap<TaskResultId, HashMap<Name, TaskData<*>>>()

    /** Build the cache key of [result]: its task name paired with its task meta. */
    private fun keyOf(result: TaskResult<*>): TaskResultId = result.taskName to result.taskMeta

    //TODO do actual check
    @Suppress("UNUSED_PARAMETER")
    private fun <T : Any> TaskData<*>.checkType(taskType: KType): TaskData<T> {
        // Unchecked cast: the requested type is currently not verified.
        return this as TaskData<T>
    }

    /**
     * Store every datum of [result] that is not cached yet and return a view of [result]
     * whose iteration and lookup by name are served from the cache.
     */
    override suspend fun <T : Any> evaluate(result: TaskResult<T>): TaskResult<T> {
        // An already cached datum with the same name wins over the freshly supplied one.
        for (datum: TaskData<T> in result) {
            val taskEntries = cache.getOrPut(keyOf(result)) { HashMap() }
            taskEntries.getOrPut(datum.name) { datum }
        }

        return object : TaskResult<T> by result {
            override fun iterator(): Iterator<TaskData<T>> {
                val cachedData = cache[keyOf(result)]?.values
                    ?: return emptyList<TaskData<T>>().iterator()
                return cachedData.map { it.checkType<T>(result.dataType) }.iterator()
            }

            override fun get(name: Name): TaskData<T>? {
                //TODO check types
                return cache[keyOf(result)]?.get(name)?.checkType<T>(result.dataType)
            }
        }
    }
}
/**
 * Make the workspace being built use a new [InMemoryWorkspaceCache].
 */
public fun WorkspaceBuilder.inMemoryCache() {
    cache(InMemoryWorkspaceCache())
}

View File

@ -124,15 +124,15 @@ public class WorkspaceBuilder(private val parentContext: Context = Global) : Tas
tasks[taskName] = task
}
public fun useCache() {
cache = InMemoryWorkspaceCache()
public fun cache(cache: WorkspaceCache) {
this.cache = cache
}
public fun build(): Workspace {
val postProcess: suspend (TaskResult<*>) -> TaskResult<*> = { result ->
cache?.evaluate(result) ?: result
}
return WorkspaceBase(context ?: parentContext, data ?: DataSet.EMPTY, targets, tasks, postProcess)
return WorkspaceImpl(context ?: parentContext, data ?: DataSet.EMPTY, targets, tasks, postProcess)
}
}

View File

@ -1,40 +1,5 @@
package space.kscience.dataforge.workspace
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
import kotlin.reflect.KType
public interface WorkspaceCache {
public suspend fun <T : Any> evaluate(result: TaskResult<T>): TaskResult<T>
}
private typealias TaskResultId = Pair<Name, Meta>
public class InMemoryWorkspaceCache : WorkspaceCache {
// never do that at home!
private val cache = HashMap<TaskResultId, HashMap<Name, TaskData<*>>>()
//TODO do actual check
@Suppress("UNUSED_PARAMETER")
private fun <T : Any> TaskData<*>.checkType(taskType: KType): TaskData<T> = this as TaskData<T>
override suspend fun <T : Any> evaluate(result: TaskResult<T>): TaskResult<T> {
for (d: TaskData<T> in result) {
cache.getOrPut(result.taskName to result.taskMeta) { HashMap() }.getOrPut(d.name) { d }
}
return object : TaskResult<T> by result {
override fun iterator(): Iterator<TaskData<T>> = (cache[result.taskName to result.taskMeta]
?.values?.map { it.checkType<T>(result.dataType) }
?: emptyList()).iterator()
override fun get(name: Name): TaskData<T>? {
val cached: TaskData<*> = cache[result.taskName to result.taskMeta]?.get(name) ?: return null
//TODO check types
return cached.checkType(result.dataType)
}
}
}
}

View File

@ -7,10 +7,7 @@ import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.names.Name
/**
* A simple workspace without caching
*/
public class WorkspaceBase internal constructor(
internal class WorkspaceImpl internal constructor(
override val context: Context,
data: DataSet<*>,
override val targets: Map<String, Meta>,

View File

@ -4,34 +4,56 @@ import space.kscience.dataforge.context.PluginFactory
import space.kscience.dataforge.data.DataSet
import space.kscience.dataforge.data.forEach
import space.kscience.dataforge.data.map
import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.MutableMeta
import space.kscience.dataforge.meta.toMutableMeta
import space.kscience.dataforge.meta.*
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.Name
import space.kscience.dataforge.names.plus
/**
 * The task meta with the node named after the task itself removed.
 * Used as the default meta passed to the dependencies of a task.
 */
public val TaskResultBuilder<*>.defaultDependencyMeta: Meta
    get() {
        val ownNodeName = taskName
        return taskMeta.copy { remove(ownNodeName) }
    }
/**
* Select data using given [selector]
*
* @param selector a workspace data selector. Could be either task selector or initial data selector.
* @param dependencyMeta meta used for selector. The same meta is used for caching. By default, uses [defaultDependencyMeta].
*/
public suspend fun <T : Any> TaskResultBuilder<*>.from(
selector: DataSelector<T>,
meta: Meta = taskMeta
): DataSet<T> = selector.select(workspace, meta)
dependencyMeta: Meta = defaultDependencyMeta,
): DataSet<T> = selector.select(workspace, dependencyMeta)
/**
 * Select data from a task of a [WorkspacePlugin] instance that is already loaded into the workspace context.
 *
 * The task is produced under the name `plugin.name + taskReference.taskName`.
 * The result is cast to `TaskResult<T>` without a runtime check of the data type.
 *
 * @param plugin a plugin instance that contains the task definition. An [IllegalArgumentException] is thrown
 * if it is not among the plugins of the workspace context.
 * @param dependencyMeta meta passed to `workspace.produce`. By default, uses [defaultDependencyMeta].
 * @param selectorBuilder selects the task reference from the plugin.
 */
public suspend inline fun <T : Any, reified P : WorkspacePlugin> TaskResultBuilder<*>.from(
plugin: P,
dependencyMeta: Meta = defaultDependencyMeta,
selectorBuilder: P.() -> TaskReference<T>,
): DataSet<T> {
require(workspace.context.plugins.contains(plugin)){"Plugin $plugin is not loaded into $workspace"}
val taskReference: TaskReference<T> = plugin.selectorBuilder()
return workspace.produce(plugin.name + taskReference.taskName, dependencyMeta) as TaskResult<T>
}
/**
* Select data from a [WorkspacePlugin] attached to this [Workspace] context.
*
* @param pluginFactory a plugin which contains the task definition. The plugin must be loaded into Workspace context.
* @param dependencyMeta meta used for selector. The same meta is used for caching. By default, uses [defaultDependencyMeta].
* @param selectorBuilder a builder of task from the plugin.
*/
public suspend inline fun <T : Any, reified P : WorkspacePlugin> TaskResultBuilder<*>.from(
pluginFactory: PluginFactory<P>,
meta: Meta = taskMeta,
dependencyMeta: Meta = defaultDependencyMeta,
selectorBuilder: P.() -> TaskReference<T>,
): DataSet<T> {
val plugin = workspace.context.plugins[pluginFactory]
?: error("Plugin ${pluginFactory.tag} not loaded into workspace context")
val taskReference: TaskReference<T> = plugin.selectorBuilder()
return workspace.produce(plugin.name + taskReference.taskName, meta) as TaskResult<T>
return workspace.produce(plugin.name + taskReference.taskName, dependencyMeta) as TaskResult<T>
}
public val TaskResultBuilder<*>.allData: DataSelector<*>
@ -42,18 +64,23 @@ public val TaskResultBuilder<*>.allData: DataSelector<*>
/**
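* Perform a lazy mapping task using given [selector] and [action]. The meta of each resulting data item is the source data meta with the task's own node of the task meta (if present) stored under the task name and with [dataMetaTransform] applied.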
* TODO move selector to receiver with multi-receivers
*
* @param selector a workspace data selector. Could be either task selector or initial data selector.
* @param dependencyMeta meta used for selector. The same meta is used for caching. By default, uses [defaultDependencyMeta].
* @param dataMetaTransform additional transformation of individual data meta.
* @param action process individual data asynchronously.
*/
@DFExperimental
public suspend inline fun <T : Any, reified R : Any> TaskResultBuilder<R>.pipeFrom(
selector: DataSelector<T>,
selectorMeta: Meta = taskMeta,
dataMetaTransform: MutableMeta.() -> Unit = {},
dependencyMeta: Meta = defaultDependencyMeta,
dataMetaTransform: MutableMeta.(name: Name) -> Unit = {},
crossinline action: suspend (arg: T, name: Name, meta: Meta) -> R,
) {
from(selector, selectorMeta).forEach { data ->
from(selector, dependencyMeta).forEach { data ->
val meta = data.meta.toMutableMeta().apply {
taskName put taskMeta
dataMetaTransform()
taskMeta[taskName]?.let { taskName.put(it) }
dataMetaTransform(data.name)
}
val res = data.map(workspace.context.coroutineContext, meta) {

View File

@ -1,5 +1,15 @@
package space.kscience.dataforge.workspace
import io.ktor.utils.io.core.Input
import io.ktor.utils.io.core.Output
import io.ktor.utils.io.core.readBytes
import io.ktor.utils.io.core.writeFully
import kotlinx.serialization.KSerializer
import kotlinx.serialization.json.Json
import kotlinx.serialization.protobuf.ProtoBuf
import kotlinx.serialization.serializer
import space.kscience.dataforge.context.error
import space.kscience.dataforge.context.logger
import space.kscience.dataforge.context.request
import space.kscience.dataforge.data.Data
import space.kscience.dataforge.data.await
@ -14,9 +24,32 @@ import kotlin.io.path.div
import kotlin.io.path.exists
import kotlin.reflect.KType
/**
 * An [IOFormat] for objects of the given [type] that uses kotlinx.serialization with the default [Json] instance.
 *
 * The serializer is looked up from [type] when the format is created.
 */
public class JsonIOFormat<T : Any>(override val type: KType) : IOFormat<T> {

    // The serializer resolved from a KType is untyped, hence the unchecked cast.
    @Suppress("UNCHECKED_CAST")
    private val serializer: KSerializer<T> = serializer(type) as KSerializer<T>

    /** Read a string from [input] with `readUtf8String` and decode it as JSON. */
    override fun readObject(input: Input): T {
        val text = input.readUtf8String()
        return Json.decodeFromString(serializer, text)
    }

    /** Encode [obj] to a JSON string and write it to [output] with `writeUtf8String`. */
    override fun writeObject(output: Output, obj: T) {
        val text = Json.encodeToString(serializer, obj)
        output.writeUtf8String(text)
    }
}
/**
 * An [IOFormat] for objects of the given [type] that uses kotlinx.serialization with the default [ProtoBuf] instance.
 *
 * The serializer is looked up from [type] when the format is created.
 */
public class ProtobufIOFormat<T : Any>(override val type: KType) : IOFormat<T> {

    // The serializer resolved from a KType is untyped, hence the unchecked cast.
    @Suppress("UNCHECKED_CAST")
    private val serializer: KSerializer<T> = serializer(type) as KSerializer<T>

    /** Read bytes from [input] with `readBytes` and decode them as a protobuf message. */
    override fun readObject(input: Input): T {
        val bytes = input.readBytes()
        return ProtoBuf.decodeFromByteArray(serializer, bytes)
    }

    /** Encode [obj] to a protobuf byte array and write it fully to [output]. */
    override fun writeObject(output: Output, obj: T) {
        val bytes = ProtoBuf.encodeToByteArray(serializer, obj)
        output.writeFully(bytes)
    }
}
public class FileWorkspaceCache(public val cacheDirectory: Path) : WorkspaceCache {
private fun <T : Any> TaskData<*>.checkType(taskType: KType): TaskData<T> = this as TaskData<T>
// private fun <T : Any> TaskData<*>.checkType(taskType: KType): TaskData<T> = this as TaskData<T>
@OptIn(DFExperimental::class, DFInternal::class)
@ -24,23 +57,24 @@ public class FileWorkspaceCache(public val cacheDirectory: Path) : WorkspaceCach
val io = result.workspace.context.request(IOPlugin)
val format: IOFormat<T> = io.resolveIOFormat(result.dataType, result.taskMeta)
?: ProtobufIOFormat(result.dataType)
?: error("Can't resolve IOFormat for ${result.dataType}")
fun cachedDataPath(dataName: Name): Path = cacheDirectory /
result.taskName.withIndex(result.taskMeta.hashCode().toString(16)).toString() /
dataName.toString()
fun evaluateDatum(data: TaskData<T>): TaskData<T> {
val path = cachedDataPath(data.name)
val path = cacheDirectory /
result.taskName.withIndex(result.taskMeta.hashCode().toString(16)).toString() /
data.name.toString()
val datum: Data<T> = Data<T>(data.type, meta = data.meta, dependencies = data.dependencies) {
// return cached data if it is present
if (path.exists()) {
try {
val envelope: Envelope = io.readEnvelopeFile(path)
if (envelope.meta != data.meta) error("Wrong metadata in cached result file")
return@Data envelope.data?.readWith(format)
?: error("Can't convert envelope without data to Data")
return@Data (envelope.data ?: Binary.EMPTY).readWith(format)
} catch (ex: Exception) {
result.workspace.logger.error { "Failed to read data from cache: ${ex.localizedMessage}" }
//cleanup cache file
path.deleteIfExists()
}
@ -63,9 +97,11 @@ public class FileWorkspaceCache(public val cacheDirectory: Path) : WorkspaceCach
return object : TaskResult<T> by result {
override fun iterator(): Iterator<TaskData<T>> =
iterator().asSequence().map { evaluateDatum(it) }.iterator()
result.iterator().asSequence().map { evaluateDatum(it) }.iterator()
override fun get(name: Name): TaskData<T>? = result[name]?.let { evaluateDatum(it) }
}
}
}
/**
 * Make the workspace being built use a [FileWorkspaceCache] that stores its files in [cacheDir].
 */
public fun WorkspaceBuilder.fileCache(cacheDir: Path) {
    cache(FileWorkspaceCache(cacheDir))
}

View File

@ -15,7 +15,7 @@ import space.kscience.dataforge.names.matches
* Select the whole data set from the workspace filtered by type.
*/
@OptIn(DFExperimental::class)
public inline fun <reified T : Any> TaskResultBuilder<*>.data(namePattern: Name? = null): DataSelector<T> =
public inline fun <reified T : Any> TaskResultBuilder<*>.dataByType(namePattern: Name? = null): DataSelector<T> =
object : DataSelector<T> {
override suspend fun select(workspace: Workspace, meta: Meta): DataSet<T> =
workspace.data.filterByType { name, _ ->

View File

@ -1,6 +1,7 @@
package space.kscience.dataforge.workspace
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.Test
import space.kscience.dataforge.data.startAll
@ -9,41 +10,58 @@ import space.kscience.dataforge.meta.Meta
import space.kscience.dataforge.meta.boolean
import space.kscience.dataforge.meta.get
import space.kscience.dataforge.misc.DFExperimental
import kotlin.test.assertEquals
/**
* Checks how many times task bodies are actually executed when the same task is produced
* several times, with and without a `flag` in the task meta.
*/
@OptIn(ExperimentalCoroutinesApi::class, DFExperimental::class)
internal class CachingWorkspaceTest {
// NOTE(review): this class-level workspace uses `useCache()` and `data()`, while the test below
// builds its own local workspace with `inMemoryCache()` and `allData` and shadows this property.
// It looks like a leftover of the previous version of the test - confirm before relying on it.
private val workspace = Workspace {
data {
//statically initialize data
repeat(5) {
static("myData[$it]", it)
}
}
useCache()
val doFirst by task<Any> {
pipeFrom(data()) { _, name, _ ->
println("Done first on $name with flag=${taskMeta["flag"].boolean ?: false}")
}
}
@Suppress("UNUSED_VARIABLE") val doSecond by task<Any>{
pipeFrom(doFirst) { _, name, _ ->
println("Done second on $name with flag=${taskMeta["flag"].boolean ?: false}")
}
}
}
@Test
fun testMetaPropagation() = runTest {
// Each counter is incremented inside the corresponding task lambda,
// so it records how many times that lambda has really been invoked.
var firstCounter = 0
var secondCounter = 0
val workspace = Workspace {
data {
//statically initialize data
repeat(5) {
static("myData[$it]", it)
}
}
inMemoryCache()
val doFirst by task<Any> {
pipeFrom(allData) { _, name, _ ->
firstCounter++
println("Done first on $name with flag=${taskMeta["flag"].boolean}")
}
}
@Suppress("UNUSED_VARIABLE")
val doSecond by task<Any> {
pipeFrom(
doFirst,
// Pass the task meta on as `dependencyMeta` only when `flag` is true; otherwise use empty meta.
// Presumably this is the meta `doFirst` is requested with - verify against `pipeFrom`.
dependencyMeta = if(taskMeta["flag"].boolean == true) taskMeta else Meta.EMPTY
) { _, name, _ ->
secondCounter++
println("Done second on $name with flag=${taskMeta["flag"].boolean ?: false}")
}
}
}
val first = workspace.produce("doFirst")
val secondA = workspace.produce("doSecond")
val secondB = workspace.produce("doSecond", Meta { "flag" put true })
// NOTE(review): the three results are started here and again inside `coroutineScope` below;
// the expected counts of 10 assume the second start does not re-run the task lambdas - confirm.
first.startAll(this)
secondA.startAll(this)
secondB.startAll(this)
// Same task and (default) meta as `secondA`.
val secondC = workspace.produce("doSecond")
coroutineScope {
first.startAll(this)
secondA.startAll(this)
secondB.startAll(this)
//repeat to check caching
secondC.startAll(this)
}
// Five static data items are declared above; each counter is expected to reach exactly 10.
assertEquals(10, firstCounter)
assertEquals(10, secondCounter)
}
}

View File

@ -9,7 +9,6 @@ import space.kscience.dataforge.context.PluginFactory
import space.kscience.dataforge.context.PluginTag
import space.kscience.dataforge.data.*
import space.kscience.dataforge.meta.Meta
import kotlin.reflect.KClass
import kotlin.test.Test
import kotlin.test.assertEquals
@ -34,7 +33,6 @@ class DataPropagationTestPlugin : WorkspacePlugin() {
companion object : PluginFactory<DataPropagationTestPlugin> {
override val type: KClass<out DataPropagationTestPlugin> = DataPropagationTestPlugin::class
override fun build(context: Context, meta: Meta): DataPropagationTestPlugin = DataPropagationTestPlugin()

View File

@ -0,0 +1,33 @@
package space.kscience.dataforge.workspace
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.Test
import space.kscience.dataforge.data.startAll
import space.kscience.dataforge.data.static
import space.kscience.dataforge.misc.DFExperimental
import java.nio.file.Files
/**
 * Smoke test for a workspace configured with a file-backed cache via `fileCache`.
 */
@OptIn(ExperimentalCoroutinesApi::class, DFExperimental::class)
class FileWorkspaceCacheTest {

    /**
     * Builds a workspace with five static data items and a file cache located in a fresh
     * temporary directory, then produces the `echo` task and starts all of its data.
     */
    @Test
    fun testCaching() = runTest {
        val workspace = Workspace {
            data {
                //statically initialize data
                repeat(5) {
                    static("myData[$it]", it)
                }
            }
            fileCache(Files.createTempDirectory("dataforge-temporary-cache"))

            // The task has to be bound to a delegated property: a bare `task<String> { ... }`
            // expression is discarded and `produce("echo")` below would have nothing to look up.
            @Suppress("UNUSED_VARIABLE")
            val echo by task<String> {
                // NOTE(review): the static data above are Int values while this selector asks for String,
                // so the selection may be empty and the cache may never be written - confirm the intent.
                pipeFrom(dataByType<String>()) { arg, _, _ -> arg }
            }
        }

        workspace.produce("echo").startAll(this)
    }
}

View File

@ -13,7 +13,6 @@ import space.kscience.dataforge.meta.*
import space.kscience.dataforge.misc.DFExperimental
import space.kscience.dataforge.names.get
import space.kscience.dataforge.names.plus
import kotlin.reflect.KClass
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertTrue
@ -22,11 +21,10 @@ import kotlin.test.assertTrue
/**
* Wrap this plugin instance into a [PluginFactory] whose `build` always returns this same instance
* (the `context` and `meta` arguments are ignored) and whose `tag` is the tag of this plugin.
* Useful for unique or test plugins.
*/
public inline fun <reified P : Plugin> P.toFactory(): PluginFactory<P> = object : PluginFactory<P> {
public fun <P : Plugin> P.toFactory(): PluginFactory<P> = object : PluginFactory<P> {
override fun build(context: Context, meta: Meta): P = this@toFactory
override val tag: PluginTag = this@toFactory.tag
override val type: KClass<out P> = P::class
}
public fun Workspace.produceBlocking(task: String, block: MutableMeta.() -> Unit = {}): DataSet<Any> = runBlocking {
@ -39,7 +37,7 @@ internal object TestPlugin : WorkspacePlugin() {
val test by task {
// type is inferred
pipeFrom(data<Int>()) { arg, _, _ ->
pipeFrom(dataByType<Int>()) { arg, _, _ ->
logger.info { "Test: $arg" }
arg
}
@ -76,7 +74,7 @@ internal class SimpleWorkspaceTest {
}
val square by task<Int> {
pipeFrom(data<Int>()) { arg, name, meta ->
pipeFrom(dataByType<Int>()) { arg, name, meta ->
if (meta["testFlag"].boolean == true) {
println("Side effect")
}
@ -86,7 +84,7 @@ internal class SimpleWorkspaceTest {
}
val linear by task<Int> {
pipeFrom(data<Int>()) { arg, name, _ ->
pipeFrom(dataByType<Int>()) { arg, name, _ ->
workspace.logger.info { "Starting linear on $name" }
arg * 2 + 1
}

View File

@ -6,4 +6,4 @@ kotlin.mpp.stability.nowarn=true
kotlin.incremental.js.ir=true
kotlin.native.ignoreDisabledTargets=true
toolsVersion=0.14.3-kotlin-1.8.20-RC
toolsVersion=0.14.4-kotlin-1.8.20-RC

View File

@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-7.6-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-8.0.2-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists

View File

@ -1,7 +1,7 @@
rootProject.name = "dataforge-core"
enableFeaturePreview("TYPESAFE_PROJECT_ACCESSORS")
enableFeaturePreview("VERSION_CATALOGS")
//enableFeaturePreview("VERSION_CATALOGS")
pluginManagement {