A fast and durable Pub/Sub channel over Websockets. FastAPI + WebSockets + PubSub == โšก ๐Ÿ’ช โค๏ธ

Overview

pubsub

โšก ๐Ÿ—ž๏ธ FastAPI Websocket Pub/Sub

Tests Package Downloads

A fast and durable Pub/Sub channel over Websockets. The easiest way to create a live publish / subscribe multi-cast over the web.

Supports and tested on Python >= 3.7

Installation ๐Ÿ› ๏ธ

pip install fastapi_websocket_pubsub

Intro

The classic pub/sub pattern made easily accessible and scalable over the web and across your cloud in realtime; while enjoying the benefits of FastAPI (e.g. dependency injection).

FastAPI + WebSockets + PubSub == โšก ๐Ÿ’ช โค๏ธ

  • Subscribe

    • Clients subscribe to topics (arbitrary strings) and receive relevant events along with structured data (serialized with Pydantic).
      # Callback to be called upon event being published on server
      async def on_event(data):
          print("We got an event! with data- ", data)
      # Subscribe for the event 
      client.subscribe("my event", on_event)
  • Publish

    • Directly from server code to connected clients.
      app = FastAPI() 
      endpoint = PubSubEndpoint()
      endpoint.register_route(app, "/pubsub")
      endpoint.publish(["my_event_topic"], data=["my", "data", 1])
    • From client to client (through the servers)
      async with PubSubClient(server_uri="ws://localhost/pubsub") as client:
          endpoint.publish(["my_event_topic"], data=["my", "data", 1])
    • Across server instances (using broadcaster and a backend medium (e.g. Redis, Kafka, ...))
      • No matter which server a client connects to - it will get the messages it subscribes to
      app = FastAPI() 
      endpoint = PubSubEndpoint(broadcaster="postgres://localhost:5432/")
      
      @app.websocket("/pubsub")
      async def websocket_rpc_endpoint(websocket: WebSocket):
          async with endpoint.broadcaster:
              await endpoint.main_loop(websocket)
      see examples/pubsub_broadcaster_server_example.py for full usage example

Usage example (server publishing following HTTP trigger):

In the code below, a client connects to the server and subscribes to a topic named "triggered". Aside from PubSub websocket, the server also exposes a regular http route, which triggers publication of the event.

Server:

import asyncio
import uvicorn
from fastapi import FastAPI
from fastapi.routing import APIRouter

from fastapi_websocket_pubsub import PubSubEndpoint
app =  FastAPI()
# Init endpoint
endpoint = PubSubEndpoint()
# register the endpoint on the app
endpoint.register_route(app, "/pubsub")
# Register a regular HTTP route
@app.get("/trigger")
async def trigger_events():
    # Upon request trigger an event
    endpoint.publish(["triggered"])

Client:

from fastapi_websocket_pubsub import PubSubClient
# Callback to be called upon event being published on server
async def on_trigger(data):
    print("Trigger URL was accessed")

async with PubSubClient(server_uri="ws://localhost/pubsub") as client:
    # Subscribe for the event 
    client.subscribe("triggered", on_event)

More Examples

What can I do with this?

The combination of Websockets, and bi-directional Pub/Sub is ideal to create realtime data propagation solution at scale over the web.

  • Update mechanism
  • Remote control mechanism
  • Data processing
  • Distributed computing
  • Realtime communications over the web

Foundations:

  • Based on fastapi-websocket-rpc for a robust realtime bidirectional channel

  • Based on broadcaster for syncing server instances

  • Server Endpoint:

    • Based on FastAPI: enjoy all the benefits of a full ASGI platform, including Async-io and dependency injections (for example to authenticate connections)

    • Based on Pydantic: easily serialize structured data as part of RPC requests and responses. Simply Pass Pydantic data models as PubSub published data to have it available as part of an event.

  • Client :

    • Based on Tenacity: allowing configurable retries to keep to connection alive

      • see WebSocketRpcClient.init's retry_config
    • Based on python websockets - a more comprehensive client than the one offered by FastAPI

Logging

