Solucionado (ver solução)
Solucionado
(ver solução)
1
resposta

Como executar Tasks em paralelo sem afetar toda capacidade de processamento?

Preciso criar um processo que receberá várias notificações e então ele registrará essas notificações no banco de dados.

Em alguns momentos ele pode receber uma quantidade grande de notificações.

Por se tratar de um processo que roda em segundo plano (será um Windows Service) e não querer consumir toda a capacidade de processamento, uma quantidade limitada de threads deveriam ser executadas em paralelo.

Então preciso de um gerenciador de tarefas que poderá receber novas Tasks, que execute essas Tasks em paralelo mas sem consumir todo recurso de processamento, que me dê um método para aguardar as Tasks terminarem sua execução e também a possibilidade de encerrar tudo.

Desejando usar o TaskFactory para melhor gerenciar minhas Tasks, pensei em criar algo como:

public class TaskQueue : IDisposable
{
    private readonly ConcurrentQueue<Action> _actions =
        new ConcurrentQueue<Action>();

    private CancellationTokenSource _cancellationTokenSource =
        new CancellationTokenSource();

    private readonly CancellationToken _cancellationToken;

    private bool _running = false;

    public TaskQueue() {
        _cancellationToken = _cancellationTokenSource.Token;
        Task.Factory.StartNew(Run);
    }

    public void Enqueue(Action action)  {
        _actions.Enqueue(action);
    }

    public int Count() => _actions.Count;

    public bool Running() {
        lock (this)  
            return _running;
    }

    private void SetRunning() {
        lock (this)
            _running = true;
    }

    private void SetPaused() {
        lock (this)
            _running = false;
    }

    private void WaitForRunningTasksFinish() {
        while (Running())
            Thread.Sleep(250);
    }

    private void Run() {
        var processors = Environment.ProcessorCount;

        while (!_cancellationToken.IsCancellationRequested) {
            Thread.Sleep(250);
            if (_actions.IsEmpty)
                continue;

            var runningTasks = new List<Task>();
            SetRunning();
            try {
                for (var index = 1; index <= processors; index++) {
                    _cancellationToken.ThrowIfCancellationRequested();
                    if (_actions.TryDequeue(out var action)) {
                        _cancellationToken.ThrowIfCancellationRequested();
                        runningTasks.Add(
                           Task.Factory.StartNew(action, _cancellationToken));
                    }
                }
                Task.WaitAll(runningTasks.ToArray());
            } catch (OperationCanceledException e)  {
                Console.WriteLine($"{e.Message}");
            } finally {
                SetPaused();
            }
        }
    }

    public void Stop() => _cancellationTokenSource.Cancel(false);

    public void Dispose() {
        Stop();
        WaitForRunningTasksFinish();
        _cancellationTokenSource?.Dispose();
        _cancellationTokenSource = null;
    }
}

Seria bom essa ideia ainda implementar a adição de outra Task assim que uma qualquer terminasse, pensei em fazê-lo com Task.WhenAny.

Mas essa lógica, para vocês apresenta algum problema?Ou, o que poderia ser melhorado?

Conhecem algo já pronto, testado, de preferência nativo que faça esse trabalho?

Grato!

1 resposta
solução!

Encontrei um exemplo da Microsoft que me atendeu bem.