Source code for demos.models.employment

import orca
import numpy as np
import pandas as pd
from templates import estimated_models, modelmanager as mm
from templates.calibration import CalibrationConfig
import time
from logging_logic import log_execution_time
from config import (
    DEMOSConfig,
    EmploymentModuleConfig,
    SimultaneousCalibrationConfig,
    get_config,
)
from templates.utils.models import columns_in_formula
from loguru import logger

STEP_NAME = "employment"
REQUIRED_COLUMNS = ["persons.age", "persons.worker", "persons.earning"]


[docs] @orca.step(STEP_NAME) def employment(persons): """ Simulate labor force transitions for eligible persons. This step applies estimated models to determine which unemployed persons become employed and which employed persons exit the workforce. Only persons aged 18 or older are considered. The function updates the `worker` and `earning` columns in the `persons` table. Parameters ---------- persons : orca.Table The persons table containing individual-level attributes. Notes ----- - Requires the `persons` table with columns: `age`, `worker`, `earning`. - Modifies `persons.worker` and `persons.earning` in place. - Uses module configuration from the TOML config file. - Triggers caching of the `income_dist` table for income assignment. Example ------- This step is executed as part of the annual simulation loop: `orca.run(['employment'], iter_vars=[...])` """ start_time = time.time() # Load calibration config demos_config: DEMOSConfig = get_config() module_config: EmploymentModuleConfig = demos_config.employment_module_config if module_config.simultaneous_calibration_config is not None: stay_unemployed_list, exit_workforce_list = run_simultaenous_calibration( persons, module_config.simultaneous_calibration_config ) else: stay_unemployed_list = run_and_calibrate_in_workforce_model( persons, module_config.enter_model_calibration_procedure ) exit_workforce_list = run_and_calibrate_out_workforce_model( persons, module_config.exit_model_calibration_procedure ) # Re-index to help querying below reindexed_remain_unemployed = stay_unemployed_list.reindex( persons.local.index ).fillna(2) reindexed_exit_workforce = exit_workforce_list.reindex(persons.local.index).fillna( 2 ) # Trigger a cache for `income_dist` _ = orca.get_table("income_dist").to_frame() # Updating working status and income persons.local.loc[reindexed_exit_workforce == 1, "worker"] = 0 persons.local.loc[reindexed_exit_workforce == 1, "earning"] = 0 persons.local.loc[reindexed_remain_unemployed == 0, "worker"] = 1 persons.local.loc[reindexed_remain_unemployed == 0, "earning"] = ( persons["new_earning"].loc[reindexed_remain_unemployed == 0].values ) log_execution_time(start_time, orca.get_injectable("year"), STEP_NAME)
[docs] def sample_income(mean, std): """ Draw samples from a lognormal distribution. Parameters ---------- mean : float or array-like The mean(s) of the underlying normal distribution. std : float or array-like The standard deviation(s) of the underlying normal distribution. Returns ------- float or np.ndarray Sample(s) from the lognormal distribution. """ return np.random.lognormal(mean, std)
[docs] def run_and_calibrate_in_workforce_model( persons: pd.DataFrame, calibration_procedure: CalibrationConfig ) -> pd.Series: """ Run the 'enter_labor_force' estimated model for eligible persons. Parameters ---------- persons : pandas.DataFrame DataFrame of persons with required model variables. calibration_procedure : CalibrationConfig or None Calibration procedure to apply, if any. Returns ------- pandas.Series Model predictions for each eligible person (0 = remain unemployed, 1 = become employed). """ # Get model data model = mm.get_step("enter_labor_force") model_variables = columns_in_formula(model.model_expression) model_filters = (persons.worker == 0) & (persons.age >= 18) model_data = persons.to_frame(model_variables)[model_filters] # Calibrate if needed if calibration_procedure is not None: return calibration_procedure.calibrate_and_run_model(model, model_data) return model.predict(model_data)
[docs] def run_and_calibrate_out_workforce_model( persons: pd.DataFrame, calibration_procedure: CalibrationConfig ) -> pd.Series: """ Run the 'exit_labor_force' estimated model for eligible persons. Parameters ---------- persons : pandas.DataFrame DataFrame of persons with required model variables. calibration_procedure : CalibrationConfig or None Calibration procedure to apply, if any. Returns ------- pandas.Series Model predictions for each eligible person (0 = remain employed, 1 = become unemployed). """ # Get model data model = mm.get_step("exit_labor_force") model_variables = columns_in_formula(model.model_expression) model_filters = (persons.worker == 1) & (persons.age >= 18) model_data = persons.to_frame(model_variables)[model_filters] # Calibrate if needed if calibration_procedure is not None: return calibration_procedure.calibrate_and_run_model(model, model_data) return model.predict(model_data)
[docs] def run_simultaenous_calibration( persons: pd.DataFrame, simultaneous_calibration_config: SimultaneousCalibrationConfig, ) -> pd.Series: """ Run simultaneous calibration for both 'enter' and 'exit' labor force models. Parameters ---------- persons : pandas.DataFrame DataFrame of persons with required model variables. simultaneous_calibration_config : SimultaneousCalibrationConfig Configuration for simultaneous calibration. Returns ------- tuple of pandas.Series Predictions for entering and exiting the workforce. """ # Get enter model data enter_model = mm.get_step("enter_labor_force") enter_model_variables = columns_in_formula(enter_model.model_expression) enter_model_filters = (persons.worker == 0) & (persons.age >= 18) enter_model_data = persons.to_frame(enter_model_variables)[enter_model_filters] # Get exit model data exit_model = mm.get_step("exit_labor_force") exit_model_variables = columns_in_formula(exit_model.model_expression) exit_model_filters = (persons.worker == 1) & (persons.age >= 18) exit_model_data = persons.to_frame(exit_model_variables)[exit_model_filters] # Get calibration data observed_workers_table = orca.get_table("observed_employment").local observed_workers = observed_workers_table[ observed_workers_table.index == orca.get_injectable("year") ]["count"].iloc[0] enter_model_predictions, exit_model_predictions = enter_model.predict( enter_model_data ), exit_model.predict(exit_model_data) # enter_model_predictions == 0 are those who do NOT remain unemployed (will now be employed) # exit_model_preditions == 0 are those that remain employed predicted_total_workers = (enter_model_predictions == 0).sum() + ( exit_model_predictions == 0 ).sum() # Initialize optimization algorithm config: SimultaneousCalibrationConfig = simultaneous_calibration_config gradient = 0 momentum_weight = config.momentum_weight total_iterations = 0 error = abs(predicted_total_workers - observed_workers) while error > config.tolerance and total_iterations < config.max_iter: logger.info( f"Simultaneous Calibration: Iteration {total_iterations} error: {error}" ) lr = ( config.learning_rate * ((config.max_iter - total_iterations) + 0.5) / config.max_iter ) # Calculate gradient update_coeff = np.log(observed_workers / predicted_total_workers) gradient = lr * ( momentum_weight * gradient + (1 - momentum_weight) * update_coeff / 2 ) # Apply gradient enter_model.fitted_parameters[0] -= gradient exit_model.fitted_parameters[0] -= gradient enter_model_predictions, exit_model_predictions = enter_model.predict( enter_model_data ), exit_model.predict(exit_model_data) predicted_total_workers = (enter_model_predictions == 0).sum() + ( exit_model_predictions == 0 ).sum() error = abs(predicted_total_workers - observed_workers) total_iterations += 1 print(f"Final error after Simultaneous calibration: {error}") return enter_model_predictions, exit_model_predictions
[docs] @orca.column(table_name="persons") def new_earning(persons, income_dist): """ Compute new earnings for persons entering the workforce. For each eligible person, samples income from a lognormal distribution parameterized by age and education group, using the cached `income_dist` table. Parameters ---------- persons : orca.Table The persons table. income_dist : orca.Table Table with income distribution parameters. Returns ------- pandas.Series Sampled earnings for each person. """ persons_df = persons.to_frame(["age_group", "education_group"]) merged_df = persons_df.merge( income_dist.local, on=["age_group", "education_group"], how="left" ) return pd.Series( sample_income(merged_df["mu"], merged_df["sigma"]), index=persons_df.index )
[docs] @orca.column(table_name="households") def hh_workers(persons): """ Compute the number of workers per household. Parameters ---------- persons : orca.Table The persons table. Returns ------- pandas.Series Categorical summary: "none", "one", or "two or more" workers per household. """ return ( persons.to_frame(["household_id", "worker"]) .groupby("household_id") .sum()["worker"] .apply(lambda r: "none" if r == 0 else ("one" if r == 1 else "two or more")) )
[docs] @orca.column(table_name="households") def income(persons): """ Aggregate household income from person-level earnings. Parameters ---------- persons : orca.Table The persons table. Returns ------- pandas.Series Total income per household. Notes ----- This is for `HOUSEHOLDS` table """ return ( persons.to_frame(["household_id", "earning"]) .groupby("household_id") .sum()["earning"] )
[docs] @orca.table(cache=True, cache_scope="forever") def income_dist(persons): """ Compute and cache income distribution parameters by age and education group. This table is used to sample earnings for new workers in the employment model. Parameters ---------- persons : orca.Table The persons table. Returns ------- pandas.DataFrame DataFrame with columns: age_group, education_group, data_mean, data_std, mu, sigma. """ income_dist = ( persons.to_frame(["worker", "age_group", "education_group", "earning"])[ persons["worker"] == 1 ] .groupby(["age_group", "education_group"]) .agg(data_mean=("earning", "mean"), data_std=("earning", "std")) .reset_index() ) # Convert to the parameters of the underlying normal distribution mu = pd.Series( np.log( income_dist["data_mean"] ** 2 / np.sqrt(income_dist["data_std"] ** 2 + income_dist["data_mean"] ** 2) ), name="mu", ) sigma = pd.Series( np.sqrt( np.log(1 + income_dist["data_std"] ** 2 / income_dist["data_mean"] ** 2) ), name="sigma", ) income_dist = pd.concat([income_dist, mu, sigma], axis=1) return income_dist