PostgreSQL source code reading: WAL log reading with read_local_xlog_page()

We walk through the code of the WAL log reading function read_local_xlog_page(). The code is taken from pg12: logical_read_local_xlog_page() is in logicalfuncs.c, and read_local_xlog_page() is in xlogutils.c.

read_page is a function pointer in the XLogReaderState structure. The WAL reader calls it to obtain the contents of a specific WAL page.

For logical decoding on the local server, callers such as pg_logical_slot_get_changes_guts() in logicalfuncs.c pass logical_read_local_xlog_page() to CreateDecodingContext(), so that the function pointer ends up pointing at it.

logical_read_local_xlog_page() in turn calls read_local_xlog_page(), which reads one page of the local write-ahead log (xlog). A call of this form looks like:

ctx = CreateDecodingContext(InvalidXLogRecPtr,
                            NIL,
                            true,   /* fast_forward */
                            logical_read_local_xlog_page,
                            NULL, NULL, NULL);
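
To make the function-pointer indirection concrete, here is a minimal standalone sketch. ToyReader, toy_read_page_cb, toy_local_read and toy_fetch_page are hypothetical names invented for this illustration; they are not PostgreSQL's XLogReaderState or its real callback signature.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define TOY_PAGE_SIZE 16        /* tiny page size, for illustration only */

struct ToyReader;

/* Hypothetical callback type: copy one page into buf, return valid bytes or -1. */
typedef int (*toy_read_page_cb) (struct ToyReader *reader, size_t page_start,
                                 char *buf);

typedef struct ToyReader
{
    toy_read_page_cb read_page; /* installed when the reader is created */
    const char *wal;            /* pretend WAL contents */
    size_t      wal_len;
} ToyReader;

/* Hypothetical "local" implementation of the callback. */
static int
toy_local_read(ToyReader *reader, size_t page_start, char *buf)
{
    size_t      avail;

    assert(page_start % TOY_PAGE_SIZE == 0);
    if (page_start >= reader->wal_len)
        return -1;              /* nothing available at that position */

    avail = reader->wal_len - page_start;
    if (avail > TOY_PAGE_SIZE)
        avail = TOY_PAGE_SIZE;
    memcpy(buf, reader->wal + page_start, avail);
    return (int) avail;
}

/* The reader never names toy_local_read; it only goes through the pointer. */
static int
toy_fetch_page(ToyReader *reader, size_t page_start, char *buf)
{
    return reader->read_page(reader, page_start, buf);
}
```

A caller would fill in a ToyReader with toy_local_read as its read_page member and then call toy_fetch_page(); swapping in a different callback changes where pages come from without touching the reader code.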

The logical_read_local_xlog_page() function code is as follows:

int
logical_read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
                             int reqLen, XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)
{
    return read_local_xlog_page(state, targetPagePtr, reqLen,
                                targetRecPtr, cur_page, pageTLI);
}

The read_local_xlog_page() function code is as follows:

int
read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
                     int reqLen, XLogRecPtr targetRecPtr, char *cur_page,
                     TimeLineID *pageTLI)
{
    XLogRecPtr  read_upto,
                loc;
    int         count;

    loc = targetPagePtr + reqLen;

    /* Loop waiting for xlog to be available if necessary */
    while (1)
    {
        /*
         * Determine the limit of xlog we can currently read to, and what the
         * most recent timeline is.
         *
         * RecoveryInProgress() will update ThisTimeLineID when it first
         * notices recovery finishes, so we only have to maintain it for the
         * local process until recovery ends.
         */
        if (!RecoveryInProgress())
            read_upto = GetFlushRecPtr();
        else
            read_upto = GetXLogReplayRecPtr(&ThisTimeLineID);

        *pageTLI = ThisTimeLineID;

        /*
         * Check which timeline to get the record from.
         *
         * We have to do it each time through the loop because if we're in
         * recovery as a cascading standby, the current timeline might've
         * become historical. We can't rely on RecoveryInProgress() because in
         * a standby configuration like
         *
         * A => B => C
         *
         * if we're a logical decoding session on C, and B gets promoted, our
         * timeline will change while we remain in recovery.
         *
         * We can't just keep reading from the old timeline as the last WAL
         * archive in the timeline will get renamed to .partial by
         * StartupXLOG().
         *
         * If that happens after our caller updated ThisTimeLineID but before
         * we actually read the xlog page, we might still try to read from the
         * old (now renamed) segment and fail. There's not much we can do
         * about this, but it can only happen when we're a leaf of a cascading
         * standby whose master gets promoted while we're decoding, so a
         * one-off ERROR isn't too bad.
         */
        XLogReadDetermineTimeline(state, targetPagePtr, reqLen);

        if (state->currTLI == ThisTimeLineID)
        {

            if (loc <= read_upto)
                break;

            CHECK_FOR_INTERRUPTS();
            pg_usleep(1000L);
        }
        else
        {
            /*
             * We're on a historical timeline, so limit reading to the switch
             * point where we moved to the next timeline.
             *
             * We don't need to GetFlushRecPtr or GetXLogReplayRecPtr. We know
             * about the new timeline, so we must've received past the end of
             * it.
             */
            read_upto = state->currTLIValidUntil;

            /*
             * Setting pageTLI to our wanted record's TLI is slightly wrong;
             * the page might begin on an older timeline if it contains a
             * timeline switch, since its xlog segment will have been copied
             * from the prior timeline. This is pretty harmless though, as
             * nothing cares so long as the timeline doesn't go backwards. We
             * should read the page header instead; FIXME someday.
             */
            *pageTLI = state->currTLI;

            /* No need to wait on a historical timeline */
            break;
        }
    }

    if (targetPagePtr + XLOG_BLCKSZ <= read_upto)
    {
        /*
         * more than one block available; read only that block, have caller
         * come back if they need more.
         */
        count = XLOG_BLCKSZ;
    }
    else if (targetPagePtr + reqLen > read_upto)
    {
        /* not enough data there */
        return -1;
    }
    else
    {
        /* enough bytes available to satisfy the request */
        count = read_upto - targetPagePtr;
    }

    /*
     * Even though we just determined how much of the page can be effectively read
     * as 'count', read the whole page anyway. It's guaranteed to be
     * zero-padded up to the page boundary if it's incomplete.
     */
    XLogRead(cur_page, state->wal_segment_size, *pageTLI, targetPagePtr,
             XLOG_BLCKSZ);

    /* number of valid bytes in the buffer */
    return count;
}

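logical_read_local_xlog_page() is only a thin wrapper; the actual work of reading a local xlog page is done by read_local_xlog_page(). Its flow is as follows:

1. First is a loop that waits until the requested xlog becomes available.

On each pass through the loop, the function determines how far xlog can currently be read (read_upto) and calls XLogReadDetermineTimeline() to find out which timeline the requested page must be read from, because the timeline being read may have become historical in the meantime.

If the timeline has not changed (state->currTLI == ThisTimeLineID), the function compares loc, the end of the requested range (targetPagePtr + reqLen), with read_upto, which is the flush pointer on a primary or the replay pointer during recovery. If loc <= read_upto, the data is already available and the loop exits. Otherwise the function checks for interrupts, sleeps for 1 ms (pg_usleep(1000L)), and tries again.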

if (state->currTLI == ThisTimeLineID)
{

    if (loc <= read_upto)
        break;

    CHECK_FOR_INTERRUPTS();
    pg_usleep(1000L);
}
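
The polling pattern can be tried outside the server with a small simulation. This is only a sketch under stated assumptions: ToyWal, toy_get_flush_ptr and toy_wait_for_wal are hypothetical names, the "flush pointer" simply advances by a fixed step on every poll, and a max_polls cap is added so the loop always terminates (the real loop has no such cap and sleeps between polls).

```c
#include <assert.h>
#include <stdint.h>

typedef uint64_t ToyRecPtr;     /* stand-in for a WAL position */

/* Hypothetical WAL source whose flushed position grows on every poll. */
typedef struct ToyWal
{
    ToyRecPtr   flushed;        /* how far WAL has been "flushed" */
    ToyRecPtr   step;           /* growth per poll, simulation only */
    int         polls;          /* how many times we asked */
} ToyWal;

static ToyRecPtr
toy_get_flush_ptr(ToyWal *wal)
{
    wal->polls++;
    wal->flushed += wal->step;
    return wal->flushed;
}

/*
 * Poll until the requested range [page_ptr, page_ptr + req_len) is covered.
 * Returns the read limit that satisfied the request, or 0 after max_polls.
 */
static ToyRecPtr
toy_wait_for_wal(ToyWal *wal, ToyRecPtr page_ptr, int req_len, int max_polls)
{
    ToyRecPtr   loc = page_ptr + (ToyRecPtr) req_len;

    assert(req_len > 0);
    for (int i = 0; i < max_polls; i++)
    {
        ToyRecPtr   read_upto = toy_get_flush_ptr(wal);

        if (loc <= read_upto)
            return read_upto;   /* enough WAL is available */

        /* The real loop checks for interrupts and sleeps 1 ms here. */
    }
    return 0;                   /* gave up; simulation-only outcome */
}
```

With a starting position of 100 and a step of 50, a request ending at 232 is satisfied on the third poll, when the simulated flush pointer reaches 250.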

If the timelines differ, the xlog we are reading is on a historical timeline, so reading is limited to the switch point at which the WAL moved to the next timeline. There is no need to call GetFlushRecPtr() or GetXLogReplayRecPtr(): since we already know about the new timeline, we must have received WAL past the end of the old one.

read_upto = state->currTLIValidUntil;

Setting pageTLI to the TLI of the wanted record is slightly wrong: if the page contains a timeline switch, the page may begin on an older timeline, because its xlog segment was copied from the prior timeline. This is harmless as long as the timeline never goes backwards.

*pageTLI = state->currTLI;

The loop then exits immediately; there is no need to wait on a historical timeline.
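
The two branches of the loop body can be summarized as a pure function. This is a sketch, not PostgreSQL code: ToyLimit and toy_pick_limit are hypothetical names, and the live read limit and the switch point are passed in as plain numbers instead of being looked up.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical result of one pass through the loop body. */
typedef struct ToyLimit
{
    uint64_t    read_upto;      /* how far we may read */
    uint32_t    page_tli;       /* timeline reported for the page */
    int         must_wait;      /* 1 if the caller has to poll again */
} ToyLimit;

static ToyLimit
toy_pick_limit(uint32_t curr_tli, uint32_t this_tli,
               uint64_t live_upto, uint64_t valid_until, uint64_t loc)
{
    ToyLimit    lim;

    /* Assumption of this sketch: the timeline being read is never newer. */
    assert(curr_tli <= this_tli);

    if (curr_tli == this_tli)
    {
        /* Current timeline: limit is the live pointer, which may still grow. */
        lim.read_upto = live_upto;
        lim.page_tli = this_tli;
        lim.must_wait = (loc > live_upto);
    }
    else
    {
        /* Historical timeline: limit is the switch point, never wait. */
        lim.read_upto = valid_until;
        lim.page_tli = curr_tli;
        lim.must_wait = 0;
    }
    return lim;
}
```

Note that on a historical timeline must_wait is 0 even when loc lies beyond the switch point; in that case the size check in the next step is what rejects the request.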

2. Decide how many bytes of the page are valid. If targetPagePtr + XLOG_BLCKSZ <= read_upto, the whole page is available, so count is set to XLOG_BLCKSZ. Only this one block is read; the caller comes back if it needs more.

If targetPagePtr + reqLen > read_upto, there is not enough data to satisfy the request, and -1 is returned.

Otherwise the page is only partially filled but still covers the request, and count is read_upto - targetPagePtr, that is, the number of bytes between the start of the page and read_upto.

if (targetPagePtr + XLOG_BLCKSZ <= read_upto)
{
    /*
     * more than one block available; read only that block, have caller
     * come back if they need more.
     */
    count = XLOG_BLCKSZ;
}
else if (targetPagePtr + reqLen > read_upto)
{
    /* not enough data there */
    return -1;
}
else
{
    /* enough bytes available to satisfy the request */
    count = read_upto - targetPagePtr;
}
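
A worked example of the three cases, written as a standalone function so the arithmetic can be checked. toy_valid_bytes and TOY_BLCKSZ are hypothetical names; the page size of 8192 is assumed here because it is the default XLOG_BLCKSZ, and a build with a different WAL block size would give different numbers.

```c
#include <assert.h>
#include <stdint.h>

#define TOY_BLCKSZ 8192         /* assumed page size (default XLOG_BLCKSZ) */

/*
 * Return the number of valid bytes on the page starting at page_ptr,
 * or -1 if fewer than req_len bytes are readable.
 */
static int
toy_valid_bytes(uint64_t page_ptr, int req_len, uint64_t read_upto)
{
    assert(req_len >= 0 && req_len <= TOY_BLCKSZ);

    if (page_ptr + TOY_BLCKSZ <= read_upto)
        return TOY_BLCKSZ;      /* the whole page is available */
    else if (page_ptr + (uint64_t) req_len > read_upto)
        return -1;              /* not enough data there */
    else
        return (int) (read_upto - page_ptr);    /* partial page */
}
```

For a page starting at 16384 and a request of 100 bytes: with read_upto = 30000 the full 8192 bytes are valid; with read_upto = 20000 only 20000 - 16384 = 3616 bytes are valid; with read_upto = 16400 the request ends at 16484, beyond the limit, so the result is -1.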

3. Although count says how much of the page is valid, the entire page is read anyway. If the page is incomplete, it is guaranteed to be zero-padded up to the page boundary.

XLogRead(cur_page, state->wal_segment_size, *pageTLI, targetPagePtr,
         XLOG_BLCKSZ);
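
The idea of always handing back a full, zero-padded page while reporting only the valid byte count can be emulated in memory. This sketch is an assumption-laden stand-in: toy_read_whole_page and TOY_BLCKSZ are hypothetical, the "WAL" is a plain byte array, and the padding is done here with memset, whereas the PostgreSQL comment states that the page read by XLogRead() is already guaranteed to be zero-padded.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define TOY_BLCKSZ 32           /* tiny page size, for illustration only */

/*
 * Fill 'page' (TOY_BLCKSZ bytes) from an in-memory WAL image starting at
 * page_start.  Bytes past the end of the image are left as zeros.
 * Returns the number of valid (non-padding) bytes.
 */
static size_t
toy_read_whole_page(const char *wal, size_t wal_len, size_t page_start,
                    char *page)
{
    size_t      valid = 0;

    assert(page_start % TOY_BLCKSZ == 0);

    memset(page, 0, TOY_BLCKSZ);    /* emulate the zero-padding guarantee */
    if (page_start < wal_len)
    {
        valid = wal_len - page_start;
        if (valid > TOY_BLCKSZ)
            valid = TOY_BLCKSZ;
        memcpy(page, wal + page_start, valid);
    }
    return valid;
}
```

The caller always receives a buffer of the full page size, so code that parses the page never reads uninitialized memory, while the returned count tells it where the meaningful data ends.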

4. Return count, the number of valid bytes in the buffer.

return count;