XRootD
Loading...
Searching...
No Matches
XrdCl::Stream Class Reference

Stream. More...

#include <XrdClStream.hh>

+ Collaboration diagram for XrdCl::Stream:

Public Types

enum  StreamStatus {
  Disconnected = 0 ,
  Connected = 1 ,
  Connecting = 2 ,
  Error = 3
}
 Status of the stream. More...
 

Public Member Functions

 Stream (std::shared_ptr< URL > url, const URL &prefer=URL())
 Constructor.
 
 ~Stream ()
 Destructor.
 
bool CanCollapse (const URL &url)
 
void DisableIfEmpty (uint16_t subStream)
 Disables respective uplink if empty.
 
void Disconnect (bool force=false)
 Disconnect the stream.
 
XRootDStatus EnableLink (PathID &path)
 
void ForceConnect ()
 Force connection.
 
void ForceError (XRootDStatus status, bool hush=false)
 Force error.
 
const std::string & GetName () const
 Return stream name.
 
const URLGetURL () const
 Get the URL.
 
XRootDStatus Initialize ()
 Initializer.
 
uint16_t InspectStatusRsp (uint16_t stream, MsgHandler *&incHandler)
 
MsgHandlerInstallIncHandler (std::shared_ptr< Message > &msg, uint16_t stream)
 
void OnConnect (uint16_t subStream)
 Call back when a message has been reconstructed.
 
void OnConnectError (uint16_t subStream, XRootDStatus status)
 On connect error.
 
void OnError (uint16_t subStream, XRootDStatus status)
 On error.
 
void OnIncoming (uint16_t subStream, std::shared_ptr< Message > msg, uint32_t bytesReceived)
 Call back when a message has been reconstructed.
 
void OnMessageSent (uint16_t subStream, Message *msg, uint32_t bytesSent)
 
bool OnReadTimeout (uint16_t subStream) XRD_WARN_UNUSED_RESULT
 On read timeout.
 
std::pair< Message *, MsgHandler * > OnReadyToWrite (uint16_t subStream)
 
bool OnWriteTimeout (uint16_t subStream) XRD_WARN_UNUSED_RESULT
 On write timeout.
 
Status Query (uint16_t query, AnyObject &result)
 Query the stream.
 
void RegisterEventHandler (ChannelEventHandler *handler)
 Register channel event handler.
 
void RemoveEventHandler (ChannelEventHandler *handler)
 Remove a channel event handler.
 
XRootDStatus Send (Message *msg, MsgHandler *handler, bool stateful, time_t expires)
 Queue the message for sending.
 
void SetChannelData (AnyObject *channelData)
 Set the channel data.
 
void SetIncomingQueue (InQueue *incomingQueue)
 Set the incoming queue.
 
void SetJobManager (JobManager *jobManager)
 Set job manager.
 
void SetOnDataConnectHandler (std::shared_ptr< Job > &onConnJob)
 Set the on-connect handler for data streams.
 
void SetPoller (Poller *poller)
 Set the poller.
 
void SetTaskManager (TaskManager *taskManager)
 Set task manager.
 
void SetTransport (TransportHandler *transport)
 Set the transport.
 
void Tick (time_t now)
 

Detailed Description

Stream.

Definition at line 51 of file XrdClStream.hh.

Member Enumeration Documentation

◆ StreamStatus

Status of the stream.

Enumerator
Disconnected 

Not connected.

Connected 

Connected.

Connecting 

In the process of being connected.

Error 

Broken.

Definition at line 57 of file XrdClStream.hh.

58 {
59 Disconnected = 0,
60 Connected = 1,
61 Connecting = 2,
62 Error = 3
63 };
@ Disconnected
Not connected.
@ Error
Broken.
@ Connected
Connected.
@ Connecting
In the process of being connected.

Constructor & Destructor Documentation

◆ Stream()

XrdCl::Stream::Stream ( std::shared_ptr< URL > url,
const URL & prefer = URL() )

Constructor.

Definition at line 96 of file XrdClStream.cc.

