postgres 源码解析34 进程间通信–2

本小节将从源码角度重点讲解postgres中共享内存和信号量创建,其接口函数为CreateSharedMemoryAndSemaphores。postmaster调用此函数时会初始化共享内存和信号量,而其他进程调用不进行初始化,仅获取全局变量:共享内存指针和信号量指针。关键数据结构知识回顾见 postgres 源码解析33 进程间通信–1

1) 计算共享内存总大小
2)为各个模块分配相应大小的内存空间
3)初始化共享内存头指针
4)初始化各模块内存空间
5)启动动态共享内存设施

本文讲解蓝色字体模块逻辑

1 计算共享内存总大小


/*
 * CreateSharedMemoryAndSemaphores (excerpt: size-estimation phase only)
 *
 * The part shown here runs only when IsUnderPostmaster is false.  It counts
 * the semaphores that will be needed and then sums the shared-memory
 * requirements reported by each subsystem's *ShmemSize() estimator into
 * `size`.  The remainder of the function lies outside this excerpt.
 */
void
CreateSharedMemoryAndSemaphores(void)
{
    PGShmemHeader *shim = NULL;

    if (!IsUnderPostmaster)
    {
        PGShmemHeader *seghdr;
        Size        size;
        int         numSemas;

        /* Total semaphore count: per-process semaphores plus spinlock ones */
        numSemas = ProcGlobalSemas();
        numSemas += SpinlockSemas();

        /*
         * Start from a fixed base figure and accumulate every subsystem's
         * estimate through add_size().
         * NOTE(review): 100000 looks like slop for small miscellaneous
         * allocations, and add_size() presumably guards against overflow --
         * neither is visible in this excerpt; confirm.
         */
        size = 100000;
        size = add_size(size, PGSemaphoreShmemSize(numSemas));
        size = add_size(size, SpinlockSemaSize());
        size = add_size(size, hash_estimate_size(SHMEM_INDEX_SIZE,
                                                 sizeof(ShmemIndexEnt)));
        size = add_size(size, dsm_estimate_size());
        size = add_size(size, BufferShmemSize());
        size = add_size(size, LockShmemSize());
        size = add_size(size, PredicateLockShmemSize());
        size = add_size(size, ProcGlobalShmemSize());
        size = add_size(size, XLOGShmemSize());
        size = add_size(size, CLOGShmemSize());
        size = add_size(size, CommitTsShmemSize());
        size = add_size(size, SUBTRANSShmemSize());
        size = add_size(size, TwoPhaseShmemSize());
        size = add_size(size, BackgroundWorkerShmemSize());
        size = add_size(size, MultiXactShmemSize());
        size = add_size(size, LWLockShmemSize());
        size = add_size(size, ProcArrayShmemSize());
        size = add_size(size, BackendStatusShmemSize());
        size = add_size(size, SInvalShmemSize());
        size = add_size(size, PMSignalShmemSize());
        size = add_size(size, ProcSignalShmemSize());
        size = add_size(size, CheckpointerShmemSize());
        size = add_size(size, AutoVacuumShmemSize());
        size = add_size(size, ReplicationSlotsShmemSize());
        size = add_size(size, ReplicationOriginShmemSize());
        size = add_size(size, WalSndShmemSize());
        size = add_size(size, WalRcvShmemSize());
        size = add_size(size, PgArchShmemSize());
        size = add_size(size, ApplyLauncherShmemSize());
        size = add_size(size, SnapMgrShmemSize());
        size = add_size(size, BTreeShmemSize());
        size = add_size(size, SyncScanShmemSize());
        size = add_size(size, AsyncShmemSize());
#ifdef EXEC_BACKEND
        /* Only counted in EXEC_BACKEND builds */
        size = add_size(size, ShmemBackendArraySize());
#endif

        /*
         * Clear the add-in request flag, then include the space add-ins have
         * requested so far.
         * NOTE(review): presumably this freezes further add-in requests so the
         * total cannot change after being counted -- confirm.
         */
        addin_request_allowed = false;
        size = add_size(size, total_addin_request);

        /*
         * Pad up to a multiple of 8192 bytes.  When size is already an exact
         * multiple, this expression still adds a full 8192.
         */
        size = add_size(size, 8192 - (size % 8192));

        elog(DEBUG3, "invoking IpcMemoryCreate(size=%zu)", size);

2 PGSharedMemoryCreate
1)首先进行安全性检查:通过 stat 获取 DataDir 目录的元信息(inode、设备号),并断言申请的内存大小大于对齐后的 PGShmemHeader 结构体大小等;
2)根据共享内存类型shared_memory_type调用对应的方法申请共享内存,如SHMEM_TYPE_MMAP则调用CreateAnonymousSegment申请匿名内存段;
3)紧接着为指定的 IPC key分配一个新的内存段,将这块内存段 Attach在当前进程,注册on_shmem_exit 回调函数释放此存储空间;
4)初始化标准头信息 PGShmemHeader,返回申请内存的头地址。


