Source code for radvel.utils

from __future__ import annotations

import os
import sys
import types
from decimal import Decimal
from types import ModuleType
from contextlib import contextmanager
from typing import Generator
import warnings

from pandas import DataFrame
import numpy as np
from numpy.typing import NDArray
import pandas as pd
from datetime import datetime, timedelta
from astropy import constants as c
from astropy import units as u
from scipy.optimize import root

import radvel
import radvel.posterior
from radvel.model import Parameters, Vector, GeneralRVModel

# Normalization.
# RV m/s of a 1.0 Jupiter mass planet tugging on a 1.0
# solar mass star on a 1.0 year orbital period
K_0 = 28.4329


[docs] def load_module_from_file(module_name: str, module_path: str) -> ModuleType: """Loads a python module from the path of the corresponding file. Args: module_name (str): namespace where the python module will be loaded, e.g. ``foo.bar`` module_path (str): path of the python file containing the module Returns: A valid module object Raises: ImportError: when the module can't be loaded FileNotFoundError: when module_path doesn't exist """ if sys.version_info[0] == 3 and sys.version_info[1] >= 5: import importlib.util spec = importlib.util.spec_from_file_location(module_name, module_path) module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) elif sys.version_info[0] == 3 and sys.version_info[1] < 5: import importlib.machinery loader = importlib.machinery.SourceFileLoader(module_name, module_path) module = loader.load_module() elif sys.version_info[0] == 2: import imp module = imp.load_source(module_name, module_path) return module
[docs] def initialize_posterior(config_file: str, decorr: bool = False) -> tuple[ModuleType, radvel.posterior.Posterior]: """Initialize Posterior object from a Python setup file. Parse a setup file and initialize the RVModel, Likelihood, Posterior and priors. Args: config_file (string): path to config file decorr (bool): (optional) decorrelate RVs against columns defined in the decorr_vars list Returns: tuple: (object representation of config file, radvel.Posterior object) """ system_name = os.path.basename(config_file).split('.')[0] P = load_module_from_file(system_name, os.path.abspath(config_file)) return _finalise_posterior(P, decorr=decorr)
[docs] def initialize_posterior_from_dict(config, decorr=False): """Initialize Posterior object from an in-memory dict. Mirrors the namespace of a Python setup file but accepts JSON-friendly types so callers (e.g. an HTTP API) can build a posterior without writing a file to disk first. Each top-level field may either contain a ready-made radvel object (e.g. a ``radvel.Parameters`` instance for ``params``) or a JSON-style representation that this function translates via the helpers below. Args: config (dict): mapping with keys ``starname``, ``nplanets``, ``instnames``, ``fitting_basis``, ``params``, ``data``, ``priors``, optionally ``bjd0``, ``time_base``, ``planet_letters``, ``stellar``, ``planet``, ``decorr_vars``, ``hnames``, ``kernel_name``, ``any_basis``, ``anybasis_params``. decorr (bool): same meaning as :func:`initialize_posterior`. Returns: tuple: (SimpleNamespace mirroring the legacy ``P`` module, radvel.Posterior) """ P = _namespace_from_dict(config) return _finalise_posterior(P, decorr=decorr)
def _namespace_from_dict(config): """Build a SimpleNamespace mirroring a setup-file's globals from a dict. Shared between :func:`initialize_posterior_from_dict` and the API setup shim that the HTTP service writes into a run directory so the legacy :func:`initialize_posterior` loader sees the same namespace. """ P = types.SimpleNamespace() P.starname = config.get('starname', 'unnamed') P.nplanets = int(config['nplanets']) P.instnames = list(config['instnames']) P.fitting_basis = config['fitting_basis'] P.bjd0 = float(config.get('bjd0', 0.0)) planet_letters = config.get('planet_letters') if planet_letters is not None: P.planet_letters = {int(k): v for k, v in planet_letters.items()} P.data = _data_to_dataframe(config['data']).reset_index(drop=True) # The CLI setup-file convention requires a 'tel' column. Inline-rows # payloads include it explicitly, but built-in CSV fixtures don't — # default to the single instrument when there's only one defined. if 'tel' not in P.data.columns and len(P.instnames) == 1: P.data['tel'] = P.instnames[0] if config.get('time_base') is not None: P.time_base = float(config['time_base']) else: P.time_base = float(np.mean([P.data.time.min(), P.data.time.max()])) any_basis = config.get('any_basis') or P.fitting_basis params_in = config['params'] if isinstance(params_in, radvel.Parameters): params_obj = params_in else: params_obj = _build_parameters(params_in, P.nplanets, any_basis) if str(params_obj.basis) != "Basis Object <{}>".format(P.fitting_basis): params_obj = params_obj.basis.to_any_basis(params_obj, P.fitting_basis) P.params = params_obj P.priors = [_build_prior(p) if isinstance(p, dict) else p for p in config.get('priors', [])] if config.get('stellar') is not None: P.stellar = dict(config['stellar']) if config.get('planet') is not None: P.planet = dict(config['planet']) if config.get('decorr_vars'): P.decorr_vars = list(config['decorr_vars']) if config.get('hnames'): P.hnames = {k: list(v) for k, v in config['hnames'].items()} if config.get('kernel_name'): P.kernel_name = dict(config['kernel_name']) return P def _normalise_rv_columns(df): """Map common alternative column names to the canonical CLI form. Many built-in example CSVs (e.g. ``epic203771098.csv``) ship with ``t/vel/errvel`` columns, while the rest of the pipeline expects ``time/mnvel/errvel/tel``. Rename in-place when the canonical column is absent. If no ``tel`` column exists at all, leave it missing — the caller decides whether that's an error. """ rename = {} if 'time' not in df.columns and 't' in df.columns: rename['t'] = 'time' if 'mnvel' not in df.columns and 'vel' in df.columns: rename['vel'] = 'mnvel' if rename: df = df.rename(columns=rename) return df def _data_to_dataframe(spec): """Translate a setup-file ``data`` field into a pandas DataFrame. Accepts either a pre-built DataFrame, a list of row dicts, or one of the JSON-style discriminated unions defined in :mod:`radvel.api.schemas` (``inline``, ``csv_base64``, ``server_path``, ``dataset_ref``). """ if isinstance(spec, pd.DataFrame): return spec.copy() if isinstance(spec, dict): kind = spec.get('kind') if kind == 'inline': return pd.DataFrame(list(spec.get('rows', []))) if kind == 'csv_base64': import base64 import io csv_bytes = base64.b64decode(spec['csv_base64']) sep = spec.get('separator', ',') return pd.read_csv(io.BytesIO(csv_bytes), sep=sep) if kind == 'server_path': return _normalise_rv_columns(pd.read_csv(spec['path'])) if kind == 'dataset_ref': import radvel as _radvel path = os.path.join(_radvel.DATADIR, spec['dataset']) return _normalise_rv_columns(pd.read_csv(path)) # Treat any other dict as a column-oriented mapping. return pd.DataFrame(spec) return pd.DataFrame(list(spec)) def _build_parameters(params_dict, nplanets, basis): """Translate a JSON-style mapping to a :class:`radvel.Parameters` instance.""" params_obj = radvel.Parameters(nplanets, basis=basis) for name, spec in params_dict.items(): if isinstance(spec, radvel.model.Parameter): params_obj[name] = spec continue if isinstance(spec, (int, float, np.floating, np.integer)): spec = {'value': float(spec)} kwargs = { 'value': float(spec['value']), 'vary': bool(spec.get('vary', True)), } if spec.get('mcmcscale') is not None: kwargs['mcmcscale'] = float(spec['mcmcscale']) if spec.get('linear', False): kwargs['linear'] = True params_obj[name] = radvel.model.Parameter(**kwargs) return params_obj def _build_prior(spec): """Dispatch a prior dict (with ``type`` key) to a :mod:`radvel.prior` instance.""" t = spec['type'] if t == 'gaussian': return radvel.prior.Gaussian(spec['param'], float(spec['mu']), float(spec['sigma'])) if t == 'jeffreys': return radvel.prior.Jeffreys(spec['param'], float(spec['minval']), float(spec['maxval'])) if t == 'modifiedjeffreys': return radvel.prior.ModifiedJeffreys(spec['param'], float(spec['minval']), float(spec['maxval']), float(spec['kneeval'])) if t == 'hardbounds': return radvel.prior.HardBounds(spec['param'], float(spec['minval']), float(spec['maxval'])) if t == 'eccentricity': return radvel.prior.EccentricityPrior(spec['num_planets'], upperlims=spec.get('upperlims', 0.99)) if t == 'positivek': return radvel.prior.PositiveKPrior(int(spec['num_planets'])) if t == 'secondaryeclipse': return radvel.prior.SecondaryEclipsePrior(int(spec['planet_num']), float(spec['ts']), float(spec['ts_err'])) if t == 'informative_baseline': return radvel.prior.InformativeBaselinePrior(spec['param'], float(spec['baseline']), duration=float(spec.get('duration', 0.0))) raise ValueError("Unknown prior type: {!r}".format(t)) def _finalise_posterior(P, decorr=False): """Build a :class:`radvel.posterior.Posterior` from a prepared namespace. ``P`` must expose ``params`` (a :class:`radvel.Parameters` already in the fitting basis), ``fitting_basis``, ``data`` (DataFrame with ``time``, ``mnvel``, ``errvel``, ``tel``), ``instnames``, ``time_base`` and ``priors``. Optional attributes ``hnames``, ``kernel_name`` and ``decorr_vars`` follow the same shape as the legacy setup-file namespace. This is the shared backend for both :func:`initialize_posterior` and :func:`initialize_posterior_from_dict`. """ params = P.params assert str(params.basis) == "Basis Object <{}>".format(P.fitting_basis), """ Parameters in config file must be converted to fitting basis. """ if decorr: try: decorr_vars = P.decorr_vars except AttributeError: raise Exception("--decorr option selected,\ but decorr_vars is not found in your setup file.") else: decorr_vars = [] for key in list(params.keys()): if key.startswith('logjit'): msg = """ Fitting log(jitter) is depreciated. Please convert your config files to initialize 'jit' instead of 'logjit' parameters. Converting 'logjit' to 'jit' for you now. """ warnings.warn(msg, DeprecationWarning, stacklevel=2) newkey = key.replace('logjit', 'jit') params[newkey] = radvel.model.Parameter(value=np.exp(params[key].value), vary=params[key].vary) del params[key] iparams = radvel.basis._copy_params(params) # Make sure we don't have duplicate indicies in the DataFrame P.data = P.data.reset_index(drop=True) # initialize RVmodel object mod = radvel.RVModel(params, time_base=P.time_base) # initialize Likelihood objects for each instrument telgrps = P.data.groupby('tel').groups likes = {} for inst in P.instnames: assert inst in P.data.groupby('tel').groups.keys(), \ "No data found for instrument '{}'.\nInstruments found in this dataset: {}".format(inst, list(telgrps.keys())) decorr_vectors = {} if decorr: for d in decorr_vars: decorr_vectors[d] = P.data.iloc[telgrps[inst]][d].values try: hnames = P.hnames[inst] liketype = radvel.likelihood.GPLikelihood try: kernel_name = P.kernel_name[inst] # if kernel_name == "Celerite": # liketype = radvel.likelihood.CeleriteLikelihood if kernel_name == "Celerite": liketype = radvel.likelihood.CeleriteLikelihood except AttributeError: kernel_name = "QuasiPer" except AttributeError: liketype = radvel.likelihood.RVLikelihood kernel_name = None hnames = None likes[inst] = liketype( mod, P.data.iloc[telgrps[inst]].time, P.data.iloc[telgrps[inst]].mnvel, P.data.iloc[telgrps[inst]].errvel, hnames=hnames, suffix='_'+inst, kernel_name=kernel_name, decorr_vars=decorr_vars, decorr_vectors=decorr_vectors ) likes[inst].params['gamma_'+inst] = iparams['gamma_'+inst] likes[inst].params['jit_'+inst] = iparams['jit_'+inst] like = radvel.likelihood.CompositeLikelihood(list(likes.values())) # Initialize Posterior object post = radvel.posterior.Posterior(like) post.priors = P.priors return P, post
[docs] def round_sig(x: float, sig: int = 2) -> float: """Round by significant figures Args: x (float): number to be rounded sig (int): (optional) number of significant figures to retain Returns: float: x rounded to sig significant figures """ if x == 0: return 0.0 return round(x, sig-int(np.floor(np.log10(abs(x))))-1)
[docs] def sigfig(med: float, errlow: float, errhigh: float | None = None) -> tuple[float, float, float]: """ Format values with errors into an equal number of signficant figures. Args: med (float): median value errlow (float): lower errorbar errhigh (float): upper errorbar Returns: tuple: (med,errlow,errhigh) rounded to the lowest number of significant figures """ if errhigh is None: errhigh = errlow ndec = Decimal(str(errlow)).as_tuple().exponent if abs(Decimal(str(errhigh)).as_tuple().exponent) > abs(ndec): ndec = Decimal(str(errhigh)).as_tuple().exponent if ndec < -1: tmpmed = round(med, abs(ndec)) p = 0 if med != 0: while tmpmed == 0: tmpmed = round(med, abs(ndec)+p) p += 1 med = tmpmed elif (ndec == -1 and str(errhigh)[-1] == '0') and (ndec == -1 and str(errlow)[-1] == '0') or ndec == 0: errlow = int(round_sig(errlow)) errhigh = int(round(errhigh)) med = int(round(med)) else: med = round(med, abs(ndec)) return med, errlow, errhigh
[docs] def time_print(tdiff: float) -> tuple[float, str]: """Print time Helper function to print time remaining in sensible units. Args: tdiff (float): time in seconds Returns: tuple: (float time, string units) """ units = 'seconds' if tdiff > 60: tdiff /= 60 units = 'minutes' if tdiff > 60: tdiff /= 60 units = 'hours' if tdiff > 24: tdiff /= 24 units = 'days' return tdiff, units
[docs] def timebin( time: NDArray, meas: NDArray, meas_err: NDArray, binsize: float ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: """Bin in equal sized time bins This routine bins a set of times, measurements, and measurement errors into time bins. All inputs and outputs should be floats or double. binsize should have the same units as the time array. (from Andrew Howard, ported to Python by BJ Fulton) Args: time (array): array of times meas (array): array of measurements to be comined meas_err (array): array of measurement uncertainties binsize (float): width of bins in same units as time array Returns: tuple: (bin centers, binned measurements, binned uncertainties) """ ind_order = np.argsort(time) time = time[ind_order] meas = meas[ind_order] meas_err = meas_err[ind_order] ct = 0 while ct < len(time): ind = np.where((time >= time[ct]) & (time < time[ct]+binsize))[0] num = len(ind) wt = (1./meas_err[ind])**2. # weights based in errors wt = wt/np.sum(wt) # normalized weights if ct == 0: time_out = [np.sum(wt*time[ind])] meas_out = [np.sum(wt*meas[ind])] meas_err_out = [1./np.sqrt(np.sum(1./(meas_err[ind])**2))] else: time_out.append(np.sum(wt*time[ind])) meas_out.append(np.sum(wt*meas[ind])) meas_err_out.append(1./np.sqrt(np.sum(1./(meas_err[ind])**2))) ct += num return time_out, meas_out, meas_err_out
[docs] def bintels( t: NDArray[float], vel: np.ndarray[float], err: np.ndarray[float], telvec: np.ndarray[str], binsize: float = 1/2. ) -> tuple[NDArray[float],NDArray[float],NDArray[float],NDArray[str]]: """Bin velocities by instrument Bin RV data with bins of with binsize in the units of t. Will not bin data from different telescopes together since there may be offsets between them. Args: t (array): array of timestamps vel (array): array of velocities err (array): array of velocity uncertainties telvec (array): array of strings corresponding to the instrument name for each velocity binsize (float): (optional) width of bin in units of t (default=1/2.) Returns: tuple: (bin centers, binned measurements, binned uncertainties, binned instrument codes) """ # Bin RV data with bins of with binsize in the units of t. # Will not bin data from different telescopes together since there may # be offsets between them. ntels = len(np.unique(telvec)) if ntels == 1: t_bin, vel_bin, err_bin = timebin(t, vel, err, binsize=binsize) return t_bin, vel_bin, err_bin, telvec[0:len(t_bin)] uniqorder = np.argsort(np.unique(telvec, return_index=1)[1]) uniqsort = np.unique(telvec)[uniqorder] rvtimes = np.array([]) rvdat = np.array([]) rverr = np.array([]) newtelvec = np.array([]) for i, tel in enumerate(uniqsort): pos = np.where(telvec == tel) t_bin, vel_bin, err_bin = timebin( t[pos], vel[pos], err[pos], binsize=binsize ) rvtimes = np.hstack((rvtimes, t_bin)) rvdat = np.hstack((rvdat, vel_bin)) rverr = np.hstack((rverr, err_bin)) newtelvec = np.hstack((newtelvec, np.array([tel]*len(t_bin)))) return rvtimes, rvdat, rverr, newtelvec
[docs] def fastbin( x: NDArray[float], y: NDArray[float], nbins: int = 30 ) -> tuple[NDArray[float], NDArray[float], NDArray[float]]: """Fast binning Fast binning function for equally spaced data Args: x (array): independent variable y (array): dependent variable nbins (int): number of bins Returns: tuple: (bin centers, binned measurements, binned uncertainties) """ n, _ = np.histogram(x, bins=nbins) sy, _ = np.histogram(x, bins=nbins, weights=y) sy2, _ = np.histogram(x, bins=nbins, weights=y*y) with warnings.catch_warnings(): warnings.filterwarnings("ignore", message="invalid value encountered", category=RuntimeWarning) bindat = sy / n binerr = np.sqrt(sy2/n - bindat*bindat) / np.sqrt(n) bint = (_[1:] + _[:-1])/2. binN = n pos = binN >= 3 # 0.5 * np.mean(binN) bint = bint[pos] bindat = bindat[pos] binerr = binerr[pos] pos = bint > 0 bint = bint[pos] bindat = bindat[pos] binerr = binerr[pos] return bint, bindat, binerr
[docs] def t_to_phase(params: Parameters, t: NDArray[float], num_planet: int, cat: bool = False) -> NDArray[float]: """Time to phase Convert JD to orbital phase Args: params (radvel.params.RVParameters): RV parameters object t (array): JD timestamps num_planet (int): Which planet's ephemeris to phase fold on cat (bool): Concatenate/double the output phase array to extend from 0 to 2 Returns: array: orbital phase at each timestamp """ if ('tc%i' % num_planet) in params: timeparam = 'tc%i' % num_planet elif ('tp%i' % num_planet) in params: timeparam = 'tp%i' % num_planet P = params['per%i' % num_planet].value tc = params[timeparam].value phase = np.mod(t - tc, P) phase /= P if cat: phase = np.concatenate((phase, phase+1)) return phase
def t_to_phase_vector(vector: Vector, t: NDArray[float], num_planet: int, cat: bool = False) -> NDArray[float]: synth_params = vector.params.basis.v_to_synth(vector) P = synth_params[-5+(5*num_planet)][0] tc = synth_params[-4+(5*num_planet)][0] phase = np.mod(t - tc, P) phase /= P if cat: phase = np.concatenate((phase, phase + 1)) return phase
[docs] @contextmanager def working_directory(dir: str) -> Generator[None, None, None]: """Do something in a directory Function to use with `with` statements. Args: dir (string): name of directory to work in Example: >>> with workdir('/temp'): # do something within the /temp directory """ cwd = os.getcwd() os.chdir(dir) try: yield finally: os.chdir(cwd)
def cmd_exists(cmd: str) -> bool: return any( os.access(os.path.join(path, cmd), os.X_OK) for path in os.environ["PATH"].split(os.pathsep))
[docs] def date2jd(date: datetime) -> float: """ Convert datetime object to JD" Args: date (datetime.datetime): date to convert Returns: float: Julian date """ jd_td = date - datetime(2000, 1, 1, 12, 0, 0) jd = 2451545.0 + jd_td.days + jd_td.seconds/86400.0 return jd
[docs] def jd2date(jd: float) -> datetime: """ Convert JD to datetime.datetime object Args: jd (float): Julian date Returns: datetime.datetime: calendar date """ mjd = jd - 2400000.5 td = timedelta(days=mjd) dt = datetime(1858, 11, 17, 0, 0, 0) + td return dt
[docs] def geterr(vec: NDArray[float], angular: bool = False) -> tuple[float, float, float]: """ Calculate median, 15.9, and 84.1 percentile values for a given vector. Args: vec (array): vector, usually an MCMC chain for one parameter angular (bool [optioanl]): Is this an angular parameter? if True vec should be in radians. This will perform some checks to ensure proper boundary wrapping. Returns: tuple: 50, 15.9 and 84.1 percentiles """ try: vec = vec.values except AttributeError: pass if angular: val, edges = np.histogram(vec, bins=50) med = edges[np.argmax(val)] if med > np.radians(90): vec[vec < np.radians(0)] = vec[vec < np.radians(0)] + np.radians(360) if med <= np.radians(-90): vec[vec >= np.radians(0)] = vec[vec >= np.radians(0)] - np.radians(360) med = np.median(vec) else: med = np.median(vec) s = sorted(vec) errlow = med - s[int(0.159*len(s))] errhigh = s[int(0.841*len(s))] - med return med, errlow, errhigh
[docs] def semi_amplitude(Msini: float, P: float, Mtotal: float, e: float, Msini_units: str = 'jupiter') -> float: """Compute Doppler semi-amplitude Args: Msini (float): mass of planet [Mjup] P (float): Orbital period [days] Mtotal (float): Mass of star + mass of planet [Msun] e (float): eccentricity Msini_units (Optional[str]): Units of Msini {'earth','jupiter'} default: 'jupiter' Returns: Doppler semi-amplitude [m/s] """ # convert inputs to array so they work with units P = np.array(P) Msini = np.array(Msini) Mtotal = np.array(Mtotal) e = np.array(e) P = (P * u.d).to(u.year).value if Msini_units.lower() == 'jupiter': pass elif Msini_units.lower() == 'earth': Msini = (Msini * u.M_earth).to(u.M_jup).value else: raise Exception("Msini_units must be 'earth', or 'jupiter'") K = K_0*(1 - e**2)**-0.5*Msini*P**(-1.0/3.0)*Mtotal**(-2.0 / 3.0) return K
[docs] def semi_major_axis(P: float, Mtotal: float) -> float: """Semi-major axis Kepler's third law Args: P (float): Orbital period [days] Mtotal (float): Mass [Msun] Returns: float or array: semi-major axis in AU """ # convert inputs to array so they work with units P = np.array(P) Mtotal = np.array(Mtotal) Mtotal = Mtotal*c.M_sun.value P = (P * u.d).to(u.second).value G = c.G.value a = ((P**2)*G*Mtotal/(4*(np.pi)**2))**(1/3.) a = a/c.au.value return a
[docs] def Msini( K: float | NDArray[float], P: float | NDArray[float], Mstar: float | NDArray[float], e: float | NDArray[float], Msini_units: str = 'earth' ) -> float | NDArray[float]: """Calculate Msini Calculate Msini for a given K, P, stellar mass, and e Args: K (float or array: Doppler semi-amplitude [m/s] P (float or array): Orbital period [days] Mstar (float or array): Mass of star [Msun] e (float or array): eccentricity Msini_units (Optional[str]): Units of Msini {'earth','jupiter'} default: 'earth' Returns: float or array: Msini [units = Msini_units] """ # convert inputs to array so they work with units P = np.array(P) Mstar = np.array(Mstar) K = np.array(K) e = np.array(e) G = c.G.value # added gravitational constant Mjup = c.M_jup.value # added Jupiter's mass Msun = c.M_sun.value # added sun's mass Mstar = Mstar*Msun Mstar = np.array(Mstar) P_year = (P * u.d).to(u.year).value P = (P * u.d).to(u.second).value # First assume that Mp << Mstar Msini = K / K_0 * np.sqrt(1.0 - e ** 2.0) * (Mstar/Msun) ** (2.0 / 3.0) * P_year ** (1 / 3.0) # Use correct calculation if any elements are >10% of the stellar mass if (np.array(((Msini * u.Mjup).to(u.M_sun) / (Mstar/Msun)).value > 0.10)).any(): with warnings.catch_warnings(): warnings.simplefilter("always") warnings.warn("Mpsini << Mstar assumption broken, correcting Msini calculation.", stacklevel=2) a = K*(((2*(np.pi)*G)/P)**(-1/3.))*np.sqrt(1-(e**2)) Msini = [] if isinstance(P, float): n_elements = 1 else: assert type(K) == type(P) == type(Mstar) == type(e), "All input data types must match." assert K.size == P.size == Mstar.size == e.size, "All input arrays must have the same length." n_elements = len(P) for i in range(n_elements): def func(x: float) -> float: try: return x - a[i]*((Mstar[i]+x)**(2/3.)) except IndexError: return x - a * ((Mstar + x) ** (2 / 3.)) sol = root(func, Mjup) Msini.append(sol.x[0]) Msini = np.array(Msini) Msini = Msini/Mjup if Msini_units.lower() == 'jupiter': pass elif Msini_units.lower() == 'earth': Msini = (Msini * u.M_jup).to(u.M_earth).value else: raise Exception("Msini_units must be 'earth', or 'jupiter'") return Msini
[docs] def density(mass: float, radius: float, MR_units: str = 'earth') -> float: """Compute density from mass and radius Args: mass (float): mass [MR_units] radius (float): radius [MR_units] MR_units (string): (optional) units of mass and radius. Must be 'earth', or 'jupiter' (default 'earth'). Returns: float: density in g/cc """ mass = np.array(mass) radius = np.array(radius) if MR_units.lower() == 'earth': uradius = u.R_earth umass = u.M_earth elif MR_units.lower() == 'jupiter': uradius = u.R_jup umass = u.M_jup else: raise Exception("MR_units must be 'earth', or 'jupiter'") vol = 4. / 3. * np.pi * (radius * uradius) ** 3 rho = ((mass * umass / vol).to(u.g / u.cm ** 3)).value return rho
[docs] def draw_models_from_chain( mod: GeneralRVModel, chain: DataFrame, t: NDArray[float], nsamples: int = 50 ) -> NDArray[float]: """Draw Models from Chain Given an MCMC chain of parameters, draw representative parameters and synthesize models. Args: mod (radvel.RVmodel) : RV model chain (DataFrame): pandas DataFrame with different values from MCMC chain t (array): time range over which to synthesize models nsamples (int): number of draws Returns: array: 2D array with the different models as different rows """ np.random.seed(0) chain_samples = chain.iloc[np.random.choice(chain.index, nsamples)] models = [] for i in chain_samples.index: params = np.array(chain.loc[i, mod.list_vary_params()]) params = mod.array_to_params(params) models += [mod(t)] models = np.vstack(models) return models