import copy
from datetime import datetime as dt
from datetime import timedelta as td
import cftime
import pandas as pd
import xarray as xr
from . import _consts as consts
[docs]class basics:
"""Class for controlling netCDF CF standard time axis.
The :class:`basics` class contains some basic functions.
Parameters
----------
fmt: str, default: '%Y-%m-%dT%H:%M:%S'
Time format for converting strings into ``cftime.datetime`` objects
calendar: str, default: 'standard'
Calendar type for the datetimes.
frequency: str, default: 'D'
Frequency string for the datetimes.
"""
[docs] def __init__(
self,
fmt="%Y-%m-%dT%H:%M:%S",
calendar="standard",
frequency="D",
):
self.fmt = fmt
self.calendar = calendar
self.frequency = frequency
def fmt(self, fmt):
    """Return the given time format unchanged.

    The time format is used for converting strings into
    ``cftime.datetime`` objects.

    Parameters
    ----------
    fmt: str
        Time format string

    Returns
    -------
    str
        The value passed in as ``fmt``
    """
    return fmt
def calendar(self, calendar):
    """Return the given calendar type unchanged.

    Calendar type for the datetimes.
    Will be overwritten by netCDF file's inherent calendar.

    Parameters
    ----------
    calendar: str
        Calendar type

    Returns
    -------
    str
        The value passed in as ``calendar``
    """
    return calendar
def frequency(self, frequency):
    """Return the given frequency unchanged.

    Frequency strings or list of strings can have multiples.
    Will be overwritten by netCDF file's inherent frequency.

    Parameters
    ----------
    frequency: str or list
        Frequency string or list of frequency strings

    Returns
    -------
    str or list
        The value passed in as ``frequency``
    """
    return frequency
def _flatten_list(self, lst):
"""Flatten a list containing strings and lists
of arbitrarily nested lists
Parameters
----------
lst: list
List containing strings and lists of arbitrarily nested lists
Returns
-------
list
"""
rt = []
for i in lst:
if isinstance(i, list):
rt.extend(self._flatten_list(i))
else:
rt.append(i)
return rt
def _convert_to_string(self, values, delim=",", fmt=None):
"""Converts list of strings and ``cftime.datetime`` or
``datetime.datetime`` objects to string.
Parameters
----------
values: list
List of strings and ``cftime.datetime`` or
``datetime.datetime`` objects
delim: str, default: ','
Output string delimiter between input list entries
fmt: str, default: '%Y-%m-%dT%H:%M:%S'
Explicit format string for converting string into
``cftime.datetime`` object
Consider only if list element is ``cftime.datetime`` object
Returns
-------
str
"""
if not fmt:
fmt = self.fmt
converted = ""
for v in values:
try:
v = self.date_to_str(v, fmt=fmt)
except Exception:
pass
converted += str(v) + delim
return converted[:-1]
def _dictionary(self, attr, keys, value):
"""Write or update attribute using key-value pairs
Parameters
----------
attr: str
Attribute name
keys: str
Key names
values: any type
Key values
"""
for key in keys:
if attr in self.__dict__:
getattr(self, attr)[key] = value
else:
setattr(self, attr, {key: value})
def _get_key_to_value(self, dict, value):
"""Get key of key-value pair using value
Parameters
----------
dict: dict
Dictionary containing key-value pair
value: any type
Value of key-value pair
Returns
-------
key: str
Key of key-value pair
"""
key_list = list(dict.keys())
val_list = list(dict.values())
position = val_list.index(value)
return key_list[position]
def _get_date_attr(self, frequencies):
    """Get ``cftime.datetime`` instance attribute from a frequency.

    The frequency is first mapped back to its key in
    ``consts.frequencies``; that key is then looked up in
    ``consts.translator``.

    Parameters
    ----------
    frequencies: str or list
        Value stored in ``consts.frequencies``

    Returns
    -------
    attribute
        Entry of ``consts.translator`` for the matching key
    """
    frequency_key = self._get_key_to_value(consts.frequencies, frequencies)
    return consts.translator[frequency_key]
