Ну, название объясняется само собой. Мой код выглядит примерно так:

var clients = new ConcurrentBag();
var peers = new ConcurrentDictionary();
clients.Add(...); // This may happen on another thread
clients.Add(...); // This may happen on another thread

var token = cts.Token;
while (!token.IsCancellationRequested)
{
    if (clients.Count == 0)
    {
        // Waiting for more clients
        await TaskEx.Delay(500, token); // Using BCL.Async as I need to support Windows XP (.Net 4)
    }
    else if (clients.Count == 1)
    { 
        // Close the left-alone client
        clients.FirstOrDefault()?.Close(); // Close may remove or modify **_clients**.
    }
    else if (clients.Any(c => c.DataAvailable))
    {
        // There is some data, let see if we have timed out peers from last time
        foreach (var p in peers.Where(peer => !peer.Key.IsCompleted))
        {
            // Close the timed out clients, will also terminate the task
            p.Value.Close(); // Close may remove or modify **_clients**.
        }
        peers.Clear();
        // Checking for possible operations and filling the array
        Parallel.ForEach(clients.Where(c => c.DataAvailable), (item) =>
        {
            Parallel.ForEach(clients.Where(c => !c.Equals(item)), (item2) =>
            {
                peers.TryAdd(item2.Broadcast(item.Data), item2); //Broadcast may add, remove or modify **_clients**.
            });
        });
        // Minimum of 10 secs for operations to run before getting timedout
        await TaskEx.WhenAny(TaskEx.WhenAll(peers.Keys.ToArray()), TaskEx.Delay(10000, token)); // Using BCL.Async as I need to support Windows XP (.Net 4)
        // Even tho some of them may have timed out by now, as we may have no data for the next operation, we will wait for data before deciding to close them
    }
    else
    {
        // Waiting for some data to appear - Recheck as soon as OS allows us
        await TaskEx.Delay(1, token); // Using BCL.Async as I need to support Windows XP (.Net 4)
    }
}

Как вы можете видеть, я использовал ConcurrentBag и ConcurrentDictionary наряду с методами await, Linq, Simple foreach, Parallel.ForEach и BCL.Async.

.

Поскольку я впервые использую Parallel, ConcurrentBag и ConcurrentDictionary, я хочу попросить людей, более знакомых с этими частями Framework, рассказать мне, есть ли проблемы в логике, безопасности потоков и/или новых способах выполнения задач.

Я весь внимание, спасибо заранее

EDIT

Согласно документации MSDN, упомянутой в ответе Ивана, для большей безопасности мне следует удалить метод .Where() и включить его в тело Parallel.ForEach в качестве if.

Но все же мало что известно о методах .FirstOrDefault() и .Any() и их ситуации с потоковой безопасностью. Я знаю, что хорошей практикой является использование lock каждый раз, когда происходит обращение к переменной. Но в данном случае, поскольку я использую ConcurrentBag, я хочу быть уверенным в необходимости копирования переменной перед выполнением этих строк кода:

if (clients.Any(client => client.IsConnected))
{
    clients.FirstOrDefault()?.Close(); // Close may remove or modify **_clients**.

ЭДИТ 2

Разобрав методы .FirstOrDefault() и .Any(), я обнаружил, что оба они используют простой foreach. Поэтому я считаю, что они все-таки должны быть потокобезопасными. Итак, теперь вопрос:

Возможно ли запустить Parallel.ForEach внутри другого Parallel.ForEach с тем же источником, а затем изменить этот источник? Или мне следует изменить логику?

Parallel.ForEach(clients, (item) =>
{
    if (item.DataAvailable)
    {
        Parallel.ForEach(clients, (item2) =>
        {
            if (!item2.Equals(item))
            {
                // Modify **clients**

Ответы (1)

Из Класс ConcurrentBag документации:

Безопасность ниток

Все публичные и защищенные члены ConcurrentBag являются потокобезопасными и могут использоваться одновременно из нескольких потоков. Однако члены, доступ к которым осуществляется через один из интерфейсов, реализуемых ConcurrentBag, включая методы расширения, не гарантируют потокобезопасность и могут нуждаться в синхронизации вызывающей стороны.

.

Обратите внимание на выделенный жирным курсивом абзац. Поскольку LINQ основан на одном из реализованных интерфейсов (IEnumerable), ваше использование не является потокобезопасным.

2022 WebDevInsider