r4527 - developers/john_lee/pyfso

john_lee at docs.openmoko.org john_lee at docs.openmoko.org
Tue Jul 15 16:03:33 CEST 2008


Author: john_lee
Date: 2008-07-15 16:03:32 +0200 (Tue, 15 Jul 2008)
New Revision: 4527

Added:
   developers/john_lee/pyfso/__init__.py
   developers/john_lee/pyfso/accelerometer.py
   developers/john_lee/pyfso/dialer.py
   developers/john_lee/pyfso/fso_backend.py
   developers/john_lee/pyfso/general.py
Log:
stuffs abstracted from another project trying to create a python
binding for fso dbus api.

* accelerometer.py  needs to be integrated into frameworkd

* dialer.py  a simple dialer implementation that depends on
  fso_backend.py

* fso_backend.py   a dbus wrapper.  the biggest problem is that it
  uses main loop from e_dbus.

* general.py some general base classes.  Singleton pattern still got
  bug.

Added: developers/john_lee/pyfso/__init__.py
===================================================================
--- developers/john_lee/pyfso/__init__.py	                        (rev 0)
+++ developers/john_lee/pyfso/__init__.py	2008-07-15 14:03:32 UTC (rev 4527)
@@ -0,0 +1,4 @@
+from accelerometer import Accelerometer, MockAccelerometer, \
+    InputDevAccelerometer, Gta02Accelerometer, get_xy_theta
+
+from dialer import Dialer, MockDialer, FSODialer

