class documentation

class RetrievalJob(ABC): (source)

Known subclasses: aligned.local.job.FileDateJob, aligned.local.job.FileFactualJob, aligned.local.job.FileFullJob, aligned.local.job.LiteralRetrievalJob, aligned.psql.jobs.FactPsqlJob, aligned.psql.jobs.PostgreSqlJob, aligned.redis.job.FactualRedisJob, aligned.redshift.jobs.FactRedshiftJob, aligned.redshift.sql_job.RedshiftSqlJob, aligned.retrieval_job.AggregateJob, aligned.retrieval_job.CombineFactualJob, aligned.retrieval_job.CustomLazyPolarsJob, aligned.retrieval_job.CustomPolarsJob, aligned.retrieval_job.DerivedFeatureJob, aligned.retrieval_job.DropDuplicateEntities, aligned.retrieval_job.DropInvalidJob, aligned.retrieval_job.EncodeDatesJob, aligned.retrieval_job.EnsureTypesJob, aligned.retrieval_job.FileCachedJob, aligned.retrieval_job.FillMissingColumnsJob, aligned.retrieval_job.FilteredJob, aligned.retrieval_job.InMemoryCacheJob, aligned.retrieval_job.InMemorySplitCacheJob, aligned.retrieval_job.JoinAsofJob, aligned.retrieval_job.JoinJobs, aligned.retrieval_job.ListenForTriggers, aligned.retrieval_job.LiteralDictJob, aligned.retrieval_job.LoadedAtJob, aligned.retrieval_job.LogJob, aligned.retrieval_job.OnLoadJob, aligned.retrieval_job.PredictionJob, aligned.retrieval_job.RawFileCachedJob, aligned.retrieval_job.RenameJob, aligned.retrieval_job.ReturnInvalidJob, aligned.retrieval_job.SelectColumnsJob, aligned.retrieval_job.SqlJob, aligned.retrieval_job.StackJob, aligned.retrieval_job.StreamAggregationJob, aligned.retrieval_job.SubsetJob, aligned.retrieval_job.TimeMetricLoggerJob, aligned.retrieval_job.UniqueRowsJob, aligned.retrieval_job.UnpackEmbedding, aligned.retrieval_job.UpdateVectorIndexJob, aligned.retrieval_job.ValidateEntitiesJob, aligned.retrieval_job.WithRequests, aligned.sources.databricks.UnityCatalogTableAllJob

Constructors: RetrievalJob.from_convertable(data, request), RetrievalJob.from_dict(data, request), RetrievalJob.from_lazy_function(callable, request), RetrievalJob.from_polars_df(df, request)


Undocumented

Static Method from_convertable Undocumented
Static Method from_dict Undocumented
Static Method from_lazy_function Undocumented
Static Method from_polars_df Undocumented
Method aggregate Undocumented
Method cache_raw_data Undocumented
Method cached_at Undocumented
Method chunked Undocumented
Method combined_features Undocumented
Method derive_features Undocumented
Method describe Undocumented
Method drop_duplicate_entities Undocumented
Method drop_invalid Drops invalid rows based on the defined features.
Method ensure_types Undocumented
Method fill_missing_columns Undocumented
Method filter Filters rows based on a condition referencing either a feature, a feature name, or a Polars expression.
Method ignore_event_timestamp Undocumented
Method inject_store Undocumented
Method join Undocumented
Method join_asof Undocumented
Method listen_to_events Undocumented
Method log_each_job Undocumented
Method monitor_time_used Undocumented
Method on_load Undocumented
Method polars_method Undocumented
Method remove_derived_features Undocumented
Method rename Undocumented
Method return_invalid Undocumented
Method select Undocumented
Method select_columns Undocumented
Method split Undocumented
Async Method to_lazy_polars Undocumented
Async Method to_pandas Undocumented
Async Method to_polars Undocumented
Method train_test Undocumented
Method train_test_validate Undocumented
Method transform_polars Undocumented
Method unique_entities Undocumented
Method unique_on Undocumented
Method unpack_embeddings Undocumented
Method update_vector_index Undocumented
Method validate_entites Undocumented
Method with_request Undocumented
Method with_subfeatures Undocumented
Method without_derived_features Undocumented
Async Method write_to_source Writes the output of the retrieval job to the passed source.
Property loaded_columns Undocumented
Property request_result Undocumented
Property retrieval_requests Undocumented
@staticmethod
def from_convertable(data: ConvertableToRetrievalJob, request: list[RetrievalRequest] | RetrievalRequest | FeatureRequest) -> RetrievalJob: (source)

Undocumented

@staticmethod
def from_dict(data: dict[str, list], request: list[RetrievalRequest] | RetrievalRequest) -> RetrievalJob: (source)

Undocumented
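
A hedged usage sketch based only on the signature above; the `request` object and the column data are illustrative assumptions, not part of the documented API:

```python
# Hypothetical illustration: build an in-memory job from column-oriented data.
# `request` is an assumed, pre-built RetrievalRequest describing the columns.
job = RetrievalJob.from_dict(
    {"wine_id": [1, 2], "quality": [4, 8]},
    request,
)
df = await job.to_polars()  # inside an async context
```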

@staticmethod
def from_lazy_function(callable: Callable[[], Coroutine[None, None, pl.LazyFrame]], request: RetrievalRequest) -> RetrievalJob: (source)

Undocumented

@staticmethod
def from_polars_df(df: pl.DataFrame | pl.LazyFrame, request: list[RetrievalRequest]) -> RetrievalJob: (source)

Undocumented

def aggregate(self, request: RetrievalRequest) -> RetrievalJob: (source)

Undocumented

def cache_raw_data(self, location: DataFileReference | str) -> RetrievalJob: (source)

Undocumented

def chunked(self, size: int) -> DataLoaderJob: (source)

Undocumented

def combined_features(self, requests: list[RetrievalRequest] | None = None) -> RetrievalJob: (source)

Undocumented

def derive_features(self, requests: list[RetrievalRequest] | None = None) -> RetrievalJob: (source)

Undocumented

def drop_duplicate_entities(self) -> RetrievalJob: (source)

Undocumented

def drop_invalid(self, validator: Validator | None = None) -> RetrievalJob: (source)

Drops invalid rows based on the defined features.

```python
@feature_view(...)
class WhiteWine:
    wine_id = UInt64().as_entity()
    quality = Int32().lower_bound(1).upper_bound(10)

valid_wines = WhiteWine.drop_invalid({
    "wine_id": [0, 1, 2, 3, 4],
    "quality": [None, 4, 8, 20, -10]
})

print(valid_wines)
>>> {
    "wine_id": [1, 2],
    "quality": [4, 8]
}
```

Args:
    validator (Validator): A validator that can validate the data.
        The default uses the PolarsValidator.

Returns:
    RetrievalJob: A new retrieval job with only valid rows.

def ensure_types(self, requests: list[RetrievalRequest] | None = None, date_formatter: DateFormatter | None = None) -> RetrievalJob: (source)

Undocumented

def fill_missing_columns(self) -> RetrievalJob: (source)

Undocumented

def filter(self, condition: str | Feature | DerivedFeature | pl.Expr) -> RetrievalJob: (source)

Filters rows based on a condition referencing either a feature, a feature name, or a Polars expression.
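
A hedged sketch of the Polars-expression form, assuming an already-constructed job that exposes a "quality" feature:

```python
import polars as pl

# Hypothetical illustration: keep rows where the assumed "quality" feature
# exceeds 5. A feature name string or a Feature object would also be accepted.
filtered = job.filter(pl.col("quality") > 5)
```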

def inject_store(self, store: ContractStore) -> RetrievalJob: (source)

Undocumented

def join(self, job: RetrievalJob, method: Literal['inner', 'left', 'outer'], left_on: str | list[str], right_on: str | list[str]) -> RetrievalJob: (source)

Undocumented

def join_asof(self, job: RetrievalJob, left_event_timestamp: str | None = None, right_event_timestamp: str | None = None, left_on: str | list[str] | None = None, right_on: str | list[str] | None = None, timestamp_unit: TimeUnit = 'us') -> RetrievalJob: (source)

Undocumented
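
A hedged sketch based only on the signature; the two jobs and the column names are illustrative assumptions:

```python
# Hypothetical illustration: for each left row, match the most recent right
# row by event timestamp. `orders_job`, `prices_job`, and the column names
# are assumptions, not part of the library.
joined = orders_job.join_asof(
    prices_job,
    left_event_timestamp="ordered_at",
    right_event_timestamp="priced_at",
    left_on="product_id",
    right_on="product_id",
)
```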

def listen_to_events(self, events: set[EventTrigger]) -> RetrievalJob: (source)

Undocumented

def monitor_time_used(self, callback: Callable[[float], None]) -> RetrievalJob: (source)

Undocumented

def on_load(self, on_load: Callable[[], Coroutine[Any, Any, None]]) -> RetrievalJob: (source)

Undocumented

def polars_method(self, polars_method: Callable[[pl.LazyFrame], pl.LazyFrame]) -> RetrievalJob: (source)

Undocumented

def rename(self, mappings: dict[str, str]) -> RetrievalJob: (source)

Undocumented

def return_invalid(self, should_return_validation: bool | None = None) -> RetrievalJob: (source)

Undocumented

def select(self, include_features: Collection[str]) -> RetrievalJob: (source)

Undocumented
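
A hedged sketch, assuming an already-constructed job with these features:

```python
# Hypothetical illustration: keep only the listed feature columns from an
# assumed, already-constructed `job`.
subset = job.select(["wine_id", "quality"])
```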

def select_columns(self, include_features: Collection[str]) -> RetrievalJob: (source)

Undocumented

def split(self, splitter: Callable[[pl.DataFrame], tuple[pl.DataFrame, pl.DataFrame]], dataset_sizes: tuple[float, float]) -> tuple[RetrievalJob, RetrievalJob]: (source)

Undocumented

@abstractmethod
async def to_lazy_polars(self) -> pl.LazyFrame: (source)
overridden in aligned.local.job.FileDateJob, aligned.local.job.FileFactualJob, aligned.local.job.FileFullJob, aligned.local.job.LiteralRetrievalJob, aligned.psql.jobs.FactPsqlJob, aligned.psql.jobs.PostgreSqlJob, aligned.redis.job.FactualRedisJob, aligned.redshift.jobs.FactRedshiftJob, aligned.redshift.sql_job.RedshiftSqlJob, aligned.retrieval_job.AggregateJob, aligned.retrieval_job.CombineFactualJob, aligned.retrieval_job.CustomLazyPolarsJob, aligned.retrieval_job.CustomPolarsJob, aligned.retrieval_job.DerivedFeatureJob, aligned.retrieval_job.DropDuplicateEntities, aligned.retrieval_job.DropInvalidJob, aligned.retrieval_job.EncodeDatesJob, aligned.retrieval_job.EnsureTypesJob, aligned.retrieval_job.FileCachedJob, aligned.retrieval_job.FillMissingColumnsJob, aligned.retrieval_job.FilteredJob, aligned.retrieval_job.InMemoryCacheJob, aligned.retrieval_job.InMemorySplitCacheJob, aligned.retrieval_job.JoinAsofJob, aligned.retrieval_job.JoinJobs, aligned.retrieval_job.ListenForTriggers, aligned.retrieval_job.LiteralDictJob, aligned.retrieval_job.LoadedAtJob, aligned.retrieval_job.LogJob, aligned.retrieval_job.OnLoadJob, aligned.retrieval_job.PredictionJob, aligned.retrieval_job.RawFileCachedJob, aligned.retrieval_job.RenameJob, aligned.retrieval_job.ReturnInvalidJob, aligned.retrieval_job.SelectColumnsJob, aligned.retrieval_job.StackJob, aligned.retrieval_job.StreamAggregationJob, aligned.retrieval_job.SubsetJob, aligned.retrieval_job.TimeMetricLoggerJob, aligned.retrieval_job.UniqueRowsJob, aligned.retrieval_job.UnpackEmbedding, aligned.retrieval_job.UpdateVectorIndexJob, aligned.retrieval_job.ValidateEntitiesJob, aligned.retrieval_job.WithRequests, aligned.sources.databricks.UnityCatalogTableAllJob

Undocumented

@abstractmethod
async def to_pandas(self) -> pd.DataFrame: (source)
overridden in aligned.local.job.FileDateJob, aligned.local.job.FileFactualJob, aligned.local.job.FileFullJob, aligned.local.job.LiteralRetrievalJob, aligned.psql.jobs.FactPsqlJob, aligned.psql.jobs.PostgreSqlJob, aligned.redis.job.FactualRedisJob, aligned.redshift.jobs.FactRedshiftJob, aligned.redshift.sql_job.RedshiftSqlJob, aligned.retrieval_job.AggregateJob, aligned.retrieval_job.CombineFactualJob, aligned.retrieval_job.CustomLazyPolarsJob, aligned.retrieval_job.CustomPolarsJob, aligned.retrieval_job.DerivedFeatureJob, aligned.retrieval_job.DropDuplicateEntities, aligned.retrieval_job.DropInvalidJob, aligned.retrieval_job.EncodeDatesJob, aligned.retrieval_job.EnsureTypesJob, aligned.retrieval_job.FileCachedJob, aligned.retrieval_job.FillMissingColumnsJob, aligned.retrieval_job.FilteredJob, aligned.retrieval_job.InMemoryCacheJob, aligned.retrieval_job.InMemorySplitCacheJob, aligned.retrieval_job.JoinAsofJob, aligned.retrieval_job.JoinJobs, aligned.retrieval_job.ListenForTriggers, aligned.retrieval_job.LiteralDictJob, aligned.retrieval_job.LoadedAtJob, aligned.retrieval_job.LogJob, aligned.retrieval_job.OnLoadJob, aligned.retrieval_job.PredictionJob, aligned.retrieval_job.RawFileCachedJob, aligned.retrieval_job.RenameJob, aligned.retrieval_job.ReturnInvalidJob, aligned.retrieval_job.SelectColumnsJob, aligned.retrieval_job.StackJob, aligned.retrieval_job.StreamAggregationJob, aligned.retrieval_job.SubsetJob, aligned.retrieval_job.TimeMetricLoggerJob, aligned.retrieval_job.UniqueRowsJob, aligned.retrieval_job.UnpackEmbedding, aligned.retrieval_job.UpdateVectorIndexJob, aligned.retrieval_job.ValidateEntitiesJob, aligned.retrieval_job.WithRequests, aligned.sources.databricks.UnityCatalogTableAllJob

Undocumented

async def to_polars(self) -> pl.DataFrame: (source)

Undocumented

def train_test(self, train_size: float, target_column: str, splitter_factory: Callable[[SplitConfig], SplitterCallable] | None = None) -> TrainTestJob: (source)

Undocumented

def train_test_validate(self, train_size: float, validate_size: float, target_column: str, splitter_factory: Callable[[SplitConfig], SplitterCallable] | None = None, should_filter_out_null_targets: bool = True) -> TrainTestValidateJob: (source)

Undocumented
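
A hedged sketch based only on the signature; the target column and split sizes are illustrative assumptions:

```python
# Hypothetical illustration: reserve 60 % of the rows for training and 20 %
# for validation; the remainder presumably forms the test set. `job` is an
# assumed, already-constructed RetrievalJob.
datasets = job.train_test_validate(
    train_size=0.6,
    validate_size=0.2,
    target_column="quality",
)
```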

def transform_polars(self, polars_method: CustomPolarsTransform) -> RetrievalJob: (source)

Undocumented

def unique_entities(self) -> RetrievalJob: (source)

Undocumented

def unique_on(self, unique_on: list[str], sort_key: str | None = None, descending: bool = True) -> RetrievalJob: (source)

Undocumented
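
A hedged sketch, assuming entity and sort-key columns that may not exist in any real view:

```python
# Hypothetical illustration: keep one row per "wine_id", preferring the row
# with the highest value of the assumed "updated_at" sort key (descending
# order is the default).
deduplicated = job.unique_on(["wine_id"], sort_key="updated_at")
```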

def unpack_embeddings(self) -> RetrievalJob: (source)

Undocumented

def update_vector_index(self, indexes: list[VectorIndex]) -> RetrievalJob: (source)

Undocumented

def validate_entites(self) -> RetrievalJob: (source)

Undocumented

def with_request(self, requests: list[RetrievalRequest]) -> RetrievalJob: (source)

Undocumented

def without_derived_features(self) -> RetrievalJob: (source)

Undocumented

async def write_to_source(self, source: WritableFeatureSource | DataFileReference): (source)

Writes the output of the retrieval job to the passed source.

```python
redis_cluster = RedisConfig.localhost()

store = FeatureStore.from_dir(".")

await (store.model("taxi")
    .all_predictions()  # Reads predictions from a prediction_source
    .write_to_source(redis_cluster)
)
```

Args:
    source (WritableFeatureSource): A source that we can write to.