# Загрузка данных


from pathlib import Path
import sys
import os
import shutil
import zipfile
import urllib.request
import subprocess
import textwrap
import time


# ============================================================
# CONFIG
# ============================================================

# Root folder that contains both the existing and the new CosyVoice trees.
BASE_DIR = Path(r"C:\Users\Geodezik\image_generation").resolve()

# Existing installation; its model folder is the source for the model copy.
OLD_COSYVOICE_DIR = BASE_DIR.joinpath("cosyvoice")
OLD_MODEL_DIR = OLD_COSYVOICE_DIR.joinpath("pretrained_models", "Fun-CosyVoice3-0.5B-2512")

# Target location of the official repository layout and of the copied model.
NEW_COSYVOICE_DIR = BASE_DIR.joinpath("CosyVoice_official")
NEW_MODEL_DIR = NEW_COSYVOICE_DIR.joinpath("pretrained_models", "Fun-CosyVoice3-0.5B")

# Archive of the repository's main branch, fetched over HTTPS.
DOWNLOAD_URL = "https://github.com/FunAudioLLM/CosyVoice/archive/refs/heads/main.zip"

# Scratch area for the downloaded archive and its extracted contents.
DOWNLOADS_DIR = BASE_DIR.joinpath("_downloads")
ZIP_PATH = DOWNLOADS_DIR.joinpath("CosyVoice-main.zip")
EXTRACT_DIR = DOWNLOADS_DIR.joinpath("CosyVoice-main")

# Generated diagnostic script, placed at the root of the new repository tree.
DIAG_SCRIPT = NEW_COSYVOICE_DIR.joinpath("run_cosyvoice3_diag.py")

# Timestamp captured once at start-up; used to name the backup folder.
BACKUP_SUFFIX = time.strftime("%Y%m%d_%H%M%S")


# ============================================================
# HELPERS
# ============================================================

def print_header(title):
    """Print a section banner: a blank line, then *title* framed by two rules.

    Each rule is a line of 90 ``=`` characters.
    """
    rule = "=" * 90
    print()
    for line in (rule, title, rule):
        print(line)


def run(cmd, cwd=None, check=True):
    """Echo a command (and its working directory, if given), then execute it.

    Args:
        cmd: Sequence of command parts; each part is shown via ``str()``.
        cwd: Optional working directory for the child process.
        check: Forwarded to ``subprocess.run``.

    Returns:
        The object returned by ``subprocess.run``.
    """
    print()
    shown = " ".join(str(part) for part in cmd)
    print("[RUN]", shown)

    working_dir = None
    if cwd:
        print("[CWD]", cwd)
        working_dir = str(cwd)

    return subprocess.run(cmd, cwd=working_dir, check=check)


def remove_dir(path: Path):
    """Delete the directory tree at *path* if it exists; otherwise do nothing.

    Errors raised while deleting are propagated to the caller.
    """
    if not path.exists():
        return
    print(f"Removing: {path}")
    shutil.rmtree(path)


def copytree_overwrite(src: Path, dst: Path):
    """Copy the tree *src* to *dst*, deleting any existing *dst* first.

    The destination ends up containing only what *src* contains.
    """
    already_there = dst.exists()
    if already_there:
        print(f"Removing existing dst: {dst}")
        shutil.rmtree(dst)

    print(f"Copying:\n  from: {src}\n  to:   {dst}")
    shutil.copytree(src, dst)


def check_file(path: Path, required=True):
    """Report whether *path* exists and optionally insist that it does.

    Args:
        path: Path to look for.
        required: When true, a missing path is an error.

    Raises:
        FileNotFoundError: If *required* is true and *path* does not exist.
    """
    found = path.exists()
    print(f"{path} -> exists={found}")
    if found or not required:
        return
    raise FileNotFoundError(str(path))


