Commit f0727e53 authored by Niu Liu
Browse files

Remove vsh

Parent 842ca8c1
#!/usr/bin/env ipython
# -*- coding: utf-8 -*-
# File name: __init__.py
"""
Created on Mon Feb 7 14:52:46 2022
@author: Neo(niu.liu@nju.edu.cn)
"""
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# File name: auto_elim.py
"""
Created on Thu Dec 24 10:35:28 2020
@author: Neo(niu.liu@nju.edu.cn)
This script contains code for auto-elimination.
"""
import numpy as np
# My progs
from .stats_func import calc_nor_sep
from .matrix_calc import (nor_eq_sol_from_cache, nor_eq_sol,
residual_calc_from_cache,
cache_mat_calc, rm_cache_mat)
# ----------------------------- Function -----------------------------
def extract_data(mask, dra, ddc, dra_err, ddc_err, ra_rad, dc_rad, ra_dc_cor=None):
    """Select the clean subsample flagged by `mask`.

    Parameters
    ----------
    mask : array of boolean
        flag marking the entries to keep
    dra/ddc : array of float
        R.A.(*cos(Dec.))/Dec. differences
    dra_err/ddc_err : array of float
        formal uncertainty of dra(*cos(dc_rad))/ddc
    ra_rad/dc_rad : array of float
        Right ascension/Declination in radian
    ra_dc_cor : array of float, optional
        correlation coefficient between dra and ddc; a None value is
        passed through unchanged

    Returns
    ----------
    dra_new/ddc_new : array of float
        R.A.(*cos(Dec.))/Dec. differences for the clean sample
    dra_err_new/ddc_err_new : array of float
        formal uncertainties for the clean sample
    ra_rad_new/dc_rad_new : array of float
        positions in radian for the clean sample
    ra_dc_cor_new : array of float or None
        correlation coefficients for the clean sample (None when not supplied)
    """
    # Apply the mask to every mandatory array in one pass
    selected = [arr[mask] for arr in (dra, ddc, dra_err, ddc_err, ra_rad, dc_rad)]
    # The correlation array is optional; propagate None untouched
    cor_sel = None if ra_dc_cor is None else ra_dc_cor[mask]
    return (*selected, cor_sel)
def elim_on_nor_sep(clip_limit, dra, ddc, dra_err, ddc_err, ra_dc_cor=None):
    """Flag sources whose normalized separation stays below the clip level.

    The clipping threshold is `clip_limit` times the median normalized
    separation of the whole sample.

    Parameters
    ----------
    clip_limit : int or float
        multiple of the median normalized separation used for clipping
    dra/ddc : array of float
        R.A.(*cos(Dec.))/Dec. differences
    dra_err/ddc_err : array of float
        formal uncertainty of dra(*cos(dc_rad))/ddc
    ra_dc_cor : array of float, optional
        correlation coefficient between dra and ddc, default is None

    Returns
    ----------
    max_nor_sep : float
        clipping threshold on the normalized separation
    mask : array of boolean
        True for sources kept in the clean sample
    """
    # Normalized separation of each source
    nor_sep = calc_nor_sep(dra, dra_err, ddc, ddc_err, ra_dc_cor)
    # Threshold scales with the sample median
    threshold = clip_limit * np.median(nor_sep)
    # Keep everything at or below the threshold
    keep = nor_sep <= threshold
    return threshold, keep
