Source code for pyrokinetics.gk_code.gene
import copy
import csv
import logging
import re
import struct
import warnings
from pathlib import Path
from typing import Any, Dict, Optional, Tuple
import f90nml
import h5py
import numpy as np
from cleverdict import CleverDict
from ..constants import deuterium_mass, electron_mass, pi
from ..file_utils import FileReader
from ..local_geometry import (
LocalGeometry,
LocalGeometryFourierGENE,
LocalGeometryMiller,
LocalGeometryMillerTurnbull,
LocalGeometryMXH,
MetricTerms,
default_fourier_gene_inputs,
default_miller_inputs,
default_miller_turnbull_inputs,
default_mxh_inputs,
)
from ..local_species import LocalSpecies
from ..normalisation import SimulationNormalisation as Normalisation
from ..normalisation import convert_dict, ureg
from ..numerics import Numerics
from ..templates import gk_templates
from ..typing import PathLike
from ..units import PyroContextError
from .gk_input import GKInput
from .gk_output import Coords, Eigenvalues, Fields, Fluxes, GKOutput, Moments
[docs]
class GKInputGENE(GKInput, FileReader, file_type="GENE", reads=GKInput):
"""
Class that can read GENE input files, and produce
Numerics, LocalSpecies, and LocalGeometry objects
"""
code_name = "GENE"
default_file_name = "input.gene"
norm_convention = "gene"
_convention_dict = {}
_drhotor_dr = 1.0
pyro_gene_miller = {
"q": ["geometry", "q0"],
"kappa": ["geometry", "kappa"],
"s_kappa": ["geometry", "s_kappa"],
"delta": ["geometry", "delta"],
"s_delta": ["geometry", "s_delta"],
"shat": ["geometry", "shat"],
"shift": ["geometry", "drr"],
"dZ0dr": ["geometry", "drz"],
"ip_ccw": ["geometry", "sign_ip_cw"],
"bt_ccw": ["geometry", "sign_bt_cw"],
}
pyro_gene_miller_default = {
"q": None,
"kappa": 1.0,
"s_kappa": 0.0,
"delta": 0.0,
"s_delta": 0.0,
"shat": 0.0,
"shift": 0.0,
"dZ0dr": 0.0,
"ip_ccw": -1,
"bt_ccw": -1,
}
pyro_gene_miller_turnbull = {
"q": ["geometry", "q0"],
"kappa": ["geometry", "kappa"],
"s_kappa": ["geometry", "s_kappa"],
"delta": ["geometry", "delta"],
"s_delta": ["geometry", "s_delta"],
"zeta": ["geometry", "zeta"],
"s_zeta": ["geometry", "s_zeta"],
"shat": ["geometry", "shat"],
"shift": ["geometry", "drr"],
"dZ0dr": ["geometry", "drz"],
"ip_ccw": ["geometry", "sign_ip_cw"],
"bt_ccw": ["geometry", "sign_bt_cw"],
}
pyro_gene_miller_turnbull_default = {
"q": None,
"kappa": 1.0,
"s_kappa": 0.0,
"delta": 0.0,
"s_delta": 0.0,
"zeta": 0.0,
"s_zeta": 0.0,
"shat": 0.0,
"shift": 0.0,
"dZ0dr": 0.0,
"ip_ccw": -1,
"bt_ccw": -1,
}
pyro_gene_mxh = {
"q": ["geometry", "q0"],
"kappa": ["geometry", "kappa"],
"s_kappa": ["geometry", "s_kappa"],
"shat": ["geometry", "shat"],
"shift": ["geometry", "drr"],
"dZ0dr": ["geometry", "drz"],
"cn": ["geometry", "cN_m"],
"sn": ["geometry", "sN_m"],
"dcndr": ["geometry", "cNdr_m"],
"dsndr": ["geometry", "sNdr_m"],
"ip_ccw": ["geometry", "sign_ip_cw"],
"bt_ccw": ["geometry", "sign_bt_cw"],
}
pyro_gene_mxh_default = {
"q": None,
"kappa": 1.0,
"s_kappa": 0.0,
"shat": 0.0,
"shift": 0.0,
"dZ0dr": 0.0,
"cn": [0.0, 0.0, 0.0, 0.0],
"sn": [0.0, 0.0, 0.0, 0.0],
"dcndr": [0.0, 0.0, 0.0, 0.0],
"dsndr": [0.0, 0.0, 0.0, 0.0],
"ip_ccw": -1,
"bt_ccw": -1,
}
pyro_gene_circular = {
"q": ["geometry", "q0"],
"shat": ["geometry", "shat"],
"ip_ccw": ["geometry", "sign_ip_cw"],
"bt_ccw": ["geometry", "sign_bt_cw"],
}
pyro_gene_circular_default = {
"q": None,
"shat": 0.0,
"ip_ccw": -1,
"bt_ccw": -1,
}
pyro_gene_fourier = {
"q": ["geometry", "q0"],
"shat": ["geometry", "shat"],
"cN": ["geometry", "cn_m"],
"sN": ["geometry", "sn_m"],
"dcNdr": ["geometry", "cndr_m"],
"dsNdr": ["geometry", "sndr_m"],
"ip_ccw": ["geometry", "sign_ip_cw"],
"bt_ccw": ["geometry", "sign_ip_cw"],
}
pyro_gene_fourier_default = {
"q": None,
"shat": 0.0,
"cN": [0.5, *[0.0] * 31],
"sN": [0.5, *[0.0] * 31],
"dcNdr": [0.5, *[0.0] * 31],
"dsNdr": [0.5, *[0.0] * 31],
"ip_ccw": -1,
"bt_ccw": -1,
}
pyro_gene_species = {
"mass": "mass",
"z": "charge",
"dens": "dens",
"temp": "temp",
"inverse_lt": "omt",
"inverse_ln": "omn",
}
[docs]
def read_from_file(
self, filename: PathLike, detect_norm: bool = True
) -> Dict[str, Any]:
"""
Reads GENE input file into a dictionary
Uses default read, which assumes input is a Fortran90 namelist
"""
# TODO Hacky fix in erroneous brackets from GENE v3.0
self.original_filename = filename
read_str = False
with open(filename, "r") as f:
filedata = f.readlines()
for i, line in enumerate(filedata):
if "FCVERSION" in line and "(" in line:
read_str = True
filedata[i] = line.replace("(", "")
if ")" in line:
filedata[i] = filedata[i].replace(")", "")
filedata = "".join(filedata)
if read_str:
return self.read_str(filedata)
else:
return super().read_from_file(filename, detect_norm=detect_norm)
[docs]
def read_str(self, input_string: str, detect_norm: bool = True) -> Dict[str, Any]:
"""
Reads GENE input file given as string
Uses default read_str, which assumes input is a Fortran90 namelist
"""
return super().read_str(input_string, detect_norm=detect_norm)
[docs]
def read_dict(self, input_dict: dict, detect_norm: bool = True) -> Dict[str, Any]:
"""
Reads GENE input file given as dict
Uses default read_dict, which assumes input is a dict
"""
return super().read_dict(input_dict, detect_norm=detect_norm)
[docs]
def verify_file_type(self, filename: PathLike):
"""
Ensure this file is a valid gene input file, and that it contains sufficient
info for Pyrokinetics to work with
"""
expected_keys = ["general", "geometry", "box"]
self.verify_expected_keys(filename, expected_keys)
[docs]
def write(
self,
filename: PathLike,
float_format: str = "",
local_norm: Normalisation = None,
code_normalisation: str = None,
):
"""
Write self.data to a gyrokinetics input file.
Uses default write, which writes to a Fortan90 namelist
"""
if local_norm is None:
local_norm = Normalisation("write")
aspect_ratio = (
self.data["geometry"]["major_r"] / self.data["geometry"]["minor_r"]
)
local_norm.set_ref_ratios(aspect_ratio=aspect_ratio)
if code_normalisation is None:
code_normalisation = self.code_name.lower()
convention = getattr(local_norm, code_normalisation)
for name, namelist in self.data.items():
self.data[name] = convert_dict(namelist, convention)
super().write(filename, float_format=float_format)
[docs]
def add_flags(self, flags) -> None:
"""
Add extra flags to GENE input file
Uses default, which assumes a Fortan90 namelist
"""
super().add_flags(flags)
[docs]
def get_local_geometry(self) -> LocalGeometry:
"""
Returns local geometry. Delegates to more specific functions
"""
geometry_type = self.data["geometry"]["magn_geometry"]
if geometry_type == "miller":
if (
self.data["geometry"].get("zeta", 0.0) != 0.0
or self.data["geometry"].get("zeta", 0.0) != 0.0
):
default_inputs = default_miller_turnbull_inputs()
pyro_gene_local_geometry = self.pyro_gene_miller_turnbull
pyro_gene_local_geometry_default = (
self.pyro_gene_miller_turnbull_default
)
local_geometry_class = LocalGeometryMillerTurnbull
else:
default_inputs = default_miller_inputs()
pyro_gene_local_geometry = self.pyro_gene_miller
pyro_gene_local_geometry_default = self.pyro_gene_miller_default
local_geometry_class = LocalGeometryMiller
elif geometry_type == "miller_mxh":
default_inputs = default_mxh_inputs()
pyro_gene_local_geometry = self.pyro_gene_mxh
pyro_gene_local_geometry_default = self.pyro_gene_mxh_default
local_geometry_class = LocalGeometryMXH
elif geometry_type == "miller_general":
default_inputs = default_fourier_gene_inputs()
pyro_gene_local_geometry = self.pyro_gene_fourier
pyro_gene_local_geometry_default = self.pyro_gene_fourier_default
local_geometry_class = LocalGeometryFourierGENE
elif geometry_type in ["circular", "s_alpha", "slab"]:
default_inputs = default_miller_inputs()
pyro_gene_local_geometry = self.pyro_gene_circular
pyro_gene_local_geometry_default = self.pyro_gene_circular_default
local_geometry_class = LocalGeometryMiller
elif geometry_type in ["tracer_efit", "gene"]:
default_inputs = default_fourier_gene_inputs()
pyro_gene_local_geometry = self.pyro_gene_fourier
pyro_gene_local_geometry_default = self.pyro_gene_fourier_default
local_geometry_class = LocalGeometryFourierGENE
geometry_dict = self.get_gene_geometry()
else:
raise NotImplementedError(
f"LocalGeometry type {geometry_type} not implemented for GENE"
)
local_geometry_data = default_inputs
for (pyro_key, (gene_param, gene_key)), gene_default in zip(
pyro_gene_local_geometry.items(), pyro_gene_local_geometry_default.values()
):
local_geometry_data[pyro_key] = self.data[gene_param].get(
gene_key, gene_default
)
major_R = self.data["geometry"].get("major_r", 1.0)
major_Z = self.data["geometry"].get("major_z", 0.0)
local_geometry_data["Rmaj"] = major_R
local_geometry_data["Z0"] = major_Z
trpeps = self.get_trpeps()
# Handle the fact that GENE defaults to 'minor_r = 1.0' in
# 'parameters.dat' if 'minor_r' is not specified in 'parameters'
minor_r = self.data["geometry"].get("minor_r", 0.0)
if not (minor_r == 0.0 or np.isclose(minor_r, major_R)):
local_geometry_data["aspect_ratio"] = major_R / minor_r
local_geometry_data["rho"] = trpeps * local_geometry_data["Rmaj"]
# Need to add in factor of rho
if geometry_type == "miller_mxh":
for key in ["dcndr", "dsndr"]:
if local_geometry_data[key] == 0.0:
local_geometry_data[key] = [0.0] * len(local_geometry_data["cn"])
else:
local_geometry_data[key] = [
float(i) / local_geometry_data["rho"]
for i in local_geometry_data[key]
]
local_geometry_data["n_moments"] = len(local_geometry_data["cn"])
# Different defn of s_delta/s_zeta
delta = self.data["geometry"].get(
"delta", np.sin(local_geometry_data["sn"][1])
)
zeta = self.data["geometry"].get("zeta", -local_geometry_data["sn"][2])
s_delta = (
self.data["geometry"].get("s_delta", local_geometry_data["dsndr"][1])
/ local_geometry_data["rho"]
)
s_zeta = (
self.data["geometry"].get("s_zeta", -local_geometry_data["dsndr"][2])
/ local_geometry_data["rho"]
)
local_geometry_data["sn"][1] = np.arcsin(delta)
local_geometry_data["sn"][2] = -zeta
local_geometry_data["dsndr"][1] = s_delta
local_geometry_data["dsndr"][2] = -s_zeta
for key, value in local_geometry_data.items():
if isinstance(value, list):
local_geometry_data[key] = np.array(value, dtype=float)
# GENE defines whether clockwise - need to flip sign
local_geometry_data["ip_ccw"] *= -1
local_geometry_data["bt_ccw"] *= -1
# Assume ne * Tref*8pi*1e-7 = 1.0
(
ne,
Te,
) = self.get_ne_te_normalisation()
beta = self.data["general"].get("beta", 0.0) / ne
dpdx = self.data["geometry"].get("dpdx_pm", -2)
amhd = self.data["geometry"].get("amhd", 0.0)
local_species = self.get_local_species()
if amhd != -1:
amhd_beta_prime = -amhd / (
local_geometry_data["q"] ** 2 * local_geometry_data["Rmaj"]
)
else:
amhd_beta_prime = (
-local_species.inverse_lp.m * local_species.pressure.m * beta
)
if dpdx == -1:
if local_geometry_data["B0"] is not None:
local_geometry_data["beta_prime"] = (
-local_species.inverse_lp.m * local_species.pressure.m * beta
)
else:
local_geometry_data["beta_prime"] = 0.0
elif dpdx == -2:
local_geometry_data["beta_prime"] = amhd_beta_prime
else:
local_geometry_data["beta_prime"] = -dpdx
if not np.isclose(local_geometry_data["beta_prime"], amhd_beta_prime):
warnings.warn(
f"GENE dpdx_pm = {local_geometry_data['beta_prime']} not set consistently with amhd = "
f"{amhd_beta_prime} - drifts may not behave as expected"
)
if geometry_type in ["tracer_efit", "gene"]:
for key, value in geometry_dict.items():
local_geometry_data[key] = value
local_geometry = local_geometry_class.from_gk_data(local_geometry_data)
if geometry_type == "miller_mxh":
local_geometry.dthetaR_dr = local_geometry.get_dthetaR_dr(
local_geometry.theta, local_geometry.dcndr, local_geometry.dsndr
)
# Need to get convention after?
if hasattr(self, "convention"):
convention = self.convention
else:
norms = Normalisation("get_local_geometry")
convention = getattr(norms, self.norm_convention)
_, _, rgeo_rmaj = self._get_rgeo_rmaj()
local_geometry.B0 = rgeo_rmaj
local_geometry.dpsidr = local_geometry.dpsidr * local_geometry.B0
local_geometry.normalise(norms=convention)
if geometry_type in ["tracer_efit", "gene"]:
lref = local_geometry.Rmaj.units
bref = local_geometry.B0.units
local_geometry.R_eq = geometry_dict["R_eq"] * lref
local_geometry.Z_eq = geometry_dict["Z_eq"] * lref
local_geometry.Rmaj = geometry_dict["Rmaj"] * lref
local_geometry.b_poloidal_eq = geometry_dict["b_poloidal_eq"] * bref
local_geometry.Z0 = geometry_dict["Z0"] * lref
local_geometry.rho = geometry_dict["rho"] * lref
local_geometry.dpsidr = geometry_dict["dpsidr"] * bref * lref
local_geometry.beta_prime = geometry_dict["beta_prime"] * bref**2 / lref
self._drhotor_dr = geometry_dict["drhotor_dr"]
local_geometry._set_shape_coefficients(
local_geometry.R_eq,
local_geometry.Z_eq,
local_geometry.b_poloidal_eq,
)
raxis_rmaj, _, rgeo_rmaj = self._get_rgeo_rmaj()
# Need to modify shape coefficients to match dpsidr
bunit_over_b0 = local_geometry.get_bunit_over_b0()
dpsidr = (
bunit_over_b0 / local_geometry.q * local_geometry.rho
).m * rgeo_rmaj
# Rescale to account for a/Lref and B0/Bref
ratio_dpsidr = geometry_dict["dpsidr"] / dpsidr
local_geometry.dcNdr *= ratio_dpsidr
local_geometry.dsNdr *= ratio_dpsidr
local_geometry.b_poloidal_eq *= 1 / ratio_dpsidr
local_geometry.bunit_over_b0 = local_geometry.get_bunit_over_b0()
local_geometry.dpsidr = (
(local_geometry.bunit_over_b0 / local_geometry.q * local_geometry.rho)
* bref
* rgeo_rmaj
)
local_geometry.R, local_geometry.Z = local_geometry.get_flux_surface(
local_geometry.theta
)
local_geometry.b_poloidal = local_geometry.get_b_poloidal(
theta=local_geometry.theta,
)
(
local_geometry.dRdtheta,
local_geometry.dRdr,
local_geometry.dZdtheta,
local_geometry.dZdr,
) = local_geometry.get_RZ_derivatives(local_geometry.theta)
local_geometry.Fpsi = local_geometry.get_f_psi()
try:
local_geometry.FF_prime = local_geometry.get_f_prime() * local_geometry.Fpsi
except ValueError:
local_geometry.FF_prime = None
return local_geometry
[docs]
def get_trpeps(self) -> float:
"""
Returns
-------
trpeps: trpeps from the input file
"""
geometry_type = self.data["geometry"]["magn_geometry"]
if hasattr(self, "original_filename"):
original_filename = Path(self.original_filename)
prefix = original_filename.parent / geometry_type
if original_filename.suffix:
suffix = original_filename.suffix
else:
filename_split = original_filename.name.split("_")
if len(filename_split) > 1:
suffix = f"_{filename_split[-1]}"
else:
suffix = ""
geometry_filename = Path(f"{str(prefix)}{suffix}")
if geometry_filename.exists():
direct_geometry_nml = f90nml.read(geometry_filename)
trpeps = direct_geometry_nml["parameters"]["trpeps"]
if trpeps == 0.0:
trpeps = (
np.sqrt(direct_geometry_nml["parameters"]["s0"])
/ direct_geometry_nml["parameters"]["major_R"]
)
else:
trpeps = self.data["geometry"].get("trpeps", 0.0)
else:
trpeps = self.data["geometry"].get("trpeps", 0.0)
return trpeps
[docs]
def get_gene_geometry(self):
"""
Returns
-------
trpeps: trpeps from the input file
"""
if hasattr(self, "_gene_geometry_dict"):
return self._gene_geometry_dict
geometry_type = self.data["geometry"]["magn_geometry"]
geo_dict = {}
if hasattr(self, "original_filename"):
original_filename = Path(self.original_filename)
prefix = original_filename.parent / geometry_type
if original_filename.suffix:
suffix = original_filename.suffix
else:
filename_split = original_filename.name.split("_")
if len(filename_split) > 1:
suffix = f"_{filename_split[-1]}"
else:
suffix = ""
geometry_filename = Path(f"{str(prefix)}{suffix}")
if geometry_filename.exists():
geometry_nml = f90nml.read(geometry_filename)
# GENE Lref is not magnetic axis so we need to shift it to that
geo_dict["Lref"] = (
geometry_nml["parameters"]["lref"]
* geometry_nml["parameters"]["major_r"]
)
if geo_dict["Lref"] == 0.0:
geo_dict["Lref"] = 1.0
skiprows = 19
if "edge_opt" in geometry_nml["parameters"].keys():
skiprows += 1
geometry_data = np.loadtxt(geometry_filename, skiprows=skiprows)
Z0 = self.data["geometry"].get("major_z", False)
if Z0:
Z0_index = np.argmin(
abs(geometry_data[:, 13] / geo_dict["Lref"] - Z0)
)
elif geometry_type in ["gene", "tracer_efit"]:
Z0_index = np.argmin(abs(geometry_data[:, 12]))
Z0 = geometry_data[-Z0_index, 13] / geo_dict["Lref"]
else:
Z0 = 0.0
Z0_index = np.argmin(abs(geometry_data[:, 13] - Z0))
geometry_data = np.roll(geometry_data, -Z0_index, axis=0)
R = geometry_data[:, 11] / geo_dict["Lref"]
Z = geometry_data[:, 13] / geo_dict["Lref"]
rho = (max(R) - min(R)) / 2
R_major = (max(R) + min(R)) / 2
Zmid = (max(Z) + min(Z)) / 2
# Handle edge_opt != 0.0
edge_opt = self.data["geometry"].get("edge_opt", 0.0)
if edge_opt != 0.0:
# Initial GENE zprime is from -pi to pi
zprime = np.linspace(-np.pi, np.pi, len(R), endpoint=False)
cap_N = np.arcsinh(edge_opt * np.pi) / np.pi
dz_dzprime = cap_N * np.cosh(cap_N * zprime) / edge_opt
# Need to roll to match data
dz_dzprime = np.roll(dz_dzprime, -Z0_index, axis=0)
else:
dz_dzprime = 1.0
reverse = False
if R[0] < R_major:
if Z[1] > Z[0]:
roll_sign = -1
else:
reverse = True
roll_sign = 1
else:
if Z[1] > Z[0]:
roll_sign = 1
else:
reverse = True
roll_sign = -1
R_diff = R - R_major
Z_diff = Z - Zmid
aN = np.sqrt((R_diff) ** 2 + (Z_diff) ** 2)
dot_product = R_diff * np.roll(R_diff, roll_sign) + Z_diff * np.roll(
Z_diff, roll_sign
)
magnitude = np.sqrt(R_diff**2 + Z_diff**2)
arc_angle = dot_product / (magnitude * np.roll(magnitude, roll_sign))
theta0 = np.arcsin(Z_diff[0] / aN[0])
theta_diff = np.arccos(arc_angle)
if Z[1] > Z[0]:
theta = np.cumsum(theta_diff) - theta_diff[0]
else:
theta = -np.cumsum(theta_diff) - theta_diff[0]
theta += theta0
gxx = geometry_data[:, 0]
gxy = geometry_data[:, 1]
gxz = geometry_data[:, 2]
gyy = geometry_data[:, 3]
gyz = geometry_data[:, 4]
gzz = geometry_data[:, 5]
bmod = geometry_data[:, 6]
gxz *= dz_dzprime
gyz *= dz_dzprime
gzz *= dz_dzprime**2
dxdR = geometry_data[:, 14]
Cy = geometry_nml["parameters"]["cy"]
Cxy = geometry_nml["parameters"]["cxy"]
q = geometry_nml["parameters"]["q0"]
ip_ccw = -geometry_nml["parameters"]["sign_Ip_CW"]
bt_ccw = -geometry_nml["parameters"]["sign_Bt_CW"]
if ip_ccw == 0:
ip_ccw = 1
if bt_ccw == 0:
bt_ccw = 1
gxy *= ip_ccw * bt_ccw
gyz *= ip_ccw * bt_ccw
maxR_index = np.argmax(R)
minR_index = np.argmin(R)
drhotor_dr = (
2.0
/ (
dxdR[maxR_index] / gxx[maxR_index]
- dxdR[minR_index] / gxx[minR_index]
)
* geometry_nml["parameters"]["major_r"]
)
gxx *= drhotor_dr**-2
gxy *= drhotor_dr**-1
gxz *= drhotor_dr**-1
Cxy *= drhotor_dr
# x0 = rho_tor
Cx_prime = 1.0
dpsidr = (
Cxy
* Cy
* Cx_prime
* drhotor_dr
/ geometry_nml["parameters"]["major_r"] ** 2
)
b_pol = np.sqrt(
Cxy**2
* (
((Cy * q) ** 2 * (gxx * gzz - gxz**2))
+ 2 * Cy * q * (gxy * gxz - gxx * gyz)
+ (gxx * gyy - gxy**2)
)
)
# f in Bref_Bgeo
f = np.sqrt(bmod**2 - (b_pol) ** 2) * R
f_dev = np.std(f)
if f_dev > 0.1:
raise ValueError(
"Error in determination of F in get_gene_geometry. std too large"
)
rgeo_rmaj = np.mean(f) / R_major
# Need to convert shat_GENE to shat_pyro
shat_gene = geometry_nml["parameters"]["shat"]
x0 = np.sqrt(geometry_nml["parameters"]["s0"])
shat = shat_gene * rho / x0 * drhotor_dr
beta_prime = -geometry_nml["parameters"]["my_dpdx"] * drhotor_dr
if reverse:
R = R[::-1]
Z = Z[::-1]
theta = theta[::-1]
b_pol = b_pol[::-1]
geo_dict["Rmaj"] = R_major
geo_dict["Z0"] = Z0
geo_dict["rho"] = rho
geo_dict["theta_eq"] = theta
geo_dict["R_eq"] = R
geo_dict["Z_eq"] = Z
geo_dict["dpsidr"] = dpsidr
geo_dict["shat"] = shat
geo_dict["q"] = q
geo_dict["beta_prime"] = beta_prime
geo_dict["b_poloidal_eq"] = b_pol
geo_dict["drhotor_dr"] = drhotor_dr
geo_dict["rgeo_rmaj"] = rgeo_rmaj
self._gene_geometry_dict = geo_dict
return self._gene_geometry_dict
else:
raise FileNotFoundError(
f"Can't find geometry file: {geometry_filename} in get_gene_geometry"
)
else:
return {}
[docs]
def get_local_species(self):
"""
Load LocalSpecies object from GENE file
"""
local_species = LocalSpecies()
ion_count = 0
if hasattr(self, "convention"):
convention = self.convention
else:
norms = Normalisation("get_local_species")
if self._convention_dict:
code_normalisation = self.norm_convention
else:
code_normalisation = "pyrokinetics"
convention = getattr(norms, code_normalisation)
gene_nu_ei = self.data["general"].get("coll", 0.0)
external_contr = self.data.get(
"external_contr", {"exbrate": 0.0, "omega0_tor": 0.0, "pfsrate": 0.0}
)
if external_contr.get("pfsrate", 0.0) == -1111:
external_contr["pfsrate"] = external_contr["exbrate"]
trpeps = self.get_trpeps()
rho = trpeps * self.data["geometry"].get("major_r", 1.0)
if rho == 0.0:
domega_drho = 0.0
else:
domega_drho = (
-self.data["geometry"]["q0"] / rho * external_contr.get("pfsrate", 0.0)
)
names = self.get_local_species_names()
# Load each species into a dictionary
for i_sp in range(self.data["box"]["n_spec"]):
species_data = CleverDict()
try:
gene_data = self.data["species"][i_sp]
except TypeError:
# Case when only 1 species
gene_data = self.data["species"]
for pyro_key, gene_key in self.pyro_gene_species.items():
species_data[pyro_key] = gene_data[gene_key]
# Always force to major_r norm and then re-normalise to pyro after
if self.data["geometry"].get("magn_geometry") in ["tracer_efit", "gene"]:
lref_scale = self.data["geometry"]["major_r"]
else:
lref_scale = 1.0
species_data["inverse_lt"] = (
gene_data["omt"] * self._drhotor_dr * lref_scale
)
species_data["inverse_ln"] = (
gene_data["omn"] * self._drhotor_dr * lref_scale
)
species_data["omega0"] = external_contr.get("omega0_tor", 0.0)
species_data["domega_drho"] = domega_drho * self._drhotor_dr * lref_scale
if species_data.z == -1:
species_data.nu = (
(gene_nu_ei * 4 * (deuterium_mass / electron_mass) ** 0.5)
* convention.vref
/ convention.lref
)
else:
ion_count += 1
# Set nu to zero initially, will be updated later if electrons present
species_data.nu = 0.0 * convention.vref / convention.lref
species_data.name = names[i_sp]
# normalisations
species_data.dens *= convention.nref
species_data.mass *= convention.mref
species_data.temp *= convention.tref
species_data.z *= convention.qref
species_data.inverse_lt *= convention.lref**-1
species_data.inverse_ln *= convention.lref**-1
species_data.omega0 *= convention.vref / convention.lref
species_data.domega_drho *= convention.vref / convention.lref**2
# Add individual species data to dictionary of species
local_species.add_species(name=names[i_sp], species_data=species_data)
# Calculate collision frequencies for ions (skip if adiabatic electrons)
if hasattr(local_species, "electron"):
nu_ee = local_species.electron.nu
te = local_species.electron.temp
ne = local_species.electron.dens
me = local_species.electron.mass
for ion in range(ion_count):
key = f"ion{ion + 1}"
nion = local_species[key]["dens"]
tion = local_species[key]["temp"]
mion = local_species[key]["mass"]
zion = local_species[key]["z"]
# Not exact at log(Lambda) does change but pretty close...
local_species[key]["nu"] = (
nu_ee
* (zion**4 * nion / tion**1.5 / mion**0.5)
/ (ne / te**1.5 / me**0.5)
).m * nu_ee.units
local_species.zeff = (
self.data["general"].get("zeff", 1.0) * ureg.elementary_charge
)
# Normalise to pyrokinetics normalisations and calculate total pressure gradient
local_species.normalise(convention)
return local_species
[docs]
def get_local_species_names(self):
"""
Returns a list of the local species names
Returns
-------
names: Array
List of local species names
"""
names = []
ion_count = 0
for i_sp in range(self.data["box"]["n_spec"]):
try:
gene_data = self.data["species"][i_sp]
except TypeError:
# Case when only 1 species
gene_data = self.data["species"]
species_z = gene_data["charge"]
if species_z == -1:
names.append("electron")
else:
ion_count += 1
names.append(f"ion{ion_count}")
return names
[docs]
def get_numerics(self) -> Numerics:
"""Gather numerical info (grid spacing, time steps, etc)"""
if hasattr(self, "convention"):
convention = self.convention
else:
norms = Normalisation("get_local_species")
if self._convention_dict:
code_normalisation = self.norm_convention
else:
code_normalisation = "pyrokinetics"
convention = getattr(norms, code_normalisation)
numerics_data = dict()
# Set number of fields
numerics_data["phi"] = True
numerics_data["apar"] = bool(self.data["general"].get("beta", 0))
numerics_data["bpar"] = bool(self.data["general"].get("bpar", 0))
numerics_data["delta_time"] = self.data["general"].get("dt_max", 0.01)
numerics_data["max_time"] = self.data["general"].get("simtimelim", 500.0)
try:
numerics_data["theta0"] = -self.data["box"]["kx_center"] / (
self.data["box"]["kymin"] * self.data["geometry"]["shat"]
)
except KeyError:
numerics_data["theta0"] = 0.0
numerics_data["nky"] = self.data["box"]["nky0"]
numerics_data["ky"] = self.data["box"]["kymin"]
# Set to zero if box.lx not present
numerics_data["kx"] = 2 * pi / self.data["box"].get("lx", np.inf)
# Velocity grid
numerics_data["ntheta"] = self.data["box"].get("nz0", 24)
numerics_data["nenergy"] = self.data["box"].get("nv0", 16) // 2
numerics_data["npitch"] = self.data["box"].get("nw0", 16)
numerics_data["nonlinear"] = bool(self.data["general"].get("nonlinear", False))
if numerics_data["nonlinear"]:
numerics_data["nkx"] = self.data["box"]["nx0"]
numerics_data["nperiod"] = 1
else:
numerics_data["nkx"] = 1
numerics_data["nperiod"] = self.data["box"]["nx0"] - 1
(
ne,
Te,
) = self.get_ne_te_normalisation()
numerics_data["beta"] = self.data["general"].get("beta", 0.0) / ne
external_contr = self.data.get(
"external_contr", {"exbrate": 0.0, "omega0_tor": 0.0, "pfsrate": 0.0}
)
numerics_data["gamma_exb"] = external_contr["exbrate"]
return Numerics(**numerics_data).with_units(convention)
[docs]
def get_reference_values(self, local_norm: Normalisation) -> Dict[str, Any]:
"""
Reads in reference values from input file
"""
if "units" not in self.data.keys():
return {}
if not self.data["units"].keys():
return {}
norms = {}
raxis_rmaj, magnetic_axis_radius, rgeo_rmaj = self._get_rgeo_rmaj()
if magnetic_axis_radius:
R_major = magnetic_axis_radius / raxis_rmaj
norms["lref_major_radius"] = R_major * local_norm.units.meter
else:
if "major_R" in self.data["geometry"]:
lref_scale = self.data["geometry"]["major_R"]
lref_key = "major_radius"
elif "minor_r" in self.data["geometry"]:
lref_scale = self.data["geometry"]["minor_r"]
lref_key = "minor_radius"
norms[f"lref_{lref_key}"] = (
self.data["units"]["lref"] * local_norm.units.meter * lref_scale
)
norms["tref_electron"] = self.data["units"]["Tref"] * local_norm.units.keV
norms["nref_electron"] = (
self.data["units"]["nref"] * local_norm.units.meter**-3 * 1e19
)
norms["bref_B0"] = (
self.data["units"]["Bref"] * local_norm.units.tesla * rgeo_rmaj
)
return norms
def _detect_normalisation(self):
"""
Determines the necessary inputs and passes information to the base method _set_up_normalisation.
The following values are needed
default_references: dict
Dictionary containing default reference values for the
gk_code: str
GK code
electron_density: float
Electron density from GK input
electron_temperature: float
Electron density from GK input
e_mass: float
Electron mass from GK input
electron_index: int
Index of electron in list of data
found_electron: bool
Flag on whether electron was found
densities: ArrayLike
List of species densities
temperatures: ArrayLike
List of species temperature
reference_density_index: ArrayLike
List of indices where the species has a density of 1.0
reference_temperature_index: ArrayLike
List of indices where the species has a temperature of 1.0
major_radius: float
Normalised major radius from GK input
rgeo_rmaj: float
Ratio of Geometric and flux surface major radius
minor_radius: float
Normalised minor radius from GK input
"""
default_references = {
"nref_species": "electron",
"tref_species": "electron",
"mref_species": "deuterium",
"bref": "B0",
"lref": "major_radius",
"ne": 1.0,
"te": 1.0,
"rgeo_rmaj": 1.0,
"vref": "nrl",
"rhoref": "pyro",
"raxis_rmaj": None,
}
reference_density_index = []
reference_temperature_index = []
densities = []
temperatures = []
masses = []
found_electron = False
e_mass = None
electron_temperature = None
electron_density = None
electron_index = None
# Load each species into a dictionary
for i_sp in range(self.data["box"]["n_spec"]):
try:
gene_data = self.data["species"][i_sp]
except TypeError:
# Case when only 1 species
gene_data = self.data["species"]
dens = gene_data["dens"]
temp = gene_data["temp"]
mass = gene_data["mass"]
if gene_data["charge"] == -1:
electron_density = dens
electron_temperature = temp
e_mass = mass
electron_index = i_sp
found_electron = True
if np.isclose(dens, 1.0):
reference_density_index.append(i_sp)
if np.isclose(temp, 1.0):
reference_temperature_index.append(i_sp)
densities.append(dens)
temperatures.append(temp)
masses.append(mass)
if not found_electron:
# Calculate electron density from quasineutrality for adiabatic electrons
ne = 0.0
for i_sp in range(self.data["box"]["n_spec"]):
try:
species_data = self.data["species"][i_sp]
except TypeError:
# Case when only 1 species
species_data = self.data["species"]
ne += densities[i_sp] * species_data["charge"]
electron_density = ne
# Get temperature from first species (handle single species case)
try:
electron_temperature = self.data["species"][0]["temp"]
except TypeError:
# Case when only 1 species
electron_temperature = self.data["species"]["temp"]
e_mass = (electron_mass / deuterium_mass).m
found_electron = True
magnetic_axis_radius = None
minor_radius = self.data["geometry"].get("minor_r", 0.0)
major_radius = self.data["geometry"]["major_r"]
rgeo_rmaj = 1.0
raxis_rmaj = None
geometry_type = self.data["geometry"].get("magn_geometry", "miller")
if geometry_type in ["tracer_efit", "gene"]:
major_radius = 0.0
minor_radius = 0.0
raxis_rmaj, magnetic_axis_radius, rgeo_rmaj = self._get_rgeo_rmaj()
super()._set_up_normalisation(
default_references=default_references,
gk_code=self.code_name.lower(),
electron_density=electron_density,
electron_temperature=electron_temperature,
e_mass=e_mass,
electron_index=electron_index,
found_electron=found_electron,
densities=densities,
temperatures=temperatures,
reference_density_index=reference_density_index,
reference_temperature_index=reference_temperature_index,
major_radius=major_radius,
rgeo_rmaj=rgeo_rmaj,
minor_radius=minor_radius,
raxis_rmaj=raxis_rmaj,
)
def _get_rgeo_rmaj(self):
if hasattr(self, "original_filename"):
geometry_type = self.data["geometry"].get("magn_geometry", "miller")
# Only for Tracer EFIT
if geometry_type not in ["tracer_efit", "gene"]:
return 1.0, None, 1.0
geometry_dict = self.get_gene_geometry()
if geometry_dict:
rgeo_rmaj = geometry_dict["rgeo_rmaj"]
magnetic_axis_radius = geometry_dict["Lref"]
raxis_rmaj = 1.0 / geometry_dict["Rmaj"]
else:
rgeo_rmaj = 1.0
raxis_rmaj = 1.0
magnetic_axis_radius = None
return raxis_rmaj, magnetic_axis_radius, rgeo_rmaj
else:
return 1.0, None, 1.0
[docs]
def set(
self,
local_geometry: LocalGeometry,
local_species: LocalSpecies,
numerics: Numerics,
local_norm: Normalisation = None,
template_file: Optional[PathLike] = None,
code_normalisation: Optional[str] = None,
**kwargs,
):
"""
Set self.data using LocalGeometry, LocalSpecies, and Numerics.
These may be obtained via another GKInput file, or from Equilibrium/Kinetics
objects.
"""
# If self.data is not already populated, fill in defaults from a given
# template file. If this is not provided by the user, fall back to the
# default.
if self.data is None:
if template_file is None:
template_file = gk_templates["GENE"]
self.read_from_file(template_file)
if local_norm is None:
local_norm = Normalisation("set")
# TODO Find way to get norm_convention = pyrokinetics if we find minor_radius as lref
if code_normalisation is None:
if np.isclose(self.data["geometry"]["minor_r"], 1.0):
code_normalisation = "pyrokinetics"
elif np.isclose(self.data["geometry"]["major_R"], 1.0):
code_normalisation = "gene"
convention = getattr(local_norm, code_normalisation)
# Geometry data
if isinstance(local_geometry, LocalGeometryMillerTurnbull):
eq_type = "MillerTurnbull"
elif isinstance(local_geometry, LocalGeometryMiller):
eq_type = "Miller"
elif isinstance(local_geometry, LocalGeometryMXH):
eq_type = "MXH"
elif isinstance(local_geometry, LocalGeometryFourierGENE):
eq_type = "FourierGENE"
else:
raise NotImplementedError(
f"Writing LocalGeometry type {local_geometry.__class__.__name__} "
"for GENE not yet supported"
)
if eq_type == "MXH":
self.data["geometry"]["magn_geometry"] = "miller_mxh"
elif eq_type == "FourierGENE":
self.data["geometry"]["magn_geometry"] = "miller_general"
else:
self.data["geometry"]["magn_geometry"] = "miller"
if eq_type == "MillerTurnbull":
for pyro_key, (
gene_param,
gene_key,
) in self.pyro_gene_miller_turnbull.items():
self.data[gene_param][gene_key] = local_geometry[pyro_key]
elif eq_type == "MXH":
for pyro_key, (
gene_param,
gene_key,
) in self.pyro_gene_mxh.items():
self.data[gene_param][gene_key] = local_geometry[pyro_key]
# GENE uses rho * dcN_dr
self.data[gene_param]["cNdr_m"] = [
dcndr * local_geometry.rho for dcndr in self.data[gene_param]["cNdr_m"]
]
self.data[gene_param]["sNdr_m"] = [
dsndr * local_geometry.rho for dsndr in self.data[gene_param]["sNdr_m"]
]
# GENE s_delta/s_zeta take precedence over sNdr
self.data[gene_param]["delta"] = np.sin(local_geometry.sn[1])
self.data[gene_param]["zeta"] = -local_geometry.sn[2]
self.data[gene_param]["s_delta"] = (
local_geometry.dsndr[1] * local_geometry.rho
)
self.data[gene_param]["s_zeta"] = (
-local_geometry.dsndr[2] * local_geometry.rho
)
elif eq_type == "FourierGENE":
for pyro_key, (
gene_param,
gene_key,
) in self.pyro_gene_fourier.items():
self.data[gene_param][gene_key] = local_geometry[pyro_key]
elif eq_type == "Miller":
for pyro_key, (gene_param, gene_key) in self.pyro_gene_miller.items():
self.data[gene_param][gene_key] = local_geometry[pyro_key]
# Need to remove any MillerTurnbull key if they exist
for turnbull_key in self.pyro_gene_miller_turnbull.keys():
if (
turnbull_key not in self.pyro_gene_miller.keys()
and turnbull_key in self.data["geometry"].keys()
):
self.data["geometry"].pop(turnbull_key)
self.data["geometry"]["amhd"] = (
-(local_geometry.q**2) * local_geometry.Rmaj * local_geometry.beta_prime
)
self.data["geometry"]["dpdx_pm"] = -local_geometry.beta_prime
self.data["geometry"]["trpeps"] = local_geometry.rho / local_geometry.Rmaj
if code_normalisation == "pyrokinetics":
self.data["geometry"]["minor_r"] = 1.0
elif code_normalisation == "gene":
if local_geometry.a_minor is not None:
self.data["geometry"]["minor_r"] = local_geometry.a_minor
else:
if "minor_r" in self.data["geometry"].keys():
self.data["geometry"].pop("minor_r")
self.data["geometry"]["major_r"] = local_geometry.Rmaj
self.data["geometry"]["major_z"] = local_geometry.Z0
# GENE defines whether clockwise/ pyro defines whether counter-clockwise - need to flip sign
self.data["geometry"]["sign_Ip_CW"] = -1 * local_geometry.ip_ccw
self.data["geometry"]["sign_Bt_CW"] = -1 * local_geometry.bt_ccw
# Kinetic data
n_species = local_species.nspec
self.data["box"]["n_spec"] = n_species
stored_species = len(self.data["species"])
extra_species = stored_species - n_species
if extra_species > 0:
for i in range(extra_species):
del self.data["species"][-1]
iIon = 1
for iSp, name in enumerate(local_species.names):
try:
single_species = self.data["species"][iSp]
except IndexError:
if f90nml.__version__ < "1.4":
self.data["species"].append(copy.copy(self.data["species"][0]))
single_species = self.data["species"][iSp]
else:
# FIXME f90nml v1.4+ uses 'Cogroups' for Namelist groups sharing
# a common key. As of version 1.4.2, Cogroup derives from
# 'list', but does not implement all methods, so confusingly it
# allows calls to 'append', but then doesn't do anything!
# Currently working around this in a horribly inefficient
# manner, by deconstructing the entire Namelist to a dict, using
# secret cogroup names directly, and rebundling the Namelist.
# There must be a better way!
d = self.data.todict()
copied = copy.deepcopy(d["_grp_species_0"])
copied["name"] = None
d[f"_grp_species_{iSp}"] = copied
self.data = f90nml.Namelist(d)
single_species = self.data["species"][iSp]
if name == "electron":
single_species["name"] = "electron"
else:
single_species["name"] = f"ion{iIon}"
iIon += 1
# TODO Currently forcing GENE to use default pyro. Should check local_norm first
for key, val in self.pyro_gene_species.items():
single_species[val] = local_species[name][key]
if "external_contr" not in self.data.keys():
self.data["external_contr"] = f90nml.Namelist(
{
"omega0_tor": local_species.electron.omega0,
"pfsrate": -local_species.electron.domega_drho
* local_geometry.rho
/ local_geometry.q,
}
)
else:
self.data["external_contr"]["omega0_tor"] = local_species.electron.omega0
self.data["external_contr"]["pfsrate"] = (
-local_species.electron.domega_drho
* local_geometry.rho
/ local_geometry.q
)
self.data["general"]["zeff"] = local_species.zeff
beta_ref = convention.beta if local_norm else 0.0
beta = numerics.beta if numerics.beta is not None else beta_ref
# GENE beta is ALWAYS ne*Tref/B0^2 regardless of existing nref
original_convention = getattr(local_norm, self.norm_convention)
ne = (1 * original_convention.nref).to(local_norm.gene)
self.data["general"]["beta"] = beta * ne
self.data["general"]["coll"] = local_species.electron.nu / (
4 * np.sqrt(deuterium_mass / electron_mass)
)
# Numerics
if numerics.bpar and not numerics.apar:
raise ValueError("Can't have bpar without apar in GENE")
self.data["general"]["bpar"] = numerics.bpar
# FIXME breaks a roundtrip when doing electrostatic simulations
# FIXME can't really fix this due to GENE set up...
if not numerics.apar:
self.data["general"]["beta"] = 0.0
self.data["general"]["dt_max"] = numerics.delta_time
self.data["general"]["simtimelim"] = numerics.max_time
if numerics["nonlinear"]:
# TODO Currently forces NL sims to have nperiod = 1
self.data["general"]["nonlinear"] = True
self.data["box"]["nky0"] = numerics["nky"]
self.data["box"]["nx0"] = numerics["nkx"]
else:
self.data["general"]["nonlinear"] = False
self.data["box"]["nky0"] = numerics.nky
self.data["box"]["kymin"] = (
numerics.ky * (1 * convention.bref / local_norm.gs2.bref).to_base_units()
)
self.data["box"]["kx_center"] = (
-1 * numerics.theta0 * numerics.ky * local_geometry.shat
)
if numerics.kx != 0.0:
self.data["box"]["lx"] = 2 * pi / numerics.kx
self.data["box"]["nz0"] = numerics.ntheta
self.data["box"]["nv0"] = 2 * numerics.nenergy
self.data["box"]["nw0"] = numerics.npitch
if "external_contr" not in self.data.keys():
self.data["external_contr"] = f90nml.Namelist(
{"exbrate": numerics.gamma_exb}
)
else:
self.data["external_contr"]["exbrate"] = numerics.gamma_exb
if not local_norm:
return
try:
(1 * convention.tref).to("keV")
si_units = True
except PyroContextError:
si_units = False
if si_units:
if "units" not in self.data.keys():
self.data["units"] = f90nml.Namelist()
self.data["units"]["Tref"] = (1 * convention.tref).to("keV").m
self.data["units"]["nref"] = (1e-19 * convention.nref).to("meter**-3").m
self.data["units"]["mref"] = (1 * convention.mref).to("proton_mass").m
self.data["units"]["Bref"] = (1 * convention.bref).to("tesla").m
self.data["units"]["Lref"] = (1 * convention.lref).to("meter").m
self.data["units"]["omegatorref"] = (
local_species.electron.omega0.to(convention).to("radians/second").m
)
for name, namelist in self.data.items():
self.data[name] = convert_dict(namelist, convention)
[docs]
def get_ne_te_normalisation(self):
adiabatic_electrons = True
# Get electron temp and density to normalise input
for i_sp in range(self.data["box"]["n_spec"]):
try:
gene_data = self.data["species"][i_sp]
except TypeError:
# Case when only 1 species
gene_data = self.data["species"]
if gene_data["charge"] == -1:
ne = gene_data["dens"]
Te = gene_data["temp"]
adiabatic_electrons = False
if adiabatic_electrons:
ne = 0.0
for i_sp in range(self.data["box"]["n_spec"]):
try:
gene_data = self.data["species"][i_sp]
except TypeError:
# Case when only 1 species
gene_data = self.data["species"]
ne += gene_data["dens"] * gene_data["charge"]
# Get temperature from first species (handle single species case)
try:
Te = self.data["species"][0]["temp"]
except TypeError:
# Case when only 1 species
Te = self.data["species"]["temp"]
return ne, Te
[docs]
class GKOutputReaderGENE(FileReader, file_type="GENE", reads=GKOutput):
fields = ["phi", "apar", "bpar"]
[docs]
def read_from_file(
self,
filename: PathLike,
norm: Normalisation,
output_convention: str = "pyrokinetics",
load_fields=True,
load_fluxes=True,
load_moments=False,
downsample: Dict[str, Any] = {},
) -> GKOutput:
raw_data, gk_input, input_str = self._get_raw_data(filename, norm)
fmt_downsample = {}
for key, value in downsample.items():
if isinstance(value, int):
fmt_downsample[key + "_idx"] = slice(value, value + 1)
else:
fmt_downsample[key + "_idx"] = value
downsample = fmt_downsample
coords = self._get_coords(raw_data, gk_input, downsample)
fields = (
self._get_fields(raw_data, gk_input, coords, downsample)
if load_fields
else None
)
fluxes = (
self._get_fluxes(raw_data, gk_input, coords, downsample)
if load_fluxes
else None
)
moments = (
self._get_moments(raw_data, gk_input, coords, downsample)
if load_moments
else None
)
if coords["linear"] and not fields:
eigenvalues = self._get_eigenvalues(raw_data, coords)
else:
# Rely on gk_output to generate eigenvalues
eigenvalues = None
# Assign units and return GKOutput
field_dims = ("theta", "kx", "ky", "time")
flux_dims = ("field", "species", "time")
moment_dims = ("theta", "kx", "species", "ky", "time")
# Assign units and return GKOutput
convention = getattr(norm, gk_input.norm_convention)
norm.default_convention = output_convention.lower()
return GKOutput(
coords=Coords(
time=coords["time"],
kx=coords["kx"],
ky=coords["ky"],
theta=coords["theta"],
pitch=coords["pitch"],
energy=coords["energy"],
species=coords["species"],
field=coords["field"],
).with_units(convention),
norm=norm,
fields=(
Fields(**fields, dims=field_dims).with_units(convention)
if fields
else None
),
fluxes=(
Fluxes(**fluxes, dims=flux_dims).with_units(convention)
if fluxes
else None
),
moments=(
Moments(**moments, dims=moment_dims).with_units(convention)
if moments
else None
),
eigenvalues=(
Eigenvalues(**eigenvalues).with_units(convention)
if eigenvalues
else None
),
linear=coords["linear"],
gk_code="GENE",
input_file=input_str,
normalise_flux_moment=True,
output_convention=output_convention,
jacobian=coords["jacobian"],
)
@staticmethod
def _get_gene_files(filename: PathLike) -> Dict[str, Path]:
"""
Given a directory name, looks for the files filename/parameters_0000,
filename/field_0000 and filename/nrg_0000.
If instead given any of the files parameters_####, field_#### or nrg_####,
looks up the rest of the files in the same directory.
"""
filename = Path(filename)
prefixes = ["parameters", "field", "nrg", "omega"]
if filename.is_dir():
# If given a dir name, looks for dir/parameters_0000
dirname = filename
dat_matches = np.any(
[Path(filename / f"{p}.dat").is_file() for p in prefixes]
)
if dat_matches:
suffix = "dat"
delimiter = "."
else:
suffix = "0000"
delimiter = "_"
else:
# If given a file, searches for all similar GENE files in that file's dir
dirname = filename.parent
# Ensure provided file is a GENE file (fr"..." means raw format str)
matches = [re.search(rf"^{p}_\d{{4}}$", filename.name) for p in prefixes]
if not np.any(matches):
raise RuntimeError(
f"GKOutputReaderGENE: The provided file {filename} is not a GENE "
"output file."
)
suffix = filename.name.split("_")[1]
delimiter = "_"
# Get all files in the same dir
files = {
prefix: dirname / f"{prefix}{delimiter}{suffix}"
for prefix in prefixes
if (dirname / f"{prefix}{delimiter}{suffix}").exists()
}
if not files:
raise RuntimeError(
"GKOutputReaderGENE: Could not find GENE output files in the "
f"directory '{dirname}'."
)
if "parameters" not in files:
raise RuntimeError(
"GKOutputReaderGENE: Could not find GENE output file 'parameters_"
f"{suffix}' when provided with the file/directory '{filename}'."
)
# If binary field file absent, adds .h5 field file,
# if present, to 'files'
if "field" not in files:
if (dirname / f"field{delimiter}{suffix}.h5").exists():
files.update({"field": dirname / f"field{delimiter}{suffix}.h5"})
if "nrg" not in files:
if (dirname / f"nrg{delimiter}{suffix}.h5").exists():
files.update({"nrg": dirname / f"nrg{delimiter}{suffix}.h5"})
return files
@staticmethod
def _get_gene_mom_geo_files(
filename: PathLike, files: Dict, species_names, geometry_type
) -> Dict[str, Path]:
"""
Given a directory name, looks for the files filename/parameters_0000,
filename/field_0000 and filename/nrg_0000.
If instead given any of the files parameters_####, field_#### or nrg_####,
looks up the rest of the files in the same directory.
"""
filename = Path(filename)
prefixes = [f"mom_{species_name}" for species_name in species_names] + [
geometry_type
]
if filename.is_dir():
# If given a dir name, looks for dir/parameters_0000
dirname = filename
dat_matches = np.any(
[Path(filename / f"{p}.dat").is_file() for p in prefixes]
)
if dat_matches:
suffix = "dat"
delimiter = "."
else:
suffix = "0000"
delimiter = "_"
else:
# If given a file, searches for all similar GENE files in that file's dir
dirname = filename.parent
suffix = filename.name.split("_")[-1]
delimiter = "_"
# Get all files in the same dir
for prefix in prefixes:
if (dirname / f"{prefix}{delimiter}{suffix}").exists():
files[prefix] = dirname / f"{prefix}{delimiter}{suffix}"
return files
[docs]
@staticmethod
def infer_path_from_input_file(filename: PathLike) -> Path:
"""
Given path to input file, guess at the path for associated output files.
"""
# If the input file is of the form name_####, get the numbered part and
# search for 'parameters_####' in the run directory. If not, simply return
# the directory.
filename = Path(filename)
num_part_regex = re.compile(r"(\d{4})")
num_part_match = num_part_regex.search(filename.name)
if num_part_match is None:
return Path(filename).parent
else:
return Path(filename).parent / f"parameters_{num_part_match[0]}"
pass
@classmethod
def _get_raw_data(
cls, filename: PathLike, norm: Normalisation = None
) -> Tuple[Dict[str, Any], GKInputGENE, str]:
files = cls._get_gene_files(filename)
# Read parameters_#### as GKInputGENE and into plain string
with open(files["parameters"], "r") as f:
input_str = f.read()
# TODO remove hacky fix from GENE () issue
input_str = input_str.replace("(", "")
input_str = input_str.replace(")", "")
gk_input = GKInputGENE()
gk_input.read_str(input_str)
gk_input.original_filename = files["parameters"]
if gk_input._convention_dict and norm:
gk_input.convention = norm.gene_bespoke
# Handle both single species (namelist) and multiple species (list of namelists)
n_spec = gk_input.data["box"]["n_spec"]
if n_spec == 1:
species_names = [gk_input.data["species"]["name"]]
else:
species_names = [species["name"] for species in gk_input.data["species"]]
geometry_type = gk_input.data["geometry"]["magn_geometry"]
files = cls._get_gene_mom_geo_files(
filename, files, species_names, geometry_type
)
# Defer processing field and flux data until their respective functions
# Simply return files in place of raw data
return files, gk_input, input_str
@staticmethod
def _get_coords(
raw_data: Dict[str, Any],
gk_input: GKInputGENE,
downsample: Dict[str, Any] = {},
) -> Dict[str, Any]:
"""
Sets coords and attrs of a Pyrokinetics dataset from a GENE parameters file.
Args:
raw_data (Dict[str,Any]): Dict containing GENE output. Ignored.
gk_input (GKInputGENE): Processed GENE input file.
Returns:
xr.Dataset: Dataset with coords and attrs set, but not data_vars
"""
nml = gk_input.data
# The last time step is not always written, but depends on
# whatever condition is met first between simtimelim and timelim
species = gk_input.get_local_species_names()
if ".h5" not in str(raw_data["nrg"]):
with open(raw_data["nrg"], "r") as f:
full_data = f.readlines()
ntime = len(full_data) // (len(species) + 1)
lasttime = float(full_data[-(len(species) + 1)])
if (
ntime * nml["in_out"]["istep_nrg"] % nml["in_out"]["istep_field"]
== 0
):
add_on = 0
else:
add_on = 1
else:
with h5py.File(raw_data["nrg"], "r") as file:
key = list(file.keys())[1]
time = file[f"{key}/time"][:]
ntime = len(time)
lasttime = time[-1]
if nml["in_out"]["istep_nrg"] == nml["in_out"]["istep_field"]:
add_on = 0
else:
add_on = 1
ntime = (
int(ntime * nml["in_out"]["istep_nrg"] / nml["in_out"]["istep_field"])
) + add_on
ntime = ntime
# Set time to index for now, gets overwritten by field data
time = np.linspace(0, ntime - 1, ntime)
nfield = nml["info"]["n_fields"]
field = ["phi", "apar", "bpar"][:nfield]
nky = nml["box"]["nky0"]
nkx = nml["box"]["nx0"]
nz = nml["box"]["nz0"]
z = np.linspace(-pi, pi, nz, endpoint=False)
ntheta = nz
local_geometry = gk_input.get_local_geometry()
metric_terms = MetricTerms(local_geometry, ntheta=nz * 4)
z_full = metric_terms.alpha / local_geometry.q
theta = np.interp(z, z_full, metric_terms.regulartheta)
nenergy = nml["box"]["nv0"]
energy = np.linspace(-1, 1, nenergy)
npitch = nml["box"]["nw0"]
pitch = np.linspace(-1, 1, npitch)
fluxes = ["particle", "heat", "momentum"]
moments = ["density", "temperature", "velocity"]
if gk_input.is_linear():
# Set up ballooning angle
single_theta_loop = theta
single_ntheta_loop = ntheta
nkx = nkx - 1 + nkx % 2
ntheta = ntheta * nkx
theta = np.empty(ntheta)
start = 0
for i in range(nkx):
pi_segment = i - (nkx - 1) // 2
theta[start : start + single_ntheta_loop] = (
single_theta_loop + pi_segment * 2 * pi
)
start += single_ntheta_loop
ky = [nml["box"]["kymin"]]
kx = [0.0]
nkx = 1
# TODO should we not also set nky=1?
else:
kymin = nml["box"]["kymin"]
ky = np.linspace(0, kymin * (nky - 1), nky)
lx = nml["box"]["lx"]
dkx = 2 * np.pi / lx
kx = np.empty(nkx)
for i in range(-1, nkx - 1):
if i < (nkx / 2 + 1):
kx[i] = i * dkx
else:
kx[i] = (i - nkx) * dkx
kx = np.roll(np.fft.fftshift(kx), -1)
full_ky = ky
full_kx = kx
full_theta = theta
full_time = time
if downsample:
ky = ky[downsample.get("ky_idx", slice(None))]
kx = kx[downsample.get("kx_idx", slice(None))]
theta = theta[downsample.get("theta_idx", slice(None))]
time = time[downsample.get("time_idx", slice(None))]
theta_mod = np.mod(theta, 2 * np.pi)
Jacobian = np.interp(
theta_mod,
metric_terms.regulartheta,
metric_terms.Jacobian,
period=2 * np.pi,
)
# Store grid data as xarray DataSet
return {
"time": time,
"kx": kx,
"ky": ky,
"theta": theta,
"full_time": full_time,
"full_kx": full_kx,
"full_ky": full_ky,
"full_theta": full_theta,
"energy": energy,
"pitch": pitch,
"moment": moments,
"flux": fluxes,
"field": field,
"species": species,
"linear": gk_input.is_linear(),
"lasttime": lasttime,
"jacobian": Jacobian,
}
@staticmethod
def _get_fields(
raw_data: Dict[str, Any],
gk_input: GKInputGENE,
coords: Dict[str, Any],
downsample: Dict[str, Any],
) -> Dict[str, np.ndarray]:
"""
Sets 3D fields over time.
The field coordinates should be (field, theta, kx, ky, time)
"""
if "field" not in raw_data:
return {}
# Time data stored as binary (int, double, int)
# Check precision used in GENE
if gk_input.data["info"]["PRECISION"] == "SINGLE":
time_data_fmt = "=ifi"
complex_size = 8
dtype = np.complex64
elif gk_input.data["info"]["PRECISION"] == "DOUBLE":
time_data_fmt = "=idi"
complex_size = 16
dtype = np.complex128
else:
raise ValueError(
f"Pyrokinetics can't handle cases when GENE precision is {gk_input.data['info']['PRECISION']}"
)
time = []
time_data_size = struct.calcsize(time_data_fmt)
int_size = 4
if downsample:
kx_idx = downsample.get("kx_idx", None)
ky_idx = downsample.get("ky_idx", None)
theta_idx = downsample.get("theta_idx", None)
time_idx = downsample.get("time_idx", None)
else:
kx_idx = None
ky_idx = None
theta_idx = None
time_idx = None
kx_idx = slice(None) if kx_idx is None else kx_idx
ky_idx = slice(None) if ky_idx is None else ky_idx
theta_idx = slice(None) if theta_idx is None else theta_idx
time_idx = slice(None) if time_idx is None else time_idx
nx = gk_input.data["box"]["nx0"]
nz = gk_input.data["box"]["nz0"]
nkx = len(coords["kx"])
nky = len(coords["ky"])
ntheta = len(coords["theta"])
ntime = len(coords["time"])
nfield = len(coords["field"])
full_nky = len(coords["full_ky"])
full_ntime = len(coords["full_time"])
# Account for kx data being ifft shifted
kx_shifted = list(range(*kx_idx.indices((nx))))
kx_unshifted = [(i + 1 + nx // 2) % nx for i in kx_shifted]
field_size = nx * nz * full_nky * complex_size
time_block_size = time_data_size + nfield * (2 * int_size + field_size)
if gk_input.is_linear():
sliced_field = np.empty((nfield, nx, nky, nz, ntime), dtype=dtype)
else:
sliced_field = np.empty((nfield, nkx, nky, ntheta, ntime), dtype=dtype)
fields = np.empty((nfield, nkx, nky, ntheta, ntime), dtype=dtype)
# Read binary file if present
if ".h5" not in str(raw_data["field"]):
with open(raw_data["field"], "rb") as f:
for it_out, it in enumerate(range(*time_idx.indices(full_ntime))):
# Seek to requested time block
f.seek(it * time_block_size)
# Read time header
time_value = struct.unpack(time_data_fmt, f.read(time_data_size))[1]
time.append(time_value)
for ifield in range(nfield):
f.seek(int_size, 1) # record marker
field_offset = f.tell()
mm = np.memmap(
raw_data["field"],
dtype=dtype,
mode="r",
offset=field_offset,
shape=(nx, full_nky, nz),
order="F",
)
# Slice spatially
sliced_field[
ifield,
:,
:,
:,
it_out,
] = mm[kx_unshifted, ky_idx, theta_idx]
# Skip over field block + trailing marker
f.seek(field_size + int_size, 1)
# Read .h5 file if binary file absent
else:
h5_field_subgroup_names = ["phi", "A_par", "B_par"]
fields = np.empty(
(nfield, nx, nky, nz, ntime),
dtype=complex,
)
with h5py.File(raw_data["field"], "r") as file:
# Read in time data
time.extend(list(file.get("field/time")))
for i_field in range(nfield):
h5_subgroup = "field/" + h5_field_subgroup_names[i_field] + "/"
h5_dataset_names = list(file[h5_subgroup].keys())
for i_time in range(ntime):
h5_dataset = h5_subgroup + h5_dataset_names[i_time]
raw_field = np.array(file.get(h5_dataset))
raw_field = np.array(
raw_field["real"] + raw_field["imaginary"] * 1j,
dtype="complex128",
)
sliced_field[i_field, :, :, :, i_time] = np.swapaxes(
raw_field, 0, 2
)
# Match pyro convention for ion/electron direction
sliced_field = np.conjugate(sliced_field)
if not gk_input.is_linear():
nl_shape = (nfield, nkx, nky, ntheta, ntime)
fields = sliced_field.reshape(nl_shape, order="F")
# Convert from kx to ballooning space
else:
try:
n0_global = gk_input.data["box"]["n0_global"]
q0 = gk_input.data["geometry"]["q0"]
phase_fac = -np.exp(2 * np.pi * 1j * n0_global * q0)
except KeyError:
phase_fac = -1
i_ball = 0
nx_actual = nx - 1 + nx % 2
for i_conn in range(0, nx_actual):
fields[:, 0, :, i_ball : i_ball + nz, :] = (
sliced_field[:, i_conn, :, :, :] * (phase_fac) ** i_conn
)
i_ball += nz
# =================================================
# Overwrite 'time' coordinate as determined in _init_dataset
coords["time"] = time
# Original method coords: (field, kx, ky, theta, time)
# New coords: (field, theta, kx, ky, time)
fields = fields.transpose(0, 3, 1, 2, 4)
result = {}
for ifield, field_name in enumerate(coords["field"]):
result[field_name] = fields[ifield, ...]
return result
@staticmethod
def _get_moments(
raw_data: Dict[str, Any],
gk_input: GKInputGENE,
coords: Dict[str, Any],
downsample: Dict[str, Any],
) -> Dict[str, np.ndarray]:
"""
Sets 3D moments over time.
The moment coordinates should be (moment, theta, kx, species, ky, time)
"""
if f"mom_{gk_input.data['species'][0]['name']}" not in raw_data:
return {}
# Time data stored as binary (int, double, int)
# Check precision used in GENE
if gk_input.data["info"]["PRECISION"] == "SINGLE":
time_data_fmt = "=ifi"
complex_size = 8
dtype = np.complex64
elif gk_input.data["info"]["PRECISION"] == "DOUBLE":
time_data_fmt = "=idi"
complex_size = 16
dtype = np.complex128
else:
raise ValueError(
f"Pyrokinetics can't handle cases when GENE precision is {gk_input.data['info']['PRECISION']}"
)
time = []
time_data_size = struct.calcsize(time_data_fmt)
int_size = 4
if downsample:
kx_idx = downsample.get("kx_idx", None)
ky_idx = downsample.get("ky_idx", None)
theta_idx = downsample.get("theta_idx", None)
time_idx = downsample.get("time_idx", None)
else:
kx_idx = None
ky_idx = None
theta_idx = None
time_idx = None
kx_idx = slice(None) if kx_idx is None else kx_idx
ky_idx = slice(None) if ky_idx is None else ky_idx
theta_idx = slice(None) if theta_idx is None else theta_idx
time_idx = slice(None) if time_idx is None else time_idx
nx = gk_input.data["box"]["nx0"]
nz = gk_input.data["box"]["nz0"]
nkx = len(coords["kx"])
nky = len(coords["ky"])
ntheta = len(coords["theta"])
ntime = len(coords["time"])
full_nky = len(coords["full_ky"])
full_ntime = len(coords["full_time"])
# Account for kx data being ifft shifted
kx_shifted = list(range(*kx_idx.indices((nx))))
kx_unshifted = [(i + 1 + nx // 2) % nx for i in kx_shifted]
species = [species["name"] for species in gk_input.data["species"]]
nspecies = len(species)
nmoment_output = 6
if len(coords["field"]) > 2:
nmoment_output += 3
moment_size = nx * nz * full_nky * complex_size
time_block_size = time_data_size + nmoment_output * (2 * int_size + moment_size)
if gk_input.is_linear():
sliced_moment = np.empty(
(nspecies, nmoment_output, nx, nky, nz, ntime), dtype=dtype
)
else:
sliced_moment = np.empty(
(nspecies, nmoment_output, nkx, nky, ntheta, ntime), dtype=dtype
)
moments = np.empty(
(nspecies, nmoment_output, nkx, nky, ntheta, ntime), dtype=dtype
)
for i_sp, spec in enumerate(species):
# Read binary file if present
if ".h5" not in str(raw_data[f"mom_{spec}"]):
with open(raw_data[f"mom_{spec}"], "rb") as f:
for it_out, it in enumerate(range(*time_idx.indices(full_ntime))):
# Seek to requested time block
f.seek(it * time_block_size)
# Read time header
time_value = struct.unpack(
time_data_fmt, f.read(time_data_size)
)[1]
if i_sp == 0:
time.append(time_value)
for i_moment in range(nmoment_output):
f.seek(int_size, 1)
moment_offset = f.tell()
mm = np.memmap(
raw_data[f"mom_{spec}"],
dtype=dtype,
mode="r",
offset=moment_offset,
shape=(nx, full_nky, nz),
order="F",
)
sliced_moment[i_sp, i_moment, :, :, :, it_out] = mm[
kx_unshifted, ky_idx, theta_idx
]
# Skip over field block + trailing marker
f.seek(moment_size + int_size, 1)
# Read .h5 file if binary file absent
else:
raise NotImplementedError("Moments from HDf5 not yet supported")
# Match pyro convention for ion/electron direction
sliced_moment = np.conjugate(sliced_moment)
if not gk_input.is_linear():
nl_shape = (nspecies, nmoment_output, nkx, nky, ntheta, ntime)
moments = sliced_moment.reshape(nl_shape, order="F")
# Convert from kx to ballooning space
else:
try:
n0_global = gk_input.data["box"]["n0_global"]
q0 = gk_input.data["geometry"]["q0"]
phase_fac = -np.exp(-2 * np.pi * 1j * n0_global * q0)
except KeyError:
phase_fac = -1
i_ball = 0
nx_actual = nx - 1 + nx % 2
for i_conn in range(0, nx_actual):
moments[:, :, 0, :, i_ball : i_ball + nz, :] = (
sliced_moment[:, :, i_conn, :, :, :] * (phase_fac) ** i_conn
)
i_ball += nz
# =================================================
# Overwrite 'time' coordinate as determined in _init_dataset
coords["time"] = time
# Original method coords: (species, moment, kx, ky, theta, time)
# New coords: (moment, theta, kx, species, ky, time)
moments = moments.transpose(1, 4, 2, 0, 3, 5)
result = {}
result["density"] = moments[0, ...]
result["temperature"] = moments[1, ...] / 3 + moments[2, ...] * 2 / 3
result["velocity"] = moments[5, ...]
return result
@staticmethod
def _get_fluxes(
raw_data: Dict[str, Any],
gk_input: GKInputGENE,
coords: Dict[str, Any],
downsample: Dict[str, Any],
) -> Dict[str, np.ndarray]:
"""
Set flux data over time.
The flux coordinates should be (species, flux, field, ky, time)
"""
nml = gk_input.data
# ky data not available in the nrg file so no ky coords here
coord_names = ["species", "flux", "field", "full_time"]
shape = [len(coords[coord_name]) for coord_name in coord_names]
fluxes = np.empty(shape)
nfield = len(coords["field"])
nspecies = len(coords["species"])
ntime = len(coords["time"])
full_ntime = len(coords["full_time"])
if downsample:
time_idx = downsample.get("time_idx", None)
else:
time_idx = None
time_idx = slice(None) if time_idx is None else time_idx
if "nrg" not in raw_data:
logging.warning("Flux data not found, setting all fluxes to zero")
fluxes[...] = 0
result = {"fluxes": fluxes}
return result
flux_istep = nml["in_out"]["istep_nrg"]
field_istep = nml["in_out"]["istep_field"]
ntime_flux = nml["info"]["steps"][0] // flux_istep + 1
if nml["info"]["steps"][0] % flux_istep > 0:
ntime_flux += 1
if flux_istep < field_istep:
time_skip = int(field_istep / flux_istep) - 1
else:
time_skip = 0
if ".h5" not in str(raw_data["nrg"]):
with open(raw_data["nrg"], "r") as csv_file:
nrg_data = csv.reader(csv_file, delimiter=" ", skipinitialspace=True)
if nfield == 3:
logging.warning(
"GENE combines Apar and Bpar particle and heat fluxes, setting Bpar ones to zero"
)
fluxes[:, :, 2, :] = 0.0
field_size = 2
else:
field_size = nfield
for i_time in range(full_ntime):
time = next(nrg_data) # noqa
coords["full_time"][i_time] = float(time[0])
for i_species in range(nspecies):
nrg_line = np.array(next(nrg_data), dtype=float)
# Particle
fluxes[i_species, 0, :field_size, i_time] = nrg_line[
4 : 4 + field_size,
]
# Heat
fluxes[i_species, 1, :field_size, i_time] = nrg_line[
6 : 6 + field_size,
]
# Momentum
if len(nrg_line) < 11:
# The default fluxes saved by GENE are the radial fluxes of parallel momentum
# which are *not* those that enter the transport equations. Should we warn
# the user here?
fluxes[i_species, 2, :field_size, i_time] = nrg_line[
8 : 8 + field_size,
]
else:
# Setting `tor_ang_mom_flux = T` in the GENE input file will compute the radial
# fluxes of toroidal angular momentum, which we here load preferentially
field_columns = {
0: [10, 11], # Phi
1: [12, 13, 17], # Apar
2: [15, 16], # Bpar
}
for i_field, cols in field_columns.items():
if (
i_field < field_size
): # Only process fields that exist
if len(nrg_line) >= max(cols) + 1:
fluxes[i_species, 2, i_field, i_time] = (
nrg_line[cols].sum()
)
else:
fluxes[i_species, 2, i_field, i_time] = 0.0
# Skip time/data values in field print out is less
if i_time < ntime - 1:
for skip_t in range(time_skip):
for skip_s in range(nspecies + 1):
next(nrg_data)
else:
with h5py.File(raw_data["nrg"], "r") as file:
spec_keys = list(file.keys())[1:]
suffixes = ["es", "em"]
prefixes = ["Gamma", "Q", "P"]
if len(coords["time"]) != len(
file[spec_keys[0]]["time"][:: time_skip + 1]
):
final_append = True
else:
final_append = False
if nfield == 3:
logging.warning(
"GENE combines Apar and Bpar particle and heat fluxes, setting Bpar ones to zero"
)
fluxes[:, :, 2, :] = 0.0
if any(
"P_t_par_es" in group
for group in file.values()
if hasattr(group, "__contains__")
):
# Setting `tor_ang_mom_flux = T` in the GENE input file will compute the radial
# fluxes of toroidal angular momentum, which we here load preferentially
prefixes[-1] = "P_t"
suffixes_momentum = {
0: ["par_es", "per_es"], # Phi
1: ["paremA", "peremA"], # Apar
2: ["paremB", "peremB"], # Bpar
}
time_nrg = file[spec_keys[0]]["time"]
for i_species, spec_key in enumerate(spec_keys):
species_data = file[spec_key]
for i_field in range(nfield):
for i_flux in range(len(coords["flux"])):
if i_flux == 2 and prefixes[-1] == "P_t":
# Sum over contributions
flux_data = sum(
(
species_data[f"{prefixes[i_flux]}_{suffix}"][:]
for suffix in suffixes_momentum[i_field]
if f"{prefixes[i_flux]}_{suffix}"
in species_data
),
np.zeros(len(time_nrg)),
)
# Add Maxwell stress contribution
if "PtfieldAV2" in species_data and i_field == 1:
flux_data += species_data["PtfieldAV2"][:]
else:
flux_data = (
species_data.get(
f"{prefixes[i_flux]}_{suffixes[i_field]}",
np.zeros(len(time_nrg)),
)
if i_field < len(suffixes)
else np.zeros(len(time_nrg))
)
if final_append:
fluxes[i_species, i_flux, i_field, :-1] = flux_data[
:: time_skip + 1
]
fluxes[i_species, i_flux, i_field, -1] = flux_data[-1]
else:
fluxes[i_species, i_flux, i_field, :] = flux_data[
:: time_skip + 1
]
results = {}
fluxes = fluxes.transpose(1, 2, 0, 3)
fluxes = fluxes[..., time_idx]
coords["time"] = coords["full_time"][time_idx]
if gk_input.data["geometry"].get("norm_flux_projection", False):
geometry_type = gk_input.data["geometry"]["magn_geometry"]
geometry_filename = raw_data[geometry_type]
geometry_nml = f90nml.read(geometry_filename)
skiprows = 19
if "edge_opt" in geometry_nml["parameters"].keys():
skiprows += 1
geometry_data = np.loadtxt(geometry_filename, skiprows=skiprows)
grho = np.sqrt(geometry_data[:, 0])
jacob = geometry_data[:, -6]
flux_norm = np.sum(jacob) / np.sum(jacob * grho)
else:
flux_norm = 1.0
if gk_input.is_linear():
flux_norm *= 2 * np.pi**1.5
for iflux, flux in enumerate(coords["flux"]):
results[flux] = fluxes[iflux, ...] / flux_norm
return results
@staticmethod
def _get_eigenvalues(
raw_data: Dict[str, Any], coords: Dict
) -> Dict[str, np.ndarray]:
"""
Parameters
----------
raw_data
coords
Returns
-------
Dict of eigenvalues with coords (kx, ky, time)
Only final time is output so we set that to all the times
"""
nky = len(coords["ky"])
nkx = len(coords["kx"])
ntime = len(coords["time"])
mode_frequency = np.empty((nkx, nky, ntime))
growth_rate = np.empty((nkx, nky, ntime))
with open(raw_data["omega"], "r") as csv_file:
omega_data = csv.reader(csv_file, delimiter=" ", skipinitialspace=True)
for iky, line in enumerate(omega_data):
ky, growth, frequency = line
mode_frequency[:, iky, :] = float(frequency)
growth_rate[:, iky, :] = float(growth)
results = {"growth_rate": growth_rate, "mode_frequency": mode_frequency}
return results