Source code for radvel.nested_sampling

from __future__ import annotations

import os
import shutil
from typing import Optional

import h5py
import numpy as np
from numpy.typing import ArrayLike
from pandas import DataFrame, read_hdf


from radvel.posterior import Posterior


def _run_dynesty(
    post: Posterior,
    output_dir: Optional[str] = None,
    sampler_type: str = "static",
    proceed: bool = False,
    sampler_kwargs: Optional[dict] = None,
    run_kwargs: Optional[dict] = None,
) -> dict:
    """Run nested sampling with `Dynesty <https://dynesty.readthedocs.io/>`_

    Args:
        post: radvel posterior object
        output_dir: Output directory where the sampler checkpoints and results will be stored. Nothing is stored by default.
            **Note**: This replaces the sampler's built-in "checkpoint_file" argument. A ``dynesty.save`` file is created automatically.
            When sampling is finished, the final state of the sampler is stored.
        proceed: Continue from previous run in output_dir if available.
        sampler_kwargs: Dictionary of keyword arguments passed to the 'sampler' object from the underlying nested sampling package at initialization.
            See each package's documentation to learn more on the available arguments. This is not available for ``sampler='multinest'``.
            Defaults to ``None``.
        run_kwargs: Dictionary of keyword arguments passed to the 'run' methods from the underlying nested sampling package.
            See each package's documentation to learn more on the available aruments.
    Returns:
        Dictionary of results with the following keys:
            - ``samples``: Samples array with shape ``(nsamples, nparams)``
            - ``lnZ``: Log of the Bayesian evidence
            - ``lnZ``: Statistical uncertainty on the evidence
            - ``sampler``: Sampler object used by the nested sampling library. Provides more fine-grained access to the results.

    """
    from dynesty import DynamicNestedSampler, NestedSampler

    run_kwargs = run_kwargs or {}
    sampler_kwargs = sampler_kwargs or {}

    if sampler_type == "static":
        sampler_class = NestedSampler
    elif sampler_type == "dynamic":
        sampler_class = DynamicNestedSampler
    else:
        raise ValueError(
            f"Expected 'dynamic' or 'static' as sampler_type. Got {sampler_type}"
        )

    if "resume" in run_kwargs:
        raise ValueError("'resume' not supported for dynesty. Use radlvel's 'proceed' instead'")
    run_kwargs["resume"] = proceed

    if "checkpoint_file" in run_kwargs:
        raise ValueError(
            "checkpoint_file not supported for dynesty. Use radvel's output_dir instead."
        )
    if output_dir is not None:
        checkpoint_file = os.path.join(output_dir, "sampler.save")
        run_kwargs["checkpoint_file"] = checkpoint_file
    checkpoint_file = run_kwargs.get("checkpoint_file", None)

    if proceed and checkpoint_file is not None and os.path.exists(checkpoint_file):
        sampler = sampler_class.restore(checkpoint_file)
    else:
        sampler = sampler_class(
            post.likelihood_ns_array,
            post.prior_transform,
            len(post.name_vary_params()),
            **sampler_kwargs,
        )
        # Dynesty cannot resume when the file does not exist
        if (
            proceed
            and checkpoint_file is not None
            and not os.path.exists(checkpoint_file)
        ):
            run_kwargs["resume"] = False

    if checkpoint_file is not None and not os.path.exists(checkpoint_file):
        outdir = os.path.dirname(checkpoint_file)
        os.makedirs(outdir)

    sampler.run_nested(**run_kwargs)

    if checkpoint_file is not None:
        sampler.save(checkpoint_file)

    results = {
        "samples": sampler.results.samples_equal(),
        "lnZ": sampler.results["logz"][-1],
        "lnZerr": sampler.results["logzerr"][-1],
        "sampler": sampler,
    }

    return results


