Hamilton Global User Group February Meetup¶

How we migrated our feature calculation to Hamilton¶


Outline¶

  • Introduction to machine learning and feature engineering
  • Naive implementation of feature calculation
  • Switching to Hamilton
  • 2 advanced features of Hamilton
  • Hamilton tips

About Me¶

Arthur Andres, software engineer at Tradewell Technologies www.tradewelltech.co/


Find me on GitHub as 0x26res www.github.com/0x26res

Machine learning & feature engineering¶


Key concepts¶

Prediction request¶

A prediction request is an event for which we want a prediction. Here, it is identified by a timestamp and a location.

timestamp                  location
2024-02-20 17:30:00+00:00  London
2024-02-20 17:45:00+00:00  San Francisco

Features¶

A feature is a scalar value given to the model as input to help with the prediction.

Feature views¶

Each feature view represents a logical group of time series of features.

traffic_condition:

timestamp                  location       traffic
2024-02-20 16:00:00+00:00  London         BAD
2024-02-20 16:00:00+00:00  San Francisco  GOOD

taxi_availability:

timestamp                  location       electric  self_driving  convertible
2024-02-20 17:00:00+00:00  London         1000      0             1000
2024-02-20 17:00:00+00:00  San Francisco  3000      3000          50

Feature "point-in-time" join¶

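For each prediction request, we take the latest row of every feature view at or before the request timestamp, for the same location.

timestamp                  location       traffic  electric  self_driving  convertible
2024-02-20 17:30:00+00:00  London         BAD      1000      0             1000
2024-02-20 17:45:00+00:00  San Francisco  GOOD     3000      3000          50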
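The point-in-time rule can be sketched with `pd.merge_asof`, which the rest of the talk relies on. This is a minimal illustration, not the production job: the extra 17:40 London row is hypothetical data, added only to show that values published after a request are ignored.

```python
import pandas as pd

# Prediction requests: the events we want a prediction for.
requests = pd.DataFrame(
    {
        "timestamp": pd.to_datetime(
            ["2024-02-20 17:30:00", "2024-02-20 17:45:00"], utc=True
        ),
        "location": ["London", "San Francisco"],
    }
)

# One feature view. The last row is hypothetical: it is published
# AFTER the London request, so it must not leak into that request.
traffic = pd.DataFrame(
    {
        "timestamp": pd.to_datetime(
            ["2024-02-20 16:00:00", "2024-02-20 16:00:00", "2024-02-20 17:40:00"],
            utc=True,
        ),
        "location": ["London", "San Francisco", "London"],
        "traffic": ["BAD", "GOOD", "GOOD"],
    }
)

# merge_asof needs both frames sorted by the "on" key. Its default
# direction is "backward": the latest row at or before each request.
joined = pd.merge_asof(requests, traffic, on="timestamp", by="location")
print(joined)
```

The London request at 17:30 still sees `BAD`, because the 17:40 update lies in its future.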

Simple feature calculation job¶


A bit of configuration first¶

import datetime
import pandas as pd

data_location = "./dev"
date = datetime.date(2024, 2, 20)

Loading features¶

def load_feature_view_for_date(
    feature_view_name: str,
    date: datetime.date,
    data_location: str,
) -> pd.DataFrame:
    file_name = f"{data_location}/feature/{feature_view_name}/{date:%Y-%m-%d}.pq"
    return pd.read_parquet(file_name)


prediction_requests = load_feature_view_for_date(
    "prediction_requests",
    date,
    data_location,
)
traffic_condition = load_feature_view_for_date(
    "traffic_condition",
    date,
    data_location,
)
taxi_availability = load_feature_view_for_date(
    "taxi_availability",
    date,
    data_location,
)

Putting features together¶

def join_features(
    predictions: pd.DataFrame, *feature_views: pd.DataFrame
) -> pd.DataFrame:
    results = predictions
    for feature_view in feature_views:
        results = pd.merge_asof(
            results,
            feature_view,
            on="timestamp",
            by="location",
        )
    return results


join_features(prediction_requests, traffic_condition, taxi_availability)

   timestamp                  location       traffic  electric  self_driving  convertible
0  2024-02-20 17:30:00+00:00  London         BAD      1000      0             1000
1  2024-02-20 17:45:00+00:00  San Francisco  GOOD     3000      3000          50

Adding a more complicated feature¶

weather = load_feature_view_for_date("weather", date, data_location)

timestamp                  location       weather
2024-02-20 12:00:00+00:00  London         rain
2024-02-20 12:00:00+00:00  San Francisco  fog


def calculate_brand_new_features(
    weather: pd.DataFrame,
    taxi_availability: pd.DataFrame,
) -> pd.DataFrame:
    return pd.merge_asof(
        taxi_availability,
        weather,
        on="timestamp",
        by="location",
    ).assign(
        actual_convertible=lambda x: x["convertible"].where(
            x["weather"] != "rain",
            0.0,
        )
    )[
        ["timestamp", "location", "actual_convertible"]
    ]


brand_new_features = calculate_brand_new_features(weather, taxi_availability)

timestamp                  location       actual_convertible
2024-02-20 17:00:00+00:00  London         0
2024-02-20 17:00:00+00:00  San Francisco  50

join_features(
    prediction_requests,
    traffic_condition,
    taxi_availability,
    brand_new_features,
)

   timestamp                  location       traffic  electric  self_driving  convertible  actual_convertible
0  2024-02-20 17:30:00+00:00  London         BAD      1000      0             1000         0
1  2024-02-20 17:45:00+00:00  San Francisco  GOOD     3000      3000          50           50

But only in dev...¶

import os

# .get avoids a KeyError when ENV is not set (treated as "not dev").
is_dev = os.environ.get("ENV") == "dev"

if is_dev:
    weather = load_feature_view_for_date("weather", date, data_location)
    brand_new_features = calculate_brand_new_features(weather, taxi_availability)
# ... later
(
    join_features(
        prediction_requests,
        traffic_condition,
        taxi_availability,
        brand_new_features,
    )
    if is_dev
    else join_features(
        prediction_requests,
        traffic_condition,
        taxi_availability,
    )
)

   timestamp                  location       traffic  electric  self_driving  convertible  actual_convertible
0  2024-02-20 17:30:00+00:00  London         BAD      1000      0             1000         0
1  2024-02-20 17:45:00+00:00  San Francisco  GOOD     3000      3000          50           50

Limitations¶

  • a lot of imperative code
  • hard to add and test new features
  • feature flag == 🚩

Moving to Hamilton¶

Reorganizing our code¶

load_features.py¶

import datetime

import pandas as pd

from util import load_feature_view_for_date


def prediction_requests(
    date: datetime.date,
    data_location: str,
) -> pd.DataFrame:
    return load_feature_view_for_date(
        "prediction_requests",
        date,
        data_location,
    )


def traffic_condition(date: datetime.date, data_location: str) -> pd.DataFrame:
    return load_feature_view_for_date("traffic_condition", date, data_location)


def taxi_availability(date: datetime.date, data_location: str) -> pd.DataFrame:
    return load_feature_view_for_date("taxi_availability", date, data_location)

combine_features.py¶

import pandas as pd

from util import join_features


def all_features(
    prediction_requests: pd.DataFrame,
    traffic_condition: pd.DataFrame,
    taxi_availability: pd.DataFrame,
) -> pd.DataFrame:
    return join_features(
        prediction_requests,
        traffic_condition,
        taxi_availability,
    )

Executing our DAG¶

import hamilton.driver

import combine_features
import load_features

driver = (
    hamilton.driver.Builder()
    .with_modules(
        load_features,
        combine_features,
    )
    .build()
)

driver.execute(
    inputs={"date": date, "data_location": data_location},
    final_vars=["all_features"],
)["all_features"]

   timestamp                  location       traffic  electric  self_driving  convertible
0  2024-02-20 17:30:00+00:00  London         BAD      1000      0             1000
1  2024-02-20 17:45:00+00:00  San Francisco  GOOD     3000      3000          50

Takeaway¶

Pros:

  • Better organized code
  • Dependency injection of resources / config (data_location)
  • Nice graph, great for documentation

Cons:

  • Some boilerplate function definitions in load_features.py
  • Can't configure the inputs of all_features

Reuse a function with parameterize¶

Issue: a lot of near-duplicate functions, written only for the sake of integrating with Hamilton

import datetime

import pandas as pd

from util import load_feature_view_for_date


def prediction_requests(
    date: datetime.date,
    data_location: str,
) -> pd.DataFrame:
    return load_feature_view_for_date(
        "prediction_requests",
        date,
        data_location,
    )


def traffic_condition(date: datetime.date, data_location: str) -> pd.DataFrame:
    return load_feature_view_for_date("traffic_condition", date, data_location)


def taxi_availability(date: datetime.date, data_location: str) -> pd.DataFrame:
    return load_feature_view_for_date("taxi_availability", date, data_location)

Solution: use parameterize

import datetime


import pandas as pd
from hamilton.function_modifiers import parameterize, value
from util import load_feature_view_for_date


@parameterize(
    prediction_requests={"feature_view_name": value("prediction_requests")},
    traffic_condition={"feature_view_name": value("traffic_condition")},
    taxi_availability={"feature_view_name": value("taxi_availability")},
    weather={"feature_view_name": value("weather")},
)
def parameterized_load_feature_view_for_date(
    feature_view_name: str,
    date: datetime.date,
    data_location: str,
) -> pd.DataFrame:
    return load_feature_view_for_date(feature_view_name, date, data_location)


import hamilton.driver

import combine_features
import load_features_2

driver = (
    hamilton.driver.Builder()
    .with_modules(
        load_features_2,
        combine_features,
    )
    .build()
)

driver.execute(
    inputs={"date": date, "data_location": data_location},
    final_vars=["all_features"],
)["all_features"]

   timestamp                  location       traffic  electric  self_driving  convertible
0  2024-02-20 17:30:00+00:00  London         BAD      1000      0             1000
1  2024-02-20 17:45:00+00:00  San Francisco  GOOD     3000      3000          50
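As a rough mental model, the decorator turns one function into several named nodes, each with `feature_view_name` fixed to a different value. The sketch below mimics that effect with `functools.partial` from the standard library. It is an analogy under stated assumptions, not how Hamilton implements it: `make_loaders` is a hypothetical helper, and the loader returns a path instead of reading a file.

```python
from functools import partial
from typing import Callable


def load_feature_view_for_date(
    feature_view_name: str, date: str, data_location: str
) -> str:
    # Stand-in for the real loader: returns the path instead of reading parquet.
    return f"{data_location}/feature/{feature_view_name}/{date}.pq"


def make_loaders(*feature_view_names: str) -> dict[str, Callable[..., str]]:
    """Build one named loader per feature view from a single function."""
    return {
        name: partial(load_feature_view_for_date, name)
        for name in feature_view_names
    }


loaders = make_loaders("prediction_requests", "traffic_condition", "weather")

# Each loader still expects the shared inputs, like the generated nodes do.
path = loaders["weather"](date="2024-02-20", data_location="./dev")
print(path)
```

Adding a feature view then means adding one name, rather than one more copy of the function.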

Selecting inputs of a node based on configuration with resolve¶

Issue: I would like to select the inputs of all_features

import pandas as pd

from util import join_features


def all_features(
    prediction_requests: pd.DataFrame,
    traffic_condition: pd.DataFrame,
    taxi_availability: pd.DataFrame,
) -> pd.DataFrame:
    return join_features(
        prediction_requests,
        traffic_condition,
        taxi_availability,
    )

Solution: use resolve to select the function's inputs from the config when the DAG is built

import pandas as pd
from hamilton.function_modifiers import (
    ResolveAt,
    group,
    inject,
    resolve,
    source,
)

from util import join_features


@resolve(
    when=ResolveAt.CONFIG_AVAILABLE,
    decorate_with=lambda feature_views: inject(
        feature_views=group(
            **{feature_view: source(feature_view) for feature_view in feature_views}
        )
    ),
)
def all_features(
    prediction_requests: pd.DataFrame,
    feature_views: dict[str, pd.DataFrame],
) -> pd.DataFrame:
    return join_features(prediction_requests, *feature_views.values())


import hamilton.driver
from hamilton import settings

import combine_features_2
import load_features_2

feature_views = ["traffic_condition", "taxi_availability"]

driver = (
    hamilton.driver.Builder()
    .with_modules(load_features_2, combine_features_2)
    .with_config(
        {"feature_views": feature_views, settings.ENABLE_POWER_USER_MODE: True}
    )
    .build()
)

driver.execute(
    inputs={"date": date, "data_location": data_location},
    final_vars=["all_features"],
)["all_features"]

   timestamp                  location       traffic  electric  self_driving  convertible
0  2024-02-20 17:30:00+00:00  London         BAD      1000      0             1000
1  2024-02-20 17:45:00+00:00  San Francisco  GOOD     3000      3000          50

Adding a new feature view easily¶

import pandas as pd


def brand_new_features(
    weather: pd.DataFrame,
    taxi_availability: pd.DataFrame,
) -> pd.DataFrame:
    return pd.merge_asof(
        taxi_availability,
        weather,
        on="timestamp",
        by="location",
    ).assign(
        actual_convertible=lambda x: x["convertible"].where(
            x["weather"] != "rain",  # convertibles only count when it is not raining
            0.0,
        )
    )[
        ["timestamp", "location", "actual_convertible"]
    ]


import brand_new_features

feature_views = ["traffic_condition", "taxi_availability", "brand_new_features"]

driver = (
    hamilton.driver.Builder()
    .with_modules(
        load_features_2,
        combine_features_2,
        brand_new_features,
    )
    .with_config(
        {"feature_views": feature_views, settings.ENABLE_POWER_USER_MODE: True}
    )
    .build()
)

driver.execute(
    inputs={"date": date, "data_location": data_location},
    final_vars=["all_features"],
)["all_features"]

   timestamp                  location       traffic  electric  self_driving  convertible  actual_convertible
0  2024-02-20 17:30:00+00:00  London         BAD      1000      0             1000         0
1  2024-02-20 17:45:00+00:00  San Francisco  GOOD     3000      3000          50           50
# Dropping "brand_new_features" from the config removes it from the inputs of all_features.
feature_views = ["traffic_condition", "taxi_availability"]

Our experience migrating to Hamilton¶

  • Don't try to reimplement it yourself: there's a lot to it


  • Great error messages from the library


  • Very responsive Q&A on Slack

https://join.slack.com/t/hamilton-opensource/


  • It's a mindset to get used to
  • 80/20 rule

80% of the time you'll need plain Hamilton (one function -> one node in the DAG)

20% of the time you'll need some advanced features with decorators

  • It doesn't have to intrude on your code

Separate your core code from the node functions used by Hamilton:

# util.py: core code, with no dependency on Hamilton
import datetime
import pandas as pd


def load_feature_view_for_date(
    feature_view_name: str,
    date: datetime.date,
    data_location: str,
) -> pd.DataFrame:
    file_name = f"{data_location}/feature/{feature_view_name}/{date:%Y-%m-%d}.pq"
    return pd.read_parquet(file_name)


def join_features(
    predictions: pd.DataFrame, *feature_views: pd.DataFrame
) -> pd.DataFrame:
    results = predictions
    for feature_view in feature_views:
        results = pd.merge_asof(
            results,
            feature_view,
            on="timestamp",
            by="location",
        )
    return results


# load_features_2.py: thin Hamilton node functions wrapping the core code
import datetime


import pandas as pd
from hamilton.function_modifiers import parameterize, value
from util import load_feature_view_for_date


@parameterize(
    prediction_requests={"feature_view_name": value("prediction_requests")},
    traffic_condition={"feature_view_name": value("traffic_condition")},
    taxi_availability={"feature_view_name": value("taxi_availability")},
    weather={"feature_view_name": value("weather")},
)
def parameterized_load_feature_view_for_date(
    feature_view_name: str,
    date: datetime.date,
    data_location: str,
) -> pd.DataFrame:
    return load_feature_view_for_date(feature_view_name, date, data_location)
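One practical consequence of this separation is that feature logic can be unit-tested as plain Python, without building a driver. The sketch below is an assumption about how such a test could look, not the team's actual test suite: it redefines the `brand_new_features` node locally and feeds it small hand-built frames that mirror the talk's sample data.

```python
import pandas as pd


def brand_new_features(
    weather: pd.DataFrame, taxi_availability: pd.DataFrame
) -> pd.DataFrame:
    # Same logic as the talk's node: convertibles only count when it is not raining.
    return pd.merge_asof(
        taxi_availability, weather, on="timestamp", by="location"
    ).assign(
        actual_convertible=lambda x: x["convertible"].where(
            x["weather"] != "rain", 0.0
        )
    )[["timestamp", "location", "actual_convertible"]]


def test_brand_new_features() -> None:
    weather = pd.DataFrame(
        {
            "timestamp": pd.to_datetime(["2024-02-20 12:00"] * 2, utc=True),
            "location": ["London", "San Francisco"],
            "weather": ["rain", "fog"],
        }
    )
    taxi_availability = pd.DataFrame(
        {
            "timestamp": pd.to_datetime(["2024-02-20 17:00"] * 2, utc=True),
            "location": ["London", "San Francisco"],
            "convertible": [1000, 50],
        }
    )
    result = brand_new_features(weather, taxi_availability)
    # Rain in London: no usable convertibles. Fog in San Francisco: all 50.
    assert result["actual_convertible"].tolist() == [0, 50]


test_brand_new_features()
```

The same function can then be dropped into a module passed to the driver unchanged.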

Voilà