def auto_elim_vsh_fit(clip_limit, dra, ddc, dra_err, ddc_err, ra_rad, dc_rad,
                      ra_dc_cor=None, l_max=1, fit_type="full", num_iter=100):
    """Fit the VSH parameters with iterative auto-elimination of outliers.

    Sources are repeatedly clipped on their normalized separation (computed
    from post-fit residuals) until the outlier mask stops changing.

    Parameters
    ----------
    clip_limit : float
        threshold on normalized separation for auto-elimination, expressed
        as a multiple of the sample median (see elim_on_nor_sep)
    dra/ddc : array of float
        R.A.(*cos(Dec.))/Dec. differences
    dra_err/ddc_err : array of float
        formal uncertainty of dra(*cos(dc_rad))/ddc
    ra_rad/dc_rad : array of float
        Right ascension/Declination in radian
    ra_dc_cor : array of float, optional
        correlation coefficient between dra and ddc, default is None
    l_max : int
        maximum VSH degree
    fit_type : string
        flag to determine which parameters to be fitted:
        "full" for T- and S-vectors both, "T" for T-vectors only,
        "S" for S-vectors only
    num_iter : int
        number of sources processed at once in the matrix cache. 100 is fine

    Returns
    ----------
    pmt : array of float
        estimated VSH coefficients
    sig : array of float
        formal uncertainty of pmt
    cor_mat : matrix
        matrix of correlation coefficients between the estimates
    dra_r/ddc_r : array of float
        post-fit residuals for ALL sources (clean and rejected)
    mask : array of boolean
        True for sources retained in the final clean sample
    """
    print("==================== Auto-elimination ====================")
    print(" Nb_iteration Nb_sources Nb_outliers Threshold")
    print(" {:11d} {:9d} {:9d} {:9.3f}".format(0, len(dra), 0, 0))
    # Start from a mask that keeps every source
    mask1 = np.full(len(dra), True)
    iter_count = 0
    # First elimination, based on the raw (pre-fit) normalized separations
    max_nor_sep2, mask2 = elim_on_nor_sep(
        clip_limit, dra, ddc, dra_err, ddc_err, ra_dc_cor)
    # Pre-compute and cache the design matrices for all sources once;
    # suffix_array identifies the cache files for later reuse/removal
    suffix_array = cache_mat_calc(
        dra, ddc, dra_err, ddc_err, ra_rad, dc_rad,
        ra_dc_cor=ra_dc_cor, l_max=l_max, fit_type=fit_type, num_iter=num_iter)
    # Iterate until the outlier mask converges (no change between passes)
    while np.any(mask1 != mask2):
        # Renew the flags
        max_nor_sep1, mask1 = max_nor_sep2, mask2
        # Generate a clean sample
        [dra1, ddc1, dra_err1, ddc_err1, ra_rad1, dc_rad1, ra_dc_cor1] = extract_data(
            mask1, dra, ddc, dra_err, ddc_err, ra_rad, dc_rad, ra_dc_cor)
        iter_count += 1
        print(" {:11d} {:9d} {:9d} {:9.3f}".format(
            iter_count, len(dra1), len(dra)-len(dra1), max_nor_sep1))
        # LSQ solution on the clean sample only; residuals are computed
        # separately below for the full sample
        pmt, sig, cor_mat = nor_eq_sol(
            dra1, ddc1, dra_err1, ddc_err1, ra_rad1, dc_rad1,
            ra_dc_cor=ra_dc_cor1, l_max=l_max, fit_type=fit_type,
            num_iter=num_iter, calc_res=False)
        # Calculate residuals for all sources from the cached matrices
        dra_r, ddc_r = residual_calc_from_cache(
            dra, ddc, pmt, suffix_array)
        # Re-eliminate the data using the post-fit residuals
        max_nor_sep2, mask2 = elim_on_nor_sep(
            clip_limit, dra_r, ddc_r, dra_err, ddc_err, ra_dc_cor)
    # Record the converged mask
    mask = mask1
    # If no source was ever classified as an outlier the loop body never
    # ran, so solve once on the full sample using the cached matrices
    if iter_count == 0:
        pmt, sig, cor_mat, dra_r, ddc_r = nor_eq_sol_from_cache(
            dra, ddc, suffix_array, num_iter=num_iter)
    # Remove cache files
    rm_cache_mat(suffix_array)
    return pmt, sig, cor_mat, dra_r, ddc_r, mask
