r4537 - developers/john_lee/pyfso
john_lee at docs.openmoko.org
john_lee at docs.openmoko.org
Fri Jul 18 19:20:10 CEST 2008
Author: john_lee
Date: 2008-07-18 19:20:10 +0200 (Fri, 18 Jul 2008)
New Revision: 4537
Modified:
developers/john_lee/pyfso/__init__.py
developers/john_lee/pyfso/accelerometer.py
developers/john_lee/pyfso/dialer.py
developers/john_lee/pyfso/fso_backend.py
developers/john_lee/pyfso/general.py
Log:
* fso backend cope with new odeviced accelerometer change
* accelerometer now cope with fso backend
* put the responsibility of FSOObject initialization into user
* update singleton implementation. we don't use it anywhere for now.
* update tests.
Modified: developers/john_lee/pyfso/__init__.py
===================================================================
--- developers/john_lee/pyfso/__init__.py 2008-07-18 17:19:55 UTC (rev 4536)
+++ developers/john_lee/pyfso/__init__.py 2008-07-18 17:20:10 UTC (rev 4537)
@@ -1,3 +1,9 @@
+"""
+
+(C) 2008 John Lee <john_lee at openmoko.com>
+(C) 2008 Openmoko, Inc.
+GPLv2 or later
+"""
from accelerometer import Accelerometer, MockAccelerometer, \
InputDevAccelerometer, Gta02Accelerometer, get_xy_theta
Modified: developers/john_lee/pyfso/accelerometer.py
===================================================================
--- developers/john_lee/pyfso/accelerometer.py 2008-07-18 17:19:55 UTC (rev 4536)
+++ developers/john_lee/pyfso/accelerometer.py 2008-07-18 17:20:10 UTC (rev 4537)
@@ -1,229 +1,24 @@
"""
-Accelerometer class. It can run in two different modes. In pull
-mode, it accepts read request then retrieve accelerometer values. In
-daemon mode, it updates itself as often as possible, notifies the
-observers about value changes and answers request with its current
-state.
-
+Get accelerometer signals from dbus and parse them.
(C) 2008 John Lee <john_lee at openmoko.com>
(C) 2008 Openmoko, Inc.
GPLv2 or later
"""
-from __future__ import with_statement
-from general import Subject
-
+from fso_backend import FSOObject
+from general import Subject, DumpObserver
import math
-import os
-import struct
-from threading import RLock
-from time import sleep
-class Accelerometer(Subject):
- """The base class of all accelerometers.
- """
+class FSOAccelerometer(Subject):
+ def __init__(self, fso_obj):
+ super(FSOAccelerometer, self).__init__()
+ self.fso = fso_obj
+ self.fso.onAccelerometer.append(self.on_accelerometer)
- def __new__(cls, *args, **kwargs):
- cls.sample_rate = property(cls._get_sample_rate, cls._set_sample_rate)
- obj = object.__new__(cls, *args, **kwargs)
- return obj
+ def on_accelerometer(self, x, y, z):
+ self._notify(x, y, z)
- def __init__(self, device, sample_rate=None):
- """device: the name of the input
- sample_rate: how many samples per second
- """
- super(Accelerometer, self).__init__()
- self._statelock = RLock()
- self._state = (0, -1000, 0)
- self._open_device(device)
- self._daemonized = False
- if sample_rate is not None:
- self.sample_rate = sample_rate
- def _open_device(self, device):
- raise NotImplementedError
-
- def _get_sample_rate(self):
- """get sample rate
- """
- raise NotImplementedError
-
- def _set_sample_rate(self, sample_rate):
- """set sample rate
- """
- raise NotImplementedError
-
- def _retrieve(self):
- """read and parse the current values. could be time
- consuming.
- """
- raise NotImplementedError
-
- @property
- def state(self):
- """return the current state
- """
- if self._daemonized:
- with self._statelock:
- return self._state
- else:
- state = self._retrieve()
- with self._statelock:
- self._state = state
- return self._state
-
- def daemonize(self):
- """a infinite loop to update self.state. will notify
- observers about state changes.
- """
- self._daemonized = True
- while self._daemonized:
- state = self._retrieve()
- with self._statelock:
- self._state = state
- self._notify(self._state)
-
- def stop(self):
- """stop daemon
- """
- self._daemonized = False
-
-
-class MockAccelerometer(Accelerometer):
- """Mock accelerometer class.
- >>> g = MockAccelerometer()
- >>> g.sample_rate
- 100
- >>> g.sample_rate = 400
- >>> g.sample_rate
- 400
- """
-
- def __init__(self):
- super(MockAccelerometer, self).__init__(None)
- self.sample_rate = 100
-
- def _open_device(self, device):
- pass
-
- def _set_sample_rate(self, sample_rate):
- self._sample_rate = sample_rate
-
- def _get_sample_rate(self):
- return self._sample_rate
-
- def _retrieve(self):
- sleep(1/self._sample_rate)
- return (0, -1000, 0)
-
-
-class InputDevAccelerometer(Accelerometer):
- """Read values from kernel input device
- """
-
- # Event types
- EV_SYN = 0x00
- EV_KEY = 0x01
- EV_REL = 0x02
- EV_ABS = 0x03
- EV_MSC = 0x04
- EV_SW = 0x05
- EV_LED = 0x11
- EV_SND = 0x12
- EV_REP = 0x14
- EV_FF = 0x15
- EV_PWR = 0x16
- EV_FF = 0x17
- EV_MAX = 0x1f
- EV_CNT = (EV_MAX+1)
-
- # Relative axes
- REL_X = 0x00
- REL_Y = 0x01
- REL_Z = 0x02
- REL_RX = 0x03
- REL_RY = 0x04
- REL_RZ = 0x05
- REL_HWHEEL = 0x06
- REL_DIAL = 0x07
- REL_WHEEL = 0x08
- REL_MISC = 0x09
- REL_MAX = 0x0f
- REL_CNT = REL_MAX + 1
-
- input_event_struct = "@llHHi"
- input_event_size = struct.calcsize(input_event_struct)
-
- def __init__(self, *args, **kwargs):
- super(InputDevAccelerometer, self).__init__(*args, **kwargs)
-
- def _open_device(self, device):
- self.device = os.open(device, os.O_RDONLY | os.O_SYNC)
-
- def _unpack(self):
- """struct input_event {
- struct timeval time; /* (long, long) */
- __u16 type;
- __u16 code;
- __s32 value;
- };
- return (tv_sec, tv_usec, type, code, value)
- """
- for i in xrange(0, 5):
- data = os.read(self.device, InputDevAccelerometer.input_event_size)
- if len(data) >= InputDevAccelerometer.input_event_size:
- break;
- else:
- raise Exception()
-
- return struct.unpack(InputDevAccelerometer.input_event_struct, data)
-
- def _unpack_xyz(self):
- """return a 3 tuple
- """
- # wait for EV_SYN
- while self._unpack()[2] != InputDevAccelerometer.EV_SYN:
- pass
- # now return (x, y, z)
- return (self._unpack()[4], self._unpack()[4], self._unpack()[4])
-
-
-class Gta02Accelerometer(InputDevAccelerometer):
- """Read values from gta02. for now we use just one.
- >>> g = Gta02Accelerometer()
- >>> g.sample_rate = 400
- >>> g.sample_rate
- 400
- >>> g.sample_rate = 100
- >>> g.sample_rate
- 100
- """
-
- INPUT_DEV = '/dev/input/event3'
- SYS_SAMPLE_RATE = '/sys/devices/platform/spi_s3c24xx_gpio.1/spi0.1/sample_rate'
-
- def __init__(self, device=None, sample_rate=None):
- if device == None:
- device = Gta02Accelerometer.INPUT_DEV
- super(Gta02Accelerometer, self).__init__(device, sample_rate)
-
- def _set_sample_rate(self, sample_rate):
- """possible values: 100, 400
- """
- f = open(Gta02Accelerometer.SYS_SAMPLE_RATE, 'w', 0)
- f.write('%d\n' % sample_rate)
- f.close()
-
- def _get_sample_rate(self):
- f = open(Gta02Accelerometer.SYS_SAMPLE_RATE, 'r', 0)
- sample_rate = int(f.read())
- f.close()
- return sample_rate
-
- def _retrieve(self):
- return self._unpack_xyz()
-
-
-# shamelessly stole from olv
+# shamelessly stoled from olv
def get_xy_theta(u):
"""get the angle related to (0, -1), clockwise.
return 0 < theta < 2pi.
@@ -237,30 +32,6 @@
return theta
-class DumpObserver(object):
- """Does nothing but print the subject state when notified.
- """
-
- def __init__(self, subject=None):
- if subject == None:
- subject = MockAccelerometer()
- subject.attach(self)
- self.subject = subject
-
- def update(self, state):
- t = get_xy_theta(state)
- print 'x = %d, y = %d, z = %d, theta = %f pi' % (state + (t / math.pi, ))
-
-
-def dump():
- try:
- gsensor = Gta02Accelerometer(sample_rate=400)
- except OSError:
- gsensor = MockAccelerometer()
- DumpObserver(gsensor)
- gsensor.daemonize()
-
-
def _doctest():
try:
import doctest
@@ -270,5 +41,19 @@
doctest.testmod()
+def _test():
+ from dbus.mainloop.glib import DBusGMainLoop
+ fso = FSOObject()
+ fso.initialize(DBusGMainLoop())
+ accelerometer = FSOAccelerometer(fso)
+ DumpObserver(accelerometer)
+ import gobject
+ try:
+ gobject.MainLoop().run()
+ except KeyboardInterrupt:
+ return
+
+
if __name__ == '__main__':
_doctest()
+ _test()
Modified: developers/john_lee/pyfso/dialer.py
===================================================================
--- developers/john_lee/pyfso/dialer.py 2008-07-18 17:19:55 UTC (rev 4536)
+++ developers/john_lee/pyfso/dialer.py 2008-07-18 17:20:10 UTC (rev 4537)
@@ -1,3 +1,9 @@
+"""
+
+(C) 2008 John Lee <john_lee at openmoko.com>
+(C) 2008 Openmoko, Inc.
+GPLv2 or later
+"""
from general import Subject
from fso_backend import FSOObject
from dbus import DBusException
@@ -64,38 +70,27 @@
return decorator
-def need_initialized(real_f):
- def decorator(self, *args, **kwargs):
- if self.initialized:
- return real_f(self, *args, **kwargs)
- else:
- return False
- return decorator
-
-
class FSODialer(Dialer):
"""A Dialer implementation based on fso dbus API.
"""
- def __init__(self, mainloop):
+ def __init__(self, fso_obj):
super(FSODialer, self).__init__()
self.callid = None
self.initialized = False
self.registered = False
- self.fso = FSOObject()
+ self.fso = fso_obj
+ self._set_state('release')
from threading import Thread
- # put the slow init function into background
- Thread(target=self._init, args=(mainloop, )).start()
- # FIXME: get state from fso and set self state
- self._set_state('release')
+ Thread(target=self._init).run()
- def _init(self, mainloop):
- if not self.fso.initialize(mainloop):
- return
+ def _init(self):
+ if not self.fso.fullinit:
+ self.fso.initialize()
self.fso.onCallStatus.append(self.on_call_status)
- self.initialized = True
if not self._register():
return
self.registered = True
+ # FIXME: get state from fso and set self state
return True
def _register(self):
@@ -131,8 +126,14 @@
def _test():
from dbus.mainloop.glib import DBusGMainLoop
- FSODialer(DBusGMainLoop())
+ fso = FSOObject()
+ fso.initialize(DBusGMainLoop())
+ dialer = FSODialer(fso)
+ import gobject
+ try:
+ gobject.MainLoop().run()
+ except KeyboardInterrupt:
+ return
-
if __name__ == "__main__":
_test()
Modified: developers/john_lee/pyfso/fso_backend.py
===================================================================
--- developers/john_lee/pyfso/fso_backend.py 2008-07-18 17:19:55 UTC (rev 4536)
+++ developers/john_lee/pyfso/fso_backend.py 2008-07-18 17:20:10 UTC (rev 4537)
@@ -1,3 +1,9 @@
+"""
+
+(C) 2008 John Lee <john_lee at openmoko.com>
+(C) 2008 Openmoko, Inc.
+GPLv2 or later
+"""
import os
from dbus import DBusException, SystemBus, Interface
from general import Singleton
@@ -2,7 +8,6 @@
-class FSOObject(Singleton):
- """ based on the code from zhone but it's a singleton class now.
+class FSOObject(object):
+ """Based on the code from zhone.
"""
-
- def __init__( self ):
+ def __init__(self):
self.objects = {}
@@ -13,6 +18,7 @@
self.onNetworkStatus = []
self.onIdleStateChanged = []
self.onSignalStrength = []
+ self.onAccelerometer = []
self.ignoreSuspend = False
self.framework_obj = None
@@ -43,7 +49,7 @@
self.objects[ "%s:%s" % ( busname, objname ) ] = obj
return obj
- def initialize( self, mainloop ):
+ def initialize(self, mainloop):
if self.fullinit:
return True
try:
@@ -112,6 +118,11 @@
if self.display_obj is not None:
self.display_iface = Interface( self.display_obj, "org.freesmartphone.Device.Display" )
self.display_iface.SetBrightness( 90 )
+
+ self.accelerometer_obj = self.bus.get_object("org.freesmartphone.odeviced", "/org/freesmartphone/Device/Accelerometer")
+ self.accelerometer_iface = Interface(self.accelerometer_obj, "org.freesmartphone.Device.Accelerometer")
+ self.accelerometer_iface.connect_to_signal("Event", self.cbAccelerometer)
+
if self.device_obj is None:
failcount += 1
else:
@@ -174,11 +185,21 @@
cb (strength = strength)
print 'SignalStrength:', strength
+ def cbAccelerometer(self, x, y, z):
+ for cb in self.onAccelerometer:
+ cb(x, y, z)
+
def _test():
from dbus.mainloop.glib import DBusGMainLoop
+ import sys
fso = FSOObject()
fso.initialize(DBusGMainLoop())
+ import gobject
+ try:
+ gobject.MainLoop().run()
+ except KeyboardInterrupt:
+ return
if __name__ == '__main__':
Modified: developers/john_lee/pyfso/general.py
===================================================================
--- developers/john_lee/pyfso/general.py 2008-07-18 17:19:55 UTC (rev 4536)
+++ developers/john_lee/pyfso/general.py 2008-07-18 17:20:10 UTC (rev 4537)
@@ -1,3 +1,9 @@
+"""
+
+(C) 2008 John Lee <john_lee at openmoko.com>
+(C) 2008 Openmoko, Inc.
+GPLv2 or later
+"""
from __future__ import with_statement
from threading import RLock
@@ -28,12 +34,39 @@
class Singleton(object):
- __single_lock = RLock()
+ """the implementation of the singleton class in python.
+ >>> class A(Singleton):
+ ... def __init__(self):
+ ... print 'init'
+ ...
+ >>> a1 = A.instance()
+ init
+ >>> a2 = A.instance()
+ """
__single = None
- def __new__(cls, *args, **kwargs):
- with cls.__single_lock:
- if cls != type(cls.__single):
- # subclass can create their own
- cls.__single = object.__new__(cls, *args, **kwargs)
- return cls.__single
- #FIXME: the __init__ problem
+
+ @classmethod
+ def instance(cls, *args, **kwargs):
+ if cls != type(cls.__single):
+ cls.__single = cls(*args, **kwargs)
+ return cls.__single
+
+
+class DumpObserver(object):
+ def __init__(self, subject):
+ subject.attach(self)
+
+ def update(self, *args, **kwargs):
+ print args, kwargs
+
+
+def _doctest():
+ try:
+ import doctest
+ except ImportError:
+ return
+ doctest.testmod()
+
+
+if __name__ == '__main__':
+ _doctest()
More information about the commitlog
mailing list