# Data loading


# mega_diag_cosyvoice3.py

from pathlib import Path
import os
import sys
import json
import traceback
import inspect
import importlib
import types
import hashlib
import re
import numpy as np


# =============================================================================
# CONFIG
# =============================================================================

# Root of the CosyVoice checkout; most paths below are derived from it.
COSYVOICE_ROOT = Path(r"C:\Users\Geodezik\image_generation\CosyVoice_official").resolve()
# Matcha-TTS sources inside the checkout (inserted into sys.path further down).
MATCHA_ROOT = COSYVOICE_ROOT / "third_party" / "Matcha-TTS"
# Another checkout whose entries are filtered out of sys.path further down.
OLD_BAD_ROOT = Path(r"C:\Users\Geodezik\image_generation\cosyvoice").resolve()

# Model directory and the tokenizer/LLM sub-directory inside it.
MODEL_DIR = COSYVOICE_ROOT / "pretrained_models" / "Fun-CosyVoice3-0.5B"
QWEN_DIR = MODEL_DIR / "CosyVoice-BlankEN"
# Prompt recording used by the inference tests in this script.
PROMPT_WAV = COSYVOICE_ROOT / "asset" / "zero_shot_prompt.wav"

# Output directory for generated wav files and the report; created at import time.
OUT_DIR = COSYVOICE_ROOT / "_mega_diag_outputs"
OUT_DIR.mkdir(parents=True, exist_ok=True)

# Text report written by save_report().
REPORT_PATH = OUT_DIR / "mega_diag_report.txt"


# =============================================================================
# LOGGING
# =============================================================================

# Every logged line, in order; written to REPORT_PATH by save_report().
_REPORT_LINES = []


def log(*args):
    """Print the space-joined arguments and record the same line for the report.

    The line is recorded *before* printing so that it still reaches the report
    when stdout cannot encode it (e.g. output redirected to a file that uses a
    legacy code page while the script logs Chinese or Russian text). In that
    case the console copy is printed with unencodable characters escaped
    instead of raising.
    """
    s = " ".join(str(a) for a in args)
    _REPORT_LINES.append(s)
    try:
        print(s)
    except UnicodeEncodeError:
        # Fall back to an escaped rendering that the stream can represent.
        enc = getattr(sys.stdout, "encoding", None) or "ascii"
        print(s.encode(enc, errors="backslashreplace").decode(enc))


def header(title):
    """Log a blank line, then *title* framed by two 110-character '=' rules."""
    rule = "=" * 110
    log()
    for line in (rule, title, rule):
        log(line)


def section(title):
    """Log a blank line, then *title* framed by two 110-character '-' rules."""
    rule = "-" * 110
    log()
    for line in (rule, title, rule):
        log(line)


def save_report():
    """Write all recorded log lines to REPORT_PATH (UTF-8) and announce the path."""
    report_text = "\n".join(_REPORT_LINES)
    REPORT_PATH.write_text(report_text, encoding="utf-8")
    print()
    print("REPORT SAVED:", REPORT_PATH)


