ctor.
Take the PowerSystem we will be solving, reserve variables and qubits
Order the variables as gas, wind+slack(scenario1), wind+slack(scenario2), wind+slack(scenario3)...
Source code in QuantumExpectedValueFunctionProject/expanded_optimizer.py
| def __init__(self, system, encoding):
''' ctor.
Take the PowerSystem we will be solving, reserve variables and qubits
Order the variables as gas, wind+slack(scenario1), wind+slack(scenario2), wind+slack(scenario3)...
'''
assert(encoding == 'binary' or encoding == 'unary')
self.system = system
self.encoding = encoding
self.num_scenarios = len(system.pdf.keys())
self.num_variables = system.num_gas_generators + self.num_scenarios*system.num_wind_turbines + self.num_scenarios
# get cost for each variable in this problem encoding
self.variable_costs = system.gas_costs
for i,_ in enumerate(system.scenarios):
[self.variable_costs.append(cost) for cost in system.wind_costs]
self.variable_costs.append(system.undersatisfied_cost)
# declare variables
# assign varids, expect order gas, wind(+slackscenario1), wind+slack(scenario2), ...
self.varids = list(range(self.num_variables))
# Limit variable register max values
# declare gas variables
self.variables =[optimizer_utils.VariableRegister(system.decision_levels-1, encoding) for _ in range(system.num_gas_generators)]
# declare a set of wind variables and the slack variable for each scenario
for scenario in system.scenarios:
# the wind turbines for this scenario
for w in range(system.num_wind_turbines):
# we can only use a hard-cap for unary encodings
#if encoding == 'unary':
self.variables.append(optimizer_utils.VariableRegister(scenario[w], encoding))
#elif encoding == 'binary':
# # get the register-max for the scenario
# slack variable for that scenario - the slack variable is always the last variable in the scenario
self.variables.append(optimizer_utils.VariableRegister(self.system.demand, encoding))
# map gas to varids
self.gas_to_varids = {i : i for i in range(system.num_gas_generators)}
# map scenario to varids
self.scenario_to_varids = {i : list(range((system.num_wind_turbines+1)*i + system.num_gas_generators,
(system.num_wind_turbines+1)*i + system.num_gas_generators + system.num_wind_turbines + 1))
for i in range(self.num_scenarios)}
# reserve qubits
self.num_qubits = sum([var.width for var in self.variables])
self.varid_to_qubits = {}
qubit = 0
for i,reg in enumerate(self.variables):
self.varid_to_qubits[i] = list(range(qubit, qubit+reg.width))
qubit += reg.width
|
Functions
solveAnnealing
solveAnnealing(time, method='QUBO', num_meas=10000, penalty=1)
solveAnnealing
Solve the optimization problem with an annealing routine, specify if we use
a Dicke state and constraint preserving mixer or QUBO with a penalty Hamiltonian
Source code in QuantumExpectedValueFunctionProject/expanded_optimizer.py
| def solveAnnealing(self, time, method='QUBO', num_meas=10_000, penalty=1):
''' solveAnnealing
Solve the optimization problem with an annealing routine, specify if we use
a Dicke state and constraint preserving mixer or QUBO with a penalty Hamiltonian
'''
if self.system.normalization is not None and penalty is not None:
#penalty *= self.system.normalization[1]/self.system.normalization[0]
penalty = self.system.normalize(penalty)
demand = self.system.demand
# NOTE this will require a bit more work
if self.system.normalization is not None:# and penalty is not None:
demand = self.system.normalize(demand)
# warning
#if phase!='PEN' and penalty is not None:
# print("WARNING: specified a penalty but the penalty cost Hamiltonian is not used")
#if penalty is None and phase == 'PEN':
# print("ERROR: if PEN is specified (penalty Hamiltonian) we need a penalty specified")
# exit(1)
qc = QuantumCircuit(self.num_qubits, self.num_qubits)
if method == 'QUBO':
for qubit in range(self.num_qubits):
qc.h(qubit)
else:
print("Unimplemented annealing solution method: {}".format(method))
return 0
for t in range(time):
f = (t+1)/(time+1)
####
# cost operator
####
# cost operator - gas
for _,varid in self.gas_to_varids.items():
reg = self.variables[varid]
cost = self.variable_costs[varid]
qc.append(reg.numberOperator(f*cost), self.varid_to_qubits[varid])
# cost operator - second stage
for scenario_id,varids in self.scenario_to_varids.items():
# get the probability of this scenario
pr = self.system.pdf[self.system.scenarios[scenario_id]]
#print(scenario_id)
# apply cost operator to each set of variables in this scenario
for varid in varids:
reg = self.variables[varid]
cost = self.variable_costs[varid]
#print(varid,cost)
qc.append(reg.numberOperator(pr*f*cost), self.varid_to_qubits[varid])
qc.barrier()
# cost operator - penalty term
# each scenario has an equlity constraint enforced with quadratic penalty
# additionally, binary encodings need to enforce stochastic inequality constraints with a penalty
for scenario_id, varids in self.scenario_to_varids.items():
gas_varids = list(self.gas_to_varids.values())
all_varids = gas_varids + varids
# penalty constraint
for j,varid_j in enumerate(all_varids):
# -2*gamma*d*f * sum_j(N_j)
qc.append(self.variables[varid_j].numberOperator(-2 * penalty * demand * f),
self.varid_to_qubits[varid_j])#[::-1])
# gamma*f * sum_j(N_j*N_j)
qc.append(self.variables[varid_j].squaredOperator(penalty * f),
self.varid_to_qubits[varid_j])#[::-1])
# 2*gamma*f * sum_j<k(N_j*N_k)
for varid_k in all_varids[j+1:]:
qc.append(self.variables[varid_j].productOperator(self.variables[varid_k], 2*penalty*f),
#(self.varid_to_qubits[varid_k]+self.varid_to_qubits[varid_j])[::-1])
#(self.varid_to_qubits[varid_k][::-1]+self.varid_to_qubits[varid_j][::-1]))
self.varid_to_qubits[varid_j]+self.varid_to_qubits[varid_k])
#self.varid_to_qubits[varid_k] + self.varid_to_qubits[varid_j])
# stochastic inequality constraint
continue
if self.encoding == 'binary':
for j,varid_j in enumerate(varids[:-1]):
# NOTE we assume j is that variables position within the pdf
qc.append(self.variables[varid_j].lessThanValue(self.system.scenarios[scenario_id][j],
self.system.undersatisfied_cost*f),
#penalty*f),
self.varid_to_qubits[varid_j])#[::-1])
#break
qc.barrier()
####
# mixing operator
####
if method == 'QUBO':
for i in range(self.num_qubits):
qc.rx(1-f, i)
else:
print("Unimplemented mixing operator for method: {}".format(method))
qc.measure(list(range(self.num_qubits)), list(range(self.num_qubits)))
# Transpile for simulator
simulator = Aer.get_backend('aer_simulator')
qc = transpile(qc, simulator)
#print(qc)
# Run and get statevector
result = simulator.run(qc, shots=num_meas).result()
counts = result.get_counts(qc)
return counts
|