XRootD
Loading...
Searching...
No Matches
XrdThrottleManager Class Reference

#include <XrdThrottleManager.hh>

+ Collaboration diagram for XrdThrottleManager:

Public Member Functions

 XrdThrottleManager (XrdSysError *lP, XrdOucTrace *tP)
 
 ~XrdThrottleManager ()
 
void Apply (int reqsize, int reqops, int uid)
 
bool CheckLoadShed (const std::string &opaque)
 
bool CloseFile (const std::string &entity)
 
void FromConfig (XrdThrottle::Configuration &config)
 
std::tuple< std::string, uint16_t > GetUserInfo (const XrdSecEntity *client)
 
void Init ()
 
bool IsThrottling ()
 
bool OpenFile (const std::string &entity, std::string &open_error_message)
 
void PerformLoadShed (const std::string &opaque, std::string &host, unsigned &port)
 
void PrepLoadShed (const char *opaque, std::string &lsOpaque)
 
void SetLoadShed (std::string &hostname, unsigned port, unsigned frequency)
 
void SetMaxConns (unsigned long max_conns)
 
void SetMaxOpen (unsigned long max_open)
 
void SetMaxWait (unsigned long max_wait)
 
void SetMonitor (XrdXrootdGStream *gstream)
 
void SetThrottles (float reqbyterate, float reqoprate, int concurrency, float interval_length)
 
XrdThrottleTimer StartIOTimer (uint16_t uid, bool &ok)
 

Protected Member Functions

void StopIOTimer (std::chrono::steady_clock::duration &event_duration, uint16_t uid)
 

Friends

class XrdThrottleTimer
 

Detailed Description

Definition at line 53 of file XrdThrottleManager.hh.

Constructor & Destructor Documentation

◆ XrdThrottleManager()

XrdThrottleManager::XrdThrottleManager ( XrdSysError * lP,
XrdOucTrace * tP )

Definition at line 44 of file XrdThrottleManager.cc.

44 :
45 m_trace(tP),
46 m_log(lP),
47 m_interval_length_seconds(1.0),
48 m_bytes_per_second(-1),
49 m_ops_per_second(-1),
50 m_concurrency_limit(-1),
51 m_last_round_allocation(100*1024),
52 m_loadshed_host(""),
53 m_loadshed_port(0),
54 m_loadshed_frequency(0)
55{
56}

◆ ~XrdThrottleManager()

XrdThrottleManager::~XrdThrottleManager ( )
inline

Definition at line 107 of file XrdThrottleManager.hh.

107{} // The buffmanager is never deleted

Member Function Documentation

◆ Apply()

void XrdThrottleManager::Apply ( int reqsize,
int reqops,
int uid )

Definition at line 317 of file XrdThrottleManager.cc.

318{
319 if (m_bytes_per_second < 0)
320 reqsize = 0;
321 if (m_ops_per_second < 0)
322 reqops = 0;
323 while (reqsize || reqops)
324 {
325 // Subtract the requested out of the shares
326 AtomicBeg(m_compute_var);
327 GetShares(m_primary_bytes_shares[uid], reqsize);
328 if (reqsize)
329 {
330 TRACE(BANDWIDTH, "Using secondary shares; request has " << reqsize << " bytes left.");
331 GetShares(m_secondary_bytes_shares[uid], reqsize);
332 TRACE(BANDWIDTH, "Finished with secondary shares; request has " << reqsize << " bytes left.");
333 }
334 else
335 {
336 TRACE(BANDWIDTH, "Filled byte shares out of primary; " << m_primary_bytes_shares[uid] << " left.");
337 }
338 GetShares(m_primary_ops_shares[uid], reqops);
339 if (reqops)
340 {
341 GetShares(m_secondary_ops_shares[uid], reqops);
342 }
343 StealShares(uid, reqsize, reqops);
344 AtomicEnd(m_compute_var);
345
346 if (reqsize || reqops)
347 {
348 if (reqsize) TRACE(BANDWIDTH, "Sleeping to wait for throttle fairshare.");
349 if (reqops) TRACE(IOPS, "Sleeping to wait for throttle fairshare.");
350 m_compute_var.Wait();
351 m_loadshed_limit_hit++;
352 }
353 }
354
355}
#define AtomicBeg(Mtx)
#define AtomicEnd(Mtx)
#define TRACE(act, x)
Definition XrdTrace.hh:63

References AtomicBeg, AtomicEnd, and TRACE.

◆ CheckLoadShed()

