r4527 - developers/john_lee/pyfso
john_lee at docs.openmoko.org
john_lee at docs.openmoko.org
Tue Jul 15 16:03:33 CEST 2008
Author: john_lee
Date: 2008-07-15 16:03:32 +0200 (Tue, 15 Jul 2008)
New Revision: 4527
Added:
developers/john_lee/pyfso/__init__.py
developers/john_lee/pyfso/accelerometer.py
developers/john_lee/pyfso/dialer.py
developers/john_lee/pyfso/fso_backend.py
developers/john_lee/pyfso/general.py
Log:
Code extracted from another project, in an attempt to create a Python
binding for the FSO D-Bus API.
* accelerometer.py: needs to be integrated into frameworkd.
* dialer.py: a simple dialer implementation that depends on
  fso_backend.py.
* fso_backend.py: a D-Bus wrapper. The biggest problem is that it
  uses the main loop from e_dbus.
* general.py: some general base classes. The Singleton pattern still
  has a bug.
Added: developers/john_lee/pyfso/__init__.py
===================================================================
--- developers/john_lee/pyfso/__init__.py (rev 0)
+++ developers/john_lee/pyfso/__init__.py 2008-07-15 14:03:32 UTC (rev 4527)
@@ -0,0 +1,4 @@
+from accelerometer import Accelerometer, MockAccelerometer, \
+ InputDevAccelerometer, Gta02Accelerometer, get_xy_theta
+
+from dialer import Dialer, MockDialer, FSODialer
Added: developers/john_lee/pyfso/accelerometer.py
===================================================================
--- developers/john_lee/pyfso/accelerometer.py (rev 0)
+++ developers/john_lee/pyfso/accelerometer.py 2008-07-15 14:03:32 UTC (rev 4527)
@@ -0,0 +1,274 @@
+"""
+Accelerometer class.  It can run in two different modes.  In pull
+mode, it accepts a read request and then retrieves the accelerometer
+values.  In daemon mode, it updates itself as often as possible,
+notifies the observers about value changes and answers requests with
+its current state.
+
+(C) 2008 John Lee <john_lee at openmoko.com>
+(C) 2008 Openmoko, Inc.
+GPLv2 or later
+"""
+from __future__ import with_statement
+from general import Subject
+
+import math
+import os
+import struct
+from threading import RLock
+from time import sleep
+
+class Accelerometer(Subject):
+    """The base class of all accelerometers.
+
+    Subclasses provide _open_device, _get_sample_rate,
+    _set_sample_rate and _retrieve.  Reading `state` either takes a
+    fresh sample (pull mode) or, while daemonize() is running, returns
+    the most recent sample taken by the daemon loop.
+    """
+
+    # The accessors go through the instance so that the overrides of
+    # the concrete subclass are used.  This replaces the former
+    # __new__, which rebuilt the property on the class at every
+    # instantiation and forwarded the constructor arguments to
+    # object.__new__, which does not accept any.
+    sample_rate = property(
+        lambda self: self._get_sample_rate(),
+        lambda self, sample_rate: self._set_sample_rate(sample_rate),
+        doc="how many samples per second")
+
+    def __init__(self, device, sample_rate=None):
+        """device: the name of the input
+        sample_rate: how many samples per second; None leaves the
+        current rate untouched
+        """
+        super(Accelerometer, self).__init__()
+        self._statelock = RLock()
+        self._state = (0, -1000, 0)
+        self._open_device(device)
+        self._daemonized = False
+        if sample_rate is not None:
+            self.sample_rate = sample_rate
+
+    def _open_device(self, device):
+        """open the input named by device
+        """
+        raise NotImplementedError
+
+    def _get_sample_rate(self):
+        """get sample rate
+        """
+        raise NotImplementedError
+
+    def _set_sample_rate(self, sample_rate):
+        """set sample rate
+        """
+        raise NotImplementedError
+
+    def _retrieve(self):
+        """read and parse the current values.  could be time
+        consuming.
+        """
+        raise NotImplementedError
+
+    @property
+    def state(self):
+        """return the current state
+        """
+        if self._daemonized:
+            with self._statelock:
+                return self._state
+        # pull mode: sample outside the lock because _retrieve may
+        # block, then publish and return the very same value
+        fresh = self._retrieve()
+        with self._statelock:
+            self._state = fresh
+            return fresh
+
+    def daemonize(self):
+        """an infinite loop to update self.state.  will notify
+        observers about state changes.  returns after stop() is
+        called.
+        """
+        self._daemonized = True
+        while self._daemonized:
+            fresh = self._retrieve()
+            with self._statelock:
+                self._state = fresh
+            # notify with the sample just taken instead of re-reading
+            # the shared attribute outside the lock
+            self._notify(fresh)
+
+    def stop(self):
+        """stop daemon
+        """
+        self._daemonized = False
+
+
+class MockAccelerometer(Accelerometer):
+    """Mock accelerometer class.  It needs no device and always
+    reports (0, -1000, 0).
+    >>> g = MockAccelerometer()
+    >>> g.sample_rate
+    100
+    >>> g.sample_rate = 400
+    >>> g.sample_rate
+    400
+    """
+
+    def __init__(self):
+        super(MockAccelerometer, self).__init__(None)
+        self.sample_rate = 100
+
+    def _open_device(self, device):
+        """nothing to open for the mock
+        """
+        pass
+
+    def _set_sample_rate(self, sample_rate):
+        self._sample_rate = sample_rate
+
+    def _get_sample_rate(self):
+        return self._sample_rate
+
+    def _retrieve(self):
+        """wait for one sample period, then return the fixed value
+        """
+        # 1.0 forces float division: with integers 1/100 is 0 and the
+        # daemon loop would spin without ever sleeping
+        sleep(1.0 / self._sample_rate)
+        return (0, -1000, 0)
+
+
+class InputDevAccelerometer(Accelerometer):
+    """Read values from kernel input device
+    """
+
+    # Event types
+    EV_SYN = 0x00
+    EV_KEY = 0x01
+    EV_REL = 0x02
+    EV_ABS = 0x03
+    EV_MSC = 0x04
+    EV_SW = 0x05
+    EV_LED = 0x11
+    EV_SND = 0x12
+    EV_REP = 0x14
+    EV_FF = 0x15
+    EV_PWR = 0x16
+    # was a second "EV_FF", which silently replaced the 0x15 above
+    EV_FF_STATUS = 0x17
+    EV_MAX = 0x1f
+    EV_CNT = (EV_MAX+1)
+
+    # Relative axes
+    REL_X = 0x00
+    REL_Y = 0x01
+    REL_Z = 0x02
+    REL_RX = 0x03
+    REL_RY = 0x04
+    REL_RZ = 0x05
+    REL_HWHEEL = 0x06
+    REL_DIAL = 0x07
+    REL_WHEEL = 0x08
+    REL_MISC = 0x09
+    REL_MAX = 0x0f
+    REL_CNT = REL_MAX + 1
+
+    # native layout of struct input_event, see _unpack
+    input_event_struct = "@llHHi"
+    input_event_size = struct.calcsize(input_event_struct)
+
+    def _open_device(self, device):
+        """open the input device read-only and keep the file
+        descriptor in self.device
+        """
+        self.device = os.open(device, os.O_RDONLY | os.O_SYNC)
+
+    def _unpack(self):
+        """struct input_event {
+            struct timeval time; /* (long, long) */
+            __u16 type;
+            __u16 code;
+            __s32 value;
+        };
+        return (tv_sec, tv_usec, type, code, value)
+        raise IOError when five reads in a row come back short
+        """
+        size = InputDevAccelerometer.input_event_size
+        for _attempt in xrange(5):
+            data = os.read(self.device, size)
+            if len(data) >= size:
+                break
+        else:
+            # the loop ended without a complete event
+            raise IOError('short read from input device (wanted %d bytes)'
+                          % size)
+
+        return struct.unpack(InputDevAccelerometer.input_event_struct, data)
+
+    def _unpack_xyz(self):
+        """return a 3 tuple
+        """
+        # wait for EV_SYN
+        while self._unpack()[2] != InputDevAccelerometer.EV_SYN:
+            pass
+        # the values of the next three events are taken as x, y and z,
+        # in that order
+        x = self._unpack()[4]
+        y = self._unpack()[4]
+        z = self._unpack()[4]
+        return (x, y, z)
+
+
+class Gta02Accelerometer(InputDevAccelerometer):
+    """Read values from gta02.  for now we use just one.
+    >>> g = Gta02Accelerometer()
+    >>> g.sample_rate = 400
+    >>> g.sample_rate
+    400
+    >>> g.sample_rate = 100
+    >>> g.sample_rate
+    100
+    """
+
+    INPUT_DEV = '/dev/input/event3'
+    SYS_SAMPLE_RATE = '/sys/devices/platform/spi_s3c24xx_gpio.1/spi0.1/sample_rate'
+
+    def __init__(self, device=None, sample_rate=None):
+        """device: input device path, defaults to INPUT_DEV
+        sample_rate: how many samples per second
+        """
+        if device is None:
+            device = Gta02Accelerometer.INPUT_DEV
+        super(Gta02Accelerometer, self).__init__(device, sample_rate)
+
+    def _set_sample_rate(self, sample_rate):
+        """possible values: 100, 400
+        """
+        # unbuffered, and closed even when the write fails
+        with open(Gta02Accelerometer.SYS_SAMPLE_RATE, 'w', 0) as f:
+            f.write('%d\n' % sample_rate)
+
+    def _get_sample_rate(self):
+        """read the sample rate back from sysfs as an int
+        """
+        with open(Gta02Accelerometer.SYS_SAMPLE_RATE, 'r', 0) as f:
+            return int(f.read())
+
+    def _retrieve(self):
+        return self._unpack_xyz()
+
+
+# shamelessly stole from olv
+def get_xy_theta(u):
+    """get the angle related to (0, -1), clockwise.
+    u: a sequence whose first two items are x and y; further items
+    are ignored.
+    return 0 <= theta < 2pi.  a vector without any x/y component
+    (device lying flat) yields 0.0 instead of a ZeroDivisionError.
+    >>> get_xy_theta((0, -1000, 0))
+    0.0
+    """
+    # atan2 measures the angle of (x, y) against the (0, -1) axis
+    # directly and is defined for the zero vector as well
+    theta = math.atan2(u[0], -u[1])
+    if theta < 0:
+        # atan2 returns (-pi, pi]; fold the negative half to [pi, 2pi)
+        theta += 2 * math.pi
+    return theta
+
+
+class DumpObserver(object):
+    """Does nothing but print the subject state when notified.
+    """
+
+    def __init__(self, subject=None):
+        """subject: the object to observe; a MockAccelerometer is
+        created when none is given
+        """
+        if subject is None:
+            subject = MockAccelerometer()
+        subject.attach(self)
+        self.subject = subject
+
+    def update(self, state):
+        """print x, y, z and the x/y angle as a multiple of pi
+        """
+        # unpack instead of concatenating tuples so that any
+        # 3-item sequence is accepted
+        x, y, z = state
+        t = get_xy_theta(state)
+        print 'x = %d, y = %d, z = %d, theta = %f pi' % (x, y, z, t / math.pi)
+
+
+def dump():
+    """print accelerometer values forever.  uses the gta02 sensor at
+    400 samples per second and falls back to the mock when the device
+    or its sysfs sample rate file cannot be opened.
+    """
+    try:
+        gsensor = Gta02Accelerometer(sample_rate=400)
+    except EnvironmentError:
+        # os.open raises OSError, the sysfs open() raises IOError;
+        # EnvironmentError is the common base of both
+        gsensor = MockAccelerometer()
+    DumpObserver(gsensor)
+    gsensor.daemonize()
+
+
+def _doctest():
+    """run the doctests of this module.  does nothing when the
+    doctest module cannot be imported.
+    """
+    try:
+        from doctest import testmod
+    except ImportError:
+        return
+    testmod()
+
+
+# running the module as a script executes its doctests
+if __name__ == '__main__':
+    _doctest()
Added: developers/john_lee/pyfso/dialer.py
===================================================================
--- developers/john_lee/pyfso/dialer.py (rev 0)
+++ developers/john_lee/pyfso/dialer.py 2008-07-15 14:03:32 UTC (rev 4527)
@@ -0,0 +1,139 @@
+from general import Subject
+from fso_backend import FSOObject
+from dbus import DBusException
+from threading import Timer
+
+class Dialer(Subject):
+    """dial, pickup and hangup should be called by controller.  state
+    change like 'incoming', 'active', 'outgoing' and 'release' should
+    be set through _set_state, which also notifies the observers.
+    """
+    def __init__(self):
+        super(Dialer, self).__init__()
+        # defined from the start so that reading `state` before the
+        # first _set_state gives None instead of an AttributeError
+        self._state = None
+
+    def dial(self, number):
+        """start a call to number
+        """
+        raise NotImplementedError
+
+    def pickup(self):
+        """accept the current call
+        """
+        raise NotImplementedError
+
+    def hangup(self):
+        """end the current call
+        """
+        raise NotImplementedError
+
+    # make this read only to hint this should not be changed from
+    # outside.
+    @property
+    def state(self):
+        """the last state passed to _set_state, None before that
+        """
+        return self._state
+
+    def _set_state(self, state):
+        """store state and notify all observers with it
+        """
+        self._state = state
+        self._notify(state)
+
+
+class MockDialer(Dialer):
+    """Dialer without a phone behind it: every request is printed and
+    answered by the matching state change one second later.
+    """
+
+    def __init__(self):
+        """initiate an incoming call first
+        """
+        super(MockDialer, self).__init__()
+        self._set_state('incoming')
+
+    def _later(self, state):
+        """switch to state after one second, from a timer thread
+        """
+        timer = Timer(1, self._set_state, args=(state, ))
+        timer.start()
+
+    def dial(self, number):
+        print 'mock dialer dial:', number
+        self._later('outgoing')
+
+    def pickup(self):
+        print 'mock dialer pickup'
+        self._later('active')
+
+    def hangup(self):
+        print 'mock dialer hangup'
+        self._later('release')
+
+    def _set_state(self, state):
+        """print the new state, then store it and notify observers
+        """
+        print 'mock dialer', state
+        super(MockDialer, self)._set_state(state)
+
+
+def need_registered(real_f):
+    """method decorator: run real_f only while self.registered is
+    true, otherwise return False without calling it.
+    """
+    from functools import wraps
+
+    # wraps keeps the name and docstring of the decorated method
+    @wraps(real_f)
+    def decorator(self, *args, **kwargs):
+        if not self.registered:
+            return False
+        return real_f(self, *args, **kwargs)
+    return decorator
+
+
+def need_initialized(real_f):
+    """method decorator: run real_f only while self.initialized is
+    true, otherwise return False without calling it.
+    """
+    from functools import wraps
+
+    # wraps keeps the name and docstring of the decorated method
+    @wraps(real_f)
+    def decorator(self, *args, **kwargs):
+        if not self.initialized:
+            return False
+        return real_f(self, *args, **kwargs)
+    return decorator
+
+
+class FSODialer(Dialer):
+    """A Dialer implementation based on fso dbus API.
+
+    The connection to fso is set up by a background thread; until it
+    has registered to the network, dial, hangup and pickup return
+    False and call status signals are ignored.
+    """
+    def __init__(self):
+        super(FSODialer, self).__init__()
+        self.callid = None
+        self.initialized = False
+        self.registered = False
+        self.fso = FSOObject()
+        # FIXME: get state from fso and set self state
+        # publish the default state before the worker starts, so it
+        # can never overwrite a state reported by a call status signal
+        self._set_state('release')
+        from threading import Thread
+        # put the slow init function into background
+        Thread(target=self._init).start()
+
+    def _init(self):
+        """initialize fso, hook up the call status callback and
+        register to the network.  return True on success, None when
+        one of the steps fails.
+        """
+        if not self.fso.initialize():
+            return
+        self.fso.onCallStatus.append(self.on_call_status)
+        self.initialized = True
+        if not self._register():
+            return
+        self.registered = True
+        return True
+
+    def _register(self):
+        """switch the antenna on and register when the network
+        status says 'unregistered'.  return False on a dbus error,
+        True otherwise.
+        """
+        try:
+            # GetStatus is a dbus call too, so it belongs inside the
+            # handler together with the registration itself
+            status = self.fso.gsm_network_iface.GetStatus()
+            if status['registration'] == 'unregistered':
+                self.fso.gsm_device_iface.SetAntennaPower(True)
+                self.fso.gsm_network_iface.Register()
+                print 'registered'
+        except DBusException, e:
+            # FIXME pin number?
+            print e
+            self.registered = False
+            return False
+        return True
+
+    @need_registered
+    def dial(self, number):
+        self.fso.gsm_call_iface.Initiate(number, "voice")
+
+    @need_registered
+    def hangup(self):
+        self.fso.gsm_call_iface.Release(self.callid)
+
+    @need_registered
+    def pickup(self):
+        self.fso.gsm_call_iface.Activate(self.callid)
+
+    @need_registered
+    def on_call_status(self, id, status, properties):
+        """callback for the fso call status signal
+        """
+        # store the call id first: observers notified by _set_state
+        # may call pickup or hangup right away and need the new id
+        self.callid = id
+        self._set_state(status)
+
+
+def _test():
+    """create a FSODialer and enter the ecore main loop
+    """
+    import ecore
+    dialer = FSODialer()
+    ecore.main_loop_begin()
+
+
+# manual test when run as a script
+if __name__ == "__main__":
+    _test()
Added: developers/john_lee/pyfso/fso_backend.py
===================================================================
--- developers/john_lee/pyfso/fso_backend.py (rev 0)
+++ developers/john_lee/pyfso/fso_backend.py 2008-07-15 14:03:32 UTC (rev 4527)
@@ -0,0 +1,185 @@
+import e_dbus
+import os
+from dbus import DBusException, SystemBus, Interface
+from general import Singleton
+
+class FSOObject(Singleton):
+    """ based on the code from zhone but it's a singleton class now.
+
+    initialize() connects to the system bus, creates the proxies and
+    interfaces and connects the dbus signals.  Every signal is passed
+    on, with keyword arguments, to the callables stored in the
+    matching on* list.
+
+    NOTE(review): the indentation of this file was lost in the
+    archived patch; the nesting of the device section in initialize()
+    was reconstructed to follow the usage and gsm sections -- confirm
+    against the repository.
+    """
+
+    # display brightness set when the idle notifier reports a state
+    _BRIGHTNESS_BY_IDLE_STATE = { "BUSY": 90, "IDLE_DIM": 20, "IDLE_PRELOCK": 0 }
+
+    def __init__( self ):
+        # Singleton returns the same instance for every FSOObject()
+        # and python runs __init__ on it each time.  return early so
+        # that registered callbacks and created proxies are kept.
+        if getattr( self, '_constructed', False ):
+            return
+        self._constructed = True
+
+        self.objects = {}
+        self.onResourceChanged = []
+        self.onCallStatus = []
+        self.onNetworkStatus = []
+        self.onIdleStateChanged = []
+        self.onSignalStrength = []
+        self.ignoreSuspend = False
+
+        self.bus = None
+        self.fw = None
+        self.framework_obj = None
+        self.usage_obj = None
+        self.usage_iface = None
+        self.gsm_device_obj = None
+        self.gsm_device_iface = None
+        self.gsm_sim_iface = None
+        self.gsm_network_iface = None
+        self.gsm_call_iface = None
+        self.gsm_test_iface = None
+        self.device_obj = None
+        self.device_iface = None
+        self.device_power_obj = None
+        self.device_power_iface = None
+        self.idlenotifier_obj = None
+        self.idlenotifier_iface = None
+        self.inputnotifier_obj = None
+        self.inputnotifier_iface = None
+        self.display_obj = None
+        self.display_iface = None
+
+        self.fullinit = False
+
+    def tryGetProxy( self, busname, objname ):
+        """return the proxy for objname on busname, from the cache
+        when it was fetched before.  return None, after printing the
+        error, when the proxy cannot be created.
+        """
+        key = "%s:%s" % ( busname, objname )
+        obj = None
+        try:
+            obj = self.objects[ key ]
+        except KeyError:
+            try:
+                obj = self.bus.get_object( busname, objname )
+            except DBusException, e:
+                print "could not create proxy for %s:%s" % ( busname, objname ), e
+            else:
+                self.objects[ key ] = obj
+        return obj
+
+    def _tryGetFirstProxy( self, busname, interface ):
+        """return the proxy of the first object the framework lists
+        for interface, or None when it lists none.
+        """
+        paths = self.fw.ListObjectsByInterface( interface )
+        if not paths:
+            print "no object found for", interface
+            return None
+        return self.tryGetProxy( busname, paths[0] )
+
+    def initialize( self ):
+        """connect to the bus and set up all proxies, interfaces and
+        signal handlers.  return True when everything is available,
+        False otherwise; may be called again to retry.
+        """
+        if self.fullinit:
+            return True
+        try:
+            self.bus = SystemBus( mainloop=e_dbus.DBusEcoreMainLoop() )
+        except DBusException, e:
+            print "could not connect to dbus_object system bus:", e
+            return False
+
+        # Framework
+        fw_obj = self.tryGetProxy( 'org.freesmartphone.frameworkd', '/org/freesmartphone/Framework' )
+        if fw_obj is None:
+            print ( "could not connect to org.freesmartphone.frameworkd -- is the framework daemon started?" )
+            return False
+        self.fw = Interface( fw_obj, "org.freesmartphone.Objects" )
+        failcount = 0
+
+        # Usage
+        self.usage_obj = self.tryGetProxy( 'org.freesmartphone.ousaged', '/org/freesmartphone/Usage' )
+        if ( self.usage_obj is not None ) and ( self.usage_iface is None ):
+            self.usage_iface = Interface(self.usage_obj, 'org.freesmartphone.Usage')
+            self.usage_iface.connect_to_signal( "ResourceChanged", self.cbResourceChanged )
+            self.usage_iface.RequestResource("GSM")
+        if self.usage_obj is None:
+            failcount += 1
+        else:
+            print "usage ok", self.usage_iface
+
+        # Phone
+        self.gsm_device_obj = self.tryGetProxy( 'org.freesmartphone.ogpsd', '/org/freesmartphone/GSM/Device' )
+        print self.gsm_device_obj
+
+        if ( self.gsm_device_obj is not None ) and ( self.gsm_device_iface is None ):
+            print "creating gsm interfaces"
+            self.gsm_device_iface = Interface(self.gsm_device_obj, 'org.freesmartphone.GSM.Device')
+            self.gsm_sim_iface = Interface(self.gsm_device_obj, 'org.freesmartphone.GSM.SIM')
+            self.gsm_network_iface = Interface(self.gsm_device_obj, 'org.freesmartphone.GSM.Network')
+            self.gsm_call_iface = Interface(self.gsm_device_obj, 'org.freesmartphone.GSM.Call')
+            self.gsm_test_iface = Interface(self.gsm_device_obj, 'org.freesmartphone.GSM.Test')
+            self.gsm_call_iface.connect_to_signal( "CallStatus", self.cbCallStatus )
+            self.gsm_network_iface.connect_to_signal( "Status", self.cbNetworkStatus )
+            self.gsm_network_iface.connect_to_signal('SignalStrength', self.cbSignalStrength)
+        if self.gsm_device_obj is None:
+            failcount += 1
+        else:
+            print "gsm ok", self.gsm_network_iface
+
+        # Device
+        self.device_obj = self.tryGetProxy( 'org.freesmartphone.odeviced', '/org/freesmartphone/Device' )
+        if ( self.device_obj is not None ) and ( self.device_iface is None ):
+            self.device_iface = Interface( self.device_obj, 'org.freesmartphone.Device' )
+
+            # each sub-object is optional: a missing one is skipped
+            # instead of aborting the whole initialization
+            self.device_power_obj = self._tryGetFirstProxy( "org.freesmartphone.odeviced", "org.freesmartphone.Device.PowerSupply" )
+            if self.device_power_obj is not None:
+                self.device_power_iface = Interface(self.device_power_obj, 'org.freesmartphone.Device.PowerSupply')
+
+            self.idlenotifier_obj = self._tryGetFirstProxy( "org.freesmartphone.odeviced", "org.freesmartphone.Device.IdleNotifier" )
+            if self.idlenotifier_obj is not None:
+                self.idlenotifier_iface = Interface( self.idlenotifier_obj, "org.freesmartphone.Device.IdleNotifier" )
+                self.idlenotifier_iface.connect_to_signal( "State", self.cbIdleStateChanged )
+
+            self.inputnotifier_obj = self.tryGetProxy( "org.freesmartphone.odeviced", "/org/freesmartphone/Device/Input" )
+            if self.inputnotifier_obj is not None:
+                self.inputnotifier_iface = Interface( self.inputnotifier_obj, "org.freesmartphone.Device.Input" )
+                self.inputnotifier_iface.connect_to_signal( "Event", self.cbEvent )
+
+            print "displays:", self.fw.ListObjectsByInterface( "org.freesmartphone.Device.Display" )
+            self.display_obj = self._tryGetFirstProxy( "org.freesmartphone.odeviced", "org.freesmartphone.Device.Display" )
+            if self.display_obj is not None:
+                self.display_iface = Interface( self.display_obj, "org.freesmartphone.Device.Display" )
+                self.display_iface.SetBrightness( 90 )
+        if self.device_obj is None:
+            failcount += 1
+        else:
+            print "device ok", self.device_iface
+
+        print "failcount=", failcount
+        if failcount == 0:
+            self.fullinit = True
+        return self.fullinit
+
+    def cbResourceChanged( self, resourcename ):
+        for cb in self.onResourceChanged:
+            cb( resourcename=resourcename )
+
+    def cbCallStatus( self, id, status, properties ):
+        print "CALL STATUS=", id, status, properties
+        if self.idlenotifier_iface is not None:
+            self.idlenotifier_iface.SetState("BUSY")
+        for cb in self.onCallStatus:
+            cb( id=id, status=status, properties=properties )
+
+    def cbNetworkStatus(self, status):
+        for cb in self.onNetworkStatus:
+            cb( status=status )
+
+    def cbIdleStateChanged( self, state ):
+        """forward the idle state and adjust the display brightness
+        for the states listed in _BRIGHTNESS_BY_IDLE_STATE.
+        """
+        print "IDLE STATE=", state
+        for cb in self.onIdleStateChanged:
+            cb( state=state )
+        brightness = self._BRIGHTNESS_BY_IDLE_STATE.get( state )
+        if ( brightness is not None ) and ( self.display_iface is not None ):
+            self.display_iface.SetBrightness( brightness )
+
+    def cbEvent( self, name, action, seconds ):
+        """handle an input event; pressing POWER suspends the device.
+        """
+        print "INPUT EVENT=", name, action, seconds
+        if name+action == "AUXpressed":
+            # FIXME launch transition to main screen
+            pass
+        elif name+action == "POWERpressed":
+            # FIXME should the kernel filter out the button event that triggerd the wakeup?
+            if self.ignoreSuspend: # we've just resumed
+                self.ignoreSuspend = False
+                return
+            self.ignoreSuspend = True
+            # the interfaces are None until initialize() found them
+            if self.gsm_device_iface is not None:
+                self.gsm_device_iface.PrepareForSuspend()
+            print "ENTERING SUSPEND"
+            os.system( "apm -s" )
+            print "RETURN FROM SUSPEND"
+            if self.gsm_device_iface is not None:
+                self.gsm_device_iface.RecoverFromSuspend()
+            if self.idlenotifier_iface is not None:
+                self.idlenotifier_iface.SetState("BUSY")
+
+    def cbSignalStrength(self, strength):
+        for cb in self.onSignalStrength:
+            cb (strength = strength)
+        print 'SignalStrength:', strength
+
+
+def _test():
+    """create the FSOObject and run its initialization once
+    """
+    FSOObject().initialize()
+
+
+# manual test when run as a script
+if __name__ == '__main__':
+    _test()
Added: developers/john_lee/pyfso/general.py
===================================================================
--- developers/john_lee/pyfso/general.py (rev 0)
+++ developers/john_lee/pyfso/general.py 2008-07-15 14:03:32 UTC (rev 4527)
@@ -0,0 +1,39 @@
+from __future__ import with_statement
+from threading import RLock
+
+class Subject(object):
+    """implementation of the observer pattern.  use _notify to notify all
+    observers about state change.  an observer is any object with an
+    update method.
+    """
+    def __init__(self):
+        self._observers_lock = RLock()
+        self._observers = []
+
+    def attach(self, observer):
+        """attach as an observer
+        """
+        with self._observers_lock:
+            self._observers.append(observer)
+
+    def detach(self, observer):
+        """detach from observers.  raise ValueError when observer is
+        not attached.
+        """
+        with self._observers_lock:
+            self._observers.remove(observer)
+
+    def _notify(self, *args, **kwargs):
+        """call update(*args, **kwargs) on every attached observer
+        """
+        # iterate over a snapshot: an observer may attach or detach
+        # from inside update(), which would make a loop over the live
+        # list skip entries
+        with self._observers_lock:
+            observers = list(self._observers)
+        for o in observers:
+            o.update(*args, **kwargs)
+
+
+class Singleton(object):
+    """base class giving every subclass exactly one instance:
+    constructing the class again returns the same object.
+
+    FIXME: the __init__ problem -- python still calls __init__ on
+    every construction, so the __init__ of a subclass runs again on
+    the shared instance.
+    """
+    __single_lock = RLock()
+    __single = None
+
+    def __new__(cls, *args, **kwargs):
+        with cls.__single_lock:
+            if cls != type(cls.__single):
+                # subclass can create their own
+                # the constructor arguments are for __init__ only;
+                # object.__new__ does not accept them
+                cls.__single = object.__new__(cls)
+            return cls.__single
More information about the commitlog
mailing list