XRootD
Loading...
Searching...
No Matches
XrdThrottleManager.cc
Go to the documentation of this file.
1
3
4#include "XrdOuc/XrdOucEnv.hh"
9
10#define XRD_TRACE m_trace->
12
13#include <sstream>
14
// Trace identifier used by the TRACE macros for this component.
const char *
XrdThrottleManager::TraceID = "ThrottleManager";

// Size of the fixed per-user share tables; usernames are hashed into
// [0, m_max_users) by GetUid().
const
int XrdThrottleManager::m_max_users = 1024;

// Clock used by XrdThrottleTimer for IO timing: a monotonic clock where the
// platform provides one, otherwise a placeholder value.
#if defined(__linux__) || defined(__APPLE__) || defined(__GNU__) || (defined(__FreeBSD_kernel__) && defined(__GLIBC__))
clockid_t XrdThrottleTimer::clock_id = CLOCK_MONOTONIC;
#else
int XrdThrottleTimer::clock_id = 0;
#endif
26
// Constructor member initializers and body.
// NOTE(review): the constructor's signature line is missing from this
// extracted listing; per the declarations it is
// XrdThrottleManager::XrdThrottleManager(XrdSysError *lP, XrdOucTrace *tP).
   m_trace(tP),
   m_log(lP),
   m_interval_length_seconds(1.0),    // fairshare recompute interval
   m_bytes_per_second(-1),            // -1 => bandwidth throttling disabled
   m_ops_per_second(-1),              // -1 => IOPS throttling disabled
   m_concurrency_limit(-1),           // -1 => no concurrency limit
   m_last_round_allocation(100*1024), // bootstrap: 100KB of byte shares per user
   m_io_active(0),
   m_loadshed_host(""),
   m_loadshed_port(0),                // 0 => load-shedding disabled
   m_loadshed_frequency(0),
   m_loadshed_limit_hit(0)
{
   // Long-term (smoothed) IO wait accumulator starts at zero.
   m_stable_io_wait.tv_sec = 0;
   m_stable_io_wait.tv_nsec = 0;
}
44
// Initialize the per-user share tables and start the background thread that
// periodically recomputes fairshares.
// NOTE(review): the line carrying this function's name is missing from the
// extracted listing; from the body and callers this is
// XrdThrottleManager::Init().
void
{
   TRACE(DEBUG, "Initializing the throttle manager.");
   // Initialize all our shares to zero.
   m_primary_bytes_shares.resize(m_max_users);
   m_secondary_bytes_shares.resize(m_max_users);
   m_primary_ops_shares.resize(m_max_users);
   m_secondary_ops_shares.resize(m_max_users);
   // Allocate each user 100KB and 10 ops to bootstrap;
   for (int i=0; i<m_max_users; i++)
   {
      m_primary_bytes_shares[i] = m_last_round_allocation;
      m_secondary_bytes_shares[i] = 0;
      m_primary_ops_shares[i] = 10;
      m_secondary_ops_shares[i] = 0;
   }

   // Per-interval IO wait accumulator starts empty.
   m_io_wait.tv_sec = 0;
   m_io_wait.tv_nsec = 0;

   // Launch the recompute thread; on failure, log but continue (throttling
   // simply will not refresh shares).
   int rc;
   pthread_t tid;
   if ((rc = XrdSysThread::Run(&tid, XrdThrottleManager::RecomputeBootstrap, static_cast<void *>(this), 0, "Buffer Manager throttle")))
      m_log->Emsg("ThrottleManager", rc, "create throttle thread");

}
72
73/*
74 * Take as many shares as possible to fulfill the request; update
75 * request with current remaining value, or zero if satisfied.
76 */
77inline void
78XrdThrottleManager::GetShares(int &shares, int &request)
79{
80 int remaining;
81 AtomicFSub(remaining, shares, request);
82 if (remaining > 0)
83 {
84 request -= (remaining < request) ? remaining : request;
85 }
86}
87
88/*
89 * Iterate through all of the secondary shares, attempting
90 * to steal enough to fulfill the request.
91 */
92void
93XrdThrottleManager::StealShares(int uid, int &reqsize, int &reqops)
94{
95 if (!reqsize && !reqops) return;
96 TRACE(BANDWIDTH, "Stealing shares to fill request of " << reqsize << " bytes");
97 TRACE(IOPS, "Stealing shares to fill request of " << reqops << " ops.");
98
99 for (int i=uid+1; i % m_max_users == uid; i++)
100 {
101 if (reqsize) GetShares(m_secondary_bytes_shares[i % m_max_users], reqsize);
102 if (reqops) GetShares(m_secondary_ops_shares[ i % m_max_users], reqops);
103 }
104
105 TRACE(BANDWIDTH, "After stealing shares, " << reqsize << " of request bytes remain.");
106 TRACE(IOPS, "After stealing shares, " << reqops << " of request ops remain.");
107}
108
/*
 * Increment the number of files held open by a given entity. Returns false
 * if the user is at the maximum; in this case, the internal counter is not
 * incremented.
 */
