from typing import Iterable, Optional, Tuple
import numpy as np
from astromodels import (
Cubic,
Gaussian,
Line,
Log_normal,
Model,
PointSource,
Quadratic,
Quartic,
)
from threeML.bayesian.bayesian_analysis import BayesianAnalysis
from threeML.classicMLE.joint_likelihood import JointLikelihood
from threeML.config.config import threeML_config
from threeML.config.config_utils import get_value
from threeML.data_list import DataList
from threeML.exceptions.custom_exceptions import BadCovariance # , FitFailed
from threeML.io.logging import setup_logger, silence_console_log
from threeML.minimizer.grid_minimizer import AllFitFailed
from threeML.minimizer.minimization import (
CannotComputeCovariance,
FitFailed,
GlobalMinimization,
LocalMinimization,
)
from threeML.plugins.UnbinnedPoissonLike import EventObservation, UnbinnedPoissonLike
from threeML.plugins.XYLike import XYLike
log = setup_logger(__name__)
# we include the line twice to mimic a constant
_grade_model_lookup = (Line, Line, Quadratic, Cubic, Quartic)
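# (for reference: grade 0 -> Line with the slope fixed to zero downstream,
#  grade 1 -> Line, grade 2 -> Quadratic, grade 3 -> Cubic, grade 4 -> Quartic)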
class Polynomial(object):
def __init__(self, coefficients: Iterable[float], is_integral: bool = False):
"""A polynomial.
:param coefficients: array of poly coefficients
:param is_integral: if this polynomial is an
"""
self._coefficients: Iterable[float] = coefficients
self._degree: int = len(coefficients) - 1
log.debug(f"creating polynomial of degree {self._degree}")
log.debug(f"with coefficients {self._coefficients}")
self._i_plus_1: np.ndarray = np.array(
list(range(1, self._degree + 1 + 1)), dtype=float
)
self._cov_matrix: np.ndarray = np.zeros((self._degree + 1, self._degree + 1))
# we can fix some things for speed
# we only need to set the coeff for the
# integral polynomial
if not is_integral:
log.debug("This is NOT and intergral polynomial")
integral_coeff = [0]
integral_coeff.extend(
[
self._coefficients[i - 1] / float(i)
for i in range(1, self._degree + 1 + 1)
]
)
self._integral_polynomial: "Polynomial" = Polynomial(
integral_coeff, is_integral=True
)
@classmethod
def from_previous_fit(cls, coefficients, covariance) -> "Polynomial":
log.debug("restoring polynomial from previous fit")
poly = Polynomial(coefficients=coefficients)
poly._cov_matrix = covariance
return poly
@property
def degree(self) -> int:
"""The polynomial degree :return:"""
return self._degree
@property
def error(self):
"""The error on the polynomial coefficients :return:"""
return np.sqrt(self._cov_matrix.diagonal())
def __get_coefficient(self):
"""Gets the coefficients."""
return np.array(self._coefficients)
def ___get_coefficient(self):
"""Indirect coefficient getter."""
return self.__get_coefficient()
def __set_coefficient(self, val):
"""Sets the coefficients."""
self._coefficients = val
integral_coeff = [0]
integral_coeff.extend(
[
self._coefficients[i - 1] / float(i)
for i in range(1, self._degree + 1 + 1)
]
)
self._integral_polynomial = Polynomial(integral_coeff, is_integral=True)
def ___set_coefficient(self, val):
"""Indirect coefficient setter."""
return self.__set_coefficient(val)
coefficients = property(
___get_coefficient,
___set_coefficient,
doc="""Gets or sets the coefficients of the polynomial.""",
)
def __repr__(self):
return f"({self._coefficients})"
def __call__(self, x):
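        """Evaluate the polynomial at x via Horner's scheme.

        Illustrative doctest (coefficients are in ascending order of power):

        >>> p = Polynomial([1.0, 2.0, 3.0])  # 1 + 2 x + 3 x**2
        >>> p(2.0)
        17.0
        """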
result = 0
for coefficient in self._coefficients[::-1]:
result = result * x + coefficient
return result
    def set_covariace_matrix(self, matrix) -> None:
        """Set the covariance matrix of the fitted coefficients."""
        self._cov_matrix = matrix
@property
def covariance_matrix(self) -> np.ndarray:
return self._cov_matrix
def integral(self, xmin, xmax) -> float:
"""Evaluate the integral of the polynomial between xmin and xmax."""
return self._integral_polynomial(xmax) - self._integral_polynomial(xmin)
def _eval_basis(self, x):
return (1.0 / self._i_plus_1) * np.power(x, self._i_plus_1)
def integral_error(self, xmin, xmax) -> float:
"""Computes the integral error of an interval :param xmin: start of the
interval :param xmax: stop of the interval :return: interval error."""
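        # standard linear error propagation: with c_i the difference of the
        # integral basis terms at xmax and xmin, the variance of the integral
        # is c . cov . c, hence the square root below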
c = self._eval_basis(xmax) - self._eval_basis(xmin)
tmp = c.dot(self._cov_matrix)
err2 = tmp.dot(c)
return np.sqrt(err2)
def polyfit(
x: Iterable[float],
y: Iterable[float],
grade: int,
exposure: Iterable[float],
bayes: Optional[bool] = False,
) -> Tuple[Polynomial, float]:
"""Function to fit a polynomial to data. not a member to allow parallel
computation.
:param x: the x coord of the data
:param y: the y coord of the data
:param grade: the polynomical order or grade
:param expousure: the exposure of the interval
:param bayes: to do a bayesian fit or not
"""
# Check that we have enough counts to perform the fit, otherwise
# return a "zero polynomial"
log.debug(f"starting polyfit with grade {grade} ")
bayes = get_value("bayes", bayes, bool, threeML_config.time_series.fit.bayes)
nan_mask = np.isnan(y)
y = y[~nan_mask]
x = x[~nan_mask]
exposure = exposure[~nan_mask]
non_zero_mask = y > 0
n_non_zero = non_zero_mask.sum()
if n_non_zero == 0:
log.debug("no counts, return 0")
# No data, nothing to do!
return Polynomial([0.0] * (grade + 1)), 0.0
# create 3ML plugins and fit them with 3ML!
    # should eventually allow better config
    # select the model based on the grade
shape = _grade_model_lookup[grade]()
ps = PointSource("_dummy", 0, 0, spectral_shape=shape)
model = Model(ps)
avg = np.mean(y / exposure)
log.debug(f"starting polyfit with avg norm {avg}")
with silence_console_log():
xy = XYLike(
"series", x=x, y=y, exposure=exposure, poisson_data=True, quiet=True
)
if not bayes:
# make sure the model is positive
for i, (k, v) in enumerate(model.free_parameters.items()):
if i == 0:
v.bounds = (0, None)
v.value = avg
else:
v.value = 0.0
            # we actually use a line here,
            # because a constant model returns
            # a single number
if grade == 0:
shape.b = 0
shape.b.fix = True
jl: JointLikelihood = JointLikelihood(model, DataList(xy))
jl.set_minimizer("minuit")
            # if the fit fails, retry and then just accept
try:
# print ("=================>FIRST FIT!!!!")
jl.fit(quiet=True)
except (FitFailed, BadCovariance, AllFitFailed, CannotComputeCovariance):
# print ("=================>FIRST FIT FAILED!!!!")
log.debug("1st fit failed")
try:
# print ("=================>SECOND FIT!!!!")
jl.fit(quiet=True)
except (
FitFailed,
BadCovariance,
AllFitFailed,
CannotComputeCovariance,
):
# print ("=================>SECOND FIT FAILED!!!!")
log.debug("all MLE fits failed")
pass
coeff = [v.value for _, v in model.free_parameters.items()]
log.debug(f"got coeff: {coeff}")
final_polynomial = Polynomial(coeff)
try:
final_polynomial.set_covariace_matrix(jl.results.covariance_matrix)
except Exception:
log.exception("Fit failed in channel")
raise FitFailed()
min_log_likelihood = xy.get_log_like()
else:
# set smart priors
for i, (k, v) in enumerate(model.free_parameters.items()):
if i == 0:
v.bounds = (0, None)
v.prior = Log_normal(
mu=np.log(avg), sigma=np.max([np.log(avg / 2), 1])
)
v.value = 1
else:
v.prior = Gaussian(mu=0, sigma=2)
v.value = 1e-2
            # we actually use a line here,
            # because a constant model returns
            # a single number
if grade == 0:
shape.b = 0
shape.b.fix = True
ba: BayesianAnalysis = BayesianAnalysis(model, DataList(xy))
ba.set_sampler("emcee")
ba.sampler.setup(n_iterations=500, n_burn_in=200, n_walkers=20)
ba.sample(quiet=True)
ba.restore_median_fit()
coeff = [v.value for _, v in model.free_parameters.items()]
log.debug(f"got coeff: {coeff}")
final_polynomial = Polynomial(coeff)
final_polynomial.set_covariace_matrix(
ba.results.estimate_covariance_matrix()
)
min_log_likelihood = xy.get_log_like()
log.debug(f"-min loglike: {-min_log_likelihood}")
return final_polynomial, -min_log_likelihood
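# Illustrative usage sketch (hypothetical data, not part of this module):
#
#   t = np.linspace(0.0, 10.0, 50)                # bin centers
#   counts = np.random.default_rng(0).poisson(5.0, size=t.shape)
#   widths = np.full_like(t, 0.2)                 # per-bin exposure
#   poly, neg_log_like = polyfit(t, counts, grade=1, exposure=widths)
#   background = poly.integral(0.0, 10.0)         # integrated background counts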
def unbinned_polyfit(
events: Iterable[float],
grade: int,
t_start: Iterable[float],
t_stop: Iterable[float],
exposure: float,
bayes: bool,
) -> Tuple[Polynomial, float]:
"""Function to fit a polynomial to unbinned event data. not a member to
allow parallel computation.
:param events: the events to fit
:param grade: the polynomical order or grade
:param t_start: the start time to fit over
:param t_stop: the end time to fit over
:param expousure: the exposure of the interval
:param bayes: to do a bayesian fit or not
"""
log.debug(f"starting unbinned_polyfit with grade {grade}")
log.debug(f"have {len(events)} events with {exposure} exposure")
# create 3ML plugins and fit them with 3ML!
# select the model based on the grade
bayes = get_value("bayes", bayes, bool, threeML_config.time_series.fit.bayes)
if len(events) == 0:
log.debug("no events! returning zero")
return Polynomial([0] * (grade + 1)), 0
shape = _grade_model_lookup[grade]()
with silence_console_log():
ps = PointSource("dummy", 0, 0, spectral_shape=shape)
model = Model(ps)
observation = EventObservation(
events, exposure, t_start, t_stop, for_timeseries=True
)
xy = UnbinnedPoissonLike("series", observation=observation)
if not bayes:
# make sure the model is positive
for i, (k, v) in enumerate(model.free_parameters.items()):
if i == 0:
v.bounds = (0, None)
v.value = 10
else:
v.value = 0.0
            # we actually use a line here,
            # because a constant model returns
            # a single number
if grade == 0:
shape.b = 0
shape.b.fix = True
jl: JointLikelihood = JointLikelihood(model, DataList(xy))
grid_minimizer = GlobalMinimization("grid")
local_minimizer = LocalMinimization("minuit")
my_grid = {model.dummy.spectrum.main.shape.a: np.logspace(0, 3, 10)}
grid_minimizer.setup(second_minimization=local_minimizer, grid=my_grid)
jl.set_minimizer(grid_minimizer)
            # if the fit fails, retry and then just accept
try:
jl.fit(quiet=True)
except (FitFailed, BadCovariance, AllFitFailed, CannotComputeCovariance):
try:
jl.fit(quiet=True)
except (
FitFailed,
BadCovariance,
AllFitFailed,
CannotComputeCovariance,
):
log.debug("all MLE fits failed, returning zero")
return Polynomial([0] * (grade + 1)), 0
coeff = [v.value for _, v in model.free_parameters.items()]
log.debug(f"got coeff: {coeff}")
final_polynomial = Polynomial(coeff)
final_polynomial.set_covariace_matrix(jl.results.covariance_matrix)
min_log_likelihood = xy.get_log_like()
else:
# set smart priors
for i, (k, v) in enumerate(model.free_parameters.items()):
if i == 0:
v.bounds = (0, None)
v.prior = Log_normal(mu=np.log(5), sigma=np.log(5))
v.value = 1
else:
v.prior = Gaussian(mu=0, sigma=0.5)
v.value = 0.1
            # we actually use a line here,
            # because a constant model returns
            # a single number
if grade == 0:
shape.b = 0
shape.b.fix = True
ba: BayesianAnalysis = BayesianAnalysis(model, DataList(xy))
ba.set_sampler("emcee")
ba.sampler.setup(n_iterations=500, n_burn_in=200, n_walkers=20)
ba.sample(quiet=True)
ba.restore_median_fit()
coeff = [v.value for _, v in model.free_parameters.items()]
log.debug(f"got coeff: {coeff}")
final_polynomial = Polynomial(coeff)
final_polynomial.set_covariace_matrix(
ba.results.estimate_covariance_matrix()
)
min_log_likelihood = xy.get_log_like()
log.debug(f"-min loglike: {-min_log_likelihood}")
return final_polynomial, -min_log_likelihood
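# Illustrative usage sketch (hypothetical event data, not part of this module):
#
#   rng = np.random.default_rng(0)
#   events = np.sort(rng.uniform(0.0, 100.0, size=500))
#   poly, neg_log_like = unbinned_polyfit(
#       events, grade=1, t_start=[0.0], t_stop=[100.0], exposure=100.0, bayes=False
#   )
#   rate_at_50 = poly(50.0)  # fitted event rate at t = 50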