"""
BigQuery Sample DAG with Custom Logging
Three tasks: initialize the custom logger, read counts from the customer table, and write the results to a results table
"""
# =============================================================================
# IMPORTS
# =============================================================================
import os
import sys
from datetime import datetime, timedelta
# Add the current directory to Python path for custom logger import
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
# =============================================================================
# GLOBAL VARIABLES
# =============================================================================
GLOBAL_LOGGER = None
# =============================================================================
# CONFIGURATION CONSTANTS
# =============================================================================
# GCS Configuration
GCS_BUCKET_SQL = "test-project"
GCS_BUCKET_LOGS = "test-project-staging"
GCS_FOLDER_SQL = "sql"
GCS_FOLDER_LOGS = "logs"
# Custom Logger Path Configuration
CUSTOM_LOGGER_PATHS = [
"/opt/airflow/dags/project-usercase/myfolder/customlogger.py",
"/opt/airflow/dags/project-usercase/customlogger.py",
"/home/airflow/gcs/dags/project-usercase/myfolder/customlogger.py",
"/home/airflow/gcs/dags/project-usercase/customlogger.py"
]
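# The first two paths cover a self-managed Airflow deployment (/opt/airflow/dags);
# the last two cover Cloud Composer, which syncs the DAGs bucket to /home/airflow/gcs/dags.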
# BigQuery Configuration
PROJECT_ID = "your-gcp-project-id"
DATASET_ID = "analytics"
CUSTOMER_TABLE = f"{PROJECT_ID}.{DATASET_ID}.customer"
RESULTS_TABLE = f"{PROJECT_ID}.{DATASET_ID}.customer_analytics_results"
# =============================================================================
# SQL QUERIES
# =============================================================================
CUSTOMER_COUNT_QUERY = """
SELECT
COUNT(*) as customer_count,
COUNT(CASE WHEN status = 'active' THEN 1 END) as active_customers,
COUNT(CASE WHEN status = 'inactive' THEN 1 END) as inactive_customers,
    CURRENT_DATE() as analysis_date,
'analysis_v1' as analysis_version
FROM `{customer_table}`
WHERE created_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
""".format(customer_table=CUSTOMER_TABLE)
INSERT_RESULTS_QUERY = """
INSERT INTO `{results_table}` (
customer_count,
active_customers,
inactive_customers,
analysis_date,
analysis_version,
processed_at
)
VALUES (
{customer_count},
{active_customers},
{inactive_customers},
'{analysis_date}',
'{analysis_version}',
CURRENT_TIMESTAMP()
)
"""
# =============================================================================
# CUSTOM LOGGER INITIALIZATION
# =============================================================================
def load_custom_logger_from_gcp():
"""Load custom logger by reading the file from GCP folder"""
try:
# Try to read the custom logger file from GCP folder using dynamic paths
logger_code = None
found_path = None
for path in CUSTOM_LOGGER_PATHS:
try:
if os.path.exists(path):
with open(path, 'r', encoding='utf-8') as f:
logger_code = f.read()
found_path = path
print(f"Found custom logger at: {path}")
break
except Exception as e:
print(f"Could not read {path}: {e}")
continue
if logger_code:
# Create a namespace to execute the code
namespace = {}
# Execute the custom logger code
exec(logger_code, namespace)
# Return the create_logger function
if 'create_logger' in namespace:
print(f"Successfully loaded custom logger from: {found_path}")
return namespace['create_logger']
else:
print("create_logger function not found in the file")
return create_fallback_logger
else:
print(f"Custom logger file not found in any of these paths: {CUSTOM_LOGGER_PATHS}")
return create_fallback_logger
except Exception as e:
print(f"Failed to load custom logger from GCP: {e}")
return create_fallback_logger
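# A minimal importlib-based alternative to the exec() approach above (a sketch with the
# same behaviour, for readers who prefer loading the file as a proper module):
#
#   import importlib.util
#   spec = importlib.util.spec_from_file_location("customlogger", found_path)
#   module = importlib.util.module_from_spec(spec)
#   spec.loader.exec_module(module)
#   create_logger = module.create_logger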
def create_fallback_logger(*args, **kwargs):
"""Fallback logger when custom logger is not available"""
import logging
class FallbackLogger:
def __init__(self, *args, **kwargs):
self.logger = logging.getLogger("fallback_logger")
self.logger.setLevel(logging.INFO)
if not self.logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s | %(levelname)s | %(message)s')
handler.setFormatter(formatter)
self.logger.addHandler(handler)
def start_logging(self, context=None):
self.logger.info("Fallback logger started")
def stop_logging(self, success=True):
status = "SUCCESS" if success else "FAILED"
self.logger.info(f"Fallback logger stopped: {status}")
def info(self, message):
self.logger.info(message)
def warning(self, message):
self.logger.warning(message)
def error(self, message):
self.logger.error(message)
return FallbackLogger(*args, **kwargs)
# Initialize the custom logger
create_logger = load_custom_logger_from_gcp()
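# Note: this module-level call runs every time the file is parsed -- during DAG parsing
# on the scheduler and again in every worker process that executes one of the tasks below.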
# =============================================================================
# AIRFLOW IMPORTS
# =============================================================================
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
# =============================================================================
# DAG CONFIGURATION
# =============================================================================
DEFAULT_ARGS = {
'owner': 'data_team',
'depends_on_past': False,
'start_date': datetime(2024, 1, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=5),
'execution_timeout': timedelta(hours=2),
'sla': timedelta(hours=1)
}
# =============================================================================
# TASK FUNCTIONS
# =============================================================================
def _initialize_logger(**context):
"""Initialize global logger once"""
global GLOBAL_LOGGER
try:
# Create logger only once
GLOBAL_LOGGER = create_logger(
job_type="bigquery_analytics",
gcs_bucket=GCS_BUCKET_LOGS,
gcs_folder=GCS_FOLDER_LOGS,
context=context
)
# Start logging
GLOBAL_LOGGER.start_logging(context)
GLOBAL_LOGGER.info("BigQuery analytics job initialized")
GLOBAL_LOGGER.info(f"Target tables: {CUSTOMER_TABLE} -> {RESULTS_TABLE}")
GLOBAL_LOGGER.info(f"GCS Bucket SQL: {GCS_BUCKET_SQL}")
GLOBAL_LOGGER.info(f"GCS Bucket Logs: {GCS_BUCKET_LOGS}")
return "logger_initialized"
except Exception as e:
print(f"Logger initialization failed: {e}")
raise
def _execute_customer_count_query(**context):
"""Execute customer count query and store results"""
global GLOBAL_LOGGER
try:
        if not GLOBAL_LOGGER:
            # Airflow tasks usually run in separate worker processes, so the
            # GLOBAL_LOGGER created by the initialize_logger task is not shared
            # here; re-create the logger instead of failing.
            GLOBAL_LOGGER = create_logger(
                job_type="bigquery_analytics",
                gcs_bucket=GCS_BUCKET_LOGS,
                gcs_folder=GCS_FOLDER_LOGS,
                context=context
            )
GLOBAL_LOGGER.info("Starting customer count analysis")
GLOBAL_LOGGER.info(f"Executing query on table: {CUSTOMER_TABLE}")
        # Execute the query via the hook's google-cloud-bigquery client
        # (BigQueryHook.run() does not return a job object to read results from)
        bq_hook = BigQueryHook(use_legacy_sql=False)
        try:
            bq_client = bq_hook.get_client(project_id=PROJECT_ID, location='US')
            query_job = bq_client.query(CUSTOMER_COUNT_QUERY)
            # Process results as a pandas DataFrame
            results = query_job.to_dataframe()
if results.empty:
GLOBAL_LOGGER.warning("Query returned no results")
return None
# Extract results
row = results.iloc[0]
customer_count = int(row['customer_count'])
active_customers = int(row['active_customers'])
inactive_customers = int(row['inactive_customers'])
analysis_date = str(row['analysis_date'])
analysis_version = str(row['analysis_version'])
# Log results
GLOBAL_LOGGER.info(f"Analysis results:")
GLOBAL_LOGGER.info(f" - Total customers: {customer_count}")
GLOBAL_LOGGER.info(f" - Active customers: {active_customers}")
GLOBAL_LOGGER.info(f" - Inactive customers: {inactive_customers}")
GLOBAL_LOGGER.info(f" - Analysis date: {analysis_date}")
GLOBAL_LOGGER.info(f" - Analysis version: {analysis_version}")
# Store results in XCom
analysis_results = {
'customer_count': customer_count,
'active_customers': active_customers,
'inactive_customers': inactive_customers,
'analysis_date': analysis_date,
'analysis_version': analysis_version,
'query_execution_time': datetime.now().isoformat()
}
context['task_instance'].xcom_push(
key='customer_analysis_results',
value=analysis_results
)
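            # The dictionary returned below is also pushed to XCom automatically under
            # the key 'return_value'; the explicit xcom_push above adds a named key.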
GLOBAL_LOGGER.info("Customer count analysis completed")
return analysis_results
except Exception as query_error:
GLOBAL_LOGGER.error(f"Query execution failed: {query_error}")
raise
except Exception as e:
if GLOBAL_LOGGER:
GLOBAL_LOGGER.error(f"Customer count task failed: {e}")
raise
def _write_results_to_bigquery(**context):
"""Write results to BigQuery table and complete job"""
global GLOBAL_LOGGER
try:
        if not GLOBAL_LOGGER:
            # Tasks run in separate worker processes, so re-create the logger
            # here rather than relying on the initialize_logger task's global.
            GLOBAL_LOGGER = create_logger(
                job_type="bigquery_analytics",
                gcs_bucket=GCS_BUCKET_LOGS,
                gcs_folder=GCS_FOLDER_LOGS,
                context=context
            )
# Get results from previous task
analysis_results = context['task_instance'].xcom_pull(
task_ids='execute_customer_count_query',
key='customer_analysis_results'
)
if not analysis_results:
raise ValueError("Analysis results not found")
GLOBAL_LOGGER.info("Starting results writing to BigQuery")
GLOBAL_LOGGER.info(f"Target table: {RESULTS_TABLE}")
# Construct insert query
insert_query = INSERT_RESULTS_QUERY.format(
results_table=RESULTS_TABLE,
customer_count=analysis_results['customer_count'],
active_customers=analysis_results['active_customers'],
inactive_customers=analysis_results['inactive_customers'],
analysis_date=analysis_results['analysis_date'],
analysis_version=analysis_results['analysis_version']
)
GLOBAL_LOGGER.info("Constructed insert query")
        # Execute the insert via the hook's google-cloud-bigquery client
        bq_hook = BigQueryHook(use_legacy_sql=False)
        try:
            bq_client = bq_hook.get_client(project_id=PROJECT_ID, location='US')
            insert_job = bq_client.query(insert_query)
            insert_job.result()  # wait for the DML statement to finish
GLOBAL_LOGGER.info("Results successfully written to BigQuery")
GLOBAL_LOGGER.info(f"Insert job ID: {insert_job.job_id}")
GLOBAL_LOGGER.info(f"Target table: {RESULTS_TABLE}")
GLOBAL_LOGGER.info(f"Records inserted: 1")
# Store completion data
completion_data = {
'insert_job_id': insert_job.job_id,
'target_table': RESULTS_TABLE,
'records_inserted': 1,
'completion_time': datetime.now().isoformat(),
'analysis_results': analysis_results
}
context['task_instance'].xcom_push(
key='insert_completion',
value=completion_data
)
# Complete job
GLOBAL_LOGGER.info("BigQuery analytics job completed successfully")
GLOBAL_LOGGER.stop_logging(success=True)
return completion_data
except Exception as insert_error:
GLOBAL_LOGGER.error(f"Insert operation failed: {insert_error}")
GLOBAL_LOGGER.stop_logging(success=False)
raise
except Exception as e:
if GLOBAL_LOGGER:
GLOBAL_LOGGER.error(f"Results writing task failed: {e}")
GLOBAL_LOGGER.stop_logging(success=False)
raise
# =============================================================================
# DAG DEFINITION
# =============================================================================
dag = DAG(
'test-bigquery',
default_args=DEFAULT_ARGS,
description='BigQuery analytics with custom logging',
schedule_interval='0 2 * * *', # Daily at 2 AM
catchup=False,
max_active_runs=1,
tags=['bigquery', 'analytics', 'custom-logging']
)
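# To dry-run this DAG locally (assumes a configured Airflow environment with access
# to the project's GCP connection):
#   airflow dags test test-bigquery 2024-01-01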
# =============================================================================
# TASK DEFINITIONS
# =============================================================================
# Task 1: Initialize logger
initialize_logger_task = PythonOperator(
task_id='initialize_logger',
python_callable=_initialize_logger,
dag=dag,
doc="Initialize global custom logger"
)
# Task 2: Execute customer count query
customer_count_task = PythonOperator(
task_id='execute_customer_count_query',
python_callable=_execute_customer_count_query,
dag=dag,
doc="Execute customer count analysis"
)
# Task 3: Write results to BigQuery
write_results_task = PythonOperator(
task_id='write_results_to_bigquery',
python_callable=_write_results_to_bigquery,
dag=dag,
doc="Write results to BigQuery table and complete job"
)
# =============================================================================
# TASK DEPENDENCIES
# =============================================================================
initialize_logger_task >> customer_count_task >> write_results_task