scheduler¶
- class Scheduler(nodes: dict[str, noob.node.spec.NodeSpecification], edges: list[noob.node.base.Edge], source_nodes: list[typing.Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)]] = <factory>, _logger: logging.Logger = <factory>, _clock: itertools.count = <factory>, _epochs: dict[noob.types.Epoch, noob.toposort.TopoSorter] = <factory>, _subepochs: dict[noob.types.Epoch, set[noob.types.Epoch]] = <factory>, _epoch_log: collections.deque[int] = <factory>, _subgraphs: dict[typing.Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)], tuple[dict[str, noob.node.spec.NodeSpecification], list[noob.node.base.Edge]]] = <factory>, _frozen_sorters: dict[tuple[typing.Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)], ...], noob.toposort.TopoSorter] = <factory>)[source]¶
- nodes: dict[str, NodeSpecification]¶
- source_nodes: list[Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)]]¶
- classmethod from_specification(nodes: dict[str, NodeSpecification], edges: list[Edge]) Self[source]¶
Create an instance of a Scheduler from NodeSpecification and Edge
- property graph_signals: set[tuple[Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)], Annotated[str, AfterValidator(func=_is_identifier)]]][source]¶
The set of (node id, signal) tuples that are depended on in the graph.
Nodes can have many more signals than we actually care about for structuring the graph; this set contains only the ones that some edge depends on.
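As a rough illustration of what such a set looks like, the sketch below collects the (node id, signal) pairs that appear as edge sources. The `ExampleEdge` class and its field names are hypothetical stand-ins invented for this example; they are not the library's `Edge` type.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ExampleEdge:
    """Hypothetical edge shape, used only for this illustration."""

    source_node: str
    source_signal: str
    target_node: str


# "a" may emit other signals too, but only those used by an edge matter here.
example_edges = [
    ExampleEdge("a", "value", "b"),
    ExampleEdge("a", "value", "c"),
    ExampleEdge("b", "total", "c"),
]

# Deduplicate the (node id, signal) pairs that something depends on.
depended_on = {(e.source_node, e.source_signal) for e in example_edges}
```

Two edges reading the same signal contribute a single entry, which is why a set is a natural fit.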
- add_epoch(epoch: int | Epoch | None = None) Epoch[source]¶
Add another epoch with a prepared graph to the scheduler.
- add_subepoch(epoch: Epoch) Epoch[source]¶
Add subepoch!
Creates a topo sorter with all the nodes downstream of the node that created the epoch.
- is_active(epoch: Epoch | None = None) bool[source]¶
Graph remains active while it holds at least one epoch that is active.
- get_ready(epoch: Epoch | None = None, node_id: Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)] | None = None) list[MetaEvent][source]¶
Output the set of nodes that are ready across different epochs.
- Parameters:
epoch (Epoch | None) – If an Epoch, get ready events for that epoch; if None, get ready events for all epochs.
node_id (str | None) – If present, only get ready events for a single node.
- node_is_ready(node: Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)], epoch: Epoch | None = None) bool[source]¶
Check if a single node is ready in a single or any epoch
- Parameters:
node (NodeID) – the node to check
epoch (int | None) – the epoch to check; if None, any epoch
- node_is_done(node: Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)], epoch: Epoch) bool[source]¶
Node is expired or done in specified epoch
- sources_finished(epoch: Epoch | None = None) bool[source]¶
Check whether the source nodes of the given epoch have been processed. If epoch is None, check the source nodes of the latest epoch.
- update(events: MutableSequence[Event | MetaEvent] | MutableSequence[Event]) MutableSequence[Event] | MutableSequence[Event | MetaEvent][source]¶
When a set of events is received, update the graphs within the scheduler. Currently only has TopoSorter.done() implemented.
- done(epoch: Epoch, node_id: str, signal: Annotated[str, AfterValidator(func=_is_identifier)] | None = None, with_signals: bool = True) MetaEvent | None[source]¶
Mark a node in a given epoch as done.
- Parameters:
with_signals (bool) – When marking this node as done, also mark all its signals as done.
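The ready/done cycle that the methods above describe can be sketched with the standard library's `graphlib.TopologicalSorter`. This is only an analogy for a single epoch: it does not use the Scheduler or noob's own TopoSorter, and the three-node graph is made up for the example.

```python
from graphlib import TopologicalSorter

# Hypothetical graph: each key maps to the set of nodes it depends on.
dependencies = {"a": set(), "b": {"a"}, "c": {"a", "b"}}

sorter = TopologicalSorter(dependencies)
sorter.prepare()

run_order = []
# The sorter stays active until every node has been marked done.
while sorter.is_active():
    # get_ready() returns nodes whose dependencies are all done.
    for node_id in sorter.get_ready():
        run_order.append(node_id)
        # Marking a node done is what makes its dependents ready.
        sorter.done(node_id)

print(run_order)
```

A node is handed out by `get_ready()` only once, so a caller that never marks it done leaves the dependents blocked and the sorter active.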
- expire(epoch: Epoch, node_id: str, signal: Annotated[str, AfterValidator(func=_is_identifier)] | None = None, with_signals: bool = True, unlock_optionals: bool = True) MetaEvent | None[source]¶
Mark a node as having been completed without making its dependent nodes ready, i.e. when the node emitted NoEvent
- enable_node(node_id: str) None[source]¶
Enable the edges attached to the node and set the NodeSpecification enable switch to True
- disable_node(node_id: str) None[source]¶
Disable the edges attached to the node and set the NodeSpecification enable switch to False
- generations() list[tuple[Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)] | NodeSignal, ...]][source]¶
Get the topological generations of the graph: tuples for each set of nodes that can be run at the same time.
Order within a generation is not guaranteed to be stable.
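To make the idea of topological generations concrete, here is a minimal sketch built on the standard library's `graphlib.TopologicalSorter`. The helper name `topological_generations` and the diamond-shaped graph are assumptions for illustration; the Scheduler's own implementation may differ.

```python
from graphlib import TopologicalSorter


def topological_generations(dependencies):
    """Group nodes into tuples that could run at the same time.

    `dependencies` maps each node to the set of nodes it depends on.
    """
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()
    generations = []
    while sorter.is_active():
        # Everything ready right now forms one generation.
        ready = sorter.get_ready()
        generations.append(tuple(ready))
        # Finish the whole generation before asking for the next one.
        sorter.done(*ready)
    return generations


# Hypothetical diamond: "left" and "right" both depend only on "source".
example_deps = {
    "source": set(),
    "left": {"source"},
    "right": {"source"},
    "sink": {"left", "right"},
}
example_generations = topological_generations(example_deps)
```

Because order within a generation is not guaranteed, callers comparing results should compare each generation as a set rather than as an ordered tuple.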
- asset_generations() dict[Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)], list[tuple[str, ...]]][source]¶
Like generations(), except only including nodes with direct dependencies on assets, to determine when the asset should be initialized vs. received in the ZMQ Runner. Packed in a dictionary with the asset ID as the key, and the value as the generations for that asset.
- upstream_nodes(node: Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)]) set[Annotated[str, AfterValidator(func=_is_identifier), AfterValidator(func=_not_reserved)]][source]¶
All the nodes that have an effect on the given node
From:
* Dependencies
* If the node has optional dependencies, nodes whose NoEvents it should listen to
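One way to picture an upstream set is a backwards walk over the edges. The sketch below assumes that "upstream" is transitive and represents edges as plain (source, target) pairs; both are simplifications for illustration, and the function `collect_upstream` is hypothetical rather than the Scheduler's method. It does not model optional dependencies or NoEvents.

```python
def collect_upstream(node, edges):
    """Return every node reachable by walking edges backwards from `node`.

    `edges` is an iterable of (source, target) pairs.
    """
    # Index the direct parents of each target node.
    parents = {}
    for source, target in edges:
        parents.setdefault(target, set()).add(source)

    seen = set()
    stack = [node]
    while stack:
        current = stack.pop()
        for parent in parents.get(current, ()):
            if parent not in seen:
                seen.add(parent)
                stack.append(parent)
    return seen


# Hypothetical graph: a -> b -> c, plus an unrelated edge x -> y.
example_pairs = [("a", "b"), ("b", "c"), ("x", "y")]
upstream_of_c = collect_upstream("c", example_pairs)
upstream_of_a = collect_upstream("a", example_pairs)
```

The `seen` set doubles as the result and as a guard against revisiting nodes, so the walk terminates even if the input contains a cycle.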