ZeroMQ bindings for Twisted

Overview

Twisted bindings for 0MQ

https://coveralls.io/repos/smira/txZMQ/badge.png

Introduction

txZMQ allows to integrate easily ØMQ sockets into Twisted event loop (reactor).

txZMQ supports both CPython and PyPy and ØMQ library version 2.2.x or 3.2.x.

Documentation is available at ReadTheDocs.

Requirements

C library required:

  • ØMQ library 2.2.x or 3.2.x

Python packages required:

  • pyzmq >= 13 (for CPython & PyPy)
  • Twisted

Details

txZMQ introduces support for general 0MQ sockets by class ZmqConnection that can do basic event loop integration, sending-receiving messages in non-blocking manner, scatter-gather for multipart messages.

txZMQ uses ØMQ APIs to get file descriptor that is used to signal pending actions from ØMQ library IO thread running in separate thread. This is used in a custom file descriptor reader, which is then added to the Twisted reactor.

Upgrading from 0.3.x

If you're upgrading from version 0.3.1 and earlier, please apply following changes to your code:

  • root package name was changed from txZMQ to txzmq, adjust your imports accordingly;
  • ZmqEndpointType.Connect has been renamed to ZmqEndpointType.connect;
  • ZmqEndpointType.Bind has been renamed to ZmqEndpointType.bind;
  • ZmqConnection.__init__ has been changed to accept keyword arguments instead of list of endpoints; if you were using one endpoint, no changes are required; if using multiple endpoints, please look for add_endpoints method.

Hacking

Source code for txZMQ is available at github; forks and pull requests are welcome.

To start hacking, fork at github and clone to your working directory. To use the Makefile (for running unit tests, checking for PEP8 compliance and running pyflakes), you will want to have virtualenv installed (it includes a pip installation).

Create a branch, add some unit tests, write your code, check it and test it! Some useful make targets are:

  • make env
  • make check
  • make test

If you don't have an environment set up, a new one will be created for you in ./env. Additionally, txZMQ will be installed as well as required development libs.