fastapi-websocket-pubsub uses fastapi-websocket-rpc for logging config. It provides a helper logging module to control how it produces logs for you. See fastapi_websocket_rpc/logger.py. Use logging_config.set_mode or the 'WS_RPC_LOGGING' environment variable to choose the logging method you prefer. Or override completely via default logging config (e.g. 'logging.config.dictConfig'), all logger name start with: 'fastapi.ws_rpc.pubsub'

example:

# set RPC to log like UVICORN
from fastapi_websocket_rpc.logger import logging_config, LoggingModes
logging_config.set_mode(LoggingModes.UVICORN)

Pull requests - welcome!

  • Please include tests for new features
You might also like...
Prometheus exporter for Starlette and FastAPI

starlette_exporter Prometheus exporter for Starlette and FastAPI. The middleware collects basic metrics: Counter: starlette_requests_total Histogram:

๐Ÿš€   Cookiecutter Template for FastAPI + React Projects.  Using PostgreSQL, SQLAlchemy, and Docker
๐Ÿš€ Cookiecutter Template for FastAPI + React Projects. Using PostgreSQL, SQLAlchemy, and Docker

FastAPI + React ยท A cookiecutter template for bootstrapping a FastAPI and React project using a modern stack. Features FastAPI (Python 3.8) JWT authen

A rate limiter for Starlette and FastAPI

SlowApi A rate limiting library for Starlette and FastAPI adapted from flask-limiter. Note: this is alpha quality code still, the API may change, and

Opentracing support for Starlette and FastApi
Opentracing support for Starlette and FastApi

Starlette-OpenTracing OpenTracing support for Starlette and FastApi. Inspired by: Flask-OpenTracing OpenTracing implementations exist for major distri

FastAPI application and service structure for a more maintainable codebase

Abstracting FastAPI Services See this article for more information: https://camillovisini.com/article/abstracting-fastapi-services/ Poetry poetry inst

A rate limiter for Starlette and FastAPI

SlowApi A rate limiting library for Starlette and FastAPI adapted from flask-limiter. Note: this is alpha quality code still, the API may change, and

Prometheus exporter for Starlette and FastAPI

starlette_exporter Prometheus exporter for Starlette and FastAPI. The middleware collects basic metrics: Counter: starlette_requests_total Histogram:

Opentracing support for Starlette and FastApi
Opentracing support for Starlette and FastApi

Starlette-OpenTracing OpenTracing support for Starlette and FastApi. Inspired by: Flask-OpenTracing OpenTracing implementations exist for major distri

Full stack, modern web application generator. Using FastAPI, PostgreSQL as database, Docker, automatic HTTPS and more.
Full stack, modern web application generator. Using FastAPI, PostgreSQL as database, Docker, automatic HTTPS and more.

Full Stack FastAPI and PostgreSQL - Base Project Generator Generate a backend and frontend stack using Python, including interactive API documentation

Releases(v0.1.20)
Redis-based rate-limiting for FastAPI

Redis-based rate-limiting for FastAPI

Glib 6 Nov 14, 2022
Lazy package to start your project using FastAPIโœจ

Fastapi-lazy ๐Ÿฆฅ Utilities that you use in various projects made in FastAPI. Source Code: https://github.com/yezz123/fastapi-lazy Install the project:

Yasser Tahiri 95 Dec 29, 2022
Code for my FastAPI tutorial

FastAPI tutorial Code for my video tutorial FastAPI tutorial What is FastAPI? FastAPI is a high-performant REST API framework for Python. It's built o

Josรฉ Haro Peralta 9 Nov 15, 2022
Cache-house - Caching tool for python, working with Redis single instance and Redis cluster mode

Caching tool for python, working with Redis single instance and Redis cluster mo

Tural 14 Jan 06, 2022
A server hosts a FastAPI application and multiple clients can be connected to it via SocketIO.

FastAPI_and_SocketIO A server hosts a FastAPI application and multiple clients can be connected to it via SocketIO. Executing server.py sets up the se

Ankit Rana 2 Mar 04, 2022
Adds GraphQL support to your Flask application.

Flask-GraphQL Adds GraphQL support to your Flask application. Usage Just use the GraphQLView view from flask_graphql from flask import Flask from flas

GraphQL Python 1.3k Dec 31, 2022
Prometheus exporter for several chia node statistics

prometheus-chia-exporter Prometheus exporter for several chia node statistics It's assumed that the full node, the harvester and the wallet run on the

