Skip to content

Aggregation

influpaint.batch.aggregation

Post-processing script to aggregate inpainting results and compute WIS scores.

Reads results from MLflow experiment and computes: - WIS scores across scenarios/dates/configs - Summary tables and plots - Ensemble forecasts

Usage

python aggregate_results.py -e "paper-2025-06_inpainting" --output_dir "./results"

compute_wis_scores(df)

Compute WIS scores (placeholder - needs ground truth data)

Source code in influpaint/batch/aggregation.py
def compute_wis_scores(df):
    """Compute WIS scores (placeholder - needs ground truth data)"""
    # This is a placeholder - you'd need to implement actual WIS computation
    # based on your ground truth data and forecast quantiles

    print("⚠️  WIS computation not implemented yet - need ground truth data")
    print("   This would require:")
    print("   1. Loading ground truth observations for each date")
    print("   2. Loading forecast quantiles from saved files")  
    print("   3. Computing WIS using proper scoring rules")

    # Return placeholder results
    wis_data = []
    for scenario in df['scenario_id'].unique():
        for config in df['config_name'].unique():
            wis_data.append({
                'scenario_id': scenario,
                'config_name': config,
                'wis_score': np.random.random(),  # Placeholder
                'num_dates': len(df[(df['scenario_id'] == scenario) & (df['config_name'] == config)])
            })

    return pd.DataFrame(wis_data)

create_ensemble_forecasts(df)

Create ensemble forecasts by averaging across configs

Source code in influpaint/batch/aggregation.py
def create_ensemble_forecasts(df):
    """Create ensemble forecasts by averaging across configs"""
    ensemble = df.groupby(['scenario_id', 'forecast_date']).agg({
        'forecast_mean': 'mean',
        'forecast_std': 'mean',  # Simple average - could use more sophisticated methods
        'num_samples': 'first'
    }).reset_index()

    ensemble['config_name'] = 'ensemble'
    return ensemble

create_summary_plots(df, output_path)

Create summary plots

Source code in influpaint/batch/aggregation.py
def create_summary_plots(df, output_path):
    """Create summary plots"""

    # Plot 1: Forecast means by scenario and config
    plt.figure(figsize=(12, 8))

    # Pivot for heatmap
    pivot_data = df.pivot_table(
        values='forecast_mean', 
        index='scenario_id', 
        columns='config_name',
        aggfunc='mean'
    )

    sns.heatmap(pivot_data, annot=True, fmt='.1f', cmap='viridis')
    plt.title('Average Forecast Mean by Scenario and Config')
    plt.tight_layout()
    plt.savefig(output_path / 'forecast_heatmap.png', dpi=300, bbox_inches='tight')
    plt.close()

    # Plot 2: Forecast distribution by config
    plt.figure(figsize=(10, 6))
    sns.boxplot(data=df, x='config_name', y='forecast_mean')
    plt.xticks(rotation=45)
    plt.title('Forecast Distribution by Config')
    plt.tight_layout()
    plt.savefig(output_path / 'forecast_distribution.png', dpi=300, bbox_inches='tight')
    plt.close()

    # Plot 3: Timeline of forecasts
    if 'forecast_date' in df.columns:
        df_timeline = df.copy()
        df_timeline['forecast_date'] = pd.to_datetime(df_timeline['forecast_date'])

        plt.figure(figsize=(14, 8))
        for config in df['config_name'].unique():
            config_data = df_timeline[df_timeline['config_name'] == config]
            daily_avg = config_data.groupby('forecast_date')['forecast_mean'].mean()
            plt.plot(daily_avg.index, daily_avg.values, marker='o', label=config, alpha=0.7)

        plt.xlabel('Forecast Date')
        plt.ylabel('Average Forecast Mean')
        plt.title('Forecast Timeline by Config')
        plt.legend()
        plt.xticks(rotation=45)
        plt.tight_layout()
        plt.savefig(output_path / 'forecast_timeline.png', dpi=300, bbox_inches='tight')
        plt.close()

generate_summary_stats(df)

Generate summary statistics from results

Source code in influpaint/batch/aggregation.py
def generate_summary_stats(df):
    """Generate summary statistics from results"""
    stats = {
        'total_runs': len(df),
        'unique_scenarios': df['scenario_id'].nunique(),
        'unique_dates': df['forecast_date'].nunique(),
        'unique_configs': df['config_name'].nunique(),
        'scenario_range': f"{df['scenario_id'].min()}-{df['scenario_id'].max()}",
        'date_range': f"{df['forecast_date'].min()} to {df['forecast_date'].max()}",
        'configs': sorted(df['config_name'].unique().tolist()),
        'completion_rate': f"{len(df[df['status'] == 'FINISHED'])} / {len(df)} ({100*len(df[df['status'] == 'FINISHED'])/len(df):.1f}%)",
        'avg_forecast_mean': df['forecast_mean'].mean(),
        'avg_forecast_std': df['forecast_std'].mean(),
    }
    return stats

load_mlflow_results(experiment_name)

Load inpainting results from MLflow experiment

