Changeset 1617
- Timestamp:
- 11/07/08 15:25:28 (2 months ago)
- Files:
-
- branches/0.4.7/lib/cxcore/cxcore/manager.py (modified) (4 diffs)
- branches/0.4.7/shell/bus/dispatch.py (modified) (4 diffs)
- branches/0.4.7/shell/state/ddb.py (modified) (3 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
branches/0.4.7/lib/cxcore/cxcore/manager.py
r1577 r1617 68 68 Function-fabrique 69 69 ''' 70 def __init__(self,bus,addr, disconnect=False):70 def __init__(self,bus,addr,flags=0,disconnect=False): 71 71 log.__init__(self,bus,disconnect) 72 72 self.addr = addr 73 self.flags = 0 73 74 74 75 … … 81 82 result = x.func(param,pam,pam) 82 83 ''' 84 def __getattr__(self,key): 85 if key == "addr": 86 return self.addr 87 elif key[:2] == "__": 88 return BaseCoreService.__getattr__(self,key) 89 elif key == "call": 90 def f(*argv,**kwarg): 91 key = argv[0] 92 argv = list(argv) 93 argv.pop(0) 94 self.bus.put2( 95 self.addr, 96 { 97 "map": key, 98 "version": CP_VERSION, 99 "argv": argv, 100 "sync": False, 101 "kwarg": kwarg, 102 }, 103 flags=self.flags 104 ) 105 return f 106 else: 107 def f(*argv,**kwarg): 108 ret = self.bus.put2( 109 self.addr, 110 { 111 "map": key, 112 "version": CP_VERSION, 113 "argv": argv, 114 "sync": True, 115 "kwarg": kwarg, 116 }, 117 flags=self.flags 118 ) 119 if ret["error"]: 120 self.log("debug","raising exception: %s" % (ret["data"])) 121 raise Exception(ret["error"]) 122 return ret["data"] 123 return f 124 125 class ACoreService (BaseCoreService): 126 ''' 127 Asynchronous calls 128 129 sample: 130 x = ACoreService(bus,addr) 131 x.func(param,pam,pam) 132 ''' 133 83 134 def __getattr__(self,key): 84 135 if key == "addr": … … 99 150 "sync": False, 100 151 "kwarg": kwarg, 101 } 102 ) 103 return f 104 else: 105 def f(*argv,**kwarg): 106 ret = self.bus.put2( 107 self.addr, 108 { 109 "map": key, 110 "version": CP_VERSION, 111 "argv": argv, 112 "sync": True, 113 "kwarg": kwarg, 114 } 115 ) 116 if ret["error"]: 117 self.log("debug","raising exception: %s" % (ret["data"])) 118 raise Exception(ret["error"]) 119 return ret["data"] 120 return f 121 122 class ACoreService (BaseCoreService): 123 ''' 124 Asynchronous calls 125 126 sample: 127 x = ACoreService(bus,addr) 128 x.func(param,pam,pam) 129 ''' 130 131 def __getattr__(self,key): 132 if key == "addr": 133 return self.addr 134 elif key[:2] == "__": 135 return BaseCoreService.__getattr__(self,key) 136 elif key == "call": 137 def f(*argv,**kwarg): 138 key = argv[0] 139 argv = list(argv) 140 argv.pop(0) 141 self.bus.put( 142 self.addr, 143 { 144 "map": key, 145 "version": CP_VERSION, 146 "argv": argv, 147 "sync": False, 148 "kwarg": kwarg, 149 } 152 }, 153 flags=self.flags 150 154 ) 151 155 return f … … 160 164 "sync": False, 161 165 "kwarg": kwarg, 162 } 166 }, 167 flags=self.flags 163 168 ) 164 169 return f branches/0.4.7/shell/bus/dispatch.py
r1612 r1617 123 123 self.rx.put(packet) 124 124 125 def put(self,address,message ):125 def put(self,address,message,flags=0): 126 126 ''' 127 127 Create a packet with "to" and "from" address fields and put it into the tx wire. … … 139 139 packet["to"] = address 140 140 packet["from"] = self.address 141 packet["flags"] = flags 141 142 packet["data"] = message 142 143 else: … … 157 158 return packet["data"] 158 159 159 def put2(self,address,message,addr={} ):160 def put2(self,address,message,addr={},flags=0): 160 161 ''' 161 162 Put a message in tx and the get from rx … … 271 272 # lookup() returns ('ip.addr',port) 272 273 packet["from"] = "%s@%s.%s." % (packet["from"],self.sname,CL_DOMAIN) 273 self.sock.send(host,packet )274 self.sock.send(host,packet,packet["flags"]) 274 275 return True 275 276 except Exception,e: branches/0.4.7/shell/state/ddb.py
r1616 r1617 686 686 self.log("debug","Sent %s" % (primer.dump())) 687 687 688 x = ACoreService(self.bus,"%s@%s" %(self.prefix, hostname))689 x._add(primer)690 691 688 if node.async[hostname] > 0: 692 689 self.log("debug","Node sync watchdog for host `%s` started with ttl `%s`" % (hostname,node.async[hostname])) 693 Timer(1,self._t_add,(host,primer)).start() 690 Timer(2,self._t_add,(host,primer)).start() 691 x = ACoreService(self.bus,"%s@%s" %(self.prefix, hostname)) 694 692 else: 695 693 self.log("critical","Node sync (add) watchdog: low watermark touch; mark node as stale") 694 x = ACoreService(self.bus,"%s@%s" %(self.prefix, hostname),flags=CX_MSG_TRACK) 695 x._add(primer) 696 696 else: 697 697 self.log("debug","Node sync watchdog for host `%s` terminated: confirmed" % (hostname)) … … 712 712 self.log("debug","Sent %s" % (primer.dump())) 713 713 714 x = ACoreService(self.bus,"%s@%s" %(self.prefix, hostname))715 x._update(primer)716 717 714 if node.usync[hostname] > 0: 718 715 self.log("debug","Node sync watchdog for host `%s` started with ttl `%s`" % (hostname,node.usync[hostname])) 719 Timer(1,self._t_update,(host,primer)).start() 716 Timer(2,self._t_update,(host,primer)).start() 717 x = ACoreService(self.bus,"%s@%s" %(self.prefix, hostname)) 720 718 else: 721 719 self.log("critical","Node sync (update) watchdog: low watermark touch; mark node as stale") 720 x = ACoreService(self.bus,"%s@%s" %(self.prefix, hostname),flags=CX_MSG_TRACK) 721 x._update(primer) 722 722 else: 723 723 self.log("debug","Node sync watchdog for host `%s` terminated: confirmed" % (hostname)) … … 737 737 self.log("debug","Node sync request for host `%s`" % (hostname)) 738 738 739 x = ACoreService(self.bus,"%s@%s" %(self.prefix, hostname))740 x._del(index)741 742 739 if node.dsync[hostname] > 0: 743 740 self.log("debug","Node sync watchdog for host `%s` started with ttl `%s`" % (hostname,node.dsync[hostname])) 744 Timer(1,self._t_del,(host,index)).start() 741 Timer(2,self._t_del,(host,index)).start() 742 x = ACoreService(self.bus,"%s@%s" %(self.prefix, hostname)) 745 743 else: 746 744 self.log("critical","Node sync (del) watchdog: low watermark touch; mark node as stale") 745 x = ACoreService(self.bus,"%s@%s" %(self.prefix, hostname),flags=CX_MSG_TRACK) 746 x._del(index) 747 747 else: 748 748 self.log("debug","Node sync watchdog for host `%s` terminated: confirmed" % (hostname))
