mlstream package¶
Submodules¶
mlstream.datasets module¶
class mlstream.datasets.LumpedBasin(data_root: pathlib.Path, basin: str, forcing_vars: List, dates: List, is_train: bool, train_basins: List, seq_length: int, with_attributes: bool = False, concat_static: bool = True, db_path: str = None, allow_negative_target: bool = False, scalers: Tuple[mlstream.scaling.InputScaler, mlstream.scaling.OutputScaler, Dict[str, mlstream.scaling.StaticAttributeScaler]] = None)¶
PyTorch data set to work with the raw text files for lumped (daily, basin-aggregated) forcings and streamflow.
Parameters: - data_root (Path) – Path to the main directory of the data set
- basin (str) – Gauge-id of the basin
- forcing_vars (List) – Names of forcing variables to use
- dates (List) – Start and end date of the period.
- is_train (bool) – If True, discharge observations are normalized and invalid discharge samples are removed
- train_basins (List) – List of basins used in the training of the experiment this Dataset is part of. Needed to create the correct feature scalers (the ones that are calculated on these basins)
- seq_length (int) – Length of the input sequence
- with_attributes (bool, optional) – If True, additionally loads and returns static attributes, by default False
- concat_static (bool, optional) – If True, adds catchment characteristics at each time step to the meteorological forcing input data, by default True
- db_path (str, optional) – Path to sqlite3 database file containing the catchment characteristics, by default None
- allow_negative_target (bool, optional) – If False, samples with negative target values are removed from the dataset.
- scalers (Tuple[InputScaler, OutputScaler, Dict[str, StaticAttributeScaler]], optional) – Scalers to normalize and rescale input, output, and static variables. If not provided, the scalers are initialized at runtime, which results in poor performance if many datasets are created; it therefore makes sense to re-use the scalers across datasets.
class mlstream.datasets.LumpedH5(h5_file: pathlib.Path, basins: List, db_path: str, concat_static: bool = True, cache: bool = False, no_static: bool = False)¶
PyTorch data set to work with pre-packed hdf5 database files. Should only be used in combination with files created by create_h5_files in the utils module.
Parameters: - h5_file (Path) – Path to hdf5 file, containing the bundled data
- basins (List) – List containing the basin ids
- db_path (str) – Path to sqlite3 database file, containing the catchment characteristics
- concat_static (bool) – If True, adds catchment characteristics at each time step to the meteorological forcing input data, by default True
- cache (bool, optional) – If True, loads the entire data into memory, by default False
- no_static (bool, optional) – If True, no catchment attributes are added to the inputs, by default False
mlstream.datautils module¶
mlstream.datautils.get_basin_list(data_root: pathlib.Path, basin_type: str) → List¶
Returns the list of basin names.
If basin_type is ‘C’ or ‘V’, the gauge_info.csv needs to contain a column ‘Cal_Val’ that indicates the basin type.
Parameters: - data_root (Path) – Path to the base data directory, which contains a folder ‘gauge_info’ with the gauge_info.csv file
- basin_type (str) – ‘C’ to return calibration stations only, ‘V’ to return validation stations only, ‘*’ to return all stations
Returns: List of basin name strings
Return type: list
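The Cal_Val filtering described above can be sketched with pandas; the gauge ids and the gauge_info.csv layout beyond the ‘Cal_Val’ column are assumptions for illustration, not the library's actual data:

```python
import pandas as pd

# Hypothetical gauge_info.csv contents (gauge ids made up).
gauge_info = pd.DataFrame({
    "gauge_id": ["01013500", "01030500", "01031500"],
    "Cal_Val": ["C", "V", "C"],
})

def basins_of_type(df: pd.DataFrame, basin_type: str):
    """Return gauge ids matching basin_type ('C', 'V', or '*')."""
    if basin_type == "*":
        return df["gauge_id"].tolist()
    return df.loc[df["Cal_Val"] == basin_type, "gauge_id"].tolist()

print(basins_of_type(gauge_info, "C"))  # ['01013500', '01031500']
```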
mlstream.datautils.load_discharge(data_root: pathlib.Path, basins: List = None) → pandas.core.frame.DataFrame¶
Loads observed discharge for (calibration) gauging stations.
Parameters: - data_root (Path) – Path to base data directory, which contains a directory ‘discharge’ with one or more nc-files.
- basins (List, optional) – List of basins for which to return data. If None (default), all basins are returned
Returns: A DataFrame with columns [date, basin, qobs], where ‘qobs’ contains the streamflow.
Return type: pd.DataFrame
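The long-format [date, basin, qobs] layout can be illustrated with a small made-up DataFrame (basin names and values are fabricated for illustration); filtering by basin mirrors what the basins argument does:

```python
import pandas as pd

# Illustrative shape of the returned DataFrame (values made up).
df = pd.DataFrame({
    "date": pd.to_datetime(["2000-01-01", "2000-01-01", "2000-01-02"]),
    "basin": ["basin_a", "basin_b", "basin_a"],
    "qobs": [3.2, 1.1, 3.0],
})

# Restricting to a subset of basins, as the `basins` argument does:
subset = df[df["basin"].isin(["basin_a"])]
print(len(subset))  # 2
```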
mlstream.datautils.load_forcings_lumped(data_root: pathlib.Path, basins: List = None) → Dict¶
Loads basin-lumped forcings.
Parameters: - data_root (Path) – Path to the base data directory, which contains the directory ‘forcings/lumped/’ with one .rvt/.csv/.txt file per basin.
- basins (List, optional) – List of basins for which to return data. Default (None) returns data for all basins.
Returns: Dictionary of forcings (pd.DataFrame) per basin
Return type: dict
mlstream.datautils.load_static_attributes(db_path: pathlib.Path, basins: List, drop_lat_lon: bool = True, keep_features: List = None) → pandas.core.frame.DataFrame¶
Loads attributes from the database file into a DataFrame and one-hot-encodes non-numerical features.
Parameters: - db_path (Path) – Path to sqlite3 database file
- basins (List) – List containing the basin ids
- drop_lat_lon (bool) – If True, drops latitude and longitude column from final data frame, by default True
- keep_features (List) – If a list is passed, a pd.DataFrame containing these features will be returned. By default, returns a pd.DataFrame containing the features used for training.
Returns: Attributes in a pandas DataFrame. Index is basin id.
Return type: pd.DataFrame
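The one-hot-encoding step described above can be sketched with pandas; the attribute names here are hypothetical, not the features the library actually uses:

```python
import pandas as pd

# Hypothetical static attributes: one numeric, one categorical column.
attrs = pd.DataFrame(
    {"area_km2": [120.5, 88.0, 430.2], "land_cover": ["forest", "urban", "forest"]},
    index=["basin_a", "basin_b", "basin_c"],
)

# Non-numerical columns are expanded into one indicator column per category.
encoded = pd.get_dummies(attrs)
print(sorted(encoded.columns))
# ['area_km2', 'land_cover_forest', 'land_cover_urban']
```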
mlstream.datautils.reshape_data¶
Reshape data into LSTM many-to-one input samples.
Parameters: - x (np.ndarray) – Input features of shape [num_samples, num_features]
- y (np.ndarray) – Output feature of shape [num_samples, 1]
- seq_length (int) – Length of the requested input sequences.
Returns: - x_new (np.ndarray) – Reshaped input features of shape [num_samples*, seq_length, num_features], where num_samples* equals num_samples - seq_length + 1, since the first seq_length - 1 time steps are needed to warm up the first sample
- y_new (np.ndarray) – The target value for each sample in x_new
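A minimal NumPy sketch of this reshaping (an illustrative re-implementation of the documented behavior, not the package's own code):

```python
import numpy as np

def reshape_data(x: np.ndarray, y: np.ndarray, seq_length: int):
    """Turn [num_samples, num_features] inputs into many-to-one LSTM samples."""
    num_samples, num_features = x.shape
    n_new = num_samples - seq_length + 1
    x_new = np.zeros((n_new, seq_length, num_features))
    for i in range(n_new):
        x_new[i] = x[i:i + seq_length, :]
    # Each sample predicts the target at the *last* step of its window.
    y_new = y[seq_length - 1:]
    return x_new, y_new

x = np.arange(12, dtype=float).reshape(6, 2)  # 6 time steps, 2 features
y = np.arange(6, dtype=float).reshape(6, 1)
x_new, y_new = reshape_data(x, y, seq_length=3)
print(x_new.shape, y_new.shape)  # (4, 3, 2) (4, 1)
```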
mlstream.datautils.store_static_attributes(data_root: pathlib.Path, db_path: pathlib.Path = None, attribute_names: List = None)¶
Loads catchment characteristics from a text file and stores them in a sqlite3 table.
Parameters: - data_root (Path) – Path to the main directory of the data set
- db_path (Path, optional) – Path to where the database file should be saved. If None, stores the database in data_root/static_attributes.db. Default: None
- attribute_names (List, optional) – List of attribute names to use. Default: use all attributes.
Raises: RuntimeError – If the attributes folder cannot be found.
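The storage step can be sketched with pandas and sqlite3; the table and column names here are assumptions, and an in-memory database stands in for the static_attributes.db file:

```python
import sqlite3
import pandas as pd

# Hypothetical catchment characteristics.
attrs = pd.DataFrame(
    {"gauge_id": ["basin_a", "basin_b"], "area_km2": [120.5, 88.0]}
)

# Write the DataFrame into a sqlite3 table and read back the row count.
with sqlite3.connect(":memory:") as conn:
    attrs.to_sql("basin_attributes", conn, index=False)
    n = conn.execute("SELECT COUNT(*) FROM basin_attributes").fetchone()[0]
print(n)  # 2
```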
mlstream.experiment module¶
class mlstream.experiment.Experiment(data_root: pathlib.Path, is_train: bool, run_dir: pathlib.Path, start_date: str = None, end_date: str = None, basins: List = None, forcing_attributes: List = None, static_attributes: List = None, seq_length: int = 10, concat_static: bool = False, no_static: bool = False, cache_data: bool = False, n_jobs: int = 1, seed: int = 0, allow_negative_target: bool = False, run_metadata: Dict = {})¶
Bases: object
Main entrypoint for training and prediction.
get_nses() → Dict¶
Calculates the experiment’s NSE for each calibration basin. Validation basins are ignored since they don’t provide ground truth.
Returns: nses – Dictionary mapping basin ids to their NSE
Return type: Dict
Raises: AttributeError – If called before predicting
predict() → Dict¶
Generates predictions with a trained model.
Returns: results – Dictionary containing the DataFrame of predictions and observations for each basin.
Return type: Dict
predict_basin(ds: mlstream.datasets.LumpedBasin, allow_negative_target: bool = False) → Tuple[numpy.ndarray, numpy.ndarray]¶
Predicts a single basin.
Parameters: - ds (LumpedBasin) – Dataset for the basin to predict
- allow_negative_target (bool, default False) – If False, will clip predictions to values >= 0
Returns: - preds (np.ndarray) – Array containing the (rescaled) network prediction for the entire data period
- obs (np.ndarray) – Array containing the observed discharge for the entire data period
set_model(model) → None¶
Set the model to use in the experiment.

train() → None¶
Train the model.
mlstream.scaling module¶
class mlstream.scaling.InputScaler(data_root: pathlib.Path, basins: List, start_date: pd.Timestamp, end_date: pd.Timestamp, forcing_vars: List = None)¶
Bases: mlstream.scaling.Scaler
class mlstream.scaling.OutputScaler(data_root: pathlib.Path, basins: List, start_date: pd.Timestamp, end_date: pd.Timestamp)¶
Bases: mlstream.scaling.Scaler
class mlstream.scaling.Scaler¶
Bases: object

normalize(feature: numpy.ndarray) → numpy.ndarray¶
rescale(feature: numpy.ndarray) → numpy.ndarray¶
class mlstream.scaling.StaticAttributeScaler(db_path: pathlib.Path, basins: List, variable_name: str)¶
Bases: mlstream.scaling.Scaler
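A minimal sketch of the normalize/rescale interface these scalers share, assuming standard-score scaling (the library's actual statistics are derived from the training basins, and its class layout may differ):

```python
import numpy as np

class StandardScaler:
    """Illustrative scaler: normalize to zero mean / unit std, rescale back."""

    def __init__(self, data: np.ndarray):
        # Fit per-feature statistics on the provided data.
        self.mean = data.mean(axis=0)
        self.std = data.std(axis=0)

    def normalize(self, feature: np.ndarray) -> np.ndarray:
        return (feature - self.mean) / self.std

    def rescale(self, feature: np.ndarray) -> np.ndarray:
        return feature * self.std + self.mean

data = np.array([[1.0, 10.0], [2.0, 20.0], [3.0, 30.0]])
scaler = StandardScaler(data)
roundtrip = scaler.rescale(scaler.normalize(data))
print(np.allclose(roundtrip, data))  # True
```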
mlstream.utils module¶
mlstream.utils.create_h5_files(data_root: pathlib.Path, out_file: pathlib.Path, basins: List, dates: List, forcing_vars: List, seq_length: int, allow_negative_target: bool)¶
Creates H5 training set.
Parameters: - data_root (Path) – Path to the main directory of the data set
- out_file (Path) – Path of the location where the hdf5 file should be stored
- basins (List) – List containing the gauge ids
- dates (List) – List of start and end date of the discharge period to use, when combining the data.
- forcing_vars (List) – Names of forcing variables
- seq_length (int) – Length of the requested input sequences
- allow_negative_target (bool) – If False, samples with negative target values are removed from the dataset.
Raises: FileExistsError – If a file already exists at this location.
mlstream.utils.nse(qsim: numpy.ndarray, qobs: numpy.ndarray) → float¶
Calculates the NSE, ignoring NaNs in qobs:
\[\text{NSE} = 1 - \frac{\sum_{t=1}^T{(q_s^t - q_o^t)^2}}{\sum_{t=1}^T{(q_o^t - \bar{q}_o)^2}}\]
Parameters: - qsim (np.ndarray) – Predicted streamflow
- qobs (np.ndarray) – Ground truth streamflow
Returns: nse – The prediction’s NSE
Return type: float
Raises: ValueError – If the lengths of qsim and qobs are not equal.
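The formula above can be sketched directly in NumPy (an illustrative re-implementation of the documented behavior, not the library's code):

```python
import numpy as np

def nse(qsim: np.ndarray, qobs: np.ndarray) -> float:
    """Nash-Sutcliffe efficiency, ignoring NaNs in the observations."""
    if len(qsim) != len(qobs):
        raise ValueError("qsim and qobs must have the same length")
    mask = ~np.isnan(qobs)
    qs, qo = qsim[mask], qobs[mask]
    # 1 - (sum of squared errors) / (variance of observations around their mean)
    return 1.0 - np.sum((qs - qo) ** 2) / np.sum((qo - qo.mean()) ** 2)

qobs = np.array([1.0, 2.0, np.nan, 4.0])
print(nse(qobs.copy(), qobs))  # 1.0 for a perfect prediction
```

An NSE of 1 means a perfect fit, 0 means the prediction is no better than the observed mean, and negative values mean it is worse.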
mlstream.utils.prepare_data(cfg: Dict, basins: List) → Dict¶
Pre-processes training data.
Parameters: - cfg (Dict) – Dictionary containing the run config
- basins (List) – List containing the gauge ids
Returns: Dictionary containing the updated run config.
Return type: Dict
mlstream.utils.setup_run(cfg: Dict) → Dict¶
Creates the folder structure for the experiment.
Parameters: cfg (Dict) – Dictionary containing the run config
Returns: Dictionary containing the updated run config
Return type: Dict
mlstream.utils.store_results(user_cfg: Dict, run_cfg: Dict, results: Dict)¶
Stores prediction results in a pickle file.
Parameters: - user_cfg (Dict) – Dictionary containing the user entered evaluation config
- run_cfg (Dict) – Dictionary containing the run config loaded from the cfg.json file
- results (Dict) – Dictionary containing a DataFrame of observed and predicted discharge for each basin.
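Pickling such a results dictionary can be sketched as follows; the file name and the dictionary layout here are illustrative, not necessarily what store_results produces:

```python
import pickle
import tempfile
from pathlib import Path

# Hypothetical results structure: per-basin observed and simulated discharge.
results = {"basin_a": {"qobs": [1.0, 2.0], "qsim": [1.1, 1.9]}}

out_dir = Path(tempfile.mkdtemp())
out_file = out_dir / "results.p"

# Write the dictionary to disk, then read it back to verify the round trip.
with out_file.open("wb") as fp:
    pickle.dump(results, fp)
with out_file.open("rb") as fp:
    loaded = pickle.load(fp)
print(loaded == results)  # True
```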