/*
 * PGSharedMemoryCreate
 *
 * Create and attach the main shared memory segment and fill in its standard
 * PGShmemHeader.
 *
 * size: requested total size in bytes; asserted to exceed
 *       MAXALIGN(sizeof(PGShmemHeader)).  In the SHMEM_TYPE_MMAP case it is
 *       passed by pointer to CreateAnonymousSegment(), which may adjust it.
 * shim: output; receives the header of the block obtained from
 *       InternalIpcMemoryCreate().
 *
 * Returns that same header when no anonymous segment is in use; otherwise
 * returns AnonymousShmem after copying the header into it.
 *
 * Reports FATAL if DataDir cannot be stat'ed or if a pre-existing block for
 * the chosen key is still in use, and ERROR if huge pages are demanded on a
 * build without MAP_HUGETLB.
 */
PGShmemHeader *
PGSharedMemoryCreate(Size size,
                     PGShmemHeader **shim)
{
    IpcMemoryKey NextShmemSegID;
    void       *memAddress;
    PGShmemHeader *hdr;
    struct stat statbuf;
    Size        sysvsize;

    /*
     * The data directory's device and inode numbers are recorded in the
     * header below, and the inode also seeds the IPC key search.
     */
    if (stat(DataDir, &statbuf) < 0)
        ereport(FATAL,
                (errcode_for_file_access(),
                 errmsg("could not stat data directory \"%s\": %m",
                        DataDir)));

    /* Without MAP_HUGETLB, huge_pages = on cannot be honored */
#if !defined(MAP_HUGETLB)
    if (huge_pages == HUGE_PAGES_ON)
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("huge pages not supported on this platform")));
#endif

    /* The request must leave room for at least the header */
    Assert(size > MAXALIGN(sizeof(PGShmemHeader)));

    if (shared_memory_type == SHMEM_TYPE_MMAP)
    {
        AnonymousShmem = CreateAnonymousSegment(&size);
        AnonymousShmemSize = size;

        /* Arrange for AnonymousShmemDetach to run at shmem exit */
        on_shmem_exit(AnonymousShmemDetach, (Datum) 0);

        /* The keyed block then only needs to hold a header */
        sysvsize = sizeof(PGShmemHeader);
    }
    else
        sysvsize = size;

    /* Begin the key search at the data directory's inode number */
    NextShmemSegID = statbuf.st_ino;

    /* Loop until InternalIpcMemoryCreate() succeeds for some key */
    for (;;)
    {
        IpcMemoryId shmid;
        PGShmemHeader *oldhdr;
        IpcMemoryState state;

        /* Try to create a new block under the current key */
        memAddress = InternalIpcMemoryCreate(NextShmemSegID, sysvsize);
        if (memAddress)
            break;

        /*
         * Creation failed: look up whatever already exists under this key.
         * If shmget() itself fails, the block is classified as foreign.
         */
        shmid = shmget(NextShmemSegID, sizeof(PGShmemHeader), 0);
        if (shmid < 0)
        {
            oldhdr = NULL;
            state = SHMSTATE_FOREIGN;
        }
        else
            state = PGSharedMemoryAttach(shmid, NULL, &oldhdr);

        switch (state)
        {
            case SHMSTATE_ANALYSIS_FAILURE:
            case SHMSTATE_ATTACHED:
                /* Cannot safely reuse or remove the block: give up */
                ereport(FATAL,
                        (errcode(ERRCODE_LOCK_FILE_EXISTS),
                         errmsg("pre-existing shared memory block (key %lu, ID %lu) is still in use",
                                (unsigned long) NextShmemSegID,
                                (unsigned long) shmid),
                         errhint("Terminate any old server processes associated with data directory \"%s\".",
                                 DataDir)));
                break;
            case SHMSTATE_ENOENT:

                /*
                 * The block vanished in the meantime.  The key is left
                 * unchanged, so the next iteration retries the same key.
                 */
                elog(LOG,
                     "shared memory block (key %lu, ID %lu) deleted during startup",
                     (unsigned long) NextShmemSegID,
                     (unsigned long) shmid);
                break;
            case SHMSTATE_FOREIGN:
                /* Not ours: move on to the next key */
                NextShmemSegID++;
                break;
            case SHMSTATE_UNATTACHED:

                /*
                 * Clean up via the old header's DSM control segment if one is
                 * recorded, then remove the old block.  If removal fails,
                 * advance to the next key; otherwise the same key is retried.
                 */
                if (oldhdr->dsm_control != 0)
                    dsm_cleanup_using_control_segment(oldhdr->dsm_control);
                if (shmctl(shmid, IPC_RMID, NULL) < 0)
                    NextShmemSegID++;
                break;
        }

        /* Detach from the old block if we attached to it; failure is only logged */
        if (oldhdr && shmdt(oldhdr) < 0)
            elog(LOG, "shmdt(%p) failed: %m", oldhdr);
    }

    /* Initialize the header of the newly created block */
    hdr = (PGShmemHeader *) memAddress;
    hdr->creatorPID = getpid();
    hdr->magic = PGShmemMagic;
    hdr->dsm_control = 0;

    /* Record the data directory's identity */
    hdr->device = statbuf.st_dev;
    hdr->inode = statbuf.st_ino;

    /* Space accounting: total size, and first free byte just past the header */
    hdr->totalsize = size;
    hdr->freeoffset = MAXALIGN(sizeof(PGShmemHeader));
    *shim = hdr;

    /* Remember the attached address and the key that was finally used */
    UsedShmemSegAddr = memAddress;
    UsedShmemSegID = (unsigned long) NextShmemSegID;

    /*
     * With no anonymous segment, the keyed block is the real segment.
     * Otherwise copy the header into the anonymous segment and return that
     * instead; the keyed block remains available to the caller via *shim.
     */
    if (AnonymousShmem == NULL)
        return hdr;
    memcpy(AnonymousShmem, hdr, sizeof(PGShmemHeader));
    return (PGShmemHeader *) AnonymousShmem;
}

