Source code for pyrokinetics.gk_code.gk_output

from __future__ import annotations

import dataclasses
import warnings
from abc import abstractmethod
from typing import (
    TYPE_CHECKING,
    Any,
    ClassVar,
    Generator,
    Iterable,
    List,
    NoReturn,
    Optional,
    Tuple,
)

import numpy as np
import pint
from numpy.typing import ArrayLike
from scipy.integrate import trapezoid

from ..dataset_wrapper import DatasetWrapper
from ..file_utils import ReadableFromFile
from ..normalisation import ConventionNormalisation, SimulationNormalisation
from ..typing import PathLike
from ..units import ureg as units

if TYPE_CHECKING:
    import xarray as xr


[docs] @dataclasses.dataclass class GKOutputArgs: """ Utility base dataclass used to pass quantities to ``GKOutput``. Derived classes include ``Coords``, ``Fields``, ``Fluxes``, etc. This class contains features such as automatic unit conversion and a dict-like interface to quantities. Derived classes should define an ``InitVar[Tuple[str, ...]]`` called ``dims``, which sets the dimensionality of each quantity, e.g. ``("kx", "ky", "time")``. This should be set in ``__post_init__``, along with a call to ``_set_and_check_dims``. This is to work around the pre-Python 3.10 dataclass issues regarding keyword arguments in base classes """ @property def names(self) -> Tuple[str, ...]: """Names of all quantities held by this dataclass""" return tuple(x.name for x in dataclasses.fields(self))
[docs] @abstractmethod def units(self, name: str, c: ConventionNormalisation) -> pint.Unit: """Return the units for each quantity""" pass
def __getitem__(self, key: str) -> Any: """Look up quantities with dict-like inteface""" if key not in self.names: raise KeyError(key) return getattr(self, key) def __setitem__(self, key: str, val: Any) -> None: """ Set quantities with dict-like inteface. Warning: No validity checking is performed. """ if key not in self.names: raise KeyError(key) setattr(self, key, val) @property def coords(self) -> Tuple[str, ...]: """ Tuple containing the names of each supplied field (those that aren't ``None``). """ return tuple(k for k in self.names if self[k] is not None) def __iter__(self) -> Generator[str, None, None]: """Iterate over quantity names. Skips ``None`` quantities.""" return iter(self.coords)
[docs] def values(self) -> Generator[Any, None, None]: """Dict-like values iteration""" try: it = iter(self) while True: yield self[next(it)] except StopIteration: return
[docs] def items(self) -> Generator[Tuple[str, Any], None, None]: """Dict-like items iteration""" return zip(iter(self), self.values())
#: List of quantities that have normalised units, e.g. 'lref'. #: Should be overridden in derived classes. _has_normalised_units: ClassVar[Tuple[str, ...]] = () #: List of quantities that have physical units, e.g. 'radians'. #: Should be overridden in derived classes. _has_physical_units: ClassVar[Tuple[str, ...]] = ()
[docs] def with_units(self, c: ConventionNormalisation): """ Apply units to each quantity in turn and return a new ``Coords``. If units are already applied, renormalises according to the convention supplied. """ kwargs = {} for key, val in self.items(): if val is None: kwargs[key] = None continue if key in self._has_normalised_units: if hasattr(val, "units"): kwargs[key] = val.to(c) else: kwargs[key] = val * self.units(key, c) continue if key in self._has_physical_units: if hasattr(val, "units"): kwargs[key] = val.to(self.units(key, c)) else: kwargs[key] = val * self.units(key, c) continue # Pass everything else through kwargs[key] = val # Pass through the pseudo-field 'dims' if hasattr(self, "dims"): kwargs["dims"] = self.dims return self.__class__(**kwargs)
@property def shape(self) -> Tuple[int, ...]: """ Shape of quantities. Raises error if all are None. Should be overridden in ``Coords``, where this function makes no sense. """ for _, val in self.items(): if val is not None: return np.shape(val) raise ValueError("Fields contains no data") def _set_and_check_dims(self, dims) -> None: """Set ``dims``, perform checks on the values assigned.""" self.dims = dims for key, val in self.items(): if val is not None and np.ndim(val) != len(self.dims): raise ValueError(f"Quantity '{key}' has incorrect number of dims")
[docs] @dataclasses.dataclass class Coords(GKOutputArgs): """Utility dataclass type used to pass coordinates to ``GKOutput``""" #: 1D grid of radial wave-numbers used in the simulation #: Units of [rhoref ** -1] kx: ArrayLike #: 1D grid of bi-normal wave-numbers used in the simulation #: Units of [rhoref ** -1] ky: ArrayLike #: 1D grid of time of the simulation output #: Units of [lref / vref] time: ArrayLike #: List of species names in the simulation species: Iterable[str] #: 1D grid of theta used in the simulation #: Units of [radians] theta: Optional[ArrayLike] = None #: TODO document mode: Optional[ArrayLike] = None #: 1D grid of the energy in the simulation #: Units of [dimensionless] energy: Optional[ArrayLike] = None #: 1D grid of the pitch angle in the simulation #: Units of [dimensionless] pitch: Optional[ArrayLike] = None #: List of fields. Normally this information is obtained from the ``Fields`` class, #: but there are edge cases in which no ``Fields`` can be supplied but a fields #: coordinate must be defined. field: Optional[Iterable[str]] = None _has_physical_units: ClassVar[Tuple[str, ...]] = ( "theta", "mode", "energy", "pitch", ) _has_normalised_units: ClassVar[Tuple[str, ...]] = ("kx", "ky", "time")
[docs] def units(self, name: str, c: ConventionNormalisation) -> pint.Unit: if name not in self.names: raise ValueError(f"The coord '{name}' is not recognised") if name in ("kx", "ky"): return c.rhoref**-1 if name == "time": return c.lref / c.vref if name == "theta": return units.radians return units.dimensionless
@property def shape(self) -> NoReturn: raise RuntimeError("Coords does not implement 'shape'")
[docs] @dataclasses.dataclass class Fields(GKOutputArgs): """Utility dataclass type used to pass field data to ``GKOutput``.""" #: :math:`\phi`: the electrostatic potential. #: Units of ``[tref * rhoref / (qref * lref)]`` phi: Optional[ArrayLike] = None #: :math:`A_\parallel`: Parallel component of the magnetic vector potential. #: Units of ``[bref * rhoref**2 / lref]``. apar: Optional[ArrayLike] = None #: :math:`B_\parallel`: Parallel component of the magnetic flux density. #: Units of ``[bref * rhoref / lref]``. bpar: Optional[ArrayLike] = None _has_normalised_units: ClassVar[Tuple[str, ...]] = ("phi", "apar", "bpar") #: The dimensionality of the fields. #: Each field should have the same number of dimensions. dims: dataclasses.InitVar[Tuple[str, ...]] = ("theta", "kx", "ky", "t")
[docs] def units(self, name: str, c: ConventionNormalisation) -> pint.Unit: """Return units associated with each field for a given convention""" if name == "phi": return c.tref / c.qref * c.rhoref / c.lref elif name == "apar": return c.bref * c.rhoref * c.rhoref / c.lref elif name == "bpar": return c.bref * c.rhoref / c.lref else: raise ValueError(f"Field name '{name}' not recognised")
def __post_init__(self, dims): self._set_and_check_dims(dims)
[docs] @dataclasses.dataclass class Fluxes(GKOutputArgs): """Utility dataclass type used to pass fluxes to ``GKOutput``.""" #: Units of ``[nref * vref * (rhoref / lref)**2]``. particle: Optional[ArrayLike] = None #: Units of ``[nref * vref * tref * (rhoref / lref)**2]``. heat: Optional[ArrayLike] = None #: units of ``[nref * lref * tref * (rhoref / lref)**2]``. momentum: Optional[ArrayLike] = None _has_normalised_units: ClassVar[Tuple[str, ...]] = ("particle", "heat", "momentum") #: The dimensionality of the fluxes. #: Each array should have the same dimensionality. dims: dataclasses.InitVar[Tuple[str, ...]] = ("field", "species", "kx", "ky", "t")
[docs] def units(self, name: str, c: ConventionNormalisation) -> pint.Unit: """Return units associated with each flux for a given convention""" if name == "particle": return c.nref * c.vref * (c.rhoref / c.lref) ** 2 elif name == "momentum": return c.nref * c.lref * c.tref * (c.rhoref / c.lref) ** 2 elif name == "heat": return c.nref * c.vref * c.tref * (c.rhoref / c.lref) ** 2 else: raise ValueError(f"Flux name '{name}' not recognised.")
def __post_init__(self, dims): self._set_and_check_dims(dims)
[docs] @dataclasses.dataclass class Moments(GKOutputArgs): """Utility dataclass type used to pass moments to ``GKOutput``.""" #: Units of ``[nref * rhoref / lref)]`` density: Optional[ArrayLike] = None #: Units of ``[tref * rhoref / lref]`` temperature: Optional[ArrayLike] = None #: Units of ``[vref * rhoref / lref]`` velocity: Optional[ArrayLike] = None _has_normalised_units: ClassVar[Tuple[str, ...]] = ( "density", "temperature", "velocity", ) #: The dimensionality of the moments. #: Each array should have the same dimensionality. dims: dataclasses.InitVar[Tuple[str, ...]] = ("theta", "kx", "species", "ky", "t")
[docs] def units(self, name: str, c: ConventionNormalisation) -> pint.Unit: """Return units associated with each moment for a given convention""" if name == "density": return c.nref * c.rhoref / c.lref elif name == "temperature": return c.tref * c.rhoref / c.lref elif name == "velocity": return c.vref * c.rhoref / c.lref else: raise ValueError(f"Moment name '{name}' not recognised.")
def __post_init__(self, dims): self._set_and_check_dims(dims)
[docs] @dataclasses.dataclass class Eigenvalues(GKOutputArgs): """ Utility dataclass type used to pass eigenvalues to ``GKOutput``. Unlike the classes ``Fields``, ``Fluxes``, and ``Moments``, all entries to ``Eigenvalues`` are non-optional. """ #: Units of ``[vref / lref]``. growth_rate: ArrayLike #: Units of ``[vref / lref]``. mode_frequency: ArrayLike _has_normalised_units: ClassVar[Tuple[str, ...]] = ("growth_rate", "mode_frequency") #: The dimensionality of the eigenvalues #: Each array should have the same dimensionality. dims: dataclasses.InitVar[Tuple[str, ...]] = ("kx", "ky", "time")
[docs] def units(self, name: str, c: ConventionNormalisation) -> pint.Unit: """Return units for a given convention""" return c.vref / c.lref
def __post_init__(self, dims): self._set_and_check_dims(dims)
[docs] @dataclasses.dataclass class Eigenfunctions(GKOutputArgs): """Utility dataclass type used to pass eigenfunctions to ``GKOutput``.""" #: Eigenfunction data to pass to ``GKOutput`` eigenfunctions: ArrayLike #: The dimensionality of the eigenfunctions. Should match ``data``. dims: dataclasses.InitVar[Tuple[str, ...]] = ("field", "theta", "kx", "ky", "time") _has_normalised_units: ClassVar[Tuple[str, ...]] = ("eigenfunctions",)
[docs] def units(self, name: str, c: ConventionNormalisation) -> pint.Unit: """Return units for a given convention""" return units.dimensionless
def __post_init__(self, dims): self._set_and_check_dims(dims)
# TODO define Diagnostics stuff on this class. could use accessors # https://docs.xarray.dev/en/stable/internals/extending-xarray.html
[docs] class GKOutput(DatasetWrapper, ReadableFromFile): """ Contains the output data from gyrokinetics codes. Converts the results of each code to a standard set of normalisation conventions, which allows for easier cross-code comparisons. Users are not expected to initialise ``GKOutput`` objects directly, and in most cases should instead make use of the function ``read_gk_output``. The inputs to ``GKOutput`` should be given "physical units", as defined on the `normalisation` page, appropriate to the code that generated the output data. If inputs are not given units, it is assumed they are already compliant with the Pyrokinetics standards. Parameters ---------- coords Dataclass specifying the coordinates of the simulation norm The normalisation scheme of the simulation. fields Dataclass specifying the fields in the simulation fluxes Dataclass specifying the fluxes in the simulation moments Dataclass specifying the moments in the simulation eigenvalues Dataclass specifying the eigenvalues in the simulation. Should only be supplied for linear runs. If not provided, will be set from field data. eigenfunctions Dataclass specifying the eigenfunctions in the simulation Should only be supplied for linear runs. If not provided, will be set from field data. linear Set True for linear gyrokinetics runs, False for nonlinear runs. normalise_flux_moment TODO write docs gk_code: The gyrokinetics code that generated the results. input_file The input file used to generate the results. Attributes ---------- data: xarray.Dataset The internal representation of the ``GKOutput`` object. The functions ``__getattr__`` and ``__getitem__`` redirect most attribute/indexing lookups here, but the Dataset itself may be accessed directly by the user if they wish to perform more complex manipulations. linear: bool ``True`` if the run is linear, ``False`` if the run is nonlinear gk_code: str The gyrokinetics code that generated the data. input_file: str Gyrokinetics input file expressed as a string. norm: SimulationNormalisation The normalisation scheme used for the data. """
[docs] def __init__( self, *, # args are keyword only coords: Coords, norm: SimulationNormalisation, output_convention: str = "pyrokinetics", fields: Optional[Fields] = None, fluxes: Optional[Fluxes] = None, moments: Optional[Moments] = None, eigenvalues: Optional[Eigenvalues] = None, eigenfunctions: Optional[Eigenfunctions] = None, linear: bool = True, normalise_flux_moment: bool = False, gk_code: Optional[str] = None, input_file: Optional[str] = None, input_convention: Optional[str] = None, jacobian: Optional[ArrayLike] = None, ): self.norm = norm convention = getattr(norm, output_convention.lower()) # Renormalise inputs coords = coords.with_units(convention) if fields is not None: fields = fields.with_units(convention) if fluxes is not None: fluxes = fluxes.with_units(convention) if moments is not None: moments = moments.with_units(convention) # Get coords to hand over to underlying Dataset def make_var(dim, val, desc): if val is None: return None if hasattr(val, "units"): return (dim, val.m, {"units": str(val.u), "long_name": desc}) return (dim, val, {"units": None, "long_name": desc}) dataset_coords = { "time": make_var("time", coords.time, "Time"), "kx": make_var("kx", coords.kx, "Radial wavenumber"), "ky": make_var("ky", coords.ky, "Bi-normal wavenumber"), "theta": make_var("theta", coords.theta, "Angle"), "energy": make_var("energy", coords.energy, "Energy"), "pitch": make_var("pitch", coords.pitch, "Pitch angle"), "mode": make_var("mode", coords.mode, "Mode"), "species": make_var("species", coords.species, "Species"), } # Add field, flux and moment coords if coords.field is not None: dataset_coords["field"] = make_var("field", coords.field, "Field") elif fields is not None: dataset_coords["field"] = make_var( "field", np.array(fields.coords), "Field" ) if fluxes is not None: dataset_coords["flux"] = make_var("flux", np.array(fluxes.coords), "Flux") if moments is not None: dataset_coords["moment"] = make_var( "moment", np.array(moments.coords), "Moment" ) # Remove None entries dataset_coords = {k: v for k, v in dataset_coords.items() if v is not None} # Renormalise fields, fluxes, moments # Normalise QL fluxes and moments if linear and needed if fields is not None and linear: # Normalise fluxes to time varying fields (flux exponentially growing) or # normalise to final value (flux already normalised in output) final = not normalise_flux_moment if fluxes is not None: fluxes = self._normalise_to_fields(fields, coords, fluxes, final=final) if moments is not None: moments = self._normalise_to_fields( fields, coords, moments, final=final ) # Normalise fields to GKDB standard if fields is not None and linear: # Add time dimension back to match original shape fields = self._normalise_linear_fields(fields, coords.theta.m) # Set up data vars to hand over to underlying Dataset data_vars = {} if fields is not None: field_desc = { "phi": "Electrostatic potential", "apar": "Parallel magnetic vector potential", "bpar": "Parallel magnetic flux density", } for key in fields.coords: data_vars[key] = make_var(fields.dims, fields[key], field_desc[key]) if moments is not None: moment_desc = { "density": "Density fluctuations", "temperature": "Temperature fluctuations", "velocity": "Velocity fluctuations", } for key in moments.coords: data_vars[key] = make_var(moments.dims, moments[key], moment_desc[key]) if fluxes is not None: flux_desc = { "particle": "Particle flux", "heat": "Heat flux", "momentum": "Momentum flux", } for key in fluxes.coords: data_vars[key] = make_var(fluxes.dims, fluxes[key], flux_desc[key]) # Add eigenvalues. If not provided, try to generate from fields if eigenvalues is None and fields is not None and linear: eigenvalues = self._eigenvalues_from_fields( fields, coords.theta.magnitude, coords.time.magnitude ) if eigenvalues is not None: eigenvalues = eigenvalues.with_units(convention) data_vars["growth_rate"] = make_var( eigenvalues.dims, eigenvalues.growth_rate, "Growth rate" ) data_vars["mode_frequency"] = make_var( eigenvalues.dims, eigenvalues.mode_frequency, "Mode frequency" ) data_vars["eigenvalues"] = make_var( eigenvalues.dims, eigenvalues.mode_frequency + 1.0j * eigenvalues.growth_rate, "Eigenvalues", ) # Add eigenfunctions. If not provided, try to generate from fields if eigenfunctions is None and fields is not None and linear: eigenfunctions = self._eigenfunctions_from_fields(fields, coords.theta.m) if eigenfunctions is not None: if fields is None: eigenfunctions_data = eigenfunctions.eigenfunctions eigenfunctions_dict = {} for ifield, field in enumerate(coords.field): eigenfunctions_dict[field] = eigenfunctions_data[ifield, ...] field_norm = Fields( **eigenfunctions_dict, dims=eigenfunctions.dims[1:] ).with_units(getattr(norm, gk_code.lower())) field_norm = field_norm.with_units(convention) amplitude = self._get_field_amplitude( field_norm, coords.theta.m, ) for ifield, field in enumerate(coords.field): eigenfunctions_data[ifield, ...] = (field_norm[field] / amplitude).m eigenfunctions.eigenfunctions = eigenfunctions_data data_vars["eigenfunctions"] = make_var( eigenfunctions.dims, eigenfunctions.eigenfunctions, "Eigenfunctions", ) if jacobian is not None: data_vars["jacobian"] = make_var(("theta"), jacobian, "Jacobian") # Set up attrs to hand over to underlying dataset attrs = { "linear": int(linear), "gk_code": gk_code if gk_code is not None else "", "input_file": input_file if input_file is not None else "", } # Hand over to underlying dataset super().__init__(data_vars=data_vars, coords=dataset_coords, attrs=attrs) # Calculate growth_rate_tolerance with default inputs if eigenvalues is not None and linear: if "time" in eigenvalues.dims: self.data["growth_rate_tolerance"] = self.get_growth_rate_tolerance() else: self.data["growth_rate_tolerance"] = 0.0 * units.dimensionless
[docs] def field(self, name: str) -> xr.DataArray: if name not in Fields.names: raise ValueError( f"'name' should be one of {', '.join(Fields.names)}. Received '{name}'." ) if name not in self.data_vars: raise ValueError(f"GKOutput does not contain the field '{name}'") return self.data_vars[name]
[docs] def flux(self, name: str) -> xr.DataArray: if name not in Fluxes.names: raise ValueError( f"'name' should be one of {', '.join(Fluxes.names)}. Received '{name}'" ) if name not in self.data_vars: raise ValueError(f"GKOutput does not contain the flux '{name}'") return self.data_vars[name]
[docs] def moment(self, name: str) -> xr.DataArray: if name not in Moments.names: raise ValueError( f"'name' should be one of {', '.join(Moments.names)}. Received '{name}'" ) if name not in self.data_vars: raise ValueError(f"GKOutput does not contain the moment '{name}'") return self.data_vars[name]
[docs] def get_growth_rate_tolerance(self, time_range: float = 0.8) -> float: """ Given a pyrokinetics output dataset with eigenvalues determined, calculate the growth rate tolerance. This is calculated starting at the time given by time_range * max_time. """ if "growth_rate" not in self.data_vars: raise ValueError( "GKOutput does not have 'growth rate'. Only results associated with " "linear gyrokinetics runs will have this." ) # Check accuracy of frequency, starting at time_range*final_time time = self["time"] final_time = time.isel(time=-1) mode_frequency = self["mode_frequency"] mode_frequency = mode_frequency.where( mode_frequency["time"] > time_range * final_time, drop=True ) max_freq = np.pi / np.mean(np.diff(time)) if np.any(np.abs(mode_frequency.data.m) / max_freq > 0.5): warnings.warn( "Mode frequency may not be accurate due to low temporal resolution" ) # Calculate growth rate tolerance growth_rate = self["growth_rate"] final_growth_rate = growth_rate.isel(time=-1) difference = ((growth_rate - final_growth_rate) / final_growth_rate) ** 2 # Average over the end of the simulation, starting at time_range*final_time difference = difference.where( difference["time"] > time_range * final_time, drop=True ) tolerance = np.sqrt( difference.integrate("time") / (difference["time"].isel(time=-1) - difference["time"].isel(time=0)) ) return tolerance
@staticmethod def _eigenvalues_from_fields( fields: Fields, theta: ArrayLike, time: ArrayLike, ) -> Eigenvalues: """ Call during __init__ after converting to pyro normalisations """ # field dims are (theta, kx, ky, time) # FIXME field dims are now variable! shape = fields.shape sum_fields = np.zeros(shape, dtype=complex) square_fields = np.zeros(shape) for field in fields.values(): sum_fields += field.magnitude square_fields += np.abs(field.magnitude) ** 2 # Integrate over theta field_amplitude = trapezoid(square_fields, theta, axis=0) ** 0.5 # Differentiate with respect to time and avoid 0 in log log_field_amplitude = np.where( field_amplitude > 0, np.log(field_amplitude), 0.0 ) growth_rate = np.gradient(log_field_amplitude, time, axis=-1) field_angle = np.angle(trapezoid(sum_fields, theta, axis=0)) # Change angle by 2pi for every rotation so gradient is easier to calculate pi_change = np.cumsum( np.where( field_angle[:, :, :-1] * field_angle[:, :, 1:] < -np.pi, -np.sign(field_angle[:, :, 1:]), 0, ), axis=-1, ) field_angle[:, :, 1:] += 2 * np.pi * pi_change mode_frequency = -np.gradient(field_angle, time, axis=-1) max_freq = np.pi / np.mean(np.diff(time)) if np.any(np.abs(mode_frequency[:, :, -1]) / max_freq > 1): warnings.warn( "Mode frequency may not be accurate due to low temporal resolution" ) return Eigenvalues( growth_rate=growth_rate, mode_frequency=mode_frequency, ) @staticmethod def _eigenfunctions_from_fields(fields: Fields, theta: ArrayLike) -> Eigenfunctions: eigenfunctions = np.zeros((len(fields.coords),) + fields.shape, dtype=complex) for ifield, field in enumerate(fields.values()): eigenfunctions[ifield] = field.magnitude eigenfunctions *= units.dimensionless return Eigenfunctions(eigenfunctions, dims=("field",) + fields.dims) @staticmethod def _get_field_amplitude(fields: Fields, theta): field_squared = 0.0 for field in fields.values(): field_squared += np.abs(field.m) ** 2 if len(theta) > 1: amplitude = np.sqrt(trapezoid(field_squared, theta, axis=0) / (2 * np.pi)) else: amplitude = np.sqrt(field_squared[0, ...] / (2 * np.pi)) if "phi" in fields.coords: phase_field = "phi" else: phase_field = fields.coords[0] phi = fields[phase_field] if "time" in fields.dims: # Check for final time slice with finite data final_index = np.argwhere(np.isfinite(amplitude))[-1][-1] if final_index != amplitude.shape[-1] - 1: warnings.warn( "Non-finite data found in fields. Likely to due NaN/Inf in GKoutput data" ) phi_final = phi[:, :, :, final_index] else: phi_final = phi if hasattr(phi_final, "magnitude"): phi_final = phi_final.m if "theta" in fields.dims: theta_star_index = np.argmax(abs(phi_final), axis=0) phi_theta_star = np.take_along_axis( phi_final, theta_star_index[np.newaxis, ...], axis=0 ).squeeze(0) else: phi_theta_star = phi_final phase = np.exp(1j * np.angle(phi_theta_star)) if "time" in fields.dims: phase = phase[..., np.newaxis] normalising_factor = phase * amplitude normalising_factor = np.where(normalising_factor == 0, 1.0, normalising_factor) return normalising_factor def _normalise_linear_fields(self, fields: Fields, theta) -> ArrayLike: """ Normalise fields as done in GKDB manual sec 5.5.3->5.5.5 Parameters ---------- fields Returns ------- fields """ amplitude = self._get_field_amplitude(fields, theta) final_index = np.argwhere(np.isfinite(amplitude))[-1][-1] final_amplitude = amplitude[..., final_index] for f in fields: fields[f] *= 1.0 / final_amplitude[..., np.newaxis] return fields def _normalise_to_fields( self, fields: Fields, coords: Coords, outputs, final: bool = False ): """ Normalise output (moments/fluxes) to fields to obtain quasi-linear value Only valid for linear simulations Parameters ---------- fields : FieldDict Field data used to normalise output coords: Coords Coordinate data outputs: MomentDict or FluxDict Output to renormalise final: bool Normalise to time series or just final value Returns ------- outputs: MomentDict or FluxDict Re-normalised outputs """ theta = coords.theta.m amplitude = self._get_field_amplitude(fields, theta) if final: final_index = np.argwhere(np.isfinite(amplitude))[-1][-1] amplitude = amplitude[..., final_index] amplitude = amplitude[..., np.newaxis] # Indices should be (kx, ky, 1) if "kx" not in outputs.dims and len(coords.kx) > 1: amplitude = np.sum(amplitude, axis=0) for output_name in outputs.coords: outputs[output_name] /= np.abs(amplitude) ** 2 return outputs
[docs] def to(self, norms: ConventionNormalisation, *contexts): """ Parameters ---------- norms : ConventionNormalisation Normalisation convention to convert to Returns ------- GKOutput with units from norms """ for data_var in self.data_vars: self.data[data_var].data = self.data[data_var].data.to(norms, *contexts) # Coordinates with units not supported in xarray need to manually change new_coords = {} for coord in self.coords: if hasattr(self.data[coord], "units"): new_coord = (self.data[coord].data * self.data[coord].units).to( norms, *contexts ) new_coords[coord] = ( coord, new_coord.m, {"units": new_coord.units, "long_name": self.data[coord].long_name}, ) self.data = self.data.assign_coords(coords=new_coords)
[docs] def convert_physical_units(self, norms): """Convert physical-unit data vars and coords to generic simulation units of ``norms``.""" for data_var in self.data_vars: data = self.data[data_var].data if hasattr(data, "convert_physical_units"): self.data[data_var].data = data.convert_physical_units(norms) new_coords = {} for coord in self.coords: if hasattr(self.data[coord], "units"): quantity = self.data[coord].data * self.data[coord].units if hasattr(quantity, "convert_physical_units"): new_coord = quantity.convert_physical_units(norms) new_coords[coord] = ( coord, new_coord.m, { "units": new_coord.units, "long_name": self.data[coord].long_name, }, ) self.data = self.data.assign_coords(coords=new_coords)
[docs] def add_data( self, name: str, data: ArrayLike, coords: Tuple, units: pint.Unit, output_convention="pyrokinetics", ): """ Modifies existing GKOutput by adding specified data Parameters ---------- data : ArrayLike N-D array of data to be added in coords: Tuple Coordinates of the data provided. Must match shape of data units: pint.Unit Units of data """ if not hasattr(data, "units"): data *= units convention = getattr(self.norm, output_convention.lower()) data = data.to(convention) self.data[name] = (coords, data)
[docs] def read_gk_output( path: PathLike, norm: SimulationNormalisation, output_convention: ConventionNormalisation = "pyrokinetics", load_fields=True, load_fluxes=True, load_moments=False, gk_type: Optional[str] = None, **kwargs, ): r""" Read gyrokinetics output file(s) from disk, returning an `GKOutput` instance. Parameters ---------- convention path: PathLike Location of the file(s) on disk. norm: SimulationNormalisation The normalisation scheme of the simulation that produced the data. Usually generated by a Pyro object. output_convention: ConventionNormalisation, default "pyrokinetics" Convention to convert output to load_fields: bool, default True Flag to load fields or not load_fluxes: bool, default True Flag to load fluxes or not load_moments: bool, default False Flag to load moments or not gk_type: Optional[str] String specifying the type of gyrokinetics files. If unset, the file type will be inferred automatically. Specifying the file type may improve performance. **kwargs: Keyword arguments forwarded to the gyrokinetics output file reader. Raises ------ ValueError If path doesn't exist or isn't a file. """ return GKOutput.from_file( path, file_type=gk_type, norm=norm, output_convention=output_convention, load_fields=load_fields, load_fluxes=load_fluxes, load_moments=load_moments, **kwargs, )
[docs] def flux_surface_average(self): """ averages the fields over theta weighted by the Jacobian """ field_list = ["phi", "apar", "bpar"] for field in field_list: if field in self.data_vars: if "theta" not in self.data[field].dims: raise ValueError(f"{field} does not have a 'theta' dimension") self.data[field] = (self.data[field] * self.data["jacobian"]).sum( dim="theta" ) / self.data["jacobian"].sum(dim="theta")
[docs] def supported_gk_output_types() -> List[str]: """ Returns a list of all registered GKOutput file types. These file types are readable by ``from_file``. """ return GKOutput.supported_file_types()