def try_block(title):
    """Return a decorator that runs a check under a titled section.

    The decorated function is called after ``section(title)`` is logged. Any
    ``Exception`` it raises is logged (title, type, message, traceback) and
    swallowed; the wrapper then returns ``None``. Otherwise the function's
    own return value is passed through.

    The wrapper keeps the wrapped function's ``__name__``/``__doc__`` via
    ``functools.wraps`` so introspection and tracebacks stay meaningful.
    """
    # Local import: functools is not among this file's top-level imports.
    import functools

    def deco(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            section(title)
            try:
                return fn(*args, **kwargs)
            except Exception as e:
                log("[ERROR]", title)
                log(type(e).__name__ + ":", str(e))
                log(traceback.format_exc())
                return None
        return wrapper
    return deco


# =============================================================================
# PATH CLEANUP BEFORE IMPORTS
# =============================================================================

header("PATH SETUP")

log("cwd:", os.getcwd())
log("python:", sys.executable)
log("COSYVOICE_ROOT:", COSYVOICE_ROOT)
log("MATCHA_ROOT:", MATCHA_ROOT)
log("MODEL_DIR:", MODEL_DIR)
log("QWEN_DIR:", QWEN_DIR)
log("PROMPT_WAV:", PROMPT_WAV)
log("OUT_DIR:", OUT_DIR)

# Remove sys.path entries that are OLD_BAD_ROOT itself or live underneath it.
# A plain substring test is not enough here: the old root's path is a textual
# prefix of COSYVOICE_ROOT ("...\cosyvoice" vs "...\CosyVoice_official"), so a
# substring match would also discard entries inside the official checkout.
# Match on path-component boundaries instead.
old_bad = str(OLD_BAD_ROOT).lower()
old_bad_prefixes = (old_bad + os.sep, old_bad + "/")
cleaned = []
for p in sys.path:
    try:
        rp = str(Path(p).resolve()).lower()
    except Exception:
        # Entries that cannot be resolved are compared as plain strings.
        rp = str(p).lower()

    if rp == old_bad or rp.startswith(old_bad_prefixes):
        log("Removing old bad sys.path:", p)
        continue

    cleaned.append(p)

sys.path = cleaned

# Forget already-imported cosyvoice / wetext modules so that the imports
# below are resolved again against the cleaned sys.path.
for name in list(sys.modules.keys()):
    if (
        name == "cosyvoice"
        or name.startswith("cosyvoice.")
        or name == "wetext"
        or name.startswith("wetext.")
    ):
        del sys.modules[name]

# COSYVOICE_ROOT ends up first, MATCHA_ROOT second.
sys.path.insert(0, str(MATCHA_ROOT))
sys.path.insert(0, str(COSYVOICE_ROOT))

log("sys.path first 10:")
for p in sys.path[:10]:
    log(" ", p)


# =============================================================================
# BASIC IMPORTS
# =============================================================================

header("BASIC ENV IMPORTS")

# torch is mandatory: log its version/CUDA details, and abort the script
# (after logging the traceback) if it cannot be imported.
try:
    import torch
    log("torch:", torch.__version__)
    log("torch file:", torch.__file__)
    log("cuda available:", torch.cuda.is_available())
    if torch.cuda.is_available():
        log("cuda device:", torch.cuda.get_device_name(0))
        log("cuda capability:", torch.cuda.get_device_capability(0))
        log("torch.version.cuda:", torch.version.cuda)
except Exception:
    log("torch import failed")
    log(traceback.format_exc())
    raise

# soundfile is mandatory as well; same log-and-re-raise handling.
try:
    import soundfile as sf
    # Falls back to logging the module object when no __version__ is exposed.
    log("soundfile:", sf.__version__ if hasattr(sf, "__version__") else sf)
except Exception:
    log("soundfile failed")
    log(traceback.format_exc())
    raise

# Remaining packages are only reported: a failed import is logged and the
# loop continues with the next module.
for mod in ["numpy", "scipy", "soxr", "onnxruntime", "transformers", "huggingface_hub", "hyperpyyaml", "yaml"]:
    try:
        m = importlib.import_module(mod)
        log(mod + ":", getattr(m, "__version__", "<no __version__>"), getattr(m, "__file__", ""))
        if mod == "onnxruntime":
            log("onnxruntime providers:", m.get_available_providers())
    except Exception as e:
        log("FAIL import", mod, "->", repr(e))

# =============================================================================
# COSYVOICE IMPORT CHECK
# =============================================================================

@try_block("COSYVOICE IMPORT CHECK")
def check_cosyvoice_import():
    """Import cosyvoice and verify that it was loaded from COSYVOICE_ROOT.

    Logs the file of the package and of three of its submodules, then
    compares the lower-cased package directory with the expected one.

    Returns:
        Tuple ``(cosyvoice, cli module, frontend module, file_utils module)``.

    Raises:
        RuntimeError: if the package directory does not start with the
            expected directory (caught and logged by ``try_block``).
    """
    import cosyvoice as pkg
    import cosyvoice.cli.cosyvoice as cli_mod
    import cosyvoice.cli.frontend as frontend_mod
    import cosyvoice.utils.file_utils as file_utils_mod

    for label, module in (
        ("cosyvoice:", pkg),
        ("cosyvoice cli:", cli_mod),
        ("frontend:", frontend_mod),
        ("file_utils:", file_utils_mod),
    ):
        log(label, module.__file__)

    wanted_dir = str(COSYVOICE_ROOT / "cosyvoice").lower()
    loaded_dir = str(Path(pkg.__file__).parent).lower()

    log("expected package dir:", wanted_dir)
    log("actual package dir:", loaded_dir)

    if loaded_dir.startswith(wanted_dir):
        return pkg, cli_mod, frontend_mod, file_utils_mod

    raise RuntimeError(f"WRONG COSYVOICE IMPORT: {loaded_dir}")


cosyvoice_imports = check_cosyvoice_import()
if cosyvoice_imports is None:
    # The import check failed and was logged; nothing else can run without it.
    save_report()
    raise SystemExit(1)

cosyvoice, cosyvoice_cli, frontend, file_utils = cosyvoice_imports


# =============================================================================
# FRONTEND / WETEXT CHECK
# =============================================================================

@try_block("FRONTEND / WETEXT CHECK")
def check_frontend():
    """Probe the text-normalisation dependencies and exercise wetext.

    Each import statement in the probe list is executed in a fresh namespace
    and reported as OK or FAIL. Afterwards a ``wetext.Normalizer`` is built
    and run on one English and one Chinese sentence; failures are logged
    rather than raised.
    """
    import_probes = (
        "import pynini",
        "import tn",
        "import itn",
        "from tn.chinese.normalizer import Normalizer as ZhNormalizer",
        "from tn.english.normalizer import Normalizer as EnNormalizer",
        "import wetext",
        "from wetext import Normalizer as WetextNormalizer",
    )

    for statement in import_probes:
        scope = {}
        try:
            # The statements are the fixed literals above, not external input.
            exec(statement, scope, scope)
            log("OK:", statement)
        except Exception as err:
            log("FAIL:", statement, type(err).__name__, str(err))

    samples = (
        "Hello, I have 123 apples.",
        "今天是2026年6月28日。",
    )

    try:
        import wetext
        log("wetext file:", wetext.__file__)
        from wetext import Normalizer
        log("wetext.Normalizer:", Normalizer)
        normalizer = Normalizer()
        for sample in samples:
            try:
                log("normalize", repr(sample), "->", repr(normalizer.normalize(sample)))
            except Exception as err:
                log("normalize failed", repr(sample), repr(err))
    except Exception:
        log("wetext detailed failed:")
        log(traceback.format_exc())


check_frontend()


# =============================================================================
# FILE STRUCTURE / MODEL FILES
# =============================================================================

@try_block("MODEL FILE STRUCTURE")
def check_model_files():
    """Log what is on disk under MODEL_DIR and QWEN_DIR.

    Reports existence of the main paths, lists MODEL_DIR's top level and
    QWEN_DIR recursively with sizes in MB, checks a fixed list of required
    files, and scans files of at most 1 MiB for the git-lfs pointer marker.

    Raises:
        FileNotFoundError: if MODEL_DIR does not exist (caught and logged by
            ``try_block``).
    """
    def size_mb(path):
        # File size in MB (bytes / 1024 / 1024).
        return path.stat().st_size / 1024 / 1024

    for label, location in (
        ("MODEL_DIR exists:", MODEL_DIR),
        ("QWEN_DIR exists:", QWEN_DIR),
        ("PROMPT_WAV exists:", PROMPT_WAV),
    ):
        log(label, location.exists())

    if not MODEL_DIR.exists():
        raise FileNotFoundError(MODEL_DIR)

    log()
    log("Top-level MODEL_DIR:")
    for entry in sorted(MODEL_DIR.iterdir()):
        if entry.is_file():
            log(f"  FILE {entry.name:45s} {size_mb(entry):10.2f} MB")
        elif entry.is_dir():
            log(f"  DIR  {entry.name}")

    log()
    log("QWEN_DIR recursive:")
    if QWEN_DIR.exists():
        for entry in sorted(QWEN_DIR.rglob("*")):
            if not entry.is_file():
                continue
            relative = str(entry.relative_to(QWEN_DIR))
            log(f"  {relative:60s} {size_mb(entry):10.2f} MB")

    model_names = (
        "cosyvoice3.yaml",
        "llm.pt",
        "flow.pt",
        "hift.pt",
        "campplus.onnx",
        "speech_tokenizer_v3.onnx",
        "flow.decoder.estimator.fp32.onnx",
    )
    qwen_names = (
        "config.json",
        "model.safetensors",
        "tokenizer_config.json",
        "vocab.json",
        "merges.txt",
    )
    required = [MODEL_DIR / n for n in model_names] + [QWEN_DIR / n for n in qwen_names]

    log()
    log("Required files:")
    for needed in required:
        log(" ", needed, "exists=", needed.exists(), "size_mb=", (size_mb(needed) if needed.exists() else None))

    # LFS pointer check: only files up to 1 MiB are read and searched.
    log()
    log("LFS pointer check:")
    pointer_seen = False
    for candidate in MODEL_DIR.rglob("*"):
        if not candidate.is_file() or candidate.stat().st_size > 1024 * 1024:
            continue
        try:
            content = candidate.read_text(encoding="utf-8", errors="ignore")
        except Exception:
            continue
        if "version https://git-lfs.github.com/spec" in content:
            pointer_seen = True
            log("  LFS POINTER:", candidate)
    if not pointer_seen:
        log("  no LFS pointers found in small files")


check_model_files()


# =============================================================================
# YAML CHECK
# =============================================================================

@try_block("YAML CONTENT CHECK")
def check_yaml():
    """Dump the start of cosyvoice3.yaml and list its keyword-bearing lines.

    Logs the first 12000 characters of the file, then every line (with its
    1-based number) whose lower-cased text contains one of a fixed set of
    keywords.

    Raises:
        FileNotFoundError: if the yaml file is missing (caught and logged by
            ``try_block``).
    """
    config_path = MODEL_DIR / "cosyvoice3.yaml"
    if not config_path.exists():
        raise FileNotFoundError(config_path)

    content = config_path.read_text(encoding="utf-8", errors="ignore")
    log("cosyvoice3.yaml path:", config_path)
    log("cosyvoice3.yaml first 12000 chars:")
    log(content[:12000])

    keywords = ("qwen", "llm", "flow", "hift", "tokenizer", "pretrain", "checkpoint", "speech_tokenizer")
    matches = [
        (number, text)
        for number, text in enumerate(content.splitlines(), start=1)
        if any(word in text.lower() for word in keywords)
    ]

    log()
    log("Interesting yaml lines:")
    for number, text in matches:
        log(f"{number:04d}: {text}")


check_yaml()


# =============================================================================
# HASHES / CKPT STRUCTURE
# =============================================================================

def sha256_head(path: Path, max_mb=64):
    """Hash at most the first ``max_mb`` MiB of a file with SHA-256.

    The file is read in pieces of up to 1 MiB.

    Args:
        path: File to read.
        max_mb: Upper bound, in MiB, on how much of the file is hashed.

    Returns:
        Tuple ``(hex digest, number of bytes hashed)``.
    """
    budget = max_mb * 1024 * 1024
    hasher = hashlib.sha256()
    consumed = 0
    with path.open("rb") as stream:
        while True:
            remaining = budget - consumed
            if remaining <= 0:
                break
            piece = stream.read(min(1024 * 1024, remaining))
            if not piece:
                # End of file reached before the budget was used up.
                break
            hasher.update(piece)
            consumed += len(piece)
    return hasher.hexdigest(), consumed


@try_block("CHECKPOINT STRUCTURE")
def check_checkpoints():
    """Log size, partial hash and top-level structure of the checkpoint files.

    For every file that exists: its size in MB and the SHA-256 of its first
    64 MiB. ``.pt`` files are additionally loaded onto the CPU with
    ``torch.load``; for dict payloads the first keys are listed along with
    tensor shapes or nested-dict keys, otherwise a truncated repr is logged.
    Hashing and loading failures are logged and do not stop the loop.
    """
    import torch

    targets = [MODEL_DIR / n for n in ("llm.pt", "llm.rl.pt", "flow.pt", "hift.pt")]
    targets.append(QWEN_DIR / "model.safetensors")

    for ckpt in targets:
        present = ckpt.exists()
        log()
        log("FILE:", ckpt)
        log("exists:", present)
        if not present:
            continue

        log("size MB:", ckpt.stat().st_size / 1024 / 1024)
        try:
            digest, hashed_bytes = sha256_head(ckpt, max_mb=64)
            log("sha256 first", hashed_bytes, "bytes:", digest)
        except Exception as err:
            log("hash failed:", repr(err))

        if ckpt.suffix != ".pt":
            continue

        try:
            # NOTE(review): torch.load unpickles the file; only point this at
            # checkpoints from a trusted source.
            payload = torch.load(str(ckpt), map_location="cpu")
            log("torch.load type:", type(payload))
            if not isinstance(payload, dict):
                log("repr:", repr(payload)[:1000])
                continue

            top_keys = list(payload.keys())
            log("num top keys:", len(top_keys))
            log("first 40 keys:", top_keys[:40])
            # Look one level down for the first 20 keys only.
            for key in top_keys[:20]:
                value = payload[key]
                if isinstance(value, torch.Tensor):
                    log(f"  tensor key {key}: shape={tuple(value.shape)} dtype={value.dtype}")
                elif isinstance(value, dict):
                    log(f"  dict key {key}: len={len(value)} first={list(value.keys())[:10]}")
        except Exception as err:
            log("torch.load failed:", type(err).__name__, str(err))


check_checkpoints()


# =============================================================================
# TOKENIZER CHECK
# =============================================================================

@try_block("QWEN TOKENIZER CHECK")
def check_tokenizer():
    """Load the tokenizer from QWEN_DIR and log how it handles sample input.

    Logs general tokenizer properties, the ids that a fixed list of special
    tokens map to, and the ids/tokens/decoded text for several sample
    sentences (encoded without adding special tokens).

    Returns:
        The loaded tokenizer object.
    """
    from transformers import AutoTokenizer

    log("Loading tokenizer from:", QWEN_DIR)
    tokenizer = AutoTokenizer.from_pretrained(
        str(QWEN_DIR),
        trust_remote_code=True,
        local_files_only=True,
    )

    log("tokenizer class:", tokenizer.__class__)
    log("vocab size:", getattr(tokenizer, "vocab_size", None))
    log("len(tokenizer):", len(tokenizer))
    log("special tokens map:", tokenizer.special_tokens_map)
    log("all special tokens:", tokenizer.all_special_tokens)
    log("all special ids:", tokenizer.all_special_ids)

    special_tokens = (
        "<|endofprompt|>",
        "<|im_start|>",
        "<|im_end|>",
        "[breath]",
        "<|zh|>",
        "<|en|>",
        "<|ja|>",
        "<|ko|>",
        "<|yue|>",
    )
    for token in special_tokens:
        try:
            token_id = tokenizer.convert_tokens_to_ids(token)
            log("special", repr(token), "-> id", token_id)
        except Exception as err:
            log("special check failed", token, repr(err))

    samples = (
        "You are a helpful assistant.<|endofprompt|>希望你以后能够做的比我还好呦。",
        "八百标兵奔北坡,北坡炮兵并排跑,炮兵怕把标兵碰,标兵怕碰炮兵炮。",
        "Hello. This is a simple English test.",
        "Это короткий тест русской речи.",
        "<|endofprompt|>",
        "希望你以后能够做的比我还好呦。",
    )

    for sample in samples:
        encoded = tokenizer(sample, return_tensors=None, add_special_tokens=False)
        token_ids = encoded["input_ids"]
        # Only the first 80 ids are mapped back to token strings.
        pieces = tokenizer.convert_ids_to_tokens(token_ids[:80])
        round_trip = tokenizer.decode(token_ids)
        log()
        log("TEXT:", repr(sample))
        log("num ids:", len(token_ids))
        log("first ids:", token_ids[:80])
        log("first tokens:", pieces)
        log("decoded:", repr(round_trip[:500]))

    return tokenizer


tokenizer_obj = check_tokenizer()


# =============================================================================
# PROMPT AUDIO CHECK
# =============================================================================

@try_block("PROMPT AUDIO CHECK")
def check_audio():
    """Read PROMPT_WAV with soundfile and log its basic statistics.

    Returns:
        Tuple ``(samples, sample_rate)``; samples are float32 and always 2-D
        as requested from ``soundfile.read``.
    """
    import soundfile as sf

    samples, rate = sf.read(str(PROMPT_WAV), dtype="float32", always_2d=True)
    frame_count = samples.shape[0]
    log("PROMPT_WAV:", PROMPT_WAV)
    log("sr:", rate)
    log("shape:", samples.shape)
    log("duration:", frame_count / rate)
    log("min/max/std:", float(samples.min()), float(samples.max()), float(samples.std()))
    return samples, rate


check_audio()


# =============================================================================
# NO FFMPEG LOADER
# =============================================================================

def resample_np(audio_1d: np.ndarray, sr: int, target_sr: int) -> np.ndarray:
    """Resample an array from ``sr`` to ``target_sr`` as float32.

    When the rates are equal the input is only converted to float32 (without
    copying if it already is). Otherwise soxr is tried first; if that raises,
    the error is logged and ``scipy.signal.resample_poly`` is used instead.

    Raises:
        Exception: whatever the scipy fallback raised, after logging it.
    """
    samples = audio_1d.astype(np.float32, copy=False)
    if sr == target_sr:
        return samples

    try:
        import soxr
        converted = soxr.resample(samples, sr, target_sr)
        return converted.astype(np.float32, copy=False)
    except Exception as soxr_error:
        log("soxr failed:", repr(soxr_error))

    try:
        from math import gcd
        from scipy.signal import resample_poly
        # Reduce the rate ratio to lowest terms for the polyphase filter.
        divisor = gcd(sr, target_sr)
        up, down = target_sr // divisor, sr // divisor
        return resample_poly(samples, up, down).astype(np.float32, copy=False)
    except Exception as scipy_error:
        log("scipy resample failed:", repr(scipy_error))
        raise


def load_wav_no_torchcodec(wav, target_sr, min_sr=16000):
    """Load an audio file with soundfile and return it as a mono torch tensor.

    Channels are averaged to mono, statistics are logged, and the signal is
    resampled to ``target_sr`` via ``resample_np``.

    Args:
        wav: Path of the audio file.
        target_sr: Sample rate of the returned audio.
        min_sr: Lowest accepted source sample rate.

    Returns:
        Float32 tensor of shape ``(1, num_samples)``.

    Raises:
        ValueError: if the file's sample rate is below ``min_sr``.
    """
    import torch
    import soundfile as sf

    raw, source_sr = sf.read(str(wav), dtype="float32", always_2d=True)
    # Down-mix by averaging over the channel axis.
    mixed = raw.mean(axis=1).astype(np.float32, copy=False)

    log()
    log("[load_wav_no_torchcodec]")
    log(" file:", wav)
    log(" sr:", source_sr, "target_sr:", target_sr)
    log(" shape:", raw.shape)
    log(" duration:", len(mixed) / source_sr)
    log(" min/max/std:", float(mixed.min()), float(mixed.max()), float(mixed.std()))

    if source_sr < min_sr:
        raise ValueError(f"sample rate {source_sr} is lower than min_sr {min_sr}")

    resampled = resample_np(mixed, source_sr, target_sr)
    speech = torch.from_numpy(resampled).unsqueeze(0)
    sample_count = speech.shape[1]

    log(" out shape:", tuple(speech.shape))
    log(" out duration:", sample_count / target_sr)
    log(" out min/max/std:", speech.min().item(), speech.max().item(), speech.std().item())
    return speech


# Patch loader: rebind load_wav on both imported cosyvoice modules.
file_utils.load_wav = load_wav_no_torchcodec
frontend.load_wav = load_wav_no_torchcodec


# =============================================================================
# AUTOMODEL SIGNATURE / SOURCE CHECK
# =============================================================================

@try_block("AUTOMODEL / CLASS SIGNATURES")
def check_signatures():
    """Log signatures and source excerpts of AutoModel and the CosyVoice classes.

    For ``AutoModel`` and for each of ``CosyVoice``, ``CosyVoice2`` and
    ``CosyVoice3`` present in the cli module, the signature and the first
    5000 characters of source are logged. Introspection failures are logged
    individually and do not stop the check.
    """
    from cosyvoice.cli.cosyvoice import AutoModel

    log("AutoModel:", AutoModel)
    try:
        log("AutoModel signature:", inspect.signature(AutoModel))
    except Exception as err:
        log("AutoModel signature failed:", repr(err))

    try:
        auto_source = inspect.getsource(AutoModel)
        log("AutoModel source first 5000:")
        log(auto_source[:5000])
    except Exception as err:
        log("AutoModel source failed:", repr(err))

    for class_name in ("CosyVoice", "CosyVoice2", "CosyVoice3"):
        if not hasattr(cosyvoice_cli, class_name):
            continue

        klass = getattr(cosyvoice_cli, class_name)
        initializer = klass.__init__
        log()
        log(class_name, klass)
        try:
            log("signature:", inspect.signature(initializer))
        except Exception as err:
            log("signature failed:", repr(err))
        try:
            init_source = inspect.getsource(initializer)
            log("__init__ source first 5000:")
            log(init_source[:5000])
        except Exception as err:
            log("__init__ source failed:", repr(err))


check_signatures()


# =============================================================================
# MODEL LOAD
# =============================================================================

@try_block("LOAD MODEL")
def load_model():
    """Instantiate the model through AutoModel and log its attributes.

    Logs the object's type, sample rate, frontend and inner-model types, all
    instance attributes of the outer object, and those inner-model attributes
    whose name contains one of a fixed set of substrings.

    Returns:
        The loaded model object.
    """
    from cosyvoice.cli.cosyvoice import AutoModel

    log("Trying AutoModel(model_dir, fp16=False)")
    loaded = AutoModel(
        model_dir=str(MODEL_DIR),
        fp16=False,
    )

    log("model type:", type(loaded))
    log("sample_rate:", loaded.sample_rate)
    log("frontend type:", type(loaded.frontend))
    log("inner model type:", type(loaded.model))

    # Instance attributes of the outer object, values truncated to 300 chars.
    log()
    log("CosyVoice object vars:")
    for attr, value in sorted(vars(loaded).items()):
        log(" ", attr, "=", type(value), repr(value)[:300])

    name_filters = ("llm", "flow", "hift", "token", "qwen", "text", "speech")
    log()
    log("Inner model vars:")
    for attr, value in sorted(vars(loaded.model).items()):
        lowered = attr.lower()
        if not any(fragment in lowered for fragment in name_filters):
            continue
        log(" ", attr, "=", type(value), repr(value)[:300])

    return loaded


cosyvoice_model = load_model()

if cosyvoice_model is None:
    # Loading failed and was logged; the remaining checks need the model.
    save_report()
    raise SystemExit(1)


# =============================================================================
# INSPECT MODEL MODULE TREE
# =============================================================================

@try_block("MODEL MODULE TREE / PARAM DEVICES")
def inspect_model_tree():
    """Log the module hierarchy and leading parameters of the inner model.

    Lists the direct children, the named modules whose name or class name
    contains qwen/llm/text/token (stopping after the 201st match), and
    shape/dtype/device/mean/std for the first 80 named parameters.
    """
    import torch

    inner = cosyvoice_model.model

    log("model.model class:", inner.__class__)
    log()
    log("Top-level children:")
    for child_name, child in inner.named_children():
        log(" ", child_name, "->", child.__class__)

    fragments = ("qwen", "llm", "text", "token")
    matching = (
        (module_name, module)
        for module_name, module in inner.named_modules()
        if any(
            frag in module_name.lower() or frag in module.__class__.__name__.lower()
            for frag in fragments
        )
    )

    log()
    log("Named modules containing qwen/llm/text/token:")
    for shown, (module_name, module) in enumerate(matching, start=1):
        log(" ", module_name, "->", module.__class__)
        if shown > 200:
            log(" ... truncated")
            break

    log()
    log("First 80 parameters:")
    for position, (param_name, param) in enumerate(inner.named_parameters(), start=1):
        as_float = param.detach().float()
        mean_value = as_float.mean().item()
        std_value = as_float.std().item()
        log(f" {param_name:80s} shape={tuple(param.shape)} dtype={param.dtype} device={param.device} mean={mean_value:.6g} std={std_value:.6g}")
        if position >= 80:
            break


inspect_model_tree()


# =============================================================================
# OPTIONAL: HOOK LLM OUTPUT TOKENS? SAFE HIGH-LEVEL TESTS
# =============================================================================

def save_outs(test_name, outs, sample_rate):
    """Concatenate generated chunks, save them as a wav file and log statistics.

    Args:
        test_name: Base name (without extension) of the file written to OUT_DIR.
        outs: Iterable of dicts; each must provide a ``"tts_speech"`` tensor.
        sample_rate: Sample rate used for writing and for duration figures.

    Returns:
        Path of the written 16-bit PCM wav file.

    Raises:
        RuntimeError: if ``outs`` yields no chunks.
    """
    import torch
    import soundfile as sf

    chunks = list(outs)
    log("chunks:", len(chunks))

    if len(chunks) == 0:
        raise RuntimeError("No output chunks")

    for index, chunk in enumerate(chunks):
        log(" chunk", index, "keys:", list(chunk.keys()))
        if "tts_speech" not in chunk:
            continue
        speech = chunk["tts_speech"]
        log("  tts_speech:", tuple(speech.shape), "min/max/std", speech.min().item(), speech.max().item(), speech.std().item())

    # Chunks are joined along dim 1; the clipped copy is what gets written.
    wav = torch.cat([chunk["tts_speech"] for chunk in chunks], dim=1)
    audio = np.clip(wav.detach().float().cpu().squeeze(0).numpy(), -1.0, 1.0)

    out_path = OUT_DIR / f"{test_name}.wav"
    sf.write(str(out_path), audio, sample_rate, subtype="PCM_16")

    log("concat shape:", tuple(wav.shape))
    log("duration:", wav.shape[1] / sample_rate)
    log("min/max/std:", wav.min().item(), wav.max().item(), wav.std().item())
    log("saved:", out_path)

    # dropout analysis: RMS level per 20 ms frame, frames below -45 dB count as silent
    frame_len = int(sample_rate * 20 / 1000)
    frame_count = len(audio) // frame_len
    if frame_count > 0:
        frames = audio[:frame_count * frame_len].reshape(frame_count, frame_len)
        rms = np.sqrt(np.mean(frames ** 2, axis=1) + 1e-12)
        level_db = 20 * np.log10(rms + 1e-12)
        is_silent = level_db < -45
        log("dropout db min/mean/max:", float(level_db.min()), float(level_db.mean()), float(level_db.max()))
        log("silent frames < -45dB:", int(is_silent.sum()), "/", frame_count, f"({is_silent.mean() * 100:.1f}%)")

    return out_path


@try_block("RUN TEST 01: OFFICIAL ZH ZERO-SHOT")
def run_zh_zero():
    """Run zero-shot inference on a Chinese sentence and save the result.

    Returns:
        Path of the wav file written by ``save_outs``.
    """
    target_text = "八百标兵奔北坡,北坡炮兵并排跑,炮兵怕把标兵碰,标兵怕碰炮兵炮。"
    prompt_text = "You are a helpful assistant.<|endofprompt|>希望你以后能够做的比我还好呦。"

    generated = cosyvoice_model.inference_zero_shot(
        target_text,
        prompt_text,
        str(PROMPT_WAV),
        stream=False,
        speed=1.0,
    )
    rate = cosyvoice_model.sample_rate
    return save_outs("01_official_zh_zero_shot", generated, rate)


@try_block("RUN TEST 02: EN ZERO-SHOT WITH CHINESE PROMPT TRANSCRIPT")
def run_en_zero_correct():
    """Run zero-shot inference on English text with the Chinese prompt transcript.

    Returns:
        Path of the wav file written by ``save_outs``.
    """
    target_text = (
        "Hello. This is a simple English test. "
        "I am checking whether the speech is clear, continuous, and understandable."
    )
    prompt_text = "You are a helpful assistant.<|endofprompt|>希望你以后能够做的比我还好呦。"

    generated = cosyvoice_model.inference_zero_shot(
        target_text,
        prompt_text,
        str(PROMPT_WAV),
        stream=False,
        speed=1.0,
    )
    rate = cosyvoice_model.sample_rate
    return save_outs("02_en_zero_correct_chinese_prompt", generated, rate)


@try_block("RUN TEST 03: EN CROSS-LINGUAL")
def run_en_cross():
    """Run cross-lingual inference on English text and save the result.

    Returns:
        Path of the wav file written by ``save_outs``.
    """
    instruction = "You are a helpful assistant.<|endofprompt|>"
    sentence = (
        "Hello. This is a simple English test. "
        "I am checking whether the speech is clear, continuous, and understandable."
    )
    generated = cosyvoice_model.inference_cross_lingual(
        instruction + sentence,
        str(PROMPT_WAV),
        stream=False,
        speed=1.0,
    )
    rate = cosyvoice_model.sample_rate
    return save_outs("03_en_cross_lingual", generated, rate)


@try_block("RUN TEST 04: ZH CROSS-LINGUAL")
def run_zh_cross():
    """Run cross-lingual inference on Chinese text and save the result.

    Returns:
        Path of the wav file written by ``save_outs``.
    """
    instruction = "You are a helpful assistant.<|endofprompt|>"
    sentence = "八百标兵奔北坡,北坡炮兵并排跑,炮兵怕把标兵碰,标兵怕碰炮兵炮。"
    generated = cosyvoice_model.inference_cross_lingual(
        instruction + sentence,
        str(PROMPT_WAV),
        stream=False,
        speed=1.0,
    )
    rate = cosyvoice_model.sample_rate
    return save_outs("04_zh_cross_lingual", generated, rate)


# Each run is wrapped by try_block, so one failing test does not stop the rest.
run_zh_zero()
run_en_zero_correct()
run_en_cross()
run_zh_cross()


# =============================================================================
# FRONTEND DIRECT OUTPUT CHECK
# =============================================================================

@try_block("FRONTEND DIRECT METHOD CHECK")
def check_frontend_methods():
    """Log the frontend's attributes and public callables, then try text_normalize.

    Every public callable is listed with its signature (or ``<no sig>`` when
    it cannot be introspected). ``text_normalize`` is then called with two
    sample inputs if the frontend provides it; failures are logged.
    """
    front = cosyvoice_model.frontend
    log("frontend object:", front)
    log("frontend vars:")
    for attr, value in sorted(vars(front).items()):
        log(" ", attr, "=", type(value), repr(value)[:300])

    log()
    log("frontend callable methods:")
    public_names = (n for n in dir(front) if not n.startswith("_"))
    for member_name in public_names:
        member = getattr(front, member_name)
        if not callable(member):
            continue
        try:
            signature = inspect.signature(member)
        except Exception:
            signature = "<no sig>"
        log(" ", member_name, signature)

    # Try common frontend methods, but only safe ones.
    probes = (
        ("text_normalize", ("Hello. This is 123.", True)),
        ("text_normalize", ("八百标兵奔北坡。", True)),
    )

    for method_name, call_args in probes:
        if not hasattr(front, method_name):
            continue
        try:
            log()
            log("Calling frontend." + method_name, call_args)
            outcome = getattr(front, method_name)(*call_args)
            log(" result type:", type(outcome), "repr:", repr(outcome)[:1000])
        except Exception as err:
            log(" failed:", type(err).__name__, str(err))


check_frontend_methods()


# =============================================================================
# FINAL SUMMARY
# =============================================================================

header("SUMMARY / IMPORTANT OBSERVATIONS TO CHECK MANUALLY")

# Manual follow-up hints, logged one per line, then the report is written.
for note in (
    "1. Check whether log says 'use wetext frontend'. If yes, frontend is active.",
    "2. Check QWEN_DIR contents. model.safetensors should be large, roughly ~1 GB.",
    "3. Check tokenizer output: <|endofprompt|> should map to a real special id, not be split into many normal pieces.",
    "4. Check whether official zh zero-shot is intelligible. If not, problem is below application text formatting.",
    "5. If timbre transfers but words are garbage, likely LLM/text-token path is broken: Qwen encoder/tokenizer/llm.pt mismatch.",
    "6. If all generated files have many silent frames and broken syllables, also suspect torch/CUDA/Windows runtime.",
    "7. Outputs are in: " + str(OUT_DIR),
    "8. Full report: " + str(REPORT_PATH),
):
    log(note)

save_report()