96 :
97 pUrl( url ),
98 pPrefer( prefer ),
99 pTransport( 0 ),
100 pPoller( 0 ),
101 pTaskManager( 0 ),
102 pJobManager( 0 ),
103 pIncomingQueue( 0 ),
104 pChannelData( 0 ),
105 pLastStreamError( 0 ),
106 pConnectionCount( 0 ),
107 pConnectionInitTime( 0 ),
108 pAddressType( Utils::IPAll ),
109 pSessionId( 0 ),
110 pTTLDiscJob( nullptr ),
111 pSubsWaitingClose( 0 ),
112 pDiscCV( 0 ),
113 pDiscAllCnt( 0 ),
114 pBytesSent( 0 ),
115 pBytesReceived( 0 )
116 {
117 pConnectionStarted.tv_sec = 0; pConnectionStarted.tv_usec = 0;
118 pConnectionDone.tv_sec = 0; pConnectionDone.tv_usec = 0;
119
120 std::ostringstream o;
121 o << pUrl->GetHostId();
122 pStreamName = o.str();
123
124 pConnectionWindow = Utils::GetIntParameter( *url, "ConnectionWindow",
126 pConnectionRetry = Utils::GetIntParameter( *url, "ConnectionRetry",
128 pStreamErrorWindow = Utils::GetIntParameter( *url, "StreamErrorWindow",
130
131 std::string netStack = Utils::GetStringParameter( *url, "NetworkStack",
133
134 pAddressType = Utils::String2AddressType( netStack );
135 if( pAddressType == Utils::AddressType::IPAuto )
136 {
138 if( !( stacks & XrdNetUtils::hasIP64 ) )
139 {
140 if( stacks & XrdNetUtils::hasIPv4 )
141 pAddressType = Utils::AddressType::IPv4;
142 else if( stacks & XrdNetUtils::hasIPv6 )
143 pAddressType = Utils::AddressType::IPv6;
144 }
145 }
146
147 Log *log = DefaultEnv::GetLog();
148 log->Debug( PostMasterMsg, "[%s] Stream parameters: Network Stack: %s, "
149 "Connection Window: %d, ConnectionRetry: %d, Stream Error "
150 "Window: %d", pStreamName.c_str(), netStack.c_str(),
151 pConnectionWindow, pConnectionRetry, pStreamErrorWindow );
152 }
static Log * GetLog()
Get default log.
static AddressType String2AddressType(const std::string &addressType)
Interpret a string as address type, default to IPAll.
static int GetIntParameter(const URL &url, const std::string &name, int defaultVal)
Get a parameter either from the environment or URL.
Definition XrdClUtils.cc:81
static std::string GetStringParameter(const URL &url, const std::string &name, const std::string &defaultVal)
Get a parameter either from the environment or URL.
@ qryINIF
Only consider internet protocols via ifconfig.
static NetProt NetConfig(NetType netquery=qryINET, const char **eText=0)
const uint64_t PostMasterMsg
const int DefaultStreamErrorWindow
const int DefaultConnectionRetry
const int DefaultConnectionWindow
const char *const DefaultNetworkStack
XrdSysError Log
Definition XrdConfig.cc:113

References XrdCl::Log::Debug(), XrdCl::DefaultConnectionRetry, XrdCl::DefaultConnectionWindow, XrdCl::DefaultNetworkStack, XrdCl::DefaultStreamErrorWindow, XrdCl::Utils::GetIntParameter(), XrdCl::DefaultEnv::GetLog(), XrdCl::Utils::GetStringParameter(), XrdNetUtils::hasIP64, XrdNetUtils::hasIPv4, XrdNetUtils::hasIPv6, XrdCl::Utils::IPAuto, XrdCl::Utils::IPv4, XrdCl::Utils::IPv6, XrdNetUtils::NetConfig(), XrdCl::PostMasterMsg, XrdNetUtils::qryINIF, and XrdCl::Utils::String2AddressType().

+ Here is the call graph for this function:

◆ ~Stream()

XrdCl::Stream::~Stream ( )

Destructor.

Definition at line 157 of file XrdClStream.cc.

158 {
159 Disconnect( true );
160
161 Log *log = DefaultEnv::GetLog();
162 log->Debug( PostMasterMsg, "[%s] Destroying stream",
163 pStreamName.c_str() );
164
165 MonitorDisconnection( XRootDStatus() );
166
167 SubStreamList::iterator it;
168 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
169 delete *it;
170 }
void Disconnect(bool force=false)
Disconnect the stream.

References XrdCl::Log::Debug(), Disconnect(), XrdCl::DefaultEnv::GetLog(), and XrdCl::PostMasterMsg.

+ Here is the call graph for this function:

Member Function Documentation

◆ CanCollapse()

bool XrdCl::Stream::CanCollapse ( const URL & url)
Returns
: true is this channel can be collapsed using this URL, false otherwise

Definition at line 1353 of file XrdClStream.cc.

1354 {
1355 Log *log = DefaultEnv::GetLog();
1356
1357 //--------------------------------------------------------------------------
1358 // Resolve all the addresses of the host we're supposed to connect to
1359 //--------------------------------------------------------------------------
1360 std::vector<XrdNetAddr> prefaddrs;
1361 XRootDStatus st = Utils::GetHostAddresses( prefaddrs, url, pAddressType );
1362 if( !st.IsOK() )
1363 {
1364 log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s."
1365 , pStreamName.c_str(), url.GetHostName().c_str() );
1366 return false;
1367 }
1368
1369 //--------------------------------------------------------------------------
1370 // Resolve all the addresses of the alias
1371 //--------------------------------------------------------------------------
1372 std::vector<XrdNetAddr> aliasaddrs;
1373 st = Utils::GetHostAddresses( aliasaddrs, *pUrl, pAddressType );
1374 if( !st.IsOK() )
1375 {
1376 log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s."
1377 , pStreamName.c_str(), pUrl->GetHostName().c_str() );
1378 return false;
1379 }
1380
1381 //--------------------------------------------------------------------------
1382 // Now check if the preferred host is part of the alias
1383 //--------------------------------------------------------------------------
1384 auto itr = prefaddrs.begin();
1385 for( ; itr != prefaddrs.end() ; ++itr )
1386 {
1387 auto itr2 = aliasaddrs.begin();
1388 for( ; itr2 != aliasaddrs.end() ; ++itr2 )
1389 if( itr->Same( &*itr2 ) ) return true;
1390 }
1391
1392 return false;
1393 }
static Status GetHostAddresses(std::vector< XrdNetAddr > &addresses, const URL &url, AddressType type)
Resolve IP addresses.

References XrdCl::Log::Error(), XrdCl::Utils::GetHostAddresses(), XrdCl::URL::GetHostName(), XrdCl::DefaultEnv::GetLog(), XrdCl::Status::IsOK(), and XrdCl::PostMasterMsg.

+ Here is the call graph for this function:

◆ DisableIfEmpty()

void XrdCl::Stream::DisableIfEmpty ( uint16_t subStream)

Disables respective uplink if empty.

Definition at line 607 of file XrdClStream.cc.

608 {
609 XrdSysMutexHelper scopedLock( pMutex );
610 Log *log = DefaultEnv::GetLog();
611
612 if( pSubStreams[subStream]->outQueue->IsEmpty() )
613 {
614 log->Dump( PostMasterMsg, "[%s] All messages consumed, disable uplink",
615 pSubStreams[subStream]->socket->GetStreamName().c_str() );
616 pSubStreams[subStream]->socket->DisableUplink();
617 }
618 }

References XrdCl::Log::Dump(), XrdCl::DefaultEnv::GetLog(), and XrdCl::PostMasterMsg.

+ Here is the call graph for this function:

◆ Disconnect()

void XrdCl::Stream::Disconnect ( bool force = false)

Disconnect the stream.

Definition at line 367 of file XrdClStream.cc.

368 {
369 //--------------------------------------------------------------------------
370 // See comment about deadlocks in ForceError() method. We don't expect
371 // to be called from a callback thread.
372 //--------------------------------------------------------------------------
373 XrdSysCondVarHelper discLock( pDiscCV );
374 while ( pDiscAllCnt )
375 {
376 pDiscCV.Wait();
377 }
378 ++pDiscAllCnt;
379 discLock.UnLock();
380
381 XrdSysMutexHelper scopedLock( pMutex );
382 SubStreamList::iterator it;
383 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
384 {
385 (*it)->socket->Close();
386 (*it)->status = Socket::Disconnected;
387 }
388 pSubsWaitingClose = 0;
389
390 scopedLock.UnLock();
391 discLock.Lock( &pDiscCV );
392 --pDiscAllCnt;
393 pDiscCV.Signal();
394 }
@ Disconnected
The socket is disconnected.

References XrdCl::Socket::Disconnected, XrdSysCondVarHelper::Lock(), XrdSysCondVarHelper::UnLock(), and XrdSysMutexHelper::UnLock().

Referenced by ~Stream().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ EnableLink()

XRootDStatus XrdCl::Stream::EnableLink ( PathID & path)

Connect if needed, otherwise make sure that the underlying socket handler gets write readiness events, it will update the path with what it has actually enabled

Definition at line 191 of file XrdClStream.cc.

192 {
193 XrdSysMutexHelper scopedLock( pMutex );
194
195 //--------------------------------------------------------------------------
196 // We are in the process of connecting the main stream, so we do nothing
197 // because when the main stream connection is established it will connect
198 // all the other streams
199 //--------------------------------------------------------------------------
200 if( pSubStreams[0]->status == Socket::Connecting )
201 return XRootDStatus();
202
203 //--------------------------------------------------------------------------
204 // The main stream is connected, so we can verify whether we have
205 // the up and the down stream connected and ready to handle data.
206 // If anything is not right we fall back to stream 0.
207 //--------------------------------------------------------------------------
208 if( pSubStreams[0]->status == Socket::Connected )
209 {
210 if( pSubStreams[path.down]->status != Socket::Connected )
211 path.down = 0;
212
213 if( pSubStreams[path.up]->status == Socket::Disconnected )
214 {
215 path.up = 0;
216 return pSubStreams[0]->socket->EnableUplink();
217 }
218
219 if( pSubStreams[path.up]->status == Socket::Connected )
220 return pSubStreams[path.up]->socket->EnableUplink();
221
222 return XRootDStatus();
223 }
224
225 //--------------------------------------------------------------------------
226 // The main stream is not connected, we need to check whether enough time
227 // has passed since we last encountered an error (if any) so that we could
228 // re-attempt the connection
229 //--------------------------------------------------------------------------
230 Log *log = DefaultEnv::GetLog();
231 time_t now = ::time(0);
232
233 if( now-pLastStreamError < pStreamErrorWindow )
234 return pLastFatalError;
235
236 gettimeofday( &pConnectionStarted, 0 );
237 ++pConnectionCount;
238
239 //--------------------------------------------------------------------------
240 // Resolve all the addresses of the host we're supposed to connect to
241 //--------------------------------------------------------------------------
242 XRootDStatus st = Utils::GetHostAddresses( pAddresses, *pUrl, pAddressType );
243 if( !st.IsOK() )
244 {
245 log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for "
246 "the host", pStreamName.c_str() );
247 pLastStreamError = now;
248 st.status = stFatal;
249 pLastFatalError = st;
250 return st;
251 }
252
253 if( pPrefer.IsValid() )
254 {
255 std::vector<XrdNetAddr> addrresses;
256 XRootDStatus st = Utils::GetHostAddresses( addrresses, pPrefer, pAddressType );
257 if( !st.IsOK() )
258 {
259 log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s",
260 pStreamName.c_str(), pPrefer.GetHostName().c_str() );
261 }
262 else
263 {
264 std::vector<XrdNetAddr> tmp;
265 tmp.reserve( pAddresses.size() );
266 // first add all remaining addresses
267 auto itr = pAddresses.begin();
268 for( ; itr != pAddresses.end() ; ++itr )
269 {
270 if( !HasNetAddr( *itr, addrresses ) )
271 tmp.push_back( *itr );
272 }
273 // then copy all 'preferred' addresses
274 std::copy( addrresses.begin(), addrresses.end(), std::back_inserter( tmp ) );
275 // and keep the result
276 pAddresses.swap( tmp );
277 }
278 }
279
280 Utils::LogHostAddresses( log, PostMasterMsg, pUrl->GetHostId(),
281 pAddresses );
282
283 while( !pAddresses.empty() )
284 {
285 pSubStreams[0]->socket->SetAddress( pAddresses.back() );
286 pAddresses.pop_back();
287 pConnectionInitTime = ::time( 0 );
288 st = pSubStreams[0]->socket->Connect( pConnectionWindow );
289 if( st.IsOK() )
290 {
291 pSubStreams[0]->status = Socket::Connecting;
292 break;
293 }
294 }
295 return st;
296 }
@ Connected
The socket is connected.
@ Connecting
The connection process is in progress.
static void LogHostAddresses(Log *log, uint64_t type, const std::string &hostId, std::vector< XrdNetAddr > &addresses)
Log all the addresses on the list.
const uint16_t stFatal
Fatal error, it's still an error.

