import logging
from pathlib import Path
import boto3
import geopandas as gpd
import numpy as np
import pandas as pd
from botocore import UNSIGNED
from botocore.config import Config
from gtfsblocks import Feed
logger = logging.getLogger(__name__)
# [docs] — Sphinx HTML source-link artifact; not executable code
def fetch_counties_gdf() -> gpd.GeoDataFrame:
    """Fetch 2022 US county boundaries (1:20m cartographic) from the Census Bureau.

    Returns
    -------
    gpd.GeoDataFrame
        County polygons with an added ``county_id`` column formatted as
        ``G{STATEFP}0{COUNTYFP}0`` to match the NREL TMY file naming scheme.
    """
    shapefile_url = (
        "https://www2.census.gov/geo/tiger/GENZ2022/shp/cb_2022_us_county_20m.zip"
    )
    counties = gpd.read_file(shapefile_url)
    # Build the GISJOIN-style county identifier used by the TMY S3 keys.
    counties["county_id"] = (
        "G" + counties["STATEFP"] + "0" + counties["COUNTYFP"] + "0"
    )
    return counties
# [docs] — Sphinx HTML source-link artifact; not executable code
def download_tmy_files(county_ids: list[str], tmy_dir: Path) -> None:
    """
    Download and save TMY weather files for estimating thermal energy demand.

    TMY stands for Typical Meteorological Year, a dataset that provides representative
    hourly weather data for a location over a synthetic year. Unlike Actual
    Meteorological Year (AMY) files, which reflect the observed conditions in a specific
    calendar year, TMY files are constructed by selecting typical months from multiple
    years of historical records. This approach smooths out unusual extremes and produces
    a "typical" climate profile, making TMY data well-suited for long-term energy
    modeling and system design studies.

    This function downloads TMY files for all the supplied county IDs and saves them to
    ``tmy_dir``. Files already present locally are skipped. It returns None.

    Parameters
    ----------
    county_ids : list[str]
        List of US Census County IDs for which to download TMY files.
    tmy_dir : Path
        Directory where downloaded TMY CSV files are saved.
    """
    bucket_name = "oedi-data-lake"  # S3 Bucket and path prefix for TMY data
    prefix = (
        "nrel-pds-building-stock/end-use-load-profiles-for-us-building-stock/2021/"
        "resstock_tmy3_release_1/weather/tmy3/"
    )
    # Create anonymous S3 client (the bucket is public; no credentials needed)
    s3 = boto3.client("s3", config=Config(signature_version=UNSIGNED))
    # exist_ok=True makes a separate exists() pre-check redundant
    tmy_dir.mkdir(parents=True, exist_ok=True)
    # Download files for each county, skipping any already cached locally
    for county_id in county_ids:
        file_key = f"{prefix}{county_id}_tmy3.csv"
        local_file = tmy_dir / f"{county_id}.csv"
        if not local_file.is_file():
            s3.download_file(bucket_name, file_key, str(local_file))
            # Use the module logger (defined at top of file) instead of print
            # so verbosity is controlled by the logging configuration.
            logger.info("Downloaded: %s.csv", county_id)
# [docs] — Sphinx HTML source-link artifact; not executable code
def load_thermal_lookup_table() -> pd.DataFrame:
    """Build a lookup table of combined HVAC + BTMS power vs. ambient temperature.

    Returns
    -------
    pd.DataFrame
        Columns ``Temp_C`` (-100 to 100 °C in 0.1 °C steps, rounded to one
        decimal) and ``Power`` (kW), linearly interpolated between literature
        anchor points and linearly extrapolated out to the +/-100 °C extremes.
    """
    # Anchor points from literature: temperature [°C] vs. HVAC and BTMS power [kW]
    anchor_temps = [-10, -5, 0, 5, 10, 15, 20, 25, 30, 35, 40]
    hvac_power = [25, 17, 10, 6, 4, 1, 1, 2, 4, 7, 11]
    btms_power = [4.9, 3.6, 2.1, 0.8, 0.2, 0.1, 0.1, 1.4, 1.5, 2.1, 5.6]
    total_power = [h + b for h, b in zip(hvac_power, btms_power)]

    # Extrapolate linearly (using the slope of the nearest 5 °C segment) to two
    # extreme temperatures so every plausible observation falls inside the table.
    low_power = (-10 - (-100)) * (total_power[0] - total_power[1]) / 5 + total_power[0]
    high_power = (100 - 40) * (total_power[-1] - total_power[-2]) / 5 + total_power[-1]
    anchor_temps = [-100] + anchor_temps + [100]
    total_power = [low_power] + total_power + [high_power]

    anchors = pd.DataFrame({"Temp_C": anchor_temps, "Power": total_power})

    # Dense grid at 0.1 °C resolution; round both key columns to one decimal so
    # the merge keys line up despite float representation.
    grid = pd.DataFrame({"Temp_C": np.arange(-100, 100.1, 0.1)})
    anchors["Temp_C"] = anchors["Temp_C"].astype(float).round(1)
    grid["Temp_C"] = grid["Temp_C"].astype(float).round(1)
    table = grid.merge(anchors, on="Temp_C", how="left")

    # Fill gaps between anchors by linear interpolation over the row index;
    # rows are evenly spaced in temperature, so this equals interpolating in °C.
    table["Power"] = table["Power"].interpolate(method="linear")
    table["Temp_C"] = table["Temp_C"].astype(float)
    return table
