Motivation

At The Lifetime Value Company, we ingest multiple sources of public record data to give our users accurate results for the searches they perform. Data processing is essential for our business, and we are constantly looking for ways to improve it.

Here we’ll share our experience trying some new technology for automating our data processes.

Our current official tool for automating processes is an Apache project: Apache NiFi. But we were looking for a replacement because it isn’t managed, it takes time from our engineers every time it fails, and it is hard to track changes made to the flows. We had a process that was run manually via shell commands and needed to be automated, so we decided to give Apache Airflow a try.

Problem Chosen: Decompress Images

We extracted a slice from a larger process to be automated using Apache Airflow for the first time. The task that we wanted to automate was to read multiple zip-compressed files from a cloud location and write them uncompressed to another cloud location.

Anticipated Hurdles:

  • Some files need to be renamed after extraction.
  • The full list of files is unknown: we have received files whose names don’t match any valid name but still contain valid images.
  • The zip format requires reading the full file before it can be decompressed, so there needs to be some place (memory or disk) to temporarily store the data (see the sketch below).
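
The last hurdle is easiest to see in code. Below is a minimal Python sketch of the idea (our actual implementation, described later, is a Go tool; the bucket and path names here are placeholders): the archive is fully staged in a temporary file before its entries can be extracted and re-uploaded.

# Minimal sketch only: the production tool is written in Go, and all names are placeholders.
import os
import tempfile
import zipfile

from google.cloud import storage


def decompress(src_bucket, src_path, dst_bucket, dst_prefix):
    client = storage.Client()

    with tempfile.NamedTemporaryFile(suffix=".zip") as tmp:
        # Buffer the complete archive on disk first; zip needs random access to its
        # central directory before any entry can be read.
        client.bucket(src_bucket).blob(src_path).download_to_file(tmp)
        tmp.flush()

        with tempfile.TemporaryDirectory() as outdir, zipfile.ZipFile(tmp.name) as archive:
            archive.extractall(outdir)
            for name in archive.namelist():
                if name.endswith("/"):  # skip directory entries
                    continue
                client.bucket(dst_bucket).blob(dst_prefix + name) \
                      .upload_from_filename(os.path.join(outdir, name))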

Airflow

"Airflow is a platform to programmatically author, schedule and monitor workflows."

Some Definitions

To start understanding how Airflow works, let’s check out some basic concepts; a minimal example follows the list:

  • DAG (Directed Acyclic Graph): a workflow that glues together all the tasks and their inter-dependencies.
  • Operator: a template for a specific type of work to be executed. For example, BashOperator represents how to execute a bash script, while PythonOperator represents how to execute a Python function, etc.
  • Sensor: a special type of operator that waits until a certain condition is met before succeeding.
  • Task: a parameterized instance of an operator/sensor that represents a unit of actual work to be executed.
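
To make these concepts concrete, here is a minimal sketch of a DAG with two tasks and one dependency (the IDs, command, and schedule are placeholders):

# A minimal DAG: two tasks, each an instance of an operator, with one dependency.
import datetime

from airflow import models
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator


def greet():
    print("hello from a PythonOperator task")


with models.DAG(
        dag_id='concepts_example',
        schedule_interval=datetime.timedelta(days=1),
        start_date=datetime.datetime(2019, 1, 1)) as dag:

    # Task: an instance of BashOperator.
    say_date = BashOperator(task_id='say_date', bash_command='date')

    # Task: an instance of PythonOperator.
    say_hello = PythonOperator(task_id='say_hello', python_callable=greet)

    # Dependency: say_hello only runs after say_date succeeds.
    say_date >> say_hello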

Why Airflow?

Mainly chosen because:

  • DAGs are written as code, so they can be submitted to source control.
  • Easy to scale.
  • Lots of operators, native and community contributed.
  • Extensible: It is easy to write custom operators.
  • Rich UI.
  • Open Source, so we avoid lock-in.
  • Good feedback from people already using it.

But also for one more decisive reason:

  • Google Cloud offers a managed version of it: Google Cloud Composer.

Google Cloud Composer

"Cloud Composer is a fully managed workflow orchestration service that empowers you to author, schedule, and monitor pipelines that span across clouds and on-premises data centers. Built on the popular Apache Airflow open source project and operated using the Python programming language, Cloud Composer is free from lock-in and easy to use."

We looked for a managed service and were already using GCP in other projects, so Cloud Composer was the perfect fit.

Setting up Airflow through Google Cloud Composer is much simpler than installing it natively or running it via Docker, which gave us permission and volume headaches.

Create the Composer Environment

We followed Google’s guide. The one step that requires care is choosing a service account that has the composer.worker role; if you don’t, it takes approximately 45 minutes for the environment creation to fail because of this simple requirement.

What to Run How and Where

With such an extensible platform, there is more than one way to skin a cat. Aiming for the least amount of effort, we initially came up with this checklist for deciding how to accomplish each task:

  1. Does a native operator exist?
  2. Does a community operator exist?
  3. Use a custom operator:
    1. Bash
    2. Python
    3. DB
    4. Containerized (whichever language)

Decompress: First Attempt

As we went through the above list, we found the Dataflow operator. Cloud Dataflow is Google’s managed Apache Beam service, and while investigating we discovered that it had a transform for decompressing zip files, so we decided to go down that path. We installed Beam on a cloud instance and went through the quick start.

We initially tried the native Dataflow operator’s zip transform on our files. However, it did not work for us: given an archive containing multiple files, it only decompressed the first one.

After reading the blog post We’re All Using Airflow Wrong and How to Fix It, we agreed that the approach it describes, running each task in its own container through the KubernetesPodOperator, was the best way for us to use Airflow, with these advantages:

  • Runs on Kubernetes which we are familiar with.
  • Separate environments: A pool of nodes run Airflow and a separate pool runs the tasks themselves.
  • Clear division between Airflow (orchestrator) logic and implementation logic.
  • Any programming language. In our case the preferred one is Go because of:
    • Expertise.
    • Efficiency.
    • Lightweight containers.
  • Uniform way of managing secrets.
  • Avoids lock-in, not only from Google Cloud but from Airflow: task implementations can be ported to any new orchestrator that can launch Docker containers.
  • Task implementation benefits:
    • Easily developed and tested in isolation.
    • Better control of what they do and how they work.
    • In-house knowledge of implementation.

But we disagree with the article’s tl;dr: “only use Kubernetes Operators.” We still think that very simple tasks should be handled with the provided operators to avoid the extra effort.
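
For instance, a listing or cleanup step is a one-liner with a stock BashOperator and doesn’t justify building and maintaining a container. A sketch (with a placeholder bucket) might look like this:

# A very simple task that a provided operator handles fine, no container needed.
from airflow.operators.bash_operator import BashOperator

list_input = BashOperator(
    task_id='list-input-files',
    bash_command='gsutil ls gs://<bucket>/incoming/',  # placeholder bucket
    dag=dag)  # assumes a DAG object defined as usual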

Writing a Go Tool

We could have written a single main function and moved forward, but we wanted to establish some good practices to follow when writing these tools in the future:

  • It is a CLI tool, but the main logic sits in a core library, so it stays open to other interfaces, such as an HTTP server or a Lambda function.
  • Import the minimum number of external libraries.
  • Use Go Modules for the first time (our previous Go projects use Glide).
  • Core public methods receive and return only native or stdlib types.
  • Wrap external libraries in abstractions so that their public methods also receive and return only native or stdlib types.
  • Wrapped external libraries can then be swapped for other implementations (Google Cloud Storage, AWS S3, etc.), as sketched below.
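
The idea behind the last two points, sketched here in Python for brevity (the real tool is written in Go, and these names are purely illustrative), is that public methods deal only in native/stdlib types while the cloud client hides behind a small interface that an S3-backed implementation could replace:

# Illustrative Python sketch of the wrapping idea; the real tool is in Go.
from abc import ABC, abstractmethod

from google.cloud import storage


class BlobStore(ABC):
    """Storage abstraction: public methods only use native/stdlib types."""

    @abstractmethod
    def read(self, path: str) -> bytes: ...

    @abstractmethod
    def write(self, path: str, data: bytes) -> None: ...


class GCSBlobStore(BlobStore):
    """Google Cloud Storage implementation; an S3-backed version could replace it."""

    def __init__(self, bucket: str):
        self._bucket = storage.Client().bucket(bucket)

    def read(self, path: str) -> bytes:
        return self._bucket.blob(path).download_as_string()  # returns bytes

    def write(self, path: str, data: bytes) -> None:
        self._bucket.blob(path).upload_from_string(data)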

Dockerizing the Go Tool

For dockerizing Go apps it is common to use the Golang image, and it works perfectly well. However, since Go compiles to a binary, the final image does not need to have Go installed in it.

We actually want our docker images to be as lightweight as possible, since we want tools to do exactly one thing and do it right.

To create our Docker image we used a multi-stage build, which turned out to be a simple way to meet these requirements. The article Multi-stage Docker Builds in Golang with Go Modules showed us it was exactly what we needed.

The Dockerfile defines two stages, which together accomplish the goal:

  1. First Stage: Instead of using the large Golang image, we use a lightweight Alpine-based Go image, which has everything required to build the app. Since Go compiles to a binary, the result just needs an operating system to run; no language runtime or extra libraries are required.
  2. Second Stage: Now that the binary is built, all we have to do is copy the contents of the working directory from the first stage and define an entry point that executes our binary. For this stage we chose an Alpine image because it is an extremely small and optimized Linux distribution, so we have to make sure no other external libraries or files are needed, or we would have to add them first.

Our Docker image shrank as expected, and we ended up with an image that is not only small but also clean.

Using Kubernetes Pod Operator from Google Cloud Composer

Add a Kubernetes Pool

Google Cloud Composer creates a Kubernetes cluster with one pool of three nodes. When you run tasks using the KubernetesPodOperator, it is recommended to run them in a separate pool.

In our case, we knew that the largest file consumes 6GB of memory when decompressing, so we needed to make sure the pool had at least one node containing more than that amount of memory available.

Upload the Image

To upload our dockerized tool, we need to add the image and push it to the project’s Container Registry. There is a simple quickstart to do so.

Upload the Service Account Key

Uploading the service account key can only be done directly in Kubernetes using the CLI:

# Find cluster name
$ gcloud composer environments describe <environment_name> --location <location>

# Get credential to connect to cluster
$ gcloud container clusters get-credentials <cluster_name>
# Upload key into cluster
$ kubectl create secret generic gc-storage-rw-key --from-file=key.json=<path_to_key>

Using the Service Account Key

In the KubernetesPodOperator instantiation:

# Imports for the contrib modules used below (Airflow 1.x)
from airflow.contrib.kubernetes import secret
from airflow.contrib.operators import kubernetes_pod_operator

# First define a secret from a file
secret_file = secret.Secret(
    deploy_type='volume',
    deploy_target='/tmp/secrets/google',
    secret='gc-storage-rw-key',
    key='key.json')

# Then use it when creating the task
unzip = kubernetes_pod_operator.KubernetesPodOperator(
    task_id='unzip',
    secrets=[secret_file],
    ...

Running the First DAG

Decompressing a Small Test File

  • With the setup for the Kubernetes Pod Operator ready, it just takes a task definition to run our dockerized Go tool with its basic configuration.
  • One important thing is to properly define secrets for any key file we need to pass to the pod. In our case, Google credentials are a must.
  • Since it is a single file, no dependency setup is required.

Decompressing a Real-life 3GB+ File

We knew that this job consumes much more memory than the first sample. To avoid OOMKilled and Evicted errors, we created a pool with a 15GB node to account for the overhead, specified a memory request of 7GB and a memory limit of 10GB, and configured the operator to use this pool.
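
In KubernetesPodOperator terms, that configuration looks roughly like the sketch below; the pool name and image location are placeholders, and the complete task definition appears later in our final version:

# Sketch: memory request/limit plus node affinity to pin the pod to the big pool.
from airflow.contrib.kubernetes.pod import Resources  # contrib path in Airflow 1.x
from airflow.contrib.operators import kubernetes_pod_operator

unzip_large = kubernetes_pod_operator.KubernetesPodOperator(
    task_id='unzip-large-file',
    name='unzip-large-file',
    namespace='default',
    image='our image location',
    resources=Resources(request_memory="7Gi", limit_memory="10Gi"),
    affinity={
        'nodeAffinity': {
            'requiredDuringSchedulingIgnoredDuringExecution': {
                'nodeSelectorTerms': [{
                    'matchExpressions': [{
                        'key': 'cloud.google.com/gke-nodepool',
                        'operator': 'In',
                        'values': ['<high-memory-pool-name>'],
                    }]
                }]
            }
        }
    },
    dag=dag)  # assumes a DAG object defined as usual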

Multiple Files and a Case for Dynamic Tasks

Although dynamic tasks are indeed a feature, some caution is needed about how they are used, because DAG definitions are evaluated on a different schedule than the DAG run itself.

As a concrete example, we wanted our DAG to search for all available zip archives and be built based on that list. We could have written a routine that pulls the list of files and then builds the DAG by looping through it (one task per file), but that would have put a big burden on the scheduler that evaluates the DAGs, as explained here.
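
To make the concern concrete, here is a sketch of the pattern we decided against (list_zip_archives is a hypothetical helper that would call the GCS API). Because the scheduler re-parses every DAG file on a frequent cycle, the listing would run over and over at parse time rather than once per DAG run:

# The pattern we avoided: building tasks from a live bucket listing at parse time.
archives = list_zip_archives('gs://<bucket>/incoming/')  # hypothetical helper; runs on EVERY parse

for archive in archives:
    unzip = kubernetes_pod_operator.KubernetesPodOperator(
        task_id='unzip-' + archive,
        name='unzip-' + archive.lower(),
        namespace='default',
        image='our image location',
        dag=dag)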

Possible Solutions

A) Have each task internally work with a list of files.

B) Write more complex DAGs or a composition of multiple DAGs, an approach covered in a pair of articles on the subject.

Our Quick Solution

To move on, we decided to hardcode the list of possible files. We acknowledge it isn’t the cleanest approach, but we just wanted to finish experimenting with dynamically created DAGs.

Our Pattern for Multi-Threading

So we had a solution for processing N files without having to write a task for each. Next, we wanted to figure out how to process them in parallel without exhausting our node capacity.

Two Threads

What Airflow provides is a language for defining dependencies: one task doesn’t run until another one succeeds. So we figured that one way of decompressing every image file was to divide the files into two groups and write two chains of dependencies:

Thread 1: File A --> File B --> File C
Thread 2: File X --> File Y --> File Z

Code looked like this:

files = ["A", "B", "C", ... , "Z"]

even_previous = None
odd_previous = None

for i in range(len(files)):
    unzip = # ... and the code that creates the task dynamically

    if i % 2 == 0:
        if even_previous is not None:
            even_previous >> unzip
        even_previous = unzip  # chain the task object, not the filename
    else:
        if odd_previous is not None:
            odd_previous >> unzip
        odd_previous = unzip

Multiple Threads

Once knowing that it worked, we expanded the same idea to N threads:

files = ["A", "B", "C", ... , "Z"]

threads = 10
previous_tasks = [None]*threads

for i in range(len(files)):
    unzip = # ... and the code that creates the task dynamically

    if previous_tasks[i%threads] is not None:
        previous_tasks[i%threads] >> unzip

    previous_tasks[i%threads] = unzip

Move on in the Event of a Failed Task

Airflow has a special operator called DummyOperator, which does nothing by itself but is helpful for grouping tasks in a DAG: when we need to skip a task, we can create a dummy task and set the correct dependencies to keep the flow as desired.

The last thing we needed to solve was allowing the process to move on in the event of a failed task. Because we were hardcoding the list of files, and we knew for a fact that the list wouldn’t be consistent every time, a missing file would cause the unzip task to fail and the downstream tasks to never run. So we included some DummyOperators and made use of the trigger_rule parameter that every operator has:

files = ["A", "B", "C", ... , "Z"]

threads = 3
previous_tasks = [None]*threads

for i in range(len(files)):
    unzip = # ... and the code that creates the unzip task
    move = # ... and the code that creates the move task

    succeed = DummyOperator(task_id='succeed-'+files[i], dag=dag, trigger_rule='all_success')
    fail = DummyOperator(task_id='fail-'+files[i], dag=dag, trigger_rule='one_failed')
    join = DummyOperator(task_id='join-'+files[i], dag=dag, trigger_rule='all_done')

    # set dependencies dynamically
    if previous_tasks[i%threads] is not None:
        previous_tasks[i%threads] >> unzip

    # branch flow to handle succeeded and failed tasks
    unzip >> succeed >> move >> join
    unzip >> fail >> join

    previous_tasks[i%threads] = join

The resulting DAG:

UI Errors

The above DAG, but with all the files (more than 50), breaks the UI when it tries to display the Tree View. This behaviour appeared when we ran multi-threaded DAGs without setting any memory-related configuration.

Update Environment Errors

When updating the cluster’s version, something went wrong and our Airflow environment stopped working. New DAGs could not be built and existing ones wouldn’t start running when triggered. Looking back at the logs, all of Airflow’s default containers had failed to restart after the update.

Test Google Kubernetes Engine Auto-scaling Feature

As an experiment, we changed the configuration of the Kubernetes pool and enabled auto-scaling. It worked almost perfectly: some tasks failed on the first attempt, but simply setting retries on them allowed the DAG run to finish. Nodes were created as needed while the DAG was running, and ten minutes later they were turned off.

An important point that showed up while trying auto-scaling is that thread control is no longer necessary, since this configuration creates nodes whenever there aren’t enough resources available for tasks to start.

Dynamic Memory Request for Kubernetes Pods

To push the setup further and show what else it can do, we made some changes to our DAG to allow pods to request memory dynamically. Since we were working with a hardcoded list of filenames, we adapted it into a dictionary with each file’s size as the value; this way we could size each container based on the size of the file it processes.

We skipped thread control since node auto-scaling and retries were on. The attempt ran smoothly.

Code looks like this:

files = {
    "A":   "31740140",
    "B":   "27155420",
    "C":   "22334970",
    "D":   "21634461",
    ...
    ... 
}

for file, size in files.items():

    request_mem = (int(size) / pow(1024, 3)) * 2  # file size in bytes to GiB, doubled as headroom

    unzip = kubernetes_pod_operator.KubernetesPodOperator(
            # ... and the code that creates the unzip task
            ...
            resources=Resources(request_memory=str(request_mem)+"Gi",
                                limit_memory="10Gi"),
            retries=2,
            retry_delay=datetime.timedelta(minutes=2),
            ...
            )
    
    move = # ... and the code that creates the move task

    succeed = # ... and the code that creates the dummy task with trigger rule
    fail = # ... and the code that creates the dummy task with trigger rule
    join = # ... and the code that creates the dummy task with trigger rule

    unzip >> succeed >> move >> join
    unzip >> fail >> join

This is a section of the DAG; in general terms it looks like this:

This is the status summary of the running DAG. As we can see, it is a large one containing more than 200 tasks, and it all worked perfectly under this version.

Pros

  • The Graph View shows exactly what’s being run and the status of it.
  • We can adjust each pod to the size of the file that they would process.
  • Auto-scaling is a powerful feature.
  • Trigger rule helps build more complex flows.
  • Uniform way of managing secrets.

Cons

  • Airflow doesn’t seem to handle large numbers of heavy tasks under the default Google Cloud Composer deployment.
  • Hardcoded list of zip files.
  • When allowing tasks to fail, the DAG statuses shown in the UI include false negatives (runs flagged as failed that are actually expected).

  • Launching and stopping one pod per file causes significant overhead.

Our Final Version

After all our experiments and lessons learned, we arrived at the version we currently use as the best fit for this specific problem.

We decided to define groups of files by alphabetical order and set up enough memory to support any of the resulting tasks. We modified our unzipper Go tool to accept wildcards, so we can process multiple files with a single call to the program. In this final version, we also added a rename task (another in-house Go tool) and a simple list task using a BashOperator.

Code looks like this:

...

file_groups = { "A-D": "[a-dA-D]",
                "E-H": "[e-hE-H]",
                "I-L": "[i-lI-L]",
                "M-P": "[m-pM-P]",
                "Q-T": "[q-tQ-T]",
                "U-Z": "[u-zU-Z]",
                "NoAlpha": "[^a-zA-Z]"}

unzips = []

with models.DAG(
        dag_id=os.path.basename(__file__),
        schedule_interval=datetime.timedelta(days=1),
        start_date=yesterday) as dag:

        for group, expression in file_groups.items():
            unzip = kubernetes_pod_operator.KubernetesPodOperator(
                task_id='unzip-'+group,
                name='unzip-'+group.lower(),
                secrets=[secret_file],
                cmds=["./unzipper", <flags>],
                env_vars={'GOOGLE_APPLICATION_CREDENTIALS': 'location'},
                namespace='default',
                image='our image location',
                resources = Resources(request_memory="7Gi", limit_memory="10Gi"),
                retries=2,
                retry_delay= datetime.timedelta(minutes=2),
                affinity={
                    'nodeAffinity': {
                        'requiredDuringSchedulingIgnoredDuringExecution': {
                            'nodeSelectorTerms': [{
                                'matchExpressions': [{
                                    'key': 'cloud.google.com/gke-nodepool',
                                    'operator': 'In',
                                    'values': [
                                        'our pool name',
                                    ]
                                }]
                            }]
                        }
                    }
                }
            )
            unzips.append(unzip)

        rename = kubernetes_pod_operator.KubernetesPodOperator(
            # code for the rename task
        )

        list_files = BashOperator(
                    task_id='list',
                    bash_command='<gsutil command>',
        )

        for unzip in unzips:
            unzip >> rename
        rename >> list_files

Graph view:

Our Conclusions

  • We think Google Cloud Composer running Apache Airflow is an excellent option for running our workflows.
  • It is a good idea to use existing operators for simple tasks but use the containerized approach for custom items.
  • It is important to understand when and where DAG code is evaluated so as not to abuse dynamic DAGs.
  • It is important to use a standard for naming DAGs and to keep the filenames aligned with their IDs.
  • Even though it is very stable, auto-scaling isn’t bulletproof, which is why we had to configure retries.
  • We learned that we should always specify pod request and limit parameters for memory and CPU in k8s.
  • Google Cloud Composer’s relationship with the underlying k8s infrastructure that hosts it is fragile: an update (suggested by the console UI) broke it.