def std_elim_vsh_fit(x_limit, dra, ddc, dra_err, ddc_err, ra_rad, dc_rad,
                     ra_dc_cor=None, l_max=1, fit_type="full", num_iter=100):
    """Fit the VSH parameters after a single fixed-threshold elimination.

    Unlike auto_elim_vsh_fit, the clip is applied once, directly on the
    pre-fit normalized separation, with an absolute threshold `x_limit`.

    Parameters
    ----------
    x_limit : float
        absolute threshold on normalized separation for the elimination
    dra/ddc : array of float
        R.A.(*cos(Dec.))/Dec. differences
    dra_err/ddc_err : array of float
        formal uncertainty of dra(*cos(dc_rad))/ddc
    ra_rad/dc_rad : array of float
        Right ascension/Declination in radian
    ra_dc_cor : array of float, optional
        correlation coefficient between dra and ddc, default is None
    l_max : int
        maximum VSH degree
    fit_type : string
        flag to determine which parameters to be fitted:
        "full" for T- and S-vectors both, "T" for T-vectors only,
        "S" for S-vectors only
    num_iter : int
        number of sources processed at once in the matrix cache. 100 is fine

    Returns
    ----------
    pmt : array of float
        estimated VSH coefficients
    sig : array of float
        formal uncertainty of pmt
    cor_mat : matrix
        matrix of correlation coefficients between the estimates
    dra_r/ddc_r : array of float
        post-fit residuals for ALL sources (clean and rejected)
    mask : array-like of boolean
        flag for the clean sample
    """
    # Pre-compute and cache the design matrices for all sources
    suffix_array = cache_mat_calc(
        dra, ddc, dra_err, ddc_err, ra_rad, dc_rad,
        ra_dc_cor=ra_dc_cor, l_max=l_max, fit_type=fit_type, num_iter=num_iter)
    # Calculate normalized separation and clip at the absolute limit
    nor_sep = calc_nor_sep(dra, dra_err, ddc, ddc_err, ra_dc_cor)
    mask = (nor_sep <= x_limit)
    # Generate a clean sample
    [dra1, ddc1, dra_err1, ddc_err1, ra_rad1, dc_rad1, ra_dc_cor1] = extract_data(
        mask, dra, ddc, dra_err, ddc_err, ra_rad, dc_rad, ra_dc_cor)
    print("==================== Standard-elimination ====================")
    print(" Nb_sources Nb_outliers Threshold")
    print(" {:9d} {:9d} {:9.3f}".format(
        len(dra1), len(dra)-len(dra1), x_limit))
    # LSQ solution on the clean sample only
    pmt, sig, cor_mat = nor_eq_sol(
        dra1, ddc1, dra_err1, ddc_err1, ra_rad1, dc_rad1,
        ra_dc_cor=ra_dc_cor1, l_max=l_max, fit_type=fit_type,
        num_iter=num_iter, calc_res=False)
    # Calculate residuals for all sources from the cached matrices
    dra_r, ddc_r = residual_calc_from_cache(
        dra, ddc, pmt, suffix_array)
    # Remove cache file
    rm_cache_mat(suffix_array)
    return pmt, sig, cor_mat, dra_r, ddc_r, mask
def main():
    """Placeholder entry point; module-level tests may be added here later."""
    print("Have a nice day!")


if __name__ == "__main__":
    main()