# [docs] — Sphinx HTML source-link artifact; not executable code
def compute_HVAC_energy(
start_hours: pd.Series,
end_hours: pd.Series,
power_array: np.typing.NDArray[np.number],
) -> np.typing.NDArray[np.number]:
"""
Calculate HVAC + BTMS energy consumption between time intervals.
Args:
start_hours (array-like): fractional start times in hours
end_hours (array-like): fractional end times in hours (can be > 24)
power_array (array-like): hourly average power values [kW] for hours 0–23
Returns:
np.ndarray: energy consumption [kWh] for each interval
"""
power_array = np.asarray(power_array)
power_24 = np.concatenate((power_array, [power_array[0]])) # wrap for interpolation
def interp_power(t: np.number) -> float:
"""Linearly interpolate power at fractional hour t."""
i = int(np.floor(t)) % 24
frac = t - np.floor(t)
return float((1 - frac) * power_24[i] + frac * power_24[i + 1])
energies = []
for s, e in zip(start_hours, end_hours):
# sample in small steps for accurate integration
ts = np.arange(s, e, 0.01) # 0.01 h = 36 s resolution
ps = np.array([interp_power(t) for t in ts])
energy = np.trapezoid(ps, ts) # integrate kW over hours → kWh
energies.append(energy)
return np.array(energies)
# [docs] — Sphinx HTML source-link artifact; not executable code
def get_hourly_temperature(
    county_id: str,
    scenario: str,
    tmy_dir: Path,
) -> pd.DataFrame:
    """
    Load the hourly temperature profile for one representative day.

    Reads the county's TMY CSV from ``tmy_dir`` and returns the hourly dry-bulb
    temperatures for the day whose daily mean matches the scenario: the coldest
    day ("winter"), the hottest day ("summer"), or the day closest to the
    median daily mean ("median").

    Parameters
    ----------
    county_id : str
        County identifier; ``{county_id}.csv`` must exist in ``tmy_dir``.
    scenario : str
        One of ``"winter"``, ``"summer"``, or ``"median"``.
    tmy_dir : Path
        Directory containing downloaded TMY CSV files.

    Returns
    -------
    pd.DataFrame
        Hourly rows for the selected day with columns ``date_time``,
        ``Dry Bulb Temperature [°C]`` (rounded to 0.1 °C), ``day_of_year``,
        and ``hour``.

    Raises
    ------
    ValueError
        If ``scenario`` is not one of the supported values.
    """
    local_file = tmy_dir / f"{county_id}.csv"
    tmy_df = pd.read_csv(local_file, parse_dates=["date_time"])[
        ["date_time", "Dry Bulb Temperature [°C]"]
    ]
    tmy_df["day_of_year"] = tmy_df["date_time"].dt.day_of_year
    mean_temp_by_day = (
        tmy_df.groupby("day_of_year")
        .agg(avg_temp_C=("Dry Bulb Temperature [°C]", "mean"))
        .reset_index()
    )
    if scenario == "winter":
        # Day with the coldest average temperature
        day = mean_temp_by_day.loc[
            mean_temp_by_day["avg_temp_C"].idxmin(), "day_of_year"
        ]
    elif scenario == "summer":
        # Day with the hottest average temperature
        day = mean_temp_by_day.loc[
            mean_temp_by_day["avg_temp_C"].idxmax(), "day_of_year"
        ]
    elif scenario == "median":
        # Day whose average temperature is closest to the annual median
        day = mean_temp_by_day.loc[
            (mean_temp_by_day["avg_temp_C"] - mean_temp_by_day["avg_temp_C"].median())
            .abs()
            .idxmin(),
            "day_of_year",
        ]
    else:
        raise ValueError(f"Unknown scenario: {scenario}")
    df_out = tmy_df[tmy_df["day_of_year"] == day].copy()
    # Round for every scenario (previously only "median" was rounded) so the
    # downstream merge against the 0.1 °C lookup table behaves consistently.
    df_out["Dry Bulb Temperature [°C]"] = df_out["Dry Bulb Temperature [°C]"].round(1)
    df_out["hour"] = df_out["date_time"].dt.hour
    return df_out
