Reliable Job Control Pattern for Long-Running Market Analysis

· 4 min read
Max Kaido
Architect

January 30, 2025

The Challenge

When dealing with long-running market analysis jobs in a distributed system, one of the key challenges is implementing reliable control mechanisms. The specific requirements we faced were:

  1. Allow users to pause/resume analysis through Telegram buttons
  2. Handle unstable connections that might trigger multiple control actions
  3. Support recovery through admin tools (BullBoard)
  4. Maintain job state consistency
  5. Avoid corrupting the analysis state

The Solution: Job Chain Pattern with Redis Control

Instead of treating our market analysis as a single long-running job, we designed a chain of smaller jobs, each representing one step in the analysis. Combined with Redis-based control keys, the design has three parts:

1. Job Structure

interface MarketRankingJob {
  type: 'INIT' | 'RANKING';
  gistId: string;
  rankingStrategy: RankingStrategy;
  config?: RankingConfig;
  action?: 'START' | 'RESUME' | 'REFINE';
  jobChainId?: string;
  sequence?: number;
}

2. Control Mechanism

We use Redis locks with TTL for control:

  • Each job chain has a unique control key
  • Pause/resume operations are atomic
  • A 24h TTL prevents permanently stuck states
  • Control state is independent of analysis state
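
The control key described above can be sketched as follows. This is a minimal illustration, not the project's code: `InMemoryStore` is a hypothetical stand-in for Redis so the example runs on its own, and the key format `ranking:control:<jobChainId>` and the function names are assumptions. Against a real Redis server the equivalent command would be `SET <key> PAUSED NX EX 86400`.

```typescript
// Hypothetical in-memory stand-in that mimics Redis `SET key value NX EX ttl`.
class InMemoryStore {
  private entries = new Map<string, { value: string; expiresAt: number }>();
  private now: () => number;

  // The clock is injectable so TTL expiry can be exercised without waiting.
  constructor(now: () => number = () => Date.now()) {
    this.now = now;
  }

  // Returns true if the key was set, false if a live key already exists (NX).
  setNx(key: string, value: string, ttlSeconds: number): boolean {
    if (this.get(key) !== null) return false;
    this.entries.set(key, { value, expiresAt: this.now() + ttlSeconds * 1000 });
    return true;
  }

  get(key: string): string | null {
    const entry = this.entries.get(key);
    if (!entry) return null;
    if (entry.expiresAt <= this.now()) {
      this.entries.delete(key); // expired: behave as if the key never existed
      return null;
    }
    return entry.value;
  }

  // Returns true only if a live (non-expired) key was removed.
  del(key: string): boolean {
    const live = this.get(key) !== null;
    this.entries.delete(key);
    return live;
  }
}

const PAUSE_TTL_SECONDS = 24 * 60 * 60; // the 24h TTL from the list above

// Assumed key format: one control key per job chain.
const controlKey = (jobChainId: string): string => `ranking:control:${jobChainId}`;

// Pausing twice is harmless: only the first call creates the key.
function pause(store: InMemoryStore, jobChainId: string): boolean {
  return store.setNx(controlKey(jobChainId), 'PAUSED', PAUSE_TTL_SECONDS);
}

// Resuming twice is harmless too: the second call finds nothing to delete.
function resume(store: InMemoryStore, jobChainId: string): boolean {
  return store.del(controlKey(jobChainId));
}

function isPaused(store: InMemoryStore, jobChainId: string): boolean {
  return store.get(controlKey(jobChainId)) !== null;
}
```

Because the key holds only the control flag, a duplicate button press from a flaky connection changes nothing, and a forgotten pause clears itself once the TTL elapses.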

3. Recovery Paths

Multiple ways to recover:

  1. Auto-recovery through job chain (each job creates next)
  2. Manual recovery via BullBoard (retry any job in chain)
  3. Force resume through admin tools
  4. TTL-based stuck state resolution

Implementation Details

Job Controller

The RankingJobController manages the control state:

  • Uses Redis SET NX for atomic operations
  • Maintains job chain continuity
  • Provides admin recovery methods

Consumer Logic

The consumer implements the checkpoint pattern:

  1. Check pause state before processing
  2. Use checkpoint data to resume work
  3. Create next job in chain
  4. Maintain sequence numbers
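
The four consumer steps above can be sketched as a single function. This is an illustration under stated assumptions: `ChainJob`, `Checkpoint`, `StepDeps`, `processStep`, and the batch size are hypothetical names and values, and the dependencies are synchronous in-memory interfaces standing in for Redis and the queue.

```typescript
interface ChainJob {
  jobChainId: string;
  sequence: number;
}

interface Checkpoint {
  nextIndex: number; // index of the first market not yet analysed
  results: string[];
}

// Hypothetical dependencies, injected so the step can be tested in isolation.
interface StepDeps {
  isPaused(jobChainId: string): boolean;
  loadCheckpoint(jobChainId: string): Checkpoint | undefined;
  saveCheckpoint(jobChainId: string, checkpoint: Checkpoint): void;
  enqueue(job: ChainJob): void;
}

type StepResult = 'PAUSED' | 'IN_PROGRESS' | 'COMPLETED';

const BATCH_SIZE = 2; // assumed amount of work per job in the chain

function processStep(job: ChainJob, markets: string[], deps: StepDeps): StepResult {
  // 1. Check pause state before doing any work.
  if (deps.isPaused(job.jobChainId)) return 'PAUSED';

  // 2. Resume from the last checkpoint, or start from the beginning.
  const checkpoint = deps.loadCheckpoint(job.jobChainId) ?? { nextIndex: 0, results: [] };
  const batch = markets.slice(checkpoint.nextIndex, checkpoint.nextIndex + BATCH_SIZE);
  const updated: Checkpoint = {
    nextIndex: checkpoint.nextIndex + batch.length,
    results: [...checkpoint.results, ...batch.map((m) => `analysed:${m}`)],
  };
  deps.saveCheckpoint(job.jobChainId, updated);

  if (updated.nextIndex >= markets.length) return 'COMPLETED';

  // 3 and 4. Create the next job in the chain with the next sequence number.
  deps.enqueue({ jobChainId: job.jobChainId, sequence: job.sequence + 1 });
  return 'IN_PROGRESS';
}
```

In this sketch a paused step returns without enqueueing a successor, so the chain stops at the checkpoint until a resume action submits a new job.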

Benefits

  1. Idempotency: Repeated control actions won't corrupt state
  2. Recoverability: Resume from any checkpoint
  3. Visibility: Clear job chain in BullBoard
  4. Safety: TTL prevents permanently stuck states
  5. Atomicity: Redis SET NX keeps control updates consistent
  6. Separation: Control state lives apart from analysis state
  7. Debuggability: Clear checkpoint data

Lessons Learned

  1. Break long-running jobs into chains
  2. Use atomic operations for control
  3. Always plan for recovery
  4. Keep control separate from business logic
  5. Make each step idempotent
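
One way to make a chain step idempotent is to derive the job ID from the chain ID and sequence number, so that submitting the same step twice has no effect. The sketch below assumes this approach; `InMemoryQueue`, `chainJobId`, and `enqueueStep` are hypothetical names, and the in-memory queue only imitates the way Bull-style queues skip an `add` whose custom `jobId` already exists.

```typescript
interface QueuedJob {
  id: string;
  jobChainId: string;
  sequence: number;
}

// Hypothetical stand-in for a queue that ignores duplicate job IDs.
class InMemoryQueue {
  private jobs = new Map<string, QueuedJob>();

  // Returns true if the job was added, false if its ID was already present.
  add(job: QueuedJob): boolean {
    if (this.jobs.has(job.id)) return false;
    this.jobs.set(job.id, job);
    return true;
  }

  get size(): number {
    return this.jobs.size;
  }
}

// Deterministic ID (assumed format): the same chain step always maps to the same ID.
const chainJobId = (jobChainId: string, sequence: number): string =>
  `${jobChainId}-${sequence}`;

function enqueueStep(queue: InMemoryQueue, jobChainId: string, sequence: number): boolean {
  return queue.add({ id: chainJobId(jobChainId, sequence), jobChainId, sequence });
}
```

With this scheme a retried job or a double-tapped resume button that tries to create step N again leaves exactly one copy of step N in the queue.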

Next Steps

  1. Implement RankingJobController
  2. Update MarketRankingConsumer with checkpoint logic
  3. Add job chain support to handlers
  4. Update Telegram button handlers
  5. Add admin recovery tools

Code Examples

See the implementation in:

  • apps/mercury-bot/src/market-ranking/ranking-job.controller.ts
  • apps/mercury-bot/src/market-ranking/market-ranking.consumer.ts
  • apps/mercury-bot/src/telegram/handlers/market/ranking.handler.ts

Implementation Status (as of 2025-01-30)

Completed ✅

  1. Basic job processing structure
  2. Redis-based state management with RedisGistService
  3. Event-based state tracking
  4. Handler selection based on strategy
  5. Basic error handling
  6. E2E tests for ranking flows

In Progress 🚧

  1. Job control features (pause/resume)
  2. Job chain tracking
  3. Proper job status reporting

Easiest to Implement Next 🎯

  1. RankingConfigFactory (Medium effort)

    • Already have most config logic, just needs centralization
    • Easy to test with existing market data
    • Would improve code organization
  2. Job Status Enhancement (Easy)

    • Add rankingStrategy to job data ✓
    • Add proper status reporting
    • Easy to test with existing infrastructure
  3. Basic Control Methods (Medium)

    • pauseAnalysis and resumeAnalysis
    • Can reuse existing event system
    • Clear test cases

Requires More Planning 🤔

  1. Refinement logic
  2. Advanced error recovery
  3. Complex job chains
  4. Advanced control features

Testing Strategy

For new features, we can follow existing testing patterns:

  1. Unit Tests

    • RankingConfigFactory methods
    • Control method logic
    • Job status transitions
  2. Integration Tests

    • Config creation with real market data
    • State transitions with Redis
    • Job control flow
  3. E2E Tests

    • Full ranking flows with control operations
    • Error recovery scenarios
    • Job chain execution

Notes by Max

The notes below break the design down and propose an architecture for the ranking initialization and control flow, one component at a time:

  1. New Service: RankingConfigFactory
// Method signatures only; bodies are omitted in these notes.
@Injectable()
class RankingConfigFactory {
  constructor(
    private readonly taService: TaService,
    private readonly configService: ConfigService,
  ) {}

  // Creates config for standard ranking
  async createRankingConfig(
    markets: string[],
    timeframe?: TimeFrame,
    indicators?: Indicator[],
    customPrompt?: string,
    rankingStrategy?: RankingStrategy,
    marketCount: number = 30,
    topK: number = 10,
  ): Promise<RankingConfig>;

  // Creates config for entry analysis
  async createEntryConfig(
    markets: string[],
    prompt: string,
  ): Promise<RankingConfig>;

  // Creates config for exit analysis
  async createExitConfig(
    markets: string[],
    prompt: string,
  ): Promise<RankingConfig>;

  // Gets available markets based on config requirements
  async getAvailableMarkets(
    indicators: Indicator[],
    timeframe: TimeFrame,
    count?: number,
  ): Promise<string[]>;
}
  2. RankingAnalysisService (extended)
@Injectable()
class RankingAnalysisService {
  // Existing methods...

  // New methods for control
  async pauseAnalysis(id: string): Promise<RankingState> {
    const mutation = {
      type: RankingMutationType.UPDATE_CONTROL,
      payload: { isRunning: false },
    };
    await this.updateAnalysis(id, mutation);
    return this.getAnalysis(id);
  }

  async resumeAnalysis(id: string): Promise<RankingState> {
    const mutation = {
      type: RankingMutationType.UPDATE_CONTROL,
      payload: { isRunning: true },
    };
    await this.updateAnalysis(id, mutation);
    // Submit ranking job
    await this.marketRankingQueue.add('market-ranking', {
      type: 'RANKING',
      gistId: id,
      action: 'RESUME',
    });
    return this.getAnalysis(id);
  }

  async refineAnalysis(id: string): Promise<RankingState> {
    const state = await this.getAnalysis(id);
    // Create refined config with higher confidence targets
    const refinedConfig = {
      ...state.config,
      targetConfidence: Math.min(state.config.targetConfidence + 0.05, 0.99),
      model: Model.GPT4, // Use more powerful model for refinement
    };

    // Submit refinement job
    await this.marketRankingQueue.add('market-ranking', {
      type: 'RANKING',
      gistId: id,
      action: 'REFINE',
      config: refinedConfig,
    });

    return this.getAnalysis(id);
  }
}
  3. MarketRankingConsumer (job structure)
interface MarketRankingJob {
  type: 'INIT' | 'RANKING';
  gistId: string;
  config?: RankingConfig; // Required for INIT
  action?: 'START' | 'RESUME' | 'REFINE'; // For RANKING type
}

class MarketRankingConsumer {
  async process(job: Job<MarketRankingJob>) {
    const { type, gistId, config, action } = job.data;

    switch (type) {
      case 'INIT': {
        // config is optional in the type, so fail fast if an INIT job lacks it
        if (!config) {
          throw new Error(`INIT job for ${gistId} is missing config`);
        }
        // Create initial state and artifacts
        await this.rankingAnalysisService.createAnalysis(gistId, config);
        // Submit ranking job
        await this.queue.add('market-ranking', {
          type: 'RANKING',
          gistId,
          action: 'START',
        });
        return { status: 'INITIALIZED', gistId };
      }

      case 'RANKING': {
        // START and RESUME jobs carry no config, so fall back to the stored state
        const { rankingStrategy } =
          config ?? (await this.rankingAnalysisService.getAnalysis(gistId)).config;
        const handler = this.getHandler(rankingStrategy);
        const isDone = await handler.execute({ gistId });
        return {
          status: isDone ? 'COMPLETED' : 'IN_PROGRESS',
          gistId,
          action,
        };
      }
    }
  }
}
  4. TelegramHandlers (simplified)
@Injectable()
class MarketRankingHandler {
  constructor(
    private readonly configFactory: RankingConfigFactory,
    private readonly rankingAnalysisService: RankingAnalysisService,
    private readonly marketRankingQueue: Queue,
  ) {}

  async handle(ctx: Context, ...configParams) {
    // Get markets and create config
    const markets = await this.configFactory.getAvailableMarkets(...);
    const config = await this.configFactory.createRankingConfig(markets, ...configParams);

    // Submit init job
    await this.marketRankingQueue.add('market-ranking', {
      type: 'INIT',
      gistId: uuidv4(),
      config,
    });
  }
}

This design:

  1. Separates config creation logic into a dedicated service
  2. Makes handlers very thin
  3. Splits initialization and ranking into separate job types
  4. Adds proper control methods to the analysis service
  5. Makes job data more informative for BullBoard
  6. Gives each component a single responsibility