import os
import numpy as np
import pkg_resources
from scipy import ndimage
from scipy.interpolate import interp2d
from astropy.visualization import AsinhStretch, ImageNormalize
from astropy.io import fits
from astropy import constants as c
import pandas as pd
import matplotlib.pyplot as plt
# create a pandas data frame of the source information
sources = pd.read_csv(
pkg_resources.resource_filename(__name__, os.path.join('data', 'DSHARP_sources.csv')),
skipinitialspace=True)
sources.index = pd.Index([name.replace(' ', '') for name in sources.index])
[docs]def purge_data(disks=None):
"""Remove the downloaded DSHARP data files from the package.
Keywords:
---------
disks : None | str | list
can be None, then all are removed
can be a str, then the data of that disk is removed
can be a list of str, then the data for those disks is removed
"""
if disks is None:
disks = [k for k in sources.index]
else:
if type(disks) == str:
disks = [disks]
elif type(disks) != list:
raise TypeError('disks must be None or a disk name (str) or a list of those')
for disk in disks:
disk = disk.replace(' ', '')
for fname in ['{}_continuum.fits', '{}_CO.fits', '{}.profile.txt', '{}.SED.txt']:
fname = fname.format(disk)
fullpath = pkg_resources.resource_filename(__name__, os.path.join('data', fname))
if os.path.isfile(fullpath):
print('Deleting {}'.format(fname))
os.unlink(fullpath)
[docs]def download_disk(fname, type='image', authenticate=False):
"""
Download the specified disk from the project website (password protected).
Arguments:
----------
fname : str
file name, such as 'AS209_continuum.fits'
Keywords:
---------
type : str
specifies the type of file such as image, profile, or SED.
authenticate : bool
wether or not authentication should be used (asking for username/passw.)
"""
if type == 'image':
url = 'https://almascience.eso.org/almadata/lp/DSHARP/images/' + fname
elif type == 'profile':
url = 'https://almascience.eso.org/almadata/lp/DSHARP/profiles/' + fname
elif type == 'SED':
url = 'https://almascience.eso.org/almadata/lp/DSHARP/SEDs/' + fname
else:
raise ValueError('type must be image, profile, or SED!')
fullpath = pkg_resources.resource_filename(__name__, os.path.join('data', fname))
if not os.path.isfile(fullpath):
import requests
import getpass
if authenticate:
username = getpass.getpass(prompt='Username: ')
password = getpass.getpass(prompt='Password: ')
req = requests.get(url, auth=(username, password), stream=True)
else:
req = requests.get(url, stream=True)
print('Downloading file \'{}\' ... '.format(fname), end='', flush=True)
with open(fullpath, 'wb') as f:
for chunk in req.iter_content(chunk_size=1024):
if chunk:
f.write(chunk)
print('Done!')
else:
print('file already exists, will not download: {}'.format(fname))
[docs]def get_datafile(disk, suffix='continuum', type='image', side='N'):
"""
Get a path to the local data file, download it if it's not present.
Arguments:
----------
disk : str
disk name such as 'AS 209', ...
Keywords:
---------
suffix : str
specifies the version of the image, possibilities include
'continuum', 'CO'
type : str
can be 'profile', 'image', or 'SED'
"""
disk_name = disk.replace(' ', '')
if type == 'image':
fname = '{}_{}.fits'.format(disk_name, suffix)
elif type == 'profile':
if disk_name == 'AS205':
fname = '{}_{}.profile.txt'.format(disk_name, side)
else:
fname = '{}.profile.txt'.format(disk_name)
elif type == 'SED':
fname = '{}.SED.txt'.format(disk_name)
else:
raise ValueError('type must be image, profile, or SED!')
fullpath = pkg_resources.resource_filename(__name__, os.path.join('data', fname))
if not os.path.isfile(fullpath):
download_disk(fname, type=type)
return fullpath
[docs]def I_nu_from_T_b(T_b, lam_obs=0.125):
"Calculate Intensity from brightness temperature"
c_light = c.c.cgs.value
nu_obs = c_light / lam_obs
return 2 * nu_obs**2 * c.k_B.cgs.value * T_b / c_light**2
[docs]def get_profile(disk, side='N'):
"""Get the dsharp radial profile.
In the original data release, the profiles calculated the grid in
arcseconds incorrectly.
Parameters:
----------
disk : str
name of disk
"""
fname = get_datafile(disk, type='profile')
data = np.loadtxt(fname)
# radius in au
r_au = data[:, 0]
# with the correct radius in au, we can fix the incorrect radius in arcsec
r_as = r_au / sources.loc[disk]['distance [pc]']
# intensity in brightness temperature
T_b = data[:, 4]
dT_b = data[:, 5] # uncertainty on T_b
# convert to intensity in CGS
I_nu = I_nu_from_T_b(T_b)
I_nu_u = I_nu_from_T_b(T_b + dT_b)
I_nu_l = I_nu_from_T_b(T_b - dT_b)
return {
'r_as': r_as,
'I_nu': I_nu,
'I_nu_u': I_nu_u,
'I_nu_l': I_nu_l,
'data': data
}
[docs]def get_sed(disk):
"""
Parameters:
----------
disk : str
name of disk
"""
fname = get_datafile(disk, type='SED')
mask = np.ones(5, dtype=bool)
mask[-1] = False
data = np.loadtxt(fname, usecols=np.arange(4))
with open(fname) as fid:
header = ''.join([line for line in fid if line.startswith('#')])
references = np.loadtxt(fname, usecols=4, dtype=str)
return {
'data': data,
'header': header,
'references': references
}
[docs]def plot_profile(disk):
"""Plot DSHARP radial profile.
Parameters
----------
disk : str
name of disk
Returns
-------
Returns figure and axes object
"""
data = get_profile(disk)
r_as = data['r_as']
I_nu = data['I_nu']
I_nu_u = data['I_nu_u']
I_nu_l = data['I_nu_l']
f, ax = plt.subplots()
ax.semilogy(r_as, I_nu)
ax.fill_between(r_as, I_nu_l, I_nu_u, color='r', alpha=0.5)
ax.set_ylim(0.5 * I_nu.min(), 1.5 * I_nu.max())
ax.set_xlabel(r'radius [arcsec]')
ax.set_ylabel(r'$I_\nu$ [erg / (s cm$^2$ Hz sr)]')
return f, ax
[docs]def plot_sed(disk):
"""Plot SED of DSHARP disk.
Parameters
----------
disk : str
name of disk
Returns
-------
Returns figure and axes object
"""
data = get_sed(disk)['data']
lam_mic = data[:, 0]
flux_dens = data[:, 1]
df_stat = data[:, 2]
df_sys = flux_dens * data[:, 3]
error = (df_stat**2 + df_sys**2)**0.5
f, ax = plt.subplots()
ax.errorbar(lam_mic, flux_dens, yerr=error, ls='', marker='o', barsabove=True)
ax.set_xscale('log')
ax.set_yscale('log')
ax.set_xlabel(r'wavelength [micron]')
ax.set_ylabel(r'$F_\nu$ [Jy]')
return f, ax
[docs]def plot_DHSARP_continuum(
disk='HD 163296', fname=None, cmap='inferno', dpc=None,
ax=None, range=200, p0=[0, 0], **kwargs):
"""
Plot the specified disk where the version is given by the suffix
Arguments:
----------
disk : str
disk name such as 'AS 209', ...
Keywords:
---------
suffix : str
'continuum'
fname : str
to set a specific, possibly non-DSHARP filename
dpc : float
if set it is used as distance in parsec, otherwise it's read from
the LP_sources data or from input
ax : None | axes
in which axes to plot - if none, will create new figure
p0 : list of length 2
shift of image center in au
range : float
plotting range around the center
cmap : cmap | string
what's passed to pcolormesh as cmap
title : None | string
if given, overwrites title plotted in the figure, which defaults to
the name of the disk
Other keywords are passed to plot_fits
Output:
-------
returns figure and axes handle
"""
if fname is None:
source = sources.loc[disk]
dpc = dpc or source['distance [pc]']
fname = get_datafile(disk)
else:
while dpc is None:
try:
dpc = float(input('distance in PC for {}: '.format(fname)))
except ValueError:
dpc = None
with fits.open(fname) as hdulist:
header = hdulist[0].header
pixel_size_x = header['CDELT1'] * 3600. * 1e3
pixel_size_y = header['CDELT2'] * 3600. * 1e3
disk = kwargs.pop('title', disk)
fig, ax = plot_fits(
fname=fname, ax=ax, cmap=cmap, range=range, p0=p0,
pixel_size_x=pixel_size_x, pixel_size_y=pixel_size_y, dpc=dpc,
title=disk, **kwargs)
return fig, ax
[docs]def plot_fits(
fname, ax=None, cmap='inferno', range=None, p0=[0, 0], pixel_size_x=None,
pixel_size_y=None, dpc=None, vmin=None, vmax=None, rsqaure=False, n_up=None,
title=None, coronagraph_mask=None, fct='pcolormesh', beam=None,
autoshift=False, PA=None, stretch=AsinhStretch(), image_fct=None):
"""
fname : float
path to file
ax : None | axes
where to plot the figure, create if None
cmap : colormap
which colormap to pass to pcolormesh
range : None | float
which range in mas (or au if dpc given) to plot around the center
mask : None | float
what size of center circle to plot
p0 : list of two floats
where the center is located in mas
pixel_size : float | None
size of a pixel in mas
set to None to try and read it from the fits file
dpc : float
distance in parsec
vmin, vmax : None | float
which lower and upper bound to use
will figure something it out if `None`
rsquare : bool
if true, multiply the intensity with r**2
n_up : None | int
if int: upscale the image to that scale
title : str
title to plot in top left corner
coronagraph_mask : None | float
size of central circle to e.g. cover coronograph region
in mas (or au if dpc given)
beam : list
beam size (FWHM) for convolution in mas
fct : str
which bound method of the axes object to use for plotting.
For transparent pdfs, it's for example better to use the slower pcolor
while pcolormesh is much faster.
autoshift : bool
to put the brightest pixel in the center
PA : None | float
if float: rotate by this amount
stretch : astropy.visualization.BaseStretch instance
use LinearStretch for linear scaling, default for the
DSHARP images is AsinhStretch
image_fct : None | callable
pass a function with signature fct(x, y, image) that will be called
to process the image.
"""
from scipy.ndimage import rotate
if ax is None:
fig, ax = plt.subplots()
else:
fig = ax.figure
hdulist = fits.open(fname)
header = hdulist[0].header
Snu = np.squeeze(hdulist[0].data)
Snu[np.isnan(Snu)] = 0.0
if pixel_size_x is None:
if 'CDELT1' in header:
pixel_size_x = header['CDELT1'] * 3600. * 1e3
else:
pixel_size_x = 1
if pixel_size_y is None:
if 'CDELT2' in header:
pixel_size_y = header['CDELT2'] * 3600. * 1e3
else:
pixel_size_y = 1
if PA is not None:
Snu = rotate(Snu, PA, reshape=False, mode='constant', cval=0.0)
x = np.arange(Snu.shape[0], dtype=float) * abs(pixel_size_x) # in mas
y = np.arange(Snu.shape[1], dtype=float) * abs(pixel_size_y) # in mas
if dpc is not None:
x *= dpc * 1e-3
y *= dpc * 1e-3
if beam is not None:
beam = np.array(beam) * dpc * 1e-3
x -= x[-1] / 2.0
y -= y[-1] / 2.0
x -= p0[0]
y -= p0[1]
if autoshift:
cy, cx = np.unravel_index(Snu.argmax(), Snu.shape)
x -= x[cx]
y -= y[cy]
if range is not None:
ix0 = np.abs(x + range).argmin()
ix1 = np.abs(x - range).argmin()
iy0 = np.abs(y + range).argmin()
iy1 = np.abs(y - range).argmin()
x = x[ix0:ix1 + 1]
y = y[iy0:iy1 + 1]
Snu = Snu[iy0:iy1 + 1, ix0:ix1 + 1]
if n_up is not None:
print(f'scaling from {Snu.shape} to ({n_up},{n_up})')
f = interp2d(x, y, Snu)
x = np.linspace(x[0], x[-1], n_up)
y = np.linspace(y[0], y[-1], n_up)
Snu = np.maximum(0.0, f(x, y))
std = Snu[:20, :20].std()
if vmin is None:
vmin = 1.5 * std
if vmax is None:
# vmax = 100 * std
vmax = 0.75 * Snu.max()
print('{}: vmin = {:.2g}, vmax = {:.2g}'.format(title, vmin, vmax))
norm = ImageNormalize(vmin=vmin, vmax=vmax, stretch=stretch)
if beam is not None:
print('beam = {}'.format(beam))
sigma = beam / (2 * np.sqrt(2 * np.log(2)))
Snu = ndimage.gaussian_filter(Snu, sigma)
if image_fct is not None:
Snu = image_fct(x, y, Snu)
getattr(ax, fct)(
-x, y, Snu, cmap=cmap, norm=norm, rasterized=True,
# edgecolor=(1.0, 1.0, 1.0, 0.3), linewidth=0.0015625
)
ax.set_aspect('equal')
if dpc is None:
ax.set_xlabel(r'$\Delta$RA [mas]')
ax.set_ylabel(r'$\Delta$DEC [mas]')
else:
ax.set_xlabel(r'$\Delta$RA [au]')
ax.set_ylabel(r'$\Delta$DEC [au]')
if range is not None:
ax.set_xlim([range, -range])
ax.set_ylim([-range, range])
else:
ax.set_xlim(ax.get_xlim()[::-1])
if title is not None:
ax.text(0.05, 0.95, title, color='w', transform=ax.transAxes, verticalalignment='top')
if coronagraph_mask is not None:
ax.add_artist(plt.Circle((0, 0), radius=coronagraph_mask, color='0.5'))
ax.set_facecolor('k')
return fig, ax