Source code for pyrokinetics.kinetics.iterdb

import re
from textwrap import dedent
from typing import Optional

import numpy as np
from path import Path

from ..constants import deuterium_mass, electron_mass, hydrogen_mass, tritium_mass
from ..equilibrium import Equilibrium
from ..file_utils import FileReader
from ..species import Species
from ..typing import PathLike
from ..units import UnitSpline
from ..units import ureg as units
from .kinetics import Kinetics

_FLOAT_PATTERN = re.compile(r"[+-]?(?:(?:\d+\.\d*)|(?:\.\d+)|(?:\d+))(?:[Ee][+-]?\d+)?")

_ION_MASSES = {
    "hydrogen": hydrogen_mass,
    "deuterium": deuterium_mass,
    "tritium": tritium_mass,
}


def _numbers_from_line(line):
    return [float(match.group(0)) for match in _FLOAT_PATTERN.finditer(line)]


def _read_iterdb_blocks(filename: PathLike):
    with open(filename) as file:
        lines = file.readlines()

    blocks = {}
    index = 0
    while index < len(lines):
        line = lines[index]
        if "DEPENDENT VARIABLE LABEL" not in line or "INDEPENDENT" in line:
            index += 1
            continue

        variable = line.split()[0].upper()

        while index < len(lines) and "# OF X PTS" not in lines[index]:
            index += 1
        if index == len(lines):
            raise ValueError(f"ITERDB block '{variable}' is missing X grid size")
        nx = int(_numbers_from_line(lines[index])[0])

        index += 1
        if index == len(lines) or "# OF Y PTS" not in lines[index]:
            raise ValueError(f"ITERDB block '{variable}' is missing Y grid size")
        ny = int(_numbers_from_line(lines[index])[0])

        expected_values = nx + ny + nx * ny
        values = []
        index += 1
        while index < len(lines) and len(values) < expected_values:
            values.extend(_numbers_from_line(lines[index]))
            index += 1

        if len(values) < expected_values:
            raise ValueError(
                f"ITERDB block '{variable}' ended before all data was read"
            )

        x_grid = np.asarray(values[:nx])
        time_grid = np.asarray(values[nx : nx + ny])
        data = np.asarray(values[nx + ny : expected_values]).reshape(ny, nx)
        blocks[variable] = {
            "x": x_grid,
            "time": time_grid,
            "data": data,
        }

    if not blocks:
        raise ValueError("No ITERDB UFILE profile blocks were found")

    return blocks


def _select_time_slice(block, time_index: int = -1, time: Optional[float] = None):
    if time_index != -1 and time is not None:
        raise ValueError("Cannot set both `time` and `time_index`")

    if time is not None:
        time_index = int(np.argmin(np.abs(block["time"] - time)))
    return block["x"], block["data"][time_index]


def _profile(blocks, name, time_index: int, time: Optional[float], required=True):
    try:
        return _select_time_slice(blocks[name], time_index, time)
    except KeyError:
        if required:
            raise ValueError(f"ITERDB file does not contain required profile '{name}'")
        return None


def _rho_from_equilibrium(eq: Equilibrium):
    rho = eq["r_minor"].data
    rho = rho / rho[-1] * units.lref_minor_radius
    return UnitSpline(eq["psi_n"].data, rho)


def _psi_n_from_rho_tor(eq: Equilibrium, rho_tor):
    rho_tor_grid = eq["rho_tor"].data
    psi_n_grid = eq["psi_n"].data
    rho_tor_values = np.asarray(
        rho_tor_grid.magnitude if hasattr(rho_tor_grid, "magnitude") else rho_tor_grid,
        dtype=float,
    )
    psi_n_values = np.asarray(
        psi_n_grid.magnitude if hasattr(psi_n_grid, "magnitude") else psi_n_grid,
        dtype=float,
    )
    return np.interp(np.asarray(rho_tor, dtype=float), rho_tor_values, psi_n_values)


