from numpy import uint, newaxis, finfo, float32, float64, zeros
from .numba import sign, arange
from numba import njit, prange
from psutil import virtual_memory
from .exceptions import FutureExceedsMemory
from scipy.linalg.blas import dsyrk, ssyrk   # for XTX, XXT
from scipy.linalg import lapack as _lapack
from . import numba as _numba                # aliased so the name cannot clash with the numba package
from .base import USE_NUMBA
__all__ = ['lapack', 'svd_flip', 'eig_flip', '_svdCond', '_eighCond',
           'memoryXTX', 'memoryCovariance', 'memorySVD', '_float',
           'traceXTX', 'fastDot', '_XTX', '_XXT',
           'rowSum', 'rowSum_A', 'reflect',
           'addDiagonal', 'setDiagonal']

_condition = {'f': 1e3, 'd': 1e6}   # dtype char -> condition-number multiplier
class lapack():
    """
    [Added 11/11/2018] [Edited 13/11/2018 -> made into a class]
    [Edited 14/11/2018 -> fixed class]
    Gets a LAPACK function based on dtype(X). Acts like Scipy.
    """
    def __init__(self, function, fast = True, numba = None):
        self.function = function
        self.fast = fast
        self.f = None
        if numba is not None and USE_NUMBA:
            # Prefer the Numba implementation when one exists; fall back
            # to LAPACK otherwise.
            self.f = getattr(_numba, function, None)

    def __repr__(self):
        return f"Calls LAPACK or Numba function {self.function}"

    def __call__(self, *args, **kwargs):
        if self.f is None:
            # Choose precision from the first array argument: float32
            # inputs use the single precision (s) routine when fast = True;
            # everything else uses double precision (d).
            a = args[0] if len(args) > 0 else next(iter(kwargs.values()))
            prefix = 's' if (self.fast and a.dtype == float32) else 'd'
            self.f = getattr(_lapack, prefix + self.function)
        return self.f(*args, **kwargs)
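
# Usage sketch (illustrative only, not part of the module API): 'potrf'
# is LAPACK's Cholesky routine, so the wrapper resolves to _lapack.spotrf
# for float32 input when fast = True, and _lapack.dpotrf otherwise.
#
#   >>> cholesky = lapack('potrf')
#   >>> C, info = cholesky(A)       # A: symmetric positive definite matrix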
def svd_flip(U, VT, U_decision = True):
    """
    Flips the signs of U and VT for SVD in order to force deterministic output.
    Follows the Sklearn convention of looking at U's maxima over the
    columns by default.
    """
    if U_decision:
        max_abs_cols = abs(U).argmax(0)
        signs = sign( U[max_abs_cols, arange(U.shape[1])] )
    else:
        # rows of VT, columns of U
        max_abs_rows = abs(VT).argmax(1)
        signs = sign( VT[arange(VT.shape[0]), max_abs_rows] )
    U *= signs
    VT *= signs[:, newaxis]
    return U, VT
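
# Example (illustrative; the same sign vector flips U's columns and VT's
# rows together, so U @ diag(S) @ VT is unchanged):
#
#   >>> import numpy as np
#   >>> X = np.random.randn(50, 5)
#   >>> U, S, VT = np.linalg.svd(X, full_matrices = False)
#   >>> U, VT = svd_flip(U, VT)     # deterministic across LAPACK backends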
def eig_flip(V):
    """
    Flips the signs of V for the eigendecomposition in order to
    force deterministic output.
    Follows the Sklearn convention of looking at V's maxima over the
    columns by default. This essentially mirrors svd_flip(U_decision = False).
    """
    max_abs_cols = abs(V).argmax(0)
    signs = sign( V[max_abs_cols, arange(V.shape[1])] )
    V *= signs
    return V
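
# Example (illustrative; eigh may return eigenvectors with arbitrary sign):
#
#   >>> import numpy as np
#   >>> S2, V = np.linalg.eigh(np.cov(np.random.randn(100, 4), rowvar = False))
#   >>> V = eig_flip(V)             # fixes the sign of each eigenvector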
def _svdCond(U, S, VT, alpha):
    """
    Condition-number cutoff from Scipy:
        cutoff = factor * eps * max(S), where factor = 1e3 (float32)
        or 1e6 (float64).
    Singular values below the cutoff are discarded, and the kept ones
    are mapped to S / (S**2 + alpha) (the regularized inverse scaling).
    """
    t = S.dtype.char.lower()
    cond = (S > (_condition[t] * finfo(t).eps * S[0]))
    rank = cond.sum()
    S /= (S**2 + alpha)
    return U[:, :rank], S[:rank], VT[:rank]
def _eighCond(S2, V):
    """
    Condition-number cutoff from Scipy:
        cutoff = factor * eps * max(|S2|), where factor = 1e3 (float32)
        or 1e6 (float64).
    Note that the maximum could be either S2[-1] or S2[0]
    depending on its absolute magnitude.
    """
    t = S2.dtype.char.lower()
    absS = abs(S2)
    maximum = absS[0] if absS[0] >= absS[-1] else absS[-1]
    cond = (absS > (_condition[t] * finfo(t).eps * maximum))
    S2 = S2[cond]
    return S2, V[:, cond]
def memoryXTX(X):
    """
    Estimates the memory needed for X.T @ X, so a clear error message
    can be raised instead of running into a MemoryError.
    """
    free = virtual_memory().free * 0.95
    byte = 4 if '32' in str(X.dtype) else 8
    memUsage = X.shape[1]**2 * byte
    return memUsage < free
def memoryCovariance(X):
    """
    Estimates the memory needed for X.T @ X or X @ X.T (whichever is
    smaller) and raises FutureExceedsMemory if it will not fit.
    """
    n, p = X.shape
    free = virtual_memory().free * 0.95
    byte = 4 if '32' in str(X.dtype) else 8
    size = p if n > p else n
    memUsage = size**2 * byte
    if memUsage > free:
        raise FutureExceedsMemory()
def memorySVD(X):
    """
    Estimates the approximate memory usage of SVD(X) [transposed or not].
    How it's computed:
        X = U * S * VT
        U(n,p) * S(p) * VT(p,p)
    so RAM usage is approximately n*p + p + p^2, where p is the smaller
    dimension.
    ### TODO: Divide-and-Conquer SVD vs old SVD
    """
    n, p = X.shape
    if n < p: n, p = p, n   # ensure p is the smaller dimension
    free = virtual_memory().free * 0.95
    byte = 4 if '32' in str(X.dtype) else 8
    U = n*p
    S = p
    VT = p*p
    memUsage = (U + S + VT) * byte
    return memUsage < free
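
# Usage sketch (illustrative; unlike memoryCovariance, memorySVD returns
# a bool, so the caller decides how to react):
#
#   >>> if not memorySVD(X):
#   ...     raise FutureExceedsMemory()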
def _float(data):
    """
    Casts integer data to the matching float dtype (64-bit ints -> float64,
    everything else -> float32); float data is returned unchanged.
    """
    dtype = str(data.dtype)
    if 'f' not in dtype:
        if '64' in dtype:
            return data.astype(float64)
        return data.astype(float32)
    return data
@njit(fastmath = True, nogil = True, cache = True)
def traceXTX(X):
    """
    [Edited 18/10/2018]
    One drawback of truncated algorithms is that they can't output the
    correct variance explained ratios, since that would require the full
    eigenvalue decomposition. However, trace(XT @ X) = sum(eigenvalues),
    so this function outputs trace(XT @ X) efficiently without explicitly
    computing XT @ X.
    Changes --> now uses Numba, which is approx 20% faster.
    """
    n, p = X.shape
    s = 0
    for i in range(n):
        for j in range(p):
            xij = X[i, j]
            s += xij * xij
    return s
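
# Example (illustrative; the trace equals the sum of the eigenvalues of
# X.T @ X, which is what the variance explained ratio normalizes by):
#
#   >>> import numpy as np
#   >>> X = np.random.randn(200, 6)
#   >>> np.isclose(traceXTX(X), np.linalg.eigvalsh(X.T @ X).sum())
#   True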
def fastDot(A, B, C):
    """
    [Added 23/9/2018]
    [Updated 1/10/2018: error in calculating which order is faster]
    Computes a fast matrix multiplication of 3 matrices.
    Performs either (A @ B) @ C or A @ (B @ C), whichever needs fewer
    operations.
    """
    size = A.shape
    n = size[0]
    p = size[1] if len(size) > 1 else 1
    size = B.shape
    k = size[1] if len(size) > 1 else 1
    size = C.shape
    d = size[1] if len(size) > 1 else 1

    # Forward (A @ B) @ C:  p*k*n + k*d*n = k*n*(p+d) operations
    forward = k*n*(p+d)
    # Backward A @ (B @ C): p*d*n + k*d*p = p*d*(n+k) operations
    backward = p*d*(n+k)

    if forward <= backward:
        return (A @ B) @ C
    return A @ (B @ C)
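
# Example (illustrative; with a thin C, the backward order avoids ever
# forming the large 1000 x 1000 intermediate A @ B):
#
#   >>> import numpy as np
#   >>> A = np.random.randn(1000, 50)
#   >>> B = np.random.randn(50, 1000)
#   >>> C = np.random.randn(1000, 2)
#   >>> out = fastDot(A, B, C)      # evaluates A @ (B @ C)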
def _XTX(XT):
    """
    [Added 30/9/2018]
    Computes XT @ X much faster than the naive XT @ X.
    Since XT @ X is symmetric, instead of doing the full matrix
    multiplication (which takes O(np^2) time), SYRK computes only one
    triangle, at roughly half the floating point operations.
    Note: only one triangle of the output is filled; use reflect()
    to complete the matrix.
    """
    if XT.dtype == float64:
        return dsyrk(1, XT, trans = 0).T
    return ssyrk(1, XT, trans = 0).T
def _XXT(XT):
    """
    [Added 30/9/2018]
    Computes X @ XT much faster than the naive X @ XT.
    Since X @ XT is symmetric, instead of doing the full matrix
    multiplication (which takes O(pn^2) time), SYRK computes only one
    triangle, at roughly half the floating point operations.
    Note: only one triangle of the output is filled; use reflect()
    to complete the matrix.
    """
    if XT.dtype == float64:
        return dsyrk(1, XT, trans = 1).T
    return ssyrk(1, XT, trans = 1).T
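
# Example (illustrative; SYRK fills only one triangle, so reflect() below
# is used to recover the full symmetric Gram matrix):
#
#   >>> import numpy as np
#   >>> X = np.random.randn(100, 5)
#   >>> G = reflect(_XTX(X.T))      # same as X.T @ X at ~half the flops
#   >>> np.allclose(G, X.T @ X)
#   True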
@njit(fastmath = True, nogil = True, cache = True)
def rowSum_0(X, norm = False):
    """
    [Added 17/10/2018]
    Computes the row-wise sum of squares (or the row norms if norm = True)
    for a dense matrix efficiently, instead of using einsum.
    """
    n, p = X.shape
    S = zeros(n, dtype = X.dtype)
    for i in range(n):
        s = 0
        Xi = X[i]
        for j in range(p):
            Xij = Xi[j]
            s += Xij * Xij
        S[i] = s
    if norm:
        S **= 0.5
    return S
@njit(fastmath = True, nogil = True, cache = True)
def rowSum_A(X, norm = False):
    """
    [Added 22/10/2018]
    Computes the sum of squares (or the norm if norm = True) for a
    dense 1D array efficiently, instead of using einsum.
    """
    n = len(X)
    s = 0
    for i in range(n):
        s += X[i]**2
    if norm:
        s **= 0.5
    return s
def rowSum(X, norm = False):
    """
    [Added 22/10/2018]
    Combines rowSum for matrices and 1D arrays.
    """
    if len(X.shape) > 1:
        return rowSum_0(X, norm)
    return rowSum_A(X, norm)
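
# Example (illustrative; matches einsum without building a temporary):
#
#   >>> import numpy as np
#   >>> X = np.random.randn(10, 3)
#   >>> np.allclose(rowSum(X), np.einsum('ij,ij->i', X, X))
#   True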
def _reflect(X):
    """
    See reflect(X, n_jobs = N) documentation.
    """
    n = len(X)
    for i in prange(1, n):
        Xi = X[i]
        for j in range(i):
            X[j, i] = Xi[j]
    return X

reflect_single = njit(_reflect, fastmath = True, nogil = True, cache = True)
reflect_parallel = njit(_reflect, fastmath = True, nogil = True, parallel = True)
def reflect(X, n_jobs = 1):
    """
    [Added 15/10/2018] [Edited 18/10/2018]
    Reflects the lower triangle of a matrix to the upper triangle
    efficiently. Notice this is much faster than, say, X += X.T or the naive:

        for i in range(n):
            for j in range(i, n):
                X[i,j] = X[j,i]

    In fact, it is much faster to traverse vertically:

        for i in range(1, n):
            Xi = X[i]
            for j in range(i):
                X[j,i] = Xi[j]

    The trick is the X[i] row view, which reduces array accesses.
    """
    X = reflect_parallel(X) if n_jobs != 1 else reflect_single(X)
    return X
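
# Example (illustrative; only the lower triangle is mirrored upward, the
# diagonal is left untouched):
#
#   >>> import numpy as np
#   >>> X = np.tril(np.random.randn(4, 4))
#   >>> X = reflect(X)
#   >>> np.allclose(X, X.T)
#   True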
def addDiagonal(X, c = 1):
    """
    [Added 11/11/2018]
    Adds c to the diagonal of a square matrix in place.
    """
    n = X.shape[0]
    X.flat[::n+1] += c

def setDiagonal(X, c = 1):
    """
    [Added 11/11/2018]
    Sets the diagonal of a square matrix to c in place.
    """
    n = X.shape[0]
    X.flat[::n+1] = c
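
# Example (illustrative; the strided view X.flat[::n+1] addresses exactly
# the diagonal of a square n x n matrix):
#
#   >>> import numpy as np
#   >>> X = np.zeros((3, 3))
#   >>> addDiagonal(X, 2.0)         # X is now 2 * I
#   >>> setDiagonal(X, 0.0)         # diagonal zeroed in place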