Source code for ndscan.experiment.scan_generator

import logging
import random
from collections.abc import Iterator
from dataclasses import dataclass
from itertools import product
from typing import Any

import numpy as np

logger = logging.getLogger(__name__)

__all__ = [
    "ScanGenerator",
    "RefiningGenerator",
    "LinearGenerator",
    "ListGenerator",
    "CentreSpanRefiningGenerator",
    "IntRefiningGenerator",
    "IntLinearGenerator",
    "IntCentreSpanGenerator",
    "IntCentreSpanRefiningGenerator",
    "ScanOptions",
    "GENERATORS",
    "INT_GENERATORS",
]


[docs] class ScanGenerator: """Generates points along a single scan axis to be visited.""" def has_level(self, level: int) -> bool: """ """ raise NotImplementedError def points_for_level(self, level: int, rng=None) -> list[Any]: """ """ raise NotImplementedError def describe_limits(self, target: dict[str, Any]) -> None: """ """ raise NotImplementedError
[docs] class RefiningGenerator(ScanGenerator): """Generates progressively finer grid by halving distance between points each level.""" def __init__(self, lower, upper, randomise_order): self.lower = float(min(lower, upper)) self.upper = float(max(lower, upper)) self.randomise_order = randomise_order self.limit_lower = float("-inf") self.limit_upper = float("inf") def has_level(self, level: int) -> bool: "" # For floating-point parameters, a refining scan, in practical terms, never runs # out of points. Will need to be amended for integer parameters. return True def points_for_level(self, level: int, rng=None) -> list[Any]: "" if level == 0: points = np.array([self.lower, self.upper]) else: d = self.upper - self.lower num = 2 ** (level - 1) points = np.arange(num) * d / num + d / (2 * num) + self.lower # Silently drop points outside of the limits points = points[(points >= self.limit_lower) & (points <= self.limit_upper)] if self.randomise_order: rng.shuffle(points) return points.tolist() def describe_limits(self, target: dict[str, Any]) -> None: "" target["min"] = self.lower target["max"] = self.upper @staticmethod def args_from_centre_span( centre, half_span, randomise_order, limit_lower, limit_upper ): lower = centre - half_span upper = centre + half_span if lower < limit_lower and upper > limit_upper: # If both extents of the span exceed the limits, change the span to fit # the larger of the two. This way we still don't move the scan centre. half_span = max(abs(centre - limit_lower), abs(limit_upper - centre)) lower = centre - half_span upper = centre + half_span return lower, upper, randomise_order
[docs] class CentreSpanRefiningGenerator(RefiningGenerator): """Generates progressively finer grid in span around centre by halving distance between points each level. Scan is always centred on the given centre, even if the span exceeds the limits. :param limit_lower: Optional lower limit (inclusive) to the range of generated points. Useful for representing scans on parameters the range of which is limited (e.g. to be non-negative). :param limit_upper: See `limit_lower`. """ def __init__( self, centre, half_span, randomise_order, limit_lower=-np.inf, limit_upper=np.inf, ): super().__init__( *super().args_from_centre_span( centre, half_span, randomise_order, limit_lower, limit_upper ) ) self.centre, self.half_span = centre, half_span self.limit_lower, self.limit_upper = limit_lower, limit_upper
class ExpandingGenerator(ScanGenerator): """Generates points with given, constant spacing in progressively growing range around a given centre. """ def __init__( self, centre, spacing, randomise_order: bool, limit_lower=None, limit_upper=None ): """ :param limit_lower: Optional lower limit (inclusive) to the range of generated points. Useful for representing scans on parameters the range of which is limited (e.g. to be non-negative). :param limit_upper: See `limit_lower`. """ self.centre = centre self.spacing = abs(spacing) self.randomise_order = randomise_order self.limit_lower = limit_lower if limit_lower is not None else float("-inf") if centre < self.limit_lower: raise ValueError("Given scan centre exceeds lower limit") self.limit_upper = limit_upper if limit_upper is not None else float("inf") if centre > self.limit_upper: raise ValueError("Given scan centre exceeds upper limit") def has_level(self, level: int) -> bool: "" def num_points(limit): return np.floor(abs(self.centre - limit) / self.spacing) return level <= max(num_points(self.limit_lower), num_points(self.limit_upper)) def points_for_level(self, level: int, rng=None) -> list[Any]: "" if level == 0: return [self.centre] points = [] lower = self.centre - level * self.spacing if lower >= self.limit_lower: points.append(lower) upper = self.centre + level * self.spacing if upper <= self.limit_upper: points.append(upper) if self.randomise_order: rng.shuffle(points) return points def describe_limits(self, target: dict[str, Any]) -> None: "" if self.limit_lower > float("-inf"): target["min"] = self.limit_lower if self.limit_upper < float("inf"): target["max"] = self.limit_upper target["increment"] = self.spacing
[docs] class LinearGenerator(ScanGenerator): """Generates equally spaced points between two endpoints.""" min_num_points = 2 def __init__(self, start, stop, num_points, randomise_order): if num_points < self.min_num_points: raise ValueError( f"Need at least {self.min_num_points} point(s) in linear scan" ) self.start = start self.stop = stop self.num_points = num_points self.randomise_order = randomise_order def has_level(self, level: int) -> bool: "" return level == 0 def points_for_level(self, level: int, rng=None) -> list[Any]: "" assert level == 0 points = np.linspace( start=self.start, stop=self.stop, num=self.num_points, endpoint=True ) if self.randomise_order: rng.shuffle(points) return points.tolist() def describe_limits(self, target: dict[str, Any]) -> None: "" target["min"] = min(self.start, self.stop) target["max"] = max(self.start, self.stop) target["increment"] = abs(self.stop - self.start) / (self.num_points - 1) @staticmethod def args_from_centre_span( centre, half_span, num_points, randomise_order, limit_lower, limit_upper ): num_points = num_points randomise_order = randomise_order start = centre - half_span if limit_lower is not None: start = max(start, limit_lower) stop = centre + half_span if limit_upper is not None: stop = min(stop, limit_upper) if start > stop: raise ValueError("Empty centre/span scan (lower limit larger than upper)") if num_points == 1: start = stop = centre return start, stop, num_points, randomise_order
class CentreSpanGenerator(LinearGenerator): """Generates equally spaced points in ``centre``±``half_span``.""" min_num_points = 1 def __init__( self, centre, half_span, num_points: int, randomise_order: bool, limit_lower=None, limit_upper=None, ): """ :param limit_lower: Optional lower limit (inclusive) to the range of generated points. Useful for representing scans on parameters the range of which is limited (e.g. to be non-negative). :param limit_upper: See `limit_lower`. """ super().__init__( *super().args_from_centre_span( centre, half_span, num_points, randomise_order, limit_lower, limit_upper ) ) self.limit_lower, self.limit_upper = limit_lower, limit_upper def _refining_max_level(d: int) -> int: """Largest refinement level at which :class:`IntRefiningGenerator`'s bisection scheme can contribute further integers, given range size ``d = upper - lower``. The bisection forms an implicit binary search tree of depth at most ``ceil(log2(d))`` over the integers in ``[lower, upper]`` — each level halves the largest open interval, until no interval has interior integers left. """ if d <= 0: return 0 return (d - 1).bit_length() def _bisection_points(lower: int, upper: int, depth: int): """Yield, in left-to-right order, the integer midpoints at the given depth of the bisection tree of the open interval ``(lower, upper)``. Depth 0 is the root midpoint ``(lower + upper) // 2``; depth ``k`` recurses into the left and right halves at depth ``k - 1``. Once an interval has no interior integers (``upper - lower < 2``), recursion stops — each integer in ``[lower, upper]`` is therefore visited at most once across all depths. """ if upper - lower < 2: return mid = (lower + upper) // 2 if depth == 0: yield mid return yield from _bisection_points(lower, mid, depth - 1) yield from _bisection_points(mid, upper, depth - 1)
[docs] class IntRefiningGenerator(RefiningGenerator): """Integer-valued analogue of :class:`RefiningGenerator`. Level 0 emits ``lower`` and ``upper``; subsequent levels recursively bisect the resulting open intervals, emitting the integer midpoint of each. By construction every integer in ``[lower, upper]`` is visited at most once, and the scan terminates exactly once all of them have been visited. """ def __init__(self, lower, upper, randomise_order): super().__init__(lower, upper, randomise_order) self.lower, self.upper = round(self.lower), round(self.upper) def has_level(self, level: int) -> bool: "" return level <= _refining_max_level(self.upper - self.lower) def points_for_level(self, level: int, rng=None) -> list[Any]: "" if level == 0: if self.lower == self.upper: points = [self.lower] else: points = [self.lower, self.upper] else: points = list(_bisection_points(self.lower, self.upper, level - 1)) # Silently drop points outside of the limits. points = [p for p in points if self.limit_lower <= p <= self.limit_upper] if self.randomise_order: rng.shuffle(points) return points
[docs] class IntCentreSpanRefiningGenerator(IntRefiningGenerator): """Integer-valued analogue of :class:`CentreSpanRefiningGenerator`. :param limit_lower: Optional lower limit (inclusive) to the range of generated points. :param limit_upper: See ``limit_lower``. """ def __init__( self, centre, half_span, randomise_order, limit_lower=-np.inf, limit_upper=np.inf, ): super().__init__( *super().args_from_centre_span( centre, half_span, randomise_order, limit_lower, limit_upper ) ) self.centre, self.half_span = centre, half_span self.limit_lower, self.limit_upper = limit_lower, limit_upper
[docs] class IntLinearGenerator(LinearGenerator): """Integer-valued analogue of :class:`LinearGenerator`. Computes ``num_points`` equally spaced positions between ``start`` and ``stop`` and rounds each to the nearest integer. Because the underlying floating-point sequence is monotonic, integer duplicates that appear when the requested density exceeds one point per integer are always adjacent — a single pass collapses them, so every integer is visited at most once. """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.start, self.stop, self.num_points = ( round(self.start), round(self.stop), round(self.num_points), ) max_num_points = abs(self.stop - self.start) + 1 if self.num_points > max_num_points: logger.warning( "Cannot generate %d unique points in range [%d, %d]. " "Reducing number of points to %d.", self.num_points, self.start, self.stop, max_num_points, ) self.num_points = max_num_points def points_for_level(self, level: int, rng=None) -> list[Any]: "" assert level == 0 points = np.linspace( start=self.start, stop=self.stop, num=self.num_points, endpoint=True, dtype=int, ) if self.randomise_order: rng.shuffle(points) return points.tolist()
[docs] class IntCentreSpanGenerator(IntLinearGenerator): """Integer-valued analogue of :class:`CentreSpanGenerator`..""" min_num_points = 1 def __init__( self, centre, half_span, num_points: int, randomise_order: bool, limit_lower=None, limit_upper=None, ): super().__init__( *super().args_from_centre_span( centre, half_span, num_points, randomise_order, limit_lower, limit_upper ) ) self.start, self.stop = round(self.start), round(self.stop) self.limit_lower, self.limit_upper = limit_lower, limit_upper
[docs] class ListGenerator(ScanGenerator): """Generates points by reading from an explicitly specified list.""" def __init__(self, values, randomise_order): self.values = values self.randomise_order = randomise_order def has_level(self, level: int) -> bool: "" return level == 0 def points_for_level(self, level: int, rng=None) -> list[Any]: "" assert level == 0 values = self.values if self.randomise_order: rng.shuffle(values) return values def describe_limits(self, target: dict[str, Any]) -> None: "" values = np.array(self.values) if np.issubdtype(values.dtype, np.number): target["min"] = np.min(values) target["max"] = np.max(values)
GENERATORS = { "refining": RefiningGenerator, "expanding": ExpandingGenerator, "linear": LinearGenerator, "centre_span": CentreSpanGenerator, "centre_span_refining": CentreSpanRefiningGenerator, "list": ListGenerator, } #: Generators to prefer for integer parameters; fall back to :data:`GENERATORS` for #: scan types not listed here (``expanding`` already produces integer-spaced points, #: ``list`` simply enumerates user-supplied values). INT_GENERATORS = { "refining": IntRefiningGenerator, "linear": IntLinearGenerator, "centre_span": IntCentreSpanGenerator, "centre_span_refining": IntCentreSpanRefiningGenerator, } @dataclass class ScanOptions: """ """ #: How many times to iterate through the specified scan points. #: #: For scans with more than one level, the repetitions are executed for each level #: before proceeding to the next one. This way, e.g. :class:`.RefiningGenerator`, #: which produces infinitely many points, can still be employed in interactive #: work when wanting to use repeats to gather statistical information. num_repeats: int = 1 #: How many times to repeat each point consecutively in a scan (i.e. without #: changing parameters). This is useful for scans where there is some settling time #: after moving to a new point. num_repeats_per_point: int = 1 #: Whether to randomise the acquisition order of data points across all axes #: (within a refinement level). #: #: This is complementary to the randomisation options a :class:`.ScanGenerator` will #: typically have, as for a multi-dimensional scan, that alone would still lead to #: data being acquired "stripe by stripe" (hyperplane by hyperplane). randomise_order_globally: bool = False #: Global seed to use for randomising the point acquisition order (if requested). seed: int = None def __post_init__(self): if self.seed is None: self.seed = random.getrandbits(32) def generate_points( axis_generators: list[ScanGenerator], options: ScanOptions ) -> Iterator[Any]: rng = np.random.RandomState(options.seed) # Stores computed coordinates for each axis, indexed first by # axis order, then by level. axis_level_points = [[] for _ in axis_generators] max_level = 0 while True: found_new_levels = False for i, a in enumerate(axis_generators[::-1]): if a.has_level(max_level): axis_level_points[i].append(a.points_for_level(max_level, rng)) found_new_levels = True if not found_new_levels: # No levels left to exhaust, done. return points = [] for axis_levels in product(*(range(len(p)) for p in axis_level_points)): if all(lvl < max_level for lvl in axis_levels): # Previously visited this combination already. continue tp = product(*(p[lvl] for (lvl, p) in zip(axis_levels, axis_level_points))) points.extend(tp) for _ in range(options.num_repeats): if options.randomise_order_globally: rng.shuffle(points) for p in points: for _ in range(options.num_repeats_per_point): yield p[::-1] max_level += 1