Source code for sora.ephem.utils

import warnings

__all__ = ['getBSPfromJPL', 'ephem_kernel', 'ephem_horizons']


[docs] def getBSPfromJPL(identifier, initial_date, final_date, email, directory='./'): """Downloads BSP files from JPL database. BSP files, which have information to generate the ephemeris of the objects, will be downloaded and named as (without spaces): '[identifier].bsp'. Important --------- It is not possible to download BSP files for Planets or Satellites. Parameters ---------- identifier : `str`, `list` Identifier of the object(s). It can be the `name`, `number` or `SPK ID`. It can also be a list of objects. Examples: ``'2137295'``, ``'1999 RB216'``, ``'137295'``, ``['2137295', '136199', '1999 RC216', 'Chariklo']``. initial_date : `str` Date the bsp file is to begin, within span `1900-2100`. Examples: ``'2003-02-01'``, ``'2003-3-5'``. final_date : `str` Date the bsp file is to end, within span [1900-2100]. Must be more than 32 days later than `initial_date`. Examples: ``'2006-01-12'``, ``'2006-1-12'``. email : `str` User's e-mail contact address. Required by JPL web service. Example: ``username@user.domain.name``. directory : `str` Directory path to save the bsp files. """ warnings.warn("This function is no longer working due to changes in the JPL Query service. Alternatives are being considered.") return import pathlib import shutil import requests from datetime import datetime date1 = datetime.strptime(initial_date, '%Y-%m-%d') date2 = datetime.strptime(final_date, '%Y-%m-%d') diff = date2 - date1 if diff.days <= 32: raise ValueError('The [final_date] must be more than 32 days later than [initial_date]') else: if isinstance(identifier, str): identifier = [identifier] url_jpl = 'https://ssd.jpl.nasa.gov/x/smb_spk.cgi?OPTION=Make+SPK' lim, opt, n = 10, 'y', len(identifier) if n > lim: parameters = {'OBJECT': identifier[0], 'START': date1.strftime('%Y-%b-%d'), 'STOP': date2.strftime('%Y-%b-%d'), 'EMAIL': email, 'TYPE': '-B'} t0 = datetime.now() r = requests.get(url_jpl, params=parameters, stream=True) tf = datetime.now() bsp_format = r.headers['Content-Type'] if r.status_code == requests.codes.ok and bsp_format == 'application/download': size0 = int(r.headers["Content-Length"]) / 1024 / 1024 # MB print('Estimated time to download {} ({:.3f} MB) files: {}'. format(n, n * size0, n * (tf - t0))) opt = input('\nAre you sure? (y/n):') else: raise ValueError('It was not able to download the bsp file for object {}\n'. format(identifier[0])) if opt in ['y', 'Y', 'YES', 'yes']: if directory != './': path = pathlib.Path(directory) if not path.exists(): raise ValueError('The directory {} does not exist!'.format(path)) directory += '/' print("\nDownloading bsp file(s) ...\n") m, size = 0, 0.0 failed = [] t0 = datetime.now() for obj in identifier: filename = obj.replace(' ', '') + '.bsp' parameters = {'OBJECT': obj, 'START': date1.strftime('%Y-%b-%d'), 'STOP': date2.strftime('%Y-%b-%d'), 'EMAIL': email, 'TYPE': '-B'} r = requests.get(url_jpl, params=parameters, stream=True) bsp_format = r.headers['Content-Type'] if r.status_code == requests.codes.ok and bsp_format == 'application/download': size += int(r.headers["Content-Length"]) / 1024 / 1024 m += 1 with open(directory + filename, 'wb') as f: r.raw.decode_content = True shutil.copyfileobj(r.raw, f) else: failed.append(obj) tf = datetime.now() print("{} ({:.3f} MB) file(s) was/were downloaded".format(m, size)) print("Download time: {}".format(tf - t0)) if len(failed) > 0: raise ValueError( 'It was not able to download the bsp files for next objects: {}'.format(sorted(failed)))
[docs] def ephem_kernel(time, target, observer, kernels, output='ephemeris'): """Calculates the ephemeris from kernel files. Parameters ---------- time : `str`, `astropy.time.Time` Reference instant to calculate ephemeris. It can be a string in the ISO format (yyyy-mm-dd hh:mm:ss.s) or an astropy Time object. target : `str` IAU (kernel) code of the target. observer : `str` IAU (kernel) code of the observer. kernels : `list`, `str` List of paths for all the kernels. output : `str` The output of data. ``ephemeris`` will output the observed position, while ``vector`` will output the Cartesian state vector, without light time correction. Returns ------- coord : `astropy.coordinates.SkyCoord` ICRS coordinate of the target. """ import numpy as np import astropy.units as u import astropy.constants as const import spiceypy as spice from astropy.coordinates import SkyCoord from astropy.time import Time from sora.observer import Observer, Spacecraft origins = {'geocenter': '399', 'barycenter': '0'} location = origins.get(observer) if not location and isinstance(observer, str): location = observer if isinstance(observer, (Observer, Spacecraft)): location = str(getattr(observer, "spkid", None)) if not location: raise ValueError("observer must be 'geocenter', 'barycenter' or an observer object.") if output not in ['ephemeris', 'vector']: raise ValueError("output must be 'ephemeris' or 'vector'") if type(kernels) == str: kernels = [kernels] for kern in kernels: spice.furnsh(kern) time = Time(time) t0 = Time('J2000', scale='tdb') dt = (time - t0) delt = 0 * u.s # calculates vector Solar System Barycenter -> Observer if isinstance(observer, (Observer, Spacecraft)): spice.kclear() # necessary because observer.get_vector() may load different kernels position1 = observer.get_vector(time=time, origin='barycenter') for kern in kernels: spice.furnsh(kern) else: position1 = spice.spkpos(location, dt.sec, 'J2000', 'NONE', '0')[0] position1 = SkyCoord(*position1.T * u.km, representation_type='cartesian') while True: # calculates new time tempo = dt - delt # calculates vector Solar System Barycenter -> Object position2 = spice.spkpos(target, tempo.sec, 'J2000', 'NONE', '0')[0] position2 = SkyCoord(*position2.T * u.km, representation_type='cartesian') position = position2.cartesian - position1.cartesian # calculates new light time delt = (position.norm() / const.c).decompose() # if difference between new and previous light time is smaller than 0.001 sec, then continue. if output == 'vector' or np.all(np.absolute(((dt - tempo) - delt).sec) < 0.001): break coord = SkyCoord(position, representation_type='cartesian') spice.kclear() if output == 'ephemeris': coord = SkyCoord(ra=coord.spherical.lon, dec=coord.spherical.lat, distance=coord.spherical.distance, obstime=time) if not coord.isscalar and len(coord) == 1: coord = coord[0] return coord
[docs] def ephem_horizons(time, target, observer, id_type='smallbody', output='ephemeris'): """Calculates the ephemeris from Horizons. Parameters ---------- time : `str`, `astropy.time.Time` Reference instant to calculate ephemeris. It can be a string in the ISO format (yyyy-mm-dd hh:mm:ss.s) or an astropy Time object. target : `str` IAU (kernel) code of the target. observer : `str` IAU (kernel) code of the observer. id_type : `str` Type of target object options: ``smallbody``, ``majorbody`` (planets but also anything that is not a small body), ``designation``, ``name``, ``asteroid_name``, ``comet_name``, ``id`` (Horizons id number), or ``smallbody`` (find the closest match under any id_type). output : `str` The output of data. ``ephemeris`` will output the observed position, while ``vector`` will output the Cartesian state vector, without light time correction. Returns ------- coord : `astropy.coordinates.SkyCoord` ICRS coordinate of the target. Notes ----- If the interval of time is larger than 30 days or so, a timeout error may be raised. The maximum interval will depend on the user connection. """ import astropy.units as u from astroquery.jplhorizons import Horizons from astropy.time import Time from astropy.coordinates import SkyCoord from sora.observer import Observer, Spacecraft from scipy import interpolate origins = {'geocenter': '@399', 'barycenter': '@0'} location = origins.get(observer) if not location and isinstance(observer, str): location = observer if isinstance(observer, (Observer, Spacecraft)): if getattr(observer, "code", None) is None: location = {'lon': observer.lon.deg, 'lat': observer.lat.deg, 'elevation': observer.height.to(u.km).value} else: location = f'{getattr(observer, "code", "")}@{getattr(observer, "spkid", "")}' if not location: raise ValueError("observer must be 'geocenter', 'barycenter' or an observer object.") if output not in ['ephemeris', 'vector']: raise ValueError("output must be 'ephemeris' or 'vector'") time = Time(time) time1 = getattr(time, {'ephemeris': 'utc', 'vector': 'tdb'}[output]).jd if not time.isscalar and len(time) > 50: step = '10m' if time.max() - time.min() > 30 * u.day: warnings.warn('Time interval may be too long. A timeout error may be raised.') if time.max() - time.min() <= 1 * u.day: step = '1m' time2 = {'start': (time.min() - 10 * u.min).iso.split('.')[0], 'stop': (time.max() + 10 * u.min).iso.split('.')[0], 'step': step, } else: time2 = time1 if getattr(observer, 'ephem', None) not in ['horizons', None]: warnings.warn('Ephemeris using kernel for the observer and Horizons for the target is under construction. ' 'We will use only Horizons.') id_type = None if id_type == 'majorbody' else id_type ob = Horizons(id=target, id_type=id_type, location=location, epochs=time2) if output == 'ephemeris': eph = ob.ephemerides(extra_precision=True, cache=False) obstime = Time(eph['datetime_jd'], format='jd', scale='utc') pos = SkyCoord(eph['RA'], eph['DEC'], eph['delta'], frame='icrs', obstime=obstime) else: vec = ob.vectors(refplane='earth', cache=False) obstime = Time(vec['datetime_jd'], format='jd', scale='tdb') pos = SkyCoord(*[vec[i] for i in ['x', 'y', 'z']] * u.AU, representation_type='cartesian', obstime=obstime) if isinstance(time2, dict): spl_x = interpolate.CubicSpline(obstime.jd, pos.cartesian.x.to(u.km)) spl_y = interpolate.CubicSpline(obstime.jd, pos.cartesian.y.to(u.km)) spl_z = interpolate.CubicSpline(obstime.jd, pos.cartesian.z.to(u.km)) pos = SkyCoord(x=spl_x(time1), y=spl_y(time1), z=spl_z(time1), unit=u.km, representation_type='cartesian') if output == 'ephemeris': pos = SkyCoord(ra=pos.spherical.lon, dec=pos.spherical.lat, distance=pos.spherical.distance) if not pos.isscalar and len(pos) == 1: pos = pos[0] return pos