
Building Data Ingestion Pipeline with Kestra

This week, I built data ingestion pipelines for the NYC taxi dataset, the same one as last week. Instead of running separate tasks by hand, I used Kestra to organize and manage my workflow. I will write a blog on the basics of Kestra, but let me give a brief introduction here. Kestra is a workflow orchestrator that uses flow code (YAML) or a no-code editor to automate work, e.g., building a data pipeline. Here is how my pipelines are built.
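For readers new to Kestra, here is a minimal sketch of what flow code looks like; the flow id, namespace, and task are made up purely for illustration and are not part of the zoomcamp pipeline:

id: hello_kestra            # illustrative flow id
namespace: demo             # flows are grouped into namespaces
description: A minimal flow with a single task

tasks:
  - id: say_hello
    type: io.kestra.plugin.core.log.Log   # core task that writes a log message
    message: Hello from Kestra!

A flow is just YAML: an id, a namespace, and a list of tasks that Kestra runs in order, plus optional inputs, variables, and triggers, which the pipelines below rely on.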

Ingestion to PostgreSQL

In this flow, we are going to take the NYC taxi data, transform it, and load it into our PostgreSQL database. For inputs, we need to specify which year, month, and taxi type (green or yellow) we want from the dataset.
The flow is illustrated below:

The flow code can be found here.
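For illustration, the inputs section of the flow might look roughly like this; the input ids and value lists are my assumptions, and the flow code linked above is the authoritative version:

inputs:
  - id: taxi
    type: SELECT
    displayName: Select taxi type
    values: [green, yellow]
    defaults: green

  - id: year
    type: SELECT
    displayName: Select year
    values: ["2019", "2020"]
    defaults: "2019"

  - id: month
    type: SELECT
    displayName: Select month
    values: ["01", "02", "03", "04", "05", "06", "07", "08", "09", "10", "11", "12"]
    defaults: "01"

Later tasks then reference these values with expressions such as {{ inputs.taxi }} and {{ inputs.year }}.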

Here is a description of each task; a trimmed sketch of the corresponding flow code follows the list.
  1. Set label
    This sets the specific file name and the target database table names based on these inputs (e.g., yellow_tripdata_2019-01.csv).

  2. Data extraction
    Extract the taxi data from https://github.com/DataTalksClub/nyc-tlc-data/releases. The data is stored in Kestra's internal storage, which serves as a landing zone for our data. With a large dataset, an unstable connection could otherwise leave the process incomplete. If we would like to keep the CSV file, we can remove the last task from the flow.

  3. Transformation and storage
    This stage has multiple tasks, starting with a check of whether the flow is handling the green or the yellow taxi dataset, as their attributes, or columns, differ [1]. Then data transformation and storage are performed with similar logic for both:
    • Create a storage table with a predefined schema
    • Create a staging table
    • Truncate the staging table
    • Copy data into the staging table
    • Add a unique id and the file name to the staging table and perform data transformation
    • Merge the data from the staging table into the storage table
Here, the staging table is a temporary table that stores and transforms data before it is loaded into the storage table, similar to a checkpoint before we enter a gate at the airport. Since raw data schemas usually do not match the storage table, we need to transform the data before loading it. This prevents the storage table from being locked under production conditions (someone might be querying it), and using MERGE prevents storing duplicate data.
  4. Purge the CSV file from step 2 to save Kestra's internal storage space. We can remove this task if we want to keep the CSV file.
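To make the tasks above concrete, here is a trimmed, hedged sketch of how they might be written in Kestra, using green taxi columns. The task ids, table and column names, JDBC URL, and secret names are placeholders I chose for illustration, only a few columns appear in the SQL, and the table-creation and truncation tasks are omitted; the flow code linked above is the authoritative version. Note that MERGE requires PostgreSQL 15 or newer.

variables:
  file: "{{ inputs.taxi }}_tripdata_{{ inputs.year }}-{{ inputs.month }}.csv"
  staging_table: "public.{{ inputs.taxi }}_tripdata_staging"
  table: "public.{{ inputs.taxi }}_tripdata"

pluginDefaults:
  - type: io.kestra.plugin.jdbc.postgresql
    values:
      url: jdbc:postgresql://postgres:5432/nyc_taxi        # placeholder connection details
      username: "{{ secret('POSTGRES_USER') }}"            # assumed secret names
      password: "{{ secret('POSTGRES_PASSWORD') }}"

tasks:
  # 1. Set label: attach the resolved file name to the execution for easier tracking
  - id: set_label
    type: io.kestra.plugin.core.execution.Labels
    labels:
      file: "{{ render(vars.file) }}"

  # 2. Data extraction: download, unzip, and keep the CSV in Kestra's internal storage
  - id: extract
    type: io.kestra.plugin.scripts.shell.Commands
    outputFiles:
      - "*.csv"
    taskRunner:
      type: io.kestra.plugin.core.runner.Process
    commands:
      - wget -qO- https://github.com/DataTalksClub/nyc-tlc-data/releases/download/{{ inputs.taxi }}/{{ render(vars.file) }}.gz | gunzip > {{ render(vars.file) }}

  # 3a. Transformation and storage: copy the CSV into the (already created and truncated) staging table
  - id: copy_to_staging
    type: io.kestra.plugin.jdbc.postgresql.CopyIn
    format: CSV
    header: true
    table: "{{ render(vars.staging_table) }}"
    from: "{{ outputs.extract.outputFiles[render(vars.file)] }}"

  # 3b. Add a unique row id and the file name, then merge only new rows into the storage table
  - id: merge_to_storage
    type: io.kestra.plugin.jdbc.postgresql.Queries
    sql: |
      UPDATE {{ render(vars.staging_table) }}
      SET unique_row_id = md5(COALESCE(lpep_pickup_datetime::text, '') || COALESCE(lpep_dropoff_datetime::text, '')),
          filename = '{{ render(vars.file) }}';

      MERGE INTO {{ render(vars.table) }} AS t
      USING {{ render(vars.staging_table) }} AS s
        ON t.unique_row_id = s.unique_row_id
      WHEN NOT MATCHED THEN
        INSERT (unique_row_id, filename, lpep_pickup_datetime, lpep_dropoff_datetime)
        VALUES (s.unique_row_id, s.filename, s.lpep_pickup_datetime, s.lpep_dropoff_datetime);

  # 4. Purge: free Kestra's internal storage once the data is in PostgreSQL
  - id: purge_files
    type: io.kestra.plugin.core.storage.PurgeCurrentExecutionFiles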

Adding Trigger and Backfill

To automate the pipeline, we can add a trigger so that it runs on a schedule. In Kestra, triggers can be created using cron expressions [2, 3]. The triggers in this flow run monthly: at 9:00 am for the green dataset and at 10:00 am for the yellow dataset, on the first day of each month.
triggers:
  - id: green_schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 9 1 * *"   # 09:00 on the 1st of every month
    inputs:
      taxi: green

  - id: yellow_schedule
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "0 10 1 * *"  # 10:00 on the 1st of every month
    inputs:
      taxi: yellow
Adding a trigger also automatically enables us to perform backfills from Kestra's UI. Backfills are replays of missed schedule intervals between a defined start and end date, so we can retrieve data from dates even before the pipeline was created.

Ingestion to Google Cloud

The yellow taxi dataset, and many more to come, are very large and cannot be stored in GitHub Codespaces or on my laptop. Thus, we move the data into cloud storage. This zoomcamp uses Google Cloud Platform. We need to modify the flow code for cloud storage as follows:
  • Extraction
    Instead of storing data in Kestra's internal storage, the CSV is stored in a Google Cloud Storage bucket.

  • Storage Table
    The storage table lives in BigQuery.

  • Add environment variables and secrets
    The service account credentials are added as a secret, and storage details (project ID, location, and bucket) are added as key-value pairs so they can be reused throughout the flow; a combined sketch of these changes is shown below. Here is my blog on Using Secrets as Environment Variables in GitHub Codespace.
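As an illustration, the GCP-specific pieces might look roughly like the sketch below. The plugin types come from Kestra's GCP plugin, while the key-value names, secret name, dataset, and bucket are placeholders I picked for this example; the flow code linked below is the authoritative version.

pluginDefaults:
  - type: io.kestra.plugin.gcp
    values:
      projectId: "{{ kv('GCP_PROJECT_ID') }}"                # stored once as key-value pairs
      location: "{{ kv('GCP_LOCATION') }}"
      serviceAccount: "{{ secret('GCP_SERVICE_ACCOUNT') }}"  # service account JSON kept as a secret

tasks:
  # Extraction: upload the CSV from Kestra's internal storage to a GCS bucket
  - id: upload_to_gcs
    type: io.kestra.plugin.gcp.gcs.Upload
    from: "{{ outputs.extract.outputFiles[render(vars.file)] }}"
    to: "gs://{{ kv('GCP_BUCKET_NAME') }}/{{ render(vars.file) }}"

  # Storage table: merge the staged data into a BigQuery table instead of PostgreSQL
  - id: bq_merge_to_storage
    type: io.kestra.plugin.gcp.bigquery.Query
    sql: |
      MERGE INTO `{{ kv('GCP_PROJECT_ID') }}.zoomcamp.{{ inputs.taxi }}_tripdata` AS t
      USING `{{ kv('GCP_PROJECT_ID') }}.zoomcamp.{{ inputs.taxi }}_tripdata_staging` AS s
        ON t.unique_row_id = s.unique_row_id
      WHEN NOT MATCHED THEN
        INSERT ROW;

With pluginDefaults, the project, location, and credentials are declared once and reused by every GCP task in the flow, which is what the key-value approach above is for.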
The flow code for the data pipeline can be found here, and the version with a trigger here.



References

  1. TLC Trip Record Data https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page
  2. Triggers in Kestra – Workflow Component Reference https://kestra.io/docs/workflow-components/triggers
  3. Schedule Trigger in Kestra – Cron-Based Scheduling https://kestra.io/docs/workflow-components/triggers/schedule-trigger