bool
XrdThrottleManager::OpenFile(const std::string &entity, std::string &error_message)
{
   // Fast path: neither file-count nor connection-count limits configured.
   if (m_max_open == 0 && m_max_conns == 0) return true;

   // All counter maps are guarded by a single mutex.
   const std::lock_guard<std::mutex> lock(m_file_mutex);
   auto iter = m_file_counters.find(entity);
   unsigned long cur_open_files = 0, cur_open_conns;
   if (m_max_open) {
      if (iter == m_file_counters.end()) {
         // First file this entity has opened.
         m_file_counters[entity] = 1;
         TRACE(FILES, "User " << entity << " has opened their first file");
         cur_open_files = 1;
      } else if (iter->second < m_max_open) {
         iter->second++;
         cur_open_files = iter->second;
      } else {
         // At the per-entity open-file limit; refuse the open.
         std::stringstream ss;
         ss << "User " << entity << " has hit the limit of " << m_max_open << " open files";
         TRACE(FILES, ss.str());
         error_message = ss.str();
         return false;
      }
   }

   if (m_max_conns) {
      // Connections are tracked per (entity, serving thread);
      // XrdSysThread::Num() identifies the thread standing in for the
      // connection.
      auto pid = XrdSysThread::Num();
      auto conn_iter = m_active_conns.find(entity);
      auto conn_count_iter = m_conn_counters.find(entity);
      // Refuse if the entity is at the connection limit AND this open would
      // activate a connection we are not already counting as active.
      if ((conn_count_iter != m_conn_counters.end()) && (conn_count_iter->second == m_max_conns) &&
          (conn_iter == m_active_conns.end() || ((*(conn_iter->second))[pid] == 0)))
      {
         // note: we are rolling back the increment in open files
         if (m_max_open) iter->second--;
         std::stringstream ss;
         ss << "User " << entity << " has hit the limit of " << m_max_conns <<
               " open connections";
         TRACE(CONNS, ss.str());
         error_message = ss.str();
         return false;
      }
      if (conn_iter == m_active_conns.end()) {
         // First tracked connection for this entity: create its per-thread map.
         std::unique_ptr<std::unordered_map<pid_t, unsigned long>> conn_map(
            new std::unordered_map<pid_t, unsigned long>());
         (*conn_map)[pid] = 1;
         m_active_conns[entity] = std::move(conn_map);
         if (conn_count_iter == m_conn_counters.end()) {
            m_conn_counters[entity] = 1;
            cur_open_conns = 1;
         } else {
            m_conn_counters[entity] ++;
            cur_open_conns = m_conn_counters[entity];
         }
      } else {
         auto pid_iter = conn_iter->second->find(pid);
         // NOTE(review): this branch assumes a m_conn_counters entry exists
         // whenever m_active_conns has one — verify; conn_count_iter is
         // dereferenced without an end() check here.
         if (pid_iter == conn_iter->second->end() || pid_iter->second == 0) {
            // This thread's connection transitions idle -> active.
            (*(conn_iter->second))[pid] = 1;
            conn_count_iter->second++;
            cur_open_conns = conn_count_iter->second;
         } else {
            // Additional open on an already-active connection.
            (*(conn_iter->second))[pid] ++;
            cur_open_conns = conn_count_iter->second;
         }
      }
      TRACE(CONNS, "User " << entity << " has " << cur_open_conns << " open connections");
   }
   if (m_max_open) TRACE(FILES, "User " << entity << " has " << cur_open_files << " open files");
   return true;
}
183
184
/*
 * Decrement the number of files held open by a given entity.
 *
 * Returns false if the value would have fallen below zero or
 * if the entity isn't tracked.
 */
