import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split


class AdvancedNeuralAgent:
    def __init__(self, input_size, hidden_layers=[64, 32], output_size=1, learning_rate=0.001):
        """Advanced AI agent with stable training and decision-making capabilities."""
self.lr = learning_rate
self.initial_lr = learning_rate
self.layers = []
        self.memory = []  # experience replay buffer
self.performance_history = []
self.epsilon = 1e-8
layer_sizes = [input_size] + hidden_layers + [output_size]
        for i in range(len(layer_sizes) - 1):
            fan_in, fan_out = layer_sizes[i], layer_sizes[i+1]
            limit = np.sqrt(6.0 / (fan_in + fan_out))
            layer = {
                'weights': np.random.uniform(-limit, limit, (layer_sizes[i], layer_sizes[i+1])),
'bias': np.zeros((1, layer_sizes[i+1])),
'momentum_w': np.zeros((layer_sizes[i], layer_sizes[i+1])),
'momentum_b': np.zeros((1, layer_sizes[i+1]))
}
self.layers.append(layer)
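    # Note: the bound limit = sqrt(6 / (fan_in + fan_out)) above is standard
    # Xavier/Glorot uniform initialization, chosen to keep the variance of
    # activations roughly constant across layers at the start of training.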
    def activation(self, x, func='relu'):
        """Stable activation functions with clipping to avoid overflow."""
        x = np.clip(x, -50, 50)
        if func == 'relu':
            return np.maximum(0, x)
elif func == 'sigmoid':
return 1 / (1 + np.exp(-x))
elif func == 'tanh':
return np.tanh(x)
elif func == 'leaky_relu':
            return np.where(x > 0, x, x * 0.01)
elif func == 'linear':
return x
    def activation_derivative(self, x, func='relu'):
        """Stable derivatives of the activation functions above."""
x = np.clip(x, -50, 50)
if func == 'relu':
return (x > 0).astype(float)
elif func == 'sigmoid':
s = self.activation(x, 'sigmoid')
return s * (1 - s)
elif func == 'tanh':
return 1 - np.tanh(x)**2
elif func == 'leaky_relu':
            return np.where(x > 0, 1, 0.01)
elif func == 'linear':
return np.ones_like(x)
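    # These derivatives are evaluated on the pre-activation values (the z's
    # cached by the forward pass), which is what backward() passes in.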
    def forward(self, X):
        """Forward pass with pre-activation clipping for stability."""
self.activations = [X]
self.z_values = []
current_input = X
for i, layer in enumerate(self.layers):
z = np.dot(current_input, layer['weights']) + layer['bias']
z = np.clip(z, -50, 50)
self.z_values.append(z)
if i < len(self.layers) - 1:
a = self.activation(z, 'leaky_relu')
else:
a = self.activation(z, 'linear')
self.activations.append(a)
current_input = a
return current_input
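    # forward() caches self.activations and self.z_values so that backward()
    # can reuse them; calling forward() therefore mutates the agent's state.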
def clip_gradients(self, gradients, max_norm=1.0):
"""Gradient clipping to stop explosion"""
grad_norm = np.linalg.norm(gradients)
if grad_norm > max_norm:
gradients = gradients * (max_norm / (grad_norm + self.epsilon))
return gradients
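    # Rescaling by max_norm / ||g|| caps the gradient norm at max_norm while
    # preserving its direction; epsilon guards against division by zero.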
    def backward(self, X, y, output):
        """Stable backpropagation with gradient clipping, momentum, and weight decay."""
        m = X.shape[0]
        dz = (output - y.reshape(-1, 1)) / m  # output-layer error term (MSE gradient up to a factor of 2)
        dz = np.clip(dz, -10, 10)
        for i in reversed(range(len(self.layers))):
layer = self.layers[i]
dw = np.dot(self.activations[i].T, dz)
db = np.sum(dz, axis=0, keepdims=True)
dw = self.clip_gradients(dw, max_norm=1.0)
db = self.clip_gradients(db, max_norm=1.0)
momentum = 0.9
layer['momentum_w'] = momentum * layer['momentum_w'] + (1 - momentum) * dw
layer['momentum_b'] = momentum * layer['momentum_b'] + (1 - momentum) * db
weight_decay = 0.0001
layer['weights'] -= self.lr * (layer['momentum_w'] + weight_decay * layer['weights'])
layer['bias'] -= self.lr * layer['momentum_b']
if i > 0:
                # All hidden layers use leaky ReLU activations
                activation_func = 'leaky_relu'
dz = np.dot(dz, layer['weights'].T) * self.activation_derivative(
self.z_values[i-1], activation_func)
dz = np.clip(dz, -10, 10)
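    # Update rule per layer (sketch): v = 0.9 * v + 0.1 * g, then
    # w -= lr * (v + 0.0001 * w), i.e. classic momentum plus L2 weight decay.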
def adapt_learning_rate(self, epoch, performance_history):
"""Adaptive studying price with performance-based adjustment"""
if epoch > 10:
recent_performance = performance_history[-10:]
if len(recent_performance) >= 5:
if recent_performance[-1] >= recent_performance[-5]:
self.lr = max(self.lr * 0.95, self.initial_lr * 0.01)
elif recent_performance[-1] < recent_performance[-5] * 0.98:
self.lr = min(self.lr * 1.02, self.initial_lr * 2)
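    # The learning rate stays within [0.01 * initial_lr, 2 * initial_lr]:
    # it decays 5% when validation loss stagnates and grows 2% when the loss
    # has improved by more than 2% over the last five epochs.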
def calculate_loss(self, y_true, y_pred):
"""Steady loss calculation"""
y_true = y_true.reshape(-1, 1)
y_pred = np.clip(y_pred, -1e6, 1e6)
        mse = np.mean((y_true - y_pred) ** 2)
        mae = np.mean(np.abs(y_true - y_pred))
if not np.isfinite(mse):
mse = 1e6
if not np.isfinite(mae):
mae = 1e6
return mse, mae
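    # MSE is the training objective; MAE is returned for reporting only.
    # Non-finite losses are capped at 1e6, which is also the threshold the
    # training loop uses to detect instability and trigger a reset.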
    def store_experience(self, state, action, reward, next_state):
        """Experience replay buffer for RL-style aspects of the agent."""
        experience = {
            'state': state,
            'action': action,
            'reward': reward,
            'next_state': next_state,
            'timestamp': len(self.memory)
        }
        self.memory.append(experience)
        if len(self.memory) > 1000:
            self.memory.pop(0)
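    # The buffer is a plain FIFO list capped at 1000 entries; a
    # collections.deque with maxlen=1000 would be an equivalent,
    # slightly more idiomatic choice.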
    def make_decision(self, X, exploration_rate=0.1):
        """Stable decision making with optional exploration noise."""
        prediction = self.forward(X)
        if np.random.random() < exploration_rate:
            noise_scale = np.std(prediction) * 0.1 if np.std(prediction) > 0 else 0.1
            noise = np.random.normal(0, noise_scale, prediction.shape)
            prediction += noise
        return np.clip(prediction, -1e6, 1e6)
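    # Exploration adds Gaussian noise scaled to 10% of the prediction spread,
    # a lightweight epsilon-greedy-style perturbation for continuous outputs.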
    def reset_if_unstable(self):
        """Reinitialize the network if training becomes unstable."""
        print("🔄 Resetting network due to instability...")
        for i, layer in enumerate(self.layers):
            fan_in, fan_out = layer['weights'].shape
            limit = np.sqrt(6.0 / (fan_in + fan_out))
            layer['weights'] = np.random.uniform(-limit, limit, (fan_in, fan_out))
layer['bias'] = np.zeros((1, fan_out))
layer['momentum_w'] = np.zeros((fan_in, fan_out))
layer['momentum_b'] = np.zeros((1, fan_out))
self.lr = self.initial_lr
    def train(self, X, y, epochs=500, batch_size=32, validation_split=0.2, verbose=True):
        """Robust training with stability checks, early stopping, and LR adaptation."""
        # Normalize targets for numerical stability; statistics are stored for denormalization
        y_mean, y_std = np.mean(y), np.std(y)
        y_normalized = (y - y_mean) / (y_std + self.epsilon)
        X_train, X_val, y_train, y_val = train_test_split(
            X, y_normalized, test_size=validation_split, random_state=42)
        best_val_loss = float('inf')
        patience = 30
patience_counter = 0
train_losses, val_losses = [], []
reset_count = 0
        for epoch in range(epochs):
if epoch > 0 and (not np.isfinite(train_losses[-1]) or train_losses[-1] > 1e6):
if reset_count < 2:
self.reset_if_unstable()
reset_count += 1
                    continue
else:
print("🚫 Coaching unstable, stopping...")
break
indices = np.random.permutation(len(X_train))
X_train_shuffled = X_train[indices]
y_train_shuffled = y_train[indices]
epoch_loss = 0
batches = 0
            for i in range(0, len(X_train), batch_size):
batch_X = X_train_shuffled[i:i+batch_size]
batch_y = y_train_shuffled[i:i+batch_size]
if len(batch_X) == 0:
                    continue
                output = self.forward(batch_X)
self.backward(batch_X, batch_y, output)
loss, _ = self.calculate_loss(batch_y, output)
epoch_loss += loss
batches += 1
avg_train_loss = epoch_loss / max(batches, 1)
            val_output = self.forward(X_val)
val_loss, val_mae = self.calculate_loss(y_val, val_output)
train_losses.append(avg_train_loss)
val_losses.append(val_loss)
self.performance_history.append(val_loss)
if val_loss < best_val_loss:
best_val_loss = val_loss
patience_counter = 0
else:
patience_counter += 1
            if patience_counter >= patience:
if verbose:
print(f"✋ Early stopping at epoch {epoch}")
break
if epoch > 0:
self.adapt_learning_rate(epoch, self.performance_history)
if verbose and (epoch % 50 == 0 or epoch < 10):
print(f"Epoch {epoch:3d}: Practice Loss = {avg_train_loss:.4f}, "
f"Val Loss = {val_loss:.4f}, LR = {self.lr:.6f}")
self.y_mean, self.y_std = y_mean, y_std
return train_losses, val_losses
def predict(self, X):
"""Make predictions with denormalization"""
        normalized_pred = self.forward(X)
if hasattr(self, 'y_mean') and hasattr(self, 'y_std'):
return normalized_pred * self.y_std + self.y_mean
return normalized_pred
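    # Predictions are produced in normalized target space and mapped back via
    # y * y_std + y_mean, using the statistics stored by train().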
def evaluate_performance(self, X, y):
"""Complete efficiency analysis"""
predictions = self.predict(X)
mse, mae = self.calculate_loss(y, predictions)
        y_mean = np.mean(y)
ss_tot = np.sum((y - y_mean) ** 2)
ss_res = np.sum((y.reshape(-1, 1) - predictions) ** 2)
r2 = 1 - (ss_res / (ss_tot + self.epsilon))
return {
'mse': float(mse) if np.isfinite(mse) else float('inf'),
'mae': float(mae) if np.isfinite(mae) else float('inf'),
'r2': float(r2) if np.isfinite(r2) else -float('inf'),
'predictions': predictions.flatten()
}
    def visualize_training(self, train_losses, val_losses):
        """Visualize training progress."""
        plt.figure(figsize=(15, 5))
        plt.subplot(1, 3, 1)
        plt.plot(train_losses, label='Training Loss', alpha=0.8)
        plt.plot(val_losses, label='Validation Loss', alpha=0.8)
        plt.title('Training Progress')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.grid(True, alpha=0.3)
plt.yscale('log')
plt.subplot(1, 3, 2)
if len(self.performance_history) > 0:
plt.plot(self.performance_history)
            plt.title('Performance History')
plt.xlabel('Epoch')
plt.ylabel('Validation Loss')
plt.grid(True, alpha=0.3)
plt.yscale('log')
        plt.subplot(1, 3, 3)
        # lr_history is optional and only plotted if something recorded it;
        # the class as written never populates this attribute itself.
        if hasattr(self, 'lr_history'):
            plt.plot(self.lr_history)
            plt.title('Learning Rate Schedule')
plt.xlabel('Epoch')
            plt.ylabel('Learning Rate')
plt.grid(True, alpha=0.3)
plt.tight_layout()
        plt.show()
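

# --- Minimal usage sketch ---
# Not part of the original class: a small demo assuming numpy, scikit-learn,
# and matplotlib are installed. The synthetic regression data and the
# hyperparameters below are illustrative choices, not recommendations.
if __name__ == "__main__":
    np.random.seed(42)
    X_demo = np.random.randn(500, 8)
    true_w = np.random.randn(8)
    y_demo = X_demo @ true_w + 0.1 * np.random.randn(500)

    agent = AdvancedNeuralAgent(input_size=8, hidden_layers=[64, 32], output_size=1)
    train_losses, val_losses = agent.train(X_demo, y_demo, epochs=200, verbose=True)

    metrics = agent.evaluate_performance(X_demo, y_demo)
    print(f"MSE: {metrics['mse']:.4f}, MAE: {metrics['mae']:.4f}, R²: {metrics['r2']:.4f}")
    agent.visualize_training(train_losses, val_losses)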