References XrdCl::Socket::Connected, XrdCl::Socket::Connecting, XrdCl::Socket::Disconnected, XrdCl::PathID::down, XrdCl::Log::Error(), XrdCl::Utils::GetHostAddresses(), XrdCl::DefaultEnv::GetLog(), XrdCl::Status::IsOK(), XrdCl::Utils::LogHostAddresses(), XrdCl::PostMasterMsg, XrdCl::Status::status, XrdCl::stFatal, and XrdCl::PathID::up.

Referenced by ForceConnect(), OnConnectError(), OnError(), and Send().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ForceConnect()

void XrdCl::Stream::ForceConnect ( )

Force connection.

Definition at line 351 of file XrdClStream.cc.

352 {
353 XrdSysMutexHelper scopedLock( pMutex );
354 if( pSubStreams[0]->status == Socket::Connecting )
355 {
356 pSubStreams[0]->status = Socket::Disconnected;
357 XrdCl::PathID path( 0, 0 );
358 XrdCl::XRootDStatus st = EnableLink( path );
359 if( !st.IsOK() )
360 OnConnectError( 0, st );
361 }
362 }
XRootDStatus EnableLink(PathID &path)
void OnConnectError(uint16_t subStream, XRootDStatus status)
On connect error.
bool IsOK() const
We're fine.

References XrdCl::Socket::Connecting, XrdCl::Socket::Disconnected, EnableLink(), XrdCl::Status::IsOK(), and OnConnectError().

+ Here is the call graph for this function:

◆ ForceError()

void XrdCl::Stream::ForceError ( XRootDStatus status,
bool hush = false )

Force error.

Definition at line 1042 of file XrdClStream.cc.

1043 {
1044 //----------------------------------------------------------------------
1045 // We can be called in two ways: first is by by a non-poller thread,
1046 // with errOperationInterrupted as error as part of ForceDisconnect. In
1047 // which case the Stream will be destoryed shortly after we return. The
1048 // second way is call by a poller thread with another type of error.
1049 // Further, when we call socket handler Close() for a socket handled a
1050 // callback running we wait for that to complete (unless it is
1051 // ourselves). This raises the possibility of a deadlock. We avoid this
1052 // by returning quickly if we detect we're in a callback thread and
1053 // there's already a disconnect affecting multiple streams in progress.
1054 //----------------------------------------------------------------------
1055 XrdSysCondVarHelper discLock( pDiscCV );
1056 if( pDiscAllCnt &&
1057 !( status.IsError() && status.code == errOperationInterrupted ) )
1058 {
1059 return;
1060 }
1061 while( pDiscAllCnt )
1062 {
1063 pDiscCV.Wait();
1064 }
1065 ++pDiscAllCnt;
1066 discLock.UnLock();
1067
1068 XrdSysMutexHelper scopedLock( pMutex );
1069 Log *log = DefaultEnv::GetLog();
1070 for( size_t substream = 0; substream < pSubStreams.size(); ++substream )
1071 {
1072 if( pSubStreams[substream]->status != Socket::Connected ) continue;
1073 pSubStreams[substream]->socket->Close();
1074 pSubStreams[substream]->status = Socket::Disconnected;
1075
1076 if( !hush )
1077 log->Debug( PostMasterMsg, "[%s] Forcing error on disconnect: %s.",
1078 pStreamName.c_str(), status.ToString().c_str() );
1079
1080 //--------------------------------------------------------------------
1081 // Reinsert the stuff that we have failed to sent
1082 //--------------------------------------------------------------------
1083 if( pSubStreams[substream]->outMsgHelper.msg )
1084 {
1085 OutQueue::MsgHelper &h = pSubStreams[substream]->outMsgHelper;
1086 pSubStreams[substream]->outQueue->PushFront( h.msg, h.handler, h.expires,
1087 h.stateful );
1088 pIncomingQueue->RemoveMessageHandler(h.handler);
1089 pSubStreams[substream]->outMsgHelper.Reset();
1090 }
1091
1092 //--------------------------------------------------------------------
1093 // Reinsert the receiving handler and reset any partially read partial
1094 //--------------------------------------------------------------------
1095 if( pSubStreams[substream]->inMsgHelper.handler )
1096 {
1097 InMessageHelper &h = pSubStreams[substream]->inMsgHelper;
1098 pIncomingQueue->ReAddMessageHandler( h.handler, h.expires );
1099 XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( h.handler );
1100 if( xrdHandler ) xrdHandler->PartialReceived();
1101 h.Reset();
1102 }
1103 }
1104
1105 pConnectionCount = 0;
1106 pSubsWaitingClose = 0;
1107
1108 //------------------------------------------------------------------------
1109 // We're done here, unlock the stream mutex to avoid deadlocks and
1110 // report the disconnection event to the handlers
1111 //------------------------------------------------------------------------
1112 log->Debug( PostMasterMsg, "[%s] Reporting disconnection to queued "
1113 "message handlers.", pStreamName.c_str() );
1114
1115 SubStreamList::iterator it;
1116 OutQueue q;
1117 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1118 q.GrabItems( *(*it)->outQueue );
1119 scopedLock.UnLock();
1120
1121 q.Report( status );
1122
1123 pIncomingQueue->ReportStreamEvent( MsgHandler::Broken, status );
1124 pChannelEvHandlers.ReportEvent( ChannelEventHandler::StreamBroken, status );
1125
1126 discLock.Lock( &pDiscCV );
1127 --pDiscAllCnt;
1128 pDiscCV.Signal();
1129 }
@ Broken
The stream is broken.
const uint16_t errOperationInterrupted

References XrdCl::MsgHandler::Broken, XrdCl::Status::code, XrdCl::Socket::Connected, XrdCl::Log::Debug(), XrdCl::Socket::Disconnected, XrdCl::errOperationInterrupted, XrdCl::InMessageHelper::expires, XrdCl::OutQueue::MsgHelper::expires, XrdCl::DefaultEnv::GetLog(), XrdCl::OutQueue::GrabItems(), XrdCl::InMessageHelper::handler, XrdCl::OutQueue::MsgHelper::handler, XrdCl::Status::IsError(), XrdSysCondVarHelper::Lock(), XrdCl::OutQueue::MsgHelper::msg, XrdCl::XRootDMsgHandler::PartialReceived(), XrdCl::PostMasterMsg, XrdCl::OutQueue::Report(), XrdCl::InMessageHelper::Reset(), XrdCl::OutQueue::MsgHelper::Reset(), XrdCl::OutQueue::MsgHelper::stateful, XrdCl::ChannelEventHandler::StreamBroken, XrdCl::Status::ToString(), XrdSysCondVarHelper::UnLock(), and XrdSysMutexHelper::UnLock().

