"""Implementation of the ARIS(_lite) phenology model
This package provides functions for simulating crop phenology, water budget, and yield
expectation using environmental and crop-specific data.
"""
__version__ = "0.2.0.dev1"
__all__ = [
"aris_1go",
"extract_point_data",
"phenology",
"water_budget",
"yield_expectation",
]
from typing import Literal
import xarray as xr
T_crop_names = Literal[
"winter wheat",
"spring barley",
"maize",
"grassland",
"wofost potato very early",
"wofost potato mid",
"wofost potato late",
]
[docs]
def aris_1go(
ds: xr.Dataset,
crops: list[T_crop_names],
):
"""
Run the full ARIS-lite workflow on a single dataset.
This function sequentially applies snow calculation, phenology computation,
soil water calculation, water stress, combined stress, and yield estimation
to the input dataset. It is intended for small datasets and returns a merged
xarray.Dataset with all computed variables.
:param ds: Input dataset containing meteorological and crop data.
:type ds: xr.Dataset
:return: Dataset with all ARIS-lite output variables.
:rtype: xr.Dataset
"""
from aris_lite.water_budget import calc_snow, calc_soil_water
from aris_lite.phenology import compute_phenology_variables
from aris_lite.yield_expectation import calc_combined_stress, calc_yield
def _load_resample_apply(ds, func, *args, **kwargs):
return (
ds.load()
.resample(time="YE")
.map(func, args=args, **kwargs)
.assign_coords(time=("time", ds.time.data))
)
ds = xr.merge(
[
ds,
calc_snow(
ds.assign(
initial_snowcover=xr.zeros_like(ds.precipitation.isel(time=0))
)
).persist(),
]
)
ds = xr.merge(
[
ds,
(
xr.coding.calendar_ops.convert_calendar(
ds.air_temperature.where(~(ds.snowcover > 0)), "gregorian"
).pipe(
_load_resample_apply,
compute_phenology_variables,
crops,
)
).persist(),
]
)
ds = xr.merge(
[
ds,
ds.map_blocks(
_load_resample_apply,
args=(calc_soil_water,),
template=xr.Dataset(
{
var: xr.zeros_like(ds.Kc_factor.broadcast_like(ds.TAW))
.transpose(*ds.Kc_factor.dims, ...)
.chunk(ds.chunks)
for var in ["evapotranspiration", "evapo_ETC", "soil_depletion"]
}
),
)
.chunk(ds.chunks)
.persist(),
]
)
ds = ds.assign(
waterstress=(ds.soil_depletion * 100 / ds.TAW).mean("layer").persist()
)
ds = xr.merge([ds, calc_combined_stress(ds).persist()])
ds = xr.merge(
[
ds,
ds.combined_stress.pipe(_load_resample_apply, calc_yield).persist(),
]
)
return ds.map(lambda da: da.astype("float32") if da.dtype.kind == "f" else da)
def cli():
"""
Command-line interface for running the full ARIS-lite workflow.
Parses command-line arguments for input/output paths and Dask cluster configuration,
then runs the full workflow and writes the output dataset.
Usage:
aris-1go [--workers N] [--mem-per-worker SIZE] input.zarr output.zarr
:return: None
"""
from argparse import ArgumentParser, RawDescriptionHelpFormatter
from textwrap import dedent
parser = ArgumentParser(
prog="aris-1go", # TODO add name
formatter_class=RawDescriptionHelpFormatter,
description=dedent(
"""Calc all standard ARIS output in a single run
This routine is meant for small datasets. Note that intermediately large
amounts of data are generated that can quickly exhaust your memory.
"""
),
)
parser.add_argument(
"--workers", type=int, default=6, help="number of dask workers"
) # TODO change default to 1
parser.add_argument(
"--mem-per-worker",
type=str,
default="3Gb",
help='memory per worker, e.g. "5.67Gb"',
)
parser.add_argument("input", nargs=1, type=str, help="Path to input dataset")
parser.add_argument("output", nargs=1, type=str, help="Path to output dataset")
args = parser.parse_args()
if args.workers > 1:
from dask.distributed import Client, LocalCluster
client = Client(
LocalCluster(n_workers=args.workers, memory_limit=args.mem_per_worker)
)
print(client.dashboard_link)
out_ds = aris_1go(xr.open_zarr(args.input[0]).load().chunk(location=1))
out_ds.chunk(location=-1).to_zarr(args.output[0], mode="w")