Changeset 1617

Show
Ignore:
Timestamp:
11/07/08 15:25:28 (2 months ago)
Author:
peet
Message:

trying to use cl_socket() level tracking

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • branches/0.4.7/lib/cxcore/cxcore/manager.py

    r1577 r1617  
    6868        Function-fabrique 
    6969        ''' 
    70         def __init__(self,bus,addr,disconnect=False): 
     70        def __init__(self,bus,addr,flags=0,disconnect=False): 
    7171                log.__init__(self,bus,disconnect) 
    7272                self.addr = addr 
     73                self.flags = 0 
    7374 
    7475 
     
    8182                result = x.func(param,pam,pam) 
    8283        ''' 
     84        def __getattr__(self,key): 
     85                if key == "addr": 
     86                        return self.addr 
     87                elif key[:2] == "__": 
     88                        return BaseCoreService.__getattr__(self,key) 
     89                elif key == "call": 
     90                        def f(*argv,**kwarg): 
     91                                key = argv[0] 
     92                                argv = list(argv) 
     93                                argv.pop(0) 
     94                                self.bus.put2( 
     95                                        self.addr, 
     96                                        { 
     97                                                "map": key, 
     98                                                "version": CP_VERSION, 
     99                                                "argv": argv, 
     100                                                "sync": False, 
     101                                                "kwarg": kwarg, 
     102                                        }, 
     103                                        flags=self.flags 
     104                                ) 
     105                        return f 
     106                else: 
     107                        def f(*argv,**kwarg): 
     108                                ret = self.bus.put2( 
     109                                        self.addr, 
     110                                        { 
     111                                                "map": key, 
     112                                                "version": CP_VERSION, 
     113                                                "argv": argv, 
     114                                                "sync": True, 
     115                                                "kwarg": kwarg, 
     116                                        }, 
     117                                        flags=self.flags 
     118                                ) 
     119                                if ret["error"]: 
     120                                        self.log("debug","raising exception: %s" % (ret["data"])) 
     121                                        raise Exception(ret["error"]) 
     122                                return ret["data"] 
     123                        return f 
     124 
     125class ACoreService (BaseCoreService): 
     126        ''' 
     127        Asynchronous calls 
     128         
     129        sample: 
     130                x = ACoreService(bus,addr) 
     131                x.func(param,pam,pam) 
     132        ''' 
     133 
    83134        def __getattr__(self,key): 
    84135                if key == "addr": 
     
    99150                                                "sync": False, 
    100151                                                "kwarg": kwarg, 
    101                                         } 
    102                                 ) 
    103                         return f 
    104                 else: 
    105                         def f(*argv,**kwarg): 
    106                                 ret = self.bus.put2( 
    107                                         self.addr, 
    108                                         { 
    109                                                 "map": key, 
    110                                                 "version": CP_VERSION, 
    111                                                 "argv": argv, 
    112                                                 "sync": True, 
    113                                                 "kwarg": kwarg, 
    114                                         } 
    115                                 ) 
    116                                 if ret["error"]: 
    117                                         self.log("debug","raising exception: %s" % (ret["data"])) 
    118                                         raise Exception(ret["error"]) 
    119                                 return ret["data"] 
    120                         return f 
    121  
    122 class ACoreService (BaseCoreService): 
    123         ''' 
    124         Asynchronous calls 
    125          
    126         sample: 
    127                 x = ACoreService(bus,addr) 
    128                 x.func(param,pam,pam) 
    129         ''' 
    130  
    131         def __getattr__(self,key): 
    132                 if key == "addr": 
    133                         return self.addr 
    134                 elif key[:2] == "__": 
    135                         return BaseCoreService.__getattr__(self,key) 
    136                 elif key == "call": 
    137                         def f(*argv,**kwarg): 
    138                                 key = argv[0] 
    139                                 argv = list(argv) 
    140                                 argv.pop(0) 
    141                                 self.bus.put( 
    142                                         self.addr, 
    143                                         { 
    144                                                 "map": key, 
    145                                                 "version": CP_VERSION, 
    146                                                 "argv": argv, 
    147                                                 "sync": False, 
    148                                                 "kwarg": kwarg, 
    149                                         } 
     152                                        }, 
     153                                        flags=self.flags 
    150154                                ) 
    151155                        return f 
     
    160164                                                "sync": False, 
    161165                                                "kwarg": kwarg, 
    162                                         } 
     166                                        }, 
     167                                        flags=self.flags 
    163168                                ) 
    164169                        return f 
  • branches/0.4.7/shell/bus/dispatch.py

    r1612 r1617  
    123123                self.rx.put(packet) 
    124124 
    125         def put(self,address,message): 
     125        def put(self,address,message,flags=0): 
    126126                ''' 
    127127                Create a packet with "to" and "from" address fields and put it into the tx wire. 
     
    139139                        packet["to"] = address 
    140140                        packet["from"] = self.address 
     141                        packet["flags"] = flags 
    141142                        packet["data"] = message 
    142143                else: 
     
    157158                return packet["data"] 
    158159         
    159         def put2(self,address,message,addr={}): 
     160        def put2(self,address,message,addr={},flags=0): 
    160161                ''' 
    161162                Put a message in tx and the get from rx 
     
    271272                                # lookup() returns ('ip.addr',port) 
    272273                                packet["from"] = "%s@%s.%s." % (packet["from"],self.sname,CL_DOMAIN) 
    273                                 self.sock.send(host,packet
     274                                self.sock.send(host,packet,packet["flags"]
    274275                                return True 
    275276                        except Exception,e: 
  • branches/0.4.7/shell/state/ddb.py

    r1616 r1617  
    686686                        self.log("debug","Sent %s" % (primer.dump())) 
    687687 
    688                         x = ACoreService(self.bus,"%s@%s" %(self.prefix, hostname)) 
    689                         x._add(primer) 
    690  
    691688                        if node.async[hostname] > 0: 
    692689                                self.log("debug","Node sync watchdog for host `%s` started with ttl `%s`" % (hostname,node.async[hostname])) 
    693                                 Timer(1,self._t_add,(host,primer)).start() 
     690                                Timer(2,self._t_add,(host,primer)).start() 
     691                                x = ACoreService(self.bus,"%s@%s" %(self.prefix, hostname)) 
    694692                        else: 
    695693                                self.log("critical","Node sync (add) watchdog: low watermark touch; mark node as stale") 
     694                                x = ACoreService(self.bus,"%s@%s" %(self.prefix, hostname),flags=CX_MSG_TRACK) 
     695                        x._add(primer) 
    696696                else: 
    697697                        self.log("debug","Node sync watchdog for host `%s` terminated: confirmed" % (hostname)) 
     
    712712                        self.log("debug","Sent %s" % (primer.dump())) 
    713713 
    714                         x = ACoreService(self.bus,"%s@%s" %(self.prefix, hostname)) 
    715                         x._update(primer) 
    716  
    717714                        if node.usync[hostname] > 0: 
    718715                                self.log("debug","Node sync watchdog for host `%s` started with ttl `%s`" % (hostname,node.usync[hostname])) 
    719                                 Timer(1,self._t_update,(host,primer)).start() 
     716                                Timer(2,self._t_update,(host,primer)).start() 
     717                                x = ACoreService(self.bus,"%s@%s" %(self.prefix, hostname)) 
    720718                        else: 
    721719                                self.log("critical","Node sync (update) watchdog: low watermark touch; mark node as stale") 
     720                                x = ACoreService(self.bus,"%s@%s" %(self.prefix, hostname),flags=CX_MSG_TRACK) 
     721                        x._update(primer) 
    722722                else: 
    723723                        self.log("debug","Node sync watchdog for host `%s` terminated: confirmed" % (hostname)) 
     
    737737                        self.log("debug","Node sync request for host `%s`" % (hostname)) 
    738738 
    739                         x = ACoreService(self.bus,"%s@%s" %(self.prefix, hostname)) 
    740                         x._del(index) 
    741  
    742739                        if node.dsync[hostname] > 0: 
    743740                                self.log("debug","Node sync watchdog for host `%s` started with ttl `%s`" % (hostname,node.dsync[hostname])) 
    744                                 Timer(1,self._t_del,(host,index)).start() 
     741                                Timer(2,self._t_del,(host,index)).start() 
     742                                x = ACoreService(self.bus,"%s@%s" %(self.prefix, hostname)) 
    745743                        else: 
    746744                                self.log("critical","Node sync (del) watchdog: low watermark touch; mark node as stale") 
     745                                x = ACoreService(self.bus,"%s@%s" %(self.prefix, hostname),flags=CX_MSG_TRACK) 
     746                        x._del(index) 
    747747                else: 
    748748                        self.log("debug","Node sync watchdog for host `%s` terminated: confirmed" % (hostname))