Outline¶
- Introduction to machine learning and feature engineering
- Naive implementation of feature calculation
- Switching to Hamilton
- 2 advanced features of Hamilton
- Hamilton tips
About Me¶
Arthur Andres, software engineer at Tradewell Technologies www.tradewelltech.co/
Find me on github as 0x26res
www.github.com/0x26res
Machine learning & feature engineering¶
Key concepts¶
Prediction request¶
A prediction request is an event for which we want a prediction. Here it is a timestamp and a location.
timestamp | location |
---|---|
2024-02-20 17:30:00+00:00 | London |
2024-02-20 17:45:00+00:00 | San Francisco |
Features¶
A feature is a scalar value given to the model as an input to help with the prediction.
Feature views¶
Each feature view represents a logical group of time series of features.
timestamp | location | traffic |
---|---|---|
2024-02-20 16:00:00+00:00 | London | BAD |
2024-02-20 16:00:00+00:00 | San Francisco | GOOD |
timestamp | location | electric | self_driving | convertible |
---|---|---|---|---|
2024-02-20 17:00:00+00:00 | London | 1000 | 0 | 1000 |
2024-02-20 17:00:00+00:00 | San Francisco | 3000 | 3000 | 50 |
Feature "point-in-time" join¶
timestamp | location | traffic | electric | self_driving | convertible |
---|---|---|---|---|---|
2024-02-20 17:30:00+00:00 | London | BAD | 1000 | 0 | 1000 |
2024-02-20 17:45:00+00:00 | San Francisco | GOOD | 3000 | 3000 | 50 |
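For each prediction request, the point-in-time join picks the most recent row of a feature view at or before the request timestamp, for the same location. Below is a minimal plain-Python sketch of that rule using the traffic rows above. The helper names `as_of_lookup` and `utc` and the 18:00 row are made up for illustration; the deck itself relies on `pd.merge_asof`.

```python
import bisect
from datetime import datetime, timezone


def as_of_lookup(rows, timestamp, location):
    """Return the latest row for `location` whose timestamp is <= `timestamp`.

    `rows` is a list of (timestamp, location, value) tuples sorted by timestamp.
    Returns None when no row is old enough.
    """
    candidates = [row for row in rows if row[1] == location]
    times = [row[0] for row in candidates]
    # bisect_right counts the rows with a timestamp <= the request time
    position = bisect.bisect_right(times, timestamp)
    return candidates[position - 1] if position else None


def utc(hour, minute=0):
    return datetime(2024, 2, 20, hour, minute, tzinfo=timezone.utc)


traffic = [
    (utc(16), "London", "BAD"),
    (utc(16), "San Francisco", "GOOD"),
    # Hypothetical later row (not in the slides): it must not leak into 17:30
    (utc(18), "London", "GOOD"),
]

london = as_of_lookup(traffic, utc(17, 30), "London")
san_francisco = as_of_lookup(traffic, utc(17, 45), "San Francisco")
too_early = as_of_lookup(traffic, utc(15), "London")
print(london[2], san_francisco[2], too_early)  # BAD GOOD None
```

The 18:00 row is ignored for the 17:30 request, which is what keeps future data out of the features.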
Simple feature calculation job¶
A bit of configuration first¶
import datetime
import pandas as pd
data_location = "./dev"
date = datetime.date(2024, 2, 20)
Loading features¶
def load_feature_view_for_date(
    feature_view_name: str,
    date: datetime.date,
    data_location: str,
) -> pd.DataFrame:
    file_name = f"{data_location}/feature/{feature_view_name}/{date:%Y-%m-%d}.pq"
    return pd.read_parquet(file_name)

prediction_requests = load_feature_view_for_date(
    "prediction_requests",
    date,
    data_location,
)
traffic_condition = load_feature_view_for_date(
    "traffic_condition",
    date,
    data_location,
)
taxi_availability = load_feature_view_for_date(
    "taxi_availability",
    date,
    data_location,
)
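The loader expects one Parquet file per feature view and per day. Here is a small sketch of the path it builds, reusing the same f-string. The function name `feature_view_path` is made up for this illustration.

```python
import datetime


def feature_view_path(feature_view_name, date, data_location):
    # Same layout as load_feature_view_for_date: <root>/feature/<view>/<YYYY-MM-DD>.pq
    return f"{data_location}/feature/{feature_view_name}/{date:%Y-%m-%d}.pq"


path = feature_view_path("weather", datetime.date(2024, 2, 20), "./dev")
print(path)  # ./dev/feature/weather/2024-02-20.pq
```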
Putting features together¶
def join_features(
    predictions: pd.DataFrame, *feature_views: pd.DataFrame
) -> pd.DataFrame:
    results = predictions
    for feature_view in feature_views:
        results = pd.merge_asof(
            results,
            feature_view,
            on="timestamp",
            by="location",
        )
    return results

join_features(prediction_requests, traffic_condition, taxi_availability)
 | timestamp | location | traffic | electric | self_driving | convertible |
---|---|---|---|---|---|---|
0 | 2024-02-20 17:30:00+00:00 | London | BAD | 1000 | 0 | 1000 |
1 | 2024-02-20 17:45:00+00:00 | San Francisco | GOOD | 3000 | 3000 | 50 |
Adding a more complicated feature¶
weather = load_feature_view_for_date("weather", date, data_location)
timestamp | location | weather |
---|---|---|
2024-02-20 12:00:00+00:00 | London | rain |
2024-02-20 12:00:00+00:00 | San Francisco | fog |
def calculate_brand_new_features(
    weather: pd.DataFrame,
    taxi_availability: pd.DataFrame,
) -> pd.DataFrame:
    return pd.merge_asof(
        taxi_availability,
        weather,
        on="timestamp",
        by="location",
    ).assign(
        actual_convertible=lambda x: x["convertible"].where(
            x["weather"] != "rain",
            0.0,
        )
    )[
        ["timestamp", "location", "actual_convertible"]
    ]

brand_new_features = calculate_brand_new_features(weather, taxi_availability)
timestamp | location | actual_convertible |
---|---|---|
2024-02-20 17:00:00+00:00 | London | 0 |
2024-02-20 17:00:00+00:00 | San Francisco | 50 |
join_features(
    prediction_requests,
    traffic_condition,
    taxi_availability,
    brand_new_features,
)
 | timestamp | location | traffic | electric | self_driving | convertible | actual_convertible |
---|---|---|---|---|---|---|---|
0 | 2024-02-20 17:30:00+00:00 | London | BAD | 1000 | 0 | 1000 | 0 |
1 | 2024-02-20 17:45:00+00:00 | San Francisco | GOOD | 3000 | 3000 | 50 | 50 |
But only in dev...¶
import os

# .get avoids a KeyError when ENV is not set (treated as "not dev")
is_dev = os.environ.get("ENV") == "dev"
if is_dev:
    weather = load_feature_view_for_date("weather", date, data_location)
    brand_new_features = calculate_brand_new_features(weather, taxi_availability)
# ... later
(
    join_features(
        prediction_requests,
        traffic_condition,
        taxi_availability,
        brand_new_features,
    )
    if is_dev
    else join_features(
        prediction_requests,
        traffic_condition,
        taxi_availability,
    )
)
 | timestamp | location | traffic | electric | self_driving | convertible | actual_convertible |
---|---|---|---|---|---|---|---|
0 | 2024-02-20 17:30:00+00:00 | London | BAD | 1000 | 0 | 1000 | 0 |
1 | 2024-02-20 17:45:00+00:00 | San Francisco | GOOD | 3000 | 3000 | 50 | 50 |
Limitations¶
- a lot of imperative code
- hard to add and test new features
- feature flag == 🚩
Moving to Hamilton¶
Reorganizing our code¶
load_features.py¶
import datetime
import pandas as pd
from util import load_feature_view_for_date

def prediction_requests(
    date: datetime.date,
    data_location: str,
) -> pd.DataFrame:
    return load_feature_view_for_date(
        "prediction_requests",
        date,
        data_location,
    )

def traffic_condition(date: datetime.date, data_location: str) -> pd.DataFrame:
    return load_feature_view_for_date("traffic_condition", date, data_location)

def taxi_availability(date: datetime.date, data_location: str) -> pd.DataFrame:
    return load_feature_view_for_date("taxi_availability", date, data_location)
combine_features.py¶
import pandas as pd
from util import join_features

def all_features(
    prediction_requests: pd.DataFrame,
    traffic_condition: pd.DataFrame,
    taxi_availability: pd.DataFrame,
) -> pd.DataFrame:
    return join_features(
        prediction_requests,
        traffic_condition,
        taxi_availability,
    )
Executing our DAG¶
import hamilton.driver
import combine_features, load_features

driver = (
    hamilton.driver.Builder()
    .with_modules(
        load_features,
        combine_features,
    )
    .build()
)
driver.execute(
    inputs={"date": date, "data_location": data_location},
    final_vars=["all_features"],
)["all_features"]
 | timestamp | location | traffic | electric | self_driving | convertible |
---|---|---|---|---|---|---|
0 | 2024-02-20 17:30:00+00:00 | London | BAD | 1000 | 0 | 1000 |
1 | 2024-02-20 17:45:00+00:00 | San Francisco | GOOD | 3000 | 3000 | 50 |
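Hamilton builds the graph from the functions themselves: each function becomes a node named after the function, and each parameter names either another node or an input passed to `execute`. The toy resolver below illustrates that idea with the standard library only. It is not Hamilton's implementation, and `run` and the stand-in functions are made up for this sketch.

```python
import inspect


def prediction_requests(date, data_location):
    return f"requests({date}, {data_location})"


def traffic_condition(date, data_location):
    return f"traffic({date}, {data_location})"


def all_features(prediction_requests, traffic_condition):
    return [prediction_requests, traffic_condition]


def run(functions, inputs, target):
    """Compute `target` by recursively resolving parameter names to nodes."""
    nodes = {function.__name__: function for function in functions}
    cache = dict(inputs)  # inputs are treated as already computed nodes

    def compute(name):
        if name not in cache:
            function = nodes[name]  # KeyError: unknown node or missing input
            arguments = {
                parameter: compute(parameter)
                for parameter in inspect.signature(function).parameters
            }
            cache[name] = function(**arguments)
        return cache[name]

    return compute(target)


result = run(
    [prediction_requests, traffic_condition, all_features],
    inputs={"date": "2024-02-20", "data_location": "./dev"},
    target="all_features",
)
print(result)
```

Only the nodes needed for the requested target are computed, and `date` and `data_location` are injected wherever a function asks for them by name.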
Takeaway¶
Pros:
- Better organized code
- Dependency injection of resources / config (data_location)
- Nice graph, great for documentation
Cons:
- Some boilerplate function definitions in load_features.py
- Can't configure the inputs of all_features
Reuse a function with parameterize¶
Issue: a lot of duplicated functions, written only to integrate with Hamilton
import datetime
import pandas as pd
from util import load_feature_view_for_date

def prediction_requests(
    date: datetime.date,
    data_location: str,
) -> pd.DataFrame:
    return load_feature_view_for_date(
        "prediction_requests",
        date,
        data_location,
    )

def traffic_condition(date: datetime.date, data_location: str) -> pd.DataFrame:
    return load_feature_view_for_date("traffic_condition", date, data_location)

def taxi_availability(date: datetime.date, data_location: str) -> pd.DataFrame:
    return load_feature_view_for_date("taxi_availability", date, data_location)
Solution: use parameterize
import datetime
import pandas as pd
from hamilton.function_modifiers import parameterize, value
from util import load_feature_view_for_date

@parameterize(
    prediction_requests={"feature_view_name": value("prediction_requests")},
    traffic_condition={"feature_view_name": value("traffic_condition")},
    taxi_availability={"feature_view_name": value("taxi_availability")},
    weather={"feature_view_name": value("weather")},
)
def parameterized_load_feature_view_for_date(
    feature_view_name: str,
    date: datetime.date,
    data_location: str,
) -> pd.DataFrame:
    return load_feature_view_for_date(feature_view_name, date, data_location)
import hamilton.driver
import combine_features, load_features_2

driver = (
    hamilton.driver.Builder()
    .with_modules(
        load_features_2,
        combine_features,
    )
    .build()
)
driver.execute(
    inputs={"date": date, "data_location": data_location},
    final_vars=["all_features"],
)["all_features"]
 | timestamp | location | traffic | electric | self_driving | convertible |
---|---|---|---|---|---|---|
0 | 2024-02-20 17:30:00+00:00 | London | BAD | 1000 | 0 | 1000 |
1 | 2024-02-20 17:45:00+00:00 | San Francisco | GOOD | 3000 | 3000 | 50 |
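Conceptually, `parameterize` turns one function into several nodes, each with some arguments fixed to a literal value while the remaining parameters stay as dependencies. Here is a rough standard-library analogy using `functools.partial`. It only mimics the idea and is not how Hamilton implements it; `fake_load` is a stand-in for the real loader.

```python
import functools


def fake_load(feature_view_name, date, data_location):
    # Stand-in for load_feature_view_for_date: returns the path instead of reading it
    return f"{data_location}/feature/{feature_view_name}/{date}.pq"


# One entry per generated node, mirroring the keyword arguments of @parameterize
node_parameters = {
    "prediction_requests": {"feature_view_name": "prediction_requests"},
    "traffic_condition": {"feature_view_name": "traffic_condition"},
    "taxi_availability": {"feature_view_name": "taxi_availability"},
    "weather": {"feature_view_name": "weather"},
}

nodes = {
    node_name: functools.partial(fake_load, **fixed_arguments)
    for node_name, fixed_arguments in node_parameters.items()
}

# Each generated node still depends on date and data_location
weather_path = nodes["weather"](date="2024-02-20", data_location="./dev")
print(weather_path)  # ./dev/feature/weather/2024-02-20.pq
```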
Selecting inputs of a node based on configuration with resolve¶
Issue: I would like to select the inputs of all_features
import pandas as pd
from util import join_features

def all_features(
    prediction_requests: pd.DataFrame,
    traffic_condition: pd.DataFrame,
    taxi_availability: pd.DataFrame,
) -> pd.DataFrame:
    return join_features(
        prediction_requests,
        traffic_condition,
        taxi_availability,
    )
Solution: use resolve to select the function inputs when the DAG is built, based on config
import pandas as pd
from hamilton.function_modifiers import (
    ResolveAt,
    group,
    inject,
    resolve,
    source,
)
from util import join_features

@resolve(
    when=ResolveAt.CONFIG_AVAILABLE,
    decorate_with=lambda feature_views: inject(
        feature_views=group(
            **{feature_view: source(feature_view) for feature_view in feature_views}
        )
    ),
)
def all_features(
    prediction_requests: pd.DataFrame,
    feature_views: dict[str, pd.DataFrame],
) -> pd.DataFrame:
    return join_features(prediction_requests, *feature_views.values())
import combine_features_2, load_features_2
from hamilton import settings

feature_views = ["traffic_condition", "taxi_availability"]
driver = (
    hamilton.driver.Builder()
    .with_modules(load_features_2, combine_features_2)
    .with_config(
        {"feature_views": feature_views, settings.ENABLE_POWER_USER_MODE: True}
    )
    .build()
)
driver.execute(
    inputs={"date": date, "data_location": data_location},
    final_vars=["all_features"],
)["all_features"]
 | timestamp | location | traffic | electric | self_driving | convertible |
---|---|---|---|---|---|---|
0 | 2024-02-20 17:30:00+00:00 | London | BAD | 1000 | 0 | 1000 |
1 | 2024-02-20 17:45:00+00:00 | San Francisco | GOOD | 3000 | 3000 | 50 |
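The idea behind `resolve` is that the decorator is not chosen when the module is imported but later, once the configuration is known. Below is a toy version of that deferral in plain Python. The names `deferred`, `select_inputs` and `build` are invented for this sketch and are not Hamilton APIs.

```python
def deferred(decorate_with):
    """Remember how to build the real decorator once the config is available."""

    def mark(function):
        function.decorate_with = decorate_with
        return function

    return mark


def select_inputs(feature_views):
    """Decorator factory: fix which named inputs the function will receive."""

    def decorator(function):
        def wrapper(available):
            selected = {name: available[name] for name in feature_views}
            return function(selected)

        return wrapper

    return decorator


@deferred(decorate_with=lambda feature_views: select_inputs(feature_views))
def all_features(feature_views):
    return sorted(feature_views)


def build(function, config):
    # Only now, with the config in hand, is the real decorator created and applied
    return function.decorate_with(config["feature_views"])(function)


available = {"traffic_condition": 1, "taxi_availability": 2, "weather": 3}
node = build(all_features, {"feature_views": ["traffic_condition", "taxi_availability"]})
selected_names = node(available)
print(selected_names)  # ['taxi_availability', 'traffic_condition']
```

Changing the `feature_views` entry of the config changes the inputs of the node without touching the function body.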
Adding a new feature view easily¶
import pandas as pd

def brand_new_features(
    weather: pd.DataFrame,
    taxi_availability: pd.DataFrame,
) -> pd.DataFrame:
    return pd.merge_asof(
        taxi_availability,
        weather,
        on="timestamp",
        by="location",
    ).assign(
        # Keep the convertible count unless it rains, in which case none are usable
        actual_convertible=lambda x: x["convertible"].where(
            x["weather"] != "rain",
            0.0,
        )
    )[
        ["timestamp", "location", "actual_convertible"]
    ]

import brand_new_features

feature_views = ["traffic_condition", "taxi_availability", "brand_new_features"]
driver = (
    hamilton.driver.Builder()
    .with_modules(
        load_features_2,
        combine_features_2,
        brand_new_features,
    )
    .with_config(
        {"feature_views": feature_views, settings.ENABLE_POWER_USER_MODE: True}
    )
    .build()
)
driver.execute(
    inputs={"date": date, "data_location": data_location},
    final_vars=["all_features"],
)["all_features"]
 | timestamp | location | traffic | electric | self_driving | convertible | actual_convertible |
---|---|---|---|---|---|---|---|
0 | 2024-02-20 17:30:00+00:00 | London | BAD | 1000 | 0 | 1000 | 0 |
1 | 2024-02-20 17:45:00+00:00 | San Francisco | GOOD | 3000 | 3000 | 50 | 50 |
feature_views = ["traffic_condition", "taxi_availability"]
Our experience migrating to Hamilton¶
- Don't try to reimplement it yourself, there's a lot to it
- Great error messages from the library
- Very responsive Q&A on Slack
- It's a mindset to get used to
- 80/20 rule
  - 80% of the time you'll need plain Hamilton (one function -> one node in the DAG)
  - 20% of the time you'll need some advanced features with decorators
- It doesn't have to intrude on your code
  - Separate your core code from the node functions in Hamilton
import datetime
import pandas as pd

def load_feature_view_for_date(
    feature_view_name: str,
    date: datetime.date,
    data_location: str,
) -> pd.DataFrame:
    file_name = f"{data_location}/feature/{feature_view_name}/{date:%Y-%m-%d}.pq"
    return pd.read_parquet(file_name)

def join_features(
    predictions: pd.DataFrame, *feature_views: pd.DataFrame
) -> pd.DataFrame:
    results = predictions
    for feature_view in feature_views:
        results = pd.merge_asof(
            results,
            feature_view,
            on="timestamp",
            by="location",
        )
    return results

import datetime
import pandas as pd
from hamilton.function_modifiers import parameterize, value
from util import load_feature_view_for_date

@parameterize(
    prediction_requests={"feature_view_name": value("prediction_requests")},
    traffic_condition={"feature_view_name": value("traffic_condition")},
    taxi_availability={"feature_view_name": value("taxi_availability")},
    weather={"feature_view_name": value("weather")},
)
def parameterized_load_feature_view_for_date(
    feature_view_name: str,
    date: datetime.date,
    data_location: str,
) -> pd.DataFrame:
    return load_feature_view_for_date(feature_view_name, date, data_location)
voila