Rx: Exécuter plusieurs observables et attendre qu'ils soient tous terminés
By Michael DELVA on Tuesday 11 May 2010, 11:00 - C# - Permalink
TweetOn continue avec la petite série d'articles Rx avec cette fois une nouvelle résolution de "problème". On a vu lors de l'article précédent comment ignorer les erreurs lorsque des adresses de téléchargement étaient invalides ou injoignables. Nous allons voir cette fois-ci comment lancer tous les téléchargements simultanément et comment attendre qu'ils soient tous terminés avant de les exécuter les uns après les autres pour les installer.
La fonction AssemblyUpdater.DownloadUpdates que nous avons vu dans l'article précédent retournait un Dictionary<AssemblyUpdateVersion, IObservable<DownloadProgress>>. Je vous avais dit que c'était afin de pouvoir utiliser le DataBinding sur le ViewModel grâce à la clé du dictionnaire, pour lui permettre de mettre à jour la barre de progression propre à chaque mise à jour. Voyons comment tout cela a été mis en place.
Tout commence par la fonction ShellViewModel.Download():
private IObservable<Unit[]> Download()
{
try
{
Dictionary<AssemblyUpdateVersion, IObservable<DownloadProgress>> allDownloads = assemblyUpdater.DownloadUpdates();
List<IObservable<Unit>> allObservableDownloads = new List<IObservable<Unit>>();
allDownloads.ForEach(keyValuePair =>
{
InstalledAssembly installedAssembly = InstalledAssemblies.First(ia => ia.AssemblyName == keyValuePair.Key);
IObservable<Unit> downloadObservable = installedAssembly.DownloadUpdate(keyValuePair.Value);
allObservableDownloads.Add(downloadObservable);
});
return allObservableDownloads.ForkJoin();
}
catch (DirectoryNotFoundException ex)
{
return Observable.Throw<Unit[]>(new Exception(ShellViewModelRes.InvalidDownloadFolder));
}
}
Dans cette fonction, on parcourt le dictionnaire que retourne AssemblyUpdater.DownloadUpdates. Pour chacune des clés, on va récupérer l'instance correspondante dans InstalledAssemblies, qui est de type ObservableCollection<InstalledAssembly> (et qui est donc utilisée par la vue grâce au databinding). On va appeler la fonction DownloadUpdate de cette instance, en lui passant l'IObservable<DownloadProgress> créé auparavant.
Voici la définition de la fonction InstalledAssembly.DownloadUpdate:
public IObservable<Unit> Download(IObservable<DownloadProgress> progression)
{
UpdateStatus = UpdateStatus.Downloading;
StatusMessage = ShellViewModelRes.DownloadingUpdate;
return Observable.Create<Unit>(observer =>
{
progression
.Subscribe(progress =>
{
DownloadProgression = progress;
if (DownloadProgression.TotalBytes == 1
&& DownloadProgression.DownloadedBytes == 1)
{
fileAlreadyDownloaded = true;
}
},
error =>
{
UpdateStatus = UpdateStatus.DownloadError;
StatusMessage = string.Format(CultureInfo.CurrentUICulture, "{0} : {1}", ShellViewModelRes.DownloadError, error.Message);
observer.OnError(error);
},
() =>
{
StatusMessage = fileAlreadyDownloaded ? ShellViewModelRes.FileAlreadyDownloaded : ShellViewModelRes.FileDownloaded;
UpdateStatus = UpdateStatus.DownloadEnded;
observer.OnCompleted();
});
return () => { };
});
}
Cette fonction prend donc un IObservable<DownloadProgress> pour renvoyer un IObservable<Unit>. Unit est un type "dummy" déclaré dans l'espace de nom de Rx, et qui est à considérer comme étant un void. Il est donc très utile pour manipuler des observables pour lesquels vous n'avez pas d'informations à faire passer dans le OnNext, et pour lesquels être au courant de la fin de l'opération est importante. Je vais donc m'en servir ici pour informer l'appelant de InstalledAssembly.DownloadUpdate de quand le téléchargement est fini (avec succès ou pas). Inutile d'informer l'appelant de la progression du téléchargement, puisque seule la classe InstalledAssembly a besoin de cette info pour mettre à jour sa progressbar. C'est ce que nous allons faire dans la définition de la fonction, grâce à l'utilisation de Observable.Create. Nous allons y enregistrer un observer à l'observable progression passé en paramètre (qui va pousser des valeurs de type DownloadProgress, qui vont elles-même être générées par les extensions dont j'ai parlé ici), afin de mettre à jour la progressbar, en changeant la valeur de la propriété DownloadProgression. Dans les canaux OnError et OnCompleted, on va juste se contenter de mettre à jour la vue pour refléter l'état du téléchargement, et en informer les futurs observateurs de l'observable que l'on va retourner (via les lignes observer.OnError(error); et observer.OnCompleted();)
Il est intéressant de noter qu'à cette étape, le téléchargement n'est pas encore démarré, puisqu'on n'a pas encore enregistré d'observateur à l'observable que retournera la fonction. Le code contenu dans Observable.Create sera donc exécuté lorsque nous appelerons Subscribe sur l'IObservable<Unit> retourné par la fonction.
Si on revient au code de la fonction Download() du ViewModel, on voit qu'on va ajouter, pour chaque mise à jour, le retour de la fonction IObservable<Unit> Download(IObservable<DownloadProgress> progression), à allObservableDownloads, qui est de type List<IObservable<Unit>>.
On va lancer tous les téléchargements en parallèle grâce à l'utilisation de ForkJoin(), dont voici la description:
Runs all observable sequences in parallel and combines their last values
Cette fonction va retourner IObservable<Unit>, qui est le type de retour de notre fonction Download, et que l'on va donc... retourner à l'appelant. Tout ce qui l'intéressera ici est de savoir si:
- il y a eu des erreurs, auquel cas il ne va pas tenter d'installer les mises à jour
- toutes les mises à jour ont été téléchargées, et donc on peut les installer
Comme on n'a pas d'informations spéciales à avoir, on utilise là aussi le type Unit comme type de retour. A noter que là non plus le téléchargement n'est pas encore démarré, puisque nous utilisons toujours des variables de type IObservable<>, et que nous n'avons pas encore souscrit d'observateur.
Maintenant que toute la plomberie est prête, il ne nous reste plus qu'à déclencher (pour de bon cette fois) les téléchargements, en souscrivant un observateur à l'observable que va retourner Download():
public void Install()
{
Download()
.ObserveOn(scheduler)
.Subscribe(_ => { },
error =>
{
HasError = true;
// blabla...
},
DoInstall);
}
Rien de bien compliqué ici. On ne fait rien dans le canal OnNext, puisqu'on est attend qu'il y ait une erreur ou que tout soit terminé avant de déclencher la suite des opérations. En cas d'erreur, on l'affiche et on agit en conséquence. En cas de succès, on appelle la fonction DoInstall.
Pour résumer, voici ce que nous avons utilisé de notable pour parvenir à nos fins:
- Observable.Create<Unit> dans InstalledAssembly.Download pour ne générer que les évènements de fin pour chacun des téléchargements, et arrêter dans cette fonction la propagation de DownloadProgress
- Observable.ForkJoin pour lancer tous les téléchargements en même temps et retourner un observable, que l'on va utiliser pour gérer la suite du téléchargement au mieux (en cas de succès, ou d'erreur)
C'est tout pour cet article. J'espère qu'il aura su rester compréhensible, je ne trouve pas toujours évident de switcher entre observable et observer par écrit dans mes explications, donc n'hésitez pas à me le dire.
Je vous donne donc rendez-vous pour le prochain article sur Rx, qui expliquera comment lancer une opération asynchrone grâce aux observables, et qui sera dans la continuité de ce dont j'ai parlé dans cet article et le précédent.