Skip to content

A distributed framework to implement device-cloud collaborative workflow

License

Notifications You must be signed in to change notification settings

ThomasAtlantis/DistPipe

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

6 Commits
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

DistPipe

DistPipe is a distributed framework to implement device-cloud collaborative workflow.

Quickstart

You can install the latest official version of DistPipe from PyPI:

pip install distpipe

Usage

Step 0: Define connection

Specify the network topology using a JSON file:

{
    "client_addr": ["192.168.1.126", 6000],
    "server_addr": ["192.168.1.126", 6001],
    "role": "server"
}

Here, the field role indicates the current platform (server or client). Next, define a Router to connect the device and the cloud.

from distpipe.transport import Router

router = Router.from_json("node_map.client.json")

Step 1: Define custom tasks

from distpipe.distpipe import Task

class Identical(Task):

    def process(self, data):
        return data[0]

class Log(Task):

    def process(self, data):
        return data[0] + 1

class Add(Task):

    def process(self, data):
        return data[0] + data[1]

identical = Identical('identical', role='client')
log = Log('log', role='client')
add = Add('add', role='server')

Step 2: Define pipeline as DAG

from distpipe.distpipe import Pipe

pipe = Pipe(router=router)
pipe.add(srcs=[identical], tgt=log)
pipe.add(srcs=[identical, log], tgt=add)
pipe.set_io(identical, add)
pipe.start()

Step 3: Launch the pipeline

if router.role == "client":
    pipe.istream.put(1)
    print(pipe.ostream.get()[0])
    pipe.istream.put(2)
    print(pipe.ostream.get()[0])

Optional: Shutdown the system

router.shutdown()