AdaEvolve
Hierarchical adaptive search: G-signal exploration intensity, UCB island allocation, and LLM meta-guidance on stagnation.
"""AdaEvolve SelectionPolicy component — Level-1 adaptive exploration intensity (the G signal)
+ Level-2 UCB island bandit + 3-mode parent sampling.
One file per component (see scaffold.py). Faithful port of SkyDiscover's ``AdaptiveState`` and
``MultiDimensionalAdapter`` (search/adaevolve/adaptation.py) plus the sampling half of
``AdaEvolveDatabase``. Where the paper and the reference code diverge, this port follows the
code: G is updated only on improvement events (no ``G ← ρ·G`` decay during stagnation), island
spawning triggers on global *productivity* (improvements/evaluations < threshold) rather than
``G ≤ τ_S``, and parent selection uses the 3-mode split ``P(explore)=I``,
``P(exploit)=(1−I)·0.7``, ``P(balanced)=(1−I)·0.3``.
The end-of-iteration bookkeeping (UCB island rotation, ring migration, dynamic spawning —
upstream ``AdaEvolveDatabase.end_iteration``) is exposed as :meth:`AdaEvolvePolicy.end_iteration`
and called by the scaffold's ``periodic`` hook, because ``observe`` is skipped on NO_DIFF steps
while ``periodic`` runs every iteration (the open mirror of upstream's ``finally`` block).
All randomness flows through ``self.rng``.
"""
from __future__ import annotations
import math
from dataclasses import dataclass, field
from ...components.selection import SelectionPolicy
from ...records import Genome, RunState, Selection
from .population import ISLAND_CONFIG_PRESETS
# ---------------------------------------------------------------------------------------------
# Level 1 — AdaptiveState (adaptation.py::AdaptiveState)
# ---------------------------------------------------------------------------------------------
@dataclass
class AdaptiveState:
"""Per-island Level-1 state: the accumulated improvement signal G and the local best."""
decay: float = 0.9
epsilon: float = 1e-8
intensity_min: float = 0.15
intensity_max: float = 0.5
accumulated_signal: float = 0.0 # G
best_score: float = field(default=float("-inf"))
improvement_count: int = 0
total_evaluations: int = 0
def _normalize_delta(self, raw_delta: float) -> float:
"""δ = min(raw / (|local best| + ε), 1); the first-ever improvement is always 1.0."""
if self.best_score == float("-inf"):
return 1.0
return min(raw_delta / (abs(self.best_score) + self.epsilon), 1.0)
def record_evaluation(self, fitness: float) -> float:
"""The verbatim ``record_evaluation``: improvement-only updates — G is NOT decayed on a
non-improving evaluation (the reference-code divergence from the paper)."""
self.total_evaluations += 1
if fitness > self.best_score:
raw_delta = fitness - self.best_score
normalized_delta = self._normalize_delta(raw_delta)
self.best_score = fitness
self.improvement_count += 1
# G_t = ρ * G_{t-1} + (1 - ρ) * δ²
self.accumulated_signal = (self.decay * self.accumulated_signal
+ (1 - self.decay) * normalized_delta ** 2)
return normalized_delta
return 0.0
def receive_external_improvement(self, fitness: float) -> float:
"""Migration path: update the local best + G via the same EMA, but never the
improvement/evaluation counters (the receiver didn't earn the UCB credit)."""
if fitness <= self.best_score:
return 0.0
raw_delta = fitness - self.best_score
normalized_delta = self._normalize_delta(raw_delta)
self.best_score = fitness
self.accumulated_signal = (self.decay * self.accumulated_signal
+ (1 - self.decay) * normalized_delta ** 2)
return normalized_delta
def get_search_intensity(self) -> float:
"""I = I_min + (I_max − I_min) / (1 + sqrt(G + ε))."""
return self.intensity_min + (self.intensity_max - self.intensity_min) / (
1 + math.sqrt(self.accumulated_signal + self.epsilon)
)
def get_productivity(self) -> float:
return self.improvement_count / self.total_evaluations if self.total_evaluations else 0.0
# ---------------------------------------------------------------------------------------------
# Level 2 — MultiDimensionalAdapter (adaptation.py::MultiDimensionalAdapter)
# ---------------------------------------------------------------------------------------------
class MultiDimensionalAdapter:
"""UCB bandit over islands with dual normalization: LOCAL best for intensity (Level 1),
GLOBAL best for UCB rewards (the "poor island bias" fix). Rewards R and decayed visits V both
decay at ρ so ``R/V`` stays a recency-weighted average (the "breakthrough memory" fix); the
exploration bonus uses RAW visit counts."""
def __init__(self, num_dims: int, decay: float = 0.9, intensity_min: float = 0.15,
intensity_max: float = 0.5, ucb_exploration: float = 1.41, min_visits: int = 3,
epsilon: float = 1e-8):
self.decay = decay
self.intensity_min = intensity_min
self.intensity_max = intensity_max
self.ucb_exploration = ucb_exploration # C = √2
self.min_visits = min_visits
self.epsilon = epsilon
self.global_best_score = float("-inf")
self.states: list[AdaptiveState] = []
self.dimension_visits: list[int] = [] # raw n_k (the UCB bonus denominator)
self.dimension_rewards: list[float] = [] # decayed R_k (globally normalized)
self.decayed_visits: list[float] = [] # decayed V_k
for _ in range(num_dims):
self.add_dimension()
def add_dimension(self) -> None:
self.states.append(AdaptiveState(decay=self.decay, epsilon=self.epsilon,
intensity_min=self.intensity_min,
intensity_max=self.intensity_max))
self.dimension_visits.append(0)
self.dimension_rewards.append(0.0)
self.decayed_visits.append(0.0)
def _normalize_by_global(self, raw_delta: float) -> float:
if raw_delta <= 0:
return 0.0
if self.global_best_score == float("-inf"):
return 1.0
return min(raw_delta / (abs(self.global_best_score) + self.epsilon), 1.0)
def record_evaluation(self, dim_idx: int, fitness: float) -> float:
"""The verbatim adapter ``record_evaluation``: local update via the state, then the
globally-normalized reward into the decayed UCB stats."""
local_best_before = self.states[dim_idx].best_score
local_normalized_delta = self.states[dim_idx].record_evaluation(fitness)
self.dimension_visits[dim_idx] += 1
self.decayed_visits[dim_idx] = self.decay * self.decayed_visits[dim_idx] + 1.0 # V = ρV + 1
if fitness > local_best_before:
raw_delta = fitness - local_best_before
global_normalized_delta = self._normalize_by_global(raw_delta)
if fitness > self.global_best_score:
self.global_best_score = fitness
else:
global_normalized_delta = 0.0
self.dimension_rewards[dim_idx] = (
self.decay * self.dimension_rewards[dim_idx] + global_normalized_delta # R = ρR + r
)
return local_normalized_delta
def receive_external_improvement(self, dim_idx: int, fitness: float) -> float:
"""Migration path: global best may rise, the state's local best/G update, UCB untouched."""
if fitness > self.global_best_score:
self.global_best_score = fitness
return self.states[dim_idx].receive_external_improvement(fitness)
def select_dimension_ucb(self, total_iterations: int, rng) -> int:
"""``select_dimension_ucb``: random among islands with raw visits < ``min_visits``; else
argmax of ``R/V + C·sqrt(ln(N+1)/n_raw)``."""
n_dims = len(self.states)
underexplored = [i for i in range(n_dims) if self.dimension_visits[i] < self.min_visits]
if underexplored:
return rng.choice(underexplored)
best_idx, best_score = 0, float("-inf")
for i in range(n_dims):
dec_visits = self.decayed_visits[i]
reward_avg = self.dimension_rewards[i] / dec_visits if dec_visits > 0 else 0.0
raw_visits = self.dimension_visits[i]
bonus = (self.ucb_exploration * math.sqrt(math.log(total_iterations + 1) / raw_visits)
if raw_visits > 0 else float("inf"))
ucb_score = reward_avg + bonus
if ucb_score > best_score:
best_idx, best_score = i, ucb_score
return best_idx
def get_global_productivity(self) -> float:
"""Σ improvements / Σ evaluations across islands; 1.0 when empty (assume productive —
prevents spawning before any data)."""
total = sum(s.total_evaluations for s in self.states)
if total == 0:
return 1.0
return sum(s.improvement_count for s in self.states) / total
# ---------------------------------------------------------------------------------------------
# The policy (sampling half of AdaEvolveDatabase + controller scheduling)
# ---------------------------------------------------------------------------------------------
class AdaEvolvePolicy(SelectionPolicy):
"""Adaptive-intensity 3-mode parent sampling on the island chosen by the UCB bandit.
:meth:`select` evolves the island chosen at the previous :meth:`end_iteration` (island 0 at
start), computes the intensity from that island's G, draws the explore/exploit/balanced mode,
samples the parent per mode from the island's archive, then — if a paradigm is active
(``state.signals["adaevolve"]["paradigm_active"]``) — overrides the parent with the global
best genome (the upstream parent-replacement under an active paradigm) BEFORE the inspiration
set is computed, so the parent never appears in its own inspirations. Inspirations =
``max(1, int(n·local_ratio))`` most-distant-from-parent programs from the island's top half
plus a global top-fitness fill that excludes ONLY the parent (reference ``_sample_global_top``
— global picks may duplicate local picks). The sampling island is published on
``state.signals["adaevolve"]["island"]``; the Proposer stamps it onto the child (archived
parents' metadata is never mutated here).
:meth:`observe` mirrors ``AdaEvolveDatabase.add``'s state updates: archive-rejected and
eval-gated children (``metadata["admitted"]`` false) update nothing; migrants would go through
``receive_external_improvement``; normal children through the full local+UCB update.
"""
def __init__(self, seed: int = 0, num_islands: int = 2, decay: float = 0.9,
intensity_min: float = 0.15, intensity_max: float = 0.5,
use_adaptive_search: bool = True, fixed_intensity: float = 0.4,
use_ucb_selection: bool = True, use_migration: bool = True,
migration_interval: int = 15, use_dynamic_islands: bool = True,
max_islands: int = 5, spawn_productivity_threshold: float = 0.015,
spawn_cooldown_iterations: int = 30, local_context_program_ratio: float = 0.6,
num_inspirations: int = 4):
super().__init__(seed)
if intensity_min > intensity_max: # upstream validation: swap, then clamp decay
intensity_min, intensity_max = intensity_max, intensity_min
decay = min(max(decay, 0.0), 1.0)
self.use_adaptive_search = use_adaptive_search
self.fixed_intensity = fixed_intensity
self.use_ucb_selection = use_ucb_selection
self.use_migration = use_migration
self.migration_interval = max(1, int(migration_interval))
self.use_dynamic_islands = use_dynamic_islands
self.max_islands = int(max_islands)
self.spawn_productivity_threshold = spawn_productivity_threshold
self.spawn_cooldown_iterations = int(spawn_cooldown_iterations)
self.local_context_program_ratio = local_context_program_ratio
self.num_inspirations = int(num_inspirations)
self.adapter = MultiDimensionalAdapter(num_dims=max(1, int(num_islands)), decay=decay,
intensity_min=intensity_min,
intensity_max=intensity_max)
self.current_island = 0
# upstream AdaEvolveDatabase inits last_spawn_iteration = -spawn_cooldown so the FIRST dynamic
# spawn is gated only by productivity/cap/non-empty (not by the cooldown); initializing to 0
# would block any spawn until iteration == spawn_cooldown_iterations.
self.last_spawn_iteration = -self.spawn_cooldown_iterations
# ---- select ---------------------------------------------------------------------------------
def select(self, population, state: RunState | None = None) -> Selection:
members = population.all()
if not members:
raise RuntimeError("cannot select from an empty population")
while len(self.adapter.states) < population.num_islands: # keep dims in sync (defensive)
self.adapter.add_dimension()
isl = self.current_island % population.num_islands
sig = state.signals.setdefault("adaevolve", {}) if state is not None else {}
# Level 1: intensity from the island's G, then the 3-mode split
intensity = (self.adapter.states[isl].get_search_intensity()
if self.use_adaptive_search else self.fixed_intensity)
rand = self.rng.random()
if rand < intensity:
mode = "exploration" # P = I
elif rand < intensity + (1 - intensity) * 0.7:
mode = "exploitation" # P = (1 − I) · 0.7
else:
mode = "balanced" # P = (1 − I) · 0.3
parent = population.sample_parent(isl, mode, self.rng)
# active paradigm: the parent is REPLACED with the global best BEFORE the inspiration set
# is computed, so the final parent can never appear in its own inspirations
if sig.get("paradigm_active"):
best = population.best()
if best is not None:
parent = best
# inspirations: local most-distant-from-parent (top half) + global top-fitness fill; the
# global share excludes ONLY the parent (reference _sample_global_top) and may therefore
# duplicate local picks
n = self.num_inspirations
local_n = max(1, int(n * self.local_context_program_ratio))
local = population.context_candidates(isl, parent, local_n)
global_n = n - local_n
inspirations = local + (population.global_top(global_n, exclude={parent.id})
if global_n > 0 else [])
# sibling context for the prompt: last <=5 children of this parent ON THE CURRENT ISLAND
siblings = [{"parent_fitness": parent.fitness, "child_fitness": kid.fitness,
"delta": kid.fitness - parent.fitness}
for kid in population.children_of(parent.id, limit=5, island=isl)]
sig.update({"mode": mode, "intensity": intensity, "island": isl, "siblings": siblings,
"num_islands": population.num_islands,
"global_productivity": self.adapter.get_global_productivity()})
return Selection(parent=parent, inspirations=inspirations, pool=members)
# ---- observe ----------------------------------------------------------------------------------
def observe(self, genome: Genome, state: RunState | None = None) -> None:
if not genome.metadata.get("admitted"):
return # rejected/eval-gated children update neither G nor UCB stats
isl = genome.metadata.get("island", self.current_island)
if not isinstance(isl, int) or not (0 <= isl < len(self.adapter.states)):
isl = self.current_island % len(self.adapter.states)
if genome.metadata.get("migrated_from") is not None:
self.adapter.receive_external_improvement(isl, genome.fitness)
else:
self.adapter.record_evaluation(isl, genome.fitness)
# ---- end of iteration (AdaEvolveDatabase.end_iteration, called from scaffold.periodic) ------
def end_iteration(self, iteration: int, population) -> None:
"""(1) dynamic-spawn check, (2) pick the next island (UCB or round-robin), (3) ring
migration every ``migration_interval`` iterations — in the upstream order."""
if (self.use_dynamic_islands and len(population) > 0
and population.num_islands < self.max_islands
and iteration - self.last_spawn_iteration >= self.spawn_cooldown_iterations
and self.adapter.get_global_productivity() < self.spawn_productivity_threshold):
self._spawn(population)
self.last_spawn_iteration = iteration
if self.use_ucb_selection:
self.current_island = self.adapter.select_dimension_ucb(iteration, self.rng)
else:
self.current_island = (iteration + 1) % population.num_islands # round-robin ablation
if self.use_migration and iteration > 0 and iteration % self.migration_interval == 0:
for migrant in population.migrate():
dest = migrant.metadata.get("migrated_to", migrant.metadata.get("island", 0))
self.adapter.receive_external_improvement(int(dest) % len(self.adapter.states),
migrant.fitness)
def _spawn(self, population) -> None:
"""``_spawn_island``: least-used preset (random among ties, through ``self.rng``), a fresh
adaptive-state dimension, and top-5 seeding (done by the population)."""
usage = population.preset_usage()
lowest = min(usage.values())
ties = [p for p in ISLAND_CONFIG_PRESETS if usage[p["name"]] == lowest]
preset = self.rng.choice(ties)
population.spawn_island(preset)
self.adapter.add_dimension()