numpy的并行多进程处理

因为要处理接近32G+数据集,为提高处理效率,没有使用数据库,直接在大内存的机器上用了numpy。

对数据集还要做一些计算,包含逻辑工作流,不能简单向量化。又想进一步提高处理速度,就需要并行化。

关于numpy的ndarray的并行化处理,已经又前人研究过,使用multiprocessing.shared_memory 模块。比较优秀的有:Python多进程处理(读、写)numpy矩阵

这里作一些补充。

1、在Windows系统中(win10 64位),RawArray进入子进程后,会出现”mmap.mmap(-1,…) … OSError: [WinError 1450] 系统资源不足,无法完成请求的服务”的错误,也有可能是” OSError: [WinError 1455] 页面文件太小,无法完成操作”的错误。

虽然在任务管理器中,明显看到实际分配的RawArray还远小于实际内存的容量,但是还是报错。这个问题在linux中不会发生。

解决办法是设置Windows的虚拟内存为固定值,固定值只管放大到比实际内存大,甚至可以到2倍,而且不必放到SSD中,放到机械硬盘里都行。程序实际运行中,可以看到硬盘是不工作的。

在程序运行结束时,还要注意主动 del RawArray,回收一下内存。否则下一次运行时,Windows还可能会报”[WinError 1455] 页面文件太小”的错误。

numpy的并行多进程处理

2、numpy的向量化加速问题。Anaconda Python的numpy是按intel MKL配置加速的,理论上是有向量加速的。但是经过测试,多进程并行处理,还是比单进程要快,所以可以认为intel MKL的向量加速只是基于单核的,如有必要,还是可以把单个numpy.ndarray进行分块,进行并行处理提速。

3、共享的RawArray的并行处理,还是建议使用multiprocessing.Processing,而不要用multiprocessing.Pool。因为前者的父子进程具有继承性,参数传递上无需global化,比较方便。

附上演示代码

-*- coding: utf-8 -*-
from __future__ import absolute_import, print_function

import multiprocessing
import os, sys, time, gc, psutil
import time
import numpy
import multiprocessing.managers
import multiprocessing.sharedctypes

def multi_run (func, imapS, *args):
    # 类似 pool,但是可以形成 子进程 的继承关系,方便传递 共享内存变量
    print ("len(imapS) ", len(imapS))
    ProcS = []
    for s in imapS:
        p = multiprocessing.Process(target=func,args=(s, *args))
        p.daemon = True
        p.start()
        print ("start .. ", psutil.virtual_memory().used)
        ProcS.append (p)

    for p in ProcS:
        p.join()

def Split_List (mList, mSplit):
    # 把一个大 list 基本平均切分成 多份,用于后续的 mulitprocessing 处理
    mp_Length = len(mList)
    step = mp_Length / mSplit # 每份的长度
    if step < 1:
        step = 1
    elif step != int(step):
        step = int(step) + 1
        # 取整后再加1的目的是,不要把前面n个进程余下来的任务,都压倒最后一个进程上
        # 相反,是取最后一个进程工作量不足的,分担到前面的进程中去
    ret = []
    c = 0
    while c < mp_Length :
        ret.append (mList[c: c + step])
        c += step
    return ret

def write_m (mrange, gm, gmp_Count, gstatic_C, static_LL):
    m = numpy.frombuffer(gm, dtype=numpy.uint8)
    m = numpy.reshape (m, (gmp_Count*gstatic_C , static_LL))
    for i in mrange:
        m[i,:] = i+1

def calc_m (mrange, gm, gmp_Count, gstatic_C, static_LL):
    m = numpy.frombuffer(gm, dtype=numpy.uint8)
    m = numpy.reshape (m, (gmp_Count*gstatic_C , static_LL))
    m[mrange,:] = numpy.exp (m[mrange,:])

if __name__ == "__main__":
    multiprocessing.freeze_support()
    mp_Count = multiprocessing.cpu_count()
    # 以下两个参数可根据运行的机器的实际内存容量进行调整,目前参数值适合于32G内存机器
    static_C = 2
    static_LL = 1024 *1024* 512

    raw_arr_m = multiprocessing.sharedctypes.RawArray( \
        numpy.ctypeslib.as_ctypes_type(numpy.uint8), \
        mp_Count*static_C *static_LL)
    m = numpy.frombuffer(raw_arr_m, dtype=numpy.uint8)
    m = numpy.reshape (m, (mp_Count*static_C , static_LL))
    print ("Get Memory .. ", m.shape, psutil.virtual_memory().used)

    for i in range(mp_Count):
        print (i*static_C, m[i*static_C, :10])

    sp = Split_List(list(range(mp_Count*static_C)), mp_Count)
    multi_run (write_m, sp, raw_arr_m, mp_Count, static_C, static_LL)

    print ("calc Start .")
    t0 = time.time()
    m = numpy.exp (m)
    print ("single", time.time() - t0)

    sp = Split_List(list(range(mp_Count*static_C)), mp_Count)
    multi_run (write_m, sp, raw_arr_m, mp_Count, static_C, static_LL)
    t0 = time.time()
    multi_run (calc_m, sp, raw_arr_m, mp_Count, static_C, static_LL)
    print ("multi", time.time() - t0)

    for i in range(m.shape[0]):
        print (i, m[i, :10])
    del raw_arr_m
    gc.collect()

如果处理过程比较简单,使用的 numpy函数能被 numba 模块,则用 numba 更为简便,无需操作 共享内存,即能实现numpy 的并行化计算。比起 numpy.vectorize ,或则 numpy.fromfunction 更为便利。当然 numba 还是有不少限制,有些函数虽然支持,但是参数限制性比原生的 numpy 要范围小得多,前面的办法相对”万能”使用。

对于参数限制了的numpy函数,也可以自行进行算法展开进行弥补。由于 numba 是实时”编译”后执行,在代码量较大的情况下,特别是使用了njit(parallel=True)参数后,即nopython方式,速度还是传统较快得多。在另一个复杂的算例(代码不便贴出),可以比 共享内存的并行方式快上一倍。

numba 的示例代码如下

-*- coding: utf-8 -*-
from __future__ import absolute_import, print_function

import multiprocessing
import os, sys, time, gc, psutil
import time
import numpy
import numba
from numba.typed import List

@numba.njit
def write_m (mrange, m):
    for i in mrange:
        m[i,:] = i+1

@numba.njit
def calc_m (mrange, m):
    for i in mrange:
        m[i,:] = numpy.exp (m[i,:]).astype(numpy.uint8)

if __name__ == "__main__":
    multiprocessing.freeze_support()
    mp_Count = multiprocessing.cpu_count()
    # 以下两个参数可根据运行的机器的实际内存容量进行调整,目前参数值适合于32G内存机器
    static_C = 2
    static_LL = 1024 *1024* 512

    m = numpy.empty((mp_Count*static_C , static_LL), dtype=numpy.uint8)
    print ("Get Memory .. ", m.shape, psutil.virtual_memory().used)

    for i in range(mp_Count):
        print (i*static_C, m[i*static_C, :10])

    write_m (List(range(mp_Count*static_C)), m)

    print ("calc Start .")

    t0 = time.time()
    m = numpy.exp (m).astype(numpy.uint8)
    print ("single", time.time() - t0)

    write_m (List(range(mp_Count*static_C)), m)

    t0 = time.time()
    calc_m (List(range(mp_Count*static_C)), m)
    print ("multi", time.time() - t0)

    for i in range(m.shape[0]):
        print (i, m[i, :10])

Original: https://blog.csdn.net/makefool/article/details/121185054
Author: makefool
Title: numpy的并行多进程处理

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/760174/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球