def str_to_date(self, str, fmt=None, mode="start", calendar=None):
    """Converts string to ``cftime.datetime`` object

    The format is tried in growing leading parts (``fmt[:1]``,
    ``fmt[:2]``, ...); the first part that parses the whole string is
    used. Only year, month and day of the parsed time are passed on
    to the resulting ``cftime.datetime`` object.

    Parameters
    ----------
    str: str
        string representing the time
    fmt: str, optional
        Explicit format string. If not given, ``self.fmt`` is used.
    mode: {'start', 'end'}, default: 'start'
        `start`: Return the parsed day at 00:00:00.
        `end`: Return the parsed day plus one day minus one second.
    calendar: str, optional
        Calendar type for the datetimes.
        If not given, ``self.calendar`` is used.

    Returns
    -------
    ``cftime.datetime`` object
        ``None`` if `mode` is neither 'start' nor 'end'

    Raises
    ------
    ValueError
        If `str` matches no leading part of `fmt`
    """
    if not fmt:
        fmt = self.fmt
    if not calendar:
        calendar = self.calendar
    if mode not in ("start", "end"):
        return None
    # Bounded search: an unbounded loop would never terminate for a
    # string that matches no leading part of the format.
    for stop in range(1, len(fmt) + 1):
        try:
            date = dt.strptime(str, fmt[:stop])
        except ValueError:
            continue
        break
    else:
        raise ValueError(
            f"time string {str!r} does not match any leading part "
            f"of format {fmt!r}"
        )
    cfdate = cftime.datetime(
        date.year,
        date.month,
        date.day,
        calendar=calendar,
    )
    if mode == "start":
        return cfdate
    return cfdate + td(days=1) - td(seconds=1)
def date_to_str(self, date, fmt=None):
    """Format a ``cftime.datetime`` or ``datetime.datetime`` object
    as string

    Parameters
    ----------
    date: ``cftime.datetime`` or ``datetime.datetime`` object
        time to be converted
    fmt: str, optional
        Explicit format string. If not given, ``self.fmt`` is used.

    Returns
    -------
    str
        Result of ``date.strftime`` with the chosen format
    """
    pattern = fmt if fmt else self.fmt
    return date.strftime(pattern)
def _convert_time(self, time, calendar="proleptic_gregorian"):
    """Convert a time object to a ``CFTimeIndex``.

    The ``"time"`` index of `time` is parsed with ``pd.to_datetime``.
    If `time` has a ``units`` attribute, its last blank-separated
    token is used as the parse format. The parsed times are rebuilt
    as ``cftime`` objects down to the minute.

    If anything in this conversion raises, the ``"time"`` index is
    returned unchanged when it exists; otherwise ``time.values[()]``
    is converted instead.

    Parameters
    ----------
    time: object
        Object providing ``attrs``, ``indexes`` and ``values``
        (presumably an ``xr.DataArray`` -- confirm with callers)
    calendar: str, default: 'proleptic_gregorian'
        Key into ``consts.cftime_calendars``. Replaced by
        ``self.calendar`` whenever the instance has that attribute.

    Returns
    -------
    CFTimeIndex or the unchanged ``"time"`` index
    """

    def build_index(stamps, calendar):
        # Name of the cftime class that belongs to the calendar.
        class_name = consts.cftime_calendars[calendar]
        return xr.CFTimeIndex(
            [
                getattr(cftime, class_name)(
                    stamp.year,
                    stamp.month,
                    stamp.day,
                    stamp.hour,
                    stamp.minute,
                )
                for stamp in stamps
            ]
        )

    if hasattr(self, "calendar"):
        calendar = self.calendar
    try:
        if "units" in time.attrs:
            stamps = pd.to_datetime(
                time.indexes["time"].astype("str"),
                format=time.units.split(" ")[-1],
            )
        else:
            stamps = pd.to_datetime(time.indexes["time"])
        return build_index(stamps, calendar=calendar)
    except Exception:
        # Fallback for everything the pandas route cannot handle.
        if "time" in time.indexes:
            return time.indexes["time"]
        return build_index(time.values[()], calendar=calendar)
