Changeset 1612
- Timestamp:
- 11/07/08 13:44:10 (2 months ago)
- Files:
-
- branches/0.4.7/lib/cxcore/cxcore/cxnetlink.py (modified) (2 diffs)
- branches/0.4.7/lib/cxnet/cxnet/zeroconf.py (modified) (11 diffs)
- branches/0.4.7/shell/bus/dispatch.py (modified) (5 diffs)
- branches/0.4.7/shell/connexion.py (modified) (5 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
branches/0.4.7/lib/cxcore/cxcore/cxnetlink.py
r1581 r1612 157 157 ''' 158 158 159 def __init__(self, address, join,bus, adaptive=True, heartbeat=True):159 def __init__(self, address, bus, adaptive=True, heartbeat=True): 160 160 # set up ZeroConf mDNS 161 161 log.__init__(self,bus) 162 self.server = zeroconf.Zeroconf(address, join,adaptive, heartbeat=heartbeat)162 self.server = zeroconf.Zeroconf(address, adaptive, heartbeat=heartbeat) 163 163 self.announcer = mDNSannouncer(bus) 164 164 self.server.addCacheHook(self.announcer) … … 166 166 self.browsers = {} 167 167 168 def registerService(self,name,address="0.0.0.0",port=CL_PORT,type=CL_DOMAIN,properties={},records=[zeroconf._TYPE_A, zeroconf._TYPE_SRV, zeroconf._TYPE_TXT],ttl=zeroconf._DNS_TTL): 168 def printCache(self): 169 text = "\n" 170 name = "" 171 for i in self.server.cache.entries(): 172 try: 173 if name != i.name: 174 text += "\n%s\n" % (i.name) 175 name = i.name 176 text += "\t%s\t%s\t%s\n" % (zeroconf._CLASSES[i.clazz], zeroconf._TYPES[i.type], i) 177 except: 178 pass 179 return text 180 181 def registerService(self,name,address=[],port=CL_PORT,type=CL_DOMAIN,properties={},records=[zeroconf._TYPE_A, zeroconf._TYPE_SRV, zeroconf._TYPE_TXT],ttl=zeroconf._DNS_TTL): 169 182 # register the service and return a reference 170 if address:171 address = inet_aton(address) 183 assert isinstance(address,list) or isinstance(address,tuple) 184 172 185 svc = zeroconf.ServiceInfo( 173 186 type='%s' % (type), 174 187 name='%s.%s' % (name, type), 175 address= address,188 address=map(lambda x: inet_aton(x), address), 176 189 port=port, 177 190 weight=0, branches/0.4.7/lib/cxnet/cxnet/zeroconf.py
r1603 r1612 975 975 the read() method called when a socket is availble for reading.""" 976 976 977 def __init__(self, zeroconf ):977 def __init__(self, zeroconf, socket): 978 978 self.zeroconf = zeroconf 979 self.zeroconf.engine.addReader(self, self.zeroconf.socket) 979 self.socket = socket 980 self.zeroconf.engine.addReader(self, self.socket) 980 981 981 982 def handle_read(self): 982 data, (addr, port) = self. zeroconf.socket.recvfrom(_MAX_MSG_ABSOLUTE)983 data, (addr, port) = self.socket.recvfrom(_MAX_MSG_ABSOLUTE) 983 984 self.data = data 984 985 msg = DNSIncoming(data) … … 1110 1111 """Service information""" 1111 1112 1112 def __init__(self, type, name, address= None, port=None, weight=0, priority=0, properties={}, server=None, records=[_TYPE_A, _TYPE_SRV, _TYPE_TXT], ttl=_DNS_TTL):1113 def __init__(self, type, name, address=[], port=None, weight=0, priority=0, properties={}, server=None, records=[_TYPE_A, _TYPE_SRV, _TYPE_TXT], ttl=_DNS_TTL): 1113 1114 """Create a service description. 1114 1115 … … 1126 1127 self.type = type 1127 1128 self.name = name 1128 self.address = address 1129 if isinstance(address,tuple): 1130 self.address = list(address) 1131 elif isinstance(address,list): 1132 self.address = address 1133 else: 1134 self.address = [address,] 1129 1135 self.port = port 1130 1136 self.weight = weight … … 1219 1225 if record.type == _TYPE_A: 1220 1226 if record.name == self.name: 1221 self.address = record.address 1227 if not record.address in self.address: 1228 self.address.append(record.address) 1222 1229 elif record.type == _TYPE_SRV: 1223 1230 if record.name == self.name: … … 1243 1250 try: 1244 1251 zeroconf.addListener(self, DNSQuestion(self.name, _TYPE_ANY, _CLASS_IN)) 1245 while self.server is None or self.address is Noneor self.text is None:1252 while self.server is None or len(self.address) == 0 or self.text is None: 1246 1253 if last <= now: 1247 1254 return 0 … … 1354 1361 intf = None 1355 1362 1356 def __init__(self, bindaddress='', joinaddress=None, adaptive=False, heartbeat=False):1363 def __init__(self, address=[], adaptive=False, heartbeat=False): 1357 1364 """ 1358 1365 Creates an instance of the Zeroconf class, establishing 1359 1366 multicast communications, listening and reaping threads. 1360 1367 1361 bindaddress - address to bind() to (additional security besides of joinaddress) 1362 joinaddress - none, string or a tuple/list: on which interfaces to join the group 1368 bindaddress - address to bind() to 1363 1369 adaptive - DNS hack. When receives address 0.0.0.0, substitute it with sender's IP 1364 1370 heartbeat - run mDNS in the heartbeat mode 1365 1371 """ 1366 1372 globals()['_GLOBAL_DONE'] = 0 1367 self.intf = []1373 self.intf = {} 1368 1374 self.adaptive = adaptive 1369 if type(joinaddress) is types.NoneType: 1370 if bindaddress: 1371 self.intf.append(bindaddress) 1372 else: 1373 self.intf.append(socket.gethostbyname(socket.gethostname())) 1374 elif type(joinaddress) is types.StringType: 1375 self.intf.append(joinaddress) 1376 elif type(joinaddress) in (types.TupleType, types.ListType): 1377 self.intf = joinaddress 1378 else: 1379 raise Exception("choose correct interfaces to join on") 1380 1381 self.group = (bindaddress, _MDNS_PORT) 1382 self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 1383 try: 1384 self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 1385 self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) 1386 except: 1387 # SO_REUSEADDR should be equivalent to SO_REUSEPORT for 1388 # multicast UDP sockets (p 731, "TCP/IP Illustrated, 1389 # Volume 2"), but some BSD-derived systems require 1390 # SO_REUSEPORT to be specified explicity. Also, not all 1391 # versions of Python have SO_REUSEPORT available. So 1392 # if you're on a BSD-based system, and haven't upgraded 1393 # to Python 2.3 yet, you may find this library doesn't 1394 # work as expected. 1395 # 1396 pass 1397 self.socket.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_TTL, 255) 1398 self.socket.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_LOOP, 1) 1399 try: 1400 self.socket.bind(self.group) 1401 except: 1402 # Some versions of linux raise an exception even though 1403 # the SO_REUSE* options have been set, so ignore it 1404 # 1405 pass 1406 for i in self.intf: 1407 self.addIntf(i) 1375 1376 assert isinstance(address,list) or isinstance(address,tuple) 1377 1378 1379 for i in address: 1380 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 1381 try: 1382 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 1383 s.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) 1384 except: 1385 # SO_REUSEADDR should be equivalent to SO_REUSEPORT for 1386 # multicast UDP sockets (p 731, "TCP/IP Illustrated, 1387 # Volume 2"), but some BSD-derived systems require 1388 # SO_REUSEPORT to be specified explicity. Also, not all 1389 # versions of Python have SO_REUSEPORT available. So 1390 # if you're on a BSD-based system, and haven't upgraded 1391 # to Python 2.3 yet, you may find this library doesn't 1392 # work as expected. 1393 # 1394 pass 1395 s.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_TTL, 255) 1396 s.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_LOOP, 1) 1397 s.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_IF, socket.inet_aton(i) + socket.inet_aton('0.0.0.0')) 1398 s.setsockopt(socket.SOL_IP, socket.IP_ADD_MEMBERSHIP, socket.inet_aton(_MDNS_ADDR) + socket.inet_aton(i)) 1399 try: 1400 s.bind(("0.0.0.0", _MDNS_PORT)) 1401 except: 1402 # Some versions of linux raise an exception even though 1403 # the SO_REUSE* options have been set, so ignore it 1404 # 1405 pass 1406 self.intf[i] = s 1408 1407 1409 1408 self.hooks = [] 1410 1409 self.listeners = [] 1410 self.listns = [] 1411 1411 self.browsers = [] 1412 1412 self.services = {} … … 1417 1417 1418 1418 self.engine = Engine(self) 1419 self.listener = Listener(self) 1419 for i in self.intf.values(): 1420 self.listns.append(Listener(self,i)) 1421 1420 1422 self.reaper = Reaper(self) 1421 1423 self.heartbeat = None … … 1424 1426 self.heartbeat = Heartbeat(self) 1425 1427 1426 def addIntf(self,addr):1427 '''1428 Subscribe to multicast group on an interface1429 '''1430 self.socket.setsockopt(socket.SOL_IP, socket.IP_MULTICAST_IF, socket.inet_aton(addr) + socket.inet_aton('0.0.0.0'))1431 try:1432 self.socket.setsockopt(socket.SOL_IP, socket.IP_ADD_MEMBERSHIP, socket.inet_aton(_MDNS_ADDR) + socket.inet_aton(addr))1433 except Exception,e:1434 if e.args[0] == 98: # Address already in use1435 pass1436 else:1437 raise e1438 1439 1440 def isLoopback(self):1441 for i in self.intf:1442 if i.startswith("127.0.0.1"):1443 return True1444 return False1445 1446 def isLinklocal(self):1447 for i in self.intf:1448 if i.startswith("169.254."):1449 return True1450 return False1451 1428 1452 1429 def wait(self, timeout): … … 1511 1488 out.addAnswerAtTime(DNSText(info.name, _TYPE_TXT, _CLASS_IN, info.ttl, info.text), 0) 1512 1489 if info.address and _TYPE_A in info.records: 1513 out.addAnswerAtTime(DNSAddress(info.server, _TYPE_A, _CLASS_IN, info.ttl, info.address), 0) 1490 for i in info.address: 1491 out.addAnswerAtTime(DNSAddress(info.server, _TYPE_A, _CLASS_IN, info.ttl, i), 0) 1514 1492 self.send(out) 1515 1493 iterations -= 1 … … 1719 1697 # This is a quick test to see if we can parse the packets we generate 1720 1698 #temp = DNSIncoming(out.packet()) 1721 try: 1722 bytes_sent = self.socket.sendto(out.packet(), 0, (addr, port)) 1723 except: 1724 # Ignore this, it may be a temporary loss of network connection 1725 pass 1699 for i in self.intf.values(): 1700 try: 1701 bytes_sent = i.sendto(out.packet(), 0, (addr, port)) 1702 except: 1703 # Ignore this, it may be a temporary loss of network connection 1704 pass 1726 1705 1727 1706 def close(self): … … 1733 1712 self.engine.notify() 1734 1713 self.unregisterAllServices() 1735 try: 1736 # there are cases, when we start mDNS without network 1737 self.socket.setsockopt(socket.SOL_IP, socket.IP_DROP_MEMBERSHIP, socket.inet_aton(_MDNS_ADDR) + socket.inet_aton('0.0.0.0')) 1738 except: 1739 pass 1740 self.socket.close() 1714 for i in self.intf.values(): 1715 try: 1716 # there are cases, when we start mDNS without network 1717 i.setsockopt(socket.SOL_IP, socket.IP_DROP_MEMBERSHIP, socket.inet_aton(_MDNS_ADDR) + socket.inet_aton('0.0.0.0')) 1718 except: 1719 pass 1720 i.close() 1741 1721 1742 1722 # Test a few module features, including service registration, service branches/0.4.7/shell/bus/dispatch.py
r1606 r1612 208 208 209 209 # routing engine 210 sock = None 210 211 server = None 211 sock = None 212 213 def __init__(self,name="dispatcher2",sname=None,address="0.0.0.0",join=[],port=CL_PORT,psk=None): 212 213 def __init__(self,name="dispatcher2",sname=None,address=[],port=CL_PORT,psk=None): 214 214 ''' 215 215 Creates a new dispatcher2 thread named by name argument … … 219 219 address - address to bind the service on 220 220 port - port to bind the service on 221 join - string or tuple of strings - address[es] to use for set up multicast membership222 221 ''' 223 222 Thread.__init__(self) … … 231 230 # set up ZeroConf mDNS 232 231 self.sname = sname 233 self.mdns = mDNSengine(address, join,self.connect())232 self.mdns = mDNSengine(address, self.connect()) 234 233 self.mdns.registerDomain(CL_DOMAIN) 235 self.svc = self.mdns.registerService(sname,address,port,type="%s." % (CL_DOMAIN),properties={"state":"running","role":"_cx._udp. testbed at %s" % (sname)}) 234 self.svc = self.mdns.registerService(sname,address,port,type="%s." % (CL_DOMAIN),properties={"state":"booming"}) 235 236 # set up routing 237 self.sock = cl_socket("0.0.0.0",port,self.connect(),keyfile=psk,debug=False) 236 238 237 239 # set up lockd … … 239 241 self.lockd.start() 240 242 self.subscribe(self.lockd.bus.address, "lockd") 241 242 # set up routing243 self.sock = cl_socket(address,port,self.connect(),keyfile=psk,debug=False)244 243 245 244 … … 412 411 413 412 elif text == "dns": 414 if self.sock: 415 text = "\n" 416 name = "" 417 # FIXME: it breaks incapsulation 418 for i in self.mdns.server.cache.entries(): 419 if name != i.name: 420 text += "\n%s\n" % (i.name) 421 name = i.name 422 text += "\t%s\t%s\t%s\n" % (repr(zeroconf._CLASSES[i.clazz]), repr(zeroconf._TYPES[i.type]), repr(i)) 423 self.put({"to": addr, "from": "dispatcher2", "data": text}) 413 if self.mdns: 414 self.put({"to": addr, "from": "dispatcher2", "data": self.mdns.printCache()}) 424 415 else: 425 416 self.put({"to": addr, "from": "dispatcher2", "data": "mDNS service is disabled"}) branches/0.4.7/shell/connexion.py
r1611 r1612 106 106 socketN_a = "" 107 107 socketN_p = "" 108 cluster_a = "0.0.0.0" 109 cluster_m = joinaddr() 108 cluster_b = joinaddr() 110 109 cluster_p = 40323 111 110 cluster_name = hostname() … … 139 138 # clusterization (disabled by default) 140 139 # 141 -b <addr> cluster manager address (default: 0.0.0.0) 140 -b <addr> cluster manager address (default: all available 141 primary addresses) 142 142 -c <port> cluster manager port (default: 40323) 143 -m <address> join multicast group on this address144 143 -N <name> use name for mDNS service instance (if set up, 145 144 enables clusterization capabilities) … … 172 171 173 172 try: 174 (opts,left) = getopt(sys.argv[1:],"b:c:dD:hk:l: m:N:n:p:r:s:t:w:W:x")173 (opts,left) = getopt(sys.argv[1:],"b:c:dD:hk:l:N:n:p:r:s:t:w:W:x") 175 174 except Exception,e: 176 175 print e … … 180 179 for (i,k) in opts: 181 180 if i == "-b": 182 cluster_a = k 181 if autoconf: 182 cluster_b = [] 183 autoconf = False 184 cluster_b.append(k) 183 185 elif i == "-c": 184 186 cluster_p = int(k) 185 elif i == "-m":186 if autoconf:187 cluster_m = []188 autoconf = False189 cluster_m.append(k)190 187 elif i == "-N": 191 188 cluster_name = k … … 247 244 name = "dispatcher2", 248 245 sname = cluster_name, 249 address = cluster_ a,246 address = cluster_b, 250 247 port = cluster_p, 251 join = cluster_m,252 248 psk = cluster_psk 253 249 )
