本小节将从源码角度重点讲解postgres中共享内存和信号量创建,其接口函数为CreateSharedMemoryAndSemaphores。postmaster调用此函数时会初始化共享内存和信号量,而其他进程调用不进行初始化,仅获取全局变量:共享内存指针和信号量指针。关键数据结构知识回顾见 postgres 源码解析33 进程间通信–1
1) 计算共享内存总大小
2)为各个模块分配相应大小的内存空间
3)初始化共享内存头指针
4)初始化各模块内存空间
5)启动动态共享内存设施
本文讲解蓝色字体模块逻辑
1 计算共享内存总大小
/*
 * CreateSharedMemoryAndSemaphores (excerpt: size-estimation phase only)
 *
 * According to the accompanying article, the postmaster calls this to
 * create and initialize shared memory and semaphores, while other processes
 * do not perform the initialization.  This excerpt ends once the total
 * segment size has been computed and logged; the remainder of the function
 * is not shown.
 */
void
CreateSharedMemoryAndSemaphores(void)
{
PGShmemHeader *shim = NULL;
/* Size estimation happens only when IsUnderPostmaster is false. */
if (!IsUnderPostmaster)
{
PGShmemHeader *seghdr;
Size size;
int numSemas;
/* Semaphore count = ProcGlobalSemas() + SpinlockSemas(). */
numSemas = ProcGlobalSemas();
numSemas += SpinlockSemas();
/*
 * Start from a fixed 100000 bytes.
 * NOTE(review): presumably headroom for small items that have no estimator
 * of their own -- confirm.
 */
size = 100000;
/*
 * Fold every subsystem's size estimate into the running total via
 * add_size(), beginning with the semaphore and spinlock-semaphore areas
 * and the shared-memory index hash table.
 */
size = add_size(size, PGSemaphoreShmemSize(numSemas));
size = add_size(size, SpinlockSemaSize());
size = add_size(size, hash_estimate_size(SHMEM_INDEX_SIZE,
sizeof(ShmemIndexEnt)));
size = add_size(size, dsm_estimate_size());
size = add_size(size, BufferShmemSize());
size = add_size(size, LockShmemSize());
size = add_size(size, PredicateLockShmemSize());
size = add_size(size, ProcGlobalShmemSize());
size = add_size(size, XLOGShmemSize());
size = add_size(size, CLOGShmemSize());
size = add_size(size, CommitTsShmemSize());
size = add_size(size, SUBTRANSShmemSize());
size = add_size(size, TwoPhaseShmemSize());
size = add_size(size, BackgroundWorkerShmemSize());
size = add_size(size, MultiXactShmemSize());
size = add_size(size, LWLockShmemSize());
size = add_size(size, ProcArrayShmemSize());
size = add_size(size, BackendStatusShmemSize());
size = add_size(size, SInvalShmemSize());
size = add_size(size, PMSignalShmemSize());
size = add_size(size, ProcSignalShmemSize());
size = add_size(size, CheckpointerShmemSize());
size = add_size(size, AutoVacuumShmemSize());
size = add_size(size, ReplicationSlotsShmemSize());
size = add_size(size, ReplicationOriginShmemSize());
size = add_size(size, WalSndShmemSize());
size = add_size(size, WalRcvShmemSize());
size = add_size(size, PgArchShmemSize());
size = add_size(size, ApplyLauncherShmemSize());
size = add_size(size, SnapMgrShmemSize());
size = add_size(size, BTreeShmemSize());
size = add_size(size, SyncScanShmemSize());
size = add_size(size, AsyncShmemSize());
/* The backend array is counted only in EXEC_BACKEND builds. */
#ifdef EXEC_BACKEND
size = add_size(size, ShmemBackendArraySize());
#endif
/*
 * Clear addin_request_allowed, then add the accumulated add-in total.
 * NOTE(review): presumably the flag stops later add-in requests from
 * changing total_addin_request after it has been counted -- confirm.
 */
addin_request_allowed = false;
size = add_size(size, total_addin_request);
/*
 * Pad up to a multiple of 8192.  When size is already a multiple of 8192
 * the expression still adds a full extra 8192 bytes.
 */
size = add_size(size, 8192 - (size % 8192));
/* Log the final request size at DEBUG3. */
elog(DEBUG3, "invoking IpcMemoryCreate(size=%zu)", size);
2 PGSharedMemoryCreate
1)首先进行前置检查:通过 stat 获取 DataDir 目录信息(失败则报 FATAL),并断言申请的内存大小大于 PGShmemHeader 结构体大小等;
2)根据共享内存类型shared_memory_type调用对应的方法申请共享内存,如SHMEM_TYPE_MMAP则调用CreateAnonymousSegment申请匿名内存段;
3)紧接着为指定的 IPC key分配一个新的内存段,将这块内存段 Attach在当前进程,注册on_shmem_exit 回调函数释放此存储空间;
4)初始化标准头信息 PGShmemHeader,返回申请内存的头地址。
/*
 * PGSharedMemoryCreate
 *
 * Obtain a shared memory segment for this data directory and fill in its
 * PGShmemHeader.
 *
 * size: requested segment size; asserted to be larger than
 *       MAXALIGN(sizeof(PGShmemHeader)).
 * shim: output; set to the header of the segment returned by
 *       InternalIpcMemoryCreate().
 *
 * Returns the header of that same segment when no anonymous segment is in
 * use; otherwise the header is copied to the start of the anonymous segment
 * and the anonymous segment's address is returned.
 *
 * Reports FATAL if DataDir cannot be stat'ed, or if an existing block under
 * the probed key is still attached or its state cannot be analyzed.
 */
PGShmemHeader *
PGSharedMemoryCreate(Size size,
PGShmemHeader **shim)
{
IpcMemoryKey NextShmemSegID;
void *memAddress;
PGShmemHeader *hdr;
struct stat statbuf;
Size sysvsize;
/*
 * The data directory's stat info provides the first key to probe (st_ino)
 * and the device/inode values stored in the header further down.
 */
if (stat(DataDir, &statbuf) < 0)
ereport(FATAL,
(errcode_for_file_access(),
errmsg("could not stat data directory \"%s\": %m",
DataDir)));
/* Without MAP_HUGETLB, huge_pages == HUGE_PAGES_ON is rejected outright. */
#if !defined(MAP_HUGETLB)
if (huge_pages == HUGE_PAGES_ON)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("huge pages not supported on this platform")));
#endif
/* The request must be larger than the aligned header itself. */
Assert(size > MAXALIGN(sizeof(PGShmemHeader)));
if (shared_memory_type == SHMEM_TYPE_MMAP)
{
/*
 * size is passed by address, and the value left in it afterwards is what
 * gets recorded as AnonymousShmemSize and, below, as hdr->totalsize.
 */
AnonymousShmem = CreateAnonymousSegment(&size);
AnonymousShmemSize = size;
/* Register AnonymousShmemDetach as a shmem-exit callback. */
on_shmem_exit(AnonymousShmemDetach, (Datum) 0);
/* In this mode the keyed segment only has to hold the header. */
sysvsize = sizeof(PGShmemHeader);
}
else
sysvsize = size;
/* Probe keys starting from the data directory's inode number. */
NextShmemSegID = statbuf.st_ino;
for (;;)
{
IpcMemoryId shmid;
PGShmemHeader *oldhdr;
IpcMemoryState state;
/* Try to create a segment under the current key; stop on success. */
memAddress = InternalIpcMemoryCreate(NextShmemSegID, sysvsize);
if (memAddress)
break;
/* Creation failed: look up whatever already exists under this key. */
shmid = shmget(NextShmemSegID, sizeof(PGShmemHeader), 0);
if (shmid < 0)
{
/* The existing segment cannot be looked up: treat it as foreign. */
oldhdr = NULL;
state = SHMSTATE_FOREIGN;
}
else
state = PGSharedMemoryAttach(shmid, NULL, &oldhdr);
switch (state)
{
/* Still in use, or state undeterminable: refuse to start. */
case SHMSTATE_ANALYSIS_FAILURE:
case SHMSTATE_ATTACHED:
ereport(FATAL,
(errcode(ERRCODE_LOCK_FILE_EXISTS),
errmsg("pre-existing shared memory block (key %lu, ID %lu) is still in use",
(unsigned long) NextShmemSegID,
(unsigned long) shmid),
errhint("Terminate any old server processes associated with data directory \"%s\".",
DataDir)));
break;
/* Only logged; the key is unchanged, so the same key is retried. */
case SHMSTATE_ENOENT:
elog(LOG,
"shared memory block (key %lu, ID %lu) deleted during startup",
(unsigned long) NextShmemSegID,
(unsigned long) shmid);
break;
/* Not ours: move on to the next key. */
case SHMSTATE_FOREIGN:
NextShmemSegID++;
break;
/*
 * Unattached old segment: clean up via the control segment recorded in the
 * old header when one is set, then remove the old segment.  If removal
 * fails, advance to the next key; otherwise the same key is retried.
 */
case SHMSTATE_UNATTACHED:
if (oldhdr->dsm_control != 0)
dsm_cleanup_using_control_segment(oldhdr->dsm_control);
if (shmctl(shmid, IPC_RMID, NULL) < 0)
NextShmemSegID++;
break;
}
/* Detach the old header if one was handed back; failure is only logged. */
if (oldhdr && shmdt(oldhdr) < 0)
elog(LOG, "shmdt(%p) failed: %m", oldhdr);
}
/* Fill in the header of the newly obtained segment. */
hdr = (PGShmemHeader *) memAddress;
hdr->creatorPID = getpid();
hdr->magic = PGShmemMagic;
hdr->dsm_control = 0;
/* Record the data directory's device and inode numbers. */
hdr->device = statbuf.st_dev;
hdr->inode = statbuf.st_ino;
/* Free space starts right after the aligned header. */
hdr->totalsize = size;
hdr->freeoffset = MAXALIGN(sizeof(PGShmemHeader));
*shim = hdr;
/* Remember the address and key of the segment in use. */
UsedShmemSegAddr = memAddress;
UsedShmemSegID = (unsigned long) NextShmemSegID;
/* No anonymous segment: the keyed segment is what the caller gets. */
if (AnonymousShmem == NULL)
return hdr;
/* Otherwise copy the header into the anonymous segment and return that. */
memcpy(AnonymousShmem, hdr, sizeof(PGShmemHeader));
return (PGShmemHeader *) AnonymousShmem;
}
3 InitShmemAccess
该函数负责初始化全局的共享内存指针,方便后续各模块内存初始化调用
/*
 * InitShmemAccess
 *
 * Record where the shared memory segment lives in this process's globals:
 * the header pointer (ShmemSegHdr), the base address (ShmemBase) and the
 * end address (ShmemEnd = base + header's totalsize).
 *
 * seghdr: address of the segment; it is interpreted as a PGShmemHeader.
 */
