Problem
A single polling message triggered sequential processing of all entities on one pod. A slow provider (5s) therefore blocked every fast provider (0.1s) queued behind it, while pods 2 and 3 sat idle.
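As a worked example, suppose the pod handles provider A first and then B, C and D in turn, as in the Before diagram. Each finish time is then the sum of the work ahead of it:

$$
t_A = 5\ \text{s},\qquad t_B = 5 + 0.1 = 5.1\ \text{s},\qquad t_C = 5.2\ \text{s},\qquad t_D = 5.3\ \text{s}
$$

Each fast provider needs only 0.1 s of work but finishes more than 5 s after the cycle starts.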
Fix: publish one message per entity per task type
```mermaid
flowchart TD
    subgraph before ["Before: 1 aggregate message"]
        M1[1 message] --> P1[Pod 1]
        P1 --> A1[Provider A 5s]
        A1 --> B1[Provider B 0.1s]
        B1 --> C1[Provider C 0.1s]
        C1 --> D1[Provider D 0.1s]
    end
    subgraph after ["After: 1 message per entity per task type"]
        MA[msg A] --> PA["Pod 1<br/>Provider A 5s"]
        MB[msg B] --> PB["Pod 2<br/>Provider B 0.1s<br/>then Provider D 0.1s"]
        MC[msg C] --> PC["Pod 3<br/>Provider C 0.1s"]
        MD[msg D] --> PB
    end
```
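Let RabbitMQ’s competing consumers on a single queue do the load balancing. Each message goes to exactly one pod, so a slow provider only occupies the pod that is handling it. This relies on a consumer prefetch limit (basic.qos). Without one, RabbitMQ dispatches messages round-robin and can park fast messages behind a slow one on a busy pod.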
Scheduler architecture
polling_scheduler maintains in-memory entity lists per module type, sourced from the DB and refreshed every 5 minutes:
```mermaid
flowchart TD
    start["Start()"]
    start --> refresh1["refreshEntities()<br/>DB → group by PollingEnabledModules"]
    start --> initial["triggerInitialPolling()<br/>publish all task types immediately<br/>(no waiting for first tick)"]
    refresh1 --> cache["locationEntities<br/>sessionEntities<br/>tariffEntities<br/>cdrEntities"]
    ticker5["5-min ticker"] --> refresh2["refreshEntities() → update cache"]
    tickerN["interval ticker<br/>(per task type)"] --> publish["publishPollingMessages(entities, taskType)"]
    publish --> setnx["ShouldPublish?<br/>Redis SETNX per entity per task type per window"]
    setnx -->|yes| msg["publish GOCPITaskMsg<br/>{TaskType, EntityParty, LastExecutedAt}"]
    setnx -->|no| skip[skip]
```
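The grouping and fan-out steps in the diagram can be sketched in Go as follows. This is a hedged illustration, not the real polling_scheduler. Several details are assumptions:

- The EntityParty fields and the field types of GOCPITaskMsg.
- Using the module name directly as the task type.
- The helper shapes: groupByModule is hypothetical, and this publishPollingMessages signature, with its shouldPublish and publish callbacks, is invented for the sketch.

```go
package main

import (
	"fmt"
	"strings"
)

// EntityParty is a hypothetical, trimmed-down entity row; the real type has more fields.
type EntityParty struct {
	Code                  string
	PollingEnabledModules string // comma-separated, e.g. "LOCATIONS,TARIFFS"
}

// GOCPITaskMsg mirrors the fields named in the diagram; the field types are assumed.
type GOCPITaskMsg struct {
	TaskType       string
	EntityParty    EntityParty // full entity, so the consumer never queries the DB
	LastExecutedAt int64       // assumed Unix seconds; left zero in this sketch
}

// groupByModule sketches the grouping step of refreshEntities(): one list per module.
// An entity that enables several modules is added to several lists.
func groupByModule(entities []EntityParty) map[string][]EntityParty {
	lists := make(map[string][]EntityParty)
	for _, e := range entities {
		for _, m := range strings.Split(e.PollingEnabledModules, ",") {
			m = strings.TrimSpace(m)
			if m == "" {
				continue
			}
			lists[m] = append(lists[m], e)
		}
	}
	return lists
}

// publishPollingMessages sketches the fan-out: one message per entity for a task type.
// shouldPublish stands in for the Redis SETNX check, publish for the RabbitMQ publish.
func publishPollingMessages(
	entities []EntityParty,
	taskType string,
	shouldPublish func(entityCode, taskType string) bool,
	publish func(GOCPITaskMsg),
) int {
	sent := 0
	for _, e := range entities {
		if !shouldPublish(e.Code, taskType) {
			continue // already published for this window (e.g. by another pod)
		}
		publish(GOCPITaskMsg{TaskType: taskType, EntityParty: e})
		sent++
	}
	return sent
}

func main() {
	entities := []EntityParty{
		{Code: "CPO1", PollingEnabledModules: "LOCATIONS,TARIFFS"},
		{Code: "CPO2", PollingEnabledModules: "LOCATIONS"},
	}
	lists := groupByModule(entities)

	var published []GOCPITaskMsg
	always := func(string, string) bool { return true }
	n := publishPollingMessages(lists["LOCATIONS"], "LOCATIONS", always,
		func(m GOCPITaskMsg) { published = append(published, m) })

	fmt.Println(len(lists["LOCATIONS"]), len(lists["TARIFFS"]), n, len(published)) // 2 1 2 2
}
```

CPO1 appears in both the LOCATIONS and TARIFFS lists, so each module's ticker publishes its own message for it independently.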
Key design decisions
- GOCPITaskMsg carries the full EntityParty: the consumer processes exactly what’s in the message and never queries the DB for entities.
- Entity list cached in the scheduler: avoids a DB query on every tick. Refreshed every 5 minutes from rows with is_polling_enabled=true.
- One entity can appear in multiple lists: an entity with PollingEnabledModules = "LOCATIONS,TARIFFS" is polled independently for each module type.
- Redis key scoped per entity per task type: gocpi:publish:{entityCode}:{taskType}:{alignedTimestamp}, so each entity × task type combination gets its own idempotency window.
- Session polling gated on ENABLE_SESSIONS_POLLING: this environment flag controls whether session entities are included in the polling cycle.
Why not separate queues per entity per task type?
- Adding a new entity would require a new queue — an infrastructure change instead of a DB config change
- Competing consumers on a single queue achieve the same parallelism naturally
See also
- setnx-dedup-multi-pod — prevents duplicate messages from multiple pods
- panic-recovery-dlq — handling panics in the consumer that processes these messages