删除以前版本文件

This commit is contained in:
2025-08-02 01:23:53 +08:00
parent b7a05e56d0
commit d55dd75ff8
2 changed files with 0 additions and 559 deletions

View File

@@ -1,276 +0,0 @@
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.cqlengine.columns import Boolean
from cassandra.policies import ColDesc
# CBase Hot
cluster_nodes_pro = ['10.20.2.22'] # Cassandra节点的IP地址
port_pro = 9042 # Cassandra使用的端口
username_pro = 'cbase' # Cassandra用户名
password_pro = 'antducbaseadmin@2022' # Cassandra密码
keyspace_pro = 'yuqing_skinny' # Cassandra keyspace_pro
cluster_nodes_test = ['10.20.2.22'] # Cassandra节点的IP地址
port_test = 9042 # Cassandra使用的端口
username_test = 'cbase' # Cassandra用户名
password_test = 'antducbaseadmin@2022' # Cassandra密码
keyspace_test = 'yuqing_skinny' # Cassandra keyspace_pro
# CBase Cold
# cluster_nodes_pro = ['10.20.1.108'] # Cassandra节点的IP地址
# port_pro = 9042 # Cassandra使用的端口
# username_pro = 'cassandra' # Cassandra用户名
# password_pro = 'cassandra' # Cassandra密码
# keyspace_pro = 'yuqing_skinny' # Cassandra keyspace_pro
# cluster_nodes_test = ['10.20.1.108'] # Cassandra节点的IP地址
# port_test = 9042 # Cassandra使用的端口
# username_test = 'cassandra' # Cassandra用户名
# password_test = 'cassandra' # Cassandra密码
# keyspace_test = 'yuqing_skinny' # Cassandra
# cluster_nodes_pro = ['10.20.1.119'] # Cassandra节点的IP地址
# port_pro = 9044 # Cassandra使用的端口
# username_pro = 'cbase' # Cassandra用户名
# password_pro = 'antducbaseadmin@2022' # Cassandra密码
# keyspace_pro = 'yuqing_skinny' # Cassandra keyspace_pro
data_table_pro = "document"
data_table_test = data_table_pro + "_test"
values = [] # 多个ID值
# data_table_pro = "doc_view_8"
# data_table_test = "doc_view_test"
# 定义主键字段及其对应的多ID值
keys = ["docid"]
# 比较全部字段
fields_to_compare = []
# fields_to_compare = [
# "statusid",
# "taglocation",
# "tagemotion",
# "tagindustry",
# "tagdomain",
# "tagtopic",
# "tagsimilar",
# "tagother",
# "hasprivacy",
# "istaged",
# "createat",
# ] # 指定要比较的字段
exclude_fields = [] # 需要排除的字段
# exclude_fields = ['mcrelated','ocrtexts','']
# 小写转换
fields_to_compare = [field.lower() for field in fields_to_compare]
exclude_fields = [field.lower() for field in exclude_fields]
keys = [field.lower() for field in keys]
# 定义存储字段差异数量的字典
field_diff_count = {}
# 输出文件
output_file = "/Users/yovinchen/project/python/CassandraQueryComparator/QueryCassandra/output.txt"
input_file = "/Users/yovinchen/project/python/CassandraQueryComparator/QueryCassandra/input.txt"
# 清空文件内容
open(output_file, "w").close()
with open(input_file, "r") as file:
values = [item.replace('"', '') for line in file for item in line.strip().split(",") if item]
# 创建身份验证提供程序
auth_provider = PlainTextAuthProvider(username=username_pro, password=password_pro)
# 连接到Cassandra集群
cluster = Cluster(cluster_nodes_pro, port=port_pro, auth_provider=auth_provider)
session = cluster.connect(keyspace_pro) # 连接到指定的keyspace
# 创建身份验证提供程序
auth_provider1 = PlainTextAuthProvider(username=username_test, password=password_test)
# 连接到Cassandra集群
cluster1 = Cluster(cluster_nodes_test, port=port_test, auth_provider=auth_provider1)
session1 = cluster1.connect(keyspace_test) # 连接到指定的keyspace
# 构建IN查询语句
query_conditions = f"""{keys[0]} IN ({', '.join([f"'{value}'" for value in values])})"""
# 如果 fields_to_compare 不为空,使用其中的字段,否则使用 *
fields_str = ", ".join(fields_to_compare) if fields_to_compare else "*"
query_sql1 = f"SELECT {fields_str} FROM {data_table_pro} WHERE {query_conditions};"
query_sql2 = f"SELECT {fields_str} FROM {data_table_test} WHERE {query_conditions};"
# 执行查询
result_doc_data = session.execute(query_sql1)
result_doc_data_test = session1.execute(query_sql2)
# 检查查询结果是否为空,并转换查询结果为列表
list_doc_data = list(result_doc_data) if result_doc_data else []
list_doc_data_test = list(result_doc_data_test) if result_doc_data_test else []
# list_doc_data = list(result_doc_data) if result_doc_data else []
# list_doc_data_test = list(result_doc_data_test) if result_doc_data_test else []
with open(output_file, "a") as f:
f.write(f"查询 {data_table_pro} 内容和 {data_table_test} 内容:\n")
for item1, item2 in zip(list_doc_data, list_doc_data_test):
f.write(f"{item1}\n{item2}\n\n")
# with open(output_file, "a") as f:
# f.write(f"查询 {data_table_pro} 内容:\n{list_doc_data}\n")
# f.write(f"查询 {data_table_test} 内容:\n{list_doc_data_test}\n")
if not list_doc_data:
with open(output_file, "a") as f:
f.write(f"查询 {data_table_pro} 的结果为空。")
print(f"查询 {data_table_pro} 的结果为空。")
if not list_doc_data_test:
with open(output_file, "a") as f:
f.write(f"查询 {data_table_test} 的结果为空。")
print(f"查询 {data_table_test} 的结果为空。")
# 创建一个列表来存储详细比较结果
differences = []
# 进行详细比较
def compare_data(fields_to_compare=None, exclude_fields=None):
exclude_fields = exclude_fields or [] # 如果未指定排除字段,默认为空列表
for value in values:
# 查找原表和测试表中该ID的相关数据
rows_data = [row for row in list_doc_data if getattr(row, keys[0]) == value]
rows_test = [row for row in list_doc_data_test if getattr(row, keys[0]) == value]
for row_data in rows_data:
# 在 doc_data_test 中查找相同主键的行
row_test = next(
(row for row in rows_test if all(getattr(row, key) == getattr(row_data, key) for key in keys)),
None)
if row_test:
# 如果在 doc_data_test 中找到相同的主键,则逐列比较
columns = fields_to_compare if fields_to_compare else row_data._fields
columns = [col for col in columns if col not in exclude_fields] # 过滤排除字段
for column in columns:
value_data = getattr(row_data, column)
value_test = getattr(row_test, column)
if value_data != value_test:
differences.append({
'主键': {key: getattr(row_data, key) for key in keys},
'字段': column,
'生产表': f"\n{value_data}\n",
'测试表': f"\n{value_test}\n"
})
# 统计字段差异次数
if column in field_diff_count:
field_diff_count[column] += 1
else:
field_diff_count[column] = 1
else:
# 如果在 doc_data_test 中未找到相同的行
differences.append({
'主键': {key: getattr(row_data, key) for key in keys},
'消息': f'{data_table_test} 中未找到该行'
})
# 比较 doc_data_test 中的行是否在 doc_data 中存在
for row_test in rows_test:
row_data = next(
(row for row in rows_data if all(getattr(row, key) == getattr(row_test, key) for key in keys)), None)
if not row_data:
differences.append({
'主键': {key: getattr(row_test, key) for key in keys},
'消息': f'{data_table_pro} 中未找到该行'
})
compare_data(fields_to_compare, exclude_fields)
# 使用集合来保存唯一的 topicid
id_set = set()
field_set = set()
grouped_id_dict = {}
# 输出指定字段的差异
with open(output_file, "a") as f:
if differences:
f.write("\n发现指定字段的差异:\n")
for diff in differences:
# 逐行打印每个差异
f.write(f"主键: {diff['主键']}\n")
f.write(f"字段: {diff.get('字段', 'N/A')}\n")
f.write(f"生产表: {diff.get('生产表', 'N/A')}\n")
f.write(f"测试表: {diff.get('测试表', 'N/A')}\n")
f.write("-" * 50) # 分隔符,便于查看
f.write("\n")
# 将差异ID按字段名分组
field = diff.get('字段', '未分组')
if field not in grouped_id_dict:
grouped_id_dict[field] = set()
for key in keys:
id = diff['主键'][key]
id_set.add('"' + id + '",')
grouped_id_dict[field].add('"' + id + '",')
# 输出分组后的差异ID
if grouped_id_dict:
f.write("\n差异ID按字段分组如下:")
for field, ids in grouped_id_dict.items():
field_set.add('"' + field + '",')
f.write(f"字段: {field}\n")
f.write("差异ID:")
for id in ids:
f.write(id)
f.write("\n")
f.write("-" * 50) # 分隔符,便于查看
f.write("\n")
else:
f.write("\n指定字段未发现差异。\n")
f.write("\n")
# 只有在 field_set 不为空时才打印
if field_set:
f.write("\n存在差异的 字段 为:\n")
# 打印所有唯一的 field
for field in field_set:
f.write(field + "\n")
# 只有在 id_set 不为空时才打印
if id_set:
f.write("\n存在差异的 ID 为:\n")
# 打印所有唯一的 topicid
for topicid in id_set:
f.write(topicid + "\n")
f.write("\n")
# 计算存在差异的 ID
different_ids = {id.strip('"').strip(',').strip('"') for id in id_set}
# 计算不存在差异的 ID即在 values 中但不在 different_ids 中)
non_different_ids = set(values) - different_ids
# 只有在 non_different_ids 非空时才打印
if non_different_ids:
f.write("\n不存在差异的 ID 为:\n")
for topicid in non_different_ids:
f.write(f'"{topicid}",\n')
f.write("\n")
f.write("总计key " + len(values).__str__() + "")
# 统计每个字段的差异数量
if field_diff_count:
f.write("\n字段差异统计如下:\n")
for field, count in field_diff_count.items():
f.write(f"字段 '{field}' 发现 {count} 处差异\n")
# 关闭连接
cluster.shutdown()