def _equalize_time(
self,
time,
ignore=["second", "microsecond", "nanosecond"],
):
"""Ignore ``datetime.datetime`` instance attributes and set them to 0
Parameters
----------
time: CFTimeIndex
CFTimeIndex containing cftime.datetime objects
ignore: list, default: ['second','microsecond','nanosecond']
list of datetime.datetime instance attributes to be ignored
Returns
-------
time: CFTimeIndex
``CFTimeIndex`` containing ``cftime.datetime`` objects
with ignored instance attributes
"""
n = 0
while n < len(ignore):
if isinstance(time, list):
etime = time
else:
etime = time.tolist()
query = {ignr: 0 for ignr in ignore[::-1][n:]}
try:
i = 0
while i < len(etime):
etime[i] = etime[i].replace(**query)
i += 1
return etime
except Exception:
n += 1
if isinstance(time, list):
return time
return time.tolist()
def _interpret_frequency(self, freq):
if isinstance(freq, str):
if freq in consts.frequencies.keys():
freq = consts.frequencies[freq]
return freq
def _mid_timestep(self, freq, st, end, calendar=None, **kwargs):
    """Build ``CFTimeIndex`` with elements midway between two ranges.

    A first range is generated from `st` to `end` with ``freq[0]``.
    A second range with the same number of elements is generated
    from `st` with ``freq[1]``. Each returned element lies halfway
    between the corresponding elements of both ranges.

    Parameters
    ----------
    freq: list
        list of two frequency strings for use with ``cftime`` calendars
        https://xarray.pydata.org/en/stable/generated/xarray.cftime_range.html
    st: cftime.datetime or datetime.datetime
        Left bound for generating dates
    end: cftime.datetime or datetime.datetime
        Right bound for generating dates
    calendar: str, optional
        Calendar type for the datetimes.
        If not given, ``self.calendar`` is used.
    **kwargs
        Passed on to both ``xr.cftime_range`` calls

    Returns
    -------
    CFTimeIndex
    """
    calendar = calendar or self.calendar
    first = xr.cftime_range(st, end, freq=freq[0], calendar=calendar, **kwargs)
    second = xr.cftime_range(
        st,
        periods=len(first),
        freq=freq[1],
        calendar=calendar,
        **kwargs,
    )
    half_step = (second - first) / 2
    return first + half_step
def _point_timestep(self, freq, st, end, calendar=None, **kwargs):
    """Build ``CFTimeIndex`` with elements at a single frequency.

    Parameters
    ----------
    freq: str
        frequency string for use with ``cftime`` calendars
        https://xarray.pydata.org/en/stable/generated/xarray.cftime_range.html
    st: cftime.datetime or datetime.datetime
        Left bound for generating dates
    end: cftime.datetime or datetime.datetime
        Right bound for generating dates
    calendar: str, optional
        Calendar type for the datetimes.
        If not given, ``self.calendar`` is used.
    **kwargs
        Passed on to ``xr.cftime_range``

    Returns
    -------
    CFTimeIndex
    """
    return xr.cftime_range(
        st,
        end,
        freq=freq,
        calendar=calendar or self.calendar,
        **kwargs,
    )