+ Here is the call graph for this function:

◆ GetName()

const std::string & XrdCl::Stream::GetName ( ) const
inline

Return stream name.

Definition at line 170 of file XrdClStream.hh.

171 {
172 return pStreamName;
173 }

◆ GetURL()

const URL * XrdCl::Stream::GetURL ( ) const
inline

Get the URL.

Definition at line 157 of file XrdClStream.hh.

158 {
159 return pUrl.get();
160 }

◆ Initialize()

XRootDStatus XrdCl::Stream::Initialize ( )

Initializer.

Definition at line 175 of file XrdClStream.cc.

176 {
177 if( !pTransport || !pPoller || !pChannelData )
178 return XRootDStatus( stError, errUninitialized );
179
180 AsyncSocketHandler *s = new AsyncSocketHandler( *pUrl, pPoller, pTransport,
181 pChannelData, 0, this );
182 pSubStreams.push_back( new SubStreamData() );
183 pSubStreams[0]->socket = s;
184 return XRootDStatus();
185 }
const uint16_t errUninitialized
const uint16_t stError
An error occurred that could potentially be retried.

References XrdCl::errUninitialized, and XrdCl::stError.

◆ InspectStatusRsp()

uint16_t XrdCl::Stream::InspectStatusRsp ( uint16_t stream,
MsgHandler *& incHandler )

In case the message is a kXR_status response it needs further attention

Returns
: a MsgHandler in case we need to read out raw data

Definition at line 1322 of file XrdClStream.cc.

1324 {
1325 InMessageHelper &mh = pSubStreams[stream]->inMsgHelper;
1326 if( !mh.handler )
1328
1329 uint16_t action = mh.handler->InspectStatusRsp();
1330 mh.action |= action;
1331
1332 if( action & MsgHandler::RemoveHandler )
1333 pIncomingQueue->RemoveMessageHandler( mh.handler );
1334
1335 if( action & MsgHandler::Raw )
1336 {
1337 incHandler = mh.handler;
1338 return MsgHandler::Raw;
1339 }
1340
1341 if( action & MsgHandler::Corrupted )
1342 return MsgHandler::Corrupted;
1343
1344 if( action & MsgHandler::More )
1345 return MsgHandler::More;
1346
1347 return MsgHandler::None;
1348 }
@ More
there are more (non-raw) data to be read

References XrdCl::InMessageHelper::action, XrdCl::MsgHandler::Corrupted, XrdCl::InMessageHelper::handler, XrdCl::MsgHandler::InspectStatusRsp(), XrdCl::MsgHandler::More, XrdCl::MsgHandler::None, XrdCl::MsgHandler::Raw, and XrdCl::MsgHandler::RemoveHandler.

+ Here is the call graph for this function:

◆ InstallIncHandler()

MsgHandler * XrdCl::Stream::InstallIncHandler ( std::shared_ptr< Message > & msg,
uint16_t stream )

Install a message handler for the given message if there is one available, if the handler want's to be called in the raw mode it will be returned, the message ownership flag is returned in any case

Parameters
msgmessage header
streamstream concerned
Returns
a pair containing the handler and ownership flag

Definition at line 1301 of file XrdClStream.cc.

1302 {
1303 InMessageHelper &mh = pSubStreams[stream]->inMsgHelper;
1304 if( !mh.handler )
1305 mh.handler = pIncomingQueue->GetHandlerForMessage( msg,
1306 mh.expires,
1307 mh.action );
1308
1309 if( !mh.handler )
1310 return nullptr;
1311
1312 if( mh.action & MsgHandler::Raw )
1313 return mh.handler;
1314 return nullptr;
1315 }

References XrdCl::InMessageHelper::action, XrdCl::InMessageHelper::expires, XrdCl::InMessageHelper::handler, and XrdCl::MsgHandler::Raw.

◆ OnConnect()

void XrdCl::Stream::OnConnect ( uint16_t subStream)

Call back when a message has been reconstructed.

Definition at line 645 of file XrdClStream.cc.

646 {
647 XrdSysMutexHelper scopedLock( pMutex );
648 if( subStream == 0 )
649 {
650 int nsubconn = 0;
651 if( pSubStreams.size() > 1 )
652 {
653 for( size_t i = 1; i < pSubStreams.size(); ++i )
654 if( pSubStreams[i]->status != Socket::Disconnected ) nsubconn++;
655 }
656 if( nsubconn )
657 {
658 pSubsWaitingClose = nsubconn;
659 pSubStreams[0]->socket->DisableUplink();
660 return;
661 }
662 else
663 pSubStreams[0]->socket->EnableUplink();
664 }
665 else
666 {
667 if( pSubsWaitingClose > 0 )
668 {
669 pSubStreams[subStream]->socket->Close();
670 pSubStreams[subStream]->status = Socket::Disconnected;
671 if( --pSubsWaitingClose == 0 )
672 {
673 scopedLock.UnLock();
674 OnConnect( 0 );
675 }
676 return;
677 }
678 }
679
680 pSubStreams[subStream]->status = Socket::Connected;
681
682 std::string ipstack( pSubStreams[0]->socket->GetIpStack() );
683 Log *log = DefaultEnv::GetLog();
684 log->Debug( PostMasterMsg, "[%s] Stream %d connected (%s).", pStreamName.c_str(),
685 subStream, ipstack.c_str() );
686
687 if( subStream == 0 )
688 {
689 pLastStreamError = 0;
690 pLastFatalError = XRootDStatus();
691 pConnectionCount = 0;
692 uint16_t numSub = pTransport->SubStreamNumber( *pChannelData );
693 pSessionId = ++sSessCntGen;
694
695 //------------------------------------------------------------------------
696 // Create the streams if they don't exist yet
697 //------------------------------------------------------------------------
698 if( pSubStreams.size() == 1 && numSub > 1 )
699 {
700 for( uint16_t i = 1; i < numSub; ++i )
701 {
702 URL url = pTransport->GetBindPreference( *pUrl, *pChannelData );
703 AsyncSocketHandler *s = new AsyncSocketHandler( url, pPoller, pTransport,
704 pChannelData, i, this );
705 pSubStreams.push_back( new SubStreamData() );
706 pSubStreams[i]->socket = s;
707 }
708 }
709
710 //------------------------------------------------------------------------
711 // Connect the extra streams, if we fail we move all the outgoing items
712 // to stream 0, we don't need to enable the uplink here, because it
713 // should be already enabled after the handshaking process is completed.
714 //------------------------------------------------------------------------
715 if( pSubStreams.size() > 1 )
716 {
717 log->Debug( PostMasterMsg, "[%s] Attempting to connect %zu additional streams.",
718 pStreamName.c_str(), pSubStreams.size() - 1 );
719 for( size_t i = 1; i < pSubStreams.size(); ++i )
720 {
721 pSubStreams[i]->socket->SetAddress( pSubStreams[0]->socket->GetAddress() );
722 XRootDStatus st = pSubStreams[i]->socket->Connect( pConnectionWindow );
723 if( !st.IsOK() )
724 {
725 pSubStreams[0]->outQueue->GrabItems( *pSubStreams[i]->outQueue );
726 // mark as disconnected. We don't try to actively Close here as
727 // we're in a poller callback thread and the i'th substream here
728 // may be hadled by a different poller callback thread, raising
729 // the possibility of deadlock.
730 pSubStreams[i]->status = Socket::Disconnected;
731 }
732 else
733 {
734 pSubStreams[i]->status = Socket::Connecting;
735 }
736 }
737 }
738
739 //------------------------------------------------------------------------
740 // Inform monitoring
741 //------------------------------------------------------------------------
742 pBytesSent = 0;
743 pBytesReceived = 0;
744 gettimeofday( &pConnectionDone, 0 );
745 Monitor *mon = DefaultEnv::GetMonitor();
746 if( mon )
747 {
748 Monitor::ConnectInfo i;
749 i.server = pUrl->GetHostId();
750 i.sTOD = pConnectionStarted;
751 i.eTOD = pConnectionDone;
752 i.streams = pSubStreams.size();
753
754 AnyObject qryResult;
755 std::string *qryResponse = nullptr;
756 pTransport->Query( TransportQuery::Auth, qryResult, *pChannelData );
757 qryResult.Get( qryResponse );
758
759 if (qryResponse) {
760 i.auth = *qryResponse;
761 delete qryResponse;
762 } else {
763 i.auth = "";
764 }
765
766 mon->Event( Monitor::EvConnect, &i );
767 }
768
769 //------------------------------------------------------------------------
770 // For every connected control-stream call the global on-connect handler
771 //------------------------------------------------------------------------
773 }
774 else if( pOnDataConnJob )
775 {
776 //------------------------------------------------------------------------
777 // For every connected data-stream call the on-connect handler
778 //------------------------------------------------------------------------
779 pJobManager->QueueJob( pOnDataConnJob.get(), 0 );
780 }
781 }
static Monitor * GetMonitor()
Get the monitor object.
static PostMaster * GetPostMaster()
Get default post master.
@ EvConnect
ConnectInfo: Login into a server.
void NotifyConnectHandler(const URL &url)
Notify the global on-connect handler.
void OnConnect(uint16_t subStream)
Call back when a message has been reconstructed.
static const uint16_t Auth
Transport name, returns std::string *.

