r"""`Equilibrium` defines a global 2D Tokamak equilibrium. This can be
used to load a `LocalGeometry` class at a given :math:`\psi_n`.
"""
from __future__ import annotations # noqa
import warnings
from copy import deepcopy
from textwrap import dedent
from typing import TYPE_CHECKING, List, Optional
import numpy as np
from numpy.typing import ArrayLike
from pyloidal.cocos import Transform as TransformCOCOS
from pyloidal.cocos import identify_cocos
from scipy.integrate import cumulative_simpson
from pyrokinetics._version import __version__
from pyrokinetics.dataset_wrapper import DatasetWrapper
from pyrokinetics.file_utils import FileReader, ReadableFromFile
from pyrokinetics.typing import PathLike
from pyrokinetics.units import UnitSpline, UnitSpline2D
from pyrokinetics.units import ureg as units
from .flux_surface import FluxSurface, _flux_surface_contour
from .utils import eq_units
if TYPE_CHECKING:
import matplotlib.pyplot as plt
[docs]
class EquilibriumCOCOSWarning(UserWarning):
pass
[docs]
class Equilibrium(DatasetWrapper, ReadableFromFile):
r"""
Contains a solution of the Grad-Shafranov equation, which defines a tokamak plasma
equilibrium. Users are not expected to initialise ``Equilibrium`` objects directly,
and in most cases should instead make use of the function ``read_equilibrium``,
which can read popular plasma equilibrium file types such as GEQDSK, generated by
tools such as FreeGS_ and EFIT_, or netCDF4 files generated by TRANSP_ or VMEC_.
The user may specify the COCOS convention followed by their inputs. Alternatively,
If supplied with ``b_axis``, ``current``, and (optionally) ``clockwise_phi``, inputs
will be converted to the convention COCOS 11. Otherwise, it is assumed that the
inputs are already COCOS 11.
.. _FreeGS: https://github.com/freegs-plasma/freegs
.. _EFIT: https://omfit.io/modules/mod_EFIT++.html
.. _TRANSP: https://transp.pppl.gov/
.. _VMEC: https://princetonuniversity.github.io/STELLOPT/VMEC
Parameters
----------
R: ArrayLike, units [meter]
Linearly spaced and monotonically increasing 1D grid of major radius
coordinates. This is the radius from the central column of a tokamak, and not
the radial distance from the magnetic axis.
Z: ArrayLike, units [meter]
Linearly spaced and monotonically increasing 1D grid of tokamak z-coordinates.
This is usually the height above the plasma midplane, but z=0 may be set at any
reference point.
psi_RZ: ArrayLike, units [weber]
2D grid defining the poloidal magnetic flux function :math::`\psi` with respect
to ``R`` and ``Z``. Should have the shape ``(len(r), len(z))``. If supplied
in units of Webers per radian (i.e. following COCOS 1 to 8), this will be
converted.
psi: ArrayLike, units [weber]
1D grid defining the poloidal magnetic flux function. This grid defines
magnetic flux surface coordinates, on which most other parameters are defined.
``psi[0]`` should be the value of ``psi`` on the magnetic axis. ``psi`` should
be monotonically increasing or decreasing. If supplied in units of Weber per
radian (i.e. following COCOS 11 to 18), this will be converted.
psi_tor: ArrayLike, units [weber]
1D grid defining the toroidal magnetic flux function ``psi_tor``.
F: ArrayLike, units [meter * tesla]
1D grid defining the poloidal current function with respect to ``psi``. Should
have the same length as ``psi``.
FF_prime: ArrayLike, units [meter**2 * tesla**2 / weber]
1D grid defining the poloidal current function ``f`` multiplied by its
derivative with respect to ``psi``. Should have the same length as ``psi``.
p: ArrayLike, units [pascal]
1D grid defining the plasma pressure with respect to ``psi``. Should have the
same length as ``psi``.
p_prime: ArrayLike, units [pascal / weber]
1D grid defining the derivative of the plasma pressure with respect to ``psi``.
Should have the same length as ``psi``.
q: ArrayLike, units [dimensionless]
1D grid defining the 'safety factor' with respect to ``psi``. Should have same
length as ``psi``.
R_major: ArrayLike, units [meter]
1D grid of the major radius positions of the center of each flux surface with
respect to ``psi``. Should have the same length as ``psi``. This should be given
by the mean of the maximum and minimum major radii of the flux surface.
``R_major[0]`` should be the major radius of the magnetic axis.
r_minor: ArrayLike, units[meter]
1D grid of the minor radius of each flux surface with respect to ``psi``. Should
have the same length as ``psi``. This should be half of the difference between
the maximum and minimum major radii of a flux surface. ``r_minor[0]`` should be
0.0.
Z_mid: ArrayLike, units [meter]
1D grid of the z-midpoint of each flux surface with respace to ``psi``. Should
have the same length as ``psi``. This should be the mean of the maximum and
minimum z-positions of the flux surface. ``Z_mid[0]`` should be the z-position
of the magnetic axis.
psi_lcfs: float, units [weber]
The value of ``psi`` on the last closed flux surface (LCFS).
a_minor: float, units [meter]
The minor radius of the last closed flux surface (LCFS). The minor radius of a
flux surface is half of the difference between its maximum and minimum major
radii.
B_axis: Optional[float], units [tesla]
Vacuum toroidal magnetic field. Used to determine COCOS convention of the
inputs.
current: Optional[float], units [ampere]
Plasma current. Used to determine COCOS convention of the inputs.
clockwise_phi: bool, default False
Determines whether the :math:`\phi` grid increases clockwise or anti-clockwise
when viewed from above. Used to determine COCOS convention of the inputs.
cocos: Optional[int]
Asserts that the inputs follow a particular COCOS convention. If set, prevents
the automatic detection of COCOS conventions using the values of ``b_axis``,
``current``, ``q``, ``psi``, ``r_minor``, and ``clockwise_phi``. Results will
be converted from this convention to COCOS 11. Possible values are between 1
to 8, or between 11 to 18.
eq_type: Optional[str]
A label denoting the type of Equilibrium, such as "GEQDSK", "TRANSP", "VMEC",
etc.
surface_interps: Optional[dict]
A dictionary of interpolations of R, Z and Bpol as a function of psin and theta
Useful for codes that directly spit out these surfaces
Attributes
----------
data: xarray.Dataset
The internal representation of the ``Equilibrium`` object. The functions
``__getattr__`` and ``__getitem__`` redirect most attribute/indexing lookups
here, but the Dataset itself may be accessed directly by the user if they wish
to perform more complex manipulations.
R_axis: float, units [meter]
Major radius position of the magnetic axis.
Z_axis: float, units [meter]
Vertical position of the magnetic axis, in meters.
psi_axis: float, units [weber]
Poloidal magnetic flux function :math:`\psi` on the magnetic axis.
psi_lcfs: float, units [weber]
Poloidal magnetic flux function :math:`\psi` on the last closed flux surface.
a_minor: float, units [meter]
Minor radius of the last closed flux surface.
dR: float, units [meter]
Grid spacing in the radial direction.
dZ: float, units [meter]
Grid spacing in the vertical direction.
eq_type: str
Type of equilibrium.
See Also
--------
read_equilibrium: Create ``Equilibrium`` from a file.
FluxSurface: An individual flux surface created from an ``Equilibrium``.
Notes
-----
Here we make use of COCOS 11 [Sauter et al. 2013], which is the ITER standard for
defining a tokamak coordinate system:
* The cylindrical coordinate system representing the whole Tokamak is right-handed
and follows the ordering :math:`(R,\theta,Z)`.
* When viewed from above the Tokamak, the toroidal angle :math:`\phi` increases in
the *counter-clockwise* direction.
* The coordinate system in the poloidal plane is right-handed and follows the
ordering :math:`(r,\theta,\phi)`.
* When viewing the poloidal plane on the right hand side of the major vertical axis,
the poloidal angle `\theta` increases in the *clockwise* direction.
The Grad-Shafranov equation [1]_ may be written:
.. math::
\frac{\partial^2 \psi}{\partial R^2}
- \frac{1}{R}\frac{\partial\psi}{\partial R}
+ \frac{\partial^2 \psi}{\partial Z^2}
= -\mu_0 r^2 p'(\psi) - F(\psi)F'(\psi)
:math:`\psi` is the poloidal magnetic flux function, defined as:
.. math::
\vec{B_p} = \frac{1}{2\pi r} \nabla\psi \times \nabla\phi
where :math:`\vec{B_p}` is the poloidal magnetic field vector. :math:`\psi` is
arbitrary to within an additive constant. The choice to include the factor of
:math:`2\pi` is part of the COCOS 11 convention. :math:`F` is the poloidal current
function, and is defined similarly:
.. math::
\mu_0 \vec{J_p} = \nabla F \times \nabla\phi
where :math:`\vec{J_p}` is the poloidal current density. Comparing this to Ampere's
law gives the relation:
.. math::
F = R B_\phi
It may be shown that :math:`F` is a function of :math:`\psi` only, as is the plasma
pressure :math:`p`, and the 'safety factor' :math:`q(\psi)`:
.. math::
q = \frac{1}{2\pi}
\int\frac{\vec{B}\cdot\nabla\phi}{\vec{B}\cdot\nabla\theta}d\theta
:math:`q` describes the number of times a magnetic field line wraps around the
toroidal direction before arriving at the same point in the poloidal plane.
:math:`\psi` is often used to index flux surfaces, so one can effectively work
in the poloidal coordinate system :math:`(\psi,\theta,\phi)`. It is common for
researchers to instead make use of the normalised :math:`\psi_n`, which is scaled
so that :math:`\psi_n=0` at the magnetic axis, and :math:`\psi_n=1` at the Last
Closed Flux Surface (LCFS).
.. [1] J. Wesson and D.J. Campbell: "Tokamaks", Oxford University Press, 2011,
chapter 3
"""
# This dict defines the units for each argument to __init__.
# The values are passed to the units.wraps decorator.
_init_units = {
"self": None,
"R": eq_units["len"],
"Z": eq_units["len"],
"psi_RZ": eq_units["psi"],
"psi": eq_units["psi"],
"F": eq_units["F"],
"FF_prime": eq_units["FF_prime"],
"p": eq_units["p"],
"p_prime": eq_units["p_prime"],
"q": eq_units["q"],
"R_major": eq_units["len"],
"r_minor": eq_units["len"],
"Z_mid": eq_units["len"],
"psi_lcfs": eq_units["psi"],
"a_minor": eq_units["len"],
"B_0": eq_units["B"],
"I_p": eq_units["I"],
"clockwise_phi": None,
"cocos": None,
"eq_type": None,
"surface_interps": None,
}
[docs]
@units.wraps(None, [*_init_units.values()], strict=False)
def __init__(
self,
R: ArrayLike,
Z: ArrayLike,
psi_RZ: ArrayLike,
psi: ArrayLike,
F: ArrayLike,
FF_prime: ArrayLike,
p: ArrayLike,
p_prime: ArrayLike,
q: ArrayLike,
R_major: ArrayLike,
r_minor: ArrayLike,
Z_mid: ArrayLike,
psi_lcfs: float,
a_minor: float,
B_0: Optional[float] = None,
I_p: Optional[float] = None,
clockwise_phi: bool = False,
cocos: Optional[int] = None,
eq_type: Optional[str] = None,
surface_interps: Optional[dict] = None,
) -> None:
# Determine COCOS convention
if cocos is not None:
# User has asserted the input COCOS convention
cocos_in = cocos
elif B_0 is not None and I_p is not None:
# Detect the input COCOS convention
cocos_in = identify_cocos(B_0, I_p, q, psi, clockwise_phi, r_minor)[0]
if cocos_in != 11:
warnings.warn(
f"Detected inputs have COCOS {cocos_in}. Converting to 11.",
EquilibriumCOCOSWarning,
)
else:
# Assume the input COCOS convention is already 11
cocos_in = 11
cocos_factors = TransformCOCOS(cocos_in, 11)
# Check the grids R, Z, and psi_RZ
R = np.asarray(R, dtype=float) * eq_units["len"]
Z = np.asarray(Z, dtype=float) * eq_units["len"]
psi_RZ = np.asarray(psi_RZ, dtype=float) * cocos_factors.psi * eq_units["psi"]
# Check that r and z are linearly spaced and increasing 1D grids
for name, grid in {"R": R, "Z": Z}.items():
if len(grid.shape) != 1:
raise ValueError(f"The grid {name} must be 1D.")
diff = np.diff(grid)
if not np.allclose(diff, diff[0], rtol=1e-4):
raise ValueError(f"The grid {name} must linearly spaced.")
if diff[0] <= 0.0:
raise ValueError(f"The grid {name} must have a positive spacing.")
# Check that psi_RZ has the correct dimensions
shape_2d = (len(R), len(Z))
if not np.array_equal(psi_RZ.shape, shape_2d):
raise ValueError(
f"The grid psi_RZ has shape {psi_RZ.shape}. "
f"It should have shape {shape_2d}."
)
# Create bivariate spline and partial derivatives over psi_RZ
self._psi_RZ_spline = UnitSpline2D(R, Z, psi_RZ)
# Splines of R, Z and Bpol as a function of psin and theta
self._surface_interps = surface_interps
# Check the psi grids
psi = np.asarray(psi, dtype=float) * cocos_factors.psi * eq_units["psi"]
F = np.asarray(F, dtype=float) * cocos_factors.f * eq_units["F"]
FF_prime = (
np.asarray(FF_prime, dtype=float)
* cocos_factors.ffprime
* eq_units["FF_prime"]
)
p = np.asarray(p, dtype=float) * eq_units["p"]
p_prime = (
np.asarray(p_prime, dtype=float)
* cocos_factors.pprime
* eq_units["p_prime"]
)
q = np.asarray(q, dtype=float) * cocos_factors.q * eq_units["q"]
R_major = np.asarray(R_major, dtype=float) * eq_units["len"]
r_minor = np.asarray(r_minor, dtype=float) * eq_units["len"]
Z_mid = np.asarray(Z_mid, dtype=float) * eq_units["len"]
@units.wraps(eq_units["psi"], (eq_units["q"], eq_units["psi"]))
def calculate_psi_tor(q, psi):
if psi[-1] < psi[0]:
psi_sign = -1
else:
psi_sign = 1
return psi_sign * cumulative_simpson(q, x=psi * psi_sign, initial=0.0)
psi_tor = calculate_psi_tor(q, psi)
Ip_sign = 1 if I_p is None else int(np.sign(I_p * cocos_factors.plasma_current))
# Ensure psi is 1D and monotonically increasing
if len(psi.shape) != 1:
raise ValueError("The grid psi must be 1D.")
if np.any(np.diff(psi * Ip_sign) <= 0):
raise ValueError("The grid 'sign(I_p) * psi' must have a positive spacing.")
# Ensure all psi grids have the correct shape
psi_grids = {
"F": F,
"FF_prime": FF_prime,
"p": p,
"p_prime": p_prime,
"q": q,
"R_major": R_major,
"r_minor": r_minor,
"Z_mid": Z_mid,
"psi_tor": psi_tor,
}
psi_shape = psi.shape
for name, grid in psi_grids.items():
if not np.array_equal(grid.shape, psi_shape):
raise ValueError(
f"The grid {name} has shape {grid.shape}. "
f"It should have shape {psi_shape}."
)
# Check that floats are valid
psi_lcfs = float(psi_lcfs) * cocos_factors.psi * eq_units["psi"]
if Ip_sign * psi_lcfs < Ip_sign * psi[0]:
raise ValueError(
"psi_lcfs should be greater than psi[0] when I_p > 0, and vice versa when I_p < 0"
)
a_minor = float(a_minor) * eq_units["len"]
if a_minor <= 0.0:
raise ValueError("a_minor should be a positive float.")
# Check r_minor and create normalised version 'rho'
if r_minor[0] != 0.0:
raise ValueError("r_minor[0] should be zero.")
if np.any(np.diff(r_minor) <= 0):
raise ValueError("The grid r_minor must have a positive spacing.")
rho = r_minor / a_minor
# Create normalised 1d psi grid
psi_n = (psi - psi[0]) / (psi_lcfs - psi[0])
# Create normalised rho toroidal
rho_tor = np.sqrt((psi_tor - psi_tor[0]) / (psi_tor[-1] - psi_tor[0]))
# Create spline functions for all psi grids with respect to psi
self._F_psi_spline = UnitSpline(psi, F)
self._FF_prime_psi_spline = UnitSpline(psi, FF_prime)
self._p_psi_spline = UnitSpline(psi, p)
self._p_prime_psi_spline = UnitSpline(psi, p_prime)
self._q_psi_spline = UnitSpline(psi, q)
self._R_major_psi_spline = UnitSpline(psi, R_major)
self._r_minor_psi_spline = UnitSpline(psi, r_minor)
self._Z_mid_psi_spline = UnitSpline(psi, Z_mid)
self._psi_tor_psi_spline = UnitSpline(psi, psi_tor)
# Assemble grids into underlying xarray Dataset
def make_var(dim, val, desc):
return (dim, val, {"units": str(val.units), "long_name": desc})
coords = {
"R": make_var("R_dim", R, "R Major Position"),
"Z": make_var("Z_dim", Z, "Vertical Position"),
"psi": make_var("psi_dim", psi, "Poloidal Flux Function"),
}
data_vars = {
"psi_RZ": make_var(("R_dim", "Z_dim"), psi_RZ, "Poloidal Flux Function"),
"F": make_var("psi_dim", F, "Poloidal Current Function"),
"FF_prime": make_var("psi_dim", FF_prime, "FF'"),
"p": make_var("psi_dim", p, "Plasma Pressure"),
"p_prime": make_var("psi_dim", p_prime, "p'(psi)"),
"q": make_var("psi_dim", q, "Safety Factor"),
"R_major": make_var("psi_dim", R_major, "Flux Surface Radial Midpoint"),
"r_minor": make_var("psi_dim", r_minor, "Flux Surface Width"),
"Z_mid": make_var("psi_dim", Z_mid, "Flux Surface Vertical Midpoint"),
"rho": make_var("psi_dim", rho, "Normalised Flux Surface Width"),
"psi_n": make_var("psi_dim", psi_n, "Normalised Poloidal Flux Function"),
"psi_tor": make_var("psi_dim", psi_tor, "Toroidal Flux"),
"rho_tor": make_var(
"psi_dim", rho_tor, "Normalised square root Toroidal Flux"
),
}
attrs = {
"R_axis": R_major[0],
"Z_axis": Z_mid[0],
"psi_axis": psi[0],
"psi_lcfs": psi_lcfs,
"a_minor": a_minor,
"dR": R[1] - R[0],
"dZ": Z[1] - Z[0],
"eq_type": str(eq_type),
}
if B_0 is not None:
attrs["B_0"] = B_0 * cocos_factors.b_toroidal * eq_units["B"]
if I_p is not None:
attrs["I_p"] = I_p * cocos_factors.plasma_current * eq_units["I"]
super().__init__(data_vars=data_vars, coords=coords, attrs=attrs)
[docs]
@units.wraps(eq_units["psi"], (None, units.dimensionless), strict=False)
def psi(self, psi_n: ArrayLike) -> np.ndarray:
r"""
Return actual poloidal magnetic flux function :math:`\psi` for a given
normalised :math:`\psi_n`.
Parameters
----------
psi_n: ArrayLike, units [dimensionless]
Normalised poloidal magnetic flux function, scaled such that
:math:`\psi_n=0` on the magnetic axis and :math:`\psi_n=1` on the last
closed flux surface. Results are undefined outside this range.
Returns
-------
np.ndarray, units [weber]
Actual poloidal magnetic flux. Note that the result is arbitrary within an
additive constant.
"""
# units introduced via self.psi_axis and self.psi_lcfs
return self.psi_axis + np.asanyarray(psi_n) * (self.psi_lcfs - self.psi_axis)
[docs]
@units.wraps(units.dimensionless, (None, eq_units["psi"]), strict=False)
def psi_n(self, psi: ArrayLike) -> np.ndarray:
r"""
Return normalised poloidal magnetic flux function :math:`\psi_n` for a given
actual :math:`\psi`.
Parameters
----------
psi: ArrayLike, units [weber]
Actual poloidal magnetic flux. Note that the result is arbitrary within an
additive constant.
Returns
-------
np.ndarray, units [dimensionless]
Normalised poloidal magnetic flux function, scaled such that
:math:`\psi_n=0` on the magnetic axis and :math:`\psi_n=1` on the last
closed flux surface. Results are undefined outside this range.
"""
psi = np.asanyarray(psi) * eq_units["psi"]
return (psi - self.psi_axis) / (self.psi_lcfs - self.psi_axis)
[docs]
@units.wraps(eq_units["F"], (None, units.dimensionless), strict=False)
def F(self, psi_n: ArrayLike) -> np.ndarray:
r"""
Return poloidal current function :math:`F` at the normalised poloidal magnetic
flux function :math:`\psi_n`.
Parameters
----------
psi_n: ArrayLike, units [dimensionless]
Returns
-------
np.ndarray, units [meter * tesla]
"""
return self._F_psi_spline(self.psi(psi_n))
[docs]
@units.wraps(eq_units["FF_prime"], (None, units.dimensionless), strict=False)
def FF_prime(self, psi_n: ArrayLike) -> np.ndarray:
r"""
Return poloidal current function :math:`F` multiplied by its derivative with
respect to :math:`\psi` for a given normalised poloidal magnetic flux function
:math:`\psi_n`.
Parameters
----------
psi_n: ArrayLike, units [dimensionless]
Returns
-------
np.ndarray, units [meter**2 * tesla**2 / weber]
"""
return self._FF_prime_psi_spline(self.psi(psi_n))
[docs]
def F_prime(self, psi_n: ArrayLike) -> np.ndarray:
r"""
Return derivative of the poloidal current function :math:`f` with respect to
:math:`\psi` for a given normalised poloidal magnetic flux function
:math:`\psi_n`.
Parameters
----------
psi_n: ArrayLike, units [dimensionless]
Returns
-------
np.ndarray, units [meter * tesla / weber]
"""
# Note: Does not need units.wraps, as both self.f and self.FF_prime do instead
return self.FF_prime(psi_n) / self.F(psi_n)
[docs]
@units.wraps(eq_units["p"], (None, units.dimensionless), strict=False)
def p(self, psi_n: ArrayLike) -> np.ndarray:
r"""
Return plasma pressure for a given normalised poloidal magnetic flux function
:math:`\psi_n`.
Parameters
----------
psi_n: ArrayLike, units [dimensionless]
Returns
-------
np.ndarray, units [pascal]
"""
return self._p_psi_spline(self.psi(psi_n))
[docs]
@units.wraps(eq_units["p_prime"], (None, units.dimensionless), strict=False)
def p_prime(self, psi_n: ArrayLike) -> np.ndarray:
r"""
Return derivative of the plasma pressure with respect to :math:`\psi` for a
given normalised poloidal magnetic flux function :math:`\psi_n`.
Parameters
----------
psi_n: ArrayLike, units [dimensionless]
Returns
-------
np.ndarray, units [pascal / weber]
"""
return self._p_prime_psi_spline(self.psi(psi_n))
[docs]
@units.wraps(eq_units["q"], (None, units.dimensionless), strict=False)
def q(self, psi_n: ArrayLike) -> np.ndarray:
r"""
Return the safety factor for a given normalised poloidal magnetic flux function
:math:`\psi_n`.
Parameters
----------
psi_n: ArrayLike, units [dimensionless]
Returns
-------
np.ndarray, units [dimensionless]
"""
return self._q_psi_spline(self.psi(psi_n))
[docs]
@units.wraps(eq_units["q_prime"], (None, units.dimensionless), strict=False)
def q_prime(self, psi_n: ArrayLike) -> np.ndarray:
r"""
Return the derivative of the safety factor with respect to :math:`\psi` for a
given normalised poloidal magnetic flux function :math:`\psi_n`.
Parameters
----------
psi_n: ArrayLike, units [dimensionless]
Returns
-------
np.ndarray, units [weber**-1]
"""
return self._q_psi_spline(self.psi(psi_n), derivative=1)
[docs]
@units.wraps(eq_units["len"], (None, units.dimensionless), strict=False)
def R_major(self, psi_n: ArrayLike) -> np.ndarray:
r"""
Return the major radius position of the midpoint of the flux surface represented
by a given normalised poloidal magnetic flux function :math:`\psi_n`.
Parameters
----------
psi_n: ArrayLike, units [dimensionless]
Returns
-------
np.ndarray, units [meter]
"""
return self._R_major_psi_spline(self.psi(psi_n))
[docs]
@units.wraps(eq_units["len_prime"], (None, units.dimensionless), strict=False)
def R_major_prime(self, psi_n: ArrayLike) -> np.ndarray:
r"""
Return derivative with respect to :math:`\psi` of the major radius position of
the midpoint of the flux surface represented by a given normalised poloidal
magnetic flux function :math:`\psi_n`.
Parameters
----------
psi_n: ArrayLike, units [dimensionless]
Returns
-------
np.ndarray, units [meter * radian / weber]
"""
return self._R_major_psi_spline(self.psi(psi_n), derivative=1)
[docs]
@units.wraps(eq_units["len"], (None, units.dimensionless), strict=False)
def r_minor(self, psi_n: ArrayLike) -> np.ndarray:
r"""
Return half of the width of the flux surface represented by a given normalised
poloidal magnetic flux function :math:`\psi_n`.
Parameters
----------
psi_n: ArrayLike, units [dimensionless]
Returns
-------
np.ndarray, units [meter]
"""
return self._r_minor_psi_spline(self.psi(psi_n))
[docs]
@units.wraps(eq_units["len_prime"], (None, units.dimensionless), strict=False)
def r_minor_prime(self, psi_n: ArrayLike) -> np.ndarray:
r"""
Return derivative with respect to :math:`\psi` of the width of the flux surface
represented by a given normalised poloidal magnetic flux function
:math:`\psi_n`.
Parameters
----------
psi_n: ArrayLike, units [dimensionless]
Returns
-------
np.ndarray, units [meter * radian / weber]
"""
return self._r_minor_psi_spline(self.psi(psi_n), derivative=1)
[docs]
@units.wraps(eq_units["len"], (None, units.dimensionless), strict=False)
def Z_mid(self, psi_n: ArrayLike) -> np.ndarray:
r"""
Return the vertical position of the midpoint of the flux surface represented by
a given normalised poloidal magnetic flux function :math:`\psi_n`.
Parameters
----------
psi_n: ArrayLike, units [dimensionless]
Returns
-------
np.ndarray, units [meter]
"""
return self._Z_mid_psi_spline(self.psi(psi_n))
[docs]
@units.wraps(eq_units["len_prime"], (None, units.dimensionless), strict=False)
def Z_mid_prime(self, psi_n: ArrayLike) -> np.ndarray:
r"""
Return the derivative with respect to :math:`\psi` of the vertical position of
the midpoint of the flux surface represented by a given normalised poloidal
magnetic flux function :math:`\psi_n`.
Parameters
----------
psi_n: ArrayLike, units [dimensionless]
Returns
-------
np.ndarray, units [meter * radian / weber]
"""
return self._Z_mid_psi_spline(self.psi(psi_n), derivative=1)
[docs]
def rho(self, psi_n: ArrayLike) -> np.ndarray:
r"""
Return the normalised minor radius of the flux surface represented by a given
normalised poloidal magnetic flux function :math:`\psi_n`. This is the same as
``r_minor/a_minor``.
Parameters
----------
psi_n: ArrayLike, units [dimensionless]
Returns
-------
np.ndarray, units [dimensionless]
"""
# Note: does not need to use units.wraps, as it defers to self.r_minor
return self.r_minor(psi_n) / self.a_minor
[docs]
@units.wraps(eq_units["psi"], (None, units.dimensionless), strict=False)
def psi_tor(self, psi_n: ArrayLike) -> np.ndarray:
r"""
Return the toroidal of the flux surface represented by a given
normalised poloidal magnetic flux function :math:`\psi_n`. This is the same as
:math:`\psi_{tor} = \int q d\psi`.
Parameters
----------
psi_n: ArrayLike, units [dimensionless]
Returns
-------
np.ndarray, units [weber]
"""
return self._psi_tor_psi_spline(self.psi(psi_n))
[docs]
def rho_tor(self, psi_n: ArrayLike) -> np.ndarray:
r"""
Return the square root of the normalised toroidal flux of the flux surface
represented by a given normalised poloidal magnetic flux function :math:`\psi_n`.
This is the same as :math:`\sqrt{psi_{tor}/\psi_{tor}^{LCFS}}`.
Parameters
----------
psi_n: ArrayLike, units [dimensionless]
Returns
-------
np.ndarray, units [dimensionless]
"""
# Note: does not need to use units.wraps, as it defers to self.r_minor
return np.sqrt(self.psi_tor(psi_n) / self.psi_tor(1.0))
[docs]
@units.wraps(eq_units["B"], (None, eq_units["len"], eq_units["len"]), strict=False)
def B_radial(self, R: ArrayLike, Z: ArrayLike) -> np.ndarray:
r"""
Return the radial magnetic flux density at the position(s) ``(R, Z)``.
Parameters
----------
R: ArrayLike, units [meter]
Major radius positions.
Z: ArrayLike, units [meter]
Vertical positions. Should have the same shape as ``R``, or be broadcastable
to ``R``.
Returns
-------
np.ndarray, units [tesla]
"""
R = np.asanyarray(R) * eq_units["len"]
Z = np.asanyarray(Z) * eq_units["len"]
return -self._psi_RZ_spline(R, Z, dy=1) / (2 * np.pi * R)
[docs]
@units.wraps(eq_units["B"], (None, eq_units["len"], eq_units["len"]), strict=False)
def B_vertical(self, R: ArrayLike, Z: ArrayLike) -> np.ndarray:
r"""
Return the vertical magnetic flux density at the position(s) ``(R, Z)``.
Parameters
----------
R: ArrayLike, units [meter]
Major radius positions.
Z: ArrayLike, units [meter]
Vertical positions. Should have the same shape as ``R``, or be broadcastable
to ``R``.
Returns
-------
np.ndarray, units [tesla]
"""
R = np.asanyarray(R) * eq_units["len"]
Z = np.asanyarray(Z) * eq_units["len"]
return self._psi_RZ_spline(R, Z, dx=1) / (2 * np.pi * R)
[docs]
@units.wraps(eq_units["B"], (None, eq_units["len"], eq_units["len"]), strict=False)
def B_poloidal(self, R: ArrayLike, Z: ArrayLike) -> np.ndarray:
r"""
Return the magnitude of the polooidal magnetic flux density at the position(s)
``(R, Z)``.
Parameters
----------
R: ArrayLike, units [meter]
Major radius positions.
Z: ArrayLike, units [meter]
Vertical positions. Should have the same shape as ``R``, or be broadcastable
to ``R``.
Returns
-------
np.ndarray, units [tesla]
"""
return np.hypot(self.B_radial(R, Z), self.B_vertical(R, Z))
[docs]
@units.wraps(eq_units["B"], (None, eq_units["len"], eq_units["len"]), strict=False)
def B_toroidal(self, R: ArrayLike, Z: ArrayLike) -> np.ndarray:
r"""
Return the toroidal magnetic flux density at the position(s) ``(R, Z)``.
Parameters
----------
R: ArrayLike, units [meter]
Major radius positions.
Z: ArrayLike, units [meter]
Vertical positions. Should have the same shape as ``R``, or be broadcastable
to ``R``.
Returns
-------
np.ndarray, units [tesla]
"""
R = np.asanyarray(R) * eq_units["len"]
Z = np.asanyarray(Z) * eq_units["len"]
# Get psi along the path, use this to get F
psi = self._psi_RZ_spline(R, Z)
return self.F(self.psi_n(psi)) / R
[docs]
def flux_surface(
self, psi_n: float, surface_interps=False, ntheta=256
) -> FluxSurface:
r"""
Generate a FluxSurface object representing the flux surface with normalised
poloidal magnetic flux function :math:`\psi_n`. This Dataset-like object
contains information such as the path swept out by the flux surface in
``(R, z)`` coordinates, the magnetic flux density along the path, and
quantities such as pressure, safety factor, the poloidal current function
:math:`f`, and their derivatives with respect to :math:`\psi` on the flux
surface. It also contains derivatives with respect to the minor radius of
the flux surface, and normalised versions of :math:`\psi`
psi_n: float
Normalised poloidal flux of FluxSurface to load
surface_interps: bool
Use direct surface interpolation instead of contour fitting
"""
# Get rz contours
# (the 'wraps' decorator used by _flux_surface_contour does not recognise
# xarray DataArrays)
if surface_interps and self._surface_interps is not None:
theta = np.linspace(2 * np.pi, 0, ntheta)
psi_ns = psi_n * np.ones(ntheta)
R = self._surface_interps["R"](psi_ns, theta)
Z = self._surface_interps["Z"](psi_ns, theta)
B_poloidal = self._surface_interps["Bpol"](psi_ns, theta)
else:
R, Z = _flux_surface_contour(
self["R"].data,
self["Z"].data,
self["psi_RZ"].data,
self.R_axis,
self.Z_axis,
self.psi(psi_n),
)
# Get magnetic field quantities around the contour path
B_poloidal = self.B_poloidal(R, Z)
# Get attributes on the flux surface
R_major = self.R_major(psi_n)
r_minor = self.r_minor(psi_n)
Z_mid = self.Z_mid(psi_n)
F = self.F(psi_n)
FF_prime = self.FF_prime(psi_n)
p = self.p(psi_n)
q = self.q(psi_n)
R_major_prime = self.R_major_prime(psi_n)
r_minor_prime = self.r_minor_prime(psi_n)
Z_mid_prime = self.Z_mid_prime(psi_n)
p_prime = self.p_prime(psi_n)
q_prime = self.q_prime(psi_n)
magnetic_shear = (r_minor / q) * (q_prime / r_minor_prime)
shafranov_shift = R_major_prime / r_minor_prime
midplane_shift = Z_mid_prime / r_minor_prime
pressure_gradient = p_prime / r_minor_prime
psi_gradient = 1.0 / r_minor_prime
return FluxSurface(
R=R,
Z=Z,
B_poloidal=B_poloidal,
R_major=R_major,
r_minor=r_minor,
Z_mid=Z_mid,
F=F,
FF_prime=FF_prime,
p=p,
q=q,
magnetic_shear=magnetic_shear,
shafranov_shift=shafranov_shift,
midplane_shift=midplane_shift,
pressure_gradient=pressure_gradient,
psi_gradient=psi_gradient,
a_minor=self.a_minor,
)
[docs]
def plot(
self,
quantity: str,
ax: Optional[plt.Axes] = None,
psi_n: bool = False,
show: bool = False,
x_label: Optional[str] = None,
y_label: Optional[str] = None,
**kwargs,
) -> plt.Axes:
r"""
Plot a quantity defined on the :math:`\psi` grid.
Parameters
----------
quantity: str
Name of the quantity to plot. Must be defined over the grid ``psi``.
ax: Optional[plt.Axes]
Axes object on which to plot. If not provided, a new figure is created.
psi_n: bool, default False
If True, plot against normalised :math:`\psi_n`. Otherwise, plot against
:math:`\psi`.
show: bool, default False
Immediately show Figure after creation.
x_label: Optional[str], default None
Overwrite the default x label. Set to an empty string ``""`` to disable.
y_label: Optional[str], default None
Overwrite the default y label. Set to an empty string ``""`` to disable.
**kwargs
Additional arguments to pass to Matplotlib's ``plot`` call.
Returns
-------
plt.Axes
The Axes object created after plotting.
Raises
------
ValueError
If ``quantity`` is not a quantity defined over the :math:`\psi` grid,
or is not the name of an Equilibrium quantity.
"""
import matplotlib.pyplot as plt
if quantity not in self.data_vars:
raise ValueError(
f"Must be provided with a quantity defined on the psi grid."
f"The quantity '{quantity}' is not recognised."
)
quantity_dims = self[quantity].dims
if "psi_dim" not in quantity_dims or len(quantity_dims) != 1:
raise ValueError(
f"Must be provided with a quantity defined on the psi grid."
f"The quantity '{quantity}' has coordinates {quantity_dims}."
)
if ax is None:
_, ax = plt.subplots(1, 1)
x_data = self["psi_n" if psi_n else "psi"]
if x_label is None:
x_label = x_data.long_name
if x_data.data.units != "":
x_label += f" / ${x_data.data.units:L~}$"
y_data = self[quantity]
if y_label is None:
y_label = y_data.long_name
if y_data.data.units != "":
y_label += f" / ${y_data.data.units:L~}$"
ax.plot(x_data.data.magnitude, y_data.data.magnitude, **kwargs)
if x_label != "":
ax.set_xlabel(x_label)
if y_label != "":
ax.set_ylabel(y_label)
if show:
plt.show()
return ax
[docs]
def contour(
self,
ax: Optional[plt.Axes] = None,
cbar: bool = True,
aspect: bool = False,
show: bool = False,
x_label: Optional[str] = None,
y_label: Optional[str] = None,
z_label: Optional[str] = None,
**kwargs,
) -> plt.Axes:
r"""
Plot :math:`\psi` over the :math:`(R, Z)` grid using a 2D coloured contour plot.
Uses Matplotlib's ``contourf``.
Parameters
----------
ax: Optional[plt.Axes]
Axes object on which to plot. If not provided, a new figure is created.
cbar: bool, default True
If True, builds a colourbar next to the generated plot.
aspect: bool, default False
If True, ensures the axes have the correct aspect ratio. If the user
supplies their own ``ax``, has no effect.
show: bool, default False
Immediately show Figure after creation.
x_label: Optional[str], default None
Overwrite the default x label. Set to an empty string ``""`` to disable.
y_label: Optional[str], default None
Overwrite the default y label. Set to an empty string ``""`` to disable.
z_label: Optional[str], default None
Overwrite the default colorbar label. Set to an empty string ``""`` to
disable. Ignored if ``cbar`` is False.
**kwargs
Additional arguments to pass to Matplotlib's ``contourf`` call.
Returns
-------
plt.Axes
The Axes object created after plotting.
"""
import matplotlib.pyplot as plt
if ax is None:
_, ax = plt.subplots(1, 1)
if aspect:
ax.set_aspect("equal")
x_data = self["R"]
if x_label is None:
x_label = f"{x_data.long_name} / ${x_data.data.units:L~}$"
y_data = self["Z"]
if y_label is None:
y_label = f"{y_data.long_name} / ${y_data.data.units:L~}$"
z_data = self["psi_RZ"]
if z_label is None:
z_label = f"{z_data.long_name} / ${z_data.data.units:L~}$"
x_grid, y_grid = np.meshgrid(
x_data.data.magnitude, y_data.data.magnitude, indexing="ij"
)
im = ax.contourf(x_grid, y_grid, z_data.data.magnitude, **kwargs)
ax.set_xlabel(x_label)
ax.set_ylabel(y_label)
if cbar:
fig = ax.get_figure()
colorbar = fig.colorbar(im, ax=ax)
colorbar.set_label(z_label)
if show:
plt.show()
return ax
def __deepcopy__(self, memodict):
"""Copy Equilibrium object in full, following references down the stack."""
# Create new object without calling __init__
new_equilibrium = Equilibrium.__new__(Equilibrium)
# Deep copy each member individually
for key, value in vars(self).items():
setattr(new_equilibrium, key, deepcopy(value, memodict))
return new_equilibrium
[docs]
def upscale(self, R_upscale: int, Z_upscale: int):
ds = self.data
# Get original 1D coordinates and their dimension names
R_old = ds["R"].data
Z_old = ds["Z"].data
R_dim = ds["R"].dims[0] # expected "R_dim"
Z_dim = ds["Z"].dims[0] # expected "Z_dim"
# Build new (uniform) upscaled coordinate arrays
R_new = np.linspace(np.min(R_old), np.max(R_old), R_upscale * len(R_old))
Z_new = np.linspace(np.min(Z_old), np.max(Z_old), Z_upscale * len(Z_old))
# Evaluate the spline on the new grid
# Prefer passing 1D arrays if your spline follows RectBivariateSpline semantics:
# psi_new = self._psi_RZ_spline(R_new, Z_new) # shape (len(R_new), len(Z_new))
# If your UnitSpline2D expects mesh inputs, keep this (ensure 'ij' indexing):
RR, ZZ = np.meshgrid(R_new, Z_new, indexing="ij")
psi_new = self._psi_RZ_spline(RR, ZZ) # shape (len(R_new), len(Z_new))
# --- Detach old indexes/coords to avoid alignment conflicts ---
# If R_dim/Z_dim are used as pandas indexes, reset them.
if R_dim in ds.indexes:
ds = ds.reset_index(R_dim, drop=True)
if Z_dim in ds.indexes:
ds = ds.reset_index(Z_dim, drop=True)
# Drop old coordinate variables so we can replace them cleanly
for coord_name in ["R", "Z"]:
if coord_name in ds.variables:
ds = ds.drop_vars(coord_name)
# Update coordinates with explicit dims
ds = ds.assign_coords(
R=(R_dim, R_new), Z=(Z_dim, Z_new), psi_RZ=((R_dim, Z_dim), psi_new)
)
# Update psi_RZ data variable with explicit dims
ds = ds.assign(psi_RZ=((R_dim, Z_dim), psi_new))
ds["R"].attrs["units"] = ds["R"].attrs.get("units", "m")
ds["Z"].attrs["units"] = ds["Z"].attrs.get("units", "m")
ds["psi_RZ"].attrs["units"] = ds["psi_RZ"].attrs.get("units", "Wb")
# Save back
self.data = ds
[docs]
@classmethod
def from_netcdf(
cls,
path: PathLike,
*args,
overwrite_metadata: bool = False,
overwrite_title: Optional[str] = None,
**kwargs,
) -> "Equilibrium":
"""Initialise an Equilibrium from a Pyrokinetics netCDF file.
This behaves like DatasetWrapper.from_netcdf, but also recreates the
internal spline objects so that methods depending on them (e.g.
B_radial, q, flux_surface, etc.) continue to work after reload.
"""
# Use the base implementation to build the Dataset-backed instance
instance = super().from_netcdf(
path,
*args,
overwrite_metadata=overwrite_metadata,
overwrite_title=overwrite_title,
**kwargs,
)
# Recreate 2D psi(R, Z) spline
R = instance["R"].data
Z = instance["Z"].data
psi_RZ = instance["psi_RZ"].data
instance._psi_RZ_spline = UnitSpline2D(R, Z, psi_RZ)
# Recreate 1D splines of psi-dependent profiles
psi = instance["psi"].data
F = instance["F"].data
FF_prime = instance["FF_prime"].data
p = instance["p"].data
p_prime = instance["p_prime"].data
q = instance["q"].data
R_major = instance["R_major"].data
r_minor = instance["r_minor"].data
Z_mid = instance["Z_mid"].data
psi_tor = instance["psi_tor"].data
instance._F_psi_spline = UnitSpline(psi, F)
instance._FF_prime_psi_spline = UnitSpline(psi, FF_prime)
instance._p_psi_spline = UnitSpline(psi, p)
instance._p_prime_psi_spline = UnitSpline(psi, p_prime)
instance._q_psi_spline = UnitSpline(psi, q)
instance._R_major_psi_spline = UnitSpline(psi, R_major)
instance._r_minor_psi_spline = UnitSpline(psi, r_minor)
instance._Z_mid_psi_spline = UnitSpline(psi, Z_mid)
instance._psi_tor_psi_spline = UnitSpline(psi, psi_tor)
return instance
[docs]
class EquilibriumReaderPyro(FileReader, file_type="Pyrokinetics", reads=Equilibrium):
"""
An Equilibrium reader class for netcdf files generated from Pyrokinetics Equilibrium
objects. These can be created using ``eq.to_netcdf("my_filename.nc")``.
"""
[docs]
def read_from_file(self, filename: PathLike, **kwargs) -> Equilibrium:
self.verify_file_type(filename)
eq = Equilibrium.from_netcdf(filename, **kwargs)
if eq.software_version != __version__:
warnings.warn(dedent(f"""\
Pyrokinetics Equilibrium object {filename} may not be compatible.
It was created with version {eq.software_version}, while this is
version {__version__}.
"""))
return eq
[docs]
def verify_file_type(self, filename: PathLike) -> None:
import xarray as xr
ds = xr.open_dataset(filename)
if ds.software_name != "Pyrokinetics":
raise ValueError
if ds.object_type != "Equilibrium":
raise ValueError
# Plain function versions of Equilibrium classmethods:
[docs]
def read_equilibrium(
path: PathLike, file_type: Optional[str] = None, **kwargs
) -> Equilibrium:
r"""A plain-function alternative to ``Equilibrium.from_file``."""
return Equilibrium.from_file(path, file_type=file_type, **kwargs)
[docs]
def supported_equilibrium_types() -> List[str]:
r"""A plain-function alternative to ``Equilibrium.supported_file_types``."""
return Equilibrium.supported_file_types()