View File

@@ -1,283 +0,0 @@
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.cqlengine.columns import Boolean
from cassandra.policies import ColDesc
# # 配置Cassandra集群信息
# doc_view
# cluster_nodes_pro = ['10.20.2.43'] # Cassandra节点的IP地址
# port_pro = 9042 # Cassandra使用的端口
# username_pro = 'cbase' # Cassandra用户名
# password_pro = 'antducbaseadmin@2022' # Cassandra密码
# keyspace_pro = 'yuqing_twcs' # Cassandra keyspace_pro
# CBase Hot
cluster_nodes_test = ['10.20.2.22'] # Cassandra节点的IP地址
port_test = 9042 # Cassandra使用的端口
username_test = 'cbase' # Cassandra用户名
password_test = 'antducbaseadmin@2022' # Cassandra密码
keyspace_test = 'yuqing_skinny' # Cassandra keyspace_pro
# CBase Cold
cluster_nodes_pro = ['10.20.4.152'] # Cassandra节点的IP地址
port_pro = 9044 # Cassandra使用的端口
username_pro = 'cbase' # Cassandra用户名
password_pro = 'antducbaseadmin@2022' # Cassandra密码
keyspace_pro = 'yuqing_skinny' # Cassandra keyspace_pro
# cluster_nodes_test = ['10.20.1.108'] # Cassandra节点的IP地址
# port_test = 9042 # Cassandra使用的端口
# username_test = 'cassandra' # Cassandra用户名
# password_test = 'cassandra' # Cassandra密码
# keyspace_test = 'yuqing_skinny' # Cassandra keyspace_pro
# cluster_nodes_pro = ['10.20.1.119'] # Cassandra节点的IP地址
# port_pro = 9044 # Cassandra使用的端口
# username_pro = 'cbase' # Cassandra用户名
# password_pro = 'antducbaseadmin@2022' # Cassandra密码
# keyspace_pro = 'yuqing_skinny' # Cassandra keyspace_pro
# data_table_pro = "doc_view"
# data_table_test = data_table_pro + "_test"
values = [] # 多个ID值
data_table_pro = "wemedia_1"
data_table_test = "wemedia_test"
# 定义主键字段及其对应的多ID值
keys = ["wmid"]
# 比较全部字段
fields_to_compare = []
# fields_to_compare = [
# "docid",
# "bitset",
# "crawltime",
# "createat",
# "domain",
# "fanslevel",
# "nickname",
# "officialsitetypes",
# "platform",
# "tagemotion",
# "taglocation",
# "tagsimilar",
# "userid",
# "username",
# ] # 指定要比较的字段
exclude_fields = [] # 需要排除的字段
# exclude_fields = ['mcrelated','ocrtexts','']
# 定义存储字段差异数量的字典
field_diff_count = {}
# 输出文件
output_file = "/Users/yovinchen/project/python/CassandraQueryComparator/QueryCassandra/output.txt"
input_file = "/Users/yovinchen/project/python/CassandraQueryComparator/QueryCassandra/input.txt"
# 清空文件内容
open(output_file, "w").close()
# 单表
# with open(input_file, "r") as file:
# values = [line.strip() for line in file if line.strip()] # 去除空行
# twcs
with open(input_file, "r") as file:
values = [item.replace('"', '') for line in file for item in line.strip().split(",") if item]
# 创建身份验证提供程序
auth_provider = PlainTextAuthProvider(username=username_pro, password=password_pro)
# 连接到Cassandra集群
cluster = Cluster(cluster_nodes_pro, port=port_pro, auth_provider=auth_provider)
session = cluster.connect(keyspace_pro) # 连接到指定的keyspace
# 创建身份验证提供程序
auth_provider1 = PlainTextAuthProvider(username=username_test, password=password_test)
# 连接到Cassandra集群
cluster1 = Cluster(cluster_nodes_test, port=port_test, auth_provider=auth_provider1)
session1 = cluster1.connect(keyspace_test) # 连接到指定的keyspace
# 构建IN查询语句
query_conditions = f"""{keys[0]} IN ({', '.join([f"'{value}'" for value in values])})"""
# 如果 fields_to_compare 不为空,使用其中的字段,否则使用 *
fields_str = ", ".join(fields_to_compare) if fields_to_compare else "*"
query_sql1 = f"SELECT {fields_str} FROM {data_table_pro} WHERE {query_conditions};"
query_sql2 = f"SELECT {fields_str} FROM {data_table_test} WHERE {query_conditions};"
# 执行查询
result_doc_data = session.execute(query_sql1)
result_doc_data_test = session1.execute(query_sql2)
# 检查查询结果是否为空,并转换查询结果为列表
list_doc_data = list(result_doc_data) if result_doc_data else []
list_doc_data_test = list(result_doc_data_test) if result_doc_data_test else []
# list_doc_data = list(result_doc_data) if result_doc_data else []
# list_doc_data_test = list(result_doc_data_test) if result_doc_data_test else []
with open(output_file, "a") as f:
f.write(f"查询 {data_table_pro} 内容和 {data_table_test} 内容:\n")
for item1, item2 in zip(list_doc_data, list_doc_data_test):
f.write(f"{item1}\n{item2}\n\n")
# with open(output_file, "a") as f:
# f.write(f"查询 {data_table_pro} 内容:\n{list_doc_data}\n")
# f.write(f"查询 {data_table_test} 内容:\n{list_doc_data_test}\n")
if not list_doc_data:
with open(output_file, "a") as f:
f.write(f"查询 {data_table_pro} 的结果为空。")
print(f"查询 {data_table_pro} 的结果为空。")
if not list_doc_data_test:
with open(output_file, "a") as f:
f.write(f"查询 {data_table_test} 的结果为空。")
print(f"查询 {data_table_test} 的结果为空。")
# 创建一个列表来存储详细比较结果
differences = []
# 进行详细比较
def compare_data(fields_to_compare=None, exclude_fields=None):
exclude_fields = exclude_fields or [] # 如果未指定排除字段,默认为空列表
for value in values:
# 查找原表和测试表中该ID的相关数据
rows_data = [row for row in list_doc_data if getattr(row, keys[0]) == value]
rows_test = [row for row in list_doc_data_test if getattr(row, keys[0]) == value]
for row_data in rows_data:
# 在 doc_data_test 中查找相同主键的行
row_test = next(
(row for row in rows_test if all(getattr(row, key) == getattr(row_data, key) for key in keys)),
None)
if row_test:
# 如果在 doc_data_test 中找到相同的主键,则逐列比较
columns = fields_to_compare if fields_to_compare else row_data._fields
columns = [col for col in columns if col not in exclude_fields] # 过滤排除字段
for column in columns:
value_data = getattr(row_data, column)
value_test = getattr(row_test, column)
if value_data != value_test:
differences.append({
'主键': {key: getattr(row_data, key) for key in keys},
'字段': column,
'生产表': f"\n{value_data}\n",
'测试表': f"\n{value_test}\n"
})
# 统计字段差异次数
if column in field_diff_count:
field_diff_count[column] += 1
else:
field_diff_count[column] = 1
else:
# 如果在 doc_data_test 中未找到相同的行
differences.append({
'主键': {key: getattr(row_data, key) for key in keys},
'消息': f'{data_table_test} 中未找到该行'
})
# 比较 doc_data_test 中的行是否在 doc_data 中存在
for row_test in rows_test:
row_data = next(
(row for row in rows_data if all(getattr(row, key) == getattr(row_test, key) for key in keys)), None)
if not row_data:
differences.append({
'主键': {key: getattr(row_test, key) for key in keys},
'消息': f'{data_table_pro} 中未找到该行'
})
compare_data(fields_to_compare, exclude_fields)
# 使用集合来保存唯一的 topicid
id_set = set()
field_set = set()
grouped_id_dict = {}
# 输出指定字段的差异
with open(output_file, "a") as f:
if differences:
f.write("\n发现指定字段的差异:\n")
for diff in differences:
# 逐行打印每个差异
f.write(f"主键: {diff['主键']}\n")
f.write(f"字段: {diff.get('字段', 'N/A')}\n")
f.write(f"生产表: {diff.get('生产表', 'N/A')}\n")
f.write(f"测试表: {diff.get('测试表', 'N/A')}\n")
f.write("-" * 50) # 分隔符,便于查看
f.write("\n")
# 将差异ID按字段名分组
field = diff.get('字段', '未分组')
if field not in grouped_id_dict:
grouped_id_dict[field] = set()
for key in keys:
id = diff['主键'][key]
id_set.add('"' + id + '",')
grouped_id_dict[field].add('"' + id + '",')
# 输出分组后的差异ID
if grouped_id_dict:
f.write("\n差异ID按字段分组如下:")
for field, ids in grouped_id_dict.items():
field_set.add('"' + field + '",')
f.write(f"字段: {field}\n")
f.write("差异ID:")
for id in ids:
f.write(id)
f.write("\n")
f.write("-" * 50) # 分隔符,便于查看
f.write("\n")
else:
f.write("\n指定字段未发现差异。\n")
f.write("\n")
# 只有在 field_set 不为空时才打印
if field_set:
f.write("\n存在差异的 字段 为:\n")
# 打印所有唯一的 field
for field in field_set:
f.write(field + "\n")
# 只有在 id_set 不为空时才打印
if id_set:
f.write("\n存在差异的 ID 为:\n")
# 打印所有唯一的 topicid
for topicid in id_set:
f.write(topicid + "\n")
f.write("\n")
# 计算存在差异的 ID
different_ids = {id.strip('"').strip(',').strip('"') for id in id_set}
# 计算不存在差异的 ID即在 values 中但不在 different_ids 中)
non_different_ids = set(values) - different_ids
# 只有在 non_different_ids 非空时才打印
if non_different_ids:
f.write("\n不存在差异的 ID 为:\n")
for topicid in non_different_ids:
f.write(f'"{topicid}",\n')
f.write("\n")
f.write("总计key " + len(values).__str__() + "")
# 统计每个字段的差异数量
if field_diff_count:
f.write("\n字段差异统计如下:\n")
for field, count in field_diff_count.items():
f.write(f"字段 '{field}' 发现 {count} 处差异\n")
# 关闭连接
cluster.shutdown()