def date_range(self, start, end, frequency="D", calendar=None, **kwargs):
    """Build ``CFTimeIndex``

    Parameters
    ----------
    start: cftime.datetime or datetime.datetime or str
        Left bound for generating dates
    end: cftime.datetime or datetime.datetime or str
        Right bound for generating dates
    frequency: str or list, default: 'D'
        frequency string or list of frequency strings
        for use with ``cftime`` calendars
        https://xarray.pydata.org/en/stable/generated/xarray.cftime_range.html
        The value is first translated by ``self._interpret_frequency``.
        A string is handed to ``self._point_timestep``,
        a list to ``self._mid_timestep``.
    calendar: str, optional
        Calendar type for the datetimes.
        If not given, ``self.calendar`` is used.
    **kwargs
        Passed on to the selected builder

    Returns
    -------
    CFTimeIndex
        ``None`` if the translated frequency is neither a string
        nor a list
    """
    calendar = calendar or self.calendar
    frequency = self._interpret_frequency(frequency)
    if isinstance(frequency, str):
        builder = self._point_timestep
    elif isinstance(frequency, list):
        builder = self._mid_timestep
    else:
        return None
    return builder(frequency, start, end, calendar=calendar, **kwargs)
def is_month_start(self, cftime_range):
    """Check whether each element of ``CFTimeIndex``
    is first day of the month

    An element counts as first day of the month if the time one day
    earlier lies in a different month.

    Parameters
    ----------
    cftime_range: CFTimeIndex
        CFTimeIndex containing cftime.datetime objects

    Returns
    -------
    list
        list of boolean values
    """
    one_day = td(days=1)
    return [(stamp - one_day).month != stamp.month for stamp in cftime_range]
def is_month_end(self, cftime_range):
    """Check whether each element of ``CFTimeIndex``
    is last day of the month

    An element counts as last day of the month if the time one day
    later lies in a different month.

    Parameters
    ----------
    cftime_range: CFTimeIndex
        CFTimeIndex containing cftime.datetime objects

    Returns
    -------
    list
        list of boolean values
    """
    one_day = td(days=1)
    return [(stamp + one_day).month != stamp.month for stamp in cftime_range]
def date_range_to_frequency_limits(
    self,
    start=None,
    end=None,
    date_range=None,
    frequency=None,
    calendar=None,
    smonth=(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12),
    emonth=(12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1),
    is_month_start=None,
    is_month_end=None,
    get_range=False,
    **kwargs,
):
    """Get bounds of CFTimeIndex which satisfy user-given conditions.

    If no `date_range` is given, it is built from `start`, `end`,
    `frequency` and `calendar` with ``self.date_range``. The search
    then always runs between the first and the last element of the
    date range.

    Parameters
    ----------
    start: str or datetime.datetime or cftime.cftime, optional
        Left bound for generating dates
    end: str or datetime.datetime or cftime.cftime, optional
        Right bound for generating dates
    date_range: CFTimeIndex or list, optional
        CFTimeIndex containing cftime.datetime objects
    frequency: str or list
        CF frequency string or list of CF frequency strings
        or frequency string or list of frequency strings
        for use with ``cftime`` calendars
        https://xarray.pydata.org/en/stable/generated/xarray.cftime_range.html
        If not given and no `date_range` is given,
        ``self.frequency`` is used.
    calendar: str, optional
        Calendar type for the datetimes.
        If not given, ``self.calendar`` is used.
    smonth: sequence, default: (1,2,3,4,5,6,7,8,9,10,11,12)
        Allowed values of left bound's
        ``datetime.datetime`` instance attribute month
    emonth: sequence, default: (12,11,10,9,8,7,6,5,4,3,2,1)
        Allowed values of right bound's
        ``datetime.datetime`` instance attribute month
    is_month_start: bool, optional
        Value of left bound's ``datetime.datetime`` instance attribute day
        must be equal to first day of the month.
        If None, the value is taken from ``consts.is_month``
        depending on `frequency`.
    is_month_end: bool, optional
        Value of right bound's ``datetime.datetime`` instance attribute day
        must be equal to last day of the month.
        If None, the value is taken from ``consts.is_month``
        depending on `frequency`.
    get_range: bool, default: False
        If False returns left and right bounds
        If True returns CFTimeIndex

    Returns
    -------
    tuple or CFTimeIndex
        Tuple of start and end dates satisfying all conditions,
        or all elements of the date range between these two dates
        if `get_range` is True.
        ``(None, None)`` if the conditions cannot be satisfied.

    Example
    -------
    To get bounds of CFTimeIndex which satisfy user-given conditions.
    Monthly frequency values with a 360-day calendar have to start with
    the first month of any season and have to end with the last month
    of any season.
    It returns the tuple (2005-03-16 00:00:00 2005-11-16 00:00:00)::

        from pyhomogenize import basics
        basics = basics()
        start, end = basics.date_range_to_frequency_limits(
            start='2005-01-01',
            end='2005-12-30',
            smonth=[3,6,9,12],
            emonth=[2,5,8,11],
            calendar='360_day',
        )
    """
    if date_range is None:
        if not frequency:
            frequency = self.frequency
        if not calendar:
            calendar = self.calendar
        frequency = self._interpret_frequency(frequency)
        if start is None or end is None:
            return None, None
        date_range = self.date_range(
            start,
            end,
            frequency=frequency,
            calendar=calendar,
            **kwargs,
        )
    # An empty date range has no first or last element to search between.
    if len(date_range) == 0:
        return None, None
    if isinstance(date_range, list):
        date_range = xr.CFTimeIndex(date_range)
    start = date_range[0]
    end = date_range[-1]
    if is_month_start is None:
        is_month_start = consts.is_month[self._get_date_attr(frequency)]
    if is_month_end is None:
        is_month_end = consts.is_month[self._get_date_attr(frequency)]
    sdate_range = date_range
    edate_range = date_range
    if is_month_start:
        sdate_range = date_range[self.is_month_start(date_range)]
    if is_month_end:
        edate_range = date_range[self.is_month_end(date_range)]
    if sdate_range.empty or edate_range.empty:
        return None, None
    # First candidate not before `start` with an allowed month. next()
    # yields None when no candidate qualifies, so a rejected element is
    # never left over as result.
    sdate = next(
        (
            cand
            for cand in sdate_range
            if not (cand < start) and cand.month in smonth
        ),
        None,
    )
    if sdate is None:
        return None, None
    # Last candidate not after `end` with an allowed month.
    edate = next(
        (
            cand
            for cand in reversed(edate_range)
            if not (cand > end) and cand.month in emonth
        ),
        None,
    )
    if edate is None:
        return None, None
    if get_range:
        return xr.CFTimeIndex(
            [cft for cft in date_range if cft >= sdate and cft <= edate]
        )
    return sdate, edate
