План занятия:

· Семафоры

· Мьютекс

· Правила упрощенного параллелизма

· Рекурсивный мьютекс

· Условные переменные

Механизмами синхронизации являются средства операционной системы, которые помогают решать основную задачу синхронизации - обеспечивать координацию потоков, которые работают с совместно используемыми данными. Такие средства являются минимальными блоками для построения многопоточных программ, их называют синхронизационными механизмами.

Синхронизационные механизмы подразделяют на следующие основные категории:

  • универсальные для низкого уровня - можно использовать различными способами (семафоры);
  • простые для низкого уровня - каждый из которых приспособлен к решению только одной задачи (мьютексы и условные переменные);
  • универсальные для высокого уровня, выраженные через простые - к этой группе относится концепция монитора, которая может быть выражена через мьютексы и условные переменные;
  • простые для высокого уровня - приспособленные к решению конкретной синхронизационной задачи (блокировка чтения-записи и барьеры).

Семафоры

Концепцию семафоров предложил в 1965 году Э. Дейкстра - известный голландский специалист в области компьютерных наук. Семафоры являются старейшими синхронизационными примитивами из числа применяемых на практике.

Семафор - это совместно используемый неотъемлемый целочисленный счетчик, для которого задано начальное значение и определены следующие атомарные операции.

  • Уменьшение семафора (down): если значение семафора больше нуля, его уменьшают на единицу, если же значение равно нулю, этот поток переходит в состояние ожидания до тех пор, пока оно не станет больше нуля (говорят, что поток «ожидает семафор» или «заблокирован на семафоре»). Эту операцию называют также ожиданиям - wait;
  • Увеличение семафора (up): значение семафора увеличивается на единицу; когда при этом есть потоки, которые ожидают на семафоре, один из них выходит из ожидания и выполняет свою операцию уменьшения. Если на семафоре ожидают несколько потоков, то вследствие выполнения операции увеличения, его значение остается нулевым, но один из потоков продолжает выполнение (в большинстве реализаций выбор этого потока будет случайным). Эту операцию также называют сигнализацией - post.

Фактически значение семафора определяет количество потоков, которое может пройти через этот семафор без блокировки. Когда для семафора задано нулевое начальное значение, то он будет блокировать все потоки до тех пор, пока какой-то поток его не "откроет", выполнив операцию up. Операции up и down могут быть выполнены любыми потоками, имеющих доступ к семафора.


Мьютекс называют синхронизацией примитив, что не допускает выполнения некоторого фрагмента кода более чем одним потоком. Фактически мьютекс является реализацией блокировки на уровне ОС.

Мьютекс, как и следует из его названия, реализует взаимное исключение. Его основная задача - блокировать все потоки, которые пытаются получить доступ к коду, когда этот код уже выполняет некоторый поток.

Мьютекс может находиться в двух состояниях: свободном и занятом. Начальным состоянием является «свободный». Над мьютекс возможны две атомарные операции.

  • Занять мьютекс: если мьютекс был свободен, он становится занятым, и поток продолжает свое выполнение (входя в критическую секцию); если мьютекс был занят, поток переходит в состояние ожидания (говорят, что поток «ожидает мьютекс», или «заблокирован на мьютекс»), выполнение продолжает другой поток. Поток, который занял мьютекс, называют владельцем мьютекс;
  • Освободить мьютекс: мьютекс становится свободным; если на нем ожидают несколько потоков, из них выбирают один, он начинает выполняться, занимает мьютекс и входит в критическую секцию. В большинстве реализаций выбор потока будет случайным. Освободить мьютекс может только его владелец.

Правила упрощенного параллелизма

Правила упрощенного параллелизма предназначены для упрощения программирования на базе мьютексов. Они основываются на том очевидном факте, что мьютекс защищает не код критической секции, а совместно используемые данные внутри этой секции. Особенности работы упрощенного параллелизма:

  • Каждая переменная, которую совместно использует более одного поток, должна быть защищена отдельным мьютексом;
  • Перед каждой операцией изменения такой переменной, соответствующий мьютекс должен быть занят, а после изменения освобожден;
  • Если надо работать одновременно с несколькими совместно используемыми переменными, необходимо занять все их мьютексы до начала работы и освободить их только после полного окончания работы.

Рекурсивный мьютекс

Рекурсивный мьютекс - особый вид мьютекса. Он позволяет повторное занятие тем же потоком, а также отслеживает, какой поток пытается его занять.

Когда это не тот поток, который уже занимает, мьютекс ведет себя как обычный, и поток переходит в состояние ожидания. Когда же это тот самый поток, внутренний счетчик блокировок этого мьютекс увеличивают на единицу, и поток продолжает свое выполнение. В случае увольнения мьютекс внутренний счетчик уменьшается на единицу, для других потоков рекурсивный мьютекс будет разблокирована только тогда, когда счетчик дойдет до нуля (то есть когда все его блокировки одним потоком будут сняты).

Рекурсивные мьютексы менее эффективны в реализации, не могут быть использованы вместе с условными переменными (этот вопрос рассмотрим в следующем разделе), поэтому обращаться к ним нужно только тогда, когда без этого нельзя обойтись. Например, библиотека функций может использовать такие мьютексы для того, чтобы избежать взаимных блокировок при повторном вызове таких функций одним потоком.

Условные переменные

