Basic design for workspaces

This commit is contained in:
Alexander Nozik 2019-01-31 21:21:02 +03:00
parent 012ee93ab2
commit a4a7163e0d
17 changed files with 345 additions and 307 deletions

View File

@ -1,16 +1,16 @@
package hep.dataforge.context
import hep.dataforge.meta.EmptyMeta
import hep.dataforge.meta.Meta
import hep.dataforge.meta.Config
import hep.dataforge.names.Name
abstract class AbstractPlugin(override val meta: Meta = EmptyMeta) : Plugin {
abstract class AbstractPlugin : Plugin {
private var _context: Context? = null
override val context: Context
get() = _context ?: error("Plugin $tag is not attached")
override val config = Config()
override fun attach(context: Context) {
this._context = context
}
@ -19,6 +19,8 @@ abstract class AbstractPlugin(override val meta: Meta = EmptyMeta) : Plugin {
this._context = null
}
//TODO make configuration activation-safe
override fun provideTop(target: String, name: Name): Any? = null
override fun listTop(target: String): Sequence<Name> = emptySequence()

View File

@ -4,12 +4,14 @@ import hep.dataforge.meta.*
import hep.dataforge.names.Name
import hep.dataforge.names.toName
import hep.dataforge.provider.Provider
import hep.dataforge.provider.provideAll
import hep.dataforge.values.Value
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import mu.KLogger
import mu.KotlinLogging
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.jvm.JvmName
/**
* The local environment for anything being done in DataForge framework. Contexts are organized into tree structure with [Global] at the top.
@ -23,29 +25,36 @@ import kotlin.coroutines.CoroutineContext
* Since plugins could contain mutable state, context has two states: active and inactive. No changes are allowed to active context.
* @author Alexander Nozik
*/
interface Context : Named, MetaRepr, Provider, CoroutineScope {
open class Context(final override val name: String, val parent: Context? = Global) : Named, MetaRepr, Provider,
CoroutineScope {
val parent: Context?
private val config = Config()
/**
* Context properties. Working as substitute for environment variables
*/
val properties: Meta
val properties: Meta = if (parent == null) {
config
} else {
Laminate(config, parent.properties)
}
/**
* Context logger
*/
val logger: KLogger
val logger: KLogger = KotlinLogging.logger(name)
/**
* A [PluginManager] for current context
*/
val plugins: PluginManager
val plugins: PluginManager by lazy { PluginManager(this) }
private val activators = HashSet<Any>()
/**
* Defines if context is used in any kind of active computations. Active context properties and plugins could not be changed
*/
val isActive: Boolean
val isActive: Boolean = activators.isNotEmpty()
override val defaultTarget: String get() = Plugin.PLUGIN_TARGET
@ -68,20 +77,36 @@ interface Context : Named, MetaRepr, Provider, CoroutineScope {
/**
* Mark context as active and used by [activator]
*/
fun activate(activator: Any)
fun activate(activator: Any) {
activators.add(activator)
}
/**
* Mark context unused by [activator]
*/
fun deactivate(activator: Any)
fun deactivate(activator: Any) {
activators.remove(activator)
}
/**
* Change the properties of the context. If active, throw an exception
*/
fun configure(action: Config.() -> Unit) {
if (isActive) error("Can't configure active context")
config.action()
}
override val coroutineContext: CoroutineContext
get() = Dispatchers.Default
get() = EmptyCoroutineContext
/**
* Detach all plugins and terminate context
*/
fun close()
open fun close() {
if (isActive) error("Can't close active context")
//detach all plugins
plugins.forEach { it.detach() }
}
override fun toMeta(): Meta = buildMeta {
"parent" to parent?.name
@ -90,12 +115,45 @@ interface Context : Named, MetaRepr, Provider, CoroutineScope {
}
}
/**
* A sequences of all objects provided by plugins with given target and type
*/
fun Context.members(target: String): Sequence<Any> =
plugins.asSequence().flatMap { it.provideAll(target) }
@JvmName("typedMembers")
inline fun <reified T : Any> Context.members(target: String) =
members(target).filterIsInstance<T>()
/**
* A global root context. Closing [Global] terminates the framework.
*/
expect object Global : Context {
fun getContext(name: String): Context
object Global : Context("GLOBAL", null) {
/**
* Closing all contexts
*
* @throws Exception
*/
override fun close() {
logger.info { "Shutting down GLOBAL" }
for (ctx in contextRegistry.values) {
ctx.close()
}
super.close()
}
private val contextRegistry = HashMap<String, Context>()
/**
* Get previously builder context o builder a new one
*
* @param name
* @return
*/
fun getContext(name: String): Context {
return contextRegistry.getOrPut(name) { Context(name) }
}
}

View File

@ -0,0 +1,37 @@
package hep.dataforge.context
import hep.dataforge.meta.Config
import hep.dataforge.meta.MetaBuilder
import hep.dataforge.meta.configure
/**
* A convenience builder for context
*/
class ContextBuilder(var name: String = "@anonimous", val parent: Context = Global) {
private val plugins = ArrayList<Plugin>()
private var meta = MetaBuilder()
fun properties(action: MetaBuilder.() -> Unit) {
meta.action()
}
fun plugin(plugin: Plugin) {
plugins.add(plugin)
}
fun plugin(tag: PluginTag, action: Config.() -> Unit) {
plugins.add(PluginRepository.fetch(tag).configure(action))
}
fun plugin(name: String, group: String = "", version: String = "", action: Config.() -> Unit) {
plugin(PluginTag(name, group, version), action)
}
fun build(): Context {
return Context(name, parent).apply {
this@ContextBuilder.plugins.forEach {
plugins.load(it)
}
}
}
}

View File

@ -1,8 +1,8 @@
package hep.dataforge.context
import hep.dataforge.meta.Configurable
import hep.dataforge.meta.Meta
import hep.dataforge.meta.MetaRepr
import hep.dataforge.meta.Metoid
import hep.dataforge.meta.buildMeta
import hep.dataforge.provider.Provider
@ -22,7 +22,7 @@ import hep.dataforge.provider.Provider
*
* @author Alexander Nozik
*/
interface Plugin : Named, Metoid, ContextAware, Provider, MetaRepr {
interface Plugin : Named, ContextAware, Provider, MetaRepr, Configurable {
/**
* Get tag for this plugin
@ -67,7 +67,7 @@ interface Plugin : Named, Metoid, ContextAware, Provider, MetaRepr {
"context" to context.name
"type" to this::class.simpleName
"tag" to tag
"meta" to meta
"meta" to config
}
companion object {

View File

@ -1,9 +1,6 @@
package hep.dataforge.context
import hep.dataforge.meta.EmptyMeta
import hep.dataforge.meta.Meta
import hep.dataforge.meta.MetaBuilder
import hep.dataforge.meta.buildMeta
import hep.dataforge.meta.*
import kotlin.reflect.KClass
/**
@ -115,8 +112,8 @@ class PluginManager(override val context: Context) : ContextAware, Iterable<Plug
fun load(tag: PluginTag, meta: Meta = EmptyMeta): Plugin {
val loaded = get(tag, false)
return when {
loaded == null -> load(PluginRepository.fetch(tag, meta))
loaded.meta == meta -> loaded // if meta is the same, return existing plugin
loaded == null -> load(PluginRepository.fetch(tag)).configure(meta)
loaded.config == meta -> loaded // if meta is the same, return existing plugin
else -> throw RuntimeException("Can't load plugin with tag $tag. Plugin with this tag and different configuration already exists in context.")
}
}
@ -137,7 +134,7 @@ class PluginManager(override val context: Context) : ContextAware, Iterable<Plug
error("Corrupt type information in plugin repository")
}
}
loaded.meta == meta -> loaded // if meta is the same, return existing plugin
loaded.config == meta -> loaded // if meta is the same, return existing plugin
else -> throw RuntimeException("Can't load plugin with type $type. Plugin with this type and different configuration already exists in context.")
}
}

View File

@ -1,14 +1,17 @@
package hep.dataforge.context
import hep.dataforge.meta.Meta
import hep.dataforge.meta.configure
import kotlin.reflect.KClass
interface PluginFactory {
val tag: PluginTag
val type: KClass<out Plugin>
fun build(meta: Meta): Plugin
fun build(): Plugin
}
fun PluginFactory.build(meta: Meta) = build().configure(meta)
expect object PluginRepository {
@ -24,6 +27,22 @@ expect object PluginRepository {
/**
* Fetch specific plugin and instantiate it with given meta
*/
fun PluginRepository.fetch(tag: PluginTag, meta: Meta): Plugin =
PluginRepository.list().find { it.tag.matches(tag) }?.build(meta)
fun PluginRepository.fetch(tag: PluginTag): Plugin =
PluginRepository.list().find { it.tag.matches(tag) }?.build()
?: error("Plugin with tag $tag not found in the repository")
fun PluginRepository.register(tag: PluginTag, type: KClass<out Plugin>, constructor: () -> Plugin) {
val factory = object : PluginFactory {
override val tag: PluginTag = tag
override val type: KClass<out Plugin> = type
override fun build(): Plugin = constructor()
}
PluginRepository.register(factory)
}
inline fun <reified T : Plugin> PluginRepository.register(tag: PluginTag, noinline constructor: () -> T) =
register(tag, T::class, constructor)
fun PluginRepository.register(plugin: Plugin) = register(plugin.tag, plugin::class) { plugin }

View File

@ -1,75 +0,0 @@
package hep.dataforge.context
import hep.dataforge.meta.*
import mu.KLogger
import mu.KotlinLogging
import kotlin.jvm.Synchronized
actual object Global : Context, JSContext("GLOBAL", null) {
/**
* Closing all contexts
*
* @throws Exception
*/
override fun close() {
logger.info { "Shutting down GLOBAL" }
for (ctx in contextRegistry.values) {
ctx.close()
}
super.close()
}
private val contextRegistry = HashMap<String, Context>()
/**
* Get previously builder context o builder a new one
*
* @param name
* @return
*/
@Synchronized
actual fun getContext(name: String): Context {
return contextRegistry.getOrPut(name) { JSContext(name) }
}
}
open class JSContext(
final override val name: String,
final override val parent: JSContext? = Global,
properties: Meta = EmptyMeta
) : Context {
private val _properties = Config().apply { update(properties) }
override val properties: Meta
get() = if (parent == null) {
_properties
} else {
Laminate(_properties, parent.properties)
}
override val plugins: PluginManager by lazy { PluginManager(this) }
override val logger: KLogger = KotlinLogging.logger(name)
/**
* Free up resources associated with this context
*
* @throws Exception
*/
override fun close() {
if (isActive) error("Can't close active context")
//detach all plugins
plugins.forEach { it.detach() }
}
private val activators = HashSet<Any>()
override val isActive: Boolean = !activators.isEmpty()
override fun activate(activator: Any) {
activators.add(activator)
}
override fun deactivate(activator: Any) {
activators.clear()
}
}

View File

@ -1,8 +1,5 @@
package hep.dataforge.context
import hep.dataforge.meta.Meta
import kotlin.reflect.KClass
actual object PluginRepository {
@ -12,20 +9,6 @@ actual object PluginRepository {
factories.add(factory)
}
fun <T : Plugin> register(tag: PluginTag, type: KClass<out Plugin>, constructor: (Meta) -> T) {
val factory = object : PluginFactory {
override val tag: PluginTag = tag
override val type: KClass<out Plugin> = type
override fun build(meta: Meta): Plugin = constructor(meta)
}
register(factory)
}
inline fun <reified T : Plugin> register(tag: PluginTag, noinline constructor: (Meta) -> T) =
register(tag, T::class, constructor)
/**
* List plugins available in the repository
*/

View File

@ -0,0 +1,118 @@
/*
* Copyright 2018 Alexander Nozik.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package hep.dataforge.context
import java.util.*
import kotlin.reflect.KClass
import kotlin.reflect.full.cast
class ClassLoaderPlugin(val classLoader: ClassLoader) : AbstractPlugin() {
override val tag: PluginTag = PluginTag("classLoader", PluginTag.DATAFORGE_GROUP)
private val serviceCache: MutableMap<Class<*>, ServiceLoader<*>> = HashMap()
fun <T : Any> services(type: KClass<T>): Sequence<T> {
return serviceCache.getOrPut(type.java) { ServiceLoader.load(type.java, classLoader) }.asSequence()
.map { type.cast(it) }
}
companion object {
val DEFAULT = ClassLoaderPlugin(Global::class.java.classLoader)
}
}
val Context.classLoaderPlugin get() = this.plugins.get() ?: ClassLoaderPlugin.DEFAULT
inline fun <reified T : Any> Context.services() = classLoaderPlugin.services(T::class)
//open class JVMContext(
// final override val name: String,
// final override val parent: JVMContext? = Global,
// classLoader: ClassLoader? = null,
// properties: Meta = EmptyMeta
//) : Context, AutoCloseable {
//
// override val properties: Meta = if (parent == null) {
// properties
// } else {
// Laminate(properties, parent.properties)
// }
//
// override val plugins: PluginManager by lazy { PluginManager(this) }
// override val logger: KLogger = KotlinLogging.logger(name)
//
// /**
// * A class loader for this context. Parent class loader is used by default
// */
// open val classLoader: ClassLoader = classLoader ?: parent?.classLoader ?: Global.classLoader
//
// /**
// * A property showing that dispatch thread is started in the context
// */
// private var started = false
//
// /**
// * A dispatch thread executor for current context
// *
// * @return
// */
// val dispatcher: ExecutorService by lazy {
// logger.info("Initializing dispatch thread executor in {}", name)
// Executors.newSingleThreadExecutor { r ->
// Thread(r).apply {
// priority = 8 // slightly higher priority
// isDaemon = true
// name = this@JVMContext.name + "_dispatch"
// }.also { started = true }
// }
// }
//
// private val serviceCache: MutableMap<Class<*>, ServiceLoader<*>> = HashMap()
//
// fun <T : Any> services(type: KClass<T>): Sequence<T> {
// return serviceCache.getOrPut(type.java) { ServiceLoader.load(type.java, classLoader) }.asSequence()
// .map { type.cast(it) }
// }
//
// /**
// * Free up resources associated with this context
// *
// * @throws Exception
// */
// override fun close() {
// if (isActive) error("Can't close active context")
// //detach all plugins
// plugins.forEach { it.detach() }
//
// if (started) {
// dispatcher.shutdown()
// }
// }
//
// private val activators = HashSet<WeakReference<Any>>()
//
// override val isActive: Boolean = activators.all { it.get() == null }
//
// override fun activate(activator: Any) {
// activators.add(WeakReference(activator))
// }
//
// override fun deactivate(activator: Any) {
// activators.removeAll { it.get() == activator }
// }
//}
//

View File

@ -1,65 +0,0 @@
/*
* Copyright 2018 Alexander Nozik.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package hep.dataforge.context
import hep.dataforge.meta.Meta
import hep.dataforge.meta.buildMeta
import hep.dataforge.names.toName
import java.util.*
import kotlin.collections.HashMap
private fun Properties.asMeta(): Meta {
return buildMeta {
this@asMeta.forEach { key, value ->
set(key.toString().toName(), value)
}
}
}
/**
* A singleton global context. Automatic root for the whole context hierarchy. Also stores the registry for active contexts.
*
* @author Alexander Nozik
*/
actual object Global : Context, JVMContext("GLOBAL", null, Thread.currentThread().contextClassLoader) {
/**
* Closing all contexts
*
* @throws Exception
*/
override fun close() {
logger.info("Shutting down GLOBAL")
for (ctx in contextRegistry.values) {
ctx.close()
}
super.close()
}
private val contextRegistry = HashMap<String, Context>()
/**
* Get previously builder context o builder a new one
*
* @param name
* @return
*/
@Synchronized
actual fun getContext(name: String): Context {
return contextRegistry.getOrPut(name) { JVMContext(name) }
}
}

View File

@ -1,107 +0,0 @@
/*
* Copyright 2018 Alexander Nozik.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package hep.dataforge.context
import hep.dataforge.meta.*
import mu.KLogger
import mu.KotlinLogging
import java.lang.ref.WeakReference
import java.util.*
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import kotlin.collections.HashSet
import kotlin.reflect.KClass
import kotlin.reflect.full.cast
open class JVMContext(
final override val name: String,
final override val parent: JVMContext? = Global,
classLoader: ClassLoader? = null,
properties: Meta = EmptyMeta
) : Context, AutoCloseable {
private val _properties = Config().apply { update(properties) }
override val properties: Meta
get() = if (parent == null) {
_properties
} else {
Laminate(_properties, parent.properties)
}
override val plugins: PluginManager by lazy { PluginManager(this) }
override val logger: KLogger = KotlinLogging.logger(name)
/**
* A class loader for this context. Parent class loader is used by default
*/
open val classLoader: ClassLoader = classLoader ?: parent?.classLoader ?: Global.classLoader
/**
* A property showing that dispatch thread is started in the context
*/
private var started = false
/**
* A dispatch thread executor for current context
*
* @return
*/
val dispatcher: ExecutorService by lazy {
logger.info("Initializing dispatch thread executor in {}", name)
Executors.newSingleThreadExecutor { r ->
Thread(r).apply {
priority = 8 // slightly higher priority
isDaemon = true
name = this@JVMContext.name + "_dispatch"
}.also { started = true }
}
}
private val serviceCache: MutableMap<Class<*>, ServiceLoader<*>> = HashMap()
fun <T : Any> services(type: KClass<T>): Sequence<T> {
return serviceCache.getOrPut(type.java) { ServiceLoader.load(type.java, classLoader) }.asSequence()
.map { type.cast(it) }
}
/**
* Free up resources associated with this context
*
* @throws Exception
*/
override fun close() {
if (isActive) error("Can't close active context")
//detach all plugins
plugins.forEach { it.detach() }
if (started) {
dispatcher.shutdown()
}
}
private val activators = HashSet<WeakReference<Any>>()
override val isActive: Boolean = activators.all { it.get() == null }
override fun activate(activator: Any) {
activators.add(WeakReference(activator))
}
override fun deactivate(activator: Any) {
activators.removeAll { it.get() == activator }
}
}

View File

@ -12,6 +12,6 @@ actual object PluginRepository {
* List plugins available in the repository
*/
actual fun list(): Sequence<PluginFactory> =
factories.asSequence() + Global.services(PluginFactory::class)
factories.asSequence() + Global.services()
}

View File

@ -1,6 +1,7 @@
package hep.dataforge.provider
import hep.dataforge.context.Context
import hep.dataforge.context.members
import kotlin.reflect.KClass
import kotlin.reflect.full.findAnnotation
@ -31,7 +32,5 @@ inline fun <reified T : Any> Provider.provideAllByType(): Sequence<T> {
/**
* A sequences of all objects provided by plugins with given target and type
*/
inline fun <reified T : Any> Context.components(): Sequence<T> {
return plugins.asSequence().flatMap { it.provideAll(Types[T::class]) }.filterIsInstance<T>()
}
inline fun <reified T : Any> Context.members(): Sequence<T> = members<T>(Types[T::class])

View File

@ -180,3 +180,5 @@ interface Metoid {
}
fun Value.toMeta() = buildMeta { Meta.VALUE_KEY to this }
fun Meta.isEmpty() = this === EmptyMeta || this.items.isEmpty()

View File

@ -31,12 +31,9 @@ class DataDependency(val filter: DataFilter) : Dependency() {
class TaskModelDependency(val name: String, val meta: Meta, val placement: Name = EmptyName) : Dependency() {
override fun apply(workspace: Workspace): DataNode<Any> {
val model =
workspace.tasks[name]?.build(workspace, meta) ?: error("Task with name $name not found in $workspace")
val task = workspace.tasks[model.name] ?: error("Task with name ${model.name} is not found in the workspace")
val task = workspace.tasks[name] ?: error("Task with name ${name} is not found in the workspace")
if (task.isTerminal) TODO("Support terminal task")
val result = task.run(model)
val result = with(workspace) { task(meta) }
return if (placement.isEmpty()) {
result
} else {

View File

@ -1,7 +1,8 @@
package hep.dataforge.workspace
import hep.dataforge.context.Context
import hep.dataforge.context.ContextAware
import hep.dataforge.context.Named
import hep.dataforge.context.members
import hep.dataforge.data.Data
import hep.dataforge.data.DataNode
import hep.dataforge.meta.Meta
@ -12,7 +13,7 @@ import hep.dataforge.provider.Type
@Type(Workspace.TYPE)
interface Workspace : ContextAware, Named, Provider {
interface Workspace : ContextAware, Provider {
/**
* The whole data node for current workspace
*/
@ -48,8 +49,40 @@ interface Workspace : ContextAware, Named, Provider {
}
}
operator fun <R : Any> Task<R>.invoke(config: Meta): DataNode<R> {
context.activate(this)
try {
val model = build(this@Workspace, config)
validate(model)
return run(model)
} finally {
context.deactivate(this)
}
}
/**
* Invoke a task in the workspace utilizing caching if possible
*/
operator fun <R : Any> Task<R>.invoke(targetName: String): DataNode<R> {
val target = targets[targetName] ?: error("A target with name $targetName not found in ${this@Workspace}")
return invoke(target)
}
companion object {
const val TYPE = "workspace"
}
}
class SimpleWorkspace(
override val context: Context,
override val data: DataNode<Any>,
override val targets: Map<String, Meta>,
tasks: Collection<Task<Any>>
) : Workspace {
override val tasks: Map<String, Task<*>> by lazy {
(context.members<Task<*>>(Task.TYPE) + tasks).associate { it.name to it }
}
}

View File

@ -0,0 +1,40 @@
package hep.dataforge.workspace
import hep.dataforge.context.Context
import hep.dataforge.context.ContextBuilder
import hep.dataforge.data.DataTreeBuilder
import hep.dataforge.meta.Meta
import hep.dataforge.meta.MetaBuilder
import hep.dataforge.meta.buildMeta
/**
* A builder for a workspace
*/
class WorkspaceBuilder(var context: Context) {
val data = DataTreeBuilder<Any>()
val targets = HashMap<String, Meta>()
val tasks = HashSet<Task<Any>>()
fun context(action: ContextBuilder.() -> Unit) {
this.context = ContextBuilder().apply(action).build()
}
fun data(action: DataTreeBuilder<Any>.() -> Unit) = data.apply(action)
fun target(name: String, meta: Meta) {
targets[name] = meta
}
fun target(name: String, action: MetaBuilder.() -> Unit) = target(name, buildMeta(action))
fun task(task: Task<*>) {
tasks.add(task)
}
fun build(): Workspace = SimpleWorkspace(
context,
data.build(),
targets,
tasks
)
}