References XrdCl::TransportQuery::Auth, XrdCl::Monitor::ConnectInfo::auth, XrdCl::Socket::Connected, XrdCl::Socket::Connecting, XrdCl::Log::Debug(), XrdCl::Socket::Disconnected, XrdCl::Monitor::ConnectInfo::eTOD, XrdCl::Monitor::EvConnect, XrdCl::Monitor::Event(), XrdCl::AnyObject::Get(), XrdCl::DefaultEnv::GetLog(), XrdCl::DefaultEnv::GetMonitor(), XrdCl::DefaultEnv::GetPostMaster(), XrdCl::Status::IsOK(), XrdCl::PostMaster::NotifyConnectHandler(), OnConnect(), XrdCl::PostMasterMsg, XrdCl::Monitor::ConnectInfo::server, XrdCl::Monitor::ConnectInfo::sTOD, XrdCl::Monitor::ConnectInfo::streams, and XrdSysMutexHelper::UnLock().

Referenced by OnConnect(), OnConnectError(), OnError(), and OnReadTimeout().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnConnectError()

void XrdCl::Stream::OnConnectError ( uint16_t subStream,
XRootDStatus status )

On connect error.

Definition at line 786 of file XrdClStream.cc.

787 {
788 XrdSysMutexHelper scopedLock( pMutex );
789 Log *log = DefaultEnv::GetLog();
790 pSubStreams[subStream]->socket->Close();
791 time_t now = ::time(0);
792
793 //--------------------------------------------------------------------------
794 // For every connection error call the global connection error handler
795 //--------------------------------------------------------------------------
797
798 //--------------------------------------------------------------------------
799 // If we connected subStream == 0 and cannot connect >0 then we just give
800 // up and move the outgoing messages to another queue
801 //--------------------------------------------------------------------------
802 if( subStream > 0 )
803 {
804 const Socket::SocketStatus oldstate = pSubStreams[subStream]->status;
805 pSubStreams[subStream]->status = Socket::Disconnected;
806
807 if( pSubsWaitingClose > 0 && oldstate != Socket::Disconnected )
808 {
809 if( --pSubsWaitingClose == 0 )
810 {
811 scopedLock.UnLock();
812 OnConnect( 0 );
813 }
814 return;
815 }
816
817 pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
818 if( pSubStreams[0]->status == Socket::Connected )
819 {
820 XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
821 if( !st.IsOK() )
822 OnFatalError( 0, st, scopedLock );
823 return;
824 }
825
826 if( pSubStreams[0]->status == Socket::Connecting )
827 return;
828
829 OnFatalError( subStream, status, scopedLock );
830 return;
831 }
832
833 //--------------------------------------------------------------------------
834 // Check if we still have time to try and do something in the current window
835 //--------------------------------------------------------------------------
836 time_t elapsed = now-pConnectionInitTime;
837 log->Error( PostMasterMsg, "[%s] elapsed = %lld, pConnectionWindow = %d seconds.",
838 pStreamName.c_str(), (long long) elapsed, pConnectionWindow );
839
840 //------------------------------------------------------------------------
841 // If we have some IP addresses left we try them
842 //------------------------------------------------------------------------
843 if( !pAddresses.empty() )
844 {
845 XRootDStatus st;
846 do
847 {
848 pSubStreams[0]->socket->SetAddress( pAddresses.back() );
849 pAddresses.pop_back();
850 pConnectionInitTime = ::time( 0 );
851 st = pSubStreams[0]->socket->Connect( pConnectionWindow );
852 }
853 while( !pAddresses.empty() && !st.IsOK() );
854
855 if( !st.IsOK() )
856 OnFatalError( subStream, st, scopedLock );
857
858 return;
859 }
860 //------------------------------------------------------------------------
861 // If we still can retry with the same host name, we sleep until the end
862 // of the connection window and try
863 //------------------------------------------------------------------------
864 else if( elapsed < pConnectionWindow && pConnectionCount < pConnectionRetry
865 && !status.IsFatal() )
866 {
867 log->Info( PostMasterMsg, "[%s] Attempting reconnection in %lld seconds.",
868 pStreamName.c_str(), (long long) (pConnectionWindow - elapsed) );
869
870 Task *task = new ::StreamConnectorTask( *pUrl, pStreamName );
871 pTaskManager->RegisterTask( task, pConnectionInitTime+pConnectionWindow );
872 return;
873 }
874 //--------------------------------------------------------------------------
875 // We are out of the connection window, the only thing we can do here
876 // is re-resolving the host name and retrying if we still can
877 //--------------------------------------------------------------------------
878 else if( pConnectionCount < pConnectionRetry && !status.IsFatal() )
879 {
880 pAddresses.clear();
881 pSubStreams[0]->status = Socket::Disconnected;
882 PathID path( 0, 0 );
883 XRootDStatus st = EnableLink( path );
884 if( !st.IsOK() )
885 OnFatalError( subStream, st, scopedLock );
886 return;
887 }
888
889 //--------------------------------------------------------------------------
890 // Else, we fail
891 //--------------------------------------------------------------------------
892 OnFatalError( subStream, status, scopedLock );
893 }
void NotifyConnErrHandler(const URL &url, const XRootDStatus &status)
Notify the global error connection handler.
SocketStatus
Status of the socket.

References XrdCl::Socket::Connected, XrdCl::Socket::Connecting, XrdCl::Socket::Disconnected, EnableLink(), XrdCl::Log::Error(), XrdCl::DefaultEnv::GetLog(), XrdCl::DefaultEnv::GetPostMaster(), XrdCl::Log::Info(), XrdCl::Status::IsFatal(), XrdCl::Status::IsOK(), XrdCl::PostMaster::NotifyConnErrHandler(), OnConnect(), XrdCl::PostMasterMsg, and XrdSysMutexHelper::UnLock().

Referenced by ForceConnect().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnError()

void XrdCl::Stream::OnError ( uint16_t subStream,
XRootDStatus status )

On error.

Definition at line 898 of file XrdClStream.cc.

