"""
BigQuery Sample DAG with Custom Logging
Three tasks: initialize the custom logger, read counts from the customer table, and write the results to BigQuery
"""

# =============================================================================
# IMPORTS
# =============================================================================
import os
import sys
from datetime import datetime, timedelta

# Add the current directory to Python path for custom logger import
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

# =============================================================================
# GLOBAL VARIABLES
# =============================================================================
GLOBAL_LOGGER = None

# =============================================================================
# CONFIGURATION CONSTANTS
# =============================================================================
# GCS Configuration
GCS_BUCKET_SQL = "test-project"
GCS_BUCKET_LOGS = "test-project-staging"
GCS_FOLDER_SQL = "sql"
GCS_FOLDER_LOGS = "logs"

# Custom Logger Path Configuration
CUSTOM_LOGGER_PATHS = [
    "/opt/airflow/dags/project-usercase/myfolder/customlogger.py",
    "/opt/airflow/dags/project-usercase/customlogger.py",
    "/home/airflow/gcs/dags/project-usercase/myfolder/customlogger.py",
    "/home/airflow/gcs/dags/project-usercase/customlogger.py"
]

# BigQuery Configuration
PROJECT_ID = "your-gcp-project-id"
DATASET_ID = "analytics"
CUSTOMER_TABLE = f"{PROJECT_ID}.{DATASET_ID}.customer"
RESULTS_TABLE = f"{PROJECT_ID}.{DATASET_ID}.customer_analytics_results"

# =============================================================================
# SQL QUERIES
# =============================================================================
CUSTOMER_COUNT_QUERY = """
SELECT
    COUNT(*) as customer_count,
    COUNT(CASE WHEN status = 'active' THEN 1 END) as active_customers,
    COUNT(CASE WHEN status = 'inactive' THEN 1 END) as inactive_customers,
    DATE(CURRENT_DATE()) as analysis_date,
    'analysis_v1' as analysis_version
FROM `{customer_table}`
WHERE created_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
""".format(customer_table=CUSTOMER_TABLE)

INSERT_RESULTS_QUERY = """
INSERT INTO `{results_table}` (
    customer_count,
    active_customers,
    inactive_customers,
    analysis_date,
    analysis_version,
    processed_at
)
VALUES (
    {customer_count},
    {active_customers},
    {inactive_customers},
    '{analysis_date}',
    '{analysis_version}',
    CURRENT_TIMESTAMP()
)
"""

# =============================================================================
# CUSTOM LOGGER INITIALIZATION
# =============================================================================
def load_custom_logger_from_gcp():
    """Load custom logger by reading the file from GCP folder"""
    try:
        # Try to read the custom logger file from GCP folder using dynamic paths
        logger_code = None
        found_path = None
       
        for path in CUSTOM_LOGGER_PATHS:
            try:
                if os.path.exists(path):
                    with open(path, 'r', encoding='utf-8') as f:
                        logger_code = f.read()
                    found_path = path
                    print(f"Found custom logger at: {path}")
                    break
            except Exception as e:
                print(f"Could not read {path}: {e}")
                continue
       
        if logger_code:
            # Create a namespace to execute the code
            namespace = {}
           
            # Execute the custom logger code
            exec(logger_code, namespace)
           
            # Return the create_logger function
            if 'create_logger' in namespace:
                print(f"Successfully loaded custom logger from: {found_path}")
                return namespace['create_logger']
            else:
                print("create_logger function not found in the file")
                return create_fallback_logger
        else:
            print(f"Custom logger file not found in any of these paths: {CUSTOM_LOGGER_PATHS}")
            return create_fallback_logger
           
    except Exception as e:
        print(f"Failed to load custom logger from GCP: {e}")
        return create_fallback_logger
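
# A sketch of an alternative loader (not wired into this DAG): the standard
# library's importlib can load customlogger.py as a proper module instead of
# exec(), which keeps tracebacks pointing at real file and line numbers. It
# assumes the file defines create_logger at module level.
def load_custom_logger_via_importlib(path):
    """Load create_logger from a customlogger.py file using importlib."""
    import importlib.util

    spec = importlib.util.spec_from_file_location("customlogger", path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module.create_logger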


def create_fallback_logger(*args, **kwargs):
    """Fallback logger when custom logger is not available"""
    import logging
   
    class FallbackLogger:
        def __init__(self, *args, **kwargs):
            self.logger = logging.getLogger("fallback_logger")
            self.logger.setLevel(logging.INFO)
           
            if not self.logger.handlers:
                handler = logging.StreamHandler()
                formatter = logging.Formatter('%(asctime)s | %(levelname)s | %(message)s')
                handler.setFormatter(formatter)
                self.logger.addHandler(handler)
       
        def start_logging(self, context=None):
            self.logger.info("Fallback logger started")
       
        def stop_logging(self, success=True):
            status = "SUCCESS" if success else "FAILED"
            self.logger.info(f"Fallback logger stopped: {status}")
       
        def info(self, message):
            self.logger.info(message)
       
        def warning(self, message):
            self.logger.warning(message)
       
        def error(self, message):
            self.logger.error(message)
   
    return FallbackLogger(*args, **kwargs)


# Initialize the custom logger
create_logger = load_custom_logger_from_gcp()

# =============================================================================
# AIRFLOW IMPORTS
# =============================================================================
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

# =============================================================================
# DAG CONFIGURATION
# =============================================================================
DEFAULT_ARGS = {
    'owner': 'data_team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'execution_timeout': timedelta(hours=2),
    'sla': timedelta(hours=1)
}

# =============================================================================
# TASK FUNCTIONS
# =============================================================================
def _initialize_logger(**context):
    """Initialize global logger once"""
    global GLOBAL_LOGGER
   
    try:
        # Create logger only once
        GLOBAL_LOGGER = create_logger(
            job_type="bigquery_analytics",
            gcs_bucket=GCS_BUCKET_LOGS,
            gcs_folder=GCS_FOLDER_LOGS,
            context=context
        )
       
        # Start logging
        GLOBAL_LOGGER.start_logging(context)
       
        GLOBAL_LOGGER.info("BigQuery analytics job initialized")
        GLOBAL_LOGGER.info(f"Target tables: {CUSTOMER_TABLE} -> {RESULTS_TABLE}")
        GLOBAL_LOGGER.info(f"GCS Bucket SQL: {GCS_BUCKET_SQL}")
        GLOBAL_LOGGER.info(f"GCS Bucket Logs: {GCS_BUCKET_LOGS}")
       
        return "logger_initialized"
       
    except Exception as e:
        print(f"Logger initialization failed: {e}")
        raise


def _execute_customer_count_query(**context):
    """Execute customer count query and store results"""
    global GLOBAL_LOGGER
   
    try:
        if not GLOBAL_LOGGER:
            # Each Airflow task runs in its own worker process, so the global
            # set by initialize_logger is not visible here; re-create it.
            GLOBAL_LOGGER = create_logger(
                job_type="bigquery_analytics",
                gcs_bucket=GCS_BUCKET_LOGS,
                gcs_folder=GCS_FOLDER_LOGS,
                context=context
            )
            GLOBAL_LOGGER.start_logging(context)
       
        GLOBAL_LOGGER.info("Starting customer count analysis")
        GLOBAL_LOGGER.info(f"Executing query on table: {CUSTOMER_TABLE}")
       
        # Execute the query via the hook's underlying google-cloud-bigquery
        # client; BigQueryHook.run() does not return a job object to read from
        bq_hook = BigQueryHook()

        try:
            client = bq_hook.get_client(project_id=PROJECT_ID, location='US')
            query_job = client.query(CUSTOMER_COUNT_QUERY)

            # Process results as a DataFrame (waits for the query to finish)
            results = query_job.to_dataframe()
           
            if results.empty:
                GLOBAL_LOGGER.warning("Query returned no results")
                return None
           
            # Extract results
            row = results.iloc[0]
            customer_count = int(row['customer_count'])
            active_customers = int(row['active_customers'])
            inactive_customers = int(row['inactive_customers'])
            analysis_date = str(row['analysis_date'])
            analysis_version = str(row['analysis_version'])
           
            # Log results
            GLOBAL_LOGGER.info(f"Analysis results:")
            GLOBAL_LOGGER.info(f"  - Total customers: {customer_count}")
            GLOBAL_LOGGER.info(f"  - Active customers: {active_customers}")
            GLOBAL_LOGGER.info(f"  - Inactive customers: {inactive_customers}")
            GLOBAL_LOGGER.info(f"  - Analysis date: {analysis_date}")
            GLOBAL_LOGGER.info(f"  - Analysis version: {analysis_version}")
           
            # Store results in XCom
            analysis_results = {
                'customer_count': customer_count,
                'active_customers': active_customers,
                'inactive_customers': inactive_customers,
                'analysis_date': analysis_date,
                'analysis_version': analysis_version,
                'query_execution_time': datetime.now().isoformat()
            }
           
            context['task_instance'].xcom_push(
                key='customer_analysis_results',
                value=analysis_results
            )
           
            GLOBAL_LOGGER.info("Customer count analysis completed")
            return analysis_results
           
        except Exception as query_error:
            GLOBAL_LOGGER.error(f"Query execution failed: {query_error}")
            raise
           
    except Exception as e:
        if GLOBAL_LOGGER:
            GLOBAL_LOGGER.error(f"Customer count task failed: {e}")
        raise


def _write_results_to_bigquery(**context):
    """Write results to BigQuery table and complete job"""
    global GLOBAL_LOGGER
   
    try:
        if not GLOBAL_LOGGER:
            # Each Airflow task runs in its own worker process, so the global
            # set by initialize_logger is not visible here; re-create it.
            GLOBAL_LOGGER = create_logger(
                job_type="bigquery_analytics",
                gcs_bucket=GCS_BUCKET_LOGS,
                gcs_folder=GCS_FOLDER_LOGS,
                context=context
            )
            GLOBAL_LOGGER.start_logging(context)
       
        # Get results from previous task
        analysis_results = context['task_instance'].xcom_pull(
            task_ids='execute_customer_count_query',
            key='customer_analysis_results'
        )
       
        if not analysis_results:
            raise ValueError("Analysis results not found")
       
        GLOBAL_LOGGER.info("Starting results writing to BigQuery")
        GLOBAL_LOGGER.info(f"Target table: {RESULTS_TABLE}")
       
        # Construct insert query
        insert_query = INSERT_RESULTS_QUERY.format(
            results_table=RESULTS_TABLE,
            customer_count=analysis_results['customer_count'],
            active_customers=analysis_results['active_customers'],
            inactive_customers=analysis_results['inactive_customers'],
            analysis_date=analysis_results['analysis_date'],
            analysis_version=analysis_results['analysis_version']
        )
       
        GLOBAL_LOGGER.info("Constructed insert query")
       
        # Execute the insert via the hook's underlying client and wait for it
        bq_hook = BigQueryHook()

        try:
            client = bq_hook.get_client(project_id=PROJECT_ID, location='US')
            insert_job = client.query(insert_query)
            insert_job.result()  # block until the INSERT completes

            GLOBAL_LOGGER.info("Results successfully written to BigQuery")
            GLOBAL_LOGGER.info(f"Insert job ID: {insert_job.job_id}")
            GLOBAL_LOGGER.info(f"Target table: {RESULTS_TABLE}")
            GLOBAL_LOGGER.info("Records inserted: 1")
           
            # Store completion data
            completion_data = {
                'insert_job_id': insert_job.job_id,
                'target_table': RESULTS_TABLE,
                'records_inserted': 1,
                'completion_time': datetime.now().isoformat(),
                'analysis_results': analysis_results
            }
           
            context['task_instance'].xcom_push(
                key='insert_completion',
                value=completion_data
            )
           
            # Complete job
            GLOBAL_LOGGER.info("BigQuery analytics job completed successfully")
            GLOBAL_LOGGER.stop_logging(success=True)
           
            return completion_data
           
        except Exception as insert_error:
            GLOBAL_LOGGER.error(f"Insert operation failed: {insert_error}")
            GLOBAL_LOGGER.stop_logging(success=False)
            raise
           
    except Exception as e:
        if GLOBAL_LOGGER:
            GLOBAL_LOGGER.error(f"Results writing task failed: {e}")
            GLOBAL_LOGGER.stop_logging(success=False)
        raise

# =============================================================================
# DAG DEFINITION
# =============================================================================
dag = DAG(
    'test-bigquery',
    default_args=DEFAULT_ARGS,
    description='BigQuery analytics with custom logging',
    schedule_interval='0 2 * * *',  # Daily at 2 AM
    catchup=False,
    max_active_runs=1,
    tags=['bigquery', 'analytics', 'custom-logging']
)

# =============================================================================
# TASK DEFINITIONS
# =============================================================================
# Task 1: Initialize logger
initialize_logger_task = PythonOperator(
    task_id='initialize_logger',
    python_callable=_initialize_logger,
    dag=dag,
    doc="Initialize global custom logger"
)

# Task 2: Execute customer count query
customer_count_task = PythonOperator(
    task_id='execute_customer_count_query',
    python_callable=_execute_customer_count_query,
    dag=dag,
    doc="Execute customer count analysis"
)

# Task 3: Write results to BigQuery
write_results_task = PythonOperator(
    task_id='write_results_to_bigquery',
    python_callable=_write_results_to_bigquery,
    dag=dag,
    doc="Write results to BigQuery table and complete job"
)

# =============================================================================
# TASK DEPENDENCIES
# =============================================================================
initialize_logger_task >> customer_count_task >> write_results_task
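
# Optional local check: Airflow 2.5+ can run the whole DAG in-process without a
# scheduler (a sketch; assumes GCP credentials and connections are configured).
if __name__ == "__main__":
    dag.test()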
