Agent Skills
data-engineering

@eyadsibai/data-engineering
by eyadsibai · 1 fork · Updated 3/31/2026

Use when "data pipelines", "ETL", "data warehousing", "data lakes", or asking about "Airflow", "Spark", "dbt", "Snowflake", "BigQuery", "data modeling"

Installation

$ npx agent-skills-cli install @eyadsibai/data-engineering
Supported assistants: Claude Code, Cursor, Copilot, Codex, Antigravity

Details

Repository     eyadsibai/ltk
Path           plugins/ltk-data/skills/data-engineering/SKILL.md
Branch         master
Scoped Name    @eyadsibai/data-engineering

Usage

After installing, this skill will be available to your AI coding assistant.

Verify installation:

npx agent-skills-cli list

Skill Instructions


name: data-engineering
description: Use when "data pipelines", "ETL", "data warehousing", "data lakes", or asking about "Airflow", "Spark", "dbt", "Snowflake", "BigQuery", "data modeling"
version: 1.0.0

<!-- Adapted from: claude-skills/engineering-team/senior-data-engineer -->

Data Engineering Guide

Data pipelines, warehousing, and modern data stack.

When to Use

  • Building data pipelines
  • Designing data warehouses
  • Implementing ETL/ELT processes
  • Setting up data lakes
  • Optimizing data infrastructure

Modern Data Stack

Components

Sources β†’ Ingestion β†’ Storage β†’ Transform β†’ Serve β†’ Consume
Layer           Tools
Ingestion       Fivetran, Airbyte, Stitch
Storage         S3, GCS, Snowflake, BigQuery
Transform       dbt, Spark, Airflow
Orchestration   Airflow, Dagster, Prefect
Serving         Looker, Tableau, Metabase
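The flow above can be sketched as composed functions, one per layer. This is a toy illustration with hard-coded records, not any specific tool's API:

```python
def ingest():
    # Ingestion: pull raw records from a source (hard-coded here)
    return [{"order_id": 1, "amount": "19.99"}, {"order_id": 2, "amount": "5.00"}]

def store(records):
    # Storage: land raw data unchanged (a list stands in for S3/warehouse)
    return list(records)

def transform(records):
    # Transform: cast string amounts to floats
    return [{**r, "amount": float(r["amount"])} for r in records]

def serve(records):
    # Serving: expose an aggregate for BI tools
    return {"total_revenue": round(sum(r["amount"] for r in records), 2)}

result = serve(transform(store(ingest())))
print(result)  # {'total_revenue': 24.99}
```

Each layer only depends on the previous one's output, which is the property the real tools in the table preserve at scale.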

Data Pipeline Patterns

Batch Processing

# Airflow DAG example
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

dag = DAG(
    'daily_etl',
    schedule_interval='0 6 * * *',
    start_date=datetime(2024, 1, 1)
)

def extract():
    # Extract from source
    pass

def transform():
    # Transform data
    pass

def load():
    # Load to warehouse
    pass

extract_task = PythonOperator(
    task_id='extract',
    python_callable=extract,
    dag=dag
)

transform_task = PythonOperator(
    task_id='transform',
    python_callable=transform,
    dag=dag
)

load_task = PythonOperator(
    task_id='load',
    python_callable=load,
    dag=dag
)

extract_task >> transform_task >> load_task

Streaming Processing

# Kafka consumer example (kafka-python)
from kafka import KafkaConsumer
import json

def process_event(event):
    # Placeholder handler; replace with real processing logic
    print(event)

consumer = KafkaConsumer(
    'events',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

for message in consumer:
    process_event(message.value)

dbt Patterns

Model Structure

models/
β”œβ”€β”€ staging/           # 1:1 with source
β”‚   β”œβ”€β”€ stg_orders.sql
β”‚   └── stg_customers.sql
β”œβ”€β”€ intermediate/      # Business logic
β”‚   └── int_order_items.sql
└── marts/             # Final models
    β”œβ”€β”€ dim_customers.sql
    └── fct_orders.sql

Example Model

-- models/marts/fct_orders.sql
{{
    config(
        materialized='incremental',
        unique_key='order_id'
    )
}}

select
    o.order_id,
    o.customer_id,
    o.order_date,
    sum(oi.quantity * oi.unit_price) as order_total
from {{ ref('stg_orders') }} o
join {{ ref('stg_order_items') }} oi
    on o.order_id = oi.order_id
{% if is_incremental() %}
where o.order_date > (select max(order_date) from {{ this }})
{% endif %}
group by 1, 2, 3

Data Modeling

Dimensional Modeling

Fact Tables (events/transactions)
β”œβ”€β”€ fct_orders
β”œβ”€β”€ fct_page_views
└── fct_transactions

Dimension Tables (context)
β”œβ”€β”€ dim_customers
β”œβ”€β”€ dim_products
β”œβ”€β”€ dim_dates
└── dim_locations

Star Schema

        dim_customers
              β”‚
dim_dates ── fct_orders ── dim_products
              β”‚
        dim_locations

Data Quality

Validation Rules

# dbt schema tests (models/marts/schema.yml)
version: 2

models:
  - name: fct_orders
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: order_total
        tests:
          - not_null
          - positive_value  # custom generic test; unique/not_null are built in

Quality Metrics

Metric         Description
Completeness   % non-null values
Uniqueness     % distinct values
Timeliness     Data freshness
Accuracy       Matches source
Consistency    Agreement across systems
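The first two metrics are simple ratios; a minimal sketch, with plain Python lists standing in for a warehouse column:

```python
def completeness(values):
    # % of non-null values in the column
    if not values:
        return 0.0
    return 100.0 * sum(v is not None for v in values) / len(values)

def uniqueness(values):
    # % of distinct values among the non-null ones
    non_null = [v for v in values if v is not None]
    if not non_null:
        return 0.0
    return 100.0 * len(set(non_null)) / len(non_null)

order_ids = [1, 2, 2, None]
print(completeness(order_ids))  # 75.0
print(uniqueness(order_ids))    # ~66.7 (one duplicate among three non-nulls)
```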

Performance Optimization

Partitioning

-- BigQuery partitioned table
CREATE TABLE orders
PARTITION BY DATE(order_date)
CLUSTER BY customer_id
AS SELECT * FROM staging.orders

Query Optimization

Technique        Impact
Partitioning     Reduce scanned data
Clustering       Improve filter speed
Materialization  Pre-compute joins
Caching          Reduce repeat queries

Monitoring

Pipeline Metrics

Metric      Alert Threshold
Runtime     >2x normal
Row count   ±20% variance
Freshness   >SLA
Failures    Any failure
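The alert rules in the table can be encoded directly; the function signature and thresholds below are assumptions for illustration, not a real monitoring API:

```python
def check_pipeline(runtime_s, baseline_runtime_s, rows, baseline_rows,
                   freshness_h, sla_h, failures):
    alerts = []
    if runtime_s > 2 * baseline_runtime_s:               # Runtime > 2x normal
        alerts.append("runtime")
    if abs(rows - baseline_rows) > 0.2 * baseline_rows:  # Row count ±20%
        alerts.append("row_count")
    if freshness_h > sla_h:                              # Freshness > SLA
        alerts.append("freshness")
    if failures > 0:                                     # Any failure
        alerts.append("failures")
    return alerts

# A run that took 900s against a 400s baseline trips only the runtime rule
print(check_pipeline(900, 400, 1000, 1000, 2, 6, 0))  # ['runtime']
```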

Data Observability

# Monte Carlo / Elementary example
monitors:
  - table: fct_orders
    tests:
      - freshness:
          threshold: 6 hours
      - volume:
          threshold: 10%
      - schema_change: true

Best Practices

Pipeline Design

  • Idempotent operations
  • Incremental processing
  • Clear data lineage
  • Automated testing
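Idempotency is the key property of the first bullet: replaying a batch must not change the result. A minimal sketch, with a dict standing in for the warehouse table:

```python
warehouse = {}  # order_id -> row; stand-in for the target table

def load_batch(warehouse, batch):
    # Upsert by primary key makes the load idempotent:
    # running the same batch twice yields the same state.
    for row in batch:
        warehouse[row["order_id"]] = row
    return warehouse

batch = [{"order_id": 1, "total": 25.0}, {"order_id": 2, "total": 40.0}]
load_batch(warehouse, batch)
load_batch(warehouse, batch)  # replay after a retry is safe
print(len(warehouse))  # 2, not 4
```

In a real warehouse the same effect comes from MERGE/upsert statements or delete-and-reload of the affected partition.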

Data Governance

  • Document all models
  • Track data lineage
  • Implement access controls
  • Version control SQL

Cost Management

  • Monitor query costs
  • Use partitioning
  • Schedule off-peak
  • Archive old data