from __future__ import print_function
import datetime
import os
import pickle as pk
import shutil
import numpy as np
from lensit.pbs import pbs
from lensit.qcinv import opfilt_cinv, chain_samples, cd_solve, cd_monitors, multigrid, ffs_ninv_filt_ideal
from lensit.ffs_covs import ell_mat
from lensit.ffs_covs import ffs_specmat as SM
from lensit.ffs_covs import ffs_specmat as pmat
from lensit.misc.misc_utils import timer, cls_hash, npy_hash, cl_inverse, extend_cl
from lensit.sims.sims_generic import hash_check
typs = ['T', 'QU', 'TQU']
_timed = True
_MFkeys = [0, 14]
_runtimebarriers = False # Can avoid problems with different MPI processes requiring different number of iterations
_runtimerankzero = True
def xylms_to_phiOmegalm(lib_alm, Fxx, Fyy, Fxy, Fyx=None):
# dx = phi_x + Om_y
# dy = phi_y - Om_x
# -> d dphi = ikx d/dx + iky d/dy
# -> d dOm = iky d/dy - ikx d/dy
lx = lib_alm.get_kx
ly = lib_alm.get_ky
if Fyx is None:
Fpp = Fxx * lx() ** 2 + Fyy * ly() ** 2 + 2. * Fxy * lx() * ly()
FOO = Fxx * ly() ** 2 + Fyy * lx() ** 2 - 2. * Fxy * lx() * ly()
# FIXME:
FpO = lx() * ly() * (Fxx - Fyy) + Fxy * (ly() ** 2 - lx() ** 2)
else:
Fpp = Fxx * lx() ** 2 + Fyy * ly() ** 2 + (Fxy + Fyx) * lx() * ly()
FOO = Fxx * ly() ** 2 + Fyy * lx() ** 2 - (Fxy + Fyx) * lx() * ly()
FpO = lx() * ly() * (Fxx - Fyy) + Fxy * (ly() ** 2 - lx() ** 2)
print('Fxy Fyx equal, allclose', np.all(Fxy == Fyx), np.allclose(Fxy, Fyx))
# FIXME: is the sign of the following line correct ? (anyway result should be close to zero)
return np.array([Fpp, FOO, FpO])
[docs]class ffs_diagcov_alm(object):
"""Library for flat-sky calculations of various lensing biases, responses, etc. in a idealized, isotropic case
Args:
lib_dir: many things will be saved there
lib_datalm: lib_alm instance (see *lensit.ffs_covs.ell_mat* containing mode filtering and flat-sky patch info
cls_unl(dict): unlensed CMB cls
cls_len(dict): lensed CMB cls
cl_transf: instrument transfer function
cls_noise(dict): 't', 'q' and 'u' noise arrays
lib_skyalm(optional): lib_alm instance describing the sky mode. Irrelevant with some exceptions. Defaults to lib_datalm
"""
def __init__(self, lib_dir, lib_datalm, cls_unl, cls_len, cl_transf, cls_noise,
lib_skyalm=None, init_rank=pbs.rank, init_barrier=pbs.barrier):
self.lib_datalm = lib_datalm
self.lib_skyalm = lib_datalm.clone() if lib_skyalm is None else lib_skyalm
self.cls_unl = cls_unl
self.cls_len = cls_len
self.cl_transf = cl_transf
self.cls_noise = cls_noise
for cl in self.cls_noise.values(): assert len(cl) > self.lib_datalm.ellmax, (len(cl), self.lib_datalm.ellmax)
for cl in self.cls_unl.values(): assert len(cl) > self.lib_skyalm.ellmax, (len(cl), self.lib_skyalm.ellmax)
for cl in self.cls_len.values(): assert len(cl) > self.lib_skyalm.ellmax, (len(cl), self.lib_skyalm.ellmax)
assert len(cl_transf) > self.lib_datalm.ellmax, (len(cl_transf), self.lib_datalm.ellmax)
self.dat_shape = self.lib_datalm.ell_mat.shape
self.lsides = self.lib_datalm.ell_mat.lsides
self.lib_dir = lib_dir
if not os.path.exists(lib_dir) and init_rank == 0:
os.makedirs(lib_dir)
init_barrier()
fn = os.path.join(lib_dir, 'cov_hash.pk')
if not os.path.exists(fn) and init_rank == 0:
pk.dump(self.hashdict(), open(fn, 'wb'), protocol=2)
init_barrier()
hash_check(pk.load(open(fn, 'rb')), self.hashdict())
self.barrier = pbs.barrier if _runtimebarriers else lambda: -1
self.pbsrank = 0 if _runtimerankzero else pbs.rank
def _deg(self, skyalm):
assert skyalm.shape == (self.lib_skyalm.alm_size,), (skyalm.shape, self.lib_skyalm.alm_size)
if self.lib_skyalm.iseq(self.lib_datalm, allow_shape=True): return skyalm
return self.lib_datalm.udgrade(self.lib_skyalm, skyalm)
def _upg(self, datalm):
assert datalm.shape == (self.lib_datalm.alm_size,), (datalm.shape, self.lib_datalm.alm_size)
if self.lib_datalm.iseq(self.lib_skyalm, allow_shape=True): return datalm
return self.lib_skyalm.udgrade(self.lib_datalm, datalm)
def _2smap(self, alm):
return self.lib_skyalm.alm2map(alm)
def _2dmap(self, alm):
return self.lib_datalm.alm2map(alm)
def _datalms_shape(self, typ):
assert typ in typs, (typ, typs)
return len(typ), self.lib_datalm.alm_size
def _skyalms_shape(self, typ):
assert typ in typs, (typ, typs)
return len(typ), self.lib_skyalm.alm_size
def _datmaps_shape(self, typ):
assert typ in typs, (typ, typs)
return len(typ), self.dat_shape[0], self.dat_shape[1]
def hashdict(self):
h = {'lib_alm': self.lib_datalm.hashdict(), 'lib_skyalm': self.lib_skyalm.hashdict()}
for key in self.cls_noise.keys():
h['cls_noise ' + key] = npy_hash(self.cls_noise[key])
for key in self.cls_unl.keys():
h['cls_unl ' + key] = npy_hash(self.cls_unl[key])
for key in self.cls_len.keys():
h['cls_len ' + key] = npy_hash(self.cls_len[key])
h['cl_transf'] = npy_hash(self.cl_transf)
return h
def _get_Nell(self, field):
return self.cls_noise[field.lower()][0]
def degrade(self, LD_shape, ellmin=None, ellmax=None, lib_dir=None, libtodegrade='sky', **kwargs):
assert 0, 'FIXME'
def _get_pmati(self, typ, i, j, use_cls_len=True):
r"""Inverse spectral matrix
"""
if i < j: return self._get_pmati(typ, j, i, use_cls_len=use_cls_len)
_str = {True: 'len', False: 'unl'}[use_cls_len]
fname = os.path.join(self.lib_dir, '%s_Pmatinv_%s_%s%s.npy' % (typ, _str, i, j))
if not os.path.exists(fname) and self.pbsrank == 0:
cls_cmb = self.cls_len if use_cls_len else self.cls_unl
Pmatinv = pmat.get_Pmat(typ, self.lib_datalm, cls_cmb,
cl_transf=self.cl_transf, cls_noise=self.cls_noise, inverse=True)
for _j in range(len(typ)):
for _i in range(_j, len(typ)):
np.save(os.path.join(self.lib_dir, '%s_Pmatinv_%s_%s%s.npy' % (typ, _str, _i, _j)), Pmatinv[:, _i, _j])
print(" _get_pmati:: cached", os.path.join(self.lib_dir, '%s_Pmatinv_%s_%s%s.npy' % (typ, _str, _i, _j)))
self.barrier()
return np.load(fname)
def _get_rootpmatsky(self, typ, i, j, use_cls_len=False):
"""Root of sky rfft spectra matrix
"""
cls_cmb = self.cls_len if use_cls_len else self.cls_unl
return pmat.get_rootunlPmat_ij(typ, self.lib_skyalm, cls_cmb, i, j)
[docs] def get_delensinguncorrbias(self, lib_qlm, clpp_rec, wNoise=True, wCMB=True, recache=False, use_cls_len=True):
r"""Calculate delensing bias given a reconstructed potential map spectrum
Crudely, the delensing bias is defined as :math:`C_\ell^{\rm lensed} - C_\ell^{\rm delensed}` on Gaussian CMB maps (with lensed power spectrum).
More precisely this method calculates the 4-point disconnected contribution (see https://arxiv.org/abs/1701.01712)
if no statistical dependence of the lensing tracer reconstruction noise to the CMB maps
Args:
lib_qlm: *ffs_alm* instance describing the lensing alm arrays
clpp_rec: lensing tracer power.
For the true bias calculation *clpp_rec* should be reconstruction noise power only.
For perturbative lensing calculation, include the signal power in *clpp_rec*
wNoise: include noise spectra in total observed CMB spectra (defaults to True)
wCMB: include signal CMB spectra in total observed CMB spectra (defaults to True)
use_cls_len: use lensed CMB cls if set, unlensed if not (defaults to True)
Returns:
(3, 3, lmax +1) array with bias :math:`C_\ell^{ij}, \textrm{ with }i,j \in (T,E,B)`
"""
# assert len(clpp_rec) > lib_qlm.ellmax,(len(clpp_rec),lib_qlm.ellmax)
if len(clpp_rec) <= lib_qlm.ellmax: clpp_rec = extend_cl(clpp_rec, lib_qlm.ellmax)
fname = os.path.join(self.lib_dir, 'TEBdelensUncorrBias_wN%s_w%sCMB%s_%s_%s.dat' \
% (wNoise, {True: 'len', False: 'unl'}[use_cls_len],
wCMB, npy_hash(clpp_rec[lib_qlm.ellmin:lib_qlm.ellmax + 1]),
lib_qlm.filt_hash()))
if (not os.path.exists(fname) or recache) and self.pbsrank == 0:
def ik_q(a):
assert a in [0, 1], a
return lib_qlm.get_ikx() if a == 1 else lib_qlm.get_iky()
def ik_d(a):
assert a in [0, 1], a
return self.lib_datalm.get_ikx() if a == 1 else self.lib_datalm.get_iky()
retalms = np.zeros((3, 3, self.lib_datalm.alm_size), dtype=complex)
cls_noise = {}
for key in self.cls_noise.keys():
cls_noise[key] = self.cls_noise[key] / self.cl_transf[:len(self.cls_noise[key])] ** 2 * wNoise
cmb_cls = self.cls_len if use_cls_len else self.cls_unl
for _i in range(3):
for _j in range(_i, 3):
if wCMB or (_i == _j):
_map = np.zeros(self.dat_shape, dtype=float)
if wCMB:
Pmat = pmat.get_datPmat_ij('TQU', self.lib_datalm, cmb_cls, np.ones_like(self.cl_transf),
cls_noise, _i, _j)
elif wNoise:
assert _i == _j, (_i, _j)
Pmat = cls_noise[{0: 't', 1: 'q', 2: 'u'}[_i]][self.lib_datalm.reduced_ellmat()]
else:
Pmat = 0
for a in [0, 1]:
for b in [0, 1][a:]:
facab = (2. - (a == b))
_phiab = lib_qlm.alm2map(clpp_rec[lib_qlm.reduced_ellmat()] * ik_q(a) * ik_q(b) * facab)
_map += (_phiab - _phiab[0, 0]) \
* self.lib_datalm.alm2map(Pmat * ik_d(a) * ik_d(b))
retalms[_i, _j, :] = (self.lib_datalm.map2alm(_map))
retalms = pmat.TQUPmats2TEBcls(self.lib_datalm, retalms) * (- 1. / np.sqrt(np.prod(self.lsides)))
save_arr = np.zeros((6, self.lib_datalm.ellmax + 1), dtype=float)
k = 0
for _i in range(3):
for _j in range(_i, 3):
save_arr[k, :] = retalms[_i, _j]
k += 1
header = datetime.datetime.now().strftime("%I:%M%p on %B %d, %Y") + '\n' + __file__
header += "\n Delensing Uncorr. Bias. qlm ell-range (%s - %s)" % (lib_qlm.ellmin, lib_qlm.ellmax)
header += "\n Performed with (%s %s) on fsky = %s" % (
self.lib_datalm.shape[0], self.lib_datalm.shape[1], np.round(self.lib_datalm.fsky(), 2))
header += "\n Positive Contr. to C(len) - C(del)."
if wCMB: header += "\n incl. CMB "
if wNoise: header += "\n incl. Noise "
header += "\n TT TE TB EE EB BB"
np.savetxt(fname, save_arr.transpose(), fmt=['%.8e'] * 6, header=header)
print("Cached " + fname)
self.barrier()
cls = np.loadtxt(fname).transpose()
ret = np.zeros((3, 3, self.lib_datalm.ellmax + 1), dtype=float)
ret[0, 0] = cls[0]
ret[0, 1] = cls[1]
ret[1, 0] = cls[1]
ret[0, 2] = cls[2]
ret[2, 0] = cls[2]
ret[1, 1] = cls[3]
ret[1, 2] = cls[4]
ret[2, 1] = cls[4]
ret[2, 2] = cls[5]
return ret
def get_RDdelensinguncorrbias(self, lib_qlm, clpp_rec, clsobs_deconv, clsobs_deconv2=None, recache=False):
#putting cls_obs being cls_len + noise / transf ** 2 should give the same thing as get_delensinguncorrbias.
if len(clpp_rec) <= lib_qlm.ellmax: clpp_rec = extend_cl(clpp_rec, lib_qlm.ellmax)
fname = None
if (not False or recache) and self.pbsrank == 0:
def ik_q(a):
assert a in [0, 1], a
return lib_qlm.get_ikx() if a == 1 else lib_qlm.get_iky()
def ik_d(a):
assert a in [0, 1], a
return self.lib_datalm.get_ikx() if a == 1 else self.lib_datalm.get_iky()
retalms = np.zeros((3, 3, self.lib_datalm.alm_size), dtype=complex)
for _i in range(3):
for _j in range(_i, 3):
_map = np.zeros(self.dat_shape, dtype=float)
Pmat1 = pmat.get_unlPmat_ij('TQU', self.lib_datalm, clsobs_deconv, _i, _j)
Pmat2 = Pmat1 if clsobs_deconv2 is None else pmat.get_unlPmat_ij('TQU', self.lib_datalm, clsobs_deconv2,
_i, _j)
for a in [0, 1]:
for b in [0, 1][a:]:
facab = (2. - (a == b))
_phiab = lib_qlm.alm2map(clpp_rec[lib_qlm.reduced_ellmat()] * ik_q(a) * ik_q(b) * facab)
if clsobs_deconv2 is None:
_map += (_phiab - _phiab[0, 0]) * self.lib_datalm.alm2map(Pmat1 * ik_d(a) * ik_d(b))
else:
_map += (_phiab) * self.lib_datalm.alm2map(Pmat1 * ik_d(a) * ik_d(b))
_map -= (_phiab[0, 0]) * self.lib_datalm.alm2map(Pmat2 * ik_d(a) * ik_d(b))
retalms[_i, _j, :] = (self.lib_datalm.map2alm(_map))
return pmat.TQUPmats2TEBcls(self.lib_datalm, retalms) * (- 1. / np.sqrt(np.prod(self.lsides)))
[docs] def get_delensingcorrbias(self, typ, lib_qlm, alwfcl, CMBonly=False):
r"""Calculate delensing bias given a reconstructed potential map spectrum
Crudely, the delensing bias is defined as :math:`C_\ell^{\rm lensed} - C_\ell^{\rm delensed}` on Gaussian CMB maps (with lensed power spectrum).
More precisely this method calculates the 4-point disconnected contribution (see https://arxiv.org/abs/1701.01712)
from the statistical dependence of the lensing tracer reconstruction noise to the CMB maps
Args:
typ: 'T', 'QU', 'TQU' for temperature-only, polarization-only or joint analysis
lib_qlm: *ffs_alm* instance describing the lensing alm arrays
alwfcl: normalization of the Wiener-filter quadratic estimate (i.e. Wiener-filter times inverse response)
CMBonly: do not include noise spectra if set (defaults to False)
Returns:
(3, 3, lmax +1) array with bias :math:`C_\ell^{ij}, \textrm{ with }i,j \in (T,E,B)`
"""
assert typ in typs, (typ, typs)
t = timer(_timed)
t.checkpoint("delensing bias : Just started")
retalms = np.zeros((3, 3, self.lib_datalm.alm_size), dtype=complex)
cls_noise = {}
for key in self.cls_noise.keys():
cls_noise[key] = self.cls_noise[key] / self.cl_transf[:len(self.cls_noise[key])] ** 2 * (not CMBonly)
def get_datcl(l, j): # beam deconvolved Cls of the TQU data maps
# The first index is one in th qest estimator of type 'typ' and
# the second any in TQU.
assert (l in range(len(typ))) and (j in range(3)), (l, j)
if typ == 'QU':
l_idx = l + 1
elif typ == 'T':
l_idx = 0
else:
assert typ == 'TQU'
l_idx = l
return pmat.get_datPmat_ij(
'TQU', self.lib_datalm, self.cls_len, np.ones_like(self.cl_transf), cls_noise, l_idx, j)
def _get_Balm(a, l, m): # [ik_a b^2 C_len Cov^{-1}]_{m l}
# Here both indices should refer to the qest.
assert a in [0, 1], a
assert (l in range(len(typ))) and (m in range(len(typ))), (l, m)
ik = self.lib_datalm.get_ikx if a == 1 else self.lib_datalm.get_iky
ret = pmat.get_unlPmat_ij(typ, self.lib_datalm, self.cls_len, m, 0) \
* self._get_pmati(typ, 0, l, use_cls_len=True)
for _i in range(1, len(typ)):
ret += pmat.get_unlPmat_ij(typ, self.lib_datalm, self.cls_len, m, _i) \
* self._get_pmati(typ, _i, l, use_cls_len=True)
return self.lib_datalm.almxfl(ret, self.cl_transf ** 2) * ik()
def _get_Akm(l, m): # [b^2 Cov^{-1}]_{m k}
# Here both indices should refer to the qest.
assert (l in range(len(typ))) and (m in range(len(typ))), (l, m)
return self.lib_datalm.almxfl(self._get_pmati(typ, m, l, use_cls_len=True), self.cl_transf ** 2)
def get_BCamj(a, m, j): # sum_l B^{a, l m} \hat C^{lj}
# The first index m is one in th qest estimator of type 'typ' and
# the second any in TQU.
assert a in [0, 1], a
assert m in range(len(typ)), (m, typ)
assert j in range(3), j
ret = _get_Balm(a, 0, m) * get_datcl(0, j)
for _i in range(1, len(typ)):
ret += _get_Balm(a, _i, m) * get_datcl(_i, j)
return ret
def get_ACmj(m, j): # sum_k A^{k m} \hat C^{kj}
# The first index m is one in th qest estimator of type 'typ' and
# the second any in TQU.
assert m in range(len(typ)), (m, typ)
assert j in range(3), j
ret = _get_Akm(0, m) * get_datcl(0, j)
for _i in range(1, len(typ)):
ret += _get_Akm(_i, m) * get_datcl(_i, j)
return ret
def ik(a, libalm=self.lib_datalm):
assert a in [0, 1], a
return libalm.get_ikx() if a == 1 else libalm.get_iky()
_map = lambda _alm: self.lib_datalm.alm2map(_alm)
for a in [0, 1]:
for b in [0, 1]:
t.checkpoint(" Doing axes %s %s" % (a, b))
Hab = lib_qlm.alm2map(alwfcl[lib_qlm.reduced_ellmat()] * ik(a, libalm=lib_qlm) * ik(b, libalm=lib_qlm))
for _i in range(3): # Building TQU biases, before rotation to Gradient / Curl
for _j in range(0, 3):
t.checkpoint(
" Doing %s" % ({0: 'T', 1: 'Q', 2: 'U'}[_i] + {0: 'T', 1: 'Q', 2: 'U'}[_j]))
# Need a sum over a and m
for m in range(len(typ)): # Hab(z) * [ (AC_m_dai)(-z) (BC_bmj) + (AC_mj)(-z) (BC_bmdai)]
Pmat = (get_ACmj(m, _i) * ik(a)).conjugate()
retalms[_i, _j, :] += self.lib_datalm.map2alm(Hab * _map(Pmat)) * get_BCamj(b, m, _j)
Pmat = (get_BCamj(b, m, _i) * ik(a)).conjugate()
retalms[_i, _j, :] += self.lib_datalm.map2alm(Hab * _map(Pmat)) * get_ACmj(m, _j)
norm = 1. / np.sqrt(np.prod(self.lsides))
# Sure that i - j x-y is the same ?
for i in range(3): # Building TQU biases, before rotation to Gradient / Curl
for j in range(i, 3):
print(typ + ' :', np.allclose(retalms[j, i, :].real, retalms[i, j, :].real))
retalms[i, j, :] += retalms[j, i, :].conjugate()
return pmat.TQUPmats2TEBcls(self.lib_datalm, retalms) * norm
[docs] def get_RDdelensingcorrbias(self, typ, lib_qlm, alwfcl, clsobs_deconv, clsobs_deconv2=None, cls_weights=None):
r"""Calculate delensing bias given a reconstructed potential map spectrum
Same as *get_delensingcorrbias* using empirical input CMB spectra
Args:
typ: 'T', 'QU', 'TQU' for temperature-only, polarization-only or joint analysis
lib_qlm: *ffs_alm* instance describing the lensing alm arrays
alwfcl: normalization of the Wiener-filter quadratic estimate (i.e. Wiener-filter times inverse response)
clsobs_deconv(dict): emprical beam-deconvolved CMB data spectra.
Returns:
(3, 3, lmax +1) array with bias :math:`C_\ell^{ij}, \textrm{ with }i,j \in (T,E,B)`
"""
assert typ in typs, (typ, typs)
t = timer(_timed, suffix=' (delensing RD corr. Bias)')
retalms = np.zeros((3, 3, self.lib_datalm.alm_size), dtype=complex)
_cls_weights = cls_weights or self.cls_len
_clsobs_deconv2 = clsobs_deconv2 or clsobs_deconv
if cls_weights is not None: t.checkpoint("Using custom cls weights")
def get_datcl(l, j, id): # beam deconvolved Cls of the TQU data maps
# The first index is one in th qest estimator of type 'typ' and
# the second any in TQU.
assert (l in range(len(typ))) and (j in range(3)), (l, j)
assert id == 1 or id == 2, id
if typ == 'QU':
l_idx = l + 1
elif typ == 'T':
l_idx = 0
else:
assert typ == 'TQU'
l_idx = l
_cmb_cls = clsobs_deconv if id == 1 else _clsobs_deconv2
return pmat.get_unlPmat_ij('TQU', self.lib_datalm, _cmb_cls, l_idx, j)
def _get_Balm(a, l, m): # [ik_a b^2 C_len Cov^{-1}]_{m l}
# Here both indices should refer to the qest.
assert a in [0, 1], a
assert (l in range(len(typ))) and (m in range(len(typ))), (l, m)
ik = self.lib_datalm.get_ikx if a == 1 else self.lib_datalm.get_iky
ret = pmat.get_unlPmat_ij(typ, self.lib_datalm, _cls_weights, m, 0) \
* self._get_pmati(typ, 0, l, use_cls_len=True)
for _i in range(1, len(typ)):
ret += pmat.get_unlPmat_ij(typ, self.lib_datalm, _cls_weights, m, _i) \
* self._get_pmati(typ, _i, l, use_cls_len=True)
return self.lib_datalm.almxfl(ret, self.cl_transf ** 2) * ik()
def _get_Akm(l, m): # [b^2 Cov^{-1}]_{m k}
# Here both indices should refer to the qest.
assert (l in range(len(typ))) and (m in range(len(typ))), (l, m)
return self.lib_datalm.almxfl(self._get_pmati(typ, m, l, use_cls_len=True), self.cl_transf ** 2)
def get_BCamj(a, m, j): # sum_l B^{a, l m} \hat C^{lj}
# The first index m is one in th qest estimator of type 'typ' and
# the second any in TQU.
assert a in [0, 1], a
assert m in range(len(typ)), (m, typ)
assert j in range(3), j
ret = _get_Balm(a, 0, m) * get_datcl(0, j, 2)
for _i in range(1, len(typ)):
ret += _get_Balm(a, _i, m) * get_datcl(_i, j, 2)
return ret
def get_ACmj(m, j): # sum_k A^{k m} \hat C^{kj}
# The first index m is one in th qest estimator of type 'typ' and
# the second any in TQU.
assert m in range(len(typ)), (m, typ)
assert j in range(3), j
ret = _get_Akm(0, m) * get_datcl(0, j, 1)
for _i in range(1, len(typ)):
ret += _get_Akm(_i, m) * get_datcl(_i, j, 1)
return ret
def ik(a, libalm=self.lib_datalm):
assert a in [0, 1], a
return libalm.get_ikx() if a == 1 else libalm.get_iky()
_map = lambda _alm: self.lib_datalm.alm2map(_alm)
for a in [0, 1]:
for b in [0, 1]:
t.checkpoint(" Doing axes %s %s" % (a, b))
Hab = lib_qlm.alm2map(alwfcl[lib_qlm.reduced_ellmat()] * ik(a, libalm=lib_qlm) * ik(b, libalm=lib_qlm))
for _i in range(3): # Building TQU biases, before rotation to Gradient / Curl
for _j in range(0, 3):
t.checkpoint(" Doing %s" % ({0: 'T', 1: 'Q', 2: 'U'}[_i] + {0: 'T', 1: 'Q', 2: 'U'}[_j]))
# Need a sum over a and m
for m in range(len(typ)): # Hab(z) * [ (AC_m_dai)(-z) (BC_bmj) + (AC_mj)(-z) (BC_bmdai)]
Pmat = (get_ACmj(m, _i) * ik(a)).conjugate()
retalms[_i, _j, :] += self.lib_datalm.map2alm(Hab * _map(Pmat)) * get_BCamj(b, m, _j)
Pmat = (get_BCamj(b, m, _i) * ik(a)).conjugate()
retalms[_i, _j, :] += self.lib_datalm.map2alm(Hab * _map(Pmat)) * get_ACmj(m, _j)
norm = 1. / np.sqrt(np.prod(self.lsides)) # ?
# Sure that i - j x-y is the same ?
for _i in range(3): # Building TQU biases, before rotation to Gradient / Curl
for _j in range(_i, 3):
if _i == 0 and _j == 0:
print("Testing conjecture that in the MV case this is symmetric :")
print( typ + ' :', np.allclose(retalms[_j, _i, :].real, retalms[_i, _j, :].real))
retalms[_i, _j, :] += retalms[_j, _i, :].conjugate()
return pmat.TQUPmats2TEBcls(self.lib_datalm, retalms) * norm
def _apply_beams(self, typ, alms):
assert alms.shape == self._skyalms_shape(typ), (alms.shape, self._skyalms_shape(typ))
ret = np.empty_like(alms)
for _i in range(len(typ)): ret[_i] = self.lib_skyalm.almxfl(alms[_i], self.cl_transf)
return ret
def apply(self, typ, alms, **kwargs):
assert alms.shape == self._datalms_shape(typ), (alms.shape, self._datalms_shape(typ))
ret = np.zeros_like(alms)
for j in range(len(typ)):
for i in range(len(typ)):
ret[i] += \
pmat.get_datPmat_ij(typ, self.lib_datalm, self.cls_unl, self.cl_transf, self.cls_noise, i, j) * alms[j]
return ret
def apply_noise(self, typ, alms, inverse=False):
#Apply noise matrix or its inverse to inputs uqlms.
assert typ in typs, typ
assert alms.shape == self._datalms_shape(typ), (alms.shape, self._datalms_shape(typ))
ret = np.zeros_like(alms)
for _i in range(len(typ)):
_cl = self.cls_noise[typ[_i].lower()] if not inverse else 1. / self.cls_noise[typ[_i].lower()]
ret[_i] = self.lib_datalm.almxfl(alms[_i], _cl)
return ret
[docs] def get_mllms(self, typ, datmaps, use_cls_len=True, use_Pool=0, **kwargs):
r"""Returns maximum likelihood sky CMB modes.
This instance uses isotropic filtering
Args:
typ: 'T', 'QU', 'TQU' for temperature-only, polarization-only or joint analysis
datmaps: data real-space maps array
use_cls_len: use lensed cls in filtering (defaults to True)
Returns:
T,E,B alm array
"""
ilms = self.apply_conddiagcl(typ, np.array([self.lib_datalm.map2alm(m) for m in datmaps]),
use_Pool=use_Pool, use_cls_len=use_cls_len)
cmb_cls = self.cls_len if use_cls_len else self.cls_unl
for i in range(len(typ)): ilms[i] = self.lib_datalm.almxfl(ilms[i], self.cl_transf)
ilms = SM.apply_TEBmat(typ, self.lib_datalm, cmb_cls, SM.TQU2TEBlms(typ, self.lib_datalm, ilms))
return np.array([self.lib_skyalm.udgrade(self.lib_datalm, _alm) for _alm in ilms])
[docs] def get_iblms(self, typ, datalms, use_cls_len=True, use_Pool=0, **kwargs):
r"""Inverse-variance filters input CMB maps
Produces :math:`B^t \rm{Cov}^{-1} X^{\rm dat}` (inputs to quadratic estimator routines)
This instance applies isotropic filtering, with no deflection field
Args:
typ: 'T', 'QU', 'TQU' for temperature-only, polarization-only or joint analysis
datalms: data alms array
use_cls_len: use lensed cls in filtering (defaults to True)
Returns:
inverse-variance filtered alms (T and/or Q, Ulms)
"""
if datalms.shape == ((len(typ), self.dat_shape[0], self.dat_shape[1])):
_datalms = np.array([self.lib_datalm.map2alm(_m) for _m in datalms])
return self.get_iblms(typ, _datalms, use_cls_len=use_cls_len, use_Pool=use_Pool, **kwargs)
assert datalms.shape == self._datalms_shape(typ), (datalms.shape, self._datalms_shape(typ))
ilms = self.apply_conddiagcl(typ, datalms, use_Pool=use_Pool, use_cls_len=use_cls_len)
ret = np.empty(self._skyalms_shape(typ), dtype=complex)
for _i in range(len(typ)):
ret[_i] = self.lib_skyalm.udgrade(self.lib_datalm, self.lib_datalm.almxfl(ilms[_i], self.cl_transf))
return ret, 0
def cd_solve(self, typ, alms, cond='3', maxiter=50, ulm0=None,
use_Pool=0, tol=1e-5, tr_cd=cd_solve.tr_cg, cd_roundoff=25, d0=None):
#Solves for (F B D xi_unl D^t B^t F^t + N)^-1 dot input alms
assert alms.shape == self._datalms_shape(typ), (alms.shape, self._datalms_shape(typ))
if ulm0 is not None:
assert ulm0.shape == self._datalms_shape(typ), (ulm0.shape, self._datalms_shape(typ))
class dot_op():
def __init__(self):
pass
def __call__(self, alms1, alms2, **kwargs):
return np.sum(alms1.real * alms2.real + alms1.imag * alms2.imag)
cond_func = getattr(self, 'apply_cond%s' % cond)
# ! fwd_op and pre_ops must not change their arguments !
def fwd_op(_alms):
return self.apply(typ, _alms, use_Pool=use_Pool)
def pre_ops(_alms):
return cond_func(typ, _alms, use_Pool=use_Pool)
dot_op = dot_op()
if d0 is None:
d0 = dot_op(alms, alms)
if ulm0 is None: ulm0 = np.zeros_like(alms)
criterion = cd_monitors.monitor_basic(dot_op, iter_max=maxiter, eps_min=tol, d0=d0)
print("++ ffs_cov cd_solve: starting, cond %s " % cond)
it = cd_solve.cd_solve(ulm0, alms, fwd_op, [pre_ops], dot_op, criterion, tr_cd,
roundoff=cd_roundoff)
return ulm0, it
def _apply_cond3(self, typ, alms, use_Pool=0):
return self.apply_conddiagcl(typ, alms, use_Pool=use_Pool)
def apply_cond0(self, typ, alms, use_Pool=0, use_cls_len=True):
return self.apply_conddiagcl(typ, alms, use_Pool=use_Pool, use_cls_len=use_cls_len)
def apply_cond0unl(self, typ, alms, **kwargs):
return self.apply_conddiagcl(typ, alms, use_cls_len=False)
def apply_cond0len(self, typ, alms, **kwargs):
return self.apply_conddiagcl(typ, alms, use_cls_len=True)
def apply_conddiagcl(self, typ, alms, use_cls_len=True, use_Pool=0):
assert alms.shape == self._datalms_shape(typ), (alms.shape, self._datalms_shape(typ))
ret = np.zeros_like(alms)
for i in range(len(typ)):
for j in range(len(typ)):
ret[j] += self._get_pmati(typ, j, i, use_cls_len=use_cls_len) * alms[i]
return ret
def apply_condpseudiagcl(self, typ, alms, use_Pool=0):
return self.apply_conddiagcl(typ, alms, use_Pool=use_Pool)
[docs] def get_qlms(self, typ, iblms, lib_qlm, use_cls_len=True, **kwargs):
r"""Unormalized quadratic estimates (potential and curl).
Note:
the output differs by a factor of two from the standard QE. This is because this was written initially
as gradient function of the CMB likelihood w.r.t. real and imaginary parts. So to use this as QE's,
the normalization is 1/2 the standard normalization the inverse response. The *lensit.ffs_qlms.qlms.py* module contains methods
of the QE's with standard normalizations which you may want to use instead.
Args:
typ: 'T', 'QU', 'TQU' for temperature-only, polarization-only or joint analysis
iblms: inverse-variance filtered CMB alm arrays
lib_qlm: *ffs_alm* instance describing the lensing alm arrays
use_cls_len: use lensed or unlensed cls in QE weights (numerator), defaults to lensed cls
"""
assert iblms.shape == self._skyalms_shape(typ), (iblms.shape, self._skyalms_shape(typ))
assert lib_qlm.lsides == self.lsides, (self.lsides, lib_qlm.lsides)
t = timer(_timed)
weights_cls = self.cls_len if use_cls_len else self.cls_unl
clms = np.zeros((len(typ), self.lib_skyalm.alm_size), dtype=complex)
for _i in range(len(typ)):
for _j in range(len(typ)):
clms[_i] += pmat.get_unlPmat_ij(typ, self.lib_skyalm, weights_cls, _i, _j) * iblms[_j]
t.checkpoint(" get_qlms::mult with %s Pmat" % ({True: 'len', False: 'unl'}[use_cls_len]))
_map = lambda alm: self.lib_skyalm.alm2map(alm)
_2qlm = lambda _m: lib_qlm.udgrade(self.lib_skyalm, self.lib_skyalm.map2alm(_m))
retdx = _2qlm(_map(iblms[0]) * _map(clms[0] * self.lib_skyalm.get_ikx()))
retdy = _2qlm(_map(iblms[0]) * _map(clms[0] * self.lib_skyalm.get_iky()))
for _i in range(1, len(typ)):
retdx += _2qlm(_map(iblms[_i]) * _map(clms[_i] * self.lib_skyalm.get_ikx()))
retdy += _2qlm(_map(iblms[_i]) * _map(clms[_i] * self.lib_skyalm.get_iky()))
t.checkpoint(" get_qlms::cartesian gradients")
dphi = -retdx * lib_qlm.get_ikx() - retdy * lib_qlm.get_iky()
dOm = retdx * lib_qlm.get_iky() - retdy * lib_qlm.get_ikx()
t.checkpoint(" get_qlms::rotation to phi Omega")
return np.array([2 * dphi, 2 * dOm]) # Factor 2 since gradient w.r.t. real and imag. parts.
def _get_qlm_resprlm(self, typ, lib_qlm,
use_cls_len=True, cls_obs=None, cls_obs2=None, cls_filt=None, cls_weights=None):
assert typ in typs, (typ, typs)
t = timer(_timed)
Fpp, FOO, FpO = self._get_qlm_curvature(typ, lib_qlm,
use_cls_len=use_cls_len, cls_obs=cls_obs, cls_filt=cls_filt, cls_weights=cls_weights, cls_obs2=cls_obs2)
t.checkpoint(" get_qlm_resplm:: get curvature matrices")
del FpO
Rpp = np.zeros_like(Fpp)
ROO = np.zeros_like(FOO)
Rpp[np.where(Fpp > 0.)] = 1. / Fpp[np.where(Fpp > 0.)]
ROO[np.where(FOO > 0.)] = 1. / FOO[np.where(FOO > 0.)]
t.checkpoint(" get_qlm_resplm:: inversion")
return Rpp, ROO
[docs] def get_N0cls(self, typ, lib_qlm,
use_cls_len=True, cls_obs=None, cls_obs2=None, cls_weights=None, cls_filt=None):
r"""Lensing Gaussian bias :math:`N^{(0)}_L`
Args:
typ: 'T', 'QU', 'TQU' for temperature-only, polarization-only or joint analysis
lib_qlm: *ffs_alm* instance decribing the lensing alm arrays
cls_obs(dict, optional): empirical observed data spectra, for a realization-dependent estimate
cls_weights(dict, optional): CMB cls entering the QE weights (numerator)
cls_filt(dict, optional): CMB cls entering the inverse-variance filtering step (denominator of QE weights)
use_cls_len(optional): Uses lensed or unlensed CMB cls (when not superseeded byt the other keywords)
Returns:
Gradient and curl lensing mode noise :math:`N^{(0)}_L`
"""
assert typ in typs, (typ, typs)
if cls_obs is None and cls_obs2 is None and cls_weights is None and cls_filt is None: # default behavior is cached
fname = self.lib_dir + '/%s_N0cls_%sCls.dat' % (typ, {True: 'len', False: 'unl'}[use_cls_len])
if not os.path.exists(fname):
if self.pbsrank == 0:
lib_full = ell_mat.ffs_alm_pyFFTW(self.lib_datalm.ell_mat, filt_func=lambda ell: ell > 0)
Rpp, ROO = self._get_qlm_resprlm(typ, lib_full, use_cls_len=use_cls_len)
header = datetime.datetime.now().strftime("%I:%M%p on %B %d, %Y") + '\n' + __file__
np.savetxt(fname, np.array((2 * lib_full.alm2cl(np.sqrt(Rpp)), 2 * lib_full.alm2cl(np.sqrt(ROO)))).transpose(),fmt=['%.8e'] * 2, header=header)
self.barrier()
cl = np.loadtxt(fname).transpose()[:, 0:lib_qlm.ellmax + 1]
cl[0] *= lib_qlm.filt_func(np.arange(len(cl[0])))
cl[1] *= lib_qlm.filt_func(np.arange(len(cl[1])))
return cl
else:
Rpp, ROO = self._get_qlm_resprlm(typ, lib_qlm,
use_cls_len=use_cls_len, cls_obs=cls_obs, cls_weights=cls_weights, cls_filt=cls_filt, cls_obs2=cls_obs2)
return 2 * lib_qlm.alm2cl(np.sqrt(Rpp)), 2 * lib_qlm.alm2cl(np.sqrt(ROO))
[docs] def iterateN0cls(self, typ, lib_qlm, itmax, return_delcls=False, _it=0, _cpp=None):
"""Iterative flat-sky :math:`N^{(0)}_L` calculation to estimate the noise levels of the iterative estimator.
This uses perturbative approach in Wiener-filtered displacement, consistent with box shape and mode structure.
Args:
typ: 'T', 'QU', 'TQU' for temperature-only, polarization-only or joint analysis
lib_qlm: *ffs_alm* instance describing the lensing alm arrays
itmax: Number of iterations to performs
return_delcls: optionally return partially delensed cmb cls as well
"""
N0 = self.get_N0cls(typ, lib_qlm, use_cls_len=True)[0][:lib_qlm.ellmax + 1]
if _it == itmax: return N0 if not return_delcls else (N0, self.cls_len)
if _cpp is None:
_cpp = np.copy(self.cls_unl['pp'])
cpp = np.zeros(lib_qlm.ellmax + 1)
cpp[:min(len(cpp), len(_cpp))] = (_cpp[:min(len(cpp), len(_cpp))])
clWF = cpp * cl_inverse(cpp + N0[:lib_qlm.ellmax + 1])
Bl = self.get_delensinguncorrbias(lib_qlm, cpp * (1. - clWF), wNoise=False, use_cls_len=False) # TEB matrix output
cls_delen = {}
for key in self.cls_len.keys():
cls_delen[key] = self.cls_unl[key].copy()
_Bl = Bl[{'t': 0, 'e': 1, 'b': 2}[key[0]], {'t': 0, 'e': 1, 'b': 2}[key[1]]]
cls_delen[key][:min(len(cls_delen[key]),len(_Bl))] -= _Bl[:min(len(cls_delen[key]),len(_Bl))]
cls_unl = {}
for key in self.cls_unl.keys():
cls_unl[key] = self.cls_unl[key].copy()
# cls_unl['pp'][0:min(len(cpp), len(cls_unl['pp']))] = (cpp * (1. - clWF))[0:min(len(cpp), len(cls_unl['pp']))]
new_libdir = os.path.join(self.lib_dir, '%s_N0iter' % typ, 'N0iter%04d' % (_it + 1)) if _it == 0 else \
self.lib_dir.replace('N0iter%04d' % _it, 'N0iter%04d' % (_it + 1))
try:
new_cov = ffs_diagcov_alm(new_libdir, self.lib_datalm, cls_unl, cls_delen, self.cl_transf, self.cls_noise,
lib_skyalm=self.lib_skyalm)
except:
print("hash check failed, removing " + new_libdir)
shutil.rmtree(new_libdir)
new_cov = ffs_diagcov_alm(new_libdir, self.lib_datalm, cls_unl, cls_delen, self.cl_transf, self.cls_noise,
lib_skyalm=self.lib_skyalm)
return new_cov.iterateN0cls(typ, lib_qlm, itmax, _it=_it + 1, return_delcls=return_delcls, _cpp=_cpp)
def get_N0Pk_minimal(self, typ, lib_qlm, use_cls_len=True, cls_obs=None):
# Same as N0cls but binning only in exactly identical frequencies.
assert typ in typs, (typ, typs)
Rpp, ROO = self._get_qlm_resprlm(typ, lib_qlm, use_cls_len=use_cls_len, cls_obs=cls_obs)
return lib_qlm.alm2Pk_minimal(np.sqrt(2 * Rpp)), lib_qlm.alm2Pk_minimal(np.sqrt(2 * ROO))
[docs] def get_response(self, typ, lib_qlm, cls_weights=None, cls_filt=None, cls_cmb=None, use_cls_len=True):
r"""Lensing quadratic estimator gradient and curl response functions.
Args:
cls_filt: CMB spectra used in the filtering procedure ( i.e. those entering :math:`\rm{Cov}^{-1}`).
Defaults to *self.cls_len* if set else *self.cls_unl*
cls_weights(dict): CMB spectra used in the QE weights (those entering the numerator in the usual Okamoto & Hu formulae)
Defaults to *self.cls_len* if set else *self.cls_unl*
cls_cmb(dict): CMB spectra of the sky entering the response contractions (in principle lensed cls or grad-lensed cls)
"""
assert typ in typs, (typ, typs)
t = timer(_timed, prefix=__name__, suffix=' curvpOlm')
_cls_weights = cls_weights or (self.cls_len if use_cls_len else self.cls_unl)
_cls_filt = cls_filt or (self.cls_len if use_cls_len else self.cls_unl)
_cls_cmb = cls_cmb or (self.cls_len if use_cls_len else self.cls_unl)
if not cls_weights is None: t.checkpoint('Using custom Cls weights')
if not cls_filt is None: t.checkpoint('Using custom Cls filt')
if not cls_cmb is None: t.checkpoint('Using custom Cls cmb')
Pinv_obs1 = pmat.get_Pmat(typ, self.lib_datalm, _cls_filt,
cls_noise=self.cls_noise, cl_transf=self.cl_transf, inverse=True)
Pinv_obs2 = Pinv_obs1
# xi K
def get_xiK(i, j, id, cls):
assert id in [1, 2]
_Pinv_obs = Pinv_obs1 if id == 1 else Pinv_obs2
ret = pmat.get_unlPmat_ij(typ, self.lib_datalm, cls, i, 0) * _Pinv_obs[:, 0, j]
for _k in range(1, len(typ)):
ret += pmat.get_unlPmat_ij(typ, self.lib_datalm, cls, i, _k) * _Pinv_obs[:, _k, j]
return self.lib_datalm.almxfl(ret, self.cl_transf ** 2)
# xi^w K xi^cmb
def get_xiwKxicmb(i, j, id):
assert id in [1, 2]
ret = get_xiK(i, 0, id, _cls_weights) * pmat.get_unlPmat_ij(typ, self.lib_datalm, _cls_cmb, 0, j)
for _k in range(1, len(typ)):
ret += get_xiK(i, _k, id, _cls_weights) * pmat.get_unlPmat_ij(typ, self.lib_datalm, _cls_cmb, _k, j)
return ret
ikx = self.lib_datalm.get_ikx
iky = self.lib_datalm.get_iky
t.checkpoint(" inverse %s Pmats" % ({True: 'len', False: 'unl'}[use_cls_len]))
F = np.zeros(self.lib_datalm.ell_mat.shape, dtype=float)
# Calculation of (xi^cmb,b K) (xi^w,a K)
for i in range(len(typ)):
for j in range(0, len(typ)):
# ! Matrix not symmetric for TQU or non identical noises. But xx or yy element ok.#(2 - (i == j)) *
F += self.lib_datalm.alm2map(ikx() * get_xiK(i, j, 1, _cls_cmb)) \
* self.lib_datalm.alm2map(ikx() * get_xiK(j, i, 2, _cls_weights))
Fxx = lib_qlm.map2alm(F)
F *= 0
t.checkpoint(" Fxx , part 1")
for i in range(len(typ)):
for j in range(0, len(typ)):
# ! Matrix not symmetric for TQU or non identical noises. But xx or yy element ok.#(2 - (i == j)) *
F += self.lib_datalm.alm2map(iky() * get_xiK(i, j, 1, _cls_cmb)) \
* self.lib_datalm.alm2map(iky() * get_xiK(j, i, 2, _cls_weights))
Fyy = lib_qlm.map2alm(F)
F *= 0
t.checkpoint(" Fyy , part 1")
for i in range(len(typ)):
for j in range(len(typ)):
# ! BPBCovi Matrix not symmetric for TQU or non identical noises.
F += self.lib_datalm.alm2map(ikx() * get_xiK(i, j, 1, _cls_cmb)) \
* self.lib_datalm.alm2map(iky() * get_xiK(j, i, 2, _cls_weights))
Fxy = lib_qlm.map2alm(F)
F *= 0
t.checkpoint(" Fxy , part 1")
# Adding to that (K)(z) (xi^w,a K xi^cmb,b)(z)
tmap = lambda i, j: self.lib_datalm.alm2map(
self.lib_datalm.almxfl(Pinv_obs1[:, i, j], self.cl_transf ** 2))
for i in range(len(typ)):
for j in range(0, len(typ)):
F += tmap(i, j) * self.lib_datalm.alm2map(ikx() ** 2 * get_xiwKxicmb(i, j, 2))
Fxx += lib_qlm.map2alm(F)
F *= 0
t.checkpoint(" Fxx , part 2")
for i in range(len(typ)):
for j in range(0, len(typ)):
F += tmap(i, j) * self.lib_datalm.alm2map(iky() ** 2 * get_xiwKxicmb(i, j, 2))
Fyy += lib_qlm.map2alm(F)
F *= 0
t.checkpoint(" Fyy , part 2")
for i in range(len(typ)):
for j in range(0, len(typ)):
F += tmap(i, j) * self.lib_datalm.alm2map(iky() * ikx() * get_xiwKxicmb(i, j, 2))
Fxy += lib_qlm.map2alm(F)
t.checkpoint(" Fxy , part 2")
facunits = -1. / np.sqrt(np.prod(self.lsides))
return np.array([lib_qlm.bin_realpart_inell(r) for r in xylms_to_phiOmegalm(lib_qlm, Fxx.real * facunits, Fyy.real * facunits, Fxy.real * facunits)])
def _get_qlm_curvature(self, typ, lib_qlm,
cls_weights=None, cls_filt=None, cls_obs=None, cls_obs2=None, use_cls_len=True):
"""Fisher matrix for the displacement components phi and Omega (gradient and curl potentials)
"""
assert typ in typs, (typ, typs)
t = timer(_timed, prefix=__name__, suffix=' curvpOlm')
_cls_weights = cls_weights or (self.cls_len if use_cls_len else self.cls_unl)
_cls_filt = cls_filt or (self.cls_len if use_cls_len else self.cls_unl)
_cls_obs = cls_obs or (self.cls_len if use_cls_len else self.cls_unl)
_cls_obs2 = cls_obs2 or (self.cls_len if use_cls_len else self.cls_unl)
if not cls_weights is None: t.checkpoint('Using custom Cls weights')
if not cls_filt is None: t.checkpoint('Using custom Cls filt')
if not cls_obs is None: t.checkpoint('Using custom Cls obs')
if not cls_obs2 is None: t.checkpoint('Using custom Cls obs2')
# Build the inverse covariance matrix part
# For a standard N0 computation, this will just be cov^{-1}.
# For RDN0 compuation, it will be cov^{-1} cov_obs cov^{-1}
# In the case of RDN0 computation, only one of the two inverse covariance matrix is replaced.
if cls_obs is None:
assert cls_obs2 is None
_lib_qlm = ell_mat.ffs_alm_pyFFTW(self.lib_datalm.ell_mat,
filt_func=lambda ell: (ell > 0) & (
ell <= 2 * self.lib_datalm.ellmax))
if cls_obs is None and cls_weights is None and cls_filt is None and cls_obs2 is None: # default is cached
fname = os.path.join(self.lib_dir, '%s_resplm_%sCls.npy' % (typ, {True: 'len', False: 'unl'}[use_cls_len]))
if os.path.exists(fname):
return np.array([lib_qlm.udgrade(_lib_qlm, _a) for _a in np.load(fname)])
Pinv_obs1 = pmat.get_Pmat(typ, self.lib_datalm, _cls_filt,
cls_noise=self.cls_noise, cl_transf=self.cl_transf, inverse=True)
Pinv_obs2 = Pinv_obs1
else:
# FIXME : this will fail if lib_qlm does not have the right shape
_lib_qlm = lib_qlm
Covi = pmat.get_Pmat(typ, self.lib_datalm, _cls_filt, cls_noise=self.cls_noise, cl_transf=self.cl_transf,
inverse=True)
Pinv_obs1 = np.array([np.dot(a, b) for a, b in
zip(pmat.get_Pmat(typ, self.lib_datalm, _cls_obs, cls_noise=None, cl_transf=None),
Covi)])
Pinv_obs1 = np.array([np.dot(a, b) for a, b in zip(Covi, Pinv_obs1)])
if cls_obs2 is None:
Pinv_obs2 = Pinv_obs1
else:
Pinv_obs2 = np.array([np.dot(a, b) for a, b in
zip(pmat.get_Pmat(typ, self.lib_datalm, _cls_obs2, cls_noise=None, cl_transf=None),
Covi)])
Pinv_obs2 = np.array([np.dot(a, b) for a, b in zip(Covi, Pinv_obs2)])
del Covi
# B xi B^t Cov^{-1} (or Cov^-1 Cov_obs Cov^-1 for semi-analytical N0)
def get_BPBCovi(i, j, id):
assert id in [1, 2]
_Pinv_obs = Pinv_obs1 if id == 1 else Pinv_obs2
ret = pmat.get_unlPmat_ij(typ, self.lib_datalm, _cls_weights, i, 0) * _Pinv_obs[:, 0, j]
for _k in range(1, len(typ)):
ret += pmat.get_unlPmat_ij(typ, self.lib_datalm, _cls_weights, i, _k) * _Pinv_obs[:, _k, j]
return self.lib_datalm.almxfl(ret, self.cl_transf ** 2)
def get_BPBCoviP(i, j, id):
assert id in [1, 2]
ret = get_BPBCovi(i, 0, id) * pmat.get_unlPmat_ij(typ, self.lib_datalm, _cls_weights, 0, j)
for _k in range(1, len(typ)):
ret += get_BPBCovi(i, _k, id) * pmat.get_unlPmat_ij(typ, self.lib_datalm, _cls_weights, _k, j)
return ret
def get_BPBCovi_rot(i, j, id):
assert id in [1, 2]
_Pinv_obs = Pinv_obs1 if id == 1 else Pinv_obs2
ret = pmat.get_unlrotPmat_ij(typ, self.lib_datalm, _cls_weights, i, 0) * _Pinv_obs[:, 0, j]
for _k in range(1, len(typ)):
ret += pmat.get_unlrotPmat_ij(typ, self.lib_datalm, _cls_weights, i, _k) * _Pinv_obs[:, _k, j]
return self.lib_datalm.almxfl(ret, self.cl_transf ** 2)
ikx = self.lib_datalm.get_ikx
iky = self.lib_datalm.get_iky
t.checkpoint(" inverse %s Pmats" % ({True: 'len', False: 'unl'}[use_cls_len]))
F = np.zeros(self.lib_datalm.ell_mat.shape, dtype=float)
# 2.1 GB in memory for full sky 16384 ** 2 points. Note however that we can without any loss of accuracy
# calculate this using a twice as sparse grid, for reasonable input parameters.
# Calculation of (db xi B Cov^{-1} B^t )_{ab}(z) (daxi B Cov^{-1} B^t)^{ba}(z)
for i in range(len(typ)):
for j in range(i, len(typ)):
# ! BPBCovi Matrix not symmetric for TQU or non identical noises. But xx or yy element ok.
F += (2 - (i == j)) * self.lib_datalm.alm2map(ikx() * get_BPBCovi(i, j, 1)) \
* self.lib_datalm.alm2map(ikx() * get_BPBCovi(j, i, 2))
Fxx = _lib_qlm.map2alm(F)
F *= 0
t.checkpoint(" Fxx , part 1")
for i in range(len(typ)):
for j in range(i, len(typ)):
# ! BPBCovi Matrix not symmetric for TQU or non identical noises. But xx or yy element ok.
F += (2 - (i == j)) * self.lib_datalm.alm2map(iky() * get_BPBCovi(i, j, 1)) \
* self.lib_datalm.alm2map(iky() * get_BPBCovi(j, i, 2))
Fyy = _lib_qlm.map2alm(F)
F *= 0
t.checkpoint(" Fyy , part 1")
for i in range(len(typ)):
for j in range(len(typ)):
# ! BPBCovi Matrix not symmetric for TQU or non identical noises.
F += self.lib_datalm.alm2map(ikx() * get_BPBCovi(i, j, 1)) \
* self.lib_datalm.alm2map(iky() * get_BPBCovi(j, i, 2))
Fxy = _lib_qlm.map2alm(F)
F *= 0
t.checkpoint(" Fxy , part 1")
# Adding to that (B Cov^-1 B^t)(z) (daxi B Cov^-1 B^t dbxi)(z)
# Construct Pmat:
# Cl * bl ** 2 * cov^{-1} cov_obs cov^{-1} * Cl if semianalytic N0
# Cl * bl ** 2 * cov^{-1} * Cl if N0
# Now both spectral matrices are symmetric.
tmap = lambda i, j: self.lib_datalm.alm2map(
self.lib_datalm.almxfl(Pinv_obs1[:, i, j], (2 - (j == i)) * self.cl_transf ** 2))
for i in range(len(typ)):
for j in range(i, len(typ)):
F += tmap(i, j) * self.lib_datalm.alm2map(ikx() ** 2 * get_BPBCoviP(i, j, 2))
Fxx += _lib_qlm.map2alm(F)
F *= 0
t.checkpoint(" Fxx , part 2")
for i in range(len(typ)):
for j in range(i, len(typ)):
F += tmap(i, j) * self.lib_datalm.alm2map(iky() ** 2 * get_BPBCoviP(i, j, 2))
Fyy += _lib_qlm.map2alm(F)
F *= 0
t.checkpoint(" Fyy , part 2")
for i in range(len(typ)):
for j in range(i, len(typ)):
F += tmap(i, j) * self.lib_datalm.alm2map(iky() * ikx() * get_BPBCoviP(i, j, 2))
Fxy += _lib_qlm.map2alm(F)
t.checkpoint(" Fxy , part 2")
facunits = -2. / np.sqrt(np.prod(self.lsides))
ret = xylms_to_phiOmegalm(_lib_qlm, Fxx.real * facunits, Fyy.real * facunits, Fxy.real * facunits)
if cls_obs is None and cls_weights is None and cls_filt is None:
fname = os.path.join(self.lib_dir, '%s_resplm_%sCls.npy' % (typ, {True: 'len', False: 'unl'}[use_cls_len]))
np.save(fname, ret)
return np.array([lib_qlm.udgrade(_lib_qlm, _a) for _a in np.load(fname, mmap_mode='r')])
return np.array([lib_qlm.udgrade(_lib_qlm, _a) for _a in ret])
[docs] def get_mfrespcls(self, typ, lib_qlm, use_cls_len=True):
r"""Linear mean-field response :math:`R_L` of the unnormalized quadratic estimators (gradient and curl components)
This deflection-induced mean-field linear response is
:math:`\frac{\delta g^{\rm MF}(x)}{\delta \alpha(y)} = \frac 12 \frac{\delta \ln \rm{Cov}}{\delta \alpha(x) \delta \alpha(y)} = R(x- y)`
See https://arxiv.org/abs/1704.08230
The expected normalised MF spectrum is then :math:`\frac{R_L^2}{\mathcal R_L^2} C_L^{\hat g\hat g}`
Args:
typ: 'T', 'QU', 'TQU' for temperature-only, polarization-only or joint analysis
lib_qlm: *ffs_alm* instance describing the lensing alm arrays
use_cls_len: use lensed CMB cls if true, unlensed if not (default)
"""
return [lib_qlm.bin_realpart_inell(_r) for _r in self.get_mfresplms(typ, lib_qlm, use_cls_len=use_cls_len)]
def get_mfresplms(self, typ, lib_qlm, use_cls_len=False, recache=False):
assert self.lib_skyalm.shape == self.lib_datalm.shape
cls_cmb = self.cls_len if use_cls_len else self.cls_unl
_lib_qlm = ell_mat.ffs_alm_pyFFTW(self.lib_skyalm.ell_mat,
filt_func=lambda ell: (ell <= 2 * self.lib_skyalm.ellmax))
fname = os.path.join(self.lib_dir, '%s_MFresplm_%s.npy' % (typ, {True: 'len', False: 'unl'}[use_cls_len]))
if not os.path.exists(fname) or recache:
def get_K(i, j):
# B Covi B
return self.lib_datalm.almxfl(self._get_pmati(typ, i, j, use_cls_len=use_cls_len),
self.cl_transf ** 2)
def get_xiK(i, j):
# xi K
ret = pmat.get_unlPmat_ij(typ, self.lib_skyalm, cls_cmb, i, 0) * self._upg(get_K(0, j))
for k in range(1, len(typ)):
ret += pmat.get_unlPmat_ij(typ, self.lib_skyalm, cls_cmb, i, k) * self._upg(get_K(k, j))
return ret
_2map = lambda alm: self.lib_skyalm.alm2map(alm)
_dat2map = lambda alm: self.lib_datalm.alm2map(alm)
_2qlm = lambda _map: _lib_qlm.map2alm(_map)
ik = lambda ax: self.lib_skyalm.get_ikx() if ax == 1 else self.lib_skyalm.get_iky()
Fxx = np.zeros(_lib_qlm.alm_size, dtype=complex)
Fyy = np.zeros(_lib_qlm.alm_size, dtype=complex)
Fxy = np.zeros(_lib_qlm.alm_size, dtype=complex)
for i in range(len(typ)): # (xia K)_ij(xib K)_ji
for j in range(len(typ)):
xiK1 = get_xiK(i, j)
xiK2 = get_xiK(j, i)
Fxx += _2qlm(_2map(xiK1 * ik(1)) * (_2map(xiK2 * ik(1))))
Fyy += _2qlm(_2map(xiK1 * ik(0)) * (_2map(xiK2 * ik(0))))
Fxy += _2qlm(_2map(xiK1 * ik(1)) * (_2map(xiK2 * ik(0))))
# adding to that (K) (xi K xi - xi) =
def get_xiKxi_xi(i, j):
ret = get_xiK(i, 0) * pmat.get_unlPmat_ij(typ, self.lib_skyalm, cls_cmb, 0, j)
for k in range(1, len(typ)):
ret += get_xiK(i, k) * pmat.get_unlPmat_ij(typ, self.lib_skyalm, cls_cmb, k, j)
ret -= pmat.get_unlPmat_ij(typ, self.lib_skyalm, cls_cmb, i, j)
return ret
for i in range(len(typ)): # (K) (xi K xi - xi)
for j in range(len(typ)):
A = _dat2map(get_K(i, j))
B = get_xiKxi_xi(j, i)
Fxx += _2qlm(A * _2map(B * ik(1) * ik(1)))
Fyy += _2qlm(A * _2map(B * ik(0) * ik(0)))
Fxy += _2qlm(A * _2map(B * ik(1) * ik(0)))
assert _lib_qlm.reduced_ellmat()[0] == 0
Fxx -= Fxx[0]
Fyy -= Fyy[0]
Fxy -= Fxy[0]
facunits = 1. / np.sqrt(np.prod(self.lsides))
ret = xylms_to_phiOmegalm(_lib_qlm, Fxx.real * facunits, Fyy.real * facunits, Fxy.real * facunits)
np.save(fname, ret)
print("Cached " + fname)
return np.array([lib_qlm.udgrade(_lib_qlm, _a) for _a in np.load(fname)])
[docs] def get_dmfrespcls(self, typ, cmb_dcls, lib_qlm, use_cls_len=False):
r"""Variation of the linear mean-field response :math:`R_L` of the unnormalized quadratic estimators
Variation of *get_mfrespcls* given input CMB spectra variation
This deflection-induced mean-field linear response is
:math:`\frac{\delta g^{\rm MF}(x)}{\delta \alpha(y)} = \frac 12 \frac{\delta \ln \rm{Cov}}{\delta \alpha(x) \delta \alpha(y)} = R(x- y)`
See https://arxiv.org/abs/1704.08230
Args:
typ: 'T', 'QU', 'TQU' for temperature-only, polarization-only or joint analysis
cmb_dcls(dict): CMB spectra variations
lib_qlm: *ffs_alm* instance describing the lensing alm arrays
use_cls_len: use lensed CMB cls if true, unlensed if not (default)
"""
return [lib_qlm.bin_realpart_inell(_r) for _r in
self.get_dmfresplms(typ, cmb_dcls, lib_qlm, use_cls_len=use_cls_len)]
def get_dmfresplms(self, typ, cmb_dcls, lib_qlm, use_cls_len=False, recache=False):
assert self.lib_skyalm.shape == self.lib_datalm.shape
cls_cmb = self.cls_len if use_cls_len else self.cls_unl
_lib_qlm = ell_mat.ffs_alm_pyFFTW(self.lib_skyalm.ell_mat, filt_func=lambda ell: (ell <= 2 * self.lib_skyalm.ellmax))
# FIXME dclhash !
print("!!!! dMFresplms::cmb_dcls hash is missing here !! ?")
fname = os.path.join(self.lib_dir, '%s_dMFresplm_%s.npy' % (typ, {True: 'len', False: 'unl'}[use_cls_len]))
if not os.path.exists(fname) or recache:
def mu(a, b, i, j):
ret = a(i, 0) * b(0, j)
for _k in range(1, len(typ)):
ret += a(i, _k) * b(_k, j)
return ret
def dxi(i, j):
return pmat.get_unlPmat_ij(typ, self.lib_skyalm, cmb_dcls, i, j)
def xi(i, j):
return pmat.get_unlPmat_ij(typ, self.lib_skyalm, cls_cmb, i, j)
def K(i, j):
return self._upg(
self.lib_datalm.almxfl(self._get_pmati(typ, i, j, use_cls_len=use_cls_len), self.cl_transf ** 2))
xiK = lambda i, j: mu(xi, K, i, j)
Kxi = lambda i, j: mu(K, xi, i, j)
dxiK = lambda i, j: mu(dxi, K, i, j)
dK = lambda i, j: (-mu(K, dxiK, i, j))
Kdxi = lambda i, j: mu(K, dxi, i, j)
xidK = lambda i, j: mu(xi, dK, i, j)
dKxi = lambda i, j: mu(dK, xi, i, j)
xiKdxi = lambda i, j: mu(xi, Kdxi, i, j)
dxiKxi = lambda i, j: mu(dxi, Kxi, i, j)
xiKxi_xi = lambda i, j: (mu(xiK, xi, i, j) - xi(i, j))
xidKxi = lambda i, j: mu(xi, dKxi, i, j)
d_xiK = lambda i, j: (dxiK(i, j) + xidK(i, j))
d_xiKxi_xi = lambda i, j: (xidKxi(i, j) + dxiKxi(i, j) + xiKdxi(i, j) - dxi(i, j))
_2map = lambda alm: self.lib_skyalm.alm2map(alm)
_2qlm = lambda _map: _lib_qlm.map2alm(_map)
ik = lambda ax: self.lib_skyalm.get_ikx() if ax == 1 else self.lib_skyalm.get_iky()
Fxx = np.zeros(_lib_qlm.alm_size, dtype=complex)
Fyy = np.zeros(_lib_qlm.alm_size, dtype=complex)
Fxy = np.zeros(_lib_qlm.alm_size, dtype=complex)
for i in range(len(typ)): # d[(xia K)_ij(xib K)_ji]
for j in range(len(typ)):
xiK1 = d_xiK(i, j)
xiK2 = xiK(j, i)
Fxx += _2qlm(_2map(xiK1 * ik(1)) * (_2map(xiK2 * ik(1))))
Fyy += _2qlm(_2map(xiK1 * ik(0)) * (_2map(xiK2 * ik(0))))
Fxy += _2qlm(_2map(xiK1 * ik(1)) * (_2map(xiK2 * ik(0))))
xiK1 = xiK(i, j)
xiK2 = d_xiK(j, i)
Fxx += _2qlm(_2map(xiK1 * ik(1)) * (_2map(xiK2 * ik(1))))
Fyy += _2qlm(_2map(xiK1 * ik(0)) * (_2map(xiK2 * ik(0))))
Fxy += _2qlm(_2map(xiK1 * ik(1)) * (_2map(xiK2 * ik(0))))
# adding to that (K) (xi K xi - xi) =
for i in range(len(typ)): # d [(K) (xi K xi - xi)]
for j in range(len(typ)):
A = _2map(dK(i, j))
B = xiKxi_xi(j, i)
Fxx += _2qlm(A * _2map(B * ik(1) * ik(1)))
Fyy += _2qlm(A * _2map(B * ik(0) * ik(0)))
Fxy += _2qlm(A * _2map(B * ik(1) * ik(0)))
A = _2map(K(i, j))
B = d_xiKxi_xi(j, i)
Fxx += _2qlm(A * _2map(B * ik(1) * ik(1)))
Fyy += _2qlm(A * _2map(B * ik(0) * ik(0)))
Fxy += _2qlm(A * _2map(B * ik(1) * ik(0)))
assert _lib_qlm.reduced_ellmat()[0] == 0
Fxx -= Fxx[0]
Fyy -= Fyy[0]
Fxy -= Fxy[0]
facunits = 1. / np.sqrt(np.prod(self.lsides))
ret = xylms_to_phiOmegalm(_lib_qlm, Fxx.real * facunits, Fyy.real * facunits, Fxy.real * facunits)
np.save(fname, ret)
print("Cached " + fname)
return np.array([lib_qlm.udgrade(_lib_qlm, _a) for _a in np.load(fname)])
[docs] def get_lndetcurv(self, typ, lib_qlm, get_A=None, use_cls_len=False):
r"""(Part of) covariance log-det second variation
Harmonic transform of :math:`\frac 1 2 \rm{Tr} A \frac{\delta^2 \rm{Cov}}{\delta \alpha(x) \delta \alpha(y)}` for vanishing deflection
if *get_A* argument is not set, output becomes :math:`\frac 1 2 \rm{Tr} \rm{Cov}^{-1} \delta^2 \rm{Cov}`
Args:
typ: 'T', 'QU', 'TQU' for temperature-only, polarization-only or joint analysis
lib_qlm: *ffs_alm* instance describing the lensing alm arrays
get_A(optional, callable): 1st trace operator. Defaults to :math:`\rm{Cov}^{-1}`
use_cls_len: use lensed or unlensed cls in QE weights (numerator), defaults to lensed cls
Returns:
gradient and curl Fourier components
"""
_lib_qlm = lib_qlm
cls_cmb = self.cls_len if use_cls_len else self.cls_unl
if get_A is None:
get_A = self._get_pmati
def get_K(i, j):
# B^t A B
return self._upg(self.lib_datalm.almxfl(get_A(typ, i, j, use_cls_len=use_cls_len), self.cl_transf ** 2))
_2qlm = lambda _map: _lib_qlm.map2alm(_map).real
_2map = lambda alm: self.lib_skyalm.alm2map(alm)
ik = lambda ax: self.lib_skyalm.get_ikx() if ax == 1 else self.lib_skyalm.get_iky()
Fxx = np.zeros(_lib_qlm.alm_size, dtype=float)
Fyy = np.zeros(_lib_qlm.alm_size, dtype=float)
Fxy = np.zeros(_lib_qlm.alm_size, dtype=float)
for i in range(len(typ)): # (K) (xi_ab)
for j in range(i, len(typ)): # This is i-j symmetric
fac = 2 - (i == j)
K = _2map(get_K(i, j))
xi = pmat.get_unlPmat_ij(typ, self.lib_skyalm, cls_cmb, j, i)
Fxx -= fac * _2qlm(K * _2map(xi * ik(1) * ik(1)))
Fyy -= fac * _2qlm(K * _2map(xi * ik(0) * ik(0)))
Fxy -= fac * _2qlm(K * _2map(xi * ik(1) * ik(0)))
assert _lib_qlm.reduced_ellmat()[0] == 0
Fxx -= Fxx[0]
Fyy -= Fyy[0]
Fxy -= Fxy[0]
facunits = 1. / np.sqrt(np.prod(self.lsides))
return xylms_to_phiOmegalm(_lib_qlm, Fxx * facunits, Fyy * facunits, Fxy * facunits)
[docs] def get_dlndetcurv(self, typ, cmb_dcls, lib_qlm, K=None, dK=None, use_cls_len=False):
r"""Variation of *get_lndetcurv* with respect to CMB spectra variation
*get_lndetcurv* is the harmonic transform of :math:`\frac 1 2 \rm{Tr} A \frac{\delta^2 \rm{Cov}}{\delta \alpha(x) \delta \alpha(y)}` for vanishing deflection
Args:
typ: 'T', 'QU', 'TQU' for temperature-only, polarization-only or joint analysis
cmb_dcls(dict): CMB spectra variation
lib_qlm: *ffs_alm* instance describing the lensing alm arrays
use_cls_len: use lensed or unlensed cls in QE weights (numerator), defaults to lensed cls
Returns:
gradient and curl Fourier components
"""
#Finite differences test ok.
_lib_qlm = lib_qlm
if K is None: assert dK is None
if K is not None: assert dK is not None
cls_cmb = self.cls_len if use_cls_len else self.cls_unl
def mu(a, b, i, j):
ret = a(i, 0) * b(0, j)
for _k in range(1, len(typ)):
ret += a(i, _k) * b(_k, j)
return ret
def dxi(i, j):
return pmat.get_unlPmat_ij(typ, self.lib_skyalm, cmb_dcls, i, j)
def xi(i, j):
return pmat.get_unlPmat_ij(typ, self.lib_skyalm, cls_cmb, i, j)
if K is None:
K = lambda i, j: (self._upg(
self.lib_datalm.almxfl(self._get_pmati(typ, i, j, use_cls_len=use_cls_len), self.cl_transf ** 2)))
if dK is None:
dxiK = lambda i, j: mu(dxi, K, i, j)
dK = lambda i, j: (-mu(K, dxiK, i, j))
_2qlm = lambda _map: _lib_qlm.map2alm(_map).real
_2map = lambda alm: self.lib_skyalm.alm2map(alm)
ik = lambda ax: self.lib_skyalm.get_ikx() if ax == 1 else self.lib_skyalm.get_iky()
Fxx = np.zeros(_lib_qlm.alm_size, dtype=float)
Fyy = np.zeros(_lib_qlm.alm_size, dtype=float)
Fxy = np.zeros(_lib_qlm.alm_size, dtype=float)
for i in range(len(typ)): # -(dK) (xi_ab) - (K)(dxi_ab)
for j in range(len(typ)):
A = _2map(dK(i, j))
B = xi(j, i)
Fxx -= _2qlm(A * _2map(B * ik(1) * ik(1)))
Fyy -= _2qlm(A * _2map(B * ik(0) * ik(0)))
Fxy -= _2qlm(A * _2map(B * ik(1) * ik(0)))
A = _2map(K(i, j))
B = dxi(j, i)
Fxx -= _2qlm(A * _2map(B * ik(1) * ik(1)))
Fyy -= _2qlm(A * _2map(B * ik(0) * ik(0)))
Fxy -= _2qlm(A * _2map(B * ik(1) * ik(0)))
assert _lib_qlm.reduced_ellmat()[0] == 0
Fxx -= Fxx[0]
Fyy -= Fyy[0]
Fxy -= Fxy[0]
facunits = 1. / np.sqrt(np.prod(self.lsides))
return xylms_to_phiOmegalm(_lib_qlm, Fxx, Fyy, Fxy) * facunits
[docs] def get_fishertrace(self, typ, lib_qlm, get_A1=None, get_A2=None, use_cls_len=True):
r"""Fisher-matrix like trace
:math:`\frac 1 2 \textrm{Tr}\: A_1 \:\frac{\delta\rm{Cov}}{\delta\phi} \:A_2 \:\frac{\delta\rm{Cov}}{\delta\phi}`
if operators :math:`A_1` or :math:`A_2` not set, they reduce to :math:`\rm{Cov}^{-1}`, and the result is the Fisher info for vanishing deflection
Args:
typ: 'T', 'QU', 'TQU' for temperature-only, polarization-only or joint analysis
lib_qlm: *ffs_alm* instance describing the lensing alm arrays
get_A1(optional, callable): 1st trace operator
get_A2(optional, callable): 2nd trace operator
use_cls_len: use lensed or unlensed cls in QE weights (numerator), defaults to lensed cls
"""
t = timer(_timed, prefix=__name__, suffix=' curvpOlm')
_lib_qlm = lib_qlm
cls_cmb = self.cls_len if use_cls_len else self.cls_unl
if get_A1 is None:
get_A1 = self._get_pmati
if get_A2 is None:
get_A2 = self._get_pmati
def get_K1(i, j):
return self.lib_datalm.almxfl(get_A1(typ, i, j, use_cls_len=use_cls_len), self.cl_transf ** 2)
def get_K2(i, j):
return self.lib_datalm.almxfl(get_A2(typ, i, j, use_cls_len=use_cls_len), self.cl_transf ** 2)
def get_xiK(mat, i, j):
assert mat in [1, 2], mat
K = get_K1 if mat == 1 else get_K2
ret = pmat.get_unlPmat_ij(typ, self.lib_datalm, cls_cmb, i, 0) * K(0, j)
for _k in range(1, len(typ)):
ret += pmat.get_unlPmat_ij(typ, self.lib_datalm, cls_cmb, i, _k) * K(_k, j)
return ret
def get_xiKxi(mat, i, j):
assert mat in [1, 2], mat
ret = get_xiK(mat, i, 0) * pmat.get_unlPmat_ij(typ, self.lib_datalm, cls_cmb, 0, j)
for _k in range(1, len(typ)):
ret += get_xiK(mat, i, _k) * pmat.get_unlPmat_ij(typ, self.lib_datalm, cls_cmb, _k, j)
return ret
_2qlm = lambda _map: _lib_qlm.map2alm(_map).real
_2map = lambda alm: self.lib_datalm.alm2map(alm)
k = lambda ax: self.lib_datalm.get_ikx() if ax == 1 else self.lib_datalm.get_iky()
Fxx = np.zeros(_lib_qlm.alm_size, dtype=float)
Fyy = np.zeros(_lib_qlm.alm_size, dtype=float)
Fxy = np.zeros(_lib_qlm.alm_size, dtype=float)
Fyx = np.zeros(_lib_qlm.alm_size, dtype=float)
# -1/2 (xia B^t A1 B)(xib B^t A2 B) -1/2(xi_b B^t A1 B)(xi_a B^t A2 B)
# -1/2 (B^t A1 B)(xia B^t A2 B xib) -1/2(xia B^t A1 B xib)(B^t A2 B)
for i in range(len(typ)): # (K) (xi_ab)
for j in range(len(typ)):
_K1 = _2map(get_K1(i, j))
xiK2xi = get_xiKxi(2, j, i) # (B^t A1 B)(xia B^t A2 B xib)
Fxx += _2qlm(_K1 * _2map(xiK2xi * k(1) * k(1)))
Fyy += _2qlm(_K1 * _2map(xiK2xi * k(0) * k(0)))
Fxy += _2qlm(_K1 * _2map(xiK2xi * k(1) * k(0)))
Fyx += _2qlm(_K1 * _2map(xiK2xi * k(0) * k(1)))
del _K1, xiK2xi
_K2 = _2map(get_K2(i, j))
xiK1xi = get_xiKxi(1, j, i) # (xia B^t A B xib)(B^t A2 B)
Fxx += _2qlm(_K2 * _2map(xiK1xi * k(1) * k(1)))
Fyy += _2qlm(_K2 * _2map(xiK1xi * k(0) * k(0)))
Fxy += _2qlm(_K2 * _2map(xiK1xi * k(1) * k(0)))
Fyx += _2qlm(_K2 * _2map(xiK1xi * k(0) * k(1)))
del _K2, xiK1xi
xiK1 = get_xiK(1, i, j)
xiK2 = get_xiK(2, j, i) # (xia B^t A1 B)(xib B^t A2 B)
Fxx += _2qlm(_2map(xiK1 * k(1)) * _2map(xiK2 * k(1)))
Fyy += _2qlm(_2map(xiK1 * k(0)) * _2map(xiK2 * k(0)))
Fxy += _2qlm(_2map(xiK1 * k(1)) * _2map(xiK2 * k(0)))
Fyx += _2qlm(_2map(xiK1 * k(0)) * _2map(xiK2 * k(1)))
# K1xi = get_xiK(1, i, j)
# K2xi = get_xiK(2, j, i)#(xi_a B^t A2 B)(xi_b B^t A1 B)
Fxx += _2qlm(_2map(xiK2 * k(1)) * _2map(xiK1 * k(1)))
Fyy += _2qlm(_2map(xiK2 * k(0)) * _2map(xiK1 * k(0)))
Fxy += _2qlm(_2map(xiK2 * k(1)) * _2map(xiK1 * k(0)))
Fyx += _2qlm(_2map(xiK2 * k(0)) * _2map(xiK1 * k(1)))
# del K1xi,K2xi
t.checkpoint('%s %s done' % (i, j))
facunits = -0.5 * (1. / np.sqrt(np.prod(self.lsides))) # N0-like norm
return xylms_to_phiOmegalm(_lib_qlm, Fxx * facunits, Fyy * facunits, Fxy * facunits, Fyx=Fyx * facunits)
[docs] def get_dfishertrace(self, typ, cmb_dcls, lib_qlm, K1=None, K2=None, dK1=None, dK2=None,
use_cls_len=True, recache=False):
r"""Variation of Fisher-trace *get_fishertrace* with respect to input CMB variation
Args:
typ: 'T', 'QU', 'TQU' for temperature-only, polarization-only or joint analysis
*cmb_dcls* (dict): CMB spectra variation
lib_qlm: *ffs_alm* instance describing the lensing alm arrays
use_cls_len: use lensed or unlensed cls in QE weights (numerator), defaults to lensed cls
"""
if K1 is not None: assert dK1 is not None
if K2 is not None: assert dK2 is not None
t = timer(_timed, prefix=__name__, suffix=' curvpOlm')
_lib_qlm = lib_qlm
cls_cmb = self.cls_len if use_cls_len else self.cls_unl
def mu(a, b, i, j):
ret = a(i, 0) * b(0, j)
for _k in range(1, len(typ)):
ret += a(i, _k) * b(_k, j)
return ret
def dxi(i, j):
return pmat.get_unlPmat_ij(typ, self.lib_skyalm, cmb_dcls, i, j)
def xi(i, j):
return pmat.get_unlPmat_ij(typ, self.lib_skyalm, cls_cmb, i, j)
if K1 is None:
assert dK1 is None
K1 = lambda i, j: self._upg(
self.lib_datalm.almxfl(self._get_pmati(typ, i, j, use_cls_len=use_cls_len), self.cl_transf ** 2))
if K2 is None:
assert dK2 is None
K2 = lambda i, j: self._upg(
self.lib_datalm.almxfl(self._get_pmati(typ, i, j, use_cls_len=use_cls_len), self.cl_transf ** 2))
xiK1 = lambda i, j: mu(xi, K1, i, j)
xiK2 = lambda i, j: mu(xi, K2, i, j)
dxiK1 = lambda i, j: mu(dxi, K1, i, j)
dxiK2 = lambda i, j: mu(dxi, K2, i, j)
if dK1 is None: dK1 = lambda i, j: (-mu(K1, dxiK1, i, j))
if dK2 is None: dK2 = lambda i, j: (-mu(K2, dxiK2, i, j))
xiK1xi = lambda i, j: mu(xiK1, xi, i, j)
xiK2xi = lambda i, j: mu(xiK2, xi, i, j)
xidK1 = lambda i, j: mu(xi, dK1, i, j)
xidK2 = lambda i, j: mu(xi, dK2, i, j)
d_xiK1 = lambda i, j: (dxiK1(i, j) + xidK1(i, j))
d_xiK2 = lambda i, j: (dxiK2(i, j) + xidK2(i, j))
d_xiK1xi = lambda i, j: (mu(d_xiK1, xi, i, j) + mu(xiK1, dxi, i, j))
d_xiK2xi = lambda i, j: (mu(d_xiK2, xi, i, j) + mu(xiK2, dxi, i, j))
_2qlm = lambda _map: _lib_qlm.map2alm(_map).real
_2map = lambda alm: self.lib_skyalm.alm2map(alm)
ik = lambda ax: self.lib_skyalm.get_ikx() if ax == 1 else self.lib_skyalm.get_iky()
Fxx = np.zeros(_lib_qlm.alm_size, dtype=float)
Fyy = np.zeros(_lib_qlm.alm_size, dtype=float)
Fxy = np.zeros(_lib_qlm.alm_size, dtype=float)
Fyx = np.zeros(_lib_qlm.alm_size, dtype=float)
# -1/2 (xi,a K1)(xi,b K2) -1/2(xi,b K1)(xi_a K2)
# -1/2 (K1)(xi,a K2 xib) -1/2(xi,a K1 xib)(K2)
for i in range(len(typ)):
for j in range(len(typ)):
# -1/2 d[ (xi,a K1)(xi,b K2)]
A = d_xiK1(i, j)
B = xiK2(j, i)
Fxx += _2qlm(_2map(A * ik(1)) * _2map(B * ik(1)))
Fyy += _2qlm(_2map(A * ik(0)) * _2map(B * ik(0)))
Fxy += _2qlm(_2map(A * ik(1)) * _2map(B * ik(0)))
Fyx += _2qlm(_2map(A * ik(0)) * _2map(B * ik(1)))
A = xiK1(i, j)
B = d_xiK2(j, i)
Fxx += _2qlm(_2map(A * ik(1)) * _2map(B * ik(1)))
Fyy += _2qlm(_2map(A * ik(0)) * _2map(B * ik(0)))
Fxy += _2qlm(_2map(A * ik(1)) * _2map(B * ik(0)))
Fyx += _2qlm(_2map(A * ik(0)) * _2map(B * ik(1)))
# -1/2 d[(xi_a K2)(xi,b K1)]
A = d_xiK2(i, j)
B = xiK1(j, i)
Fxx += _2qlm(_2map(A * ik(1)) * _2map(B * ik(1)))
Fyy += _2qlm(_2map(A * ik(0)) * _2map(B * ik(0)))
Fxy += _2qlm(_2map(A * ik(1)) * _2map(B * ik(0)))
Fyx += _2qlm(_2map(A * ik(0)) * _2map(B * ik(1)))
A = xiK2(i, j)
B = d_xiK1(j, i)
Fxx += _2qlm(_2map(A * ik(1)) * _2map(B * ik(1)))
Fyy += _2qlm(_2map(A * ik(0)) * _2map(B * ik(0)))
Fxy += _2qlm(_2map(A * ik(1)) * _2map(B * ik(0)))
Fyx += _2qlm(_2map(A * ik(0)) * _2map(B * ik(1)))
# -1/2 (K1)(xi,a K2 xib)
A = _2map(dK1(i, j))
B = xiK2xi(j, i)
Fxx += _2qlm(A * _2map(B * ik(1) * ik(1)))
Fyy += _2qlm(A * _2map(B * ik(0) * ik(0)))
Fxy += _2qlm(A * _2map(B * ik(1) * ik(0)))
Fyx += _2qlm(A * _2map(B * ik(0) * ik(1)))
A = _2map(K1(i, j))
B = d_xiK2xi(j, i)
Fxx += _2qlm(A * _2map(B * ik(1) * ik(1)))
Fyy += _2qlm(A * _2map(B * ik(0) * ik(0)))
Fxy += _2qlm(A * _2map(B * ik(1) * ik(0)))
Fyx += _2qlm(A * _2map(B * ik(0) * ik(1)))
# -1/2(xi,a K1 xib)(K2)
A = _2map(dK2(i, j))
B = xiK1xi(j, i)
Fxx += _2qlm(A * _2map(B * ik(1) * ik(1)))
Fyy += _2qlm(A * _2map(B * ik(0) * ik(0)))
Fxy += _2qlm(A * _2map(B * ik(1) * ik(0)))
Fyx += _2qlm(A * _2map(B * ik(0) * ik(1)))
A = _2map(K2(i, j))
B = d_xiK1xi(j, i)
Fxx += _2qlm(A * _2map(B * ik(1) * ik(1)))
Fyy += _2qlm(A * _2map(B * ik(0) * ik(0)))
Fxy += _2qlm(A * _2map(B * ik(1) * ik(0)))
Fyx += _2qlm(A * _2map(B * ik(0) * ik(1)))
t.checkpoint('%s %s done' % (i, j))
facunits = -0.5 * (1. / np.sqrt(np.prod(self.lsides))) # N0-like norm
return xylms_to_phiOmegalm(_lib_qlm, Fxx * facunits, Fyy * facunits, Fxy * facunits, Fyx=Fyx * facunits)
[docs] def get_plmlikcurvcls(self, typ, datcmb_cls, lib_qlm, use_cls_len=True, recache=False, dat_only=False):
r"""Returns realization-dependent second variation (curvature) of lensing deflection likelihood
Second variation of
:math:`\frac 12 X^{\rm dat} \rm{Cov_\alpha}^{-1} X^{\rm dat} + \frac 1 2 \ln \det \rm{Cov_\alpha}`
with respect to deflection for custom data CMB spectra. See https://arxiv.org/abs/1808.10349
Args:
typ: 'T', 'QU', 'TQU' for temperature-only, polarization-only or joint analysis
datcmb_cls(dict): data CMB spectra (if they do match lensed cls attribute, the result should be inverse lensing Gaussian noise bias)
lib_qlm: *ffs_alm* instance describing the lensing alm's arrays.
use_cls_len: use lensed cls in filtering (defaults to True)
"""
fname = os.path.join(self.lib_dir, '%splmlikcurv_cls%s_cldat' % (typ, {True: 'len', False: 'unl'}[use_cls_len]) \
+ cls_hash(datcmb_cls, lmax=self.lib_datalm.ellmax) + '.dat')
if not os.path.exists(fname) or recache:
def get_dcov(typ, i, j, use_cls_len=True):
# Covi (datCov - Cov) = Covi datCov - 1
ret = self._get_pmati(typ, i, 0, use_cls_len=use_cls_len) \
* pmat.get_datPmat_ij(typ, self.lib_datalm, datcmb_cls, self.cl_transf, self.cls_noise, 0, j)
for _k in range(1, len(typ)):
ret += self._get_pmati(typ, i, _k, use_cls_len=use_cls_len) \
* pmat.get_datPmat_ij(typ, self.lib_datalm, datcmb_cls, self.cl_transf, self.cls_noise, _k, j)
if i == j and not dat_only:
ret -= 1.
return ret
def get_dcovd(typ, i, j, use_cls_len=True):
# Covi(datCov - Cov)Covi
ret = get_dcov(typ, i, 0, use_cls_len=use_cls_len) \
* self._get_pmati(typ, 0, j, use_cls_len=use_cls_len)
for _k in range(1, len(typ)):
ret += get_dcov(typ, i, _k, use_cls_len=use_cls_len) \
* self._get_pmati(typ, _k, j, use_cls_len=use_cls_len)
return ret
def get_d2cov(typ, i, j, use_cls_len=True):
# Cov^-1 (2 datCov - Cov) = 2 Covi datCov - 1
ret = self._get_pmati(typ, i, 0, use_cls_len=use_cls_len) \
* pmat.get_datPmat_ij(typ, self.lib_datalm, datcmb_cls, self.cl_transf, self.cls_noise, 0, j)
for _k in range(1, len(typ)):
ret += self._get_pmati(typ, i, _k, use_cls_len=use_cls_len) \
* pmat.get_datPmat_ij(typ, self.lib_datalm, datcmb_cls, self.cl_transf, self.cls_noise, _k, j)
if i == j and not dat_only:
return 2 * ret - 1.
return 2 * ret
def get_idCi(typ, i, j, use_cls_len=True):
# Cov^-1 (2 datCov - Cov) Cov^-1
ret = get_d2cov(typ, i, 0, use_cls_len=use_cls_len) * self._get_pmati(typ, 0, j,
use_cls_len=use_cls_len)
for _k in range(1, len(typ)):
ret += get_d2cov(typ, i, _k, use_cls_len=use_cls_len) * self._get_pmati(typ, _k, j,
use_cls_len=use_cls_len)
return ret
# First term :
_lib_qlm = ell_mat.ffs_alm_pyFFTW(lib_qlm.ell_mat, filt_func=lambda ell: ell >= 0)
curv = -self.get_lndetcurv(typ, _lib_qlm, get_A=get_dcovd, use_cls_len=use_cls_len)
Fish = self.get_fishertrace(typ, _lib_qlm, get_A1=get_idCi, use_cls_len=use_cls_len)
ret = np.array([_lib_qlm.bin_realpart_inell(_r) for _r in (curv + Fish)[:2]])
np.savetxt(fname, ret.transpose(),
header='second variation (curvature) of <1/2Xdat Covi Xdat + 1/2 ln det Cov>_dat')
print("Cached " + fname)
print("loading " + fname)
cond = lib_qlm.ell_mat.get_Nell() > 0
ret = np.array([(_r * cond)[:lib_qlm.ellmax + 1] for _r in np.loadtxt(fname).transpose()])
return ret
[docs] def get_plmRDlikcurvcls(self, typ, datcls_obs, lib_qlm, use_cls_len=True, use_cls_len_D=None, recache=False,
dat_only=False):
r"""Returns realization-dependent second variation (curvature) of lensing deflection likelihood
Second variation of
:math:`\frac 12 X^{\rm dat} \rm{Cov_\alpha}^{-1} X^{\rm dat} + \frac 1 2 \ln \det \rm{Cov_\alpha}`
with respect to deflection for custom data spectra. See https://arxiv.org/abs/1808.10349
Args:
typ: 'T', 'QU', 'TQU' for temperature-only, polarization-only or joint analysis
datcls_obs(dict): data (beam-convovled) CMB and noise spectra
lib_qlm: *ffs_alm* instance describing the lensing alm's arrays
use_cls_len: use lensed cls in filtering (defaults to True)
"""
# FIXME : The rel . agreement with 1/N0 is only 1e-6 not double prec., not sure what is going on.
fname = os.path.join(self.lib_dir, '%splmRDlikcurv_cls%s_cldat' % (typ, {True: 'len', False: 'unl'}[use_cls_len]) \
+ cls_hash(datcls_obs, lmax=self.lib_datalm.ellmax) + '.dat')
if dat_only:
fname = fname.replace('.dat', 'datonly.dat')
assert 'datonly' in fname
if use_cls_len_D is not None and use_cls_len_D != use_cls_len:
fname = fname.replace('.dat', '_clsD%s.dat' % {True: 'len', False: 'unl'}[use_cls_len_D])
else:
use_cls_len_D = use_cls_len
if not os.path.exists(fname) or recache:
def get_dcov(typ, i, j, use_cls_len=use_cls_len):
# Covi (datCov - Cov) = Covi datCov - 1
ret = self._get_pmati(typ, i, 0, use_cls_len=use_cls_len) \
* pmat.get_unlPmat_ij(typ, self.lib_datalm, datcls_obs, 0, j)
for _k in range(1, len(typ)):
ret += self._get_pmati(typ, i, _k, use_cls_len=use_cls_len) \
* pmat.get_unlPmat_ij(typ, self.lib_datalm, datcls_obs, _k, j)
if i == j and not dat_only:
ret -= 1.
return ret
def get_dcovd(typ, i, j, use_cls_len=use_cls_len):
# Covi(datCov - Cov)Covi
ret = get_dcov(typ, i, 0, use_cls_len=use_cls_len) \
* self._get_pmati(typ, 0, j, use_cls_len=use_cls_len)
for _k in range(1, len(typ)):
ret += get_dcov(typ, i, _k, use_cls_len=use_cls_len) \
* self._get_pmati(typ, _k, j, use_cls_len=use_cls_len)
return ret
def get_d2cov(typ, i, j, use_cls_len=use_cls_len):
# Cov^-1 (2 datCov - Cov) = 2 Covi datCov - 1
ret = self._get_pmati(typ, i, 0, use_cls_len=use_cls_len) \
* pmat.get_unlPmat_ij(typ, self.lib_datalm, datcls_obs, 0, j)
for _k in range(1, len(typ)):
ret += self._get_pmati(typ, i, _k, use_cls_len=use_cls_len) \
* pmat.get_unlPmat_ij(typ, self.lib_datalm, datcls_obs, _k, j)
if i == j and not dat_only:
return 2 * ret - 1.
return 2 * ret
def get_idCi(typ, i, j, use_cls_len=use_cls_len):
# Cov^-1 (2 datCov - Cov) Cov^-1
ret = get_d2cov(typ, i, 0, use_cls_len=use_cls_len) * self._get_pmati(typ, 0, j,
use_cls_len=use_cls_len)
for _k in range(1, len(typ)):
ret += get_d2cov(typ, i, _k, use_cls_len=use_cls_len) * self._get_pmati(typ, _k, j,
use_cls_len=use_cls_len)
return ret
# First term :
_lib_qlm = ell_mat.ffs_alm_pyFFTW(lib_qlm.ell_mat, filt_func=lambda ell: ell >= 0)
curv = -self.get_lndetcurv(typ, _lib_qlm, get_A=get_dcovd, use_cls_len=use_cls_len_D)
Fish = self.get_fishertrace(typ, _lib_qlm, get_A1=get_idCi, use_cls_len=use_cls_len)
ret = np.array([_lib_qlm.bin_realpart_inell(_r) for _r in (curv + Fish)[:2]])
np.savetxt(fname, ret.transpose(),
header='second variation (curvature) of <1/2Xdat Covi Xdat + 1/2 ln det Cov>_dat')
# retc = _lib_qlm.bin_realpart_inell(curv[0])
# retF = _lib_qlm.bin_realpart_inell(Fish[0])
# np.savetxt(fname, np.array([retc, retF]).transpose(),
# header='second variation (curvature) of <1/2Xdat Covi Xdat + 1/2 ln det Cov>_dat')
# print "FIXE THIS get_plm"
# ret = withD * np.array([_lib_qlm.bin_realpart_inell(_r) for _r in curv[:2]])
# if withF: ret += np.array([_lib_qlm.bin_realpart_inell(_r) for _r in Fish[:2]])
# return ret
print("loading ", fname)
cond = lib_qlm.ell_mat.get_Nell() > 0
ret = np.array([(_r * cond)[:lib_qlm.ellmax + 1] for _r in np.loadtxt(fname).transpose()])
return ret
[docs] def get_dplmRDlikcurvcls(self, typ, cmb_dcls, datcls_obs, lib_qlm, use_cls_len=True, recache=False,
dat_only=False):
"""
derivative of plmRDlikcurvcls (data held fixed)
Finite difference test OK (+ much faster)
"""
# FIXME : this is like really, really, really inefficient.
fname = os.path.join(self.lib_dir, '%sdplmRDlikcurv_cls%s_cldat' % (typ, {True: 'len', False: 'unl'}[use_cls_len]) \
+ cls_hash(datcls_obs, lmax=self.lib_datalm.ellmax) + cls_hash(cmb_dcls) + '.dat')
if dat_only:
fname = fname.replace('.dat', 'datonly.dat')
assert 'datonly' in fname
if not os.path.exists(fname) or recache:
# K going into trace should be 2 K datcls K - K
# K going into lndet should be K datcls K
# dK is -K dxi K
def mu(a, b, i, j):
ret = a(i, 0) * b(0, j)
for _k in range(1, len(typ)):
ret += a(i, _k) * b(_k, j)
return ret
dxi = lambda i, j: pmat.get_unlPmat_ij(typ, self.lib_skyalm, cmb_dcls, i, j)
K = lambda i, j: self._upg(
self.lib_datalm.almxfl(self._get_pmati(typ, i, j, use_cls_len=use_cls_len), self.cl_transf ** 2))
dxiK = lambda i, j: mu(dxi, K, i, j)
datKi = lambda i, j: self._upg(
self.lib_datalm.almxfl(pmat.get_unlPmat_ij(typ, self.lib_datalm, datcls_obs, i, j),
cl_inverse(self.cl_transf ** 2)))
KdatKi = lambda i, j: mu(K, datKi, i, j)
datKiK = lambda i, j: mu(datKi, K, i, j)
dK = lambda i, j: (-mu(K, dxiK, i, j))
KdatKiK = lambda i, j: mu(KdatKi, K, i, j)
if not dat_only:
Kdet = lambda i, j: (KdatKiK(i, j) - K(i, j))
dKdet = lambda i, j: (mu(dK, datKiK, i, j) + mu(KdatKi, dK, i, j) - dK(i, j))
Ktrace = lambda i, j: (2 * KdatKiK(i, j) - K(i, j))
dKtrace = lambda i, j: (2 * (mu(dK, datKiK, i, j) + mu(KdatKi, dK, i, j)) - dK(i, j))
else:
Kdet = lambda i, j: KdatKiK(i, j)
dKdet = lambda i, j: (mu(dK, datKiK, i, j) + mu(KdatKi, dK, i, j))
Ktrace = lambda i, j: 2 * Kdet(i, j)
dKtrace = lambda i, j: 2 * dKdet(i, j)
# First term :
_lib_qlm = ell_mat.ffs_alm_pyFFTW(lib_qlm.ell_mat, filt_func=lambda ell: ell >= 0)
curv = -self.get_dlndetcurv(typ, cmb_dcls, _lib_qlm, K=Kdet, dK=dKdet, use_cls_len=use_cls_len)
Fish = self.get_dfishertrace(typ, cmb_dcls, _lib_qlm, K1=Ktrace, dK1=dKtrace, use_cls_len=use_cls_len)
ret = np.array([_lib_qlm.bin_realpart_inell(_r) for _r in (curv + Fish)[:2]])
np.savetxt(fname, ret.transpose(),
header='second variation (curvature) of <1/2Xdat Covi Xdat + 1/2 ln det Cov>_dat')
print("cached ", fname)
print("loading ", fname)
cond = lib_qlm.ell_mat.get_Nell() > 0
return np.array([(_r * cond)[:lib_qlm.ellmax + 1] for _r in np.loadtxt(fname).transpose()])
[docs]class ffs_lencov_alm(ffs_diagcov_alm):
"""Extended *ffs_diagcov_alm* sub-class, adding a deflection field and its inverse
Args:
lib_dir: many things will be saved there
lib_datalm: *ffs_alm* instance containing mode filtering and flat-sky patch info
lib_skyalm: *ffs_alm* instance describing the sky modes.
cls_unl(dict): unlensed CMB cls
cls_len(dict): lensed CMB cls
cl_transf: instrument transfer function
cls_noise(dict): 't', 'q' and 'u' noise arrays
f: deflection field, forward operation (e.g. *ffs_displacement* instance)
fi: deflection field, backward operation (ideally the inverse f-deflection, *f.get_inverse()*)
"""
def __init__(self, lib_dir, lib_datalm, lib_skyalm, cls_unl, cls_len, cl_transf, cls_noise, f, fi,
init_rank=pbs.rank, init_barrier=pbs.barrier):
assert lib_datalm.ell_mat.lsides == lib_skyalm.ell_mat.lsides, (lib_datalm.ell_mat.lsides, lib_skyalm.ell_mat.lsides)
super(ffs_lencov_alm, self).__init__(lib_dir, lib_datalm, cls_unl, cls_len, cl_transf, cls_noise,
lib_skyalm=lib_skyalm, init_barrier=init_barrier, init_rank=init_rank)
self.lmax_dat = self.lib_datalm.ellmax
self.lmax_sky = self.lib_skyalm.ellmax
self.sky_shape = self.lib_skyalm.ell_mat.shape
# assert self.lmax_dat <= self.lmax_sky, (self.lmax_dat, self.lmax_sky)
for cl in self.cls_unl.values(): assert len(cl) > lib_skyalm.ellmax
assert f.shape == self.sky_shape and fi.shape == self.sky_shape, (f.shape, fi.shape, self.sky_shape)
assert f.lsides == self.lsides and fi.lsides == self.lsides, (f.lsides, fi.lsides, self.lsides)
self.fi = fi # inverse displacement
self.f = f # displacement
def hashdict(self):
h = {'lib_alm': self.lib_datalm.hashdict(), 'lib_skyalm': self.lib_skyalm.hashdict()}
for key in self.cls_noise.keys():
h['cls_noise ' + key] = npy_hash(self.cls_noise[key])
for key in self.cls_unl.keys():
h['cls_unl ' + key] = npy_hash(self.cls_unl[key])
for key in self.cls_len.keys():
h['cls_len ' + key] = npy_hash(self.cls_len[key])
h['cl_transf'] = npy_hash(self.cl_transf)
return h
[docs] def set_ffinv(self, f, fi):
"""Replace deflection-field attributes and its inverse
Args:
f: new deflection-field instance
fi: new inverse deflection-field instance
"""
assert f.shape == self.sky_shape and f.lsides == self.lsides, (f.shape, f.lsides)
assert fi.shape == self.sky_shape and fi.lsides == self.lsides, (fi.shape, fi.lsides)
assert hasattr(self, 'f') and hasattr(self, 'fi')
setattr(self, 'f', f)
setattr(self, 'fi', fi)
def apply(self, typ, alms, use_Pool=0):
assert alms.shape == self._datalms_shape(typ), (alms.shape, self._datalms_shape(typ))
ret = self._apply_signal(typ, alms, use_Pool=use_Pool)
ret += self.apply_noise(typ, alms)
return ret
def _apply_signal(self, typ, alms, use_Pool=0):
assert alms.shape == self._datalms_shape(typ), (alms.shape, self._datalms_shape(typ))
ret = np.empty_like(alms)
if use_Pool <= -100:
from lensit.gpu import apply_GPU
ablms = np.array([self.lib_datalm.almxfl(_a, self.cl_transf) for _a in alms])
apply_GPU.apply_FDxiDtFt_GPU_inplace(typ, self.lib_datalm, self.lib_skyalm, ablms,
self.f, self.fi, self.cls_unl)
for i in range(len(typ)):
ret[i] = self.lib_datalm.almxfl(ablms[i], self.cl_transf)
return ret
# could do with less
t = timer(_timed, prefix=__name__, suffix='apply_signal')
t.checkpoint("just started")
tempalms = np.empty(self._skyalms_shape(typ), dtype=complex)
for _i in range(len(typ)): # Lens with inverse and mult with determinant magnification.
tempalms[_i] = self.fi.lens_alm(self.lib_skyalm,
self._upg(self.lib_datalm.almxfl(alms[_i], self.cl_transf)),
lib_alm_out=self.lib_skyalm, mult_magn=True, use_Pool=use_Pool)
# NB : 7 new full sky alms for TQU in this routine - > 4 GB total for full sky lmax_sky = 6000.
t.checkpoint("backward lens + det magn")
skyalms = np.zeros_like(tempalms)
for j in range(len(typ)):
for i in range(len(typ)):
skyalms[i] += pmat.get_unlPmat_ij(typ, self.lib_skyalm, self.cls_unl, i, j) * tempalms[j]
del tempalms
t.checkpoint("mult with Punl mat ")
for i in range(len(typ)): # Lens with forward displacement
ret[i] = self._deg(self.f.lens_alm(self.lib_skyalm, skyalms[i], use_Pool=use_Pool))
t.checkpoint("Forward lensing mat ")
for i in range(len(typ)):
ret[i] = self.lib_datalm.almxfl(ret[i], self.cl_transf)
t.checkpoint("Beams")
return ret
def _apply_cond3(self, typ, alms, use_Pool=0):
#(DBxiB ^ tD ^ t + N) ^ -1 \sim D ^ -t(BxiBt + N) ^ -1 D ^ -1
assert alms.shape == self._datalms_shape(typ), (alms.shape, self._datalms_shape(typ))
t = timer(_timed, prefix=__name__, suffix='_apply_cond3')
t.checkpoint("just started")
if use_Pool <= -100:
# Try entire evaluation on GPU :
# FIXME !! lib_sky vs lib_dat
from lensit.gpu.apply_cond3_GPU import apply_cond3_GPU_inplace as c3GPU
ret = alms.copy()
c3GPU(typ, self.lib_datalm, ret, self.f, self.fi, self.cls_unl, self.cl_transf, self.cls_noise)
return ret
temp = np.empty_like(alms) # Cond. must not change their arguments
for i in range(len(typ)): # D^{-1}
temp[i] = self._deg(self.fi.lens_alm(self.lib_skyalm, self._upg(alms[i]), use_Pool=use_Pool))
t.checkpoint("Lensing with inverse")
ret = np.zeros_like(alms) # (B xi B^t + N)^{-1}
for i in range(len(typ)):
for j in range(len(typ)):
ret[i] += self._get_pmati(typ, i, j) * temp[j]
del temp
t.checkpoint("Mult. w. inv Pmat")
for i in range(len(typ)): # D^{-t}
ret[i] = self._deg(self.f.lens_alm(self.lib_skyalm, self._upg(ret[i]), use_Pool=use_Pool, mult_magn=True))
t.checkpoint("Lens w. forward and det magn.")
return ret
[docs] def get_iblms(self, typ, datalms, use_cls_len=False, use_Pool=0, **kwargs):
r"""Inverse-variance filters input CMB maps
Produces :math:`B^t \rm{Cov_\alpha}^{-1}X^{\rm dat}` (inputs to quadratic estimator routines)
The covariance matrix here includes the lensing deflection field :math:`\alpha` as given by the *self.f* and *self.fi* attributes
"""
assert use_cls_len == False, 'not implemented'
if typ == 'QU':
return self._get_iblms_v2(typ, datalms, use_cls_len=use_cls_len, use_Pool=use_cls_len, **kwargs)
if datalms.shape == (len(typ), self.dat_shape[0], self.dat_shape[1]):
_datalms = np.array([self.lib_datalm.map2alm(_m) for _m in datalms])
return self.get_iblms(typ, _datalms, use_cls_len=use_cls_len, use_Pool=use_Pool, **kwargs)
assert datalms.shape == self._datalms_shape(typ), (datalms.shape, self._datalms_shape(typ))
ilms, it = self.cd_solve(typ, datalms, use_Pool=use_Pool, **kwargs)
ret = np.empty(self._skyalms_shape(typ), dtype=complex)
for _i in range(len(typ)):
ret[_i] = self._upg(self.lib_datalm.almxfl(ilms[_i], self.cl_transf))
return ret, it
def _get_iblms_v2(self, typ, datalms, use_cls_len=False, use_Pool=0, **kwargs):
# some weird things happening with very low noise T ?
assert use_cls_len == False, 'not implemented'
MLTQUlms = SM.TEB2TQUlms(typ, self.lib_skyalm,self._get_mllms(typ, datalms,
use_cls_len=use_cls_len, use_Pool=use_Pool, **kwargs))
ret = self._mltqulms2ibtqulms(typ, MLTQUlms, datalms, use_Pool=use_Pool)
return ret, -1 # No iterations info implemented
def _mltqulms2ibtqulms(self, typ, MLTQUlms, datalms, use_Pool = 0):
""" Output TQU skyalm shaped """
ret = np.zeros(self._skyalms_shape(typ), dtype=complex)
for i in range(len(typ)):
temp = datalms[i] - self.lib_datalm.almxfl(self.f.lens_alm(self.lib_skyalm, MLTQUlms[i],
lib_alm_out=self.lib_datalm, use_Pool=use_Pool),self.cl_transf)
self.lib_datalm.almxfl(temp, self.cl_transf[:self.lib_datalm.ellmax + 1]
* cl_inverse(self.cls_noise[typ[i].lower()][:self.lib_datalm.ellmax + 1]),
inplace=True)
ret[i] = self._upg(temp)
return ret
[docs] def get_mllms(self, typ, datmaps, use_Pool=0, use_cls_len=False, **kwargs):
r"""Returns maximum likelihood sky CMB modes.
This instance uses anisotropic filtering, with lensing deflections as defined by
*self.f* and *self.fiv*
Args:
typ: 'T', 'QU', 'TQU' for temperature-only, polarization-only or joint analysis
datmaps: data real-space maps array
use_cls_len: use lensed cls in filtering (defaults to True)
Returns:
T,E,B alm array
"""
return self._get_mllms(typ, np.array([self.lib_datalm.map2alm(m) for m in datmaps]),
use_cls_len=use_cls_len, use_Pool=use_Pool)
def _get_mllms(self, typ, datalms, use_Pool=0, use_cls_len=False, **kwargs):
assert np.all(self.cls_noise['t'] == self.cls_noise['t'][0]), 'adapt ninv filt ideal for coloured cls(easy)'
assert np.all(self.cls_noise['q'] == self.cls_noise['q'][0]), 'adapt ninv filt ideal for coloured cls(easy'
assert np.all(self.cls_noise['u'] == self.cls_noise['q'][0]), 'adapt ninv filt ideal for coloured cls(easy'
# FIXME could use opfilt_cinvBB
nlev_t = np.sqrt(self.cls_noise['t'][0] * (180. * 60 / np.pi) ** 2)
nlev_p = np.sqrt(self.cls_noise['q'][0] * (180. * 60 / np.pi) ** 2)
cmb_cls = self.cls_len if use_cls_len else self.cls_unl
filt = ffs_ninv_filt_ideal.ffs_ninv_filt_wl(self.lib_datalm, self.lib_skyalm,
cmb_cls, self.cl_transf, nlev_t, nlev_p, self.f,
self.fi, lens_pool=use_Pool)
opfilt = opfilt_cinv
opfilt._type = typ
chain = chain_samples.get_isomgchain(self.lib_skyalm.ellmax, self.lib_datalm.shape, **kwargs)
mchain =multigrid.multigrid_chain(opfilt, typ, chain, filt)
soltn = np.zeros((opfilt.TEBlen(typ), self.lib_skyalm.alm_size), dtype=complex)
mchain.solve(soltn, datalms, finiop='MLIK')
return soltn
[docs] def get_qlms(self, typ, iblms, lib_qlm, use_Pool=0, use_cls_len=False, **kwargs):
r"""Unormalized quadratic estimates (potential and curl), including current lensing deflection estimate
Note:
the output differs by a factor of two from the standard QE. This is because this was written initially
as gradient function of the CMB likelihood w.r.t. real and imaginary parts. So to use this as QE's,
the normalization is 1/2 the standard normalization the inverse response. The *lensit.ffs_qlms.qlms.py* module contains methods
of the QE's with standard normalizations which you may want to use instead.
Args:
typ: 'T', 'QU', 'TQU' for temperature-only, polarization-only or joint analysis
iblms: inverse-variance filtered CMB alm arrays
lib_qlm: *ffs_alm* instance describing the lensing alm arrays
use_cls_len: use lensed or unlensed cls in QE weights (numerator), defaults to lensed cls
"""
assert iblms.shape == self._skyalms_shape(typ), (iblms.shape, self._skyalms_shape(typ))
assert lib_qlm.ell_mat.lsides == self.lsides, (self.lsides, lib_qlm.ell_mat.lsides)
cls = self.cls_len if use_cls_len else self.cls_unl
almsky1 = np.empty((len(typ), self.lib_skyalm.alm_size), dtype=complex)
Bu = lambda idx: self.lib_skyalm.alm2map(iblms[idx])
_2qlm = lambda _m: lib_qlm.udgrade(self.lib_skyalm, self.lib_skyalm.map2alm(_m))
def DxiDt(alms, axis):
assert axis in [0, 1]
kfunc = self.lib_skyalm.get_ikx if axis == 1 else self.lib_skyalm.get_iky
return self.f.alm2lenmap(self.lib_skyalm, alms * kfunc(), use_Pool=use_Pool)
t = timer(_timed)
t.checkpoint(" get_likgrad::just started ")
for _j in range(len(typ)): # apply Dt and back to harmonic space :
almsky1[_j] = self.fi.lens_alm(self.lib_skyalm, iblms[_j],
mult_magn=True, use_Pool=use_Pool)
t.checkpoint(" get_likgrad::Forward lensing maps, (%s map(s)) " % len(typ))
almsky2 = np.zeros((len(typ), self.lib_skyalm.alm_size), dtype=complex)
for _i in range(len(typ)):
for _j in range(len(typ)):
almsky2[_i] += pmat.get_unlPmat_ij(typ, self.lib_skyalm, cls, _i, _j) * almsky1[_j]
del almsky1
t.checkpoint(" get_likgrad::Mult. w. unlPmat, %s field(s)" % len(typ))
retdx = _2qlm(Bu(0) * DxiDt(almsky2[0], 1))
retdy = _2qlm(Bu(0) * DxiDt(almsky2[0], 0))
for _i in range(1, len(typ)):
retdx += _2qlm(Bu(_i) * DxiDt(almsky2[_i], 1))
retdy += _2qlm(Bu(_i) * DxiDt(almsky2[_i], 0))
t.checkpoint(" get_likgrad::Cartesian Grad. (%s map(s) lensed, %s fft(s)) " % (2 * len(typ), 2 * len(typ)))
dphi = retdx * lib_qlm.get_ikx() + retdy * lib_qlm.get_iky()
dOm = - retdx * lib_qlm.get_iky() + retdy * lib_qlm.get_ikx()
return np.array([-2 * dphi, -2 * dOm]) # Factor 2 since gradient w.r.t. real and imag. parts.
[docs] def degrade(self, LD_shape, no_lensing=False, ellmax=None, ellmin=None, libtodegrade='sky', lib_dir=None):
"""Degrades covariance matrix to some lower resolution.
"""
if lib_dir is None: lib_dir = self.lib_dir + '/%sdegraded%sx%s_%s_%s' % (
{True: 'unl', False: 'len'}[no_lensing], LD_shape[0], LD_shape[1], ellmin, ellmax)
if libtodegrade == 'sky':
lib_datalmLD = self.lib_datalm.degrade(LD_shape)
lib_skyalmLD = self.lib_skyalm.degrade(LD_shape, ellmax=ellmax, ellmin=ellmin)
else:
lib_dir += 'dat'
lib_datalmLD = self.lib_datalm.degrade(LD_shape, ellmax=ellmax, ellmin=ellmin)
lib_skyalmLD = self.lib_skyalm.degrade(LD_shape)
if no_lensing:
return ffs_diagcov_alm(lib_dir, lib_datalmLD, self.cls_unl, self.cls_len, self.cl_transf, self.cls_noise,
lib_skyalm=lib_skyalmLD)
fLD = self.f.degrade(LD_shape, no_lensing)
finvLD = self.fi.degrade(LD_shape, no_lensing)
return ffs_lencov_alm(lib_dir, lib_datalmLD, lib_skyalmLD,
self.cls_unl, self.cls_len, self.cl_transf, self.cls_noise, fLD, finvLD)
[docs] def predMFpOlms(self, typ, lib_qlm, use_cls_len=False):
"""
Perturbative prediction for the mean field <hat phi>_f,
which is Rlm plm
"""
Rpp, ROO, RpO = self.get_mfrespcls(typ, lib_qlm, use_cls_len=use_cls_len)
del RpO
plm, Olm = self.f.get_pOlm(lib_qlm)
plm *= Rpp
Olm *= ROO
return np.array([plm, Olm])
[docs] def eval_mf(self, typ, mfkey, xlms_sky, xlms_dat, lib_qlm, use_Pool=0, **kwargs):
"""Delfection-induced mean-field estimation from input random phases
Args:
typ: 'T', 'QU', 'TQU' for temperature-only, polarization-only or joint analysis
mfkey: mean-field estimator key
xlms_sky: i.i.d. sky modes random phases (if relevant)
xlms_dat: i.i.d. data modes random phases (if relevant)
lib_qlm: *ffs_alm* instance describing the lensing alm arrays
Returns:
mean-field estimate (gradient and curl component)
"""
assert lib_qlm.ell_mat.lsides == self.lsides, (self.lsides, lib_qlm.ell_mat.lsides)
assert typ in typs, (typ, typs)
times = timer(_timed)
ikx = lambda: self.lib_skyalm.get_ikx()
iky = lambda: self.lib_skyalm.get_iky()
times.checkpoint('Just started eval MF %s %s' % (typ, mfkey))
if mfkey == 14:
# W1 = B^t F^t l^{1/2}, W2 = D daxi D^t B^t F^t Cov_f^{-1}l^{-1/2}
assert np.all([(_x.size == self.lib_datalm.alm_size) for _x in xlms_dat])
ell_w = 1. / np.sqrt(np.arange(1, self.lib_datalm.ellmax + 2) - 0.5)
for _i in range(len(typ)):
xlms_dat[_i] = self.lib_datalm.almxfl(xlms_dat[_i], ell_w)
def Bx(i):
_cl = self.cl_transf[:self.lib_datalm.ellmax + 1] / ell_w ** 2
_alm = self.lib_datalm.almxfl(xlms_dat[i], _cl)
_alm = self.lib_skyalm.udgrade(self.lib_datalm, _alm)
return self.lib_skyalm.alm2map(_alm)
ilms, it = self.cd_solve(typ, xlms_dat, use_Pool=use_Pool, **kwargs)
times.checkpoint(' Done with cd solving')
for _i in range(len(typ)):
ilms[_i] = self.lib_datalm.almxfl(ilms[_i], self.cl_transf)
skyalms = np.empty((len(typ), self.lib_skyalm.alm_size), dtype=complex)
for _i in range(len(typ)):
skyalms[_i] = self.lib_skyalm.udgrade(self.lib_datalm, ilms[_i])
skyalms[_i] = self.fi.lens_alm(self.lib_skyalm, skyalms[_i], mult_magn=True, use_Pool=use_Pool)
del ilms
times.checkpoint(' Done with first lensing')
def _2lenmap(_alm):
return self.f.alm2lenmap(self.lib_skyalm, _alm, use_Pool=use_Pool)
dx = np.zeros(lib_qlm.alm_size, dtype=complex)
dy = np.zeros(lib_qlm.alm_size, dtype=complex)
for _i in range(len(typ)):
tempalms = pmat.get_unlPmat_ij(typ, self.lib_skyalm, self.cls_unl, _i, 0) * skyalms[0]
for _j in range(1, len(typ)):
tempalms += pmat.get_unlPmat_ij(typ, self.lib_skyalm, self.cls_unl, _i, _j) * skyalms[_j]
dx += lib_qlm.map2alm(Bx(_i) * _2lenmap(tempalms * ikx()))
dy += lib_qlm.map2alm(Bx(_i) * _2lenmap(tempalms * iky()))
times.checkpoint(' Done with second lensing. Done.')
del skyalms, tempalms
elif mfkey == 0:
# Std qest. We build the sim and use the std methods
assert np.all([(_x.size == self.lib_skyalm.alm_size) for _x in xlms_sky])
assert np.all([(_x.size == self.lib_datalm.alm_size) for _x in xlms_dat])
sim = np.empty((len(typ), self.lib_datalm.alm_size), dtype=complex)
for _i in range(len(typ)):
skysim = self._get_rootpmatsky(typ, _i, 0) * xlms_sky[0]
for _j in range(1, len(typ)):
skysim += self._get_rootpmatsky(typ, _i, _j) * xlms_sky[_j]
skysim = self.f.lens_alm(self.lib_skyalm, skysim, use_Pool=use_Pool)
sim[_i] = self.lib_datalm.udgrade(self.lib_skyalm, skysim)
sim[_i] = self.lib_datalm.almxfl(sim[_i], self.cl_transf)
if typ == 'QU':
sim[0] += self.lib_datalm.almxfl(xlms_dat[0], np.sqrt(self.cls_noise['q']))
sim[1] += self.lib_datalm.almxfl(xlms_dat[1], np.sqrt(self.cls_noise['u']))
elif typ == 'TQU':
sim[0] += self.lib_datalm.almxfl(xlms_dat[0], np.sqrt(self.cls_noise['t']))
sim[1] += self.lib_datalm.almxfl(xlms_dat[1], np.sqrt(self.cls_noise['q']))
sim[2] += self.lib_datalm.almxfl(xlms_dat[2], np.sqrt(self.cls_noise['u']))
elif typ == 'T':
sim[0] += self.lib_datalm.almxfl(xlms_dat[0], np.sqrt(self.cls_noise['t']))
else:
assert 0
times.checkpoint(' Done with building sim')
sim, it = self.cd_solve(typ, sim, use_Pool=use_Pool, **kwargs)
for _i in range(len(typ)): # xlms is now iblms
sim[_i] = self.lib_datalm.almxfl(sim[_i], self.cl_transf)
times.checkpoint(' Done with ivf')
return self.get_qlms(typ, np.array([self.lib_skyalm.udgrade(self.lib_datalm, _s) for _s in sim]),
lib_qlm=lib_qlm, use_Pool=use_Pool)
else:
dx = 0
dy = 0
it = 0
assert 0, 'mfkey %s not implemented' % mfkey
dphi = dx * lib_qlm.get_ikx() + dy * lib_qlm.get_iky()
dOm = -dx * lib_qlm.get_iky() + dy * lib_qlm.get_ikx()
return np.array([-2 * dphi, -2 * dOm]), it # Factor 2 since gradient w.r.t. real and imag. parts.