Compare commits

6 Commits
main ... dev

27 changed files with 1005 additions and 54 deletions

70
README.md Normal file
View File

@@ -0,0 +1,70 @@
# WorkFlow: industrial process simulation and monitoring tool
## Aim
The aim of this work is to create a toolset to monitor and simulate industrial processes like production and processing plants,
resource management for project-based teams, budgeting in complex cases, etc.
The model could be used for:
* project monitoring;
* project optimisation;
* project automation;
## Not-aim
Creating GUI is currently out of scope.
Technical process simulation is done with Controls-kt.
Project strategy management is currently out of scope.
## Concepts
The project is based on several main constructs:
### Resource/object graph
The current state of the system is described as a graph structure where each node is a separate entity. Each node has a type
and unique ID. Some node types could also have **parameters**, that could not be changed after node creation.
Node can have any number of **attributes** (they could be added, removed and replaced). Attributes define the current node state.
Node can have external descriptor that stores meta-information about the node. Like access rights, visualization preferences, etc.
### Attributes
Attributes are strongly typed containers for values. An attribute key stores information about attribute name and attribute type.
Also, it could store information about default values and serialization rules.
Some attributes are **relations** that connect two nodes with specific directed connection. Currently, we use only one-to-one relations.
**To be discussed:** attributes could have their own attributes either defined in themselves, or in schema.
### Work
Work is a list of events that contains rules to modify object graph. Basically it has the following events:
* add node;
* remove node;
* modify attribute (add/remove/replace);
Work also could have events that do not modify resource graph but still should be monitored. Like change of external balance.
Work events are consumed by a work graph with respect to attached **processes**. It is possible that the process rejects node change.
It is also possible that the process adds new node changes based on the graph state.
**To be discussed:** In case of event rejection, part of work could still be completed based on rules and event relations.
The specific rules should be further discussed.
### Process
A "smart-contract" (no blockchain yet) that defines automatic changes to object graph based on **work** being done.
### History
The log of all changes in object graph. It could be used to construct "time machine" and reverse the state of the graph to any state in the past.
The history events mirror **work** events.
## Further discussion
Time management is out of the scope for now, but it should be considered in the future. For example, there should be possibility to
add delay to the process and allow it to propagate in a virtual time. Timeline from *simulations-kt* could be used for that.

View File

@@ -4,6 +4,3 @@ plugins {
group = "center.sciprog"
version = "0.1.0"
val attributesVersion: String by extra("0.1.0")

View File

@@ -1,3 +1,3 @@
kotlin.code.style=official
toolsVersion=0.15.2-kotlin-1.9.22
toolsVersion=0.17.1-kotlin-2.1.20

View File

@@ -0,0 +1,9 @@
[versions]
tinkerpop = "3.7.2"
[libraries]
attributes = "space.kscience:attributes-kt:0.3.0"
gremlin-core = { module = "org.apache.tinkerpop:gremlin-core", version.ref = "tinkerpop" }
tinkergraph-gremlin = { module = "org.apache.tinkerpop:tinkergraph-gremlin", version.ref = "tinkerpop" }

View File

@@ -39,4 +39,9 @@ dependencyResolutionManagement {
}
}
include(":workflow-core")
include(
":workflow-core",
":workflow-budget",
":workflow-state",
":workflow-tinkerpop",
)

View File

@@ -0,0 +1,28 @@
import space.kscience.gradle.Maturity
plugins {
id("space.kscience.gradle.mpp")
`maven-publish`
}
val attributesVersion: String by rootProject.extra
kscience {
jvm()
js()
native()
useCoroutines()
useSerialization()
commonMain {
api(projects.workflowCore)
}
jvmTest {
implementation(spclibs.logback.classic)
}
}
readme {
maturity = Maturity.EXPERIMENTAL
}

View File