# --------------------------------- END --------------------------------
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# File name: bootstrap_sample.py
"""
Created on Mon Mar 29 15:55:07 2021
@author: Neo(niu.liu@nju.edu.cn)
Use the nonparameter bootstrap resampling to estimate the confidence level of
VSH parameters.
"""
import numpy as np
from .auto_elim import extract_data
from .matrix_calc import nor_eq_sol
from .pmt_convert import convert_ts_to_rgq
def bs_freq(X=None):
    """Determine the bootstrap resampling frequency for sample X.

    N ~ n * ln(n)^2, n = len(X)

    Reference:
    Feigelson, E., & Babu, G. (2012). Modern Statistical Methods for Astronomy:
    With R Applications. Cambridge: Cambridge University Press. P 55.
    doi:10.1017/CBO9781139015653

    Parameter
    ---------
    X : array_like or int, optional
        sample to be resampled, or directly the sample size n
        (callers in this package pass both forms)

    Return
    ------
    N : int
        resampling frequency
    """
    print("Decide the frequency of bootstrap resamplings")
    # BUG FIX: 'X == None' is an element-wise comparison for numpy arrays
    # and raises "truth value is ambiguous"; the identity test is correct.
    if X is None:
        print("Since no sample is given, I will use the default value (1000)")
        return 1000
    # Accept either the sample itself or its size, so integer callers work
    n = X if isinstance(X, (int, np.integer)) else len(X)
    N = int(n * np.log(n)**2)
    print("Sample size n={}, as a result, the resampling frequency is "
          "N ~ n * ln(n)^2 = {}".format(n, N))
    return N
def bs_formal_error(x, xi):
    """Summarize the spread of bootstrap estimates around estimate x.

    Parameters
    ----------
    x : float
        estimate obtained from the original sample
    xi : array-like, float
        estimates obtained from the bootstrap resamplings

    Returns
    -------
    xi_mean : float
        mean value of xi
    xi_std : float
        standard deviation of xi (about its own mean)
    xi_rms : float
        RMS of xi with respect to x, using an (N-1) denominator
    """
    mean_val = np.mean(xi)
    std_val = np.std(xi)
    # Scatter about the original estimate rather than the bootstrap mean
    rms_val = np.sqrt(np.sum((xi - x) ** 2) / (len(xi) - 1))
    # An alternative confidence interval would be, e.g.,
    # np.percentile(xi, [2.5, 97.5])
    return mean_val, std_val, rms_val
def bs_resampling(X, Y, samp_size=None):
    """Draw one bootstrap resample (with replacement) from a paired sample.

    Parameters
    ---------
    X, Y : array-like, float
        paired samples to be resampled together
    samp_size : int, optional
        size of the new sample; defaults to len(X)

    Returns
    ------
    new_X, new_Y : array-like, float
        resampled data, drawn with the same indices so pairs stay aligned
    """
    size = len(X) if samp_size is None else samp_size
    # Sampling with replacement over the index range keeps X/Y pairing intact
    picks = np.random.choice(np.arange(len(X)), size=size)
    return X[picks], Y[picks]
def bs_resampling_indx(X, samp_size=None):
    """Draw bootstrap-resampling indices (with replacement) for sample X.

    Parameters
    ---------
    X : array-like, float
        sample to be resampled
    samp_size : int, optional
        size of the new sample; defaults to len(X)

    Returns
    ------
    new_samp_indx : array-like, int
        indices into X forming the new sample
    """
    size = len(X) if samp_size is None else samp_size
    # Indices drawn uniformly with replacement over [0, len(X))
    return np.random.choice(np.arange(len(X)), size=size)
