This week, I built data ingestion pipelines for the NYC taxi dataset, the same dataset as last week. Instead of running separate tasks, I used Kestra to organize and manage my workflow. I will write a blog post on the basics of Kestra, but let me give a brief introduction here. Kestra is a workflow orchestrator that uses flow code (or a no-code UI) to automate work such as building a data pipeline. Here is how my pipelines are built.
Ingestion to PostgreSQL
In this flow, we take the NYC taxi data, transform it, and load it into PostgreSQL. As inputs, we specify which year, month, and taxi type (green or yellow) from the dataset we want.
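In Kestra YAML, the inputs block looks roughly like this; the exact ids, value lists, and defaults here are illustrative guesses rather than the flow's literal code:

inputs:
  - id: taxi
    type: SELECT
    displayName: Select the taxi type
    values: [yellow, green]
    defaults: green
  - id: year
    type: SELECT
    displayName: Select the year
    values: ["2019", "2020"]
    defaults: "2019"
  - id: month
    type: SELECT
    displayName: Select the month
    values: ["01", "02", "03", "04", "05", "06", "07", "08", "09", "10", "11", "12"]
    defaults: "01"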
The flow is illustrated below:
The flow code can be found here.
Here is a description of each task.
- Set label
This sets up the specific file name and the target database table names based on these inputs (e.g., yellow_tripdata_2019-01.csv).
- Data extraction
Extract the taxi data from https://github.com/DataTalksClub/nyc-tlc-data/releases. The data is stored in Kestra's internal storage space, which serves as a landing zone: with a large dataset, an unstable connection could otherwise leave the process incomplete. If we would like to keep the csv file, we can remove the last task from the flow. (A sketch of this step appears after this list.)
- Transformation and storage
This stage has multiple tasks, starting with a check of whether the flow is dealing with the green or the yellow taxi dataset, as their attributes, or columns, differ [1]. Then data transformation and storage are performed with similar logic for both:
- Create a storage table with a predefined schema
- Create a staging table
- Truncate the staging table
- Copy data into the staging table
- Add unique id and file name to the staging table and perform data transformation
- Merge the data from the staging table to the storage table
Here, the staging table is a temporary table that stores and transforms the data before it is loaded into our storage table, similar to a checkpoint before we enter a gate at the airport. Since raw data schemas usually do not match the storage table, we need to transform the data before loading it. Doing this work in a staging table prevents locking the storage table under production conditions (someone might be querying it), and merging on a unique id prevents storing duplicate data. (A sketch of this staging logic appears after this list.)
- Purge the csv file from step 2 to save Kestra's storage space. We can remove this task if we want to keep the csv file.
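To make this concrete, here is a minimal sketch of the set-label, extraction, and purge tasks in Kestra YAML. The URL pattern follows the DataTalksClub release layout; the task ids, the file variable, and the Process task runner are my own illustrative choices, not necessarily the exact flow code:

variables:
  file: "{{inputs.taxi}}_tripdata_{{inputs.year}}-{{inputs.month}}.csv"

tasks:
  - id: set_label
    type: io.kestra.plugin.core.execution.Labels
    labels:
      file: "{{render(vars.file)}}"
      taxi: "{{inputs.taxi}}"

  - id: extract
    type: io.kestra.plugin.scripts.shell.Commands
    outputFiles:
      - "*.csv"
    taskRunner:
      type: io.kestra.plugin.core.runner.Process
    commands:
      # download the gzipped csv from the release and decompress it;
      # the resulting csv lands in Kestra's internal storage
      - wget -qO- https://github.com/DataTalksClub/nyc-tlc-data/releases/download/{{inputs.taxi}}/{{render(vars.file)}}.gz | gunzip > {{render(vars.file)}}

  # the last task: clear the csv from internal storage once the run is done
  - id: purge_files
    type: io.kestra.plugin.core.storage.PurgeCurrentExecutionFiles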
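The staging logic on the PostgreSQL side can be sketched as below. Connection details (url, username, password) are omitted for brevity, the column list is a short stand-in for the full green-taxi schema, and MERGE requires PostgreSQL 15 or later:

tasks:
  - id: truncate_staging
    type: io.kestra.plugin.jdbc.postgresql.Queries
    sql: TRUNCATE TABLE public.green_tripdata_staging;

  - id: copy_to_staging
    type: io.kestra.plugin.jdbc.postgresql.CopyIn
    table: public.green_tripdata_staging
    from: "{{outputs.extract.outputFiles[render(vars.file)]}}"  # csv from the extract task
    format: CSV
    header: true

  - id: add_unique_id_and_filename
    type: io.kestra.plugin.jdbc.postgresql.Queries
    sql: |
      UPDATE public.green_tripdata_staging
      SET unique_row_id = md5(
            coalesce(cast(lpep_pickup_datetime as text), '') ||
            coalesce(cast(lpep_dropoff_datetime as text), '') ||
            coalesce(cast(total_amount as text), '')
          )::uuid,
          filename = '{{render(vars.file)}}';

  - id: merge_data
    type: io.kestra.plugin.jdbc.postgresql.Queries
    sql: |
      -- insert only rows whose unique_row_id is not yet in the storage table
      MERGE INTO public.green_tripdata AS t
      USING public.green_tripdata_staging AS s
      ON t.unique_row_id = s.unique_row_id
      WHEN NOT MATCHED THEN
        INSERT (unique_row_id, filename, lpep_pickup_datetime, lpep_dropoff_datetime, total_amount)
        VALUES (s.unique_row_id, s.filename, s.lpep_pickup_datetime, s.lpep_dropoff_datetime, s.total_amount);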
Adding Trigger and Backfill
To automate the pipeline, we can add triggers that run it on a schedule. In Kestra, a trigger can be created using a cron expression [2, 3]. The triggers in this flow run at 9:00 AM (green) and 10:00 AM (yellow) on the first day of every month.
triggers:
  - id: green_schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 9 1 * *"
    inputs:
      taxi: green
  - id: yellow_schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 10 1 * *"
    inputs:
      taxi: yellow
Adding a trigger also automatically enables us to perform backfills from Kestra's UI. Backfills are replays of missed schedule intervals between a defined start and end date, so we can retrieve data even from before the pipeline was created.
Ingestion to Google Cloud
The yellow taxi dataset, and many more to come, are very large and cannot be stored in GitHub Codespaces or on my laptop. Thus, we move the data into cloud storage. This zoomcamp uses Google Cloud Platform. We need to modify the flow code for cloud storage as follows:
- Extraction
Instead of storing data in Kestra's internal storage, the csv file is stored in a Google Cloud Storage bucket.
- Storage Table
The storage table is in BigQuery.
- Add environment variables and secrets
The credentials of the service account are added as a secret, and the storage details (project ID, location, and bucket name) are added as key-value pairs so they can be reused throughout the flow. Here is my blog post on Using Secrets as Environment Variables in GitHub Codespace.
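Roughly, the cloud-specific pieces of the flow look like this. The secret and key-value names (GCP_CREDS, GCP_PROJECT_ID, and so on) are placeholders for whatever was actually stored, and the short column list is again illustrative; the pluginDefaults block saves us from repeating the credentials on every GCP task:

pluginDefaults:
  - type: io.kestra.plugin.gcp
    values:
      serviceAccount: "{{secret('GCP_CREDS')}}"
      projectId: "{{kv('GCP_PROJECT_ID')}}"
      location: "{{kv('GCP_LOCATION')}}"

tasks:
  - id: upload_to_gcs
    type: io.kestra.plugin.gcp.gcs.Upload
    from: "{{outputs.extract.outputFiles[render(vars.file)]}}"  # csv from the extract task
    to: "gs://{{kv('GCP_BUCKET_NAME')}}/{{render(vars.file)}}"

  - id: bq_storage_table
    type: io.kestra.plugin.gcp.bigquery.Query
    sql: |
      CREATE TABLE IF NOT EXISTS `{{kv('GCP_PROJECT_ID')}}.{{kv('GCP_DATASET')}}.green_tripdata`
      (
        unique_row_id STRING,
        filename STRING,
        total_amount NUMERIC
        -- plus the rest of the trip columns
      );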
The flow code for the data pipeline can be found here, and the version with a trigger here.
References
- TLC Trip Record Data – https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page
- Triggers in Kestra – Workflow Component Reference https://kestra.io/docs/workflow-components/triggers
- Schedule Trigger in Kestra – Cron-Based Scheduling https://kestra.io/docs/workflow-components/triggers/schedule-trigger