Compare commits
10 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 32ccc6be4f | |||
| 226d520ffd | |||
| d8870bfd8a | |||
| 99f91fa6f4 | |||
| 20dfb48a10 | |||
| 8c3b5a2d2b | |||
| c85107cbf6 | |||
| b34bc80b5f | |||
| 63aed3da14 | |||
| 12d2a6a460 |
70
README.md
Normal file
70
README.md
Normal file
@@ -0,0 +1,70 @@
|
||||
# WorkFlow: industrial process simulation and monitoring tool
|
||||
|
||||
## Aim
|
||||
|
||||
The aim of this work is to create a toolset to monitor and simulate industrial processes like production and processing plants,
|
||||
resource management for project-based teams, budgeting in complex cases, etc.
|
||||
|
||||
The model could be used for:
|
||||
* project monitoring;
|
||||
* project optimisation;
|
||||
* project automation;
|
||||
|
||||
## Not-aim
|
||||
|
||||
Creating GUI is currently out of scope.
|
||||
|
||||
Technical process simulation is done with Controls-kt.
|
||||
|
||||
Project strategy management is currently out of scope.
|
||||
|
||||
## Concepts
|
||||
|
||||
The project is based on several main constructs:
|
||||
|
||||
### Resource/object graph
|
||||
|
||||
The current state of the system is described as a graph structure where each node is a separate entity. Each node has a type
|
||||
and unique ID. Some node types could also have **parameters**, that could not be changed after node creation.
|
||||
|
||||
Node can have any number of **attributes** (they could be added, removed and replaced). Attributes define the current node state.
|
||||
|
||||
Node can have external descriptor that stores meta-information about the node. Like access rights, visualization preferences, etc.
|
||||
|
||||
### Attributes
|
||||
|
||||
Attributes are strongly typed containers for values. An attribute key stores information about attribute name and attribute type.
|
||||
Also, it could store information about default values and serialization rules.
|
||||
|
||||
Some attributes are **relations** that connect two nodes with specific directed connection. Currently, we use only one-to-one relations.
|
||||
|
||||
**To be discussed:** attributes could have their own attributes either defined in themselves, or in schema.
|
||||
|
||||
### Work
|
||||
|
||||
Work is a list of events that contains rules to modify object graph. Basically it has the following events:
|
||||
* add node;
|
||||
* remove node;
|
||||
* modify attribute (add/remove/replace);
|
||||
|
||||
Work also could have events that do not modify resource graph but still should be monitored. Like change of external balance.
|
||||
|
||||
Work events are consumed by a work graph with respect to attached **processes**. It is possible that the process rejects node change.
|
||||
It is also possible that the process adds new node changes based on the graph state.
|
||||
|
||||
**To be discussed:** In case of event rejection, part of work could still be completed based on rules and event relations.
|
||||
The specific rules should be further discussed.
|
||||
|
||||
### Process
|
||||
|
||||
A "smart-contract" (no blockchain yet) that defines automatic changes to object graph based on **work** being done.
|
||||
|
||||
### History
|
||||
|
||||
The log of all changes in object graph. It could be used to construct "time machine" and reverse the state of the graph to any state in the past.
|
||||
The history events mirror **work** events.
|
||||
|
||||
## Further discussion
|
||||
|
||||
Time management is out of the scope for now, but it should be considered in the future. For example, there should be possibility to
|
||||
add delay to the process and allow it to propagate in a virtual time. Timeline from *simulations-kt* could be used for that.
|
||||
@@ -4,6 +4,3 @@ plugins {
|
||||
|
||||
group = "center.sciprog"
|
||||
version = "0.1.0"
|
||||
|
||||
|
||||
val attributesVersion: String by extra("0.1.0")
|
||||
@@ -1,3 +1,5 @@
|
||||
kotlin.code.style=official
|
||||
|
||||
toolsVersion=0.15.2-kotlin-1.9.22
|
||||
org.jetbrains.dokka.experimental.gradle.pluginMode=V2Enabled
|
||||
|
||||
toolsVersion=0.20.2-kotlin-2.3.0
|
||||
|
||||
10
gradle/libs.versions.toml
Normal file
10
gradle/libs.versions.toml
Normal file
@@ -0,0 +1,10 @@
|
||||
[versions]
|
||||
|
||||
tinkerpop = "3.8.0"
|
||||
|
||||
|
||||
[libraries]
|
||||
attributes = "space.kscience:attributes-kt:0.3.0"
|
||||
attributes-serialization = "space.kscience:attributes-kt-serialization:0.4.0"
|
||||
gremlin-core = { module = "org.apache.tinkerpop:gremlin-core", version.ref = "tinkerpop" }
|
||||
tinkergraph-gremlin = { module = "org.apache.tinkerpop:tinkergraph-gremlin", version.ref = "tinkerpop" }
|
||||
2
gradle/wrapper/gradle-wrapper.properties
vendored
2
gradle/wrapper/gradle-wrapper.properties
vendored
@@ -1,6 +1,6 @@
|
||||
#Wed Apr 03 19:58:49 MSK 2024
|
||||
distributionBase=GRADLE_USER_HOME
|
||||
distributionPath=wrapper/dists
|
||||
distributionUrl=https\://services.gradle.org/distributions/gradle-8.7-bin.zip
|
||||
distributionUrl=https\://services.gradle.org/distributions/gradle-9.2.1-bin.zip
|
||||
zipStoreBase=GRADLE_USER_HOME
|
||||
zipStorePath=wrapper/dists
|
||||
|
||||
@@ -39,4 +39,8 @@ dependencyResolutionManagement {
|
||||
}
|
||||
}
|
||||
|
||||
include(":workflow-core")
|
||||
include(
|
||||
":workflow-core",
|
||||
":workflow-pm",
|
||||
":workflow-tinkerpop",
|
||||
)
|
||||
@@ -13,10 +13,12 @@ kscience {
|
||||
native()
|
||||
useCoroutines()
|
||||
useSerialization()
|
||||
|
||||
useContextParameters()
|
||||
|
||||
commonMain {
|
||||
api(spclibs.kotlinx.datetime)
|
||||
api("space.kscience:attributes-kt:$attributesVersion")
|
||||
api("com.benasher44:uuid:0.8.4")
|
||||
api(libs.attributes)
|
||||
}
|
||||
|
||||
jvmTest {
|
||||
|
||||
@@ -1,37 +0,0 @@
|
||||
package center.sciprog.workflow
|
||||
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.asFlow
|
||||
import kotlinx.coroutines.flow.map
|
||||
import space.kscience.attributes.AttributeContainer
|
||||
import space.kscience.attributes.Attributes
|
||||
|
||||
public typealias SubjectId = String
|
||||
|
||||
|
||||
public sealed interface Subject : AttributeContainer {
|
||||
public val id: SubjectId
|
||||
}
|
||||
|
||||
public data class Person(
|
||||
override val id: SubjectId,
|
||||
public val name: String,
|
||||
override val attributes: Attributes = Attributes.EMPTY,
|
||||
) : Subject
|
||||
|
||||
public data class Organization(
|
||||
override val id: SubjectId,
|
||||
public val name: String,
|
||||
override val attributes: Attributes = Attributes.EMPTY,
|
||||
) : Subject
|
||||
|
||||
public interface SubjectProvider {
|
||||
public suspend fun ids(): Flow<SubjectId>
|
||||
public suspend fun provide(subjectId: SubjectId): Subject?
|
||||
}
|
||||
|
||||
public class ListSubjectProvider(private val subjects: List<Subject>) : SubjectProvider {
|
||||
override suspend fun ids(): Flow<SubjectId> = subjects.asFlow().map { it.id }
|
||||
|
||||
override suspend fun provide(subjectId: SubjectId): Subject? = subjects.find { it.id == subjectId }
|
||||
}
|
||||
@@ -1,64 +0,0 @@
|
||||
package center.sciprog.workflow
|
||||
|
||||
import kotlinx.datetime.*
|
||||
import kotlinx.serialization.Serializable
|
||||
import space.kscience.attributes.Attributes
|
||||
import space.kscience.attributes.AttributesBuilder
|
||||
|
||||
/**
|
||||
* A money transaction
|
||||
*/
|
||||
public interface Transaction : WorkEvent {
|
||||
public val fromSubject: Subject
|
||||
public val toSubject: Subject
|
||||
public val amount: Money
|
||||
}
|
||||
|
||||
/**
|
||||
* The transaction that has been executed. Some additional information could be encoded in [attributes]
|
||||
*/
|
||||
@Serializable
|
||||
public data class ExecutedTransaction(
|
||||
override val id: EventId,
|
||||
override val fromSubject: Subject,
|
||||
override val toSubject: Subject,
|
||||
override val amount: Money,
|
||||
override val time: Instant,
|
||||
override val attributes: Attributes,
|
||||
) : WorkEvent, Transaction
|
||||
|
||||
public suspend fun WorkBuilder.transaction(
|
||||
fromSubject: Subject,
|
||||
toSubject: Subject,
|
||||
amount: Money,
|
||||
time: Instant = Clock.System.now(),
|
||||
attributesBuilder: AttributesBuilder<ExecutedTransaction>.() -> Unit = {},
|
||||
) {
|
||||
put(ExecutedTransaction(generateId(), fromSubject, toSubject, amount, time, Attributes(attributesBuilder)))
|
||||
}
|
||||
|
||||
/**
|
||||
* The transaction that is planned, but not yet executed
|
||||
*/
|
||||
@Serializable
|
||||
public data class PlannedTransaction(
|
||||
override val id: EventId,
|
||||
public val plannedOn: Instant,
|
||||
override val fromSubject: Subject,
|
||||
override val toSubject: Subject,
|
||||
override val amount: Money,
|
||||
override val time: Instant,
|
||||
override val attributes: Attributes,
|
||||
) : WorkEvent, Transaction
|
||||
|
||||
|
||||
public suspend fun WorkBuilder.plannedTransaction(
|
||||
fromSubject: Subject,
|
||||
toSubject: Subject,
|
||||
plannedOn: Instant,
|
||||
amount: Money,
|
||||
time: Instant = Clock.System.now(),
|
||||
attributesBuilder: AttributesBuilder<PlannedTransaction>.() -> Unit = {},
|
||||
) {
|
||||
put(PlannedTransaction(generateId(), plannedOn, fromSubject, toSubject, amount, time, Attributes(attributesBuilder)))
|
||||
}
|
||||
@@ -1,50 +1,57 @@
|
||||
@file:OptIn(UnstableAPI::class)
|
||||
|
||||
package center.sciprog.workflow
|
||||
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.merge
|
||||
import kotlinx.datetime.Instant
|
||||
import kotlinx.datetime.LocalDateTime
|
||||
import space.kscience.attributes.AttributeContainer
|
||||
import space.kscience.attributes.Attributes
|
||||
import space.kscience.attributes.UnstableAPI
|
||||
import kotlin.time.Instant
|
||||
|
||||
public typealias WorkId = String
|
||||
|
||||
public typealias EventId = String
|
||||
|
||||
/**
|
||||
* A generic operation in a workflow
|
||||
*/
|
||||
public interface WorkEvent : AttributeContainer {
|
||||
/**
|
||||
* A unique ID for the operation
|
||||
*/
|
||||
public val id: EventId
|
||||
public data class WorkEvent<out T>(
|
||||
public val id: EventId,
|
||||
public val time: Instant,
|
||||
public val action: T,
|
||||
override val attributes: Attributes,
|
||||
val dependsOn: Set<EventId> = emptySet(),
|
||||
): AttributeContainer
|
||||
|
||||
public typealias AnyWorkEvent = WorkEvent<Any?>
|
||||
|
||||
/**
|
||||
* A localized time operation is attributed to
|
||||
*/
|
||||
public val time: Instant
|
||||
}
|
||||
|
||||
/**
|
||||
* A central API for all workflow computations
|
||||
* A central API for all workflow computations.
|
||||
*
|
||||
* Work represents a flow of events that could be run multiple times.
|
||||
*/
|
||||
public interface Work {
|
||||
public interface Work<out T> {
|
||||
|
||||
public fun flow(): Flow<WorkEvent>
|
||||
public fun flow(activeOnly: Boolean = true): Flow<WorkEvent<T>>
|
||||
|
||||
public companion object {
|
||||
/**
|
||||
* Join several works into one
|
||||
*/
|
||||
public fun join(vararg works: Work): Work = object : Work {
|
||||
override fun flow(): Flow<WorkEvent> = works.map { it.flow() }.merge()
|
||||
public fun <T> join(vararg works: Work<T>): Work<T> = object : Work<T> {
|
||||
override fun flow(activeOnly: Boolean): Flow<WorkEvent<T>> = works.map { it.flow(activeOnly) }.merge()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public typealias AnyWork = Work<Any?>
|
||||
|
||||
/**
|
||||
* A work that can produce a modified work via [WorkBuilder]
|
||||
* A work that can be forked via [WorkBuilder]
|
||||
*/
|
||||
public interface WorkWithBuilder<W : Work> : Work {
|
||||
public suspend fun modified(block: suspend WorkBuilder.() -> Unit): W
|
||||
public interface WorkStore<A> : Work<A> {
|
||||
|
||||
/**
|
||||
* Produce a modified work by creating a copy and applying changes
|
||||
*/
|
||||
public suspend fun derive(rules: List<WorkRule<A>>, block: suspend WorkBuilder<A>.() -> Unit): Work<A>
|
||||
}
|
||||
@@ -1,35 +1,51 @@
|
||||
package center.sciprog.workflow
|
||||
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import space.kscience.attributes.Attributes
|
||||
import space.kscience.attributes.AttributesBuilder
|
||||
import kotlin.time.Clock
|
||||
import kotlin.time.Instant
|
||||
|
||||
|
||||
public interface WorkBuilder {
|
||||
|
||||
public interface WorkBuilder<A> {
|
||||
/**
|
||||
* Generate a unique event Id
|
||||
*/
|
||||
public suspend fun generateId(): EventId
|
||||
|
||||
/**
|
||||
* Add a single event to a builder
|
||||
* Add a single event to a builder. Throw an exception if the event with given Id already exists
|
||||
*/
|
||||
public suspend fun put(event: WorkEvent)
|
||||
public suspend fun put(event: WorkEvent<A>)
|
||||
|
||||
/**
|
||||
* Visit all events Remove or replace or don't change all events in the Work.
|
||||
* * If the result of the [visitor] is the same as input, no changes are done.
|
||||
* * If the result is a new event, the initial event is replaced
|
||||
* * if the result is null, the initial event is removed
|
||||
*/
|
||||
public suspend fun visit(visitor: (WorkEvent) -> WorkEvent?)
|
||||
public suspend fun changeAttributes(eventId: EventId, builder: AttributesBuilder<WorkEvent<A>>.() -> Unit)
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove a single event with given ID from builder
|
||||
*/
|
||||
public suspend fun WorkBuilder.remove(id: EventId): Unit = visit { if (it.id == id) null else it }
|
||||
public suspend fun <T> WorkBuilder<T>.setState(eventId: EventId, state: WorkEventState): Unit = changeAttributes(eventId){
|
||||
put(WorkEventState, state)
|
||||
}
|
||||
|
||||
public suspend fun WorkBuilder.putAll(events: Collection<WorkEvent>) {
|
||||
public suspend fun WorkBuilder<*>.cancel(id: EventId) {
|
||||
setState(id, WorkEventState.CANCELED)
|
||||
}
|
||||
|
||||
public suspend fun <T> WorkBuilder<T>.put(
|
||||
content: T,
|
||||
time: Instant = Clock.System.now(),
|
||||
attributes: Attributes = Attributes.EMPTY,
|
||||
dependsOn: Set<EventId> = emptySet()
|
||||
): Unit = put(
|
||||
WorkEvent<T>(
|
||||
id = generateId(),
|
||||
time = time,
|
||||
action = content,
|
||||
attributes = attributes,
|
||||
dependsOn = dependsOn
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
public suspend fun <T> WorkBuilder<T>.putAll(events: Collection<WorkEvent<T>>) {
|
||||
events.forEach {
|
||||
put(it)
|
||||
}
|
||||
@@ -38,10 +54,10 @@ public suspend fun WorkBuilder.putAll(events: Collection<WorkEvent>) {
|
||||
/**
|
||||
* Suspends until all [events] are put into this builder
|
||||
*/
|
||||
public suspend fun WorkBuilder.putAll(events: Flow<WorkEvent>) {
|
||||
public suspend fun <T> WorkBuilder<T>.putAll(events: Flow<WorkEvent<T>>) {
|
||||
events.collect {
|
||||
put(it)
|
||||
}
|
||||
}
|
||||
|
||||
public suspend fun WorkBuilder.putAll(work: Work): Unit = putAll(work.flow())
|
||||
public suspend fun <T> WorkBuilder<T>.putAll(work: Work<T>): Unit = putAll(work.flow())
|
||||
@@ -0,0 +1,16 @@
|
||||
package center.sciprog.workflow
|
||||
|
||||
import space.kscience.attributes.Attribute
|
||||
|
||||
public enum class WorkEventState {
|
||||
PENDING,
|
||||
COMPLETED,
|
||||
CANCELED;
|
||||
|
||||
public companion object : Attribute<WorkEventState>
|
||||
}
|
||||
|
||||
public val AnyWorkEvent.state: WorkEventState get() = attributes[WorkEventState] ?: WorkEventState.PENDING
|
||||
|
||||
|
||||
public val AnyWorkEvent.isActive: Boolean get() = state != WorkEventState.CANCELED
|
||||
@@ -1,37 +1,13 @@
|
||||
package center.sciprog.workflow
|
||||
|
||||
import kotlinx.coroutines.flow.filter
|
||||
import kotlinx.coroutines.flow.filterIsInstance
|
||||
import kotlinx.coroutines.flow.fold
|
||||
import kotlinx.datetime.Instant
|
||||
|
||||
public interface WorkFunction<T> {
|
||||
public val emptyBase: T
|
||||
public suspend fun compute(work: Work, base: T = emptyBase): T
|
||||
/**
|
||||
* Aggregate values for given [Work]
|
||||
*
|
||||
* @param A type of work action ([WorkEvent] content)
|
||||
*/
|
||||
public interface WorkFunction<T, in A> {
|
||||
public val defaultBase: T
|
||||
public suspend fun compute(work: Work<A>, base: T = defaultBase): T
|
||||
}
|
||||
|
||||
|
||||
public class Payments(
|
||||
public val subject: Subject,
|
||||
public val timeFrame: ClosedRange<Instant>,
|
||||
public val valueExtractor: (Money) -> Double,
|
||||
override val emptyBase: Double = 0.0,
|
||||
) : WorkFunction<Double> {
|
||||
override suspend fun compute(work: Work, base: Double): Double {
|
||||
return work.flow().filterIsInstance<ExecutedTransaction>().filter {
|
||||
it.toSubject == subject && it.time in timeFrame
|
||||
}.fold(base) { acc, value -> acc + valueExtractor(value.amount) }
|
||||
}
|
||||
|
||||
public companion object {
|
||||
public fun inRubles(
|
||||
subject: Subject,
|
||||
timeFrame: ClosedRange<Instant> = Instant.DISTANT_PAST..Instant.DISTANT_FUTURE,
|
||||
): Payments = Payments(
|
||||
subject,
|
||||
timeFrame,
|
||||
{ if (it.currency == Rubles) it.amount.toDouble() else TODO("No currency converter") }
|
||||
)
|
||||
|
||||
}
|
||||
}
|
||||
@@ -1,47 +1,80 @@
|
||||
@file:OptIn(ExperimentalUuidApi::class)
|
||||
|
||||
package center.sciprog.workflow
|
||||
|
||||
import com.benasher44.uuid.uuid4
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.asFlow
|
||||
import kotlinx.coroutines.flow.filter
|
||||
import space.kscience.attributes.AttributesBuilder
|
||||
import space.kscience.attributes.modified
|
||||
import kotlin.uuid.ExperimentalUuidApi
|
||||
import kotlin.uuid.Uuid
|
||||
|
||||
private class WorkListBuilder(val idPrefix: String, val events: MutableList<WorkEvent>) : WorkBuilder {
|
||||
override suspend fun generateId(): EventId = idPrefix + "-" + uuid4().toString()
|
||||
private class WorkListBuilder<A>(
|
||||
val idPrefix: String,
|
||||
val events: MutableList<WorkEvent<A>>,
|
||||
val rules: List<WorkRule<A>>
|
||||
) : WorkBuilder<A> {
|
||||
|
||||
override suspend fun put(event: WorkEvent) {
|
||||
override suspend fun generateId(): EventId = idPrefix + "-" + Uuid.random().toHexString()
|
||||
|
||||
override suspend fun put(event: WorkEvent<A>) {
|
||||
if (events.any { it.id == event.id }) error("Event with id ${event.id} already exists")
|
||||
events.add(event)
|
||||
rules.filterIsInstance<WorkRule.OnPut<A>>().forEach { it.run(this, event) }
|
||||
}
|
||||
|
||||
override suspend fun visit(visitor: (WorkEvent) -> WorkEvent?) {
|
||||
val iterator = events.listIterator()
|
||||
while (iterator.hasNext()) {
|
||||
val event = iterator.next()
|
||||
val newEvent = visitor(event)
|
||||
if (newEvent == null) {
|
||||
iterator.remove()
|
||||
} else if (newEvent != event) {
|
||||
iterator.set(newEvent)
|
||||
override suspend fun changeAttributes(
|
||||
eventId: EventId,
|
||||
builder: AttributesBuilder<WorkEvent<A>>.() -> Unit
|
||||
) {
|
||||
//TODO add concurrent safety
|
||||
|
||||
val eventIndex = events.indexOfFirst { it.id == eventId }
|
||||
if (eventIndex == -1) error("Event with id $eventId not found")
|
||||
val event = events[eventIndex]
|
||||
|
||||
val newAttributes = event.attributes.modified(builder)
|
||||
|
||||
events[eventIndex] = event.copy(attributes = newAttributes)
|
||||
|
||||
rules.filterIsInstance<WorkRule.OnAttributesChange<A>>().forEach { it.run(this, event, newAttributes) }
|
||||
|
||||
if (event.isActive && newAttributes[WorkEventState] == WorkEventState.CANCELED) {
|
||||
event.dependsOn.forEach {
|
||||
cancel(it)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public class WorkList(private val events: List<WorkEvent>) : WorkWithBuilder<WorkList> {
|
||||
public class WorkList<A>(private val events: List<WorkEvent<A>>) : WorkStore<A> {
|
||||
|
||||
override fun flow(): Flow<WorkEvent> = events.asFlow()
|
||||
override fun flow(activeOnly: Boolean): Flow<WorkEvent<A>> = if (activeOnly) {
|
||||
events.asFlow().filter { it.isActive }
|
||||
} else {
|
||||
events.asFlow()
|
||||
}
|
||||
|
||||
override suspend fun modified(block: suspend WorkBuilder.() -> Unit): WorkList {
|
||||
/**
|
||||
* Create a derived work by applying changes to the original work
|
||||
*/
|
||||
override suspend fun derive(rules: List<WorkRule<A>>, block: suspend WorkBuilder<A>.() -> Unit): WorkList<A> {
|
||||
val modifiedEvents = ArrayList(events)
|
||||
val prefix = uuid4().leastSignificantBits.toString(16)
|
||||
WorkListBuilder(prefix, modifiedEvents).block()
|
||||
return WorkList((modifiedEvents))
|
||||
val prefix = Uuid.random().toHexString()
|
||||
WorkListBuilder(prefix, modifiedEvents, rules).block()
|
||||
return WorkList(modifiedEvents)
|
||||
}
|
||||
}
|
||||
|
||||
public suspend fun WorkList(block: suspend WorkBuilder.() -> Unit): WorkList {
|
||||
val events = mutableListOf<WorkEvent>()
|
||||
val prefix = uuid4().leastSignificantBits.toString(16)
|
||||
val builder = WorkListBuilder(prefix, events)
|
||||
public suspend fun <T> WorkList(
|
||||
rules: List<WorkRule<T>> = emptyList(),
|
||||
block: suspend WorkBuilder<T>.() -> Unit
|
||||
): WorkList<T> {
|
||||
val events = mutableListOf<WorkEvent<T>>()
|
||||
val prefix = Uuid.random().toHexString()
|
||||
val builder = WorkListBuilder(prefix, events, rules)
|
||||
builder.block()
|
||||
return WorkList(events)
|
||||
}
|
||||
@@ -0,0 +1,17 @@
|
||||
package center.sciprog.workflow
|
||||
|
||||
import space.kscience.attributes.Attributes
|
||||
|
||||
/**
|
||||
* A rule that is triggered when a new event is put into the workflow or when attributes of an event are changed.
|
||||
*/
|
||||
public sealed interface WorkRule<A> {
|
||||
public fun interface OnPut<A> : WorkRule<A> {
|
||||
public fun run(builder: WorkBuilder<A>, event: WorkEvent<A>)
|
||||
}
|
||||
|
||||
public fun interface OnAttributesChange<A> : WorkRule<A> {
|
||||
public fun run(builder: WorkBuilder<A>, event: WorkEvent<A>, newAttributes: Attributes)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,162 @@
|
||||
package center.sciprog.workflow.state
|
||||
|
||||
import kotlinx.coroutines.flow.*
|
||||
import kotlinx.serialization.Contextual
|
||||
import kotlinx.serialization.Serializable
|
||||
import space.kscience.attributes.Attribute
|
||||
import space.kscience.attributes.Attributes
|
||||
import space.kscience.attributes.withAttribute
|
||||
import space.kscience.attributes.withoutAttribute
|
||||
import kotlin.time.Clock
|
||||
import kotlin.time.Instant
|
||||
import kotlin.uuid.ExperimentalUuidApi
|
||||
import kotlin.uuid.Uuid
|
||||
|
||||
@Serializable
|
||||
private data class InMemoryStateNode(
|
||||
override val id: NodeId,
|
||||
override val type: NodeType,
|
||||
@Contextual override val attributes: Attributes
|
||||
) : StateNode
|
||||
|
||||
/**
|
||||
* In
|
||||
*/
|
||||
@OptIn(ExperimentalUuidApi::class)
|
||||
public class InMemoryStateGraph(
|
||||
nodes: Collection<StateNode> = emptyList(),
|
||||
override val clock: Clock = Clock.System,
|
||||
) : MutableStateGraph {
|
||||
|
||||
private val nodes: HashMap<NodeId, StateNode> = HashMap(
|
||||
nodes.map { value ->
|
||||
value as? InMemoryStateNode ?: InMemoryStateNode(value.id, value.type, value.attributes)
|
||||
}.associateBy { it.id }
|
||||
)
|
||||
|
||||
private val _history = MutableSharedFlow<GraphEvent>()
|
||||
|
||||
override val history: SharedFlow<GraphEvent> get() = _history
|
||||
|
||||
|
||||
/**
|
||||
* Flow all node Ids. The order of ids is undefined
|
||||
*/
|
||||
override fun nodeIds(): Flow<NodeId> = nodes.keys.asFlow()
|
||||
|
||||
/**
|
||||
* Read a single node if it exists
|
||||
*/
|
||||
override suspend fun get(id: NodeId): StateNode? = nodes[id]
|
||||
|
||||
private fun predicate(query: StateGraphQuery): (StateNode) -> Boolean = when (query) {
|
||||
is StateGraphQuery.OneOf -> { node -> query.queries.any { predicate(it)(node) } }
|
||||
is StateGraphQuery.AllOf -> { node -> query.queries.all { predicate(it)(node) } }
|
||||
is StateGraphQuery.AttributeContains<*> -> { node ->
|
||||
query.value in (node[query.attribute] ?: emptySet())
|
||||
}
|
||||
|
||||
is StateGraphQuery.AttributeEquals<*> -> { node ->
|
||||
query.value == node.attributes[query.attribute]
|
||||
}
|
||||
|
||||
is StateGraphQuery.ByType -> { node ->
|
||||
node.type == query.type
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Request a lazy flow of nodes with a given query. The order
|
||||
*/
|
||||
override suspend fun query(query: StateGraphQuery): Flow<StateNode> = nodes.values.asFlow().filter(predicate(query))
|
||||
|
||||
@OptIn(ExperimentalUuidApi::class)
|
||||
private fun generateId(): NodeId = Uuid.random().toHexDashString()
|
||||
|
||||
override suspend fun createNode(
|
||||
nodeType: NodeType,
|
||||
time: Instant,
|
||||
attributes: Attributes
|
||||
): StateNode {
|
||||
val id = generateId()
|
||||
val node = InMemoryStateNode(id, nodeType, attributes)
|
||||
nodes[id] = node
|
||||
_history.emit(GraphEvent.NodeChanged(id, null, node, time))
|
||||
return node
|
||||
}
|
||||
|
||||
override suspend fun removeNode(
|
||||
nodeId: NodeId,
|
||||
time: Instant
|
||||
): StateNode? {
|
||||
val res = nodes.remove(nodeId) ?: return null
|
||||
_history.emit(GraphEvent.NodeChanged(nodeId, res, null, time))
|
||||
return res
|
||||
}
|
||||
|
||||
/**
|
||||
* Set attribute for given node.
|
||||
*
|
||||
* It is graph implementation responsibility to properly process the connection.
|
||||
* So setting connection attributes should automatically set mirroring attribute.
|
||||
*/
|
||||
override suspend fun <T> setAttribute(
|
||||
nodeId: NodeId,
|
||||
attribute: Attribute<T>,
|
||||
value: T,
|
||||
time: Instant
|
||||
) {
|
||||
val node = nodes[nodeId] ?: error("Node with id $nodeId does not exist in the graph")
|
||||
nodes[nodeId] = InMemoryStateNode(nodeId, node.type, node.attributes.withAttribute(attribute, value))
|
||||
_history.emit(
|
||||
GraphEvent.AttributeChanged<T>(
|
||||
nodeId = nodeId,
|
||||
attribute = attribute,
|
||||
valueBefore = node[attribute],
|
||||
valueAfter = value,
|
||||
time = time
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove attribute for given [nodeId].
|
||||
*
|
||||
* It is graph implementation responsibility to properly process connection.
|
||||
* So removing the mirroring attribute should happen automatically.
|
||||
*/
|
||||
override suspend fun <T> removeAttribute(
|
||||
nodeId: NodeId,
|
||||
attribute: Attribute<T>,
|
||||
time: Instant
|
||||
) {
|
||||
val node = nodes[nodeId] ?: error("Node with id $nodeId does not exist in the graph")
|
||||
nodes[nodeId] = InMemoryStateNode(nodeId, node.type, node.attributes.withoutAttribute(attribute))
|
||||
_history.emit(
|
||||
GraphEvent.AttributeChanged<T>(
|
||||
nodeId = nodeId,
|
||||
attribute = attribute,
|
||||
valueBefore = node[attribute],
|
||||
valueAfter = null,
|
||||
time = time
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
override suspend fun snapshot(): StateGraph = InMemoryStateGraph(nodes.values, clock)
|
||||
|
||||
override suspend fun project(block: suspend MutableStateGraph.() -> Unit): StateGraph =
|
||||
InMemoryStateGraph(nodes.values, clock).apply {
|
||||
block()
|
||||
}
|
||||
|
||||
/**
|
||||
* Perform a couple of actions in a single atomic transaction.
|
||||
*/
|
||||
override suspend fun transform(block: suspend MutableStateGraph.() -> Unit) {
|
||||
//create a copy of the current graph
|
||||
val transactionGraph = InMemoryStateGraph(nodes.values, clock)
|
||||
transactionGraph.block()
|
||||
nodes.putAll(transactionGraph.nodes)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,34 @@
|
||||
package center.sciprog.workflow.state
|
||||
|
||||
public sealed interface ProcessResult {
|
||||
public data object Success : ProcessResult
|
||||
public data class Failure(public val cause: Throwable) : ProcessResult
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* A change that is applied to a [MutableStateGraph]
|
||||
*/
|
||||
public fun interface Process {
|
||||
/**
|
||||
* Execute a mutation on a node. The mutation could span several nodes.
|
||||
*
|
||||
* The whole mutation is considered atomic.
|
||||
*/
|
||||
public suspend fun MutableStateGraph.execute(node: StateNode): ProcessResult
|
||||
}
|
||||
|
||||
/**
|
||||
* A process that is automatically triggered on node state change
|
||||
*/
|
||||
public interface AutoProcess : Process {
|
||||
/**
|
||||
* A query for all nodes that are watched for changes
|
||||
*/
|
||||
public val watchQuery: StateGraphQuery
|
||||
|
||||
/**
|
||||
* A condition that triggers execution on a specific node
|
||||
*/
|
||||
public suspend fun StateGraph.triggersOn(node: StateNode): Boolean
|
||||
}
|
||||
@@ -0,0 +1,39 @@
|
||||
package center.sciprog.workflow.state
|
||||
|
||||
import center.sciprog.workflow.*
|
||||
|
||||
/**
|
||||
* Represents a chain of graph states where each state is computed by a [WorkFunction] from previous state.
|
||||
*
|
||||
* @param state the current [StateGraph]
|
||||
* @param parent the parent chain or null for the root chain
|
||||
* @param function the function to compute next state
|
||||
*/
|
||||
public class StateChain<A>(
|
||||
public val state: StateGraph,
|
||||
public val parent: StateChain<A>?,
|
||||
public val function: WorkFunction<StateGraph, A>,
|
||||
public val work: Work<A>,
|
||||
public val workRules: List<WorkRule<A>> = emptyList()
|
||||
)
|
||||
|
||||
//TODO consider adding WorkRules that depend on a current state
|
||||
|
||||
public suspend fun <A> StateChain<A>.project(builder: WorkBuilder<A>.() -> Unit): StateChain<A> {
|
||||
val work = WorkList(workRules, builder)
|
||||
val newState = function.compute(work, state)
|
||||
return StateChain(state = newState, parent = this, function = function, work = work)
|
||||
}
|
||||
|
||||
public fun interface StateWorkFunction<A> : WorkFunction<StateGraph, A> {
|
||||
|
||||
public suspend fun MutableStateGraph.processEvent(event: WorkEvent<A>)
|
||||
|
||||
override val defaultBase: StateGraph get() = StateGraph.EMPTY
|
||||
|
||||
override suspend fun compute(work: Work<A>, base: StateGraph): StateGraph = base.project {
|
||||
work.flow().collect {
|
||||
processEvent(it)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,179 @@
|
||||
package center.sciprog.workflow.state
|
||||
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.SharedFlow
|
||||
import kotlinx.coroutines.flow.emptyFlow
|
||||
import space.kscience.attributes.Attribute
|
||||
import space.kscience.attributes.Attributes
|
||||
import kotlin.time.Clock
|
||||
import kotlin.time.Instant
|
||||
|
||||
|
||||
/**
|
||||
* History events
|
||||
*/
|
||||
public sealed interface GraphEvent {
|
||||
|
||||
public val time: Instant
|
||||
|
||||
public data class AttributeChanged<T>(
|
||||
val nodeId: NodeId,
|
||||
val attribute: Attribute<T>,
|
||||
val valueBefore: T?,
|
||||
val valueAfter: T?,
|
||||
override val time: Instant
|
||||
) : GraphEvent
|
||||
|
||||
public data class NodeChanged(
|
||||
val nodeId: NodeId,
|
||||
val nodeBefore: StateNode?,
|
||||
val nodeAfter: StateNode?,
|
||||
override val time: Instant
|
||||
) : GraphEvent
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents a graph structure consisting of interconnected nodes, where each node can maintain attributes, state, and relationships.
|
||||
*
|
||||
* A StateGraph is an immutable representation of the current state of the graph at any given point in time. It provides mechanisms
|
||||
* to retrieve nodes and query the graph but does not allow direct modifications. StateGraph also maintains a historical record
|
||||
* of events that occurred and provides time-based functionalities through a clock component.
|
||||
*
|
||||
* This interface extends [StateNodeProvider], providing additional behavior specific to managing the graph's state and history.
|
||||
*/
|
||||
public interface StateGraph : StateNodeProvider {
|
||||
|
||||
/**
|
||||
* Project a new state graph by applying a block of operations to a mutable copy of the current graph.
|
||||
*/
|
||||
public suspend fun project(block: suspend MutableStateGraph.() -> Unit): StateGraph
|
||||
|
||||
public companion object {
|
||||
public val EMPTY: StateGraph = object : StateGraph {
|
||||
override fun nodeIds(): Flow<NodeId> = emptyFlow()
|
||||
|
||||
override suspend fun get(id: NodeId): StateNode? = null
|
||||
|
||||
override suspend fun query(query: StateGraphQuery): Flow<StateNode> = emptyFlow()
|
||||
|
||||
override suspend fun project(
|
||||
block: suspend MutableStateGraph.() -> Unit
|
||||
): StateGraph = InMemoryStateGraph().apply {
|
||||
block()
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public interface MutableStateGraph : StateGraph {
|
||||
|
||||
public val clock: Clock
|
||||
|
||||
public val history: SharedFlow<GraphEvent>
|
||||
|
||||
/**
|
||||
* Create new node with given type and attributes.
|
||||
*
|
||||
* @param nodeType type of the node to create
|
||||
* @param time time of the event, defaults to current time
|
||||
* @param attributes attributes to set on the node
|
||||
* @return created node
|
||||
*/
|
||||
public suspend fun createNode(
|
||||
nodeType: NodeType,
|
||||
time: Instant = clock.now(),
|
||||
attributes: Attributes = Attributes.EMPTY
|
||||
): StateNode
|
||||
|
||||
/**
|
||||
* Remove node with given id.
|
||||
*
|
||||
* @param nodeId id of the node to remove
|
||||
* @param time time of the event, defaults to current time
|
||||
* @return removed node, or null if node was not found
|
||||
*/
|
||||
public suspend fun removeNode(
|
||||
nodeId: NodeId,
|
||||
time: Instant = clock.now()
|
||||
): StateNode?
|
||||
|
||||
/**
|
||||
* Set attribute for given node.
|
||||
*
|
||||
* It is graph implementation's responsibility to properly process the connection.
|
||||
* So setting connection attributes should automatically set mirroring attribute.
|
||||
*/
|
||||
public suspend fun <T> setAttribute(
|
||||
nodeId: NodeId,
|
||||
attribute: Attribute<T>,
|
||||
value: T,
|
||||
time: Instant = clock.now()
|
||||
)
|
||||
|
||||
/**
|
||||
* Remove attribute for given [nodeId].
|
||||
*
|
||||
* It is graph implementation's responsibility to properly process the connection.
|
||||
* So removing the mirroring attribute should happen automatically.
|
||||
*/
|
||||
public suspend fun <T> removeAttribute(
|
||||
nodeId: NodeId,
|
||||
attribute: Attribute<T>,
|
||||
time: Instant = clock.now()
|
||||
)
|
||||
|
||||
/**
|
||||
* Create an immutable snapshot of the current state of the graph.
|
||||
*/
|
||||
public suspend fun snapshot(): StateGraph
|
||||
|
||||
/**
|
||||
* Perform a couple of actions in a single atomic transaction.
|
||||
*/
|
||||
public suspend fun transform(block: suspend MutableStateGraph.() -> Unit)
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Removes a specified node from the mutable state graph.
|
||||
*
|
||||
* @param node The node to be removed.
|
||||
* @param time The timestamp indicating when the removal should take place.
|
||||
* @return The removed node if it existed in the graph, or null if no such node was found.
|
||||
*/
|
||||
public suspend fun MutableStateGraph.removeNode(
|
||||
node: StateNode,
|
||||
time: Instant = clock.now()
|
||||
): StateNode? = removeNode(node.id, time)
|
||||
|
||||
/**
|
||||
* Sets an attribute for a specified node in the mutable state graph.
|
||||
*
|
||||
* @param T The type of the attribute value.
|
||||
* @param node The node for which the attribute is being set.
|
||||
* @param attribute The attribute to update.
|
||||
* @param value The value to assign to the attribute.
|
||||
* @param time The timestamp of the operation.
|
||||
* @return Unit
|
||||
*/
|
||||
public suspend fun <T> MutableStateGraph.setAttribute(
|
||||
node: StateNode,
|
||||
attribute: Attribute<T>,
|
||||
value: T,
|
||||
time: Instant = clock.now()
|
||||
): Unit = setAttribute(node.id, attribute, value, time)
|
||||
|
||||
/**
|
||||
* Removes a specific attribute from the given node at the specified point in time.
|
||||
*
|
||||
* @param node The node from which the attribute will be removed.
|
||||
* @param attribute The attribute to be removed from the specified node.
|
||||
* @param time The point in time to associate with the attribute removal operation.
|
||||
* @return Unit value indicating the completion of the operation.
|
||||
*/
|
||||
public suspend fun <T> MutableStateGraph.removeAttribute(
|
||||
node: StateNode,
|
||||
attribute: Attribute<T>,
|
||||
time: Instant = clock.now()
|
||||
): Unit = removeAttribute(node.id, attribute, time)
|
||||
@@ -0,0 +1,65 @@
|
||||
package center.sciprog.workflow.state
|
||||
|
||||
import kotlinx.serialization.Contextual
|
||||
import kotlinx.serialization.Serializable
|
||||
import space.kscience.attributes.Attribute
|
||||
import space.kscience.attributes.AttributeContainer
|
||||
import space.kscience.attributes.Attributes
|
||||
import space.kscience.attributes.keys
|
||||
|
||||
public typealias NodeId = String
|
||||
public typealias NodeType = String
|
||||
public typealias NodeConnectionType = String
|
||||
|
||||
public interface NodeConnection : AttributeContainer {
|
||||
public val type: NodeConnectionType
|
||||
public val inNodeId: NodeId
|
||||
public val outNodeId: NodeId
|
||||
}
|
||||
|
||||
/**
|
||||
* Actualize inNode for this connection in the context of the graph
|
||||
*/
|
||||
context(graph: StateGraph)
|
||||
public suspend fun NodeConnection.inNode(): StateNode = graph[inNodeId] ?: error("InNode with id $inNodeId is not resolved")
|
||||
|
||||
/**
|
||||
* Actualize outNode for this connection in the context of the graph
|
||||
*/
|
||||
context(graph: StateGraph)
|
||||
public suspend fun NodeConnection.outNode(): StateNode = graph[outNodeId] ?: error("OutNode with id $outNodeId is not resolved")
|
||||
|
||||
|
||||
@Serializable
|
||||
public data class SimpleNodeConnection(
|
||||
override val type: NodeConnectionType,
|
||||
override val inNodeId: NodeId,
|
||||
override val outNodeId: NodeId,
|
||||
@Contextual override val attributes: Attributes = Attributes.EMPTY,
|
||||
) : NodeConnection
|
||||
|
||||
public interface ConnectionAttribute : Attribute<NodeConnection>
|
||||
|
||||
public interface StateNode : AttributeContainer {
|
||||
public val id: NodeId
|
||||
|
||||
public val type: NodeType
|
||||
|
||||
public companion object {
|
||||
public const val NODE_TYPE_KEY: String = "@type"
|
||||
}
|
||||
}
|
||||
|
||||
public suspend fun StateGraph.connections(nodeId: NodeId): Collection<NodeConnection> {
|
||||
val node = get(nodeId) ?: return emptyList()
|
||||
return node.attributes.keys
|
||||
.filterIsInstance<ConnectionAttribute>()
|
||||
.map { node.attributes[it] }
|
||||
.filterIsInstance<NodeConnection>()
|
||||
}
|
||||
|
||||
public suspend fun StateGraph.outConnections(nodeId: NodeId): Collection<NodeConnection> =
|
||||
connections(nodeId).filter { connection -> connection.outNodeId == nodeId }
|
||||
|
||||
public suspend fun StateGraph.inConnections(nodeId: NodeId): Collection<NodeConnection> =
|
||||
connections(nodeId).filter { connection -> connection.inNodeId == nodeId }
|
||||
@@ -0,0 +1,97 @@
|
||||
package center.sciprog.workflow.state
|
||||
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import space.kscience.attributes.Attribute
|
||||
import space.kscience.attributes.SetAttribute
|
||||
|
||||
/**
|
||||
* Query constructor for a graph
|
||||
*/
|
||||
public sealed interface StateGraphQuery {
|
||||
/**
|
||||
* Select all nodes with a given type
|
||||
*/
|
||||
public data class ByType(val type: String) : StateGraphQuery
|
||||
|
||||
/**
|
||||
* Select nodes that adhere to all [queries]
|
||||
*/
|
||||
public data class AllOf(val queries: List<StateGraphQuery>) : StateGraphQuery
|
||||
|
||||
/**
|
||||
* Select nodes that adhere to one of [queries]
|
||||
*/
|
||||
public data class OneOf(val queries: List<StateGraphQuery>) : StateGraphQuery
|
||||
|
||||
/**
|
||||
* Filter nodes by predicate on attribute value
|
||||
*/
|
||||
public data class AttributeEquals<T>(public val attribute: Attribute<T>, public val value: T) : StateGraphQuery
|
||||
|
||||
/**
|
||||
* A query that selects nodes where a specific set attribute contains a given value.
|
||||
*
|
||||
* This query is used to filter state graph nodes based on whether an element exists
|
||||
* in the set associated with a specified attribute.
|
||||
*
|
||||
* @param attribute The set attribute to evaluate.
|
||||
* @param value The value to check for in the attribute's set.
|
||||
* @param T The type of elements within the set attribute.
|
||||
*/
|
||||
public data class AttributeContains<T>(public val attribute: SetAttribute<T>, public val value: T) : StateGraphQuery
|
||||
|
||||
// /**
|
||||
// * Represents a query that filters nodes in a state graph based on whether a specified attribute
|
||||
// * falls within a defined range of values.
|
||||
// *
|
||||
// * This query is particularly useful for selecting graph nodes where an attribute of a comparable type
|
||||
// * meets criteria defined by a closed range.
|
||||
// *
|
||||
// * @param T The type of the attribute, which must implement the [Comparable] interface.
|
||||
// * @property attribute The attribute used for comparison within the query.
|
||||
// * @property range The closed range used to filter nodes whose attribute values fall within this range.
|
||||
// */
|
||||
// public data class AttributeInRange<T: Comparable<T>>(public val attribute: Attribute<T>, public val range: ClosedRange<T>) : StateGraphQuery
|
||||
//
|
||||
// public data class ByQuery(val query: String) : StateGraphQuery
|
||||
}
|
||||
|
||||
public object StateGraphQueryScope
|
||||
|
||||
context(_: StateGraphQueryScope)
|
||||
public fun type(type: String): StateGraphQuery.ByType = StateGraphQuery.ByType(type)
|
||||
|
||||
context(_: StateGraphQueryScope)
|
||||
public fun <T> value(attribute: Attribute<T>, value: T): StateGraphQuery.AttributeEquals<T> =
|
||||
StateGraphQuery.AttributeEquals(attribute, value)
|
||||
|
||||
context(_: StateGraphQueryScope)
|
||||
public fun <T> contains(attribute: SetAttribute<T>, value: T): StateGraphQuery.AttributeContains<T> =
|
||||
StateGraphQuery.AttributeContains(attribute, value)
|
||||
|
||||
context(_: StateGraphQueryScope)
|
||||
public fun allOf(vararg queries: StateGraphQuery): StateGraphQuery.AllOf = StateGraphQuery.AllOf(queries.toList())
|
||||
|
||||
context(_: StateGraphQueryScope)
|
||||
public fun oneOf(vararg queries: StateGraphQuery): StateGraphQuery.OneOf = StateGraphQuery.OneOf(queries.toList())
|
||||
|
||||
|
||||
public interface StateNodeProvider {
|
||||
/**
|
||||
* Flow all node Ids. The order of ids is undefined
|
||||
*/
|
||||
public fun nodeIds(): Flow<NodeId>
|
||||
|
||||
/**
|
||||
* Read a single node if it exists
|
||||
*/
|
||||
public suspend operator fun get(id: NodeId): StateNode?
|
||||
|
||||
/**
|
||||
* Request a lazy flow of nodes with a given query. The order
|
||||
*/
|
||||
public suspend fun query(query: StateGraphQuery): Flow<StateNode>
|
||||
}
|
||||
|
||||
public suspend inline fun StateNodeProvider.query(queryBlock: context(StateGraphQueryScope) () -> StateGraphQuery): Flow<StateNode> =
|
||||
query(context(StateGraphQueryScope) { queryBlock() })
|
||||
@@ -0,0 +1,18 @@
|
||||
package center.sciprog.workflow.state
|
||||
|
||||
import space.kscience.attributes.Attribute
|
||||
import space.kscience.attributes.Attributes
|
||||
import kotlin.reflect.KProperty
|
||||
|
||||
public interface StateScheme {
|
||||
public val node: StateNode
|
||||
|
||||
public operator fun <T> Attribute<T>.getValue(thisRef: Any?, property: KProperty<*>): T? = node[this]
|
||||
}
|
||||
|
||||
context(graph: MutableStateGraph) public inline fun StateScheme.modify(
|
||||
block: context(MutableStateGraph) Attributes.() -> Unit
|
||||
) {
|
||||
block(graph, node.attributes)
|
||||
}
|
||||
|
||||
@@ -0,0 +1,35 @@
|
||||
package center.sciprog.workflow.state
|
||||
|
||||
import space.kscience.attributes.Attribute
|
||||
import space.kscience.attributes.Attributes
|
||||
|
||||
public inline fun <T : AutoCloseable> T.receive(block: T.() -> Unit) {
|
||||
use {
|
||||
with(it, block)
|
||||
}
|
||||
}
|
||||
|
||||
public operator fun <T> StateNode.get(attribute: Attribute<T>): T? = attributes[attribute]
|
||||
|
||||
|
||||
context(graph: MutableStateGraph)
|
||||
public suspend operator fun <T : Any> StateNode.set(attribute: Attribute<T>, value: T?) {
|
||||
if (value == null) {
|
||||
graph.removeAttribute(this, attribute)
|
||||
} else {
|
||||
graph.setAttribute(this, attribute, value)
|
||||
}
|
||||
}
|
||||
|
||||
context(graph: MutableStateGraph)
|
||||
public suspend fun StateNode.connectTo(
|
||||
other: StateNode,
|
||||
attribute: ConnectionAttribute,
|
||||
connectionType: NodeConnectionType,
|
||||
attributes: Attributes = Attributes.EMPTY
|
||||
): NodeConnection {
|
||||
val connection = SimpleNodeConnection(connectionType, id, other.id, attributes)
|
||||
set(attribute, connection)
|
||||
|
||||
return connection
|
||||
}
|
||||
37
workflow-pm/build.gradle.kts
Normal file
37
workflow-pm/build.gradle.kts
Normal file
@@ -0,0 +1,37 @@
|
||||
import space.kscience.gradle.Maturity
|
||||
|
||||
plugins {
|
||||
id("space.kscience.gradle.mpp")
|
||||
`maven-publish`
|
||||
}
|
||||
|
||||
description = "Project management workflows"
|
||||
|
||||
kscience {
|
||||
jvm()
|
||||
js()
|
||||
native()
|
||||
useCoroutines()
|
||||
useSerialization()
|
||||
useContextParameters()
|
||||
|
||||
commonMain {
|
||||
api(projects.workflowCore)
|
||||
api(libs.attributes.serialization)
|
||||
}
|
||||
|
||||
jvmTest {
|
||||
implementation(spclibs.logback.classic)
|
||||
}
|
||||
}
|
||||
|
||||
kotlin {
|
||||
compilerOptions {
|
||||
freeCompilerArgs.add("-Xcontext-parameters")
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
readme {
|
||||
maturity = Maturity.EXPERIMENTAL
|
||||
}
|
||||
41
workflow-pm/src/commonMain/kotlin/Obligation.kt
Normal file
41
workflow-pm/src/commonMain/kotlin/Obligation.kt
Normal file
@@ -0,0 +1,41 @@
|
||||
package center.sciprog.workflow.budget
|
||||
|
||||
import center.sciprog.workflow.WorkBuilder
|
||||
import center.sciprog.workflow.put
|
||||
import center.sciprog.workflow.state.StateNode
|
||||
import center.sciprog.workflow.state.StateScheme
|
||||
import kotlinx.serialization.Serializable
|
||||
import space.kscience.attributes.Attributes
|
||||
import space.kscience.attributes.AttributesBuilder
|
||||
import kotlin.time.Clock
|
||||
import kotlin.time.Instant
|
||||
|
||||
/**
|
||||
* Represents a financial obligation between two subjects.
|
||||
*
|
||||
* Positive obligation towards a subject means that it should receiver payment. Negative obligation towards a subject means that it should pay somebody.
|
||||
*/
|
||||
@Serializable
|
||||
public data class Obligation(
|
||||
public val toSubject: SubjectId,
|
||||
public val fromSubject: SubjectId?,
|
||||
public val amount: Money,
|
||||
override val attributes: Attributes,
|
||||
) : PmFinanceAction
|
||||
|
||||
public suspend fun WorkBuilder<PmAction>.obligation(
|
||||
fromSubject: SubjectId,
|
||||
toSubject: SubjectId,
|
||||
amount: Money,
|
||||
time: Instant = Clock.System.now(),
|
||||
attributesBuilder: AttributesBuilder<Payment>.() -> Unit = {},
|
||||
) {
|
||||
put(Obligation(toSubject, fromSubject, amount, Attributes(attributesBuilder)), time)
|
||||
}
|
||||
|
||||
|
||||
public class ObligationState(
|
||||
override val node: StateNode
|
||||
) : StateScheme {
|
||||
public val amount: Money? by AmountOfMoney
|
||||
}
|
||||
66
workflow-pm/src/commonMain/kotlin/ObligationFulfillment.kt
Normal file
66
workflow-pm/src/commonMain/kotlin/ObligationFulfillment.kt
Normal file
@@ -0,0 +1,66 @@
|
||||
package center.sciprog.workflow.budget
|
||||
|
||||
import center.sciprog.workflow.Work
|
||||
import center.sciprog.workflow.WorkFunction
|
||||
import kotlinx.coroutines.flow.fold
|
||||
import kotlin.time.Instant
|
||||
|
||||
/**
|
||||
* Compute obligation fulfillment for a given subject within a time frame, applying a filter function to finance actions.
|
||||
*
|
||||
* Positive number means that the subject is paid more than it owed. Negative number means that
|
||||
* obligations towards the subject are not fully covered.
|
||||
*/
|
||||
public class ObligationFulfillment(
|
||||
public val subject: Subject,
|
||||
public val timeFrame: ClosedRange<Instant>,
|
||||
public val valueExtractor: (Money) -> Double,
|
||||
private val filter: (PmFinanceAction) -> Boolean = { true },
|
||||
override val defaultBase: Double = 0.0,
|
||||
) : WorkFunction<Double, PmAction> {
|
||||
|
||||
override suspend fun compute(
|
||||
work: Work<PmAction>,
|
||||
base: Double
|
||||
): Double = work.flow().fold(base) { acc, value ->
|
||||
val action = value.action
|
||||
if (value.time in timeFrame && action is PmFinanceAction && filter(action)) {
|
||||
when(action) {
|
||||
is Payment -> {
|
||||
if (action.toSubject == subject) {
|
||||
acc + valueExtractor(action.amount)
|
||||
} else if (action.fromSubject == subject) {
|
||||
acc - valueExtractor(action.amount)
|
||||
} else {
|
||||
acc
|
||||
}
|
||||
}
|
||||
is Obligation -> {
|
||||
if (action.toSubject == subject) {
|
||||
acc - valueExtractor(action.amount)
|
||||
} else if (action.fromSubject == subject) {
|
||||
acc + valueExtractor(action.amount)
|
||||
} else {
|
||||
acc
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
acc
|
||||
}
|
||||
}
|
||||
|
||||
public companion object {
|
||||
public fun inRubles(
|
||||
subject: Subject,
|
||||
timeFrame: ClosedRange<Instant> = Instant.DISTANT_PAST..Instant.DISTANT_FUTURE,
|
||||
filter: (PmFinanceAction) -> Boolean = { true }
|
||||
): ObligationFulfillment = ObligationFulfillment(
|
||||
subject = subject,
|
||||
timeFrame = timeFrame,
|
||||
valueExtractor = { if (it.currency == Rubles) it.amount.toDouble() else TODO("No currency converter") },
|
||||
filter = filter
|
||||
)
|
||||
|
||||
}
|
||||
}
|
||||
42
workflow-pm/src/commonMain/kotlin/Payment.kt
Normal file
42
workflow-pm/src/commonMain/kotlin/Payment.kt
Normal file
@@ -0,0 +1,42 @@
|
||||
package center.sciprog.workflow.budget
|
||||
|
||||
import center.sciprog.workflow.WorkBuilder
|
||||
import center.sciprog.workflow.put
|
||||
import center.sciprog.workflow.state.StateNode
|
||||
import center.sciprog.workflow.state.StateScheme
|
||||
import kotlinx.serialization.Serializable
|
||||
import space.kscience.attributes.Attributes
|
||||
import space.kscience.attributes.AttributesBuilder
|
||||
import kotlin.time.Clock
|
||||
import kotlin.time.Instant
|
||||
|
||||
|
||||
/**
|
||||
* The transaction that has been executed. Some additional information could be encoded in [attributes].
|
||||
*
|
||||
* Positive payment towards a subject means that it received a payment. Negative payment towards a subject means that it paid to somebody.
|
||||
*/
|
||||
@Serializable
|
||||
public data class Payment(
|
||||
public val toSubject: SubjectId,
|
||||
public val fromSubject: SubjectId?,
|
||||
public val amount: Money,
|
||||
override val attributes: Attributes,
|
||||
) : PmFinanceAction
|
||||
|
||||
public suspend fun WorkBuilder<PmAction>.payment(
|
||||
fromSubject: SubjectId,
|
||||
toSubject: SubjectId,
|
||||
amount: Money,
|
||||
time: Instant = Clock.System.now(),
|
||||
attributesBuilder: AttributesBuilder<Payment>.() -> Unit = {},
|
||||
) {
|
||||
put(Payment(toSubject, fromSubject, amount, Attributes(attributesBuilder)), time)
|
||||
}
|
||||
|
||||
|
||||
public class PaymentState(
|
||||
override val node: StateNode
|
||||
) : StateScheme {
|
||||
public val amount: Money? by AmountOfMoney
|
||||
}
|
||||
49
workflow-pm/src/commonMain/kotlin/PaymentBalance.kt
Normal file
49
workflow-pm/src/commonMain/kotlin/PaymentBalance.kt
Normal file
@@ -0,0 +1,49 @@
|
||||
package center.sciprog.workflow.budget
|
||||
|
||||
import center.sciprog.workflow.Work
|
||||
import center.sciprog.workflow.WorkFunction
|
||||
import kotlinx.coroutines.flow.fold
|
||||
import kotlin.time.Instant
|
||||
|
||||
/**
|
||||
* Aggregated payments for one subject in given [timeFrame]
|
||||
*/
|
||||
public class PaymentBalance(
|
||||
public val subject: Subject,
|
||||
public val timeFrame: ClosedRange<Instant>,
|
||||
public val valueExtractor: (Money) -> Double,
|
||||
private val filter: (Payment) -> Boolean = { true },
|
||||
override val defaultBase: Double = 0.0,
|
||||
) : WorkFunction<Double, PmAction> {
|
||||
override suspend fun compute(
|
||||
work: Work<PmAction>,
|
||||
base: Double
|
||||
): Double = work.flow().fold(base) { acc, value ->
|
||||
val action = value.action
|
||||
if (value.time in timeFrame && action is Payment && filter(action)) {
|
||||
if (action.toSubject == subject) {
|
||||
acc + valueExtractor(action.amount)
|
||||
} else if (action.fromSubject == subject) {
|
||||
acc - valueExtractor(action.amount)
|
||||
} else {
|
||||
acc
|
||||
}
|
||||
} else {
|
||||
acc
|
||||
}
|
||||
}
|
||||
|
||||
public companion object {
|
||||
public fun inRubles(
|
||||
subject: Subject,
|
||||
timeFrame: ClosedRange<Instant> = Instant.DISTANT_PAST..Instant.DISTANT_FUTURE,
|
||||
filter: (Payment) -> Boolean = { true }
|
||||
): PaymentBalance = PaymentBalance(
|
||||
subject = subject,
|
||||
timeFrame = timeFrame,
|
||||
valueExtractor = { if (it.currency == Rubles) it.amount.toDouble() else TODO("No currency converter") },
|
||||
filter = filter
|
||||
)
|
||||
|
||||
}
|
||||
}
|
||||
9
workflow-pm/src/commonMain/kotlin/PmAction.kt
Normal file
9
workflow-pm/src/commonMain/kotlin/PmAction.kt
Normal file
@@ -0,0 +1,9 @@
|
||||
package center.sciprog.workflow.budget
|
||||
|
||||
import space.kscience.attributes.AttributeContainer
|
||||
|
||||
public sealed interface PmAction: AttributeContainer
|
||||
|
||||
public sealed interface PmFinanceAction: PmAction
|
||||
|
||||
public sealed interface PmProjectAction: PmAction
|
||||
34
workflow-pm/src/commonMain/kotlin/PmFunction.kt
Normal file
34
workflow-pm/src/commonMain/kotlin/PmFunction.kt
Normal file
@@ -0,0 +1,34 @@
|
||||
package center.sciprog.workflow.budget
|
||||
|
||||
import center.sciprog.workflow.WorkEvent
|
||||
import center.sciprog.workflow.state.*
|
||||
import kotlinx.coroutines.flow.single
|
||||
|
||||
public object PmFunction : StateWorkFunction<PmAction> {
|
||||
|
||||
|
||||
private suspend fun MutableStateGraph.getSubjectNode(id: SubjectId): StateNode = query {
|
||||
allOf(
|
||||
type(Subject.TYPE),
|
||||
value(SubjectIdAttribute, id)
|
||||
)
|
||||
}.single()
|
||||
|
||||
override val defaultBase: StateGraph get() = StateGraph.EMPTY
|
||||
|
||||
override suspend fun MutableStateGraph.processEvent(event: WorkEvent<PmAction>) {
|
||||
val action = event.action
|
||||
|
||||
when (action) {
|
||||
is Obligation -> {
|
||||
val toNode = getSubjectNode(action.toSubject)
|
||||
setAttribute(toNode, ObligationAttribute, action)
|
||||
}
|
||||
|
||||
is Payment -> {
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
package center.sciprog.workflow
|
||||
package center.sciprog.workflow.budget
|
||||
|
||||
import kotlinx.serialization.Serializable
|
||||
import space.kscience.attributes.serialization.SerializableAttribute
|
||||
|
||||
public interface Resource
|
||||
|
||||
@@ -14,4 +15,7 @@ public data class Money(
|
||||
val currency: Currency,
|
||||
) : Resource
|
||||
|
||||
|
||||
public object AmountOfMoney : SerializableAttribute<Money>("amount", Money.serializer())
|
||||
|
||||
public val Number.rubles: Money get() = Money(this, Rubles)
|
||||
48
workflow-pm/src/commonMain/kotlin/Subject.kt
Normal file
48
workflow-pm/src/commonMain/kotlin/Subject.kt
Normal file
@@ -0,0 +1,48 @@
|
||||
package center.sciprog.workflow.budget
|
||||
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.asFlow
|
||||
import kotlinx.coroutines.flow.map
|
||||
import kotlinx.serialization.Serializable
|
||||
import kotlinx.serialization.serializer
|
||||
import space.kscience.attributes.AttributeContainer
|
||||
import space.kscience.attributes.Attributes
|
||||
import space.kscience.attributes.serialization.SerializableAttribute
|
||||
|
||||
public typealias SubjectId = String
|
||||
|
||||
/**
|
||||
* Represents a subject in the workflow system, which can be either a person or an organization.
|
||||
*/
|
||||
public data class Subject(
|
||||
val id: SubjectId,
|
||||
public val name: String,
|
||||
override val attributes: Attributes = Attributes.EMPTY,
|
||||
) : AttributeContainer{
|
||||
public companion object{
|
||||
public const val TYPE: String = "subject"
|
||||
}
|
||||
}
|
||||
|
||||
public object SubjectName : SerializableAttribute<String>("subject.name", serializer())
|
||||
|
||||
public object SubjectIdAttribute : SerializableAttribute<String>("subject.id", serializer())
|
||||
|
||||
@Serializable
|
||||
public enum class SubjectType {
|
||||
PERSON, ORGANIZATION;
|
||||
|
||||
public companion object: SerializableAttribute<SubjectType>("subject.type", serializer())
|
||||
}
|
||||
|
||||
|
||||
public interface SubjectProvider {
|
||||
public suspend fun ids(): Flow<SubjectId>
|
||||
public suspend fun provide(subjectId: SubjectId): Subject?
|
||||
}
|
||||
|
||||
public class SubjectList(private val subjects: List<Subject>) : SubjectProvider {
|
||||
override suspend fun ids(): Flow<SubjectId> = subjects.asFlow().map { it.id }
|
||||
|
||||
override suspend fun provide(subjectId: SubjectId): Subject? = subjects.find { it.id == subjectId }
|
||||
}
|
||||
13
workflow-pm/src/commonMain/kotlin/attributes.kt
Normal file
13
workflow-pm/src/commonMain/kotlin/attributes.kt
Normal file
@@ -0,0 +1,13 @@
|
||||
package center.sciprog.workflow.budget
|
||||
|
||||
import space.kscience.attributes.serialization.SerializableAttribute
|
||||
|
||||
/**
|
||||
* The project id this work attributed to
|
||||
*/
|
||||
public object ProjectId : SerializableAttribute<String>("project", kotlinx.serialization.serializer())
|
||||
|
||||
/**
|
||||
* The funding source for this work
|
||||
*/
|
||||
public object SourceId: SerializableAttribute<String>("source", kotlinx.serialization.serializer())
|
||||
@@ -1,4 +1,4 @@
|
||||
package center.sciprog.workflow
|
||||
package center.sciprog.workflow.budget
|
||||
|
||||
import kotlinx.serialization.KSerializer
|
||||
import kotlinx.serialization.builtins.serializer
|
||||
@@ -1,25 +1,26 @@
|
||||
package center.sciprog.workflow
|
||||
package center.sciprog.workflow.budget
|
||||
|
||||
import center.sciprog.workflow.WorkList
|
||||
import kotlinx.coroutines.test.runTest
|
||||
import kotlinx.datetime.*
|
||||
import kotlin.time.Clock
|
||||
import kotlin.test.Test
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.time.Duration.Companion.days
|
||||
|
||||
internal class WorkFunctionTest {
|
||||
@Test
|
||||
fun testPayments() = runTest{
|
||||
val person = Person("test","Test Subject")
|
||||
fun testPayments() = runTest {
|
||||
val person = Person("test", "Test Subject")
|
||||
|
||||
val org = Organization("Master", "Master organization")
|
||||
|
||||
val work = WorkList {
|
||||
val now = Clock.System.now()
|
||||
repeat(10) {
|
||||
transaction(org, person, 1000.rubles, time = now.minus((it*30).days))
|
||||
payment(org, person, 1000.rubles, time = now.minus((it * 30).days))
|
||||
}
|
||||
}
|
||||
|
||||
assertEquals(10000.0, Payments.inRubles(person).compute(work), 1.0)
|
||||
assertEquals(10000.0, PaymentBalance.inRubles(person).compute(work), 1.0)
|
||||
}
|
||||
}
|
||||
32
workflow-tinkerpop/build.gradle.kts
Normal file
32
workflow-tinkerpop/build.gradle.kts
Normal file
@@ -0,0 +1,32 @@
|
||||
import space.kscience.gradle.Maturity
|
||||
|
||||
plugins {
|
||||
id("space.kscience.gradle.mpp")
|
||||
`maven-publish`
|
||||
}
|
||||
|
||||
kscience {
|
||||
jvm()
|
||||
useCoroutines()
|
||||
useSerialization()
|
||||
|
||||
jvmMain {
|
||||
api(projects.workflowCore)
|
||||
api(libs.gremlin.core)
|
||||
}
|
||||
|
||||
jvmTest {
|
||||
implementation(spclibs.logback.classic)
|
||||
implementation(libs.tinkergraph.gremlin)
|
||||
}
|
||||
}
|
||||
|
||||
kotlin {
|
||||
compilerOptions {
|
||||
freeCompilerArgs.add("-Xcontext-parameters")
|
||||
}
|
||||
}
|
||||
|
||||
readme {
|
||||
maturity = Maturity.EXPERIMENTAL
|
||||
}
|
||||
@@ -0,0 +1,241 @@
|
||||
package center.sciprog.workflow.state
|
||||
|
||||
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal
|
||||
import org.apache.tinkerpop.gremlin.structure.Direction
|
||||
import org.apache.tinkerpop.gremlin.structure.Edge
|
||||
import org.apache.tinkerpop.gremlin.structure.Element
|
||||
import org.apache.tinkerpop.gremlin.structure.Vertex
|
||||
import space.kscience.attributes.Attribute
|
||||
import space.kscience.attributes.Attributes
|
||||
import space.kscience.attributes.UnsafeAPI
|
||||
import space.kscience.attributes.keys
|
||||
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__ as TinkerPop
|
||||
|
||||
/**
|
||||
* Interface for accessing attributes on TinkerPop elements (vertices, edges, etc.)
|
||||
*/
|
||||
public interface TinkerAttributeAccessor<T> {
|
||||
/**
|
||||
* Key of corresponding edge or VertexAttribute
|
||||
*/
|
||||
public val key: String
|
||||
|
||||
public val attribute: Attribute<T>
|
||||
|
||||
public fun Element.readAttribute(): T?
|
||||
|
||||
public fun Element.writeAttribute(value: T?)
|
||||
|
||||
public fun Element.writeAttributeFrom(attributes: Attributes): Unit = writeAttribute(attributes[attribute])
|
||||
|
||||
public fun <E : Element> GraphTraversal<*, E>.readAttribute(): GraphTraversal<*, T?> =
|
||||
map { it.get().readAttribute() }
|
||||
|
||||
public fun <E : Element> GraphTraversal<*, E>.writeAttribute(
|
||||
attribute: Attribute<T>,
|
||||
value: T?
|
||||
): GraphTraversal<*, E>
|
||||
|
||||
public fun <E : Element> GraphTraversal<*, E>.writeAttributeFrom(attributes: Attributes): GraphTraversal<*, E> =
|
||||
writeAttribute(attribute, attributes[attribute])
|
||||
}
|
||||
|
||||
public fun Element.writeAttributes(
|
||||
registry: TinkerRegistry,
|
||||
attributes: Attributes
|
||||
) {
|
||||
attributes.keys.forEach { attribute ->
|
||||
val writer = registry.accessorByAttribute(attribute)
|
||||
with(writer) {
|
||||
writeAttributeFrom(attributes)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public fun <E : Element> GraphTraversal<*, E>.writeAttributes(
|
||||
registry: TinkerRegistry,
|
||||
attributes: Attributes
|
||||
): GraphTraversal<*, E> {
|
||||
var traversal = this
|
||||
attributes.keys.forEach { attribute ->
|
||||
val writer = registry.accessorByAttribute(attribute)
|
||||
with(writer) {
|
||||
traversal = traversal.writeAttributeFrom(attributes)
|
||||
}
|
||||
}
|
||||
return traversal
|
||||
}
|
||||
|
||||
/**
|
||||
* A [TinkerAttributeAccessor] implementation that assumes
|
||||
*/
|
||||
public class SimpleTinkerAttributeAccessor<T>(
|
||||
override val key: String,
|
||||
override val attribute: Attribute<T>
|
||||
) : TinkerAttributeAccessor<T> {
|
||||
|
||||
override fun Element.readAttribute(): T? = property<T>(key).orElse(null)
|
||||
|
||||
override fun Element.writeAttribute(value: T?) {
|
||||
property(key, value)
|
||||
}
|
||||
|
||||
override fun <E : Element> GraphTraversal<*, E>.readAttribute(): GraphTraversal<*, T?> = valueOrNull(key)
|
||||
|
||||
override fun <E : Element> GraphTraversal<*, E>.writeAttribute(
|
||||
attribute: Attribute<T>,
|
||||
value: T?
|
||||
): GraphTraversal<*, E> = property(key, value)
|
||||
}
|
||||
|
||||
public interface TinkerConnectionAttributeAccessor<T : NodeConnection> : TinkerAttributeAccessor<T> {
|
||||
|
||||
public fun GraphTraversal<*, Edge>.readEdge(): GraphTraversal<*, T?>
|
||||
|
||||
/**
|
||||
* Update edge after creation
|
||||
*/
|
||||
public fun GraphTraversal<*, Edge>.writeEdgeAttributes(connection: T): GraphTraversal<*, Edge>
|
||||
|
||||
override fun <E : Element> GraphTraversal<*, E>.readAttribute(): GraphTraversal<*, T?> =
|
||||
bothE(key).readEdge().orNull()
|
||||
|
||||
override fun <E : Element> GraphTraversal<*, E>.writeAttribute(
|
||||
attribute: Attribute<T>,
|
||||
value: T?
|
||||
): GraphTraversal<*, E> = if (value == null) {
|
||||
sideEffect(
|
||||
bothE(key).drop()
|
||||
)
|
||||
} else {
|
||||
sideEffect(
|
||||
TinkerPop.addE<Vertex>(key).from(V(value.inNodeId)).to(V(value.outNodeId)).writeEdgeAttributes(value)
|
||||
)
|
||||
}
|
||||
|
||||
override fun <E : Element> GraphTraversal<*, E>.writeAttributeFrom(
|
||||
attributes: Attributes
|
||||
): GraphTraversal<*, E> = writeAttribute(attribute, attributes[attribute])
|
||||
}
|
||||
|
||||
public class SimpleTinkerConnectionAttributeAccessor(
|
||||
override val key: String,
|
||||
override val attribute: Attribute<NodeConnection>,
|
||||
private val registry: TinkerRegistry,
|
||||
) : TinkerConnectionAttributeAccessor<NodeConnection> {
|
||||
|
||||
override fun Element.readAttribute(): SimpleNodeConnection? {
|
||||
val vertex = this as? Vertex ?: error("Expected vertex but got $this")
|
||||
val edge = vertex.edges(Direction.BOTH, key).asSequence().singleOrNull() ?: return null
|
||||
|
||||
return SimpleNodeConnection(
|
||||
edge.label(),
|
||||
edge.inVertex().id().toString(),
|
||||
edge.outVertex().id().toString(),
|
||||
TinkerAttributes(edge, registry)
|
||||
)
|
||||
}
|
||||
|
||||
override fun Element.writeAttribute(value: NodeConnection?) {
|
||||
val vertex = this as? Vertex ?: error("Expected vertex but got $this")
|
||||
if (value == null) {
|
||||
vertex.edges(Direction.BOTH, key).forEach { it.remove() }
|
||||
} else {
|
||||
if (value.inNodeId == vertex.id().toString()) {
|
||||
val inNode = vertex
|
||||
val outNode = vertex.graph().vertices(value.outNodeId).asSequence().single()
|
||||
outNode.addEdge(key, inNode).apply {
|
||||
writeAttributes(registry, value.attributes)
|
||||
}
|
||||
} else if (value.outNodeId == vertex.id().toString()) {
|
||||
val inNode = vertex.graph().vertices(value.inNodeId).asSequence().single()
|
||||
val outNode = vertex
|
||||
outNode.addEdge(key, inNode).apply {
|
||||
writeAttributes(registry, value.attributes)
|
||||
}
|
||||
} else {
|
||||
error("Connection $value is not attached to vertex $vertex")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun GraphTraversal<*, Edge>.readEdge(): GraphTraversal<*, NodeConnection?> = map {
|
||||
val edge = it.get()
|
||||
SimpleNodeConnection(
|
||||
edge.label(),
|
||||
edge.inVertex().id().toString(),
|
||||
edge.outVertex().id().toString(),
|
||||
TinkerAttributes(edge, registry)
|
||||
)
|
||||
}
|
||||
|
||||
override fun GraphTraversal<*, Edge>.writeEdgeAttributes(
|
||||
connection: NodeConnection
|
||||
): GraphTraversal<*, Edge> = writeAttributes(registry, connection.attributes)
|
||||
|
||||
}
|
||||
|
||||
@OptIn(UnsafeAPI::class)
|
||||
public class TinkerAttributes(
|
||||
public val element: Element,
|
||||
private val registry: TinkerRegistry
|
||||
) : Attributes {
|
||||
|
||||
private fun propertyContent() = element.keys().mapNotNull {
|
||||
registry.accessorByLabel(it)
|
||||
}.associate { accessor ->
|
||||
with(accessor) {
|
||||
accessor.attribute to element.readAttribute()
|
||||
}
|
||||
}
|
||||
|
||||
private fun edges(): Map<Attribute<*>, Any?> {
|
||||
val vertex = element as? Vertex ?: return emptyMap()
|
||||
|
||||
return element.edges(Direction.BOTH).asSequence().associate { edge ->
|
||||
val accessor = registry.accessorByLabel(edge.label()) as TinkerConnectionAttributeAccessor
|
||||
|
||||
with(accessor){
|
||||
accessor.attribute to vertex.readAttribute()
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
override val content: Map<out Attribute<*>, Any?>
|
||||
get() = edges() + propertyContent()
|
||||
|
||||
override fun toString(): String = "TinkerAttributes(element=$element)"
|
||||
|
||||
override fun equals(other: Any?): Boolean {
|
||||
if (this === other) return true
|
||||
if (javaClass != other?.javaClass) return false
|
||||
|
||||
other as TinkerAttributes
|
||||
|
||||
if (element != other.element) return false
|
||||
if (registry != other.registry) return false
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
override fun hashCode(): Int {
|
||||
var result = element.hashCode()
|
||||
result = 31 * result + registry.hashCode()
|
||||
return result
|
||||
}
|
||||
|
||||
public companion object
|
||||
}
|
||||
|
||||
public fun <E, T> GraphTraversal<E, T>.orNull(): GraphTraversal<E, T?> = choose(
|
||||
TinkerPop.unfold<T>(),
|
||||
TinkerPop.identity<T>(),
|
||||
TinkerPop.constant(null)
|
||||
)
|
||||
|
||||
public fun <E, T> GraphTraversal<E, *>.valueOrNull(propertyKey: String): GraphTraversal<E, T?> = choose<T>(
|
||||
TinkerPop.has<Vertex>(propertyKey),
|
||||
TinkerPop.values<Vertex, T>(propertyKey),
|
||||
TinkerPop.constant(null)
|
||||
)
|
||||
@@ -0,0 +1,49 @@
|
||||
package center.sciprog.workflow.state
|
||||
|
||||
import space.kscience.attributes.Attribute
|
||||
|
||||
/**
|
||||
* Interface representing a registry for managing access to attributes within a TinkerPop graph.
|
||||
* The registry provides mechanisms to retrieve attribute accessors either by label or directly by attribute.
|
||||
*/
|
||||
public interface TinkerRegistry {
|
||||
public fun accessorByLabel(label: String): TinkerAttributeAccessor<*>
|
||||
public fun <T> accessorByAttribute(attribute: Attribute<T>): TinkerAttributeAccessor<T>
|
||||
}
|
||||
|
||||
private class SimpleTinkerRegistry(val accessors: Set<TinkerAttributeAccessor<*>>) : TinkerRegistry {
|
||||
override fun accessorByLabel(label: String): TinkerAttributeAccessor<*> = accessors.single { it.key == label }
|
||||
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
override fun <T> accessorByAttribute(attribute: Attribute<T>): TinkerAttributeAccessor<T> = accessors.single {
|
||||
it.attribute == attribute
|
||||
} as TinkerAttributeAccessor<T>
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs a TinkerRegistry with the provided set of attribute accessors.
|
||||
*
|
||||
* @param accessors A set of TinkerAttributeAccessor objects that define the mapping
|
||||
* between graph elements and their corresponding attributes.
|
||||
* @return A TinkerRegistry instance that maps attributes to their corresponding keys or labels.
|
||||
*/
|
||||
public fun TinkerRegistry(
|
||||
accessors: Set<TinkerAttributeAccessor<*>>
|
||||
): TinkerRegistry = SimpleTinkerRegistry(accessors)
|
||||
|
||||
public fun <T> MutableSet<TinkerAttributeAccessor<*>>.attribute(
|
||||
key: String,
|
||||
attribute: Attribute<T>
|
||||
) {
|
||||
if(attribute is ConnectionAttribute){
|
||||
add(SimpleTinkerConnectionAttributeAccessor(key, attribute, TinkerRegistry(this)))
|
||||
} else {
|
||||
add(SimpleTinkerAttributeAccessor(key, attribute))
|
||||
}
|
||||
}
|
||||
|
||||
public fun TinkerRegistry(
|
||||
registryBuilder: MutableSet<TinkerAttributeAccessor<*>>.() -> Unit
|
||||
): TinkerRegistry = TinkerRegistry(buildSet(registryBuilder))
|
||||
|
||||
@@ -0,0 +1,234 @@
|
||||
package center.sciprog.workflow.state
|
||||
|
||||
import kotlinx.coroutines.flow.*
|
||||
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal
|
||||
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource
|
||||
import org.apache.tinkerpop.gremlin.structure.Vertex
|
||||
import space.kscience.attributes.Attribute
|
||||
import space.kscience.attributes.Attributes
|
||||
import kotlin.jvm.optionals.getOrNull
|
||||
import kotlin.time.Clock
|
||||
import kotlin.time.Instant
|
||||
|
||||
/**
|
||||
* Represents a node in a TinkerPop graph, encapsulating a TinkerPop vertex and providing access to its attributes.
|
||||
*/
|
||||
public class TinkerNode(
|
||||
private val vertex: Vertex,
|
||||
private val registry: TinkerRegistry
|
||||
) : StateNode {
|
||||
|
||||
override val id: NodeId get() = vertex.id().toString()
|
||||
|
||||
override val type: NodeType get() = vertex.label()
|
||||
|
||||
override val attributes: Attributes get() = TinkerAttributes(vertex, registry)
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents a TinkerPop-based state graph, providing access to nodes and events within the graph.
|
||||
* This class integrates with TinkerPop's graph traversal source to manage and query the graph.
|
||||
*/
|
||||
public class TinkerStateGraph(
|
||||
private val ts: GraphTraversalSource,
|
||||
private val rootNodeId: NodeId,
|
||||
public val registry: TinkerRegistry,
|
||||
override val clock: Clock = Clock.System,
|
||||
) : MutableStateGraph, AutoCloseable by ts.graph {
|
||||
|
||||
public companion object {
|
||||
private const val HOLD_EDGE_LABEL = "holds"
|
||||
}
|
||||
|
||||
private val sharedEventFlow = MutableSharedFlow<GraphEvent>()
|
||||
|
||||
private suspend fun emitEvent(event: GraphEvent) {
|
||||
sharedEventFlow.emit(event)
|
||||
}
|
||||
|
||||
private fun findVertex(elementId: NodeId): Vertex? = ts.V(elementId).tryNext().getOrNull()
|
||||
|
||||
override fun nodeIds(): Flow<NodeId> =
|
||||
ts.V(rootNodeId).outE(HOLD_EDGE_LABEL).inV().id().asFlow().map { it.toString() }
|
||||
|
||||
override suspend fun get(id: NodeId): StateNode? = findVertex(id)?.let { TinkerNode(it, registry) }
|
||||
|
||||
// @OptIn(ExperimentalUuidApi::class)
|
||||
// private fun generateEventId() = Uuid.random().toHexString()
|
||||
|
||||
override suspend fun createNode(
|
||||
nodeType: NodeType,
|
||||
time: Instant,
|
||||
attributes: Attributes
|
||||
): StateNode {
|
||||
val vertex = ts.V(rootNodeId).`as`("root")
|
||||
.addV(nodeType).writeAttributes(registry, attributes).`as`("newVertex")
|
||||
.addE(HOLD_EDGE_LABEL).from("root").to("newVertex")
|
||||
.select<Vertex>("root").next()
|
||||
|
||||
val newNode = TinkerNode(vertex, registry)
|
||||
|
||||
emitEvent(
|
||||
GraphEvent.NodeChanged(
|
||||
nodeId = newNode.id,
|
||||
nodeBefore = null,
|
||||
nodeAfter = newNode,
|
||||
time = time
|
||||
)
|
||||
)
|
||||
|
||||
return newNode
|
||||
}
|
||||
|
||||
override suspend fun removeNode(nodeId: NodeId, time: Instant): StateNode? {
|
||||
val vertex = findVertex(nodeId) ?: return null
|
||||
vertex.remove()
|
||||
|
||||
emitEvent(
|
||||
GraphEvent.NodeChanged(
|
||||
nodeId = nodeId,
|
||||
nodeBefore = TinkerNode(vertex, registry),
|
||||
nodeAfter = null,
|
||||
time = time
|
||||
)
|
||||
)
|
||||
return TinkerNode(vertex, registry)
|
||||
}
|
||||
|
||||
|
||||
override suspend fun <T> setAttribute(
|
||||
nodeId: NodeId,
|
||||
attribute: Attribute<T>,
|
||||
value: T,
|
||||
time: Instant
|
||||
): Unit = with(registry.accessorByAttribute(attribute)) {
|
||||
val node = ts.V(nodeId).tryNext().orElseThrow()
|
||||
val oldValue = node.readAttribute()
|
||||
|
||||
// val oldValue = ts.V(nodeId)
|
||||
// .`as`("node")
|
||||
// .readAttribute()
|
||||
// .choose<T>(
|
||||
// P.neq(null),
|
||||
// sideEffect(
|
||||
// TinkerPop.select<Vertex>("node").writeAttribute(attribute, value)
|
||||
// )
|
||||
// ).next()
|
||||
|
||||
|
||||
if (oldValue != value) {
|
||||
node.writeAttribute(value)
|
||||
emitEvent(
|
||||
GraphEvent.AttributeChanged<T>(
|
||||
nodeId = nodeId,
|
||||
attribute = attribute,
|
||||
valueBefore = oldValue,
|
||||
valueAfter = value,
|
||||
time = time
|
||||
)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun <T> removeAttribute(
|
||||
nodeId: NodeId,
|
||||
attribute: Attribute<T>,
|
||||
time: Instant
|
||||
): Unit = with(registry.accessorByAttribute(attribute)) {
|
||||
val node = ts.V(nodeId).tryNext().orElseThrow()
|
||||
val oldValue = node.readAttribute()
|
||||
// val oldValue = ts.V(nodeId)
|
||||
// .readAttribute()
|
||||
// .choose<T>(
|
||||
// P.neq(null),
|
||||
// sideEffect(
|
||||
// TinkerPop.V<Vertex>(nodeId).writeAttribute(attribute, null)
|
||||
// )
|
||||
// ).next()
|
||||
|
||||
if (oldValue != null) {
|
||||
node.writeAttribute(null)
|
||||
emitEvent(
|
||||
GraphEvent.AttributeChanged(
|
||||
nodeId = nodeId,
|
||||
attribute = attribute,
|
||||
valueBefore = oldValue,
|
||||
valueAfter = null,
|
||||
time = time
|
||||
)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun snapshot(): InMemoryStateGraph {
|
||||
val nodes = nodeIds().map { get(it)!! }.toSet()
|
||||
return InMemoryStateGraph(nodes, clock)
|
||||
}
|
||||
|
||||
override suspend fun project(block: suspend MutableStateGraph.() -> Unit): StateGraph = snapshot().apply { block() }
|
||||
|
||||
override suspend fun transform(block: suspend MutableStateGraph.() -> Unit) {
|
||||
val graph = ts.graph
|
||||
if (!graph.features().graph().supportsTransactions()) {
|
||||
error("Transactions not supported by graph")
|
||||
}
|
||||
|
||||
val transaction = graph.tx()
|
||||
if (transaction.isOpen) error("Transaction already open")
|
||||
|
||||
try {
|
||||
val txs: GraphTraversalSource = transaction.begin()
|
||||
TinkerStateGraph(txs, rootNodeId, registry, clock).block()
|
||||
transaction.commit()
|
||||
} catch (e: Exception) {
|
||||
// TODO add logging
|
||||
transaction.rollback()
|
||||
}
|
||||
}
|
||||
|
||||
override val history: SharedFlow<GraphEvent> get() = sharedEventFlow
|
||||
|
||||
|
||||
override suspend fun query(query: StateGraphQuery): Flow<StateNode> {
|
||||
|
||||
fun GraphTraversal<*, Vertex>.query(
|
||||
query: StateGraphQuery
|
||||
): GraphTraversal<*, Vertex> = when (query) {
|
||||
is StateGraphQuery.OneOf -> or(
|
||||
*query.queries.map { query(it) }.toTypedArray()
|
||||
)
|
||||
|
||||
is StateGraphQuery.AllOf -> and(
|
||||
*query.queries.map { query(it) }.toTypedArray()
|
||||
)
|
||||
|
||||
is StateGraphQuery.ByType -> hasLabel(query.type)
|
||||
is StateGraphQuery.AttributeContains<*> -> TODO()
|
||||
is StateGraphQuery.AttributeEquals<*> -> {
|
||||
val accessor = registry.accessorByAttribute(query.attribute)
|
||||
with(registry.accessorByAttribute(query.attribute)) {
|
||||
has(accessor.key).where(readAttribute().`is`(query.value))
|
||||
}
|
||||
}
|
||||
//
|
||||
// is StateGraphQuery.AttributeInRange<*> -> {
|
||||
// val accessor = registry.attributePipeByAttribute(query.attribute)
|
||||
// with(registry.attributePipeByAttribute(query.attribute)) {
|
||||
// has(accessor.key).where(
|
||||
// readAttribute().filter(
|
||||
// P.between(
|
||||
// query.range.start,
|
||||
// query.range.endInclusive
|
||||
// )
|
||||
// )
|
||||
// )
|
||||
// }
|
||||
// }
|
||||
|
||||
}
|
||||
|
||||
return ts.V().query(query).asFlow().map {
|
||||
TinkerNode(it, registry)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,87 @@
|
||||
package center.sciprog.workflow.state
|
||||
|
||||
import kotlinx.coroutines.test.runTest
|
||||
import org.apache.commons.configuration2.BaseConfiguration
|
||||
import org.apache.commons.configuration2.Configuration
|
||||
import org.apache.tinkerpop.gremlin.tinkergraph.structure.AbstractTinkerGraph
|
||||
import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph
|
||||
import space.kscience.attributes.Attribute
|
||||
import kotlin.test.Test
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
internal class AttributesTest {
|
||||
|
||||
object AttributeA : Attribute<Double>
|
||||
object AttributeB : Attribute<String>
|
||||
object CA : ConnectionAttribute
|
||||
|
||||
private val registry = TinkerRegistry {
|
||||
attribute("a", AttributeA)
|
||||
attribute("b", AttributeB)
|
||||
attribute("connection", CA)
|
||||
}
|
||||
|
||||
val conf: Configuration = BaseConfiguration().apply {
|
||||
setProperty(
|
||||
TinkerGraph.GREMLIN_TINKERGRAPH_VERTEX_ID_MANAGER,
|
||||
AbstractTinkerGraph.DefaultIdManager.LONG.name
|
||||
)
|
||||
setProperty(
|
||||
TinkerGraph.GREMLIN_TINKERGRAPH_EDGE_ID_MANAGER,
|
||||
AbstractTinkerGraph.DefaultIdManager.LONG.name
|
||||
)
|
||||
setProperty(
|
||||
TinkerGraph.GREMLIN_TINKERGRAPH_VERTEX_PROPERTY_ID_MANAGER,
|
||||
AbstractTinkerGraph.DefaultIdManager.LONG.name
|
||||
)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun tinkerAttributesWriteRead() = runTest {
|
||||
|
||||
TinkerGraph.open(conf).use { graph ->
|
||||
|
||||
val rootId = graph.traversal().addV().id().next().toString()
|
||||
|
||||
TinkerStateGraph(graph.traversal(), rootId, registry).receive {
|
||||
val node1 = createNode("test1")
|
||||
|
||||
val node2 = createNode("test1")
|
||||
|
||||
node1[AttributeA] = 1.0
|
||||
|
||||
node1[AttributeB] = "test"
|
||||
|
||||
assertEquals(1.0, node1[AttributeA])
|
||||
|
||||
node1[AttributeA] = null
|
||||
|
||||
assertEquals(null, node1[AttributeA])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun connectionAttributesWriteRead() = runTest {
|
||||
|
||||
TinkerGraph.open(conf).use { graph ->
|
||||
|
||||
val rootId = graph.traversal().addV().id().next().toString()
|
||||
|
||||
|
||||
TinkerStateGraph(graph.traversal(), rootId, registry).receive {
|
||||
val node1 = createNode("test1")
|
||||
|
||||
val node2 = createNode("test1")
|
||||
|
||||
node1.connectTo(node2, CA, "testConnection")
|
||||
|
||||
assertEquals(node2.id, node1[CA]?.outNodeId)
|
||||
|
||||
assertEquals(node1.id, node2[CA]?.inNodeId)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user