Track data lineage and provenance from source to consumption. Use when auditing data flows, debugging data quality issues, ensuring compliance (GDPR, SOX), or understanding data dependencies. Covers lineage tracking, impact analysis, data catalogs, and metadata management.
Usage
After installing, this skill will be available to your AI coding assistant.
Verify installation:
npx agent-skills-cli list
Skill Instructions
name: data-provenance
description: |
  Track data lineage and provenance from source to consumption. Use when auditing data flows, debugging data quality issues, ensuring compliance (GDPR, SOX), or understanding data dependencies. Covers lineage tracking, impact analysis, data catalogs, and metadata management.
Data Provenance & Lineage
Track where data comes from, how it transforms, and where it goes, which is essential for trust, compliance, and debugging.
When to Use
Use this skill when:
- Auditing data for compliance (GDPR, HIPAA, SOX, CCPA)
- Debugging data quality issues ("Where did this bad data come from?")
- Understanding impact of schema changes ("What breaks if I change this field?")
- Building data catalogs or governance systems
- Tracking sensitive data (PII, PHI) through systems
- Responding to data deletion requests (GDPR "right to be forgotten")
What is Data Provenance?
Provenance: The complete history and lineage of a data element
Question: "Where does the revenue number in this dashboard come from?"
Answer (with provenance):
Dashboard.revenue (computed 2026-01-21 08:00)
↑ warehouse.daily_sales.total (aggregated 2026-01-21 02:00)
↑ etl_pipeline.transform_sales (ran 2026-01-21 01:30)
↑ production_db.orders.amount (order #12345, created 2026-01-20 15:23)
↑ stripe_api.charge (charge_id: ch_abc123, processed 2026-01-20 15:23)
↑ user input (customer: cust_xyz, card ending 4242)
Key questions provenance answers:
- Where did this data come from? (source)
- When was it created/updated? (timestamp)
- How was it transformed? (logic, code version)
- Who created/modified it? (user, system, process)
- Why does it have this value? (business logic)
- What depends on it? (downstream consumers)
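The questions above can be captured in a small record type. The following is a minimal sketch, not a prescribed schema: the class name `ProvenanceStep`, its field names, and the actor value "checkout service" are illustrative assumptions, while the element names and timestamps come from the dashboard example.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ProvenanceStep:
    """One hop in a provenance chain (field names are illustrative)."""
    element: str       # where: the data element, e.g. "orders.amount"
    recorded_at: str   # when: ISO-8601 timestamp of creation or update
    how: str           # how: transformation or event that produced the value
    actor: str         # who: user, system, or process responsible
    parent: Optional["ProvenanceStep"] = None  # the upstream step, if any


def trace_to_source(step):
    """Return element names from the given step back to the original source."""
    chain = []
    while step is not None:
        chain.append(step.element)
        step = step.parent
    return chain


charge = ProvenanceStep("stripe_api.charge", "2026-01-20T15:23",
                        "card charge processed", "stripe")
order = ProvenanceStep("production_db.orders.amount", "2026-01-20T15:23",
                       "order created", "checkout service", charge)  # actor is hypothetical
daily = ProvenanceStep("warehouse.daily_sales.total", "2026-01-21T02:00",
                       "aggregated", "etl_pipeline.transform_sales", order)

print(trace_to_source(daily))
```

Following `parent` links answers "where did this come from?"; answering "what depends on it?" needs the reverse index, which is what the lineage tables in the next sections provide.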
Levels of Provenance Tracking
Level 1: Table-Level Lineage
What: Track which tables feed into other tables
┌────────────┐
│   orders   │──┐
└────────────┘  │
                ├──► ┌──────────────┐
┌────────────┐  │    │ daily_sales  │
│ customers  │──┘    └──────────────┘
└────────────┘
Implementation: Metadata table
CREATE TABLE table_lineage (
downstream_table VARCHAR(255),
upstream_table VARCHAR(255),
relationship_type VARCHAR(50), -- 'direct_copy', 'join', 'aggregate'
created_at TIMESTAMPTZ DEFAULT NOW(),
PRIMARY KEY (downstream_table, upstream_table)
);
INSERT INTO table_lineage VALUES
('daily_sales', 'orders', 'aggregate'),
('daily_sales', 'customers', 'join');
Query: "What tables does daily_sales depend on?"
SELECT upstream_table
FROM table_lineage
WHERE downstream_table = 'daily_sales';
-- Result: orders, customers
Query: "What tables depend on orders?"
SELECT downstream_table
FROM table_lineage
WHERE upstream_table = 'orders';
-- Result: daily_sales, weekly_report, customer_lifetime_value
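To try the two queries without a PostgreSQL instance, the same table can be mirrored in SQLite. This is a rough sketch under stated assumptions: it uses Python's built-in `sqlite3` module, simplifies the column types to `TEXT`, and loads only the two example rows, so the downstream query returns just `daily_sales`. The helper names `upstream_of` and `downstream_of` are made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE table_lineage (
        downstream_table  TEXT,
        upstream_table    TEXT,
        relationship_type TEXT,
        PRIMARY KEY (downstream_table, upstream_table)
    )
""")
conn.executemany(
    "INSERT INTO table_lineage VALUES (?, ?, ?)",
    [("daily_sales", "orders", "aggregate"),
     ("daily_sales", "customers", "join")],
)


def upstream_of(table):
    """Tables that feed directly into `table`."""
    rows = conn.execute(
        "SELECT upstream_table FROM table_lineage "
        "WHERE downstream_table = ? ORDER BY upstream_table",
        (table,),
    )
    return [row[0] for row in rows]


def downstream_of(table):
    """Tables that read directly from `table`."""
    rows = conn.execute(
        "SELECT downstream_table FROM table_lineage "
        "WHERE upstream_table = ? ORDER BY downstream_table",
        (table,),
    )
    return [row[0] for row in rows]


print(upstream_of("daily_sales"))
print(downstream_of("orders"))
```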
Level 2: Column-Level Lineage
What: Track which columns feed into which columns
orders.amount   ──┐
orders.tax      ──┼──► daily_sales.total_revenue
orders.shipping ──┘
Implementation:
CREATE TABLE column_lineage (
downstream_table VARCHAR(255),
downstream_column VARCHAR(255),
upstream_table VARCHAR(255),
upstream_column VARCHAR(255),
transformation TEXT, -- SQL or description
created_at TIMESTAMPTZ DEFAULT NOW(),
PRIMARY KEY (downstream_table, downstream_column, upstream_table, upstream_column)
);
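-- total_revenue reads three source columns, so each one gets its own lineage row
INSERT INTO column_lineage (downstream_table, downstream_column, upstream_table, upstream_column, transformation) VALUES
('daily_sales', 'total_revenue', 'orders', 'amount', 'SUM(amount + tax + shipping)'),
('daily_sales', 'total_revenue', 'orders', 'tax', 'SUM(amount + tax + shipping)'),
('daily_sales', 'total_revenue', 'orders', 'shipping', 'SUM(amount + tax + shipping)'),
('daily_sales', 'order_count', 'orders', 'id', 'COUNT(id)'),
('daily_sales', 'customer_name', 'customers', 'name', 'LEFT JOIN on customer_id');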
Query: "Where does daily_sales.total_revenue come from?"
SELECT
upstream_table,
upstream_column,
transformation
FROM column_lineage
WHERE downstream_table = 'daily_sales'
AND downstream_column = 'total_revenue';
Level 3: Row-Level Lineage
What: Track individual record transformations
orders.id=12345 (amount=$100) ──► daily_sales.id=67 (date=2026-01-20, total=$150)
orders.id=12346 (amount=$50)  ──┘
Implementation: Lineage table
CREATE TABLE row_lineage (
id BIGSERIAL PRIMARY KEY,
downstream_table VARCHAR(255),
downstream_pk BIGINT,
upstream_table VARCHAR(255),
upstream_pk BIGINT,
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- After ETL run
INSERT INTO row_lineage (downstream_table, downstream_pk, upstream_table, upstream_pk)
SELECT
'daily_sales',
ds.id,
'orders',
o.id
FROM daily_sales ds
JOIN orders o ON DATE(o.created_at) = ds.sale_date;
Query: "What orders contributed to daily_sales row 67?"
SELECT o.*
FROM row_lineage rl
JOIN orders o ON rl.upstream_pk = o.id
WHERE rl.downstream_table = 'daily_sales'
AND rl.downstream_pk = 67;
Level 4: Value-Level Lineage (Finest)
What: Track transformations at the field value level
order.amount = $100
order.tax = $10
order.shipping = $5
↓ (SUM transformation)
daily_sales.total_revenue = $115
Implementation: Event log
CREATE TABLE value_lineage (
id BIGSERIAL PRIMARY KEY,
entity_type VARCHAR(50),
entity_id BIGINT,
field_name VARCHAR(100),
old_value TEXT,
new_value TEXT,
transformation TEXT,
source_values JSONB, -- Array of source values
created_at TIMESTAMPTZ DEFAULT NOW(),
created_by VARCHAR(255) -- User or process
);
-- Example: Revenue calculation
INSERT INTO value_lineage VALUES (
DEFAULT,
'daily_sales',
67,
'total_revenue',
NULL,
'115.00',
'SUM(orders.amount + orders.tax + orders.shipping) WHERE date = 2026-01-20',
'[{"table": "orders", "id": 12345, "amount": 100, "tax": 10, "shipping": 5}]',
NOW(),
'etl_pipeline_v1.2.3'
);
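In application code, the same record could be assembled just before the insert. The sketch below is an assumption about how that might look rather than part of any library: `build_value_lineage` is a hypothetical helper that recomputes the total from the source rows, so the stored value and its recorded inputs cannot drift apart.

```python
import json


def build_value_lineage(entity_type, entity_id, field_name, source_rows, created_by):
    """Build one value_lineage row as a dict (hypothetical helper)."""
    # Recompute the value from its inputs instead of trusting a passed-in total
    new_value = sum(r["amount"] + r["tax"] + r["shipping"] for r in source_rows)
    return {
        "entity_type": entity_type,
        "entity_id": entity_id,
        "field_name": field_name,
        "old_value": None,
        "new_value": f"{new_value:.2f}",
        "transformation": "SUM(orders.amount + orders.tax + orders.shipping)",
        "source_values": json.dumps(source_rows),  # goes into the JSONB column
        "created_by": created_by,
    }


record = build_value_lineage(
    "daily_sales", 67, "total_revenue",
    [{"table": "orders", "id": 12345, "amount": 100, "tax": 10, "shipping": 5}],
    "etl_pipeline_v1.2.3",
)
print(record["new_value"])  # 115.00
```

Value-level lineage writes one row per computed field, so storage grows quickly; it is usually reserved for a few audited metrics.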
Provenance Capture Methods
Method 1: Code Instrumentation
Manual tracking in ETL code:
from datetime import date, timedelta

def etl_orders_to_daily_sales():
    # `db` is assumed to be an existing database helper exposing query() and insert()
    yesterday = date.today() - timedelta(days=1)

    # Extract: yesterday's orders
    orders = db.query("SELECT * FROM orders WHERE DATE(created_at) = ?", yesterday)

    # Transform: aggregate per day and remember which orders contributed
    daily_sales = {}
    for order in orders:
        sale_date = order['created_at'].date()
        if sale_date not in daily_sales:
            daily_sales[sale_date] = {'total': 0, 'count': 0, 'order_ids': []}
        daily_sales[sale_date]['total'] += order['amount']
        daily_sales[sale_date]['count'] += 1
        daily_sales[sale_date]['order_ids'].append(order['id'])

    # Load with lineage tracking
    for sale_date, metrics in daily_sales.items():
        # db.insert() is assumed to return the primary key of the new row
        ds_id = db.insert(
            "INSERT INTO daily_sales (date, total, count) VALUES (?, ?, ?)",
            sale_date, metrics['total'], metrics['count']
        )
        # Track lineage: one row per contributing order
        for order_id in metrics['order_ids']:
            db.insert(
                "INSERT INTO row_lineage (downstream_table, downstream_pk, upstream_table, upstream_pk) VALUES (?, ?, ?, ?)",
                'daily_sales', ds_id, 'orders', order_id
            )
Method 2: SQL Parsing
Automatically extract lineage from SQL queries:
from sqllineage.runner import LineageRunner

sql = """
INSERT INTO daily_sales (date, total_revenue, order_count)
SELECT
    DATE(created_at) as date,
    SUM(amount + tax + shipping) as total_revenue,
    COUNT(*) as order_count
FROM orders
LEFT JOIN customers ON orders.customer_id = customers.id
WHERE created_at >= '2026-01-20'
GROUP BY DATE(created_at)
"""

# Parse lineage. sqllineage returns table objects, so convert them to strings;
# the names may come back schema-qualified.
runner = LineageRunner(sql)
sources = [str(table) for table in runner.source_tables]
targets = [str(table) for table in runner.target_tables]
print("Source tables:", sources)  # expected to include orders and customers
print("Target tables:", targets)  # expected to include daily_sales

# Store in lineage table (`db` is an assumed database helper)
for target in targets:
    for source in sources:
        db.insert(
            "INSERT INTO table_lineage (downstream_table, upstream_table) VALUES (?, ?)",
            target, source
        )
Method 3: Database Triggers
Capture changes automatically:
-- Audit trail for all changes
CREATE TABLE audit_log (
id BIGSERIAL PRIMARY KEY,
table_name VARCHAR(255),
record_id BIGINT,
operation VARCHAR(10), -- INSERT, UPDATE, DELETE
old_values JSONB,
new_values JSONB,
changed_by VARCHAR(255),
changed_at TIMESTAMPTZ DEFAULT NOW()
);
-- Trigger on orders table
CREATE OR REPLACE FUNCTION audit_orders()
RETURNS TRIGGER AS $$
BEGIN
INSERT INTO audit_log (table_name, record_id, operation, old_values, new_values, changed_by)
VALUES (
'orders',
COALESCE(NEW.id, OLD.id),
TG_OP,
row_to_json(OLD),
row_to_json(NEW),
current_user
);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER orders_audit
AFTER INSERT OR UPDATE OR DELETE ON orders
FOR EACH ROW EXECUTE FUNCTION audit_orders();
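Once the trigger is filling `audit_log`, a common follow-up is to see which fields actually changed in each entry. The helper below is a small sketch of that comparison; `changed_fields` is a hypothetical name, and it assumes `old_values` and `new_values` have already been loaded from JSONB into Python dicts (or `None` for inserts and deletes).

```python
def changed_fields(old_values, new_values):
    """Return {field: (old, new)} for every field whose value differs."""
    old_values = old_values or {}   # INSERT rows have no old image
    new_values = new_values or {}   # DELETE rows have no new image
    fields = sorted(set(old_values) | set(new_values))
    return {
        field: (old_values.get(field), new_values.get(field))
        for field in fields
        if old_values.get(field) != new_values.get(field)
    }


# UPDATE: only the amount changed
print(changed_fields({"id": 12345, "amount": 100}, {"id": 12345, "amount": 120}))
# INSERT: every field is new
print(changed_fields(None, {"id": 12346, "amount": 50}))
```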
Method 4: CDC (Change Data Capture)
Stream database changes:
# Using Debezium or a similar CDC tool
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer('postgres.public.orders')
for message in consumer:
    if message.value is None:  # skip tombstone records that can follow deletes
        continue
    change_event = json.loads(message.value)
    # Store in lineage system (`db` is an assumed database helper)
    db.insert(
        "INSERT INTO change_log (table_name, operation, before, after, timestamp) VALUES (?, ?, ?, ?, ?)",
        change_event['source']['table'],
        change_event['op'],  # 'c' (create), 'u' (update), 'd' (delete)
        json.dumps(change_event.get('before')),  # row images are dicts; store them as JSON text
        json.dumps(change_event.get('after')),
        change_event['ts_ms']
    )
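Depending on converter settings, a Debezium message may wrap the change in a `payload` envelope next to a `schema` block, or deliver the change fields at the top level. The function below is a sketch that accepts either shape; `normalize_change_event` and the `OPERATIONS` mapping are illustrative names, and the sample event is hand-written rather than captured from a real connector.

```python
import json

# Debezium operation codes mapped to readable names
OPERATIONS = {"c": "INSERT", "u": "UPDATE", "d": "DELETE", "r": "SNAPSHOT"}


def normalize_change_event(raw):
    """Parse one change event (str or bytes) into a flat dict."""
    event = json.loads(raw)
    # Use the envelope's payload if present, otherwise the event itself
    payload = event.get("payload", event)
    return {
        "table_name": payload["source"]["table"],
        "operation": OPERATIONS.get(payload["op"], payload["op"]),
        "before": payload.get("before"),
        "after": payload.get("after"),
        "ts_ms": payload["ts_ms"],
    }


sample = json.dumps({
    "payload": {
        "op": "u",
        "ts_ms": 1768922580000,
        "source": {"table": "orders"},
        "before": {"id": 12345, "amount": 100},
        "after": {"id": 12345, "amount": 120},
    }
})
print(normalize_change_event(sample))
```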
Impact Analysis
Downstream Impact
Question: "If I change orders.amount, what breaks?"
-- Find all downstream dependencies
WITH RECURSIVE dependencies AS (
-- Base: Direct dependencies
SELECT
downstream_table,
downstream_column,
1 as depth
FROM column_lineage
WHERE upstream_table = 'orders'
AND upstream_column = 'amount'
UNION ALL
-- Recursive: Dependencies of dependencies
SELECT
cl.downstream_table,
cl.downstream_column,
d.depth + 1
FROM column_lineage cl
JOIN dependencies d
ON cl.upstream_table = d.downstream_table
AND cl.upstream_column = d.downstream_column
WHERE d.depth < 10 -- Prevent infinite loops
)
SELECT DISTINCT
downstream_table,
downstream_column,
depth
FROM dependencies
ORDER BY depth, downstream_table, downstream_column;
Result:
| downstream_table | downstream_column | depth |
|------------------------|--------------------|-------|
| daily_sales | total_revenue | 1 |
| monthly_revenue | total | 2 |
| executive_dashboard | ytd_revenue | 3 |
| investor_report | arr | 4 |
Interpretation: Changing orders.amount affects 4 layers of downstream tables!
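The same traversal can be done in application code once the lineage edges are loaded. The sketch below is one possible approach: a breadth-first search that records the shortest depth for each downstream column and uses a visited check instead of a fixed depth limit to survive cycles. The `EDGES` mapping hard-codes the four example hops; in practice it would be built from `column_lineage`.

```python
from collections import deque

# (upstream_table, upstream_column) -> [(downstream_table, downstream_column), ...]
EDGES = {
    ("orders", "amount"): [("daily_sales", "total_revenue")],
    ("daily_sales", "total_revenue"): [("monthly_revenue", "total")],
    ("monthly_revenue", "total"): [("executive_dashboard", "ytd_revenue")],
    ("executive_dashboard", "ytd_revenue"): [("investor_report", "arr")],
}


def downstream_impact(start, edges):
    """Map every column reachable from `start` to its shortest depth."""
    depths = {}
    queue = deque([(start, 0)])
    while queue:
        node, depth = queue.popleft()
        for child in edges.get(node, []):
            # Skip columns already reached, which also stops cycles
            if child not in depths and child != start:
                depths[child] = depth + 1
                queue.append((child, depth + 1))
    return depths


for (table, column), depth in downstream_impact(("orders", "amount"), EDGES).items():
    print(depth, table, column)
```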
Upstream Impact
Question: "What source data feeds into this dashboard metric?"
-- Trace backwards to original sources
WITH RECURSIVE sources AS (
-- Base: Direct sources
SELECT
upstream_table,
upstream_column,
1 as depth
FROM column_lineage
WHERE downstream_table = 'executive_dashboard'
AND downstream_column = 'ytd_revenue'
UNION ALL
-- Recursive: Sources of sources
SELECT
cl.upstream_table,
cl.upstream_column,
s.depth + 1
FROM column_lineage cl
JOIN sources s
ON cl.downstream_table = s.upstream_table
AND cl.downstream_column = s.upstream_column
WHERE s.depth < 10
)
SELECT DISTINCT
s.upstream_table,
s.upstream_column,
s.depth
FROM sources s
WHERE NOT EXISTS (
SELECT 1
FROM column_lineage cl
WHERE cl.downstream_table = s.upstream_table
AND cl.downstream_column = s.upstream_column
) -- Only leaf columns (true sources): nothing upstream of them is recorded
ORDER BY s.upstream_table, s.upstream_column;
Result: Original sources for dashboard metric (assuming orders.amount is recorded as derived from stripe_events.charge_amount, which makes it an intermediate column rather than a source)
| upstream_table | upstream_column | depth |
|----------------|-----------------|-------|
| orders | shipping | 3 |
| orders | tax | 3 |
| stripe_events | charge_amount | 4 |
Data Catalog
Schema Registry
Track all datasets and their metadata:
CREATE TABLE data_catalog (
id BIGSERIAL PRIMARY KEY,
dataset_name VARCHAR(255) UNIQUE NOT NULL,
dataset_type VARCHAR(50), -- 'table', 'view', 'api', 'file'
description TEXT,
owner VARCHAR(255),
steward VARCHAR(255), -- Data steward (responsible for quality)
sensitivity VARCHAR(50), -- 'public', 'internal', 'confidential', 'restricted'
contains_pii BOOLEAN DEFAULT FALSE,
retention_days INT, -- How long to keep data
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE TABLE data_catalog_columns (
dataset_id BIGINT REFERENCES data_catalog(id),
column_name VARCHAR(255),
data_type VARCHAR(50),
description TEXT,
is_nullable BOOLEAN,
is_pii BOOLEAN DEFAULT FALSE,
pii_type VARCHAR(50), -- 'email', 'ssn', 'phone', 'name', etc.
sample_values TEXT[],
PRIMARY KEY (dataset_id, column_name)
);
-- Example: Register orders table
INSERT INTO data_catalog VALUES (
DEFAULT,
'orders',
'table',
'Customer orders from e-commerce platform',
'engineering@company.com',
'data-team@company.com',
'internal',
TRUE, -- Contains PII
2555, -- 7 years retention
NOW(),
NOW()
);
INSERT INTO data_catalog_columns VALUES
(1, 'id', 'BIGINT', 'Unique order ID', FALSE, FALSE, NULL, NULL),
(1, 'customer_email', 'VARCHAR(255)', 'Customer email address', FALSE, TRUE, 'email', NULL),
(1, 'amount', 'DECIMAL(10,2)', 'Order total in USD', FALSE, FALSE, NULL, '{10.99, 25.50, 100.00}');
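Because PII is flagged twice, once per dataset (`contains_pii`) and once per column (`is_pii`), the two flags can drift apart. The check below is a sketch of a consistency test that could run after catalog updates; `pii_flag_mismatches` is a hypothetical helper, and the `users` and `daily_sales` entries are made-up inputs for illustration.

```python
def pii_flag_mismatches(datasets, columns):
    """Return dataset names whose contains_pii flag disagrees with their columns.

    datasets: {dataset_name: contains_pii}
    columns:  [(dataset_name, column_name, is_pii), ...]
    """
    has_pii_column = {}
    for dataset, _column, is_pii in columns:
        has_pii_column[dataset] = has_pii_column.get(dataset, False) or is_pii
    return sorted(
        name for name, flag in datasets.items()
        if flag != has_pii_column.get(name, False)
    )


datasets = {"orders": True, "users": False, "daily_sales": False}
columns = [
    ("orders", "id", False),
    ("orders", "customer_email", True),
    ("users", "email", True),          # flagged column, but dataset says no PII
    ("daily_sales", "total_revenue", False),
]
print(pii_flag_mismatches(datasets, columns))  # ['users']
```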
Searchable Catalog
Find datasets by keyword:
-- Full-text search (COALESCE keeps datasets with a NULL description searchable)
CREATE INDEX idx_catalog_search ON data_catalog
USING GIN(to_tsvector('english', dataset_name || ' ' || COALESCE(description, '')));
-- Search for "revenue" (the expression must match the index expression exactly)
SELECT
dataset_name,
dataset_type,
description,
owner
FROM data_catalog
WHERE to_tsvector('english', dataset_name || ' ' || COALESCE(description, ''))
@@ to_tsquery('english', 'revenue')
ORDER BY dataset_name;
Compliance & Data Privacy
GDPR: Right to be Forgotten
Track all PII to enable deletion:
-- Find every column that holds PII
SELECT
dc.dataset_name,
dcc.column_name,
dcc.pii_type
FROM data_catalog dc
JOIN data_catalog_columns dcc ON dc.id = dcc.dataset_id
WHERE dcc.is_pii = TRUE;
-- Result: Tables/columns containing PII
| dataset_name | column_name | pii_type |
|-----------------|----------------|----------|
| orders | customer_email | email |
| users | email | email |
| users | name | name |
| support_tickets | email | email |
| analytics_events| user_id | user_id |
-- Generate deletion script for one user's email
-- (format() quotes identifiers with %I and the email literal with %L)
SELECT
format('DELETE FROM %I WHERE %I = %L;', dataset_name, column_name, 'user@example.com')
FROM (
SELECT DISTINCT
dc.dataset_name,
dcc.column_name
FROM data_catalog dc
JOIN data_catalog_columns dcc ON dc.id = dcc.dataset_id
WHERE dcc.pii_type = 'email'
) subq;
-- Output:
-- DELETE FROM orders WHERE customer_email = 'user@example.com';
-- DELETE FROM users WHERE email = 'user@example.com';
-- DELETE FROM support_tickets WHERE email = 'user@example.com';
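When the deletions are issued from application code rather than from a generated script, the email is better passed as a bound parameter than spliced into the SQL text. The sketch below assumes the table and column names come from the catalog query above; `build_deletions` is a hypothetical helper, the identifier check is deliberately strict, and the `?` placeholder style depends on the database driver in use.

```python
import re

# Plain, unquoted SQL identifiers only: letters, digits, underscores
IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def build_deletions(pii_columns, user_email):
    """Return (sql, params) pairs that delete one user's rows."""
    statements = []
    for table, column in pii_columns:
        # Identifiers cannot be bound as parameters, so validate them instead
        if not (IDENTIFIER.fullmatch(table) and IDENTIFIER.fullmatch(column)):
            raise ValueError(f"unsafe identifier: {table}.{column}")
        sql = f'DELETE FROM "{table}" WHERE "{column}" = ?'
        statements.append((sql, (user_email,)))
    return statements


pii_columns = [("orders", "customer_email"), ("users", "email"), ("support_tickets", "email")]
for sql, params in build_deletions(pii_columns, "user@example.com"):
    print(sql, params)
```

Running the deletions inside a single transaction, and logging them to the audit trail, helps show that an erasure request was fully honored.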
PII Tracking in Data Flow
Tag PII as it flows through pipeline:
def track_pii_flow(source_table, dest_table, pii_fields):
    """Track movement of PII between tables"""
    for field in pii_fields:
        db.insert(
            """
            INSERT INTO pii_lineage (source_table, source_column, dest_table, dest_column, tracked_at)
            VALUES (?, ?, ?, ?, NOW())
            """,
            source_table, field, dest_table, field
        )

# Usage
track_pii_flow('users', 'orders', ['email'])
track_pii_flow('orders', 'daily_sales_with_emails', ['email'])

# Query: "Where has this user's email propagated?"
db.query("""
    WITH RECURSIVE pii_flow AS (
        SELECT dest_table, dest_column, 1 as depth
        FROM pii_lineage
        WHERE source_table = 'users' AND source_column = 'email'
        UNION ALL
        SELECT pl.dest_table, pl.dest_column, pf.depth + 1
        FROM pii_lineage pl
        JOIN pii_flow pf ON pl.source_table = pf.dest_table AND pl.source_column = pf.dest_column
        WHERE pf.depth < 10
    )
    SELECT DISTINCT dest_table, dest_column FROM pii_flow;
""")
Visualization & Tools
Lineage Graph
Generate visual lineage:
import graphviz

def visualize_lineage(table_name):
    # Fetch lineage
    lineage = db.query("""
        SELECT upstream_table, downstream_table
        FROM table_lineage
        WHERE upstream_table = ? OR downstream_table = ?
    """, table_name, table_name)
    # Create graph
    dot = graphviz.Digraph()
    for row in lineage:
        dot.edge(row['upstream_table'], row['downstream_table'])
    dot.render('lineage_graph', format='png', view=True)

visualize_lineage('orders')
Output:
stripe_api ──► orders ──┬──► daily_sales ──► monthly_revenue
                        │
customers ──────────────┘
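If the `graphviz` Python package or the Graphviz binaries are not installed, the graph can still be exported as DOT text and rendered elsewhere. The function below is a dependency-free sketch; `to_dot` is a hypothetical helper, and the edge list hard-codes the example graph instead of querying `table_lineage`.

```python
def to_dot(edges):
    """Render (upstream, downstream) pairs as a Graphviz DOT digraph string."""
    lines = ["digraph lineage {", "  rankdir=LR;"]  # left-to-right layout
    for upstream, downstream in sorted(set(edges)):  # de-duplicate, stable order
        lines.append(f'  "{upstream}" -> "{downstream}";')
    lines.append("}")
    return "\n".join(lines)


edges = [
    ("stripe_api", "orders"),
    ("orders", "daily_sales"),
    ("customers", "daily_sales"),
    ("daily_sales", "monthly_revenue"),
]
print(to_dot(edges))
```

The resulting text can be saved to a `.dot` file and rendered with any Graphviz-compatible viewer.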
Commercial and Open-Source Tools
| Tool | Use Case | Features |
|---|---|---|
| Apache Atlas | Open-source data governance | Metadata management, lineage, search |
| Collibra | Enterprise data governance | Catalog, lineage, policies, workflows |
| Alation | Data catalog | Metadata search, collaboration, lineage |
| Amundsen (Lyft) | Open-source data discovery | Search, lineage, usage analytics |
| DataHub (LinkedIn) | Open-source metadata platform | Lineage, discovery, governance |
| dbt | Analytics engineering | SQL lineage, documentation, tests |
Implementation Checklist
Minimal (Start Here)
[ ] Table-level lineage tracking
[ ] Audit logs for critical tables
[ ] Data catalog for major datasets
[ ] Documentation of ETL processes
Standard
[ ] Column-level lineage
[ ] Automated lineage extraction from SQL
[ ] PII tagging and tracking
[ ] Impact analysis queries
[ ] Change notifications for downstream consumers
Advanced
[ ] Row-level lineage
[ ] Real-time lineage from CDC
[ ] Searchable data catalog
[ ] Automated GDPR compliance tools
[ ] Data quality metrics tied to lineage
[ ] Machine learning for anomaly detection
Output Format
When helping with data provenance:
## Provenance Strategy
### Lineage Level
- [ ] Table-level
- [ ] Column-level
- [ ] Row-level
- [ ] Value-level
### Capture Method
- [ ] Code instrumentation
- [ ] SQL parsing
- [ ] Database triggers
- [ ] CDC (Change Data Capture)
### Data Catalog Schema
[SQL DDL for catalog tables]
### Impact Analysis Queries
[SQL queries for upstream/downstream impact]
### PII Tracking
Tables with PII:
- [Table 1]: [Columns]
- [Table 2]: [Columns]
Deletion strategy:
[Step-by-step process]
### Visualization
[Lineage graph representation]
### Compliance Requirements
- [ ] GDPR
- [ ] CCPA
- [ ] HIPAA
- [ ] SOX
- [ ] Other: [specify]
### Tooling
- Lineage tracking: [Tool/Custom]
- Data catalog: [Tool/Custom]
- Visualization: [Tool/Custom]
Integration
Works with:
- scalable-data-schema - Track schema evolution over time
- data-infrastructure-at-scale - Lineage for pipelines and ETL
- multi-source-data-conflation - Track source of merged data
- systems-decompose - Plan lineage as part of feature design
