A reproduction repo for a Scheduling bug in AirFlow 2.2.3

Overview

How to run

docker-compose build
docker-compose up

Setup

Have 3 DAGs:

  1. ~240 Tasks, executes every hour, runs for about 45-50 minutes total
  2. ~600 Tasks, executes every 5 days, runs for days
  3. ~10 Tasks, executes on trigger, runs for 10-50 minutes

DAGs run on the default_pool with max_active_tasks set to 2.

My AirFlow config file has the following:

parallelism = 32
default_pool_task_slot_count = 6
executor = LocalExecutor
default_task_weight_rule = absolute

parallelism was set initially like that. default_pool_task_slot_count was set to 6, because it seemed rational that if all of my 3 dags are executing at the same time, the maximum amount of tasks that can be executed is 3*2=6.

The problem:

Almost every time any one of the DAGs is executed, no tasks from other DAGs will start until all the tasks from the first one finish. That is, if slow DAG_2 with 600 tasks starts, DAG_1 will have to wait for days to start even a single Task.

the_bug

The Logs for the Scheduler look like this:

scheduler_1  | 
scheduler_1  | [2022-02-09 10:02:27,116] {scheduler_job.py:288} INFO - 4 tasks up for execution:
scheduler_1  |  
   
    
scheduler_1  |  
    
     
scheduler_1  |  
     
      
scheduler_1  |  
      
       
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:322} INFO - Figuring out tasks to run in Pool(name=default_pool) with 4 open slots and 4 task instances ready to be queued
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
       
         since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
        
          since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
         
           since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
          
            since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:410} INFO - Setting the following tasks to queued state: scheduler_1 | 
          
         
        
       
      
     
    
   

I have fixed this by setting

parallelism = 1000
default_pool_task_slot_count = 999

The reason why this occurs and why the solution works

We need to look at this method of the Scheduler: https://github.com/apache/airflow/blob/2.2.3/airflow/jobs/scheduler_job.py#L229
In it, the overall logic is the following:

  1. Receive max_tis(== parallelism - active running tasks from the config) in the arguments. In my case that would be 30
  2. Calculate how many free slots in all all Pools we have. In my case that would be 4.
  3. Select the minimum and update the max_tis variable: max_tis = min(4, 30) = 4
  4. Query the DB for tasks that are Scheduled in an unpaused DAGs that are running normally and order them by DAG execution date.
  5. Limit the query by max_tis.
  6. Loop over each returned task and check if we can run it. Run, if possible.

Now, since the tasks are ordered by execution date, this will return us tasks for a DAG that started first, which lets say was DAG_2. It has over 600 tasks. The LIMIT operation will return 4 of them. Each off these tasks cannot be run since there are already 2 tasks running and the max_active_tasks of the DAG is 2. Thus, we just wait till one of these tasks finish and start another task from the same DAG.

Owner
Ilya Strelnikov
Ilya Strelnikov
Just a little benchmark for scrapper PC's

PopMark Just a little benchmark for scrapper PC's This benchmark is for old computer that dont support other benchmark because of support. Like lack o

Garry 1 Nov 24, 2021
An After Effects render queue for ShotGrid Toolkit.

AEQueue An After Effects render queue for ShotGrid Toolkit. Features Render multiple comps to locations defined by templates in your Toolkit config. C

Brand New School 5 Nov 20, 2022
RISE allows you to instantly turn your Jupyter Notebooks into a slideshow

RISE RISE allows you to instantly turn your Jupyter Notebooks into a slideshow. No out-of-band conversion is needed, switch from jupyter notebook to a

Damian Avila 3.4k Jan 04, 2023
Snek-test - An operating system kernel made in python and assembly

pythonOS An operating system kernel made in python and assembly Wait what? It us

TechStudent10 2 Jan 25, 2022
Install Firefox from Mozilla.org easily, complete with .desktop file creation.

firefox-installer Install Firefox from Mozilla.org easily, complete with .desktop file creation. Dependencies Python 3 Python LXML Debian/Ubuntu: sudo