bool
XrdThrottleManager::CloseFile(const std::string &entity)
{
   // Fast path: no limits configured, nothing was tracked at open time.
   if (m_max_open == 0 && m_max_conns == 0) return true;

   bool result = true;
   const std::lock_guard<std::mutex> lock(m_file_mutex);
   if (m_max_open) {
      auto iter = m_file_counters.find(entity);
      if (iter == m_file_counters.end()) {
         TRACE(FILES, "WARNING: User " << entity << " closed a file but throttle plugin never saw an open file");
         result = false;
      } else if (iter->second == 0) {
         TRACE(FILES, "WARNING: User " << entity << " closed a file but throttle plugin thinks all files were already closed");
         result = false;
      } else {
         iter->second--;
      }
      // `result` is only still true when `iter` is valid, so this dereference
      // is safe.
      if (result) TRACE(FILES, "User " << entity << " closed a file; " << iter->second <<
                        " remain open");
   }

   if (m_max_conns) {
      // Connections are tracked per (entity, serving thread).
      auto pid = XrdSysThread::Num();
      auto conn_iter = m_active_conns.find(entity);
      auto conn_count_iter = m_conn_counters.find(entity);
      if (conn_iter == m_active_conns.end() || !(conn_iter->second)) {
         TRACE(CONNS, "WARNING: User " << entity << " closed a file on a connection we are not"
                      " tracking");
         return false;
      }
      auto pid_iter = conn_iter->second->find(pid);
      if (pid_iter == conn_iter->second->end()) {
         TRACE(CONNS, "WARNING: User " << entity << " closed a file on a connection we are not"
                      " tracking");
         return false;
      }
      if (pid_iter->second == 0) {
         TRACE(CONNS, "WARNING: User " << entity << " closed a file on connection the throttle"
                      " plugin thinks was idle");
      } else {
         pid_iter->second--;
      }
      if (conn_count_iter == m_conn_counters.end()) {
         TRACE(CONNS, "WARNING: User " << entity << " closed a file but the throttle plugin never"
                      " observed an open file");
      } else if (pid_iter->second == 0) {
         // Last open file on this thread's connection was closed: the
         // connection goes idle, so the active-connection count drops.
         if (conn_count_iter->second == 0) {
            TRACE(CONNS, "WARNING: User " << entity << " had a connection go idle but the "
                         " throttle plugin already thought all connections were idle");
         } else {
            conn_count_iter->second--;
            TRACE(CONNS, "User " << entity << " had connection on thread " << pid << " go idle; "
                         << conn_count_iter->second << " active connections remain");
         }
      }
   }

   return result;
}
251
252
/*
 * Apply the throttle. If there are no limits set, returns immediately. Otherwise,
 * this applies the limits as best possible, stalling the thread if necessary.
 */
void
XrdThrottleManager::Apply(int reqsize, int reqops, int uid)
{
   // A negative rate means that dimension is unthrottled; zero the request
   // so the loop below ignores it.
   if (m_bytes_per_second < 0)
      reqsize = 0;
   if (m_ops_per_second < 0)
      reqops = 0;
   // Acquire shares until the whole request is satisfied, blocking between
   // recompute intervals when shares run out.
   while (reqsize || reqops)
   {
      // Subtract the requested out of the shares
      AtomicBeg(m_compute_var);
      GetShares(m_primary_bytes_shares[uid], reqsize);
      if (reqsize)
      {
         TRACE(BANDWIDTH, "Using secondary shares; request has " << reqsize << " bytes left.");
         GetShares(m_secondary_bytes_shares[uid], reqsize);
         TRACE(BANDWIDTH, "Finished with secondary shares; request has " << reqsize << " bytes left.");
      }
      else
      {
         TRACE(BANDWIDTH, "Filled byte shares out of primary; " << m_primary_bytes_shares[uid] << " left.");
      }
      GetShares(m_primary_ops_shares[uid], reqops);
      if (reqops)
      {
         GetShares(m_secondary_ops_shares[uid], reqops);
      }
      // Last resort: take unused secondary shares from other users.
      StealShares(uid, reqsize, reqops);
      AtomicEnd(m_compute_var);

      if (reqsize || reqops)
      {
         if (reqsize) TRACE(BANDWIDTH, "Sleeping to wait for throttle fairshare.");
         if (reqops) TRACE(IOPS, "Sleeping to wait for throttle fairshare.");
         // Wait for RecomputeInternal() to broadcast fresh shares, and record
         // that the limit was hit so load-shedding can trigger this interval.
         m_compute_var.Wait();
         AtomicBeg(m_compute_var);
         AtomicInc(m_loadshed_limit_hit);
         AtomicEnd(m_compute_var);
      }
   }

}
299
300void *
301XrdThrottleManager::RecomputeBootstrap(void *instance)
302{
303 XrdThrottleManager * manager = static_cast<XrdThrottleManager*>(instance);
304 manager->Recompute();
305 return NULL;
306}
307
// Background thread loop: garbage-collect idle tracking state, recompute
// fairshares, then sleep for one interval. Never returns.
void
XrdThrottleManager::Recompute()
{
   while (1)
   {
      // The connection counter can accumulate a number of known-idle connections.
      // We only need to keep long-term memory of idle ones. Take this chance to garbage
      // collect old connection counters.
      if (m_max_open || m_max_conns) {
         const std::lock_guard<std::mutex> lock(m_file_mutex);
         for (auto iter = m_active_conns.begin(); iter != m_active_conns.end();)
         {
            auto & conn_count = *iter;
            // Drop entries whose per-thread map pointer is null.
            if (!conn_count.second) {
               iter = m_active_conns.erase(iter);
               continue;
            }
            // Remove idle (zero-count) per-thread entries; erase() returns the
            // next valid iterator, so the loop stays safe while mutating.
            for (auto iter2 = conn_count.second->begin(); iter2 != conn_count.second->end();) {
               if (iter2->second == 0) {
                  iter2 = conn_count.second->erase(iter2);
               } else {
                  iter2++;
               }
            }
            // Drop the entity entirely once no per-thread entries remain.
            if (!conn_count.second->size()) {
               iter = m_active_conns.erase(iter);
            } else {
               iter++;
            }
         }
         // Prune zero-valued connection counters.
         for (auto iter = m_conn_counters.begin(); iter != m_conn_counters.end();) {
            if (!iter->second) {
               iter = m_conn_counters.erase(iter);
            } else {
               iter++;
            }
         }
         // Prune zero-valued file counters.
         for (auto iter = m_file_counters.begin(); iter != m_file_counters.end();) {
            if (!iter->second) {
               iter = m_file_counters.erase(iter);
            } else {
               iter++;
            }
         }
      }

      TRACE(DEBUG, "Recomputing fairshares for throttle.");
      RecomputeInternal();
      TRACE(DEBUG, "Finished recomputing fairshares for throttle; sleeping for " << m_interval_length_seconds << " seconds.");
      XrdSysTimer::Wait(static_cast<int>(1000*m_interval_length_seconds));
   }
}
360
/*
 * The heart of the manager approach.
 *
 * This routine periodically recomputes the shares of each current user.
 * Each user has a "primary" and a "secondary" share. At the end of the
 * each time interval, the remaining primary share is moved to secondary.
 * A user can utilize both shares; if both are gone, they must block until
 * the next recompute interval.
 *
 * The secondary share can be "stolen" by any other user; so, if a user
 * is idle or under-utilizing, their share can be used by someone else.
 * However, they can never be completely starved, as no one can steal
 * primary share.
 *
 * In this way, we violate the throttle for an interval, but never starve.
 *
 */
