Python API reference

Use this reference when wiring DFC into an application. The package is imported as data_flow_control, and the public top-level exports are dfc, Dialect, Policy, Resolution, RewriteOptions, UiUpdateMode, PassantRewriteError, and UiViolationEvent.

Imports

After running pip install data-flow-control, the following names can be imported from the package:

from data_flow_control import (
    Dialect,
    PassantRewriteError,
    Policy,
    Resolution,
    RewriteOptions,
    UiUpdateMode,
    UiViolationEvent,
    dfc,
)

dfc(conn, dialect=None)

Wraps an existing database connection and returns a policy-aware connection object. If dialect is omitted, the package tries to infer it from the connection type.

Parameter  Type        Description
conn       Any         Existing DuckDB, SQLite, psycopg, ClickHouse, DataFusion, Umbra, or adapter-backed connection.
dialect    str | None  Optional explicit dialect. Use this when inference is ambiguous, such as Umbra over psycopg.

import duckdb
from data_flow_control import dfc

conn = dfc(duckdb.connect())

Connection Object

The object returned by dfc(...) wraps your standard database connection and automatically rewrites queries according to the registered policies.

Member                                                  Description
raw_connection                                          Property exposing the underlying database connection when the adapter supports it.
register_policy(policy)                                 Validates adapter capabilities, syncs catalog metadata, and registers one Policy.
register_policies(policies)                             Registers a list of policies with one catalog sync.
delete_policy(...)                                      Deletes a matching policy by sources, sink, constraint, resolution, and description. Returns bool.
policies()                                              Returns the currently registered Policy objects.
transform_query(sql, options=None)                      Returns rewritten SQL without executing it.
execute(query, params=None, options=None)               Rewrites and executes a statement. DDL statements are executed directly and invalidate the catalog cache.
fetchall(query, params=None, options=None)              Executes a query and returns all rows.
fetchone(query, params=None, options=None)              Executes a query and returns one row.
explain(query)                                          Returns a dictionary explanation of the rewrite plan.
last_rewrite_stats()                                    Returns stats from the most recent rewrite when stats collection is enabled.
last_statement_rewrite_summary()                        Returns the last statement-level rewrite summary.
refresh_catalog(force=False, include_row_counts=False)  Manually refreshes catalog metadata used for policy validation.
close()                                                 Closes the underlying adapter.
with dfc(...) as conn                                   The wrapper supports context manager cleanup.
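
The difference between transform_query (rewrite only) and fetchall (execute and return rows) can be sketched with a small helper. This is an illustration under assumptions, not part of the package: preview_and_fetch is a hypothetical name, and conn is assumed to be a wrapper returned by dfc(...) that exposes the two members with the signatures listed above.

```python
def preview_and_fetch(conn, sql, params=None):
    """Return (rewritten_sql, rows) for one query.

    Hypothetical helper: `conn` is assumed to expose transform_query(sql)
    and fetchall(query, params) as listed in the member table.
    """
    # transform_query only returns the rewritten SQL; it does not execute it.
    rewritten = conn.transform_query(sql)
    # fetchall executes the query through the wrapper and returns all rows.
    rows = conn.fetchall(sql, params)
    return rewritten, rows
```

Logging the rewritten SQL next to the rows can help when checking that a newly registered policy affects the statements you expect.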

Policy

Dataclass describing one data flow policy. Policies can be constructed directly or parsed from policy text with Policy.from_pgn(...).

Field              Type                   Description
constraint         str                    SQL boolean expression that must hold for the protected flow. Columns must be qualified.
on_fail            Resolution             Action taken when the constraint fails.
sources            list[str]              Required list of source tables. Use [] for sink-only policies.
sink               str | None             Optional table written by protected INSERT, UPDATE, or MERGE statements.
sink_alias         str | None             Optional alias for the sink table inside the constraint.
source_aliases     dict[str, str] | None  Optional alias-to-source mapping. Inferred when parsing policy text.
description        str | None             Human-readable policy description. Also used in UI violation events.
required_sources   list[str] | None       Sources that must appear in a write query for the policy to pass. Every required source must also be listed in sources.
dimensions         list[str] | None       Additional policy tables or subqueries used in the constraint.
dimension_aliases  dict[str, str] | None  Alias-to-dimension mapping.
dimension_queries  dict[str, str] | None  Alias-to-subquery mapping for dimension subqueries.
udf_name           str | None             Required for Resolution.UDF and Resolution.RELATION_UDF.

from data_flow_control import Policy, Resolution

policy = Policy(
    sources=["orders"],
    constraint="orders.amount < 1000",
    on_fail=Resolution.REMOVE,
)

Policy.from_pgn(policy_str) parses policy text, normalizes aliases and dimensions, and returns a Policy.
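
The rule that every entry in required_sources must also appear in sources can be checked before a Policy is built. The helper below is a minimal sketch of that rule only; missing_required_sources is a hypothetical name and is not the validation the package itself performs.

```python
def missing_required_sources(sources, required_sources=None):
    """Return the required sources that are not declared in `sources`.

    Hypothetical pre-flight check; an empty list means the rule
    "every required source is also listed in sources" holds.
    """
    declared = set(sources)
    return [name for name in (required_sources or []) if name not in declared]


# Arguments intended for a Policy; "refunds" is required but never declared.
problems = missing_required_sources(
    sources=["orders"],
    required_sources=["orders", "refunds"],
)
```

Here problems contains "refunds", so the arguments would need fixing before constructing the policy.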

Resolution

Value                    Description
Resolution.REMOVE        Filters violating rows out of the result or write.
Resolution.KILL          Aborts when a violating row is evaluated. Requires adapter exception-UDF support.
Resolution.UDF           Routes violating tuples through a registered tuple resolution function.
Resolution.RELATION_UDF  Routes a relation of violations through a registered relation function.
Resolution.UI            Routes violations to a configured UI handler. Currently DuckDB-focused.

Resolution.from_label(label) parses labels such as "REMOVE", "KILL", "UDF fix_row", and "RELATION UDF fix_batch".
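
To make the difference between REMOVE and KILL concrete, the following pure-Python model applies a constraint row by row. It is only a conceptual sketch: the package enforces policies by rewriting SQL, and apply_resolution and SketchViolation are hypothetical names introduced for illustration.

```python
class SketchViolation(Exception):
    """Hypothetical stand-in for the abort that KILL triggers."""


def apply_resolution(rows, constraint, on_fail):
    """Model REMOVE and KILL over an iterable of row dictionaries."""
    kept = []
    for row in rows:
        if constraint(row):
            kept.append(row)  # constraint holds, row passes through
        elif on_fail == "REMOVE":
            continue  # violating row is silently filtered out
        elif on_fail == "KILL":
            # abort as soon as a violating row is evaluated
            raise SketchViolation(f"constraint failed for {row!r}")
        else:
            raise ValueError(f"resolution not modelled here: {on_fail!r}")
    return kept


orders = [{"amount": 250}, {"amount": 4000}]


def under_limit(row):
    # Mirrors the constraint "orders.amount < 1000" from the Policy example.
    return row["amount"] < 1000


remaining = apply_resolution(orders, under_limit, "REMOVE")
```

With REMOVE, remaining holds only the 250 order. Calling the same function with "KILL" raises SketchViolation when it reaches the 4000 order.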

RewriteOptions and UiUpdateMode

Field               Type          Description
use_partial_push    bool          Enables the partial-push rewrite strategy.
collect_stats       bool          Collects rewrite timing and planner stats for later inspection.
dialect             str | None    Overrides the SQL dialect used for rewrite options.
ui_stream_endpoint  str | None    Path used by UI resolution streaming.
ui_update_mode      UiUpdateMode  Controls whether UI resolution only approves rows or also returns edited rows.

UiUpdateMode.APPROVAL_ONLY approves or rejects violating rows. UiUpdateMode.EDITED_ROWS allows corrected row values to be returned.

from data_flow_control import RewriteOptions

rewritten = conn.transform_query(
    "SELECT region, COUNT(*) FROM orders GROUP BY region",
    options=RewriteOptions(use_partial_push=True, collect_stats=True),
)
stats = conn.last_rewrite_stats()

Dialect

Supported dialect enum values:

  • Dialect.DUCKDB / "duckdb"
  • Dialect.SQLITE / "sqlite"
  • Dialect.POSTGRES / "postgres"
  • Dialect.CLICKHOUSE / "clickhouse"
  • Dialect.DATAFUSION / "datafusion"
  • Dialect.UMBRA / "umbra"

Class method        Description
normalize(dialect)  Lowercases dialect names and maps postgresql to postgres.
contains(value)     Returns whether a dialect string is supported.
parse(dialect)      Returns a Dialect enum member or raises ValueError.
supported_names()   Returns a sorted tuple of supported dialect names.
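
The behaviour described in the table can be mirrored with a small stand-in enum. SketchDialect below is a hypothetical class written from the descriptions above, not the package's implementation; in particular, whether contains normalizes its argument first is an assumption.

```python
from enum import Enum


class SketchDialect(str, Enum):
    """Stand-in that mirrors the documented Dialect class methods."""

    DUCKDB = "duckdb"
    SQLITE = "sqlite"
    POSTGRES = "postgres"
    CLICKHOUSE = "clickhouse"
    DATAFUSION = "datafusion"
    UMBRA = "umbra"

    @classmethod
    def normalize(cls, dialect):
        # Lowercase the name and map the postgresql spelling to postgres.
        name = dialect.lower()
        return "postgres" if name == "postgresql" else name

    @classmethod
    def supported_names(cls):
        # Sorted tuple of the supported dialect names.
        return tuple(sorted(member.value for member in cls))

    @classmethod
    def contains(cls, value):
        # Assumption: the value is normalized before the membership check.
        return cls.normalize(value) in cls.supported_names()

    @classmethod
    def parse(cls, dialect):
        # Return the enum member, or raise ValueError for unknown names.
        name = cls.normalize(dialect)
        if name not in cls.supported_names():
            raise ValueError(f"unsupported dialect: {dialect!r}")
        return cls(name)


parsed = SketchDialect.parse("PostgreSQL")
```

In this sketch parsed is SketchDialect.POSTGRES, while SketchDialect.parse("oracle") raises ValueError.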

UI Resolution

UI resolution sends violating rows to an application callback. Configure it before registering policies with ON FAIL UI.

API                                    Description
configure_ui_resolution(handler, ...)  Configures a violation handler, optional stream endpoint, extension path, polling interval, update mode, and maximum wait time.
reset_ui_stream()                      Clears the current UI stream file.
UiViolationEvent                       Frozen dataclass passed to the handler.

UiViolationEvent field  Description
constraint              Policy constraint that failed.
description             Optional policy description.
column_names            Ordered names for values sent to the handler.
values                  Ordered row values.
row                     Dictionary mapping column names to values.
source_columns          Names interpreted as source columns.
output_columns          Names interpreted as output columns.
stream_endpoint         Path used for corrected row streaming.
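
A handler typically reads the event fields listed above. The sketch below uses a stand-in frozen dataclass so it runs without the package; SketchViolationEvent, its field types, and record_violation are assumptions for illustration, and what the real handler is expected to return is not covered here.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional, Tuple


@dataclass(frozen=True)
class SketchViolationEvent:
    """Stand-in with the documented field names; the types are assumed."""

    constraint: str
    description: Optional[str]
    column_names: Tuple[str, ...]
    values: Tuple[Any, ...]
    row: Dict[str, Any]
    source_columns: Tuple[str, ...]
    output_columns: Tuple[str, ...]
    stream_endpoint: Optional[str]


seen = []


def record_violation(event):
    """Hypothetical handler: keep a label and the offending row for review."""
    label = event.description or event.constraint
    seen.append((label, event.row))


names = ("id", "amount")
values = (7, 4000)
event = SketchViolationEvent(
    constraint="orders.amount < 1000",
    description="Large orders need review",
    column_names=names,
    values=values,
    row=dict(zip(names, values)),  # row pairs each column name with its value
    source_columns=("amount",),
    output_columns=("id", "amount"),
    stream_endpoint=None,
)
record_violation(event)
```

Because the dataclass is frozen, a handler written this way cannot mutate the event it receives; it can only read from it.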

PassantRewriteError

Raised by the Rust-backed rewrite layer when a query or policy cannot be rewritten safely. Catch it around transform_query, execute, fetchall, or fetchone when you need to distinguish rewrite failures from database execution errors.
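
One way to separate the two failure classes is to catch the rewrite error before a general exception handler. The helper below is a sketch under assumptions: fetch_with_diagnosis is a hypothetical name, the error class is passed in as a parameter so the example runs without the package, and PassantRewriteError is assumed to derive from Exception.

```python
def fetch_with_diagnosis(conn, sql, rewrite_error):
    """Run `sql` through conn.fetchall and label the outcome.

    Hypothetical helper. In real code, pass PassantRewriteError
    (imported from data_flow_control) as `rewrite_error`.
    """
    try:
        return ("ok", conn.fetchall(sql))
    except rewrite_error as exc:
        # The rewrite layer could not rewrite the query or policy safely.
        return ("rewrite_failed", str(exc))
    except Exception as exc:
        # Any other exception is treated here as a database execution error.
        return ("execution_failed", str(exc))
```

The order of the except clauses matters: the rewrite error must be caught first, otherwise the general handler would absorb it.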