class documentation

class ModelFeatureStore: (source)

Undocumented

Method all_predictions Undocumented
Method cached_at Loads the model features from a precomputed location
Method depends_on Returns the views and models that the model depends on to compute its output.
Method feature_references_for Undocumented
Method features_for Returns the features the model has produced.
Method filter_predictions Undocumented
Async Method freshness Undocumented
Method has_exposed_model Undocumented
Method has_one_source_for_input_features Whether the input features all come from the same source.
Method input_features The input features to load, based on the input features in a model.
Method input_features_for Returns the features to pass into a model
Async Method input_freshness Undocumented
Method input_request Undocumented
Async Method insert Writes data to a source defined as an output source
Async Method insert_predictions Writes data to a source defined as an output source
Method needed_entities Undocumented
Async Method overwrite Overwrites data in a source defined as an output source
Method predict_over Undocumented
Async Method prediction_freshness Undocumented
Method prediction_request Undocumented
Method predictions_between Undocumented
Method predictions_for Undocumented
Method process_features Undocumented
Method raw_string_features Undocumented
Method request Undocumented
Async Method upsert Upserts data to a source defined as an output source
Async Method upsert_predictions Upserts data to a source defined as an output source
Method using_model Makes it possible to define which model to use for predictions.
Method using_source Undocumented
Method using_version Undocumented
Method with_labels Also loads the labels for the model
Class Variable model Undocumented
Class Variable model_to_use Undocumented
Class Variable selected_version Undocumented
Class Variable store Undocumented
Property dataset_store Undocumented
Property location Undocumented
Property source Undocumented
def all_predictions(self, limit: int | None = None) -> RetrievalJob: (source)

Undocumented

def cached_at(self, location: DataFileReference) -> RetrievalJob: (source)

Loads the model features from a precomputed location

```python
from aligned import FileSource

store = await FileSource.json_at("features-latest.json").feature_store()

cached_features = FileSource.parquet_at("titanic_features.parquet")

df = (
    store.model("titanic")
    .cached_at(cached_features)
    .to_polars()
)

print(df.collect())
>>> ┌──────────────┬───────┬─────────┬─────────────────────┬──────────────┐
>>> │ passenger_id ┆ is_mr ┆ is_male ┆ constant_filled_age ┆ has_siblings │
>>> │ ---          ┆ ---   ┆ ---     ┆ ---                 ┆ ---          │
>>> │ i32          ┆ bool  ┆ bool    ┆ f64                 ┆ bool         │
>>> ╞══════════════╪═══════╪═════════╪═════════════════════╪══════════════╡
>>> │ 1            ┆ true  ┆ true    ┆ 22.0                ┆ true         │
>>> │ 2            ┆ false ┆ false   ┆ 38.0                ┆ true         │
>>> │ 3            ┆ false ┆ false   ┆ 26.0                ┆ false        │
>>> └──────────────┴───────┴─────────┴─────────────────────┴──────────────┘
```

Args:
location (DataFileReference): The location of the precomputed features
Returns:
RetrievalJob: A retrieval job that loads the features from that location
def depends_on(self) -> set[FeatureLocation]: (source)

Returns the views and models that the model depends on to compute its output.

Examples:

```python
@data_contract(name="passenger", ...)
class Passenger:
    passenger_id = Int32().as_entity()
    age = Float()

@data_contract(name="location", ...)
class Location:
    location_id = String().as_entity()
    location_area = Float()

@model_contract(name="some_model", ...)
class SomeModel:
    some_id = String().as_entity()
    some_computed_metric = Int32()

@model_contract(
    name="new_model",
    features=[
        Passenger().age,
        Location().location_area,
        SomeModel().some_computed_metric
    ]
)
class NewModel:
    ...

print(store.model("new_model").depends_on())
>>> {
>>>     FeatureLocation(location="feature_view", name="passenger"),
>>>     FeatureLocation(location="feature_view", name="location"),
>>>     FeatureLocation(location="model", name="some_model")
>>> }
```
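As a rough illustration of the idea, the dependency set can be read as the distinct locations behind a model's input features. The sketch below is not the library's implementation: `Location`, `Reference` and the standalone `depends_on` function are hypothetical stand-ins, used only so the example runs without the library.

```python
from dataclasses import dataclass


# Hypothetical stand-ins for illustration; not the library's own classes.
@dataclass(frozen=True)
class Location:
    location: str  # "feature_view" or "model"
    name: str


@dataclass(frozen=True)
class Reference:
    name: str
    location: Location


def depends_on(features: list[Reference]) -> set[Location]:
    """Collect the distinct locations that the given features come from."""
    return {feature.location for feature in features}


features = [
    Reference("age", Location("feature_view", "passenger")),
    Reference("location_area", Location("feature_view", "location")),
    Reference("some_computed_metric", Location("model", "some_model")),
]
dependencies = depends_on(features)
print(sorted(f"{loc.location}:{loc.name}" for loc in dependencies))
```

Because the result is a set, two features read from the same view contribute a single entry.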

def feature_references_for(self, version: str) -> list[FeatureReference]: (source)

Undocumented

def features_for(self, entities: ConvertableToRetrievalJob | RetrievalJob, event_timestamp_column: str | None = None, model_version_as_entity: bool | None = None) -> RetrievalJob: (source)

Returns the features the model has produced.

```python
store = await FileSource.json_at("contracts.json").feature_store()

df = (
    store.model("titanic")
    .features_for({"passenger_id": [1, 2, 3]})
    .to_polars()
)

print(df.collect())
>>> ┌──────────────┬──────────────┐
>>> │ passenger_id ┆ will_survive │
>>> │ ---          ┆ ---          │
>>> │ i32          ┆ bool         │
>>> ╞══════════════╪══════════════╡
>>> │ 1            ┆ true         │
>>> │ 2            ┆ true         │
>>> │ 3            ┆ false        │
>>> └──────────────┴──────────────┘
```

Args:
entities (dict[str, list] | RetrievalJob): The entities to fetch features for
Returns:
RetrievalJob: A retrieval job that can be used to fetch the features
def filter_predictions(self, filter: FilterRepresentable) -> RetrievalJob: (source)

Undocumented

async def freshness(self) -> dict[FeatureLocation, datetime | None]: (source)

Undocumented

def has_exposed_model(self) -> bool: (source)

Undocumented

def has_one_source_for_input_features(self) -> bool: (source)

Whether the input features all come from the same source.

This is useful for predicting over the input automatically, for example over all data in the source.
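A minimal sketch of such a check, assuming each input feature can be reduced to the name of the source it is read from. The function name and the plain string representation are hypothetical; the library may compare sources differently.

```python
def has_one_source(feature_sources: list[str]) -> bool:
    """Return True when every input feature is read from the same source.

    An empty list returns False, since there is no source to predict over.
    """
    return len(set(feature_sources)) == 1


# All input features come from the "titanic" source.
same_source = has_one_source(["titanic", "titanic", "titanic"])

# Input features are spread over two sources.
mixed_sources = has_one_source(["passenger", "location"])

print(same_source, mixed_sources)
```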

def input_features(self) -> list[FeatureReference]: (source)

The input features to load, based on the input features in a model.

def input_features_for(self, entities: ConvertableToRetrievalJob | RetrievalJob, event_timestamp_column: str | None = None) -> RetrievalJob: (source)

Returns the features to pass into a model

```python
store = await FileSource.json_at("contracts.json").feature_store()

df = (
    store.model("titanic")
    .input_features_for({"passenger_id": [1, 2, 3]})
    .to_polars()
)

print(df.collect())
>>> ┌──────────────┬───────┬─────────┬─────────────────────┬──────────────┐
>>> │ passenger_id ┆ is_mr ┆ is_male ┆ constant_filled_age ┆ has_siblings │
>>> │ ---          ┆ ---   ┆ ---     ┆ ---                 ┆ ---          │
>>> │ i32          ┆ bool  ┆ bool    ┆ f64                 ┆ bool         │
>>> ╞══════════════╪═══════╪═════════╪═════════════════════╪══════════════╡
>>> │ 1            ┆ true  ┆ true    ┆ 22.0                ┆ true         │
>>> │ 2            ┆ false ┆ false   ┆ 38.0                ┆ true         │
>>> │ 3            ┆ false ┆ false   ┆ 26.0                ┆ false        │
>>> └──────────────┴───────┴─────────┴─────────────────────┴──────────────┘
```

Args:
entities (dict[str, list] | RetrievalJob): The entities to fetch features for
Returns:
RetrievalJob: A retrieval job that can be used to fetch the features
async def input_freshness(self) -> dict[FeatureLocation, datetime | None]: (source)

Undocumented

def input_request(self, except_features: set[str] | None = None, event_timestamp_column: str | None = None) -> FeatureRequest: (source)

Undocumented

async def insert(self, output: ConvertableToRetrievalJob | RetrievalJob): (source)

Writes data to a source defined as an output source

```python
@model_contract(
    name="taxi_eta",
    features=[...],
    output_source=FileSource.parquet_at("predictions.parquet")
)
class TaxiEta:
    trip_id = Int32().as_entity()
    duration = Int32()
    ...

store = FeatureStore.from_dir(".")

await store.model("taxi_eta").insert({
    "trip_id": [1, 2, 3, ...],
    "duration": [20, 33, 42, ...]
})
```

async def insert_predictions(self, predictions: ConvertableToRetrievalJob | RetrievalJob): (source)

Writes data to a source defined as an output source

```python
@model_contract(
    name="taxi_eta",
    features=[...],
    output_source=FileSource.parquet_at("predictions.parquet")
)
class TaxiEta:
    trip_id = Int32().as_entity()
    duration = Int32()
    ...

store = FeatureStore.from_dir(".")

await store.model("taxi_eta").insert_predictions({
    "trip_id": [1, 2, 3, ...],
    "duration": [20, 33, 42, ...]
})
```

def needed_entities(self) -> set[Feature]: (source)

Undocumented

async def overwrite(self, output: ConvertableToRetrievalJob | RetrievalJob): (source)

Overwrites data in a source defined as an output source

```python
@model_contract(
    name="taxi_eta",
    features=[...],
    output_source=FileSource.parquet_at("predictions.parquet")
)
class TaxiEta:
    trip_id = Int32().as_entity()
    duration = Int32()
    ...

store = FeatureStore.from_dir(".")

await store.model("taxi_eta").overwrite({
    "trip_id": [1, 2, 3, ...],
    "duration": [20, 33, 42, ...]
})
```

def predict_over(self, entities: ConvertableToRetrievalJob | RetrievalJob) -> PredictionJob: (source)

Undocumented

async def prediction_freshness(self) -> datetime | None: (source)

Undocumented

def prediction_request(self, exclude_features: set[str] | None = None, model_version_as_entity: bool = False) -> RetrievalRequest: (source)

Undocumented

def predictions_between(self, start_date: datetime, end_date: datetime) -> RetrievalJob: (source)

Undocumented

def predictions_for(self, entities: ConvertableToRetrievalJob | RetrievalJob, event_timestamp_column: str | None = None, model_version_as_entity: bool | None = None) -> RetrievalJob: (source)

Undocumented

def process_features(self, input: RetrievalJob | ConvertableToRetrievalJob) -> RetrievalJob: (source)

Undocumented

def raw_string_features(self, except_features: set[str]) -> set[str]: (source)

Undocumented

def request(self, except_features: set[str] | None = None, event_timestamp_column: str | None = None) -> FeatureRequest: (source)

Undocumented

async def upsert(self, output: ConvertableToRetrievalJob | RetrievalJob): (source)

Upserts data to a source defined as an output source

```python
@model_contract(
    name="taxi_eta",
    features=[...],
    output_source=FileSource.parquet_at("predictions.parquet")
)
class TaxiEta:
    trip_id = Int32().as_entity()
    duration = Int32()
    ...

store = FeatureStore.from_dir(".")

await store.model("taxi_eta").upsert({
    "trip_id": [1, 2, 3, ...],
    "duration": [20, 33, 42, ...]
})
```

async def upsert_predictions(self, predictions: ConvertableToRetrievalJob | RetrievalJob): (source)

Upserts data to a source defined as an output source

```python
@model_contract(
    name="taxi_eta",
    features=[...],
    output_source=FileSource.parquet_at("predictions.parquet")
)
class TaxiEta:
    trip_id = Int32().as_entity()
    duration = Int32()
    ...

store = FeatureStore.from_dir(".")

await store.model("taxi_eta").upsert_predictions({
    "trip_id": [1, 2, 3, ...],
    "duration": [20, 33, 42, ...]
})
```
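The three write modes on this class (insert, upsert, overwrite) differ in how they treat rows already stored. The sketch below illustrates the usual meaning of these terms with plain in-memory lists. It is an assumption about the semantics, not the library's implementation, and the helper functions are hypothetical; actual behavior depends on the configured output source.

```python
Row = dict[str, int]


def insert(existing: list[Row], new: list[Row]) -> list[Row]:
    """Append the new rows without touching the existing ones."""
    return existing + new


def upsert(existing: list[Row], new: list[Row], key: str) -> list[Row]:
    """Replace rows whose key matches a new row, and append the rest."""
    merged = {row[key]: row for row in existing}
    merged.update({row[key]: row for row in new})
    return list(merged.values())


def overwrite(existing: list[Row], new: list[Row]) -> list[Row]:
    """Discard everything stored and keep only the new rows."""
    return list(new)


stored = [{"trip_id": 1, "duration": 20}, {"trip_id": 2, "duration": 33}]
incoming = [{"trip_id": 2, "duration": 35}, {"trip_id": 3, "duration": 42}]

inserted = insert(stored, incoming)
upserted = upsert(stored, incoming, key="trip_id")
overwritten = overwrite(stored, incoming)

print(len(inserted), len(upserted), len(overwritten))
```

In this sketch the entity column (`trip_id`) plays the role of the upsert key, so trip 2 ends up with a single, updated row after the upsert, while the insert keeps both versions.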

def using_model(self, model_to_use: ExposedModel) -> ModelFeatureStore: (source)

Makes it possible to define which model to use for predictions.

def using_source(self, source: BatchDataSource) -> ModelFeatureStore: (source)

Undocumented

def using_version(self, version: str) -> ModelFeatureStore: (source)

Undocumented

def with_labels(self, label_refs: set[FeatureReference] | None = None) -> SupervisedModelFeatureStore: (source)

Also loads the labels for the model

```python
store = await FileSource.json_at("features-latest.json").feature_store()

data = (
    store.model("titanic")
    .with_labels()
    .features_for({"passenger_id": [1, 2, 3]})
    .to_polars()
)

print(data.labels.collect(), data.input.collect())
>>> ┌──────────┐ ┌───────┬─────────┬─────────────────────┬──────────────┐
>>> │ survived │ │ is_mr ┆ is_male ┆ constant_filled_age ┆ has_siblings │
>>> │ ---      │ │ ---   ┆ ---     ┆ ---                 ┆ ---          │
>>> │ bool     │ │ bool  ┆ bool    ┆ f64                 ┆ bool         │
>>> ╞══════════╡ ╞═══════╪═════════╪═════════════════════╪══════════════╡
>>> │ false    │ │ true  ┆ true    ┆ 22.0                ┆ true         │
>>> │ true     │ │ false ┆ false   ┆ 38.0                ┆ true         │
>>> │ true     │ │ false ┆ false   ┆ 26.0                ┆ false        │
>>> └──────────┘ └───────┴─────────┴─────────────────────┴──────────────┘
```

Returns:
SupervisedModelFeatureStore: A new queryable feature store
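The example above returns the labels and the input features as two separate frames. A minimal sketch of that split on plain dictionaries follows; `split_labels` is a hypothetical helper, not part of the library, and the rows reuse a subset of the columns from the output above.

```python
def split_labels(
    rows: list[dict[str, object]], label_columns: set[str]
) -> tuple[list[dict[str, object]], list[dict[str, object]]]:
    """Separate each row into its input columns and its label columns."""
    inputs = [
        {name: value for name, value in row.items() if name not in label_columns}
        for row in rows
    ]
    labels = [{name: row[name] for name in sorted(label_columns)} for row in rows]
    return inputs, labels


rows = [
    {"is_male": True, "constant_filled_age": 22.0, "survived": False},
    {"is_male": False, "constant_filled_age": 38.0, "survived": True},
]
inputs, labels = split_labels(rows, label_columns={"survived"})
print(inputs)
print(labels)
```

Keeping the labels out of the input frame makes it harder to pass the target column to a model as a feature by accident.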
model: ModelSchema (source)

Undocumented

model_to_use: ExposedModel | None (source)

Undocumented

selected_version: str | None (source)

Undocumented

store: ContractStore (source)

Undocumented

dataset_store: DatasetStore | None (source)

Undocumented

location: FeatureLocation (source)

Undocumented