Условной переменной называют синхронизационные примитивы, позволяющие организовать ожидания выполнения условия внутри критической секции, заданной мьютексом. Условная переменная всегда связана с конкретным мьютексом и данными, защищенными этим мьютексом. Для условной переменной определены следующие операции:

  1. Ожидания (wait). Дополнительным входным параметром эта операция принимает мьютекс, который должен находиться в закрытом состоянии. Вызов ожидания происходит в ситуации, когда не выполняется некоторое условие и нужны потоки для продолжения работы. Вследствие выполнения ожидания поток прекращается (говорят, что он «ожидает условной переменной»), а мьютекс открывается (эти два действия происходят атомарно). Так другие потоки получают возможность войти в критическую секцию и изменить там данные, которые она защищает, возможно, выполнив условие, необходимое потоку. На этом операция ожидания не заканчивается - ее завершит другой поток, вызвав операцию сигнализации после того, как условие будет выполнено.
  2. Сигнализация (signal). Эту операцию поток должен выполнить после того, как войдет в критическую секцию и завершит работу с данными (выполнив условие, которое ожидал поток, вызвавший операцию wait). Эта операция проверяет, нет ли потоков, ожидающих условной переменной, и если такие потоки есть, переводит один из них в состояние готовности. В результате восстановления поток завершает выполнение операции ожидания и блокирует мьютекс (обновления и блокировки тоже происходят атомарно). Если нет ни одного потока, который ожидает условной переменной, операция сигнализирования не делает ничего, и информацию о ее выполнении в системе не сохраняют.
  3. Широковещательная сигнализация (broadcast) отличается от обычной тем, что перевод в состояние готовности и восстановление выполняют для всех потоков, ожидающих этой условной переменной, а не только для одного из них.

Таким образом, выполнение операции ожидания состоит из следующих этапов: открытие мьютекс, ожидания (пока другой поток не выполнит операцию signal или broadcast), закрытие мьютекс.

В операционной системе Windows поддерживаются четыре типа объекта синхронизации.

  • Первый тип представляет собой классический семафор и может быть использован для управления доступом к определенным ресурсам ограниченного количества потоков. Разделяемый ресурс в этом случае может быть использован одним, и только одним потоком, либо некоторым числом потоков из множества претендующих на этот ресурс. Семафоры реализуются как простые счетчики, значения которых увеличиваются, когда поток освобождает семафор, и уменьшаются, когда поток занимает семафор.
  • Второй тип объектов синхронизации называется исключающим семафором (mutex). Исключающие семафоры применяются для разделения ресурсов таким образом, что в любой момент времени их может использовать один и только один поток. Очевидно, что исключающий семафор представляет собой специальный тип обычного семафора.
  • Третий тип объектов синхронизации - это событие (event). События могут служить для блокировки доступа к ресурсу до тех пор, пока другой поток не сигнализирует об его освобождении.
  • Четвертый тип объектов синхронизации представляет собой критическую секцию (critical section). При вхождении потока в критическую секцию никакой другой поток не может начать ее выполнение до того, как работающий с ней поток не выйдет из нее.

Следует отметить, что синхронизация потоков с помощью критических секций может осуществляться только в рамках одного процесса, так как невозможно передать адрес критической секции из одного процесса в другой, так как критическая секция располагается в области глобальных переменных запущенного процесса.

Семафор создается с помощью функции CreateSemaphore. Количество задач, одновременно имеющих доступ к некоторому ресурсу, определяется одним из параметров функции. Если значение этой функции равно 1, то семафор работает как исключающий. При успешном создании семафора возвращается его дескриптор, в противном случае - null. Функция WaitForSingleObject обеспечивает режим ожидания семафора. Один из параметров указывает время его ожидания в миллисекундах. Если значение этого параметра равно INFINITE, то время ожидания неопределено. При успешном завершении функции значение счетчика, связанного с семафором, уменьшается не единицу. Функция ReleaseSemaphore() освобождает семафор, позволяя использовать его другому потоку.

Исключающий семафор mutex создается с помощью функции CreateMutex(), которая возвращает идентификатор созданного объекта или null в случае ошибки. В случае необходимости объект освобождается с помощью универсальной функции CloseHandle(). Зная имя объекта mutex, его можно открыть с помощью функции OpenMutex(). С помощью этой функции несколько потоков могут открыть один и тот же объект, а затем одновременно выполнить его ожидание. После того, как имя объекта стало известно потоку, он может им завладеть, используя функции WaitForSingleObject или WaitForMultipleObjects. Освобождение объекта mutex осуществляется с помощью функции ReleaseMutex().

Событие создается с помощью функции CreateEvent. Она возвращает дескриптор созданного события или null в случае неуспешного завершения. После создании события поток просто ожидает его наступления используя функцию WaitForSingleObject, задавая в качестве первого параметра для нее дескриптор этого события. Тем самым выполнение потока приостанавливается до наступления соответствующего события. После вызова функции SetEvent процесс, ожидающий данного события с помощью функции WaitForSingleObject, продолжит свое выполнение. Событие может быть сброшено с помощью функции ResetEvent.

Привет! Сегодня продолжим рассматривать особенности многопоточного программирования и поговорим о синхронизации потоков.

Что же такое «синхронизация»? Вне области программирования под этим подразумевается некая настройка, позволяющая двум устройствам или программам работать совместно. Например, смартфон и компьютер можно синхронизировать с Google-аккаунтом, личный кабинет на сайте - с аккаунтами в социальных сетях, чтобы логиниться с их помощью. У синхронизации потоков похожий смысл: это настройка взаимодействия потоков между собой. В предыдущих лекциях наши потоки жили и работали обособленно друг от друга. Один что-то считал, второй спал, третий выводил что-то на консоль, но друг с другом они не взаимодействовали. В реальных программах такие ситуации редки. Несколько потоков могут активно работать, например, с одним и тем же набором данных и что-то в нем менять. Это создает проблемы. Представь, что несколько потоков записывают текст в одно и то же место - например, в текстовый файл или консоль. Этот файл или консоль в данном случае становится общим ресурсом. Потоки не знают о существовании друг друга, поэтому просто записывают все, что успеют за то время, которое планировщик потоков им выделит. В недавней лекции курса у нас был пример, к чему это приведет, давай его вспомним: Причина кроется в том, что потоки работали с общим ресурсом, консолью, не согласовывая действия друг с другом. Если планировщик потоков выделил время Потоку-1, тот моментально пишет все в консоль. Что там уже успели или не успели написать другие потоки - неважно. Результат, как видишь, плачевный. Поэтому в многопоточном программировании ввели специальное понятие мьютекс (от англ. «mutex», «mutual exclusion» - «взаимное исключение») . Задача мьютекса - обеспечить такой механизм, чтобы доступ к объекту в определенное время был только у одного потока. Если Поток-1 захватил мьютекс объекта А, остальные потоки не получат к нему доступ, чтобы что-то в нем менять. До тех пор, пока мьютекс объекта А не освободится, остальные потоки будут вынуждены ждать. Пример из жизни: представь, что ты и еще 10 незнакомых людей участвуете в тренинге. Вам нужно поочередно высказывать идеи и что-то обсуждать. Но, поскольку друг друга вы видите впервые, чтобы постоянно не перебивать друг друга и не скатываться в гвалт, вы используете правило c «говорящим мячиком»: говорить может только один человек - тот, у кого в руках мячик. Так дискуссия получается адекватной и плодотворной. Так вот, мьютекс, по сути, и есть такой мячик. Если мьютекс объекта находится в руках одного потока, другие потоки не смогут получить доступ к работе с этим объектом. Не нужно ничего делать, чтобы создать мьютекс: он уже встроен в класс Object , а значит, есть у каждого объекта в Java.

