24 Loading Data into PostgreSQL with SQLAlchemy
24.1 Learning Objectives
By the end of this chapter, students will be able to:
- Connect to PostgreSQL from Python using a SQLAlchemy engine and a connection URL.
- Write a Polars or pandas DataFrame to a database table using SQLAlchemy.
- Write an upsert using INSERT ... ON CONFLICT to handle rows that already exist in the target table.
- Explain what idempotency means in the context of a data pipeline and why it matters for scheduled runs.
- Verify a successful load by querying the target table after insertion.
With clean DataFrames in hand, the final stage of the ETL pipeline is loading them into PostgreSQL. This chapter covers how to do that using SQLAlchemy, the standard Python library for database connectivity.
You will learn to write DataFrames to database tables, and more importantly, how to write upserts: inserts that gracefully handle duplicate rows by updating existing records rather than failing or creating duplicates. This is critical for any pipeline that runs on a schedule, since subsequent runs will encounter rows that were already loaded on a previous run.
24.2 Prerequisites
Install SQLAlchemy, the psycopg2 adapter, and pyarrow if you have not already:
pip install sqlalchemy psycopg2-binary pyarrow
24.3 Connecting to PostgreSQL
SQLAlchemy uses a connection string (also called a connection URL) that encodes the database type, credentials, host, port, and database name in a single string:
postgresql+psycopg2://user:password@host:port/database
Create an engine from the connection string. The engine manages a pool of connections to the database; it does not open a connection until you actually execute a query:
from sqlalchemy import create_engine

engine = create_engine(
    "postgresql+psycopg2://postgres:postgres@localhost:5433/worldbank"
)
Verify the connection by running a trivial query:
from sqlalchemy import text

with engine.connect() as conn:
    result = conn.execute(text("SELECT current_database(), version()"))
    db, version = result.fetchone()
    print(f"Connected to: {db}")
    print(f"PostgreSQL: {version.split(',')[0]}")

Connected to: worldbank
PostgreSQL: PostgreSQL 17.9 on x86_64-pc-linux-musl
The with block guarantees the connection is returned to the pool when the block exits, even if an exception is raised.
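Passwords that contain characters such as @, : or / need to be percent-encoded before they are placed in a connection URL, otherwise the URL is split in the wrong place. The sketch below shows one way to assemble the URL with the standard library. The helper name build_pg_url and the credentials are illustrative assumptions, not part of this chapter's setup.

```python
from urllib.parse import quote_plus


def build_pg_url(user: str, password: str, host: str, port: int, database: str) -> str:
    """Assemble a PostgreSQL connection URL (hypothetical helper).

    The user and password are percent-encoded so that reserved characters
    such as '@', ':' and '/' cannot be mistaken for URL delimiters.
    """
    return (
        f"postgresql+psycopg2://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{database}"
    )


# Placeholder credentials, for illustration only.
url = build_pg_url("postgres", "p@ss:word/1", "localhost", 5433, "worldbank")
print(url)
# postgresql+psycopg2://postgres:p%40ss%3Aword%2F1@localhost:5433/worldbank
```

The resulting string can be passed to create_engine() in place of a hand-written URL.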
24.4 Creating the Target Tables
The ETL pipeline loads into two tables: countries_etl and indicators_etl. These are separate from the read-only teaching tables used in earlier chapters. Using country_code as the primary key (rather than a surrogate integer) makes upsert straightforward because we always know the natural identifier for each row.
with engine.begin() as conn:
    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS countries_etl (
            country_code VARCHAR(3) PRIMARY KEY,
            iso2_code VARCHAR(2),
            short_name VARCHAR(100),
            region VARCHAR(100),
            capital VARCHAR(100),
            longitude DOUBLE PRECISION,
            latitude DOUBLE PRECISION,
            income_group VARCHAR(50),
            lending_type VARCHAR(50)
        )
    """))
    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS indicators_etl (
            country_code VARCHAR(3) REFERENCES countries_etl(country_code),
            year INTEGER,
            population DOUBLE PRECISION,
            gdp_usd DOUBLE PRECISION,
            gdp_per_capita_usd DOUBLE PRECISION,
            pct_forest_area DOUBLE PRECISION,
            land_area_km2 DOUBLE PRECISION,
            PRIMARY KEY (country_code, year)
        )
    """))
print("Tables ready")

Tables ready
CREATE TABLE IF NOT EXISTS makes this block idempotent: running it a second time does nothing if the tables already exist. The REFERENCES countries_etl(country_code) on indicators_etl is a foreign key constraint that prevents loading indicator rows for a country that is not yet in countries_etl. This is why countries must always be loaded before indicators.
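The load-order rule can be tried out without a PostgreSQL server. The following is a minimal sketch that uses Python's built-in sqlite3 module as a stand-in, with the two tables cut down to their key columns. The sample values are made up, and unlike PostgreSQL, SQLite only enforces foreign keys after PRAGMA foreign_keys = ON.

```python
import sqlite3

# In-memory SQLite database, used only as a stand-in for PostgreSQL.
conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite leaves FK checks off by default

conn.execute("CREATE TABLE countries_etl (country_code TEXT PRIMARY KEY)")
conn.execute("""
    CREATE TABLE indicators_etl (
        country_code TEXT REFERENCES countries_etl(country_code),
        year INTEGER,
        PRIMARY KEY (country_code, year)
    )
""")

# Loading an indicator row before its country violates the foreign key.
try:
    conn.execute("INSERT INTO indicators_etl VALUES ('NOR', 2022)")
    fk_rejected = False
except sqlite3.IntegrityError:
    fk_rejected = True

# Loading the country first makes the same insert succeed.
conn.execute("INSERT INTO countries_etl VALUES ('NOR')")
conn.execute("INSERT INTO indicators_etl VALUES ('NOR', 2022)")
loaded = conn.execute("SELECT COUNT(*) FROM indicators_etl").fetchone()[0]

print(fk_rejected, loaded)  # True 1
```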
Confirm the schema was created correctly:
with engine.connect() as conn:
    for table in ["countries_etl", "indicators_etl"]:
        result = conn.execute(text("""
            SELECT column_name, data_type, is_nullable
            FROM information_schema.columns
            WHERE table_schema = 'public' AND table_name = :tbl
            ORDER BY ordinal_position
        """), {"tbl": table})
        print(f"\n{table}")
        print("-" * 50)
        for row in result:
            nullable = "NULL" if row.is_nullable == "YES" else "NOT NULL"
            print(f"  {row.column_name:<28} {row.data_type:<15} {nullable}")

countries_etl
--------------------------------------------------
  country_code                 character varying NOT NULL
  iso2_code                    character varying NULL
  short_name                   character varying NULL
  region                       character varying NULL
  capital                      character varying NULL
  longitude                    double precision NULL
  latitude                     double precision NULL
  income_group                 character varying NULL
  lending_type                 character varying NULL

indicators_etl
--------------------------------------------------
  country_code                 character varying NOT NULL
  year                         integer         NOT NULL
  population                   double precision NULL
  gdp_usd                      double precision NULL
  gdp_per_capita_usd           double precision NULL
  pct_forest_area              double precision NULL
  land_area_km2                double precision NULL
24.5 Loading the Clean DataFrames
Load the files produced at the end of Chapter 32:
import polars as pl

countries = pl.read_parquet("data/countries_clean.parquet")
indicators = pl.read_parquet("data/indicators_clean.parquet")
print(f"countries: {countries.shape}")
print(f"indicators: {indicators.shape}")

countries: (218, 9)
indicators: (4991, 7)
24.6 Simple Bulk Load with write_database
Polars has a built-in write_database() method that writes a DataFrame directly to a database table. It is the fastest path when you are loading into an empty table or are comfortable replacing all existing data:
countries.write_database(
    table_name="countries_etl_staging",
    connection=engine,
    if_table_exists="replace"
)
print("Loaded countries_etl_staging")

Loaded countries_etl_staging
The if_table_exists="replace" option drops and recreates the table, inferring column types from the DataFrame schema. This is convenient for staging tables where you do not care about preserving existing data. For production tables with constraints and foreign keys, you want "append" or an explicit upsert.
24.7 Upsert: Handling Duplicate Rows
A pipeline that runs repeatedly will encounter rows it has already loaded. A plain INSERT will fail on a duplicate primary key. Truncating the table and reloading from scratch works but is slow and briefly leaves the table empty. The right solution is an upsert: insert new rows and update existing ones in a single atomic statement.
PostgreSQL implements upsert via the INSERT ... ON CONFLICT clause:
INSERT INTO countries_etl (country_code, short_name, ...)
VALUES (%s, %s, ...)
ON CONFLICT (country_code) DO UPDATE SET
    short_name = EXCLUDED.short_name,
    ...;
EXCLUDED refers to the row that was proposed for insertion but conflicted. The DO UPDATE SET block copies the proposed values over the existing row.
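To see this behavior without a running PostgreSQL server, the sketch below uses Python's built-in sqlite3 module as a stand-in. SQLite (version 3.24 or newer) accepts the same ON CONFLICT ... DO UPDATE form. The two-column table and the sample values are simplifications chosen for illustration.

```python
import sqlite3

# SQLite stands in for PostgreSQL here; table and values are illustrative.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE countries_etl (country_code TEXT PRIMARY KEY, short_name TEXT)"
)

upsert_sql = """
    INSERT INTO countries_etl (country_code, short_name)
    VALUES (?, ?)
    ON CONFLICT (country_code) DO UPDATE SET
        short_name = excluded.short_name
"""

# First run: no conflict, so the row is inserted.
conn.execute(upsert_sql, ("TUR", "Turkey"))
# Second run: same key, so the proposed value overwrites the stored one.
conn.execute(upsert_sql, ("TUR", "Turkiye"))

rows = conn.execute("SELECT country_code, short_name FROM countries_etl").fetchall()
print(rows)  # [('TUR', 'Turkiye')]
```

The table still holds a single row after both statements, and that row carries the value from the second, conflicting insert.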
24.7.1 Upsert for the Countries Table
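We will execute the upsert as a parameterized query built with SQLAlchemy’s text(). Passing a list of parameter dictionaries to conn.execute() makes SQLAlchemy run the statement once per dictionary through the driver's executemany() path: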
upsert_countries_sql = text("""
    INSERT INTO countries_etl
        (country_code, iso2_code, short_name, region,
         capital, longitude, latitude, income_group, lending_type)
    VALUES
        (:country_code, :iso2_code, :short_name, :region,
         :capital, :longitude, :latitude, :income_group, :lending_type)
    ON CONFLICT (country_code) DO UPDATE SET
        iso2_code = EXCLUDED.iso2_code,
        short_name = EXCLUDED.short_name,
        region = EXCLUDED.region,
        capital = EXCLUDED.capital,
        longitude = EXCLUDED.longitude,
        latitude = EXCLUDED.latitude,
        income_group = EXCLUDED.income_group,
        lending_type = EXCLUDED.lending_type
""")

rows = countries.to_dicts()
with engine.begin() as conn:
    conn.execute(upsert_countries_sql, rows)
print(f"Upserted {len(rows)} country rows")

Upserted 218 country rows
engine.begin() opens a connection and starts a transaction. The transaction commits automatically when the with block exits cleanly and rolls back if an exception is raised. Never call conn.commit() manually inside an engine.begin() block.
.to_dicts() converts the Polars DataFrame into a list of dictionaries, one per row, which SQLAlchemy passes as named parameters (:country_code, :iso2_code, etc.) to the parameterized query.
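The commit-or-rollback behavior and the dictionary-per-row parameter format can both be tried with the standard library alone. In the sketch below, sqlite3 stands in for PostgreSQL, and its "with conn" block plays a role comparable to engine.begin(). The table is reduced to two columns, and the second row is deliberately invalid.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # SQLite stand-in for PostgreSQL
conn.execute(
    "CREATE TABLE countries_etl (country_code TEXT PRIMARY KEY, short_name TEXT NOT NULL)"
)

# Same shape as the output of DataFrame.to_dicts(): one dict per row.
rows = [
    {"country_code": "NOR", "short_name": "Norway"},
    {"country_code": "QAT", "short_name": None},  # violates NOT NULL on purpose
]

insert_sql = """
    INSERT INTO countries_etl (country_code, short_name)
    VALUES (:country_code, :short_name)
"""

try:
    # "with conn" commits if the block succeeds and rolls back if it raises.
    with conn:
        conn.executemany(insert_sql, rows)
except sqlite3.IntegrityError as exc:
    print(f"Load failed, transaction rolled back: {exc}")

# The valid first row was rolled back together with the failing one.
count = conn.execute("SELECT COUNT(*) FROM countries_etl").fetchone()[0]
print(count)  # 0
```

Because the whole batch runs in one transaction, a failure partway through leaves the table exactly as it was before the load started.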
Verify the result:
with engine.connect() as conn:
    result = conn.execute(text("SELECT COUNT(*) FROM countries_etl"))
    print(f"countries_etl rows: {result.scalar()}")

countries_etl rows: 218
24.7.2 Upsert for the Indicators Table
The indicators table is larger (4,991 rows) and has a composite primary key: (country_code, year). The upsert logic is the same; only the conflict target and column list change:
upsert_indicators_sql = text("""
    INSERT INTO indicators_etl
        (country_code, year, population, gdp_usd,
         gdp_per_capita_usd, pct_forest_area, land_area_km2)
    VALUES
        (:country_code, :year, :population, :gdp_usd,
         :gdp_per_capita_usd, :pct_forest_area, :land_area_km2)
    ON CONFLICT (country_code, year) DO UPDATE SET
        population = EXCLUDED.population,
        gdp_usd = EXCLUDED.gdp_usd,
        gdp_per_capita_usd = EXCLUDED.gdp_per_capita_usd,
        pct_forest_area = EXCLUDED.pct_forest_area,
        land_area_km2 = EXCLUDED.land_area_km2
""")

rows = indicators.to_dicts()
with engine.begin() as conn:
    conn.execute(upsert_indicators_sql, rows)
print(f"Upserted {len(rows)} indicator rows")

Upserted 4991 indicator rows

with engine.connect() as conn:
    result = conn.execute(text("SELECT COUNT(*) FROM indicators_etl"))
    print(f"indicators_etl rows: {result.scalar()}")

indicators_etl rows: 4991
24.7.3 Batching Large Loads
For very large DataFrames (hundreds of thousands of rows), sending all rows in a single executemany() call can consume substantial memory because SQLAlchemy builds the entire parameter list in Python before sending it to the database. Batching limits peak memory use:
def upsert_in_batches(conn, sql, rows, batch_size=1000):
    for i in range(0, len(rows), batch_size):
        batch = rows[i : i + batch_size]
        conn.execute(sql, batch)
    return len(rows)

rows = indicators.to_dicts()
with engine.begin() as conn:
    total = upsert_in_batches(conn, upsert_indicators_sql, rows, batch_size=500)
print(f"Upserted {total} rows in batches of 500")

Upserted 4991 rows in batches of 500
A batch size of 500 to 1000 rows is a reasonable default. Very small batches (10 to 50 rows) generate excessive round trips to the database; very large batches (10,000+ rows) increase memory pressure on both the client and the server.
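The helper above slices a list that is already fully in memory. When rows arrive from a generator instead (for example, while reading a large file in chunks), a generator-based batcher keeps only one batch in memory at a time. The following is a sketch of that variation, not the chapter's helper. The name iter_batches and the synthetic rows are assumptions for illustration.

```python
from __future__ import annotations

from itertools import islice
from typing import Iterable, Iterator


def iter_batches(rows: Iterable[dict], batch_size: int) -> Iterator[list[dict]]:
    """Yield successive lists of at most batch_size rows (hypothetical helper)."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(rows)
    # islice pulls up to batch_size items; an empty list ends the loop.
    while batch := list(islice(iterator, batch_size)):
        yield batch


# Synthetic rows matching the indicators row count used in this chapter.
fake_rows = ({"row_id": i} for i in range(4991))
batch_sizes = [len(batch) for batch in iter_batches(fake_rows, 500)]

print(len(batch_sizes), batch_sizes[0], batch_sizes[-1])  # 10 500 491
```

Each yielded batch could be passed to conn.execute(sql, batch) in the same way as the slices in upsert_in_batches().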
24.8 Conditional Updates: Only Update Changed Rows
The upsert above updates every column on every conflict, even if nothing changed. For tables where updates trigger downstream processes (audit logs, cache invalidation, replication), you may want to skip rows that have not actually changed:
ON CONFLICT (country_code) DO UPDATE SET
    short_name = EXCLUDED.short_name,
    region = EXCLUDED.region,
    capital = EXCLUDED.capital
WHERE
    countries_etl.short_name IS DISTINCT FROM EXCLUDED.short_name OR
    countries_etl.region IS DISTINCT FROM EXCLUDED.region OR
    countries_etl.capital IS DISTINCT FROM EXCLUDED.capital
IS DISTINCT FROM handles NULL comparisons correctly. With <>, any comparison involving NULL evaluates to NULL, which WHERE treats as not true, so a change to or from NULL would be missed. IS DISTINCT FROM instead treats two NULLs as equal and a NULL versus a non-NULL value as different, so a row is updated only when at least one value genuinely differs.
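The difference between the two operators can be modelled in a few lines of plain Python, with None standing in for SQL NULL. This is only a sketch of the comparison rules. The function names are made up for illustration and nothing here talks to a database.

```python
from typing import Optional


def sql_not_equal(a: Optional[str], b: Optional[str]) -> Optional[bool]:
    """Model SQL's a <> b: the result is NULL (None) if either side is NULL."""
    if a is None or b is None:
        return None
    return a != b


def is_distinct_from(a: Optional[str], b: Optional[str]) -> bool:
    """Model SQL's a IS DISTINCT FROM b: always TRUE or FALSE, never NULL."""
    if a is None and b is None:
        return False  # two NULLs count as the same value
    if a is None or b is None:
        return True   # NULL versus a real value counts as a change
    return a != b


# (stored value, proposed value) pairs for a capital column
cases = [("Oslo", "Oslo"), ("Oslo", None), (None, None), (None, "Doha")]
for old, new in cases:
    print(old, new, sql_not_equal(old, new), is_distinct_from(old, new))
```

For the pair (None, "Doha"), the <> model returns None, which a WHERE clause would treat as not true and so skip the update, while the IS DISTINCT FROM model returns True.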
24.9 Reading Data Back into Polars
Once data is in PostgreSQL, you can read it back into a Polars DataFrame for further analysis using pl.read_database():
query = """
    SELECT c.short_name, c.region, i.year,
           i.gdp_per_capita_usd, i.population
    FROM countries_etl AS c
    INNER JOIN indicators_etl AS i ON c.country_code = i.country_code
    WHERE i.year = 2022
      AND i.gdp_per_capita_usd IS NOT NULL
    ORDER BY i.gdp_per_capita_usd DESC
    LIMIT 10
"""
top10 = pl.read_database(query, connection=engine)
print(top10)

shape: (10, 5)
┌────────────────┬────────────────────────────┬──────┬────────────────────┬────────────┐
│ short_name     ┆ region                     ┆ year ┆ gdp_per_capita_usd ┆ population │
│ ---            ┆ ---                        ┆ ---  ┆ ---                ┆ ---        │
│ str            ┆ str                        ┆ i64  ┆ f64                ┆ f64        │
╞════════════════╪════════════════════════════╪══════╪════════════════════╪════════════╡
│ Monaco         ┆ Europe & Central Asia      ┆ 2022 ┆ 226052.0           ┆ 5.0        │
│ Liechtenstein  ┆ Europe & Central Asia      ┆ 2022 ┆ 186400.23          ┆ 39493.0    │
│ Luxembourg     ┆ Europe & Central Asia      ┆ 2022 ┆ 125006.02          ┆ 653103.0   │
│ Bermuda        ┆ North America              ┆ 2022 ┆ 120897.31          ┆ 64749.0    │
│ Norway         ┆ Europe & Central Asia      ┆ 2022 ┆ 108798.45          ┆ 5.457127e6 │
│ Ireland        ┆ Europe & Central Asia      ┆ 2022 ┆ 106194.76          ┆ 5.1657e6   │
│ Switzerland    ┆ Europe & Central Asia      ┆ 2022 ┆ 93245.8            ┆ 8.777088e6 │
│ Cayman Islands ┆ Latin America & Caribbean  ┆ 2022 ┆ 92202.15           ┆ 71591.0    │
│ Qatar          ┆ Middle East & North Africa ┆ 2022 ┆ 88701.46           ┆ 2.657333e6 │
│ Singapore      ┆ East Asia & Pacific        ┆ 2022 ┆ 88428.7            ┆ 5.637022e6 │
└────────────────┴────────────────────────────┴──────┴────────────────────┴────────────┘
This round-trip pattern (transform in Polars, load to PostgreSQL, query back with SQL, analyze in Polars) is how many data science workflows operate in practice. PostgreSQL handles the storage, indexing, and multi-user access; Polars handles the in-memory computation.
24.10 Summary
The loading stage has four main scenarios:
| Scenario | Approach |
|---|---|
| Fresh load into empty table | write_database(if_table_exists="replace") |
| Append new rows only (no duplicates expected) | write_database(if_table_exists="append") |
| Recurring pipeline, duplicates expected | INSERT ... ON CONFLICT DO UPDATE upsert |
| Large DataFrame, memory constrained | Upsert in batches with a helper function |
The INSERT ... ON CONFLICT DO UPDATE pattern is the workhorse of production ETL pipelines. It is atomic, efficient, and safe to run repeatedly without any cleanup step between runs.
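One way to check the "safe to run repeatedly" property is to run the same load twice and compare the table contents after each run. The sketch below does this with sqlite3 as a stand-in for PostgreSQL. The helper names load_countries and snapshot, the two-column table, and the sample rows are all assumptions made for illustration.

```python
import sqlite3


def load_countries(conn: sqlite3.Connection, rows: list) -> None:
    """Upsert rows into countries_etl inside one transaction (illustrative helper)."""
    with conn:
        conn.executemany(
            """
            INSERT INTO countries_etl (country_code, capital)
            VALUES (:country_code, :capital)
            ON CONFLICT (country_code) DO UPDATE SET capital = excluded.capital
            """,
            rows,
        )


def snapshot(conn: sqlite3.Connection) -> list:
    """Return the full table contents in a stable order."""
    return conn.execute(
        "SELECT country_code, capital FROM countries_etl ORDER BY country_code"
    ).fetchall()


conn = sqlite3.connect(":memory:")  # SQLite stand-in for PostgreSQL
conn.execute("CREATE TABLE countries_etl (country_code TEXT PRIMARY KEY, capital TEXT)")

rows = [
    {"country_code": "NOR", "capital": "Oslo"},
    {"country_code": "QAT", "capital": "Doha"},
]

load_countries(conn, rows)
after_first_run = snapshot(conn)
load_countries(conn, rows)  # simulate the next scheduled run
after_second_run = snapshot(conn)

print(after_first_run == after_second_run, len(after_second_run))  # True 2
```

If the second run had used a plain INSERT instead, it would have raised an integrity error on the duplicate primary keys.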
In the final chapter of this section, you will combine extraction, transformation, and loading into a single end-to-end pipeline function.
24.11 Exercises
1. Run the countries upsert a second time without modifying any data. Verify that the row count in countries_etl does not change.
2. Modify the countries upsert to use the conditional WHERE ... IS DISTINCT FROM clause so that only rows with changed values are updated. Test it by changing one country’s capital in the DataFrame and re-running.
3. Write a query using pl.read_database() that returns the top 5 countries by population for each year from 2018 to 2022. Hint: use a window function (RANK() OVER (PARTITION BY year ORDER BY population DESC)).
4. The write_database(if_table_exists="replace") approach creates a staging table. Extend the pattern so that after loading the staging table, you run an upsert from the staging table into the production table using INSERT INTO ... SELECT ... ON CONFLICT DO UPDATE. This is called a staging table upsert and is common when the source is a file drop rather than a live API.
5. Modify the batch upsert helper to log the batch number and row count for each batch using Python’s logging module at the INFO level.