Added: developers/john_lee/pyfso/accelerometer.py
===================================================================
--- developers/john_lee/pyfso/accelerometer.py	                        (rev 0)
+++ developers/john_lee/pyfso/accelerometer.py	2008-07-15 14:03:32 UTC (rev 4527)
@@ -0,0 +1,274 @@
+"""
+Accelerometer class.  It can run in two different modes.  In pull
+mode, it accepts read request then retrieve accelerometer values.  In
+daemon mode, it updates itself as often as possible, notifies the
+observers about value changes and answers request with its current
+state.
+
+(C) 2008 John Lee <john_lee at openmoko.com>
+(C) 2008 Openmoko, Inc.
+GPLv2 or later
+"""
+from __future__ import with_statement
+from general import Subject
+
+import math
+import os
+import struct
+from threading import RLock
+from time import sleep
+
+class Accelerometer(Subject):
+    """The base class of all accelerometers.
+    """
+
+    def __new__(cls, *args, **kwargs):
+        cls.sample_rate = property(cls._get_sample_rate, cls._set_sample_rate)
+        obj = object.__new__(cls, *args, **kwargs)
+        return obj
+
+    def __init__(self, device, sample_rate=None):
+        """device: the name of the input
+        sample_rate: how many samples per second
+        """
+        super(Accelerometer, self).__init__()
+        self._statelock = RLock()
+        self._state = (0, -1000, 0)
+        self._open_device(device)
+        self._daemonized = False
+        if sample_rate is not None:
+            self.sample_rate = sample_rate
+
+    def _open_device(self, device):
+        raise NotImplementedError
+
+    def _get_sample_rate(self):
+        """get sample rate
+        """
+        raise NotImplementedError
+
+    def _set_sample_rate(self, sample_rate):
+        """set sample rate
+        """
+        raise NotImplementedError
+
+    def _retrieve(self):
+        """read and parse the current values.  could be time
+        consuming.
+        """
+        raise NotImplementedError
+
+    @property
+    def state(self):
+        """return the current state
+        """
+        if self._daemonized:
+            with self._statelock:
+                return self._state
+        else:
+            state = self._retrieve()
+            with self._statelock:
+                self._state = state
+                return self._state
+
+    def daemonize(self):
+        """a infinite loop to update self.state.  will notify
+        observers about state changes.
+        """
+        self._daemonized = True
+        while self._daemonized:
+            state = self._retrieve()
+            with self._statelock:
+                self._state = state
+                self._notify(self._state)
+
+    def stop(self):
+        """stop daemon
+        """
+        self._daemonized = False
+
+
+class MockAccelerometer(Accelerometer):
+    """Mock accelerometer class.
+    >>> g = MockAccelerometer()
+    >>> g.sample_rate
+    100
+    >>> g.sample_rate = 400
+    >>> g.sample_rate
+    400
+    """
+
+    def __init__(self):
+        super(MockAccelerometer, self).__init__(None)
+        self.sample_rate = 100
+
+    def _open_device(self, device):
+        pass
+
+    def _set_sample_rate(self, sample_rate):
+        self._sample_rate = sample_rate
+
+    def _get_sample_rate(self):
+        return self._sample_rate
+
+    def _retrieve(self):
+        sleep(1/self._sample_rate)
+        return (0, -1000, 0)
+
+
+class InputDevAccelerometer(Accelerometer):
+    """Read values from kernel input device
+    """
+
+    # Event types
+    EV_SYN = 0x00
+    EV_KEY = 0x01
+    EV_REL = 0x02
+    EV_ABS = 0x03
+    EV_MSC = 0x04
+    EV_SW = 0x05
+    EV_LED = 0x11
+    EV_SND = 0x12
+    EV_REP = 0x14
+    EV_FF = 0x15
+    EV_PWR = 0x16
+    EV_FF = 0x17
+    EV_MAX = 0x1f
+    EV_CNT = (EV_MAX+1)
+
+    # Relative axes
+    REL_X = 0x00
+    REL_Y = 0x01
+    REL_Z = 0x02
+    REL_RX = 0x03
+    REL_RY = 0x04
+    REL_RZ = 0x05
+    REL_HWHEEL = 0x06
+    REL_DIAL = 0x07
+    REL_WHEEL = 0x08
+    REL_MISC = 0x09
+    REL_MAX = 0x0f
+    REL_CNT = REL_MAX + 1
+
+    input_event_struct = "@llHHi"
+    input_event_size = struct.calcsize(input_event_struct)
+
+    def __init__(self, *args, **kwargs):
+        super(InputDevAccelerometer, self).__init__(*args, **kwargs)
+
+    def _open_device(self, device):
+        self.device = os.open(device, os.O_RDONLY | os.O_SYNC)
+
+    def _unpack(self):
+        """struct input_event {
+	struct timeval time; /* (long, long) */
+	__u16 type;
+	__u16 code;
+	__s32 value;
+        };
+        return (tv_sec, tv_usec, type, code, value)
+        """
+        for i in xrange(0, 5):
+            data = os.read(self.device, InputDevAccelerometer.input_event_size)
+            if len(data) >= InputDevAccelerometer.input_event_size:
+                break;
+        else:
+            raise Exception()
+
+        return struct.unpack(InputDevAccelerometer.input_event_struct, data)
+
+    def _unpack_xyz(self):
+        """return a 3 tuple
+        """
+        # wait for EV_SYN
+        while self._unpack()[2] != InputDevAccelerometer.EV_SYN:
+            pass
+        # now return (x, y, z)
+        return (self._unpack()[4], self._unpack()[4], self._unpack()[4])
+
+
+class Gta02Accelerometer(InputDevAccelerometer):
+    """Read values from gta02.  for now we use just one.
+    >>> g = Gta02Accelerometer()
+    >>> g.sample_rate = 400
+    >>> g.sample_rate
+    400
+    >>> g.sample_rate = 100
+    >>> g.sample_rate
+    100
+    """
+
+    INPUT_DEV = '/dev/input/event3'
+    SYS_SAMPLE_RATE = '/sys/devices/platform/spi_s3c24xx_gpio.1/spi0.1/sample_rate'
+
+    def __init__(self, device=None, sample_rate=None):
+        if device == None:
+            device = Gta02Accelerometer.INPUT_DEV
+        super(Gta02Accelerometer, self).__init__(device, sample_rate)
+
+    def _set_sample_rate(self, sample_rate):
+        """possible values: 100, 400
+        """
+        f = open(Gta02Accelerometer.SYS_SAMPLE_RATE, 'w', 0)
+        f.write('%d\n' % sample_rate)
+        f.close()
+
+    def _get_sample_rate(self):
+        f = open(Gta02Accelerometer.SYS_SAMPLE_RATE, 'r', 0)
+        sample_rate = int(f.read())
+        f.close()
+        return sample_rate
+
+    def _retrieve(self):
+        return self._unpack_xyz()
+
+
+# shamelessly stole from olv
+def get_xy_theta(u):
+    """get the angle related to (0, -1), clockwise.
+    return 0 < theta < 2pi.
+    >>> get_xy_theta((0, -1000, 0))
+    0.0
+    """
+    uu = u[0] * u[0] + u[1] * u[1]
+    theta = math.acos(-u[1] / math.sqrt(uu))
+    if (u[0] < 0):
+        theta = 2 * math.pi - theta
+    return theta
+
+
+class DumpObserver(object):
+    """Does nothing but print the subject state when notified.
+    """
+
+    def __init__(self, subject=None):
+        if subject == None:
+            subject = MockAccelerometer()
+        subject.attach(self)
+        self.subject = subject
+
+    def update(self, state):
+        t = get_xy_theta(state)
+        print 'x = %d, y = %d, z = %d, theta = %f pi' % (state + (t / math.pi, ))
+
+
+def dump():
+    try:
+        gsensor = Gta02Accelerometer(sample_rate=400)
+    except OSError:
+        gsensor = MockAccelerometer()
+    DumpObserver(gsensor)
+    gsensor.daemonize()
+
+
+def _doctest():
+    try:
+        import doctest
+    except ImportError:
+        return
+    else:
+        doctest.testmod()
+
+
+if __name__ == '__main__':
+    _doctest()

