A reproduction repo for a Scheduling bug in AirFlow 2.2.3

Overview

How to run

docker-compose build
docker-compose up

Setup

Have 3 DAGs:

  1. ~240 Tasks, executes every hour, runs for about 45-50 minutes total
  2. ~600 Tasks, executes every 5 days, runs for days
  3. ~10 Tasks, executes on trigger, runs for 10-50 minutes

DAGs run on the default_pool with max_active_tasks set to 2.

My AirFlow config file has the following:

parallelism = 32
default_pool_task_slot_count = 6
executor = LocalExecutor
default_task_weight_rule = absolute

parallelism was set initially like that. default_pool_task_slot_count was set to 6, because it seemed rational that if all of my 3 dags are executing at the same time, the maximum amount of tasks that can be executed is 3*2=6.

The problem:

Almost every time any one of the DAGs is executed, no tasks from other DAGs will start until all the tasks from the first one finish. That is, if slow DAG_2 with 600 tasks starts, DAG_1 will have to wait for days to start even a single Task.

the_bug

The Logs for the Scheduler look like this:

scheduler_1  | 
scheduler_1  | [2022-02-09 10:02:27,116] {scheduler_job.py:288} INFO - 4 tasks up for execution:
scheduler_1  |  
   
    
scheduler_1  |  
    
     
scheduler_1  |  
     
      
scheduler_1  |  
      
       
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:322} INFO - Figuring out tasks to run in Pool(name=default_pool) with 4 open slots and 4 task instances ready to be queued
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks
scheduler_1  | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
       
         since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:357} INFO - Not executing 
        
          since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,119] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
         
           since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:349} INFO - DAG Slow_DAG has 2/2 running and queued tasks scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:357} INFO - Not executing 
          
            since the number of tasks running or queued from DAG Slow_DAG is >= to the DAG's max_active_tasks limit of 2 scheduler_1 | [2022-02-09 10:02:27,120] {scheduler_job.py:410} INFO - Setting the following tasks to queued state: scheduler_1 | 
          
         
        
       
      
     
    
   

I have fixed this by setting

parallelism = 1000
default_pool_task_slot_count = 999

The reason why this occurs and why the solution works

We need to look at this method of the Scheduler: https://github.com/apache/airflow/blob/2.2.3/airflow/jobs/scheduler_job.py#L229
In it, the overall logic is the following:

  1. Receive max_tis(== parallelism - active running tasks from the config) in the arguments. In my case that would be 30
  2. Calculate how many free slots in all all Pools we have. In my case that would be 4.
  3. Select the minimum and update the max_tis variable: max_tis = min(4, 30) = 4
  4. Query the DB for tasks that are Scheduled in an unpaused DAGs that are running normally and order them by DAG execution date.
  5. Limit the query by max_tis.
  6. Loop over each returned task and check if we can run it. Run, if possible.

Now, since the tasks are ordered by execution date, this will return us tasks for a DAG that started first, which lets say was DAG_2. It has over 600 tasks. The LIMIT operation will return 4 of them. Each off these tasks cannot be run since there are already 2 tasks running and the max_active_tasks of the DAG is 2. Thus, we just wait till one of these tasks finish and start another task from the same DAG.

Owner
Ilya Strelnikov
Ilya Strelnikov
Find Transposon Element insertions using long reads (nanopore), by alignment directly. (minimap2)

find_te_ins find_te_ins is designed to find Transposon Element (TE) insertions using long reads (nanopore), by alignment directly. (minimap2) Install

Ming Wang 1 Feb 09, 2022
ThnoolBox - A thneed is a multi-use versatile object

ThnoolBox Have you ever wanted a collection of bodged desktop apps that are Lorax themed ? No ? Sucks to suck I guess Apps & their downsides CalculaTh

pocoyo 1 Jan 21, 2022
SimCSE在中文任务上的简单实验

SimCSE 中文测试 SimCSE在常见中文数据集上的测试,包含ATEC、BQ、LCQMC、PAWSX、STS-B共5个任务。 介绍 博客:https://kexue.fm/archives/8348 论文:《SimCSE: Simple Contrastive Learning of Sente

苏剑林(Jianlin Su) 504 Jan 04, 2023
:fishing_pole_and_fish: List of `pre-commit` hooks to ensure the quality of your `dbt` projects.

