Source code for aris_lite.water_budget

#!/usr/bin/env python

"""Water budget module

This module computes the snow/rain fraction, snow cover, evapotranspiration,
and soil water availability for crop modeling. It provides functions to process
meteorological and crop data to estimate water-related variables critical for
crop growth and yield estimation.
"""

__all__ = [
    "main_snow",
    "main_soil_water",
    "calc_snow",
    "calc_soil_water",
]

import dask.array as dask_arr
import numpy as np
import os
import xarray as xr
from typing import Iterable

import snowmaus


[docs] def calc_snow(ds: xr.Dataset) -> xr.Dataset: """ Calculate snowfall, meltwater production, and snow cover over time. This function simulates the snow accumulation and melting process based on precipitation and temperature data. It returns a dataset with snowfall, meltwater production, and snow cover for each time step. :param ds: Dataset containing variables "precipitation", "min_air_temp", "max_air_temp", and "initial_snowcover". :type ds: xr.Dataset :return: Dataset with variables "snowfall", "meltwater_production", and "snowcover". :rtype: xr.Dataset """ if ds.precipitation.isnull().all(): template = xr.DataArray( dask_arr.zeros_like(ds.precipitation, dtype="f4", chunks=(-1, 37, 41)), dims=ds.precipitation.dims, coords=ds.precipitation.coords, ) return xr.merge( [ template.rename("snowfall"), template.rename("meltwater_production"), template.rename("snowcover"), ] ) potential_melt = snowmaus.meltwater_production( ds.min_air_temp.values, ds.max_air_temp.values ) snowfall = snowmaus.snowfall(ds.precipitation.values, ds.min_air_temp.values) melt = np.zeros_like(snowfall) snowcover = np.zeros_like(snowfall) for i in range(snowcover.shape[0]): snowcover[i] = snowcover[i - 1] if i != 0 else ds.initial_snowcover.values snowcover[i] -= snowmaus.sublimed_snowcover(snowcover[i]) snowcover[i] += np.where(np.isnan(snowfall[i]), 0, snowfall[i]) melt[i] = np.where( potential_melt[i] > snowcover[i], snowcover[i], potential_melt[i] ) snowcover[i] -= np.where(np.isnan(melt[i]), 0, melt[i]) snowcover = np.where(ds.min_air_temp.isnull().all("time"), np.nan, snowcover) melt = np.where(ds.min_air_temp.isnull().all("time"), np.nan, melt) return xr.Dataset( { "snowfall": (ds.precipitation.dims, snowfall), "meltwater_production": (ds.precipitation.dims, melt), "snowcover": (ds.precipitation.dims, snowcover), }, coords=ds.precipitation.coords, )
[docs] def calc_soil_water(ds: xr.Dataset) -> xr.Dataset: """ Calculate evapotranspiration and soil water depletion for crops. This function estimates the actual evapotranspiration, crop evapotranspiration (ETC), and soil water depletion in different soil layers, based on crop coefficients, meteorological data, and soil properties. :param ds: Dataset containing variables "Kc_factor", "plant_height", "precipitation", "wind_speed", "rel_humidity", "pot_evapotransp", "snowfall", and "meltwater_production". :type ds: xr.Dataset :return: Dataset with variables "evapotranspiration", "evapo_ETC", and "soil_depletion". :rtype: xr.Dataset """ if ds.Kc_factor.isnull().all(): template = xr.DataArray( dask_arr.zeros( shape=(*ds.Kc_factor.shape, 2), dtype="float", chunks=(-1, -1, 37, 41, -1) if ds.Kc_factor.chunks else -1, ), dims=[*ds.Kc_factor.dims, "layer"], coords={ **{k: ds.Kc_factor[k] for k in ds.Kc_factor.coords}, "layer": ds.TAW.layer, }, ) return xr.merge( [ template.rename("evapotranspiration"), template.rename("evapo_ETC"), template.rename("soil_depletion"), ] ) # activate one of the two implementations # # below implements the formula from the Schaumber Thesis # pot_interc_precip = dask_arr.maximum( # 1.875 * ds.Kc_factor - 0.25, 0.2 * ds.pot_evapotransp # ) # below implements the (original) ARIS interception with and without minimum pot_interc_precip = dask_arr.maximum( (0.4 * ds.plant_height / ds.plant_height.max("time")).where( ds.time < (ds.plant_height.sel(time=slice(None, None, -1)) > 0).idxmax("time"), 0.1, ), 0.2 * ds.pot_evapotransp, ) liq_precip = ds.precipitation - ds.snowfall incoming_water = xr.where( pot_interc_precip <= liq_precip, liq_precip - pot_interc_precip, 0 ) incoming_water += ds.meltwater_production root_factor = xr.DataArray([0.6, 0.4], coords={"layer": ["top", "sub"]}) ET0 = ds.pot_evapotransp * root_factor def wind_profile_correction(input_height): # FAO wind profile (2 m reference height) return 4.87 / np.log((67.8 * input_height) - 5.42) wind_elev = 10 # m above ground climEff = ( (0.04 * ((wind_profile_correction(wind_elev) * ds.wind_speed).clip(1, 6) - 2)) - (0.004 * (ds.min_rel_hum.clip(20, 80) - 45)) ) * (ds.plant_height.clip(max=10).where(ds.plant_height > 0.1, 0) / 3) ** 0.3 Kc_plus_climEff = ds.Kc_factor + climEff ETC = Kc_plus_climEff * ET0 D_r = xr.DataArray( # soil depletion = missing water np.zeros((*Kc_plus_climEff.shape, 2), dtype="float32"), dims=[*Kc_plus_climEff.dims, "layer"], coords={ **{k: Kc_plus_climEff[k] for k in Kc_plus_climEff.coords}, "layer": ds.TAW.layer, }, ).rename("soil_depletion") ET = xr.DataArray( np.full((*Kc_plus_climEff.shape, 2), np.nan, dtype="float32"), dims=[*Kc_plus_climEff.dims, "layer"], coords={ **{k: Kc_plus_climEff[k] for k in Kc_plus_climEff.coords}, "layer": ds.TAW.layer, }, ).rename("evapotranspiration") for t in incoming_water.time[~incoming_water.time.dt.month.isin([1, 2, 12])].values: i = np.argwhere(t == incoming_water.time.values).flatten()[0] p__lower_lim, p__upper_lim = 0.1, 0.8 p_T = xr.DataArray( [0.55, 0.55, 0.55, 0.5, 0.6, 0.35, 0.35, 0.35, 0.35], coords={ "crop": [ "winter wheat", "spring barley", "maize", "soybean", "grassland", "norm potato", "wofost potato very early", "wofost potato mid", "wofost potato late", ] }, ) p = (p_T + (0.04 * (5 - ETC.sel(time=t)))).clip(p__lower_lim, p__upper_lim) Ks_i = ((1 - D_r.isel(time=i - 1) / ds.TAW).values / (1 - p)).clip(0, 1) ET[:, i] = ( ET0.sel(time=t) * Kc_plus_climEff.sel(time=t) * Ks_i ) # Ks_i has to be multiplied after Kc_... because of coord precedence top_shortage = ( # water in top layer before surplus is released D_r.isel(time=i - 1).squeeze().sel(layer="top") + ET.sel(time=t, layer="top") - incoming_water.sel(time=t) ) maybe_new_top_layer_value = top_shortage.where( top_shortage < ds.TAW.sel(layer="top"), ds.TAW.sel(layer="top") ) DP = (-top_shortage).clip(0) sub_shortage = ( D_r.isel(time=i - 1).squeeze().sel(layer="sub") + ET.sel(time=t, layer="sub") - DP ) maybe_new_sub_layer_value = sub_shortage.where( sub_shortage < ds.TAW.sel(layer="sub"), ds.TAW.sel(layer="sub") ) potential_depletion = xr.concat( [maybe_new_top_layer_value, maybe_new_sub_layer_value], dim="layer" ) D_r[:, i] = potential_depletion.clip(0) return xr.merge([ET, ETC.rename("evapo_ETC"), D_r])
[docs] def main_soil_water(years: Iterable[int]): """ Load input data and write soil water and evapotranspiration results to Zarr store. For each year, this function loads the necessary datasets, computes soil water depletion and evapotranspiration, and saves the results. :param years: List of years to compute. :type years: Iterable[int] """ for year in years: if os.path.isdir(f"../data/intermediate/{year}.zarr/soil_depletion"): print(f"! WARNING: {year}.zarr/soil_depletion already exists. Skipping.") continue print("Calculating soil water and evapotranspiration for year", year) pheno_ds = xr.open_zarr( f"../data/intermediate/{year}.zarr", decode_coords="all" ) snow_ds = xr.open_zarr( f"../data/intermediate/snow_{year}.zarr", decode_coords="all" ) meteo_ds = xr.open_zarr(f"../data/input/{year}.zarr", decode_coords="all") TAW = xr.open_dataarray("../data/input/soil_taw.nc", decode_coords="all") main_ds = xr.merge([pheno_ds, meteo_ds, TAW, snow_ds]).drop_vars( ["lambert_conformal_conic"] ) template = xr.DataArray( dask_arr.zeros( shape=(*main_ds.Kc_factor.shape, 2), dtype="f4", chunks=(4, -1, 37, 41, 2), ), dims=[*main_ds.Kc_factor.dims, "layer"], coords={ **{k: main_ds.Kc_factor[k] for k in main_ds.Kc_factor.coords}, "layer": TAW.layer, }, ) D_r = main_ds.map_blocks( calc_soil_water, template=xr.merge( [ template.rename("evapotranspiration"), template.rename("evapo_ETC"), template.rename("soil_depletion"), ] ), ) D_r.drop_encoding().to_zarr(f"../data/intermediate/{year}.zarr", mode="a-")
[docs] def main_snow(years: Iterable[int]): """ Load input data and write snow-related results to Zarr store. For each year, this function loads meteorological data, initializes snow cover, computes snowfall and meltwater production, and saves the results. :param years: List of years to compute. :type years: Iterable[int] """ for year in years: if os.path.isdir(f"../data/intermediate/snow_{year}.zarr"): print(f"! WARNING: snow_{year}.zarr already exists. Skipping.") continue main_ds = xr.open_zarr(f"../data/input/{year}.zarr", decode_coords="all") if os.path.isdir( f"../data/intermediate/snow_{year - 1}.zarr" ) and "snowcover" in xr.open_zarr(f"../data/intermediate/snow_{year - 1}.zarr"): main_ds["initial_snowcover"] = xr.open_zarr( f"../data/intermediate/snow_{year - 1}.zarr" ).snowcover.isel(time=-1) # next step is necessary! somehow this `xr.where` changes how the # data looks internally main_ds["precipitation"] = xr.where( main_ds.time.dt.month == 7, 0, main_ds.precipitation ) else: print( "\n! WARNING: snowcover data for previous year are missing; " "initializing with zero snowcover\nconsider not using data of this " "year for computing yield expectations\n" ) main_ds["initial_snowcover"] = xr.zeros_like( main_ds.precipitation.isel(time=0) ) main_ds["precipitation"] = xr.where( main_ds.time.dt.month < 8, 0, main_ds.precipitation ) print("Calculating snow related variables for year", year) template = xr.DataArray( dask_arr.zeros_like(main_ds.precipitation, dtype="f4", chunks=(-1, 37, 41)), dims=main_ds.precipitation.dims, coords=main_ds.precipitation.coords, ) main_ds.map_blocks( calc_snow, template=xr.merge( [ template.rename("snowfall"), template.rename("meltwater_production"), template.rename("snowcover"), ] ), ).drop_encoding().to_zarr(f"../data/intermediate/snow_{year}.zarr", mode="a")
def main_cli(): """ Command-line interface for computing snow/melt or soil water and evapotranspiration. Parses command-line arguments to determine which computations to perform and for which years. Initializes a Dask cluster for parallel processing, handles missing data, and manages workflow for snow and soil water calculations. Usage: aris-calc-waterbudget [-m MODE] [years ...] [--workers N] [--mem-per-worker SIZE] :return: None """ import argparse parser = argparse.ArgumentParser( description="computes either the snow/melt or the soil water " "and evapotranspiration" ) parser.add_argument( "-m", "--mode", type=str, choices=["snow", "soil", "auto"], default="auto", help="choose which part of the water budget to compute", ) parser.add_argument( "years", type=int, nargs="*", default=[2020, 2021, 2023], help="list years to compute", ) parser.add_argument("--workers", type=int, default=4, help="number of dask workers") parser.add_argument( "--mem-per-worker", type=str, default="2Gb", help='memory per worker, e.g. "5.67Gb"', ) args = parser.parse_args() args.years = sorted(args.years) if args.mode == "auto": if all( os.path.isdir(f"../data/intermediate/snow_{year}.zarr") for year in args.years ): print( "Snow related variables are present, assuming you mean to have the " "soil part of the water budget computed" ) args.mode = "soil" else: print( "Snow related variables are missing for year(s):", ", ".join( [ str(year) for year in args.years if not os.path.isdir(f"../data/intermediate/snow_{year}.zarr") ] ) + ".", "Computing these now.", ) args.mode = "snow" from dask.distributed import LocalCluster, Client print(f"Starting dask ({args.workers} CPUs, each {args.mem_per_worker} RAM)") client = Client( LocalCluster( n_workers=args.workers, memory_limit=args.mem_per_worker, death_timeout=30 ) ) print("... access the dashboard at", client.dashboard_link) try: if args.mode == "snow": main_snow(args.years) else: main_soil_water(args.years) except (FileNotFoundError,) as err: if str(err).startswith("Unable to find group"): print( "\n! ERROR: data missing. Verify that the necessary data are " "available.\n" ) raise finally: client.close() print("Closed dask client\n") print(f"Sucessfully computed {args.mode} related variables!\n") if args.mode == "snow": print( "Continue by computing the crop coefficients (needed to calculate the " "evapotranspiration later) by running\n\t`python phenology.py " "[year1 ...]`\n" ) else: print( "Continue by computing the expected yield by running\n\t`python " "yield_expectation.py [year1 ...]`\n" ) if __name__ == "__main__": main_cli()