Source code for aris_lite.phenology

#!/usr/bin/env python

"""Phenology module

This module computes the crop coefficients (Kc) and plant heights for different crops
based on temperature and crop-specific phenological rules. These variables are essential
for modeling crop growth, water use, and yield.
"""

__all__ = [
    "run_calc_pheno",
    "main",
    "main_cli",
    "compute_phenology_variables",
]

from aris_lite import T_crop_names
from aris_lite.deprecation import warn_legacy_python_api
from aris_lite.paths import DEFAULT_BASE_DIR, intermediate_year, reference_year
from dask import array as dask_arr
import numpy as np
import operator
import pandas as pd
from typing import Iterable
import xarray as xr


class Kc_condition_atom:
    """
    Internal comparator class for phenology conditions.

    Represents a single comparison operation (e.g., >= threshold) to be applied
    to a DataArray, supporting both temporal and numeric comparisons.

    :param comparator: Comparison function (e.g., operator.ge).
    :type comparator: callable
    :param value: Value to compare against (float, pd.Timestamp, or xr.DataArray).
    :type value: float | pd.Timestamp | xr.DataArray
    """

    def __init__(
        self, comparator: callable, value: float | pd.Timestamp | xr.DataArray
    ):
        self.comparator = comparator
        self.value = value

    def __str__(self):
        return (
            "Custom condition(comparator: "
            f"{self.comparator}, reference value: {self.value})"
        )

    @property
    def is_temporal(self) -> bool:
        return isinstance(
            self.value, pd.Timestamp
        ) or pd.api.types.is_datetime64_any_dtype(self.value)

    def compare(self, other: xr.DataArray) -> xr.DataArray:
        if self.is_temporal:
            years = np.unique(other.time.dt.year)
            assert years.shape == (
                1,
            )  # implement handling longer time series if relevant
            if isinstance(self.value, pd.Timestamp):
                comp_val = self.value.replace(year=years[0])
            else:
                # self.value is derived from dataset; year does not need to be adapted
                comp_val = self.value
            return self.comparator(other.time.broadcast_like(other), comp_val)
        else:
            return self.comparator(other, self.value)


class Kc_condition:
    """
    Internal class to combine multiple Kc_condition_atom comparators.

    Allows combining several conditions (e.g., after a date AND above a threshold)
    to define phenological stages.

    :param condition_tuple: Tuple of Kc_condition_atom instances.
    :type condition_tuple: tuple[Kc_condition_atom]
    """

    def __init__(self, *condition_tuple: "Kc_condition_atom"):
        self.condition_tuple = condition_tuple

    def __str__(self):
        return "\n".join(
            [
                "Custom condition containing condition atoms:",
                *[str(c) for c in self.condition_tuple],
            ]
        )

    def compare(self, other: xr.DataArray) -> xr.DataArray:
        # True if all conditions met
        return xr.concat(
            [condition_atom.compare(other) for condition_atom in self.condition_tuple],
            dim="temporary_dimension",
        ).all("temporary_dimension")


def conditional_cumulative_temperature(
    temperature: xr.DataArray,
    start_month: int,
    basis_temperature: float,
    start_growing_season: tuple[int, float],
) -> xr.DataArray:
    """
    Calculate cumulative temperature above a threshold.

    Only counts days where the temperature exceeds the basis temperature for a
    minimum number of consecutive days as specified by start_growing_season.
    Used to determine phenological stage transitions.

    :param temperature: Daily temperature time series (one year).
    :type temperature: xr.DataArray
    :param start_month: Month in which to start counting (1-12).
    :type start_month: int
    :param basis_temperature: Temperature threshold for counting.
    :type basis_temperature: float
    :param start_growing_season: Tuple of (minimum consecutive days above threshold,
        threshold value).
    :type start_growing_season: tuple[int, float]
    :return: Cumulative temperature above threshold.
    :rtype: xr.DataArray
    """
    return xr.where(
        np.logical_and(
            #  this intransparent lines (until end) satisfy the requirement to
            #  have a number of consecutive days with temperatures above a
            #  threshold
            np.logical_and(
                temperature.time.dt.month >= start_month,
                (temperature >= start_growing_season[1])
                .isel(time=slice(None, None, -1))
                .rolling(time=start_growing_season[0])
                .sum()
                == start_growing_season[0],
            ).cumsum("time")
            >= 1,  # end
            temperature >= basis_temperature,
        ),
        temperature - basis_temperature,
        0,
    ).cumsum("time")


