Skip to content

expanded_optimizer — Scenario-Expanded Solver

Alternative formulation: each scenario gets its own set of wind+slack qubits (no superposition over scenarios). Simpler to understand, but exponentially larger register.

Legacy code

Written for Qiskit 0.x/1.x. Apply import fixes before running.


Optimizer_Expanded

QuantumExpectedValueFunctionProject.expanded_optimizer.Optimizer_Expanded

Optimizer_Expanded(system, encoding)

ctor. Take the PowerSystem we will be solving, reserve variables and qubits

Order the variables as gas, wind+slack(scenario1), wind+slack(scenario2), wind+slack(scenario3)...

Source code in QuantumExpectedValueFunctionProject/expanded_optimizer.py
def __init__(self, system, encoding):
    ''' ctor. 
        Take the PowerSystem we will be solving, reserve variables and qubits

        Order the variables as gas, wind+slack(scenario1), wind+slack(scenario2), wind+slack(scenario3)...
    '''
    assert(encoding == 'binary' or encoding == 'unary')
    self.system = system
    self.encoding = encoding
    self.num_scenarios = len(system.pdf.keys())
    self.num_variables = system.num_gas_generators + self.num_scenarios*system.num_wind_turbines + self.num_scenarios
    # get cost for each variable in this problem encoding
    self.variable_costs = system.gas_costs
    for i,_ in enumerate(system.scenarios):
        [self.variable_costs.append(cost) for cost in system.wind_costs]
        self.variable_costs.append(system.undersatisfied_cost)

    # declare variables
    # assign varids, expect order gas, wind(+slackscenario1), wind+slack(scenario2), ... 
    self.varids = list(range(self.num_variables))
    # Limit variable register max values
    # declare gas variables
    self.variables =[optimizer_utils.VariableRegister(system.decision_levels-1, encoding) for _ in range(system.num_gas_generators)]
    # declare a set of wind variables and the slack variable for each scenario
    for scenario in system.scenarios:
        # the wind turbines for this scenario
        for w in range(system.num_wind_turbines):
            # we can only use a hard-cap for unary encodings
            #if encoding == 'unary':
                self.variables.append(optimizer_utils.VariableRegister(scenario[w], encoding))
            #elif encoding == 'binary':
            #    # get the register-max for the scenario

        # slack variable for that scenario - the slack variable is always the last variable in the scenario
        self.variables.append(optimizer_utils.VariableRegister(self.system.demand, encoding))
    # map gas to varids
    self.gas_to_varids = {i : i for i in range(system.num_gas_generators)}
    # map scenario to varids
    self.scenario_to_varids = {i : list(range((system.num_wind_turbines+1)*i + system.num_gas_generators, 
                                              (system.num_wind_turbines+1)*i + system.num_gas_generators + system.num_wind_turbines + 1))
                               for i in range(self.num_scenarios)}
    # reserve qubits
    self.num_qubits = sum([var.width for var in self.variables])
    self.varid_to_qubits = {}
    qubit = 0
    for i,reg in enumerate(self.variables):
        self.varid_to_qubits[i] = list(range(qubit, qubit+reg.width))
        qubit += reg.width

Functions

solveAnnealing

solveAnnealing(time, method='QUBO', num_meas=10000, penalty=1)

solveAnnealing Solve the optimization problem with an annealing routine, specify if we use a Dicke state and constraint preserving mixer or QUBO with a penalty Hamiltonian

