24  Loading Data into PostgreSQL with SQLAlchemy

24.1 Learning Objectives

By the end of this chapter, students will be able to:

  • Connect to PostgreSQL from Python using a SQLAlchemy engine and a connection URL.
  • Write a Polars or pandas DataFrame to a database table using SQLAlchemy.
  • Write an upsert using INSERT ... ON CONFLICT to handle rows that already exist in the target table.
  • Explain what idempotency means in the context of a data pipeline and why it matters for scheduled runs.
  • Verify a successful load by querying the target table after insertion.

With clean DataFrames in hand, the final stage of the ETL pipeline is loading them into PostgreSQL. This chapter covers how to do that using SQLAlchemy, the standard Python library for database connectivity.

You will learn to write DataFrames to database tables, and more importantly, how to write upserts: inserts that gracefully handle duplicate rows by updating existing records rather than failing or creating duplicates. This is critical for any pipeline that runs on a schedule, since subsequent runs will encounter rows that were already loaded on a previous run.

24.2 Prerequisites

Install SQLAlchemy and the psycopg2 adapter if you have not already:

pip install sqlalchemy psycopg2-binary pyarrow

24.3 Connecting to PostgreSQL

SQLAlchemy uses a connection string (also called a connection URL) that encodes the database type, credentials, host, port, and database name in a single string:

postgresql+psycopg2://user:password@host:port/database

Create an engine from the connection string. The engine manages a pool of connections to the database; you do not open a raw connection until you actually execute a query:

from sqlalchemy import create_engine

engine = create_engine(
    "postgresql+psycopg2://postgres:postgres@localhost:5433/worldbank"
)

Verify the connection by running a trivial query:

from sqlalchemy import text

with engine.connect() as conn:
    result = conn.execute(text("SELECT current_database(), version()"))
    db, version = result.fetchone()
    print(f"Connected to: {db}")
    print(f"PostgreSQL:   {version.split(',')[0]}")
Connected to: worldbank
PostgreSQL:   PostgreSQL 17.9 on x86_64-pc-linux-musl

The with block guarantees the connection is returned to the pool when the block exits, even if an exception is raised.

24.4 Creating the Target Tables

The ETL pipeline loads into two tables: countries_etl and indicators_etl. These are separate from the read-only teaching tables used in earlier chapters. Using country_code as the primary key (rather than a surrogate integer) makes upsert straightforward because we always know the natural identifier for each row.

with engine.begin() as conn:
    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS countries_etl (
            country_code  VARCHAR(3)        PRIMARY KEY,
            iso2_code     VARCHAR(2),
            short_name    VARCHAR(100),
            region        VARCHAR(100),
            capital       VARCHAR(100),
            longitude     DOUBLE PRECISION,
            latitude      DOUBLE PRECISION,
            income_group  VARCHAR(50),
            lending_type  VARCHAR(50)
        )
    """))
    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS indicators_etl (
            country_code        VARCHAR(3)        REFERENCES countries_etl(country_code),
            year                INTEGER,
            population          DOUBLE PRECISION,
            gdp_usd             DOUBLE PRECISION,
            gdp_per_capita_usd  DOUBLE PRECISION,
            pct_forest_area     DOUBLE PRECISION,
            land_area_km2       DOUBLE PRECISION,
            PRIMARY KEY (country_code, year)
        )
    """))
print("Tables ready")
Tables ready

CREATE TABLE IF NOT EXISTS makes this block idempotent: running it a second time does nothing if the tables already exist. The REFERENCES countries_etl(country_code) on indicators_etl is a foreign key constraint that prevents loading indicator rows for a country that is not yet in countries_etl. This is why countries must always be loaded before indicators.

Confirm the schema was created correctly:

with engine.connect() as conn:
    for table in ["countries_etl", "indicators_etl"]:
        result = conn.execute(text("""
            SELECT column_name, data_type, is_nullable
            FROM information_schema.columns
            WHERE table_schema = 'public' AND table_name = :tbl
            ORDER BY ordinal_position
        """), {"tbl": table})
        print(f"\n{table}")
        print("-" * 50)
        for row in result:
            nullable = "NULL" if row.is_nullable == "YES" else "NOT NULL"
            print(f"  {row.column_name:<28} {row.data_type:<15} {nullable}")

countries_etl
--------------------------------------------------
  country_code                 character varying NOT NULL
  iso2_code                    character varying NULL
  short_name                   character varying NULL
  region                       character varying NULL
  capital                      character varying NULL
  longitude                    double precision NULL
  latitude                     double precision NULL
  income_group                 character varying NULL
  lending_type                 character varying NULL

indicators_etl
--------------------------------------------------
  country_code                 character varying NOT NULL
  year                         integer         NOT NULL
  population                   double precision NULL
  gdp_usd                      double precision NULL
  gdp_per_capita_usd           double precision NULL
  pct_forest_area              double precision NULL
  land_area_km2                double precision NULL

24.5 Loading the Clean DataFrames

Load the files produced at the end of Chapter 32:

import polars as pl

countries = pl.read_parquet("data/countries_clean.parquet")
indicators = pl.read_parquet("data/indicators_clean.parquet")

print(f"countries:  {countries.shape}")
print(f"indicators: {indicators.shape}")
countries:  (218, 9)
indicators: (4991, 7)

24.6 Simple Bulk Load with write_database

Polars has a built-in write_database() method that writes a DataFrame directly to a database table. It is the fastest path when you are loading into an empty table or are comfortable replacing all existing data:

countries.write_database(
    table_name="countries_etl_staging",
    connection=engine,
    if_table_exists="replace"
)
print("Loaded countries_etl_staging")
Loaded countries_etl_staging

The if_table_exists="replace" option drops and recreates the table, inferring column types from the DataFrame schema. This is convenient for staging tables where you do not care about preserving existing data. For production tables with constraints and foreign keys, you want "append" or an explicit upsert.

24.7 Upsert: Handling Duplicate Rows

A pipeline that runs repeatedly will encounter rows it has already loaded. A plain INSERT will fail on a duplicate primary key. Truncating the table and reloading from scratch works but is slow and briefly leaves the table empty. The right solution is an upsert: insert new rows and update existing ones in a single atomic statement.

PostgreSQL implements upsert via the INSERT ... ON CONFLICT clause:

INSERT INTO countries_etl (country_code, short_name, ...)
VALUES (%s, %s, ...)
ON CONFLICT (country_code) DO UPDATE SET
    short_name = EXCLUDED.short_name,
    ...;

EXCLUDED refers to the row that was proposed for insertion but conflicted. The DO UPDATE SET block copies the proposed values over the existing row.

24.7.1 Upsert for the Countries Table

We will execute the upsert with parameterized queries using SQLAlchemy’s text() and bulk executemany():

upsert_countries_sql = text("""
    INSERT INTO countries_etl
        (country_code, iso2_code, short_name, region,
         capital, longitude, latitude, income_group, lending_type)
    VALUES
        (:country_code, :iso2_code, :short_name, :region,
         :capital, :longitude, :latitude, :income_group, :lending_type)
    ON CONFLICT (country_code) DO UPDATE SET
        iso2_code    = EXCLUDED.iso2_code,
        short_name   = EXCLUDED.short_name,
        region       = EXCLUDED.region,
        capital      = EXCLUDED.capital,
        longitude    = EXCLUDED.longitude,
        latitude     = EXCLUDED.latitude,
        income_group = EXCLUDED.income_group,
        lending_type = EXCLUDED.lending_type
""")

rows = countries.to_dicts()

with engine.begin() as conn:
    conn.execute(upsert_countries_sql, rows)

print(f"Upserted {len(rows)} country rows")
Upserted 218 country rows

engine.begin() opens a connection and starts a transaction. The transaction commits automatically when the with block exits cleanly and rolls back if an exception is raised. Never call conn.commit() manually inside an engine.begin() block.