Как работает оператор synchronized

Давай познакомимся с новым ключевым словом - synchronized . Им помечается определенный кусок нашего кода. Если блок кода помечен ключевым словом synchronized , это значит, что блок может выполняться только одним потоком одновременно. Синхронизацию можно реализовать по-разному. Например, создать целый синхронизированный метод: public synchronized void doSomething () { //...логика метода } Или же написать блок кода, где синхронизация осуществляется по какому-то объекту: public class Main { private Object obj = new Object () ; public void doSomething () { synchronized (obj) { } } } Смысл прост. Если один поток зашел внутрь блока кода, который помечен словом synchronized , он моментально захватывает мьютекс объекта, и все другие потоки, которые попытаются зайти в этот же блок или метод вынуждены ждать, пока предыдущий поток не завершит свою работу и не освободит монитор. Кстати! В лекциях курса ты уже видел примеры synchronized , но они выглядели иначе: public void swap () { synchronized (this ) { //...логика метода } } Тема для тебя новая, и путаница с синтаксисом, само собой, первое время будет. Поэтому запомни сразу, чтобы не путаться потом в способах написания. Эти два способа записи означают одно и то же: public void swap () { synchronized (this ) { //...логика метода } } public synchronized void swap () { } } В первом случае создаешь синхронизированный блок кода сразу же при входе в метод. Он синхронизируется по объекту this , то есть по текущему объекту. А во втором примере вешаешь слово synchronized на весь метод. Тут уже нет нужды явно указывать какой-то объект, по которому осуществляется синхронизация. Раз словом помечен целый метод, этот метод автоматически будет синхронизированным для всех объектов класса. Не будем углубляться в рассуждения, какой способ лучше. Пока выбирай то, что больше нравится:) Главное - помни: объявить метод синхронизированным можно только тогда, когда вся логика внутри него выполняется одним потоком одновременно. Например, в этом случае сделать метод doSomething() синхронизированным будет ошибкой: public class Main { private Object obj = new Object () ; public void doSomething () { //...какая-то логика, доступная для всех потоков synchronized (obj) { //логика, которая одновременно доступна только для одного потока } } } Как видишь, кусочек метода содержит логику, для которой синхронизация не обязательна. Код в нем могут выполнять несколько потоков одновременно, а все критически важные места выделены в отдельный блок synchronized . И еще один момент. Давай рассмотрим «под микроскопом» наш пример из лекции с обменом именами: public void swap () { synchronized (this ) { //...логика метода } } Обрати внимание: синхронизация проводится по this . То есть по конкретному объекту MyClass . Представь, что у нас есть 2 потока (Thread-1 и Thread-2) и всего один объект MyClass myClass . В этом случае, если Thread-1 вызовет метод myClass.swap() , мьютекс объекта будет занят, и Thread-2 при попытке вызвать myClass.swap() повиснет в ожидании, когда мьютекс освободится. Если же у нас будет 2 потока и 2 объекта MyClass - myClass1 и myClass2 - на разных объектах наши потоки спокойно смогут одновременно выполнять синхронизированные методы. Первый поток выполняет: myClass1. swap () ; Второй выполняет: myClass2. swap () ; В этом случае ключевое слово synchronized внутри метода swap() не повлияет на работу программы, поскольку синхронизация осуществляется по конкретному объекту. А в последнем случае объектов у нас 2. Поэтому потоки не создают друг другу проблем. Ведь у двух объектов есть 2 разных мьютекса, и их захват не зависит друг от друга .

Особенности синхронизации в статических методах

А что делать, если нужно синхронизировать статический метод ? class MyClass { private static String name1 = "Оля" ; private static String name2 = "Лена" ; public static synchronized void swap () { String s = name1; name1 = name2; name2 = s; } } Непонятно, что будет выполнять роль мьютекса в этом случае. Ведь мы уже определились, что у каждого объекта есть мьютекс. Но проблема в том, что для вызова статического метода MyClass.swap() нам не нужны объекты: метод-то статический! И что дальше? :/ На самом деле, проблемы в этом нет. Создатели Java обо всем позаботились:) Если метод, в котором содержится критически важная «многопоточная» логика, статический, синхронизация будет осуществляться по классу. Для большей ясности, приведенный выше код можно переписать так: class MyClass { private static String name1 = "Оля" ; private static String name2 = "Лена" ; public static void swap () { synchronized (MyClass. class ) { String s = name1; name1 = name2; name2 = s; } } } В принципе, ты мог до этого додуматься самостоятельно: раз объектов нет, значит механизм синхронизации должен быть как-то «зашит» в сами классы. Так оно и есть: по классам тоже можно синхронизироваться.

В предыдущих частях статьи я рассказал об общих принципах и конкретных методах построения многопоточных приложений. Различным потокам практически всегда периодически требуется взаимодействие между собой, при этом неизбежно возникает необходимость в синхронизации. Сегодня мы рассмотрим важнейший, самый мощный и универсальный инструмент синхронизации Windows: объекты синхронизации ядра.

WaitForMultipleObjects и другие функции ожидания

Как вы помните, для синхронизации потоков, обычно требуется временно приостановить выполнение одного из потоков. При этом он должен быть переведён средствами операционной системы в состояние ожидания, в котором он не занимает процессорное время. Мы уже знаем две функции, которые могут это делать: SuspendThread и ResumeThread . Но как я рассказывал в предыдущей части статьи, в силу некоторых особенностей эти функции непригодны для синхронизации.

