A reproduction repo for a Scheduling bug in AirFlow 2.2.3

Overview

How to run

docker-compose build
docker-compose up

Setup

Have 3 DAGs:

  1. ~240 Tasks, executes every hour, runs for about 45-50 minutes total
  2. ~600 Tasks, executes every 5 days, runs for days
  3. ~10 Tasks, executes on trigger, runs for 10-50 minutes

DAGs run on the default_pool with max_active_tasks set to 2.

My AirFlow config file has the following:

parallelism = 32
default_pool_task_slot_count = 6
executor = LocalExecutor
default_task_weight_rule = absolute

parallelism was set initially like that. default_pool_task_slot_count was set to 6, because it seemed rational that if all of my 3 dags are executing at the same time, the maximum amount of tasks that can be executed is 3*2=6.

The problem:

Almost every time any one of the DAGs is executed, no tasks from other DAGs will start until all the tasks from the first one finish. That is, if slow DAG_2 with 600 tasks starts, DAG_1 will have to wait for days to start even a single Task.

the_bug

The Logs for the Scheduler look like this:

scheduler_1  | 
scheduler_1  | [2022-02-09 10:02:27,116] {scheduler_job.py:288} INFO - 4 tasks up for execution:
scheduler_1  |  
   
    
scheduler_1  |  
    
     
scheduler_1  |  
     
      
scheduler_1  |  
      
       
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:322} INFO - Figuring out tasks to run in Pool(name=default_pool) with 4 open slots and 4 task instances ready to be queued
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
       
         since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
        
          since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
         
           since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
          
            since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:410} INFO - Setting the following tasks to queued state: scheduler_1 | 
          
         
        
       
      
     
    
   

I have fixed this by setting

parallelism = 1000
default_pool_task_slot_count = 999

The reason why this occurs and why the solution works

We need to look at this method of the Scheduler: https://github.com/apache/airflow/blob/2.2.3/airflow/jobs/scheduler_job.py#L229
In it, the overall logic is the following:

  1. Receive max_tis(== parallelism - active running tasks from the config) in the arguments. In my case that would be 30
  2. Calculate how many free slots in all all Pools we have. In my case that would be 4.
  3. Select the minimum and update the max_tis variable: max_tis = min(4, 30) = 4
  4. Query the DB for tasks that are Scheduled in an unpaused DAGs that are running normally and order them by DAG execution date.
  5. Limit the query by max_tis.
  6. Loop over each returned task and check if we can run it. Run, if possible.

Now, since the tasks are ordered by execution date, this will return us tasks for a DAG that started first, which lets say was DAG_2. It has over 600 tasks. The LIMIT operation will return 4 of them. Each off these tasks cannot be run since there are already 2 tasks running and the max_active_tasks of the DAG is 2. Thus, we just wait till one of these tasks finish and start another task from the same DAG.

Owner
Ilya Strelnikov
Ilya Strelnikov
This suite consists of two different scripts, made to automate attacks against NoSQL databases.

NoSQL-Attack-Suite This suite consists of two different scripts, made to automate attacks against NoSQL databases. The first one looks for a NoSQL Aut

16 Dec 26, 2022
Parser for RISC OS Font control characters in Python

RISC OS Font control parsing in Python This repository contains a class (FontControlParser) for parsing font control codes from a byte squence, in Pyt

Charles Ferguson 1 Nov 02, 2021
Python script which allows for automatic registration in Golfbox

Python script which allows for automatic registration in Golfbox

Guðni Þór Björnsson 8 Dec 04, 2021
This project intends to take the user's CEP (brazilian adress code) and return the local in which the CEP is placed.

This project aims to simply return the CEP's (the brazilian resident adress code) User of the application. The project uses a request and passes on to

Daniel Soares Saldanha 4 Nov 17, 2021
Write complicated anonymous functions other than lambdas in Python.

