""" This file is part of Mokonnect. Mokonnect is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License Version 3 as published by the Free Software Foundation. Mokonnect is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with Mokonnect. If not, see . """ # # mkdev_wifi.py # mokonnect device wifi # import struct import mkbase # ugly connman shit import time import os # static configuration import socket import struct import fcntl class ModePanel(mkbase.MKPanel): def __init__(self,win): mkbase.MKPanel.__init__(self,win) self.gui = { "type": "frame", "label": "Interface Configuration", "align": (-1.0,0.0), "content": { "type": "box", "content": [ {"type":"check","label":"Default","config_link":"default"}, ] } } self.config = {"default":True} class IPConfigPanel(mkbase.MKPanel): def __init__(self,win): mkbase.MKPanel.__init__(self,win) self.gui = { "name": "ipconfig", "type": "frame", "label": "IP Configuration", "align": (-1.0,0.0), "content": { "type": "table", "cols": 3, "content": [ [ {"type":"label","label":"IP Address"}, {"type":"box","align":(-1.0,0.0),"weight":(1,1)}, {"type":"entry","config_link":"ip"} ], [ {"type":"label","label":"Netmask"}, {"type":"box","align":(-1.0,0.0),"weight":(1,1)}, {"type":"entry","config_link":"netmask"} ], [ {"type":"label","label":"Gateway"}, {"type":"box","align":(-1.0,0.0),"weight":(1,1)}, {"type":"entry","config_link":"gateway"} ], [ {"type":"label","label":"DNS Server"}, {"type":"box","align":(-1.0,0.0),"weight":(1,1)}, {"type":"entry","config_link":"nameserver"} ], ] } } self.config = {"ip": "10.0.0.5","netmask":"255.255.255.0","gateway":"10.0.0.138","nameserver":"10.0.0.138"} class ClientPanel(mkbase.MKPanel): def __init__(self,win,pager,bus,device,powerfunc): self.bus = bus self.pager = pager self.powerfunc = powerfunc self.device = device mkbase.MKPanel.__init__(self,win) self.gui = { "type": "frame", "label": "Network Configuration", "align": (-1.0,0.0), "content": { "type": "table", "cols": 3, "content": [ [ {"type":"button","label":"Scan","align":(-1,0),"clicked":self.ScanClicked}, ], [ {"type":"label","label":"Mode"}, {"type":"box","align":(-1.0,0.0),"weight":(1,1)}, {"type": "button","align":(-1,0),"config_link":"mode","save_object":"mode_button","clicked": self.ModeClicked}, ], [ {"type":"label","label":"SSID"}, {"type":"box","align":(-1.0,0.0),"weight":(1,1)}, {"type":"entry","config_link":"ssid","save_object":"net_name"} ], [ {"type":"label","label":"Authentication"}, {"type":"box","align":(-1.0,0.0),"weight":(1,1)}, {"type":"button","config_link":"auth","save_object":"auth_button","clicked":self.AuthClicked} ], [ {"type":"label","label":"Password"}, {"type":"box","align":(-1.0,0.0),"weight":(1,1)}, {"type":"entry","config_link":"pass","save_object":"net_pass"} ], [{"type":"check","label":"Hidden Network","config_link":"hidden"}], [{"type":"check","label":"Use DHCP","config_link":"dhcp","panel_link":"ipconfig","panel_link_reverse":None}], ] } } self.config = {"ssid": "ap_name","auth":"none","pass":"Password","dhcp":True,"mode":"managed","hidden":False} # config shared hover panel self.hover = mkbase.MKHoverChoice(win,"Authentication",["none","wep","wpa","wpa2"]) self.hover.config = self.config # a hover sharing config self.mode_hover = mkbase.MKHoverChoice(win,"Wifi Mode",["managed","adhoc"]) self.mode_hover.config = self.config # scan box self.btnlist = [] self.scanning = False self.scanbox_config = {} self.scanbox_gui = { "type": "box", "content": [ {"type":"scroller","bounce":(0,0),"align":(-1,-1),"weight":(1,1),"content": { "type": "frame", "label": "Scanning...", "save_object": "scanbox_frame", "align": (-1,0), "weight": (1,1), "content": { "type": "box", "align": (-1,0), "weight": (1,1), "save_object": "scanlistbox", "content": [] # filled dynamically by refresh networks } }}, {"type":"button","label":"back","clicked":self.ScanboxBackClicked} ] } def ScanboxBackClicked(self,*args,**kargs): print "ScanboxBackClicked" self.scanning = False self.pager.content_pop() def ModeClicked(self,*args,**kwargs): self.mode_hover.BuildHoverChoice(self.config["obj_mode_button"]) def AuthClicked(self,*args,**kwargs): self.hover.BuildHoverChoice(self.config["obj_auth_button"]) def _dummy_log(self,msg): print msg return def _GetDevice(self): # find wifi device self.device = None for dpath in self.bus["connman"]["/"]["Manager"]["Devices"]: device = self.bus["connman"][dpath]["Device"] try: if device["Type"] == "wifi": self.device = device except KeyError: pass def ScanClicked(self,*args,**kwargs): # check power state and see if we find the device if not self.powerfunc(self._dummy_log): return self._GetDevice() if None == self.device: print "Error: this should never happen!" return # build the gui and prepare the box scanbox_obj = mkbase.EFLBuildObject(self.win,self.scanbox_gui,self.scanbox_config) self.pager.content_push(scanbox_obj) self.bus["connman"].AddSignalFunction(self.DbusSignal) self.btnlist = [] self.RefreshNetworks() self.scanning = True if self.device["Powered"] == False: print self.device.GetProperties() self.device["Powered"] = True print self.device.GetProperties() try: self.device.ProposeScan() except Exception,e: print "Exception on proposescan: %s" % str(e) def RefreshNetworks(self): for b in self.btnlist: b[0].delete() self.btnlist = [] svclist = self.bus["connman"]["/"]["Manager"]["Services"] for svcpath in svclist: netprop = self.bus["connman"][svcpath]["Service"].GetProperties() if not "Strength" in netprop: netprop["Strength"] = 0 if netprop["Type"] == "wifi": # latest connman can have a wifi network without a name.... try: if netprop["Name"]: pass except: netprop["Name"]='unknown' gui_netbtn = { "type": "button", "label": "%s (%s,%d%%)" % (netprop["Name"],netprop["Security"],netprop["Strength"]), "align": (-1,0), "weight": (1,1), "clicked": self.NetworkSelected, } netbtn = mkbase.EFLBuildObject(self.win,gui_netbtn,{}) self.btnlist.append([netbtn,netprop]) self.scanbox_config["obj_scanlistbox"].pack_end(netbtn) def DbusSignal(self,*args,**kargs): if not self.scanning: return if kargs["signal"] == "PropertyChanged": if args[0] == "Scanning": if not args[1]: self.scanbox_config["obj_scanbox_frame"].label_set("Scan Complete") else: self.scanbox_config["obj_scanbox_frame"].label_set("Scanning...") if args[0] == "Services": self.RefreshNetworks() def NetworkSelected(self,obj,*args,**kwargs): self.scanning = False # find button netprop = None for b in self.btnlist: if b[0] == obj: netprop = b[1] if not netprop: print "ERROR: Shouldnt happen! no net properties to match the button" return # we set the network data self.config["obj_net_name"].entry_set(netprop["Name"]) self.config["obj_auth_button"].label_set(netprop["Security"]) self.config["obj_auth_button"].name_set(netprop["Security"]) self.config["obj_mode_button"].label_set(netprop["Mode"]) self.config["obj_mode_button"].name_set(netprop["Mode"]) self.config["cfg_hidden"].state_set(0) # we pop the network selection screen self.pager.content_pop() class WifiDevice(mkbase.MKDevice): def __init__(self,win,pager,bus): mkbase.MKDevice.__init__(self,win) self.name = "Wifi" self.bus = bus self.pager = pager self.powered = False self.device = None # panels self.panels = [ModePanel(win),ClientPanel(win,pager,bus,self.device,self.PowerOn),IPConfigPanel(win)] # vars self.looking_for_network = None self.active_timer = None self.timeout_connect = 60 self.timeout_connect_hidden = 70 self.timeout_scan = 30 def _GetDevice(self): # find wifi device self.device = None for dpath in self.bus["connman"]["/"]["Manager"]["Devices"]: device = self.bus["connman"][dpath]["Device"] try: if device["Type"] == "wifi": self.device = device except KeyError: pass def _GetBroadcast(self): ipb = socket.inet_aton(self.panels[2].config["ip"]) maskb = socket.inet_aton(self.panels[2].config["netmask"]) ipl = struct.unpack("!L",ipb)[0] maskl = struct.unpack("!L",maskb)[0] rmaskl = 0xFFFFFFFF & (~maskl) bcastl = ipl | rmaskl bcastb = struct.pack("!L",bcastl) return socket.inet_ntoa(bcastb) def _ConnectToNetwork(self,found_network): # got network in found_network, update its security log = self.log if "none" != self.panels[1].config["auth"]: found_network["Passphrase"] = self.panels[1].config["pass"] # disconnect if already connected if not found_network["State"] in ["idle","failure"]: self.log("Already connected to this network, reconnecting...") try: found_network.Disconnect() except: pass # connect to it self.active_timer = mkbase.SetTimeout(self.timeout_connect,self._TimePassed) self.bus["connman"].AddSignalFunction(self.DbusSignal) log("Connecting to '%s'..." % self.panels[1].config["ssid"]) if self.panels[1].config["dhcp"]: log("Configuring using DHCP...") #found_network["IPv4.Method"] = "dhcp" else: log("Configuring using Static information...") #found_network["IPV4.Method"] = "static" found_network.Connect() def _TimePassed(self,*args,**kargs): self.log("Timeout reached, connection failed...") self.log("__DONE__") self.active_timer = None return False def _ConnmanPowerOn(self,log): wifi_dev = self.bus["ousaged"]["/org/freesmartphone/Usage"]["Usage"] print "Requesting resource WiFi" wifi_dev.RequestResource('WiFi') if not wifi_dev.GetResourceState('WiFi'): time.sleep(5) # ugly hack self.device = None self._GetDevice() if self.device == None: # ugly hack, restarting connmand print "killing and re-running connmand" os.system("killall connmand ; connmand -I usb0") time.sleep(5) # very ugly... self._GetDevice() if self.device == None: return False return True def PowerOff(self,log): wifi_dev = self.bus["ousaged"]["/org/freesmartphone/Usage"]["Usage"] if wifi_dev.GetResourceState('WiFi'): wifi_dev.ReleaseResource('WiFi') log("Wifi Device was powered off.") log("__DONE__") return True log("Wifi Device is already powered off.") log("__DONE__") return True def PowerOn(self,log): self._GetDevice() if None == self.device: log("Wifi device seems to be off, trying to power it on...") if not self._ConnmanPowerOn(log): log("Failed powering on the device or it was still not found in connman.") log("__DONE__") return False return True def _ConnectHidden(self): net_opts = {"Type":"wifi"} net_opts["SSID"] = self.panels[1].config["ssid"] net_opts["Security"] = self.panels[1].config["auth"] net_opts["Mode"] = self.panels[1].config["mode"] if "none" != net_opts["Security"]: net_opts["Passphrase"] = self.panels[1].config["pass"] self.active_timer = mkbase.SetTimeout(self.timeout_connect_hidden,self._TimePassed) self.bus["connman"].AddSignalFunction(self.DbusSignal) # check that we are not connected to it already found_network = None svclist = self.bus["connman"]["/"]["Manager"]["Services"] for svcpath in svclist: svcobj = self.bus["connman"][svcpath]["Service"] try: if svcobj["Name"] == self.panels[1].config["ssid"]: found_network = svcobj break except: pass if found_network: if not found_network["State"] in ["idle","failure"]: print "netstate %s" % found_network["State"] self.log("Already connected to this network, reconnecting...") try: found_network.Disconnect() except: pass # connect to it... print "Net Options: %s" % str(net_opts) res = self.bus["connman"]["/"]["Manager"].ConnectService(net_opts) print res return False def Apply(self,log): self.log = log self.sent_done = False self.looking_for_network = None # check power state and see if we find the device if not self.PowerOn(log): return log("Wifi device: %s" % self.device["Interface"]) # check if we are connecting to a hidden network if self.panels[1].config["hidden"]: self.log("Connecting to hidden network '%s'..." % self.panels[1].config["ssid"]) mkbase.SetTimeout(2,self._ConnectHidden) return # check if the current network exists in network list found_network = None svclist = self.bus["connman"]["/"]["Manager"]["Services"] for svcpath in svclist: svcobj = self.bus["connman"][svcpath]["Service"] try: if svcobj["Name"] == self.panels[1].config["ssid"]: found_network = svcobj break except: pass if not found_network: # create a new network log("Network '%s' is not currently visible." % self.panels[1].config["ssid"]) log("Triggering scan for %d seconds..." % self.timeout_scan) self.active_timer = mkbase.SetTimeout(self.timeout_scan,self._TimePassed) self.looking_for_network = self.panels[1].config["ssid"] self.bus["connman"].AddSignalFunction(self.DbusSignal) try: self.device.ProposeScan() except Exception,e: print "Exception on proposescan: %s" % str(e) # relegating the rest of connection logic to the callback functions return # trigger connection self._ConnectToNetwork(found_network) def _GetBroadcast(self): ipb = socket.inet_aton(self.panels[2].config["ip"]) maskb = socket.inet_aton(self.panels[2].config["netmask"]) ipl = struct.unpack("!L",ipb)[0] maskl = struct.unpack("!L",maskb)[0] rmaskl = 0xFFFFFFFF & (~maskl) bcastl = ipl | rmaskl bcastb = struct.pack("!L",bcastl) return socket.inet_ntoa(bcastb) def _ConfigStatic(self,ifname): ifname = str(ifname) sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) iface = struct.pack('256s', ifname[:15]) # Setting IP info = fcntl.ioctl(sock.fileno(), 0x8915, iface) # SIOCGIFADDR info = info[:20] + socket.inet_aton(self.panels[2].config["ip"]) + info[24:] info = fcntl.ioctl(sock.fileno(), 0x8916, info) # SIOCSIFADDR self.log("IP: %s" % self.panels[2].config["ip"]) # Setting Netmask info = fcntl.ioctl(sock.fileno(), 0x891b, iface) # SIOCGIFNETMASK info = info[:20] + socket.inet_aton(self.panels[2].config["netmask"]) + info[24:] info = fcntl.ioctl(sock.fileno(), 0x891c, info) # SIOCSIFNETMASK self.log("Netmask: %s" % self.panels[2].config["netmask"]) # Setting broadcast bcast = self._GetBroadcast() info = fcntl.ioctl(sock.fileno(), 0x8919, iface) # SIOCGIFBRDADDR info = info[:20] + socket.inet_aton(bcast) + info[24:] info = fcntl.ioctl(sock.fileno(), 0x891a, info) # SIOCSIFBRDADDR # gateway for show self.log("Gateway: %s" % self.panels[2].config["gateway"]) # nameserver append to the /etc/resolv.conf fh = file("/etc/resolv.conf","at") fh.write("nameserver %s\n" % self.panels[2].config["nameserver"]) fh.close() self.log("Nameserver: %s" % self.panels[2].config["nameserver"]) # done closing sock.close() self.log("Done!") self.log("__DONE__") self.sent_done = True def _KillUdhcp(self,iface): iff = "/var/run/connman/udhcpc.%s.pid" % iface try: fh = file(iff,"rt") except: return pid = int(fh.read()) fh.close() os.system("kill %d" % pid) def DbusSignal(self,*args,**kargs): #print "DeviceSignal - %s - %s" % (str(args),str(kargs)) if kargs["signal"] == "PropertyChanged": #print "Property changed! %s %s" % (str(args),str(kargs)) if args[0] == "Connections": # find the connection belonging to this device... if self.sent_done: return dname = self.device.iface.object_path for conpath in args[1]: if dname in conpath: # found our connecion # cancel timeout if self.active_timer: self.active_timer.delete() self.active_timer = None # print con details and flag done con = self.bus["connman"][conpath]["Connection"] cprop = con.GetProperties() print cprop self.log("IP: %s" % cprop["IPv4.Address"]) self.log("Netmask: %s" % cprop["IPv4.Netmask"]) self.log("Gateway: %s" % cprop["IPv4.Gateway"]) self.log("Nameserver: %s" % cprop["IPv4.Nameserver"]) self.log("Done!") self.log("__DONE__") self.sent_done = True return if args[0] == "State": print "state of '%s' changed to '%s'!" % (kargs["path"],args[1]) if args[1] == "configuration": if self.sent_done: return # do we need to do static configuration? if not self.panels[1].config["dhcp"]: # kill timer if self.active_timer: self.active_timer.delete() self.active_timer = None # config static self._KillUdhcp(self.device["Interface"]) try: self._ConfigStatic(self.device["Interface"]) except Exception,e: self.log("Error configuring static information: %s" % str(e)) self.log("__DONE__") return # network lookup logic if self.looking_for_network != None: if kargs["signal"] == "PropertyChanged": if args[0] == "Services": for svcpath in args[1]: svcobj = self.bus["connman"][svcpath]["Service"] try: if svcobj["Name"] == self.panels[1].config["ssid"]: # we found the network, cancel looking_for_network self.log("Network '%s' was found, proceeding..." % self.looking_for_network) self.looking_for_network = None # reset timer if self.active_timer: self.active_timer.delete() self.active_timer = None # connect self._ConnectToNetwork(svcobj) break except Exception,e: print e