ubolonton/twisted-csp

Twisted CSP

Communicating sequential processes for Twisted. Channels like Go, or Clojurescript's core.async.

WARNING: This is currently alpha software.

This is a very close port of Clojurescript's core.async. The significant difference is that light-weight processes are implemented using generators (yield) instead of macros.

  • Channel operations must happen inside "light-weight processes" (code flows, not actual threads).
  • Light-weight processes are spawned by calling go, go_channel, go_deferred or by using their decorator equivalents.
  • Most channel operations must follow the form of yield do_sth(...).
def slow_pipe(input, output):
    while True:
        value = yield take(input)
        yield sleep(0.5)
        if value is None: # input closed
            close(output)
            yield stop()
        else:
            yield put(output, value)

go(slow_pipe, chan1, chan2)
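
To illustrate how generator-based light-weight processes can work in principle, here is a toy sketch in plain Python. It is not twisted-csp's implementation: ToyChannel, toy_put, toy_take and run are hypothetical names invented for this example, the channel is an unbounded queue (so a put never blocks), and there is no Twisted reactor involved. It only shows the general idea that a process yields an instruction and a scheduler resumes it once the instruction can be carried out.

```python
from collections import deque


class ToyChannel(object):
    """Hypothetical stand-in for a channel: an unbounded FIFO queue."""

    def __init__(self):
        self.values = deque()


def toy_put(channel, value):
    """Build a 'put' instruction; nothing happens until it is yielded."""
    return ("put", channel, value)


def toy_take(channel):
    """Build a 'take' instruction; nothing happens until it is yielded."""
    return ("take", channel, None)


def run(*generators):
    """Resume generators round-robin until all finish or all are blocked."""
    tasks = []  # (generator, pending instruction) pairs
    for gen in generators:
        try:
            tasks.append((gen, next(gen)))
        except StopIteration:
            pass
    progressed = True
    while tasks and progressed:
        progressed = False
        remaining = []
        for gen, (kind, channel, value) in tasks:
            if kind == "take" and not channel.values:
                # Nothing to take yet: keep the process parked.
                remaining.append((gen, (kind, channel, value)))
                continue
            if kind == "put":
                channel.values.append(value)
                result = True
            else:
                result = channel.values.popleft()
            progressed = True
            try:
                # Resume the process; the result becomes the value of its yield.
                remaining.append((gen, gen.send(result)))
            except StopIteration:
                pass
        tasks = remaining


def producer(channel, items):
    for item in items:
        yield toy_put(channel, item)


def consumer(channel, count, received):
    for _ in range(count):
        value = yield toy_take(channel)
        received.append(value)


channel = ToyChannel()
received = []
run(producer(channel, ["a", "b", "c"]), consumer(channel, 3, received))
```

After run returns, received holds the three items in the order they were put. The consumer never calls the producer directly; the two only communicate through the channel and the scheduler.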

Examples

Function returning channel (http://talks.golang.org/2012/concurrency.slide#25).

import random

from csp import *


def boring(message):
    c = Channel()
    def counter():
        i = 0
        while True:
            yield put(c, "%s %d" % (message, i))
            yield sleep(random.random())
            i += 1
    go(counter)
    return c


def main():
    b = boring("boring!")
    for i in range(5):
        print "You say: \"%s\"" % (yield take(b))
    print "You are boring; I'm leaving."

Pingpong (http://talks.golang.org/2013/advconc.slide#6).

class Ball:
    hits = 0


@process
def player(name, table):
    while True:
        ball = yield take(table)
        ball.hits += 1
        print name, ball.hits
        yield sleep(0.1)
        yield put(table, ball)


def main():
    table = Channel()

    player("ping", table)
    player("pong", table)

    yield put(table, Ball())
    yield sleep(1)

Timeout using alts (select in Go) (http://talks.golang.org/2012/concurrency.slide#35).

def main():
    c = boring("Joe")
    while True:
        # Whichever channel is ready first wins: the message or the timeout.
        value, chan = yield alts([c, timeout(0.8)])
        if chan is c:
            print value
        else:
            print "You're too slow."
            yield stop()

Running the examples

Use the run script, for example:

./run example.go.timeout_for_whole_conversation_using_select

Examples under example/go are ports of Go examples from:

Playing around in a REPL

Python 2.7.5+ (default, Sep 19 2013, 13:48:49)
[GCC 4.8.1] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import thread
>>> from csp import *
>>> from twisted.internet import reactor
>>> thread.start_new_thread(reactor.run, (), {"installSignalHandlers": False})
140038185355008
>>> class Ball:
...     hits = 0
...
>>> def player(name, table):
...     while True:
...         ball = yield take(table)
...         if ball is None: # channel's closed
...             print name, "Ball's gone"
...             break
...         ball.hits += 1
...         print name, ball.hits
...         yield sleep(0.1)
...         yield put(table, ball)
...
>>> def main():
...     table = Channel()
...     go(player, "ping", table)
...     go(player, "pong", table)
...     yield put(table, Ball())
...     yield sleep(1)
...     close(table)
...
>>> reactor.callFromThread(lambda: go(main))
>>> ping 1
pong 2
ping 3
pong 4
ping 5
pong 6
ping 7
pong 8
ping 9
pong 10
ping Ball's gone
pong Ball's gone

>>>

Limitations

  • Does not work in a multi-threaded environment at all (this is fixable though).
  • The channel's normal API cannot be used outside of a process (more precisely, outside of the process's generator function).
  • Generator functions must be used to spawn processes. This makes it less composable than in Go (where the constructs are built in) or Clojurescript (where macros rule).
  • Forgetting to yield can cause subtle bugs.
  • Multitasking is cooperative, so a process that never yields blocks every other process (not sure if this is a big problem though).
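
The "forgetting to yield" pitfall can be reproduced with a toy driver in plain Python. This is only a sketch of the general mechanism, not twisted-csp code: take_instruction and drive are hypothetical names, and a plain list stands in for a channel. The point is that an operation built without yield is never seen by whatever drives the generator, so no error is raised and the process simply carries on with the wrong value.

```python
def take_instruction(queue):
    """Hypothetical stand-in for an operation such as take(channel)."""
    return ("take", queue)


def drive(gen):
    """Minimal driver: carry out each yielded instruction, send back the result."""
    try:
        instruction = next(gen)
        while True:
            _kind, queue = instruction
            instruction = gen.send(queue.pop(0))
    except StopIteration:
        pass


def correct(queue, out):
    value = yield take_instruction(queue)  # the driver performs the take
    out.append(value)


def buggy(queue, out):
    value = take_instruction(queue)  # missing yield: value is the instruction
    out.append(value)
    return
    yield  # unreachable; only keeps this a generator function


good_queue, good_out = [1, 2], []
drive(correct(good_queue, good_out))

bad_queue, bad_out = [1, 2], []
drive(buggy(bad_queue, bad_out))
```

Here good_out ends up containing the taken value and good_queue has one item left, while bad_out contains the instruction tuple itself and bad_queue is untouched.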

TODO

  • Multiplexing, mixing, publishing/subscribing.
  • Channel operations (map, filter, reduce...).
  • Support multi-threaded environments (by porting Clojure's core.async rather than Clojurescript's).
  • Write tests.
  • Think of a sensible error handling strategy (I think this should be decided by client code not library code though).
    • Should there be a separate error channel?
    • Should channels deliver (result, error) tuples?
    • Should errors be treated as special values (caught exceptions re-thrown when taken)?
  • Support other reactors, e.g. Tornado (should be easy, as the dispatcher is the only thing that depends on twisted).
  • More documentation.
  • More examples (focusing on leveraging Twisted's rich network capabilities).
  • put_then_callback and take_then_callback execute the supplied callback in the same tick if the result is immediately available. This can cause problems (especially if they are public API).
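
Two of the error-handling options listed above, delivering (result, error) tuples and re-throwing caught exceptions when a value is taken, can be combined. The following is a rough sketch of that idea only, not a proposal for the library's API: put_outcome and take_or_raise are hypothetical helpers, and a deque stands in for a channel.

```python
from collections import deque


def put_outcome(queue, func, *args):
    """Run func and enqueue a (result, error) tuple instead of raising."""
    try:
        queue.append((func(*args), None))
    except Exception as error:
        queue.append((None, error))


def take_or_raise(queue):
    """Dequeue one outcome; re-raise the stored exception if there is one."""
    result, error = queue.popleft()
    if error is not None:
        raise error
    return result


outcomes = deque()
put_outcome(outcomes, int, "42")    # succeeds
put_outcome(outcomes, int, "oops")  # int() raises ValueError, which is stored

first = take_or_raise(outcomes)
try:
    take_or_raise(outcomes)
    second = None
except ValueError:
    second = "re-raised ValueError"
```

With this shape the producer side never raises, and the consumer decides whether to inspect the tuple directly or let the exception propagate, which matches the idea that the strategy should be chosen by client code.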

Inspiration

Other Python CSP libraries:

These libraries use threads or processes (except for pycsp, which also supports greenlets, though those are not portable). That makes the implementation simpler in principle, at the cost of efficiency, and managing threads or processes can be a chore. On the other hand, they are much more generic and support networked channels (which is not necessarily a good thing). TODO: More elaborate comparison.

About

Go-style concurrency for Twisted
