1. Evicting a page
1.1 Using the clock-sweep algorithm to pick a victim buffer
static inline uint32
ClockSweepTick(void)
{
    uint32      victim;

    /*
     * Atomically move hand ahead one buffer - if there's several processes
     * doing this, this can lead to buffers being returned slightly out of
     * apparent order.
     */
    victim =
        pg_atomic_fetch_add_u32(&StrategyControl->nextVictimBuffer, 1);

    if (victim >= NBuffers)
    {
        uint32      originalVictim = victim;

        /* always wrap what we look up in BufferDescriptors */
        victim = victim % NBuffers;

        /*
         * If we're the one that just caused a wraparound, force
         * completePasses to be incremented while holding the spinlock. We
         * need the spinlock so StrategySyncStart() can return a consistent
         * value consisting of nextVictimBuffer and completePasses.
         */
        if (victim == 0)
        {
            uint32      expected;
            uint32      wrapped;
            bool        success = false;

            expected = originalVictim + 1;

            while (!success)
            {
                /*
                 * Acquire the spinlock while increasing completePasses. That
                 * allows other readers to read nextVictimBuffer and
                 * completePasses in a consistent manner which is required
                 * for StrategySyncStart(). In theory delaying the increment
                 * could lead to an overflow of nextVictimBuffers, but that's
                 * highly unlikely and wouldn't be particularly harmful.
                 */
                SpinLockAcquire(&StrategyControl->buffer_strategy_lock);

                wrapped = expected % NBuffers;

                success = pg_atomic_compare_exchange_u32(&StrategyControl->nextVictimBuffer,
                                                         &expected, wrapped);
                if (success)
                    StrategyControl->completePasses++;
                SpinLockRelease(&StrategyControl->buffer_strategy_lock);
            }
        }
    }
    return victim;
}
nextVictimBuffer records the position the sweep hand has reached. Once the hand runs past the end of the buffer pool it wraps back to the start: nextVictimBuffer is reduced modulo NBuffers and completePasses is incremented by 1.
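To make the wraparound arithmetic concrete, here is a minimal single-threaded toy model (not PostgreSQL code; the variable names merely mirror the fields above):

#include <stdio.h>
#include <stdint.h>

/* Toy model of the clock hand: a monotonically growing counter that is
 * reduced modulo NBUFFERS, with completePasses bumped at each wraparound. */
#define NBUFFERS 4

int
main(void)
{
    uint32_t    nextVictimBuffer = 0;   /* models StrategyControl->nextVictimBuffer */
    uint32_t    completePasses = 0;     /* models StrategyControl->completePasses */

    for (int tick = 0; tick < 10; tick++)
    {
        uint32_t    victim = nextVictimBuffer++;    /* fetch-and-add */

        if (victim >= NBUFFERS)
        {
            victim = victim % NBUFFERS;             /* wrap the lookup index */
            if (victim == 0)
            {
                /* the caller that crossed the boundary folds the counter back
                 * and counts one more complete pass over the pool */
                nextVictimBuffer = nextVictimBuffer % NBUFFERS;
                completePasses++;
            }
        }
        printf("tick=%d victim=%u passes=%u\n",
               tick, (unsigned) victim, (unsigned) completePasses);
    }
    return 0;
}

With NBUFFERS buffers, every NBUFFERS ticks the hand returns to slot 0 and completePasses grows by one; StrategySyncStart() relies on reading these two values together under the spinlock.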
When the freelist has no usable buffer, StrategyGetBuffer() falls back to sweeping the whole buffer pool. A buffer whose reference count is 0 and whose usage count is also 0 becomes the victim. If the reference count is 0 but the usage count is not, the usage count is decremented by 1 and trycounter is reset to NBuffers, so that buffer will be considered again on a later tick. In this way a victim is eventually obtained, unless every buffer stays pinned for a full pass, in which case the sweep raises an error.
BufferDesc *
StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state)
{
    ...

    /* Nothing on the freelist, so run the "clock sweep" algorithm */
    trycounter = NBuffers;
    for (;;)
    {
        buf = GetBufferDescriptor(ClockSweepTick());

        /*
         * If the buffer is pinned or has a nonzero usage_count, we cannot use
         * it; decrement the usage_count (unless pinned) and keep scanning.
         */
        local_buf_state = LockBufHdr(buf);

        if (BUF_STATE_GET_REFCOUNT(local_buf_state) == 0)
        {
            if (BUF_STATE_GET_USAGECOUNT(local_buf_state) != 0)
            {
                local_buf_state -= BUF_USAGECOUNT_ONE;

                trycounter = NBuffers;
            }
            else
            {
                /* Found a usable buffer */
                if (strategy != NULL)
                    AddBufferToRing(strategy, buf);
                *buf_state = local_buf_state;
                return buf;
            }
        }
        else if (--trycounter == 0)
        {
            /*
             * We've scanned all the buffers without making any state changes,
             * so all the buffers are pinned (or were when we looked at them).
             * We could hope that someone will free one eventually, but it's
             * probably better to fail than to risk getting stuck in an
             * infinite loop.
             */
            UnlockBufHdr(buf, local_buf_state);
            elog(ERROR, "no unpinned buffers available");
        }
        UnlockBufHdr(buf, local_buf_state);
    }
    ...
}
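The effect of usage_count can be seen in another toy model (again not PostgreSQL code; pinned buffers and concurrency are ignored): a recently used buffer survives up to usage_count further visits of the hand before it is evicted.

#include <stdio.h>

#define NBUFFERS 4

/* Toy clock sweep over unpinned buffers: each tick either picks the buffer
 * (usage_count == 0) or decrements its usage_count and moves on. */
int
main(void)
{
    int     usage_count[NBUFFERS] = {3, 0, 2, 1};   /* pretend recent access counts */
    int     hand = 0;

    for (int round = 0; round < 3; round++)
    {
        for (;;)
        {
            int     victim = hand % NBUFFERS;

            hand++;
            if (usage_count[victim] == 0)
            {
                printf("round %d: evict buffer %d\n", round, victim);
                usage_count[victim] = 1;    /* pretend it gets reused right away */
                break;
            }
            usage_count[victim]--;          /* give it another chance */
        }
    }
    return 0;
}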
2. Freeing a buffer
For example, a buffer can be freed through the following call chain:

DropRelFileNodeBuffers()                  src/backend/storage/buffer/bufmgr.c
  -> InvalidateBuffer(BufferDesc *buf)    src/backend/storage/buffer/bufmgr.c
    -> StrategyFreeBuffer(buf)            src/backend/storage/buffer/freelist.c
StrategyFreeBuffer() pushes the freed buffer back onto the head of the freelist:
void
StrategyFreeBuffer(BufferDesc *buf)
{
    SpinLockAcquire(&StrategyControl->buffer_strategy_lock);

    /*
     * It is possible that we are told to put something in the freelist that
     * is already in it; don't screw up the list if so.
     */
    if (buf->freeNext == FREENEXT_NOT_IN_LIST)
    {
        buf->freeNext = StrategyControl->firstFreeBuffer;
        if (buf->freeNext < 0)
            StrategyControl->lastFreeBuffer = buf->buf_id;
        StrategyControl->firstFreeBuffer = buf->buf_id;
    }

    SpinLockRelease(&StrategyControl->buffer_strategy_lock);
}
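The freelist itself is an index-linked list threaded through the buffer descriptors: freeNext stores the buf_id of the next free buffer, and FREENEXT_NOT_IN_LIST marks a buffer that is not on the list. A minimal standalone sketch of this push/pop discipline (toy code with assumed sentinel values; locking and lastFreeBuffer tracking are omitted):

#include <stdio.h>

#define NBUFFERS                4
#define FREENEXT_END_OF_LIST    (-1)    /* assumed sentinel: list ends here */
#define FREENEXT_NOT_IN_LIST    (-2)    /* assumed sentinel: buffer not listed */

/* toy buffer descriptors: only the freelist link matters here */
static int  freeNext[NBUFFERS];
static int  firstFreeBuffer = FREENEXT_END_OF_LIST;

/* push buf_id onto the head of the freelist, as StrategyFreeBuffer() does */
static void
free_buffer(int buf_id)
{
    if (freeNext[buf_id] == FREENEXT_NOT_IN_LIST)
    {
        freeNext[buf_id] = firstFreeBuffer;
        firstFreeBuffer = buf_id;
    }
}

/* pop the head of the freelist, roughly what StrategyGetBuffer() tries first */
static int
get_free_buffer(void)
{
    int     buf_id = firstFreeBuffer;

    if (buf_id < 0)
        return -1;              /* list empty: caller falls back to the clock sweep */
    firstFreeBuffer = freeNext[buf_id];
    freeNext[buf_id] = FREENEXT_NOT_IN_LIST;
    return buf_id;
}

int
main(void)
{
    for (int i = 0; i < NBUFFERS; i++)
        freeNext[i] = FREENEXT_NOT_IN_LIST;

    free_buffer(2);
    free_buffer(0);
    printf("pop -> %d\n", get_free_buffer());   /* 0 (freed last, head of list) */
    printf("pop -> %d\n", get_free_buffer());   /* 2 */
    printf("pop -> %d\n", get_free_buffer());   /* -1, freelist empty */
    return 0;
}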
3. Hash table expansion
void *
hash_search_with_hash_value(HTAB *hashp,
                            const void *keyPtr,
                            uint32 hashvalue,
                            HASHACTION action,
                            bool *foundPtr)
{
    HASHHDR    *hctl = hashp->hctl;
    int         freelist_idx = FREELIST_IDX(hctl, hashvalue);
    ...

    /*
     * If inserting, check if it is time to split a bucket.
     *
     * NOTE: failure to expand table is not a fatal error, it just means we
     * have to run at higher fill factor than we wanted. However, if we're
     * using the palloc allocator then it will throw error anyway on
     * out-of-memory, so we must do this before modifying the table.
     */
    if (action == HASH_ENTER || action == HASH_ENTER_NULL)
    {
        /*
         * Can't split if running in partitioned mode, nor if frozen, nor if
         * table is the subject of any active hash_seq_search scans.
         */
        if (hctl->freeList[0].nentries > (long) hctl->max_bucket &&
            !IS_PARTITIONED(hctl) && !hashp->frozen &&
            !has_seq_scans(hashp))
            (void) expand_table(hashp);
    }
    ...
The table can only be expanded when it is not in partitioned mode, is not frozen, and is not the subject of any active hash_seq_search scan.
The hashing scheme dynahash uses is known as linear hashing: after an expansion, only the entries of a single old bucket need to be rehashed. Growing (or shrinking) the table one bucket at a time keeps the amount of rehashing small, at the cost of a temporarily uneven distribution of entries across the buckets.
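The "only one old bucket needs splitting" property follows from how dynahash reduces a hash value to a bucket number with two masks. A standalone toy sketch of that masking (it mirrors the logic of calc_bucket(), it is not the actual source):

#include <stdio.h>
#include <stdint.h>

/* Toy linear-hash state: buckets 0..max_bucket exist; low_mask/high_mask
 * bracket max_bucket between two powers of two, as in dynahash. */
static uint32_t max_bucket = 3;     /* 4 buckets: 0..3 */
static uint32_t low_mask = 3;       /* (previous power of 2) - 1 */
static uint32_t high_mask = 7;      /* (next power of 2) - 1 */

static uint32_t
calc_bucket(uint32_t hashvalue)
{
    uint32_t    bucket = hashvalue & high_mask;

    if (bucket > max_bucket)
        bucket &= low_mask;         /* bucket doesn't exist yet: fold it back */
    return bucket;
}

int
main(void)
{
    uint32_t    samples[] = {0, 4, 12, 20, 5, 13};

    for (int i = 0; i < 6; i++)
        printf("hash %2u -> bucket %u\n",
               (unsigned) samples[i], (unsigned) calc_bucket(samples[i]));

    /* Expand: add bucket 4.  Only entries currently folded into bucket
     * (4 & low_mask) == 0 can move, and they can only move to bucket 4. */
    max_bucket = 4;
    printf("after adding bucket 4:\n");
    for (int i = 0; i < 6; i++)
        printf("hash %2u -> bucket %u\n",
               (unsigned) samples[i], (unsigned) calc_bucket(samples[i]));
    return 0;
}

Before the expansion, hash values whose masked value is 4 are folded back into bucket 0; once bucket 4 exists they map there directly, while entries in every other bucket keep their bucket number. The masks themselves are only readjusted when max_bucket crosses a power of two, as expand_table() below shows.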
/*
 * Expand the table by adding one more hash bucket.
 */
static bool
expand_table(HTAB *hashp)
{
    HASHHDR    *hctl = hashp->hctl;
    HASHSEGMENT old_seg,
                new_seg;
    long        old_bucket,
                new_bucket;
    long        new_segnum,
                new_segndx;
    long        old_segnum,
                old_segndx;
    HASHBUCKET *oldlink,
               *newlink;
    HASHBUCKET  currElement,
                nextElement;

    Assert(!IS_PARTITIONED(hctl));

#ifdef HASH_STATISTICS
    hash_expansions++;
#endif

    new_bucket = hctl->max_bucket + 1;
    new_segnum = new_bucket >> hashp->sshift;
    new_segndx = MOD(new_bucket, hashp->ssize);

    if (new_segnum >= hctl->nsegs)
    {
        /* Allocate new segment if necessary -- could fail if dir full */
        if (new_segnum >= hctl->dsize)
            if (!dir_realloc(hashp))
                return false;
        if (!(hashp->dir[new_segnum] = seg_alloc(hashp)))
            return false;
        hctl->nsegs++;
    }

    /* OK, we created a new bucket */
    hctl->max_bucket++;

    /*
     * *Before* changing masks, find old bucket corresponding to same hash
     * values; values in that bucket may need to be relocated to new bucket.
     * Note that new_bucket is certainly larger than low_mask at this point,
     * so we can skip the first step of the regular hash mask calc.
     */
    old_bucket = (new_bucket & hctl->low_mask);

    /*
     * If we crossed a power of 2, readjust masks.
     */
    if ((uint32) new_bucket > hctl->high_mask)
    {
        hctl->low_mask = hctl->high_mask;
        hctl->high_mask = (uint32) new_bucket | hctl->low_mask;
    }

    /*
     * Relocate records to the new bucket.  NOTE: because of the way the hash
     * masking is done in calc_bucket, only one old bucket can need to be
     * split at this point.  With a different way of reducing the hash value,
     * that might not be true!
     */
    old_segnum = old_bucket >> hashp->sshift;
    old_segndx = MOD(old_bucket, hashp->ssize);

    old_seg = hashp->dir[old_segnum];
    new_seg = hashp->dir[new_segnum];

    oldlink = &old_seg[old_segndx];
    newlink = &new_seg[new_segndx];

    for (currElement = *oldlink;
         currElement != NULL;
         currElement = nextElement)
    {
        nextElement = currElement->link;
        if ((long) calc_bucket(hctl, currElement->hashvalue) == old_bucket)
        {
            *oldlink = currElement;
            oldlink = &currElement->link;
        }
        else
        {
            *newlink = currElement;
            newlink = &currElement->link;
        }
    }

    /* don't forget to terminate the rebuilt hash chains... */
    *oldlink = NULL;
    *newlink = NULL;

    return true;
}
static bool
dir_realloc(HTAB *hashp)
{
    HASHSEGMENT *p;
    HASHSEGMENT *old_p;
    long        new_dsize;
    long        old_dirsize;
    long        new_dirsize;

    if (hashp->hctl->max_dsize != NO_MAX_DSIZE)
        return false;

    /* Reallocate directory */
    new_dsize = hashp->hctl->dsize << 1;
    old_dirsize = hashp->hctl->dsize * sizeof(HASHSEGMENT);
    new_dirsize = new_dsize * sizeof(HASHSEGMENT);

    old_p = hashp->dir;
    CurrentDynaHashCxt = hashp->hcxt;
    p = (HASHSEGMENT *) hashp->alloc((Size) new_dirsize);

    if (p != NULL)
    {
        memcpy(p, old_p, old_dirsize);
        MemSet(((char *) p) + old_dirsize, 0, new_dirsize - old_dirsize);
        hashp->dir = p;
        hashp->hctl->dsize = new_dsize;

        /* XXX assume the allocator is palloc, so we know how to free */
        Assert(hashp->alloc == DynaHashAlloc);

        pfree(old_p);

        return true;
    }

    return false;
}
The shared buffer lookup table does not satisfy these conditions: it is created in partitioned mode, so it is never expanded.
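Concretely, the shared buffer mapping table is created with the HASH_PARTITION flag. The fragment below is a paraphrase of its initialization in src/backend/storage/buffer/buf_table.c (from memory, so treat the exact fields and flags as approximate; they vary slightly across versions):

    /* paraphrased from InitBufTable(), not verbatim source */
    HASHCTL     info;

    info.keysize = sizeof(BufferTag);
    info.entrysize = sizeof(BufferLookupEnt);
    info.num_partitions = NUM_BUFFER_PARTITIONS;

    SharedBufHash = ShmemInitHash("Shared Buffer Lookup Table",
                                  size, size,
                                  &info,
                                  HASH_ELEM | HASH_BLOBS | HASH_PARTITION);

Because num_partitions is nonzero, IS_PARTITIONED(hctl) is true for this table, so the expansion branch in hash_search_with_hash_value() is never taken and the table keeps the number of buckets it was sized for at startup.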