XRootD
Loading...
Searching...
No Matches
XrdClStream.hh
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3// Author: Lukasz Janyst <ljanyst@cern.ch>
4//------------------------------------------------------------------------------
5// XRootD is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Lesser General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// XRootD is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14//
15// You should have received a copy of the GNU Lesser General Public License
16// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17//------------------------------------------------------------------------------
18
19#ifndef __XRD_CL_STREAM_HH__
20#define __XRD_CL_STREAM_HH__
21
22#include "XrdCl/XrdClPoller.hh"
23#include "XrdCl/XrdClStatus.hh"
24#include "XrdCl/XrdClURL.hh"
28#include "XrdCl/XrdClInQueue.hh"
29#include "XrdCl/XrdClUtils.hh"
30
33#include "XrdNet/XrdNetAddr.hh"
35#include <list>
36#include <vector>
37#include <functional>
38#include <memory>
39
40namespace XrdCl
41{
42 class Message;
43 class Channel;
44 class TransportHandler;
45 class TaskManager;
46 struct SubStreamData;
47
48 //----------------------------------------------------------------------------
50 //----------------------------------------------------------------------------
51 class Stream
52 {
53 public:
54 //------------------------------------------------------------------------
56 //------------------------------------------------------------------------
64
65 //------------------------------------------------------------------------
67 //------------------------------------------------------------------------
68 Stream( std::shared_ptr<URL> url, const URL &prefer = URL() );
69
70 //------------------------------------------------------------------------
72 //------------------------------------------------------------------------
73 ~Stream();
74
75 //------------------------------------------------------------------------
77 //------------------------------------------------------------------------
79
80 //------------------------------------------------------------------------
82 //------------------------------------------------------------------------
84 MsgHandler *handler,
85 bool stateful,
86 time_t expires );
87
88 //------------------------------------------------------------------------
//! Set the transport: stores the given handler pointer in pTransport
90 //------------------------------------------------------------------------
91 void SetTransport( TransportHandler *transport )
92 {
93 pTransport = transport;
94 }
95
96 //------------------------------------------------------------------------
//! Set the poller: stores the given poller pointer in pPoller
98 //------------------------------------------------------------------------
99 void SetPoller( Poller *poller )
100 {
101 pPoller = poller;
102 }
103
104 //------------------------------------------------------------------------
//! Set the incoming queue: stores the given pointer in pIncomingQueue
106 //------------------------------------------------------------------------
107 void SetIncomingQueue( InQueue *incomingQueue )
108 {
109 pIncomingQueue = incomingQueue;
110 }
111
112 //------------------------------------------------------------------------
//! Set the channel data: stores the given pointer in pChannelData
114 //------------------------------------------------------------------------
115 void SetChannelData( AnyObject *channelData )
116 {
117 pChannelData = channelData;
118 }
119
120 //------------------------------------------------------------------------
//! Set task manager: stores the given pointer in pTaskManager
122 //------------------------------------------------------------------------
123 void SetTaskManager( TaskManager *taskManager )
124 {
125 pTaskManager = taskManager;
126 }
127
128 //------------------------------------------------------------------------
//! Set job manager: stores the given pointer in pJobManager
130 //------------------------------------------------------------------------
131 void SetJobManager( JobManager *jobManager )
132 {
133 pJobManager = jobManager;
134 }
135
136 //------------------------------------------------------------------------
140 //------------------------------------------------------------------------
142
143 //------------------------------------------------------------------------
145 //------------------------------------------------------------------------
146 void Disconnect( bool force = false );
147
148 //------------------------------------------------------------------------
151 //------------------------------------------------------------------------
152 void Tick( time_t now );
153
154 //------------------------------------------------------------------------
//! Get the URL
//!
//! @return the raw pointer held by the pUrl shared pointer
156 //------------------------------------------------------------------------
157 const URL *GetURL() const
158 {
159 return pUrl.get();
160 }
161
162 //------------------------------------------------------------------------
164 //------------------------------------------------------------------------
165 void ForceConnect();
166
167 //------------------------------------------------------------------------
//! Return stream name
//!
//! @return a const reference to the pStreamName member
169 //------------------------------------------------------------------------
170 const std::string &GetName() const
171 {
172 return pStreamName;
173 }
174
175 //------------------------------------------------------------------------
177 //------------------------------------------------------------------------
178 void DisableIfEmpty( uint16_t subStream );
179
180 //------------------------------------------------------------------------
182 //------------------------------------------------------------------------
183 void OnIncoming( uint16_t subStream,
184 std::shared_ptr<Message> msg,
185 uint32_t bytesReceived );
186
187 //------------------------------------------------------------------------
188 // Call when one of the sockets is ready to accept a new message
189 //------------------------------------------------------------------------
190 std::pair<Message *, MsgHandler *>
191 OnReadyToWrite( uint16_t subStream );
192
193 //------------------------------------------------------------------------
194 // Call when a message is written to the socket
195 //------------------------------------------------------------------------
196 void OnMessageSent( uint16_t subStream,
197 Message *msg,
198 uint32_t bytesSent );
199
200 //------------------------------------------------------------------------
202 //------------------------------------------------------------------------
203 void OnConnect( uint16_t subStream );
204
205 //------------------------------------------------------------------------
207 //------------------------------------------------------------------------
208 void OnConnectError( uint16_t subStream, XRootDStatus status );
209
210 //------------------------------------------------------------------------
212 //------------------------------------------------------------------------
213 void OnError( uint16_t subStream, XRootDStatus status );
214
215 //------------------------------------------------------------------------
217 //------------------------------------------------------------------------
218 void ForceError( XRootDStatus status, bool hush=false );
219
220 //------------------------------------------------------------------------
222 //------------------------------------------------------------------------
223 bool OnReadTimeout( uint16_t subStream ) XRD_WARN_UNUSED_RESULT;
224
225 //------------------------------------------------------------------------
227 //------------------------------------------------------------------------
228 bool OnWriteTimeout( uint16_t subStream ) XRD_WARN_UNUSED_RESULT;
229
230 //------------------------------------------------------------------------
232 //------------------------------------------------------------------------
234
235 //------------------------------------------------------------------------
237 //------------------------------------------------------------------------
239
240 //------------------------------------------------------------------------
249 //------------------------------------------------------------------------
251 InstallIncHandler( std::shared_ptr<Message> &msg, uint16_t stream );
252
253 //------------------------------------------------------------------------
257 //------------------------------------------------------------------------
258 uint16_t InspectStatusRsp( uint16_t stream, MsgHandler *&incHandler );
259
260 //------------------------------------------------------------------------
//! Set the on-connect handler for data streams
//!
//! The shared pointer is copied into pOnDataConnJob after scopedLock has
//! been constructed on pMutex.
262 //------------------------------------------------------------------------
263 void SetOnDataConnectHandler( std::shared_ptr<Job> &onConnJob )
264 {
265 XrdSysMutexHelper scopedLock( pMutex );
266 pOnDataConnJob = onConnJob;
267 }
268
269 //------------------------------------------------------------------------
272 //------------------------------------------------------------------------
273 bool CanCollapse( const URL &url );
274
275 //------------------------------------------------------------------------
277 //------------------------------------------------------------------------
278 Status Query( uint16_t query, AnyObject &result );
279
280 private:
281
282 //------------------------------------------------------------------------
284 //------------------------------------------------------------------------
285 static bool IsPartial( Message &msg );
286
287 //------------------------------------------------------------------------
289 //------------------------------------------------------------------------
290 inline static bool HasNetAddr( const XrdNetAddr &addr,
291 std::vector<XrdNetAddr> &addresses )
292 {
293 auto itr = addresses.begin();
294 for( ; itr != addresses.end() ; ++itr )
295 {
296 if( itr->Same( &addr ) ) return true;
297 }
298
299 return false;
300 }
301
302 //------------------------------------------------------------------------
303 // Job handling force disconnect
// Keeps a shared_ptr copy of the URL it was constructed with; Run() ends
// by deleting the job object itself.
// NOTE(review): original line 312 (inside Run) is absent from this listing,
// so the action taken before the self-delete is not visible here - confirm
// against the real header.
304 //------------------------------------------------------------------------
305 class ForceDisconnectJob: public Job
306 {
307 public:
308 ForceDisconnectJob( std::shared_ptr<URL> url ) : pUrl( url ) {}
309 virtual ~ForceDisconnectJob() {}
310 virtual void Run( void* )
311 {
313 delete this;
314 }
315 private:
316 std::shared_ptr<URL> pUrl;
317 };
318
319 //------------------------------------------------------------------------
320 // Job handling the incoming messages
321 //------------------------------------------------------------------------
322 class HandleIncMsgJob: public Job
323 {
324 public:
325 HandleIncMsgJob( MsgHandler *handler ): pHandler( handler ) {};
326 virtual ~HandleIncMsgJob() {};
327 virtual void Run( void* )
328 {
329 pHandler->Process();
330 delete this;
331 }
332 private:
333 MsgHandler *pHandler;
334 };
335
336 //------------------------------------------------------------------------
338 //------------------------------------------------------------------------
339 void OnFatalError( uint16_t subStream,
340 XRootDStatus status,
341 XrdSysMutexHelper &lock );
342
343 //------------------------------------------------------------------------
345 //------------------------------------------------------------------------
346 void MonitorDisconnection( XRootDStatus status );
347
348 //------------------------------------------------------------------------
350 //------------------------------------------------------------------------
351 XRootDStatus RequestClose( Message &resp );
352
353 typedef std::vector<SubStreamData*> SubStreamList;
354
355 //------------------------------------------------------------------------
356 // Data members
357 //------------------------------------------------------------------------
358 std::shared_ptr<URL> pUrl;
359 const URL pPrefer;
360 std::string pStreamName;
361 TransportHandler *pTransport;
362 Poller *pPoller;
363 TaskManager *pTaskManager;
364 JobManager *pJobManager;
365 XrdSysRecMutex pMutex;
366 InQueue *pIncomingQueue;
367 AnyObject *pChannelData;
368 uint32_t pLastStreamError;
369 XRootDStatus pLastFatalError;
370 uint16_t pStreamErrorWindow;
371 uint16_t pConnectionCount;
372 uint16_t pConnectionRetry;
373 time_t pConnectionInitTime;
374 uint16_t pConnectionWindow;
375 SubStreamList pSubStreams;
376 std::vector<XrdNetAddr> pAddresses;
377 Utils::AddressType pAddressType;
378 ChannelHandlerList pChannelEvHandlers;
379 uint64_t pSessionId;
380 ForceDisconnectJob *pTTLDiscJob;
381 std::atomic<int> pSubsWaitingClose;
382
383 //------------------------------------------------------------------------
384 // When disconnecting other sub-streams from within a poller callback
385 // we call IOEvents::Channel::Delete, potentially on channels with an
386 // active callback in a different thread. The poller will require that
387 // callback to finish before Delete returns. However those threads may
388 // block waiting on us to complete, either for our poller to be idle or
389 // due to one of our mutexes. Detect that situation by noting when all
390 // sub-streams are about to be closed. Protected by mutex below. In case
391 // both are needed, lock order should be pDiscMutex, pMutex.
392 //------------------------------------------------------------------------
393 XrdSysCondVar pDiscCV;
394 int pDiscAllCnt;
395
396 //------------------------------------------------------------------------
397 // Monitoring info
398 //------------------------------------------------------------------------
399 timeval pConnectionStarted;
400 timeval pConnectionDone;
401 std::atomic<uint64_t> pBytesSent;
402 std::atomic<uint64_t> pBytesReceived;
403
404 //------------------------------------------------------------------------
405 // Data stream on-connect handler
406 //------------------------------------------------------------------------
407 std::shared_ptr<Job> pOnDataConnJob;
408
409 //------------------------------------------------------------------------
410 // Track last assigned Id across all Streams, to ensure unique sessionId
411 //------------------------------------------------------------------------
412 static RAtomic_uint64_t sSessCntGen;
413 };
414}
415
416#endif // __XRD_CL_STREAM_HH__
#define XRD_WARN_UNUSED_RESULT
XrdSys::RAtomic< uint64_t > RAtomic_uint64_t
A communication channel between the client and the server.
static PostMaster * GetPostMaster()
Get default post master.
A synchronized queue for incoming data.
A synchronized queue.
The message representation used throughout the system.
Interface for socket pollers.
Status ForceDisconnect(const URL &url)
Shut down a channel.
XRootDStatus Send(Message *msg, MsgHandler *handler, bool stateful, time_t expires)
Queue the message for sending.
void SetTransport(TransportHandler *transport)
Set the transport.
StreamStatus
Status of the stream.
@ Disconnected
Not connected.
@ Error
Broken.
@ Connected
Connected.
@ Connecting
In the process of being connected.
void SetIncomingQueue(InQueue *incomingQueue)
Set the incoming queue.
bool OnReadTimeout(uint16_t subStream) XRD_WARN_UNUSED_RESULT
On read timeout.
bool CanCollapse(const URL &url)
void SetPoller(Poller *poller)
Set the poller.
void ForceConnect()
Force connection.
Stream(std::shared_ptr< URL > url, const URL &prefer=URL())
Constructor.
void ForceError(XRootDStatus status, bool hush=false)
Force error.
void SetTaskManager(TaskManager *taskManager)
Set task manager.
void SetOnDataConnectHandler(std::shared_ptr< Job > &onConnJob)
Set the on-connect handler for data streams.
void SetJobManager(JobManager *jobManager)
Set job manager.
void Disconnect(bool force=false)
Disconnect the stream.
XRootDStatus EnableLink(PathID &path)
void OnConnect(uint16_t subStream)
Call back when a message has been reconstructed.
const std::string & GetName() const
Return stream name.
void Tick(time_t now)
void OnConnectError(uint16_t subStream, XRootDStatus status)
On connect error.
~Stream()
Destructor.
bool OnWriteTimeout(uint16_t subStream) XRD_WARN_UNUSED_RESULT
On write timeout.
void DisableIfEmpty(uint16_t subStream)
Disables respective uplink if empty.
void RemoveEventHandler(ChannelEventHandler *handler)
Remove a channel event handler.
void OnMessageSent(uint16_t subStream, Message *msg, uint32_t bytesSent)
MsgHandler * InstallIncHandler(std::shared_ptr< Message > &msg, uint16_t stream)
void OnIncoming(uint16_t subStream, std::shared_ptr< Message > msg, uint32_t bytesReceived)
Call back when a message has been reconstructed.
void OnError(uint16_t subStream, XRootDStatus status)
On error.
const URL * GetURL() const
Get the URL.
uint16_t InspectStatusRsp(uint16_t stream, MsgHandler *&incHandler)
void SetChannelData(AnyObject *channelData)
Set the channel data.
std::pair< Message *, MsgHandler * > OnReadyToWrite(uint16_t subStream)
XRootDStatus Initialize()
Initializer.
void RegisterEventHandler(ChannelEventHandler *handler)
Register channel event handler.
Perform the handshake and the authentication for each physical stream.
URL representation.
Definition XrdClURL.hh:31
AddressType
Address type.
Definition XrdClUtils.hh:87
QueryImpl< false > Query
Procedure execution status.