Creating an Automated Data Processing Pipeline with Apache Airflow, Kubernetes, and R — Part 3

Robert Gutierrez · Published in Dev Genius · Feb 3, 2022



In part one of this tutorial, we set up our Cloud Composer environment and our local environment, and forked the template repository. In part two, we got down to business: setting up our first container image; writing our first DAG and task; deploying our container image, DAG, and task to Google Cloud; and running our DAG within Airflow!

Here in part three, we will tackle plugins. Plugins are how we extend Airflow: they can provide custom operators and hooks, connections to CRMs, databases, and other external services, and even custom pages within the UI. There’s a big list of plugins on GitHub that you can check out; you may find what you need in there, but I’m gonna walk you through how to create a small UI plugin: a “Manual Trigger” page that lets you manually trigger a DAG using a convenient form with fields for each argument.

Introduction

As a developer, when manually triggering a DAG in Airflow and needing to pass additional parameters, I was fine with writing raw JSON into the conf variable. The interface for doing so was bare-bones (a text box with a “Trigger” button), but it worked well enough. As our analytics team and I better defined the technical requirements of our automated pipeline, it became clear that we needed to specify certain things whenever we triggered our DAG. This DAG, the culmination of my efforts to automate last year’s pipeline that generated data for our Network Health Survey Reports (you can find more details on that project here), would need to be triggered with two arguments: which survey administration to refer to for the incoming data, and which of our school networks we were processing data for. While the old version of the pipeline used incoming data from all school networks in the survey administration, we wanted our new pipeline to process data one network at a time. This way, we could generate the report data for the schools that had already finished the survey, instead of having to wait for all the schools to finish.

We ended up with three parameters:

- instantiator: the person triggering the DAG (to better separate the outputs in Cloud Storage)
- survey_administration_name: a unique name given by our Network Health Survey Portal to a particular Survey Administration
- network_id: a unique integer that identifies a school network

To make things easier for our analytics folks using Airflow, it seemed prudent to build some kind of manual trigger form, where a user could select a DAG from a dropdown, see the list of parameters expected, input values, then trigger the DAG. Handy links to the Graph View of the current DAG run and the Cloud Storage folder expecting the outputs seemed like a good idea too. This eventually manifested into the Manual Trigger plugin.

A good chunk of this code came from this tutorial. I expand on it here and provide some additional functionality. For the sake of simplicity, I will use our example DAG from the previous part of the tutorial and add in the parameters mentioned above.

Creating an Airflow UI plugin

In the previous parts of this tutorial, you’ve seen mention of plugins, as well as bits of code that hinted towards this Manual Trigger plugin I mentioned above. Let’s dive into this plugin code and explain those bits we saw before.

Folder structure for the UI plugin

This folder structure was recommended in other how-tos, and I think it makes sense. The Airflow UI is built on Flask, so, like any other Flask application, we can define a place for static assets, a place for templates, and a place for views.
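Concretely, the layout looks roughly like this (the file names are illustrative, not a requirement):

```
plugins/
└── manual_trigger/
    ├── __init__.py               # plugin class that registers everything
    ├── views.py                  # the Flask view with the form logic
    ├── templates/
    │   └── manual_trigger.html   # the Jinja template for the form
    └── static/
        ├── manual_trigger.css    # styling
        └── manual_trigger.js     # toggling field sets, spinner, etc.
```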

The view template generates a page with a form as follows: a dropdown of all the DAGs in our Airflow instance (one can also be selected by default), and by selecting a DAG, we see a set of fields that match the parameters we specified for the DAG. Fields can be required or optional. On form submission, we are redirected back to this page but we see a flash message informing us of the successful trigger. The message reads back our parameter values and contains some helpful links.

The view method will take our form submission, filter for the DAG we had selected, create our custom run ID, then trigger the DAG while passing our form data as the conf variable. It then generates a flash message of the result.

Step 1: DAG trigger parameters

As suggested in the similar tutorial that I posted above, I’ve added a new property to our DAG object, trigger_arguments. Let’s pull up our example DAG and add this in.

trigger_arguments is just a dictionary defining some properties for our parameters: a user-friendly display name to show above the form field, a field type (enum is for a dropdown), the plural version of the parameter name, whether it is required when triggering, the default value if none is supplied, and a description to display under the form field.
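Here’s a minimal sketch of what that might look like in our example DAG from part two. The exact key names inside trigger_arguments are my assumptions based on the description above (only enum is confirmed as a field type), so adjust them to match your own plugin code.

```python
from datetime import datetime, timedelta

from airflow import DAG

default_args = {
    "owner": "airflow",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG(
    "example_dag",
    default_args=default_args,
    schedule_interval=None,  # manual triggers only
    start_date=datetime(2022, 1, 1),
)

# Custom attribute read by the Manual Trigger plugin; Airflow itself ignores it.
dag.trigger_arguments = {
    "instantiator": {
        "display_name": "Instantiator",
        "type": "enum",  # rendered as a dropdown
        "plural": "instantiators",
        "required": True,
        "default": None,
        "description": "Person triggering the DAG; used to separate outputs in Cloud Storage.",
    },
    "survey_administration_name": {
        "display_name": "Survey Administration",
        "type": "string",
        "plural": "survey_administration_names",
        "required": True,
        "default": None,
        "description": "Unique name of the Survey Administration in the Survey Portal.",
    },
    "network_id": {
        "display_name": "Network ID",
        "type": "integer",
        "plural": "network_ids",
        "required": True,
        "default": None,
        "description": "Unique integer identifying a school network.",
    },
}
```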

We’ve added the three parameters that I mentioned above. It’s up to our template to display them in a pretty way.

Step 2: The template

The template file uses Jinja syntax, and there are a few things we need to include within the file in order to have our full Airflow UI. Omitting them would mean a blank white page, which is jarring when you’re coming to the page from Airflow.

We have a few things to import, some blocks to include, though it’s fairly standard stuff. What we have now is a form with a dropdown of DAGs and a set of fields matching those DAGs’ trigger_arguments. Some Javascript controls the toggling of these field sets as well as switching on HTML5 form validation. There’s also a bit of CSS for general styling and I added Bootstrap’s animated spinner icon.

Airflow uses Bootstrap for its look and feel, so we can use Bootstrap patterns in laying out our page, as well as some jQuery if we wish.
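To give you a feel for it, here is a stripped-down sketch of the template. It assumes an Airflow 1.10-style UI (Flask-Admin), which is what the FlaskAdminTriggerView class name below implies; the base template you extend and the blocks you fill in will differ on newer Airflow versions, and the markup here is illustrative rather than the original.

```html
{# templates/manual_trigger.html — a stripped-down sketch #}
{% extends 'airflow/master.html' %}

{% block body %}
  <h2>Manual Trigger</h2>

  <form id="trigger-form" method="POST">
    <div class="form-group">
      <label for="dag-select">DAG</label>
      <select id="dag-select" name="dag_id" class="form-control">
        {% for dag_id in trigger_args %}
          <option value="{{ dag_id }}">{{ dag_id }}</option>
        {% endfor %}
      </select>
    </div>

    {# One hidden fieldset per DAG; a bit of JavaScript shows the fieldset that
       matches the selected DAG. A fuller template would render a <select> for
       enum-type fields instead of a plain text input. #}
    {% for dag_id, args in trigger_args.items() %}
      <fieldset class="dag-fields" data-dag-id="{{ dag_id }}" style="display: none;">
        {% for name, arg in args.items() %}
          <div class="form-group">
            <label>{{ arg.display_name }}</label>
            <input type="text" class="form-control" name="{{ name }}"
                   {% if arg.required %}required{% endif %}>
            <small class="text-muted">{{ arg.description }}</small>
          </div>
        {% endfor %}
      </fieldset>
    {% endfor %}

    <button type="submit" class="btn btn-primary">Trigger</button>
  </form>
{% endblock %}
```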

Step 3: View method

Now let’s look at our view method code.

This file is a bit on the longer end but I’ll walk through it.

We define a function to help us with triggering the DAG properly, supplying all the properties that Airflow expects, and shoving our submitted parameters into conf. Then we have a function that defines a custom run ID. The default run ID created by Airflow is something like “<TRIGGER_TYPE>_<DATETIME_AS_ISO>” so you end up with an ID like

manual__2022-01-27T09:56:01.162295+0000

For one, the datetime is always in UTC, and two, it’s impossible to tell who triggered the DAG run by looking at this ID. So, I created a custom run ID that adds in the instantiator and converts the datetime to our local time zone. You can change the custom run ID code to anything you want, as long as it’s unique.
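As a rough sketch (assuming an Airflow 1.10-era environment and its experimental API; the function names and time zone here are mine), the two helpers might look like this:

```python
import json

import pendulum  # bundled with Airflow
from airflow.api.common.experimental.trigger_dag import trigger_dag

LOCAL_TZ = "America/New_York"  # assumption: swap in your own time zone


def build_run_id(instantiator):
    """Build a run ID like 'manual__jdoe__2022-01-27T04-56-01'; any unique string works."""
    local_now = pendulum.now(LOCAL_TZ)
    return "manual__{}__{}".format(instantiator, local_now.strftime("%Y-%m-%dT%H-%M-%S"))


def trigger(dag_id, run_id, conf):
    """Trigger the selected DAG, passing the submitted form data as the conf."""
    return trigger_dag(
        dag_id=dag_id,
        run_id=run_id,
        conf=json.dumps(conf),
        execution_date=None,
        replace_microseconds=False,
    )
```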

The view method within our FlaskAdminTriggerView class holds most of the code, but it’s set up like any other Flask view method. We set up a dictionary of DAG IDs and their respective trigger_arguments, then add in the dropdown choices for any enum-type arguments. For the choices for instantiator, it made sense to create a Variable within Airflow and define the list there as valid_instantiators.

When the form is submitted, we filter out just the fields for our selected DAG. We set a default for instantiator, add in our custom_run_id, then send it all to our trigger function. We build our pretty little flash message, add in some helpful links, then render it all.
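Put together, the view class might look something like the sketch below. Again, this assumes the Flask-Admin style of plugin view and reuses the helpers from the previous snippet; the details of your flash message and form handling will differ.

```python
from flask import Markup, flash, request
from flask_admin import BaseView, expose

from airflow.models import DagBag, Variable


class FlaskAdminTriggerView(BaseView):

    @expose("/", methods=["GET", "POST"])
    def manual_trigger(self):
        # Collect every DAG that declares trigger_arguments.
        dagbag = DagBag()
        trigger_args = {
            dag_id: dag.trigger_arguments
            for dag_id, dag in dagbag.dags.items()
            if getattr(dag, "trigger_arguments", None)
        }

        # Dropdown choices for the enum-type instantiator field come from an
        # Airflow Variable stored as a JSON list.
        instantiators = Variable.get(
            "valid_instantiators", default_var=[], deserialize_json=True
        )

        if request.method == "POST":
            dag_id = request.form["dag_id"]
            # Keep only the fields belonging to the selected DAG.
            conf = {
                name: request.form.get(name)
                for name in trigger_args.get(dag_id, {})
                if request.form.get(name)
            }
            instantiator = conf.setdefault("instantiator", "unknown")  # illustrative default
            run_id = build_run_id(instantiator)  # helper sketched earlier
            trigger(dag_id, run_id, conf)        # helper sketched earlier
            # The real message also includes links to the Graph View and the
            # Cloud Storage output folders.
            flash(Markup("Triggered <b>{}</b> with run ID <b>{}</b>.".format(dag_id, run_id)))

        return self.render(
            "manual_trigger.html",
            trigger_args=trigger_args,
            instantiators=instantiators,
        )
```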

Now we get an instance of our FlaskAdminTriggerView class, and the properties here ensure we can access our new page from the nav menu. category lets us add a menu item to an existing dropdown in the nav or add a new dropdown. name is the name of the menu item and endpoint is the route stem for our page.
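For example (the category and names below are placeholders, not necessarily what you’ll want in your nav):

```python
manual_trigger_view = FlaskAdminTriggerView(
    category="Extras",          # nav dropdown to add the item to (new or existing)
    name="Manual Trigger",      # menu item label
    endpoint="manual_trigger",  # route stem for our page's URL
)
```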

Defining a Blueprint lets us use our folder setup that we created, with static assets and templates separated out. And if we want, we can add a link to our Github repo under the Docs nav item.
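A sketch of both pieces; the Blueprint name, static URL path, and the repo URL are placeholders:

```python
from flask import Blueprint
from flask_admin.base import MenuLink

# Wires up the templates/ and static/ folders from our plugin directory.
manual_trigger_bp = Blueprint(
    "manual_trigger_bp",
    __name__,
    template_folder="templates",
    static_folder="static",
    static_url_path="/static/manual_trigger",
)

# Optional link to the repository under the "Docs" nav item.
repo_link = MenuLink(
    category="Docs",
    name="Manual Trigger plugin source",
    url="https://github.com/your-org/your-repo",  # placeholder URL
)
```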

Finally, we define the plugin class, which lets us “plug in” our plugin to Airflow so it’s loaded into the UI.
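Roughly (the plugin name is arbitrary, and the attributes shown are the Airflow 1.10 plugin interface):

```python
from airflow.plugins_manager import AirflowPlugin


class ManualTriggerPlugin(AirflowPlugin):
    name = "manual_trigger_plugin"
    admin_views = [manual_trigger_view]      # our Flask-Admin view instance
    flask_blueprints = [manual_trigger_bp]   # templates + static assets
    menu_links = [repo_link]                 # the Docs link (optional)
```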

Step 4: The parse_job_args task

Let’s briefly look at the parse_job_args task that you may have seen mentioned in the example DAG, and included within the tasks/common folder.

This special task utilizes the PythonOperator and its provide_context argument to grab the values we submitted to the conf variable.

We want the parameters we pass into conf to be accessible to all the tasks in our DAG, so we’re gonna utilize a feature of Airflow called XComs. XComs basically form a DAG “state” that can be accessed by all the tasks within the DAG. They work by “pushing” variables to the state to store them and “pulling” them to read them. They are best used for small, serializable bits of data so avoid pushing things like data frames or other large task outputs. Those can easily be uploaded to Cloud Storage and then downloaded by a future task.

In this case, we want to push our parameters in the first task of our DAG so all the rest of the tasks have access to the values. In our PythonOperator, we define a callable and set provide_context to True. Now, in our callable, we have access to all the variables that Airflow passes to templates. We will just use kwargs to access them as a dictionary. conf comes from the DAGRun object, dag_run, and the task instance, ti, is where we push our XComs.
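A minimal sketch of the task, assuming the parameter names from earlier and a task_id of parse_job_args_task (the exact names in the repo may differ):

```python
from airflow.operators.python_operator import PythonOperator


def parse_job_args_fn(**kwargs):
    """Read the conf passed at trigger time and push each value as an XCom."""
    conf = kwargs["dag_run"].conf or {}
    ti = kwargs["ti"]
    for key in ("instantiator", "survey_administration_name", "network_id"):
        ti.xcom_push(key=key, value=conf.get(key))


parse_job_args_task = PythonOperator(
    task_id="parse_job_args_task",
    python_callable=parse_job_args_fn,
    provide_context=True,  # exposes dag_run, ti, etc. to the callable (Airflow 1.10)
    dag=dag,
)
```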

Now that our parameter values are saved, how do we access them in other tasks? We need to make a small modification to our code. Pull up tasks/__init__.py and check out the DEFAULT_ENV_VARS dictionary.

This is where I hid the environment variables that get passed to your task. As you might have seen on the Airflow template reference page, we have access to the task_instance object, and we can use it in a Jinja-like way. The task instance object is where we can utilize xcom_push and xcom_pull. We can add our three parameters that we saved earlier as environment variables here.
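For example (a hedged sketch: the uppercase variable names are my own convention, and the task_id matches the parse_job_args sketch above):

```python
# tasks/__init__.py
DEFAULT_ENV_VARS = {
    # ...the environment variables already defined in the template repo...
    "INSTANTIATOR": "{{ task_instance.xcom_pull(task_ids='parse_job_args_task', key='instantiator') }}",
    "SURVEY_ADMINISTRATION_NAME": "{{ task_instance.xcom_pull(task_ids='parse_job_args_task', key='survey_administration_name') }}",
    "NETWORK_ID": "{{ task_instance.xcom_pull(task_ids='parse_job_args_task', key='network_id') }}",
}
```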

I put these three parameters as part of the “default” environment variables because I use them everywhere, but it may make sense for you to put them in the task definition instead.

Step 5: Test

Time to see what all our effort has earned us!

Our finished Manual Trigger page

Simple but effective. Switching the DAG in the dropdown will swap out the form fields accordingly (if you have more than one DAG). If you try to click Trigger while any required fields are missing data, the browser should complain. If all fields are filled out and you click Trigger, you’ll get a nice spinner icon inside the button.

Why don’t we trigger our DAG?

Our Manual Trigger page, after triggering a DAG

Success! The “flash message” isn’t the prettiest thing in the world, but it gets the point across. The first link will take you to the Graph View of your currently running DAG, the second shows you the outputs of all the previous runs in Cloud Storage, and the third will be the outputs from your current run.

Wrap-up

You’ve successfully created your first Airflow plugin, a UI plugin that provides a “Manual Trigger” page to help with manually triggering DAGs with parameters.

This concludes this tutorial series on automating data processing pipelines with Airflow, Kubernetes, and R. I plan on learning more about Spark so in the future I may write something on how to run a Spark workflow in Google Dataproc and trigger it from an Airflow DAG. Stay tuned!

Find the previous parts of this tutorial here: Part One and Part Two.
