Source code for hydro_topo_features.processing.prepare_data
"""Functions for preparing input data for hydro-topological feature extraction."""
import os
import logging
import numpy as np
import rasterio
from pathlib import Path
from rasterio.merge import merge
from typing import Dict, List, Tuple, Any
import geopandas as gpd
import osmnx as ox
from .. import config
from pysheds.grid import Grid
from shapely.geometry import box
import rasterio
from rasterio.features import rasterize
from rasterio.transform import array_bounds
import pandas as pd
logger = logging.getLogger(__name__)
[docs]
def prepare_input_data(
site_id: str,
aoi_path: str,
dem_tile_folder_path: str,
output_dirs: Dict[str, Path]
) -> Dict[str, str]:
"""
Prepare input data for hydro-topological feature extraction.
This function performs the following steps:
1. Merges DEM tiles into a single DEM
2. Extracts water features from OpenStreetMap
3. Rasterizes water features to match DEM resolution
Args:
site_id: Unique identifier for the site
aoi_path: Path to AOI shapefile/geopackage
dem_tile_folder_path: Path to folder containing DEM tiles
output_dirs: Dictionary of output directories
Returns:
Dictionary of output paths (raw_dem, osm_water_vector, osm_water_raster)
"""
logger.info(f"Preparing input data for site: {site_id}")
# Define output file paths
raw_dem_path = output_dirs["interim"] / "raw_dem.tif"
interim_dir = output_dirs["interim"]
osm_vector_path = interim_dir / "osm_water_vector.gpkg"
osm_raster_path = interim_dir / "osm_water_raster.tif"
# 1. Merge DEM tiles
logger.info("Merging DEM tiles...")
merge_dem_tiles(dem_tile_folder_path, raw_dem_path)
# 2. Extract water features from OpenStreetMap
logger.info("Extracting water features from OpenStreetMap...")
extract_osm_water_features(aoi_path, osm_vector_path, str(raw_dem_path))
# 3. Rasterize water features
logger.info("Rasterizing water features...")
rasterize_water_features(raw_dem_path, osm_vector_path, osm_raster_path)
logger.info("Input data preparation completed")
return {
"raw_dem": str(raw_dem_path),
"osm_water_vector": str(osm_vector_path),
"osm_water_raster": str(osm_raster_path)
}
[docs]
def merge_dem_tiles(dem_tile_folder_path: str, output_path: Path) -> None:
"""
Merge multiple DEM tiles into a single DEM.
Args:
dem_tile_folder_path: Path to folder containing DEM tiles
output_path: Path to output merged DEM
"""
# Create parent directory if it doesn't exist
os.makedirs(output_path.parent, exist_ok=True)
# Get list of DEM tiles
dem_tile_folder = Path(dem_tile_folder_path)
dem_tiles = list(dem_tile_folder.glob("*.tif"))
if not dem_tiles:
raise FileNotFoundError(f"No DEM tiles found in: {dem_tile_folder}")
logger.info(f"Found {len(dem_tiles)} DEM tiles")
# Open all DEM tiles
sources = []
for tile in dem_tiles:
try:
src = rasterio.open(tile)
sources.append(src)
except Exception as e:
logger.error(f"Error opening DEM tile {tile}: {str(e)}")
if not sources:
raise ValueError("No valid DEM tiles could be opened")
# Merge tiles
try:
mosaic, out_transform = merge(sources)
# Get metadata from first tile
out_meta = sources[0].meta.copy()
# Update metadata
out_meta.update({
"driver": "GTiff",
"height": mosaic.shape[1],
"width": mosaic.shape[2],
"transform": out_transform,
"crs": sources[0].crs,
"nodata": config.DEM_PROCESSING["NODATA_VALUE"]
})
mosaic = mosaic / 100.0
# Write merged DEM
with rasterio.open(output_path, "w", **out_meta) as dest:
dest.write(mosaic.astype('float32'))
logger.info(f"Merged DEM saved to: {output_path}")
except Exception as e:
logger.error(f"Error merging DEM tiles: {str(e)}")
raise
finally:
# Close all sources
for src in sources:
src.close()
[docs]
def extract_osm_water_features(aoi_path: str, output_path: Path, dem_path: str) -> None:
"""
Extract water features from OpenStreetMap within the AOI.
Args:
aoi_path: Path to AOI shapefile/geopackage
output_path: Path to output water features
dem_path: Path to DEM for grid initialization
"""
# Create parent directory if it doesn't exist
os.makedirs(output_path.parent, exist_ok=True)
grid = Grid.from_raster(dem_path)
# Read DEM
raw_dem = grid.read_raster(dem_path)
bbox = box(*raw_dem.bbox)
all_features = []
# Properly iterate through OSM_WATER_TAGS dictionary
for category, tags in config.OSM_WATER_TAGS.items():
for tag in tags:
tag_dict = {category: tag}
try:
gdf = ox.features.features_from_polygon(bbox, tag_dict)
print(f" Retrieved {len(gdf)} features for {tag_dict}")
gdf.set_crs(epsg=4326, inplace=True)
all_features.append(gdf)
except Exception as e:
print(f"⚠️ Failed for tags {tag_dict}: {e}")
water_features = (
gpd.GeoDataFrame(pd.concat(all_features, ignore_index=True), crs="EPSG:4326")
if all_features else
gpd.GeoDataFrame(columns=['geometry'], crs="EPSG:4326")
)
# Reproject to match DEM's CRS
water_features = water_features.to_crs(raw_dem.crs)
# === ✂️ Clip water features to DEM grid extent ===
# Get the exact bounds from the raw_dem grid
height, width = raw_dem.shape
bounds = array_bounds(height, width, raw_dem.affine)
clip_box = box(*bounds)
water_clipped = gpd.clip(water_features, clip_box)
# === 🧹 Filter to valid geometries and essential attributes ===
valid_types = ['Polygon', 'MultiPolygon', 'LineString', 'MultiLineString']
water_filtered = water_clipped[water_clipped.geometry.geom_type.isin(valid_types)]
essential_cols = ['geometry', 'waterway', 'natural', 'landuse', 'name']
cols_present = [col for col in essential_cols if col in water_filtered.columns]
water_clean = water_filtered[cols_present]
# Save to file
water_clean.to_file(output_path, driver="GPKG")
logger.info(f"Water features saved to: {output_path}")
[docs]
def rasterize_water_features(dem_path: str, water_path: str, output_path: Path) -> None:
"""
Rasterize water features to match DEM resolution.
Args:
dem_path: Path to DEM
water_path: Path to water features
output_path: Path to output rasterized water features
"""
# Create parent directory if it doesn't exist
os.makedirs(output_path.parent, exist_ok=True)
try:
# Read water features
water_features = gpd.read_file(water_path)
if water_features.empty:
logger.warning("No water features to rasterize")
# Create empty raster with DEM metadata
with rasterio.open(dem_path) as src:
meta = src.meta.copy()
meta.update({
"dtype": "uint8",
"nodata": 0
})
with rasterio.open(output_path, "w", **meta) as dest:
dest.write(np.zeros((1, meta["height"], meta["width"]), dtype=np.uint8))
logger.info(f"Empty water raster saved to: {output_path}")
return
# Read DEM metadata
with rasterio.open(dem_path) as src:
meta = src.meta.copy()
transform = src.transform
height = src.height
width = src.width
# Convert water features to DEM CRS if needed
if water_features.crs != meta["crs"]:
water_features = water_features.to_crs(meta["crs"])
# Rasterize water features
from rasterio.features import geometry_mask
# Create mask from water features
geoms = water_features.geometry.values
mask = geometry_mask(geoms, out_shape=(height, width), transform=transform, invert=True)
# Convert mask to uint8
water_raster = mask.astype(np.uint8)
# Update metadata
meta.update({
"dtype": "uint8",
"nodata": 0,
"count": 1
})
# Write rasterized water features
with rasterio.open(output_path, "w", **meta) as dest:
dest.write(water_raster[np.newaxis, :, :])
logger.info(f"Rasterized water features saved to: {output_path}")
except Exception as e:
logger.error(f"Error rasterizing water features: {str(e)}")
raise