Comments
  • Timeout support

    Timeout support

    I have looked at the code and it does not seem like there is any support for setting timeouts.

    Is that correct, it seems like a pretty fundamental thing for REP/REQ over tcp.

    opened by brunsgaard 23
  • Change txZMQ (from GPL) to a more permissive license (at least LGPL)

    Change txZMQ (from GPL) to a more permissive license (at least LGPL)

    Just want to confirm that you are intentionally prohibiting distribution of this package along with proprietary software. While GPL is certainly a valid license for a library, the LGPL is more common for libraries (e.g. psycopg2) -- and you (appear to) have licensed other packages under more permissive licenses like MPL and MIT.

    opened by ambsw-technology 19
  • DEALER socket losing messages

    DEALER socket losing messages

    Hi all,

    I am seeing strange behavior in an application using DEALER and ROUTER sockets.

    Requests made by the DEALER are always received by the ROUTER, but the responses made by the ROUTER are sometimes lost. I've added a test to demonstrate this behavior. It is the only change in this pull request.

    https://github.com/wehriam/txZMQ/blob/master/txZMQ/test/test_async_dealer_router.py

    In the application (but not demonstrated in the test) after some varying number of requests responses eventually fail to arrive entirely.

    I was unsuccessful in resolving this issue and hope you'll have better luck in determining the underlying problem.

    OSX Lion, Python 2.7.1 ZMQ 2.1.9

    Twisted==11.0.0 pep8==0.6.1 pyflakes==0.5.0 pyzmq==2.1.10 txZMQ==0.3 wsgiref==0.1.2 zope.interface==3.8.0

    opened by wehriam 13
  • push with bind

    push with bind

    i run push_pull.py example, using python push_pull.py --method=bind --mode=push, raise

    python push_pull.py --method=bind --mode=push
    
    producing ['1352354886.76', 'ubuntu']
    Unhandled Error
    Traceback (most recent call last):
      File "/usr/local/lib/python2.7/dist-packages/Twisted-12.2.0-py2.7-linux-i686.egg/twisted/internet/base.py", line 413, in fireEvent
        DeferredList(beforeResults).addCallback(self._continueFiring)
      File "/usr/local/lib/python2.7/dist-packages/Twisted-12.2.0-py2.7-linux-i686.egg/twisted/internet/defer.py", line 301, in addCallback
        callbackKeywords=kw)
      File "/usr/local/lib/python2.7/dist-packages/Twisted-12.2.0-py2.7-linux-i686.egg/twisted/internet/defer.py", line 290, in addCallbacks
        self._runCallbacks()
      File "/usr/local/lib/python2.7/dist-packages/Twisted-12.2.0-py2.7-linux-i686.egg/twisted/internet/defer.py", line 551, in _runCallbacks
        current.result = callback(current.result, *args, **kw)
    --- <exception caught here> ---
      File "/usr/local/lib/python2.7/dist-packages/Twisted-12.2.0-py2.7-linux-i686.egg/twisted/internet/base.py", line 426, in _continueFiring
        callable(*args, **kwargs)
      File "push_pull.py", line 45, in produce
        s.push(data)
      File "/home/jjx/sources/txZMQ/txzmq/pushpull.py", line 22, in push
        self.send(message)
      File "/home/jjx/sources/txZMQ/txzmq/connection.py", line 245, in send
        self.socket.send(m, constants.NOBLOCK | constants.SNDMORE)
      File "socket.pyx", line 499, in zmq.core.socket.Socket.send (zmq/core/socket.c:5381)
    
      File "socket.pyx", line 546, in zmq.core.socket.Socket.send (zmq/core/socket.c:5143)
    
      File "socket.pyx", line 175, in zmq.core.socket._send_copy (zmq/core/socket.c:2139)
    
    zmq.core.error.ZMQError: Resource temporarily unavailable
    
    
    opened by jiangjianxiao 11
  • Support for Zeromq3.2

    Support for Zeromq3.2

    Hi, zeromq3.2 isn't quite out yet, but it will be soon! pyzmq has had experimental support for it since pyzmq-2.1.7.

    The following commits introduce the start of experimental support for zeromq-3.2. Whether pyzmq is built against zeromq-2.2 or zeromq-3.2, the txZMQ tests should pass.

    pubsub works, but the req/rep and router/dealer patterns are broken for reasons I haven't figured out yet.

    opened by ralphbean 9
  • Integrate message reading into twisted reactor loop

    Integrate message reading into twisted reactor loop

    During work with txZMQ I met the case when fast incoming messages stream with not so fast message processing functions prevents twisted reactor's loop from resuming because loop reading messages in doRead() never ends - there are always new messages arrived. This pull request transforms message reading and processing loop to cooperative task scheduled by Twisted.

    opened by daa 7
  • XREQ/XREP

    XREQ/XREP

    Hi!

    I just put together this initial XREQ/XREP ZmqConnections subclasses.

    There are also some small changes to the base ZmqConnection class, the most important is the doRead call at the end of _startWriting method. This is the only way (I've found ) to be sure we don't miss any events from the socket, as it's event-triggered and twisted might miss an event while we are writting.

    Cheers.

    opened by verterok 7
  • Incorrect (twice?) shutdown when registered for shutdown

    Incorrect (twice?) shutdown when registered for shutdown

    The factory which has been registered for shutdown ...

            self.zmqfactory = ZmqFactory()
            self.zmqfactory.registerForShutdown()
            self.receiver = MyZmqSubConnection(self,self.zmqfactory)
    

    is shutted down later

           self.zmqfactory.shutdown()
    

    As a result, the unhandled exception happens when the reactor is shutted down:

    Unhandled Error
    Traceback (most recent call last):
      File "/usr/local/lib/python2.7/dist-packages/twisted/internet/base.py", line 416, in fireEvent
        DeferredList(beforeResults).addCallback(self._continueFiring)
      File "/usr/local/lib/python2.7/dist-packages/twisted/internet/defer.py", line 307, in addCallback
        callbackKeywords=kw)
      File "/usr/local/lib/python2.7/dist-packages/twisted/internet/defer.py", line 296, in addCallbacks
        self._runCallbacks()
      File "/usr/local/lib/python2.7/dist-packages/twisted/internet/defer.py", line 578, in _runCallbacks
        current.result = callback(current.result, *args, **kw)
    --- <exception caught here> ---
      File "/usr/local/lib/python2.7/dist-packages/twisted/internet/base.py", line 429, in _continueFiring
        callable(*args, **kwargs)
      File "/usr/local/lib/python2.7/dist-packages/txzmq/factory.py", line 51, in shutdown
        for connection in self.connections.copy():
    

    It probably useful either add an 'active' flag of the factory to avoid conflict when shutting down twice (manually and automatically), or unregister from the reactor to be shutted down automatically to avoid a conflict as such.

    opened by nnseva 6
  • Is it possible to create txzmq daemon as twistd plugin?

    Is it possible to create txzmq daemon as twistd plugin?

    Hi,

    How can i create txzmq daemon as twistd plugin? I googled for some information, but without success. If it is possible to do, I will be grateful for any help, advice or information.

    opened by tmpd 6
  • highWaterMark cannot protect from memory overflow

    highWaterMark cannot protect from memory overflow

    With pyzmq, setting HWM would protect you from memory overflow issues like this:

    import zmq
    ctx = zmq.Context()
    s= zmq.socket(zmq.PUSH)
    s.setsockopt(zmq.HWM, 8)
    s.connect('tcp://localhost:33710')  # non-exist endpoint
    while True:
        s.send('XXX')  # this will block when high water mark is reached
    

    With txZMQ, ZmqConnection.send() simply append into a deque(), regardless of highWaterMark. Therefore test like above would take up a lot of memory.

    If send() could return a Deferred object, the calling code would be able to do something about it.

    A limited work around is to set a maxlen on the deque object, so it would drop data when full. Problem is if you really want to wait until the exceptional HWM state is finished (data is very important for example), this won't help.

    feature 
    opened by fantix 6
  • 'ZmqREPConnection' object has no attribute '_routingInfo'

    'ZmqREPConnection' object has no attribute '_routingInfo'

    In our project we encountered the following failure:

    
      File "/opt/neo/venv/lib/python2.7/site-packages/txzmq/req_rep.py", line 153, in __init__
        ZmqConnection.__init__(self, *args, **kwargs)
      File "/opt/neo/venv/lib/python2.7/site-packages/txzmq/connection.py", line 174, in __init__
        self.doRead()
      File "/opt/neo/venv/lib/python2.7/site-packages/txzmq/connection.py", line 281, in doRead
        log.callWithLogger(self, self.messageReceived, message)
    --- <exception caught here> ---
      File "/opt/neo/venv/lib/python2.7/site-packages/twisted/python/log.py", line 88, in callWithLogger
        return callWithContext({"system": lp}, func, *args, **kw)
      File "/opt/neo/venv/lib/python2.7/site-packages/twisted/python/log.py", line 73, in callWithContext
        return context.call({ILogContext: newCtx}, func, *args, **kw)
      File "/opt/neo/venv/lib/python2.7/site-packages/twisted/python/context.py", line 118, in callWithContext
        return self.currentContext().callWithContext(ctx, func, *args, **kw)
      File "/opt/neo/venv/lib/python2.7/site-packages/twisted/python/context.py", line 81, in callWithContext
        return func(*args,**kw)
      File "/opt/neo/venv/lib/python2.7/site-packages/txzmq/req_rep.py", line 179, in messageReceived
        self._routingInfo[msgId] = routingInfo
    exceptions.AttributeError: 'ZmqREPConnection' object has no attribute '_routingInfo'
    
    

    It seems like the doRead called during initialization of ZmqREPConnection actually gets a message and tries to handle it, before the _routingInfo attribute was initialized later in its __init__.

    opened by aryes 5
  • docs: fix simple typo, recevied -> received

    docs: fix simple typo, recevied -> received

    There is a small typo in txzmq/pubsub.py.

    Should read received rather than recevied.

    Semi-automated pull request generated by https://github.com/timgates42/meticulous/blob/master/docs/NOTE.md

    opened by timgates42 0
  • txzmq/pubsub.py

    txzmq/pubsub.py", line 26 -- exception cannot conatenate bytes

    File "/home/ar/Envs/a3_pyvcluster/lib/python3.7/site-packages/txzmq/pubsub.py", line 26, in publish |class ZmqSubConnection(ZmqConnection): self.send(tag + b'\0' + message) | """
    TypeError: can only concatenate str (not "bytes") to str

    def publish(self, message, tag=b''): """ Publish message with specified tag.

        :param message: message data
        :type message: str
        :param tag: message tag
        :type tag: str
        """
        self.send(tag + b'\0' + message) <---- It does not work in python3, I tried both tag and message being bytes and it does not work either
    
    opened by alex4747-pub 1
  • Timeout handling in REP-REQ

    Timeout handling in REP-REQ

    I'm not quite sure this is a bug or if its an intended feature, and I'd like to elaborate on this problem that I've spent some time figuring out:

    I'm using REP-REQ in a context where the REP server is unstable. The txzmq implements a timeout on the ZmqREQConnection.sendMsg() method. This timeout works perfectly in itself, but after some digging it seems the timeout feature is not a part of zmg. The problem with this is that the message is still left in zmq REQ queue, even after the caller has already gotten its Errback() called for the this message. If the server becomes available at some later time, the reply will be send back to the REQ client, but it goes nowhere in the txzmq stack.

    The second problem is that sending a message to the server is the only way of knowing if the server connection is alive, so one needs to keep sending messages (that will time out) regularly to check if the link is up. When it is up again, the server will be flooded with a stream of these "alive" messages.

    My resolution to the issue is to use a variant of the lazy pirate pattern: When sending a message with sendMsg() I install an Errback callback that closes the link, which will flush the connection.

        if not socket:
            socket = ZmqREQConnection(zmq, req_endpoint)
    
        try:
            d = socket.sendMsg(message, timeout=2)
    
            def onTimeout(fail):
                socket.shutdown()
                socket = None
                return fail
    
            d.addErrback(onTimeout)
            return d
    
        except zmq.error.Again:
            print("Handle me...")
    

    Since this is more an effect of the zmq's REP-REQ implementation than txzmq, I'm not sure this is a bug in txzmq, but I dislike that a message receives a timeout, while still being scheduled for transmission. Perhaps the above logic is required? Perhaps there is some way to cancel the message when it times out?

    opened by sveinse 1
  • Add support for multipart messages and remove custom framing for pub/sub

    Add support for multipart messages and remove custom framing for pub/sub

    I've changed the pub/sub code to remove the custom framing using a null byte and added code to send and receive multipart messages. I've ensured API-level compatibility as well as making sure old versions can read data sent by the old one (thanks to the existing compatibility code there). But of course older versions won't be able to read multipart messages sent by this one.

    opened by cncfanatics 5
Releases(1.0.1)
Quick & dirty controller to schedule Kubernetes Jobs later (once)

K8s Jobber Operator Quickly implemented Kubernetes controller to enable scheduling of Jobs at a later time. Usage: To schedule a Job later, Set .spec.

Jukka Väisänen 2 Feb 11, 2022
Tencent Yun tools with python

Tencent_Yun_tools 使用 python3.9 + 腾讯云 AccessKey 利用工具 使用之前请先填写config.ini配置文件 Usage python3 Tencent_rce.py -h Scanner python3 Tencent_rce.py -s 生成CSV

<img src="> 13 Dec 20, 2022
Get Response Of Container Deployment Kube with python

get-response-of-container-deployment-kube 概要 get-response-of-container-deployment-kube は、例えばエッジコンピューティング環境のコンテナデプロイメントシステムにおいて、デプロイ元の端末がデプロイ先のコンテナデプロイ

Latona, Inc. 3 Nov 05, 2021
Micro Data Lake based on Docker Compose

Micro Data Lake based on Docker Compose This is the implementation of a Minimum Data Lake

Abel Coronado 15 Jan 07, 2023
CTF infrastructure deployment automation tool.

CTF infrastructure deployment automation tool. Focus on the challenges. Mirrored from

Fake News 1 Apr 12, 2022
sysctl/sysfs settings on a fly for Kubernetes Cluster. No restarts are required for clusters and nodes.

SysBindings Daemon Little toolkit for control the sysctl/sysfs bindings on Kubernetes Cluster on the fly and without unnecessary restarts of cluster o

Wallarm 19 May 06, 2022
MLops tools review for execution on multiple cluster types: slurm, kubernetes, dask...

MLops tools review focused on execution using multiple cluster types: slurm, kubernetes, dask...

4 Nov 30, 2022
Ralph is the CMDB / Asset Management system for data center and back office hardware.

Ralph Ralph is full-featured Asset Management, DCIM and CMDB system for data centers and back offices. Features: keep track of assets purchases and th

Allegro Tech 1.9k Jan 01, 2023
The leading native Python SSHv2 protocol library.

Paramiko Paramiko: Python SSH module Copyright: Copyright (c) 2009 Robey Pointer 8.1k Jan 04, 2023

IP address management (IPAM) and data center infrastructure management (DCIM) tool.

NetBox is an IP address management (IPAM) and data center infrastructure management (DCIM) tool. Initially conceived by the network engineering team a

NetBox Community 11.8k Jan 07, 2023
Define and run multi-container applications with Docker

Docker Compose Docker Compose is a tool for running multi-container applications on Docker defined using the Compose file format. A Compose file is us

Docker 28.2k Jan 08, 2023
Docker Container wallstreetbets-sentiment-analysis

Docker Container wallstreetbets-sentiment-analysis A docker container using restful endpoints exposed on port 5000 "/analyze" to gather sentiment anal

145 Nov 22, 2022
CDK Template of Table Definition AWS Lambda for RDB

CDK Template of Table Definition AWS Lambda for RDB Overview This sample deploys Amazon Aurora of PostgreSQL or MySQL with AWS Lambda that can define

AWS Samples 5 May 16, 2022
Oncall is a calendar tool designed for scheduling and managing on-call shifts. It can be used as source of dynamic ownership info for paging systems like http://iris.claims.

Oncall See admin docs for information on how to run and manage Oncall. Development setup Prerequisites Debian/Ubuntu - sudo apt-get install libsasl2-d

LinkedIn 928 Dec 22, 2022
Python IMDB Docker - A docker tutorial to containerize a python script.

Python_IMDB_Docker A docker tutorial to containerize a python script. Build the docker in the current directory: docker build -t python-imdb . Run the

Sarthak Babbar 1 Dec 30, 2021
A Habitica Integration with Github Workflows.

Habitica-Workflow A Habitica Integration with Github Workflows. How To Use? Fork (and Star) this repository. Set environment variable in Settings - S

Priate 2 Dec 20, 2021
Organizing ssh servers in one shell.

NeZha (哪吒) NeZha is a famous chinese deity who can have three heads and six arms if he wants. And my NeZha tool is hoping to bring developer such mult

Zilin Zhu 8 Dec 20, 2021
Cobbler is a versatile Linux deployment server

Cobbler Cobbler is a Linux installation server that allows for rapid setup of network installation environments. It glues together and automates many

Cobbler 2.4k Dec 24, 2022
Google Kubernetes Engine (GKE) with a Snyk Kubernetes controller installed/configured for Snyk App

Google Kubernetes Engine (GKE) with a Snyk Kubernetes controller installed/configured for Snyk App This example provisions a Google Kubernetes Engine

Pas Apicella 2 Feb 09, 2022