Загрузка данных
import { BehaviorSubject, combineLatest, firstValueFrom, Observable, Subject, Subscription } from 'rxjs';
import { filter, map, take } from 'rxjs/operators';
import { EventManager } from '@core/EventManager';
import { SymbolSource } from '@src/core/SymbolSource';
import { Candle } from '@src/types';
import { Timeframes } from '@src/types/timeframes';
import { normalizeSymbol } from '@src/utils';
export interface DataSourceParams {
getData: (timeframe: Timeframes, symbol: string, until?: Candle) => Promise<Candle[] | null>;
eventManager: EventManager;
}
export interface RealtimeEvent {
symbol: string;
candle: Candle;
}
const EMPTY_CANDLES$ = new BehaviorSubject<Candle[]>([]);
const EMPTY_LAST_CANDLE$ = new BehaviorSubject<Candle | null>(null);
const EMPTY_REALTIME$ = new Subject<Candle>();
export class DataSource {
private readonly getData: DataSourceParams['getData'];
private readonly eventManager: EventManager;
private readonly states = new Map<string, SymbolSource>();
private readonly activeSymbols$ = new BehaviorSubject<Set<string>>(new Set());
private realtimeSub: Subscription | null = null;
private readonly subscriptions = new Subscription();
constructor({ getData, eventManager }: DataSourceParams) {
this.getData = getData;
this.eventManager = eventManager;
this.initSubscriptions();
}
public setSymbols(symbols: string[]): void {
const nextSet = new Set<string>();
for (const raw of symbols) {
const s = normalizeSymbol(raw);
if (s) nextSet.add(s);
}
for (const s of nextSet) {
if (!this.states.has(s)) {
this.ensureState(s);
}
}
for (const existingKey of this.states.keys()) {
if (!nextSet.has(existingKey)) {
this.dropState(existingKey);
}
}
this.activeSymbols$.next(nextSet);
}
public symbolsObs(): Observable<string[]> {
return this.activeSymbols$.pipe(map((set) => Array.from(set)));
}
public bindRealtime(stream$: Observable<RealtimeEvent>): void {
this.unbindRealtime();
this.realtimeSub = stream$.subscribe(({ symbol, candle }) => {
const s = normalizeSymbol(symbol);
if (s && this.activeSymbols$.value.has(s)) {
this.states.get(s)?.pushRealtime(candle);
}
});
}
public unbindRealtime(): void {
if (this.realtimeSub) {
this.realtimeSub.unsubscribe();
this.realtimeSub = null;
}
}
public subscribe(symbolRaw: string, cb: (next: Candle[]) => void): Subscription {
const symbol = normalizeSymbol(symbolRaw);
if (!symbol) return new Subscription();
return this.ensureState(symbol).data$().subscribe(cb);
}
public subscribeRealtime(symbolRaw: string, cb: (next: Candle) => void): Subscription {
const symbol = normalizeSymbol(symbolRaw);
if (!symbol) return new Subscription();
return this.ensureState(symbol).realtime$().subscribe(cb);
}
public data$(symbolRaw: string): Observable<Candle[]> {
const symbol = normalizeSymbol(symbolRaw);
return symbol ? this.ensureState(symbol).data$() : EMPTY_CANDLES$.asObservable();
}
public realtime$(symbolRaw: string): Observable<Candle> {
const symbol = normalizeSymbol(symbolRaw);
return symbol ? this.ensureState(symbol).realtime$() : EMPTY_REALTIME$.asObservable();
}
public lastCandle$(symbolRaw: string): Observable<Candle | null> {
const symbol = normalizeSymbol(symbolRaw);
return symbol ? this.ensureState(symbol).lastCandle$() : EMPTY_LAST_CANDLE$.asObservable();
}
public getLastCandle(symbolRaw: string): Candle | null {
const symbol = normalizeSymbol(symbolRaw);
return symbol ? (this.states.get(symbol)?.getLastValue() ?? null) : null;
}
public isReady = async (symbolRaw: string): Promise<void> => {
const symbol = normalizeSymbol(symbolRaw);
if (!symbol) return;
const st = this.ensureState(symbol);
await firstValueFrom(st.isInitialized$().pipe(filter(Boolean), take(1)));
};
public updateRealtime(symbolRaw: string, next: Candle): void {
const symbol = normalizeSymbol(symbolRaw);
if (symbol && this.activeSymbols$.value.has(symbol)) {
this.ensureState(symbol).pushRealtime(next);
}
}
public async loadTill(symbolRaw: string, time: number): Promise<void> {
const symbol = normalizeSymbol(symbolRaw);
if (symbol && this.activeSymbols$.value.has(symbol)) {
await this.ensureState(symbol).loadTill(time);
}
}
public loadMoreHistory = async (symbolRaw: string): Promise<void> => {
const symbol = normalizeSymbol(symbolRaw);
if (symbol && this.activeSymbols$.value.has(symbol)) {
await this.ensureState(symbol).loadMoreHistory();
}
};
public loadAllHistory = async (symbolRaw: string): Promise<void> => {
const symbol = normalizeSymbol(symbolRaw);
if (!symbol || !this.activeSymbols$.value.has(symbol)) return;
const st = this.ensureState(symbol);
await st.loadAllHistory();
};
public getIsLoading(symbolRaw: string): boolean {
const symbol = normalizeSymbol(symbolRaw);
return symbol ? (this.states.get(symbol)?.isLoadingValue() ?? false) : false;
}
public getOldestTime(symbolRaw: string): number | null {
const symbol = normalizeSymbol(symbolRaw);
return symbol ? (this.states.get(symbol)?.getOldestTime() ?? null) : null;
}
public destroy(): void {
this.unbindRealtime();
this.subscriptions.unsubscribe();
for (const key of this.states.keys()) {
this.dropState(key);
}
this.activeSymbols$.complete();
}
private initSubscriptions(): void {
this.subscriptions.add(
combineLatest([this.eventManager.getSelectedSeries()]).subscribe(() => {
this.states.forEach((st) => st.saveRealtimeCache());
}),
);
this.subscriptions.add(
this.eventManager.timeframe().subscribe((tf) => {
const symbols = Array.from(this.activeSymbols$.value);
Promise.all(
symbols.map((s) => {
const st = this.states.get(s);
return st ? st.reload(tf) : Promise.resolve();
}),
).catch((error) => console.error('[DataSource] Global timeframe reload error:', error));
}),
);
}
private ensureState(symbol: string): SymbolSource {
let st = this.states.get(symbol);
if (!st) {
st = new SymbolSource({
symbol,
getData: this.getData,
getTimeframe: () => this.eventManager.getTimeframe(),
});
this.states.set(symbol, st);
st.init();
}
return st;
}
private dropState(symbol: string): void {
const st = this.states.get(symbol);
if (st) {
st.destroy();
this.states.delete(symbol);
}
}
}
import dayjs from 'dayjs';
import { BehaviorSubject, firstValueFrom, Observable, Subject } from 'rxjs';
import { filter, take } from 'rxjs/operators';
import { Candle } from '@src/types';
import { Timeframes } from '@src/types/timeframes';
import { ensureDefined, getStartTime, normalizeSeriesData, parseTimeframe } from '@src/utils';
export interface SymbolSourceParams {
symbol: string;
getData: (timeframe: Timeframes, symbol: string, until?: Candle) => Promise<Candle[] | null>;
getTimeframe: () => Timeframes;
}
export class SymbolSource {
public readonly symbol: string;
private readonly getData: SymbolSourceParams['getData'];
private readonly getTimeframe: SymbolSourceParams['getTimeframe'];
private readonly currentDataSubject = new BehaviorSubject<Candle[]>([]);
private readonly realtimeSubject = new Subject<Candle>();
private readonly lastCandleSubject = new BehaviorSubject<Candle | null>(null);
private readonly isLoadingSubject = new BehaviorSubject<boolean>(false);
private readonly isInitializedSubject = new BehaviorSubject<boolean>(false);
private realtimeCache: Candle[] = [];
private realtimeBuffer: Candle[] = [];
private oldestCandle: Candle | null = null;
private newestCandle: Candle | null = null;
private loadSeq = 0;
private loadingPromise: Promise<void> | null = null;
private isEndOfData = false;
constructor({ symbol, getData, getTimeframe }: SymbolSourceParams) {
this.symbol = symbol;
this.getData = getData;
this.getTimeframe = getTimeframe;
}
public init(): void {
this.reload(this.getTimeframe());
}
public data$(): Observable<Candle[]> {
return this.currentDataSubject.asObservable();
}
public realtime$(): Observable<Candle> {
return this.realtimeSubject.asObservable();
}
public lastCandle$(): Observable<Candle | null> {
return this.lastCandleSubject.asObservable();
}
public isInitialized$(): Observable<boolean> {
return this.isInitializedSubject.asObservable();
}
public isLoadingValue(): boolean {
return this.isLoadingSubject.value;
}
public getOldestTime(): number | null {
return this.oldestCandle?.time ?? null;
}
public getLastValue(): Candle | null {
return this.lastCandleSubject.value;
}
public async ready(): Promise<void> {
if (this.isInitializedSubject.value) return;
await firstValueFrom(this.isInitializedSubject.pipe(filter(Boolean), take(1)));
}
public destroy(): void {
this.loadSeq += 1;
this.loadingPromise = null;
this.currentDataSubject.complete();
this.realtimeSubject.complete();
this.lastCandleSubject.complete();
this.isLoadingSubject.complete();
this.isInitializedSubject.complete();
this.realtimeCache = [];
this.realtimeBuffer = [];
this.oldestCandle = null;
this.newestCandle = null;
}
public pushRealtime(next: Candle): void {
const tf = this.getTimeframe();
const aligned = this.alignCandle(tf, next);
const res = SymbolSource.stackCandles(tf, aligned, this.newestCandle, this.realtimeCache);
this.newestCandle = res.newestCandle;
this.realtimeCache = res.realtimeCache;
this.lastCandleSubject.next(res.newestCandle);
if (!this.isInitializedSubject.value) {
this.realtimeBuffer.push(res.newestCandle);
return;
}
this.realtimeSubject.next(res.newestCandle);
}
public saveRealtimeCache(): void {
if (this.realtimeCache.length === 0) return;
const tf = this.getTimeframe();
this.realtimeCache = this.normalizeList(tf, this.realtimeCache);
const left = this.currentDataSubject.value;
const right = this.realtimeCache;
if (left.length === 0 || right.length === 0) {
this.realtimeCache = [];
return;
}
const lastLeft = left[left.length - 1];
const firstRight = right[0];
const next =
lastLeft && firstRight && lastLeft.time === firstRight.time
? left
.slice(0, -1)
.concat(SymbolSource.combineCandles(lastLeft, firstRight, true))
.concat(right.slice(1))
: left.concat(right);
const normalizedNext = this.normalizeList(tf, next);
this.currentDataSubject.next(normalizedNext);
this.newestCandle = normalizedNext[normalizedNext.length - 1] ?? null;
this.realtimeCache = [];
this.lastCandleSubject.next(this.newestCandle);
}
public async loadMoreHistory(): Promise<void> {
await this.ready();
if (this.isEndOfData) return;
if (this.loadingPromise) {
return this.loadingPromise;
}
this.loadSeq += 1;
if (!this.oldestCandle) return;
const tf = ensureDefined(this.getTimeframe());
const task = (async () => {
this.isLoadingSubject.next(true);
try {
this.saveRealtimeCache();
if (!this.oldestCandle) return;
const olderData = await this.getData(tf, this.symbol, this.oldestCandle);
if (olderData === null) {
this.isEndOfData = true;
return;
}
const older = this.normalizeList(tf, olderData);
if (older.length === 0) return;
const current = this.currentDataSubject.value;
const combinedRaw = SymbolSource.mergeHistory(older, current);
const combined = this.normalizeList(tf, combinedRaw);
const nextOldest = combined[0] ?? null;
if (!nextOldest || !this.oldestCandle || nextOldest.time >= this.oldestCandle.time) {
return;
}
this.oldestCandle = nextOldest;
this.newestCandle = combined[combined.length - 1] ?? this.newestCandle;
this.currentDataSubject.next(combined);
this.lastCandleSubject.next(this.newestCandle);
} catch (error) {
console.error('[DataSource] Ошибка при догрузке истории:', error);
} finally {
this.isLoadingSubject.next(false);
}
})();
this.loadingPromise = task;
task.finally(() => {
if (this.loadingPromise === task) {
this.loadingPromise = null;
}
});
await task;
}
public async loadAllHistory(): Promise<void> {
await this.ready();
if (this.isEndOfData) return;
if (this.loadingPromise) {
await this.loadingPromise;
}
const seq = ++this.loadSeq;
const tf = ensureDefined(this.getTimeframe());
const task = (async () => {
this.isLoadingSubject.next(true);
try {
this.saveRealtimeCache();
let current = this.currentDataSubject.value;
let oldest = current[0] ?? null;
if (!oldest) return;
const seenOldestTimes = new Set<number>();
while (oldest) {
if (seq !== this.loadSeq) return;
if (seenOldestTimes.has(oldest.time)) {
break;
}
seenOldestTimes.add(oldest.time);
// eslint-disable-next-line no-await-in-loop
const olderData = await this.getData(tf, this.symbol, oldest);
if (seq !== this.loadSeq) return;
if (olderData === null) {
this.isEndOfData = true;
break;
}
const older = this.normalizeList(tf, olderData);
if (older.length === 0) break;
const combinedRaw = SymbolSource.mergeHistory(older, current);
const combined = this.normalizeList(tf, combinedRaw);
const nextOldest = combined[0] ?? null;
if (!nextOldest || nextOldest.time >= oldest.time) {
break;
}
oldest = nextOldest;
current = combined;
}
this.oldestCandle = current[0] ?? null;
this.newestCandle = current[current.length - 1] ?? null;
this.currentDataSubject.next(current);
this.saveRealtimeCache();
} catch (error) {
console.error('[DataSource] Ошибка при полной загрузке истории:', error);
} finally {
if (seq === this.loadSeq) {
this.isLoadingSubject.next(false);
}
}
})();
this.loadingPromise = task;
task.finally(() => {
if (this.loadingPromise === task) {
this.loadingPromise = null;
}
});
await task;
}
public async loadTill(time: number): Promise<void> {
await this.ready();
while (this.oldestCandle && this.oldestCandle.time >= time) {
const before = this.oldestCandle.time;
// eslint-disable-next-line no-await-in-loop
await this.loadMoreHistory();
if (!this.oldestCandle) break;
if (this.oldestCandle.time === before) break;
}
}
public async reload(tf: Timeframes): Promise<void> {
this.loadSeq += 1;
const seq = this.loadSeq;
this.isInitializedSubject.next(false);
this.isLoadingSubject.next(true);
this.currentDataSubject.next([]);
this.realtimeCache = [];
this.realtimeBuffer = [];
this.oldestCandle = null;
this.newestCandle = null;
this.isEndOfData = false;
this.lastCandleSubject.next(null);
const task = (async () => {
try {
const loaded = (await this.getData(tf, this.symbol)) ?? [];
if (seq !== this.loadSeq) return;
const normalized = this.normalizeList(tf, loaded);
this.oldestCandle = normalized[0] ?? null;
this.newestCandle = normalized[normalized.length - 1] ?? null;
this.currentDataSubject.next(normalized);
this.lastCandleSubject.next(this.newestCandle);
this.isInitializedSubject.next(true);
this.flushRealtimeBuffer();
} catch (error) {
console.error('[DataSource] Ошибка при загрузке данных:', error);
} finally {
if (seq === this.loadSeq) this.isLoadingSubject.next(false);
}
})();
this.loadingPromise = task;
task.finally(() => {
if (this.loadingPromise === task) this.loadingPromise = null;
});
await task;
}
private flushRealtimeBuffer(): void {
if (this.realtimeBuffer.length === 0) return;
const newestTime = this.newestCandle?.time ?? Number.NEGATIVE_INFINITY;
const filtered = this.realtimeBuffer.filter((c) => c.time >= newestTime);
const map = new Map<number, Candle>();
for (const c of filtered) {
map.set(c.time, c);
}
const unique = Array.from(map.values()).sort((a, b) => a.time - b.time);
for (const c of unique) {
this.realtimeSubject.next(c);
}
this.realtimeBuffer = [];
}
private alignCandle(tf: Timeframes, c: Candle): Candle {
const timeMS = c.time > 1e10 ? Math.floor(c.time) : Math.floor(c.time * 1000);
const startMS = getStartTime(tf, timeMS);
return { ...c, time: startMS };
}
private normalizeList(tf: Timeframes, list: Candle[]): Candle[] {
const aligned = list.map((c) => this.alignCandle(tf, c)).sort((a, b) => a.time - b.time);
return normalizeSeriesData(aligned);
}
private static stackCandles(
timeframe: Timeframes,
next: Candle,
newestCandle: Candle | null,
realtimeCache: Candle[],
): { realtimeCache: Candle[]; newestCandle: Candle } {
const { candleWidth, dayjsUnit } = parseTimeframe(timeframe);
const historicalUpdate = newestCandle
? next.time - newestCandle.time < dayjs.duration(candleWidth, dayjsUnit).as('s')
: false;
if (!historicalUpdate) {
const nextNewestCandle = { ...next, time: getStartTime(timeframe, next.time * 1000) };
realtimeCache.push(nextNewestCandle);
return { realtimeCache, newestCandle: nextNewestCandle };
}
const dataToSet = SymbolSource.combineCandles(ensureDefined(newestCandle), next);
if (realtimeCache.length === 0) {
realtimeCache.push(dataToSet);
return { realtimeCache, newestCandle: dataToSet };
}
realtimeCache[realtimeCache.length - 1] = dataToSet;
return { realtimeCache, newestCandle: dataToSet };
}
private static mergeHistory(older: Candle[], current: Candle[]): Candle[] {
if (current.length === 0) return older;
if (older.length === 0) return current;
const lastOlder = older[older.length - 1];
const firstCurrent = current[0];
if (lastOlder && firstCurrent && lastOlder.time === firstCurrent.time) {
const merged = SymbolSource.combineCandles(lastOlder, firstCurrent, true);
return older.slice(0, -1).concat(merged).concat(current.slice(1));
}
return older.concat(current);
}
private static combineCandles(left: Candle, right: Candle, volumeStacked?: boolean): Candle {
return {
time: left.time,
open: left.open,
high: Math.max(left.high, right.high),
low: Math.min(left.low, right.low),
close: right.close,
volume: volumeStacked ? (left.volume ?? 0) + (right.volume ?? 0) : (right.volume ?? 0),
};
}
}
import dayjs from 'dayjs';
import duration from 'dayjs/plugin/duration';
import { UTCTimestamp } from 'lightweight-charts';
import { BehaviorSubject } from 'rxjs';
import { Candle, parseTimeframe, Timeframes, timeframeToSeconds } from '@lib';
import { normalizeSymbol } from '@src/utils';
dayjs.extend(duration);
class DataSourceProvider {
// todo: add cache по `${symbol}${timeframe}`
static instance: DataSourceProvider | null = null;
private currentCandle: BehaviorSubject<Candle | null> = new BehaviorSubject<Candle | null>(null);
private historicalStartDate: dayjs.Dayjs;
private realtimeTimer: ReturnType<typeof setInterval> | null = null;
private realtimeLastBySymbol = new Map<string, Candle>();
constructor() {
this.historicalStartDate = dayjs(Date.now()).subtract(5, 'year');
}
static getInstance(): DataSourceProvider {
if (!this.instance) {
this.instance = new DataSourceProvider();
}
return this.instance;
}
public startRealtime(
getSymbols: () => string[],
getTimeframe: () => Timeframes,
update: (symbol: string, candle: Candle) => void,
periodMs = 500,
): () => void {
if (this.realtimeTimer) clearInterval(this.realtimeTimer);
this.realtimeTimer = setInterval(() => {
const tf = getTimeframe();
const symbols = getSymbols();
for (let i = 0; i < symbols.length; i += 1) {
const symbol = normalizeSymbol(symbols[i]);
if (!symbol) continue;
update(symbol, this.nextRealtimeCandle(tf, symbol, periodMs));
}
}, periodMs);
return () => {
if (this.realtimeTimer) clearInterval(this.realtimeTimer);
this.realtimeTimer = null;
};
}
private nextRealtimeCandle(timeframe: Timeframes, symbol: string, realtimePeriodMs: number): Candle {
const time = getStartTime(timeframe, Date.now()) as UTCTimestamp;
const prev = this.realtimeLastBySymbol.get(symbol);
const sameBar = !!prev && prev.time === time;
const basePrice = sameBar ? prev.open : prev ? prev.close : 100 + Math.random() * 20;
const open = sameBar ? prev.open : basePrice;
const close = basePrice + ((Math.random() - 0.5) * 2) / 10;
const high = Math.max(open, close) + Math.random() / 10;
const low = Math.min(open, close) - Math.random() / 10;
const { candleWidth, dayjsUnit } = parseTimeframe(timeframe);
const candleDuration = dayjs.duration(candleWidth, dayjsUnit);
const volume = Math.max(
1,
Math.floor((Math.random() * 1000000 + 100000) / (candleDuration.asMilliseconds() / realtimePeriodMs)),
);
const next: Candle = sameBar
? {
time,
open: prev.open,
high: Math.max(prev.high, high),
low: Math.min(prev.low, low),
close,
volume,
}
: {
time,
open,
high,
low,
close,
volume,
};
this.realtimeLastBySymbol.set(symbol, next);
return next;
}
/**
* Универсальная функция для генерации исторических свечей.
* @param timeframe - Таймфрейм.
* @param symbol - symbol.
* @param untilCandle - (Опционально) Свеча, ДО которой нужно генерировать данные.
*/
public async generateCandles(timeframe: Timeframes, _symbol: string, untilCandle?: Candle): Promise<Candle[] | null> {
const data: Candle[] = [];
const dataLength = 2000;
const { candleWidth, dayjsUnit } = parseTimeframe(timeframe);
const endSec = untilCandle ? untilCandle.time : (getStartTime(timeframe, Date.now()) as number);
const endTime = dayjs.unix(endSec);
const historyStartUnix = getStartTime(timeframe, this.historicalStartDate.unix() * 1000);
const historyStartTime = dayjs.unix(historyStartUnix);
if (untilCandle && untilCandle.time <= historyStartUnix) {
return null;
}
let startTime = dayjs(
getStartTime(timeframe, endTime.subtract(candleWidth * dataLength, dayjsUnit).unix() * 1000) * 1000,
);
if (startTime.isBefore(historyStartTime)) {
startTime = historyStartTime;
}
if (!startTime.isBefore(endTime)) {
return [];
}
let basePrice = untilCandle?.open ?? 100 + Math.random() * 20;
let currentTime = startTime.clone();
const limit = untilCandle ? endTime : endTime.add(candleWidth, dayjsUnit);
while (currentTime.isBefore(limit)) {
const time = currentTime.unix();
const open = basePrice;
const close = basePrice + (Math.random() - 0.5) * 2;
const high = Math.max(open, close) + Math.random();
const low = Math.min(open, close) - Math.random();
const volume = Math.floor(Math.random() * 1000000) + 100000;
data.push({
time,
open,
high,
low,
close,
volume,
});
basePrice = close;
currentTime = currentTime.add(candleWidth, dayjsUnit);
}
await delay(300);
if (this.currentCandle.value === null && data.length > 0) {
this.currentCandle.next(data[data.length - 1]);
}
if(!untilCandle){
this.realtimeLastBySymbol.set(_symbol, data[data.length - 1])
}
return data;
}
}
function delay(ms: number): Promise<void> {
return new Promise((resolve) => {
setTimeout(resolve, ms);
});
}
// todo: import from @lib, then this commit will be published into moex-chart
function getStartTime(timeframe: Timeframes, date: number) {
const { startOfUnit } = parseTimeframe(timeframe, date);
const sec = timeframeToSeconds(timeframe);
return Math.floor(startOfUnit.unix() / sec) * sec;
}
export const dataSourceProvider = DataSourceProvider.getInstance();