899 {
900 //--------------------------------------------------------------------------
901 // See comment about deadlocks in ForceError() method. We expect to be
902 // called form a callback thread. However we take care to only potentially
903 // disconnect the socket for our own subStream. We require no ongoing
904 // disconnect of all substreams and ensure that remains true throughout
905 // our execution by releasing discLock only after acquiring pMutex.
906 //--------------------------------------------------------------------------
907
908 XrdSysCondVarHelper discLock( pDiscCV );
909 if( pDiscAllCnt ) return;
910
911 XrdSysMutexHelper scopedLock( pMutex );
912 discLock.UnLock();
913 Log *log = DefaultEnv::GetLog();
914 const Socket::SocketStatus oldstate = pSubStreams[subStream]->status;
915 pSubStreams[subStream]->socket->Close();
916 pSubStreams[subStream]->status = Socket::Disconnected;
917
918 log->Debug( PostMasterMsg, "[%s] Recovering error for stream #%d: %s.",
919 pStreamName.c_str(), subStream, status.ToString().c_str() );
920
921 //--------------------------------------------------------------------------
922 // Reinsert the stuff that we have failed to sent
923 //--------------------------------------------------------------------------
924 if( pSubStreams[subStream]->outMsgHelper.msg )
925 {
926 OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
927 pSubStreams[subStream]->outQueue->PushFront( h.msg, h.handler, h.expires,
928 h.stateful );
929 pIncomingQueue->RemoveMessageHandler(h.handler);
930 pSubStreams[subStream]->outMsgHelper.Reset();
931 }
932
933 //--------------------------------------------------------------------------
934 // Reinsert the receiving handler and reset any partially read partial
935 //--------------------------------------------------------------------------
936 if( pSubStreams[subStream]->inMsgHelper.handler )
937 {
938 InMessageHelper &h = pSubStreams[subStream]->inMsgHelper;
939 pIncomingQueue->ReAddMessageHandler( h.handler, h.expires );
940 XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( h.handler );
941 if( xrdHandler ) xrdHandler->PartialReceived();
942 h.Reset();
943 }
944
945 //--------------------------------------------------------------------------
946 // We are dealing with an error of a peripheral stream. If we don't have
947 // anything to send don't bother recovering. Otherwise move the requests
948 // to stream 0 if possible.
949 //--------------------------------------------------------------------------
950 if( subStream > 0 )
951 {
952 if( pSubsWaitingClose > 0 && oldstate != Socket::Disconnected )
953 {
954 if( --pSubsWaitingClose == 0 )
955 {
956 scopedLock.UnLock();
957 OnConnect( 0 );
958 }
959 return;
960 }
961
962 if( pSubStreams[0]->status != Socket::Disconnected )
963 {
964 pSubStreams[subStream]->socket->SetAddress( pSubStreams[0]->socket->GetAddress() );
965 XRootDStatus st = pSubStreams[subStream]->socket->Connect( pConnectionWindow );
966 if( !st.IsOK() )
967 {
968 pSubStreams[subStream]->socket->Close();
969 if( pSubStreams[subStream]->outQueue->IsEmpty() )
970 return;
971 pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
972 if( pSubStreams[0]->status == Socket::Connected )
973 {
974 XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
975 if( !st.IsOK() )
976 OnFatalError( 0, st, scopedLock );
977 return;
978 }
979 if( pSubStreams[0]->status == Socket::Connecting )
980 return;
981 }
982 else
983 {
984 pSubStreams[subStream]->status = Socket::Connecting;
985 return;
986 }
987 OnFatalError( subStream, status, scopedLock );
988 return;
989 }
990 if( pSubStreams[subStream]->outQueue->IsEmpty() )
991 return;
992 OnFatalError( subStream, status, scopedLock );
993 return;
994 }
995
996 //--------------------------------------------------------------------------
997 // If we lost the stream 0 we have lost the session, we re-enable the
998 // stream if we still have things in one of the outgoing queues, otherwise
999 // there is not point to recover at this point.
1000 //--------------------------------------------------------------------------
1001 if( subStream == 0 )
1002 {
1003 MonitorDisconnection( status );
1004
1005 SubStreamList::iterator it;
1006 size_t outstanding = 0;
1007 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1008 outstanding += (*it)->outQueue->GetSizeStateless();
1009
1010 if( outstanding )
1011 {
1012 PathID path( 0, 0 );
1013 XRootDStatus st = EnableLink( path );
1014 if( !st.IsOK() )
1015 {
1016 OnFatalError( 0, st, scopedLock );
1017 return;
1018 }
1019 }
1020
1021 //------------------------------------------------------------------------
1022 // We're done here, unlock the stream mutex to avoid deadlocks and
1023 // report the disconnection event to the handlers
1024 //------------------------------------------------------------------------
1025 log->Debug( PostMasterMsg, "[%s] Reporting disconnection to queued "
1026 "message handlers.", pStreamName.c_str() );
1027 OutQueue q;
1028 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1029 q.GrabStateful( *(*it)->outQueue );
1030 scopedLock.UnLock();
1031
1032 q.Report( status );
1033 pIncomingQueue->ReportStreamEvent( MsgHandler::Broken, status );
1034 pChannelEvHandlers.ReportEvent( ChannelEventHandler::StreamBroken, status );
1035 return;
1036 }
1037 }

References XrdCl::MsgHandler::Broken, XrdCl::Socket::Connected, XrdCl::Socket::Connecting, XrdCl::Log::Debug(), XrdCl::Socket::Disconnected, EnableLink(), XrdCl::InMessageHelper::expires, XrdCl::OutQueue::MsgHelper::expires, XrdCl::DefaultEnv::GetLog(), XrdCl::OutQueue::GrabStateful(), XrdCl::InMessageHelper::handler, XrdCl::OutQueue::MsgHelper::handler, XrdCl::Status::IsOK(), XrdCl::OutQueue::MsgHelper::msg, OnConnect(), XrdCl::XRootDMsgHandler::PartialReceived(), XrdCl::PostMasterMsg, XrdCl::OutQueue::Report(), XrdCl::InMessageHelper::Reset(), XrdCl::OutQueue::MsgHelper::Reset(), XrdCl::OutQueue::MsgHelper::stateful, XrdCl::ChannelEventHandler::StreamBroken, XrdCl::Status::ToString(), XrdSysCondVarHelper::UnLock(), and XrdSysMutexHelper::UnLock().

Referenced by OnReadTimeout().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnIncoming()

void XrdCl::Stream::OnIncoming ( uint16_t subStream,
std::shared_ptr< Message > msg,
uint32_t bytesReceived )

Call back when a message has been reconstructed.

Definition at line 493 of file XrdClStream.cc.

496 {
497 msg->SetSessionId( pSessionId );
498 pBytesReceived += bytesReceived;
499
500 MsgHandler *handler = nullptr;
501 uint16_t action = 0;
502 {
503 InMessageHelper &mh = pSubStreams[subStream]->inMsgHelper;
504 handler = mh.handler;
505 action = mh.action;
506 mh.Reset();
507 }
508
509 if( !IsPartial( *msg ) )
510 {
511 uint32_t streamAction = pTransport->MessageReceived( *msg, subStream,
512 *pChannelData );
513 if( streamAction & TransportHandler::DigestMsg )
514 return;
515
516 if( streamAction & TransportHandler::RequestClose )
517 {
518 RequestClose( *msg );
519 return;
520 }
521 }
522
523 Log *log = DefaultEnv::GetLog();
524
525 //--------------------------------------------------------------------------
526 // No handler, we discard the message ...
527 //--------------------------------------------------------------------------
528 if( !handler )
529 {
530 ServerResponse *rsp = (ServerResponse*)msg->GetBuffer();
531 log->Warning( PostMasterMsg, "[%s] Discarding received message: %p "
532 "(status=%d, SID=[%d,%d]), no MsgHandler found.",
533 pStreamName.c_str(), (void*)msg.get(), rsp->hdr.status,
534 rsp->hdr.streamid[0], rsp->hdr.streamid[1] );
535 return;
536 }
537
538 //--------------------------------------------------------------------------
539 // We have a handler, so we call the callback
540 //--------------------------------------------------------------------------
541 log->Dump( PostMasterMsg, "[%s] Handling received message: %p.",
542 pStreamName.c_str(), (void*)msg.get() );
543
545 {
546 log->Dump( PostMasterMsg, "[%s] Ignoring the processing handler for: %s.",
547 pStreamName.c_str(), msg->GetObfuscatedDescription().c_str() );
548
549 // if we are handling partial response we have to take down the timeout fence
550 if( IsPartial( *msg ) )
551 {
552 XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( handler );
553 if( xrdHandler ) xrdHandler->PartialReceived();
554 }
555
556 return;
557 }
558
559 Job *job = new HandleIncMsgJob( handler );
560 pJobManager->QueueJob( job );
561 }
kXR_char streamid[2]
Definition XProtocol.hh:914
ServerResponseHeader hdr
@ Ignore
Ignore the message.
@ RequestClose
Send a close request.

