# The MIT License (MIT)
#
# Copyright (c) 2018, TU Wien
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
'''
Readers for the C3S soil moisture product's daily, dekadal (10-daily) and
monthly images, as well as for the time series generated using this module.
'''
import pandas as pd
import os
import netCDF4 as nc
import numpy as np
from datetime import timedelta
from dateutil.relativedelta import relativedelta
from netCDF4 import num2date
from smecv_grid.grid import SMECV_Grid_v052
from pygeobase.object_base import Image
from pygeobase.io_base import ImageBase
from pygeobase.io_base import MultiTemporalImageBase
from pygeogrids.netcdf import load_grid
import warnings
from netCDF4 import Dataset
from pynetcf.time_series import GriddedNcOrthoMultiTs
from datetime import datetime
from parse import parse
from cadati.dekad import dekad_index, dekad_startdate_from_date
# Flag whether the optional xarray package is available; code that needs
# xarray can check `xr_supported` before using it.
try:
    import xarray as xr
    xr_supported = True
except ImportError:
    xr_supported = False

# Default file name template of C3S SM image files. The named fields are
# parsed from / formatted into actual file names (see C3S_Nc_Img_Stack).
fntempl = "C3S-SOILMOISTURE-L3S-SSM{unit}-{prod}-{temp}-{datetime}-{cdr}-{vers}.{subvers}.nc"
class C3SImg(ImageBase):
    """
    Class to read a single C3S image (for one time stamp).
    """

    def __init__(self,
                 filename,
                 parameters=None,
                 mode='r',
                 subgrid=SMECV_Grid_v052(None),
                 flatten=False,
                 fillval=None):
        """
        Parameters
        ----------
        filename : str
            Path to the file to read.
        parameters : str or Iterable, optional (default: None)
            Names of parameters in the file to read.
            If None are passed, all are read.
        mode : str, optional (default: 'r')
            Netcdf file mode, choosing something different to r may delete
            data.
        subgrid : SMECV_Grid_v052
            A subgrid of points to read. All other GPIs are masked (2d
            reading) or ignored (when flattened).
        flatten : bool, optional (default: False)
            If set then the data is read into 1D arrays. This is used to e.g.
            reshuffle the data for a subset of points.
        fillval : float or dict or None, optional (default: None)
            Fill Value for masked pixels, if a dict is passed, this can be
            set for each parameter individually, otherwise it applies to all.
            Note that choosing np.nan can lead to a change in dtype for some
            (int) parameters. None will use the fill value from the netcdf
            file.
        """
        self.path = os.path.dirname(filename)
        self.fname = os.path.basename(filename)

        super(C3SImg, self).__init__(os.path.join(self.path, self.fname),
                                     mode=mode)

        # Normalise `parameters` to a list. Unlike the old `type() != list`
        # check, this also accepts tuples/sets as the docstring promises.
        if parameters is None:
            parameters = []
        elif isinstance(parameters, str):
            parameters = [parameters]
        else:
            parameters = list(parameters)
        self.parameters = parameters

        self.subgrid = subgrid  # subset to read
        self.grid = SMECV_Grid_v052(None)  # global input image
        self.flatten = flatten

        self.image_missing = False
        self.img = None  # to be loaded
        self.glob_attrs = None

        if isinstance(fillval, dict):
            self.fillval = fillval
            # parameters without an explicit fill value fall back to the
            # value stored in the netcdf file (None is resolved on read)
            for p in self.parameters:
                if p not in self.fillval:
                    self.fillval[p] = None
        else:
            self.fillval = {p: fillval for p in self.parameters}

    def _read_flat_img(self) -> (dict, dict, dict, datetime):
        """
        Read a single C3S image, flat with gpi0 as first element.

        Returns
        -------
        param_img : dict
            Flattened (flipped upside down first) image data per parameter.
        param_meta : dict
            Variable attributes per parameter.
        global_attrs : dict
            Global file attributes, with the image time stamp added.
        timestamp : datetime
            Time stamp of the (single) image in the file.
        """
        with Dataset(self.filename, mode='r') as ds:
            timestamp = num2date(ds['time'], ds['time'].units,
                                 only_use_cftime_datetimes=True,
                                 only_use_python_datetimes=False)

            assert len(timestamp) == 1, "Found more than 1 time stamps in image"
            timestamp = timestamp[0]

            param_img = {}
            param_meta = {}

            if len(self.parameters) == 0:
                # all data vars, exclude coord vars
                self.parameters = [k for k in ds.variables.keys()
                                   if k not in ds.dimensions.keys()]

            parameters = list(self.parameters)

            for parameter in parameters:
                metadata = {}
                param = ds.variables[parameter]
                data = param[:][0]  # there is only 1 time stamp in the image

                self.shape = (data.shape[0], data.shape[1])

                # read long name, FillValue and unit
                for attr in param.ncattrs():
                    metadata[attr] = param.getncattr(attr)

                if parameter in self.fillval:
                    if self.fillval[parameter] is None:
                        self.fillval[parameter] = data.fill_value
                    # np.find_common_type was removed in numpy 2.0;
                    # np.result_type promotes the data dtype with the fill
                    # value the same way (e.g. int + nan -> float)
                    common_dtype = np.result_type(
                        data.dtype, self.fillval[parameter])
                    self.fillval[parameter] = np.array(
                        [self.fillval[parameter]], dtype=common_dtype)[0]
                    data = data.astype(common_dtype)
                    data = data.filled(self.fillval[parameter])
                else:
                    # no user fill value, keep the one from the file
                    self.fillval[parameter] = data.fill_value
                    data = data.filled()

                # flip so that gpi0 (lower left corner) comes first after
                # flattening
                data = np.flipud(data)
                data = data.flatten()

                metadata['image_missing'] = 0

                param_img[parameter] = data
                param_meta[parameter] = metadata

            # collect global attrs while the dataset is still open
            global_attrs = ds.__dict__
            global_attrs['timestamp'] = str(timestamp)

        return param_img, param_meta, global_attrs, timestamp

    def _mask_and_reshape(self,
                          data: dict) -> dict:
        """
        Takes the grid and drops points that are not active.
        For flattened arrays that means that only the active gpis are kept,
        for 2d arrays inactive gpis are set to the parameter fill value.

        Parameters
        ----------
        data : dict
            Variable names and flattened image data.

        Returns
        -------
        data : dict
            Masked, reshaped data.
        """
        # check if flatten. if flatten, dont crop and dont reshape
        # if not flatten, reshape based on grid shape.
        # select active gpis
        for param, dat in data.items():
            if self.flatten:
                dat = dat[self.subgrid.activegpis]
            else:
                # mask all points of the global grid that are not part of
                # the requested subgrid
                exclude = (~np.isin(self.grid.gpis, self.subgrid.activegpis))
                dat[exclude] = self.fillval[param]

                if len(self.shape) != 2:
                    raise ValueError(
                        "Reading 2d image needs grid with 2d shape"
                        "You can either use the global grid without subsets,"
                        "or make sure that you create a subgrid from bbox in"
                        "an area where no gpis are missing.")

                dat = dat.reshape(self.shape)

            data[param] = dat

        return data

    def read(self, timestamp=None):
        """
        Read a single C3S image, if it exists, otherwise fill an empty image.

        Parameters
        ----------
        timestamp : datetime, optional (default: None)
            Time stamp of the image, if this is passed, it is compared to
            the time stamp from the loaded file and must match. If None is
            passed, the time stamp found in the file is used.

        Returns
        -------
        image : pygeobase.object_base.Image
            The (optionally flattened) image data.
        """
        data, var_meta, glob_meta, img_timestamp = self._read_flat_img()

        if timestamp is not None:
            if img_timestamp is None:
                img_timestamp = timestamp
            assert img_timestamp == timestamp, "Time stamps do not match"
        else:
            # no time stamp requested: use the one read from the file so the
            # returned Image is never stamped with None unnecessarily
            timestamp = img_timestamp

        # when flattened, this drops already all non-active gpis
        data = self._mask_and_reshape(data)

        if self.flatten:
            return Image(self.subgrid.activearrlon,
                         self.subgrid.activearrlat,
                         data,
                         var_meta,
                         timestamp)
        else:
            # also cut 2d case to active area
            min_lat, min_lon = self.subgrid.activearrlat.min(), \
                               self.subgrid.activearrlon.min()
            max_lat, max_lon = self.subgrid.activearrlat.max(), \
                               self.subgrid.activearrlon.max()

            # find the row/col slices of the bounding box corners
            corners = self.grid.gpi2rowcol([
                self.grid.find_nearest_gpi(min_lon, min_lat)[0],  # llc
                self.grid.find_nearest_gpi(max_lon, min_lat)[0],  # lrc
                self.grid.find_nearest_gpi(max_lon, max_lat)[0],  # urc
            ])

            rows = slice(corners[0][0], corners[0][2] + 1)
            cols = slice(corners[1][0], corners[1][1] + 1)

            return Image(self.grid.arrlon.reshape(*self.shape)[rows, cols],
                         np.flipud(self.grid.arrlat.reshape(*self.shape)[rows, cols]),
                         {k: np.flipud(v[rows, cols]) for k, v in data.items()},
                         var_meta,
                         timestamp)

    def write(self, *args, **kwargs):
        # writing images is not supported by this reader
        pass

    def close(self, *args, **kwargs):
        # files are opened/closed per read call, nothing to do here
        pass

    def flush(self, *args, **kwargs):
        # nothing is buffered, nothing to do here
        pass
