pandas的apply函数常见用法总结

对DataFrame的每一行执行一些复杂的操作

举一个例子,计算DataFrame中每一条数据中两个人的轨迹相似度,因为和业务相关,里面的子函数不能透露,这里重点呈现apply的用法。

import numpy as np

def calculate_trajectory_similarity(df, trail_dict):
"""

    :param trail_dict:
    :param df:
    :return:
"""
    body_threshold = 2
    similarity_threshold = 0.6
    insert_columns = [
        'trajectory_similarity',
    ]
    for c in insert_columns:
        try:
            #
            df.insert(df.shape[1], c, 0)
        except ValueError as v:
            #
            print(str(v))

    def calc_trajectory_similarity(element):
        """计算XXX"""
        trail01 = trail_dict.get(element['record_id1'])
        trail02 = trail_dict.get(element['record_id2'])
        trail_arr1 = np.array(trail01)
        trail_arr2 = np.array(trail02)
        face_trail01 = trail_arr1[trail_arr1[:, -1] == 'face']
        body_trail01 = trail_arr1[trail_arr1[:, -1] == 'body']
        face_trail02 = trail_arr2[trail_arr2[:, -1] == 'face']
        body_trail02 = trail_arr2[trail_arr2[:, -1] == 'body']
        sub_trail_list = [
            (face_trail01, face_trail02),
            (face_trail01, body_trail02),
            (body_trail01, face_trail02),
            (body_trail01, body_trail02)
        ]
        #
        avg_body_width = (element['body_width1'] + element['body_width2']) / 2
        tr_similarity_list = []
        for s in sub_trail_list:
            coordinate_arr1, coordinate_arr2, time_overlap = get_real_time_coordinates(s[0], s[1])
            if time_overlap > 0:
                sub_similarity = trajectory_similarity(coordinate_arr1, coordinate_arr2, avg_body_width, body_threshold,
                                                       similarity_threshold)
            else:
                sub_similarity = 0
            tr_similarity_list.append((min(len(coordinate_arr1), len(coordinate_arr2)), sub_similarity))
        if len(tr_similarity_list) > 0:
            weights = [i[0] for i in tr_similarity_list]
            if np.sum(weights) > 0:  #
                tr_similarity = np.sum([w * s for w, s in tr_similarity_list]) / np.sum(weights)
            else:
                tr_similarity = 0
        else:
            tr_similarity = 0
        element['trajectory_similarity'] = tr_similarity
        return element

    df = df.apply(calc_trajectory_similarity, axis=1)
    return df

里面最核心的操作是 df = df.apply(calc_trajectory_similarity, axis=1),这行代码通过apply调用了calc_trajectory_similarity这个函数,并按照行遍历DataFrame,利用每一行(Series对象)的一些字段信息,计算出轨迹相似度,并存储到DataFrame中。 get_real_time_coordinatestrajectory_similarity分别是统计实时点和计算轨迹相似度的自定义函数,在这里可以不用关注。

对Series的每一个元素执行一些复杂操作

举个例子,现有一些原始的轨迹数据,需要进行预处理,可以针对需要处理的DataFrame字段(Series格式)单独进行操作。

import re

def split_to_int(element):
    """XXX"""
    if element:
        return list(map(int, re.findall(r"[\d]+", element)))
    else:
        element = []
        return element

def split_to_list(element):
    """XXX"""
    if element:
        element = list(re.findall(r"[\d]+", element))
        element = list(map(convert_time, element))
        return element
    else:
        element = []
        return element

def trail_string_processing(df):
"""

    :param df:
    :return:
"""
    #
    pd.set_option('mode.chained_assignment', None)
    trail_name = [
        'trail_left_top_x',
        'trail_left_top_y',
        'trail_right_btm_x',
        'trail_right_btm_y',
    ]
    for t in trail_name:
        df.loc[:, t] = df[t].apply(split_to_int)
    return df

def time_string_processing(df):
"""
    XXX
    :param df:
    :return:
"""
    # XXX
    pd.set_option('mode.chained_assignment', None)
    df.loc[:, 'trail_point_time'] = df['trail_point_time'].apply(split_to_list)
    #
    df.loc[:, 'shot_time'] = df['shot_time'].apply(
        lambda x: x.tz_convert('Asia/Shanghai').tz_localize(None) if x.tz else x)
    return df

