mlstream package

Submodules

mlstream.datasets module

class mlstream.datasets.LumpedBasin(data_root: pathlib.Path, basin: str, forcing_vars: List[T], dates: List[T], is_train: bool, train_basins: List[T], seq_length: int, with_attributes: bool = False, concat_static: bool = True, db_path: str = None, allow_negative_target: bool = False, scalers: Tuple[mlstream.scaling.InputScaler, mlstream.scaling.OutputScaler, Dict[str, mlstream.scaling.StaticAttributeScaler]] = None)

Bases: torch.utils.data.Dataset

PyTorch data set to work with the raw text files for lumped (daily basin-aggregated) forcings and streamflow.

Parameters:
  • data_root (Path) – Path to the main directory of the data set
  • basin (str) – Gauge-id of the basin
  • forcing_vars (List) – Names of forcing variables to use
  • dates (List) – Start and end date of the period.
  • is_train (bool) – If True, discharge observations are normalized and invalid discharge samples are removed
  • train_basins (List) – List of basins used in the training of the experiment this Dataset is part of. Needed to create the correct feature scalers (the ones that are calculated on these basins)
  • seq_length (int) – Length of the input sequence
  • with_attributes (bool, optional) – If True, loads and returns additional attributes, by default False
  • concat_static (bool, optional) – If True, adds catchment characteristics at each time step to the meteorological forcing input data, by default True
  • db_path (str, optional) – Path to sqlite3 database file containing the catchment characteristics, by default None
  • allow_negative_target (bool, optional) – If False, removes samples with negative target values from the dataset, by default False
  • scalers (Tuple[InputScaler, OutputScaler, Dict[str, StaticAttributeScaler]], optional) – Scalers to normalize and rescale input, output, and static variables. If not provided, the scalers are initialized at runtime, which results in poor performance when many datasets are created; it therefore makes sense to re-use the scalers across datasets.
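
A minimal usage sketch, assuming the dataset layout described above; the paths, gauge ids, forcing-variable names, and date range are placeholders, and the dates are assumed to be pandas Timestamps (matching the scaling module):

    from pathlib import Path
    import pandas as pd
    from mlstream.datasets import LumpedBasin

    ds = LumpedBasin(
        data_root=Path("data"),
        basin="1001",  # placeholder gauge id
        forcing_vars=["PRECIP", "TEMP_MIN", "TEMP_MAX"],  # placeholder names
        dates=[pd.Timestamp("2000-01-01"), pd.Timestamp("2010-12-31")],
        is_train=True,
        train_basins=["1001", "1002"],
        seq_length=30,
    )
    x, y = ds[0]  # one (input sequence, target) pair, assuming standard Dataset indexing
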
class mlstream.datasets.LumpedH5(h5_file: pathlib.Path, basins: List[T], db_path: str, concat_static: bool = True, cache: bool = False, no_static: bool = False)

Bases: torch.utils.data.Dataset

PyTorch data set to work with pre-packed HDF5 database files. Should only be used with files created by create_h5_files in the utils module.

Parameters:
  • h5_file (Path) – Path to hdf5 file, containing the bundled data
  • basins (List) – List containing the basin ids
  • db_path (str) – Path to sqlite3 database file, containing the catchment characteristics
  • concat_static (bool) – If True, adds catchment characteristics at each time step to the meteorological forcing input data, by default True
  • cache (bool, optional) – If True, loads the entire data into memory, by default False
  • no_static (bool, optional) – If True, no catchment attributes are added to the inputs, by default False
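
A sketch of feeding the pre-packed training set to a PyTorch DataLoader; the file and database paths and the basin ids are placeholders:

    from pathlib import Path
    from torch.utils.data import DataLoader
    from mlstream.datasets import LumpedH5

    ds = LumpedH5(
        h5_file=Path("runs/train.h5"),        # created by utils.create_h5_files
        basins=["1001", "1002"],              # placeholder gauge ids
        db_path="data/static_attributes.db",  # created by datautils.store_static_attributes
        cache=True,                           # hold the entire data set in memory
    )
    loader = DataLoader(ds, batch_size=256, shuffle=True)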

mlstream.datautils module

mlstream.datautils.get_basin_list(data_root: pathlib.Path, basin_type: str) → List[T]

Returns the list of basin names.

If basin_type is ‘C’ or ‘V’, the gauge_info.csv needs to contain a column ‘Cal_Val’ that indicates the basin type.

Parameters:
  • data_root (Path) – Path to base data directory, which contains a folder ‘gauge_info’ with the gauge_info.csv file
  • basin_type (str) – ‘C’ to return calibration stations only, ‘V’ to return validation stations only, ‘*’ to return all stations
Returns:

List of basin name strings

Return type:

list
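
For example, assuming the directory layout described above:

    from pathlib import Path
    from mlstream.datautils import get_basin_list

    cal_basins = get_basin_list(Path("data"), basin_type="C")  # calibration stations only
    all_basins = get_basin_list(Path("data"), basin_type="*")  # every station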

mlstream.datautils.load_discharge(data_root: pathlib.Path, basins: List[T] = None) → pandas.core.frame.DataFrame

Loads observed discharge for (calibration) gauging stations.

Parameters:
  • data_root (Path) – Path to base data directory, which contains a directory ‘discharge’ with one or more nc-files.
  • basins (List, optional) – List of basins for which to return data. If None (default), all basins are returned
Returns:

A DataFrame with columns [date, basin, qobs], where ‘qobs’ contains the streamflow.

Return type:

pd.DataFrame
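
A short sketch; the path and basin id are placeholders:

    from pathlib import Path
    from mlstream.datautils import load_discharge

    discharge = load_discharge(Path("data"), basins=["1001"])
    # long format: one row per (date, basin) pair
    qobs = discharge.loc[discharge["basin"] == "1001", ["date", "qobs"]]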

mlstream.datautils.load_forcings_lumped(data_root: pathlib.Path, basins: List[T] = None) → Dict[KT, VT]

Loads basin-lumped forcings.

Parameters:
  • data_root (Path) – Path to base data directory, which contains the directory ‘forcings/lumped/’ with one .rvt/.csv/.txt file per basin.
  • basins (List, optional) – List of basins for which to return data. Default (None) returns data for all basins.
Returns:

Dictionary of forcings (pd.DataFrame) per basin

Return type:

dict
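
A short sketch; the path and basin id are placeholders, and the returned dictionary is assumed to be keyed by basin id:

    from pathlib import Path
    from mlstream.datautils import load_forcings_lumped

    forcings = load_forcings_lumped(Path("data"), basins=["1001"])
    df = forcings["1001"]  # pd.DataFrame of forcing time series for this basin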

mlstream.datautils.load_static_attributes(db_path: pathlib.Path, basins: List[T], drop_lat_lon: bool = True, keep_features: List[T] = None) → pandas.core.frame.DataFrame

Loads attributes from database file into DataFrame and one-hot-encodes non-numerical features.

Parameters:
  • db_path (Path) – Path to sqlite3 database file
  • basins (List) – List containing the basin ids
  • drop_lat_lon (bool) – If True, drops latitude and longitude column from final data frame, by default True
  • keep_features (List) – If a list is passed, a pd.DataFrame containing these features will be returned. By default, returns a pd.DataFrame containing the features used for training.
Returns:

Attributes in a pandas DataFrame. Index is basin id.

Return type:

pd.DataFrame
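
A short sketch; the database path and basin ids are placeholders:

    from pathlib import Path
    from mlstream.datautils import load_static_attributes

    attrs = load_static_attributes(
        Path("data/static_attributes.db"),
        basins=["1001", "1002"],
        drop_lat_lon=True,
    )
    row = attrs.loc["1001"]  # one row of (one-hot-encoded) attributes per basin id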

mlstream.datautils.reshape_data(x: numpy.ndarray, y: numpy.ndarray, seq_length: int) → Tuple[numpy.ndarray, numpy.ndarray]

Reshapes data into LSTM many-to-one input samples.

Parameters:
  • x (np.ndarray) – Input features of shape [num_samples, num_features]
  • y (np.ndarray) – Output feature of shape [num_samples, 1]
  • seq_length (int) – Length of the requested input sequences.
Returns:

  • x_new (np.ndarray) – Reshaped input features of shape [num_samples*, seq_length, num_features], where num_samples* equals num_samples - seq_length + 1, since the first seq_length - 1 time steps are needed to warm up the first sequence
  • y_new (np.ndarray) – The target value for each sample in x_new
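
A shape-level sketch with random data for illustration:

    import numpy as np
    from mlstream.datautils import reshape_data

    x = np.random.rand(100, 3)  # 100 time steps, 3 features
    y = np.random.rand(100, 1)
    x_new, y_new = reshape_data(x, y, seq_length=10)
    # x_new.shape == (91, 10, 3): 100 - 10 + 1 = 91 samples,
    # and y_new holds the target for each of the 91 sequences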

mlstream.datautils.store_static_attributes(data_root: pathlib.Path, db_path: pathlib.Path = None, attribute_names: List[T] = None)

Loads catchment characteristics from a text file and stores them in a sqlite3 table.

Parameters:
  • data_root (Path) – Path to the main directory of the data set
  • db_path (Path, optional) – Path to where the database file should be saved. If None, stores the database in data_root/static_attributes.db. Default: None
  • attribute_names (List, optional) – List of attribute names to use. Default: use all attributes.
Raises:

RuntimeError – If the attributes folder cannot be found.
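
For example (the path is a placeholder):

    from pathlib import Path
    from mlstream.datautils import store_static_attributes

    # with db_path=None, this writes data/static_attributes.db
    store_static_attributes(Path("data"))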

mlstream.experiment module

class mlstream.experiment.Experiment(data_root: pathlib.Path, is_train: bool, run_dir: pathlib.Path, start_date: str = None, end_date: str = None, basins: List[T] = None, forcing_attributes: List[T] = None, static_attributes: List[T] = None, seq_length: int = 10, concat_static: bool = False, no_static: bool = False, cache_data: bool = False, n_jobs: int = 1, seed: int = 0, allow_negative_target: bool = False, run_metadata: Dict[KT, VT] = {})

Bases: object

Main entrypoint for training and prediction.

get_nses() → Dict[KT, VT]

Calculates the experiment’s NSE for each calibration basin.

Validation basins are ignored since they don’t provide ground truth.

Returns:

nses – Dictionary mapping basin ids to their NSE

Return type:

Dict

Raises:

AttributeError – If called before predicting

predict() → Dict[KT, VT]

Generates predictions with a trained model.

Returns:

results – Dictionary containing a DataFrame of predictions and observations for each basin

Return type:

Dict

predict_basin(ds: mlstream.datasets.LumpedBasin, allow_negative_target: bool = False) → Tuple[numpy.ndarray, numpy.ndarray]

Predicts a single basin.

Parameters:
  • ds (LumpedBasin) – Dataset for the basin to predict
  • allow_negative_target (bool, default False) – If False, will clip predictions to values >= 0
Returns:

  • preds (np.ndarray) – Array containing the (rescaled) network prediction for the entire data period
  • obs (np.ndarray) – Array containing the observed discharge for the entire data period

set_model(model) → None

Set the model to use in the experiment.

train() → None

Trains the model.
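
A minimal end-to-end sketch of the training and prediction workflow. Paths, dates, and basin ids are placeholders, the date-string format is an assumption, and model stands in for an object implementing the interface the experiment expects:

    from pathlib import Path
    from mlstream.experiment import Experiment

    model = ...  # placeholder: a model implementing the expected interface

    exp = Experiment(
        data_root=Path("data"),
        is_train=True,
        run_dir=Path("runs/run_1"),
        start_date="2000-01-01",  # format assumed
        end_date="2010-12-31",
        basins=["1001", "1002"],
        seq_length=30,
    )
    exp.set_model(model)
    exp.train()

    # prediction with the trained model on the same run directory
    eval_exp = Experiment(data_root=Path("data"), is_train=False, run_dir=Path("runs/run_1"))
    eval_exp.set_model(model)
    results = eval_exp.predict()  # {basin: DataFrame of predictions and observations}
    nses = eval_exp.get_nses()    # only available after predict()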

mlstream.scaling module

class mlstream.scaling.InputScaler(data_root: pathlib.Path, basins: List[T], start_date: pandas._libs.tslibs.timestamps.Timestamp, end_date: pandas._libs.tslibs.timestamps.Timestamp, forcing_vars: List[T] = None)

Bases: mlstream.scaling.Scaler

class mlstream.scaling.OutputScaler(data_root: pathlib.Path, basins: List[T], start_date: pandas._libs.tslibs.timestamps.Timestamp, end_date: pandas._libs.tslibs.timestamps.Timestamp)

Bases: mlstream.scaling.Scaler

class mlstream.scaling.Scaler

Bases: object

normalize(feature: numpy.ndarray) → numpy.ndarray

rescale(feature: numpy.ndarray) → numpy.ndarray

class mlstream.scaling.StaticAttributeScaler(db_path: pathlib.Path, basins: List[T], variable_name: str)

Bases: mlstream.scaling.Scaler
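
A sketch of normalizing and rescaling forcings with an InputScaler; paths, basins, and dates are placeholders, and normalize/rescale are assumed to be inverse operations per the Scaler interface above:

    from pathlib import Path
    import numpy as np
    import pandas as pd
    from mlstream.scaling import InputScaler

    scaler = InputScaler(
        Path("data"),
        basins=["1001", "1002"],
        start_date=pd.Timestamp("2000-01-01"),
        end_date=pd.Timestamp("2010-12-31"),
    )
    x = np.random.rand(10, 3)        # placeholder forcing array
    x_norm = scaler.normalize(x)     # scale to the training statistics
    x_back = scaler.rescale(x_norm)  # undo the normalization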

mlstream.utils module

mlstream.utils.create_h5_files(data_root: pathlib.Path, out_file: pathlib.Path, basins: List[T], dates: List[T], forcing_vars: List[T], seq_length: int, allow_negative_target: bool)

Creates H5 training set.

Parameters:
  • data_root (Path) – Path to the main directory of the data set
  • out_file (Path) – Path of the location where the hdf5 file should be stored
  • basins (List) – List containing the gauge ids
  • dates (List) – Start and end date of the discharge period to use when combining the data.
  • forcing_vars (List) – Names of forcing variables
  • seq_length (int) – Length of the requested input sequences
  • allow_negative_target (bool) – If False, samples with negative target values are removed from the dataset.
Raises:

FileExistsError – If file at this location already exists.
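
For example (paths, basin ids, variable names, and dates are placeholders):

    from pathlib import Path
    import pandas as pd
    from mlstream.utils import create_h5_files

    create_h5_files(
        data_root=Path("data"),
        out_file=Path("runs/train.h5"),  # must not exist yet
        basins=["1001", "1002"],
        dates=[pd.Timestamp("2000-01-01"), pd.Timestamp("2010-12-31")],
        forcing_vars=["PRECIP", "TEMP_MIN", "TEMP_MAX"],
        seq_length=30,
        allow_negative_target=False,
    )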

mlstream.utils.nse(qsim: numpy.ndarray, qobs: numpy.ndarray) → float

Calculates the Nash-Sutcliffe efficiency (NSE), ignoring NaNs in qobs.

\[\text{NSE} = 1 - \frac{\sum_{t=1}^T{(q_s^t - q_o^t)^2}}{\sum_{t=1}^T{(q_o^t - \bar{q}_o)^2}}\]
Parameters:
  • qsim (np.ndarray) – Predicted streamflow
  • qobs (np.ndarray) – Ground truth streamflow
Returns:

nse – The prediction’s NSE

Return type:

float

Raises:

ValueError – If the lengths of qsim and qobs are not equal.
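
A small numeric sketch; the NaN in qobs is ignored:

    import numpy as np
    from mlstream.utils import nse

    qobs = np.array([1.0, 2.0, np.nan, 4.0])
    qsim = np.array([1.1, 1.9, 3.0, 4.2])
    score = nse(qsim, qobs)  # 1.0 is a perfect fit; values can be arbitrarily negative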

mlstream.utils.prepare_data(cfg: Dict[KT, VT], basins: List[T]) → Dict[KT, VT]

Pre-processes training data.

Parameters:
  • cfg (Dict) – Dictionary containing the run config
  • basins (List) – List containing the gauge ids
Returns:

Dictionary containing the updated run config.

Return type:

Dict

mlstream.utils.setup_run(cfg: Dict[KT, VT]) → Dict[KT, VT]

Creates the folder structure for the experiment.

Parameters:
  • cfg (Dict) – Dictionary containing the run config
Returns:

Dictionary containing the updated run config

Return type:

Dict
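
A calling-pattern sketch; the required run-config keys are defined by mlstream and not listed here, so cfg is a placeholder:

    from mlstream.utils import prepare_data, setup_run

    cfg = {}  # placeholder: fill in the run-config keys your setup requires
    run_cfg = setup_run(cfg)  # creates the experiment folder structure
    run_cfg = prepare_data(run_cfg, basins=["1001", "1002"])  # pre-processes training data
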
mlstream.utils.store_results(user_cfg: Dict[KT, VT], run_cfg: Dict[KT, VT], results: Dict[KT, VT])

Stores prediction results in a pickle file.

Parameters:
  • user_cfg (Dict) – Dictionary containing the user-entered evaluation config
  • run_cfg (Dict) – Dictionary containing the run config loaded from the cfg.json file
  • results (Dict) – Dictionary containing a DataFrame of observed and predicted discharge for each basin
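
A calling-pattern sketch, continuing the Experiment example above; user_cfg and run_cfg are placeholders for the evaluation config and the config loaded from cfg.json:

    from mlstream.utils import store_results

    user_cfg = {}  # placeholder: user-entered evaluation config
    run_cfg = {}   # placeholder: run config loaded from cfg.json
    results = eval_exp.predict()  # from the Experiment sketch above
    store_results(user_cfg, run_cfg, results)  # stores the predictions in a pickle file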

Module contents