bool XrdThrottleManager::CheckLoadShed ( const std::string & opaque)

Definition at line 769 of file XrdThrottleManager.cc.

770{
771 if (m_loadshed_port == 0)
772 {
773 return false;
774 }
775 if (m_loadshed_limit_hit == 0)
776 {
777 return false;
778 }
779 if (static_cast<unsigned>(rand()) % 100 > m_loadshed_frequency)
780 {
781 return false;
782 }
783 if (opaque.empty())
784 {
785 return false;
786 }
787 return true;
788}

◆ CloseFile()

bool XrdThrottleManager::CloseFile ( const std::string & entity)

Definition at line 251 of file XrdThrottleManager.cc.

252{
253 if (m_max_open == 0 && m_max_conns == 0) return true;
254
255 bool result = true;
256 const std::lock_guard<std::mutex> lock(m_file_mutex);
257 if (m_max_open) {
258 auto iter = m_file_counters.find(entity);
259 if (iter == m_file_counters.end()) {
260 TRACE(FILES, "WARNING: User " << entity << " closed a file but throttle plugin never saw an open file");
261 result = false;
262 } else if (iter->second == 0) {
263 TRACE(FILES, "WARNING: User " << entity << " closed a file but throttle plugin thinks all files were already closed");
264 result = false;
265 } else {
266 iter->second--;
267 }
268 if (result) TRACE(FILES, "User " << entity << " closed a file; " << iter->second <<
269 " remain open");
270 }
271
272 if (m_max_conns) {
273 auto pid = XrdSysThread::Num();
274 auto conn_iter = m_active_conns.find(entity);
275 auto conn_count_iter = m_conn_counters.find(entity);
276 if (conn_iter == m_active_conns.end() || !(conn_iter->second)) {
277 TRACE(CONNS, "WARNING: User " << entity << " closed a file on a connection we are not"
278 " tracking");
279 return false;
280 }
281 auto pid_iter = conn_iter->second->find(pid);
282 if (pid_iter == conn_iter->second->end()) {
283 TRACE(CONNS, "WARNING: User " << entity << " closed a file on a connection we are not"
284 " tracking");
285 return false;
286 }
287 if (pid_iter->second == 0) {
288 TRACE(CONNS, "WARNING: User " << entity << " closed a file on connection the throttle"
289 " plugin thinks was idle");
290 } else {
291 pid_iter->second--;
292 }
293 if (conn_count_iter == m_conn_counters.end()) {
294 TRACE(CONNS, "WARNING: User " << entity << " closed a file but the throttle plugin never"
295 " observed an open file");
296 } else if (pid_iter->second == 0) {
297 if (conn_count_iter->second == 0) {
298 TRACE(CONNS, "WARNING: User " << entity << " had a connection go idle but the "
299 " throttle plugin already thought all connections were idle");
300 } else {
301 conn_count_iter->second--;
302 TRACE(CONNS, "User " << entity << " had connection on thread " << pid << " go idle; "
303 << conn_count_iter->second << " active connections remain");
304 }
305 }
306 }
307
308 return result;
309}
static unsigned long Num(void)

References XrdSysThread::Num(), and TRACE.

+ Here is the call graph for this function:

◆ FromConfig()

void XrdThrottleManager::FromConfig ( XrdThrottle::Configuration & config)

Definition at line 59 of file XrdThrottleManager.cc.

60{
61
62 auto max_open = config.GetMaxOpen();
63 if (max_open != -1) SetMaxOpen(max_open);
64 auto max_conn = config.GetMaxConn();
65 if (max_conn != -1) SetMaxConns(max_conn);
66 auto max_wait = config.GetMaxWait();
67 if (max_wait != -1) SetMaxWait(max_wait);
68
70 config.GetThrottleIOPSRate(),
72 static_cast<float>(config.GetThrottleRecomputeIntervalMS())/1000.0);
73
74 m_trace->What = config.GetTraceLevels();
75
76 auto loadshed_host = config.GetLoadshedHost();
77 auto loadshed_port = config.GetLoadshedPort();
78 auto loadshed_freq = config.GetLoadshedFreq();
79 if (!loadshed_host.empty() && loadshed_port > 0 && loadshed_freq > 0)
80 {
81 // Loadshed specified, so set it.
82 SetLoadShed(loadshed_host, loadshed_port, loadshed_freq);
83 }
84}
void SetThrottles(float reqbyterate, float reqoprate, int concurrency, float interval_length)
void SetMaxOpen(unsigned long max_open)
void SetLoadShed(std::string &hostname, unsigned port, unsigned frequency)
void SetMaxWait(unsigned long max_wait)
void SetMaxConns(unsigned long max_conns)
long long GetLoadshedPort() const
long long GetThrottleDataRate() const
long long GetThrottleConcurrency() const
const std::string & GetLoadshedHost() const
long long GetLoadshedFreq() const
long long GetThrottleIOPSRate() const
long long GetThrottleRecomputeIntervalMS() const

