Files
BigDataTool/demo/twcsQuery.py
2025-07-31 18:05:10 +08:00

284 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.cqlengine.columns import Boolean
from cassandra.policies import ColDesc
# # Cassandra cluster connection settings
# doc_view
# cluster_nodes_pro = ['10.20.2.43'] # Cassandra node IP addresses
# port_pro = 9042 # Cassandra port
# username_pro = 'cbase' # Cassandra username
# password_pro = 'antducbaseadmin@2022' # Cassandra password
# keyspace_pro = 'yuqing_twcs' # Cassandra keyspace
# CBase Hot — the "test" side of the comparison
# NOTE(review): credentials are hardcoded in source; consider env vars/config.
cluster_nodes_test = ['10.20.2.22'] # Cassandra node IP addresses
port_test = 9042 # Cassandra port
username_test = 'cbase' # Cassandra username
password_test = 'antducbaseadmin@2022' # Cassandra password
keyspace_test = 'yuqing_skinny' # Cassandra keyspace
# CBase Cold — the "pro" (production/reference) side of the comparison
cluster_nodes_pro = ['10.20.4.152'] # Cassandra node IP addresses
port_pro = 9044 # Cassandra port
username_pro = 'cbase' # Cassandra username
password_pro = 'antducbaseadmin@2022' # Cassandra password
keyspace_pro = 'yuqing_skinny' # Cassandra keyspace
# cluster_nodes_test = ['10.20.1.108'] # Cassandra node IP addresses
# port_test = 9042 # Cassandra port
# username_test = 'cassandra' # Cassandra username
# password_test = 'cassandra' # Cassandra password
# keyspace_test = 'yuqing_skinny' # Cassandra keyspace
# cluster_nodes_pro = ['10.20.1.119'] # Cassandra node IP addresses
# port_pro = 9044 # Cassandra port
# username_pro = 'cbase' # Cassandra username
# password_pro = 'antducbaseadmin@2022' # Cassandra password
# keyspace_pro = 'yuqing_skinny' # Cassandra keyspace
# data_table_pro = "doc_view"
# data_table_test = data_table_pro + "_test"
values = [] # ids to query; populated from input_file below
data_table_pro = "wemedia_1"
data_table_test = "wemedia_test"
# Primary-key column(s) used to match rows between the two tables
keys = ["wmid"]
# Compare every column when this list is left empty
fields_to_compare = []
# fields_to_compare = [
# "docid",
# "bitset",
# "crawltime",
# "createat",
# "domain",
# "fanslevel",
# "nickname",
# "officialsitetypes",
# "platform",
# "tagemotion",
# "taglocation",
# "tagsimilar",
# "userid",
# "username",
# ] # columns to compare
exclude_fields = [] # columns to skip during comparison
# exclude_fields = ['mcrelated','ocrtexts','']
# Per-field mismatch counters, filled in by compare_data()
field_diff_count = {}
# Report output / id input files
output_file = "/Users/yovinchen/project/python/CassandraQueryComparator/QueryCassandra/output.txt"
input_file = "/Users/yovinchen/project/python/CassandraQueryComparator/QueryCassandra/input.txt"
# Truncate any previous report
open(output_file, "w").close()
# Single-column input variant (one id per line)
# with open(input_file, "r") as file:
#     values = [line.strip() for line in file if line.strip()]
# twcs input: comma-separated, double-quoted ids; quotes are stripped here
with open(input_file, "r") as file:
    values = [item.replace('"', '') for line in file for item in line.strip().split(",") if item]
