Imports
After pip install data-flow-control, the package exposes the following names:
```python
from data_flow_control import (
    Dialect,
    PassantRewriteError,
    Policy,
    Resolution,
    RewriteOptions,
    UiUpdateMode,
    UiViolationEvent,
    dfc,
)
```
dfc(conn, dialect=None)
Wraps an existing database connection and returns a policy-aware connection object. If dialect is omitted, the package tries to infer it from the connection type.
| Parameter | Type | Description |
|---|---|---|
| conn | Any | Existing DuckDB, SQLite, psycopg, ClickHouse, DataFusion, Umbra, or adapter-backed connection. |
| dialect | str \| None | Optional explicit dialect. Use it when inference is ambiguous, such as Umbra accessed through psycopg. |
```python
import duckdb
from data_flow_control import dfc

# Wrap an in-memory DuckDB connection; the dialect is inferred from its type.
conn = dfc(duckdb.connect())
```
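Inferring a dialect "from the connection type" can be pictured as a lookup keyed on the driver module that defines the connection class. The sketch below is a hypothetical illustration, not the package's actual logic: infer_dialect and the module-to-dialect table are assumptions, and only three drivers are listed. It also shows why Umbra over psycopg is ambiguous: the object is a psycopg connection either way, so only an explicit dialect can tell the two apart.

```python
import sqlite3

# Hypothetical mapping from a driver's top-level module name to a dialect
# string. Illustrative only; other drivers are omitted.
_MODULE_TO_DIALECT = {
    "duckdb": "duckdb",
    "sqlite3": "sqlite",
    "psycopg": "postgres",
}


def infer_dialect(conn, dialect=None):
    """Return the explicit dialect if given, else guess from the connection's module."""
    if dialect is not None:
        return dialect.lower()
    # e.g. "sqlite3" for sqlite3.Connection, "psycopg.connection" for psycopg;
    # a leading underscore (as in a C extension module) is stripped.
    top_level = type(conn).__module__.split(".")[0].lstrip("_")
    try:
        return _MODULE_TO_DIALECT[top_level]
    except KeyError:
        raise ValueError(f"cannot infer dialect for {type(conn)!r}") from None
```

With this toy, infer_dialect(sqlite3.connect(":memory:")) returns "sqlite", while a psycopg connection to Umbra would be reported as "postgres" unless dialect="umbra" is passed.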
Connection Object
The object returned by dfc(...) wraps the underlying database connection and automatically rewrites queries according to the registered policies.
| Member | Description |
|---|---|
| raw_connection | Property exposing the underlying database connection when the adapter supports it. |
| register_policy(policy) | Validates adapter capabilities, syncs catalog metadata, and registers one Policy. |
| register_policies(policies) | Registers a list of policies with a single catalog sync. |
| delete_policy(...) | Deletes the policy matching the given sources, sink, constraint, resolution, and description. Returns a bool. |
| policies() | Returns the currently registered Policy objects. |
| transform_query(sql, options=None) | Returns the rewritten SQL without executing it. |
| execute(query, params=None, options=None) | Rewrites and executes a statement. DDL statements are executed directly and invalidate the catalog cache. |
| fetchall(query, params=None, options=None) | Executes a query and returns all rows. |
| fetchone(query, params=None, options=None) | Executes a query and returns one row. |
| explain(query) | Returns a dictionary describing the rewrite plan. |
| last_rewrite_stats() | Returns stats from the most recent rewrite when stats collection is enabled. |
| last_statement_rewrite_summary() | Returns the most recent statement-level rewrite summary. |
| refresh_catalog(force=False, include_row_counts=False) | Manually refreshes the catalog metadata used for policy validation. |
| close() | Closes the underlying adapter. |
| with dfc(...) as conn | The wrapper can be used as a context manager, so cleanup happens automatically on exit. |
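To make the wrapper pattern concrete, here is a toy, standard-library-only class built on sqlite3. ToyPolicyConnection, its constructor, and its string-substitution rewrite are hypothetical; the real wrapper is created with dfc(...), rewrites SQL through its own rewrite layer, and supports many policies. The toy applies one REMOVE-style constraint by replacing the source table with a filtered subquery, and mirrors raw_connection, transform_query, fetchall, close, and context-manager support from the table above.

```python
import sqlite3


class ToyPolicyConnection:
    """Hypothetical illustration of a policy-aware connection wrapper.

    Applies a single REMOVE-style constraint to one source table by textual
    substitution. It only handles queries that contain the literal text
    "FROM <table>", and would also match longer table names with that prefix.
    """

    def __init__(self, raw, table, constraint):
        self.raw_connection = raw
        self._table = table
        self._constraint = constraint

    def transform_query(self, sql):
        # Swap the table for a filtered subquery aliased with the same name,
        # so qualified columns such as orders.amount still resolve.
        filtered = (
            f"(SELECT * FROM {self._table} WHERE {self._constraint}) AS {self._table}"
        )
        return sql.replace(f"FROM {self._table}", f"FROM {filtered}")

    def fetchall(self, sql, params=()):
        # Rewrite first, then execute on the raw connection.
        return self.raw_connection.execute(self.transform_query(sql), params).fetchall()

    def close(self):
        self.raw_connection.close()

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.close()
        return False  # do not suppress exceptions
```

Used as with ToyPolicyConnection(sqlite3.connect(":memory:"), "orders", "orders.amount < 1000") as conn:, a GROUP BY over orders counts only rows whose amount is below 1000, and the connection is closed when the block exits.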
Policy
Dataclass describing one data flow policy. Policies can be constructed directly or parsed from policy text with Policy.from_pgn(...).
| Field | Type | Description |
|---|---|---|
| constraint | str | SQL boolean expression that must hold for the protected flow. Column references must be table-qualified (for example, orders.amount). |
| on_fail | Resolution | Action taken when the constraint fails. |
| sources | list[str] | Required list of source tables. Use [] for sink-only policies. |
| sink | str \| None | Optional table written by protected INSERT, UPDATE, or MERGE statements. |
| sink_alias | str \| None | Optional alias for the sink table inside the constraint. |
| source_aliases | dict[str, str] \| None | Optional alias-to-source mapping. Inferred when parsing policy text. |
| description | str \| None | Human-readable policy description. Also used in UI violation events. |
| required_sources | list[str] \| None | Sources that must appear in a write query for the policy to pass. Every required source must also be listed in sources. |
| dimensions | list[str] \| None | Additional policy tables or subqueries used in the constraint. |
| dimension_aliases | dict[str, str] \| None | Alias-to-dimension mapping. |
| dimension_queries | dict[str, str] \| None | Alias-to-subquery mapping for dimension subqueries. |
| udf_name | str \| None | Required for Resolution.UDF and Resolution.RELATION_UDF. |
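```python
from data_flow_control import Policy, Resolution

# Rows from orders whose amount is 1000 or more violate the constraint
# and are filtered out of results and writes.
policy = Policy(
    sources=["orders"],
    constraint="orders.amount < 1000",
    on_fail=Resolution.REMOVE,
)
```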
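Two field rules in the table are easy to get wrong: every entry in required_sources must also appear in sources, and udf_name must be set for the two UDF resolutions. The sketch below checks both on a local mirror class. PolicySketch and Res are hypothetical stand-ins so the example runs without the package installed, and whether the real Policy enforces these rules at construction time or at register_policy is not stated here.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class Res(Enum):
    """Local stand-in for Resolution (hypothetical)."""

    REMOVE = "REMOVE"
    KILL = "KILL"
    UDF = "UDF"
    RELATION_UDF = "RELATION UDF"
    UI = "UI"


@dataclass
class PolicySketch:
    """Hypothetical mirror of a few Policy fields, used to show two documented rules."""

    constraint: str
    on_fail: Res
    sources: list[str]
    sink: Optional[str] = None
    required_sources: Optional[list[str]] = None
    udf_name: Optional[str] = None

    def __post_init__(self):
        # Rule 1: required_sources must be a subset of sources.
        missing = set(self.required_sources or []) - set(self.sources)
        if missing:
            raise ValueError(f"required_sources not in sources: {sorted(missing)}")
        # Rule 2: UDF-based resolutions need the name of the function to call.
        if self.on_fail in (Res.UDF, Res.RELATION_UDF) and not self.udf_name:
            raise ValueError(f"{self.on_fail.name} requires udf_name")
```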
Policy.from_pgn(policy_str) parses policy text, normalizes aliases and dimensions, and returns a Policy.
Resolution
| Value | Description |
|---|---|
| Resolution.REMOVE | Filters violating rows out of the result or write. |
| Resolution.KILL | Aborts when a violating row is evaluated. Requires adapter support for exception-raising UDFs. |
| Resolution.UDF | Routes violating tuples through a registered tuple resolution function. |
| Resolution.RELATION_UDF | Routes a relation of violations through a registered relation function. |
| Resolution.UI | Routes violations to a configured UI handler. Currently DuckDB-focused. |
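The difference between REMOVE and KILL can be sketched in plain SQL with the standard library's sqlite3 module. The query shapes and the dfc_kill function below are assumptions for illustration, not the SQL the package actually emits: REMOVE turns the constraint into a filter, while KILL evaluates a CASE that calls an exception-raising UDF for any failing row, which is why KILL depends on adapter support for such UDFs.

```python
import sqlite3


def _kill(message):
    # Exception-raising UDF: only reached for rows that fail the constraint.
    raise RuntimeError(message)


conn = sqlite3.connect(":memory:")
conn.create_function("dfc_kill", 1, _kill)
conn.executescript(
    "CREATE TABLE orders (id INTEGER, amount INTEGER);"
    "INSERT INTO orders VALUES (1, 10), (2, 5000), (3, 20);"
)

# REMOVE-style shape: the constraint becomes a WHERE filter.
remove_sql = "SELECT id FROM orders WHERE orders.amount < 1000 ORDER BY id"

# KILL-style shape: passing rows evaluate to 1; a failing row calls the
# exception UDF, which aborts the whole statement.
kill_sql = (
    "SELECT id FROM orders "
    "WHERE CASE WHEN orders.amount < 1000 THEN 1 "
    "ELSE dfc_kill('policy violated: orders.amount < 1000') END"
)
```

Here remove_sql returns ids 1 and 3, while fetching the results of kill_sql raises a sqlite3 error when it reaches row 2.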
Resolution.from_label(label) parses labels such as "REMOVE", "KILL", "UDF fix_row", and "RELATION UDF fix_batch".
RewriteOptions and UiUpdateMode
| Field | Type | Description |
|---|---|---|
| use_partial_push | bool | Enables the partial-push rewrite strategy. |
| collect_stats | bool | Collects rewrite timing and planner stats for later inspection. |
| dialect | str \| None | Overrides the SQL dialect used when rewriting. |
| ui_stream_endpoint | str \| None | Path used for UI resolution streaming. |
| ui_update_mode | UiUpdateMode | Controls whether UI resolution only approves rows or also returns edited rows. |
UiUpdateMode.APPROVAL_ONLY: the UI can only approve or reject violating rows.
UiUpdateMode.EDITED_ROWS: the UI can also return corrected row values.
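```python
from data_flow_control import RewriteOptions

# Assumes conn is a dfc(...)-wrapped connection whose database has an orders table.
rewritten = conn.transform_query(
    "SELECT region, COUNT(*) FROM orders GROUP BY region",
    options=RewriteOptions(use_partial_push=True, collect_stats=True),
)
# Stats are available because collect_stats=True was set for this rewrite.
stats = conn.last_rewrite_stats()
```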
Dialect
Supported dialect enum values:
Dialect.DUCKDB / "duckdb"
Dialect.SQLITE / "sqlite"
Dialect.POSTGRES / "postgres"
Dialect.CLICKHOUSE / "clickhouse"
Dialect.DATAFUSION / "datafusion"
Dialect.UMBRA / "umbra"
| Class method | Description |
|---|---|
| normalize(dialect) | Lowercases dialect names and maps postgresql to postgres. |
| contains(value) | Returns whether a dialect string is supported. |
| parse(dialect) | Returns a Dialect enum member or raises ValueError. |
| supported_names() | Returns a sorted tuple of supported dialect names. |
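The documented class-method behavior maps naturally onto a string-valued Enum. DialectSketch below is a hypothetical stand-in written only to illustrate that behavior, not the package's class; in particular, whether contains normalizes its argument first is an assumption.

```python
from enum import Enum


class DialectSketch(str, Enum):
    """Hypothetical stand-in mirroring the documented Dialect class methods."""

    DUCKDB = "duckdb"
    SQLITE = "sqlite"
    POSTGRES = "postgres"
    CLICKHOUSE = "clickhouse"
    DATAFUSION = "datafusion"
    UMBRA = "umbra"

    @classmethod
    def normalize(cls, dialect: str) -> str:
        # Lowercase, then map the postgresql alias to postgres.
        name = dialect.lower()
        return "postgres" if name == "postgresql" else name

    @classmethod
    def contains(cls, value: str) -> bool:
        return cls.normalize(value) in cls.supported_names()

    @classmethod
    def parse(cls, dialect: str) -> "DialectSketch":
        # Lookup by value raises ValueError for unsupported names.
        return cls(cls.normalize(dialect))

    @classmethod
    def supported_names(cls) -> tuple:
        return tuple(sorted(member.value for member in cls))
```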
UI Resolution
UI resolution sends violating rows to an application callback. Configure it before registering policies with ON FAIL UI.
| API | Description |
|---|---|
| configure_ui_resolution(handler, ...) | Configures a violation handler, optional stream endpoint, extension path, polling interval, update mode, and maximum wait time. |
| reset_ui_stream() | Clears the current UI stream file. |
| UiViolationEvent | Frozen dataclass passed to the handler. |
| UiViolationEvent field | Description |
|---|---|
| constraint | Policy constraint that failed. |
| description | Optional policy description. |
| column_names | Ordered names for the values sent to the handler. |
| values | Ordered row values. |
| row | Dictionary mapping column names to values. |
| source_columns | Names interpreted as source columns. |
| output_columns | Names interpreted as output columns. |
| stream_endpoint | Path used for corrected row streaming. |
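The row field carries the same data as column_names and values, zipped into a dictionary. The sketch below mirrors the event fields in a local frozen dataclass and defines a handler that only records what it receives. EventSketch, the tuple field types, and the handler are hypothetical; this section does not describe what a handler must return to approve, reject, or edit rows, so the sketch returns nothing.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class EventSketch:
    """Hypothetical local mirror of the documented UiViolationEvent fields."""

    constraint: str
    description: Optional[str]
    column_names: tuple
    values: tuple
    row: dict
    source_columns: tuple
    output_columns: tuple
    stream_endpoint: Optional[str]


received = []


def handler(event: EventSketch) -> None:
    # Illustrative handler: record the policy description and violating row
    # for later review. No approval or edit result is returned.
    received.append((event.description, event.row))
```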
PassantRewriteError
Raised by the Rust-backed rewrite layer when a query or policy cannot be rewritten safely. Catch it around transform_query, execute, fetchall, or fetchone when you need to distinguish rewrite failures from database execution errors.
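Keeping separate except clauses lets callers report rewrite failures differently from database errors. The sketch uses a local PassantRewriteError stand-in and a hypothetical fake_transform step so it runs with only sqlite3; in real code you would import PassantRewriteError from data_flow_control and call conn.execute, and the database exception to catch depends on the driver (for sqlite3, sqlite3.Error).

```python
import sqlite3


class PassantRewriteError(Exception):
    """Local stand-in so the sketch runs without the package installed."""


def fake_transform(sql: str) -> str:
    # Hypothetical rewrite step that refuses statements it cannot handle.
    if "MERGE" in sql.upper():
        raise PassantRewriteError("cannot rewrite MERGE safely")
    return sql


def run(conn: sqlite3.Connection, sql: str) -> str:
    try:
        conn.execute(fake_transform(sql)).fetchall()
    except PassantRewriteError as exc:
        # The rewrite layer rejected the statement before it reached the database.
        return f"rewrite failed: {exc}"
    except sqlite3.Error as exc:
        # The statement was rewritten but the database rejected it.
        return f"database error: {exc}"
    return "ok"
```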