Giorgos Myrianthous, Author at Towards Data Science
https://towardsdatascience.com/author/gmyrianthous/

Airflow Data Intervals: A Deep Dive
https://towardsdatascience.com/airflow-data-intervals-a-deep-dive-15d0ccfb0661/
Tue, 07 Jan 2025
Building idempotent and re-playable data pipelines

Photo by Gareth David on Unsplash

Apache Airflow is a powerful orchestration tool for scheduling and monitoring workflows, but its behaviour can sometimes feel counterintuitive, especially when it comes to data intervals.

Understanding these intervals is crucial for building reliable data pipelines, ensuring idempotency, and enabling replayability. By leveraging data intervals effectively, you can guarantee that your workflows produce consistent and accurate results, even under retries or backfills.

In this article, we’ll explore Airflow’s data intervals in detail, discuss the reasoning behind their design, why they were introduced, and how they can simplify and enhance day-to-day Data Engineering work.


What Are Data Intervals in Airflow?

Data intervals sit at the heart of how Apache Airflow schedules and executes workflows. Simply put, a data interval represents the specific time range that a DAG run is responsible for processing.

For instance, in a daily-scheduled DAG, each data interval starts at midnight (00:00) and ends at midnight the following day (24:00). The DAG executes only after the data interval has ended, ensuring that the data for that interval is complete and ready for processing.
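
To make this concrete, here is a purely illustrative sketch in plain Python (not the Airflow API; Airflow computes intervals via its timetables) showing how a daily interval maps to its earliest run time:

from datetime import datetime, timedelta

# Illustrative only: how a daily data interval relates to its run time
logical_date = datetime(2025, 1, 1)                    # marks the start of the interval
data_interval_start = logical_date                     # 2025-01-01 00:00
data_interval_end = logical_date + timedelta(days=1)   # 2025-01-02 00:00
earliest_run_time = data_interval_end                  # the run only starts once the interval has ended

print(data_interval_start, data_interval_end, earliest_run_time)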

The intuition behind data intervals

The introduction of data intervals was driven by the need to standardize and simplify how workflows operate on time-based data. In many data engineering scenarios, tasks need to process data for a specific period, such as hourly logs or daily transaction records. Without a clear concept of data intervals, workflows might operate on incomplete data or overlap with other runs, leading to inconsistencies and potential errors.

Data intervals provide a structured way to:

  • Ensure that DAGs process only the data they are meant to handle
  • Avoid issues caused by data that arrives late or is still being written
  • Enable accurate backfilling and replaying of workflows for historical periods
  • Support idempotency by ensuring that each DAG run processes data for a clearly defined and immutable time range. This prevents duplication or missed data when workflows are retried or replayed

By tying each DAG run to a specific data interval, Airflow ensures that tasks can be safely retried without affecting other runs, and workflows can be replayed with confidence, knowing that they will produce consistent results every time.

How do data intervals work?

When a DAG is scheduled, Airflow assigns each run a logical date, which corresponds to the start of its data interval. This logical date is not the actual execution time but rather a marker for the data interval. Each data interval is defined by:

  • Data Interval Start: The beginning of the time range the DAG run is responsible for processing.
  • Data Interval End: The end of the time range for the same DAG run.
  • Logical Date: A timestamp representing the start of the data interval, used as a reference point for the DAG run.

For example:

If a DAG is scheduled to run daily with a start_date of January 1, 2025, the first data interval will have:

  • Data Interval Start: January 1, 2025, 00:00
  • Data Interval End: January 2, 2025, 00:00
  • Logical Date: January 1, 2025
  • The DAG run for this interval will execute after the interval ends, at midnight on January 2, 2025.

This approach ensures that the DAG operates on a complete set of data for the interval it represents.


The role of start_date

The start_date in a DAG defines the logical start of the first data interval. It is a critical parameter that determines when a DAG begins tracking its data intervals and influences backfilling capabilities.

When you set a start_date, it serves as a reference point for Airflow to calculate data intervals. Once again, it’s worth highlighting that a DAG does not execute immediately upon deployment; it waits for the first data interval to end before initiating a run. This ensures that the data for the interval is complete and ready for processing.

Why start_date Matters

The start_date also plays a significant role in how Airflow determines the boundaries of data intervals. By defining a fixed starting point, Airflow ensures that all intervals are consistently calculated, enabling workflows to process data in a structured and reliable manner.


  1. Logical Start of Intervals: The start_date establishes the beginning of the first data interval, which serves as the foundation for all subsequent intervals. For example, if your start_date is January 1, 2025, and the schedule is @daily, the first interval will span January 1, 2025, 00:00 to January 2, 2025, 00:00.
  2. Alignment with Schedules: Ensuring that the start_date aligns with the DAG’s schedule is crucial for predictable execution. A mismatch between the two can lead to unintended gaps or overlaps in processing.
  3. Backfilling Support: A thoughtfully chosen start_date enables you to backfill historical data by triggering DAG runs for past intervals. This capability is vital for workflows that need to process older datasets.
  4. Reproducibility: By anchoring DAG runs to a consistent start_date, you ensure reproducible workflows that operate predictably across different environments and timeframes.

In the example DAG presented below,

  • The DAG is scheduled to run daily starting January 1, 2025
  • The catchup=True parameter ensures that if the DAG is deployed after the start_date, it will backfill all missed intervals
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator

with DAG(
    dag_id='example_dag',
    schedule_interval='@daily',
    start_date=datetime(2025, 1, 1),
    catchup=True,  # Enables backfilling
) as dag:
    start = DummyOperator(task_id='start')
    end = DummyOperator(task_id='end')

    start >> end

If the DAG is deployed at 10:00 AM on January 1, 2025, the first run will not execute immediately. Instead, it will run at midnight on January 2, 2025, processing data for January 1, 2025.

If the DAG is deployed on January 3, 2025, it will backfill runs for January 1 and January 2, 2025, before executing the current interval.

Common Pitfalls with start_date

While the start_date parameter is straightforward, its improper configuration can lead to unexpected behavior or workflow failures. Understanding these pitfalls is crucial to avoid common mistakes and ensure smooth execution.

Choosing an Arbitrary start_date: Picking a random start_date without considering the data availability or schedule can lead to failed runs or incomplete processing.

Using Dynamic Values: Avoid setting the start_date to a dynamic value like datetime.now(). Since the start_date is meant to be a fixed reference point, using a dynamic value can result in unpredictable intervals and make backfilling impossible.

Ignoring Time Zones: Always ensure that the start_date accounts for the time zone of your data source to avoid misaligned intervals (see the sketch below).

Misconfigured Catchup Settings: If catchup is disabled, missed intervals will not be processed, potentially leading to data gaps.
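
To avoid the time-zone pitfall mentioned above, one common pattern is to declare a timezone-aware start_date, for instance with the pendulum library that ships with Airflow. A minimal sketch (the DAG name and timezone are placeholders):

import pendulum

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id='tz_aware_example',           # hypothetical DAG name
    schedule_interval='@daily',
    # Interval boundaries now align with local midnight rather than UTC
    start_date=pendulum.datetime(2025, 1, 1, tz='Europe/London'),
    catchup=True,
) as dag:
    EmptyOperator(task_id='noop')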


Airflow Templates Reference

Airflow provides several templated variables to simplify interactions with data intervals. These variables are crucial for creating dynamic, maintainable, and idempotent workflows.

By using these templates, you can ensure that your DAGs adapt to changing data intervals without requiring hardcoded values, making them more robust and easier to debug.

Airflow uses Jinja templating under the hood to enable this dynamic behaviour, allowing you to embed these variables directly in your task parameters or scripts.

Data Interval Templated Variables

Here’s a list of some commonly used templated references that you can embed in your DAGs:

  • {{ data_interval_start }}: Represents the start of the current data interval. Use this to query or process data that falls within the beginning of the interval.
  • {{ data_interval_end }}: Represents the end of the current data interval. This is helpful for defining the boundary for data processing tasks.
  • {{ logical_date }}: The logical date for the DAG run, which aligns with the start of the data interval. This is often used for logging or metadata purposes.
  • {{ prev_data_interval_start_success }}: The start of the data interval for the previous successful DAG run. Use this for tasks that depend on the output of earlier runs.
  • {{ prev_data_interval_end_success }}: The end of the data interval for the previous successful DAG run. This ensures continuity in workflows that process sequential data.

You can find a more comprehensive list of all available templated references in the relevant section of Airflow documentation.
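
As a brief illustration of how these references are embedded (a minimal sketch; the DAG and task names are hypothetical), the variables can be dropped straight into task parameters, for example a BashOperator command:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id='templated_example',          # hypothetical DAG name
    schedule_interval='@daily',
    start_date=datetime(2025, 1, 1),
    catchup=False,
) as dag:
    # Jinja renders the interval boundaries at runtime, so nothing is hardcoded
    print_interval = BashOperator(
        task_id='print_interval',
        bash_command=(
            'echo "Processing data from {{ data_interval_start }} '
            'to {{ data_interval_end }}"'
        ),
    )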

The importance of templated references when authoring DAGs

Template variables are an integral part of writing flexible and efficient DAGs in Airflow. They provide a way to dynamically reference key properties of data intervals, ensuring that your workflows remain adaptable to changes and are easy to maintain.

Here are some of the main reasons why these variables are so important:

  1. Dynamic Adaptability: Template variables allow your DAGs to automatically adjust to the current data interval, eliminating the need for hardcoding specific dates or time ranges.
  2. Idempotency: By tying task parameters to specific data intervals, you ensure that reruns or retries produce the same results, regardless of when they are executed.
  3. Ease of Maintenance: Using templated variables reduces the risk of errors and simplifies updates. For example, if your data processing logic changes, you can adjust the templates without rewriting the DAG.
  4. Facilitate Backfilling: These variables make it easy to backfill historical data, as each DAG run is automatically tied to the appropriate data interval.
  5. Leverage Jinja Templating: Jinja templating enables you to embed these variables dynamically in your task commands, SQL queries, or scripts. This ensures your workflows remain flexible and context-aware.

Visualising Data Intervals

To better understand data intervals, consider the following visualisation

A visualisation of how data intervals work in Airflow – Source: Author

Every DAG has a single start date, and every DAG run executes only after the corresponding data interval has ended. The table below illustrates the values of each of the three data interval variables of interest, for every daily run.

| Logical Date | Data Interval Start | Data Interval End |
| ------------ | ------------------- | ----------------- |
| 2025-01-01   | 2025-01-01 00:00    | 2025-01-02 00:00  |
| 2025-01-02   | 2025-01-02 00:00    | 2025-01-03 00:00  |
| 2025-01-03   | 2025-01-03 00:00    | 2025-01-04 00:00  |
| 2025-01-04   | 2025-01-04 00:00    | 2025-01-05 00:00  |
...

A working example

In the example DAG below, we attempt to fetch posts from a public API (jsonplaceholder) that accepts a date as a parameter. In our scenario, we want to set the date parameter to the beginning of the data interval. In other words, if the DAG runs today, we want to call the API using yesterday’s date.

from datetime import datetime
from datetime import timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

def get_posts_for_data_interval_start_date(**kwargs):
    """
    Fetch Posts from jsonplaceholder
    """
    import requests

    data_interval_start_dt = kwargs['data_interval_start']

    # Format the date as required by the API
    formatted_date = data_interval_start_dt.strftime('%Y-%m-%d')    
    print(f"Calling API for date: {formatted_date}")

    # Call the API
    response = requests.get(
        'https://jsonplaceholder.typicode.com/posts', 
        params={'date': formatted_date}
    )

    if response.status_code == 200:
        print(f'API call successful for {formatted_date}')
    else:
        print(f'API call failed for {formatted_date} with status code {response.status_code}')

dag = DAG(
    'test_dag',
    default_args={
        'owner': 'airflow',
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    schedule_interval='@daily',
    start_date=datetime(2025, 1, 1),
    catchup=True,  # Enable backfilling, to run missed intervals
)

api_task = PythonOperator(
    task_id='call_api_for_date',
    python_callable=get_posts_for_data_interval_start_date,
    dag=dag,
)

The DAG is scheduled to run daily, starting from January 1st, 2025. It leverages the data_interval_start templated variable to dynamically pass the date for each run. Specifically, the data_interval_start corresponds to the start of the execution window for each DAG run, allowing the API to receive the correct date parameter.

The start_date parameter determines when the DAG execution begins, while the catchup=True setting ensures that missed intervals are backfilled. This means that if the DAG is delayed or the system is down, Airflow will automatically execute tasks for the missed dates, ensuring no data is skipped.

The core of the DAG is a Python task that calls the API, passing the data_interval_start as a formatted date. This allows for a flexible, interval-based API query system.


Final Thoughts

Understanding Airflow’s data intervals is essential for building reliable workflows. By learning how start_date, logical dates, and templated variables work together, you can create pipelines that process data accurately and efficiently. Whether it’s backfilling historical data or managing complex workflows, these principles ensure your pipelines run smoothly and consistently.

Mastering data intervals will help you design workflows that are easier to maintain and adapt, making your day-to-day work in data engineering more effective and predictable.

Path Representation in Python
https://towardsdatascience.com/path-representation-python-712d37917f9d/
Tue, 20 Aug 2024
Stop using strings to represent paths and use pathlib instead

Working with filesystems is one of those tasks that seem trivial at first glance, yet can easily catch even experienced developers off guard. I’ll be the first to admit – I’ve made my fair share of mistakes. One of the most common anti-patterns I’ve encountered is representing file paths as strings in Python.

It’s time to rethink this approach.

In today’s article, we’ll explore why using strings (or even the os module) for file paths is a recipe for disaster. We’ll dive into best practices and see how the Pathlib package can help you write cleaner, more maintainable code.


Why Using Strings for Paths is a Bad Idea

If you’ve ever worked on a project that needs to run on different operating systems, you know the pain of dealing with paths. Different systems have different ways of representing paths. Unix-based systems (like Linux and macOS) use a forward slash /, while Windows uses a backslash \. It’s a small detail, but one that can lead to big headaches if you’re not careful.

# Unix (e.g. Linux, OSX, etc.)
/home/this/is/a/path/to/a/directory

# Windows
C:\home\this\is\a\path\to\a\directory

I once spent hours debugging a script that worked flawlessly on my Mac but completely fell apart when a colleague ran it on Windows. The issue? It was, of course, the file paths. I had hardcoded paths as strings, assuming they’d behave the same on any system.

The problem here is code portability. If you’re using strings to represent paths, you’ll need to write conditional logic to handle different operating systems. Not only does this make your code messy, but it also introduces unnecessary complexity.

# This is a bad practice
import platform

if platform.system() == 'Windows':
    filepath = r'C:\home\this\is\a\path\to\a\directory'
else:  # e.g. 'Darwin' for OSX or 'Linux'
    filepath = '/home/this/is/a/path/to/a/directory'

The worst part? It doesn’t stop there. Let’s say you need to concatenate two paths. Using plain string concatenation can lead to invalid paths, especially if one of the strings contains a slash at the end.

Trust me, I’ve been there.

path_1 = '/this/is/a/path/'
path_2 = '/another/path'

# filepath = '/this/is/a/path//another/path'
filepath = path_1 + path_2

Why the OS Module Isn’t the Solution

At this point, some of you might be thinking, "But what about the os module?". Yes, using os.path.join() can help you avoid some of the pitfalls of string concatenation. However, it’s far from a perfect solution.

import os

path_1 = '/this/is/a/path/'
path_2 = '/another/path'

# filepath = '/another/path'
filepath = os.path.join(path_1, path_2)

While this avoids the issue of double slashes, paths in the os module are still just strings. That means you’re still stuck dealing with all the limitations and complexities of string manipulation. Need to extract the parent directory? Want to check if a file exists? You’ll be writing a lot of extra code to get these basic tasks done.
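
For reference, here is a rough sketch (with a hypothetical path) of what some of those basic tasks look like with the os module; every result is still a plain string, so further manipulation drops you straight back into string handling:

import os

path = '/this/is/a/path/to/file.txt'  # hypothetical path

parent = os.path.dirname(path)         # '/this/is/a/path/to' (still just a string)
exists = os.path.exists(path)          # checks the filesystem
name, extension = os.path.splitext(os.path.basename(path))  # ('file', '.txt')

print(parent, exists, name, extension)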

In my early Python days, I relied heavily on the os module, thinking it was the go-to solution for path manipulation. But as my projects grew in complexity, so did my path-related bugs.


Pathlib to the rescue!

That’s where pathlib comes in. It’s part of Python’s standard library and is designed to make filesystem path manipulation a breeze. The beauty of pathlib lies in its ability to represent paths as objects—pure paths for computational operations and concrete paths for I/O operations.

Pathlib represents paths as objects – Source: Python documentation (licensed under PSF licence)

As an example, let’s consider the following chunk of code that simply constructs a Path using Pathlib library:

import pathlib

path = pathlib.Path('folder') / 'subfolder' / 'another_subfolder'

print(path)
print(type(path))

The output of this code will vary depending on the operating system:

  • On Unix systems (Linux or macOS):
# The output when running it on Unix systems (Linux or OSX, for example)

folder/subfolder/another_subfolder
<class 'pathlib.PosixPath'>
  • On Windows
# The output when running it on Windows systems

folder\subfolder\another_subfolder
<class 'pathlib.WindowsPath'>

This automatic handling of paths based on the OS you’re running is a game-changer. And the benefits don’t stop there.


What Makes Pathlib So Powerful?

When it comes to handling file paths in Python, pathlib offers a range of features that go beyond just fixing the shortcomings of strings and the os module. Here are some key reasons why pathlib stands out as the best tool for the job.

  1. Dealing with the current working directory (cwd): No more worrying about whether your script will run on Windows or Unix. Pathlib takes care of it for you.
from pathlib import Path

print(Path.cwd())
# On Unix: PosixPath(/Users/gmyrianthous/test)
# On Windows: WindowsPath(C:\Users\gmyrianthous\test)

2. Programmatically creating a new directory

from pathlib import Path

Path('new_directory').mkdir()

If the newly created directory already exists, the above code snippet will fail. If you want to ignore this failure, you can specify the corresponding argument when calling the mkdir() method:

Path('new_directory').mkdir(exist_ok=True)
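
Similarly, if intermediate directories might not exist yet, mkdir() also accepts a parents flag, so the whole tree can be created in one call. A small sketch:

from pathlib import Path

# Create nested directories in one go, ignoring the error if they already exist
Path('new_directory/nested/deeper').mkdir(parents=True, exist_ok=True)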

3. Checking if a file exists: To check whether a particular file exists on the file system, first construct a Path and then call the exists() method on the path object:

from pathlib import Path

file = Path('my_directory') / 'data.txt'
print(file.exists())

4. Listing the contents of a directory: To list the contents of a directory, you can call iterdir(), which returns an iterator:

from pathlib import Path

filepath = Path('folder') / 'subfolder'
content = filepath.iterdir()

If the directory is expected to contain a huge volume of paths and files, then I would advise you to iterate over the items in the iterator one by one, in order to avoid loading everything into memory.
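
For instance, a lazy loop over the iterator could look like the sketch below (folder names are hypothetical); each entry is yielded one at a time rather than materialised into a list:

from pathlib import Path

filepath = Path('folder') / 'subfolder'

# Process entries one at a time; nothing is loaded into memory up front
for entry in filepath.iterdir():
    print(entry.name, entry.is_dir())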

Alternatively, if you are expecting a relatively small volume of contents, you can directly convert the iterator into a list:

from pathlib import Path

filepath = Path('folder') / 'subfolder'
content = list(filepath.iterdir())

Final Thoughts

It’s easy to overlook best practices when dealing with seemingly simple tasks like file path manipulation. I know I did. But as your projects grow, these "little" things can become major pain points. Investing the time to learn and use tools like pathlib will pay off in the long run, saving you from headaches and potential bugs.

So, if you’re still using strings to represent paths in Python, it’s time to make the switch. Your future self (and your teammates) will thank you.

Deploying dbt Projects at Scale on Google Cloud
https://towardsdatascience.com/dbt-deployment-gcp-a350074e3377/
Mon, 29 Jul 2024
Containerising and running dbt projects with Artifact Registry, Cloud Composer, GitHub Actions and dbt-airflow

Photo by Photo Boards on Unsplash

Managing data models at scale is a common challenge for data teams using dbt (data build tool). Initially, teams often start with simple models that are easy to manage and deploy. However, as the volume of data grows and business needs evolve, the complexity of these models increases.

This progression often leads to a monolithic repository where all dependencies are intertwined, making it difficult for different teams to collaborate efficiently. To address this, data teams may find it beneficial to distribute their data models across multiple dbt projects. This approach not only promotes better organisation and modularity but also enhances the scalability and maintainability of the entire data infrastructure.

One significant complexity introduced by handling multiple dbt projects is the way they are executed and deployed. Managing library dependencies becomes a critical concern, especially when different projects require different versions of dbt. While dbt Cloud offers a robust solution for scheduling and executing multi-repo dbt projects, it comes with significant investments that not every organisation can afford or find reasonable. A common alternative is to run dbt projects using Cloud Composer, Google Cloud’s managed Apache Airflow service.

Cloud Composer provides a managed environment with a substantial set of pre-defined dependencies. However, based on my experience, this setup poses a significant challenge. Installing any Python library without encountering unresolved dependencies is often difficult. When working with dbt-core, I found that installing a specific version of dbt within the Cloud Composer environment was nearly impossible due to conflicting version dependencies. This experience highlighted the difficulty of running any dbt version on Cloud Composer directly.

Containerisation offers an effective solution. Instead of installing libraries within the Cloud Composer environment, you can containerise your dbt projects using Docker images and run them on Kubernetes via Cloud Composer. This approach keeps your Cloud Composer environment clean while allowing you to include any required libraries within the Docker image. It also provides the flexibility to run different dbt projects on various dbt versions, addressing dependency conflicts and ensuring seamless execution and deployment.


With the complexities of managing multiple dbt projects addressed, we now move on to the technical implementation of deploying these projects at scale on Google Cloud. The diagram below outlines the process of containerising dbt projects, storing the Docker images in Artifact Registry, and automating the deployment with GitHub Actions. Additionally, it illustrates how these projects are executed on Cloud Composer using the open-source Python package, dbt-airflow, which renders dbt projects as Airflow DAGs. The following section will guide you through each of these steps, providing a comprehensive approach to effectively scaling your dbt workflows.

Overview of dbt project deployment procedure on Google Cloud – Source: Author

Deploying containerised dbt projects on Artifact Registry with GitHub Actions

In this section, we will define a CI/CD pipeline using GitHub Actions to automate the deployment of a dbt project as a Docker image to Google Artifact Registry. This pipeline will streamline the process, ensuring that your dbt projects are containerised and consistently deployed to a Docker repository from which Cloud Composer can then pick them up.

First, let’s start with a high-level overview of how the dbt project is structured within the repository. This will help you follow along with the definition of the CI/CD pipeline since we will be working in certain sub-directories to get things done. Note that Python dependencies are managed via Poetry, hence the presence of pyproject.toml and poetry.lock files. The rest of the structure shared below should be straightforward to understand if you have worked with dbt in the past.

.
├── README.md
├── dbt_project.yml
├── macros
├── models
├── packages.yml
├── poetry.lock
├── profiles
├── pyproject.toml
├── seeds
├── snapshots
└── tests

With the project structure in place, we can now move on to defining the CI/CD pipeline. To ensure everyone can follow along, we’ll go through each step in the GitHub Action workflow and explain the purpose of each one. This detailed breakdown will help you understand how to implement and customise the pipeline for your own projects. Let’s get started!


Step 1: Creating triggers for the GitHub Action workflow

The upper section of our GitHub Action workflow defines the triggers that will activate the pipeline.

name: dbt project deployment
on:
  push:
    branches:
      - main
    paths:
      - 'projects/my_dbt_project/**'
      - '.github/workflows/my_dbt_project_deployment.yml'

Essentially, the pipeline is triggered by push events to the main branch whenever there are changes in the projects/my_dbt_project/** directory or modifications to the GitHub Action workflow file. This setup ensures that the deployment process runs only when relevant changes are made, keeping the workflow efficient and up-to-date.


Step 2: Defining some environment variables

The next section of the GitHub Action workflow sets up environment variables, which will be used throughout the subsequent steps:

env:
  ARTIFACT_REPOSITORY: europe-west2-docker.pkg.dev/my-gcp-project-name/my-af-repo
  COMPOSER_DAG_BUCKET: composer-env-c1234567-bucket
  DOCKER_IMAGE_NAME: my-dbt-project
  GCP_WORKLOAD_IDENTITY_PROVIDER: projects/11111111111/locations/global/workloadIdentityPools/github-actions/providers/github-actions-provider
  GOOGLE_SERVICE_ACCOUNT: my-service-account@my-gcp-project-name.iam.gserviceaccount.com
  PYTHON_VERSION: '3.8.12'

These environment variables store critical information needed for the deployment process, such as the Artifact Registry repository, the Cloud Composer DAG bucket, the Docker image name, service account details and workload identity federation.

💡 At a high level, Google Cloud’s Workload Identity Federation allows external workloads, such as GitHub Actions runners, to authenticate to Google Cloud in a secure and scalable way, without long-lived service account keys.

For more details, refer to the Google Cloud documentation.


Step 3: Checking out the repository

The next step in the GitHub Action workflow is to check out the repository:

- uses: actions/checkout@v4.1.6

This step uses the actions/checkout action to pull the latest code from the repository. This ensures that the workflow has access to the most recent version of the dbt project files and configurations needed for building and deploying the Docker image.


Step 4: Authenticating to Google Cloud and Artifact Registry

The next step in the workflow involves Google Cloud authentication

- name: Authenticate to Google Cloud
  id: google_auth
  uses: google-github-actions/auth@v2.1.3
  with:
    token_format: access_token
    workload_identity_provider: ${{ env.GCP_WORKLOAD_IDENTITY_PROVIDER }}
    service_account: ${{ env.GOOGLE_SERVICE_ACCOUNT }}

- name: Login to Google Artifact Registry
  uses: docker/login-action@v3.2.0
  with:
    registry: europe-west2-docker.pkg.dev
    username: oauth2accesstoken
    password: ${{ steps.google_auth.outputs.access_token }}

First, the workflow authenticates with Google Cloud using the google-github-actions/auth action. This step retrieves an access token by leveraging the provided workload identity provider and service account.

The access token obtained in the previous step is then used to authenticate Docker with the specified registry (europe-west2-docker.pkg.dev). This login enables the workflow to push the Docker image of the dbt project to the Artifact Registry in subsequent steps.


Step 5: Creating a Python environment

The next set of steps involves setting up the Python environment, installing Poetry, and managing dependencies.

- name: Install poetry
  uses: snok/install-poetry@v1.3.4
  with:
    version: 1.7.1
    virtualenvs-in-project: true

- name: Set up Python ${{ env.PYTHON_VERSION }}
  uses: actions/setup-python@v5.1.0
  with:
    python-version: ${{ env.PYTHON_VERSION }}
    cache: 'poetry'

- name: Load cached venv
  id: cached-poetry-dependencies
  uses: actions/cache@v4.0.2
  with:
    path: projects/my_dbt_project/.venv
    key: venv-${{ runner.os }}-${{ env.PYTHON_VERSION }}-${{ hashFiles('projects/my_dbt_project/poetry.lock') }}

- name: Install dependencies
  if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
  working-directory: './projects/my_dbt_project/'
  run: poetry install --no-ansi --no-interaction --sync

We first install Poetry, a dependency management tool that we will use to install Python dependencies. Then we install the specified Python version and finally load the virtual environment from cache. If we have a cache miss (i.e. changes were made to the Poetry lock file since the last time the workflow executed), Python dependencies will be installed from scratch. Alternatively, if we have a cache hit, dependencies will be loaded from cache.


Step 6: Compiling the dbt project

The following step involves cleaning the dbt environment, installing dbt dependencies, and compiling the dbt project.

- name: Clean dbt, install deps and compile
  working-directory: './projects/my_dbt_project/'
  run: |
    echo "Cleaning dbt"
    poetry run dbt clean --profiles-dir profiles --target prod

    echo "Installing dbt deps"
    poetry run dbt deps

    echo "Compiling dbt"
    poetry run dbt compile --profiles-dir profiles --target prod

This step will also generate the manifest.json file, a metadata file for the dbt project. This file is essential for the dbt-airflow package, which will be used by Cloud Composer to automatically render the dbt project as an Airflow DAG.
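
To give a sense of what this manifest contains and what dbt-airflow later consumes, here is a small, optional sketch (plain Python; the path is hypothetical) that lists the model nodes of a compiled project:

import json
from pathlib import Path

# Hypothetical path to the compiled manifest inside the dbt project
manifest_path = Path('projects/my_dbt_project/target/manifest.json')

with manifest_path.open() as f:
    manifest = json.load(f)

# dbt-airflow parses nodes like these to render the project as Airflow tasks
models = [
    node['name']
    for node in manifest['nodes'].values()
    if node['resource_type'] == 'model'
]
print(f'{len(models)} models found, e.g. {models[:5]}')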


Step 7: Building and Pushing Docker Image on Artifact Registry

The next step in the workflow is to build and push the Docker image of the dbt project to the Google Artifact Registry.

- name: Build and Push Docker Image
  run: |
    FULL_ARTIFACT_PATH="${ARTIFACT_REPOSITORY}/${DOCKER_IMAGE_NAME}"
    echo "Building image"
    docker build --build-arg project_name=my_dbt_project --tag ${FULL_ARTIFACT_PATH}:latest --tag ${FULL_ARTIFACT_PATH}:${GITHUB_SHA::7} -f Dockerfile .
    echo "Pushing image to artifact"
    docker push ${FULL_ARTIFACT_PATH} --all-tags

Note how we build the image with two tags: latest and the short commit SHA. This serves two purposes: it makes it easy to identify the most recent image version, and it ties each image back to the commit that produced it, which can be extremely useful when debugging needs to take place for one reason or another.


Step 8: Syncing manifest file with Cloud Composer GCS bucket

The next step involves synchronising the compiled dbt project, specifically the manifest.json file, with the Cloud Composer DAG bucket.

- name: Synchronize compiled dbt
  uses: docker://rclone/rclone:1.62
  with:
    args: >-
      sync -v --gcs-bucket-policy-only
      --include="target/manifest.json"
      projects/my_dbt_project/ :googlecloudstorage:${{ env.COMPOSER_DAG_BUCKET }}/dags/dbt/my_dbt_project

This step uses the rclone Docker image to synchronise the manifest.json file with the Cloud Composer bucket. This is crucial to ensure that Cloud Composer has the latest metadata available, so that the dbt-airflow package can pick it up and render the latest changes made to the dbt project.


Step 9: Sending Slack alerts in case of a failure

The final step is to send a Slack alert if the deployment fails. To replicate this step, all you need to do is issue a webhook token (SLACK_WEBHOOK) as specified in the action’s documentation.

- name: Slack Alert (on failure)
  if: failure()
  uses: rtCamp/action-slack-notify@v2.3.0
  env:
    SLACK_CHANNEL: alerts-slack-channel
    SLACK_COLOR: ${{ job.status }}
    SLACK_TITLE: 'dbt project deployment failed'
    SLACK_MESSAGE: |
      Your message with more details with regards to 
      the deployment failure goes here.
    SLACK_WEBHOOK: ${{ secrets.SLACK_WEBHOOK }}

The full definition of the GitHub Action is shared below. In case you have any trouble running it, please do let me know in the comments and I will do my best to help you out!

name: dbt project deployment
on:
  push:
    branches:
      - main
    paths:
      - 'projects/my_dbt_project/**'
      - '.github/workflows/my_dbt_project_deployment.yml'

env:
  ARTIFACT_REPOSITORY: europe-west2-docker.pkg.dev/my-gcp-project-name/my-af-repo
  COMPOSER_DAG_BUCKET: composer-env-c1234567-bucket
  DOCKER_IMAGE_NAME: my-dbt-project
  GCP_WORKLOAD_IDENTITY_PROVIDER: projects/11111111111/locations/global/workloadIdentityPools/github-actions/providers/github-actions-provider
  GOOGLE_SERVICE_ACCOUNT: my-service-account@my-gcp-project-name.iam.gserviceaccount.com
  PYTHON_VERSION: '3.8.12'

jobs:
  deploy-dbt:
    runs-on: ubuntu-22.04
    permissions:
      contents: 'read'
      id-token: 'write'

    steps:
      - uses: actions/checkout@v4.1.6

      - name: Authenticate to Google Cloud
        id: google_auth
        uses: google-github-actions/auth@v2.1.3
        with:
          token_format: access_token
          workload_identity_provider: ${{ env.GCP_WORKLOAD_IDENTITY_PROVIDER }}
          service_account: ${{ env.GOOGLE_SERVICE_ACCOUNT }}

      - name: Login to Google Artifact Registry
        uses: docker/login-action@v3.2.0
        with:
          registry: europe-west2-docker.pkg.dev
          username: oauth2accesstoken
          password: ${{ steps.google_auth.outputs.access_token }}

      - name: Install poetry
        uses: snok/install-poetry@v1.3.4
        with:
          version: 1.7.1
          virtualenvs-in-project: true

      - name: Set up Python ${{ env.PYTHON_VERSION }}
        uses: actions/setup-python@v5.1.0
        with:
          python-version: ${{ env.PYTHON_VERSION }}
          cache: 'poetry'

      - name: Load cached venv
        id: cached-poetry-dependencies
        uses: actions/cache@v4.0.2
        with:
          path: projects/my_dbt_project/.venv
          key: venv-${{ runner.os }}-${{ env.PYTHON_VERSION }}-${{ hashFiles('projects/my_dbt_project/poetry.lock') }}

      - name: Install dependencies
        if: steps.cached-poetry-dependencies.outputs.cache-hit != 'true'
        working-directory: './projects/my_dbt_project/'
        run: poetry install --no-ansi --no-interaction --sync

      - name: Clean dbt, install deps and compile
        working-directory: './projects/my_dbt_project/'
        run: |
          echo "Cleaning dbt"
          poetry run dbt clean --profiles-dir profiles --target prod

          echo "Installing dbt deps"
          poetry run dbt deps

          echo "Compiling dbt"
          poetry run dbt compile --profiles-dir profiles --target prod

      - name: Build and Push Docker Image
        run: |
          FULL_ARTIFACT_PATH="${ARTIFACT_REPOSITORY}/${DOCKER_IMAGE_NAME}"
          echo "Building image"
          docker build --build-arg project_name=my_dbt_project --tag ${FULL_ARTIFACT_PATH}:latest --tag ${FULL_ARTIFACT_PATH}:${GITHUB_SHA::7} -f Dockerfile .
          echo "Pushing image to artifact"
          docker push ${FULL_ARTIFACT_PATH} --all-tags

      - name: Synchronize compiled dbt
        uses: docker://rclone/rclone:1.62
        with:
          args: >-
            sync -v --gcs-bucket-policy-only
            --include="target/manifest.json"
            projects/my_dbt_project/ :googlecloudstorage:${{ env.COMPOSER_DAG_BUCKET }}/dags/dbt/my_dbt_project

      - name: Slack Alert (on failure)
        if: failure()
        uses: rtCamp/action-slack-notify@v2.3.0
        env:
          SLACK_CHANNEL: alerts-slack-channel
          SLACK_COLOR: ${{ job.status }}
          SLACK_TITLE: 'dbt project deployment failed'
          SLACK_MESSAGE: |
            Your message with more details with regards to 
            the deployment failure goes here.
          SLACK_WEBHOOK: ${{ secrets.SLACK_WEBHOOK }}

Running dbt projects with Cloud Composer and dbt-airflow

In the previous section, we discussed and demonstrated how to deploy dbt project Docker images on Google Artifact Registry using GitHub Actions. With our dbt projects containerised and stored securely, the next crucial step is to ensure that Cloud Composer can seamlessly pick up these Docker images and execute the dbt projects as Airflow DAGs. This is where the dbt-airflow package comes into play.

In this section, we will explore how to configure and use Cloud Composer alongside the dbt-airflow package to automate the running of dbt projects. By integrating these tools, we can leverage the power of Apache Airflow for orchestration while maintaining the flexibility and scalability provided by containerised deployments.


To ensure the dbt-airflow package can render and execute our containerised dbt project on Cloud Composer, we need to provide the following:

  1. Path to the manifest file: The location of the manifest.json file on the Google Cloud Storage (GCS) bucket, which was pushed during the CI/CD process by our GitHub Action
  2. Docker Image details: The relevant details for the Docker image residing on Artifact Registry, enabling dbt-airflow to run it using the KubernetesPodOperator

Here’s the full definition of the Airflow DAG:

import functools
from datetime import datetime 
from datetime import timedelta
from pathlib import Path

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from dbt_airflow.core.config import DbtAirflowConfig
from dbt_airflow.core.config import DbtProfileConfig
from dbt_airflow.core.config import DbtProjectConfig
from dbt_airflow.core.task_group import DbtTaskGroup
from dbt_airflow.operators.execution import ExecutionOperator

with DAG(
    dag_id='test_dag',
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
    default_args={
        'owner': 'airflow',
        'retries': 1,
        'retry_delay': timedelta(minutes=2),
    },
    on_failure_callback=functools.partial(
        our_callback_function_to_send_slack_alerts  # your alerting callback, defined elsewhere
    ),
) as dag:

    t1 = EmptyOperator(task_id='extract')
    t2 = EmptyOperator(task_id='load')

    tg = DbtTaskGroup(
        group_id='transform',
        dbt_airflow_config=DbtAirflowConfig(
            create_sub_task_groups=True,
            execution_operator=ExecutionOperator.KUBERNETES,
            operator_kwargs={
                'name': 'dbt-project-1-dev',
                'namespace': 'composer-user-workloads',
                'image': 'gcp-region-docker.pkg.dev/gcp-project-name/ar-repo/my-dbt-project:latest',
                'kubernetes_conn_id': 'kubernetes_default',
                'config_file': '/home/airflow/composer_kube_config',
                'image_pull_policy': 'Always',
            },
        ),
        dbt_project_config=DbtProjectConfig(
            project_path=Path('/home/my_dbt_project/'),  # path within docker container
            manifest_path=Path('/home/airflow/gcs/dags/dbt/my_dbt_project/target/manifest.json'),  # path on Cloud Composer GCS bucket
        ),
        dbt_profile_config=DbtProfileConfig(
            profiles_path=Path('/home/my_dbt_project/profiles/'),  # path within docker container
            target=dbt_profile_target,  # e.g. 'dev' or 'prod', defined elsewhere
        ),
    )

    t1 >> t2 >> tg

Once the Airflow scheduler picks up the file, your dbt project will be seamlessly transformed into an Airflow DAG. This automated process converts your project into a series of tasks, visually represented in the Airflow UI.

As shown in the screenshot below, the DAG is structured and ready to be executed according to the defined schedule. This visualisation not only provides clarity on the flow and dependencies of your dbt tasks but also allows for easy monitoring and management, ensuring that your data transformations run smoothly and efficiently.

dbt project automatically rendered with dbt-airflow – Source: dbt-airflow official GitHub repository

While this overview gives you a foundational understanding, covering the full capabilities and configuration options of the dbt-airflow package is beyond the scope of this article.

For those eager to explore further and unlock the full potential of dbt-airflow, I highly recommend reading the detailed article linked below. It covers everything you need to know to master the integration of dbt with Airflow, ensuring you can elevate your data transformation workflows.

dbt + Airflow = ❤


Final Thoughts

By containerising your dbt projects, leveraging Artifact Registry, Cloud Composer, GitHub Actions, and the dbt-airflow package, you can streamline and automate your data workflows in a scalable and effective way.

This comprehensive guide aimed to help you build such a deployment process, providing you with the tools and knowledge to efficiently manage and execute your dbt projects at scale. I truly hope it has been insightful and has equipped you with the confidence to implement these strategies in your own projects.

Modeling Slowly Changing Dimensions
https://towardsdatascience.com/slowly-changing-dimensions-6a08dc0386ae/
Fri, 03 May 2024
A deep dive into the various SCD types and how they can be implemented in Data Warehouses

Photo by Pawel Czerwinski on Unsplash

In today’s dynamic and competitive landscape, modern organisations heavily invest in their data assets. This investment ensures that teams across the entire organisational spectrum – ranging from leadership, product, engineering, finance, marketing, to human resources – can make informed decisions.

Consequently, data teams have a pivotal role in enabling organisations to rely on data-driven decision-making processes. However, merely having a robust and scalable data platform isn’t enough; it’s essential to extract valuable insights from the stored data. This is where data modeling comes into play.

At its essence, data modeling defines how data is stored, organised, and accessed to facilitate the extraction of insights and analytics. The primary objective of data modeling is to ensure that data needs and requirements are met to support the business and product effectively.

Data teams strive to help organisations unlock the full potential of their data, but usually face a big challenge: structuring the data so that the relevant teams can perform meaningful analyses. This is why modeling dimensions is one of the most important aspects of designing data warehouses.


Dimensions and Data Modeling

As organisations evolve and adapt to changing needs, the simplicity of early data models often gives way to complexity. Without proper modeling, this complexity can quickly spiral out of control, leading to inefficiencies and challenges in managing and deriving value from the data.

Dimensions are crucial components of data modeling as they offer a structured framework that allows data teams to organise their data. Essentially, dimensions represent the different perspectives from which data can be analysed and understood.

Put simply, dimensions provide a lens through which data can be interpreted, facilitating decision-making processes. Whether analysing sales trends, user engagement patterns, customer segmentation, or product performance, dimensions play a pivotal role in measuring and understanding various aspects of the data.

To illustrate, let’s consider a business that offers multiple products across different countries or markets. The following diagram depicts a cube representing the model, consisting of three dimensions: product, market, and time. By incorporating these dimensions, the business can extract different measures to inform decision-making processes.

Modeling data dimensions – Source: Author

In essence, dimensions assist organisations in several key ways:

  1. Organising Data: Dimensions streamline data organisation and make data navigation more intuitive. By categorising data into distinct perspectives, dimensions facilitate easier access and retrieval of relevant information
  2. Establishing Clear Relationships: Dimensions define clear relationships with fact tables, which typically store measures, transactions, or events. These relationships enable seamless integration of context with quantitative data, ensuring a comprehensive understanding of the underlying information
  3. Enhancing Analytical Capabilities: Dimensions enhance analytical capabilities by enabling data users to extract insights and build meaningful reports or dashboards. By slicing, dicing, and drilling down data along different dimensions, organisations can gain deeper insights into various aspects of their operations
  4. Improving System Performance: Dimensions play a crucial role in improving the performance of data systems. By structuring data efficiently and optimising queries, dimensions facilitate the extraction of valuable insights in a cost-efficient and timely manner, ultimately enhancing decision-making processes

Dimensions are indeed a critical aspect of data modeling, with the potential to significantly impact the effectiveness of your data products. However, they pose a unique challenge due to the dynamic nature of data. Data is not static; it continuously changes over time. Therefore, it becomes increasingly important to implement techniques that ensure changes are accurately captured and seamlessly integrated into the data models.


Understanding Slowly Changing Dimensions

Slowly Changing Dimensions (SCDs) represent a foundational concept in the context of Data Warehouse design, having a direct influence on the operational capacity of data analytics teams.

SCD is a concept that addresses how changes to dimension data are captured and stored over time. Put simply, Slowly Changing Dimensions offer a framework that enables data teams to track data historicity within the data warehouse.

Failing to model SCDs in a way that is both proper and aligned with the needs of the business and product can have profound consequences. It may lead to an inability to accurately capture historical data, jeopardising the organisation’s capacity to report essential metrics. This poses a substantial risk, as it undermines the reliability and completeness of the analytical insights derived from the data.

In more technical terms, SCDs have the same natural key and an additional set of data attributes that may (or may not) change over time. The way teams handle these records could determine whether historicity is tracked and subsequently, whether the business metrics of interest can be extracted.

Furthermore, the implementation of SCDs within a data warehouse could also significantly impact other aspects of the data platform. For instance, neglecting to model SCDs properly could lead to the creation of non-idempotent data pipelines, which, in turn, may introduce various challenges in data management.


The five types of Slowly Changing Dimensions

Dealing with the challenges arising from changes to data over time involves employing various methodologies known as SCD Types.

SCD Type 0: Retain original

The first type of Slowly Changing Dimensions, known as SCD Type 0, deals with data that remains static over time. Examples of such data include Date of Birth, National Insurance Number (or Social Security Number for those in the US), and date dimensions.

SCD Type 0 is suitable for data records whose attributes don’t change over time – Source: Author

SCD Type 1: Overwrite

Type 1 refers to data records that are overwritten each time a change occurs. This means that historical records are not retained, making it impossible to track changes over time.

Before implementing this dimension, it’s crucial to determine whether historical data for these attributes is necessary. Otherwise, the loss of historical data could limit the team’s analytics capabilities.

For instance, let’s consider a business that ships products to customers and needs to store shipping addresses. From an operational standpoint, the business only requires the customer’s latest address for delivery. However, if the business also aims to analyse how often a customer changes their address over time, SCD Type 1 may not be suitable. This type does not retain historical changes, potentially hindering the extraction of such insights.

In SCD Type 1, attributes in data records are overwritten and historicity is not maintained – Source: Author

SCD Type 2: Create new record

SCD Type 2 involves the creation of a new record each time a change occurs. This means that for the same natural key, a new record with a distinct surrogate key is generated to represent the change. This is the way SCD Type 2 preserves historical data.

By associating each natural key with at least one surrogate key, the system retains a trail of changes over time. This approach allows for the tracking of historical variations while maintaining a clear lineage of data evolution.


💡 A quick refresher on Natural vs Surrogate Keys

Natural Key: This is usually a column, or a set of columns, that exist(s) already in the table (i.e. they are attributes of the entity within the data model) and can be used to uniquely identify a record in a table.

Surrogate Key: This is usually a system-generated value – such as UUID, sequence or unique identifier – that has no business meaning and is only used to uniquely identify a record in a table.


In practice, this means that instead of overwriting existing records, as in SCD Type 1, a new record is added to the dimension table, with its own surrogate key. This method ensures that historical data remains intact and accessible for analytical purposes.

For instance, in a scenario where a customer updates their address, instead of modifying the existing customer record, a new record is appended to the dimension table. This approach enables the business to analyse past and present customer details, facilitating insights into trends, behaviours, and historical patterns.

There are several ways to implement SCD Type 2, each with its own approach to preserving historical data; some of them include:

  • Timestamp Columns for Validity Intervals: One common approach is to utilise two timestamp columns to denote validity intervals. This method is widely used as it enables tracking of when a change occurred and facilitates effective time window analysis. By recording both the start and end timestamps, it becomes easier to understand the duration of each data version. A minimal sketch of this approach is shown after this list.
In SCD Type 2, a new record is created when a change occurs. Historical records are preserved – Source: Author
  • Effective Date and Flag Columns: Another approach combines the use of two columns: an effective date column indicating when a change took effect, and a flag column indicating the record’s current validity status.
In SCD Type 2, a new record is created when a change occurs. Historical records are preserved – Source: Author
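
To make the timestamp-column approach concrete, here is a minimal sketch (using pandas, with hypothetical customer data; a simplification rather than a production implementation) of how a change closes off the current record and appends a new version:

import pandas as pd

# Current state of the customer dimension (hypothetical data)
dim_customer = pd.DataFrame([{
    'surrogate_key': 1,
    'customer_id': 'C-100',           # natural key
    'address': '1 Old Street',
    'valid_from': pd.Timestamp('2024-01-01'),
    'valid_to': pd.Timestamp.max,     # open-ended: the current version
}])

def apply_scd2_change(dim, customer_id, new_address, change_ts):
    """Close the currently valid record and append a new version (SCD Type 2)."""
    dim = dim.copy()
    current = (dim['customer_id'] == customer_id) & (dim['valid_to'] == pd.Timestamp.max)
    dim.loc[current, 'valid_to'] = change_ts
    new_row = {
        'surrogate_key': dim['surrogate_key'].max() + 1,
        'customer_id': customer_id,
        'address': new_address,
        'valid_from': change_ts,
        'valid_to': pd.Timestamp.max,
    }
    return pd.concat([dim, pd.DataFrame([new_row])], ignore_index=True)

dim_customer = apply_scd2_change(dim_customer, 'C-100', '2 New Avenue', pd.Timestamp('2024-06-01'))
print(dim_customer)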

SCD Type 3: Add new attribute

SCD Type 3 tracks changes in a dimension by introducing a new column to preserve limited historical data. Specifically, this type can capture one change per record.

In SCD Type 3, a new attribute is created when a change occurs. Historicity is retained only for one change.

Unlike the previous types discussed, SCD Type 3 maintains the same natural and surrogate keys for records that have undergone changes in at least one attribute.

This method is particularly useful in scenarios where the primary key must remain unchanged and correspond to a single natural key. Additionally, SCD Type 3 is suitable when only one-time changes need to be recorded, or when it’s guaranteed that a record will not undergo multiple updates.

SCD Type 4: History Tables

Dimensions conforming to SCD Type 4 are stored across two distinct tables. The first table maintains the current state of records, while the second table, known as the history table, preserves all past changes that are no longer valid. Additionally, the history table includes an extra attribute to denote the timestamp when each record was created.

Returning to our example, whenever a change occurs, the latest record is updated in the main table, while an entry is appended to the history table along with the effective timestamp attribute.

In SCD Type 4, current and historical states of a record are persisted in two distinct tables – Source: Author

Unlike SCD Type 2, where historical records are managed by adding new entries to the dimension table, SCD Type 4 addresses the scalability issue posed by rapidly changing dimensions.

In this approach, columns expected to undergo frequent changes are relocated to a separate history table.


Implementing Slowly Changing Dimensions in a Data Warehouse

In the context of Data Warehouse design, early consideration of Slowly Changing Dimensions modeling is crucial. As highlighted previously, the effective capture of historical changes within dimensions significantly influences an organisation’s analytical capabilities.

Selecting the appropriate SCD type is not a one-size-fits-all decision; it hinges on both business and technical requirements. Therefore, the initial step is to identify dimensions that require historical retention. While some dimensions may not require capturing changes, others demand the preservation of historical records.

If uncertainty persists regarding whether historical changes should be captured for certain dimensions, it’s advisable to err on the side of caution and opt for SCD types that maintain historical data. This approach is particularly prudent unless data ingestion pipelines and jobs are entirely replayable, which, in my experience, is not always the case.

In cases where changes are not captured and data ingestion pipelines lack replayability, the risk of losing historical data permanently becomes apparent. This loss could severely limit the organization’s ability to conduct retrospective analysis and derive valuable insights from historical trends and patterns. Therefore, careful consideration and proactive measures are essential to ensure the preservation of historical data integrity within the data warehouse.

The selection of the appropriate SCD type should be based on the nature of the dimension and the expected frequency of change. It’s important to note that different dimensions may require different SCD types. However, implementing multiple SCD types means the team will need to manage several different patterns, which requires careful consideration. While this may introduce complexity, it’s crucial to ensure the accurate representation and maintenance of historical data across various dimensions.

As previously noted, these decisions reside at the intersection of business and technical requirements. Therefore, it’s crucial to engage not only data engineers but also data analysts and analytics engineers in defining the actual business needs. This collaborative approach ensures that both the technical feasibility and the business objectives are effectively addressed.

Collaboration between Data Engineers, Analysts and Analytics engineers is essential to determine the most effective approach to model Slowly Changing Dimensions – Source: Author

After capturing the business needs, the technical implementation during ingestion must accommodate the identified requirements. This ensures that the data ingestion processes align with and fulfil the established business needs effectively. On the technical side now, there are a few considerations that need to be taken into account.

ELT Pipelines

One approach used to ingest data – in a batch fashion – from an external source, be it a database, an API or a file store, is through ELT pipelines. In essence, an ELT pipeline functions by extracting data from a source, loading it into the destination system, and subsequently executing transformations on the ingested raw data.

Depending on how a particular Slowly Changing Dimension has been modeled, the ELT pipeline needs to behave accordingly in order to comply with the requirements specified by the corresponding SCD type.

  • SCD Type 0: The pipeline simply needs to load new records, given that existing records are not updated
  • SCD Type 1: The pipeline does not need to retain historicity, and thus a MERGE INTO query can be used to insert new records and overwrite existing ones that match the specified natural key
  • SCD Type 2: The pipeline needs to insert new records for each change on the dimension table, along with the additional timestamp columns that indicate the effective time window of a particular record. A MERGE INTO query can simplify the process of inserting new records and updating the effective end dates of existing records that are no longer valid (see the sketch after this list).
  • SCD Type 3: The pipeline can use a MERGE INTO query in order to insert new records and/or update the existing record to add the current value for an attribute that has changed
  • SCD Type 4: The ELT pipeline will now have to add or update records in two tables. Depending on the way you implement SCD Type 4, you need to adjust the operations of the pipeline accordingly. UPDATE (and thus MERGE INTO) is not as applicable in this type of SCD. Hence, you can write a query that inserts the new record into the table holding the current state of the dimension, and also captures the historical change in the history table

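To make the SCD Type 2 case a bit more concrete, below is a rough sketch of what such a load step could look like. It is only an illustration: it assumes a warehouse dialect that supports MERGE INTO, a staging table stg_customer holding the freshly loaded records, a dimension table dim_customer with valid_from, valid_to and is_current columns, and a DB-API-style connection object; all names are made up and the exact SQL syntax varies per warehouse.

def apply_scd_type_2(conn):
    # Step 1: close out the current version of every customer whose attributes
    # changed, and insert brand-new customers directly as current rows.
    close_out_changed = """
        MERGE INTO dim_customer AS t
        USING stg_customer AS s
            ON t.customer_id = s.customer_id AND t.is_current = TRUE
        WHEN MATCHED AND t.address <> s.address THEN
            UPDATE SET valid_to = CURRENT_TIMESTAMP, is_current = FALSE
        WHEN NOT MATCHED THEN
            INSERT (customer_id, address, valid_from, valid_to, is_current)
            VALUES (s.customer_id, s.address, CURRENT_TIMESTAMP, NULL, TRUE)
    """

    # Step 2: insert the new version for every staged customer that no longer
    # has a current row (i.e. the ones closed out in step 1).
    insert_new_versions = """
        INSERT INTO dim_customer (customer_id, address, valid_from, valid_to, is_current)
        SELECT s.customer_id, s.address, CURRENT_TIMESTAMP, NULL, TRUE
        FROM stg_customer AS s
        WHERE NOT EXISTS (
            SELECT 1 FROM dim_customer AS t
            WHERE t.customer_id = s.customer_id AND t.is_current = TRUE
        )
    """

    cursor = conn.cursor()
    cursor.execute(close_out_changed)
    cursor.execute(insert_new_versions)
    conn.commit()
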
The above steps could preserve your preferred SCD type at ingestion time. However, many teams need to apply dimension modeling in more data models. A widely used tool for implementing the transformation step in ELT pipelines is the data build tool (dbt). In addition to ensuring that source tables adhere to the predetermined strategy for handling Slowly Changing Dimensions, it’s important to consider preserving historical data in other data models resulting from subsequent transformations.

dbt offers snapshots, a mechanism that captures changes to a mutable table over time. dbt snapshots inherently implement SCD Type 2 functionality, making them a convenient solution for managing historical data within transformed datasets.

Change Data Capture (CDC)

As mentioned earlier, ELT pipelines are commonly used to ingest data in batches. This means that changes happening in a source will be captured at a slower pace by an ELT pipeline, since such jobs are scheduled to run on an hourly, daily, or even weekly/monthly schedule.

The Change Data Capture (CDC) pattern can be used to capture and ingest data changes in near real-time. It is a commonly used design pattern that aims to keep source and destination systems in sync, while preserving the historical changes that occurred over time.

In technical terms, CDC can help us determine when a record has been created or changed. Every record ingested via CDC will usually indicate when the change occurred, what the attributes of the new record are, and what kind of operation it relates to (i.e. INSERT, UPDATE or DELETE).
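
As an illustration, a single CDC event could look roughly like the record below; the exact field names depend on the CDC tool in use and are only an assumption here.

cdc_event = {
    'operation': 'UPDATE',                   # INSERT, UPDATE or DELETE
    'changed_at': '2024-03-10T09:15:00Z',    # when the change occurred at the source
    'before': {'customer_id': 42, 'address': 'Old Street 1'},
    'after': {'customer_id': 42, 'address': 'New Avenue 9'},
}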

Even though CDC can be used to serve pretty much all SCD types, in my experience it is most widely adopted as part of modeling SCD Type 2.


The impact of SCD on data pipelines’ idempotency

When building data pipelines, it’s crucial to ensure they are idempotent. The method used to capture changing dimensions could significantly affect your team’s capacity to construct idempotent pipelines and workflows that consume data from the Data Warehouse.


💡 A quick refresher on idempotent data pipelines

Idempotent pipeline: A pipeline that, for the same input, always produces the same results regardless of when or how many times you run it.

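To make the definition more tangible, here is a minimal sketch contrasting the two behaviours. It assumes a database connection object exposing an execute() method and a pair of raw_orders / stg_orders tables; all names and the parameter style are illustrative.

def load_orders_not_idempotent(conn):
    # The output depends on when the job runs; re-running it on a later day
    # produces a different result for the "same" run.
    conn.execute("INSERT INTO stg_orders SELECT * FROM raw_orders WHERE order_date = CURRENT_DATE")

def load_orders_idempotent(conn, interval_start, interval_end):
    # The output depends only on the interval passed in; re-running the job
    # simply replaces the same slice of data.
    conn.execute(
        "DELETE FROM stg_orders WHERE order_date >= ? AND order_date < ?",
        (interval_start, interval_end),
    )
    conn.execute(
        "INSERT INTO stg_orders SELECT * FROM raw_orders WHERE order_date >= ? AND order_date < ?",
        (interval_start, interval_end),
    )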

  • SCD Type 0: Does not really affect idempotency in data pipelines since these dimensions don’t change over time
  • SCD Type 1: In this type, only the latest value of an attribute is retained. Therefore, pipelines consuming data from a source modeled using SCD Type 1 are not idempotent. Every time a change occurs and the pipeline re-runs, a different output will be generated
  • SCD Type 2: This type is probably the perfect one when it comes down to building idempotent pipelines. However, you need to be careful when using start and end validity intervals
  • SCD Type 3: This SCD type could be considered partially idempotent, but since it captures only up to one data mutation, I would treat it as non-idempotent (unless it is guaranteed that attributes won't change more than once)

Final Thoughts

Effectively capturing and managing changes in data dimensions is fundamental to a well-designed Data Warehouse. Therefore, understanding how each dimension should be modeled, considering the frequency and nature of attribute changes, is paramount.

Moreover, it’s crucial to recognize the broader implications of processes like ELT pipelines or CDC on Slowly Changing Dimensions. These processes can significantly influence how dimensional data evolves over time.

It’s important to emphasize that there’s no one-size-fits-all solution. Instead, decisions should be driven by the unique business and analytical requirements of the organization.

Retaining and appropriately modeling historical data changes is a core responsibility of Data Engineering teams. This not only ensures data integrity but also maximises the potential for leveraging organisational data effectively.

The post Modeling Slowly Changing Dimensions appeared first on Towards Data Science.

]]>
Beneath the Surface: A Closer Look at 4 Airflow Internals https://towardsdatascience.com/airflow-internals-851f4a440028/ Sat, 03 Feb 2024 15:47:57 +0000 https://towardsdatascience.com/airflow-internals-851f4a440028/ Four Apache Airflow internals you might have missed

The post Beneath the Surface: A Closer Look at 4 Airflow Internals appeared first on Towards Data Science.

]]>
Image generated via DALL-E

I have been working with Airflow for more than three years now and overall, I am quite confident with it. It's a powerful orchestrator that helps me build data pipelines quickly and in a scalable fashion, and for most things I am looking to implement, it comes with batteries included.

Recently, and while preparing myself to get a certification for Airflow, I’ve come across many different things I had literally no clue about. And this was essentially my motivation to write this article and share with you a few Airflow internals that have totally blown my mind!


1. Scheduler only parses files containing certain keywords

The Airflow Scheduler will parse only files containing airflow or dag in the code! Yes, you’ve heard this right! If a file under the DAG folder does not contain at least one of these two keywords, it will simply not be parsed by the scheduler.

If you want to modify this rule such that this is no longer a requirement for the scheduler, you can simply set DAG_DISCOVERY_SAFE_MODE configuration setting to False. In that case, the scheduler will parse all files under your DAG folder (/dags).

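For reference, this is roughly what the change looks like, either in airflow.cfg or through the equivalent environment variable:

# airflow.cfg
[core]
dag_discovery_safe_mode = False

# or, equivalently
export AIRFLOW__CORE__DAG_DISCOVERY_SAFE_MODE=False
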
I wouldn’t recommend disabling this check though, since doing so doesn’t really make any sense. A proper DAG file will have Airflow imports and a DAG definition (which means the requirements for parsing that file are met), but it is worth knowing that this rule exists.


2. Variables with certain keywords in their name have their values hidden

We know that by default, Airflow will hide sensitive information stored in a Connection (and more specifically in the password field), but what about Variables?

Well, this is indeed possible and the mind-blowing thing is that Airflow can do this automatically for you. If a variable's name contains certain keywords that can possibly indicate sensitive information, then its value will automatically be hidden.

Here’s a list of keywords that will make a Variable qualify for having sensitive information stored as its value:

access_token
api_key
apikey
authorization
passphrase
passwd
password
private_key
secret
token
keyfile_dict
service_account

This means that if your variable name contains any of these keywords, Airflow will handle its value accordingly.

Let’s see this feature in action. First, let’s create a variable from the UI whose name does not contain any keyword that would make it qualify as sensitive. As you can see in the screenshot below, the value of variable my_var is visible on the User Interface.

Airflow variable that does not qualify as sensitive – Source: Author

Now let’s create another variable containing one of the keywords in its name and see what happens. From the UI, we go to Admin -> Variables -> + and create a new variable called my_api_key:

Creating a new Airflow variable via User Interface – Source: Author

Now if we go to the Variables section in the User Interface, we can see our newly created Variable, and you should notice that its value is now hidden and replaced with asterisks.

Airflow variables containing certain keywords in their name will have their values hidden from the UI – Source: Author

Personally, I prefer storing Airflow variables in a secret store, such as HashiCorp Vault, and therefore I had no clue that Airflow handles variables this way. In fact, it is a very nice and useful feature. However, I would expect it to be a bit more flexible. Instead of having a set of pre-defined keywords, it would be more convenient if we could specify whether a variable contains sensitive information, rather than restricting ourselves to naming a variable with a particular keyword. Let’s hope this is going to be implemented in a future Airflow version.

For now, you can still extend the list of sensitive keywords by configuring Airflow accordingly, either using sensitive_var_conn_names in airflow.cfg (under [core] section), or by exporting AIRFLOW__CORE__SENSITIVE_VAR_CONN_NAMES environment variable.

sensitive_var_conn_names

  • New in version 2.1.0.

A comma-separated list of extra sensitive keywords to look for in variables names or connection’s extra JSON.

Type: string. Default: ''. Environment Variable: AIRFLOW__CORE__SENSITIVE_VAR_CONN_NAMES

Airflow Docs
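
For instance, to additionally hide the values of variables whose names contain credentials or endpoint_key (both purely example keywords), the configuration would look roughly like this:

# airflow.cfg
[core]
sensitive_var_conn_names = credentials,endpoint_key

# or, equivalently
export AIRFLOW__CORE__SENSITIVE_VAR_CONN_NAMES='credentials,endpoint_key'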


3. Two or more DAGs can actually have the same DAG ID (but UI won’t like it)

I always make sure to use unique IDs for all of my DAGs. In fact, I do so by naming them after their Python filename (since this approach guarantees that no duplicate filename – and thus DAG id – can be created by mistake). All this time, I had the impression that it is not even possible to have two or more DAGs with the same ID but surprisingly, this is far from true!

Two or more DAGs can actually have the same DAG ID, meaning that you will not see any error at all. However, this is a bad practice and must be avoided. Even if the scheduler does not complain about DAGs with the same ID, only one of them will appear on the User Interface. In fact, if you refresh the UI after a while, the one previously shown might disappear and another DAG with the same ID might appear instead.

It’s not just about rendering the DAG in the User Interface though. Things can go extremely wrong when you want to reference a DAG whose ID is used by other DAG(s) too. As an example, consider the TriggerDagRunOperator that can be used in a DAG to trigger another DAG. In such a case, the wrong DAG might get triggered. Users have even reported that clicking ‘Run’ on the UI of the DAG being shown resulted in the execution of the DAG with the same ID that was not shown in the UI. So make sure to avoid having duplicate DAG IDs.

💡 Pro Tip: Instead of hardcoding the ID of a DAG, you can dynamically name it after the filename itself, using Path(__file__).stem:

from pathlib import Path

from airflow import DAG

with DAG(
  dag_id=Path(__file__).stem,
  ...
): 
  ...

4. Airflow supports "ignore file"

I am pretty sure you are familiar with .gitignore file, but personally, I had no clue that Airflow supports the same construct, too.

You can instruct the scheduler to ignore certain files in your DAG folder by creating a .airflowignore file under the DAG folder itself (i.e. /dags/.airflowignore). Overall, it works like a .gitignore file. You can use it to specify directories or files in the DAG folder. Each line in .airflowignore essentially specifies a regex pattern, which means that any directory or file whose name matches any of the patterns specified will be ignored.

As an example, let’s assume that we have created a .airflowignore file with the content outlined below.

# .airflowignore

helper
dbt_\d

Files like

  • helper_functions.py
  • utils_HeLpEr.py
  • dbt_1.py
  • dbt_2.py
  • helper/utils.py

will be completely ignored. Be careful though, since if a directory’s name matches any of the patterns, this directory and all its subfolders would not be scanned by Airflow at all.

This feature can help you speed up DAG parsing in case you have tons of files in your DAG folder that your scheduler should not care about.


Final Thoughts..

Whether you are new to Airflow or a seasoned Engineer, there are always opportunities to learn new things about the technology, which might not even be mentioned (explicitly) in the documentation.

💡 Thanks for taking the time to read this article. If you found it useful, make sure to follow me and subscribe to my Medium newsletter to be notified when the next article will be out.

The post Beneath the Surface: A Closer Look at 4 Airflow Internals appeared first on Towards Data Science.

]]>
Mastering Airflow Variables https://towardsdatascience.com/mastering-airflow-variables-32548a53b3c5/ Sat, 27 Jan 2024 06:33:32 +0000 https://towardsdatascience.com/mastering-airflow-variables-32548a53b3c5/ The way you retrieve variables from Airflow can impact the performance of your DAGs

The post Mastering Airflow Variables appeared first on Towards Data Science.

]]>
What happens if multiple data pipelines need to interact with the same API endpoint? Would you really have to declare this endpoint in every pipeline? In case this endpoint changes in the near future, you will have to update its value in every single file.

Airflow variables are simple yet valuable constructs, used to prevent redundant declarations across multiple DAGs. They are simply objects consisting of a key and a JSON serializable value, stored in Airflow’s metadata database.

And what if your code uses tokens or other types of secrets? Hardcoding them in plain text doesn’t seem to be a secure approach. Beyond reducing repetition, Airflow variables also aid in managing sensitive information. With six different ways to define variables in Airflow, selecting the appropriate method is crucial for ensuring security and portability.

An often overlooked aspect is the impact that variable retrieval has on Airflow performance. It can potentially strain the metadata database with requests every time the Scheduler parses the DAG files (every thirty seconds, by default).

It’s fairly easy to fall into this trap, unless you understand how the Scheduler parses DAGs and how Variables are retrieved from the database.


Defining Airflow Variables

Before getting into the discussion of how Variables are fetched from the metastore and what best practices to apply in order to optimise DAGs, it’s important to get the basics right. For now, let’s just focus on how we can actually declare variables in Airflow.

As mentioned already, there are several different ways to declare variables in Airflow. Some of them turn out to be more secure and portable than others, so let’s examine all and try to understand their pros and cons.

1. Creating a variable from the User Interface

In this first approach, we are going to create a variable through the User Interface. From the top menu select Admin -> Variables -> +

Creating a new variable from the User Interface – Source: Author

Once you enter the key and value, click Save to create it. The variable should now be visible in the Variables’ List. By default, variables created on the UI are automatically stored on the metadata database.

List of Variables on Airflow UI – Source: Author

Notice how the value of the Variable is shown in plain text. If you are looking into storing sensitive information in one of your Airflow Variables, then the UI approach may not be the most suitable.

Furthermore, this option lacks portability. If you want to re-create an environment, you will first have to manually export them from the current environment and finally import them back to the newly created one.

2. Creating a variable by exporting an environment variable

The second option we have, is to export environment variables using the AIRFLOW_VAR_<VARIABLE_NAME> notation.

The following commands can be used to create two variables, namely foo and bar.

export AIRFLOW_VAR_FOO=my_value
export AIRFLOW_VAR_BAR='{"newsletter":"Data Pipeline"}'

One advantage of this approach is that variables created via environment variables are not visible on the UI (yet, you can of course reference them in your code) and thus any sensitive information will not be visible either.

Unlike variables created via the UI, this approach won’t persist them on the metadata database. This means that environment variables are faster to retrieve since no database connection should be established.

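For reference, retrieval in your DAG code looks the same regardless of where a variable lives; a quick sketch using the two variables exported above:

from airflow.models import Variable

foo = Variable.get('foo')                          # 'my_value'
bar = Variable.get('bar', deserialize_json=True)   # {'newsletter': 'Data Pipeline'}
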
But still, managing environment variables can be challenging too. How do you secure the values if environment variables are stored in a file used by the automation script responsible for Airflow deployment?

3. Creating a variable via Airflow CLI

Variables can also be created with the use of Airflow CLI. You will first need to connect to the Airflow Scheduler worker. For example, if you are running Airflow via Docker, you will first need to find the scheduler’s container id and run

docker exec -it <airflow-scheduler-container-id> /bin/bash

You can now create a new Airflow Variable using airflow variables command:

airflow variables set \
    my_cli_var \
    my_value \
    --description 'This variable was created via CLI'

If you would like to assign multiple values in a particular variable, you should consider using JSON format:

airflow variables set \
    my_cli_json_var \
    '{"key": "value", "another_key": "another_value"}' \
    --description 'This variable was created via CLI'

You will also have the option to serialise the JSON variable. This can be done by providing the -j or --json flag.

airflow variables set \
    --json \
    my_cli_serialised_json_var \
    '{"key": "value", "another_key": "another_value"}' \
    --description 'This variable was created via CLI'

Now if we head back to the Variable list on the UI, we can now see all three variables that were created in previous steps.

Variables created via Airflow CLI are visible on the User Interface – Source: Author

Variables created via the CLI are visible on the User Interface, which means sensitive information can also be exposed and are stored on the metadata database.

In my opinion, this approach is handy in development environments where you may want to quickly create and test a particular variable or some functionality referencing it. For production deployments, you will have to write an automation script to create (or update) these variables, which means this information must be stored somewhere, say in a file, which makes it challenging to handle given that some variables may contain sensitive information.

4. Creating a variable using the REST API

This fourth approach involves calling the REST API in order to get some variables created. This is similar to the Airflow CLI approach and it also offers the same advantages and disadvantages.

curl -X POST ${AIRFLOW_URL}/api/v1/variables \
    -H "Content-Type: application/json" \
    --user "${AIRFLOW_USERNAME}:${AIRFLOW_PASSWORD}" \
    -d '{"key": "json_var", "value": "{\"key1\": \"val1\"}"}'

5. Creating variables programmatically

Programmatic creation of variables is also feasible and straightforward.

Python">def create_vars():
    from airflow.models import Variable

    Variable.set(key='my_var', value='my_val')
    Variable.set(
        key='my_json_var', 
        value={'my_key': 23, 'another_key': 'another_val'}, 
        serialize_json=True,
    )

...

PythonOperator(
    task_id='create_variables',
    python_callable=create_vars,
)

Needless to say, this is a bad practice and should be avoided in production deployments. Variables, and especially those containing sensitive information, should not be declared in the DAG files, given that the code is version controlled and also visible on the Code Tab of the UI.

6. Creating variables in a Secret Store/Backend ❤

In addition to variable retrieval from environment variables or the metastore database, you can also enable alternative secrets backend to retrieve Airflow variables.

Airflow has the capability of reading connections, variables and configuration from Secret Backends rather than from its own Database. While storing such information in Airflow’s database is possible, many of the enterprise customers already have some secret managers storing secrets, and Airflow can tap into those via providers that implement secrets backends for services Airflow integrates with.

Airflow Docs

Currently, the Apache Airflow Community provided implementations of secret backends include:

  • Amazon (Secrets Manager & Systems Manager Parameter Store)
  • Google (Cloud Secret Manager)
  • Microsoft (Azure Key Vault)
  • HashiCorp (Vault)

In fact, this is the best, most secure and portable way for defining Variables in Airflow.
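
As a rough example, enabling the HashiCorp Vault backend boils down to a configuration change along these lines; the exact backend_kwargs depend on the provider version and on your Vault setup:

# airflow.cfg
[secrets]
backend = airflow.providers.hashicorp.secrets.vault.VaultBackend
backend_kwargs = {"connections_path": "connections", "variables_path": "variables", "url": "http://127.0.0.1:8200"}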


Hiding Sensitive Variable Values

For some of the methods outlined in the previous section, I mentioned that sensitive information can actually be visible on the User Interface. In fact, it is possible to hide sensitive values, as long as your variables are properly named.

If a variable name contains certain keywords that can possibly indicate that the variable holds sensitive information, then its value will automatically be hidden.

Here’s a list of keywords that will make a Variable qualify for having sensitive information stored as its value:

access_token
api_key
apikey
authorization
passphrase
passwd
password
private_key
secret
token
keyfile_dict
service_account

This means that if your variable name contains any of these keywords, Airflow will handle its value accordingly. Now let’s go ahead and try out an example to verify that this functionality works as expected.

First, let’s create a new variable without including any of the keywords mentioned above. As we can observe, the value of the variable is visible on the User Interface.

Airflow variable that does not qualify for holding sensitive information – Source: Author

In the following screen, we attempt to create a new variable called my_api_key. According to what we discussed earlier, since the variable name contains api_key keyword, Airflow should handle its value in a way that it protects sensitive information.

Creating a variable using a name that indicates sensitive information – Source: Author

In fact, if we now head back to the Variable list on the UI, we can now see that the value of the newly created variable is hidden.

Variables with names containing certain keywords will have their values hidden on the User Interface – Source: Author

If you are not happy with the existing list of keywords, you can in fact extend it by specifying additional keywords that can be taken into account when hiding Variable values. This can be configured through sensitive_var_conn_names in airflow.cfg (under [core] section), or by exporting AIRFLOW__CORE__SENSITIVE_VAR_CONN_NAMES environment variable.

sensitive_var_conn_names

– New in version 2.1.0.

A comma-separated list of extra sensitive keywords to look for in variables names or connection’s extra JSON.

Type: string. Default: ''. Environment Variable: AIRFLOW__CORE__SENSITIVE_VAR_CONN_NAMES

Airflow Docs


Efficient Variable Retrieval

By default, your Airflow DAGs are parsed every 30 seconds. The scheduler will scan your DAG folder and identify any changes made to the DAG files. If you are not fetching variables the right way, the DAG parsing process may soon become a bottleneck.

Depending on the method you choose to declare variables, Airflow may have to initiate a connection to the metastore database for every variable declared in your DAG files.

Avoid overloading metastore with requests

In order to retrieve variables in your DAGs, there are essentially two approaches you can follow:

  1. Using Variable.get() function
  2. Using the var template variable

If you choose to go with the first option, Variable.get() will create a new connection to the metastore database in order to fetch the value of the specified variable. The place where you call this function in your DAG file can have a huge performance impact.

Bad Practice

If the function gets called outside of a task, or within the DAG Context Manager, a new (and useless) connection to the metastore will be created every time the DAG is parsed (i.e. every thirty seconds).

from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator

my_var = Variable.get('my_var')

with DAG('my_dag', start_date=datetime(2024, 1, 1)) as dag:
    print_var = PythonOperator(
        task_id='print_var',
        python_callable=lambda: print(my_var),
    )

If the same pattern exists in many different DAGs, then sooner or later, the metastore database will run into troubles.

There are in fact some edge cases where this pattern cannot be avoided. For example, let’s assume you want to dynamically create tasks based on the value of a variable. Well, in that case you may have to call the function outside of the tasks, or within the Context Manager. Make sure to avoid this approach when possible though.

It is also important to mention that the same problem will occur even if you call the Variable.get() function in the arguments of an operator.

from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator

def _print_var(my_var):
    print(my_var)

with DAG('my_dag', start_date=datetime(2024, 1, 1)) as dag:
    print_var = PythonOperator(
        task_id='print_var',
        python_callable=_print_var,
        op_args=[Variable.get('my_var')],
    )

In fact, this can easily be avoided with the use of the template engine.

Best Practices

Basically, instead of calling Variable.get(), we can actually use templated references. With this technique, the value of a variable will only be fetched at runtime.

List of Airflow Variables – Source: Author

The code snippet below demonstrates how to use templated references for variables that hold either JSON or non-JSON values.

from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator

def _print_var(val1, val2, val3, val4):
    print(val1)
    print(val2)
    print(val3)
    print(val4)

with DAG('my_dag', start_date=datetime(2024, 1, 1)) as dag:
    print_var = PythonOperator(
        task_id='print_var',
        python_callable=_print_var,
        op_args=[
            '{{ var.value.my_var }}',
            '{{ var.json.my_vars.key1 }}',
            '{{ var.json.my_vars.key2 }}',
            '{{ var.json.my_vars.key3 }}',
        ],
    )

However, the template engine approach is applicable only to cases where the operator supports templated fields for the arguments to which we are looking to provide templated references.

In cases where templated references won’t work, you can still make sure that Variable.get() gets called within the task, so that no connections to the metastore database are initiated every time a DAG is parsed.

from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator

def _print_var():
    my_var = Variable.get('my_var')
    print(my_var)

with DAG('my_dag', start_date=datetime(2024, 1, 1)) as dag:
    print_var = PythonOperator(
        task_id='print_var',
        python_callable=_print_var,
    )

Storing multiple values in a single Variable

Now let’s assume that a particular DAG needs to retrieve three different values. Even if you follow the best practices outlined in the previous section, you will still have to initiate three distinct connections to the metastore database.

from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator

def _print_vars():
    my_var = Variable.get('my_var')
    another_var = Variable.get('another_var')
    one_more_var = Variable.get('one_more_var')
    print(my_var)
    print(another_var)
    print(one_more_var)

with DAG('my_dag', start_date=datetime(2024, 1, 1)) as dag:
    print_var = PythonOperator(
        task_id='print_vars',
        python_callable=_print_vars,
    )

Instead, we could create a single JSON Variable consisting of three key-value pairs. Obviously, you should do this only as long as it logically makes sense to squeeze the three values into one variable.

Creating a new JSON variable consisting of three key-value pairs – Source: Author

We can now retrieve the values from all keys specified in our variable with just one connection to the metastore database.

from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator

def _print_vars():
    my_vars = Variable.get('my_vars', deserialize_json=True)
    print(my_vars['key1'])
    print(my_vars['key2'])
    print(my_vars['key3'])

with DAG('my_dag', start_date=datetime(2024, 1, 1)) as dag:
    print_var = PythonOperator(
        task_id='print_vars',
        python_callable=_print_vars,
    )

Final Thoughts..

Declaring Airflow Variables is straightforward, but retrieving them could turn into a nightmare for the metastore database in case you don’t apply best practices.

In this tutorial, we demonstrated how one can create Airflow Variables using six different ways. Every approach comes with its pros and cons and should be used accordingly. Bear in mind that the best practice for production deployments is the use of a Secrets Backend since it provides security and portability.

More importantly, we discussed which techniques should be avoided in order not to overload the Airflow database, as well as how to make optimal use of the Variables construct. I hope it is now clear to you that variables should either be fetched via templated references, or retrieved within the task function definition.

The post Mastering Airflow Variables appeared first on Towards Data Science.

]]>
Running Airflow DAG Only If Another DAG Is Successful https://towardsdatascience.com/run-airflow-dag-if-another-dag-succeeds-233aaa4118c1/ Tue, 19 Dec 2023 20:17:38 +0000 https://towardsdatascience.com/run-airflow-dag-if-another-dag-succeeds-233aaa4118c1/ Using Airflow sensors to control the execution of DAGs on a different schedule

The post Running Airflow DAG Only If Another DAG Is Successful appeared first on Towards Data Science.

]]>
Recently, I’ve been trying to coordinate two Airflow DAGs such that one would only run – on its own hourly schedule – if the other DAG (running on a daily basis) has been successful.

In today’s tutorial I will walk you through the use case and demonstrate how to achieve the desired behaviour in three different ways; two using the ExternalTaskSensor and another one using a customised approach with PythonOperator.


Use Case: Running the hourly DAG only if the daily DAG succeeded

Now let’s get started with our use case that involves two Airflow DAGs.

The first DAG, my_daily_dag, runs every day at 5AM UTC.

from datetime import datetime, timedelta
from pathlib import Path

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator

with DAG(
    catchup=False,
    dag_id='my_daily_dag',
    start_date=datetime(2023, 7, 26),
    default_args={
        'owner': 'airflow',
        'retries': 1,
        'retry_delay': timedelta(minutes=2),
    },
    schedule_interval='0 5 * * *',
    max_active_runs=1,
) as dag:
   DummyOperator(task_id='dummy_task')

The second DAG, my_hourly_dag, runs on an hourly basis, between 6AM and 8PM UTC.

from datetime import datetime, timedelta
from pathlib import Path

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator

with DAG(
    catchup=False,
    dag_id='my_hourly_dag',
    start_date=datetime(2023, 7, 26),
    default_args={
        'owner': 'airflow',
        'retries': 1,
        'retry_delay': timedelta(minutes=2),
    },
    schedule_interval='0 6-20 * * *',  # At :00 every hour between 6AM-8PM
    max_active_runs=1,
) as dag:
   DummyOperator(task_id='dummy_task')

In our use case, we would like my_hourly_dag to run only if my_daily_dag has run successfully within the current date. If not, then my_hourly_dag should be skipped. It is important to mention here that we don’t want to trigger my_hourly_dag as soon as my_daily_dag succeeds. That would be achievable with the TriggerDagRunOperator. Instead, we want both DAGs to run on their own schedule but add a condition on my_hourly_dag.
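
For contrast, the push-based alternative we are not going to use here would look roughly like the snippet below, placed as the last task of my_daily_dag:

from airflow.operators.trigger_dagrun import TriggerDagRunOperator

trigger_hourly = TriggerDagRunOperator(
    task_id='trigger_my_hourly_dag',
    trigger_dag_id='my_hourly_dag',
)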

How to Skip Tasks in Airflow DAGs

In the next two sections we will discuss and demonstrate how to achieve this using a few different approaches.


Determining the execution_date of both DAGs

Before jumping into implementation details, it is important to first understand how the two DAGs differ in terms of their respective execution_date. This is crucial since we will use this knowledge to determine the implementation of the desired behaviour.

Let’s assume that today is December 13th. The daily DAG, my_daily_dag, has an execution_date of 2023–12–12 00:00 since it covers the data interval between 2023–12–12 and 2023–12–13. Recall that Airflow DAG runs start at the end of an interval.

Meanwhile, our hourly my_hourly_dag DAG has an execution_date of 2023–12–13 (except the midnight run that will have an execution_date of 2023–12–12 since the beginning of the interval is 2023–12–12 23:00 through 2023–12–13 00:00).


Using ExternalTaskSensor

Our first option is the built-in [ExternalTaskSensor](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/external_task/index.html#airflow.sensors.external_task.ExternalTaskSensor) operator.

Waits for a different DAG, task group, or task to complete for a specific logical date.

By default, the ExternalTaskSensor will wait for the external task to succeed, at which point it will also succeed. However, by default it will not fail if the external task fails, but will continue to check the status until the sensor times out (thus giving you time to retry the external task without also having to clear the sensor).

We can use this sensor in our my_hourly_dag that will essentially check if my_daily_dag has been successful in the specified interval.

The ExternalTaskSensor accepts one of execution_delta or execution_date_fn. The former can be used to indicate the time difference with the previous execution to look at. By default, this is set to the same logical date as the current task/DAG. The latter receives a callable (i.e. a function) that accepts the current execution’s logical date as the first positional argument and returns the desired logical date(s) to query.

– execution_delta ([datetime.timedelta](https://docs.python.org/3/library/datetime.html#datetime.timedelta) | None) – time difference with the previous execution to look at, the default is the same logical date as the current task or DAG. For yesterday, use [positive!] datetime.timedelta(days=1). Either execution_delta or execution_date_fn can be passed to ExternalTaskSensor, but not both.

– execution_date_fn (Callable | None) – function that receives the current execution’s logical date as the first positional argument and optionally any number of keyword arguments available in the context dictionary, and returns the desired logical dates to query. Either execution_delta or execution_date_fn can be passed to ExternalTaskSensor, but not both.

Since the two DAGs are running on a different schedule, the default behaviour of the sensor won’t work for us. In the previous section, we clarified why the two DAGs will have different execution dates.

Therefore, we need to figure out how to use either the execution_delta or execution_date_fn to make both execution dates align with each other.

Using ExternalTaskSensor with execution_delta

The simplest approach in my opinion is to use execution_delta. The data interval start date of our daily DAG is "yesterday at 5AM UTC". Since we know that my_hourly_dag runs on an hourly basis, we can come up with a formula to compute the delta between the interval start datetime of the hourly DAG and the interval start datetime of the daily DAG.

The following will create a delta that adds up:

  • 24 that corresponds to the difference of 24 hours the two DAGs have, given that they run on a different schedule, as explained earlier
  • the difference between the hour of the interval start datetime of the hourly dag and 5, which is the hour the daily DAG runs every day.
24 + (hourly_dag_interval_start_hour - 5)

As an example, consider the following scenarios when the hourly DAG starts running at 6AM (until 8PM):

At 6AM:

  • hourly data interval starts at 5AM (and ends at 6AM)
  • daily data interval starts at 5AM yesterday
  • execution_delta=24 + (5-5) = 24
  • The sensor will check the success of the daily DAG with data interval start date set to 24 hours before

At 7AM:

  • hourly data interval starts at 6AM (and ends at 7AM)
  • daily data interval starts at 5AM yesterday
  • execution_delta=24 + (6-5) = 25
  • The sensor will check the success of the daily DAG with data interval start date set to 25 hours before

and so on.

Now how do we implement this? One problem we need to face is that (at the time this post was written) execution_delta is not a templated field, meaning that we cannot use the templated variables that give us access to useful information, including data_interval_start.

Therefore, we will have to manually construct the data_interval_start of the hourly DAG. Given that the DAG runs every hour, the data interval start hour corresponds to the current hour minus one:

from datetime import datetime, timezone

datetime.now(timezone.utc).hour - 1

Therefore, the execution_delta that will be provided as an argument to the ExternalTaskSensor can now be defined as:

execution_delta=timedelta(hours=24 + datetime.now(timezone.utc).hour - 1 - 5)

Here’s the full code of our hourly DAG, that will run every hour between 6AM and 8PM UTC, only if the daily DAG has been successful today.

from datetime import datetime, timedelta, timezone
from pathlib import Path

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.sensors.external_task import ExternalTaskSensor

with DAG(
    catchup=False,
    dag_id='my_hourly_dag',
    start_date=datetime(2023, 7, 26),
    default_args={
        'owner': 'airflow',
        'retries': 1,
        'retry_delay': timedelta(minutes=2),
    },
    schedule_interval='0 6-20 * * *',  # At :00 every hour between 6AM-8PM
    max_active_runs=1,
) as dag:
    sensor_task = ExternalTaskSensor(
        task_id='daily_dag_completed_successfully',
        external_dag_id='my_daily_dag',
        soft_fail=True,
        check_existence=True,
        execution_delta=timedelta(hours=24 + datetime.now(timezone.utc).hour - 1 - 5),
        poke_interval=30,
        timeout=120,
    )

    dummy_task = DummyOperator(task_id='dummy_task')

    sensor_task >> dummy_task

Using ExternalTaskSensor with execution_date_fn

Now apart from execution_delta, the sensor can be configured to work with execution_date_fn that accepts a callable returning the logical date(s) to be queried.

In other words, we need to create a function that fetches the desired logical date of the daily DAG to be checked against the conditions of the sensor, which by default will check whether the state of the DagRun at the specified interval was successful.

The function below will fetch the DagRuns of the daily DAG and return the execution date of the DagRun only if it happened on the same day as the hourly DAG. If no DagRun is found (which means that the daily DAG has never been executed in the past), an AirflowSkipException will be raised so that the sensor task (and any downstream task) is skipped. Likewise, if no DagRun of the daily DAG is found that happened on the same date as the hourly DAG, the current_logical_dt will be returned, which is essentially the default value checked by the ExternalTaskSensor (and is the argument that must be present in the function definition provided when using the execution_date_fn argument).

Recall that the two DAGs run on a different schedule, which means their execution_date differs. In order to make a proper comparison and determine whether the daily DAG was executed successfully on the same day that the hourly DAG runs, we need to subtract one day from the hourly DAG’s execution date. Note that we are only interested in whether the year, month and day between the two DAGs are the same (we don’t really care about the time information in this context).

import logging 

from airflow.exceptions import AirflowSkipException
from airflow.models import DagRun

def get_most_recent_dag_run(current_logical_dt):
    dag_id = 'my_daily_dag'
    # Get the historical DagRuns of the daily DAG
    dag_runs = DagRun.find(dag_id=dag_id)

    # Sort DagRuns on descending order such that the first element
    # in the list, corresponds to the latest DagRun of the daily DAG
    dag_runs.sort(key=lambda x: x.execution_date, reverse=True)

    # If the daily DAG was not executed ever before, simply raise an 
    # exception to skip. 
    if not dag_runs:
        logging.info(f'No DAG runs found for {dag_id}. Skipping..')
        raise AirflowSkipException

    # Get the latest DagRun of the daily DAG
    latest_daily_dag_run = dag_runs[0]

    # Subtract one day from hourly's DAG current execution_date in order to 
    # align with the daily DAG's schedule
    current_logical_dt_yesterday = current_logical_dt.subtract(hours=24)

    # if year/month/day of daily's DAG execution_date and hourly's DAG execution_date
    # (minus one day) are the same, it means the daily DAG was executed today. 
    # We therefore return the execution_date of the latest daily DagRun. 
    # Its state (i.e. if successful) will be handled by the sensor and the configuration 
    # we provide to it. 
    if (
        current_logical_dt_yesterday.day == latest_daily_dag_run.execution_date.day
        and current_logical_dt_yesterday.month == latest_daily_dag_run.execution_date.month
        and current_logical_dt_yesterday.year == latest_daily_dag_run.execution_date.year
    ):
        logging.info(f'DAG run was found for {dag_id} today.')
        return latest_daily_dag_run.execution_date

    # Alternatively, return the current execution_date of the hourly DAG
    # This is the default value the sensor would otherwise use, and essentially
    # it means that the sensor won't be triggered given that the intervals between 
    # the daily DAG and the sensor won't align. 
    return current_logical_dt

Here’s the full code for our hourly DAG using execution_date_fn with the ExternalTaskSensor.

import logging
from datetime import datetime, timedelta
from pathlib import Path

from airflow.exceptions import AirflowSkipException
from airflow.models import DAG, DagRun
from airflow.operators.dummy import DummyOperator
from airflow.sensors.external_task import ExternalTaskSensor

def get_most_recent_dag_run(current_logical_dt):
    dag_id = 'my_daily_dag'
    # Get the historical DagRuns of the daily DAG
    dag_runs = DagRun.find(dag_id=dag_id)

    # Sort DagRuns on descending order such that the first element
    # in the list, corresponds to the latest DagRun of the daily DAG
    dag_runs.sort(key=lambda x: x.execution_date, reverse=True)

    # If the daily DAG was not executed ever before, simply raise an 
    # exception to skip. 
    if not dag_runs:
        logging.info(f'No DAG runs found for {dag_id}. Skipping..')
        raise AirflowSkipException

    # Get the latest DagRun of the daily DAG
    latest_daily_dag_run = dag_runs[0]

    # Subtract one day from hourly DAG's current execution_date in order to 
    # align with the daily DAG's schedule
    current_logical_dt_yesterday = current_logical_dt.subtract(hours=24)

    # if year/month/day of daily DAG's execution_date and hourly DAG's execution_date
    # (minus one day) are the same, it means the daily DAG was executed today. 
    # We therefore return the execution_date of the latest daily DagRun. 
    # Its state (i.e. if successful) will be handled by the sensor and the configuration 
    # we provide to it. 
    if (
        current_logical_dt_yesterday.day == latest_daily_dag_run.execution_date.day
        and current_logical_dt_yesterday.month == latest_daily_dag_run.execution_date.month
        and current_logical_dt_yesterday.year == latest_daily_dag_run.execution_date.year
    ):
        logging.info(f'DAG run was found for {dag_id} today.')
        return latest_daily_dag_run.execution_date

    # Alternatively, return the current execution_date of the hourly DAG
    # This is the default value the sensor would otherwise use, and essentially
    # it means that the sensor won't be triggered given that the intervals between 
    # the daily DAG and the sensor won't align. 
    return current_logical_dt

with DAG(
    catchup=False,
    dag_id='my_hourly_dag',
    start_date=datetime(2023, 7, 26),
    default_args={
        'owner': 'airflow',
        'retries': 1,
        'retry_delay': timedelta(minutes=2),
    },
    schedule_interval='0 6-20 * * *',  # At :00 every hour between 6AM-8PM
    max_active_runs=1,
) as dag:
    sensor_task = ExternalTaskSensor(
        task_id='daily_dag_completed_successfully',
        external_dag_id='my_daily_dag',
        soft_fail=True,
        check_existence=True,
        execution_date_fn=get_most_recent_dag_run,
        poke_interval=30,
        timeout=120,
    )

    dummy_task = DummyOperator(task_id='dummy_task')

    sensor_task >> dummy_task

Using PythonOperator

The second approach involves a more customised solution. More specifically, we can programmatically find the latest successful DagRun of our daily DAG and handle the behaviour of the operator accordingly. In other words, if the latest successful DagRun of the daily DAG does not align with the execution date of our hourly DAG, the task will be skipped (as well as the downstream tasks).

Therefore, we can write a function similar to the one we wrote in the previous section, which was used as the execution_date_fn argument for the ExternalTaskSensor.

More specifically, we need to fetch the DagRuns of the daily DAG and determine if any of them has completed successfully today (i.e. on the same day the hourly DAG runs). If none is found, we raise an AirflowSkipException so that the execution of the hourly DAG is skipped. In this case, the PythonOperator supports templated variables and we will therefore take advantage of them.

This is what our function looks like:

import logging

from airflow.exceptions import AirflowSkipException
from airflow.models import DagRun
from airflow.utils.state import DagRunState

def check_daily_dag_success_today(**kwargs):
    dag_id = 'my_daily_dag'
    # Get the historical DagRuns of the daily DAG
    dag_runs = DagRun.find(dag_id=dag_id)

    # Sort DagRuns on descending order such that the first element
    # in the list, corresponds to the latest DagRun of the daily DAG
    dag_runs.sort(key=lambda x: x.execution_date, reverse=True)

    # If the daily DAG was not executed ever before, simply raise an
    # exception to skip.
    if not dag_runs:
        logging.info(f'No DAG runs found for {dag_id}. Skipping..')
        raise AirflowSkipException

    # Get the latest DagRun of the daily DAG
    latest_daily_dag_run = dag_runs[0]

    # Subtract one day from hourly DAG's current execution_date in order to
    # align with the daily DAG's schedule
    data_interval_start = kwargs['data_interval_start']
    data_interval_start_yesterday = data_interval_start.subtract(hours=24)

    # Check the intervals and the success of the daily DAG's DagRun. If conditions are not met,
    # DAG run should be skipped.
    if not (
        latest_daily_dag_run.state == DagRunState.SUCCESS
        and data_interval_start_yesterday.day == latest_daily_dag_run.execution_date.day
        and data_interval_start_yesterday.month == latest_daily_dag_run.execution_date.month
        and data_interval_start_yesterday.year == latest_daily_dag_run.execution_date.year
    ):
        logging.info(f'No successful DAG run was found for {dag_id} today. Skipping..')
        raise AirflowSkipException

    logging.info(f'Successful DAG run was found for {dag_id} today.')

Here’s the complete code for the my_hourly_dag DAG, using a PythonOperator to check the status of my_daily_dag:

import logging
from datetime import datetime, timedelta
from pathlib import Path

from airflow.exceptions import AirflowSkipException
from airflow.models import DAG, DagRun
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.state import DagRunState

def check_daily_dag_success_today(**kwargs):
    dag_id = 'my_daily_dag'
    # Get the historical DagRuns of the daily DAG
    dag_runs = DagRun.find(dag_id=dag_id)

    # Sort DagRuns on descending order such that the first element
    # in the list, corresponds to the latest DagRun of the daily DAG
    dag_runs.sort(key=lambda x: x.execution_date, reverse=True)

    # If the daily DAG was not executed ever before, simply raise an
    # exception to skip.
    if not dag_runs:
        logging.info(f'No DAG runs found for {dag_id}. Skipping..')
        raise AirflowSkipException

    # Get the latest DagRun of the daily DAG
    latest_daily_dag_run = dag_runs[0]

    # Subtract one day from hourly DAG's current execution_date in order to
    # align with the daily DAG's schedule
    data_interval_start = kwargs['data_interval_start']
    data_interval_start_yesterday = data_interval_start.subtract(hours=24)

    # Check the intervals and the success of the daily DAG's DagRun. If conditions are not met,
    # DAG run should be skipped.
    if not (
        latest_daily_dag_run.state == DagRunState.SUCCESS
        and data_interval_start_yesterday.day == latest_daily_dag_run.execution_date.day
        and data_interval_start_yesterday.month == latest_daily_dag_run.execution_date.month
        and data_interval_start_yesterday.year == latest_daily_dag_run.execution_date.year
    ):
        logging.info(f'No successful DAG run was found for {dag_id} today. Skipping..')
        raise AirflowSkipException

    logging.info(f'Successful DAG run was found for {dag_id} today.')

with DAG(
    catchup=False,
    dag_id='my_hourly_dag',
    start_date=datetime(2023, 7, 26),
    default_args={
        'owner': 'airflow',
        'retries': 1,
        'retry_delay': timedelta(minutes=2),
    },
    schedule_interval='0 6-20 * * *',  # At :00 every hour between 6AM-8PM
    max_active_runs=1,
) as dag:
   check_task = PythonOperator(
       task_id='check_daily_dag', 
       python_callable=check_daily_dag_success_today,
   )
   dummy_task = DummyOperator(task_id='dummy_task')

   check_task >> dummy_task

Final Thoughts..

In today’s tutorial we discussed how to handle dependencies between different DAGs when using Airflow. More specifically, we discussed how to run a DAG that is supposed to execute on an hourly basis, only if a different DAG, on a daily schedule, executes successfully within the day.

Three different approaches were demonstrated. Depending on the complexity of your use-case, you should pick the one that makes more sense and results in more elegant code.


Subscribe to Data Pipeline, a newsletter dedicated to Data Engineering

The post Running Airflow DAG Only If Another DAG Is Successful appeared first on Towards Data Science.

]]>
Transitioning from ETL to ELT https://towardsdatascience.com/from-etl-to-elt-908ce414e39e/ Wed, 06 Dec 2023 05:24:50 +0000 https://towardsdatascience.com/from-etl-to-elt-908ce414e39e/ ETL (Extract-Transform-Load) and ELT (Extract-Load-Transform) are two terms commonly used in the realm of Data Engineering and more specifically in the context of data ingestion and transformation. While these terms are often used interchangeably, they refer to slightly different concepts and have different implications for the design of a data pipeline. In this post, we […]

The post Transitioning from ETL to ELT appeared first on Towards Data Science.


ETL (Extract-Transform-Load) and ELT (Extract-Load-Transform) are two terms commonly used in the realm of Data Engineering and more specifically in the context of data ingestion and transformation.

While these terms are often used interchangeably, they refer to slightly different concepts and have different implications for the design of a data pipeline.

In this post, we will clarify the definitions of ETL and ELT processes, outline the differences between the two, and discuss the advantages and disadvantages each offers to engineers and data teams in general.

And most importantly, I am going to describe how recent changes in the formation of modern data teams have impacted the landscape of the ETL vs ELT debate.


Understanding Extract, Load and Transform independently

The key difference between ETL and ELT is, obviously, the sequence in which the Extract, Load and Transform steps are executed within a data pipeline.

For now, let’s ignore the execution sequence and focus on the terminology itself, discussing what each individual step is supposed to do.

Extract: This step refers to the process of pulling data from a persistent source. This data source could be a database, an API endpoint, a file or really anything that contains data in any form, whether structured or unstructured.

Transform: In this step, the pipeline is expected to perform some changes in the structure or format of the data in order to achieve a certain goal. A transformation could be an attribute selection, a modification of records (e.g. transform 'United Kingdom' into 'UK'), a data validation, a join with another source or really anything that changes the format of the input raw data.

Load: The load step refers to the process of copying the data (either the raw or the transformed version) into the target system. Usually, the target system is a Data Warehouse (i.e. an OLAP system used for analytics purposes) or an application database (i.e. an OLTP system).
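To make these definitions a little more concrete, here is a minimal, hypothetical sketch of the three steps in Python. The function names, record structure and country mapping are purely illustrative and not tied to any specific tool or data source.

def extract():
    # Pull raw records from a persistent source (a database, API endpoint, file, ...)
    # Hard-coded here purely for illustration
    return [
        {'id': 1, 'country': 'United Kingdom', 'amount': 100},
        {'id': 2, 'country': 'United States', 'amount': 250},
    ]

def transform(records):
    # Example transformation: map full country names to short codes
    country_map = {'United Kingdom': 'UK', 'United States': 'US'}
    return [
        {**record, 'country': country_map.get(record['country'], record['country'])}
        for record in records
    ]

def load(records, target):
    # Copy the records (raw or transformed) into the target system;
    # a plain Python list stands in for the destination here
    target.extend(records)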

Unavoidably, the sequence in which we execute these three steps matters. And with the increasing volumes of data that need to be handled, the order of execution matters a lot. Let’s discuss why!


Extract Transform Load (ETL)

ETL stands for Extract-Transform-Load and the term itself refers to a process where the data extraction step is followed by the transformation step and ends with the load step.

Extract > Transform > Load – Source: Author

The data transformation step in ETL processes occurs in a staging environment outside of the target system, where the data is transformed just before it gets loaded to the destination.
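To illustrate the ordering (and nothing more), here is a hypothetical ETL sketch in Python, where an in-memory SQLite database stands in for the target Data Warehouse and all table, column and variable names are made up:

import sqlite3

def run_etl(conn):
    # Extract: pull the raw data from the source
    raw = [('United Kingdom', 100), ('United States', 250)]

    # Transform: performed outside the warehouse, in the staging environment
    country_map = {'United Kingdom': 'UK', 'United States': 'US'}
    transformed = [(country_map.get(country, country), amount) for country, amount in raw]

    # Load: only the transformed data ever reaches the target system
    conn.execute('CREATE TABLE orders (country TEXT, amount INTEGER)')
    conn.executemany('INSERT INTO orders VALUES (?, ?)', transformed)
    conn.commit()

run_etl(sqlite3.connect(':memory:'))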


Extract Load Transform (ELT)

On the other hand, ELT, which stands for Extract-Load-Transform, refers to a process where the extraction step is followed by the load step and the final data transformation step happens at the very end.

Extract > Load > Transform – Source: Author

In contrast to ETL, in ELT no staging environment/server is required since data transformation is performed within the destination system, which is usually a Data Warehouse or Data Lake hosted on the Cloud.
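For contrast, here is an equally simplified and hypothetical ELT sketch, again using an in-memory SQLite database as a stand-in for the cloud warehouse. Note that the transformation is now expressed as SQL executed by the "warehouse" itself, and the raw table remains available afterwards:

import sqlite3

def run_elt(conn):
    # Extract: pull the raw data from the source
    raw = [('United Kingdom', 100), ('United States', 250)]

    # Load: the raw data lands in the warehouse untouched and stays there
    conn.execute('CREATE TABLE raw_orders (country TEXT, amount INTEGER)')
    conn.executemany('INSERT INTO raw_orders VALUES (?, ?)', raw)

    # Transform: executed inside the warehouse, on top of the raw table
    conn.execute('''
        CREATE TABLE orders AS
        SELECT
            CASE country
                WHEN 'United Kingdom' THEN 'UK'
                WHEN 'United States' THEN 'US'
                ELSE country
            END AS country,
            amount
        FROM raw_orders
    ''')
    conn.commit()

run_elt(sqlite3.connect(':memory:'))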


How to choose between ETL and ELT

ETL and ELT come with their pros and cons, and the chances are you will come across both in your day-to-day work given that they are typically used for different use-cases.

ETL is best suited to use cases where data resides on-premises and needs to be structured before being loaded into the target database or warehouse. The ETL process is therefore usually preferred when smaller volumes of data are involved and/or when complex transformations are required.

Furthermore, since ETL transforms the data before the load step, sensitive data can be masked, encrypted or completely removed before it is loaded. This aspect of ETL helps companies and teams enforce compliance with various regulations (e.g. GDPR) more easily.
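As a minimal, hypothetical example of what such a transformation might look like (the record and field names are made up), a sensitive column can be hashed before it is ever loaded:

import hashlib

def mask_email(email):
    # One-way hash: the value remains joinable and countable without being readable
    return hashlib.sha256(email.lower().encode('utf-8')).hexdigest()

record = {'user_id': 42, 'email': 'jane.doe@example.com', 'plan': 'pro'}
masked = {**record, 'email': mask_email(record['email'])}
# Only 'masked' gets loaded into the target system; the raw email never leaves the staging area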

Since the transformation happens on an intermediate (staging) server, there is an overhead for moving the transformed data into the target system. In addition, the target system won’t contain the raw data (i.e. the data in the form it had prior to the transformation). This means that whenever additional transformations are required, we would have to pull the raw data once again.

On the other hand, ELT provides more flexibility than ETL, given that the latter was historically intended for structured (relational) data. Modern cloud architectures and warehouses have enabled ELT for both structured and unstructured data.

As mentioned earlier, ETL should be used for small volumes of data. ELT offers faster transformation, since it is less dependent on data size and is usually performed on an as-needed basis.

In addition, because the data is loaded before it is transformed in an ELT process, users and systems retain access to the raw data. If more transformations are required at a later stage, the raw data is already in the Data Warehouse and can be accessed at any time. The only downside is the additional storage required for this raw data, but given constantly decreasing storage costs, this is rarely a major issue.

Now that we have a good understanding of the technical implications of both ETL and ELT processes, let me pose a question: when it comes to choosing one over the other, is it just a matter of technical implementation?

It’s not just about when to perform a Transformation

The data realm has been constantly evolving, and data roles are no exception to this fast-paced environment. ETL vs ELT is not just about where the transformation step occurs – it is (also) about who is supposed to perform it.

A transformation step usually involves some sort of business logic. Traditional ETL processes used to be executed by Data Warehouse Engineers (not sure if this is still a thing, to be honest), which means these people were in charge of crafting the business logic, too.

On the other hand, ELT processes evolved due to the nature and landscape of modern data teams and their formation. The EL (Extract-Load) steps are usually performed by Data Engineers, whereas the Transform step is executed by the so-called Analytics Engineers.

And it makes so much sense to me. A Data Engineer is a purely technical person concerned with efficiency, scalability, readiness and availability (among a million other things). An Analytics Engineer, on the other hand, is still a technical person but with a much better business understanding. It therefore makes more sense for the Analytics Engineer to be responsible for transforming the data, given that transformation (usually) corresponds to business value.

Modern cloud architectures, data stacks (including cloud-based OLAP systems) and team formations have made ELT processes more relevant and effective. From my personal experience, I would say there is an ongoing shift from ETL to ELT, even though ETL is still relevant and useful.


Modern data stacks and teams favour ELT processes

While ETL is not dead, in my opinion, modern data stacks and technologies favour ELT processes. As an example, let’s consider dbt (data build tool), which is one of the hottest additions to the data realm and has become the de facto transformation tool for analysts and engineers.

Usually, we want to bring the raw data (i.e. without applying any sort of transformation) from external or internal data sources into the Data Warehouse. Then, on top of these data models (in dbt we usually call them staging models), we build additional models (intermediate and mart models) that are the result of transformation processes occurring within the Data Warehouse.

In such workflows, it therefore makes more sense to load the data into the Warehouse before transforming it. This also enables access to the raw data at any given time, so that future use-cases can be supported.

If you are interested in gaining a deeper understanding of how dbt works and how its different components come together to transform raw data and build meaningful data models that support decision-making, I would recommend the following article.

Final Thoughts

Designing data pipelines is a challenging task, and when doing so, many factors need to be carefully taken into account. When it comes to ingesting data from data sources into a Data Warehouse, there are typically two approaches you can take.

In this article we discussed how ETL and ELT perform a sequence of steps in order to transform and load (or load and transform) data into a destination system.

Depending on the landscape of the organisation and the specific use-case, you may want to choose one over the other. I hope this tutorial has provided all the details you need to choose the most effective approach when it comes to ingesting and transforming data.

The post Transitioning from ETL to ELT appeared first on Towards Data Science.

An Introduction To Analytics Engineering https://towardsdatascience.com/analytics-engineering-8b0ed0883379/ Sun, 22 Oct 2023 16:14:23 +0000 https://towardsdatascience.com/analytics-engineering-8b0ed0883379/ Who is an Analytics Engineer and what are they supposed to do

The post An Introduction To Analytics Engineering appeared first on Towards Data Science.

Image generated via DALL-E2

Traditionally, data teams consisted of Data Engineers and Data Analysts.

The Data Engineers are responsible for building up the infrastructure to support data operations. These would include the configuration of databases and the implementation of ETL processes that are used to ingest data from external sources into a destination system (perhaps another database). Furthermore, Data Engineers are typically in charge of ensuring data integrity, freshness and security so that Analysts can then query the data. A typical skillset for a Data Engineer includes Python (or Java), SQL, orchestration (using tools such as Apache Airflow) and data modeling.

On the other hand, Data Analysts are supposed to build dashboards and reports using Excel or SQL in order to provide business insights to internal users and departments.

Traditional formation of Data Teams

Transitioning From ETL to ELT

In order to process data and gain valuable insights we first need to extract it, right? 🤯

Data Ingestion is performed using ETL (and more recently ELT) processes. Both the ETL and ELT paradigms involve three main steps: Extract, Transform and Load. For now, let’s ignore the sequence in which these steps are executed and focus on what each step does independently.

Extract

This step refers to the process of pulling data from a persistent source. This data source could be a database, an API endpoint, a file or a message queue.

Extract step pulls data from various sources – Source: Author

Transform

In the Transform step, the pipeline is expected to perform some changes in the structure and/or format of the data in order to achieve a certain goal. A transformation could be a modification (e.g. mapping "United States" to "US"), an attribute selection, a numerical calculation or a join.

The transformation step performs a number of transformations on the input raw data – Source: Author

Load

This step refers to the process of moving data (either raw or transformed) into a destination system. The target is usually an OLTP system, such as a database, or an OLAP system, such as a Data Warehouse.

Loading data into a destination system – Source: Author

ETL: Extract → Transform → Load

ETL refers to the process where the data extraction step is followed by the transformation step and ends with the load step.

A visual representation of an ETL process – Source: Author

The data transformation step in ETL processes occurs in a staging environment outside of the target system, where the data is transformed just before it gets loaded to the destination.

ETL has been around for a while, but its use has slowly started to fade, for a couple of reasons:

  1. Since the transformation happens on an intermediate (staging) server, there is an overhead for moving the transformed data into the target system.
  2. The target system won’t contain the raw data (i.e. the data in the format prior to the transformation). This means that whenever additional transformations are required, we would have to pull the raw data once again.

The emergence of Cloud technologies has shifted the process of ingesting and transforming data. Data Warehouses hosted on the cloud have made it possible to store huge volumes of data at a very low cost. Therefore, is there really a need to apply transformations "on the fly" while discarding the raw data every time a transformation is performed?

ELT: Extract → Load → Transform

ELT refers to a process where the extraction step is followed by the load step and the final data transformation step happens at the very end.

A visual representation of an ELT process – Source: Author

In contrast to ETL, in ELT no staging environment/server is required since data transformation is performed within the destination system, which is usually a Data Warehouse or Data Lake hosted on the Cloud.

In addition, the raw data exists in the destination system and is thus available for further transformations at any time.


Analytics Engineering

As a reminder, in older data team formations, engineers were in charge of maintaining the ETL layer while analysts were responsible for the creation of dashboards and reporting. But the question now is: where do Analytics Engineers fit into the picture?

In older data team formations, Data Engineers were responsible for ETL and Data Analysts for reporting – Source: Author

Analytics Engineers are essentially the link between Data Engineers and Analysts. Their responsibility is to take the raw data and apply transformations, so that Data Analysts can then use the transformed data to prepare dashboards and reports on the Business Intelligence layer, enabling internal users to make data-informed decisions. Data Engineers, in turn, can focus more on the ingestion layer and the wider data infrastructure of the data platform.

In ELT pipelines, Data Engineers are responsible for Extraction and Load of data in a Data Warehouse, Analytics Engineers for the data transformation layer and Analysts for the creation of business dashboards – Source: Author

dbt: The ultimate tool for Analytics Engineering

Analytics Engineers are people who can help data teams scale and move faster. But to do so, they also need to take advantage of tools that help them get the job done. And the ultimate Analytics Engineering tool is data build tool (dbt).

dbt is a tool used to build and manage data models in a scalable and cost-effective fashion. Instead of taking the time to figure out all the inter-dependencies between models in order to decide the sequence in which they must be executed, dbt does all the dirty work for you. Furthermore, it provides functionality to support data quality tests, freshness tests and documentation, among other features.

In order to better understand what dbt does, it’s important to visualise the wider context and see where it fits within the modern data stack. dbt actually sits on the T layer of an ELT pipeline, and transformations are performed within the Data Warehouse where the raw data resides.

Using dbt to perform transformations over raw data within the Data Warehouse – Source: Author

dbt is a CLI (Command Line Interface) tool that enables Analytics Engineering teams to deploy and manage data models following software engineering best practices. These include support for multiple environments (development and production), version control and CI/CD (Continuous Integration and Continuous Deployment). Data models can be written in SQL (Jinja-templated), but more recent versions of the tool also support model definitions in Python!
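As a rough, hypothetical sketch of what a Python model can look like (the model, ref and column names below are made up), note that Python models are only supported on certain adapters such as Snowflake, Databricks and BigQuery, and the exact DataFrame API of the object returned by dbt.ref() depends on the adapter:

# models/marts/fct_completed_orders.py

def model(dbt, session):
    # Tell dbt how to materialise this model in the warehouse
    dbt.config(materialized='table')

    # dbt.ref() resolves the dependency on an upstream (staging) model and returns
    # a DataFrame whose exact type depends on the adapter (Snowpark, PySpark, ...)
    orders = dbt.ref('stg_orders')

    # Apply the transformation logic in Python; the filter below assumes the
    # adapter's DataFrame accepts a SQL expression string
    return orders.filter("status = 'completed'")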


Final Thoughts..

Analytics Engineering is an emerging field at the intersection of Data Engineering and Data Analytics that aims to speed up the development of analytics products, improve data quality and increase trust in data. The main tool facilitating the lifecycle of data products is dbt, which has drastically changed the way data teams work and collaborate. It is therefore important to familiarise yourself with it, since it’s here to stay for the long run.

In upcoming articles we are going to focus more on dbt and how you can use it to build and manage your data models effectively. So make sure to subscribe in order to be notified when the articles are out!

The post An Introduction To Analytics Engineering appeared first on Towards Data Science.