References XrdThrottle::Configuration::GetLoadshedFreq(), XrdThrottle::Configuration::GetLoadshedHost(), XrdThrottle::Configuration::GetLoadshedPort(), XrdThrottle::Configuration::GetMaxConn(), XrdThrottle::Configuration::GetMaxOpen(), XrdThrottle::Configuration::GetMaxWait(), XrdThrottle::Configuration::GetThrottleConcurrency(), XrdThrottle::Configuration::GetThrottleDataRate(), XrdThrottle::Configuration::GetThrottleIOPSRate(), XrdThrottle::Configuration::GetThrottleRecomputeIntervalMS(), XrdThrottle::Configuration::GetTraceLevels(), SetLoadShed(), SetMaxConns(), SetMaxOpen(), SetMaxWait(), and SetThrottles().

+ Here is the call graph for this function:

◆ GetUserInfo()

std::tuple< std::string, uint16_t > XrdThrottleManager::GetUserInfo ( const XrdSecEntity * client)

Definition at line 116 of file XrdThrottleManager.cc.

116 {
117 // Try various potential "names" associated with the request, from the most
118 // specific to most generic.
119 std::string user;
120
121 if (client->eaAPI && client->eaAPI->Get("token.subject", user)) {
122 if (client->vorg) user = std::string(client->vorg) + ":" + user;
123 } else if (client->eaAPI) {
124 std::string request_name;
125 if (client->eaAPI->Get("request.name", request_name) && !request_name.empty()) user = request_name;
126 }
127 if (user.empty()) {user = client->name ? client->name : "nobody";}
128 uint16_t uid = GetUid(user.c_str());
129 return std::make_tuple(user, uid);
130}
XrdSecAttr * Get(const void *sigkey)
char * vorg
Entity's virtual organization(s)
XrdSecEntityAttr * eaAPI
non-const API to attributes
char * name
Entity's name.

References XrdSecEntity::eaAPI, XrdSecEntityAttr::Get(), XrdSecEntity::name, and XrdSecEntity::vorg.

+ Here is the call graph for this function:

◆ Init()

void XrdThrottleManager::Init ( )

Definition at line 87 of file XrdThrottleManager.cc.

88{
89 TRACE(DEBUG, "Initializing the throttle manager.");
90 // Initialize all our shares to zero.
91 m_primary_bytes_shares.resize(m_max_users);
92 m_secondary_bytes_shares.resize(m_max_users);
93 m_primary_ops_shares.resize(m_max_users);
94 m_secondary_ops_shares.resize(m_max_users);
95 for (auto & waiter : m_waiter_info) {
96 waiter.m_manager = this;
97 }
98
99 // Allocate each user 100KB and 10 ops to bootstrap;
100 for (int i=0; i<m_max_users; i++)
101 {
102 m_primary_bytes_shares[i] = m_last_round_allocation;
103 m_secondary_bytes_shares[i] = 0;
104 m_primary_ops_shares[i] = 10;
105 m_secondary_ops_shares[i] = 0;
106 }
107
108 int rc;
109 pthread_t tid;
110 if ((rc = XrdSysThread::Run(&tid, XrdThrottleManager::RecomputeBootstrap, static_cast<void *>(this), 0, "Buffer Manager throttle")))
111 m_log->Emsg("ThrottleManager", rc, "create throttle thread");
112
113}
#define DEBUG(x)
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)

References DEBUG, XrdSysThread::Run(), and TRACE.

+ Here is the call graph for this function:

◆ IsThrottling()

bool XrdThrottleManager::IsThrottling ( )
inline

Definition at line 69 of file XrdThrottleManager.hh.

69{return (m_ops_per_second > 0) || (m_bytes_per_second > 0);}

◆ OpenFile()

bool XrdThrottleManager::OpenFile ( const std::string & entity,
std::string & open_error_message )

Definition at line 174 of file XrdThrottleManager.cc.

