@@ -1087,12 +1087,23 @@ def check_and_change_state_before_execution( # pylint: disable=too-many-argumen
1087
1087
self .log .info ("Executing %s on %s" , self .task , self .execution_date )
1088
1088
return True
1089
1089
1090
- def _date_or_empty (self , attr ):
1091
- if hasattr (self , attr ):
1092
- date = getattr (self , attr )
1093
- if date :
1094
- return date .strftime ('%Y%m%dT%H%M%S' )
1095
- return ''
1090
+ def _date_or_empty (self , attr : str ):
1091
+ result = getattr (self , attr , None ) # type: datetime
1092
+ return result .strftime ('%Y%m%dT%H%M%S' ) if result else ''
1093
+
1094
+ def _log_state (self , lead_msg : str = '' ):
1095
+ self .log .info (
1096
+ '%sMarking task as %s.'
1097
+ + ' dag_id=%s, task_id=%s,'
1098
+ + ' execution_date=%s, start_date=%s, end_date=%s' ,
1099
+ lead_msg ,
1100
+ self .state .upper (),
1101
+ self .dag_id ,
1102
+ self .task_id ,
1103
+ self ._date_or_empty ('execution_date' ),
1104
+ self ._date_or_empty ('start_date' ),
1105
+ self ._date_or_empty ('end_date' ),
1106
+ )
1096
1107
1097
1108
@provide_session
1098
1109
@Sentry .enrich_errors
@@ -1147,15 +1158,6 @@ def _run_raw_task(
1147
1158
self .log .info (e )
1148
1159
self .refresh_from_db (lock_for_update = True )
1149
1160
self .state = State .SKIPPED
1150
- self .log .info (
1151
- 'Marking task as SKIPPED. '
1152
- 'dag_id=%s, task_id=%s, execution_date=%s, start_date=%s, end_date=%s' ,
1153
- self .dag_id ,
1154
- self .task_id ,
1155
- self ._date_or_empty ('execution_date' ),
1156
- self ._date_or_empty ('start_date' ),
1157
- self ._date_or_empty ('end_date' ),
1158
- )
1159
1161
except AirflowRescheduleException as reschedule_exception :
1160
1162
self .refresh_from_db ()
1161
1163
self ._handle_reschedule (actual_start_date , reschedule_exception , test_mode )
@@ -1181,17 +1183,9 @@ def _run_raw_task(
1181
1183
finally :
1182
1184
Stats .incr (f'ti.finish.{ task .dag_id } .{ task .task_id } .{ self .state } ' )
1183
1185
1184
- # Recording SUCCESS
1186
+ # Recording SKIPPED or SUCCESS
1185
1187
self .end_date = timezone .utcnow ()
1186
- self .log .info (
1187
- 'Marking task as SUCCESS. '
1188
- 'dag_id=%s, task_id=%s, execution_date=%s, start_date=%s, end_date=%s' ,
1189
- self .dag_id ,
1190
- self .task_id ,
1191
- self ._date_or_empty ('execution_date' ),
1192
- self ._date_or_empty ('start_date' ),
1193
- self ._date_or_empty ('end_date' ),
1194
- )
1188
+ self ._log_state ()
1195
1189
self .set_duration ()
1196
1190
if not test_mode :
1197
1191
session .add (Log (self .state , self ))
@@ -1458,25 +1452,12 @@ def handle_failure(
1458
1452
1459
1453
if force_fail or not self .is_eligible_to_retry ():
1460
1454
self .state = State .FAILED
1461
- if force_fail :
1462
- log_message = "Immediate failure requested. Marking task as FAILED."
1463
- else :
1464
- log_message = "Marking task as FAILED."
1465
1455
email_for_state = task .email_on_failure
1466
1456
else :
1467
1457
self .state = State .UP_FOR_RETRY
1468
- log_message = "Marking task as UP_FOR_RETRY."
1469
1458
email_for_state = task .email_on_retry
1470
1459
1471
- self .log .info (
1472
- '%s dag_id=%s, task_id=%s, execution_date=%s, start_date=%s, end_date=%s' ,
1473
- log_message ,
1474
- self .dag_id ,
1475
- self .task_id ,
1476
- self ._safe_date ('execution_date' , '%Y%m%dT%H%M%S' ),
1477
- self ._safe_date ('start_date' , '%Y%m%dT%H%M%S' ),
1478
- self ._safe_date ('end_date' , '%Y%m%dT%H%M%S' ),
1479
- )
1460
+ self ._log_state ('Immediate failure requested. ' if force_fail else '' )
1480
1461
if email_for_state and task .email :
1481
1462
try :
1482
1463
self .email_alert (error )
@@ -1502,12 +1483,6 @@ def is_eligible_to_retry(self):
1502
1483
"""Is task instance is eligible for retry"""
1503
1484
return self .task .retries and self .try_number <= self .max_tries
1504
1485
1505
- def _safe_date (self , date_attr , fmt ):
1506
- result = getattr (self , date_attr , None )
1507
- if result is not None :
1508
- return result .strftime (fmt )
1509
- return ''
1510
-
1511
1486
@provide_session
1512
1487
def get_template_context (self , session = None ) -> Context : # pylint: disable=too-many-locals
1513
1488
"""Return TI Context"""
0 commit comments