Introduction aux Parallel Extensions : Télécharger un fichier en asynchrone
By Michael DELVA on Thursday 17 December 2009, 00:37 - .NET - Permalink
TweetVoici mon tout premier article sur les tant attendues (du moins par moi) Parallel Extensions, la librairie de threading .NET 4.0 qui va tout révolutionner (ceci est un avis hautement subjectif j'en conviens).
Au menu de cet article, un cas concret: télécharger un fichier sur internet, de manière asynchrone. Avec en plus la possibilité de stopper le téléchargement, de connaître la progression, et de pouvoir reprendre un téléchargement interrompu.
J'avais déjà codé une classe permettant de faire tout ça en .NET 3.5, en me basant sur ce code de la MSDN. Et c'est en lisant le blog de la team PFX que j'ai décidé de me jeter dans le monde du Parallel Programming 4.0.
Si vous ne connaissez pas du tout cette librairie, je vous encourage vivement à aller lire quelques articles à son sujet, afin de savoir de quoi je vais parler ici :)
Avant de commencer, je me dois de vous dire que je n'ai rien inventé du code de base que vous allez trouver dans cet article :) Rendons à César ce qui lui appartient, tout ou presque est issu des exemples que vous pouvez télécharger ici. J'ai uniquement rajouté la possibilité d'annuler le téléchargement, d'être averti de la fin de l'opération, et de connaître sa progression, des broutilles en somme, le but de cet article étant plus une présentation poussée de ce qu'il est possible de faire.
La classe du framework .NET qui permet de télécharger des fichiers sur internet est WebRequest. Cette classe utilise le pattern APM (Asynchronous Pattern Model) pour rendre certaines de ses opérations asynchrones. Qu'est-ce que APM? En voici une description par la PFX team qui résume bien le concept:
The Asynchronous Programming Model (APM) in the .NET Framework has been around since .NET 1.0 and is the most common pattern for asynchrony in the Framework. Even if you’re not familiar with the name, you’re likely familiar with the core of the pattern. For a given synchronous operation Xyz, the asynchronous version manifests as BeginXyz and EndXyz: BeginXyz starts the operation and the EndXyz method joins with it (completes it). There are several mechanisms by which the results can be joined with, such as by polling for completion using the IsCompleted property on the IAsyncResult returned from BeginXyz, blocking until the operation has completed by waiting on the IAsyncResult’s AsyncWaitHandle, simply calling EndXyz and passing it the IAsyncResult (which will block until the operation has completed), or passing to BeginXyz a callback which will be invoked when the operation has completed: that callback should then call EndXyz to retrieve the results.
C'est donc un pattern que l'on retrouve assez souvent dans le framework dès lors qu'un mode asynchrone est proposé par une classe. C'est utilisé comme on l'a dit précédemment par WebRequest, mais également par la classe Stream pour réaliser des lectures / écritures asynchrones, comme on le verra plus tard.
La première question que l'on peut se poser, c'est comment utiliser ce pattern avec la classe Task de la TPL (Task Parallel Library)?
C'est très facile :) Comme le pattern APM est très utilisé, les développeurs de la librairie ont intégré à la classe TaskFactory une fonction (et une multitude d'overloads) encapsulant ce pattern: FromASync, et dont vous avez un article le concernant ici. Comme vous le voyez, il suffit d'appeler Task<T>.Factory.FromAsync en passant comme premier argument la fonction qui va déclencher l'opération asynchrone, comme deuxième argument la fonction qui va la terminer, et en troisième argument l'objet contenant l'état à utiliser dans la fonction BeginXXX.
Donc pour notre code, l'appel initial à BeginGetResponse et EndGetResponse devient, en utilisant une méthode d'extension:
public static Task<WebResponse> GetResponseAsync(this WebRequest webRequest)
{
if (webRequest == null) throw new ArgumentNullException("webRequest");
return Task<WebResponse>.Factory.FromAsync(webRequest.BeginGetResponse, webRequest.EndGetResponse);
}
Grâce à cette seule ligne de code, vous créez donc une tâche asynchrone qui va contacter le serveur web sur l'adresse que vous avez donné en argument à la construction de la requête, attendre toute seule la réponse, et retourner le résultat de cette requête dans le WebResponse. L'enchainement d'appel des fonctions étant géré en interne par la TPL.
C'est déjà beaucoup plus simple comme ça non? Et ça ne fait que commencer! :)
Bien, nous avons donc une tâche qui va s'exécuter, et qui va retourner un objet WebResponse quand elle sera terminée. Il nous faut donc maintenant récupérer, comme dans le code initial, la référence vers le stream distant (ResponseStream), puis lire ce stream séquentiellement, et en écrire le contenu dans un fichier. Et c'est là que, au choix, les choses se corsent, ou qu'on trouve le code de toute beauté. (En ce qui me concerne, j'ai choisi mon camp).
Il va de soit que la lecture du stream et l'écriture du contenu dans un fichier (donc dans un autre stream) doit se faire uniquement lorsque la tâche initiale appelant BeginGetResponse est terminée. Pour enchaîner les tâches, il existe une fonction ContinueWith de la classe Task, qui va créer et déclencher une nouvelle tâche dès que la tâche appelante est terminée.
On a donc pour le moment le code suivant:
public static Task<WebResponse> GetResponseAsync(this WebRequest webRequest)
{
if (webRequest == null) throw new ArgumentNullException("webRequest");
return Task<WebResponse>.Factory.FromAsync(webRequest.BeginGetResponse, webRequest.EndGetResponse);
}
public static Task DownloadDataInFileAsync(this WebRequest webRequest, string destinationPath)
{
return webRequest.GetResponseAsync()
.ContinueWith(response =>
{
Stream distantStream = response.Result.GetResponseStream();
// Copier le contenu du stream distant dans un stream local correspondant à un fichier
});
}
Pour copier le contenu du stream correspondant au fichier distant dans un fichier local, le tout en asynchrone, nous devons d'abord créer le fichier local (en mode asynchrone bien entendu) :
const int BUFFER_SIZE = 0x2000;
Stream fileStream = new FileStream(destinationPath, FileMode.Create, FileAccess.Write, FileShare.None, BUFFER_SIZE, true);
Vient maintenant la copie du stream. La première chose à faire, c'est de s'assoir correctement, parce que là ça devient rock'n roll :)
Puis ensuite de lire cet article, parce que tout ce qui suit y est expliqué. Mais je vais quand même essayer de décortiquer tout ça pour vous.
Pour copier le contenu du stream, nous allons utiliser un itérateur asynchrone. C'est à dire un IEnumerable<Task> qui va être créé et parcouru à la volée de manière asynchrone, tant qu'il y aura des bytes à copier dans le stream d'origine vers le stream de destination.
On commence par deux extensions wrappant les fonctions de lecture et d'écriture asynchrone de la classe Stream. On en a parlé avant, on réutilise la fonction FromAsync pour wrapper le pattern APM. Vous remarquerez qu'on passe en plus des fonctions de début et de fin, les paramètres nécessaires aux fonction BeginRead et BeginWrite.
public static Task<int> ReadAsync(this Stream stream, byte[] buffer, int offset, int count)
{
return Task<int>.Factory.FromAsync(stream.BeginRead, stream.EndRead, buffer, offset, count);
}
public static Task WriteAsync(this Stream stream, byte[] buffer, int offset, int count)
{
return Task.Factory.FromAsync(stream.BeginWrite, stream.EndWrite, buffer, offset, count);
}
Voyons maintenant la fonction qui va créer notre IEnumerable<Task>, correspondant à l'itérateur asynchrone:
private static IEnumerable<Task> CopyStreamIterator(Stream input, Stream output)
{
// Create two buffers. One will be used for the current read operation and one for the current
// write operation. We'll continually swap back and forth between them.
byte[][] buffers = new byte[2][] { new byte[BUFFER_SIZE], new byte[BUFFER_SIZE] };
int filledBufferNum = 0;
Task writeTask = null;
// Until there's no more data to be read or cancellation
while (true)
{
// Read from the input asynchronously
var readTask = input.ReadAsync(buffers[filledBufferNum], 0, buffers[filledBufferNum].Length);
// If we have no pending write operations, just yield until the read operation has
// completed. If we have both a pending read and a pending write, yield until both the read
// and the write have completed.
yield return writeTask == null
? readTask
: Task.Factory.ContinueWhenAll(new[]
{
readTask,
writeTask
},
tasks => tasks.PropagateExceptions());
// If no data was read, nothing more to do.
if (readTask.Result <= 0)
break;
// Otherwise, write the written data out to the file
writeTask = output.WriteAsync(buffers[filledBufferNum], 0, readTask.Result);
// Swap buffers
filledBufferNum ^= 1;
}
}
Le principe est simple: on commence par lire un segment du stream d'origine dans un premier buffer, dans une nouvelle tâche créée par l'appel à input.ReadAsync, qu'on ajoute à l'énumération. Puis à chaque passage, tant qu'on arrive à lire dans le fichier d'entrée, on crée une nouvelle tâche d'écriture asynchrone dans le fichier de sortie. Et comme on ne peut pas retourner la tâche d'écriture directement, parce que la lecture et l'écriture se doivent d'être synchronisés, on va ajouter à l'énumération une nouvelle tâche, créée grâce à la fonction ContinueWhenAll. Cette fonction se charge de déclencher une action dans une nouvelle tâche lorsque les tâches données en paramètre sont terminées. Donc à chaque passage on lit un buffer, on écrit le buffer lu lors du passage précédent, on attend que les 2 opérations soient terminées, et on continue ainsi de suite jusqu'à lecture complète du stream d'entrée.
Nous venons de voir comment créer l'énumérateur. Il nous faut encore exécuter les tâches qu'il contient. C'est le rôle de l'extension Iterate de la classe Factory (encore une fois, se référer à ce lien pour les explications d'origine) :
public static Task Iterate(TaskFactory factory, IEnumerable<Task> asyncIterator, object state, TaskCreationOptions creationOptions, TaskScheduler scheduler)
{
// Validate parameters
if (factory == null) throw new ArgumentNullException("factory");
if (asyncIterator == null) throw new ArgumentNullException("asyncIterator");
if (scheduler == null) throw new ArgumentNullException("scheduler");
// Get an enumerator from the enumerable
var enumerator = asyncIterator.GetEnumerator();
if (enumerator == null) throw new InvalidOperationException();
// Create the task to be returned to the caller. And ensure
// that when everything is done, the enumerator is cleaned up.
var trs = new TaskCompletionSource<object>(state, creationOptions);
trs.Task.ContinueWith(_ => enumerator.Dispose(), factory.CancellationToken, TaskContinuationOptions.None, scheduler);
// This will be called every time more work can be done.
Action<Task> recursiveBody = null;
recursiveBody = antecedent =>
{
try
{
// If the previous task completed with any exceptions, bail
if (antecedent != null && antecedent.IsFaulted)
trs.TrySetException(antecedent.Exception.InnerExceptions);
// Else if the user requested cancellation, bail.
else if (factory.CancellationToken.IsCancellationRequested || (antecedent != null && antecedent.IsCanceled))
trs.TrySetCanceled();
// Else if we should continue iterating and there's more to iterate
// over, create a continuation to continue processing. We only
// want to continue processing once the current Task (as yielded
// from the enumerator) is complete.
else if (enumerator.MoveNext())
enumerator.Current.ContinueWith(recursiveBody, factory.CancellationToken, TaskContinuationOptions.None, scheduler).IgnoreExceptions();
// Otherwise, we're done!
else trs.TrySetResult(null);
}
// If MoveNext throws an exception, propagate that to the user
catch (Exception exc) { trs.TrySetException(exc); }
};
// Get things started by launching the first task
factory.StartNew(() => recursiveBody(null), CancellationToken.None, TaskCreationOptions.None, scheduler).IgnoreExceptions();
// Return the representative task to the user
return trs.Task;
}
La clé de voute de ce code ici est la classe TaskCompletionSource (MSDN).
On va ici créer le membre Action<Task> recursiveBody , qui est un delegate prenant une tâche en paramètre. Cette tâche correspondant à la tâche qui a été exécutée avant d'appeler le delegate. Ça semble un peu obscur dit comme ça, mais c'est en fait très simple: on va exécuter cette action tout d'abord une première fois avec l'argument null, comme il n'y a pas de tâche précédente à la première exécution. Puis pour chaque tâche de l'énumération, on récupère la tâche courante via enumerator.Current, on la chaîne avec l'appel de la tâche suivante via le paramètre recursiveBody dans le ContinueWith, qui va automatiquement donner comme paramètre à recursiveBody la tâche courante dans laquelle on se trouve! D'où le récursif :) Et en cas d'erreur d'exécution ou d'annulation d'une tâche, on met à jour le statut du TaskCompletionSource, ce qui aura pour effet d'arrêter l'énumération des tâches en passant le statut de la tâche créée par le TaskCompletionSource à Faulted ou Canceled.
Vous aurez remarqué également que l'on n'oublie pas d'appeler Dispose sur l'énumérateur à la fin de l'énumération grâce à (la tâche contenue dans trs étant bien entendu le parcours de l'énumération):
trs.Task.ContinueWith(_ => enumerator.Dispose(), factory.CancellationToken, TaskContinuationOptions.None, scheduler);
Je n'ai pas mis le code des extensions PropagateExceptions et IgnoreExceptions parce qu'il y a déjà beaucoup de code, mais vous pouvez bien entendu les trouver dans les {samples|http://code.msdn.microsoft.com/ParExtSamples|en].
J'espère que vous êtes toujours là, et surtout que mes explications auront réussi à rendre tout ça un peu plus clair. Mais rassurez-vous, nous arrivons bientôt à la fin de notre petit périple :)
Nous avons donc un helper pour créer notre IEnumerable<Task> de lecture / écriture d'un stream vers un autre, et un helper pour exécuter les tâches de cette énumération. Il ne nous reste plus qu'à utiliser le tout, via des méthodes d'extension, pour rendre le tout le plus générique et utilisable possible:
public static Task CopyStreamToFileAsync(this Stream source, string destinationPath)
{
if (source == null) throw new ArgumentNullException("source");
if (destinationPath == null) throw new ArgumentNullException("destinationPath");
// Open the output file for writing
var destinationStream = FileAsync.OpenWrite(destinationPath);
// Copy the source to the destination stream, then close the output file.
return CopyStreamToStreamAsync(source, destinationStream).ContinueWith(t =>
{
var e = t.Exception;
destinationStream.Close();
if (e != null)
throw e;
}, ct, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Current);
}
public static Task CopyStreamToStreamAsync(this Stream source, Stream destination)
{
if (source == null) throw new ArgumentNullException("source");
if (destination == null) throw new ArgumentNullException("destination");
return Task.Factory.Iterate(CopyStreamIterator(source, destination));
}
public static Task DownloadDataInFileAsync(this WebRequest webRequest, string destinationPath)
{
return webRequest.GetResponseAsync()
.ContinueWith(response =>
{
Stream distantStream = response.Result.GetResponseStream();
distantStream.
});
}
CopyStreamIterator étant la même fonction que celle décrite plus haut.
Et voilà! Pour télécharger un fichier en asynchrone, le code devient maintenant:
var uri = new Uri("http://...");
var wreq = (HttpWebRequest)WebRequest.Create(uri);
wreq.DownloadDataInFileAsync(pathToLocalFile);
Alors bien sûr beaucoup de complexité a été ajoutée derrière ces anodines 3 lignes de code, mais il ne faut pas perdre de vue que nous avons maintenant toute une série de helpers utilisables dans toutes situations inimaginables nécessitant des opérations asynchrones. Rien que pour ça le jeu en vaut la chandelle à mon avis.
Je vous avais parlé au début de cet article que j'avais ajouté la possibilité d'annuler le téléchargement, et d'en connaître la progression et la fin. Comme cet article est déjà bien long comme ça, je vous propose d'en rester là, et de mettre le code avec tout ça dans un nouvel article.
J'espère que cet article vous aura plu et vous aura donné autant envie qu'à moi de vous intéresser à cette nouvelle librairie très prometteuse qui permet de rendre le code multi-threadé bien plus abordable.
Une question pour terminer: vous en pensez quoi de la TPL au final? Technologie intéressante? Vous avez hâte d'y être? Vous préférez utiliser l'ancienne méthode et gérer vos threads à la main? connaître votre avis sur la question.
A bientôt !
Comments
Super !!! Merci beaucoup :)
Merci pour ce tres bon article. J'espere que d'autres suivront.