import warnings
import numpy as np
import xarray as xr
from numpy.typing import ArrayLike
from scipy.integrate import simpson
from scipy.sparse.linalg import eigs
from ..pyro import Pyro
from ..units import ureg as units
[docs]
class Diagnostics:
"""
Contains all the diagnistics that can be applied to simulation output data.
Currently, this class contains only the function to generate a Poincare map,
but new diagnostics will be available in future.
Please call "load_gk_output" before attempting to use any diagnostic
Parameters
----------
pyro: Pyro object containing simulation output data (and geometry)
"""
[docs]
def __init__(self, pyro: Pyro):
self.pyro = pyro
[docs]
def gs2_geometry_terms(self, ntheta_multiplier: int = 1):
nperiod = self.pyro.numerics.nperiod
ntheta = ((2 * nperiod) - 1) * self.pyro.numerics.ntheta * ntheta_multiplier
theta_max = ((2 * nperiod) - 1) * np.pi
theta_even = np.linspace(-theta_max, theta_max, ntheta)
self.pyro.load_metric_terms(theta=theta_even)
metric = self.pyro.metric_terms
metric.to(self.pyro.norms.gs2, self.pyro.norms.context)
theta = metric.regulartheta * units.radians
dpdrho = metric.mu0dPdr
dpsidrho = metric.dpsidr
# Metric tensor terms
g_ra_contr = metric.field_aligned_contravariant_metric("r", "alpha")
g_aa = metric.field_aligned_contravariant_metric("alpha", "alpha")
g_at = metric.field_aligned_covariant_metric("alpha", "theta")
grad_alpha = np.sqrt(g_aa)
g_tt = metric.field_aligned_covariant_metric("theta", "theta")
g_rt = metric.field_aligned_covariant_metric("r", "theta")
g_rr = metric.field_aligned_contravariant_metric("r", "r")
g_rt_contr = metric.toroidal_contravariant_metric("r", "theta")
g_rr_contr = metric.toroidal_contravariant_metric("r", "r")
# Jacobian and derivatives
jacob = metric.Jacobian
# R and derivatives
R = metric.R
dR_dr = metric.dRdr
# Z and derivatives
Z = metric.Z
dZ_dr = metric.dZdr
# B_mag and derivatives
B_mag = metric.B_magnitude
dB_dr = metric.dB_magnitude_dr
dB_dtheta = metric.dB_magnitude_dtheta
# Drifts
gbdrift = -2 * (dB_dr - g_rt / g_tt * dB_dtheta) / B_mag
gbdrift0 = 2 * metric.dqdr * g_at * dB_dtheta / (g_tt * B_mag)
press = -2 * dpdrho * jacob / (np.sqrt(g_tt) * B_mag * dpsidrho)
cvdrift = gbdrift + press
cvdrift0 = gbdrift0
# GDS values
gds2 = (grad_alpha * dpsidrho) ** 2
gds21 = -(dpsidrho**2) * g_ra_contr * metric.dqdr
gds22 = (dpsidrho * metric.dqdr) ** 2 * g_rr
# Parallel gradient
gradpar = 1 / np.sqrt(g_tt)
# B field
bmag = B_mag
bpol = dpsidrho * np.sqrt(g_rr) / R
# Grad rho
grho = np.sqrt(g_rr)
# Eikonal
S = -metric.alpha
dS_dr = (
-dpsidrho
/ np.sqrt(g_rr_contr)
* (metric.dalpha_dr * g_rr_contr + metric.dalpha_dtheta * g_rt_contr)
)
return {
"dpdrho": dpdrho,
"theta": theta,
"bmag": bmag,
"bpol": bpol,
"gradpar": gradpar,
"cvdrift": cvdrift,
"cvdrift0": cvdrift0,
"gds2": gds2,
"gbdrift": gbdrift,
"gbdrift0": gbdrift0,
"grho": grho,
"gds21": gds21,
"gds22": gds22,
"jacob": jacob,
"rplot": R,
"zplot": Z,
"rprime": dR_dr,
"zprime": dZ_dr,
"aplot": S,
"aprime": dS_dr,
}
[docs]
def ideal_ballooning_solver(self, theta0: float = 0.0):
r"""
Adapted from ideal-ballooning-solver
https://github.com/rahulgaur104/ideal-ballooning-solver/blob/master/ball_scan.py
Parameters
----------
theta0: float
Ballooning angle
Returns
-------
gamma: Float
Ideal ballooning growth rate
"""
geometry_terms = self.gs2_geometry_terms()
dpdrho = geometry_terms["dpdrho"]
theta = geometry_terms["theta"]
bmag = geometry_terms["bmag"]
gradpar = geometry_terms["gradpar"]
cvdrift = geometry_terms["cvdrift"]
cvdrift0 = geometry_terms["cvdrift0"]
gds2 = geometry_terms["gds2"]
gds21 = geometry_terms["gds21"]
gds22 = geometry_terms["gds22"]
# theta0 correction
gds2 = gds2 + 2 * theta0 * gds21 + theta0**2 * gds22
cvdrift = cvdrift + theta0 * cvdrift0
gamma, X_arr, dX_arr, g_arr, c_arr, f_arr = gamma_ball_full(
dpdrho, theta, bmag, gradpar, cvdrift, gds2
)
return gamma
[docs]
def bicoherence(self, fluctuation, wavenumber_tolerance=1e-5, stationary=True):
r"""
Perform bicoherence analysis for a given DataArray
Bispectrum given by:
.. math::
Bi(k_{x,1}, k_{y,1}, k_{x,2}, k_{y,2}) = \langle f_1 * f_2 * f_3^\dagger\rangle_{t}
The bicoherence is normalised to
.. math::
b^2 = \frac{|Bi|^2}{\langle|f_1 * f_2|^2\rangle_{t} \langle|f_3|^2\rangle_{t}}
where
.. math::
f_1 = fluctuation(k_{x,1}, k_{y,1}, t)
.. math::
f_2 = fluctuation(k_{x,2}, k_{y,2}, t)
.. math::
f_3 = fluctuation(k_{x,3}, k_{y,3}, t)
with :math:`\langle\rangle_{t}` being an average over time
where
:math:`k_{x,3} = k_{x,1}+k_{x,2}`
:math:`k_{y,3} = k_{y,1}+k_{y,2}`
Parameters
----------
fluctuation: xr.DataArray
Fluctuation dataset which must be a DataArray with
dimensions (:math:`k_x`, :math:`k_y`, :math:`t`)
wavenumber_tolerance: float
Tolerance to find matching wavenumbers in :math:`k_{x,3}, k_{y,3}`
Returns
-------
data: xr.Dataset
Dataset with DataArray of bicoherence and phase with dimensions
(:math:`k_{x,1}`, :math:`k_{y,1}`,:math:`k_{x,2}`, :math:`k_{y,2}`)
"""
time = fluctuation["time"].data
kx = fluctuation["kx"].data
ky = fluctuation["ky"].data
ntime = len(time)
nkx = len(kx)
nky = len(ky)
kx_max = max(abs(kx))
ky_max = max(abs(ky))
# Initialise different Arrays
bispectrum = xr.DataArray(
np.zeros((nkx, nky, nkx, nky)),
coords=[kx, ky, kx, ky],
dims=["kx1", "ky1", "kx2", "ky2"],
)
bicoherence = xr.DataArray(
np.zeros((nkx, nky, nkx, nky)),
coords=[kx, ky, kx, ky],
dims=["kx1", "ky1", "kx2", "ky2"],
)
phase = xr.DataArray(
np.zeros((nkx, nky, nkx, nky)),
coords=[kx, ky, kx, ky],
dims=["kx1", "ky1", "kx2", "ky2"],
)
# Create mesh grids of the kx and ky values
kx1, kx2 = np.meshgrid(kx, kx, indexing="ij")
ky1, ky2 = np.meshgrid(ky, ky, indexing="ij")
# Compute kx3 and ky3 with modular arithmetic
kx3 = kx1 + kx2
ky3 = ky1 + ky2
kx3 = np.where(abs(kx3) <= max(abs(kx)), kx3, kx3 % kx_max * np.sign(kx3))
ky3 = np.where(abs(ky3) <= max(abs(ky)), ky3, ky3 % ky_max * np.sign(ky3))
# Extract the relevant data slices based on calculated indices
# fft_data for (kx1, ky1), (kx2, ky2), (kx3, ky3) Currently in the form
fluctuation_kx1_ky1 = fluctuation.sel(
kx=kx1.ravel(), ky=ky1.ravel()
).values.reshape(nkx, nkx, nky, nky, ntime)
fluctuation_kx2_ky2 = fluctuation.sel(
kx=kx2.ravel(), ky=ky2.ravel()
).values.reshape(nkx, nkx, nky, nky, ntime)
fluctuation_kx3_ky3 = fluctuation.sel(
kx=kx3.ravel(),
ky=ky3.ravel(),
method="nearest",
tolerance=wavenumber_tolerance,
).values.reshape(nkx, nkx, nky, nky, ntime)
# Swap axes such that dimensions are (kx1, ky1, kx2, ky2)
fluctuation_kx1_ky1 = np.swapaxes(fluctuation_kx1_ky1, 1, 2)
fluctuation_kx2_ky2 = np.swapaxes(fluctuation_kx2_ky2, 1, 2)
fluctuation_kx3_ky3 = np.swapaxes(fluctuation_kx3_ky3, 1, 2)
if not stationary:
fluctuation_kx1_ky1 *= 1.0 / np.abs(fluctuation_kx1_ky1)
fluctuation_kx2_ky2 *= 1.0 / np.abs(fluctuation_kx2_ky2)
fluctuation_kx3_ky3 *= 1.0 / np.abs(fluctuation_kx3_ky3)
bispectrum_data = (
fluctuation_kx1_ky1 * fluctuation_kx2_ky2 * np.conj(fluctuation_kx3_ky3)
)
phase.data = np.mean(
np.arctan2(bispectrum_data.imag, bispectrum_data.real), axis=-1
)
power_total = np.mean(
np.abs(fluctuation_kx1_ky1 * fluctuation_kx2_ky2) ** 2, axis=-1
) * np.mean(np.abs(fluctuation_kx3_ky3) ** 2, axis=-1)
bispectrum.data = np.abs(np.mean(bispectrum_data, axis=-1))
bicoherence.data = np.abs(bispectrum) ** 2 / (power_total)
bicoherence = bicoherence.fillna(0.0)
data = bicoherence.to_dataset(name="bicoherence")
data["phase"] = phase
return data
[docs]
def cross_bicoherence(
self,
fluctuation1,
fluctuation2,
fluctuation3,
wavenumber_tolerance=1e-5,
stationary=True,
):
r"""
Perform cross bicoherence analysis for a given DataArray
Bispectrum given by:
.. math::
Bi(k_{x,1}, k_{y,1}, k_{x,2}, k_{y,2}) = \langle f_1 * f_2 * f_3^\dagger\rangle_{t}
The bicoherence is normalised to
.. math::
b^2 = \frac{|Bi|^2}{\langle|f_1 * f_2|^2\rangle_{t} \langle|f_3|^2\rangle_{t}}
where
.. math::
f_1 = fluctuation1(k_{x,1}, k_{y,1}, t)
.. math::
f_2 = fluctuation2(k_{x,2}, k_{y,2}, t)
.. math::
f_3 = fluctuation3(k_{x,3}, k_{y,3}, t)
with :math:`\langle\rangle_{t}` being an average over time
where
:math:`k_{x,3} = k_{x,1}+k_{x,2}`
:math:`k_{y,3} = k_{y,1}+k_{y,2}`
Parameters
----------
fluctuation1: xr.DataArray
First fluctuation dataset which must be a DataArray with
dimensions (:math:`k_x`, :math:`k_y`, :math:`t`)
fluctuation2: xr.DataArray
Second fluctuation dataset which must be a DataArray with
dimensions (:math:`k_x`, :math:`k_y`, :math:`t`)
fluctuation3: xr.DataArray
Third fluctuation dataset which must be a DataArray with
dimensions (:math:`k_x`, :math:`k_y`, :math:`t`)
wavenumber_tolerance: float
Tolerance to find matching wavenumbers in :math:`k_{x,3}, k_{y,3}`
Returns
-------
data: xr.Dataset
Dataset with DataArray of cross-bicoherence and phase with dimensions
(:math:`k_{x,1}`, :math:`k_{y,1}`,:math:`k_{x,2}`, :math:`k_{y,2}`)
"""
time = fluctuation1["time"].data
kx = fluctuation1["kx"].data
ky = fluctuation1["ky"].data
ntime = len(time)
nkx = len(kx)
nky = len(ky)
kx_max = max(abs(kx))
ky_max = max(abs(ky))
# Initialise different Arrays
bispectrum = xr.DataArray(
np.zeros((nkx, nky, nkx, nky)),
coords=[kx, ky, kx, ky],
dims=["kx1", "ky1", "kx2", "ky2"],
)
bicoherence = xr.DataArray(
np.zeros((nkx, nky, nkx, nky)),
coords=[kx, ky, kx, ky],
dims=["kx1", "ky1", "kx2", "ky2"],
)
phase = xr.DataArray(
np.zeros((nkx, nky, nkx, nky)),
coords=[kx, ky, kx, ky],
dims=["kx1", "ky1", "kx2", "ky2"],
)
# Create mesh grids of the kx and ky values
kx1, kx2 = np.meshgrid(kx, kx, indexing="ij")
ky1, ky2 = np.meshgrid(ky, ky, indexing="ij")
# Compute kx3 and ky3 with modular arithmetic
kx3 = kx1 + kx2
ky3 = ky1 + ky2
kx3 = np.where(abs(kx3) <= max(kx), kx3, kx3 % kx_max * np.sign(kx3))
ky3 = np.where(abs(ky3) <= max(ky), ky3, ky3 % ky_max * np.sign(ky3))
# Extract the relevant data slices based on calculated indices
# fft_data for (kx1, ky1), (kx2, ky2), (kx3, ky3) Currently in the form
fluctuation_kx1_ky1 = fluctuation1.sel(
kx=kx1.ravel(), ky=ky1.ravel()
).values.reshape(nkx, nkx, nky, nky, ntime)
fluctuation_kx2_ky2 = fluctuation2.sel(
kx=kx2.ravel(), ky=ky2.ravel()
).values.reshape(nkx, nkx, nky, nky, ntime)
fluctuation_kx3_ky3 = fluctuation3.sel(
kx=kx3.ravel(),
ky=ky3.ravel(),
method="nearest",
tolerance=wavenumber_tolerance,
).values.reshape(nkx, nkx, nky, nky, ntime)
# Swap axes such that dimensions are (kx1, ky1, kx2, ky2)
fluctuation_kx1_ky1 = np.swapaxes(fluctuation_kx1_ky1, 1, 2)
fluctuation_kx2_ky2 = np.swapaxes(fluctuation_kx2_ky2, 1, 2)
fluctuation_kx3_ky3 = np.swapaxes(fluctuation_kx3_ky3, 1, 2)
if not stationary:
fluctuation_kx1_ky1 *= 1.0 / np.abs(fluctuation_kx1_ky1)
fluctuation_kx2_ky2 *= 1.0 / np.abs(fluctuation_kx2_ky2)
fluctuation_kx3_ky3 *= 1.0 / np.abs(fluctuation_kx3_ky3)
bispectrum_data = (
fluctuation_kx1_ky1 * fluctuation_kx2_ky2 * np.conj(fluctuation_kx3_ky3)
)
phase.data = np.mean(
np.arctan2(bispectrum_data.imag, bispectrum_data.real), axis=-1
)
power_total = np.mean(
np.abs(fluctuation_kx1_ky1 * fluctuation_kx2_ky2) ** 2, axis=-1
) * np.mean(np.abs(fluctuation_kx3_ky3) ** 2, axis=-1)
bispectrum.data = np.abs(np.mean(bispectrum_data, axis=-1))
bicoherence.data = np.abs(bispectrum) ** 2 / (power_total)
bicoherence = bicoherence.fillna(0.0)
data = bicoherence.to_dataset(name="bicoherence")
data["phase"] = phase
return data
[docs]
def gamma_ball_full(
dPdrho, theta_PEST, B, gradpar, cvdrift, gds2, vguess=None, sigma0=0.42
):
r"""
Ideal-ballooning growth rate finder.
This function uses a finite-difference method
to calculate the maximum growth rate against the
infinite-n ideal ballooning mode. The equation being solved is
:math::`\frac{\partial}{\partial \theta} \bigg( g \frac{\partial X}{\partial \theta} \bigg) + c X - \lambda f X = 0, g, f > 0`
where
:math::`g = \nabla_{||} gds2 / |B|`
:math::`c = \frac{1}{\nabla_{||}} \frac{\partial p}{\partial \rho} cvdrift`
:math::`f = \frac{gds2}{|B|^3} \frac{1}{\nabla_{||}}`
are needed along a field line to solve the ballooning equation once.
:math::`gds2 = \frac{\partial\psi}{\partial \rho}^2 |\nabla \alpha|^2`, :math::`\alpha = \phi - q (\theta - \theta_0)`
which is the field line bending term
:math::`\nabla_{||} = \hat{b} \cdot \vec{\nabla} \theta` which is the parallel gradient,
:math::`cvdrift = \frac{\partial\psi}{\partial\rho}^2 (\hat{b} \times (\hat{b} \cdot \vec{\nabla} \hat{b})) \cdot \vec{\nabla} \alpha`
which is the curvature drift
Parameters
----------
dPdrho: float
Gradient of the total plasma pressure w.r.t the radial coordinate rho
theta_PEST: ArrayLike
Vector of grid points in a straight field line theta (PEST coordinates)
B: ArrayLike
The magnetic field strength
gradpar: ArrayLike
Parallel gradient
cvdrift: ArrayLike
Geometric factor in curvature drift
gds2: ArrayLike
Field line bending term
vguess: ArrayLike
starting guess for the eigenfunction (length in N_theta_PEST -2)
sigma0: float
starting guess for the eigenvalue (square of the growth rate). Must always be greater than the final value. Choose a large value
Returns
-------
gam: Float
Ideal ballooning growth rate
"""
theta_ball = theta_PEST
# ntheta = len(theta_ball)
# Note that gds2 is (dpsidrho*|grad alpha|/(a_N*B_N))**2.
g = np.abs(gradpar) * gds2 / (B)
c = -1 * dPdrho * cvdrift * 1 / (np.abs(gradpar) * B)
f = gds2 / B**2 * 1 / (np.abs(gradpar) * B)
len1 = len(g)
# Uniform half theta ball
theta_ball_u = np.linspace(theta_ball[0], theta_ball[-1], len1)
# TODO pint=0.23 doesnt support np.diag
g_u = np.interp(theta_ball_u, theta_ball, g).m
c_u = np.interp(theta_ball_u, theta_ball, c).m
f_u = np.interp(theta_ball_u, theta_ball, f).m
# uniform theta_ball on half points with half the size, i.e., only from [0, (2*nperiod-1)*np.pi]
theta_ball_u_half = (theta_ball_u[:-1] + theta_ball_u[1:]) / 2
h = np.diff(theta_ball_u_half)[2].m
g_u_half = np.interp(theta_ball_u_half, theta_ball, g).m
g_u1 = g_u[:]
c_u1 = c_u[:]
f_u1 = f_u[:]
len2 = int(len1) - 2
A = np.zeros((len2, len2))
A = (
np.diag(g_u_half[1:-1] / f_u1[2:-1] * 1 / h**2, -1)
+ np.diag(
-(g_u_half[1:] + g_u_half[:-1]) / f_u1[1:-1] * 1 / h**2
+ c_u1[1:-1] / f_u1[1:-1],
0,
)
+ np.diag(g_u_half[1:-1] / f_u1[1:-2] * 1 / h**2, 1)
)
# Method without M is approx 3 X faster with Arnoldi iteration
# Perhaps, we should try dstemr as suggested by Max Ruth. However, I doubt if
# that will give us a significant speedup
w, v = eigs(A, 1, sigma=sigma0, v0=vguess, tol=5.0e-7, OPpart="r")
# w, v = eigs(A, 1, sigma=1.0, tol=1E-6, OPpart='r')
# w, v = eigs(A, 1, sigma=1.0, tol=1E-6, OPpart='r')
# ## Richardson extrapolation
X = np.zeros((len2 + 2,))
dX = np.zeros((len2 + 2,))
# X[1:-1] = np.reshape(v[:, idx_max].real, (-1,))/np.max(np.abs(v[:, idx_max].real))
X[1:-1] = np.reshape(v[:, 0].real, (-1,)) / np.max(np.abs(v[:, 0].real))
X[0] = 0.0
X[-1] = 0.0
dX[0] = (-1.5 * X[0] + 2 * X[1] - 0.5 * X[2]) / h
dX[1] = (X[2] - X[0]) / (2 * h)
dX[-2] = (X[-1] - X[-3]) / (2 * h)
dX[-1] = (0.5 * X[-3] - 2 * X[-2] + 1.5 * 0.0) / (h)
dX[2:-2] = 2 / (3 * h) * (X[3:-1] - X[1:-3]) - (X[4:] - X[0:-4]) / (12 * h)
Y0 = -g_u1 * dX**2 + c_u1 * X**2
Y1 = f_u1 * X**2
# plt.plot(range(len3+2), X, range(len3+2), dX); plt.show()
gam = simpson(Y0) / simpson(Y1)
# return np.sign(gam)*np.sqrt(abs(gam)), X, dX, g_u1, c_u1, f_u1
return gam, X, dX, g_u1, c_u1, f_u1