Сегодня мы рассмотрим другую функцию, которая также переводит в поток в состояние ожидания, но в отличие от SuspendThread/ResumeThread , специально предназначена именно для организации синхронизации. Это WaitForMultipleObjects . Поскольку эта функция очень важна, я несколько отступлю от своего правила не вдаваться в детали API и расскажу о ней подробнее, даже приведу ее прототип:

DWORD WaitForMultipleObjects(

DWORD nCount, // число объектов в массиве lpHandles

CONST HANDLE * lpHandles, // указатель на массив описателей объектов ядра

BOOL bWaitAll, // флаг, означающей надо ли дожидаться всех объектов или достаточно одного

DWORD dwMilliseconds // таймаут

Главный параметр этой функции - это указатель на массив хэндлов объектов ядра. О том, что это за объекты, мы поговорим ниже. Пока нам важно знать то, что любой из таких объектов может находиться в одном из двух состояний: нейтральном или «сигнализирующем» (signaled state). Если флаг bWaitAll равен FALSE, функция вернет управление, как только хотя бы один из объектов подаст сигнал. А если флаг равен TRUE, это произойдет только тогда, когда сразу все объекты начнут сигнализировать (как мы увидим, это важнейшее свойство этой функции). В первом случае по возвращаемому значению можно узнать, какой именно из объектов подал сигнал. Надо вычесть из него константу WAIT_OBJECT_0 , и получится индекс в массиве lpHandles. Если время ожидания превысило указанный в последнем параметре таймаут, функция прекратит ожидание и вернет значение WAIT_TIMEOUT . В качестве таймаута можно указать константу INFINITE , и тогда функция будет ждать «до упора», а можно наоборот 0, и тогда поток вообще не будет приостановлен. В последнем случае функция вернет управление немедленно, но по ее результату можно будет узнать состояние объектов. Последний прием используется очень часто. Как видите, эта функция обладает богатыми возможностями. Имеется еще несколько WaitForXXX функций, но все они представляют собой вариации на тему главной. В частности, WaitForSingleObject представляет собой всего лишь ее упрощенный вариант. Остальные имеют каждая свою дополнительную функциональность, но применяются, в общем-то, реже. Например, они дают возможность реагировать не только на сигналы объектов ядра, но и на поступление новых оконных сообщений в очередь потока. Их описание, так же как и детальные сведения о WaitForMultipleObjects , вы, как обычно, найдете в MSDN.

Теперь о том, что же это за таинственные «объекты ядра». Начнем с того, что в их число входят сами потоки и процессы. Они переходят в сигнализирующее состояние сразу по завершении. Это очень важная возможность, поскольку очень часто бывает необходимо отслеживать момент завершения потока или процесса. Пусть, например, наше серверное приложение с набором рабочих потоков должно завершиться. Управляющий поток при этом должен каким-либо способом проинформировать рабочие потоки о том, что пора заканчивать работу (например, установив глобальный флаг), после чего дождаться, пока все потоки не завершатся, сделав все необходимые для корректного завершения действия: освободив ресурсы, информировав клиентов о завершении работы, закрыв сетевые соединения и т.п.

То, что потоки включают сигнал по окончанию работы, позволяет решить задачу синхронизации с завершением потока предельно легко:

// Пусть для простоты у нас будет всего один рабочий поток. Запускаем его:

HANDLE hWorkerThread = :: CreateThread(...);

// Перед окончанием работы надо каким-либо образом сообщаем рабочему потоку, что пора закачивать.

// Ждем завершения потока:

DWORD dwWaitResult = :: WaitForSingleObject( hWorkerThread, INFINITE );

if ( dwWaitResult != WAIT_OBJECT_0 ) { /* обработка ошибки */ }

// "Хэндл" потока можно закрыть:

VERIFY(:: CloseHandle( hWorkerThread );

/* Если CloseHandle завершилась неудачей и вернула FALSE, я не выбрасываю исключение. Во-первых, даже если бы это произошло из-за системной ошибки, это не имело бы прямых последствий для нашей программы, ведь раз мы закрываем хендл, значит никакой работы с ним в дальнейшем не предполагается. Реально же неудача CloseHandle может означать только ошибку в вашей программе. Поэтому вставим здесь макрос VERIFY, чтобы не пропустить её на этапе отладки приложения. */

Аналогично будет выглядеть код, ожидающий завершения процесса.

Если бы такой встроенной в систему возможности не было, рабочий поток должен был бы сам как-то передать информацию о своем завершении главному. Даже если бы он делал бы это в самую последнюю очередь, главный поток не мог бы быть уверенным, что рабочему не осталось выполнить хотя бы пару ассемблерных инструкций. В отдельных ситуациях (например, если код потока находится в DLL, которая должна быть выгружена по его завершении) это может привести к фатальным последствиям.

Хочу напомнить вам, что даже после того, как поток (или процесс) завершился, его описатели все равно остаются в силе до тех пор, пока явно не будут закрыты функцией CloseHandle . (Кстати, не забывайте делать это!) Это сделано как раз для того, чтобы в любой момент можно было бы проверить состояние потока.

Итак, функция WaitForMultipleObjects (и ее аналоги) позволяет синхронизировать выполнение потока с состоянием объектов синхронизации, в частности других потоков и процессов.

Специальные объекты ядра

Перейдем к рассмотрению объектов ядра, которые предназначены специально для синхронизации. Это события, семафоры и мьютексы. Кратко рассмотрим каждый из них:

Событие (event)

Пожалуй, самый простой и фундаментальный синхронизирующий объект. Это всего лишь флаг, которому функциями SetEvent/ResetEvent можно задать состояние: сигнализирующее или нейтральное. Событие - это самый удобный способ передать сигнал ожидающему потоку, что некое событие свершилось (потому оно так и называется), и можно продолжать работу. С помощью события мы легко решим проблему синхронизации при инициализации рабочего потока:

// Пусть для простоты хэндл события будет храниться в глобальной переменной:

HANDLE g_hEventInitComplete = NULL; // никогда не оставляем переменную неинициализированной!

{ // код в главном потоке

// создаем событие

g_hEventInitComplete = :: CreateEvent( NULL,

FALSE, // об этом параметре мы еще поговорим

FALSE, // начальное состояние - нейтральное

if (! g_hEventInitComplete ) { /* не забываем про обработку ошибок */ }

// создаем рабочий поток

DWORD idWorkerThread = 0 ;

HANDLE hWorkerThread = :: CreateThread( NULL, 0 , & WorkerThreadProc, NULL, 0 , & idWorkerThread );

if (! hWorkerThread ) { /* обработка ошибки */ }

// ждем сигнала от рабочего потока

DWORD dwWaitResult = :: WaitForSingleObject( g_hEventInitComplete, INFINITE );

if ( dwWaitResult != WAIT_OBJECT_0 ) { /* ошибка */ }

// вот теперь можно быть уверенным, что рабочий поток завершил инициализацию.

VERIFY(:: CloseHandle( g_hEventInitComplete )); // не забываем закрывать ненужные объекты

g_hEventInitComplete = NULL;

// функция рабочего потока

DWORD WINAPI WorkerThreadProc( LPVOID _parameter )

InitializeWorker(); // инициализация

// сигнализируем, что инициализация завершена

BOOL isOk = :: SetEvent( g_hEventInitComplete );

if (! isOk ) { /* ошибка */ }

Надо заметить, что существуют две заметно отличающиеся разновидности событий. Мы можем выбрать одну из них с помощью второго параметра функции CreateEvent . Если он TRUE, создается событие, состояние которого управляется только вручную, то есть функциями SetEvent/ResetEvent . Если же он FALSE, будет создано событие с автосбросом. Это означает, что как только некий поток, ожидающий данного события, будет освобожден сигналом от этого события, оно автоматически будет сброшено обратно в нейтральное состояние. Наиболее ярко их отличие проявляется в ситуации, когда одного события ожидают сразу несколько потоков. Событие с ручным управлением подобно стартовому пистолету. Как только оно будет установлено в сигнализирующее состояние, будут освобождены сразу все потоки. Событие же с автосбросом похоже на турникет в метро: оно отпустит лишь один поток и вернется в нейтральное состояние.

Мьютекс (mutex)

По сравнению с событием это более специализированный объект. Обычно он используется для решения такой распространенной задачи синхронизации, как доступ к общему для нескольких потоков ресурсу. Во многом он похож на событие с автосбросом. Основное отличие состоит в том, что он имеет специальную привязку к конкретному потоку. Если мьютекс находится в сигнальном состоянии - это означает, что он свободен и не принадлежит ни одному потоку. Как только некий поток дождался этого мьютекса, последний сбрасывается в нейтральное состояние (здесь он как раз похож на событие с автосбросом), а поток становится его хозяином до тех пор, пока явно не освободит мьютекс функцией ReleaseMutex , или же не завершится. Таким образом, чтобы быть уверенным, что с разделяемыми данными одновременно работает только один поток, следует все места, где происходит такая работа, окружить парой: WaitFor - ReleaseMutex :

HANDLE g_hMutex;

// Пусть хэндл мьютекса хранится в глобальной переменной. Его конечно надо создать заранее, до запуска рабочих потоков. Будем считать, что это уже было сделано.

int iWait = :: WaitForSingleObject( g_hMutex, INFINITE );

switch ( iWait ) {

case WAIT_OBJECT_0: // Все нормально

break ;

case WAIT_ABANDONED: /* Какой-то поток завершился, забыв вызвать ReleaseMutex. Скорее всего, это означает ошибку в вашей программе! Поэтому на всякий случай вставим здесь ASSERT, но в окончательно версии (release) будем считать этот код успешным. */

ASSERT( false );

break ;

default :

// Здесь должна быть обработка ошибки.

// Защищенный мьютексом участок кода.

ProcessCommonData();

VERIFY(:: ReleaseMutex( g_hMutex ));

Чем же мьютекс лучше события с автосбросом? В приведенном примере его также можно было бы использовать, только ReleaseMutex надо было бы заменить на SetEvent . Однако может возникнуть следующая сложность. Чаще всего работать с общими данными приходится в нескольких местах. Что будет, если ProcessCommonData в нашем примере вызовет функцию, которая работает с этими же данными и в которой уже есть своя пара WaitFor - ReleaseMutex (на практике это встречается весьма часто)? Если бы мы использовали событие, программа, очевидно, зависла бы, поскольку внутри защищенного блока событие находится в нейтральном состоянии. Мьютекс же устроен более хитро. Для потока-хозяина он всегда остается в сигнализирующем состоянии, несмотря на то, что для всех остальных потоков он при этом находится в нейтральном. Поэтому если поток захватил мьютекс, повторный вызов WaitFor функции не приведет к блокировке. Более того, в мьютекс встроен еще и счетчик, так что ReleaseMutex должна быть вызвана столько же раз, сколько было вызовов WaitFor . Таким образом, мы можем смело защищать каждый участок кода, работающий с общими данными, парой WaitFor - ReleaseMute x, не волнуясь о том, что этот код может быть вызван рекурсивно. Это делает мьютекс очень простым в использовании инструментом.

Семафор (semaphore)

Еще более специфический объект синхронизации. Должен сознаться, что в моей практике еще не было случая, где он пригодился бы. Семафор предназначен для того, чтобы ограничить максимальное число потоков, одновременно работающих с неким ресурсом. По сути, семафор - это событие со счетчиком. Пока этот счетчик больше нуля, семафор находится в сигнализирующем состоянии. Однако каждый вызов WaitFor уменьшает этот счетчик на единицу до тех пор, пока он не станет равным нулю и семафор перейдет в нейтральное состояние. Подобно мьютексу, для семафора есть функция ReleaseSemaphor , увеличивающая счётчик. Однако в отличие от мьютекса семафор не привязан к потоку и повторный вызов WaitFor/ReleaseSemaphor еще раз уменьшит/увеличит счетчик.

Как можно использовать семафор? Например, с его помощью можно искусственно ограничить многопоточность. Как я уже рассказывал, слишком много одновременно активных потоков может заметно ухудшить производительность всей системы из-за частых переключений контекстов. И если уж нам пришлось создать слишком много рабочих потоков, можно ограничить число одновременно активных потоков числом порядка числа процессоров.

Что ещё можно сказать про объекты синхронизации ядра? Очень удобна возможность давать им имена. Соответствующий параметр есть у всех функций, создающих объекты синхронизации: CreateEvent , CreateMutex , CreateSemaphore . Если вы дважды вызовете, к примеру CreateEvent , оба раза указав одно и тоже непустое имя, то второй раз функция вместо того, чтобы создать новый объект, вернет хэндл уже существующего. Это произойдет, даже если второй вызов был сделан из другого процесса. Последнее очень удобно в тех случаях, когда требуется синхронизировать потоки, принадлежащие разным процессам.

Когда объект синхронизации вам больше не требуется, не забывайте вызвать функцию CloseHandle , о которой я уже упоминал, когда рассказывал о потоках. На самом деле, она не обязательно сразу же удалит объект. Дело в том, что хэндлов у объекта может быть создано несколько, тогда он будет удален только когда будет закрыт последний из них.

Хочу напомнить, что лучший способ гарантировать, чтобы CloseHandle или подобная «очищающая» функция была обязательно вызвана, даже в случае нештатной ситуации, - это поместить ее в деструктор. Об этом, кстати, в свое время неплохо и очень подробно рассказывалось в статье Кирилла Плешивцева «Умный деструктор». В приведённых выше примерах я не использовал этот приём исключительно в учебных целях, чтобы работа функций API была более наглядной. В реальном же коде для очистки следует всегда использовать классы-оболочки с умными деструкторами.

Кстати, с функцией ReleaseMutex и подобными постоянно возникает та же проблема, что и с CloseHandle . Она обязана быть вызвана по окончании работы с общими данными, независимо от того, насколько успешно эта работа была завершена (ведь могло быть выброшено исключение). Последствия «забывчивости» здесь более серьёзны. Если не вызванный CloseHandle приведёт лишь у утечке ресурсов (что тоже плохо!), то не освобожденный мьютекс не позволит другим потокам работать с общим ресурсом до самого завершения давшего сбой потока, что скорее всего не позволит приложению нормально функционировать. Чтобы избежать этого, нам опять поможет специально обученный класс с умным деструктором.

Заканчивая обзор объектов синхронизации, хочется упомянуть об объекте, которого нет в Win32 API. Многие мои коллеги высказывают недоумение, почему в Win32 нет специализированного объекта типа «один пишет, многие читают». Этакий «продвинутый мьютекс», следящий за тем, чтобы получить доступ к общим данным на запись мог бы одновременно только один поток, а только на чтение - сразу несколько. Подобный объект можно найти в UNIX"ах. Некоторые библиотеки, например от Borland, предлагают эмулировать его на основе стандартных объектов синхронизации. Впрочем, реальная польза от таких эмуляций весьма сомнительна. Эффективно такой объект может быть реализован только на уровне ядра операционной системы. Но в ядре Windows подобный объект не предусмотрен.

Почему же разработчики ядра Windows NT не позаботились об этом? Чем мы хуже UNIX? На мой взгляд, ответ заключается в том, что реальной потребности в таком объекте для Windows пока просто не возникало. На обычной однопроцессорной машине, где потоки все равно физически не могут работать одновременно, он будет практически эквивалентен мьютексу. На многопроцессорной машине он может дать выигрыш за счёт того, что позволит читающим потокам работать параллельно. Вместе с тем, реально этот выигрыш станет ощутим лишь когда вероятность «столкновения» читающих потоков велика. Несомненно, что к примеру на 1024-процессорной машине подобный объект ядра будет жизненно необходим. Подобные машины существуют, но это - специализированные системы, работающие под специализированными ОС. Зачастую, такие ОС строят на основе UNIX, вероятно оттуда объект типа «один пишет, многие читают» попал и в более общеупотребительные версии этой системы. Но на привычных нам x86-машинах установлен, как правило, всего один и лишь изредка два процессора. И только самые продвинутые модели процессоров типа Intel Xeon поддерживают 4-х и даже более процессорные конфигурации, но такие системы пока остаются экзотикой. Но даже на такой «продвинутой» системе «продвинутый мьютекс» сможет дать заметный выигрыш в производительности лишь в очень специфических ситуациях.

Таким образом, реализация «продвинутого» мьютекса просто не стоит свеч. На «малопроцессорной» машине он может оказаться даже менее эффективным из-за усложнения логики объекта по сравнению со стандартным мьютексом. Учтите, реализация подобного объекта не так проста, как может показаться на первый взгляд. При неудачной реализации, если читающих потоков будет слишком много, пишущему потоку будет просто «не пробиться» к данным. По этим причинам я также не рекомендую вам пытаться эмулировать такой объект. В реальных приложениях на реальных машинах обычный мьютекс или критическая секция (о которой речь пойдет в следующей части статьи) прекрасно справится с задачей синхронизации доступа к общим данным. Хотя, я полагаю, с развитием ОС Windows объект ядра «один пишет многие читают» рано или поздно появится.

Примечание. На самом деле, объект «один пишет - многие читают» в Windows NT всё-таки есть. Просто, когда я писал эту статью, ещё не знал об этом. Этот объект носит название «ресурсы ядра» и не доступен для програм пользовательского режима, вероятно поэтому и не слишком известен. Подобности о нём можно найти в DDK. Спасибо Константину Манурину, что указал мне на это.

Deadlock

А теперь вернемся к функции WaitForMultipleObjects , точнее к ее третьему параметру, bWaitAll. Я обещал рассказать, почему возможность ожидания сразу нескольких объектов так важна.

Почему необходима функция, позволяющая ожидать один из нескольких объектов, понятно. В отсутствие специальной функции это можно было бы сделать, разве что последовательно проверяя состояние объектов в пустом цикле, что, конечно же, недопустимо. А вот необходимость в специальной функции, позволяющей ожидать момента, когда сразу несколько объектов перейдут в сигнальное состояние, не так очевидна. Действительно, представим следующую характерную ситуацию: нашему потоку в определенный момент необходим доступ сразу к двум наборам общих данных, за каждый из которых отвечает свой мьютекс, назовем их А и В. Казалось бы, поток может сначала подождать, пока освободиться мьютекс А, захватить его, затем подождать освобождения мьютекса В... Вроде бы, можно обойтись парой вызовов WaitForSingleObject . Действительно, это будет работать, но только до тех пор, пока все остальные потоки будут захватывать мьютексы в том же порядке: сначала А, потом В. Что произойдет, если некий поток попытается сделать наоборот: захватить сначала В, затем А? Рано или поздно возникнет ситуация, когда один поток захватил мьютекс А, другой В, первый ждет, когда освободится В, второй А. Понятно, что они этого никогда не дождутся и программа зависнет.

Подобная взаимная блокировка (deadlock) является очень характерной ошибкой. Как и все ошибки, связанные с синхронизацией, она проявляется лишь время от времени и способна испортить программисту немало нервов. В то же время взаимоблокировкой чревата практически любая схема, в которую вовлечены несколько объектов синхронизации. Поэтому, этой проблеме следует уделять особое внимание на этапе проектирования такой схемы.

В приведенном простом примере избежать блокировки довольно легко. Надо потребовать, чтобы все потоки захватывали мьютексы в определенном порядке: сначала А, потом В. Однако в сложной программе, где много различным образом связанных друг с другом объектов, добиться этого обычно бывает не так просто. В блокировку могут быть вовлечены не два, а много объектов и потоков. Поэтому, самый надёжный способ избежать взаимной блокировки в ситуации, когда потоку требуется сразу несколько объектов синхронизации - это захватывать их все одним вызовом функции WaitForMultipleObjects с параметром bWaitAll=TRUE. По правде говоря, при этом мы всего лишь перекладываем проблему взаимных блокировок на ядро операционной системы, но главное - это уже будет не наша забота. Впрочем, в сложной программе со множеством объектов, когда не всегда сразу можно сказать, какие именно из них потребуются для выполнения той или иной операции, свести все вызовы WaitFor в одно место и объединить тоже часто оказывается не просто.

Таким образом, есть два пути избежать возникновения deadlock. Надо либо следить, чтобы объекты синхронизации всегда захватывались потоками строго в одном и том же порядке, либо чтобы они захватывались единым вызовом WaitForMultipleObjects . Последний метод более прост и предпочтителен. Однако на практике, с выполнением и того и другого требования постоянно возникают сложности, приходится сочетать оба этих подхода. Проектирование сложных схем синхронизации часто является весьма нетривиальной задачей.

Пример организации синхронизации

В большинстве типичных ситуаций, вроде тех, что я описывал выше, организовать синхронизацию не составляет труда, достаточно события или мьютекса. Но периодически встречаются более сложные случаи, где решение проблемы не так очевидно. Хочу проиллюстрировать это на конкретном примере из своей практики. Как вы увидите, решение оказалось удивительно простым, но прежде чем я нашел его, мне пришлось перепробовать несколько неудачных вариантов.

Итак, задача. Практически во всех современных менеджерах закачек (download managers), или попросту говоря «качалках» есть возможность ограничения трафика, чтобы работающая в фоновом режиме «качалка» не сильно мешала пользователю лазить по Сети. Я разрабатывал похожую программу, и передо мной была поставлена задача реализовать именно такую «фичу». Моя качалка работала по классической схеме многопоточности, когда каждой задачей, в данном случае скачиванием конкретного файла, занимается отдельный поток. Ограничение трафика должно было быть суммарным для всех потоков. То есть, нужно было добиться, чтобы в течение заданного интервала времени все потоки считывали из своих сокетов не более определенного количества байтов. Просто разделить этот лимит поровну между потоками, очевидно, будет неэффективно, поскольку скачивание файлов может идти весьма неравномерно, один будет качаться быстро, другой медленно. Следовательно, нужен общий для всех потоков счетчик, сколько байт считано, и сколько еще можно считать. Здесь-то и не обойтись без синхронизации. Дополнительную сложность задаче придало требование того, чтобы в любой момент любой из рабочих потоков можно было остановить.

Сформулируем задачу более детально. Систему синхронизации я решил заключить в специальный класс. Вот его интерфейс:

class CQuota {

public : // methods

void Set( unsigned int _nQuota );

unsigned int Request( unsigned int _nBytesToRead, HANDLE _hStopEvent );

void Release( unsigned int _nBytesRevert, HANDLE _hStopEvent );

Периодически, скажем раз в секунду, управляющий поток вызывает метод Set, устанавливая квоту на скачивание. Перед тем, как рабочий поток считает данные, полученные из сети, он вызывает метод Request, который проверяет, что текущая квота не равна нулю, и если да, возвращает не превышающее текущую квоту число байтов, которые можно считать. Квота соответственно уменьшается на это число. Если же при вызове Request квота равна нулю, вызывающий поток должен подождать, пока она не появится. Иногда случается, что реально получено меньше байтов, чем было запрошено, в таком случае поток возвращает часть выделенной ему квоты методом Release. И, как я уже сказал, пользователь в любой момент может дать команду прекратить скачивание. В таком случае ожидание надо прервать независимо от наличия квоты. Для этого используется специальное событие: _hStopEvent. Поскольку задачи можно запускать и останавливать независимо друг от друга, для каждого рабочего потока используется свое событие остановки. Его описатель передается методам Request и Release.

В одном из неудачных вариантов я попробовал использовать сочетание мьютекса, синхронизирующего доступ к классу CQuota и события, сигнализирующего наличие квоты. Однако в эту схему никак не вписывается событие остановки. Если поток желает получить квоту, то его состояние ожидания должно управляться сложным логическим выражением: ((мьютекс И событие наличия квоты) ИЛИ событие остановки). Но WaitForMultipleObjects такого не позволяет, можно объединить несколько объектов ядра либо операцией И, либо ИЛИ, но не вперемешку. Попытка разделить ожидание двумя последовательными вызовами WaitForMultipleObjects неизбежно приводит к deadlock. В общем, этот путь оказался тупиковым.

Не буду больше напускать тумана и расскажу решение. Как я уже говорил, мьютекс очень похож на событие с автосбросом. И здесь мы имеем как раз тот редкий случай, когда удобнее использовать именно его, но не одно а сразу два:

class CQuota {

private: // data

unsigned int m_nQuota;

CEvent m_eventHasQuota;

CEvent m_eventNoQuota;

Одновременно может быть установлено только одно из этих событий. Любой поток, произведший манипуляции с квотой должен установить первое событие, если оставшаяся квота не равна нулю, и второе, если квота исчерпана. Поток, желающий получить квоту, должен подождать первого события. Потоку же, увеличивающего квоту, достаточно дождаться любого из этих событий, поскольку если оба они находятся в сброшенном состоянии, это означает, что с квотой в данный момент работает другой поток. Таким образом, два события выполняют сразу две функции: синхронизации доступа к данным и ожидание. Наконец, поскольку поток ожидает одного из двух событий, сюда легко включается и событие, дающее сигнал на остановку.

Приведу для примера реализацию метода Request. Остальные реализуются аналогично. Я слегка упростил код, использовавшийся в реальном проекте:

unsigned int CQuota:: Request( unsigned int _nRequest, HANDLE _hStopEvent )

if (! _nRequest ) return 0 ;

unsigned int nProvide = 0 ;

HANDLE hEvents[ 2 ];

hEvents[ 0 ] = _hStopEvent; // Событие остановки имеет больший приоритет. Ставим его первым.

hEvents[ 1 ] = m_eventHasQuota;

int iWaitResult = :: WaitForMultipleObjects( 2 , hEvents, FALSE, INFINITE );

switch ( iWaitResult ) {

case WAIT_FAILED:

// ОШИБКА

throw new CWin32Exception;

case WAIT_OBJECT_0:

// Событие остановки. Я обрабатывал его с помощью специального исключения, но ничто не мешает реализовать это как-то иначе.

throw new CStopException;

case WAIT_OBJECT_0+ 1 :

// Событие "квота доступна"

ASSERT( m_nQuota ); // Если сигнал подало это событие, но квоты на самом деле нет, значит где-то мы ошиблись. Надо искать баг!

if ( _nRequest >= m_nQuota ) {

nProvide = m_nQuota;

m_nQuota = 0 ;

m_eventNoQuota. Set();

else {

nProvide = _nRequest;

m_nQuota -= _nRequest;

m_eventHasQuota. Set();

break ;

return nProvide;

Маленькое замечание. Библиотека MFC в том проекте не использовалась, но, как вы наверно уже догадались, я сделал собственный класс CEvent, оболочку вокруг объекта ядра «событие», подобную MFC"шной. Как я уже говорил, такие простые классы-оболочки очень полезны, когда есть некий ресурс (в данном случае объект ядра), который необходимо не забыть освободить по окончанию работы. В остальном же нет разницы, писать ли SetEvent(m_hEvent) или m_event.Set().

Надеюсь, этот пример поможет вам спроектировать собственную схему синхронизации, если вам встретится нетривиальная ситуация. Главное - максимально тщательно проанализируйте вашу схему. Не может ли возникнуть ситуации, в которой она сработает неправильно, в частности, не может ли возникнуть блокировка? Отлавливать такие ошибки в отладчике - обычно безнадежное дело, здесь помогает только детальный анализ.

Итак, мы рассмотрели важнейшее средство синхронизации потоков: объекты синхронизации ядра. Это мощный и универсальный инструмент. С его помощью можно строить даже очень сложные схемы синхронизации. К счастью, такие нетривиальные ситуации встречаются нечасто. Кроме того, за универсальность всегда приходится расплачиваться производительностью. Поэтому во многих случаях стоит использовать другие средства синхронизации потоков, имеющиеся в Windows, такие как критические секции и атомарные операции. Они не так универсальны, зато просты и эффективны. О них мы и поговорим в следующей части.

Этот синхронизирующий объект может использоваться только локально внутри процесса, создавшего его. Остальные объекты могут быть использованы для синхронизации потоков разных процессов. Название объекта “критическая секция” связано с некоторым абстрактным выделением части программного кода (секции), выполняющего некоторые операции, порядок которых не может быть нарушен. То есть попытка двумя разными потоками одновременно выполнять код этой секции приведет к ошибке.

Например, такой секцией может быть удобно защитить функции-писатели, так как одновременный доступ нескольких писателей должен быть исключен.

Для критической секции вводят две операции:

войти в секцию; Пока какой-либо поток находится в критической секции, все остальные потоки при попытке войти в нее будут автоматически останавливаться в ожидании. Поток, уже вошедший в эту секцию, может входить в нее многократно, не ожидая ее освобождения.

покинуть секцию; При покидании потоком секции уменьшается счетчик числа вхождений этого потока в секцию, так что секция будет освобождена для других потоков только если поток выйдет из секции столько раз, сколько раз в нее входил. При освобождении критической секции будет пробужден только один поток, ожидающий разрешения на вход в эту секцию.

Вообще говоря, в других API, отличных от Win32 (например, OS/2), критическая секция рассматривается не как синхронизирующий объект, а как фрагмент кода программы, который может исполняться только одним потоком приложения. То есть вход в критическую секцию рассматривается как временное выключение механизма переключения потоков до выхода из этой секции. В Win32 API критические секции рассматриваются как объекты, что приводит к определенной путанице -- они очень близки по своим свойствам к неименованным объектам исключительного владения (mutex , см. ниже).

При использовании критических секций надо следить, что бы в секции не выделялись чересчур большие фрагменты кода, так как это может привести к существенным задержкам в выполнении других потоков.

Например, применительно к уже рассмотренным кучам -- не имеет смысла все функции по работе с кучей защищать критической секцией, так как функции-читатели могут выполняться параллельно. Более того, применение критической секции даже для синхронизации писателей на самом деле представляется малоудобным -- так как для синхронизации писателя с читателями последним все-равно придется входить в эту секцию, что практически приводит к защите всех функций единой секцией.

Можно выделить несколько случаев эффективного применения критических секций:

читатели не конфликтуют с писателями (защищать надо только писателей);

все потоки имеют примерно равные права доступа (скажем, нельзя выделить чистых писателей и читателей);

при построении составных синхронизирующих объектов, состоящих из нескольких стандартных, для защиты последовательных операций над составным объектом.