Загрузка данных


import { BehaviorSubject, combineLatest, firstValueFrom, Observable, Subject, Subscription } from 'rxjs';
import { filter, map, take } from 'rxjs/operators';

import { EventManager } from '@core/EventManager';
import { SymbolSource } from '@src/core/SymbolSource';
import { Candle } from '@src/types';
import { Timeframes } from '@src/types/timeframes';
import { normalizeSymbol } from '@src/utils';

export interface DataSourceParams {
  getData: (timeframe: Timeframes, symbol: string, until?: Candle) => Promise<Candle[] | null>;
  eventManager: EventManager;
}

export interface RealtimeEvent {
  symbol: string;
  candle: Candle;
}

const EMPTY_CANDLES$ = new BehaviorSubject<Candle[]>([]);
const EMPTY_LAST_CANDLE$ = new BehaviorSubject<Candle | null>(null);
const EMPTY_REALTIME$ = new Subject<Candle>();

export class DataSource {
  private readonly getData: DataSourceParams['getData'];
  private readonly eventManager: EventManager;

  private readonly states = new Map<string, SymbolSource>();
  private readonly activeSymbols$ = new BehaviorSubject<Set<string>>(new Set());

  private realtimeSub: Subscription | null = null;
  private readonly subscriptions = new Subscription();

  constructor({ getData, eventManager }: DataSourceParams) {
    this.getData = getData;
    this.eventManager = eventManager;
    this.initSubscriptions();
  }

  public setSymbols(symbols: string[]): void {
    const nextSet = new Set<string>();

    for (const raw of symbols) {
      const s = normalizeSymbol(raw);
      if (s) nextSet.add(s);
    }

    for (const s of nextSet) {
      if (!this.states.has(s)) {
        this.ensureState(s);
      }
    }

    for (const existingKey of this.states.keys()) {
      if (!nextSet.has(existingKey)) {
        this.dropState(existingKey);
      }
    }

    this.activeSymbols$.next(nextSet);
  }

  public symbolsObs(): Observable<string[]> {
    return this.activeSymbols$.pipe(map((set) => Array.from(set)));
  }

  public bindRealtime(stream$: Observable<RealtimeEvent>): void {
    this.unbindRealtime();

    this.realtimeSub = stream$.subscribe(({ symbol, candle }) => {
      const s = normalizeSymbol(symbol);
      if (s && this.activeSymbols$.value.has(s)) {
        this.states.get(s)?.pushRealtime(candle);
      }
    });
  }

  public unbindRealtime(): void {
    if (this.realtimeSub) {
      this.realtimeSub.unsubscribe();
      this.realtimeSub = null;
    }
  }

  public subscribe(symbolRaw: string, cb: (next: Candle[]) => void): Subscription {
    const symbol = normalizeSymbol(symbolRaw);
    if (!symbol) return new Subscription();
    return this.ensureState(symbol).data$().subscribe(cb);
  }

  public subscribeRealtime(symbolRaw: string, cb: (next: Candle) => void): Subscription {
    const symbol = normalizeSymbol(symbolRaw);
    if (!symbol) return new Subscription();
    return this.ensureState(symbol).realtime$().subscribe(cb);
  }

  public data$(symbolRaw: string): Observable<Candle[]> {
    const symbol = normalizeSymbol(symbolRaw);
    return symbol ? this.ensureState(symbol).data$() : EMPTY_CANDLES$.asObservable();
  }

  public realtime$(symbolRaw: string): Observable<Candle> {
    const symbol = normalizeSymbol(symbolRaw);
    return symbol ? this.ensureState(symbol).realtime$() : EMPTY_REALTIME$.asObservable();
  }

  public lastCandle$(symbolRaw: string): Observable<Candle | null> {
    const symbol = normalizeSymbol(symbolRaw);
    return symbol ? this.ensureState(symbol).lastCandle$() : EMPTY_LAST_CANDLE$.asObservable();
  }

  public getLastCandle(symbolRaw: string): Candle | null {
    const symbol = normalizeSymbol(symbolRaw);
    return symbol ? (this.states.get(symbol)?.getLastValue() ?? null) : null;
  }

  public isReady = async (symbolRaw: string): Promise<void> => {
    const symbol = normalizeSymbol(symbolRaw);
    if (!symbol) return;
    const st = this.ensureState(symbol);
    await firstValueFrom(st.isInitialized$().pipe(filter(Boolean), take(1)));
  };

  public updateRealtime(symbolRaw: string, next: Candle): void {
    const symbol = normalizeSymbol(symbolRaw);
    if (symbol && this.activeSymbols$.value.has(symbol)) {
      this.ensureState(symbol).pushRealtime(next);
    }
  }

  public async loadTill(symbolRaw: string, time: number): Promise<void> {
    const symbol = normalizeSymbol(symbolRaw);
    if (symbol && this.activeSymbols$.value.has(symbol)) {
      await this.ensureState(symbol).loadTill(time);
    }
  }

  public loadMoreHistory = async (symbolRaw: string): Promise<void> => {
    const symbol = normalizeSymbol(symbolRaw);
    if (symbol && this.activeSymbols$.value.has(symbol)) {
      await this.ensureState(symbol).loadMoreHistory();
    }
  };

  public loadAllHistory = async (symbolRaw: string): Promise<void> => {
    const symbol = normalizeSymbol(symbolRaw);
    if (!symbol || !this.activeSymbols$.value.has(symbol)) return;

    const st = this.ensureState(symbol);
    await st.loadAllHistory();
  };

  public getIsLoading(symbolRaw: string): boolean {
    const symbol = normalizeSymbol(symbolRaw);
    return symbol ? (this.states.get(symbol)?.isLoadingValue() ?? false) : false;
  }

  public getOldestTime(symbolRaw: string): number | null {
    const symbol = normalizeSymbol(symbolRaw);
    return symbol ? (this.states.get(symbol)?.getOldestTime() ?? null) : null;
  }

  public destroy(): void {
    this.unbindRealtime();
    this.subscriptions.unsubscribe();
    for (const key of this.states.keys()) {
      this.dropState(key);
    }
    this.activeSymbols$.complete();
  }

  private initSubscriptions(): void {
    this.subscriptions.add(
      combineLatest([this.eventManager.getSelectedSeries(), this.eventManager.getUserIndicatorsList()]).subscribe(
        () => {
          this.states.forEach((st) => st.saveRealtimeCache());
        },
      ),
    );

    this.subscriptions.add(
      this.eventManager.timeframe().subscribe((tf) => {
        const symbols = Array.from(this.activeSymbols$.value);

        Promise.all(
          symbols.map((s) => {
            const st = this.states.get(s);
            return st ? st.reload(tf) : Promise.resolve();
          }),
        ).catch((error) => console.error('[DataSource] Global timeframe reload error:', error));
      }),
    );
  }

  private ensureState(symbol: string): SymbolSource {
    let st = this.states.get(symbol);
    if (!st) {
      st = new SymbolSource({
        symbol,
        getData: this.getData,
        getTimeframe: () => this.eventManager.getTimeframe(),
      });
      this.states.set(symbol, st);
      st.init();
    }
    return st;
  }

  private dropState(symbol: string): void {
    const st = this.states.get(symbol);
    if (st) {
      st.destroy();
      this.states.delete(symbol);
    }
  }
}