Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 20dfb48a10 | |||
| 8c3b5a2d2b | |||
| c85107cbf6 | |||
| b34bc80b5f | |||
| 63aed3da14 | |||
| 12d2a6a460 |
70
README.md
Normal file
70
README.md
Normal file
@@ -0,0 +1,70 @@
|
||||
# WorkFlow: industrial process simulation and monitoring tool
|
||||
|
||||
## Aim
|
||||
|
||||
The aim of this work is to create a toolset to monitor and simulate industrial processes like production and processing plants,
|
||||
resource management for project-based teams, budgeting in complex cases, etc.
|
||||
|
||||
The model could be used for:
|
||||
* project monitoring;
|
||||
* project optimisation;
|
||||
* project automation;
|
||||
|
||||
## Not-aim
|
||||
|
||||
Creating GUI is currently out of scope.
|
||||
|
||||
Technical process simulation is done with Controls-kt.
|
||||
|
||||
Project strategy management is currently out of scope.
|
||||
|
||||
## Concepts
|
||||
|
||||
The project is based on several main constructs:
|
||||
|
||||
### Resource/object graph
|
||||
|
||||
The current state of the system is described as a graph structure where each node is a separate entity. Each node has a type
|
||||
and unique ID. Some node types could also have **parameters**, that could not be changed after node creation.
|
||||
|
||||
Node can have any number of **attributes** (they could be added, removed and replaced). Attributes define the current node state.
|
||||
|
||||
Node can have external descriptor that stores meta-information about the node. Like access rights, visualization preferences, etc.
|
||||
|
||||
### Attributes
|
||||
|
||||
Attributes are strongly typed containers for values. An attribute key stores information about attribute name and attribute type.
|
||||
Also, it could store information about default values and serialization rules.
|
||||
|
||||
Some attributes are **relations** that connect two nodes with specific directed connection. Currently, we use only one-to-one relations.
|
||||
|
||||
**To be discussed:** attributes could have their own attributes either defined in themselves, or in schema.
|
||||
|
||||
### Work
|
||||
|
||||
Work is a list of events that contains rules to modify object graph. Basically it has the following events:
|
||||
* add node;
|
||||
* remove node;
|
||||
* modify attribute (add/remove/replace);
|
||||
|
||||
Work also could have events that do not modify resource graph but still should be monitored. Like change of external balance.
|
||||
|
||||
Work events are consumed by a work graph with respect to attached **processes**. It is possible that the process rejects node change.
|
||||
It is also possible that the process adds new node changes based on the graph state.
|
||||
|
||||
**To be discussed:** In case of event rejection, part of work could still be completed based on rules and event relations.
|
||||
The specific rules should be further discussed.
|
||||
|
||||
### Process
|
||||
|
||||
A "smart-contract" (no blockchain yet) that defines automatic changes to object graph based on **work** being done.
|
||||
|
||||
### History
|
||||
|
||||
The log of all changes in object graph. It could be used to construct "time machine" and reverse the state of the graph to any state in the past.
|
||||
The history events mirror **work** events.
|
||||
|
||||
## Further discussion
|
||||
|
||||
Time management is out of the scope for now, but it should be considered in the future. For example, there should be possibility to
|
||||
add delay to the process and allow it to propagate in a virtual time. Timeline from *simulations-kt* could be used for that.
|
||||
@@ -4,6 +4,3 @@ plugins {
|
||||
|
||||
group = "center.sciprog"
|
||||
version = "0.1.0"
|
||||
|
||||
|
||||
val attributesVersion: String by extra("0.1.0")
|
||||
@@ -1,3 +1,3 @@
|
||||
kotlin.code.style=official
|
||||
|
||||
toolsVersion=0.15.2-kotlin-1.9.22
|
||||
toolsVersion=0.17.1-kotlin-2.1.20
|
||||
|
||||
9
gradle/libs.versions.toml
Normal file
9
gradle/libs.versions.toml
Normal file
@@ -0,0 +1,9 @@
|
||||
[versions]
|
||||
|
||||
tinkerpop = "3.7.2"
|
||||
|
||||
|
||||
[libraries]
|
||||
attributes = "space.kscience:attributes-kt:0.3.0"
|
||||
gremlin-core = { module = "org.apache.tinkerpop:gremlin-core", version.ref = "tinkerpop" }
|
||||
tinkergraph-gremlin = { module = "org.apache.tinkerpop:tinkergraph-gremlin", version.ref = "tinkerpop" }
|
||||
@@ -39,4 +39,9 @@ dependencyResolutionManagement {
|
||||
}
|
||||
}
|
||||
|
||||
include(":workflow-core")
|
||||
include(
|
||||
":workflow-core",
|
||||
":workflow-budget",
|
||||
":workflow-state",
|
||||
":workflow-tinkerpop",
|
||||
)
|
||||
28
workflow-budget/build.gradle.kts
Normal file
28
workflow-budget/build.gradle.kts
Normal file
@@ -0,0 +1,28 @@
|
||||
import space.kscience.gradle.Maturity
|
||||
|
||||
plugins {
|
||||
id("space.kscience.gradle.mpp")
|
||||
`maven-publish`
|
||||
}
|
||||
|
||||
val attributesVersion: String by rootProject.extra
|
||||
|
||||
kscience {
|
||||
jvm()
|
||||
js()
|
||||
native()
|
||||
useCoroutines()
|
||||
useSerialization()
|
||||
commonMain {
|
||||
api(projects.workflowCore)
|
||||
}
|
||||
|
||||
jvmTest {
|
||||
implementation(spclibs.logback.classic)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
readme {
|
||||
maturity = Maturity.EXPERIMENTAL
|
||||
}
|
||||
33
workflow-budget/src/commonMain/kotlin/Payments.kt
Normal file
33
workflow-budget/src/commonMain/kotlin/Payments.kt
Normal file
@@ -0,0 +1,33 @@
|
||||
package center.sciprog.workflow.budget
|
||||
|
||||
import center.sciprog.workflow.Work
|
||||
import center.sciprog.workflow.WorkFunction
|
||||
import kotlinx.coroutines.flow.filter
|
||||
import kotlinx.coroutines.flow.filterIsInstance
|
||||
import kotlinx.coroutines.flow.fold
|
||||
import kotlinx.datetime.Instant
|
||||
|
||||
public class Payments(
|
||||
public val subject: Subject,
|
||||
public val timeFrame: ClosedRange<Instant>,
|
||||
public val valueExtractor: (Money) -> Double,
|
||||
override val defaultBase: Double = 0.0,
|
||||
) : WorkFunction<Double> {
|
||||
override suspend fun compute(work: Work, base: Double): Double {
|
||||
return work.flow().filterIsInstance<ExecutedTransaction>().filter {
|
||||
it.toSubject == subject && it.time in timeFrame
|
||||
}.fold(base) { acc, value -> acc + valueExtractor(value.amount) }
|
||||
}
|
||||
|
||||
public companion object {
|
||||
public fun inRubles(
|
||||
subject: Subject,
|
||||
timeFrame: ClosedRange<Instant> = Instant.DISTANT_PAST..Instant.DISTANT_FUTURE,
|
||||
): Payments = Payments(
|
||||
subject,
|
||||
timeFrame,
|
||||
{ if (it.currency == Rubles) it.amount.toDouble() else TODO("No currency converter") }
|
||||
)
|
||||
|
||||
}
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
package center.sciprog.workflow
|
||||
package center.sciprog.workflow.budget
|
||||
|
||||
import kotlinx.serialization.Serializable
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
package center.sciprog.workflow
|
||||
package center.sciprog.workflow.budget
|
||||
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.asFlow
|
||||
@@ -1,5 +1,8 @@
|
||||
package center.sciprog.workflow
|
||||
package center.sciprog.workflow.budget
|
||||
|
||||
import center.sciprog.workflow.EventId
|
||||
import center.sciprog.workflow.WorkBuilder
|
||||
import center.sciprog.workflow.WorkEvent
|
||||
import kotlinx.datetime.*
|
||||
import kotlinx.serialization.Serializable
|
||||
import space.kscience.attributes.Attributes
|
||||
@@ -60,5 +63,15 @@ public suspend fun WorkBuilder.plannedTransaction(
|
||||
time: Instant = Clock.System.now(),
|
||||
attributesBuilder: AttributesBuilder<PlannedTransaction>.() -> Unit = {},
|
||||
) {
|
||||
put(PlannedTransaction(generateId(), plannedOn, fromSubject, toSubject, amount, time, Attributes(attributesBuilder)))
|
||||
put(
|
||||
PlannedTransaction(
|
||||
generateId(),
|
||||
plannedOn,
|
||||
fromSubject,
|
||||
toSubject,
|
||||
amount,
|
||||
time,
|
||||
Attributes(attributesBuilder)
|
||||
)
|
||||
)
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
package center.sciprog.workflow
|
||||
package center.sciprog.workflow.budget
|
||||
|
||||
import kotlinx.serialization.KSerializer
|
||||
import kotlinx.serialization.builtins.serializer
|
||||
@@ -1,22 +1,23 @@
|
||||
package center.sciprog.workflow
|
||||
package center.sciprog.workflow.budget
|
||||
|
||||
import center.sciprog.workflow.WorkList
|
||||
import kotlinx.coroutines.test.runTest
|
||||
import kotlinx.datetime.*
|
||||
import kotlinx.datetime.Clock
|
||||
import kotlin.test.Test
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.time.Duration.Companion.days
|
||||
|
||||
internal class WorkFunctionTest {
|
||||
@Test
|
||||
fun testPayments() = runTest{
|
||||
val person = Person("test","Test Subject")
|
||||
fun testPayments() = runTest {
|
||||
val person = Person("test", "Test Subject")
|
||||
|
||||
val org = Organization("Master", "Master organization")
|
||||
|
||||
val work = WorkList {
|
||||
val now = Clock.System.now()
|
||||
repeat(10) {
|
||||
transaction(org, person, 1000.rubles, time = now.minus((it*30).days))
|
||||
transaction(org, person, 1000.rubles, time = now.minus((it * 30).days))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,8 +15,7 @@ kscience {
|
||||
useSerialization()
|
||||
commonMain {
|
||||
api(spclibs.kotlinx.datetime)
|
||||
api("space.kscience:attributes-kt:$attributesVersion")
|
||||
api("com.benasher44:uuid:0.8.4")
|
||||
api(libs.attributes)
|
||||
}
|
||||
|
||||
jvmTest {
|
||||
|
||||
@@ -0,0 +1,6 @@
|
||||
package center.sciprog.workflow
|
||||
|
||||
public interface ReversibleWork : Work {
|
||||
public fun reversed(): Work
|
||||
public fun reverseEvent(event: WorkEvent): WorkEvent
|
||||
}
|
||||
@@ -3,7 +3,6 @@ package center.sciprog.workflow
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.merge
|
||||
import kotlinx.datetime.Instant
|
||||
import kotlinx.datetime.LocalDateTime
|
||||
import space.kscience.attributes.AttributeContainer
|
||||
|
||||
public typealias WorkId = String
|
||||
@@ -23,10 +22,17 @@ public interface WorkEvent : AttributeContainer {
|
||||
* A localized time operation is attributed to
|
||||
*/
|
||||
public val time: Instant
|
||||
|
||||
/**
|
||||
* Other events that happened in order for this one to occur
|
||||
*/
|
||||
public val dependsOn: Collection<EventId> get() = emptySet()
|
||||
}
|
||||
|
||||
/**
|
||||
* A central API for all workflow computations
|
||||
* A central API for all workflow computations.
|
||||
*
|
||||
* Work represents a flow of events that could be run multiple times.
|
||||
*/
|
||||
public interface Work {
|
||||
|
||||
@@ -42,6 +48,7 @@ public interface Work {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* A work that can produce a modified work via [WorkBuilder]
|
||||
*/
|
||||
|
||||
@@ -1,37 +1,11 @@
|
||||
package center.sciprog.workflow
|
||||
|
||||
import kotlinx.coroutines.flow.filter
|
||||
import kotlinx.coroutines.flow.filterIsInstance
|
||||
import kotlinx.coroutines.flow.fold
|
||||
import kotlinx.datetime.Instant
|
||||
|
||||
/**
|
||||
* Aggregate values for given [Work]
|
||||
*/
|
||||
public interface WorkFunction<T> {
|
||||
public val emptyBase: T
|
||||
public suspend fun compute(work: Work, base: T = emptyBase): T
|
||||
public val defaultBase: T
|
||||
public suspend fun compute(work: Work, base: T = defaultBase): T
|
||||
}
|
||||
|
||||
|
||||
public class Payments(
|
||||
public val subject: Subject,
|
||||
public val timeFrame: ClosedRange<Instant>,
|
||||
public val valueExtractor: (Money) -> Double,
|
||||
override val emptyBase: Double = 0.0,
|
||||
) : WorkFunction<Double> {
|
||||
override suspend fun compute(work: Work, base: Double): Double {
|
||||
return work.flow().filterIsInstance<ExecutedTransaction>().filter {
|
||||
it.toSubject == subject && it.time in timeFrame
|
||||
}.fold(base) { acc, value -> acc + valueExtractor(value.amount) }
|
||||
}
|
||||
|
||||
public companion object {
|
||||
public fun inRubles(
|
||||
subject: Subject,
|
||||
timeFrame: ClosedRange<Instant> = Instant.DISTANT_PAST..Instant.DISTANT_FUTURE,
|
||||
): Payments = Payments(
|
||||
subject,
|
||||
timeFrame,
|
||||
{ if (it.currency == Rubles) it.amount.toDouble() else TODO("No currency converter") }
|
||||
)
|
||||
|
||||
}
|
||||
}
|
||||
@@ -1,11 +1,15 @@
|
||||
@file:OptIn(ExperimentalUuidApi::class)
|
||||
|
||||
package center.sciprog.workflow
|
||||
|
||||
import com.benasher44.uuid.uuid4
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.asFlow
|
||||
import kotlin.uuid.ExperimentalUuidApi
|
||||
import kotlin.uuid.Uuid
|
||||
|
||||
private class WorkListBuilder(val idPrefix: String, val events: MutableList<WorkEvent>) : WorkBuilder {
|
||||
override suspend fun generateId(): EventId = idPrefix + "-" + uuid4().toString()
|
||||
|
||||
override suspend fun generateId(): EventId = idPrefix + "-" + Uuid.random().toHexString()
|
||||
|
||||
override suspend fun put(event: WorkEvent) {
|
||||
events.add(event)
|
||||
@@ -32,7 +36,7 @@ public class WorkList(private val events: List<WorkEvent>) : WorkWithBuilder<Wor
|
||||
|
||||
override suspend fun modified(block: suspend WorkBuilder.() -> Unit): WorkList {
|
||||
val modifiedEvents = ArrayList(events)
|
||||
val prefix = uuid4().leastSignificantBits.toString(16)
|
||||
val prefix = Uuid.random().toHexString()
|
||||
WorkListBuilder(prefix, modifiedEvents).block()
|
||||
return WorkList((modifiedEvents))
|
||||
}
|
||||
@@ -40,7 +44,7 @@ public class WorkList(private val events: List<WorkEvent>) : WorkWithBuilder<Wor
|
||||
|
||||
public suspend fun WorkList(block: suspend WorkBuilder.() -> Unit): WorkList {
|
||||
val events = mutableListOf<WorkEvent>()
|
||||
val prefix = uuid4().leastSignificantBits.toString(16)
|
||||
val prefix = Uuid.random().toHexString()
|
||||
val builder = WorkListBuilder(prefix, events)
|
||||
builder.block()
|
||||
return WorkList(events)
|
||||
|
||||
22
workflow-state/build.gradle.kts
Normal file
22
workflow-state/build.gradle.kts
Normal file
@@ -0,0 +1,22 @@
|
||||
import space.kscience.gradle.Maturity
|
||||
|
||||
plugins {
|
||||
id("space.kscience.gradle.mpp")
|
||||
`maven-publish`
|
||||
}
|
||||
|
||||
kscience {
|
||||
jvm()
|
||||
js()
|
||||
native()
|
||||
useCoroutines()
|
||||
useSerialization()
|
||||
commonMain {
|
||||
api(projects.workflowCore)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
readme {
|
||||
maturity = Maturity.EXPERIMENTAL
|
||||
}
|
||||
@@ -0,0 +1,150 @@
|
||||
package center.sciprog.workflow.state
|
||||
|
||||
import center.sciprog.workflow.EventId
|
||||
import center.sciprog.workflow.WorkEvent
|
||||
import kotlinx.coroutines.flow.SharedFlow
|
||||
import kotlinx.datetime.Clock
|
||||
import kotlinx.datetime.Instant
|
||||
import space.kscience.attributes.Attribute
|
||||
import space.kscience.attributes.Attributes
|
||||
|
||||
|
||||
public sealed interface GraphEvent {
|
||||
public val time: Instant
|
||||
}
|
||||
|
||||
/**
|
||||
* Graph mutation event
|
||||
*/
|
||||
public sealed interface GraphMutationEvent : GraphEvent {
|
||||
/**
|
||||
* The time in which event was emitted
|
||||
*/
|
||||
override val time: Instant
|
||||
|
||||
public data class SetAttribute<T>(
|
||||
val nodeId: NodeId,
|
||||
val attribute: Attribute<T>,
|
||||
val value: T,
|
||||
override val time: Instant
|
||||
) : GraphMutationEvent
|
||||
|
||||
public data class RemoveAttribute(
|
||||
val nodeId: NodeId,
|
||||
val attribute: Attribute<*>,
|
||||
override val time: Instant
|
||||
) : GraphMutationEvent
|
||||
|
||||
public data class CreateNode(
|
||||
val nodeType: NodeType,
|
||||
override val time: Instant
|
||||
) : GraphMutationEvent
|
||||
|
||||
public data class RemoveNode(
|
||||
val nodeId: NodeId,
|
||||
override val time: Instant
|
||||
) : GraphMutationEvent
|
||||
}
|
||||
|
||||
/**
|
||||
* History events
|
||||
*/
|
||||
public sealed interface GraphHistoryEvent : WorkEvent, GraphEvent {
|
||||
|
||||
public data class AttributeChanged<T>(
|
||||
val nodeId: NodeId,
|
||||
val attribute: Attribute<T>,
|
||||
val valueBefore: T?,
|
||||
val valueAfter: T?,
|
||||
override val attributes: Attributes,
|
||||
override val id: EventId,
|
||||
override val time: Instant
|
||||
) : GraphHistoryEvent
|
||||
|
||||
public data class NodeChanged(
|
||||
val nodeId: NodeId,
|
||||
val nodeBefore: StateNode?,
|
||||
val nodeAfter: StateNode?,
|
||||
override val attributes: Attributes,
|
||||
override val id: EventId,
|
||||
override val time: Instant
|
||||
) : GraphHistoryEvent
|
||||
}
|
||||
|
||||
|
||||
public interface StateGraph: StateNodeProvider {
|
||||
public val clock: Clock
|
||||
|
||||
public val history: SharedFlow<GraphHistoryEvent>
|
||||
|
||||
/**
|
||||
* Project graph forward or backward in time
|
||||
*/
|
||||
//public fun project(block: suspend MutableStateGraph.() -> Unit): StateGraph
|
||||
}
|
||||
|
||||
public interface MutableStateGraph : StateGraph {
|
||||
|
||||
public suspend fun createNode(
|
||||
nodeType: NodeType,
|
||||
time: Instant = clock.now(),
|
||||
attributes: Attributes = Attributes.EMPTY
|
||||
): StateNode
|
||||
|
||||
public suspend fun removeNode(
|
||||
nodeId: NodeId,
|
||||
time: Instant = clock.now()
|
||||
): StateNode?
|
||||
|
||||
/**
|
||||
* Set attribute for given node.
|
||||
*
|
||||
* It is graph implementation responsibility to properly process connection.
|
||||
* So setting connection attributes should automatically set mirroring attribute.
|
||||
*/
|
||||
public suspend fun <T> setAttribute(
|
||||
nodeId: NodeId,
|
||||
attribute: Attribute<T>,
|
||||
value: T,
|
||||
time: Instant = clock.now()
|
||||
)
|
||||
|
||||
/**
|
||||
* Remove attribute for given [nodeId].
|
||||
*
|
||||
* It is graph implementation responsibility to properly process connection.
|
||||
* So removing mirroring attribute should happen automatically.
|
||||
*/
|
||||
public suspend fun <T> removeAttribute(
|
||||
nodeId: NodeId,
|
||||
attribute: Attribute<T>,
|
||||
time: Instant = clock.now()
|
||||
)
|
||||
|
||||
/**
|
||||
* Perform a couple of actions in a single atomic transaction.
|
||||
*/
|
||||
public suspend fun withTransaction(block: suspend MutableStateGraph.() -> Unit): Unit
|
||||
}
|
||||
|
||||
|
||||
internal suspend fun <T> GraphMutationEvent.SetAttribute<T>.mutate(graph: MutableStateGraph) {
|
||||
graph.setAttribute(nodeId, attribute, value, time)
|
||||
}
|
||||
|
||||
/**
|
||||
* Mutate a graph with event.
|
||||
*/
|
||||
public suspend fun MutableStateGraph.consumeEvent(event: GraphMutationEvent): Unit {
|
||||
when (event) {
|
||||
is GraphMutationEvent.CreateNode -> createNode(event.nodeType, clock.now())
|
||||
is GraphMutationEvent.RemoveNode -> removeNode(event.nodeId, clock.now())
|
||||
is GraphMutationEvent.SetAttribute<*> -> event.mutate(this)
|
||||
|
||||
is GraphMutationEvent.RemoveAttribute -> removeAttribute(
|
||||
nodeId = event.nodeId,
|
||||
attribute = event.attribute,
|
||||
time = clock.now()
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
package center.sciprog.workflow.state
|
||||
|
||||
import center.sciprog.workflow.Work
|
||||
import center.sciprog.workflow.WorkEvent
|
||||
|
||||
/**
|
||||
* Stateful [StateGraph] manager
|
||||
*/
|
||||
public interface StateManager {
|
||||
public val graph: StateGraph
|
||||
|
||||
/**
|
||||
* Consume a single event. Fail if event is inconsistent with the current state
|
||||
*/
|
||||
public suspend fun consume(event: WorkEvent)
|
||||
|
||||
/**
|
||||
* Advance [graph] by the flow of events in given [work].
|
||||
*
|
||||
*/
|
||||
public suspend fun execute(work: Work): Unit = work.flow().collect(::consume)
|
||||
}
|
||||
@@ -0,0 +1,50 @@
|
||||
package center.sciprog.workflow.state
|
||||
|
||||
import space.kscience.attributes.Attribute
|
||||
import space.kscience.attributes.AttributeContainer
|
||||
import space.kscience.attributes.Attributes
|
||||
import space.kscience.attributes.keys
|
||||
|
||||
public typealias NodeId = String
|
||||
public typealias NodeType = String
|
||||
public typealias NodeConnectionType = String
|
||||
|
||||
public interface NodeConnection: AttributeContainer {
|
||||
public val type: NodeConnectionType
|
||||
public val inNode: NodeId
|
||||
public val outNode: NodeId
|
||||
}
|
||||
|
||||
public data class SimpleNodeConnection(
|
||||
override val type: NodeConnectionType,
|
||||
override val inNode: NodeId,
|
||||
override val outNode: NodeId,
|
||||
override val attributes: Attributes
|
||||
) : NodeConnection
|
||||
|
||||
public interface ConnectionAttribute : Attribute<NodeConnection>
|
||||
|
||||
|
||||
public interface StateNode : AttributeContainer {
|
||||
public val id: NodeId
|
||||
|
||||
public val type: NodeType
|
||||
|
||||
public companion object {
|
||||
public const val NODE_TYPE_KEY: String = "@type"
|
||||
}
|
||||
}
|
||||
|
||||
public suspend fun StateGraph.connections(nodeId: NodeId): Collection<NodeConnection> {
|
||||
val node = get(nodeId) ?: return emptyList()
|
||||
return node.attributes.keys
|
||||
.filter { it is ConnectionAttribute }
|
||||
.map { node.attributes[it] }
|
||||
.filterIsInstance<NodeConnection>()
|
||||
}
|
||||
|
||||
public suspend fun StateGraph.outConnections(nodeId: NodeId): Collection<NodeConnection> =
|
||||
connections(nodeId).filter { connection -> connection.outNode == nodeId }
|
||||
|
||||
public suspend fun StateGraph.inConnections(nodeId: NodeId): Collection<NodeConnection> =
|
||||
connections(nodeId).filter { connection -> connection.inNode == nodeId }
|
||||
@@ -0,0 +1,76 @@
|
||||
package center.sciprog.workflow.state
|
||||
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import space.kscience.attributes.Attribute
|
||||
import space.kscience.attributes.SetAttribute
|
||||
|
||||
/**
|
||||
* Query constructor for a graph
|
||||
*/
|
||||
public sealed interface StateGraphQuery {
|
||||
/**
|
||||
* Select all nodes with given type
|
||||
*/
|
||||
public data class ByType(val type: String) : StateGraphQuery
|
||||
|
||||
/**
|
||||
* Select nodes that adhere to all [queries]
|
||||
*/
|
||||
public data class AllOf(val queries: List<StateGraphQuery>) : StateGraphQuery
|
||||
|
||||
/**
|
||||
* Select nodes that adhere to one of [queries]
|
||||
*/
|
||||
public data class OneOf(val queries: List<StateGraphQuery>) : StateGraphQuery
|
||||
|
||||
/**
|
||||
* Filter nodes by predicate on attribute value
|
||||
*/
|
||||
public data class AttributeEquals<T>(public val attribute: Attribute<T>, public val value: T) : StateGraphQuery
|
||||
|
||||
/**
|
||||
* A query that selects nodes where a specific set attribute contains a given value.
|
||||
*
|
||||
* This query is used to filter state graph nodes based on whether an element exists
|
||||
* in the set associated with a specified attribute.
|
||||
*
|
||||
* @param attribute The set attribute to evaluate.
|
||||
* @param value The value to check for in the attribute's set.
|
||||
* @param T The type of elements within the set attribute.
|
||||
*/
|
||||
public data class AttributeContains<T>(public val attribute: SetAttribute<T>, public val value: T) : StateGraphQuery
|
||||
|
||||
// /**
|
||||
// * Represents a query that filters nodes in a state graph based on whether a specified attribute
|
||||
// * falls within a defined range of values.
|
||||
// *
|
||||
// * This query is particularly useful for selecting graph nodes where an attribute of a comparable type
|
||||
// * meets criteria defined by a closed range.
|
||||
// *
|
||||
// * @param T The type of the attribute, which must implement the [Comparable] interface.
|
||||
// * @property attribute The attribute used for comparison within the query.
|
||||
// * @property range The closed range used to filter nodes whose attribute values fall within this range.
|
||||
// */
|
||||
// public data class AttributeInRange<T: Comparable<T>>(public val attribute: Attribute<T>, public val range: ClosedRange<T>) : StateGraphQuery
|
||||
//
|
||||
// public data class ByQuery(val query: String) : StateGraphQuery
|
||||
}
|
||||
|
||||
public data class NodeWithId(val nodeId: NodeId, val node: StateNode)
|
||||
|
||||
public interface StateNodeProvider {
|
||||
/**
|
||||
* Flow all node Ids. The order of ids is undefined
|
||||
*/
|
||||
public fun nodeIds(): Flow<NodeId>
|
||||
|
||||
/**
|
||||
* Read a single node if it exists
|
||||
*/
|
||||
public suspend operator fun get(id: NodeId): StateNode?
|
||||
|
||||
/**
|
||||
* Request a lazy flow of nodes with a given query. The order
|
||||
*/
|
||||
public suspend fun query(query: StateGraphQuery): Flow<NodeWithId>
|
||||
}
|
||||
26
workflow-tinkerpop/build.gradle.kts
Normal file
26
workflow-tinkerpop/build.gradle.kts
Normal file
@@ -0,0 +1,26 @@
|
||||
import space.kscience.gradle.Maturity
|
||||
|
||||
plugins {
|
||||
id("space.kscience.gradle.mpp")
|
||||
`maven-publish`
|
||||
}
|
||||
|
||||
kscience {
|
||||
jvm()
|
||||
useCoroutines()
|
||||
useSerialization()
|
||||
|
||||
jvmMain {
|
||||
api(projects.workflowState)
|
||||
api(libs.gremlin.core)
|
||||
}
|
||||
|
||||
jvmTest {
|
||||
implementation(spclibs.logback.classic)
|
||||
implementation(libs.tinkergraph.gremlin)
|
||||
}
|
||||
}
|
||||
|
||||
readme {
|
||||
maturity = Maturity.EXPERIMENTAL
|
||||
}
|
||||
@@ -0,0 +1,169 @@
|
||||
package center.sciprog.workflow.state
|
||||
|
||||
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal
|
||||
import org.apache.tinkerpop.gremlin.structure.Edge
|
||||
import org.apache.tinkerpop.gremlin.structure.Element
|
||||
import org.apache.tinkerpop.gremlin.structure.Vertex
|
||||
import space.kscience.attributes.Attribute
|
||||
import space.kscience.attributes.Attributes
|
||||
import space.kscience.attributes.UnsafeAPI
|
||||
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__ as TinkerPop
|
||||
|
||||
public interface TinkerAttributeAccessor<T> {
|
||||
/**
|
||||
* Key of corresponding edge or VertexAttribute
|
||||
*/
|
||||
public val key: String
|
||||
|
||||
public val attribute: Attribute<T>
|
||||
|
||||
public fun Element.readAttribute(): T
|
||||
|
||||
public fun <E : Element> GraphTraversal<*, E>.readAttribute(): GraphTraversal<*, T?> =
|
||||
map { it.get().readAttribute() }
|
||||
|
||||
public fun <E : Element> GraphTraversal<*, E>.writeAttribute(
|
||||
attribute: Attribute<T>,
|
||||
value: T?
|
||||
): GraphTraversal<*, E>
|
||||
|
||||
public fun <E : Element> GraphTraversal<*, E>.writeAttributeFrom(attributes: Attributes): GraphTraversal<*, E> =
|
||||
writeAttribute(attribute, attributes[attribute])
|
||||
}
|
||||
|
||||
public fun <E : Element> GraphTraversal<*, E>.writeAttributes(
|
||||
registry: TinkerRegistry,
|
||||
attributes: Attributes
|
||||
): GraphTraversal<*, E> {
|
||||
var traversal = this
|
||||
attributes.content.keys.forEach { attribute ->
|
||||
val writer = registry.attributePipeByAttribute(attribute)
|
||||
with(writer) {
|
||||
traversal = traversal.writeAttributeFrom(attributes)
|
||||
}
|
||||
}
|
||||
return traversal
|
||||
}
|
||||
|
||||
/**
|
||||
* A [TinkerAttributeAccessor] implementation that assumes
|
||||
*/
|
||||
public class SimpleTinkerAttributeAccessor<T>(
|
||||
override val key: String,
|
||||
override val attribute: Attribute<T>
|
||||
) : TinkerAttributeAccessor<T> {
|
||||
|
||||
override fun Element.readAttribute(): T = this.value<T>(key)
|
||||
|
||||
override fun <E : Element> GraphTraversal<*, E>.readAttribute(): GraphTraversal<*, T?> = valueOrNull(key)
|
||||
|
||||
override fun <E : Element> GraphTraversal<*, E>.writeAttribute(
|
||||
attribute: Attribute<T>,
|
||||
value: T?
|
||||
): GraphTraversal<*, E> = property(key, value)
|
||||
}
|
||||
|
||||
public interface TinkerConnectionAttributeAccessor<T : NodeConnection> : TinkerAttributeAccessor<T> {
|
||||
|
||||
public fun GraphTraversal<*, Edge>.readEdge(): GraphTraversal<*, T?>
|
||||
|
||||
/**
|
||||
* Update edge after creation
|
||||
*/
|
||||
public fun GraphTraversal<*, Edge>.writeEdgeAttributes(connection: T): GraphTraversal<*, Edge>
|
||||
|
||||
override fun <E : Element> GraphTraversal<*, E>.readAttribute(): GraphTraversal<*, T?> = bothE(key).readEdge()
|
||||
|
||||
override fun <E : Element> GraphTraversal<*, E>.writeAttribute(
|
||||
attribute: Attribute<T>,
|
||||
value: T?
|
||||
): GraphTraversal<*, E> = if (value == null) {
|
||||
branch(
|
||||
bothE(key).drop()
|
||||
)
|
||||
} else {
|
||||
branch(
|
||||
addE(key).from(V(value.inNode)).to(V(value.outNode)).writeEdgeAttributes(value)
|
||||
)
|
||||
}
|
||||
|
||||
override fun <E : Element> GraphTraversal<*, E>.writeAttributeFrom(
|
||||
attributes: Attributes
|
||||
): GraphTraversal<*, E> = writeAttribute(attribute, attributes[attribute])
|
||||
}
|
||||
|
||||
public class SimpleTinkerConnectionAttributeAccessor(
|
||||
override val key: String,
|
||||
override val attribute: Attribute<SimpleNodeConnection>,
|
||||
private val registry: TinkerRegistry,
|
||||
) : TinkerConnectionAttributeAccessor<SimpleNodeConnection> {
|
||||
|
||||
override fun Element.readAttribute(): SimpleNodeConnection {
|
||||
val edge = this as? Edge ?: error("Not an edge: $this")
|
||||
return SimpleNodeConnection(
|
||||
edge.label(),
|
||||
edge.inVertex().id().toString(),
|
||||
edge.outVertex().id().toString(),
|
||||
TinkerAttributes(edge, registry)
|
||||
)
|
||||
}
|
||||
|
||||
override fun GraphTraversal<*, Edge>.readEdge(): GraphTraversal<*, SimpleNodeConnection?> = map {
|
||||
val edge = it.get()
|
||||
SimpleNodeConnection(
|
||||
edge.label(),
|
||||
edge.inVertex().id().toString(),
|
||||
edge.outVertex().id().toString(),
|
||||
TinkerAttributes(edge, registry)
|
||||
)
|
||||
}
|
||||
|
||||
override fun GraphTraversal<*, Edge>.writeEdgeAttributes(
|
||||
connection: SimpleNodeConnection
|
||||
): GraphTraversal<*, Edge> = writeAttributes(registry, connection.attributes)
|
||||
|
||||
}
|
||||
|
||||
@OptIn(UnsafeAPI::class)
|
||||
public class TinkerAttributes(
|
||||
public val element: Element,
|
||||
private val registry: TinkerRegistry
|
||||
) : Attributes {
|
||||
|
||||
override val content: Map<out Attribute<*>, Any?>
|
||||
get() = element.keys().mapNotNull {
|
||||
registry.attributePipeByLabel(it)
|
||||
}.associate { converter ->
|
||||
with(converter) {
|
||||
converter.attribute to element.readAttribute()
|
||||
}
|
||||
}
|
||||
|
||||
override fun toString(): String = "TinkerAttributes(element=$element)"
|
||||
|
||||
override fun equals(other: Any?): Boolean {
|
||||
if (this === other) return true
|
||||
if (javaClass != other?.javaClass) return false
|
||||
|
||||
other as TinkerAttributes
|
||||
|
||||
if (element != other.element) return false
|
||||
if (registry != other.registry) return false
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
override fun hashCode(): Int {
|
||||
var result = element.hashCode()
|
||||
result = 31 * result + registry.hashCode()
|
||||
return result
|
||||
}
|
||||
|
||||
public companion object
|
||||
}
|
||||
|
||||
public fun <E, T> GraphTraversal<E, *>.valueOrNull(propertyKey: String): GraphTraversal<E, T?> = choose<T>(
|
||||
TinkerPop.has<Vertex>(propertyKey),
|
||||
TinkerPop.values<Vertex, T>(propertyKey),
|
||||
TinkerPop.constant(null)
|
||||
)
|
||||
@@ -0,0 +1,41 @@
|
||||
package center.sciprog.workflow.state
|
||||
|
||||
import space.kscience.attributes.Attribute
|
||||
|
||||
public interface TinkerRegistry {
|
||||
public fun attributePipeByLabel(label: String): TinkerAttributeAccessor<*>
|
||||
public fun <T> attributePipeByAttribute(attribute: Attribute<T>): TinkerAttributeAccessor<T>
|
||||
}
|
||||
|
||||
private class SimpleTinkerRegistry(val accessors: Set<TinkerAttributeAccessor<*>>) : TinkerRegistry {
|
||||
override fun attributePipeByLabel(label: String): TinkerAttributeAccessor<*> = accessors.single { it.key == label }
|
||||
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
override fun <T> attributePipeByAttribute(attribute: Attribute<T>): TinkerAttributeAccessor<T> = accessors.single {
|
||||
it.attribute == attribute
|
||||
} as TinkerAttributeAccessor<T>
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs a TinkerRegistry with the provided set of attribute accessors.
|
||||
*
|
||||
* @param accessors A set of TinkerAttributeAccessor objects that define the mapping
|
||||
* between graph elements and their corresponding attributes.
|
||||
* @return A TinkerRegistry instance that maps attributes to their corresponding keys or labels.
|
||||
*/
|
||||
public fun TinkerRegistry(
|
||||
accessors: Set<TinkerAttributeAccessor<*>>
|
||||
): TinkerRegistry = SimpleTinkerRegistry(accessors)
|
||||
|
||||
public fun <T> MutableSet<TinkerAttributeAccessor<*>>.attribute(
|
||||
key: String,
|
||||
attribute: Attribute<T>
|
||||
) {
|
||||
add(SimpleTinkerAttributeAccessor(key, attribute))
|
||||
}
|
||||
|
||||
public fun TinkerRegistry(
|
||||
registryBuilder: MutableSet<TinkerAttributeAccessor<*>>.() -> Unit
|
||||
): TinkerRegistry = TinkerRegistry(buildSet(registryBuilder))
|
||||
|
||||
@@ -0,0 +1,212 @@
|
||||
package center.sciprog.workflow.state
|
||||
|
||||
import kotlinx.coroutines.flow.*
|
||||
import kotlinx.datetime.Clock
|
||||
import kotlinx.datetime.Instant
|
||||
import org.apache.tinkerpop.gremlin.process.traversal.P
|
||||
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal
|
||||
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource
|
||||
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.V
|
||||
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.sideEffect
|
||||
import org.apache.tinkerpop.gremlin.structure.Vertex
|
||||
import space.kscience.attributes.Attribute
|
||||
import space.kscience.attributes.Attributes
|
||||
import kotlin.uuid.ExperimentalUuidApi
|
||||
import kotlin.uuid.Uuid
|
||||
|
||||
public class TinkerNode(
|
||||
private val vertex: Vertex,
|
||||
private val registry: TinkerRegistry
|
||||
) : StateNode {
|
||||
|
||||
override val id: NodeId get() = vertex.id().toString()
|
||||
|
||||
override val type: NodeType get() = vertex.label()
|
||||
|
||||
override val attributes: Attributes get() = TinkerAttributes(vertex, registry)
|
||||
}
|
||||
|
||||
public class TinkerStateGraph(
|
||||
private val ts: GraphTraversalSource,
|
||||
public val registry: TinkerRegistry,
|
||||
override val clock: Clock = Clock.System,
|
||||
) : MutableStateGraph, AutoCloseable by ts.graph {
|
||||
|
||||
private val sharedEventFlow = MutableSharedFlow<GraphHistoryEvent>()
|
||||
|
||||
private suspend fun emitEvent(event: GraphHistoryEvent) {
|
||||
sharedEventFlow.emit(event)
|
||||
}
|
||||
|
||||
private fun findVertex(elementId: NodeId): Vertex? = ts.V(elementId).asSequence().firstOrNull()
|
||||
|
||||
override fun nodeIds(): Flow<NodeId> = ts.V().asFlow().map { it.id().toString() }
|
||||
|
||||
override suspend fun get(id: NodeId): StateNode? = findVertex(id)?.let { TinkerNode(it, registry) }
|
||||
|
||||
@OptIn(ExperimentalUuidApi::class)
|
||||
private fun generateEventId() = Uuid.random().toHexString()
|
||||
|
||||
override suspend fun createNode(
|
||||
nodeType: NodeType,
|
||||
time: Instant,
|
||||
attributes: Attributes
|
||||
): StateNode {
|
||||
val vertex = ts.addV(nodeType).writeAttributes(registry, attributes).asSequence().first()
|
||||
val newNode = TinkerNode(vertex, registry)
|
||||
|
||||
emitEvent(
|
||||
GraphHistoryEvent.NodeChanged(
|
||||
nodeId = newNode.id,
|
||||
nodeBefore = null,
|
||||
nodeAfter = newNode,
|
||||
attributes = attributes,
|
||||
id = generateEventId(),
|
||||
time = time
|
||||
)
|
||||
)
|
||||
|
||||
return newNode
|
||||
}
|
||||
|
||||
override suspend fun removeNode(nodeId: NodeId, time: Instant): StateNode? {
|
||||
val vertex = findVertex(nodeId) ?: return null
|
||||
emitEvent(
|
||||
GraphHistoryEvent.NodeChanged(
|
||||
nodeId = nodeId,
|
||||
nodeBefore = TinkerNode(vertex, registry),
|
||||
nodeAfter = null,
|
||||
attributes = Attributes.EMPTY,
|
||||
id = generateEventId(),
|
||||
time = time
|
||||
)
|
||||
)
|
||||
return TinkerNode(vertex, registry)
|
||||
}
|
||||
|
||||
|
||||
override suspend fun <T> setAttribute(
|
||||
nodeId: NodeId,
|
||||
attribute: Attribute<T>,
|
||||
value: T,
|
||||
time: Instant
|
||||
): Unit = with(registry.attributePipeByAttribute(attribute)) {
|
||||
val oldValue: T? = ts.V(nodeId)
|
||||
.readAttribute()
|
||||
.choose<T>(
|
||||
P.neq(value),
|
||||
sideEffect(
|
||||
V<Vertex>(nodeId).property(key, value)
|
||||
)
|
||||
).next()
|
||||
|
||||
if (oldValue != value) {
|
||||
emitEvent(
|
||||
GraphHistoryEvent.AttributeChanged<T>(
|
||||
nodeId = nodeId,
|
||||
attribute = attribute,
|
||||
valueBefore = oldValue,
|
||||
valueAfter = value,
|
||||
attributes = Attributes.EMPTY,
|
||||
id = generateEventId(),
|
||||
time = time
|
||||
)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun <T> removeAttribute(
|
||||
nodeId: NodeId,
|
||||
attribute: Attribute<T>,
|
||||
time: Instant
|
||||
): Unit = with(registry.attributePipeByAttribute(attribute)) {
|
||||
|
||||
val oldValue = ts.V(nodeId)
|
||||
.readAttribute()
|
||||
.choose<T>(
|
||||
P.neq(null),
|
||||
sideEffect(
|
||||
V<Vertex>(nodeId).properties<T>(key).drop()
|
||||
)
|
||||
).next()
|
||||
|
||||
if (oldValue != null) {
|
||||
emitEvent(
|
||||
GraphHistoryEvent.AttributeChanged(
|
||||
nodeId = nodeId,
|
||||
attribute = attribute,
|
||||
valueBefore = oldValue,
|
||||
valueAfter = null,
|
||||
attributes = Attributes.EMPTY,
|
||||
id = generateEventId(),
|
||||
time = time
|
||||
)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun withTransaction(block: suspend MutableStateGraph.() -> Unit) {
|
||||
val graph = ts.graph
|
||||
if (!graph.features().graph().supportsTransactions()) {
|
||||
error("Transactions not supported by graph")
|
||||
}
|
||||
|
||||
val transaction = graph.tx()
|
||||
if (transaction.isOpen) error("Transaction already open")
|
||||
|
||||
try {
|
||||
val txs: GraphTraversalSource = transaction.begin()
|
||||
TinkerStateGraph(txs, registry, clock).block()
|
||||
transaction.commit()
|
||||
} catch (e: Exception) {
|
||||
// TODO add logging
|
||||
transaction.rollback()
|
||||
}
|
||||
}
|
||||
|
||||
override val history: SharedFlow<GraphHistoryEvent> get() = sharedEventFlow
|
||||
|
||||
|
||||
override suspend fun query(query: StateGraphQuery): Flow<NodeWithId> {
|
||||
|
||||
fun GraphTraversal<*, Vertex>.query(
|
||||
query: StateGraphQuery
|
||||
): GraphTraversal<*, Vertex> = when (query) {
|
||||
is StateGraphQuery.OneOf -> or(
|
||||
*query.queries.map { query(it) }.toTypedArray()
|
||||
)
|
||||
|
||||
is StateGraphQuery.AllOf -> and(
|
||||
*query.queries.map { query(it) }.toTypedArray()
|
||||
)
|
||||
|
||||
is StateGraphQuery.ByType -> hasLabel(query.type)
|
||||
is StateGraphQuery.AttributeContains<*> -> TODO()
|
||||
is StateGraphQuery.AttributeEquals<*> -> {
|
||||
val accessor = registry.attributePipeByAttribute(query.attribute)
|
||||
with(registry.attributePipeByAttribute(query.attribute)) {
|
||||
has(accessor.key).where(readAttribute().`is`(query.value))
|
||||
}
|
||||
}
|
||||
//
|
||||
// is StateGraphQuery.AttributeInRange<*> -> {
|
||||
// val accessor = registry.attributePipeByAttribute(query.attribute)
|
||||
// with(registry.attributePipeByAttribute(query.attribute)) {
|
||||
// has(accessor.key).where(
|
||||
// readAttribute().filter(
|
||||
// P.between(
|
||||
// query.range.start,
|
||||
// query.range.endInclusive
|
||||
// )
|
||||
// )
|
||||
// )
|
||||
// }
|
||||
// }
|
||||
|
||||
}
|
||||
|
||||
return ts.V().query(query).asFlow().map {
|
||||
NodeWithId(it.id().toString(), TinkerNode(it, registry))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,37 @@
|
||||
package center.sciprog.workflow.state
|
||||
|
||||
import kotlinx.coroutines.test.runTest
|
||||
import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerFactory
|
||||
import space.kscience.attributes.Attribute
|
||||
import kotlin.test.Test
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
internal class AttributesTest {
|
||||
|
||||
object AttributeA : Attribute<Double>
|
||||
object AttributeB : Attribute<String>
|
||||
|
||||
@Test
|
||||
fun tinkerAttributesWriteRead() = runTest {
|
||||
val graph = TinkerFactory.createModern()
|
||||
|
||||
val registry = TinkerRegistry {
|
||||
attribute("a", AttributeA)
|
||||
attribute("b", AttributeB)
|
||||
}
|
||||
|
||||
TinkerStateGraph(graph.traversal(), registry).use { state ->
|
||||
val node = state.createNode("test")
|
||||
|
||||
state.setAttribute(node.id, AttributeA, 1.0)
|
||||
|
||||
assertEquals(1.0, node.attributes[AttributeA])
|
||||
|
||||
state.removeAttribute(node.id, AttributeA)
|
||||
|
||||
assertEquals(null, node.attributes[AttributeA])
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user