Reliable Job Control Pattern for Long-Running Market Analysis
January 30, 2025
The Challenge
When dealing with long-running market analysis jobs in a distributed system, one of the key challenges is implementing reliable control mechanisms. The specific requirements we faced were:
- Allow users to pause/resume analysis through Telegram buttons
- Handle unstable connections that might trigger multiple control actions
- Support recovery through admin tools (BullBoard)
- Maintain job state consistency
- Avoid corrupting the analysis state
The Solution: Job Chain Pattern with Redis Control
Instead of treating our market analysis as a single long-running job, we designed a chain of smaller jobs, each representing a step in the analysis. This approach, combined with Redis-based control mechanisms, provides several benefits:
1. Job Structure
interface MarketRankingJob {
  type: 'INIT' | 'RANKING';
  gistId: string;
  rankingStrategy: RankingStrategy;
  config?: RankingConfig;
  action?: 'START' | 'RESUME' | 'REFINE';
  jobChainId?: string;
  sequence?: number;
}
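To illustrate how the optional jobChainId and sequence fields might tie the steps of a chain together, here is a minimal sketch. The ChainLink type and the nextInChain and chainJobId helpers are hypothetical names introduced for illustration, not part of the actual codebase. Falling back to gistId as the chain ID is also an assumption.

```typescript
// Hypothetical subset of MarketRankingJob: only the fields needed to link chain steps.
interface ChainLink {
  type: 'INIT' | 'RANKING';
  gistId: string;
  jobChainId?: string;
  sequence?: number;
}

// Build the follow-up job for the same chain. A job without chain fields
// starts a chain keyed by its gistId (an assumption made for this sketch).
function nextInChain(prev: ChainLink): ChainLink {
  return {
    type: 'RANKING',
    gistId: prev.gistId,
    jobChainId: prev.jobChainId ?? prev.gistId,
    sequence: (prev.sequence ?? 0) + 1,
  };
}

// Deterministic ID: building the same step twice yields the same ID,
// so a queue that ignores duplicate IDs keeps only one copy of the step.
function chainJobId(job: ChainLink): string {
  return `${job.jobChainId ?? job.gistId}-${job.sequence ?? 0}`;
}
```

If the queue is Bull or BullMQ, the ID could be passed as the jobId option when adding the job, since both skip a job whose custom ID already exists. The separator is a dash rather than a colon because, as far as we know, newer BullMQ versions reject colons in custom IDs.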
2. Control Mechanism
We use Redis locks with TTL for control:
- Each job chain has a unique control key
- Pause/resume operations are atomic
- A 24h TTL prevents permanently stuck states
- Control state is independent of analysis state
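The control rules above can be sketched as follows. This is an illustration under stated assumptions, not the actual RankingJobController. ControlStore is a hypothetical, narrow view of a Redis client, and InMemoryControlStore stands in for Redis so the example runs on its own. The key layout and the pauseChain, resumeChain and isPaused helpers are likewise invented for this sketch. With a real client, setIfAbsent would wrap the Redis command SET key value NX EX seconds.

```typescript
// Hypothetical, narrow view of a Redis client used for control flags.
interface ControlStore {
  // Mirrors `SET key value NX EX ttl`: resolves true only if the key was absent.
  setIfAbsent(key: string, value: string, ttlSeconds: number): Promise<boolean>;
  // Mirrors DEL: resolves to the number of keys removed.
  del(key: string): Promise<number>;
  exists(key: string): Promise<boolean>;
}

// In-memory stand-in so the sketch runs without a Redis server.
class InMemoryControlStore implements ControlStore {
  private readonly entries = new Map<string, number>(); // key -> expiry time in ms
  private readonly now: () => number;

  constructor(now: () => number = () => Date.now()) {
    this.now = now;
  }

  // Drop the key if its TTL has elapsed, then report whether it is still set.
  private isLive(key: string): boolean {
    const expiresAt = this.entries.get(key);
    if (expiresAt !== undefined && expiresAt <= this.now()) {
      this.entries.delete(key);
      return false;
    }
    return expiresAt !== undefined;
  }

  async setIfAbsent(key: string, _value: string, ttlSeconds: number): Promise<boolean> {
    if (this.isLive(key)) return false;
    this.entries.set(key, this.now() + ttlSeconds * 1000);
    return true;
  }

  async del(key: string): Promise<number> {
    return this.isLive(key) && this.entries.delete(key) ? 1 : 0;
  }

  async exists(key: string): Promise<boolean> {
    return this.isLive(key);
  }
}

const PAUSE_TTL_SECONDS = 24 * 60 * 60; // the 24h TTL from the list above

// Hypothetical key layout: one control key per job chain.
const pauseKey = (jobChainId: string): string => `ranking:control:${jobChainId}:paused`;

// Resolves true only for the call that actually set the flag; repeated taps are no-ops.
async function pauseChain(store: ControlStore, jobChainId: string): Promise<boolean> {
  return store.setIfAbsent(pauseKey(jobChainId), 'paused', PAUSE_TTL_SECONDS);
}

// Resolves true only if a pause flag existed and was removed.
async function resumeChain(store: ControlStore, jobChainId: string): Promise<boolean> {
  return (await store.del(pauseKey(jobChainId))) === 1;
}

async function isPaused(store: ControlStore, jobChainId: string): Promise<boolean> {
  return store.exists(pauseKey(jobChainId));
}
```

Because each operation is a single atomic command, a double-tapped pause button results in one state change and one ignored repeat, and a forgotten pause flag disappears on its own once the TTL elapses.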
3. Recovery Paths
There are multiple ways to recover:
- Auto-recovery through the job chain (each job creates the next)
- Manual recovery via BullBoard (retry any job in chain)
- Force resume through admin tools
- TTL-based stuck state resolution
Implementation Details
Job Controller
The RankingJobController manages the control state:
- Uses Redis SET NX for atomic operations
- Maintains job chain continuity
- Provides admin recovery methods
Consumer Logic
The consumer implements the checkpoint pattern:
- Check pause state before processing
- Use checkpoint data to resume work
- Create next job in chain
- Maintain sequence numbers
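The four consumer steps above can be sketched as a single pure function. This is a simplified illustration, not the real consumer. The Checkpoint shape (a list of pending and finished markets), the StepJob and StepResult types, and the batch size are all assumptions made for the example.

```typescript
// Hypothetical checkpoint: which markets are still pending and which are done.
interface Checkpoint {
  pending: string[];
  done: string[];
}

// Hypothetical chain position carried by each job.
interface StepJob {
  jobChainId: string;
  sequence: number;
}

type StepResult =
  | { status: 'PAUSED'; checkpoint: Checkpoint }
  | { status: 'IN_PROGRESS'; checkpoint: Checkpoint; next: StepJob }
  | { status: 'COMPLETED'; checkpoint: Checkpoint };

function runStep(
  job: StepJob,
  checkpoint: Checkpoint,
  paused: boolean,
  batchSize: number = 2,
): StepResult {
  // 1. Check pause state before processing; the checkpoint is left untouched.
  if (paused) {
    return { status: 'PAUSED', checkpoint };
  }

  // 2. Resume from the checkpoint: take the next batch of pending markets.
  const batch = checkpoint.pending.slice(0, batchSize);
  const updated: Checkpoint = {
    pending: checkpoint.pending.slice(batchSize),
    done: [...checkpoint.done, ...batch],
  };

  if (updated.pending.length === 0) {
    return { status: 'COMPLETED', checkpoint: updated };
  }

  // 3 and 4. Describe the next job in the chain, with the sequence number advanced.
  return {
    status: 'IN_PROGRESS',
    checkpoint: updated,
    next: { jobChainId: job.jobChainId, sequence: job.sequence + 1 },
  };
}
```

Since the function only reads the checkpoint it is given and returns a new one, retrying a failed step from BullBoard replays the same batch rather than skipping or duplicating work, provided the checkpoint is persisted only after the step succeeds.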
Benefits
- Idempotency: Multiple control actions won't corrupt state
- Recoverability: Resume from any checkpoint
- Visibility: Clear job chain in BullBoard
- Safety: TTL prevents stuck states
- Atomicity: Redis ensures consistent control
- Separation: Control state is kept apart from analysis state
- Debuggability: Clear checkpoint data
Lessons Learned
- Break long-running jobs into chains
- Use atomic operations for control
- Always plan for recovery
- Keep control separate from business logic
- Make each step idempotent
Next Steps
- Implement RankingJobController
- Update MarketRankingConsumer with checkpoint logic
- Add job chain support to handlers
- Update Telegram button handlers
- Add admin recovery tools
Code Examples
See the implementation in:
- apps/mercury-bot/src/market-ranking/ranking-job.controller.ts
- apps/mercury-bot/src/market-ranking/market-ranking.consumer.ts
- apps/mercury-bot/src/telegram/handlers/market/ranking.handler.ts
Implementation Status (as of 2025-01-30)
Completed ✅
- Basic job processing structure
- Redis-based state management with RedisGistService
- Event-based state tracking
- Handler selection based on strategy
- Basic error handling
- E2E tests for ranking flows
In Progress 🚧
- Job control features (pause/resume)
- Job chain tracking
- Proper job status reporting
Easiest to Implement Next 🎯
- RankingConfigFactory (Medium effort)
  - Already have most config logic, just needs centralization
  - Easy to test with existing market data
  - Would improve code organization
- Job Status Enhancement (Easy)
  - Add rankingStrategy to job data ✓
  - Add proper status reporting
  - Easy to test with existing infrastructure
- Basic Control Methods (Medium)
  - pauseAnalysis and resumeAnalysis
  - Can reuse existing event system
  - Clear test cases
Requires More Planning 🤔
- Refinement logic
- Advanced error recovery
- Complex job chains
- Advanced control features
Testing Strategy
For new features, we can follow existing testing patterns:
- Unit Tests
  - RankingConfigFactory methods
  - Control method logic
  - Job status transitions
- Integration Tests
  - Config creation with real market data
  - State transitions with Redis
  - Job control flow
- E2E Tests
  - Full ranking flows with control operations
  - Error recovery scenarios
  - Job chain execution
notes by Max
This section breaks the work down and proposes a clean architecture for the ranking initialization and control flow.
Each component's responsibilities are as follows:
- New Service: RankingConfigFactory
// Design sketch: method signatures only, implementations omitted
@Injectable()
class RankingConfigFactory {
  constructor(
    private readonly taService: TaService,
    private readonly configService: ConfigService,
  ) {}

  // Creates config for standard ranking
  async createRankingConfig(
    markets: string[],
    timeframe?: TimeFrame,
    indicators?: Indicator[],
    customPrompt?: string,
    rankingStrategy?: RankingStrategy,
    marketCount: number = 30,
    topK: number = 10,
  ): Promise<RankingConfig>;

  // Creates config for entry analysis
  async createEntryConfig(
    markets: string[],
    prompt: string,
  ): Promise<RankingConfig>;

  // Creates config for exit analysis
  async createExitConfig(
    markets: string[],
    prompt: string,
  ): Promise<RankingConfig>;

  // Gets available markets based on config requirements
  async getAvailableMarkets(
    indicators: Indicator[],
    timeframe: TimeFrame,
    count?: number,
  ): Promise<string[]>;
}
- RankingAnalysisService (extended)
@Injectable()
class RankingAnalysisService {
  // Existing methods...

  // New methods for control
  async pauseAnalysis(id: string): Promise<RankingState> {
    const mutation = {
      type: RankingMutationType.UPDATE_CONTROL,
      payload: { isRunning: false },
    };
    await this.updateAnalysis(id, mutation);
    return this.getAnalysis(id);
  }

  async resumeAnalysis(id: string): Promise<RankingState> {
    const mutation = {
      type: RankingMutationType.UPDATE_CONTROL,
      payload: { isRunning: true },
    };
    await this.updateAnalysis(id, mutation);
    // Submit ranking job
    await this.marketRankingQueue.add('market-ranking', {
      type: 'RANKING',
      gistId: id,
      action: 'RESUME',
    });
    return this.getAnalysis(id);
  }

  async refineAnalysis(id: string): Promise<RankingState> {
    const state = await this.getAnalysis(id);
    // Create refined config with higher confidence targets
    const refinedConfig = {
      ...state.config,
      targetConfidence: Math.min(state.config.targetConfidence + 0.05, 0.99),
      model: Model.GPT4, // Use more powerful model for refinement
    };
    // Submit refinement job
    await this.marketRankingQueue.add('market-ranking', {
      type: 'RANKING',
      gistId: id,
      action: 'REFINE',
      config: refinedConfig,
    });
    return this.getAnalysis(id);
  }
}
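One thing the control methods need to account for is a repeated button tap: a resume that arrives while the analysis is already running should not enqueue a second ranking job. A minimal sketch of that decision follows. The ControlState, ControlAction and ControlPlan types and the planControl helper are hypothetical names for illustration, and the sketch assumes the state is read and written under an atomic guard such as the Redis control key.

```typescript
// Hypothetical slice of the ranking state that control actions touch.
interface ControlState {
  isRunning: boolean;
}

type ControlAction = 'PAUSE' | 'RESUME';

interface ControlPlan {
  nextState: ControlState;
  changed: boolean; // false when the action repeats the current state
  enqueueResumeJob: boolean; // true only for a real paused -> running transition
}

// Decide what a control action should do given the current state.
// Repeated actions are treated as no-ops, which keeps pause and resume idempotent.
function planControl(current: ControlState, action: ControlAction): ControlPlan {
  const wantRunning = action === 'RESUME';
  if (current.isRunning === wantRunning) {
    return { nextState: current, changed: false, enqueueResumeJob: false };
  }
  return {
    nextState: { isRunning: wantRunning },
    changed: true,
    enqueueResumeJob: wantRunning,
  };
}
```

With a check like this, resumeAnalysis would add a job to the queue only when enqueueResumeJob is true, so two taps on the resume button produce one job.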
- MarketRankingConsumer (job structure)
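interface MarketRankingJob {
  type: 'INIT' | 'RANKING';
  gistId: string;
  rankingStrategy?: RankingStrategy; // Carried on the job so RANKING steps can select a handler
  config?: RankingConfig; // Required for INIT
  action?: 'START' | 'RESUME' | 'REFINE'; // For RANKING type
}

class MarketRankingConsumer {
  async process(job: Job<MarketRankingJob>) {
    const { type, gistId, config, action } = job.data;
    // RANKING jobs usually carry no config, so prefer the strategy stored on the job itself
    const rankingStrategy = job.data.rankingStrategy ?? config?.rankingStrategy;

    switch (type) {
      case 'INIT': {
        if (!config) {
          throw new Error(`INIT job for ${gistId} is missing its config`);
        }
        // Create initial state and artifacts
        await this.rankingAnalysisService.createAnalysis(gistId, config);
        // Submit ranking job
        await this.queue.add('market-ranking', {
          type: 'RANKING',
          gistId,
          rankingStrategy,
          action: 'START',
        });
        return { status: 'INITIALIZED', gistId };
      }
      case 'RANKING': {
        // Braces scope `handler` and `isDone` to this case
        const handler = this.getHandler(rankingStrategy);
        const isDone = await handler.execute({ gistId });
        return {
          status: isDone ? 'COMPLETED' : 'IN_PROGRESS',
          gistId,
          action,
        };
      }
      default:
        throw new Error(`Unknown job type: ${type}`);
    }
  }
}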
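The consumer relies on getHandler to pick a handler for the job's ranking strategy. One way this lookup could work is a small registry with an explicit fallback, sketched below. The RankingHandler interface is inferred from the execute call in the consumer. The HandlerRegistry class and the strategy names in the usage lines are hypothetical.

```typescript
// Handler shape inferred from the consumer: execute resolves true when ranking is done.
interface RankingHandler {
  execute(input: { gistId: string }): Promise<boolean>;
}

// Hypothetical registry mapping strategy names to handlers.
class HandlerRegistry {
  private readonly handlers = new Map<string, RankingHandler>();
  private readonly fallback: RankingHandler;

  constructor(fallback: RankingHandler) {
    this.fallback = fallback;
  }

  register(strategy: string, handler: RankingHandler): this {
    this.handlers.set(strategy, handler);
    return this;
  }

  // A missing strategy uses the fallback; an unknown one fails loudly so that
  // a typo in job data shows up as a failed job instead of a silent default.
  getHandler(strategy?: string): RankingHandler {
    if (strategy === undefined) {
      return this.fallback;
    }
    const handler = this.handlers.get(strategy);
    if (!handler) {
      throw new Error(`No ranking handler registered for strategy "${strategy}"`);
    }
    return handler;
  }
}

// Usage with placeholder handlers and made-up strategy names.
const defaultHandler: RankingHandler = { execute: async () => true };
const momentumHandler: RankingHandler = { execute: async () => false };
const registry = new HandlerRegistry(defaultHandler).register('momentum', momentumHandler);
```

Failing on an unknown strategy makes the problem visible in BullBoard as a failed job, where it can be retried once the job data or the registry is fixed.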
- TelegramHandlers (simplified)
@Injectable()
class MarketRankingHandler {
  constructor(
    private readonly configFactory: RankingConfigFactory,
    private readonly rankingAnalysisService: RankingAnalysisService,
    private readonly marketRankingQueue: Queue
  ) {}

  async handle(ctx: Context, ...configParams) {
    // Get markets and create config
    const markets = await this.configFactory.getAvailableMarkets(...);
    const config = await this.configFactory.createRankingConfig(markets, ...configParams);
    // Submit init job
    await this.marketRankingQueue.add('market-ranking', {
      type: 'INIT',
      gistId: uuidv4(),
      config
    });
  }
}
This design:
- Separates config creation logic into a dedicated service
- Makes handlers very thin
- Splits initialization and ranking into separate job types
- Adds proper control methods to the analysis service
- Makes job data more informative for BullBoard
- Keeps a single responsibility for each component
