Changeset 1615

Show
Ignore:
Timestamp:
11/07/08 14:46:28 (2 months ago)
Author:
peet
Message:
  • randomized pools for reachable/stale IPs (load balancing)
  • watchdogs for hosts at cl_socket level
Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • branches/0.4.7/lib/cxcore/cxcore/cxnetlink.py

    r1612 r1615  
    3737import traceback 
    3838import time 
     39import random 
    3940 
    4041from cxnet.common import hline 
     
    240241                svc = zeroconf.ServiceInfo(domain, host) 
    241242 
     243                pool = { 
     244                        "reachable": [], 
     245                        "stale": [], 
     246                } 
     247 
    242248                for i in self.server.cache.entriesWithName(host): 
    243249                        if (i.clazz == zeroconf._CLASS_IN) and (i.type == zeroconf._TYPE_A): 
    244                                 svc.address = inet_ntoa(i.address) 
     250                                if pool.has_key(i.state): 
     251                                        pool[i.state].append(i) 
    245252                        elif (i.clazz == zeroconf._CLASS_IN) and (i.type == zeroconf._TYPE_SRV): 
    246253                                svc.port = i.port 
     
    248255                                svc.setText(i.text) 
    249256 
    250                 if (not svc.address) and (zeroconf._TYPE_A in types): 
     257                print "DEBUG", pool 
     258                if len(pool["reachable"]) > 0: 
     259                        rec = random.choice(pool["reachable"]) 
     260                elif len(pool["stale"]) > 0: 
     261                        rec = random.choice(pool["stale"]) 
     262                else: 
     263                        rec = None 
     264 
     265                if (not rec) and (zeroconf._TYPE_A in types): 
    251266                        raise mDNS_HostNotFound() 
    252267                 
     
    257272                        raise mDNS_TextNotFound() 
    258273 
    259                 return ((svc.getAddress(),svc.getPort()),svc.getProperties()) 
     274                print ((inet_ntoa(rec.address),svc.getPort()),svc.getProperties(),rec) 
     275                return ((inet_ntoa(rec.address),svc.getPort()),svc.getProperties(),rec) 
    260276 
    261277 
     
    309325CX_FLAGS_MORE_FRAGMENTS = 0x004 
    310326 
     327CX_MSG_TRACK = 0x001 
     328 
    311329class cxmsg(Structure): 
    312330        ''' 
     
    393411                self.reasm_buffer = {} 
    394412                self.finish_buffer = {} 
     413                self.watch = {} 
    395414                 
    396415                # encryption 
     
    416435                try: 
    417436                        (bytes,(address,port)) = self.sock.recvfrom_into(self.recv_buffer) 
     437                        if self.watch.has_key((address,port)): 
     438                                self.watch[(address,port)].state = "reachable" 
     439                                del self.watch[(address,port)] 
     440 
    418441                # python < 2.5 
    419442                except: 
     
    498521 
    499522 
    500         def send(self,host,packet): 
     523        def watchdog(self,host): 
     524                if self.watch.has_key(host): 
     525                        self.watch[host].state = "stale" 
     526                        del self.watch[host] 
     527 
     528        def send(self,host,packet,flags=0,data=None): 
    501529                ''' 
    502530                Encrypt and send a packet 
     
    514542                                (c,a) = self.encrypt(c) 
    515543                                msg.header.alg_len = a 
     544 
     545                        if flags & CX_MSG_TRACK and data: 
     546                                self.watch[host] = data 
     547                                Timer(2,self.watchdog,(host,)).start() 
    516548 
    517549                        offset = 0