Thread-safe Python RabbitMQ Client & Management library

Overview

AMQPStorm

Thread-safe Python RabbitMQ Client & Management library.

Version

Introduction

AMQPStorm is a library designed to be consistent, stable and thread-safe.

  • 100% Test Coverage!
  • Supports Python 2.7 and Python 3.3+.
  • Fully tested against Python Implementations; CPython and PyPy.

Documentation

Additional documentation is available on amqpstorm.io.

Changelog

Version 2.10.4

  • Fixed issue with a forcefully closed channel not sending the appropriate response [#114] - Thanks Bernd Höhl.

Version 2.10.3

  • Fixed install bug with cp1250 encoding on Windows [#112] - Thanks ZygusPatryk.

Version 2.10.2

  • Fixed bad socket fd causing high cpu usage [#110] - Thanks aiden0z.

Version 2.10.1

  • Fixed bug with UriConnection not handling amqps:// properly.
  • Improved documentation.

Version 2.10.0

  • Added Pagination support to Management list calls (e.g. queues list).
  • Added Filtering support to Management list calls.
  • Re-use the requests sessions for Management calls.
  • Updated to use pytest framework instead of nose for testing.

Version 2.9.0

  • Added support for custom Message implementations - Thanks Jay Hogg.
  • Fixed a bug with confirm_delivery not working after closing and re-opening an existing channel.
  • Re-worked the channel re-use code.

Version 2.8.5

  • Fixed a potential deadlock when opening a channel with a broken connection [#97] - Thanks mehdigmira.

Version 2.8.4

  • Fixed a bug in Message.create where it would mutate the properties dict [#92] - Thanks Killerama.

Version 2.8.3

  • Fixed pip sdist circular dependency [#88] - Thanks Jay Hogg.
  • Fixed basic.consume argument type in documentation [#86] - Thanks TechmarkDavid.

Version 2.8.2

  • Retry on SSLWantReadErrors [#82] - Thanks Bernhard Thiel.
  • Added getter/setter methods for Message properties expiration, message_type and user_id [#86] - Thanks Jay Hogg.

Version 2.8.1

  • Cleaned up documentation.

Version 2.8.0

  • Introduced a new channel function called check_for_exceptions.
  • Fixed issue where publish was successful but raises an error because connection was closed [#80] - Thanks Pavol Plaskon.
  • Updated SSL handling to use the non-deprecated way of creating a SSL Connection [#79] - Thanks Carl Hörberg from CloudAMQP.
  • Enabled SNI for SSL connections by default [#79] - Thanks Carl Hörberg from CloudAMQP.

Version 2.7.2

  • Added ability to override client_properties [#77] - Thanks tkram01.

Version 2.7.1

  • Fixed Connection close taking longer than intended when using SSL [#75]- Thanks troglas.
  • Fixed an issue with closing Channels taking too long after the server initiated it.

Version 2.7.0

  • Added support for passing your own ssl context [#71] - Thanks troglas.
  • Improved logging verbosity on connection failures [#72] - Thanks troglas.
  • Fixed occasional error message when closing a SSL connection [#68] - Thanks troglas.

Version 2.6.2

  • Set default TCP Timeout to 10s on UriConnection to match Connection [#67] - Thanks josemonteiro.
  • Internal RPC Timeout for Opening and Closing Connections are now set to a fixed 30s [#67] - Thanks josemonteiro.

Version 2.6.1

  • Fixed minor issue with the last channel id not being available.

Version 2.6.0

  • Re-use closed channel ids [#55] - Thanks mikemrm.
  • Changed Poller Timeout to be a constant.
  • Improved Connection Close performance.
  • Channels is now a publicly available variable in Connections.

Version 2.5.0

  • Upgraded pamqp to v2.0.0.
  • Properly wait until the inbound queue is empty when break_on_empty is set [#63] - Thanks TomGudman.
  • Fixed issue with Management queue/exchange declare when the passive flag was set to True.

Credits

Special thanks to gmr (Gavin M. Roy) for creating pamqp, and in addition amqpstorm is heavily influenced by his pika and rabbitpy libraries.

Comments
  • Out of order ack-ing?

    Out of order ack-ing?

    Whoooo, more wierdness!

    .Thread-21 - INFO - Message channel: <amqpstorm.channel.Channel object at 0x7f5d08cdbcc8>
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message properties: {'headers': None, 'reply_to': '', 'user_id': '', 'cluster_id': '', 'app_id': '', 'delivery_mode': None, 'content_type': '', 'correlation_id': 'keepalive', 'expiration': '', 'message_id': '', 'message_type': '', 'priority': None, 'content_encoding': 'utf-8', 'timestamp': None}
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message _channel: <amqpstorm.channel.Channel object at 0x7f5d08cdbcc8>
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message _method: {'routing_key': b'nak', 'redelivered': False, 'delivery_tag': 5, 'exchange': b'keepalive_exchange140037604406920', 'consumer_tag': b'amq.ctag-DCdsyFR66UJsU7sgkNp9oQ'}
    ACK For delivery tag: 5
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Heartbeat packet received! wat
    Main.Connector.Internal(/rpcsys).Thread-23 - INFO - Timeout watcher loop. Current message counts: 0 (out: 0, in: 0)
    Main.Connector.Container(/rpcsys).Thread-23 - INFO - Interface timeout thread. Ages: heartbeat -> 4.83, last message -> 32.51.
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Received message!
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message channel: <amqpstorm.channel.Channel object at 0x7f5d08cdbcc8>
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message properties: {'headers': None, 'reply_to': '', 'user_id': '', 'cluster_id': '', 'app_id': '', 'delivery_mode': None, 'content_type': '', 'correlation_id': 'keepalive', 'expiration': '', 'message_id': '', 'message_type': '', 'priority': None, 'content_encoding': 'utf-8', 'timestamp': None}
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message _channel: <amqpstorm.channel.Channel object at 0x7f5d08cdbcc8>
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message _method: {'routing_key': b'nak', 'redelivered': False, 'delivery_tag': 6, 'exchange': b'keepalive_exchange140037604406920', 'consumer_tag': b'amq.ctag-DCdsyFR66UJsU7sgkNp9oQ'}
    ACK For delivery tag: 6
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Heartbeat packet received! wat
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Received message!
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message channel: <amqpstorm.channel.Channel object at 0x7f5d08cdbcc8>
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message properties: {'headers': None, 'reply_to': '', 'user_id': '', 'cluster_id': '', 'app_id': '', 'delivery_mode': None, 'content_type': '', 'correlation_id': 'keepalive', 'expiration': '', 'message_id': '', 'message_type': '', 'priority': None, 'content_encoding': 'utf-8', 'timestamp': None}
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message _channel: <amqpstorm.channel.Channel object at 0x7f5d08cdbcc8>
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message _method: {'routing_key': b'nak', 'redelivered': True, 'delivery_tag': 7, 'exchange': b'keepalive_exchange140037604406920', 'consumer_tag': b'amq.ctag-DCdsyFR66UJsU7sgkNp9oQ'}
    ACK For delivery tag: 7
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Heartbeat packet received! wat
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR - Error while in rx runloop!
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -       Channel 1 was closed by remote server: PRECONDITION_FAILED - unknown delivery tag 6
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR - Traceback (most recent call last):
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/FetchAgent/AmqpConnector/__init__.py", line 546, in _rx_poll
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -     self.interface.process_rx_events()
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/FetchAgent/AmqpConnector/__init__.py", line 171, in process_rx_events
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -     self.storm_channel.process_data_events(to_tuple=False)
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/amqpstorm/channel.py", line 266, in process_data_events
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -     for message in self.build_inbound_messages(break_on_empty=True):
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/amqpstorm/channel.py", line 113, in build_inbound_messages
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -     self.check_for_errors()
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/amqpstorm/channel.py", line 188, in check_for_errors
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -     raise exception
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR - amqpstorm.exception.AMQPChannelError: Channel 1 was closed by remote server: PRECONDITION_FAILED - unknown delivery tag 6
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -
    Main.Connector.Internal(/rpcsys).Thread-21 - WARNING - Should die flag! Runstate: running, threads live: threads alive, had exception: yes.
    Main.Connector.Internal(/rpcsys).Thread-21 - INFO - RX Poll process dying. Threads_live: 1, had exception 1, should_die True
    Main.Connector.Internal(/rpcsys).Thread-22 - WARNING - Should die flag! Runstate: running, threads live: threads alive, had exception: yes.
    Main.Connector.Internal(/rpcsys).Thread-22 - WARNING - Should die flag! Runstate: running, threads live: threads alive, had exception: yes.
    Main.Connector.Internal(/rpcsys).Thread-22 - INFO - TX Poll process dying (should die: True). Threads_live: 1, runstate: 1, resp queue size: 0, had exception 1.
    Main.Connector.Internal(/rpcsys).Thread-23 - WARNING - Should die flag! Runstate: running, threads live: threads alive, had exception: yes.
    Disconnecting!
    Main.Connector.Container(/rpcsys).Thread-2 - INFO - Closing channel
    Main.Connector.Container(/rpcsys).Thread-2 - INFO - Closing connection
    Running state: True, thread alive: True, thread id:140037440014080
    Joining _inbound_thread. Runstate: %s False
    
    

    Context:

    I have a connection with a thread processing the receiving messages. I have instrumented Message.ack() with a print statement that prints the delivery tag that it's acking.

    It appears I'm calling [delivery tag 6].ack(), [delivery tag 7].ack(), and somehow the ack for delivery tag 7 is getting received by the rabbitmq server /first/, resulting in a PRECONDITION_FAILED error because acking 7 implicitly acks previous tags, and therefore 6 is not a valid delivery tag anymore.

    I'm working on pulling out a testable example, but it's certainly odd.


    Incidentally, the new docs pages are fancypants!

    wontfix 
    opened by fake-name 49
  • Add the ability to forcefully close a connection.

    Add the ability to forcefully close a connection.

    Basically, I'm dealing with a context where I have a high volume traffic AMQP connection, across a high-latency, unreliable link (intercontinental).

    If there is a way for a connection to possibly wedge, I'll encounter it.

    Anyways, The issue I ran into here is that it's possible for the "close" operation on a connection to wedge indefinitely, preventing a connection from actually closing (I /think/ if the shutdown RPC request gets garbled/eaten somewhere).

    The question of /how/ this happens aside, I therefore need to be able to kill a open connection in a dirty manner. This adds the ability to kill() a connection which will force the worker thread to exit immediately, without bothering to do any proper cleanup.

    Because this is a operation that's generally done with a additional watcher process, outside of the normal program flow, I used multiprocessing primitives to manage the _die flag (at one point, the watcher was a separate /process/, not thread).

    Anyways, I'm not sure if this is inline with the ideas of the library, but I haven't been able to wedge the amqp connection with this patch set, so that's something.


    Only tested on Py3.5x64, Ubuntu 14.04.

    Things I haven't done: Additional unit tests.

    opened by fake-name 17
  • SSL Retry

    SSL Retry

    I regularly get [SSL: BAD_WRITE_RETRY] bad write retry (_ssl.c:1647) errors when running AMQP-Storm using SSL.

    Looking at the code I think its because it's not handling SSL Retry exceptions from the socket

    http://stackoverflow.com/questions/2997218/why-am-i-getting-error1409f07fssl-routinesssl3-write-pending-bad-write-retr

    i'll get a stack trace to see if its on write or on do_handshake

    bug 
    opened by thejuan 17
  • reached the maximum number of channels raised with closed channels

    reached the maximum number of channels raised with closed channels

    Hello,

    I've run into an issue with creating new channels and receiving the reached the maximum number of channels 65535 when attempting to create a new channel.

    After some digging, I noticed Connection._get_next_available_channel_id() accounts for all channels, both open and closed. I believe filtering the count for just opened should resolve this issue.

    I tested with a quick fix

        def _get_next_available_channel_id(self):
            channel_id = len(self._channels) + 1
            active_channels = [
                ch for ch in list(self._channels.values()) if ch and ch.is_open
            ]
            if len(active_channels) >= self.max_allowed_channels:
                raise AMQPConnectionError(
                    'reached the maximum number of channels %d' %
                    self.max_allowed_channels)
            return channel_id
    

    However it may be better to just keep an active count

    bug 
    opened by mikemrm 12
  • Unhandled Frames in 3.6.2

    Unhandled Frames in 3.6.2

    Upgraded to RabbitMq 3.6.2 starting to see the following errors which I think lead to using all the memory on rabbitmq server.

    [Channel%d] Unhandled Frame: %s -- %s

    No code changes, just a server upgrade. Not sure if this storm or rabbitmq.

    opened by thejuan 12
  • Exception: 'Deliver' object has no attribute 'body_size'

    Exception: 'Deliver' object has no attribute 'body_size'

    I have been experiencing a strange error, just for some messages I got this error:

    'Deliver' object has no attribute 'body_size' channel.py (line: 253)

    I looked at self._inbound and I saw some Deliver objects in sequence instead of Deliver/ContentHeader/ContentBody

    amqp-storm

    The messages are being sent to the queue by another project in another language, so I have no clue why it is happening.

    bug 
    opened by viniciuschiele 12
  • Cant publish multiple messages

    Cant publish multiple messages

    This is a weird one, I've upgraded to 2.2 and trying to do the following script

    import os
    import logging
    logging.basicConfig(level=logging.DEBUG)
    
    from amqpstorm import Message
    from amqpstorm import UriConnection
    
    keys = ["1","2","3"]
    #signer = Signer()
    bus = UriConnection("***")
    with bus.channel(rpc_timeout=10) as channel:
        channel.confirm_deliveries()
        for key in keys:
            print key
            msg = "My Message"
            #properties = {"headers": {"md5-signature": signer.sign(msg)}}
            Message.create(channel, msg, properties).publish(key, exchange="amq.topic")
    
    

    I get this error on the second publish.

    DEBUG:amqpstorm.connection:Connection Opening DEBUG:amqpstorm.channel0:Frame Received: Connection.Start DEBUG:amqpstorm.channel0:Frame Sent: Connection.StartOk DEBUG:amqpstorm.channel0:Frame Received: Connection.Tune DEBUG:amqpstorm.channel0:Frame Sent: Connection.TuneOk DEBUG:amqpstorm.channel0:Frame Sent: Connection.Open DEBUG:amqpstorm.channel0:Frame Received: Connection.OpenOk DEBUG:amqpstorm.heartbeat:Heartbeat Checker Started DEBUG:amqpstorm.connection:Connection Opened DEBUG:amqpstorm.connection:Opening a new Channel DEBUG:amqpstorm.connection:Channel #1 Opened 1 2 DEBUG:amqpstorm.channel:Channel #1 Closing DEBUG:amqpstorm.channel:Channel #1 Closed Exception in thread amqpstorm.io: Traceback (most recent call last): File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner self.run() File "/usr/lib/python2.7/threading.py", line 754, in run self.__target(*self.__args, **self.__kwargs) File "/home/adam/.venv/local/lib/python2.7/site-packages/amqpstorm/io.py", line 219, in _process_incoming_data if self.poller.is_ready: File "/home/adam/.venv/local/lib/python2.7/site-packages/amqpstorm/io.py", line 48, in is_ready except select.error as why: AttributeError: 'NoneType' object has no attribute 'error'

    bug 
    opened by thejuan 11
  • Testing with pamqp 3.0.0a4

    Testing with pamqp 3.0.0a4

    I plan on releasing pamqp 3.0 soon and wanted to make sure you were aware of the changes that are making it into 3.0 and give you an opportunity to report any issues prior to its release.

    Please see the version history @ https://pamqp.readthedocs.io/en/latest/history.html

    opened by gmr 10
  • potential infinite loop ?

    potential infinite loop ?

    Hello,

    I am using your library in conjunction with celery and i regularly encounter a problem: When asking celery to restart its worker (emitting a SIGTERM), the process is blocked and celery doesn't want to restart. Celery maintains a parent thread that runs child threads where tasks are executed. Amqpstorm's thread is also run from this parent thread. The processes in htop look like this:

    screenshot from 2017-04-11 10-34-26

    Here process 13880 & 13803 are related to amqpstorm. 13803 is the inbound_thread while 13880 is the heartbeat timer.

    After some investigation, i found out that killing the thread responsible for the heartbeat timer allows celery to gracefuly restart... This lead me to think that the timer could possibly create a deadlock.

    I have created a pull request based on this assumption and will try this branch on my setup: Can you tell me what you think of it ? Do you see any other possible explanations for the error i see ?

    opened by cp2587 10
  • Connection was closed by remote server: CONNECTION_FORCED

    Connection was closed by remote server: CONNECTION_FORCED

    Hello,

    We recently updated the library version to 2.1.3 (from 1.1.7) and we now face several errors we did not have previously. One of them is the following:

        self.channel.basic.publish(body=data, exchange=self.exchange, routing_key='')
      File "/home/wirebot/.virtualenvs/cayzn_yield.etl/local/lib/python2.7/site-packages/amqpstorm/basic.py", line 194, in publish
        self._channel.write_frames(frames_out)
      File "/home/wirebot/.virtualenvs/cayzn_yield.etl/local/lib/python2.7/site-packages/amqpstorm/channel.py", line 326, in write_frames
        self.check_for_errors()
      File "/home/wirebot/.virtualenvs/cayzn_yield.etl/local/lib/python2.7/site-packages/amqpstorm/channel.py", line 169, in check_for_errors
        self._connection.check_for_errors()
      File "/home/wirebot/.virtualenvs/cayzn_yield.etl/local/lib/python2.7/site-packages/amqpstorm/connection.py", line 155, in check_for_errors
        raise self.exceptions[0]
    AMQPConnectionError: Connection was closed by remote server: CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'
    

    On the rabbitmq server, we have the following logs: client unexpectedly closed TCP connection

    Finally here is the code i use to create an amqp socket (this socket is then used by logging to send log in the queue) :

    class AMQPStormSocket(object):
    
        def __init__(self, host, port, username, password, virtual_host, exchange, queue, exchange_is_durable,
                     queue_is_durable, exchange_type, fallback_call):
    
            # create connection & channel
            self.connection = amqpstorm.Connection(host, username, password, port, virtual_host=virtual_host, timeout=1)
            self.channel = self.connection.channel()
    
            # create an exchange, if needed
            self.channel.exchange.declare(exchange=exchange, exchange_type=exchange_type, durable=exchange_is_durable)
            # create a queue, if needed
            self.channel.queue.declare(queue=queue, durable=queue_is_durable, passive=False, auto_delete=False)
            # bind it
            self.channel.queue.bind(queue=queue, exchange=exchange)
    
            # needed when publishing
            self.exchange = exchange
    
            self.fallback_call = fallback_call
    
        def sendall(self, data):
            try:
                self.channel.basic.publish(body=data, exchange=self.exchange, routing_key='')
            except Exception as e:
                self.fallback_call(e)
    
        def close(self):
            try:
                self.channel.close()
                self.connection.close()
            except Exception:
                pass
    

    Do you have an idea on how to fix these errors ?

    opened by cp2587 10
  • Queue declaration must not be obligatory, AMQPChannelError on trying to consume from predeclared queue

    Queue declaration must not be obligatory, AMQPChannelError on trying to consume from predeclared queue

    I use RabbitMQ, all the excahnges and consumers were preconfigured with differnet options. Unfortunately when I try to consume from a durable queue I get

    mqpstorm.exception.AMQPChannelError: Channel 1 was closed by remote server: NOT_FOUND - no previously declared queue

    If I try to declare a queue without specifying any params I get

    Channel 1 was closed by remote server: PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'test_queue' in vhost 'lizziebot': received 'false' but current is 'true'

    The only way is to specify all the params including including ttl, autodelete, etc. I cannot use this library because I need rabbitmq structure to be configured not on the client side ;-(

    bug 
    opened by blackalegator 9
  • Is there any support for AMQP 1-0-0 ?

    Is there any support for AMQP 1-0-0 ?

    Does the module supports AMQP-1-0-0.

    I do not see anything regarding supported version in the Doc.

    If not, is there any plan to support it in the future?

    opened by ruffp 1
  • Installation errors with Python 3.10

    Installation errors with Python 3.10

    I am using Python3 version 3.10.4 on Ubuntu 18.04 LTS and encountering the following error when attempting to install amqpstorm:

    $ pip install amqpstorm
    Keyring is skipped due to an exception: module 'collections' has no attribute 'MutableMapping'
    Defaulting to user installation because normal site-packages is not writeable
    Collecting amqpstorm
      Using cached AMQPStorm-2.10.4.tar.gz (71 kB)
      Preparing metadata (setup.py) ... error
      error: subprocess-exited-with-error
    
      × python setup.py egg_info did not run successfully.
      │ exit code: 1
      ╰─> [20 lines of output]
          Traceback (most recent call last):
            File "<string>", line 2, in <module>
            File "<pip-setuptools-caller>", line 14, in <module>
            File "/usr/lib/python3/dist-packages/setuptools/__init__.py", line 12, in <module>
              import setuptools.version
            File "/usr/lib/python3/dist-packages/setuptools/version.py", line 1, in <module>
              import pkg_resources
            File "/usr/lib/python3/dist-packages/pkg_resources/__init__.py", line 77, in <module>
              __import__('pkg_resources.extern.packaging.requirements')
            File "/usr/lib/python3/dist-packages/pkg_resources/_vendor/packaging/requirements.py", line 9, in <module>
              from pkg_resources.extern.pyparsing import stringStart, stringEnd, originalTextFor, ParseException
            File "<frozen importlib._bootstrap>", line 1027, in _find_and_load
            File "<frozen importlib._bootstrap>", line 1006, in _find_and_load_unlocked
            File "<frozen importlib._bootstrap>", line 672, in _load_unlocked
            File "<frozen importlib._bootstrap>", line 632, in _load_backward_compatible
            File "/usr/lib/python3/dist-packages/pkg_resources/extern/__init__.py", line 43, in load_module
              __import__(extant)
            File "/usr/lib/python3/dist-packages/pkg_resources/_vendor/pyparsing.py", line 943, in <module>
              collections.MutableMapping.register(ParseResults)
          AttributeError: module 'collections' has no attribute 'MutableMapping'
          [end of output]
    
      note: This error originates from a subprocess, and is likely not a problem with pip.
    error: metadata-generation-failed
    
    × Encountered error while generating package metadata.
    ╰─> See above for output.
    
    note: This is an issue with the package mentioned above, not pip.
    hint: See above for details.
    

    I have seen recommendations to upgrade to the latest version of requests when this happens, but I already have the latest version of requests installed, here is the contents of my requirements.txt file:

    boto3==1.23.0
    botocore==1.26.0
    jmespath==1.0.0
    python-dateutil==2.8.2
    s3transfer==0.5.2
    six==1.16.0
    urllib3==1.26.9
    redis==4.3.1
    requests==2.27.1
    amqpstorm==2.10.4
    pyyaml==6.0
    
    opened by ashleykleynhans 3
  • How does amqpstorm handle Rabbitmq memory alarms

    How does amqpstorm handle Rabbitmq memory alarms

    Hello,

    Thanks for your work on this library 👍 I have a question regarding Rabbitmq memory watermarks and the way amqpstorm handles them.

    Imagine the following:

    1. I have a process that keeps publishing messages (basically an infinite loop for this example)
    2. At a certain point memory alarm is triggered on RMQ, and further publishes are blocked (Writes on a blocked connection will time out or fail with an I/O write exception.), also the publisher get notified that a connexion is blocked (Compatible AMQP 0-9-1 clients will be [notified] when they are blocked and unblocked.).
    3. Then Rabbitmq flushes out some of the data to disk, and unblocks publisher

    What happens on AMQPstorm side ? Does it raise ? do we lose some messages ? does the block/unblock logic trigger some logic in AMQPStorm ?

    Thanks for the help

    Links: RMQ doc

    opened by mehdigmira 4
  • Asyncronous delivery confirmation

    Asyncronous delivery confirmation

    Hi,

    First, thanks for the work.

    We are experiencing some slow-down while enqueuing message with confirm-delivery. Some time before we were not using this feature and enqueuing would be quite fast, and after enabling this feature, enqueuing would become quite slower.

    I checked and it seems that the lib wait for the confirmation on each message while on the following blog post (https://blog.rabbitmq.com/posts/2011/02/introducing-publisher-confirms) they seems to tell us to enqueue all the message and then wait for all the confirmations, do you think there would be a way for amqpstorm to handle that ? If you have some indications, I could try an implementation.

    Thank you.

    enhancement 
    opened by antoinerabany 4
  • Robust Connection example

    Robust Connection example

    Hi,

    This is more of a question than an issue - is it possible to be able to manage a long-lived connection such that a reconnection can be immediately attempted if the channel/connection is closed?

    I see the robust consumer example, but nothing for a connection that maybe used just for publishing. I don't see the ability to add any callbacks for a closed connection or a way to block the current thread until there's an error on the connection/channel, or maybe I'm just missing something.

    Thanks for any help in advance.

    opened by jmcarter 4
Releases(2.10.5)
  • 2.10.5(Aug 14, 2022)

    • Added support for bulk removing users with the Management Api.
    • Added support to get the Cluster Name using the Management Api.
    • Fixed ConnectionUri to default to port 5761 when using ssl [#119] - Thanks s-at-ik.
    Source code(tar.gz)
    Source code(zip)
  • 2.10.4(Nov 20, 2021)

  • 2.10.3(Nov 4, 2021)

  • 2.10.2(Oct 22, 2021)

  • 2.10.1(Sep 29, 2021)

  • 2.10.0(Sep 7, 2021)

    • Added Pagination support to Management list calls (e.g. queues list).
    • Added Filtering support to Management list calls.
    • Re-use the requests sessions for Management calls.
    • Updated to use pytest framework instead of nose for testing.
    Source code(tar.gz)
    Source code(zip)
  • 2.9.0(Jun 11, 2021)

    • Added support for custom Message implementations - Thanks Jay Hogg.
    • Fixed a bug with confirm_delivery not working after closing and re-opening an existing channel.
    • Re-worked the channel re-use code.
    Source code(tar.gz)
    Source code(zip)
  • 2.8.5(May 26, 2021)

  • 2.8.4(Mar 16, 2021)

  • 2.8.3(Mar 16, 2021)

    • Fixed pip sdist circular dependency [#88] - Thanks Jay Hogg.
    • Fixed basic.consume argument type in documentation [#86] - Thanks TechmarkDavid.
    Source code(tar.gz)
    Source code(zip)
  • 2.8.2(Oct 8, 2020)

    • Retry on SSLWantReadErrors [#82] - Thanks Bernhard Thiel.
    • Added getter/setter methods for Message properties expiration, message_type and user_id [#86] - Thanks Jay Hogg.
    Source code(tar.gz)
    Source code(zip)
  • 2.8.1(Jun 27, 2020)

  • 2.8.0(Jun 9, 2020)

    • Introduced a new channel function called check_for_exceptions.
    • Fixed issue where publish was successful but raises an error because connection was closed [#80] - Thanks Pavol Plaskoň.
    • Updated SSL handling to use the non-deprecated way of creating a SSL Connection [#79] - Thanks Carl Hörberg from CloudAMQP.
    • Enabled SNI for SSL connections by default [#79] - Thanks Carl Hörberg from CloudAMQP.
    Source code(tar.gz)
    Source code(zip)
  • 2.7.2(Dec 2, 2019)

  • 2.7.1(Jun 16, 2019)

    • Fixed Connection close taking longer than intended when using SSL [#75]- Thanks troglas.
    • Fixed an issue with closing Channels taking too long after the server initiated it.
    Source code(tar.gz)
    Source code(zip)
  • 2.7.0(Apr 13, 2019)

    • Added support for passing your own ssl context [#71] - Thanks troglas.
    • Improved logging verbosity on connection failures [#72] - Thanks troglas.
    • Fixed occasional error message when closing a SSL connection [#68] - Thanks troglas.
    Source code(tar.gz)
    Source code(zip)
  • 2.6.2(Feb 2, 2019)

    • Set default TCP Timeout to 10s on UriConnection to match Connection [#67] - Thanks josemonteiro.
    • Internal RPC Timeout for Opening and Closing Connections are now set to a fixed 30s [#67] - Thanks josemonteiro.
    Source code(tar.gz)
    Source code(zip)
  • 2.6.1(Dec 28, 2018)

  • 2.6.0(Dec 28, 2018)

    • Re-use closed channel ids [#55] - Thanks mikemrm.
    • Changed Poller Timeout to be a constant.
    • Improved Connection Close performance.
    • Channels is now a publicly available variable in Connections.
    Source code(tar.gz)
    Source code(zip)
  • 2.5.0(Nov 25, 2018)

    • Upgraded pamqp to v2.0.0.
      • Python 3 keys will now always be of type str.
      • For more information see https://pamqp.readthedocs.io/en/latest/history.html
    • Properly wait until the inbound queue is empty when break_on_empty is set [#63] - Thanks TomGudman.
    • Fixed issue with Management queue/exchange declare when the passive flag was set to True.
    Source code(tar.gz)
    Source code(zip)
  • 2.4.2(Sep 1, 2018)

    • Added support for External Authentication - Thanks Bernd Höhl.
    • Fixed typo in setup.py extra requirements - Thanks Bernd Höhl.
    • LICENSE file now included in package - Thanks Tomáš Chvátal.
    Source code(tar.gz)
    Source code(zip)
  • 2.4.1(Aug 29, 2018)

    • Added client/server negotiation to better determine the maximum supported channels and maximum allowed frame size [#52] - Thanks gastlich.
    • We now raise an exception if the maximum allowed channel count is reached.
    Source code(tar.gz)
    Source code(zip)
  • 2.4.0(Jan 17, 2018)

  • 2.3.0(Nov 8, 2017)

    • Added delivery_tag property to message.
    • Added redelivered property to message [#41] - Thanks tkram01.
    • Added support for Management Api Healthchecks [#39] - Thanks Julien Carpentier.
    • Fixed incompatibility with Sun Solaris 10 [#46] - Thanks Giuliox.
    • Fixed delivery_tag being set to None by default [#47] - tkram01.
    • Exposed requests verify and certs flags to Management Api [#40] - Thanks Julien Carpentier.
    Source code(tar.gz)
    Source code(zip)
  • 2.2.2(Apr 23, 2017)

  • 2.2.1(Feb 22, 2017)

    • Fixed potential Channel leak [#36] - Thanks Adam Mills.
    • Fixed threading losing select module during python shutdown [#37] - Thanks Adam Mills.
    Source code(tar.gz)
    Source code(zip)
  • 2.2.0(Nov 18, 2016)

    • Connection.close should now be more responsive.
    • Channels are now reset when re-opening an existing connection.
    • Re-wrote large portions of the Test suit.
    Source code(tar.gz)
    Source code(zip)
  • 2.1.4(Nov 3, 2016)

    • Added parameter to override auto-decode on incoming Messages - Thanks Travis Griggs.
    • Fixed a rare bug that could cause the consumer to get stuck if the connection unexpectedly dies - Thanks Connor Wolf.
    Source code(tar.gz)
    Source code(zip)
  • 2.1.3(Sep 29, 2016)

  • 2.1.2(Sep 23, 2016)

Owner
Erik Olof Gunnar Andersson
The views expressed here are 100% mine and in no way reflect those of my employer.
Erik Olof Gunnar Andersson
Python based Algo trading bot for Nifty / Banknifty futures and options

Fully automated Alice Blue Algo Trading with Python on NSE and MCX for Nifty / Crude / Banknifty futures and options , absolutely FREE ! This algo tra

Rajesh Sivadasan 49 Dec 31, 2022
Neofetch/pfetch, but for weather

Wfetch Neofetch/pfetch, but for weather Features Information about the weather outside: Weather condition Temperature Humidity Pressure Wind Sunrise-s

G_cat 72 Nov 18, 2022
Python3 based bittrex rest api wrapper

bittrex-rest-api This open source project was created to give an understanding of the Bittrex Rest API v1.1/v3.0 in pearl language. The sample file sh

4 Nov 15, 2022
Using DST's API with Python

A short guide on how to access Denmark's Statistics API with python, together with a helper class that facilitates the collection of data and metadata from any DST's table

Alessandro Martinello 16 Dec 02, 2022
Discord Mass Report script that uses multiple tokens

Discord-Mass-Report Discord Mass Report script that uses multiple tokens, full credits to https://github.com/hoki0/Discord-mass-report who made it in

cChimney 4 Jun 08, 2022
Fully automated Chegg Discord bot for "homework help"

cheggbog Fully automated Chegg Discord bot for "homework help". Working Sept 15, 2021 Overview Recently, Chegg has made it extremely difficult to auto

Bryce Hackel 8 Dec 23, 2022
Easy & powerful bot to check if your all Telegram bots are working or not. This bot status bot updates every 45 minutes & runs for 24x7 hours.

PowerfulBotStatus-IDN-C-X Easy & powerful bot to check if your all Telegram bots are working or not. This bot status bot updates every 45 minutes & ru

IDNCoderX 5 Oct 06, 2022
A PowerFull Telegram Mirror Bot.......

- [ DEAD REPO AND NO MORE UPDATE ] Slam Mirror Bot Slam Mirror Bot is a multipurpose Telegram Bot written in Python for mirroring files on the Interne

αвιנтн 2 Nov 09, 2021
b2blaze

b2blaze Welcome to the b2blaze library for Python. Backblaze B2 provides the cheapest cloud object storage and transfer available on the internet. Com

George Sibble 603 Jan 03, 2023
A simple Telegram bot that converts a phone number to a direct whatsapp chat link

Open in WhatsApp I was using a great app to open a whatsapp chat with a given number directly without saving that number in my contact list, but I fel

Pathfinder 19 Dec 24, 2022
Senexia - A powerful telegram bot to manage your groups as effectively as possible

⚡ Kenechi bot ⚡ A Powerful, Smart And Simple Group Manager ... Written with AioG

Akhi 2 Jan 11, 2022
Declarative assertions for AWS

AWSsert AWSsert is a Python library providing declarative assertions about AWS resources to your tests. Installation Use the package manager pip to in

19 Jan 04, 2022
The most versatile torrent leecher and youtube-dl bot for telegram

TorToolkit Telegram So basically Tortoolkit is aimed to be the most versatile torrent leecher and youtube-dl bot for telegram. This bot is highly cust

αвιנтн 1 Nov 11, 2021
Python package for Calendly API v2

PyCalendly Python package to use Calendly API-v2. Installation Install with pip $ pip install PyCalendly Usage Getting Started See Getting Started wi

Lakshmanan Meiyappan 20 Dec 05, 2022
PyDiscord, a maintained fork of discord.py, is a python wrapper for the Discord API.

discord.py A modern, easy to use, feature-rich, and async ready API wrapper for Discord written in Python. The Future of discord.py Please read the gi

Omkaar 1 Jan 16, 2022
A Discord token grabber written in Python3, with awesome obfuscation and anti-debug protection.

☣️ Plague ☣️ Plague is a Discord token grabber written in Python3, obfuscated with Kramer, protected from traffic analysers with Scarecrow and using t

Billy 125 Dec 20, 2022
Send song lyrics to iMessage users using the Genius lyrics API

pyMessage Send song lyrics to iMessage users using the Genius lyrics API. Setup 1.) Open the main.py file, and add your API key on line 7. 2.) Install

therealkingnull 1 Jan 23, 2022
A discord bot providing notifications of player activity on a minecraft server.

tos-alert A discord bot providing notifications of player activity on a minecraft server. Setup By default the app does not launch and will crash with

1 Jul 22, 2022
Automatic generation of crypto-arts based on image layers

NFT Generator Автоматическая генерация крипто-артов на основе слоев изображения. Установка pip3 install -r requirements.txt rm -rf result/* Как это ра

Zproger 31 Dec 29, 2022
Discord rich-presence implementation for VALORANT

not working on v1 anymore in favor of v2, but if there's any big bugs i'll try to fix them valorant-rich-presence-client Discord rich presence extensi

colinh 278 Jan 08, 2023