def _run_ultranest(
    post: Posterior,
    output_dir: Optional[str] = None,
    proceed: bool = False,
    sampler_kwargs: Optional[dict] = None,
    run_kwargs: Optional[dict] = None,
) -> dict:
    """Run nested sampling with `Ultranest <https://johannesbuchner.github.io/UltraNest/ultranest.html#ultranest.integrator.ReactiveNestedSampler>`_

    The SliceSampler will be used automatically for more than 7 parameters.
    See `the Ultranest docs <https://johannesbuchner.github.io/UltraNest/example-sine-highd.html#Step-samplers-in-UltraNest>`_ for more information

    Parameters starting with ``tc`` or ``tp`` are assumed to by time of conjunction or time of periastron and are marked as ``wrapped_params`` automatically.

    Args:
        post: radvel posterior object
        output_dir: Output directory where the sampler checkpoints and results will be stored. Nothing is stored by default.
            **Note**: This replaces the sampler's built-in "log_dir" argument.
            The ultranest ``log_dir`` is automatically set to ``output_dir``.
        proceed: Continue from previous run in output_dir if available.
        sampler_kwargs: Dictionary of keyword arguments passed to the 'sampler' object from the underlying nested sampling package at initialization.
            See each package's documentation to learn more on the available arguments.
            Defaults to ``None``.
        run_kwargs: Dictionary of keyword arguments passed to the 'run' methods from the underlying nested sampling package.
            See each package's documentation to learn more on the available aruments.
    Returns:
        Dictionary of results with the following keys:
            - ``samples``: Samples array with shape ``(nsamples, nparams)``
            - ``lnZ``: Log of the Bayesian evidence
            - ``lnZ``: Statistical uncertainty on the evidence
            - ``sampler``: Sampler object used by the nested sampling library. Provides more fine-grained access to the results.
    """
    from ultranest import ReactiveNestedSampler
    from ultranest.stepsampler import SliceSampler, generate_mixture_random_direction

    run_kwargs = run_kwargs or {}
    sampler_kwargs = sampler_kwargs or {}

    if "log_dir" in sampler_kwargs:
        raise ValueError(
            "log_dir not supported for ultranest. Use radvel's output_dir instead."
        )
    if "resume" in sampler_kwargs:
        raise ValueError(
            "'resume' not supported for ultranest. Use radvel's 'proceed' instead."
        )
    sampler_kwargs["resume"] = proceed or 'overwrite'
    sampler_kwargs["log_dir"] = output_dir

    param_names = post.name_vary_params()
    wrapped_params = [pn.startswith(("tc", "tp")) for pn in param_names]
    # I guess simplest is to use overwrite or re-run
    sampler = ReactiveNestedSampler(
        param_names,
        post.likelihood_ns_array,
        post.prior_transform,
        wrapped_params=wrapped_params,
        **sampler_kwargs,
    )

    num_params = len(param_names)
    if num_params > 7:
        nsteps = len(param_names) * 2
        sampler.stepsampler = SliceSampler(
            nsteps=nsteps, generate_direction=generate_mixture_random_direction,
        )

    sampler.run(**run_kwargs)

    results = {
        "samples": sampler.results["samples"],
        "lnZ": sampler.results["logz"],
        "lnZerr": sampler.results["logzerr"],
        "sampler": sampler,
    }

    return results


def _run_multinest(
    post: Posterior,
    output_dir: Optional[str] = None,
    overwrite: bool = False,
    proceed: bool = False,
    run_kwargs: Optional[dict] = None,
) -> dict:
    """Run nested sampling with `PyMultiNest <https://johannesbuchner.github.io/PyMultiNest/pymultinest.html#>`_

    Args:
        post: radvel posterior object
        output_dir: Output directory where the sampler checkpoints and results will be stored. Nothing is stored by default.
            **Note**: This replaces the sampler's built-in "outputfiles_basename" argument.
            If ``output_dir`` is specified, sets ``outputfiles_basename`` to ``<output_dir>/out``
        proceed: Continue from previous run in output_dir if available.
        overwrite: Overwrite the output files if they exist. Defaults to ``False``.
        run_kwargs: Dictionary of keyword arguments passed to the 'run' methods from the underlying nested sampling package.
            See each package's documentation to learn more on the available aruments.
    Returns:
        Dictionary of results with the following keys:
            - ``samples``: Samples array with shape ``(nsamples, nparams)``
            - ``lnZ``: Log of the Bayesian evidence
            - ``lnZ``: Statistical uncertainty on the evidence
    """
    import pymultinest

    run_kwargs = run_kwargs or {}

    # By default, assume we want a temporary output dir
    tmp = True

    if output_dir is None:
        output_dir = "tmpdir"
    else:
        # if an actual outupt dir was specified, it is not temporary
        tmp = False

    if "outputfiles_basename" in run_kwargs:
        raise ValueError(
            "outputfiles_basename not supported for multinest. Use radvel's output_dir instead."
        )
    run_kwargs["outputfiles_basename"] = os.path.join(output_dir, "out")

    if "resume" in run_kwargs:
        raise ValueError(
            "'resume' not supported for multinest. Use radvel's 'proceed' instead."
        )
    run_kwargs["resume"] = proceed

    os.makedirs(output_dir, exist_ok=tmp or overwrite or proceed)

    def loglike(p: ArrayLike, ndim: int, nparams: int) -> float:
        """Log-likelihood for multinest
        Must support ndim and nparams arguments
        and create a list-copy of the object to avoid segfault.
        """
        # This is required to avoid segfault
        # See here: https://github.com/JohannesBuchner/PyMultiNest/issues/41, which I semi-understand
        p = [p[i] for i in range(ndim)]
        return post.likelihood_ns_array(p)

    def prior_transform(u: ArrayLike, ndim: int, nparams: int) -> None:
        """Prior transform for multinest

        Multinest requires the prior transform to handle ndim, nparams arguments
        and to modify the array in-place
        """
        post.prior_transform(u, inplace=True)

    ndim = len(post.name_vary_params())

    pymultinest.run(loglike, prior_transform, ndim, **run_kwargs)

    a = pymultinest.Analyzer(
        outputfiles_basename=run_kwargs["outputfiles_basename"], n_params=ndim
    )

    results = {}
    results["samples"] = a.get_equal_weighted_posterior()[:, :-1]
    results["lnZ"] = a.get_stats()["global evidence"]
    results["lnZerr"] = a.get_stats()["global evidence error"]

    if tmp:
        shutil.rmtree(output_dir)

    return results


