A reproduction repo for a Scheduling bug in AirFlow 2.2.3

Overview

How to run

docker-compose build
docker-compose up

Setup

Have 3 DAGs:

  1. ~240 Tasks, executes every hour, runs for about 45-50 minutes total
  2. ~600 Tasks, executes every 5 days, runs for days
  3. ~10 Tasks, executes on trigger, runs for 10-50 minutes

DAGs run on the default_pool with max_active_tasks set to 2.

My AirFlow config file has the following:

parallelism = 32
default_pool_task_slot_count = 6
executor = LocalExecutor
default_task_weight_rule = absolute

parallelism was set initially like that. default_pool_task_slot_count was set to 6, because it seemed rational that if all of my 3 dags are executing at the same time, the maximum amount of tasks that can be executed is 3*2=6.

The problem:

Almost every time any one of the DAGs is executed, no tasks from other DAGs will start until all the tasks from the first one finish. That is, if slow DAG_2 with 600 tasks starts, DAG_1 will have to wait for days to start even a single Task.

the_bug

The Logs for the Scheduler look like this:

scheduler_1  | 
scheduler_1  | [2022-02-09 10:02:27,116] {scheduler_job.py:288} INFO - 4 tasks up for execution:
scheduler_1  |  
   
    
scheduler_1  |  
    
     
scheduler_1  |  
     
      
scheduler_1  |  
      
       
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:322} INFO - Figuring out tasks to run in Pool(name=default_pool) with 4 open slots and 4 task instances ready to be queued
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
       
         since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
        
          since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
         
           since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
          
            since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:410} INFO - Setting the following tasks to queued state: scheduler_1 | 
          
         
        
       
      
     
    
   

I have fixed this by setting

parallelism = 1000
default_pool_task_slot_count = 999

The reason why this occurs and why the solution works

We need to look at this method of the Scheduler: https://github.com/apache/airflow/blob/2.2.3/airflow/jobs/scheduler_job.py#L229
In it, the overall logic is the following:

  1. Receive max_tis(== parallelism - active running tasks from the config) in the arguments. In my case that would be 30
  2. Calculate how many free slots in all all Pools we have. In my case that would be 4.
  3. Select the minimum and update the max_tis variable: max_tis = min(4, 30) = 4
  4. Query the DB for tasks that are Scheduled in an unpaused DAGs that are running normally and order them by DAG execution date.
  5. Limit the query by max_tis.
  6. Loop over each returned task and check if we can run it. Run, if possible.

Now, since the tasks are ordered by execution date, this will return us tasks for a DAG that started first, which lets say was DAG_2. It has over 600 tasks. The LIMIT operation will return 4 of them. Each off these tasks cannot be run since there are already 2 tasks running and the max_active_tasks of the DAG is 2. Thus, we just wait till one of these tasks finish and start another task from the same DAG.

Owner
Ilya Strelnikov
Ilya Strelnikov
Old versions of Deadcord that are problematic or used as reference.

⚠️ Unmaintained and broken. We have decided to release the old version of Deadcord before our v1.0 rewrite. (which will be equiped with much more feat

Galaxzy 1 Feb 10, 2022
monster hunter world randomizer project

mhw_randomizer monster hunter world randomizer project Settings are in rando_config.py Current script for attack randomization is n mytest.py There ar

2 Jan 24, 2022
A simply dashboard to view commodities position data based on CFTC reports

commodities-dashboard A simply dashboard to view commodities position data based on CFTC reports This is a python project using Dash and plotly to con

71 Dec 19, 2022
Python bindings for `ign-msgs` and `ign-transport`

Python Ignition This project aims to provide Python bindings for ignition-msgs and ignition-transport. It is a work in progress... C++ and Python libr

Rhys Mainwaring 3 Nov 08, 2022
MySQL Connectivity based project. Contains various functions of a Store-Management-System

An Intermediate Level Python - MySQL Connectivity based project. Contains various functions of a Store-Management-System.

Yash Wadhvani 2 Nov 21, 2022
In this repo i inherit the pos module and added QR code to pos receipt

odoo-pos-inherit In this repo i inherit the pos module and added QR code to pos receipt 1- Create new Odoo Module using command line $ python odoo-bin

5 Apr 09, 2022
Notebooks for computing approximations to the prime counting function using Riemann's formula.

Notebooks for computing approximations to the prime counting function using Riemann's formula.

Tom White 2 Aug 02, 2022
Aerospace utilities: flight conditions package, standard atmosphere model, and more.

Aerospace Utilities About Module that contains commonly-used aerospace utilities for problem solving. Flight Condition: input altitude to compute comm

1 Jan 03, 2022
Org agenda in the console

This Python script reads an org agenda file (i.e. a regular org file with some active dates) and displays an interactive and colored year calendar with detailed information for each day when the mous

Nicolas P. Rougier 113 Jan 03, 2023
This is some simple code to scrape vistbook's system to get an overview of the different cabins availability.

DNT_cabin_availability_system This is some simple code to scrape visbook's system to get an overview of the different cabins availability. The system

Andreas Lorentzen 1 Sep 25, 2022
Data Orchestration Platform

Table of contents What is DOP Design Concept A Typical DOP Orchestration Flow Prerequisites - Run in Docker For DOP Native Features For DBT Instructio

Datatonic 61 Mar 04, 2022
Shows a pixel art of any Pokémon in your terminal!

pokemon-icat This script is inspired by this project, but since the output heavily depends on the font of your terminal, i decided to make a script th

ph04 52 Dec 22, 2022
A carrot-based color palette you didn't know you needed.

A package to produce a carrot-inspired color palette for python/matplotlib. Install: pip install carrotColors Update: pip install --upgrade carrotColo

10 Sep 28, 2021
Voldemort's Python import helper

importmagician Voldemort's Python import helper pip install importmagician Import from uninstalled Python directories Say you have a directory (relat

Zhengyang Feng 4 Mar 09, 2022
Dicionario-git-github - Dictionary created to help train new users of Git and GitHub applications

Dicionário 📕 Dicionário criado com o objetivo de auxiliar no treinamento de nov

Felippe Rafael 1 Feb 07, 2022
Banking management project using Tkinter GUI in python.

Bank-Management Banking management project using Tkinter GUI in python. Packages required Tkinter - Tkinter is the standard GUI library for Python. sq

Anjali Kumawat 7 Jul 03, 2022
My Solutions to 120 commonly asked data science interview questions.

Data_Science_Interview_Questions Introduction 👋 Here are the answers to 120 Data Science Interview Questions The above answer some is modified based

Milaan Parmar / Милан пармар / _米兰 帕尔马 181 Dec 31, 2022
Let's pretend you want to create a AWS Lambda project called "sns-processor".

Usage Let's pretend you want to create a AWS Lambda project called "sns-processor". Rather than using lambda and then editing the results to include y

1 Dec 31, 2021
BDD base project: Python + Behave

BDD base project: Python + Behave Basic example of using Python with Behave (BDD). This Gherkin example includes: Basic Scenario Scenario Outline Tagg

eccanto 1 Dec 08, 2021
Rates how pog a word or user is. Not random and does have *some* kind of algorithm to it.

PogRater :D Rates how pogchamp a word is :D A fun project coded by JBYT27 using Python3 Have you ever wondered how pog a word is? Well, congrats, you

an aspirin 2 Jun 25, 2022