By the end of this chapter, students will be able to:
Assemble extract, transform, and load functions into a single, callable pipeline function.
Add structured logging to a pipeline to record progress, row counts, and errors.
Define idempotency and verify that a pipeline produces consistent results when run multiple times.
Design a pipeline that separates concerns cleanly across extract, transform, and load stages.
Test a pipeline end-to-end by running it against a live database and inspecting the results.
The previous three chapters covered each stage of the ETL process in isolation: extracting data from files and APIs, transforming it with Polars, and loading it into PostgreSQL with SQLAlchemy. In this chapter, you will tie those stages together into a single, repeatable pipeline function.
A well-structured pipeline is not just a script that runs top to bottom. It handles errors gracefully, logs its progress, and is safe to run multiple times without corrupting the data it manages. By the end of this chapter, you will have a working pipeline that extracts World Bank data, applies transformations, and upserts the results into PostgreSQL.
25.2 Pipeline Design
Before writing any code, it helps to define the pipeline’s responsibilities clearly:
Extract the countries CSV and indicators Parquet file from the data/ directory
Transform both DataFrames: filter out aggregate rows, select the target columns, sort by primary key
Load into PostgreSQL using upsert so the pipeline is idempotent (safe to run repeatedly)
Report a summary of how many rows were written to each table
Keeping these stages as distinct functions makes the pipeline easier to test, debug, and extend later.
25.3 Setting Up Logging
Rather than sprinkling print() calls throughout, use Python’s standard logging module. It adds timestamps and severity levels to every message, and you can redirect output to a file without changing any pipeline code:
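A minimal configuration along these lines would produce timestamped lines of the shape seen in the log output later in this chapter (`14:13:39 INFO Extracting data/countries.csv`). The logger name `pipeline` and the exact format strings are assumptions chosen to match that output, not necessarily the configuration used to generate it.

```python
import logging

# Assumed format: time, severity, message, e.g. "14:13:39 INFO Loading countries".
LOG_FORMAT = "%(asctime)s %(levelname)s %(message)s"
DATE_FORMAT = "%H:%M:%S"

# Configure the root logger once, at program start.
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, datefmt=DATE_FORMAT)

# "pipeline" is a hypothetical logger name; any stable name works.
log = logging.getLogger("pipeline")

# Pass values as arguments so formatting only happens if the message is emitted.
log.info("Extracted %d country rows, %d indicator rows", 218, 4991)
```

To write to a file instead of the console, `logging.basicConfig` also accepts a `filename` argument; the `log.info(...)` calls in the pipeline stay the same.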
25.4 Extract
Keeping extraction separate from transformation means you can swap out the data source (a different file path, an API call, an S3 bucket) without touching the transformation logic.
25.5 Transform
The transform stage filters, selects, and cleans both DataFrames. It returns only the columns and rows the database expects:
25.6 Load
The load stage writes both DataFrames to PostgreSQL using upsert. It accepts the database engine as a parameter so the pipeline can be pointed at different environments (development, staging, production) without code changes:
import polars as pl
from sqlalchemy import create_engine, text

# `log` is the module-level logger configured in Section 25.3.

# Upsert: insert new rows; on a primary-key conflict, overwrite the other columns.
UPSERT_COUNTRIES = text("""
    INSERT INTO countries_etl
        (country_code, iso2_code, short_name, region, capital,
         longitude, latitude, income_group, lending_type)
    VALUES
        (:country_code, :iso2_code, :short_name, :region, :capital,
         :longitude, :latitude, :income_group, :lending_type)
    ON CONFLICT (country_code) DO UPDATE SET
        iso2_code    = EXCLUDED.iso2_code,
        short_name   = EXCLUDED.short_name,
        region       = EXCLUDED.region,
        capital      = EXCLUDED.capital,
        longitude    = EXCLUDED.longitude,
        latitude     = EXCLUDED.latitude,
        income_group = EXCLUDED.income_group,
        lending_type = EXCLUDED.lending_type
""")

UPSERT_INDICATORS = text("""
    INSERT INTO indicators_etl
        (country_code, year, population, gdp_usd,
         gdp_per_capita_usd, pct_forest_area, land_area_km2)
    VALUES
        (:country_code, :year, :population, :gdp_usd,
         :gdp_per_capita_usd, :pct_forest_area, :land_area_km2)
    ON CONFLICT (country_code, year) DO UPDATE SET
        population         = EXCLUDED.population,
        gdp_usd            = EXCLUDED.gdp_usd,
        gdp_per_capita_usd = EXCLUDED.gdp_per_capita_usd,
        pct_forest_area    = EXCLUDED.pct_forest_area,
        land_area_km2      = EXCLUDED.land_area_km2
""")


def _upsert_batched(conn, sql, rows: list[dict], batch_size: int = 500) -> int:
    # Send the rows in slices of batch_size; return how many rows were submitted.
    for i in range(0, len(rows), batch_size):
        conn.execute(sql, rows[i : i + batch_size])
    return len(rows)


CREATE_TABLES = text("""
    CREATE TABLE IF NOT EXISTS countries_etl (
        country_code VARCHAR(3) PRIMARY KEY,
        iso2_code    VARCHAR(2),
        short_name   VARCHAR(100),
        region       VARCHAR(100),
        capital      VARCHAR(100),
        longitude    DOUBLE PRECISION,
        latitude     DOUBLE PRECISION,
        income_group VARCHAR(50),
        lending_type VARCHAR(50)
    );
    CREATE TABLE IF NOT EXISTS indicators_etl (
        country_code       VARCHAR(3) REFERENCES countries_etl(country_code),
        year               INTEGER,
        population         DOUBLE PRECISION,
        gdp_usd            DOUBLE PRECISION,
        gdp_per_capita_usd DOUBLE PRECISION,
        pct_forest_area    DOUBLE PRECISION,
        land_area_km2      DOUBLE PRECISION,
        PRIMARY KEY (country_code, year)
    )
""")


def load(countries: pl.DataFrame, indicators: pl.DataFrame, engine) -> dict:
    log.info("Ensuring target tables exist")
    with engine.begin() as conn:
        conn.execute(CREATE_TABLES)

    # Countries first: indicators_etl has a foreign key to countries_etl.
    log.info("Loading countries")
    with engine.begin() as conn:
        n_countries = _upsert_batched(conn, UPSERT_COUNTRIES, countries.to_dicts())

    log.info("Loading indicators")
    with engine.begin() as conn:
        n_indicators = _upsert_batched(conn, UPSERT_INDICATORS, indicators.to_dicts())

    log.info("Load complete: %d countries, %d indicators", n_countries, n_indicators)
    return {"countries": n_countries, "indicators": n_indicators}
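Each table loads in its own transaction. Countries are loaded first because indicators_etl has a foreign key referencing countries_etl.country_code. Loading both tables in a single transaction would also satisfy the constraint, provided the countries are inserted first, but committing countries separately makes the dependency explicit and keeps each transaction small. The trade-off is that a failure during the indicators load leaves the countries already committed; because both loads are upserts, rerunning the pipeline repairs that state.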
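The ordering requirement can be seen in a small, self-contained sketch. It uses Python's built-in `sqlite3` module as a stand-in for PostgreSQL (an assumption, made so the example runs without a server) and cut-down versions of the two tables. The helper `try_insert_indicator` is hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite enforces foreign keys only when enabled
conn.executescript("""
    CREATE TABLE countries_etl (country_code TEXT PRIMARY KEY);
    CREATE TABLE indicators_etl (
        country_code TEXT REFERENCES countries_etl(country_code),
        year INTEGER,
        PRIMARY KEY (country_code, year)
    );
""")


def try_insert_indicator(code: str, year: int) -> bool:
    """Return True if the row was inserted, False if a constraint rejected it."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute("INSERT INTO indicators_etl VALUES (?, ?)", (code, year))
        return True
    except sqlite3.IntegrityError:
        return False


before = try_insert_indicator("NOR", 2022)  # parent row does not exist yet
with conn:
    conn.execute("INSERT INTO countries_etl VALUES ('NOR')")
after = try_insert_indicator("NOR", 2022)   # parent row now exists

print(before, after)  # False True
```

The child insert is rejected until the parent row exists, which is why the load stage writes countries before indicators.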
25.7 Running the Pipeline
With the three stages defined, the pipeline is a straightforward composition:
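The composition might look like the sketch below. To keep it runnable on its own, the three stages are passed in as parameters and replaced by tiny stand-ins that work on lists of dictionaries. That is a hypothetical design for illustration: the chapter's `run_pipeline` calls the real extract, transform, and load functions and, as the exercises note, creates its own engine.

```python
from typing import Callable

Rows = list[dict]


def run_pipeline(
    extract: Callable[[], Rows],
    transform: Callable[[Rows], Rows],
    load: Callable[[Rows], dict],
) -> dict:
    """Run the three stages in order and return the load summary."""
    raw = extract()
    clean = transform(raw)
    return load(clean)


# Stand-in stages (hypothetical) so the sketch needs no files or database.
target: dict[str, dict] = {}


def extract_stub() -> Rows:
    return [
        {"country_code": "NOR", "region": "Europe & Central Asia"},
        {"country_code": "WLD", "region": None},  # aggregate row
    ]


def transform_stub(rows: Rows) -> Rows:
    return [row for row in rows if row["region"] is not None]


def load_stub(rows: Rows) -> dict:
    for row in rows:
        target[row["country_code"]] = row  # keyed write behaves like an upsert
    return {"countries": len(rows)}


run_pipeline(extract_stub, transform_stub, load_stub)
summary = run_pipeline(extract_stub, transform_stub, load_stub)
print(summary, len(target))  # {'countries': 1} 1
```

The log output from two consecutive runs of the chapter's pipeline against PostgreSQL follows.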
14:13:39 INFO Extracting data/countries.csv
14:13:39 INFO Extracting data/indicators.parquet
14:13:39 INFO Extracted 218 country rows, 4991 indicator rows
14:13:39 INFO Transforming countries
14:13:39 INFO Transforming indicators
14:13:39 INFO Transformed: 218 countries, 4991 indicator rows
14:13:39 INFO Ensuring target tables exist
14:13:39 INFO Loading countries
14:13:39 INFO Loading indicators
14:13:43 INFO Load complete: 218 countries, 4991 indicators
14:13:43 INFO Extracting data/countries.csv
14:13:43 INFO Extracting data/indicators.parquet
14:13:43 INFO Extracted 218 country rows, 4991 indicator rows
14:13:43 INFO Transforming countries
14:13:43 INFO Transforming indicators
14:13:43 INFO Transformed: 218 countries, 4991 indicator rows
14:13:43 INFO Ensuring target tables exist
14:13:43 INFO Loading countries
14:13:43 INFO Loading indicators
14:13:46 INFO Load complete: 218 countries, 4991 indicators
{'countries': 218, 'indicators': 4991}
The row counts should be identical. The upsert logic ensures that running the pipeline twice produces the same database state as running it once.
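Because `load()` returns the number of rows it was given, matching counts alone do not prove that the database state is unchanged. A stronger check compares the table contents after each run. The sketch below does this with Python's built-in `sqlite3` module standing in for PostgreSQL (an assumption, so it runs without a server; SQLite 3.24 or later accepts the same `ON CONFLICT ... DO UPDATE` clause) and a cut-down `indicators_etl` table. The `snapshot` helper is hypothetical.

```python
import sqlite3

UPSERT = """
    INSERT INTO indicators_etl (country_code, year, population)
    VALUES (:country_code, :year, :population)
    ON CONFLICT (country_code, year) DO UPDATE SET
        population = excluded.population
"""


def load(conn: sqlite3.Connection, rows: list[dict]) -> int:
    with conn:  # one transaction per load
        conn.executemany(UPSERT, rows)
    return len(rows)


def snapshot(conn: sqlite3.Connection) -> list[tuple]:
    """Return the full table contents in a deterministic order."""
    return conn.execute(
        "SELECT country_code, year, population FROM indicators_etl "
        "ORDER BY country_code, year"
    ).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE indicators_etl (
        country_code TEXT,
        year INTEGER,
        population REAL,
        PRIMARY KEY (country_code, year)
    )
""")

rows = [
    {"country_code": "NOR", "year": 2022, "population": 5457127.0},
    {"country_code": "QAT", "year": 2022, "population": 2657333.0},
]

load(conn, rows)
first = snapshot(conn)
load(conn, rows)  # second run hits ON CONFLICT for every row
second = snapshot(conn)

assert first == second  # same state after one run or two
assert len(second) == 2  # no duplicate rows were created
```

With a plain `INSERT` in place of the upsert, the second run would fail on the primary key instead of leaving the table unchanged.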
25.8 Verifying the Result
After the pipeline runs, query PostgreSQL to confirm the data landed correctly:
engine = create_engine("postgresql+psycopg2://postgres:postgres@localhost:5433/worldbank")

summary_query = """
    SELECT
        COUNT(DISTINCT c.country_code) AS countries,
        COUNT(i.country_code) AS indicator_rows,
        MIN(i.year) AS earliest_year,
        MAX(i.year) AS latest_year,
        ROUND(AVG(i.gdp_per_capita_usd)::numeric, 0) AS avg_gdp_per_capita
    FROM countries_etl AS c
    INNER JOIN indicators_etl AS i
        ON c.country_code = i.country_code
"""
result = pl.read_database(summary_query, connection=engine)
print(result)
sample_query = """
    SELECT
        c.short_name,
        c.region,
        i.year,
        i.gdp_per_capita_usd,
        i.population
    FROM countries_etl AS c
    INNER JOIN indicators_etl AS i
        ON c.country_code = i.country_code
    WHERE i.year = 2022
      AND i.gdp_per_capita_usd IS NOT NULL
    ORDER BY i.gdp_per_capita_usd DESC
    LIMIT 10
"""
print(pl.read_database(sample_query, connection=engine))
shape: (10, 5)
┌────────────────┬────────────────────────────┬──────┬────────────────────┬────────────┐
│ short_name ┆ region ┆ year ┆ gdp_per_capita_usd ┆ population │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ str ┆ str ┆ i64 ┆ f64 ┆ f64 │
╞════════════════╪════════════════════════════╪══════╪════════════════════╪════════════╡
│ Monaco ┆ Europe & Central Asia ┆ 2022 ┆ 226052.0 ┆ 5.0 │
│ Liechtenstein ┆ Europe & Central Asia ┆ 2022 ┆ 186400.23 ┆ 39493.0 │
│ Luxembourg ┆ Europe & Central Asia ┆ 2022 ┆ 125006.02 ┆ 653103.0 │
│ Bermuda ┆ North America ┆ 2022 ┆ 120897.31 ┆ 64749.0 │
│ Norway ┆ Europe & Central Asia ┆ 2022 ┆ 108798.45 ┆ 5.457127e6 │
│ Ireland ┆ Europe & Central Asia ┆ 2022 ┆ 106194.76 ┆ 5.1657e6 │
│ Switzerland ┆ Europe & Central Asia ┆ 2022 ┆ 93245.8 ┆ 8.777088e6 │
│ Cayman Islands ┆ Latin America & Caribbean ┆ 2022 ┆ 92202.15 ┆ 71591.0 │
│ Qatar ┆ Middle East & North Africa ┆ 2022 ┆ 88701.46 ┆ 2.657333e6 │
│ Singapore ┆ East Asia & Pacific ┆ 2022 ┆ 88428.7 ┆ 5.637022e6 │
└────────────────┴────────────────────────────┴──────┴────────────────────┴────────────┘
25.9 Error Handling
The pipeline as written will raise an exception and stop if anything goes wrong. For a pipeline that runs unattended on a schedule, you want to catch errors, log them, and decide whether to retry or abort.
14:13:46 INFO Extracting data/countries.csv
14:13:46 INFO Extracting data/indicators.parquet
14:13:46 INFO Extracted 218 country rows, 4991 indicator rows
14:13:46 INFO Transforming countries
14:13:46 INFO Transforming indicators
14:13:46 INFO Transformed: 218 countries, 4991 indicator rows
14:13:46 INFO Ensuring target tables exist
14:13:46 INFO Loading countries
14:13:46 INFO Loading indicators
14:13:53 INFO Load complete: 218 countries, 4991 indicators
14:13:53 INFO Pipeline succeeded: {'countries': 218, 'indicators': 4991}
exc_info=True tells the logger to include the full stack trace in the error message, which is essential for diagnosing failures in production.
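A wrapper of this kind could be sketched as follows. The name `run_pipeline_safe`, the choice to return `None` on failure, and the logger name are assumptions; the success message mirrors the final line of the log output above.

```python
import logging

log = logging.getLogger("pipeline")  # assumed logger name


def run_pipeline_safe(pipeline, *args, **kwargs):
    """Run `pipeline`; return its summary, or None if any stage raised."""
    try:
        summary = pipeline(*args, **kwargs)
    except Exception:
        # exc_info=True attaches the active exception's traceback to the record.
        log.error("Pipeline failed", exc_info=True)
        return None
    log.info("Pipeline succeeded: %s", summary)
    return summary


def failing_pipeline():
    # Stand-in for a run whose source file is missing.
    raise FileNotFoundError("data/countries.csv")


result = run_pipeline_safe(failing_pipeline)  # logs the error with its traceback
print(result)  # None
```

Returning `None` keeps a long-running scheduler process alive. A script launched by cron would more likely re-raise or exit with a non-zero status after logging, so that the failure is visible to whatever started it.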
25.10 Pipeline Patterns in Practice
The pipeline you built here is a batch pipeline: it reads all the source data at once, transforms it in memory, and writes it all to the database in one run. This pattern works well when:
The source data fits comfortably in memory
The pipeline runs on a schedule (nightly, weekly) rather than continuously
Latency of minutes to hours is acceptable
As data volumes grow or latency requirements shrink, teams typically evolve toward one of these approaches:
Incremental loading processes only rows that have changed since the last run. Instead of upserting all 4,991 rows every night, an incremental pipeline might query the API for records modified in the last 24 hours and upsert only those. This requires a reliable “last modified” timestamp in the source data.
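The core of an incremental load is a filter against a stored "watermark", the timestamp of the previous successful run. The sketch below assumes each source row carries a `last_modified` field (a hypothetical name) and uses made-up timestamps purely for illustration.

```python
from datetime import datetime, timezone


def changed_since(rows: list[dict], watermark: datetime) -> list[dict]:
    """Return only the rows modified strictly after the watermark."""
    return [row for row in rows if row["last_modified"] > watermark]


# Illustrative source rows; the timestamps are invented for this example.
source_rows = [
    {"country_code": "NOR", "year": 2022,
     "last_modified": datetime(2024, 1, 1, tzinfo=timezone.utc)},
    {"country_code": "QAT", "year": 2022,
     "last_modified": datetime(2024, 1, 3, tzinfo=timezone.utc)},
]

last_run = datetime(2024, 1, 2, tzinfo=timezone.utc)
to_upsert = changed_since(source_rows, last_run)

# Advance the watermark to the newest change seen, or keep it if nothing changed.
new_watermark = max((row["last_modified"] for row in to_upsert), default=last_run)

print([row["country_code"] for row in to_upsert])  # ['QAT']
```

Storing the newest timestamp actually seen, rather than assuming a fixed 24-hour window, means a missed or delayed run does not silently skip changes. The selected rows would then go through the same upsert as in the batch pipeline.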
Streaming processes records as they arrive, rather than in nightly batches. Tools like Apache Kafka and Apache Flink are designed for this. The SQL and PostgreSQL skills you have built in this course translate directly: you still write data to the same tables, and you still query with the same SQL.
Orchestration tools such as Apache Airflow, Prefect, and Dagster wrap pipelines like the one you built here, adding dependency management, scheduling, retries, alerting, and a dashboard for monitoring pipeline health. A Polars + SQLAlchemy pipeline is the right building block; an orchestrator is what runs it reliably in production.
25.11 Summary
The four-stage pattern (extract, transform, load, verify) is the foundation of data engineering. The specific tools change from organization to organization, but the structure remains consistent:
| Stage | Tool used | Key principle |
|---|---|---|
| Extract | Polars read_csv, read_parquet, requests | Read only, no transformation |
| Transform | Polars expressions and DataFrame methods | Pure functions: same input, same output |
| Load | SQLAlchemy + INSERT ... ON CONFLICT | Upsert for idempotency |
| Verify | pl.read_database() + SQL queries | Trust but verify |
25.12 Exercises
Add a validate stage that runs after extract and before transform. It should assert that the countries DataFrame has at least 200 rows and that the indicators DataFrame has at least 4000 rows. If either assertion fails, raise a ValueError with a descriptive message.
The run_pipeline function currently creates a new engine on every call. Refactor it so the engine is created once and passed in as a parameter, making it easier to reuse an existing connection pool.
Extend run_pipeline to accept an optional year_filter parameter. When provided, the transform stage should keep only indicator rows for that year. Use this to build a pipeline that loads a single year’s snapshot into a separate indicators_snapshot table.
Add a load_timestamp column to the upsert for both tables. Its value should be NOW() (or the current Python datetime), so you can see when each row was last written. This is useful for auditing and incremental refresh strategies.
Wrap the pipeline in a command-line interface using Python’s argparse module so it can be invoked as python pipeline.py --countries data/countries.csv --indicators data/indicators.parquet --db postgresql+psycopg2://.... Log the parsed arguments at the start of the run.