Compare commits

10 Commits
main ... dev

40 changed files with 1878 additions and 213 deletions

70
README.md Normal file
View File

@@ -0,0 +1,70 @@
# WorkFlow: industrial process simulation and monitoring tool
## Aim
The aim of this work is to create a toolset to monitor and simulate industrial processes like production and processing plants,
resource management for project-based teams, budgeting in complex cases, etc.
The model could be used for:
* project monitoring;
* project optimisation;
* project automation;
## Not-aim
Creating GUI is currently out of scope.
Technical process simulation is done with Controls-kt.
Project strategy management is currently out of scope.
## Concepts
The project is based on several main constructs:
### Resource/object graph
The current state of the system is described as a graph structure where each node is a separate entity. Each node has a type
and unique ID. Some node types could also have **parameters**, that could not be changed after node creation.
Node can have any number of **attributes** (they could be added, removed and replaced). Attributes define the current node state.
Node can have external descriptor that stores meta-information about the node. Like access rights, visualization preferences, etc.
### Attributes
Attributes are strongly typed containers for values. An attribute key stores information about attribute name and attribute type.
Also, it could store information about default values and serialization rules.
Some attributes are **relations** that connect two nodes with specific directed connection. Currently, we use only one-to-one relations.
**To be discussed:** attributes could have their own attributes either defined in themselves, or in schema.
### Work
Work is a list of events that contains rules to modify object graph. Basically it has the following events:
* add node;
* remove node;
* modify attribute (add/remove/replace);
Work also could have events that do not modify resource graph but still should be monitored. Like change of external balance.
Work events are consumed by a work graph with respect to attached **processes**. It is possible that the process rejects node change.
It is also possible that the process adds new node changes based on the graph state.
**To be discussed:** In case of event rejection, part of work could still be completed based on rules and event relations.
The specific rules should be further discussed.
### Process
A "smart-contract" (no blockchain yet) that defines automatic changes to object graph based on **work** being done.
### History
The log of all changes in object graph. It could be used to construct "time machine" and reverse the state of the graph to any state in the past.
The history events mirror **work** events.
## Further discussion
Time management is out of the scope for now, but it should be considered in the future. For example, there should be possibility to
add delay to the process and allow it to propagate in a virtual time. Timeline from *simulations-kt* could be used for that.

View File

@@ -4,6 +4,3 @@ plugins {
group = "center.sciprog"
version = "0.1.0"
val attributesVersion: String by extra("0.1.0")

View File

@@ -1,3 +1,5 @@
kotlin.code.style=official
toolsVersion=0.15.2-kotlin-1.9.22
org.jetbrains.dokka.experimental.gradle.pluginMode=V2Enabled
toolsVersion=0.20.2-kotlin-2.3.0

10
gradle/libs.versions.toml Normal file
View File

@@ -0,0 +1,10 @@
[versions]
tinkerpop = "3.8.0"
[libraries]
attributes = "space.kscience:attributes-kt:0.3.0"
attributes-serialization = "space.kscience:attributes-kt-serialization:0.4.0"
gremlin-core = { module = "org.apache.tinkerpop:gremlin-core", version.ref = "tinkerpop" }
tinkergraph-gremlin = { module = "org.apache.tinkerpop:tinkergraph-gremlin", version.ref = "tinkerpop" }

View File

@@ -1,6 +1,6 @@
#Wed Apr 03 19:58:49 MSK 2024
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.7-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-9.2.1-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists

View File

@@ -39,4 +39,8 @@ dependencyResolutionManagement {
}
}
include(":workflow-core")
include(
":workflow-core",
":workflow-pm",
":workflow-tinkerpop",
)

View File