Added: developers/john_lee/pyfso/dialer.py
===================================================================
--- developers/john_lee/pyfso/dialer.py	                        (rev 0)
+++ developers/john_lee/pyfso/dialer.py	2008-07-15 14:03:32 UTC (rev 4527)
@@ -0,0 +1,139 @@
+from general import Subject
+from fso_backend import FSOObject
+from dbus import DBusException
+from threading import Timer
+
+class Dialer(Subject):
+    """dial, pickup and hangup should be called by controller.  state
+    change like 'incoming', 'active', 'outgoing' and 'release' should
+    be set directly to dialer.state
+    """
+    def __init__(self):
+        Subject.__init__(self)
+
+    def dial(self, number):
+        raise NotImplementedError
+
+    def pickup(self):
+        raise NotImplementedError
+
+    def hangup(self):
+        raise NotImplementedError
+
+    # make this read only to hint this should not be changed from
+    # outside.
+    @property
+    def state(self):
+        return self._state
+
+    def _set_state(self, state):
+        self._state = state
+        self._notify(state)
+
+
+class MockDialer(Dialer):
+    def __init__(self):
+        """initiate an incoming call first
+        """
+        super(MockDialer, self).__init__()
+        self._set_state('incoming')
+
+    def dial(self, number):
+        print 'mock dialer dial:', number
+        Timer(1, self._set_state, args=('outgoing', )).start()
+
+    def pickup(self):
+        print 'mock dialer pickup'
+        Timer(1, self._set_state, args=('active', )).start()
+
+    def hangup(self):
+        print 'mock dialer hangup'
+        Timer(1, self._set_state, args=('release', )).start()
+
+    def _set_state(self, state):
+        print 'mock dialer', state
+        super(MockDialer, self)._set_state(state)
+
+
+def need_registered(real_f):
+    def decorator(self, *args, **kwargs):
+        if self.registered:
+            return real_f(self, *args, **kwargs)
+        else:
+            return False
+    return decorator
+
+
+def need_initialized(real_f):
+    def decorator(self, *args, **kwargs):
+        if self.initialized:
+            return real_f(self, *args, **kwargs)
+        else:
+            return False
+    return decorator
+
+
+class FSODialer(Dialer):
+    """A Dialer implementation based on fso dbus API.
+    """
+    def __init__(self):
+        super(FSODialer, self).__init__()
+        self.callid = None
+        self.initialized = False
+        self.registered = False
+        self.fso = FSOObject()
+        from threading import Thread
+        # put the slow init function into background
+        Thread(target=self._init).start()
+        # FIXME: get state from fso and set self state
+        self._set_state('release')
+
+    def _init(self):
+        if not self.fso.initialize():
+            return
+        self.fso.onCallStatus.append(self.on_call_status)
+        self.initialized = True
+        if not self._register():
+            return
+        self.registered = True
+        return True
+
+    def _register(self):
+        if self.fso.gsm_network_iface.GetStatus()['registration'] == 'unregistered':
+            try:
+                self.fso.gsm_device_iface.SetAntennaPower(True)
+                self.fso.gsm_network_iface.Register()
+                print 'registered'
+            except DBusException, e:
+                # FIXME pin number?
+                print e
+                self.registered = False
+                return False
+        return True
+
+    @need_registered
+    def dial(self, number):
+        self.fso.gsm_call_iface.Initiate(number, "voice")
+
+    @need_registered
+    def hangup(self):
+        self.fso.gsm_call_iface.Release(self.callid)
+
+    @need_registered
+    def pickup(self):
+        self.fso.gsm_call_iface.Activate(self.callid)
+
+    @need_registered
+    def on_call_status(self, id, status, properties):
+        self._set_state(status)
+        self.callid = id
+
+
+def _test():
+    import ecore
+    FSODialer()
+    ecore.main_loop_begin()
+
+
+if __name__ == "__main__":
+    _test()