void
XrdThrottleManager::RecomputeInternal()
{
   // Compute total shares for this interval;
   float intervals_per_second = 1.0/m_interval_length_seconds;
   float total_bytes_shares = m_bytes_per_second / intervals_per_second;
   float total_ops_shares = m_ops_per_second / intervals_per_second;

   // Compute the number of active users; a user is active if they used
   // any primary share during the last interval;
   AtomicBeg(m_compute_var);
   float active_users = 0;
   long bytes_used = 0;
   for (int i=0; i<m_max_users; i++)
   {
      // Fetch-and-zero the primary shares; a user who touched them is active.
      int primary = AtomicFAZ(m_primary_bytes_shares[i]);
      if (primary != m_last_round_allocation)
      {
         active_users++;
         // Leftover primary becomes this user's (stealable) secondary share.
         if (primary >= 0)
            m_secondary_bytes_shares[i] = primary;
         primary = AtomicFAZ(m_primary_ops_shares[i]);
         if (primary >= 0)
            m_secondary_ops_shares[i] = primary;
         // NOTE(review): at this point `primary` holds the leftover *ops*
         // shares, yet the formula below combines it with the *byte*
         // allocation; bytes_used may be miscounted — confirm upstream intent.
         bytes_used += (primary < 0) ? m_last_round_allocation : (m_last_round_allocation-primary);
      }
   }

   // Avoid dividing by zero below when no user was active this interval.
   if (active_users == 0)
   {
      active_users++;
   }

   // Note we allocate the same number of shares to *all* users, not
   // just the active ones. If a new user becomes active in the next
   // interval, we'll go over our bandwidth budget just a bit.
   m_last_round_allocation = static_cast<int>(total_bytes_shares / active_users);
   int ops_shares = static_cast<int>(total_ops_shares / active_users);
   TRACE(BANDWIDTH, "Round byte allocation " << m_last_round_allocation << " ; last round used " << bytes_used << ".");
   TRACE(IOPS, "Round ops allocation " << ops_shares);
   for (int i=0; i<m_max_users; i++)
   {
      m_primary_bytes_shares[i] = m_last_round_allocation;
      m_primary_ops_shares[i] = ops_shares;
   }

   // Reset the loadshed limit counter.
   int limit_hit = AtomicFAZ(m_loadshed_limit_hit);
   TRACE(DEBUG, "Throttle limit hit " << limit_hit << " times during last interval.");

   AtomicEnd(m_compute_var);

   // Update the IO counters
   m_compute_var.Lock();
   m_stable_io_active = AtomicGet(m_io_active);
   auto io_active = m_stable_io_active;
   m_stable_io_total = static_cast<unsigned>(AtomicGet(m_io_total));
   auto io_total = m_stable_io_total;
   // Drain the per-interval IO wait accumulators and fold them, scaled by the
   // interval rate, into the stable totals.
   time_t secs; AtomicFZAP(secs, m_io_wait.tv_sec);
   long nsecs; AtomicFZAP(nsecs, m_io_wait.tv_nsec);
   m_stable_io_wait.tv_sec += static_cast<long>(secs * intervals_per_second);
   m_stable_io_wait.tv_nsec += static_cast<long>(nsecs * intervals_per_second);
   // Normalize tv_nsec back into range after the addition above.
   while (m_stable_io_wait.tv_nsec > 1000000000)
   {
      m_stable_io_wait.tv_nsec -= 1000000000;
      m_stable_io_wait.tv_sec ++;
   }
   struct timespec io_wait_ts;
   io_wait_ts.tv_sec = m_stable_io_wait.tv_sec;
   io_wait_ts.tv_nsec = m_stable_io_wait.tv_nsec;

   m_compute_var.UnLock();
   uint64_t io_wait_ms = io_wait_ts.tv_sec*1000+io_wait_ts.tv_nsec/1000000;
   TRACE(IOLOAD, "Current IO counter is " << io_active << "; total IO wait time is " << io_wait_ms << "ms.");
   // Emit a monitoring (g-stream) record when configured.
   if (m_gstream)
   {
      char buf[128];
      auto len = snprintf(buf, 128,
                          R"({"event":"throttle_update","io_wait":%.4f,"io_active":%d,"io_total":%d})",
                          static_cast<double>(io_wait_ms) / 1000.0, io_active, io_total);
      auto suc = (len < 128) ? m_gstream->Insert(buf, len + 1) : false;
      if (!suc)
      {
         TRACE(IOLOAD, "Failed g-stream insertion of throttle_update record (len=" << len << "): " << buf);
      }
   }
   // Wake any threads stalled in Apply() or StartIOTimer().
   m_compute_var.Broadcast();
}
466
467/*
468 * Do a simple hash across the username.
469 */
470int
471XrdThrottleManager::GetUid(const char *username)
472{
473 const char *cur = username;
474 int hval = 0;
475 while (cur && *cur && *cur != '@' && *cur != '.')
476 {
477 hval += *cur;
478 hval %= m_max_users;
479 cur++;
480 }
481 //std::cerr << "Calculated UID " << hval << " for " << username << std::endl;
482 return hval;
483}
484
/*
 * Create an IO timer object; increment the number of outstanding IOs.
 */
