Source code for pyrokinetics.gk_code.gene

import copy
import csv
import logging
import re
import struct
import warnings
from pathlib import Path
from typing import Any, Dict, Optional, Tuple

import f90nml
import h5py
import numpy as np
from cleverdict import CleverDict

from ..constants import deuterium_mass, electron_mass, pi
from ..file_utils import FileReader
from ..local_geometry import (
    LocalGeometry,
    LocalGeometryFourierGENE,
    LocalGeometryMiller,
    LocalGeometryMillerTurnbull,
    LocalGeometryMXH,
    MetricTerms,
    default_fourier_gene_inputs,
    default_miller_inputs,
    default_miller_turnbull_inputs,
    default_mxh_inputs,
)
from ..local_species import LocalSpecies
from ..normalisation import SimulationNormalisation as Normalisation
from ..normalisation import convert_dict, ureg
from ..numerics import Numerics
from ..templates import gk_templates
from ..typing import PathLike
from ..units import PyroContextError
from .gk_input import GKInput
from .gk_output import Coords, Eigenvalues, Fields, Fluxes, GKOutput, Moments


[docs] class GKInputGENE(GKInput, FileReader, file_type="GENE", reads=GKInput): """ Class that can read GENE input files, and produce Numerics, LocalSpecies, and LocalGeometry objects """ code_name = "GENE" default_file_name = "input.gene" norm_convention = "gene" _convention_dict = {} _drhotor_dr = 1.0 pyro_gene_miller = { "q": ["geometry", "q0"], "kappa": ["geometry", "kappa"], "s_kappa": ["geometry", "s_kappa"], "delta": ["geometry", "delta"], "s_delta": ["geometry", "s_delta"], "shat": ["geometry", "shat"], "shift": ["geometry", "drr"], "dZ0dr": ["geometry", "drz"], "ip_ccw": ["geometry", "sign_ip_cw"], "bt_ccw": ["geometry", "sign_bt_cw"], } pyro_gene_miller_default = { "q": None, "kappa": 1.0, "s_kappa": 0.0, "delta": 0.0, "s_delta": 0.0, "shat": 0.0, "shift": 0.0, "dZ0dr": 0.0, "ip_ccw": -1, "bt_ccw": -1, } pyro_gene_miller_turnbull = { "q": ["geometry", "q0"], "kappa": ["geometry", "kappa"], "s_kappa": ["geometry", "s_kappa"], "delta": ["geometry", "delta"], "s_delta": ["geometry", "s_delta"], "zeta": ["geometry", "zeta"], "s_zeta": ["geometry", "s_zeta"], "shat": ["geometry", "shat"], "shift": ["geometry", "drr"], "dZ0dr": ["geometry", "drz"], "ip_ccw": ["geometry", "sign_ip_cw"], "bt_ccw": ["geometry", "sign_bt_cw"], } pyro_gene_miller_turnbull_default = { "q": None, "kappa": 1.0, "s_kappa": 0.0, "delta": 0.0, "s_delta": 0.0, "zeta": 0.0, "s_zeta": 0.0, "shat": 0.0, "shift": 0.0, "dZ0dr": 0.0, "ip_ccw": -1, "bt_ccw": -1, } pyro_gene_mxh = { "q": ["geometry", "q0"], "kappa": ["geometry", "kappa"], "s_kappa": ["geometry", "s_kappa"], "shat": ["geometry", "shat"], "shift": ["geometry", "drr"], "dZ0dr": ["geometry", "drz"], "cn": ["geometry", "cN_m"], "sn": ["geometry", "sN_m"], "dcndr": ["geometry", "cNdr_m"], "dsndr": ["geometry", "sNdr_m"], "ip_ccw": ["geometry", "sign_ip_cw"], "bt_ccw": ["geometry", "sign_bt_cw"], } pyro_gene_mxh_default = { "q": None, "kappa": 1.0, "s_kappa": 0.0, "shat": 0.0, "shift": 0.0, "dZ0dr": 0.0, "cn": [0.0, 0.0, 0.0, 0.0], "sn": [0.0, 0.0, 0.0, 0.0], "dcndr": [0.0, 0.0, 0.0, 0.0], "dsndr": [0.0, 0.0, 0.0, 0.0], "ip_ccw": -1, "bt_ccw": -1, } pyro_gene_circular = { "q": ["geometry", "q0"], "shat": ["geometry", "shat"], "ip_ccw": ["geometry", "sign_ip_cw"], "bt_ccw": ["geometry", "sign_bt_cw"], } pyro_gene_circular_default = { "q": None, "shat": 0.0, "ip_ccw": -1, "bt_ccw": -1, } pyro_gene_fourier = { "q": ["geometry", "q0"], "shat": ["geometry", "shat"], "cN": ["geometry", "cn_m"], "sN": ["geometry", "sn_m"], "dcNdr": ["geometry", "cndr_m"], "dsNdr": ["geometry", "sndr_m"], "ip_ccw": ["geometry", "sign_ip_cw"], "bt_ccw": ["geometry", "sign_ip_cw"], } pyro_gene_fourier_default = { "q": None, "shat": 0.0, "cN": [0.5, *[0.0] * 31], "sN": [0.5, *[0.0] * 31], "dcNdr": [0.5, *[0.0] * 31], "dsNdr": [0.5, *[0.0] * 31], "ip_ccw": -1, "bt_ccw": -1, } pyro_gene_species = { "mass": "mass", "z": "charge", "dens": "dens", "temp": "temp", "inverse_lt": "omt", "inverse_ln": "omn", }
[docs] def read_from_file( self, filename: PathLike, detect_norm: bool = True ) -> Dict[str, Any]: """ Reads GENE input file into a dictionary Uses default read, which assumes input is a Fortran90 namelist """ # TODO Hacky fix in erroneous brackets from GENE v3.0 self.original_filename = filename read_str = False with open(filename, "r") as f: filedata = f.readlines() for i, line in enumerate(filedata): if "FCVERSION" in line and "(" in line: read_str = True filedata[i] = line.replace("(", "") if ")" in line: filedata[i] = filedata[i].replace(")", "") filedata = "".join(filedata) if read_str: return self.read_str(filedata) else: return super().read_from_file(filename, detect_norm=detect_norm)
[docs] def read_str(self, input_string: str, detect_norm: bool = True) -> Dict[str, Any]: """ Reads GENE input file given as string Uses default read_str, which assumes input is a Fortran90 namelist """ return super().read_str(input_string, detect_norm=detect_norm)
[docs] def read_dict(self, input_dict: dict, detect_norm: bool = True) -> Dict[str, Any]: """ Reads GENE input file given as dict Uses default read_dict, which assumes input is a dict """ return super().read_dict(input_dict, detect_norm=detect_norm)
[docs] def verify_file_type(self, filename: PathLike): """ Ensure this file is a valid gene input file, and that it contains sufficient info for Pyrokinetics to work with """ expected_keys = ["general", "geometry", "box"] self.verify_expected_keys(filename, expected_keys)
[docs] def write( self, filename: PathLike, float_format: str = "", local_norm: Normalisation = None, code_normalisation: str = None, ): """ Write self.data to a gyrokinetics input file. Uses default write, which writes to a Fortan90 namelist """ if local_norm is None: local_norm = Normalisation("write") aspect_ratio = ( self.data["geometry"]["major_r"] / self.data["geometry"]["minor_r"] ) local_norm.set_ref_ratios(aspect_ratio=aspect_ratio) if code_normalisation is None: code_normalisation = self.code_name.lower() convention = getattr(local_norm, code_normalisation) for name, namelist in self.data.items(): self.data[name] = convert_dict(namelist, convention) super().write(filename, float_format=float_format)
[docs] def is_nonlinear(self) -> bool: return bool(self.data["general"].get("nonlinear", False))
[docs] def add_flags(self, flags) -> None: """ Add extra flags to GENE input file Uses default, which assumes a Fortan90 namelist """ super().add_flags(flags)
[docs] def get_local_geometry(self) -> LocalGeometry: """ Returns local geometry. Delegates to more specific functions """ geometry_type = self.data["geometry"]["magn_geometry"] if geometry_type == "miller": if ( self.data["geometry"].get("zeta", 0.0) != 0.0 or self.data["geometry"].get("zeta", 0.0) != 0.0 ): default_inputs = default_miller_turnbull_inputs() pyro_gene_local_geometry = self.pyro_gene_miller_turnbull pyro_gene_local_geometry_default = ( self.pyro_gene_miller_turnbull_default ) local_geometry_class = LocalGeometryMillerTurnbull else: default_inputs = default_miller_inputs() pyro_gene_local_geometry = self.pyro_gene_miller pyro_gene_local_geometry_default = self.pyro_gene_miller_default local_geometry_class = LocalGeometryMiller elif geometry_type == "miller_mxh": default_inputs = default_mxh_inputs() pyro_gene_local_geometry = self.pyro_gene_mxh pyro_gene_local_geometry_default = self.pyro_gene_mxh_default local_geometry_class = LocalGeometryMXH elif geometry_type == "miller_general": default_inputs = default_fourier_gene_inputs() pyro_gene_local_geometry = self.pyro_gene_fourier pyro_gene_local_geometry_default = self.pyro_gene_fourier_default local_geometry_class = LocalGeometryFourierGENE elif geometry_type in ["circular", "s_alpha", "slab"]: default_inputs = default_miller_inputs() pyro_gene_local_geometry = self.pyro_gene_circular pyro_gene_local_geometry_default = self.pyro_gene_circular_default local_geometry_class = LocalGeometryMiller elif geometry_type in ["tracer_efit", "gene"]: default_inputs = default_fourier_gene_inputs() pyro_gene_local_geometry = self.pyro_gene_fourier pyro_gene_local_geometry_default = self.pyro_gene_fourier_default local_geometry_class = LocalGeometryFourierGENE geometry_dict = self.get_gene_geometry() else: raise NotImplementedError( f"LocalGeometry type {geometry_type} not implemented for GENE" ) local_geometry_data = default_inputs for (pyro_key, (gene_param, gene_key)), gene_default in zip( pyro_gene_local_geometry.items(), pyro_gene_local_geometry_default.values() ): local_geometry_data[pyro_key] = self.data[gene_param].get( gene_key, gene_default ) major_R = self.data["geometry"].get("major_r", 1.0) major_Z = self.data["geometry"].get("major_z", 0.0) local_geometry_data["Rmaj"] = major_R local_geometry_data["Z0"] = major_Z trpeps = self.get_trpeps() # Handle the fact that GENE defaults to 'minor_r = 1.0' in # 'parameters.dat' if 'minor_r' is not specified in 'parameters' minor_r = self.data["geometry"].get("minor_r", 0.0) if not (minor_r == 0.0 or np.isclose(minor_r, major_R)): local_geometry_data["aspect_ratio"] = major_R / minor_r local_geometry_data["rho"] = trpeps * local_geometry_data["Rmaj"] # Need to add in factor of rho if geometry_type == "miller_mxh": for key in ["dcndr", "dsndr"]: if local_geometry_data[key] == 0.0: local_geometry_data[key] = [0.0] * len(local_geometry_data["cn"]) else: local_geometry_data[key] = [ float(i) / local_geometry_data["rho"] for i in local_geometry_data[key] ] local_geometry_data["n_moments"] = len(local_geometry_data["cn"]) # Different defn of s_delta/s_zeta delta = self.data["geometry"].get( "delta", np.sin(local_geometry_data["sn"][1]) ) zeta = self.data["geometry"].get("zeta", -local_geometry_data["sn"][2]) s_delta = ( self.data["geometry"].get("s_delta", local_geometry_data["dsndr"][1]) / local_geometry_data["rho"] ) s_zeta = ( self.data["geometry"].get("s_zeta", -local_geometry_data["dsndr"][2]) / local_geometry_data["rho"] ) local_geometry_data["sn"][1] = np.arcsin(delta) local_geometry_data["sn"][2] = -zeta local_geometry_data["dsndr"][1] = s_delta local_geometry_data["dsndr"][2] = -s_zeta for key, value in local_geometry_data.items(): if isinstance(value, list): local_geometry_data[key] = np.array(value, dtype=float) # GENE defines whether clockwise - need to flip sign local_geometry_data["ip_ccw"] *= -1 local_geometry_data["bt_ccw"] *= -1 # Assume ne * Tref*8pi*1e-7 = 1.0 ( ne, Te, ) = self.get_ne_te_normalisation() beta = self.data["general"].get("beta", 0.0) / ne dpdx = self.data["geometry"].get("dpdx_pm", -2) amhd = self.data["geometry"].get("amhd", 0.0) local_species = self.get_local_species() if amhd != -1: amhd_beta_prime = -amhd / ( local_geometry_data["q"] ** 2 * local_geometry_data["Rmaj"] ) else: amhd_beta_prime = ( -local_species.inverse_lp.m * local_species.pressure.m * beta ) if dpdx == -1: if local_geometry_data["B0"] is not None: local_geometry_data["beta_prime"] = ( -local_species.inverse_lp.m * local_species.pressure.m * beta ) else: local_geometry_data["beta_prime"] = 0.0 elif dpdx == -2: local_geometry_data["beta_prime"] = amhd_beta_prime else: local_geometry_data["beta_prime"] = -dpdx if not np.isclose(local_geometry_data["beta_prime"], amhd_beta_prime): warnings.warn( f"GENE dpdx_pm = {local_geometry_data['beta_prime']} not set consistently with amhd = " f"{amhd_beta_prime} - drifts may not behave as expected" ) if geometry_type in ["tracer_efit", "gene"]: for key, value in geometry_dict.items(): local_geometry_data[key] = value local_geometry = local_geometry_class.from_gk_data(local_geometry_data) if geometry_type == "miller_mxh": local_geometry.dthetaR_dr = local_geometry.get_dthetaR_dr( local_geometry.theta, local_geometry.dcndr, local_geometry.dsndr ) # Need to get convention after? if hasattr(self, "convention"): convention = self.convention else: norms = Normalisation("get_local_geometry") convention = getattr(norms, self.norm_convention) _, _, rgeo_rmaj = self._get_rgeo_rmaj() local_geometry.B0 = rgeo_rmaj local_geometry.dpsidr = local_geometry.dpsidr * local_geometry.B0 local_geometry.normalise(norms=convention) if geometry_type in ["tracer_efit", "gene"]: lref = local_geometry.Rmaj.units bref = local_geometry.B0.units local_geometry.R_eq = geometry_dict["R_eq"] * lref local_geometry.Z_eq = geometry_dict["Z_eq"] * lref local_geometry.Rmaj = geometry_dict["Rmaj"] * lref local_geometry.b_poloidal_eq = geometry_dict["b_poloidal_eq"] * bref local_geometry.Z0 = geometry_dict["Z0"] * lref local_geometry.rho = geometry_dict["rho"] * lref local_geometry.dpsidr = geometry_dict["dpsidr"] * bref * lref local_geometry.beta_prime = geometry_dict["beta_prime"] * bref**2 / lref self._drhotor_dr = geometry_dict["drhotor_dr"] local_geometry._set_shape_coefficients( local_geometry.R_eq, local_geometry.Z_eq, local_geometry.b_poloidal_eq, ) raxis_rmaj, _, rgeo_rmaj = self._get_rgeo_rmaj() # Need to modify shape coefficients to match dpsidr bunit_over_b0 = local_geometry.get_bunit_over_b0() dpsidr = ( bunit_over_b0 / local_geometry.q * local_geometry.rho ).m * rgeo_rmaj # Rescale to account for a/Lref and B0/Bref ratio_dpsidr = geometry_dict["dpsidr"] / dpsidr local_geometry.dcNdr *= ratio_dpsidr local_geometry.dsNdr *= ratio_dpsidr local_geometry.b_poloidal_eq *= 1 / ratio_dpsidr local_geometry.bunit_over_b0 = local_geometry.get_bunit_over_b0() local_geometry.dpsidr = ( (local_geometry.bunit_over_b0 / local_geometry.q * local_geometry.rho) * bref * rgeo_rmaj ) local_geometry.R, local_geometry.Z = local_geometry.get_flux_surface( local_geometry.theta ) local_geometry.b_poloidal = local_geometry.get_b_poloidal( theta=local_geometry.theta, ) ( local_geometry.dRdtheta, local_geometry.dRdr, local_geometry.dZdtheta, local_geometry.dZdr, ) = local_geometry.get_RZ_derivatives(local_geometry.theta) local_geometry.Fpsi = local_geometry.get_f_psi() try: local_geometry.FF_prime = local_geometry.get_f_prime() * local_geometry.Fpsi except ValueError: local_geometry.FF_prime = None return local_geometry
[docs] def get_trpeps(self) -> float: """ Returns ------- trpeps: trpeps from the input file """ geometry_type = self.data["geometry"]["magn_geometry"] if hasattr(self, "original_filename"): original_filename = Path(self.original_filename) prefix = original_filename.parent / geometry_type if original_filename.suffix: suffix = original_filename.suffix else: filename_split = original_filename.name.split("_") if len(filename_split) > 1: suffix = f"_{filename_split[-1]}" else: suffix = "" geometry_filename = Path(f"{str(prefix)}{suffix}") if geometry_filename.exists(): direct_geometry_nml = f90nml.read(geometry_filename) trpeps = direct_geometry_nml["parameters"]["trpeps"] if trpeps == 0.0: trpeps = ( np.sqrt(direct_geometry_nml["parameters"]["s0"]) / direct_geometry_nml["parameters"]["major_R"] ) else: trpeps = self.data["geometry"].get("trpeps", 0.0) else: trpeps = self.data["geometry"].get("trpeps", 0.0) return trpeps
[docs] def get_gene_geometry(self): """ Returns ------- trpeps: trpeps from the input file """ if hasattr(self, "_gene_geometry_dict"): return self._gene_geometry_dict geometry_type = self.data["geometry"]["magn_geometry"] geo_dict = {} if hasattr(self, "original_filename"): original_filename = Path(self.original_filename) prefix = original_filename.parent / geometry_type if original_filename.suffix: suffix = original_filename.suffix else: filename_split = original_filename.name.split("_") if len(filename_split) > 1: suffix = f"_{filename_split[-1]}" else: suffix = "" geometry_filename = Path(f"{str(prefix)}{suffix}") if geometry_filename.exists(): geometry_nml = f90nml.read(geometry_filename) # GENE Lref is not magnetic axis so we need to shift it to that geo_dict["Lref"] = ( geometry_nml["parameters"]["lref"] * geometry_nml["parameters"]["major_r"] ) if geo_dict["Lref"] == 0.0: geo_dict["Lref"] = 1.0 skiprows = 19 if "edge_opt" in geometry_nml["parameters"].keys(): skiprows += 1 geometry_data = np.loadtxt(geometry_filename, skiprows=skiprows) Z0 = self.data["geometry"].get("major_z", False) if Z0: Z0_index = np.argmin( abs(geometry_data[:, 13] / geo_dict["Lref"] - Z0) ) elif geometry_type in ["gene", "tracer_efit"]: Z0_index = np.argmin(abs(geometry_data[:, 12])) Z0 = geometry_data[-Z0_index, 13] / geo_dict["Lref"] else: Z0 = 0.0 Z0_index = np.argmin(abs(geometry_data[:, 13] - Z0)) geometry_data = np.roll(geometry_data, -Z0_index, axis=0) R = geometry_data[:, 11] / geo_dict["Lref"] Z = geometry_data[:, 13] / geo_dict["Lref"] rho = (max(R) - min(R)) / 2 R_major = (max(R) + min(R)) / 2 Zmid = (max(Z) + min(Z)) / 2 # Handle edge_opt != 0.0 edge_opt = self.data["geometry"].get("edge_opt", 0.0) if edge_opt != 0.0: # Initial GENE zprime is from -pi to pi zprime = np.linspace(-np.pi, np.pi, len(R), endpoint=False) cap_N = np.arcsinh(edge_opt * np.pi) / np.pi dz_dzprime = cap_N * np.cosh(cap_N * zprime) / edge_opt # Need to roll to match data dz_dzprime = np.roll(dz_dzprime, -Z0_index, axis=0) else: dz_dzprime = 1.0 reverse = False if R[0] < R_major: if Z[1] > Z[0]: roll_sign = -1 else: reverse = True roll_sign = 1 else: if Z[1] > Z[0]: roll_sign = 1 else: reverse = True roll_sign = -1 R_diff = R - R_major Z_diff = Z - Zmid aN = np.sqrt((R_diff) ** 2 + (Z_diff) ** 2) dot_product = R_diff * np.roll(R_diff, roll_sign) + Z_diff * np.roll( Z_diff, roll_sign ) magnitude = np.sqrt(R_diff**2 + Z_diff**2) arc_angle = dot_product / (magnitude * np.roll(magnitude, roll_sign)) theta0 = np.arcsin(Z_diff[0] / aN[0]) theta_diff = np.arccos(arc_angle) if Z[1] > Z[0]: theta = np.cumsum(theta_diff) - theta_diff[0] else: theta = -np.cumsum(theta_diff) - theta_diff[0] theta += theta0 gxx = geometry_data[:, 0] gxy = geometry_data[:, 1] gxz = geometry_data[:, 2] gyy = geometry_data[:, 3] gyz = geometry_data[:, 4] gzz = geometry_data[:, 5] bmod = geometry_data[:, 6] gxz *= dz_dzprime gyz *= dz_dzprime gzz *= dz_dzprime**2 dxdR = geometry_data[:, 14] Cy = geometry_nml["parameters"]["cy"] Cxy = geometry_nml["parameters"]["cxy"] q = geometry_nml["parameters"]["q0"] ip_ccw = -geometry_nml["parameters"]["sign_Ip_CW"] bt_ccw = -geometry_nml["parameters"]["sign_Bt_CW"] if ip_ccw == 0: ip_ccw = 1 if bt_ccw == 0: bt_ccw = 1 gxy *= ip_ccw * bt_ccw gyz *= ip_ccw * bt_ccw maxR_index = np.argmax(R) minR_index = np.argmin(R) drhotor_dr = ( 2.0 / ( dxdR[maxR_index] / gxx[maxR_index] - dxdR[minR_index] / gxx[minR_index] ) * geometry_nml["parameters"]["major_r"] ) gxx *= drhotor_dr**-2 gxy *= drhotor_dr**-1 gxz *= drhotor_dr**-1 Cxy *= drhotor_dr # x0 = rho_tor Cx_prime = 1.0 dpsidr = ( Cxy * Cy * Cx_prime * drhotor_dr / geometry_nml["parameters"]["major_r"] ** 2 ) b_pol = np.sqrt( Cxy**2 * ( ((Cy * q) ** 2 * (gxx * gzz - gxz**2)) + 2 * Cy * q * (gxy * gxz - gxx * gyz) + (gxx * gyy - gxy**2) ) ) # f in Bref_Bgeo f = np.sqrt(bmod**2 - (b_pol) ** 2) * R f_dev = np.std(f) if f_dev > 0.1: raise ValueError( "Error in determination of F in get_gene_geometry. std too large" ) rgeo_rmaj = np.mean(f) / R_major # Need to convert shat_GENE to shat_pyro shat_gene = geometry_nml["parameters"]["shat"] x0 = np.sqrt(geometry_nml["parameters"]["s0"]) shat = shat_gene * rho / x0 * drhotor_dr beta_prime = -geometry_nml["parameters"]["my_dpdx"] * drhotor_dr if reverse: R = R[::-1] Z = Z[::-1] theta = theta[::-1] b_pol = b_pol[::-1] geo_dict["Rmaj"] = R_major geo_dict["Z0"] = Z0 geo_dict["rho"] = rho geo_dict["theta_eq"] = theta geo_dict["R_eq"] = R geo_dict["Z_eq"] = Z geo_dict["dpsidr"] = dpsidr geo_dict["shat"] = shat geo_dict["q"] = q geo_dict["beta_prime"] = beta_prime geo_dict["b_poloidal_eq"] = b_pol geo_dict["drhotor_dr"] = drhotor_dr geo_dict["rgeo_rmaj"] = rgeo_rmaj self._gene_geometry_dict = geo_dict return self._gene_geometry_dict else: raise FileNotFoundError( f"Can't find geometry file: {geometry_filename} in get_gene_geometry" ) else: return {}
[docs] def get_local_species(self): """ Load LocalSpecies object from GENE file """ local_species = LocalSpecies() ion_count = 0 if hasattr(self, "convention"): convention = self.convention else: norms = Normalisation("get_local_species") if self._convention_dict: code_normalisation = self.norm_convention else: code_normalisation = "pyrokinetics" convention = getattr(norms, code_normalisation) gene_nu_ei = self.data["general"].get("coll", 0.0) external_contr = self.data.get( "external_contr", {"exbrate": 0.0, "omega0_tor": 0.0, "pfsrate": 0.0} ) if external_contr.get("pfsrate", 0.0) == -1111: external_contr["pfsrate"] = external_contr["exbrate"] trpeps = self.get_trpeps() rho = trpeps * self.data["geometry"].get("major_r", 1.0) if rho == 0.0: domega_drho = 0.0 else: domega_drho = ( -self.data["geometry"]["q0"] / rho * external_contr.get("pfsrate", 0.0) ) names = self.get_local_species_names() # Load each species into a dictionary for i_sp in range(self.data["box"]["n_spec"]): species_data = CleverDict() try: gene_data = self.data["species"][i_sp] except TypeError: # Case when only 1 species gene_data = self.data["species"] for pyro_key, gene_key in self.pyro_gene_species.items(): species_data[pyro_key] = gene_data[gene_key] # Always force to major_r norm and then re-normalise to pyro after if self.data["geometry"].get("magn_geometry") in ["tracer_efit", "gene"]: lref_scale = self.data["geometry"]["major_r"] else: lref_scale = 1.0 species_data["inverse_lt"] = ( gene_data["omt"] * self._drhotor_dr * lref_scale ) species_data["inverse_ln"] = ( gene_data["omn"] * self._drhotor_dr * lref_scale ) species_data["omega0"] = external_contr.get("omega0_tor", 0.0) species_data["domega_drho"] = domega_drho * self._drhotor_dr * lref_scale if species_data.z == -1: species_data.nu = ( (gene_nu_ei * 4 * (deuterium_mass / electron_mass) ** 0.5) * convention.vref / convention.lref ) else: ion_count += 1 # Set nu to zero initially, will be updated later if electrons present species_data.nu = 0.0 * convention.vref / convention.lref species_data.name = names[i_sp] # normalisations species_data.dens *= convention.nref species_data.mass *= convention.mref species_data.temp *= convention.tref species_data.z *= convention.qref species_data.inverse_lt *= convention.lref**-1 species_data.inverse_ln *= convention.lref**-1 species_data.omega0 *= convention.vref / convention.lref species_data.domega_drho *= convention.vref / convention.lref**2 # Add individual species data to dictionary of species local_species.add_species(name=names[i_sp], species_data=species_data) # Calculate collision frequencies for ions (skip if adiabatic electrons) if hasattr(local_species, "electron"): nu_ee = local_species.electron.nu te = local_species.electron.temp ne = local_species.electron.dens me = local_species.electron.mass for ion in range(ion_count): key = f"ion{ion + 1}" nion = local_species[key]["dens"] tion = local_species[key]["temp"] mion = local_species[key]["mass"] zion = local_species[key]["z"] # Not exact at log(Lambda) does change but pretty close... local_species[key]["nu"] = ( nu_ee * (zion**4 * nion / tion**1.5 / mion**0.5) / (ne / te**1.5 / me**0.5) ).m * nu_ee.units local_species.zeff = ( self.data["general"].get("zeff", 1.0) * ureg.elementary_charge ) # Normalise to pyrokinetics normalisations and calculate total pressure gradient local_species.normalise(convention) return local_species
[docs] def get_local_species_names(self): """ Returns a list of the local species names Returns ------- names: Array List of local species names """ names = [] ion_count = 0 for i_sp in range(self.data["box"]["n_spec"]): try: gene_data = self.data["species"][i_sp] except TypeError: # Case when only 1 species gene_data = self.data["species"] species_z = gene_data["charge"] if species_z == -1: names.append("electron") else: ion_count += 1 names.append(f"ion{ion_count}") return names
[docs] def get_numerics(self) -> Numerics: """Gather numerical info (grid spacing, time steps, etc)""" if hasattr(self, "convention"): convention = self.convention else: norms = Normalisation("get_local_species") if self._convention_dict: code_normalisation = self.norm_convention else: code_normalisation = "pyrokinetics" convention = getattr(norms, code_normalisation) numerics_data = dict() # Set number of fields numerics_data["phi"] = True numerics_data["apar"] = bool(self.data["general"].get("beta", 0)) numerics_data["bpar"] = bool(self.data["general"].get("bpar", 0)) numerics_data["delta_time"] = self.data["general"].get("dt_max", 0.01) numerics_data["max_time"] = self.data["general"].get("simtimelim", 500.0) try: numerics_data["theta0"] = -self.data["box"]["kx_center"] / ( self.data["box"]["kymin"] * self.data["geometry"]["shat"] ) except KeyError: numerics_data["theta0"] = 0.0 numerics_data["nky"] = self.data["box"]["nky0"] numerics_data["ky"] = self.data["box"]["kymin"] # Set to zero if box.lx not present numerics_data["kx"] = 2 * pi / self.data["box"].get("lx", np.inf) # Velocity grid numerics_data["ntheta"] = self.data["box"].get("nz0", 24) numerics_data["nenergy"] = self.data["box"].get("nv0", 16) // 2 numerics_data["npitch"] = self.data["box"].get("nw0", 16) numerics_data["nonlinear"] = bool(self.data["general"].get("nonlinear", False)) if numerics_data["nonlinear"]: numerics_data["nkx"] = self.data["box"]["nx0"] numerics_data["nperiod"] = 1 else: numerics_data["nkx"] = 1 numerics_data["nperiod"] = self.data["box"]["nx0"] - 1 ( ne, Te, ) = self.get_ne_te_normalisation() numerics_data["beta"] = self.data["general"].get("beta", 0.0) / ne external_contr = self.data.get( "external_contr", {"exbrate": 0.0, "omega0_tor": 0.0, "pfsrate": 0.0} ) numerics_data["gamma_exb"] = external_contr["exbrate"] return Numerics(**numerics_data).with_units(convention)
[docs] def get_reference_values(self, local_norm: Normalisation) -> Dict[str, Any]: """ Reads in reference values from input file """ if "units" not in self.data.keys(): return {} if not self.data["units"].keys(): return {} norms = {} raxis_rmaj, magnetic_axis_radius, rgeo_rmaj = self._get_rgeo_rmaj() if magnetic_axis_radius: R_major = magnetic_axis_radius / raxis_rmaj norms["lref_major_radius"] = R_major * local_norm.units.meter else: if "major_R" in self.data["geometry"]: lref_scale = self.data["geometry"]["major_R"] lref_key = "major_radius" elif "minor_r" in self.data["geometry"]: lref_scale = self.data["geometry"]["minor_r"] lref_key = "minor_radius" norms[f"lref_{lref_key}"] = ( self.data["units"]["lref"] * local_norm.units.meter * lref_scale ) norms["tref_electron"] = self.data["units"]["Tref"] * local_norm.units.keV norms["nref_electron"] = ( self.data["units"]["nref"] * local_norm.units.meter**-3 * 1e19 ) norms["bref_B0"] = ( self.data["units"]["Bref"] * local_norm.units.tesla * rgeo_rmaj ) return norms
def _detect_normalisation(self): """ Determines the necessary inputs and passes information to the base method _set_up_normalisation. The following values are needed default_references: dict Dictionary containing default reference values for the gk_code: str GK code electron_density: float Electron density from GK input electron_temperature: float Electron density from GK input e_mass: float Electron mass from GK input electron_index: int Index of electron in list of data found_electron: bool Flag on whether electron was found densities: ArrayLike List of species densities temperatures: ArrayLike List of species temperature reference_density_index: ArrayLike List of indices where the species has a density of 1.0 reference_temperature_index: ArrayLike List of indices where the species has a temperature of 1.0 major_radius: float Normalised major radius from GK input rgeo_rmaj: float Ratio of Geometric and flux surface major radius minor_radius: float Normalised minor radius from GK input """ default_references = { "nref_species": "electron", "tref_species": "electron", "mref_species": "deuterium", "bref": "B0", "lref": "major_radius", "ne": 1.0, "te": 1.0, "rgeo_rmaj": 1.0, "vref": "nrl", "rhoref": "pyro", "raxis_rmaj": None, } reference_density_index = [] reference_temperature_index = [] densities = [] temperatures = [] masses = [] found_electron = False e_mass = None electron_temperature = None electron_density = None electron_index = None # Load each species into a dictionary for i_sp in range(self.data["box"]["n_spec"]): try: gene_data = self.data["species"][i_sp] except TypeError: # Case when only 1 species gene_data = self.data["species"] dens = gene_data["dens"] temp = gene_data["temp"] mass = gene_data["mass"] if gene_data["charge"] == -1: electron_density = dens electron_temperature = temp e_mass = mass electron_index = i_sp found_electron = True if np.isclose(dens, 1.0): reference_density_index.append(i_sp) if np.isclose(temp, 1.0): reference_temperature_index.append(i_sp) densities.append(dens) temperatures.append(temp) masses.append(mass) if not found_electron: # Calculate electron density from quasineutrality for adiabatic electrons ne = 0.0 for i_sp in range(self.data["box"]["n_spec"]): try: species_data = self.data["species"][i_sp] except TypeError: # Case when only 1 species species_data = self.data["species"] ne += densities[i_sp] * species_data["charge"] electron_density = ne # Get temperature from first species (handle single species case) try: electron_temperature = self.data["species"][0]["temp"] except TypeError: # Case when only 1 species electron_temperature = self.data["species"]["temp"] e_mass = (electron_mass / deuterium_mass).m found_electron = True magnetic_axis_radius = None minor_radius = self.data["geometry"].get("minor_r", 0.0) major_radius = self.data["geometry"]["major_r"] rgeo_rmaj = 1.0 raxis_rmaj = None geometry_type = self.data["geometry"].get("magn_geometry", "miller") if geometry_type in ["tracer_efit", "gene"]: major_radius = 0.0 minor_radius = 0.0 raxis_rmaj, magnetic_axis_radius, rgeo_rmaj = self._get_rgeo_rmaj() super()._set_up_normalisation( default_references=default_references, gk_code=self.code_name.lower(), electron_density=electron_density, electron_temperature=electron_temperature, e_mass=e_mass, electron_index=electron_index, found_electron=found_electron, densities=densities, temperatures=temperatures, reference_density_index=reference_density_index, reference_temperature_index=reference_temperature_index, major_radius=major_radius, rgeo_rmaj=rgeo_rmaj, minor_radius=minor_radius, raxis_rmaj=raxis_rmaj, ) def _get_rgeo_rmaj(self): if hasattr(self, "original_filename"): geometry_type = self.data["geometry"].get("magn_geometry", "miller") # Only for Tracer EFIT if geometry_type not in ["tracer_efit", "gene"]: return 1.0, None, 1.0 geometry_dict = self.get_gene_geometry() if geometry_dict: rgeo_rmaj = geometry_dict["rgeo_rmaj"] magnetic_axis_radius = geometry_dict["Lref"] raxis_rmaj = 1.0 / geometry_dict["Rmaj"] else: rgeo_rmaj = 1.0 raxis_rmaj = 1.0 magnetic_axis_radius = None return raxis_rmaj, magnetic_axis_radius, rgeo_rmaj else: return 1.0, None, 1.0
[docs] def set( self, local_geometry: LocalGeometry, local_species: LocalSpecies, numerics: Numerics, local_norm: Normalisation = None, template_file: Optional[PathLike] = None, code_normalisation: Optional[str] = None, **kwargs, ): """ Set self.data using LocalGeometry, LocalSpecies, and Numerics. These may be obtained via another GKInput file, or from Equilibrium/Kinetics objects. """ # If self.data is not already populated, fill in defaults from a given # template file. If this is not provided by the user, fall back to the # default. if self.data is None: if template_file is None: template_file = gk_templates["GENE"] self.read_from_file(template_file) if local_norm is None: local_norm = Normalisation("set") # TODO Find way to get norm_convention = pyrokinetics if we find minor_radius as lref if code_normalisation is None: if np.isclose(self.data["geometry"]["minor_r"], 1.0): code_normalisation = "pyrokinetics" elif np.isclose(self.data["geometry"]["major_R"], 1.0): code_normalisation = "gene" convention = getattr(local_norm, code_normalisation) # Geometry data if isinstance(local_geometry, LocalGeometryMillerTurnbull): eq_type = "MillerTurnbull" elif isinstance(local_geometry, LocalGeometryMiller): eq_type = "Miller" elif isinstance(local_geometry, LocalGeometryMXH): eq_type = "MXH" elif isinstance(local_geometry, LocalGeometryFourierGENE): eq_type = "FourierGENE" else: raise NotImplementedError( f"Writing LocalGeometry type {local_geometry.__class__.__name__} " "for GENE not yet supported" ) if eq_type == "MXH": self.data["geometry"]["magn_geometry"] = "miller_mxh" elif eq_type == "FourierGENE": self.data["geometry"]["magn_geometry"] = "miller_general" else: self.data["geometry"]["magn_geometry"] = "miller" if eq_type == "MillerTurnbull": for pyro_key, ( gene_param, gene_key, ) in self.pyro_gene_miller_turnbull.items(): self.data[gene_param][gene_key] = local_geometry[pyro_key] elif eq_type == "MXH": for pyro_key, ( gene_param, gene_key, ) in self.pyro_gene_mxh.items(): self.data[gene_param][gene_key] = local_geometry[pyro_key] # GENE uses rho * dcN_dr self.data[gene_param]["cNdr_m"] = [ dcndr * local_geometry.rho for dcndr in self.data[gene_param]["cNdr_m"] ] self.data[gene_param]["sNdr_m"] = [ dsndr * local_geometry.rho for dsndr in self.data[gene_param]["sNdr_m"] ] # GENE s_delta/s_zeta take precedence over sNdr self.data[gene_param]["delta"] = np.sin(local_geometry.sn[1]) self.data[gene_param]["zeta"] = -local_geometry.sn[2] self.data[gene_param]["s_delta"] = ( local_geometry.dsndr[1] * local_geometry.rho ) self.data[gene_param]["s_zeta"] = ( -local_geometry.dsndr[2] * local_geometry.rho ) elif eq_type == "FourierGENE": for pyro_key, ( gene_param, gene_key, ) in self.pyro_gene_fourier.items(): self.data[gene_param][gene_key] = local_geometry[pyro_key] elif eq_type == "Miller": for pyro_key, (gene_param, gene_key) in self.pyro_gene_miller.items(): self.data[gene_param][gene_key] = local_geometry[pyro_key] # Need to remove any MillerTurnbull key if they exist for turnbull_key in self.pyro_gene_miller_turnbull.keys(): if ( turnbull_key not in self.pyro_gene_miller.keys() and turnbull_key in self.data["geometry"].keys() ): self.data["geometry"].pop(turnbull_key) self.data["geometry"]["amhd"] = ( -(local_geometry.q**2) * local_geometry.Rmaj * local_geometry.beta_prime ) self.data["geometry"]["dpdx_pm"] = -local_geometry.beta_prime self.data["geometry"]["trpeps"] = local_geometry.rho / local_geometry.Rmaj if code_normalisation == "pyrokinetics": self.data["geometry"]["minor_r"] = 1.0 elif code_normalisation == "gene": if local_geometry.a_minor is not None: self.data["geometry"]["minor_r"] = local_geometry.a_minor else: if "minor_r" in self.data["geometry"].keys(): self.data["geometry"].pop("minor_r") self.data["geometry"]["major_r"] = local_geometry.Rmaj self.data["geometry"]["major_z"] = local_geometry.Z0 # GENE defines whether clockwise/ pyro defines whether counter-clockwise - need to flip sign self.data["geometry"]["sign_Ip_CW"] = -1 * local_geometry.ip_ccw self.data["geometry"]["sign_Bt_CW"] = -1 * local_geometry.bt_ccw # Kinetic data n_species = local_species.nspec self.data["box"]["n_spec"] = n_species stored_species = len(self.data["species"]) extra_species = stored_species - n_species if extra_species > 0: for i in range(extra_species): del self.data["species"][-1] iIon = 1 for iSp, name in enumerate(local_species.names): try: single_species = self.data["species"][iSp] except IndexError: if f90nml.__version__ < "1.4": self.data["species"].append(copy.copy(self.data["species"][0])) single_species = self.data["species"][iSp] else: # FIXME f90nml v1.4+ uses 'Cogroups' for Namelist groups sharing # a common key. As of version 1.4.2, Cogroup derives from # 'list', but does not implement all methods, so confusingly it # allows calls to 'append', but then doesn't do anything! # Currently working around this in a horribly inefficient # manner, by deconstructing the entire Namelist to a dict, using # secret cogroup names directly, and rebundling the Namelist. # There must be a better way! d = self.data.todict() copied = copy.deepcopy(d["_grp_species_0"]) copied["name"] = None d[f"_grp_species_{iSp}"] = copied self.data = f90nml.Namelist(d) single_species = self.data["species"][iSp] if name == "electron": single_species["name"] = "electron" else: single_species["name"] = f"ion{iIon}" iIon += 1 # TODO Currently forcing GENE to use default pyro. Should check local_norm first for key, val in self.pyro_gene_species.items(): single_species[val] = local_species[name][key] if "external_contr" not in self.data.keys(): self.data["external_contr"] = f90nml.Namelist( { "omega0_tor": local_species.electron.omega0, "pfsrate": -local_species.electron.domega_drho * local_geometry.rho / local_geometry.q, } ) else: self.data["external_contr"]["omega0_tor"] = local_species.electron.omega0 self.data["external_contr"]["pfsrate"] = ( -local_species.electron.domega_drho * local_geometry.rho / local_geometry.q ) self.data["general"]["zeff"] = local_species.zeff beta_ref = convention.beta if local_norm else 0.0 beta = numerics.beta if numerics.beta is not None else beta_ref # GENE beta is ALWAYS ne*Tref/B0^2 regardless of existing nref original_convention = getattr(local_norm, self.norm_convention) ne = (1 * original_convention.nref).to(local_norm.gene) self.data["general"]["beta"] = beta * ne self.data["general"]["coll"] = local_species.electron.nu / ( 4 * np.sqrt(deuterium_mass / electron_mass) ) # Numerics if numerics.bpar and not numerics.apar: raise ValueError("Can't have bpar without apar in GENE") self.data["general"]["bpar"] = numerics.bpar # FIXME breaks a roundtrip when doing electrostatic simulations # FIXME can't really fix this due to GENE set up... if not numerics.apar: self.data["general"]["beta"] = 0.0 self.data["general"]["dt_max"] = numerics.delta_time self.data["general"]["simtimelim"] = numerics.max_time if numerics["nonlinear"]: # TODO Currently forces NL sims to have nperiod = 1 self.data["general"]["nonlinear"] = True self.data["box"]["nky0"] = numerics["nky"] self.data["box"]["nx0"] = numerics["nkx"] else: self.data["general"]["nonlinear"] = False self.data["box"]["nky0"] = numerics.nky self.data["box"]["kymin"] = ( numerics.ky * (1 * convention.bref / local_norm.gs2.bref).to_base_units() ) self.data["box"]["kx_center"] = ( -1 * numerics.theta0 * numerics.ky * local_geometry.shat ) if numerics.kx != 0.0: self.data["box"]["lx"] = 2 * pi / numerics.kx self.data["box"]["nz0"] = numerics.ntheta self.data["box"]["nv0"] = 2 * numerics.nenergy self.data["box"]["nw0"] = numerics.npitch if "external_contr" not in self.data.keys(): self.data["external_contr"] = f90nml.Namelist( {"exbrate": numerics.gamma_exb} ) else: self.data["external_contr"]["exbrate"] = numerics.gamma_exb if not local_norm: return try: (1 * convention.tref).to("keV") si_units = True except PyroContextError: si_units = False if si_units: if "units" not in self.data.keys(): self.data["units"] = f90nml.Namelist() self.data["units"]["Tref"] = (1 * convention.tref).to("keV").m self.data["units"]["nref"] = (1e-19 * convention.nref).to("meter**-3").m self.data["units"]["mref"] = (1 * convention.mref).to("proton_mass").m self.data["units"]["Bref"] = (1 * convention.bref).to("tesla").m self.data["units"]["Lref"] = (1 * convention.lref).to("meter").m self.data["units"]["omegatorref"] = ( local_species.electron.omega0.to(convention).to("radians/second").m ) for name, namelist in self.data.items(): self.data[name] = convert_dict(namelist, convention)
[docs] def get_ne_te_normalisation(self): adiabatic_electrons = True # Get electron temp and density to normalise input for i_sp in range(self.data["box"]["n_spec"]): try: gene_data = self.data["species"][i_sp] except TypeError: # Case when only 1 species gene_data = self.data["species"] if gene_data["charge"] == -1: ne = gene_data["dens"] Te = gene_data["temp"] adiabatic_electrons = False if adiabatic_electrons: ne = 0.0 for i_sp in range(self.data["box"]["n_spec"]): try: gene_data = self.data["species"][i_sp] except TypeError: # Case when only 1 species gene_data = self.data["species"] ne += gene_data["dens"] * gene_data["charge"] # Get temperature from first species (handle single species case) try: Te = self.data["species"][0]["temp"] except TypeError: # Case when only 1 species Te = self.data["species"]["temp"] return ne, Te
[docs] class GKOutputReaderGENE(FileReader, file_type="GENE", reads=GKOutput): fields = ["phi", "apar", "bpar"]
[docs] def read_from_file( self, filename: PathLike, norm: Normalisation, output_convention: str = "pyrokinetics", load_fields=True, load_fluxes=True, load_moments=False, downsample: Dict[str, Any] = {}, ) -> GKOutput: raw_data, gk_input, input_str = self._get_raw_data(filename, norm) fmt_downsample = {} for key, value in downsample.items(): if isinstance(value, int): fmt_downsample[key + "_idx"] = slice(value, value + 1) else: fmt_downsample[key + "_idx"] = value downsample = fmt_downsample coords = self._get_coords(raw_data, gk_input, downsample) fields = ( self._get_fields(raw_data, gk_input, coords, downsample) if load_fields else None ) fluxes = ( self._get_fluxes(raw_data, gk_input, coords, downsample) if load_fluxes else None ) moments = ( self._get_moments(raw_data, gk_input, coords, downsample) if load_moments else None ) if coords["linear"] and not fields: eigenvalues = self._get_eigenvalues(raw_data, coords) else: # Rely on gk_output to generate eigenvalues eigenvalues = None # Assign units and return GKOutput field_dims = ("theta", "kx", "ky", "time") flux_dims = ("field", "species", "time") moment_dims = ("theta", "kx", "species", "ky", "time") # Assign units and return GKOutput convention = getattr(norm, gk_input.norm_convention) norm.default_convention = output_convention.lower() return GKOutput( coords=Coords( time=coords["time"], kx=coords["kx"], ky=coords["ky"], theta=coords["theta"], pitch=coords["pitch"], energy=coords["energy"], species=coords["species"], field=coords["field"], ).with_units(convention), norm=norm, fields=( Fields(**fields, dims=field_dims).with_units(convention) if fields else None ), fluxes=( Fluxes(**fluxes, dims=flux_dims).with_units(convention) if fluxes else None ), moments=( Moments(**moments, dims=moment_dims).with_units(convention) if moments else None ), eigenvalues=( Eigenvalues(**eigenvalues).with_units(convention) if eigenvalues else None ), linear=coords["linear"], gk_code="GENE", input_file=input_str, normalise_flux_moment=True, output_convention=output_convention, jacobian=coords["jacobian"], )
@staticmethod def _get_gene_files(filename: PathLike) -> Dict[str, Path]: """ Given a directory name, looks for the files filename/parameters_0000, filename/field_0000 and filename/nrg_0000. If instead given any of the files parameters_####, field_#### or nrg_####, looks up the rest of the files in the same directory. """ filename = Path(filename) prefixes = ["parameters", "field", "nrg", "omega"] if filename.is_dir(): # If given a dir name, looks for dir/parameters_0000 dirname = filename dat_matches = np.any( [Path(filename / f"{p}.dat").is_file() for p in prefixes] ) if dat_matches: suffix = "dat" delimiter = "." else: suffix = "0000" delimiter = "_" else: # If given a file, searches for all similar GENE files in that file's dir dirname = filename.parent # Ensure provided file is a GENE file (fr"..." means raw format str) matches = [re.search(rf"^{p}_\d{{4}}$", filename.name) for p in prefixes] if not np.any(matches): raise RuntimeError( f"GKOutputReaderGENE: The provided file {filename} is not a GENE " "output file." ) suffix = filename.name.split("_")[1] delimiter = "_" # Get all files in the same dir files = { prefix: dirname / f"{prefix}{delimiter}{suffix}" for prefix in prefixes if (dirname / f"{prefix}{delimiter}{suffix}").exists() } if not files: raise RuntimeError( "GKOutputReaderGENE: Could not find GENE output files in the " f"directory '{dirname}'." ) if "parameters" not in files: raise RuntimeError( "GKOutputReaderGENE: Could not find GENE output file 'parameters_" f"{suffix}' when provided with the file/directory '{filename}'." ) # If binary field file absent, adds .h5 field file, # if present, to 'files' if "field" not in files: if (dirname / f"field{delimiter}{suffix}.h5").exists(): files.update({"field": dirname / f"field{delimiter}{suffix}.h5"}) if "nrg" not in files: if (dirname / f"nrg{delimiter}{suffix}.h5").exists(): files.update({"nrg": dirname / f"nrg{delimiter}{suffix}.h5"}) return files @staticmethod def _get_gene_mom_geo_files( filename: PathLike, files: Dict, species_names, geometry_type ) -> Dict[str, Path]: """ Given a directory name, looks for the files filename/parameters_0000, filename/field_0000 and filename/nrg_0000. If instead given any of the files parameters_####, field_#### or nrg_####, looks up the rest of the files in the same directory. """ filename = Path(filename) prefixes = [f"mom_{species_name}" for species_name in species_names] + [ geometry_type ] if filename.is_dir(): # If given a dir name, looks for dir/parameters_0000 dirname = filename dat_matches = np.any( [Path(filename / f"{p}.dat").is_file() for p in prefixes] ) if dat_matches: suffix = "dat" delimiter = "." else: suffix = "0000" delimiter = "_" else: # If given a file, searches for all similar GENE files in that file's dir dirname = filename.parent suffix = filename.name.split("_")[-1] delimiter = "_" # Get all files in the same dir for prefix in prefixes: if (dirname / f"{prefix}{delimiter}{suffix}").exists(): files[prefix] = dirname / f"{prefix}{delimiter}{suffix}" return files
[docs] def verify_file_type(self, filename: PathLike): self._get_gene_files(filename)
[docs] @staticmethod def infer_path_from_input_file(filename: PathLike) -> Path: """ Given path to input file, guess at the path for associated output files. """ # If the input file is of the form name_####, get the numbered part and # search for 'parameters_####' in the run directory. If not, simply return # the directory. filename = Path(filename) num_part_regex = re.compile(r"(\d{4})") num_part_match = num_part_regex.search(filename.name) if num_part_match is None: return Path(filename).parent else: return Path(filename).parent / f"parameters_{num_part_match[0]}" pass
@classmethod def _get_raw_data( cls, filename: PathLike, norm: Normalisation = None ) -> Tuple[Dict[str, Any], GKInputGENE, str]: files = cls._get_gene_files(filename) # Read parameters_#### as GKInputGENE and into plain string with open(files["parameters"], "r") as f: input_str = f.read() # TODO remove hacky fix from GENE () issue input_str = input_str.replace("(", "") input_str = input_str.replace(")", "") gk_input = GKInputGENE() gk_input.read_str(input_str) gk_input.original_filename = files["parameters"] if gk_input._convention_dict and norm: gk_input.convention = norm.gene_bespoke # Handle both single species (namelist) and multiple species (list of namelists) n_spec = gk_input.data["box"]["n_spec"] if n_spec == 1: species_names = [gk_input.data["species"]["name"]] else: species_names = [species["name"] for species in gk_input.data["species"]] geometry_type = gk_input.data["geometry"]["magn_geometry"] files = cls._get_gene_mom_geo_files( filename, files, species_names, geometry_type ) # Defer processing field and flux data until their respective functions # Simply return files in place of raw data return files, gk_input, input_str @staticmethod def _get_coords( raw_data: Dict[str, Any], gk_input: GKInputGENE, downsample: Dict[str, Any] = {}, ) -> Dict[str, Any]: """ Sets coords and attrs of a Pyrokinetics dataset from a GENE parameters file. Args: raw_data (Dict[str,Any]): Dict containing GENE output. Ignored. gk_input (GKInputGENE): Processed GENE input file. Returns: xr.Dataset: Dataset with coords and attrs set, but not data_vars """ nml = gk_input.data # The last time step is not always written, but depends on # whatever condition is met first between simtimelim and timelim species = gk_input.get_local_species_names() if ".h5" not in str(raw_data["nrg"]): with open(raw_data["nrg"], "r") as f: full_data = f.readlines() ntime = len(full_data) // (len(species) + 1) lasttime = float(full_data[-(len(species) + 1)]) if ( ntime * nml["in_out"]["istep_nrg"] % nml["in_out"]["istep_field"] == 0 ): add_on = 0 else: add_on = 1 else: with h5py.File(raw_data["nrg"], "r") as file: key = list(file.keys())[1] time = file[f"{key}/time"][:] ntime = len(time) lasttime = time[-1] if nml["in_out"]["istep_nrg"] == nml["in_out"]["istep_field"]: add_on = 0 else: add_on = 1 ntime = ( int(ntime * nml["in_out"]["istep_nrg"] / nml["in_out"]["istep_field"]) ) + add_on ntime = ntime # Set time to index for now, gets overwritten by field data time = np.linspace(0, ntime - 1, ntime) nfield = nml["info"]["n_fields"] field = ["phi", "apar", "bpar"][:nfield] nky = nml["box"]["nky0"] nkx = nml["box"]["nx0"] nz = nml["box"]["nz0"] z = np.linspace(-pi, pi, nz, endpoint=False) ntheta = nz local_geometry = gk_input.get_local_geometry() metric_terms = MetricTerms(local_geometry, ntheta=nz * 4) z_full = metric_terms.alpha / local_geometry.q theta = np.interp(z, z_full, metric_terms.regulartheta) nenergy = nml["box"]["nv0"] energy = np.linspace(-1, 1, nenergy) npitch = nml["box"]["nw0"] pitch = np.linspace(-1, 1, npitch) fluxes = ["particle", "heat", "momentum"] moments = ["density", "temperature", "velocity"] if gk_input.is_linear(): # Set up ballooning angle single_theta_loop = theta single_ntheta_loop = ntheta nkx = nkx - 1 + nkx % 2 ntheta = ntheta * nkx theta = np.empty(ntheta) start = 0 for i in range(nkx): pi_segment = i - (nkx - 1) // 2 theta[start : start + single_ntheta_loop] = ( single_theta_loop + pi_segment * 2 * pi ) start += single_ntheta_loop ky = [nml["box"]["kymin"]] kx = [0.0] nkx = 1 # TODO should we not also set nky=1? else: kymin = nml["box"]["kymin"] ky = np.linspace(0, kymin * (nky - 1), nky) lx = nml["box"]["lx"] dkx = 2 * np.pi / lx kx = np.empty(nkx) for i in range(-1, nkx - 1): if i < (nkx / 2 + 1): kx[i] = i * dkx else: kx[i] = (i - nkx) * dkx kx = np.roll(np.fft.fftshift(kx), -1) full_ky = ky full_kx = kx full_theta = theta full_time = time if downsample: ky = ky[downsample.get("ky_idx", slice(None))] kx = kx[downsample.get("kx_idx", slice(None))] theta = theta[downsample.get("theta_idx", slice(None))] time = time[downsample.get("time_idx", slice(None))] theta_mod = np.mod(theta, 2 * np.pi) Jacobian = np.interp( theta_mod, metric_terms.regulartheta, metric_terms.Jacobian, period=2 * np.pi, ) # Store grid data as xarray DataSet return { "time": time, "kx": kx, "ky": ky, "theta": theta, "full_time": full_time, "full_kx": full_kx, "full_ky": full_ky, "full_theta": full_theta, "energy": energy, "pitch": pitch, "moment": moments, "flux": fluxes, "field": field, "species": species, "linear": gk_input.is_linear(), "lasttime": lasttime, "jacobian": Jacobian, } @staticmethod def _get_fields( raw_data: Dict[str, Any], gk_input: GKInputGENE, coords: Dict[str, Any], downsample: Dict[str, Any], ) -> Dict[str, np.ndarray]: """ Sets 3D fields over time. The field coordinates should be (field, theta, kx, ky, time) """ if "field" not in raw_data: return {} # Time data stored as binary (int, double, int) # Check precision used in GENE if gk_input.data["info"]["PRECISION"] == "SINGLE": time_data_fmt = "=ifi" complex_size = 8 dtype = np.complex64 elif gk_input.data["info"]["PRECISION"] == "DOUBLE": time_data_fmt = "=idi" complex_size = 16 dtype = np.complex128 else: raise ValueError( f"Pyrokinetics can't handle cases when GENE precision is {gk_input.data['info']['PRECISION']}" ) time = [] time_data_size = struct.calcsize(time_data_fmt) int_size = 4 if downsample: kx_idx = downsample.get("kx_idx", None) ky_idx = downsample.get("ky_idx", None) theta_idx = downsample.get("theta_idx", None) time_idx = downsample.get("time_idx", None) else: kx_idx = None ky_idx = None theta_idx = None time_idx = None kx_idx = slice(None) if kx_idx is None else kx_idx ky_idx = slice(None) if ky_idx is None else ky_idx theta_idx = slice(None) if theta_idx is None else theta_idx time_idx = slice(None) if time_idx is None else time_idx nx = gk_input.data["box"]["nx0"] nz = gk_input.data["box"]["nz0"] nkx = len(coords["kx"]) nky = len(coords["ky"]) ntheta = len(coords["theta"]) ntime = len(coords["time"]) nfield = len(coords["field"]) full_nky = len(coords["full_ky"]) full_ntime = len(coords["full_time"]) # Account for kx data being ifft shifted kx_shifted = list(range(*kx_idx.indices((nx)))) kx_unshifted = [(i + 1 + nx // 2) % nx for i in kx_shifted] field_size = nx * nz * full_nky * complex_size time_block_size = time_data_size + nfield * (2 * int_size + field_size) if gk_input.is_linear(): sliced_field = np.empty((nfield, nx, nky, nz, ntime), dtype=dtype) else: sliced_field = np.empty((nfield, nkx, nky, ntheta, ntime), dtype=dtype) fields = np.empty((nfield, nkx, nky, ntheta, ntime), dtype=dtype) # Read binary file if present if ".h5" not in str(raw_data["field"]): with open(raw_data["field"], "rb") as f: for it_out, it in enumerate(range(*time_idx.indices(full_ntime))): # Seek to requested time block f.seek(it * time_block_size) # Read time header time_value = struct.unpack(time_data_fmt, f.read(time_data_size))[1] time.append(time_value) for ifield in range(nfield): f.seek(int_size, 1) # record marker field_offset = f.tell() mm = np.memmap( raw_data["field"], dtype=dtype, mode="r", offset=field_offset, shape=(nx, full_nky, nz), order="F", ) # Slice spatially sliced_field[ ifield, :, :, :, it_out, ] = mm[kx_unshifted, ky_idx, theta_idx] # Skip over field block + trailing marker f.seek(field_size + int_size, 1) # Read .h5 file if binary file absent else: h5_field_subgroup_names = ["phi", "A_par", "B_par"] fields = np.empty( (nfield, nx, nky, nz, ntime), dtype=complex, ) with h5py.File(raw_data["field"], "r") as file: # Read in time data time.extend(list(file.get("field/time"))) for i_field in range(nfield): h5_subgroup = "field/" + h5_field_subgroup_names[i_field] + "/" h5_dataset_names = list(file[h5_subgroup].keys()) for i_time in range(ntime): h5_dataset = h5_subgroup + h5_dataset_names[i_time] raw_field = np.array(file.get(h5_dataset)) raw_field = np.array( raw_field["real"] + raw_field["imaginary"] * 1j, dtype="complex128", ) sliced_field[i_field, :, :, :, i_time] = np.swapaxes( raw_field, 0, 2 ) # Match pyro convention for ion/electron direction sliced_field = np.conjugate(sliced_field) if not gk_input.is_linear(): nl_shape = (nfield, nkx, nky, ntheta, ntime) fields = sliced_field.reshape(nl_shape, order="F") # Convert from kx to ballooning space else: try: n0_global = gk_input.data["box"]["n0_global"] q0 = gk_input.data["geometry"]["q0"] phase_fac = -np.exp(2 * np.pi * 1j * n0_global * q0) except KeyError: phase_fac = -1 i_ball = 0 nx_actual = nx - 1 + nx % 2 for i_conn in range(0, nx_actual): fields[:, 0, :, i_ball : i_ball + nz, :] = ( sliced_field[:, i_conn, :, :, :] * (phase_fac) ** i_conn ) i_ball += nz # ================================================= # Overwrite 'time' coordinate as determined in _init_dataset coords["time"] = time # Original method coords: (field, kx, ky, theta, time) # New coords: (field, theta, kx, ky, time) fields = fields.transpose(0, 3, 1, 2, 4) result = {} for ifield, field_name in enumerate(coords["field"]): result[field_name] = fields[ifield, ...] return result @staticmethod def _get_moments( raw_data: Dict[str, Any], gk_input: GKInputGENE, coords: Dict[str, Any], downsample: Dict[str, Any], ) -> Dict[str, np.ndarray]: """ Sets 3D moments over time. The moment coordinates should be (moment, theta, kx, species, ky, time) """ if f"mom_{gk_input.data['species'][0]['name']}" not in raw_data: return {} # Time data stored as binary (int, double, int) # Check precision used in GENE if gk_input.data["info"]["PRECISION"] == "SINGLE": time_data_fmt = "=ifi" complex_size = 8 dtype = np.complex64 elif gk_input.data["info"]["PRECISION"] == "DOUBLE": time_data_fmt = "=idi" complex_size = 16 dtype = np.complex128 else: raise ValueError( f"Pyrokinetics can't handle cases when GENE precision is {gk_input.data['info']['PRECISION']}" ) time = [] time_data_size = struct.calcsize(time_data_fmt) int_size = 4 if downsample: kx_idx = downsample.get("kx_idx", None) ky_idx = downsample.get("ky_idx", None) theta_idx = downsample.get("theta_idx", None) time_idx = downsample.get("time_idx", None) else: kx_idx = None ky_idx = None theta_idx = None time_idx = None kx_idx = slice(None) if kx_idx is None else kx_idx ky_idx = slice(None) if ky_idx is None else ky_idx theta_idx = slice(None) if theta_idx is None else theta_idx time_idx = slice(None) if time_idx is None else time_idx nx = gk_input.data["box"]["nx0"] nz = gk_input.data["box"]["nz0"] nkx = len(coords["kx"]) nky = len(coords["ky"]) ntheta = len(coords["theta"]) ntime = len(coords["time"]) full_nky = len(coords["full_ky"]) full_ntime = len(coords["full_time"]) # Account for kx data being ifft shifted kx_shifted = list(range(*kx_idx.indices((nx)))) kx_unshifted = [(i + 1 + nx // 2) % nx for i in kx_shifted] species = [species["name"] for species in gk_input.data["species"]] nspecies = len(species) nmoment_output = 6 if len(coords["field"]) > 2: nmoment_output += 3 moment_size = nx * nz * full_nky * complex_size time_block_size = time_data_size + nmoment_output * (2 * int_size + moment_size) if gk_input.is_linear(): sliced_moment = np.empty( (nspecies, nmoment_output, nx, nky, nz, ntime), dtype=dtype ) else: sliced_moment = np.empty( (nspecies, nmoment_output, nkx, nky, ntheta, ntime), dtype=dtype ) moments = np.empty( (nspecies, nmoment_output, nkx, nky, ntheta, ntime), dtype=dtype ) for i_sp, spec in enumerate(species): # Read binary file if present if ".h5" not in str(raw_data[f"mom_{spec}"]): with open(raw_data[f"mom_{spec}"], "rb") as f: for it_out, it in enumerate(range(*time_idx.indices(full_ntime))): # Seek to requested time block f.seek(it * time_block_size) # Read time header time_value = struct.unpack( time_data_fmt, f.read(time_data_size) )[1] if i_sp == 0: time.append(time_value) for i_moment in range(nmoment_output): f.seek(int_size, 1) moment_offset = f.tell() mm = np.memmap( raw_data[f"mom_{spec}"], dtype=dtype, mode="r", offset=moment_offset, shape=(nx, full_nky, nz), order="F", ) sliced_moment[i_sp, i_moment, :, :, :, it_out] = mm[ kx_unshifted, ky_idx, theta_idx ] # Skip over field block + trailing marker f.seek(moment_size + int_size, 1) # Read .h5 file if binary file absent else: raise NotImplementedError("Moments from HDf5 not yet supported") # Match pyro convention for ion/electron direction sliced_moment = np.conjugate(sliced_moment) if not gk_input.is_linear(): nl_shape = (nspecies, nmoment_output, nkx, nky, ntheta, ntime) moments = sliced_moment.reshape(nl_shape, order="F") # Convert from kx to ballooning space else: try: n0_global = gk_input.data["box"]["n0_global"] q0 = gk_input.data["geometry"]["q0"] phase_fac = -np.exp(-2 * np.pi * 1j * n0_global * q0) except KeyError: phase_fac = -1 i_ball = 0 nx_actual = nx - 1 + nx % 2 for i_conn in range(0, nx_actual): moments[:, :, 0, :, i_ball : i_ball + nz, :] = ( sliced_moment[:, :, i_conn, :, :, :] * (phase_fac) ** i_conn ) i_ball += nz # ================================================= # Overwrite 'time' coordinate as determined in _init_dataset coords["time"] = time # Original method coords: (species, moment, kx, ky, theta, time) # New coords: (moment, theta, kx, species, ky, time) moments = moments.transpose(1, 4, 2, 0, 3, 5) result = {} result["density"] = moments[0, ...] result["temperature"] = moments[1, ...] / 3 + moments[2, ...] * 2 / 3 result["velocity"] = moments[5, ...] return result @staticmethod def _get_fluxes( raw_data: Dict[str, Any], gk_input: GKInputGENE, coords: Dict[str, Any], downsample: Dict[str, Any], ) -> Dict[str, np.ndarray]: """ Set flux data over time. The flux coordinates should be (species, flux, field, ky, time) """ nml = gk_input.data # ky data not available in the nrg file so no ky coords here coord_names = ["species", "flux", "field", "full_time"] shape = [len(coords[coord_name]) for coord_name in coord_names] fluxes = np.empty(shape) nfield = len(coords["field"]) nspecies = len(coords["species"]) ntime = len(coords["time"]) full_ntime = len(coords["full_time"]) if downsample: time_idx = downsample.get("time_idx", None) else: time_idx = None time_idx = slice(None) if time_idx is None else time_idx if "nrg" not in raw_data: logging.warning("Flux data not found, setting all fluxes to zero") fluxes[...] = 0 result = {"fluxes": fluxes} return result flux_istep = nml["in_out"]["istep_nrg"] field_istep = nml["in_out"]["istep_field"] ntime_flux = nml["info"]["steps"][0] // flux_istep + 1 if nml["info"]["steps"][0] % flux_istep > 0: ntime_flux += 1 if flux_istep < field_istep: time_skip = int(field_istep / flux_istep) - 1 else: time_skip = 0 if ".h5" not in str(raw_data["nrg"]): with open(raw_data["nrg"], "r") as csv_file: nrg_data = csv.reader(csv_file, delimiter=" ", skipinitialspace=True) if nfield == 3: logging.warning( "GENE combines Apar and Bpar particle and heat fluxes, setting Bpar ones to zero" ) fluxes[:, :, 2, :] = 0.0 field_size = 2 else: field_size = nfield for i_time in range(full_ntime): time = next(nrg_data) # noqa coords["full_time"][i_time] = float(time[0]) for i_species in range(nspecies): nrg_line = np.array(next(nrg_data), dtype=float) # Particle fluxes[i_species, 0, :field_size, i_time] = nrg_line[ 4 : 4 + field_size, ] # Heat fluxes[i_species, 1, :field_size, i_time] = nrg_line[ 6 : 6 + field_size, ] # Momentum if len(nrg_line) < 11: # The default fluxes saved by GENE are the radial fluxes of parallel momentum # which are *not* those that enter the transport equations. Should we warn # the user here? fluxes[i_species, 2, :field_size, i_time] = nrg_line[ 8 : 8 + field_size, ] else: # Setting `tor_ang_mom_flux = T` in the GENE input file will compute the radial # fluxes of toroidal angular momentum, which we here load preferentially field_columns = { 0: [10, 11], # Phi 1: [12, 13, 17], # Apar 2: [15, 16], # Bpar } for i_field, cols in field_columns.items(): if ( i_field < field_size ): # Only process fields that exist if len(nrg_line) >= max(cols) + 1: fluxes[i_species, 2, i_field, i_time] = ( nrg_line[cols].sum() ) else: fluxes[i_species, 2, i_field, i_time] = 0.0 # Skip time/data values in field print out is less if i_time < ntime - 1: for skip_t in range(time_skip): for skip_s in range(nspecies + 1): next(nrg_data) else: with h5py.File(raw_data["nrg"], "r") as file: spec_keys = list(file.keys())[1:] suffixes = ["es", "em"] prefixes = ["Gamma", "Q", "P"] if len(coords["time"]) != len( file[spec_keys[0]]["time"][:: time_skip + 1] ): final_append = True else: final_append = False if nfield == 3: logging.warning( "GENE combines Apar and Bpar particle and heat fluxes, setting Bpar ones to zero" ) fluxes[:, :, 2, :] = 0.0 if any( "P_t_par_es" in group for group in file.values() if hasattr(group, "__contains__") ): # Setting `tor_ang_mom_flux = T` in the GENE input file will compute the radial # fluxes of toroidal angular momentum, which we here load preferentially prefixes[-1] = "P_t" suffixes_momentum = { 0: ["par_es", "per_es"], # Phi 1: ["paremA", "peremA"], # Apar 2: ["paremB", "peremB"], # Bpar } time_nrg = file[spec_keys[0]]["time"] for i_species, spec_key in enumerate(spec_keys): species_data = file[spec_key] for i_field in range(nfield): for i_flux in range(len(coords["flux"])): if i_flux == 2 and prefixes[-1] == "P_t": # Sum over contributions flux_data = sum( ( species_data[f"{prefixes[i_flux]}_{suffix}"][:] for suffix in suffixes_momentum[i_field] if f"{prefixes[i_flux]}_{suffix}" in species_data ), np.zeros(len(time_nrg)), ) # Add Maxwell stress contribution if "PtfieldAV2" in species_data and i_field == 1: flux_data += species_data["PtfieldAV2"][:] else: flux_data = ( species_data.get( f"{prefixes[i_flux]}_{suffixes[i_field]}", np.zeros(len(time_nrg)), ) if i_field < len(suffixes) else np.zeros(len(time_nrg)) ) if final_append: fluxes[i_species, i_flux, i_field, :-1] = flux_data[ :: time_skip + 1 ] fluxes[i_species, i_flux, i_field, -1] = flux_data[-1] else: fluxes[i_species, i_flux, i_field, :] = flux_data[ :: time_skip + 1 ] results = {} fluxes = fluxes.transpose(1, 2, 0, 3) fluxes = fluxes[..., time_idx] coords["time"] = coords["full_time"][time_idx] if gk_input.data["geometry"].get("norm_flux_projection", False): geometry_type = gk_input.data["geometry"]["magn_geometry"] geometry_filename = raw_data[geometry_type] geometry_nml = f90nml.read(geometry_filename) skiprows = 19 if "edge_opt" in geometry_nml["parameters"].keys(): skiprows += 1 geometry_data = np.loadtxt(geometry_filename, skiprows=skiprows) grho = np.sqrt(geometry_data[:, 0]) jacob = geometry_data[:, -6] flux_norm = np.sum(jacob) / np.sum(jacob * grho) else: flux_norm = 1.0 if gk_input.is_linear(): flux_norm *= 2 * np.pi**1.5 for iflux, flux in enumerate(coords["flux"]): results[flux] = fluxes[iflux, ...] / flux_norm return results @staticmethod def _get_eigenvalues( raw_data: Dict[str, Any], coords: Dict ) -> Dict[str, np.ndarray]: """ Parameters ---------- raw_data coords Returns ------- Dict of eigenvalues with coords (kx, ky, time) Only final time is output so we set that to all the times """ nky = len(coords["ky"]) nkx = len(coords["kx"]) ntime = len(coords["time"]) mode_frequency = np.empty((nkx, nky, ntime)) growth_rate = np.empty((nkx, nky, ntime)) with open(raw_data["omega"], "r") as csv_file: omega_data = csv.reader(csv_file, delimiter=" ", skipinitialspace=True) for iky, line in enumerate(omega_data): ky, growth, frequency = line mode_frequency[:, iky, :] = float(frequency) growth_rate[:, iky, :] = float(growth) results = {"growth_rate": growth_rate, "mode_frequency": mode_frequency} return results