import warnings
import numpy as np
__all__ = ['occ_detect']
[docs]
def occ_detect(flux, dflux, time, cycle, maximum_duration=None, dur_step=None, snr_limit=None,
n_detections=None, tmin=None, tmax=None, plot=False):
"""Detects automatically the occultation event in the light curve.
Detects a 'square well' shaped transit. All parameters are optional.
Parameters
----------
flux : `float` array
Flux of the time series. Dependent variable.
dflux: `float` array
Error in the flux. Error in the dependent variable.
time: `float` array
Time variable. Independent variable.
cycle: `float`
Sampling value of the time series.
maximum_duration : `float`, default: light curve time span
Maximum duration of the occultation event.
dur_step : `float`, default: 1/2 cycle
Step size to sweep occultation duration event.
snr_limit : `float`, default=None
Minimum occultation SNR.
n_detections : `int`, default=1
Number of detections regardless the SNR. `n_detections` is superseded by
`snr_limit`.
tmin : `float`, default=None
Lower limit in time to the lightcurve.
tmax : `float`, default=None
Upper limit in time to the lightcurve.
plot : `boolean`, default=False
True if output plots are desired.
Returns
-------
OrderedDict : `dict`
An ordered dictionary of :attr:`name`::attr:`value` pairs for each
parameter. 'occ_mask' parameter reflects `tmin` and `tmax` intervals
when applied.
Examples
--------
>>> from sora.lightcurve.occdetect import occ_detect
>>> params = occ_detect(flux, dflux, time, 0.2)
>>> params
{'rank': 1,
'occultation_duration': 40.1384063065052,
'central_time': 7916.773870512843,
'immersion_time': 7896.7046673595905,
'emersion_time': 7936.843073666096,
'time_err': 0.05011036992073059,
'depth': 0.8663887801707082,
'depth_err': 0.10986223384336465,
'baseline': 0.9110181732552853,
'baseline_err': 0.19045768512595365,
'snr': 7.886138392251848,
'occ_mask': array([False, False, False, ..., False, False, False])}
"""
original_array_lenght = len(time)
# Check if tmin == tmax
if (tmin is not None) and (tmax is not None) and (tmin == tmax):
raise ValueError("tmin should be less than tmax")
# Ensure that tmin < tmax
if (tmin is not None) and (tmax is not None) and (tmin > tmax):
tmin, tmax = tmax, tmin
# Assuming tmin and tmax are already defined variables
tidx0 = np.argmin(time <= tmin) if (tmin is not None) else 0
tidx1 = np.argmax(time >= tmax)-1 if (tmax is not None) else -1
# Adjust variables for tmin and tmax
time, flux = time[tidx0:tidx1].copy(), flux[tidx0:tidx1].copy()
dflux = dflux[tidx0:tidx1].copy() if dflux is not None else None
# duration of the light curve
time_span = time[-1] - time[0]
if maximum_duration and (maximum_duration > time_span):
warnings.warn('Occultation duration (maximum_duration={0}) '
'exceeds the time series length ({1:0.5f}).'
' maximum_duration reset to the time series length.'
.format(maximum_duration, time_span))
maximum_duration = time_span
if not maximum_duration:
maximum_duration = time_span
if not dur_step:
dur_step = cycle / 2
if dur_step < cycle / 2:
warnings.warn('The given dur_step is oversampled by a factor '
'of {0:0.1f} and has been reset to half a cycle '
'({1:0.4f}).'.format((cycle / 2.) / dur_step, cycle / 2.))
dur_step = cycle / 2
duration_grid = np.arange(dur_step, maximum_duration, dur_step)
# initial occultation mask (all data points)
mask = np.ones(len(time), dtype=bool)
# initial detection rank
rank = 1
if snr_limit:
# minimum SNR accepted in a detection for multiple search
snr_value = snr_limit + 1
occ0 = run_bls(flux, dflux, time, time_span, duration_grid)
mask *= ~occ0['occ_mask']
while snr_value > snr_limit:
rank += 1
occ1 = run_bls(flux, dflux, time, time_span, duration_grid, mask=mask,
rank=rank)
if occ1['snr'] > snr_limit:
snr_value = occ1['snr']
mask *= ~occ1['occ_mask']
occ0 = summarize_bls(occ0, occ1)
else:
snr_value = snr_limit
if plot:
plot_occ_detect(occ0, flux, time)
return occ0
elif n_detections:
# search the n best fits
occ0 = run_bls(flux, dflux, time, time_span, duration_grid)
mask *= ~occ0['occ_mask']
for i in range(n_detections - 1):
rank += 1
occ1 = run_bls(flux, dflux, time, time_span, duration_grid, mask=mask,
rank=rank)
mask *= ~occ1['occ_mask']
occ0 = summarize_bls(occ0, occ1)
if plot:
plot_occ_detect(occ0, flux, time)
return occ0
else:
# search only the first best fit
occ0 = run_bls(flux, dflux, time, time_span, duration_grid)
if plot:
plot_occ_detect(occ0, flux, time)
return occ0
def plot_occ_detect(occ, flux, time):
import matplotlib.pyplot as plt
n = np.size(occ['rank'])
if n > 1:
# case for multiple detections
plt.plot(time, flux, 'k.-')
mask = np.zeros(len(time), dtype=bool)
for i in range(n):
trues = np.sum(occ['occ_mask'][i])
plt.plot(time[occ['occ_mask'][i]], np.repeat(np.mean(flux[occ['occ_mask'][i]]), trues),
'.', label='Rank: ' + str(i + 1))
mask += occ['occ_mask'][i]
falses = list(mask).count(False)
plt.plot(time[~mask], np.repeat(np.mean(flux[~mask]), falses), 'r.', label='Baseline')
plt.xlabel('Time [seconds]')
plt.ylabel('Relative Flux')
plt.legend()
else:
# case for single occultation
trues = list(occ['occ_mask']).count(True)
falses = list(occ['occ_mask']).count(False)
plt.plot(time, flux, 'k.-')
plt.plot(time[occ['occ_mask']], np.repeat(np.mean(flux[occ['occ_mask']]), trues),
'.', label='Occultation')
plt.plot(time[~occ['occ_mask']], np.repeat(np.mean(flux[~occ['occ_mask']]), falses),
'r.', label='Baseline')
plt.xlabel('Time [seconds]')
plt.ylabel('Relative Flux')
plt.legend()
def run_bls(flux, dflux, time, per_grid, dur_grid, mask=None, rank=None):
""" Private function to find the best box fit suitable to the data
"""
from astropy.timeseries import BoxLeastSquares
# object with no occultation mask
mskmodel = BoxLeastSquares(time, flux, dy=dflux)
# if there is no dflux array, reset it to None in case of
# using a mask
if dflux is None:
dfluxmask = None
else:
dfluxmask = dflux[mask]
# object with occultation mask
if np.sum(mask):
model = BoxLeastSquares(time[mask], flux[mask],
dy=dfluxmask)
else:
model = mskmodel
r = model.power(per_grid, dur_grid, objective='snr', method='fast')
# statistics of the BLS fit
stats = model.compute_stats(r.period, r.duration, r.transit_time)
# occultation mask of the event with respect to all data
occ_mask = mskmodel.transit_mask(time, r.period, r.duration,
r.transit_time)
# parameters computation for clarity purposes
occultation_duration = r.duration[0]
central_time = stats['transit_times'][0]
immersion_time = stats['transit_times'][0] - r.duration[0] / 2
emersion_time = stats['transit_times'][0] + r.duration[0] / 2
time_err = np.median(time[1:-1] - time[0:-2]) / 2
depth = np.mean(flux[~occ_mask]) - np.mean(flux[occ_mask])
depth_err = np.std(flux[occ_mask], ddof=1)
baseline = np.mean(flux[~occ_mask])
baseline_err = np.std(flux[~occ_mask], ddof=1)
# If there is only one measurement during the occultation it will
# use the baseline_err to compute SNR, otherwise it will use depth_err
if np.sum(occ_mask) < 2:
snr = depth / baseline_err
else:
snr = depth / depth_err
# define rank
if rank:
rank = rank
else:
rank = 1
return {'rank': rank,
'occultation_duration': occultation_duration,
'central_time': central_time,
'immersion_time': immersion_time,
'emersion_time': emersion_time,
'time_err': time_err,
'depth': depth, 'depth_err': depth_err,
'baseline': baseline, 'baseline_err': baseline_err,
'snr': snr, 'occ_mask': occ_mask}
def summarize_bls(dict1, dict2):
""" Function to merge dictionaries returned by BLS and
to keep values of common keys in list.
"""
dict3 = {}
for key, value in dict1.items():
if key == 'occ_mask':
sz = int(np.size(dict1[key]) / np.size(dict2[key]))
if sz > 1:
k = [None] * (sz + 1)
for i in range(sz):
k[i] = dict1[key][i]
k[i + 1] = dict2[key]
dict3[key] = k
else:
dict3[key] = [dict1[key], dict2[key]]
else:
dict3[key] = np.append(dict1[key], dict2[key])
return dict3