Examples

Policy examples in Python

These examples show DFC policies for common agent data-flow requirements: privacy, grounded writes, domain rules, and targeted model-assisted classification. The first snippet is self-contained. Later snippets show only the additional policy and query code; they assume tables and columns that are not set up here, such as Receipts.uid, Receipts.amount, Receipts.item, and CurrentUser.

Law-abiding

Remove deductions that violate tax rules

This policy applies when a query derives Expenses from Receipts. Meal expenses with business use above 50 percent fail the constraint and are removed from the insert.

Python
import duckdb
from data_flow_control import Policy, dfc

raw = duckdb.connect()
conn = dfc(raw)

conn.execute("CREATE TABLE Receipts (id INTEGER, biz_use INTEGER, cat VARCHAR)")
conn.execute("CREATE TABLE Expenses (id INTEGER, biz_use INTEGER, cat VARCHAR)")
conn.execute("""
    INSERT INTO Receipts VALUES
    (1, 80, 'Meal'),
    (2, 30, 'Meal'),
    (3, 80, 'Travel')
""")

conn.register_policy(
    Policy.from_pgn("""
        SOURCE Receipts SINK Expenses
        CONSTRAINT Expenses.biz_use <= 50 OR Receipts.cat != 'Meal'
        ON FAIL REMOVE
    """)
)

conn.execute("""
    INSERT INTO Expenses (id, biz_use, cat)
    SELECT Receipts.id, Receipts.biz_use, Receipts.cat
    FROM Receipts
""")

rows = conn.fetchall("SELECT id, biz_use, cat FROM Expenses ORDER BY id")
assert rows == [(2, 30, "Meal"), (3, 80, "Travel")]
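
To make the effect of ON FAIL REMOVE concrete, the following sketch reproduces the same row-level filtering in plain Python, without DFC. It only illustrates the outcome described above; the helper name `passes_meal_rule` and the tuple layout are assumptions made for this sketch and are not part of the DFC API.

```python
# Illustrative sketch only: mimics the effect of the policy above in plain
# Python. Nothing here is part of the data_flow_control API.

# Same rows as the Receipts table in the example: (id, biz_use, cat).
receipts = [
    (1, 80, "Meal"),
    (2, 30, "Meal"),
    (3, 80, "Travel"),
]


def passes_meal_rule(biz_use: int, cat: str) -> bool:
    """Mirror of: Expenses.biz_use <= 50 OR Receipts.cat != 'Meal'."""
    return biz_use <= 50 or cat != "Meal"


# ON FAIL REMOVE: rows that fail the constraint are dropped, and the
# remaining rows are still written.
expenses = [row for row in receipts if passes_meal_rule(row[1], row[2])]

print(expenses)
```

Only the meal with 80 percent business use is dropped; the travel row passes because the rule applies to meals alone.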

Private

Only release aggregates over multiple users

Public reports should not expose a statistic derived from a single user. This policy keeps an output group only when more than one distinct receipt owner contributed to it, or when the one-row CurrentUser dimension table marks the session as admin.

Python
conn.register_policy(
    Policy.from_pgn("""
        SOURCE Receipts
        DIMENSION CurrentUser
        CONSTRAINT count(distinct Receipts.uid) > 1
            OR max(CurrentUser.is_admin)
        ON FAIL REMOVE
    """)
)

rows = conn.fetchall("""
    SELECT cat, SUM(amount) AS total
    FROM Receipts
    GROUP BY cat
    ORDER BY cat
""")

assert rows == [("Meal", 65)]
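
The rule behaves roughly like a HAVING clause attached to the aggregate. The sketch below expresses that idea by hand, using Python's standard-library sqlite3 module purely so that it runs without extra dependencies. The table contents are hypothetical, chosen so that the non-admin result matches the assertion above, and the hand-written HAVING clause is an assumption about the policy's effect, not a description of how DFC rewrites queries.

```python
import sqlite3


def public_totals(is_admin: bool) -> list:
    """Per-category totals, keeping only groups the privacy rule allows."""
    db = sqlite3.connect(":memory:")
    db.execute(
        "CREATE TABLE Receipts (id INTEGER, uid INTEGER, cat TEXT, amount INTEGER)"
    )
    db.execute("CREATE TABLE CurrentUser (is_admin INTEGER)")
    # Hypothetical data: two users bought meals, only one user travelled.
    db.executemany(
        "INSERT INTO Receipts VALUES (?, ?, ?, ?)",
        [(1, 1, "Meal", 40), (2, 2, "Meal", 25), (3, 1, "Travel", 90)],
    )
    db.execute("INSERT INTO CurrentUser VALUES (?)", (int(is_admin),))
    rows = db.execute("""
        SELECT cat, SUM(amount) AS total
        FROM Receipts
        GROUP BY cat
        HAVING COUNT(DISTINCT uid) > 1
            OR (SELECT MAX(is_admin) FROM CurrentUser)
        ORDER BY cat
    """).fetchall()
    db.close()
    return rows


print(public_totals(is_admin=False))
print(public_totals(is_admin=True))
```

With these rows, a non-admin session sees only the Meal total, because Travel has a single contributor; an admin session sees both groups.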

Grounded

Abort fabricated writes

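SOURCE REQUIRED means a write into the sink must actually read from the source. Here, every inserted expense must be derived from a receipt with the same id. The second insert reads from Receipts but hard-codes the id 99 instead of using Receipts.id, so it fails the constraint and ON FAIL KILL aborts the query.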

Python
conn.register_policy(
    Policy.from_pgn("""
        SOURCE REQUIRED Receipts SINK Expenses
        CONSTRAINT Receipts.id = Expenses.id
        ON FAIL KILL
    """)
)

conn.execute("""
    INSERT INTO Expenses (id, item)
    SELECT Receipts.id, Receipts.item
    FROM Receipts
""")

try:
    conn.execute("""
        INSERT INTO Expenses (id, item)
        SELECT 99, 'phantom'
        FROM Receipts
    """)
except Exception as exc:
    print(f"Blocked fabricated expense: {exc}")
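
The difference between KILL and REMOVE can be sketched in plain Python: instead of dropping the offending row, the whole write is rejected. The names `PolicyViolation` and `insert_grounded` are hypothetical, and the all-or-nothing staging is an assumption about what aborting the query implies; none of this is DFC's own implementation.

```python
class PolicyViolation(Exception):
    """Hypothetical error type for this sketch (not a DFC class)."""


def insert_grounded(receipts, derive, expenses):
    """Derive one expense per receipt; abort the whole write on a mismatch."""
    staged = []
    for receipt in receipts:
        expense = derive(receipt)
        # Mirror of: CONSTRAINT Receipts.id = Expenses.id
        if expense["id"] != receipt["id"]:
            # ON FAIL KILL: raise before anything is written (assumed semantics).
            raise PolicyViolation(
                f"expense id {expense['id']} does not match receipt id {receipt['id']}"
            )
        staged.append(expense)
    # Reached only when every derived row passed the constraint.
    expenses.extend(staged)


receipts = [{"id": 1, "item": "lunch"}, {"id": 2, "item": "taxi"}]
expenses = []

# Grounded write: each expense copies its id from the receipt it came from.
insert_grounded(receipts, lambda r: {"id": r["id"], "item": r["item"]}, expenses)

# Fabricated write: the id is invented, so the whole insert is rejected.
try:
    insert_grounded(receipts, lambda r: {"id": 99, "item": "phantom"}, expenses)
except PolicyViolation as exc:
    print(f"Blocked fabricated expense: {exc}")

print(expenses)
```

After both calls, `expenses` holds only the two grounded rows; the fabricated insert contributes nothing.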

Flock extension

Use models only for targeted classification

DFC does not ask a model to decide whether the whole data flow is safe. Instead, the policy can call a narrowly scoped SQL function in CONSTRAINT, such as Flock llm_filter, and keep the rest of the policy deterministic.

Note: Install the Flock DuckDB extension and configure an API key before running this example.

Python
conn.register_policy(
    Policy.from_pgn("""
        SOURCE Receipts SINK Expenses
        CONSTRAINT NOT llm_filter(
            {'model_name': 'default'},
            {'prompt': 'Is this a meal?', 'context_columns': [{'data': Receipts.cat}]}
        ) OR Expenses.biz_use <= 50
        ON FAIL REMOVE
    """)
)

conn.execute("""
    INSERT INTO Expenses (id, biz_use, cat)
    SELECT Receipts.id, Receipts.biz_use, Receipts.cat
    FROM Receipts
""")

rows = conn.fetchall("SELECT id, biz_use, cat FROM Expenses ORDER BY id")
assert rows == [(2, 30, "client dinner"), (3, 80, "taxi")]
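
The division of labor can be sketched without a model at all. In the plain-Python illustration below, `is_meal` is a hypothetical keyword-based stand-in for the model call, and the row with id 1 is invented for the sketch, since the source rows for this example are not shown. The point is only that the classifier answers one narrow question while the threshold check stays deterministic: a row is kept when it is not a meal, or when its business use is at most 50 percent.

```python
def is_meal(description: str) -> bool:
    """Hypothetical stand-in for the model call: a keyword match, not an LLM."""
    return any(word in description.lower() for word in ("dinner", "lunch", "meal"))


# Hypothetical rows: (id, biz_use, cat). Row 1 is invented for this sketch.
receipts = [
    (1, 80, "team lunch"),
    (2, 30, "client dinner"),
    (3, 80, "taxi"),
]


def passes(biz_use: int, cat: str) -> bool:
    # The classifier decides only "is this a meal?"; the 50 percent
    # threshold is an ordinary deterministic comparison.
    return (not is_meal(cat)) or biz_use <= 50


# ON FAIL REMOVE: keep only the rows that satisfy the constraint.
expenses = [row for row in receipts if passes(row[1], row[2])]

print(expenses)
```

Swapping the stub for a real classifier changes only `is_meal`; the rest of the rule remains easy to audit.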