def install_package(pkg):
    """Install *pkg* with pip, using the interpreter that runs this script.

    The command goes through ``run`` with ``check=True``.
    """
    pip_command = [sys.executable, "-m", "pip", "install", pkg]
    run(pip_command, check=True)


# ============================================================
# START
# ============================================================

print_header("CosyVoice3 Windows no-git fixer")

# Echo the interpreter and every configured location before touching anything.
for label, value in (
    ("Python:", sys.executable),
    ("BASE_DIR:", BASE_DIR),
    ("OLD_COSYVOICE_DIR:", OLD_COSYVOICE_DIR),
    ("OLD_MODEL_DIR:", OLD_MODEL_DIR),
    ("NEW_COSYVOICE_DIR:", NEW_COSYVOICE_DIR),
    ("NEW_MODEL_DIR:", NEW_MODEL_DIR),
):
    print(label, value)

# The old model folder is the source of the later model copy, so fail early
# when it is absent.
if not OLD_MODEL_DIR.exists():
    missing_model_message = (
        f"Не найдена старая папка модели:\n{OLD_MODEL_DIR}\n\n"
        f"Проверь путь OLD_MODEL_DIR в начале скрипта."
    )
    raise FileNotFoundError(missing_model_message)


# ============================================================
# STEP 1: ensure basic packages
# ============================================================

print_header("Step 1: checking Python packages")

# (import name, pip name) pairs; for every package listed here the two coincide.
needed = [
    (name, name)
    for name in ("soundfile", "numpy", "torch", "torchaudio", "onnxruntime")
]

for import_name, pip_name in needed:
    try:
        __import__(import_name)
    except Exception as e:
        # Any failure counts: the package may be absent or present but broken.
        print(f"Missing/broken {import_name}: {repr(e)}")
        answer = input(f"Install {pip_name}? [y/N]: ").strip().lower()
        if answer != "y":
            print(f"Skip installing {pip_name}")
        else:
            install_package(pip_name)
    else:
        print(f"OK import {import_name}")


# ============================================================
# STEP 2: download official repo zip
# ============================================================

print_header("Step 2: downloading official CosyVoice zip")

DOWNLOADS_DIR.mkdir(parents=True, exist_ok=True)

if ZIP_PATH.exists():
    print("ZIP already exists:", ZIP_PATH)
    if not zipfile.is_zipfile(ZIP_PATH):
        # A truncated/corrupt archive cannot be extracted later, so reusing it
        # is never useful: drop it and download again.
        print("Existing ZIP is not a valid zip archive, it will be re-downloaded.")
        ZIP_PATH.unlink()
    else:
        answer = input("Re-download zip? [y/N]: ").strip().lower()
        if answer == "y":
            ZIP_PATH.unlink()
        else:
            print("Using existing zip.")

if not ZIP_PATH.exists():
    print("Downloading:")
    print(" ", DOWNLOAD_URL)
    print("to:")
    print(" ", ZIP_PATH)

    # Stream into a sibling ".part" file and move it into place only after the
    # download finished and validated. An interrupted run therefore never
    # leaves a partial file under ZIP_PATH that a later run would reuse.
    partial_zip = ZIP_PATH.with_name(ZIP_PATH.name + ".part")
    chunk_size = 1024 * 1024
    downloaded = 0

    try:
        with urllib.request.urlopen(DOWNLOAD_URL) as response, open(partial_zip, "wb") as f:
            # Read the expected size before consuming the body; it may be
            # missing, in which case only the running total is shown.
            total = response.length

            while True:
                chunk = response.read(chunk_size)
                if not chunk:
                    break
                f.write(chunk)
                downloaded += len(chunk)
                if total:
                    pct = downloaded * 100 / total
                    print(f"\rDownloaded {downloaded / 1024 / 1024:.1f} MB / {total / 1024 / 1024:.1f} MB ({pct:.1f}%)", end="")
                else:
                    print(f"\rDownloaded {downloaded / 1024 / 1024:.1f} MB", end="")
        print()

        if not zipfile.is_zipfile(partial_zip):
            raise RuntimeError(
                f"Downloaded file is not a valid zip archive: {DOWNLOAD_URL}"
            )

        os.replace(str(partial_zip), str(ZIP_PATH))
    except BaseException:
        # Also covers Ctrl+C: remove the incomplete file, then re-raise.
        if partial_zip.exists():
            partial_zip.unlink()
        raise

