Thread-safe Python RabbitMQ Client & Management library

Overview

AMQPStorm

Thread-safe Python RabbitMQ Client & Management library.

Version

Introduction

AMQPStorm is a library designed to be consistent, stable and thread-safe.

  • 100% Test Coverage!
  • Supports Python 2.7 and Python 3.3+.
  • Fully tested against Python Implementations; CPython and PyPy.

Documentation

Additional documentation is available on amqpstorm.io.

Changelog

Version 2.10.4

  • Fixed issue with a forcefully closed channel not sending the appropriate response [#114] - Thanks Bernd Höhl.

Version 2.10.3

  • Fixed install bug with cp1250 encoding on Windows [#112] - Thanks ZygusPatryk.

Version 2.10.2

  • Fixed bad socket fd causing high cpu usage [#110] - Thanks aiden0z.

Version 2.10.1

  • Fixed bug with UriConnection not handling amqps:// properly.
  • Improved documentation.

Version 2.10.0

  • Added Pagination support to Management list calls (e.g. queues list).
  • Added Filtering support to Management list calls.
  • Re-use the requests sessions for Management calls.
  • Updated to use pytest framework instead of nose for testing.

Version 2.9.0

  • Added support for custom Message implementations - Thanks Jay Hogg.
  • Fixed a bug with confirm_delivery not working after closing and re-opening an existing channel.
  • Re-worked the channel re-use code.

Version 2.8.5

  • Fixed a potential deadlock when opening a channel with a broken connection [#97] - Thanks mehdigmira.

Version 2.8.4

  • Fixed a bug in Message.create where it would mutate the properties dict [#92] - Thanks Killerama.

Version 2.8.3

  • Fixed pip sdist circular dependency [#88] - Thanks Jay Hogg.
  • Fixed basic.consume argument type in documentation [#86] - Thanks TechmarkDavid.

Version 2.8.2

  • Retry on SSLWantReadErrors [#82] - Thanks Bernhard Thiel.
  • Added getter/setter methods for Message properties expiration, message_type and user_id [#86] - Thanks Jay Hogg.

Version 2.8.1

  • Cleaned up documentation.

Version 2.8.0

  • Introduced a new channel function called check_for_exceptions.
  • Fixed issue where publish was successful but raises an error because connection was closed [#80] - Thanks Pavol Plaskon.
  • Updated SSL handling to use the non-deprecated way of creating a SSL Connection [#79] - Thanks Carl Hörberg from CloudAMQP.
  • Enabled SNI for SSL connections by default [#79] - Thanks Carl Hörberg from CloudAMQP.

Version 2.7.2

  • Added ability to override client_properties [#77] - Thanks tkram01.

Version 2.7.1

  • Fixed Connection close taking longer than intended when using SSL [#75]- Thanks troglas.
  • Fixed an issue with closing Channels taking too long after the server initiated it.

Version 2.7.0

  • Added support for passing your own ssl context [#71] - Thanks troglas.
  • Improved logging verbosity on connection failures [#72] - Thanks troglas.
  • Fixed occasional error message when closing a SSL connection [#68] - Thanks troglas.

Version 2.6.2

  • Set default TCP Timeout to 10s on UriConnection to match Connection [#67] - Thanks josemonteiro.
  • Internal RPC Timeout for Opening and Closing Connections are now set to a fixed 30s [#67] - Thanks josemonteiro.

Version 2.6.1

  • Fixed minor issue with the last channel id not being available.

Version 2.6.0

  • Re-use closed channel ids [#55] - Thanks mikemrm.
  • Changed Poller Timeout to be a constant.
  • Improved Connection Close performance.
  • Channels is now a publicly available variable in Connections.

Version 2.5.0

  • Upgraded pamqp to v2.0.0.
  • Properly wait until the inbound queue is empty when break_on_empty is set [#63] - Thanks TomGudman.
  • Fixed issue with Management queue/exchange declare when the passive flag was set to True.

Credits

Special thanks to gmr (Gavin M. Roy) for creating pamqp, and in addition amqpstorm is heavily influenced by his pika and rabbitpy libraries.

Comments
  • Out of order ack-ing?

    Out of order ack-ing?

    Whoooo, more wierdness!

    .Thread-21 - INFO - Message channel: <amqpstorm.channel.Channel object at 0x7f5d08cdbcc8>
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message properties: {'headers': None, 'reply_to': '', 'user_id': '', 'cluster_id': '', 'app_id': '', 'delivery_mode': None, 'content_type': '', 'correlation_id': 'keepalive', 'expiration': '', 'message_id': '', 'message_type': '', 'priority': None, 'content_encoding': 'utf-8', 'timestamp': None}
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message _channel: <amqpstorm.channel.Channel object at 0x7f5d08cdbcc8>
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message _method: {'routing_key': b'nak', 'redelivered': False, 'delivery_tag': 5, 'exchange': b'keepalive_exchange140037604406920', 'consumer_tag': b'amq.ctag-DCdsyFR66UJsU7sgkNp9oQ'}
    ACK For delivery tag: 5
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Heartbeat packet received! wat
    Main.Connector.Internal(/rpcsys).Thread-23 - INFO - Timeout watcher loop. Current message counts: 0 (out: 0, in: 0)
    Main.Connector.Container(/rpcsys).Thread-23 - INFO - Interface timeout thread. Ages: heartbeat -> 4.83, last message -> 32.51.
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Received message!
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message channel: <amqpstorm.channel.Channel object at 0x7f5d08cdbcc8>
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message properties: {'headers': None, 'reply_to': '', 'user_id': '', 'cluster_id': '', 'app_id': '', 'delivery_mode': None, 'content_type': '', 'correlation_id': 'keepalive', 'expiration': '', 'message_id': '', 'message_type': '', 'priority': None, 'content_encoding': 'utf-8', 'timestamp': None}
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message _channel: <amqpstorm.channel.Channel object at 0x7f5d08cdbcc8>
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message _method: {'routing_key': b'nak', 'redelivered': False, 'delivery_tag': 6, 'exchange': b'keepalive_exchange140037604406920', 'consumer_tag': b'amq.ctag-DCdsyFR66UJsU7sgkNp9oQ'}
    ACK For delivery tag: 6
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Heartbeat packet received! wat
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Received message!
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message channel: <amqpstorm.channel.Channel object at 0x7f5d08cdbcc8>
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message properties: {'headers': None, 'reply_to': '', 'user_id': '', 'cluster_id': '', 'app_id': '', 'delivery_mode': None, 'content_type': '', 'correlation_id': 'keepalive', 'expiration': '', 'message_id': '', 'message_type': '', 'priority': None, 'content_encoding': 'utf-8', 'timestamp': None}
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message _channel: <amqpstorm.channel.Channel object at 0x7f5d08cdbcc8>
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message _method: {'routing_key': b'nak', 'redelivered': True, 'delivery_tag': 7, 'exchange': b'keepalive_exchange140037604406920', 'consumer_tag': b'amq.ctag-DCdsyFR66UJsU7sgkNp9oQ'}
    ACK For delivery tag: 7
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Heartbeat packet received! wat
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR - Error while in rx runloop!
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -       Channel 1 was closed by remote server: PRECONDITION_FAILED - unknown delivery tag 6
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR - Traceback (most recent call last):
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/FetchAgent/AmqpConnector/__init__.py", line 546, in _rx_poll
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -     self.interface.process_rx_events()
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/FetchAgent/AmqpConnector/__init__.py", line 171, in process_rx_events
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -     self.storm_channel.process_data_events(to_tuple=False)
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/amqpstorm/channel.py", line 266, in process_data_events
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -     for message in self.build_inbound_messages(break_on_empty=True):
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/amqpstorm/channel.py", line 113, in build_inbound_messages
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -     self.check_for_errors()
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/amqpstorm/channel.py", line 188, in check_for_errors
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -     raise exception
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR - amqpstorm.exception.AMQPChannelError: Channel 1 was closed by remote server: PRECONDITION_FAILED - unknown delivery tag 6
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -
    Main.Connector.Internal(/rpcsys).Thread-21 - WARNING - Should die flag! Runstate: running, threads live: threads alive, had exception: yes.
    Main.Connector.Internal(/rpcsys).Thread-21 - INFO - RX Poll process dying. Threads_live: 1, had exception 1, should_die True
    Main.Connector.Internal(/rpcsys).Thread-22 - WARNING - Should die flag! Runstate: running, threads live: threads alive, had exception: yes.
    Main.Connector.Internal(/rpcsys).Thread-22 - WARNING - Should die flag! Runstate: running, threads live: threads alive, had exception: yes.
    Main.Connector.Internal(/rpcsys).Thread-22 - INFO - TX Poll process dying (should die: True). Threads_live: 1, runstate: 1, resp queue size: 0, had exception 1.
    Main.Connector.Internal(/rpcsys).Thread-23 - WARNING - Should die flag! Runstate: running, threads live: threads alive, had exception: yes.
    Disconnecting!
    Main.Connector.Container(/rpcsys).Thread-2 - INFO - Closing channel
    Main.Connector.Container(/rpcsys).Thread-2 - INFO - Closing connection
    Running state: True, thread alive: True, thread id:140037440014080
    Joining _inbound_thread. Runstate: %s False
    
    

    Context:

    I have a connection with a thread processing the receiving messages. I have instrumented Message.ack() with a print statement that prints the delivery tag that it's acking.

    It appears I'm calling [delivery tag 6].ack(), [delivery tag 7].ack(), and somehow the ack for delivery tag 7 is getting received by the rabbitmq server /first/, resulting in a PRECONDITION_FAILED error because acking 7 implicitly acks previous tags, and therefore 6 is not a valid delivery tag anymore.

    I'm working on pulling out a testable example, but it's certainly odd.


    Incidentally, the new docs pages are fancypants!

    wontfix 
    opened by fake-name 49
  • Add the ability to forcefully close a connection.

    Add the ability to forcefully close a connection.

    Basically, I'm dealing with a context where I have a high volume traffic AMQP connection, across a high-latency, unreliable link (intercontinental).

    If there is a way for a connection to possibly wedge, I'll encounter it.

    Anyways, The issue I ran into here is that it's possible for the "close" operation on a connection to wedge indefinitely, preventing a connection from actually closing (I /think/ if the shutdown RPC request gets garbled/eaten somewhere).

    The question of /how/ this happens aside, I therefore need to be able to kill a open connection in a dirty manner. This adds the ability to kill() a connection which will force the worker thread to exit immediately, without bothering to do any proper cleanup.

    Because this is a operation that's generally done with a additional watcher process, outside of the normal program flow, I used multiprocessing primitives to manage the _die flag (at one point, the watcher was a separate /process/, not thread).

    Anyways, I'm not sure if this is inline with the ideas of the library, but I haven't been able to wedge the amqp connection with this patch set, so that's something.


    Only tested on Py3.5x64, Ubuntu 14.04.

    Things I haven't done: Additional unit tests.

    opened by fake-name 17
  • SSL Retry

    SSL Retry

    I regularly get [SSL: BAD_WRITE_RETRY] bad write retry (_ssl.c:1647) errors when running AMQP-Storm using SSL.

    Looking at the code I think its because it's not handling SSL Retry exceptions from the socket

    http://stackoverflow.com/questions/2997218/why-am-i-getting-error1409f07fssl-routinesssl3-write-pending-bad-write-retr

    i'll get a stack trace to see if its on write or on do_handshake

    bug 
    opened by thejuan 17
  • reached the maximum number of channels raised with closed channels

    reached the maximum number of channels raised with closed channels

    Hello,

    I've run into an issue with creating new channels and receiving the reached the maximum number of channels 65535 when attempting to create a new channel.

    After some digging, I noticed Connection._get_next_available_channel_id() accounts for all channels, both open and closed. I believe filtering the count for just opened should resolve this issue.

    I tested with a quick fix

        def _get_next_available_channel_id(self):
            channel_id = len(self._channels) + 1
            active_channels = [
                ch for ch in list(self._channels.values()) if ch and ch.is_open
            ]
            if len(active_channels) >= self.max_allowed_channels:
                raise AMQPConnectionError(
                    'reached the maximum number of channels %d' %
                    self.max_allowed_channels)
            return channel_id
    

    However it may be better to just keep an active count

    bug 
    opened by mikemrm 12
  • Unhandled Frames in 3.6.2

    Unhandled Frames in 3.6.2

    Upgraded to RabbitMq 3.6.2 starting to see the following errors which I think lead to using all the memory on rabbitmq server.

    [Channel%d] Unhandled Frame: %s -- %s

    No code changes, just a server upgrade. Not sure if this storm or rabbitmq.

    opened by thejuan 12
  • Exception: 'Deliver' object has no attribute 'body_size'

    Exception: 'Deliver' object has no attribute 'body_size'

    I have been experiencing a strange error, just for some messages I got this error:

    'Deliver' object has no attribute 'body_size' channel.py (line: 253)

    I looked at self._inbound and I saw some Deliver objects in sequence instead of Deliver/ContentHeader/ContentBody

    amqp-storm

    The messages are being sent to the queue by another project in another language, so I have no clue why it is happening.

    bug 
    opened by viniciuschiele 12
  • Cant publish multiple messages

    Cant publish multiple messages

    This is a weird one, I've upgraded to 2.2 and trying to do the following script

    import os
    import logging
    logging.basicConfig(level=logging.DEBUG)
    
    from amqpstorm import Message
    from amqpstorm import UriConnection
    
    keys = ["1","2","3"]
    #signer = Signer()
    bus = UriConnection("***")
    with bus.channel(rpc_timeout=10) as channel:
        channel.confirm_deliveries()
        for key in keys:
            print key
            msg = "My Message"
            #properties = {"headers": {"md5-signature": signer.sign(msg)}}
            Message.create(channel, msg, properties).publish(key, exchange="amq.topic")
    
    

    I get this error on the second publish.

    DEBUG:amqpstorm.connection:Connection Opening DEBUG:amqpstorm.channel0:Frame Received: Connection.Start DEBUG:amqpstorm.channel0:Frame Sent: Connection.StartOk DEBUG:amqpstorm.channel0:Frame Received: Connection.Tune DEBUG:amqpstorm.channel0:Frame Sent: Connection.TuneOk DEBUG:amqpstorm.channel0:Frame Sent: Connection.Open DEBUG:amqpstorm.channel0:Frame Received: Connection.OpenOk DEBUG:amqpstorm.heartbeat:Heartbeat Checker Started DEBUG:amqpstorm.connection:Connection Opened DEBUG:amqpstorm.connection:Opening a new Channel DEBUG:amqpstorm.connection:Channel #1 Opened 1 2 DEBUG:amqpstorm.channel:Channel #1 Closing DEBUG:amqpstorm.channel:Channel #1 Closed Exception in thread amqpstorm.io: Traceback (most recent call last): File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner self.run() File "/usr/lib/python2.7/threading.py", line 754, in run self.__target(*self.__args, **self.__kwargs) File "/home/adam/.venv/local/lib/python2.7/site-packages/amqpstorm/io.py", line 219, in _process_incoming_data if self.poller.is_ready: File "/home/adam/.venv/local/lib/python2.7/site-packages/amqpstorm/io.py", line 48, in is_ready except select.error as why: AttributeError: 'NoneType' object has no attribute 'error'

    bug 
    opened by thejuan 11
  • Testing with pamqp 3.0.0a4

    Testing with pamqp 3.0.0a4

    I plan on releasing pamqp 3.0 soon and wanted to make sure you were aware of the changes that are making it into 3.0 and give you an opportunity to report any issues prior to its release.

    Please see the version history @ https://pamqp.readthedocs.io/en/latest/history.html

    opened by gmr 10
  • potential infinite loop ?

    potential infinite loop ?

    Hello,

    I am using your library in conjunction with celery and i regularly encounter a problem: When asking celery to restart its worker (emitting a SIGTERM), the process is blocked and celery doesn't want to restart. Celery maintains a parent thread that runs child threads where tasks are executed. Amqpstorm's thread is also run from this parent thread. The processes in htop look like this:

    screenshot from 2017-04-11 10-34-26

    Here process 13880 & 13803 are related to amqpstorm. 13803 is the inbound_thread while 13880 is the heartbeat timer.

    After some investigation, i found out that killing the thread responsible for the heartbeat timer allows celery to gracefuly restart... This lead me to think that the timer could possibly create a deadlock.

    I have created a pull request based on this assumption and will try this branch on my setup: Can you tell me what you think of it ? Do you see any other possible explanations for the error i see ?

    opened by cp2587 10
  • Connection was closed by remote server: CONNECTION_FORCED

    Connection was closed by remote server: CONNECTION_FORCED

    Hello,

    We recently updated the library version to 2.1.3 (from 1.1.7) and we now face several errors we did not have previously. One of them is the following:

        self.channel.basic.publish(body=data, exchange=self.exchange, routing_key='')
      File "/home/wirebot/.virtualenvs/cayzn_yield.etl/local/lib/python2.7/site-packages/amqpstorm/basic.py", line 194, in publish
        self._channel.write_frames(frames_out)
      File "/home/wirebot/.virtualenvs/cayzn_yield.etl/local/lib/python2.7/site-packages/amqpstorm/channel.py", line 326, in write_frames
        self.check_for_errors()
      File "/home/wirebot/.virtualenvs/cayzn_yield.etl/local/lib/python2.7/site-packages/amqpstorm/channel.py", line 169, in check_for_errors
        self._connection.check_for_errors()
      File "/home/wirebot/.virtualenvs/cayzn_yield.etl/local/lib/python2.7/site-packages/amqpstorm/connection.py", line 155, in check_for_errors
        raise self.exceptions[0]
    AMQPConnectionError: Connection was closed by remote server: CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'
    

    On the rabbitmq server, we have the following logs: client unexpectedly closed TCP connection

    Finally here is the code i use to create an amqp socket (this socket is then used by logging to send log in the queue) :

    class AMQPStormSocket(object):
    
        def __init__(self, host, port, username, password, virtual_host, exchange, queue, exchange_is_durable,
                     queue_is_durable, exchange_type, fallback_call):
    
            # create connection & channel
            self.connection = amqpstorm.Connection(host, username, password, port, virtual_host=virtual_host, timeout=1)
            self.channel = self.connection.channel()
    
            # create an exchange, if needed
            self.channel.exchange.declare(exchange=exchange, exchange_type=exchange_type, durable=exchange_is_durable)
            # create a queue, if needed
            self.channel.queue.declare(queue=queue, durable=queue_is_durable, passive=False, auto_delete=False)
            # bind it
            self.channel.queue.bind(queue=queue, exchange=exchange)
    
            # needed when publishing
            self.exchange = exchange
    
            self.fallback_call = fallback_call
    
        def sendall(self, data):
            try:
                self.channel.basic.publish(body=data, exchange=self.exchange, routing_key='')
            except Exception as e:
                self.fallback_call(e)
    
        def close(self):
            try:
                self.channel.close()
                self.connection.close()
            except Exception:
                pass
    

    Do you have an idea on how to fix these errors ?

    opened by cp2587 10
  • Queue declaration must not be obligatory, AMQPChannelError on trying to consume from predeclared queue

    Queue declaration must not be obligatory, AMQPChannelError on trying to consume from predeclared queue

    I use RabbitMQ, all the excahnges and consumers were preconfigured with differnet options. Unfortunately when I try to consume from a durable queue I get

    mqpstorm.exception.AMQPChannelError: Channel 1 was closed by remote server: NOT_FOUND - no previously declared queue

    If I try to declare a queue without specifying any params I get

    Channel 1 was closed by remote server: PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'test_queue' in vhost 'lizziebot': received 'false' but current is 'true'

    The only way is to specify all the params including including ttl, autodelete, etc. I cannot use this library because I need rabbitmq structure to be configured not on the client side ;-(

    bug 
    opened by blackalegator 9
  • Is there any support for AMQP 1-0-0 ?

    Is there any support for AMQP 1-0-0 ?

    Does the module supports AMQP-1-0-0.

    I do not see anything regarding supported version in the Doc.

    If not, is there any plan to support it in the future?

    opened by ruffp 1
  • Installation errors with Python 3.10

    Installation errors with Python 3.10

    I am using Python3 version 3.10.4 on Ubuntu 18.04 LTS and encountering the following error when attempting to install amqpstorm:

    $ pip install amqpstorm
    Keyring is skipped due to an exception: module 'collections' has no attribute 'MutableMapping'
    Defaulting to user installation because normal site-packages is not writeable
    Collecting amqpstorm
      Using cached AMQPStorm-2.10.4.tar.gz (71 kB)
      Preparing metadata (setup.py) ... error
      error: subprocess-exited-with-error
    
      × python setup.py egg_info did not run successfully.
      │ exit code: 1
      ╰─> [20 lines of output]
          Traceback (most recent call last):
            File "<string>", line 2, in <module>
            File "<pip-setuptools-caller>", line 14, in <module>
            File "/usr/lib/python3/dist-packages/setuptools/__init__.py", line 12, in <module>
              import setuptools.version
            File "/usr/lib/python3/dist-packages/setuptools/version.py", line 1, in <module>
              import pkg_resources
            File "/usr/lib/python3/dist-packages/pkg_resources/__init__.py", line 77, in <module>
              __import__('pkg_resources.extern.packaging.requirements')
            File "/usr/lib/python3/dist-packages/pkg_resources/_vendor/packaging/requirements.py", line 9, in <module>
              from pkg_resources.extern.pyparsing import stringStart, stringEnd, originalTextFor, ParseException
            File "<frozen importlib._bootstrap>", line 1027, in _find_and_load
            File "<frozen importlib._bootstrap>", line 1006, in _find_and_load_unlocked
            File "<frozen importlib._bootstrap>", line 672, in _load_unlocked
            File "<frozen importlib._bootstrap>", line 632, in _load_backward_compatible
            File "/usr/lib/python3/dist-packages/pkg_resources/extern/__init__.py", line 43, in load_module
              __import__(extant)
            File "/usr/lib/python3/dist-packages/pkg_resources/_vendor/pyparsing.py", line 943, in <module>
              collections.MutableMapping.register(ParseResults)
          AttributeError: module 'collections' has no attribute 'MutableMapping'
          [end of output]
    
      note: This error originates from a subprocess, and is likely not a problem with pip.
    error: metadata-generation-failed
    
    × Encountered error while generating package metadata.
    ╰─> See above for output.
    
    note: This is an issue with the package mentioned above, not pip.
    hint: See above for details.
    

    I have seen recommendations to upgrade to the latest version of requests when this happens, but I already have the latest version of requests installed, here is the contents of my requirements.txt file:

    boto3==1.23.0
    botocore==1.26.0
    jmespath==1.0.0
    python-dateutil==2.8.2
    s3transfer==0.5.2
    six==1.16.0
    urllib3==1.26.9
    redis==4.3.1
    requests==2.27.1
    amqpstorm==2.10.4
    pyyaml==6.0
    
    opened by ashleykleynhans 3
  • How does amqpstorm handle Rabbitmq memory alarms

    How does amqpstorm handle Rabbitmq memory alarms

    Hello,

    Thanks for your work on this library 👍 I have a question regarding Rabbitmq memory watermarks and the way amqpstorm handles them.

    Imagine the following:

    1. I have a process that keeps publishing messages (basically an infinite loop for this example)
    2. At a certain point memory alarm is triggered on RMQ, and further publishes are blocked (Writes on a blocked connection will time out or fail with an I/O write exception.), also the publisher get notified that a connexion is blocked (Compatible AMQP 0-9-1 clients will be [notified] when they are blocked and unblocked.).
    3. Then Rabbitmq flushes out some of the data to disk, and unblocks publisher

    What happens on AMQPstorm side ? Does it raise ? do we lose some messages ? does the block/unblock logic trigger some logic in AMQPStorm ?

    Thanks for the help

    Links: RMQ doc

    opened by mehdigmira 4
  • Asyncronous delivery confirmation

    Asyncronous delivery confirmation

    Hi,

    First, thanks for the work.

    We are experiencing some slow-down while enqueuing message with confirm-delivery. Some time before we were not using this feature and enqueuing would be quite fast, and after enabling this feature, enqueuing would become quite slower.

    I checked and it seems that the lib wait for the confirmation on each message while on the following blog post (https://blog.rabbitmq.com/posts/2011/02/introducing-publisher-confirms) they seems to tell us to enqueue all the message and then wait for all the confirmations, do you think there would be a way for amqpstorm to handle that ? If you have some indications, I could try an implementation.

    Thank you.

    enhancement 
    opened by antoinerabany 4
  • Robust Connection example

    Robust Connection example

    Hi,

    This is more of a question than an issue - is it possible to be able to manage a long-lived connection such that a reconnection can be immediately attempted if the channel/connection is closed?

    I see the robust consumer example, but nothing for a connection that maybe used just for publishing. I don't see the ability to add any callbacks for a closed connection or a way to block the current thread until there's an error on the connection/channel, or maybe I'm just missing something.

    Thanks for any help in advance.

    opened by jmcarter 4
Releases(2.10.5)
  • 2.10.5(Aug 14, 2022)

    • Added support for bulk removing users with the Management Api.
    • Added support to get the Cluster Name using the Management Api.
    • Fixed ConnectionUri to default to port 5761 when using ssl [#119] - Thanks s-at-ik.
    Source code(tar.gz)
    Source code(zip)
  • 2.10.4(Nov 20, 2021)

  • 2.10.3(Nov 4, 2021)

  • 2.10.2(Oct 22, 2021)

  • 2.10.1(Sep 29, 2021)

  • 2.10.0(Sep 7, 2021)

    • Added Pagination support to Management list calls (e.g. queues list).
    • Added Filtering support to Management list calls.
    • Re-use the requests sessions for Management calls.
    • Updated to use pytest framework instead of nose for testing.
    Source code(tar.gz)
    Source code(zip)
  • 2.9.0(Jun 11, 2021)

    • Added support for custom Message implementations - Thanks Jay Hogg.
    • Fixed a bug with confirm_delivery not working after closing and re-opening an existing channel.
    • Re-worked the channel re-use code.
    Source code(tar.gz)
    Source code(zip)
  • 2.8.5(May 26, 2021)

  • 2.8.4(Mar 16, 2021)

  • 2.8.3(Mar 16, 2021)

    • Fixed pip sdist circular dependency [#88] - Thanks Jay Hogg.
    • Fixed basic.consume argument type in documentation [#86] - Thanks TechmarkDavid.
    Source code(tar.gz)
    Source code(zip)
  • 2.8.2(Oct 8, 2020)

    • Retry on SSLWantReadErrors [#82] - Thanks Bernhard Thiel.
    • Added getter/setter methods for Message properties expiration, message_type and user_id [#86] - Thanks Jay Hogg.
    Source code(tar.gz)
    Source code(zip)
  • 2.8.1(Jun 27, 2020)

  • 2.8.0(Jun 9, 2020)

    • Introduced a new channel function called check_for_exceptions.
    • Fixed issue where publish was successful but raises an error because connection was closed [#80] - Thanks Pavol Plaskoň.
    • Updated SSL handling to use the non-deprecated way of creating a SSL Connection [#79] - Thanks Carl Hörberg from CloudAMQP.
    • Enabled SNI for SSL connections by default [#79] - Thanks Carl Hörberg from CloudAMQP.
    Source code(tar.gz)
    Source code(zip)
  • 2.7.2(Dec 2, 2019)

  • 2.7.1(Jun 16, 2019)

    • Fixed Connection close taking longer than intended when using SSL [#75]- Thanks troglas.
    • Fixed an issue with closing Channels taking too long after the server initiated it.
    Source code(tar.gz)
    Source code(zip)
  • 2.7.0(Apr 13, 2019)

    • Added support for passing your own ssl context [#71] - Thanks troglas.
    • Improved logging verbosity on connection failures [#72] - Thanks troglas.
    • Fixed occasional error message when closing a SSL connection [#68] - Thanks troglas.
    Source code(tar.gz)
    Source code(zip)
  • 2.6.2(Feb 2, 2019)

    • Set default TCP Timeout to 10s on UriConnection to match Connection [#67] - Thanks josemonteiro.
    • Internal RPC Timeout for Opening and Closing Connections are now set to a fixed 30s [#67] - Thanks josemonteiro.
    Source code(tar.gz)
    Source code(zip)
  • 2.6.1(Dec 28, 2018)

  • 2.6.0(Dec 28, 2018)

    • Re-use closed channel ids [#55] - Thanks mikemrm.
    • Changed Poller Timeout to be a constant.
    • Improved Connection Close performance.
    • Channels is now a publicly available variable in Connections.
    Source code(tar.gz)
    Source code(zip)
  • 2.5.0(Nov 25, 2018)

    • Upgraded pamqp to v2.0.0.
      • Python 3 keys will now always be of type str.
      • For more information see https://pamqp.readthedocs.io/en/latest/history.html
    • Properly wait until the inbound queue is empty when break_on_empty is set [#63] - Thanks TomGudman.
    • Fixed issue with Management queue/exchange declare when the passive flag was set to True.
    Source code(tar.gz)
    Source code(zip)
  • 2.4.2(Sep 1, 2018)

    • Added support for External Authentication - Thanks Bernd Höhl.
    • Fixed typo in setup.py extra requirements - Thanks Bernd Höhl.
    • LICENSE file now included in package - Thanks Tomáš Chvátal.
    Source code(tar.gz)
    Source code(zip)
  • 2.4.1(Aug 29, 2018)

    • Added client/server negotiation to better determine the maximum supported channels and maximum allowed frame size [#52] - Thanks gastlich.
    • We now raise an exception if the maximum allowed channel count is reached.
    Source code(tar.gz)
    Source code(zip)
  • 2.4.0(Jan 17, 2018)

  • 2.3.0(Nov 8, 2017)

    • Added delivery_tag property to message.
    • Added redelivered property to message [#41] - Thanks tkram01.
    • Added support for Management Api Healthchecks [#39] - Thanks Julien Carpentier.
    • Fixed incompatibility with Sun Solaris 10 [#46] - Thanks Giuliox.
    • Fixed delivery_tag being set to None by default [#47] - tkram01.
    • Exposed requests verify and certs flags to Management Api [#40] - Thanks Julien Carpentier.
    Source code(tar.gz)
    Source code(zip)
  • 2.2.2(Apr 23, 2017)

  • 2.2.1(Feb 22, 2017)

    • Fixed potential Channel leak [#36] - Thanks Adam Mills.
    • Fixed threading losing select module during python shutdown [#37] - Thanks Adam Mills.
    Source code(tar.gz)
    Source code(zip)
  • 2.2.0(Nov 18, 2016)

    • Connection.close should now be more responsive.
    • Channels are now reset when re-opening an existing connection.
    • Re-wrote large portions of the Test suit.
    Source code(tar.gz)
    Source code(zip)
  • 2.1.4(Nov 3, 2016)

    • Added parameter to override auto-decode on incoming Messages - Thanks Travis Griggs.
    • Fixed a rare bug that could cause the consumer to get stuck if the connection unexpectedly dies - Thanks Connor Wolf.
    Source code(tar.gz)
    Source code(zip)
  • 2.1.3(Sep 29, 2016)

  • 2.1.2(Sep 23, 2016)

Owner
Erik Olof Gunnar Andersson
The views expressed here are 100% mine and in no way reflect those of my employer.
Erik Olof Gunnar Andersson
NekoRobot-2 - Neko is An Anime themed advance Telegram group management bot.

NekoRobot A modular telegram Python bot running on python3 with an sqlalchemy, mongodb database. ╒═══「 Status 」 Maintained Support Group Included Free

Lovely Boy 19 Nov 12, 2022
Raid ToolBox (RTB) is a big toolkit of Spamming/Raiding/Token management tools for discord.

This code is very out of date and not very good, feel free to make it into something better. (we check the github page every 5 years to pulls your PRs

2 Oct 03, 2021
Discord-Mass-Mention - Yup the title says it all

Protocol - Mass Mention (i havent tested this with any token other than my own t

Mallowies 14 Nov 06, 2022
Quickly and efficiently delete your entire tweet history with the help of your Twitter archive without worrying about the pointless 3200 tweet limit imposed by Twitter.

Twitter Nuke Quickly and efficiently delete your entire tweet history with the help of your Twitter archive without worrying about the puny and pointl

Mayur Bhoi 73 Dec 12, 2022
Python-random-quote - A file-based quote bot written in Python

Let's Write a Python Quote Bot! This repository will get you started with building a quote bot in Python. It's meant to be used along with the Learnin

amir mohammad fateh 1 Jan 02, 2022
CDBEC: Catware DataBase Encryption Client

CDBEC: Catware DataBase Encryption Client Описание CDBEC - клиент для создания, просмотра и редактирования .db-catencrypted списков, шифруемых при пом

Catware-Foundation 2 Nov 03, 2022
Non official, but friendly QvaPay library for the Python language.

Python SDK for the QvaPay API Non official, but friendly QvaPay library for the Python language. Setup You can install this package by using the pip t

Carlos Lugones 17 Nov 25, 2022
TgMusicBot is a telegram userbot for playing songs in telegram voice calls based on Pyrogram and PyTgCalls.

TgMusicBot [Stable] TgMusicBot is a telegram userbot for playing songs in telegram voice calls based on Pyrogram and PyTgCalls. Commands !start / !hel

Kürşad 21 Dec 25, 2022
This is a tutorial on how to make a Discord Bot using the discord.py library

HowToMakeADiscordBot This Github repository is here to help you code a Discord Bot using the discord.py library! 1 - Setup: Download the code inside t

Baz 1 Oct 31, 2021
Just another Shiny and Greninja-ash killing preventor for Myuu

Myuu-Anti-Shiny-Discord-Bot Why I made it? Since, I was legit fed up of NebbyBot's lag (not criticising it), I decided to make my own but in python an

5 Nov 12, 2022
Simple integrate of API udemy.com with python

Pyudemy Simple integrate of API udemy.com with python Quick start $ pip install pyudemy or $ python setup.py install Authentication To make any calls

Hudson Brendon 30 Jan 02, 2023
A python script that changes our background based on current weather and time of the day.

Desktop background on Windows 10, based on current weather and time A python script that changes our background based on current weather and time of t

Maj Gaberšček 1 Nov 16, 2021
Using GNU Radio and HackRF One to Receive, Analyze and Send ASK/OOK signals

play_with_ask NIS-8016 Lab A code: Recv.grc/py: Receive signals and match with ASK button using HackRF and GNU radio. I use AM demod block(can also in

Chen Anxue 1 Jul 04, 2022
Decryption utility for PGP Whole Disk Encryption

wdepy: Decryption and Inspection for PGP WDE Disks This is a small python tool to inspect and decrypt disk images encrypted with PGP Whole Disk Encryp

Brendan Dolan-Gavitt 17 Oct 07, 2022
in-progress decompilation of Gauntlet Legends decompression code on the N64

Gauntlet-Legends A in-progress decompilation of Gauntlet-Legends (N64) decompression code. This project currently supports the US release. Building (L

6 Jul 23, 2022
Ini Hanya Shortcut Untuk Menambahkan Kunci Tambahan Pada Termux & Membantu Para Nub Yang Decode Script Orang:v

Ini Hanya Shortcut Untuk Menambahkan Kunci Tambahan Pada Termux & Membantu Para Nub Yang Decode Script Orang:v

Lord_Ammar 1 Jan 23, 2022
A Python bot that uses the Reddit API to send users inspiring messages.

AnonBot By Edric Antoine A Python bot that uses the Reddit API to send users inspiring messages. When a message includes 'What would Anon do?', the bo

1 Jan 05, 2022
A Flask & Twilio Secret Santa app.

🎄 ✨ Secret Santa Twilio ✨ 📱 A contactless Secret Santa game built with Python, Flask and Twilio! Prerequisites 📝 A Twilio account. Sign up here ngr

Sangeeta Jadoonanan 5 Dec 23, 2021
BroBot's files, code and tester.

README - BroBOT Made by Rohan Chaturvedi [email protected] DISCLAIMER: Th

1 Jan 09, 2022
Cities bot - A simple example of using aiogram and the wikipedia package

Cities game A simple example of using aiogram and the wikipedia package. The bot

Artem Meller 2 Jan 29, 2022