Read Parquet File and Write to PostgreSQL using Pandas
I was thinking of experimenting with a reasonably large database on my local machine to practice my database management skills, particularly optimization. I needed a dataset with a large amount of data, and the TLC Trip Record dataset turned out to be a great candidate. The next problem was reading the parquet file and storing it in an RDBMS like PostgreSQL. Pandas has APIs to read parquet and bulk-write the data to PostgreSQL. So, let’s try it out.
Prerequisites and Requirements
- Prepare a PostgreSQL server.
- You can use Anaconda Jupyter Notebook if you have one, since it is the simplest option, but plain Python is also fine.
- Required packages for the demo:
  - pandas (pip install pandas) for reading the parquet file (already installed in the Anaconda env)
  - pyarrow (pip install pyarrow) for parquet support in pandas
  - sqlalchemy (pip install sqlalchemy) for SQL support in pandas (already installed in the Anaconda env)
  - psycopg2 (pip install psycopg2) for PostgreSQL support in SQLAlchemy
Downloading the TLC Trip Data parquet file(s) (optional)
This step is optional: you can download the parquet files to your machine and read them offline, or read them directly from the URL using pandas. You can download the latest TLC Trip Data for Yellow Taxi from the webpage; I got yellow_tripdata_2022-01.parquet at the time. To make it easy, prepare a folder for storing the parquet files and our Python program file.
Creating the nyc_tlc database in PostgreSQL
Create a database named nyc_tlc (or anything you want) with the query CREATE DATABASE nyc_tlc; or through your RDBMS UI.
Creating the yellow_trips table in PostgreSQL (optional)
Creating the table first is optional; you can also let pandas create it for you later. Here is the schema:
CREATE TABLE IF NOT EXISTS public.yellow_trips (
vendorid BIGINT NULL,
tpep_pickup_datetime TIMESTAMP NULL,
tpep_dropoff_datetime TIMESTAMP NULL,
passenger_count FLOAT NULL,
trip_distance FLOAT NULL,
ratecodeid FLOAT NULL,
store_and_fwd_flag TEXT NULL,
pulocationid INT NULL,
dolocationid INT NULL,
payment_type INT NULL,
fare_amount FLOAT NULL,
extra FLOAT NULL,
mta_tax FLOAT NULL,
tip_amount FLOAT NULL,
tolls_amount FLOAT NULL,
improvement_surcharge FLOAT NULL,
total_amount FLOAT NULL,
congestion_surcharge FLOAT NULL,
airport_fee FLOAT NULL
);
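If you skip the CREATE TABLE step and let pandas create the table, you can still steer the column types by passing a dtype mapping to to_sql. Here is a minimal sketch; it uses an in-memory SQLite engine purely so it runs without a server (point create_engine at the real nyc_tlc PostgreSQL URL in practice), and only a few of the columns as an illustration:

```python
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.types import BigInteger, Float, Text

# In-memory SQLite stands in for PostgreSQL so this sketch is runnable;
# use 'postgresql+psycopg2://your_dbuser:your_dbuser_pwd@localhost:5432/nyc_tlc' in practice.
engine = create_engine('sqlite://')

# Tiny stand-in frame with a subset of the yellow trip columns
df = pd.DataFrame({
    'vendorid': [1, 2],
    'trip_distance': [1.5, 3.2],
    'store_and_fwd_flag': ['N', 'Y'],
})

# dtype= tells pandas which SQL column types to use when it creates the table
df.to_sql('yellow_trips', con=engine, index=False, if_exists='replace',
          dtype={'vendorid': BigInteger(),
                 'trip_distance': Float(),
                 'store_and_fwd_flag': Text()})
```

Without dtype, pandas infers SQL types from the dataframe's dtypes, which is usually close to the schema above anyway.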
Reading the parquet file using pandas
Let’s create a Python file named main.py
and put it in the same folder as our downloaded parquet file. We can read the parquet file using pandas:
import pandas as pd
# load parquet file as dataframe
df = pd.read_parquet('yellow_tripdata_2022-01.parquet', engine='pyarrow')
The above code uses the local parquet file path on your machine. But if you prefer to download it directly (make sure you have a fast internet connection for a better experience), you can pass the full URL to the file from the website:
df = pd.read_parquet('https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2022-01.parquet', engine='pyarrow')
Before we insert it into the database, you could always check the data frame first:
print('rows: ', df.shape[0])
print('columns: ', df.shape[1])
# or
df.info()
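One gotcha worth checking here: the parquet file's column names are mixed case (e.g. VendorID), while the table schema above uses lowercase names. A quick way to normalize them, sketched on a tiny stand-in frame since the real file may not be at hand:

```python
import pandas as pd

# Tiny stand-in frame with mixed-case columns like the real TLC parquet file
df = pd.DataFrame({'VendorID': [1, 2], 'Trip_Distance': [1.5, 3.2]})

# lowercase every column name so it matches the lowercase PostgreSQL schema
df.columns = [c.lower() for c in df.columns]

print(df.columns.to_list())  # ['vendorid', 'trip_distance']
```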
Writing data to PostgreSQL using pandas
After successfully reading the parquet into the data frame, we can write the data immediately into PostgreSQL:
from sqlalchemy import create_engine
engine = create_engine('postgresql+psycopg2://your_dbuser:your_dbuser_pwd@localhost:5432/nyc_tlc')
df.to_sql('yellow_trips', con=engine, index=False, if_exists='append')
If the data has millions of rows, it can take several minutes to complete.
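If memory or write time becomes a concern, to_sql's chunksize parameter writes the rows in batches instead of buffering everything at once. A minimal sketch, using an in-memory SQLite engine only so it runs without a database server (swap the URL for the PostgreSQL one in practice):

```python
import pandas as pd
from sqlalchemy import create_engine

# In-memory SQLite stands in for PostgreSQL so the sketch is runnable;
# use 'postgresql+psycopg2://your_dbuser:your_dbuser_pwd@localhost:5432/nyc_tlc' for real.
engine = create_engine('sqlite://')

# Small stand-in frame; the real one comes from read_parquet
df = pd.DataFrame({'vendorid': [1, 2, 3], 'trip_distance': [1.1, 2.2, 3.3]})

# chunksize writes the rows in batches of at most that many rows per INSERT round-trip
df.to_sql('yellow_trips', con=engine, index=False,
          if_exists='append', chunksize=100_000)
```

For the yellow taxi file a chunksize around 100,000 is a reasonable starting point; tune it against your server.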
The complete main.py
script
import pandas as pd
from sqlalchemy import create_engine
# create postgresql engine
engine = create_engine('postgresql+psycopg2://your_dbuser:your_dbuser_pwd@localhost:5432/nyc_tlc')
# load parquet file as dataframe
df = pd.read_parquet('yellow_tripdata_2022-01.parquet', engine='pyarrow')
# lowercase the column names to match the table schema
df.columns = map(str.lower, df.columns.to_list())
# write data to postgresql
df.to_sql('yellow_trips', con=engine, index=False, if_exists='append')
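Once the script finishes, a quick sanity check is to read the row count back with pandas and compare it with df.shape[0]. Sketched against an in-memory SQLite engine so it runs standalone; in practice, reuse the same PostgreSQL engine as the script:

```python
import pandas as pd
from sqlalchemy import create_engine

# In-memory SQLite as a runnable stand-in for the nyc_tlc PostgreSQL database
engine = create_engine('sqlite://')
pd.DataFrame({'vendorid': [1, 2]}).to_sql('yellow_trips', con=engine, index=False)

# count the loaded rows; this should match df.shape[0] from the script
counts = pd.read_sql('SELECT COUNT(*) AS row_count FROM yellow_trips', con=engine)
print(counts['row_count'][0])
```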