3 InitShmemAccess
该函数负责初始化全局的共享内存指针,方便后续各模块内存初始化调用


/*
 * InitShmemAccess
 *
 * Point the shared-memory globals at an existing segment.
 *
 * seghdr: address of the segment, which begins with a PGShmemHeader.
 *
 * Sets ShmemSegHdr and ShmemBase to the start of the segment and ShmemEnd to
 * the address totalsize bytes past the start.
 */
void
InitShmemAccess(void *seghdr)
{
    PGShmemHeader *header = (PGShmemHeader *) seghdr;

    ShmemSegHdr = header;
    ShmemBase = (void *) header;

    /* End address is derived from the size recorded in the header */
    ShmemEnd = (char *) ShmemBase + header->totalsize;
}

4 PGReserveSemaphores
该函数用于初始化信号量模块的共享内存,以支持进程间通信


/*
 * PGReserveSemaphores
 *
 * Set up the bookkeeping for up to maxSemas semaphores.
 *
 * maxSemas: upper bound on the number of semaphores that will be created.
 *
 * Allocates the shared PGSemaphore array with ShmemAllocUnlocked(), allocates
 * the process-local array of semaphore-set IDs with malloc(), seeds the
 * semaphore key search from the data directory's inode number, and registers
 * ReleaseSemaphores as a shmem-exit callback.
 *
 * Reports FATAL if DataDir cannot be stat'ed and PANIC if malloc() fails.
 */