.to_dicts() converts the Polars DataFrame into a list of dictionaries, one per row, which SQLAlchemy passes as named parameters (:country_code, :iso2_code, etc.) to the parameterized query.

Verify the result:

with engine.connect() as conn:
    result = conn.execute(text("SELECT COUNT(*) FROM countries_etl"))
    print(f"countries_etl rows: {result.scalar()}")
countries_etl rows: 218

24.7.2 Upsert for the Indicators Table

The indicators table is larger (4,991 rows) and has a composite primary key: (country_code, year). The upsert logic is the same; only the conflict target and column list change:

upsert_indicators_sql = text("""
    INSERT INTO indicators_etl
        (country_code, year, population, gdp_usd,
         gdp_per_capita_usd, pct_forest_area, land_area_km2)
    VALUES
        (:country_code, :year, :population, :gdp_usd,
         :gdp_per_capita_usd, :pct_forest_area, :land_area_km2)
    ON CONFLICT (country_code, year) DO UPDATE SET
        population         = EXCLUDED.population,
        gdp_usd            = EXCLUDED.gdp_usd,
        gdp_per_capita_usd = EXCLUDED.gdp_per_capita_usd,
        pct_forest_area    = EXCLUDED.pct_forest_area,
        land_area_km2      = EXCLUDED.land_area_km2
""")

rows = indicators.to_dicts()

with engine.begin() as conn:
    conn.execute(upsert_indicators_sql, rows)

print(f"Upserted {len(rows)} indicator rows")
Upserted 4991 indicator rows
with engine.connect() as conn:
    result = conn.execute(text("SELECT COUNT(*) FROM indicators_etl"))
    print(f"indicators_etl rows: {result.scalar()}")
indicators_etl rows: 4991

24.7.3 Batching Large Loads

For very large DataFrames (hundreds of thousands of rows), sending all rows in a single executemany() call can consume substantial memory because SQLAlchemy builds the entire parameter list in Python before sending it to the database. Batching limits peak memory use:

def upsert_in_batches(conn, sql, rows, batch_size=1000):
    for i in range(0, len(rows), batch_size):
        batch = rows[i : i + batch_size]
        conn.execute(sql, batch)
    return len(rows)

rows = indicators.to_dicts()

with engine.begin() as conn:
    total = upsert_in_batches(conn, upsert_indicators_sql, rows, batch_size=500)

print(f"Upserted {total} rows in batches of 500")
Upserted 4991 rows in batches of 500

A batch size of 500 to 1000 rows is a reasonable default. Very small batches (10 to 50 rows) generate excessive round trips to the database; very large batches (10,000+ rows) increase memory pressure on both the client and the server.

24.8 Conditional Updates: Only Update Changed Rows

The upsert above updates every column on every conflict, even if nothing changed. For tables where updates trigger downstream processes (audit logs, cache invalidation, replication), you may want to skip rows that have not actually changed:

ON CONFLICT (country_code) DO UPDATE SET
    short_name = EXCLUDED.short_name,
    region     = EXCLUDED.region,
    capital    = EXCLUDED.capital
WHERE
    countries_etl.short_name IS DISTINCT FROM EXCLUDED.short_name OR
    countries_etl.region     IS DISTINCT FROM EXCLUDED.region     OR
    countries_etl.capital    IS DISTINCT FROM EXCLUDED.capital

IS DISTINCT FROM handles null comparisons correctly. Unlike <>, it treats NULL <> NULL as false, so a row is only updated when at least one value genuinely differs.

24.9 Reading Data Back into Polars

Once data is in PostgreSQL, you can read it back into a Polars DataFrame for further analysis using pl.read_database():

query = """
    SELECT c.short_name, c.region, i.year,
           i.gdp_per_capita_usd, i.population
    FROM countries_etl AS c
    INNER JOIN indicators_etl AS i ON c.country_code = i.country_code
    WHERE i.year = 2022
      AND i.gdp_per_capita_usd IS NOT NULL
    ORDER BY i.gdp_per_capita_usd DESC
    LIMIT 10
"""

