Загрузка данных
import { BehaviorSubject, combineLatest, firstValueFrom, Observable, Subject, Subscription } from 'rxjs';
import { filter, map, take } from 'rxjs/operators';
import { EventManager } from '@core/EventManager';
import { SymbolSource } from '@src/core/SymbolSource';
import { Candle } from '@src/types';
import { Timeframes } from '@src/types/timeframes';
import { normalizeSymbol } from '@src/utils';
export interface DataSourceParams {
getData: (timeframe: Timeframes, symbol: string, until?: Candle) => Promise<Candle[] | null>;
eventManager: EventManager;
}
export interface RealtimeEvent {
symbol: string;
candle: Candle;
}
const EMPTY_CANDLES$ = new BehaviorSubject<Candle[]>([]);
const EMPTY_LAST_CANDLE$ = new BehaviorSubject<Candle | null>(null);
const EMPTY_REALTIME$ = new Subject<Candle>();
export class DataSource {
private readonly getData: DataSourceParams['getData'];
private readonly eventManager: EventManager;
private readonly states = new Map<string, SymbolSource>();
private readonly activeSymbols$ = new BehaviorSubject<Set<string>>(new Set());
private realtimeSub: Subscription | null = null;
private readonly subscriptions = new Subscription();
constructor({ getData, eventManager }: DataSourceParams) {
this.getData = getData;
this.eventManager = eventManager;
this.initSubscriptions();
}
public setSymbols(symbols: string[]): void {
const nextSet = new Set<string>();
for (const raw of symbols) {
const s = normalizeSymbol(raw);
if (s) nextSet.add(s);
}
for (const s of nextSet) {
if (!this.states.has(s)) {
this.ensureState(s);
}
}
for (const existingKey of this.states.keys()) {
if (!nextSet.has(existingKey)) {
this.dropState(existingKey);
}
}
this.activeSymbols$.next(nextSet);
}
public symbolsObs(): Observable<string[]> {
return this.activeSymbols$.pipe(map((set) => Array.from(set)));
}
public bindRealtime(stream$: Observable<RealtimeEvent>): void {
this.unbindRealtime();
this.realtimeSub = stream$.subscribe(({ symbol, candle }) => {
const s = normalizeSymbol(symbol);
if (s && this.activeSymbols$.value.has(s)) {
this.states.get(s)?.pushRealtime(candle);
}
});
}
public unbindRealtime(): void {
if (this.realtimeSub) {
this.realtimeSub.unsubscribe();
this.realtimeSub = null;
}
}
public subscribe(symbolRaw: string, cb: (next: Candle[]) => void): Subscription {
const symbol = normalizeSymbol(symbolRaw);
if (!symbol) return new Subscription();
return this.ensureState(symbol).data$().subscribe(cb);
}
public subscribeRealtime(symbolRaw: string, cb: (next: Candle) => void): Subscription {
const symbol = normalizeSymbol(symbolRaw);
if (!symbol) return new Subscription();
return this.ensureState(symbol).realtime$().subscribe(cb);
}
public data$(symbolRaw: string): Observable<Candle[]> {
const symbol = normalizeSymbol(symbolRaw);
return symbol ? this.ensureState(symbol).data$() : EMPTY_CANDLES$.asObservable();
}
public realtime$(symbolRaw: string): Observable<Candle> {
const symbol = normalizeSymbol(symbolRaw);
return symbol ? this.ensureState(symbol).realtime$() : EMPTY_REALTIME$.asObservable();
}
public lastCandle$(symbolRaw: string): Observable<Candle | null> {
const symbol = normalizeSymbol(symbolRaw);
return symbol ? this.ensureState(symbol).lastCandle$() : EMPTY_LAST_CANDLE$.asObservable();
}
public getLastCandle(symbolRaw: string): Candle | null {
const symbol = normalizeSymbol(symbolRaw);
return symbol ? (this.states.get(symbol)?.getLastValue() ?? null) : null;
}
public isReady = async (symbolRaw: string): Promise<void> => {
const symbol = normalizeSymbol(symbolRaw);
if (!symbol) return;
const st = this.ensureState(symbol);
await firstValueFrom(st.isInitialized$().pipe(filter(Boolean), take(1)));
};
public updateRealtime(symbolRaw: string, next: Candle): void {
const symbol = normalizeSymbol(symbolRaw);
if (symbol && this.activeSymbols$.value.has(symbol)) {
this.ensureState(symbol).pushRealtime(next);
}
}
public async loadTill(symbolRaw: string, time: number): Promise<void> {
const symbol = normalizeSymbol(symbolRaw);
if (symbol && this.activeSymbols$.value.has(symbol)) {
await this.ensureState(symbol).loadTill(time);
}
}
public loadMoreHistory = async (symbolRaw: string): Promise<void> => {
const symbol = normalizeSymbol(symbolRaw);
if (symbol && this.activeSymbols$.value.has(symbol)) {
await this.ensureState(symbol).loadMoreHistory();
}
};
public loadAllHistory = async (symbolRaw: string): Promise<void> => {
const symbol = normalizeSymbol(symbolRaw);
if (!symbol || !this.activeSymbols$.value.has(symbol)) return;
const st = this.ensureState(symbol);
await st.loadAllHistory();
};
public getIsLoading(symbolRaw: string): boolean {
const symbol = normalizeSymbol(symbolRaw);
return symbol ? (this.states.get(symbol)?.isLoadingValue() ?? false) : false;
}
public getOldestTime(symbolRaw: string): number | null {
const symbol = normalizeSymbol(symbolRaw);
return symbol ? (this.states.get(symbol)?.getOldestTime() ?? null) : null;
}
public destroy(): void {
this.unbindRealtime();
this.subscriptions.unsubscribe();
for (const key of this.states.keys()) {
this.dropState(key);
}
this.activeSymbols$.complete();
}
private initSubscriptions(): void {
this.subscriptions.add(
combineLatest([this.eventManager.getSelectedSeries(), this.eventManager.getUserIndicatorsList()]).subscribe(
() => {
this.states.forEach((st) => st.saveRealtimeCache());
},
),
);
this.subscriptions.add(
this.eventManager.timeframe().subscribe((tf) => {
const symbols = Array.from(this.activeSymbols$.value);
Promise.all(
symbols.map((s) => {
const st = this.states.get(s);
return st ? st.reload(tf) : Promise.resolve();
}),
).catch((error) => console.error('[DataSource] Global timeframe reload error:', error));
}),
);
}
private ensureState(symbol: string): SymbolSource {
let st = this.states.get(symbol);
if (!st) {
st = new SymbolSource({
symbol,
getData: this.getData,
getTimeframe: () => this.eventManager.getTimeframe(),
});
this.states.set(symbol, st);
st.init();
}
return st;
}
private dropState(symbol: string): void {
const st = this.states.get(symbol);
if (st) {
st.destroy();
this.states.delete(symbol);
}
}
}