void
InitShmemAccess(void *seghdr)
{
	PGShmemHeader *header = (PGShmemHeader *) seghdr;
	char	   *start = (char *) seghdr;

	ShmemSegHdr = header;
	ShmemBase = (void *) start;
	ShmemEnd = start + header->totalsize;
}
4 PGReserveSemaphores
该函数用于初始化信号量模块的共享内存,以支持进程间通信
/*
 * PGReserveSemaphores
 *
 * Set up the bookkeeping needed before semaphores can be handed out:
 * carve the semaphore array out of shared memory, allocate the local
 * array of semaphore-set IDs, seed the key search, and register the
 * cleanup callback.
 *
 * maxSemas: maximum number of semaphores that will be needed.
 *
 * Reports FATAL if DataDir cannot be stat'ed and PANIC if the local
 * set-ID array cannot be allocated.
 */
void
PGReserveSemaphores(int maxSemas)
{
	struct stat datadir_stat;

	/* The data directory's inode number seeds the semaphore key search. */
	if (stat(DataDir, &datadir_stat) < 0)
	{
		ereport(FATAL,
				(errcode_for_file_access(),
				 errmsg("could not stat data directory \"%s\": %m",
						DataDir)));
	}

	/* Shared array of semaphores, allocated without taking the shmem lock. */
	sharedSemas = (PGSemaphore) ShmemAllocUnlocked(PGSemaphoreShmemSize(maxSemas));
	maxSharedSemas = maxSemas;
	numSharedSemas = 0;

	/* One set ID per SEMAS_PER_SET semaphores, rounded up. */
	maxSemaSets = (maxSemas + SEMAS_PER_SET - 1) / SEMAS_PER_SET;
	mySemaSets = (IpcSemaphoreId *) malloc(maxSemaSets * sizeof(IpcSemaphoreId));
	if (!mySemaSets)
		elog(PANIC, "out of memory");

	/*
	 * NOTE(review): starting nextSemaNumber at SEMAS_PER_SET presumably
	 * makes the first request allocate a fresh set -- confirm.
	 */
	nextSemaNumber = SEMAS_PER_SET;
	nextSemaKey = datadir_stat.st_ino;
	numSemaSets = 0;

	on_shmem_exit(ReleaseSemaphores, 0);
}
5 SpinlockSemaInit
该函数用于初始化模拟 SpinLock 所用的信号量:在不支持原生 spinlock 的平台上,postgres 使用信号量来模拟 spinlock,此处逐个创建这些信号量并保存到 SpinlockSemaArray
/*
 * SpinlockSemaInit
 *
 * Allocate an array of SpinlockSemaSize() bytes in shared memory (without
 * taking the shmem lock), create SpinlockSemas() semaphores into it in
 * index order, and publish the array through SpinlockSemaArray.
 *
 * NOTE(review): presumably only relevant where spinlocks are emulated with
 * semaphores -- confirm against the build configuration.
 */