Source code in influpaint/batch/aggregation.py
def load_mlflow_results(experiment_name):
    """Load inpainting results from MLflow experiment"""
    client = MlflowClient()

    try:
        experiment = client.get_experiment_by_name(experiment_name)
        if experiment is None:
            print(f"❌ Experiment '{experiment_name}' not found")
            return pd.DataFrame()

        runs = client.search_runs(
            experiment_ids=[experiment.experiment_id],
            filter_string="attributes.status = 'FINISHED'",
            order_by=["start_time DESC"]
        )

        if not runs:
            print(f"❌ No finished runs found in experiment")
            return pd.DataFrame()

        # Extract data from runs
        data = []
        for run in runs:
            params = run.data.params
            metrics = run.data.metrics

            data.append({
                'run_id': run.info.run_id,
                'scenario_id': int(params.get('scenario_id', -1)),
                'forecast_date': params.get('forecast_date'),
                'config_name': params.get('config_name'),
                'unet_name': params.get('unet_name'),
                'dataset_name': params.get('dataset_name'),
                'transform_name': params.get('transform_name'),
                'enrich_name': params.get('enrich_name'),
                'training_run_id': params.get('training_run_id'),
                'forecast_mean': metrics.get('forecast_mean'),
                'forecast_std': metrics.get('forecast_std'),
                'forecast_min': metrics.get('forecast_min'),
                'forecast_max': metrics.get('forecast_max'),
                'num_samples': metrics.get('num_samples'),
                'start_time': run.info.start_time,
                'end_time': run.info.end_time,
                'status': run.info.status
            })

        return pd.DataFrame(data)

    except Exception as e:
        print(f"❌ Error loading MLflow results: {e}")
        return pd.DataFrame()

main(experiment_name, output_dir, compute_wis, create_ensemble, plot_results)

Aggregate inpainting results from MLflow experiment

Source code in influpaint/batch/aggregation.py
@click.command()
@click.option("-e", "--experiment_name", required=True, help="MLflow inpainting experiment name")
@click.option("--output_dir", default="./results", help="Output directory for aggregated results")
@click.option("--compute_wis", is_flag=True, help="Compute WIS scores (requires ground truth)")
@click.option("--create_ensemble", is_flag=True, help="Create ensemble forecasts across configs")
@click.option("--plot_results", is_flag=True, help="Generate summary plots")
def main(experiment_name, output_dir, compute_wis, create_ensemble, plot_results):
    """Aggregate inpainting results from MLflow experiment"""

    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    print(f"🔍 Aggregating results from experiment: {experiment_name}")
    print(f"📁 Output directory: {output_path}")

    # Load results from MLflow
    print("\n📊 Loading results from MLflow...")
    results_df = load_mlflow_results(experiment_name)

    if results_df.empty:
        print("❌ No results found in MLflow experiment")
        return

    print(f"✅ Loaded {len(results_df)} inpainting runs")

    # Save raw results
    results_file = output_path / "raw_results.csv"
    results_df.to_csv(results_file, index=False)
    print(f"💾 Raw results saved to: {results_file}")

    # Generate summary statistics
    print("\n📈 Generating summary statistics...")
    summary_stats = generate_summary_stats(results_df)

    # Save summary
    summary_file = output_path / "summary_stats.json"
    with open(summary_file, 'w') as f:
        json.dump(summary_stats, f, indent=2, default=str)
    print(f"💾 Summary stats saved to: {summary_file}")

    # Print summary table
    print("\n📋 Summary by Scenario and Config:")
    print_summary_table(results_df)

    # Compute WIS if requested
    if compute_wis:
        print("\n🎯 Computing WIS scores...")
        try:
            wis_results = compute_wis_scores(results_df)
            wis_file = output_path / "wis_scores.csv"
            wis_results.to_csv(wis_file, index=False)
            print(f"💾 WIS scores saved to: {wis_file}")

            print("\n🏆 Best performing configs by WIS:")
            print_wis_rankings(wis_results)

        except Exception as e:
            print(f"⚠️  WIS computation failed: {e}")

    # Create ensemble forecasts
    if create_ensemble:
        print("\n🤝 Creating ensemble forecasts...")
        try:
            ensemble_results = create_ensemble_forecasts(results_df)
            ensemble_file = output_path / "ensemble_forecasts.csv"
            ensemble_results.to_csv(ensemble_file, index=False)
            print(f"💾 Ensemble forecasts saved to: {ensemble_file}")
        except Exception as e:
            print(f"⚠️  Ensemble creation failed: {e}")

    # Generate plots
    if plot_results:
        print("\n📊 Generating plots...")
        try:
            create_summary_plots(results_df, output_path)
            print(f"💾 Plots saved to: {output_path}")
        except Exception as e:
            print(f"⚠️  Plot generation failed: {e}")

    print(f"\n✅ Aggregation complete! Results in: {output_path}")

print_summary_table(df)

Print summary table of results

Source code in influpaint/batch/aggregation.py
def print_summary_table(df):
    """Print summary table of results"""
    summary = df.groupby(['scenario_id', 'config_name']).agg({
        'forecast_mean': ['count', 'mean', 'std'],
        'forecast_std': 'mean',
        'forecast_date': 'nunique'
    }).round(2)

    summary.columns = ['num_dates', 'mean_forecast', 'std_forecast', 'mean_uncertainty', 'unique_dates']
    summary = summary.reset_index()

    print(tabulate(summary.head(20), headers='keys', tablefmt='simple', showindex=False))
    if len(summary) > 20:
        print(f"... and {len(summary) - 20} more rows")

print_wis_rankings(wis_df)

Print WIS rankings by config

Source code in influpaint/batch/aggregation.py
def print_wis_rankings(wis_df):
    """Print WIS rankings by config"""
    avg_wis = wis_df.groupby('config_name')['wis_score'].mean().sort_values()

    print("Average WIS by config (lower is better):")
    for i, (config, wis) in enumerate(avg_wis.items(), 1):
        print(f"  {i}. {config}: {wis:.3f}")