PHANTOM/experiments/procesing/context.py
Daniel Alves Rösel ad9423bf59 Airflow addition (#28)
* introducing airflow to run pipeline

* chore: updating dag with upload to registry

* introducing complete provider (non refactored and noisy)

* chore: removing old shit

* generic pricing baselines

* feature: super simple model registry (to be updated maybe third party OS software)

* chore: refactoring the providers docker config and requirements

* chore: refactored and broke down components (breaking)

* exporting all

* local pipeline execution working

* fix: fixing import structures from nonrelativistic

* chore: enables cross comm pickling with fully e2e pipeline compilation

* docs: what the pipeline is like now

* pipelines local running and pipeline high level definition

* cleaning old pipeline and vectorization

* leaked but fixing, not so important

* test: started with pipeline step testing

* chore: cleaning up provider of prices

* test: extra tests with semantic meaning checks

* migrating pricers

* feature: introducing pricing predictors (pricers)

* chore: e2e is done with new pipeline

* extra session feature extraction

* feature: experimental session pricer and metrics (vibe)

* chore: redefined and connected pricers (#29)
2025-11-29 17:50:16 +01:00


```python
from typing import Any, Dict

import pandas as pd

from procesing.providers.base import DataProvider


class PipelineContext:
    """
    Context for pipeline execution holding config, provider, and cached data.
    Enables dependency injection and eliminates global state.
    """

    def __init__(self,
                 provider: DataProvider,
                 store_mode: str,
                 window_size: str = '30s',
                 **config: Any):
        self.provider = provider
        self.store_mode = store_mode
        self.window_size = window_size
        # Extra keyword arguments are kept as free-form pipeline config
        self.config = config
        # Per-run cache shared by every pipeline step that receives this context
        self._cache: Dict[str, Any] = {}

    def get_cached(self, key: str, default: Any = None) -> Any:
        return self._cache.get(key, default)

    def cache(self, key: str, value: Any) -> Any:
        # Return the stored value so callers can cache and return in one step
        self._cache[key] = value
        return value

    @property
    def products(self) -> pd.DataFrame:
        """Lazy-load and cache the product catalog: a single fetch per pipeline run."""
        if 'products' not in self._cache:
            self._cache['products'] = self.provider.fetch_products(self.store_mode)
        return self._cache['products']
```
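To illustrate the lazy, single-fetch behaviour promised by the `products` docstring, here is a minimal usage sketch. It makes several assumptions: `CountingProvider` is a hypothetical stand-in that only implements `fetch_products(store_mode)` (the full `DataProvider` interface is not shown in this file), it returns plain dicts rather than a `pd.DataFrame` so the sketch runs without pandas, and it carries a condensed copy of `PipelineContext` so it runs standalone. The `"online"` store mode, the `min_margin` config key, and the `hypothetical_price_index` step are invented for illustration only.

```python
from typing import Any, Dict, List


class CountingProvider:
    """Hypothetical stand-in for a DataProvider that counts how often it is hit."""

    def __init__(self) -> None:
        self.calls = 0

    def fetch_products(self, store_mode: str) -> List[Dict[str, Any]]:
        self.calls += 1
        # A real provider would return a pandas DataFrame; plain dicts keep this dependency-free.
        return [{"sku": "A1", "price": 9.99, "store_mode": store_mode}]


class PipelineContext:
    """Condensed copy of the class above, without the DataProvider import."""

    def __init__(self, provider: Any, store_mode: str,
                 window_size: str = "30s", **config: Any) -> None:
        self.provider = provider
        self.store_mode = store_mode
        self.window_size = window_size
        self.config = config
        self._cache: Dict[str, Any] = {}

    def get_cached(self, key: str, default: Any = None) -> Any:
        return self._cache.get(key, default)

    def cache(self, key: str, value: Any) -> Any:
        self._cache[key] = value
        return value

    @property
    def products(self) -> Any:
        # Fetch only on first access; later accesses read the cached catalog
        if "products" not in self._cache:
            self._cache["products"] = self.provider.fetch_products(self.store_mode)
        return self._cache["products"]


def hypothetical_price_index(ctx: PipelineContext) -> Dict[str, float]:
    """Hypothetical pipeline step: build a SKU -> price map once, then reuse it."""
    cached = ctx.get_cached("price_index")
    if cached is not None:
        return cached
    index = {row["sku"]: row["price"] for row in ctx.products}
    # cache() returns the value, so storing and returning happen together
    return ctx.cache("price_index", index)


provider = CountingProvider()
ctx = PipelineContext(provider, store_mode="online", min_margin=0.1)

first = hypothetical_price_index(ctx)
second = hypothetical_price_index(ctx)
_ = ctx.products  # served from the cache, no second fetch

print(provider.calls)   # 1
print(first is second)  # True
print(ctx.config)       # {'min_margin': 0.1}
```

Because the context is passed explicitly to each step, every step in a run shares one cache and one catalog fetch, while a fresh `PipelineContext` per run gives a clean slate without any module-level globals.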