RxJS è una libreria per la programmazione reattiva basata su Observable, perfetta per modellare azioni asincrone come l'upload di file. In questo articolo vedremo come usare RxJS per costruire una pipeline di caricamento robusta, estendibile e facile da testare, partendo da esempi semplici fino a scenari con progresso, annullamento e gestione degli errori.

Perché usare RxJS per l'upload di file

L'upload di file è un problema apparentemente semplice, ma che diventa rapidamente complesso quando aggiungiamo:

Gli Observable di RxJS sono perfetti per esprimere questi flussi come composizione di operatori, invece di avere una giungla di callback e listener difficili da mantenere.

Setup di base: catturare l'evento di selezione file

Immaginiamo di avere un input file e un pulsante "Carica":

<input type="file" id="fileInput" multiple>
<button id="uploadBtn">Carica</button>
<div id="status"></div>

Con RxJS possiamo trasformare gli eventi DOM in Observable usando fromEvent:

import { fromEvent } from 'rxjs';
import { map, filter } from 'rxjs/operators';

const fileInput = document.getElementById('fileInput');
const uploadBtn = document.getElementById('uploadBtn');
const statusEl = document.getElementById('status');

// Stream dei file selezionati
const files$ = fromEvent(fileInput, 'change').pipe(
  map(event => /** @type {HTMLInputElement} */ (event.target).files),
  filter(files => !!files && files.length > 0)
);

// Stream dei click sul pulsante upload
const uploadClick$ = fromEvent(uploadBtn, 'click');

Abbiamo ora due flussi distinti:

Creare un Observable per l'upload tramite XMLHttpRequest

Per tracciare il progresso dell'upload abbiamo bisogno degli eventi nativi di XMLHttpRequest, in particolare quelli di xhr.upload. Incapsuliamo tutto in un Observable custom:

import { Observable } from 'rxjs';

/**
 * Esegue l'upload di un singolo file e restituisce un Observable
 * che emette oggetti di stato: { type, progress, response, error }.
 */
function uploadFile$(url, file) {
  return new Observable(subscriber => {
    const xhr = new XMLHttpRequest();
    const formData = new FormData();

    formData.append('file', file);

    // Eventi di progresso
    xhr.upload.addEventListener('progress', event => {
      if (event.lengthComputable) {
        const percent = Math.round((event.loaded / event.total) * 100);
        subscriber.next({
          type: 'progress',
          progress: percent
        });
      }
    });

    // Completamento
    xhr.addEventListener('load', () => {
      if (xhr.status >= 200 && xhr.status < 300) {
        subscriber.next({
          type: 'success',
          response: xhr.response
        });
        subscriber.complete();
      } else {
        subscriber.error({
          type: 'error',
          status: xhr.status,
          message: xhr.statusText || 'Errore di upload'
        });
      }
    });

    // Errori di rete
    xhr.addEventListener('error', () => {
      subscriber.error({
        type: 'error',
        status: xhr.status,
        message: 'Errore di rete'
      });
    });

    // Annullamento
    xhr.addEventListener('abort', () => {
      subscriber.next({ type: 'cancelled' });
      subscriber.complete();
    });

    xhr.open('POST', url);
    xhr.send(formData);

    // Funzione di cleanup chiamata su unsubscribe
    return () => {
      if (xhr.readyState !== XMLHttpRequest.DONE) {
        xhr.abort();
      }
    };
  });
}

Questo Observable è molto potente:

Collegare selezione file, click e upload

Ora vogliamo che al click sul pulsante parta l'upload dei file selezionati. Usiamo gli operatori RxJS per combinare i flussi:

import { withLatestFrom, mergeMap, tap } from 'rxjs/operators';

const uploadUrl = '/api/upload';

// Ogni click prende l'ultima FileList disponibile
const upload$ = uploadClick$.pipe(
  withLatestFrom(files$),
  map(([, files]) => Array.from(files)),       // FileList -> File[]
  tap(() => {
    statusEl.textContent = 'Inizio upload...';
  }),
  // Carica i file in sequenza (concorrenza 1)
  mergeMap(
    files => files,
    (files, file) => file
  ),
  mergeMap(file => {
    statusEl.textContent = `Carico: ${file.name}`;
    return uploadFile$(uploadUrl, file).pipe(
      tap(event => {
        if (event.type === 'progress') {
          statusEl.textContent = `Carico ${file.name}: ${event.progress}%`;
        }
      })
    );
  })
);

upload$.subscribe({
  next: event => {
    if (event.type === 'success') {
      statusEl.textContent = 'Upload completato con successo';
    }
  },
  error: err => {
    statusEl.textContent = `Errore: ${err.message || 'Upload fallito'}`;
  },
  complete: () => {
    statusEl.textContent = 'Tutti i file sono stati caricati';
  }
});

In questo esempio i file vengono caricati in sequenza. Possiamo facilmente cambiare strategia di concorrenza regolando gli operatori.

