Creating an Automated Data Processing Pipeline with Apache Airflow, Kubernetes, and R — Part 2

Robert Gutierrez
Dev Genius
Published in
8 min readFeb 2, 2022

--

A jet engine on a commercial airplane
Photo by Inspirationfeed on Unsplash

In part one, we set up our Cloud Composer environment, our local environment, and forked the template repository.

Now that this setup is complete, we can get to the fun part: creating a container image to use for our task pods, creating a DAG and task, setting up our task files to be executed within the task pod, deploying our code, and watching the magic happen.

Our first DAG: start to finish

I’ve included a sample DAG and container image in the repo. We can walk through it here to see how everything is wired together. If you’d like to read through the DAG code on your own, you can find it at dags/example_dag.py. The Dockerfile and relevant image stuff can be found in images/my_data_processor.

Step 1: Container image

Before the task executes, a pod is spun up using a supplied container image. If we know what sorts of programs/libraries we will need for our tasks, we can build that into our container image so that it’s ready to use by our tasks.

Let’s take a look at the Dockerfile I’ve provided:

We are basing our image off of the awesome rocker/tidyverse image, then adding some additional Python and R stuff. We’re also setting up google-cloud-sdk so we can connect to and perform operations with Google Cloud Services.

Feel free to clean up the R packages section. I kept all the packages my team has been using but you may not need all of them.

Then we are adding ~/bin to our PATH, copying our bootstrap script in, and setting it as the entrypoint.

And the bootstrap script:

You’ll notice that we have some environment variables in here. These are being set when we initialize our KubernetesPodOperator instances (our tasks). For now, you shouldn’t need to touch this script. All it’s doing is authenticating with Google Cloud, making some extra directories, downloading the “task file” (this will be a shell script that you supply for every task), and executing it.

With our Dockerfile and bootstrap script ready to go, let’s create the image and upload it to Google Cloud so we can use it in our task.

In a Terminal tab or window, navigate to the project directory, then go to images/my_data_processor. If you have not set up Docker Desktop or the docker and docker-machine packages, you should do so now.

Let’s build the image.

$ docker build -t IMAGE_NAME .

I just use the folder name for IMAGE_NAME so it’s easy to follow.

Once the image is built, we need to tag it and push it to a container registry (we will be using Google Container Registry).

$ docker tag IMAGE_NAME HOSTNAME/PROJECT-ID/IMAGE_NAME
$ gcloud auth configure-docker
$ docker push HOSTNAME/PROJECT-ID/IMAGE_NAME

HOSTNAME is the hostname of the container registry you're pushing to (in our case, we used us.gcr.io). And PROJECT-ID is your Google Cloud project ID.

Updating an image

When you need to update the Docker image, just rebuild and push it:

$ docker build -t IMAGE_NAME .
$ docker push HOSTNAME/PROJECT-ID/IMAGE_NAME

If you want to completely rebuild the image, you can pass --no-cache to the docker build command. Then push the image to the registry.

Alternatively, for a more thorough solution, you can delete the old image and build it again. Then push.

$ docker rmi --force IMAGE_NAME
$ docker build -t IMAGE_NAME .
$ docker tag IMAGE_NAME HOSTNAME/PROJECT-ID/IMAGE_NAME
$ docker push HOSTNAME/PROJECT-ID/IMAGE_NAME

This can help if you’ve modified your Dockerfile but docker push results in all "Layer already exists" messages.

Testing the image locally

If you are testing the my_data_processor image or another similar image locally, pay attention to the ENTRYPOINT directive in the Dockerfile. If you are running a shell script as the entrypoint, every container spun up will start up and instantly shut down. To get around this, we need to override the ENTRYPOINT directive to keep the container alive. Then, once it's running, we can connect to it.

We’ll need to run two commands here. First, start up a container and override the entrypoint:

$ docker run --name CONTAINER_NAME -dit --entrypoint=/bin/bash IMAGE_NAME

For CONTAINER_NAME use something unique and/or related to IMAGE_NAME.

Once it’s running, we can initialize a shell session and connect to it:

$ docker exec -it CONTAINER_NAME /bin/bash

Once you’re done testing, to stop the container and delete it, run:

$ docker stop CONTAINER_NAME && docker rm CONTAINER_NAME

Step 2: DAG and task code

Let’s take a look at the example DAG provided:

The way I set this up is slightly different from the documentation. I went ahead and put task code into a separate tasks directory, and within that, directories for each DAG. Any tasks that are used by multiple DAGs are in the tasks/common folder.