def bootstrap_resample_4_err(mask, dra, ddc, dra_err, ddc_err, ra_rad, dc_rad,
                             ra_dc_cor, l_max, fit_type, num_iter, output):
    """Estimate formal uncertainty using the bootstrap resampling technique.

    Parameters
    ----------
    mask : array-like of boolean
        flag to find the clean sample
    dra/ddc : array of float
        R.A.(*cos(Dec.))/Dec. differences in dex
    dra_err/ddc_err : array of float
        formal uncertainty of dra(*cos(dc_rad))/ddc in dex
    ra_rad/dc_rad : array of float
        Right ascension/Declination in radian
    ra_dc_cor : array of float
        correlation coefficient between dra and ddc, default is None
    l_max : int
        maximum degree
    fit_type : string
        flag to determine which parameters to be fitted:
        "full" for T- and S-vectors both, "T" for T-vectors only,
        "S" for S-vectors only
    num_iter : int
        number of source once processed. 100 should be fine
    output : dict
        VSH LSQ fitting output information; must contain "pmt", "note" and
        either "pmt1" or "pmt2"

    Return
    ------
    output : dict
        same dict with bootstrap mean/std/rms estimates added
    """
    # Restrict everything to the clean sample when outliers were flagged
    if not np.all(mask):
        dra, ddc, dra_err, ddc_err, ra_rad, dc_rad, ra_dc_cor = extract_data(
            mask, dra, ddc, dra_err, ddc_err, ra_rad, dc_rad, ra_dc_cor)

    # Retrieve the point estimates obtained from the clean sample
    pmt = output["pmt"]
    pmt1 = output["pmt1"] if "pmt1" in output else output["pmt2"]

    # Re-sampling and parameter estimation.
    # BUG FIX: bs_freq applies len() to its argument, so it must receive
    # the sample (not its size, as len(int) raises TypeError); a plain list
    # also avoids the array-vs-None comparison inside bs_freq.
    N_resamp = bs_freq(list(dra))
    N_pmt = len(pmt)
    N_pmt1 = len(pmt1)
    pmt_ts_array = np.zeros((N_resamp, N_pmt))    # t_lm/s_lm coefficients
    pmt_rgq_array = np.zeros((N_resamp, N_pmt1))  # rotation/glide/quadrupolar terms

    for i in range(N_resamp):
        # Generate a new bootstrap sample
        new_indx = bs_resampling_indx(dra)
        dra1, ddc1, dra_err1, ddc_err1, ra_rad1, dc_rad1, ra_dc_cor1 = extract_data(
            new_indx, dra, ddc, dra_err, ddc_err, ra_rad, dc_rad, ra_dc_cor)
        # Estimate t_lm/s_lm coefficients from the new sample.
        # BUG FIX: the resampled arrays are named *1; the original code
        # referenced undefined names drai, ddci, ... (NameError at runtime).
        pmti, sigi, cor_mati, dra_ri, ddc_ri = nor_eq_sol(
            dra1, ddc1, dra_err1, ddc_err1, ra_rad1, dc_rad1,
            ra_dc_cor=ra_dc_cor1, l_max=l_max, fit_type=fit_type,
            num_iter=num_iter)
        pmt_ts_array[i, :] = pmti
        # Convert to rotation/glide/quadrupolar terms
        pmt1i, err1i, cor_mat1i = convert_ts_to_rgq(
            pmti, sigi, cor_mati, l_max, fit_type)
        pmt_rgq_array[i, :] = pmt1i
        # Delete several unused variables to free space
        del dra1, ddc1, dra_err1, ddc_err1, ra_rad1, dc_rad1, ra_dc_cor1

    # Estimate the formal uncertainty for the t_lm/s_lm coefficients
    pmt_ts_mean = np.zeros(N_pmt)
    pmt_ts_std = np.zeros(N_pmt)
    pmt_ts_rms = np.zeros(N_pmt)
    for i in range(N_pmt):
        pmt_ts_mean[i], pmt_ts_std[i], pmt_ts_rms[i] = bs_formal_error(
            pmt[i], pmt_ts_array[:, i])

    # ... and for the rotation/glide/quadrupolar terms
    pmt_rgq_mean = np.zeros(N_pmt1)
    pmt_rgq_std = np.zeros(N_pmt1)
    pmt_rgq_rms = np.zeros(N_pmt1)
    for i in range(N_pmt1):
        pmt_rgq_mean[i], pmt_rgq_std[i], pmt_rgq_rms[i] = bs_formal_error(
            pmt1[i], pmt_rgq_array[:, i])

    # Store the results under the key family matching the input ("pmt1"/"pmt2")
    output["pmt_bs_mean"] = pmt_ts_mean
    output["pmt_bs_std"] = pmt_ts_std
    output["pmt_bs_rms"] = pmt_ts_rms
    if "pmt1" in output:
        output["pmt1_bs_mean"] = pmt_rgq_mean
        output["pmt1_bs_std"] = pmt_rgq_std
        output["pmt1_bs_rms"] = pmt_rgq_rms
    else:
        output["pmt2_bs_mean"] = pmt_rgq_mean
        output["pmt2_bs_std"] = pmt_rgq_std
        output["pmt2_bs_rms"] = pmt_rgq_rms

    # Append a description of the new keys to the note string
    output["note"] = output["note"] + (
        "pmt_bs_mean: VSH coeffiecent mean from bootstrap sampling\n"
        "pmt_bs_std: VSH coefficient std from bootstrap sampling\n"
        "pmt_bs_rms: VSH coefficient rms from bootstrap sampling\n"
        "pmt1(2)_bs_mean: glide+rotation(+quadrupolar) mean from bootstrap sampling\n"
        "pmt1(2)_bs_std: glide+rotation(+quadrupolar) std from bootstrap sampling\n"
        "pmt1(2)_bs_rms: glide+rotation(+quadrupolar) rms from bootstrap sampling\n")
    return output