def _run_nautilus(
    post: Posterior,
    output_dir: Optional[str] = None,
    proceed: bool = False,
    sampler_kwargs: Optional[dict] = None,
    run_kwargs: Optional[dict] = None,
) -> dict:
    """Run nested sampling with `Nautilus <https://nautilus-sampler.readthedocs.io/en/latest/api_high.html>`_

    Args:
        post: radvel posterior object
        output_dir: Output directory where the sampler checkpoints and results will be stored. Nothing is stored by default.
            **Note**: This replaces the sampler's built-in "filepath argument.
            The nautilus output is automatically stored in ``nautilus_output.hdf5`` under that location.
        proceed: Continue from previous run in output_dir if available.
        sampler_kwargs: Dictionary of keyword arguments passed to the 'sampler' object from the underlying nested sampling package at initialization.
            See each package's documentation to learn more on the available arguments.
            Defaults to ``None``.
        run_kwargs: Dictionary of keyword arguments passed to the 'run' methods from the underlying nested sampling package.
            See each package's documentation to learn more on the available aruments.
    Returns:
        Dictionary of results with the following keys:
            - ``samples``: Samples array with shape ``(nsamples, nparams)``
            - ``lnZ``: Log of the Bayesian evidence
            - ``lnZ``: Statistical uncertainty on the evidence
            - ``sampler``: Sampler object used by the nested sampling library. Provides more fine-grained access to the results.
    """
    from nautilus import Sampler

    sampler_kwargs = sampler_kwargs or {}
    run_kwargs = run_kwargs or {}
    run_kwargs.setdefault("verbose", True)

    if "filepath" in sampler_kwargs:
        raise ValueError(
            "filepath not supported for nautilus. Use radvel's output_dir instead."
        )
    if output_dir is not None:
        sampler_kwargs["filepath"] = os.path.join(output_dir, "nautilus_output.hdf5")
    if "resume" in sampler_kwargs:
        raise ValueError(
            "'resume' not supported for ultranest. Use radvel's 'proceed' instead."
        )
    sampler_kwargs["resume"] = proceed

    ndim = len(post.name_vary_params())
    sampler = Sampler(
        post.prior_transform, post.likelihood_ns_array, n_dim=ndim, **sampler_kwargs
    )
    sampler.run(**run_kwargs)
    results = {
        "samples": sampler.posterior(equal_weight=True)[0],
        "lnZ": sampler.log_z,
        "lnZerr": sampler.n_eff**-0.5,
        "sampler": sampler,
    }
    return results


BACKENDS = {
    "dynesty-static": _run_dynesty,
    "dynesty-dynamic": _run_dynesty,
    "multinest": _run_multinest,
    "ultranest": _run_ultranest,
    "nautilus": _run_nautilus,
}


