1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 import random
19
20 from twisted import version
21 from twisted.internet import defer, reactor
22 from twisted.python import reflect
23
24
25 from flumotion.common import errors, common
26
27 __version__ = "$Rev$"
28
29
30
31
32
34
35 def wrapper(*args, **kwargs):
36 gen = proc(*args, **kwargs)
37 result = defer.Deferred()
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56 result.__callbacks = result.callbacks
57
58 def with_saved_callbacks(proc, *_args, **_kwargs):
59 saved_callbacks, saved_called = result.callbacks, result.called
60 result.callbacks, result.called = result.__callbacks, False
61 proc(*_args, **_kwargs)
62 result.callbacks, result.called = saved_callbacks, saved_called
63
64
65
66 def default_errback(failure, d):
67
68
69 if failure.check(errors.HandledException):
70 return failure
71
72 def print_traceback(f):
73 import traceback
74 print 'flumotion.twisted.defer: ' + \
75 'Unhandled error calling', proc.__name__, ':', f.type
76 traceback.print_exc()
77 with_saved_callbacks(lambda: d.addErrback(print_traceback))
78 raise
79 result.addErrback(default_errback, result)
80
81 def generator_next():
82 try:
83 x = gen.next()
84 if isinstance(x, defer.Deferred):
85 x.addCallback(callback, x).addErrback(errback, x)
86 else:
87 result.callback(x)
88 except StopIteration:
89 result.callback(None)
90 except Exception, e:
91 result.errback(e)
92
93 def errback(failure, d):
94
95 def raise_error():
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110 if common.versionStringToTuple(version.short()) >= (11, 1, 0):
111 k, v = failure.parents[0], failure.value
112 else:
113 k, v = failure.parents[-1], failure.value
114 try:
115 if isinstance(k, str):
116 k = reflect.namedClass(k)
117 if isinstance(v, tuple):
118 e = k(*v)
119 else:
120 e = k(v)
121 except Exception:
122 e = Exception('%s: %r' % (failure.type, v))
123 raise e
124 d.value = raise_error
125 generator_next()
126
127 def callback(result, d):
128 d.value = lambda: result
129 generator_next()
130
131 generator_next()
132
133 return result
134
135 return wrapper
136
137
139 return lambda self, *args, **kwargs: \
140 defer_generator(proc)(self, *args, **kwargs)
141
142
144 """
145 Return a deferred which will fire from a callLater after d fires
146 """
147
def fire(value, target):
    """
    Schedule target.callback(value) through reactor.callLater with a
    delay of 0 instead of calling it directly.
    """
    fire_target = target.callback
    reactor.callLater(0, fire_target, value)
150 res = defer.Deferred()
151 deferred.addCallback(fire, res)
152 return res
153
154
156 """
157 I am a helper class to make sure that the deferred is fired only once
158 with either a result or exception.
159
160 @ivar d: the deferred that gets fired as part of the resolution
161 @type d: L{twisted.internet.defer.Deferred}
162 """
163
165 self.d = defer.Deferred()
166 self.fired = False
167
169 """
170 Clean up any resources related to the resolution.
171 Subclasses can implement me.
172 """
173 pass
174
176 """
177 Make the result succeed, triggering the callbacks with
178 the given result. If a result was already reached, do nothing.
179 """
180 if not self.fired:
181 self.fired = True
182 self.cleanup()
183 self.d.callback(result)
184
186 """
187 Make the result fail, triggering the errbacks with the given exception.
188 If a result was already reached, do nothing.
189 """
190 if not self.fired:
191 self.fired = True
192 self.cleanup()
193 self.d.errback(exception)
194
195
197 """
198 Provides a mechanism to attempt to run some deferred operation until it
199 succeeds. On failure, the operation is tried again later, exponentially
200 backing off.
201 """
202 maxDelay = 1800
203 initialDelay = 5.0
204
205 factor = 2.7182818284590451
206 jitter = 0.11962656492
207 delay = None
208
209 - def __init__(self, deferredCreate, *args, **kwargs):
210 """
211 Create a new RetryingDeferred. Will call
212 deferredCreate(*args, **kwargs) each time a new deferred is needed.
213 """
214 self._create = deferredCreate
215 self._args = args
216 self._kwargs = kwargs
217
218 self._masterD = None
219 self._running = False
220 self._callId = None
221
223 """
224 Start trying. Returns a deferred that will fire when this operation
225 eventually succeeds. That deferred will only errback if this
226 RetryingDeferred is cancelled (it will then errback with the result of
227 the next attempt if one is in progress, or a CancelledError.
228 # TODO: yeah?
229 """
230 self._masterD = defer.Deferred()
231 self._running = True
232 self.delay = None
233
234 self._retry()
235
236 return self._masterD
237
246
248 self._callId = None
249 d = self._create(*self._args, **self._kwargs)
250 d.addCallbacks(self._success, self._failed)
251
253
254 self._masterD.callback(val)
255 self._masterD = None
256
258 if self._running:
259 nextDelay = self._nextDelay()
260 self._callId = reactor.callLater(nextDelay, self._retry)
261 else:
262 self._masterD.errback(failure)
263 self._masterD = None
264
277