galapagos
© 2026 Galapagos. Licensed under Apache-2.0.


SkyDiscover/adaevolve

AdaEvolve

Hierarchical adaptive search: G-signal exploration intensity, UCB island allocation, and LLM meta-guidance on stagnation.
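
A minimal standalone sketch of the G-signal part of that description, using the formulas and default constants that appear in `selection_policy.py` below. The function names (`update_signal`, `search_intensity`, `mode_probabilities`) are illustrative only and are not part of the scaffold's API; the real logic lives in `AdaptiveState` and `AdaEvolvePolicy.select`.

```python
import math


def update_signal(g: float, delta: float, decay: float = 0.9) -> float:
    """EMA of squared normalized improvements: G_t = rho * G_{t-1} + (1 - rho) * delta^2.

    Applied only on improvement events, matching the reference code (no decay on stagnation).
    """
    return decay * g + (1 - decay) * delta ** 2


def search_intensity(g: float, i_min: float = 0.15, i_max: float = 0.5,
                     eps: float = 1e-8) -> float:
    """I = I_min + (I_max - I_min) / (1 + sqrt(G + eps)); a larger G gives a lower intensity."""
    return i_min + (i_max - i_min) / (1 + math.sqrt(g + eps))


def mode_probabilities(intensity: float) -> dict[str, float]:
    """3-mode parent-sampling split: P(explore)=I, P(exploit)=(1-I)*0.7, P(balanced)=(1-I)*0.3."""
    return {
        "exploration": intensity,
        "exploitation": (1 - intensity) * 0.7,
        "balanced": (1 - intensity) * 0.3,
    }


if __name__ == "__main__":
    g = 0.0
    for delta in (1.0, 0.2, 0.05):  # hypothetical normalized improvements
        g = update_signal(g, delta)
        print(round(g, 4), mode_probabilities(search_intensity(g)))
```

With G near zero the intensity sits close to `intensity_max`, so exploration is drawn most often; each recorded improvement raises G and shifts probability toward exploitation.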

Test-time search · Apache-2.0
adaevolve/selection_policy.py
333 lines · 17.9 KB · python
"""AdaEvolve SelectionPolicy component — Level-1 adaptive exploration intensity (the G signal)
+ Level-2 UCB island bandit + 3-mode parent sampling.

One file per component (see scaffold.py). Faithful port of SkyDiscover's ``AdaptiveState`` and
``MultiDimensionalAdapter`` (search/adaevolve/adaptation.py) plus the sampling half of
``AdaEvolveDatabase``. Where the paper and the reference code diverge, this port follows the
code: G is updated only on improvement events (no ``G ← ρ·G`` decay during stagnation), island
spawning triggers on global *productivity* (improvements/evaluations < threshold) rather than
``G ≤ τ_S``, and parent selection uses the 3-mode split ``P(explore)=I``,
``P(exploit)=(1−I)·0.7``, ``P(balanced)=(1−I)·0.3``.

The end-of-iteration bookkeeping (UCB island rotation, ring migration, dynamic spawning —
upstream ``AdaEvolveDatabase.end_iteration``) is exposed as :meth:`AdaEvolvePolicy.end_iteration`
and called by the scaffold's ``periodic`` hook, because ``observe`` is skipped on NO_DIFF steps
while ``periodic`` runs every iteration (the open mirror of upstream's ``finally`` block).

All randomness flows through ``self.rng``.
"""
from __future__ import annotations

import math
from dataclasses import dataclass, field

from ...components.selection import SelectionPolicy
from ...records import Genome, RunState, Selection
from .population import ISLAND_CONFIG_PRESETS


# ---------------------------------------------------------------------------------------------
# Level 1 — AdaptiveState  (adaptation.py::AdaptiveState)
# ---------------------------------------------------------------------------------------------


@dataclass
class AdaptiveState:
    """Per-island Level-1 state: the accumulated improvement signal G and the local best."""

    decay: float = 0.9
    epsilon: float = 1e-8
    intensity_min: float = 0.15
    intensity_max: float = 0.5
    accumulated_signal: float = 0.0                 # G
    best_score: float = field(default=float("-inf"))
    improvement_count: int = 0
    total_evaluations: int = 0

    def _normalize_delta(self, raw_delta: float) -> float:
        """δ = min(raw / (|local best| + ε), 1); the first-ever improvement is always 1.0."""
        if self.best_score == float("-inf"):
            return 1.0
        return min(raw_delta / (abs(self.best_score) + self.epsilon), 1.0)

    def record_evaluation(self, fitness: float) -> float:
        """The verbatim ``record_evaluation``: improvement-only updates — G is NOT decayed on a
        non-improving evaluation (the reference-code divergence from the paper)."""
        self.total_evaluations += 1
        if fitness > self.best_score:
            raw_delta = fitness - self.best_score
            normalized_delta = self._normalize_delta(raw_delta)
            self.best_score = fitness
            self.improvement_count += 1
            # G_t = ρ * G_{t-1} + (1 - ρ) * δ²
            self.accumulated_signal = (self.decay * self.accumulated_signal
                                       + (1 - self.decay) * normalized_delta ** 2)
            return normalized_delta
        return 0.0

    def receive_external_improvement(self, fitness: float) -> float:
        """Migration path: update the local best + G via the same EMA, but never the
        improvement/evaluation counters (the receiver didn't earn the UCB credit)."""
        if fitness <= self.best_score:
            return 0.0
        raw_delta = fitness - self.best_score
        normalized_delta = self._normalize_delta(raw_delta)
        self.best_score = fitness
        self.accumulated_signal = (self.decay * self.accumulated_signal
                                   + (1 - self.decay) * normalized_delta ** 2)
        return normalized_delta

    def get_search_intensity(self) -> float:
        """I = I_min + (I_max − I_min) / (1 + sqrt(G + ε))."""
        return self.intensity_min + (self.intensity_max - self.intensity_min) / (
            1 + math.sqrt(self.accumulated_signal + self.epsilon)
        )

    def get_productivity(self) -> float:
        """Fraction of this island's evaluations that improved its local best (0.0 before any)."""
        return self.improvement_count / self.total_evaluations if self.total_evaluations else 0.0


# ---------------------------------------------------------------------------------------------
# Level 2 — MultiDimensionalAdapter  (adaptation.py::MultiDimensionalAdapter)
# ---------------------------------------------------------------------------------------------


class MultiDimensionalAdapter:
    """UCB bandit over islands with dual normalization: LOCAL best for intensity (Level 1),
    GLOBAL best for UCB rewards (the "poor island bias" fix). Rewards R and decayed visits V both
    decay at ρ so ``R/V`` stays a recency-weighted average (the "breakthrough memory" fix); the
    exploration bonus uses RAW visit counts."""

    def __init__(self, num_dims: int, decay: float = 0.9, intensity_min: float = 0.15,
                 intensity_max: float = 0.5, ucb_exploration: float = 1.41, min_visits: int = 3,
                 epsilon: float = 1e-8):
        self.decay = decay
        self.intensity_min = intensity_min
        self.intensity_max = intensity_max
        self.ucb_exploration = ucb_exploration      # C ≈ √2 (default 1.41)
        self.min_visits = min_visits
        self.epsilon = epsilon
        self.global_best_score = float("-inf")
        self.states: list[AdaptiveState] = []
        self.dimension_visits: list[int] = []       # raw n_k (the UCB bonus denominator)
        self.dimension_rewards: list[float] = []    # decayed R_k (globally normalized)
        self.decayed_visits: list[float] = []       # decayed V_k
        for _ in range(num_dims):
            self.add_dimension()

    def add_dimension(self) -> None:
        """Register one more island: a fresh ``AdaptiveState`` plus zeroed UCB statistics."""
        self.states.append(AdaptiveState(decay=self.decay, epsilon=self.epsilon,
                                         intensity_min=self.intensity_min,
                                         intensity_max=self.intensity_max))
        self.dimension_visits.append(0)
        self.dimension_rewards.append(0.0)
        self.decayed_visits.append(0.0)

    def _normalize_by_global(self, raw_delta: float) -> float:
        """r = min(raw / (|global best| + ε), 1); 0 if raw ≤ 0, 1.0 while no global best exists."""
        if raw_delta <= 0:
            return 0.0
        if self.global_best_score == float("-inf"):
            return 1.0
        return min(raw_delta / (abs(self.global_best_score) + self.epsilon), 1.0)

    def record_evaluation(self, dim_idx: int, fitness: float) -> float:
        """The verbatim adapter ``record_evaluation``: local update via the state, then the
        globally-normalized reward into the decayed UCB stats."""
        local_best_before = self.states[dim_idx].best_score
        local_normalized_delta = self.states[dim_idx].record_evaluation(fitness)
        self.dimension_visits[dim_idx] += 1
        self.decayed_visits[dim_idx] = self.decay * self.decayed_visits[dim_idx] + 1.0  # V = ρV + 1
        if fitness > local_best_before:
            raw_delta = fitness - local_best_before
            global_normalized_delta = self._normalize_by_global(raw_delta)
            if fitness > self.global_best_score:
                self.global_best_score = fitness
        else:
            global_normalized_delta = 0.0
        self.dimension_rewards[dim_idx] = (
            self.decay * self.dimension_rewards[dim_idx] + global_normalized_delta    # R = ρR + r
        )
        return local_normalized_delta

    def receive_external_improvement(self, dim_idx: int, fitness: float) -> float:
        """Migration path: global best may rise, the state's local best/G update, UCB untouched."""
        if fitness > self.global_best_score:
            self.global_best_score = fitness
        return self.states[dim_idx].receive_external_improvement(fitness)

    def select_dimension_ucb(self, total_iterations: int, rng) -> int:
        """``select_dimension_ucb``: random among islands with raw visits < ``min_visits``; else
        argmax of ``R/V + C·sqrt(ln(N+1)/n_raw)``."""
        n_dims = len(self.states)
        underexplored = [i for i in range(n_dims) if self.dimension_visits[i] < self.min_visits]
        if underexplored:
            return rng.choice(underexplored)
        best_idx, best_score = 0, float("-inf")
        for i in range(n_dims):
            dec_visits = self.decayed_visits[i]
            reward_avg = self.dimension_rewards[i] / dec_visits if dec_visits > 0 else 0.0
            raw_visits = self.dimension_visits[i]
            bonus = (self.ucb_exploration * math.sqrt(math.log(total_iterations + 1) / raw_visits)
                     if raw_visits > 0 else float("inf"))
            ucb_score = reward_avg + bonus
            if ucb_score > best_score:
                best_idx, best_score = i, ucb_score
        return best_idx

    def get_global_productivity(self) -> float:
        """Σ improvements / Σ evaluations across islands; 1.0 when empty (assume productive —
        prevents spawning before any data)."""
        total = sum(s.total_evaluations for s in self.states)
        if total == 0:
            return 1.0
        return sum(s.improvement_count for s in self.states) / total


# ---------------------------------------------------------------------------------------------
# The policy  (sampling half of AdaEvolveDatabase + controller scheduling)
# ---------------------------------------------------------------------------------------------


class AdaEvolvePolicy(SelectionPolicy):
    """Adaptive-intensity 3-mode parent sampling on the island chosen by the UCB bandit.

    :meth:`select` evolves the island chosen at the previous :meth:`end_iteration` (island 0 at
    start), computes the intensity from that island's G, draws the explore/exploit/balanced mode,
    samples the parent per mode from the island's archive, then — if a paradigm is active
    (``state.signals["adaevolve"]["paradigm_active"]``) — overrides the parent with the global
    best genome (the upstream parent-replacement under an active paradigm) BEFORE the inspiration
    set is computed, so the parent never appears in its own inspirations. Inspirations =
    ``max(1, int(n·local_ratio))`` most-distant-from-parent programs from the island's top half
    plus a global top-fitness fill that excludes ONLY the parent (reference ``_sample_global_top``
    — global picks may duplicate local picks). The sampling island is published on
    ``state.signals["adaevolve"]["island"]``; the Proposer stamps it onto the child (archived
    parents' metadata is never mutated here).

    :meth:`observe` mirrors ``AdaEvolveDatabase.add``'s state updates: archive-rejected and
    eval-gated children (``metadata["admitted"]`` false) update nothing; migrants would go through
    ``receive_external_improvement``; normal children through the full local+UCB update.
    """

    def __init__(self, seed: int = 0, num_islands: int = 2, decay: float = 0.9,
                 intensity_min: float = 0.15, intensity_max: float = 0.5,
                 use_adaptive_search: bool = True, fixed_intensity: float = 0.4,
                 use_ucb_selection: bool = True, use_migration: bool = True,
                 migration_interval: int = 15, use_dynamic_islands: bool = True,
                 max_islands: int = 5, spawn_productivity_threshold: float = 0.015,
                 spawn_cooldown_iterations: int = 30, local_context_program_ratio: float = 0.6,
                 num_inspirations: int = 4):
        super().__init__(seed)
        if intensity_min > intensity_max:  # upstream validation: swap, then clamp decay
            intensity_min, intensity_max = intensity_max, intensity_min
        decay = min(max(decay, 0.0), 1.0)
        self.use_adaptive_search = use_adaptive_search
        self.fixed_intensity = fixed_intensity
        self.use_ucb_selection = use_ucb_selection
        self.use_migration = use_migration
        self.migration_interval = max(1, int(migration_interval))
        self.use_dynamic_islands = use_dynamic_islands
        self.max_islands = int(max_islands)
        self.spawn_productivity_threshold = spawn_productivity_threshold
        self.spawn_cooldown_iterations = int(spawn_cooldown_iterations)
        self.local_context_program_ratio = local_context_program_ratio
        self.num_inspirations = int(num_inspirations)
        self.adapter = MultiDimensionalAdapter(num_dims=max(1, int(num_islands)), decay=decay,
                                               intensity_min=intensity_min,
                                               intensity_max=intensity_max)
        self.current_island = 0
        # upstream AdaEvolveDatabase inits last_spawn_iteration = -spawn_cooldown so the FIRST dynamic
        # spawn is gated only by productivity/cap/non-empty (not by the cooldown); initializing to 0
        # would block any spawn until iteration == spawn_cooldown_iterations.
        self.last_spawn_iteration = -self.spawn_cooldown_iterations

    # ---- select ---------------------------------------------------------------------------------
    def select(self, population, state: RunState | None = None) -> Selection:
        members = population.all()
        if not members:
            raise RuntimeError("cannot select from an empty population")
        while len(self.adapter.states) < population.num_islands:  # keep dims in sync (defensive)
            self.adapter.add_dimension()
        isl = self.current_island % population.num_islands
        sig = state.signals.setdefault("adaevolve", {}) if state is not None else {}

        # Level 1: intensity from the island's G, then the 3-mode split
        intensity = (self.adapter.states[isl].get_search_intensity()
                     if self.use_adaptive_search else self.fixed_intensity)
        rand = self.rng.random()
        if rand < intensity:
            mode = "exploration"                              # P = I
        elif rand < intensity + (1 - intensity) * 0.7:
            mode = "exploitation"                             # P = (1 − I) · 0.7
        else:
            mode = "balanced"                                 # P = (1 − I) · 0.3
        parent = population.sample_parent(isl, mode, self.rng)

        # active paradigm: the parent is REPLACED with the global best BEFORE the inspiration set
        # is computed, so the final parent can never appear in its own inspirations
        if sig.get("paradigm_active"):
            best = population.best()
            if best is not None:
                parent = best

        # inspirations: local most-distant-from-parent (top half) + global top-fitness fill; the
        # global share excludes ONLY the parent (reference _sample_global_top) and may therefore
        # duplicate local picks
        n = self.num_inspirations
        local_n = max(1, int(n * self.local_context_program_ratio))
        local = population.context_candidates(isl, parent, local_n)
        global_n = n - local_n
        inspirations = local + (population.global_top(global_n, exclude={parent.id})
                                if global_n > 0 else [])

        # sibling context for the prompt: last <=5 children of this parent ON THE CURRENT ISLAND
        siblings = [{"parent_fitness": parent.fitness, "child_fitness": kid.fitness,
                     "delta": kid.fitness - parent.fitness}
                    for kid in population.children_of(parent.id, limit=5, island=isl)]
        sig.update({"mode": mode, "intensity": intensity, "island": isl, "siblings": siblings,
                    "num_islands": population.num_islands,
                    "global_productivity": self.adapter.get_global_productivity()})
        return Selection(parent=parent, inspirations=inspirations, pool=members)

    # ---- observe ----------------------------------------------------------------------------------
    def observe(self, genome: Genome, state: RunState | None = None) -> None:
        if not genome.metadata.get("admitted"):
            return  # rejected/eval-gated children update neither G nor UCB stats
        isl = genome.metadata.get("island", self.current_island)
        if not isinstance(isl, int) or not (0 <= isl < len(self.adapter.states)):
            isl = self.current_island % len(self.adapter.states)
        if genome.metadata.get("migrated_from") is not None:
            self.adapter.receive_external_improvement(isl, genome.fitness)
        else:
            self.adapter.record_evaluation(isl, genome.fitness)

    # ---- end of iteration  (AdaEvolveDatabase.end_iteration, called from scaffold.periodic) ------
    def end_iteration(self, iteration: int, population) -> None:
        """(1) dynamic-spawn check, (2) pick the next island (UCB or round-robin), (3) ring
        migration every ``migration_interval`` iterations — in the upstream order."""
        if (self.use_dynamic_islands and len(population) > 0
                and population.num_islands < self.max_islands
                and iteration - self.last_spawn_iteration >= self.spawn_cooldown_iterations
                and self.adapter.get_global_productivity() < self.spawn_productivity_threshold):
            self._spawn(population)
            self.last_spawn_iteration = iteration

        if self.use_ucb_selection:
            self.current_island = self.adapter.select_dimension_ucb(iteration, self.rng)
        else:
            self.current_island = (iteration + 1) % population.num_islands  # round-robin ablation

        if self.use_migration and iteration > 0 and iteration % self.migration_interval == 0:
            for migrant in population.migrate():
                dest = migrant.metadata.get("migrated_to", migrant.metadata.get("island", 0))
                self.adapter.receive_external_improvement(int(dest) % len(self.adapter.states),
                                                          migrant.fitness)

    def _spawn(self, population) -> None:
        """``_spawn_island``: least-used preset (random among ties, through ``self.rng``), a fresh
        adaptive-state dimension, and top-5 seeding (done by the population)."""
        usage = population.preset_usage()
        lowest = min(usage.values())
        ties = [p for p in ISLAND_CONFIG_PRESETS if usage[p["name"]] == lowest]
        preset = self.rng.choice(ties)
        population.spawn_island(preset)
        self.adapter.add_dimension()
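
The Level-2 island choice made in `MultiDimensionalAdapter.select_dimension_ucb` can be sketched in isolation as follows. This is a simplified illustration under stated assumptions, not the scaffold's API: `ucb_pick` is a hypothetical helper that takes the three per-island statistics as plain lists, with the same defaults (`c=1.41`, `min_visits=3`) as the adapter above.

```python
import math
import random


def ucb_pick(rewards, decayed_visits, raw_visits, total_iterations, rng,
             c=1.41, min_visits=3):
    """Pick an island index: random among under-visited islands, else the UCB argmax.

    rewards[i]        -- decayed, globally normalized reward sum R_i
    decayed_visits[i] -- decayed visit count V_i (R_i / V_i is a recency-weighted mean)
    raw_visits[i]     -- raw visit count n_i, used only in the exploration bonus
    """
    underexplored = [i for i, n in enumerate(raw_visits) if n < min_visits]
    if underexplored:
        return rng.choice(underexplored)

    def score(i):
        mean = rewards[i] / decayed_visits[i] if decayed_visits[i] > 0 else 0.0
        if raw_visits[i] <= 0:  # only reachable when min_visits is 0
            return float("inf")
        return mean + c * math.sqrt(math.log(total_iterations + 1) / raw_visits[i])

    # max() keeps the first index on ties, like the strict ">" comparison in the adapter
    return max(range(len(raw_visits)), key=score)


if __name__ == "__main__":
    rng = random.Random(0)
    # Equal raw visits: the island with the higher recency-weighted reward is chosen.
    print(ucb_pick([0.5, 0.0], [3.0, 3.0], [5, 5], total_iterations=10, rng=rng))
    # A rarely visited island can still be chosen through the exploration bonus.
    print(ucb_pick([1.0, 0.0], [10.0, 3.0], [100, 3], total_iterations=103, rng=rng))
```

Keeping the bonus on raw counts while the mean uses decayed counts means an island's past breakthroughs fade from its reward estimate, but the island does not regain an exploration bonus merely because time has passed.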