175{
176 if (m_max_open == 0 && m_max_conns == 0) return true;
177
178 const std::lock_guard<std::mutex> lock(m_file_mutex);
179 auto iter = m_file_counters.find(entity);
180 unsigned long cur_open_files = 0, cur_open_conns;
181 if (m_max_open) {
182 if (iter == m_file_counters.end()) {
183 m_file_counters[entity] = 1;
184 TRACE(FILES, "User " << entity << " has opened their first file");
185 cur_open_files = 1;
186 } else if (iter->second < m_max_open) {
187 iter->second++;
188 cur_open_files = iter->second;
189 } else {
190 std::stringstream ss;
191 ss << "User " << entity << " has hit the limit of " << m_max_open << " open files";
192 TRACE(FILES, ss.str());
193 error_message = ss.str();
194 return false;
195 }
196 }
197
198 if (m_max_conns) {
199 auto pid = XrdSysThread::Num();
200 auto conn_iter = m_active_conns.find(entity);
201 auto conn_count_iter = m_conn_counters.find(entity);
202 if ((conn_count_iter != m_conn_counters.end()) && (conn_count_iter->second == m_max_conns) &&
203 (conn_iter == m_active_conns.end() || ((*(conn_iter->second))[pid] == 0)))
204 {
205 // note: we are rolling back the increment in open files
206 if (m_max_open) iter->second--;
207 std::stringstream ss;
208 ss << "User " << entity << " has hit the limit of " << m_max_conns <<
209 " open connections";
210 TRACE(CONNS, ss.str());
211 error_message = ss.str();
212 return false;
213 }
214 if (conn_iter == m_active_conns.end()) {
215 std::unique_ptr<std::unordered_map<pid_t, unsigned long>> conn_map(
216 new std::unordered_map<pid_t, unsigned long>());
217 (*conn_map)[pid] = 1;
218 m_active_conns[entity] = std::move(conn_map);
219 if (conn_count_iter == m_conn_counters.end()) {
220 m_conn_counters[entity] = 1;
221 cur_open_conns = 1;
222 } else {
223 m_conn_counters[entity] ++;
224 cur_open_conns = m_conn_counters[entity];
225 }
226 } else {
227 auto pid_iter = conn_iter->second->find(pid);
228 if (pid_iter == conn_iter->second->end() || pid_iter->second == 0) {
229 (*(conn_iter->second))[pid] = 1;
230 conn_count_iter->second++;
231 cur_open_conns = conn_count_iter->second;
232 } else {
233 (*(conn_iter->second))[pid] ++;
234 cur_open_conns = conn_count_iter->second;
235 }
236 }
237 TRACE(CONNS, "User " << entity << " has " << cur_open_conns << " open connections");
238 }
239 if (m_max_open) TRACE(FILES, "User " << entity << " has " << cur_open_files << " open files");
240 return true;
241}

References XrdSysThread::Num(), and TRACE.

+ Here is the call graph for this function:

◆ PerformLoadShed()

void XrdThrottleManager::PerformLoadShed ( const std::string & opaque,
std::string & host,
unsigned & port )

Definition at line 815 of file XrdThrottleManager.cc.

816{
817 host = m_loadshed_host;
818 host += "?";
819 host += opaque;
820 port = m_loadshed_port;
821}

◆ PrepLoadShed()

void XrdThrottleManager::PrepLoadShed ( const char * opaque,
std::string & lsOpaque )

Definition at line 791 of file XrdThrottleManager.cc.

792{
793 if (m_loadshed_port == 0)
794 {
795 return;
796 }
797 if (opaque && opaque[0])
798 {
799 XrdOucEnv env(opaque);
800 // Do not load shed client if it has already been done once.
801 if (env.Get("throttle.shed") != 0)
802 {
803 return;
804 }
805 lsOpaque = opaque;
806 lsOpaque += "&throttle.shed=1";
807 }
808 else
809 {
810 lsOpaque = "throttle.shed=1";
811 }
812}

References XrdOucEnv::Get().

+ Here is the call graph for this function:

◆ SetLoadShed()

void XrdThrottleManager::SetLoadShed ( std::string & hostname,
unsigned port,
unsigned frequency )
inline

Definition at line 80 of file XrdThrottleManager.hh.

81 {m_loadshed_host = hostname; m_loadshed_port = port; m_loadshed_frequency = frequency;}

Referenced by FromConfig().

+ Here is the caller graph for this function:

◆ SetMaxConns()

void XrdThrottleManager::SetMaxConns ( unsigned long max_conns)
inline

Definition at line 85 of file XrdThrottleManager.hh.

