Source code for demos.models.fatality
import orca
import numpy as np
import pandas as pd
from templates import estimated_models, modelmanager as mm
import time
from logging_logic import log_execution_time
from config import DEMOSConfig, get_config
from templates.utils.models import columns_in_formula
STEP_NAME = "fatality"
REQUIRED_COLUMNS = [
"persons.MAR",
"persons.relate",
]
[docs]
@orca.step(STEP_NAME)
def fatality(persons, households, relational_adjustment_mapping, graveyard):
"""
Simulate mortality events and update the persons and households tables.
This step applies the `mortality` estimated model to determine which persons die in the current year.
It removes deceased persons from the persons table, updates marital status and household relationships,
and moves the deceased to the graveyard table. The `relate` column is updated using the relational
adjustment mapping to ensure consistency.
Parameters
----------
persons : orca.Table
The persons table containing individual-level attributes.
households : orca.Table
The households table containing household-level attributes.
relational_adjustment_mapping : orca.Table
Table mapping old relationship codes to new ones after a head of household dies.
graveyard : orca.Table
Table for storing records of deceased individuals.
Returns
-------
None
Notes
-----
- Modifies `persons.MAR`, `persons.relate`, and removes rows from `persons` and `households` tables.
- Adds rows to the `graveyard` table for deceased individuals.
- Updates marital status: surviving spouses/partners become widowed.
- If a household head dies, a partner or the oldest remaining member becomes the new head.
- The `relate` column for all household members is updated using the mapping table.
- Some errors (e.g., multiple spouses per household) are handled silently.
"""
start_time = time.time()
# persons["dead"] = -99
fatality_list = run_and_calibrate_mortality_model(persons)
# Updates necessary:
## - Remove rows from persons table where fatality_list == 1
## - Update Marital status: Married people become Widows if the spouse dies
## - Update Relate column if head died:
### If there is a relate==1 or relate==13 person alive, they are the new head
### Otherwise, oldest person is new head
### In all cases, we need to use the `relational_adjustment_mapping` table to map the rest of the relate columns
fatality_list_idx = fatality_list.astype(bool).reindex(persons.local.index)
dead_people_slice = persons.local.loc[fatality_list_idx]
households_with_dead_people = dead_people_slice.household_id.unique()
persons_in_relevant_household_index = persons["household_id"].isin(
households_with_dead_people
)
# Update Marital status
## If dead person is spouse or partner, head of household is now widow
dead_partners_households = dead_people_slice[
(dead_people_slice.relate == 1) | (dead_people_slice.relate == 13)
]["household_id"].values
## Update widow heads that are still alive
persons.local.loc[
~fatality_list_idx
& persons["household_id"].isin(dead_partners_households)
& (persons["relate"] == 0),
"MAR",
] = 2
# If dead person is head, spouse or partner is now widow
dead_heads_households = dead_people_slice[dead_people_slice.relate == 0][
"household_id"
]
## Update widow partners that are still alive
persons.local.loc[
~fatality_list_idx
& persons["household_id"].isin(dead_heads_households)
& ((persons["relate"] == 1) | (persons["relate"] == 13)),
"MAR",
] = 2
# Updates to `relate`
## Select all the person_id's of alive people where the head died
## They are the new heads
partner_to_head_household_ids = persons.local.loc[
~fatality_list_idx
& persons["relate"].isin([1, 13])
& persons["household_id"].isin(dead_heads_households)
]["household_id"]
partner_to_head_ids = partner_to_head_household_ids.index
## Before modifying the dataframe, select the new heads for the rest of households (by age)
rest_to_head_households = set(dead_heads_households) - set(
partner_to_head_household_ids
)
rest_to_head_ids = (
persons.local.loc[
~fatality_list_idx & persons["household_id"].isin(rest_to_head_households)
]
.groupby("household_id")["age"]
.idxmax()
.values
)
## We need to update the relate column of all the people where the head died
### We only need to do this for the "rest to head" because partners keep the same relation as the previous head
new_heads_ids = rest_to_head_ids.tolist()
rest_to_head_all_filter = (
persons["household_id"].isin(rest_to_head_households) & ~fatality_list_idx
)
new_heads_old_relate_by_hh = persons.local.loc[
new_heads_ids, ["household_id", "relate"]
].set_index("household_id")["relate"]
head_old_relate_by_person_id = persons.local.loc[
rest_to_head_all_filter
].household_id.map(new_heads_old_relate_by_hh)
person_old_relate_by_person_id = persons.local.loc[rest_to_head_all_filter].relate
### In order to efficiently access the relational_adjustment_mapping dataframe, we transform relate values into
### indices to the numpy representation of the dataframe
rel_map_columns = relational_adjustment_mapping.to_frame().columns
rel_map_index = relational_adjustment_mapping.to_frame().index
#### First: The column value that we should query corresponds to the
#### old relate value of the new household head
#### NOTE: the str transformation might need to be changed in the future if rel_map is changed
old_head_relate_index = rel_map_columns.get_indexer(
head_old_relate_by_person_id.astype(str)
)
#### Second: The row value corresponds with the old relate of the
#### person changing relate (everyone but the new head)
old_person_relate_index = rel_map_index.get_indexer(
person_old_relate_by_person_id.values
)
#### With both of these we can now get the new relate values
#### (This step also replaces the relate value of the new heads,
#### we take care of that after this processing)
#### TODO: Here we can create spouses without correct marital status
persons.local.loc[
rest_to_head_all_filter, "relate"
] = relational_adjustment_mapping.to_frame().values[
old_person_relate_index, old_head_relate_index
]
## Update relate of new heads
persons.local.loc[
rest_to_head_ids.tolist() + partner_to_head_ids.to_list(), "relate"
] = 0
# Finally, remove dead people from the persons dataframe and move them to graveyard
dead_people = persons.local.loc[fatality_list_idx].copy()
graveyard.local = pd.concat([graveyard.local, dead_people])
persons.local = persons.local[~fatality_list_idx]
# TODO: This needs to be reevaluated after the refactoring
spouses_per_hh = (persons.relate == 1).groupby(persons.household_id).sum()
persons.local = persons.local.loc[
~persons.household_id.isin(spouses_per_hh[spouses_per_hh > 1].index)
]
households.local = households.local.reindex(sorted(persons.household_id.unique()))
log_execution_time(start_time, orca.get_injectable("year"), "mortality")
# TODO: Refactor this
def run_and_calibrate_mortality_model(persons):
# Load calibration config
demos_config: DEMOSConfig = get_config()
calibration_procedure = demos_config.mortality_module_config.calibration_procedure
# Get model data
model = mm.get_step("mortality")
model_variables = columns_in_formula(model.model_expression)
model_data = persons.to_frame(model_variables)
# Calibrate if needed
if calibration_procedure is not None:
return calibration_procedure.calibrate_and_run_model(model, model_data)
return model.predict(model_data)