A thing to simplify listening for PG notifications with asyncpg

Overview

asyncpg-listen

This library simplifies usage of listen/notify with asyncpg:

  1. Handles loss of a connection
  2. Simplifies notifications processing from multiple channels
  3. Setups a timeout for receiving a notification
  4. Allows to receive all notifications/only last notification depending on ListenPolicy.
import asyncio
import asyncpg
import asyncpg_listen


async def handle_notifications(notification: asyncpg_listen.NotificationOrTimeout) -> None:
    print(f"{notification} has been received")


listener = asyncpg_listen.NotificationListener(asyncpg_listen.connect_func())
listener_task = asyncio.create_task(
    listener.run(
        {"channel": handle_notifications},
        policy=asyncpg_listen.ListenPolicy.LAST,
        notification_timeout=1
    )
)

async with asyncpg.connect() as connection:
    for i in range(42):
        await connection.execute(f"NOTIFY simple, '{i}'")
Owner
ANNA
ANNA
🚧Useful shortcuts for simple task on windows

Windows Manager A tool containg useful utilities for performing simple shortcut tasks on Windows 10 OS. Features Lit Up - Turns up screen brightness t

Olawale Oyeyipo 0 Mar 24, 2022
extract gene TSS/TES site form gencode/ensembl/gencode database GTF file and export bed format file.

GetTsite python Package extract gene TSS/TES site form gencode/ensembl/gencode database GTF file and export bed format file. Install $ pip install Get

laojunjun 7 Nov 21, 2022
Python Libraries with functions and constants related to electrical engineering.

ElectricPy Electrical-Engineering-for-Python Python Libraries with functions and constants related to electrical engineering. The functions and consta

Joe Stanley 39 Dec 23, 2022
produces PCA on genotypes from fasta files (popPhyl's ID format)

popPhyl_PCA Performs PCA of genotypes. Works in two steps. 1. Input file A single fasta file containing different loci, in different populations/speci

camille roux 2 Oct 08, 2021
A simple API that will return a key-value pair of randomly generated UUID

A simple API that will return a key-value pair of randomly generated UUID. Key will be a timestamp and value will be UUID. While the server is running, whenever the API is called, it should return al

Pius Lucky 2 Jan 18, 2022
Set of utilities for exporting/controlling your robot in Blender

Blender Robotics Utils This repository contains utilities for exporting/controlling your robot in Blender Maintainers This repository is maintained by

Robotology 33 Nov 30, 2022
Factoral Methods using two different method

Factoral-Methods-using-two-different-method Here, I am finding the factorial of a number by using two different method. The first method is by using f

Sachin Vinayak Dabhade 4 Sep 24, 2021
A time table app to notify the user about their class timings

kivyTimeTable A time table app to notify the user about their class timings Features This project incorporates some features i wanted to see in a time

2 Dec 15, 2021
Install, run, and update apps without root and only in your home directory

Qube Apps Install, run, and update apps in the private storage of a Qube. Build and install in Qubes Get the code: git clone https://github.com/micahf

Micah Lee 26 Dec 27, 2022
A simple example for calling C++ functions in Python by `ctypes`.

ctypes-example A simple example for calling C++ functions in Python by ctypes. Features call C++ function int bar(int* value, char* msg) with argumene

Yusu Pan 3 Nov 23, 2022
Python lightweight dependency injection library

pythondi pythondi is a lightweight dependency injection library for python Support both sync and async functions Installation pip3 install pythondi Us

Hide 41 Dec 16, 2022
Know your customer pipeline in apache air flow

KYC_pipline Know your customer pipeline in apache air flow For a successful pipeline run take these steps: Run you Airflow server Admin - connection

saeed 4 Aug 01, 2022
A python lib for generate random string and digits and special characters or A combination of them

A python lib for generate random string and digits and special characters or A combination of them

Torham 4 Nov 15, 2022
Simple web index to use bloom filter for Pwned Passwords

pwbloom Simple web index to use bloom filter for Pwned Passwords The index.py runs a simple CGI web service checking passwords with a bloom filter for

Hanno Böck 4 Nov 23, 2021
Tools for binary data on cassette

Micro Manchester Tape Storage Tools for storing binary data on cassette Includes: Python script for encoding Arduino sketch for decoding Eagle CAD fil

Zack Nelson 28 Dec 25, 2022
Install, run, and update apps without root and only in your home directory

Qube Apps Install, run, and update apps in the private storage of a Qube Building instrutions

Micah Lee 26 Dec 27, 2022
A simple package for handling variables in string.

A simple package for handling string variables. Welcome! This is a simple package for handling variables in string, You can add or remove variables wi

1 Dec 31, 2021
aws ec2.py companion script to generate sshconfigs with auto bastion host discovery

ec2-bastion-sshconfig This script will interate over instances found by ec2.py and if those instances are not publically accessible it will search the

Steve Melo 1 Sep 11, 2022
Creating low-level foundations and abstractions for asynchronous programming in Python.

DIY Async I/O Creating low-level foundations and abstractions for asynchronous programming in Python (i.e., implementing concurrency without using thr

Doc Jones 4 Dec 11, 2021
a demo show how to dump lldb info to ida.

用一个demo来聊聊动态trace 这个仓库能做什么? 帮助理解动态trace的思想。仓库内的demo,可操作,可实践。 动态trace核心思想: 动态记录一个函数内每一条指令的执行中产生的信息,并导入IDA,用来弥补IDA等静态分析工具的不足。 反编译看一下 先clone仓库,把hellolldb

25 Nov 28, 2022