在 github 上获取免费节点,订阅链接列表可以自己添加上去,理论上每天有无数的节点,不崇洋,也不要被忽悠,打破信息蚕房,独立思考。
import requests
import yaml
import base64
# 定义订阅链接列表
SUBSCRIPTION_URLS = ["https://raw.githubusercontent.com/Surfboardv2ray/TGParse/main/splitted/mixed"
    
"https://raw.githubusercontent.com/ripaojiedian/freenode/main/sub"
    "https://raw.githubusercontent.com/roosterkid/openproxylist/main/V2RAY_RAW.txt"
    "https://github.com/andrewji8/V2ray-Configs/blob/main/All_Configs_base64_Sub.txt"
]
def fetch_subscription(url):
    """获取订阅内容"""
    try:
        response = requests.get(url, timeout=10, verify=False)  # 禁用 SSL 验证
        if response.status_code == 200:
            return response.text.strip()  # 去除多余的空白字符
        else:
            print(f"无法获取 {url} 的内容,状态码: {response.status_code}")
            return None
    except Exception as e:
        print(f"请求 {url} 失败: {e}")
        return None
def decode_base64(content):
    """解码 Base64 编码的内容"""
    try:
        decoded_content = base64.b64decode(content).decode("utf-8")
        return decoded_content
    except Exception as e:
        print(f"Base64 解码失败: {e}")
        return None
def parse_yaml(content):
    """解析 YAML 内容"""
    try:
        data = yaml.safe_load(content)
        if isinstance(data, dict):  # 如果是字典类型,直接返回
            return data
        elif isinstance(data, str):  # 如果是字符串类型,包装为字典
            return {"proxies": [data]}  # 假设内容是代理节点列表
        else:
            print(f"解析结果不是字典或字符串类型: {type(data)}")
            return None
    except Exception as e:
        print(f"解析 YAML 失败: {e}")
        return None
def merge_subscriptions(subscriptions):
    """合并多个订阅内容,并去除重复项"""
    merged_data = {}
    for sub in subscriptions:
        if not sub or not isinstance(sub, dict):  # 确保订阅内容是字典类型
            print("跳过无效的订阅内容")
            continue
        for key, value in sub.items():
            if key not in merged_data:
                merged_data[key] = value
            elif isinstance(value, list) and isinstance(merged_data[key], list):
                # 合并列表并去重
                merged_data[key] = list(set(merged_data[key] + value))
            elif isinstance(value, dict) and isinstance(merged_data[key], dict):
                # 如果是字典类型,递归合并
                merged_data[key].update(value)
    return merged_data
def save_to_files(data, group_size=12000, base_filename="subscription_group"):
    """将数据分组保存到多个文件"""
    if "proxies" not in data:
        print("没有代理节点可以分组")
        return
    
    proxies = data["proxies"]
    total_groups = (len(proxies) + group_size - 1) // group_size  # 计算分组数量
    for i in range(total_groups):
        group_proxies = proxies[i * group_size:(i + 1) * group_size]
        group_data = {**data, "proxies": group_proxies}  # 创建分组数据
        filename = f"{base_filename}_{i + 1}.yaml"
        with open(filename, "w", encoding="utf-8") as f:
            yaml.dump(group_data, f, allow_unicode=True)
        print(f"分组 {i + 1} 已保存到 {filename}")
def split_yaml_file(input_filename, output_base_filename="split_subscription", num_splits=3):
    """
    将一个 YAML 文件分割成多个文件。
    
    :param input_filename: 输入的 YAML 文件名
    :param output_base_filename: 输出文件的基础名称
    :param num_splits: 分割的文件数量
    """
    try:
        # 读取输入的 YAML 文件
        with open(input_filename, "r", encoding="utf-8") as f:
            data = yaml.safe_load(f)
        
        if "proxies" not in data:
            print("没有代理节点可以分割")
            return
        
        proxies = data["proxies"]
        total_proxies = len(proxies)
        print(f"总节点数: {total_proxies}")
        
        # 计算每组的大小
        group_size = (total_proxies + num_splits - 1) // num_splits  # 向上取整
        
        # 分割并保存到多个文件
        for i in range(num_splits):
            start_index = i * group_size
            end_index = min((i + 1) * group_size, total_proxies)
            group_proxies = proxies[start_index:end_index]
            
            if not group_proxies:
                print(f"分组 {i + 1} 没有节点,跳过保存")
                continue
            
            # 创建分组数据
            group_data = {**data, "proxies": group_proxies}
            output_filename = f"{output_base_filename}_{i + 1}.yaml"
            
            # 保存到文件
            with open(output_filename, "w", encoding="utf-8") as f:
                yaml.dump(group_data, f, allow_unicode=True)
            print(f"分组 {i + 1} 已保存到 {output_filename}")
    
    except Exception as e:
        print(f"分割文件失败: {e}")
def main():
    # 获取所有订阅内容
    subscription_contents = [fetch_subscription(url) for url in SUBSCRIPTION_URLS]
    # 解码 Base64 并解析为 YAML 格式
    parsed_subscriptions = []
    for content in subscription_contents:
        if content:
            print(f"原始内容: {content[:100]}...")  # 打印前 100 个字符
            decoded_content = decode_base64(content)
            if decoded_content:
                print(f"解码后内容: {decoded_content[:100]}...")  # 打印前 100 个字符
                data = parse_yaml(decoded_content)
                print(f"解析后数据类型: {type(data)}")
                parsed_subscriptions.append(data)
    # 合并订阅内容
    merged_data = merge_subscriptions(parsed_subscriptions)
    # 分组保存到多个文件,每组 12000 个节点
    if merged_data:
        save_to_files(merged_data, group_size=12000)  # 每组 12000 个节点
    else:
        print("没有有效的订阅内容可以合并")
    # 将生成的第一个分组文件分割成 3 个子文件
    input_filename = "subscription_group_1.yaml"
    split_yaml_file(input_filename, output_base_filename="split_subscription", num_splits=3)
if __name__ == "__main__":
    main()