class C3S_Nc_Img_Stack(MultiTemporalImageBase):
    """
    Class for reading multiple C3S images and iterating over them.
    """

    def __init__(self,
                 data_path,
                 parameters='sm',
                 subgrid=SMECV_Grid_v052(None),
                 flatten=False,
                 solve_ambiguity='sort_last',
                 fntempl=fntempl,
                 subpath_templ=None,
                 fillval=None):
        """
        Parameters
        ----------
        data_path : str
            Path to directory where C3S images are stored.
        parameters : list or str, optional (default: 'sm')
            Variables to read from the image files.
        subgrid : pygeogrids.CellGrid, optional (default: SMECV_Grid_v052(None))
            Subset of the image to read.
        flatten : bool, optional (default: False)
            Flatten the read image to a 1D array instead of a 2D array.
        solve_ambiguity : str, optional (default: 'sort_last')
            Method to solve ambiguous time stamps, e.g. if a reprocessing
            was performed.
            - error: raises error in case of ambiguity
            - sort_last (default): uses the last file when sorted by file
              name, in case that multiple files are found.
            - sort_first: uses the first file when sorted by file name
              in case that multiple files are found.
        fntempl : str, optional
            Filename template to parse datetime from.
        subpath_templ : list or None, optional (default: None)
            List of subdirectory names to build file paths. e.g. ['%Y'] if
            files are collected by years.
        fillval : float or dict or None, optional (default: None)
            Fill Value for masked pixels, if a dict is passed, this can be
            set for each parameter individually, otherwise it applies to all.
            Note that choosing np.nan can lead to a change in dtype for some
            parameters (int to float).
            None will use the fill value from the netcdf file.
        """
        self.data_path = data_path
        ioclass_kwargs = {'parameters': parameters,
                          'subgrid': subgrid,
                          'flatten': flatten,
                          'fillval': fillval}

        self.fname_args = self._parse_filename(fntempl)
        self.solve_ambiguity = solve_ambiguity

        # cdr flag and sub version may vary between files of one record,
        # therefore they are wildcards in the search template
        fn_args = self.fname_args.copy()
        fn_args['subvers'] = '*'
        fn_args['cdr'] = '*'
        filename_templ = fntempl.format(**fn_args)

        super(C3S_Nc_Img_Stack, self).__init__(path=data_path,
                                               ioclass=C3SImg,
                                               fname_templ=filename_templ,
                                               datetime_format="%Y%m%d%H%M%S",
                                               subpath_templ=subpath_templ,
                                               exact_templ=False,
                                               ioclass_kws=ioclass_kwargs)

    def _build_filename(self, timestamp: datetime, custom_templ: str = None,
                        str_param: dict = None):
        """
        This function uses _search_files to find the correct
        filename and checks if the search was unambiguous.

        Parameters
        ----------
        timestamp : datetime
            datetime for given filename
        custom_templ : string, optional
            If given the fname_templ is not used but the custom_templ. This
            is convenient for some datasets where not all file names follow
            the same convention and where the read_image function can choose
            between templates based on some condition.
        str_param : dict, optional
            If given then this dict will be applied to the fname_templ using
            the fname_templ.format(**str_param) notation before the resulting
            string is put into datetime.strftime.

        Raises
        ------
        IOError
            If no file is found, or multiple files are found while
            solve_ambiguity is set to 'error'.
        """
        filename = self._search_files(timestamp, custom_templ=custom_templ,
                                      str_param=str_param)
        if len(filename) == 0:
            raise IOError("No file found for {:}".format(timestamp.ctime()))
        if len(filename) > 1:
            filename = sorted(filename)
            if self.solve_ambiguity == 'sort_last':
                warnings.warn(f'Ambiguous file for {str(timestamp)} found.'
                              f' Sort and use last: {filename[-1]}, skipped {filename[:-1]}')
                filename = [filename[-1]]
            elif self.solve_ambiguity == 'sort_first':
                warnings.warn(f'Ambiguous file for {str(timestamp)} found.'
                              f' Sort and use first: {filename[0]}')
                filename = [filename[0]]
            else:
                raise IOError(
                    "File search is ambiguous {:}".format(filename))

        return filename[0]

    def _parse_filename(self, template):
        """
        Search a file in the passed directory and use the filename template
        to read settings.

        Parameters
        ----------
        template : str
            Template for all files in the passed directory.

        Returns
        -------
        file_args : dict
            Parsed content of the first matching filename, with the
            'datetime' field turned back into a placeholder.

        Raises
        ------
        IOError
            If no file in the directory tree matches the template.
        """
        for curr, subdirs, files in os.walk(self.data_path):
            for f in files:
                file_args = parse(template, f)
                if file_args is None:
                    continue
                else:
                    file_args = file_args.named
                    # keep the datetime field as a placeholder so the dict
                    # can be formatted into a search template again
                    file_args['datetime'] = '{datetime}'
                    return file_args

        raise IOError('No file name in passed directory fits to template')

    def tstamps_for_daterange(self, start_date, end_date):
        """
        Return dates in the passed period, with respect to the temporal
        resolution of the images in the path.

        Parameters
        ----------
        start_date : datetime
            start of date range
        end_date : datetime
            end of date range

        Returns
        -------
        timestamps : Iterator
            iterator over datetime objects of each available image between
            start_date and end_date
        """
        if self.fname_args['temp'] == 'MONTHLY':
            timestamps = pd.date_range(start_date, end_date, freq='MS').to_pydatetime()
        elif self.fname_args['temp'] == 'DAILY':
            timestamps = pd.date_range(start_date, end_date, freq='D').to_pydatetime()
        elif self.fname_args['temp'] == 'DEKADAL':
            timestamps = dekad_index(start_date, end_date).to_pydatetime()
            timestamps = [dekad_startdate_from_date(d) for d in timestamps]
        else:
            raise NotImplementedError

        return iter(timestamps)

    def read(self, timestamp, **kwargs):
        """
        Return an image for a specific timestamp.

        Parameters
        ----------
        timestamp : datetime.datetime
            Time stamp.

        Returns
        -------
        image : object
            pygeobase.object_base.Image object

        Raises
        ------
        IOError
            If no (unambiguous) file is found for the time stamp.
        """
        try:
            return self._assemble_img(timestamp, **kwargs)
        except IOError:
            warnings.warn(f'Could not load image for {timestamp}.')
            # re-raise the original exception instead of a new, message-less
            # IOError so that the cause and traceback are preserved
            raise
