from typing import Optional
# Can't use xarray, as TRANSP has a variable called X which itself has a dimension
# called X
import netCDF4 as nc
import numpy as np
from scipy.interpolate import RBFInterpolator
from ..file_utils import FileReader
from ..typing import PathLike
from ..units import UnitSpline
from ..units import ureg as units
from .equilibrium import Equilibrium
[docs]
class EquilibriumReaderTRANSP(FileReader, file_type="TRANSP", reads=Equilibrium):
r"""
Class that can read TRANSP equilibrium files. Rather than creating instances of this
class directly, users are recommended to use the function `read_equilibrium`.
See Also
--------
Equilibrium: Class representing a global tokamak equilibrium.
read_equilibrium: Read an equilibrium file, return an `Equilibrium`.
"""
[docs]
def read_from_file(
self,
filename: PathLike,
time: Optional[float] = None,
time_index: Optional[int] = None,
nR: Optional[int] = None,
nZ: Optional[int] = None,
clockwise_phi: bool = False,
cocos: Optional[int] = None,
neighbors: Optional[int] = 16,
) -> Equilibrium:
r"""
Read in TRANSP netCDF and creates Equilibrium object.
TRANSP makes use of radial grids, and these are interpolated onto a Cartesian
RZ grid. Additional keyword-only arguments may be provided to control the
resolution of the Cartesian grid, and to choose the time at which the
equilibrium is taken.
Parameters
----------
filename: PathLike
Path to the TRANSP netCDF file.
time: Optional[float]
The time, in seconds, at which equilibrium data should be taken. Data will
be drawn from the time closest to the provided value. Users should only
provide one of ``time`` or ``time_index``. If neither is provided, data is
drawn at the last time stamp.
time_index: Optional[int]
As an alternative to providing the time directly, users may provide the
index of the desired time stamp.
nR: Optional[int]
The number of grid points in the major radius direction. By default, this
is set to the number of radial grid points in the TRANSP file.
nZ: Optional[int]
The number of grid points in the vertical direction. By default, this
is set to the number of radial grid points in the TRANSP file.
clockwise_phi: bool, default False
Determines whether the :math:`\phi` grid increases clockwise or
anti-clockwise when viewed from above. Used to determine COCOS convention of
the inputs.
cocos: Optional[int]
If set, asserts that the GEQDSK file follows that COCOS convention, and
neither ``clockwise_phi`` nor the file contents will be used to identify
the actual convention in use. The resulting Equilibrium is always converted
to COCOS 11.
neighbors: Optional[int]
Sets number of nearest neighbours to use when performing the interpolation
to flux surfaces to R,Z. By default, this is 16
Raises
------
FileNotFoundError
If ``filename`` is not a valid file.
RuntimeError
If the user provides both ``time`` and ``time_index``.
ValueError
If nr or nz are negative.
IndexError
If ``time_index`` is provided, but is out of bounds.
Returns
-------
Equilibrium
"""
# Define some units to be used later
# Note that length units are in centimeters!
# This is not consistent throughout. Pressure is in Pascal as usual, not
# Newtons per centimeter^2. However, it does affect our units for F.
len_units = units.cm
psi_units = units.weber / units.radian
if time_index is not None and time is not None:
raise RuntimeError("Cannot set both 'time' and 'time_index'")
with nc.Dataset(filename) as data:
# Determine time at which we should read data
time_cdf = data["TIME3"][:]
if time_index is None:
time_index = -1 if time is None else np.argmin(np.abs(time_cdf - time))
# Determine factors on the psi grid
# TRANSP uses several grids: 'XB', the minor radius of each flux surface
# divided by the minor radius of the last closed flux surface; 'X', the
# points in between 'XB' and the previous points, with an implied zero at
# the first point, and "RMAJM", the major radius extents of each flux
# surface. The latter includes the position at the magnetic axis, so we
# should prefer quantities on this grid. However, we only need the values on
# the low-field side (LFS), so we should discard the values on the
# high-field side (HFS). We wish to have variables and their derivatives in
# terms of psi.
axis_idx = np.argmin(data["PLFMP"][time_index])
rmajm = np.asarray(data["RMAJM"][time_index]) * len_units
psi = np.asarray(data["PLFMP"][time_index, axis_idx:]) * psi_units
# F is not given directly, so we must compute it using:
# F = B_t * R
# The toroidal B field is also not given directly, so this is calculated
# using:
# B_t = (|Bt| / |B|) * |B| * sign(B_t)
Bt_vacuum = np.asarray(data["BTX"][time_index, axis_idx:])
Bt_total = np.asarray(data["FBTX"][time_index, axis_idx:]) * Bt_vacuum
# Try to read sign of Btor directly, other determine from neoclassical poloidal velocity
try:
Bt_sign = np.sign(data["BPHI_MSE"][time_index, 0].data)
except IndexError:
Bt_sign = np.sign(data["VPOL_AVG"][time_index, 0].data)
F = Bt_sign * Bt_total * units.tesla * rmajm[axis_idx:]
B_0 = Bt_sign * Bt_vacuum[0] * units.tesla
# ffprime is determined by fitting a spline and taking its derivative.
# We'll use UnitSpline to ensure units are carried forward.
FF_prime = F * UnitSpline(psi, F)(psi, derivative=1)
# PMHD_IN lives on TRANSP's zone-centre grid X, while psi[1:]
# (= PLFMP at the off-axis LFS RMAJM points) is sampled at the
# zone boundaries XB. The two are offset by half a cell, so to
# build the pressure-vs-psi spline we resample psi onto X with
# a (rho_tor=0, psi=psi_axis) anchor at the magnetic axis to
# cover the inner half-cell.
# We use 'PMHD_IN', the 'pressure input to MHD solver', as the
# plasma pressure 'PPLAS' is the thermal pressure only.
X_grid = np.asarray(data["X"][time_index, :])
XB_grid = np.asarray(data["XB"][time_index, :])
psi_at_X = (
np.interp(
X_grid,
np.concatenate(([0.0], XB_grid)),
np.concatenate(([psi[0].magnitude], psi[1:].magnitude)),
)
* psi_units
)
p_input = np.asarray(data["PMHD_IN"][time_index]) * units.pascal
p_spline = UnitSpline(psi_at_X, p_input)
p = p_spline(psi)
p_prime = p_spline(psi, derivative=1)
# Q is given directly. We use "QMP" instead of "Q" as this includes the
# magnetic axis
q = np.asarray(data["QMP"][time_index, axis_idx:]) * units.dimensionless
# z_mid can be obtained using "YMPA" and "YAXIS"
Z_mid = np.empty(len(psi)) * len_units
Z_mid[0] = np.asarray(data["YAXIS"][time_index]) * len_units
Z_mid[1:] = np.asarray(data["YMPA"][time_index]) * len_units
# Determine r, z, psi_rz
# We'll ignore units for now, as they can misbehave around interpolation
# routines. They're reintroduced at the end.
# Begin by calculating flux surfaces from moments up to 17
# The length of the theta grid is fixed at 256.
ntheta = 256
theta = np.linspace(0, 2 * np.pi, ntheta)
# Begin with the 0th moment. Surface grids have shape (nradial, ntheta).
# There is no 0th sin moment, as it's 0 by definition.
R_surface = np.outer(np.asarray(data["RMC00"][time_index]), np.ones(ntheta))
Z_surface = np.outer(np.asarray(data["YMC00"][time_index]), np.ones(ntheta))
# Add moments 1 to 17
for mom in range(1, 17):
# Exit early if this moment doesn't exist
if f"RMC{mom:02d}" not in data.variables:
break
c = np.cos(mom * theta)
s = np.sin(mom * theta)
R_surface += np.outer(np.asarray(data[f"RMC{mom:02d}"][time_index]), c)
R_surface += np.outer(np.asarray(data[f"RMS{mom:02d}"][time_index]), s)
Z_surface += np.outer(np.asarray(data[f"YMC{mom:02d}"][time_index]), c)
Z_surface += np.outer(np.asarray(data[f"YMS{mom:02d}"][time_index]), s)
# R_major can be obtained from the flux surfaces
R_major = np.empty(len(psi)) * len_units
R_major[0] = np.asarray(data["RAXIS"][time_index]) * len_units
R_major[1:] = (
(np.max(R_surface, axis=1) + np.min(R_surface, axis=1)) / 2 * len_units
)
# r_minor can be obtained from the flux surfaces
r_minor = np.empty(len(psi)) * len_units
r_minor[0] = 0.0 * len_units
r_minor[1:] = (
(np.max(R_surface, axis=1) - np.min(R_surface, axis=1)) / 2 * len_units
)
# Combine arrays into shape (nradial*ntheta, 2), such that [i,0] is the
# major radius and [i,1] is the vertical position of coordinate i.
surface_coords = np.stack((R_surface.ravel(), Z_surface.ravel()), -1)
# Get psi at each of these coordinates. Discard the value on the mag. axis.
surface_psi = np.repeat(psi.magnitude[1:], ntheta)
# Create interpolator we can use to interpolate to RZ grid.
psi_interp = RBFInterpolator(
surface_coords, surface_psi, kernel="cubic", neighbors=neighbors
)
# Convert to RZ grid.
# Lengths are the same as the netCDF radial grid if nr, nz not provided.
nR = R_surface.shape[0] if nR is None else int(nR)
nZ = R_surface.shape[0] if nZ is None else int(nZ)
R = np.linspace(min(R_surface[-1, :]), max(R_surface[-1, :]), nR)
Z = np.linspace(min(Z_surface[-1, :]), max(Z_surface[-1, :]), nZ)
RZ_coords = np.stack([x.ravel() for x in np.meshgrid(R, Z)], -1)
try:
psi_RZ = psi_interp(RZ_coords).reshape((nZ, nR)).T
except np.linalg.LinAlgError as err:
if "Singular matrix" in str(err):
raise ValueError(
"Interpolation resulted in singular matrix. Try increasing number of nearest neighbors in "
"eq_kwargs"
)
else:
raise
# Get current and b_axis to determine COCOS
# TRANSP files define the following possibilities:
# - MEASURED PLASMA CURRENT, PCUR
# - EQ PLASMA CURRENT, PCUREQ (seems to be zero in some files?)
# - CALCULATED PLASMA CURRENT, PCURC
# - TOTAL PLASMA CURRENT, CUR (units of amps per centimeter^2)
# Here we use PCURC, as it is the same as PCUR in an interpretive run,
# but is also present in a predictive run.
I_p = data["PCURC"][time_index] * units.ampere
return Equilibrium(
R=R * units.cm,
Z=Z * units.cm,
psi_RZ=psi_RZ * psi_units,
psi=psi,
F=F,
FF_prime=FF_prime,
p=p,
p_prime=p_prime,
q=q,
R_major=R_major,
r_minor=r_minor,
Z_mid=Z_mid,
psi_lcfs=data["PLFLXA"][time_index][()] * psi_units,
a_minor=r_minor[-1],
B_0=B_0,
I_p=I_p,
clockwise_phi=clockwise_phi,
cocos=cocos,
eq_type="TRANSP",
)
[docs]
def verify_file_type(self, filename: PathLike) -> None:
"""Quickly verify that we're looking at a TRANSP file without processing"""
# Try opening data file. If it doesn't exist or isn't netcdf, this will fail.
data = nc.Dataset(filename)
try:
# Given it is a netcdf, check it has the attribute TRANSP_version
data.TRANSP_version
except AttributeError as exc:
# Failing this, check for a subset of expected data_vars
var_names = ["TIME3", "XB", "RAXIS", "YAXIS", "PSI0_TR", "PLFLXA", "PLFMP"]
if not np.all(np.isin(var_names, list(data.variables))):
var_str = "', '".join(var_names)
raise ValueError(
f"The netCDF {filename} can't be read as a TRANSP file. "
f"The following vars are needed: '{var_str}'."
) from exc
finally:
data.close()