print("ZIP_PATH:", ZIP_PATH)
print("ZIP size MB:", ZIP_PATH.stat().st_size / 1024 / 1024)


# ============================================================
# STEP 3: extract zip
# ============================================================

print_header("Step 3: extracting zip")

# Clear any earlier extraction so stale files cannot survive into this run.
remove_dir(EXTRACT_DIR)

print("Extracting:", ZIP_PATH)
with zipfile.ZipFile(ZIP_PATH, mode="r") as archive:
    archive.extractall(path=DOWNLOADS_DIR)

# The archive is unpacked into DOWNLOADS_DIR; these files must then be present
# under EXTRACT_DIR, otherwise the script stops here.
for rel_parts in (
    ("cosyvoice", "__init__.py"),
    ("cosyvoice", "cli", "cosyvoice.py"),
    ("asset", "zero_shot_prompt.wav"),
):
    check_file(EXTRACT_DIR.joinpath(*rel_parts), required=True)


# ============================================================
# STEP 4: install/copy official repo
# ============================================================

print_header("Step 4: installing official repo layout")

if NEW_COSYVOICE_DIR.exists():
    print("NEW_COSYVOICE_DIR already exists:", NEW_COSYVOICE_DIR)
    backup_dir = BASE_DIR / f"CosyVoice_official_backup_{BACKUP_SUFFIX}"
    backup_prompt = f"Backup existing CosyVoice_official to {backup_dir.name} and replace? [Y/n]: "
    answer = input(backup_prompt).strip().lower()

    # An empty answer counts as consent; anything else stops the script
    # without touching the existing folder.
    if answer not in ("", "y", "yes"):
        raise RuntimeError("User aborted to avoid overwriting existing CosyVoice_official.")

    print(
        "Backing up:",
        f"  {NEW_COSYVOICE_DIR}",
        "to:",
        f"  {backup_dir}",
        sep="\n",
    )
    shutil.move(str(NEW_COSYVOICE_DIR), str(backup_dir))

print("Copying official repo:")
print(f" from: {EXTRACT_DIR}")
print(f" to:   {NEW_COSYVOICE_DIR}")
shutil.copytree(EXTRACT_DIR, NEW_COSYVOICE_DIR)

# Same sanity checks as after extraction, now against the installed copy.
for rel_parts in (
    ("cosyvoice", "__init__.py"),
    ("cosyvoice", "cli", "cosyvoice.py"),
    ("asset", "zero_shot_prompt.wav"),
):
    check_file(NEW_COSYVOICE_DIR.joinpath(*rel_parts), required=True)


# ============================================================
# STEP 5: copy model
# ============================================================

print_header("Step 5: copying model")

NEW_MODEL_DIR.parent.mkdir(parents=True, exist_ok=True)

if NEW_MODEL_DIR.exists():
    print("NEW_MODEL_DIR already exists:", NEW_MODEL_DIR)
    answer = input("Replace model folder? [Y/n]: ").strip().lower()
    # An empty answer means "yes, replace".
    if answer not in ("", "y", "yes"):
        print("Keeping existing model folder.")
    else:
        shutil.rmtree(NEW_MODEL_DIR)

# Copy only when the folder is absent: a first run, or the user chose to replace.
if not NEW_MODEL_DIR.exists():
    print("Copying model:")
    print(f" from: {OLD_MODEL_DIR}")
    print(f" to:   {NEW_MODEL_DIR}")
    shutil.copytree(OLD_MODEL_DIR, NEW_MODEL_DIR)

