@@ -45,10 +45,10 @@ def process(self, inport, inmsg):
4545 raise NotImplementedError ('IParticipant.process()' )
4646
4747 def ack (self , msg ):
48- self ._runtime ._channel .ack (msg .delivery_tag )
48+ self ._runtime ._channel .basic . ack (msg .delivery_info [ " delivery_tag" ] )
4949
5050 def nack (self , msg ):
51- self ._runtime ._channel .nack (msg .delivery_tag )
51+ self ._runtime ._channel .basic . nack (msg .delivery_info [ " delivery_tag" ] )
5252
5353def sendParticipantDefinition (channel , d ):
5454 msg = haigha_Message (json .dumps (d ))
@@ -69,7 +69,7 @@ def handleInput(msg):
6969
7070 if 'in' in direction :
7171 channel .queue .declare (queue )
72- channel .basic .consume (queue = queue , consumer = handleInput )
72+ channel .basic .consume (queue = queue , consumer = handleInput , no_ack = False )
7373 print 'subscribed to' , queue
7474 sys .stdout .flush ()
7575 else :
@@ -107,7 +107,7 @@ def __init__(self, participant, done_cb):
107107
108108 def _send (self , outport , data ):
109109 ports = self .participant .definition ['outports' ]
110- print "Publising message: %s, %s, %s" % (data ,outport ,ports )
110+ print "Publishing message: %s, %s, %s" % (data ,outport ,ports )
111111 sys .stdout .flush ()
112112 serialized = json .dumps (data )
113113 msg = haigha_Message (serialized )
0 commit comments