Source code for hydro_topo_features.processing.derive_products

"""Functions for computing hydro-topological features."""

import os
import logging
from pathlib import Path
import numpy as np
import rasterio
from pysheds.grid import Grid
from scipy.ndimage import distance_transform_edt
from geopy.distance import geodesic
from typing import Dict
from .. import config

logger = logging.getLogger(__name__)

[docs] def get_osm_hand( site_id: str, raw_dem: str, osm_water_raster: str, burned_dem: str, output_dirs: Dict[str, Path] ) -> str: """ Compute Height Above Nearest Drainage (HAND) using OSM water features. Args: site_id: Unique identifier for the site raw_dem: Path to raw DEM osm_water_raster: Path to rasterized water features burned_dem: Path to burned DEM output_dirs: Dictionary of output directories Returns: Path to HAND raster """ logger.info(f"Computing HAND for site: {site_id}") # Output path hand_path = output_dirs["processed"] / "hand.tif" # Initialize grid grid = Grid.from_raster(raw_dem) # Read raster data raw_dem_data = grid.read_raster(raw_dem) burned_dem_data = grid.read_raster(burned_dem) osm_water = grid.read_raster(osm_water_raster) osm_water = grid.view(osm_water, nodata_out=0) # Fill pits in DEM pit_filled_dem = grid.fill_pits(burned_dem_data) # Fill depressions in DEM logger.info("Filling depressions in DEM...") flooded_dem = grid.fill_depressions(pit_filled_dem) # Resolve flats in DEM logger.info("Resolving flats in DEM...") inflated_dem = grid.resolve_flats(flooded_dem) # Compute flow direction logger.info("Computing flow direction") dirmap = (64, 128, 1, 2, 4, 8, 16, 32) fdir = grid.flowdir(inflated_dem, dirmap=dirmap, flats=-1, pits=-2, nodata_out=0) # Compute flow accumulation flow_accumulation = grid.accumulation(fdir) # Compute HAND logger.info("Computing HAND values") hand = grid.compute_hand(fdir, raw_dem_data, osm_water > 0) # Save HAND raster output_raster = grid.view(hand) # Get original DEM metadata with rasterio.open(raw_dem) as src: dem_meta = src.meta.copy() # Update metadata for writing dem_meta.update({ 'dtype': 'float32', 'nodata': np.nan, 'count': 1 }) # Write DEM to GeoTIFF with rasterio.open(hand_path, 'w', **dem_meta) as dst: dst.write(output_raster.astype(np.float32), 1) logger.info("HAND computation completed") return str(hand_path)
[docs] def get_slope( site_id: str, raw_dem: str, output_dirs: Dict[str, Path] ) -> str: """ Compute slope from DEM. Args: site_id: Unique identifier for the site raw_dem: Path to raw DEM output_dirs: Dictionary of output directories Returns: Path to slope raster """ logger.info(f"Computing slope for site: {site_id}") # Output path slope_path = output_dirs["processed"] / "slope.tif" # Initialize grid and read DEM grid = Grid() dem = grid.read_raster(raw_dem) # Compute slope Horn’s Method (1981) logger.info("Computing slope values") slope_params = config.FEATURE_PARAMS["SLOPE"] if slope_params["algorithm"] == 'horn': dy, dx = np.gradient(dem) slope = np.arctan(np.sqrt(dy**2 + dx**2)) if slope_params["units"] == 'degrees': slope = np.degrees(slope) elif slope_params["units"] == 'percent': slope = np.tan(slope) * 100 # Save slope raster with rasterio.open(raw_dem) as src: meta = src.meta.copy() meta.update({ "dtype": "float32", "nodata": config.DEM_PROCESSING["NODATA_VALUE"] }) with rasterio.open(slope_path, 'w', **meta) as dst: dst.write(slope.astype('float32'), 1) logger.info("Slope computation completed") return str(slope_path)
[docs] def get_edtw( site_id: str, osm_water_raster: str, output_dirs: Dict[str, Path] ) -> str: """ Compute Euclidean Distance to Water (EDTW). Args: site_id: Unique identifier for the site osm_water_raster: Path to rasterized water features output_dirs: Dictionary of output directories Returns: Path to EDTW raster """ logger.info(f"Computing EDTW for site: {site_id}") # Output path edtw_path = output_dirs["processed"] / "edtw.tif" try: # Read water raster with rasterio.open(osm_water_raster) as src: transform = src.transform crs = src.crs water = src.read(1) nodata = src.nodata # Handle nodata values if present if nodata is not None: water = np.ma.masked_equal(water, nodata) water = np.ma.filled(water, 0) # Fill nodata with 0 (non-water) # Get pixel width and height in degrees pixel_width_deg = abs(transform.a) pixel_height_deg = abs(transform.e) # Center of the raster to estimate latitude center_lat = src.bounds.top - (src.height // 2) * pixel_height_deg center_lon = src.bounds.left + (src.width // 2) * pixel_width_deg # Approximate meters per pixel using geodesic distance pixel_width_m = geodesic( (center_lat, center_lon), (center_lat, center_lon + pixel_width_deg) ).meters pixel_height_m = geodesic( (center_lat, center_lon), (center_lat + pixel_height_deg, center_lon) ).meters # Use average for sampling parameter pixel_size = (pixel_height_m + pixel_width_m) / 2 logger.info(f"Pixel size: approximately {pixel_size:.2f} meters") # Create water mask (1 for water, 0 for non-water) # OSM water raster has value 1 for water, 0 for non-water water_mask = (water == 1).astype(np.uint8) # Compute EDTW (distance from each non-water pixel to nearest water pixel) logger.info("Computing distance transform") # Note: distance_transform_edt computes distance from 0s to nearest 1s # So we need to invert the mask (1 - water_mask) edtw = distance_transform_edt(1 - water_mask) * pixel_size # Get max value for informational purposes max_dist = np.max(edtw) logger.info(f"Maximum distance to water: {max_dist:.2f} meters") # Save distance raster to file with rasterio.open( edtw_path, 'w', driver='GTiff', height=edtw.shape[0], width=edtw.shape[1], count=1, dtype='float32', crs=crs, transform=transform, nodata=np.finfo('float32').max ) as dst: dst.write(edtw.astype(np.float32), 1) logger.info("EDTW computation completed") return str(edtw_path) except Exception as e: logger.error(f"Error computing EDTW: {str(e)}") raise