from typing import Optional
import numpy as np
from scipy.constants import mu_0
from scipy.interpolate import CloughTocher2DInterpolator, RBFInterpolator
from ..constants import electron_charge
from ..file_utils import FileReader
from ..typing import PathLike
from ..units import UnitCloughTocher2DInterpolator, UnitSpline
from ..units import ureg as units
from .equilibrium import Equilibrium
[docs]
def read_eqin(filename_or_file):
def token_generator(fp):
for line in fp:
for tok in line.split():
yield tok
with open(filename_or_file) as f:
f.readline()
tokens = token_generator(f)
try:
npsi, npol = int(next(tokens)), int(next(tokens))
except (StopIteration, ValueError):
raise IOError("Second line should contain Npsi and Npol")
data_dict = {"npsi": npsi, "npol": npol}
while True:
try:
varname = next(tokens).rstrip(":")
except StopIteration:
break
try:
if varname.lower() in {"r", "z"} or varname.startswith("B"):
data = np.array([float(next(tokens)) for _ in range(npsi * npol)])
data = data.reshape((npsi, npol), order="F")
elif varname in {"Zeff", "Zimp", "Aimp", "Amain"}:
data = float(next(tokens))
else:
data = np.array([float(next(tokens)) for _ in range(npsi)])
except (StopIteration, ValueError):
raise IOError(f"Error while reading {varname}")
data_dict[varname] = data
necessary_keys = ["Psi", "fpol", "q", "R", "z"]
if not all(key in data_dict.keys() for key in necessary_keys):
raise ValueError(
"Missing equilibrium data in ELITEINP file."
" Currently only HELENA ELITEINP files are supported, please"
" raise issue if another source (like) SCENE is needed"
)
if "Psi" in data_dict:
data_dict["Psi"] = data_dict["Psi"] * units.weber / units.radian
if "dp/dpsi" in data_dict:
data_dict["p_prime"] = (
data_dict["dp/dpsi"] * units.pascal / (units.weber / units.radian)
)
if "ffp" in data_dict:
data_dict["ff_prime"] = data_dict["ffp"] * (
units.tesla**2 * units.meter**2 * units.radian / units.weber
)
if "Bt" not in data_dict and "fpol" in data_dict and "R" in data_dict:
fpol = data_dict["fpol"]
R = data_dict["R"] # shape (npsi, npol)
data_dict["Bt"] = fpol[:, None] * R # shape (npsi, npol)
else:
raise ValueError("Missing Bt data in ELITEINP file")
if "p" not in data_dict:
if "ne" not in data_dict or "Te" not in data_dict:
raise ValueError(
"No electron density and temperature data found in ELITEINP"
)
if "nMainIon" not in data_dict or "Ti" not in data_dict:
raise ValueError(
"No main ion density and temperature data found in ELITEINP"
)
data_dict["p"] = (
data_dict["ne"] * (data_dict["Te"] * electron_charge.m)
) * units.pascal
data_dict["p"] += (
data_dict["nMainIon"] * (data_dict["Ti"] * electron_charge.m)
) * units.pascal
if "nZ" in data_dict and "Ti" in data_dict:
data_dict["p"] += (
data_dict["nZ"] * (data_dict["Ti"] * electron_charge.m)
) * units.pascal
return data_dict
[docs]
class EquilibriumReaderELITEINP(FileReader, file_type="ELITEINP", reads=Equilibrium):
[docs]
def read_from_file(
self,
filename: PathLike,
clockwise_phi: bool = False,
cocos: Optional[int] = None,
grid_length: Optional[int] = None,
interpolator: int = 0,
) -> Equilibrium:
data = read_eqin(filename)
# Units
len_units = units.meter
psi = data["Psi"]
psi_n = psi / psi[-1]
npsi = len(psi)
ntheta = data["npol"]
F = data["fpol"] * units.tesla * units.meter
p = data["p"]
q = data["q"] * units.dimensionless
FF_prime = F * UnitSpline(psi, F)(psi, derivative=1)
p_prime = UnitSpline(psi, p)(psi, derivative=1)
# Flux surface contours
R2D = data["R"] * len_units
Z2D = data["z"] * len_units
Bpol2D = data["Bp"] * units.tesla
# R_major can be obtained from the flux surfaces
R_major = (np.max(R2D, axis=1) + np.min(R2D, axis=1)) / 2
r_minor = (np.max(R2D, axis=1) - np.min(R2D, axis=1)) / 2
Z_mid = (np.max(Z2D, axis=1) + np.min(Z2D, axis=1)) / 2
a_minor = r_minor[-1]
# Flatten known grid and add axis once
surface_coords = np.stack((R2D[1:, :-1].m.ravel(), Z2D[1:, :-1].m.ravel()), -1)
surface_coords = np.append(
surface_coords, np.array([[R2D[0, 0].m, Z2D[0, 0].m]]), axis=0
)
Z_range = (np.max(Z2D, axis=1) - np.min(Z2D, axis=1)) / 2
Z_range[0] *= np.nan
Zind_upper = np.argmax(Z2D, axis=-1)
Zind_lower = np.argmin(Z2D, axis=-1)
rows = np.arange(Z2D.shape[0])
R_upper = R2D[rows, Zind_upper]
R_lower = R2D[rows, Zind_lower]
theta = np.arcsin((Z2D - Z_mid[:, None]) / (Z_range[:, None]))
theta = np.where(
(R2D <= R_upper[:, None]) & (Z2D > Z_mid[:, None]), np.pi - theta, theta
)
theta = np.where(
(R2D <= R_lower[:, None]) & (Z2D <= Z_mid[:, None]), np.pi - theta, theta
)
theta = np.where(
(R2D > R_lower[:, None]) & (Z2D <= Z_mid[:, None]), 2 * np.pi + theta, theta
)
# Remove axis NaN
theta[0, :] = np.linspace(0, 2 * np.pi, ntheta, endpoint=True)
theta[:, 0] = np.where(
theta[:, 0] < np.pi, theta[:, 0], theta[:, 0] - 2 * np.pi
)
# Set up R, Z and Bpol interpolator
surface_psin_theta = np.repeat(psi_n.m, ntheta)
psin_theta = np.stack((surface_psin_theta.ravel(), theta.m.ravel()), -1)
R2D_interp = UnitCloughTocher2DInterpolator(psin_theta, R2D.ravel())
Z2D_interp = UnitCloughTocher2DInterpolator(psin_theta, Z2D.ravel())
Bpol2D_interp = UnitCloughTocher2DInterpolator(psin_theta, Bpol2D.ravel())
surface_interps = {
"R": R2D_interp,
"Z": Z2D_interp,
"Bpol": Bpol2D_interp,
}
# Psi(R,Z) interpolator
surface_psi = np.repeat(psi[1:].magnitude, ntheta - 1)
surface_psi = np.append(surface_psi, psi[0].m)
psi_next = 2 * psi[-1] - psi[-2]
if interpolator == 0:
psi_interp = CloughTocher2DInterpolator(
surface_coords, surface_psi, fill_value=psi_next.m
)
else:
psi_interp = RBFInterpolator(surface_coords, surface_psi, neighbors=256)
if grid_length:
npsi = grid_length
R = np.linspace(min(R2D[-1, :]), max(R2D[-1, :]), npsi).m
Z = np.linspace(min(Z2D[-1, :]), max(Z2D[-1, :]), npsi).m
RZ_coords = np.stack([x.ravel() for x in np.meshgrid(R, Z)], -1)
psi_RZ = psi_interp(RZ_coords).reshape((npsi, npsi)).T * psi.units
# Magnetic field strength at axis
B_0 = F[0] / R_major[0]
# Infer total current
Ip = data.get("Ip")
if Ip is None:
try:
dpsi = np.gradient(psi.magnitude)
I_profile = (
2
* np.pi
* np.mean(data["R"])
* np.mean(data["Bt"])
/ (mu_0 * data["q"])
* dpsi
)
Ip = np.trapezoid(I_profile, psi.magnitude)
except Exception as e:
raise IOError(f"Could not infer total current: {e}")
Ip *= units.ampere
return Equilibrium(
R=R,
Z=Z,
psi_RZ=psi_RZ,
psi=psi,
F=F,
FF_prime=FF_prime,
p=p,
p_prime=p_prime,
q=q,
R_major=R_major,
r_minor=r_minor,
Z_mid=Z_mid,
psi_lcfs=psi[-1],
a_minor=a_minor,
B_0=B_0,
I_p=Ip,
clockwise_phi=clockwise_phi,
cocos=cocos,
eq_type="ELITEINP",
surface_interps=surface_interps,
)
[docs]
def verify_file_type(self, filename: PathLike) -> None:
with open(filename, "r") as f:
head = f.read(50).upper()
if "HELENA GENERATED INPUT" not in head and "Psi:" not in head:
raise ValueError(
f"{filename} does not appear to be an ELITEINP .eqin file."
" Currently only HELENA ELITEINP files are supported, please"
" raise issue if another source (like) SCENE is needed"
)