Introduction aux Reactive Extensions : Télécharger un fichier en asynchrone
By Michael DELVA on Monday 12 April 2010, 14:58 - C# - Permalink
TweetJ'ai déjà parlé à plusieurs reprises des reactives extensions, que l'on appelle plus familièrement Rx. Je me suis dit que ce serait une bonne idée que de vous les présenter plus en détail. Je ne vais pas écrire un article qui va tout vous expliquer depuis le début, car il existe déjà beaucoup de ressources. (ici, ici, ou bien ici, ou même encore là) Je me suis donc dit que ça serait peut-être une meilleure idée que de vous expliquer comment ça peut fonctionner grâce à un exemple concret. Et comme j'avais déjà un article expliquant comment réaliser une opération asynchrone en utilisant la TPL, j'ai repris cet exemple, en utilisant cette fois Rx.
Le problème principal (ou l'un des problèmes? ;) ) du code de téléchargement utilisant la TPL est que l'utilisateur de la classe est obligé de souscrire à 3 évènements pour être tenu informé de quand le téléchargement démarre, quand il progresse, et quand il se termine. De plus, savoir si le téléchargement s'est bien terminé, ou s'il y a eu une erreur pendant le processus, nécessite d'analyser l'instance de DownloadFinishedEventArgs retournée par l'évènement DownloadCompleted.
Rx va permettre de simplifier la gestion des évènements de la classe. En effet, nous allons purement et simplement supprimer les évènements.
Pour ce faire, nous allons faire retourner par la fonction DownloadFileASync une instance de type IObservable<DownloadProgress>. Et nous pourrons gérer les 3 évènements grâce à la fonction Subscribe de cette interface:
- Le premier évènement, qui est lancé quand le téléchargement débute, va devenir inutile. En effet, contrairement à l'exemple avec la TPL, le téléchargement ne démarrera pas automatiquement dans la fonction DownloadFileASync. C'est l'appelant qui va décider de démarrer le téléchargement, lorsqu'il s'abonnera via la fonction Subscribe à l'observable que retournera la fonction DownloadFileASync.
- Pour être au courant de la progression du téléchargement, ce sera fait dans le canal OnNext de la fonction Subscribe. Exit donc le deuxième évènement.
- Et pour remplacer le troisième évènement, nous n'avons qu'à implémenter le code dans les canaux OnError et OnCompleted de la fonction Subscribe. Nous avons une amélioration ici puisque nous n'avons pas à vérifier la valeur des membres de DownloadFinishedEventArgs pour savoir si le traitement a été réalisé ou pas.
Pour l'appelant, nous aurons donc un code qui devrait prendre la forme suivante:
var observable = fileDownloader.DownloadFileASync(urlOfFile, downloadFolder, resume);
observable.Subscribe(progression => { //Manage progression },
error => { //Manage error },
() => { //Manage completion });
Ce qui, je pense que vous en conviendrez, est nettement plus visible et concis que de s'abonner à des évènements. Et l'avantage ici est que vous avez des interfaces qui unifient la gestion des opérations asynchrones. Pas besoin donc de chercher dans la documentation de la classe pour savoir quels évènements appeler en cas d'erreur ou de progression du traitement effectué.
Voyons maintenant comment écrire la fonction DownloadFileASync (que nous amputerons du suffixe ASync, puisque le fait de retourner un IObservable est une indication claire et suffisante que l'opération est asynchrone).
Si l'on reprend le code de l'exemple précédent, on a tout d'abord les vérifications de base sur la validité des arguments donnés. On peut déjà remplacer les tests sur la nullité des arguments par des appels à l'API de Code Contracts. Reste le test sur l'existance du dossier où télécharger le fichier, qui à mon sens n'est pas un contrat. Il ne doit donc pas terminer l'application comme le ferait Contract.Requires (via un appel à Debug.FailFast), mais doit envoyer une exception, que l'on peut gérer à un niveau supérieur, pour permettre à l'utilisateur de choisir un autre dossier de destination.
Vient ici une règle que je trouve de base lorsqu'on a une fonction qui retourne un IObservable: toutes les exceptions doivent pouvoir être traitées par le canal OnError, lorsqu'un observer souscrit à l'observable. Pour ce faire, il faut donc que la fonctionne retourne un observable, qui va lui-même envoyer une exception:
public class WebFileDownloader
{
IObservable<DownloadProgress> DownloadFile(string remoteFilePath,
string downloadFolder,
bool allowResume = false)
{
if (!Directory.Exists(downloadFolder))
return Observable.Throw<DownloadProgress>(new DirectoryNotFoundException());
//...
}
}
Observable.Throw est l'une des nombreuses méthodes d'extension qui existent dans l'espace de noms des reactive extensions. Cette méthode d'extension ne va envoyer aucune valeur aux observers, mais va leur transmettre l'exception que vous lui passez en argument, afin de stopper l'observable. Et vous pourrez donc récupérer cette exception dans le OnError de l'observer.
On crée ensuite le WebRequest, et on regarde si on peut activer la complétion du téléchargement:
var wreq = (HttpWebRequest)WebRequest.Create(uri);
var fi = new FileInfo(tmpLocalFile);
if (fi.Exists)
{
if (allowResume)
wreq.AddRange((int)fi.Length);
else
File.Delete(tmpLocalFile);
}
else
allowResume = false;
Maintenant que la base est là, entrons dans le vif du sujet.
Première étape: contacter le serveur grâce aux fonctions BeginGetResponse et EndGetResponse. Comme lors de l'article sur la TPL, il existe au sein des Reactive Extensions une fonction helper pour gérer les fonctions asynchrones qui nécessitent un callback, comme justement BeginGetResponse et EndGetResponse. Cette fonction helper retournant un Func<IObservable<TType>>, avec TType qui correspond au type de retour de la fonction Endxxx. Pour simplifier l'utilisation de l'appel au serveur, on va mettre cette fonction dans une méthode d'extension:
public static class WebRequestExtensions
{
public static IObservable<WebResponse> GetResponseAsync(this WebRequest webRequest)
{
return Observable.FromAsyncPattern<WebResponse>(webRequest.BeginGetResponse,
webRequest.EndGetResponse)();
}
}
Une fois que nous avons la réponse du serveur, nous devons appeler la fonction GetResponseStream depuis l'objet WebResponse, puis copier le contenu de ce Stream dans un FileStream sur le disque dur. Nous devons être capable de suivre la progression du téléchargement, et de l'interrompre à tout moment. (Bien entendu, la gestion des erreurs et de la fin du téléchargement sont pris en charge automatiquement par le framework Rx) Pour suivre la progression, nous allons tout simplement faire retourner à la fonction DownloadFile que nous allons écrire un IObservable<DownloadProgress>, qui est ainsi définie:
public class DownloadProgress
{
private readonly long downloadedBytes, totalBytes;
private readonly bool starting;
public static readonly DownloadProgress Empty = new DownloadProgress(0,0);
public long DownloadedBytes { get { return downloadedBytes; } }
public long TotalBytes { get { return totalBytes; } }
public double Percentage
{
get
{
return (totalBytes == 0)
? (starting)
? 0
: 100
: 100 * downloadedBytes / (double)totalBytes;
}
}
public DownloadProgress(long downloadedBytes, long totalBytes)
{
this.downloadedBytes = downloadedBytes;
this.totalBytes = totalBytes;
}
public override string ToString()
{
return Percentage.ToString("P", CultureInfo.CurrentCulture);
}
}
Rien de compliqué...
Voici maintenant le tronc de la fonction DownloadFile, qui sera également une méthode d'extension pour HttpWebRequest:
public static IObservable<DownloadProgress> DownloadFile(this HttpWebRequest request,
string destinationPath,
bool allowResume, IScheduler scheduler)
{
return
from response in request.GetResponseAsync().Cast<HttpWebResponse>()
from progress in Observable.Using(response.GetResponseStream,
responseStream =>
{
// Copy GetResponseStream in a FileStream
})
select progress;
}
Comme vous le voyez, et c'est là une de ses forces, nous manipulons les extensions Rx comme nous utilisons LINQ. Ce que fait ce début de fonction est simple à lire et à comprendre: on récupère la réponse du serveur, on la caste en HttpWebResponse, on utilise le GetResponseStream avec l'extension Observable.Using (qui va comme vous vous en doutez libérer la ressource automatiquement en appelant Dispose) dans une clause SelectMany (le deuxième from).
Voyons maintenant ce que va donner le code de la copie du stream retourné par le serveur, vers un fichier sur le disque. Le code d'abord, les explications ensuite:
public static IObservable<DownloadProgress> DownloadFile(this HttpWebRequest request,
string destinationPath,
bool allowResume,
IScheduler scheduler)
{
return from response in request.GetResponseAsync().Cast<HttpWebResponse>()
from progress in Observable.Using(response.GetResponseStream,
responseStream =>
ObservableWorker.CreateWithDisposable<DownloadProgress>(scheduler, worker =>
{
var cancel = new BooleanDisposable();
var disposables = new CompositeDisposable(cancel, worker);
worker.Schedule(() =>
{
if (cancel.IsDisposed)
return;
var file = new FileInfo(destinationPath);
allowResume = allowResume
&& response.StatusCode == HttpStatusCode.PartialContent
&& file.Exists;
long totalRead = 0;
long length = response.ContentLength;
Action notifyProgress = () =>
{
if (length > 0)
worker.OnNext(new DownloadProgress(totalRead, length));
};
using (var destinationStream = new FileStream(destinationPath,
allowResume
? FileMode.Append
: FileMode.Create,
FileAccess.Write))
{
file.Refresh();
totalRead = file.Length;
length += totalRead;
if (cancel.IsDisposed)
return;
notifyProgress();
var buffer = new byte[1024];
int read;
while ((read = responseStream.Read(buffer, 0, buffer.Length)) > 0)
{
if (cancel.IsDisposed)
return;
totalRead += read;
notifyProgress();
destinationStream.Write(buffer, 0, read);
}
}
if (length <= 0)
{
length = totalRead;
notifyProgress();
}
worker.OnCompleted();
});
return disposables;
}))
select progress;
}
Quand on regarde la définition de Observable.Using:
IObservable<TSource> Using<TSource, TResource>(Func<TResource> resourceSelector, Func<TResource, IObservable<TSource>> resourceUsage)
On voit que le deuxième paramètre est un Func qui prend comme premier argument l'instance du type défini par l'argument resourceSelector, et qui retourne un IObservable<TSource>. Dans notre cas, nous avons DownloadProgress comme TSource. C'est ce que va retourner la fonction:
ObservableWorker.CreateWithDisposable<DownloadProgress>()
Ne cherchez pas ObservableWorker dans les extensions fournies par Rx, cette classe n'existe pas. Je vous en donnerai le code source à la fin de cet article. Sachez juste qu'elle permet de créer un observable et de faciliter la gestion des exceptions quand la séquence est observée. Car lorsque vous observez un Observable, il faut bien différencier les exceptions lancées par l'observer (lorsqu'il applique un traitement aux valeurs qu'il reçoit) et les exceptions lancées par ce qui génère les valeurs (et qui doivent donc être passées au canal OnError). Mais ce sujet sera sûrement l'occasion d'un article ultérieur.
La première chose que nous faisons lorsque nous appelons CreateWithDisposable est de créer ces 2 membres, qui vont nous permettre de gérer l'annulation du téléchargement.
var cancel = new BooleanDisposable();
var disposables = new CompositeDisposable(cancel, worker);
Le membre cancel va nous permettre de savoir lors de la copie du stream distant vers le stream local si l'appelant a annulé l'opération. Si tel est le cas, la propriété IsDisposed de cancel va prendre la value true. Cette propriété va être changée automatiquement par le membre disposables, de type CompositeDisposable, lorsque ce dernier va être "disposé" lors de l'annulation. CompositeDisposable garde en interne une liste d'objets implémentant IDisposable, et appelle leur fonction Dispose quand lui-même est disposé.
Puis lorsque les 2 disposables sont crées, on prépare la copie sur le scheduler passé en paramètre. Cette dernière est très simple: on regarde si le fichier que l'on veut télécharger existe déjà sur le disque, on met en place une éventuelle reprise du téléchargement, et on lance la copie du stream, séquentiellement. Pas de craintes à avoir avec un blocage du thread, puisqu'en choisissant Scheduler.TaskPool comme scheduler par défaut, la copie du stream se fera au sein d'un nouveau thread.
Il est important toutefois de noter:
- comment on envoie les données aux observers: grâce à la fonction worker.OnNext, qui en interne va appeler la fonction OnNext de l'observer que la classe ObservableWorker aura créé en interne, en lui passant une instance de la classe DownloadProgress dont le constructeur a pour arguments le nombre de bytes téléchargés, et le nombre de bytes total à télécharger
- comment on signale la fin du téléchargement: via l'appel à worker.OnCompleted(), qui là aussi en interne va appeler OnCompleted sur son observer.
Si on revient à notre fonction WebFileDownloader.DownloadFile que nous avons commencé à définir au début de cet article, et en utilisant la méthode d'extension que nous venons d'écrire, nous avons donc:
public IObservable<FileDownloadProgress> DownloadFile(string remoteFilePath,
string destinationPath,
bool allowResume = false)
{
var uri = new Uri(remoteFilePath);
if (uri.Segments.Length <= 1)
return Observable.Throw<FileDownloadProgress>(new InvalidUriException(uri));
if (!Directory.Exists(Path.GetDirectoryName(destinationPath)))
return Observable.Throw<FileDownloadProgress>(new DirectoryNotFoundException());
if (string.IsNullOrEmpty(Path.GetFileName(destinationPath)))
return Observable.Throw<FileDownloadProgress>(new ArgumentException("The file name of the destination path must be defined"));
if (string.IsNullOrEmpty(Path.GetExtension(destinationPath)))
return Observable.Throw<FileDownloadProgress>(new ArgumentException("The extension of the destination path must be defined"));
var wreq = (HttpWebRequest)WebRequest.Create(uri);
var fi = new FileInfo(destinationPath);
if (fi.Exists)
{
if (allowResume)
wreq.AddRange((int)fi.Length);
else
File.Delete(tmpLocalFile);
}
else
allowResume = false;
return wreq.DownloadFile(destinationPath, allowResume);
}
Le tour est joué, et le "cahier des charges" est respecté: on est informé de la progression du téléchargement, de sa fin, des éventuelles erreurs qui peuvent apparaître, et on a la possibilité de l'annuler en cours de route.
On peut ajouter une nouvelle fonctionnalité à cette fonction, qui est de télécharger le fichier avec un nom temporaire, et de renommer le fichier avec son nom définitif en cas de succès de téléchargement. Pour cela, rien de plus simple. Il suffit d'utiliser la méthode d'extension Do, définie comme suit:
IObservable<TSource> Do<TSource>(this IObservable<TSource> source, Action<TSource> onNext)
IObservable<TSource> Do<TSource>(this IObservable<TSource> source, Action<TSource> onNext, Action onCompleted)
IObservable<TSource> Do<TSource>(this IObservable<TSource> source, Action<TSource> onNext, Action<Exception> onError)
IObservable<TSource> Do<TSource>(this IObservable<TSource> source, Action<TSource> onNext, Action<Exception> onError, Action onCompleted);
Cette méthode d'extension permet d'exécuter une action à chaque fois que l'observable envoie une donnée, génère une erreur, ou se termine, et retourne le même observable que celui à qui il est appliqué. C'est une fonction qui peut se révéler très utile pour ajouter du log dans les évènements de l'observable sans avoir à le modifier.
Si on reprend notre exemple, on peut tracer la progression du téléchargement, et renommer le fichier temporaire grâce à cette fonction. Ce qui nous donne:
string tmpLocalFile = destinationPath + ".tmp";
//...
string pathMovedFile = tmpLocalFile.Substring(0, tmpLocalFile.Length - 4);
return from downloadProgress in wreq.DownloadFile(tmpLocalFile, allowResume)
.Do(progress => Logger.Debug("Progression {0} %", progress.Percentage),
error => { },
() =>
{
Logger.Verbose("Download completed !!!");
if (File.Exists(pathMovedFile))
File.Delete(pathMovedFile);
Logger.Verbose("Move {0} to {1}", tmpLocalFile, pathMovedFile);
File.Move(tmpLocalFile, pathMovedFile);
})
select downloadProgress;
Voilà pour cet article. J'ai longuement hésité à le publier, parce qu'il ne me semble pas être une introduction très facile à cette fabuleuse API qu'est Rx. Mais comme il existe déjà pas mal d'articles sur le net qui peuvent faire office de présentation, je me suis finalement décidé. Et je me suis dit que c'était une idée pas trop mauvaise que de re-coder l'exemple du téléchargement de fichier, car ça permet de comparer entre la version TPL et celle-ci, qui je dois bien l'avouer a gagné mes faveurs, tant elle est simple à comprendre.
En tout cas, attendez vous à ce que je poste quelques articles supplémentaires sur Rx dans les prochains jours, qui porteront sur l'utilisation de différentes méthodes d'extension, afin de répondre à des problèmes concrets. Il y en a 5 en attente pour le moment, mais j'attendais d'avoir écrit cet article "introductif" avant de les publier.
En espérant que cela vous a plu!
++
Comments
Je confirme, comme première découverte de cette API, j'ai gagné un mal de tête :]
Je confesse que ce n'est sûrement pas la meilleure introduction qui soit à Rx, mais comme je le disais, vu qu'il existe déjà quelques articles de présentation à droite à gauche, j'ai voulu partir sur quelque chose de plus concret.
Mais j'espère que les nouveaux articles seront plus digestes :)