def main():
    """Placeholder entry point; module-level tests may be added here later."""
    print("Have a nice day!")


if __name__ == "__main__":
    main()
# --------------------------------- END --------------------------------
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# File name: generate_test_data.py
"""
Created on Tue Nov 24 13:16:49 2020
@author: Neo(niu.liu@nju.edu.cn)
In order to simulate the case of Gaia EDR3 where 1.5M qdexars are included, a
test sample of this amount is generated.
"""
import numpy as np
from astropy.table import Table
# My modules
from .rgq_func import vsh_func
# ----------------------------- MAIN -----------------------------
def generate_test_sample(sample_size, pmt_vec):
    """Simulate a noiseless test sample of `sample_size` sources.

    Positional offsets follow the degree-1 VSH model evaluated at random
    positions; formal errors are set to one and correlations to zero.

    Parameters
    ----------
    sample_size : int
        number of simulated sources
    pmt_vec : array of float
        VSH (l_max=1) parameter vector passed to vsh_func

    Returns
    -------
    data : astropy.table.Table
        columns ra, dec, dra, ddec, dra_err, ddec_err, dra_ddec_cor
    """
    # Source positions: ra in [0, 2*pi), dec in [-pi/2, pi/2)
    # NOTE(review): dec drawn uniformly in angle, not isotropically on the
    # sphere — confirm this is intended for the test sample
    ra = np.random.random_sample(sample_size) * 2 * np.pi
    dec = (np.random.random_sample(sample_size) - 0.5) * np.pi
    # Ideal positional offsets from the degree-1 VSH model
    dra_calc, ddec_calc = vsh_func(ra, dec, pmt_vec, l_max=1)
    # Noise terms are switched off (kept at zero)
    dra_nois = 0  # np.random.normal(scale=1, size=sample_size) * 0
    ddec_nois = 0  # np.random.normal(scale=1, size=sample_size) * 0
    dra_sim = dra_calc + dra_nois
    ddec_sim = ddec_calc + ddec_nois
    # Uniform unit formal errors and zero correlations
    dra_err = np.ones(sample_size)
    ddec_err = np.ones(sample_size)
    dra_ddec_cor = np.zeros(sample_size)
    # Pack everything into an astropy Table
    data = Table([ra, dec, dra_sim, ddec_sim, dra_err, ddec_err, dra_ddec_cor],
                 names=["ra", "dec", "dra", "ddec", "dra_err", "ddec_err", "dra_ddec_cor"])
    return data
def main():
"""
"""
# Read the simulated data
rot_vec = np.array([20, 30, 15])
gli_vec = np.array([30, 24, 12])
pmt_vec = np.concatenate((gli_vec, rot_vec))
data