Advancement of postgresql logical replication slots

Why should the logical replication slot be advanced?

  • We know that the logical replication slot has two functions, one is to protect the system table from being vacuumed, and the other is to protect the xlog and prevent the xlog that needs to be decoded from being recycled.
  • If the system table does not advance forward, the system table will bloat.
  • If xlog is not pushed forward, xlog will accumulate on the disk.
  • Therefore, we advance the logical replication slot based on the current progress of logical replication.

What fields are mainly reflected in the advancement of logical replication slots?

  • For xlog, same as physical replication slot, controlled by restart_lsn
  • For system tables, controlled by catalog_xmin

Promotion of restart_lsn

  • The advancement of physical replication restart lsn is relatively simple. When the standby machine flushes an lsn, it tells the host. After the host receives it, it will immediately advance the restart lsn.
  • Logical replication is relatively complex.
    • We use confirmed_flush to indicate the progress of the current subscriber logical replication. However, this does not mean that the xlog before confirmed_flush can be recycled.
    • Because logical decoding requires historical snapshots, and the construction of historical snapshots is essentially based on xlog such as running_xacts and subsequent commits.
    • Therefore, in order to be able to parse the subsequent xlog, restart lsn also needs to protect the snapshot xlog needed for subsequent decoding.
    • Then you have to look at the oldest snapshot lsn that needs to be used when updating confirmed_flush.
    • So we use two lsn to advance restart_lsn. candidate_restart_valid represents the current decoding progress lsn, and candidate_restart_lsn represents the lsn of the oldest snapshot that needs to be used when decoding to candidate_restart_valid.
    • When confirmed_flush exceeds candidate_restart_valid, restart_lsn can be advanced to candidate_restart_lsn and updated.
//The processing after the host or publisher receives the flush from the standby or subscriber
ProcessStandbyReplyMessage(void)
{
    if (MyReplicationSlot & amp; & amp; flushPtr != InvalidXLogRecPtr)
    {
        if (SlotIsLogical(MyReplicationSlot))
            LogicalConfirmReceivedLocation(flushPtr);
        else
            PhysicalConfirmReceivedLocation(flushPtr);
    }
}

// For the physical replication slot, as long as the standby machine flushes an lsn, the host will immediately push the restart lsn after receiving it.
PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
{
    if (slot->data.restart_lsn != lsn)
    {
        changed = true;
        slot->data.restart_lsn = lsn;
    }
}


// Logical replication slot, if the subscriber flushes an lsn, the restart lsn will not be directly promoted. Instead, it is judged whether to advance the restart lsn based on the candidate situation.
// If possible, push forward. If not, only record the lsn of the current subscriber flush to confirmed_flush.
LogicalConfirmReceivedLocation(XLogRecPtr lsn)
{
    if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr ||
        MyReplicationSlot->candidate_restart_valid != InvalidXLogRecPtr)
    {
        if (MyReplicationSlot->candidate_restart_valid != InvalidXLogRecPtr & amp; & amp;
            MyReplicationSlot->candidate_restart_valid <= lsn)
        {
            MyReplicationSlot->data.restart_lsn = MyReplicationSlot->candidate_restart_lsn;
            MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr;
            MyReplicationSlot->candidate_restart_valid = InvalidXLogRecPtr;
            updated_restart = true;
        }
    }
    else
    {
        SpinLockAcquire( & amp;MyReplicationSlot->mutex);
        MyReplicationSlot->data.confirmed_flush = lsn;
        SpinLockRelease( & amp;MyReplicationSlot->mutex);
    }
}

// Why can't logical replication directly restart lsn based on flush?
// Because logical decoding requires historical snapshots, and the construction of historical snapshots is essentially based on xlog such as running_xacts and subsequent commits.
//So in order to be able to parse the subsequent xlog, restart lsn also needs to protect the snapshot xlog needed for subsequent xlog decoding.
// Because we will serialize a snapshot every time after parsing running_xacts, so we can advance the lsn we need to protect at this time, which is the so-called candidate
// After the flush on the subscriber side meets the conditions, you can actually proceed with the restart lsn.
SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
{
    //After processing running_xacts, serialize a snapshot
    SnapBuildSerialize(builder, lsn);

    // We find the oldest txn from the reorder buffer, so that we can get the oldest xlog that needs to be protected
    txn = ReorderBufferGetOldestTXN(builder->reorder);
    if (txn != NULL & amp; & amp; txn->restart_decoding_lsn != InvalidXLogRecPtr)
        LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
    // If there is no transaction currently being processed, use the latest serialized snapshot location.
    else if (txn == NULL & amp; & amp;
             builder->reorder->current_restart_decoding_lsn != InvalidXLogRecPtr & amp; & amp;
             builder->last_serialized_snapshot != InvalidXLogRecPtr)
        LogicalIncreaseRestartDecodingForSlot(lsn,
                                              builder->last_serialized_snapshot);
}

// There are two lsn here. candidate_restart_valid is a reference object. It is the current lsn. When the subscriber confirmed_flush exceeds it, the corresponding restart_lsn can be advanced.
// Why is it designed like this? Because confirmed_flush only represents the progress of the flush, or only the decoding xlog progress, not the snapshot xlog progress.
LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn)
{
    if (current_lsn <= slot->data.confirmed_flush)
    {
        slot->candidate_restart_valid = current_lsn;
        slot->candidate_restart_lsn = restart_lsn;
        updated_lsn = true;
    }

    if (slot->candidate_restart_valid == InvalidXLogRecPtr)
    {
        slot->candidate_restart_valid = current_lsn;
        slot->candidate_restart_lsn = restart_lsn;
    }

    if (updated_lsn)
        LogicalConfirmReceivedLocation(slot->data.confirmed_flush);
}

Promotion of catalog_xmin

  • The advancement of restart lsn is similar.
  • When serializing snapshots, use candidate_xmin_lsn to indicate the current decoding progress, and use candidate_catalog_xmin to indicate the oldest xmin needed for current decoding.
  • When confirmed_flush exceeds candidate_xmin_lsn, the catalog_xmin of the replication slot can be updated to candidate_catalog_xmin.
SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
{
    SnapBuildSerialize(builder, lsn);
    xmin = ReorderBufferGetOldestXmin(builder->reorder);
    LogicalIncreaseXminForSlot(lsn, xmin);
}

LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
{
    if (current_lsn <= slot->data.confirmed_flush)
    {
        slot->candidate_catalog_xmin = xmin;
        slot->candidate_xmin_lsn = current_lsn;
        updated_xmin = true;
    }
    else if (slot->candidate_xmin_lsn == InvalidXLogRecPtr)
    {
        slot->candidate_catalog_xmin = xmin;
        slot->candidate_xmin_lsn = current_lsn;
    }
}

The knowledge points of the article match the official knowledge files, and you can further learn relevant knowledge. PostgreSQL skill treeSQL advanced skillsRecursive query 7087 people are learning the system