// NOTE(review): the signature lines for this function are missing from the
// extracted listing; per the declarations this is
// XrdThrottleTimer XrdThrottleManager::StartIOTimer().
{
   AtomicBeg(m_compute_var);
   int cur_counter = AtomicInc(m_io_active);
   AtomicInc(m_io_total);
   AtomicEnd(m_compute_var);
   // If a concurrency limit is configured and exceeded, back off: undo the
   // increment, record the limit hit (for load-shedding), and wait for the
   // recompute broadcast before retrying.
   while (m_concurrency_limit >= 0 && cur_counter > m_concurrency_limit)
   {
      AtomicBeg(m_compute_var);
      AtomicInc(m_loadshed_limit_hit);
      AtomicDec(m_io_active);
      AtomicEnd(m_compute_var);
      m_compute_var.Wait();
      AtomicBeg(m_compute_var);
      cur_counter = AtomicInc(m_io_active);
      AtomicEnd(m_compute_var);
   }
   return XrdThrottleTimer(*this);
}
508
/*
 * Finish recording an IO timer.
 */
// NOTE(review): the line naming this function and its parameter is missing
// from the extracted listing; per the declarations this is
// XrdThrottleManager::StopIOTimer(struct timespec timer). It decrements the
// outstanding-IO count and folds the elapsed time into the interval
// accumulator drained by RecomputeInternal().
void
{
   AtomicBeg(m_compute_var);
   AtomicDec(m_io_active);
   AtomicAdd(m_io_wait.tv_sec, timer.tv_sec);
   // Note this may result in tv_nsec > 1e9
   AtomicAdd(m_io_wait.tv_nsec, timer.tv_nsec);
   AtomicEnd(m_compute_var);
}
522
523/*
524 * Check the counters to see if we have hit any throttle limits in the
525 * current time period. If so, shed the client randomly.
526 *
527 * If the client has already been load-shedded once and reconnected to this
528 * server, then do not load-shed it again.
529 */
530bool
531XrdThrottleManager::CheckLoadShed(const std::string &opaque)
532{
533 if (m_loadshed_port == 0)
534 {
535 return false;
536 }
537 if (AtomicGet(m_loadshed_limit_hit) == 0)
538 {
539 return false;
540 }
541 if (static_cast<unsigned>(rand()) % 100 > m_loadshed_frequency)
542 {
543 return false;
544 }
545 if (opaque.empty())
546 {
547 return false;
548 }
549 return true;
550}
551
552void
553XrdThrottleManager::PrepLoadShed(const char * opaque, std::string &lsOpaque)
554{
555 if (m_loadshed_port == 0)
556 {
557 return;
558 }
559 if (opaque && opaque[0])
560 {
561 XrdOucEnv env(opaque);
562 // Do not load shed client if it has already been done once.
563 if (env.Get("throttle.shed") != 0)
564 {
565 return;
566 }
567 lsOpaque = opaque;
568 lsOpaque += "&throttle.shed=1";
569 }
570 else
571 {
572 lsOpaque = "throttle.shed=1";
573 }
574}
575
576void
577XrdThrottleManager::PerformLoadShed(const std::string &opaque, std::string &host, unsigned &port)
578{
579 host = m_loadshed_host;
580 host += "?";
581 host += opaque;
582 port = m_loadshed_port;
583}
#define DEBUG(x)
#define AtomicFSub(w, x, y)
#define AtomicInc(x)
#define AtomicFAZ(x)
#define AtomicBeg(Mtx)
#define AtomicFZAP(w, x)
#define AtomicDec(x)
#define AtomicGet(x)
#define AtomicEnd(Mtx)
#define AtomicAdd(x, y)
#define TRACE(act, x)
Definition XrdTrace.hh:63
char * Get(const char *varname)
Definition XrdOucEnv.hh:69
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
static unsigned long Num(void)
static void Wait(int milliseconds)
void Apply(int reqsize, int reqops, int uid)
void StopIOTimer(struct timespec)
friend class XrdThrottleTimer
void PrepLoadShed(const char *opaque, std::string &lsOpaque)
bool CheckLoadShed(const std::string &opaque)
XrdThrottleTimer StartIOTimer()
XrdThrottleManager(XrdSysError *lP, XrdOucTrace *tP)
void PerformLoadShed(const std::string &opaque, std::string &host, unsigned &port)
bool CloseFile(const std::string &entity)
bool OpenFile(const std::string &entity, std::string &open_error_message)
static int GetUid(const char *username)