"""
Created on Mon Feb 1 13:36:02 2016
@author: Skevja
This file contains definitions of the functions (not related to the model physics),
that are used later in the main code.
"""
import time
import os
import numpy as np
from collections import namedtuple
import scipy.signal as sgn
from astropy.table import Table
from .constants import tp, tr
from . import localpath
[docs]def read_parameters(path_to_file):
"""
Reads parameters from text file(s).
:param path_to_file: Relative path to the parameter file(s). Main parameter file must have name
``'parameters'`` and the second, optional parameter file, must be called ``'sfrd_peaks_parameters'``.
:type path_to_file: string
:return: Names and values of all parameters given in the parameter file(s).
:rtype: namedtuple
"""
f = open(os.path.join('.',path_to_file),'r')
paramd = {}
for line in f:
try:
# Read parameter name, commented lines are ignored.
key, value = None, None
key = line.split()[0]
if list(key)[0] == '#': pass
else:
# Read parameter value, decompose it into characters
# (parameter values can be not only numbers, but also strings).
value_str = line.split()[1]
value_str_char = list(value_str)
# Interpret input as a string, float or integer.
if "'" in value_str_char:
value = str(value_str[1:-1])
else:
if '.' in value_str_char:
value = float(value_str)
else:
value = int(value_str)
paramd[key]=value
except:
pass
f.close
if ('pkey' in paramd.keys()) and (paramd['pkey']==1) or (paramd['pkey']==2):
message = ("File must have the following parameters organized as columns:\n"+
"sigmap[Msun/pc^2] : SF amplitude-related parameter (for R=Rsun)\n"+
"taup[Gyr] : age of peak center\n"+
"dtaup[Gyr] : dispersion in time\n"+
"Rp[kpc] : peak center at R-axis\n"+
"dRp[kpc] : dispersion in radius\n")
if (paramd['pkey']==1):
message += "sigp[km/s] : W-velocity dispersion of the peak's populations\n"
if os.path.isfile(os.path.join('.','sfrd_peaks_parameters')):
sf_peaks = np.loadtxt(os.path.join('.','sfrd_peaks_parameters'))
if len(sf_peaks)!=0:
if type(sf_peaks[0])==np.float64 or type(sf_peaks[0])==int:
if len(sf_peaks)==6 or len(sf_peaks)==5:
paramd['sigmap'] = np.array([sf_peaks[0]])
paramd['tpk'] = np.array([tp - sf_peaks[1]])
paramd['dtp'] = np.array([sf_peaks[2]])
paramd['Rp'] = np.array([sf_peaks[3]])
paramd['dRp'] = np.array([sf_peaks[4]])
if (paramd['pkey']==1):
paramd['sigp'] = np.array([sf_peaks[5]])
else:
print("\nSome of parameters are missing in 'sfrd_peaks_parameters'! " + message)
else:
if len(sf_peaks[0])==6 or len(sf_peaks[0])==5:
paramd['sigmap'] = sf_peaks[:,0]
paramd['tpk'] = tp - sf_peaks[:,1]
paramd['dtp'] = sf_peaks[:,2]
paramd['Rp'] = sf_peaks[:,3]
paramd['dRp'] = sf_peaks[:,4]
if (paramd['pkey']==1):
paramd['sigp'] = sf_peaks[:,5]
else:
print("\nSome of parameters are missing in 'sfrd_peaks_parameters'! " + message)
else:
print("\nFile 'sfrd_peaks_parameters' is empty, give the parameters! " + message)
else:
print("\nFile 'sfrd_peaks_parameters' not found! Create it or copy to the directory. " + message)
# All parameters are organized as a tuple and can be called as tuple_name.parameter_name
# Non-local parameters
pnames_nonlocal = ['Vsun','Rmin','Rmax','dR','Rd','Rt','Rg1','Rg2','Rg10','Rg20',
'fkey','Rf','Rdf','a_in','Mb','k_td2','k_dzeta','k_eta','Rp','dRp',
'k_FeHd0','k_FeHdp','k_rd','k_q']
pnames_amrd0 = ['FeHdp','rd','q','k_FeHd0','k_FeHdp','k_rd','k_q']
pnames_amrd1 = ['rd1','rd2','Rbr1','Rbr2','Rbr3','k1_FeHdp','b1_FeHdp','k_alphaw','b_alphaw',
'k1_t01','b1_t01','k2_t01','b2_t01','k3_t01','b3_t01',
'k1_t02','b1_t02','k2_t02','b2_t02','k3_t02','b3_t02']
if paramd['run_mode']==0:
paramd_reduced = {}
keys = list(paramd.keys())
for i in range(len(keys)):
if keys[i] not in pnames_nonlocal:
paramd_reduced[keys[i]] = paramd[keys[i]]
paramd = paramd_reduced
else:
if type(paramd['dR'])==int:
paramd['dR'] = float(paramd['dR'])
pnames_imf = ['a0','a1','a2','a3','m0','m1','m2']
if paramd['imfkey']==1:
for i in range(len(pnames_imf)):
del paramd[pnames_imf[i]]
if paramd['fehkey']==0:
for i in range(len(pnames_amrd1)):
try:
paramd[pnames_amrd1[i]]
except: pass
if paramd['fehkey']==1:
try:
for i in range(len(pnames_amrd0)):
del paramd[pnames_amrd0[i]]
except: pass
if paramd['fehkey']==2:
try:
paramd['FeHd0']
for i in range(len(pnames_amrd0)):
paramd[pnames_amrd0[i]]
except: pass
for i in range(len(pnames_amrd1)):
try:
del paramd[pnames_amrd1[i]]
except: pass
k,v = paramd.keys(), paramd.values()
p = namedtuple('p',k)
p = p._make(v)
if paramd['pkey']==0:
nparams = len(paramd.values())
if paramd['pkey']==1:
try:
nparams = len(paramd.values()) + 6*len(paramd['sigmap']-1)
except:
nparams = len(paramd.values()) + 6
if paramd['pkey']==2:
try:
nparams = len(paramd.values()) + 5*len(paramd['sigmap']-1)
except:
nparams = len(paramd.values()) + 5
pnames_technical = ['run_mode','out_dir','out_mode','nprocess','pkey','imfkey','fkey','fehkey']
count_technical = 0
keys = list(paramd.keys())
for i in range(len(keys)):
if keys[i] in pnames_technical:
count_technical += 1
print('\nParameter file(s) : ok.')
print('Number of parameters = ',nparams, ', among them technical = ',count_technical)
print('\n',p)
return p
[docs]def resave_parameters(path_to_parameterfile,path_to_parameterfile_copy,p):
"""
Reads parameters from text file(s) and saves parameter file(s)
to the output folder with only those parameters which are needed for this run
(cleans parameter files from unnecessary input).
:param path_to_parameterfile: Relative path to the parameter file(s).
:type path_to_parameterfile: string
:param path_to_parameterfile_copy: Destination where the file copies should be saved.
:type path_to_parameterfile_copy: string
:param p: Set of model parameters from the parameter file.
:type p: namedtuple
:return: None
"""
pnames_local = ['run_mode','out_dir','out_mode','nprocess','Rsun','zsun','Vsun','zmax','dz',
'sigmad','sigmat','sigmag1','sigmag2','sigmadh','sigmash',
'td1','td2','dzeta','eta','pkey','tt1','tt2','gamma','beta',
'FeHd0','FeHdp','rd','q','dFeHdt','n_FeHdt','FeHt0','FeHtp','rt','t0',
'FeHsh','dFeHsh','n_FeHsh','alpha','sige','sigt','sigdh','sigsh']
if p.pkey==1:
pnames_local.extend(['sigmap','taup','dtaup','sigp'])
if p.pkey==2:
pnames_local.extend(['sigmap','taup','dtaup'])
# Main parameter file
if p.pkey==0:
f = open(os.path.join('.',path_to_parameterfile),'r')
f_out = open(os.path.join('.',path_to_parameterfile_copy),'w')
else:
f = open(os.path.join('.',path_to_parameterfile[0]),'r')
f_out = open(os.path.join('.',path_to_parameterfile_copy[0]),'w')
for line in f:
if line!="\n":
# Read parameter name, commented lines are ignored.
linechar = line.split()[0]
if linechar[0]=='#':
f_out.write(line)
else:
parameter = line.split('\t')[0]
if p.run_mode==0:
if parameter in pnames_local:
f_out.write(line)
else:
f_out.write(line)
else:
f_out.write(line)
f.close
f_out.close
# SFRd-peaks parameter file
if p.run_mode==0 and p.pkey!=0:
f = open(os.path.join('.',path_to_parameterfile[1]),'r')
f_out = open(os.path.join('.',path_to_parameterfile_copy[1]),'w')
for line in f:
if line!="\n":
# Read parameter name, commented lines are ignored.
linechar = line.split()[0]
if linechar[0]=='#':
if linechar[:7]!='#sigmap':
f_out.write(line)
else:
parameters = line.split('\t')
parameter0 = list(parameters[0].split()[0])
parameters[0] = ''.join((parameter0[1:])) # removes '#' char
parameter1 = list(parameters[-1].split()[0])
parameters[-1] = ''.join((parameter1)) # removes '\n' char
ind_reduced = []
for i in range(len(parameters)):
if parameters[i] in pnames_local:
ind_reduced.append(i)
ind_reduced = np.array(ind_reduced,dtype=np.int)
parameters = np.array(parameters)
f_out.write('#' + '\t'.join((parameters[ind_reduced])) + '\n')
else:
parameters = line.split('\t')
parameters = np.array(parameters)
f_out.write('\t'.join((parameters[ind_reduced])))
else:
f_out.write(line)
f.close
f_out.close
[docs]class LogFiles():
"""
Class for manipulations with text files. Includes methods for creating
text files and adding lines to them. Used for writing log files.
"""
[docs] def __init__(self,filename):
"""
Creates an instance of the class.
:param filename: Name of the file.
:type filename: str
"""
self.filename = filename
[docs] def write(self,text):
"""
Creates a file with **filename** (current directory), writes a line, closes the file.
:param text: Text to be added to the file.
:type text: str
:return: None.
"""
f = open(self.filename,'w+')
f.write(text)
f.close()
[docs] def append(self,text):
"""
Opens the file, adds a line, closes the file.
:param text: Text to be added to the file.
:type text: str
:return: None.
"""
f = open(self.filename,'a+')
f.write(text)
f.close()
[docs]class Timer():
"""
Class for measuring time intervals.
"""
[docs] def start(self):
"""
Creates an instance and checks the current time.
"""
return time.time()
[docs] def stop(self,moment):
"""
Returns time interval between the current and some previous
moment. Output in fractions of hours, minutes and seconds.
:param moment: Time record (in the format of output of time.time()).
:type moment: float
:return: Time interval in hms.
:rtype: str
"""
s = round(time.time() - moment,2)
h = int((s/3600)//1)
mi = int(((s/3600-h)*60)//1)
ss = round((((s/3600-h)*60)-mi)*60,3)
return ''.join((str(h),'h ',str(mi),'m ',str(ss),'s'))
[docs]class ConvertAxes():
"""
Interpolation methods.
"""
[docs] def interpolate(self,axis1_new,axis1_old,axis2_old):
"""
Returns new *axis2*-values for a new set of *axis1*-values
given the old (*axis1,axis2*). Uses linear interpolation.
:param axis1_new: New values along *axis1*.
:type axis1_new: array-like
:param axis1_old: Old values along *axis1*.
:type axis1_old: array-like
:param axis2_old: Old values along *axis2*.
:type axis2_old: array-like
:return: New values along *axis2* corresponding to the new *axis1*-values set.
:rtype: 1d-array
"""
pnum = 2 # Number of point used for interpolation
axis2_new = [] # Empty list for new axis2-values
# -----------------------------------------------------------------------------------------
# We find two nearest values to a given new axis1-value in the old axis1-array.
# Then we use these two neighbour points to perform linear interpolation.
# We manually set the slope to zero if points are too close.
# Then a new axis2-value is calculated.
# -----------------------------------------------------------------------------------------
for i in range(len(axis1_new)):
ind = np.argsort(abs(axis1_new[i]-axis1_old))[np.arange(pnum)]
if abs(axis1_old[ind[0]]-axis1_old[ind[1]]) < 0.0000001:
k = 0
else:
k = (axis2_old[ind[0]]-axis2_old[ind[1]])/(axis1_old[ind[0]]-axis1_old[ind[1]])
b = axis2_old[ind[0]] - k*axis1_old[ind[0]]
axis2_new.append(k*axis1_new[i]+b)
return np.array(axis2_new)
[docs] def get_closest(self,axis1_new,axis1_old,axis2_old):
"""
No interpolation is applied, the new *axis1*-values are
replaced by the closest values from the old *axis1*-array
and the corresponding old *axis2*-values are returned. This
method is good when the resolution of the old *axis1*-array
is much better than the resolution of the new *axis1*-array.
:param axis1_new: New values along *axis1*.
:type axis1_new: array-like
:param axis1_old: Old values along *axis1*.
:type axis1_old: array-like
:param axis2_old: Old values along *axis2*.
:type axis2_old: array-like
:return: New values along *axis2* corresponding to the new *axis1*-values set.
:rtype: 1d-array
"""
axis2_new = []
for i in range(len(axis1_new)):
ind = np.argsort(abs(axis1_new[i]-axis1_old))
axis2_new.append(axis2_old[ind[0]])
return np.array(axis2_new)
[docs]def rebin_histogram(bin_edges,x_centers,counts):
"""
Uses existing histogram to create a new histogram for the
different set of x-bins, such that the overall counts are
conserved.
:param bin_edges: Edges of the new x-bins.
:type bin_edges: array-like
:param x_centers: Centers of the old x-bins.
:type x_centers: array-like
:param counts: Histogram counts corresponding to the old x-bin centers.
:type counts: array-like
:return: New counts corresponding to the new x-bins (``len(bin_edges)-1``).
:rtype: 1d-array
"""
n = len(bin_edges) - 1 # Number of new bins
bin_width = np.diff(bin_edges)[0] # New bin width
dx = np.diff(x_centers)[0] # Old bin width
hist = np.zeros((n))
# ---------------------------------------------------------------------------------------------
# Firstly, we select all old bins falling into a new bin, at least partially.
# Then we calculate how far the outer boundary of the old bin lays within
# the range of the new bin, and assign weights to all selected old bins.
# Case 1: Old bin not fully entered the new bin.
# Case 2: Old bin partly left the new bin
# Case 3: For all old bins that are fully inside the new bin weights remain 1.
# ---------------------------------------------------------------------------------------------
for i in range(n):
ind = np.where((x_centers + dx/2 > bin_edges[i])&(x_centers - dx/2 <= bin_edges[i+1]))[0]
weights = np.linspace(1,1,len(ind)) # Case 3
frac = x_centers[ind] + dx/2 - bin_edges[i]
weights[frac < dx] = frac[frac < dx]/dx # Case 1
weights[frac > bin_width] = (dx - frac[frac > bin_width] + bin_width)/dx # Case 2
hist[i] = np.sum(counts[ind]*weights)
return hist
def _transition_2curves_(epsilon,x_break,x,y):
"""
This is a crazy routine of my own invention, that allows to
make a smooth `naturally looking` transition between two curves.
It could be a fragmet of cycloid or hyperbola, but according
to my tests, they don't always look good. This function is
applied only once in this package, in funcs.heffr and only if
the thin-disk flaring is swithed on. Then this routine makes
a smooth transition between the straight line and exponent.
If you know any better way to do this, you're welcome to
improve the code, but so far it is not really needed as this
function does the job. Just be careful if you decide to apply
this function for your own purposes, plot the outcome to check
what it does.
Parameters
----------
epsilon : scalar
Half-width of the window, where the smooth transition will
be constructed. Units of epsilon are the same as units of x.
x_break : scalar
x-coordinate of a break point.
x : array_like
x-coordinates.
y : array_like
y-coordinates.
Returns
-------
y_smooth : ndarray
y-coordinates with a smooth transition in [x_break-epsilon,
x_break+epsilon].
"""
if np.mean(np.diff(x)) > epsilon/2:
dx = epsilon/10
ind_br = np.where(abs((x-x_break))==np.amin(abs((x-x_break))))[0][0]
x1 = np.arange(x[0],x_break,dx)
x2 = np.arange(x_break,x[-1]+dx,dx)
ax = ConvertAxes()
y1 = ax.interpolate(x1,x[:ind_br],y[:ind_br])
y2 = ax.interpolate(x2,x[ind_br:],y[ind_br:])
X = np.concatenate((x1,x2),axis=0)
Y = np.concatenate((y1,y2),axis=0)
else:
X = np.copy(x)
Y = np.copy(y)
x_smooth_start = x_break - epsilon
x_smooth_stop = x_break + epsilon
if (x[0] > x_break - 3*epsilon):
x_2en = x[0]
print('Warning: (Rdf-Rmin)/epsilon < 3, smoothing algorithm may fail.\
Decrease Rmin or epsilon for a given Rf.')
else:
x_2en = x_break - 3*epsilon
if (x[-1] < x_break + 3*epsilon):
x_2ep = x[-1]
print('Warning: (Rmax-Rdf)/epsilon < 3, smoothing algorithm may fail.\
Increase Rmax or decrease epsilon for a given Rf.')
else:
x_2ep = x_break + 3*epsilon
ind_sm1 = np.where(abs((X-x_smooth_start))==np.amin(abs((X-x_smooth_start))))[0][0]
ind_sm2 = np.where(abs((X-x_smooth_stop))==np.amin(abs((X-x_smooth_stop))))[0][0]
ind_2en = np.where(abs((X-x_2en))==np.amin(abs((X-x_2en))))[0][0]
ind_2ep = np.where(abs((X-x_2ep))==np.amin(abs((X-x_2ep))))[0][0]
y_smooth_start = Y[ind_sm1]
y_smooth_stop = Y[ind_sm2]
xx = np.concatenate((X[ind_2en:ind_sm1],X[ind_sm2:ind_2ep]),axis=-1)
yy = np.concatenate((Y[ind_2en:ind_sm1],Y[ind_sm2:ind_2ep]),axis=-1)
pp11 = np.poly1d(np.polyfit(xx,yy,2))
pp12 = np.poly1d(np.polyfit(xx,yy,6))
def pp1(xp):
return np.mean([pp11(xp),pp12(xp)])
smooth = [(x_smooth_stop-i)/2/epsilon*pp11(i) + (1 - (x_smooth_stop-i)/2/epsilon)*pp12(i)
for i in X[ind_sm1:ind_sm2+1]]
shift_low1 = smooth[0] - y_smooth_start
shift_low2 = smooth[-1] - y_smooth_stop
correction_low = np.zeros((ind_sm2-ind_sm1+1))
correction_low[:int((ind_sm2-ind_sm1)/2)] =\
[shift_low1*(i-X[int(ind_sm1+(ind_sm2-ind_sm1)/2)])/(x_smooth_start-\
X[int(ind_sm1+(ind_sm2-ind_sm1)/2)]) for i in X[ind_sm1:int(ind_sm1+(ind_sm2-ind_sm1)/2)]]
correction_low[int((ind_sm2-ind_sm1)/2):] =\
[shift_low2*(i-X[int(ind_sm1+(ind_sm2-ind_sm1)/2)])/(x_smooth_stop-\
X[int(ind_sm1+(ind_sm2-ind_sm1)/2)]) for i in X[int(ind_sm1+(ind_sm2-ind_sm1)/2):ind_sm2+1]]
y_smooth = np.concatenate((Y[:ind_sm1],smooth-correction_low,Y[ind_sm2+1:]),axis=-1)
if len(X)!=len(x):
y_smooth = ax.interpolate(x,X,y_smooth)
return y_smooth
[docs]def cumhist_smooth_savgol(y,m,n):
"""
Smoothes a normalized cumulative distribution of some quantity
with the Savitzky-Golay filter.
:param y: y-coordinates of the normalized cumulative distribution.
:type y: array-like
:param m: Parameter window_length of scipy.signal.savgol_filter.
:type m: int
:param n: Parameter polyorder of scipy.signal.savgol_filter.
:type n: int
:return y_sm: Smoothed y.
:rtype: 1d-array
"""
dn = 3*m
y_long = np.concatenate((np.linspace(0,0,dn),y,np.linspace(1,1,dn)),axis=-1)
y_sm = sgn.savgol_filter(y_long,m,n)
y_sm = y_sm[dn:-dn]
y_sm[y_sm<0] = 0
y_sm[y_sm>1] = 1
return y_sm
def _rotation_matrix_(axis, theta):
"""
Returns the rotation matrix associated with counterclockwise rotation about
the given axis by theta radians.
"""
axis = np.asarray(axis)
axis = axis/np.sqrt(np.dot(axis, axis))
a = np.cos(theta/2.0)
b, c, d = -axis*np.sin(theta/2.0)
aa, bb, cc, dd = a * a, b * b, c * c, d * d
bc, ad, ac, ab, bd, cd = b * c, a * d, a * c, a * b, b * d, c * d
return np.array([[aa + bb - cc - dd, 2 * (bc + ad), 2 * (bd - ac)],
[2 * (bc - ad), aa + cc - bb - dd, 2 * (cd + ab)],
[2 * (bd + ac), 2 * (cd - ab), aa + dd - bb - cc]])
[docs]def gauss_weights(x,mean,sigma):
"""
Draws instances from the Gaussian PDF.
:param x: Set of points, where probability density (PD) has to be calculated.
:type x: array-like
:param mean: Mean of the Gaussian distribution.
:type mean: scalar
:param sigma: Standard deviation of the Gaussian distribution.
:type sigma: scalar
:return: PD values at x, normalized to unity.
:rtype: array-like
"""
weights = [np.exp(-(i-mean)**2/2/sigma**2) for i in x]
weights = weights/np.sum(weights)
return weights
[docs]def reduce_table(tab,a):
"""
Calculates the surface number density of the mono-age subpopulations, thus,
reduces the length of the 'stellar assemblies' table to the number of thin- or thick-disk,
or halo subpopulations.
:param tab: Isochrone columns.
:type tab: dict
:param a: Collection of the fixed model parameters, useful quantities, and arrays.
:type a: namedtuple
:return: Table with two columns: ``'t'`` and ``'N'``, Galactic time (Gyr)
and surface number densities (:math:`\mathrm{number \ pc^{-2}}`).
:rtype: astropy Table
"""
timebins = np.arange(0,tp+tr,tr)
time = np.subtract(tp,tab['age'])
out = Table()
out['t'], out['N'], out['Sigma'] = a.t, np.zeros((a.jd)), np.zeros((a.jd))
c = 0
for i in range(a.jd):
ind = np.where((time>=timebins[i]) & (time<timebins[i+1]))[0]
c+=len(ind)
if ind!=[]:
out['N'][i] = np.sum(np.array(tab['N'][ind]))
out['Sigma'][i] = np.sum(np.array(tab['N'][ind]*tab['Mf'][ind]))
return out
[docs]def convolve2d_gauss(array,dxy_smooth,xy_range):
"""
Smoothing a 2d-array with a Gaussian kernel.
:param array: Initial array.
:type array: 2d-array.
:param dxy_smooth: Dispersions in x and y, [*dx,dy*].
:type dxy_smooth: list
:param xy_range: Min and max values along the array axis,
[[*xmin,xmax*],[*ymin,ymax*]].
:type xy_range: list[list]
:return: New smoothed array of the same shape.
:rtype: 2d-array
"""
dx, dy = dxy_smooth
[xmin,xmax],[ymin,ymax] = xy_range
x_step = (xmax-xmin)/array.shape[0]
y_step = (ymax-ymin)/array.shape[1]
k_dx = int(round(dx/x_step,0))
k_dy = int(round(dy/y_step,0))
if k_dx%2==0:
k_dx = k_dx - 1
if k_dy%2==0:
k_dy = k_dy - 1
K = np.ones((k_dx,k_dy),np.float32)
K_norm = K/np.sum(K)
array = sgn.convolve2d(array,K_norm,boundary='symm',mode='same')
return array
[docs]def convolve1d_gauss(array,dx_smooth,x_range):
"""
Smoothing a 1d-array with a Gaussian kernel.
:param array: Initial array.
:type array: 1d-array.
:param dx_smooth: Dispersion in x.
:type dx_smooth: scalar
:param x_range: Min and max values along the array axes, [*xmin,xmax*].
:type x_range: list
:return: New smoothed array of the same shape.
:rtype: 1d-array
"""
xmin, xmax = x_range
x_step = (xmax-xmin)/len(array)
k_dx = int(round(dx_smooth/x_step,0))
if k_dx%2==0:
k_dx = k_dx - 1
K = np.ones(k_dx,np.float32)
K_norm = K/np.sum(K)
array = sgn.convolve(array,K_norm,mode='same')
return array