Added: developers/john_lee/pyfso/fso_backend.py
===================================================================
--- developers/john_lee/pyfso/fso_backend.py	                        (rev 0)
+++ developers/john_lee/pyfso/fso_backend.py	2008-07-15 14:03:32 UTC (rev 4527)
@@ -0,0 +1,185 @@
+import e_dbus
+import os
+from dbus import DBusException, SystemBus, Interface
+from general import Singleton
+
+class FSOObject(Singleton):
+    """ based on the code from zhone but it's a singleton class now.
+    """
+
+    def __init__( self ):
+        self.objects = {}
+        self.onResourceChanged = []
+        self.onCallStatus = []
+        self.onNetworkStatus = []
+        self.onIdleStateChanged = []
+        self.onSignalStrength = []
+        self.ignoreSuspend = False
+
+        self.framework_obj = None
+        self.gsm_device_obj = None
+        self.gsm_device_iface = None
+        self.usage_iface = None
+        self.device_iface = None
+        self.device_power_iface = None
+        self.idlenotifier_obj = None
+        self.idlenotifier_iface = None
+        self.inputnotifier_obj = None
+        self.inputnotifier_iface = None
+        self.display_obj = None
+        self.display_iface = None
+
+        self.fullinit = False
+
+    def tryGetProxy( self, busname, objname ):
+        obj = None
+        try:
+            obj = self.objects[ "%s:%s" % ( busname, objname ) ]
+        except KeyError:
+            try:
+                obj = self.bus.get_object( busname, objname )
+            except DBusException, e:
+                print "could not create proxy for %s:%s" % ( busname, objname ), e
+            else:
+                self.objects[ "%s:%s" % ( busname, objname ) ] = obj
+        return obj
+
+    def initialize( self ):
+        if self.fullinit:
+            return True
+        try:
+            self.bus = SystemBus( mainloop=e_dbus.DBusEcoreMainLoop() )
+        except DBusException, e:
+            print "could not connect to dbus_object system bus:", e
+            return False
+
+        # Framework
+        fw_obj = self.tryGetProxy( 'org.freesmartphone.frameworkd', '/org/freesmartphone/Framework' )
+        if fw_obj is None:
+            print ( "could not connect to org.freesmartphone.frameworkd -- is the framework daemon started?" )
+            return False
+        else:
+            self.fw = Interface( fw_obj, "org.freesmartphone.Objects" )
+        failcount = 0
+
+        # Usage
+        self.usage_obj = self.tryGetProxy( 'org.freesmartphone.ousaged', '/org/freesmartphone/Usage' )
+        if ( self.usage_obj is not None ) and ( self.usage_iface is None ):
+            self.usage_iface = Interface(self.usage_obj, 'org.freesmartphone.Usage')
+            self.usage_iface.connect_to_signal( "ResourceChanged", self.cbResourceChanged )
+            self.usage_iface.RequestResource("GSM")
+        if self.usage_obj is None:
+            failcount += 1
+        else:
+            print "usage ok", self.usage_iface
+
+        # Phone
+        self.gsm_device_obj = self.tryGetProxy( 'org.freesmartphone.ogpsd', '/org/freesmartphone/GSM/Device' )
+        print self.gsm_device_obj
+
+        if ( self.gsm_device_obj is not None ) and ( self.gsm_device_iface is None ):
+            print "creating gsm interfaces"
+            self.gsm_device_iface = Interface(self.gsm_device_obj, 'org.freesmartphone.GSM.Device')
+            self.gsm_sim_iface = Interface(self.gsm_device_obj, 'org.freesmartphone.GSM.SIM')
+            self.gsm_network_iface = Interface(self.gsm_device_obj, 'org.freesmartphone.GSM.Network')
+            self.gsm_call_iface = Interface(self.gsm_device_obj, 'org.freesmartphone.GSM.Call')
+            self.gsm_test_iface = Interface(self.gsm_device_obj, 'org.freesmartphone.GSM.Test')
+            self.gsm_call_iface.connect_to_signal( "CallStatus", self.cbCallStatus )
+            self.gsm_network_iface.connect_to_signal( "Status", self.cbNetworkStatus )
+            self.gsm_network_iface.connect_to_signal('SignalStrength', self.cbSignalStrength)
+        if self.gsm_device_obj is None:
+            failcount += 1
+        else:
+            print "gsm ok", self.gsm_network_iface
+
+        self.device_obj = self.tryGetProxy( 'org.freesmartphone.odeviced', '/org/freesmartphone/Device' )
+        if ( self.device_obj is not None ) and ( self.device_iface is None ):
+            self.device_iface = Interface( self.device_obj, 'org.freesmartphone.Device' )
+
+            self.device_power_obj = self.tryGetProxy( "org.freesmartphone.odeviced", self.fw.ListObjectsByInterface( "org.freesmartphone.Device.PowerSupply" )[0] )
+            self.device_power_iface = Interface(self.device_power_obj, 'org.freesmartphone.Device.PowerSupply')
+
+            self.idlenotifier_obj = self.tryGetProxy( "org.freesmartphone.odeviced", self.fw.ListObjectsByInterface( "org.freesmartphone.Device.IdleNotifier" )[0] )
+
+            self.idlenotifier_iface = Interface( self.idlenotifier_obj, "org.freesmartphone.Device.IdleNotifier" )
+            self.idlenotifier_iface.connect_to_signal( "State", self.cbIdleStateChanged )
+
+            self.inputnotifier_obj = self.bus.get_object( "org.freesmartphone.odeviced", "/org/freesmartphone/Device/Input" )
+            self.inputnotifier_iface = Interface( self.inputnotifier_obj, "org.freesmartphone.Device.Input" )
+            self.inputnotifier_iface.connect_to_signal( "Event", self.cbEvent )
+
+            print "displays:", self.fw.ListObjectsByInterface( "org.freesmartphone.Device.Display" )
+            self.display_obj = self.tryGetProxy( "org.freesmartphone.odeviced", self.fw.ListObjectsByInterface( "org.freesmartphone.Device.Display" )[0] )
+            if self.display_obj is not None:
+                self.display_iface = Interface( self.display_obj, "org.freesmartphone.Device.Display" )
+                self.display_iface.SetBrightness( 90 )
+        if self.device_obj is None:
+            failcount += 1
+        else:
+            print "device ok", self.device_iface
+
+        print "failcount=", failcount
+        if failcount == 0:
+            self.fullinit = True
+        return self.fullinit
+
+    def cbResourceChanged( self, resourcename ):
+        for cb in self.onResourceChanged:
+            cb( resourcename=resourcename )
+
+    def cbCallStatus( self, id, status, properties ):
+        print "CALL STATUS=", id, status, properties
+        if self.idlenotifier_iface is not None:
+            self.idlenotifier_iface.SetState("BUSY")
+        for cb in self.onCallStatus:
+            cb( id=id, status=status, properties=properties )
+
+    def cbNetworkStatus(self, status):
+        for cb in self.onNetworkStatus:
+            cb( status=status )
+
+    def cbIdleStateChanged( self, state ):
+        print "IDLE STATE=", state
+        for cb in self.onIdleStateChanged:
+            cb( state=state )
+        if state == "BUSY":
+            if self.display_iface is not None:
+                self.display_iface.SetBrightness( 90 )
+        elif state == "IDLE_DIM":
+            if self.display_iface is not None:
+                self.display_iface.SetBrightness( 20 )
+        elif state == "IDLE_PRELOCK":
+            if self.display_iface is not None:
+                self.display_iface.SetBrightness( 0 )
+
+    def cbEvent( self, name, action, seconds ):
+        print "INPUT EVENT=", name, action, seconds
+        if name+action == "AUXpressed":
+            # FIXME launch transition to main screen
+            pass
+        elif name+action == "POWERpressed":
+            # FIXME should the kernel filter out the button event that triggerd the wakeup?
+            if self.ignoreSuspend: # we've just resumed
+                self.ignoreSuspend = False
+                return
+            self.ignoreSuspend = True
+            self.gsm_device_iface.PrepareForSuspend()
+            print "ENTERING SUSPEND"
+            os.system( "apm -s" )
+            print "RETURN FROM SUSPEND"
+            self.gsm_device_iface.RecoverFromSuspend()
+            self.idlenotifier_iface.SetState("BUSY")
+
+    def cbSignalStrength(self, strength):
+        for cb in self.onSignalStrength:
+            cb (strength = strength)
+        print 'SignalStrength:', strength
+
+
+def _test():
+    fso = FSOObject()
+    fso.initialize()
+
+
+if __name__ == '__main__':
+    _test()

