XRootD
Loading...
Searching...
No Matches
XrdOssCsiFile.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d O s s C s i F i l e . c c */
4/* */
5/* (C) Copyright 2021 CERN. */
6/* */
7/* This file is part of the XRootD software suite. */
8/* */
9/* XRootD is free software: you can redistribute it and/or modify it under */
10/* the terms of the GNU Lesser General Public License as published by the */
11/* Free Software Foundation, either version 3 of the License, or (at your */
12/* option) any later version. */
13/* */
14/* In applying this licence, CERN does not waive the privileges and */
15/* immunities granted to it by virtue of its status as an Intergovernmental */
16/* Organization or submit itself to any jurisdiction. */
17/* */
18/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
19/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
20/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
21/* License for more details. */
22/* */
23/* You should have received a copy of the GNU Lesser General Public License */
24/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
25/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
26/* */
27/* The copyright holder's institutional names and contributor's names may not */
28/* be used to endorse or promote products derived from this software without */
29/* specific prior written permission of the institution or contributor. */
30/******************************************************************************/
31
32#include "XrdOssCsi.hh"
33#include "XrdOssCsiTrace.hh"
35#include "XrdOssCsiPages.hh"
36#include "XrdOssCsiRanges.hh"
37#include "XrdOuc/XrdOucCRC.hh"
38#include "XrdOuc/XrdOucEnv.hh"
39#include "XrdSfs/XrdSfsAio.hh"
41#include "XrdVersion.hh"
42#include "XrdSfs/XrdSfsAio.hh"
43
44#include <string>
45#include <algorithm>
46
47#include <sys/types.h>
48#include <sys/stat.h>
49#include <unistd.h>
50#include <fcntl.h>
51#include <limits.h>
52#include <assert.h>
53
56
57// storage for class members
59std::unordered_map<std::string, std::shared_ptr<XrdOssCsiFile::puMapItem_t> > XrdOssCsiFile::pumap_;
60
61//
62// If no others hold a pointer to Pages object, close it and remoe the pagemap info object.
63//
64int XrdOssCsiFile::pageMapClose()
65{
66 if (!pmi_) return -EBADF;
67 bool doclose = false;
68
69 XrdSysMutexHelper lck(pmi_->mtx);
70 if (mapRelease(pmi_)) doclose = true;
71
72 int cpret = 0;
73 if (doclose)
74 {
75 if (pmi_->pages)
76 {
77 cpret = pmi_->pages->Close();
78 pmi_->pages.reset();
79 }
80 }
81
82 lck.UnLock();
83 pmi_.reset();
84
85 return cpret;
86}
87
88void XrdOssCsiFile::mapTake(const std::string &key, std::shared_ptr<puMapItem_t> &pmi, const bool create)
89{
91 auto mapidx = pumap_.find(key);
92 if (mapidx == pumap_.end())
93 {
94 if (!create) return;
95 pmi.reset(new puMapItem_t());
96 pmi->tpath = key;
97 if (!key.empty())
98 {
99 pumap_.insert(std::make_pair(key, pmi));
100 }
101 }
102 else
103 {
104 pmi = mapidx->second;
105 }
106 pmi->refcount++;
107}
108
109int XrdOssCsiFile::mapRelease(std::shared_ptr<puMapItem_t> &pmi, XrdSysMutexHelper *plck)
110{
112 pmi->refcount--;
113 auto mapidx = pumap_.find(pmi->tpath);
114 if (pmi->refcount == 0 || pmi->unlinked)
115 {
116 if (mapidx != pumap_.end() && mapidx->second == pmi)
117 {
118 pumap_.erase(mapidx);
119 }
120 }
121 if (plck) plck->UnLock();
122 return (pmi->refcount == 0) ? 1 : 0;
123}
124
125int XrdOssCsiFile::pageAndFileOpen(const char *fn, const int dflags, const int Oflag, const mode_t Mode, XrdOucEnv &Env)
126{
127 if (pmi_) return -EBADF;
128
129 {
130 std::string tpath = config_.tagParam_.makeTagFilename(fn);
131 mapTake(tpath, pmi_);
132 }
133
134 XrdSysMutexHelper lck(pmi_->mtx);
135 pmi_->dpath = fn;
136 if (pmi_->unlinked)
137 {
138 mapRelease(pmi_, &lck);
139 // filename replaced since check, try again
140 pmi_.reset();
141 return pageAndFileOpen(fn, dflags, Oflag, Mode, Env);
142 }
143
144 if ((dflags & O_TRUNC) && pmi_->pages)
145 {
146 // truncate of already open file at open() not supported
147 mapRelease(pmi_, &lck);
148 pmi_.reset();
149 return -EDEADLK;
150 }
151
152 const int dataret = successor_->Open(pmi_->dpath.c_str(), dflags, Mode, Env);
153 int pageret = XrdOssOK;
154 if (dataret == XrdOssOK)
155 {
156 if (pmi_->pages)
157 {
158 return XrdOssOK;
159 }
160
161 pageret = createPageUpdater(Oflag, Env);
162 if (pageret == XrdOssOK)
163 {
164 return XrdOssOK;
165 }
166
167 // failed to open the datafile or create the page object.
168 // close datafile if needed
169 (void) successor_->Close();
170 }
171
172 mapRelease(pmi_, &lck);
173 pmi_.reset();
174
175 return (dataret != XrdOssOK) ? dataret : pageret;
176}
177
179{
180 if (pmi_)
181 {
182 (void)Close();
183 }
184}
185
186int XrdOssCsiFile::Close(long long *retsz)
187{
188 if (!pmi_)
189 {
190 return -EBADF;
191 }
192
193 // wait for any ongoing aios to finish
194 aioWait();
195
196 const int cpret = pageMapClose();
197
198 const int csret = successor_->Close(retsz);
199 if (cpret<0) return cpret;
200 return csret;
201}
202
203int XrdOssCsiFile::createPageUpdater(const int Oflag, XrdOucEnv &Env)
204{
205 std::unique_ptr<XrdOucEnv> tagEnv = XrdOssCsi::tagOpenEnv(config_, Env);
206
207 // get information about data file size
208 off_t dsize = 0;
209 if (!(Oflag & O_EXCL) && !(Oflag & O_TRUNC))
210 {
211 struct stat sb;
212 const int sstat = successor_->Fstat(&sb);
213 if (sstat<0)
214 {
215 return sstat;
216 }
217 dsize = sb.st_size;
218 }
219
220 // tag file always opened O_RDWR as the Tagstore/Pages object associated will be shared
221 // between any File instances which concurrently access the file
222 // (some of which may be RDWR, some RDONLY)
223 int tagFlags = O_RDWR;
224
225 // data file was truncated, do same to tag file and let it be reset
226 if ((Oflag & O_TRUNC)) tagFlags |= O_TRUNC;
227
228 // The concern with allowing creation of a new tag file is that the data file may
229 // already exist. Creating a new empty tag file would usually cause subsequent access
230 // errors, but not if the data file starts empty. In addition we may have been
231 // configured to ignore missing tag files. Approach taken is that:
232 // If the data file creation was wanted and it is currently zero length then
233 // allow creation of tag file.
234 if ((Oflag & O_CREAT) && dsize == 0)
235 {
236 tagFlags |= O_CREAT;
237 }
238
239 // be sure the leading directories exist for the tag file
240 if ((tagFlags & O_CREAT))
241 {
242 int mkdret = XrdOssOK;
243 {
244 std::string base = pmi_->tpath;
245 const size_t idx = base.rfind("/");
246 base = base.substr(0,idx);
247 if (!base.empty())
248 {
249 const int AMode = S_IRWXU|S_IRWXG|S_IROTH|S_IXOTH; // 775
250 mkdret = parentOss_->Mkdir(base.c_str(), AMode, 1, tagEnv.get());
251 }
252 }
253 if (mkdret != XrdOssOK && mkdret != -EEXIST)
254 {
255 return mkdret;
256 }
257 }
258
259 std::unique_ptr<XrdOssDF> integFile(parentOss_->newFile(tident));
260 std::unique_ptr<XrdOssCsiTagstore> ts(new
261 XrdOssCsiTagstoreFile(pmi_->dpath, std::move(integFile), tident));
262 std::unique_ptr<XrdOssCsiPages> pages(new
263 XrdOssCsiPages(pmi_->dpath, std::move(ts), config_.fillFileHole(), config_.allowMissingTags(),
264 config_.disablePgExtend(), config_.disableLooseWrite(), tident));
265
266 int puret = pages->Open(pmi_->tpath.c_str(), dsize, tagFlags, *tagEnv);
267 if (puret<0)
268 {
269 if ((puret == -EROFS || puret == -EACCES) && rdonly_)
270 {
271 // try to open tag file readonly
272 puret = pages->Open(pmi_->tpath.c_str(), dsize, O_RDONLY, *tagEnv);
273 }
274 }
275
276 if (puret<0)
277 {
278 return puret;
279 }
280
281 pages->BasicConsistencyCheck(successor_);
282 pmi_->pages = std::move(pages);
283 return XrdOssOK;
284}
285
286int XrdOssCsiFile::Open(const char *path, const int Oflag, const mode_t Mode, XrdOucEnv &Env)
287{
288 char cxid[4];
289
290 if (pmi_)
291 {
292 // already open
293 return -EINVAL;
294 }
295
296 if (!path)
297 {
298 return -EINVAL;
299 }
300 if (config_.tagParam_.isTagFile(path))
301 {
302 if ((Oflag & O_CREAT)) return -EACCES;
303 return -ENOENT;
304 }
305
306 int dflags = Oflag;
307 if ((dflags & O_ACCMODE) == O_WRONLY)
308 {
309 // for non-aligned writes it may be needed to do read-modify-write
310 dflags &= ~O_ACCMODE;
311 dflags |= O_RDWR;
312 }
313
314 rdonly_ = true;
315 if ((dflags & O_ACCMODE) != O_RDONLY)
316 {
317 rdonly_ = false;
318 }
319
320 const int oret = pageAndFileOpen(path, dflags, Oflag, Mode, Env);
321 if (oret<0)
322 {
323 return oret;
324 }
325
326 if (successor_->isCompressed(cxid)>0)
327 {
328 (void)Close();
329 return -ENOTSUP;
330 }
331
332 if (Pages()->IsReadOnly() && !rdonly_)
333 {
334 (void)Close();
335 return -EACCES;
336 }
337 return XrdOssOK;
338}
339
340ssize_t XrdOssCsiFile::Read(off_t offset, size_t blen)
341{
342 return successor_->Read(offset, blen);
343}
344
345ssize_t XrdOssCsiFile::Read(void *buff, off_t offset, size_t blen)
346{
347 if (!pmi_) return -EBADF;
348
350 Pages()->LockTrackinglen(rg, offset, offset+blen, true);
351
352 const ssize_t bread = successor_->Read(buff, offset, blen);
353 if (bread<0 || blen==0) return bread;
354
355 const ssize_t puret = Pages()->VerifyRange(successor_, buff, offset, bread, rg);
356 if (puret<0) return puret;
357 return bread;
358}
359
360ssize_t XrdOssCsiFile::ReadRaw(void *buff, off_t offset, size_t blen)
361{
362 if (!pmi_) return -EBADF;
363
365 Pages()->LockTrackinglen(rg, offset, offset+blen, true);
366
367 const ssize_t bread = successor_->ReadRaw(buff, offset, blen);
368 if (bread<0 || blen==0) return bread;
369
370 const ssize_t puret = Pages()->VerifyRange(successor_, buff, offset, bread, rg);
371 if (puret<0) return puret;
372 return bread;
373}
374
375ssize_t XrdOssCsiFile::ReadV(XrdOucIOVec *readV, int n)
376{
377 if (!pmi_) return -EBADF;
378 if (n==0) return 0;
379
381 off_t start = readV[0].offset;
382 off_t end = start + (off_t)readV[0].size;
383 for(int i=1; i<n; i++)
384 {
385 const off_t p1 = readV[i].offset;
386 const off_t p2 = p1 + (off_t)readV[i].size;
387 if (p1<start) start = p1;
388 if (p2>end) end = p2;
389 }
390 Pages()->LockTrackinglen(rg, start, end, true);
391
392 // standard OSS gives -ESPIPE in case of partial read of an element
393 ssize_t rret = successor_->ReadV(readV, n);
394 if (rret<0) return rret;
395 for (int i=0; i<n; i++)
396 {
397 if (readV[i].size == 0) continue;
398 ssize_t puret = Pages()->VerifyRange(successor_, readV[i].data, readV[i].offset, readV[i].size, rg);
399 if (puret<0) return puret;
400 }
401 return rret;
402}
403
404ssize_t XrdOssCsiFile::Write(const void *buff, off_t offset, size_t blen)
405{
406 if (!pmi_) return -EBADF;
407 if (rdonly_) return -EBADF;
408
410 Pages()->LockTrackinglen(rg, offset, offset+blen, false);
411
412 int puret = Pages()->UpdateRange(successor_, buff, offset, blen, rg);
413 if (puret<0)
414 {
415 rg.ReleaseAll();
416 resyncSizes();
417 return (ssize_t)puret;
418 }
419 ssize_t towrite = blen;
420 ssize_t bwritten = 0;
421 const uint8_t *p = (uint8_t*)buff;
422 while(towrite>0)
423 {
424 ssize_t wret = successor_->Write(&p[bwritten], offset+bwritten, towrite);
425 if (wret<0)
426 {
427 rg.ReleaseAll();
428 resyncSizes();
429 return wret;
430 }
431 towrite -= wret;
432 bwritten += wret;
433 }
434 return bwritten;
435}
436
437ssize_t XrdOssCsiFile::WriteV(XrdOucIOVec *writeV, int n)
438{
439 if (!pmi_) return -EBADF;
440 if (rdonly_) return -EBADF;
441 if (n==0) return 0;
442
444 off_t start = writeV[0].offset;
445 off_t end = start + (off_t)writeV[0].size;
446 for(int i=1; i<n; i++)
447 {
448 const off_t p1 = writeV[i].offset;
449 const off_t p2 = p1 + (off_t)writeV[i].size;
450 if (p1<start) start = p1;
451 if (p2>end) end = p2;
452 }
453 Pages()->LockTrackinglen(rg, start, end, false);
454
455 for (int i=0; i<n; i++)
456 {
457 int ret = Pages()->UpdateRange(successor_, writeV[i].data, writeV[i].offset, writeV[i].size, rg);
458 if (ret<0)
459 {
460 rg.ReleaseAll();
461 resyncSizes();
462 return ret;
463 }
464 }
465 // standard OSS gives -ESPIPE in case of partial write of an element
466 ssize_t wret = successor_->WriteV(writeV, n);
467 if (wret<0)
468 {
469 rg.ReleaseAll();
470 resyncSizes();
471 }
472 return wret;
473}
474
475ssize_t XrdOssCsiFile::pgRead(void *buffer, off_t offset, size_t rdlen, uint32_t *csvec, uint64_t opts)
476{
477 if (!pmi_) return -EBADF;
478
480 Pages()->LockTrackinglen(rg, offset, offset+rdlen, true);
481
482 // if we return a short amount of data the caller will have to deal with
483 // joining csvec values from repeated reads: for simplicity try to read as
484 // such as possible up to the request read length
485 ssize_t toread = rdlen;
486 ssize_t bread = 0;
487 uint8_t *const p = (uint8_t*)buffer;
488 do
489 {
490 ssize_t rret = successor_->Read(&p[bread], offset+bread, toread);
491 if (rret<0) return rret;
492 if (rret==0) break;
493 toread -= rret;
494 bread += rret;
495 } while(toread>0);
496 if (rdlen == 0) return bread;
497
498 ssize_t puret = Pages()->FetchRange(successor_, buffer, offset, bread, csvec, opts, rg);
499 if (puret<0) return puret;
500 return bread;
501}
502
503ssize_t XrdOssCsiFile::pgWrite(void *buffer, off_t offset, size_t wrlen, uint32_t *csvec, uint64_t opts)
504{
505 if (!pmi_) return -EBADF;
506 if (rdonly_) return -EBADF;
507 uint64_t pgopts = opts;
508
509 const int prec = XrdOssCsiPages::pgWritePrelockCheck(buffer, offset, wrlen, csvec, opts);
510 if (prec < 0)
511 {
512 return prec;
513 }
514
516 Pages()->LockTrackinglen(rg, offset, offset+wrlen, false);
517
518 int puret = Pages()->StoreRange(successor_, buffer, offset, wrlen, csvec, pgopts, rg);
519 if (puret<0) {
520 rg.ReleaseAll();
521 resyncSizes();
522 return (ssize_t)puret;
523 }
524 ssize_t towrite = wrlen;
525 ssize_t bwritten = 0;
526 const uint8_t *p = (uint8_t*)buffer;
527 do
528 {
529 ssize_t wret = successor_->Write(&p[bwritten], offset+bwritten, towrite);
530 if (wret<0)
531 {
532 rg.ReleaseAll();
533 resyncSizes();
534 return wret;
535 }
536 towrite -= wret;
537 bwritten += wret;
538 } while(towrite>0);
539 return bwritten;
540}
541
543{
544 if (!pmi_) return -EBADF;
545
546 const int psret = Pages()->Fsync();
547 const int ssret = successor_->Fsync();
548 if (psret<0) return psret;
549 return ssret;
550}
551
552int XrdOssCsiFile::Ftruncate(unsigned long long flen)
553{
554 if (!pmi_) return -EBADF;
555 if (rdonly_) return -EBADF;
556
558 Pages()->LockTrackinglen(rg, flen, LLONG_MAX, false);
559 int ret = Pages()->truncate(successor_, flen, rg);
560 if (ret<0)
561 {
562 rg.ReleaseAll();
563 resyncSizes();
564 return ret;
565 }
566 ret = successor_->Ftruncate(flen);
567 if (ret<0)
568 {
569 rg.ReleaseAll();
570 resyncSizes();
571 }
572 return ret;
573}
574
575int XrdOssCsiFile::Fstat(struct stat *buff)
576{
577 if (!pmi_) return -EBADF;
579 const int tsret = Pages()->TrackedSizesGet(sizes, false);
580 const int fsret = successor_->Fstat(buff);
581 if (fsret<0) return fsret;
582 if (tsret<0) return 0;
583 buff->st_size = std::max(sizes.first, sizes.second);
584 return 0;
585}
586
587int XrdOssCsiFile::resyncSizes()
588{
590 Pages()->LockTrackinglen(rg, 0, LLONG_MAX, false);
591 struct stat sbuff;
592 int ret = successor_->Fstat(&sbuff);
593 if (ret<0) return ret;
594 Pages()->LockResetSizes(successor_, sbuff.st_size);
595 return 0;
596}
597
599{
600 if (!pmi_) return;
601
602 Pages()->Flush();
603 successor_->Flush();
604}
605
607{
608 if (!pmi_) return 0;
609 return Pages()->VerificationStatus();
610}
XrdOucTrace OssCsiTrace
XrdOucTrace OssCsiTrace & OssCsiEroute
Definition XrdOssCsi.cc:52
#define XrdOssOK
Definition XrdOss.hh:50
#define stat(a, b)
Definition XrdPosix.hh:96
int Mode
struct myOpts opts
std::string makeTagFilename(const char *path)
virtual ssize_t pgWrite(void *, off_t, size_t, uint32_t *, uint64_t)
XrdOssCsiPages * Pages()
Definition XrdOssCsi.hh:140
virtual int Fsync()
virtual ssize_t Write(const void *, off_t, size_t)
virtual int Ftruncate(unsigned long long)
virtual ssize_t pgRead(void *, off_t, size_t, uint32_t *, uint64_t)
virtual ssize_t ReadV(XrdOucIOVec *readV, int n)
virtual ssize_t Read(off_t, size_t)
virtual int Open(const char *, int, mode_t, XrdOucEnv &)
virtual int Close(long long *retsz=0)
virtual void Flush()
Flush filesystem cached pages for this file (used for checksums).
virtual ~XrdOssCsiFile()
static XrdSysMutex pumtx_
Definition XrdOssCsi.hh:159
virtual int Fstat(struct stat *)
static std::unordered_map< std::string, std::shared_ptr< puMapItem_t > > pumap_
Definition XrdOssCsi.hh:160
virtual ssize_t ReadRaw(void *, off_t, size_t)
virtual ssize_t WriteV(XrdOucIOVec *writeV, int n)
static void mapTake(const std::string &, std::shared_ptr< puMapItem_t > &, bool create=true)
static int mapRelease(std::shared_ptr< puMapItem_t > &, XrdSysMutexHelper *plck=NULL)
std::pair< off_t, off_t > Sizes_t
int truncate(XrdOssDF *, off_t, XrdOssCsiRangeGuard &)
int LockResetSizes(XrdOssDF *, off_t)
int FetchRange(XrdOssDF *, const void *, off_t, size_t, uint32_t *, uint64_t, XrdOssCsiRangeGuard &)
void LockTrackinglen(XrdOssCsiRangeGuard &, off_t, off_t, bool)
int TrackedSizesGet(Sizes_t &, bool)
static int pgWritePrelockCheck(const void *, off_t, size_t, const uint32_t *, uint64_t)
int StoreRange(XrdOssDF *, const void *, off_t, size_t, uint32_t *, uint64_t, XrdOssCsiRangeGuard &)
int UpdateRange(XrdOssDF *, const void *, off_t, size_t, XrdOssCsiRangeGuard &)
int VerifyRange(XrdOssDF *, const void *, off_t, size_t, XrdOssCsiRangeGuard &)
static std::unique_ptr< XrdOucEnv > tagOpenEnv(const XrdOssCsiConfig &, XrdOucEnv &)
Definition XrdOssCsi.cc:467
XrdOssDF * successor_
virtual int Fstat(struct stat *buf)
Definition XrdOss.hh:136
long long offset