void
SpinlockSemaInit(void)
{
	int			count = SpinlockSemas();
	int			idx = 0;
	PGSemaphore *sema_array;

	sema_array = (PGSemaphore *) ShmemAllocUnlocked(SpinlockSemaSize());

	while (idx < count)
	{
		sema_array[idx] = PGSemaphoreCreate();
		idx++;
	}

	/* Publish only after every slot has been filled in. */
	SpinlockSemaArray = sema_array;
}
6 InitShmemAllocation
该函数用于初始化全局变量 ShmemLock,在后续各模块申请内存空间需持有该锁;同时初始化事务管理器中OID/XID相关的全局结构体:ShmemVariableCache
/*
 * InitShmemAllocation
 *
 * Prepare the shared memory allocator state: create and initialize
 * ShmemLock, move the segment's free offset up to a cache-line boundary,
 * clear the shmem index pointers, and allocate a zeroed ShmemVariableCache.
 *
 * Requires ShmemSegHdr to have been set already (asserted).
 */
void
InitShmemAllocation(void)
{
	PGShmemHeader *shmhdr = ShmemSegHdr;
	char	   *seg_start;
	char	   *first_free;

	Assert(shmhdr != NULL);

	/* The lock itself is allocated without locking, then initialized. */
	ShmemLock = (slock_t *) ShmemAllocUnlocked(sizeof(slock_t));
	SpinLockInit(ShmemLock);

	/* Round the current free position up to a cache-line boundary. */
	seg_start = (char *) shmhdr;
	first_free = (char *) (CACHELINEALIGN((seg_start + shmhdr->freeoffset)));
	shmhdr->freeoffset = first_free - seg_start;

	/* No shmem index yet, neither in the header nor in this process. */
	ShmemIndex = (HTAB *) NULL;
	shmhdr->index = NULL;

	/* Allocate ShmemVariableCache through ShmemAlloc() and zero it. */
	ShmemVariableCache = (VariableCache) ShmemAlloc(sizeof(*ShmemVariableCache));
	memset(ShmemVariableCache, 0, sizeof(*ShmemVariableCache));
}
Original: https://blog.csdn.net/qq_52668274/article/details/127776509
Author: Serendipity_Shy
Title: postgres 源码解析34 进程间通信–2
原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/654706/
转载文章受原作者版权保护。转载请注明原作者出处!