print()
print("New model files:")
for entry in sorted(NEW_MODEL_DIR.iterdir()):
    if entry.is_file():
        size_mb = entry.stat().st_size / 1024 / 1024
        print(f"  FILE {entry.name:45s} {size_mb:10.2f} MB")
    elif entry.is_dir():
        print(f"  DIR  {entry.name}")


# ============================================================
# STEP 6: LFS pointer check
# ============================================================

print_header("Step 6: LFS pointer check")

# A Git LFS pointer is a tiny text file whose first line is this version line.
# Matching it as a prefix of the file (instead of anywhere inside up to 1 MB of
# decoded text) avoids flagging ordinary files that merely mention the URL and
# avoids reading whole files.
lfs_pointer_prefix = b"version https://git-lfs.github.com/spec"
lfs_head_bytes = 256

lfs_found = False
for p in NEW_MODEL_DIR.rglob("*"):
    if not p.is_file():
        continue
    # Only small files are inspected.
    if p.stat().st_size > 1024 * 1024:
        continue

    try:
        with open(p, "rb") as fh:
            head = fh.read(lfs_head_bytes)
    except OSError as e:
        # Report unreadable files instead of skipping them silently, so an
        # "OK" result below is not mistaken for "every file was verified".
        print(f"WARNING: cannot read {p}: {repr(e)}")
        continue

    if head.lstrip().startswith(lfs_pointer_prefix):
        lfs_found = True
        print("LFS POINTER:", p)

if lfs_found:
    raise RuntimeError(
        "В модели найдены Git LFS pointer-файлы вместо настоящих весов. "
        "Нужно скачать модель через huggingface/modelscope snapshot_download или вручную."
    )
else:
    print("OK: no LFS pointers found in small files.")


# ============================================================
# STEP 7: create diagnostic script
# ============================================================

print_header("Step 7: creating diagnostic script")