Caricamento multiplo con concorrenza controllata

Se i file sono molti, conviene caricarne alcuni in parallelo, ma non tutti, per non saturare la banda o il server. mergeMap accetta un parametro di concorrenza:

import { from } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

const MAX_CONCURRENCY = 3;

const parallelUpload$ = uploadClick$.pipe(
  withLatestFrom(files$),
  map(([, files]) => Array.from(files)),
  mergeMap(files => from(files)),
  mergeMap(file => uploadFile$(uploadUrl, file), MAX_CONCURRENCY)
);

Con MAX_CONCURRENCY = 3, al massimo tre upload saranno attivi contemporaneamente. È un esempio classico di come RxJS permetta di esprimere policy complesse con poche righe.

Gestione degli errori e retry

RxJS offre operatori avanzati per la gestione degli errori, come retry e retryWhen. Possiamo ad esempio ritentare l'upload di un file alcune volte, con attesa crescente tra un tentativo e l'altro.

import { retryWhen, scan, delay } from 'rxjs/operators';

function uploadFileWithRetry$(url, file) {
  const maxRetries = 3;
  const baseDelay = 1000; // ms

  return uploadFile$(url, file).pipe(
    retryWhen(errors$ =>
      errors$.pipe(
        scan((acc, error) => {
          if (acc.attempts >= maxRetries) {
            throw error;
          }
          return { attempts: acc.attempts + 1 };
        }, { attempts: 0 }),
        delay(acc => acc.attempts * baseDelay)
      )
    )
  );
}

Possiamo sostituire uploadFile$ con uploadFileWithRetry$ nella nostra pipeline per ottenere automaticamente un comportamento più resiliente, senza cambiare la logica di orchestrazione.

Annullare un upload in corso

Poiché il nostro Observable di upload supporta unsubscribe, annullare è semplice: basta conservare la Subscription e chiamare unsubscribe quando l'utente preme un pulsante "Annulla".

<button id="cancelBtn">Annulla upload</button>
const cancelBtn = document.getElementById('cancelBtn');

let currentUploadSub = null;

const cancellableUpload$ = uploadClick$.pipe(
  withLatestFrom(files$),
  map(([, files]) => Array.from(files)),
  mergeMap(file => uploadFile$(uploadUrl, file))
);

currentUploadSub = cancellableUpload$.subscribe({
  next: event => {
    // gestisci progresso / successo
  },
  error: err => {
    statusEl.textContent = `Errore: ${err.message}`;
  },
  complete: () => {
    statusEl.textContent = 'Upload completato o annullato';
  }
});

fromEvent(cancelBtn, 'click').subscribe(() => {
  if (currentUploadSub) {
    currentUploadSub.unsubscribe();
    statusEl.textContent = 'Upload annullato dall'utente';
  }
});

In un'applicazione reale potremmo modellare meglio il ciclo di vita della subscription (ad esempio creando un nuovo stream per ogni click), ma il concetto chiave è che l'Observable incapsula anche la logica di abort dell'XHR.

Combinare stato dell'interfaccia e upload

Una delle forze di RxJS è la possibilità di combinare facilmente upload, input dell'utente e stato dell'interfaccia. Ad esempio possiamo disabilitare il pulsante di upload quando non ci sono file selezionati, o quando è già in corso un caricamento.

import { combineLatest, BehaviorSubject } from 'rxjs';
import { map, startWith } from 'rxjs/operators';

const hasFiles$ = files$.pipe(
  map(files => files.length > 0),
  startWith(false)
);

const isUploading$ = new BehaviorSubject(false);

combineLatest([hasFiles$, isUploading$]).pipe(
  map(([hasFiles, isUploading]) => hasFiles && !isUploading)
).subscribe(canUpload => {
  uploadBtn.disabled = !canUpload;
});

// Aggiorna lo stato durante l'upload
uploadClick$.pipe(
  withLatestFrom(files$),
  map(([, files]) => Array.from(files)),
  mergeMap(file => {
    isUploading$.next(true);
    return uploadFile$(uploadUrl, file);
  })
).subscribe({
  next: () => {},
  error: () => {
    isUploading$.next(false);
  },
  complete: () => {
    isUploading$.next(false);
  }
});

In questo modo la UI rimane coerente con lo stato del sistema senza dover gestire manualmente mille flag.

Consigli di progettazione

Conclusioni

Usare RxJS per l'upload di file in JavaScript permette di modellare l'intero flusso come una combinazione di Observable e operatori: selezione dei file, click del pulsante, avanzamento, errori, ritentativi, annullamento, stato dell'interfaccia. Il risultato è un codice più dichiarativo, modulare e manutenibile.

A partire dagli esempi visti in questo articolo puoi adattare la pipeline alle tue esigenze: supporto al drag&drop, caricamento chunked, autenticazione, integrazione con framework come React o Angular, e molto altro, mantenendo sempre la stessa filosofia reattiva.