1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 """
19 implementation of a PB Client to interface with feedserver.py
20 """
21
22 import socket
23 import os
24
25 from twisted.internet import reactor, main, defer, tcp
26 from twisted.python import failure
27 from zope.interface import implements
28
29 from flumotion.common import log, common, interfaces
30 from flumotion.twisted import pb as fpb
31
32 __version__ = "$Rev$"
33
34
35
36
37
58
59
62
63
73
74
86
87
88
89
91 """
92 I am a client for a Feed Server.
93
94 I am used as the remote interface between a component and another
95 component.
96
97 @ivar component: the component this is a feed client for
98 @type component: L{flumotion.component.feedcomponent.FeedComponent}
99 @ivar remote: a reference to a
100 L{flumotion.worker.feedserver.FeedAvatar}
101 @type remote: L{twisted.spread.pb.RemoteReference}
102 """
103 logCategory = 'feedmedium'
104 remoteLogName = 'feedserver'
105 implements(interfaces.IFeedMedium)
106
107 remote = None
108
115
def startConnecting(self, host, port, authenticator, timeout=30,
                    bindAddress=None):
    """Connect to a remote feed server and start logging in.

    Optional helper.  A L{FeedClientFactory} is created for this
    medium and connected through a L{PassableClientConnector}.  An
    in-progress connection can be cancelled with the
    stopConnecting() method.

    @param host:          the remote host name
    @type  host:          str
    @param port:          the tcp port on which to connect
    @type  port:          int
    @param authenticator: the authenticator, normally provided by
                          the worker
    @type  authenticator: L{flumotion.twisted.pb.Authenticator}
    @param timeout:       handed unchanged to the connector
                          (default 30); presumably the connect
                          timeout in seconds -- confirm against
                          L{PassableClientConnector}
    @param bindAddress:   handed unchanged to the connector
                          (default None)

    @returns: a deferred that will fire with the remote reference,
              once we have authenticated.
    """
    # Refuse to start while a factory from an earlier call is still
    # set; stopConnecting() is what resets it to None.
    assert self._factory is None

    self._factory = FeedClientFactory(self)
    connector = PassableClientConnector(host, port, self._factory,
                                        timeout, bindAddress,
                                        reactor=reactor)
    connector.connect()

    loginDeferred = self._factory.login(authenticator)
    return loginDeferred
143
def requestFeed(self, host, port, authenticator, fullFeedId):
    """Ask a remote feed server to send us a feed.

    Connects and authenticates through startConnecting(), then
    invokes 'sendFeed' on the remote reference.  A pending
    connection attempt can be cancelled via stopConnecting().

    @param host:          the remote host name
    @type  host:          str
    @param port:          the tcp port on which to connect
    @type  port:          int
    @param authenticator: the authenticator, normally provided by
                          the worker
    @type  authenticator: L{flumotion.twisted.pb.Authenticator}
    @param fullFeedId:    the full feed id (/flow/component:feed)
                          offered by the remote side
    @type  fullFeedId:    str

    @returns: a deferred that, if successful, will fire with a pair
              (feedId, fd). In an error case it will errback and
              close the remote connection.
    """

    def onLoggedIn(remote):
        # Remember the reference before issuing the remote call.
        self.setRemoteReference(remote)
        return remote.callRemote('sendFeed', fullFeedId)

    def onFeedRequested(_result):
        # The remote call's own result is ignored; the chain continues
        # on the deferred kept in self._feedToDeferred.
        # NOTE(review): presumably that deferred fires once the feed
        # fd has been handed over -- confirm where it is created.
        return self._feedToDeferred

    def onFailure(reason):
        self.warning('failed to retrieve %s from %s:%d', fullFeedId,
                     host, port)
        self.debug('failure: %s', log.getFailureMessage(reason))
        self.debug('closing connection')
        self.stopConnecting()
        # Hand the failure back so the caller's errbacks still run.
        return reason

    pending = self.startConnecting(host, port, authenticator)
    pending.addCallback(onLoggedIn)
    pending.addCallback(onFeedRequested)
    pending.addErrback(onFailure)
    return pending
192
def sendFeed(self, host, port, authenticator, fullFeedId):
    """Send a feed to a remote feed server.

    This helper method calls startConnecting() to make the
    connection and authenticate, then invokes 'receiveFeed' on the
    remote side.  Once that call returns, the socket is detached
    from the PB transport and a duplicate of its file descriptor is
    handed to the caller.  A pending connection attempt can be
    cancelled via stopConnecting().

    @param host:          the remote host name
    @type  host:          str
    @param port:          the tcp port on which to connect
    @type  port:          int
    @param authenticator: the authenticator, normally provided by
                          the worker
    @type  authenticator: L{flumotion.twisted.pb.Authenticator}
    @param fullFeedId:    the full feed id (/flow/component:eaterAlias)
                          to feed to on the remote side
    @type  fullFeedId:    str

    @returns: a deferred that, if successful, will fire with a pair
              (fullFeedId, fd), where fd is a duplicate of the
              connection's file descriptor. In an error case it will
              errback and close the remote connection.
    """

    def connected(remote):
        assert isinstance(remote.broker.transport, _SocketMaybeCloser)
        self.setRemoteReference(remote)
        return remote.callRemote('receiveFeed', fullFeedId)

    def feedSent(res):
        t = self.remote.broker.transport
        self.debug('stop reading from transport')
        t.stopReading()

        self.debug('flushing PB write queue')
        t.doWrite()
        self.debug('stop writing to transport')
        t.stopWriting()

        # NOTE(review): presumably tells the _SocketMaybeCloser
        # transport not to close the socket when the connection is
        # torn down below -- confirm against its definition.
        t.keepSocketAlive = True
        fd = os.dup(t.fileno())

        # Drop our reference to the remote; the transport is still
        # reachable through the local name t.
        self.setRemoteReference(None)

        d = defer.Deferred()

        def loseConnection():
            t.connectionLost(failure.Failure(main.CONNECTION_DONE))
            d.callback((fullFeedId, fd))

        # Tear the PB connection down on a later reactor iteration
        # rather than from inside this callback.
        reactor.callLater(0, loseConnection)
        return d

    def error(reason):
        # The parameter is deliberately not called 'failure': that
        # would shadow the twisted.python.failure module used above.
        self.warning('failed to send %s to %s:%d', fullFeedId,
                     host, port)
        self.debug('failure: %s', log.getFailureMessage(reason))
        self.debug('closing connection')
        self.stopConnecting()
        return reason

    d = self.startConnecting(host, port, authenticator)
    d.addCallback(connected)
    d.addCallback(feedSent)
    d.addErrback(error)
    return d
260
262 """Stop a pending or established connection made via
263 startConnecting().
264
265 Stops any established or pending connection to a remote feed
266 server started via the startConnecting() method. Safe to call
267 even if connection has not been started.
268 """
269 if self._factory:
270 self._factory.disconnect()
271 self._factory = None
272
273
274 self.setRemoteReference(None)
275
276
277
279 self.remote = remoteReference
280
282 return self.remote is not None
283
286
288 t = self.remote.broker.transport
289
290 self.debug('stop reading from transport')
291 t.stopReading()
292 reactor.callLater(0, self._doFeedTo, fullFeedId, t)
293
320