pre-commit-dbt List of pre-commit hooks to ensure the quality of your dbt projects. BETA NOTICE: This tool is still BETA and may have some bugs, so pl

Offbi 262 Nov 25, 2022
A collection of modern themes for Tkinter TTK

ttkbootstrap A collection of modern flat themes inspired by Bootstrap. Also includes TTK Creator which allows you to easily create and use your own th

Israel Dryer 827 Jan 04, 2023
Minimalist BERT implementation assignment for CS11-747

minbert Assignment by Zhengbao Jiang, Shuyan Zhou, and Ritam Dutt This is an exercise in developing a minimalist version of BERT, part of Carnegie Mel

Graham Neubig 51 Jan 03, 2023
SimilarWeb for Team ACT v.0.0.1

SimilarWeb for Team ACT v.0.0.1 This module has been built to provide a better environment specifically for Similarweb in Team ACT. This module itself

Sunkyeong Lee 0 Dec 29, 2021
A set of scripts for a two-step procedure to measure the value of access to destinations across several modes of travel within a geographic area.

A set of scripts for a two-step procedure to measure the value of access to destinations across several modes of travel within a geographic area.

Institute for Transportation and Development Policy 2 Oct 16, 2022
Jarvis Python BOT acts like Google-assistance

Jarvis-Python-BOT Jarvis Python BOT acts like Google-assistance Setup Add Mail ID (Gmail) in the file at line no 82.

Ishan Jogalekar 1 Jan 08, 2022
A tool for light-duty persistent memoization of API calls

JSON Memoize What is this? json_memoize is a straightforward tool for light-duty persistent memoization, created with API calls in mind. It stores the

1 Dec 11, 2021
A tool to guide you for team selection based on mana and ruleset using your owned cards.

Splinterlands_Teams_Guide A tool to guide you for team selection based on mana and ruleset using your owned cards. Built With This project is built wi

Ruzaini Subri 3 Jul 30, 2022
Simple cash register system made with guizero

Eje-Casher なにこれ guizeroで作った簡易レジシステムです。実際にコミケで使う予定です。 これを誰かがそのまま使うかどうかというよりは、guiz

Akira Ouchi 4 Nov 07, 2022
A discord group chat creator just made it because i saw people selling this stuff for like up to 40 bucks

gccreator some discord group chat tools just made it because i saw people selling this stuff for like up to 40 bucks (im currently working on a faster

baum1810 6 Oct 03, 2022
Auto check in via GitHub Actions

因为本人毕业离校,本项目交由在校的@hfut-xyc同学接手,请访问hfut-xyc/hfut_auto_check-in获得最新的脚本 本项目遵从GPLv2协定,Copyright (C) 2021, Fw[a]rd 免责声明 根据GPL协定,我、本项目的作者,不会对您使用这个脚本带来的任何后果

Fw[a]rd 3 Jun 27, 2021
importlib_resources is a backport of Python standard library importlib.resources module for older Pythons.

importlib_resources is a backport of Python standard library importlib.resources module for older Pythons. The key goal of this module is to replace p

Python 36 Dec 13, 2022
PIP Manager written in python Tkinter

PIP Manager About PIP Manager is designed to make Python Package handling easier by just a click of a button!! Available Features Installing packages

Will Payne 9 Dec 09, 2022
Use Fofa、shodan、zoomeye、360quake to collect information(e.g:domain,IP,CMS,OS)同时调用Fofa、shodan、zoomeye、360quake四个网络空间测绘API完成红队信息收集

Cyberspace Map API English/中文 Development fofaAPI Completed zoomeyeAPI shodanAPI regular 360 quakeAPI Completed Difficulty APIs uses different inputs

Xc1Ym 61 Oct 08, 2022
Lenovo Yoga Ideapad Autocharge

Description This program uses the conservation_mode of Lonovo Ideapad / Yoga not

1 Jan 09, 2022
Voldemort's Python import helper

importmagician Voldemort's Python import helper pip install importmagician Import from uninstalled Python directories Say you have a directory (relat

Zhengyang Feng 4 Mar 09, 2022
Password manager using MySQL and Python 3.10.2

Password Manager Password manager using MySQL and Python 3.10.2 Installation Install my-project with github git clone https://github.com/AyaanSiddiq

1 Feb 18, 2022