Source code for pyrokinetics.gk_code.cgyro

import logging
from ast import literal_eval
from pathlib import Path
from typing import Any, Dict, Optional, Tuple

import numpy as np
from cleverdict import CleverDict
from scipy.integrate import trapezoid

from ..constants import pi
from ..file_utils import FileReader
from ..local_geometry import (
    LocalGeometry,
    LocalGeometryFourierCGYRO,
    LocalGeometryMiller,
    LocalGeometryMXH,
    MetricTerms,
    default_fourier_cgyro_inputs,
    default_miller_inputs,
    default_mxh_inputs,
)
from ..local_species import LocalSpecies
from ..normalisation import SimulationNormalisation as Normalisation
from ..normalisation import convert_dict
from ..numerics import Numerics
from ..templates import gk_templates
from ..typing import PathLike
from .gk_input import GKInput
from .gk_output import (
    Coords,
    Eigenfunctions,
    Eigenvalues,
    Fields,
    Fluxes,
    GKOutput,
    Moments,
)


[docs] class GKInputCGYRO(GKInput, FileReader, file_type="CGYRO", reads=GKInput): """ Class that can read CGYRO input files, and produce Numerics, LocalSpecies, and LocalGeometry objects """ code_name = "CGYRO" default_file_name = "input.cgyro" norm_convention = "cgyro" _convention_dict = {} _legacy_cgyro = True pyro_cgyro_miller = { "rho": "RMIN", "Rmaj": "RMAJ", "q": "Q", "kappa": "KAPPA", "s_kappa": "S_KAPPA", "delta": "DELTA", "s_delta": "S_DELTA", "shat": "S", "shift": "SHIFT", "Z0": "ZMAG", "dZ0dr": "DZMAG", "ip_ccw": "IPCCW", "bt_ccw": "BTCCW", } pyro_cgyro_mxh = { **pyro_cgyro_miller, "zeta": "ZETA", "s_zeta": "S_ZETA", "cn0": "SHAPE_COS0", "cn1": "SHAPE_COS1", "cn2": "SHAPE_COS2", "cn3": "SHAPE_COS3", "cn4": "SHAPE_COS4", "cn5": "SHAPE_COS5", "cn6": "SHAPE_COS6", "sn3": "SHAPE_SIN3", "sn4": "SHAPE_SIN4", "sn5": "SHAPE_SIN5", "sn6": "SHAPE_SIN6", "dcndr0": "SHAPE_S_COS0", "dcndr1": "SHAPE_S_COS1", "dcndr2": "SHAPE_S_COS2", "dcndr3": "SHAPE_S_COS3", "dcndr4": "SHAPE_S_COS4", "dcndr5": "SHAPE_S_COS5", "dcndr6": "SHAPE_S_COS6", "dsndr3": "SHAPE_S_SIN3", "dsndr4": "SHAPE_S_SIN4", "dsndr5": "SHAPE_S_SIN5", "dsndr6": "SHAPE_S_SIN6", } pyro_cgyro_miller_defaults = { "rho": 0.5, "Rmaj": 3.0, "q": 2.0, "kappa": 1.0, "s_kappa": 0.0, "delta": 0.0, "s_delta": 0.0, "shat": 1.0, "shift": 0.0, "Z0": 0.0, "dZ0dr": 0.0, "ip_ccw": -1.0, "bt_ccw": -1.0, } pyro_cgyro_mxh_defaults = { **pyro_cgyro_miller_defaults, "zeta": 0.0, "s_zeta": 0.0, "cn0": 0.0, "cn1": 0.0, "cn2": 0.0, "cn3": 0.0, "cn4": 0.0, "cn5": 0.0, "cn6": 0.0, "sn3": 0.0, "sn4": 0.0, "sn5": 0.0, "sn6": 0.0, "dcndr0": 0.0, "dcndr1": 0.0, "dcndr2": 0.0, "dcndr3": 0.0, "dcndr4": 0.0, "dcndr5": 0.0, "dcndr6": 0.0, "dsndr3": 0.0, "dsndr4": 0.0, "dsndr5": 0.0, "dsndr6": 0.0, } pyro_cgyro_fourier = pyro_cgyro_miller pyro_cgyro_fourier_defaults = pyro_cgyro_miller_defaults
[docs] @staticmethod def get_pyro_cgyro_species(iSp=1): return { "mass": f"MASS_{iSp}", "z": f"Z_{iSp}", "dens": f"DENS_{iSp}", "temp": f"TEMP_{iSp}", "inverse_lt": f"DLNTDR_{iSp}", "inverse_ln": f"DLNNDR_{iSp}", }
cgyro_eq_types = { 1: "SAlpha", 2: "MXH", 3: "Fourier", }
[docs] def set_legacy_cgyro(self, flag: bool): """ Set dictionay flags used to access namelist to legacy values or not """ self._legacy_cgyro = flag
[docs] def read_from_file( self, filename: PathLike, detect_norm: bool = True ) -> Dict[str, Any]: """ Reads CGYRO input file into a dictionary """ with open(filename) as f: data_dict = self.parse_cgyro(f) return super().read_dict(data_dict, detect_norm=detect_norm)
[docs] def read_str(self, input_string: str, detect_norm: bool = True) -> Dict[str, Any]: """ Reads CGYRO input file given as string """ data_dict = self.parse_cgyro(input_string.split("\n")) return super().read_dict(data_dict, detect_norm=detect_norm)
[docs] def read_dict(self, input_dict: dict, detect_norm: bool = True) -> Dict[str, Any]: """ Reads GENE input file given as dict Uses default read_dict, which assumes input is a dict """ return super().read_dict(input_dict, detect_norm=detect_norm)
[docs] @staticmethod def parse_cgyro(lines): """ Given lines of a cgyro file or a string split by '/n', return a dict of CGYRO input data """ results = {} for line in lines: # Get line before comments, remove trailing whitespace line = line.split("#")[0].strip() # Skip empty lines (this will also skip comment lines) if not line: continue # Splits by =, remove whitespace, store as (key,value) pair key, value = (token.strip() for token in line.split("=")) # Use literal_eval to convert value to int/float/list etc # If it fails, assume value should be a string try: results[key] = literal_eval(value) except Exception: results[key] = value return results
[docs] def verify_file_type(self, filename: PathLike): """ Ensure this file is a valid cgyro input file, and that it contains sufficient info for Pyrokinetics to work with """ # The following keys are not strictly needed for a CGYRO input file, # but they are needed by Pyrokinetics expected_keys = [ "BETAE_UNIT", "N_SPECIES", "NU_EE", "N_FIELD", "N_RADIAL", "RMIN", "RMAJ", "Q", "S", ] self.verify_expected_keys(filename, expected_keys)
[docs] def write( self, filename: PathLike, float_format: str = "", local_norm: Normalisation = None, code_normalisation: str = None, ): # Create directories if they don't exist already filename = Path(filename) filename.parent.mkdir(parents=True, exist_ok=True) if local_norm is None: local_norm = Normalisation("write") if code_normalisation is None: code_normalisation = self.code_name.lower() convention = getattr(local_norm, code_normalisation) self.data = convert_dict(self.data, convention) with open(filename, "w") as f: for key, value in self.data.items(): if isinstance(value, float): line = f"{key} = {value:{float_format}}\n" else: line = f"{key} = {value}\n" f.write(line)
[docs] def is_nonlinear(self) -> bool: return bool(self.data.get("NONLINEAR_FLAG", 0))
[docs] def add_flags(self, flags) -> None: """ Add extra flags to CGYRO input file """ for key, value in flags.items(): self.data[key] = value
[docs] def get_local_geometry(self) -> LocalGeometry: """ Returns local geometry. Delegates to more specific functions """ if hasattr(self, "convention"): convention = self.convention else: norms = Normalisation("get_local_geometry") convention = getattr(norms, self.norm_convention) eq_type = self.cgyro_eq_types[self.data["EQUILIBRIUM_MODEL"]] is_basic_miller = self._check_basic_miller() if eq_type == "MXH" and is_basic_miller: eq_type = "Miller" if eq_type == "Miller": local_geometry = self.get_local_geometry_miller() elif eq_type == "MXH": local_geometry = self.get_local_geometry_mxh() elif eq_type == "Fourier": local_geometry = self.get_local_geometry_fourier() else: raise NotImplementedError( f"LocalGeometry type {eq_type} not implemented for CGYRO" ) local_geometry.B0 = 1.0 / local_geometry.bunit_over_b0 local_geometry.dpsidr *= local_geometry.B0 local_geometry.normalise(norms=convention) local_geometry.Fpsi = local_geometry.get_f_psi() local_geometry.FF_prime = local_geometry.get_f_prime() * local_geometry.Fpsi return local_geometry
[docs] def get_local_geometry_miller(self) -> LocalGeometryMiller: """ Load Miller object from CGYRO file """ miller_data = default_miller_inputs() for (key, val), val_default in zip( self.pyro_cgyro_miller.items(), self.pyro_cgyro_miller_defaults.values(), ): miller_data[key] = self.data.get(val, val_default) miller_data["s_delta"] *= 1.0 / np.sqrt(1 - miller_data["delta"] ** 2) # Assume ne * Te*8pi*1e-7 = 1.0 ( ne, Te, ) = self.get_ne_te_normalisation() beta = self.data.get("BETAE_UNIT", 0.0) / (ne * Te) # Need species to set up beta_prime local_species = self.get_local_species() beta_prime_scale = self.data.get("BETA_STAR_SCALE", 1.0) miller_data["beta_prime"] = ( -local_species.inverse_lp.m * local_species.pressure.m * beta_prime_scale * beta ) miller = LocalGeometryMiller.from_gk_data(miller_data) return miller
[docs] def get_local_geometry_mxh(self) -> LocalGeometryMXH: """ Load MXH object from CGYRO file """ mxh_data = default_mxh_inputs(n_moments=7) for (key, val), default in zip( self.pyro_cgyro_mxh.items(), self.pyro_cgyro_mxh_defaults.values() ): if "SHAPE" not in val: mxh_data[key] = self.data.get(val, default) else: index = int(key[-1]) new_key = key[:-1] if "SHAPE_S" in val: mxh_data[new_key][index] = ( self.data.get(val, default) / mxh_data["rho"] ) else: mxh_data[new_key][index] = self.data.get(val, default) mxh_keys = ["cn", "sn", "dcndr", "dsndr"] for i_moment in range(6, 2, -1): if np.all( [True if mxh_data[key][i_moment] == 0.0 else False for key in mxh_keys] ): for key in mxh_keys: mxh_data[key] = mxh_data[key][:-1] else: break # Force dsndr[0] = 0 as is definition mxh_data["dsndr"][0] = 0.0 mxh_data["n_moments"] = len(mxh_data["cn"]) # Assume ne * Te *8pi*1e-7 = 1.0 ( ne, Te, ) = self.get_ne_te_normalisation() beta = self.data.get("BETAE_UNIT", 0.0) / (ne * Te) # Need species to set up beta_prime local_species = self.get_local_species() beta_prime_scale = self.data.get("BETA_STAR_SCALE", 1.0) mxh_data["beta_prime"] = ( -local_species.inverse_lp.m * local_species.pressure.m * beta_prime_scale * beta ) mxh = LocalGeometryMXH.from_gk_data(mxh_data) mxh.dthetaR_dr = mxh.get_dthetaR_dr(mxh.theta, mxh.dcndr, mxh.dsndr) return mxh
[docs] def get_local_geometry_fourier(self) -> LocalGeometryFourierCGYRO: """ Load Fourier object from CGYRO file """ fourier_data = default_fourier_cgyro_inputs() for (key, val), val_default in zip( self.pyro_cgyro_fourier.items(), self.pyro_cgyro_fourier_defaults.values() ): fourier_data[key] = self.data.get(val, val_default) # Assume pref*8pi*1e-7 = 1.0 ( ne, Te, ) = self.get_ne_te_normalisation() beta = self.data.get("BETAE_UNIT", 0.0) / (ne * Te) # Need species to set up beta_prime local_species = self.get_local_species() beta_prime_scale = self.data.get("BETA_STAR_SCALE", 1.0) fourier_data["beta_prime"] = ( -local_species.inverse_lp.m * local_species.pressure.m * beta_prime_scale * beta ) fourier = LocalGeometryFourierCGYRO.from_gk_data(fourier_data) return fourier
[docs] def get_local_species(self): """ Load LocalSpecies object from CGYRO file """ # Dictionary of local species parameters local_species = LocalSpecies() ion_count = 0 if hasattr(self, "convention"): convention = self.convention else: norms = Normalisation("get_local_species") convention = getattr(norms, self.norm_convention) # Load each species into a dictionary for i_sp in range(self.data["N_SPECIES"]): pyro_cgyro_species = self.get_pyro_cgyro_species(i_sp + 1) species_data = CleverDict() for p_key, c_key in pyro_cgyro_species.items(): species_data[p_key] = self.data[c_key] species_data.omega0 = self.data.get("MACH", 0.0) / self.data["RMAJ"] species_data.domega_drho = ( -self.data.get("GAMMA_P", 0.0) / self.data["RMAJ"] ) if species_data.z == -1: name = "electron" species_data.nu = ( self.data.get("NU_EE", 0.1) * convention.vref / convention.lref ) else: ion_count += 1 name = f"ion{ion_count}" species_data.name = name # normalisations species_data.dens *= convention.nref species_data.mass *= convention.mref species_data.temp *= convention.tref species_data.z *= convention.qref species_data.inverse_lt *= convention.lref**-1 species_data.inverse_ln *= convention.lref**-1 species_data.omega0 *= convention.vref / convention.lref species_data.domega_drho *= convention.vref / convention.lref**2 # Add individual species data to dictionary of species local_species.add_species(name=name, species_data=species_data) # Handle adiabatic species if there if self.data.get("AE_FLAG", 0) == 1: nu_ee = self.data.get("NU_EE", 0.1) * convention.vref / convention.lref te = self.data.get("TEMP_AE", 1.0) * convention.tref ne = self.data.get("DENS_AE", 1.0) * convention.nref me = self.data.get("MASS_AE", 2.724486e-4) * convention.mref else: nu_ee = local_species.electron.nu te = local_species.electron.temp ne = local_species.electron.dens me = local_species.electron.mass # Get collision frequency of ion species for ion in range(ion_count): key = f"ion{ion + 1}" nion = local_species[key]["dens"] tion = local_species[key]["temp"] mion = local_species[key]["mass"] zion = local_species[key]["z"] # Not exact at log(Lambda) does change but pretty close... local_species[key]["nu"] = ( nu_ee * (zion**4 * nion / tion**1.5 / mion**0.5) / (ne / te**1.5 / me**0.5) ).m * nu_ee.units # Normalise to pyrokinetics normalisations and calculate total pressure gradient local_species.normalise(convention) if self._legacy_cgyro: if self.data.get("Z_EFF_METHOD", 2) == 2: local_species.set_zeff() else: local_species.zeff = self.data.get("Z_EFF", 1.0) * convention.qref else: local_species.set_zeff() return local_species
[docs] def get_numerics(self) -> Numerics: """Gather numerical info (grid spacing, time steps, etc)""" if hasattr(self, "convention"): convention = self.convention else: norms = Normalisation("get_numerics") convention = getattr(norms, self.norm_convention) numerics_data = {} nfields = self.data.get("N_FIELD", 1) numerics_data["phi"] = nfields >= 1 numerics_data["apar"] = nfields >= 2 numerics_data["bpar"] = nfields >= 3 numerics_data["delta_time"] = self.data.get("DELTA_T", 0.01) numerics_data["max_time"] = self.data.get("MAX_TIME", 1.0) numerics_data["ky"] = ( self.data["KY"] / self.get_local_geometry().bunit_over_b0.m ) numerics_data["nky"] = self.data.get("N_TOROIDAL", 1) numerics_data["theta0"] = 2 * pi * self.data.get("PX0", 0.0) numerics_data["nkx"] = self.data.get("N_RADIAL", 1) if not self.is_nonlinear(): numerics_data["nperiod"] = int(self.data["N_RADIAL"] / 2) else: numerics_data["nperiod"] = 1 shat = self.data[self.pyro_cgyro_miller["shat"]] box_size = self.data.get("BOX_SIZE", 1) if numerics_data["nky"] == 1: numerics_data["kx"] = numerics_data["ky"] * shat * numerics_data["theta0"] else: numerics_data["kx"] = numerics_data["ky"] * 2 * pi * shat / box_size numerics_data["ntheta"] = self.data.get("N_THETA", 24) numerics_data["nenergy"] = self.data.get("N_ENERGY", 8) numerics_data["npitch"] = self.data.get("N_XI", 16) numerics_data["nonlinear"] = self.is_nonlinear() ( ne, Te, ) = self.get_ne_te_normalisation() numerics_data["beta"] = self.data.get("BETAE_UNIT", 0.0) / (ne * Te) numerics_data["gamma_exb"] = self.data.get("GAMMA_E", 0.0) return Numerics(**numerics_data).with_units(convention)
[docs] def get_reference_values(self, local_norm: Normalisation) -> Dict[str, Any]: """ Reads in reference values from input file """ return {}
def _detect_normalisation(self): """ Determines the necessary inputs and passes information to the base method _set_up_normalisation. The following values are needed default_references: dict Dictionary containing default reference values for the gk_code: str GK code electron_density: float Electron density from GK input electron_temperature: float Electron density from GK input e_mass: float Electron mass from GK input electron_index: int Index of electron in list of data found_electron: bool Flag on whether electron was found densities: ArrayLike List of species densities temperatures: ArrayLike List of species temperature reference_density_index: ArrayLike List of indices where the species has a density of 1.0 reference_temperature_index: ArrayLike List of indices where the species has a temperature of 1.0 major_radius: float Normalised major radius from GK input rgeo_rmaj: float Ratio of Geometric and flux surface major radius minor_radius: float Normalised minor radius from GK input """ default_references = { "nref_species": "electron", "tref_species": "electron", "mref_species": "deuterium", "bref": "Bunit", "lref": "minor_radius", "ne": 1.0, "te": 1.0, "rgeo_rmaj": 1.0, "vref": "nrl", "rhoref": "unit", "raxis_rmaj": None, } reference_density_index = [] reference_temperature_index = [] densities = [] temperatures = [] masses = [] found_electron = False e_mass = None electron_temperature = None electron_density = None electron_index = None if self.data.get("AE_FLAG", 0) == 1: dens = self.data["DENS_AE"] temp = self.data["TEMP_AE"] mass = self.data["MASS_AE"] electron_density = dens electron_temperature = temp e_mass = mass electron_index = 0 found_electron = True if np.isclose(dens, 1.0): reference_density_index.append(0) if np.isclose(temp, 1.0): reference_temperature_index.append(0) densities.append(dens) temperatures.append(temp) masses.append(mass) else: for i_sp in range(self.data["N_SPECIES"]): dens = self.data[f"DENS_{i_sp + 1}"] temp = self.data[f"TEMP_{i_sp + 1}"] mass = self.data[f"MASS_{i_sp + 1}"] if self.data[f"Z_{i_sp + 1}"] == -1: electron_density = dens electron_temperature = temp e_mass = mass electron_index = i_sp found_electron = True if np.isclose(dens, 1.0): reference_density_index.append(i_sp) if np.isclose(temp, 1.0): reference_temperature_index.append(i_sp) densities.append(dens) temperatures.append(temp) masses.append(mass) major_radius = self.data["RMAJ"] minor_radius = 1.0 super()._set_up_normalisation( default_references=default_references, gk_code=self.code_name.lower(), electron_density=electron_density, electron_temperature=electron_temperature, e_mass=e_mass, electron_index=electron_index, found_electron=found_electron, densities=densities, temperatures=temperatures, reference_density_index=reference_density_index, reference_temperature_index=reference_temperature_index, major_radius=major_radius, rgeo_rmaj=1.0, minor_radius=minor_radius, )
[docs] def set( self, local_geometry: LocalGeometry, local_species: LocalSpecies, numerics: Numerics, local_norm: Optional[Normalisation] = None, template_file: Optional[PathLike] = None, code_normalisation: Optional[str] = None, **kwargs, ): """ Set self.data using LocalGeometry, LocalSpecies, and Numerics. These may be obtained via another GKInput file, or from Equilibrium/Kinetics objects. """ # If self.data is not already populated, fill in defaults from a given # template file. If this is not provided by the user, fall back to the # default. if self.data is None: if template_file is None: template_file = gk_templates["CGYRO"] self.read_from_file(template_file) if local_norm is None: local_norm = Normalisation("set") if code_normalisation is None: code_normalisation = self.norm_convention convention = getattr(local_norm, code_normalisation) # Geometry data if isinstance(local_geometry, LocalGeometryMXH): eq_model = 2 eq_type = "MXH" elif isinstance(local_geometry, LocalGeometryMiller): eq_model = 2 eq_type = "Miller" elif isinstance(local_geometry, LocalGeometryFourierCGYRO): eq_model = 3 eq_type = "Fourier" raise NotImplementedError( f"LocalGeometry type {local_geometry.__class__.__name__} not " "implemented yet for CGYRO" ) else: raise NotImplementedError( f"LocalGeometry type {local_geometry.__class__.__name__} not " "implemented for CGYRO" ) # Set equilibrium type in input file self.data["EQUILIBRIUM_MODEL"] = eq_model if eq_type == "Miller": # Assign Miller values to input file for key, val in self.pyro_cgyro_miller.items(): self.data[val] = local_geometry[key] self.data["S_DELTA"] = local_geometry.s_delta * np.sqrt( 1 - local_geometry.delta**2 ) # Need to remove any MXH keys for mxh_key in self.pyro_cgyro_mxh.keys(): if ( mxh_key not in self.pyro_cgyro_miller.keys() and mxh_key.upper() in self.data.keys() ): self.data.pop(mxh_key.upper()) elif eq_type == "Fourier": # Assign Fourier values to input file for key, val in self.pyro_cgyro_fourier.items(): self.data[val] = local_geometry[key] elif eq_type == "MXH": # Assign MXH values to input file for key, val in self.pyro_cgyro_mxh.items(): if "SHAPE" not in val: self.data[val] = getattr(local_geometry, key) else: index = int(key[-1]) new_key = key[:-1] # Skip in index beyond n_moments if index >= local_geometry.n_moments: continue if "SHAPE_S" in val: self.data[val] = ( getattr(local_geometry, new_key)[index] * local_geometry.rho ) else: self.data[val] = getattr(local_geometry, new_key)[index] # Kinetic data n_species = local_species.nspec self.data["N_SPECIES"] = n_species stored_species = len([key for key in self.data.keys() if "DENS_" in key]) extra_species = stored_species - n_species if extra_species > 0: for i_sp in range(extra_species): pyro_cgyro_species = self.get_pyro_cgyro_species(i_sp + 1 + n_species) for cgyro_key in pyro_cgyro_species.values(): if cgyro_key in self.data: self.data.pop(cgyro_key) for i_sp, name in enumerate(local_species.names): pyro_cgyro_species = self.get_pyro_cgyro_species(i_sp + 1) for pyro_key, cgyro_key in pyro_cgyro_species.items(): self.data[cgyro_key] = local_species[name][pyro_key] if not self._legacy_cgyro: if "Z_EFF" in self.data: self.data.pop("Z_EFF") if "Z_EFF_METHOD" in self.data: self.data.pop("Z_EFF_METHOD") else: self.data["Z_EFF_METHOD"] = 1 self.data["Z_EFF"] = local_species.zeff if "electron" in local_species.names: first_species = "electron" self.data["NU_EE"] = local_species.electron.nu else: first_species = local_species.names[0] zion = local_species[first_species].z nion = local_species[first_species].dens tion = local_species[first_species].temp mion = local_species[first_species].mass te = self.data.get("TEMP_AE", 1.0) * convention.tref ne = self.data.get("DENS_AE", 1.0) * convention.nref me = self.data.get("MASS_AE", 2.724486e-4) * convention.mref self.data["NU_EE"] = ( local_species[first_species].nu / (zion**4 * nion / tion**1.5 / mion**0.5) * (ne / te**1.5 / me**0.5) ) # Set adiabatic flags self.data["AE_FLAG"] = 1 self.data["DENS_AE"] = ne self.data["TEMP_AE"] = te self.data["MASS_AE"] = me self.data["MACH"] = local_species[first_species].omega0 * self.data["RMAJ"] self.data["GAMMA_P"] = ( -local_species[first_species].domega_drho * self.data["RMAJ"] ) beta_ref = convention.beta if local_norm else 0.0 beta = numerics.beta if numerics.beta is not None else beta_ref # Calculate beta_prime_scale if beta != 0.0: beta_prime_scale = -local_geometry.beta_prime.to(convention) / ( local_species.inverse_lp.to(convention) * local_species.pressure.to(convention).m * beta.to(convention) ) else: beta_prime_scale = 1.0 # CGYRO beta is ALWAYS ne*Te/Bunit^2 regardless of existing nref and Tref original_convention = getattr(local_norm, self.norm_convention) ne = (1 * original_convention.nref).to(local_norm.cgyro) Te = (1 * original_convention.tref).to(local_norm.cgyro) self.data["BETAE_UNIT"] = beta * ne * Te self.data["BETA_STAR_SCALE"] = beta_prime_scale # Numerics if numerics.bpar and not numerics.apar: raise ValueError("Can't have bpar without apar in CGYRO") self.data["N_FIELD"] = 1 + int(numerics.bpar) + int(numerics.apar) # Set time stepping self.data["DELTA_T"] = numerics.delta_time self.data["MAX_TIME"] = numerics.max_time if numerics.nonlinear: self.data["NONLINEAR_FLAG"] = 1 self.data["N_RADIAL"] = numerics.nkx if numerics.kx == 0.0: self.data["BOX_SIZE"] = 1 else: self.data["BOX_SIZE"] = int( (numerics.ky * 2 * pi * local_geometry.shat / numerics.kx) + 0.1 ) else: self.data["NONLINEAR_FLAG"] = 0 self.data["N_RADIAL"] = numerics.nperiod * 2 self.data["BOX_SIZE"] = 1 self.data["KY"] = numerics.ky * local_geometry.bunit_over_b0.m self.data["N_TOROIDAL"] = numerics.nky self.data["N_THETA"] = numerics.ntheta self.data["THETA_PLOT"] = numerics.ntheta self.data["PX0"] = numerics.theta0 / (2 * pi) self.data["GAMMA_E"] = numerics.gamma_exb self.data["N_ENERGY"] = numerics.nenergy self.data["N_XI"] = numerics.npitch if not local_norm: return self.data = convert_dict(self.data, convention)
def _check_basic_miller(self): """ Checks if CGYRO input file is a basic Miller geometry by seeing if moments that are higher than triangularity are 0 Returns ------- is_basic_miller: Boolean True if Miller, False is MXH """ mxh_only_parameters = [ "ZETA", "S_ZETA", "SHAPE_COS0", "SHAPE_COS1", "SHAPE_COS2", "SHAPE_COS3", "SHAPE_SIN3", "SHAPE_S_COS0", "SHAPE_S_COS1", "SHAPE_S_COS2", "SHAPE_S_COS3", "SHAPE_S_SIN3", ] is_basic_miller = True for param in mxh_only_parameters: if self.data.get(param, 0.0) != 0: is_basic_miller = False break return is_basic_miller
[docs] def get_ne_te_normalisation(self): found_electron = False if self.data.get("AE_FLAG", 0) == 1: ne = self.data["DENS_AE"] Te = self.data["TEMP_AE"] found_electron = True else: for i_sp in range(self.data["N_SPECIES"]): if self.data[f"Z_{i_sp + 1}"] == -1: ne = self.data[f"DENS_{i_sp + 1}"] Te = self.data[f"TEMP_{i_sp + 1}"] found_electron = True break if not found_electron: raise TypeError( "Pyro currently requires an electron species in the input file" ) return ne, Te
[docs] class CGYROFile:
[docs] def __init__(self, path: PathLike, required: bool): self.path = Path(path) self.required = required self.fmt = self.path.name.split(".")[0]
[docs] class GKOutputReaderCGYRO(FileReader, file_type="CGYRO", reads=GKOutput): fields = ["phi", "apar", "bpar"] moments = ["n", "e", "v"]
[docs] def read_from_file( self, filename: PathLike, norm: Normalisation, output_convention: str = "pyrokinetics", load_fields=True, load_fluxes=True, load_moments=False, downsample: Dict[str, Any] = {}, ) -> GKOutput: raw_data, gk_input, input_str = self._get_raw_data( filename, load_fields, load_moments ) # Assign units and return GKOutput convention = getattr(norm, gk_input.norm_convention) gk_input.convention = convention norm.default_convention = output_convention.lower() fmt_downsample = {} for key, value in downsample.items(): if isinstance(value, int): fmt_downsample[key + "_idx"] = slice(value, value + 1) else: fmt_downsample[key + "_idx"] = value downsample = fmt_downsample bunit_over_b0 = (1 * norm.cgyro.bref).to(norm.gs2).m coords = self._get_coords(raw_data, gk_input, bunit_over_b0, downsample) fields = ( self._get_fields(raw_data, gk_input, coords, downsample) if load_fields else None ) fluxes = ( self._get_fluxes(raw_data, gk_input, coords, downsample) if load_fluxes else None ) moments = ( self._get_moments(raw_data, gk_input, coords, downsample) if load_moments else None ) if coords["linear"] and ( coords["ntheta_plot"] != coords["ntheta_grid"] or not fields ): eigenvalues = self._get_eigenvalues(raw_data, coords, gk_input) eigenfunctions = self._get_eigenfunctions(raw_data, coords) else: # Rely on gk_output to generate eigenvalues eigenvalues = None eigenfunctions = None normalise_flux_moment = False # Linear CGYRO outputs are normalised by <\phi(t)^2> and do not include factor # k_y \rho_s so these must be undone and added now if coords["linear"] and fields: phi2 = np.abs(fields["phi"]) ** 2 w_theta = coords["w_theta"][:, None, None, None] phi2_int = np.sum(phi2 * w_theta, axis=0) * gk_input.data["KY"] * 2 # Sum over kx if nky > 1 if len(coords["ky"]) > 1: phi2_int = np.sum(phi2_int, axis=0) if fluxes: fluxes = {k: v * phi2_int for k, v in fluxes.items()} if moments: moments = {k: v * phi2_int for k, v in moments.items()} normalise_flux_moment = True field_dims = ("theta", "kx", "ky", "time") flux_dims = ("field", "species", "ky", "time") moment_dims = ("theta", "kx", "species", "ky", "time") gk_output = GKOutput( coords=Coords( time=coords["time"], kx=coords["kx"], ky=coords["ky"], theta=coords["theta"], pitch=coords["pitch"], energy=coords["energy"], species=coords["species"], field=coords["field"], ).with_units(convention), norm=norm, fields=( Fields(**fields, dims=field_dims).with_units(convention) if fields else None ), fluxes=( Fluxes(**fluxes, dims=flux_dims).with_units(convention) if fluxes else None ), moments=( Moments(**moments, dims=moment_dims).with_units(convention) if moments else None ), eigenvalues=( Eigenvalues(**eigenvalues).with_units(convention) if eigenvalues else None ), eigenfunctions=( None if eigenfunctions is None else Eigenfunctions(eigenfunctions).with_units(convention) ), linear=coords["linear"], gk_code="CGYRO", input_file=input_str, normalise_flux_moment=normalise_flux_moment, output_convention=output_convention, jacobian=coords["jacobian"], ) # Storing state of class variables for @staticmethod functions gk_output._downsample = downsample gk_output._raw_coords = coords return gk_output
[docs] def verify_file_type(self, dirname: PathLike): dirname = Path(dirname) for f in self._required_files(dirname).values(): if not f.path.exists(): raise RuntimeError(f"Missing the file '{f.path}'")
[docs] @staticmethod def infer_path_from_input_file(filename: PathLike) -> Path: """ Given path to input file, guess at the path for associated output files. For CGYRO, simply returns dir of the path. """ return Path(filename).parent
@staticmethod def _required_files(dirname: PathLike): dirname = Path(dirname) return { "input": CGYROFile(dirname / "input.cgyro", required=True), "time": CGYROFile(dirname / "out.cgyro.time", required=True), "grids": CGYROFile(dirname / "out.cgyro.grids", required=True), "equilibrium": CGYROFile(dirname / "out.cgyro.equilibrium", required=True), "geo": CGYROFile(dirname / "bin.cgyro.geo", required=True), } @classmethod def _get_raw_data( cls, dirname: PathLike, load_fields, load_moments ) -> Tuple[Dict[str, Any], GKInputCGYRO, str]: dirname = Path(dirname) if not dirname.exists(): raise RuntimeError( f"GKOutputReaderCGYRO: Provided path {dirname} does not exist. " "Please supply the name of a directory containing CGYRO output files." ) if not dirname.is_dir(): raise RuntimeError( f"GKOutputReaderCGYRO: Provided path {dirname} is not a directory. " "Please supply the name of a directory containing CGYRO output files." ) # The following list of CGYRO files may exist expected_files = { **cls._required_files(dirname), "flux": CGYROFile(dirname / "bin.cgyro.ky_flux", required=False), "cflux": CGYROFile(dirname / "bin.cgyro.ky_cflux", required=False), "eigenvalues_bin": CGYROFile(dirname / "bin.cgyro.freq", required=False), "eigenvalues_out": CGYROFile(dirname / "out.cgyro.freq", required=False), **{ f"field_{f}": CGYROFile(dirname / f"bin.cgyro.kxky_{f}", required=False) for f in cls.fields if load_fields }, **{ f"moment_{m}": CGYROFile( dirname / f"bin.cgyro.kxky_{m}", required=False ) for m in cls.moments if load_moments }, **{ f"eigenfunctions_{f}": CGYROFile( dirname / f"bin.cgyro.{f}b", required=False ) for f in cls.fields }, } # Read in files raw_data = {} for key, cgyro_file in expected_files.items(): if not cgyro_file.path.exists(): if cgyro_file.required: raise RuntimeError( f"GKOutputReaderCGYRO: The file {cgyro_file.path.name} is needed" ) continue # Read in file according to format if cgyro_file.fmt == "input": with open(cgyro_file.path, "r") as f: raw_data[key] = f.read() if cgyro_file.fmt == "out": raw_data[key] = np.loadtxt(cgyro_file.path) if cgyro_file.fmt == "bin": # Promote to 64 bit float here, as with older NumPy versions this # can lock us into low precision computation throughout # raw_data[key] = np.asarray( # np.fromfile(cgyro_file.path, dtype=np.float32), dtype=float # ) raw_data[key] = np.asarray( np.memmap(cgyro_file.path, dtype=np.float32, mode="r") ) input_str = raw_data["input"] gk_input = GKInputCGYRO() gk_input.read_str(input_str) return raw_data, gk_input, input_str @staticmethod def _get_coords( raw_data: Dict[str, Any], gk_input: GKInputCGYRO, bunit_over_b0: float = None, downsample: Dict[str, Any] = {}, ) -> Dict[str, Any]: """ Sets coords and attrs of a Pyrokinetics dataset from a collection of CGYRO files. Args: raw_data (Dict[str,Any]): Dict containing CGYRO output. gk_input (GKInputCGYRO): Processed CGYRO input file. Returns: Dict: Dictionary with coords """ # Process time data time = raw_data["time"][:, 0] # Process grid data grid_data = raw_data["grids"] nky = int(grid_data[0]) nspecies = int(grid_data[1]) nfield = int(grid_data[2]) nkx = int(grid_data[3]) ntheta_grid = int(grid_data[4]) nenergy = int(grid_data[5]) npitch = int(grid_data[6]) box_size = int(grid_data[7]) length_x = grid_data[8] ntheta_plot = int(grid_data[10]) # Iterate through grid_data in chunks, starting after kx pos = 11 + nkx theta_grid = grid_data[pos : pos + ntheta_grid] pos += ntheta_grid energy = grid_data[pos : pos + nenergy] pos += nenergy pitch = grid_data[pos : pos + npitch] pos += npitch ntheta_ballooning = ntheta_grid * int(nkx / box_size) theta_ballooning = grid_data[pos : pos + ntheta_ballooning] pos += ntheta_ballooning ky = np.abs( grid_data[pos : pos + nky] / bunit_over_b0 ) # Force the sign of ky to be positive if gk_input.is_linear() and nky == 1: # Convert to ballooning co-ordinate so only 1 kx theta = theta_ballooning ntheta = ntheta_ballooning kx = [0.0] nkx = 1 theta0 = gk_input.data.get("PX0", 0.0) * 2 * np.pi else: # Output data actually given on theta_plot grid ntheta = ntheta_plot theta = np.array( ([0.0] if ntheta == 1 else theta_grid[:: ntheta_grid // ntheta]) ) kx = ( 2 * pi * np.linspace(-int(nkx / 2), int((nkx + 1) / 2) - 1, nkx) / length_x ) / bunit_over_b0 theta0 = 0.0 theta += -theta0 # Get rho_star from equilibrium file if len(raw_data["equilibrium"]) == 54 + 7 * nspecies: rho_star = raw_data["equilibrium"][35] elif len(raw_data["equilibrium"]) == 54 + 9 * nspecies: rho_star = raw_data["equilibrium"][35] elif len(raw_data["equilibrium"]) == 55 + 10 * nspecies: rho_star = raw_data["equilibrium"][35] elif len(raw_data["equilibrium"]) == 57 + 9 * nspecies: rho_star = raw_data["equilibrium"][35] elif len(raw_data["equilibrium"]) == 48 + 9 * nspecies: rho_star = raw_data["equilibrium"][35] gk_input.set_legacy_cgyro(False) else: rho_star = raw_data["equilibrium"][23] fields = ["phi", "apar", "bpar"][:nfield] fluxes = ["particle", "heat", "momentum"] moments = ["density", "temperature", "velocity"] species = gk_input.get_local_species().names if nspecies != len(species): raise RuntimeError( "GKOutputReaderCGYRO: Different number of species in input and output." ) g_theta_geo = raw_data["geo"][ntheta_grid : 2 * ntheta_grid] bmag = raw_data["geo"][2 * ntheta_grid : 3 * ntheta_grid] csf = gk_input.data.get("CONSTANT_STREAM_FLAG", 1) if csf == 1: dtheta = theta_grid[1] - theta_grid[0] dtheta_eq = 2 * np.pi / ntheta_grid g_theta_geo = dtheta * g_theta_geo[0] / dtheta_eq w_theta = g_theta_geo / bmag w_theta = w_theta / sum(w_theta) nradial = int(gk_input.data["N_RADIAL"]) # Construct on ballooning space grid if len(ky) == 1: w_theta = np.tile(w_theta, nradial) full_ky = ky full_kx = kx full_theta = theta full_time = time if downsample: ky = ky[downsample.get("ky_idx", slice(None))] kx = kx[downsample.get("kx_idx", slice(None))] theta = theta[downsample.get("theta_idx", slice(None))] time = time[downsample.get("time_idx", slice(None))] local_geometry = gk_input.get_local_geometry() metric_terms = MetricTerms(local_geometry, ntheta=ntheta_grid * 4) theta_mod = np.mod(theta, 2 * np.pi) Jacobian = np.interp( theta_mod, metric_terms.regulartheta, metric_terms.Jacobian, period=2 * np.pi, ) # Store grid data as xarray DataSet return { "time": time, "kx": kx, "ky": ky, "theta": theta, "full_time": full_time, "full_kx": full_kx, "full_ky": full_ky, "full_theta": full_theta, "energy": energy, "pitch": pitch, "ntheta_plot": ntheta_plot, "ntheta_grid": ntheta_grid, "nradial": nradial, "rho_star": rho_star, "field": fields, "moment": moments, "flux": fluxes, "species": species, "linear": gk_input.is_linear(), "w_theta": w_theta, "jacobian": Jacobian, } @staticmethod def _get_fields( raw_data: Dict[str, Any], gk_input: GKInputCGYRO, coords: Dict[str, Any], downsample: Dict[str, Any], ) -> Dict[str, np.ndarray]: """ Sets 3D fields over time. The field coordinates should be (field, theta, kx, ky, time) """ nkx = len(coords["kx"]) nradial = coords["nradial"] nky = len(coords["ky"]) ntheta = len(coords["theta"]) full_nkx = len(coords["full_kx"]) full_ntime = len(coords["full_time"]) full_nky = len(coords["full_ky"]) full_ntheta = len(coords["full_theta"]) ntheta_plot = coords["ntheta_plot"] ntheta_grid = coords["ntheta_grid"] ntime = len(coords["time"]) nfield = len(coords["field"]) field_names = ["phi", "apar", "bpar"][:nfield] def _field_downsample( memmap, kx_idx=None, ky_idx=None, theta_idx=None, time_idx=None, **kwargs ): """ Extract a subset of a 4D memmap array with custom indices for each dimension. Parameters ---------- memmap_arr : np.memmap The 4D memmap array with shape (kx, ky, theta, time). kx_idx, ky_idx, theta_idx, time_idx : slice, int, list, or ndarray, optional Indices for each dimension. Can be: - None (means ':') - int - slice (e.g., slice(0, 10, 2)) - list or np.ndarray of indices Returns ------- np.ndarray A normal NumPy array with the requested subset. (This will load the selected data into memory.) """ # replace None with full slices kx_idx = slice(None) if kx_idx is None else kx_idx ky_idx = slice(None) if ky_idx is None else ky_idx theta_idx = slice(None) if theta_idx is None else theta_idx time_idx = slice(None) if time_idx is None else time_idx # fancy indexing happens here return memmap[:, kx_idx, theta_idx, ky_idx, time_idx] raw_field_data = {f: raw_data.get(f"field_{f}", None) for f in field_names} results = {} # Check to see if there's anything to do if not raw_field_data: return results # Loop through all fields and add field in if it exists for ifield, (field_name, raw_field) in enumerate(raw_field_data.items()): if raw_field is None: logging.warning( f"Field data {field_name} over time not found, expected the file " f"bin.cgyro.kxky_{field_name} to exist. Setting this field to 0." ) continue # If linear, convert from kx to ballooning space. # Use nradial instead of nkx, ntheta_plot instead of ntheta if gk_input.is_linear() and nky == 1: shape = (2, nradial, ntheta_plot, full_nky, full_ntime) else: shape = (2, full_nkx, full_ntheta, full_nky, full_ntime) field_data = raw_field.reshape(shape, order="F") field_data = _field_downsample(field_data, **downsample).astype(np.float64) # Adjust sign to match pyrokinetics frequency convention # (-ve is electron direction) bt_ccw = gk_input.data.get("BTCCW", -1) ip_ccw = gk_input.data.get("IPCCW", -1) mode_sign = int( np.sign(np.sign(gk_input.data.get("S", 1.0)) * bt_ccw * ip_ccw) ) field_data = (field_data[0] + 1j * field_data[1]) / coords["rho_star"] # If nonlinear, we can simply save the fields and continue if gk_input.is_nonlinear() or nky != 1: fields = field_data.swapaxes(0, 1) else: # If theta_plot != theta_grid, we get eigenfunction data and multiply by the # field amplitude if ntheta_plot != ntheta_grid: # Get eigenfunction data raw_eig_data = raw_data.get(f"eigenfunctions_{field_name}", None) if raw_eig_data is None: logging.warning( f"When setting fields, eigenfunction data for {field_name} not " f"found, expected the file bin.cgyro.{field_name}b to exist. " f"Not setting the field {field_name}." ) continue eig_shape = [2, ntheta, full_ntime] eig_data = raw_eig_data[: np.prod(eig_shape)].reshape( eig_shape, order="F" ) eig_data = eig_data[0] + 1j * eig_data[1] # Get field amplitude middle_kx = (nradial // 2) + 1 field_amplitude = np.abs(field_data[middle_kx, 0, 0, :]) # Multiply together # FIXME We only set kx=ky=0 here, any other values are left undefined # as fields is created using np.empty. Should we instead set # all kx and ky to these values? Should we expect that nx=ny=1? field_data = np.reshape( eig_data * field_amplitude, (nradial, ntheta_grid, nky, ntime), ) # Poisson Sum (no negative in exponent to match frequency convention) q = gk_input.get_local_geometry_miller().q nx0 = gk_input.data.get("PX0", 0.0) for i_radial in range(nradial): nx = -nradial // 2 + (i_radial - 1) field_data[i_radial, ...] *= np.exp( -2j * pi * (nx + nx0) * np.abs(q) ) if mode_sign == -1: field_data = field_data[:, ::-1, :, :] fields = field_data.reshape([ntheta, nkx, nky, ntime]) fields = fields[::-1, :, :, :] else: fields = field_data.reshape([ntheta, nkx, nky, ntime]) if ip_ccw == -1: fields = np.conj(fields) results[field_name] = fields return results @staticmethod def _get_moments( raw_data: Dict[str, Any], gk_input: GKInputCGYRO, coords: Dict[str, Any], downsample: Dict, ) -> Dict[str, np.ndarray]: """ Sets 3D moments over time. The moment coordinates should be (moment, theta, kx, species, ky, time) """ moment_names = {"n": "density", "e": "temperature", "v": "velocity"} nkx = len(coords["kx"]) nradial = coords["nradial"] nky = len(coords["ky"]) ntheta = len(coords["theta"]) ntheta_plot = coords["ntheta_plot"] ntime = len(coords["time"]) nspec = len(coords["species"]) full_nkx = len(coords["full_kx"]) full_ntime = len(coords["full_time"]) full_nky = len(coords["full_ky"]) full_ntheta = len(coords["full_theta"]) raw_moment_data = { value: raw_data.get(f"moment_{key}", None) for key, value in moment_names.items() } results = {} # Check to see if there's anything to do if not raw_moment_data: return results def _moment_downsample( memmap, kx_idx=None, ky_idx=None, theta_idx=None, time_idx=None, **kwargs ): """ Extract a subset of a 4D memmap array with custom indices for each dimension. Parameters ---------- memmap_arr : np.memmap The 4D memmap array with shape (kx, theta, species, ky, time). kx_idx, ky_idx, theta_idx, time_idx : slice, int, list, or ndarray, optional Indices for each dimension. Can be: - None (means ':') - int - slice (e.g., slice(0, 10, 2)) - list or np.ndarray of indices Returns ------- np.ndarray A normal NumPy array with the requested subset. (This will load the selected data into memory.) """ # replace None with full slices kx_idx = slice(None) if kx_idx is None else kx_idx ky_idx = slice(None) if ky_idx is None else ky_idx theta_idx = slice(None) if theta_idx is None else theta_idx time_idx = slice(None) if time_idx is None else time_idx # fancy indexing happens here return memmap[:, kx_idx, theta_idx, :, ky_idx, time_idx] # Loop through all moments and add moment in if it exists for imoment, (moment_name, raw_moment) in enumerate(raw_moment_data.items()): if raw_moment is None: logging.warning( f"moment data {moment_name} over time not found, expected the file " f"bin.cgyro.kxky_{moment_name} to exist. Setting this moment to 0." ) continue # If linear, convert from kx to ballooning space. # Use nradial instead of nkx, ntheta_plot instead of ntheta if gk_input.is_linear() and nky == 1: shape = (2, nradial, ntheta_plot, nspec, nky, full_ntime) else: shape = (2, full_nkx, full_ntheta, nspec, full_nky, full_ntime) moment_data = raw_moment.reshape(shape, order="F") moment_data = _moment_downsample(moment_data, **downsample).astype( np.float64 ) # Adjust sign to match pyrokinetics frequency convention # (-ve is electron direction) bt_ccw = gk_input.data.get("BTCCW", -1) ip_ccw = gk_input.data.get("IPCCW", -1) mode_sign = int( np.sign(np.sign(gk_input.data.get("S", 1.0)) * bt_ccw * ip_ccw) ) moment_data = (moment_data[0] + 1j * moment_data[1]) / coords["rho_star"] # If nonlinear, we can simply save the moments and continue if gk_input.is_nonlinear() or nky != 1: moments = moment_data.swapaxes(0, 1) else: # Poisson Sum (no negative in exponent to match frequency convention) q = gk_input.get_local_geometry_miller().q nx0 = gk_input.data.get("PX0", 0.0) for i_radial in range(nradial): nx = -nradial // 2 + (i_radial - 1) moment_data[i_radial, ...] *= np.exp( -2j * pi * (nx + nx0) * np.abs(q) ) if mode_sign == -1: moment_data = moment_data[:, ::-1, ...] moments = moment_data.reshape([ntheta, nkx, nspec, nky, ntime]) moments = moments[::-1, :, :, :] else: moments = moment_data.reshape([ntheta, nkx, nspec, nky, ntime]) if ip_ccw == -1: moments = np.conj(moments) results[moment_name] = moments temp_spec = np.ones((ntheta, nkx, nspec, nky, ntime)) for i in range(nspec): temp_spec[:, :, i, :, :] = gk_input.data.get(f"TEMP_{i + 1}", 1.0) if "temperature" in results: # Convert CGYRO energy fluctuation to temperature results["temperature"] = ( 2 * results["temperature"] - results["density"] * temp_spec ) return results @staticmethod def _flux_downsample(memmap, ky_idx=None, time_idx=None, **kwargs): """ Extract a subset of a 4D memmap array with custom indices for each dimension. Parameters ---------- memmap_arr : np.memmap The 5D memmap array with shape (species, flux, field, ky, time). ky_idx, time_idx : slice, int, list, or ndarray, optional Indices for each dimension. Can be: - None (means ':') - int - slice (e.g., slice(0, 10, 2)) - list or np.ndarray of indices Returns ------- np.ndarray A normal NumPy array with the requested subset. (This will load the selected data into memory.) """ # replace None with full slices ky_idx = slice(None) if ky_idx is None else ky_idx time_idx = slice(None) if time_idx is None else time_idx # fancy indexing happens here return memmap[:, :, :, ky_idx, time_idx] @staticmethod def _get_fluxes( raw_data: Dict[str, Any], gk_input: Dict, coords: Dict, downsample: Dict, ) -> Dict[str, np.ndarray]: """ Set flux data over time. The flux coordinates should be (species, moment, field, ky, time) """ results = {} # cflux is more appropriate for CGYRO simulations # with GAMMA_E > 0 and SHEAR_METHOD = 2. # However, for cross-code consistency, gflux is used for now. # if gk_input.data.get("GAMMA_E", 0.0) == 0.0: # flux_key = "flux" # else: # flux_key = "cflux" flux_key = "flux" if flux_key in raw_data: coord_names = ["species", "flux", "field", "full_ky", "full_time"] shape = [len(coords[coord_name]) for coord_name in coord_names] raw_fluxes = raw_data[flux_key].reshape(shape, order="F") fluxes = GKOutputReaderCGYRO._flux_downsample(raw_fluxes, **downsample) fluxes = np.swapaxes(fluxes, 0, 2) if gk_input.is_linear() and gk_input._legacy_cgyro: flux_norm = ( 2 * np.pi**1.5 * -np.sign( gk_input.data.get("IPCCW", -1) * gk_input.data.get("BTCCW", -1) ) ) elif gk_input.is_linear() and not gk_input._legacy_cgyro: flux_norm = 2 * np.pi**1.5 * -np.sign(gk_input.data.get("IPCCW", -1)) else: flux_norm = 1.0 for iflux, flux in enumerate(coords["flux"]): results[flux] = (fluxes[:, iflux, :, :, :]) / flux_norm return results @classmethod def _get_eigenvalues( self, raw_data: Dict[str, Any], coords: Dict, gk_input: Optional[Any] = None ) -> Dict[str, np.ndarray]: """ Takes an xarray Dataset that has had coordinates and fields set. Uses this to add eigenvalues: data['eigenvalues'] = eigenvalues(kx, ky, time) data['mode_frequency'] = mode_frequency(kx, ky, time) data['growth_rate'] = growth_rate(kx, ky, time) This should be called after _set_fields, and is only valid for linear runs. Unlike the version in the super() class, CGYRO may need to get extra info from an eigenvalue file. Args: data (xr.Dataset): The dataset to be modified. dirname (PathLike): Directory containing CGYRO output files. Returns: Dict: The modified dataset which was passed to 'data'. """ ntime = len(coords["time"]) nky = len(coords["ky"]) nkx = len(coords["kx"]) shape = (2, nky, ntime) if "eigenvalues_bin" in raw_data: eigenvalue_over_time = raw_data["eigenvalues_bin"][ : np.prod(shape) ].reshape(shape, order="F") elif "eigenvalues_out" in raw_data: eigenvalue_over_time = ( raw_data["eigenvalues_out"].transpose()[:, :ntime].reshape(shape) ) else: raise RuntimeError( "Eigenvalues over time not found, expected the files bin.cgyro.freq or " "out.cgyro.freq to exist. Could not set data_vars 'growth_rate', " "'mode_frequency' and 'eigenvalue'." ) mode_sign = -np.sign( np.sign(gk_input.data.get("Q", 2.0)) * -gk_input.data.get("BTCCW", -1) ) mode_frequency = mode_sign * eigenvalue_over_time[0, :, :] growth_rate = eigenvalue_over_time[1, :, :] # Add kx axis for compatibility with GS2 eigenvalues # FIXME Is this appropriate? Should we drop the kx coordinate? shape_with_kx = (nkx, nky, ntime) mode_frequency = np.ones(shape_with_kx) * mode_frequency growth_rate = np.ones(shape_with_kx) * growth_rate result = { "growth_rate": growth_rate, "mode_frequency": mode_frequency, } return result @staticmethod def _get_eigenfunctions(raw_data: Dict[str, Any], coords: Dict) -> np.ndarray: """ Loads eigenfunctions into data with the following coordinates: data['eigenfunctions'] = eigenfunctions(kx, ky, field, theta, time) This should be called after _set_fields, and is only valid for linear runs. """ raw_eig_data = [ raw_data.get(f"eigenfunctions_{f}", None) for f in coords["field"] ] ntime = len(coords["time"]) ntheta = len(coords["theta"]) nkx = len(coords["kx"]) nky = len(coords["ky"]) raw_shape = [2, ntheta, nkx, nky, ntime] # FIXME Currently using kx and ky for compatibility with GS2 results, but # these coordinates are not used. Should we remove these coordinates? coord_names = ["field", "theta", "kx", "ky", "time"] eigenfunctions = np.empty( [len(coords[coord_name]) for coord_name in coord_names], dtype=complex ) # Loop through all fields and add eigenfunction if it exists for ifield, raw_eigenfunction in enumerate(raw_eig_data): if raw_eigenfunction is not None: eigenfunction = raw_eigenfunction[: np.prod(raw_shape)].reshape( raw_shape, order="F" ) eigenfunctions[ifield, ...] = eigenfunction[0] + 1j * eigenfunction[1] theta_star = np.argmax(abs(eigenfunctions[0, :, 0, 0, -1]), axis=0) phi_theta_star = eigenfunctions[0, theta_star, 0, 0, -1] phase = np.abs(phi_theta_star) / phi_theta_star field_squared = np.sum(np.abs(eigenfunctions) ** 2, 0) amplitude = np.sqrt( trapezoid(field_squared, coords["theta"], axis=0) / (2 * np.pi) ) result = eigenfunctions * phase / amplitude return result
[docs] @staticmethod def load_momentum_components(pyro): """ Load additional momentum-flux outputs from `bin.cgyro.ky_flux_mom`. This file can be generated by setting `MOMENTUM_PRINT_FLAG = 1` in the CGYRO input file, support for which is available from CGYRO commit 534ebc786c83fa1f1ed8dc1ad15193c20d421f72 The returned data has shape `(field, momentum_component, species, ky, time)` where `momentum_component` contains: - `"parallel"` Contribution from the parallel-flow and centrifugal terms. - `"perpendicular"` Contribution from perpendicular flows. - `"electromagnetic"` Contribution from electromagnetic-field terms in the momentum-transport equation, related to the Maxwell stress. Notes ----- The sum of the `"parallel"` and `"perpendicular"` components reproduce the standard momentum-flux outputs natively supported by Pyrokinetics. """ component_names = ["parallel", "perpendicular", "electromagnetic"] # Check that the correct state exists if pyro.gk_code != "CGYRO": raise NotImplementedError( "load_momentum_components() can only be called for CGYRO outputs." ) if pyro.gk_output is None: raise RuntimeError( "pyro.load_gk_output() must be called before load_momentum_components()." ) if "momentum" not in pyro.gk_output: raise RuntimeError( "Standard momentum-flux data must be in gk_output before calling " "load_momentum_components()." ) # Get output-file location if pyro.gk_output_file is not None: dirname = Path(pyro.gk_output_file) else: dirname = Path(pyro.gk_file).parent filename = dirname / "bin.cgyro.ky_flux_mom" if not filename.exists(): raise FileNotFoundError(f"Unable to locate {filename}") # Setup data output shape coords = pyro.gk_output._raw_coords nspecies = len(coords["species"]) nfield = len(coords["field"]) nky = len(coords["full_ky"]) ntime = len(coords["full_time"]) ncomponent = len(component_names) shape = (nspecies, ncomponent, nfield, nky, ntime) # Load raw data raw_data = np.asarray(np.memmap(filename, dtype=np.float32, mode="r")) mflux = raw_data.reshape(shape, order="F") mflux = GKOutputReaderCGYRO._flux_downsample( mflux, **pyro.gk_output._downsample ) mflux = np.swapaxes(mflux, 0, 2) # Bootstrap normalisations from standard momentum-flux data reference = pyro.gk_output["momentum"] raw_sum = mflux[:, :2, ...].sum(axis=1) scale = reference.pint.dequantify().data / raw_sum mflux = mflux * scale[:, None, ...] # Add data to gk_output object component_dim = "component" pyro.gk_output.data = pyro.gk_output.data.assign_coords( {component_dim: (component_dim, component_names)} ) pyro.gk_output.add_data( name="momentum_components", data=mflux, coords=("field", component_dim, "species", "ky", "time"), units=reference.pint.units, output_convention=pyro.gk_output.norm.default_convention.name, )