"""
========================================================
Solver Applications in Optimization and Network Modeling
========================================================
The provided code constitutes a comprehensive framework for research and development in the fields of artificial intelligence and computational biology. Its architecture consists of specialized modules addressing key aspects of modeling, machine learning, and optimization, with an emphasis on code robustness and reusability.
From a technical perspective, the code includes:
Training Data Generation: Functionality for generating synthetic data and manipulating real datasets, with advanced options for controlled noise introduction and simulation of specific scenarios.
Machine Learning Model Development: Tools for constructing and customizing machine learning models, including deep neural network architectures and supervised and unsupervised learning algorithms.
Performance Evaluation: Capabilities for systematically evaluating model performance under various conditions, using statistical metrics and visualizations to analyze prediction quality and generalization ability.
Optimization Solver Comparison: Functionality for comparing different optimization methods used in solving specific problems, with tools for measuring convergence, computational efficiency, and scalability.
The underlying technical approach is based on the efficient implementation of algorithms and data structures, leveraging high-performance software libraries, and adopting software engineering practices such as modularity, encapsulation, and detailed documentation.
"""
from scipy.sparse import coo_matrix, eye, hstack
from tensorflow.keras.callbacks import Callback
from google.colab import drive
from tensorflow.python.ops.gen_math_ops import imag_eager_fallback
from tensorflow.python.ops.nn_ops import softmax
from keras import backend as k
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.gridspec import GridSpec
from IPython import display
from scipy.optimize import linprog
from sklearn.metrics import mean_squared_error, mean_absolute_error, mean_absolute_percentage_error
import numpy as np
import matplotlib. pyplot as plt
from keras.layers import Activation, Dense, Input, BatchNormalization, Dropout, Conv1D, Flatten, MaxPool1D, Dot, Reshape, Conv2D, Concatenate, ReLU, Lambda, MaxPooling2D, Normalization
import cvxpy as cp
import tensorflow as tf
from keras import Model
from IPython import display
from tensorflow.keras import layers
from tensorflow.keras import regularizers
import warnings
import pandas as pd
from IPython import display
from keras.models import Sequential
import os
from pathlib import Path
import scipy.io as sio
import time
mse = mean_squared_error
mae = mean_absolute_error
mape = mean_absolute_percentage_error
warnings.filterwarnings("ignore")
[docs]def gen_w(Data_inf, Data_W, Data_T, Data_C, Data_Dc, Data_sto):
"""
Generates a matrix combining information from wells, pipes, compressors, users, and storages.
Parameters
----------
Data_inf : List
Information data.
Data_W : pd.DataFrame
Data related to wells.
Data_T : pd.DataFrame
Data related to pipes.
Data_C : pd.DataFrame
Data related to compressors.
Data_Dc : pd.DataFrame
Data related to users.
Data_sto : pd.DataFrame
Data related to storages.
Returns
-------
np.ndarray
A matrix combining all the information.
Raises
------
ValueError
If any input data is not in the expected format or type.
"""
if not all(isinstance(data, pd.DataFrame) for data in [Data_W, Data_T, Data_C, Data_Dc, Data_sto]):
raise ValueError("All data inputs must be pandas DataFrames")
num_nodes = len(Data_inf)
num_wells = len(Data_W)
wells_matrix = coo_matrix((np.ones(
num_wells, ), (Data_W['node'] - 1, np.arange(num_wells))), shape=(num_nodes, num_wells))
num_pipes = len(Data_T)
pipe_data = np.concatenate(
(-1.0 * np.ones(num_pipes), np.ones(num_pipes)))
pipe_row = pd.concat((Data_T['fnode'] - 1, Data_T['tnode'] - 1))
pipe_col = np.concatenate(2 * [np.arange(num_pipes)])
pipes_matrix = coo_matrix(
(pipe_data, (pipe_row, pipe_col)), shape=(num_nodes, num_pipes))
num_compressors = len(Data_C)
compressor_data = np.concatenate(
(-1.0 * np.ones(num_compressors), np.ones(num_compressors)))
compressor_row = pd.concat((Data_C['fnode'] - 1, Data_C['tnode'] - 1))
compressor_col = np.concatenate(2 * [np.arange(num_compressors)])
compressors_matrix = coo_matrix(
(compressor_data, (compressor_row, compressor_col)), shape=(num_nodes, num_compressors))
num_users = len(Data_Dc.T)
users_matrix = hstack(num_users * [eye(num_nodes)])
num_storages = len(Data_sto)
storage_matrix = coo_matrix((np.ones(num_storages, ), (
Data_sto['node'] - 1, np.arange(num_storages))), shape=(num_nodes, num_storages))
storage_matrix = hstack([storage_matrix, -1.0 * storage_matrix])
result_matrix = hstack((wells_matrix, pipes_matrix,
compressors_matrix, users_matrix, storage_matrix)).toarray()
return result_matrix
[docs]class CustomSigmoidActivation():
[docs] def __init__(self, lower_bound: float, upper_bound: float):
"""
Initializes the CustomSigmoidActivation with specified bounds.
Parameters
----------
lower_bound : float
The lower limit for the activation function.
upper_bound : float
The upper limit for the activation function.
"""
self.lower_bound = tf.cast(lower_bound, tf.float32)
self.upper_bound = tf.cast(upper_bound, tf.float32)
[docs] def __call__(self, x: tf.Tensor) -> tf.Tensor:
"""
Apply the custom sigmoid activation function to the input.
Parameters
----------
x : tf.Tensor
The input tensor to apply the activation function.
Returns
-------
tf.Tensor
The output tensor after applying the activation function.
"""
x = tf.cast(x, tf.float32)
x = k.sigmoid(x) * (self.upper_bound -
self.lower_bound) + self.lower_bound
return x
[docs] def get_config(self) -> dict:
"""
Returns the configuration of the CustomSigmoidActivation function.
Returns
-------
dict
A dictionary containing the configuration: lower and upper bounds.
"""
return {
"lower_bound": self.lower_bound.numpy(),
"upper_bound": self.upper_bound.numpy(),
}
[docs]def split(feature_data, output_data):
"""
Splits the feature and output data into training and test sets.
Parameters
----------
feature_data : array-like
The input features data.
output_data : array-like
The output data corresponding to the input features.
Returns
-------
tuple
A tuple containing split data: (X_train, X_test, y_train, y_test)
"""
X_train, X_test, y_train, y_test = train_test_split(
feature_data, output_data, test_size=0.2)
return X_train, X_test, y_train, y_test
[docs]def create_customized_dense_layer(input_tensor, layer_info, layer_name, activation_function, initializer):
"""
Creates a custom dense layer using the Keras Dense function.
Parameters
----------
input_tensor : Tensor
The input tensor for the dense layer.
layer_info : list
A list containing specific information for constructing the dense layer.
layer_name : str
The name of the dense layer.
activation_function : callable
The activation function to be applied to the output of the dense layer.
initializer : Initializer
The initializer for the kernel weights of the dense layer.
Returns
-------
Tensor
The output tensor from the dense layer.
"""
num_neurons = len(layer_info)
dense_layer = Dense(num_neurons, name=layer_name, kernel_initializer=initializer,
activation=activation_function)(input_tensor)
return dense_layer
[docs]def create_custom_pres_layer(input_tensor, lower_bound, upper_bound, layer_name, initializer):
"""
Creates a custom dense layer with a CustomSigmoidActivation activation function.
Parameters
----------
input_tensor : Tensor
The input tensor for the dense layer.
lower_bound : float
The lower bound for the CustomSigmoidActivation function.
upper_bound : float
The upper bound for the CustomSigmoidActivation function.
layer_name : str
The name of the dense layer.
initializer : Initializer
The initializer for the kernel weights of the dense layer.
Returns
-------
Tensor
The output tensor from the dense layer.
"""
num_neurons = len(lower_bound)
custom_activation = CustomSigmoidActivation(lower_bound, upper_bound)
dense_layer = Dense(num_neurons, name=layer_name, kernel_initializer=initializer,
activation=custom_activation)(input_tensor)
return dense_layer
[docs]class LearningRateReducer(Callback):
"""
A custom callback to reduce the learning rate based on epoch thresholds.
Attributes
----------
epoch_threshold : list of int
Epoch numbers at which the learning rate should be reduced.
Methods
-------
on_epoch_begin(epoch, logs=None):
Reduces the learning rate if the current epoch is in the epoch_threshold.
"""
[docs] def __init__(self, epoch_threshold):
"""
Initializes the LRReducer callback with specified epoch thresholds.
Parameters
----------
epoch_threshold : list of int
Epoch numbers at which the learning rate should be reduced.
"""
super(LearningRateReducer, self).__init__()
self.epoch_threshold = epoch_threshold
[docs] def on_epoch_begin(self, epoch, logs=None):
"""
Called at the beginning of an epoch during training.
Parameters
----------
epoch : int
The current epoch number.
logs : dict, optional
Currently no logs are used in this method.
"""
if epoch in self.epoch_threshold:
lr = float(self.model.optimizer.lr)
new_lr = lr * 0.1
self.model.optimizer.lr.assign(new_lr)
[docs]class WellDense(tf.keras.layers.Layer):
"""
A custom Keras layer that applies a threshold to the sum of inputs.
Attributes
----------
well : tf.Variable
The threshold value for the sum of inputs.
Methods
-------
call(inputs):
Computes the sum of inputs and applies the threshold.
get_config():
Returns the layer's configuration.
"""
[docs] def __init__(self, well=80.0, **kwargs):
"""
Initializes the WellDense layer with a specified threshold.
Parameters
----------
well : float, optional
The threshold value for the sum of inputs.
"""
super().__init__(**kwargs)
self.well = tf.Variable(
initial_value=well, trainable=False, dtype=tf.float32)
[docs] def call(self, inputs):
"""
Computes the sum of inputs and applies the threshold.
Parameters
----------
inputs : Tensor
The input tensor to the layer.
Returns
-------
Tensor
The output tensor after applying the threshold.
"""
x = inputs
w = tf.reduce_sum(x, axis=1, keepdims=True)
w = tf.where(w >= self.well, self.well, w)
return w
[docs] def get_config(self):
"""
Returns the configuration of the WellDense layer.
Returns
-------
dict
A dictionary containing the configuration: well.
"""
return {
"well": self.well.numpy(),
}
[docs]class RestrictedDense(tf.keras.layers.Layer):
"""
A custom dense layer with constraints on its output.
Attributes
----------
initializer : Initializer
The initializer for the kernel weights of the layer.
well : tf.Variable
A threshold value used in the layer's computations.
units : int
The number of units in the dense layer.
dense : tf.keras.layers.Dense
The underlying dense layer.
Methods
-------
call(inputs):
Computes the layer's output with the given inputs.
get_config():
Returns the layer's configuration.
"""
[docs] def __init__(self, initializer, well, units, **kwargs):
"""
Initializes the RestrictedDense layer.
Parameters
----------
initializer : Initializer
The initializer for the kernel weights of the layer.
well : float
A threshold value used in the layer's computations.
units : int
The number of units in the dense layer.
"""
super().__init__(**kwargs)
self.initializer = initializer
self.units = units
self.well = tf.Variable(
initial_value=well, trainable=False, dtype=tf.float32)
self.dense = tf.keras.layers.Dense(
self.units, activation='elu', kernel_initializer=self.initializer)
[docs] def call(self, inputs):
"""
Computes the output of the layer with given inputs.
Parameters
----------
inputs : tuple of Tensors
Inputs to the layer: (x, In, fw).
Returns
-------
Tensor
The output tensor of the layer.
"""
x, In, fw = inputs
x = self.dense(x)
x = tf.where(x < 0.0, 0.0, x)
x = tf.where(x < In, x, In)
mask = tf.where(fw >= self.well, 1.0, 0.0)
x = x * mask
return x
[docs] def get_config(self):
"""
Returns the configuration of the RestrictedDense layer.
Returns
-------
dict
A dictionary containing the configuration.
"""
return {
"initializer": self.initializer,
"well": self.well.numpy(),
"units": self.units,
}
[docs]def mish(x):
"""
Mish Activation Function.
Applies the mish function element-wise to the input Tensor.
Parameters
----------
x : Tensor
The input tensor.
Returns
-------
Tensor
The transformed tensor after applying the mish activation function.
"""
return x * tf.math.tanh(tf.math.softplus(x))
[docs]def swish(x):
"""
Swish Activation Function.
Applies the swish function element-wise to the input Tensor.
Parameters
----------
x : Tensor
The input tensor.
Returns
-------
Tensor
The transformed tensor after applying the swish activation function.
"""
return x * tf.sigmoid(x)
[docs]class CustomLossFuntion(tf.keras.losses.Loss):
"""
A custom loss function that computes a loss based on specific criteria
and includes a penalty term for certain conditions.
Attributes
----------
Kt : float
A coefficient used in the loss computation.
i : int
Index used in gathering elements from predictions.
j : int
Another index used in gathering elements from predictions.
Bc : float
A boundary condition used in the penalty term.
ic : int
Index for penalty computation.
jc : int
Another index for penalty computation.
find : int, optional
A fixed index to slice the predictions for certain operations.
"""
[docs] def __init__(self, Kt, i, j, Bc, ic, jc, find=6):
"""
Initializes the object with given parameters.
Parameters
----------
Kt : float
Constant value.
i : int
Integer value.
j : int
Integer value.
Bc : float
Boundary condition value.
ic : int
Integer value.
jc : int
Integer value.
find : int, optional
An integer defining something, by default 6.
Returns
-------
None
Notes
-----
This function initializes the object with the provided parameters and sets up
initial values for some internal variables such as bounds.
"""
super().__init__()
self.Kt = Kt
self.i = i
self.j = j
self.ic = ic
self.jc = jc
self.find = find
self.lower_bound = tf.constant(1.0, dtype=tf.float32)
self.upper_bound = tf.constant(Bc, dtype=tf.float32)
[docs] def call(self, y_true, y_pred):
"""
Computes the custom loss between true labels and predicted labels.
Parameters
----------
y_true : Tensor
True labels.
y_pred : Tensor
Predicted labels.
Returns
-------
Tensor
Computed loss value.
"""
f = tf.cast(y_pred[:, :self.find], tf.float32)
P = tf.cast(y_pred[:, self.find:], tf.float32)
pi = tf.gather(P, self.i, axis=1)
pj = tf.gather(P, self.j, axis=1)
loss = tf.keras.losses.huber(f, tf.sign(
pi**2 - pj**2) * tf.sqrt(self.Kt * tf.abs(pi**2 - pj**2)))
P_ic = tf.gather(P, self.ic, axis=1)
P_jc = tf.gather(P, self.jc, axis=1)
Penalty = tf.reduce_mean(
tf.where(P_jc >= P_ic, 0.0, tf.square(P_jc - P_ic)))
return loss + Penalty
[docs] def get_config(self):
"""
Returns the configuration of the custom loss function.
Returns
-------
dict
Configuration dictionary.
"""
config = super().get_config()
config.update({
'Kt': self.Kt,
'i': self.i,
'j': self.j,
'ic': self.ic,
'jc': self.jc,
'lower_bound': self.lower_bound.numpy(),
'upper_bound': self.upper_bound.numpy(),
'find': self.find
})
return config
[docs]def flow_model(path, fd, seeds=1, s=1):
"""
Builds and compiles a TensorFlow Keras model based on data from an Excel file and additional parameters.
Parameters
----------
path : str
Path to the Excel file containing the model's data.
fd : array-like
Additional flow data for the model.
seeds : int, optional
Seed for random number generation for reproducibility, by default 1.
s : int, optional
Mode switch for the model configuration, by default 1.
Returns
-------
tensorflow.keras.Model
The compiled Keras model.
Notes
-----
This function constructs and compiles a TensorFlow Keras model based on data extracted from an Excel file
specified by `path`. Additionally, it accepts additional flow data (`fd`) to incorporate into the model.
The `seeds` parameter is used to set a seed for random number generation, ensuring reproducibility,
while `s` allows for configuration of the model based on different modes.
"""
seed = seeds
np.random.seed(seed)
tf.random.set_seed(seed)
Data_inf = pd.read_excel(path, sheet_name='node.info')
Data_D = pd.read_excel(path, sheet_name='node.dem')
Data_Dc = pd.read_excel(path, sheet_name='node.demcost')
Data_W = pd.read_excel(path, sheet_name='well')
Data_T = pd.read_excel(path, sheet_name='pipe')
Data_C = pd.read_excel(path, sheet_name='comp')
Data_sto = pd.read_excel(path, sheet_name='sto')
w = gen_w(Data_inf, Data_W, Data_T, Data_C, Data_Dc, Data_sto)
N = w.shape[0]
initializer = tf.keras.initializers.GlorotNormal(seed=seed)
In = Input(shape=(N,))
N1 = 8
act = mish
x = In
for units in [N1 * 4, N1 * 8, N1 * 16, N1 * 32]:
x = Dense(units, activation=act, kernel_initializer=initializer)(x)
x = Normalization()(x)
F = []
if s == 0:
Li = np.concatenate((Data_T['Fg_min'].values, [0] * len(Data_C), Data_D['Res'].values * 0.0, Data_D['Ind'].values * 0.0, Data_D['Com'].values * 0.0,
Data_D['NGV'].values * 0.0, Data_D['Ref'].values,
Data_D['Pet'].values * 0.0, [0] * len(Data_sto.values) * 2))
Ls = np.concatenate((Data_T['Fg_max'].values, Data_C['fmaxc'].values, fd, Data_D['Ind'].values * 0.0, Data_D['Com'].values * 0.0, Data_D['NGV'].values * 0.0, Data_D['Ref'].values * 0.0, Data_D['Pet'].values * 0.0,
(Data_sto['V0'].values - Data_sto['V_max'].values) * 0.0, (Data_sto['Vmax'].values - Data_sto['V0'].values) * 0.0))
F = []
F0 = WellDense(well=Data_W['Imax'].values[0], name='F_0')(In)
F1 = create_customized_dense_layer(
x, Li, 'F_1', CustomSigmoidActivation(Li, Ls), initializer)
F = Concatenate(name='F')([F0, F1]) # ,F2,F3])
elif s == 1:
fdin = len(Data_W['Imin'].values) + \
len([0] * len(Data_T)) + len([0] * len(Data_C))
fdfn = len([0] * len(Data_sto.values) * 2) + \
len(Data_inf['Pmin'].values)
F0 = WellDense(well=Data_W['Imax'].values[0], name='F_0')(In)
Li = Data_T['Fg_min'].values
Ls = Data_T['Fg_max'].values
F1 = create_customized_dense_layer(
x, Li, 'F_1', CustomSigmoidActivation(Li, Ls), initializer)
Li = [0] * len(Data_C)
Ls = Data_C['fmaxc'].values
F2 = create_customized_dense_layer(
x, Li, 'F_2', CustomSigmoidActivation(Li, Ls), initializer)
F3 = RestrictedDense(
initializer=initializer, well=Data_W['Imax'].values[0], name='F_3', units=len(Data_inf))([x, In, F0])
Li = np.concatenate((Data_D['Ind'].values * 0.0, Data_D['Com'].values * 0.0, Data_D['NGV'].values * 0.0, Data_D['Ref'].values * 0.0,
Data_D['Pet'].values * 0.0, [0] * len(Data_sto.values) * 2))
Ls = np.concatenate((Data_D['Ind'].values * 0.0, Data_D['Com'].values * 0.0, Data_D['NGV'].values * 0.0, Data_D['Ref'].values * 0.0, Data_D['Pet'].values * 0.0,
Data_sto['V0'].values - Data_sto['V_max'].values, Data_sto['Vmax'].values - Data_sto['V0'].values))
F4 = create_customized_dense_layer(
x, Li, 'F_4', CustomSigmoidActivation(Li, Ls), initializer)
F = Concatenate(name='F')([F0, F1, F2, F3, F4])
Out1 = tf.matmul(F, tf.constant(w.T, dtype=tf.float32))
Kt = tf.constant(Data_T['Kij'].values, dtype=tf.float32)
i = Data_T['fnode'].values - 1
j = Data_T['tnode'].values - 1
ftin = len(Data_W)
ftfn = len(Data_T)
f = F[:, ftin:ftfn + ftin]
Li = Data_inf['Pmin'].values
Ls = Data_inf['Pmax'].values
x1 = Dense(N1 * 4, act, kernel_initializer=initializer)(f)
x1 = Normalization()(x1)
x1 = Dense(N1 * 8, act, kernel_initializer=initializer)(x1)
x1 = Normalization()(x1)
x1 = Dense(N1 * 16, act, kernel_initializer=initializer)(x1)
x1 = Normalization()(x1)
x1 = Dense(N1 * 32, act, kernel_initializer=initializer)(x1)
x1 = Normalization()(x1)
ib = Data_C['fnode'].values - 1
jb = Data_C['tnode'].values - 1
P = create_custom_pres_layer(x1, Li, Ls, 'P', initializer)
Out2 = Concatenate()([f, P])
Out3 = P
Bc = Data_C['ratio'].values
model = Model(In, [Out1, Out2])
model.compile(loss=['huber', CustomLossFuntion(Kt, i, j, Bc, ib, jb, ftfn)],
optimizer=tf.keras.optimizers.Adamax(1e-2), loss_weights=[0.7, 0.3])
return model
[docs]def plots(Balance, Wey, re, Costos, s):
"""
Creates a set of boxplot visualizations for various data frames.
This function generates a 2x2 grid of boxplots, each representing data from
one of the provided DataFrames. The boxplots for 'Balance' and 'Costos' are
set to a logarithmic scale. The title of the plots changes based on the value of 's'.
Parameters
----------
Balance : DataFrame
DataFrame containing balance data.
Wey : DataFrame
DataFrame containing Weymouth data.
re : DataFrame
DataFrame containing ratio data.
Costos : DataFrame
DataFrame containing cost data.
s : int
Mode switch for the plot titles.
Returns
-------
None
Notes
-----
This function generates visualizations for the provided DataFrames in a 2x2 grid of boxplots.
The titles of the plots change based on the value of 's'. 'Balance' and 'Costos' DataFrames
are displayed in a logarithmic scale.
"""
# Create a 2x2 subplot grid
fig, axs = plt.subplots(2, 2, figsize=(29, 20))
fontsize = 20
rotation_angle = 45
# Configure the Balance boxplot
Balance.boxplot(ax=axs[0, 0], boxprops=dict(color='blue'), medianprops=dict(
color='orangered'), whiskerprops=dict(color='blue'))
axs[0, 0].set_yscale('log')
axs[0, 0].set_ylabel('MAPE', fontsize=fontsize)
axs[0, 0].set_xlabel('Network dimension', fontsize=fontsize)
axs[0, 0].tick_params(axis='x', labelsize=fontsize,
rotation=rotation_angle)
axs[0, 0].tick_params(axis='y', labelsize=fontsize)
# Configure the Wey boxplot
Wey.boxplot(ax=axs[0, 1], boxprops=dict(color='blue'), medianprops=dict(
color='orangered'), whiskerprops=dict(color='blue'))
axs[0, 1].set_yscale('log')
axs[0, 1].set_ylabel('MAPE', fontsize=fontsize)
axs[0, 1].set_xlabel('Network dimension', fontsize=fontsize)
axs[0, 1].tick_params(axis='x', labelsize=fontsize,
rotation=rotation_angle)
axs[0, 1].tick_params(axis='y', labelsize=fontsize)
# Configure the re boxplot
re.boxplot(ax=axs[1, 0], boxprops=dict(color='blue'), medianprops=dict(
color='orangered'), whiskerprops=dict(color='blue'))
axs[1, 0].set_ylabel('MAPE', fontsize=fontsize)
axs[1, 0].set_xlabel('Network dimension', fontsize=fontsize)
axs[1, 0].tick_params(axis='x', labelsize=fontsize,
rotation=rotation_angle)
axs[1, 0].tick_params(axis='y', labelsize=fontsize)
# Configure the Costos boxplot
Costos.boxplot(ax=axs[1, 1], boxprops=dict(color='blue'), medianprops=dict(
color='orangered'), whiskerprops=dict(color='blue'))
axs[1, 1].set_yscale('symlog')
axs[1, 1].set_ylabel('Cost difference', fontsize=fontsize)
axs[1, 1].set_xlabel('Network dimension', fontsize=fontsize)
axs[1, 1].tick_params(axis='x', labelsize=fontsize,
rotation=rotation_angle)
axs[1, 1].tick_params(axis='y', labelsize=fontsize)
# Adjust layout and set the main title based on 's'
plt.tight_layout(rect=[0, 0.03, 1, 0.95])
if s == 0:
fig.suptitle('Without Dynamic Function', fontsize=24, y=1.05)
elif s == 1:
fig.suptitle('With Dynamic Function', fontsize=24, y=1.05)
# Show the plot
plt.show()
[docs]def load_data(path, key):
"""
Loads data from a MATLAB file specified by 'path' and extracts data based on 'key'.
Parameters
----------
path (str): The path to the MATLAB file.
key (str): The key corresponding to the data within the MATLAB file.
Returns
-------
list: A list containing the extracted data.
The function iterates through eight sets of data within the MATLAB file, extracting
each set and appending it to a list. The extracted data is expected to be in a
specific format within the MATLAB file.
"""
aux = []
cont = 0
for i in range(9):
aux.append([])
X_test = sio.loadmat(path)[key][i]
for j in X_test:
aux[cont].append(j[0])
cont = cont + 1
return aux
[docs]def evaluate_balance(FT, W, X):
"""
Evaluates the balance of predictions using Mean Absolute Percentage Error (MAPE).
Parameters
----------
FT : list
A list containing two sets of forecasted data.
W : array-like
Weight matrix used in the calculation.
X : array-like
Actual data for comparison.
Returns
-------
list
A list containing two lists of MAPE values for each forecast set in FT.
Notes
-----
This function computes the Mean Absolute Percentage Error (MAPE) between the actual data (X)
and each set of forecasted data in FT, using the provided weight matrix W.
"""
aux1 = [mape(X[i], FT[0][i] @ W.T) for i in range(len(FT[0]))]
aux2 = [mape(X[i], FT[1][i] @ W.T) for i in range(len(FT[1]))]
return [aux1, aux2]
[docs]def evaluate_weymouth(X, Y):
"""
Evaluates the Weymouth equation performance by calculating the MAPE (Mean Absolute Percentage Error)
between actual values (X) and predicted values (Y).
Parameters
----------
X : list of lists
Actual values for comparison.
Y : list of lists
Predicted values to be compared against X.
Returns
-------
list of lists
Calculated MAPE values for each set of comparisons.
Notes
-----
This function calculates the Mean Absolute Percentage Error (MAPE) for two sets of data
(indicated by indices 0 and 1) in X and Y.
"""
# Using list comprehensions for more concise code
aux1 = [mape(X[0][i], Y[0][i]) for i in range(len(X[0]))]
aux2 = [mape(X[1][i], Y[1][i]) for i in range(len(X[1]))]
return [aux1, aux2]
[docs]def evaluate_cost(X, Y):
"""
Calculates the difference between predicted costs and actual costs.
Parameters
----------
X : list
A list of actual cost values.
Y : list
A list of predicted cost values.
Returns
-------
list
A list of differences between predicted and actual costs.
Notes
-----
This function iterates over each element in X and Y, calculates the
difference (Y[i] - X[i]) for each corresponding element, and returns
these differences in a list.
"""
return [Y[i] - X[i] for i in range(len(X))]
[docs]class TimeHistoryTest(Callback):
"""
A custom callback to track the time taken for predictions during model inference.
Attributes
----------
prediction_times : list
A list to store the time taken for each prediction batch.
Methods
-------
on_predict_begin(self, logs=None):
Called at the beginning of prediction.
on_predict_batch_begin(self, batch, logs=None):
Called at the beginning of a batch during prediction.
on_predict_batch_end(self, batch, logs=None):
Called at the end of a batch during prediction.
"""
[docs] def on_predict_begin(self, logs=None):
"""
Initializes the prediction_times list at the beginning of prediction.
Parameters
----------
logs : dict, optional
Dictionary containing the logs for the current prediction, by default None.
"""
self.prediction_times = []
[docs] def on_predict_batch_begin(self, batch, logs=None):
"""
Records the start time of a batch prediction.
Parameters
----------
batch : int
Index of the batch.
logs : dict, optional
Additional information about the batch, by default None.
"""
self.batch_time_start = time.time()
[docs] def on_predict_batch_end(self, batch, logs=None):
"""
Calculate and record the time taken for a batch prediction.
Parameters
----------
batch : int
Index of the batch.
logs : dict, optional
Additional information about the batch, by default None.
"""
batch_time_end = time.time() - self.batch_time_start
self.prediction_times.append(batch_time_end)
[docs]def plot_time(b='fit', s1='/content/drive/Shareddrives/red_gas_col/', s2='/content/drive/Shareddrives/red_gas_col/Prueba/Data/', s=1):
"""
Generates and displays boxplot visualizations for time measurements of different network models.
Parameters
----------
b : str, optional
Indicates the type of operation to perform, 'fit' for fitting or 'predict' for prediction, by default 'fit'.
s1 : str, optional
Base path for the file location, by default '/content/drive/Shareddrives/red_gas_col/'.
s2 : str, optional
Secondary path for data and model files, by default '/content/drive/Shareddrives/red_gas_col/Prueba/Data/'.
s : int, optional
Mode indicator for selecting model paths, by default 1.
Returns
-------
None
Notes
-----
The function first determines the file paths for the Excel and model files based on the mode 's'.
It then loads the test data and times data. If 'b' is 'predict', it iterates over models, performs
predictions, measures the time taken for predictions, and stores these times in a DataFrame.
This DataFrame is then used to create boxplot visualizations. If 'b' is not 'predict', it loads
time measurements from a CSV file and creates boxplots for these measurements.
The boxplots provide a comparison of time taken by different network models for predictions or fitting.
"""
s1_path = Path(s1)
s2_path = Path(s2)
filepath = ['ng_case8.xlsx', 'ng_case9.xlsx', 'ng_case10.xlsx', 'ng_case11.xlsx',
'ng_case12.xlsx', 'ng_case13.xlsx', 'ng_case14.xlsx', 'ng_case15.xlsx', 'ng_caseCol_18.xlsx']
if s == 0:
modelpath = ['model2_8.h5', 'model2_9.h5', 'model2_10.h5', 'model2_11.h5',
'model2_12.h5', 'model2_13.h5', 'model2_14.h5', 'model2_15.h5', 'model2_Col.h5']
elif s == 1:
modelpath = ['model8.h5', 'model9.h5', 'model10.h5', 'model11.h5',
'model12.h5', 'model13.h5', 'model14.h5', 'model15.h5', 'modelCol.h5']
X_test = load_data(s2_path / 'inputs_P.mat', 'inputs')
times = np.array(sio.loadmat(s2_path / 'times_P.mat')['times'])
fontsize = 20
rotation_angle = 45
if b == 'predict':
Times_p = pd.DataFrame()
for im in range(9):
path = s1_path / filepath[im]
fd = np.array(X_test[im]).max(axis=0)
model2 = flow_model(str(path), fd, seeds=1, s=s)
model2.load_weights(s2_path / modelpath[im])
time_callback = TimeHistoryTest()
predictions = model2.predict(np.array(X_test[im]), batch_size=1, callbacks=[
time_callback], verbose=False)
if im == 8:
Times_p['NN(Col_18)'] = time_callback.prediction_times
Times_p['S(Col_18)'] = extract_data(times, im)
else:
Times_p['NN(' + str(im + 8)
+ ')'] = time_callback.prediction_times
Times_p['S(' + str(im + 8) + ')'] = extract_data(times, im)
plt.figure(figsize=(20, 20))
Times_p.boxplot(boxprops=dict(color='blue'), medianprops=dict(
color='orangered'), whiskerprops=dict(color='blue'))
plt.yscale('log')
# Ajusta aquí el tamaño para la etiqueta del eje X
plt.xlabel('Network dimension', fontsize=20)
# Ajusta aquí el tamaño para la etiqueta del eje Y
plt.ylabel('Time[s]', fontsize=20)
# Configurar las etiquetas de los ejes y su tamaño de fuente
# Ajusta aquí el tamaño para los tick labels del eje X
plt.xticks(fontsize=20, rotation=45)
# Ajusta aquí el tamaño para los tick labels del eje Y
plt.yticks(fontsize=20)
plt.show()
else:
N = 15920
timesr = pd.read_csv(s2_path / 'Time.csv').values
im = pd.DataFrame()
for i in range(9):
if i == 8:
im['NN(Col_18)'] = timesr[:, i]
im['S(Col_18)'] = extract_data(times, i)
else:
im['NN(' + str(i + 8) + ')'] = timesr[:, i]
im['S(' + str(i + 8) + ')'] = extract_data(times, i)
plt.figure(figsize=(20, 20))
im.boxplot(boxprops=dict(color='blue'), medianprops=dict(
color='orangered'), whiskerprops=dict(color='blue'))
plt.yscale('log')
# Ajusta aquí el tamaño para la etiqueta del eje X
plt.xlabel('Network dimension', fontsize=20)
# Ajusta aquí el tamaño para la etiqueta del eje Y
plt.ylabel('Time[s]', fontsize=20)
# Configurar las etiquetas de los ejes y su tamaño de fuente
# Ajusta aquí el tamaño para los tick labels del eje X
plt.xticks(fontsize=20, rotation=45)
# Ajusta aquí el tamaño para los tick labels del eje Y
plt.yticks(fontsize=20)
plt.show()
[docs]def ng_case_evaluate(s1='/content/drive/Shareddrives/red_gas_col/', s2='/content/drive/Shareddrives/red_gas_col/Prueba/Data/', s=1):
"""
Evaluates multiple gas network cases using pre-trained models and generates evaluation metrics.
Parameters
----------
s1 : str, optional
Base directory path where the network case files are stored, by default '/content/drive/Shareddrives/red_gas_col/'.
s2 : str, optional
Directory path where input and output data files, and model files are stored, by default '/content/drive/Shareddrives/red_gas_col/Prueba/Data/'.
s : int, optional
Mode indicator (0 or 1) to choose between two sets of model paths, by default 1.
Returns
-------
tuple of DataFrames
A tuple of DataFrames containing evaluation metrics: (Balance, Weymouth, Pj/Pi, Cost).
Notes
-----
This function performs the following steps:
- Loads test input and output data.
- Based on the mode 's', selects a set of model paths.
- Iterates over multiple network cases, loading the necessary data for each case.
- For each case, it loads a pre-trained model and performs predictions.
- Computes evaluation metrics such as Weymouth, Balance, Cost, and Pressure Ratios (Pj/Pi).
- Aggregates the results into DataFrames for each metric.
- Finally, calls the `plots` function to visualize the results.
"""
s1_path = Path(s1)
s2_path = Path(s2)
X_test = load_data(s2_path / 'inputs_P.mat', 'inputs')
y_test = load_data(s2_path / 'outputs_P.mat', 'outputs')
if s == 0:
modelpath = ['model2_8.h5', 'model2_9.h5', 'model2_10.h5', 'model2_11.h5',
'model2_12.h5', 'model2_13.h5', 'model2_14.h5', 'model2_15.h5', 'model2_Col.h5']
elif s == 1:
modelpath = ['model8.h5', 'model9.h5', 'model10.h5', 'model11.h5',
'model12.h5', 'model13.h5', 'model14.h5', 'model15.h5', 'modelCol.h5']
filepath = ['ng_case8.xlsx', 'ng_case9.xlsx', 'ng_case10.xlsx', 'ng_case11.xlsx',
'ng_case12.xlsx', 'ng_case13.xlsx', 'ng_case14.xlsx', 'ng_case15.xlsx', 'ng_caseCol_18.xlsx']
Weymouth = pd.DataFrame()
Balance = pd.DataFrame()
Costos = pd.DataFrame()
PjPi = pd.DataFrame()
for im in range(9):
path = s1 + filepath[im]
Data_inf = pd.read_excel(path, sheet_name='node.info')
Data_D = pd.read_excel(path, sheet_name='node.dem')
Data_Dc = pd.read_excel(path, sheet_name='node.demcost')
Data_W = pd.read_excel(path, sheet_name='well')
Data_T = pd.read_excel(path, sheet_name='pipe')
Data_C = pd.read_excel(path, sheet_name='comp')
Data_sto = pd.read_excel(path, sheet_name='sto')
Cost = np.concatenate((Data_W['Cg'].values, Data_T['C_O'].values, Data_C['costc'], Data_Dc['al_Res'].values,
Data_Dc['al_Ind'].values, Data_Dc['al_Com'].values, Data_Dc[
'al_NGV'].values, Data_Dc['al_Ref'].values, Data_Dc['al_Pet'].values,
Data_sto['C_S+'].values - Data_sto['C_V'].values, -1 * (Data_sto['C_S-'] - Data_sto['C_V']).values, Data_sto['C_V'].values)).reshape(-1, 1)
w = gen_w(Data_inf, Data_W, Data_T, Data_C, Data_Dc, Data_sto)
y = np.array(y_test[im])
fd = np.array(X_test[im]).max(axis=0)
model2 = flow_model(str(path), fd, seeds=1, s=s)
model2.load_weights(s2_path / modelpath[im])
model_A = tf.keras.Model(
inputs=model2.inputs, outputs=model2.get_layer('F').output)
model_B = tf.keras.Model(
inputs=model2.inputs, outputs=model2.get_layer('P').output)
Fe = model_A.predict(np.array(X_test[im]), verbose=False)
Pe = model_B.predict(np.array(X_test[im]), verbose=False)
i = Data_T['fnode'].values.astype(int) - 1
j = Data_T['tnode'].values.astype(int) - 1
CK = Data_T['Kij'].values.reshape(-1,)
nod = len(Data_inf)
F = y[:, 1:-(nod)]
F = np.concatenate((F, np.zeros((F.shape[0], 2))), axis=1)
P = y[:, -(nod):]
Fte = Fe[:, 1:1 + len(Data_T)]
Ft = F[:, 1:1 + len(Data_T)]
Pie = Pe[:, i]
Pje = Pe[:, j]
Pte = np.sign(Pie**2 - Pje**2) * \
np.sqrt(CK * np.abs(Pie**2 - Pje**2))
Pi = P[:, i]
Pj = P[:, j]
Pt = np.sign(Pi**2 - Pj**2) * np.sqrt(CK * np.abs(Pi**2 - Pj**2))
balance = evaluate_balance([Fe, F], w, np.array(X_test[im]))
wey = evaluate_weymouth([Fte, Ft], [Pte, Pt])
costos = evaluate_cost(Fe @ Cost[:-1], F @ Cost[:-1])
ic = Data_C['fnode'].values - 1
jc = Data_C['tnode'].values - 1
Bc = Data_C['fnode'].values
are = np.round(Pe[:, jc] / Pe[:, ic], 4)
ind1 = np.unique(np.concatenate((np.where(~(are[:, 0] <= Bc[0]) | ~(are[:, 0] >= 1.0))[
0], np.where(~(are[:, 1] <= Bc[1]) | ~(are[:, 1] >= 1.0))[0])))
re1 = np.zeros(y.shape[0])
if len(ind1) != 0:
re1[ind1] = 100
are = P[:, jc] / P[:, ic]
ind1 = np.unique(np.concatenate((np.where(~(are[:, 0] <= Bc[0]) | ~(are[:, 0] >= 1.0))[
# .shape[0]/are.shape[0]*100
0], np.where(~(are[:, 1] <= Bc[1]) | ~(are[:, 1] >= 1.0))[0])))
re2 = np.zeros(y.shape[0])
if len(ind1) != 0:
re2[ind1] = 100
if im <= 7:
Weymouth['NN(' + str(nod) + ')'] = wey[0]
Weymouth['S(' + str(nod) + ')'] = wey[1]
Balance['NN(' + str(nod) + ')'] = balance[0]
Balance['S(' + str(nod) + ')'] = balance[1]
Costos['NN(' + str(nod) + ')'] = np.array(costos)[:, 0]
PjPi['NN(' + str(nod) + ')'] = re1
PjPi['S(' + str(nod) + ')'] = re2
else:
Weymouth['NN(Col_18)'] = wey[0]
Weymouth['S(Col_18)'] = wey[1]
Balance['NN(Col_18)'] = balance[0]
Balance['S(Col_18)'] = balance[1]
Costos['NN(Col_18)'] = np.array(costos)[:, 0]
PjPi['NN(Col_18)'] = re1
PjPi['S(Col_18)'] = re2
plots(Balance, Weymouth, PjPi, Costos, s=s)
return(Balance, Weymouth, PjPi, Costos)
[docs]def evaluate_balance_two(FT, W, X):
"""
Evaluate the Mean Absolute Percentage Error (MAPE) between actual and predicted data.
Parameters
----------
FT : list of numpy arrays
List of predicted flow data from models.
W : numpy array
Weight matrix used for the calculation.
X : list of numpy arrays
List of actual flow data for comparison.
Returns
-------
list
A list containing MAPE values for each corresponding set of predicted and actual data.
Notes
-----
This function computes the MAPE for each set of predicted data in 'FT' against the actual data in 'X'.
The computation involves a dot product of predicted data with the transpose of weight matrix 'W'.
The function iterates over the elements of 'FT' and 'X' to compute MAPE for each set of data.
"""
return [mape(X[i], FT[i] @ W.T) for i in range(len(FT))]
[docs]def evaluate_weymouth_two(X, Y):
"""
Calculate the Mean Absolute Percentage Error (MAPE) between two sets of flow data.
Parameters
----------
X : list of numpy arrays
The first set of flow data (usually the actual flow data).
Y : list of numpy arrays
The second set of flow data (usually the predicted flow data).
Returns
-------
list
A list of MAPE values, each corresponding to the MAPE between the elements of X and Y.
Notes
-----
This function iterates over the corresponding elements of X and Y, calculating the MAPE
for each pair of elements. The results are stored and returned in a list.
"""
return [mape(X[i], Y[i]) for i in range(len(X))]
[docs]def ng_evaluate_atip(s1='/content/drive/Shareddrives/red_gas_col/', s2='/content/drive/Shareddrives/red_gas_col/Prueba/Data/', s=1):
"""
Evaluate different neural network models on various gas network cases and plot their performance metrics.
Parameters
----------
s1 : str, optional
Base directory path for the model and Excel data files, by default '/content/drive/Shareddrives/red_gas_col/'.
s2 : str, optional
Path for the input data files, by default '/content/drive/Shareddrives/red_gas_col/Prueba/Data/'.
s : int, optional
Scenario indicator for choosing different sets of model paths, by default 1.
Returns
-------
tuple
A tuple containing lists of evaluation metrics (Balance, Weymouth, PjPi, Costos) for each network case.
Notes
-----
This function processes multiple gas network cases in a loop. For each case, it loads necessary data,
constructs and loads a neural network model, and predicts outputs. It then evaluates these predictions
using custom functions for different metrics such as Weymouth formula, balance, cost, and pressure ratios.
The function visualizes these metrics using boxplot graphs.
"""
s1_path = Path(s1)
s2_path = Path(s2)
if s == 0:
modelpath = ['model2_8.h5', 'model2_9.h5', 'model2_10.h5', 'model2_11.h5',
'model2_12.h5', 'model2_13.h5', 'model2_14.h5', 'model2_15.h5', 'model2_Col.h5']
elif s == 1:
modelpath = ['model8.h5', 'model9.h5', 'model10.h5', 'model11.h5',
'model12.h5', 'model13.h5', 'model14.h5', 'model15.h5', 'modelCol.h5']
filepath = ['ng_case8.xlsx', 'ng_case9.xlsx', 'ng_case10.xlsx', 'ng_case11.xlsx',
'ng_case12.xlsx', 'ng_case13.xlsx', 'ng_case14.xlsx', 'ng_case15.xlsx', 'ng_caseCol_18.xlsx']
fig, axs = plt.subplots(2, 2, figsize=(29, 20))
Weymouth = [] # pd.DataFrame()
Balance = [] # pd.DataFrame()
Costos = [] # pd.DataFrame()
PjPi = [] # pd.DataFrame()
for im in range(9):
path = s1_path / filepath[im]
Data_inf = pd.read_excel(path, sheet_name='node.info')
Data_D = pd.read_excel(path, sheet_name='node.dem')
Data_Dc = pd.read_excel(path, sheet_name='node.demcost')
Data_W = pd.read_excel(path, sheet_name='well')
Data_T = pd.read_excel(path, sheet_name='pipe')
Data_C = pd.read_excel(path, sheet_name='comp')
Data_sto = pd.read_excel(path, sheet_name='sto')
Cost = np.concatenate((Data_W['Cg'].values, Data_T['C_O'].values, Data_C['costc'], Data_Dc['al_Res'].values,
Data_Dc['al_Ind'].values, Data_Dc['al_Com'].values, Data_Dc[
'al_NGV'].values, Data_Dc['al_Ref'].values, Data_Dc['al_Pet'].values,
Data_sto['C_S+'].values - Data_sto['C_V'].values, -1 * (Data_sto['C_S-'] - Data_sto['C_V']).values, Data_sto['C_V'].values)).reshape(-1, 1)
w = gen_w(Data_inf, Data_W, Data_T, Data_C, Data_Dc, Data_sto)
if im == 8:
X_test = sio.loadmat(
s2_path / f'inputs_None_{im + 8}.mat')['inputs'][0]
else:
X_test = sio.loadmat(
s2_path / f'inputs_None_{im + 8}.mat')['inputs']
fd = np.array(X_test).max(axis=0)
model2 = flow_model(str(path), fd, seeds=1, s=s)
model2.load_weights(s2_path / modelpath[im])
model_A = tf.keras.Model(
inputs=model2.inputs, outputs=model2.get_layer('F').output)
model_B = tf.keras.Model(
inputs=model2.inputs, outputs=model2.get_layer('P').output)
Fe = model_A.predict(np.array(X_test), verbose=False)
Pe = model_B.predict(np.array(X_test), verbose=False)
i = Data_T['fnode'].values.astype(int) - 1
j = Data_T['tnode'].values.astype(int) - 1
CK = Data_T['Kij'].values.reshape(-1,)
Fte = Fe[:, 1:1 + len(Data_T)]
Pie = Pe[:, i]
Pje = Pe[:, j]
Pte = np.sign(Pie**2 - Pje**2) * \
np.sqrt(CK * np.abs(Pie**2 - Pje**2))
balance = evaluate_balance_two(Fe, w, np.array(X_test))
wey = evaluate_weymouth_two(Fte, Pte)
ic = Data_C['fnode'].values - 1
jc = Data_C['tnode'].values - 1
Bc = Data_C['fnode'].values
are = np.round(Pe[:, jc] / Pe[:, ic], 4)
ind1 = np.unique(np.concatenate((np.where(~(are[:, 0] <= Bc[0]) | ~(are[:, 0] >= 1.0))[
# .shape[0]/are.shape[0]*100
0], np.where(~(are[:, 1] <= Bc[1]) | ~(are[:, 1] >= 1.0))[0])))
re1 = np.zeros(X_test.shape[0])
if len(ind1) != 0:
re1[ind1] = 100
costos = (Fe @ Cost[:-1]).flatten()
Weymouth.append(wey)
Balance.append(balance)
Costos.append(costos)
PjPi.append(re1)
ax1 = axs[0, 0]
ax2 = axs[0, 1]
ax3 = axs[1, 0]
ax4 = axs[1, 1]
# ax1.boxplot(Balance)
ax1.boxplot(Balance, boxprops=dict(color='blue'), medianprops=dict(
color='orangered'), whiskerprops=dict(color='blue'))
fontsize = 20
rotation_angle = 45
a = ['NN(8)', 'NN(9)', 'NN(10)', 'NN(11)', 'NN(12)',
'NN(13)', 'NN(14)', 'NN(15)', 'NNCol(16)']
# ax1.set_title('Balance')
ax1.set_ylabel('MAPE', fontsize=20)
ax1.set_yscale('log')
ax1.set_xlabel('Network dimension', fontsize=20)
ax1.set_xticklabels(a)
ax1.tick_params(axis='x', labelsize=fontsize, rotation=rotation_angle)
ax1.tick_params(axis='y', labelsize=fontsize)
ax2.boxplot(Weymouth, boxprops=dict(color='blue'), medianprops=dict(
color='orangered'), whiskerprops=dict(color='blue'))
# ax2.set_title('Weymouth')
ax2.set_yscale('log')
ax2.set_ylabel('MAPE', fontsize=20)
ax2.set_xlabel('Network dimension', fontsize=20)
ax2.set_xticklabels(a)
ax2.tick_params(axis='x', labelsize=fontsize, rotation=rotation_angle)
ax2.tick_params(axis='y', labelsize=fontsize)
ax3.boxplot(PjPi, boxprops=dict(color='blue'), medianprops=dict(
color='orangered'), whiskerprops=dict(color='blue'))
# ax3.set_title('Pj/Pi')
ax3.set_ylabel('MAPE', fontsize=20)
ax3.set_xlabel('Network dimension', fontsize=20)
ax3.set_xticklabels(a)
ax3.tick_params(axis='x', labelsize=fontsize, rotation=rotation_angle)
ax3.tick_params(axis='y', labelsize=fontsize)
ax4.boxplot(Costos, boxprops=dict(color='blue'), medianprops=dict(
color='orangered'), whiskerprops=dict(color='blue'))
# ax4.set_title('Costos')#MAPE---COSTOS
ax4.set_yscale('log')
ax4.set_ylabel('Costs', fontsize=20)
ax4.set_xlabel('Network dimension', fontsize=20)
ax4.set_xticklabels(a)
ax4.tick_params(axis='x', labelsize=fontsize, rotation=rotation_angle)
ax4.tick_params(axis='y', labelsize=fontsize)
# Ajustar el diseño para evitar solapamientos
plt.tight_layout()
# Mostrar la gráfica
plt.show()
return(Balance, Weymouth, PjPi, Costos)
[docs]def visualize_atipic(dataframe, columns):
"""
Identifies and visualizes the percentage of outlier values in specified columns of a DataFrame.
Parameters
----------
dataframe : DataFrame
The DataFrame in which to identify outliers.
columns : list
List of column names in the DataFrame to analyze.
Returns
-------
None
Notes
-----
This function utilizes the Interquartile Range (IQR) method to identify outliers.
A value is considered an outlier if it is below Q1 - 1.5 * IQR or above Q3 + 1.5 * IQR,
where Q1 and Q3 are the first and third quartiles, respectively.
The function generates a bar plot displaying the percentage of outliers in each column.
"""
outlier_percentages = []
colors = [] # List to store colors for each column
plt.figure(figsize=(20, 8))
for index, column in enumerate(columns):
Q1 = dataframe[column].quantile(0.25)
Q3 = dataframe[column].quantile(0.75)
IQR = Q3 - Q1
# Calculate the percentage of outliers
outliers = ((dataframe[column] < Q1 - 1.5 * IQR) |
(dataframe[column] > Q3 + 1.5 * IQR)) & (dataframe[column] >= 1.0)
percentage = (outliers.sum() / len(dataframe[column])) * 100
outlier_percentages.append(percentage)
# Assign color based on index parity
colors.append('orange' if index % 2 == 0 else 'blue')
plt.bar(columns, outlier_percentages, color=colors)
plt.xlabel('Network Dimension', fontsize=20)
plt.ylabel('Percentage of Outliers', fontsize=20)
plt.xticks(fontsize=20, rotation=45)
plt.yticks(fontsize=20)
plt.show()
[docs]def visualize_non_convergence(data_path='/content/drive/Shareddrives/red_gas_col/Prueba/Data/inputs_None_', network_range=(8, 17)):
"""
Visualizes the percentage of non-convergence cases for different network dimensions.
Parameters
----------
data_path : str, optional
Path to the directory containing input data files, by default '/content/drive/Shareddrives/red_gas_col/Prueba/Data/inputs_None_'.
network_range : tuple, optional
Range of network files to analyze (inclusive), by default (8, 17).
Returns
-------
None
Notes
-----
This function loads data from specified files and calculates the percentage of
non-convergence cases. It then generates a bar plot to visually represent this data.
"""
data_path = Path(data_path)
percentages = []
labels = []
for file_index in range(*network_range):
file_path = data_path.parent / f'{data_path.name}{file_index}.mat'
data = sio.loadmat(file_path)['inputs']
non_convergence_percentage = (data.shape[0] / 1000) * 100
percentages.append(non_convergence_percentage)
label = f'N(Col_18)' if file_index == 16 else f'N({data.shape[1]})'
labels.append(label)
# Create bar plot
plt.figure(figsize=(20, 8))
plt.bar(labels, percentages, color='blue')
plt.xlabel('Network Dimension', fontsize=15)
plt.ylabel('Percentage of Non-Convergence', fontsize=15)
plt.xticks(fontsize=10, rotation=45)
plt.yticks(fontsize=10)
plt.show()
[docs]def identify_atypical_values(files):
"""
Identify and visualize the percentage of atypical values in a list of arrays.
Parameters
----------
files : list of arrays
List of arrays. Each array represents a different data set.
Returns
-------
None
Notes
-----
This function calculates the percentage of values in each array that are greater than 1,
indicating atypical values. It then visualizes these percentages using a bar plot.
"""
percentages = []
labels = []
# Process each file to calculate the percentage of atypical values
for i, file in enumerate(files):
data_array = np.array(file)
atypical_percentage = (
data_array[data_array > 1].shape[0] / data_array.shape[0]) * 100
percentages.append(atypical_percentage)
# Create labels for the x-axis based on the index of the file
label = 'R(' + str(i + 8) + ')' if i != 8 else 'N(Col18)'
labels.append(label)
# Plotting
plt.bar(labels, percentages, color='blue')
plt.xlabel('Network Dimension')
plt.ylabel('Percentage')
plt.xticks(rotation=45)
plt.show()
[docs]def dynamic_val(s1='/content/drive/Shareddrives/red_gas_col/', s2='/content/drive/Shareddrives/red_gas_col/Prueba/Data/'):
"""
Generates sinusoidal behavior and adjusts random data accordingly.
Parameters
----------
s1 : str, optional
Path for the first directory, by default '/content/drive/Shareddrives/red_gas_col/'.
s2 : str, optional
Path for the second directory, by default '/content/drive/Shareddrives/red_gas_col/Prueba/Data/'.
Returns
-------
X_test : Randomly generated data.
Notes
-----
This function generates sinusoidal behavior and adjusts random data accordingly.
"""
# Step 1: Generate desired sinusoidal behavior
x = np.linspace(0, 2 * np.pi, 1000) # Generates x values from 0 to 2*pi
amplitude = 150 # Sine wave amplitude
frequency = 1 # Sine wave frequency
# Generates the sine wave
sine_wave = abs(amplitude * np.sin(frequency * x))
# Step 2: Create a matrix of random data
data = np.random.rand(1000, 18) # Random data
# Step 3: Adjust random data
# The idea is to scale the data so that its sum along the rows equals the corresponding sine wave value
# Current sum of data along the rows
data_sum = np.sum(data, axis=1)
adjustment_factor = sine_wave / data_sum # Adjustment factor for each row
s1_path = Path(s1)
s2_path = Path(s2)
# Adjust the data
for i in range(data.shape[1]):
data[:, i] *= adjustment_factor
X_test = data
modelpath = 'modelCol.h5'
filepath = 'ng_caseCol_18.xlsx'
path = s1_path / filepath
Data_inf = pd.read_excel(path, sheet_name='node.info')
Data_D = pd.read_excel(path, sheet_name='node.dem')
Data_Dc = pd.read_excel(path, sheet_name='node.demcost')
Data_W = pd.read_excel(path, sheet_name='well')
Data_T = pd.read_excel(path, sheet_name='pipe')
Data_C = pd.read_excel(path, sheet_name='comp')
Data_sto = pd.read_excel(path, sheet_name='sto')
Cost = np.concatenate((Data_W['Cg'].values, Data_T['C_O'].values, Data_C['costc'], Data_Dc['al_Res'].values,
Data_Dc['al_Ind'].values, Data_Dc['al_Com'].values, Data_Dc[
'al_NGV'].values, Data_Dc['al_Ref'].values, Data_Dc['al_Pet'].values,
Data_sto['C_S+'].values - Data_sto['C_V'].values, -1 * (Data_sto['C_S-'] - Data_sto['C_V']).values, Data_sto['C_V'].values)).reshape(-1, 1)
w = gen_w(Data_inf, Data_W, Data_T, Data_C, Data_Dc, Data_sto)
fd = X_test.max(axis=0)
# Assuming Fd, Fw, and data are your data.
plt.figure(figsize=(15, 5))
model2 = flow_model(path, fd, seeds=1, s=1)
model2.load_weights(s2_path / modelpath)
model_A = tf.keras.Model(inputs=model2.inputs,
outputs=model2.get_layer('F').output)
model_B = tf.keras.Model(inputs=model2.inputs,
outputs=model2.get_layer('P').output)
i = len(Data_W) + len(Data_C) + len(Data_T)
j = len(Data_inf)
Fd = model_A.predict(X_test, verbose=False)[:, i:i + j]
plt.plot(Fd.sum(axis=1), label="Dynamic-Unsupplied Gas")
Fw = model_A.predict(X_test, verbose=False)[:, 0]
plt.plot(Fw, label="Dynamic-Gas Supply")
plt.plot(data.sum(axis=1), label="Total Demand")
model2 = flow_model(path, fd, seeds=1, s=0)
modelpath = 'model2_Col.h5'
model2.load_weights(s2_path / modelpath)
model_A = tf.keras.Model(inputs=model2.inputs,
outputs=model2.get_layer('F').output)
model_B = tf.keras.Model(inputs=model2.inputs,
outputs=model2.get_layer('P').output)
i = len(Data_W) + len(Data_C) + len(Data_T)
j = len(Data_inf)
Fd = model_A.predict(X_test, verbose=False)[:, i:i + j]
plt.plot(Fd.sum(axis=1), label="Unsupplied Gas")
plt.xlabel("Samples", fontsize=20) # Adds label on x-axis
plt.ylabel("Flows", fontsize=20) # Adds label on y-axis
plt.legend() # Displays legend on the plot
# Adjust here the size for the X axis tick labels
plt.xticks(fontsize=10)
plt.yticks(fontsize=10)
plt.show()
[docs]def loss_val():
"""
Visualizes and compares differences in performance metrics of various models using box plots.
Notes
-----
This function loads performance data from different models from the files Weymouth.xlsx, Balance.xlsx, and Costos.xlsx.
Then, box plots are generated for each of these metrics, showing how the performance values are distributed among the different models.
The plots display the Mean Absolute Percentage Error (MAPE) and costs, facilitating visual comparison between the models.
A logarithmic scale is used to ease the visualization of data that may have a wide range of values.
This function has no input parameters and does not return any value. The results are directly displayed as plots.
"""
W = pd.read_csv(
'/content/drive/Shareddrives/red_gas_col/Prueba/Data/Weymouth.xlsx')
W.drop(['Unnamed: 0'], axis=1, inplace=True)
B = pd.read_csv(
'/content/drive/Shareddrives/red_gas_col/Prueba/Data/Balance.xlsx')
B.drop(['Unnamed: 0'], axis=1, inplace=True)
C = pd.read_csv(
'/content/drive/Shareddrives/red_gas_col/Prueba/Data/Costos.xlsx')
C.drop(['Unnamed: 0'], axis=1, inplace=True)
fig, axs = plt.subplots(1, 3, figsize=(20, 5))
fontsize = 10
rotation_angle = 45
# Organizar los datos en la gráfica y establecer escala logarítmica
B.boxplot(ax=axs[0], boxprops=dict(color='blue'), medianprops=dict(
color='orangered'), whiskerprops=dict(color='blue'))
axs[0].set_yscale('log')
axs[0].set_ylabel('MAPE', fontsize=15)
axs[0].set_xlabel('Loss Functions', fontsize=15)
axs[0].tick_params(axis='x', labelsize=fontsize, rotation=rotation_angle)
axs[0].tick_params(axis='y', labelsize=fontsize)
# for label in axs[0, 0].get_xticklabels():
# label.set_rotation(rotation_angle)
W.boxplot(ax=axs[1], boxprops=dict(color='blue'), medianprops=dict(
color='orangered'), whiskerprops=dict(color='blue'))
#axs[0, 1].set_title('Weymouth')
axs[1].set_yscale('log')
axs[1].set_ylabel('MAPE', fontsize=15)
axs[1].set_xlabel('Loss Functions', fontsize=15)
axs[1].tick_params(axis='x', labelsize=fontsize, rotation=rotation_angle)
axs[1].tick_params(axis='y', labelsize=fontsize)
# for label in axs[0, 1].get_xticklabels():
C.boxplot(ax=axs[2], boxprops=dict(color='blue'), medianprops=dict(
color='orangered'), whiskerprops=dict(color='blue'))
#axs[1, 1].set_title('Costos')
axs[2].set_yscale('symlog')
axs[2].set_ylabel('Costs', fontsize=15)
axs[2].set_xlabel('Loss Functions', fontsize=15)
axs[2].tick_params(axis='x', labelsize=fontsize, rotation=rotation_angle)
axs[2].tick_params(axis='y', labelsize=fontsize)
[docs]def bounded(s1='/content/drive/Shareddrives/red_gas_col/', s2='/content/drive/Shareddrives/red_gas_col/Prueba/Data/', s=1):
"""
Generates box plots to visualize and compare performance metric differences across various models.
This function loads performance data of different models from the Weymouth.xlsx, Balance.xlsx, and Costos.xlsx files.
Then, box plots are generated for each of these metrics, showing how the performance values are distributed
among the different models.
The plots display the Mean Absolute Percentage Error (MAPE) and costs, facilitating visual comparison between the models.
A logarithmic scale is used to ease the visualization of data that may have a wide range of values.
Parameters
----------
s1 : str, optional
Path to the directory containing input data files. Default is '/content/drive/Shareddrives/red_gas_col/'.
s2 : str, optional
Path to the directory containing model data files. Default is '/content/drive/Shareddrives/red_gas_col/Prueba/Data/'.
s : int, optional
Determines the type of model. If 0, uses one set of models, if 1, uses another set. Default is 1.
Returns
-------
None
The results are directly displayed as plots.
"""
s1_path = Path(s1)
s2_path = Path(s2)
X_test = load_data(s2_path / 'inputs_P.mat', 'inputs')
y_test = load_data(s2_path / 'outputs_P.mat', 'outputs')
if s == 0:
modelpath = ['model2_8.h5', 'model2_9.h5', 'model2_10.h5', 'model2_11.h5',
'model2_12.h5', 'model2_13.h5', 'model2_14.h5', 'model2_15.h5', 'model2_Col.h5']
elif s == 1:
modelpath = ['model8.h5', 'model9.h5', 'model10.h5', 'model11.h5',
'model12.h5', 'model13.h5', 'model14.h5', 'model15.h5', 'modelCol.h5']
filepath = ['ng_case8.xlsx', 'ng_case9.xlsx', 'ng_case10.xlsx', 'ng_case11.xlsx','ng_case12.xlsx', 'ng_case13.xlsx', 'ng_case14.xlsx', 'ng_case15.xlsx', 'ng_caseCol_18.xlsx']
Weymouth = pd.DataFrame()
Balance = pd.DataFrame()
Costos = pd.DataFrame()
PjPi = pd.DataFrame()
im = 8
path = s1_path / filepath[im]
Data_inf = pd.read_excel(path, sheet_name='node.info')
Data_D = pd.read_excel(path, sheet_name='node.dem')
Data_Dc = pd.read_excel(path, sheet_name='node.demcost')
Data_W = pd.read_excel(path, sheet_name='well')
Data_T = pd.read_excel(path, sheet_name='pipe')
Data_C = pd.read_excel(path, sheet_name='comp')
Data_sto = pd.read_excel(path, sheet_name='sto')
Cost = np.concatenate((Data_W['Cg'].values, Data_T['C_O'].values, Data_C['costc'], Data_Dc['al_Res'].values,
Data_Dc['al_Ind'].values, Data_Dc['al_Com'].values, Data_Dc[
'al_NGV'].values, Data_Dc['al_Ref'].values, Data_Dc['al_Pet'].values,
Data_sto['C_S+'].values - Data_sto['C_V'].values, -1 * (Data_sto['C_S-'] - Data_sto['C_V']).values, Data_sto['C_V'].values)).reshape(-1, 1)
w = gen_w(Data_inf, Data_W, Data_T, Data_C, Data_Dc, Data_sto)
y = np.array(y_test[im])
fd = np.array(X_test[im]).max(axis=0)
model2 = flow_model(path, fd, seeds=1, s=s)
model2.load_weights(s2_path / modelpath[im])
model_A = tf.keras.Model(inputs=model2.inputs,
outputs=model2.get_layer('F').output)
model_B = tf.keras.Model(inputs=model2.inputs,
outputs=model2.get_layer('P').output)
Fe = model_A.predict(np.array(X_test[im]), verbose=False)
Pe = model_B.predict(np.array(X_test[im]), verbose=False)
i = Data_T['fnode'].values.astype(int) - 1
j = Data_T['tnode'].values.astype(int) - 1
CK = Data_T['Kij'].values.reshape(-1,)
nod = len(Data_inf)
Fte = Fe[:, 1:1 + len(Data_T)]
Fce = Fe[:, 1 + len(Data_T):1 + len(Data_T) + len(Data_C)]
Fwe = Fe[:, 1:2]
fig, axs = plt.subplots(2, 2, figsize=(29, 20))
fontsize = 20
rotation_angle = 45
# Organizar los datos en la gráfica y establecer escala logarítmica
axs[0, 0].boxplot(Fwe, boxprops=dict(color='blue'), medianprops=dict(
color='orangered'), whiskerprops=dict(color='blue'))
axs[0, 0].set_yscale('symlog')
axs[0, 0].plot(1, 80, 'k*', markersize=15, label='Injection Limit')
axs[0, 0].plot(1, 0, 'k*', markersize=15)
axs[0, 0].legend()
axs[0, 0].legend(fontsize=20)
axs[0, 0].set_ylabel('Gas injection', fontsize=20)
axs[0, 0].set_xlabel('Well node', fontsize=20)
axs[0, 0].tick_params(axis='x', labelsize=fontsize,
rotation=rotation_angle)
axs[0, 0].tick_params(axis='y', labelsize=fontsize)
# for label in axs[0, 0].get_xticklabels():
# label.set_rotation(rotation_angle)
Li = Data_T['Fg_min'].values
Ls = Data_T['Fg_max'].values
axs[0, 1].boxplot(Fte, boxprops=dict(color='blue'), medianprops=dict(
color='orangered'), whiskerprops=dict(color='blue'))
axs[0, 1].set_xticklabels(np.arange(2, 2 + len(Data_T)))
#axs[0, 1].set_title('Weymouth')
axs[0, 1].set_yscale('symlog')
axs[0, 1].plot(np.arange(1, 1 + len(Data_T)), Ls,
'k*', markersize=15, label='Flow limit')
axs[0, 1].plot(np.arange(1, 1 + len(Data_T)), Li, 'k*', markersize=15)
axs[0, 1].legend()
axs[0, 1].legend(fontsize=20)
axs[0, 1].set_ylabel('Gas flow', fontsize=20)
axs[0, 1].set_xlabel('Pipeline node', fontsize=20)
axs[0, 1].tick_params(axis='x', labelsize=fontsize,
rotation=rotation_angle)
axs[0, 1].tick_params(axis='y', labelsize=fontsize)
# for label in axs[0, 1].get_xticklabels():
Li = [0] * len(Data_C)
Ls = Data_C['fmaxc'].values
axs[1, 0].boxplot(Fce, boxprops=dict(color='blue'), medianprops=dict(
color='orangered'), whiskerprops=dict(color='blue'))
#axs[1, 1].set_title('Costos')
axs[1, 0].set_xticklabels(
np.arange(2 + len(Data_T), 2 + len(Data_T) + len(Data_C)))
axs[1, 0].set_yscale('symlog')
axs[1, 0].plot(np.arange(1, 4), Ls, 'k*',
markersize=15, label='Flow limit')
axs[1, 0].plot(np.arange(1, 4), Li, 'k*', markersize=15)
axs[1, 0].legend()
axs[1, 0].legend(fontsize=20)
axs[1, 0].set_ylabel('Gas flow', fontsize=20)
axs[1, 0].set_xlabel('Compressor node', fontsize=20)
axs[1, 0].tick_params(axis='x', labelsize=fontsize,
rotation=rotation_angle)
axs[1, 0].tick_params(axis='y', labelsize=fontsize)
Li = Data_inf['Pmin'].values
Ls = Data_inf['Pmax'].values
axs[1, 1].boxplot(Pe, boxprops=dict(color='blue'), medianprops=dict(
color='orangered'), whiskerprops=dict(color='blue'))
#axs[1, 1].set_title('Costos')
#axs[1, 1].set_yscale('log')
axs[1, 1].plot(np.arange(1, 19), Ls, 'k*',
markersize=15, label='Pressure limit')
axs[1, 1].legend()
axs[1, 1].legend(fontsize=20)
axs[1, 1].set_ylabel('Pressure[psia]', fontsize=20)
axs[1, 1].set_xlabel('Node', fontsize=20)
axs[1, 1].tick_params(axis='x', labelsize=fontsize,
rotation=rotation_angle)
axs[1, 1].tick_params(axis='y', labelsize=fontsize)