import numpy as np
import xarray as xr
from ..pyroscan import PyroScan
[docs]
class SaturationRules:
r"""
Contains all the different saturation rules that can be applied
Need a PyroScan object to apply the rule to
"""
[docs]
def __init__(self, pyro_scan: PyroScan):
self.pyro_scan = pyro_scan
[docs]
def mg_saturation(
self,
Q0: float = 25.0,
alpha: float = 2.5,
gamma_exb: float = 0.0,
output_convention: str = "pyrokinetics",
gamma_tolerance: float = 0.001,
ky_dim: str = "ky",
theta0_dim: str = "theta0",
):
"""
Please see doi:10.1017/S0022377824001107 for details on this quasi-linear model.
Parameters
----------
Q0: float
Heat flux absolute magnitude from fitted model
alpha: float
Power to raise QL metric Lambda
gamma_exb: float
Rate of ExB shear. Note must be in units matching output_convention
output_convention: str
Choice of output convention
gamma_tolerance: float
Tolerance of growth rate to be included in calculation
ky_dim: str
Name of the dimension in PyroScan object corresponding to ky
theta0_dim
Name of the dimension in PyroScan object corresponding to theta0
Returns
-------
gk_output: xr.Dataset
Dataset containing heat and particle flux with dimensions of Species and
other in original PyroScan object
"""
if not hasattr(self.pyro_scan, "gk_output"):
self.pyro_scan.load_gk_output(output_convention=output_convention)
data = self.pyro_scan.gk_output
pyro = self.pyro_scan.base_pyro
# Units factor to account for training done in pyro units
pyro_units = pyro.norms.pyrokinetics
units = getattr(pyro.norms, output_convention)
kys = data[ky_dim] * data[ky_dim].units
if theta0_dim in data.dims:
theta0s = data[theta0_dim]
else:
theta0s = [0.0]
shat = pyro.local_geometry.shat
theta = data["theta"].data
eigenfunctions = data["eigenfunctions"]
growth_rate_tolerance = data["growth_rate_tolerance"]
growth_rate = data["growth_rate"].where(
growth_rate_tolerance < gamma_tolerance, 0.0
)
heat_tot = data["heat"].sum(dim=("field", "species"))
heat = (
data["heat"]
.where(growth_rate_tolerance < gamma_tolerance, 0.0)
.sum(dim="field")
/ heat_tot
)
particle = (
data["particle"]
.where(growth_rate_tolerance < gamma_tolerance, 0.0)
.sum(dim="field")
/ heat_tot
)
field_squared = (
np.abs(eigenfunctions.where(growth_rate_tolerance < gamma_tolerance, 0.0))
** 2
)
# Set up Jacobian and k_perp
k_perp = field_squared.data * 0.0 * kys.data.units
# Need to load MetricTerms
pyro.load_metric_terms(ntheta=1024)
theta_metric = pyro.metric_terms.regulartheta
# Add one to cover additional theta grids (like in CGYRO)
nperiod = pyro.numerics.nperiod + 1
g_tt = pyro.metric_terms.field_aligned_covariant_metric("theta", "theta")
l_theta = theta_metric
# Geometric theta on grid on evenly spaced l(theta) grid
even_l_theta = np.linspace(-np.pi, np.pi, len(l_theta))
theta_geo = np.interp(even_l_theta, l_theta, theta_metric)
m = np.linspace(-(nperiod - 1), nperiod - 1, 2 * nperiod - 1)
ntheta = len(theta_geo) - 1
m = np.repeat(m, ntheta)
theta_geo_long = np.tile(theta_geo[:-1], 2 * nperiod - 1) + 2.0 * np.pi * m
# Append final point
theta_geo_final = 2 * np.pi * (m[-1]) + np.pi
theta_geo_long = np.append(theta_geo_long, theta_geo_final)
# L(theta) grid on regular theta grid long
m = np.linspace(-(nperiod - 1), nperiod - 1, 2 * nperiod - 1)
ntheta = len(l_theta) - 1
m = np.repeat(m, ntheta)
l_theta_long = np.tile(l_theta[:-1], 2 * nperiod - 1) + 2.0 * np.pi * m
l_theta_final = l_theta[-1] + 2.0 * np.pi * m[-1]
l_theta_long = np.append(l_theta_long, l_theta_final)
for itheta0, theta0 in enumerate(theta0s.data):
theta_long, k_perp_long = pyro.metric_terms.k_perp(
ky=1.0, theta0=theta0, nperiod=nperiod
)
k_perp_interp = np.interp(theta, theta_long, k_perp_long)
for iky, ky in enumerate(kys.data):
# Technically k_perp / ky
k_perp[iky, itheta0, :, :] = k_perp_interp.m * ky
bmag = pyro.metric_terms.B_magnitude
g_tt_long = np.tile(g_tt[:-1], 2 * nperiod - 1)
bmag_long = np.tile(bmag[:-1], 2 * nperiod - 1)
g_tt_long = np.append(g_tt_long, g_tt[-1])
bmag_long = np.append(bmag_long, bmag[-1])
pyro_jacob = pyro.metric_terms.dpsidr * np.sqrt(g_tt_long) / bmag_long
bmag_balloon = np.interp(theta, theta_geo_long, bmag_long)
jacobian_long = pyro_jacob
# Extend onto ballooning space
jacobian = np.interp(theta, theta_long, jacobian_long)[
np.newaxis, np.newaxis, np.newaxis, :
]
# Account for GS2 field normalisation used in training
b_units = bmag_balloon.units
bmag = field_squared * 0.0 * b_units + bmag_balloon
field_correction = xr.where(bmag.field == "bpar", bmag, 1.0 * b_units)
field_correction = xr.where(
field_correction.field == "apar", field_correction * 0.5, field_correction
)
field_squared *= field_correction**-2
# Field ratios
field_factor = np.sqrt(
field_squared.max(dim="theta")
/ field_squared.sel(field="phi").max(dim="theta")
)
# Numerator in Lambda
numerator = (field_squared * jacobian).integrate(coord="theta")
# Denominator
denom = (field_squared * jacobian * k_perp**2).integrate(coord="theta")
# Sum over fields
ql_metric_full = (growth_rate * numerator * field_factor / denom).sum(
dim="field"
) / (units.vref / units.lref * units.rhoref)
# Q_ql = Q_ql * Lambda
heat_ql = heat * ql_metric_full
particle_ql = particle * ql_metric_full
# Find theta0_max
max_gam = growth_rate.max(dim=theta0_dim)
thmax = gamma_exb / (shat * max_gam)
thmax = thmax.where(thmax.pint.dequantify() < np.pi, np.pi)
if len(theta0s) > 1:
thmax = thmax.where(thmax > theta0s[1], theta0s[1])
# Select relevant theta0
heat_theta0 = heat_ql.where(theta0s <= thmax, 0.0) / thmax
particle_theta0 = particle_ql.where(theta0s <= thmax, 0.0) / thmax
ql_metric_theta0 = ql_metric_full.where(theta0s <= thmax, 0.0) / thmax
# Integrate up to theta0_max
heat_ky = heat_theta0.integrate(coord=theta0_dim)
particle_ky = particle_theta0.integrate(coord=theta0_dim)
ql_metric_ky = ql_metric_theta0.integrate(coord=theta0_dim)
else:
# Select relevant theta0 only
heat_ky = heat_ql.where(theta0s == theta0s.data[0], drop=True)
particle_ky = particle_ql.where(theta0s == theta0s.data[0], drop=True)
ql_metric_ky = ql_metric_full.where(theta0s == theta0s.data[0], drop=True)
# Integrate over ky
heat = heat_ky.integrate(coord=ky_dim) * kys.data.units
particle = particle_ky.integrate(coord=ky_dim) * kys.data.units
ql_metric = ql_metric_ky.integrate(coord=ky_dim) * kys.data.units
if not hasattr(Q0, "units"):
Q0 *= (
units.nref * units.tref * units.vref * (units.rhoref / units.lref) ** 2
)
G0 = Q0 / units.tref
Q_gb_pyro_units = (
pyro_units.nref
* pyro_units.tref
* pyro_units.vref
* (pyro_units.rhoref / pyro_units.lref) ** 2
)
units_factor = (
(
1
* Q_gb_pyro_units
/ (pyro_units.vref * pyro_units.rhoref / pyro_units.lref) ** alpha
)
.to(units)
.m
)
qflux = Q0 * heat * ql_metric ** (alpha - 1) * units_factor
Gamma_gb_pyro_units = (
pyro_units.nref
* pyro_units.vref
* (pyro_units.rhoref / pyro_units.lref) ** 2
)
units_factor = (
(
1
* Gamma_gb_pyro_units
/ (pyro_units.vref * pyro_units.rhoref / pyro_units.lref) ** alpha
)
.to(units)
.m
)
# Full flux calculation
gflux = G0 * particle * ql_metric ** (alpha - 1) * units_factor
gk_output = xr.Dataset()
gk_output["heat"] = qflux
gk_output["particle"] = gflux
gk_output["lambda"] = ql_metric_full
return gk_output