Changeset 1633

Show
Ignore:
Timestamp:
11/17/08 17:31:16 (2 months ago)
Author:
peet
Message:

packet 'broadcasting' to all available service IPs

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • branches/0.4.7/lib/cxcore/cxcore/cxnetlink.py

    r1632 r1633  
    261261 
    262262        @public 
     263        def lookupByIPPort(self,address,port): 
     264                hosts = map( 
     265                        lambda x: x.name, 
     266                        self.lookupByIP(address) 
     267                ) 
     268                return filter( 
     269                        lambda x: (x.name in hosts) and (x.port == port), 
     270                        filter( 
     271                                lambda x: (x.clazz == zeroconf._CLASS_IN) and (x.type == zeroconf._TYPE_SRV), 
     272                                self.server.cache.entries() 
     273                        ) 
     274                )[0].name 
     275 
     276        @public 
    263277        def lookupByIP(self,address): 
    264278                return filter( 
     
    269283                        ) 
    270284                ) 
     285 
     286        @public 
     287        def lookupAll(self,host,domain=None,nowait = False): 
     288 
     289                types = [zeroconf._TYPE_A, zeroconf._TYPE_SRV, zeroconf._TYPE_TXT] 
     290 
     291                if not domain: 
     292                        domain = host[host.index(".") + 1:] 
     293 
     294                if not self.server.cache.entriesWithName(host): 
     295                        if nowait: 
     296                                raise mDNS_ServiceNotFound() 
     297                        else: 
     298                                self.server.getServiceInfo(domain, host) 
     299 
     300                pool = [] 
     301                port = None 
     302                properties = None 
     303 
     304                for i in self.server.cache.entriesWithName(host): 
     305                        if (i.clazz == zeroconf._CLASS_IN) and (i.type == zeroconf._TYPE_A): 
     306                                pool.append(i) 
     307                        elif (i.clazz == zeroconf._CLASS_IN) and (i.type == zeroconf._TYPE_SRV): 
     308                                port = i.port 
     309                        elif (i.clazz == zeroconf._CLASS_IN) and (i.type == zeroconf._TYPE_TXT): 
     310                                properties = i.properties 
     311 
     312                return map( 
     313                        lambda x: ((inet_ntoa(x.address), port), properties, x), 
     314                        pool 
     315                ) 
     316 
    271317 
    272318        @public 
     
    451497                self.use_cipher = True 
    452498                self._nonce = 0 
     499                self.nmap = {} 
    453500 
    454501                # socket operations 
     
    504551                        return 
    505552 
    506                 for i in map(lambda x: x.name, self.mdns.lookupByIP(address)): 
    507                         if self.watch.has_key(i): 
    508                                 self.watch[i].state = "reachable" 
    509                                 del self.watch[i] 
    510  
    511                         # watch flags 
    512                         if (header.flags & CX_MSG_ECHO) and (header.flags & CX_FLAGS_FIRST_FRAGMENT): 
    513                                 (host,prop,record) = self.mdns.lookup(i) 
    514                                 self._send(host,None,record,CX_MSG_DROP) 
    515  
    516                 if header.flags & CX_MSG_DROP: 
    517                         return 
     553                # drop duplicates 
     554                origin = self.mdns.lookupByIPPort(address,port) 
     555                if not self.nmap.has_key(origin): 
     556                        self.nmap[origin] = 0 
     557                 
     558                if header.nonce <= self.nmap[origin]: 
     559                        if not self.reasm_buffer.has_key((origin,header.nonce)): 
     560                                self.log("debug","dropped old nonce not in self.reasm_buffer") 
     561                                return 
     562                        elif header.flags & CX_FLAGS_FIRST_FRAGMENT: 
     563                                self.log("debug","dropped old nonce with CX_FLAGS_FIRST_FRAGMENT flag") 
     564                                return 
     565                        self.log("debug","accepted old nonce in self.reasm_buffer") 
     566                else: 
     567                        self.nmap[origin] = header.nonce 
     568                        self.log("debug","nonce for an origin incremented") 
     569 
     570                #for i in map(lambda x: x.name, self.mdns.lookupByIP(address)): 
     571                        #if self.watch.has_key(i): 
     572                                #self.watch[i].state = "reachable" 
     573                                #del self.watch[i] 
     574 
     575                        ## watch flags 
     576                        #if (header.flags & CX_MSG_ECHO) and (header.flags & CX_FLAGS_FIRST_FRAGMENT): 
     577                                #(host,prop,record) = self.mdns.lookup(i) 
     578                                #self._send(host,None,record,CX_MSG_DROP) 
     579 
     580                #if header.flags & CX_MSG_DROP: 
     581                        #return 
    518582 
    519583                # get plaintext message and load it 
    520584                self.hook(self.bus, self.recv_buffer, header.msg_len + sizeof(header), "cl_socket.recv()") 
    521585                n = string_at( addressof(self.recv_buffer) + sizeof(header), header.msg_len ) 
    522                 self.store(header,n) 
    523  
    524         def store(self,header,message): 
     586                self.store(origin,header,n) 
     587 
     588        def store(self,origin,header,message): 
    525589                ''' 
    526590                Store fragments 
    527591                ''' 
    528                 if not self.reasm_buffer.has_key(header.nonce): 
    529                         self.reasm_buffer[header.nonce] = {} 
    530                         self.finish_buffer[header.nonce] = 0 
    531                  
     592                key = (origin,header.nonce) 
     593                if not self.reasm_buffer.has_key(key): 
     594                        self.reasm_buffer[key] = {} 
     595                        self.finish_buffer[key] = 0 
     596                 
     597                if self.reasm_buffer[key].has_key(header.fragment): 
     598                        # possible duplicate 
     599                        return 
     600 
    532601                try: 
    533                         if self.reasm_buffer[header.nonce].has_key(header.fragment): 
    534                                 self.reasm_buffer[header.nonce] = None 
    535                                 self.log("debug","corrupt buffer `%s`" % (header.nonce)) 
    536602 
    537603                        # shortcut 
    538                         buf = self.reasm_buffer[header.nonce
     604                        buf = self.reasm_buffer[key
    539605 
    540606                        # store fragment 
     
    544610                        if header.flags & CX_FLAGS_FINAL_FRAGMENT: 
    545611                                # store max fragment number 
    546                                 self.finish_buffer[header.nonce] = header.fragment 
     612                                self.finish_buffer[key] = header.fragment 
    547613                                # drop all invalid fragments? 
    548                                 while max(buf.keys()) > self.finish_buffer[header.nonce]: 
     614                                while max(buf.keys()) > self.finish_buffer[key]: 
    549615                                        del buf[max(buf.keys())] 
    550616 
    551617                        # 
    552                         if self.finish_buffer[header.nonce] > 0: 
     618                        if self.finish_buffer[key] > 0: 
    553619                                # if FIN packet already received, try to reassemble 
    554                                 if len(buf.keys()) == self.finish_buffer[header.nonce]: 
    555                                         self.reassemble(header.nonce, buf) 
     620                                if len(buf.keys()) == self.finish_buffer[key]: 
     621                                        self.reassemble(key, buf) 
    556622                except: 
    557623                        traceback.print_exc() 
     
    559625                        return 
    560626 
    561         def reassemble(self,nonce,buf): 
     627        def reassemble(self,key,buf): 
    562628                ''' 
    563629                Reassemble fragments 
     
    579645                 
    580646                # delete reassemble buffer 
    581                 del self.reasm_buffer[nonce
    582                 del self.finish_buffer[nonce
     647                del self.reasm_buffer[key
     648                del self.finish_buffer[key
    583649 
    584650                self.bus.tx.put(msg) 
     
    591657 
    592658        def send(self,h,packet,flags=0): 
    593                 (host,prop,record) = self.mdns.lookup(h) 
    594                 return self._send(host,packet,record,flags) 
    595  
    596         def _send(self,host,packet,record,flags=0): 
     659                pool = self.mdns.lookupAll(h) 
     660                nonce = self.nonce() 
     661                for (host,prop,record) in pool: 
     662                        self._send(host,packet,record,nonce,flags) 
     663 
     664        def _send(self,host,packet,record,nonce,flags=0): 
    597665                ''' 
    598666                Encrypt and send a packet 
     
    604672                        c = dumps(packet) 
    605673 
    606                         msg.header.nonce = self.nonce() 
     674                        msg.header.nonce = nonce 
    607675 
    608676                        if self.use_cipher: 
     
    611679                                msg.header.alg_len = a 
    612680 
    613                         if flags & CX_MSG_TRACK: 
    614                                 self.watch[record.name] = record 
    615                                 Timer(2,self.watchdog,(record.name,)).start() 
     681                        #if flags & CX_MSG_TRACK: 
     682                                #self.watch[record.name] = record 
     683                                #Timer(2,self.watchdog,(record.name,)).start() 
    616684 
    617685                        offset = 0 
    618686                        fragment = 1 
    619687                        while (offset < len(c)) and (fragment < CX_MAX_FRAGMENT): 
    620                                 msg.header.flags = flags 
     688                                msg.header.flags = 0 
    621689                                 
    622690                                if offset == 0: