Source code for pyrokinetics.gk_code.cgyro
import logging
from ast import literal_eval
from pathlib import Path
from typing import Any, Dict, Optional, Tuple
import numpy as np
from cleverdict import CleverDict
from scipy.integrate import trapezoid
from ..constants import pi
from ..file_utils import FileReader
from ..local_geometry import (
LocalGeometry,
LocalGeometryFourierCGYRO,
LocalGeometryMiller,
LocalGeometryMXH,
MetricTerms,
default_fourier_cgyro_inputs,
default_miller_inputs,
default_mxh_inputs,
)
from ..local_species import LocalSpecies
from ..normalisation import SimulationNormalisation as Normalisation
from ..normalisation import convert_dict
from ..numerics import Numerics
from ..templates import gk_templates
from ..typing import PathLike
from .gk_input import GKInput
from .gk_output import (
Coords,
Eigenfunctions,
Eigenvalues,
Fields,
Fluxes,
GKOutput,
Moments,
)
[docs]
class GKInputCGYRO(GKInput, FileReader, file_type="CGYRO", reads=GKInput):
"""
Class that can read CGYRO input files, and produce
Numerics, LocalSpecies, and LocalGeometry objects
"""
code_name = "CGYRO"
default_file_name = "input.cgyro"
norm_convention = "cgyro"
_convention_dict = {}
_legacy_cgyro = True
pyro_cgyro_miller = {
"rho": "RMIN",
"Rmaj": "RMAJ",
"q": "Q",
"kappa": "KAPPA",
"s_kappa": "S_KAPPA",
"delta": "DELTA",
"s_delta": "S_DELTA",
"shat": "S",
"shift": "SHIFT",
"Z0": "ZMAG",
"dZ0dr": "DZMAG",
"ip_ccw": "IPCCW",
"bt_ccw": "BTCCW",
}
pyro_cgyro_mxh = {
**pyro_cgyro_miller,
"zeta": "ZETA",
"s_zeta": "S_ZETA",
"cn0": "SHAPE_COS0",
"cn1": "SHAPE_COS1",
"cn2": "SHAPE_COS2",
"cn3": "SHAPE_COS3",
"cn4": "SHAPE_COS4",
"cn5": "SHAPE_COS5",
"cn6": "SHAPE_COS6",
"sn3": "SHAPE_SIN3",
"sn4": "SHAPE_SIN4",
"sn5": "SHAPE_SIN5",
"sn6": "SHAPE_SIN6",
"dcndr0": "SHAPE_S_COS0",
"dcndr1": "SHAPE_S_COS1",
"dcndr2": "SHAPE_S_COS2",
"dcndr3": "SHAPE_S_COS3",
"dcndr4": "SHAPE_S_COS4",
"dcndr5": "SHAPE_S_COS5",
"dcndr6": "SHAPE_S_COS6",
"dsndr3": "SHAPE_S_SIN3",
"dsndr4": "SHAPE_S_SIN4",
"dsndr5": "SHAPE_S_SIN5",
"dsndr6": "SHAPE_S_SIN6",
}
pyro_cgyro_miller_defaults = {
"rho": 0.5,
"Rmaj": 3.0,
"q": 2.0,
"kappa": 1.0,
"s_kappa": 0.0,
"delta": 0.0,
"s_delta": 0.0,
"shat": 1.0,
"shift": 0.0,
"Z0": 0.0,
"dZ0dr": 0.0,
"ip_ccw": -1.0,
"bt_ccw": -1.0,
}
pyro_cgyro_mxh_defaults = {
**pyro_cgyro_miller_defaults,
"zeta": 0.0,
"s_zeta": 0.0,
"cn0": 0.0,
"cn1": 0.0,
"cn2": 0.0,
"cn3": 0.0,
"cn4": 0.0,
"cn5": 0.0,
"cn6": 0.0,
"sn3": 0.0,
"sn4": 0.0,
"sn5": 0.0,
"sn6": 0.0,
"dcndr0": 0.0,
"dcndr1": 0.0,
"dcndr2": 0.0,
"dcndr3": 0.0,
"dcndr4": 0.0,
"dcndr5": 0.0,
"dcndr6": 0.0,
"dsndr3": 0.0,
"dsndr4": 0.0,
"dsndr5": 0.0,
"dsndr6": 0.0,
}
pyro_cgyro_fourier = pyro_cgyro_miller
pyro_cgyro_fourier_defaults = pyro_cgyro_miller_defaults
[docs]
@staticmethod
def get_pyro_cgyro_species(iSp=1):
return {
"mass": f"MASS_{iSp}",
"z": f"Z_{iSp}",
"dens": f"DENS_{iSp}",
"temp": f"TEMP_{iSp}",
"inverse_lt": f"DLNTDR_{iSp}",
"inverse_ln": f"DLNNDR_{iSp}",
}
cgyro_eq_types = {
1: "SAlpha",
2: "MXH",
3: "Fourier",
}
[docs]
def set_legacy_cgyro(self, flag: bool):
"""
Set dictionay flags used to access namelist to legacy values or not
"""
self._legacy_cgyro = flag
[docs]
def read_from_file(
self, filename: PathLike, detect_norm: bool = True
) -> Dict[str, Any]:
"""
Reads CGYRO input file into a dictionary
"""
with open(filename) as f:
data_dict = self.parse_cgyro(f)
return super().read_dict(data_dict, detect_norm=detect_norm)
[docs]
def read_str(self, input_string: str, detect_norm: bool = True) -> Dict[str, Any]:
"""
Reads CGYRO input file given as string
"""
data_dict = self.parse_cgyro(input_string.split("\n"))
return super().read_dict(data_dict, detect_norm=detect_norm)
[docs]
def read_dict(self, input_dict: dict, detect_norm: bool = True) -> Dict[str, Any]:
"""
Reads GENE input file given as dict
Uses default read_dict, which assumes input is a dict
"""
return super().read_dict(input_dict, detect_norm=detect_norm)
[docs]
@staticmethod
def parse_cgyro(lines):
"""
Given lines of a cgyro file or a string split by '/n', return a dict of
CGYRO input data
"""
results = {}
for line in lines:
# Get line before comments, remove trailing whitespace
line = line.split("#")[0].strip()
# Skip empty lines (this will also skip comment lines)
if not line:
continue
# Splits by =, remove whitespace, store as (key,value) pair
key, value = (token.strip() for token in line.split("="))
# Use literal_eval to convert value to int/float/list etc
# If it fails, assume value should be a string
try:
results[key] = literal_eval(value)
except Exception:
results[key] = value
return results
[docs]
def verify_file_type(self, filename: PathLike):
"""
Ensure this file is a valid cgyro input file, and that it contains sufficient
info for Pyrokinetics to work with
"""
# The following keys are not strictly needed for a CGYRO input file,
# but they are needed by Pyrokinetics
expected_keys = [
"BETAE_UNIT",
"N_SPECIES",
"NU_EE",
"N_FIELD",
"N_RADIAL",
"RMIN",
"RMAJ",
"Q",
"S",
]
self.verify_expected_keys(filename, expected_keys)
[docs]
def write(
self,
filename: PathLike,
float_format: str = "",
local_norm: Normalisation = None,
code_normalisation: str = None,
):
# Create directories if they don't exist already
filename = Path(filename)
filename.parent.mkdir(parents=True, exist_ok=True)
if local_norm is None:
local_norm = Normalisation("write")
if code_normalisation is None:
code_normalisation = self.code_name.lower()
convention = getattr(local_norm, code_normalisation)
self.data = convert_dict(self.data, convention)
with open(filename, "w") as f:
for key, value in self.data.items():
if isinstance(value, float):
line = f"{key} = {value:{float_format}}\n"
else:
line = f"{key} = {value}\n"
f.write(line)
[docs]
def add_flags(self, flags) -> None:
"""
Add extra flags to CGYRO input file
"""
for key, value in flags.items():
self.data[key] = value
[docs]
def get_local_geometry(self) -> LocalGeometry:
"""
Returns local geometry. Delegates to more specific functions
"""
if hasattr(self, "convention"):
convention = self.convention
else:
norms = Normalisation("get_local_geometry")
convention = getattr(norms, self.norm_convention)
eq_type = self.cgyro_eq_types[self.data["EQUILIBRIUM_MODEL"]]
is_basic_miller = self._check_basic_miller()
if eq_type == "MXH" and is_basic_miller:
eq_type = "Miller"
if eq_type == "Miller":
local_geometry = self.get_local_geometry_miller()
elif eq_type == "MXH":
local_geometry = self.get_local_geometry_mxh()
elif eq_type == "Fourier":
local_geometry = self.get_local_geometry_fourier()
else:
raise NotImplementedError(
f"LocalGeometry type {eq_type} not implemented for CGYRO"
)
local_geometry.B0 = 1.0 / local_geometry.bunit_over_b0
local_geometry.dpsidr *= local_geometry.B0
local_geometry.normalise(norms=convention)
local_geometry.Fpsi = local_geometry.get_f_psi()
local_geometry.FF_prime = local_geometry.get_f_prime() * local_geometry.Fpsi
return local_geometry
[docs]
def get_local_geometry_miller(self) -> LocalGeometryMiller:
"""
Load Miller object from CGYRO file
"""
miller_data = default_miller_inputs()
for (key, val), val_default in zip(
self.pyro_cgyro_miller.items(),
self.pyro_cgyro_miller_defaults.values(),
):
miller_data[key] = self.data.get(val, val_default)
miller_data["s_delta"] *= 1.0 / np.sqrt(1 - miller_data["delta"] ** 2)
# Assume ne * Te*8pi*1e-7 = 1.0
(
ne,
Te,
) = self.get_ne_te_normalisation()
beta = self.data.get("BETAE_UNIT", 0.0) / (ne * Te)
# Need species to set up beta_prime
local_species = self.get_local_species()
beta_prime_scale = self.data.get("BETA_STAR_SCALE", 1.0)
miller_data["beta_prime"] = (
-local_species.inverse_lp.m
* local_species.pressure.m
* beta_prime_scale
* beta
)
miller = LocalGeometryMiller.from_gk_data(miller_data)
return miller
[docs]
def get_local_geometry_mxh(self) -> LocalGeometryMXH:
"""
Load MXH object from CGYRO file
"""
mxh_data = default_mxh_inputs(n_moments=7)
for (key, val), default in zip(
self.pyro_cgyro_mxh.items(), self.pyro_cgyro_mxh_defaults.values()
):
if "SHAPE" not in val:
mxh_data[key] = self.data.get(val, default)
else:
index = int(key[-1])
new_key = key[:-1]
if "SHAPE_S" in val:
mxh_data[new_key][index] = (
self.data.get(val, default) / mxh_data["rho"]
)
else:
mxh_data[new_key][index] = self.data.get(val, default)
mxh_keys = ["cn", "sn", "dcndr", "dsndr"]
for i_moment in range(6, 2, -1):
if np.all(
[True if mxh_data[key][i_moment] == 0.0 else False for key in mxh_keys]
):
for key in mxh_keys:
mxh_data[key] = mxh_data[key][:-1]
else:
break
# Force dsndr[0] = 0 as is definition
mxh_data["dsndr"][0] = 0.0
mxh_data["n_moments"] = len(mxh_data["cn"])
# Assume ne * Te *8pi*1e-7 = 1.0
(
ne,
Te,
) = self.get_ne_te_normalisation()
beta = self.data.get("BETAE_UNIT", 0.0) / (ne * Te)
# Need species to set up beta_prime
local_species = self.get_local_species()
beta_prime_scale = self.data.get("BETA_STAR_SCALE", 1.0)
mxh_data["beta_prime"] = (
-local_species.inverse_lp.m
* local_species.pressure.m
* beta_prime_scale
* beta
)
mxh = LocalGeometryMXH.from_gk_data(mxh_data)
mxh.dthetaR_dr = mxh.get_dthetaR_dr(mxh.theta, mxh.dcndr, mxh.dsndr)
return mxh
[docs]
def get_local_geometry_fourier(self) -> LocalGeometryFourierCGYRO:
"""
Load Fourier object from CGYRO file
"""
fourier_data = default_fourier_cgyro_inputs()
for (key, val), val_default in zip(
self.pyro_cgyro_fourier.items(), self.pyro_cgyro_fourier_defaults.values()
):
fourier_data[key] = self.data.get(val, val_default)
# Assume pref*8pi*1e-7 = 1.0
(
ne,
Te,
) = self.get_ne_te_normalisation()
beta = self.data.get("BETAE_UNIT", 0.0) / (ne * Te)
# Need species to set up beta_prime
local_species = self.get_local_species()
beta_prime_scale = self.data.get("BETA_STAR_SCALE", 1.0)
fourier_data["beta_prime"] = (
-local_species.inverse_lp.m
* local_species.pressure.m
* beta_prime_scale
* beta
)
fourier = LocalGeometryFourierCGYRO.from_gk_data(fourier_data)
return fourier
[docs]
def get_local_species(self):
"""
Load LocalSpecies object from CGYRO file
"""
# Dictionary of local species parameters
local_species = LocalSpecies()
ion_count = 0
if hasattr(self, "convention"):
convention = self.convention
else:
norms = Normalisation("get_local_species")
convention = getattr(norms, self.norm_convention)
# Load each species into a dictionary
for i_sp in range(self.data["N_SPECIES"]):
pyro_cgyro_species = self.get_pyro_cgyro_species(i_sp + 1)
species_data = CleverDict()
for p_key, c_key in pyro_cgyro_species.items():
species_data[p_key] = self.data[c_key]
species_data.omega0 = self.data.get("MACH", 0.0) / self.data["RMAJ"]
species_data.domega_drho = (
-self.data.get("GAMMA_P", 0.0) / self.data["RMAJ"]
)
if species_data.z == -1:
name = "electron"
species_data.nu = (
self.data.get("NU_EE", 0.1) * convention.vref / convention.lref
)
else:
ion_count += 1
name = f"ion{ion_count}"
species_data.name = name
# normalisations
species_data.dens *= convention.nref
species_data.mass *= convention.mref
species_data.temp *= convention.tref
species_data.z *= convention.qref
species_data.inverse_lt *= convention.lref**-1
species_data.inverse_ln *= convention.lref**-1
species_data.omega0 *= convention.vref / convention.lref
species_data.domega_drho *= convention.vref / convention.lref**2
# Add individual species data to dictionary of species
local_species.add_species(name=name, species_data=species_data)
# Handle adiabatic species if there
if self.data.get("AE_FLAG", 0) == 1:
nu_ee = self.data.get("NU_EE", 0.1) * convention.vref / convention.lref
te = self.data.get("TEMP_AE", 1.0) * convention.tref
ne = self.data.get("DENS_AE", 1.0) * convention.nref
me = self.data.get("MASS_AE", 2.724486e-4) * convention.mref
else:
nu_ee = local_species.electron.nu
te = local_species.electron.temp
ne = local_species.electron.dens
me = local_species.electron.mass
# Get collision frequency of ion species
for ion in range(ion_count):
key = f"ion{ion + 1}"
nion = local_species[key]["dens"]
tion = local_species[key]["temp"]
mion = local_species[key]["mass"]
zion = local_species[key]["z"]
# Not exact at log(Lambda) does change but pretty close...
local_species[key]["nu"] = (
nu_ee
* (zion**4 * nion / tion**1.5 / mion**0.5)
/ (ne / te**1.5 / me**0.5)
).m * nu_ee.units
# Normalise to pyrokinetics normalisations and calculate total pressure gradient
local_species.normalise(convention)
if self._legacy_cgyro:
if self.data.get("Z_EFF_METHOD", 2) == 2:
local_species.set_zeff()
else:
local_species.zeff = self.data.get("Z_EFF", 1.0) * convention.qref
else:
local_species.set_zeff()
return local_species
[docs]
def get_numerics(self) -> Numerics:
"""Gather numerical info (grid spacing, time steps, etc)"""
if hasattr(self, "convention"):
convention = self.convention
else:
norms = Normalisation("get_numerics")
convention = getattr(norms, self.norm_convention)
numerics_data = {}
nfields = self.data.get("N_FIELD", 1)
numerics_data["phi"] = nfields >= 1
numerics_data["apar"] = nfields >= 2
numerics_data["bpar"] = nfields >= 3
numerics_data["delta_time"] = self.data.get("DELTA_T", 0.01)
numerics_data["max_time"] = self.data.get("MAX_TIME", 1.0)
numerics_data["ky"] = (
self.data["KY"] / self.get_local_geometry().bunit_over_b0.m
)
numerics_data["nky"] = self.data.get("N_TOROIDAL", 1)
numerics_data["theta0"] = 2 * pi * self.data.get("PX0", 0.0)
numerics_data["nkx"] = self.data.get("N_RADIAL", 1)
if not self.is_nonlinear():
numerics_data["nperiod"] = int(self.data["N_RADIAL"] / 2)
else:
numerics_data["nperiod"] = 1
shat = self.data[self.pyro_cgyro_miller["shat"]]
box_size = self.data.get("BOX_SIZE", 1)
if numerics_data["nky"] == 1:
numerics_data["kx"] = numerics_data["ky"] * shat * numerics_data["theta0"]
else:
numerics_data["kx"] = numerics_data["ky"] * 2 * pi * shat / box_size
numerics_data["ntheta"] = self.data.get("N_THETA", 24)
numerics_data["nenergy"] = self.data.get("N_ENERGY", 8)
numerics_data["npitch"] = self.data.get("N_XI", 16)
numerics_data["nonlinear"] = self.is_nonlinear()
(
ne,
Te,
) = self.get_ne_te_normalisation()
numerics_data["beta"] = self.data.get("BETAE_UNIT", 0.0) / (ne * Te)
numerics_data["gamma_exb"] = self.data.get("GAMMA_E", 0.0)
return Numerics(**numerics_data).with_units(convention)
[docs]
def get_reference_values(self, local_norm: Normalisation) -> Dict[str, Any]:
"""
Reads in reference values from input file
"""
return {}
def _detect_normalisation(self):
"""
Determines the necessary inputs and passes information to the base method _set_up_normalisation.
The following values are needed
default_references: dict
Dictionary containing default reference values for the
gk_code: str
GK code
electron_density: float
Electron density from GK input
electron_temperature: float
Electron density from GK input
e_mass: float
Electron mass from GK input
electron_index: int
Index of electron in list of data
found_electron: bool
Flag on whether electron was found
densities: ArrayLike
List of species densities
temperatures: ArrayLike
List of species temperature
reference_density_index: ArrayLike
List of indices where the species has a density of 1.0
reference_temperature_index: ArrayLike
List of indices where the species has a temperature of 1.0
major_radius: float
Normalised major radius from GK input
rgeo_rmaj: float
Ratio of Geometric and flux surface major radius
minor_radius: float
Normalised minor radius from GK input
"""
default_references = {
"nref_species": "electron",
"tref_species": "electron",
"mref_species": "deuterium",
"bref": "Bunit",
"lref": "minor_radius",
"ne": 1.0,
"te": 1.0,
"rgeo_rmaj": 1.0,
"vref": "nrl",
"rhoref": "unit",
"raxis_rmaj": None,
}
reference_density_index = []
reference_temperature_index = []
densities = []
temperatures = []
masses = []
found_electron = False
e_mass = None
electron_temperature = None
electron_density = None
electron_index = None
if self.data.get("AE_FLAG", 0) == 1:
dens = self.data["DENS_AE"]
temp = self.data["TEMP_AE"]
mass = self.data["MASS_AE"]
electron_density = dens
electron_temperature = temp
e_mass = mass
electron_index = 0
found_electron = True
if np.isclose(dens, 1.0):
reference_density_index.append(0)
if np.isclose(temp, 1.0):
reference_temperature_index.append(0)
densities.append(dens)
temperatures.append(temp)
masses.append(mass)
else:
for i_sp in range(self.data["N_SPECIES"]):
dens = self.data[f"DENS_{i_sp + 1}"]
temp = self.data[f"TEMP_{i_sp + 1}"]
mass = self.data[f"MASS_{i_sp + 1}"]
if self.data[f"Z_{i_sp + 1}"] == -1:
electron_density = dens
electron_temperature = temp
e_mass = mass
electron_index = i_sp
found_electron = True
if np.isclose(dens, 1.0):
reference_density_index.append(i_sp)
if np.isclose(temp, 1.0):
reference_temperature_index.append(i_sp)
densities.append(dens)
temperatures.append(temp)
masses.append(mass)
major_radius = self.data["RMAJ"]
minor_radius = 1.0
super()._set_up_normalisation(
default_references=default_references,
gk_code=self.code_name.lower(),
electron_density=electron_density,
electron_temperature=electron_temperature,
e_mass=e_mass,
electron_index=electron_index,
found_electron=found_electron,
densities=densities,
temperatures=temperatures,
reference_density_index=reference_density_index,
reference_temperature_index=reference_temperature_index,
major_radius=major_radius,
rgeo_rmaj=1.0,
minor_radius=minor_radius,
)
[docs]
def set(
self,
local_geometry: LocalGeometry,
local_species: LocalSpecies,
numerics: Numerics,
local_norm: Optional[Normalisation] = None,
template_file: Optional[PathLike] = None,
code_normalisation: Optional[str] = None,
**kwargs,
):
"""
Set self.data using LocalGeometry, LocalSpecies, and Numerics.
These may be obtained via another GKInput file, or from Equilibrium/Kinetics
objects.
"""
# If self.data is not already populated, fill in defaults from a given
# template file. If this is not provided by the user, fall back to the
# default.
if self.data is None:
if template_file is None:
template_file = gk_templates["CGYRO"]
self.read_from_file(template_file)
if local_norm is None:
local_norm = Normalisation("set")
if code_normalisation is None:
code_normalisation = self.norm_convention
convention = getattr(local_norm, code_normalisation)
# Geometry data
if isinstance(local_geometry, LocalGeometryMXH):
eq_model = 2
eq_type = "MXH"
elif isinstance(local_geometry, LocalGeometryMiller):
eq_model = 2
eq_type = "Miller"
elif isinstance(local_geometry, LocalGeometryFourierCGYRO):
eq_model = 3
eq_type = "Fourier"
raise NotImplementedError(
f"LocalGeometry type {local_geometry.__class__.__name__} not "
"implemented yet for CGYRO"
)
else:
raise NotImplementedError(
f"LocalGeometry type {local_geometry.__class__.__name__} not "
"implemented for CGYRO"
)
# Set equilibrium type in input file
self.data["EQUILIBRIUM_MODEL"] = eq_model
if eq_type == "Miller":
# Assign Miller values to input file
for key, val in self.pyro_cgyro_miller.items():
self.data[val] = local_geometry[key]
self.data["S_DELTA"] = local_geometry.s_delta * np.sqrt(
1 - local_geometry.delta**2
)
# Need to remove any MXH keys
for mxh_key in self.pyro_cgyro_mxh.keys():
if (
mxh_key not in self.pyro_cgyro_miller.keys()
and mxh_key.upper() in self.data.keys()
):
self.data.pop(mxh_key.upper())
elif eq_type == "Fourier":
# Assign Fourier values to input file
for key, val in self.pyro_cgyro_fourier.items():
self.data[val] = local_geometry[key]
elif eq_type == "MXH":
# Assign MXH values to input file
for key, val in self.pyro_cgyro_mxh.items():
if "SHAPE" not in val:
self.data[val] = getattr(local_geometry, key)
else:
index = int(key[-1])
new_key = key[:-1]
# Skip in index beyond n_moments
if index >= local_geometry.n_moments:
continue
if "SHAPE_S" in val:
self.data[val] = (
getattr(local_geometry, new_key)[index] * local_geometry.rho
)
else:
self.data[val] = getattr(local_geometry, new_key)[index]
# Kinetic data
n_species = local_species.nspec
self.data["N_SPECIES"] = n_species
stored_species = len([key for key in self.data.keys() if "DENS_" in key])
extra_species = stored_species - n_species
if extra_species > 0:
for i_sp in range(extra_species):
pyro_cgyro_species = self.get_pyro_cgyro_species(i_sp + 1 + n_species)
for cgyro_key in pyro_cgyro_species.values():
if cgyro_key in self.data:
self.data.pop(cgyro_key)
for i_sp, name in enumerate(local_species.names):
pyro_cgyro_species = self.get_pyro_cgyro_species(i_sp + 1)
for pyro_key, cgyro_key in pyro_cgyro_species.items():
self.data[cgyro_key] = local_species[name][pyro_key]
if not self._legacy_cgyro:
if "Z_EFF" in self.data:
self.data.pop("Z_EFF")
if "Z_EFF_METHOD" in self.data:
self.data.pop("Z_EFF_METHOD")
else:
self.data["Z_EFF_METHOD"] = 1
self.data["Z_EFF"] = local_species.zeff
if "electron" in local_species.names:
first_species = "electron"
self.data["NU_EE"] = local_species.electron.nu
else:
first_species = local_species.names[0]
zion = local_species[first_species].z
nion = local_species[first_species].dens
tion = local_species[first_species].temp
mion = local_species[first_species].mass
te = self.data.get("TEMP_AE", 1.0) * convention.tref
ne = self.data.get("DENS_AE", 1.0) * convention.nref
me = self.data.get("MASS_AE", 2.724486e-4) * convention.mref
self.data["NU_EE"] = (
local_species[first_species].nu
/ (zion**4 * nion / tion**1.5 / mion**0.5)
* (ne / te**1.5 / me**0.5)
)
# Set adiabatic flags
self.data["AE_FLAG"] = 1
self.data["DENS_AE"] = ne
self.data["TEMP_AE"] = te
self.data["MASS_AE"] = me
self.data["MACH"] = local_species[first_species].omega0 * self.data["RMAJ"]
self.data["GAMMA_P"] = (
-local_species[first_species].domega_drho * self.data["RMAJ"]
)
beta_ref = convention.beta if local_norm else 0.0
beta = numerics.beta if numerics.beta is not None else beta_ref
# Calculate beta_prime_scale
if beta != 0.0:
beta_prime_scale = -local_geometry.beta_prime.to(convention) / (
local_species.inverse_lp.to(convention)
* local_species.pressure.to(convention).m
* beta.to(convention)
)
else:
beta_prime_scale = 1.0
# CGYRO beta is ALWAYS ne*Te/Bunit^2 regardless of existing nref and Tref
original_convention = getattr(local_norm, self.norm_convention)
ne = (1 * original_convention.nref).to(local_norm.cgyro)
Te = (1 * original_convention.tref).to(local_norm.cgyro)
self.data["BETAE_UNIT"] = beta * ne * Te
self.data["BETA_STAR_SCALE"] = beta_prime_scale
# Numerics
if numerics.bpar and not numerics.apar:
raise ValueError("Can't have bpar without apar in CGYRO")
self.data["N_FIELD"] = 1 + int(numerics.bpar) + int(numerics.apar)
# Set time stepping
self.data["DELTA_T"] = numerics.delta_time
self.data["MAX_TIME"] = numerics.max_time
if numerics.nonlinear:
self.data["NONLINEAR_FLAG"] = 1
self.data["N_RADIAL"] = numerics.nkx
if numerics.kx == 0.0:
self.data["BOX_SIZE"] = 1
else:
self.data["BOX_SIZE"] = int(
(numerics.ky * 2 * pi * local_geometry.shat / numerics.kx) + 0.1
)
else:
self.data["NONLINEAR_FLAG"] = 0
self.data["N_RADIAL"] = numerics.nperiod * 2
self.data["BOX_SIZE"] = 1
self.data["KY"] = numerics.ky * local_geometry.bunit_over_b0.m
self.data["N_TOROIDAL"] = numerics.nky
self.data["N_THETA"] = numerics.ntheta
self.data["THETA_PLOT"] = numerics.ntheta
self.data["PX0"] = numerics.theta0 / (2 * pi)
self.data["GAMMA_E"] = numerics.gamma_exb
self.data["N_ENERGY"] = numerics.nenergy
self.data["N_XI"] = numerics.npitch
if not local_norm:
return
self.data = convert_dict(self.data, convention)
def _check_basic_miller(self):
"""
Checks if CGYRO input file is a basic Miller geometry by seeing if moments that are higher than triangularity
are 0
Returns
-------
is_basic_miller: Boolean
True if Miller, False is MXH
"""
mxh_only_parameters = [
"ZETA",
"S_ZETA",
"SHAPE_COS0",
"SHAPE_COS1",
"SHAPE_COS2",
"SHAPE_COS3",
"SHAPE_SIN3",
"SHAPE_S_COS0",
"SHAPE_S_COS1",
"SHAPE_S_COS2",
"SHAPE_S_COS3",
"SHAPE_S_SIN3",
]
is_basic_miller = True
for param in mxh_only_parameters:
if self.data.get(param, 0.0) != 0:
is_basic_miller = False
break
return is_basic_miller
[docs]
def get_ne_te_normalisation(self):
found_electron = False
if self.data.get("AE_FLAG", 0) == 1:
ne = self.data["DENS_AE"]
Te = self.data["TEMP_AE"]
found_electron = True
else:
for i_sp in range(self.data["N_SPECIES"]):
if self.data[f"Z_{i_sp + 1}"] == -1:
ne = self.data[f"DENS_{i_sp + 1}"]
Te = self.data[f"TEMP_{i_sp + 1}"]
found_electron = True
break
if not found_electron:
raise TypeError(
"Pyro currently requires an electron species in the input file"
)
return ne, Te
[docs]
class CGYROFile:
[docs]
def __init__(self, path: PathLike, required: bool):
self.path = Path(path)
self.required = required
self.fmt = self.path.name.split(".")[0]
[docs]
class GKOutputReaderCGYRO(FileReader, file_type="CGYRO", reads=GKOutput):
fields = ["phi", "apar", "bpar"]
moments = ["n", "e", "v"]
[docs]
def read_from_file(
self,
filename: PathLike,
norm: Normalisation,
output_convention: str = "pyrokinetics",
load_fields=True,
load_fluxes=True,
load_moments=False,
downsample: Dict[str, Any] = {},
) -> GKOutput:
raw_data, gk_input, input_str = self._get_raw_data(
filename, load_fields, load_moments
)
# Assign units and return GKOutput
convention = getattr(norm, gk_input.norm_convention)
gk_input.convention = convention
norm.default_convention = output_convention.lower()
fmt_downsample = {}
for key, value in downsample.items():
if isinstance(value, int):
fmt_downsample[key + "_idx"] = slice(value, value + 1)
else:
fmt_downsample[key + "_idx"] = value
downsample = fmt_downsample
bunit_over_b0 = (1 * norm.cgyro.bref).to(norm.gs2).m
coords = self._get_coords(raw_data, gk_input, bunit_over_b0, downsample)
fields = (
self._get_fields(raw_data, gk_input, coords, downsample)
if load_fields
else None
)
fluxes = (
self._get_fluxes(raw_data, gk_input, coords, downsample)
if load_fluxes
else None
)
moments = (
self._get_moments(raw_data, gk_input, coords, downsample)
if load_moments
else None
)
if coords["linear"] and (
coords["ntheta_plot"] != coords["ntheta_grid"] or not fields
):
eigenvalues = self._get_eigenvalues(raw_data, coords, gk_input)
eigenfunctions = self._get_eigenfunctions(raw_data, coords)
else:
# Rely on gk_output to generate eigenvalues
eigenvalues = None
eigenfunctions = None
normalise_flux_moment = False
# Linear CGYRO outputs are normalised by <\phi(t)^2> and do not include factor
# k_y \rho_s so these must be undone and added now
if coords["linear"] and fields:
phi2 = np.abs(fields["phi"]) ** 2
w_theta = coords["w_theta"][:, None, None, None]
phi2_int = np.sum(phi2 * w_theta, axis=0) * gk_input.data["KY"] * 2
# Sum over kx if nky > 1
if len(coords["ky"]) > 1:
phi2_int = np.sum(phi2_int, axis=0)
if fluxes:
fluxes = {k: v * phi2_int for k, v in fluxes.items()}
if moments:
moments = {k: v * phi2_int for k, v in moments.items()}
normalise_flux_moment = True
field_dims = ("theta", "kx", "ky", "time")
flux_dims = ("field", "species", "ky", "time")
moment_dims = ("theta", "kx", "species", "ky", "time")
gk_output = GKOutput(
coords=Coords(
time=coords["time"],
kx=coords["kx"],
ky=coords["ky"],
theta=coords["theta"],
pitch=coords["pitch"],
energy=coords["energy"],
species=coords["species"],
field=coords["field"],
).with_units(convention),
norm=norm,
fields=(
Fields(**fields, dims=field_dims).with_units(convention)
if fields
else None
),
fluxes=(
Fluxes(**fluxes, dims=flux_dims).with_units(convention)
if fluxes
else None
),
moments=(
Moments(**moments, dims=moment_dims).with_units(convention)
if moments
else None
),
eigenvalues=(
Eigenvalues(**eigenvalues).with_units(convention)
if eigenvalues
else None
),
eigenfunctions=(
None
if eigenfunctions is None
else Eigenfunctions(eigenfunctions).with_units(convention)
),
linear=coords["linear"],
gk_code="CGYRO",
input_file=input_str,
normalise_flux_moment=normalise_flux_moment,
output_convention=output_convention,
jacobian=coords["jacobian"],
)
# Storing state of class variables for @staticmethod functions
gk_output._downsample = downsample
gk_output._raw_coords = coords
return gk_output
[docs]
def verify_file_type(self, dirname: PathLike):
dirname = Path(dirname)
for f in self._required_files(dirname).values():
if not f.path.exists():
raise RuntimeError(f"Missing the file '{f.path}'")
[docs]
@staticmethod
def infer_path_from_input_file(filename: PathLike) -> Path:
"""
Given path to input file, guess at the path for associated output files.
For CGYRO, simply returns dir of the path.
"""
return Path(filename).parent
@staticmethod
def _required_files(dirname: PathLike):
dirname = Path(dirname)
return {
"input": CGYROFile(dirname / "input.cgyro", required=True),
"time": CGYROFile(dirname / "out.cgyro.time", required=True),
"grids": CGYROFile(dirname / "out.cgyro.grids", required=True),
"equilibrium": CGYROFile(dirname / "out.cgyro.equilibrium", required=True),
"geo": CGYROFile(dirname / "bin.cgyro.geo", required=True),
}
@classmethod
def _get_raw_data(
cls, dirname: PathLike, load_fields, load_moments
) -> Tuple[Dict[str, Any], GKInputCGYRO, str]:
dirname = Path(dirname)
if not dirname.exists():
raise RuntimeError(
f"GKOutputReaderCGYRO: Provided path {dirname} does not exist. "
"Please supply the name of a directory containing CGYRO output files."
)
if not dirname.is_dir():
raise RuntimeError(
f"GKOutputReaderCGYRO: Provided path {dirname} is not a directory. "
"Please supply the name of a directory containing CGYRO output files."
)
# The following list of CGYRO files may exist
expected_files = {
**cls._required_files(dirname),
"flux": CGYROFile(dirname / "bin.cgyro.ky_flux", required=False),
"cflux": CGYROFile(dirname / "bin.cgyro.ky_cflux", required=False),
"eigenvalues_bin": CGYROFile(dirname / "bin.cgyro.freq", required=False),
"eigenvalues_out": CGYROFile(dirname / "out.cgyro.freq", required=False),
**{
f"field_{f}": CGYROFile(dirname / f"bin.cgyro.kxky_{f}", required=False)
for f in cls.fields
if load_fields
},
**{
f"moment_{m}": CGYROFile(
dirname / f"bin.cgyro.kxky_{m}", required=False
)
for m in cls.moments
if load_moments
},
**{
f"eigenfunctions_{f}": CGYROFile(
dirname / f"bin.cgyro.{f}b", required=False
)
for f in cls.fields
},
}
# Read in files
raw_data = {}
for key, cgyro_file in expected_files.items():
if not cgyro_file.path.exists():
if cgyro_file.required:
raise RuntimeError(
f"GKOutputReaderCGYRO: The file {cgyro_file.path.name} is needed"
)
continue
# Read in file according to format
if cgyro_file.fmt == "input":
with open(cgyro_file.path, "r") as f:
raw_data[key] = f.read()
if cgyro_file.fmt == "out":
raw_data[key] = np.loadtxt(cgyro_file.path)
if cgyro_file.fmt == "bin":
# Promote to 64 bit float here, as with older NumPy versions this
# can lock us into low precision computation throughout
# raw_data[key] = np.asarray(
# np.fromfile(cgyro_file.path, dtype=np.float32), dtype=float
# )
raw_data[key] = np.asarray(
np.memmap(cgyro_file.path, dtype=np.float32, mode="r")
)
input_str = raw_data["input"]
gk_input = GKInputCGYRO()
gk_input.read_str(input_str)
return raw_data, gk_input, input_str
@staticmethod
def _get_coords(
raw_data: Dict[str, Any],
gk_input: GKInputCGYRO,
bunit_over_b0: float = None,
downsample: Dict[str, Any] = {},
) -> Dict[str, Any]:
"""
Sets coords and attrs of a Pyrokinetics dataset from a collection of CGYRO
files.
Args:
raw_data (Dict[str,Any]): Dict containing CGYRO output.
gk_input (GKInputCGYRO): Processed CGYRO input file.
Returns:
Dict: Dictionary with coords
"""
# Process time data
time = raw_data["time"][:, 0]
# Process grid data
grid_data = raw_data["grids"]
nky = int(grid_data[0])
nspecies = int(grid_data[1])
nfield = int(grid_data[2])
nkx = int(grid_data[3])
ntheta_grid = int(grid_data[4])
nenergy = int(grid_data[5])
npitch = int(grid_data[6])
box_size = int(grid_data[7])
length_x = grid_data[8]
ntheta_plot = int(grid_data[10])
# Iterate through grid_data in chunks, starting after kx
pos = 11 + nkx
theta_grid = grid_data[pos : pos + ntheta_grid]
pos += ntheta_grid
energy = grid_data[pos : pos + nenergy]
pos += nenergy
pitch = grid_data[pos : pos + npitch]
pos += npitch
ntheta_ballooning = ntheta_grid * int(nkx / box_size)
theta_ballooning = grid_data[pos : pos + ntheta_ballooning]
pos += ntheta_ballooning
ky = np.abs(
grid_data[pos : pos + nky] / bunit_over_b0
) # Force the sign of ky to be positive
if gk_input.is_linear() and nky == 1:
# Convert to ballooning co-ordinate so only 1 kx
theta = theta_ballooning
ntheta = ntheta_ballooning
kx = [0.0]
nkx = 1
theta0 = gk_input.data.get("PX0", 0.0) * 2 * np.pi
else:
# Output data actually given on theta_plot grid
ntheta = ntheta_plot
theta = np.array(
([0.0] if ntheta == 1 else theta_grid[:: ntheta_grid // ntheta])
)
kx = (
2
* pi
* np.linspace(-int(nkx / 2), int((nkx + 1) / 2) - 1, nkx)
/ length_x
) / bunit_over_b0
theta0 = 0.0
theta += -theta0
# Get rho_star from equilibrium file
if len(raw_data["equilibrium"]) == 54 + 7 * nspecies:
rho_star = raw_data["equilibrium"][35]
elif len(raw_data["equilibrium"]) == 54 + 9 * nspecies:
rho_star = raw_data["equilibrium"][35]
elif len(raw_data["equilibrium"]) == 55 + 10 * nspecies:
rho_star = raw_data["equilibrium"][35]
elif len(raw_data["equilibrium"]) == 57 + 9 * nspecies:
rho_star = raw_data["equilibrium"][35]
elif len(raw_data["equilibrium"]) == 48 + 9 * nspecies:
rho_star = raw_data["equilibrium"][35]
gk_input.set_legacy_cgyro(False)
else:
rho_star = raw_data["equilibrium"][23]
fields = ["phi", "apar", "bpar"][:nfield]
fluxes = ["particle", "heat", "momentum"]
moments = ["density", "temperature", "velocity"]
species = gk_input.get_local_species().names
if nspecies != len(species):
raise RuntimeError(
"GKOutputReaderCGYRO: Different number of species in input and output."
)
g_theta_geo = raw_data["geo"][ntheta_grid : 2 * ntheta_grid]
bmag = raw_data["geo"][2 * ntheta_grid : 3 * ntheta_grid]
csf = gk_input.data.get("CONSTANT_STREAM_FLAG", 1)
if csf == 1:
dtheta = theta_grid[1] - theta_grid[0]
dtheta_eq = 2 * np.pi / ntheta_grid
g_theta_geo = dtheta * g_theta_geo[0] / dtheta_eq
w_theta = g_theta_geo / bmag
w_theta = w_theta / sum(w_theta)
nradial = int(gk_input.data["N_RADIAL"])
# Construct on ballooning space grid
if len(ky) == 1:
w_theta = np.tile(w_theta, nradial)
full_ky = ky
full_kx = kx
full_theta = theta
full_time = time
if downsample:
ky = ky[downsample.get("ky_idx", slice(None))]
kx = kx[downsample.get("kx_idx", slice(None))]
theta = theta[downsample.get("theta_idx", slice(None))]
time = time[downsample.get("time_idx", slice(None))]
local_geometry = gk_input.get_local_geometry()
metric_terms = MetricTerms(local_geometry, ntheta=ntheta_grid * 4)
theta_mod = np.mod(theta, 2 * np.pi)
Jacobian = np.interp(
theta_mod,
metric_terms.regulartheta,
metric_terms.Jacobian,
period=2 * np.pi,
)
# Store grid data as xarray DataSet
return {
"time": time,
"kx": kx,
"ky": ky,
"theta": theta,
"full_time": full_time,
"full_kx": full_kx,
"full_ky": full_ky,
"full_theta": full_theta,
"energy": energy,
"pitch": pitch,
"ntheta_plot": ntheta_plot,
"ntheta_grid": ntheta_grid,
"nradial": nradial,
"rho_star": rho_star,
"field": fields,
"moment": moments,
"flux": fluxes,
"species": species,
"linear": gk_input.is_linear(),
"w_theta": w_theta,
"jacobian": Jacobian,
}
@staticmethod
def _get_fields(
raw_data: Dict[str, Any],
gk_input: GKInputCGYRO,
coords: Dict[str, Any],
downsample: Dict[str, Any],
) -> Dict[str, np.ndarray]:
"""
Sets 3D fields over time.
The field coordinates should be (field, theta, kx, ky, time)
"""
nkx = len(coords["kx"])
nradial = coords["nradial"]
nky = len(coords["ky"])
ntheta = len(coords["theta"])
full_nkx = len(coords["full_kx"])
full_ntime = len(coords["full_time"])
full_nky = len(coords["full_ky"])
full_ntheta = len(coords["full_theta"])
ntheta_plot = coords["ntheta_plot"]
ntheta_grid = coords["ntheta_grid"]
ntime = len(coords["time"])
nfield = len(coords["field"])
field_names = ["phi", "apar", "bpar"][:nfield]
def _field_downsample(
memmap, kx_idx=None, ky_idx=None, theta_idx=None, time_idx=None, **kwargs
):
"""
Extract a subset of a 4D memmap array with custom indices for each dimension.
Parameters
----------
memmap_arr : np.memmap
The 4D memmap array with shape (kx, ky, theta, time).
kx_idx, ky_idx, theta_idx, time_idx : slice, int, list, or ndarray, optional
Indices for each dimension. Can be:
- None (means ':')
- int
- slice (e.g., slice(0, 10, 2))
- list or np.ndarray of indices
Returns
-------
np.ndarray
A normal NumPy array with the requested subset.
(This will load the selected data into memory.)
"""
# replace None with full slices
kx_idx = slice(None) if kx_idx is None else kx_idx
ky_idx = slice(None) if ky_idx is None else ky_idx
theta_idx = slice(None) if theta_idx is None else theta_idx
time_idx = slice(None) if time_idx is None else time_idx
# fancy indexing happens here
return memmap[:, kx_idx, theta_idx, ky_idx, time_idx]
raw_field_data = {f: raw_data.get(f"field_{f}", None) for f in field_names}
results = {}
# Check to see if there's anything to do
if not raw_field_data:
return results
# Loop through all fields and add field in if it exists
for ifield, (field_name, raw_field) in enumerate(raw_field_data.items()):
if raw_field is None:
logging.warning(
f"Field data {field_name} over time not found, expected the file "
f"bin.cgyro.kxky_{field_name} to exist. Setting this field to 0."
)
continue
# If linear, convert from kx to ballooning space.
# Use nradial instead of nkx, ntheta_plot instead of ntheta
if gk_input.is_linear() and nky == 1:
shape = (2, nradial, ntheta_plot, full_nky, full_ntime)
else:
shape = (2, full_nkx, full_ntheta, full_nky, full_ntime)
field_data = raw_field.reshape(shape, order="F")
field_data = _field_downsample(field_data, **downsample).astype(np.float64)
# Adjust sign to match pyrokinetics frequency convention
# (-ve is electron direction)
bt_ccw = gk_input.data.get("BTCCW", -1)
ip_ccw = gk_input.data.get("IPCCW", -1)
mode_sign = int(
np.sign(np.sign(gk_input.data.get("S", 1.0)) * bt_ccw * ip_ccw)
)
field_data = (field_data[0] + 1j * field_data[1]) / coords["rho_star"]
# If nonlinear, we can simply save the fields and continue
if gk_input.is_nonlinear() or nky != 1:
fields = field_data.swapaxes(0, 1)
else:
# If theta_plot != theta_grid, we get eigenfunction data and multiply by the
# field amplitude
if ntheta_plot != ntheta_grid:
# Get eigenfunction data
raw_eig_data = raw_data.get(f"eigenfunctions_{field_name}", None)
if raw_eig_data is None:
logging.warning(
f"When setting fields, eigenfunction data for {field_name} not "
f"found, expected the file bin.cgyro.{field_name}b to exist. "
f"Not setting the field {field_name}."
)
continue
eig_shape = [2, ntheta, full_ntime]
eig_data = raw_eig_data[: np.prod(eig_shape)].reshape(
eig_shape, order="F"
)
eig_data = eig_data[0] + 1j * eig_data[1]
# Get field amplitude
middle_kx = (nradial // 2) + 1
field_amplitude = np.abs(field_data[middle_kx, 0, 0, :])
# Multiply together
# FIXME We only set kx=ky=0 here, any other values are left undefined
# as fields is created using np.empty. Should we instead set
# all kx and ky to these values? Should we expect that nx=ny=1?
field_data = np.reshape(
eig_data * field_amplitude,
(nradial, ntheta_grid, nky, ntime),
)
# Poisson Sum (no negative in exponent to match frequency convention)
q = gk_input.get_local_geometry_miller().q
nx0 = gk_input.data.get("PX0", 0.0)
for i_radial in range(nradial):
nx = -nradial // 2 + (i_radial - 1)
field_data[i_radial, ...] *= np.exp(
-2j * pi * (nx + nx0) * np.abs(q)
)
if mode_sign == -1:
field_data = field_data[:, ::-1, :, :]
fields = field_data.reshape([ntheta, nkx, nky, ntime])
fields = fields[::-1, :, :, :]
else:
fields = field_data.reshape([ntheta, nkx, nky, ntime])
if ip_ccw == -1:
fields = np.conj(fields)
results[field_name] = fields
return results
@staticmethod
def _get_moments(
raw_data: Dict[str, Any],
gk_input: GKInputCGYRO,
coords: Dict[str, Any],
downsample: Dict,
) -> Dict[str, np.ndarray]:
"""
Sets 3D moments over time.
The moment coordinates should be (moment, theta, kx, species, ky, time)
"""
moment_names = {"n": "density", "e": "temperature", "v": "velocity"}
nkx = len(coords["kx"])
nradial = coords["nradial"]
nky = len(coords["ky"])
ntheta = len(coords["theta"])
ntheta_plot = coords["ntheta_plot"]
ntime = len(coords["time"])
nspec = len(coords["species"])
full_nkx = len(coords["full_kx"])
full_ntime = len(coords["full_time"])
full_nky = len(coords["full_ky"])
full_ntheta = len(coords["full_theta"])
raw_moment_data = {
value: raw_data.get(f"moment_{key}", None)
for key, value in moment_names.items()
}
results = {}
# Check to see if there's anything to do
if not raw_moment_data:
return results
def _moment_downsample(
memmap, kx_idx=None, ky_idx=None, theta_idx=None, time_idx=None, **kwargs
):
"""
Extract a subset of a 4D memmap array with custom indices for each dimension.
Parameters
----------
memmap_arr : np.memmap
The 4D memmap array with shape (kx, theta, species, ky, time).
kx_idx, ky_idx, theta_idx, time_idx : slice, int, list, or ndarray, optional
Indices for each dimension. Can be:
- None (means ':')
- int
- slice (e.g., slice(0, 10, 2))
- list or np.ndarray of indices
Returns
-------
np.ndarray
A normal NumPy array with the requested subset.
(This will load the selected data into memory.)
"""
# replace None with full slices
kx_idx = slice(None) if kx_idx is None else kx_idx
ky_idx = slice(None) if ky_idx is None else ky_idx
theta_idx = slice(None) if theta_idx is None else theta_idx
time_idx = slice(None) if time_idx is None else time_idx
# fancy indexing happens here
return memmap[:, kx_idx, theta_idx, :, ky_idx, time_idx]
# Loop through all moments and add moment in if it exists
for imoment, (moment_name, raw_moment) in enumerate(raw_moment_data.items()):
if raw_moment is None:
logging.warning(
f"moment data {moment_name} over time not found, expected the file "
f"bin.cgyro.kxky_{moment_name} to exist. Setting this moment to 0."
)
continue
# If linear, convert from kx to ballooning space.
# Use nradial instead of nkx, ntheta_plot instead of ntheta
if gk_input.is_linear() and nky == 1:
shape = (2, nradial, ntheta_plot, nspec, nky, full_ntime)
else:
shape = (2, full_nkx, full_ntheta, nspec, full_nky, full_ntime)
moment_data = raw_moment.reshape(shape, order="F")
moment_data = _moment_downsample(moment_data, **downsample).astype(
np.float64
)
# Adjust sign to match pyrokinetics frequency convention
# (-ve is electron direction)
bt_ccw = gk_input.data.get("BTCCW", -1)
ip_ccw = gk_input.data.get("IPCCW", -1)
mode_sign = int(
np.sign(np.sign(gk_input.data.get("S", 1.0)) * bt_ccw * ip_ccw)
)
moment_data = (moment_data[0] + 1j * moment_data[1]) / coords["rho_star"]
# If nonlinear, we can simply save the moments and continue
if gk_input.is_nonlinear() or nky != 1:
moments = moment_data.swapaxes(0, 1)
else:
# Poisson Sum (no negative in exponent to match frequency convention)
q = gk_input.get_local_geometry_miller().q
nx0 = gk_input.data.get("PX0", 0.0)
for i_radial in range(nradial):
nx = -nradial // 2 + (i_radial - 1)
moment_data[i_radial, ...] *= np.exp(
-2j * pi * (nx + nx0) * np.abs(q)
)
if mode_sign == -1:
moment_data = moment_data[:, ::-1, ...]
moments = moment_data.reshape([ntheta, nkx, nspec, nky, ntime])
moments = moments[::-1, :, :, :]
else:
moments = moment_data.reshape([ntheta, nkx, nspec, nky, ntime])
if ip_ccw == -1:
moments = np.conj(moments)
results[moment_name] = moments
temp_spec = np.ones((ntheta, nkx, nspec, nky, ntime))
for i in range(nspec):
temp_spec[:, :, i, :, :] = gk_input.data.get(f"TEMP_{i + 1}", 1.0)
if "temperature" in results:
# Convert CGYRO energy fluctuation to temperature
results["temperature"] = (
2 * results["temperature"] - results["density"] * temp_spec
)
return results
@staticmethod
def _flux_downsample(memmap, ky_idx=None, time_idx=None, **kwargs):
"""
Extract a subset of a 4D memmap array with custom indices for each dimension.
Parameters
----------
memmap_arr : np.memmap
The 5D memmap array with shape (species, flux, field, ky, time).
ky_idx, time_idx : slice, int, list, or ndarray, optional
Indices for each dimension. Can be:
- None (means ':')
- int
- slice (e.g., slice(0, 10, 2))
- list or np.ndarray of indices
Returns
-------
np.ndarray
A normal NumPy array with the requested subset.
(This will load the selected data into memory.)
"""
# replace None with full slices
ky_idx = slice(None) if ky_idx is None else ky_idx
time_idx = slice(None) if time_idx is None else time_idx
# fancy indexing happens here
return memmap[:, :, :, ky_idx, time_idx]
@staticmethod
def _get_fluxes(
raw_data: Dict[str, Any],
gk_input: Dict,
coords: Dict,
downsample: Dict,
) -> Dict[str, np.ndarray]:
"""
Set flux data over time.
The flux coordinates should be (species, moment, field, ky, time)
"""
results = {}
# cflux is more appropriate for CGYRO simulations
# with GAMMA_E > 0 and SHEAR_METHOD = 2.
# However, for cross-code consistency, gflux is used for now.
# if gk_input.data.get("GAMMA_E", 0.0) == 0.0:
# flux_key = "flux"
# else:
# flux_key = "cflux"
flux_key = "flux"
if flux_key in raw_data:
coord_names = ["species", "flux", "field", "full_ky", "full_time"]
shape = [len(coords[coord_name]) for coord_name in coord_names]
raw_fluxes = raw_data[flux_key].reshape(shape, order="F")
fluxes = GKOutputReaderCGYRO._flux_downsample(raw_fluxes, **downsample)
fluxes = np.swapaxes(fluxes, 0, 2)
if gk_input.is_linear() and gk_input._legacy_cgyro:
flux_norm = (
2
* np.pi**1.5
* -np.sign(
gk_input.data.get("IPCCW", -1) * gk_input.data.get("BTCCW", -1)
)
)
elif gk_input.is_linear() and not gk_input._legacy_cgyro:
flux_norm = 2 * np.pi**1.5 * -np.sign(gk_input.data.get("IPCCW", -1))
else:
flux_norm = 1.0
for iflux, flux in enumerate(coords["flux"]):
results[flux] = (fluxes[:, iflux, :, :, :]) / flux_norm
return results
@classmethod
def _get_eigenvalues(
self, raw_data: Dict[str, Any], coords: Dict, gk_input: Optional[Any] = None
) -> Dict[str, np.ndarray]:
"""
Takes an xarray Dataset that has had coordinates and fields set.
Uses this to add eigenvalues:
data['eigenvalues'] = eigenvalues(kx, ky, time)
data['mode_frequency'] = mode_frequency(kx, ky, time)
data['growth_rate'] = growth_rate(kx, ky, time)
This should be called after _set_fields, and is only valid for linear runs.
Unlike the version in the super() class, CGYRO may need to get extra info from
an eigenvalue file.
Args:
data (xr.Dataset): The dataset to be modified.
dirname (PathLike): Directory containing CGYRO output files.
Returns:
Dict: The modified dataset which was passed to 'data'.
"""
ntime = len(coords["time"])
nky = len(coords["ky"])
nkx = len(coords["kx"])
shape = (2, nky, ntime)
if "eigenvalues_bin" in raw_data:
eigenvalue_over_time = raw_data["eigenvalues_bin"][
: np.prod(shape)
].reshape(shape, order="F")
elif "eigenvalues_out" in raw_data:
eigenvalue_over_time = (
raw_data["eigenvalues_out"].transpose()[:, :ntime].reshape(shape)
)
else:
raise RuntimeError(
"Eigenvalues over time not found, expected the files bin.cgyro.freq or "
"out.cgyro.freq to exist. Could not set data_vars 'growth_rate', "
"'mode_frequency' and 'eigenvalue'."
)
mode_sign = -np.sign(
np.sign(gk_input.data.get("Q", 2.0)) * -gk_input.data.get("BTCCW", -1)
)
mode_frequency = mode_sign * eigenvalue_over_time[0, :, :]
growth_rate = eigenvalue_over_time[1, :, :]
# Add kx axis for compatibility with GS2 eigenvalues
# FIXME Is this appropriate? Should we drop the kx coordinate?
shape_with_kx = (nkx, nky, ntime)
mode_frequency = np.ones(shape_with_kx) * mode_frequency
growth_rate = np.ones(shape_with_kx) * growth_rate
result = {
"growth_rate": growth_rate,
"mode_frequency": mode_frequency,
}
return result
@staticmethod
def _get_eigenfunctions(raw_data: Dict[str, Any], coords: Dict) -> np.ndarray:
"""
Loads eigenfunctions into data with the following coordinates:
data['eigenfunctions'] = eigenfunctions(kx, ky, field, theta, time)
This should be called after _set_fields, and is only valid for linear runs.
"""
raw_eig_data = [
raw_data.get(f"eigenfunctions_{f}", None) for f in coords["field"]
]
ntime = len(coords["time"])
ntheta = len(coords["theta"])
nkx = len(coords["kx"])
nky = len(coords["ky"])
raw_shape = [2, ntheta, nkx, nky, ntime]
# FIXME Currently using kx and ky for compatibility with GS2 results, but
# these coordinates are not used. Should we remove these coordinates?
coord_names = ["field", "theta", "kx", "ky", "time"]
eigenfunctions = np.empty(
[len(coords[coord_name]) for coord_name in coord_names], dtype=complex
)
# Loop through all fields and add eigenfunction if it exists
for ifield, raw_eigenfunction in enumerate(raw_eig_data):
if raw_eigenfunction is not None:
eigenfunction = raw_eigenfunction[: np.prod(raw_shape)].reshape(
raw_shape, order="F"
)
eigenfunctions[ifield, ...] = eigenfunction[0] + 1j * eigenfunction[1]
theta_star = np.argmax(abs(eigenfunctions[0, :, 0, 0, -1]), axis=0)
phi_theta_star = eigenfunctions[0, theta_star, 0, 0, -1]
phase = np.abs(phi_theta_star) / phi_theta_star
field_squared = np.sum(np.abs(eigenfunctions) ** 2, 0)
amplitude = np.sqrt(
trapezoid(field_squared, coords["theta"], axis=0) / (2 * np.pi)
)
result = eigenfunctions * phase / amplitude
return result
[docs]
@staticmethod
def load_momentum_components(pyro):
"""
Load additional momentum-flux outputs from `bin.cgyro.ky_flux_mom`. This
file can be generated by setting `MOMENTUM_PRINT_FLAG = 1` in the CGYRO
input file, support for which is available from CGYRO commit
534ebc786c83fa1f1ed8dc1ad15193c20d421f72
The returned data has shape
`(field, momentum_component, species, ky, time)`
where `momentum_component` contains:
- `"parallel"`
Contribution from the parallel-flow and centrifugal terms.
- `"perpendicular"`
Contribution from perpendicular flows.
- `"electromagnetic"`
Contribution from electromagnetic-field terms in the momentum-transport
equation, related to the Maxwell stress.
Notes
-----
The sum of the `"parallel"` and `"perpendicular"` components reproduce
the standard momentum-flux outputs natively supported by Pyrokinetics.
"""
component_names = ["parallel", "perpendicular", "electromagnetic"]
# Check that the correct state exists
if pyro.gk_code != "CGYRO":
raise NotImplementedError(
"load_momentum_components() can only be called for CGYRO outputs."
)
if pyro.gk_output is None:
raise RuntimeError(
"pyro.load_gk_output() must be called before load_momentum_components()."
)
if "momentum" not in pyro.gk_output:
raise RuntimeError(
"Standard momentum-flux data must be in gk_output before calling "
"load_momentum_components()."
)
# Get output-file location
if pyro.gk_output_file is not None:
dirname = Path(pyro.gk_output_file)
else:
dirname = Path(pyro.gk_file).parent
filename = dirname / "bin.cgyro.ky_flux_mom"
if not filename.exists():
raise FileNotFoundError(f"Unable to locate {filename}")
# Setup data output shape
coords = pyro.gk_output._raw_coords
nspecies = len(coords["species"])
nfield = len(coords["field"])
nky = len(coords["full_ky"])
ntime = len(coords["full_time"])
ncomponent = len(component_names)
shape = (nspecies, ncomponent, nfield, nky, ntime)
# Load raw data
raw_data = np.asarray(np.memmap(filename, dtype=np.float32, mode="r"))
mflux = raw_data.reshape(shape, order="F")
mflux = GKOutputReaderCGYRO._flux_downsample(
mflux, **pyro.gk_output._downsample
)
mflux = np.swapaxes(mflux, 0, 2)
# Bootstrap normalisations from standard momentum-flux data
reference = pyro.gk_output["momentum"]
raw_sum = mflux[:, :2, ...].sum(axis=1)
scale = reference.pint.dequantify().data / raw_sum
mflux = mflux * scale[:, None, ...]
# Add data to gk_output object
component_dim = "component"
pyro.gk_output.data = pyro.gk_output.data.assign_coords(
{component_dim: (component_dim, component_names)}
)
pyro.gk_output.add_data(
name="momentum_components",
data=mflux,
coords=("field", component_dim, "species", "ky", "time"),
units=reference.pint.units,
output_convention=pyro.gk_output.norm.default_convention.name,
)