From d55dd75ff82f126ebcc1389a73bcdd8d5bc93058 Mon Sep 17 00:00:00 2001 From: YoVinchen Date: Sat, 2 Aug 2025 01:23:53 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=A0=E9=99=A4=E4=BB=A5=E5=89=8D=E7=89=88?= =?UTF-8?q?=E6=9C=AC=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- demo/Query.py | 276 -------------------------------------------- demo/twcsQuery.py | 283 ---------------------------------------------- 2 files changed, 559 deletions(-) delete mode 100644 demo/Query.py delete mode 100644 demo/twcsQuery.py diff --git a/demo/Query.py b/demo/Query.py deleted file mode 100644 index c63469d..0000000 --- a/demo/Query.py +++ /dev/null @@ -1,276 +0,0 @@ -from cassandra.cluster import Cluster -from cassandra.auth import PlainTextAuthProvider -from cassandra.cqlengine.columns import Boolean -from cassandra.policies import ColDesc - -# CBase Hot -cluster_nodes_pro = ['10.20.2.22'] # Cassandra节点的IP地址 -port_pro = 9042 # Cassandra使用的端口 -username_pro = 'cbase' # Cassandra用户名 -password_pro = 'antducbaseadmin@2022' # Cassandra密码 -keyspace_pro = 'yuqing_skinny' # Cassandra keyspace_pro - -cluster_nodes_test = ['10.20.2.22'] # Cassandra节点的IP地址 -port_test = 9042 # Cassandra使用的端口 -username_test = 'cbase' # Cassandra用户名 -password_test = 'antducbaseadmin@2022' # Cassandra密码 -keyspace_test = 'yuqing_skinny' # Cassandra keyspace_pro - -# CBase Cold -# cluster_nodes_pro = ['10.20.1.108'] # Cassandra节点的IP地址 -# port_pro = 9042 # Cassandra使用的端口 -# username_pro = 'cassandra' # Cassandra用户名 -# password_pro = 'cassandra' # Cassandra密码 -# keyspace_pro = 'yuqing_skinny' # Cassandra keyspace_pro -# cluster_nodes_test = ['10.20.1.108'] # Cassandra节点的IP地址 -# port_test = 9042 # Cassandra使用的端口 -# username_test = 'cassandra' # Cassandra用户名 -# password_test = 'cassandra' # Cassandra密码 -# keyspace_test = 'yuqing_skinny' # Cassandra - -# cluster_nodes_pro = ['10.20.1.119'] # Cassandra节点的IP地址 -# port_pro = 9044 # Cassandra使用的端口 -# username_pro = 'cbase' # Cassandra用户名 -# password_pro = 'antducbaseadmin@2022' # Cassandra密码 -# keyspace_pro = 'yuqing_skinny' # Cassandra keyspace_pro - -data_table_pro = "document" -data_table_test = data_table_pro + "_test" - -values = [] # 多个ID值 - -# data_table_pro = "doc_view_8" -# data_table_test = "doc_view_test" - -# 定义主键字段及其对应的多ID值 -keys = ["docid"] - -# 比较全部字段 -fields_to_compare = [] -# fields_to_compare = [ -# "statusid", -# "taglocation", -# "tagemotion", -# "tagindustry", -# "tagdomain", -# "tagtopic", -# "tagsimilar", -# "tagother", -# "hasprivacy", -# "istaged", -# "createat", -# ] # 指定要比较的字段 - -exclude_fields = [] # 需要排除的字段 -# exclude_fields = ['mcrelated','ocrtexts',''] - -# 小写转换 -fields_to_compare = [field.lower() for field in fields_to_compare] -exclude_fields = [field.lower() for field in exclude_fields] -keys = [field.lower() for field in keys] - -# 定义存储字段差异数量的字典 -field_diff_count = {} - -# 输出文件 -output_file = "/Users/yovinchen/project/python/CassandraQueryComparator/QueryCassandra/output.txt" -input_file = "/Users/yovinchen/project/python/CassandraQueryComparator/QueryCassandra/input.txt" - -# 清空文件内容 -open(output_file, "w").close() - -with open(input_file, "r") as file: - values = [item.replace('"', '') for line in file for item in line.strip().split(",") if item] - -# 创建身份验证提供程序 -auth_provider = PlainTextAuthProvider(username=username_pro, password=password_pro) -# 连接到Cassandra集群 -cluster = Cluster(cluster_nodes_pro, port=port_pro, auth_provider=auth_provider) -session = cluster.connect(keyspace_pro) # 连接到指定的keyspace - -# 创建身份验证提供程序 -auth_provider1 = PlainTextAuthProvider(username=username_test, password=password_test) -# 连接到Cassandra集群 -cluster1 = Cluster(cluster_nodes_test, port=port_test, auth_provider=auth_provider1) -session1 = cluster1.connect(keyspace_test) # 连接到指定的keyspace -# 构建IN查询语句 -query_conditions = f"""{keys[0]} IN ({', '.join([f"'{value}'" for value in values])})""" - -# 如果 fields_to_compare 不为空,使用其中的字段,否则使用 * -fields_str = ", ".join(fields_to_compare) if fields_to_compare else "*" - -query_sql1 = f"SELECT {fields_str} FROM {data_table_pro} WHERE {query_conditions};" -query_sql2 = f"SELECT {fields_str} FROM {data_table_test} WHERE {query_conditions};" - -# 执行查询 -result_doc_data = session.execute(query_sql1) -result_doc_data_test = session1.execute(query_sql2) - -# 检查查询结果是否为空,并转换查询结果为列表 -list_doc_data = list(result_doc_data) if result_doc_data else [] -list_doc_data_test = list(result_doc_data_test) if result_doc_data_test else [] - -# list_doc_data = list(result_doc_data) if result_doc_data else [] -# list_doc_data_test = list(result_doc_data_test) if result_doc_data_test else [] - -with open(output_file, "a") as f: - f.write(f"查询 {data_table_pro} 内容和 {data_table_test} 内容:\n") - for item1, item2 in zip(list_doc_data, list_doc_data_test): - f.write(f"{item1}\n{item2}\n\n") - -# with open(output_file, "a") as f: -# f.write(f"查询 {data_table_pro} 内容:\n{list_doc_data}\n") -# f.write(f"查询 {data_table_test} 内容:\n{list_doc_data_test}\n") - -if not list_doc_data: - with open(output_file, "a") as f: - f.write(f"查询 {data_table_pro} 的结果为空。") - print(f"查询 {data_table_pro} 的结果为空。") -if not list_doc_data_test: - with open(output_file, "a") as f: - f.write(f"查询 {data_table_test} 的结果为空。") - print(f"查询 {data_table_test} 的结果为空。") - -# 创建一个列表来存储详细比较结果 -differences = [] - - -# 进行详细比较 -def compare_data(fields_to_compare=None, exclude_fields=None): - exclude_fields = exclude_fields or [] # 如果未指定排除字段,默认为空列表 - - for value in values: - # 查找原表和测试表中该ID的相关数据 - rows_data = [row for row in list_doc_data if getattr(row, keys[0]) == value] - rows_test = [row for row in list_doc_data_test if getattr(row, keys[0]) == value] - - for row_data in rows_data: - # 在 doc_data_test 中查找相同主键的行 - row_test = next( - (row for row in rows_test if all(getattr(row, key) == getattr(row_data, key) for key in keys)), - None) - - if row_test: - # 如果在 doc_data_test 中找到相同的主键,则逐列比较 - columns = fields_to_compare if fields_to_compare else row_data._fields - columns = [col for col in columns if col not in exclude_fields] # 过滤排除字段 - - for column in columns: - value_data = getattr(row_data, column) - value_test = getattr(row_test, column) - if value_data != value_test: - differences.append({ - '主键': {key: getattr(row_data, key) for key in keys}, - '字段': column, - '生产表': f"\n{value_data}\n", - '测试表': f"\n{value_test}\n" - }) - - # 统计字段差异次数 - if column in field_diff_count: - field_diff_count[column] += 1 - else: - field_diff_count[column] = 1 - - else: - # 如果在 doc_data_test 中未找到相同的行 - differences.append({ - '主键': {key: getattr(row_data, key) for key in keys}, - '消息': f'在 {data_table_test} 中未找到该行' - }) - - # 比较 doc_data_test 中的行是否在 doc_data 中存在 - for row_test in rows_test: - row_data = next( - (row for row in rows_data if all(getattr(row, key) == getattr(row_test, key) for key in keys)), None) - if not row_data: - differences.append({ - '主键': {key: getattr(row_test, key) for key in keys}, - '消息': f'在 {data_table_pro} 中未找到该行' - }) - - -compare_data(fields_to_compare, exclude_fields) - -# 使用集合来保存唯一的 topicid -id_set = set() -field_set = set() - -grouped_id_dict = {} - -# 输出指定字段的差异 -with open(output_file, "a") as f: - if differences: - f.write("\n发现指定字段的差异:\n") - for diff in differences: - # 逐行打印每个差异 - f.write(f"主键: {diff['主键']}\n") - f.write(f"字段: {diff.get('字段', 'N/A')}\n") - f.write(f"生产表: {diff.get('生产表', 'N/A')}\n") - f.write(f"测试表: {diff.get('测试表', 'N/A')}\n") - f.write("-" * 50) # 分隔符,便于查看 - f.write("\n") - - # 将差异ID按字段名分组 - field = diff.get('字段', '未分组') - if field not in grouped_id_dict: - grouped_id_dict[field] = set() - for key in keys: - id = diff['主键'][key] - id_set.add('"' + id + '",') - grouped_id_dict[field].add('"' + id + '",') - - # 输出分组后的差异ID - if grouped_id_dict: - f.write("\n差异ID按字段分组如下:") - for field, ids in grouped_id_dict.items(): - field_set.add('"' + field + '",') - f.write(f"字段: {field}\n") - f.write("差异ID:") - for id in ids: - f.write(id) - f.write("\n") - f.write("-" * 50) # 分隔符,便于查看 - f.write("\n") - else: - f.write("\n指定字段未发现差异。\n") - - f.write("\n") - - # 只有在 field_set 不为空时才打印 - if field_set: - f.write("\n存在差异的 字段 为:\n") - # 打印所有唯一的 field - for field in field_set: - f.write(field + "\n") - - # 只有在 id_set 不为空时才打印 - if id_set: - f.write("\n存在差异的 ID 为:\n") - # 打印所有唯一的 topicid - for topicid in id_set: - f.write(topicid + "\n") - f.write("\n") - - # 计算存在差异的 ID - different_ids = {id.strip('"').strip(',').strip('"') for id in id_set} - - # 计算不存在差异的 ID(即在 values 中但不在 different_ids 中) - non_different_ids = set(values) - different_ids - - # 只有在 non_different_ids 非空时才打印 - if non_different_ids: - f.write("\n不存在差异的 ID 为:\n") - for topicid in non_different_ids: - f.write(f'"{topicid}",\n') - f.write("\n") - - f.write("总计key " + len(values).__str__() + "个") - # 统计每个字段的差异数量 - if field_diff_count: - f.write("\n字段差异统计如下:\n") - for field, count in field_diff_count.items(): - f.write(f"字段 '{field}' 发现 {count} 处差异\n") - -# 关闭连接 -cluster.shutdown() diff --git a/demo/twcsQuery.py b/demo/twcsQuery.py deleted file mode 100644 index c260401..0000000 --- a/demo/twcsQuery.py +++ /dev/null @@ -1,283 +0,0 @@ -from cassandra.cluster import Cluster -from cassandra.auth import PlainTextAuthProvider -from cassandra.cqlengine.columns import Boolean -from cassandra.policies import ColDesc - -# # 配置Cassandra集群信息 -# doc_view -# cluster_nodes_pro = ['10.20.2.43'] # Cassandra节点的IP地址 -# port_pro = 9042 # Cassandra使用的端口 -# username_pro = 'cbase' # Cassandra用户名 -# password_pro = 'antducbaseadmin@2022' # Cassandra密码 -# keyspace_pro = 'yuqing_twcs' # Cassandra keyspace_pro - -# CBase Hot -cluster_nodes_test = ['10.20.2.22'] # Cassandra节点的IP地址 -port_test = 9042 # Cassandra使用的端口 -username_test = 'cbase' # Cassandra用户名 -password_test = 'antducbaseadmin@2022' # Cassandra密码 -keyspace_test = 'yuqing_skinny' # Cassandra keyspace_pro - -# CBase Cold -cluster_nodes_pro = ['10.20.4.152'] # Cassandra节点的IP地址 -port_pro = 9044 # Cassandra使用的端口 -username_pro = 'cbase' # Cassandra用户名 -password_pro = 'antducbaseadmin@2022' # Cassandra密码 -keyspace_pro = 'yuqing_skinny' # Cassandra keyspace_pro - -# cluster_nodes_test = ['10.20.1.108'] # Cassandra节点的IP地址 -# port_test = 9042 # Cassandra使用的端口 -# username_test = 'cassandra' # Cassandra用户名 -# password_test = 'cassandra' # Cassandra密码 -# keyspace_test = 'yuqing_skinny' # Cassandra keyspace_pro - -# cluster_nodes_pro = ['10.20.1.119'] # Cassandra节点的IP地址 -# port_pro = 9044 # Cassandra使用的端口 -# username_pro = 'cbase' # Cassandra用户名 -# password_pro = 'antducbaseadmin@2022' # Cassandra密码 -# keyspace_pro = 'yuqing_skinny' # Cassandra keyspace_pro - - -# data_table_pro = "doc_view" -# data_table_test = data_table_pro + "_test" - -values = [] # 多个ID值 - -data_table_pro = "wemedia_1" -data_table_test = "wemedia_test" - -# 定义主键字段及其对应的多ID值 -keys = ["wmid"] - -# 比较全部字段 -fields_to_compare = [] -# fields_to_compare = [ -# "docid", -# "bitset", -# "crawltime", -# "createat", -# "domain", -# "fanslevel", -# "nickname", -# "officialsitetypes", -# "platform", -# "tagemotion", -# "taglocation", -# "tagsimilar", -# "userid", -# "username", -# ] # 指定要比较的字段 - -exclude_fields = [] # 需要排除的字段 -# exclude_fields = ['mcrelated','ocrtexts',''] - -# 定义存储字段差异数量的字典 -field_diff_count = {} - -# 输出文件 -output_file = "/Users/yovinchen/project/python/CassandraQueryComparator/QueryCassandra/output.txt" -input_file = "/Users/yovinchen/project/python/CassandraQueryComparator/QueryCassandra/input.txt" - -# 清空文件内容 -open(output_file, "w").close() - -# 单表 -# with open(input_file, "r") as file: -# values = [line.strip() for line in file if line.strip()] # 去除空行 - -# twcs -with open(input_file, "r") as file: - values = [item.replace('"', '') for line in file for item in line.strip().split(",") if item] - -# 创建身份验证提供程序 -auth_provider = PlainTextAuthProvider(username=username_pro, password=password_pro) -# 连接到Cassandra集群 -cluster = Cluster(cluster_nodes_pro, port=port_pro, auth_provider=auth_provider) -session = cluster.connect(keyspace_pro) # 连接到指定的keyspace - -# 创建身份验证提供程序 -auth_provider1 = PlainTextAuthProvider(username=username_test, password=password_test) -# 连接到Cassandra集群 -cluster1 = Cluster(cluster_nodes_test, port=port_test, auth_provider=auth_provider1) -session1 = cluster1.connect(keyspace_test) # 连接到指定的keyspace -# 构建IN查询语句 -query_conditions = f"""{keys[0]} IN ({', '.join([f"'{value}'" for value in values])})""" - -# 如果 fields_to_compare 不为空,使用其中的字段,否则使用 * -fields_str = ", ".join(fields_to_compare) if fields_to_compare else "*" - -query_sql1 = f"SELECT {fields_str} FROM {data_table_pro} WHERE {query_conditions};" -query_sql2 = f"SELECT {fields_str} FROM {data_table_test} WHERE {query_conditions};" - -# 执行查询 -result_doc_data = session.execute(query_sql1) -result_doc_data_test = session1.execute(query_sql2) - -# 检查查询结果是否为空,并转换查询结果为列表 -list_doc_data = list(result_doc_data) if result_doc_data else [] -list_doc_data_test = list(result_doc_data_test) if result_doc_data_test else [] - -# list_doc_data = list(result_doc_data) if result_doc_data else [] -# list_doc_data_test = list(result_doc_data_test) if result_doc_data_test else [] - -with open(output_file, "a") as f: - f.write(f"查询 {data_table_pro} 内容和 {data_table_test} 内容:\n") - for item1, item2 in zip(list_doc_data, list_doc_data_test): - f.write(f"{item1}\n{item2}\n\n") - -# with open(output_file, "a") as f: -# f.write(f"查询 {data_table_pro} 内容:\n{list_doc_data}\n") -# f.write(f"查询 {data_table_test} 内容:\n{list_doc_data_test}\n") - -if not list_doc_data: - with open(output_file, "a") as f: - f.write(f"查询 {data_table_pro} 的结果为空。") - print(f"查询 {data_table_pro} 的结果为空。") -if not list_doc_data_test: - with open(output_file, "a") as f: - f.write(f"查询 {data_table_test} 的结果为空。") - print(f"查询 {data_table_test} 的结果为空。") - -# 创建一个列表来存储详细比较结果 -differences = [] - - -# 进行详细比较 -def compare_data(fields_to_compare=None, exclude_fields=None): - exclude_fields = exclude_fields or [] # 如果未指定排除字段,默认为空列表 - - for value in values: - # 查找原表和测试表中该ID的相关数据 - rows_data = [row for row in list_doc_data if getattr(row, keys[0]) == value] - rows_test = [row for row in list_doc_data_test if getattr(row, keys[0]) == value] - - for row_data in rows_data: - # 在 doc_data_test 中查找相同主键的行 - row_test = next( - (row for row in rows_test if all(getattr(row, key) == getattr(row_data, key) for key in keys)), - None) - - if row_test: - # 如果在 doc_data_test 中找到相同的主键,则逐列比较 - columns = fields_to_compare if fields_to_compare else row_data._fields - columns = [col for col in columns if col not in exclude_fields] # 过滤排除字段 - - for column in columns: - value_data = getattr(row_data, column) - value_test = getattr(row_test, column) - if value_data != value_test: - differences.append({ - '主键': {key: getattr(row_data, key) for key in keys}, - '字段': column, - '生产表': f"\n{value_data}\n", - '测试表': f"\n{value_test}\n" - }) - - # 统计字段差异次数 - if column in field_diff_count: - field_diff_count[column] += 1 - else: - field_diff_count[column] = 1 - - else: - # 如果在 doc_data_test 中未找到相同的行 - differences.append({ - '主键': {key: getattr(row_data, key) for key in keys}, - '消息': f'在 {data_table_test} 中未找到该行' - }) - - # 比较 doc_data_test 中的行是否在 doc_data 中存在 - for row_test in rows_test: - row_data = next( - (row for row in rows_data if all(getattr(row, key) == getattr(row_test, key) for key in keys)), None) - if not row_data: - differences.append({ - '主键': {key: getattr(row_test, key) for key in keys}, - '消息': f'在 {data_table_pro} 中未找到该行' - }) - - -compare_data(fields_to_compare, exclude_fields) - -# 使用集合来保存唯一的 topicid -id_set = set() -field_set = set() - -grouped_id_dict = {} - -# 输出指定字段的差异 -with open(output_file, "a") as f: - if differences: - f.write("\n发现指定字段的差异:\n") - for diff in differences: - # 逐行打印每个差异 - f.write(f"主键: {diff['主键']}\n") - f.write(f"字段: {diff.get('字段', 'N/A')}\n") - f.write(f"生产表: {diff.get('生产表', 'N/A')}\n") - f.write(f"测试表: {diff.get('测试表', 'N/A')}\n") - f.write("-" * 50) # 分隔符,便于查看 - f.write("\n") - - # 将差异ID按字段名分组 - field = diff.get('字段', '未分组') - if field not in grouped_id_dict: - grouped_id_dict[field] = set() - for key in keys: - id = diff['主键'][key] - id_set.add('"' + id + '",') - grouped_id_dict[field].add('"' + id + '",') - - # 输出分组后的差异ID - if grouped_id_dict: - f.write("\n差异ID按字段分组如下:") - for field, ids in grouped_id_dict.items(): - field_set.add('"' + field + '",') - f.write(f"字段: {field}\n") - f.write("差异ID:") - for id in ids: - f.write(id) - f.write("\n") - f.write("-" * 50) # 分隔符,便于查看 - f.write("\n") - else: - f.write("\n指定字段未发现差异。\n") - - f.write("\n") - - # 只有在 field_set 不为空时才打印 - if field_set: - f.write("\n存在差异的 字段 为:\n") - # 打印所有唯一的 field - for field in field_set: - f.write(field + "\n") - - # 只有在 id_set 不为空时才打印 - if id_set: - f.write("\n存在差异的 ID 为:\n") - # 打印所有唯一的 topicid - for topicid in id_set: - f.write(topicid + "\n") - f.write("\n") - - # 计算存在差异的 ID - different_ids = {id.strip('"').strip(',').strip('"') for id in id_set} - - # 计算不存在差异的 ID(即在 values 中但不在 different_ids 中) - non_different_ids = set(values) - different_ids - - # 只有在 non_different_ids 非空时才打印 - if non_different_ids: - f.write("\n不存在差异的 ID 为:\n") - for topicid in non_different_ids: - f.write(f'"{topicid}",\n') - f.write("\n") - - f.write("总计key " + len(values).__str__() + "个") - # 统计每个字段的差异数量 - if field_diff_count: - f.write("\n字段差异统计如下:\n") - for field, count in field_diff_count.items(): - f.write(f"字段 '{field}' 发现 {count} 处差异\n") - -# 关闭连接 -cluster.shutdown()