First we create our DAG, setting the schedule and timeout intervals and supplying some default args. Next we initialize our tasks by passing in the DAG we just created. Finally, we define the relationships of the tasks within the DAG by using the arrow operators. If any tasks have multiple dependencies, we will use a list of tasks instead.

# a simple linear DAG
t1 >> t2 >> t3
# a more complex DAG
t1 >> [t2a, t2b]
t2a >> t3a
[t2b, t3a] >> t4

This may look confusing but when we deploy the DAG and look at the Graph View, you’ll be able to see the relationships more clearly.

You can ignore the trigger_arguments property. I will explain that in the next part of this tutorial.

Now let’s look at the task code:

I’ve set up some functions to abstract and standardize some of the task initialization code and reduce the number of lines needed to create our task. You can check it out in tasks/__init__.py.

generate_env_vars sets up the default environment variables that should be passed to all tasks, then lets us define the ones specific to our task. We then verify our vars with verify_env_vars, which also adds additional variables that rely on previously-set variables (like TASK_FOLDER relying on TASK_ID to build the full path). Finally, generate_op_vars does something similar with variables passed to the operator class and lets us supply the task-specific ones.

There’s another task in common, called parse_job_args, that lets us take any data specified in conf when we manually trigger a task and push it to XCom so that it can be used by all downstream tasks within the DAG. I will explain it in more detail in the next part of this tutorial.

Step 3: Task files

Now let’s tackle the “meat and potatoes” of our data processing pipeline, the scripts that actually do the data processing.

For our example DAG, we’re gonna look at the task_files/example_dag/hello_world folder. Task files are separated first into DAG folders, then task folders. At a minimum, we need one file, a shell script, which is what gets executed by the container’s bootstrap script. This allows us to execute an arbitrary amount of code and call whatever programs we want. This folder is where we put our R scripts, and we execute them from the shell script. I will show you what I mean:

A lot of our shell scripts end up with this format: download task files from Cloud Storage and chmod them to make them executable, download any common files needed, execute our R scripts (there’s a vanilla R script and an RMarkdown script in the example), then upload the output to the requisite folder in runs.

In Cloud Storage, I elected to save the outputs of every task to make debugging easier. This folder is stored in the TASK_RESULTS_FOLDER environment variable, but the format is like so:

$CLOUD_STORAGE_BUCKET/runs/$DAG_ID/$CUSTOM_RUN_ID/$TASK_ID

CUSTOM_RUN_ID is something I define in a plugin, which I will talk about in the next part of this tutorial. You can instead use the run_id variable that Airflow creates.

Step 4: Deploy

Now we’re ready to test this in Airflow!

Use our handy-dandy deploy script to easily upload everything to Cloud Storage so Airflow can read it and populate the UI. In a Terminal tab or window, navigate to the project root and run:

$ ./deploy.sh staging

The deploy should be pretty fast.

Let’s pull up Airflow and see the fruits of our labor. You can grab the URL for your Airflow instance by going to your Cloud Composer environment and clicking Open Airflow UI at the top.

My team’s Airflow UI. example_dag is in there, along with some other tests.

If you’ve set up your DAG to run on a schedule, it may already be running. If not, we can trigger it manually.

Step 5: Trigger and observe

To trigger our example DAG, find the example_dag row and then find the “play button” icon in the set of icons in the farthest-right column. Go ahead and click it.

Screen seen when manually triggering a DAG

In the text box, we can write some JSON which will be copied into the conf variable usable within a task. If you’d like to write something here, you can, just make sure to enclose it in brackets.

{"instantiator": "smith"}

Click Trigger. You should be taken back to the home page but now you’ll see a green circle in the DAG Runs column. Click it and you’ll be graced with the Graph View page.

The Graph View page, a wonderful visual representation of your DAG. You will spend a lot of time here.

Each box is a task. You can hover over the boxes to see the task’s status, and click it to view the task instance details, logs from the task, and manually change the status (e.g. if you want to mark it Failed and halt execution).

You did it! Once the tasks are all complete, you can navigate to the Cloud Storage bucket, go to the relevant folder in runs and see the output.

Wrap-up

You’ve successfully deployed a Cloud Composer environment; set up your first container image, DAG, and task; deployed your container image, DAG, and task; and run your DAG within Airflow! With this knowledge, you can start building your own DAGS and modifying or creating new images for your workloads. Airflow is a powerful tool, and with the added power of KubernetesPodOperator and custom container images, the possibilities are endless!

In the next part of this tutorial, we will go over the sample plugin, which provides a nice UI for manually triggering DAGs and passing arguments to your tasks.

--

--

A creative, collaborative, and empathetic software engineer. I have over nine years of professional experience in developing impactful web applications