XRootD
XrdPfc::File Class Reference

#include <XrdPfcFile.hh>

Public Member Functions

void AddIO (IO *io)
 
void BlockRemovedFromWriteQ (Block *)
 Handle removal of a block from Cache's write queue.
 
void BlocksRemovedFromWriteQ (std::list< Block * > &)
 Handle removal of a set of blocks from Cache's write queue.
 
int dec_ref_cnt ()
 
bool FinalizeSyncBeforeExit ()
 Returns true if any blocks need to be synced. Called from Cache::dec_ref_cnt when the reference count reaches zero.
 
int Fstat (struct stat &sbuff)
 
int get_ref_cnt ()
 
size_t GetAccessCnt () const
 
int GetBlockSize () const
 
long long GetFileSize () const
 
const Info::AStat * GetLastAccessStats () const
 
const std::string & GetLocalPath () const
 
XrdSysError * GetLog () const
 
int GetNBlocks () const
 
int GetNDownloadedBlocks () const
 
int GetPrefetchCountOnIO (IO *io)
 
long long GetPrefetchedBytes () const
 
float GetPrefetchScore () const
 
std::string GetRemoteLocations () const
 
XrdSysTrace * GetTrace () const
 
int inc_ref_cnt ()
 
long long initiate_emergency_shutdown ()
 
bool ioActive (IO *io)
 Initiate close. Returns true if the IO is still active. Used in XrdPosixXrootd::Close().
 
void ioUpdated (IO *io)
 Notification from IO that it has been updated (remote open).
 
bool is_in_emergency_shutdown ()
 
const char * lPath () const
 Log path.
 
void Prefetch ()
 
int Read (IO *io, char *buff, long long offset, int size, ReadReqRH *rh)
 Normal read.
 
int ReadV (IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
 Vector read.
 
const Stats & RefStats () const
 
void RemoveIO (IO *io)
 
void RequestSyncOfDetachStats ()
 Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.
 
void StopPrefetchingOnIO (IO *io)
 
void Sync ()
 Sync file cache info and output data with disk.
 
void WriteBlockToDisk (Block *b)
 

Static Public Member Functions

static File * FileOpen (const std::string &path, long long offset, long long fileSize, XrdOucCacheIO *inputIO)
 Static constructor that also does Open. Returns null ptr if Open fails.
 

Friends

class BlockResponseHandler
 
class Cache
 
class DirectResponseHandler
 

Detailed Description

Definition at line 202 of file XrdPfcFile.hh.

Member Function Documentation

◆ AddIO()

void File::AddIO ( IO * io)

Definition at line 348 of file XrdPfcFile.cc.

349{
350 // Called from Cache::GetFile() when a new IO asks for the file.
351
352 TRACEF(Debug, "AddIO() io = " << (void*)io);
353
354 time_t now = time(0);
355 std::string loc(io->GetLocation());
356
357 m_state_cond.Lock();
358
359 IoSet_i mi = m_io_set.find(io);
360
361 if (mi == m_io_set.end())
362 {
363 m_io_set.insert(io);
364 io->m_attach_time = now;
365 m_delta_stats.IoAttach();
366
367 insert_remote_location(loc);
368
369 if (m_prefetch_state == kStopped)
370 {
371 m_prefetch_state = kOn;
372 cache()->RegisterPrefetchFile(this);
373 }
374 }
375 else
376 {
377 TRACEF(Error, "AddIO() io = " << (void*)io << " already registered.");
378 }
379
380 m_state_cond.UnLock();
381}

References Debug, Error, XrdPfc::IO::GetLocation(), and TRACEF.

Referenced by XrdPfc::Cache::GetFile().
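The registration step in AddIO() is a guarded set insertion: the IO is added only if it is not already present, and its attach time is recorded while the state lock is held. A minimal sketch of that pattern follows. `SketchIO`, `SketchIoRegistry`, and the use of `std::mutex` in place of `m_state_cond` are assumptions made for illustration and are not part of XrdPfc.

```cpp
#include <cassert>
#include <cstddef>
#include <ctime>
#include <mutex>
#include <set>

// Hypothetical stand-in for XrdPfc::IO; only the attach time is modelled.
struct SketchIO { std::time_t attach_time = 0; };

class SketchIoRegistry {
public:
    // Returns true if io was newly registered, false if it was already present.
    bool AddIO(SketchIO* io) {
        const std::time_t now = std::time(nullptr);
        std::lock_guard<std::mutex> lock(m_mutex);   // plays the role of m_state_cond
        const bool inserted = m_io_set.insert(io).second;
        if (inserted) io->attach_time = now;         // only stamp first-time attaches
        return inserted;
    }
    std::size_t NumIOs() const {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_io_set.size();
    }
private:
    mutable std::mutex  m_mutex;
    std::set<SketchIO*> m_io_set;
};

// Usage: a second registration of the same IO is rejected.
inline void sketch_add_io_usage() {
    SketchIoRegistry reg;
    SketchIO io;
    assert(reg.AddIO(&io));
    assert(!reg.AddIO(&io));
}
```

The real method additionally records the remote location and may restart prefetching; those steps are omitted here.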

◆ BlockRemovedFromWriteQ()

void File::BlockRemovedFromWriteQ ( Block * b)

Handle removal of a block from Cache's write queue.

Definition at line 210 of file XrdPfcFile.cc.

211{
212 TRACEF(Dump, "BlockRemovedFromWriteQ() block = " << (void*) b << " idx= " << b->m_offset/m_block_size);
213
214 XrdSysCondVarHelper _lck(m_state_cond);
215 dec_ref_count(b);
216}

References XrdPfc::Block::m_offset, and TRACEF.

◆ BlocksRemovedFromWriteQ()

void File::BlocksRemovedFromWriteQ ( std::list< Block * > & blocks)

Handle removal of a set of blocks from Cache's write queue.

Definition at line 218 of file XrdPfcFile.cc.

219{
220 TRACEF(Dump, "BlocksRemovedFromWriteQ() n_blocks = " << blocks.size());
221
222 XrdSysCondVarHelper _lck(m_state_cond);
223
224 for (std::list<Block*>::iterator i = blocks.begin(); i != blocks.end(); ++i)
225 {
226 dec_ref_count(*i);
227 }
228}

References TRACEF.

Referenced by XrdPfc::Cache::RemoveWriteQEntriesFor().

◆ dec_ref_cnt()

int XrdPfc::File::dec_ref_cnt ( )
inline

Definition at line 288 of file XrdPfcFile.hh.

288{ return --m_ref_cnt; }

◆ FileOpen()

File * File::FileOpen ( const std::string & path,
long long offset,
long long fileSize,
XrdOucCacheIO * inputIO )
static

Static constructor that also does Open. Returns null ptr if Open fails.

Definition at line 140 of file XrdPfcFile.cc.

141{
142 File *file = new File(path, offset, fileSize);
143 if ( ! file->Open(inputIO))
144 {
145 delete file;
146 file = 0;
147 }
148 return file;
149}

References File.

Referenced by XrdPfc::Cache::GetFile().
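FileOpen() follows the "static constructor" idiom: construct, attempt to open, and hand back a null pointer if opening fails, so callers never see a half-initialised object. The sketch below shows the idiom in isolation. `FactoryFile` and its rule that an empty path cannot be opened are hypothetical and exist only to make the failure path observable.

```cpp
#include <cassert>
#include <string>

class FactoryFile {
public:
    // Static constructor that also opens; returns nullptr when Open() fails.
    static FactoryFile* FileOpen(const std::string& path) {
        FactoryFile* file = new FactoryFile(path);
        if (!file->Open()) {
            delete file;
            file = nullptr;
        }
        return file;
    }
    const std::string& GetLocalPath() const { return m_path; }

private:
    explicit FactoryFile(const std::string& path) : m_path(path) {}
    // Hypothetical failure rule for illustration: an empty path cannot be opened.
    bool Open() const { return !m_path.empty(); }
    std::string m_path;
};

// Usage: callers must check for nullptr before touching the object.
inline void sketch_file_open_usage() {
    FactoryFile* f = FactoryFile::FileOpen("/cache/data/file.root");
    assert(f != nullptr);
    delete f;
}
```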

◆ FinalizeSyncBeforeExit()

bool File::FinalizeSyncBeforeExit ( )

Returns true if any blocks need to be synced. Called from Cache::dec_ref_cnt when the reference count reaches zero.

Definition at line 324 of file XrdPfcFile.cc.

325{
326 // Returns true if sync is required.
327 // This method is called after corresponding IO is detached from PosixCache.
328
329 XrdSysCondVarHelper _lck(m_state_cond);
330 if ( ! m_in_shutdown)
331 {
332 if ( ! m_writes_during_sync.empty() || m_non_flushed_cnt > 0 || ! m_detach_time_logged)
333 {
334 report_and_merge_delta_stats();
335 m_cfi.WriteIOStatDetach(m_stats);
336 m_detach_time_logged = true;
337 m_in_sync = true;
338 TRACEF(Debug, "FinalizeSyncBeforeExit requesting sync to write detach stats");
339 return true;
340 }
341 }
342 TRACEF(Debug, "FinalizeSyncBeforeExit sync not required");
343 return false;
344}

References Debug, and TRACEF.

◆ Fstat()

int File::Fstat ( struct stat & sbuff)

Definition at line 628 of file XrdPfcFile.cc.

629{
630 // Stat on an open file.
631 // Corrects size to actual full size of the file.
632 // Sets atime to 0 if the file is only partially downloaded, in accordance
633 // with pfc.onlyifcached settings.
634 // Called from IO::Fstat() and Cache::Stat() when the file is active.
635 // Returns 0 on success, -errno on error.
636
637 int res;
638
639 if ((res = m_data_file->Fstat(&sbuff))) return res;
640
641 sbuff.st_size = m_file_size;
642
643 bool is_cached = cache()->DecideIfConsideredCached(m_file_size, sbuff.st_blocks * 512ll);
644 if ( ! is_cached)
645 sbuff.st_atime = 0;
646
647 return 0;
648}

References stat.

Referenced by XrdPfc::Cache::ConsiderCached(), and XrdPfc::Cache::Stat().
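The two adjustments Fstat() makes to the stat buffer can be sketched as follows: the size is overwritten with the full file size, and the access time is zeroed when the file is not considered cached. The policy function here is a hypothetical stand-in; the actual rule lives in Cache::DecideIfConsideredCached() and depends on the pfc.onlyifcached settings, which this sketch does not model.

```cpp
#include <cassert>
#include <sys/stat.h>

// Hypothetical policy standing in for Cache::DecideIfConsideredCached():
// here a file counts as cached only when all of its bytes are on disk.
inline bool sketch_considered_cached(long long file_size, long long bytes_on_disk) {
    return bytes_on_disk >= file_size;
}

// Adjusts a stat buffer that was filled in from the local data file.
inline void sketch_fix_stat(struct stat& sbuff, long long full_file_size) {
    sbuff.st_size = full_file_size;                      // report the full file size
    const long long on_disk = sbuff.st_blocks * 512LL;   // the source treats st_blocks as 512-byte units
    if (!sketch_considered_cached(full_file_size, on_disk))
        sbuff.st_atime = 0;                              // signals a partial download
}

// Usage: a file with only 1 KiB of 4 KiB on disk gets atime 0.
inline void sketch_fstat_usage() {
    struct stat s{};
    s.st_blocks = 2;
    s.st_atime  = 100;
    sketch_fix_stat(s, 4096);
    assert(s.st_atime == 0);
}
```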

◆ get_ref_cnt()

int XrdPfc::File::get_ref_cnt ( )
inline

Definition at line 286 of file XrdPfcFile.hh.

286{ return m_ref_cnt; }

◆ GetAccessCnt()

size_t XrdPfc::File::GetAccessCnt ( ) const
inline

Definition at line 276 of file XrdPfcFile.hh.

276{ return m_cfi.GetAccessCnt(); }

◆ GetBlockSize()

int XrdPfc::File::GetBlockSize ( ) const
inline

Definition at line 277 of file XrdPfcFile.hh.

277{ return m_cfi.GetBufferSize(); }

◆ GetFileSize()

long long XrdPfc::File::GetFileSize ( ) const
inline

Definition at line 267 of file XrdPfcFile.hh.

267{ return m_file_size; }

◆ GetLastAccessStats()

const Info::AStat * XrdPfc::File::GetLastAccessStats ( ) const
inline

Definition at line 275 of file XrdPfcFile.hh.

275{ return m_cfi.GetLastAccessStats(); }

◆ GetLocalPath()

const std::string & XrdPfc::File::GetLocalPath ( ) const
inline

Definition at line 262 of file XrdPfcFile.hh.

262{ return m_filename; }

Referenced by XrdPfc::Cache::AddWriteTask(), and XrdPfc::Cache::ReleaseFile().

◆ GetLog()

XrdSysError * File::GetLog ( ) const

Definition at line 1698 of file XrdPfcFile.cc.

1699{
1700 return Cache::TheOne().GetLog();
1701}

References XrdPfc::Cache::GetLog(), and XrdPfc::Cache::TheOne().

◆ GetNBlocks()

int XrdPfc::File::GetNBlocks ( ) const
inline

Definition at line 278 of file XrdPfcFile.hh.

278{ return m_cfi.GetNBlocks(); }

◆ GetNDownloadedBlocks()

int XrdPfc::File::GetNDownloadedBlocks ( ) const
inline

Definition at line 279 of file XrdPfcFile.hh.

279{ return m_cfi.GetNDownloadedBlocks(); }

◆ GetPrefetchCountOnIO()

int XrdPfc::File::GetPrefetchCountOnIO ( IO * io)

◆ GetPrefetchedBytes()

long long XrdPfc::File::GetPrefetchedBytes ( ) const
inline

Definition at line 280 of file XrdPfcFile.hh.

280{ return m_prefetch_bytes; }

◆ GetPrefetchScore()

float File::GetPrefetchScore ( ) const

Definition at line 1693 of file XrdPfcFile.cc.

1694{
1695 return m_prefetch_score;
1696}

◆ GetRemoteLocations()

std::string File::GetRemoteLocations ( ) const

Definition at line 1717 of file XrdPfcFile.cc.

1718{
1719 std::string s;
1720 if ( ! m_remote_locations.empty())
1721 {
1722 size_t sl = 0;
1723 int nl = 0;
1724 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++nl)
1725 {
1726 sl += i->size();
1727 }
1728 s.reserve(2 + sl + 2*nl + nl - 1 + 1);
1729 s = '[';
1730 int j = 1;
1731 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++j)
1732 {
1733 s += '"'; s += *i; s += '"';
1734 if (j < nl) s += ',';
1735 }
1736 s += ']';
1737 }
1738 else
1739 {
1740 s = "[]";
1741 }
1742 return s;
1743}
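GetRemoteLocations() renders the set of remote locations as a JSON-style array of quoted strings, with "[]" for an empty set. A compact sketch of the same output format is shown below. The function name is hypothetical, and like the listing above it does not escape quotes or backslashes inside the location strings.

```cpp
#include <cassert>
#include <set>
#include <string>

// Formats a set of locations as ["a","b",...]; an empty set yields "[]".
inline std::string sketch_locations_to_json(const std::set<std::string>& locs) {
    std::string s = "[";
    bool first = true;
    for (const std::string& loc : locs) {
        if (!first) s += ',';          // separator between entries, none after the last
        s += '"'; s += loc; s += '"';
        first = false;
    }
    s += ']';
    return s;
}

// Usage: std::set iterates in sorted order, so the output order is deterministic.
inline void sketch_locations_usage() {
    assert(sketch_locations_to_json({"hostB:1094", "hostA:1094"}) ==
           "[\"hostA:1094\",\"hostB:1094\"]");
}
```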

◆ GetTrace()

XrdSysTrace * File::GetTrace ( ) const

Definition at line 1703 of file XrdPfcFile.cc.

1704{
1705 return Cache::TheOne().GetTrace();
1706}

References XrdPfc::Cache::GetTrace(), and XrdPfc::Cache::TheOne().

◆ inc_ref_cnt()

int XrdPfc::File::inc_ref_cnt ( )
inline

Definition at line 287 of file XrdPfcFile.hh.

287{ return ++m_ref_cnt; }

◆ initiate_emergency_shutdown()

long long File::initiate_emergency_shutdown ( )

Definition at line 153 of file XrdPfcFile.cc.

154{
155 // Called from Cache::Unlink() when the file is currently open.
156 // Cache::Unlink is also called on FSync error and when wrong number of bytes
157 // is received from a remote read.
158 //
159 // From this point onward the file will not be written to, cinfo file will
160 // not be updated, and all new read requests will return -ENOENT.
161 //
162 // File's entry in the Cache's active map is set to nullptr and will be
163 // removed from there shortly, in any case, well before this File object
164 // shuts down. Cache::Unlink() also reports the appropriate purge event.
165
166 XrdSysCondVarHelper _lck(m_state_cond);
167
168 m_in_shutdown = true;
169
170 if (m_prefetch_state != kStopped && m_prefetch_state != kComplete)
171 {
172 m_prefetch_state = kStopped;
173 cache()->DeRegisterPrefetchFile(this);
174 }
175
176 report_and_merge_delta_stats();
177
178 return m_st_blocks;
179}

Referenced by XrdPfc::Cache::UnlinkFile().

◆ ioActive()

bool File::ioActive ( IO * io)

Initiate close. Returns true if the IO is still active. Used in XrdPosixXrootd::Close().

Definition at line 241 of file XrdPfcFile.cc.

242{
243 // Returns true if delay is needed.
244
245 TRACEF(Debug, "ioActive start for io " << io);
246
247 std::string loc(io->GetLocation());
248
249 {
250 XrdSysCondVarHelper _lck(m_state_cond);
251
252 IoSet_i mi = m_io_set.find(io);
253
254 if (mi != m_io_set.end())
255 {
256 unsigned int n_active_reads = io->m_active_read_reqs;
257
258 TRACE(Info, "ioActive for io " << io <<
259 ", active_reads " << n_active_reads <<
260 ", active_prefetches " << io->m_active_prefetches <<
261 ", allow_prefetching " << io->m_allow_prefetching <<
262 ", ios_in_detach " << m_ios_in_detach);
263 TRACEF(Info,
264 "\tio_map.size() " << m_io_set.size() <<
265 ", block_map.size() " << m_block_map.size() << ", file");
266
267 insert_remote_location(loc);
268
269 io->m_allow_prefetching = false;
270 io->m_in_detach = true;
271
272 // Check if any IO is still available for prefetching. If not, stop it.
273 if (m_prefetch_state == kOn || m_prefetch_state == kHold)
274 {
275 if ( ! select_current_io_or_disable_prefetching(false) )
276 {
277 TRACEF(Debug, "ioActive stopping prefetching after io " << io << " retreat.");
278 }
279 }
280
281 // On last IO, consider write queue blocks. Note, this also contains
282 // blocks being prefetched.
283
284 bool io_active_result;
285
286 if (n_active_reads > 0)
287 {
288 io_active_result = true;
289 }
290 else if (m_io_set.size() - m_ios_in_detach == 1)
291 {
292 io_active_result = ! m_block_map.empty();
293 }
294 else
295 {
296 io_active_result = io->m_active_prefetches > 0;
297 }
298
299 if ( ! io_active_result)
300 {
301 ++m_ios_in_detach;
302 }
303
304 TRACEF(Info, "ioActive for io " << io << " returning " << io_active_result << ", file");
305
306 return io_active_result;
307 }
308 else
309 {
310 TRACEF(Error, "ioActive io " << io <<" not found in IoSet. This should not happen.");
311 return false;
312 }
313 }
314}

References Debug, Error, XrdPfc::IO::GetLocation(), XrdPfc::IO::m_active_read_reqs, TRACE, and TRACEF.
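The value ioActive() returns comes from a three-way decision: outstanding reads always keep the IO active; the last IO not yet in detach stays active while any blocks remain in the block map; otherwise only that IO's own prefetches matter. The sketch below isolates that decision as a pure function. The function name and the flattened parameter list are assumptions for illustration; the real method reads these values from the File and IO objects under the state lock.

```cpp
#include <cassert>
#include <cstddef>

// Returns true if closing the IO has to be delayed.
inline bool sketch_io_active(unsigned    n_active_reads,
                             std::size_t n_ios,             // m_io_set.size()
                             std::size_t n_ios_in_detach,   // m_ios_in_detach
                             bool        block_map_empty,
                             int         n_active_prefetches)
{
    if (n_active_reads > 0)
        return true;                         // reads in flight on this IO
    if (n_ios - n_ios_in_detach == 1)
        return !block_map_empty;             // last IO: wait for all pending blocks
    return n_active_prefetches > 0;          // other IOs remain: only own prefetches count
}

// Usage: the last IO with an empty block map can be released immediately.
inline void sketch_io_active_usage() {
    assert(!sketch_io_active(0, 1, 0, true, 0));
}
```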

◆ ioUpdated()

void File::ioUpdated ( IO * io)

Notification from IO that it has been updated (remote open).

Definition at line 232 of file XrdPfcFile.cc.

233{
234 std::string loc(io->GetLocation());
235 XrdSysCondVarHelper _lck(m_state_cond);
236 insert_remote_location(loc);
237}

References XrdPfc::IO::GetLocation().

◆ is_in_emergency_shutdown()

bool XrdPfc::File::is_in_emergency_shutdown ( )
inline

Definition at line 291 of file XrdPfcFile.hh.

291{ return m_in_shutdown; }

◆ lPath()

const char * File::lPath ( ) const

Log path.

Definition at line 1605 of file XrdPfcFile.cc.

1606{
1607 return m_filename.c_str();
1608}

Referenced by XrdPfc::Cache::ProcessWriteTasks(), and XrdPfc::Cache::RemoveWriteQEntriesFor().

◆ Prefetch()

void File::Prefetch ( )

Definition at line 1620 of file XrdPfcFile.cc.

1621{
1622 // Check that block is not on disk and not in RAM.
1623 // TODO: Could prefetch several blocks at once!
1624 // blks_max could be an argument
1625
1626 BlockList_t blks;
1627
1628 TRACEF(DumpXL, "Prefetch() entering.");
1629 {
1630 XrdSysCondVarHelper _lck(m_state_cond);
1631
1632 if (m_prefetch_state != kOn)
1633 {
1634 return;
1635 }
1636
1637 if ( ! select_current_io_or_disable_prefetching(true) )
1638 {
1639 TRACEF(Error, "Prefetch no available IO object found, prefetching stopped. This should not happen, i.e., prefetching should be stopped before.");
1640 return;
1641 }
1642
1643 // Select block(s) to fetch.
1644 for (int f = 0; f < m_num_blocks; ++f)
1645 {
1646 if ( ! m_cfi.TestBitWritten(f))
1647 {
1648 int f_act = f + m_offset / m_block_size;
1649
1650 BlockMap_i bi = m_block_map.find(f_act);
1651 if (bi == m_block_map.end())
1652 {
1653 Block *b = PrepareBlockRequest(f_act, *m_current_io, nullptr, true);
1654 if (b)
1655 {
1656 TRACEF(Dump, "Prefetch take block " << f_act);
1657 blks.push_back(b);
1658 // Note: block ref_cnt not increased, it will be when placed into write queue.
1659
1660 inc_prefetch_read_cnt(1);
1661 }
1662 else
1663 {
1664 // This shouldn't happen as prefetching stops when RAM is 70% full.
1665 TRACEF(Warning, "Prefetch allocation failed for block " << f_act);
1666 }
1667 break;
1668 }
1669 }
1670 }
1671
1672 if (blks.empty())
1673 {
1674 TRACEF(Debug, "Prefetch file is complete, stopping prefetch.");
1675 m_prefetch_state = kComplete;
1676 cache()->DeRegisterPrefetchFile(this);
1677 }
1678 else
1679 {
1680 (*m_current_io)->m_active_prefetches += (int) blks.size();
1681 }
1682 }
1683
1684 if ( ! blks.empty())
1685 {
1686 ProcessBlockRequests(blks);
1687 }
1688}

References Debug, Error, and TRACEF.

Referenced by XrdPfc::Cache::Prefetch().
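The block selection inside Prefetch() scans for the first block that is neither written to disk nor already present in the in-memory block map, and takes at most one block per call. A simplified sketch of that scan is given below. The function name, the `std::vector<bool>` written-bit vector, and the `std::set<int>` of in-flight blocks are assumptions standing in for m_cfi.TestBitWritten() and m_block_map.

```cpp
#include <cassert>
#include <set>
#include <vector>

// Returns the absolute index of the next block to prefetch, or -1 if none is left.
inline int sketch_next_prefetch_block(const std::vector<bool>& written,
                                      const std::set<int>&     in_flight,
                                      int                      first_block)
{
    for (int f = 0; f < static_cast<int>(written.size()); ++f) {
        if (written[f]) continue;                 // already on disk
        const int f_act = f + first_block;        // m_offset / m_block_size in the source
        if (in_flight.count(f_act) == 0)
            return f_act;                         // not on disk and not in RAM
    }
    return -1;                                    // nothing left to request
}

// Usage: block 11 is already in flight, so block 12 is chosen.
inline void sketch_prefetch_usage() {
    assert(sketch_next_prefetch_block({true, false, false}, {11}, 10) == 12);
}
```

In the real method an empty selection is what moves the prefetch state to kComplete.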

◆ Read()

int File::Read ( IO * io,
char * buff,
long long offset,
int size,
ReadReqRH * rh )

Normal read.

Definition at line 813 of file XrdPfcFile.cc.

814{
815 // rrc_func is ONLY called from async processing.
816 // If this function returns anything other than -EWOULDBLOCK, rrc_func needs to be called by the caller.
817 // This streamlines implementation of synchronous IO::Read().
818
819 TRACEF(Dump, "Read() sid: " << Xrd::hex1 << rh->m_seq_id << " size: " << iUserSize);
820
821 m_state_cond.Lock();
822
823 if (m_in_shutdown || io->m_in_detach)
824 {
825 m_state_cond.UnLock();
826 return m_in_shutdown ? -ENOENT : -EBADF;
827 }
828
829 // Shortcut -- file is fully downloaded.
830
831 if (m_cfi.IsComplete())
832 {
833 m_state_cond.UnLock();
834 int ret = m_data_file->Read(iUserBuff, iUserOff, iUserSize);
835 if (ret > 0) {
836 XrdSysCondVarHelper _lck(m_state_cond);
837 m_delta_stats.AddBytesHit(ret);
838 check_delta_stats();
839 }
840 return ret;
841 }
842
843 XrdOucIOVec readV( { iUserOff, iUserSize, 0, iUserBuff } );
844
845 return ReadOpusCoalescere(io, &readV, 1, rh, "Read() ");
846}

References Xrd::hex1, XrdPfc::ReadReqRH::m_seq_id, and TRACEF.

Referenced by XrdPfc::IOFileBlock::Read().
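Both Read() and ReadV() begin with the same guard: a file in emergency shutdown yields -ENOENT, and an IO that is already detaching yields -EBADF, with shutdown taking precedence. The sketch below expresses that guard as a standalone function. The name `sketch_read_precheck` and the convention of returning 0 for "proceed" are assumptions for illustration.

```cpp
#include <cassert>
#include <cerrno>

// Returns a negative errno value if the read must be rejected, 0 otherwise.
inline int sketch_read_precheck(bool in_shutdown, bool io_in_detach) {
    if (in_shutdown)  return -ENOENT;   // file was unlinked or failed; no new reads
    if (io_in_detach) return -EBADF;    // this IO is on its way out
    return 0;
}

// Usage: shutdown wins when both conditions hold.
inline void sketch_read_precheck_usage() {
    assert(sketch_read_precheck(true, true) == -ENOENT);
}
```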

◆ ReadV()

int File::ReadV ( IO * io,
const XrdOucIOVec * readV,
int readVnum,
ReadReqRH * rh )

Vector read.

Definition at line 850 of file XrdPfcFile.cc.

851{
852 TRACEF(Dump, "ReadV() for " << readVnum << " chunks.");
853
854 m_state_cond.Lock();
855
856 if (m_in_shutdown || io->m_in_detach)
857 {
858 m_state_cond.UnLock();
859 return m_in_shutdown ? -ENOENT : -EBADF;
860 }
861
862 // Shortcut -- file is fully downloaded.
863
864 if (m_cfi.IsComplete())
865 {
866 m_state_cond.UnLock();
867 int ret = m_data_file->ReadV(const_cast<XrdOucIOVec*>(readV), readVnum);
868 if (ret > 0) {
869 XrdSysCondVarHelper _lck(m_state_cond);
870 m_delta_stats.AddBytesHit(ret);
871 check_delta_stats();
872 }
873 return ret;
874 }
875
876 return ReadOpusCoalescere(io, readV, readVnum, rh, "ReadV() ");
877}

References TRACEF.

◆ RefStats()

const Stats & XrdPfc::File::RefStats ( ) const
inline

Definition at line 281 of file XrdPfcFile.hh.

281{ return m_stats; }

◆ RemoveIO()

void File::RemoveIO ( IO * io)

Definition at line 385 of file XrdPfcFile.cc.

386{
387 // Called from Cache::ReleaseFile.
388
389 TRACEF(Debug, "RemoveIO() io = " << (void*)io);
390
391 time_t now = time(0);
392
393 m_state_cond.Lock();
394
395 IoSet_i mi = m_io_set.find(io);
396
397 if (mi != m_io_set.end())
398 {
399 if (mi == m_current_io)
400 {
401 ++m_current_io;
402 }
403
404 m_delta_stats.IoDetach(now - io->m_attach_time);
405 m_io_set.erase(mi);
406 --m_ios_in_detach;
407
408 if (m_io_set.empty() && m_prefetch_state != kStopped && m_prefetch_state != kComplete)
409 {
410 TRACEF(Error, "RemoveIO() io = " << (void*)io << " Prefetching is not stopped/complete -- it should be by now.");
411 m_prefetch_state = kStopped;
412 cache()->DeRegisterPrefetchFile(this);
413 }
414 }
415 else
416 {
417 TRACEF(Error, "RemoveIO() io = " << (void*)io << " is NOT registered.");
418 }
419
420 m_state_cond.UnLock();
421}

References Debug, Error, and TRACEF.

Referenced by XrdPfc::Cache::ReleaseFile().
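RemoveIO() advances m_current_io before erasing when the two iterators coincide, because erasing a std::set element invalidates iterators to that element only. A small sketch of the idiom follows, using a set of integers in place of IO pointers. The function name and signature are hypothetical.

```cpp
#include <cassert>
#include <set>

// Erases io from ios while keeping 'current' valid. Returns false if io is absent.
inline bool sketch_remove_io(std::set<int>& ios,
                             std::set<int>::iterator& current,
                             int io)
{
    std::set<int>::iterator it = ios.find(io);
    if (it == ios.end()) return false;
    if (it == current) ++current;   // step off the element before it is erased
    ios.erase(it);
    return true;
}

// Usage: removing the current element moves 'current' to its successor.
inline void sketch_remove_io_usage() {
    std::set<int> ios{1, 2, 3};
    std::set<int>::iterator cur = ios.find(2);
    assert(sketch_remove_io(ios, cur, 2));
    assert(*cur == 3);
}
```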

◆ RequestSyncOfDetachStats()

void File::RequestSyncOfDetachStats ( )

Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.

Definition at line 318 of file XrdPfcFile.cc.

319{
320 XrdSysCondVarHelper _lck(m_state_cond);
321 m_detach_time_logged = false;
322}

◆ StopPrefetchingOnIO()

void XrdPfc::File::StopPrefetchingOnIO ( IO * io)

◆ Sync()

void File::Sync ( )

Sync file cache info and output data with disk.

Definition at line 1219 of file XrdPfcFile.cc.

1220{
1221 TRACEF(Dump, "Sync()");
1222
1223 int ret = m_data_file->Fsync();
1224 bool errorp = false;
1225 if (ret == XrdOssOK)
1226 {
1227 Stats loc_stats;
1228 {
1229 XrdSysCondVarHelper _lck(&m_state_cond);
1230 report_and_merge_delta_stats();
1231 loc_stats = m_stats;
1232 }
1233 m_cfi.WriteIOStat(loc_stats);
1234 m_cfi.Write(m_info_file, m_filename.c_str());
1235 int cret = m_info_file->Fsync();
1236 if (cret != XrdOssOK)
1237 {
1238 TRACEF(Error, "Sync cinfo file sync error " << cret);
1239 errorp = true;
1240 }
1241 }
1242 else
1243 {
1244 TRACEF(Error, "Sync data file sync error " << ret << ", cinfo file has not been updated");
1245 errorp = true;
1246 }
1247
1248 if (errorp)
1249 {
1250 TRACEF(Error, "Sync failed, unlinking local files and initiating shutdown of File object");
1251
1252 // Unlink will also call this->initiate_emergency_shutdown()
1253 Cache::GetInstance().UnlinkFile(m_filename, false);
1254
1255 XrdSysCondVarHelper _lck(&m_state_cond);
1256
1257 m_writes_during_sync.clear();
1258 m_in_sync = false;
1259
1260 return;
1261 }
1262
1263 int written_while_in_sync;
1264 bool resync = false;
1265 {
1266 XrdSysCondVarHelper _lck(&m_state_cond);
1267 for (std::vector<int>::iterator i = m_writes_during_sync.begin(); i != m_writes_during_sync.end(); ++i)
1268 {
1269 m_cfi.SetBitSynced(*i);
1270 }
1271 written_while_in_sync = m_non_flushed_cnt = (int) m_writes_during_sync.size();
1272 m_writes_during_sync.clear();
1273
1274 // If there were writes during sync and the file is now complete,
1275 // let us call Sync again without resetting the m_in_sync flag.
1276 if (written_while_in_sync > 0 && m_cfi.IsComplete() && ! m_in_shutdown)
1277 resync = true;
1278 else
1279 m_in_sync = false;
1280 }
1281 TRACEF(Dump, "Sync "<< written_while_in_sync << " blocks written during sync." << (resync ? " File is now complete - resyncing." : ""));
1282
1283 if (resync)
1284 Sync();
1285}

References Error, XrdPfc::Cache::GetInstance(), Sync(), TRACEF, XrdPfc::Cache::UnlinkFile(), and XrdOssOK.

Referenced by Sync().
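Sync() and WriteBlockToDisk() cooperate through m_in_sync and m_writes_during_sync: blocks written while a sync is running are stashed, then marked synced once the sync finishes, and a further pass is requested if those late writes completed the file. The sketch below models only that bookkeeping. `SketchSyncState` and its member names are assumptions, and locking, file I/O, and error handling are left out.

```cpp
#include <cassert>
#include <vector>

struct SketchSyncState {
    bool             in_sync = false;
    int              non_flushed_cnt = 0;
    std::vector<int> writes_during_sync;   // block indices stashed while a sync runs
    std::vector<int> synced;               // block indices whose synced bit is set

    // Bookkeeping after a block has been written to the data file.
    void on_block_written(int idx) {
        if (in_sync) {
            writes_during_sync.push_back(idx);   // defer: the running sync may not cover it
        } else {
            synced.push_back(idx);
            ++non_flushed_cnt;
        }
    }

    // Called once data and cinfo files are fsync'ed. Returns true if another pass is wanted.
    bool finish_sync(bool file_complete, bool in_shutdown) {
        for (int idx : writes_during_sync) synced.push_back(idx);
        const int written = static_cast<int>(writes_during_sync.size());
        non_flushed_cnt = written;
        writes_during_sync.clear();
        const bool resync = written > 0 && file_complete && !in_shutdown;
        if (!resync) in_sync = false;            // keep the flag set when resyncing
        return resync;
    }
};

// Usage: a write that arrives mid-sync is only marked synced when the sync ends.
inline void sketch_sync_usage() {
    SketchSyncState s;
    s.in_sync = true;
    s.on_block_written(7);
    assert(s.synced.empty());
    s.finish_sync(false, false);
    assert(s.synced.size() == 1);
}
```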

◆ WriteBlockToDisk()

void File::WriteBlockToDisk ( Block * b)

Definition at line 1133 of file XrdPfcFile.cc.

1134{
1135 // write block buffer into disk file
1136 long long offset = b->m_offset - m_offset;
1137 long long size = b->get_size();
1138 ssize_t retval;
1139
1140 if (m_cfi.IsCkSumCache())
1141 if (b->has_cksums())
1142 retval = m_data_file->pgWrite(b->get_buff(), offset, size, b->ref_cksum_vec().data(), 0);
1143 else
1144 retval = m_data_file->pgWrite(b->get_buff(), offset, size, 0, 0);
1145 else
1146 retval = m_data_file->Write(b->get_buff(), offset, size);
1147
1148 if (retval < size)
1149 {
1150 if (retval < 0) {
1151 TRACEF(Error, "WriteToDisk() write error " << retval);
1152 } else {
1153 TRACEF(Error, "WriteToDisk() incomplete block write ret=" << retval << " (should be " << size << ")");
1154 }
1155
1156 XrdSysCondVarHelper _lck(m_state_cond);
1157
1158 dec_ref_count(b);
1159
1160 return;
1161 }
1162
1163 const int blk_idx = (b->m_offset - m_offset) / m_block_size;
1164
1165 // Set written bit.
1166 TRACEF(Dump, "WriteToDisk() success set bit for block " << b->m_offset << " size=" << size);
1167
1168 bool schedule_sync = false;
1169 {
1170 XrdSysCondVarHelper _lck(m_state_cond);
1171
1172 m_cfi.SetBitWritten(blk_idx);
1173
1174 if (b->m_prefetch)
1175 {
1176 m_cfi.SetBitPrefetch(blk_idx);
1177 }
1178 if (b->req_cksum_net() && ! b->has_cksums() && m_cfi.IsCkSumNet())
1179 {
1180 m_cfi.ResetCkSumNet();
1181 }
1182
1183 // Set synced bit or stash block index if in actual sync.
1184 // Synced state is only written out to cinfo file when data file is synced.
1185 if (m_in_sync)
1186 {
1187 m_writes_during_sync.push_back(blk_idx);
1188 }
1189 else
1190 {
1191 m_cfi.SetBitSynced(blk_idx);
1192 ++m_non_flushed_cnt;
1193 if ((m_cfi.IsComplete() || m_non_flushed_cnt >= Cache::GetInstance().RefConfiguration().m_flushCnt) &&
1194 ! m_in_shutdown)
1195 {
1196 schedule_sync = true;
1197 m_in_sync = true;
1198 m_non_flushed_cnt = 0;
1199 }
1200 }
1201 // As soon as the reference count is decreased on the block, the
1202 // file object may be deleted. Thus, to avoid holding both locks at a time,
1203 // we defer the ref count decrease until later if a sync is needed
1204 if (!schedule_sync) {
1205 dec_ref_count(b);
1206 }
1207 }
1208
1209 if (schedule_sync)
1210 {
1211 cache()->ScheduleFileSync(this);
1212 XrdSysCondVarHelper _lck(m_state_cond);
1213 dec_ref_count(b);
1214 }
1215}

References Error, XrdPfc::Block::get_buff(), XrdPfc::Block::get_size(), XrdPfc::Cache::GetInstance(), XrdPfc::Block::has_cksums(), XrdPfc::Block::m_offset, XrdPfc::Block::m_prefetch, XrdPfc::Block::ref_cksum_vec(), XrdPfc::Block::req_cksum_net(), and TRACEF.

Referenced by XrdPfc::Cache::ProcessWriteTasks().
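Two small computations drive WriteBlockToDisk(): the block index relative to the start of the file, and the test that decides whether a sync should be scheduled after the write. Both are sketched below as free functions. The names are hypothetical, and `flush_cnt` corresponds to the m_flushCnt configuration value read in the listing above.

```cpp
#include <cassert>

// Index of a block within the file, given absolute offsets in bytes.
inline int sketch_block_index(long long block_offset,
                              long long file_offset,
                              long long block_size)
{
    return static_cast<int>((block_offset - file_offset) / block_size);
}

// A sync is scheduled when the file is complete or enough blocks are unflushed,
// but never once the file is in emergency shutdown.
inline bool sketch_should_schedule_sync(bool      file_complete,
                                        int       non_flushed_cnt,
                                        long long flush_cnt,
                                        bool      in_shutdown)
{
    return (file_complete || non_flushed_cnt >= flush_cnt) && !in_shutdown;
}

// Usage: with 1 MiB blocks, a block at offset 3 MiB in a file starting at 1 MiB has index 2.
inline void sketch_write_block_usage() {
    const long long MiB = 1024LL * 1024LL;
    assert(sketch_block_index(3 * MiB, 1 * MiB, MiB) == 2);
}
```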

Friends And Related Symbol Documentation

◆ BlockResponseHandler

friend class BlockResponseHandler
friend

Definition at line 205 of file XrdPfcFile.hh.

References BlockResponseHandler.

Referenced by BlockResponseHandler.

◆ Cache

friend class Cache
friend

Definition at line 204 of file XrdPfcFile.hh.

References Cache.

Referenced by Cache.

◆ DirectResponseHandler

friend class DirectResponseHandler
friend

Definition at line 206 of file XrdPfcFile.hh.

References DirectResponseHandler.

Referenced by DirectResponseHandler.


The documentation for this class was generated from the following files: