Easy to use Google Pub/Sub

Overview

Relé makes integration with Google Pub/Sub straightforward.

Motivation and Features

The Publish-Subscribe pattern, and specifically the Google Cloud Pub/Sub library, are very powerful tools, but you can easily cut your fingers on them. Relé makes integration seamless by providing Publisher, Subscriber and Worker classes with the following features:

  • Powerful Publishing API
  • Highly Scalable Worker
  • Intuitive Subscription Management
  • Easily Extensible Middleware
  • Ready to go Django/Flask integration
  • CLI
  • And much more!

What it looks like

# Publish to the topic
import rele

rele.publish(topic='photo-uploaded', data={'customer_id': 123})

# Subscribe to the Pub/Sub topic
from rele import sub

@sub(topic='photo-uploaded')
def photo_uploaded(data, **kwargs):
    print(f"Customer {data['customer_id']} has uploaded an image")

What's in the name

"Relé" is Spanish for relay, a technology that has played a key role in history in the evolution of communication and electrical technology, including the telegraph, telephone, electricity transmission, and transistors.

Install

Relé supports Python 3.6+ and can be installed via pip:

pip install rele

or with Django integration

pip install rele[django]

or with Flask integration

pip install rele[flask]

Quickstart

Please see our documentation to get started.

You can also read more about it here


Running Tests

Does the code actually work?

  make test
Comments
  • Clean up rele.config.setup + Worker() init

    This relates to #114 & #119

    This makes all config variables nullable, falling back to the standard Google environment variables, without breaking the current API.

    The new API would look like this if you have GOOGLE_APPLICATION_CREDENTIALS set:

    rele.config.setup()
    
    w = Worker([sub1, sub2])
    w.run_forever()
    

    TBH, I think reading global configs from the environment is easier to reason about than a singleton so I'd suggest that. Any non globals should just be passed into the instances on init.
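A minimal sketch of the fallback described above, assuming a hypothetical helper named `resolve_credentials_path` (not part of Relé). It prefers an explicit `GC_CREDENTIALS_PATH` setting and otherwise reads the standard `GOOGLE_APPLICATION_CREDENTIALS` environment variable:

```python
import os


def resolve_credentials_path(settings=None, environ=None):
    """Return the credentials path from settings, else from the environment.

    `resolve_credentials_path` is a hypothetical helper used only for
    illustration; it is not Relé's actual implementation.
    """
    settings = settings or {}
    environ = os.environ if environ is None else environ

    # An explicit, non-null setting always wins.
    explicit = settings.get("GC_CREDENTIALS_PATH")
    if explicit is not None:
        return explicit

    # Otherwise fall back to the standard Google environment variable.
    return environ.get("GOOGLE_APPLICATION_CREDENTIALS")
```

Passing the environment in as an argument keeps the lookup easy to test without mutating `os.environ`.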

    opened by craigmulligan 8
  • Runtime error when trying to read project_id from default google creds

    I'm getting the following error when trying to run rele.config.setup using default credentials:

    rele.config.setup({
        "GC_CREDENTIALS_PATH": None,
        "MIDDLEWARE": [
            "rele.contrib.LoggingMiddleware",
            "rele.contrib.FlaskMiddleware",
        ],
        "APP_NAME": "smart_comms_planner",
    }, flask_app=app)
    

    output:

      File "/Users/matthewbridges/repos/smart-comms-planner/src/__init__.py", line 66, in <module>
        rele.config.setup(settings["rele"], flask_app=app)
      File "/Users/matthewbridges/.local/share/virtualenvs/smart-comms-planner-yrxYHqso/lib/python3.8/site-packages/rele/config.py", line 69, in setup
        init_global_publisher(config)
      File "/Users/matthewbridges/.local/share/virtualenvs/smart-comms-planner-yrxYHqso/lib/python3.8/site-packages/rele/publishing.py", line 10, in init_global_publisher
        gc_project_id=config.gc_project_id,
      File "/Users/matthewbridges/.local/share/virtualenvs/smart-comms-planner-yrxYHqso/lib/python3.8/site-packages/rele/config.py", line 59, in gc_project_id
        return self.credentials.project_id
    AttributeError: 'Credentials' object has no attribute 'project_id'
    

    The code incorrectly attempts to read project_id from the credentials object, when in fact it is returned as part of a tuple from get_google_defaults.

    I've added a proposed fix here: https://github.com/mercadona/rele/pull/195
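To illustrate the bug, here is a self-contained sketch that uses stand-ins rather than the real Google client. `FakeCredentials`, `fake_get_google_defaults` and this `Config` class are hypothetical and only mirror the shape described above: a loader that returns a `(credentials, project_id)` tuple.

```python
class FakeCredentials:
    """Stand-in for a credentials object that has no project_id attribute."""


def fake_get_google_defaults():
    # Assumption: mirrors the (credentials, project_id) tuple described above.
    return FakeCredentials(), "dummy-project-id"


class Config:
    """Hypothetical config object; not Relé's actual class."""

    def __init__(self, defaults_loader=fake_get_google_defaults):
        # Unpack both values instead of reading project_id off the credentials.
        self.credentials, self._project_id = defaults_loader()

    @property
    def gc_project_id(self):
        return self._project_id
```

Reading `config.credentials.project_id` on this object would raise the same `AttributeError` as in the traceback, while `config.gc_project_id` returns the unpacked value.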

    opened by Itsindigo 7
  • Style guide

    We currently have flake8 in our linting command. We do not have isort in the project style guidelines.

    Shall we add it?

    Or try out something new like black?

    good first issue question 
    opened by andrewgy8 5
  • Configure Publisher Client timeout

    I would like to be able to configure the timeout when publishing a message in a blocking fashion. Right now, the timeout is hard-coded to 3.0 seconds.

    I propose adding a configuration so I can declare any number of seconds for the publisher.

    Ex.

    RELE = {
        ...
        'PUBLISHER_TIMEOUT': 5.0
    }
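As a rough sketch of how such a setting could be applied, the helper below waits on a publish future with a configurable timeout. `wait_for_publish` and `DEFAULT_PUBLISHER_TIMEOUT` are hypothetical names, and a plain `concurrent.futures.Future` stands in for whatever future the publisher client returns.

```python
from concurrent.futures import Future

# Assumption: 3.0 seconds is the current hard-coded default mentioned above.
DEFAULT_PUBLISHER_TIMEOUT = 3.0


def wait_for_publish(future, settings=None):
    """Block until the publish future resolves or the configured timeout expires."""
    settings = settings or {}
    timeout = settings.get("PUBLISHER_TIMEOUT", DEFAULT_PUBLISHER_TIMEOUT)
    # Future.result raises a TimeoutError if the result is not ready in time.
    return future.result(timeout=timeout)
```

Callers that omit `PUBLISHER_TIMEOUT` keep the existing 3.0 second behaviour.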
    
    enhancement 
    opened by andrewgy8 4
  • Add an API to allow passing objects to middleware

    I'm adding rele to a Flask app, and the subscription callbacks need the Flask app_context. I've done this by way of middleware:

    eg:

    class FlaskMiddleware(BaseMiddleware):
        def pre_process_message(self, subscription, message):
            from server import app
            self.ctx = app.app_context()
            self.ctx.push()
    
        def post_process_message(self):
            self.ctx.pop()
    

    But to make this reusable across our services and other Flask apps, I'd need a way to add arbitrary data to the config that is passed to the middleware's setup method, or an easy way to call custom middleware functions.

    eg:

    class FlaskMiddleware(BaseMiddleware):
        def setup(self, config):
            self.app = config["FLASK_APP"]
    
        def pre_process_message(self, subscription, message):
            self.ctx = self.app.app_context()
            self.ctx.push()
    
        def post_process_message(self):
            self.ctx.pop()
    
    enhancement 
    opened by craigmulligan 4
  • Simplify initializing and running the Worker

    One consistent piece of feedback that I have received is to simplify the Worker class. Right now, you must initialize it with the subs and each individual config attribute, run setup, run start, and then sleep.

    Like this:

    worker = Worker(
        [photo_uploaded],
        config.gc_project_id,
        config.credentials,
        config.ack_deadline,
    )
    worker.setup()
    worker.start()
    sleep(120)
    

    I propose we simplify the API to run a worker to look something like this instead:

    worker = Worker([photo_uploaded], config)
    worker.run(sleep=120)
    

    run would call both setup and start, and we could add the standard sleep method. In addition we consolidate the configuration into one attribute.

    This would be backwards compatible since we will be creating a new method. And the change to Worker initialization could also fall back to the declared attributes if defined. Otherwise, use the config object.
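A minimal sketch of the proposed `run` method, assuming a stripped-down stand-in for the Worker. The `calls` list exists only so the order of operations can be observed; it is not part of the proposal.

```python
import time


class Worker:
    """Hypothetical stand-in for the Worker, showing only the proposed run()."""

    def __init__(self, subscriptions, config=None):
        self.subscriptions = list(subscriptions)
        self.config = config
        self.calls = []  # illustration only: records which steps ran

    def setup(self):
        self.calls.append("setup")

    def start(self):
        self.calls.append("start")

    def run(self, sleep=0):
        # run() bundles the three steps callers currently perform by hand.
        self.setup()
        self.start()
        time.sleep(sleep)
```

Because `setup` and `start` remain public, existing callers would keep working while new code can call `run` directly.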

    enhancement 
    opened by andrewgy8 4
  • Improve sub decorator

    The sub decorator could use some ease-of-use improvements:

    1. Use functools.wraps to preserve __name__ and __doc__ of the original function
    2. Inspect callback function signature and raise an exception if the signature is not compatible (ie. missing data)
    3. Log a warning message if the function cannot be discovered (ie. no sub in the path)

    Slightly related to the second point: @sub("topic", filter_by=42) will raise a TypeError at runtime (because filters are used as callables but they are not checked)
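The first two points, plus the filter check, could be sketched roughly as follows. This is a simplified, hypothetical `sub` written for illustration, not Relé's decorator; in particular, the `topic` attribute on the wrapper and the choice of `ValueError` are assumptions.

```python
import functools
import inspect


def sub(topic, filter_by=None):
    """Simplified, hypothetical decorator illustrating the proposed checks."""
    # Fail at decoration time instead of raising a TypeError at runtime.
    if filter_by is not None and not callable(filter_by):
        raise ValueError("filter_by must be callable")

    def decorator(func):
        # Reject callbacks whose signature cannot receive the message data.
        if "data" not in inspect.signature(func).parameters:
            raise ValueError(f"{func.__name__} must accept a 'data' argument")

        @functools.wraps(func)  # preserves __name__ and __doc__
        def wrapper(data, **kwargs):
            return func(data, **kwargs)

        wrapper.topic = topic
        return wrapper

    return decorator
```

With these checks, `@sub("topic", filter_by=42)` fails when the module is imported rather than when the first message arrives.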

    enhancement hacktoberfest 
    opened by tbarbugli 3
  • Breaking with Google PubSub >= 2.0

    In the 1.0 release, I had to pin pubsub to <2.0. Otherwise, our tests would break. It seems there were major changes, and we need to update some usages in our code.

    To reproduce:

    1. Update the Google Pub/Sub library to >=2.0.
    2. Run tests
    good first issue hacktoberfest 
    opened by andrewgy8 3
  • Execution doesn't stop when using debugger

    Hey 👋 first of all thanks for this great library!

    This might not be a strictly Relé related issue, but when I tried to pause execution to debug some issue (ie dropping a __import__("pdb").set_trace()) inside the subscription message handler function it did not stop, but just kept on processing messages.

    This might be a trivial problem to overcome, but my search engine fu failed me. I'm guessing the problem is that the subscriber handles the message in a thread, so it would somehow need to let the parent know to stop, but not really sure how to go about it.

    question 
    opened by daaain 3
  • Add unrecoverable_middleware

    This is useful if you know you will never be able to handle the message and don't have a dead-letter queue set up.

    :tophat: What?

    A custom exception and middleware to automate and ignore bad messages.

    :thinking: Why?

    I've been using this a fair amount in some of our services and thought it may be useful to others.
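A rough sketch of the idea, under stated assumptions: the hook name and argument order below are an assumption about the middleware interface, and `FakeMessage` is a stand-in for a Pub/Sub message with an `ack` method. When a handler raises the custom exception, the middleware acknowledges the message so it is not redelivered.

```python
class UnrecoverableException(Exception):
    """Raised by a handler when a message can never be processed."""


class FakeMessage:
    """Stand-in for a Pub/Sub message; only tracks whether ack() was called."""

    def __init__(self):
        self.acked = False

    def ack(self):
        self.acked = True


class UnrecoverableMiddleware:
    # Assumption: the failure hook receives the exception and the message.
    def post_process_message_failure(self, subscription, exception, start_time, message):
        if isinstance(exception, UnrecoverableException):
            # Acknowledge so the broker stops redelivering a hopeless message.
            message.ack()
```

Any other exception leaves the message unacknowledged, so normal redelivery still applies.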

    opened by craigmulligan 3
  • Add python 3.8 to travis

    :tophat: What?

    Add python 3.8 to travis.

    :thinking: Why?

    It was officially released on Oct 14, 2019

    This would allow us to officially support Python 3.8. However, it seems that grpcio is not yet compiled for Python 3.8, so the Travis build takes much longer to complete: it ran for 5 min 40 sec.

    So either we can wait and not support 3.8 yet, or merge this and have the travis build take longer than expected.

    I'll leave this as a draft until we decide.

    opened by andrewgy8 3
  • [travis] PSQL is not required for tests execution

    :tophat: What?

    Remove PostgreSQL service from Travis pipeline

    :thinking: Why?

    • It's not used
    • It could give a false sense of security by suggesting there are tests checking database features (Django DB)
    hacktoberfest 
    opened by Maks3w 0
  • [add] Python 3.10 support

    :tophat: What?

    • Run the test suite against the minimum supported Python version (3.6) and the maximum supported version (3.10)
    • Also update the dist image to bionic because it's the only one with support for all the Python versions.
    • Add the PIP classifiers for Python 3.10

    :thinking: Why?

    Be sure the library is compatible with the latest/current version of Python (3.10)

    hacktoberfest 
    opened by Maks3w 0
  • Add native async support for subscriptions

    Currently, async functions can't be used with the @sub decorator.

    @sub(topic='my-topic')
    async def async_handler(data, **kwargs):
        print('data', data)
    

    If you try, you'll get the following error.

    Configuring worker with 1 subscription(s)...
      dicom_uploads - handle_upload
    /usr/local/lib/python3.9/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py:126: RuntimeWarning: coroutine 'handle_upload' was never awaited
      callback(message)
    RuntimeWarning: Enable tracemalloc to get the object allocation traceback
    

    There are some simple workarounds, but it would be nice if this was supported natively.

    import asyncio

    from rele import sub

    @sub(topic='my-topic')
    def sync_handler(data, **kwargs):
        # Run the coroutine to completion on a fresh event loop.
        return asyncio.run(async_handler(data, **kwargs))
    
    async def async_handler(data, **kwargs):
        print('data', data)
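One way native support might look, sketched with a hypothetical `run_handler` helper (not part of Relé): detect coroutine functions and drive them with `asyncio.run`, while calling ordinary functions directly. This assumes each callback runs in a thread that has no event loop already running.

```python
import asyncio
import inspect


def run_handler(func, data, **kwargs):
    """Call a sync or async handler and return its result."""
    if inspect.iscoroutinefunction(func):
        # asyncio.run creates a new event loop, runs the coroutine, and closes it.
        return asyncio.run(func(data, **kwargs))
    return func(data, **kwargs)
```

Creating a loop per message is simple but not free; a long-lived loop per worker thread would be an alternative design.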
    
    opened by csaroff 6
  • Add support for arbitrary deserializers

    :tophat: What?

    Allow custom deserializers other than the default json deserializer.

    :thinking: Why?

    Some of GCP's pub/sub notifications have messages that are simple strings which can't be deserialized as json.

    For example, notifications that a record was inserted into the DICOM store come through with a string indicating the path to that record and nothing more.

    :link: https://github.com/mercadona/rele/issues/229

    opened by csaroff 8
  • Add subscription option to disable json parsing

    Currently rele's subscription model only supports consuming messages that are json deserializable.

    Unfortunately some of GCP's core services publish non-json messages. For example, the gcp dicom store publishes notifications to a topic when a new dicom instance was inserted, but the message field is just the dicom instance path. When attempting to consume these messages through rele, we get the following error.

    Exception raised while processing message for dicom_uploads - handle_upload: JSONDecodeError
    Traceback (most recent call last):
      File "/usr/local/lib/python3.9/site-packages/rele/subscription.py", line 112, in __call__
        data = json.loads(message.data.decode("utf-8"))
      File "/usr/local/lib/python3.9/json/__init__.py", line 346, in loads
        return _default_decoder.decode(s)
      File "/usr/local/lib/python3.9/json/decoder.py", line 337, in decode
        obj, end = self.raw_decode(s, idx=_w(s, 0).end())
      File "/usr/local/lib/python3.9/json/decoder.py", line 355, in raw_decode
        raise JSONDecodeError("Expecting value", s, err.value) from None
    json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
    

    Can we add an option to disable deserialization of these messages? Something like:

    from rele import sub
    
    @sub(topic='my-topic', deserialize_message=False)
    def handle_upload(data, **kwargs):
        print('Handling data', data)
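A sketch of how a pluggable deserializer could cover both this request and plain-string payloads. The `Subscription` class and the `deserializer` parameter below are hypothetical and simplified; passing `None` skips deserialization and hands the raw bytes to the handler.

```python
import json


def json_deserializer(raw):
    """Default behaviour: decode UTF-8 bytes and parse them as JSON."""
    return json.loads(raw.decode("utf-8"))


def text_deserializer(raw):
    """Alternative for payloads that are plain strings, such as a resource path."""
    return raw.decode("utf-8")


class Subscription:
    """Hypothetical, simplified subscription with a configurable deserializer."""

    def __init__(self, func, topic, deserializer=json_deserializer):
        self.func = func
        self.topic = topic
        self.deserializer = deserializer

    def __call__(self, raw_data, **attributes):
        # A deserializer of None passes the raw bytes through untouched.
        data = self.deserializer(raw_data) if self.deserializer else raw_data
        return self.func(data, **attributes)
```

Keeping JSON as the default preserves current behaviour for existing subscriptions.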
    
    opened by csaroff 2
Releases(1.6.0)
  • 1.6.0(Aug 4, 2022)

  • v1.4.1(Apr 20, 2022)

    What's Changed

    • [Deleted] Delete post-publish-message-failure hook on VerboseLoggingMiddleware. (#220)

    Full Changelog: https://github.com/mercadona/rele/compare/v1.4.0...v1.4.1

  • v1.4.0(Apr 13, 2022)

    What's Changed

    • [Added] Added a VerboseLoggingMiddleware that does not truncate message payload. (#218)

    Full Changelog: https://github.com/mercadona/rele/compare/v1.3.0...v1.4.0

  • v1.3.0(Apr 5, 2022)

    What's Changed

    • GC Project Id & Windows support by @chirgjin-les in https://github.com/mercadona/rele/pull/215
    • Release v1.3.0 by @jonasae in https://github.com/mercadona/rele/pull/216

    New Contributors

    • @chirgjin-les made their first contribution in https://github.com/mercadona/rele/pull/215

    Full Changelog: https://github.com/mercadona/rele/compare/v1.2.0...v1.3.0

  • v1.2.0(Dec 15, 2021)

    • [CHANGED] TimeoutError from publisher (#212)
    • Added filter_subs_by setting in documentation (#208)
    • Automatic topic creation (#206)
    • Log post publish success (#204)
  • 1.1.1(Jun 28, 2021)

  • 1.1.0(Mar 10, 2021)

    • Google Pubsub 2.0 Compat (#192)
    • Add validations to the sub decorator (#189)
    • Add new post_publish_hook and deprecate the old one (#190)
    • Discover and load settings when publishing (#188)
    • Fix #180: Raise error when the config loads a repeated subscription (#187)
  • 1.0.0(Sep 25, 2020)

  • 0.14.0(Aug 5, 2020)

    • BREAKING CHANGE: Remove GC_CREDENTIALS (#174)
    • DEPRECATE: GC_PROJECT_ID setting (#178)
    • Add changelog to the docs site (#179)
    • Catch TimeoutError and run post_publish_failure when blocking (#172)
  • 0.13.0(Jul 10, 2020)

  • 0.13.dev0(Jun 16, 2020)

  • 0.12.0(Jun 12, 2020)

  • 0.11.0(Jun 4, 2020)

  • 0.10.0(Feb 4, 2020)

    • Adjust default THREADS_PER_SUBSCRIPTION (#152)
    • Add unrecoverable_middleware (#150)
    • Allow multiple filters (#148)
    • Configure timeout from .publish() (#143)
    • Dont crash when subscription topic does not exist (#142)
  • 0.9.1(Jan 2, 2020)

  • 0.9.0(Dec 20, 2019)

    • Flask support via middleware (#127)
    • Add message attributes to metrics log (#128)
    • Specify number of threads per subscriber with Subscription ThreadPoolExecutor (#139)
    • Publishing timeout while blocking (#137)
    • Clean up rele.config.setup + Worker() init (#132)
  • 0.8.1(Nov 25, 2019)

  • 0.8.0(Nov 22, 2019)

    • Worker run method (#118)
    • Add kwargs to setup method passed through to middleware (#123)
    • Add missing worker middleware hooks (#121)
    • Add 3.8 support
    • More Documentation
  • 0.7.0(Oct 21, 2019)

  • 0.6.0(Sep 23, 2019)

    • BREAKING: Remove drf as a dependency (#91)
    • Add message as a parameter for middleware hooks (#99)
    • Check setting.CONN_MAX_AGE and warn when not 0 (#97)
    • More documentation
  • 0.5.0(Aug 9, 2019)

  • 0.4.1(Jun 19, 2019)

  • 0.4.0(Jun 17, 2019)

    • Set DEFAULT_ACK_DEADLINE (#49)
    • Filter by message attributes (#66)
    • BREAKING: All Relé settings are defined in a dict (#60)

    Old structure:

    from google.oauth2 import service_account
    RELE_GC_CREDENTIALS = service_account.Credentials.from_service_account_file(
        'rele/settings/dummy-credentials.json'
    )
    RELE_GC_PROJECT_ID = 'dummy-project-id'
    

    New structure:

    from google.oauth2 import service_account
    RELE = {
        'GC_CREDENTIALS': service_account.Credentials.from_service_account_file(
            'rele/settings/dummy-credentials.json'
        ),
        'GC_PROJECT_ID': 'dummy-project-id',
        'MIDDLEWARE': [
            'rele.contrib.LoggingMiddleware',
            'rele.contrib.DjangoDBMiddleware',
        ],
        'SUB_PREFIX': 'delivery',
        'APP_NAME': 'delivery',
    }
    
    • rele.contrib.middleware (#55)
    • Prefix argument in sub decorator (#47)
    • Add timestamp to the published message (#42)
    • BREAKING: Explicit publisher and subscriber configuration (#43)
    • Sphinx documentation (#27, #34, #40, #41)
    • Contributing guidelines (#32)
  • v0.3.1(Jun 5, 2019)

  • v0.3.0(May 14, 2019)

    • Ability to run in emulator mode (#12)
    • Add Travis-CI builds (#10)
    • More friendly global publish (#11)
    • Non-blocking behaviour when publishing by default (#6)