Changeset 1633
- Timestamp:
- 11/17/08 17:31:16 (2 months ago)
- Files:
-
- branches/0.4.7/lib/cxcore/cxcore/cxnetlink.py (modified) (10 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
branches/0.4.7/lib/cxcore/cxcore/cxnetlink.py
r1632 r1633 261 261 262 262 @public 263 def lookupByIPPort(self,address,port): 264 hosts = map( 265 lambda x: x.name, 266 self.lookupByIP(address) 267 ) 268 return filter( 269 lambda x: (x.name in hosts) and (x.port == port), 270 filter( 271 lambda x: (x.clazz == zeroconf._CLASS_IN) and (x.type == zeroconf._TYPE_SRV), 272 self.server.cache.entries() 273 ) 274 )[0].name 275 276 @public 263 277 def lookupByIP(self,address): 264 278 return filter( … … 269 283 ) 270 284 ) 285 286 @public 287 def lookupAll(self,host,domain=None,nowait = False): 288 289 types = [zeroconf._TYPE_A, zeroconf._TYPE_SRV, zeroconf._TYPE_TXT] 290 291 if not domain: 292 domain = host[host.index(".") + 1:] 293 294 if not self.server.cache.entriesWithName(host): 295 if nowait: 296 raise mDNS_ServiceNotFound() 297 else: 298 self.server.getServiceInfo(domain, host) 299 300 pool = [] 301 port = None 302 properties = None 303 304 for i in self.server.cache.entriesWithName(host): 305 if (i.clazz == zeroconf._CLASS_IN) and (i.type == zeroconf._TYPE_A): 306 pool.append(i) 307 elif (i.clazz == zeroconf._CLASS_IN) and (i.type == zeroconf._TYPE_SRV): 308 port = i.port 309 elif (i.clazz == zeroconf._CLASS_IN) and (i.type == zeroconf._TYPE_TXT): 310 properties = i.properties 311 312 return map( 313 lambda x: ((inet_ntoa(x.address), port), properties, x), 314 pool 315 ) 316 271 317 272 318 @public … … 451 497 self.use_cipher = True 452 498 self._nonce = 0 499 self.nmap = {} 453 500 454 501 # socket operations … … 504 551 return 505 552 506 for i in map(lambda x: x.name, self.mdns.lookupByIP(address)): 507 if self.watch.has_key(i): 508 self.watch[i].state = "reachable" 509 del self.watch[i] 510 511 # watch flags 512 if (header.flags & CX_MSG_ECHO) and (header.flags & CX_FLAGS_FIRST_FRAGMENT): 513 (host,prop,record) = self.mdns.lookup(i) 514 self._send(host,None,record,CX_MSG_DROP) 515 516 if header.flags & CX_MSG_DROP: 517 return 553 # drop duplicates 554 origin = self.mdns.lookupByIPPort(address,port) 555 if not self.nmap.has_key(origin): 556 self.nmap[origin] = 0 557 558 if header.nonce <= self.nmap[origin]: 559 if not self.reasm_buffer.has_key((origin,header.nonce)): 560 self.log("debug","dropped old nonce not in self.reasm_buffer") 561 return 562 elif header.flags & CX_FLAGS_FIRST_FRAGMENT: 563 self.log("debug","dropped old nonce with CX_FLAGS_FIRST_FRAGMENT flag") 564 return 565 self.log("debug","accepted old nonce in self.reasm_buffer") 566 else: 567 self.nmap[origin] = header.nonce 568 self.log("debug","nonce for an origin incremented") 569 570 #for i in map(lambda x: x.name, self.mdns.lookupByIP(address)): 571 #if self.watch.has_key(i): 572 #self.watch[i].state = "reachable" 573 #del self.watch[i] 574 575 ## watch flags 576 #if (header.flags & CX_MSG_ECHO) and (header.flags & CX_FLAGS_FIRST_FRAGMENT): 577 #(host,prop,record) = self.mdns.lookup(i) 578 #self._send(host,None,record,CX_MSG_DROP) 579 580 #if header.flags & CX_MSG_DROP: 581 #return 518 582 519 583 # get plaintext message and load it 520 584 self.hook(self.bus, self.recv_buffer, header.msg_len + sizeof(header), "cl_socket.recv()") 521 585 n = string_at( addressof(self.recv_buffer) + sizeof(header), header.msg_len ) 522 self.store( header,n)523 524 def store(self, header,message):586 self.store(origin,header,n) 587 588 def store(self,origin,header,message): 525 589 ''' 526 590 Store fragments 527 591 ''' 528 if not self.reasm_buffer.has_key(header.nonce): 529 self.reasm_buffer[header.nonce] = {} 530 self.finish_buffer[header.nonce] = 0 531 592 key = (origin,header.nonce) 593 if not self.reasm_buffer.has_key(key): 594 self.reasm_buffer[key] = {} 595 self.finish_buffer[key] = 0 596 597 if self.reasm_buffer[key].has_key(header.fragment): 598 # possible duplicate 599 return 600 532 601 try: 533 if self.reasm_buffer[header.nonce].has_key(header.fragment):534 self.reasm_buffer[header.nonce] = None535 self.log("debug","corrupt buffer `%s`" % (header.nonce))536 602 537 603 # shortcut 538 buf = self.reasm_buffer[ header.nonce]604 buf = self.reasm_buffer[key] 539 605 540 606 # store fragment … … 544 610 if header.flags & CX_FLAGS_FINAL_FRAGMENT: 545 611 # store max fragment number 546 self.finish_buffer[ header.nonce] = header.fragment612 self.finish_buffer[key] = header.fragment 547 613 # drop all invalid fragments? 548 while max(buf.keys()) > self.finish_buffer[ header.nonce]:614 while max(buf.keys()) > self.finish_buffer[key]: 549 615 del buf[max(buf.keys())] 550 616 551 617 # 552 if self.finish_buffer[ header.nonce] > 0:618 if self.finish_buffer[key] > 0: 553 619 # if FIN packet already received, try to reassemble 554 if len(buf.keys()) == self.finish_buffer[ header.nonce]:555 self.reassemble( header.nonce, buf)620 if len(buf.keys()) == self.finish_buffer[key]: 621 self.reassemble(key, buf) 556 622 except: 557 623 traceback.print_exc() … … 559 625 return 560 626 561 def reassemble(self, nonce,buf):627 def reassemble(self,key,buf): 562 628 ''' 563 629 Reassemble fragments … … 579 645 580 646 # delete reassemble buffer 581 del self.reasm_buffer[ nonce]582 del self.finish_buffer[ nonce]647 del self.reasm_buffer[key] 648 del self.finish_buffer[key] 583 649 584 650 self.bus.tx.put(msg) … … 591 657 592 658 def send(self,h,packet,flags=0): 593 (host,prop,record) = self.mdns.lookup(h) 594 return self._send(host,packet,record,flags) 595 596 def _send(self,host,packet,record,flags=0): 659 pool = self.mdns.lookupAll(h) 660 nonce = self.nonce() 661 for (host,prop,record) in pool: 662 self._send(host,packet,record,nonce,flags) 663 664 def _send(self,host,packet,record,nonce,flags=0): 597 665 ''' 598 666 Encrypt and send a packet … … 604 672 c = dumps(packet) 605 673 606 msg.header.nonce = self.nonce()674 msg.header.nonce = nonce 607 675 608 676 if self.use_cipher: … … 611 679 msg.header.alg_len = a 612 680 613 if flags & CX_MSG_TRACK:614 self.watch[record.name] = record615 Timer(2,self.watchdog,(record.name,)).start()681 #if flags & CX_MSG_TRACK: 682 #self.watch[record.name] = record 683 #Timer(2,self.watchdog,(record.name,)).start() 616 684 617 685 offset = 0 618 686 fragment = 1 619 687 while (offset < len(c)) and (fragment < CX_MAX_FRAGMENT): 620 msg.header.flags = flags688 msg.header.flags = 0 621 689 622 690 if offset == 0:
