API Reference#

Cosecha: Tools for harvesting earth observation data for use in flood forecasting.

GriddedReaper #

GriddedReaper()

Bases: ReaperBase

Abstract base class for harvesting gridded data.

reap #

reap()

Fetch data from source and store it in instance state.

Returns:

  • DataFrame | Dataset

    Harvested data from the source.

sow_to_zarr #

sow_to_zarr(file_path, consolidate=True)

Write Dataset to Zarr store.

Parameters:

  • file_path (str | Path) –

    Local path or remote URI (e.g. s3://bucket/store.zarr) where the Zarr store will be written. Parent directories are created automatically for local paths.

  • consolidate (bool, default: True ) –

    Whether to consolidate metadata after writing.

Returns:

  • str

    The path or URI of the written Zarr store.

sow_to_netcdf #

sow_to_netcdf(file_path)

Write Dataset to NetCDF format.

Parameters:

  • file_path (str | Path) –

    Local path or remote URI (e.g. s3://bucket/data.nc) where the NetCDF file will be written. Parent directories are created automatically for local paths.

Returns:

  • str

    The path or URI of the written NetCDF file.

sow_to_icechunk #

sow_to_icechunk(storage_path, group_path)

Write Dataset to IceChunk format.

Parameters:

  • storage_path (str | Path) –

    Local path or S3 URI (e.g. s3://bucket/prefix) for the IceChunk storage. Local directories are created if needed; S3 credentials are read from the environment.

  • group_path (str) –

    Path to the IceChunk group within the repository.

Returns:

  • str

    The path or URI of the written IceChunk group.

MRMSReaper #

MRMSReaper(
    dates,
    variable="MultiSensor_QPE_01H_Pass2_00.00",
    transformations=None,
    cache_data=False,
)

Bases: GriddedReaper

Reaper for NOAA MRMS gridded precipitation data.

Initialize MRMSReaper.

Parameters:

  • dates (Literal['latest'] | tuple[str, str]) –

    "latest" to fetch the most recent available data, or a tuple of (start_time, end_time) to fetch a custom range, e.g. ("2026-01-01 00:00Z", "2026-01-01 18:00Z"). To fetch a single time point, set start_time and end_time to the same value, e.g. ("2026-01-01 00:00Z", "2026-01-01 00:00Z").

  • variable (str, default: 'MultiSensor_QPE_01H_Pass2_00.00' ) –

    MRMS variable name.

  • transformations (dict[str, Any] | None, default: None ) –

    Optional transformations to apply to the raw data before returning.

  • cache_data (bool, default: False ) –

    Whether to cache decompressed MRMS files on disk.
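
Examples:

A minimal construction sketch; the date values are illustrative:

>>> reaper = MRMSReaper(
...     dates=("2026-01-01 00:00Z", "2026-01-01 06:00Z"),
...     cache_data=True,
... )
>>> ds = reaper.reap()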

reap #

reap()

Fetch data from source and store it in instance state.

Returns:

  • DataFrame | Dataset

    Harvested data from the source.

sow_to_zarr #

sow_to_zarr(file_path, consolidate=True)

Write Dataset to Zarr store.

Parameters:

  • file_path (str | Path) –

    Local path or remote URI (e.g. s3://bucket/store.zarr) where the Zarr store will be written. Parent directories are created automatically for local paths.

  • consolidate (bool, default: True ) –

    Whether to consolidate metadata after writing.

Returns:

  • str

    The path or URI of the written Zarr store.
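
A reap-then-sow sketch (local path shown; an s3:// URI works the same way):

>>> reaper = MRMSReaper(dates="latest")
>>> reaper.reap()
>>> reaper.sow_to_zarr("mrms_latest.zarr")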

sow_to_netcdf #

sow_to_netcdf(file_path)

Write Dataset to NetCDF format.

Parameters:

  • file_path (str | Path) –

    Local path or remote URI (e.g. s3://bucket/data.nc) where the NetCDF file will be written. Parent directories are created automatically for local paths.

Returns:

  • str

    The path or URI of the written NetCDF file.

sow_to_icechunk #

sow_to_icechunk(storage_path, group_path)

Write Dataset to IceChunk format.

Parameters:

  • storage_path (str | Path) –

    Local path or S3 URI (e.g. s3://bucket/prefix) for the IceChunk storage. Local directories are created if needed; S3 credentials are read from the environment.

  • group_path (str) –

    Path to the IceChunk group within the repository.

Returns:

  • str

    The path or URI of the written IceChunk group.

NWPReaper #

NWPReaper(
    init_time,
    forecast_hours=None,
    model="hrrr",
    variable="hourly_precip",
    search_str=None,
    product=None,
    transformations=None,
)

Bases: GriddedReaper

Fetch NOAA Numerical Weather Prediction (NWP) forecast data.

Initialize NWPReaper.

Parameters:

  • init_time (str) –

    Model initialization time in format "YYYY-MM-DD HH:MM" or similar. Parsed by pandas.to_datetime(). Also accepts "latest" to automatically fetch the most recent initialization time for the specified model.

  • forecast_hours (list[int] | range | None, default: None ) –

    Forecast hours to request (e.g., [1, 6, 12] or range(1, 19)). May be None when fetching an analysis product.

  • model (str, default: 'hrrr' ) –

    NWP model name. Other options include 'rrfs' and 'rtma'.

  • variable (str | list[str] | None, default: 'hourly_precip' ) –

    A simplified variable name (or list of names) mapping to predefined GRIB regex search strings. Common examples include 'hourly_precip', 'total_precip', 'temp_2m'.

  • search_str (str | list[str] | None, default: None ) –

    Exact GRIB regex search string(s) to use. Can be combined with variable.

  • product (str | None, default: None ) –

    Specific Herbie model product string.

  • transformations (dict[str, Any] | None, default: None ) –

    Optional transformations to apply to the raw data before returning.

Raises:

  • ValueError

    If init_time is invalid or forecast_hours are malformed.

  • ReaperError

    If variable is not recognized for the given model, or neither variable nor search_str are provided.

  • ImportError

    If herbie is not installed.

Examples:

>>> reaper = NWPReaper(
...     init_time="2026-01-01 00:00",
...     forecast_hours=range(1, 19),
...     model="hrrr",
...     variable="hourly_precip",
...     transformations={
...         "spatial_subset": {'lat_bounds': (40, 50), 'lon_bounds': (-90, -80)},
...         "variable_rename": {"tp": "total_precipitation"},
...     }
... )

reap #

reap()

Fetch data from source and store it in instance state.

Returns:

  • DataFrame | Dataset

    Harvested data from the source.

sow_to_zarr #

sow_to_zarr(file_path, consolidate=True)

Write Dataset to Zarr store.

Parameters:

  • file_path (str | Path) –

    Local path or remote URI (e.g. s3://bucket/store.zarr) where the Zarr store will be written. Parent directories are created automatically for local paths.

  • consolidate (bool, default: True ) –

    Whether to consolidate metadata after writing.

Returns:

  • str

    The path or URI of the written Zarr store.

sow_to_netcdf #

sow_to_netcdf(file_path)

Write Dataset to NetCDF format.

Parameters:

  • file_path (str | Path) –

    Local path or remote URI (e.g. s3://bucket/data.nc) where the NetCDF file will be written. Parent directories are created automatically for local paths.

Returns:

  • str

    The path or URI of the written NetCDF file.

sow_to_icechunk #

sow_to_icechunk(storage_path, group_path)

Write Dataset to IceChunk format.

Parameters:

  • storage_path (str | Path) –

    Local path or S3 URI (e.g. s3://bucket/prefix) for the IceChunk storage. Local directories are created if needed; S3 credentials are read from the environment.

  • group_path (str) –

    Path to the IceChunk group within the repository.

Returns:

  • str

    The path or URI of the written IceChunk group.
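
A sketch using local IceChunk storage; the group_path value is illustrative:

>>> reaper = NWPReaper(init_time="latest", forecast_hours=[1, 6, 12])
>>> reaper.reap()
>>> reaper.sow_to_icechunk("./icechunk_store", group_path="hrrr/hourly_precip")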

TimeSeriesReaper #

TimeSeriesReaper()

Bases: ReaperBase

Abstract base class for harvesting time-series data.

reap #

reap()

Fetch data from source and store it in instance state.

Returns:

  • DataFrame | Dataset

    Harvested data from the source.

sow_to_parquet #

sow_to_parquet(file_path)

Write HarvestedData to Parquet format.

Parameters:

  • file_path (str | Path) –

    Local path or remote URI (e.g. s3://bucket/key.parquet) where the Parquet file will be written. Parent directories are created automatically for local paths.

Raises:

  • ReaperError

    If reap() has not been called, or if data is not time-series.

Returns:

  • str

    The path or URI of the written Parquet file.

sow_to_iceberg #

sow_to_iceberg(
    warehouse_path,
    table_name,
    namespace="default",
    catalog_name="default",
)

Write tabular data to Apache Iceberg format.

Parameters:

  • warehouse_path (str | Path) –

    Path to the Iceberg warehouse directory. Will be created if needed.

  • table_name (str) –

    Name of the Iceberg table to create or append to.

  • namespace (str, default: 'default' ) –

    Namespace (database) for Iceberg tables.

  • catalog_name (str, default: 'default' ) –

    Name of the PyIceberg catalog.

Returns:

  • str

    The fully qualified table name (namespace.table_name).

USGSNWISReaper #

USGSNWISReaper(
    site_ids,
    start_date,
    end_date,
    parameter_code=None,
    transformations=None,
)

Bases: TimeSeriesReaper

Reaper for USGS NWIS instantaneous data.

Uses the dataretrieval library to fetch data from the USGS NWIS/Water Data APIs.

Parameters:

  • site_ids (list[str]) –

    List of USGS site IDs (e.g., ["01018035"]).

  • start_date (str) –

    Start date in ISO 8601 format (YYYY-MM-DD or YYYY-MM-DDTHH:MM:SSZ).

  • end_date (str) –

    End date in ISO 8601 format (YYYY-MM-DD or YYYY-MM-DDTHH:MM:SSZ).

  • parameter_code (str | list[str] | None, default: None ) –

    USGS parameter code (e.g., "00060" for streamflow or "00045" for precipitation). If a list is provided, fetches data for all specified parameters. If None, fetches all available parameters.

  • transformations (dict[str, Any] | None, default: None ) –

    Optional transformations to apply to the data.

Examples:

>>> reaper = USGSNWISReaper(
...     site_ids=["01018035"],
...     start_date="2026-01-01",
...     end_date="2026-01-31",
...     parameter_code=["00060", "00045"],
... )
>>> data = reaper.reap()

reap #

reap()

Fetch data from source and store it in instance state.

Returns:

  • DataFrame | Dataset

    Harvested data from the source.

sow_to_parquet #

sow_to_parquet(file_path)

Write HarvestedData to Parquet format.

Parameters:

  • file_path (str | Path) –

    Local path or remote URI (e.g. s3://bucket/key.parquet) where the Parquet file will be written. Parent directories are created automatically for local paths.

Raises:

  • ReaperError

    If reap() has not been called, or if data is not time-series.

Returns:

  • str

    The path or URI of the written Parquet file.
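
A reap-then-sow sketch (the site ID, dates, and filename are illustrative):

>>> reaper = USGSNWISReaper(
...     site_ids=["01018035"],
...     start_date="2026-01-01",
...     end_date="2026-01-02",
...     parameter_code="00060",
... )
>>> reaper.reap()
>>> reaper.sow_to_parquet("streamflow.parquet")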

sow_to_iceberg #

sow_to_iceberg(
    warehouse_path,
    table_name,
    namespace="default",
    catalog_name="default",
)

Write tabular data to Apache Iceberg format.

Parameters:

  • warehouse_path (str | Path) –

    Path to the Iceberg warehouse directory. Will be created if needed.

  • table_name (str) –

    Name of the Iceberg table to create or append to.

  • namespace (str, default: 'default' ) –

    Namespace (database) for Iceberg tables.

  • catalog_name (str, default: 'default' ) –

    Name of the PyIceberg catalog.

Returns:

  • str

    The fully qualified table name (namespace.table_name).
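
A sketch writing to a local warehouse; the warehouse path and table name are illustrative. With the default namespace, the returned name is 'default.streamflow':

>>> reaper = USGSNWISReaper(
...     site_ids=["01018035"],
...     start_date="2026-01-01",
...     end_date="2026-01-31",
... )
>>> reaper.reap()
>>> reaper.sow_to_iceberg("warehouse/", table_name="streamflow")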

configure_logger #

configure_logger(
    *,
    verbose=None,
    level=None,
    file=None,
    file_level=None,
    file_mode="a",
    file_only=False,
)

Configure logging settings.

Parameters:

  • verbose (bool, default: None ) –

    Shortcut: True sets the console level to DEBUG, False to WARNING. If both level and verbose are given, level takes precedence.

  • level (str or int, default: None ) –

    Console logging level ("DEBUG", "INFO", "WARNING", etc.).

  • file (str or Path, default: None ) –

    Enable file logging at this path. Pass None to disable file logging.

  • file_level (str or int, default: None ) –

    File handler level. Defaults to DEBUG.

  • file_mode ({'a', 'w'}, default: 'a' ) –

    Append ('a') to or overwrite ('w') the log file.

  • file_only (bool, default: False ) –

    If True, disable console logging. Requires file to be set.
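
Examples:

A configuration sketch; the import path is assumed from the package name and the log filename is illustrative:

>>> from cosecha import configure_logger
>>> configure_logger(verbose=True, file="harvest.log", file_mode="w")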