Added: developers/john_lee/pyfso/general.py
===================================================================
--- developers/john_lee/pyfso/general.py	                        (rev 0)
+++ developers/john_lee/pyfso/general.py	2008-07-15 14:03:32 UTC (rev 4527)
@@ -0,0 +1,39 @@
+from __future__ import with_statement
+from threading import RLock
+
+class Subject(object):
+    """implementation of the observer pattern.  use _notify to notify all
+    observers about state change.
+    """
+    def __init__(self):
+        self._observers_lock = RLock()
+        self._observers = []
+
+    def attach(self, observer):
+        """attach as an observer
+        """
+        with self._observers_lock:
+            self._observers.append(observer)
+
+    def detach(self, observer):
+        """detach from observers
+        """
+        with self._observers_lock:
+            self._observers.remove(observer)
+
+    def _notify(self, *args, **kwargs):
+        with self._observers_lock:
+            for o in self._observers:
+                o.update(*args, **kwargs)
+
+
+class Singleton(object):
+    __single_lock = RLock()
+    __single = None
+    def __new__(cls, *args, **kwargs):
+        with cls.__single_lock:
+            if cls != type(cls.__single):
+                # subclass can create their own
+                cls.__single = object.__new__(cls, *args, **kwargs)
+            return cls.__single
+    #FIXME: the __init__ problem





More information about the commitlog mailing list