XRootD
Loading...
Searching...
No Matches
XrdPfcFile.cc
Go to the documentation of this file.
1//----------------------------------------------------------------------------------
2// Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
3// Author: Alja Mrak-Tadel, Matevz Tadel
4//----------------------------------------------------------------------------------
5// XRootD is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Lesser General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// XRootD is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14//
15// You should have received a copy of the GNU Lesser General Public License
16// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17//----------------------------------------------------------------------------------
18
19
20#include "XrdPfcFile.hh"
21#include "XrdPfc.hh"
23#include "XrdPfcIO.hh"
24#include "XrdPfcTrace.hh"
25
27#include "XrdSys/XrdSysTimer.hh"
28#include "XrdOss/XrdOss.hh"
29#include "XrdOuc/XrdOucEnv.hh"
31
32#include "XrdCl/XrdClURL.hh"
33
34#include <cstdio>
35#include <sstream>
36#include <fcntl.h>
37#include <cassert>
38
39
40using namespace XrdPfc;
41
namespace
{

// Upper bound on retries when writing a block to disk fails
// (used by the write-queue machinery; not referenced in this excerpt).
const int BLOCK_WRITE_MAX_ATTEMPTS = 4;

// File-local shorthand for the process-wide cache singleton.
Cache* cache() { return &Cache::GetInstance(); }

}
50
51const char *File::m_traceID = "File";
52
53//------------------------------------------------------------------------------
54
55File::File(const std::string& path, long long iOffset, long long iFileSize) :
56 m_ref_cnt(0),
57 m_data_file(0),
58 m_info_file(0),
59 m_cfi(Cache::TheOne().GetTrace(), Cache::TheOne().is_prefetch_enabled()),
60 m_filename(path),
61 m_offset(iOffset),
62 m_file_size(iFileSize),
63 m_current_io(m_io_set.end()),
64 m_ios_in_detach(0),
65 m_non_flushed_cnt(0),
66 m_in_sync(false),
67 m_detach_time_logged(false),
68 m_in_shutdown(false),
69 m_state_cond(0),
70 m_block_size(0),
71 m_num_blocks(0),
72 m_resmon_token(-1),
73 m_prefetch_state(kOff),
74 m_prefetch_bytes(0),
75 m_prefetch_read_cnt(0),
76 m_prefetch_hit_cnt(0),
77 m_prefetch_score(0)
78{}
79
File::~File()
{
   // Only emits a trace; all resources are released earlier, in Close().
   TRACEF(Debug, "~File() for ");
}
84
void File::Close()
{
   // Close and delete the cinfo and data files, then reconcile the reported
   // st_blocks usage with the resource monitor and register the file close.
   //
   // Close is called while nullptr is put into Cache::m_active map, see Cache::dec_ref_count(File*).
   // A stat is called after close to re-check that m_stat_blocks have been reported correctly
   // to the resource-monitor. Note that the reporting is already clamped down to m_file_size
   // in report_and_merge_delta_stats() below.
   //
   // XFS can pre-allocate significant amount of blocks (1 GB at 1GB mark, 4 GB above 4GB) and those
   // get reported in as stat.st_blocks.
   // The reported number is correct in a stat immediately following a close.
   // If one starts off by writing the last byte of the file, this pre-allocation does not get
   // triggered up to that point. But comes back with a vengeance right after.
   //
   // To be determined if other FSes do something similar (Ceph, ZFS, ...). Ext4 doesn't.

   if (m_info_file)
   {
      TRACEF(Debug, "Close() closing info-file ");
      m_info_file->Close();
      delete m_info_file;
      m_info_file = nullptr;
   }

   if (m_data_file)
   {
      TRACEF(Debug, "Close() closing data-file ");
      m_data_file->Close();
      delete m_data_file;
      m_data_file = nullptr;
   }

   // A negative token means Open() never registered with the resource monitor.
   if (m_resmon_token >= 0)
   {
      // Last update of file stats has been sent from the final Sync unless we are in_shutdown --
      // but in this case the file will get unlinked by the cache and reported as purge event.
      // We check if the reported st_blocks so far is correct.
      if (m_stats.m_BytesWritten > 0 && ! m_in_shutdown) {
         struct stat s;
         int sr = Cache::GetInstance().GetOss()->Stat(m_filename.c_str(), &s);
         if (sr == 0 && s.st_blocks != m_st_blocks) {
            // Report only the delta in 512-byte blocks since the last report.
            Stats stats;
            stats.m_StBlocksAdded = s.st_blocks - m_st_blocks;
            m_st_blocks = s.st_blocks;
            Cache::ResMon().register_file_update_stats(m_resmon_token, stats);
         }
      }

      Cache::ResMon().register_file_close(m_resmon_token, time(0), m_stats);
   }

   TRACEF(Debug, "Close() finished, prefetch score = " << m_prefetch_score);
}
137
138//------------------------------------------------------------------------------
139
140File* File::FileOpen(const std::string &path, long long offset, long long fileSize, XrdOucCacheIO* inputIO)
141{
142 File *file = new File(path, offset, fileSize);
143 if ( ! file->Open(inputIO))
144 {
145 delete file;
146 file = 0;
147 }
148 return file;
149}
150
151//------------------------------------------------------------------------------
152
154{
155 // Called from Cache::Unlink() when the file is currently open.
156 // Cache::Unlink is also called on FSync error and when wrong number of bytes
157 // is received from a remote read.
158 //
159 // From this point onward the file will not be written to, cinfo file will
160 // not be updated, and all new read requests will return -ENOENT.
161 //
162 // File's entry in the Cache's active map is set to nullptr and will be
163 // removed from there shortly, in any case, well before this File object
164 // shuts down. Cache::Unlink() also reports the appropriate purge event.
165
166 XrdSysCondVarHelper _lck(m_state_cond);
167
168 m_in_shutdown = true;
169
170 if (m_prefetch_state != kStopped && m_prefetch_state != kComplete)
171 {
172 m_prefetch_state = kStopped;
173 cache()->DeRegisterPrefetchFile(this);
174 }
175
176 report_and_merge_delta_stats();
177
178 return m_st_blocks;
179}
180
181//------------------------------------------------------------------------------
182
183void File::check_delta_stats()
184{
185 // Called under m_state_cond lock.
186 // BytesWritten indirectly trigger an unconditional merge through periodic Sync().
187 if (m_delta_stats.BytesReadAndWritten() >= m_resmon_report_threshold && ! m_in_shutdown)
188 report_and_merge_delta_stats();
189}
190
void File::report_and_merge_delta_stats()
{
   // Called under m_state_cond lock.
   // Sends the accumulated delta statistics to the resource monitor, folds
   // them into the lifetime stats, and resets the delta accumulator.
   struct stat s;
   m_data_file->Fstat(&s);
   // Do not report st_blocks beyond 4kB round-up over m_file_size. Some FSs report
   // aggressive pre-allocation in this field (XFS, 4GB).
   // st_blocks is in 512-byte units: a size that is not 4 kB aligned is rounded
   // up to the next page ((size >> 12) + 1 pages, 8 units per page); an aligned
   // size converts exactly (size >> 9).
   long long max_st_blocks_to_report = (m_file_size & 0xfff) ? ((m_file_size >> 12) + 1) << 3
                                                             : m_file_size >> 9;
   long long st_blocks_to_report = std::min((long long) s.st_blocks, max_st_blocks_to_report);
   // Only the change in block usage since the previous report is sent.
   m_delta_stats.m_StBlocksAdded = st_blocks_to_report - m_st_blocks;
   m_st_blocks = st_blocks_to_report;
   Cache::ResMon().register_file_update_stats(m_resmon_token, m_delta_stats);
   m_stats.AddUp(m_delta_stats);
   m_delta_stats.Reset();
}
207
208//------------------------------------------------------------------------------
209
211{
212 TRACEF(Dump, "BlockRemovedFromWriteQ() block = " << (void*) b << " idx= " << b->m_offset/m_block_size);
213
214 XrdSysCondVarHelper _lck(m_state_cond);
215 dec_ref_count(b);
216}
217
218void File::BlocksRemovedFromWriteQ(std::list<Block*>& blocks)
219{
220 TRACEF(Dump, "BlocksRemovedFromWriteQ() n_blocks = " << blocks.size());
221
222 XrdSysCondVarHelper _lck(m_state_cond);
223
224 for (std::list<Block*>::iterator i = blocks.begin(); i != blocks.end(); ++i)
225 {
226 dec_ref_count(*i);
227 }
228}
229
230//------------------------------------------------------------------------------
231
233{
234 std::string loc(io->GetLocation());
235 XrdSysCondVarHelper _lck(m_state_cond);
236 insert_remote_location(loc);
237}
238
239//------------------------------------------------------------------------------
240
242{
243 // Returns true if delay is needed.
244
245 TRACEF(Debug, "ioActive start for io " << io);
246
247 std::string loc(io->GetLocation());
248
249 {
250 XrdSysCondVarHelper _lck(m_state_cond);
251
252 IoSet_i mi = m_io_set.find(io);
253
254 if (mi != m_io_set.end())
255 {
256 unsigned int n_active_reads = io->m_active_read_reqs;
257
258 TRACE(Info, "ioActive for io " << io <<
259 ", active_reads " << n_active_reads <<
260 ", active_prefetches " << io->m_active_prefetches <<
261 ", allow_prefetching " << io->m_allow_prefetching <<
262 ", ios_in_detach " << m_ios_in_detach);
263 TRACEF(Info,
264 "\tio_map.size() " << m_io_set.size() <<
265 ", block_map.size() " << m_block_map.size() << ", file");
266
267 insert_remote_location(loc);
268
269 io->m_allow_prefetching = false;
270 io->m_in_detach = true;
271
272 // Check if any IO is still available for prfetching. If not, stop it.
273 if (m_prefetch_state == kOn || m_prefetch_state == kHold)
274 {
275 if ( ! select_current_io_or_disable_prefetching(false) )
276 {
277 TRACEF(Debug, "ioActive stopping prefetching after io " << io << " retreat.");
278 }
279 }
280
281 // On last IO, consider write queue blocks. Note, this also contains
282 // blocks being prefetched.
283
284 bool io_active_result;
285
286 if (n_active_reads > 0)
287 {
288 io_active_result = true;
289 }
290 else if (m_io_set.size() - m_ios_in_detach == 1)
291 {
292 io_active_result = ! m_block_map.empty();
293 }
294 else
295 {
296 io_active_result = io->m_active_prefetches > 0;
297 }
298
299 if ( ! io_active_result)
300 {
301 ++m_ios_in_detach;
302 }
303
304 TRACEF(Info, "ioActive for io " << io << " returning " << io_active_result << ", file");
305
306 return io_active_result;
307 }
308 else
309 {
310 TRACEF(Error, "ioActive io " << io <<" not found in IoSet. This should not happen.");
311 return false;
312 }
313 }
314}
315
316//------------------------------------------------------------------------------
317
319{
320 XrdSysCondVarHelper _lck(m_state_cond);
321 m_detach_time_logged = false;
322}
323
325{
326 // Returns true if sync is required.
327 // This method is called after corresponding IO is detached from PosixCache.
328
329 XrdSysCondVarHelper _lck(m_state_cond);
330 if ( ! m_in_shutdown)
331 {
332 if ( ! m_writes_during_sync.empty() || m_non_flushed_cnt > 0 || ! m_detach_time_logged)
333 {
334 report_and_merge_delta_stats();
335 m_cfi.WriteIOStatDetach(m_stats);
336 m_detach_time_logged = true;
337 m_in_sync = true;
338 TRACEF(Debug, "FinalizeSyncBeforeExit requesting sync to write detach stats");
339 return true;
340 }
341 }
342 TRACEF(Debug, "FinalizeSyncBeforeExit sync not required");
343 return false;
344}
345
346//------------------------------------------------------------------------------
347
349{
350 // Called from Cache::GetFile() when a new IO asks for the file.
351
352 TRACEF(Debug, "AddIO() io = " << (void*)io);
353
354 time_t now = time(0);
355 std::string loc(io->GetLocation());
356
357 m_state_cond.Lock();
358
359 IoSet_i mi = m_io_set.find(io);
360
361 if (mi == m_io_set.end())
362 {
363 m_io_set.insert(io);
364 io->m_attach_time = now;
365 m_delta_stats.IoAttach();
366
367 insert_remote_location(loc);
368
369 if (m_prefetch_state == kStopped)
370 {
371 m_prefetch_state = kOn;
372 cache()->RegisterPrefetchFile(this);
373 }
374 }
375 else
376 {
377 TRACEF(Error, "AddIO() io = " << (void*)io << " already registered.");
378 }
379
380 m_state_cond.UnLock();
381}
382
383//------------------------------------------------------------------------------
384
386{
387 // Called from Cache::ReleaseFile.
388
389 TRACEF(Debug, "RemoveIO() io = " << (void*)io);
390
391 time_t now = time(0);
392
393 m_state_cond.Lock();
394
395 IoSet_i mi = m_io_set.find(io);
396
397 if (mi != m_io_set.end())
398 {
399 if (mi == m_current_io)
400 {
401 ++m_current_io;
402 }
403
404 m_delta_stats.IoDetach(now - io->m_attach_time);
405 m_io_set.erase(mi);
406 --m_ios_in_detach;
407
408 if (m_io_set.empty() && m_prefetch_state != kStopped && m_prefetch_state != kComplete)
409 {
410 TRACEF(Error, "RemoveIO() io = " << (void*)io << " Prefetching is not stopped/complete -- it should be by now.");
411 m_prefetch_state = kStopped;
412 cache()->DeRegisterPrefetchFile(this);
413 }
414 }
415 else
416 {
417 TRACEF(Error, "RemoveIO() io = " << (void*)io << " is NOT registered.");
418 }
419
420 m_state_cond.UnLock();
421}
422
423//------------------------------------------------------------------------------
424
425bool File::Open(XrdOucCacheIO* inputIO)
426{
427 // Sets errno accordingly.
428
429 static const char *tpfx = "Open() ";
430
431 TRACEF(Dump, tpfx << "entered");
432
433 // Before touching anything, check with ResourceMonitor if a scan is in progress.
434 // This function will wait internally if needed until it is safe to proceed.
435 Cache::ResMon().CrossCheckIfScanIsInProgress(m_filename, m_state_cond);
436
438
439 XrdOss &myOss = * Cache::GetInstance().GetOss();
440 const char *myUser = conf.m_username.c_str();
441 XrdOucEnv myEnv;
442 struct stat data_stat, info_stat;
443
444 std::string ifn = m_filename + Info::s_infoExtension;
445
446 bool data_existed = (myOss.Stat(m_filename.c_str(), &data_stat) == XrdOssOK);
447 bool info_existed = (myOss.Stat(ifn.c_str(), &info_stat) == XrdOssOK);
448
449 // Create the data file itself.
450 char size_str[32]; sprintf(size_str, "%lld", m_file_size);
451 myEnv.Put("oss.asize", size_str);
452 myEnv.Put("oss.cgroup", conf.m_data_space.c_str());
453
454 int res;
455
456 if ((res = myOss.Create(myUser, m_filename.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
457 {
458 TRACEF(Error, tpfx << "Create failed " << ERRNO_AND_ERRSTR(-res));
459 errno = -res;
460 return false;
461 }
462
463 m_data_file = myOss.newFile(myUser);
464 if ((res = m_data_file->Open(m_filename.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
465 {
466 TRACEF(Error, tpfx << "Open failed " << ERRNO_AND_ERRSTR(-res));
467 errno = -res;
468 delete m_data_file; m_data_file = 0;
469 return false;
470 }
471
472 myEnv.Put("oss.asize", "64k"); // Advisory, block-map and access list lengths vary.
473 myEnv.Put("oss.cgroup", conf.m_meta_space.c_str());
474 if ((res = myOss.Create(myUser, ifn.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
475 {
476 TRACE(Error, tpfx << "Create failed for info file " << ifn << ERRNO_AND_ERRSTR(-res));
477 errno = -res;
478 m_data_file->Close(); delete m_data_file; m_data_file = 0;
479 return false;
480 }
481
482 m_info_file = myOss.newFile(myUser);
483 if ((res = m_info_file->Open(ifn.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
484 {
485 TRACEF(Error, tpfx << "Failed for info file " << ifn << ERRNO_AND_ERRSTR(-res));
486 errno = -res;
487 delete m_info_file; m_info_file = 0;
488 m_data_file->Close(); delete m_data_file; m_data_file = 0;
489 return false;
490 }
491
492 bool initialize_info_file = true;
493
494 if (info_existed && m_cfi.Read(m_info_file, ifn.c_str()))
495 {
496 TRACEF(Debug, tpfx << "Reading existing info file. (data_existed=" << data_existed <<
497 ", data_size_stat=" << (data_existed ? data_stat.st_size : -1ll) <<
498 ", data_size_from_last_block=" << m_cfi.GetExpectedDataFileSize() <<
499 ", block_size=" << (m_cfi.GetBufferSize() >> 10) << "k)");
500
501 // Check if data file exists and is of reasonable size.
502 if (data_existed && data_stat.st_size >= m_cfi.GetExpectedDataFileSize())
503 {
504 initialize_info_file = false;
505 } else {
506 TRACEF(Warning, tpfx << "Basic sanity checks on data file failed, resetting info file, truncating data file.");
507 m_cfi.ResetAllAccessStats();
508 m_data_file->Ftruncate(0);
509 Cache::ResMon().register_file_purge(m_filename, data_stat.st_blocks);
510 }
511 }
512
513 if ( ! initialize_info_file && m_cfi.GetCkSumState() != conf.get_cs_Chk())
514 {
515 if (conf.does_cschk_have_missing_bits(m_cfi.GetCkSumState()) &&
516 conf.should_uvkeep_purge(time(0) - m_cfi.GetNoCkSumTimeForUVKeep()))
517 {
518 TRACEF(Info, tpfx << "Cksum state of file insufficient, uvkeep test failed, resetting info file, truncating data file.");
519 initialize_info_file = true;
520 m_cfi.ResetAllAccessStats();
521 m_data_file->Ftruncate(0);
522 Cache::ResMon().register_file_purge(m_filename, data_stat.st_blocks);
523 } else {
524 // TODO: If the file is complete, we don't need to reset net cksums.
525 m_cfi.DowngradeCkSumState(conf.get_cs_Chk());
526 }
527 }
528
529 // Check if we have pfc url arguments.
530 long long pfc_blocksize = conf.m_bufferSize;
531 int pfc_prefetch = conf.m_prefetch_max_blocks;
533 {
534 parse_pfc_url_args(inputIO, pfc_blocksize, pfc_prefetch);
535 }
536
537 if (initialize_info_file)
538 {
539 m_cfi.SetBufferSizeFileSizeAndCreationTime(pfc_blocksize, m_file_size);
540 m_cfi.SetCkSumState(conf.get_cs_Chk());
541 m_cfi.ResetNoCkSumTime();
542 m_cfi.Write(m_info_file, ifn.c_str());
543 m_info_file->Fsync();
544 cache()->WriteFileSizeXAttr(m_info_file->getFD(), m_file_size);
545 TRACEF(Debug, tpfx << "Creating new file info, data size = " << m_file_size << " num blocks = " << m_cfi.GetNBlocks()
546 << " block size = " << pfc_blocksize);
547 }
548 else
549 {
550 if (futimens(m_info_file->getFD(), NULL)) {
551 TRACEF(Error, tpfx << "failed setting modification time " << ERRNO_AND_ERRSTR(errno));
552 }
553 if (pfc_blocksize != conf.m_bufferSize) {
554 TRACEF(Info, tpfx << "URL CGI pfc.blocksize ignored for an already existing file");
555 }
556 }
557
558 m_cfi.WriteIOStatAttach();
559 m_state_cond.Lock();
560 m_block_size = m_cfi.GetBufferSize();
561 m_num_blocks = m_cfi.GetNBlocks();
562 m_prefetch_state = (m_cfi.IsComplete()) ? kComplete : kStopped; // Will engage in AddIO().
563 m_prefetch_max_blocks_in_flight = pfc_prefetch;
564 if (pfc_prefetch != conf.m_prefetch_max_blocks)
565 TRACEF(Debug, tpfx << "pfc.prefetch set to " << pfc_prefetch << " via CGI parameter");
566
567 m_data_file->Fstat(&data_stat);
568 m_st_blocks = data_stat.st_blocks;
569
570 m_resmon_token = Cache::ResMon().register_file_open(m_filename, time(0), data_existed);
571 constexpr long long MB = 1024 * 1024;
572 m_resmon_report_threshold = std::min(std::max(10 * MB, m_file_size / 20), 500 * MB);
573 // m_resmon_report_threshold_scaler; // something like 10% of original threshold, to adjust
574 // actual threshold based on return values from register_file_update_stats().
575
576 m_state_cond.UnLock();
577
578 return true;
579}
580
581void File::parse_pfc_url_args(XrdOucCacheIO* inputIO, long long &pfc_blocksize, int &pfc_prefetch) const
582{
583 const Configuration &conf = Cache::TheOne().RefConfiguration();
584
585 XrdCl::URL url(inputIO->Path());
586 auto const & urlp = url.GetParams();
587
588 auto extract = [&](const std::string &key, std::string &value) -> bool {
589 auto it = urlp.find(key);
590 if (it != urlp.end()) {
591 value = it->second;
592 return true;
593 } else {
594 value.clear();
595 return false;
596 }
597 };
598
599 std::string val;
600 if (conf.m_cgi_blocksize_allowed && extract("pfc.blocksize", val))
601 {
602 const char *tpfx = "File::Open::urlcgi pfc.blocksize ";
603 long long bsize;
604 if (Cache::TheOne().blocksize_str2value(tpfx, val.c_str(), bsize,
606 {
607 pfc_blocksize = bsize;
608 } else {
609 TRACEF(Error, tpfx << "Error processing the parameter.");
610 }
611 }
612 if (conf.m_cgi_prefetch_allowed && extract("pfc.prefetch", val))
613 {
614 const char *tpfx = "File::Open::urlcgi pfc.prefetch ";
615 int pref;
616 if (Cache::TheOne().prefetch_str2value(tpfx, val.c_str(), pref,
618 {
619 pfc_prefetch = pref;
620 } else {
621 TRACEF(Error, tpfx << "Error processing the parameter.");
622 }
623 }
624}
625
626//------------------------------------------------------------------------------
627
628int File::Fstat(struct stat &sbuff)
629{
630 // Stat on an open file.
631 // Corrects size to actual full size of the file.
632 // Sets atime to 0 if the file is only partially downloaded, in accordance
633 // with pfc.onlyifcached settings.
634 // Called from IO::Fstat() and Cache::Stat() when the file is active.
635 // Returns 0 on success, -errno on error.
636
637 int res;
638
639 if ((res = m_data_file->Fstat(&sbuff))) return res;
640
641 sbuff.st_size = m_file_size;
642
643 bool is_cached = cache()->DecideIfConsideredCached(m_file_size, sbuff.st_blocks * 512ll);
644 if ( ! is_cached)
645 sbuff.st_atime = 0;
646
647 return 0;
648}
649
650//==============================================================================
651// Read and helpers
652//==============================================================================
653
654bool File::overlap(int blk, // block to query
655 long long blk_size, //
656 long long req_off, // offset of user request
657 int req_size, // size of user request
658 // output:
659 long long &off, // offset in user buffer
660 long long &blk_off, // offset in block
661 int &size) // size to copy
662{
663 const long long beg = blk * blk_size;
664 const long long end = beg + blk_size;
665 const long long req_end = req_off + req_size;
666
667 if (req_off < end && req_end > beg)
668 {
669 const long long ovlp_beg = std::max(beg, req_off);
670 const long long ovlp_end = std::min(end, req_end);
671
672 off = ovlp_beg - req_off;
673 blk_off = ovlp_beg - beg;
674 size = (int) (ovlp_end - ovlp_beg);
675
676 assert(size <= blk_size);
677 return true;
678 }
679 else
680 {
681 return false;
682 }
683}
684
685//------------------------------------------------------------------------------
686
Block* File::PrepareBlockRequest(int i, IO *io, void *req_id, bool prefetch)
{
   // Must be called w/ state_cond locked.
   // Checks on size etc should be done before.
   //
   // Reference count is 0 so increase it in calling function if you want to
   // catch the block while still in memory.
   //
   // Returns 0 (nullptr) when the cache RAM budget or the Block allocation
   // fails; the caller is then expected to fall back to direct remote reads.

   const long long off = i * m_block_size;
   const int last_block = m_num_blocks - 1;
   const bool cs_net = cache()->RefConfiguration().is_cschk_net();

   int blk_size, req_size;
   if (i == last_block) {
      blk_size = req_size = m_file_size - off;
      // With network checksumming, pad the request of a short last block up to
      // a 4 kB multiple -- presumably required by page-based pgRead; the block
      // keeps the true blk_size.
      if (cs_net && req_size & 0xFFF) req_size = (req_size & ~0xFFF) + 0x1000;
   } else {
      blk_size = req_size = m_block_size;
   }

   Block *b = 0;
   char *buf = cache()->RequestRAM(req_size);

   if (buf)
   {
      b = new (std::nothrow) Block(this, io, req_id, buf, off, blk_size, req_size, prefetch, cs_net);

      if (b)
      {
         m_block_map[i] = b;

         // Actual Read request is issued in ProcessBlockRequests().

         // Too many blocks in flight -- put prefetching on hold until some drain.
         if (m_prefetch_state == kOn && (int) m_block_map.size() >= m_prefetch_max_blocks_in_flight)
         {
            m_prefetch_state = kHold;
            cache()->DeRegisterPrefetchFile(this);
         }
      }
      else
      {
         TRACEF(Dump, "PrepareBlockRequest() " << i << " prefetch " << prefetch << ", allocation failed.");
      }
   }

   return b;
}
734
735void File::ProcessBlockRequest(Block *b)
736{
737 // This *must not* be called with block_map locked.
738
740
741 if (XRD_TRACE What >= TRACE_Dump) {
742 char buf[256];
743 snprintf(buf, 256, "idx=%lld, block=%p, prefetch=%d, off=%lld, req_size=%d, buff=%p, resp_handler=%p ",
744 b->get_offset()/m_block_size, (void*)b, b->m_prefetch, b->get_offset(), b->get_req_size(), (void*)b->get_buff(), (void*)brh);
745 TRACEF(Dump, "ProcessBlockRequest() " << buf);
746 }
747
748 if (b->req_cksum_net())
749 {
750 b->get_io()->GetInput()->pgRead(*brh, b->get_buff(), b->get_offset(), b->get_req_size(),
751 b->ref_cksum_vec(), 0, b->ptr_n_cksum_errors());
752 } else {
753 b->get_io()->GetInput()-> Read(*brh, b->get_buff(), b->get_offset(), b->get_size());
754 }
755}
756
757void File::ProcessBlockRequests(BlockList_t& blks)
758{
759 // This *must not* be called with block_map locked.
760
761 for (BlockList_i bi = blks.begin(); bi != blks.end(); ++bi)
762 {
763 ProcessBlockRequest(*bi);
764 }
765}
766
767//------------------------------------------------------------------------------
768
769void File::RequestBlocksDirect(IO *io, ReadRequest *read_req, std::vector<XrdOucIOVec>& ioVec, int expected_size)
770{
771 int n_chunks = ioVec.size();
772 int n_vec_reads = (n_chunks - 1) / XrdProto::maxRvecsz + 1;
773
774 TRACEF(DumpXL, "RequestBlocksDirect() issuing ReadV for n_chunks = " << n_chunks <<
775 ", total_size = " << expected_size << ", n_vec_reads = " << n_vec_reads);
776
777 DirectResponseHandler *handler = new DirectResponseHandler(this, read_req, n_vec_reads);
778
779 int pos = 0;
780 while (n_chunks > XrdProto::maxRvecsz) {
781 io->GetInput()->ReadV( *handler, ioVec.data() + pos, XrdProto::maxRvecsz);
782 pos += XrdProto::maxRvecsz;
783 n_chunks -= XrdProto::maxRvecsz;
784 }
785 io->GetInput()->ReadV( *handler, ioVec.data() + pos, n_chunks);
786}
787
788//------------------------------------------------------------------------------
789
790int File::ReadBlocksFromDisk(std::vector<XrdOucIOVec>& ioVec, int expected_size)
791{
792 TRACEF(DumpXL, "ReadBlocksFromDisk() issuing ReadV for n_chunks = " << (int) ioVec.size() << ", total_size = " << expected_size);
793
794 long long rs = m_data_file->ReadV(ioVec.data(), (int) ioVec.size());
795
796 if (rs < 0)
797 {
798 TRACEF(Error, "ReadBlocksFromDisk neg retval = " << rs);
799 return rs;
800 }
801
802 if (rs != expected_size)
803 {
804 TRACEF(Error, "ReadBlocksFromDisk incomplete size = " << rs);
805 return -EIO;
806 }
807
808 return (int) rs;
809}
810
811//------------------------------------------------------------------------------
812
int File::Read(IO *io, char* iUserBuff, long long iUserOff, int iUserSize, ReadReqRH *rh)
{
   // rrc_func is ONLY called from async processing.
   // If this function returns anything other than -EWOULDBLOCK, rrc_func needs to be called by the caller.
   // This streamlines implementation of synchronous IO::Read().

   TRACEF(Dump, "Read() sid: " << Xrd::hex1 << rh->m_seq_id << " size: " << iUserSize);

   m_state_cond.Lock();

   // Reads are refused once the file is being torn down or this IO is detaching.
   if (m_in_shutdown || io->m_in_detach)
   {
      m_state_cond.UnLock();
      return m_in_shutdown ? -ENOENT : -EBADF;
   }

   // Shortcut -- file is fully downloaded.

   if (m_cfi.IsComplete())
   {
      // Serve directly from the data file; the state lock is deliberately
      // released before the (potentially slow) disk read.
      m_state_cond.UnLock();
      int ret = m_data_file->Read(iUserBuff, iUserOff, iUserSize);
      if (ret > 0) {
         // Re-acquire the lock only to account the bytes as a cache hit.
         XrdSysCondVarHelper _lck(m_state_cond);
         m_delta_stats.AddBytesHit(ret);
         check_delta_stats();
      }
      return ret;
   }

   XrdOucIOVec readV( { iUserOff, iUserSize, 0, iUserBuff } );

   // General case: treat as a single-chunk vector read. Note that
   // ReadOpusCoalescere is entered with m_state_cond still held.
   return ReadOpusCoalescere(io, &readV, 1, rh, "Read() ");
}
847
848//------------------------------------------------------------------------------
849
int File::ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
{
   // Vector-read entry point; mirrors Read() above.

   TRACEF(Dump, "ReadV() for " << readVnum << " chunks.");

   m_state_cond.Lock();

   // Reads are refused once the file is being torn down or this IO is detaching.
   if (m_in_shutdown || io->m_in_detach)
   {
      m_state_cond.UnLock();
      return m_in_shutdown ? -ENOENT : -EBADF;
   }

   // Shortcut -- file is fully downloaded.

   if (m_cfi.IsComplete())
   {
      // Serve directly from the data file; the state lock is deliberately
      // released before the (potentially slow) disk read.
      m_state_cond.UnLock();
      int ret = m_data_file->ReadV(const_cast<XrdOucIOVec*>(readV), readVnum);
      if (ret > 0) {
         // Re-acquire the lock only to account the bytes as a cache hit.
         XrdSysCondVarHelper _lck(m_state_cond);
         m_delta_stats.AddBytesHit(ret);
         check_delta_stats();
      }
      return ret;
   }

   // General case; entered with m_state_cond still held.
   return ReadOpusCoalescere(io, readV, readVnum, rh, "ReadV() ");
}
878
879//------------------------------------------------------------------------------
880
881int File::ReadOpusCoalescere(IO *io, const XrdOucIOVec *readV, int readVnum,
882 ReadReqRH *rh, const char *tpfx)
883{
884 // Non-trivial processing for Read and ReadV.
885 // Entered under lock.
886 //
887 // loop over reqired blocks:
888 // - if on disk, ok;
889 // - if in ram or incoming, inc ref-count
890 // - otherwise request and inc ref count (unless RAM full => request direct)
891 // unlock
892
893 int prefetch_cnt = 0;
894
895 ReadRequest *read_req = nullptr;
896 BlockList_t blks_to_request; // blocks we are issuing a new remote request for
897
898 std::unordered_map<Block*, std::vector<ChunkRequest>> blks_ready;
899
900 std::vector<XrdOucIOVec> iovec_disk;
901 std::vector<XrdOucIOVec> iovec_direct;
902 int iovec_disk_total = 0;
903 int iovec_direct_total = 0;
904
905 for (int iov_idx = 0; iov_idx < readVnum; ++iov_idx)
906 {
907 const XrdOucIOVec &iov = readV[iov_idx];
908 long long iUserOff = iov.offset;
909 int iUserSize = iov.size;
910 char *iUserBuff = iov.data;
911
912 const int idx_first = iUserOff / m_block_size;
913 const int idx_last = (iUserOff + iUserSize - 1) / m_block_size;
914
915 TRACEF(DumpXL, tpfx << "sid: " << Xrd::hex1 << rh->m_seq_id << " idx_first: " << idx_first << " idx_last: " << idx_last);
916
917 enum LastBlock_e { LB_other, LB_disk, LB_direct };
918
919 LastBlock_e lbe = LB_other;
920
921 for (int block_idx = idx_first; block_idx <= idx_last; ++block_idx)
922 {
923 TRACEF(DumpXL, tpfx << "sid: " << Xrd::hex1 << rh->m_seq_id << " idx: " << block_idx);
924 BlockMap_i bi = m_block_map.find(block_idx);
925
926 // overlap and read
927 long long off; // offset in user buffer
928 long long blk_off; // offset in block
929 int size; // size to copy
930
931 overlap(block_idx, m_block_size, iUserOff, iUserSize, off, blk_off, size);
932
933 // In RAM or incoming?
934 if (bi != m_block_map.end())
935 {
936 inc_ref_count(bi->second);
937 TRACEF(Dump, tpfx << (void*) iUserBuff << " inc_ref_count for existing block " << bi->second << " idx = " << block_idx);
938
939 if (bi->second->is_finished())
940 {
941 // note, blocks with error should not be here !!!
942 // they should be either removed or reissued in ProcessBlockResponse()
943 assert(bi->second->is_ok());
944
945 blks_ready[bi->second].emplace_back( ChunkRequest(nullptr, iUserBuff + off, blk_off, size) );
946
947 if (bi->second->m_prefetch)
948 ++prefetch_cnt;
949 }
950 else
951 {
952 if ( ! read_req)
953 read_req = new ReadRequest(io, rh);
954
955 // We have a lock on state_cond --> as we register the request before releasing the lock,
956 // we are sure to get a call-in via the ChunkRequest handling when this block arrives.
957
958 bi->second->m_chunk_reqs.emplace_back( ChunkRequest(read_req, iUserBuff + off, blk_off, size) );
959 ++read_req->m_n_chunk_reqs;
960 }
961
962 lbe = LB_other;
963 }
964 // On disk?
965 else if (m_cfi.TestBitWritten(offsetIdx(block_idx)))
966 {
967 TRACEF(DumpXL, tpfx << "read from disk " << (void*)iUserBuff << " idx = " << block_idx);
968
969 if (lbe == LB_disk)
970 iovec_disk.back().size += size;
971 else
972 iovec_disk.push_back( { block_idx * m_block_size + blk_off, size, 0, iUserBuff + off } );
973 iovec_disk_total += size;
974
975 if (m_cfi.TestBitPrefetch(offsetIdx(block_idx)))
976 ++prefetch_cnt;
977
978 lbe = LB_disk;
979 }
980 // Neither ... then we have to go get it ...
981 else
982 {
983 if ( ! read_req)
984 read_req = new ReadRequest(io, rh);
985
986 // Is there room for one more RAM Block?
987 Block *b = PrepareBlockRequest(block_idx, io, read_req, false);
988 if (b)
989 {
990 TRACEF(Dump, tpfx << "inc_ref_count new " << (void*)iUserBuff << " idx = " << block_idx);
991 inc_ref_count(b);
992 blks_to_request.push_back(b);
993
994 b->m_chunk_reqs.emplace_back(ChunkRequest(read_req, iUserBuff + off, blk_off, size));
995 ++read_req->m_n_chunk_reqs;
996
997 lbe = LB_other;
998 }
999 else // Nope ... read this directly without caching.
1000 {
1001 TRACEF(DumpXL, tpfx << "direct block " << block_idx << ", blk_off " << blk_off << ", size " << size);
1002
1003 iovec_direct_total += size;
1004 read_req->m_direct_done = false;
1005
1006 // Make sure we do not issue a ReadV with chunk size above XrdProto::maxRVdsz.
1007 // Number of actual ReadVs issued so as to not exceed the XrdProto::maxRvecsz limit
1008 // is determined in the RequestBlocksDirect().
1009 if (lbe == LB_direct && iovec_direct.back().size + size <= XrdProto::maxRVdsz) {
1010 iovec_direct.back().size += size;
1011 } else {
1012 long long in_offset = block_idx * m_block_size + blk_off;
1013 char *out_pos = iUserBuff + off;
1014 while (size > XrdProto::maxRVdsz) {
1015 iovec_direct.push_back( { in_offset, XrdProto::maxRVdsz, 0, out_pos } );
1016 in_offset += XrdProto::maxRVdsz;
1017 out_pos += XrdProto::maxRVdsz;
1018 size -= XrdProto::maxRVdsz;
1019 }
1020 iovec_direct.push_back( { in_offset, size, 0, out_pos } );
1021 }
1022
1023 lbe = LB_direct;
1024 }
1025 }
1026 } // end for over blocks in an IOVec
1027 } // end for over readV IOVec
1028
1029 inc_prefetch_hit_cnt(prefetch_cnt);
1030
1031 m_state_cond.UnLock();
1032
1033 // First, send out remote requests for new blocks.
1034 if ( ! blks_to_request.empty())
1035 {
1036 ProcessBlockRequests(blks_to_request);
1037 blks_to_request.clear();
1038 }
1039
1040 // Second, send out remote direct read requests.
1041 if ( ! iovec_direct.empty())
1042 {
1043 RequestBlocksDirect(io, read_req, iovec_direct, iovec_direct_total);
1044
1045 TRACEF(Dump, tpfx << "direct read requests sent out, n_chunks = " << (int) iovec_direct.size() << ", total_size = " << iovec_direct_total);
1046 }
1047
1048 // Begin synchronous part where we process data that is already in RAM or on disk.
1049
1050 long long bytes_read = 0;
1051 int error_cond = 0; // to be set to -errno
1052
1053 // Third, process blocks that are available in RAM.
1054 if ( ! blks_ready.empty())
1055 {
1056 for (auto &bvi : blks_ready)
1057 {
1058 for (auto &cr : bvi.second)
1059 {
1060 TRACEF(DumpXL, tpfx << "ub=" << (void*)cr.m_buf << " from pre-finished block " << bvi.first->m_offset/m_block_size << " size " << cr.m_size);
1061 memcpy(cr.m_buf, bvi.first->m_buff + cr.m_off, cr.m_size);
1062 bytes_read += cr.m_size;
1063 }
1064 }
1065 }
1066
1067 // Fourth, read blocks from disk.
1068 if ( ! iovec_disk.empty())
1069 {
1070 int rc = ReadBlocksFromDisk(iovec_disk, iovec_disk_total);
1071 TRACEF(DumpXL, tpfx << "from disk finished size = " << rc);
1072 if (rc >= 0)
1073 {
1074 bytes_read += rc;
1075 }
1076 else
1077 {
1078 error_cond = rc;
1079 TRACEF(Error, tpfx << "failed read from disk");
1080 }
1081 }
1082
1083 // End synchronous part -- update with sync stats and determine actual state of this read.
1084 // Note: remote reads might have already finished during disk-read!
1085
1086 m_state_cond.Lock();
1087
1088 for (auto &bvi : blks_ready)
1089 dec_ref_count(bvi.first, (int) bvi.second.size());
1090
1091 if (read_req)
1092 {
1093 read_req->m_bytes_read += bytes_read;
1094 if (error_cond)
1095 read_req->update_error_cond(error_cond);
1096 read_req->m_stats.m_BytesHit += bytes_read;
1097 read_req->m_sync_done = true;
1098
1099 if (read_req->is_complete())
1100 {
1101 // Almost like FinalizeReadRequest(read_req) -- but no callout!
1102 m_delta_stats.AddReadStats(read_req->m_stats);
1103 check_delta_stats();
1104 m_state_cond.UnLock();
1105
1106 int ret = read_req->return_value();
1107 delete read_req;
1108 return ret;
1109 }
1110 else
1111 {
1112 m_state_cond.UnLock();
1113 return -EWOULDBLOCK;
1114 }
1115 }
1116 else
1117 {
1118 m_delta_stats.m_BytesHit += bytes_read;
1119 check_delta_stats();
1120 m_state_cond.UnLock();
1121
1122 // !!! No callout.
1123
1124 return error_cond ? error_cond : bytes_read;
1125 }
1126}
1127
1128
1129//==============================================================================
1130// WriteBlock and Sync
1131//==============================================================================
1132
1134{
1135 // write block buffer into disk file
1136 long long offset = b->m_offset - m_offset;
1137 long long size = b->get_size();
1138 ssize_t retval;
1139
1140 if (m_cfi.IsCkSumCache())
1141 if (b->has_cksums())
1142 retval = m_data_file->pgWrite(b->get_buff(), offset, size, b->ref_cksum_vec().data(), 0);
1143 else
1144 retval = m_data_file->pgWrite(b->get_buff(), offset, size, 0, 0);
1145 else
1146 retval = m_data_file->Write(b->get_buff(), offset, size);
1147
1148 if (retval < size)
1149 {
1150 if (retval < 0) {
1151 TRACEF(Error, "WriteToDisk() write error " << retval);
1152 } else {
1153 TRACEF(Error, "WriteToDisk() incomplete block write ret=" << retval << " (should be " << size << ")");
1154 }
1155
1156 XrdSysCondVarHelper _lck(m_state_cond);
1157
1158 dec_ref_count(b);
1159
1160 return;
1161 }
1162
1163 const int blk_idx = (b->m_offset - m_offset) / m_block_size;
1164
1165 // Set written bit.
1166 TRACEF(Dump, "WriteToDisk() success set bit for block " << b->m_offset << " size=" << size);
1167
1168 bool schedule_sync = false;
1169 {
1170 XrdSysCondVarHelper _lck(m_state_cond);
1171
1172 m_cfi.SetBitWritten(blk_idx);
1173
1174 if (b->m_prefetch)
1175 {
1176 m_cfi.SetBitPrefetch(blk_idx);
1177 }
1178 if (b->req_cksum_net() && ! b->has_cksums() && m_cfi.IsCkSumNet())
1179 {
1180 m_cfi.ResetCkSumNet();
1181 }
1182
1183 // Set synced bit or stash block index if in actual sync.
1184 // Synced state is only written out to cinfo file when data file is synced.
1185 if (m_in_sync)
1186 {
1187 m_writes_during_sync.push_back(blk_idx);
1188 }
1189 else
1190 {
1191 m_cfi.SetBitSynced(blk_idx);
1192 ++m_non_flushed_cnt;
1193 if ((m_cfi.IsComplete() || m_non_flushed_cnt >= Cache::GetInstance().RefConfiguration().m_flushCnt) &&
1194 ! m_in_shutdown)
1195 {
1196 schedule_sync = true;
1197 m_in_sync = true;
1198 m_non_flushed_cnt = 0;
1199 }
1200 }
1201 // As soon as the reference count is decreased on the block, the
1202 // file object may be deleted. Thus, to avoid holding both locks at a time,
1203 // we defer the ref count decrease until later if a sync is needed
1204 if (!schedule_sync) {
1205 dec_ref_count(b);
1206 }
1207 }
1208
1209 if (schedule_sync)
1210 {
1211 cache()->ScheduleFileSync(this);
1212 XrdSysCondVarHelper _lck(m_state_cond);
1213 dec_ref_count(b);
1214 }
1215}
1216
1217//------------------------------------------------------------------------------
1218
1220{
1221 TRACEF(Dump, "Sync()");
1222
1223 int ret = m_data_file->Fsync();
1224 bool errorp = false;
1225 if (ret == XrdOssOK)
1226 {
1227 Stats loc_stats;
1228 {
1229 XrdSysCondVarHelper _lck(&m_state_cond);
1230 report_and_merge_delta_stats();
1231 loc_stats = m_stats;
1232 }
1233 m_cfi.WriteIOStat(loc_stats);
1234 m_cfi.Write(m_info_file, m_filename.c_str());
1235 int cret = m_info_file->Fsync();
1236 if (cret != XrdOssOK)
1237 {
1238 TRACEF(Error, "Sync cinfo file sync error " << cret);
1239 errorp = true;
1240 }
1241 }
1242 else
1243 {
1244 TRACEF(Error, "Sync data file sync error " << ret << ", cinfo file has not been updated");
1245 errorp = true;
1246 }
1247
1248 if (errorp)
1249 {
1250 TRACEF(Error, "Sync failed, unlinking local files and initiating shutdown of File object");
1251
1252 // Unlink will also call this->initiate_emergency_shutdown()
1253 Cache::GetInstance().UnlinkFile(m_filename, false);
1254
1255 XrdSysCondVarHelper _lck(&m_state_cond);
1256
1257 m_writes_during_sync.clear();
1258 m_in_sync = false;
1259
1260 return;
1261 }
1262
1263 int written_while_in_sync;
1264 bool resync = false;
1265 {
1266 XrdSysCondVarHelper _lck(&m_state_cond);
1267 for (std::vector<int>::iterator i = m_writes_during_sync.begin(); i != m_writes_during_sync.end(); ++i)
1268 {
1269 m_cfi.SetBitSynced(*i);
1270 }
1271 written_while_in_sync = m_non_flushed_cnt = (int) m_writes_during_sync.size();
1272 m_writes_during_sync.clear();
1273
1274 // If there were writes during sync and the file is now complete,
1275 // let us call Sync again without resetting the m_in_sync flag.
1276 if (written_while_in_sync > 0 && m_cfi.IsComplete() && ! m_in_shutdown)
1277 resync = true;
1278 else
1279 m_in_sync = false;
1280 }
1281 TRACEF(Dump, "Sync "<< written_while_in_sync << " blocks written during sync." << (resync ? " File is now complete - resyncing." : ""));
1282
1283 if (resync)
1284 Sync();
1285}
1286
1287
1288//==============================================================================
1289// Block processing
1290//==============================================================================
1291
1292void File::free_block(Block* b)
1293{
1294 // Method always called under lock.
1295 int i = b->m_offset / m_block_size;
1296 TRACEF(Dump, "free_block block " << b << " idx = " << i);
1297 size_t ret = m_block_map.erase(i);
1298 if (ret != 1)
1299 {
1300 // assert might be a better option than a warning
1301 TRACEF(Error, "free_block did not erase " << i << " from map");
1302 }
1303 else
1304 {
1305 cache()->ReleaseRAM(b->m_buff, b->m_req_size);
1306 delete b;
1307 }
1308
1309 if (m_prefetch_state == kHold && (int) m_block_map.size() < m_prefetch_max_blocks_in_flight)
1310 {
1311 m_prefetch_state = kOn;
1312 cache()->RegisterPrefetchFile(this);
1313 }
1314}
1315
1316//------------------------------------------------------------------------------
1317
1318bool File::select_current_io_or_disable_prefetching(bool skip_current)
1319{
1320 // Method always called under lock. It also expects prefetch to be active.
1321
1322 int io_size = (int) m_io_set.size();
1323 bool io_ok = false;
1324
1325 if (io_size == 1)
1326 {
1327 io_ok = (*m_io_set.begin())->m_allow_prefetching;
1328 if (io_ok)
1329 {
1330 m_current_io = m_io_set.begin();
1331 }
1332 }
1333 else if (io_size > 1)
1334 {
1335 IoSet_i mi = m_current_io;
1336 if (skip_current && mi != m_io_set.end()) ++mi;
1337
1338 for (int i = 0; i < io_size; ++i)
1339 {
1340 if (mi == m_io_set.end()) mi = m_io_set.begin();
1341
1342 if ((*mi)->m_allow_prefetching)
1343 {
1344 m_current_io = mi;
1345 io_ok = true;
1346 break;
1347 }
1348 ++mi;
1349 }
1350 }
1351
1352 if ( ! io_ok)
1353 {
1354 m_current_io = m_io_set.end();
1355 m_prefetch_state = kStopped;
1356 cache()->DeRegisterPrefetchFile(this);
1357 }
1358
1359 return io_ok;
1360}
1361
1362//------------------------------------------------------------------------------
1363
1364void File::ProcessDirectReadFinished(ReadRequest *rreq, int bytes_read, int error_cond)
1365{
1366 // Called from DirectResponseHandler.
1367 // NOT under lock.
1368
1369 if (error_cond)
1370 TRACEF(Error, "Read(), direct read finished with error " << -error_cond << " " << XrdSysE2T(-error_cond));
1371
1372 m_state_cond.Lock();
1373
1374 if (error_cond)
1375 rreq->update_error_cond(error_cond);
1376 else {
1377 rreq->m_stats.m_BytesBypassed += bytes_read;
1378 rreq->m_bytes_read += bytes_read;
1379 }
1380
1381 rreq->m_direct_done = true;
1382
1383 bool rreq_complete = rreq->is_complete();
1384
1385 m_state_cond.UnLock();
1386
1387 if (rreq_complete)
1388 FinalizeReadRequest(rreq);
1389}
1390
1391void File::ProcessBlockError(Block *b, ReadRequest *rreq)
1392{
1393 // Called from ProcessBlockResponse().
1394 // YES under lock -- we have to protect m_block_map for recovery through multiple IOs.
1395 // Does not manage m_read_req.
1396 // Will not complete the request.
1397
1398 TRACEF(Debug, "ProcessBlockError() io " << b->m_io << ", block "<< b->m_offset/m_block_size <<
1399 " finished with error " << -b->get_error() << " " << XrdSysE2T(-b->get_error()));
1400
1401 rreq->update_error_cond(b->get_error());
1402 --rreq->m_n_chunk_reqs;
1403
1404 dec_ref_count(b);
1405}
1406
void File::ProcessBlockSuccess(Block *b, ChunkRequest &creq)
{
   // Copy the requested chunk out of a successfully downloaded block and
   // update the owning read request's accounting; finalize the request if
   // this was its last outstanding piece.
   // Called from ProcessBlockResponse().
   // NOT under lock, as it does a memcpy of the finished block's data.
   // Acquires lock for block, m_read_req and rreq state update.

   ReadRequest *rreq = creq.m_read_req;

   TRACEF(Dump, "ProcessBlockSuccess() ub=" << (void*)creq.m_buf << " from finished block " << b->m_offset/m_block_size << " size " << creq.m_size);
   memcpy(creq.m_buf, b->m_buff + creq.m_off, creq.m_size);

   m_state_cond.Lock();

   rreq->m_bytes_read += creq.m_size;

   // Bytes count as "missed" when this very request initiated the block
   // download, as "hit" when the block was fetched on behalf of another.
   if (b->get_req_id() == (void*) rreq)
      rreq->m_stats.m_BytesMissed += creq.m_size;
   else
      rreq->m_stats.m_BytesHit += creq.m_size;

   --rreq->m_n_chunk_reqs;

   if (b->m_prefetch)
      inc_prefetch_hit_cnt(1);

   dec_ref_count(b);

   bool rreq_complete = rreq->is_complete();

   m_state_cond.UnLock();

   // Callout to the request handler happens outside of the lock.
   if (rreq_complete)
      FinalizeReadRequest(rreq);
}
1441
1442void File::FinalizeReadRequest(ReadRequest *rreq)
1443{
1444 // called from ProcessBlockResponse()
1445 // NOT under lock -- does callout
1446 {
1447 XrdSysCondVarHelper _lck(m_state_cond);
1448 m_delta_stats.AddReadStats(rreq->m_stats);
1449 check_delta_stats();
1450 }
1451
1452 rreq->m_rh->Done(rreq->return_value());
1453 delete rreq;
1454}
1455
void File::ProcessBlockResponse(Block *b, int res)
{
   // Central completion handler for a block download: res is the number of
   // bytes received, or a negative errno on failure.
   // On success the block is marked downloaded, queued for writing to disk,
   // and all subscribed chunk requests are served. On failure, chunk requests
   // from the failing IO are errored out while requests from other IOs may
   // trigger a single reissue of the block on their IO.
   static const char* tpfx = "ProcessBlockResponse ";

   TRACEF(Dump, tpfx << "block=" << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset << ", res=" << res);

   if (res >= 0 && res != b->get_size())
   {
      // Incorrect number of bytes received, apparently size of the file on the remote
      // is different than what the cache expects it to be.
      TRACEF(Error, tpfx << "Wrong number of bytes received, assuming remote/local file size mismatch, unlinking local files and initiating shutdown of File object");
      Cache::GetInstance().UnlinkFile(m_filename, false);
   }

   m_state_cond.Lock();

   // Deregister block from IO's prefetch count, if needed.
   if (b->m_prefetch)
   {
      IO *io = b->get_io();
      IoSet_i mi = m_io_set.find(io);
      if (mi != m_io_set.end())
      {
         --io->m_active_prefetches;

         // If failed and IO is still prefetching -- disable prefetching on this IO.
         if (res < 0 && io->m_allow_prefetching)
         {
            TRACEF(Debug, tpfx << "after failed prefetch on io " << io << " disabling prefetching on this io.");
            io->m_allow_prefetching = false;

            // Check if any IO is still available for prefetching. If not, stop it.
            if (m_prefetch_state == kOn || m_prefetch_state == kHold)
            {
               if ( ! select_current_io_or_disable_prefetching(false) )
               {
                  TRACEF(Debug, tpfx << "stopping prefetching after io " << b->get_io() << " marked as bad.");
               }
            }
         }

         // If failed with no subscribers -- delete the block and exit.
         if (b->m_refcnt == 0 && (res < 0 || m_in_shutdown))
         {
            free_block(b);
            m_state_cond.UnLock();
            return;
         }
         m_prefetch_bytes += b->get_size();
      }
      else
      {
         TRACEF(Error, tpfx << "io " << b->get_io() << " not found in IoSet.");
      }
   }

   if (res == b->get_size())
   {
      // Complete block received.
      b->set_downloaded();
      TRACEF(Dump, tpfx << "inc_ref_count idx=" << b->m_offset/m_block_size);
      if ( ! m_in_shutdown)
      {
         // Increase ref-count for the writer.
         inc_ref_count(b);
         m_delta_stats.AddWriteStats(b->get_size(), b->get_n_cksum_errors());
         // No check for writes, report-and-merge forced during Sync().
         cache()->AddWriteTask(b, true);
      }

      // Swap chunk-reqs vector out of Block, it will be processed outside of lock.
      vChunkRequest_t creqs_to_notify;
      creqs_to_notify.swap( b->m_chunk_reqs );

      m_state_cond.UnLock();

      for (auto &creq : creqs_to_notify)
      {
         ProcessBlockSuccess(b, creq);
      }
   }
   else
   {
      // Failure path: either a negative errno or a short read (the latter is
      // converted to EIO / EREMOTEIO below).
      if (res < 0) {
         bool new_error = b->get_io()->register_block_error(res);
         int tlvl = new_error ? TRACE_Error : TRACE_Debug;
         TRACEF_INT(tlvl, tpfx << "block " << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset
                    << ", io=" << b->get_io() << ", error=" << res);
      } else {
         bool first_p = b->get_io()->register_incomplete_read();
         int tlvl = first_p ? TRACE_Error : TRACE_Debug;
         TRACEF_INT(tlvl, tpfx << "block " << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset
                    << ", io=" << b->get_io() << " incomplete, got " << res << " expected " << b->get_size());
#if defined(__APPLE__) || defined(__GNU__) || (defined(__FreeBSD_kernel__) && defined(__GLIBC__)) || defined(__FreeBSD__)
         res = -EIO;
#else
         res = -EREMOTEIO;
#endif
      }
      b->set_error(res);

      // Loop over Block's chunk-reqs vector, error out ones with the same IO.
      // Collect others with a different IO, the first of them will be used to reissue the request.
      // This is then done outside of lock.
      std::list<ReadRequest*> rreqs_to_complete;
      vChunkRequest_t creqs_to_keep;

      for(ChunkRequest &creq : b->m_chunk_reqs)
      {
         ReadRequest *rreq = creq.m_read_req;

         if (rreq->m_io == b->get_io())
         {
            ProcessBlockError(b, rreq);
            if (rreq->is_complete())
            {
               rreqs_to_complete.push_back(rreq);
            }
         }
         else
         {
            creqs_to_keep.push_back(creq);
         }
      }

      bool reissue = false;
      if ( ! creqs_to_keep.empty())
      {
         // Requests from a different IO remain -- retry the block on the
         // first such request's IO.
         ReadRequest *rreq = creqs_to_keep.front().m_read_req;

         TRACEF(Debug, "ProcessBlockResponse() requested block " << (void*)b << " failed with another io " <<
                b->get_io() << " - reissuing request with my io " << rreq->m_io);

         b->reset_error_and_set_io(rreq->m_io, rreq);
         b->m_chunk_reqs.swap( creqs_to_keep );
         reissue = true;
      }

      m_state_cond.UnLock();

      // Callouts and the reissue happen outside of the lock.
      for (auto rreq : rreqs_to_complete)
         FinalizeReadRequest(rreq);

      if (reissue)
         ProcessBlockRequest(b);
   }
}
1602
1603//------------------------------------------------------------------------------
1604
const char* File::lPath() const
{
   // Local cache path of this file, used for logging/tracing.
   return m_filename.c_str();
}
1609
1610//------------------------------------------------------------------------------
1611
1612int File::offsetIdx(int iIdx) const
1613{
1614 return iIdx - m_offset/m_block_size;
1615}
1616
1617
1618//------------------------------------------------------------------------------
1619
1621{
1622 // Check that block is not on disk and not in RAM.
1623 // TODO: Could prefetch several blocks at once!
1624 // blks_max could be an argument
1625
1626 BlockList_t blks;
1627
1628 TRACEF(DumpXL, "Prefetch() entering.");
1629 {
1630 XrdSysCondVarHelper _lck(m_state_cond);
1631
1632 if (m_prefetch_state != kOn)
1633 {
1634 return;
1635 }
1636
1637 if ( ! select_current_io_or_disable_prefetching(true) )
1638 {
1639 TRACEF(Error, "Prefetch no available IO object found, prefetching stopped. This should not happen, i.e., prefetching should be stopped before.");
1640 return;
1641 }
1642
1643 // Select block(s) to fetch.
1644 for (int f = 0; f < m_num_blocks; ++f)
1645 {
1646 if ( ! m_cfi.TestBitWritten(f))
1647 {
1648 int f_act = f + m_offset / m_block_size;
1649
1650 BlockMap_i bi = m_block_map.find(f_act);
1651 if (bi == m_block_map.end())
1652 {
1653 Block *b = PrepareBlockRequest(f_act, *m_current_io, nullptr, true);
1654 if (b)
1655 {
1656 TRACEF(Dump, "Prefetch take block " << f_act);
1657 blks.push_back(b);
1658 // Note: block ref_cnt not increased, it will be when placed into write queue.
1659
1660 inc_prefetch_read_cnt(1);
1661 }
1662 else
1663 {
1664 // This shouldn't happen as prefetching stops when RAM is 70% full.
1665 TRACEF(Warning, "Prefetch allocation failed for block " << f_act);
1666 }
1667 break;
1668 }
1669 }
1670 }
1671
1672 if (blks.empty())
1673 {
1674 TRACEF(Debug, "Prefetch file is complete, stopping prefetch.");
1675 m_prefetch_state = kComplete;
1676 cache()->DeRegisterPrefetchFile(this);
1677 }
1678 else
1679 {
1680 (*m_current_io)->m_active_prefetches += (int) blks.size();
1681 }
1682 }
1683
1684 if ( ! blks.empty())
1685 {
1686 ProcessBlockRequests(blks);
1687 }
1688}
1689
1690
1691//------------------------------------------------------------------------------
1692
1694{
1695 return m_prefetch_score;
1696}
1697
1699{
1700 return Cache::TheOne().GetLog();
1701}
1702
1704{
1705 return Cache::TheOne().GetTrace();
1706}
1707
1708void File::insert_remote_location(const std::string &loc)
1709{
1710 if ( ! loc.empty())
1711 {
1712 size_t p = loc.find_first_of('@');
1713 m_remote_locations.insert(&loc[p != std::string::npos ? p + 1 : 0]);
1714 }
1715}
1716
1717std::string File::GetRemoteLocations() const
1718{
1719 std::string s;
1720 if ( ! m_remote_locations.empty())
1721 {
1722 size_t sl = 0;
1723 int nl = 0;
1724 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++nl)
1725 {
1726 sl += i->size();
1727 }
1728 s.reserve(2 + sl + 2*nl + nl - 1 + 1);
1729 s = '[';
1730 int j = 1;
1731 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++j)
1732 {
1733 s += '"'; s += *i; s += '"';
1734 if (j < nl) s += ',';
1735 }
1736 s += ']';
1737 }
1738 else
1739 {
1740 s = "[]";
1741 }
1742 return s;
1743}
1744
1745//==============================================================================
1746//======================= RESPONSE HANDLERS ==============================
1747//==============================================================================
1748
1750{
1751 m_block->m_file->ProcessBlockResponse(m_block, res);
1752 delete this;
1753}
1754
1755//------------------------------------------------------------------------------
1756
1758{
1759 m_mutex.Lock();
1760
1761 int n_left = --m_to_wait;
1762
1763 if (res < 0) {
1764 if (m_errno == 0) m_errno = res; // store first reported error
1765 } else {
1766 m_bytes_read += res;
1767 }
1768
1769 m_mutex.UnLock();
1770
1771 if (n_left == 0)
1772 {
1773 m_file->ProcessDirectReadFinished(m_read_req, m_bytes_read, m_errno);
1774 delete this;
1775 }
1776}
#define TRACE_Debug
#define XrdOssOK
Definition XrdOss.hh:50
#define XRDOSS_mkpath
Definition XrdOss.hh:466
#define TRACE_Error
Definition XrdPfcTrace.hh:7
#define TRACE_Dump
#define TRACEF(act, x)
#define ERRNO_AND_ERRSTR(err_code)
#define TRACEF_INT(act, x)
#define stat(a, b)
Definition XrdPosix.hh:101
#define XRD_TRACE
bool Debug
XrdOucString File
const char * XrdSysE2T(int errcode)
Definition XrdSysE2T.cc:104
#define TRACE(act, x)
Definition XrdTrace.hh:63
virtual int Fstat(struct stat *buf)
Definition XrdOss.hh:136
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition XrdOss.hh:200
virtual int Create(const char *tid, const char *path, mode_t mode, XrdOucEnv &env, int opts=0)=0
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual void Done(int result)=0
virtual const char * Path()=0
virtual int pgRead(char *buff, long long offs, int rdlen, std::vector< uint32_t > &csvec, uint64_t opts=0, int *csfix=0)
virtual int ReadV(const XrdOucIOVec *readV, int rnum)
void Put(const char *varname, const char *value)
Definition XrdOucEnv.hh:85
void Done(int result) override
int get_size() const
int get_error() const
int get_n_cksum_errors()
int * ptr_n_cksum_errors()
IO * get_io() const
vCkSum_t & ref_cksum_vec()
long long get_offset() const
vChunkRequest_t m_chunk_reqs
void set_error(int err)
void * get_req_id() const
void set_downloaded()
bool req_cksum_net() const
char * get_buff() const
bool has_cksums() const
long long m_offset
void reset_error_and_set_io(IO *io, void *rid)
int get_req_size() const
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
Definition XrdPfc.hh:163
XrdSysError * GetLog() const
Definition XrdPfc.hh:294
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
Definition XrdPfc.hh:215
static ResourceMonitor & ResMon()
Definition XrdPfc.cc:135
static Cache & GetInstance()
Singleton access.
Definition XrdPfc.cc:132
XrdSysTrace * GetTrace() const
Definition XrdPfc.hh:295
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
Definition XrdPfc.cc:1188
static const Cache & TheOne()
Definition XrdPfc.cc:133
XrdOss * GetOss() const
Definition XrdPfc.hh:280
void Done(int result) override
bool FinalizeSyncBeforeExit()
Returns true if any of blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
const char * lPath() const
Log path.
int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
Vector read.
void WriteBlockToDisk(Block *b)
float GetPrefetchScore() const
friend class BlockResponseHandler
std::string GetRemoteLocations() const
int Fstat(struct stat &sbuff)
void AddIO(IO *io)
static File * FileOpen(const std::string &path, long long offset, long long fileSize, XrdOucCacheIO *inputIO)
Static constructor that also does Open. Returns null ptr if Open fails.
void RequestSyncOfDetachStats()
Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.
void BlocksRemovedFromWriteQ(std::list< Block * > &)
Handle removal of a set of blocks from Cache's write queue.
friend class DirectResponseHandler
void Sync()
Sync file cache inf o and output data with disk.
XrdSysTrace * GetTrace() const
int Read(IO *io, char *buff, long long offset, int size, ReadReqRH *rh)
Normal read.
void ioUpdated(IO *io)
Notification from IO that it has been updated (remote open).
long long initiate_emergency_shutdown()
void RemoveIO(IO *io)
void BlockRemovedFromWriteQ(Block *)
Handle removal of a block from Cache's write queue.
XrdSysError * GetLog() const
bool ioActive(IO *io)
Initiate close. Return true if still IO active. Used in XrdPosixXrootd::Close()
Base cache-io class that implements some XrdOucCacheIO abstract methods.
Definition XrdPfcIO.hh:16
bool register_incomplete_read()
Definition XrdPfcIO.hh:90
XrdOucCacheIO * GetInput()
Definition XrdPfcIO.cc:31
bool register_block_error(int res)
Definition XrdPfcIO.hh:93
RAtomic_int m_active_read_reqs
number of active read requests
Definition XrdPfcIO.hh:70
const char * GetLocation()
Definition XrdPfcIO.hh:44
Status of cached file. Can be read from and written into a binary file.
Definition XrdPfcInfo.hh:41
static const char * s_infoExtension
void CrossCheckIfScanIsInProgress(const std::string &lfn, XrdSysCondVar &cond)
int register_file_open(const std::string &filename, time_t open_timestamp, bool existing_file)
void register_file_purge(DirState *target, long long size_in_st_blocks)
void register_file_update_stats(int token_id, const Stats &stats)
void register_file_close(int token_id, time_t close_timestamp, const Stats &full_stats)
Statistics of cache utilisation by a File object.
long long m_StBlocksAdded
number of 512-byte blocks the file has grown by
long long m_BytesBypassed
number of bytes served directly through XrdCl
void AddUp(const Stats &s)
long long BytesReadAndWritten() const
long long m_BytesHit
number of bytes served from disk
std::list< Block * > BlockList_t
std::vector< ChunkRequest > vChunkRequest_t
std::list< Block * >::iterator BlockList_i
XrdPosixStats Stats
static const int maxRVdsz
Definition XProtocol.hh:688
static const int maxRvecsz
Definition XProtocol.hh:686
long long offset
ReadRequest * m_read_req
Definition XrdPfcFile.hh:91
Contains parameters configurable from the xrootd config file.
Definition XrdPfc.hh:64
long long m_cgi_max_bufferSize
max buffer size allowed in pfc.blocksize
Definition XrdPfc.hh:115
int m_cgi_min_prefetch_max_blocks
min prefetch block count allowed in pfc.prefetch
Definition XrdPfc.hh:116
bool does_cschk_have_missing_bits(CkSumCheck_e cks_on_file) const
Definition XrdPfc.hh:80
bool m_cgi_prefetch_allowed
allow cgi setting of prefetch
Definition XrdPfc.hh:119
CkSumCheck_e get_cs_Chk() const
Definition XrdPfc.hh:73
int m_prefetch_max_blocks
default maximum number of blocks to prefetch per file
Definition XrdPfc.hh:112
bool should_uvkeep_purge(time_t delta) const
Definition XrdPfc.hh:82
std::string m_data_space
oss space for data files
Definition XrdPfc.hh:88
long long m_bufferSize
cache block size, default 128 kB
Definition XrdPfc.hh:107
long long m_cgi_min_bufferSize
min buffer size allowed in pfc.blocksize
Definition XrdPfc.hh:114
std::string m_meta_space
oss space for metadata files (cinfo)
Definition XrdPfc.hh:89
int m_cgi_max_prefetch_max_blocks
max prefetch block count allowed in pfc.prefetch
Definition XrdPfc.hh:117
std::string m_username
username passed to oss plugin
Definition XrdPfc.hh:87
bool m_cgi_blocksize_allowed
allow cgi setting of blocksize
Definition XrdPfc.hh:118
unsigned short m_seq_id
Definition XrdPfcFile.hh:53
void update_error_cond(int ec)
Definition XrdPfcFile.hh:81
bool is_complete() const
Definition XrdPfcFile.hh:83
int return_value() const
Definition XrdPfcFile.hh:84
long long m_bytes_read
Definition XrdPfcFile.hh:68