Data Pipeline Orchestration using Argo Workflows

Chris Schnabl
willhaben Tech Blog
6 min read · Jul 9, 2021


Photo by Mike Benna on Unsplash

Every day, we need to load fresh data from our Data Lake into our Data Warehouse. Different data sets exist within the Data Lake. To guarantee fresh and correct Dashboards, we need a mechanism to load, transform and quality check the data. In this blog post, we tackle the load and transform part of that work. Quality checks are accomplished by both tests and metrics and will be the content of another post.

Here’s a quick overview of our current setup:

  • S3 as our Data Lake
  • Snowflake as our Cloud Data Warehouse
  • dbt to model our transformations following the Data Vault 2.0 approach in SQL
  • Argo Workflows to orchestrate all transformations throughout the whole ELT pipeline
  • Tableau for Data Visualization (Dashboards, …)

But what do we want to achieve? What is pipeline orchestration and why do we need it?

So, we have a bunch of different data sets inside the Data Lake. We then want to load all the new entries from those data sets, apply transformations (e.g., turning dictionaries into tables, normalising dates and timestamps into one uniform date format, converting metadata into flags that indicate the deletion of a row, and so on) and merge them into our existing tables inside the DWH.

All data sets reside inside our Data Lake (parquet files in S3) and hence function as our single input source. During this whole process, multiple atomic tasks have to be executed one after another. Most of these could actually run in parallel, while certain tasks depend on multiple previous tasks having finished beforehand.

In our very first setup, all this dependent execution was accomplished by having multiple cron jobs with enough time padded in so they wouldn’t clash with each other, but more about that later. This calls for advanced pipeline orchestration!

Why Argo

At the beginning of our modernisation journey, we did not use any orchestration tool at all. Some tasks were configured inside Snowflake, while others ran as k8s pods with their logs published to Kibana. We didn’t have great visibility into each step either.

For dependent tasks to be executed after each other, we had to allocate enough time between steps when configuring the cron jobs. Picture two tasks: task A and task B, with task A typically running for one hour and task B depending on A. We would then naively schedule A at, say, 2:00 am, followed by triggering task B at 4:00 am.

We also had no way of notifying tasks about each other’s individual status. The success or failure of the whole pipeline itself had to be queried in Kibana.
This worked well enough for what was then a much smaller project. But even back then, we knew we had to actually “orchestrate” all pipeline steps/tasks using some sort of orchestration tool to not shoot ourselves in the foot in the long run.

While researching industry best practices, one ultimately comes across the two biggest frameworks used for pipeline automation: Luigi and Airflow. I am not going to delve into detail about those two, but would rather focus on why Argo, which was still virtually unknown just over a year ago, made the most sense for us.

Argo consists of multiple sub-projects built around Kubernetes (k8s). When I refer to Argo in this article, I mean Argo Workflows unless stated otherwise.

This is not a tutorial about Argo, but the Argo folks have a wonderful one, as well as a collection of useful boilerplate examples, over at their website.

So why Argo then?

In a nutshell, Argo Workflows allows you to define tasks as parameterised steps that can depend on each other, every step serving as its own container running inside a k8s pod.
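
To make this a bit more concrete, here is a minimal sketch of such a Workflow. The names, parameters and image are made up purely for illustration and simply mimic two dependent tasks:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: two-step-example-        # hypothetical name, for illustration only
spec:
  entrypoint: main
  templates:
    - name: main
      dag:
        tasks:
          - name: task-a
            template: run-step
            arguments:
              parameters:
                - name: message
                  value: "running task A"
          - name: task-b
            dependencies: [task-a]        # B only starts once A has succeeded
            template: run-step
            arguments:
              parameters:
                - name: message
                  value: "running task B"
    - name: run-step                      # parameterised step, reused by both tasks
      inputs:
        parameters:
          - name: message
      container:
        image: alpine:3.18                # every step runs as its own container in a k8s pod
        command: [sh, -c]
        args: ["echo {{inputs.parameters.message}}"]

Each task references a template, and Argo schedules a pod per task as soon as its dependencies have completed.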

  • We already had a k8s cluster up and running and ultimately did NOT want to manage yet another infrastructure.
  • Since Argo can run everything that runs inside a Docker container, we were able to install the Snowflake CLI in a Docker image with minimal effort.
  • We did not want to be confined to any single programming language.
  • One of our parent companies already made heavy use of Argo (the whole project) for lots of their workflows, which meant we had convenient access to plenty of documentation, knowledge and, most of all, infrastructure.

Architecture

As mentioned above, we wanted to orchestrate a few tasks with dependencies among them; a sketch of how these steps map onto an Argo DAG follows the list below.

Pipeline Overview

  1. Send start message: We are notified of the successful pipeline start via Slack.
  2. Build all Stages:
    2.1. Copy new data from AWS S3 into Snowflake
    2.2. Execute dbt run on all stages
    2.3. Execute dbt test on all stages (if enabled)
    2.4. Clean Sources
  3. Build Core. Build all schemas in parallel:
    3.1. Execute dbt run to build the schema
    3.2. Execute dbt test on this schema (if enabled)
  4. Cleanup depends only on Step 3; hence, it is executed concurrently with Step 5.
    4.1. Cleanup scripts delete already processed data from the stages to keep them empty until the next copy into fills them with fresh data again.
  5. Build Marts and refresh Tableau. For each configured mart (in a list):
    5.1. Execute dbt run to build all marts
    5.2. Execute dbt test (if enabled)
    5.3. Update all Tableau Dashboards by refreshing the data sources that depend on the mart.
  6. Exit Handler is called at the end of the pipeline
    6.1. Write Audit Log to be able to measure our table availability
    6.2. Send Success/Error message to Slack Hook
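
Putting these steps together, a heavily simplified sketch of the resulting DAG could look like the following. Template names and images are placeholders (a single generic echo step stands in for our real dbt, snowsql and Slack images), so this illustrates the dependency structure, not our actual definition:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: elt-pipeline-             # placeholder name
spec:
  entrypoint: elt
  onExit: notify                          # step 6: exit handler runs on success and failure
  templates:
    - name: elt
      dag:
        tasks:
          - name: send-start-message      # step 1
            template: step
            arguments: {parameters: [{name: what, value: start-message}]}
          - name: build-stages            # step 2: copy into, dbt run/test, clean sources
            dependencies: [send-start-message]
            template: step
            arguments: {parameters: [{name: what, value: stages}]}
          - name: build-core              # step 3: dbt run/test per schema
            dependencies: [build-stages]
            template: step
            arguments: {parameters: [{name: what, value: core}]}
          - name: cleanup                 # step 4: depends only on core, runs alongside the marts
            dependencies: [build-core]
            template: step
            arguments: {parameters: [{name: what, value: cleanup}]}
          - name: build-marts             # step 5: dbt run/test per mart + Tableau refresh
            dependencies: [build-core]
            template: step
            arguments: {parameters: [{name: what, value: marts}]}
    - name: step                          # stand-in for the real images
      inputs:
        parameters:
          - name: what
      container:
        image: alpine:3.18
        command: [sh, -c]
        args: ["echo building {{inputs.parameters.what}}"]
    - name: notify                        # stand-in for the Slack/audit-log exit handler
      container:
        image: alpine:3.18
        command: [sh, -c]
        args: ["echo pipeline finished with status {{workflow.status}}"]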

Artifacts

Artifacts are nothing more than pre-built Docker images (built from Dockerfiles) that we push to the Artifactory during the deployment of our pipeline, so that we can use them during pipeline runs. I will list the most important images below; a sketch of how such an image is wired into a step follows the list:

  • Snowflake client: includes the official snowsql CLI running on Ubuntu
  • dbt client: runs dbt in a Python environment with the given dbt parameters
  • Send Slack messages: sends a provided message to the provided Slack hook
  • Update Tableau: calls Tableau’s refresh data source endpoint for the provided data source
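
As an example of how such an image is used, here is a hypothetical WorkflowTemplate wrapping the dbt image. The registry path, template name and parameter are invented for illustration and are not our actual artifacts:

apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: dbt-run                           # hypothetical template name
spec:
  entrypoint: dbt-run
  templates:
    - name: dbt-run
      inputs:
        parameters:
          - name: dbt-args                # e.g. "run" or "test"
      container:
        image: registry.example.com/analytics/dbt-client:latest   # invented artifact path
        command: [sh, -c]
        args: ["dbt {{inputs.parameters.dbt-args}}"]

Any Workflow can then reference this template and pass different dbt arguments per step.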

Argo Building Blocks

I am now going to provide an overview of how we use certain Argo Workflows features and why we think they are useful; a small sketch combining several of them follows the list:

  • Cron Workflows: trigger Workflows based on a schedule, as the name suggests.
  • Workflow Templates: Most of our Argo Workflow files are organised as templates for abstraction and reusability. We use them both to run individually and to reuse in other Workflows.
  • Workflows: We run our Workflows by either instantiating WorkflowTemplates (for ad hoc Workflows) or manually submitting CronWorkflows.
  • Exit Handlers: An Exit Handler is used to send a success/failure message to our Slack alerting channel.
  • Loops and Conditionals: We heavily utilise both loops and conditionals to loop through a list of stages/marts or skip certain steps.
  • Output parameters: To pass data between steps, e.g., the start date for the copy into steps.
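
Here is a rough sketch of how a few of these building blocks (a CronWorkflow, an exit handler, a loop and a conditional) fit together. The schedule, names, stage list and parameter are placeholders, not our real configuration:

apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
  name: nightly-elt                        # placeholder name
spec:
  schedule: "0 2 * * *"                    # Cron Workflow: run every night at 02:00
  workflowSpec:
    entrypoint: build-stages
    onExit: notify                         # Exit Handler: Slack success/failure message
    arguments:
      parameters:
        - name: run-tests                  # toggles the conditional test step
          value: "true"
    templates:
      - name: build-stages
        steps:
          - - name: build-stage            # Loop: one parallel branch per stage
              template: echo-step
              arguments:
                parameters: [{name: msg, value: "dbt run {{item}}"}]
              withItems: [stage_a, stage_b] # placeholder stage names
          - - name: test-stages            # Conditional: only runs if tests are enabled
              template: echo-step
              arguments:
                parameters: [{name: msg, value: "dbt test"}]
              when: "{{workflow.parameters.run-tests}} == true"
      - name: echo-step                    # stand-in for our dbt/snowsql images
        inputs:
          parameters: [{name: msg}]
        container:
          image: alpine:3.18
          command: [sh, -c]
          args: ["echo {{inputs.parameters.msg}}"]
      - name: notify                       # stand-in for the Slack image
        container:
          image: alpine:3.18
          command: [sh, -c]
          args: ["echo status: {{workflow.status}}"]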

Deployment

By now, we have a decent understanding of all the moving parts, the different steps and their responsibilities, and how they are all glued together in a bunch of yaml files.
You have also heard me throw around the Kubernetes buzzword multiple times, so you might wonder how and where this pile of yaml is deployed. You may also have thought about how all of this is monitored, how we deal with logs and how we execute tests.
While I am not going to go into too much detail on each of these, they are remarkably interesting topics in their own right and we are going to cover them in future articles. So, stay tuned.

We have a bunch of yaml, docker and sql files plus infrastructure to execute them on.

In a nutshell, we follow the GitOps pattern: whenever we make changes, the updated configurations are deployed to Argo Workflows through Argo CD. There are additional build pipelines for all the artifacts.
The current version of the pipeline is then triggered either manually or by a cron schedule.
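
We are not showing our actual configuration here, but assuming an Argo CD-style GitOps setup, the deployment of the workflow manifests could be described by an Application along these lines (repository URL, paths and names are all invented):

apiVersion: argoproj.io/v1alpha1
kind: Application
metadata:
  name: data-pipeline-workflows            # hypothetical application name
  namespace: argocd
spec:
  project: default
  source:
    repoURL: https://git.example.com/analytics/pipeline-config.git   # invented repository
    targetRevision: main
    path: workflows                        # folder containing the CronWorkflows/WorkflowTemplates
  destination:
    server: https://kubernetes.default.svc
    namespace: data-pipeline               # invented namespace
  syncPolicy:
    automated:
      prune: true                          # keep the cluster in sync with what is in git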

Outlook

We are actively improving parts of our pipeline. This includes building better tooling to execute certain parts of the pipeline independently, automatically triggering a full reload (which copies all data in the Data Lake to Snowflake and rebuilds all Models from that data), as well as automatic verifications.

Reusable Workflow Templates allow us to build more and more operational tooling.

Our pipeline initially ran for three to four hours on a Snowflake S Warehouse. After configuring dbt to run with more parallelism and switching to a bigger, more powerful Warehouse, the run time is now down to just over one hour while costing the exact same amount of Snowflake credits.
Orchestrating the pipeline, together with working monitoring and availability metrics, has helped us improve both the overall stability of loading data and the freshness of the data in our Dashboards.

As mentioned previously, we will release more information about various aspects of our Data setup soon. You can also check out our first article on how we modernised our Data Analytics Infrastructure.
