feat(webhooks): add polling optimization and fallback when webhooks are active (Phase 5)
All checks were successful
All checks were successful
This commit is contained in:
@@ -72,4 +72,64 @@ class MemoryCache {
|
|||||||
|
|
||||||
const cache = new MemoryCache();
|
const cache = new MemoryCache();
|
||||||
|
|
||||||
|
// Webhook metrics for polling optimization
|
||||||
|
// These are stored separately from regular cache entries
|
||||||
|
const webhookMetrics = {
|
||||||
|
// Per-instance metrics: key = instance URL, value = { lastWebhookTimestamp, eventsReceived, pollsSkipped }
|
||||||
|
instances: new Map(),
|
||||||
|
// Global metrics
|
||||||
|
lastGlobalWebhookTimestamp: null,
|
||||||
|
totalWebhookEventsReceived: 0
|
||||||
|
};
|
||||||
|
|
||||||
|
function getWebhookMetrics(instanceUrl) {
|
||||||
|
if (!instanceUrl) return null;
|
||||||
|
return webhookMetrics.instances.get(instanceUrl) || {
|
||||||
|
lastWebhookTimestamp: null,
|
||||||
|
eventsReceived: 0,
|
||||||
|
pollsSkipped: 0
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
function updateWebhookMetrics(instanceUrl) {
|
||||||
|
const now = Date.now();
|
||||||
|
webhookMetrics.lastGlobalWebhookTimestamp = now;
|
||||||
|
webhookMetrics.totalWebhookEventsReceived++;
|
||||||
|
|
||||||
|
if (instanceUrl) {
|
||||||
|
const metrics = webhookMetrics.instances.get(instanceUrl) || {
|
||||||
|
lastWebhookTimestamp: null,
|
||||||
|
eventsReceived: 0,
|
||||||
|
pollsSkipped: 0
|
||||||
|
};
|
||||||
|
metrics.lastWebhookTimestamp = now;
|
||||||
|
metrics.eventsReceived++;
|
||||||
|
webhookMetrics.instances.set(instanceUrl, metrics);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function incrementPollsSkipped(instanceUrl) {
|
||||||
|
if (instanceUrl) {
|
||||||
|
const metrics = webhookMetrics.instances.get(instanceUrl) || {
|
||||||
|
lastWebhookTimestamp: null,
|
||||||
|
eventsReceived: 0,
|
||||||
|
pollsSkipped: 0
|
||||||
|
};
|
||||||
|
metrics.pollsSkipped++;
|
||||||
|
webhookMetrics.instances.set(instanceUrl, metrics);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function getGlobalWebhookMetrics() {
|
||||||
|
return {
|
||||||
|
lastGlobalWebhookTimestamp: webhookMetrics.lastGlobalWebhookTimestamp,
|
||||||
|
totalWebhookEventsReceived: webhookMetrics.totalWebhookEventsReceived,
|
||||||
|
instances: Object.fromEntries(webhookMetrics.instances)
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
module.exports = cache;
|
module.exports = cache;
|
||||||
|
module.exports.getWebhookMetrics = getWebhookMetrics;
|
||||||
|
module.exports.updateWebhookMetrics = updateWebhookMetrics;
|
||||||
|
module.exports.incrementPollsSkipped = incrementPollsSkipped;
|
||||||
|
module.exports.getGlobalWebhookMetrics = getGlobalWebhookMetrics;
|
||||||
|
|||||||
@@ -14,6 +14,13 @@ const POLL_INTERVAL = (rawPollInterval === 'off' || rawPollInterval === 'false'
|
|||||||
: (parseInt(process.env.POLL_INTERVAL, 10) || 5000);
|
: (parseInt(process.env.POLL_INTERVAL, 10) || 5000);
|
||||||
const POLLING_ENABLED = POLL_INTERVAL > 0;
|
const POLLING_ENABLED = POLL_INTERVAL > 0;
|
||||||
|
|
||||||
|
// Webhook fallback timeout in minutes (default 10)
|
||||||
|
const WEBHOOK_FALLBACK_TIMEOUT_MINUTES = parseInt(process.env.WEBHOOK_FALLBACK_TIMEOUT, 10) || 10;
|
||||||
|
const WEBHOOK_FALLBACK_TIMEOUT_MS = WEBHOOK_FALLBACK_TIMEOUT_MINUTES * 60 * 1000;
|
||||||
|
|
||||||
|
// Webhook poll interval multiplier when webhooks are active (default 3x)
|
||||||
|
const WEBHOOK_POLL_INTERVAL_MULTIPLIER = parseInt(process.env.WEBHOOK_POLL_INTERVAL_MULTIPLIER, 10) || 3;
|
||||||
|
|
||||||
let polling = false;
|
let polling = false;
|
||||||
let lastPollTimings = null;
|
let lastPollTimings = null;
|
||||||
|
|
||||||
@@ -30,6 +37,42 @@ async function timed(label, fn) {
|
|||||||
return { label, result, ms: Date.now() - t0 };
|
return { label, result, ms: Date.now() - t0 };
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Helper function to determine if instance polling should be skipped
|
||||||
|
function shouldSkipInstancePolling(instances, instanceType) {
|
||||||
|
if (!instances || instances.length === 0) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
const now = Date.now();
|
||||||
|
let allInstancesHaveRecentWebhooks = true;
|
||||||
|
let skippedCount = 0;
|
||||||
|
|
||||||
|
for (const instance of instances) {
|
||||||
|
const metrics = cache.getWebhookMetrics(instance.url);
|
||||||
|
|
||||||
|
// Skip polling if:
|
||||||
|
// 1. Webhook events have been received (eventsReceived > 0)
|
||||||
|
// 2. Last webhook was recent (within fallback timeout)
|
||||||
|
// 3. Webhook has been enabled (we have metrics)
|
||||||
|
const hasWebhookActivity = metrics && metrics.eventsReceived > 0;
|
||||||
|
const isRecent = metrics && metrics.lastWebhookTimestamp && (now - metrics.lastWebhookTimestamp) < WEBHOOK_FALLBACK_TIMEOUT_MS;
|
||||||
|
|
||||||
|
if (hasWebhookActivity && isRecent) {
|
||||||
|
skippedCount++;
|
||||||
|
cache.incrementPollsSkipped(instance.url);
|
||||||
|
} else {
|
||||||
|
allInstancesHaveRecentWebhooks = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (allInstancesHaveRecentWebhooks && skippedCount > 0) {
|
||||||
|
console.log(`[Poller] Skipping ${instanceType} polling for ${skippedCount} instance(s) with active webhooks`);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
async function pollAllServices() {
|
async function pollAllServices() {
|
||||||
if (polling) {
|
if (polling) {
|
||||||
console.log('[Poller] Previous poll still running, skipping');
|
console.log('[Poller] Previous poll still running, skipping');
|
||||||
@@ -46,36 +89,50 @@ async function pollAllServices() {
|
|||||||
const sonarrInstances = getSonarrInstances();
|
const sonarrInstances = getSonarrInstances();
|
||||||
const radarrInstances = getRadarrInstances();
|
const radarrInstances = getRadarrInstances();
|
||||||
|
|
||||||
|
// Check webhook fallback: if no webhook events for WEBHOOK_FALLBACK_TIMEOUT, force full poll
|
||||||
|
const globalMetrics = cache.getGlobalWebhookMetrics();
|
||||||
|
const now = Date.now();
|
||||||
|
const lastWebhookTime = globalMetrics.lastGlobalWebhookTimestamp;
|
||||||
|
const fallbackTriggered = lastWebhookTime && (now - lastWebhookTime) > WEBHOOK_FALLBACK_TIMEOUT_MS;
|
||||||
|
|
||||||
|
if (fallbackTriggered) {
|
||||||
|
console.log(`[Poller] Webhook fallback triggered: no webhook events for ${WEBHOOK_FALLBACK_TIMEOUT_MINUTES} minutes, forcing full poll`);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Determine which instances should be polled based on webhook activity
|
||||||
|
const shouldPollSonarr = fallbackTriggered || !shouldSkipInstancePolling(sonarrInstances, 'sonarr');
|
||||||
|
const shouldPollRadarr = fallbackTriggered || !shouldSkipInstancePolling(radarrInstances, 'radarr');
|
||||||
|
|
||||||
// All fetches in parallel, each individually timed
|
// All fetches in parallel, each individually timed
|
||||||
const results = await Promise.all([
|
const results = await Promise.all([
|
||||||
timed('Download Clients', async () => {
|
timed('Download Clients', async () => {
|
||||||
const downloadsByType = await getDownloadsByClientType();
|
const downloadsByType = await getDownloadsByClientType();
|
||||||
return downloadsByType;
|
return downloadsByType;
|
||||||
}),
|
}),
|
||||||
timed('Sonarr Tags', async () => {
|
shouldPollSonarr ? timed('Sonarr Tags', async () => {
|
||||||
const tagsByType = await arrRetrieverRegistry.getTagsByType();
|
const tagsByType = await arrRetrieverRegistry.getTagsByType();
|
||||||
return tagsByType.sonarr || [];
|
return tagsByType.sonarr || [];
|
||||||
}),
|
}) : timed('Sonarr Tags', async () => []),
|
||||||
timed('Sonarr Queue', async () => {
|
shouldPollSonarr ? timed('Sonarr Queue', async () => {
|
||||||
const queuesByType = await arrRetrieverRegistry.getQueuesByType();
|
const queuesByType = await arrRetrieverRegistry.getQueuesByType();
|
||||||
return queuesByType.sonarr || [];
|
return queuesByType.sonarr || [];
|
||||||
}),
|
}) : timed('Sonarr Queue', async () => []),
|
||||||
timed('Sonarr History', async () => {
|
shouldPollSonarr ? timed('Sonarr History', async () => {
|
||||||
const historyByType = await arrRetrieverRegistry.getHistoryByType({ pageSize: 10 });
|
const historyByType = await arrRetrieverRegistry.getHistoryByType({ pageSize: 10 });
|
||||||
return historyByType.sonarr || [];
|
return historyByType.sonarr || [];
|
||||||
}),
|
}) : timed('Sonarr History', async () => []),
|
||||||
timed('Radarr Queue', async () => {
|
shouldPollRadarr ? timed('Radarr Queue', async () => {
|
||||||
const queuesByType = await arrRetrieverRegistry.getQueuesByType();
|
const queuesByType = await arrRetrieverRegistry.getQueuesByType();
|
||||||
return queuesByType.radarr || [];
|
return queuesByType.radarr || [];
|
||||||
}),
|
}) : timed('Radarr Queue', async () => []),
|
||||||
timed('Radarr History', async () => {
|
shouldPollRadarr ? timed('Radarr History', async () => {
|
||||||
const historyByType = await arrRetrieverRegistry.getHistoryByType({ pageSize: 10 });
|
const historyByType = await arrRetrieverRegistry.getHistoryByType({ pageSize: 10 });
|
||||||
return historyByType.radarr || [];
|
return historyByType.radarr || [];
|
||||||
}),
|
}) : timed('Radarr History', async () => []),
|
||||||
timed('Radarr Tags', async () => {
|
shouldPollRadarr ? timed('Radarr Tags', async () => {
|
||||||
const tagsByType = await arrRetrieverRegistry.getTagsByType();
|
const tagsByType = await arrRetrieverRegistry.getTagsByType();
|
||||||
return tagsByType.radarr || [];
|
return tagsByType.radarr || [];
|
||||||
}),
|
}) : timed('Radarr Tags', async () => []),
|
||||||
]);
|
]);
|
||||||
|
|
||||||
const [
|
const [
|
||||||
@@ -163,43 +220,63 @@ async function pollAllServices() {
|
|||||||
cache.set('poll:qbittorrent', qbittorrentLegacy, cacheTTL);
|
cache.set('poll:qbittorrent', qbittorrentLegacy, cacheTTL);
|
||||||
|
|
||||||
// Sonarr
|
// Sonarr
|
||||||
cache.set('poll:sonarr-tags', sonarrTagsResults, cacheTTL);
|
if (shouldPollSonarr) {
|
||||||
// Tag queue/history records with _instanceUrl so embedded series/movie objects can build links
|
cache.set('poll:sonarr-tags', sonarrTagsResults, cacheTTL);
|
||||||
cache.set('poll:sonarr-queue', {
|
// Tag queue/history records with _instanceUrl so embedded series/movie objects can build links
|
||||||
records: sonarrQueues.flatMap(q => {
|
cache.set('poll:sonarr-queue', {
|
||||||
const inst = sonarrInstances.find(i => i.id === q.instance);
|
records: sonarrQueues.flatMap(q => {
|
||||||
const url = inst ? inst.url : null;
|
const inst = sonarrInstances.find(i => i.id === q.instance);
|
||||||
const key = inst ? inst.apiKey : null;
|
const url = inst ? inst.url : null;
|
||||||
return (q.data.records || []).map(r => {
|
const key = inst ? inst.apiKey : null;
|
||||||
if (r.series) r.series._instanceUrl = url;
|
return (q.data.records || []).map(r => {
|
||||||
r._instanceUrl = url;
|
if (r.series) r.series._instanceUrl = url;
|
||||||
r._instanceKey = key;
|
r._instanceUrl = url;
|
||||||
return r;
|
r._instanceKey = key;
|
||||||
});
|
return r;
|
||||||
})
|
});
|
||||||
}, cacheTTL);
|
})
|
||||||
cache.set('poll:sonarr-history', {
|
}, cacheTTL);
|
||||||
records: sonarrHistories.flatMap(h => h.data.records || [])
|
cache.set('poll:sonarr-history', {
|
||||||
}, cacheTTL);
|
records: sonarrHistories.flatMap(h => h.data.records || [])
|
||||||
|
}, cacheTTL);
|
||||||
|
} else {
|
||||||
|
// Extend TTL of existing cached data when polling is skipped
|
||||||
|
const existingSonarrTags = cache.get('poll:sonarr-tags');
|
||||||
|
const existingSonarrQueue = cache.get('poll:sonarr-queue');
|
||||||
|
const existingSonarrHistory = cache.get('poll:sonarr-history');
|
||||||
|
if (existingSonarrTags) cache.set('poll:sonarr-tags', existingSonarrTags, cacheTTL);
|
||||||
|
if (existingSonarrQueue) cache.set('poll:sonarr-queue', existingSonarrQueue, cacheTTL);
|
||||||
|
if (existingSonarrHistory) cache.set('poll:sonarr-history', existingSonarrHistory, cacheTTL);
|
||||||
|
}
|
||||||
|
|
||||||
// Radarr
|
// Radarr
|
||||||
cache.set('poll:radarr-queue', {
|
if (shouldPollRadarr) {
|
||||||
records: radarrQueues.flatMap(q => {
|
cache.set('poll:radarr-queue', {
|
||||||
const inst = radarrInstances.find(i => i.id === q.instance);
|
records: radarrQueues.flatMap(q => {
|
||||||
const url = inst ? inst.url : null;
|
const inst = radarrInstances.find(i => i.id === q.instance);
|
||||||
const key = inst ? inst.apiKey : null;
|
const url = inst ? inst.url : null;
|
||||||
return (q.data.records || []).map(r => {
|
const key = inst ? inst.apiKey : null;
|
||||||
if (r.movie) r.movie._instanceUrl = url;
|
return (q.data.records || []).map(r => {
|
||||||
r._instanceUrl = url;
|
if (r.movie) r.movie._instanceUrl = url;
|
||||||
r._instanceKey = key;
|
r._instanceUrl = url;
|
||||||
return r;
|
r._instanceKey = key;
|
||||||
});
|
return r;
|
||||||
})
|
});
|
||||||
}, cacheTTL);
|
})
|
||||||
cache.set('poll:radarr-history', {
|
}, cacheTTL);
|
||||||
records: radarrHistories.flatMap(h => h.data.records || [])
|
cache.set('poll:radarr-history', {
|
||||||
}, cacheTTL);
|
records: radarrHistories.flatMap(h => h.data.records || [])
|
||||||
cache.set('poll:radarr-tags', radarrTagsResults.flatMap(t => t.data || []), cacheTTL);
|
}, cacheTTL);
|
||||||
|
cache.set('poll:radarr-tags', radarrTagsResults.flatMap(t => t.data || []), cacheTTL);
|
||||||
|
} else {
|
||||||
|
// Extend TTL of existing cached data when polling is skipped
|
||||||
|
const existingRadarrQueue = cache.get('poll:radarr-queue');
|
||||||
|
const existingRadarrHistory = cache.get('poll:radarr-history');
|
||||||
|
const existingRadarrTags = cache.get('poll:radarr-tags');
|
||||||
|
if (existingRadarrQueue) cache.set('poll:radarr-queue', existingRadarrQueue, cacheTTL);
|
||||||
|
if (existingRadarrHistory) cache.set('poll:radarr-history', existingRadarrHistory, cacheTTL);
|
||||||
|
if (existingRadarrTags) cache.set('poll:radarr-tags', existingRadarrTags, cacheTTL);
|
||||||
|
}
|
||||||
|
|
||||||
// qBittorrent (already set above in download clients section)
|
// qBittorrent (already set above in download clients section)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user