30 Sep 19, 2022
A request rate limiter for fastapi

fastapi-limiter Introduction FastAPI-Limiter is a rate limiting tool for fastapi routes. Requirements redis Install Just install from pypi pip insta

long2ice 200 Jan 08, 2023
Cube-CRUD is a simple example of a REST API CRUD in a context of rubik's cube review service.

Cube-CRUD is a simple example of a REST API CRUD in a context of rubik's cube review service. It uses Sqlalchemy ORM to manage the connection and database operations.

Sebastian Andrade 1 Dec 11, 2021
FastAPI Auth Starter Project

This is a template for FastAPI that comes with authentication preconfigured.

Oluwaseyifunmi Oyefeso 6 Nov 13, 2022
Practice-python is a simple Fast api project for dealing with modern rest api technologies.

Practice Python Practice-python is a simple Fast api project for dealing with modern rest api technologies. Deployment with docker Go to the project r

0 Sep 19, 2022
Opinionated set of utilities on top of FastAPI

FastAPI Contrib Opinionated set of utilities on top of FastAPI Free software: MIT license Documentation: https://fastapi-contrib.readthedocs.io. Featu

identix.one 543 Jan 05, 2023
Github timeline htmx based web app rewritten from Common Lisp to Python FastAPI

python-fastapi-github-timeline Rewrite of Common Lisp htmx app _cl-github-timeline into Python using FastAPI. This project tries to prove, that with h

Jan Vlฤinskรฝ 4 Mar 25, 2022
Fastapi performans monitoring

Fastapi-performans-monitoring This project is a simple performance monitoring for FastAPI. License This project is licensed under the terms of the MIT

bilal alpaslan 11 Dec 31, 2022
์Šคํƒ€ํŠธ์—… ๊ฐœ๋ฐœ์ž ์ฑ„์šฉ

์Šคํƒ€ํŠธ์—… ๊ฐœ๋ฐœ์ž ์ฑ„์šฉ ๅคง ๋ฐ•๋žŒํšŒ Seed ~ Series B์— ์žˆ๋Š” ์Šคํƒ€ํŠธ์—…์„ ์œ„ํ•œ ์ฑ„์šฉ์ •๋ณด ํŽ˜์ด์ง€์ž…๋‹ˆ๋‹ค. Back-end, Frontend, Mobile ๋“ฑ ๊ฐœ๋ฐœ์ž๋ฅผ ๋Œ€์ƒ์œผ๋กœ ์ง„ํ–‰ํ•˜๊ณ  ์žˆ์Šต๋‹ˆ๋‹ค. ํ•ด๋‹น ์Šคํƒ€ํŠธ์—…์— ์ข…์‚ฌํ•˜์‹œ๋Š” ๋ถ„๋ฟ๋งŒ ์•„๋‹ˆ๋ผ ์ฑ„์šฉ ๊ด€๋ จ ์ •๋ณด๋ฅผ ์•Œ๊ณ  ๊ณ„์‹œ๋‹ค๋ฉด

JuHyun Lee 58 Dec 14, 2022
flask extension for integration with the awesome pydantic package

flask extension for integration with the awesome pydantic package

249 Jan 06, 2023
FastAPI CRUD template using Deta Base

Deta Base FastAPI CRUD FastAPI CRUD template using Deta Base Setup Install the requirements for the CRUD: pip3 install -r requirements.txt Add your D

Sebastian Ponce 2 Dec 15, 2021
implementation of deta base for FastAPIUsers

FastAPI Users - Database adapter for Deta Base Ready-to-use and customizable users management for FastAPI Documentation: https://fastapi-users.github.

2 Aug 15, 2022
๐Ÿ”€โณ Easy throttling with asyncio support

Throttler Zero-dependency Python package for easy throttling with asyncio support. ๐Ÿ“ Table of Contents ๐ŸŽ’ Install ๐Ÿ›  Usage Examples Throttler and Thr

Ramzan Bekbulatov 80 Dec 07, 2022
FastAPI Server Session is a dependency-based extension for FastAPI that adds support for server-sided session management

FastAPI Server-sided Session FastAPI Server Session is a dependency-based extension for FastAPI that adds support for server-sided session management.

DevGuyAhnaf 5 Dec 23, 2022