[docs] def run( post: Posterior, output_dir: Optional[str] = None, overwrite: bool = False, proceed: bool = False, sampler: str = "ultranest", sampler_kwargs: Optional[dict] = None, run_kwargs: Optional[dict] = None, ) -> dict: """Run nested sampling Args: post: radvel posterior object output_dir: Output directory where the sampler checkpoints and results will be stored. Nothing is stored by default. **Note**: This replaces the sampler's built-in "checkpoint_file", "log_dir", or "outputfiles_basename" argument. Once you specify output there, everything is saved there automatically. A ``results.hdf5`` file will also be saved with the results dict, except for the sampler. overwrite: Overwrite the results.hdf5 if True. Will be enabled automatically when proceed=True. proceed: Resume from a previous run in the same output_dir if available. Also automatically enables overwrite. sampler: name of the sampler to use. Should be one of 'ultranest', 'dynesty-static', 'dynesty-dynamic', 'nautilus', or 'multinest'. Defaults to 'ultranest'. sampler_kwargs: Dictionary of keyword arguments passed to the 'sampler' object from the underlying nested sampling package at initialization. See each package's documentation to learn more on the available arguments. This is not available for ``sampler='multinest'``. Defaults to ``None``. run_kwargs: Dictionary of keyword arguments passed to the 'run' methods from the underlying nested sampling package. See each package's documentation to learn more on the available aruments. Returns: Dictionary of results with the keys below. - ``samples``: Samples dataframe with one column per parameters and lnprobability (log-posterior) for each set. The samples are equally weighted, meaning they are equivalent to MCMC samples. - ``lnZ``: Log of the Bayesian evidence - ``lnZ``: Statistical uncertainty on the evidence - ``sampler``: Sampler object used by the nested sampling library. Provides more fine-grained access to the results. Link to each package's API documentation: - `Ultranest <https://johannesbuchner.github.io/UltraNest/ultranest.html#ultranest.integrator.ReactiveNestedSampler>`_ - `Nautilus <https://nautilus-sampler.readthedocs.io/en/latest/api_high.html>`_ - `Dynesty <https://dynesty.readthedocs.io>`_ - `PyMultiNest <https://johannesbuchner.github.io/PyMultiNest/pymultinest.html#>`_ """ post.check_proper_priors() # Proceed automatically enables overwrite. overwrite = overwrite or proceed if output_dir is not None: results_file = os.path.join(output_dir, "results.hdf5") if os.path.exists(results_file) and not overwrite: raise FileExistsError( f"Results file {results_file} exists and 'overwrite' is False." ) sampler = sampler.lower() if sampler == "pymultinest": sampler = "multinest" # fmt: off if sampler == "ultranest": results = _run_ultranest(post, output_dir=output_dir, proceed=proceed, sampler_kwargs=sampler_kwargs, run_kwargs=run_kwargs) elif sampler == "dynesty-static": results = _run_dynesty(post, sampler_type="static", proceed=proceed, output_dir=output_dir, sampler_kwargs=sampler_kwargs, run_kwargs=run_kwargs) elif sampler == "dynesty-dynamic": results = _run_dynesty(post, sampler_type="dynamic", proceed=proceed, output_dir=output_dir, sampler_kwargs=sampler_kwargs, run_kwargs=run_kwargs) elif sampler == "multinest": if sampler_kwargs is not None and len(sampler_kwargs) > 0: raise TypeError("Argument sampler_kwargs is invalid for sampler 'multinest', only run_kwargs is supported") results = _run_multinest(post, output_dir=output_dir, proceed=proceed, overwrite=overwrite, run_kwargs=run_kwargs) elif sampler == "nautilus": results = _run_nautilus(post, output_dir=output_dir, proceed=proceed, sampler_kwargs=sampler_kwargs, run_kwargs=run_kwargs) else: raise ValueError(f"Unknown sampler '{sampler}'. Available options are {list(BACKENDS.keys())}") # fmt: on df = DataFrame(results["samples"], columns=post.name_vary_params()) lnprob_arr = np.empty(len(df)) for i, row in df.iterrows(): lnprob_arr[i] = post.logprob_array(row.values) df["lnprobability"] = lnprob_arr results["samples"] = df if output_dir is not None: with h5py.File(results_file, mode="w") as h5f: for key, val in results.items(): if key == "sampler" or key == "samples": continue h5f.create_dataset(key, data=val) results["samples"].to_hdf(results_file, key="samples", mode="a") return results
[docs] def load_results(results_file: str) -> dict: """Load nested sampling results dictionary Args: results_file: Path to hdf5 file containing the results. Returns: Dictionary with nested sampling results. Note that the ``sampler`` key is not saved, so it is not in the dictionary returned by this function. """ results = {} with h5py.File(results_file) as h5f: results["lnZ"] = np.array(h5f["lnZ"]).item() results["lnZerr"] = np.array(h5f["lnZerr"]).item() results["samples"] = read_hdf(results_file, key="samples") return results