Complete DataFrame Analysis and Optimal Spark Submit Configuration

from pyspark.sql import SparkSession

from pyspark.sql.functions import input_file_name

import math

import time


def analyze_dataframe_and_get_optimal_config(df, operation_type="read", 

                                           target_cluster_cores=None, 

                                           target_cluster_memory_gb=None):

    """

    Comprehensive DataFrame analysis with optimal Spark configuration recommendations

    

    Args:

        df: PySpark DataFrame to analyze

        operation_type: Type of operation ("read", "write", "transform", "ml", "join")

        target_cluster_cores: Total cores available in cluster (if known)

        target_cluster_memory_gb: Total memory available in cluster (if known)

    

    Returns:

        Dictionary with complete analysis and optimal spark-submit parameters

    """

    

    # Newer Spark versions expose DataFrame.sparkSession; fall back to the older sql_ctx accessor otherwise

    spark = getattr(df, "sparkSession", None) or df.sql_ctx.sparkSession

    

    # === DATAFRAME ANALYSIS ===

    

    def get_dataframe_metadata():

        """Extract comprehensive DataFrame metadata"""

        

        try:

            # Basic DataFrame info

            schema_info = {

                'column_count': len(df.columns),

                'columns': df.columns,

                'data_types': df.dtypes

            }

            

            # Try to get the row count (note: this triggers a full count job and can be slow on very large DataFrames)

            try:

                row_count = df.count()

            except Exception:

                row_count = "Unable to count - very large dataset"

            

            # Get current partitions

            current_partitions = df.rdd.getNumPartitions()

            

            # Estimate data size

            try:

                # Sample to estimate row size

                sample_df = df.sample(0.001, seed=42).limit(1000)

                sample_rows = sample_df.collect()

                

                if sample_rows:

                    # Estimate bytes per row

                    avg_row_size = 0

                    for row in sample_rows[:10]:  # Use first 10 rows for estimation

                        row_str = str(row)

                        avg_row_size += len(row_str.encode('utf-8'))

                    

                    avg_row_size = avg_row_size / min(10, len(sample_rows))

                    

                    if isinstance(row_count, int):

                        estimated_size_bytes = row_count * avg_row_size

                        estimated_size_gb = estimated_size_bytes / (1024**3)

                    else:

                        estimated_size_gb = "Cannot estimate"

                else:

                    estimated_size_gb = "Cannot estimate"

                    avg_row_size = "Cannot estimate"

            except Exception:

                estimated_size_gb = "Cannot estimate"

                avg_row_size = "Cannot estimate"

            

            return {

                'row_count': row_count,

                'current_partitions': current_partitions,

                'estimated_size_gb': estimated_size_gb,

                'avg_row_size_bytes': avg_row_size,

                'schema_info': schema_info

            }

        except Exception as e:

            return {'error': f"Could not analyze DataFrame: {e}"}

    

    def get_hdfs_block_analysis():

        """Get HDFS block information for the DataFrame"""

        

        try:

            # Add file path information

            df_with_files = df.withColumn("input_file", input_file_name())

            

            # Get unique files (limit to prevent memory issues)

            files_sample = df_with_files.select("input_file").distinct().limit(100).collect()

            

            if not files_sample:

                return {'blocks_info': 'No file-based sources detected'}

            

            # Get Hadoop FileSystem

            hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()

            hadoop_fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(hadoop_conf)

            

            total_blocks = 0

            total_size = 0

            files_analyzed = 0

            

            for file_row in files_sample[:20]:  # Analyze max 20 files for performance

                try:

                    file_path = file_row.input_file

                    path_obj = spark._jvm.org.apache.hadoop.fs.Path(file_path)

                    file_status = hadoop_fs.getFileStatus(path_obj)

                    file_size = file_status.getLen()

                    block_locations = hadoop_fs.getFileBlockLocations(file_status, 0, file_size)

                    

                    total_blocks += len(block_locations)

                    total_size += file_size

                    files_analyzed += 1

                except Exception:

                    continue

            

            # Estimate total blocks if we sampled

            if len(files_sample) > files_analyzed and files_analyzed > 0:

                estimated_total_blocks = int(total_blocks * (len(files_sample) / files_analyzed))

                estimated_total_size_gb = (total_size * (len(files_sample) / files_analyzed)) / (1024**3)

            else:

                estimated_total_blocks = total_blocks

                estimated_total_size_gb = total_size / (1024**3)

            

            return {

                'total_files': len(files_sample),  # Note: capped at 100 by the distinct().limit(100) sample above

                'files_analyzed': files_analyzed,

                'estimated_total_blocks': estimated_total_blocks,

                'estimated_size_gb': round(estimated_total_size_gb, 2),

                'avg_blocks_per_file': round(estimated_total_blocks / len(files_sample), 2) if len(files_sample) > 0 else 0

            }

        except Exception as e:

            return {'blocks_info': f'Could not analyze blocks: {e}'}
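
    # For a sanity check outside of Spark, the block layout can also be inspected with the HDFS CLI, e.g.:

    #   hdfs fsck /path/to/data -files -blocks -locations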

    

    def calculate_optimal_configuration(metadata, blocks_info):

        """Calculate optimal Spark configuration based on analysis"""

        

        # Default values

        config = {

            'executor_memory': '4g',

            'driver_memory': '2g',

            'executor_cores': 4,

            'num_executors': 2,

            'max_partition_bytes': '128MB',

            'shuffle_partitions': 200,

            'serializer': 'org.apache.spark.serializer.KryoSerializer',

            'adaptive_enabled': True

        }

        

        try:

            # Determine data size for calculations

            data_size_gb = 0

            if isinstance(metadata.get('estimated_size_gb'), (int, float)):

                data_size_gb = metadata['estimated_size_gb']

            elif isinstance(blocks_info.get('estimated_size_gb'), (int, float)):

                data_size_gb = blocks_info['estimated_size_gb']

            

            if data_size_gb == 0:

                data_size_gb = 1  # Minimum assumption

            

            # Calculate based on data size and operation type

            if data_size_gb < 1:  # Small data

                config.update({

                    'executor_memory': '2g',

                    'driver_memory': '1g',

                    'executor_cores': 2,

                    'num_executors': 2,

                    'max_partition_bytes': '64MB',

                    'shuffle_partitions': 50

                })

            elif data_size_gb < 10:  # Medium data

                config.update({

                    'executor_memory': '4g',

                    'driver_memory': '2g',

                    'executor_cores': 4,

                    'num_executors': max(2, int(data_size_gb / 2)),

                    'max_partition_bytes': '128MB',

                    'shuffle_partitions': max(100, int(data_size_gb * 20))

                })

            elif data_size_gb < 100:  # Large data

                config.update({

                    'executor_memory': '8g',

                    'driver_memory': '4g',

                    'executor_cores': 5,

                    'num_executors': max(4, int(data_size_gb / 4)),

                    'max_partition_bytes': '256MB',

                    'shuffle_partitions': max(200, int(data_size_gb * 10))

                })

            else:  # Very large data

                config.update({

                    'executor_memory': '16g',

                    'driver_memory': '8g',

                    'executor_cores': 5,

                    'num_executors': max(8, int(data_size_gb / 8)),

                    'max_partition_bytes': '512MB',

                    'shuffle_partitions': max(400, int(data_size_gb * 5))

                })

            

            # Adjust for operation type

            if operation_type == "ml":

                # ML operations need more memory

                config['executor_memory'] = str(int(config['executor_memory'][:-1]) * 2) + 'g'

                config['driver_memory'] = str(int(config['driver_memory'][:-1]) * 2) + 'g'

            elif operation_type == "join":

                # Joins need more shuffle partitions

                config['shuffle_partitions'] = int(config['shuffle_partitions'] * 1.5)

            elif operation_type == "write":

                # Writes benefit from fewer partitions

                config['shuffle_partitions'] = max(100, int(config['shuffle_partitions'] / 2))

            

            # Adjust based on cluster resources if provided

            if target_cluster_cores:

                max_executors = target_cluster_cores // config['executor_cores']

                config['num_executors'] = max(1, min(config['num_executors'], max_executors - 1))  # Reserve capacity for the driver; keep at least 1 executor

            

            if target_cluster_memory_gb:

                max_executor_memory = max(1, int((target_cluster_memory_gb * 0.8) // config['num_executors']))  # Assume ~80% of cluster memory is usable

                current_executor_memory = int(config['executor_memory'][:-1])

                if current_executor_memory > max_executor_memory:

                    config['executor_memory'] = str(max_executor_memory) + 'g'

            

            return config

        

        except Exception as e:

            return {**config, 'config_error': f'Error calculating config: {e}'}
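
    # The sizing tiers above are common rules of thumb (about 4-5 cores per executor, shuffle

    # partitions scaled with input size, larger maxPartitionBytes for bigger inputs); treat the

    # result as a starting point to tune against the Spark UI rather than a hard requirement.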

    

    def generate_spark_submit_command(config, app_name="dataframe_job"):

        """Generate complete spark-submit command"""

        

        base_command = "spark-submit"

        

        # Core resource parameters

        params = [

            f"--executor-memory {config['executor_memory']}",

            f"--driver-memory {config['driver_memory']}",

            f"--executor-cores {config['executor_cores']}",

            f"--num-executors {config['num_executors']}",

            f"--name {app_name}"

        ]

        

        # Configuration parameters

        conf_params = [

            f"--conf spark.sql.files.maxPartitionBytes={config['max_partition_bytes']}",

            f"--conf spark.sql.shuffle.partitions={config['shuffle_partitions']}",

            f"--conf spark.serializer={config['serializer']}",

            f"--conf spark.sql.adaptive.enabled={str(config['adaptive_enabled']).lower()}",

            f"--conf spark.sql.adaptive.coalescePartitions.enabled=true",

            f"--conf spark.sql.adaptive.skewJoin.enabled=true",

            f"--conf spark.dynamicAllocation.enabled=false",

            f"--conf spark.network.timeout=800s",

            f"--conf spark.executor.heartbeatInterval=60s"

        ]

        

        # Additional safety configurations

        safety_params = [

            f"--conf spark.sql.execution.arrow.pyspark.enabled=true",

            f"--conf spark.sql.execution.arrow.maxRecordsPerBatch=10000",

            f"--conf spark.task.maxAttempts=3",

            f"--conf spark.stage.maxConsecutiveAttempts=8",

            f"--conf spark.kubernetes.executor.deleteOnTermination=true" if "kubernetes" in str(spark.sparkContext.getConf().getAll()) else "",

            f"--conf spark.speculation=false"

        ]

        

        # Combine all parameters

        all_params = params + conf_params + [param for param in safety_params if param]

        

        command = f"{base_command} \\\n  " + " \\\n  ".join(all_params) + " \\\n  your_script.py"

        

        return command
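
    # For illustration (assumed ~5 GB input, "read" operation), the generated command looks roughly like:

    #

    #   spark-submit \

    #     --executor-memory 4g --driver-memory 2g \

    #     --executor-cores 4 --num-executors 2 --name dataframe_job \

    #     --conf spark.sql.files.maxPartitionBytes=128MB \

    #     --conf spark.sql.shuffle.partitions=100 \

    #     ...remaining --conf flags... \

    #     your_script.py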

    

    def get_optimization_recommendations(metadata, config):

        """Provide optimization recommendations"""

        

        recommendations = []

        

        # Partition recommendations

        current_partitions = metadata.get('current_partitions', 0)

        optimal_partitions = config['num_executors'] * config['executor_cores'] * 3

        

        if current_partitions < optimal_partitions // 2:

            recommendations.append(f"Consider repartitioning to ~{optimal_partitions} partitions for better parallelism")

        elif current_partitions > optimal_partitions * 2:

            recommendations.append(f"Consider coalescing to ~{optimal_partitions} partitions to reduce overhead")

        

        # Data size recommendations

        if isinstance(metadata.get('estimated_size_gb'), (int, float)):

            size_gb = metadata['estimated_size_gb']

            if size_gb > 100:

                recommendations.append("Enable dynamic allocation for large datasets")

                recommendations.append("Consider using Delta Lake for better performance")

            if size_gb > 1000:

                recommendations.append("Consider breaking job into smaller chunks")

                recommendations.append("Use columnar formats (Parquet/Delta) if not already")

        

        # Memory recommendations

        executor_memory_gb = int(config['executor_memory'][:-1])

        if executor_memory_gb < 4:

            recommendations.append("Monitor for memory pressure; increase executor memory if needed")

        elif executor_memory_gb > 32:

            recommendations.append("Very high memory allocation; ensure cluster can support this")

        

        # General recommendations

        recommendations.extend([

            "Monitor Spark UI for task skew and adjust partitioning if needed",

            "Enable adaptive query execution for dynamic optimization",

            "Use caching for DataFrames accessed multiple times",

            "Consider broadcast joins for small tables (<200MB)"

        ])

        

        return recommendations
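
    # Example of the broadcast-join recommendation above (hypothetical DataFrames):

    #   from pyspark.sql.functions import broadcast

    #   result = large_df.join(broadcast(small_dim_df), "join_key")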

    

    # === MAIN ANALYSIS EXECUTION ===

    

    print("🔍 Analyzing DataFrame...")

    start_time = time.time()

    

    # Get all analysis data

    metadata = get_dataframe_metadata()

    blocks_info = get_hdfs_block_analysis()

    config = calculate_optimal_configuration(metadata, blocks_info)

    spark_submit_cmd = generate_spark_submit_command(config)

    recommendations = get_optimization_recommendations(metadata, config)

    

    analysis_time = time.time() - start_time

    

    # === COMPILE COMPLETE RESULTS ===

    

    results = {

        'analysis_summary': {

            'analysis_time_seconds': round(analysis_time, 2),

            'dataframe_metadata': metadata,

            'hdfs_blocks_analysis': blocks_info,

            'operation_type': operation_type

        },

        'optimal_configuration': config,

        'spark_submit_command': spark_submit_cmd,

        'optimization_recommendations': recommendations,

        'cluster_requirements': {

            'minimum_cores': config['num_executors'] * config['executor_cores'] + 2,  # +2 for driver

            'minimum_memory_gb': config['num_executors'] * int(config['executor_memory'][:-1]) + int(config['driver_memory'][:-1]),

            'recommended_nodes': max(2, math.ceil(config['num_executors'] / 4))  # Assuming 4 executors per node

        },

        'monitoring_commands': {

            'spark_ui': "Access Spark UI at http://<driver-node>:4040",

            'yarn_ui': "yarn application -list -appStates RUNNING",

            'kill_job': "yarn application -kill <application_id>"

        }

    }

    

    return results


# === USAGE EXAMPLES ===


def print_analysis_report(analysis_results):

    """Print a comprehensive analysis report"""

    

    print("=" * 80)

    print("🚀 DATAFRAME ANALYSIS & OPTIMAL SPARK CONFIGURATION")

    print("=" * 80)

    

    # Analysis Summary

    summary = analysis_results['analysis_summary']

    print(f"\n📊 DATAFRAME ANALYSIS (completed in {summary['analysis_time_seconds']}s)")

    print("-" * 50)

    

    metadata = summary['dataframe_metadata']

    if 'error' not in metadata:

        print(f"• Rows: {metadata['row_count']}")

        print(f"• Columns: {metadata['schema_info']['column_count']}")

        print(f"• Current Partitions: {metadata['current_partitions']}")

        print(f"• Estimated Size: {metadata['estimated_size_gb']} GB")

        print(f"• Avg Row Size: {metadata['avg_row_size_bytes']} bytes")

    

    blocks = summary['hdfs_blocks_analysis']

    if 'estimated_total_blocks' in blocks:

        print(f"• HDFS Blocks: {blocks['estimated_total_blocks']}")

        print(f"• Files: {blocks['total_files']}")

    

    # Optimal Configuration

    print(f"\n⚙️ OPTIMAL CONFIGURATION")

    print("-" * 50)

    config = analysis_results['optimal_configuration']

    print(f"• Executor Memory: {config['executor_memory']}")

    print(f"• Driver Memory: {config['driver_memory']}")

    print(f"• Executor Cores: {config['executor_cores']}")

    print(f"• Number of Executors: {config['num_executors']}")

    print(f"• Shuffle Partitions: {config['shuffle_partitions']}")

    print(f"• Max Partition Bytes: {config['max_partition_bytes']}")

    

    # Cluster Requirements

    print(f"\n🏗️ CLUSTER REQUIREMENTS")

    print("-" * 50)

    cluster = analysis_results['cluster_requirements']

    print(f"• Minimum Cores: {cluster['minimum_cores']}")

    print(f"• Minimum Memory: {cluster['minimum_memory_gb']} GB")

    print(f"• Recommended Nodes: {cluster['recommended_nodes']}")

    

    # Spark Submit Command

    print(f"\n🚀 SPARK SUBMIT COMMAND")

    print("-" * 50)

    print(analysis_results['spark_submit_command'])

    

    # Recommendations

    print(f"\n💡 OPTIMIZATION RECOMMENDATIONS")

    print("-" * 50)

    for i, rec in enumerate(analysis_results['optimization_recommendations'], 1):

        print(f"{i}. {rec}")

    

    print("\n" + "=" * 80)


# === MAIN USAGE FUNCTION ===


def analyze_and_optimize_dataframe(df, operation_type="read", cluster_cores=None, cluster_memory_gb=None):

    """

    Main function to analyze DataFrame and get complete optimization strategy

    

    Usage:

        df = spark.table("your_database.your_table").filter("your_condition")

        results = analyze_and_optimize_dataframe(df, "transform", cluster_cores=64, cluster_memory_gb=256)

        print_analysis_report(results)

    """

    

    results = analyze_dataframe_and_get_optimal_config(

        df, 

        operation_type=operation_type,

        target_cluster_cores=cluster_cores,

        target_cluster_memory_gb=cluster_memory_gb

    )

    

    return results


# Example usage:

"""

# Initialize Spark

spark = SparkSession.builder.appName("DataFrame Analysis").getOrCreate()


# Your DataFrame

df = spark.table("sales_db.billion_row_table") \

    .filter("date >= '2024-01-01'") \

    .filter("amount > 1000")


# Analyze and get optimal configuration

results = analyze_and_optimize_dataframe(

    df, 

    operation_type="transform",  # or "read", "write", "ml", "join"

    cluster_cores=64,            # Your cluster's total cores

    cluster_memory_gb=256        # Your cluster's total memory

)


# Print comprehensive report

print_analysis_report(results)


# Use the spark-submit command from results['spark_submit_command']

"""

https://www.perplexity.ai/search/how-data-is-read-in-distribute-rRv8YfujQ0Cck5DRZRTfQQ

