Complete DataFrame Analysis and Optimal Spark Submit Configuration
from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name
import math
import time


def analyze_dataframe_and_get_optimal_config(df, operation_type="read",
                                             target_cluster_cores=None,
                                             target_cluster_memory_gb=None):
    """
    Comprehensive DataFrame analysis with optimal Spark configuration recommendations.

    Args:
        df: PySpark DataFrame to analyze
        operation_type: Type of operation ("read", "write", "transform", "ml", "join")
        target_cluster_cores: Total cores available in the cluster (if known)
        target_cluster_memory_gb: Total memory available in the cluster, in GB (if known)

    Returns:
        Dictionary with the complete analysis and optimal spark-submit parameters
    """
    # df.sparkSession is available in Spark 3.3+; fall back to the deprecated sql_ctx otherwise
    spark = getattr(df, "sparkSession", None) or df.sql_ctx.sparkSession
    # === DATAFRAME ANALYSIS ===
    def get_dataframe_metadata():
        """Extract comprehensive DataFrame metadata"""
        try:
            # Basic DataFrame info
            schema_info = {
                'column_count': len(df.columns),
                'columns': df.columns,
                'data_types': df.dtypes
            }

            # Try to get the row count (this triggers a job and can be slow on very large DataFrames)
            try:
                row_count = df.count()
            except Exception:
                row_count = "Unable to count - very large dataset"

            # Get current partitions
            current_partitions = df.rdd.getNumPartitions()

            # Estimate data size
            try:
                # Sample rows to estimate the average serialized row size
                sample_df = df.sample(0.001, seed=42).limit(1000)
                sample_rows = sample_df.collect()
                if sample_rows:
                    # Estimate bytes per row
                    avg_row_size = 0
                    for row in sample_rows[:10]:  # Use the first 10 rows for estimation
                        row_str = str(row)
                        avg_row_size += len(row_str.encode('utf-8'))
                    avg_row_size = avg_row_size / min(10, len(sample_rows))
                    if isinstance(row_count, int):
                        estimated_size_bytes = row_count * avg_row_size
                        estimated_size_gb = estimated_size_bytes / (1024**3)
                    else:
                        estimated_size_gb = "Cannot estimate"
                else:
                    estimated_size_gb = "Cannot estimate"
                    avg_row_size = "Cannot estimate"
            except Exception:
                estimated_size_gb = "Cannot estimate"
                avg_row_size = "Cannot estimate"

            return {
                'row_count': row_count,
                'current_partitions': current_partitions,
                'estimated_size_gb': estimated_size_gb,
                'avg_row_size_bytes': avg_row_size,
                'schema_info': schema_info
            }
        except Exception as e:
            return {'error': f"Could not analyze DataFrame: {e}"}
    def get_hdfs_block_analysis():
        """Get HDFS block information for the DataFrame's source files"""
        try:
            # Add the source file path to every row
            df_with_files = df.withColumn("input_file", input_file_name())

            # Get unique files (limited to prevent memory issues on the driver)
            files_sample = df_with_files.select("input_file").distinct().limit(100).collect()
            if not files_sample:
                return {'blocks_info': 'No file-based sources detected'}

            # Get the Hadoop FileSystem
            hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
            hadoop_fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(hadoop_conf)

            total_blocks = 0
            total_size = 0
            files_analyzed = 0
            for file_row in files_sample[:20]:  # Analyze at most 20 files for performance
                try:
                    file_path = file_row.input_file
                    path_obj = spark._jvm.org.apache.hadoop.fs.Path(file_path)
                    file_status = hadoop_fs.getFileStatus(path_obj)
                    file_size = file_status.getLen()
                    block_locations = hadoop_fs.getFileBlockLocations(file_status, 0, file_size)
                    total_blocks += len(block_locations)
                    total_size += file_size
                    files_analyzed += 1
                except Exception:
                    continue

            # Extrapolate the totals if only a subset of files was analyzed
            if len(files_sample) > files_analyzed and files_analyzed > 0:
                estimated_total_blocks = int(total_blocks * (len(files_sample) / files_analyzed))
                estimated_total_size_gb = (total_size * (len(files_sample) / files_analyzed)) / (1024**3)
            else:
                estimated_total_blocks = total_blocks
                estimated_total_size_gb = total_size / (1024**3)

            return {
                'total_files': len(files_sample),
                'files_analyzed': files_analyzed,
                'estimated_total_blocks': estimated_total_blocks,
                'estimated_size_gb': round(estimated_total_size_gb, 2),
                'avg_blocks_per_file': round(estimated_total_blocks / len(files_sample), 2) if len(files_sample) > 0 else 0
            }
        except Exception as e:
            return {'blocks_info': f'Could not analyze blocks: {e}'}
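    # NOTE: where available, DataFrame.inputFiles() returns a best-effort list of the
    # files backing a DataFrame and avoids the distinct() scan above, e.g.:
    #   for path in df.inputFiles()[:20]:
    #       status = hadoop_fs.getFileStatus(spark._jvm.org.apache.hadoop.fs.Path(path))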
    def calculate_optimal_configuration(metadata, blocks_info):
        """Calculate an optimal Spark configuration based on the analysis"""
        # Default values
        config = {
            'executor_memory': '4g',
            'driver_memory': '2g',
            'executor_cores': 4,
            'num_executors': 2,
            'max_partition_bytes': '128MB',
            'shuffle_partitions': 200,
            'serializer': 'org.apache.spark.serializer.KryoSerializer',
            'adaptive_enabled': True
        }
        try:
            # Determine the data size to base the calculations on
            data_size_gb = 0
            if isinstance(metadata.get('estimated_size_gb'), (int, float)):
                data_size_gb = metadata['estimated_size_gb']
            elif isinstance(blocks_info.get('estimated_size_gb'), (int, float)):
                data_size_gb = blocks_info['estimated_size_gb']
            if data_size_gb == 0:
                data_size_gb = 1  # Minimum assumption

            # Scale resources with data size
            if data_size_gb < 1:  # Small data
                config.update({
                    'executor_memory': '2g',
                    'driver_memory': '1g',
                    'executor_cores': 2,
                    'num_executors': 2,
                    'max_partition_bytes': '64MB',
                    'shuffle_partitions': 50
                })
            elif data_size_gb < 10:  # Medium data
                config.update({
                    'executor_memory': '4g',
                    'driver_memory': '2g',
                    'executor_cores': 4,
                    'num_executors': max(2, int(data_size_gb / 2)),
                    'max_partition_bytes': '128MB',
                    'shuffle_partitions': max(100, int(data_size_gb * 20))
                })
            elif data_size_gb < 100:  # Large data
                config.update({
                    'executor_memory': '8g',
                    'driver_memory': '4g',
                    'executor_cores': 5,
                    'num_executors': max(4, int(data_size_gb / 4)),
                    'max_partition_bytes': '256MB',
                    'shuffle_partitions': max(200, int(data_size_gb * 10))
                })
            else:  # Very large data
                config.update({
                    'executor_memory': '16g',
                    'driver_memory': '8g',
                    'executor_cores': 5,
                    'num_executors': max(8, int(data_size_gb / 8)),
                    'max_partition_bytes': '512MB',
                    'shuffle_partitions': max(400, int(data_size_gb * 5))
                })

            # Adjust for the operation type
            if operation_type == "ml":
                # ML operations need more memory
                config['executor_memory'] = str(int(config['executor_memory'][:-1]) * 2) + 'g'
                config['driver_memory'] = str(int(config['driver_memory'][:-1]) * 2) + 'g'
            elif operation_type == "join":
                # Joins benefit from more shuffle partitions
                config['shuffle_partitions'] = int(config['shuffle_partitions'] * 1.5)
            elif operation_type == "write":
                # Writes benefit from fewer output partitions
                config['shuffle_partitions'] = max(100, int(config['shuffle_partitions'] / 2))

            # Cap the plan at the cluster resources, if provided
            if target_cluster_cores:
                max_executors = target_cluster_cores // config['executor_cores']
                config['num_executors'] = min(config['num_executors'], max_executors - 1)  # Reserve capacity for the driver
            if target_cluster_memory_gb:
                max_executor_memory = (target_cluster_memory_gb * 0.8) // config['num_executors']  # Assume ~80% usable
                current_executor_memory = int(config['executor_memory'][:-1])
                if current_executor_memory > max_executor_memory:
                    config['executor_memory'] = str(int(max_executor_memory)) + 'g'

            return config
        except Exception as e:
            return {**config, 'config_error': f'Error calculating config: {e}'}
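    # Worked example (hypothetical 50 GB input, operation_type="join"): the 10-100 GB
    # tier gives 8g executors with 5 cores, num_executors = max(4, 50 // 4) = 12,
    # shuffle_partitions = max(200, 50 * 10) = 500, and the join multiplier then
    # raises shuffle_partitions to int(500 * 1.5) = 750.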
    def generate_spark_submit_command(config, app_name="dataframe_job"):
        """Generate a complete spark-submit command"""
        base_command = "spark-submit"

        # Core resource parameters
        params = [
            f"--executor-memory {config['executor_memory']}",
            f"--driver-memory {config['driver_memory']}",
            f"--executor-cores {config['executor_cores']}",
            f"--num-executors {config['num_executors']}",
            f"--name {app_name}"
        ]

        # Configuration parameters
        conf_params = [
            f"--conf spark.sql.files.maxPartitionBytes={config['max_partition_bytes']}",
            f"--conf spark.sql.shuffle.partitions={config['shuffle_partitions']}",
            f"--conf spark.serializer={config['serializer']}",
            f"--conf spark.sql.adaptive.enabled={str(config['adaptive_enabled']).lower()}",
            "--conf spark.sql.adaptive.coalescePartitions.enabled=true",
            "--conf spark.sql.adaptive.skewJoin.enabled=true",
            "--conf spark.dynamicAllocation.enabled=false",
            "--conf spark.network.timeout=800s",
            "--conf spark.executor.heartbeatInterval=60s"
        ]

        # Additional safety configurations
        safety_params = [
            "--conf spark.sql.execution.arrow.pyspark.enabled=true",
            "--conf spark.sql.execution.arrow.maxRecordsPerBatch=10000",
            "--conf spark.task.maxFailures=3",
            "--conf spark.stage.maxConsecutiveAttempts=8",
            "--conf spark.speculation=false"
        ]
        # Only relevant when running on Kubernetes
        if "kubernetes" in str(spark.sparkContext.getConf().getAll()):
            safety_params.append("--conf spark.kubernetes.executor.deleteOnTermination=true")

        # Combine all parameters
        all_params = params + conf_params + safety_params
        command = f"{base_command} \\\n  " + " \\\n  ".join(all_params) + " \\\n  your_script.py"
        return command
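    # Illustrative output for the 50 GB example above (actual values depend on the analysis):
    #   spark-submit \
    #     --executor-memory 8g \
    #     --driver-memory 4g \
    #     --executor-cores 5 \
    #     --num-executors 12 \
    #     --name dataframe_job \
    #     --conf spark.sql.files.maxPartitionBytes=256MB \
    #     --conf spark.sql.shuffle.partitions=750 \
    #     ... \
    #     your_script.py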
    def get_optimization_recommendations(metadata, config):
        """Provide optimization recommendations"""
        recommendations = []

        # Partition recommendations
        current_partitions = metadata.get('current_partitions', 0)
        optimal_partitions = config['num_executors'] * config['executor_cores'] * 3
        if current_partitions < optimal_partitions // 2:
            recommendations.append(f"Consider repartitioning to ~{optimal_partitions} partitions for better parallelism")
        elif current_partitions > optimal_partitions * 2:
            recommendations.append(f"Consider coalescing to ~{optimal_partitions} partitions to reduce overhead")

        # Data size recommendations
        if isinstance(metadata.get('estimated_size_gb'), (int, float)):
            size_gb = metadata['estimated_size_gb']
            if size_gb > 100:
                recommendations.append("Enable dynamic allocation for large datasets")
                recommendations.append("Consider using Delta Lake for better performance")
            if size_gb > 1000:
                recommendations.append("Consider breaking the job into smaller chunks")
                recommendations.append("Use columnar formats (Parquet/Delta) if not already")

        # Memory recommendations
        executor_memory_gb = int(config['executor_memory'][:-1])
        if executor_memory_gb < 4:
            recommendations.append("Monitor for memory pressure; increase executor memory if needed")
        elif executor_memory_gb > 32:
            recommendations.append("Very high memory allocation; ensure the cluster can support this")

        # General recommendations
        recommendations.extend([
            "Monitor the Spark UI for task skew and adjust partitioning if needed",
            "Enable adaptive query execution for dynamic optimization",
            "Use caching for DataFrames accessed multiple times",
            "Consider broadcast joins for small tables (<200MB)"
        ])
        return recommendations
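    # NOTE: the broadcast-join recommendation can be applied explicitly, e.g. with
    # placeholder DataFrames large_df / small_df:
    #   from pyspark.sql.functions import broadcast
    #   result = large_df.join(broadcast(small_df), "join_key")
    # Spark also auto-broadcasts tables below spark.sql.autoBroadcastJoinThreshold (10MB by default).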
    # === MAIN ANALYSIS EXECUTION ===
    print("🔍 Analyzing DataFrame...")
    start_time = time.time()

    # Run all of the analyses
    metadata = get_dataframe_metadata()
    blocks_info = get_hdfs_block_analysis()
    config = calculate_optimal_configuration(metadata, blocks_info)
    spark_submit_cmd = generate_spark_submit_command(config)
    recommendations = get_optimization_recommendations(metadata, config)
    analysis_time = time.time() - start_time

    # === COMPILE COMPLETE RESULTS ===
    results = {
        'analysis_summary': {
            'analysis_time_seconds': round(analysis_time, 2),
            'dataframe_metadata': metadata,
            'hdfs_blocks_analysis': blocks_info,
            'operation_type': operation_type
        },
        'optimal_configuration': config,
        'spark_submit_command': spark_submit_cmd,
        'optimization_recommendations': recommendations,
        'cluster_requirements': {
            'minimum_cores': config['num_executors'] * config['executor_cores'] + 2,  # +2 for the driver
            'minimum_memory_gb': config['num_executors'] * int(config['executor_memory'][:-1]) + int(config['driver_memory'][:-1]),
            'recommended_nodes': max(2, math.ceil(config['num_executors'] / 4))  # Assuming 4 executors per node
        },
        'monitoring_commands': {
            'spark_ui': "Access the Spark UI at http://<driver-node>:4040",
            'yarn_ui': "yarn application -list -appStates RUNNING",
            'kill_job': "yarn application -kill <application_id>"
        }
    }
    return results
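
# Executor and driver sizing has to be supplied at launch time (spark-submit or the
# session builder), but the SQL-level settings can also be applied to a live session.
# A minimal sketch (the helper name is illustrative), assuming `config` comes from
# analyze_dataframe_and_get_optimal_config()['optimal_configuration']:
def apply_runtime_config(spark, config):
    """Apply the runtime-adjustable recommendations to an existing SparkSession."""
    spark.conf.set("spark.sql.shuffle.partitions", str(config['shuffle_partitions']))
    spark.conf.set("spark.sql.files.maxPartitionBytes", config['max_partition_bytes'])
    spark.conf.set("spark.sql.adaptive.enabled", str(config['adaptive_enabled']).lower())
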
# === USAGE EXAMPLES ===
def print_analysis_report(analysis_results):
    """Print a comprehensive analysis report"""
    print("=" * 80)
    print("🚀 DATAFRAME ANALYSIS & OPTIMAL SPARK CONFIGURATION")
    print("=" * 80)

    # Analysis summary
    summary = analysis_results['analysis_summary']
    print(f"\n📊 DATAFRAME ANALYSIS (completed in {summary['analysis_time_seconds']}s)")
    print("-" * 50)
    metadata = summary['dataframe_metadata']
    if 'error' not in metadata:
        print(f"• Rows: {metadata['row_count']}")
        print(f"• Columns: {metadata['schema_info']['column_count']}")
        print(f"• Current Partitions: {metadata['current_partitions']}")
        print(f"• Estimated Size: {metadata['estimated_size_gb']} GB")
        print(f"• Avg Row Size: {metadata['avg_row_size_bytes']} bytes")
    blocks = summary['hdfs_blocks_analysis']
    if 'estimated_total_blocks' in blocks:
        print(f"• HDFS Blocks: {blocks['estimated_total_blocks']}")
        print(f"• Files: {blocks['total_files']}")

    # Optimal configuration
    print("\n⚙️ OPTIMAL CONFIGURATION")
    print("-" * 50)
    config = analysis_results['optimal_configuration']
    print(f"• Executor Memory: {config['executor_memory']}")
    print(f"• Driver Memory: {config['driver_memory']}")
    print(f"• Executor Cores: {config['executor_cores']}")
    print(f"• Number of Executors: {config['num_executors']}")
    print(f"• Shuffle Partitions: {config['shuffle_partitions']}")
    print(f"• Max Partition Bytes: {config['max_partition_bytes']}")

    # Cluster requirements
    print("\n🏗️ CLUSTER REQUIREMENTS")
    print("-" * 50)
    cluster = analysis_results['cluster_requirements']
    print(f"• Minimum Cores: {cluster['minimum_cores']}")
    print(f"• Minimum Memory: {cluster['minimum_memory_gb']} GB")
    print(f"• Recommended Nodes: {cluster['recommended_nodes']}")

    # Spark submit command
    print("\n🚀 SPARK SUBMIT COMMAND")
    print("-" * 50)
    print(analysis_results['spark_submit_command'])

    # Recommendations
    print("\n💡 OPTIMIZATION RECOMMENDATIONS")
    print("-" * 50)
    for i, rec in enumerate(analysis_results['optimization_recommendations'], 1):
        print(f"{i}. {rec}")
    print("\n" + "=" * 80)
# === MAIN USAGE FUNCTION ===
def analyze_and_optimize_dataframe(df, operation_type="read", cluster_cores=None, cluster_memory_gb=None):
    """
    Main function to analyze a DataFrame and get a complete optimization strategy.

    Usage:
        df = spark.table("your_database.your_table").filter("your_condition")
        results = analyze_and_optimize_dataframe(df, "transform", cluster_cores=64, cluster_memory_gb=256)
        print_analysis_report(results)
    """
    results = analyze_dataframe_and_get_optimal_config(
        df,
        operation_type=operation_type,
        target_cluster_cores=cluster_cores,
        target_cluster_memory_gb=cluster_memory_gb
    )
    return results
# Example usage:
"""
# Initialize Spark
spark = SparkSession.builder.appName("DataFrame Analysis").getOrCreate()

# Your DataFrame
df = spark.table("sales_db.billion_row_table") \
    .filter("date >= '2024-01-01'") \
    .filter("amount > 1000")

# Analyze and get the optimal configuration
results = analyze_and_optimize_dataframe(
    df,
    operation_type="transform",   # or "read", "write", "ml", "join"
    cluster_cores=64,             # Your cluster's total cores
    cluster_memory_gb=256         # Your cluster's total memory in GB
)

# Print the comprehensive report
print_analysis_report(results)

# Use the spark-submit command from results['spark_submit_command']
"""