Source code in QuantumExpectedValueFunctionProject/expanded_optimizer.py
def solveAnnealing(self, time, method='QUBO', num_meas=10_000, penalty=1):
    ''' solveAnnealing
        Solve the optimization problem with an annealing routine, specify if we use 
        a Dicke state and constraint preserving mixer or QUBO with a penalty Hamiltonian
    '''
    if self.system.normalization is not None and penalty is not None:
        #penalty *= self.system.normalization[1]/self.system.normalization[0]
        penalty = self.system.normalize(penalty)
    demand = self.system.demand
    # NOTE this will require a bit more work
    if self.system.normalization is not None:# and penalty is not None:
        demand = self.system.normalize(demand)
    # warning
    #if phase!='PEN' and penalty is not None:
    #    print("WARNING: specified a penalty but the penalty cost Hamiltonian is not used")
    #if penalty is None and phase == 'PEN':
    #    print("ERROR: if PEN is specified (penalty Hamiltonian) we need a penalty specified")
    #    exit(1)

    qc = QuantumCircuit(self.num_qubits, self.num_qubits)
    if method == 'QUBO':
        for qubit in range(self.num_qubits):
            qc.h(qubit)
    else:
        print("Unimplemented annealing solution method: {}".format(method))
        return 0

    for t in range(time):
        f = (t+1)/(time+1)
        ####
        # cost operator
        ####
        # cost operator - gas
        for _,varid in self.gas_to_varids.items():
            reg = self.variables[varid]
            cost = self.variable_costs[varid]
            qc.append(reg.numberOperator(f*cost), self.varid_to_qubits[varid])
        # cost operator - second stage
        for scenario_id,varids in self.scenario_to_varids.items():
            # get the probability of this scenario
            pr = self.system.pdf[self.system.scenarios[scenario_id]]
            #print(scenario_id)
            # apply cost operator to each set of variables in this scenario
            for varid in varids:
                reg = self.variables[varid]
                cost = self.variable_costs[varid]
                #print(varid,cost)
                qc.append(reg.numberOperator(pr*f*cost), self.varid_to_qubits[varid])
        qc.barrier()

        # cost operator - penalty term
        # each scenario has an equlity constraint enforced with quadratic penalty
        # additionally, binary encodings need to enforce stochastic inequality constraints with a penalty
        for scenario_id, varids in self.scenario_to_varids.items():
            gas_varids = list(self.gas_to_varids.values())
            all_varids = gas_varids + varids
            # penalty constraint
            for j,varid_j in enumerate(all_varids):
                # -2*gamma*d*f * sum_j(N_j)
                qc.append(self.variables[varid_j].numberOperator(-2 * penalty * demand * f), 
                          self.varid_to_qubits[varid_j])#[::-1])
                # gamma*f * sum_j(N_j*N_j)
                qc.append(self.variables[varid_j].squaredOperator(penalty * f), 
                          self.varid_to_qubits[varid_j])#[::-1])
                # 2*gamma*f * sum_j<k(N_j*N_k)
                for varid_k in all_varids[j+1:]:
                    qc.append(self.variables[varid_j].productOperator(self.variables[varid_k], 2*penalty*f),
                              #(self.varid_to_qubits[varid_k]+self.varid_to_qubits[varid_j])[::-1])
                              #(self.varid_to_qubits[varid_k][::-1]+self.varid_to_qubits[varid_j][::-1]))
                              self.varid_to_qubits[varid_j]+self.varid_to_qubits[varid_k])
                              #self.varid_to_qubits[varid_k] + self.varid_to_qubits[varid_j])
            # stochastic inequality constraint
            continue
            if self.encoding == 'binary':
                for j,varid_j in enumerate(varids[:-1]):
                    # NOTE we assume j is that variables position within the pdf
                    qc.append(self.variables[varid_j].lessThanValue(self.system.scenarios[scenario_id][j], 
                                                                    self.system.undersatisfied_cost*f),
                                                                    #penalty*f),
                              self.varid_to_qubits[varid_j])#[::-1])
        #break
        qc.barrier()
        ####
        # mixing operator
        ####
        if method == 'QUBO':
            for i in range(self.num_qubits):
                qc.rx(1-f, i)
        else:
            print("Unimplemented mixing operator for method: {}".format(method))
    qc.measure(list(range(self.num_qubits)), list(range(self.num_qubits)))

    # Transpile for simulator
    simulator = Aer.get_backend('aer_simulator')
    qc = transpile(qc, simulator)
    #print(qc)

    # Run and get statevector
    result = simulator.run(qc, shots=num_meas).result()
    counts = result.get_counts(qc)
    return counts