阅读 126

ApiTesting全链路接口自动化测试框架 - 数据库校验【新增】(二)

ApiTesting全链路接口自动化测试框架 - 数据库校验【新增】(二)

 

在这之前我完成了对于接口上的自动化测试:ApiTesting全链路接口自动化测试框架 - 初版(一)

但是对于很多公司而言,数据库的数据校验也尤为重要,另外也有小伙伴给我反馈希望支持。

所以最近几天我特意抽空完成了相关的内容开发,另外修复了第一版中一些小的bug,以下是相关升级说明。

 


主要升级内容

1、新增数据库查询类封装:目前主要包括MySQL、HBase、Solr、ES,且均使用SQL语法。

 

2、新增数据库配置文件dbConfig.yml

复制代码

PyDemo:
  # 数据库查询超时时长(不得小于1)  timeout: 3
  # MySQL配置信息  mysql_info:
    address: 10.88.88.88:3160
    db: test    user: test
    auth: test  # HBase配置信息(需要启动phoenix查询服务)  hbase_info:
    address: 10.88.88.88:8765
    db: test
  # ES配置信息(需要开放http查询服务)  es_info:
    address: 10.88.88.88:9200
    db: test  # Solr配置信息  solr_info:
    address: 10.88.88.88:8883

复制代码

 

3、新增数据库查询方法二次封装:主要读取数据库配置,以及在指定超时时间循环查询结果(Redis由于其结果多样性,暂不提供支持)。

必须满足正则表达式 ^select (.*?) from (.*?) where (.*?)$ (注意大小写)

即以select开头  +  *(所有)或字段名 + from + 表名 + where + 条件 [ + and + 其他条件 ... ]

复制代码

# -*- coding:utf-8 -*-# @Time    : 2021/03/09# @Author  : Leo Zhang# @File    : queryDatabase.py# **************************from comm.utils.readYaml import read_yaml_datafrom config import DB_CONFIG, PROJECT_NAMEfrom comm.db import *import loggingimport timeimport re

dbcfg = read_yaml_data(DB_CONFIG)[PROJECT_NAME]def query_mysql(sql):    """查询MySQL数据

    :param sql: sql查询语句
    :return:    """
    # 获取配置信息
    timeout = dbcfg['timeout']
    address = dbcfg['mysql_info']['address']
    user = dbcfg['mysql_info']['user']
    auth = dbcfg['mysql_info']['auth']
    db = dbcfg['mysql_info']['db']    # 初始化MySQL
    host, port = address.split(':')
    mysql = MysqlServer(host, int(port), db, user, auth)
    logging.info('执行查询>>> {}'.format(sql))    # 循环查询
    for i in range(int(timeout)):        try:
            result = mysql.query(sql, is_dict=True)
            mysql.close()            if result:                return result            else:
                time.sleep(1)        except Exception as e:            raise Exception('查询异常>>> {}'.format(e))    else:        return []def query_hbase(sql):    """查询HBase数据

    :param sql: sql查询语句
    :return:    """
    # 获取配置信息
    timeout = dbcfg['timeout']
    address = dbcfg['hbase_info']['address']
    db = dbcfg['hbase_info']['db']    # 检索SQL语句
    exp = r"^select .*? from (.*?) where .*?$"
    table = re.findall(exp, sql.strip())[0]    # 添加数据库
    if '.' not in table:
        sql = sql.strip().replace(table, db+'.'+table)    # 初始化HBase
    hbase = PhoenixServer(address)
    logging.info('执行查询>>> {}'.format(sql))    # 循环查询
    for i in range(int(timeout)):        try:
            result = hbase.query(sql, is_dict=True)            if result:                return result            else:
                time.sleep(1)        except Exception as e:            raise Exception('查询异常>>> {}'.format(e))    else:        return []def query_es(sql):    """查询ES数据

    :param sql: sql查询语句
    :return:    """
    # 获取配置信息
    timeout = dbcfg['timeout']
    address = dbcfg['es_info']['address']
    db = dbcfg['es_info']['db']
    logging.info('执行查询>>> {}'.format(sql))    # 循环查询
    for i in range(int(timeout)):        try:
            result = elastic_search(address, db, sql)            if result:                return result            else:
                time.sleep(1)        except Exception as e:            raise Exception('查询异常>>> {}'.format(e))    else:        return []def query_solr(sql):    """查询solr数据

    :param sql: sql查询语句
    :return:    """
    # 获取配置信息
    timeout = dbcfg['timeout']
    address = dbcfg['solr_info']['address']
    logging.info('执行查询>>> {}'.format(sql))    # 循环查询
    for i in range(int(timeout)):        try:
            result = search_solr(address, sql)            if result:                return result            else:
                time.sleep(1)        except Exception as e:            raise Exception('查询异常>>> {}'.format(e))    else:        return []