lambdex allows you to write multi-line anonymous function expression (called a lambdex) in an idiomatic manner.

Xie Jingyi 71 May 19, 2022
A faster Python generator that get function results from multi-process workers

multiyield This package implements a Python generator that get function results from multi-process workers. The faster_fifo Queue (instead of the stan

Xin Du 1 Nov 18, 2021
Writeup and scripts for the 2021 malwarebytes crackme

Malwarebytes Crackme 2021 Tools and environment setup We will be doing this analysis in a Windows 10 VM with the flare-vm tools installed. Most of the

Jerome Leow 9 Dec 02, 2022
It is a Blender Tool which can convert the Object Data Attributes in face corner to the UVs or Vertex Color.

Blender_ObjectDataAttributesConvertTool It is a Blender Tool which can convert the Object Data Attributes in face corner to the UVs or Vertex Color. D

Takeshi Chō 2 Jan 08, 2022
Block the annoying Token Grabbers on your discord

General We have seen that in the last time many discord servers are infected by fake discord nitro links we want to put an end to this and have develo

BadTiger Network 2 Jul 16, 2022
A simple website-based resource monitor for slurm system.

Slurm Web A simple website-based resource monitor for slurm system. Screenshot Required python packages flask, colored, humanize, humanfriendly, beart

Tengda Han 17 Nov 29, 2022
contextlib2 is a backport of the standard library's contextlib module to earlier Python versions.

contextlib2 is a backport of the standard library's contextlib module to earlier Python versions. It also sometimes serves as a real world proving gro

Jazzband 35 Dec 23, 2022
use Notepad++ for real-time sync after python appending new log text

FTP远程log同步工具 使用Notepad++配合来获取实时更新的log文档效果 适用于FTP协议的log远程同步工具,配合MT管理器开启FTP服务器使用,通过Notepad++监听文本变化,更便捷的使用电脑查看方法注入打印后的信息 功能 过滤器 对每行要打印的文本使用回调函数筛选,支持链式调用

Liuhaixv 1 Oct 17, 2021
sumCulator Это калькулятор, который умеет складывать 2 числа.

sumCulator Это калькулятор, который умеет складывать 2 числа. Но есть условия: Эти 2 числа не могут быть отрицательными (всё-таки это вычитание, а не

0 Jul 12, 2022
Pulse sequence builder and compiler for q1asm

q1pulse Pulse sequence builder and compiler for q1asm. q1pulse is a simple library to compile pulse sequence to q1asm, the assembly language of Qblox

Sander de Snoo 3 Dec 14, 2022
Wagtail + Lottie is a Wagtail package for playing Adobe After Effects animations exported as json with Bodymovin.

Wagtail Lottie Wagtail + Lottie is a Wagtail package for playing Adobe After Effects animations exported as json with Bodymovin. Usage Export your ani

Alexis Le Baron 7 Aug 18, 2022
A project to explore and provide useful code for Mango Markets

🥭 Mango Explorer A project to explore and provide useful code for Mango Markets

Blockworks Foundation 160 Dec 19, 2022
An app to automatically take attendance by scanning students' bar coded ID card as they enter the classroom.

Auto Classroom Attendance This application may be run on a PC to automatically scan students' ID card using a generic bar code scanner and output the

1 Nov 10, 2021
Python flexible slugify function

Python flexible slugify function

Dmitry Voronin 471 Dec 20, 2022
Poetry workspace plugin for Python monorepos.

poetry-workspace-plugin Poetry workspace plugin for Python monorepos. Inspired by Yarn Workspaces. Adds a new subcommand group, poetry workspace, whic

Jack Smith 74 Jan 01, 2023
Logo DYS (Doküman Yönetim Sitemi) API Python Implementation

dys-connector Logo DYS (Dokuman Yonetim Sistemi) API Python Implementation Python Package: https://pypi.org/project/dys-connector Quick Start from dys

Logo Group 8 Mar 19, 2022