python/Django对接dingtalk-sdk企业内部开发对接钉钉过程手记

前提

需要拥有管理员账号或获得开发者权限

准备

钉钉方面

1. 创建钉钉应用

钉钉开放平台——应用开发——钉钉应用——创建应用

python/Django对接dingtalk-sdk企业内部开发对接钉钉过程手记

记录应用的AgentId、AppKey、AppSecret

python/Django对接dingtalk-sdk企业内部开发对接钉钉过程手记

3. 打开应用权限

打开需要获取数据的接口权限

python/Django对接dingtalk-sdk企业内部开发对接钉钉过程手记

开发环境方面

1.安装依赖库

dingtalk-sdk 消息加解密同时兼容 cryptography 和 PyCrypto, 优先使用 cryptography 库。 需先自行安装 cryptography 或者 PyCrypto 库:

安装 cryptography
pip install cryptography>=0.8.2

或者安装 PyCrypto
pip install pycrypto>=2.6.1

2.安装 dingtalk-sdk

pip install dingtalk-sdk

with cryptography
pip install dingtalk-sdk[cryptography]

with pycrypto
pip install dingtalk-sdk[pycrypto]

升级 dingtalk-sdk 到新版本

pip install -U dingtalk-sdk

基础接口

1. 获得access_token

dingtalk-sdk方法

from dingtalk import SecretClient, AppKeyClient

client = SecretClient('corp_id', 'secret')  # 旧 access_token 获取方式
client = AppKeyClient('corp_id', 'app_key', 'app_secret')  # 新 access_token 获取方式

自建接口方法

获得access_token
def access_token():
    response = requests.get(
        "https://oapi.dingtalk.com/gettoken?appkey=%s&appsecret=%s" % (AppKey, AppSecret))
    if response.status_code == 200:
        str_res = response.text
        token = (json.loads(str_res)).get('access_token')
        return token

2. 获得部门列表

dingtalk-sdk方法

departments = client.department.list()

自建接口方法

获得部门列表
def get_department_list(dept_id=1):
"""
        查询部门列表
        :param dept_id: 部门id,默认为1(根部门),可输入任意部门id
"""
    post_url = "https://oapi.dingtalk.com/topapi/v2/department/listsub?access_token=%s" % (access_token())
    data = {
        "dept_id": dept_id,
    }
    response = requests.post(post_url, data)
    str_res = response.text
    result = (json.loads(str_res)).get('result')
    return result

注意,无论是 dingtalk-sdk 方法还是自建接口方法,都只能获取一层部门的列表, 比如总公司下面有市场部和技术部,市场部下面有研究部和发展部,技术部下面有研究部和开发部,当我们调用接口的时候,只能获取到市场部和技术部,无法获取下面的子部门。想全部获取则需要做循环。

3. 获得部门员工

dingtalk-sdk方法

from dingtalk.client.api import User

object = User(client=client) # 传入上文的token,实例化对象
data = object.listbypage(department_id, offset=0, size=100, order='custom',
                         lang='zh_CN')

自建接口方法

我懒得分别调用每个部门,直接找了个获取全部在职员工的接口,写成了生成器,在外面直接遍历获取就行

获得在职员工列表
def get_user_list():
    post_url = "https://oapi.dingtalk.com/topapi/smartwork/hrm/employee/queryonjob?access_token=%s" % (access_token())
    # 初始化offset
    offset = 0
    while True:
        # 请求参数
        data = {
            "status_list": "2,3,5,-1",
            "offset": offset,
            "size": 50
        }
        # 发送post请求
        response = requests.post(post_url, data)
        str_res = response.text
        # json解析
        result = (json.loads(str_res)).get('result')
        # 返回当前页列表
        yield result["data_list"]
        try:
            # 下一页(下50条)
            offset = result["next_cursor"]
            continue
        # 退出条件
        except KeyError:
            break

4. 获得用户详情

dingtalk-sdk方法

from dingtalk.client.api import User

object = User(client=client) # 传入上文的token,实例化对象
data = object.get(userid, lang='zh_CN')