rany 7 Nov 04, 2022
Fisherman is a free open source fishing bot written in python.

Fisherman is a free open source fishing bot written in python.

Pure | Cody 33 Jan 29, 2022
Solves Maths24 problems for you!

maths24-solver Solves Maths24 problems for you! Enjoy this open scource project! You can edit modify and share! My wishes is for you to use this proje

6 Nov 07, 2021
Winxp_python3.6.15 - Python 3.6.15 For Windows XP SP3

This is Python version 3.6.15 Copyright (c) 2001-2021 Python Software Foundation. All rights reserved. See the end of this file for further copyright

Alex Free 13 Sep 11, 2022
📜Generate poetry with gcc diagnostics

gado (gcc awesome diagnostics orchestrator) is a wrapper of gcc that outputs its errors and warnings in a more poetic format.

Dikson Santos 19 Jun 25, 2022
Aplicação que envia regularmente um email ao utilizador com todos os filmes disponíveis no cartaz dos cinemas Nos.

Cartaz-Cinemas-Nos Aplicação que envia regularmente uma notificação ao utilizador com todos os filmes disponíveis no cartaz dos cinemas Nos. Só funcio

Cavalex 1 Jan 09, 2022
combs is a package used to generate all possible combinations of a given length k on a given set.

The package combs is a package used to generate all possible combinations of a given length k on a given set. The set is given as a list, and k must b

1 Dec 24, 2021
Zues Auto Claimer Leaked By bazooka#0001

Zues Auto Claimer Leaked By bazooka#0001 put proxies in prox.txt put ssid in sid.txt put all users you want to target in user.txt for the login just t

1 Jan 15, 2022
Recreate the joys of Office Assistant from the comfort of the Python interpreter

Recreate the joys of Office Assistant from the comfort of the Python interpreter.

Louis Sven Goulet 3 May 21, 2022
TickerRain is an open-source web app that stores and analysis Reddit posts in a transparent and semi-interactive manner.

TickerRain is an open-source web app that stores and analysis Reddit posts in a transparent and semi-interactive manner

GonVas 180 Oct 08, 2022
Simple calculator made in python

calculator Uma alculadora simples feita em python CMD, PowerShell, Bash ✔️ Início 💻 apt-get update apt-get upgrade -y apt-get install python git git

Spyware 8 Dec 28, 2021
NYCU(NCTU)-差勤-助教

NCTU-TA-fill 填寫 差勤-助教時數 有沒有覺得在差勤系統填助教時數有點浪費生命? 今天有個懶鬼浪費好多時間幫大家寫了code 只要填好的必要的資料,就可以讓電腦自動幫你完成差勤助教的時數填寫喔! https://pt-attendance.nctu.edu.tw/verify/userL

14 Dec 21, 2021
Islam - This is a simple python script.In this script I have written all the suras of Al Quran. As a result, by using this script, you can know the number of any sura at the moment.

Introduction: If you want to know sura number of al quran by just typing the name of sura than you can use this script. Usage in termux: $ pkg install

Fazle Rabbi 1 Jan 02, 2022
PressurePlate is a multi-agent environment that requires agents to cooperate during the traversal of a gridworld.

PressurePlate is a multi-agent environment that requires agents to cooperate during the traversal of a gridworld. The grid is partitioned into several rooms, and each room contains a plate and a clos

Autonomous Agents Research Group (University of Edinburgh) 6 Dec 03, 2022
Python wrapper around Apple App Store Api

App Store Connect Api This is a Python wrapper around the Apple App Store Api : https://developer.apple.com/documentation/appstoreconnectapi So far, i

123 Jan 06, 2023
Windows Task Manager with special features, written in Python.

Killer That damn Chrome ⬇ Download here · 👋 Join our discord Tired of trying to kill processes with the default Windows Task Manager? Selecting one b

Nathan Araújo 49 Jan 03, 2023