class C3STs(GriddedNcOrthoMultiTs):
    """
    Module for reading C3S time series in netcdf format.
    """

    def __init__(self, ts_path, grid_path=None, remove_nans=False, drop_tz=True,
                 **kwargs):
        """
        Class for reading C3S SM time series after reshuffling.

        Parameters
        ----------
        ts_path : str
            Directory where the netcdf time series files are stored.
        grid_path : str, optional (default: None)
            Path to grid file, that is used to organize the location of time
            series to read. If None is passed, grid.nc is searched for in the
            ts_path.
        remove_nans : bool or dict, optional (default: False)
            Replace fill values in SM time series. Either
            - dict of form {parameter: {val_to_replace: replacement_val}, ...}
            - dict of form {parameter: val_to_set_NaN, ...}
            - True to replace -9999 with nan anywhere
            - False to do nothing
        drop_tz : bool, optional (default: True)
            Drop time zone information from time series.

        Optional keyword arguments that are passed to the Gridded Base:
        ------------------------------------------------------------------------
        parameters : list, optional (default: None)
            Specific variable names to read, if None are selected, all are
            read.
        offsets : dict, optional (default: None)
            Offsets (values) that are added to the parameters (keys).
        scale_factors : dict, optional (default: None)
            Offset (value) that the parameters (key) is multiplied with.
        ioclass_kws : dict
            Optional keyword arguments to pass to OrthoMultiTs class:
            ----------------------------------------------------------------
            read_bulk : boolean, optional (default: False)
                If set to True the data of all locations is read into memory,
                and subsequent calls to read_ts read from the cache and not
                from disk. This makes reading complete files faster.
            read_dates : boolean, optional (default: False)
                If False, dates will not be read automatically but only on
                specific request. Useable for bulk reading because currently
                the netCDF num2date routine is very slow for big datasets.
        """
        if isinstance(remove_nans, dict):
            # normalise shorthand {param: value} entries to the full
            # {param: {value: np.nan}} form understood by DataFrame.replace
            for var, is_should in remove_nans.copy().items():
                if not isinstance(is_should, dict):
                    remove_nans[var] = {is_should: np.nan}

        self.remove_nans = remove_nans

        if grid_path is None:
            grid_path = os.path.join(ts_path, "grid.nc")

        grid = load_grid(grid_path)

        self.drop_tz = drop_tz
        super(C3STs, self).__init__(ts_path, grid=grid, **kwargs)

    def _read_gp(self, gpi, **kwargs):
        """Read a single point from passed gpi or from passed lon, lat."""
        # override the _read_gp function from parent class, to add fill value
        # replacement and time zone handling
        ts = super(C3STs, self)._read_gp(gpi, **kwargs)
        if ts is None:
            return None

        if self.remove_nans:
            if self.remove_nans is True:
                # default case: treat -9999 as missing in all variables
                ts = ts.replace(-9999.0000, np.nan)
            else:
                ts = ts.replace(self.remove_nans)

        if not self.drop_tz:
            ts.index = ts.index.tz_localize('UTC')
        else:
            if (hasattr(ts.index, 'tz') and (ts.index.tz is not None)):
                ts.index = ts.index.tz_convert(None)

        return ts

    def read_cell(self, cell, var='sm') -> pd.DataFrame:
        """
        Read all time series for a single variable in the selected cell.

        Parameters
        ----------
        cell : int
            Cell number as in the c3s grid.
        var : str, optional (default: 'sm')
            Name of the variable to read.

        Returns
        -------
        data : pd.DataFrame
            Time series (rows) for all locations (columns) in the cell.
        """
        file_path = os.path.join(self.path, '{}.nc'.format("%04d" % (cell,)))
        with nc.Dataset(file_path) as ncfile:
            loc_id = ncfile.variables['location_id'][:]
            time = ncfile.variables['time'][:]
            unit_time = ncfile.variables['time'].units
            # NOTE(review): assumes time units of the form 'days since <ref>'
            # (the previous timedelta(t)-based conversion made the same
            # assumption) -- confirm against the reshuffled files.
            since = pd.Timestamp(unit_time.split('since ')[1])
            # vectorized day-offset conversion instead of a per-element
            # np.vectorize(lambda ...) loop; also yields a DatetimeIndex
            time = since + pd.to_timedelta(time, unit='D')

            variable = ncfile.variables[var][:]

        variable = np.transpose(variable)
        data = pd.DataFrame(variable, columns=loc_id, index=time)

        if self.remove_nans:
            if self.remove_nans is True:
                data = data.replace(-9999, np.nan)
            else:
                data = data.replace(self.remove_nans)

        return data

    def iter_ts(self, **kwargs):
        # iteration over all time series is not implemented for this reader
        pass

    def write_ts(self, *args, **kwargs):
        # writing time series is not supported by this reader
        pass