# [docs] — Sphinx HTML source-link artifact; not executable code
def add_HVAC_energy(
    feed: Feed, trips_df: pd.DataFrame, output_dir: Path | None = None
) -> pd.DataFrame:
    """
    Add HVAC energy consumption.

    Maps GTFS stops to US counties, downloads county-level TMY weather data,
    builds an average hourly thermal power profile per weather scenario, and
    integrates that profile over each trip's time window.

    Parameters
    ----------
    feed : gtfsblocks.Feed
        GTFS feed object containing blocks DataFrame.
    trips_df : pd.DataFrame
        Trips on selected date and route, including deadhead trips.
    output_dir : Path or None
        Directory used to store downloaded TMY weather files (in a ``TMY/``
        subdirectory). If None, defaults to ``~/cache/routee-transit/TMY``.

    Returns
    -------
    pd.DataFrame
        Updated trip-level energy prediction DataFrame with HVAC energy
        consumption per trip for each weather scenario
        (``summer``, ``winter``, ``median``).

    Raises
    ------
    ValueError
        If any stop is more than 3 km from a county boundary.
    """
    if output_dir is not None:
        tmy_dir = output_dir / "TMY"
    else:
        tmy_dir = Path.home() / "cache" / "routee-transit" / "TMY"
    # Based on gtfs stops data, get counties served
    df_stops = feed.stops
    gdf_stops = gpd.GeoDataFrame(
        df_stops,
        geometry=gpd.points_from_xy(df_stops.stop_lon, df_stops.stop_lat),
        crs=4269,
    )
    gdf_county = fetch_counties_gdf()
    # Make sure that both GDFs use the same CRS
    if gdf_stops.crs != gdf_county.crs:
        gdf_county = gdf_county.to_crs(gdf_stops.crs)
    # Start by joining directly to counties
    gdf_stops = gpd.sjoin(
        gdf_stops,
        gdf_county[["geometry", "county_id"]],
        how="left",
        predicate="intersects",
    )
    # If any county IDs are NA, use sjoin_nearest to map to the nearest county
    na_mask = gdf_stops["county_id"].isna()
    na_stops = gdf_stops[na_mask]
    if not na_stops.empty:
        na_stops = gdf_stops[na_mask].drop(columns=["index_right", "county_id"])
        # Project for distance calculation (ESRI:102003 = USA Contiguous Albers)
        na_stops = na_stops.to_crs("ESRI:102003")
        na_stops = na_stops.sjoin_nearest(
            right=gdf_county[["geometry", "county_id"]].to_crs("ESRI:102003"),
            how="left",
            max_distance=3000,
        )
        if na_stops["county_id"].isna().sum() > 0:
            raise ValueError(
                "One or more stops are not within 3 km of a county boundary. Unable to "
                "add county-level weather data and HVAC impacts."
            )
        stops_final = pd.concat([gdf_stops[~na_mask], na_stops.to_crs("EPSG:4269")])
    else:
        stops_final = gdf_stops
    county_ids = stops_final["county_id"].unique().tolist()
    # Download TMY Weather Data (skips files already cached)
    download_tmy_files(county_ids, tmy_dir)
    df_temp_energy = load_thermal_lookup_table()
    # Get the power tables for different weather scenarios
    thermal_dfs = []
    for county_id in county_ids:
        for scenario in ["summer", "winter", "median"]:
            hourly_temp_df = get_hourly_temperature(county_id, scenario, tmy_dir)
            hourly_temp_df["Dry Bulb Temperature [°C]"] = hourly_temp_df[
                "Dry Bulb Temperature [°C]"
            ].round(1)
            hourly_temp_df = hourly_temp_df.merge(
                df_temp_energy,
                left_on="Dry Bulb Temperature [°C]",
                right_on="Temp_C",
                how="left",
            )
            hourly_temp_df["scenario"] = scenario
            hourly_temp_df["county"] = county_id
            thermal_dfs.append(hourly_temp_df)
    # Average across counties -> one 24-hour power profile per scenario
    thermal_power_vals = (
        pd.concat(thermal_dfs).groupby(["scenario", "hour"])["Power"].mean()
    )
    # Last calculate the trip HVAC energy
    # Get the start and end time for each trip
    df_stop_times = feed.stop_times[
        feed.stop_times["trip_id"].isin(trips_df["trip_id"].unique())
    ].copy()
    df_trip_time = (
        df_stop_times.groupby("trip_id")
        .agg(start_time=("arrival_time", "min"), end_time=("arrival_time", "max"))
        .reset_index()
    )
    start_hours = df_trip_time["start_time"].dt.total_seconds() / 3600
    end_hours = df_trip_time["end_time"].dt.total_seconds() / 3600
    # Attach energies to df_trip_time (the frame whose row order defines
    # start_hours/end_hours) and merge back on trip_id. Assigning the energy
    # array positionally onto trips_df would misalign trips whenever trips_df
    # has a different row order (or duplicate trip_ids) than the groupby result.
    hvac_df_list = []
    for scen, subdf in thermal_power_vals.reset_index().groupby("scenario"):
        scenario_energy = df_trip_time[["trip_id"]].copy()
        scenario_energy["scenario"] = scen
        scenario_energy["hvac_energy_kWh"] = compute_HVAC_energy(
            start_hours, end_hours, subdf["Power"].to_numpy()
        )
        hvac_df_list.append(scenario_energy)
    all_predictions = pd.concat(hvac_df_list).reset_index(drop=True)
    trips_df_out = trips_df.merge(
        all_predictions[["trip_id", "scenario", "hvac_energy_kWh"]],
        on="trip_id",
        how="inner",
    )
    return trips_df_out