[docs] class KineticsReaderITERDB(FileReader, file_type="ITERDB", reads=Kinetics):
[docs] def read_from_file( self, filename: PathLike, eq: Equilibrium = None, time_index: int = -1, time: Optional[float] = None, main_ion: str = "deuterium", main_ion_charge: float = 1.0, main_ion_mass=None, rotation_sign: float = 1.0, use_rhotor_as_rho: bool = False, ) -> Kinetics: """ Reads an ITERDB/UFILE-style profile file. The reader expects profile blocks with ``RHOTOR`` as the independent radial coordinate and at least ``TE``, ``TI``, ``NE`` and ``NM1``. ``VROT`` is used when present, otherwise the angular rotation is set to zero. """ filename = Path(filename) if not filename.is_file(): raise FileNotFoundError(filename) if eq is None and not use_rhotor_as_rho: raise ValueError(dedent(f"""\ {self.__class__.__name__} must be provided with an Equilibrium object via the keyword argument 'eq'. Please load an Equilibrium. """)) blocks = _read_iterdb_blocks(filename) te_rhotor, te = _profile(blocks, "TE", time_index, time) ti_rhotor, ti = _profile(blocks, "TI", time_index, time) ne_rhotor, ne = _profile(blocks, "NE", time_index, time) ni_rhotor, ni = _profile(blocks, "NM1", time_index, time) vrot_profile = _profile(blocks, "VROT", time_index, time, required=False) if eq is None: electron_coord = te_rhotor * units.dimensionless ion_temp_coord = ti_rhotor * units.dimensionless electron_dens_coord = ne_rhotor * units.dimensionless ion_dens_coord = ni_rhotor * units.dimensionless else: electron_coord = _psi_n_from_rho_tor(eq, te_rhotor) * units.dimensionless ion_temp_coord = _psi_n_from_rho_tor(eq, ti_rhotor) * units.dimensionless electron_dens_coord = ( _psi_n_from_rho_tor(eq, ne_rhotor) * units.dimensionless ) ion_dens_coord = _psi_n_from_rho_tor(eq, ni_rhotor) * units.dimensionless electron_temp_func = UnitSpline(electron_coord, te * units.eV) ion_temp_func = UnitSpline(ion_temp_coord, ti * units.eV) electron_dens_func = UnitSpline(electron_dens_coord, ne * units.meter**-3) ion_dens_func = UnitSpline(ion_dens_coord, ni * units.meter**-3) if vrot_profile is None: omega_coord = electron_coord omega = np.zeros(len(te_rhotor)) * units.radians / units.second else: vrot_rhotor, vrot = vrot_profile if eq is None: omega_coord = vrot_rhotor * units.dimensionless else: omega_coord = _psi_n_from_rho_tor(eq, vrot_rhotor) * units.dimensionless omega = rotation_sign * vrot * units.radians / units.second omega_func = UnitSpline(omega_coord, omega) if use_rhotor_as_rho: rho_func = UnitSpline(electron_coord, te_rhotor * units.lref_minor_radius) else: rho_func = _rho_from_equilibrium(eq) unit_charge_array = np.ones(len(electron_coord)) electron_charge = UnitSpline( electron_coord, -1 * unit_charge_array * units.elementary_charge ) if main_ion_mass is None: try: main_ion_mass = _ION_MASSES[main_ion.lower()] except KeyError: raise ValueError( "main_ion_mass must be provided when main_ion is not one of " f"{', '.join(sorted(_ION_MASSES))}" ) if not hasattr(main_ion_mass, "units"): main_ion_mass = main_ion_mass * units.kg ion_charge = main_ion_charge * units.elementary_charge ion_charge_func = UnitSpline(electron_coord, ion_charge * unit_charge_array) electron = Species( species_type="electron", charge=electron_charge, mass=electron_mass, dens=electron_dens_func, temp=electron_temp_func, omega0=omega_func, rho=rho_func, ) ion = Species( species_type=main_ion, charge=ion_charge_func, mass=main_ion_mass, dens=ion_dens_func, temp=ion_temp_func, omega0=omega_func, rho=rho_func, ) return Kinetics(kinetics_type="ITERDB", electron=electron, **{main_ion: ion})
[docs] def verify_file_type(self, filename: PathLike) -> None: filename = Path(filename) if not filename.is_file(): raise FileNotFoundError(filename) blocks = _read_iterdb_blocks(filename) required_profiles = {"TE", "TI", "NE", "NM1"} missing_profiles = required_profiles.difference(blocks) if missing_profiles: raise ValueError( "ITERDB file is missing required profiles " f"{', '.join(sorted(missing_profiles))}" )