@@ -0,0 +1,33 @@
package center.sciprog.workflow.budget
import center.sciprog.workflow.Work
import center.sciprog.workflow.WorkFunction
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.fold
import kotlinx.datetime.Instant
public class Payments(
public val subject: Subject,
public val timeFrame: ClosedRange<Instant>,
public val valueExtractor: (Money) -> Double,
override val defaultBase: Double = 0.0,
) : WorkFunction<Double> {
override suspend fun compute(work: Work, base: Double): Double {
return work.flow().filterIsInstance<ExecutedTransaction>().filter {
it.toSubject == subject && it.time in timeFrame
}.fold(base) { acc, value -> acc + valueExtractor(value.amount) }
}
public companion object {
public fun inRubles(
subject: Subject,
timeFrame: ClosedRange<Instant> = Instant.DISTANT_PAST..Instant.DISTANT_FUTURE,
): Payments = Payments(
subject,
timeFrame,
{ if (it.currency == Rubles) it.amount.toDouble() else TODO("No currency converter") }
)
}
}

View File

@@ -1,4 +1,4 @@
package center.sciprog.workflow
package center.sciprog.workflow.budget
import kotlinx.serialization.Serializable

View File

@@ -1,4 +1,4 @@
package center.sciprog.workflow
package center.sciprog.workflow.budget
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow

View File

@@ -1,5 +1,8 @@
package center.sciprog.workflow
package center.sciprog.workflow.budget
import center.sciprog.workflow.EventId
import center.sciprog.workflow.WorkBuilder
import center.sciprog.workflow.WorkEvent
import kotlinx.datetime.*
import kotlinx.serialization.Serializable
import space.kscience.attributes.Attributes
@@ -60,5 +63,15 @@ public suspend fun WorkBuilder.plannedTransaction(
time: Instant = Clock.System.now(),
attributesBuilder: AttributesBuilder<PlannedTransaction>.() -> Unit = {},
) {
put(PlannedTransaction(generateId(), plannedOn, fromSubject, toSubject, amount, time, Attributes(attributesBuilder)))
put(
PlannedTransaction(
generateId(),
plannedOn,
fromSubject,
toSubject,
amount,
time,
Attributes(attributesBuilder)
)
)
}

View File

@@ -1,4 +1,4 @@
package center.sciprog.workflow
package center.sciprog.workflow.budget
import kotlinx.serialization.KSerializer
import kotlinx.serialization.builtins.serializer

View File

