diff --git a/dataforge-core/src/main/kotlin/hep/dataforge/cache/CachePlugin.kt b/dataforge-core/src/main/kotlin/hep/dataforge/cache/CachePlugin.kt
index afd7d1a5..6ba87927 100644
--- a/dataforge-core/src/main/kotlin/hep/dataforge/cache/CachePlugin.kt
+++ b/dataforge-core/src/main/kotlin/hep/dataforge/cache/CachePlugin.kt
@@ -87,27 +87,25 @@ class CachePlugin(meta: Meta) : BasicPlugin(meta) {
             }
         }

+        @Synchronized
         override fun run() {
-            //TODO add executor
-            synchronized(cache) {
-                when {
-                    data.goal.isDone -> data.future.thenAccept { result.complete(it) }
-                    cache.containsKey(id) -> {
-                        logger.info("Cached result found. Restoring data from cache for id {}", id.hashCode())
-                        CompletableFuture.supplyAsync { cache.get(id) }.whenComplete { res, err ->
-                            if (res != null) {
-                                result.complete(res)
-                            } else {
-                                evalData()
-                            }
+            when {
+                data.goal.isDone -> data.future.thenAccept { result.complete(it) }
+                cache.containsKey(id) -> {
+                    logger.info("Cached result found. Restoring data from cache for id {}", id.hashCode())
+                    CompletableFuture.supplyAsync { cache.get(id) }.whenComplete { res, err ->
+                        if (res != null) {
+                            result.complete(res)
+                        } else {
+                            evalData()
+                        }

-                            if (err != null) {
-                                logger.error("Failed to load data from cache", err)
-                            }
+                        if (err != null) {
+                            logger.error("Failed to load data from cache", err)
                         }
                     }
-                    else -> evalData()
                 }
+                else -> evalData()
             }
         }

@@ -162,7 +160,7 @@ class CachePlugin(meta: Meta) : BasicPlugin(meta) {
     }

     fun <T : Any> cacheNode(cacheName: String, node: DataNode<T>, idFactory: (NamedData<*>) -> Meta): DataNode<T> {
-        val builder = DataTree.edit(node.type).also {cached->
+        val builder = DataTree.edit(node.type).also { cached ->
             cached.name = node.name
             cached.meta = node.meta
             //recursively caching everything
@@ -176,7 +174,7 @@ class CachePlugin(meta: Meta) : BasicPlugin(meta) {

     private fun <V> getCache(name: String, type: Class<V>): Cache<Meta, V> {
         return manager.getCache(name, Meta::class.java, type)
-                ?: manager.createCache(name, MetaCacheConfiguration(meta, type))
+            ?: manager.createCache(name, MetaCacheConfiguration(meta, type))
     }

     // @Override
diff --git a/dataforge-core/src/main/kotlin/hep/dataforge/data/DataNode.kt b/dataforge-core/src/main/kotlin/hep/dataforge/data/DataNode.kt
index f44cafd1..bb1e6992 100644
--- a/dataforge-core/src/main/kotlin/hep/dataforge/data/DataNode.kt
+++ b/dataforge-core/src/main/kotlin/hep/dataforge/data/DataNode.kt
@@ -175,9 +175,7 @@ interface DataNode<out T : Any> : Iterable<NamedData<T>>, Named, Metoid, Provider
      *
      * @return
      */
-    fun nodeGoal(): GoalGroup {
-        return GoalGroup(this.dataStream().map { it.goal }.toList())
-    }
+    fun nodeGoal(): GoalGroup = GoalGroup(dataStream().map { it.goal }.toList())

     /**
      * Handle result when the node is evaluated. Does not trigger node evaluation. Ignores exceptional completion
diff --git a/dataforge-core/src/main/kotlin/hep/dataforge/workspace/AbstractWorkspace.kt b/dataforge-core/src/main/kotlin/hep/dataforge/workspace/AbstractWorkspace.kt
index a91fde86..e770a719 100644
--- a/dataforge-core/src/main/kotlin/hep/dataforge/workspace/AbstractWorkspace.kt
+++ b/dataforge-core/src/main/kotlin/hep/dataforge/workspace/AbstractWorkspace.kt
@@ -71,9 +71,8 @@ abstract class AbstractWorkspace(
     /**
      * Put given data node into cache one by one
     */
-    private fun <T : Any> cacheTaskResult(model: TaskModel, node: DataNode<T>): DataNode<T> {
-        return cache.cacheNode(model.name, node) { model.getID(it) }
-    }
+    private fun <T : Any> cacheTaskResult(model: TaskModel, node: DataNode<T>): DataNode<T> =
+            cache.cacheNode(model.name, node) { model.getID(it) }

     override fun clean() {
         logger.info("Cleaning up cache...")
diff --git a/dataforge-core/src/test/kotlin/hep/dataforge/workspace/WorkspaceTest.kt b/dataforge-core/src/test/kotlin/hep/dataforge/workspace/WorkspaceTest.kt
index a05898db..210d9538 100644
--- a/dataforge-core/src/test/kotlin/hep/dataforge/workspace/WorkspaceTest.kt
+++ b/dataforge-core/src/test/kotlin/hep/dataforge/workspace/WorkspaceTest.kt
@@ -24,6 +24,7 @@ import hep.dataforge.meta.MetaBuilder
 import hep.dataforge.workspace.tasks.PipeTask
 import hep.dataforge.workspace.tasks.TaskModel
 import org.junit.Assert.assertEquals
+import org.junit.Before
 import org.junit.BeforeClass
 import org.junit.Test
 import org.slf4j.LoggerFactory
@@ -49,17 +50,23 @@ class WorkspaceTest {
         val res2 = wsp.runTask("test2", Meta.empty())
         res1.computeAll()
         res2.computeAll()
-        assertEquals(6, counter.get().toLong())
+        assertEquals(6, counter.get())
         val res3 = wsp.runTask("test2", MetaBuilder().putValue("a", 1))
                 .getCheckedData("data_2", Number::class.java).get().toLong()
         assertEquals(6, res3)
-        assertEquals(8, counter.get().toLong())
+        assertEquals(8, counter.get())
     }

     companion object {
         private val counter = AtomicInteger()
         private lateinit var wsp: Workspace

+        @Before
+        fun beforeEach(){
+            counter.set(0)
+        }
+
+
         @BeforeClass
         @JvmStatic
         fun setup() {
@@ -67,19 +74,13 @@ class WorkspaceTest {
                 load(CachePlugin::class.java, MetaBuilder().setValue("fileCache.enabled", false))
             }

-
             val task1 = object : PipeTask<Number, Number>("test1", Number::class.java, Number::class.java) {
                 override fun buildModel(model: TaskModel.Builder, meta: Meta) {
                     model.data("*")
                 }

                 override fun result(context: Context, name: String, input: Number, meta: Meta): Number {
-                    try {
-                        Thread.sleep(200)
-                    } catch (e: InterruptedException) {
-                        throw RuntimeException(e)
-                    }
-
+                    Thread.sleep(200)
                     counter.incrementAndGet()
                     return input.toInt() + meta.getInt("a", 2)
                 }
@@ -91,12 +92,7 @@ class WorkspaceTest {
                 }

                 override fun result(context: Context, name: String, input: Number, meta: Meta): Number {
-                    try {
-                        Thread.sleep(200)
-                    } catch (e: InterruptedException) {
-                        throw RuntimeException(e)
-                    }
-
+                    Thread.sleep(200)
                     counter.incrementAndGet()
                     return input.toInt() * meta.getInt("b", 2)
                 }
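
Note: the sketch below is not part of the changeset above. It is a minimal, self-contained Kotlin illustration of the control flow the reworked CachePlugin.run() follows (try the cache asynchronously, fall back to evaluation on a miss or a failed load), with @Synchronized on the method standing in for the removed synchronized(cache) block. CachedResolver, compute and the println logging are hypothetical names used only for this sketch, not DataForge API; the real plugin keys its cache by Meta and completes a pre-existing result future.

import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for the cache-or-evaluate flow in CachePlugin.run(); not DataForge API.
class CachedResolver<K : Any, V : Any>(private val compute: (K) -> V) {
    private val cache = ConcurrentHashMap<K, V>()

    @Synchronized // method-level lock, mirroring @Synchronized run() replacing synchronized(cache) { ... }
    fun resolve(key: K): CompletableFuture<V> {
        val result = CompletableFuture<V>()
        if (cache.containsKey(key)) {
            // Load the cached value off the calling thread; fall back to evaluation
            // if the entry is missing or the load fails.
            CompletableFuture.supplyAsync { cache[key] }.whenComplete { res, err ->
                if (res != null) {
                    result.complete(res)
                } else {
                    if (err != null) println("Failed to load from cache: $err")
                    result.complete(compute(key).also { cache[key] = it })
                }
            }
        } else {
            result.complete(compute(key).also { cache[key] = it })
        }
        return result
    }
}

Handling err inside the same whenComplete callback is what lets a failed cache read degrade to recomputation instead of leaving the result incomplete, which is the behaviour the reshuffled error handling in the first hunk preserves.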