Source code for pyrokinetics.gk_code.gkw

import copy
import os
import re
from pathlib import Path
from typing import Any, Dict, Optional, Tuple

import f90nml
import numpy as np
from cleverdict import CleverDict
from scipy.integrate import cumulative_trapezoid, trapezoid

from ..file_utils import FileReader
from ..local_geometry import (
    LocalGeometry,
    LocalGeometryMiller,
    LocalGeometryMXH,
    MetricTerms,
    default_miller_inputs,
    default_mxh_inputs,
)
from ..local_species import LocalSpecies
from ..normalisation import SimulationNormalisation as Normalisation
from ..normalisation import convert_dict
from ..numerics import Numerics
from ..templates import gk_templates
from ..typing import PathLike
from .gk_input import GKInput
from .gk_output import (
    Coords,
    Eigenfunctions,
    Eigenvalues,
    Fields,
    Fluxes,
    GKOutput,
    Moments,
)


[docs] class GKInputGKW(GKInput, FileReader, file_type="GKW", reads=GKInput): """ Class that can read GKW input files, and produce Numerics, LocalSpecies, and LocalGeometry objects """ code_name = "GKW" default_file_name = "input.dat" norm_convention = "gkw" pyro_gkw_miller = { "rho": ["geom", "eps"], "q": ["geom", "q"], "shat": ["geom", "shat"], "kappa": ["geom", "kappa"], "s_kappa": ["geom", "skappa"], "delta": ["geom", "delta"], "s_delta": ["geom", "sdelta"], "shift": ["geom", "drmil"], "Z0": ["geom", "zmil"], "dZ0dr": ["geom", "dzmil"], "ip_ccw": ["geom", "signj"], "bt_ccw": ["geom", "signb"], } pyro_gkw_miller_defaults = { "rho": 0.16666, "q": 2.0, "shat": 1.0, "kappa": 1.0, "s_kappa": 0.0, "delta": 0.0, "s_delta": 0.0, "shift": 0.0, "Z0": 0.0, "dZ0dr": 0.0, "ip_ccw": -1, "bt_ccw": -1, } pyro_gkw_mxh = { **pyro_gkw_miller, "cn": ["geom", "c"], "sn": ["geom", "s"], "dcndr": ["geom", "c_prime"], "dsndr": ["geom", "s_prime"], "n_moments": ["geom", "n_shape"], } pyro_gkw_mxh_defaults = { **pyro_gkw_miller_defaults, "cn": [0.0, 0.0, 0.0, 0.0], "sn": [0.0, 0.0, 0.0, 0.0], "dcndr": [0.0, 0.0, 0.0, 0.0], "dsndr": [0.0, 0.0, 0.0, 0.0], "n_moments": 4, } pyro_gkw_species = { "mass": "mass", "z": "z", "dens": "dens", "temp": "temp", "inverse_lt": "rlt", "inverse_ln": "rln", "domega_drho": "uprim", }
[docs] def read_from_file( self, filename: PathLike, detect_norm: bool = True ) -> Dict[str, Any]: """ Reads GKW input file into a dictionary Uses default read, which assumes input is a Fortran90 namelist """ return super().read_from_file(filename, detect_norm=detect_norm)
[docs] def read_str(self, input_string: str, detect_norm: bool = True) -> Dict[str, Any]: """ Reads GKW input file given as string Uses default read_str, which assumes input_string is a Fortran90 namelist """ return super().read_str(input_string, detect_norm=detect_norm)
[docs] def read_dict( self, input_dict: Dict[str, Any], detect_norm: bool = True ) -> Dict[str, Any]: """ Reads GKW input file given as dict Uses default read_dict, which assumes input is a dict """ return super().read_dict(input_dict, detect_norm=detect_norm)
[docs] def verify_file_type(self, filename: PathLike): """ Ensure this file is a valid gkw input file, and that it contains sufficient info for Pyrokinetics to work with """ expected_keys = ["control", "gridsize", "mode", "geom", "spcgeneral"] self.verify_expected_keys(filename, expected_keys)
[docs] def write( self, filename: PathLike, float_format: str = "", local_norm=None, code_normalisation=None, ): """ Write self.data to a gyrokinetics input file. Uses default write, which writes to a Fortan90 namelist """ if local_norm is None: local_norm = Normalisation("write") if code_normalisation is None: code_normalisation = self.code_name.lower() convention = getattr(local_norm, code_normalisation) for name, namelist in self.data.items(): self.data[name] = convert_dict(namelist, convention) # f90nml doesnt like numpy arrays... self.data["collisions"]["nu_ab"] = list(self.data["collisions"]["nu_ab"]) super().write(filename, float_format=float_format)
[docs] def is_nonlinear(self) -> bool: is_box = self.data["mode"]["mode_box"] is_nonlin = self.data["control"]["non_linear"] return is_box and is_nonlin
[docs] def add_flags(self, flags) -> None: """ Add extra flags to GKW input file """ super().add_flags(flags)
[docs] def get_local_geometry(self) -> LocalGeometry: """ Returns local geometry. Delegates to more specific functions """ if hasattr(self, "convention"): convention = self.convention else: norms = Normalisation("get_local_geometry") convention = getattr(norms, self.norm_convention) geometry_type = self.data["geom"]["geom_type"] if geometry_type in ["miller", "circ"]: default_inputs = default_miller_inputs() pyro_gkw_local_geometry = self.pyro_gkw_miller pyro_gkw_local_geometry_defaults = self.pyro_gkw_miller_defaults local_geometry_class = LocalGeometryMiller elif geometry_type == "mxh": default_inputs = default_mxh_inputs() pyro_gkw_local_geometry = self.pyro_gkw_mxh pyro_gkw_local_geometry_defaults = self.pyro_gkw_mxh_defaults local_geometry_class = LocalGeometryMXH else: raise NotImplementedError( f"LocalGeometry type {geometry_type} not implemented for GKW" ) local_geometry_data = default_inputs for (pyro_key, (gkw_param, gkw_key)), gkw_default in zip( pyro_gkw_local_geometry.items(), pyro_gkw_local_geometry_defaults.values() ): local_geometry_data[pyro_key] = self.data[gkw_param].get( gkw_key, gkw_default ) if geometry_type == "mxh": for key in ["cn", "sn", "dcndr", "dsndr"]: local_geometry_data[key] = [float(i) for i in local_geometry_data[key]] for key, value in local_geometry_data.items(): if isinstance(value, list): local_geometry_data[key] = np.array(value)[ : local_geometry_data["n_moments"] ] local_geometry_data["Rmaj"] = 1.0 local_geometry_data["rho"] = self.data["geom"]["eps"] local_geometry_data["bt_ccw"] *= -1 local_geometry_data["ip_ccw"] *= -1 try: beta = self.data["spcgeneral"]["beta_ref"] except KeyError: beta = self.data["spcgeneral"]["beta"] if self.data["spcgeneral"].get("betaprime_type", "ref") == "ref": local_geometry_data["beta_prime"] = self.data["spcgeneral"].get( "betaprime_ref", 0.0 ) elif self.data["spcgeneral"].get("betaprime_type", "ref") == "sp": # Need species to set up beta_prime local_species = self.get_local_species() local_geometry_data["beta_prime"] = ( -local_species.inverse_lp.m * local_species.pressure.m * beta ) else: raise ValueError( f"betaprime tpye {self.data['spcgeneral']['betaprime_type']} not supported for GKW" ) # must construct using from_gk_data as we cannot determine bunit_over_b0 here local_geometry = local_geometry_class.from_gk_data(local_geometry_data) local_geometry.B0 = 1.0 local_geometry.dpsidr *= local_geometry.B0 local_geometry.normalise(norms=convention) local_geometry.Fpsi = local_geometry.get_f_psi() local_geometry.FF_prime = local_geometry.get_f_prime() * local_geometry.Fpsi return local_geometry
[docs] def get_local_species(self): """ Load LocalSpecies object from GKW file """ if hasattr(self, "convention"): convention = self.convention else: norms = Normalisation("get_numerics") convention = getattr(norms, self.norm_convention) # Dictionary of local species parameters local_species = LocalSpecies() ion_count = 0 rotation = self.data.get("rotation", {"vcor": 0.0, "shear_rate": 0.0}) n_species = self.data["gridsize"]["number_of_species"] individual_coll = False ion_ion_coll = False reference_coll = False if self.data["control"]["collisions"]: coll_data = self.data["collisions"] zeff = self.data["collisions"].get("zeff", 1.0) if coll_data.get("freq_input", False): individual_coll = True collisions = np.array(coll_data.get("nu_ab", np.zeros(n_species**2)))[ : n_species**2 ].reshape((n_species, n_species)) elif coll_data.get("freq_override", False): ion_ion_coll = True collisions = coll_data.get("coll_freq", 0.0) else: reference_coll = True nref = coll_data.get("nref", 1.0) rref = coll_data.get("rref", 1.0) tref = coll_data.get("tref", 1.0) collisions = 6.5141e-5 * rref * nref / tref**2 else: collisions = 0.0 zeff = 1.0 # Load each species into a dictionary for i_sp in range(n_species): species_data = CleverDict() try: gkw_data = self.data["species"][i_sp] except TypeError: # case when only 1 species gkw_data = self.data["species"] for pyro_key, gkw_key in self.pyro_gkw_species.items(): species_data[pyro_key] = gkw_data[gkw_key] species_data["omega0"] = rotation.get("vcor", 0.0) if species_data.z == -1: name = "electron" else: ion_count += 1 name = f"ion{ion_count}" species_data.name = name if individual_coll: species_data.nu = collisions[i_sp, i_sp] * np.sqrt( species_data["temp"] / species_data["mass"] ) elif ion_ion_coll: species_data.nu = ( collisions * species_data.z**4 * species_data.dens / species_data.temp**2 ) elif reference_coll: if name == "electron": coulog = ( 14.9 - 0.5 * np.log(0.1 * nref * species_data.dens) + np.log(tref * species_data.temp) ) else: coulog = ( 17.3 - np.log(species_data.z**4 / (species_data.temp * tref)) - 0.5 * np.log(0.1 * nref / tref) - 0.5 * np.log( 2 * species_data.z**2 * species_data.dens / species_data.temp ) ) species_data.nu = ( collisions * species_data.z**4 * species_data.dens / species_data.temp**2 * coulog * np.sqrt(species_data["temp"] / species_data["mass"]) ) else: species_data.nu = 0.0 # normalisations species_data.dens *= convention.nref species_data.mass *= convention.mref species_data.nu *= convention.vref / convention.lref species_data.temp *= convention.tref species_data.z *= convention.qref species_data.inverse_lt *= convention.lref**-1 species_data.inverse_ln *= convention.lref**-1 species_data.omega0 *= convention.vref / convention.lref species_data.domega_drho *= convention.vref / convention.lref**2 # Add individual species data to dictionary of species local_species.add_species(name=name, species_data=species_data) local_species.zeff = zeff * convention.qref # Can't normalise to pyrokinetics normalisations so leave as GKW and calculate total pressure gradient local_species.normalise(convention) return local_species
[docs] def get_numerics(self) -> Numerics: """Gather numerical info (grid spacing, time steps, etc)""" if hasattr(self, "convention"): convention = self.convention else: norms = Normalisation("get_numerics") convention = getattr(norms, self.norm_convention) numerics_data = {} # Set no. of fields numerics_data["phi"] = True numerics_data["apar"] = self.data["control"].get("nlapar", True) numerics_data["bpar"] = self.data["control"].get("nlbpar", False) # Set time stepping delta_time = self.data["control"].get("dtim", 0.005) naverage = self.data["control"].get("naverage", 100) ntime = self.data["control"].get("ntime", 100) numerics_data["delta_time"] = delta_time numerics_data["max_time"] = delta_time * naverage * ntime # Theta grid n_s_grid = self.data["gridsize"]["n_s_grid"] nperiod = self.data["gridsize"]["nperiod"] numerics_data["nperiod"] = nperiod numerics_data["ntheta"] = n_s_grid // (2 * nperiod - 1) # Mode box specifications numerics_data["nonlinear"] = self.is_nonlinear() numerics_data["nkx"] = self.data["gridsize"]["nx"] numerics_data["nky"] = self.data["gridsize"]["nmod"] if not self.data["mode"]["mode_box"]: kthrho = self.data["mode"]["kthrho"] if isinstance(kthrho, list): kthrho = kthrho[: numerics_data["nky"]] else: kthrho = self.data["mode"]["krhomax"] / (numerics_data["nky"] - 1) local_geometry = self.get_local_geometry() drho_dpsi = ( local_geometry.q / local_geometry.rho / local_geometry.get_bunit_over_b0() ) e_eps_zeta = drho_dpsi / (4 * np.pi) # Ensure odd ntheta to get theta = 0.0 on grid metric_ntheta = (numerics_data["ntheta"] // 2) * 2 + 1 metric_terms = MetricTerms(local_geometry, ntheta=metric_ntheta) theta_index = np.argmin(abs(metric_terms.regulartheta)) g_aa = metric_terms.field_aligned_contravariant_metric("alpha", "alpha")[ theta_index ] kthnorm = np.sqrt(g_aa) / (2 * np.pi) shat = local_geometry.shat.m numerics_data["ky"] = kthrho * (e_eps_zeta * 2 / kthnorm).m if self.data["mode"]["mode_box"]: numerics_data["kx"] = ( numerics_data["ky"] * 2 * np.pi * shat / self.data["mode"]["ikxspace"] ) else: numerics_data["theta0"] = self.data["mode"].get("chin", 0.0) # Velocity grid numerics_data["nenergy"] = self.data["gridsize"].get("n_vpar_grid", 32) // 2 numerics_data["npitch"] = self.data["gridsize"].get("n_mu_grid", 16) # Beta try: numerics_data["beta"] = self.data["spcgeneral"]["beta_ref"] except KeyError: numerics_data["beta"] = self.data["spcgeneral"]["beta"] rotation = self.data.get("rotation", {"vcor": 0.0, "shear_rate": 0.0}) numerics_data["gamma_exb"] = rotation.get("shear_rate", 0.0) return Numerics(**numerics_data).with_units(convention)
[docs] def get_reference_values(self, local_norm: Normalisation) -> Dict[str, Any]: """ Reads in normalisation values from input file """ return {}
def _detect_normalisation(self): """ Determines the necessary inputs and passes information to the base method _set_up_normalisation. The following values are needed default_references: dict Dictionary containing default reference values for the gk_code: str GK code electron_density: float Electron density from GK input electron_temperature: float Electron density from GK input e_mass: float Electron mass from GK input electron_index: int Index of electron in list of data found_electron: bool Flag on whether electron was found densities: ArrayLike List of species densities temperatures: ArrayLike List of species temperature reference_density_index: ArrayLike List of indices where the species has a density of 1.0 reference_temperature_index: ArrayLike List of indices where the species has a temperature of 1.0 major_radius: float Normalised major radius from GK input rgeo_rmaj: float Ratio of Geometric and flux surface major radius minor_radius: float Normalised minor radius from GK input """ default_references = { "nref_species": "electron", "tref_species": "electron", "mref_species": "deuterium", "bref": "B0", "lref": "major_radius", "ne": 1.0, "te": 1.0, "rgeo_rmaj": 1.0, "vref": "most_probable", "rhoref": "gs2", "raxis_rmaj": None, } reference_density_index = [] reference_temperature_index = [] densities = [] temperatures = [] masses = [] found_electron = False e_mass = None electron_temperature = None electron_density = None electron_index = None n_species = self.data["gridsize"]["number_of_species"] if self.data["spcgeneral"].get("adiabatic_electrons"): n_species += 1 # Load each species into a dictionary for i_sp in range(n_species): dens = self.data["species"][i_sp]["dens"] temp = self.data["species"][i_sp]["temp"] mass = self.data["species"][i_sp]["mass"] # Find all reference values if self.data["species"][i_sp]["z"] == -1: electron_density = dens electron_temperature = temp e_mass = mass electron_index = i_sp found_electron = True if np.isclose(dens, 1.0): reference_density_index.append(i_sp) if np.isclose(temp, 1.0): reference_temperature_index.append(i_sp) densities.append(dens) temperatures.append(temp) masses.append(mass) if not found_electron: ne = 0.0 for i_sp in range(self.data["gridsize"]["number_of_species"]): ne += ( self.data["species"][i_sp]["dens"] * self.data["species"][i_sp]["z"] ) electron_density = ne electron_temperature = self.data["species"][0]["temp"] super()._set_up_normalisation( default_references=default_references, gk_code=self.code_name.lower(), electron_density=electron_density, electron_temperature=electron_temperature, e_mass=e_mass, electron_index=electron_index, found_electron=found_electron, densities=densities, temperatures=temperatures, reference_density_index=reference_density_index, reference_temperature_index=reference_temperature_index, major_radius=1.0, rgeo_rmaj=1.0, minor_radius=None, )
[docs] def set( self, local_geometry: LocalGeometry, local_species: LocalSpecies, numerics: Numerics, local_norm: Normalisation = None, template_file: Optional[PathLike] = None, code_normalisation: Optional[str] = None, **kwargs, ): """ Set self.data using LocalGeometry, LocalSpecies, and Numerics. These may be obtained via another GKInput file, or from Equilibrium/Kinetics objects. """ # If self.data is not already populated, fill in defaults from a given # template file. If this is not provided by the user, fall back to the # default. if self.data is None: if template_file is None: template_file = gk_templates["GKW"] self.read_from_file(template_file) if local_norm is None: local_norm = Normalisation("set") if code_normalisation is None: code_normalisation = self.norm_convention convention = getattr(local_norm, code_normalisation) # Set Miller Geometry bits if isinstance(local_geometry, LocalGeometryMiller): # Miller settings self.data["geom"]["geom_type"] = "miller" elif isinstance(local_geometry, LocalGeometryMXH): # Miller settings self.data["geom"]["geom_type"] = "mxh" else: raise NotImplementedError( f"LocalGeometry type {local_geometry.__class__.__name__} for GKW not supported yet" ) self.data["geom"]["eps"] = local_geometry.rho if local_geometry.local_geometry == "Miller": for pyro_key, (gkw_param, gkw_key) in self.pyro_gkw_miller.items(): self.data[gkw_param][gkw_key] = local_geometry[pyro_key] elif local_geometry.local_geometry == "MXH": for pyro_key, (gkw_param, gkw_key) in self.pyro_gkw_mxh.items(): self.data[gkw_param][gkw_key] = local_geometry[pyro_key] # GENE defines whether clockwise/ pyro defines whether counter-clockwise - need to flip sign self.data["geom"]["signj"] = -1 * local_geometry.ip_ccw self.data["geom"]["signb"] = -1 * local_geometry.bt_ccw # Pyro forces to its own value of beta_prime self.data["spcgeneral"]["betaprime_type"] = "ref" self.data["spcgeneral"]["betaprime_ref"] = local_geometry.beta_prime # Kinetic data n_species = local_species.nspec self.data["gridsize"]["number_of_species"] = n_species stored_species = len(self.data["species"]) extra_species = stored_species - n_species if extra_species > 0: for i in range(extra_species): del self.data["species"][-1] for iSp, name in enumerate(local_species.names): try: single_species = self.data["species"][iSp] except IndexError: if f90nml.__version__ < "1.4": self.data["species"].append(copy.copy(self.data["species"][0])) single_species = self.data["species"][iSp] else: # FIXME f90nml v1.4+ uses 'Cogroups' for Namelist groups sharing # a common key. As of version 1.4.2, Cogroup derives from # 'list', but does not implement all methods, so confusingly it # allows calls to 'append', but then doesn't do anything! # Currently working around this in a horribly inefficient # manner, by deconstructing the entire Namelist to a dict, using # secret cogroup names directly, and rebundling the Namelist. # There must be a better way! d = self.data.todict() copied = copy.deepcopy(d["_grp_species_0"]) copied["name"] = None d[f"_grp_species_{iSp}"] = copied self.data = f90nml.Namelist(d) single_species = self.data["species"][iSp] for key, val in self.pyro_gkw_species.items(): single_species[val] = local_species[name][key] self.data["collisions"]["zeff"] = local_species.zeff # write collision information self.data["collisions"]["freq_override"] = False self.data["collisions"]["freq_input"] = True nu_ee = local_species["electron"].nu e_mass = local_species["electron"].mass te = local_species["electron"].temp ne = local_species["electron"].dens ze = local_species["electron"].z nu_ab_array = np.zeros(local_species.nspec**2) counter = 0 for b in local_species.names: for a in local_species.names: dens = local_species[f"{b}"].dens mass = local_species[f"{a}"].mass temp = local_species[f"{a}"].temp Za = local_species[f"{a}"].z Zb = local_species[f"{b}"].z nu_ab = ( ( (dens / ne) * ((Za / ze) ** 2) * ((Zb / ze) ** 2) / (((temp / te) ** 1.5) * (mass / e_mass) ** 0.5) ) * nu_ee / np.sqrt(temp.m / mass.m) ) nu_ab_array[counter] = nu_ab.m counter += 1 self.data["collisions"]["nu_ab"] = nu_ab_array * nu_ee.units # beta_ref = local_norm.gs2.beta if local_norm else 0.0 beta_ref = 0.0 self.data["spcgeneral"]["beta_ref"] = ( numerics.beta if numerics.beta is not None else beta_ref ) # Set numerics # fields self.data["control"]["nlphi"] = numerics.phi self.data["control"]["nlapar"] = numerics.apar self.data["control"]["nlbpar"] = numerics.bpar # time stepping dtim = numerics.delta_time naverage = self.data["control"]["naverage"] self.data["control"]["dtim"] = dtim self.data["control"]["ntime"] = int( numerics.max_time / (numerics.delta_time * naverage) ) drho_dpsi = local_geometry.q / local_geometry.rho / local_geometry.bunit_over_b0 e_eps_zeta = drho_dpsi / (4 * np.pi) # Ensure odd ntheta to get theta = 0.0 on grid metric_ntheta = (numerics.ntheta // 2) * 2 + 1 metric_terms = MetricTerms(local_geometry, ntheta=metric_ntheta) theta_index = np.argmin(abs(metric_terms.regulartheta)) g_aa = metric_terms.field_aligned_contravariant_metric("alpha", "alpha")[ theta_index ] kthnorm = np.sqrt(g_aa) / (2 * np.pi) kthrho = ( numerics.ky * (1 * convention.bref / local_norm.gs2.bref).to_base_units() / (e_eps_zeta * 2 / kthnorm).m ) # mode box / single mode self.data["control"]["non_linear"] = numerics.nonlinear if numerics.nky == 1 and not numerics.nonlinear: self.data["control"]["non_linear"] = False self.data["mode"]["mode_box"] = False self.data["mode"]["kthrho"] = kthrho self.data["mode"]["chin"] = numerics.theta0 self.data["gridsize"]["nx"] = 1 self.data["gridsize"]["nmod"] = 1 self.data["gridsize"]["nperiod"] = numerics.nperiod self.data["gridsize"]["n_s_grid"] = ( 2 * numerics.nperiod - 1 ) * numerics.ntheta else: self.data["mode"]["mode_box"] = True self.data["mode"]["krhomax"] = kthrho * (numerics.nky - 1) if numerics.kx != 0: self.data["mode"]["ikxspace"] = int( numerics.ky * 2 * np.pi * local_geometry.shat / numerics.kx ) else: self.data["mode"]["ikxspace"] = 0 self.data["gridsize"]["nx"] = numerics.nkx self.data["gridsize"]["nmod"] = numerics.nky self.data["gridsize"]["nperiod"] = 1 self.data["gridsize"]["n_s_grid"] = numerics.ntheta # velocity grid self.data["gridsize"]["n_mu_grid"] = numerics.npitch self.data["gridsize"]["n_vpar_grid"] = numerics.nenergy * 2 # Rotation self.data["rotation"]["vcor"] = local_species.electron.omega0 self.data["rotation"]["shear_rate"] = numerics.gamma_exb if not local_norm: return for name, namelist in self.data.items(): self.data[name] = convert_dict(namelist, convention)
[docs] def get_ne_te_normalisation(self): adiabatic_electrons = True # Get electron temp and density to normalise input for i_sp in range(self.data["gridsize"]["number_of_species"]): if self.data["species"][i_sp]["z"] == -1: ne = self.data["species"][i_sp]["dens"] Te = self.data["species"][i_sp]["temp"] adiabatic_electrons = False if adiabatic_electrons: ne = 0.0 for i_sp in range(self.data["gridsize"]["number_of_species"]): ne += ( self.data["species"][i_sp]["dens"] * self.data["species"][i_sp]["z"] ) Te = self.data["species"][0]["temp"] return ne, Te
[docs] class GKWFile:
[docs] def __init__(self, path: PathLike, required: bool, binary: bool): self.path = Path(path) self.required = required self.binary = binary self.fmt = self.path.name.split(".")[0]
def _fromfile(*args, **kwargs): """Replacement to ``np.fromfile`` that always promotes to 64 bit float. Older versions of NumPy had different rules for type promotion which could lead to unintentional loss of precision. """ return np.asarray(np.fromfile(*args, **kwargs), dtype=float)
[docs] class GKOutputReaderGKW(FileReader, file_type="GKW", reads=GKOutput): fields = ["phi", "apar", "bpar"] moments = ["density", "temperature", "velocity"]
[docs] def read_from_file( self, filename: PathLike, norm: Normalisation, output_convention: str = "pyrokinetics", downsize: int = 1, load_fields=True, load_fluxes=True, load_moments=False, ) -> GKOutput: raw_data, gk_input, input_str = self._get_raw_data(filename) coords = self._get_coords(raw_data, gk_input, downsize) fields = self._get_fields(raw_data, gk_input, coords) if load_fields else None fluxes = self._get_fluxes(raw_data, gk_input, coords) if load_fluxes else None moments = self._get_moments(raw_data, coords) if load_moments else None if load_fields and len(fields[coords["field"][0]].shape) == 3: field_dims = ("theta", "kx", "ky") else: field_dims = ("theta", "kx", "ky", "time") if load_moments and len(moments[coords["moment"][0]].shape) == 4: moment_dims = ("kx", "ky", "theta", "species") else: moment_dims = ("kx", "ky", "theta", "species", "time") field_normalise = gk_input.data["control"].get("normalized", True) normalise_flux_moment = not field_normalise if coords["linear"]: eigenvalues = self._get_eigenvalues(raw_data, coords) if "time" in field_dims: if field_normalise: amplitude = np.exp( eigenvalues["growth_rate"].flatten() * coords["time"] ) else: eigenvalues = None amplitude = 1 for f in fields.keys(): fields[f] *= amplitude else: eigenvalues = None eigenfunctions = None eigenfunction_dims = None # Assign units and return GKOutput convention = norm.gkw norm.default_convention = output_convention.lower() flux_dims = ("field", "species", "time") return GKOutput( coords=Coords( time=coords["time"], kx=coords["kx"], ky=coords["ky"], theta=coords["theta"], pitch=coords["pitch"], energy=coords["energy"], species=coords["species"], field=coords["field"], ).with_units(convention), norm=norm, fields=( Fields(**fields, dims=field_dims).with_units(convention) if fields else None ), fluxes=( Fluxes(**fluxes, dims=flux_dims).with_units(convention) if fluxes else None ), moments=( Moments(**moments, dims=moment_dims).with_units(convention) if moments else None ), eigenvalues=( Eigenvalues(**eigenvalues).with_units(convention) if eigenvalues else None ), eigenfunctions=( None if eigenfunctions is None else Eigenfunctions(eigenfunctions, dims=eigenfunction_dims).with_units( convention ) ), linear=coords["linear"], gk_code="GKW", input_file=input_str, output_convention=output_convention, normalise_flux_moment=normalise_flux_moment, jacobian=coords["jacobian"], )
[docs] def verify_file_type(self, dirname: PathLike): dirname = Path(dirname) for f in self._required_files(dirname).values(): if f.required and not f.path.exists(): raise RuntimeError(f"Missing the file '{f.path}'")
[docs] @staticmethod def infer_path_from_input_file(filename: PathLike) -> Path: """ Given path to input file, guess at the path for associated output files. For GKW, simply returns dir of the path. """ return Path(filename).parent
@staticmethod def _required_files(dirname: PathLike): dirname = Path(dirname) return { "input": GKWFile(dirname / "input.dat", required=True, binary=False), "time": GKWFile(dirname / "time.dat", required=True, binary=False), "parallel": GKWFile(dirname / "parallel.dat", required=True, binary=False), "krho": GKWFile(dirname / "krho", required=True, binary=False), "kxrh": GKWFile(dirname / "kxrh", required=True, binary=False), "geom": GKWFile(dirname / "geom.dat", required=True, binary=False), "file_count": GKWFile(dirname / "file_count", required=True, binary=False), "flux_phi": GKWFile(dirname / "fluxes.dat", required=False, binary=False), "flux_apar": GKWFile( dirname / "fluxes_em.dat", required=False, binary=False ), "flux_bpar": GKWFile( dirname / "fluxes_bpar.dat", required=False, binary=False ), } @staticmethod def _get_gkw_field_files(dirname: PathLike, raw_data: dict): dirname = Path(dirname) field_names = {"phi": "Phi", "apar": "Apa", "bpar": "Bpa"} for pyro_field, gkw_field in field_names.items(): raw_data[f"field_{pyro_field}"] = [ dirname / f for f in sorted(os.listdir(dirname)) if re.search(rf"^{gkw_field}_kykxs\d{{8}}_\w{{4}}", f) ] @staticmethod def _get_gkw_field_spc_files(dirname: PathLike, raw_data: dict): dirname = Path(dirname) field_names = {"phi": "Spc3d", "apar": "Apc3d", "bpar": "Bpc3d"} for pyro_field, gkw_field in field_names.items(): raw_data[f"field_spc_{pyro_field}"] = [ dirname / f for f in sorted(os.listdir(dirname)) if re.search(rf"^{gkw_field}\d{{8}}", f) ] @staticmethod def _get_gkw_moment_files(dirname: PathLike, raw_data: dict): dirname = Path(dirname) moment_names = { "density": "dens", "temperature_par": "Tpar", "temperature_perp": "Tperp", "velocity": "vpar", } for pyro_moment, gkw_moment in moment_names.items(): raw_data[f"moment_{pyro_moment}"] = [ dirname / f for f in sorted(os.listdir(dirname)) if re.search(rf"^{gkw_moment}_kykxs\d{{2}}_\d{{6}}_\w{{4}}", f) ] @classmethod def _get_raw_data(cls, dirname: PathLike) -> Tuple[Dict[str, Any], GKInputGKW, str]: expected_data = cls._required_files(dirname) # Read in files raw_data = {} for key, gkw_file in expected_data.items(): if not gkw_file.path.exists(): if gkw_file.required: raise RuntimeError( f"GKOutputReaderGKW: The file {gkw_file.path.name} is needed" ) continue # Read in file according to format if key == "input": with open(gkw_file.path, "r") as f: raw_data[key] = f.read() elif key == "geom": with open(gkw_file.path, "r") as f: raw_data[key] = f.read().split("\n") else: raw_data[key] = np.loadtxt(gkw_file.path) input_str = raw_data["input"] # Read as GKInputGKW and into plain string gk_input = GKInputGKW() gk_input.read_str(input_str) cls._get_gkw_field_files(dirname, raw_data) cls._get_gkw_field_spc_files(dirname, raw_data) cls._get_gkw_moment_files(dirname, raw_data) # Defer processing field and flux data until their respective functions # Simply return files in place of raw data return raw_data, gk_input, input_str @staticmethod def _get_coords( raw_data: Dict[str, Any], gk_input: GKInputGKW, downsize: int = 1 ) -> Dict[str, Any]: """ Sets coords and attrs of a Pyrokinetics dataset from a collection of GKW files. Args: raw_data (Dict[str,Any]): Dict containing GKW output. gk_input (GKInputGKW): Processed GKW input file. Returns: Dict: Dictionary with coords """ # Process time data time = raw_data["time"][:] if time.ndim == 2: time = time[:, 0] if len(time) % downsize != 0: residual = len(time) % downsize - downsize else: residual = 0 time = time[::downsize] # read geometrical terms to map from gkw.ky to pyro.ky geom = raw_data["geom"] kth_index = geom.index("kthnorm") eez_index = geom.index("E_eps_zeta") e_eps_zeta = float(geom[eez_index + 1].split(" ")[-1]) kthnorm = float(geom[kth_index + 1]) kx = np.array([raw_data["kxrh"]]) * 2.0 * e_eps_zeta / kthnorm ky = np.array([raw_data["krho"]]) * 2.0 * e_eps_zeta / kthnorm if kx.ndim == 3: kx = kx[0, 0, :] if ky.ndim == 3: ky = ky[0, :, 0] fields = ["phi", "apar", "bpar"] fields_defaults = [True, False, False] fields = [ f for f, d in zip(fields, fields_defaults) if gk_input.data["control"].get(f"nl{f}", d) ] fluxes = ["particle", "heat", "momentum"] moments = ["density", "temperature", "velocity"] species = gk_input.get_local_species().names # Eigenfunctions repeated for each species s_index = geom.index("s_grid") bn_index = geom.index("bn") gkw_s = [] for i in range(bn_index - s_index - 1): gkw_s.extend( [ float(sval) for sval in geom[s_index + i + 1].strip().split(" ") if sval ] ) local_geometry = gk_input.get_local_geometry() ntheta = gk_input.get_numerics().ntheta metric_terms = MetricTerms(local_geometry, ntheta=ntheta * 4) nperiod = gk_input.get_numerics().nperiod metric_theta = metric_terms.regulartheta jacobian = metric_terms.Jacobian.m metric_s = cumulative_trapezoid( jacobian, metric_theta, initial=0.0 ) / trapezoid(jacobian, metric_theta) # The total number of poloidal turns is 2*nperiod-1 m = np.linspace(-(nperiod - 1), nperiod - 1, 2 * nperiod - 1) # Exclude last point to avoid duplicates ntheta = len(metric_theta) - 1 metric_theta = np.tile(metric_theta[:-1], 2 * nperiod - 1) metric_s = np.tile(metric_s[:-1], 2 * nperiod - 1) m = np.repeat(m, ntheta) metric_theta = metric_theta + 2.0 * np.pi * m metric_s = metric_s + m - 0.5 theta = np.interp(gkw_s, metric_s, metric_theta) n_theta = len(theta) n_energy = gk_input.data["gridsize"]["n_vpar_grid"] // 2 energy = np.linspace(0, n_energy - 1, n_energy) n_pitch = gk_input.data["gridsize"]["n_mu_grid"] pitch = np.linspace(0, n_pitch - 1, n_pitch) file_count = raw_data["file_count"] if len(raw_data[f"field_{fields[0]}"]) != 0: test_binary = _fromfile(raw_data[f"field_{fields[0]}"][0], dtype="float32") elif len(raw_data[f"field_spc_{fields[0]}"]) != 0: test_binary = "not_binary" else: raise ValueError("Cannot find any field files of GKW output") if len(test_binary) == n_theta: binary_dtype = "float32" elif len(test_binary) == 2 * n_theta: binary_dtype = "float64" elif test_binary == "not_binary": binary_dtype = "spc" else: raise ValueError("Cannot determine dtype of binary GKW output") theta_mod = np.mod(theta, 2 * np.pi) Jacobian = np.interp( theta_mod, metric_terms.regulartheta, metric_terms.Jacobian, period=2 * np.pi, ) # Store grid data as xarray DataSet return { "time": time, "kx": kx, "ky": ky, "theta": theta, "energy": energy, "pitch": pitch, "field": fields, "moment": moments, "flux": fluxes, "species": species, "linear": gk_input.is_linear(), "downsize": downsize, "residual": residual, "file_count": file_count, "binary_dtype": binary_dtype, "jacobian": Jacobian, } @staticmethod def _get_fields( raw_data: Dict[str, Any], gk_input: Dict[str, Any], coords: Dict[str, Any], ) -> Dict[str, np.ndarray]: """ Sets 3D fields over time. The field coordinates should be (field, theta, kx, ky, time) """ nkx = len(coords["kx"]) nky = len(coords["ky"]) ntheta = len(coords["theta"]) ntime = len(coords["time"]) nfield = len(coords["field"]) downsize = coords["downsize"] residual = coords["residual"] binary_dtype = coords["binary_dtype"] full_ntime = ntime * downsize + residual field_names = ["phi", "apar", "bpar"][:nfield] if binary_dtype == "spc": raise NotImplementedError( "Pyrokinetics does not yet support the reading of Spc/Poten GKW output files" ) if len(raw_data[f"field_{field_names[0]}"]) == 0: raise FileNotFoundError("No field files found for GKW Output.") elif len(raw_data[f"field_{field_names[0]}"]) != 2 * ntime: full_ntime = 1 signj = gk_input.data["geom"].get("signj", 1.0) results = {} # Loop through all fields and add field for ifield, field_name in enumerate(field_names): fields = np.empty((ntheta, nkx, nky, full_ntime), dtype=complex) raw_fields = np.empty((ntheta * nkx * nky, full_ntime), dtype=complex) for i_time in range(full_ntime): if full_ntime == 1: i_time = -1 imag_index = 2 * i_time real_index = imag_index + 1 raw_fields[:, i_time] = _fromfile( raw_data[f"field_{field_name}"][real_index], dtype=binary_dtype ) - 1j * signj * _fromfile( raw_data[f"field_{field_name}"][imag_index], dtype=binary_dtype ) fields = np.reshape(raw_fields, fields.shape) fields = fields[:, :, :, ::downsize] if full_ntime == 1: fields = np.squeeze(fields, axis=-1) # Move theta to 0 axis results[field_name] = fields return results @staticmethod def _get_moments( raw_data: Dict[str, Any], coords: Dict[str, Any], ) -> Dict[str, np.ndarray]: """ Sets 3D moments over time. The moment coordinates should be (moment, theta, kx, species, ky, time) """ nkx = len(coords["kx"]) nky = len(coords["ky"]) ntheta = len(coords["theta"]) ntime = len(coords["time"]) nspecies = len(coords["species"]) nmoment = len(coords["moment"]) downsize = coords["downsize"] residual = coords["residual"] binary_dtype = coords["binary_dtype"] full_ntime = ntime * downsize + residual moment_names = ["density", "temperature_par", "temperature_perp", "velocity"][ : nmoment + 1 ] if len(raw_data[f"moment_{moment_names[0]}"]) == 0: raise FileNotFoundError("No moment files found for GKW Output") if len(raw_data[f"moment_{moment_names[0]}"]) != 2 * ntime * nspecies: full_ntime = 1 results = {} # Loop through all moments and add moment for imoment, moment_name in enumerate(moment_names): moments = np.empty((nkx, nky, ntheta, nspecies, full_ntime), dtype=complex) raw_moments = np.empty( (nkx * nky * ntheta, nspecies, full_ntime), dtype=complex ) for i_time in range(full_ntime): if full_ntime == 1: i_time = -1 for i_spec in range(nspecies): i_time_species = i_time * (2 * nspecies) + i_spec * 2 imag_index = i_time_species real_index = i_time_species + 1 raw_moments[:, i_spec, i_time] = _fromfile( raw_data[f"moment_{moment_name}"][real_index], dtype=binary_dtype, ) + 1j * _fromfile( raw_data[f"moment_{moment_name}"][imag_index], dtype=binary_dtype, ) moments = np.reshape(raw_moments, moments.shape) moments = moments[:, :, :, :, ::downsize] if full_ntime == 1: moments = np.squeeze(moments, axis=-1) results[moment_name] = moments results["temperature"] = np.sqrt( results["temperature_par"] ** 2 + results["temperature_perp"] ** 2 ) del results["temperature_par"] del results["temperature_perp"] return results @staticmethod def _get_fluxes( raw_data: Dict[str, Any], gk_input: Dict, coords: Dict, ) -> Dict[str, np.ndarray]: """ Set flux data over time. The flux coordinates should be (species, moment, field, ky, time) """ results = {} ntime = len(coords["time"]) nspecies = len(coords["species"]) nflux = len(coords["flux"]) downsize = coords["downsize"] residual = coords["residual"] fields = coords["field"] nfield = len(fields) ntime = ntime * downsize + residual fluxes = np.empty((nfield, ntime, nspecies, nflux)) for ifield, field in enumerate(fields): flux_key = f"flux_{field}" if flux_key in raw_data: raw_fluxes = raw_data[flux_key] raw_fluxes = np.reshape(raw_fluxes, (ntime, nspecies, nflux)) fluxes[ifield, ...] = raw_fluxes flux_norm = 1.0 for iflux, flux in enumerate(coords["flux"]): results[flux] = fluxes[:, ::downsize, :, iflux].swapaxes(1, 2) / flux_norm return results @classmethod def _get_eigenvalues( self, raw_data: Dict[str, Any], coords: Dict ) -> Dict[str, np.ndarray]: """ Takes an xarray Dataset that has had coordinates and fields set. Uses this to add eigenvalues: data['eigenvalues'] = eigenvalues(kx, ky, time) data['mode_frequency'] = mode_frequency(kx, ky, time) data['growth_rate'] = growth_rate(kx, ky, time) This should be called after _set_fields, and is only valid for linear runs. Unlike the version in the super() class, GKW may need to get extra info from an eigenvalue file. Args: data (xr.Dataset): The dataset to be modified. dirname (PathLike): Directory containing GKW output files. Returns: Dict: The modified dataset which was passed to 'data'. """ ntime = len(coords["time"]) nky = len(coords["ky"]) nkx = len(coords["kx"]) shape = (nkx, nky, ntime) growth_rate = raw_data["time"][:, 1].reshape(shape) mode_frequency = raw_data["time"][:, 2].reshape(shape) result = { "growth_rate": growth_rate, "mode_frequency": mode_frequency, } return result @staticmethod def _get_eigenfunctions(raw_data: Dict[str, Any], coords: Dict) -> np.ndarray: """ Loads eigenfunctions into data with the following coordinates: data['eigenfunctions'] = eigenfunctions(kx, ky, field, theta) This should be called after _set_fields, and is only valid for linear runs. """ ntheta = len(coords["theta"]) nkx = len(coords["kx"]) nky = len(coords["ky"]) indexes = {"phi": [1, 2], "apar": [3, 4], "bpar": [13, 14]} coord_names = ["field", "theta", "kx", "ky"] eigenfunctions = np.empty( [len(coords[coord_name]) for coord_name in coord_names], dtype=complex ) parallel_data = raw_data["parallel"] for ifield, field in enumerate(coords["field"]): real_index = indexes[field][0] imag_index = indexes[field][1] eigenfunctions_data = ( parallel_data[:ntheta, real_index] + 1j * parallel_data[:ntheta, imag_index] ) eigenfunctions[ifield, ...] = np.reshape( eigenfunctions_data, (ntheta, nkx, nky) ) result = eigenfunctions return result