Шрифт:
Интервал:
Закладка:
Использование каналов для безопасной передачи данных
Если совместное использование переменных между волокнами не является правильным способом взаимодействия между волокнами, то что? Ответ – каналы. Канал — это способ связи между волокнами без необходимости беспокоиться об условиях гонки, блокировках, семафорах или других специальных структурах. Давайте посмотрим на следующий пример:
input_channel = Channel(Int32).new
output_channel = Channel(Int32).new
spawn do
output_channel.send input_channel.receive * 2
end
input_channel.send 2
puts output_channel.receive
В предыдущем примере создаются два канала, содержащие входные и выходные значения Int32. Затем он порождает волокно, которое сначала получает значение из входного канала, удваивает его и отправляет в выходной канал. Затем мы отправляем входному каналу начальное значение 2 и, наконец, печатаем результат, который получаем обратно из выходного канала. Как упоминалось в предыдущем разделе, само волокно не выполняется ни при его создании, ни при отправке ему значения. Ключевой частью этого примера является последний вызов приема на выходном канале. Этот вызов блокирует основной файбер до тех пор, пока он не получит обратно значение, в результате чего наш файбер будет выполнен и напечатан окончательный результат 4.
Давайте посмотрим на другой пример, который сделает поведение более понятным:
channel = Channel(Int32).new
spawn do
loop do
puts "Waiting"
sleep 0.5
end
end
spawn do
sleep 2
channel.send channel.receive * 2
sleep 1
channel.send channel.receive * 3
end
channel.send 2
puts channel.receive
channel.send 3
puts channel.receive
Запуск программы приводит к следующему выводу:
Waiting
Waiting
Waiting
Waiting
4
Waiting
Waiting
9
Первые результаты отправки и получения во втором волокне выполняются первыми. Однако первая строка — это sleep 2, поэтому она делает именно это. Поскольку спящий режим является блокирующей операцией, планировщик Crystal выполнит следующее ожидающее волокно, то есть то, которое печатает Waiting, а затем в цикле ожидает полсекунды. Это сообщение выводится четыре раза, что соответствует двухсекундному спящему режиму, за которым следует ожидаемый результат 4. Затем выполнение возвращается ко второму волокну, но сразу же переходит к первому волокну из-за sleep 1, что печатает Ожидание еще дважды, прежде чем отправить ожидаемый вывод 9 обратно в канал.
В обоих примерах мы работали с небуферизованными каналами. Небуферизованный канал продолжит выполнение на волокне, ожидающем получения отправленного значения из канала. Другими словами, именно поэтому выполнение программы возвращается к основному волокну для печати значения вместо продолжения выполнения второго волокна.
С другой стороны, буферизованный канал не будет переключаться на другое волокно при вызове отправки, если буфер не заполнен. Буферизованный канал можно создать, передав размер буфера конструктору канала. Например, взгляните на следующее:
channel = Channel(Int32).new 2
spawn do
puts "Before send 1"
channel.send 1
puts "Before send 2"
channel.send 2
puts "Before send 3"
channel.send 3
puts "After send"
end
3.times do
puts channel.receive
end
Это выведет следующее:
Before send 1
Before send 2
Before send 3
After send
1
2
3
Теперь, если мы запустим тот же код с небуферизованным каналом, результат будет следующий:
Before send 1
Before send 2
1
2
Before send 3
After send
3
В обоих случаях первое значение было отправлено, как и следовало ожидать. Однако два типа каналов начинают различаться, когда отправляется второе значение. В случае без буферизации ожидающий получатель отсутствует, поэтому канал запускает перепланирование, в результате чего выполнение переключается обратно на основное волокно. После печати первых двух значений выполнение переключается обратно на волокно и отправляет третье значение. Это приводит к перепланированию, при котором выполнение будет переключено обратно на основное волокно, когда в следующий раз появится такая возможность. В данном конкретном случае такой шанс появляется после печати конечного сообщения и когда в волокне больше нечего выполнять.
В случае с буферизацией первое отправленное значение выполняет команду channel.receive, которая первоначально вызвала выполнение оптоволокна. В буфер добавляется второе значение, за ним следует третье значение и, наконец, конечное сообщение. На этом этапе волокно завершает выполнение, поэтому выполнение переключается обратно на основное волокно, печатая все три значения: они включают одно из начального приема плюс два из буфера канала. Давайте добавим еще одно значение к волокну, добавив puts “Before send 4” и channel.send 4 перед конечным сообщением. Затем обновите цикл, чтобы сказать 4.times do. Повторный запуск программы дает следующий результат:
Before send 1
Before send 2
Before send 3
Before send 4
1
2
3
4
Обратите внимание, что на этот раз конечное сообщение не было напечатано. Это связано с тем, что второе и третье значения укладываются в размер буфера, равный 2. Однако, когда отправляется четвертое значение, буфер больше не может обрабатывать дополнительные значения, поэтому канал запускает перепланирование, в результате чего выполнение переключается на основное волокно снова. Поскольку первое значение было отправлено как часть исходного канала channel.recieve, а второе, третье и четвертое значения уже находятся в буфере канала, они печатаются так, как и следовало ожидать. Однако к этому моменту основное волокно уже получило четыре желаемых значения. Поэтому у него никогда не будет возможности возобновить выполнение волокна, чтобы распечатать конечное сообщение.
Во всех этих примерах мы получали значение из одного канала. Но что, если вы хотите использовать первые значения, полученные из набора из нескольких каналов? Здесь в игру вступает ключевое слово select (не путать с методом #select). Ключевое слово select позволяет вам ожидать на нескольких каналах и выполнять некоторую логику в зависимости от того, какой из них получит значение первым. Кроме того, он поддерживает работу логики, если все каналы заблокированы и по истечении заданного периода времени значение не получено. Начнем с простого примера:
channel1 = Channel(Int32).new
channel2 = Channel(Int32).new
spawn do
puts "Starting fiber 1"