from __future__ import annotations
import dataclasses
import warnings
from abc import abstractmethod
from typing import (
TYPE_CHECKING,
Any,
ClassVar,
Generator,
Iterable,
List,
NoReturn,
Optional,
Tuple,
)
import numpy as np
import pint
from numpy.typing import ArrayLike
from scipy.integrate import trapezoid
from ..dataset_wrapper import DatasetWrapper
from ..file_utils import ReadableFromFile
from ..normalisation import ConventionNormalisation, SimulationNormalisation
from ..typing import PathLike
from ..units import ureg as units
if TYPE_CHECKING:
import xarray as xr
[docs]
@dataclasses.dataclass
class GKOutputArgs:
"""
Utility base dataclass used to pass quantities to ``GKOutput``. Derived classes
include ``Coords``, ``Fields``, ``Fluxes``, etc. This class contains features such
as automatic unit conversion and a dict-like interface to quantities.
Derived classes should define an ``InitVar[Tuple[str, ...]]`` called ``dims``,
which sets the dimensionality of each quantity, e.g. ``("kx", "ky", "time")``.
This should be set in ``__post_init__``, along with a call to
``_set_and_check_dims``. This is to work around the pre-Python 3.10 dataclass
issues regarding keyword arguments in base classes
"""
@property
def names(self) -> Tuple[str, ...]:
"""Names of all quantities held by this dataclass"""
return tuple(x.name for x in dataclasses.fields(self))
[docs]
@abstractmethod
def units(self, name: str, c: ConventionNormalisation) -> pint.Unit:
"""Return the units for each quantity"""
pass
def __getitem__(self, key: str) -> Any:
"""Look up quantities with dict-like inteface"""
if key not in self.names:
raise KeyError(key)
return getattr(self, key)
def __setitem__(self, key: str, val: Any) -> None:
"""
Set quantities with dict-like inteface.
Warning: No validity checking is performed.
"""
if key not in self.names:
raise KeyError(key)
setattr(self, key, val)
@property
def coords(self) -> Tuple[str, ...]:
"""
Tuple containing the names of each supplied field (those that aren't ``None``).
"""
return tuple(k for k in self.names if self[k] is not None)
def __iter__(self) -> Generator[str, None, None]:
"""Iterate over quantity names. Skips ``None`` quantities."""
return iter(self.coords)
[docs]
def values(self) -> Generator[Any, None, None]:
"""Dict-like values iteration"""
try:
it = iter(self)
while True:
yield self[next(it)]
except StopIteration:
return
[docs]
def items(self) -> Generator[Tuple[str, Any], None, None]:
"""Dict-like items iteration"""
return zip(iter(self), self.values())
#: List of quantities that have normalised units, e.g. 'lref'.
#: Should be overridden in derived classes.
_has_normalised_units: ClassVar[Tuple[str, ...]] = ()
#: List of quantities that have physical units, e.g. 'radians'.
#: Should be overridden in derived classes.
_has_physical_units: ClassVar[Tuple[str, ...]] = ()
[docs]
def with_units(self, c: ConventionNormalisation):
"""
Apply units to each quantity in turn and return a new ``Coords``.
If units are already applied, renormalises according to the convention supplied.
"""
kwargs = {}
for key, val in self.items():
if val is None:
kwargs[key] = None
continue
if key in self._has_normalised_units:
if hasattr(val, "units"):
kwargs[key] = val.to(c)
else:
kwargs[key] = val * self.units(key, c)
continue
if key in self._has_physical_units:
if hasattr(val, "units"):
kwargs[key] = val.to(self.units(key, c))
else:
kwargs[key] = val * self.units(key, c)
continue
# Pass everything else through
kwargs[key] = val
# Pass through the pseudo-field 'dims'
if hasattr(self, "dims"):
kwargs["dims"] = self.dims
return self.__class__(**kwargs)
@property
def shape(self) -> Tuple[int, ...]:
"""
Shape of quantities. Raises error if all are None.
Should be overridden in ``Coords``, where this function makes no sense.
"""
for _, val in self.items():
if val is not None:
return np.shape(val)
raise ValueError("Fields contains no data")
def _set_and_check_dims(self, dims) -> None:
"""Set ``dims``, perform checks on the values assigned."""
self.dims = dims
for key, val in self.items():
if val is not None and np.ndim(val) != len(self.dims):
raise ValueError(f"Quantity '{key}' has incorrect number of dims")
[docs]
@dataclasses.dataclass
class Coords(GKOutputArgs):
"""Utility dataclass type used to pass coordinates to ``GKOutput``"""
#: 1D grid of radial wave-numbers used in the simulation
#: Units of [rhoref ** -1]
kx: ArrayLike
#: 1D grid of bi-normal wave-numbers used in the simulation
#: Units of [rhoref ** -1]
ky: ArrayLike
#: 1D grid of time of the simulation output
#: Units of [lref / vref]
time: ArrayLike
#: List of species names in the simulation
species: Iterable[str]
#: 1D grid of theta used in the simulation
#: Units of [radians]
theta: Optional[ArrayLike] = None
#: TODO document
mode: Optional[ArrayLike] = None
#: 1D grid of the energy in the simulation
#: Units of [dimensionless]
energy: Optional[ArrayLike] = None
#: 1D grid of the pitch angle in the simulation
#: Units of [dimensionless]
pitch: Optional[ArrayLike] = None
#: List of fields. Normally this information is obtained from the ``Fields`` class,
#: but there are edge cases in which no ``Fields`` can be supplied but a fields
#: coordinate must be defined.
field: Optional[Iterable[str]] = None
_has_physical_units: ClassVar[Tuple[str, ...]] = (
"theta",
"mode",
"energy",
"pitch",
)
_has_normalised_units: ClassVar[Tuple[str, ...]] = ("kx", "ky", "time")
[docs]
def units(self, name: str, c: ConventionNormalisation) -> pint.Unit:
if name not in self.names:
raise ValueError(f"The coord '{name}' is not recognised")
if name in ("kx", "ky"):
return c.rhoref**-1
if name == "time":
return c.lref / c.vref
if name == "theta":
return units.radians
return units.dimensionless
@property
def shape(self) -> NoReturn:
raise RuntimeError("Coords does not implement 'shape'")
[docs]
@dataclasses.dataclass
class Fields(GKOutputArgs):
"""Utility dataclass type used to pass field data to ``GKOutput``."""
#: :math:`\phi`: the electrostatic potential.
#: Units of ``[tref * rhoref / (qref * lref)]``
phi: Optional[ArrayLike] = None
#: :math:`A_\parallel`: Parallel component of the magnetic vector potential.
#: Units of ``[bref * rhoref**2 / lref]``.
apar: Optional[ArrayLike] = None
#: :math:`B_\parallel`: Parallel component of the magnetic flux density.
#: Units of ``[bref * rhoref / lref]``.
bpar: Optional[ArrayLike] = None
_has_normalised_units: ClassVar[Tuple[str, ...]] = ("phi", "apar", "bpar")
#: The dimensionality of the fields.
#: Each field should have the same number of dimensions.
dims: dataclasses.InitVar[Tuple[str, ...]] = ("theta", "kx", "ky", "t")
[docs]
def units(self, name: str, c: ConventionNormalisation) -> pint.Unit:
"""Return units associated with each field for a given convention"""
if name == "phi":
return c.tref / c.qref * c.rhoref / c.lref
elif name == "apar":
return c.bref * c.rhoref * c.rhoref / c.lref
elif name == "bpar":
return c.bref * c.rhoref / c.lref
else:
raise ValueError(f"Field name '{name}' not recognised")
def __post_init__(self, dims):
self._set_and_check_dims(dims)
[docs]
@dataclasses.dataclass
class Fluxes(GKOutputArgs):
"""Utility dataclass type used to pass fluxes to ``GKOutput``."""
#: Units of ``[nref * vref * (rhoref / lref)**2]``.
particle: Optional[ArrayLike] = None
#: Units of ``[nref * vref * tref * (rhoref / lref)**2]``.
heat: Optional[ArrayLike] = None
#: units of ``[nref * lref * tref * (rhoref / lref)**2]``.
momentum: Optional[ArrayLike] = None
_has_normalised_units: ClassVar[Tuple[str, ...]] = ("particle", "heat", "momentum")
#: The dimensionality of the fluxes.
#: Each array should have the same dimensionality.
dims: dataclasses.InitVar[Tuple[str, ...]] = ("field", "species", "kx", "ky", "t")
[docs]
def units(self, name: str, c: ConventionNormalisation) -> pint.Unit:
"""Return units associated with each flux for a given convention"""
if name == "particle":
return c.nref * c.vref * (c.rhoref / c.lref) ** 2
elif name == "momentum":
return c.nref * c.lref * c.tref * (c.rhoref / c.lref) ** 2
elif name == "heat":
return c.nref * c.vref * c.tref * (c.rhoref / c.lref) ** 2
else:
raise ValueError(f"Flux name '{name}' not recognised.")
def __post_init__(self, dims):
self._set_and_check_dims(dims)
[docs]
@dataclasses.dataclass
class Moments(GKOutputArgs):
"""Utility dataclass type used to pass moments to ``GKOutput``."""
#: Units of ``[nref * rhoref / lref)]``
density: Optional[ArrayLike] = None
#: Units of ``[tref * rhoref / lref]``
temperature: Optional[ArrayLike] = None
#: Units of ``[vref * rhoref / lref]``
velocity: Optional[ArrayLike] = None
_has_normalised_units: ClassVar[Tuple[str, ...]] = (
"density",
"temperature",
"velocity",
)
#: The dimensionality of the moments.
#: Each array should have the same dimensionality.
dims: dataclasses.InitVar[Tuple[str, ...]] = ("theta", "kx", "species", "ky", "t")
[docs]
def units(self, name: str, c: ConventionNormalisation) -> pint.Unit:
"""Return units associated with each moment for a given convention"""
if name == "density":
return c.nref * c.rhoref / c.lref
elif name == "temperature":
return c.tref * c.rhoref / c.lref
elif name == "velocity":
return c.vref * c.rhoref / c.lref
else:
raise ValueError(f"Moment name '{name}' not recognised.")
def __post_init__(self, dims):
self._set_and_check_dims(dims)
[docs]
@dataclasses.dataclass
class Eigenvalues(GKOutputArgs):
"""
Utility dataclass type used to pass eigenvalues to ``GKOutput``. Unlike the classes
``Fields``, ``Fluxes``, and ``Moments``, all entries to ``Eigenvalues`` are
non-optional.
"""
#: Units of ``[vref / lref]``.
growth_rate: ArrayLike
#: Units of ``[vref / lref]``.
mode_frequency: ArrayLike
_has_normalised_units: ClassVar[Tuple[str, ...]] = ("growth_rate", "mode_frequency")
#: The dimensionality of the eigenvalues
#: Each array should have the same dimensionality.
dims: dataclasses.InitVar[Tuple[str, ...]] = ("kx", "ky", "time")
[docs]
def units(self, name: str, c: ConventionNormalisation) -> pint.Unit:
"""Return units for a given convention"""
return c.vref / c.lref
def __post_init__(self, dims):
self._set_and_check_dims(dims)
[docs]
@dataclasses.dataclass
class Eigenfunctions(GKOutputArgs):
"""Utility dataclass type used to pass eigenfunctions to ``GKOutput``."""
#: Eigenfunction data to pass to ``GKOutput``
eigenfunctions: ArrayLike
#: The dimensionality of the eigenfunctions. Should match ``data``.
dims: dataclasses.InitVar[Tuple[str, ...]] = ("field", "theta", "kx", "ky", "time")
_has_normalised_units: ClassVar[Tuple[str, ...]] = ("eigenfunctions",)
[docs]
def units(self, name: str, c: ConventionNormalisation) -> pint.Unit:
"""Return units for a given convention"""
return units.dimensionless
def __post_init__(self, dims):
self._set_and_check_dims(dims)
# TODO define Diagnostics stuff on this class. could use accessors
# https://docs.xarray.dev/en/stable/internals/extending-xarray.html
[docs]
class GKOutput(DatasetWrapper, ReadableFromFile):
"""
Contains the output data from gyrokinetics codes. Converts the results of each code
to a standard set of normalisation conventions, which allows for easier cross-code
comparisons.
Users are not expected to initialise ``GKOutput`` objects directly,
and in most cases should instead make use of the function ``read_gk_output``.
The inputs to ``GKOutput`` should be given "physical units", as defined on the
`normalisation` page, appropriate to the code that generated the output
data. If inputs are not given units, it is assumed they are already compliant with
the Pyrokinetics standards.
Parameters
----------
coords
Dataclass specifying the coordinates of the simulation
norm
The normalisation scheme of the simulation.
fields
Dataclass specifying the fields in the simulation
fluxes
Dataclass specifying the fluxes in the simulation
moments
Dataclass specifying the moments in the simulation
eigenvalues
Dataclass specifying the eigenvalues in the simulation. Should only be supplied
for linear runs. If not provided, will be set from field data.
eigenfunctions
Dataclass specifying the eigenfunctions in the simulation Should only be
supplied for linear runs. If not provided, will be set from field data.
linear
Set True for linear gyrokinetics runs, False for nonlinear runs.
normalise_flux_moment
TODO write docs
gk_code:
The gyrokinetics code that generated the results.
input_file
The input file used to generate the results.
Attributes
----------
data: xarray.Dataset
The internal representation of the ``GKOutput`` object. The functions
``__getattr__`` and ``__getitem__`` redirect most attribute/indexing lookups
here, but the Dataset itself may be accessed directly by the user if they wish
to perform more complex manipulations.
linear: bool
``True`` if the run is linear, ``False`` if the run is nonlinear
gk_code: str
The gyrokinetics code that generated the data.
input_file: str
Gyrokinetics input file expressed as a string.
norm: SimulationNormalisation
The normalisation scheme used for the data.
"""
[docs]
def __init__(
self,
*, # args are keyword only
coords: Coords,
norm: SimulationNormalisation,
output_convention: str = "pyrokinetics",
fields: Optional[Fields] = None,
fluxes: Optional[Fluxes] = None,
moments: Optional[Moments] = None,
eigenvalues: Optional[Eigenvalues] = None,
eigenfunctions: Optional[Eigenfunctions] = None,
linear: bool = True,
normalise_flux_moment: bool = False,
gk_code: Optional[str] = None,
input_file: Optional[str] = None,
input_convention: Optional[str] = None,
jacobian: Optional[ArrayLike] = None,
):
self.norm = norm
convention = getattr(norm, output_convention.lower())
# Renormalise inputs
coords = coords.with_units(convention)
if fields is not None:
fields = fields.with_units(convention)
if fluxes is not None:
fluxes = fluxes.with_units(convention)
if moments is not None:
moments = moments.with_units(convention)
# Get coords to hand over to underlying Dataset
def make_var(dim, val, desc):
if val is None:
return None
if hasattr(val, "units"):
return (dim, val.m, {"units": str(val.u), "long_name": desc})
return (dim, val, {"units": None, "long_name": desc})
dataset_coords = {
"time": make_var("time", coords.time, "Time"),
"kx": make_var("kx", coords.kx, "Radial wavenumber"),
"ky": make_var("ky", coords.ky, "Bi-normal wavenumber"),
"theta": make_var("theta", coords.theta, "Angle"),
"energy": make_var("energy", coords.energy, "Energy"),
"pitch": make_var("pitch", coords.pitch, "Pitch angle"),
"mode": make_var("mode", coords.mode, "Mode"),
"species": make_var("species", coords.species, "Species"),
}
# Add field, flux and moment coords
if coords.field is not None:
dataset_coords["field"] = make_var("field", coords.field, "Field")
elif fields is not None:
dataset_coords["field"] = make_var(
"field", np.array(fields.coords), "Field"
)
if fluxes is not None:
dataset_coords["flux"] = make_var("flux", np.array(fluxes.coords), "Flux")
if moments is not None:
dataset_coords["moment"] = make_var(
"moment", np.array(moments.coords), "Moment"
)
# Remove None entries
dataset_coords = {k: v for k, v in dataset_coords.items() if v is not None}
# Renormalise fields, fluxes, moments
# Normalise QL fluxes and moments if linear and needed
if fields is not None and linear:
# Normalise fluxes to time varying fields (flux exponentially growing) or
# normalise to final value (flux already normalised in output)
final = not normalise_flux_moment
if fluxes is not None:
fluxes = self._normalise_to_fields(fields, coords, fluxes, final=final)
if moments is not None:
moments = self._normalise_to_fields(
fields, coords, moments, final=final
)
# Normalise fields to GKDB standard
if fields is not None and linear:
# Add time dimension back to match original shape
fields = self._normalise_linear_fields(fields, coords.theta.m)
# Set up data vars to hand over to underlying Dataset
data_vars = {}
if fields is not None:
field_desc = {
"phi": "Electrostatic potential",
"apar": "Parallel magnetic vector potential",
"bpar": "Parallel magnetic flux density",
}
for key in fields.coords:
data_vars[key] = make_var(fields.dims, fields[key], field_desc[key])
if moments is not None:
moment_desc = {
"density": "Density fluctuations",
"temperature": "Temperature fluctuations",
"velocity": "Velocity fluctuations",
}
for key in moments.coords:
data_vars[key] = make_var(moments.dims, moments[key], moment_desc[key])
if fluxes is not None:
flux_desc = {
"particle": "Particle flux",
"heat": "Heat flux",
"momentum": "Momentum flux",
}
for key in fluxes.coords:
data_vars[key] = make_var(fluxes.dims, fluxes[key], flux_desc[key])
# Add eigenvalues. If not provided, try to generate from fields
if eigenvalues is None and fields is not None and linear:
eigenvalues = self._eigenvalues_from_fields(
fields, coords.theta.magnitude, coords.time.magnitude
)
if eigenvalues is not None:
eigenvalues = eigenvalues.with_units(convention)
data_vars["growth_rate"] = make_var(
eigenvalues.dims, eigenvalues.growth_rate, "Growth rate"
)
data_vars["mode_frequency"] = make_var(
eigenvalues.dims, eigenvalues.mode_frequency, "Mode frequency"
)
data_vars["eigenvalues"] = make_var(
eigenvalues.dims,
eigenvalues.mode_frequency + 1.0j * eigenvalues.growth_rate,
"Eigenvalues",
)
# Add eigenfunctions. If not provided, try to generate from fields
if eigenfunctions is None and fields is not None and linear:
eigenfunctions = self._eigenfunctions_from_fields(fields, coords.theta.m)
if eigenfunctions is not None:
if fields is None:
eigenfunctions_data = eigenfunctions.eigenfunctions
eigenfunctions_dict = {}
for ifield, field in enumerate(coords.field):
eigenfunctions_dict[field] = eigenfunctions_data[ifield, ...]
field_norm = Fields(
**eigenfunctions_dict, dims=eigenfunctions.dims[1:]
).with_units(getattr(norm, gk_code.lower()))
field_norm = field_norm.with_units(convention)
amplitude = self._get_field_amplitude(
field_norm,
coords.theta.m,
)
for ifield, field in enumerate(coords.field):
eigenfunctions_data[ifield, ...] = (field_norm[field] / amplitude).m
eigenfunctions.eigenfunctions = eigenfunctions_data
data_vars["eigenfunctions"] = make_var(
eigenfunctions.dims,
eigenfunctions.eigenfunctions,
"Eigenfunctions",
)
if jacobian is not None:
data_vars["jacobian"] = make_var(("theta"), jacobian, "Jacobian")
# Set up attrs to hand over to underlying dataset
attrs = {
"linear": int(linear),
"gk_code": gk_code if gk_code is not None else "",
"input_file": input_file if input_file is not None else "",
}
# Hand over to underlying dataset
super().__init__(data_vars=data_vars, coords=dataset_coords, attrs=attrs)
# Calculate growth_rate_tolerance with default inputs
if eigenvalues is not None and linear:
if "time" in eigenvalues.dims:
self.data["growth_rate_tolerance"] = self.get_growth_rate_tolerance()
else:
self.data["growth_rate_tolerance"] = 0.0 * units.dimensionless
[docs]
def field(self, name: str) -> xr.DataArray:
if name not in Fields.names:
raise ValueError(
f"'name' should be one of {', '.join(Fields.names)}. Received '{name}'."
)
if name not in self.data_vars:
raise ValueError(f"GKOutput does not contain the field '{name}'")
return self.data_vars[name]
[docs]
def flux(self, name: str) -> xr.DataArray:
if name not in Fluxes.names:
raise ValueError(
f"'name' should be one of {', '.join(Fluxes.names)}. Received '{name}'"
)
if name not in self.data_vars:
raise ValueError(f"GKOutput does not contain the flux '{name}'")
return self.data_vars[name]
[docs]
def moment(self, name: str) -> xr.DataArray:
if name not in Moments.names:
raise ValueError(
f"'name' should be one of {', '.join(Moments.names)}. Received '{name}'"
)
if name not in self.data_vars:
raise ValueError(f"GKOutput does not contain the moment '{name}'")
return self.data_vars[name]
[docs]
def get_growth_rate_tolerance(self, time_range: float = 0.8) -> float:
"""
Given a pyrokinetics output dataset with eigenvalues determined, calculate the
growth rate tolerance. This is calculated starting at the time given by
time_range * max_time.
"""
if "growth_rate" not in self.data_vars:
raise ValueError(
"GKOutput does not have 'growth rate'. Only results associated with "
"linear gyrokinetics runs will have this."
)
# Check accuracy of frequency, starting at time_range*final_time
time = self["time"]
final_time = time.isel(time=-1)
mode_frequency = self["mode_frequency"]
mode_frequency = mode_frequency.where(
mode_frequency["time"] > time_range * final_time, drop=True
)
max_freq = np.pi / np.mean(np.diff(time))
if np.any(np.abs(mode_frequency.data.m) / max_freq > 0.5):
warnings.warn(
"Mode frequency may not be accurate due to low temporal resolution"
)
# Calculate growth rate tolerance
growth_rate = self["growth_rate"]
final_growth_rate = growth_rate.isel(time=-1)
difference = ((growth_rate - final_growth_rate) / final_growth_rate) ** 2
# Average over the end of the simulation, starting at time_range*final_time
difference = difference.where(
difference["time"] > time_range * final_time, drop=True
)
tolerance = np.sqrt(
difference.integrate("time")
/ (difference["time"].isel(time=-1) - difference["time"].isel(time=0))
)
return tolerance
@staticmethod
def _eigenvalues_from_fields(
fields: Fields,
theta: ArrayLike,
time: ArrayLike,
) -> Eigenvalues:
"""
Call during __init__ after converting to pyro normalisations
"""
# field dims are (theta, kx, ky, time)
# FIXME field dims are now variable!
shape = fields.shape
sum_fields = np.zeros(shape, dtype=complex)
square_fields = np.zeros(shape)
for field in fields.values():
sum_fields += field.magnitude
square_fields += np.abs(field.magnitude) ** 2
# Integrate over theta
field_amplitude = trapezoid(square_fields, theta, axis=0) ** 0.5
# Differentiate with respect to time and avoid 0 in log
log_field_amplitude = np.where(
field_amplitude > 0, np.log(field_amplitude), 0.0
)
growth_rate = np.gradient(log_field_amplitude, time, axis=-1)
field_angle = np.angle(trapezoid(sum_fields, theta, axis=0))
# Change angle by 2pi for every rotation so gradient is easier to calculate
pi_change = np.cumsum(
np.where(
field_angle[:, :, :-1] * field_angle[:, :, 1:] < -np.pi,
-np.sign(field_angle[:, :, 1:]),
0,
),
axis=-1,
)
field_angle[:, :, 1:] += 2 * np.pi * pi_change
mode_frequency = -np.gradient(field_angle, time, axis=-1)
max_freq = np.pi / np.mean(np.diff(time))
if np.any(np.abs(mode_frequency[:, :, -1]) / max_freq > 1):
warnings.warn(
"Mode frequency may not be accurate due to low temporal resolution"
)
return Eigenvalues(
growth_rate=growth_rate,
mode_frequency=mode_frequency,
)
@staticmethod
def _eigenfunctions_from_fields(fields: Fields, theta: ArrayLike) -> Eigenfunctions:
eigenfunctions = np.zeros((len(fields.coords),) + fields.shape, dtype=complex)
for ifield, field in enumerate(fields.values()):
eigenfunctions[ifield] = field.magnitude
eigenfunctions *= units.dimensionless
return Eigenfunctions(eigenfunctions, dims=("field",) + fields.dims)
@staticmethod
def _get_field_amplitude(fields: Fields, theta):
field_squared = 0.0
for field in fields.values():
field_squared += np.abs(field.m) ** 2
if len(theta) > 1:
amplitude = np.sqrt(trapezoid(field_squared, theta, axis=0) / (2 * np.pi))
else:
amplitude = np.sqrt(field_squared[0, ...] / (2 * np.pi))
if "phi" in fields.coords:
phase_field = "phi"
else:
phase_field = fields.coords[0]
phi = fields[phase_field]
if "time" in fields.dims:
# Check for final time slice with finite data
final_index = np.argwhere(np.isfinite(amplitude))[-1][-1]
if final_index != amplitude.shape[-1] - 1:
warnings.warn(
"Non-finite data found in fields. Likely to due NaN/Inf in GKoutput data"
)
phi_final = phi[:, :, :, final_index]
else:
phi_final = phi
if hasattr(phi_final, "magnitude"):
phi_final = phi_final.m
if "theta" in fields.dims:
theta_star_index = np.argmax(abs(phi_final), axis=0)
phi_theta_star = np.take_along_axis(
phi_final, theta_star_index[np.newaxis, ...], axis=0
).squeeze(0)
else:
phi_theta_star = phi_final
phase = np.exp(1j * np.angle(phi_theta_star))
if "time" in fields.dims:
phase = phase[..., np.newaxis]
normalising_factor = phase * amplitude
normalising_factor = np.where(normalising_factor == 0, 1.0, normalising_factor)
return normalising_factor
def _normalise_linear_fields(self, fields: Fields, theta) -> ArrayLike:
"""
Normalise fields as done in GKDB manual sec 5.5.3->5.5.5
Parameters
----------
fields
Returns
-------
fields
"""
amplitude = self._get_field_amplitude(fields, theta)
final_index = np.argwhere(np.isfinite(amplitude))[-1][-1]
final_amplitude = amplitude[..., final_index]
for f in fields:
fields[f] *= 1.0 / final_amplitude[..., np.newaxis]
return fields
def _normalise_to_fields(
self, fields: Fields, coords: Coords, outputs, final: bool = False
):
"""
Normalise output (moments/fluxes) to fields to obtain quasi-linear value
Only valid for linear simulations
Parameters
----------
fields : FieldDict
Field data used to normalise output
coords: Coords
Coordinate data
outputs: MomentDict or FluxDict
Output to renormalise
final: bool
Normalise to time series or just final value
Returns
-------
outputs: MomentDict or FluxDict
Re-normalised outputs
"""
theta = coords.theta.m
amplitude = self._get_field_amplitude(fields, theta)
if final:
final_index = np.argwhere(np.isfinite(amplitude))[-1][-1]
amplitude = amplitude[..., final_index]
amplitude = amplitude[..., np.newaxis]
# Indices should be (kx, ky, 1)
if "kx" not in outputs.dims and len(coords.kx) > 1:
amplitude = np.sum(amplitude, axis=0)
for output_name in outputs.coords:
outputs[output_name] /= np.abs(amplitude) ** 2
return outputs
[docs]
def to(self, norms: ConventionNormalisation, *contexts):
"""
Parameters
----------
norms : ConventionNormalisation
Normalisation convention to convert to
Returns
-------
GKOutput with units from norms
"""
for data_var in self.data_vars:
self.data[data_var].data = self.data[data_var].data.to(norms, *contexts)
# Coordinates with units not supported in xarray need to manually change
new_coords = {}
for coord in self.coords:
if hasattr(self.data[coord], "units"):
new_coord = (self.data[coord].data * self.data[coord].units).to(
norms, *contexts
)
new_coords[coord] = (
coord,
new_coord.m,
{"units": new_coord.units, "long_name": self.data[coord].long_name},
)
self.data = self.data.assign_coords(coords=new_coords)
[docs]
def convert_physical_units(self, norms):
"""Convert physical-unit data vars and coords to generic simulation units of ``norms``."""
for data_var in self.data_vars:
data = self.data[data_var].data
if hasattr(data, "convert_physical_units"):
self.data[data_var].data = data.convert_physical_units(norms)
new_coords = {}
for coord in self.coords:
if hasattr(self.data[coord], "units"):
quantity = self.data[coord].data * self.data[coord].units
if hasattr(quantity, "convert_physical_units"):
new_coord = quantity.convert_physical_units(norms)
new_coords[coord] = (
coord,
new_coord.m,
{
"units": new_coord.units,
"long_name": self.data[coord].long_name,
},
)
self.data = self.data.assign_coords(coords=new_coords)
[docs]
def add_data(
self,
name: str,
data: ArrayLike,
coords: Tuple,
units: pint.Unit,
output_convention="pyrokinetics",
):
"""
Modifies existing GKOutput by adding specified data
Parameters
----------
data : ArrayLike
N-D array of data to be added in
coords: Tuple
Coordinates of the data provided. Must match shape of data
units: pint.Unit
Units of data
"""
if not hasattr(data, "units"):
data *= units
convention = getattr(self.norm, output_convention.lower())
data = data.to(convention)
self.data[name] = (coords, data)
[docs]
def read_gk_output(
path: PathLike,
norm: SimulationNormalisation,
output_convention: ConventionNormalisation = "pyrokinetics",
load_fields=True,
load_fluxes=True,
load_moments=False,
gk_type: Optional[str] = None,
**kwargs,
):
r"""
Read gyrokinetics output file(s) from disk, returning an `GKOutput` instance.
Parameters
----------
convention
path: PathLike
Location of the file(s) on disk.
norm: SimulationNormalisation
The normalisation scheme of the simulation that produced the data. Usually
generated by a Pyro object.
output_convention: ConventionNormalisation, default "pyrokinetics"
Convention to convert output to
load_fields: bool, default True
Flag to load fields or not
load_fluxes: bool, default True
Flag to load fluxes or not
load_moments: bool, default False
Flag to load moments or not
gk_type: Optional[str]
String specifying the type of gyrokinetics files. If unset, the file type
will be inferred automatically. Specifying the file type may improve
performance.
**kwargs:
Keyword arguments forwarded to the gyrokinetics output file reader.
Raises
------
ValueError
If path doesn't exist or isn't a file.
"""
return GKOutput.from_file(
path,
file_type=gk_type,
norm=norm,
output_convention=output_convention,
load_fields=load_fields,
load_fluxes=load_fluxes,
load_moments=load_moments,
**kwargs,
)
[docs]
def flux_surface_average(self):
"""
averages the fields over theta weighted by the Jacobian
"""
field_list = ["phi", "apar", "bpar"]
for field in field_list:
if field in self.data_vars:
if "theta" not in self.data[field].dims:
raise ValueError(f"{field} does not have a 'theta' dimension")
self.data[field] = (self.data[field] * self.data["jacobian"]).sum(
dim="theta"
) / self.data["jacobian"].sum(dim="theta")
[docs]
def supported_gk_output_types() -> List[str]:
"""
Returns a list of all registered GKOutput file types. These file types are
readable by ``from_file``.
"""
return GKOutput.supported_file_types()