PyPipeline Core =============== PyPipeline has five major concepts 1. Components/Endpoints 2. Exchange 3. Pipeline 4. Enterprise Integration Patterns 5. Plumber Components/Endpoints -------------------- Components are the blocks which either produce, process or consume data that flows on the bus. In PyPipeline, the components can be of two types: Source ^^^^^^ A source is a component which produces data that will flow on the bus. How the data is generated is completely up to the source. Few examples of how sources may generate data are as follows * A MySQL source component may read a table from a database and send each row as one data packet. * A RabbitMQ source may listen to a queue and produce a data packet for every message received from the queue. * A REST source may access an API and send the response as a data packet The sources continuously run, until explicitly stopped, and are event based. They run their own thread, which responds to some event (a message on the queue, in case of RabbitMQ source) and produce data packets. A source might take some parameters for configuration, such as database name, host and port in case of MySQL source. When implementing any source, the class must extend the Source class from *core* package.:: class Source: def __init__(self, plumber, params): self.plumber = plumber self.chain = None self.params = params def start(self): raise NotImplementedError("Sources should implement their start method") def stop(self): raise NotImplementedError("Sources should implement their stop method") When start is called, the source must start it's own thread which listens to the events and produces data packets. To send the data of the bus, it must call the *process* method of *self.chain*. When stop is called, the source must stop the thread and not send any more data packets on the bug. Destination ^^^^^^^^^^^ A destination is a component which receives a data packet as input and processes it to either modify the data, or send it an external service, or both. Few examples of destination are as follows * A MongoDB destination component which receives json data and persists it in a collection * A reverse geocoder destination which modifies the data to convert it location field from lat/long to address * A log destination component which prints the content of the data on console A destination may take some parameters for configuration, such as collection name, host and port in case of MongoDB destination. When implementing any destination, the class must extend the Destination class from *core* package.:: class Destination: def __init__(self, plumber, params): self.plumber = plumber self.params = params def process(self, exchange): raise NotImplementedError("Subclass of destination needs to implement process method") The destination class should implement it's logic in the *process* method. Exchange -------- The data packets that flow on the bus are encapsulated with an Exchange.:: import uuid class Exchange: def __init__(self): self.id = uuid.uuid4() self.in_msg = None self.out_msg = None self.properties = {} def __str__(self): return "Id: " + str(self.id) + "\nProperties: " + str(self.properties) + "\nIn Msg:\n" + str(self.in_msg) + "\nOut Msg:\n" + str(self.out_msg) The exchange has an id, a properties dict, and in and out messages. The in and out messages are of type Message.:: class Message: def __init__(self): self.headers = {} self.body = None def __str__(self): return "\tHeaders: " + str(self.headers) + "\n\tBody: " + str(self.body) Each message has a headers dict and a body. The body may contain any data.The actual data that the source wants to send is put within the body. The *in_msg* in Exchange is generally used by most components. The *out_msg* is needed only for request-reply EIP. Pipeline -------- The pipeline represents the bus. It is a sequence of components, starting with a Source and followed by arbitrary number of Destination. The data travels on the bus, according to Pipeline configuration. If I wanted to take data from a Timer based source, modify it in a MessageModifier destination, and then send it to a Log destination, then the pipeline would be built as follows:: class PipelineTest(unittest.TestCase): def test_simple_pipeline(self): builder = DslPipelineBuilder() pipeline = builder.id("pipeline1").source(Timer, {"period": 1.0}).to(MessageModifier).to(Log, {"name": "test"}).build() pipeline.start() time.sleep(10) pipeline.stop() class MessageModifier(Destination): def process(self, exchange): exchange.in_msg.body += " modified" A pipeline must have one and only one source. A pipeline id can also be specified, using which the pipeline can be referenced. More on that later. Enterprise Integration Patterns ------------------------------- PyPipeline implements many Enterprise Integration Patterns. A pattern might have its own method in the DslPipelineBuilder or it might be possible to implement the pattern by combining other patterns. One of the EIP is Message Filter. As evident by the name, it is a step in the pipeline which filters messages based on some criteria that is provided as configuration. Example of Filter is as follows:: class FilterTest(unittest.TestCase): def test_simple_pipeline(self): builder = DslPipelineBuilder() pipeline = builder.source(Timer, {"period": 1.0}).filter(filter_method).process(Log, {"name": "test"}).build() pipeline.start() time.sleep(10) pipeline.stop() def filter_method(exchange): parts = exchange.in_msg.body.split() return int(parts[-1]) % 2 == 0 In this example, the filter() step takes a method which it will use to determine which messages should be allowed to continue on the pipeline. Plumber ------- Plumber is the Pipeline manager. It can be used to register multiple PipelineBuilders. It then builds the pipelines. All the pipelines can then be started by calling start() on the plumber. Similarly, all the pipelines can be stopped by calling stop on the plumber. With the use of plumber, it is also possible to start/stop individual pipelines by calling start_pipeline/stop_pipeline and providing the pipeline id. When a pipeline is built by registering the builder with the Plumber, then the pipeline and all its components are provided with the instance of the plumber. This can be used to do advanced stuff from within a component, such as starting another pipeline which is registered with the Plumber. The plumber usage is as follows:: class PlumberTest(unittest.TestCase): def test_simple_pipeline(self): plumber = Plumber() builder1 = DslPipelineBuilder() builder2 = DslPipelineBuilder() pipeline1 = builder1.source(Timer, {"period": 1.0}).to(MessageModifier) pipeline2 = builder2.source(Timer, {"period": 2.0}).to(MessageModifier) plumber.add_pipeline(pipeline1) plumber.add_pipeline(pipeline2) plumber.start() time.sleep(10) plumber.stop() class MessageModifier(Destination): def process(self, exchange): exchange.in_msg.body += " modified"