References XrdCl::InMessageHelper::action, XrdCl::TransportHandler::DigestMsg, XrdCl::Log::Dump(), XrdCl::DefaultEnv::GetLog(), XrdCl::InMessageHelper::handler, ServerResponse::hdr, XrdCl::MsgHandler::Ignore, XrdCl::MsgHandler::NoProcess, XrdCl::XRootDMsgHandler::PartialReceived(), XrdCl::PostMasterMsg, XrdCl::TransportHandler::RequestClose, XrdCl::InMessageHelper::Reset(), ServerResponseHeader::status, ServerResponseHeader::streamid, and XrdCl::Log::Warning().

+ Here is the call graph for this function:

◆ OnMessageSent()

void XrdCl::Stream::OnMessageSent ( uint16_t subStream,
Message * msg,
uint32_t bytesSent )

Definition at line 623 of file XrdClStream.cc.

626 {
627 pTransport->MessageSent( msg, subStream, bytesSent,
628 *pChannelData );
629 OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
630 pBytesSent += bytesSent;
631 if( h.handler )
632 {
633 // ensure expiration time is assigned if still in queue
634 pIncomingQueue->AssignTimeout( h.handler );
635 // OnStatusReady may cause the handler to delete itself, in
636 // which case the handler or the user callback may also delete msg
637 h.handler->OnStatusReady( msg, XRootDStatus() );
638 }
639 pSubStreams[subStream]->outMsgHelper.Reset();
640 }

References XrdCl::OutQueue::MsgHelper::handler, XrdCl::MsgHandler::OnStatusReady(), and XrdCl::OutQueue::MsgHelper::Reset().

+ Here is the call graph for this function:

◆ OnReadTimeout()

bool XrdCl::Stream::OnReadTimeout ( uint16_t subStream)

On read timeout.

Definition at line 1188 of file XrdClStream.cc.

1189 {
1190 //--------------------------------------------------------------------------
1191 // We only take the main stream into account
1192 //--------------------------------------------------------------------------
1193 if( substream != 0 )
1194 {
1195 if( pSubsWaitingClose )
1196 {
1197 XrdSysMutexHelper scopedLock( pMutex );
1198 if( !pSubsWaitingClose ) return true;
1199 if( pSubStreams[substream]->status == Socket::Disconnected ) return true;
1200 pSubStreams[substream]->socket->Close();
1201 pSubStreams[substream]->status = Socket::Disconnected;
1202 if( --pSubsWaitingClose == 0 )
1203 {
1204 scopedLock.UnLock();
1205 OnConnect( 0 );
1206 }
1207 }
1208 return true;
1209 }
1210
1211 //--------------------------------------------------------------------------
1212 // Check if there is no outgoing messages and if the stream TTL is elapesed.
1213 // It is assumed that the underlying transport makes sure that there is no
1214 // pending requests that are not answered, ie. all possible virtual streams
1215 // are de-allocated
1216 //--------------------------------------------------------------------------
1217 Log *log = DefaultEnv::GetLog();
1218 SubStreamList::iterator it;
1219 time_t now = time(0);
1220
1221 XrdSysMutexHelper scopedLock( pMutex );
1222 uint32_t outgoingMessages = 0;
1223 time_t lastActivity = 0;
1224 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1225 {
1226 outgoingMessages += (*it)->outQueue->GetSize();
1227 time_t sockLastActivity = (*it)->socket->GetLastActivity();
1228 if( lastActivity < sockLastActivity )
1229 lastActivity = sockLastActivity;
1230 }
1231
1232 if( !outgoingMessages )
1233 {
1234 bool disconnect = pTransport->IsStreamTTLElapsed( now-lastActivity,
1235 *pChannelData );
1236 if( disconnect )
1237 {
1238 log->Debug( PostMasterMsg, "[%s] Stream TTL elapsed, disconnecting...",
1239 pStreamName.c_str() );
1240 //----------------------------------------------------------------------
1241 // Important note!
1242 //
1243 // This job destroys the Stream object itself, the underlying
1244 // AsyncSocketHandler object (that called this method) and the Channel
1245 // object that aggregates this Stream.
1246 //
1247 // Additionally &(*pUrl) is used by ForceDisconnect to check if we are
1248 // in a Channel that was previously collapsed in a redirect.
1249 //----------------------------------------------------------------------
1250 if( !pTTLDiscJob )
1251 {
1252 pTTLDiscJob = new ForceDisconnectJob( pUrl );
1253 pJobManager->QueueJob( pTTLDiscJob );
1254 }
1255 return false;
1256 }
1257 }
1258
1259 //--------------------------------------------------------------------------
1260 // Check if the stream is broken
1261 //--------------------------------------------------------------------------
1262 XRootDStatus st = pTransport->IsStreamBroken( now-lastActivity,
1263 *pChannelData );
1264 if( !st.IsOK() )
1265 {
1266 scopedLock.UnLock();
1267 OnError( substream, st );
1268 return false;
1269 }
1270 return true;
1271 }
void OnError(uint16_t subStream, XRootDStatus status)
On error.

References XrdCl::Log::Debug(), XrdCl::Socket::Disconnected, XrdCl::DefaultEnv::GetLog(), XrdCl::Status::IsOK(), OnConnect(), OnError(), XrdCl::PostMasterMsg, and XrdSysMutexHelper::UnLock().

+ Here is the call graph for this function:

◆ OnReadyToWrite()

std::pair< Message *, MsgHandler * > XrdCl::Stream::OnReadyToWrite ( uint16_t subStream)

Definition at line 567 of file XrdClStream.cc.

568 {
569 XrdSysMutexHelper scopedLock( pMutex );
570 Log *log = DefaultEnv::GetLog();
571 if( pSubStreams[subStream]->outQueue->IsEmpty() )
572 {
573 log->Dump( PostMasterMsg, "[%s] Nothing to write, disable uplink",
574 pSubStreams[subStream]->socket->GetStreamName().c_str() );
575
576 pSubStreams[subStream]->socket->DisableUplink();
577 return std::make_pair( (Message *)0, (MsgHandler *)0 );
578 }
579
580 OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
581 h.msg = pSubStreams[subStream]->outQueue->PopMessage( h.handler,
582 h.expires,
583 h.stateful );
584
585 log->Debug( PostMasterMsg, "[%s] Duplicating MsgHandler: %p (message: %s) "
586 "from out-queue to in-queue, starting to send outgoing.",
587 pUrl->GetHostId().c_str(), (void*)h.handler,
588 h.msg->GetObfuscatedDescription().c_str() );
589
590 scopedLock.UnLock();
591
592 if( h.handler )
593 {
594 bool rmMsg = false;
595 pIncomingQueue->AddMessageHandler( h.handler, rmMsg );
596 if( rmMsg )
597 {
598 Log *log = DefaultEnv::GetLog();
599 log->Warning( PostMasterMsg, "[%s] Removed a leftover msg from the in-queue.",
600 pStreamName.c_str() );
601 }
602 h.handler->OnReadyToSend( h.msg );
603 }
604 return std::make_pair( h.msg, h.handler );
605 }