# Template of the generated diagnostic script.
#
# The two "__...__" placeholders are filled from NEW_COSYVOICE_DIR and
# NEW_MODEL_DIR below, so the generated script always follows the CONFIG
# section instead of carrying its own hard-coded copy of those paths.
# Plain str.replace is used (not str.format / f-string) because the template
# itself contains many literal braces.
#
# NOTE(review): the template adds third_party/Matcha-TTS to sys.path; whether
# the downloaded archive actually contains that folder's contents is not
# verified anywhere in this script - confirm.
diag_template = r'''
from pathlib import Path
import sys
import os
import numpy as np
import torch
import soundfile as sf

# Optional display in Jupyter
try:
    from IPython.display import Audio, display
    HAS_IPYTHON = True
except Exception:
    HAS_IPYTHON = False


COSYVOICE_ROOT = Path(r"__COSYVOICE_ROOT__").resolve()
MATCHA_ROOT = COSYVOICE_ROOT / "third_party" / "Matcha-TTS"

# ВАЖНО:
# В sys.path добавляем именно корень репозитория CosyVoice_official,
# внутри которого есть папка cosyvoice/
sys.path.insert(0, str(MATCHA_ROOT))
sys.path.insert(0, str(COSYVOICE_ROOT))

MODEL_DIR = Path(r"__MODEL_DIR__").resolve()
PROMPT_WAV = COSYVOICE_ROOT / "asset" / "zero_shot_prompt.wav"
OUT_DIR = COSYVOICE_ROOT / "_diag_outputs"
OUT_DIR.mkdir(parents=True, exist_ok=True)

OUTPUT_WAV = OUT_DIR / "official_zh_zero_shot.wav"


# ============================================================
# no ffmpeg / no torchcodec loader
# ============================================================

import torchaudio.functional as AF

def load_wav_no_torchcodec(wav, target_sr, min_sr=16000):
    wav = Path(wav)

    audio, sr = sf.read(str(wav), dtype="float32", always_2d=True)

    # soundfile: [samples, channels]
    # torch expected: [channels, samples]
    speech = torch.from_numpy(audio.T)

    # force mono
    speech = speech.mean(dim=0, keepdim=True)

    print()
    print("[load_wav_no_torchcodec]")
    print("  file:", wav)
    print("  exists:", wav.exists())
    print("  original sr:", sr)
    print("  original shape:", audio.shape)
    print("  duration:", audio.shape[0] / sr)
    print("  min/max/std:", speech.min().item(), speech.max().item(), speech.std().item())

    if sr < min_sr:
        raise ValueError(f"sample rate {sr} is lower than min_sr {min_sr}")

    if sr != target_sr:
        speech = AF.resample(speech, sr, target_sr)
        print("  resampled to:", target_sr)
        print("  resampled shape:", tuple(speech.shape))
        print("  resampled duration:", speech.shape[1] / target_sr)
        print("  resampled min/max/std:", speech.min().item(), speech.max().item(), speech.std().item())

    return speech


# ============================================================
# imports and monkey patch
# ============================================================

import cosyvoice
import cosyvoice.cli.cosyvoice as cosyvoice_cli
import cosyvoice.utils.file_utils as file_utils
import cosyvoice.cli.frontend as frontend

file_utils.load_wav = load_wav_no_torchcodec
frontend.load_wav = load_wav_no_torchcodec

from cosyvoice.cli.cosyvoice import AutoModel


print("=" * 90)
print("IMPORT CHECK")
print("=" * 90)
print("cwd:", os.getcwd())
print("COSYVOICE_ROOT:", COSYVOICE_ROOT)
print("MATCHA_ROOT:", MATCHA_ROOT)
print("cosyvoice package:", cosyvoice.__file__)
print("cosyvoice cli:", cosyvoice_cli.__file__)
print("frontend:", frontend.__file__)
print("file_utils:", file_utils.__file__)

expected = str(COSYVOICE_ROOT / "cosyvoice")
actual = str(Path(cosyvoice.__file__).parent)

print()
print("expected package dir starts with:", expected)
print("actual package dir:", actual)

if not actual.lower().startswith(expected.lower()):
    raise RuntimeError(
        "Импортируется неправильный cosyvoice package.\n"
        f"Expected inside: {expected}\n"
        f"Actual: {actual}"
    )


print()
print("=" * 90)
print("ENV CHECK")
print("=" * 90)
print("python:", sys.executable)
print("torch:", torch.__version__)
print("cuda available:", torch.cuda.is_available())
if torch.cuda.is_available():
    print("cuda device:", torch.cuda.get_device_name(0))

try:
    import torchaudio
    print("torchaudio:", torchaudio.__version__)
except Exception as e:
    print("torchaudio error:", repr(e))

try:
    import onnxruntime as ort
    print("onnxruntime:", ort.__version__)
    print("onnxruntime providers:", ort.get_available_providers())
except Exception as e:
    print("onnxruntime error:", repr(e))


print()
print("=" * 90)
print("FILES CHECK")
print("=" * 90)
print("MODEL_DIR:", MODEL_DIR)
print("MODEL_DIR exists:", MODEL_DIR.exists())
print("PROMPT_WAV:", PROMPT_WAV)
print("PROMPT_WAV exists:", PROMPT_WAV.exists())

if not MODEL_DIR.exists():
    raise FileNotFoundError(MODEL_DIR)

if not PROMPT_WAV.exists():
    raise FileNotFoundError(PROMPT_WAV)

print()
print("Model files:")
for p in sorted(MODEL_DIR.iterdir()):
    if p.is_file():
        print(f"  FILE {p.name:45s} {p.stat().st_size / 1024 / 1024:10.2f} MB")
    elif p.is_dir():
        print(f"  DIR  {p.name}")


print()
print("=" * 90)
print("PROMPT AUDIO CHECK")
print("=" * 90)
x, sr = sf.read(str(PROMPT_WAV), dtype="float32", always_2d=True)
print("sr:", sr)
print("shape:", x.shape)
print("duration:", x.shape[0] / sr)
print("min/max/std:", float(x.min()), float(x.max()), float(x.std()))


print()
print("=" * 90)
print("LOAD MODEL")
print("=" * 90)

cosyvoice = AutoModel(model_dir=str(MODEL_DIR))

print("cosyvoice object:", type(cosyvoice))
print("sample_rate:", cosyvoice.sample_rate)

for attr in ["model_dir", "frontend", "model", "llm", "flow", "hift"]:
    if hasattr(cosyvoice, attr):
        try:
            print(f"{attr}:", type(getattr(cosyvoice, attr)))
        except Exception as e:
            print(f"{attr}: <error {repr(e)}>")


print()
print("=" * 90)
print("RUN OFFICIAL ZH ZERO-SHOT")
print("=" * 90)

outs = list(
    cosyvoice.inference_zero_shot(
        "八百标兵奔北坡,北坡炮兵并排跑,炮兵怕把标兵碰,标兵怕碰炮兵炮。",
        "You are a helpful assistant.<|endofprompt|>希望你以后能够做的比我还好呦。",
        str(PROMPT_WAV),
        stream=False,
        speed=1.0,
    )
)

print()
print("chunks:", len(outs))
for i, o in enumerate(outs):
    print("chunk", i, "keys:", list(o.keys()))
    t = o["tts_speech"]
    print("  shape:", tuple(t.shape))
    print("  min/max/std:", t.min().item(), t.max().item(), t.std().item())

wav = torch.cat([o["tts_speech"] for o in outs], dim=1)

print()
print("OUT")
print("shape:", tuple(wav.shape))
print("sample_rate:", cosyvoice.sample_rate)
print("duration:", wav.shape[1] / cosyvoice.sample_rate)
print("min/max/std:", wav.min().item(), wav.max().item(), wav.std().item())

audio = wav.detach().cpu().squeeze(0).numpy()
audio = np.clip(audio, -1.0, 1.0)

sf.write(str(OUTPUT_WAV), audio, cosyvoice.sample_rate)
print()
print("saved:", OUTPUT_WAV)

if HAS_IPYTHON:
    display(Audio(str(OUTPUT_WAV)))
else:
    print("Open this file manually:")
    print(OUTPUT_WAV)
'''