top10 = pl.read_database(query, connection=engine)
print(top10)
shape: (10, 5)
┌────────────────┬────────────────────────────┬──────┬────────────────────┬────────────┐
│ short_name     ┆ region                     ┆ year ┆ gdp_per_capita_usd ┆ population │
│ ---            ┆ ---                        ┆ ---  ┆ ---                ┆ ---        │
│ str            ┆ str                        ┆ i64  ┆ f64                ┆ f64        │
╞════════════════╪════════════════════════════╪══════╪════════════════════╪════════════╡
│ Monaco         ┆ Europe & Central Asia      ┆ 2022 ┆ 226052.0           ┆ 5.0        │
│ Liechtenstein  ┆ Europe & Central Asia      ┆ 2022 ┆ 186400.23          ┆ 39493.0    │
│ Luxembourg     ┆ Europe & Central Asia      ┆ 2022 ┆ 125006.02          ┆ 653103.0   │
│ Bermuda        ┆ North America              ┆ 2022 ┆ 120897.31          ┆ 64749.0    │
│ Norway         ┆ Europe & Central Asia      ┆ 2022 ┆ 108798.45          ┆ 5.457127e6 │
│ Ireland        ┆ Europe & Central Asia      ┆ 2022 ┆ 106194.76          ┆ 5.1657e6   │
│ Switzerland    ┆ Europe & Central Asia      ┆ 2022 ┆ 93245.8            ┆ 8.777088e6 │
│ Cayman Islands ┆ Latin America & Caribbean  ┆ 2022 ┆ 92202.15           ┆ 71591.0    │
│ Qatar          ┆ Middle East & North Africa ┆ 2022 ┆ 88701.46           ┆ 2.657333e6 │
│ Singapore      ┆ East Asia & Pacific        ┆ 2022 ┆ 88428.7            ┆ 5.637022e6 │
└────────────────┴────────────────────────────┴──────┴────────────────────┴────────────┘

This round-trip pattern (transform in Polars, load to PostgreSQL, query back with SQL, analyze in Polars) is how many data science workflows operate in practice. PostgreSQL handles the storage, indexing, and multi-user access; Polars handles the in-memory computation.

24.10 Summary

The loading stage has three main scenarios:

Scenario Approach
Fresh load into empty table write_database(if_table_exists="replace")
Append new rows only (no duplicates expected) write_database(if_table_exists="append")
Recurring pipeline, duplicates expected INSERT ... ON CONFLICT DO UPDATE upsert
Large DataFrame, memory constrained Upsert in batches with a helper function

The INSERT ... ON CONFLICT DO UPDATE pattern is the workhorse of production ETL pipelines. It is atomic, efficient, and safe to run repeatedly without any cleanup step between runs.

In the final chapter of this section, you will combine extraction, transformation, and loading into a single end-to-end pipeline function.

24.11 Exercises

  1. Run the countries upsert a second time without modifying any data. Verify that the row count in countries_etl does not change.

  2. Modify the countries upsert to use the conditional WHERE IS DISTINCT FROM clause so that only rows with changed values are updated. Test it by changing one country’s capital in the DataFrame and re-running.

  3. Write a query using pl.read_database() that returns the top 5 countries by population for each year from 2018 to 2022. Hint: use a window function (RANK() OVER (PARTITION BY year ORDER BY population DESC)).

  4. The write_database(if_table_exists="replace") approach creates a staging table. Extend the pattern so that after loading the staging table, you run an upsert from the staging table into the production table using INSERT INTO ... SELECT ... ON CONFLICT DO UPDATE. This is called a staging table upsert and is common when the source is a file drop rather than a live API.

  5. Modify the batch upsert helper to log the batch number and row count for each batch using Python’s logging module at the INFO level.