XRootD
Loading...
Searching...
No Matches
XrdPfcPurge.cc
Go to the documentation of this file.
1#include "XrdPfc.hh"
2#include "XrdPfcTrace.hh"
3
4#include <fcntl.h>
5#include <sys/time.h>
6
7#include "XrdOuc/XrdOucEnv.hh"
8#include "XrdOss/XrdOssAt.hh"
10
11using namespace XrdPfc;
12
13namespace XrdPfc
14{
15
17{
18 // needed for logging macros
20}
21
22// Temporary, extensive purge tracing
23// #define TRACE_PURGE(x) TRACE(Debug, x)
24// #define TRACE_PURGE(x) std::cout << "PURGE " << x << "\n"
25#define TRACE_PURGE(x)
26
27//==============================================================================
28// DirState
29//==============================================================================
30
32{
33 DirState *m_parent;
34
35 Stats m_stats; // access stats from client reads in this directory (and subdirs)
36
37 long long m_usage; // collected / measured during purge traversal
38 long long m_usage_extra; // collected from write events in this directory and subdirs
39 long long m_usage_purged; // amount of data purged from this directory (and subdirectories for leaf nodes)
40
41 // begin purge traversal usage \_ so we can have a good estimate of what came in during the traversal
42 // end purge traversal usage / (should be small, presumably)
43
44 // quota info, enabled?
45
46 int m_depth;
47 int m_max_depth; // XXXX Do we need this? Should it be passed in to find functions?
48 bool m_stat_report; // not used yet - storing of stats requested
49
50 typedef std::map<std::string, DirState> DsMap_t;
51 typedef DsMap_t::iterator DsMap_i;
52
53 DsMap_t m_subdirs;
54
55 void init()
56 {
57 m_usage = 0;
58 m_usage_extra = 0;
59 m_usage_purged = 0;
60 }
61
62 DirState* create_child(const std::string &dir)
63 {
64 std::pair<DsMap_i, bool> ir = m_subdirs.insert(std::make_pair(dir, DirState(this)));
65 return & ir.first->second;
66 }
67
68 DirState* find_path_tok(PathTokenizer &pt, int pos, bool create_subdirs)
69 {
70 if (pos == pt.get_n_dirs()) return this;
71
72 DsMap_i i = m_subdirs.find(pt.m_dirs[pos]);
73
74 DirState *ds = 0;
75
76 if (i != m_subdirs.end())
77 {
78 ds = & i->second;
79 }
80 if (create_subdirs && m_depth < m_max_depth)
81 {
82 ds = create_child(pt.m_dirs[pos]);
83 }
84 if (ds) return ds->find_path_tok(pt, pos + 1, create_subdirs);
85
86 return 0;
87 }
88
89public:
90
91 DirState(int max_depth) : m_parent(0), m_depth(0), m_max_depth(max_depth)
92 {
93 init();
94 }
95
96 DirState(DirState *parent) : m_parent(parent), m_depth(m_parent->m_depth + 1), m_max_depth(m_parent->m_max_depth)
97 {
98 init();
99 }
100
101 DirState* get_parent() { return m_parent; }
102
103 void set_usage(long long u) { m_usage = u; m_usage_extra = 0; }
104 void add_up_stats(const Stats& stats) { m_stats.AddUp(stats); }
105 void add_usage_purged(long long up) { m_usage_purged += up; }
106
107 DirState* find_path(const std::string &path, int max_depth, bool parse_as_lfn, bool create_subdirs)
108 {
109 PathTokenizer pt(path, max_depth, parse_as_lfn);
110
111 return find_path_tok(pt, 0, create_subdirs);
112 }
113
114 DirState* find_dir(const std::string &dir, bool create_subdirs)
115 {
116 DsMap_i i = m_subdirs.find(dir);
117
118 if (i != m_subdirs.end()) return & i->second;
119
120 if (create_subdirs && m_depth < m_max_depth) return create_child(dir);
121
122 return 0;
123 }
124
126 {
127 m_stats.Reset();
128
129 for (DsMap_i i = m_subdirs.begin(); i != m_subdirs.end(); ++i)
130 {
131 i->second.reset_stats();
132 }
133 }
134
136 {
137 for (DsMap_i i = m_subdirs.begin(); i != m_subdirs.end(); ++i)
138 {
139 i->second.upward_propagate_stats();
140
141 m_stats.AddUp(i->second.m_stats);
142 }
143
144 m_usage_extra += m_stats.m_BytesWritten;
145 }
146
148 {
149 for (DsMap_i i = m_subdirs.begin(); i != m_subdirs.end(); ++i)
150 {
151 m_usage_purged += i->second.upward_propagate_usage_purged();
152 }
153 m_usage -= m_usage_purged;
154
155 long long ret = m_usage_purged;
156 m_usage_purged = 0;
157 return ret;
158 }
159
160 void dump_recursively(const char *name)
161 {
162 printf("%*d %s usage=%lld usage_extra=%lld usage_total=%lld num_ios=%d duration=%d b_hit=%lld b_miss=%lld b_byps=%lld b_wrtn=%lld\n",
163 2 + 2*m_depth, m_depth, name, m_usage, m_usage_extra, m_usage + m_usage_extra,
164 m_stats.m_NumIos, m_stats.m_Duration, m_stats.m_BytesHit, m_stats.m_BytesMissed, m_stats.m_BytesBypassed, m_stats.m_BytesWritten);
165
166 for (DsMap_i i = m_subdirs.begin(); i != m_subdirs.end(); ++i)
167 {
168 i->second.dump_recursively(i->first.c_str());
169 }
170 }
171};
172
173
174//==============================================================================
175// DataFsState
176//==============================================================================
177
179{
180 int m_max_depth;
181 DirState m_root;
182 time_t m_prev_time;
183
184public:
186 m_max_depth ( Cache::Conf().m_dirStatsStoreDepth ),
187 m_root ( m_max_depth ),
188 m_prev_time ( time(0) )
189 {}
190
191 int get_max_depth() const { return m_max_depth; }
192
193 DirState* get_root() { return & m_root; }
194
195 DirState* find_dirstate_for_lfn(const std::string& lfn)
196 {
197 return m_root.find_path(lfn, m_max_depth, true, true);
198 }
199
200 void reset_stats() { m_root.reset_stats(); }
201 void upward_propagate_stats() { m_root.upward_propagate_stats(); }
202 void upward_propagate_usage_purged() { m_root.upward_propagate_usage_purged(); }
203
205 {
206 time_t now = time(0);
207
208 printf("DataFsState::dump_recursively epoch = %lld delta_t = %lld max_depth = %d\n",
209 (long long) now, (long long) (now - m_prev_time), m_max_depth);
210
211 m_prev_time = now;
212
213 m_root.dump_recursively("root");
214 }
215};
216
217
218//==============================================================================
219// FPurgeState
220//==============================================================================
221
223{
224public:
225 struct FS
226 {
227 std::string path;
228 long long nBytes;
229 time_t time;
231
232 FS(const std::string &dname, const char *fname, long long n, time_t t, DirState *ds) :
233 path(dname + fname), nBytes(n), time(t), dirState(ds)
234 {}
235 };
236
237 typedef std::multimap<time_t, FS> map_t;
238 typedef map_t::iterator map_i;
239
240 map_t m_fmap; // map of files that are purge candidates
241
242 typedef std::list<FS> list_t;
243 typedef list_t::iterator list_i;
244
245 list_t m_flist; // list of files to be removed unconditionally
246
247 long long nBytesReq;
248 long long nBytesAccum;
249 long long nBytesTotal;
252
253 // XrdOss *m_oss;
255
256 // ------------------------------------
257 // Directory handling & stat collection
258 // ------------------------------------
259
261 std::string m_current_path; // Includes trailing '/'
263 const int m_max_dir_level_for_stat_collection; // until we honor globs from pfc.dirstats
264
265 std::vector<std::string> m_dir_names_stack;
266 std::vector<long long> m_dir_usage_stack;
267
268 const char *m_info_ext;
269 const size_t m_info_ext_len;
271
272 static const char *m_traceID;
273
274
275 void begin_traversal(DirState *root, const char *root_path = "/")
276 {
277 m_dir_state = root;
278 m_dir_level = 0;
279 m_current_path = std::string(root_path);
280 m_dir_usage_stack.push_back(0);
281
282 TRACE_PURGE("FPurgeState::begin_traversal cur_path '" << m_current_path << "', usage=" << m_dir_usage_stack.back() << ", level=" << m_dir_level);
283 }
284
286 {
287 TRACE_PURGE("FPurgeState::end_traversal reporting for '" << m_current_path << "', usage=" << m_dir_usage_stack.back() << ", nBytesTotal=" << nBytesTotal << ", level=" << m_dir_level);
288
289 m_dir_state->set_usage(m_dir_usage_stack.back());
290
291 m_dir_state = 0;
292 }
293
294 void cd_down(const std::string& dir_name)
295 {
296 ++m_dir_level;
297
299 {
300 m_dir_usage_stack.push_back(0);
301 m_dir_state = m_dir_state->find_dir(dir_name, true);
302 }
303
304 m_dir_names_stack.push_back(dir_name);
305 m_current_path.append(dir_name);
306 m_current_path.append("/");
307 }
308
309 void cd_up()
310 {
312 {
313 long long tail = m_dir_usage_stack.back();
314 m_dir_usage_stack.pop_back();
315
316 TRACE_PURGE("FPurgeState::cd_up reporting for '" << m_current_path << "', usage=" << tail << ", level=" << m_dir_level);
317
318 m_dir_state->set_usage(tail);
319 m_dir_state = m_dir_state->get_parent();
320
321 m_dir_usage_stack.back() += tail;
322 }
323
324 // remove trailing / and last dir but keep the new trailing / in place.
325 m_current_path.erase(m_current_path.find_last_of('/', m_current_path.size() - 2) + 1);
326 m_dir_names_stack.pop_back();
327
328 --m_dir_level;
329 }
330
331 // ------------------------------------------------------------------------
332 // ------------------------------------------------------------------------
333
334 FPurgeState(long long iNBytesReq, XrdOss &oss) :
336 // m_oss(oss),
337 m_oss_at(oss),
339 m_max_dir_level_for_stat_collection(Cache::Conf().m_dirStatsStoreDepth),
340 m_info_ext(XrdPfc::Info::s_infoExtension),
341 m_info_ext_len(strlen(XrdPfc::Info::s_infoExtension)),
342 m_trace(Cache::GetInstance().GetTrace())
343 {
344 m_current_path.reserve(256);
345 m_dir_names_stack.reserve(32);
347 }
348
349 // ------------------------------------------------------------------------
350
351 void setMinTime(time_t min_time) { tMinTimeStamp = min_time; }
352 time_t getMinTime() const { return tMinTimeStamp; }
353 void setUVKeepMinTime(time_t min_time) { tMinUVKeepTimeStamp = min_time; }
354 long long getNBytesTotal() const { return nBytesTotal; }
355
357 {
358 for (list_i i = m_flist.begin(); i != m_flist.end(); ++i)
359 {
360 m_fmap.insert(std::make_pair(i->time, *i));
361 }
362 m_flist.clear();
363 }
364
365 /*
366 void UnlinkInfoAndData(const char *fname, long long nbytes, XrdOssDF *iOssDF)
367 {
368 fname[fname_len - m_info_ext_len] = 0;
369 if (nbytes > 0)
370 {
371 if ( ! Cache.GetInstance().IsFileActiveOrPurgeProtected(dataPath))
372 {
373 m_n_purged++;
374 m_bytes_purged += nbytes;
375 } else
376 {
377 m_n_purge_protected++;
378 m_bytes_purge_protected += nbytes;
379 m_dir_state->add_usage_purged(nbytes);
380 // XXXX should also tweak other stuff?
381 fname[fname_len - m_info_ext_len] = '.';
382 return;
383 }
384 }
385 m_oss_at.Unlink(*iOssDF, fname);
386 fname[fname_len - m_info_ext_len] = '.';
387 m_oss_at.Unlink(*iOssDF, fname);
388 }
389 */
390
391 void CheckFile(const char *fname, Info &info, struct stat &fstat /*, XrdOssDF *iOssDF*/)
392 {
393 static const char *trc_pfx = "FPurgeState::CheckFile ";
394
395 long long nbytes = info.GetNDownloadedBytes();
396 time_t atime;
397 if ( ! info.GetLatestDetachTime(atime))
398 {
399 // cinfo file does not contain any known accesses, use fstat.mtime instead.
400 TRACE(Debug, trc_pfx << "could not get access time for " << m_current_path << fname << ", using mtime from stat instead.");
401 atime = fstat.st_mtime;
402 }
403 // TRACE(Dump, trc_pfx << "checking " << fname << " accessTime " << atime);
404
405 nBytesTotal += nbytes;
406
407 m_dir_usage_stack.back() += nbytes;
408
409 // XXXX Should remove aged-out files here ... but I have trouble getting
410 // the DirState and purge report set up consistently.
411 // Need some serious code reorganization here.
412 // Biggest problem is maintaining overall state and traversal state consistently.
413 // Sigh.
414
415 // In first two cases we lie about FS time (set to 0) to get them all removed early.
416 // The age-based purge atime would also be good as there should be nothing
417 // before that time in the map anyway.
418 // But we use 0 as a test in purge loop to make sure we continue even if enough
419 // disk-space has been freed.
420
421 if (tMinTimeStamp > 0 && atime < tMinTimeStamp)
422 {
423 m_flist.push_back(FS(m_current_path, fname, nbytes, 0, m_dir_state));
424 nBytesAccum += nbytes;
425 }
426 else if (tMinUVKeepTimeStamp > 0 &&
427 Cache::Conf().does_cschk_have_missing_bits(info.GetCkSumState()) &&
429 {
430 m_flist.push_back(FS(m_current_path, fname, nbytes, 0, m_dir_state));
431 nBytesAccum += nbytes;
432 }
433 else if (nBytesAccum < nBytesReq || ( ! m_fmap.empty() && atime < m_fmap.rbegin()->first))
434 {
435 m_fmap.insert(std::make_pair(atime, FS(m_current_path, fname, nbytes, atime, m_dir_state)));
436 nBytesAccum += nbytes;
437
438 // remove newest files from map if necessary
439 while ( ! m_fmap.empty() && nBytesAccum - m_fmap.rbegin()->second.nBytes >= nBytesReq)
440 {
441 nBytesAccum -= m_fmap.rbegin()->second.nBytes;
442 m_fmap.erase(--(m_fmap.rbegin().base()));
443 }
444 }
445 }
446
448 {
449 static const char *trc_pfx = "FPurgeState::TraverseNamespace ";
450
451 char fname[256];
452 struct stat fstat;
453 XrdOucEnv env;
454
455 TRACE_PURGE("Starting to read dir [" << m_current_path << "], iOssDF->getFD()=" << iOssDF->getFD() << ".");
456
457 iOssDF->StatRet(&fstat);
458
459 while (true)
460 {
461 int rc = iOssDF->Readdir(fname, 256);
462
463 if (rc == -ENOENT) {
464 TRACE_PURGE(" Skipping ENOENT dir entry [" << fname << "].");
465 continue;
466 }
467 if (rc != XrdOssOK) {
468 TRACE(Error, trc_pfx << "Readdir error at " << m_current_path << ", err " << XrdSysE2T(-rc) << ".");
469 break;
470 }
471
472 TRACE_PURGE(" Readdir [" << fname << "]");
473
474 if (fname[0] == 0) {
475 TRACE_PURGE(" Finished reading dir [" << m_current_path << "]. Break loop.");
476 break;
477 }
478 if (fname[0] == '.' && (fname[1] == 0 || (fname[1] == '.' && fname[2] == 0))) {
479 TRACE_PURGE(" Skipping here or parent dir [" << fname << "]. Continue loop.");
480 continue;
481 }
482
483 size_t fname_len = strlen(fname);
484 XrdOssDF *dfh = 0;
485
486 if (S_ISDIR(fstat.st_mode))
487 {
488 if (m_oss_at.Opendir(*iOssDF, fname, env, dfh) == XrdOssOK)
489 {
490 cd_down(fname); TRACE_PURGE(" cd_down -> [" << m_current_path << "].");
492 cd_up(); TRACE_PURGE(" cd_up -> [" << m_current_path << "].");
493 }
494 else
495 TRACE(Warning, trc_pfx << "could not opendir [" << m_current_path << fname << "], " << XrdSysE2T(errno));
496 }
497 else if (fname_len > m_info_ext_len && strncmp(&fname[fname_len - m_info_ext_len], m_info_ext, m_info_ext_len) == 0)
498 {
499 // Check if the file is currently opened / purge-protected is done before unlinking of the file.
500
501 Info cinfo(m_trace);
502
503 if (m_oss_at.OpenRO(*iOssDF, fname, env, dfh) == XrdOssOK && cinfo.Read(dfh, m_current_path.c_str(), fname))
504 {
505 CheckFile(fname, cinfo, fstat);
506 }
507 else
508 {
509 TRACE(Warning, trc_pfx << "can't open or read " << m_current_path << fname << ", err " << XrdSysE2T(errno) << "; purging.");
510 m_oss_at.Unlink(*iOssDF, fname);
511 fname[fname_len - m_info_ext_len] = 0;
512 m_oss_at.Unlink(*iOssDF, fname);
513 }
514 }
515 else // XXXX devel debug only, to be removed
516 {
517 TRACE_PURGE(" Ignoring [" << fname << "], not a dir or cinfo.");
518 }
519
520 delete dfh;
521 }
522 }
523};
524
525const char *FPurgeState::m_traceID = "Purge";
526
527
528//==============================================================================
529// ResourceMonitor
530//==============================================================================
531
532// Encapsulates local variables used within the previous mega-function Purge().
533//
534// This will be used within the continuously/periodically run heart-beat / breath
535// function ... and then parts of it will be passed to invoked FS scan and purge
536// jobs (which will be controlled through this as well).
537
539{
540
541};
542
543
544//==============================================================================
545//
546//==============================================================================
547
548namespace
549{
550
551class ScanAndPurgeJob : public XrdJob
552{
553public:
554 ScanAndPurgeJob(const char *desc = "") : XrdJob(desc) {}
555
556 void DoIt() {} // { Cache::GetInstance().ScanAndPurge(); }
557};
558
559}
560
561//==============================================================================
562// Cache methods
563//==============================================================================
564
565void Cache::copy_out_active_stats_and_update_data_fs_state()
566{
567 static const char *trc_pfx = "copy_out_active_stats_and_update_data_fs_state() ";
568
569 StatsMMap_t updates;
570 {
571 XrdSysCondVarHelper lock(&m_active_cond);
572
573 // Slurp in stats from files closed since last cycle.
574 updates.swap( m_closed_files_stats );
575
576 for (ActiveMap_i i = m_active.begin(); i != m_active.end(); ++i)
577 {
578 if (i->second != 0)
579 {
580 updates.insert(std::make_pair(i->first, i->second->DeltaStatsFromLastCall()));
581 }
582 }
583 }
584
585 m_fs_state->reset_stats(); // XXXX-CKSUM rethink how to do this if we keep some purge entries for next time
586
587 for (StatsMMap_i i = updates.begin(); i != updates.end(); ++i)
588 {
589 DirState *ds = m_fs_state->find_dirstate_for_lfn(i->first);
590
591 if (ds == 0)
592 {
593 TRACE(Error, trc_pfx << "Failed finding DirState for file '" << i->first << "'.");
594 continue;
595 }
596
597 ds->add_up_stats(i->second);
598 }
599
600 m_fs_state->upward_propagate_stats();
601}
602
603
604//==============================================================================
605
607{
608 // static const char *trc_pfx = "ResourceMonitorHeartBeat() ";
609
610 // Pause before initial run
611 sleep(1);
612
613 // XXXX Setup initial / constant stats (total RAM, total disk, ???)
614
617
618 S.Lock();
619
620 X.DiskSize = m_configuration.m_diskTotalSpace;
621
622 X.MemSize = m_configuration.m_RamAbsAvailable;
623
624 S.UnLock();
625
626 // XXXX Schedule initial disk scan, time it!
627 //
628 // TRACE(Info, trc_pfx << "scheduling intial disk scan.");
629 // schedP->Schedule( new ScanAndPurgeJob("XrdPfc::ScanAndPurge") );
630 //
631 // bool scan_and_purge_running = true;
632
633 // XXXX Could we really hold last-usage for all files in memory?
634
635 // XXXX Think how to handle disk-full, scan/purge not finishing:
636 // - start dropping things out of write queue, but only when RAM gets near full;
637 // - monitoring this then becomes a high-priority job, inner loop with sleep of,
638 // say, 5 or 10 seconds.
639
640 while (true)
641 {
642 time_t heartbeat_start = time(0);
643
644 // TRACE(Info, trc_pfx << "HeartBeat starting ...");
645
646 // if summary monitoring configured, populate OucCacheStats:
647 S.Lock();
648
649 // - available / used disk space (files usage calculated elsewhere (maybe))
650
651 // - RAM usage
652 { XrdSysMutexHelper lck(&m_RAM_mutex);
653 X.MemUsed = m_RAM_used;
654 X.MemWriteQ = m_RAM_write_queue;
655 }
656 // - files opened / closed etc
657
658 // do estimate of available space
659 S.UnLock();
660
661 // if needed, schedule purge in a different thread.
662 // purge is:
663 // - deep scan + gather FSPurgeState
664 // - actual purge
665 //
666 // this thread can continue running and, if needed, stop writing to disk
667 // if purge is taking too long.
668
669 // think how data is passed / synchronized between this and purge thread
670
671 // !!!! think how stat collection is done and propagated upwards;
672 // until now it was done once per purge-interval.
673 // now stats will be added up more often, but purge will be done
674 // only occasionally.
675 // also, do we report cumulative values or deltas? cumulative should
676 // be easier and consistent with summary data.
677 // still, some are state - like disk usage, num of files.
678
679 // Do we take care of directories that need to be newly added into DirState hierarchy?
680 // I.e., when user creates new directories and these are covered by either full
681 // spec or by root + depth declaration.
682
683 int heartbeat_duration = time(0) - heartbeat_start;
684
685 // TRACE(Info, trc_pfx << "HeartBeat finished, heartbeat_duration " << heartbeat_duration);
686
687 // int sleep_time = m_configuration.m_purgeInterval - heartbeat_duration;
688 int sleep_time = 60 - heartbeat_duration;
689 if (sleep_time > 0)
690 {
691 sleep(sleep_time);
692 }
693 }
694}
695
696//==============================================================================
697
699{
700 static const char *trc_pfx = "Purge() ";
701
702 XrdOucEnv env;
703 long long disk_usage;
704 long long estimated_file_usage = m_configuration.m_diskUsageHWM;
705
706 // Pause before initial run
707 sleep(1);
708
709 m_fs_state = new DataFsState;
710
711 // { PathTokenizer p("/a/b/c/f.root", 2, true); p.deboog(); }
712 // { PathTokenizer p("/a/b/f.root", 2, true); p.deboog(); }
713 // { PathTokenizer p("/a/f.root", 2, true); p.deboog(); }
714 // { PathTokenizer p("/f.root", 2, true); p.deboog(); }
715
716 int age_based_purge_countdown = 0; // enforce on first purge loop entry.
717 bool is_first = true;
718
719 while (true)
720 {
721 time_t purge_start = time(0);
722
723 {
724 XrdSysCondVarHelper lock(&m_active_cond);
725
726 m_in_purge = true;
727 }
728
729 TRACE(Info, trc_pfx << "Started.");
730
731 // Bytes to remove based on total disk usage (d) and file usage (f).
732 long long bytesToRemove_d = 0, bytesToRemove_f = 0;
733
734 // get amount of space to potentially erase based on total disk usage
735 XrdOssVSInfo sP; // Make sure we start when a clean slate in each loop
736 if (m_oss->StatVS(&sP, m_configuration.m_data_space.c_str(), 1) < 0)
737 {
738 TRACE(Error, trc_pfx << "can't get StatVS for oss space " << m_configuration.m_data_space);
739 continue;
740 }
741 else
742 {
743 disk_usage = sP.Total - sP.Free;
744 TRACE(Debug, trc_pfx << "used disk space " << disk_usage << " bytes.");
745
746 if (disk_usage > m_configuration.m_diskUsageHWM)
747 {
748 bytesToRemove_d = disk_usage - m_configuration.m_diskUsageLWM;
749 }
750 }
751
752 // estimate amount of space to erase based on file usage
753 if (m_configuration.are_file_usage_limits_set())
754 {
755 long long estimated_writes_since_last_purge;
756 {
757 XrdSysCondVarHelper lock(&m_writeQ.condVar);
758
759 estimated_writes_since_last_purge = m_writeQ.writes_between_purges;
760 m_writeQ.writes_between_purges = 0;
761 }
762 estimated_file_usage += estimated_writes_since_last_purge;
763
764 TRACE(Debug, trc_pfx << "estimated usage by files " << estimated_file_usage << " bytes.");
765
766 bytesToRemove_f = std::max(estimated_file_usage - m_configuration.m_fileUsageNominal, 0ll);
767
768 // Here we estimate fractional usages -- to decide if full scan is necessary before actual purge.
769 double frac_du = 0, frac_fu = 0;
770 m_configuration.calculate_fractional_usages(disk_usage, estimated_file_usage, frac_du, frac_fu);
771
772 if (frac_fu > 1.0 - frac_du)
773 {
774 bytesToRemove_f = std::max(bytesToRemove_f, disk_usage - m_configuration.m_diskUsageLWM);
775 }
776 }
777
778 long long bytesToRemove = std::max(bytesToRemove_d, bytesToRemove_f);
779
780 bool enforce_age_based_purge = false;
781 if (m_configuration.is_age_based_purge_in_effect() || m_configuration.is_uvkeep_purge_in_effect())
782 {
783 // XXXX ... I could collect those guys in larger vectors (maps?) and do traversal when
784 // they are empty.
785 if (--age_based_purge_countdown <= 0)
786 {
787 enforce_age_based_purge = true;
788 age_based_purge_countdown = m_configuration.m_purgeAgeBasedPeriod;
789 }
790 }
791
792 bool enforce_traversal_for_usage_collection = is_first;
793 // XXX Other conditions? Periodic checks?
794
795 copy_out_active_stats_and_update_data_fs_state();
796
797 TRACE(Debug, trc_pfx << "Precheck:");
798 TRACE(Debug, "\tbytes_to_remove_disk = " << bytesToRemove_d << " B");
799 TRACE(Debug, "\tbytes_to remove_files = " << bytesToRemove_f << " B (" << (is_first ? "max possible for initial run" : "estimated") << ")");
800 TRACE(Debug, "\tbytes_to_remove = " << bytesToRemove << " B");
801 TRACE(Debug, "\tenforce_age_based_purge = " << enforce_age_based_purge);
802 is_first = false;
803
804 long long bytesToRemove_at_start = 0; // set after file scan
805 int deleted_file_count = 0;
806
807 bool purge_required = (bytesToRemove > 0 || enforce_age_based_purge);
808
809 // XXXX-PurgeOpt Need to retain this state between purges so I can avoid doing
810 // the traversal more often than really needed.
811 FPurgeState purgeState(2 * bytesToRemove, *m_oss); // prepare twice more volume than required
812
813 if (purge_required || enforce_traversal_for_usage_collection)
814 {
815 // Make a sorted map of file paths sorted by access time.
816
817 if (m_configuration.is_age_based_purge_in_effect())
818 {
819 purgeState.setMinTime(time(0) - m_configuration.m_purgeColdFilesAge);
820 }
821 if (m_configuration.is_uvkeep_purge_in_effect())
822 {
823 purgeState.setUVKeepMinTime(time(0) - m_configuration.m_cs_UVKeep);
824 }
825
826 XrdOssDF* dh = m_oss->newDir(m_configuration.m_username.c_str());
827 if (dh->Opendir("/", env) == XrdOssOK)
828 {
829 purgeState.begin_traversal(m_fs_state->get_root());
830
831 purgeState.TraverseNamespace(dh);
832
833 purgeState.end_traversal();
834
835 dh->Close();
836 }
837 delete dh; dh = 0;
838
839 estimated_file_usage = purgeState.getNBytesTotal();
840
841 TRACE(Debug, trc_pfx << "actual usage by files " << estimated_file_usage << " bytes.");
842
843 // Adjust bytesToRemove_f and then bytesToRemove based on actual file usage,
844 // possibly retreating below nominal file usage (but not below baseline file usage).
845 if (m_configuration.are_file_usage_limits_set())
846 {
847 bytesToRemove_f = std::max(estimated_file_usage - m_configuration.m_fileUsageNominal, 0ll);
848
849 double frac_du = 0, frac_fu = 0;
850 m_configuration.calculate_fractional_usages(disk_usage, estimated_file_usage, frac_du, frac_fu);
851
852 if (frac_fu > 1.0 - frac_du)
853 {
854 bytesToRemove = std::max(bytesToRemove_f, disk_usage - m_configuration.m_diskUsageLWM);
855 bytesToRemove = std::min(bytesToRemove, estimated_file_usage - m_configuration.m_fileUsageBaseline);
856 }
857 else
858 {
859 bytesToRemove = std::max(bytesToRemove_d, bytesToRemove_f);
860 }
861 }
862 else
863 {
864 bytesToRemove = std::max(bytesToRemove_d, bytesToRemove_f);
865 }
866 bytesToRemove_at_start = bytesToRemove;
867
868 TRACE(Debug, trc_pfx << "After scan:");
869 TRACE(Debug, "\tbytes_to_remove_disk = " << bytesToRemove_d << " B");
870 TRACE(Debug, "\tbytes_to remove_files = " << bytesToRemove_f << " B (measured)");
871 TRACE(Debug, "\tbytes_to_remove = " << bytesToRemove << " B");
872 TRACE(Debug, "\tenforce_age_based_purge = " << enforce_age_based_purge);
873 TRACE(Debug, "\tmin_time = " << purgeState.getMinTime());
874
875 if (enforce_age_based_purge)
876 {
877 purgeState.MoveListEntriesToMap();
878 }
879 }
880
881 // Dump statistcs before actual purging so maximum usage values get recorded.
882 // Should really go to gstream --- and should really go from Heartbeat.
883 if (m_configuration.is_dir_stat_reporting_on())
884 {
885 m_fs_state->dump_recursively();
886 }
887
888 if (purge_required)
889 {
890 // Loop over map and remove files with oldest values of access time.
891 struct stat fstat;
892 size_t info_ext_len = strlen(Info::s_infoExtension);
893 int protected_cnt = 0;
894 long long protected_sum = 0;
895 for (FPurgeState::map_i it = purgeState.m_fmap.begin(); it != purgeState.m_fmap.end(); ++it)
896 {
897 // Finish when enough space has been freed but not while age-based purging is in progress.
898 // Those files are marked with time-stamp = 0.
899 if (bytesToRemove <= 0 && ! (enforce_age_based_purge && it->first == 0))
900 {
901 break;
902 }
903
904 std::string &infoPath = it->second.path;
905 std::string dataPath = infoPath.substr(0, infoPath.size() - info_ext_len);
906
907 if (IsFileActiveOrPurgeProtected(dataPath))
908 {
909 ++protected_cnt;
910 protected_sum += it->second.nBytes;
911 TRACE(Debug, trc_pfx << "File is active or purge-protected: " << dataPath << " size: " << it->second.nBytes);
912 continue;
913 }
914
915 // remove info file
916 if (m_oss->Stat(infoPath.c_str(), &fstat) == XrdOssOK)
917 {
918 // cinfo file can be on another oss.space, do not subtract for now.
919 // Could be relevant for very small block sizes.
920 // bytesToRemove -= fstat.st_size;
921 // estimated_file_usage -= fstat.st_size;
922 // ++deleted_file_count;
923
924 m_oss->Unlink(infoPath.c_str());
925 TRACE(Dump, trc_pfx << "Removed file: '" << infoPath << "' size: " << fstat.st_size);
926 }
927
928 // remove data file
929 if (m_oss->Stat(dataPath.c_str(), &fstat) == XrdOssOK)
930 {
931 bytesToRemove -= it->second.nBytes;
932 estimated_file_usage -= it->second.nBytes;
933 ++deleted_file_count;
934
935 m_oss->Unlink(dataPath.c_str());
936 TRACE(Dump, trc_pfx << "Removed file: '" << dataPath << "' size: " << it->second.nBytes << ", time: " << it->first);
937
938 if (it->second.dirState != 0) // XXXX This should now always be true.
939 it->second.dirState->add_usage_purged(it->second.nBytes);
940 else
941 TRACE(Error, trc_pfx << "DirState not set for file '" << dataPath << "'.");
942 }
943 }
944 if (protected_cnt > 0)
945 {
946 TRACE(Info, trc_pfx << "Encountered " << protected_cnt << " protected files, sum of their size: " << protected_sum);
947 }
948
949 m_fs_state->upward_propagate_usage_purged();
950 }
951
952 {
953 XrdSysCondVarHelper lock(&m_active_cond);
954
955 m_purge_delay_set.clear();
956 m_in_purge = false;
957 }
958
959 int purge_duration = time(0) - purge_start;
960
961 TRACE(Info, trc_pfx << "Finished, removed " << deleted_file_count << " data files, total size " <<
962 bytesToRemove_at_start - bytesToRemove << ", bytes to remove at end " << bytesToRemove << ", purge duration " << purge_duration);
963
964 int sleep_time = m_configuration.m_purgeInterval - purge_duration;
965 if (sleep_time > 0)
966 {
967 sleep(sleep_time);
968 }
969 }
970}
971
972} // end XrdPfc namespace
int DoIt(int argpnt, int argc, char **argv, bool singleshot)
static void parent()
#define XrdOssOK
Definition XrdOss.hh:50
#define TRACE_PURGE(x)
#define fstat(a, b)
Definition XrdPosix.hh:57
#define stat(a, b)
Definition XrdPosix.hh:96
bool Debug
const char * XrdSysE2T(int errcode)
Definition XrdSysE2T.cc:104
#define TRACE(act, x)
Definition XrdTrace.hh:63
virtual int StatRet(struct stat *buff)
Definition XrdOss.hh:107
virtual int Opendir(const char *path, XrdOucEnv &env)
Definition XrdOss.hh:79
virtual int Readdir(char *buff, int blen)
Definition XrdOss.hh:92
virtual int Close(long long *retsz=0)=0
virtual int getFD()
Definition XrdOss.hh:426
long long Total
Definition XrdOssVS.hh:90
long long Free
Definition XrdOssVS.hh:91
XrdOucCacheStats Statistics
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
Definition XrdPfc.hh:267
static const Configuration & Conf()
Definition XrdPfc.cc:165
XrdSysTrace * GetTrace()
Definition XrdPfc.hh:402
void Purge()
Thread function invoked to scan and purge files from disk when needed.
static Cache & GetInstance()
Singleton access.
Definition XrdPfc.cc:163
void ResourceMonitorHeartBeat()
Thread function checking resource usage periodically.
bool IsFileActiveOrPurgeProtected(const std::string &)
Definition XrdPfc.cc:684
DirState * find_dirstate_for_lfn(const std::string &lfn)
int get_max_depth() const
void upward_propagate_usage_purged()
DirState * get_root()
DirState(int max_depth)
void add_usage_purged(long long up)
void dump_recursively(const char *name)
long long upward_propagate_usage_purged()
void upward_propagate_stats()
DirState * find_path(const std::string &path, int max_depth, bool parse_as_lfn, bool create_subdirs)
DirState * find_dir(const std::string &dir, bool create_subdirs)
DirState(DirState *parent)
DirState * get_parent()
void set_usage(long long u)
void add_up_stats(const Stats &stats)
time_t getMinTime() const
const int m_max_dir_level_for_stat_collection
std::vector< long long > m_dir_usage_stack
std::multimap< time_t, FS > map_t
const char * m_info_ext
void CheckFile(const char *fname, Info &info, struct stat &fstat)
map_t::iterator map_i
std::vector< std::string > m_dir_names_stack
void setUVKeepMinTime(time_t min_time)
list_t::iterator list_i
XrdSysTrace * m_trace
const size_t m_info_ext_len
static const char * m_traceID
std::list< FS > list_t
void cd_down(const std::string &dir_name)
void TraverseNamespace(XrdOssDF *iOssDF)
void setMinTime(time_t min_time)
FPurgeState(long long iNBytesReq, XrdOss &oss)
long long getNBytesTotal() const
void begin_traversal(DirState *root, const char *root_path="/")
std::string m_current_path
Status of cached file. Can be read from and written into a binary file.
Definition XrdPfcInfo.hh:45
static const char * s_infoExtension
time_t GetNoCkSumTimeForUVKeep() const
CkSumCheck_e GetCkSumState() const
bool GetLatestDetachTime(time_t &t) const
Get latest detach time.
long long GetNDownloadedBytes() const
Get number of downloaded bytes.
bool Read(XrdOssDF *fp, const char *dname, const char *fname=0)
Read content of cinfo file into this object.
Statistics of cache utilisation by a File object.
XrdSysTrace * GetTrace()
FS(const std::string &dname, const char *fname, long long n, time_t t, DirState *ds)
std::vector< const char * > m_dirs
Definition XrdPfc.hh:187