# Fill the placeholders with the configured locations. The paths are embedded
# inside raw string literals in the template, so backslashes need no escaping.
diag_code = (
    diag_template
    .replace("__COSYVOICE_ROOT__", str(NEW_COSYVOICE_DIR))
    .replace("__MODEL_DIR__", str(NEW_MODEL_DIR))
)

DIAG_SCRIPT.write_text(diag_code, encoding="utf-8")

print("Created:", DIAG_SCRIPT)


# ============================================================
# STEP 8: run diagnostic script
# ============================================================

print_header("Step 8: run diagnostic")

answer = input("Run diagnostic now? [Y/n]: ").strip().lower()

# An empty answer means "yes"; otherwise only show how to launch it by hand.
if answer not in ("", "y", "yes"):
    print()
    print("You can run it manually:")
    print(f'cd /d "{NEW_COSYVOICE_DIR}"')
    print(f'python "{DIAG_SCRIPT}"')
else:
    diag_command = [sys.executable, str(DIAG_SCRIPT)]
    run(diag_command, cwd=NEW_COSYVOICE_DIR, check=True)


print_header("DONE")

# Final summary of where everything ended up.
for label, location in (
    ("Official repo:", NEW_COSYVOICE_DIR),
    ("Model dir:", NEW_MODEL_DIR),
    ("Diagnostic script:", DIAG_SCRIPT),
):
    print(label, location)
print()
print("Если official_zh_zero_shot.wav всё ещё мусор, значит следующий шаг — чистое conda окружение.")