Source code for pyhomogenize._time_control

import xarray as xr
from iteration_utilities import duplicates

from . import _consts as consts
from ._netcdf_basics import netcdf_basics


[docs]class time_control(netcdf_basics): """Class for dealing with a netCDF file's time axis. The :class:`time_control` contains the :class:`Įšetcdf_basics`. """
[docs] def __init__(self, *args, **kwargs): netcdf_basics.__init__(self, *args, **kwargs) del self.frequency del self.calendar self.calendar = self.calendar() self.time = self._equalize_time(self.time()) self.time_encoding = self.ds.time.encoding self.ds["time"] = self.time self.ds.time.encoding = self.time_encoding self.frequency = self.frequency() self.time_fmt = self.time_fmt() self.equalize = self.equalize()
[docs] def time(self): """netCDF file's time axis""" return self._convert_time(self.ds.time)
[docs] def frequency(self): """netCDF file's frequency""" return self._get_frequency()
[docs] def time_fmt(self): """predefined explicit format string derived from `frequency`""" return consts.fmt[self.ds.frequency]
[docs] def equalize(self): """predefined list of ``datetime.datetime`` instance attributes to be ignored """ return consts.equalize[self.ds.frequency]
[docs] def calendar(self): """Calendar type read from netCDF file""" if hasattr(self.ds.time, "calendar"): return self.ds.time.calendar if hasattr(self.time, "calendar"): return self.time.calendar return self.ds.time.dt.calendar
def _get_frequency(self): """Get frequency of xr.Dataset""" try: frequency = xr.infer_freq(self.ds.time) except ValueError: frequency = None if not frequency: try: frequency = consts.frequencies[self.ds.frequency] except Exception: print("Could not determine any frequency") return if "frequency" not in self.ds.attrs: self.ds.attrs["frequency"] = self._get_key_to_value( consts.frequencies, frequency ) return frequency def _duplicates(self): """Get duplicated time steps.""" time = self._equalize_time(self.time, ignore=self.equalize) return sorted(list(duplicates(time))) def _missings(self): """Get missing time steps.""" time = self._equalize_time(self.time, ignore=self.equalize) date_range = self._equalize_time( self.date_range( time[0], time[-1], self.frequency, calendar=self.calendar, ), ignore=self.equalize, ) return sorted(list(set(date_range).difference(time))) def _redundants(self): """Get redundant time steps.""" time = self._equalize_time(self.time, ignore=self.equalize) date_range = self._equalize_time( self.date_range( time[0], time[-1], self.frequency, calendar=self.calendar, ), ignore=self.equalize, ) return sorted(list(set(time).difference(date_range))) def _write_timesteps(self, timesteps, naming): """Write timesteps to variable attributes.""" timesteps = self._convert_to_string(timesteps) self._dictionary(naming, self.name, timesteps) self.to_variable_attributes(timesteps, naming)
[docs] def get_duplicates(self): """Get string of duplicated time steps.""" return self._convert_to_string(self._duplicates())
[docs] def get_missings(self): """Get string of missing time steps.""" return self._convert_to_string(self._missings())
[docs] def get_redundants(self): """Get string of redundant time steps.""" return self._convert_to_string(self._redundants())
[docs] def check_timestamps( self, selection=["duplicates", "redundants", "missings"], output=None, correct=False, ): """Check netCDF file's time axis. Exist whether duplicated, missing and/or redundant time steps. Parameters ---------- selection: str or list, default=['duplicates','redundants','missings'] Check which kind of time steps exist. output: str, optional Write result on disk. correct: bool, default: False Delete located time steps from xr.Dataset. Automatically set True if output. Returns ------- ``pyhomogenize.time_control`` object Example ------- To check netCDF file's time axis whether duplicated, missing and/or redundant time steps exist and result write on disk:: from pyhomogenize import time_control time_control('input.nc').check_timestamps(output='output.nc') """ if isinstance(selection, str): selection = [selection] deletes = [] time = self._equalize_time(self.time, ignore=self.equalize) for select in selection: nmng = consts.naming[select] if not select.startswith("_"): select = "_" + select add = getattr(self, select)() for a in add: loc = [i for i, e in enumerate(time) if e == a][1:] deletes += loc self._write_timesteps(add, nmng) dlist = list(dict.fromkeys(deletes)) timesteps = [n for n, t in enumerate(time) if n not in dlist] if output: correct = True if correct: self.ds = self.ds.isel(time=timesteps) self.time = self._convert_time(self.ds.time) if output: self.write(output=output) return self
[docs] def select_time_range(self, time_range, output=None): """Select user-given time slice from xr.Dataset Parameters ---------- time_range: list List of two strings or ``cftime.datatime`` object representing the left and right time bounds output: str, optional Write result on disk. Returns ------- ``pyhomogenize.time_control`` object Example ------- To select time slice from netCDF file.:: from pyhomogenize import time_control time_control('input.nc').select_time_range( ['2005-01-01','2005-12-31'], output='output.nc') """ start_date, end_date = time_range if not isinstance(start_date, str): start_date = self.date_to_str(start_date) if not isinstance(end_date, str): end_date = self.date_to_str(end_date) self.ds = self.ds.sel(time=slice(start_date, end_date)) self.time = self._convert_time(self.ds.time) if output: self.write(output=output) return self
[docs] def select_limited_time_range(self, output=None, **kwargs): """Select time slice from xr.Dataset satisfying user-given conditions. See pyh.basics.date_range_to_frequency_limits. Parameters ---------- output: str, optional Write result on disk. kwargs: Optional parameters transferred to function `date_range_to_frequency_limits`: smonth emonth is_month_start is_month_end Returns ------- ``pyhomogenize.time_control`` object Example ------- To select time slice from netCDF file starts with the first month of any season and ends with the last month of any season. The time slice is then e.g. from 2005-03-16 to 2005-11-16:: from pyhomogenize import time_control time_control('input.nc').select_limited_time_range( smonth=[3,6,9,12], emonth=[2,5,8,11], output='output.nc') """ date_range = self.date_range_to_frequency_limits( self, date_range=self.time, frequency=self.frequency, get_range=True, **kwargs, ) self.ds = self.ds.sel(time=date_range) self.time = self._convert_time(self.ds.time) if output: self.write(output=output) return self
[docs] def within_time_range(self, requested_time_range, fmt=None): """ Checks whether netCDF files time axis is within user-given borders. Parameters ---------- requested_time_range: list List of two strings or ``cftime.datatime`` object representing the left and right time bounds fmt: str, default: '%Y-%m-%dT%H:%M:%S' Explicit format string for converting string into ``cftime.datetime`` object Returns ------- bool Example ------- To check whether netCDF files time axis is within user-given borders.:: from pyhomogenize import time_control within = time_control('input.nc').within_time_range(['2005-01-02', '2005-12-31']) """ avail_start = self.time[0] avail_end = self.time[-1] req_start = requested_time_range[0] req_end = requested_time_range[-1] if isinstance(req_start, str): req_start = self.str_to_date( req_start, fmt=fmt, calendar=self.calendar, ) if isinstance(req_end, str): req_end = self.str_to_date( req_end, fmt=fmt, calendar=self.calendar, mode="end" ) key = self.ds.frequency for unit_of_time in consts.within[key]: astart = getattr(avail_start, unit_of_time) rstart = getattr(req_start, unit_of_time) if astart > rstart: return False if astart == rstart: continue break for unit_of_time in consts.within[key]: aend = getattr(avail_end, unit_of_time) rend = getattr(req_end, unit_of_time) if aend < rend: return False if aend == rend: continue break return True
def add_time_bounds( self, frequency=None, add=True, **kwargs, ): """ Add time bounds to dataset calculated from time axis. Parameters ---------- frequency: str or list, default:'D' CF frequency string or list of CF frequency strings or frequency string or list of frequency strings for use with ``cftime`` calendars https://xarray.pydata.org/en/stable/generated/xarray.cftime_range.html add: bool, default: True If True add time_bounds to `ds`. If False return time bounds. Returns ------- ``pyhomogenize.time_control`` object """ da_time = self.ds.time.copy() da_time = da_time.reset_coords(drop=True) if frequency is None: frequency = self.frequency if self.ds.time.values.size > 1: start = self.ds.time.values[0] end = self.ds.time.values[-1] else: try: start = self.ds.time.values[0] except Exception: start = self.ds.time.values[()] end = None kwargs["periods"] = 2 tbounds = self.get_time_bounds( start=start, end=end, dims=da_time.dims, coords=da_time.coords, frequency=frequency, **kwargs, ) if add is False: return tbounds self.ds = self.ds.assign({"time_bnds": tbounds}) self.ds["time_bnds"].encoding = self.ds["time"].encoding self.ds["time"].attrs["bounds"] = "time_bnds" self._encoding_coordinates() return self