CronProcessor Refactoring

Overview

The CronProcessor is a generic utility that eliminates code duplication across cron job methods in admin and safety controller classes. It provides a standardized approach to processing collections of database items with consistent error handling, logging, and performance tracking.

Problem Statement

Before this refactoring, cron methods across the codebase followed a repetitive pattern:

@perf_decorator(job_type=AppJobType.CR_INVITE_UNPROCESSED)
def cron_invite_unprocessed(self) -> LogPerfResult:
    main_result = AppResult('cron_invite_unprocessed')
    passed_ct = 0
    failed_ct = 0
    skipped_ct = 0

    try:
        invites = self.invite_db.get_unprocessed_invites_last_week()
        LOG().info(f"Found {len(invites)} unprocessed invites...")

        for invite in invites:
            value = self._invite_processor.process_invite_signup(invite)
            if value.is_error:
                doc_id = invite.doc_id or NO_DOC_ID
                msg = f"Error processing invite {doc_id}"
                main_result.add_error(AppErrorType.DATABASE_EX, msg)
                failed_ct += 1
            elif value.is_skipped:
                skipped_ct += 1
            else:
                passed_ct += 1

    except Exception as e:
        msg = f"Exception during cron_invite processing: {e}"
        LOG().error(msg)
        main_result.add_error(AppErrorType.DATABASE_EX, msg)

    LOG().info(f"cron_invite completed: {passed_ct} passed, {failed_ct} failed...")
    
    if main_result.is_error:
        return LogPerfResult(BinaryPerfResult.error()).error(main_result)
    else:
        return LogPerfResult(BinaryPerfResult(passed_ct=passed_ct, ...))

This pattern was repeated across:

  • friend_admin.py: cron_invite_unprocessed, cron_friend_unprocessed
  • group_admin.py: cron_group, cron_group_member_unprocessed, cron_group_member_status_unprocessed
  • user_admin.py: cron_user_unprocessed
  • restriction_controller.py: cron_inactive_restrictions, cron_expired_restrictions

Issues:

  1. ~200 lines of duplicate boilerplate code
  2. Inconsistent error messages and logging
  3. Testing requires duplicating tests for each method
  4. Bug fixes/improvements need changes in multiple locations
  5. Hard to maintain consistency across all cron jobs

Solution

Generic CronProcessor

The CronProcessor class extracts the common pattern into a reusable utility:

config = CronConfig(
    job_type=AppJobType.CR_INVITE_UNPROCESSED,
    job_name="cron_invite_unprocessed",
    query_fn=lambda: self.invite_db.get_unprocessed_invites_last_week(),
    process_fn=lambda invite: self._invite_processor.process_invite_signup(invite)
)
return CronProcessor.process(config)
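
To make the extracted pattern concrete, here is a minimal sketch of the kind of loop CronProcessor.process could run. It is not the project's implementation: Outcome, MiniConfig, Counts, and the module-level process function are hypothetical stand-ins for ResultValue, CronConfig, LogPerfResult, and CronProcessor.process, and the perf decorator is left out.

```python
import logging
from dataclasses import dataclass, field
from enum import Enum
from typing import Callable, Generic, TypeVar

T = TypeVar("T")
LOG = logging.getLogger("cron_sketch")


class Outcome(Enum):
    """Hypothetical stand-in for the project's ResultValue."""
    OK = "ok"
    SKIPPED = "skipped"
    ERROR = "error"


@dataclass
class MiniConfig(Generic[T]):
    """Hypothetical, reduced CronConfig: only the fields the loop needs."""
    job_name: str
    query_fn: Callable[[], list[T]]
    process_fn: Callable[[T], Outcome]


@dataclass
class Counts:
    """Hypothetical stand-in for the counters carried by LogPerfResult."""
    passed: int = 0
    failed: int = 0
    skipped: int = 0
    errors: list[str] = field(default_factory=list)


def process(config: MiniConfig[T]) -> Counts:
    """Run query_fn once, then process_fn per item, tallying each outcome."""
    counts = Counts()
    try:
        items = config.query_fn()
        LOG.info("%s: found %d items", config.job_name, len(items))
        for item in items:
            outcome = config.process_fn(item)
            if outcome is Outcome.ERROR:
                counts.failed += 1
                counts.errors.append(f"{config.job_name}: error processing {item!r}")
            elif outcome is Outcome.SKIPPED:
                counts.skipped += 1
            else:
                counts.passed += 1
    except Exception as exc:  # same broad catch as the original cron methods
        msg = f"{config.job_name}: exception during processing: {exc}"
        LOG.error(msg)
        counts.errors.append(msg)
    LOG.info("%s completed: %d passed, %d failed, %d skipped",
             config.job_name, counts.passed, counts.failed, counts.skipped)
    return counts


def classify_number(n: int) -> Outcome:
    """Toy process_fn: 2 is skipped, 3 fails, everything else passes."""
    if n == 2:
        return Outcome.SKIPPED
    if n == 3:
        return Outcome.ERROR
    return Outcome.OK


def failing_query() -> list[int]:
    """Toy query_fn that simulates a database failure."""
    raise RuntimeError("db down")


demo = process(MiniConfig("demo", lambda: [1, 2, 3, 4], classify_number))
broken = process(MiniConfig("broken", failing_query, classify_number))
```

As in the original methods, an exception raised by the query or by any item ends the run and is recorded as a single error; the sketch does not isolate failures per item.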

Configuration Options

The CronConfig class provides flexibility:

CronConfig(
    job_type: AppJobType,          # For perf decorator and reporting
    job_name: str,                  # For logging
    query_fn: Callable[[], list[T]], # Fetches items to process
    process_fn: Callable[[T], Union[ResultValue, AppResult]], # Processes each item
    error_type: AppErrorType = AppErrorType.DATABASE_EX,  # Default error type
    report_fn: Optional[Callable] = None,  # Optional reporting callback
    skip_empty_check: bool = False  # Skip early return on empty query
)
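
The field list above reads naturally as a dataclass. The sketch below assumes that shape; the enums and the AppResult class are minimal stand-ins defined here only so the example runs, and the real definitions in the codebase may differ.

```python
from dataclasses import dataclass
from enum import Enum, auto
from typing import Callable, Generic, Optional, TypeVar, Union

T = TypeVar("T")


class AppJobType(Enum):  # stand-in for the project's enum
    CR_INVITE_UNPROCESSED = auto()


class AppErrorType(Enum):  # stand-in for the project's enum
    DATABASE_EX = auto()
    MODERATION = auto()


class ResultValue(Enum):  # stand-in for the project's enum
    OK = auto()


class AppResult:  # stand-in for the project's class
    pass


@dataclass
class CronConfig(Generic[T]):
    """Assumed dataclass form of the configuration described above."""
    job_type: AppJobType
    job_name: str
    query_fn: Callable[[], list[T]]
    process_fn: Callable[[T], Union[ResultValue, AppResult]]
    error_type: AppErrorType = AppErrorType.DATABASE_EX
    report_fn: Optional[Callable] = None
    skip_empty_check: bool = False


# Only the four required fields are passed; the rest fall back to defaults.
config = CronConfig(
    job_type=AppJobType.CR_INVITE_UNPROCESSED,
    job_name="cron_invite_unprocessed",
    query_fn=lambda: ["invite-1", "invite-2"],
    process_fn=lambda invite: ResultValue.OK,
)
```

Keeping the optional fields last with defaults is what lets the common case stay at four arguments.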

Benefits

1. Code Reduction

Before: ~40 lines per cron method × 8 methods = ~320 lines
After: ~8 lines per cron method × 8 methods = ~64 lines
Savings: ~256 lines (80% reduction)

2. Consistent Error Handling

All cron jobs now have:

  • Standardized exception handling
  • Consistent logging format
  • Uniform performance tracking
  • Predictable error reporting

3. Simplified Testing

Before: Each cron method required a separate test suite:

  • Test success cases
  • Test failure cases
  • Test skip cases
  • Test exception handling
  • Test counter tracking
  • Test logging

After: Test CronProcessor once, then only test business logic:

# One comprehensive test suite for CronProcessor
test/unit/test_cron_processor.py

# Individual admin tests only need to verify:
# - Correct query function
# - Correct process function
# No need to test error handling, counting, logging, etc.

4. Easier Maintenance

Changes to the cron pattern (e.g., adding metrics, improving logging) only require modifying one class.

5. Type Safety

A generic type parameter lets a static type checker verify that query_fn and process_fn agree on the item type:

class CronConfig(Generic[T]):  # T is bound to DatabaseModel
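
A minimal sketch of how such a bound might be declared, assuming DatabaseModel is a plain base class. The DatabaseModel and Invite classes here are stand-ins, and the config is reduced to two fields.

```python
from dataclasses import dataclass
from typing import Callable, Generic, Optional, TypeVar


class DatabaseModel:
    """Stand-in base class; the real one lives in the codebase."""

    def __init__(self, doc_id: Optional[str] = None) -> None:
        self.doc_id = doc_id


class Invite(DatabaseModel):
    """Stand-in model used only for this example."""


# The bound restricts T to DatabaseModel and its subclasses.
T = TypeVar("T", bound=DatabaseModel)


@dataclass
class CronConfig(Generic[T]):
    """Reduced config: just the two callables that share the item type."""
    query_fn: Callable[[], list[T]]
    process_fn: Callable[[T], bool]


def has_doc_id(invite: Invite) -> bool:
    """Toy process_fn that relies on an attribute of DatabaseModel."""
    return invite.doc_id is not None


invite_config: CronConfig[Invite] = CronConfig(
    query_fn=lambda: [Invite("a1"), Invite()],
    process_fn=has_doc_id,
)
results = [invite_config.process_fn(item) for item in invite_config.query_fn()]
```

The bound is checked only by static tools such as mypy or pyright, which should reject an annotation like CronConfig[str]. Nothing is enforced at runtime.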

Migration Examples

Friend Admin

Before (40 lines):

@perf_decorator(job_type=AppJobType.CR_INVITE_UNPROCESSED)
def cron_invite_unprocessed(self) -> LogPerfResult:
    main_result = AppResult('cron_invite_unprocessed')
    passed_ct = 0
    failed_ct = 0
    skipped_ct = 0
    # ... 30+ more lines

After (8 lines):

def cron_invite_unprocessed(self) -> LogPerfResult:
    config = CronConfig(
        job_type=AppJobType.CR_INVITE_UNPROCESSED,
        job_name="cron_invite_unprocessed",
        query_fn=lambda: self.invite_db.get_unprocessed_invites_last_week(),
        process_fn=lambda invite: self._invite_processor.process_invite_signup(invite)
    )
    return CronProcessor.process(config)

Group Admin

Before (38 lines × 3 methods = 114 lines):

  • cron_group
  • cron_group_member_unprocessed
  • cron_group_member_status_unprocessed

After (8 lines × 3 methods = 24 lines): All three methods use the same simple pattern with different configurations.

User Admin

Before (36 lines with custom filtering):

users = self.user_db.get_recent_unprocessed()
for user in users:
    if user.processing_complete:
        skipped_ct += 1
        continue
    # ... process

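After (8 lines, with the filtering moved into the query; users filtered out this way never reach the processor, so they are no longer counted as skipped):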

config = CronConfig(
    job_type=AppJobType.CR_USER_UNPROCESSED,
    job_name="cron_user_unprocessed",
    query_fn=lambda: [u for u in self.user_db.get_recent_unprocessed() if not u.processing_complete],
    process_fn=lambda user: self.user_processor.process_user(user, is_update=False)
)
return CronProcessor.process(config)
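
If the skipped count from the old loop still matters, one alternative is to keep the query unfiltered and wrap process_fn instead. This is a sketch under assumptions: the skip_completed helper, the User dataclass, and the SKIPPED member name are hypothetical. The original code only shows that results expose is_skipped.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Callable


class ResultValue(Enum):  # stand-in; member names are assumptions
    OK = "ok"
    SKIPPED = "skipped"


@dataclass
class User:  # stand-in for the project's user model
    name: str
    processing_complete: bool


def skip_completed(
    process_fn: Callable[[User], ResultValue],
) -> Callable[[User], ResultValue]:
    """Return a process_fn that reports completed users as skipped."""

    def wrapper(user: User) -> ResultValue:
        if user.processing_complete:
            return ResultValue.SKIPPED
        return process_fn(user)

    return wrapper


users = [User("ann", False), User("bob", True), User("cy", False)]

# Option A (the migration above): filter in the query; bob is never seen.
filtered = [u for u in users if not u.processing_complete]

# Option B: pass every user through; bob is reported as skipped.
guarded = skip_completed(lambda user: ResultValue.OK)
outcomes = [guarded(u) for u in users]
```

Option A keeps the configuration shortest. Option B keeps the counters comparable with the pre-refactoring logs.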

Advanced Features

Custom Error Types

config = CronConfig(
    # ...
    error_type=AppErrorType.MODERATION  # Override default
)

Reporting Integration

config = CronConfig(
    # ...
    report_fn=lambda job_type, ok_ids, error_ids: 
        self.system_report.run_report(job_type, ok_ids, error_ids)
)
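
The callback signature above suggests that the processor collects the ids of successful and failed items and hands them over once at the end. The sketch below shows that idea in isolation. The run_with_report function, the (doc_id, payload) tuples, and the boolean result are assumptions made for the example, not the project's API.

```python
from typing import Callable, Iterable, Optional, Tuple

ReportFn = Callable[[str, list, list], None]


def run_with_report(
    job_type: str,
    items: Iterable[Tuple[str, int]],
    process_fn: Callable[[int], bool],
    report_fn: Optional[ReportFn] = None,
) -> Tuple[list, list]:
    """Process (doc_id, payload) pairs, then report ids once if asked to."""
    ok_ids: list = []
    error_ids: list = []
    for doc_id, payload in items:
        if process_fn(payload):
            ok_ids.append(doc_id)
        else:
            error_ids.append(doc_id)
    # The callback is optional, matching report_fn's default of None.
    if report_fn is not None:
        report_fn(job_type, ok_ids, error_ids)
    return ok_ids, error_ids


received = []
ok, failed = run_with_report(
    "CR_DEMO",
    [("a", 1), ("b", -1), ("c", 2)],
    process_fn=lambda n: n > 0,
    report_fn=lambda job_type, ok_ids, error_ids: received.append(
        (job_type, ok_ids, error_ids)
    ),
)
silent_ok, silent_failed = run_with_report("CR_DEMO", [("d", 5)], lambda n: n > 0)
```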

AppResult Return Type

The processor handles both ResultValue and AppResult returns:

def process_item(item):
    result = AppResult("process")
    # ... complex processing with detailed error tracking
    return result  # CronProcessor handles this automatically
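
One way the processor could treat both return types uniformly is to rely on the is_error property that both expose in the original code, and to treat is_skipped as optional. The classify helper and the simplified ResultValue and AppResult below are stand-ins for illustration only.

```python
from enum import Enum
from typing import Union


class ResultValue(Enum):  # stand-in; member names are assumptions
    OK = "ok"
    SKIPPED = "skipped"
    ERROR = "error"

    @property
    def is_error(self) -> bool:
        return self is ResultValue.ERROR

    @property
    def is_skipped(self) -> bool:
        return self is ResultValue.SKIPPED


class AppResult:  # stand-in with only what the example needs
    def __init__(self, name: str) -> None:
        self.name = name
        self.errors: list = []

    def add_error(self, error_type: str, msg: str) -> None:
        self.errors.append((error_type, msg))

    @property
    def is_error(self) -> bool:
        return bool(self.errors)


def classify(result: Union[ResultValue, AppResult]) -> str:
    """Map either result type onto one of the three counters."""
    if result.is_error:
        return "failed"
    # AppResult has no is_skipped in this sketch, so default to False.
    if getattr(result, "is_skipped", False):
        return "skipped"
    return "passed"


clean = AppResult("process")
dirty = AppResult("process")
dirty.add_error("DATABASE_EX", "write failed")

labels = [classify(r) for r in (ResultValue.OK, ResultValue.SKIPPED, clean, dirty)]
```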

Testing Strategy

Core Processor Tests (test_cron_processor.py)

  • Test all success scenarios
  • Test all failure scenarios
  • Test all skip scenarios
  • Test exception handling
  • Test counter tracking
  • Test reporting integration
  • Test empty list handling
  • Test query exceptions

Admin-Specific Tests

Focus only on business logic:

from unittest.mock import Mock

def test_friend_admin_cron_invite():
    """Test that cron_invite_unprocessed uses correct query and processor."""
    admin = FriendAdmin()
    
    # Mock the database query
    mock_invites = [Mock(), Mock()]
    admin.invite_db.get_unprocessed_invites_last_week = Mock(return_value=mock_invites)
    
    # Mock the processor
    admin._invite_processor.process_invite_signup = Mock(return_value=ResultValue.OK)
    
    # Execute
    result = admin.cron_invite_unprocessed()
    
    # Verify correct wiring (no need to test error handling, counting, etc.)
    assert admin.invite_db.get_unprocessed_invites_last_week.called
    assert admin._invite_processor.process_invite_signup.call_count == 2

Future Enhancements

Potential improvements to consider:

  1. Batch Processing: Add support for processing items in batches
  2. Progress Callbacks: Add progress reporting for long-running jobs
  3. Retry Logic: Add configurable retry for transient failures
  4. Rate Limiting: Add throttling for external API calls
  5. Parallel Processing: Add optional parallel processing for independent items
  6. Dry Run Mode: Add mode to simulate processing without side effects
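
Some of these could be layered on top of process_fn without touching the processor. As an illustration of item 3, here is a rough sketch of a retry wrapper. The with_retry helper and the boolean success result are hypothetical, and a real version would likely add a delay between attempts.

```python
from typing import Callable, TypeVar

T = TypeVar("T")


def with_retry(
    process_fn: Callable[[T], bool], attempts: int = 3
) -> Callable[[T], bool]:
    """Wrap process_fn so an item is tried up to `attempts` times in total."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")

    def wrapper(item: T) -> bool:
        for _ in range(attempts):
            if process_fn(item):
                return True  # stop at the first success
        return False  # every attempt failed

    return wrapper


calls = {"n": 0}


def flaky(item: str) -> bool:
    """Toy process_fn that fails twice, then succeeds."""
    calls["n"] += 1
    return calls["n"] >= 3


first = with_retry(flaky, attempts=3)("invite-1")
never = with_retry(lambda item: False, attempts=2)("invite-2")
```

Because the wrapper has the same signature as process_fn, it could be passed to CronConfig unchanged.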

Conclusion

The CronProcessor refactoring:

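  • Reduces cron method code by ~80% (from ~320 lines to ~64)
  • Applies the same error handling, logging, and performance tracking to every cron job
  • Confines tests of error handling, counting, and logging to a single suite
  • Limits changes to the cron pattern to one class
  • Keeps flexibility for special cases through CronConfig options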

This is a significant improvement in code quality and maintainability without sacrificing any functionality.