Streamz helps you build pipelines to manage continuous streams of data

Overview

Streamz

Build Status Documentation Status Version Status RAPIDS custreamz gpuCI

Streamz helps you build pipelines to manage continuous streams of data. It is simple to use in simple cases, but also supports complex pipelines that involve branching, joining, flow control, feedback, back pressure, and so on.

Optionally, Streamz can also work with both Pandas and cuDF dataframes, to provide sensible streaming operations on continuous tabular data.

To learn more about how to use Streamz, see the documentation at streamz.readthedocs.org.
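For example, a small pipeline can be built by chaining operations on a Stream and then pushing data through it with emit (a short sketch using the core API):

    from streamz import Stream

    # Parse integers, keep the even ones, and accumulate a running total.
    source = Stream()
    totals = (source.map(int)
                    .filter(lambda x: x % 2 == 0)
                    .accumulate(lambda acc, x: acc + x, start=0)
                    .sink_to_list())

    for item in ['1', '2', '3', '4']:
        source.emit(item)

    print(totals)  # [2, 6]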

LICENSE

BSD-3 Clause

Issues
  • Improve kafka source

    Improve kafka source

    Fixes #143

    The idea is that dynamic polling should always be marked as async, and latch on to any IOLoop going, or the one passed.

    I hope this fixes the jupyter display problem - it certainly works from print, although that has the problem of the actual output appearing wherever the latest code was executed.
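    A rough sketch of the polling idea (not the actual source code; do_poll and stopped are stand-ins for the real source internals):

    import asyncio
    from tornado.ioloop import IOLoop

    async def poll_forever(source, interval=1.0):
        # The polling routine is an async coroutine that keeps reading and
        # emitting until the source is stopped.
        while not getattr(source, 'stopped', False):
            source.do_poll()              # hypothetical: read messages and emit downstream
            await asyncio.sleep(interval)

    def start_polling(source, loop=None, interval=1.0):
        # Latch onto whichever IOLoop is current unless one is passed explicitly.
        loop = loop or IOLoop.current()
        loop.add_callback(poll_forever, source, interval)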

    opened by martindurant 80
  • Proposal to add Checkpointing to streamz.

    Proposal to add Checkpointing to streamz.

    I closed out the earlier checkpointing PR #272, since it was an incorrect implementation. Before I raise a new PR, I wanted to put out a proposal and get as many reviews as possible, so as to have a sure footing to implement the feature. So here goes:

    The motivation for checkpointing is simple: we need to track the amount of data that has been processed in our streaming pipeline so that, in case the stream has to restart, it can pick up from where it left off. Please note that processed here is different from data merely being read/consumed in the pipeline.

    Spark follows these heuristics for checkpointing:

    1. Checkpointing is mandatory for all streaming pipelines, as it would be in any production environment. However, we can make checkpointing optional in the streamz API, if need be.
    2. Whenever a batch of data is processed completely, and the results are written to a sink, Spark creates a new checkpoint for this batch with the latest offsets for all partitions for the topic(s) involved in the batch.
    3. Spark does not consider printing to the console or the like as a sink. In the case of streamz, the only valid sink for now is Kafka.
    4. The directory structure of checkpointing is as follows:
    checkpoint_path/ #this is provided by the user 
          topic1/
                        1.txt #Checkpoint for the first batch
                        2.txt #Checkpoint for the second batch 
                        .
                        .
          topic2/
                        1.txt
                        .
    
    5. Now, if a stream is killed for some reason and restarts, it would pick up reading the Kafka topic from the last checkpoint file created.
    6. Only the last few checkpoints are maintained, and the older ones are deleted. We can hard-code this number to something, for example, 100.
    7. If multiple streams join/union, Spark also takes care of the checkpoints for all topics that may be involved.

    Implementation in streamz: The following is one of the cleanest ways, IMO, to implement checkpointing.

    1. Reading a batch from Kafka starts off with from_kafka_batched().
    from_kafka_batched([topics], consumer_conf, checkpoint_path, ...):
        consumer_conf['enable.auto.commit'] = False
        for topic in topics:
            retrieve the last created checkpoint for the topic from checkpoint_path
            for partition in partitions:
                emit (partition, low, high) tuples into the stream for get_message_batch()
    

    Note that the [topics] parameter is a list of topics that the stream can now subscribe to and consume from. The checkpoint_path parameter has to be specified by the user. In case the user does not want to use checkpointing, we can set the default to None and use streamz as is.
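    To make the retrieval step concrete, a helper along these lines could be used (a sketch only; load_last_checkpoint is a hypothetical name, and the file layout is the one proposed above):

    import ast
    import os

    def load_last_checkpoint(checkpoint_path, topic):
        # Return the {partition: offset} dict from the newest checkpoint file
        # under checkpoint_path/topic, or {} if no checkpoint exists yet.
        topic_dir = os.path.join(checkpoint_path, topic)
        if not os.path.isdir(topic_dir):
            return {}
        batches = [f for f in os.listdir(topic_dir) if f.endswith('.txt')]
        if not batches:
            return {}
        latest = max(batches, key=lambda f: int(os.path.splitext(f)[0]))
        with open(os.path.join(topic_dir, latest)) as fh:
            return ast.literal_eval(fh.read())  # file holds a {partition: offset} dict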

    2. get_message_batch() consumes the messages from the specific partition between the low and high offsets specified in the tuples emitted in the stream.
    get_message_batch(kafka_params, topic, partition, low, high, checkpoint_path, ...):
        consume the messages between the low and high offsets
        create a dirty-bit file in checkpoint_path (an indication that some data is yet to be checkpointed)
        create/update a write-ahead log (WAL) text file for this topic partition with the latest offsets
            that have been read and are ready to be checkpointed if processing is successful
    

    The purpose of the dirty bit is explained later on. The WAL would be a text file, #partition.txt, in a topic-level directory in the checkpoint path. It would contain the offset of this topic partition that was last consumed; if processing goes through and the results are finally written back to a sink, this is the offset to be checkpointed.
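    For illustration, the WAL update inside get_message_batch() could be as small as this (a sketch; update_wal is a hypothetical helper, not existing streamz API):

    import os

    def update_wal(checkpoint_path, topic, partition, last_offset):
        # Signal that consumed data has not been checkpointed yet.
        open(os.path.join(checkpoint_path, 'dirty_bit'), 'a').close()
        # Record the last consumed offset for this partition so it can be
        # promoted to a checkpoint once the batch is fully processed.
        wal_dir = os.path.join(checkpoint_path, 'write-ahead-logs', topic)
        os.makedirs(wal_dir, exist_ok=True)
        with open(os.path.join(wal_dir, '{}.txt'.format(partition)), 'w') as fh:
            fh.write(str(last_offset))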

    The directory structure would now look like this:

    checkpoint_path/
          dirty_bit
          write-ahead-logs/
               topic1/
                        1.txt #Log for first partition of topic 1.
                        2.txt #Log for second partition of topic 1. 
                        .
                        .
               topic2/
                        1.txt
                        .
          topic1/
                        1.txt #Checkpoint for the first batch
                        2.txt #Checkpoint for the second batch 
                        .
                        .
          topic2/
                        1.txt
                        .
    
    3. There would now be some stream-specific processing functions, e.g., map(), etc.
    4. Finally, if we write back the results to Kafka using to_kafka(), this is where the checkpoint creation would take place.
    to_kafka(checkpoint_path, output_topic, producer_conf):
        ...  # The current operations within to_kafka() stay as is.

        # Everything below will be done from a separate function add_checkpoint()
        if there is no dirty-bit file, exit
        delete the dirty-bit file here
        for topic in [topics from the WAL]:  # we can make this multi-threaded
            new_checkpoint = {}
            for partition in the WAL/topic path:
                new_checkpoint[partition] = offset
            create a checkpoint file for this batch for this topic at checkpoint_path/topic
    

    A checkpoint file would look like a JSON {partition:offset} dictionary:

    checkpoint_path/topic1/1.txt
    {0: 5, 1: 4, 2: 3}
    

    Let's say we decide to keep only the last 100 checkpoints for each topic. Then, as soon as the 101st checkpoint is created, the first checkpoint, 1.txt, is deleted and 101.txt gets created.
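    A pruning step along these lines could enforce that limit (sketch only; the helper name and the retention constant are assumptions):

    import os

    MAX_CHECKPOINTS = 100  # assumed retention limit

    def prune_checkpoints(topic_dir):
        # Keep only the newest MAX_CHECKPOINTS checkpoint files for a topic.
        batches = sorted(
            (f for f in os.listdir(topic_dir) if f.endswith('.txt')),
            key=lambda f: int(os.path.splitext(f)[0]),
        )
        for old in batches[:-MAX_CHECKPOINTS]:
            os.remove(os.path.join(topic_dir, old))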

    This finishes the implementation.

    Now, a few questions and their answers:

    1. to_kafka() is called on every element of the stream, so if we are using Dask, the WALs for all partitions are created in parallel. After we have gather().to_kafka(), the checkpoint is created on the first call to to_kafka() itself. In this case, how do we avoid duplicate checkpoints being created? (A maximum of #partitions duplicate checkpoints could be created.) Answer: This is where the dirty-bit file comes into play. If new data is consumed, the dirty bit is set. to_kafka() does not attempt to create a new checkpoint if the dirty bit is not set, and after a checkpoint is created, the dirty bit is deleted by to_kafka().

    2. The WAL and the checkpoints would be stored on the local file system for now? Answer: Yes, we can support S3 in the future, like Spark.

    3. In the future, if we have different sinks apart from Kafka, like SQS, Google Pub-Sub, ElasticSearch, do we need to implement checkpointing all over again? Answer: No. When writing to a sink, one would just call the add_checkpoint() function that we would create as part of this PR. The source would always be Kafka, though, for the foreseeable future.

    4. We need to pass the checkpoint_path to both from_kafka_batched() and to_kafka()? Answer: Yes, that's unfortunate. I'd love to hear another way to do this for a cleaner API. But we need the path in from_kafka_batched() to retrieve the last checkpoint, and in to_kafka() to create a new checkpoint.

    5. Would this work with Dask? Answer: Absolutely. We would definitely want this to work with Dask!

    opened by chinmaychandak 48
  • Added the to_kafka stream

    Added the to_kafka stream

    I have added the to_kafka class.

    The options were to either:

    1. Call the .flush() method after each .produce(...) call. This will seriously hurt performance as messages will be sent serially.
    2. Use atexit to call flush at the end. This requires the use of atexit, whose impact I'm unsure of. It also means that the result of the .emit(...) call by the client does not yield an immediate call to the downstream. Note that the Confluent library will flush when the batch size reaches a threshold; this is the method implemented here (see the sketch below).
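    For reference, the batching behaviour leans on the Confluent producer's own buffering. A minimal sketch of that pattern, using confluent_kafka directly rather than the streamz wrapper:

    from confluent_kafka import Producer

    producer = Producer({'bootstrap.servers': 'localhost:9092'})

    def emit_to_kafka(topic, payload):
        # produce() only appends to the client's internal buffer; librdkafka
        # sends the batch once its size/time thresholds are reached.
        producer.produce(topic, payload)
        producer.poll(0)  # serve delivery callbacks without blocking

    # At shutdown (e.g. via atexit), make sure buffered messages are sent.
    producer.flush()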
    opened by jsmaupin 47
  • Add HoloViews based plotting API

    Add HoloViews based plotting API

    Adds handling for bar plots on Series and Seriess objects. Also fixes Seriess.plot.line, Seriess.plot.scatter and Seriess.plot.area, ensuring the index is reset and therefore made visible to HoloViews.

    opened by philippjfr 39
  • A potential approach for checking if a datum is done

    A potential approach for checking if a datum is done

    One way to think about checkpointing is knowing whether a piece of data has completely exited the pipeline: it is no longer being processed or being held in a cache.

    Backpressure can help out here, since we can know if the pipeline has returned control up the call stack.

    I think there are three ways that control is returned:

    1. All computations have finished, the data has left through a sink
    2. Some computations have finished, the data was dropped by a filter
    3. Some computations have finished, the data was stored in a cache waiting for something

    1 and 2 are handled by backpressure nicely, since if all the computations have finished then the data is done.

    3 is a bit more tricky since we need to track the cached values.

    Potential implementation

    def _emit(..., metadata={'reference': RefCounter}):
        RefCounter += len(self.downstreams)
        for downstream in self.downstreams:
            downstream.update(x)
            RefCounter -= 1
    

    Caching nodes

    def update(...):
        cache.append(x)
        RefCounter += 1
    

    When the RefCounter hits zero the data is done.
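    A minimal RefCounter along these lines might look like the following (a sketch only; the real metadata plumbing in streamz may differ):

    class RefCounter:
        # Counts outstanding references to a datum; fires a callback at zero.
        def __init__(self, on_done):
            self.count = 0
            self._on_done = on_done

        def retain(self, n=1):
            self.count += n

        def release(self, n=1):
            self.count -= n
            if self.count <= 0:
                self._on_done()

    # Usage sketch: the source creates one counter per datum and, for example,
    # commits the Kafka offset once the datum has fully left the pipeline.
    counter = RefCounter(on_done=lambda: print('datum done'))
    counter.retain(2)   # two downstream consumers
    counter.release()
    counter.release()   # prints 'datum done'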

    Note that this would need to merge the refcounting metadata when joining nodes. For instance combine_latest would need to propagate the ref counters for all incoming data sources, and all downstream data consumers would need to increment/decrement for all the ref counters.

    We'd also need to create the refcounters at data ingestion time.

    opened by CJ-Wright 35
  • A simple plugin system

    A simple plugin system

    Right now, if I need to add my own custom stream nodes, I have to do this:

    import mypackage.streamz_extras  # noqa: F401
    

    It would be nice to have a way to distribute additional functionality as separate packages that can just be installed via pip. This can be done with entry_points, similar to the way it works in airflow. This is a super bare-bones implementation of this mechanism. Check out https://github.com/roveo/streamz_example_plugin for an example of a plugin.
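    A sketch of how the discovery side could work at import time (the entry-point group name 'streamz.nodes' is an assumption for illustration):

    # The plugin package would declare something like this in its setup.cfg:
    #
    #   [options.entry_points]
    #   streamz.nodes =
    #       my_node = streamz_example_plugin:my_node
    #
    from importlib.metadata import entry_points
    from streamz import Stream

    for ep in entry_points(group='streamz.nodes'):   # Python 3.10+ signature
        node_cls = ep.load()                         # import the plugged-in class
        Stream.register_api()(node_cls)              # expose it as a chainable node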

    Problems:

    • this should be tested, but I have no idea how, short of shipping the code for the example plugin with its tests
    • plugged-in classes should be checked for validity in some way. I can add a simple check, e.g. isinstance(plugin, Stream), but there is probably something else I haven't thought of
    opened by roveo 29
  • Add refresh_cycles parameter to from_kafka_batched to accommodate adding Kafka topic partitions on the fly.

    Add refresh_cycles parameter to from_kafka_batched to accommodate adding Kafka topic partitions on the fly.

    Streamz should be able to handle addition of Kafka topic partitions on the fly. The refresh_cycles parameter polls Kafka every N cycles to get the latest Kafka partition metadata and accommodates any added partitions on the fly.

    [Some more discussion in #358]

    opened by chinmaychandak 29
  • combine_latest emit_on behavior

    combine_latest emit_on behavior

    Should we buffer the combine_latest emit_on stream?

    Context

    Consider the following situation: we have two streams, a and b. We are going to use combine_latest to combine them, with emit_on=a. Five entries come down from a, then one comes down from b, then one more from a. Because of the lack of data from b, none of the 5 initial a entries have been emitted. Now that we have b data, do we expect it to emit 6 times (which would require buffering the emit_on stream) or only once (which may violate the idea of the emit_on stream always emitting)?
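    For concreteness, the scenario looks like this with the current (non-buffering) behaviour:

    from streamz import Stream

    a = Stream()
    b = Stream()
    out = a.combine_latest(b, emit_on=a).sink_to_list()

    for i in range(5):
        a.emit(i)        # nothing emitted yet: b has no value
    b.emit('b0')         # emit_on=a, so still nothing
    a.emit(5)            # emits (5, 'b0')
    print(out)           # [(5, 'b0')] -- once, not six times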

    @mrocklin @ordirules @danielballan

    opened by CJ-Wright 26
  • Added checkpointing

    Added checkpointing

    This works by creating a future at the source for each batch. The futures are passed down the pipeline with the data. Each type of function does something different depending on that function's operation. Some functions will mark a future as done with 'drop' to tell the source not to commit the offsets. Others, such as map, will pass the future downstream without affecting it.

    opened by jsmaupin 24
  • Connect streams

    Connect streams

    Suggested resolution to #44. It makes a big difference when visualizing graphs, and I'm using it now, so I thought I'd go ahead and send it already. Please let me know if there are better ways.

    From the issue, changing s2.map(s1.emit) to s2.connect(s1) leads to the difference between the two figures:

    s2.map(s1.emit) : http://imgur.com/a/0Z0JA

    s2.connect(s1) : http://imgur.com/a/rcrX0
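    In code, the change is simply the following (a short sketch of the new connect API):

    from streamz import Stream

    s1 = Stream()
    s2 = Stream()

    # Before: pushing s2's elements into s1 via map, which hides the edge
    # s2.map(s1.emit)

    # After: connect makes s1 a true downstream of s2, so the edge shows up
    # when visualizing the graph
    s2.connect(s1)

    s1.sink(print)
    s2.emit('hello')   # flows through s2 -> s1 -> print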

    opened by jrmlhermitte 24
  • WIP - Batches, dispatch, and nesting

    WIP - Batches, dispatch, and nesting

    OK, this is a work-in-progress PR that shows some ability to nest different kinds of elements in a stream. This PR does a few things to enable this:

    1. We make a Batch type that is a tuple of many elements as a single batch in a stream
    2. We create a singledispatch system to replace the monkeypatching that we used to do on Futures
    3. We do a bit of magic so that the functions we pass down to elements again check whatever they run on and possibly nest further.

    As a result, we can do the following example, which maps a function over remote batches:

    In [1]: from dask.distributed import Client
    
    In [2]: client = Client()
    
    In [3]: from streams import Stream, Batch
    
    In [4]: import streams.dask
    
    In [5]: source = Stream()
    
    In [6]: batches = source.partition(3).map(Batch)
    
    In [7]: futures = streams.dask.scatter(batches)
    
    In [8]: L = futures.map(lambda x: x + 1).sink_to_list()
    
    In [9]: for i in range(9):
       ...:     source.emit(i)
       ...:     
    
    In [10]: L
    Out[10]: 
    [<Future: status: finished, type: Batch, key: _stream_map-06488d54c44f3a3db136e6c34468eb13>,
     <Future: status: finished, type: Batch, key: _stream_map-cad3989e05761099bf5b22db5992eed3>,
     <Future: status: finished, type: Batch, key: _stream_map-ecd902c4b2cd3a96cb3c73694d64cfe6>]
    
    In [11]: client.gather(L)
    Out[11]: [(1, 2, 3), (4, 5, 6), (7, 8, 9)]
    

    Note that things like accumulate (which will have to be changed) and merge (which will have to be split) don't work yet. I wanted to get feedback from @ordirules and @danielballan .

    opened by mrocklin 23
  • Gracefully exit python script using Streams

    Gracefully exit python script using Streams

    I have a use case associated with pulling data from a Kafka topic. I need the streamz operator to exit gracefully and the python script to exit once it hits an exception. It looks something like this:

    import logging
    from streamz import Stream

    source = Stream.from_kafka_batched(TOPIC, kafka_confs, poll_interval='20s', max_batch_size=10000)

    def process_messages(messages):
        try:
            # process messages
            ...
        except Exception as e:
            print(e)
            disconnect_gracefully()

    def disconnect_gracefully():
        logging.info("Exit gracefully")
        source.stop()
        source.destroy()

    source.map(process_messages)


    While this seems to work for the streamz operator, I feel like it doesn't disconnect from the Kafka broker and I get logs like this

    %6|1651194599.149|FAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: my-kafka-server:9093: Disconnected (after 80522ms in state UP)
    

    So, the script doesn't exit. Any pointers to how this can be done effectively?

    opened by arjun180 1
  • Streamz/Dask: gather does not wait for all results of buffer

    Streamz/Dask: gather does not wait for all results of buffer

    Hi Streamz team. I was following the docs to use streamz with dask, and I am facing the issue that dask returns before all the results are calculated when used with buffer. There is a stackoverflow question regarding this as well - https://stackoverflow.com/questions/60032474/streamz-dask-gather-does-not-wait-for-all-results-of-buffer

    Wondering if there is a fix, as this is not usable if it doesn't wait for the last piece to come back.
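    The pipeline in question is roughly the pattern from the streamz Dask docs (a sketch; the worker function is arbitrary):

    from dask.distributed import Client
    from streamz import Stream

    client = Client()                       # local Dask cluster

    source = Stream()
    results = (source.scatter()             # move elements to Dask
                     .map(lambda x: x ** 2) # remote work
                     .buffer(8)             # allow up to 8 futures in flight
                     .gather()              # bring results back
                     .sink_to_list())

    for i in range(10):
        source.emit(i)
    # At this point `results` may still be missing the last few items,
    # because gather returns before the buffered futures have finished --
    # the behaviour reported here.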

    opened by liusztc09 0
  • Transfer to notebook

    Transfer to notebook

    Simply copying the script code to a notebook. Could use some cleaning and prose. For some reason, we are getting lookback on the cluster centres, whereas we should only be showing the most recent.

    cc @maximlt

    opened by martindurant 1
  • Add Stream node constructor for sub-classing #442

    Add Stream node constructor for sub-classing #442

    This PR provides a constructor on the Stream which can be overridden for sub-classing the nodes. This is to create or override methods without altering the original Stream class. It gives a single point of entry to extend any node while chaining. The behaviour remains the same unless Stream._new_node is overridden.

    This is a usage example of subclassing and inheriting a Stream:

    import streamz as sz
    
    class MyStream(sz.Stream):
        def _new_node(self, cls, args, kwargs):
            if not issubclass(cls, MyStream):
                cls = type(cls.__name__, (cls, MyStream), dict(cls.__dict__))
            return cls(*args, **kwargs)
    
    @MyStream.register_api()
    class foo(sz.sinks.sink):
        pass
    
    stream = MyStream()
    stream.map(lambda x: x + 1).foo(print)
    stream.emit(100)
    
    opened by florentbr 7
  • Fail to extend a Stream with new methods

    Fail to extend a Stream with new methods

    Inheritance on a Stream seems to be broken.

    The following example fails with 'map' object has no attribute 'draw':

    import streamz
    
    class MyStream(streamz.Stream) :
        def draw(self) :
            print('called draw')
    
    @MyStream.register_api()
    class draw(MyStream) :
        def __init__(self, *args, **kwargs) :
            print('called draw')
    
    s = MyStream()
    s.map(lambda x : x + 5).draw()
    s.emit(99)
    

    I couldn't find a class factory in the source, and it looks like all the nodes are instantiated directly with Stream when they should use the type of the main instance.

    opened by florentbr 6
  • Increase the usage of augmented assignment statements

    Increase the usage of augmented assignment statements

    :eyes: Some source code analysis tools can help to find opportunities for improving software components. :thought_balloon: I propose to increase the usage of augmented assignment statements accordingly.

    Would you like to integrate anything from a transformation result which can be generated by a command like the following? (:point_right: Please check also for questionable change suggestions because of an evolving search pattern.)

    lokal$ perl -p -i.orig -0777 -e 's/^(?<indentation>\s+)(?<target>\S+)\s*=\s*\k<target>[ \t]*(?<operator>[+\-%&|^@]|\*\*?|\/\/?|<<|>>)/$+{indentation}$+{target} $+{operator}=/gm' $(find ~/Projekte/Streamz/lokal -name '*.py')
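    For example, this is the kind of rewrite the command proposes (a trivial Python illustration):

    # before
    counter = counter + 1
    # after
    counter += 1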
    
    opened by elfring 0
Owner
Python Streamz
A small real-time streaming library for python
Reading streams of Twitter data, save them to Kafka, then process with Kafka Stream API and Spark Streaming

Using Streaming Twitter Data with Kafka and Spark Reading streams of Twitter data, publishing them to Kafka topic, process message using Kafka Stream

Rustam Zokirov 1 Dec 6, 2021
PLStream: A Framework for Fast Polarity Labelling of Massive Data Streams

PLStream: A Framework for Fast Polarity Labelling of Massive Data Streams Motivation When dataset freshness is critical, the annotating of high speed

null 3 May 11, 2022
Tuplex is a parallel big data processing framework that runs data science pipelines written in Python at the speed of compiled code

Tuplex is a parallel big data processing framework that runs data science pipelines written in Python at the speed of compiled code. Tuplex has similar Python APIs to Apache Spark or Dask, but rather than invoking the Python interpreter, Tuplex generates optimized LLVM bytecode for the given pipeline and input data set.

Tuplex 758 May 16, 2022
This tool parses log data and allows to define analysis pipelines for anomaly detection.

logdata-anomaly-miner This tool parses log data and allows to define analysis pipelines for anomaly detection. It was designed to run the analysis wit

AECID 23 Apr 12, 2022
Building house price data pipelines with Apache Beam and Spark on GCP

This project contains the process from building a web crawler to extract the raw data of house price to create ETL pipelines using Google Could Platform services.

null 1 Nov 22, 2021
Data pipelines built with polars

valves Warning: the project is very much work in progress. Valves is a collection of functions for your data .pipe()-lines. This project aimes to host

null 9 May 7, 2022
Python library for creating data pipelines with chain functional programming

PyFunctional Features PyFunctional makes creating data pipelines easy by using chained functional operators. Here are a few examples of what it can do

Pedro Rodriguez 2k May 18, 2022
PipeChain is a utility library for creating functional pipelines.

PipeChain Motivation PipeChain is a utility library for creating functional pipelines. Let's start with a motivating example. We have a list of Austra

Michael Milton 1 Oct 27, 2021
🧪 Panel-Chemistry - exploratory data analysis and build powerful data and viz tools within the domain of Chemistry using Python and HoloViz Panel.

The purpose of the panel-chemistry project is to make it really easy for you to do DATA ANALYSIS and build powerful DATA AND VIZ APPLICATIONS within the domain of Chemistry using Python and HoloViz Panel.

Marc Skov Madsen 75 Apr 23, 2022
follow-analyzer helps GitHub users analyze their following and followers relationship

follow-analyzer follow-analyzer helps GitHub users analyze their following and followers relationship by providing a report in html format which conta

Yin-Chiuan Chen 2 May 2, 2022
signac-flow - manage workflows with signac

signac-flow - manage workflows with signac The signac framework helps users manage and scale file-based workflows, facilitating data reuse, sharing, a

Glotzer Group 44 Apr 18, 2022
Recommendations from Cramer: On the show Mad-Money (CNBC) Jim Cramer picks stocks which he recommends to buy. We will use this data to build a portfolio

Backtesting the "Cramer Effect" & Recommendations from Cramer Recommendations from Cramer: On the show Mad-Money (CNBC) Jim Cramer picks stocks which

Gábor Vecsei 9 Apr 26, 2022
In this project, ETL pipeline is build on data warehouse hosted on AWS Redshift.

ETL Pipeline for AWS Project Description In this project, ETL pipeline is build on data warehouse hosted on AWS Redshift. The data is loaded from S3 t

Mobeen Ahmed 1 Nov 1, 2021
This mini project showcase how to build and debug Apache Spark application using Python

Spark app can't be debugged using normal procedure. This mini project showcase how to build and debug Apache Spark application using Python programming language. There are also options to run Spark application on Spark container

Denny Imanuel 1 Dec 29, 2021
Amundsen is a metadata driven application for improving the productivity of data analysts, data scientists and engineers when interacting with data.

Amundsen is a metadata driven application for improving the productivity of data analysts, data scientists and engineers when interacting with data.

Amundsen 3.3k May 15, 2022
Elementary is an open-source data reliability framework for modern data teams. The first module of the framework is data lineage.

Data lineage made simple, reliable, and automated. Effortlessly track the flow of data, understand dependencies and analyze impact. Features Visualiza

null 544 May 16, 2022
A Pythonic introduction to methods for scaling your data science and machine learning work to larger datasets and larger models, using the tools and APIs you know and love from the PyData stack (such as numpy, pandas, and scikit-learn).

This tutorial's purpose is to introduce Pythonistas to methods for scaling their data science and machine learning work to larger datasets and larger models, using the tools and APIs they know and love from the PyData stack (such as numpy, pandas, and scikit-learn).

Coiled 98 Jan 17, 2022