25 A Complete ETL Pipeline

25.1 Learning Objectives

By the end of this chapter, students will be able to:

  • Assemble extract, transform, and load functions into a single, callable pipeline function.
  • Add timestamped, leveled logging to a pipeline to record progress, row counts, and errors.
  • Define idempotency and verify that a pipeline produces consistent results when run multiple times.
  • Design a pipeline that separates concerns cleanly across extract, transform, and load stages.
  • Test a pipeline end-to-end by running it against a live database and inspecting the results.

The previous three chapters covered each stage of the ETL process in isolation: extracting data from files and APIs, transforming it with Polars, and loading it into PostgreSQL with SQLAlchemy. In this chapter, you will tie those stages together into a single, repeatable pipeline function.

A well-structured pipeline is not just a script that runs top to bottom. It handles errors gracefully, logs its progress, and is safe to run multiple times without corrupting the data it manages. By the end of this chapter, you will have a working pipeline that extracts World Bank data, applies transformations, and upserts the results into PostgreSQL.

25.2 Pipeline Design

Before writing any code, it helps to define the pipeline’s responsibilities clearly:

  1. Extract the countries CSV and indicators Parquet file from the data/ directory
  2. Transform both DataFrames: select the target columns, filter aggregate rows out of the indicators, and sort by primary key
  3. Load into PostgreSQL using upsert so the pipeline is idempotent (safe to run repeatedly)
  4. Report how many rows were loaded into each table

Keeping these stages as distinct functions makes the pipeline easier to test, debug, and extend later.

25.3 Setting Up Logging

Rather than sprinkling print() calls throughout, use Python’s standard logging module. It adds timestamps and severity levels to every message, and you can redirect output to a file without changing any pipeline code:

import logging

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s  %(levelname)-8s  %(message)s",
    datefmt="%H:%M:%S"
)
log = logging.getLogger(__name__)

log.info("Pipeline initialized")
14:13:39  INFO      Pipeline initialized
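To make the point about redirecting output concrete, here is a minimal sketch that assumes Python 3.8+ (for `force=True`) and a hypothetical `pipeline.log` file in the system temp directory. Only the `basicConfig()` call changes. Every `log.info()` call in the pipeline stays exactly as written.

```python
import logging
import tempfile
from pathlib import Path

# Hypothetical log location; any writable path works.
log_path = Path(tempfile.gettempdir()) / "pipeline.log"

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s  %(levelname)-8s  %(message)s",
    datefmt="%H:%M:%S",
    handlers=[
        logging.FileHandler(log_path, mode="w"),  # write each run to a fresh file
        logging.StreamHandler(),                   # and keep echoing to the console
    ],
    force=True,  # replace handlers installed by an earlier basicConfig() call
)
log = logging.getLogger(__name__)

log.info("Pipeline initialized")
print(log_path.read_text())
```

Because the file handler is attached to the root logger, messages from every stage function reach the file without any changes to those functions.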

25.4 Extract

The extract stage reads the source files and returns raw DataFrames. It does no transformation, just ingestion:

import polars as pl

def extract(countries_path: str, indicators_path: str) -> tuple[pl.DataFrame, pl.DataFrame]:
    log.info("Extracting %s", countries_path)
    countries_raw = pl.read_csv(countries_path)

    log.info("Extracting %s", indicators_path)
    indicators_raw = pl.read_parquet(indicators_path)

    log.info(
        "Extracted %d country rows, %d indicator rows",
        len(countries_raw), len(indicators_raw)
    )
    return countries_raw, indicators_raw

Keeping extraction separate from transformation means you can swap out the data source (a different file path, an API call, an S3 bucket) without touching the transformation logic.

25.5 Transform

The transform stage selects the columns the database expects from both DataFrames, drops aggregate rows from the indicators, and sorts each DataFrame by its primary key:

def transform(
    countries_raw: pl.DataFrame,
    indicators_raw: pl.DataFrame
) -> tuple[pl.DataFrame, pl.DataFrame]:

    log.info("Transforming countries")
    countries = (
        countries_raw
        .select([
            "country_code", "iso2_code", "short_name", "region",
            "capital", "longitude", "latitude", "income_group", "lending_type"
        ])
        .sort("country_code")
    )

    log.info("Transforming indicators")
    indicators = (
        indicators_raw
        .filter(pl.col("region") != "Aggregates")
        .select([
            "country_code", "year", "population", "gdp_usd",
            "gdp_per_capita_usd", "pct_forest_area", "land_area_km2"
        ])
        .sort(["country_code", "year"])
    )

    log.info(
        "Transformed: %d countries, %d indicator rows",
        len(countries), len(indicators)
    )
    return countries, indicators

25.6 Load

The load stage writes both DataFrames to PostgreSQL using upsert. It accepts the database engine as a parameter so the pipeline can be pointed at different environments (development, staging, production) without code changes:

from sqlalchemy import create_engine, text

UPSERT_COUNTRIES = text("""
    INSERT INTO countries_etl
        (country_code, iso2_code, short_name, region,
         capital, longitude, latitude, income_group, lending_type)
    VALUES
        (:country_code, :iso2_code, :short_name, :region,
         :capital, :longitude, :latitude, :income_group, :lending_type)
    ON CONFLICT (country_code) DO UPDATE SET
        iso2_code    = EXCLUDED.iso2_code,
        short_name   = EXCLUDED.short_name,
        region       = EXCLUDED.region,
        capital      = EXCLUDED.capital,
        longitude    = EXCLUDED.longitude,
        latitude     = EXCLUDED.latitude,
        income_group = EXCLUDED.income_group,
        lending_type = EXCLUDED.lending_type
""")

UPSERT_INDICATORS = text("""
    INSERT INTO indicators_etl
        (country_code, year, population, gdp_usd,
         gdp_per_capita_usd, pct_forest_area, land_area_km2)
    VALUES
        (:country_code, :year, :population, :gdp_usd,
         :gdp_per_capita_usd, :pct_forest_area, :land_area_km2)
    ON CONFLICT (country_code, year) DO UPDATE SET
        population         = EXCLUDED.population,
        gdp_usd            = EXCLUDED.gdp_usd,
        gdp_per_capita_usd = EXCLUDED.gdp_per_capita_usd,
        pct_forest_area    = EXCLUDED.pct_forest_area,
        land_area_km2      = EXCLUDED.land_area_km2
""")

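def _upsert_batched(conn, sql, rows: list[dict], batch_size: int = 500) -> int:
    # Passing a list of dicts to execute() runs the statement once per dict
    # (an executemany); slicing into batches bounds the size of each call.
    for i in range(0, len(rows), batch_size):
        conn.execute(sql, rows[i : i + batch_size])
    # This counts rows sent, not rows inserted versus updated.
    return len(rows)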

CREATE_TABLES = text("""
    CREATE TABLE IF NOT EXISTS countries_etl (
        country_code  VARCHAR(3)        PRIMARY KEY,
        iso2_code     VARCHAR(2),
        short_name    VARCHAR(100),
        region        VARCHAR(100),
        capital       VARCHAR(100),
        longitude     DOUBLE PRECISION,
        latitude      DOUBLE PRECISION,
        income_group  VARCHAR(50),
        lending_type  VARCHAR(50)
    );
    CREATE TABLE IF NOT EXISTS indicators_etl (
        country_code        VARCHAR(3)        REFERENCES countries_etl(country_code),
        year                INTEGER,
        population          DOUBLE PRECISION,
        gdp_usd             DOUBLE PRECISION,
        gdp_per_capita_usd  DOUBLE PRECISION,
        pct_forest_area     DOUBLE PRECISION,
        land_area_km2       DOUBLE PRECISION,
        PRIMARY KEY (country_code, year)
    )
""")

def load(countries: pl.DataFrame, indicators: pl.DataFrame, engine) -> dict:
    log.info("Ensuring target tables exist")
    with engine.begin() as conn:
        conn.execute(CREATE_TABLES)

    log.info("Loading countries")
    with engine.begin() as conn:
        n_countries = _upsert_batched(conn, UPSERT_COUNTRIES, countries.to_dicts())

    log.info("Loading indicators")
    with engine.begin() as conn:
        n_indicators = _upsert_batched(conn, UPSERT_INDICATORS, indicators.to_dicts())

    log.info("Load complete: %d countries, %d indicators", n_countries, n_indicators)
    return {"countries": n_countries, "indicators": n_indicators}

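Each table loads in its own transaction. Countries must be loaded before indicators because indicators_etl has a foreign key referencing countries_etl.country_code. PostgreSQL checks that constraint as each INSERT runs, so every referenced country must already exist. Loading both tables in a single transaction would also work, as long as countries are inserted first; separate transactions simply make the dependency explicit and keep each transaction small. The trade-off is that a failure while loading indicators leaves the countries already committed. Because both loads are upserts, rerunning the pipeline repairs that partial state.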
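You can see the ordering requirement without a PostgreSQL server. The sketch below substitutes Python's built-in sqlite3 module with an in-memory database and cut-down, hypothetical `countries_demo` and `indicators_demo` tables. Like PostgreSQL, SQLite checks a non-deferred foreign key as each statement runs, but it only enforces foreign keys after `PRAGMA foreign_keys = ON` has been set. The sketch shows that insertion order matters, not commit order.

```python
import sqlite3

# In-memory SQLite stands in for PostgreSQL here. SQLite enforces foreign
# keys only after this pragma is switched on for the connection.
conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")
conn.executescript("""
    CREATE TABLE countries_demo (country_code TEXT PRIMARY KEY);
    CREATE TABLE indicators_demo (
        country_code TEXT REFERENCES countries_demo(country_code),
        year         INTEGER,
        PRIMARY KEY (country_code, year)
    );
""")

# Wrong order inside one transaction: the indicator row references a
# country that does not exist yet, so the first INSERT is rejected.
try:
    with conn:  # commits on success, rolls back on an exception
        conn.execute("INSERT INTO indicators_demo VALUES ('NOR', 2022)")
        conn.execute("INSERT INTO countries_demo VALUES ('NOR')")
    wrong_order_ok = True
except sqlite3.IntegrityError:
    wrong_order_ok = False

# Right order, still one transaction: the country exists by the time the
# indicator row is checked, so no violation occurs.
with conn:
    conn.execute("INSERT INTO countries_demo VALUES ('NOR')")
    conn.execute("INSERT INTO indicators_demo VALUES ('NOR', 2022)")

n_indicators = conn.execute("SELECT COUNT(*) FROM indicators_demo").fetchone()[0]
print(wrong_order_ok, n_indicators)  # False 1
```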

25.7 Running the Pipeline

With the three stages defined, the pipeline is a straightforward composition:

def run_pipeline(
    countries_path: str,
    indicators_path: str,
    db_url: str
) -> dict:
    engine = create_engine(db_url)

    countries_raw, indicators_raw = extract(countries_path, indicators_path)
    countries, indicators = transform(countries_raw, indicators_raw)
    stats = load(countries, indicators, engine)

    return stats

stats = run_pipeline(
    countries_path="data/countries.csv",
    indicators_path="data/indicators.parquet",
    db_url="postgresql+psycopg2://postgres:postgres@localhost:5433/worldbank"
)

print(stats)
14:13:39  INFO      Extracting data/countries.csv
14:13:39  INFO      Extracting data/indicators.parquet
14:13:39  INFO      Extracted 218 country rows, 4991 indicator rows
14:13:39  INFO      Transforming countries
14:13:39  INFO      Transforming indicators
14:13:39  INFO      Transformed: 218 countries, 4991 indicator rows
14:13:39  INFO      Ensuring target tables exist
14:13:39  INFO      Loading countries
14:13:39  INFO      Loading indicators
14:13:43  INFO      Load complete: 218 countries, 4991 indicators
{'countries': 218, 'indicators': 4991}

Run it a second time to confirm idempotency:

stats2 = run_pipeline(
    countries_path="data/countries.csv",
    indicators_path="data/indicators.parquet",
    db_url="postgresql+psycopg2://postgres:postgres@localhost:5433/worldbank"
)

print(stats2)
14:13:43  INFO      Extracting data/countries.csv
14:13:43  INFO      Extracting data/indicators.parquet
14:13:43  INFO      Extracted 218 country rows, 4991 indicator rows
14:13:43  INFO      Transforming countries
14:13:43  INFO      Transforming indicators
14:13:43  INFO      Transformed: 218 countries, 4991 indicator rows
14:13:43  INFO      Ensuring target tables exist
14:13:43  INFO      Loading countries
14:13:43  INFO      Loading indicators
14:13:46  INFO      Load complete: 218 countries, 4991 indicators
{'countries': 218, 'indicators': 4991}

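The returned counts are identical, but on their own they prove little. load() counts the rows it sends to the database, so the numbers would match on every run regardless of what happened in the tables. Idempotency comes from the upsert logic. On the second run, every row conflicts with an existing primary key and is updated in place rather than inserted again, so running the pipeline twice leaves the database in the same state as running it once. Section 25.8 checks the table contents directly.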
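The same stand-in shows the mechanism behind that guarantee. This sketch uses sqlite3 in place of PostgreSQL (SQLite 3.24 and later accept the same `INSERT ... ON CONFLICT ... DO UPDATE` form) with a hypothetical two-column table. It loads the same rows twice and compares the full table contents after each run, which is a stronger check than comparing returned counts.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE countries_demo (country_code TEXT PRIMARY KEY, short_name TEXT)"
)

UPSERT = """
    INSERT INTO countries_demo (country_code, short_name)
    VALUES (:country_code, :short_name)
    ON CONFLICT (country_code) DO UPDATE SET short_name = excluded.short_name
"""

rows = [
    {"country_code": "NOR", "short_name": "Norway"},
    {"country_code": "CHE", "short_name": "Switzerland"},
]

def table_state(conn: sqlite3.Connection) -> list[tuple]:
    """Return the full table contents in a stable order."""
    return conn.execute(
        "SELECT country_code, short_name FROM countries_demo ORDER BY country_code"
    ).fetchall()

with conn:  # first run: both rows are inserted
    conn.executemany(UPSERT, rows)
after_first = table_state(conn)

with conn:  # second run: both rows conflict and are updated in place
    conn.executemany(UPSERT, rows)
after_second = table_state(conn)

print(after_first == after_second)  # True
print(after_second)  # [('CHE', 'Switzerland'), ('NOR', 'Norway')]

# By contrast, a plain INSERT of an existing key is rejected.
try:
    with conn:
        conn.execute("INSERT INTO countries_demo VALUES ('NOR', 'Norway')")
except sqlite3.IntegrityError as exc:
    print("plain INSERT rejected:", exc)
```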

25.8 Verifying the Result

After the pipeline runs, query PostgreSQL to confirm the data landed correctly:

engine = create_engine(
    "postgresql+psycopg2://postgres:postgres@localhost:5433/worldbank"
)

summary_query = """
    SELECT
        COUNT(DISTINCT c.country_code)                          AS countries,
        COUNT(i.country_code)                                   AS indicator_rows,
        MIN(i.year)                                             AS earliest_year,
        MAX(i.year)                                             AS latest_year,
        ROUND(AVG(i.gdp_per_capita_usd)::numeric, 0)           AS avg_gdp_per_capita
    FROM countries_etl AS c
    INNER JOIN indicators_etl AS i ON c.country_code = i.country_code
"""

result = pl.read_database(summary_query, connection=engine)
print(result)
shape: (1, 5)
┌───────────┬────────────────┬───────────────┬─────────────┬────────────────────┐
│ countries ┆ indicator_rows ┆ earliest_year ┆ latest_year ┆ avg_gdp_per_capita │
│ ---       ┆ ---            ┆ ---           ┆ ---         ┆ ---                │
│ i64       ┆ i64            ┆ i64           ┆ i64         ┆ decimal[38,0]      │
╞═══════════╪════════════════╪═══════════════╪═════════════╪════════════════════╡
│ 217       ┆ 4991           ┆ 2000          ┆ 2022        ┆ 15743              │
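└───────────┴────────────────┴───────────────┴─────────────┴────────────────────┘

The summary reports 217 countries rather than 218. The INNER JOIN counts only countries with at least one matching indicator row, so at least one country in countries_etl has no indicator data. The average GDP per capita covers every country-year row with a non-null value, not just the latest year.

To spot-check individual rows, list the ten highest GDP-per-capita values for 2022:
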
sample_query = """
    SELECT c.short_name, c.region, i.year,
           i.gdp_per_capita_usd, i.population
    FROM countries_etl AS c
    INNER JOIN indicators_etl AS i ON c.country_code = i.country_code
    WHERE i.year = 2022
      AND i.gdp_per_capita_usd IS NOT NULL
    ORDER BY i.gdp_per_capita_usd DESC
    LIMIT 10
"""

print(pl.read_database(sample_query, connection=engine))
shape: (10, 5)
┌────────────────┬────────────────────────────┬──────┬────────────────────┬────────────┐
│ short_name     ┆ region                     ┆ year ┆ gdp_per_capita_usd ┆ population │
│ ---            ┆ ---                        ┆ ---  ┆ ---                ┆ ---        │
│ str            ┆ str                        ┆ i64  ┆ f64                ┆ f64        │
╞════════════════╪════════════════════════════╪══════╪════════════════════╪════════════╡
│ Monaco         ┆ Europe & Central Asia      ┆ 2022 ┆ 226052.0           ┆ 5.0        │
│ Liechtenstein  ┆ Europe & Central Asia      ┆ 2022 ┆ 186400.23          ┆ 39493.0    │
│ Luxembourg     ┆ Europe & Central Asia      ┆ 2022 ┆ 125006.02          ┆ 653103.0   │
│ Bermuda        ┆ North America              ┆ 2022 ┆ 120897.31          ┆ 64749.0    │
│ Norway         ┆ Europe & Central Asia      ┆ 2022 ┆ 108798.45          ┆ 5.457127e6 │
│ Ireland        ┆ Europe & Central Asia      ┆ 2022 ┆ 106194.76          ┆ 5.1657e6   │
│ Switzerland    ┆ Europe & Central Asia      ┆ 2022 ┆ 93245.8            ┆ 8.777088e6 │
│ Cayman Islands ┆ Latin America & Caribbean  ┆ 2022 ┆ 92202.15           ┆ 71591.0    │
│ Qatar          ┆ Middle East & North Africa ┆ 2022 ┆ 88701.46           ┆ 2.657333e6 │
│ Singapore      ┆ East Asia & Pacific        ┆ 2022 ┆ 88428.7            ┆ 5.637022e6 │
└────────────────┴────────────────────────────┴──────┴────────────────────┴────────────┘

25.9 Error Handling

The pipeline as written will raise an exception and stop if anything goes wrong. For a pipeline that runs unattended on a schedule, you want to catch errors, log them, and decide whether to retry or abort.

A minimal error-handling wrapper:

def run_pipeline_safe(countries_path, indicators_path, db_url) -> dict | None:
    try:
        return run_pipeline(countries_path, indicators_path, db_url)
    except FileNotFoundError as exc:
        log.error("Source file not found: %s", exc)
    except Exception as exc:
        log.error("Pipeline failed: %s", exc, exc_info=True)
    return None

result = run_pipeline_safe(
    countries_path="data/countries.csv",
    indicators_path="data/indicators.parquet",
    db_url="postgresql+psycopg2://postgres:postgres@localhost:5433/worldbank"
)

if result is not None:
    log.info("Pipeline succeeded: %s", result)
else:
    log.warning("Pipeline did not complete")
14:13:46  INFO      Extracting data/countries.csv
14:13:46  INFO      Extracting data/indicators.parquet
14:13:46  INFO      Extracted 218 country rows, 4991 indicator rows
14:13:46  INFO      Transforming countries
14:13:46  INFO      Transforming indicators
14:13:46  INFO      Transformed: 218 countries, 4991 indicator rows
14:13:46  INFO      Ensuring target tables exist
14:13:46  INFO      Loading countries
14:13:46  INFO      Loading indicators
14:13:53  INFO      Load complete: 218 countries, 4991 indicators
14:13:53  INFO      Pipeline succeeded: {'countries': 218, 'indicators': 4991}

exc_info=True tells the logger to include the full stack trace in the error message, which is essential for diagnosing failures in production.
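For the retry-or-abort decision, a common pattern is to retry a few times with a growing delay before giving up. The sketch below uses a hypothetical `run_with_retries` helper built only on the standard library. The attempt count, delays, and choice of retryable exceptions are assumptions to tune for your environment. By default it retries only `ConnectionError`, since an error such as a missing source file will not fix itself between attempts.

```python
import logging
import time
from typing import Callable, TypeVar

log = logging.getLogger(__name__)
T = TypeVar("T")

def run_with_retries(
    func: Callable[[], T],
    attempts: int = 3,
    base_delay: float = 1.0,
    retry_on: tuple[type[BaseException], ...] = (ConnectionError,),
) -> T:
    """Call func(); on a retryable error, wait and try again with exponential backoff.

    The final attempt is not caught, so its exception reaches the caller.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts):
        try:
            return func()
        except retry_on as exc:
            delay = base_delay * 2 ** (attempt - 1)  # base_delay, then 2x, 4x, ...
            log.warning("Attempt %d failed (%s); retrying in %.1fs", attempt, exc, delay)
            time.sleep(delay)
    return func()  # last attempt: any exception propagates

# Toy stand-in for run_pipeline(): fails twice, then succeeds.
calls = {"n": 0}

def flaky() -> dict:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("database not reachable")
    return {"countries": 218, "indicators": 4991}

stats = run_with_retries(flaky, attempts=3, base_delay=0.01)
print(stats, calls["n"])  # {'countries': 218, 'indicators': 4991} 3
```

With the real pipeline, you would likely wrap the call as `lambda: run_pipeline(...)` and pass `retry_on=(OperationalError,)`, importing `OperationalError` from `sqlalchemy.exc`, because SQLAlchemy typically reports connection failures as that exception. Retrying on every exception would also retry errors, such as a misspelled column name, that no amount of waiting will fix.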

25.10 Pipeline Patterns in Practice

The pipeline you built here is a batch pipeline: it reads all the source data at once, transforms it in memory, and writes it all to the database in one run. This pattern works well when:

  • The source data fits comfortably in memory
  • The pipeline runs on a schedule (nightly, weekly) rather than continuously
  • Latency of minutes to hours is acceptable

As data volumes grow or latency requirements shrink, teams typically evolve toward one of these approaches:

Incremental loading processes only rows that have changed since the last run. Instead of upserting all 4,991 rows every night, an incremental pipeline might query the API for records modified in the last 24 hours and upsert only those. This requires a reliable “last modified” timestamp in the source data.

Streaming processes records as they arrive, rather than in nightly batches. Tools like Apache Kafka and Apache Flink are designed for this. The SQL and PostgreSQL skills you have built in this course translate directly: you still write data to the same tables, and you still query with the same SQL.

Orchestration tools such as Apache Airflow, Prefect, and Dagster wrap pipelines like the one you built here, adding dependency management, scheduling, retries, alerting, and a dashboard for monitoring pipeline health. A Polars + SQLAlchemy pipeline is the right building block; an orchestrator is what runs it reliably in production.

25.11 Summary

The four-stage pattern (extract, transform, load, verify) underlies most data engineering work. The specific tools vary from one organization to the next, but the structure stays consistent:

Stage       Tool used                                   Key principle
Extract     Polars read_csv, read_parquet, requests     Read only, no transformation
Transform   Polars expressions and DataFrame methods    Pure functions: same input, same output
Load        SQLAlchemy + INSERT ... ON CONFLICT         Upsert for idempotency
Verify      pl.read_database() + SQL queries            Trust but verify

25.12 Exercises

  1. Add a validate stage that runs after extract and before transform. It should check that the countries DataFrame has at least 200 rows and that the indicators DataFrame has at least 4000 rows. If either check fails, raise a ValueError with a descriptive message. Use an if statement rather than assert, which raises AssertionError and is skipped entirely when Python runs with -O.

  2. The run_pipeline function currently creates a new engine on every call. Refactor it so the engine is created once and passed in as a parameter, making it easier to reuse an existing connection pool.

  3. Extend run_pipeline to accept an optional year_filter parameter. When provided, the transform stage should keep only indicator rows for that year. Use this to build a pipeline that loads a single year’s snapshot into a separate indicators_snapshot table.

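  4. Add a load_timestamp column to both tables and set it in both upserts, including the DO UPDATE SET clause. Its value should be NOW() (or the current Python datetime), so you can see when each row was last written. This is useful for auditing and incremental refresh strategies. Because CREATE TABLE IF NOT EXISTS leaves an existing table unchanged, you will also need to drop the tables or add the column with ALTER TABLE.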

  5. Wrap the pipeline in a command-line interface using Python’s argparse module so it can be invoked as python pipeline.py --countries data/countries.csv --indicators data/indicators.parquet --db postgresql+psycopg2://.... Log the parsed arguments at the start of the run.