# Auth provider for the production ("pro") cluster
auth_provider = PlainTextAuthProvider(username=username_pro, password=password_pro)
# Connect to the production Cassandra cluster
cluster = Cluster(cluster_nodes_pro, port=port_pro, auth_provider=auth_provider)
session = cluster.connect(keyspace_pro) # connect to the target keyspace
# Auth provider for the test cluster
auth_provider1 = PlainTextAuthProvider(username=username_test, password=password_test)
# Connect to the test Cassandra cluster
cluster1 = Cluster(cluster_nodes_test, port=port_test, auth_provider=auth_provider1)
session1 = cluster1.connect(keyspace_test) # connect to the target keyspace
# Build the shared IN (...) predicate over the first key column.
# NOTE(review): CQL is assembled by string interpolation; the ids come from a
# local file, but a prepared statement would be safer if input is ever untrusted.
query_conditions = f"""{keys[0]} IN ({', '.join([f"'{value}'" for value in values])})"""
# Use the explicit column list when fields_to_compare is set, otherwise SELECT *
fields_str = ", ".join(fields_to_compare) if fields_to_compare else "*"
query_sql1 = f"SELECT {fields_str} FROM {data_table_pro} WHERE {query_conditions};"
query_sql2 = f"SELECT {fields_str} FROM {data_table_test} WHERE {query_conditions};"
# Run the same query against both tables
result_doc_data = session.execute(query_sql1)
result_doc_data_test = session1.execute(query_sql2)
# Materialize the (possibly empty) result sets as lists of named-tuple rows
list_doc_data = list(result_doc_data) if result_doc_data else []
list_doc_data_test = list(result_doc_data_test) if result_doc_data_test else []
# list_doc_data = list(result_doc_data) if result_doc_data else []
# list_doc_data_test = list(result_doc_data_test) if result_doc_data_test else []
# Dump the raw rows side by side for manual inspection.
# NOTE(review): zip pairs rows positionally (not by primary key) and stops at
# the shorter list; the key-based comparison in compare_data() is authoritative.
with open(output_file, "a") as f:
    f.write(f"查询 {data_table_pro} 内容和 {data_table_test} 内容:\n")
    for item1, item2 in zip(list_doc_data, list_doc_data_test):
        f.write(f"{item1}\n{item2}\n\n")
# with open(output_file, "a") as f:
#     f.write(f"查询 {data_table_pro} 内容:\n{list_doc_data}\n")
#     f.write(f"查询 {data_table_test} 内容:\n{list_doc_data_test}\n")
# Flag empty result sets both in the report and on stdout
if not list_doc_data:
    with open(output_file, "a") as f:
        f.write(f"查询 {data_table_pro} 的结果为空。")
    print(f"查询 {data_table_pro} 的结果为空。")
if not list_doc_data_test:
    with open(output_file, "a") as f:
        f.write(f"查询 {data_table_test} 的结果为空。")
    print(f"查询 {data_table_test} 的结果为空。")
# Detailed per-row/per-column comparison results accumulate here
differences = []
# Detailed comparison between the two result sets.
def compare_data(fields_to_compare=None, exclude_fields=None):
    """Compare production and test rows key by key, recording differences.

    For every id in the module-level ``values`` list, rows present in both
    tables (matched on the full primary key ``keys``) are compared column by
    column.  Each mismatch is appended to ``differences`` and tallied in
    ``field_diff_count``; rows missing from either table are reported too.

    Args:
        fields_to_compare: columns to compare; when empty/None, every column
            of the production row (``row._fields``) is compared.
        exclude_fields: columns to skip during comparison (default: none).
    """
    excluded = set(exclude_fields or [])  # set for O(1) membership tests
    for value in values:
        # All rows carrying this id in each table (a key may match several rows)
        rows_data = [row for row in list_doc_data if getattr(row, keys[0]) == value]
        rows_test = [row for row in list_doc_data_test if getattr(row, keys[0]) == value]
        for row_data in rows_data:
            # Locate the row with the same full primary key in the test table
            row_test = next(
                (row for row in rows_test
                 if all(getattr(row, key) == getattr(row_data, key) for key in keys)),
                None)
            if row_test is None:
                # Guard clause: production row has no counterpart in the test table
                differences.append({
                    '主键': {key: getattr(row_data, key) for key in keys},
                    '消息': f'{data_table_test} 中未找到该行'
                })
                continue
            # Same primary key on both sides: compare column by column
            columns = fields_to_compare if fields_to_compare else row_data._fields
            for column in columns:
                if column in excluded:
                    continue
                value_data = getattr(row_data, column)
                value_test = getattr(row_test, column)
                if value_data != value_test:
                    differences.append({
                        '主键': {key: getattr(row_data, key) for key in keys},
                        '字段': column,
                        '生产表': f"\n{value_data}\n",
                        '测试表': f"\n{value_test}\n"
                    })
                    # Tally per-field mismatch counts (dict.get avoids the
                    # check-then-set branching of the original)
                    field_diff_count[column] = field_diff_count.get(column, 0) + 1
        # Report test-table rows whose primary key never appears in production
        for row_test in rows_test:
            row_data = next(
                (row for row in rows_data
                 if all(getattr(row, key) == getattr(row_test, key) for key in keys)),
                None)
            if row_data is None:
                differences.append({
                    '主键': {key: getattr(row_test, key) for key in keys},
                    '消息': f'{data_table_pro} 中未找到该行'
                })