void
PGReserveSemaphores(int maxSemas)
{
    struct stat dirInfo;

    if (stat(DataDir, &dirInfo) < 0)
    {
        ereport(FATAL,
                (errcode_for_file_access(),
                 errmsg("could not stat data directory \"%s\": %m",
                        DataDir)));
    }

    /* Shared array holding the semaphores themselves; none handed out yet */
    sharedSemas = (PGSemaphore)
        ShmemAllocUnlocked(PGSemaphoreShmemSize(maxSemas));
    maxSharedSemas = maxSemas;
    numSharedSemas = 0;

    /* Number of sets needed, rounding up to whole sets */
    maxSemaSets = (maxSemas + SEMAS_PER_SET - 1) / SEMAS_PER_SET;

    /* Process-local array of set identifiers */
    mySemaSets = (IpcSemaphoreId *)
        malloc(maxSemaSets * sizeof(IpcSemaphoreId));
    if (!mySemaSets)
    {
        elog(PANIC, "out of memory");
    }

    /* No sets exist yet; key search starts at the data directory's inode */
    numSemaSets = 0;
    nextSemaKey = dirInfo.st_ino;

    /* Starts at SEMAS_PER_SET, i.e. the "current set" counts as already full */
    nextSemaNumber = SEMAS_PER_SET;

    on_shmem_exit(ReleaseSemaphores, 0);
}

5 SpinlockSemaInit
该函数用于初始化模拟 SpinLock 的信号量:在没有原生 spinlock 实现的平台上,PostgreSQL 使用信号量来模拟 spinlock;若 SpinlockSemas() 返回 0,则循环不会创建任何信号量


/*
 * SpinlockSemaInit
 *
 * Allocate the array of spinlock semaphores with ShmemAllocUnlocked(), create
 * one semaphore per slot, and publish the array through SpinlockSemaArray.
 * The slot count comes from SpinlockSemas() and the array size from
 * SpinlockSemaSize().
 */
void
SpinlockSemaInit(void)
{
    int         semaCount = SpinlockSemas();
    PGSemaphore *semaArray;
    int         slot = 0;

    semaArray = (PGSemaphore *) ShmemAllocUnlocked(SpinlockSemaSize());

    /* Fill every slot with a freshly created semaphore */
    while (slot < semaCount)
    {
        semaArray[slot] = PGSemaphoreCreate();
        slot++;
    }

    SpinlockSemaArray = semaArray;
}

6 InitShmemAllocation
该函数用于初始化全局变量 ShmemLock,在后续各模块申请内存空间需持有该锁;同时初始化事务管理器中OID/XID相关的全局结构体:ShmemVariableCache


/*
 * InitShmemAllocation
 *
 * Prepare the shared memory segment for general allocation:
 *   - allocate and initialize the ShmemLock spinlock,
 *   - advance the header's freeoffset to a cache-line boundary,
 *   - mark the shmem index as not yet created,
 *   - allocate and zero ShmemVariableCache.
 *
 * Requires ShmemSegHdr to be set already (asserted).
 */
void
InitShmemAllocation(void)
{
    PGShmemHeader *hdr = ShmemSegHdr;
    char       *segStart;
    char       *nextFree;

    Assert(hdr != NULL);

    /* The lock itself is obtained through the unlocked allocator */
    ShmemLock = (slock_t *) ShmemAllocUnlocked(sizeof(slock_t));
    SpinLockInit(ShmemLock);

    /* Round the next free address up to a cache-line boundary */
    segStart = (char *) hdr;
    nextFree = (char *) (CACHELINEALIGN((segStart + hdr->freeoffset)));
    hdr->freeoffset = nextFree - segStart;

    /* No index exists at this point */
    hdr->index = NULL;
    ShmemIndex = (HTAB *) NULL;

    /* Allocate ShmemVariableCache and start it out zeroed */
    ShmemVariableCache = (VariableCache)
        ShmemAlloc(sizeof(*ShmemVariableCache));
    memset(ShmemVariableCache, 0, sizeof(*ShmemVariableCache));
}

Original: https://blog.csdn.net/qq_52668274/article/details/127776509
Author: Serendipity_Shy
Title: postgres 源码解析34 进程间通信–2

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/654706/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球