@@ -13,10 +13,12 @@ kscience {
native()
useCoroutines()
useSerialization()
useContextParameters()
commonMain {
api(spclibs.kotlinx.datetime)
api("space.kscience:attributes-kt:$attributesVersion")
api("com.benasher44:uuid:0.8.4")
api(libs.attributes)
}
jvmTest {

View File

@@ -1,37 +0,0 @@
package center.sciprog.workflow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.map
import space.kscience.attributes.AttributeContainer
import space.kscience.attributes.Attributes
public typealias SubjectId = String
public sealed interface Subject : AttributeContainer {
public val id: SubjectId
}
public data class Person(
override val id: SubjectId,
public val name: String,
override val attributes: Attributes = Attributes.EMPTY,
) : Subject
public data class Organization(
override val id: SubjectId,
public val name: String,
override val attributes: Attributes = Attributes.EMPTY,
) : Subject
public interface SubjectProvider {
public suspend fun ids(): Flow<SubjectId>
public suspend fun provide(subjectId: SubjectId): Subject?
}
public class ListSubjectProvider(private val subjects: List<Subject>) : SubjectProvider {
override suspend fun ids(): Flow<SubjectId> = subjects.asFlow().map { it.id }
override suspend fun provide(subjectId: SubjectId): Subject? = subjects.find { it.id == subjectId }
}

View File

@@ -1,64 +0,0 @@
package center.sciprog.workflow
import kotlinx.datetime.*
import kotlinx.serialization.Serializable
import space.kscience.attributes.Attributes
import space.kscience.attributes.AttributesBuilder
/**
* A money transaction
*/
public interface Transaction : WorkEvent {
public val fromSubject: Subject
public val toSubject: Subject
public val amount: Money
}
/**
* The transaction that has been executed. Some additional information could be encoded in [attributes]
*/
@Serializable
public data class ExecutedTransaction(
override val id: EventId,
override val fromSubject: Subject,
override val toSubject: Subject,
override val amount: Money,
override val time: Instant,
override val attributes: Attributes,
) : WorkEvent, Transaction
public suspend fun WorkBuilder.transaction(
fromSubject: Subject,
toSubject: Subject,
amount: Money,
time: Instant = Clock.System.now(),
attributesBuilder: AttributesBuilder<ExecutedTransaction>.() -> Unit = {},
) {
put(ExecutedTransaction(generateId(), fromSubject, toSubject, amount, time, Attributes(attributesBuilder)))
}
/**
* The transaction that is planned, but not yet executed
*/
@Serializable
public data class PlannedTransaction(
override val id: EventId,
public val plannedOn: Instant,
override val fromSubject: Subject,
override val toSubject: Subject,
override val amount: Money,
override val time: Instant,
override val attributes: Attributes,
) : WorkEvent, Transaction
public suspend fun WorkBuilder.plannedTransaction(
fromSubject: Subject,
toSubject: Subject,
plannedOn: Instant,
amount: Money,
time: Instant = Clock.System.now(),
attributesBuilder: AttributesBuilder<PlannedTransaction>.() -> Unit = {},
) {
put(PlannedTransaction(generateId(), plannedOn, fromSubject, toSubject, amount, time, Attributes(attributesBuilder)))
}

View File

@@ -1,50 +1,57 @@
@file:OptIn(UnstableAPI::class)
package center.sciprog.workflow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.merge
import kotlinx.datetime.Instant
import kotlinx.datetime.LocalDateTime
import space.kscience.attributes.AttributeContainer
import space.kscience.attributes.Attributes
import space.kscience.attributes.UnstableAPI
import kotlin.time.Instant
public typealias WorkId = String
public typealias EventId = String
/**
* A generic operation in a workflow
*/
public interface WorkEvent : AttributeContainer {
/**
* A unique ID for the operation
*/
public val id: EventId
public data class WorkEvent<out T>(
public val id: EventId,
public val time: Instant,
public val action: T,
override val attributes: Attributes,
val dependsOn: Set<EventId> = emptySet(),
): AttributeContainer
public typealias AnyWorkEvent = WorkEvent<Any?>
/**
* A localized time operation is attributed to
*/
public val time: Instant
}
/**
* A central API for all workflow computations
* A central API for all workflow computations.
*
* Work represents a flow of events that could be run multiple times.
*/
public interface Work {
public interface Work<out T> {
public fun flow(): Flow<WorkEvent>
public fun flow(activeOnly: Boolean = true): Flow<WorkEvent<T>>
public companion object {
/**
* Join several works into one
*/
public fun join(vararg works: Work): Work = object : Work {
override fun flow(): Flow<WorkEvent> = works.map { it.flow() }.merge()
public fun <T> join(vararg works: Work<T>): Work<T> = object : Work<T> {
override fun flow(activeOnly: Boolean): Flow<WorkEvent<T>> = works.map { it.flow(activeOnly) }.merge()
}
}
}
public typealias AnyWork = Work<Any?>
/**
* A work that can produce a modified work via [WorkBuilder]
* A work that can be forked via [WorkBuilder]
*/
public interface WorkWithBuilder<W : Work> : Work {
public suspend fun modified(block: suspend WorkBuilder.() -> Unit): W
public interface WorkStore<A> : Work<A> {
/**
* Produce a modified work by creating a copy and applying changes
*/
public suspend fun derive(rules: List<WorkRule<A>>, block: suspend WorkBuilder<A>.() -> Unit): Work<A>
}

View File

@@ -1,35 +1,51 @@
package center.sciprog.workflow
import kotlinx.coroutines.flow.Flow
import space.kscience.attributes.Attributes
import space.kscience.attributes.AttributesBuilder
import kotlin.time.Clock
import kotlin.time.Instant
public interface WorkBuilder {
public interface WorkBuilder<A> {
/**
* Generate a unique event Id
*/
public suspend fun generateId(): EventId
/**
* Add a single event to a builder
* Add a single event to a builder. Throw an exception if the event with given Id already exists
*/
public suspend fun put(event: WorkEvent)
public suspend fun put(event: WorkEvent<A>)
/**
* Visit all events Remove or replace or don't change all events in the Work.
* * If the result of the [visitor] is the same as input, no changes are done.
* * If the result is a new event, the initial event is replaced
* * if the result is null, the initial event is removed
*/
public suspend fun visit(visitor: (WorkEvent) -> WorkEvent?)
public suspend fun changeAttributes(eventId: EventId, builder: AttributesBuilder<WorkEvent<A>>.() -> Unit)
}
/**
* Remove a single event with given ID from builder
*/
public suspend fun WorkBuilder.remove(id: EventId): Unit = visit { if (it.id == id) null else it }
public suspend fun <T> WorkBuilder<T>.setState(eventId: EventId, state: WorkEventState): Unit = changeAttributes(eventId){
put(WorkEventState, state)
}
public suspend fun WorkBuilder.putAll(events: Collection<WorkEvent>) {
public suspend fun WorkBuilder<*>.cancel(id: EventId) {
setState(id, WorkEventState.CANCELED)
}
public suspend fun <T> WorkBuilder<T>.put(
content: T,
time: Instant = Clock.System.now(),
attributes: Attributes = Attributes.EMPTY,
dependsOn: Set<EventId> = emptySet()
): Unit = put(
WorkEvent<T>(
id = generateId(),
time = time,
action = content,
attributes = attributes,
dependsOn = dependsOn
)
)
public suspend fun <T> WorkBuilder<T>.putAll(events: Collection<WorkEvent<T>>) {
events.forEach {
put(it)
}
@@ -38,10 +54,10 @@ public suspend fun WorkBuilder.putAll(events: Collection<WorkEvent>) {
/**
* Suspends until all [events] are put into this builder
*/
public suspend fun WorkBuilder.putAll(events: Flow<WorkEvent>) {
public suspend fun <T> WorkBuilder<T>.putAll(events: Flow<WorkEvent<T>>) {
events.collect {
put(it)
}
}
public suspend fun WorkBuilder.putAll(work: Work): Unit = putAll(work.flow())
public suspend fun <T> WorkBuilder<T>.putAll(work: Work<T>): Unit = putAll(work.flow())

View File

@@ -0,0 +1,16 @@
package center.sciprog.workflow
import space.kscience.attributes.Attribute
public enum class WorkEventState {
PENDING,
COMPLETED,
CANCELED;
public companion object : Attribute<WorkEventState>
}
public val AnyWorkEvent.state: WorkEventState get() = attributes[WorkEventState] ?: WorkEventState.PENDING
public val AnyWorkEvent.isActive: Boolean get() = state != WorkEventState.CANCELED

View File

@@ -1,37 +1,13 @@
package center.sciprog.workflow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.fold
import kotlinx.datetime.Instant
public interface WorkFunction<T> {
public val emptyBase: T
public suspend fun compute(work: Work, base: T = emptyBase): T
/**
* Aggregate values for given [Work]
*
* @param A type of work action ([WorkEvent] content)
*/
public interface WorkFunction<T, in A> {
public val defaultBase: T
public suspend fun compute(work: Work<A>, base: T = defaultBase): T
}
public class Payments(
public val subject: Subject,
public val timeFrame: ClosedRange<Instant>,
public val valueExtractor: (Money) -> Double,
override val emptyBase: Double = 0.0,
) : WorkFunction<Double> {
override suspend fun compute(work: Work, base: Double): Double {
return work.flow().filterIsInstance<ExecutedTransaction>().filter {
it.toSubject == subject && it.time in timeFrame
}.fold(base) { acc, value -> acc + valueExtractor(value.amount) }
}
public companion object {
public fun inRubles(
subject: Subject,
timeFrame: ClosedRange<Instant> = Instant.DISTANT_PAST..Instant.DISTANT_FUTURE,
): Payments = Payments(
subject,
timeFrame,
{ if (it.currency == Rubles) it.amount.toDouble() else TODO("No currency converter") }
)
}
}

View File

@@ -1,47 +1,80 @@
@file:OptIn(ExperimentalUuidApi::class)
package center.sciprog.workflow
import com.benasher44.uuid.uuid4
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.filter
import space.kscience.attributes.AttributesBuilder
import space.kscience.attributes.modified
import kotlin.uuid.ExperimentalUuidApi
import kotlin.uuid.Uuid
private class WorkListBuilder(val idPrefix: String, val events: MutableList<WorkEvent>) : WorkBuilder {
override suspend fun generateId(): EventId = idPrefix + "-" + uuid4().toString()
private class WorkListBuilder<A>(
val idPrefix: String,
val events: MutableList<WorkEvent<A>>,
val rules: List<WorkRule<A>>
) : WorkBuilder<A> {
override suspend fun put(event: WorkEvent) {
override suspend fun generateId(): EventId = idPrefix + "-" + Uuid.random().toHexString()
override suspend fun put(event: WorkEvent<A>) {
if (events.any { it.id == event.id }) error("Event with id ${event.id} already exists")
events.add(event)
rules.filterIsInstance<WorkRule.OnPut<A>>().forEach { it.run(this, event) }
}
override suspend fun visit(visitor: (WorkEvent) -> WorkEvent?) {
val iterator = events.listIterator()
while (iterator.hasNext()) {
val event = iterator.next()
val newEvent = visitor(event)
if (newEvent == null) {
iterator.remove()
} else if (newEvent != event) {
iterator.set(newEvent)
override suspend fun changeAttributes(
eventId: EventId,
builder: AttributesBuilder<WorkEvent<A>>.() -> Unit
) {
//TODO add concurrent safety
val eventIndex = events.indexOfFirst { it.id == eventId }
if (eventIndex == -1) error("Event with id $eventId not found")
val event = events[eventIndex]
val newAttributes = event.attributes.modified(builder)
events[eventIndex] = event.copy(attributes = newAttributes)
rules.filterIsInstance<WorkRule.OnAttributesChange<A>>().forEach { it.run(this, event, newAttributes) }
if (event.isActive && newAttributes[WorkEventState] == WorkEventState.CANCELED) {
event.dependsOn.forEach {
cancel(it)
}
}
}
}
public class WorkList(private val events: List<WorkEvent>) : WorkWithBuilder<WorkList> {
public class WorkList<A>(private val events: List<WorkEvent<A>>) : WorkStore<A> {
override fun flow(): Flow<WorkEvent> = events.asFlow()
override fun flow(activeOnly: Boolean): Flow<WorkEvent<A>> = if (activeOnly) {
events.asFlow().filter { it.isActive }
} else {
events.asFlow()
}
override suspend fun modified(block: suspend WorkBuilder.() -> Unit): WorkList {
/**
* Create a derived work by applying changes to the original work
*/
override suspend fun derive(rules: List<WorkRule<A>>, block: suspend WorkBuilder<A>.() -> Unit): WorkList<A> {
val modifiedEvents = ArrayList(events)
val prefix = uuid4().leastSignificantBits.toString(16)
WorkListBuilder(prefix, modifiedEvents).block()
return WorkList((modifiedEvents))
val prefix = Uuid.random().toHexString()
WorkListBuilder(prefix, modifiedEvents, rules).block()
return WorkList(modifiedEvents)
}
}
public suspend fun WorkList(block: suspend WorkBuilder.() -> Unit): WorkList {
val events = mutableListOf<WorkEvent>()
val prefix = uuid4().leastSignificantBits.toString(16)
val builder = WorkListBuilder(prefix, events)
public suspend fun <T> WorkList(
rules: List<WorkRule<T>> = emptyList(),
block: suspend WorkBuilder<T>.() -> Unit
): WorkList<T> {
val events = mutableListOf<WorkEvent<T>>()
val prefix = Uuid.random().toHexString()
val builder = WorkListBuilder(prefix, events, rules)
builder.block()
return WorkList(events)
}

View File

@@ -0,0 +1,17 @@
package center.sciprog.workflow
import space.kscience.attributes.Attributes
/**
* A rule that is triggered when a new event is put into the workflow or when attributes of an event are changed.
*/
public sealed interface WorkRule<A> {
public fun interface OnPut<A> : WorkRule<A> {
public fun run(builder: WorkBuilder<A>, event: WorkEvent<A>)
}
public fun interface OnAttributesChange<A> : WorkRule<A> {
public fun run(builder: WorkBuilder<A>, event: WorkEvent<A>, newAttributes: Attributes)
}
}

View File

@@ -0,0 +1,162 @@
package center.sciprog.workflow.state
import kotlinx.coroutines.flow.*
import kotlinx.serialization.Contextual
import kotlinx.serialization.Serializable
import space.kscience.attributes.Attribute
import space.kscience.attributes.Attributes
import space.kscience.attributes.withAttribute
import space.kscience.attributes.withoutAttribute
import kotlin.time.Clock
import kotlin.time.Instant
import kotlin.uuid.ExperimentalUuidApi
import kotlin.uuid.Uuid
@Serializable
private data class InMemoryStateNode(
override val id: NodeId,
override val type: NodeType,
@Contextual override val attributes: Attributes
) : StateNode
/**
* In
*/
@OptIn(ExperimentalUuidApi::class)
public class InMemoryStateGraph(
nodes: Collection<StateNode> = emptyList(),
override val clock: Clock = Clock.System,
) : MutableStateGraph {
private val nodes: HashMap<NodeId, StateNode> = HashMap(
nodes.map { value ->
value as? InMemoryStateNode ?: InMemoryStateNode(value.id, value.type, value.attributes)
}.associateBy { it.id }
)
private val _history = MutableSharedFlow<GraphEvent>()
override val history: SharedFlow<GraphEvent> get() = _history
/**
* Flow all node Ids. The order of ids is undefined
*/
override fun nodeIds(): Flow<NodeId> = nodes.keys.asFlow()
/**
* Read a single node if it exists
*/
override suspend fun get(id: NodeId): StateNode? = nodes[id]
private fun predicate(query: StateGraphQuery): (StateNode) -> Boolean = when (query) {
is StateGraphQuery.OneOf -> { node -> query.queries.any { predicate(it)(node) } }
is StateGraphQuery.AllOf -> { node -> query.queries.all { predicate(it)(node) } }
is StateGraphQuery.AttributeContains<*> -> { node ->
query.value in (node[query.attribute] ?: emptySet())
}
is StateGraphQuery.AttributeEquals<*> -> { node ->
query.value == node.attributes[query.attribute]
}
is StateGraphQuery.ByType -> { node ->
node.type == query.type
}
}
/**
* Request a lazy flow of nodes with a given query. The order
*/
override suspend fun query(query: StateGraphQuery): Flow<StateNode> = nodes.values.asFlow().filter(predicate(query))
@OptIn(ExperimentalUuidApi::class)
private fun generateId(): NodeId = Uuid.random().toHexDashString()
override suspend fun createNode(
nodeType: NodeType,
time: Instant,
attributes: Attributes
): StateNode {
val id = generateId()
val node = InMemoryStateNode(id, nodeType, attributes)
nodes[id] = node
_history.emit(GraphEvent.NodeChanged(id, null, node, time))
return node
}
override suspend fun removeNode(
nodeId: NodeId,
time: Instant
): StateNode? {
val res = nodes.remove(nodeId) ?: return null
_history.emit(GraphEvent.NodeChanged(nodeId, res, null, time))
return res
}
/**
* Set attribute for given node.
*
* It is graph implementation responsibility to properly process the connection.
* So setting connection attributes should automatically set mirroring attribute.
*/
override suspend fun <T> setAttribute(
nodeId: NodeId,
attribute: Attribute<T>,
value: T,
time: Instant
) {
val node = nodes[nodeId] ?: error("Node with id $nodeId does not exist in the graph")
nodes[nodeId] = InMemoryStateNode(nodeId, node.type, node.attributes.withAttribute(attribute, value))
_history.emit(
GraphEvent.AttributeChanged<T>(
nodeId = nodeId,
attribute = attribute,
valueBefore = node[attribute],
valueAfter = value,
time = time
)
)
}
/**
* Remove attribute for given [nodeId].
*
* It is graph implementation responsibility to properly process connection.
* So removing the mirroring attribute should happen automatically.
*/
override suspend fun <T> removeAttribute(
nodeId: NodeId,
attribute: Attribute<T>,
time: Instant
) {
val node = nodes[nodeId] ?: error("Node with id $nodeId does not exist in the graph")
nodes[nodeId] = InMemoryStateNode(nodeId, node.type, node.attributes.withoutAttribute(attribute))
_history.emit(
GraphEvent.AttributeChanged<T>(
nodeId = nodeId,
attribute = attribute,
valueBefore = node[attribute],
valueAfter = null,
time = time
)
)
}
override suspend fun snapshot(): StateGraph = InMemoryStateGraph(nodes.values, clock)
override suspend fun project(block: suspend MutableStateGraph.() -> Unit): StateGraph =
InMemoryStateGraph(nodes.values, clock).apply {
block()
}
/**
* Perform a couple of actions in a single atomic transaction.
*/
override suspend fun transform(block: suspend MutableStateGraph.() -> Unit) {
//create a copy of the current graph
val transactionGraph = InMemoryStateGraph(nodes.values, clock)
transactionGraph.block()
nodes.putAll(transactionGraph.nodes)
}
}

View File

@@ -0,0 +1,34 @@
package center.sciprog.workflow.state
public sealed interface ProcessResult {
public data object Success : ProcessResult
public data class Failure(public val cause: Throwable) : ProcessResult
}
/**
* A change that is applied to a [MutableStateGraph]
*/
public fun interface Process {
/**
* Execute a mutation on a node. The mutation could span several nodes.
*
* The whole mutation is considered atomic.
*/
public suspend fun MutableStateGraph.execute(node: StateNode): ProcessResult
}
/**
* A process that is automatically triggered on node state change
*/
public interface AutoProcess : Process {
/**
* A query for all nodes that are watched for changes
*/
public val watchQuery: StateGraphQuery
/**
* A condition that triggers execution on a specific node
*/
public suspend fun StateGraph.triggersOn(node: StateNode): Boolean
}

View File

@@ -0,0 +1,39 @@
package center.sciprog.workflow.state
import center.sciprog.workflow.*
/**
* Represents a chain of graph states where each state is computed by a [WorkFunction] from previous state.
*
* @param state the current [StateGraph]
* @param parent the parent chain or null for the root chain
* @param function the function to compute next state
*/
public class StateChain<A>(
public val state: StateGraph,
public val parent: StateChain<A>?,
public val function: WorkFunction<StateGraph, A>,
public val work: Work<A>,
public val workRules: List<WorkRule<A>> = emptyList()
)
//TODO consider adding WorkRules that depend on a current state
public suspend fun <A> StateChain<A>.project(builder: WorkBuilder<A>.() -> Unit): StateChain<A> {
val work = WorkList(workRules, builder)
val newState = function.compute(work, state)
return StateChain(state = newState, parent = this, function = function, work = work)
}
public fun interface StateWorkFunction<A> : WorkFunction<StateGraph, A> {
public suspend fun MutableStateGraph.processEvent(event: WorkEvent<A>)
override val defaultBase: StateGraph get() = StateGraph.EMPTY
override suspend fun compute(work: Work<A>, base: StateGraph): StateGraph = base.project {
work.flow().collect {
processEvent(it)
}
}
}

View File

@@ -0,0 +1,179 @@
package center.sciprog.workflow.state
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.emptyFlow
import space.kscience.attributes.Attribute
import space.kscience.attributes.Attributes
import kotlin.time.Clock
import kotlin.time.Instant
/**
* History events
*/
public sealed interface GraphEvent {
public val time: Instant
public data class AttributeChanged<T>(
val nodeId: NodeId,
val attribute: Attribute<T>,
val valueBefore: T?,
val valueAfter: T?,
override val time: Instant
) : GraphEvent
public data class NodeChanged(
val nodeId: NodeId,
val nodeBefore: StateNode?,
val nodeAfter: StateNode?,
override val time: Instant
) : GraphEvent
}
/**
* Represents a graph structure consisting of interconnected nodes, where each node can maintain attributes, state, and relationships.
*
* A StateGraph is an immutable representation of the current state of the graph at any given point in time. It provides mechanisms
* to retrieve nodes and query the graph but does not allow direct modifications. StateGraph also maintains a historical record
* of events that occurred and provides time-based functionalities through a clock component.
*
* This interface extends [StateNodeProvider], providing additional behavior specific to managing the graph's state and history.
*/
public interface StateGraph : StateNodeProvider {
/**
* Project a new state graph by applying a block of operations to a mutable copy of the current graph.
*/
public suspend fun project(block: suspend MutableStateGraph.() -> Unit): StateGraph
public companion object {
public val EMPTY: StateGraph = object : StateGraph {
override fun nodeIds(): Flow<NodeId> = emptyFlow()
override suspend fun get(id: NodeId): StateNode? = null
override suspend fun query(query: StateGraphQuery): Flow<StateNode> = emptyFlow()
override suspend fun project(
block: suspend MutableStateGraph.() -> Unit
): StateGraph = InMemoryStateGraph().apply {
block()
}
}
}
}
public interface MutableStateGraph : StateGraph {
public val clock: Clock
public val history: SharedFlow<GraphEvent>
/**
* Create new node with given type and attributes.
*
* @param nodeType type of the node to create
* @param time time of the event, defaults to current time
* @param attributes attributes to set on the node
* @return created node
*/
public suspend fun createNode(
nodeType: NodeType,
time: Instant = clock.now(),
attributes: Attributes = Attributes.EMPTY
): StateNode
/**
* Remove node with given id.
*
* @param nodeId id of the node to remove
* @param time time of the event, defaults to current time
* @return removed node, or null if node was not found
*/
public suspend fun removeNode(
nodeId: NodeId,
time: Instant = clock.now()
): StateNode?
/**
* Set attribute for given node.
*
* It is graph implementation's responsibility to properly process the connection.
* So setting connection attributes should automatically set mirroring attribute.
*/
public suspend fun <T> setAttribute(
nodeId: NodeId,
attribute: Attribute<T>,
value: T,
time: Instant = clock.now()
)
/**
* Remove attribute for given [nodeId].
*
* It is graph implementation's responsibility to properly process the connection.
* So removing the mirroring attribute should happen automatically.
*/
public suspend fun <T> removeAttribute(
nodeId: NodeId,
attribute: Attribute<T>,
time: Instant = clock.now()
)
/**
* Create an immutable snapshot of the current state of the graph.
*/
public suspend fun snapshot(): StateGraph
/**
* Perform a couple of actions in a single atomic transaction.
*/
public suspend fun transform(block: suspend MutableStateGraph.() -> Unit)
}
/**
* Removes a specified node from the mutable state graph.
*
* @param node The node to be removed.
* @param time The timestamp indicating when the removal should take place.
* @return The removed node if it existed in the graph, or null if no such node was found.
*/
public suspend fun MutableStateGraph.removeNode(
node: StateNode,
time: Instant = clock.now()
): StateNode? = removeNode(node.id, time)
/**
* Sets an attribute for a specified node in the mutable state graph.
*
* @param T The type of the attribute value.
* @param node The node for which the attribute is being set.
* @param attribute The attribute to update.
* @param value The value to assign to the attribute.
* @param time The timestamp of the operation.
* @return Unit
*/
public suspend fun <T> MutableStateGraph.setAttribute(
node: StateNode,
attribute: Attribute<T>,
value: T,
time: Instant = clock.now()
): Unit = setAttribute(node.id, attribute, value, time)
/**
* Removes a specific attribute from the given node at the specified point in time.
*
* @param node The node from which the attribute will be removed.
* @param attribute The attribute to be removed from the specified node.
* @param time The point in time to associate with the attribute removal operation.
* @return Unit value indicating the completion of the operation.
*/
public suspend fun <T> MutableStateGraph.removeAttribute(
node: StateNode,
attribute: Attribute<T>,
time: Instant = clock.now()
): Unit = removeAttribute(node.id, attribute, time)

View File

@@ -0,0 +1,65 @@
package center.sciprog.workflow.state
import kotlinx.serialization.Contextual
import kotlinx.serialization.Serializable
import space.kscience.attributes.Attribute
import space.kscience.attributes.AttributeContainer
import space.kscience.attributes.Attributes
import space.kscience.attributes.keys
public typealias NodeId = String
public typealias NodeType = String
public typealias NodeConnectionType = String
public interface NodeConnection : AttributeContainer {
public val type: NodeConnectionType
public val inNodeId: NodeId
public val outNodeId: NodeId
}
/**
* Actualize inNode for this connection in the context of the graph
*/
context(graph: StateGraph)
public suspend fun NodeConnection.inNode(): StateNode = graph[inNodeId] ?: error("InNode with id $inNodeId is not resolved")
/**
* Actualize outNode for this connection in the context of the graph
*/
context(graph: StateGraph)
public suspend fun NodeConnection.outNode(): StateNode = graph[outNodeId] ?: error("OutNode with id $outNodeId is not resolved")
@Serializable
public data class SimpleNodeConnection(
override val type: NodeConnectionType,
override val inNodeId: NodeId,
override val outNodeId: NodeId,
@Contextual override val attributes: Attributes = Attributes.EMPTY,
) : NodeConnection
public interface ConnectionAttribute : Attribute<NodeConnection>
public interface StateNode : AttributeContainer {
public val id: NodeId
public val type: NodeType
public companion object {
public const val NODE_TYPE_KEY: String = "@type"
}
}
public suspend fun StateGraph.connections(nodeId: NodeId): Collection<NodeConnection> {
val node = get(nodeId) ?: return emptyList()
return node.attributes.keys
.filterIsInstance<ConnectionAttribute>()
.map { node.attributes[it] }
.filterIsInstance<NodeConnection>()
}
public suspend fun StateGraph.outConnections(nodeId: NodeId): Collection<NodeConnection> =
connections(nodeId).filter { connection -> connection.outNodeId == nodeId }
public suspend fun StateGraph.inConnections(nodeId: NodeId): Collection<NodeConnection> =
connections(nodeId).filter { connection -> connection.inNodeId == nodeId }

View File

@@ -0,0 +1,97 @@
package center.sciprog.workflow.state
import kotlinx.coroutines.flow.Flow
import space.kscience.attributes.Attribute
import space.kscience.attributes.SetAttribute
/**
* Query constructor for a graph
*/
public sealed interface StateGraphQuery {
/**
* Select all nodes with a given type
*/
public data class ByType(val type: String) : StateGraphQuery
/**
* Select nodes that adhere to all [queries]
*/
public data class AllOf(val queries: List<StateGraphQuery>) : StateGraphQuery
/**
* Select nodes that adhere to one of [queries]
*/
public data class OneOf(val queries: List<StateGraphQuery>) : StateGraphQuery
/**
* Filter nodes by predicate on attribute value
*/
public data class AttributeEquals<T>(public val attribute: Attribute<T>, public val value: T) : StateGraphQuery
/**
* A query that selects nodes where a specific set attribute contains a given value.
*
* This query is used to filter state graph nodes based on whether an element exists
* in the set associated with a specified attribute.
*
* @param attribute The set attribute to evaluate.
* @param value The value to check for in the attribute's set.
* @param T The type of elements within the set attribute.
*/
public data class AttributeContains<T>(public val attribute: SetAttribute<T>, public val value: T) : StateGraphQuery
// /**
// * Represents a query that filters nodes in a state graph based on whether a specified attribute
// * falls within a defined range of values.
// *
// * This query is particularly useful for selecting graph nodes where an attribute of a comparable type
// * meets criteria defined by a closed range.
// *
// * @param T The type of the attribute, which must implement the [Comparable] interface.
// * @property attribute The attribute used for comparison within the query.
// * @property range The closed range used to filter nodes whose attribute values fall within this range.
// */
// public data class AttributeInRange<T: Comparable<T>>(public val attribute: Attribute<T>, public val range: ClosedRange<T>) : StateGraphQuery
//
// public data class ByQuery(val query: String) : StateGraphQuery
}
public object StateGraphQueryScope
context(_: StateGraphQueryScope)
public fun type(type: String): StateGraphQuery.ByType = StateGraphQuery.ByType(type)
context(_: StateGraphQueryScope)
public fun <T> value(attribute: Attribute<T>, value: T): StateGraphQuery.AttributeEquals<T> =
StateGraphQuery.AttributeEquals(attribute, value)
context(_: StateGraphQueryScope)
public fun <T> contains(attribute: SetAttribute<T>, value: T): StateGraphQuery.AttributeContains<T> =
StateGraphQuery.AttributeContains(attribute, value)
context(_: StateGraphQueryScope)
public fun allOf(vararg queries: StateGraphQuery): StateGraphQuery.AllOf = StateGraphQuery.AllOf(queries.toList())
context(_: StateGraphQueryScope)
public fun oneOf(vararg queries: StateGraphQuery): StateGraphQuery.OneOf = StateGraphQuery.OneOf(queries.toList())
public interface StateNodeProvider {
/**
* Flow all node Ids. The order of ids is undefined
*/
public fun nodeIds(): Flow<NodeId>
/**
* Read a single node if it exists
*/
public suspend operator fun get(id: NodeId): StateNode?
/**
* Request a lazy flow of nodes with a given query. The order
*/
public suspend fun query(query: StateGraphQuery): Flow<StateNode>
}
public suspend inline fun StateNodeProvider.query(queryBlock: context(StateGraphQueryScope) () -> StateGraphQuery): Flow<StateNode> =
query(context(StateGraphQueryScope) { queryBlock() })

View File

@@ -0,0 +1,18 @@
package center.sciprog.workflow.state
import space.kscience.attributes.Attribute
import space.kscience.attributes.Attributes
import kotlin.reflect.KProperty
public interface StateScheme {
public val node: StateNode
public operator fun <T> Attribute<T>.getValue(thisRef: Any?, property: KProperty<*>): T? = node[this]
}
context(graph: MutableStateGraph) public inline fun StateScheme.modify(
block: context(MutableStateGraph) Attributes.() -> Unit
) {
block(graph, node.attributes)
}

View File

@@ -0,0 +1,35 @@
package center.sciprog.workflow.state
import space.kscience.attributes.Attribute
import space.kscience.attributes.Attributes
public inline fun <T : AutoCloseable> T.receive(block: T.() -> Unit) {
use {
with(it, block)
}
}
public operator fun <T> StateNode.get(attribute: Attribute<T>): T? = attributes[attribute]
context(graph: MutableStateGraph)
public suspend operator fun <T : Any> StateNode.set(attribute: Attribute<T>, value: T?) {
if (value == null) {
graph.removeAttribute(this, attribute)
} else {
graph.setAttribute(this, attribute, value)
}
}
context(graph: MutableStateGraph)
public suspend fun StateNode.connectTo(
other: StateNode,
attribute: ConnectionAttribute,
connectionType: NodeConnectionType,
attributes: Attributes = Attributes.EMPTY
): NodeConnection {
val connection = SimpleNodeConnection(connectionType, id, other.id, attributes)
set(attribute, connection)
return connection
}

View File

@@ -0,0 +1,37 @@
import space.kscience.gradle.Maturity
plugins {
id("space.kscience.gradle.mpp")
`maven-publish`
}
description = "Project management workflows"
kscience {
jvm()
js()
native()
useCoroutines()
useSerialization()
useContextParameters()
commonMain {
api(projects.workflowCore)
api(libs.attributes.serialization)
}
jvmTest {
implementation(spclibs.logback.classic)
}
}
kotlin {
compilerOptions {
freeCompilerArgs.add("-Xcontext-parameters")
}
}
readme {
maturity = Maturity.EXPERIMENTAL
}

View File

@@ -0,0 +1,41 @@
package center.sciprog.workflow.budget
import center.sciprog.workflow.WorkBuilder
import center.sciprog.workflow.put
import center.sciprog.workflow.state.StateNode
import center.sciprog.workflow.state.StateScheme
import kotlinx.serialization.Serializable
import space.kscience.attributes.Attributes
import space.kscience.attributes.AttributesBuilder
import kotlin.time.Clock
import kotlin.time.Instant
/**
* Represents a financial obligation between two subjects.
*
* Positive obligation towards a subject means that it should receiver payment. Negative obligation towards a subject means that it should pay somebody.
*/
@Serializable
public data class Obligation(
public val toSubject: SubjectId,
public val fromSubject: SubjectId?,
public val amount: Money,
override val attributes: Attributes,
) : PmFinanceAction
public suspend fun WorkBuilder<PmAction>.obligation(
fromSubject: SubjectId,
toSubject: SubjectId,
amount: Money,
time: Instant = Clock.System.now(),
attributesBuilder: AttributesBuilder<Payment>.() -> Unit = {},
) {
put(Obligation(toSubject, fromSubject, amount, Attributes(attributesBuilder)), time)
}
public class ObligationState(
override val node: StateNode
) : StateScheme {
public val amount: Money? by AmountOfMoney
}

View File

@@ -0,0 +1,66 @@
package center.sciprog.workflow.budget
import center.sciprog.workflow.Work
import center.sciprog.workflow.WorkFunction
import kotlinx.coroutines.flow.fold
import kotlin.time.Instant
/**
* Compute obligation fulfillment for a given subject within a time frame, applying a filter function to finance actions.
*
* Positive number means that the subject is paid more than it owed. Negative number means that
* obligations towards the subject are not fully covered.
*/
public class ObligationFulfillment(
public val subject: Subject,
public val timeFrame: ClosedRange<Instant>,
public val valueExtractor: (Money) -> Double,
private val filter: (PmFinanceAction) -> Boolean = { true },
override val defaultBase: Double = 0.0,
) : WorkFunction<Double, PmAction> {
override suspend fun compute(
work: Work<PmAction>,
base: Double
): Double = work.flow().fold(base) { acc, value ->
val action = value.action
if (value.time in timeFrame && action is PmFinanceAction && filter(action)) {
when(action) {
is Payment -> {
if (action.toSubject == subject) {
acc + valueExtractor(action.amount)
} else if (action.fromSubject == subject) {
acc - valueExtractor(action.amount)
} else {
acc
}
}
is Obligation -> {
if (action.toSubject == subject) {
acc - valueExtractor(action.amount)
} else if (action.fromSubject == subject) {
acc + valueExtractor(action.amount)
} else {
acc
}
}
}
} else {
acc
}
}
public companion object {
public fun inRubles(
subject: Subject,
timeFrame: ClosedRange<Instant> = Instant.DISTANT_PAST..Instant.DISTANT_FUTURE,
filter: (PmFinanceAction) -> Boolean = { true }
): ObligationFulfillment = ObligationFulfillment(
subject = subject,
timeFrame = timeFrame,
valueExtractor = { if (it.currency == Rubles) it.amount.toDouble() else TODO("No currency converter") },
filter = filter
)
}
}

View File

@@ -0,0 +1,42 @@
package center.sciprog.workflow.budget
import center.sciprog.workflow.WorkBuilder
import center.sciprog.workflow.put
import center.sciprog.workflow.state.StateNode
import center.sciprog.workflow.state.StateScheme
import kotlinx.serialization.Serializable
import space.kscience.attributes.Attributes
import space.kscience.attributes.AttributesBuilder
import kotlin.time.Clock
import kotlin.time.Instant
/**
* The transaction that has been executed. Some additional information could be encoded in [attributes].
*
* Positive payment towards a subject means that it received a payment. Negative payment towards a subject means that it paid to somebody.
*/
@Serializable
public data class Payment(
public val toSubject: SubjectId,
public val fromSubject: SubjectId?,
public val amount: Money,
override val attributes: Attributes,
) : PmFinanceAction
public suspend fun WorkBuilder<PmAction>.payment(
fromSubject: SubjectId,
toSubject: SubjectId,
amount: Money,
time: Instant = Clock.System.now(),
attributesBuilder: AttributesBuilder<Payment>.() -> Unit = {},
) {
put(Payment(toSubject, fromSubject, amount, Attributes(attributesBuilder)), time)
}
public class PaymentState(
override val node: StateNode
) : StateScheme {
public val amount: Money? by AmountOfMoney
}

View File

@@ -0,0 +1,49 @@
package center.sciprog.workflow.budget
import center.sciprog.workflow.Work
import center.sciprog.workflow.WorkFunction
import kotlinx.coroutines.flow.fold
import kotlin.time.Instant
/**
* Aggregated payments for one subject in given [timeFrame]
*/
public class PaymentBalance(
public val subject: Subject,
public val timeFrame: ClosedRange<Instant>,
public val valueExtractor: (Money) -> Double,
private val filter: (Payment) -> Boolean = { true },
override val defaultBase: Double = 0.0,
) : WorkFunction<Double, PmAction> {
override suspend fun compute(
work: Work<PmAction>,
base: Double
): Double = work.flow().fold(base) { acc, value ->
val action = value.action
if (value.time in timeFrame && action is Payment && filter(action)) {
if (action.toSubject == subject) {
acc + valueExtractor(action.amount)
} else if (action.fromSubject == subject) {
acc - valueExtractor(action.amount)
} else {
acc
}
} else {
acc
}
}
public companion object {
public fun inRubles(
subject: Subject,
timeFrame: ClosedRange<Instant> = Instant.DISTANT_PAST..Instant.DISTANT_FUTURE,
filter: (Payment) -> Boolean = { true }
): PaymentBalance = PaymentBalance(
subject = subject,
timeFrame = timeFrame,
valueExtractor = { if (it.currency == Rubles) it.amount.toDouble() else TODO("No currency converter") },
filter = filter
)
}
}

View File

@@ -0,0 +1,9 @@
package center.sciprog.workflow.budget
import space.kscience.attributes.AttributeContainer
public sealed interface PmAction: AttributeContainer
public sealed interface PmFinanceAction: PmAction
public sealed interface PmProjectAction: PmAction

View File

@@ -0,0 +1,34 @@
package center.sciprog.workflow.budget
import center.sciprog.workflow.WorkEvent
import center.sciprog.workflow.state.*
import kotlinx.coroutines.flow.single
public object PmFunction : StateWorkFunction<PmAction> {
private suspend fun MutableStateGraph.getSubjectNode(id: SubjectId): StateNode = query {
allOf(
type(Subject.TYPE),
value(SubjectIdAttribute, id)
)
}.single()
override val defaultBase: StateGraph get() = StateGraph.EMPTY
override suspend fun MutableStateGraph.processEvent(event: WorkEvent<PmAction>) {
val action = event.action
when (action) {
is Obligation -> {
val toNode = getSubjectNode(action.toSubject)
setAttribute(toNode, ObligationAttribute, action)
}
is Payment -> {
}
}
}
}

View File

@@ -1,6 +1,7 @@
package center.sciprog.workflow
package center.sciprog.workflow.budget
import kotlinx.serialization.Serializable
import space.kscience.attributes.serialization.SerializableAttribute
public interface Resource
@@ -14,4 +15,7 @@ public data class Money(
val currency: Currency,
) : Resource
public object AmountOfMoney : SerializableAttribute<Money>("amount", Money.serializer())
public val Number.rubles: Money get() = Money(this, Rubles)

View File

@@ -0,0 +1,48 @@
package center.sciprog.workflow.budget
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.map
import kotlinx.serialization.Serializable
import kotlinx.serialization.serializer
import space.kscience.attributes.AttributeContainer
import space.kscience.attributes.Attributes
import space.kscience.attributes.serialization.SerializableAttribute
public typealias SubjectId = String
/**
* Represents a subject in the workflow system, which can be either a person or an organization.
*/
public data class Subject(
val id: SubjectId,
public val name: String,
override val attributes: Attributes = Attributes.EMPTY,
) : AttributeContainer{
public companion object{
public const val TYPE: String = "subject"
}
}
public object SubjectName : SerializableAttribute<String>("subject.name", serializer())
public object SubjectIdAttribute : SerializableAttribute<String>("subject.id", serializer())
@Serializable
public enum class SubjectType {
PERSON, ORGANIZATION;
public companion object: SerializableAttribute<SubjectType>("subject.type", serializer())
}
public interface SubjectProvider {
public suspend fun ids(): Flow<SubjectId>
public suspend fun provide(subjectId: SubjectId): Subject?
}
public class SubjectList(private val subjects: List<Subject>) : SubjectProvider {
override suspend fun ids(): Flow<SubjectId> = subjects.asFlow().map { it.id }
override suspend fun provide(subjectId: SubjectId): Subject? = subjects.find { it.id == subjectId }
}

View File

@@ -0,0 +1,13 @@
package center.sciprog.workflow.budget
import space.kscience.attributes.serialization.SerializableAttribute
/**
* The project id this work attributed to
*/
public object ProjectId : SerializableAttribute<String>("project", kotlinx.serialization.serializer())
/**
* The funding source for this work
*/
public object SourceId: SerializableAttribute<String>("source", kotlinx.serialization.serializer())

View File

@@ -1,4 +1,4 @@
package center.sciprog.workflow
package center.sciprog.workflow.budget
import kotlinx.serialization.KSerializer
import kotlinx.serialization.builtins.serializer

View File

@@ -1,25 +1,26 @@
package center.sciprog.workflow
package center.sciprog.workflow.budget
import center.sciprog.workflow.WorkList
import kotlinx.coroutines.test.runTest
import kotlinx.datetime.*
import kotlin.time.Clock
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.time.Duration.Companion.days
internal class WorkFunctionTest {
@Test
fun testPayments() = runTest{
val person = Person("test","Test Subject")
fun testPayments() = runTest {
val person = Person("test", "Test Subject")
val org = Organization("Master", "Master organization")
val work = WorkList {
val now = Clock.System.now()
repeat(10) {
transaction(org, person, 1000.rubles, time = now.minus((it*30).days))
payment(org, person, 1000.rubles, time = now.minus((it * 30).days))
}
}
assertEquals(10000.0, Payments.inRubles(person).compute(work), 1.0)
assertEquals(10000.0, PaymentBalance.inRubles(person).compute(work), 1.0)
}
}

View File

@@ -0,0 +1,32 @@
import space.kscience.gradle.Maturity
plugins {
id("space.kscience.gradle.mpp")
`maven-publish`
}
kscience {
jvm()
useCoroutines()
useSerialization()
jvmMain {
api(projects.workflowCore)
api(libs.gremlin.core)
}
jvmTest {
implementation(spclibs.logback.classic)
implementation(libs.tinkergraph.gremlin)
}
}
kotlin {
compilerOptions {
freeCompilerArgs.add("-Xcontext-parameters")
}
}
readme {
maturity = Maturity.EXPERIMENTAL
}

View File

@@ -0,0 +1,241 @@
package center.sciprog.workflow.state
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal
import org.apache.tinkerpop.gremlin.structure.Direction
import org.apache.tinkerpop.gremlin.structure.Edge
import org.apache.tinkerpop.gremlin.structure.Element
import org.apache.tinkerpop.gremlin.structure.Vertex
import space.kscience.attributes.Attribute
import space.kscience.attributes.Attributes
import space.kscience.attributes.UnsafeAPI
import space.kscience.attributes.keys
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__ as TinkerPop
/**
* Interface for accessing attributes on TinkerPop elements (vertices, edges, etc.)
*/
public interface TinkerAttributeAccessor<T> {
/**
* Key of corresponding edge or VertexAttribute
*/
public val key: String
public val attribute: Attribute<T>
public fun Element.readAttribute(): T?
public fun Element.writeAttribute(value: T?)
public fun Element.writeAttributeFrom(attributes: Attributes): Unit = writeAttribute(attributes[attribute])
public fun <E : Element> GraphTraversal<*, E>.readAttribute(): GraphTraversal<*, T?> =
map { it.get().readAttribute() }
public fun <E : Element> GraphTraversal<*, E>.writeAttribute(
attribute: Attribute<T>,
value: T?
): GraphTraversal<*, E>
public fun <E : Element> GraphTraversal<*, E>.writeAttributeFrom(attributes: Attributes): GraphTraversal<*, E> =
writeAttribute(attribute, attributes[attribute])
}
public fun Element.writeAttributes(
registry: TinkerRegistry,
attributes: Attributes
) {
attributes.keys.forEach { attribute ->
val writer = registry.accessorByAttribute(attribute)
with(writer) {
writeAttributeFrom(attributes)
}
}
}
public fun <E : Element> GraphTraversal<*, E>.writeAttributes(
registry: TinkerRegistry,
attributes: Attributes
): GraphTraversal<*, E> {
var traversal = this
attributes.keys.forEach { attribute ->
val writer = registry.accessorByAttribute(attribute)
with(writer) {
traversal = traversal.writeAttributeFrom(attributes)
}
}
return traversal
}
/**
* A [TinkerAttributeAccessor] implementation that assumes
*/
public class SimpleTinkerAttributeAccessor<T>(
override val key: String,
override val attribute: Attribute<T>
) : TinkerAttributeAccessor<T> {
override fun Element.readAttribute(): T? = property<T>(key).orElse(null)
override fun Element.writeAttribute(value: T?) {
property(key, value)
}
override fun <E : Element> GraphTraversal<*, E>.readAttribute(): GraphTraversal<*, T?> = valueOrNull(key)
override fun <E : Element> GraphTraversal<*, E>.writeAttribute(
attribute: Attribute<T>,
value: T?
): GraphTraversal<*, E> = property(key, value)
}
public interface TinkerConnectionAttributeAccessor<T : NodeConnection> : TinkerAttributeAccessor<T> {
public fun GraphTraversal<*, Edge>.readEdge(): GraphTraversal<*, T?>
/**
* Update edge after creation
*/
public fun GraphTraversal<*, Edge>.writeEdgeAttributes(connection: T): GraphTraversal<*, Edge>
override fun <E : Element> GraphTraversal<*, E>.readAttribute(): GraphTraversal<*, T?> =
bothE(key).readEdge().orNull()
override fun <E : Element> GraphTraversal<*, E>.writeAttribute(
attribute: Attribute<T>,
value: T?
): GraphTraversal<*, E> = if (value == null) {
sideEffect(
bothE(key).drop()
)
} else {
sideEffect(
TinkerPop.addE<Vertex>(key).from(V(value.inNodeId)).to(V(value.outNodeId)).writeEdgeAttributes(value)
)
}
override fun <E : Element> GraphTraversal<*, E>.writeAttributeFrom(
attributes: Attributes
): GraphTraversal<*, E> = writeAttribute(attribute, attributes[attribute])
}
public class SimpleTinkerConnectionAttributeAccessor(
override val key: String,
override val attribute: Attribute<NodeConnection>,
private val registry: TinkerRegistry,
) : TinkerConnectionAttributeAccessor<NodeConnection> {
override fun Element.readAttribute(): SimpleNodeConnection? {
val vertex = this as? Vertex ?: error("Expected vertex but got $this")
val edge = vertex.edges(Direction.BOTH, key).asSequence().singleOrNull() ?: return null
return SimpleNodeConnection(
edge.label(),
edge.inVertex().id().toString(),
edge.outVertex().id().toString(),
TinkerAttributes(edge, registry)
)
}
override fun Element.writeAttribute(value: NodeConnection?) {
val vertex = this as? Vertex ?: error("Expected vertex but got $this")
if (value == null) {
vertex.edges(Direction.BOTH, key).forEach { it.remove() }
} else {
if (value.inNodeId == vertex.id().toString()) {
val inNode = vertex
val outNode = vertex.graph().vertices(value.outNodeId).asSequence().single()
outNode.addEdge(key, inNode).apply {
writeAttributes(registry, value.attributes)
}
} else if (value.outNodeId == vertex.id().toString()) {
val inNode = vertex.graph().vertices(value.inNodeId).asSequence().single()
val outNode = vertex
outNode.addEdge(key, inNode).apply {
writeAttributes(registry, value.attributes)
}
} else {
error("Connection $value is not attached to vertex $vertex")
}
}
}
override fun GraphTraversal<*, Edge>.readEdge(): GraphTraversal<*, NodeConnection?> = map {
val edge = it.get()
SimpleNodeConnection(
edge.label(),
edge.inVertex().id().toString(),
edge.outVertex().id().toString(),
TinkerAttributes(edge, registry)
)
}
override fun GraphTraversal<*, Edge>.writeEdgeAttributes(
connection: NodeConnection
): GraphTraversal<*, Edge> = writeAttributes(registry, connection.attributes)
}
@OptIn(UnsafeAPI::class)
public class TinkerAttributes(
public val element: Element,
private val registry: TinkerRegistry
) : Attributes {
private fun propertyContent() = element.keys().mapNotNull {
registry.accessorByLabel(it)
}.associate { accessor ->
with(accessor) {
accessor.attribute to element.readAttribute()
}
}
private fun edges(): Map<Attribute<*>, Any?> {
val vertex = element as? Vertex ?: return emptyMap()
return element.edges(Direction.BOTH).asSequence().associate { edge ->
val accessor = registry.accessorByLabel(edge.label()) as TinkerConnectionAttributeAccessor
with(accessor){
accessor.attribute to vertex.readAttribute()
}
}
}
override val content: Map<out Attribute<*>, Any?>
get() = edges() + propertyContent()
override fun toString(): String = "TinkerAttributes(element=$element)"
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (javaClass != other?.javaClass) return false
other as TinkerAttributes
if (element != other.element) return false
if (registry != other.registry) return false
return true
}
override fun hashCode(): Int {
var result = element.hashCode()
result = 31 * result + registry.hashCode()
return result
}
public companion object
}
public fun <E, T> GraphTraversal<E, T>.orNull(): GraphTraversal<E, T?> = choose(
TinkerPop.unfold<T>(),
TinkerPop.identity<T>(),
TinkerPop.constant(null)
)
public fun <E, T> GraphTraversal<E, *>.valueOrNull(propertyKey: String): GraphTraversal<E, T?> = choose<T>(
TinkerPop.has<Vertex>(propertyKey),
TinkerPop.values<Vertex, T>(propertyKey),
TinkerPop.constant(null)
)

View File

@@ -0,0 +1,49 @@
package center.sciprog.workflow.state
import space.kscience.attributes.Attribute
/**
* Interface representing a registry for managing access to attributes within a TinkerPop graph.
* The registry provides mechanisms to retrieve attribute accessors either by label or directly by attribute.
*/
public interface TinkerRegistry {
public fun accessorByLabel(label: String): TinkerAttributeAccessor<*>
public fun <T> accessorByAttribute(attribute: Attribute<T>): TinkerAttributeAccessor<T>
}
private class SimpleTinkerRegistry(val accessors: Set<TinkerAttributeAccessor<*>>) : TinkerRegistry {
override fun accessorByLabel(label: String): TinkerAttributeAccessor<*> = accessors.single { it.key == label }
@Suppress("UNCHECKED_CAST")
override fun <T> accessorByAttribute(attribute: Attribute<T>): TinkerAttributeAccessor<T> = accessors.single {
it.attribute == attribute
} as TinkerAttributeAccessor<T>
}
/**
* Constructs a TinkerRegistry with the provided set of attribute accessors.
*
* @param accessors A set of TinkerAttributeAccessor objects that define the mapping
* between graph elements and their corresponding attributes.
* @return A TinkerRegistry instance that maps attributes to their corresponding keys or labels.
*/
public fun TinkerRegistry(
accessors: Set<TinkerAttributeAccessor<*>>
): TinkerRegistry = SimpleTinkerRegistry(accessors)
public fun <T> MutableSet<TinkerAttributeAccessor<*>>.attribute(
key: String,
attribute: Attribute<T>
) {
if(attribute is ConnectionAttribute){
add(SimpleTinkerConnectionAttributeAccessor(key, attribute, TinkerRegistry(this)))
} else {
add(SimpleTinkerAttributeAccessor(key, attribute))
}
}
public fun TinkerRegistry(
registryBuilder: MutableSet<TinkerAttributeAccessor<*>>.() -> Unit
): TinkerRegistry = TinkerRegistry(buildSet(registryBuilder))

View File

@@ -0,0 +1,234 @@
package center.sciprog.workflow.state
import kotlinx.coroutines.flow.*
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource
import org.apache.tinkerpop.gremlin.structure.Vertex
import space.kscience.attributes.Attribute
import space.kscience.attributes.Attributes
import kotlin.jvm.optionals.getOrNull
import kotlin.time.Clock
import kotlin.time.Instant
/**
* Represents a node in a TinkerPop graph, encapsulating a TinkerPop vertex and providing access to its attributes.
*/
public class TinkerNode(
private val vertex: Vertex,
private val registry: TinkerRegistry
) : StateNode {
override val id: NodeId get() = vertex.id().toString()
override val type: NodeType get() = vertex.label()
override val attributes: Attributes get() = TinkerAttributes(vertex, registry)
}
/**
* Represents a TinkerPop-based state graph, providing access to nodes and events within the graph.
* This class integrates with TinkerPop's graph traversal source to manage and query the graph.
*/
public class TinkerStateGraph(
private val ts: GraphTraversalSource,
private val rootNodeId: NodeId,
public val registry: TinkerRegistry,
override val clock: Clock = Clock.System,
) : MutableStateGraph, AutoCloseable by ts.graph {
public companion object {
private const val HOLD_EDGE_LABEL = "holds"
}
private val sharedEventFlow = MutableSharedFlow<GraphEvent>()
private suspend fun emitEvent(event: GraphEvent) {
sharedEventFlow.emit(event)
}
private fun findVertex(elementId: NodeId): Vertex? = ts.V(elementId).tryNext().getOrNull()
override fun nodeIds(): Flow<NodeId> =
ts.V(rootNodeId).outE(HOLD_EDGE_LABEL).inV().id().asFlow().map { it.toString() }
override suspend fun get(id: NodeId): StateNode? = findVertex(id)?.let { TinkerNode(it, registry) }
// @OptIn(ExperimentalUuidApi::class)
// private fun generateEventId() = Uuid.random().toHexString()
override suspend fun createNode(
nodeType: NodeType,
time: Instant,
attributes: Attributes
): StateNode {
val vertex = ts.V(rootNodeId).`as`("root")
.addV(nodeType).writeAttributes(registry, attributes).`as`("newVertex")
.addE(HOLD_EDGE_LABEL).from("root").to("newVertex")
.select<Vertex>("root").next()
val newNode = TinkerNode(vertex, registry)
emitEvent(
GraphEvent.NodeChanged(
nodeId = newNode.id,
nodeBefore = null,
nodeAfter = newNode,
time = time
)
)
return newNode
}
override suspend fun removeNode(nodeId: NodeId, time: Instant): StateNode? {
val vertex = findVertex(nodeId) ?: return null
vertex.remove()
emitEvent(
GraphEvent.NodeChanged(
nodeId = nodeId,
nodeBefore = TinkerNode(vertex, registry),
nodeAfter = null,
time = time
)
)
return TinkerNode(vertex, registry)
}
override suspend fun <T> setAttribute(
nodeId: NodeId,
attribute: Attribute<T>,
value: T,
time: Instant
): Unit = with(registry.accessorByAttribute(attribute)) {
val node = ts.V(nodeId).tryNext().orElseThrow()
val oldValue = node.readAttribute()
// val oldValue = ts.V(nodeId)
// .`as`("node")
// .readAttribute()
// .choose<T>(
// P.neq(null),
// sideEffect(
// TinkerPop.select<Vertex>("node").writeAttribute(attribute, value)
// )
// ).next()
if (oldValue != value) {
node.writeAttribute(value)
emitEvent(
GraphEvent.AttributeChanged<T>(
nodeId = nodeId,
attribute = attribute,
valueBefore = oldValue,
valueAfter = value,
time = time
)
)
}
}
override suspend fun <T> removeAttribute(
nodeId: NodeId,
attribute: Attribute<T>,
time: Instant
): Unit = with(registry.accessorByAttribute(attribute)) {
val node = ts.V(nodeId).tryNext().orElseThrow()
val oldValue = node.readAttribute()
// val oldValue = ts.V(nodeId)
// .readAttribute()
// .choose<T>(
// P.neq(null),
// sideEffect(
// TinkerPop.V<Vertex>(nodeId).writeAttribute(attribute, null)
// )
// ).next()
if (oldValue != null) {
node.writeAttribute(null)
emitEvent(
GraphEvent.AttributeChanged(
nodeId = nodeId,
attribute = attribute,
valueBefore = oldValue,
valueAfter = null,
time = time
)
)
}
}
override suspend fun snapshot(): InMemoryStateGraph {
val nodes = nodeIds().map { get(it)!! }.toSet()
return InMemoryStateGraph(nodes, clock)
}
override suspend fun project(block: suspend MutableStateGraph.() -> Unit): StateGraph = snapshot().apply { block() }
override suspend fun transform(block: suspend MutableStateGraph.() -> Unit) {
val graph = ts.graph
if (!graph.features().graph().supportsTransactions()) {
error("Transactions not supported by graph")
}
val transaction = graph.tx()
if (transaction.isOpen) error("Transaction already open")
try {
val txs: GraphTraversalSource = transaction.begin()
TinkerStateGraph(txs, rootNodeId, registry, clock).block()
transaction.commit()
} catch (e: Exception) {
// TODO add logging
transaction.rollback()
}
}
override val history: SharedFlow<GraphEvent> get() = sharedEventFlow
override suspend fun query(query: StateGraphQuery): Flow<StateNode> {
fun GraphTraversal<*, Vertex>.query(
query: StateGraphQuery
): GraphTraversal<*, Vertex> = when (query) {
is StateGraphQuery.OneOf -> or(
*query.queries.map { query(it) }.toTypedArray()
)
is StateGraphQuery.AllOf -> and(
*query.queries.map { query(it) }.toTypedArray()
)
is StateGraphQuery.ByType -> hasLabel(query.type)
is StateGraphQuery.AttributeContains<*> -> TODO()
is StateGraphQuery.AttributeEquals<*> -> {
val accessor = registry.accessorByAttribute(query.attribute)
with(registry.accessorByAttribute(query.attribute)) {
has(accessor.key).where(readAttribute().`is`(query.value))
}
}
//
// is StateGraphQuery.AttributeInRange<*> -> {
// val accessor = registry.attributePipeByAttribute(query.attribute)
// with(registry.attributePipeByAttribute(query.attribute)) {
// has(accessor.key).where(
// readAttribute().filter(
// P.between(
// query.range.start,
// query.range.endInclusive
// )
// )
// )
// }
// }
}
return ts.V().query(query).asFlow().map {
TinkerNode(it, registry)
}
}
}

View File

@@ -0,0 +1,87 @@
package center.sciprog.workflow.state
import kotlinx.coroutines.test.runTest
import org.apache.commons.configuration2.BaseConfiguration
import org.apache.commons.configuration2.Configuration
import org.apache.tinkerpop.gremlin.tinkergraph.structure.AbstractTinkerGraph
import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph
import space.kscience.attributes.Attribute
import kotlin.test.Test
import kotlin.test.assertEquals
internal class AttributesTest {
object AttributeA : Attribute<Double>
object AttributeB : Attribute<String>
object CA : ConnectionAttribute
private val registry = TinkerRegistry {
attribute("a", AttributeA)
attribute("b", AttributeB)
attribute("connection", CA)
}
val conf: Configuration = BaseConfiguration().apply {
setProperty(
TinkerGraph.GREMLIN_TINKERGRAPH_VERTEX_ID_MANAGER,
AbstractTinkerGraph.DefaultIdManager.LONG.name
)
setProperty(
TinkerGraph.GREMLIN_TINKERGRAPH_EDGE_ID_MANAGER,
AbstractTinkerGraph.DefaultIdManager.LONG.name
)
setProperty(
TinkerGraph.GREMLIN_TINKERGRAPH_VERTEX_PROPERTY_ID_MANAGER,
AbstractTinkerGraph.DefaultIdManager.LONG.name
)
}
@Test
fun tinkerAttributesWriteRead() = runTest {
TinkerGraph.open(conf).use { graph ->
val rootId = graph.traversal().addV().id().next().toString()
TinkerStateGraph(graph.traversal(), rootId, registry).receive {
val node1 = createNode("test1")
val node2 = createNode("test1")
node1[AttributeA] = 1.0
node1[AttributeB] = "test"
assertEquals(1.0, node1[AttributeA])
node1[AttributeA] = null
assertEquals(null, node1[AttributeA])
}
}
}
@Test
fun connectionAttributesWriteRead() = runTest {
TinkerGraph.open(conf).use { graph ->
val rootId = graph.traversal().addV().id().next().toString()
TinkerStateGraph(graph.traversal(), rootId, registry).receive {
val node1 = createNode("test1")
val node2 = createNode("test1")
node1.connectTo(node2, CA, "testConnection")
assertEquals(node2.id, node1[CA]?.outNodeId)
assertEquals(node1.id, node2[CA]?.inNodeId)
}
}
}
}