复制代码

 

4、更新校验代码:增加数据库字段处理、数据校验方法。

复制代码

# -*- coding:utf-8 -*-# @Time    : 2021/2/2# @Author  : Leo Zhang# @File    : checkResult.py# ***************************import reimport allureimport operatorimport loggingfrom decimal import Decimalfrom comm.unit import readRelevance, replaceRelevancefrom comm.unit import queryDatabase as qdbdef check_json(src_data, dst_data):    """
    校验的json
    :param src_data: 检验内容
    :param dst_data: 接口返回的数据
    :return:    """
    if isinstance(src_data, dict):        for key in src_data:            if key not in dst_data:                raise Exception("JSON格式校验,关键字 %s 不在返回结果 %s 中!" % (key, dst_data))            else:
                this_key = key                if isinstance(src_data[this_key], dict) and isinstance(dst_data[this_key], dict):
                    check_json(src_data[this_key], dst_data[this_key])                elif not isinstance(src_data[this_key], type(dst_data[this_key])):                    raise Exception("JSON格式校验,关键字 %s 返回结果 %s 与期望结果 %s 类型不符"
                                    % (this_key, src_data[this_key], dst_data[this_key]))                else:                    pass
    else:        raise Exception("JSON校验内容非dict格式:{}".format(src_data))def check_database(actual, expected, mark=''):    """校验数据库

    :param actual: 实际结果
    :param expected: 期望结果
    :param mark: 标识
    :return:    """
    if isinstance(actual, dict) and isinstance(expected, dict):
        result = list()
        logging.info('校验数据库{}>>>'.format(mark))
        content = '\n%(key)-20s%(actual)-40s%(expected)-40s%(result)-10s' \                % {'key': 'KEY', 'actual': 'ACTUAL', 'expected': 'EXPECTED', 'result': 'RESULT'}        for key in expected:            if key in actual:
                actual_value = actual[key]            else:
                actual_value = None
            expected_value = expected[key]            if actual_value or expected_value:                if isinstance(actual_value, (int, float, Decimal)):                    if int(actual_value) == int(expected_value):
                        rst = 'PASS'
                    else:
                        rst = 'FAIL'
                else:                    if str(actual_value) == str(expected_value):
                        rst = 'PASS'
                    else:
                        rst = 'FAIL'
            else:
                rst = 'PASS'
            result.append(rst)
            line = '%(key)-20s%(actual)-40s%(expected)-40s%(result)-10s' \                % {'key': key, 'actual': str(actual_value) + ' ',                            'expected': str(expected_value) + ' ', 'result': rst}
            content = content + '\n' + line
        logging.info(content)
        allure.attach(name="校验数据库详情{}".format(mark[-1]), body=str(content))        if 'FAIL' in result:            raise AssertionError('校验数据库{}未通过!'.format(mark))    elif isinstance(actual, list) and isinstance(expected, list):
        result = list()
        logging.info('校验数据库{}>>>'.format(mark))
        content = '\n%(key)-25s%(actual)-35s%(expected)-35s%(result)-10s' \                % {'key': 'INDEX', 'actual': 'ACTUAL', 'expected': 'EXPECTED', 'result': 'RESULT'}        for index in range(len(expected)):            if index < len(actual):
                actual_value = actual[index]            else:
                actual_value = None
            expected_value = expected[index]            if actual_value or expected_value:                if isinstance(actual_value, (int, float, Decimal)):                    if int(actual_value) == int(expected_value):
                        rst = 'PASS'
                    else:
                        rst = 'FAIL'
                else:                    if str(actual_value) == str(expected_value):
                        rst = 'PASS'
                    else:
                        rst = 'FAIL'
            else:
                rst = 'PASS'
            result.append(rst)
            line = '%(key)-25s%(actual)-35s%(expected)-35s%(result)-10s' \                % {'key': index, 'actual': str(actual_value) + ' ',                            'expected': str(expected_value) + ' ', 'result': rst}
            content = content + '\n' + line
        logging.info(content)
        allure.attach(name="校验数据库详情{}".format(mark[-1]), body=str(content))        if 'FAIL' in result:            raise AssertionError('校验数据库{}未通过!'.format(mark))    else:
        logging.info('校验数据库{}>>>'.format(mark))
        logging.info('ACTUAL: {}\nEXPECTED: {}'.format(actual, expected))        if str(expected) != str(actual):            raise AssertionError('校验数据库{}未通过!'.format(mark))def check_result(case_data, code, data):    """
    校验测试结果
    :param case_data: 用例数据
    :param code: 接口状态码
    :param data: 返回的接口json数据
    :return:    """
    try:        # 获取用例检查信息
        check_type = case_data['check_body']['check_type']
        expected_code = case_data['check_body']['expected_code']
        expected_result = case_data['check_body']['expected_result']    except Exception as e:        raise KeyError('获取用例检查信息失败:{}'.format(e))    # 接口数据校验
    if check_type == 'no_check':
        with allure.step("不校验接口结果"):            pass

    elif check_type == 'check_code':
        with allure.step("仅校验接口状态码"):
            allure.attach(name="实际code", body=str(code))
            allure.attach(name="期望code", body=str(expected_code))
            allure.attach(name='实际data', body=str(data))        if int(code) != expected_code:            raise Exception("接口状态码错误!\n %s != %s" % (code, expected_code))    elif check_type == 'check_json':
        with allure.step("JSON格式校验接口"):
            allure.attach(name="实际code", body=str(code))
            allure.attach(name="期望code", body=str(expected_code))
            allure.attach(name='实际data', body=str(data))
            allure.attach(name='期望data', body=str(expected_result))        if int(code) == expected_code:            if not data:
                data = "{}"
            check_json(expected_result, data)        else:            raise Exception("接口状态码错误!\n %s != %s" % (code, expected_code))    elif check_type == 'entirely_check':
        with allure.step("完全校验接口结果"):
            allure.attach(name="实际code", body=str(code))
            allure.attach(name="期望code", body=str(expected_code))
            allure.attach(name='实际data', body=str(data))
            allure.attach(name='期望data', body=str(expected_result))        if int(code) == expected_code:
            result = operator.eq(expected_result, data)            if not result:                raise Exception("完全校验失败! %s ! = %s" % (expected_result, data))        else:            raise Exception("接口状态码错误!\n %s != %s" % (code, expected_code))    elif check_type == 'regular_check':        if int(code) == expected_code:            try:
                result = ""
                if isinstance(expected_result, list):                    for i in expected_result:
                        result = re.findall(i.replace("\"", "\""), str(data))
                        allure.attach('校验完成结果\n', str(result))                else:
                    result = re.findall(expected_result.replace("\"", "\'"), str(data))
                    with allure.step("正则校验接口结果"):
                        allure.attach(name="实际code", body=str(code))
                        allure.attach(name="期望code", body=str(expected_code))
                        allure.attach(name='实际data', body=str(data))
                        allure.attach(name='期望data', body=str(expected_result).replace("\'", "\""))
                        allure.attach(name=expected_result.replace("\"", "\'") + '校验完成结果',
                                      body=str(result).replace("\'", "\""))                if not result:                    raise Exception("正则未校验到内容! %s" % expected_result)            except KeyError:                raise Exception("正则校验执行失败! %s\n正则表达式为空时" % expected_result)        else:            raise Exception("接口状态码错误!\n %s != %s" % (code, expected_code))    else:        raise Exception("无该接口校验方式%s" % check_type)    # 判断是否存在数据库校验标识
    if 'check_db' in case_data:
        check_db = case_data['check_db']        # 获取数据库期望结果:获取期望结果-获取关联值-替换关联值
        data['parameter'] = case_data['parameter']        __relevance = readRelevance.get_relevance(data, check_db)
        check_db = replaceRelevance.replace(check_db, __relevance)        # 循环校验数据库
        for each in check_db:            try:
                check_type = each['check_type']
                execute_sql = each['execute_sql']
                expected_result = each['expected_result']            except KeyError as e:                raise KeyError('【check_db】存在错误字段!\n{}'.format(e))            except TypeError:                raise KeyError("【check_db】类型错误,期望<class 'list'>,而不是%s!" % type(expected_result))            if not isinstance(expected_result, list):                raise KeyError("【expected_result】类型错误,期望<class 'list'>,而不是%s!" % type(expected_result))            # 检索SQL语句
            exp = r"^select (.*?) from (.*?) where (.*?)$"
            res = re.findall(exp, execute_sql.strip())[0]            for r in res:                if not each:
                    msg = '标准格式: ' + exp                    raise Exception('无效SQL>>> {}\n{}'.format(execute_sql, msg))            # 判断数据库检查类型
            if check_type == 'mysql':
                actual = qdb.query_mysql(execute_sql)            elif check_type == 'hbase':
                actual = qdb.query_hbase(execute_sql)            elif check_type == 'solr':
                actual = qdb.query_solr(execute_sql)            elif check_type == 'es':
                actual = qdb.query_es(execute_sql)            else:                raise Exception("无该数据库校验方式%s" % check_type)            # 增加输出并进行数据校验
            mark = check_type.replace('check_', '').upper() + '['+res[1]+']'
            with allure.step("校验数据库{}".format(mark)):
                allure.attach(name="实际结果", body=str(actual))
                allure.attach(name='期望结果', body=str(expected_result))                # expected_num = each['expected_num']
                # allure.attach(name="实际行数", body=str(len(actual)))
                # allure.attach(name='期望行数', body=str(expected_num))
                # # 验证数据库实际结果数量是否正确
                # if len(actual) != int(expected_num):
                #     raise AssertionError('校验数据库{}行数未通过!'.format(mark))
                # 检查实际结果中第一条结果值 ***************
                for index, expected in enumerate(expected_result):                    try:
                        check_database(actual[index], expected, mark+str(index))                    except IndexError:                        raise IndexError('校验数据库{}失败,期望结果超出实际条目!'.format(mark+str(index)))

复制代码

 


5、更新测试用例:新增数据库校验字段,默认无,需自行添加。

复制代码

   
  
     
     select * from TD_ADULT where ADULT_CODE= 
       --
        select * from adult where CHIL_NAME=
     
        
        select * from adultsolr320000 where adultName=
     
        select * from TD_ADULT_YZ where ADULT_CODE=

复制代码

 


6、测试报告展示

数据库校验展开详情

 

运行日志示例

 running.log

 


缺陷修复记录

 

作者:Leozhanggg

出处:https://www.cnblogs.com/leozhanggg/p/14522084.html

源码:https://github.com/Leozhanggg/ApiTesting

本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。


文章分类
后端
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