85{m_max_conns = max_conns;}

Referenced by FromConfig().

+ Here is the caller graph for this function:

◆ SetMaxOpen()

void XrdThrottleManager::SetMaxOpen ( unsigned long max_open)
inline

Definition at line 83 of file XrdThrottleManager.hh.

83{m_max_open = max_open;}

Referenced by FromConfig().

+ Here is the caller graph for this function:

◆ SetMaxWait()

void XrdThrottleManager::SetMaxWait ( unsigned long max_wait)
inline

Definition at line 87 of file XrdThrottleManager.hh.

87{m_max_wait_time = std::chrono::seconds(max_wait);}

Referenced by FromConfig().

+ Here is the caller graph for this function:

◆ SetMonitor()

void XrdThrottleManager::SetMonitor ( XrdXrootdGStream * gstream)
inline

Definition at line 89 of file XrdThrottleManager.hh.

89{m_gstream = gstream;}

◆ SetThrottles()

void XrdThrottleManager::SetThrottles ( float reqbyterate,
float reqoprate,
int concurrency,
float interval_length )
inline

Definition at line 76 of file XrdThrottleManager.hh.

77 {m_interval_length_seconds = interval_length; m_bytes_per_second = reqbyterate;
78 m_ops_per_second = reqoprate; m_concurrency_limit = concurrency;}

Referenced by FromConfig().

+ Here is the caller graph for this function:

◆ StartIOTimer()

XrdThrottleTimer XrdThrottleManager::StartIOTimer ( uint16_t uid,
bool & ok )

Definition at line 703 of file XrdThrottleManager.cc.

704{
705 int cur_counter = m_io_active.fetch_add(1, std::memory_order_acq_rel);
706 m_io_total++;
707
708 while (m_concurrency_limit >= 0 && cur_counter >= m_concurrency_limit)
709 {
710 // If the user has essentially no concurrency, then we let them
711 // temporarily exceed the limit. This prevents potential waits for
712 // every single read for an infrequent user.
713 if (m_waiter_info[uid].m_concurrency < 1)
714 {
715 break;
716 }
717 m_loadshed_limit_hit++;
718 m_io_active.fetch_sub(1, std::memory_order_acq_rel);
719 TRACE(DEBUG, "ThrottleManager (user=" << uid << "): IO concurrency limit hit; waiting for other IOs to finish.");
720 ok = m_waiter_info[uid].Wait();
721 if (!ok) {
722 TRACE(DEBUG, "ThrottleManager (user=" << uid << "): timed out waiting for other IOs to finish.");
723 return XrdThrottleTimer();
724 }
725 cur_counter = m_io_active.fetch_add(1, std::memory_order_acq_rel);
726 }
727
728 ok = true;
729 return XrdThrottleTimer(this, uid);
730}
friend class XrdThrottleTimer

References DEBUG, TRACE, and XrdThrottleTimer.

+ Here is the call graph for this function:

◆ StopIOTimer()

void XrdThrottleManager::StopIOTimer ( std::chrono::steady_clock::duration & event_duration,
uint16_t uid )
protected

Definition at line 736 of file XrdThrottleManager.cc.

737{
738 m_io_active_time += event_duration.count();
739 auto old_active = m_io_active.fetch_sub(1, std::memory_order_acq_rel);
740 m_waiter_info[uid].m_io_time += event_duration.count();
741 if (old_active == static_cast<unsigned>(m_concurrency_limit))
742 {
743 // If we are below the concurrency limit threshold and have another waiter
744 // for our user, then execute it immediately. Otherwise, we will give
745 // someone else a chance to run (as we have gotten more than our share recently).
746 unsigned waiting_users = m_waiting_users;
747 if (waiting_users == 0) waiting_users = 1;
748 if (m_waiter_info[uid].m_concurrency < m_concurrency_limit / waiting_users)
749 {
750 std::unique_lock<std::mutex> lock(m_waiter_info[uid].m_mutex);
751 if (m_waiter_info[uid].m_waiting > 0)
752 {
753 m_waiter_info[uid].NotifyOne(std::move(lock));
754 return;
755 }
756 }
757 NotifyOne();
758 }
759}

Friends And Related Symbol Documentation

◆ XrdThrottleTimer

friend class XrdThrottleTimer
friend

Definition at line 56 of file XrdThrottleManager.hh.

References XrdThrottleTimer.

Referenced by StartIOTimer(), and XrdThrottleTimer.


The documentation for this class was generated from the following files: