Properly configuring and tuning your Airflow deployment to be generally performant can be a nontrivial endeavor. You can easily shoot yourself in the foot if you don’t do it right at first. You’ll discover and experience sporadic bottlenecks in Airflow-wide and DAG-level execution. You’ll apply solutions to those as they arise. You run the risk of developing a fragmented and mystical understanding of Airflow’s goings-on. You’ll have many questions and, frustratingly, fewer answers.
Why are more tasks not being run, even after I add workers?
Why is my task scheduling latency increasing?
Ultimately, if my Airflow DAG performance is being negatively affected, what do I actually need to look at and fix?
If you’re reading this, maybe you’ve had and answered questions like this for yourself before. The goal of this article is to connect questions like the above to various control levers we have in Airflow. Generally, problems of this nature are a result of either improper or insufficient Airflow configuration. We mean that in a sense beyond Airflow’s own notion of configuration - the machine specs running our Airflow processes can also matter.
We will frame the types of problems that can be identified and tuned for by discussing several broad examples. To that end, we’ll begin by identifying a subset of Airflow’s configuration options that are relevant to the Airflow processes experiencing a bottleneck. For example, we’ll call out the appropriate Airflow scheduler settings in the event of a scheduler bottleneck. A walkthrough of several theoretical Airflow bottlenecks, and the role those parameters play in fixing them, follows. The discussion concludes with additional Airflow functionality that supports further tuning and monitoring for these situations.
Ultimately, we hope to understand ‘which’ Airflow control levers to pull and ‘when’, and to equip our mental models with the ‘why’.
Finally, before we dive in, we’ll make two assumptions. First, we will assume you’ve already got a distributed Airflow deployment using the CeleryExecutor. Much of what we discuss regarding Airflow worker settings is grounded within that context. Second, the discussion is based on Airflow 1.10.x versions, as Airflow 2.0 will introduce significant changes.
Outlining the Relevant Airflow Configuration Settings
Airflow has a number of configuration settings. They can be set in a few ways, but the general precedence is that a value set through an environment variable takes priority over the same setting in the configuration file.
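As a small illustration of that precedence: Airflow reads environment overrides from variables named AIRFLOW__{SECTION}__{KEY}, with double underscores. The sketch below is a simplified model, not Airflow's actual lookup code. The function names, and the plain dicts standing in for the environment and the parsed configuration file, are assumptions made for illustration.

```python
def airflow_env_var(section, key):
    """Build the environment variable name that overrides [section] key."""
    return "AIRFLOW__{}__{}".format(section.upper(), key.upper())


def resolve(section, key, environ, config_file):
    """Simplified precedence: environment variable first, then the config file.

    `environ` and `config_file` are plain dicts standing in for os.environ
    and a parsed airflow.cfg (hypothetical stand-ins for illustration).
    """
    env_name = airflow_env_var(section, key)
    if env_name in environ:
        return environ[env_name]
    return config_file[section][key]


config_file = {"core": {"parallelism": "32"}}
environ = {"AIRFLOW__CORE__PARALLELISM": "96"}

# The environment variable wins over the file value.
print(resolve("core", "parallelism", environ, config_file))  # 96
```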
For the topic at hand, we’re interested only in the Airflow configuration settings that govern the capacity of Airflow to run tasks and schedule DAGs. The relationship between those parameters, the infrastructure of your Airflow deployment, and the desired result based on the observed bottlenecks is what’s described here.
Relevant Configuration Settings
We’ll reiterate a few of the definitions from Airflow’s documentation here, and make some useful comments about each parameter’s usage.
parallelism: This defines the max number of task instances that should run simultaneously on this airflow installation.
worker_concurrency: This defines the number of task instances that a worker will take, so size up your workers based on the resources on your worker box and the nature of your tasks.
max_threads: This defines how many threads will run in parallel when Airflow schedules DAGs.
scheduler_heartbeat_sec: The scheduler constantly tries to trigger new tasks (look at the scheduler section in the docs for more information). This defines how often the scheduler should run (in seconds).
dag_concurrency: The number of task instances allowed to run concurrently by the scheduler.
While the Airflow documentation is not overly descriptive, the important point here is that dag_concurrency is a task concurrency limit set and applied at a per-DAG level.
non_pooled_task_slot_count: When not using pools, tasks are run in the "default pool", whose size is guided by this config element.
The non_pooled_task_slot_count setting is removed as of Airflow 1.10.4 in favor of an explicit default_pool, which functions similarly but is governed differently.
Observable Bottlenecks - Things to Look For
Now we’ll tie the above Airflow settings to specific categories of bottleneck you might run into. For example, you may experience a bottleneck with your workers, where new tasks are not being run because your workers are at full capacity. For each category, we will describe a few example observations someone might make of their Airflow instance that would indicate the issue has occurred, along with a potential cause. The cause we suggest for a symptom is not intended to be the only possible one, especially for a system as complex as Airflow. Rather, it is one plausible explanation, and it will also serve as a gateway to discussing fixes.
The desired takeaway is the ability to troubleshoot some common performance bottlenecks if you face the issues we describe.
For your workers, the relevant Airflow configuration parameters are parallelism and worker_concurrency. As defined above, parallelism is the maximum number of task instances your Airflow instance will allow to be in the running state.
For the CeleryExecutor, the worker_concurrency determines the concurrency of the Celery worker. A good general recommendation for these two settings is that parallelism should be the sum of all workers’ worker_concurrency. For example, with 3 workers and a worker_concurrency of 32, parallelism should be set to 96.
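The recommendation above is simple arithmetic, sketched here as a helper. The function name is hypothetical. It accepts one worker_concurrency value per worker, so it also covers workers that are sized differently.

```python
def recommended_parallelism(worker_concurrencies):
    """Sum the worker_concurrency of every worker in the deployment."""
    return sum(worker_concurrencies)


# Three workers, each with a worker_concurrency of 32.
print(recommended_parallelism([32, 32, 32]))  # 96
```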
There’s a fairly natural question that follows this recommendation. What should the workers’ worker_concurrency be set to? Celery’s documentation sort of provides a baseline here. However, there is no universally recommended value; the correct setting relies on a number of factors. Thus, it’ll generally require some experimentation. Still, here’s a general guideline that gives a good place to start.
The maximum number of processes should be bound by the machine’s available cores, if tasks are CPU bound. Alternatively, if tasks are I/O bound, the number of processes can be increased past the core count of the worker. Ultimately, this heuristic still suggests an upper limit of roughly twice the number of CPUs on the machine.
In the case of Airflow, if it’s being used as an orchestrator (that is, as prescribed), and the processing and execution of most tasks are being handled by another application, we can exceed the core count of the worker a little. Again, though, these guidelines are more of a starting point; it’s expected that the appropriate worker concurrency for your setup will require some experimentation. Anecdotally, I’ve found that a worker_concurrency four times the number of available cores is completely viable.
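To make the starting points above concrete, here is a minimal sketch. The workload labels and the function name are hypothetical. The multipliers simply restate the guidelines from the text (one process per core for CPU-bound tasks, roughly two per core for I/O-bound tasks, and the anecdotal four per core when Airflow is purely orchestrating). Treat the result as a first value to experiment from, not a recommendation.

```python
# Multipliers restating the guidelines in the text; the labels are illustrative.
STARTING_MULTIPLIERS = {
    "cpu_bound": 1,      # bound by available cores
    "io_bound": 2,       # heuristic upper limit of roughly twice the cores
    "orchestration": 4,  # anecdotal value when other systems do the heavy lifting
}


def starting_worker_concurrency(cores, workload):
    """Return a worker_concurrency value to begin experimenting with."""
    if cores < 1:
        raise ValueError("cores must be at least 1")
    return cores * STARTING_MULTIPLIERS[workload]


print(starting_worker_concurrency(8, "orchestration"))  # 32
```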
The other Airflow setting you may need to tweak when experimenting with parallelism and worker_concurrency is non_pooled_task_slot_count.
Unless otherwise specified, Airflow tasks will begin by running in a default Pool. The default Pool also has a max size of 128 that you will want to increase alongside your parallelism, assuming you have no other Pools. Until Airflow 1.10.4, the maximum size for the Pool was governed by an explicit setting, the non_pooled_task_slot_count. For 1.10.4 onwards, this is initialized and governed via the UI, but our recommendations still apply.
Worker - Potential Bottleneck
Observation: Tasks across DAGs are only entering “queued” state, but do not run for a long time.
Potential Cause: We would call this a “worker (or workers) bottleneck”: at least during some periods, the number of tasks being scheduled on your Airflow instance exceeds your workers’ capacity.
There’s a variety of causes for behavior such as the above. However, a fairly common reason for seeing DAG tasks enter and remain in the “queued” state for long periods of time, especially across multiple DAGs in a relatively active period for the Airflow instance, is a bottleneck at the worker level. More specifically, we suspect that the volume of tasks exceeds the combined capacity of all workers to run them.
Given the parameters and configuration we have already discussed above, the Airflow configuration parameters could be unnecessarily limiting your total task execution bandwidth. Alternatively, the task volume during certain execution periods could exceed the limits of your current configuration, even when set properly.
In the case of the former, where your Airflow configuration parameters are limiting your total task execution bandwidth, the first thing to confirm is that the parallelism parameter is the product of the worker_concurrency parameter and the number of workers you have running.
For example, you could be self-sabotaging your task throughput if you have two workers with a worker_concurrency of 32, and have set your parallelism to 32 as well. This configuration effectively limits your total number of running tasks to 32, even though the workers could run 64 between them. Averaged across the two workers, the effective worker_concurrency is only 16. If your Airflow instance has periods of activity exceeding 32 concurrent tasks, the excess tasks will simply remain in the “queued” state until capacity opens up.
In the latter case, where your volume of tasks exceeds the limits of your current configuration, we can extend the example above. We might imagine that parallelism is properly set to 64, but tasks are still not being scheduled immediately under high volume.
In that case, our task volume simply exceeds the capacity of the workers. To increase running task throughput, we need to scale the workers in some fashion: either add workers, or increase the worker_concurrency setting along with the resources available to the workers already present. In both cases, parallelism has to be increased to match the new product of worker_concurrency and your number of workers for the change to take effect.
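Both cases can be sketched with a little arithmetic. This is a simplified model that assumes every worker has the same worker_concurrency and ignores pools and per-DAG limits. The function names are hypothetical.

```python
def running_task_ceiling(parallelism, workers, worker_concurrency):
    """The most tasks that can run at once under this simplified model."""
    worker_slots = workers * worker_concurrency
    return min(parallelism, worker_slots)


def queued_backlog(demand, parallelism, workers, worker_concurrency):
    """Tasks left waiting in "queued" when `demand` tasks want to run at once."""
    ceiling = running_task_ceiling(parallelism, workers, worker_concurrency)
    return max(0, demand - ceiling)


# Misconfigured: two workers of 32, but parallelism capped at 32.
print(running_task_ceiling(32, 2, 32))   # 32
# Properly configured, yet demand of 100 still exceeds 64 slots.
print(queued_backlog(100, 64, 2, 32))    # 36
# Adding a third worker only helps once parallelism is raised to match.
print(queued_backlog(100, 64, 3, 32))    # 36
print(queued_backlog(100, 96, 3, 32))    # 4
```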
For the scheduler, the Airflow settings critical for configuration are max_threads and scheduler_heartbeat_sec. As defined above, max_threads governs the number of threads the scheduler will use to parse and schedule DAGs. The scheduler_heartbeat_sec setting determines how often this occurs.
Airflow’s FAQ has a section on guidelines for setting the max_threads parameter to limit scheduling latency in production. The recommended value is the number of CPUs on the machine running the scheduler, minus 1. This value is used by Airflow in a multiprocessing.Queue for parsing DAGs, which is how DAGRuns and Tasks that should be scheduled are discovered. Generally speaking, a higher value for max_threads will result in better scheduler performance.
The scheduler_heartbeat_sec setting determines the interval (in seconds) at which the scheduler tries to find new tasks that should be run. As the number of Airflow DAGs increases, each scheduler heartbeat becomes increasingly expensive.
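As a quick sketch of that guideline, the helper below computes the number of CPUs minus 1, with a floor of 1 for single-core machines. The function name and the floor are assumptions made for illustration. Run it on the machine that hosts the scheduler, or pass the CPU count explicitly.

```python
import os


def recommended_max_threads(cpu_count=None):
    """Return CPUs minus 1, never less than 1."""
    if cpu_count is None:
        # os.cpu_count() can return None when the count is undetermined.
        cpu_count = os.cpu_count() or 1
    return max(1, cpu_count - 1)


print(recommended_max_threads(4))  # 3
```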
Scheduler - Potential Bottleneck
Observation: Task scheduling latency is high and getting higher.
Potential Cause: An excessive number of DAGs has overburdened Airflow’s scheduler, which needs to find resources to parse all the DAGs in the DAGs directory to schedule jobs.
This kind of problem is often observed as a trend over time. For example, the time between execution for two consecutive tasks (such as B’s wait time in a DAG, A -> B) slowly increases from several seconds to several minutes over the course of a year.
This behavior is usually caused by a bottleneck in the scheduler’s resource use. As the number of active DAGs increases, the load on the scheduler increases as well. In terms of Airflow configuration parameters, every scheduler_heartbeat_sec seconds Airflow uses up to max_threads threads to process the DAG directory and find tasks that can be scheduled.
Thus, one way to resolve this bottleneck is to simply increase the scheduler_heartbeat_sec value. Airflow documentation notes that the scheduling process gets more expensive as the number of active DAGs grows, so telling Airflow to run it less often can alleviate some of the burden. Because scheduler_heartbeat_sec determines how often the scheduler looks for new tasks to trigger, a larger value means a process that has become more computationally expensive runs less often. This won’t necessarily reduce the task scheduling latency much, and may keep it on the higher side, but it should lessen the burden on the scheduler and help it operate more smoothly.
One can take a different approach by increasing the number of CPUs available on the machine that runs the scheduler process, so that the max_threads parameter can be set to a higher value. With a higher value, the Airflow scheduler will be able to process the increased number of DAGs more effectively. This should in turn lower the observed task scheduling latency as well.
Lastly, there can be observable bottlenecks in DAGs that are caused purely by configuration settings and are not indicative of a larger performance concern. We’ll briefly touch upon two example possibilities here.
Miscellaneous - Potential Bottlenecks
Observation: An individual DAG is unable to run all tasks in parallel, but other DAGs seem unaffected.
Potential Cause: This would be a “DAG-level bottleneck”. The DAG is suffering from an insufficient dag_concurrency parameter, which is limiting the number of tasks the individual DAG can run in parallel.
Sometimes, the tasks in an individual DAG remain stuck in a “queued” state, but the issue is not observed across multiple DAGs. In cases like this, the problem is likely isolated at the DAG level.
One possible culprit is the dag_concurrency setting, which limits the number of tasks that can be “running” at once for a given DAG. Thus, if a DAG’s dag_concurrency is 2, it will not run more than 2 tasks in parallel, even if it otherwise could. The dag_concurrency setting has a default specified in the Airflow configuration, but it can be overridden at the DAG level as well.
When using the CeleryExecutor, another possible culprit that would be mostly isolated to a single DAG is a mis-specified Queue. Tasks defined in DAGs can name a specific Celery Queue to run against. If the Queue has been incorrectly defined, or the workers serving that Queue are not operational, tasks configured to run against the Queue will simply sit in a “scheduled” state until the issues with the Queue are resolved.
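The effect of the cap can be modelled in a few lines. This is a simplified sketch of the idea, not Airflow's scheduler code, and the function name is hypothetical. To my understanding, the DAG-level override in Airflow 1.10.x is the concurrency argument of the DAG constructor. Verify that against the documentation for your version.

```python
def tasks_allowed_to_start(ready, running, dag_concurrency):
    """How many of a DAG's ready tasks may start, given its concurrency cap."""
    free = max(0, dag_concurrency - running)
    return min(ready, free)


# Five tasks are ready, none are running, and the cap is 2.
print(tasks_allowed_to_start(ready=5, running=0, dag_concurrency=2))  # 2
# Both slots are taken: the remaining tasks wait, however idle the workers are.
print(tasks_allowed_to_start(ready=3, running=2, dag_concurrency=2))  # 0
```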
Observation: An individual subDAG is not executing in parallel.
Potential Cause: This is actually Airflow’s default behavior for SubDAGs in Airflow 1.10.x, which run using the SequentialExecutor. This can be changed, but Airflow has some warnings against doing so.
Finally, a note on something that can be observed with subDAGs. One might observe that their subDAGs do not execute more than a single task at a time, even if configured to do so. This is intended behavior. There are several historical reasons for it, but the immediate cause is that Airflow SubDagOperators default to the SequentialExecutor, which only runs one TaskInstance at a time. If necessary, one can supply an alternate executor for the SubDagOperator to use, but Airflow’s own documentation cautions that you begin to enter problematic territory by doing so.
Built-in Monitoring Offering and Bottleneck Detection
Airflow offers some built-in support for monitoring that can help detect some of the problems described above. Firstly, Airflow supports sending a variety of metrics to StatsD. These metrics can include useful information about things like the number of tasks your scheduler is running. This, in turn, is useful for monitoring whether our workers have adequate resources to serve current Airflow task volumes. Secondly, Flower, which is a more general real-time monitoring tool for Celery, can give a real-time look into Celery worker information, as opposed to the more historical overview from StatsD.
Airflow can be configured to send metrics to StatsD. To quote their own README, StatsD is
“a network daemon that runs on the Node.js platform and listens for statistics, like counters and timers, sent over UDP or TCP and sends aggregates to one or more pluggable backend services (e.g., Graphite).” In the context of this conversation, Airflow’s StatsD support is useful as a way to monitor various aspects of your Airflow instance’s performance.
Two sets of metrics are particularly helpful in identifying the previously mentioned problems: the executor class of metrics and the total_parse_time metric. The executor metrics report how many task slots are open, with the slots being determined by the parallelism configuration setting, and how many tasks are in the queued and running states. The total_parse_time metric is the total number of seconds taken to process all the DAG files. It’s a useful metric for monitoring scheduler load and performance.
Role in Determining Recommended Parameter and Infrastructure Settings
The relationship between the queued_tasks and open_slots values we see, both on average and at maximum load, is important. Sustained periods with non-zero open_slots indicate that worker resources are not fully utilized. Fully occupied running_tasks alongside significant periods of non-zero queued_tasks indicate a lack of worker capacity, in either individual Celery worker concurrency or the number of Celery workers, and suggest that more resources would lead to smoother operation. We can generally expect non-zero queued_tasks to appear during Airflow’s regular scheduling behavior, as tasks naturally pass through the queued state, but a significant number of queued tasks may be a symptom of brewing trouble with task execution.
With regards to a DAG’s dag_processing.last_runtime, it is important to note the relationship between scheduling a DAG and processing a DAG. The longer a DAG’s last_runtime is, the worse its general scheduling performance will be. This metric, alongside total_parse_time, which tracks overall DAG directory parse times, provides insight into gradual degradation of scheduler performance as the DAG directory grows. Both are worth monitoring to decide when to consider vertical scaling for the scheduler machine.
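One way to turn that reading of the executor metrics into a check is sketched below. It assumes you have already collected periodic samples of open_slots, queued_tasks and running_tasks from your metrics backend into a list of dicts. The function name, the labels it returns, and the 50% saturation threshold are all hypothetical and would need tuning for a real deployment.

```python
def diagnose_executor(samples, saturation_ratio=0.5):
    """Classify a window of executor metric samples.

    Each sample is a dict with "open_slots", "queued_tasks" and
    "running_tasks" keys. A sample counts as saturated when no slots
    are open while tasks are still waiting in the queue.
    """
    if not samples:
        return "no_data"
    saturated = sum(
        1 for s in samples if s["open_slots"] == 0 and s["queued_tasks"] > 0
    )
    if saturated / len(samples) >= saturation_ratio:
        return "worker_bottleneck"
    if all(s["open_slots"] > 0 for s in samples):
        return "spare_capacity"
    return "ok"


busy = [{"open_slots": 0, "queued_tasks": 12, "running_tasks": 64}] * 3
quiet = [{"open_slots": 40, "queued_tasks": 0, "running_tasks": 24}] * 3
print(diagnose_executor(busy))   # worker_bottleneck
print(diagnose_executor(quiet))  # spare_capacity
```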
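Because this degradation tends to be gradual, a coarse trend check can be enough to flag it. The sketch below compares the oldest third of a series of total_parse_time samples with the newest third. The function name and the choice of thirds are assumptions made for illustration, and the samples are expected to be positive durations in chronological order.

```python
def parse_time_growth(samples):
    """Ratio of the newest third's mean parse time to the oldest third's mean.

    A value well above 1.0 suggests the DAG directory is becoming more
    expensive to parse. Expects at least three positive samples.
    """
    if len(samples) < 3:
        raise ValueError("need at least three samples")
    third = len(samples) // 3
    oldest = samples[:third]
    newest = samples[-third:]
    return (sum(newest) / len(newest)) / (sum(oldest) / len(oldest))


# Parse time (seconds) creeping up as DAGs are added over time.
print(parse_time_growth([2, 2, 2, 4, 4, 4, 8, 8, 8]))  # 4.0
```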
Flower is a monitoring service built on top of Celery that allows for visibility into the health and workloads of Celery workers. Setting it up is a useful way to observe worker metrics of interest in our Airflow environments. More specifically, it offers real-time information on the number of tasks a given Celery worker is occupied with. In the context of this conversation, it provides real-time visibility into whether the Airflow workers are overloaded or underloaded.
More Best Practices for Performant Airflow
Airflow Pools are an additional feature that Airflow has for controlling task execution. They are useful primarily as a means to limit the execution of logical sets of tasks. Airflow task definitions that refer to a pool can also refer to a priority_weight, which can be used to further control the flow of tasks into the Pool. You might set aside tasks into a Pool to limit expensive types of tasks from running all at once or to limit requests to an API.
One important note is that tasks that are not assigned a Pool run in the default pool, which is initialized to a value of 128. For Airflow 1.10.4 onwards, the size of the default pool is governed in the UI. Prior to that, it’s governed by a configuration setting, non_pooled_task_slot_count. In either case, you’ll want the size of the default pool to be matched with your parallelism setting, assuming no other Pools.
When using the CeleryExecutor, another form of task control is offered in Airflow through Queues. Airflow Queues leverage Celery queues, which is why the option is only useful on the CeleryExecutor. More specifically, workers can be configured to listen to one or multiple queues of tasks. Workers have a default Queue, and the name is controlled through Airflow configuration.
As a specific example, one might use Queues to set up a queue for high priority tasks. Then, the worker configuration would be such that there was at least one worker which only served the high priority queue. Further, the regular workers would serve both the default and high priority queue. The resulting effect is that a number of task slots equal to the high priority queue serving worker’s worker_concurrency will be reserved, provided other settings such as parallelism have been set properly.
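The slot reservation described above can be checked with a short model of the worker layout. The worker dicts, the function names, and the "high_priority" queue name are hypothetical. The model only counts slots and does not capture limits such as parallelism or pools.

```python
def slots_by_queue(workers):
    """Total worker_concurrency available to each queue across all workers."""
    totals = {}
    for worker in workers:
        for queue in worker["queues"]:
            totals[queue] = totals.get(queue, 0) + worker["worker_concurrency"]
    return totals


def reserved_slots(workers, queue):
    """Slots on workers that serve `queue` and nothing else."""
    return sum(
        w["worker_concurrency"] for w in workers if set(w["queues"]) == {queue}
    )


workers = [
    # One worker dedicated to the high priority queue.
    {"queues": ["high_priority"], "worker_concurrency": 16},
    # Two regular workers serving both queues.
    {"queues": ["default", "high_priority"], "worker_concurrency": 32},
    {"queues": ["default", "high_priority"], "worker_concurrency": 32},
]

print(slots_by_queue(workers))                   # {'high_priority': 80, 'default': 64}
print(reserved_slots(workers, "high_priority"))  # 16
```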
DAG Writing Best Practices
Finally, a note on some of Airflow’s own recommendations for DAG writing best practices. When possible, Airflow advises not accessing Variables outside of an operator’s execute method or Jinja template. The reason for this is that Variables will create a connection to the Airflow metadata DB. As Airflow parses all active DAGs at certain intervals, an abundance of such Variable usage could result in many open connections slowing parsing and placing load on the DB.
More generally, DAG authors should stay aware of an individual DAG’s parse time. In a similar vein to the Variable usage, it’s good to avoid any expensive processing code in the DAG file, which should primarily, if not wholly, define configuration. Particularly at lower max_threads values for the scheduler, an expensive-to-parse DAG can have an impact on the scheduler’s overall performance.
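The difference between parse-time and run-time work can be shown without Airflow at all. In the sketch below, fetch_setting is a hypothetical stand-in for an expensive lookup, such as reading a Variable from the metadata DB, and run_task stands in for the callable an operator would execute. Defining the callable costs nothing when the file is parsed. The lookup happens only when the task actually runs.

```python
calls = {"count": 0}


def fetch_setting(name):
    """Hypothetical stand-in for an expensive lookup (e.g. a metadata DB read)."""
    calls["count"] += 1
    return "value-for-" + name


# Anti-pattern (left commented out): this would run on every parse of the file.
# SETTING = fetch_setting("target_table")


def run_task():
    """Stand-in for an operator's callable: the lookup is deferred to run time."""
    return fetch_setting("target_table")


# Parsing the file up to this point has triggered no lookups.
print(calls["count"])  # 0
```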
If you’re still here, we’ve covered a lot of ground together. We’ve learned about the usual suspects in Airflow’s configuration that can cause some trouble, and we walked through a few examples and potential solutions leveraging those same configuration parameters. While none of the solutions were a panacea, ideally they’ll serve as a structured starting point for troubleshooting.
As you know by now, Airflow’s got a reasonable monitoring offering to assist with detecting these kinds of issues as well. Also, following Airflow’s best practices for writing DAGs and leveraging features like Pools and Queues can front-load the performance effort considerably.
Properly configuring and tuning your Airflow deployment to be generally performant can be tricky business. I hope it’s a little less mystical now.