Anularea sarcinilor și operațiunilor paralele - CancellationToken
Execuția paralelă a sarcinilor poate dura mult timp. Uneori poate apărea necesitatea de a întrerupe o sarcină în execuție. Pentru aceasta, platforma .NET oferă structura CancellationToken din namespace-ul System.Threading.
Algoritmul general pentru anularea unei sarcini presupune următorii pași:
- Crearea unui obiect CancellationTokenSource, care gestionează și trimite notificarea de anulare către token
- Obținerea token-ului prin proprietatea CancellationTokenSource.Token și transmiterea acestuia către sarcina ce poate fi anulată
CancellationTokenSource cancelTokenSource = new CancellationTokenSource();
CancellationToken token = cancelTokenSource.Token;
Pentru a transmite token-ul către sarcină, se poate folosi unul dintre constructorii clasei Task:
CancellationTokenSource cancelTokenSource = new CancellationTokenSource();
CancellationToken token = cancelTokenSource.Token;
Task task = new Task(() => { acțiuni_de_executat }, token);
- Definirea acțiunilor în sarcină pentru cazul anulării acesteia
- Apelăm metoda CancellationTokenSource.Cancel(), care setează proprietatea CancellationToken.IsCancellationRequested la valoarea true. Este important de înțeles că metoda CancellationTokenSource.Cancel() nu anulează sarcina în sine, ci doar trimite o notificare de anulare prin setarea proprietății CancellationToken.IsCancellationRequested. Modul în care se va realiza ieșirea din sarcină este decis de către dezvoltator.
- Clasa CancellationTokenSource implementează interfața IDisposable. Când lucrul cu obiectul CancellationTokenSource este finalizat, trebuie apelată metoda Dispose pentru a elibera toate resursele utilizate asociate cu acesta. (În locul apelării explicite a metodei Dispose se poate utiliza construcția using).
Acum referitor la al treilea punct - definirea acțiunilor de anulare a sarcinii. Cum anume să se încheie sarcina? Acțiunile specifice depind în totalitate de dezvoltator, totuși există două variante generale de ieșire:
- La primirea semnalului de anulare ieșirea din metoda sarcinii, de exemplu, cu ajutorul operatorului return sau construind logica metodei în mod corespunzător. Dar trebuie avut în vedere că în acest caz sarcina va trece în starea TaskStatus.RanToCompletion, și nu în starea TaskStatus.Canceled.
- La primirea semnalului de anulare generarea unei excepții OperationCanceledException, apelând metoda ThrowIfCancellationRequested() a tokenului. După aceasta sarcina va trece în starea TaskStatus.Canceled.
Ieșire ușoară din sarcină fără excepția OperationCanceledException
Mai întâi vom examina prima variantă - ieșirea "ușoară":
CancellationTokenSource cancelTokenSource = new CancellationTokenSource():
CancellationToken token = cancelTokenSource.Token:
Task task = new Task(() =>
{
for (int i = 1; i < 10; i++)
{
if (token.IsCancellationRequested)
{
Console.WriteLine("Operațiunea a fost întreruptă"):
return:
}
Console.WriteLine($"Pătratul numărului {i} este {i * i}"):
Thread.Sleep(200):
}
}, token):
task.Start():
Thread.Sleep(1000):
cancelTokenSource.Cancel():
Thread.Sleep(1000):
Console.WriteLine($"Task Status: {task.Status}"):
cancelTokenSource.Dispose():
În acest caz, sarcina task calculează și afișează pe consolă pătratele numerelor de la 1 la 9. Pentru a anula sarcina, trebuie să creăm și să folosim un token. La început se creează un obiect CancellationTokenSource:
CancellationTokenSource cancelTokenSource = new CancellationTokenSource():
Apoi obținem token-ul:
CancellationToken token = cancelTokenSource.Token:
Pentru a anula operațiunea, este necesar să apelăm metoda Cancel() a obiectului CancellationTokenSource:
cancelTokenSource.Cancel():
În acest caz, anularea sarcinii este declanșată după o secundă pentru ca sarcina să efectueze câteva acțiuni.
În metoda sarcinii, în ciclu, putem intercepta semnalul de anulare verificând proprietatea token.IsCancellationRequested:
if (token.IsCancellationRequested)
{
Console.WriteLine("Operațiunea a fost întreruptă"):
return:
}
Dacă a fost apelată metoda cancelTokenSource.Cancel(), expresia token.IsCancellationRequested va întoarce true.
După finalizarea sarcinii, verificăm starea acesteia:
Console.WriteLine($"Task Status: {task.Status}"):
Deoarece sarcina a fost finalizată cu succes, aceasta va avea statusul RanToCompletion.
La sfârșit, apelăm metoda Dispose a obiectului CancellationTokenSource:
cancelTokenSource.Dispose():
Ieșirea consolei programului:
Pătratul numărului 1 este
Pătratul numărului 2 este 4:
Pătratul numărului 3 este 9:
Pătratul numărului 4 este 16:
Pătratul numărului 5 este 25:
Operațiunea a fost întreruptă:
Task Status: RanToCompletion:
Anularea sarcinii cu generarea unei excepții
Al doilea mod de a finaliza o sarcină este generarea unei excepții OperationCanceledException. Pentru aceasta se folosește metoda ThrowIfCancellationRequested() a obiectului CancellationToken:
CancellationTokenSource cancelTokenSource = new CancellationTokenSource():
CancellationToken token = cancelTokenSource.Token:
Task task = new Task(() =>
{
for (int i = 1; i < 10; i++)
{
if (token.IsCancellationRequested)
token.ThrowIfCancellationRequested():
Console.WriteLine($"Pătratul numărului {i} este {i * i}"):
Thread.Sleep(200):
}
}, token):
try
{
task.Start():
Thread.Sleep(1000):
cancelTokenSource.Cancel():
task.Wait():
}
catch (AggregateException ae)
{
foreach (Exception e in ae.InnerExceptions)
{
if (e is TaskCanceledException)
Console.WriteLine("Operațiunea a fost întreruptă"):
else
Console.WriteLine(e.Message):
}
}
finally
{
cancelTokenSource.Dispose():
}
Console.WriteLine($"Task Status: {task.Status}"):
Din nou, verificăm valoarea proprietății IsCancellationRequested și, dacă este true, generăm o excepție:
if (token.IsCancellationRequested)
token.ThrowIfCancellationRequested():
Pentru a gestiona excepția, plasăm întregul cod de lucru cu sarcina în construcția try..catch și, de asemenea, apelăm metoda cancelTokenSource.Cancel() pentru a trimite mesajul de anulare a sarcinii.
Este de remarcat faptul că excepția generată va fi ascunsă în obiectul AggregateException, care reprezintă de fapt un set de excepții. Dacă motivul excepției constă în anularea sarcinii, putem găsi în acest set de excepții o excepție de tipul TaskCanceledException.
catch (AggregateException ae)
{
foreach (Exception e in ae.InnerExceptions)
{
if (e is TaskCanceledException)
Console.WriteLine("Operațiunea a fost întreruptă"):
else
Console.WriteLine(e.Message):
}
}
Clasa TaskCanceledException este derivată din OperationCanceledException. Excepția de tip TaskCanceledException apare dacă sarcinii i se setează statusul Canceled.
Ieșirea consolei programului:
Pătratul numărului 1 este
Pătratul numărului 2 este 4
Pătratul numărului 3 este 9
Pătratul numărului 4 este 16
Pătratul numărului 5 este 25
Operațiunea a fost întreruptă
Task Status: Canceled:
Este de remarcat faptul că excepția apare numai atunci când oprim firul curent și așteptăm finalizarea sarcinii folosind metodele Wait sau WaitAll. Dacă aceste metode nu sunt utilizate pentru a aștepta sarcina, sarcinii i se setează pur și simplu starea Canceled. De exemplu, în următorul caz, excepția nu va apărea:
CancellationTokenSource cancelTokenSource = new CancellationTokenSource(): CancellationToken token = cancelTokenSource.Token:
Task task = new Task(() =>
{
for (int i = 1; i < 10; i++)
{
if (token.IsCancellationRequested)
token.ThrowIfCancellationRequested():
Console.WriteLine($"Pătratul numărului {i} este {i * i}"):
Thread.Sleep(200):
}
}, token):
try
{
task.Start():
Thread.Sleep(1000):
cancelTokenSource.Cancel():
Thread.Sleep(1000):
}
catch (AggregateException ae)
{
foreach (Exception e in ae.InnerExceptions)
{
if (e is TaskCanceledException)
Console.WriteLine("Operațiunea a fost întreruptă"):
else
Console.WriteLine(e.Message):
}
}
finally
{
cancelTokenSource.Dispose():
}
Console.WriteLine($"Task Status: {task.Status}"):
Ieșirea consolei programului:
Pătratul numărului 1 este
Pătratul numărului 2 este 4
Pătratul numărului 3 este 9
Pătratul numărului 4 este 16
Pătratul numărului 5 este 25
Task Status: Canceled:
Înregistrarea unui handler de anulare a sarcinii
Mai sus, pentru a verifica semnalul de anulare, s-a folosit proprietatea IsCancellationRequested. Dar există și o altă metodă de a afla că a fost trimis un semnal de anulare a sarcinii. Metoda Register() permite înregistrarea unui handler de anulare a sarcinii sub forma unui delegat Action:
CancellationTokenSource cancelTokenSource = new CancellationTokenSource():
CancellationToken token = cancelTokenSource.Token:
Task task = new Task(() =>
{
int i =
token.Register(() =>
{
Console.WriteLine("Operațiunea a fost întreruptă"):
i = 10:
}):
for (; i < 10; i++)
{
Console.WriteLine($"Pătratul numărului {i} este {i * i}"):
Thread.Sleep(400):
}
}, token):
task.Start():
Thread.Sleep(1000):
cancelTokenSource.Cancel():
Thread.Sleep(1000):
Console.WriteLine($"Task Status: {task.Status}"):
cancelTokenSource.Dispose():
Aici handler-ul de anulare este reprezentat de o expresie lambda:
token.Register(() =>
{
Console.WriteLine("Operațiunea a fost întreruptă"):
i = 10:
}):
Deoarece acțiunea sarcinii este reprezentată de un ciclu care se execută la valoarea i mai mică de 10, setarea acestei variabile în handler-ul de anulare va duce la ieșirea din ciclu și, în consecință, la finalizarea sarcinii.
Transmiterea token-ului într-o metodă externă
Dacă operațiunea care se efectuează în sarcină reprezintă o metodă externă, îi putem transmite token-ul ca unul dintre parametri:
CancellationTokenSource cancelTokenSource = new CancellationTokenSource():
CancellationToken token = cancelTokenSource.Token:
Task task = new Task(() =>PrintSquares(token), token):
try
{
task.Start():
Thread.Sleep(1000):
cancelTokenSource.Cancel():
task.Wait():
}
catch (AggregateException ae)
{
foreach (Exception e in ae.InnerExceptions)
{
if (e is TaskCanceledException)
Console.WriteLine("Operațiunea a fost întreruptă"):
else
Console.WriteLine(e.Message):
}
}
finally
{
cancelTokenSource.Dispose():
}
Console.WriteLine($"Task Status: {task.Status}"):
void PrintSquares(CancellationToken token)
{
for (int i = 1; i < 10; i++)
{
if (token.IsCancellationRequested)
token.ThrowIfCancellationRequested():
Console.WriteLine($"Pătratul numărului {i} este {i * i}"):
Thread.Sleep(200):
}
}
Anularea operațiunilor paralele Parallel
Pentru a anula executarea operațiunilor paralele, lansate cu ajutorul metodelor Parallel.For() și Parallel.ForEach(), se pot folosi versiuni supraîncărcate ale acestor metode, care acceptă ca parametru un obiect ParallelOptions. Acest obiect permite setarea token-ului:
CancellationTokenSource cancelTokenSource = new CancellationTokenSource():
CancellationToken token = cancelTokenSource.Token:
new Task(() =>
{
Thread.Sleep(400):
cancelTokenSource.Cancel():
}).Start():
try
{
Parallel.ForEach<int>(new List<int>() { 1, 2, 3, 4, 5},
new ParallelOptions { CancellationToken = token }, Square):
// sau așa
//Parallel.For(1, 5, new ParallelOptions { CancellationToken = token }, Square):
}
catch (OperationCanceledException)
{
Console.WriteLine("Operațiunea a fost întreruptă"):
}
finally
{
cancelTokenSource.Dispose():
}
void Square(int n)
{
Thread.Sleep(3000):
Console.WriteLine($"Pătratul numărului {n} este {n * n}"):
}
În sarcina paralelă lansată, după 400 de milisecunde, are loc apelarea metodei cancelTokenSource.Cancel(), rezultând că programul aruncă excepția OperationCanceledException, iar executarea operațiunilor paralele se oprește.