docs(predictV3): 添加predictV3算法优化研究文档和前端功能实现

- 完成Phase 11: predictV3算法优化研究文档,涵盖6个优化方向的技术分析
- 实现置信度评估功能,提供历史命中率、得分分布、多维度一致性置信度指标
- 扩展回测指标体系,新增NDCG@K、MRR、命中率分布等排名质量评估指标
- 优化转移概率算法,引入二阶马尔可夫链和多属性联合转移增强预测准确性
- 设计权重训练机制,支持网格搜索和遗传算法进行数据驱动的参数优化
- 集成组合特征挖掘功能,采用关联规则和序列模式发现号码间潜在关联
- 实现完整的前端交互界面,支持预测结果显示、置信度展示和回测验证功能
- 建立性能优化策略,包括预计算缓存、批量计算和降级策略保障响应速度
This commit is contained in:
2026-05-01 23:17:24 +08:00
parent 02b3ff3a22
commit 8b2590c5b5
26 changed files with 5407 additions and 2 deletions
+589
View File
@@ -0,0 +1,589 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
上期正码与当期特码关联规律分析脚本
分析维度:
1. 上期正码平均值与下期特码的差值分布
2. 上期正码范围[min,max]与下期特码的关系
3. 上期正码与下期特码的最短距离分布
4. 上期正码和值尾数与下期特码尾数的关系
5. 上期正码覆盖区间与下期特码所在区间的关系
6. 上期正码波色分布与下期特码波色的关系
7. 上期特码与下期特码的转移关系
"""
import re
from collections import defaultdict
from pathlib import Path
# Wave-color lookup table: ball number (1-49) -> color label.
# The red ('红') and blue ('蓝') labels follow the standard Mark Six
# colour groups; green ('绿') entries are unchanged.
# NOTE(review): red/blue were reconstructed from the standard table after
# the labels were lost to an encoding problem -- confirm against the
# project's authoritative color data.
COLOR_MAP = {
    1: '红', 2: '红', 3: '蓝', 4: '蓝', 5: '绿', 6: '绿',
    7: '红', 8: '红', 9: '蓝', 10: '蓝', 11: '绿', 12: '红',
    13: '红', 14: '蓝', 15: '蓝', 16: '绿', 17: '绿', 18: '红',
    19: '红', 20: '蓝', 21: '绿', 22: '绿', 23: '红', 24: '红',
    25: '蓝', 26: '蓝', 27: '绿', 28: '绿', 29: '红', 30: '红',
    31: '蓝', 32: '绿', 33: '绿', 34: '红', 35: '红', 36: '蓝',
    37: '蓝', 38: '绿', 39: '绿', 40: '红', 41: '蓝', 42: '蓝',
    43: '绿', 44: '绿', 45: '红', 46: '红', 47: '蓝', 48: '蓝',
    49: '绿',
}
def get_range(num: int) -> str:
    """Return the label of the size bucket that ``num`` falls into.

    Buckets: 1-10 is '小号(1-10)', 11-30 is '中号(11-30)', and every other
    value is labelled '大号(31-49)'.

    Note that the final branch is a catch-all: values outside 1-49 are
    also reported as '大号(31-49)'.
    """
    if 1 <= num <= 10:
        return '小号(1-10)'
    if 11 <= num <= 30:
        return '中号(11-30)'
    return '大号(31-49)'
def get_tail(num: int) -> int:
    """Return the last decimal digit of ``num`` (``num % 10``)."""
    return num % 10
# Matches one `fa_history` INSERT row: eight integer columns followed by a
# single-quoted string column. Any columns after those are ignored.
_HISTORY_INSERT_RE = re.compile(
    r"INSERT INTO `fa_history` VALUES \((\d+), (\d+), (\d+), (\d+), (\d+), (\d+), (\d+), (\d+), '([^']+)'"
)


def parse_sql_file(filepath):
    """Parse a SQL dump and return the history rows it contains.

    Args:
        filepath: Path (``str`` or ``Path``) of a UTF-8 encoded SQL file
            containing ``INSERT INTO `fa_history` VALUES (...)`` statements.

    Returns:
        A list of dicts with the keys ``expect``, ``num1`` .. ``num7`` (all
        ``int``) and ``openTime`` (``str``), sorted ascending by ``expect``.
        The list is empty when no statement matches.

    Raises:
        OSError: If the file cannot be read.
    """
    content = Path(filepath).read_text(encoding='utf-8')

    history = []
    for match in _HISTORY_INSERT_RE.finditer(content):
        fields = match.groups()
        record = {'expect': int(fields[0])}
        # Groups 1..7 are the seven drawn numbers, stored as num1..num7.
        for position in range(1, 8):
            record[f'num{position}'] = int(fields[position])
        record['openTime'] = fields[8]
        history.append(record)

    # Order rows by issue number so consecutive entries are consecutive draws.
    history.sort(key=lambda row: row['expect'])
    return history
def _main_numbers(record):
    """Return the six regular numbers (num1..num6) of one draw record."""
    return [record[f'num{k}'] for k in range(1, 7)]


def _percent(count, total):
    """Return ``count / total`` as a percentage rounded to two decimals."""
    return round(count / total * 100, 2)


def _window(nums, radius):
    """Return every number in 1..49 lying within ``radius`` of any of ``nums``."""
    return {
        p
        for num in nums
        for p in range(max(1, num - radius), min(50, num + radius + 1))
    }


def analyze(filepath=r'C:\Users\91611\Desktop\fa_history.sql'):
    """Print a report relating each draw's numbers to the next draw's special number.

    The history is read with ``parse_sql_file`` and every pair of consecutive
    draws is examined along seven dimensions (mean difference, min/max span,
    nearest distance, sum tail digit, size buckets, wave colors, and
    special-to-special transitions), followed by combined-rule hit rates and
    a summary of rules that reach a 40% hit rate.

    Args:
        filepath: SQL dump to analyse. Defaults to the path the script was
            originally written against.

    Returns:
        None. All results are written to standard output. When fewer than
        two draws are available a notice is printed and nothing is analysed.
    """
    history = parse_sql_file(filepath)
    print("=== 数据概览 ===")
    print(f"总期数: {len(history)}")
    # Every statistic below is a ratio over consecutive-draw pairs, so at
    # least two draws are needed to avoid indexing/division errors.
    if len(history) < 2:
        print("历史数据不足2期,无法进行关联分析")
        return
    print(f"期号范围: {history[0]['expect']} - {history[-1]['expect']}")
    print()
    total_predictions = len(history) - 1
    # (previous draw, following draw) pairs shared by all dimensions.
    pairs = list(zip(history, history[1:]))

    # ===== Dimension 1: mean of previous regular numbers vs next special =====
    print("=== 维度1: 上期正码平均值与下期特码的差值分布 ===")
    avg_diffs = []
    hit_count_avg = 0
    prediction_range = 10
    for current, next_record in pairs:
        avg = sum(_main_numbers(current)) / 6
        diff = next_record['num7'] - avg
        avg_diffs.append(round(diff))
        # Predicted window: truncated mean +/- prediction_range, clipped to 1..49.
        predict_min = max(1, int(avg) - prediction_range)
        predict_max = min(49, int(avg) + prediction_range)
        if predict_min <= next_record['num7'] <= predict_max:
            hit_count_avg += 1

    diff_stats = defaultdict(int)
    for d in avg_diffs:
        diff_stats[d] += 1
    print("差值分布(差值=下期特码-上期正码平均值):")
    for diff in sorted(diff_stats.keys()):
        count = diff_stats[diff]
        percent = _percent(count, total_predictions)
        print(f" 差值 {diff}: {count}次 ({percent}%)")

    print("\n差值范围分布:")
    # NOTE(review): buckets are half-open except the last one, and rounded
    # differences beyond +/-40 are not counted in any bucket.
    diff_buckets = [
        ('[-40,-20]', lambda d: -40 <= d < -20),
        ('[-20,-10]', lambda d: -20 <= d < -10),
        ('[-10,0]', lambda d: -10 <= d < 0),
        ('[0,10]', lambda d: 0 <= d < 10),
        ('[10,20]', lambda d: 10 <= d < 20),
        ('[20,40]', lambda d: 20 <= d <= 40),
    ]
    for range_name, condition in diff_buckets:
        count = sum(1 for d in avg_diffs if condition(d))
        percent = _percent(count, total_predictions)
        print(f" {range_name}: {count}次 ({percent}%)")
    print(f"\n基于平均值±{prediction_range}范围的预测命中率: {_percent(hit_count_avg, total_predictions)}% ({hit_count_avg}/{total_predictions})")
    print()

    # ===== Dimension 2: [min, max] span of previous regular numbers =====
    print("=== 维度2: 上期正码范围[min,max]与下期特码的关系 ===")
    in_range_count = 0
    below_range_count = 0
    above_range_count = 0
    total_range_width = 0
    range_width_stats = defaultdict(lambda: {'在范围内': 0, '低于范围': 0, '高于范围': 0})
    for current, next_record in pairs:
        nums = _main_numbers(current)
        min_num = min(nums)
        max_num = max(nums)
        range_width = max_num - min_num
        total_range_width += range_width
        if min_num <= next_record['num7'] <= max_num:
            in_range_count += 1
            relation = '在范围内'
        elif next_record['num7'] < min_num:
            below_range_count += 1
            relation = '低于范围'
        else:
            above_range_count += 1
            relation = '高于范围'
        range_width_stats[range_width][relation] += 1
    print("下期特码位置分布:")
    print(f" 在上期正码范围内: {in_range_count}次 ({_percent(in_range_count, total_predictions)}%)")
    print(f" 低于上期正码范围: {below_range_count}次 ({_percent(below_range_count, total_predictions)}%)")
    print(f" 高于上期正码范围: {above_range_count}次 ({_percent(above_range_count, total_predictions)}%)")
    print("\n范围宽度与特码位置关系:")
    for width in sorted(range_width_stats.keys()):
        stats = range_width_stats[width]
        width_total = sum(stats.values())
        print(f" 范围宽度{width} (共{width_total}期):")
        for relation, count in stats.items():
            percent = _percent(count, width_total)
            print(f" {relation}: {count}次 ({percent}%)")
    avg_range_width = total_range_width / total_predictions
    print(f"\n平均范围宽度: {round(avg_range_width, 2)}")
    print()

    # ===== Dimension 3: nearest distance from previous regular numbers =====
    print("=== 维度3: 上期正码与下期特码的最短距离分布 ===")
    min_distances = []
    prediction_hit_dist = 0
    for current, next_record in pairs:
        nums = _main_numbers(current)
        min_dist = min(abs(next_record['num7'] - num) for num in nums)
        min_distances.append(min_dist)
        # Predicted set: every regular number +/- 3.
        if next_record['num7'] in _window(nums, 3):
            prediction_hit_dist += 1
    dist_stats = defaultdict(int)
    for d in min_distances:
        dist_stats[d] += 1
    print("最短距离分布:")
    for dist in sorted(dist_stats.keys()):
        count = dist_stats[dist]
        percent = _percent(count, total_predictions)
        print(f" 距离 {dist}: {count}次 ({percent}%)")
    close_hit = sum(1 for d in min_distances if d <= 5)
    very_close_hit = sum(1 for d in min_distances if d <= 3)
    print(f"\n最短距离≤5的比例: {_percent(close_hit, total_predictions)}% ({close_hit}/{total_predictions})")
    print(f"最短距离≤3的比例: {_percent(very_close_hit, total_predictions)}% ({very_close_hit}/{total_predictions})")
    print(f"基于正码±3范围预测命中率: {_percent(prediction_hit_dist, total_predictions)}% ({prediction_hit_dist}/{total_predictions})")
    # The reported set size is a sample taken from the first draw only.
    predicted_sample = _window(_main_numbers(history[0]), 3)
    print(f"预测范围大小: 约{len(predicted_sample)}个数字")
    print()

    # ===== Dimension 4: tail digit of the sum vs tail digit of next special =====
    print("=== 维度4: 上期正码和值尾数与下期特码尾数的关系 ===")
    sum_tail_relations = defaultdict(lambda: defaultdict(int))
    tail_same_count = 0
    tail_diff1_count = 0
    tail_diff2_count = 0
    tail_prediction_hit = 0
    for current, next_record in pairs:
        sum_tail = get_tail(sum(_main_numbers(current)))
        next_tail = get_tail(next_record['num7'])
        sum_tail_relations[sum_tail][next_tail] += 1
        # Circular distance between the two digits (0 and 9 are 1 apart).
        tail_diff = abs(sum_tail - next_tail)
        if tail_diff > 5:
            tail_diff = 10 - tail_diff
        if tail_diff == 0:
            tail_same_count += 1
        elif tail_diff == 1:
            tail_diff1_count += 1
        elif tail_diff == 2:
            tail_diff2_count += 1
        # Predicted tails: the sum's tail digit +/- 2, wrapping modulo 10.
        predict_tails = {(sum_tail + offset) % 10 for offset in range(-2, 3)}
        if next_tail in predict_tails:
            tail_prediction_hit += 1
    print("尾数关系分布:")
    print(f" 尾数相同: {tail_same_count}次 ({_percent(tail_same_count, total_predictions)}%)")
    print(f" 尾数相差1: {tail_diff1_count}次 ({_percent(tail_diff1_count, total_predictions)}%)")
    print(f" 尾数相差2: {tail_diff2_count}次 ({_percent(tail_diff2_count, total_predictions)}%)")
    print("\n上期和值尾数→下期特码尾数转移矩阵:")
    for sum_tail in range(10):
        if sum_tail in sum_tail_relations:
            stats = sum_tail_relations[sum_tail]
            max_tail = max(stats, key=stats.get)
            max_count = stats[max_tail]
            total = sum(stats.values())
            percent = _percent(max_count, total)
            others = [f"{t}({c})" for t, c in stats.items() if t != max_tail]
            print(f" 和值尾数{sum_tail} → 最可能尾数{max_tail} ({max_count}次, {percent}%), 其他: {', '.join(others)}")
    print(f"\n基于和值尾数±2范围的尾数预测命中率: {_percent(tail_prediction_hit, total_predictions)}% ({tail_prediction_hit}/{total_predictions})")
    print("预测范围: 5个尾数,每个尾数对应约5个数字,共约25个数字")
    print()

    # ===== Dimension 5: size buckets covered vs bucket of next special =====
    print("=== 维度5: 上期正码覆盖区间与下期特码所在区间的关系 ===")
    range_cover_stats = defaultdict(lambda: defaultdict(int))
    range_transfer_stats = defaultdict(lambda: defaultdict(int))
    hit_in_covered_range = 0
    for current, next_record in pairs:
        covers = {get_range(num) for num in _main_numbers(current)}
        cover_str = '+'.join(sorted(covers))
        next_range = get_range(next_record['num7'])
        range_cover_stats[cover_str][next_range] += 1
        range_transfer_stats[len(covers)][next_range] += 1
        if next_range in covers:
            hit_in_covered_range += 1
    print("上期正码覆盖区间→下期特码区间转移:")
    # Most frequent coverage combinations first.
    for cover, next_stats in sorted(range_cover_stats.items(), key=lambda x: -sum(x[1].values())):
        total_cover = sum(next_stats.values())
        print(f" {cover} (共{total_cover}期):")
        for next_range, count in next_stats.items():
            percent = _percent(count, total_cover)
            print(f"{next_range}: {count}次 ({percent}%)")
    print("\n上期正码覆盖区间数量与下期特码分布:")
    for cover_count, stats in sorted(range_transfer_stats.items()):
        total_cover = sum(stats.values())
        print(f" 覆盖{cover_count}个区间 (共{total_cover}期):")
        for range_name, count in stats.items():
            percent = _percent(count, total_cover)
            print(f" {range_name}: {count}次 ({percent}%)")
    print(f"\n下期特码在上期正码覆盖区间内的比例: {_percent(hit_in_covered_range, total_predictions)}% ({hit_in_covered_range}/{total_predictions})")
    print()

    # ===== Dimension 6: wave-color mix vs color of next special =====
    print("=== 维度6: 上期正码波色分布与下期特码波色的关系 ===")
    color_distribution_stats = defaultdict(lambda: defaultdict(int))
    dominant_color_stats = defaultdict(lambda: defaultdict(int))
    dominant_prediction_hit = 0
    expanded_color_prediction_hit = 0
    # Distinct color labels in first-appearance order of COLOR_MAP, so the
    # distribution label names each color exactly once.
    color_names = list(dict.fromkeys(COLOR_MAP.values()))
    for current, next_record in pairs:
        colors = defaultdict(int)
        for num in _main_numbers(current):
            colors[COLOR_MAP[num]] += 1
        # Indexing the defaultdict registers absent colors with a zero
        # count, so such a color can still be picked by the top-two
        # selection below when fewer than two colors occurred.
        color_str = ''.join(f"{name}{colors[name]}" for name in color_names)
        next_color = COLOR_MAP[next_record['num7']]
        color_distribution_stats[color_str][next_color] += 1
        # Dominant color: the most frequent one (first seen wins ties).
        dominant_color = max(colors, key=colors.get)
        dominant_color_stats[dominant_color][next_color] += 1
        if dominant_color == next_color:
            dominant_prediction_hit += 1
        # Widen the prediction to the two most frequent colors.
        top2_colors = sorted(colors, key=colors.get, reverse=True)[:2]
        if next_color in top2_colors:
            expanded_color_prediction_hit += 1
    print("上期正码波色分布→下期特码波色转移 (出现5次以上的):")
    sorted_color_dist = sorted(color_distribution_stats.items(),
                               key=lambda x: -sum(x[1].values()))
    for dist, next_stats in sorted_color_dist:
        total_dist = sum(next_stats.values())
        if total_dist >= 5:
            print(f" {dist} (共{total_dist}期):")
            for next_color, count in next_stats.items():
                percent = _percent(count, total_dist)
                print(f"{next_color}: {count}次 ({percent}%)")
    print("\n上期主导波色→下期特码波色转移:")
    for dominant, next_stats in dominant_color_stats.items():
        total_dom = sum(next_stats.values())
        print(f" 主导{dominant} (共{total_dom}期):")
        for next_color, count in next_stats.items():
            percent = _percent(count, total_dom)
            print(f"{next_color}: {count}次 ({percent}%)")
    print(f"\n主导波色预测命中率: {_percent(dominant_prediction_hit, total_predictions)}% ({dominant_prediction_hit}/{total_predictions})")
    print(f"扩展到两种主导波色预测命中率: {_percent(expanded_color_prediction_hit, total_predictions)}% ({expanded_color_prediction_hit}/{total_predictions})")
    print()

    # ===== Dimension 7: previous special -> next special transitions =====
    print("=== 维度7: 上期特码与下期特码的转移关系(马尔可夫分析) ===")
    special_transfer = defaultdict(lambda: defaultdict(int))
    special_range_transfer = defaultdict(lambda: defaultdict(int))
    special_range_prediction_hit = 0
    for current, next_record in pairs:
        current_special = current['num7']
        next_special = next_record['num7']
        special_transfer[current_special][next_special] += 1
        current_range = get_range(current_special)
        next_range = get_range(next_special)
        special_range_transfer[current_range][next_range] += 1
    print("特码区间转移矩阵:")
    for from_range, to_stats in special_range_transfer.items():
        total_from = sum(to_stats.values())
        print(f" {from_range} → :")
        for to_range, count in to_stats.items():
            percent = _percent(count, total_from)
            print(f" {to_range}: {count}次 ({percent}%)")
    print("\n高频特码转移(出现2次以上):")
    for from_num, to_stats in special_transfer.items():
        for to_num, count in to_stats.items():
            if count >= 2:
                print(f" 特码{from_num} → 特码{to_num}: {count}")
    # Predict the two most frequent target buckets for each source bucket.
    # The transition counts were accumulated over the full history above,
    # so this hit rate is measured on the same data the counts came from.
    for current, next_record in pairs:
        current_range = get_range(current['num7'])
        transfer_stats = special_range_transfer[current_range]
        top_ranges = sorted(transfer_stats, key=transfer_stats.get, reverse=True)[:2]
        next_range = get_range(next_record['num7'])
        if next_range in top_ranges:
            special_range_prediction_hit += 1
    print(f"\n基于特码区间转移预测(前2区间)命中率: {_percent(special_range_prediction_hit, total_predictions)}% ({special_range_prediction_hit}/{total_predictions})")
    print()

    # ===== Combined view of the single-dimension hit rates =====
    print("=== 综合分析:寻找40%以上命中率的规律 ===")
    print()
    combined_hits = {
        '平均值±10范围': hit_count_avg,
        '正码±3范围': prediction_hit_dist,
        '和值尾数±2尾数范围': tail_prediction_hit,
        '覆盖区间预测': hit_in_covered_range,
        '主导波色预测': dominant_prediction_hit,
        '双波色预测': expanded_color_prediction_hit,
        '特码区间转移': special_range_prediction_hit,
    }
    print("各维度预测命中率汇总:")
    for name, hit in combined_hits.items():
        percent = _percent(hit, total_predictions)
        status = '【达标】' if percent >= 40 else ''
        print(f" {name}: {percent}% ({hit}/{total_predictions}) {status}")

    # Union-of-rules predictions.
    print("\n组合预测测试:")
    combo_hits = 0
    combo_plus_hits = 0
    for current, next_record in pairs:
        nums = _main_numbers(current)
        target = next_record['num7']
        # Rule 1: truncated mean +/- 15.
        avg = sum(nums) / 6
        avg_range = set(range(max(1, int(avg) - 15), min(50, int(avg) + 16)))
        # Rule 2: every regular number +/- 5.
        num_range = _window(nums, 5)
        # Rule 3: numbers whose tail digit is within +/- 3 of the sum's tail.
        sum_tail = get_tail(sum(nums))
        wanted_tails = {(sum_tail + offset) % 10 for offset in range(-3, 4)}
        tail_range = {n for n in range(1, 50) if get_tail(n) in wanted_tails}
        # Combination 1: rule 1 OR rule 2.
        if target in avg_range or target in num_range:
            combo_hits += 1
        # Combination 2: rule 1 OR rule 2 OR rule 3.
        if target in avg_range or target in num_range or target in tail_range:
            combo_plus_hits += 1
    print(f"组合1(平均值±15 OR 正码±5)命中率: {_percent(combo_hits, total_predictions)}% ({combo_hits}/{total_predictions})")
    print(f"组合2(平均值±15 OR 正码±5 OR 尾数±3)命中率: {_percent(combo_plus_hits, total_predictions)}% ({combo_plus_hits}/{total_predictions})")

    # Intersection of color and size-bucket predictions.
    print("\n波色+区间组合预测:")
    color_range_combo_hit = 0
    color_range_combo2_hit = 0
    # Sizes of the predicted sets for the last pair processed only.
    last_predict_count = 0
    last_predict_count2 = 0
    for current, next_record in pairs:
        colors = defaultdict(int)
        bucket_counts = defaultdict(int)
        for num in _main_numbers(current):
            colors[COLOR_MAP[num]] += 1
            bucket_counts[get_range(num)] += 1
        # Top color AND top bucket.
        top_color = max(colors, key=colors.get)
        top_range = max(bucket_counts, key=bucket_counts.get)
        predict_nums = {n for n in range(1, 50)
                        if COLOR_MAP[n] == top_color and get_range(n) == top_range}
        if next_record['num7'] in predict_nums:
            color_range_combo_hit += 1
        last_predict_count = len(predict_nums)
        # Top two colors AND top two buckets.
        top2_colors = sorted(colors, key=colors.get, reverse=True)[:2]
        top2_ranges = sorted(bucket_counts, key=bucket_counts.get, reverse=True)[:2]
        predict_nums2 = {n for n in range(1, 50)
                         if COLOR_MAP[n] in top2_colors and get_range(n) in top2_ranges}
        if next_record['num7'] in predict_nums2:
            color_range_combo2_hit += 1
        last_predict_count2 = len(predict_nums2)
    print(f"波色+区间交集预测命中率: {_percent(color_range_combo_hit, total_predictions)}% ({color_range_combo_hit}/{total_predictions})")
    print(f"预测范围大小: {last_predict_count}个数字")
    print(f"前2波色+前2区间交集预测命中率: {_percent(color_range_combo2_hit, total_predictions)}% ({color_range_combo2_hit}/{total_predictions})")
    print(f"预测范围大小: {last_predict_count2}个数字")
    print()

    # ===== Summary: rules reaching a 40% hit rate =====
    print("=== 总结:达到40%以上命中率的规律 ===")
    print()
    high_hit_rules = []
    for name, hit in combined_hits.items():
        percent = _percent(hit, total_predictions)
        if percent >= 40:
            high_hit_rules.append((name, percent, hit, total_predictions))
    if high_hit_rules:
        for name, percent, hit, total in high_hit_rules:
            print(f"【{name}】命中率: {percent}% ({hit}/{total})")
    else:
        print("单维度分析中没有达到40%以上命中率的规律")
    print("\n组合规律命中率:")
    print(f"组合1(平均值±15 OR 正码±5): {_percent(combo_hits, total_predictions)}%")
    print(f"组合2(平均值±15 OR 正码±5 OR 尾数±3): {_percent(combo_plus_hits, total_predictions)}%")
    print(f"前2波色+前2区间交集: {_percent(color_range_combo2_hit, total_predictions)}%")
    print("\n分析完成!")
# Run the full analysis only when executed as a script, not on import.
if __name__ == '__main__':
    analyze()