Асинхронная передача сообщений
В этом разделе представлены две реализации асинхронной передачи сообщений. В первой из них к ядру для разделяемой памяти из главы 6 добавлены каналы и примитивы передачи сообщений. Эта реализация подходит для работы на одном процессоре или на мультипроцессоре с разделяемой памятью. Во второй реализации ядро с разделяемой памятью дополнено до распределенного ядра, которое может работать в многопроцессорной системе или в сети из отдельных машин.
10.1.1. Ядро для разделяемой памяти
Каждый канал программы представлен в ядре дескриптором канала. Дескриптор канала содержит заголовки списков сообщений и заблокированных процессов. В списке сообщений находятся сообщения, поставленные в очередь; в списке блокированных процессов — процессы, ожидающие получения сообщений. Хотя бы один из этих списков всегда пуст, поскольку, если есть доступное сообщение, процесс не блокируется, а если есть заблокированный процесс, то сообщения не ставятся в очередь.
Дескриптор создается с помощью примитива ядра createChan, который вызывается по одному разу для каждой декларации chan в программе до создания процессов. Массив каналов создается либо вызовом примитива createChan для каждого элемента, либо одним вызовом примитива createChan с параметром, указывающим размер массива. Примитив createChan возвращает имя (индекс или адрес) дескриптора.
376 Часть 2. Распределенное программирование
Оператор send реализован с помощью примитива sendChan. Сначала процесс-отправитель вычисляет выражения и собирает значения в единое сообщение, которое обычно записывает в стек выполнения процесса, передающего сообщение. Затем вызывается примитив sendChan; его аргументами являются имя канала (возвращенное из вызова createChan) и само сообщение. Примитив sendChan сначала находит дескриптор канала. Если в списке заблокированных процессов есть хотя бы один процесс, то оттуда удаляется самый старый процесс, а сообщение копируется в его адресное пространство.
После этого дескриптор процесса помещается в список готовых к работе. Если заблокированных процессов нет, сообщение необходимо сохранить в списке сообщений дескриптора, поскольку передача является неблокирующей операцией, и, следовательно, отправителю нужно позволить продолжать выполнение.
Пространство для сохраненного сообщения можно выделять динамически из единого буферного пула, или с каждым каналом может быть связан отдельный коммуникационный буфер. Однако асинхронная передача сообщений поднимает важный вопрос реализации: что, если пространство ядра исчерпано? У ядра есть два выхода: либо остановить выполнение программы из-за переполнения буфера, либо заблокировать передающий процесс, пока не появится достаточно места.
Остановка программы — это решительный шаг, поскольку свободное пространство может вскоре и появиться, но программист сразу получает сигнал о том, что сообщения производятся быстрее, чем потребляются (это обычно говорит об ошибке). С другой стороны, блокировка передающего процесса нарушает неблокирующую семантику оператора send и усложняет ядро, создавая дополнительный источник блокировок. И здесь автор параллельной программы не может ничего предполагать о скорости и порядке выполнения процессов. Ядра операционных систем блокируют отправителей сообщений и при необходимости выгружают заблокированные процессы из памяти в файл подкачки, поскольку должны избегать отказов системы. Однако для языков программирования высокого уровня приемлемым выбором является остановка программы.
Оператор receive реализуется с помощью примитива receiveChan. Его аргументами являются имя канала и адрес буфера сообщений. Действия примитива receiveChan дуальны действиям примитива sendChan. Сначала ядро находит дескриптор, соответствующий выбранному каналу, затем проверяет его список сообщений. Если список не пуст, первое сообщение из него удаляется и копируется в буфер сообщений получателя. Если список сообщений пуст, процесс-получатель добавляется в список заблокированных процессов.
Получив сообщение, процесс- адресат распаковывает сообщение из буфера в соответствующие переменные.
Четвертый примитив, emptyChan, используется для реализации функции empty (ch). Он просто находит дескриптор и проверяет, не пуст ли список сообщений. В действительности структуры данных ядра находятся не в защищенной области, и выполняемый процесс может сам проверять свой список сообщений. Критическая секция не нужна, поскольку процессу нужно просмотреть только заголовок списка сообщений.
В листинге 10.1 показаны схемы всех четырех примитивов. Эти примитивы добавлены к однопроцессорному ядру (см. листинг 6.1). Значением executing является адрес дескриптора процесса, выполняемого в данный момент, a dispatcher — это процедура, планирующая работу процессов на данном процессоре. Действия примитивов sendChan и receiveChan очень похожи на действия примитивов Р и V в семафорном ядре (см. листинг 6.4). Основное отличие состоит в том, что дескриптор канала содержит список сообщений, тогда как дескриптор семафора — только его значение.
Ядро в листинге 10.1 можно изменить для работы на мультипроцессоре с разделяемой памятью, используя методику, описанную в разделе 6.2. Основное требование состоит в том, что структуры данных ядра нужно хранить в памяти, доступной всем процессорам, а для защиты критических секций кода ядра, дающих доступ к разделяемым данным, использовать блокировки.
10.1.2. Распределенное ядро
Покажем, как для поддержки распределенного выполнения расширить ядро с разделяемой памятью. Главная идея — дублировать ядро, помещая по одной его копии на каждую машину, и обеспечить взаимодействие копий с помощью сетевых примитивов.
В распределенной программе каждый канал хранится на отдельной машине. Предположим пока, что у канала может быть сколько угодно отправителей и только один получатель. Тогда дескриптор канала было бы логично поместить на ту же машину, на которой выполняется получатель. Процесс, выполняемый на этой машине, обращается к каналу так же, как
378 Часть 2. Распределенное программирование
и при использовании ядра с разделяемой памятью. Но процесс, выполняемый на другой машине, не может обращаться к каналу напрямую; для этого должны взаимодействовать два ядра, выполняемых на этих машинах. Ниже будет описано, как изменить ядро с разделяемой памятью и как для реализации распределенной программы использовать сеть.
На рис. 10.1 показана структура распределенного ядра. Ядро, выполняемое на каждой машине, содержит дескрипторы каналов и процессы, расположенные на данной машине. Как и раньше, в каждом ядре есть обработчики локальных прерываний для вызовов супервизора (внутренние ловушки), таймеры и устройства ввода-вывода. Сеть связи является особым видом устройства ввода-вывода. Таким образом, в каждом ядре есть обработчики прерывания сети и процедуры, которые читают из сети и записывают в нее.
В качестве конкретного примера рассмотрим типичный доступ в сети Ethernet. Контроллер Ethernet состоит из двух независимых частей (для записи и для чтения). С каждой из этих частей в ядре связан обработчик прерывания. Прерывание записи устанавливается, когда операция записи завершается; сам контроллер следит за доступом к сети. Прерывание чтения устанавливается на процессоре, получающем по сети сообщение.
Примитив ядра, выполняемый в результате вызова из прикладного процесса, при передаче сообщения на другую машину вызывает процедуру ядра netWrite. Она имеет три аргумента: процессор назначения, вид сообщения (см. ниже) и само сообщение. Сначала процедура netWrite получает буфер, форматирует сообщение и записывает его в буфер. Затем, если записывающая часть сетевого контроллера свободна, инициируется запись; в противном случае буфер добавляется в очередь запросов на запись. В обоих случаях происходит выход из netWrite. Позже при возникновении прерывания записи связанный с ним обработчик освобождает буфер сообщения, которое только что было записано.
Если очередь записи не пуста, обработчик прерывания инициирует следующую сетевую запись.
Ввод из сети обычно обрабатывается в обратном порядке. Когда к ядру приходит сообщение, вызывается обработчик прерывания чтения из сети. Сначала он сохраняет состояние выполняющегося процесса, затем выделяет новый буфер для следующего входного сетевого сообщения. Наконец обработчик чтения распаковывает первое поле сообщения, чтобы определить его вид, и вызывает соответствующий виду примитив ядра."
В листинге 10.2 схематически представлены процедуры сетевого интерфейса. К ним относятся обработчики сетевых прерываний и процедура netWrite. Обработчик пе-
" При другом подходе к обработке сетевого ввода используется процесс-демон, выполняемый вне ядра. Обработчик прерывания просто передает сообщение в канал, из которого демон постоянно выбирает сообщения. Использование демона уменьшает время выполнения собственно обработчика чтения, но увеличивает общее время, необходимое для обработки сетевого ввода. С другой стороны, упрощается ядро, освобождаясь от подробностей обработки сетевых сообщений.
;
Для простоты предполагается, что передача по сети происходит без ошибок, и, следовательно, не нужно подтверждать получение сообщений или передавать их заново. Также игнорируется проблема исчерпания области буфера для входящих или исходящих сообщений. На практике для ограничения числа сообщений в буфере используется управление потоком. Ссылки на литературу, в которой описаны эти темы, даны в исторической справке.
Канал может храниться локально или удаленно, поэтому его имя должно состоять из двух полей: номера машины и индекса или смещения. Номер машины указывает, где хранится де-
380 Часть 2 Распределенное программирование
скриптор; индекс определяет положение дескриптора в ядре указанной машины. Примитив createChan также нужно дополнить аргументом, указывающим, на какой машине нужно создать канал.
Выполняя примитив createChan, ядро сначала проверяет этот аргумент. Если канал находится на той же машине, ядро создает канал (как в листинге 10.1). В противном случае ядро блокирует выполняемый процесс и передает на удаленную машину сообщение create_chan. Это сообщение содержит идентификатор выполняемого процесса. В конце концов локальное ядро получит сообщение chan_done, которое говорит о том, что на удаленной машине канал создан. Сообщение содержит имя канала и указывает процесс, для которого создан канал. Как показано в листинге 10.2, обработчик netRead_handler, получая это сообщение, вызывает еще один примитив ядра, chanDone, который снимает блокировку процесса, запросившего создание канала, и возвращает ему имя созданного канала.
Демон ядра на другой стороне сети, получив сообщение create_chan, вызывает примитив remoteCreate. Этот примитив создает канал и возвращает сообщение CHAN_DONE первому ядру. Таким образом, при создании канала на удаленной машине выполняются следующие шаги.
• Прикладной процесс вызывает локальный примитив createChan.
• Локальное ядро передает сообщение create_chan удаленному ядру.
• Обработчик прерывания чтения в удаленном ядре получает это сообщение и вызывает примитив remoteCreate удаленного ядра.
• Удаленное ядро создает канал и передает сообщение CHAN_DONE локальному ядру.
• Обработчик прерывания чтения в локальном ядре получает это сообщение и вызывает примитив chanDone, запускающий прикладной процесс.
В распределенном ядре нужно также изменить примитив sendChan. Примитив send-Chan здесь будет намного проще, чем createChan, поскольку операция передачи send является асинхронной. В частности, если канал находится на локальной машине, примитив sendChan должен выполнить такие же операции, как в листинге 10.1. Если канал находится на удаленной машине, примитив sendChan передает на эту машину сообщение SEND. В этот момент выполняемый процесс может продолжить работу. Получив сообщение SEND, удаленное ядро вызывает примитив remoteSend, который, по существу, выполняет те же действия, что и (локальный) примитив sendChan.
Его отличие состоит лишь в том, что входящее сообщение уже записано в буфер, поэтому ядру не нужно выделять для него новый буфер.
В листинге 10.3 схематически представлены примитивы распределенного ядра. Примитивы receiveChan и emptyChan по сравнению с листингом 10.1 не изменились, поскольку у каждого канала есть только один получатель, причем расположенный на той же машине, что и канал. Однако если это не так, то для взаимодействия машины, на которой был вызван примитив receiveChan или empty, и машины, на которой расположен канал, нужны дополнительные сообщения. Это взаимодействие аналогично взаимодействию при создании канала — локальное ядро передает сообщение удаленному ядру, которое выполняет примитив и возвращает результаты локальному ядру.
10.2. Синхронная передача сообщений
Напомним, что при синхронной передаче сообщений примитивы send и receive являются блокирующими: пытаясь взаимодействовать, процесс должен сначала подождать, пока
382 Часть 2. Распределенное программирование
к этому не будет готов второй процесс. Это делает ненужными потенциально неограниченные очереди буферизованных сообщений, но требует, чтобы для установления синхронизации получатель и отправитель обменялись управляющими сигналами.
Ниже будет показано, как реализовать синхронную передачу сообщений с помощью асинхронной, а затем — как реализовать операторы ввода, вывода и защищенные операторы взаимодействия библиотеки CSP, используя специальный учетный процесс (clearinghouse process). Вторую реализацию можно адаптировать для реализации пространства кортежей Linda (см. раздел 7.7). В исторической справке в конце главы даны ссылки на децентрализованные реализации; см. также упражнения.
10.2.1. Прямое взаимодействие с использованием асинхронных сообщений
Пусть дан набор из п процессов, которые взаимодействуют между собой, используя асинхронную передачу сообщений. Передающая сторона называет нужный ей приемник сообщений, а принимающая сторона может получать сообщения от любого отправителя.
Например, исходный процесс S передает сообщение процессу назначения D, выполняя операцию
synch_send(D, expressions);
Процесс назначения ждет получения сообщения из любого источника при выполнении оператора
synch_receive(source, variables);
Когда процессы доходят до выполнения этих операторов, идентификатор отправителя и значения выражений передаются в виде сообщения от процесса S процессу D. Затем эти данные записываются в переменные source и variables соответственно. Получатель, таким образом, узнает идентификатор отправителя сообщения.
Описанные примитивы можно реализовать с помощью асинхронной передачи сообщений, используя три массива каналов: sourceReady, destReady и transmit. Первые два массива используются для обмена управляющими сигналами, а третий — для передачи данных. Каналы используются, как показано в листинге 10.4. Процесс-получатель ждет сообщения из своего элемента массива sourceReady; сообщение идентифицирует отправителя. Затем получатель разрешает отправителю продолжить передачу, и передается само сообщение.
Код в листинге 10.4 обрабатывает отправку в указанное место назначения и прием сообщения из любого источника. Если обе стороны должны всегда называть друг друга, то в листинге 10.4 не нужны каналы sourceReady, а получатель может просто передавать отправителю сигнал о готовности к получению сообщения. Оставшихся операций передачи и приема вполне достаточно для синхронизации двух процессов. С другой стороны, если процесс-получатель может называть источник или принимать сообщения из любого источника, ситуация становится намного сложнее. (Такая возможность есть в библиотеке MPI.) Тогда либо нужно иметь отдельный канал для каждого пути взаимодействия и опрашивать каналы, либо получающий процесс должен проверять каждое сообщение и сохранять те из них, которые он еще не готов принять. Читателю предоставляется задача изменить реализацию, чтобы она обрабатывала описанную ситуацию (см. упражнения в конце главы).
Листинг 10.4. Синхронное взаимодействие с использованием асинхронных сообщений
разделяемые переменные:
chan sourceReady[n](int); # готовность отправителя
chan destReady[n](); # готовность получателя
chan transmit[n](byte msg[*]); # передача данных
10.2.2. Реализация защищенного взаимодействия с помощью учетного процесса
Вновь предположим, что есть n процессов, но они взаимодействуют и синхронизируются с помощью операторов ввода и вывода языка CSP (см. раздел 7.6). Напомним, что они имеют такой вид.
Source?port (переменные) ; # оператор ввода
Destination 'port (выражения) ; # оператор вывода
Эти операторы согласуются, когда процесс Destination выполняет оператор ввода, а процесс Source — оператор вывода, имена портов одинаковы, переменных и выражений поровну, и их типы совпадают.
В языке CSP также представлено защищенное взаимодействие с недетерминированным порядком. Напомним, что операторы защищенного взаимодействия имеют следующий вид. В; С -> S;
Здесь В — необязательное логическое выражение (защита), С — оператор ввода или вывода, as— список операторов. Операторы защищенного взаимодействия используются внутри операторов i f или do для выбора из нескольких возможных взаимодействий.
Основное в реализации операторов ввода, вывода и защищенных операторов — объединить в пары процессы, желающие выполнить согласованные операторы взаимодействия. Для подбора пар используется специальный "учетный процесс" СН ("clearinghouse"). Пусть обычный процесс Рг собирается выполнить оператор вывода, в котором процессом назначения является Р:, а процесс Р3 — операцию ввода с pj. в качестве источника. Предположим, что имя порта и типы сообщений совпадают. Эти процессы взаимодействуют с учетным процессом и между собой, как показано на рис. 10.2. Каждый из процессов Рх и Р-, передает учетному процессу СН сообщение, описывающее желаемое взаимодействие.
Процесс сн сначала сохраняет ;
первое из этих сообщений. Получив второе, он возвращается к первому и определяет, согласуются ли операторы двух процессов. Затем СН передает обоим процессам ответ. Получив ответ, процесс рг отсылает выражения своего оператора вывода процессу р.,, получающему их в переменные своего оператора ввода. В этот момент каждый процесс начинает выполнять код, следующий за оператором взаимодействия.
384 Часть 2. Распределенное программирование
Чтобы уточнить программную структуру на рис. 10.2, нужен канал для каждого пути взаимодействия. Один канал используется для сообщений от обычных процессов к учетному. Эти сообщения содержат шаблоны, описывающие возможные варианты согласованных операторов. Каждому обычному процессу для возвращения сообщений от учетного процесса нужен канал ответа. Наконец, нужен один канал данных для каждого обычного процесса, содержащего операторы ввода; такие каналы используются другими обычными процессами.
Пусть у каждого обычного процесса есть уникальный идентификатор (целое число от 1 до п). Эти идентификаторы используются для индексирования каналов данных и каналов ответа. Сообщения-ответы от учетного процесса определяют направление взаимодействия и идентификатор другого процесса. Сообщения по каналу данных передаются в виде массива байтов. Предполагается, что сообщения описывают сами себя, т.е. содержат метки, позволяющие получателю определить типы данных в сообщении.
Доходя до выполнения операторов ввода, вывода или операторов защищенного взаимодействия, обычные процессы передают учетному шаблоны. Эти шаблоны используются для подбора соответствующих пар операторов. Каждый шаблон имеет четыре поля. direction, source, destination, port
Для операторов вывода поле направления (direction) имеет значение OUT, для операторов ввода— IN. Источник (source) и приемник (destination) — это идентификаторы отправителя и желаемого получателя (для вывода) или желаемого отправителя и получателя (для ввода).
Поле порт (port) содержит целое число, которое однозначно определяет порт и, следовательно, типы данных операторов ввода и вывода. Каждому типу порта в исходном тексте программы должен соответствовать определенный номер. Это значит, что каждому явному имени порта должно быть назначено уникальное целочисленное значение, как и для каждого безымянного порта. (Напомним, что имена портов используются в исходной программе, поэтому номера портов можно присвоить статически во время компиляции программы.)
Листинг 10.5 содержит объявления разделяемых типов данных и каналов взаимодействия, а также код, выполняемый обычными процессами при достижении операторов ввода и вывода. Выполняя незащищенный оператор взаимодействия, процесс передает учетному процессу один шаблон и ждет ответа. Найдя согласующийся вызов (как описано ниже), учетный процесс передает ответ. Получив его, процесс-отправитель передает выражения оператора вывода в процесс назначения, который записывает их в переменные своего оператора ввода.
Используя защищенный оператор взаимодействия, процесс сначала должен проверить каждую защиту. Для каждого истинного выражения защиты процесс создает шаблон и добавляет его в множество t. После вычисления всех выражений защиты процесс передает множество t учетному процессу и ждет ответа. (Если t пусто, процесс просто продолжает работу.) Полученный ответ указывает процесс, выбранный для взаимодействия, и направление этого взаимодействия. Если направление OUT, процесс отсылает сообщение другому процессу, иначе ждет получения данных. После этого процесс выбирает соответствующий защищенный оператор и выполняет его. (Предполагается, что полей direction и who достаточно, чтобы определить, какой из операторов защищенного взаимодействия был выбран учетным процессом в качестве согласованного. В общем случае для этого нужны также порт и типы данных.)
В листинге 10.6 представлен учетный процесс СН. Массив pending содержит по одному набору шаблонов для каждого обычного процесса.
Если pending[i] не пусто, обычный процесс i блокируется в ожидании согласованного оператора взаимодействия. Получая новое множество t, процесс СН сначала просматривает один из шаблонов, чтобы определить, какой из процессов s передал его. (Если в шаблоне указано направление OUT, то источником является процесс s; если указано направление IN, то s —приемник.) Затем учетный процесс сравнивает элементы множества t с шаблонами в массиве pending, чтобы увидеть, есть ли согласование. По способу своего создания два шаблона являются согласованными, если их направления противоположны, а порты и источник с приемником одинаковы. Если СН находит соответствие с некоторым процессом i, он отсылает ответы процессам s и i (в ответах каждому процессу сообщаются идентификатор другого процесса и направление взаимодействия). В этом случае процесс СН очищает элемент pending [ i ], поскольку процесс i больше не заблокирован. Не найдя соответствия ни для одного шаблона во множестве t, процесс СН сохраняет t в элемент pending [s], где s — передающий процесс.
Листинг 10.6. Централизованный учетный процесс
# декларации глобальных типов и канале, как в листинге 10.5 process СН {
Глава 10 Реализация языковых механизмов 387
ловии, что в программе не может быть взаимных блокировок. Пусть элемент, с которого начинается поиск, указывается значением целочисленной переменной start. Получая новый набор шаблонов, процесс СН сначала просматривает элемент pending [ s tart ], затем pending [ s tart+1 ] и т.д. Как только процесс start получает шанс взаимодействия, учетный процесс СН увеличивает значение переменной start до индекса следующего процесса с непустым множеством ожидания. Тогда значение переменной s tar t будет циклически проходить по индексам процессов (при условии, что процесс start не блокируется навсегда). Таким образом, каждый процесс периодически будет получать шанс быть проверенным первым.