@@ -1,22 +1,23 @@
package center.sciprog.workflow
package center.sciprog.workflow.budget
import center.sciprog.workflow.WorkList
import kotlinx.coroutines.test.runTest
import kotlinx.datetime.*
import kotlinx.datetime.Clock
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.time.Duration.Companion.days
internal class WorkFunctionTest {
@Test
fun testPayments() = runTest{
val person = Person("test","Test Subject")
fun testPayments() = runTest {
val person = Person("test", "Test Subject")
val org = Organization("Master", "Master organization")
val work = WorkList {
val now = Clock.System.now()
repeat(10) {
transaction(org, person, 1000.rubles, time = now.minus((it*30).days))
transaction(org, person, 1000.rubles, time = now.minus((it * 30).days))
}
}

View File

@@ -15,8 +15,7 @@ kscience {
useSerialization()
commonMain {
api(spclibs.kotlinx.datetime)
api("space.kscience:attributes-kt:$attributesVersion")
api("com.benasher44:uuid:0.8.4")
api(libs.attributes)
}
jvmTest {

View File

@@ -0,0 +1,6 @@
package center.sciprog.workflow
public interface ReversibleWork : Work {
public fun reversed(): Work
public fun reverseEvent(event: WorkEvent): WorkEvent
}

View File

@@ -3,7 +3,6 @@ package center.sciprog.workflow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.merge
import kotlinx.datetime.Instant
import kotlinx.datetime.LocalDateTime
import space.kscience.attributes.AttributeContainer
public typealias WorkId = String
@@ -23,10 +22,17 @@ public interface WorkEvent : AttributeContainer {
* A localized time operation is attributed to
*/
public val time: Instant
/**
* Other events that happened in order for this one to occur
*/
public val dependsOn: Collection<EventId> get() = emptySet()
}
/**
* A central API for all workflow computations
* A central API for all workflow computations.
*
* Work represents a flow of events that could be run multiple times.
*/
public interface Work {
@@ -42,6 +48,7 @@ public interface Work {
}
}
/**
* A work that can produce a modified work via [WorkBuilder]
*/

View File

@@ -1,37 +1,11 @@
package center.sciprog.workflow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.fold
import kotlinx.datetime.Instant
/**
* Aggregate values for given [Work]
*/
public interface WorkFunction<T> {
public val emptyBase: T
public suspend fun compute(work: Work, base: T = emptyBase): T
public val defaultBase: T
public suspend fun compute(work: Work, base: T = defaultBase): T
}
public class Payments(
public val subject: Subject,
public val timeFrame: ClosedRange<Instant>,
public val valueExtractor: (Money) -> Double,
override val emptyBase: Double = 0.0,
) : WorkFunction<Double> {
override suspend fun compute(work: Work, base: Double): Double {
return work.flow().filterIsInstance<ExecutedTransaction>().filter {
it.toSubject == subject && it.time in timeFrame
}.fold(base) { acc, value -> acc + valueExtractor(value.amount) }
}
public companion object {
public fun inRubles(
subject: Subject,
timeFrame: ClosedRange<Instant> = Instant.DISTANT_PAST..Instant.DISTANT_FUTURE,
): Payments = Payments(
subject,
timeFrame,
{ if (it.currency == Rubles) it.amount.toDouble() else TODO("No currency converter") }
)
}
}

View File

@@ -1,11 +1,15 @@
@file:OptIn(ExperimentalUuidApi::class)
package center.sciprog.workflow
import com.benasher44.uuid.uuid4
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlin.uuid.ExperimentalUuidApi
import kotlin.uuid.Uuid
private class WorkListBuilder(val idPrefix: String, val events: MutableList<WorkEvent>) : WorkBuilder {
override suspend fun generateId(): EventId = idPrefix + "-" + uuid4().toString()
override suspend fun generateId(): EventId = idPrefix + "-" + Uuid.random().toHexString()
override suspend fun put(event: WorkEvent) {
events.add(event)
@@ -32,7 +36,7 @@ public class WorkList(private val events: List<WorkEvent>) : WorkWithBuilder<Wor
override suspend fun modified(block: suspend WorkBuilder.() -> Unit): WorkList {
val modifiedEvents = ArrayList(events)
val prefix = uuid4().leastSignificantBits.toString(16)
val prefix = Uuid.random().toHexString()
WorkListBuilder(prefix, modifiedEvents).block()
return WorkList((modifiedEvents))
}
@@ -40,7 +44,7 @@ public class WorkList(private val events: List<WorkEvent>) : WorkWithBuilder<Wor
public suspend fun WorkList(block: suspend WorkBuilder.() -> Unit): WorkList {
val events = mutableListOf<WorkEvent>()
val prefix = uuid4().leastSignificantBits.toString(16)
val prefix = Uuid.random().toHexString()
val builder = WorkListBuilder(prefix, events)
builder.block()
return WorkList(events)

View File

@@ -0,0 +1,22 @@
import space.kscience.gradle.Maturity
plugins {
id("space.kscience.gradle.mpp")
`maven-publish`
}
kscience {
jvm()
js()
native()
useCoroutines()
useSerialization()
commonMain {
api(projects.workflowCore)
}
}
readme {
maturity = Maturity.EXPERIMENTAL
}

View File

@@ -0,0 +1,150 @@
package center.sciprog.workflow.state
import center.sciprog.workflow.EventId
import center.sciprog.workflow.WorkEvent
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import space.kscience.attributes.Attribute
import space.kscience.attributes.Attributes
public sealed interface GraphEvent {
public val time: Instant
}
/**
* Graph mutation event
*/
public sealed interface GraphMutationEvent : GraphEvent {
/**
* The time in which event was emitted
*/
override val time: Instant
public data class SetAttribute<T>(
val nodeId: NodeId,
val attribute: Attribute<T>,
val value: T,
override val time: Instant
) : GraphMutationEvent
public data class RemoveAttribute(
val nodeId: NodeId,
val attribute: Attribute<*>,
override val time: Instant
) : GraphMutationEvent
public data class CreateNode(
val nodeType: NodeType,
override val time: Instant
) : GraphMutationEvent
public data class RemoveNode(
val nodeId: NodeId,
override val time: Instant
) : GraphMutationEvent
}
/**
* History events
*/
public sealed interface GraphHistoryEvent : WorkEvent, GraphEvent {
public data class AttributeChanged<T>(
val nodeId: NodeId,
val attribute: Attribute<T>,
val valueBefore: T?,
val valueAfter: T?,
override val attributes: Attributes,
override val id: EventId,
override val time: Instant
) : GraphHistoryEvent
public data class NodeChanged(
val nodeId: NodeId,
val nodeBefore: StateNode?,
val nodeAfter: StateNode?,
override val attributes: Attributes,
override val id: EventId,
override val time: Instant
) : GraphHistoryEvent
}
public interface StateGraph: StateNodeProvider {
public val clock: Clock
public val history: SharedFlow<GraphHistoryEvent>
/**
* Project graph forward or backward in time
*/
//public fun project(block: suspend MutableStateGraph.() -> Unit): StateGraph
}
public interface MutableStateGraph : StateGraph {
public suspend fun createNode(
nodeType: NodeType,
time: Instant = clock.now(),
attributes: Attributes = Attributes.EMPTY
): StateNode
public suspend fun removeNode(
nodeId: NodeId,
time: Instant = clock.now()
): StateNode?
/**
* Set attribute for given node.
*
* It is graph implementation responsibility to properly process connection.
* So setting connection attributes should automatically set mirroring attribute.
*/
public suspend fun <T> setAttribute(
nodeId: NodeId,
attribute: Attribute<T>,
value: T,
time: Instant = clock.now()
)
/**
* Remove attribute for given [nodeId].
*
* It is graph implementation responsibility to properly process connection.
* So removing mirroring attribute should happen automatically.
*/
public suspend fun <T> removeAttribute(
nodeId: NodeId,
attribute: Attribute<T>,
time: Instant = clock.now()
)
/**
* Perform a couple of actions in a single atomic transaction.
*/
public suspend fun withTransaction(block: suspend MutableStateGraph.() -> Unit): Unit
}
internal suspend fun <T> GraphMutationEvent.SetAttribute<T>.mutate(graph: MutableStateGraph) {
graph.setAttribute(nodeId, attribute, value, time)
}
/**
* Mutate a graph with event.
*/
public suspend fun MutableStateGraph.consumeEvent(event: GraphMutationEvent): Unit {
when (event) {
is GraphMutationEvent.CreateNode -> createNode(event.nodeType, clock.now())
is GraphMutationEvent.RemoveNode -> removeNode(event.nodeId, clock.now())
is GraphMutationEvent.SetAttribute<*> -> event.mutate(this)
is GraphMutationEvent.RemoveAttribute -> removeAttribute(
nodeId = event.nodeId,
attribute = event.attribute,
time = clock.now()
)
}
}

View File

@@ -0,0 +1,22 @@
package center.sciprog.workflow.state
import center.sciprog.workflow.Work
import center.sciprog.workflow.WorkEvent
/**
* Stateful [StateGraph] manager
*/
public interface StateManager {
public val graph: StateGraph
/**
* Consume a single event. Fail if event is inconsistent with the current state
*/
public suspend fun consume(event: WorkEvent)
/**
* Advance [graph] by the flow of events in given [work].
*
*/
public suspend fun execute(work: Work): Unit = work.flow().collect(::consume)
}

View File

@@ -0,0 +1,50 @@
package center.sciprog.workflow.state
import space.kscience.attributes.Attribute
import space.kscience.attributes.AttributeContainer
import space.kscience.attributes.Attributes
import space.kscience.attributes.keys
public typealias NodeId = String
public typealias NodeType = String
public typealias NodeConnectionType = String
public interface NodeConnection: AttributeContainer {
public val type: NodeConnectionType
public val inNode: NodeId
public val outNode: NodeId
}
public data class SimpleNodeConnection(
override val type: NodeConnectionType,
override val inNode: NodeId,
override val outNode: NodeId,
override val attributes: Attributes
) : NodeConnection
public interface ConnectionAttribute : Attribute<NodeConnection>
public interface StateNode : AttributeContainer {
public val id: NodeId
public val type: NodeType
public companion object {
public const val NODE_TYPE_KEY: String = "@type"
}
}
public suspend fun StateGraph.connections(nodeId: NodeId): Collection<NodeConnection> {
val node = get(nodeId) ?: return emptyList()
return node.attributes.keys
.filter { it is ConnectionAttribute }
.map { node.attributes[it] }
.filterIsInstance<NodeConnection>()
}
public suspend fun StateGraph.outConnections(nodeId: NodeId): Collection<NodeConnection> =
connections(nodeId).filter { connection -> connection.outNode == nodeId }
public suspend fun StateGraph.inConnections(nodeId: NodeId): Collection<NodeConnection> =
connections(nodeId).filter { connection -> connection.inNode == nodeId }

View File

@@ -0,0 +1,76 @@
package center.sciprog.workflow.state
import kotlinx.coroutines.flow.Flow
import space.kscience.attributes.Attribute
import space.kscience.attributes.SetAttribute
/**
* Query constructor for a graph
*/
public sealed interface StateGraphQuery {
/**
* Select all nodes with given type
*/
public data class ByType(val type: String) : StateGraphQuery
/**
* Select nodes that adhere to all [queries]
*/
public data class AllOf(val queries: List<StateGraphQuery>) : StateGraphQuery
/**
* Select nodes that adhere to one of [queries]
*/
public data class OneOf(val queries: List<StateGraphQuery>) : StateGraphQuery
/**
* Filter nodes by predicate on attribute value
*/
public data class AttributeEquals<T>(public val attribute: Attribute<T>, public val value: T) : StateGraphQuery
/**
* A query that selects nodes where a specific set attribute contains a given value.
*
* This query is used to filter state graph nodes based on whether an element exists
* in the set associated with a specified attribute.
*
* @param attribute The set attribute to evaluate.
* @param value The value to check for in the attribute's set.
* @param T The type of elements within the set attribute.
*/
public data class AttributeContains<T>(public val attribute: SetAttribute<T>, public val value: T) : StateGraphQuery
// /**
// * Represents a query that filters nodes in a state graph based on whether a specified attribute
// * falls within a defined range of values.
// *
// * This query is particularly useful for selecting graph nodes where an attribute of a comparable type
// * meets criteria defined by a closed range.
// *
// * @param T The type of the attribute, which must implement the [Comparable] interface.
// * @property attribute The attribute used for comparison within the query.
// * @property range The closed range used to filter nodes whose attribute values fall within this range.
// */
// public data class AttributeInRange<T: Comparable<T>>(public val attribute: Attribute<T>, public val range: ClosedRange<T>) : StateGraphQuery
//
// public data class ByQuery(val query: String) : StateGraphQuery
}
public data class NodeWithId(val nodeId: NodeId, val node: StateNode)
public interface StateNodeProvider {
/**
* Flow all node Ids. The order of ids is undefined
*/
public fun nodeIds(): Flow<NodeId>
/**
* Read a single node if it exists
*/
public suspend operator fun get(id: NodeId): StateNode?
/**
* Request a lazy flow of nodes with a given query. The order
*/
public suspend fun query(query: StateGraphQuery): Flow<NodeWithId>
}

View File

@@ -0,0 +1,26 @@
import space.kscience.gradle.Maturity
plugins {
id("space.kscience.gradle.mpp")
`maven-publish`
}
kscience {
jvm()
useCoroutines()
useSerialization()
jvmMain {
api(projects.workflowState)
api(libs.gremlin.core)
}
jvmTest {
implementation(spclibs.logback.classic)
implementation(libs.tinkergraph.gremlin)
}
}
readme {
maturity = Maturity.EXPERIMENTAL
}

View File

@@ -0,0 +1,169 @@
package center.sciprog.workflow.state
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal
import org.apache.tinkerpop.gremlin.structure.Edge
import org.apache.tinkerpop.gremlin.structure.Element
import org.apache.tinkerpop.gremlin.structure.Vertex
import space.kscience.attributes.Attribute
import space.kscience.attributes.Attributes
import space.kscience.attributes.UnsafeAPI
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__ as TinkerPop
public interface TinkerAttributeAccessor<T> {
/**
* Key of corresponding edge or VertexAttribute
*/
public val key: String
public val attribute: Attribute<T>
public fun Element.readAttribute(): T
public fun <E : Element> GraphTraversal<*, E>.readAttribute(): GraphTraversal<*, T?> =
map { it.get().readAttribute() }
public fun <E : Element> GraphTraversal<*, E>.writeAttribute(
attribute: Attribute<T>,
value: T?
): GraphTraversal<*, E>
public fun <E : Element> GraphTraversal<*, E>.writeAttributeFrom(attributes: Attributes): GraphTraversal<*, E> =
writeAttribute(attribute, attributes[attribute])
}
public fun <E : Element> GraphTraversal<*, E>.writeAttributes(
registry: TinkerRegistry,
attributes: Attributes
): GraphTraversal<*, E> {
var traversal = this
attributes.content.keys.forEach { attribute ->
val writer = registry.attributePipeByAttribute(attribute)
with(writer) {
traversal = traversal.writeAttributeFrom(attributes)
}
}
return traversal
}
/**
* A [TinkerAttributeAccessor] implementation that assumes
*/
public class SimpleTinkerAttributeAccessor<T>(
override val key: String,
override val attribute: Attribute<T>
) : TinkerAttributeAccessor<T> {
override fun Element.readAttribute(): T = this.value<T>(key)
override fun <E : Element> GraphTraversal<*, E>.readAttribute(): GraphTraversal<*, T?> = valueOrNull(key)
override fun <E : Element> GraphTraversal<*, E>.writeAttribute(
attribute: Attribute<T>,
value: T?
): GraphTraversal<*, E> = property(key, value)
}
public interface TinkerConnectionAttributeAccessor<T : NodeConnection> : TinkerAttributeAccessor<T> {
public fun GraphTraversal<*, Edge>.readEdge(): GraphTraversal<*, T?>
/**
* Update edge after creation
*/
public fun GraphTraversal<*, Edge>.writeEdgeAttributes(connection: T): GraphTraversal<*, Edge>
override fun <E : Element> GraphTraversal<*, E>.readAttribute(): GraphTraversal<*, T?> = bothE(key).readEdge()
override fun <E : Element> GraphTraversal<*, E>.writeAttribute(
attribute: Attribute<T>,
value: T?
): GraphTraversal<*, E> = if (value == null) {
branch(
bothE(key).drop()
)
} else {
branch(
addE(key).from(V(value.inNode)).to(V(value.outNode)).writeEdgeAttributes(value)
)
}
override fun <E : Element> GraphTraversal<*, E>.writeAttributeFrom(
attributes: Attributes
): GraphTraversal<*, E> = writeAttribute(attribute, attributes[attribute])
}
public class SimpleTinkerConnectionAttributeAccessor(
override val key: String,
override val attribute: Attribute<SimpleNodeConnection>,
private val registry: TinkerRegistry,
) : TinkerConnectionAttributeAccessor<SimpleNodeConnection> {
override fun Element.readAttribute(): SimpleNodeConnection {
val edge = this as? Edge ?: error("Not an edge: $this")
return SimpleNodeConnection(
edge.label(),
edge.inVertex().id().toString(),
edge.outVertex().id().toString(),
TinkerAttributes(edge, registry)
)
}
override fun GraphTraversal<*, Edge>.readEdge(): GraphTraversal<*, SimpleNodeConnection?> = map {
val edge = it.get()
SimpleNodeConnection(
edge.label(),
edge.inVertex().id().toString(),
edge.outVertex().id().toString(),
TinkerAttributes(edge, registry)
)
}
override fun GraphTraversal<*, Edge>.writeEdgeAttributes(
connection: SimpleNodeConnection
): GraphTraversal<*, Edge> = writeAttributes(registry, connection.attributes)
}
@OptIn(UnsafeAPI::class)
public class TinkerAttributes(
public val element: Element,
private val registry: TinkerRegistry
) : Attributes {
override val content: Map<out Attribute<*>, Any?>
get() = element.keys().mapNotNull {
registry.attributePipeByLabel(it)
}.associate { converter ->
with(converter) {
converter.attribute to element.readAttribute()
}
}
override fun toString(): String = "TinkerAttributes(element=$element)"
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (javaClass != other?.javaClass) return false
other as TinkerAttributes
if (element != other.element) return false
if (registry != other.registry) return false
return true
}
override fun hashCode(): Int {
var result = element.hashCode()
result = 31 * result + registry.hashCode()
return result
}
public companion object
}
public fun <E, T> GraphTraversal<E, *>.valueOrNull(propertyKey: String): GraphTraversal<E, T?> = choose<T>(
TinkerPop.has<Vertex>(propertyKey),
TinkerPop.values<Vertex, T>(propertyKey),
TinkerPop.constant(null)
)

View File

@@ -0,0 +1,41 @@
package center.sciprog.workflow.state
import space.kscience.attributes.Attribute
public interface TinkerRegistry {
public fun attributePipeByLabel(label: String): TinkerAttributeAccessor<*>
public fun <T> attributePipeByAttribute(attribute: Attribute<T>): TinkerAttributeAccessor<T>
}
private class SimpleTinkerRegistry(val accessors: Set<TinkerAttributeAccessor<*>>) : TinkerRegistry {
override fun attributePipeByLabel(label: String): TinkerAttributeAccessor<*> = accessors.single { it.key == label }
@Suppress("UNCHECKED_CAST")
override fun <T> attributePipeByAttribute(attribute: Attribute<T>): TinkerAttributeAccessor<T> = accessors.single {
it.attribute == attribute
} as TinkerAttributeAccessor<T>
}
/**
* Constructs a TinkerRegistry with the provided set of attribute accessors.
*
* @param accessors A set of TinkerAttributeAccessor objects that define the mapping
* between graph elements and their corresponding attributes.
* @return A TinkerRegistry instance that maps attributes to their corresponding keys or labels.
*/
public fun TinkerRegistry(
accessors: Set<TinkerAttributeAccessor<*>>
): TinkerRegistry = SimpleTinkerRegistry(accessors)
public fun <T> MutableSet<TinkerAttributeAccessor<*>>.attribute(
key: String,
attribute: Attribute<T>
) {
add(SimpleTinkerAttributeAccessor(key, attribute))
}
public fun TinkerRegistry(
registryBuilder: MutableSet<TinkerAttributeAccessor<*>>.() -> Unit
): TinkerRegistry = TinkerRegistry(buildSet(registryBuilder))

View File

@@ -0,0 +1,212 @@
package center.sciprog.workflow.state
import kotlinx.coroutines.flow.*
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import org.apache.tinkerpop.gremlin.process.traversal.P
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.V
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.sideEffect
import org.apache.tinkerpop.gremlin.structure.Vertex
import space.kscience.attributes.Attribute
import space.kscience.attributes.Attributes
import kotlin.uuid.ExperimentalUuidApi
import kotlin.uuid.Uuid
public class TinkerNode(
private val vertex: Vertex,
private val registry: TinkerRegistry
) : StateNode {
override val id: NodeId get() = vertex.id().toString()
override val type: NodeType get() = vertex.label()
override val attributes: Attributes get() = TinkerAttributes(vertex, registry)
}
public class TinkerStateGraph(
private val ts: GraphTraversalSource,
public val registry: TinkerRegistry,
override val clock: Clock = Clock.System,
) : MutableStateGraph, AutoCloseable by ts.graph {
private val sharedEventFlow = MutableSharedFlow<GraphHistoryEvent>()
private suspend fun emitEvent(event: GraphHistoryEvent) {
sharedEventFlow.emit(event)
}
private fun findVertex(elementId: NodeId): Vertex? = ts.V(elementId).asSequence().firstOrNull()
override fun nodeIds(): Flow<NodeId> = ts.V().asFlow().map { it.id().toString() }
override suspend fun get(id: NodeId): StateNode? = findVertex(id)?.let { TinkerNode(it, registry) }
@OptIn(ExperimentalUuidApi::class)
private fun generateEventId() = Uuid.random().toHexString()
override suspend fun createNode(
nodeType: NodeType,
time: Instant,
attributes: Attributes
): StateNode {
val vertex = ts.addV(nodeType).writeAttributes(registry, attributes).asSequence().first()
val newNode = TinkerNode(vertex, registry)
emitEvent(
GraphHistoryEvent.NodeChanged(
nodeId = newNode.id,
nodeBefore = null,
nodeAfter = newNode,
attributes = attributes,
id = generateEventId(),
time = time
)
)
return newNode
}
override suspend fun removeNode(nodeId: NodeId, time: Instant): StateNode? {
val vertex = findVertex(nodeId) ?: return null
emitEvent(
GraphHistoryEvent.NodeChanged(
nodeId = nodeId,
nodeBefore = TinkerNode(vertex, registry),
nodeAfter = null,
attributes = Attributes.EMPTY,
id = generateEventId(),
time = time
)
)
return TinkerNode(vertex, registry)
}
override suspend fun <T> setAttribute(
nodeId: NodeId,
attribute: Attribute<T>,
value: T,
time: Instant
): Unit = with(registry.attributePipeByAttribute(attribute)) {
val oldValue: T? = ts.V(nodeId)
.readAttribute()
.choose<T>(
P.neq(value),
sideEffect(
V<Vertex>(nodeId).property(key, value)
)
).next()
if (oldValue != value) {
emitEvent(
GraphHistoryEvent.AttributeChanged<T>(
nodeId = nodeId,
attribute = attribute,
valueBefore = oldValue,
valueAfter = value,
attributes = Attributes.EMPTY,
id = generateEventId(),
time = time
)
)
}
}
override suspend fun <T> removeAttribute(
nodeId: NodeId,
attribute: Attribute<T>,
time: Instant
): Unit = with(registry.attributePipeByAttribute(attribute)) {
val oldValue = ts.V(nodeId)
.readAttribute()
.choose<T>(
P.neq(null),
sideEffect(
V<Vertex>(nodeId).properties<T>(key).drop()
)
).next()
if (oldValue != null) {
emitEvent(
GraphHistoryEvent.AttributeChanged(
nodeId = nodeId,
attribute = attribute,
valueBefore = oldValue,
valueAfter = null,
attributes = Attributes.EMPTY,
id = generateEventId(),
time = time
)
)
}
}
override suspend fun withTransaction(block: suspend MutableStateGraph.() -> Unit) {
val graph = ts.graph
if (!graph.features().graph().supportsTransactions()) {
error("Transactions not supported by graph")
}
val transaction = graph.tx()
if (transaction.isOpen) error("Transaction already open")
try {
val txs: GraphTraversalSource = transaction.begin()
TinkerStateGraph(txs, registry, clock).block()
transaction.commit()
} catch (e: Exception) {
// TODO add logging
transaction.rollback()
}
}
override val history: SharedFlow<GraphHistoryEvent> get() = sharedEventFlow
override suspend fun query(query: StateGraphQuery): Flow<NodeWithId> {
fun GraphTraversal<*, Vertex>.query(
query: StateGraphQuery
): GraphTraversal<*, Vertex> = when (query) {
is StateGraphQuery.OneOf -> or(
*query.queries.map { query(it) }.toTypedArray()
)
is StateGraphQuery.AllOf -> and(
*query.queries.map { query(it) }.toTypedArray()
)
is StateGraphQuery.ByType -> hasLabel(query.type)
is StateGraphQuery.AttributeContains<*> -> TODO()
is StateGraphQuery.AttributeEquals<*> -> {
val accessor = registry.attributePipeByAttribute(query.attribute)
with(registry.attributePipeByAttribute(query.attribute)) {
has(accessor.key).where(readAttribute().`is`(query.value))
}
}
//
// is StateGraphQuery.AttributeInRange<*> -> {
// val accessor = registry.attributePipeByAttribute(query.attribute)
// with(registry.attributePipeByAttribute(query.attribute)) {
// has(accessor.key).where(
// readAttribute().filter(
// P.between(
// query.range.start,
// query.range.endInclusive
// )
// )
// )
// }
// }
}
return ts.V().query(query).asFlow().map {
NodeWithId(it.id().toString(), TinkerNode(it, registry))
}
}
}

View File

@@ -0,0 +1,37 @@
package center.sciprog.workflow.state
import kotlinx.coroutines.test.runTest
import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerFactory
import space.kscience.attributes.Attribute
import kotlin.test.Test
import kotlin.test.assertEquals
internal class AttributesTest {
object AttributeA : Attribute<Double>
object AttributeB : Attribute<String>
@Test
fun tinkerAttributesWriteRead() = runTest {
val graph = TinkerFactory.createModern()
val registry = TinkerRegistry {
attribute("a", AttributeA)
attribute("b", AttributeB)
}
TinkerStateGraph(graph.traversal(), registry).use { state ->
val node = state.createNode("test")
state.setAttribute(node.id, AttributeA, 1.0)
assertEquals(1.0, node.attributes[AttributeA])
state.removeAttribute(node.id, AttributeA)
assertEquals(null, node.attributes[AttributeA])
}
}
}