import warnings
__all__ = ['getBSPfromJPL', 'ephem_kernel', 'ephem_horizons']
[docs]
def getBSPfromJPL(identifier, initial_date, final_date, email, directory='./'):
"""Downloads BSP files from JPL database.
BSP files, which have information to generate the ephemeris of the objects,
will be downloaded and named as (without spaces): '[identifier].bsp'.
Important
---------
It is not possible to download BSP files for Planets or Satellites.
Parameters
----------
identifier : `str`, `list`
Identifier of the object(s).
It can be the `name`, `number` or `SPK ID`.
It can also be a list of objects.
Examples: ``'2137295'``, ``'1999 RB216'``, ``'137295'``,
``['2137295', '136199', '1999 RC216', 'Chariklo']``.
initial_date : `str`
Date the bsp file is to begin, within span `1900-2100`.
Examples: ``'2003-02-01'``, ``'2003-3-5'``.
final_date : `str`
Date the bsp file is to end, within span [1900-2100].
Must be more than 32 days later than `initial_date`.
Examples: ``'2006-01-12'``, ``'2006-1-12'``.
email : `str`
User's e-mail contact address.
Required by JPL web service.
Example: ``username@user.domain.name``.
directory : `str`
Directory path to save the bsp files.
"""
warnings.warn("This function is no longer working due to changes in the JPL Query service. Alternatives are being considered.")
return
import pathlib
import shutil
import requests
from datetime import datetime
date1 = datetime.strptime(initial_date, '%Y-%m-%d')
date2 = datetime.strptime(final_date, '%Y-%m-%d')
diff = date2 - date1
if diff.days <= 32:
raise ValueError('The [final_date] must be more than 32 days later than [initial_date]')
else:
if isinstance(identifier, str):
identifier = [identifier]
url_jpl = 'https://ssd.jpl.nasa.gov/x/smb_spk.cgi?OPTION=Make+SPK'
lim, opt, n = 10, 'y', len(identifier)
if n > lim:
parameters = {'OBJECT': identifier[0], 'START': date1.strftime('%Y-%b-%d'),
'STOP': date2.strftime('%Y-%b-%d'), 'EMAIL': email, 'TYPE': '-B'}
t0 = datetime.now()
r = requests.get(url_jpl, params=parameters, stream=True)
tf = datetime.now()
bsp_format = r.headers['Content-Type']
if r.status_code == requests.codes.ok and bsp_format == 'application/download':
size0 = int(r.headers["Content-Length"]) / 1024 / 1024 # MB
print('Estimated time to download {} ({:.3f} MB) files: {}'.
format(n, n * size0, n * (tf - t0)))
opt = input('\nAre you sure? (y/n):')
else:
raise ValueError('It was not able to download the bsp file for object {}\n'.
format(identifier[0]))
if opt in ['y', 'Y', 'YES', 'yes']:
if directory != './':
path = pathlib.Path(directory)
if not path.exists():
raise ValueError('The directory {} does not exist!'.format(path))
directory += '/'
print("\nDownloading bsp file(s) ...\n")
m, size = 0, 0.0
failed = []
t0 = datetime.now()
for obj in identifier:
filename = obj.replace(' ', '') + '.bsp'
parameters = {'OBJECT': obj, 'START': date1.strftime('%Y-%b-%d'),
'STOP': date2.strftime('%Y-%b-%d'), 'EMAIL': email, 'TYPE': '-B'}
r = requests.get(url_jpl, params=parameters, stream=True)
bsp_format = r.headers['Content-Type']
if r.status_code == requests.codes.ok and bsp_format == 'application/download':
size += int(r.headers["Content-Length"]) / 1024 / 1024
m += 1
with open(directory + filename, 'wb') as f:
r.raw.decode_content = True
shutil.copyfileobj(r.raw, f)
else:
failed.append(obj)
tf = datetime.now()
print("{} ({:.3f} MB) file(s) was/were downloaded".format(m, size))
print("Download time: {}".format(tf - t0))
if len(failed) > 0:
raise ValueError(
'It was not able to download the bsp files for next objects: {}'.format(sorted(failed)))
[docs]
def ephem_kernel(time, target, observer, kernels, output='ephemeris'):
"""Calculates the ephemeris from kernel files.
Parameters
----------
time : `str`, `astropy.time.Time`
Reference instant to calculate ephemeris. It can be a string
in the ISO format (yyyy-mm-dd hh:mm:ss.s) or an astropy Time object.
target : `str`
IAU (kernel) code of the target.
observer : `str`
IAU (kernel) code of the observer.
kernels : `list`, `str`
List of paths for all the kernels.
output : `str`
The output of data. ``ephemeris`` will output the observed position,
while ``vector`` will output the Cartesian state vector, without
light time correction.
Returns
-------
coord : `astropy.coordinates.SkyCoord`
ICRS coordinate of the target.
"""
import numpy as np
import astropy.units as u
import astropy.constants as const
import spiceypy as spice
from astropy.coordinates import SkyCoord
from astropy.time import Time
from sora.observer import Observer, Spacecraft
origins = {'geocenter': '399', 'barycenter': '0'}
location = origins.get(observer)
if not location and isinstance(observer, str):
location = observer
if isinstance(observer, (Observer, Spacecraft)):
location = str(getattr(observer, "spkid", None))
if not location:
raise ValueError("observer must be 'geocenter', 'barycenter' or an observer object.")
if output not in ['ephemeris', 'vector']:
raise ValueError("output must be 'ephemeris' or 'vector'")
if type(kernels) == str:
kernels = [kernels]
for kern in kernels:
spice.furnsh(kern)
time = Time(time)
t0 = Time('J2000', scale='tdb')
dt = (time - t0)
delt = 0 * u.s
# calculates vector Solar System Barycenter -> Observer
if isinstance(observer, (Observer, Spacecraft)):
spice.kclear() # necessary because observer.get_vector() may load different kernels
position1 = observer.get_vector(time=time, origin='barycenter')
for kern in kernels:
spice.furnsh(kern)
else:
position1 = spice.spkpos(location, dt.sec, 'J2000', 'NONE', '0')[0]
position1 = SkyCoord(*position1.T * u.km, representation_type='cartesian')
while True:
# calculates new time
tempo = dt - delt
# calculates vector Solar System Barycenter -> Object
position2 = spice.spkpos(target, tempo.sec, 'J2000', 'NONE', '0')[0]
position2 = SkyCoord(*position2.T * u.km, representation_type='cartesian')
position = position2.cartesian - position1.cartesian
# calculates new light time
delt = (position.norm() / const.c).decompose()
# if difference between new and previous light time is smaller than 0.001 sec, then continue.
if output == 'vector' or np.all(np.absolute(((dt - tempo) - delt).sec) < 0.001):
break
coord = SkyCoord(position, representation_type='cartesian')
spice.kclear()
if output == 'ephemeris':
coord = SkyCoord(ra=coord.spherical.lon, dec=coord.spherical.lat,
distance=coord.spherical.distance, obstime=time)
if not coord.isscalar and len(coord) == 1:
coord = coord[0]
return coord
[docs]
def ephem_horizons(time, target, observer, id_type='smallbody', output='ephemeris'):
"""Calculates the ephemeris from Horizons.
Parameters
----------
time : `str`, `astropy.time.Time`
Reference instant to calculate ephemeris. It can be a string
in the ISO format (yyyy-mm-dd hh:mm:ss.s) or an astropy Time object.
target : `str`
IAU (kernel) code of the target.
observer : `str`
IAU (kernel) code of the observer.
id_type : `str`
Type of target object options: ``smallbody``, ``majorbody`` (planets but
also anything that is not a small body), ``designation``, ``name``,
``asteroid_name``, ``comet_name``, ``id`` (Horizons id number), or
``smallbody`` (find the closest match under any id_type).
output : `str`
The output of data. ``ephemeris`` will output the observed position,
while ``vector`` will output the Cartesian state vector, without
light time correction.
Returns
-------
coord : `astropy.coordinates.SkyCoord`
ICRS coordinate of the target.
Notes
-----
If the interval of time is larger than 30 days or so, a timeout error may be raised.
The maximum interval will depend on the user connection.
"""
import astropy.units as u
from astroquery.jplhorizons import Horizons
from astropy.time import Time
from astropy.coordinates import SkyCoord
from sora.observer import Observer, Spacecraft
from scipy import interpolate
origins = {'geocenter': '@399', 'barycenter': '@0'}
location = origins.get(observer)
if not location and isinstance(observer, str):
location = observer
if isinstance(observer, (Observer, Spacecraft)):
if getattr(observer, "code", None) is None:
location = {'lon': observer.lon.deg, 'lat': observer.lat.deg, 'elevation': observer.height.to(u.km).value}
else:
location = f'{getattr(observer, "code", "")}@{getattr(observer, "spkid", "")}'
if not location:
raise ValueError("observer must be 'geocenter', 'barycenter' or an observer object.")
if output not in ['ephemeris', 'vector']:
raise ValueError("output must be 'ephemeris' or 'vector'")
time = Time(time)
time1 = getattr(time, {'ephemeris': 'utc', 'vector': 'tdb'}[output]).jd
if not time.isscalar and len(time) > 50:
step = '10m'
if time.max() - time.min() > 30 * u.day:
warnings.warn('Time interval may be too long. A timeout error may be raised.')
if time.max() - time.min() <= 1 * u.day:
step = '1m'
time2 = {'start': (time.min() - 10 * u.min).iso.split('.')[0],
'stop': (time.max() + 10 * u.min).iso.split('.')[0],
'step': step,
}
else:
time2 = time1
if getattr(observer, 'ephem', None) not in ['horizons', None]:
warnings.warn('Ephemeris using kernel for the observer and Horizons for the target is under construction. '
'We will use only Horizons.')
id_type = None if id_type == 'majorbody' else id_type
ob = Horizons(id=target, id_type=id_type, location=location, epochs=time2)
if output == 'ephemeris':
eph = ob.ephemerides(extra_precision=True, cache=False)
obstime = Time(eph['datetime_jd'], format='jd', scale='utc')
pos = SkyCoord(eph['RA'], eph['DEC'], eph['delta'], frame='icrs', obstime=obstime)
else:
vec = ob.vectors(refplane='earth', cache=False)
obstime = Time(vec['datetime_jd'], format='jd', scale='tdb')
pos = SkyCoord(*[vec[i] for i in ['x', 'y', 'z']] * u.AU, representation_type='cartesian', obstime=obstime)
if isinstance(time2, dict):
spl_x = interpolate.CubicSpline(obstime.jd, pos.cartesian.x.to(u.km))
spl_y = interpolate.CubicSpline(obstime.jd, pos.cartesian.y.to(u.km))
spl_z = interpolate.CubicSpline(obstime.jd, pos.cartesian.z.to(u.km))
pos = SkyCoord(x=spl_x(time1), y=spl_y(time1), z=spl_z(time1), unit=u.km, representation_type='cartesian')
if output == 'ephemeris':
pos = SkyCoord(ra=pos.spherical.lon, dec=pos.spherical.lat, distance=pos.spherical.distance)
if not pos.isscalar and len(pos) == 1:
pos = pos[0]
return pos