forked from NPM/numass-framework
Fix cache test
This commit is contained in:
parent
4e590d3e14
commit
d3689638e0
@ -87,27 +87,25 @@ class CachePlugin(meta: Meta) : BasicPlugin(meta) {
|
||||
}
|
||||
}
|
||||
|
||||
@Synchronized
|
||||
override fun run() {
|
||||
//TODO add executor
|
||||
synchronized(cache) {
|
||||
when {
|
||||
data.goal.isDone -> data.future.thenAccept { result.complete(it) }
|
||||
cache.containsKey(id) -> {
|
||||
logger.info("Cached result found. Restoring data from cache for id {}", id.hashCode())
|
||||
CompletableFuture.supplyAsync { cache.get(id) }.whenComplete { res, err ->
|
||||
if (res != null) {
|
||||
result.complete(res)
|
||||
} else {
|
||||
evalData()
|
||||
}
|
||||
when {
|
||||
data.goal.isDone -> data.future.thenAccept { result.complete(it) }
|
||||
cache.containsKey(id) -> {
|
||||
logger.info("Cached result found. Restoring data from cache for id {}", id.hashCode())
|
||||
CompletableFuture.supplyAsync { cache.get(id) }.whenComplete { res, err ->
|
||||
if (res != null) {
|
||||
result.complete(res)
|
||||
} else {
|
||||
evalData()
|
||||
}
|
||||
|
||||
if (err != null) {
|
||||
logger.error("Failed to load data from cache", err)
|
||||
}
|
||||
if (err != null) {
|
||||
logger.error("Failed to load data from cache", err)
|
||||
}
|
||||
}
|
||||
else -> evalData()
|
||||
}
|
||||
else -> evalData()
|
||||
}
|
||||
}
|
||||
|
||||
@ -162,7 +160,7 @@ class CachePlugin(meta: Meta) : BasicPlugin(meta) {
|
||||
}
|
||||
|
||||
fun <V : Any> cacheNode(cacheName: String, node: DataNode<V>, idFactory: (NamedData<*>) -> Meta): DataNode<V> {
|
||||
val builder = DataTree.edit(node.type).also {cached->
|
||||
val builder = DataTree.edit(node.type).also { cached ->
|
||||
cached.name = node.name
|
||||
cached.meta = node.meta
|
||||
//recursively caching everything
|
||||
@ -176,7 +174,7 @@ class CachePlugin(meta: Meta) : BasicPlugin(meta) {
|
||||
|
||||
private fun <V> getCache(name: String, type: Class<V>): Cache<Meta, V> {
|
||||
return manager.getCache(name, Meta::class.java, type)
|
||||
?: manager.createCache(name, MetaCacheConfiguration(meta, type))
|
||||
?: manager.createCache(name, MetaCacheConfiguration(meta, type))
|
||||
}
|
||||
|
||||
// @Override
|
||||
|
@ -175,9 +175,7 @@ interface DataNode<T : Any> : Iterable<NamedData<out T>>, Named, Metoid, Provide
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
fun nodeGoal(): GoalGroup {
|
||||
return GoalGroup(this.dataStream().map { it.goal }.toList())
|
||||
}
|
||||
fun nodeGoal(): GoalGroup = GoalGroup(dataStream().map { it.goal }.toList())
|
||||
|
||||
/**
|
||||
* Handle result when the node is evaluated. Does not trigger node evaluation. Ignores exceptional completion
|
||||
|
@ -71,9 +71,8 @@ abstract class AbstractWorkspace(
|
||||
/**
|
||||
* Put given data node into cache one by one
|
||||
*/
|
||||
private fun <R : Any> cacheTaskResult(model: TaskModel, node: DataNode<out R>): DataNode<out R> {
|
||||
return cache.cacheNode(model.name, node) { model.getID(it) }
|
||||
}
|
||||
private fun <R : Any> cacheTaskResult(model: TaskModel, node: DataNode<out R>): DataNode<out R> =
|
||||
cache.cacheNode(model.name, node) { model.getID(it) }
|
||||
|
||||
override fun clean() {
|
||||
logger.info("Cleaning up cache...")
|
||||
|
@ -24,6 +24,7 @@ import hep.dataforge.meta.MetaBuilder
|
||||
import hep.dataforge.workspace.tasks.PipeTask
|
||||
import hep.dataforge.workspace.tasks.TaskModel
|
||||
import org.junit.Assert.assertEquals
|
||||
import org.junit.Before
|
||||
import org.junit.BeforeClass
|
||||
import org.junit.Test
|
||||
import org.slf4j.LoggerFactory
|
||||
@ -49,17 +50,23 @@ class WorkspaceTest {
|
||||
val res2 = wsp.runTask("test2", Meta.empty())
|
||||
res1.computeAll()
|
||||
res2.computeAll()
|
||||
assertEquals(6, counter.get().toLong())
|
||||
assertEquals(6, counter.get())
|
||||
val res3 = wsp.runTask("test2", MetaBuilder().putValue("a", 1))
|
||||
.getCheckedData("data_2", Number::class.java).get().toLong()
|
||||
assertEquals(6, res3)
|
||||
assertEquals(8, counter.get().toLong())
|
||||
assertEquals(8, counter.get())
|
||||
}
|
||||
|
||||
companion object {
|
||||
private val counter = AtomicInteger()
|
||||
private lateinit var wsp: Workspace
|
||||
|
||||
@Before
|
||||
fun beforeEach(){
|
||||
counter.set(0)
|
||||
}
|
||||
|
||||
|
||||
@BeforeClass
|
||||
@JvmStatic
|
||||
fun setup() {
|
||||
@ -67,19 +74,13 @@ class WorkspaceTest {
|
||||
load(CachePlugin::class.java, MetaBuilder().setValue("fileCache.enabled", false))
|
||||
}
|
||||
|
||||
|
||||
val task1 = object : PipeTask<Number, Number>("test1", Number::class.java, Number::class.java) {
|
||||
override fun buildModel(model: TaskModel.Builder, meta: Meta) {
|
||||
model.data("*")
|
||||
}
|
||||
|
||||
override fun result(context: Context, name: String, input: Number, meta: Meta): Number {
|
||||
try {
|
||||
Thread.sleep(200)
|
||||
} catch (e: InterruptedException) {
|
||||
throw RuntimeException(e)
|
||||
}
|
||||
|
||||
Thread.sleep(200)
|
||||
counter.incrementAndGet()
|
||||
return input.toInt() + meta.getInt("a", 2)
|
||||
}
|
||||
@ -91,12 +92,7 @@ class WorkspaceTest {
|
||||
}
|
||||
|
||||
override fun result(context: Context, name: String, input: Number, meta: Meta): Number {
|
||||
try {
|
||||
Thread.sleep(200)
|
||||
} catch (e: InterruptedException) {
|
||||
throw RuntimeException(e)
|
||||
}
|
||||
|
||||
Thread.sleep(200)
|
||||
counter.incrementAndGet()
|
||||
return input.toInt() * meta.getInt("b", 2)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user