TPL et téléchargement: suite et fin
By Michael DELVA on Thursday 17 December 2009, 18:02 - .NET - Permalink
TweetAprès le précédent article d'introduction à la TPL et l'exemple de code de téléchargement d'un fichier sur internet, voici la suite, et fin, avec l'implémentation de la reprise, de la progression et de la fin (par annulation, ou à cause d'une erreur) du téléchargement.
Le code est quasiment le même que ce que j'ai montré la dernière fois. Pour implémenter ce qui manque, il suffit de passer à la chaine des fonctions appelées quelques paramètres supplémentaires.
Pour commencer, ma classe doit implémenter une interface que j'avais utilisé lors de mon implémentation traditionnelle avec le pattern APM de WebRequest et de Stream, et que j'avais validé via des Unit-Tests :
public interface IFileDownloader
{
event EventHandler<DownloadProgressEventArgs> DownloadProgress;
event EventHandler<DownloadFinishedEventArgs> DownloadCompleted;
event EventHandler<DownloadStartedEventArgs> DownloadStarted;
void DownloadFileASync(string remoteFilePath, string downloadFolder, bool allowResume = false);
void CancelDownloadASync();
}
Vous remarquerez déjà le bool allowResume = false; qui utilise les paramètres optionnels apportés par le .NET 4 (même sur les interfaces)
Je vais commencer par vous présenter les versions modifiées des fonctions que j'ai présentées la dernière fois, à commencer par la méthode d'extension DownloadDataInFileSync:
public static Task<WebResponse> GetResponseAsync(this WebRequest webRequest)
{
if (webRequest == null) throw new ArgumentNullException("webRequest");
return Task<WebResponse>.Factory.FromAsync(webRequest.BeginGetResponse, webRequest.EndGetResponse, webRequest);
}
public static Task DownloadDataInFileAsync(this WebRequest webRequest,
string destinationPath,
CancellationToken ct,
Action<bool, bool, Exception> endAction,
bool resumeDownload = false,
Action<long> totalBytesAction = null,
Action<long> progressAction = null)
{
return webRequest.GetResponseAsync()
.ContinueWith(response =>
{
HttpWebResponse httpWebResponse;
try
{
httpWebResponse = (HttpWebResponse)response.Result;
if (totalBytesAction != null)
totalBytesAction(response.Result.ContentLength);
}
catch (AggregateException aggregateException)
{
Exception firstException = aggregateException.InnerExceptions[0];
endAction(false, firstException is OperationCanceledException, firstException);
return;
}
resumeDownload = resumeDownload && (httpWebResponse.StatusCode == HttpStatusCode.PartialContent && File.Exists(destinationPath));
httpWebResponse.GetResponseStream().CopyStreamToFileAsync(destinationPath, ct, endAction, resumeDownload, progressAction);
}, ct);
}
Vous pouvez noter l'ajout des paramètres supplémentaires par rapport à la fonction d'origine: le CancellationToken qui permettra d'annuler l'opération, endAction qui sera appelé quand le processus sera terminé, resumeDownload, dont le nom ne nécessite pas d'explications supplémentaires, totalBytesAction qui va retourner le nombre total de bytes à télécharger sur le fichier distant, et progressAction, qui va retourner le nombre de bytes lus.
J'en profite pour vous conseiller de lire ce lien pour en apprendre plus sur la manière d'annuler les opérations réalisées par les tâches en .NET 4.
Ici on entoure l'appel à response.Result dans un try / catch, car si vous avez créé le WebRequest avec un fichier introuvable (erreur 404), ou que vous n'avez pas de connexion internet et que vous partez en timeout, bref si une exception est lancée lors de l'appel à BeginGetResponse ou EndGetResponse, c'est là que vous catchez l'exception. En utilisant les tâches, l'exception qui sera lancée sera très souvent une AggregateException, qui va contenir dans sa propriété InnerExceptions la liste de toutes les exceptions lancées pendant l'exécution de la tâche. Si l'opération a été annulée, vous aurez comme première exception dans cette propriété une exception de type OperationCanceledException.
Donc si une exception survient, on la catche, on regarde si c'est parce que l'opération a été annulée, on appelle endAction pour signaler la fin du téléchargement, et on quitte.
Si par contre aucune erreur n'est apparue, on continue le traitement en récupérant le nombre de bytes à télécharger via la propriété ContentLength de la classe WebResponse, à laquelle on accède via la propriété Result de la tâche exécutée précédemment (donc l'appel à BeginGetResponse / EndGetResponse). Et on informe l'appelant du nombre de bytes à télécharger via l'utilisation du paramètre totalBytesAction.
On autorise la reprise du téléchargement si l'appelant l'a lui-même autorisé, mais si le serveur le supporte aussi (httpWebResponse.StatusCode == HttpStatusCode.PartialContent) et si une partie du fichier déjà téléchargé est déjà sur le disque.
Et pour finir on lance le téléchargement / copie du stream distant vers le stream local, en appelant la méthode d'extension CopyStreamToFileAsync sur le stream distant:
public static Task CopyStreamToFileAsync(this Stream source, string destinationPath, CancellationToken ct, Action<bool, bool, Exception> endAction, bool resumeDownload = false, Action<long> progressAction = null)
{
if (source == null) throw new ArgumentNullException("source");
if (destinationPath == null) throw new ArgumentNullException("destinationPath");
// Open the output file for writing
var destinationStream = FileAsync.OpenWrite(destinationPath, resumeDownload);
// Copy the source to the destination stream, then close the output file.
return CopyStreamToStreamAsync(source, destinationStream, ct, progressAction).ContinueWith(t =>
{
var e = t.Exception;
destinationStream.Close();
endAction(e == null, false, e);
if (e != null)
throw e;
}, ct, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Current);
}
On commence ici par ouvrir le fichier en asynchrone. Le flag resumeDownload va servir à changer le mode de création du FileStream (FileMode.Append ou FileMode.Create), puis on lance notre itérateur asynchrone de copie de stream, CopyStreamToStreamAsync, en ayant pris soin de lui ajouter 2 nouveaux paramètres: le CancellationToken qui va permettre l'annulation de la copie, et le progressAction, pour savoir où on en est de la copie. Quand la copie du stream sera terminée, on enchainera avec la fermeture du stream local, la notification à l'appelant de la fin de la tâche, et l'exception qui aura pu avoir eu lieu pendant le traitement.
public static Task CopyStreamToStreamAsync(this Stream source, Stream destination, CancellationToken ct, Action<long> progressAction = null)
{
if (source == null) throw new ArgumentNullException("source");
if (destination == null) throw new ArgumentNullException("destination");
return Task.Factory.Iterate(CopyStreamIterator(source, destination, ct, progressAction));
}
Ici rien de bien compliqué, on a juste ajouté le CancellationToken à l'appel de CopyStreamIterator:
private static IEnumerable<Task> CopyStreamIterator(Stream input, Stream output, CancellationToken ct, Action<long> progressAction = null)
{
// Create two buffers. One will be used for the current read operation and one for the current
// write operation. We'll continually swap back and forth between them.
byte[][] buffers = new byte[2][] { new byte[BUFFER_SIZE], new byte[BUFFER_SIZE] };
int filledBufferNum = 0;
Task writeTask = null;
int readBytes = 0;
while (!ct.IsCancellationRequested)
{
// Read from the input asynchronously
var readTask = input.ReadAsync(buffers[filledBufferNum], 0, buffers[filledBufferNum].Length);
// If we have no pending write operations, just yield until the read operation has
// completed. If we have both a pending read and a pending write, yield until both the read
// and the write have completed.
yield return writeTask == null
? readTask
: Task.Factory.ContinueWhenAll(new[]
{
readTask,
writeTask
},
tasks => tasks.PropagateExceptions());
// If no data was read, nothing more to do.
if (readTask.Result <= 0)
break;
readBytes += readTask.Result;
if (progressAction != null)
progressAction(readBytes);
// Otherwise, write the written data out to the file
writeTask = output.WriteAsync(buffers[filledBufferNum], 0, readTask.Result);
// Swap buffers
filledBufferNum ^= 1;
}
}
Ici, tout simplement, j'ai remplacé le while(true) initial par un test sur la propriété IsCancellationRequested, et j'appelle à chaque boucle progressAction avec le nombre de bytes lus, que j'incrémente à chaque passage.
Et pour utiliser le tout, voici la classe implémentant l'interface initiale:
Maintenant la classe qui implémente cette interface:
public class WebFileDownloader : IFileDownloader
{
private CancellationTokenSource cancellationTokenSource;
public event EventHandler<DownloadProgressEventArgs> DownloadProgress = delegate { };
public event EventHandler<DownloadFinishedEventArgs> DownloadCompleted = delegate { };
public event EventHandler<DownloadStartedEventArgs> DownloadStarted = delegate { };
/// <exception cref="ArgumentNullException">Argument is null.</exception>
/// <exception cref="DirectoryNotFoundException"><c>DirectoryNotFoundException</c>.</exception>
/// <exception cref="FileDownloaderException">The remote file path is invalid</exception>
public void DownloadFileASync(string remoteFilePath, string downloadFolder, bool allowResume = false)
{
if (string.IsNullOrEmpty(remoteFilePath))
throw new ArgumentNullException("remoteFilePath");
if (string.IsNullOrEmpty(downloadFolder))
throw new ArgumentNullException("downloadFolder");
if (!Directory.Exists(downloadFolder))
throw new DirectoryNotFoundException();
try
{
var uri = new Uri(remoteFilePath);
string localFile = Path.Combine(downloadFolder, uri.Segments[uri.Segments.Length - 1]);
string tmpLocalFile = localFile + ".tmp";
// Create the request object.
var wreq = (HttpWebRequest)WebRequest.Create(uri);
var fi = new FileInfo(tmpLocalFile);
if (fi.Exists)
{
if (allowResume)
{
wreq.AddRange((int)fi.Length);
}
else
{
File.Delete(tmpLocalFile);
}
}
else
//No need to resume if it has not been downloaded yet
allowResume = false;
DownloadStarted(this, new DownloadStartedEventArgs(remoteFilePath));
long totalBytes = 0;
cancellationTokenSource = new CancellationTokenSource();
cancellationTokenSource.Token.Register(() => SetEndDownload(false, true, null, tmpLocalFile, 0));
wreq.DownloadDataInFileAsync(tmpLocalFile,
cancellationTokenSource.Token,
(completed, canceled, exception) => SetEndDownload(completed, canceled, exception, tmpLocalFile, 0),
allowResume,
totalBytesAction => { totalBytes = totalBytesAction; },
readBytes =>
{
DownloadProgress(this, new DownloadProgressEventArgs(remoteFilePath, readBytes, totalBytes, (int) (100*readBytes/totalBytes)));
});
}
catch (SystemException e)
{
throw new FileDownloaderException("Impossible to start the download", e);
}
}
public void CancelDownloadASync()
{
cancellationTokenSource.Cancel();
}
private void SetEndDownload(bool completed, bool canceled, Exception e, string downloadedFilePath, long downloadedBytes)
{
string localFile = downloadedFilePath.Substring(0, downloadedFilePath.Length - 4);
if (completed)
{
if (File.Exists(localFile))
File.Delete(localFile);
File.Move(downloadedFilePath, localFile);
}
DownloadCompleted(this, new DownloadFinishedEventArgs(completed, canceled, e, localFile, downloadedBytes));
}
}
Rien de très compliqué. On appelle les évènements qui vont bien pour renseigner l'appelant de cette classe des différentes phases du téléchargement, et s'il veut annuler, il appelle tout simplement la fonction CancelDownloadASync(), qui va elle même appeler Cancel() du CancellationTokenSource, ce qui aura pour effet à la fin de sortir de la boucle de CopyStreamIterator.
Voilà, rien de révolutionnaire dans ce que j'ai ajouté, mais ça permet de montrer qu'en très peu de code on peut arriver à un contrôle complet de ce qui se passe dès lors que le téléchargement est lancé.
N'hésitez pas à me laisser vos commentaires pour me dire ce que vous en pensez, ou si vous avez des questions ou des idées d'amélioration ;)