def apply_condition_value_list(
    condition_value_list: Iterable[tuple["Kc_condition", float]],
    arr: xr.DataArray,
) -> xr.DataArray:
    """
    Assign values to an array based on a list of (condition, value) pairs.

    Later values override earlier ones. Used to set Kc or plant height values
    for different phenological stages.

    :param condition_value_list: List of (condition, value) pairs.
    :type condition_value_list: Iterable[tuple[Kc_condition, float]]
    :param arr: Input DataArray to assign values to.
    :type arr: xr.DataArray
    :return: DataArray with assigned values where conditions are met.
    :rtype: xr.DataArray
    """
    out = xr.DataArray(np.nan, coords=arr.coords)
    for cond, val in condition_value_list:
        out = xr.where(cond.compare(arr), val, out)
    return out


def build_Kc_factor_array(
    Kc_factor_defs: Iterable[tuple["Kc_condition", float]],
    arr: xr.DataArray,
) -> xr.DataArray:
    """
    Interpolate Kc factor values based on phenological stage definitions.

    Applies condition-value pairs and linearly interpolates missing values over time.

    :param Kc_factor_defs: Iterable of (condition, value) pairs for Kc.
    :type Kc_factor_defs: Iterable[tuple[Kc_condition, float]]
    :param arr: Values DataArray that is compared against conditions.
    :type arr: xr.DataArray
    :return: Interpolated Kc factor DataArray.
    :rtype: xr.DataArray
    """
    return apply_condition_value_list(Kc_factor_defs, arr).interpolate_na(
        "time", "linear"
    )


def build_plant_height_array(
    plant_height_defs: Iterable[tuple["Kc_condition", float]],
    arr: xr.DataArray,
) -> xr.DataArray:
    """
    Assign plant height values based on phenological stage definitions.

    Applies condition-value pairs and fills missing values with zero.

    :param plant_height_defs: Iterable of (condition, value) pairs for plant height.
    :type plant_height_defs: Iterable[tuple[Kc_condition, float]]
    :param arr: Values DataArray that is compared against conditions.
    :type arr: xr.DataArray
    :return: Plant height DataArray.
    :rtype: xr.DataArray
    """
    return (
        apply_condition_value_list(plant_height_defs, arr)
        .interpolate_na("time", "linear")
        .fillna(0)
    )


[docs] def compute_phenology_variables( temperature: xr.DataArray, crops: Iterable[T_crop_names] = ( "winter wheat", "spring barley", "maize", "grassland", ), grass_cut_mask: xr.DataArray | None = None, ) -> xr.Dataset: """ Compute crop coefficients (Kc) and plant heights for specified crops. Uses temperature data and crop-specific rules to determine phenological stages, then assigns Kc and plant height values accordingly. :param temperature: Daily surface air temperature average for one year. :type temperature: xr.DataArray :param crops: List of crops to compute phenology for. :type crops: Iterable[str], optional :return: Dataset containing Kc_factor and plant_height for each crop. :rtype: xr.Dataset Notes ----- The current implementation draws on the three references below (basic idea: :cite:`allen2000`, parameters and specifications: :cite:`eitzinger2024`, grass cut days: :cite:`schaumberger2011`). .. bibliography:: """ # all of winter wheat, spring barley, grain maize, potato, soybeans and # grassland (mähwiese) need to be included # TODO search xclim for degree_day_exceedance ! # TODO CRS should be adopted from coords Kc_ini_val = 0.4 Kc_mid_val = 1.2 Kc_end_val = 0.5 Kc_out_val = 0.4 before_out_season = Kc_condition_atom( operator.lt, pd.Timestamp(month=12, day=1, year=999) ) out_season = Kc_condition_atom(operator.ge, pd.Timestamp(month=12, day=1, year=999)) Kc_factor_da_list = [] plant_height_da_list = [] for crop in crops: if crop == "winter wheat": season_start_cumT = 51 mid_season_start_cumT = season_start_cumT + 461 mid_season_end_cumT = mid_season_start_cumT + 535 end_season_start_cumT = mid_season_end_cumT + 426 cumT = (temperature - 5).clip(min=0).cumsum("time") elif crop == "spring barley": season_start_cumT = 99 mid_season_start_cumT = season_start_cumT + 532 mid_season_end_cumT = mid_season_start_cumT + 450 end_season_start_cumT = mid_season_end_cumT + 337 cumT = (temperature - 5).clip(min=0).cumsum("time") elif crop == "maize": season_start_cumT = 156 mid_season_start_cumT = season_start_cumT + 476 mid_season_end_cumT = mid_season_start_cumT + 1076 end_season_start_cumT = mid_season_end_cumT + 53 cumT = (temperature - 8).clip(min=0).cumsum("time") elif crop == "soybean": season_start_cumT = 165 mid_season_start_cumT = season_start_cumT + 500 mid_season_end_cumT = mid_season_start_cumT + 661 end_season_start_cumT = mid_season_end_cumT + 54 cumT = (temperature - 10).clip(min=0).cumsum("time") elif "norm potato" in crop: season_start_cumT = 259 mid_season_start_cumT = season_start_cumT + 501 mid_season_end_cumT = mid_season_start_cumT + 728 end_season_start_cumT = mid_season_end_cumT + 290 cumT = (temperature - 5).clip(min=0).cumsum("time") elif crop.startswith("wofost potato"): Kc_mid_val = 1.0 cumT = ( temperature.where(temperature >= 3, 0) .where( temperature.time >= pd.Timestamp( year=temperature.time.dt.year.item(0), month=4, day=15 ) ) .cumsum("time") ) season_start_cumT = 170 if crop.endswith("very early"): mid_season_start_cumT = season_start_cumT + 150 mid_season_end_cumT = mid_season_start_cumT + 1250 elif crop.endswith("mid"): mid_season_start_cumT = season_start_cumT + 150 mid_season_end_cumT = mid_season_start_cumT + 1500 elif crop.endswith("late"): mid_season_start_cumT = season_start_cumT + 200 mid_season_end_cumT = mid_season_start_cumT + 1700 else: print(f"! WARNING: requested crop {crop} not recognized; skipped") continue elif crop == "grassland": if grass_cut_mask is None: """ To be consistent with the original ARIS, grassland is implemented following the Dissertation by Schaumberger [1]. First: the temperature sums are evaluated at given dates Second: the cutting date is determined based on a predefined temperature sum to time difference mapping The algorithm is complex compared to the earlier crops. For details refer to [1]. [1] Schaumberger, A. (2011). Räumliche Modelle zur Vegetations- und Ertragsdynamik im Wirtschaftsgrünland [Dissertation, Graz University of Technology]. https://repository.tugraz.at/publications/npc97-y3058. """ earliest = [ [141, 250], [125, 161, 229], [122, 154, 193, 239], ] regular = [ [168, 274], [148, 206, 271], [139, 181, 227, 277], ] latest = [ [200, 307], [177, 260, 301], [155, 225, 269, 305], ] cumTs = [ [630, 710], [630, 710, 910], [630, 710, 910, 850], ] cumT = conditional_cumulative_temperature( temperature, start_month=3, basis_temperature=0, start_growing_season=(5, 5), ) # Kc and plant height values Kc_ini_val = 0.4 Kc_mid_val = 1.2 Kc_end_val = 0.9 Kc_out_val = 0.4 # plH_ini_val = 0 plH_mid_val = 0.7 plH_end_val = 0.2 plH_out_val = 0.2 # debatable choice before_out_season = Kc_condition_atom( operator.lt, pd.Timestamp(month=11, day=1, year=999) ) out_season = Kc_condition_atom( operator.ge, pd.Timestamp(month=11, day=1, year=999) ) group_output_collector = [] group_output_collector2 = [] try: for ( _, group, ) in cumT.groupby_bins( # TODO wrap in .map_blocks if chunked cumT.sel(time=f"{cumT.time[0].dt.year.values}-10-31"), [sum(sublist) for sublist in cumTs] + [99999], ): before_growing_season = Kc_condition_atom( operator.lt, group[ (group == 0).argmin("time").compute() ].time.dt.dayofyear, ) before_march_first = Kc_condition_atom( operator.lt, group[:, 0] .sel(time=f"{group.time.dt.year.item(0)}-03-01") .time.dt.dayofyear.item(0), ) # init list to define stages dynamically Kc_factor_periods = [ (before_growing_season, Kc_ini_val), (before_march_first, Kc_out_val), ] plant_height_periods = [ (before_growing_season, plH_out_val), (before_march_first, plH_out_val), ] # cycle through threshold values for T_threshold, mid, lower, upper in zip( cumTs.pop(0), regular.pop(0), earliest.pop(0), latest.pop(0) ): lower_fraction_limit = 0.5 if mid < 170 else 0.4 T_sum_ratio = ( group.where(group.time.dt.dayofyear == mid).sum("time") / T_threshold ).clip(lower_fraction_limit, 2) cut_doy = xr.where( # if smaller 1 then delayed, else earlier T_sum_ratio < 1, np.round( upper - (upper - mid) * (T_sum_ratio - lower_fraction_limit) / (1 - lower_fraction_limit) ).astype("int"), np.round( mid - (mid - lower) * (T_sum_ratio - 1) ).astype("int"), ) cond_cut_doy = Kc_condition_atom(operator.eq, cut_doy - 1) cond_just_after_cut = Kc_condition_atom( operator.eq, cut_doy ) Kc_factor_periods.extend( [ (cond_cut_doy, Kc_mid_val), (cond_just_after_cut, Kc_ini_val), ] ) plant_height_periods.extend( [ (cond_cut_doy, plH_mid_val), (cond_just_after_cut, plH_end_val), ] ) group = group - group.where( group.time.dt.dayofyear == cut_doy - 1 ).sum("time") # set end state # `ge` to match the original ARIS should be `gt` cond_after_cut = Kc_condition_atom(operator.ge, cut_doy) Kc_factor_periods.extend( [ (cond_after_cut, Kc_end_val), (out_season, Kc_out_val), ] ) group_output_collector.append( build_Kc_factor_array( Kc_factor_periods, group.time.dt.dayofyear.broadcast_like(group), ) ) plant_height_periods.extend( [ (cond_after_cut, plH_end_val), (out_season, plH_out_val), ] ) group_output_collector2.append( build_plant_height_array( plant_height_periods, group.time.dt.dayofyear.broadcast_like(group), ) ) Kc_factor_da_list.append( xr.concat( group_output_collector, group_output_collector[0].dims[1] ) .sortby(group_output_collector[0].dims[1]) .unstack() .reindex_like(cumT) .assign_coords( {coord: vals for coord, vals in cumT.coords.items()} ) .rename(crop.replace(" ", "_")) ) plant_height_da_list.append( xr.concat( group_output_collector2, group_output_collector[0].dims[1] ) .sortby(group_output_collector[0].dims[1]) .unstack() .reindex_like(cumT) .assign_coords( {coord: vals for coord, vals in cumT.coords.items()} ) .rename(crop.replace(" ", "_")) ) except ValueError as err: if str(err).startswith( "None of the data falls within bins with edges" ): for da_list in [Kc_factor_da_list, plant_height_da_list]: if da_list[-1].name != "grassland": da_list.append( xr.DataArray(np.nan, coords=cumT.coords).rename( crop.replace(" ", "_") ) ) else: raise err continue else: if grass_cut_mask.isnull().all(): for da_list in [Kc_factor_da_list, plant_height_da_list]: da_list.append( xr.DataArray(np.nan, coords=temperature.coords).rename( crop.replace(" ", "_") ) ) continue cumT = temperature.clip(min=0).cumsum("time") cumTs = [ [133, 1921, 3056], [107, 1135, 1117, 985], [96, 1024, 1032, 901, 713, 571, 209], ] # Kc and plant height values Kc_ini_val = 0.4 Kc_mid_val = 1.2 Kc_end_val = 0.9 Kc_out_val = 0.4 # plH_ini_val = 0 plH_mid_val = 0.7 plH_end_val = 0.2 plH_out_val = 0.2 # debatable choice group_output_collector = [] group_output_collector2 = [] try: # TODO wrap in .map_blocks if chunked for label, group in cumT.groupby(grass_cut_mask): tmp_thrs = np.cumsum(cumTs[int(label) - 1]) before_growing_season = Kc_condition_atom( operator.lt, tmp_thrs[0], ) out_season = Kc_condition_atom(operator.ge, tmp_thrs[-1]) # init list to define stages dynamically Kc_factor_periods = [ (before_growing_season, Kc_ini_val), ] plant_height_periods = [ (before_growing_season, plH_out_val), ] # cycle through threshold values for T_threshold in tmp_thrs[1:-1]: next_day = (group >= T_threshold).idxmax("time") next_day = next_day.where( next_day != group.time.values[0], group.time.values[-1] ) cond_cut_doy = Kc_condition_atom( operator.eq, next_day - pd.Timedelta(1, "d") ) cond_just_after_cut = Kc_condition_atom( operator.eq, next_day ) Kc_factor_periods.extend( [ (cond_cut_doy, Kc_mid_val), (cond_just_after_cut, Kc_ini_val), ] ) plant_height_periods.extend( [ (cond_cut_doy, plH_mid_val), (cond_just_after_cut, plH_end_val), ] ) # set end state cond_after_cut = Kc_condition_atom(operator.ge, next_day) # pyright: ignore[reportPossiblyUnboundVariable] Kc_factor_periods.extend( [ (cond_after_cut, Kc_end_val), (out_season, Kc_out_val), ] ) group_output_collector.append( build_Kc_factor_array( Kc_factor_periods, # pyright: ignore[reportArgumentType] group, ) ) plant_height_periods.extend( [ (cond_after_cut, plH_end_val), (out_season, plH_out_val), ] ) group_output_collector2.append( build_plant_height_array( plant_height_periods, # pyright: ignore[reportArgumentType] group.time.dt.dayofyear.broadcast_like(group), ) ) Kc_factor_da_list.append( xr.concat( group_output_collector, group_output_collector[0].dims[1] ) .sortby(group_output_collector[0].dims[1]) .unstack() .reindex_like(cumT) .assign_coords( {coord: vals for coord, vals in cumT.coords.items()} ) .rename(crop.replace(" ", "_")) ) plant_height_da_list.append( xr.concat( group_output_collector2, group_output_collector[0].dims[1] ) .sortby(group_output_collector[0].dims[1]) .unstack() .reindex_like(cumT) .assign_coords( {coord: vals for coord, vals in cumT.coords.items()} ) .rename(crop.replace(" ", "_")) ) except ValueError as err: if str(err).startswith( "None of the data falls within bins with edges" ): for da_list in [Kc_factor_da_list, plant_height_da_list]: if da_list[-1].name != "grassland": da_list.append( xr.DataArray(np.nan, coords=cumT.coords).rename( crop.replace(" ", "_") ) ) else: raise err continue else: print( f"! WARNING: requested crop {crop} was not recognized and is skipped." ) continue before_growing_season = Kc_condition_atom(operator.lt, season_start_cumT) after_mid_season_start = Kc_condition_atom(operator.ge, mid_season_start_cumT) before_mid_season_end = Kc_condition_atom(operator.le, mid_season_end_cumT) EGS_date = cumT[ before_mid_season_end.compare(cumT).argmin("time").compute() ].time # set EGS_date to nan where applicable EGS_date = EGS_date.where( EGS_date > pd.Timestamp(month=3, day=1, year=cumT.time[0].dt.year.values) ) before_EGS = Kc_condition_atom(operator.lt, EGS_date) # after_EGS = Kc_condition_atom(operator.ge, EGS_date + pd.Timedelta(days=1)) if "end_season_start_cumT" in locals(): mid_season = Kc_condition(after_mid_season_start, before_mid_season_end) end_season = Kc_condition( Kc_condition_atom(operator.ge, end_season_start_cumT), # pyright: ignore[reportPossiblyUnboundVariable] before_out_season, ) mid_and_late = Kc_condition( after_mid_season_start, Kc_condition_atom(operator.lt, end_season_start_cumT), # pyright: ignore[reportPossiblyUnboundVariable] ) else: mid_season = Kc_condition(after_mid_season_start, before_EGS) # late_and_end_season = Kc_condition([after_EGS, before_out_season]) before_late_end = Kc_condition_atom( operator.lt, EGS_date + pd.Timedelta(days=14) ) mid_and_late = Kc_condition(after_mid_season_start, before_late_end) after_late_season = Kc_condition_atom( operator.ge, EGS_date + pd.Timedelta(days=14) ) end_season = Kc_condition(after_late_season, before_out_season) # late_and_end = Kc_condition([after_EGS, before_out_season]) Kc_factor_periods = [ (before_growing_season, Kc_ini_val), (mid_season, Kc_mid_val), (end_season, Kc_end_val), (out_season, Kc_out_val), ] Kc_factor_da_list.append( build_Kc_factor_array(Kc_factor_periods, cumT).rename( crop.replace(" ", "_") ) ) if crop in ["winter wheat", "spring barley"]: plant_height_periods = [ (before_growing_season, 0), (mid_and_late, 1), # (end_season, 0.2), ] elif crop == "maize": plant_height_periods = [ (before_growing_season, 0), (mid_and_late, 2), # (end_season, 0.2), ] elif crop == "soybean": plant_height_periods = [ (before_growing_season, 0), (mid_and_late, 1), # (end_season, 0.2), ] elif "potato" in crop: plant_height_periods = [ (before_growing_season, 0), (mid_and_late, 0.6), # (end_season, 0), ] elif crop == "grassland": plant_height_periods = [(end_season, 0.2)] # inconsistent with the above else: raise Exception( "If you see this error, implement plant height for missing crop." ) plant_height_da_list.append( build_plant_height_array(plant_height_periods, cumT).rename( crop.replace(" ", "_") ) ) Kc_factor_da_list = ( xr.concat(Kc_factor_da_list, "crop") .assign_coords(crop=("crop", list(crops))) .rename("Kc_factor") ) plant_height_da_list = ( xr.concat(plant_height_da_list, "crop") .assign_coords(crop=("crop", list(crops))) .rename("plant_height") ) out = xr.merge([Kc_factor_da_list, plant_height_da_list]) return out
def _run_calc_pheno_years( years: Iterable[int], crops: Iterable[T_crop_names] = ( "winter wheat", "spring barley", "maize", "grassland", ), base_dir: str = str(DEFAULT_BASE_DIR), ): for year in years: out_path = intermediate_year(year, base_dir=base_dir) if out_path.exists(): print(f"! WARNING: {out_path} already exists. Skipping.") continue print("Calculating phenology variables for year", year, "and crops", crops) T2m = xr.open_zarr( str(reference_year(year, base_dir=base_dir)), decode_coords="all" ).air_temperature original_calendar = None if T2m.time.dt.calendar in ["noleap"]: original_calendar = T2m.time.dt.calendar T2m = xr.coding.calendar_ops.convert_calendar(T2m, "gregorian") template = xr.DataArray( dask_arr.zeros(shape=(len(crops), *T2m.shape), dtype="f4"), coords=T2m.expand_dims({"crop": crops}).coords, ).chunk(dict(crop=-1, time=-1, x=41, y=37)) template = xr.merge( [template.rename("Kc_factor"), template.rename("plant_height")] ) result = T2m.map_blocks( lambda x: compute_phenology_variables(x, crops), template=template ) if original_calendar is not None: result = xr.coding.calendar_ops.convert_calendar(result, original_calendar) result.drop_encoding().to_zarr(str(out_path), mode="a-")
[docs] def run_calc_pheno( years: Iterable[int], crops: Iterable[T_crop_names] = ( "winter wheat", "spring barley", "maize", "grassland", ), workers: int = 4, mem_per_worker: str = "3Gb", base_dir: str = str(DEFAULT_BASE_DIR), ): """ Load data, compute phenology variables, and save output for specified years. For each year, loads temperature data, computes phenology variables for the given crops, and writes the results to a Zarr store. :param years: List of years to compute. :type years: Iterable[int] :param crops: List of crops to compute, defaults to ("winter wheat", "spring barley", "maize", "grassland"). :type crops: Iterable, optional """ years = sorted(years) from dask.distributed import Client, LocalCluster print("Starting dask") client = Client( LocalCluster(n_workers=workers, memory_limit=mem_per_worker, death_timeout=30) ) print("... access the dashboard at", client.dashboard_link) try: _run_calc_pheno_years(years=years, crops=crops, base_dir=base_dir) except (FileNotFoundError,) as err: if str(err).startswith("Unable to find group"): print( "\n! ERROR: data missing. Verify that the necessary data are " "available.\n" ) raise finally: client.close() print("Closed dask client\n") print("Successfully computed phenology related variables!\n") print( "Continue by computing the soil water by running\n\t" "`aris calc waterbudget --mode soil [year1 ...]`\n" )
[docs] def main( years: Iterable[int], crops: Iterable[T_crop_names] = ( "winter wheat", "spring barley", "maize", "grassland", ), base_dir: str = str(DEFAULT_BASE_DIR), ): """Legacy alias for :func:`run_calc_pheno` yearly core.""" warn_legacy_python_api( "aris_lite.phenology.main", "aris_lite.phenology.run_calc_pheno" ) _run_calc_pheno_years(years=sorted(years), crops=crops, base_dir=base_dir)
[docs] def main_cli(argv: list[str] | None = None) -> int: """ Command-line interface for computing phenology variables. Parses command-line arguments to determine which years to process and how many Dask workers to use. Initializes a Dask cluster for parallel processing, handles missing data, and manages workflow for phenology calculations. Usage: aris-calc-pheno [years ...] [--workers N] [--mem-per-worker SIZE] :return: None """ warn_legacy_python_api( "aris_lite.phenology.main_cli", "aris_lite.cli.main:main_cli" ) from aris_lite.cli.legacy_wrappers import legacy_pheno_cli return legacy_pheno_cli(argv=argv, emit_warning=False)
if __name__ == "__main__": raise SystemExit(main_cli())