自建接口方法

获得员工详情(user_id)
def get_user_info(user_id):
"""
    :param user_id:
    :return :
"""
    post_url = "https://oapi.dingtalk.com/topapi/v2/user/get?access_token=%s" % (access_token())
    data = {
        "userid": user_id,
    }
    response = requests.post(post_url, data)
    str_res = response.text
    result = (json.loads(str_res)).get('result')
    return result["name"], result["mobile"], result["userid"]

考勤统计接口

SDK方法已经没有啦~开始手撸~~~

1. 获取考勤报表列名称

我这里是把我需要的列的名称和id返回了,各位各取所需哈

def get_attcolumns():
    post_url = "https://oapi.dingtalk.com/topapi/attendance/getattcolumns?access_token=%s" % (access_token())
    response = requests.post(post_url)
    str_res = response.text
    result = (json.loads(str_res)).get('result')

    name_list = ["应出勤天数", "出勤天数", "休息天数", "迟到次数", "迟到时长", "早退次数", "早退时长", "上班缺卡次数", "下班缺卡次数", "补卡次数", "旷工天数",
                 "加班-审批单统计", "加班总时长", "休息日加班", "工作日加班", "节假日加班"]
    response_data = {}
    for i in result["columns"]:
        try:
            print(i["name"] + ":" + str(i["id"]))
        except:
            print(i["name"] + ":NONE")
        if i["name"] in name_list:
            response_data[i["name"]] = i["id"]
    print(response_data)
    return response_data

2. 获取考勤报表列值

各取所需各取所需

def get_columnval(userid: str, name: str, column_id: str, from_date: str, to_date: str):
"""
    from_date和to_date格式为:'2022-01-01 00:00:00'这样式的
    column_id为列名称id
"""
    post_url = "https://oapi.dingtalk.com/topapi/attendance/getcolumnval?access_token=%s" % (access_token())
    data = {
        "userid": userid,
        "column_id_list": column_id,
        "from_date": from_date,
        "to_date": to_date
    }
    response = requests.post(post_url, data)
    str_res = response.text
    num = 0
    try:
        result = (json.loads(str_res)).get('result').get("column_vals")
        for value in result[0]["column_vals"]:
            num += int(float(value["value"]))
    except:
        pass
    return {name: num}

3. 获取员工考勤详情

这个接口有个问题,它两个时间的间隔不能超过七天..我就顺手也给时间搞定了。

还有个小建议,当你在外面调用接口的时候,肯定会有同时获取好多人考勤详情的情况,可以在调用的时候使用多线程,然后一次只传一个人的userid,效率会很高,不会搞线程的可以找我哈

查询员工考勤情况
def attendance_information(workDateFrom, workDateTo, userIdList):
"""
    查询员工考勤情况
    :param workDateFrom: 查询开始时间 格式:"2022-03-10 00:00:00"
    :param workDateTo: 查询结束时间 格式:"2022-03-10 00:00:00"
    :param userIdList: 员工id列表
    :return: 员工考勤情况 type:dict
"""
    num1 = int(time.mktime(time.strptime(workDateFrom, "%Y-%m-%d %H:%M:%S")))
    num2 = int(time.mktime(time.strptime(workDateTo, "%Y-%m-%d %H:%M:%S")))
    if num2 - num1 >= 604800:
        # 604800为七天的秒数
        response_data = []
        while True:
            if num1 >= num2:
                break
            if num2 - num1 < 604800:
                end_num = num2
            else:
                end_num = num1 + 604799
            startTime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(num1))
            endTime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(end_num))
            data = attendance_info(startTime, endTime, userIdList)
            response_data += data
            num1 = end_num + 1
    else:
        response_data = attendance_info(workDateFrom, workDateTo, userIdList)
    return response_data

