45const int BLOCK_WRITE_MAX_ATTEMPTS = 4;
51const char *File::m_traceID =
"File";
55File::File(
const std::string& path,
long long iOffset,
long long iFileSize) :
59 m_cfi(
Cache::TheOne().GetTrace(),
Cache::TheOne().is_prefetch_enabled()),
62 m_file_size(iFileSize),
63 m_current_io(m_io_set.end()),
67 m_detach_time_logged(false),
73 m_prefetch_state(kOff),
75 m_prefetch_read_cnt(0),
76 m_prefetch_hit_cnt(0),
103 m_info_file->Close();
105 m_info_file =
nullptr;
111 m_data_file->Close();
113 m_data_file =
nullptr;
116 if (m_resmon_token >= 0)
121 if (m_stats.m_BytesWritten > 0 && ! m_in_shutdown) {
124 if (sr == 0 && s.st_blocks != m_st_blocks) {
127 m_st_blocks = s.st_blocks;
135 TRACEF(
Debug,
"Close() finished, prefetch score = " << m_prefetch_score);
142 File *file =
new File(path, offset, fileSize);
143 if ( ! file->Open(inputIO))
168 m_in_shutdown =
true;
170 if (m_prefetch_state != kStopped && m_prefetch_state != kComplete)
172 m_prefetch_state = kStopped;
173 cache()->DeRegisterPrefetchFile(
this);
176 report_and_merge_delta_stats();
183void File::check_delta_stats()
188 report_and_merge_delta_stats();
191void File::report_and_merge_delta_stats()
195 m_data_file->
Fstat(&s);
198 long long max_st_blocks_to_report = (m_file_size & 0xfff) ? ((m_file_size >> 12) + 1) << 3
200 long long st_blocks_to_report = std::min((
long long) s.st_blocks, max_st_blocks_to_report);
202 m_st_blocks = st_blocks_to_report;
204 m_stats.
AddUp(m_delta_stats);
205 m_delta_stats.
Reset();
212 TRACEF(Dump,
"BlockRemovedFromWriteQ() block = " << (
void*) b <<
" idx= " << b->
m_offset/m_block_size);
220 TRACEF(Dump,
"BlocksRemovedFromWriteQ() n_blocks = " << blocks.size());
224 for (std::list<Block*>::iterator i = blocks.begin(); i != blocks.end(); ++i)
236 insert_remote_location(loc);
252 IoSet_i mi = m_io_set.find(io);
254 if (mi != m_io_set.end())
259 ", active_reads " << n_active_reads <<
260 ", active_prefetches " << io->m_active_prefetches <<
261 ", allow_prefetching " << io->m_allow_prefetching <<
262 ", ios_in_detach " << m_ios_in_detach);
264 "\tio_map.size() " << m_io_set.size() <<
265 ", block_map.size() " << m_block_map.size() <<
", file");
267 insert_remote_location(loc);
269 io->m_allow_prefetching =
false;
270 io->m_in_detach =
true;
273 if (m_prefetch_state == kOn || m_prefetch_state == kHold)
275 if ( ! select_current_io_or_disable_prefetching(
false) )
277 TRACEF(
Debug,
"ioActive stopping prefetching after io " << io <<
" retreat.");
284 bool io_active_result;
286 if (n_active_reads > 0)
288 io_active_result =
true;
290 else if (m_io_set.size() - m_ios_in_detach == 1)
292 io_active_result = ! m_block_map.empty();
296 io_active_result = io->m_active_prefetches > 0;
299 if ( ! io_active_result)
304 TRACEF(
Info,
"ioActive for io " << io <<
" returning " << io_active_result <<
", file");
306 return io_active_result;
310 TRACEF(
Error,
"ioActive io " << io <<
" not found in IoSet. This should not happen.");
321 m_detach_time_logged =
false;
330 if ( ! m_in_shutdown)
332 if ( ! m_writes_during_sync.empty() || m_non_flushed_cnt > 0 || ! m_detach_time_logged)
334 report_and_merge_delta_stats();
335 m_cfi.WriteIOStatDetach(m_stats);
336 m_detach_time_logged =
true;
338 TRACEF(
Debug,
"FinalizeSyncBeforeExit requesting sync to write detach stats");
342 TRACEF(
Debug,
"FinalizeSyncBeforeExit sync not required");
354 time_t now = time(0);
359 IoSet_i mi = m_io_set.find(io);
361 if (mi == m_io_set.end())
364 io->m_attach_time = now;
365 m_delta_stats.IoAttach();
367 insert_remote_location(loc);
369 if (m_prefetch_state == kStopped)
371 m_prefetch_state = kOn;
372 cache()->RegisterPrefetchFile(
this);
377 TRACEF(
Error,
"AddIO() io = " << (
void*)io <<
" already registered.");
380 m_state_cond.UnLock();
391 time_t now = time(0);
395 IoSet_i mi = m_io_set.find(io);
397 if (mi != m_io_set.end())
399 if (mi == m_current_io)
404 m_delta_stats.IoDetach(now - io->m_attach_time);
408 if (m_io_set.empty() && m_prefetch_state != kStopped && m_prefetch_state != kComplete)
410 TRACEF(
Error,
"RemoveIO() io = " << (
void*)io <<
" Prefetching is not stopped/complete -- it should be by now.");
411 m_prefetch_state = kStopped;
412 cache()->DeRegisterPrefetchFile(
this);
417 TRACEF(
Error,
"RemoveIO() io = " << (
void*)io <<
" is NOT registered.");
420 m_state_cond.UnLock();
429 static const char *tpfx =
"Open() ";
431 TRACEF(Dump, tpfx <<
"entered");
442 struct stat data_stat, info_stat;
446 bool data_existed = (myOss.
Stat(m_filename.c_str(), &data_stat) ==
XrdOssOK);
447 bool info_existed = (myOss.
Stat(ifn.c_str(), &info_stat) ==
XrdOssOK);
450 char size_str[32]; sprintf(size_str,
"%lld", m_file_size);
451 myEnv.
Put(
"oss.asize", size_str);
463 m_data_file = myOss.
newFile(myUser);
464 if ((res = m_data_file->
Open(m_filename.c_str(), O_RDWR, 0600, myEnv)) !=
XrdOssOK)
468 delete m_data_file; m_data_file = 0;
472 myEnv.
Put(
"oss.asize",
"64k");
478 m_data_file->Close();
delete m_data_file; m_data_file = 0;
482 m_info_file = myOss.
newFile(myUser);
483 if ((res = m_info_file->Open(ifn.c_str(), O_RDWR, 0600, myEnv)) !=
XrdOssOK)
487 delete m_info_file; m_info_file = 0;
488 m_data_file->Close();
delete m_data_file; m_data_file = 0;
492 bool initialize_info_file =
true;
494 if (info_existed && m_cfi.Read(m_info_file, ifn.c_str()))
496 TRACEF(
Debug, tpfx <<
"Reading existing info file. (data_existed=" << data_existed <<
497 ", data_size_stat=" << (data_existed ? data_stat.st_size : -1ll) <<
498 ", data_size_from_last_block=" << m_cfi.GetExpectedDataFileSize() <<
499 ", block_size=" << (m_cfi.GetBufferSize() >> 10) <<
"k)");
502 if (data_existed && data_stat.st_size >= m_cfi.GetExpectedDataFileSize())
504 initialize_info_file =
false;
506 TRACEF(Warning, tpfx <<
"Basic sanity checks on data file failed, resetting info file, truncating data file.");
507 m_cfi.ResetAllAccessStats();
508 m_data_file->Ftruncate(0);
513 if ( ! initialize_info_file && m_cfi.GetCkSumState() != conf.
get_cs_Chk())
518 TRACEF(Info, tpfx <<
"Cksum state of file insufficient, uvkeep test failed, resetting info file, truncating data file.");
519 initialize_info_file =
true;
520 m_cfi.ResetAllAccessStats();
521 m_data_file->Ftruncate(0);
534 parse_pfc_url_args(inputIO, pfc_blocksize, pfc_prefetch);
537 if (initialize_info_file)
539 m_cfi.SetBufferSizeFileSizeAndCreationTime(pfc_blocksize, m_file_size);
541 m_cfi.ResetNoCkSumTime();
542 m_cfi.Write(m_info_file, ifn.c_str());
543 m_info_file->Fsync();
544 cache()->WriteFileSizeXAttr(m_info_file->getFD(), m_file_size);
545 TRACEF(
Debug, tpfx <<
"Creating new file info, data size = " << m_file_size <<
" num blocks = " << m_cfi.GetNBlocks()
546 <<
" block size = " << pfc_blocksize);
550 if (futimens(m_info_file->getFD(), NULL)) {
554 TRACEF(Info, tpfx <<
"URL CGI pfc.blocksize ignored for an already existing file");
558 m_cfi.WriteIOStatAttach();
560 m_block_size = m_cfi.GetBufferSize();
561 m_num_blocks = m_cfi.GetNBlocks();
562 m_prefetch_state = (m_cfi.IsComplete()) ? kComplete : kStopped;
563 m_prefetch_max_blocks_in_flight = pfc_prefetch;
565 TRACEF(
Debug, tpfx <<
"pfc.prefetch set to " << pfc_prefetch <<
" via CGI parameter");
567 m_data_file->Fstat(&data_stat);
568 m_st_blocks = data_stat.st_blocks;
571 constexpr long long MB = 1024 * 1024;
572 m_resmon_report_threshold = std::min(std::max(10 * MB, m_file_size / 20), 500 * MB);
576 m_state_cond.UnLock();
581void File::parse_pfc_url_args(XrdOucCacheIO* inputIO,
long long &pfc_blocksize,
int &pfc_prefetch)
const
585 XrdCl::URL url(inputIO->
Path());
586 auto const & urlp = url.GetParams();
588 auto extract = [&](
const std::string &key, std::string &value) ->
bool {
589 auto it = urlp.find(key);
590 if (it != urlp.end()) {
602 const char *tpfx =
"File::Open::urlcgi pfc.blocksize ";
604 if (
Cache::TheOne().blocksize_str2value(tpfx, val.c_str(), bsize,
607 pfc_blocksize = bsize;
609 TRACEF(
Error, tpfx <<
"Error processing the parameter.");
614 const char *tpfx =
"File::Open::urlcgi pfc.prefetch ";
616 if (
Cache::TheOne().prefetch_str2value(tpfx, val.c_str(), pref,
621 TRACEF(
Error, tpfx <<
"Error processing the parameter.");
639 if ((res = m_data_file->Fstat(&sbuff)))
return res;
641 sbuff.st_size = m_file_size;
643 bool is_cached = cache()->DecideIfConsideredCached(m_file_size, sbuff.st_blocks * 512ll);
654bool File::overlap(
int blk,
663 const long long beg = blk * blk_size;
664 const long long end = beg + blk_size;
665 const long long req_end = req_off + req_size;
667 if (req_off < end && req_end > beg)
669 const long long ovlp_beg = std::max(beg, req_off);
670 const long long ovlp_end = std::min(end, req_end);
672 off = ovlp_beg - req_off;
673 blk_off = ovlp_beg - beg;
674 size = (int) (ovlp_end - ovlp_beg);
676 assert(size <= blk_size);
687Block* File::PrepareBlockRequest(
int i,
IO *io,
void *req_id,
bool prefetch)
695 const long long off = i * m_block_size;
696 const int last_block = m_num_blocks - 1;
697 const bool cs_net = cache()->RefConfiguration().is_cschk_net();
699 int blk_size, req_size;
700 if (i == last_block) {
701 blk_size = req_size = m_file_size - off;
702 if (cs_net && req_size & 0xFFF) req_size = (req_size & ~0xFFF) + 0x1000;
704 blk_size = req_size = m_block_size;
708 char *buf = cache()->RequestRAM(req_size);
712 b =
new (std::nothrow) Block(
this, io, req_id, buf, off, blk_size, req_size, prefetch, cs_net);
720 if (m_prefetch_state == kOn && (
int) m_block_map.size() >= m_prefetch_max_blocks_in_flight)
722 m_prefetch_state = kHold;
723 cache()->DeRegisterPrefetchFile(
this);
728 TRACEF(Dump,
"PrepareBlockRequest() " << i <<
" prefetch " << prefetch <<
", allocation failed.");
735void File::ProcessBlockRequest(
Block *b)
743 snprintf(buf, 256,
"idx=%lld, block=%p, prefetch=%d, off=%lld, req_size=%d, buff=%p, resp_handler=%p ",
744 b->
get_offset()/m_block_size, (
void*)b, b->m_prefetch, b->get_offset(), b->get_req_size(), (
void*)b->get_buff(), (
void*)brh);
745 TRACEF(Dump,
"ProcessBlockRequest() " << buf);
761 for (
BlockList_i bi = blks.begin(); bi != blks.end(); ++bi)
763 ProcessBlockRequest(*bi);
769void File::RequestBlocksDirect(
IO *io,
ReadRequest *read_req, std::vector<XrdOucIOVec>& ioVec,
int expected_size)
771 int n_chunks = ioVec.size();
774 TRACEF(DumpXL,
"RequestBlocksDirect() issuing ReadV for n_chunks = " << n_chunks <<
775 ", total_size = " << expected_size <<
", n_vec_reads = " << n_vec_reads);
785 io->
GetInput()->
ReadV( *handler, ioVec.data() + pos, n_chunks);
790int File::ReadBlocksFromDisk(std::vector<XrdOucIOVec>& ioVec,
int expected_size)
792 TRACEF(DumpXL,
"ReadBlocksFromDisk() issuing ReadV for n_chunks = " << (
int) ioVec.size() <<
", total_size = " << expected_size);
794 long long rs = m_data_file->ReadV(ioVec.data(), (
int) ioVec.size());
798 TRACEF(
Error,
"ReadBlocksFromDisk neg retval = " << rs);
802 if (rs != expected_size)
804 TRACEF(
Error,
"ReadBlocksFromDisk incomplete size = " << rs);
823 if (m_in_shutdown || io->m_in_detach)
825 m_state_cond.UnLock();
826 return m_in_shutdown ? -ENOENT : -EBADF;
831 if (m_cfi.IsComplete())
833 m_state_cond.UnLock();
834 int ret = m_data_file->Read(iUserBuff, iUserOff, iUserSize);
837 m_delta_stats.AddBytesHit(ret);
843 XrdOucIOVec readV( { iUserOff, iUserSize, 0, iUserBuff } );
845 return ReadOpusCoalescere(io, &readV, 1, rh,
"Read() ");
852 TRACEF(Dump,
"ReadV() for " << readVnum <<
" chunks.");
856 if (m_in_shutdown || io->m_in_detach)
858 m_state_cond.UnLock();
859 return m_in_shutdown ? -ENOENT : -EBADF;
864 if (m_cfi.IsComplete())
866 m_state_cond.UnLock();
867 int ret = m_data_file->ReadV(
const_cast<XrdOucIOVec*
>(readV), readVnum);
870 m_delta_stats.AddBytesHit(ret);
876 return ReadOpusCoalescere(io, readV, readVnum, rh,
"ReadV() ");
881int File::ReadOpusCoalescere(
IO *io,
const XrdOucIOVec *readV,
int readVnum,
893 int prefetch_cnt = 0;
898 std::unordered_map<Block*, std::vector<ChunkRequest>> blks_ready;
900 std::vector<XrdOucIOVec> iovec_disk;
901 std::vector<XrdOucIOVec> iovec_direct;
902 int iovec_disk_total = 0;
903 int iovec_direct_total = 0;
905 for (
int iov_idx = 0; iov_idx < readVnum; ++iov_idx)
912 const int idx_first = iUserOff / m_block_size;
913 const int idx_last = (iUserOff + iUserSize - 1) / m_block_size;
915 TRACEF(DumpXL, tpfx <<
"sid: " <<
Xrd::hex1 << rh->
m_seq_id <<
" idx_first: " << idx_first <<
" idx_last: " << idx_last);
917 enum LastBlock_e { LB_other, LB_disk, LB_direct };
919 LastBlock_e lbe = LB_other;
921 for (
int block_idx = idx_first; block_idx <= idx_last; ++block_idx)
924 BlockMap_i bi = m_block_map.find(block_idx);
931 overlap(block_idx, m_block_size, iUserOff, iUserSize, off, blk_off, size);
934 if (bi != m_block_map.end())
936 inc_ref_count(bi->second);
937 TRACEF(Dump, tpfx << (
void*) iUserBuff <<
" inc_ref_count for existing block " << bi->second <<
" idx = " << block_idx);
939 if (bi->second->is_finished())
943 assert(bi->second->is_ok());
945 blks_ready[bi->second].emplace_back(
ChunkRequest(
nullptr, iUserBuff + off, blk_off, size) );
947 if (bi->second->m_prefetch)
953 read_req =
new ReadRequest(io, rh);
958 bi->second->m_chunk_reqs.emplace_back( ChunkRequest(read_req, iUserBuff + off, blk_off, size) );
965 else if (m_cfi.TestBitWritten(offsetIdx(block_idx)))
967 TRACEF(DumpXL, tpfx <<
"read from disk " << (
void*)iUserBuff <<
" idx = " << block_idx);
970 iovec_disk.back().size += size;
972 iovec_disk.push_back( { block_idx * m_block_size + blk_off, size, 0, iUserBuff + off } );
973 iovec_disk_total += size;
975 if (m_cfi.TestBitPrefetch(offsetIdx(block_idx)))
984 read_req =
new ReadRequest(io, rh);
987 Block *b = PrepareBlockRequest(block_idx, io, read_req,
false);
990 TRACEF(Dump, tpfx <<
"inc_ref_count new " << (
void*)iUserBuff <<
" idx = " << block_idx);
992 blks_to_request.push_back(b);
994 b->
m_chunk_reqs.emplace_back(ChunkRequest(read_req, iUserBuff + off, blk_off, size));
1001 TRACEF(DumpXL, tpfx <<
"direct block " << block_idx <<
", blk_off " << blk_off <<
", size " << size);
1003 iovec_direct_total += size;
1010 iovec_direct.back().size += size;
1012 long long in_offset = block_idx * m_block_size + blk_off;
1013 char *out_pos = iUserBuff + off;
1020 iovec_direct.push_back( { in_offset, size, 0, out_pos } );
1029 inc_prefetch_hit_cnt(prefetch_cnt);
1031 m_state_cond.UnLock();
1034 if ( ! blks_to_request.empty())
1036 ProcessBlockRequests(blks_to_request);
1037 blks_to_request.clear();
1041 if ( ! iovec_direct.empty())
1043 RequestBlocksDirect(io, read_req, iovec_direct, iovec_direct_total);
1045 TRACEF(Dump, tpfx <<
"direct read requests sent out, n_chunks = " << (
int) iovec_direct.size() <<
", total_size = " << iovec_direct_total);
1050 long long bytes_read = 0;
1054 if ( ! blks_ready.empty())
1056 for (
auto &bvi : blks_ready)
1058 for (
auto &cr : bvi.second)
1060 TRACEF(DumpXL, tpfx <<
"ub=" << (
void*)cr.m_buf <<
" from pre-finished block " << bvi.first->m_offset/m_block_size <<
" size " << cr.m_size);
1061 memcpy(cr.m_buf, bvi.first->m_buff + cr.m_off, cr.m_size);
1062 bytes_read += cr.m_size;
1068 if ( ! iovec_disk.empty())
1070 int rc = ReadBlocksFromDisk(iovec_disk, iovec_disk_total);
1071 TRACEF(DumpXL, tpfx <<
"from disk finished size = " << rc);
1086 m_state_cond.Lock();
1088 for (
auto &bvi : blks_ready)
1089 dec_ref_count(bvi.first, (
int) bvi.second.size());
1102 m_delta_stats.AddReadStats(read_req->
m_stats);
1103 check_delta_stats();
1104 m_state_cond.UnLock();
1112 m_state_cond.UnLock();
1113 return -EWOULDBLOCK;
1118 m_delta_stats.m_BytesHit += bytes_read;
1119 check_delta_stats();
1120 m_state_cond.UnLock();
1124 return error_cond ? error_cond : bytes_read;
1136 long long offset = b->
m_offset - m_offset;
1140 if (m_cfi.IsCkSumCache())
1144 retval = m_data_file->pgWrite(b->
get_buff(), offset, size, 0, 0);
1146 retval = m_data_file->Write(b->
get_buff(), offset, size);
1151 TRACEF(
Error,
"WriteToDisk() write error " << retval);
1153 TRACEF(
Error,
"WriteToDisk() incomplete block write ret=" << retval <<
" (should be " << size <<
")");
1163 const int blk_idx = (b->
m_offset - m_offset) / m_block_size;
1166 TRACEF(Dump,
"WriteToDisk() success set bit for block " << b->
m_offset <<
" size=" << size);
1168 bool schedule_sync =
false;
1172 m_cfi.SetBitWritten(blk_idx);
1176 m_cfi.SetBitPrefetch(blk_idx);
1180 m_cfi.ResetCkSumNet();
1187 m_writes_during_sync.push_back(blk_idx);
1191 m_cfi.SetBitSynced(blk_idx);
1192 ++m_non_flushed_cnt;
1193 if ((m_cfi.IsComplete() || m_non_flushed_cnt >=
Cache::GetInstance().RefConfiguration().m_flushCnt) &&
1196 schedule_sync =
true;
1198 m_non_flushed_cnt = 0;
1204 if (!schedule_sync) {
1211 cache()->ScheduleFileSync(
this);
1223 int ret = m_data_file->Fsync();
1224 bool errorp =
false;
1230 report_and_merge_delta_stats();
1231 loc_stats = m_stats;
1233 m_cfi.WriteIOStat(loc_stats);
1234 m_cfi.Write(m_info_file, m_filename.c_str());
1235 int cret = m_info_file->Fsync();
1238 TRACEF(
Error,
"Sync cinfo file sync error " << cret);
1244 TRACEF(
Error,
"Sync data file sync error " << ret <<
", cinfo file has not been updated");
1250 TRACEF(
Error,
"Sync failed, unlinking local files and initiating shutdown of File object");
1257 m_writes_during_sync.clear();
1263 int written_while_in_sync;
1264 bool resync =
false;
1267 for (std::vector<int>::iterator i = m_writes_during_sync.begin(); i != m_writes_during_sync.end(); ++i)
1269 m_cfi.SetBitSynced(*i);
1271 written_while_in_sync = m_non_flushed_cnt = (int) m_writes_during_sync.size();
1272 m_writes_during_sync.clear();
1276 if (written_while_in_sync > 0 && m_cfi.IsComplete() && ! m_in_shutdown)
1281 TRACEF(Dump,
"Sync "<< written_while_in_sync <<
" blocks written during sync." << (resync ?
" File is now complete - resyncing." :
""));
1292void File::free_block(
Block* b)
1295 int i = b->
m_offset / m_block_size;
1296 TRACEF(Dump,
"free_block block " << b <<
" idx = " << i);
1297 size_t ret = m_block_map.erase(i);
1301 TRACEF(
Error,
"free_block did not erase " << i <<
" from map");
1309 if (m_prefetch_state == kHold && (
int) m_block_map.size() < m_prefetch_max_blocks_in_flight)
1311 m_prefetch_state = kOn;
1312 cache()->RegisterPrefetchFile(
this);
1318bool File::select_current_io_or_disable_prefetching(
bool skip_current)
1322 int io_size = (int) m_io_set.size();
1327 io_ok = (*m_io_set.begin())->m_allow_prefetching;
1330 m_current_io = m_io_set.begin();
1333 else if (io_size > 1)
1335 IoSet_i mi = m_current_io;
1336 if (skip_current && mi != m_io_set.end()) ++mi;
1338 for (
int i = 0; i < io_size; ++i)
1340 if (mi == m_io_set.end()) mi = m_io_set.begin();
1342 if ((*mi)->m_allow_prefetching)
1354 m_current_io = m_io_set.end();
1355 m_prefetch_state = kStopped;
1356 cache()->DeRegisterPrefetchFile(
this);
1364void File::ProcessDirectReadFinished(
ReadRequest *rreq,
int bytes_read,
int error_cond)
1370 TRACEF(
Error,
"Read(), direct read finished with error " << -error_cond <<
" " <<
XrdSysE2T(-error_cond));
1372 m_state_cond.Lock();
1385 m_state_cond.UnLock();
1388 FinalizeReadRequest(rreq);
1415 TRACEF(Dump,
"ProcessBlockSuccess() ub=" << (
void*)creq.
m_buf <<
" from finished block " << b->
m_offset/m_block_size <<
" size " << creq.
m_size);
1418 m_state_cond.Lock();
1423 rreq->m_stats.m_BytesMissed += creq.
m_size;
1425 rreq->m_stats.m_BytesHit += creq.
m_size;
1427 --rreq->m_n_chunk_reqs;
1430 inc_prefetch_hit_cnt(1);
1434 bool rreq_complete = rreq->is_complete();
1436 m_state_cond.UnLock();
1439 FinalizeReadRequest(rreq);
1447 XrdSysCondVarHelper _lck(m_state_cond);
1448 m_delta_stats.AddReadStats(rreq->
m_stats);
1449 check_delta_stats();
1456void File::ProcessBlockResponse(
Block *b,
int res)
1458 static const char* tpfx =
"ProcessBlockResponse ";
1460 TRACEF(Dump, tpfx <<
"block=" << b <<
", idx=" << b->
m_offset/m_block_size <<
", off=" << b->
m_offset <<
", res=" << res);
1462 if (res >= 0 && res != b->
get_size())
1466 TRACEF(
Error, tpfx <<
"Wrong number of bytes received, assuming remote/local file size mismatch, unlinking local files and initiating shutdown of File object");
1470 m_state_cond.Lock();
1476 IoSet_i mi = m_io_set.find(io);
1477 if (mi != m_io_set.end())
1479 --io->m_active_prefetches;
1482 if (res < 0 && io->m_allow_prefetching)
1484 TRACEF(
Debug, tpfx <<
"after failed prefetch on io " << io <<
" disabling prefetching on this io.");
1485 io->m_allow_prefetching =
false;
1488 if (m_prefetch_state == kOn || m_prefetch_state == kHold)
1490 if ( ! select_current_io_or_disable_prefetching(
false) )
1492 TRACEF(
Debug, tpfx <<
"stopping prefetching after io " << b->
get_io() <<
" marked as bad.");
1498 if (b->
m_refcnt == 0 && (res < 0 || m_in_shutdown))
1501 m_state_cond.UnLock();
1515 TRACEF(Dump, tpfx <<
"inc_ref_count idx=" << b->
m_offset/m_block_size);
1516 if ( ! m_in_shutdown)
1522 cache()->AddWriteTask(b,
true);
1529 m_state_cond.UnLock();
1531 for (
auto &creq : creqs_to_notify)
1533 ProcessBlockSuccess(b, creq);
1542 <<
", io=" << b->
get_io() <<
", error=" << res);
1547 <<
", io=" << b->
get_io() <<
" incomplete, got " << res <<
" expected " << b->
get_size());
1548#if defined(__APPLE__) || defined(__GNU__) || (defined(__FreeBSD_kernel__) && defined(__GLIBC__)) || defined(__FreeBSD__)
1559 std::list<ReadRequest*> rreqs_to_complete;
1568 ProcessBlockError(b, rreq);
1571 rreqs_to_complete.push_back(rreq);
1576 creqs_to_keep.push_back(creq);
1580 bool reissue =
false;
1581 if ( ! creqs_to_keep.empty())
1583 ReadRequest *rreq = creqs_to_keep.front().m_read_req;
1585 TRACEF(
Debug,
"ProcessBlockResponse() requested block " << (
void*)b <<
" failed with another io " <<
1586 b->
get_io() <<
" - reissuing request with my io " << rreq->
m_io);
1593 m_state_cond.UnLock();
1595 for (
auto rreq : rreqs_to_complete)
1596 FinalizeReadRequest(rreq);
1599 ProcessBlockRequest(b);
1607 return m_filename.c_str();
1612int File::offsetIdx(
int iIdx)
const
1614 return iIdx - m_offset/m_block_size;
1628 TRACEF(DumpXL,
"Prefetch() entering.");
1632 if (m_prefetch_state != kOn)
1637 if ( ! select_current_io_or_disable_prefetching(
true) )
1639 TRACEF(
Error,
"Prefetch no available IO object found, prefetching stopped. This should not happen, i.e., prefetching should be stopped before.");
1644 for (
int f = 0; f < m_num_blocks; ++f)
1646 if ( ! m_cfi.TestBitWritten(f))
1648 int f_act = f + m_offset / m_block_size;
1650 BlockMap_i bi = m_block_map.find(f_act);
1651 if (bi == m_block_map.end())
1653 Block *b = PrepareBlockRequest(f_act, *m_current_io,
nullptr,
true);
1656 TRACEF(Dump,
"Prefetch take block " << f_act);
1660 inc_prefetch_read_cnt(1);
1665 TRACEF(Warning,
"Prefetch allocation failed for block " << f_act);
1674 TRACEF(
Debug,
"Prefetch file is complete, stopping prefetch.");
1675 m_prefetch_state = kComplete;
1676 cache()->DeRegisterPrefetchFile(
this);
1680 (*m_current_io)->m_active_prefetches += (int) blks.size();
1684 if ( ! blks.empty())
1686 ProcessBlockRequests(blks);
1695 return m_prefetch_score;
1708void File::insert_remote_location(
const std::string &loc)
1712 size_t p = loc.find_first_of(
'@');
1713 m_remote_locations.insert(&loc[p != std::string::npos ? p + 1 : 0]);
1720 if ( ! m_remote_locations.empty())
1724 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++nl)
1728 s.reserve(2 + sl + 2*nl + nl - 1 + 1);
1731 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++j)
1733 s +=
'"'; s += *i; s +=
'"';
1734 if (j < nl) s +=
',';
#define ERRNO_AND_ERRSTR(err_code)
#define TRACEF_INT(act, x)
const char * XrdSysE2T(int errcode)
virtual int Fstat(struct stat *buf)
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
virtual int Create(const char *tid, const char *path, mode_t mode, XrdOucEnv &env, int opts=0)=0
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual void Done(int result)=0
virtual const char * Path()=0
virtual int pgRead(char *buff, long long offs, int rdlen, std::vector< uint32_t > &csvec, uint64_t opts=0, int *csfix=0)
virtual int ReadV(const XrdOucIOVec *readV, int rnum)
void Put(const char *varname, const char *value)
void Done(int result) override
int * ptr_n_cksum_errors()
vCkSum_t & ref_cksum_vec()
long long get_offset() const
vChunkRequest_t m_chunk_reqs
void * get_req_id() const
bool req_cksum_net() const
void reset_error_and_set_io(IO *io, void *rid)
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
XrdSysError * GetLog() const
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
static ResourceMonitor & ResMon()
static Cache & GetInstance()
Singleton access.
XrdSysTrace * GetTrace() const
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
static const Cache & TheOne()
void Done(int result) override
bool FinalizeSyncBeforeExit()
Returns true if any of blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
const char * lPath() const
Log path.
int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
Vector read.
void WriteBlockToDisk(Block *b)
float GetPrefetchScore() const
friend class BlockResponseHandler
std::string GetRemoteLocations() const
int Fstat(struct stat &sbuff)
static File * FileOpen(const std::string &path, long long offset, long long fileSize, XrdOucCacheIO *inputIO)
Static constructor that also does Open. Returns null ptr if Open fails.
void RequestSyncOfDetachStats()
Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.
void BlocksRemovedFromWriteQ(std::list< Block * > &)
Handle removal of a set of blocks from Cache's write queue.
friend class DirectResponseHandler
void Sync()
Sync file cache inf o and output data with disk.
XrdSysTrace * GetTrace() const
int Read(IO *io, char *buff, long long offset, int size, ReadReqRH *rh)
Normal read.
void ioUpdated(IO *io)
Notification from IO that it has been updated (remote open).
long long initiate_emergency_shutdown()
void BlockRemovedFromWriteQ(Block *)
Handle removal of a block from Cache's write queue.
XrdSysError * GetLog() const
bool ioActive(IO *io)
Initiate close. Return true if still IO active. Used in XrdPosixXrootd::Close()
Base cache-io class that implements some XrdOucCacheIO abstract methods.
bool register_incomplete_read()
XrdOucCacheIO * GetInput()
bool register_block_error(int res)
RAtomic_int m_active_read_reqs
number of active read requests
const char * GetLocation()
Status of cached file. Can be read from and written into a binary file.
static const char * s_infoExtension
void CrossCheckIfScanIsInProgress(const std::string &lfn, XrdSysCondVar &cond)
int register_file_open(const std::string &filename, time_t open_timestamp, bool existing_file)
void register_file_purge(DirState *target, long long size_in_st_blocks)
void register_file_update_stats(int token_id, const Stats &stats)
void register_file_close(int token_id, time_t close_timestamp, const Stats &full_stats)
Statistics of cache utilisation by a File object.
long long m_StBlocksAdded
number of 512-byte blocks the file has grown by
long long m_BytesBypassed
number of bytes served directly through XrdCl
void AddUp(const Stats &s)
long long BytesReadAndWritten() const
long long m_BytesHit
number of bytes served from disk
std::list< Block * > BlockList_t
std::vector< ChunkRequest > vChunkRequest_t
std::list< Block * >::iterator BlockList_i
static const int maxRVdsz
static const int maxRvecsz
Contains parameters configurable from the xrootd config file.
long long m_cgi_max_bufferSize
max buffer size allowed in pfc.blocksize
int m_cgi_min_prefetch_max_blocks
min prefetch block count allowed in pfc.prefetch
bool does_cschk_have_missing_bits(CkSumCheck_e cks_on_file) const
bool m_cgi_prefetch_allowed
allow cgi setting of prefetch
CkSumCheck_e get_cs_Chk() const
int m_prefetch_max_blocks
default maximum number of blocks to prefetch per file
bool should_uvkeep_purge(time_t delta) const
std::string m_data_space
oss space for data files
long long m_bufferSize
cache block size, default 128 kB
long long m_cgi_min_bufferSize
min buffer size allowed in pfc.blocksize
std::string m_meta_space
oss space for metadata files (cinfo)
int m_cgi_max_prefetch_max_blocks
max prefetch block count allowed in pfc.prefetch
std::string m_username
username passed to oss plugin
bool m_cgi_blocksize_allowed
allow cgi setting of blocksize
void update_error_cond(int ec)