Source code for aris_lite.water_budget

#!/usr/bin/env python

"""Water budget module

This module computes the snow/rain fraction, snow cover, evapotranspiration,
and soil water availability for crop modeling. It provides functions to process
meteorological and crop data to estimate water-related variables critical for
crop growth and yield estimation.
"""

__all__ = [
    "run_water_budget_snow",
    "run_water_budget_soil",
    "run_calc_waterbudget",
    "main_snow",
    "main_soil_water",
    "main_cli",
    "calc_snow",
    "calc_soil_water",
]

import dask.array as dask_arr
import numpy as np
import xarray as xr
from typing import Iterable

from aris_lite.deprecation import warn_legacy_python_api
from aris_lite.paths import (
    DEFAULT_BASE_DIR,
    input_taw,
    intermediate_snow,
    intermediate_year,
    reference_year,
)

import snowmaus


[docs] def calc_snow(ds: xr.Dataset) -> xr.Dataset: """ Calculate snowfall, meltwater production, and snow cover over time. This function simulates the snow accumulation and melting process based on precipitation and temperature data. It returns a dataset with snowfall, meltwater production, and snow cover for each time step. :param ds: Dataset containing variables "precipitation", "min_air_temp", "max_air_temp", and "initial_snowcover". :type ds: xr.Dataset :return: Dataset with variables "snowfall", "meltwater_production", and "snowcover". :rtype: xr.Dataset """ if ds.precipitation.isnull().all(): template = xr.DataArray( dask_arr.zeros_like(ds.precipitation, dtype="f4", chunks=(-1, 37, 41)), dims=ds.precipitation.dims, coords=ds.precipitation.coords, ) return xr.merge( [ template.rename("snowfall"), template.rename("meltwater_production"), template.rename("snowcover"), ] ) potential_melt = snowmaus.meltwater_production( ds.min_air_temp.values, ds.max_air_temp.values ) snowfall = snowmaus.snowfall(ds.precipitation.values, ds.min_air_temp.values) melt = np.zeros_like(snowfall) snowcover = np.zeros_like(snowfall) for i in range(snowcover.shape[0]): snowcover[i] = snowcover[i - 1] if i != 0 else ds.initial_snowcover.values snowcover[i] -= snowmaus.sublimed_snowcover(snowcover[i]) snowcover[i] += np.where(np.isnan(snowfall[i]), 0, snowfall[i]) melt[i] = np.where( potential_melt[i] > snowcover[i], snowcover[i], potential_melt[i] ) snowcover[i] -= np.where(np.isnan(melt[i]), 0, melt[i]) snowcover = np.where(ds.min_air_temp.isnull().all("time"), np.nan, snowcover) melt = np.where(ds.min_air_temp.isnull().all("time"), np.nan, melt) return xr.Dataset( { "snowfall": (ds.precipitation.dims, snowfall), "meltwater_production": (ds.precipitation.dims, melt), "snowcover": (ds.precipitation.dims, snowcover), }, coords=ds.precipitation.coords, )
[docs] def calc_soil_water(ds: xr.Dataset) -> xr.Dataset: """ Calculate evapotranspiration and soil water depletion for crops. This function estimates the actual evapotranspiration, crop evapotranspiration (ETC), and soil water depletion in different soil layers, based on crop coefficients, meteorological data, and soil properties. :param ds: Dataset containing variables "Kc_factor", "plant_height", "precipitation", "wind_speed", "rel_humidity", "pot_evapotransp", "snowfall", and "meltwater_production". :type ds: xr.Dataset :return: Dataset with variables "evapotranspiration", "evapo_ETC", and "soil_depletion". :rtype: xr.Dataset """ if ds.Kc_factor.isnull().all(): template = xr.DataArray( dask_arr.zeros( shape=(*ds.Kc_factor.shape, 2), dtype="float", chunks=(-1, -1, 37, 41, -1) if ds.Kc_factor.chunks else -1, ), dims=[*ds.Kc_factor.dims, "layer"], coords={ **{k: ds.Kc_factor[k] for k in ds.Kc_factor.coords}, "layer": ds.TAW.layer, }, ) return xr.merge( [ template.rename("evapotranspiration"), template.rename("evapo_ETC"), template.rename("soil_depletion"), ] ) # activate one of the two implementations # # below implements the formula from the Schaumber Thesis # pot_interc_precip = dask_arr.maximum( # 1.875 * ds.Kc_factor - 0.25, 0.2 * ds.pot_evapotransp # ) # below implements the (original) ARIS interception with and without minimum pot_interc_precip = dask_arr.maximum( (0.4 * ds.plant_height / ds.plant_height.max("time")).where( ds.time < (ds.plant_height.sel(time=slice(None, None, -1)) > 0).idxmax("time"), 0.1, ), 0.2 * ds.pot_evapotransp, ) liq_precip = ds.precipitation - ds.snowfall incoming_water = xr.where( pot_interc_precip <= liq_precip, liq_precip - pot_interc_precip, 0 ) incoming_water += ds.meltwater_production root_factor = xr.DataArray([0.6, 0.4], coords={"layer": ["top", "sub"]}) ET0 = ds.pot_evapotransp * root_factor def wind_profile_correction(input_height): # FAO wind profile (2 m reference height) return 4.87 / np.log((67.8 * input_height) - 5.42) wind_elev = 10 # m above ground climEff = ( (0.04 * ((wind_profile_correction(wind_elev) * ds.wind_speed).clip(1, 6) - 2)) - (0.004 * (ds.min_rel_hum.clip(20, 80) - 45)) ) * (ds.plant_height.clip(max=10).where(ds.plant_height > 0.1, 0) / 3) ** 0.3 Kc_plus_climEff = ds.Kc_factor + climEff ETC = Kc_plus_climEff * ET0 D_r = xr.DataArray( # soil depletion = missing water np.zeros((*Kc_plus_climEff.shape, 2), dtype="float32"), dims=[*Kc_plus_climEff.dims, "layer"], coords={ **{k: Kc_plus_climEff[k] for k in Kc_plus_climEff.coords}, "layer": ds.TAW.layer, }, ).rename("soil_depletion") ET = xr.DataArray( np.full((*Kc_plus_climEff.shape, 2), np.nan, dtype="float32"), dims=[*Kc_plus_climEff.dims, "layer"], coords={ **{k: Kc_plus_climEff[k] for k in Kc_plus_climEff.coords}, "layer": ds.TAW.layer, }, ).rename("evapotranspiration") p__lower_lim, p__upper_lim = 0.1, 0.8 p_T = xr.DataArray( [0.55, 0.55, 0.55, 0.5, 0.6, 0.35, 0.35, 0.35, 0.35], coords={ "crop": [ "winter wheat", "spring barley", "maize", "soybean", "grassland", "norm potato", "wofost potato very early", "wofost potato mid", "wofost potato late", ] }, ) for t in incoming_water.time[~incoming_water.time.dt.month.isin([1, 2, 12])].values: i = np.argwhere(t == incoming_water.time.values).flatten()[0] p = (p_T + (0.04 * (5 - ETC.sel(time=t)))).clip(p__lower_lim, p__upper_lim) # raise Exception(f"{(1 - D_r.isel(time=i - 1) / ds.TAW) / (1 - p)}") Ks_i = ((1 - D_r.isel(time=i - 1) / ds.TAW) / (1 - p)).clip(0, 1) ET[:, i] = ( ETC.sel(time=t) * Ks_i ) # Ks_i has to be multiplied after Kc_... because of coord precedence # if i == 92: # print(p, Ks_i, ET[:, i]) # # raise top_shortage = ( # water in top layer before surplus is released D_r.isel(time=i - 1).squeeze().sel(layer="top") + ET.sel(time=t, layer="top") - incoming_water.sel(time=t) ) maybe_new_top_layer_value = top_shortage.where( top_shortage < ds.TAW.sel(layer="top"), ds.TAW.sel(layer="top") ) DP = (-top_shortage).clip(0) sub_shortage = ( D_r.isel(time=i - 1).squeeze().sel(layer="sub") + ET.sel(time=t, layer="sub") - DP ) maybe_new_sub_layer_value = sub_shortage.where( sub_shortage < ds.TAW.sel(layer="sub"), ds.TAW.sel(layer="sub") ) potential_depletion = xr.concat( [maybe_new_top_layer_value, maybe_new_sub_layer_value], dim="layer" ) D_r[:, i] = potential_depletion.clip(0) return xr.merge([ET, ETC.rename("evapo_ETC"), D_r])
[docs] def run_water_budget_soil( years: Iterable[int], base_dir: str = str(DEFAULT_BASE_DIR), ): """ Load input data and write soil water and evapotranspiration results to Zarr store. For each year, this function loads the necessary datasets, computes soil water depletion and evapotranspiration, and saves the results. :param years: List of years to compute. :type years: Iterable[int] """ for year in years: yearly_intermediate = intermediate_year(year, base_dir=base_dir) yearly_snow = intermediate_snow(year, base_dir=base_dir) yearly_reference = reference_year(year, base_dir=base_dir) taw_path = input_taw(base_dir=base_dir) if (yearly_intermediate / "soil_depletion").exists(): print( f"! WARNING: {yearly_intermediate}/soil_depletion already " "exists. Skipping." ) continue print("Calculating soil water and evapotranspiration for year", year) pheno_ds = xr.open_zarr(str(yearly_intermediate), decode_coords="all") snow_ds = xr.open_zarr(str(yearly_snow), decode_coords="all") meteo_ds = xr.open_zarr(str(yearly_reference), decode_coords="all") TAW = xr.open_dataarray(str(taw_path), decode_coords="all") main_ds = xr.merge([pheno_ds, meteo_ds, TAW, snow_ds]).drop_vars( ["lambert_conformal_conic"] ) template = xr.DataArray( dask_arr.zeros( shape=(*main_ds.Kc_factor.shape, 2), dtype="f4", chunks=(4, -1, 37, 41, 2), ), dims=[*main_ds.Kc_factor.dims, "layer"], coords={ **{k: main_ds.Kc_factor[k] for k in main_ds.Kc_factor.coords}, "layer": TAW.layer, }, ) D_r = main_ds.map_blocks( calc_soil_water, template=xr.merge( [ template.rename("evapotranspiration"), template.rename("evapo_ETC"), template.rename("soil_depletion"), ] ), ) D_r.drop_encoding().to_zarr(str(yearly_intermediate), mode="a-")
[docs] def run_water_budget_snow( years: Iterable[int], base_dir: str = str(DEFAULT_BASE_DIR), ): """ Load input data and write snow-related results to Zarr store. For each year, this function loads meteorological data, initializes snow cover, computes snowfall and meltwater production, and saves the results. :param years: List of years to compute. :type years: Iterable[int] """ for year in years: yearly_snow = intermediate_snow(year, base_dir=base_dir) previous_yearly_snow = intermediate_snow(year - 1, base_dir=base_dir) yearly_reference = reference_year(year, base_dir=base_dir) if yearly_snow.exists(): print(f"! WARNING: {yearly_snow} already exists. Skipping.") continue main_ds = xr.open_zarr(str(yearly_reference), decode_coords="all") if previous_yearly_snow.exists() and "snowcover" in xr.open_zarr( str(previous_yearly_snow) ): main_ds["initial_snowcover"] = xr.open_zarr( str(previous_yearly_snow) ).snowcover.isel(time=-1) # next step is necessary! somehow this `xr.where` changes how the # data looks internally main_ds["precipitation"] = xr.where( main_ds.time.dt.month == 7, 0, main_ds.precipitation ) else: print( "\n! WARNING: snowcover data for previous year are missing; " "initializing with zero snowcover\nconsider not using data of this " "year for computing yield expectations\n" ) main_ds["initial_snowcover"] = xr.zeros_like( main_ds.precipitation.isel(time=0) ) main_ds["precipitation"] = xr.where( main_ds.time.dt.month < 8, 0, main_ds.precipitation ) print("Calculating snow related variables for year", year) template = xr.DataArray( dask_arr.zeros_like(main_ds.precipitation, dtype="f4", chunks=(-1, 37, 41)), dims=main_ds.precipitation.dims, coords=main_ds.precipitation.coords, ) main_ds.map_blocks( calc_snow, template=xr.merge( [ template.rename("snowfall"), template.rename("meltwater_production"), template.rename("snowcover"), ] ), ).drop_encoding().to_zarr(str(yearly_snow), mode="a")
[docs] def run_calc_waterbudget( years: Iterable[int], mode: str = "auto", workers: int = 4, mem_per_worker: str = "2Gb", base_dir: str = str(DEFAULT_BASE_DIR), ): """ Command-line interface for computing snow/melt or soil water and evapotranspiration. Parses command-line arguments to determine which computations to perform and for which years. Initializes a Dask cluster for parallel processing, handles missing data, and manages workflow for snow and soil water calculations. Usage: aris-calc-waterbudget [-m MODE] [years ...] [--workers N] [--mem-per-worker SIZE] :return: None """ years = sorted(years) mode = mode.lower() if mode == "auto": if all( intermediate_snow(year, base_dir=base_dir).exists() for year in years ): print( "Snow related variables are present, assuming you mean to have the " "soil part of the water budget computed" ) mode = "soil" else: print( "Snow related variables are missing for year(s):", ", ".join( [ str(year) for year in years if not intermediate_snow(year, base_dir=base_dir).exists() ] ) + ".", "Computing these now.", ) mode = "snow" from dask.distributed import LocalCluster, Client print(f"Starting dask ({workers} CPUs, each {mem_per_worker} RAM)") client = Client( LocalCluster( n_workers=workers, memory_limit=mem_per_worker, death_timeout=30 ) ) print("... access the dashboard at", client.dashboard_link) try: if mode == "snow": run_water_budget_snow(years=years, base_dir=base_dir) else: run_water_budget_soil(years=years, base_dir=base_dir) except (FileNotFoundError,) as err: if str(err).startswith("Unable to find group"): print( "\n! ERROR: data missing. Verify that the necessary data are " "available.\n" ) raise finally: client.close() print("Closed dask client\n") print(f"Successfully computed {mode} related variables!\n") if mode == "snow": print( "Continue by computing the crop coefficients (needed to calculate the " "evapotranspiration later) by running\n\t`aris calc pheno " "[year1 ...]`\n" ) else: print( "Continue by computing the expected yield by running\n\t" "`aris calc yield --mode both [year1 ...]`\n" )
[docs] def main_soil_water( years: Iterable[int], base_dir: str = str(DEFAULT_BASE_DIR), ): """Legacy alias for :func:`run_water_budget_soil`.""" warn_legacy_python_api( "aris_lite.water_budget.main_soil_water", "aris_lite.water_budget.run_water_budget_soil", ) run_water_budget_soil(years=years, base_dir=base_dir)
[docs] def main_snow( years: Iterable[int], base_dir: str = str(DEFAULT_BASE_DIR), ): """Legacy alias for :func:`run_water_budget_snow`.""" warn_legacy_python_api( "aris_lite.water_budget.main_snow", "aris_lite.water_budget.run_water_budget_snow", ) run_water_budget_snow(years=years, base_dir=base_dir)
[docs] def main_cli(argv: list[str] | None = None) -> int: """Legacy CLI alias for `aris-calc-waterbudget`; use `aris calc waterbudget`.""" warn_legacy_python_api( "aris_lite.water_budget.main_cli", "aris_lite.cli.main:main_cli", ) from aris_lite.cli.legacy_wrappers import legacy_waterbudget_cli return legacy_waterbudget_cli(argv=argv, emit_warning=False)
if __name__ == "__main__": raise SystemExit(main_cli())