获取员工打卡结果
def attendance_info(workDateFrom, workDateTo, userIdList):
    post_url = "https://oapi.dingtalk.com/attendance/list?access_token=%s" % (access_token())
    data = {
        "workDateFrom": workDateFrom,
        "workDateTo": workDateTo,
        "userIdList": userIdList,
        "offset": 0,
        "limit": 50
    }
    response = requests.post(post_url, json.dumps(data))
    str_res = response.text
    recordresult = (json.loads(str_res)).get('recordresult')
    return recordresult

4. 审批接口(请假、加班等)

总共分为两个步骤:

  1. 获取审批实例ID列表

  2. 获取审批实例详情

在调用第一个接口的时候,需要传入一个process_code参数,这个参数要在钉钉管理后台的审批模板编辑页面的URL中获取,这需要拿到OA审批的权限:

python/Django对接dingtalk-sdk企业内部开发对接钉钉过程手记

比如这个请假,点击最右侧的编辑,进入页面后,process_code就在上面这个位置,记录下来

python/Django对接dingtalk-sdk企业内部开发对接钉钉过程手记

有了这个code,就可以写接口调用了,这边我封装了一个类:

"""
请假processCode=PROC-EF6YNNYRN2-CRQI0Q3ZSC7L2TDQEKQO1-25SLJQZI-ST
加班processCode=PROC-EF6YNNYRN2-CRQI0Q3ZSC7L2TDQEKQO1-XZRLJQZI-GT
补卡processCode=PROC-EF6YNNYRN2-CRQI0Q3ZSC7L2TDQEKQO1-V2SLJQZI-NT
"""

获取审批信息
class ProcessInstance:
    def __init__(self, process_code: str, start_time: int, end_time: int, userid: str):
        self.process_code = process_code
        self.start_time = start_time
        self.end_time = end_time
        self.userid = userid

    # 获取审批实例id列表
    def processinstance_list(self):
        post_url = "https://oapi.dingtalk.com/topapi/processinstance/listids?access_token=%s" % (access_token())
        data = {
            "process_code": self.process_code,
            "start_time": self.start_time - 2678400000,
            "end_time": self.end_time + 2678400000,
            "userid_list": self.userid,
            "size": 20,
            "cursor": 0,
        }
        response = requests.post(post_url, data)
        str_res = response.text
        result = (json.loads(str_res)).get('result').get("list")
        return result

    # 获取审批实例详情
    def getprocessinstance_details(self):
        lists = self.processinstance_list()
        response_list = []
        for i in lists:
            post_url = "https://oapi.dingtalk.com/topapi/processinstance/get?access_token=%s" % (access_token())
            data = {
                "process_instance_id": i,
            }
            response = requests.post(post_url, data)
            str_res = response.text
            result = (json.loads(str_res)).get('process_instance')
            status = self.status_CN(result.get("status"))
            form_component_values = result.get("form_component_values")
            types = [x.get('value') for x in form_component_values if x.get("component_type") == "DDSelectField"][0]
            s_times = [(json.loads(x.get('value')))
                       for x in form_component_values if x.get("component_type") == "DDDateRangeField"][0][0]
            if self.start_time

因为审批单发起的时间可能与审批单实际的时间不符,比如我明天要请假,我今天发起了请假审批,那我们传入明天的时间就会导致查不到这条审批,于是我就把传入的时间扩大了31天,等查到这个人在这个时间段内发起的审批单后,再根据审批单实际的时间把不符合查询时间筛选出去,这样就能最大程度的保证不遗漏。但是这样也有一个缺点,就是代码处理的时间变长了。

目前我用到的接口就是这些,然后就把数据处理成自己想要的样子,再放到前端页面上显示就可以啦。如果以后用到别的借口,还会继续在这个文章更新。可以简单看一下目前实现的效果

python/Django对接dingtalk-sdk企业内部开发对接钉钉过程手记

python/Django对接dingtalk-sdk企业内部开发对接钉钉过程手记

Everything is going smoothly.

Original: https://blog.csdn.net/Runaway_pilot/article/details/123655580
Author: Runaway_pilot
Title: python/Django对接dingtalk-sdk企业内部开发对接钉钉过程手记

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/733221/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球