def get_time_bounds(
self,
start=None,
end=None,
date_range=None,
frequency=None,
calendar=None,
l_freq=None,
u_freq=None,
tdelta=None,
dims={},
coords={},
**kwargs,
):
"""Get time bounds of CFTimeIndex.
Two date ranges are generated with ``self.date_range``: lower
bounds with `l_freq` and upper bounds with `u_freq`. Both are
extended until they cover `start` and `end`, shifted outwards by
`tdelta` hours and combined along a dimension named ``bnds``.
Parameters
----------
start: str or datetime.datetime or cftime.cftime, optional
Left bound for generating dates
If None, `date_range` has to be given
end: str or datetime.datetime or cftime.cftime, optional
Right bound for generating dates
date_range: CFTimeIndex, optional
CFTimeIndex containing cftime.datetime objects
frequency: str or list
CF frequency string or list of CF frequency strings
or frequency string or list of frequency strings
for use with ``cftime`` calendars
https://xarray.pydata.org/en/stable/generated/xarray.cftime_range.html
If None, ``self.frequency`` is used
calendar: str, default: 'standard'
Calendar type for the datetimes
NOTE(review): the value is resolved below but is not passed on
to ``self.date_range`` or ``self.str_to_date``, which fall back
to ``self.calendar`` -- confirm whether this is intended.
l_freq: str, optional
CF frequency string for left bounds
If None take time frequency-dependent string
u_freq: str, optional
CF frequency string for right bounds
If None take time frequency-dependent string
tdelta: int, optional
Number of hours to subtract from left bounds
and add to right bounds
If None take time frequency-dependent integer
dims: dict
Time dimensions.
coords: dict
Time coordinates.
**kwargs
Passed on to the first two ``self.date_range`` calls
Returns
-------
xr.DataArray
Bounds with ``bnds`` as last dimension
Raises
------
ValueError
If both `start` and `date_range` are None
Notes
-----
NOTE(review): `dims` and `coords` default to mutable dictionaries;
they are only handed to ``xr.DataArray`` and ``"".join`` here.
NOTE(review): this listing has lost its indentation, so the nesting
of the statements inside the two ``while True`` loops and after
``if end is None`` cannot be read off here -- confirm against the
original file before restructuring.
"""
# Without an explicit start, bounds and frequency come from date_range.
if start is None:
if date_range is None:
raise ValueError("Choose one of: start or date_range")
start = date_range[0]
end = date_range[-1]
frequency = date_range.freq
if frequency is None:
frequency = self.frequency
frequency = self._interpret_frequency(frequency)
if calendar is None:
calendar = self.calendar
# f is the key of consts.frequencies whose value equals the frequency;
# consts.tbounds[f] supplies the defaults for l_freq (index 0),
# u_freq (index 1) and tdelta (index 2).
f = self._get_key_to_value(consts.frequencies, frequency)
if l_freq is None:
l_freq = consts.tbounds[f][0]
if u_freq is None:
u_freq = consts.tbounds[f][1]
if tdelta is None:
tdelta = consts.tbounds[f][2]
# String bounds are parsed with the defaults of self.str_to_date.
if isinstance(start, str):
start = self.str_to_date(start)
if isinstance(end, str):
end = self.str_to_date(end)
# ll: date range for the lower bounds, ul: date range for the upper bounds.
ll = self.date_range(
start=start,
end=end,
frequency=l_freq,
**kwargs,
)
ul = self.date_range(
start=start,
end=end,
frequency=u_freq,
**kwargs,
)
if end is None:
end = start
ll = ll[:-1]
ul = ul[1:]
# Extend the ranges to the left while the first lower bound lies after start.
while True:
if ll[0] > start:
shift_l = ll.shift(-1, freq=l_freq)
ll = self.date_range(
start=shift_l[0],
end=ll[-1],
frequency=l_freq,
)
if ll[0] > start:
shift_u = ul.shift(-1, freq=u_freq)
ul = self.date_range(
start=shift_u[0],
end=ul[-1],
frequency=u_freq,
)
else:
break
# Extend the ranges to the right while the last upper bound lies before end.
while True:
if ul[-1] < end:
shift_u = ul.shift(1, freq=u_freq)
ul = self.date_range(
start=ul[0],
end=shift_u[-1],
frequency=u_freq,
)
if ul[-1] < end:
shift_l = ll.shift(1, freq=l_freq)
ll = self.date_range(
start=ll[0],
end=shift_l[-1],
frequency=l_freq,
)
else:
break
if start == end and len(ll) > 1:
ll = ll[:-1]
# Lower bounds are moved tdelta hours back.
ll_ = ll - td(hours=tdelta)
if ll.equals(ul):
# Identical ranges: the shifted lower bounds alone form the result and
# the dimension named by the joined keys of dims is renamed to "bnds".
bounds = xr.DataArray(ll_, coords=coords, dims=dims)
dim_str = "".join(dims)
bounds = bounds.rename({dim_str: "bnds"})
else:
# Upper bounds are moved tdelta hours forward; lower and upper bounds
# are concatenated along "bnds".
ul_ = ul + td(hours=tdelta)
lower = xr.DataArray(ll_, coords=coords, dims=dims)
upper = xr.DataArray(ul_, coords=coords, dims=dims)
bounds = xr.concat([lower, upper], dim="bnds")
# first = (bounds.isel({"time": 0}) - td(days=1)).assign_coords(
# {"time": start}
# )
# result = xr.concat([first, bounds], dim="time")
result = bounds
# Move "bnds" to the last position.
return result.transpose(..., "bnds")