References XrdCl::Log::Debug(), XrdCl::Log::Dump(), XrdCl::OutQueue::MsgHelper::expires, XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), XrdCl::OutQueue::MsgHelper::handler, XrdCl::OutQueue::MsgHelper::msg, XrdCl::MsgHandler::OnReadyToSend(), XrdCl::PostMasterMsg, XrdCl::OutQueue::MsgHelper::stateful, XrdSysMutexHelper::UnLock(), and XrdCl::Log::Warning().

+ Here is the call graph for this function:

◆ OnWriteTimeout()

bool XrdCl::Stream::OnWriteTimeout ( uint16_t subStream)

On write timeout.

Definition at line 1276 of file XrdClStream.cc.

1277 {
1278 return true;
1279 }

◆ Query()

Status XrdCl::Stream::Query ( uint16_t query,
AnyObject & result )

Query the stream.

Definition at line 1398 of file XrdClStream.cc.

1399 {
1400 switch( query )
1401 {
1403 {
1404 result.Set( new std::string( pSubStreams[0]->socket->GetIpAddr() ), false );
1405 return Status();
1406 }
1407
1409 {
1410 result.Set( new std::string( pSubStreams[0]->socket->GetIpStack() ), false );
1411 return Status();
1412 }
1413
1415 {
1416 result.Set( new std::string( pSubStreams[0]->socket->GetHostName() ), false );
1417 return Status();
1418 }
1419
1420 default:
1421 return Status( stError, errQueryNotSupported );
1422 }
1423 }
const uint16_t errQueryNotSupported
static const uint16_t IpAddr
static const uint16_t HostName
static const uint16_t IpStack

References XrdCl::errQueryNotSupported, XrdCl::StreamQuery::HostName, XrdCl::StreamQuery::IpAddr, XrdCl::StreamQuery::IpStack, XrdCl::AnyObject::Set(), and XrdCl::stError.

+ Here is the call graph for this function:

◆ RegisterEventHandler()

void XrdCl::Stream::RegisterEventHandler ( ChannelEventHandler * handler)

Register channel event handler.

Definition at line 1284 of file XrdClStream.cc.

1285 {
1286 pChannelEvHandlers.AddHandler( handler );
1287 }

◆ RemoveEventHandler()

void XrdCl::Stream::RemoveEventHandler ( ChannelEventHandler * handler)

Remove a channel event handler.

Definition at line 1292 of file XrdClStream.cc.

1293 {
1294 pChannelEvHandlers.RemoveHandler( handler );
1295 }

◆ Send()

XRootDStatus XrdCl::Stream::Send ( Message * msg,
MsgHandler * handler,
bool stateful,
time_t expires )

Queue the message for sending.

Definition at line 301 of file XrdClStream.cc.

305 {
306 XrdSysMutexHelper scopedLock( pMutex );
307 Log *log = DefaultEnv::GetLog();
308
309 //--------------------------------------------------------------------------
310 // Check the session ID and bounce if needed
311 //--------------------------------------------------------------------------
312 if( msg->GetSessionId() &&
313 (pSubStreams[0]->status != Socket::Connected ||
314 pSessionId != msg->GetSessionId()) )
315 return XRootDStatus( stError, errInvalidSession );
316
317 //--------------------------------------------------------------------------
318 // Decide on the path to send the message
319 //--------------------------------------------------------------------------
320 PathID path = pTransport->MultiplexSubStream( msg, *pChannelData );
321 if( pSubStreams.size() <= path.up )
322 {
323 log->Warning( PostMasterMsg, "[%s] Unable to send message %s through "
324 "substream %d, using 0 instead", pStreamName.c_str(),
325 msg->GetObfuscatedDescription().c_str(), path.up );
326 path.up = 0;
327 }
328
329 log->Dump( PostMasterMsg, "[%s] Sending message %s (%p) through "
330 "substream %d expecting answer at %d", pStreamName.c_str(),
331 msg->GetObfuscatedDescription().c_str(), (void*)msg, path.up, path.down );
332
333 //--------------------------------------------------------------------------
334 // Enable *a* path and insert the message to the right queue
335 //--------------------------------------------------------------------------
336 XRootDStatus st = EnableLink( path );
337 if( st.IsOK() )
338 {
339 pTransport->MultiplexSubStream( msg, *pChannelData, &path );
340 pSubStreams[path.up]->outQueue->PushBack( msg, handler,
341 expires, stateful );
342 }
343 else
344 st.status = stFatal;
345 return st;
346 }
const uint16_t errInvalidSession

References XrdCl::Socket::Connected, XrdCl::PathID::down, XrdCl::Log::Dump(), EnableLink(), XrdCl::errInvalidSession, XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), XrdCl::Message::GetSessionId(), XrdCl::Status::IsOK(), XrdCl::PostMasterMsg, XrdCl::Status::status, XrdCl::stError, XrdCl::stFatal, XrdCl::PathID::up, and XrdCl::Log::Warning().

+ Here is the call graph for this function:

◆ SetChannelData()

void XrdCl::Stream::SetChannelData ( AnyObject * channelData)
inline

Set the channel data.

Definition at line 115 of file XrdClStream.hh.

116 {
117 pChannelData = channelData;
118 }

◆ SetIncomingQueue()

void XrdCl::Stream::SetIncomingQueue ( InQueue * incomingQueue)
inline

Set the incoming queue.

Definition at line 107 of file XrdClStream.hh.

108 {
109 pIncomingQueue = incomingQueue;
110 }

◆ SetJobManager()

void XrdCl::Stream::SetJobManager ( JobManager * jobManager)
inline

Set job manager.

Definition at line 131 of file XrdClStream.hh.

132 {
133 pJobManager = jobManager;
134 }

◆ SetOnDataConnectHandler()

void XrdCl::Stream::SetOnDataConnectHandler ( std::shared_ptr< Job > & onConnJob)
inline

Set the on-connect handler for data streams.

Definition at line 263 of file XrdClStream.hh.

264 {
265 XrdSysMutexHelper scopedLock( pMutex );
266 pOnDataConnJob = onConnJob;
267 }

◆ SetPoller()

void XrdCl::Stream::SetPoller ( Poller * poller)
inline

Set the poller.

Definition at line 99 of file XrdClStream.hh.

100 {
101 pPoller = poller;
102 }

◆ SetTaskManager()

void XrdCl::Stream::SetTaskManager ( TaskManager * taskManager)
inline

Set task manager.

Definition at line 123 of file XrdClStream.hh.

124 {
125 pTaskManager = taskManager;
126 }

◆ SetTransport()

void XrdCl::Stream::SetTransport ( TransportHandler * transport)
inline

Set the transport.

Definition at line 91 of file XrdClStream.hh.

92 {
93 pTransport = transport;
94 }

◆ Tick()

void XrdCl::Stream::Tick ( time_t now)

Handle a clock event generated either by socket timeout, or by the task manager event

Definition at line 399 of file XrdClStream.cc.

400 {
401 //--------------------------------------------------------------------------
402 // Check for timed-out requests and incoming handlers
403 //--------------------------------------------------------------------------
404 pMutex.Lock();
405 OutQueue q;
406 SubStreamList::iterator it;
407 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
408 q.GrabExpired( *(*it)->outQueue, now );
409 pMutex.UnLock();
410
411 q.Report( XRootDStatus( stError, errOperationExpired ) );
412 pIncomingQueue->ReportTimeout( now );
413 }
const uint16_t errOperationExpired

References XrdCl::errOperationExpired, XrdCl::OutQueue::GrabExpired(), XrdCl::OutQueue::Report(), and XrdCl::stError.

+ Here is the call graph for this function:

The documentation for this class was generated from the following files: