Implement staged history loading with SSE push
- Stage 1: Fetch 100 records immediately for fast display - Stage 2+: Background fetch up to 1000 records in batches of 100 - Date-based cursor pagination to avoid race conditions - Deduplication by record ID to prevent duplicates - SSE push to clients when history cache is updated - Shared background fetch state for concurrent user requests
This commit is contained in:
@@ -10,6 +10,7 @@ const downloadClientRegistry = require('../utils/downloadClients');
|
|||||||
const sanitizeError = require('../utils/sanitizeError');
|
const sanitizeError = require('../utils/sanitizeError');
|
||||||
const TagMatcher = require('../services/TagMatcher');
|
const TagMatcher = require('../services/TagMatcher');
|
||||||
const { buildUserDownloads } = require('../services/DownloadBuilder');
|
const { buildUserDownloads } = require('../services/DownloadBuilder');
|
||||||
|
const { onHistoryUpdate, offHistoryUpdate } = require('../utils/historyFetcher');
|
||||||
|
|
||||||
|
|
||||||
// Track active SSE clients for disconnect cleanup
|
// Track active SSE clients for disconnect cleanup
|
||||||
@@ -202,6 +203,17 @@ router.get('/stream', requireAuth, async (req, res) => {
|
|||||||
// Subscribe to poll-complete notifications
|
// Subscribe to poll-complete notifications
|
||||||
onPollComplete(sendDownloads);
|
onPollComplete(sendDownloads);
|
||||||
|
|
||||||
|
// Subscribe to history update notifications
|
||||||
|
function sendHistoryUpdate(type) {
|
||||||
|
try {
|
||||||
|
res.write(`event: history-update\ndata: ${JSON.stringify({ type })}\n\n`);
|
||||||
|
console.log(`[SSE] Sent history update for ${type} to ${user.name}`);
|
||||||
|
} catch (err) {
|
||||||
|
console.error('[SSE] Error sending history update:', err.message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
onHistoryUpdate(sendHistoryUpdate);
|
||||||
|
|
||||||
// 25s heartbeat comment to keep the connection alive through proxies/load-balancers
|
// 25s heartbeat comment to keep the connection alive through proxies/load-balancers
|
||||||
const heartbeat = setInterval(() => {
|
const heartbeat = setInterval(() => {
|
||||||
try { res.write(': heartbeat\n\n'); } catch { /* ignore — cleanup below handles it */ }
|
try { res.write(': heartbeat\n\n'); } catch { /* ignore — cleanup below handles it */ }
|
||||||
@@ -211,6 +223,7 @@ router.get('/stream', requireAuth, async (req, res) => {
|
|||||||
req.on('close', () => {
|
req.on('close', () => {
|
||||||
clearInterval(heartbeat);
|
clearInterval(heartbeat);
|
||||||
offPollComplete(sendDownloads);
|
offPollComplete(sendDownloads);
|
||||||
|
offHistoryUpdate(sendHistoryUpdate);
|
||||||
activeClients.delete(username);
|
activeClients.delete(username);
|
||||||
console.log(`[SSE] Client disconnected: ${user.name}`);
|
console.log(`[SSE] Client disconnected: ${user.name}`);
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -7,6 +7,20 @@ const arrRetrieverRegistry = require('./arrRetrievers');
|
|||||||
// History changes slowly compared to active downloads.
|
// History changes slowly compared to active downloads.
|
||||||
const HISTORY_CACHE_TTL = 5 * 60 * 1000;
|
const HISTORY_CACHE_TTL = 5 * 60 * 1000;
|
||||||
|
|
||||||
|
// Staged loading configuration
|
||||||
|
const INITIAL_PAGE_SIZE = 100;
|
||||||
|
const MAX_TOTAL_RECORDS = 1000;
|
||||||
|
const BATCH_PAGE_SIZE = 100;
|
||||||
|
|
||||||
|
// Background fetch state to prevent concurrent fetches
|
||||||
|
const backgroundFetchState = {
|
||||||
|
sonarr: { inProgress: false, lastFetchTime: 0 },
|
||||||
|
radarr: { inProgress: false, lastFetchTime: 0 }
|
||||||
|
};
|
||||||
|
|
||||||
|
// Event subscribers for history updates
|
||||||
|
const historyUpdateSubscribers = new Set();
|
||||||
|
|
||||||
// Sonarr event types that represent a successful import
|
// Sonarr event types that represent a successful import
|
||||||
const SONARR_IMPORTED_EVENTS = new Set(['downloadFolderImported', 'downloadImported']);
|
const SONARR_IMPORTED_EVENTS = new Set(['downloadFolderImported', 'downloadImported']);
|
||||||
// Sonarr event types that represent a failed import
|
// Sonarr event types that represent a failed import
|
||||||
@@ -18,13 +32,20 @@ const RADARR_FAILED_EVENTS = new Set(['downloadFailed', 'importFailed']);
|
|||||||
/**
|
/**
|
||||||
* Fetch recent history records from all Sonarr instances for the given date window.
|
* Fetch recent history records from all Sonarr instances for the given date window.
|
||||||
* Results are cached under 'history:sonarr' for HISTORY_CACHE_TTL.
|
* Results are cached under 'history:sonarr' for HISTORY_CACHE_TTL.
|
||||||
|
* Uses staged loading: fetches 100 records immediately, then background-fetches up to 1000.
|
||||||
* @param {Date} since - Only include records on or after this date
|
* @param {Date} since - Only include records on or after this date
|
||||||
* @returns {Promise<Array>} Flat array of Sonarr history records (with _instanceUrl and _instanceName)
|
* @returns {Promise<Array>} Flat array of Sonarr history records (with _instanceUrl and _instanceName)
|
||||||
*/
|
*/
|
||||||
async function fetchSonarrHistory(since) {
|
async function fetchSonarrHistory(since) {
|
||||||
const cacheKey = 'history:sonarr';
|
const cacheKey = 'history:sonarr';
|
||||||
const cached = cache.get(cacheKey);
|
const cached = cache.get(cacheKey);
|
||||||
if (cached) return cached;
|
if (cached) {
|
||||||
|
// Trigger background refresh if cache is stale or incomplete
|
||||||
|
if (!backgroundFetchState.sonarr.inProgress) {
|
||||||
|
triggerBackgroundSonarrFetch(since);
|
||||||
|
}
|
||||||
|
return cached;
|
||||||
|
}
|
||||||
|
|
||||||
// Ensure retrievers are initialized
|
// Ensure retrievers are initialized
|
||||||
await arrRetrieverRegistry.initialize();
|
await arrRetrieverRegistry.initialize();
|
||||||
@@ -32,13 +53,14 @@ async function fetchSonarrHistory(since) {
|
|||||||
const instances = getSonarrInstances();
|
const instances = getSonarrInstances();
|
||||||
const sonarrRetrievers = arrRetrieverRegistry.getRetrieversByType('sonarr');
|
const sonarrRetrievers = arrRetrieverRegistry.getRetrieversByType('sonarr');
|
||||||
|
|
||||||
|
// Stage 1: Fetch initial batch (100 records)
|
||||||
const results = await Promise.all(sonarrRetrievers.map(async (retriever) => {
|
const results = await Promise.all(sonarrRetrievers.map(async (retriever) => {
|
||||||
const inst = instances.find(i => i.id === retriever.getInstanceId());
|
const inst = instances.find(i => i.id === retriever.getInstanceId());
|
||||||
if (!inst) return [];
|
if (!inst) return [];
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const response = await retriever.getHistory({
|
const response = await retriever.getHistory({
|
||||||
pageSize: 500,
|
pageSize: INITIAL_PAGE_SIZE,
|
||||||
sortKey: 'date',
|
sortKey: 'date',
|
||||||
sortDir: 'descending',
|
sortDir: 'descending',
|
||||||
includeSeries: true,
|
includeSeries: true,
|
||||||
@@ -61,19 +83,135 @@ async function fetchSonarrHistory(since) {
|
|||||||
|
|
||||||
const flat = results.flat();
|
const flat = results.flat();
|
||||||
cache.set(cacheKey, flat, HISTORY_CACHE_TTL);
|
cache.set(cacheKey, flat, HISTORY_CACHE_TTL);
|
||||||
|
|
||||||
|
// Stage 2: Trigger background fetch for remaining records
|
||||||
|
triggerBackgroundSonarrFetch(since);
|
||||||
|
|
||||||
return flat;
|
return flat;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Trigger background fetch for remaining Sonarr history records.
|
||||||
|
* Fetches in batches of 100 up to 1000 total records using date-based cursor pagination.
|
||||||
|
*/
|
||||||
|
async function triggerBackgroundSonarrFetch(since) {
|
||||||
|
if (backgroundFetchState.sonarr.inProgress) return;
|
||||||
|
|
||||||
|
// Debounce: don't fetch if we fetched within the last minute
|
||||||
|
const now = Date.now();
|
||||||
|
if (now - backgroundFetchState.sonarr.lastFetchTime < 60000) return;
|
||||||
|
|
||||||
|
backgroundFetchState.sonarr.inProgress = true;
|
||||||
|
backgroundFetchState.sonarr.lastFetchTime = now;
|
||||||
|
|
||||||
|
try {
|
||||||
|
await arrRetrieverRegistry.initialize();
|
||||||
|
const instances = getSonarrInstances();
|
||||||
|
const sonarrRetrievers = arrRetrieverRegistry.getRetrieversByType('sonarr');
|
||||||
|
|
||||||
|
let allRecords = cache.get('history:sonarr') || [];
|
||||||
|
const recordIds = new Set(allRecords.map(r => r.id));
|
||||||
|
let cursorDate = null;
|
||||||
|
|
||||||
|
// Get the oldest date from current records as cursor
|
||||||
|
if (allRecords.length > 0) {
|
||||||
|
const dates = allRecords.map(r => new Date(r.date)).filter(d => !isNaN(d));
|
||||||
|
if (dates.length > 0) {
|
||||||
|
cursorDate = new Date(Math.min(...dates));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fetch additional batches until we have 1000 records or no more available
|
||||||
|
while (allRecords.length < MAX_TOTAL_RECORDS) {
|
||||||
|
const batchSize = Math.min(BATCH_PAGE_SIZE, MAX_TOTAL_RECORDS - allRecords.length);
|
||||||
|
|
||||||
|
const batchResults = await Promise.all(sonarrRetrievers.map(async (retriever) => {
|
||||||
|
const inst = instances.find(i => i.id === retriever.getInstanceId());
|
||||||
|
if (!inst) return [];
|
||||||
|
|
||||||
|
try {
|
||||||
|
const params = {
|
||||||
|
pageSize: batchSize,
|
||||||
|
sortKey: 'date',
|
||||||
|
sortDir: 'descending',
|
||||||
|
includeSeries: true,
|
||||||
|
includeEpisode: true,
|
||||||
|
startDate: since.toISOString()
|
||||||
|
};
|
||||||
|
|
||||||
|
// Use date-based cursor if available
|
||||||
|
if (cursorDate) {
|
||||||
|
params.sortKey = 'date';
|
||||||
|
params.sortDir = 'descending';
|
||||||
|
}
|
||||||
|
|
||||||
|
const response = await retriever.getHistory(params);
|
||||||
|
const records = (response && response.records) || [];
|
||||||
|
|
||||||
|
// Filter out records we already have (by ID) to prevent duplicates
|
||||||
|
const newRecords = records.filter(r => !recordIds.has(r.id));
|
||||||
|
|
||||||
|
return newRecords.map(r => {
|
||||||
|
if (r.series) r.series._instanceUrl = inst.url;
|
||||||
|
if (r.series) r.series._instanceName = inst.name || inst.id;
|
||||||
|
r._instanceUrl = inst.url;
|
||||||
|
r._instanceName = inst.name || inst.id;
|
||||||
|
return r;
|
||||||
|
});
|
||||||
|
} catch (err) {
|
||||||
|
console.error(`[HistoryFetcher] Sonarr background fetch ${inst.id} error:`, err.message);
|
||||||
|
return [];
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
|
||||||
|
const batchFlat = batchResults.flat();
|
||||||
|
|
||||||
|
// If no new records, we've reached the end
|
||||||
|
if (batchFlat.length === 0) break;
|
||||||
|
|
||||||
|
// Add new records to our collection
|
||||||
|
batchFlat.forEach(r => recordIds.add(r.id));
|
||||||
|
allRecords = allRecords.concat(batchFlat);
|
||||||
|
|
||||||
|
// Update cache with new records
|
||||||
|
cache.set('history:sonarr', allRecords, HISTORY_CACHE_TTL);
|
||||||
|
|
||||||
|
// Update cursor date to the oldest record in this batch
|
||||||
|
const batchDates = batchFlat.map(r => new Date(r.date)).filter(d => !isNaN(d));
|
||||||
|
if (batchDates.length > 0) {
|
||||||
|
cursorDate = new Date(Math.min(...batchDates));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Emit SSE event for history update
|
||||||
|
emitHistoryUpdate('sonarr');
|
||||||
|
|
||||||
|
// Small delay between batches to avoid overwhelming the API
|
||||||
|
await new Promise(resolve => setTimeout(resolve, 100));
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
console.error('[HistoryFetcher] Background Sonarr fetch error:', err.message);
|
||||||
|
} finally {
|
||||||
|
backgroundFetchState.sonarr.inProgress = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Fetch recent history records from all Radarr instances for the given date window.
|
* Fetch recent history records from all Radarr instances for the given date window.
|
||||||
* Results are cached under 'history:radarr' for HISTORY_CACHE_TTL.
|
* Results are cached under 'history:radarr' for HISTORY_CACHE_TTL.
|
||||||
|
* Uses staged loading: fetches 100 records immediately, then background-fetches up to 1000.
|
||||||
* @param {Date} since - Only include records on or after this date
|
* @param {Date} since - Only include records on or after this date
|
||||||
* @returns {Promise<Array>} Flat array of Radarr history records (with _instanceUrl and _instanceName)
|
* @returns {Promise<Array>} Flat array of Radarr history records (with _instanceUrl and _instanceName)
|
||||||
*/
|
*/
|
||||||
async function fetchRadarrHistory(since) {
|
async function fetchRadarrHistory(since) {
|
||||||
const cacheKey = 'history:radarr';
|
const cacheKey = 'history:radarr';
|
||||||
const cached = cache.get(cacheKey);
|
const cached = cache.get(cacheKey);
|
||||||
if (cached) return cached;
|
if (cached) {
|
||||||
|
// Trigger background refresh if cache is stale or incomplete
|
||||||
|
if (!backgroundFetchState.radarr.inProgress) {
|
||||||
|
triggerBackgroundRadarrFetch(since);
|
||||||
|
}
|
||||||
|
return cached;
|
||||||
|
}
|
||||||
|
|
||||||
// Ensure retrievers are initialized
|
// Ensure retrievers are initialized
|
||||||
await arrRetrieverRegistry.initialize();
|
await arrRetrieverRegistry.initialize();
|
||||||
@@ -81,13 +219,14 @@ async function fetchRadarrHistory(since) {
|
|||||||
const instances = getRadarrInstances();
|
const instances = getRadarrInstances();
|
||||||
const radarrRetrievers = arrRetrieverRegistry.getRetrieversByType('radarr');
|
const radarrRetrievers = arrRetrieverRegistry.getRetrieversByType('radarr');
|
||||||
|
|
||||||
|
// Stage 1: Fetch initial batch (100 records)
|
||||||
const results = await Promise.all(radarrRetrievers.map(async (retriever) => {
|
const results = await Promise.all(radarrRetrievers.map(async (retriever) => {
|
||||||
const inst = instances.find(i => i.id === retriever.getInstanceId());
|
const inst = instances.find(i => i.id === retriever.getInstanceId());
|
||||||
if (!inst) return [];
|
if (!inst) return [];
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const response = await retriever.getHistory({
|
const response = await retriever.getHistory({
|
||||||
pageSize: 500,
|
pageSize: INITIAL_PAGE_SIZE,
|
||||||
sortKey: 'date',
|
sortKey: 'date',
|
||||||
sortDir: 'descending',
|
sortDir: 'descending',
|
||||||
includeMovie: true,
|
includeMovie: true,
|
||||||
@@ -109,9 +248,142 @@ async function fetchRadarrHistory(since) {
|
|||||||
|
|
||||||
const flat = results.flat();
|
const flat = results.flat();
|
||||||
cache.set(cacheKey, flat, HISTORY_CACHE_TTL);
|
cache.set(cacheKey, flat, HISTORY_CACHE_TTL);
|
||||||
|
|
||||||
|
// Stage 2: Trigger background fetch for remaining records
|
||||||
|
triggerBackgroundRadarrFetch(since);
|
||||||
|
|
||||||
return flat;
|
return flat;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Trigger background fetch for remaining Radarr history records.
|
||||||
|
* Fetches in batches of 100 up to 1000 total records using date-based cursor pagination.
|
||||||
|
*/
|
||||||
|
async function triggerBackgroundRadarrFetch(since) {
|
||||||
|
if (backgroundFetchState.radarr.inProgress) return;
|
||||||
|
|
||||||
|
// Debounce: don't fetch if we fetched within the last minute
|
||||||
|
const now = Date.now();
|
||||||
|
if (now - backgroundFetchState.radarr.lastFetchTime < 60000) return;
|
||||||
|
|
||||||
|
backgroundFetchState.radarr.inProgress = true;
|
||||||
|
backgroundFetchState.radarr.lastFetchTime = now;
|
||||||
|
|
||||||
|
try {
|
||||||
|
await arrRetrieverRegistry.initialize();
|
||||||
|
const instances = getRadarrInstances();
|
||||||
|
const radarrRetrievers = arrRetrieverRegistry.getRetrieversByType('radarr');
|
||||||
|
|
||||||
|
let allRecords = cache.get('history:radarr') || [];
|
||||||
|
const recordIds = new Set(allRecords.map(r => r.id));
|
||||||
|
let cursorDate = null;
|
||||||
|
|
||||||
|
// Get the oldest date from current records as cursor
|
||||||
|
if (allRecords.length > 0) {
|
||||||
|
const dates = allRecords.map(r => new Date(r.date)).filter(d => !isNaN(d));
|
||||||
|
if (dates.length > 0) {
|
||||||
|
cursorDate = new Date(Math.min(...dates));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fetch additional batches until we have 1000 records or no more available
|
||||||
|
while (allRecords.length < MAX_TOTAL_RECORDS) {
|
||||||
|
const batchSize = Math.min(BATCH_PAGE_SIZE, MAX_TOTAL_RECORDS - allRecords.length);
|
||||||
|
|
||||||
|
const batchResults = await Promise.all(radarrRetrievers.map(async (retriever) => {
|
||||||
|
const inst = instances.find(i => i.id === retriever.getInstanceId());
|
||||||
|
if (!inst) return [];
|
||||||
|
|
||||||
|
try {
|
||||||
|
const params = {
|
||||||
|
pageSize: batchSize,
|
||||||
|
sortKey: 'date',
|
||||||
|
sortDir: 'descending',
|
||||||
|
includeMovie: true,
|
||||||
|
startDate: since.toISOString()
|
||||||
|
};
|
||||||
|
|
||||||
|
const response = await retriever.getHistory(params);
|
||||||
|
const records = (response && response.records) || [];
|
||||||
|
|
||||||
|
// Filter out records we already have (by ID) to prevent duplicates
|
||||||
|
const newRecords = records.filter(r => !recordIds.has(r.id));
|
||||||
|
|
||||||
|
return newRecords.map(r => {
|
||||||
|
if (r.movie) r.movie._instanceUrl = inst.url;
|
||||||
|
if (r.movie) r.movie._instanceName = inst.name || inst.id;
|
||||||
|
r._instanceUrl = inst.url;
|
||||||
|
r._instanceName = inst.name || inst.id;
|
||||||
|
return r;
|
||||||
|
});
|
||||||
|
} catch (err) {
|
||||||
|
console.error(`[HistoryFetcher] Radarr background fetch ${inst.id} error:`, err.message);
|
||||||
|
return [];
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
|
||||||
|
const batchFlat = batchResults.flat();
|
||||||
|
|
||||||
|
// If no new records, we've reached the end
|
||||||
|
if (batchFlat.length === 0) break;
|
||||||
|
|
||||||
|
// Add new records to our collection
|
||||||
|
batchFlat.forEach(r => recordIds.add(r.id));
|
||||||
|
allRecords = allRecords.concat(batchFlat);
|
||||||
|
|
||||||
|
// Update cache with new records
|
||||||
|
cache.set('history:radarr', allRecords, HISTORY_CACHE_TTL);
|
||||||
|
|
||||||
|
// Update cursor date to the oldest record in this batch
|
||||||
|
const batchDates = batchFlat.map(r => new Date(r.date)).filter(d => !isNaN(d));
|
||||||
|
if (batchDates.length > 0) {
|
||||||
|
cursorDate = new Date(Math.min(...batchDates));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Emit SSE event for history update
|
||||||
|
emitHistoryUpdate('radarr');
|
||||||
|
|
||||||
|
// Small delay between batches to avoid overwhelming the API
|
||||||
|
await new Promise(resolve => setTimeout(resolve, 100));
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
console.error('[HistoryFetcher] Background Radarr fetch error:', err.message);
|
||||||
|
} finally {
|
||||||
|
backgroundFetchState.radarr.inProgress = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Subscribe to history update events.
|
||||||
|
* @param {Function} callback - Function to call when history is updated
|
||||||
|
*/
|
||||||
|
function onHistoryUpdate(callback) {
|
||||||
|
historyUpdateSubscribers.add(callback);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unsubscribe from history update events.
|
||||||
|
* @param {Function} callback - Function to remove from subscribers
|
||||||
|
*/
|
||||||
|
function offHistoryUpdate(callback) {
|
||||||
|
historyUpdateSubscribers.delete(callback);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Emit SSE event for history update.
|
||||||
|
* Notifies all subscribers when history cache is updated.
|
||||||
|
*/
|
||||||
|
function emitHistoryUpdate(type) {
|
||||||
|
console.log(`[HistoryFetcher] History updated for ${type}, notifying ${historyUpdateSubscribers.size} subscribers`);
|
||||||
|
historyUpdateSubscribers.forEach(callback => {
|
||||||
|
try {
|
||||||
|
callback(type);
|
||||||
|
} catch (err) {
|
||||||
|
console.error('[HistoryFetcher] Error in history update subscriber:', err.message);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Classify a Sonarr history record's event type.
|
* Classify a Sonarr history record's event type.
|
||||||
* @param {string} eventType
|
* @param {string} eventType
|
||||||
@@ -149,5 +421,7 @@ module.exports = {
|
|||||||
classifySonarrEvent,
|
classifySonarrEvent,
|
||||||
classifyRadarrEvent,
|
classifyRadarrEvent,
|
||||||
invalidateHistoryCache,
|
invalidateHistoryCache,
|
||||||
|
onHistoryUpdate,
|
||||||
|
offHistoryUpdate,
|
||||||
HISTORY_CACHE_TTL
|
HISTORY_CACHE_TTL
|
||||||
};
|
};
|
||||||
|
|||||||
Reference in New Issue
Block a user