ApiTesting全链路接口自动化测试框架 - 数据库校验【新增】(二)
ApiTesting全链路接口自动化测试框架 - 数据库校验【新增】(二)
在这之前我完成了对于接口上的自动化测试:ApiTesting全链路接口自动化测试框架 - 初版(一)
但是对于很多公司而言,数据库的数据校验也尤为重要,另外也有小伙伴给我反馈希望支持。
所以最近几天我特意抽空完成了相关的内容开发,另外修复了第一版中一些小的bug,以下是相关升级说明。
主要升级内容
1、新增数据库查询类封装:目前主要包括MySQL、HBase、Solr、ES,且均使用SQL语法。
2、新增数据库配置文件dbConfig.yml
PyDemo: # 数据库查询超时时长(不得小于1) timeout: 3 # MySQL配置信息 mysql_info: address: 10.88.88.88:3160 db: test user: test auth: test # HBase配置信息(需要启动phoenix查询服务) hbase_info: address: 10.88.88.88:8765 db: test # ES配置信息(需要开放http查询服务) es_info: address: 10.88.88.88:9200 db: test # Solr配置信息 solr_info: address: 10.88.88.88:8883
3、新增数据库查询方法二次封装:主要读取数据库配置,以及在指定超时时间循环查询结果(Redis由于其结果多样性,暂不提供支持)。
必须满足正则表达式 ^select (.*?) from (.*?) where (.*?)$ (注意大小写)
即以select开头 + *(所有)或字段名 + from + 表名 + where + 条件 [ + and + 其他条件 ... ]
# -*- coding:utf-8 -*-# @Time : 2021/03/09# @Author : Leo Zhang# @File : queryDatabase.py# **************************from comm.utils.readYaml import read_yaml_datafrom config import DB_CONFIG, PROJECT_NAMEfrom comm.db import *import loggingimport timeimport re dbcfg = read_yaml_data(DB_CONFIG)[PROJECT_NAME]def query_mysql(sql): """查询MySQL数据 :param sql: sql查询语句 :return: """ # 获取配置信息 timeout = dbcfg['timeout'] address = dbcfg['mysql_info']['address'] user = dbcfg['mysql_info']['user'] auth = dbcfg['mysql_info']['auth'] db = dbcfg['mysql_info']['db'] # 初始化MySQL host, port = address.split(':') mysql = MysqlServer(host, int(port), db, user, auth) logging.info('执行查询>>> {}'.format(sql)) # 循环查询 for i in range(int(timeout)): try: result = mysql.query(sql, is_dict=True) mysql.close() if result: return result else: time.sleep(1) except Exception as e: raise Exception('查询异常>>> {}'.format(e)) else: return []def query_hbase(sql): """查询HBase数据 :param sql: sql查询语句 :return: """ # 获取配置信息 timeout = dbcfg['timeout'] address = dbcfg['hbase_info']['address'] db = dbcfg['hbase_info']['db'] # 检索SQL语句 exp = r"^select .*? from (.*?) where .*?$" table = re.findall(exp, sql.strip())[0] # 添加数据库 if '.' not in table: sql = sql.strip().replace(table, db+'.'+table) # 初始化HBase hbase = PhoenixServer(address) logging.info('执行查询>>> {}'.format(sql)) # 循环查询 for i in range(int(timeout)): try: result = hbase.query(sql, is_dict=True) if result: return result else: time.sleep(1) except Exception as e: raise Exception('查询异常>>> {}'.format(e)) else: return []def query_es(sql): """查询ES数据 :param sql: sql查询语句 :return: """ # 获取配置信息 timeout = dbcfg['timeout'] address = dbcfg['es_info']['address'] db = dbcfg['es_info']['db'] logging.info('执行查询>>> {}'.format(sql)) # 循环查询 for i in range(int(timeout)): try: result = elastic_search(address, db, sql) if result: return result else: time.sleep(1) except Exception as e: raise Exception('查询异常>>> {}'.format(e)) else: return []def query_solr(sql): """查询solr数据 :param sql: sql查询语句 :return: """ # 获取配置信息 timeout = dbcfg['timeout'] address = dbcfg['solr_info']['address'] logging.info('执行查询>>> {}'.format(sql)) # 循环查询 for i in range(int(timeout)): try: result = search_solr(address, sql) if result: return result else: time.sleep(1) except Exception as e: raise Exception('查询异常>>> {}'.format(e)) else: return []
4、更新校验代码:增加数据库字段处理、数据校验方法。
# -*- coding:utf-8 -*-# @Time : 2021/2/2# @Author : Leo Zhang# @File : checkResult.py# ***************************import reimport allureimport operatorimport loggingfrom decimal import Decimalfrom comm.unit import readRelevance, replaceRelevancefrom comm.unit import queryDatabase as qdbdef check_json(src_data, dst_data): """ 校验的json :param src_data: 检验内容 :param dst_data: 接口返回的数据 :return: """ if isinstance(src_data, dict): for key in src_data: if key not in dst_data: raise Exception("JSON格式校验,关键字 %s 不在返回结果 %s 中!" % (key, dst_data)) else: this_key = key if isinstance(src_data[this_key], dict) and isinstance(dst_data[this_key], dict): check_json(src_data[this_key], dst_data[this_key]) elif not isinstance(src_data[this_key], type(dst_data[this_key])): raise Exception("JSON格式校验,关键字 %s 返回结果 %s 与期望结果 %s 类型不符" % (this_key, src_data[this_key], dst_data[this_key])) else: pass else: raise Exception("JSON校验内容非dict格式:{}".format(src_data))def check_database(actual, expected, mark=''): """校验数据库 :param actual: 实际结果 :param expected: 期望结果 :param mark: 标识 :return: """ if isinstance(actual, dict) and isinstance(expected, dict): result = list() logging.info('校验数据库{}>>>'.format(mark)) content = '\n%(key)-20s%(actual)-40s%(expected)-40s%(result)-10s' \ % {'key': 'KEY', 'actual': 'ACTUAL', 'expected': 'EXPECTED', 'result': 'RESULT'} for key in expected: if key in actual: actual_value = actual[key] else: actual_value = None expected_value = expected[key] if actual_value or expected_value: if isinstance(actual_value, (int, float, Decimal)): if int(actual_value) == int(expected_value): rst = 'PASS' else: rst = 'FAIL' else: if str(actual_value) == str(expected_value): rst = 'PASS' else: rst = 'FAIL' else: rst = 'PASS' result.append(rst) line = '%(key)-20s%(actual)-40s%(expected)-40s%(result)-10s' \ % {'key': key, 'actual': str(actual_value) + ' ', 'expected': str(expected_value) + ' ', 'result': rst} content = content + '\n' + line logging.info(content) allure.attach(name="校验数据库详情{}".format(mark[-1]), body=str(content)) if 'FAIL' in result: raise AssertionError('校验数据库{}未通过!'.format(mark)) elif isinstance(actual, list) and isinstance(expected, list): result = list() logging.info('校验数据库{}>>>'.format(mark)) content = '\n%(key)-25s%(actual)-35s%(expected)-35s%(result)-10s' \ % {'key': 'INDEX', 'actual': 'ACTUAL', 'expected': 'EXPECTED', 'result': 'RESULT'} for index in range(len(expected)): if index < len(actual): actual_value = actual[index] else: actual_value = None expected_value = expected[index] if actual_value or expected_value: if isinstance(actual_value, (int, float, Decimal)): if int(actual_value) == int(expected_value): rst = 'PASS' else: rst = 'FAIL' else: if str(actual_value) == str(expected_value): rst = 'PASS' else: rst = 'FAIL' else: rst = 'PASS' result.append(rst) line = '%(key)-25s%(actual)-35s%(expected)-35s%(result)-10s' \ % {'key': index, 'actual': str(actual_value) + ' ', 'expected': str(expected_value) + ' ', 'result': rst} content = content + '\n' + line logging.info(content) allure.attach(name="校验数据库详情{}".format(mark[-1]), body=str(content)) if 'FAIL' in result: raise AssertionError('校验数据库{}未通过!'.format(mark)) else: logging.info('校验数据库{}>>>'.format(mark)) logging.info('ACTUAL: {}\nEXPECTED: {}'.format(actual, expected)) if str(expected) != str(actual): raise AssertionError('校验数据库{}未通过!'.format(mark))def check_result(case_data, code, data): """ 校验测试结果 :param case_data: 用例数据 :param code: 接口状态码 :param data: 返回的接口json数据 :return: """ try: # 获取用例检查信息 check_type = case_data['check_body']['check_type'] expected_code = case_data['check_body']['expected_code'] expected_result = case_data['check_body']['expected_result'] except Exception as e: raise KeyError('获取用例检查信息失败:{}'.format(e)) # 接口数据校验 if check_type == 'no_check': with allure.step("不校验接口结果"): pass elif check_type == 'check_code': with allure.step("仅校验接口状态码"): allure.attach(name="实际code", body=str(code)) allure.attach(name="期望code", body=str(expected_code)) allure.attach(name='实际data', body=str(data)) if int(code) != expected_code: raise Exception("接口状态码错误!\n %s != %s" % (code, expected_code)) elif check_type == 'check_json': with allure.step("JSON格式校验接口"): allure.attach(name="实际code", body=str(code)) allure.attach(name="期望code", body=str(expected_code)) allure.attach(name='实际data', body=str(data)) allure.attach(name='期望data', body=str(expected_result)) if int(code) == expected_code: if not data: data = "{}" check_json(expected_result, data) else: raise Exception("接口状态码错误!\n %s != %s" % (code, expected_code)) elif check_type == 'entirely_check': with allure.step("完全校验接口结果"): allure.attach(name="实际code", body=str(code)) allure.attach(name="期望code", body=str(expected_code)) allure.attach(name='实际data', body=str(data)) allure.attach(name='期望data', body=str(expected_result)) if int(code) == expected_code: result = operator.eq(expected_result, data) if not result: raise Exception("完全校验失败! %s ! = %s" % (expected_result, data)) else: raise Exception("接口状态码错误!\n %s != %s" % (code, expected_code)) elif check_type == 'regular_check': if int(code) == expected_code: try: result = "" if isinstance(expected_result, list): for i in expected_result: result = re.findall(i.replace("\"", "\""), str(data)) allure.attach('校验完成结果\n', str(result)) else: result = re.findall(expected_result.replace("\"", "\'"), str(data)) with allure.step("正则校验接口结果"): allure.attach(name="实际code", body=str(code)) allure.attach(name="期望code", body=str(expected_code)) allure.attach(name='实际data', body=str(data)) allure.attach(name='期望data', body=str(expected_result).replace("\'", "\"")) allure.attach(name=expected_result.replace("\"", "\'") + '校验完成结果', body=str(result).replace("\'", "\"")) if not result: raise Exception("正则未校验到内容! %s" % expected_result) except KeyError: raise Exception("正则校验执行失败! %s\n正则表达式为空时" % expected_result) else: raise Exception("接口状态码错误!\n %s != %s" % (code, expected_code)) else: raise Exception("无该接口校验方式%s" % check_type) # 判断是否存在数据库校验标识 if 'check_db' in case_data: check_db = case_data['check_db'] # 获取数据库期望结果:获取期望结果-获取关联值-替换关联值 data['parameter'] = case_data['parameter'] __relevance = readRelevance.get_relevance(data, check_db) check_db = replaceRelevance.replace(check_db, __relevance) # 循环校验数据库 for each in check_db: try: check_type = each['check_type'] execute_sql = each['execute_sql'] expected_result = each['expected_result'] except KeyError as e: raise KeyError('【check_db】存在错误字段!\n{}'.format(e)) except TypeError: raise KeyError("【check_db】类型错误,期望<class 'list'>,而不是%s!" % type(expected_result)) if not isinstance(expected_result, list): raise KeyError("【expected_result】类型错误,期望<class 'list'>,而不是%s!" % type(expected_result)) # 检索SQL语句 exp = r"^select (.*?) from (.*?) where (.*?)$" res = re.findall(exp, execute_sql.strip())[0] for r in res: if not each: msg = '标准格式: ' + exp raise Exception('无效SQL>>> {}\n{}'.format(execute_sql, msg)) # 判断数据库检查类型 if check_type == 'mysql': actual = qdb.query_mysql(execute_sql) elif check_type == 'hbase': actual = qdb.query_hbase(execute_sql) elif check_type == 'solr': actual = qdb.query_solr(execute_sql) elif check_type == 'es': actual = qdb.query_es(execute_sql) else: raise Exception("无该数据库校验方式%s" % check_type) # 增加输出并进行数据校验 mark = check_type.replace('check_', '').upper() + '['+res[1]+']' with allure.step("校验数据库{}".format(mark)): allure.attach(name="实际结果", body=str(actual)) allure.attach(name='期望结果', body=str(expected_result)) # expected_num = each['expected_num'] # allure.attach(name="实际行数", body=str(len(actual))) # allure.attach(name='期望行数', body=str(expected_num)) # # 验证数据库实际结果数量是否正确 # if len(actual) != int(expected_num): # raise AssertionError('校验数据库{}行数未通过!'.format(mark)) # 检查实际结果中第一条结果值 *************** for index, expected in enumerate(expected_result): try: check_database(actual[index], expected, mark+str(index)) except IndexError: raise IndexError('校验数据库{}失败,期望结果超出实际条目!'.format(mark+str(index)))
5、更新测试用例:新增数据库校验字段,默认无,需自行添加。
select * from TD_ADULT where ADULT_CODE= -- select * from adult where CHIL_NAME= select * from adultsolr320000 where adultName= select * from TD_ADULT_YZ where ADULT_CODE=
6、测试报告展示
数据库校验展开详情
运行日志示例
running.log
缺陷修复记录
作者:Leozhanggg
出处:https://www.cnblogs.com/leozhanggg/p/14522084.html
源码:https://github.com/Leozhanggg/ApiTesting
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。