在上面的代码中,每一个apply都是针对series执行的操作,apply里面的函数可以是自定义函数,也可以是lambda匿名函数。

对GroupBy对象执行一些复杂操作

举个例子,现有一个DataFrame需要按照某些字段进行分组,然后对分组后的对象执行一些操作,然后重构为新的DataFrame,这时可以通过apply来实现。

import pandas as pd

def merge_key_person_info(df):
"""
    XXXX
    :param df:
    :return:
"""

    def group_by_key_person(element):
        element = element.drop_duplicates(subset=['pvid', 'rel_pvid'])
        #
        key_person_code = element['key_person_code'].iloc[0]
        if key_person_code == 'tag_is_family':
            max_members_num = 6
        else:
            max_members_num = 11
        key_person_num = len(element['pvid'].iloc[0].split(','))
        num_k = max_members_num - key_person_num
        num_k = num_k if num_k > 1 else 1
        element = element.sort_values(by=['relation_score'], ascending=False).iloc[:num_k, :]
        #
        key_person_score = list(set(element['key_person_score'].values))
        rel_pvid_list = list(element['rel_pvid'].values)
        relation_code_list = list(element['relation_code'].values)
        relation_score_list = list(element['relation_score'].values)
        start_time_list = list(element['relation_info_start_time'].values)
        end_time_list = list(element['relation_info_end_time'].values)
        series_dict = {
            'pvid': element['pvid'].iloc[0],
            'corp_id': element['corp_id'].iloc[0],
            'key_person_code': element['key_person_code'].iloc[0],
            'key_person_score': key_person_score,
            'rel_pvid': rel_pvid_list,
            'relation_code': relation_code_list,
            'relation_score': relation_score_list,
            'relation_info_start_time': start_time_list,
            'relation_info_end_time': end_time_list
        }
        result = pd.Series(series_dict)
        return result

    #
    group_by_obj = df.groupby(by=['pvid', 'corp_id', 'key_person_code'])
    group_df = group_by_obj.apply(group_by_key_person).reset_index(drop=True)
    return group_df

有时候为了提升效率,一些涉及到大量数值计算的apply可以使用numpy的.apply_along_axis替代。

def calculate_speed_and_angle_similarity(parameters_df):
"""

    :param parameters_df:
    :return:
"""
    try:
        #
        parameters_df.insert(parameters_df.shape[1], 'angle_similarity', 0)
        parameters_df.insert(parameters_df.shape[1], 'speed_similarity', 0)
    except ValueError as v:
        #
        logger = my_logger()
        logger.info(str(v))

    def calc_angle_speed_similarity(element):
        """XXXX"""
        angle1 = element[35]
        angle2 = element[83]

        moving_speed1 = element[43]
        moving_speed2 = element[91]

        #
        angle_difference = abs(angle1 - angle2)
        if angle_difference >= 90:  #
            angle_similarity = 0
        else:
            angle_similarity = np.cos(abs(angle1 - angle2) / 180 * np.pi)
        element[102] = angle_similarity

        #
        slower_speed = min(moving_speed1, moving_speed2)
        faster_speed = max(moving_speed1, moving_speed2)
        speed_similarity = slower_speed / faster_speed
        element[103] = speed_similarity
        return element

    arr = parameters_df.values
    new_arr = np.apply_along_axis(calc_angle_speed_similarity, axis=1, arr=arr)
    parameters_df = pd.DataFrame(new_arr, columns=parameters_df.columns)
    return parameters_df

按照上述写法,虽然可以在一定程度上提升运行速度,但由于ndarray不支持字符串索引,对字段的操作只能按照序号来进行,很容易出错,代码可读性也比较差,不太推荐在复杂函数中使用,简单的计算用np.apply_along_axis会比较适合。

上面的代码都是一些模块的片段,只是用来展示apply的用法,因此无法跑通,请多包涵。为了信息安全,所有注释和细节代码都删除了。

Original: https://blog.csdn.net/Ray_awakepure/article/details/121778153
Author: Ray_awakepure
Title: pandas的apply函数常见用法总结

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/698170/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球