compare_data(fields_to_compare, exclude_fields)
# Unique differing ids (as '"id",' fragments ready for copy-paste) and fields
id_set = set()
field_set = set()
grouped_id_dict = {}
# Emit the detailed diff report
with open(output_file, "a") as f:
    if differences:
        f.write("\n发现指定字段的差异:\n")
        for diff in differences:
            # One record per difference, separated for readability
            f.write(f"主键: {diff['主键']}\n")
            f.write(f"字段: {diff.get('字段', 'N/A')}\n")
            f.write(f"生产表: {diff.get('生产表', 'N/A')}\n")
            f.write(f"测试表: {diff.get('测试表', 'N/A')}\n")
            f.write("-" * 50)  # separator for readability
            f.write("\n")
            # Group the differing ids by field name ('未分组' for missing-row records)
            field = diff.get('字段', '未分组')
            if field not in grouped_id_dict:
                grouped_id_dict[field] = set()
            for key in keys:
                # NOTE(review): assumes the key value is a str — TODO confirm
                key_value = diff['主键'][key]
                id_set.add('"' + key_value + '",')
                grouped_id_dict[field].add('"' + key_value + '",')
    # Emit the ids grouped by differing field
    if grouped_id_dict:
        f.write("\n差异ID按字段分组如下:")
        for field, ids in grouped_id_dict.items():
            field_set.add('"' + field + '",')
            f.write(f"字段: {field}\n")
            f.write("差异ID:")
            for id_fragment in ids:  # renamed: original shadowed builtin `id`
                f.write(id_fragment)
            f.write("\n")
            f.write("-" * 50)  # separator for readability
            f.write("\n")
    else:
        f.write("\n指定字段未发现差异。\n")
    f.write("\n")
    # Only print the field summary when differences were found
    if field_set:
        f.write("\n存在差异的 字段 为:\n")
        for field in field_set:
            f.write(field + "\n")
    # Only print the id summary when differences were found
    if id_set:
        f.write("\n存在差异的 ID 为:\n")
        for topicid in id_set:
            f.write(topicid + "\n")
        f.write("\n")
    # Ids that showed differences, unwrapped from the '"id",' decoration
    # (chained strips kept to preserve the original's exact edge behavior)
    different_ids = {fragment.strip('"').strip(',').strip('"') for fragment in id_set}
    # Ids with no differences (queried but never reported)
    non_different_ids = set(values) - different_ids
    if non_different_ids:
        f.write("\n不存在差异的 ID 为:\n")
        for topicid in non_different_ids:
            f.write(f'"{topicid}",\n')
        f.write("\n")
    f.write("总计key " + str(len(values)))
    # Per-field mismatch totals collected by compare_data()
    if field_diff_count:
        f.write("\n字段差异统计如下:\n")
        for field, count in field_diff_count.items():
            f.write(f"字段 '{field}' 发现 {count} 处差异\n")
# Shut down BOTH clusters; the original leaked cluster1 (test side)
cluster.shutdown()
cluster1.shutdown()