Авторы оригинала: Aleksandar Prokopec, Heather Miller
Перевод Анастасии Маркиной
Мотивация
Пока производители процессоров в последние годы дружно переходили от одноядерных к многоядерным архитектурам, научное и производственное сообщества не менее дружно признали, что многопоточное программирование по-прежнему трудно сделать популярным.
Чтобы упростить написание многопоточных программ, в стандартную библиотеку Scala были включены параллельные коллекции, которые скрыли от пользователей низкоуровневые подробности параллелизации, дав им привычную высокоуровневую абстракцию. Надежда была (и остается) на то, что скрытая под уровнем абстракции параллельность позволит на шаг приблизиться к ситуации, когда среднестатистический разработчик будет повседневно использовать в работе надежно исполняемый параллельный код.
Идея проста: коллекции – хорошо понятная и часто используемая программистами абстракция. Упорядоченность коллекций позволяет эффективно и прозрачно (для пользователя) обрабатывать их параллельно. Позволив пользователю “подменить” последовательные коллекции на те, что обрабатываются параллельно, решение Scala делает большой шаг вперед к охвату большего количества кода возможностями параллельной обработки.
Рассмотрим следующий пример, где мы исполняем монадическую операцию на некоторой большой последовательной коллекции:
val list = (1 to 10000).toList
list.map(_ + 42)
Чтобы выполнить ту же самую операцию параллельно, требуется просто вызвать метод par
на последовательной коллекции list
. После этого можно работать с параллельной коллекцией так же, как и с последовательной. То есть, пример выше примет вид:
list.par.map(_ + 42)
Библиотека параллельных коллекций Scala тесно связана с “последовательной” библиотекой коллекций Scala (представлена в версии 2.8), во многом потому, что последняя служила вдохновением к ее дизайну. Он предоставляет параллельную альтернативу ряду важных структур данных из библиотеки (последовательных) коллекций Scala, в том числе:
ParArray
ParVector
mutable.ParHashMap
mutable.ParHashSet
immutable.ParHashMap
immutable.ParHashSet
ParRange
ParTrieMap
(collection.concurrent.TrieMap
впервые в версии 2.10)
Библиотека параллельных коллекций Scala расширяема также как и последовательные коллекции, представленные в стандартной библиотеке. Другими словами, как и в случае с обычными последовательными коллекциями, пользователи могут внедрять свои собственные типы коллекций, автоматически наследуя все предопределенные (параллельные) операции, доступные для других параллельных коллекций в стандартной библиотеке.
Несколько примеров
Попробуем изобразить всеобщность и полезность представленных коллекций на ряде простых примеров, для каждого из которых характерно прозрачно-параллельное выполнение.
Примечание: Некоторые из последующих примеров оперируют небольшими коллекциями, для которых такой подход не рекомендуется. Они должны рассматриваться только как иллюстрация. Эвристически, ускорение становится заметным, когда размер коллекции дорастает до нескольких тысяч элементов. (Более подробно о взаимосвязи между размером коллекции и производительностью, смотрите соответствующий подраздел раздела, посвященного производительности в данном руководстве.)
map
Используем параллельную map
для преобразования набора строк String
в верхний регистр:
scala> val lastNames = List("Smith","Jones","Frankenstein","Bach","Jackson","Rodin").par
lastNames: scala.collection.parallel.immutable.ParSeq[String] = ParVector(Smith, Jones, Frankenstein, Bach, Jackson, Rodin)
scala> lastNames.map(_.toUpperCase)
res0: scala.collection.parallel.immutable.ParSeq[String] = ParVector(SMITH, JONES, FRANKENSTEIN, BACH, JACKSON, RODIN)
fold
Суммируем через fold
на ParArray
:
scala> val parArray = (1 to 10000).toArray.par
parArray: scala.collection.parallel.mutable.ParArray[Int] = ParArray(1, 2, 3, ...
scala> parArray.fold(0)(_ + _)
res0: Int = 50005000
filter
Используем параллельный filter
для отбора фамилий, которые начинаются с буквы “J” или стоящей дальше в алфавите:
scala> val lastNames = List("Smith","Jones","Frankenstein","Bach","Jackson","Rodin").par
lastNames: scala.collection.parallel.immutable.ParSeq[String] = ParVector(Smith, Jones, Frankenstein, Bach, Jackson, Rodin)
scala> lastNames.filter(_.head >= 'J')
res0: scala.collection.parallel.immutable.ParSeq[String] = ParVector(Smith, Jones, Jackson, Rodin)
Создание параллельной коллекции
Параллельные коллекции предназначались для того, чтобы быть использованными таким же образом, как и последовательные; единственное значимое отличие – в методе получения параллельной коллекции.
В общем виде, есть два варианта создания параллельной коллекции:
Первый, с использованием ключевого слова new
и подходящего оператора import:
import scala.collection.parallel.immutable.ParVector
val pv = new ParVector[Int]
Второй, преобразованием последовательной коллекции:
val pv = Vector(1,2,3,4,5,6,7,8,9).par
Разовьем эту важную мысль – последовательные коллекции можно конвертировать в параллельные вызовом метода par
на последовательных, и, соответственно, параллельные коллекции также можно конвертировать в последовательные вызовом метода seq
на параллельных.
На заметку: Коллекции, являющиеся последовательными в силу наследования (в том смысле, что доступ к их элементам требуется получать по порядку, один элемент за другим), такие, как списки, очереди и потоки (streams), преобразовываются в свои параллельные аналоги копированием элементов в соответствующие параллельные коллекции. Например, список List
конвертируется в стандартную неизменяемую параллельную последовательность, то есть в ParVector
. Естественно, что копирование, которое для этого требуется, вносит дополнительный расход производительности, которого не требуют другие типы коллекций, такие как Array
, Vector
, HashMap
и т.д.
Больше информации о конвертировании можно найти в разделах преобразования и конкретные классы параллельных коллекций этого руководства.
Семантика
В то время, как абстракция параллельной коллекции заставляет думать о ней так, как если бы речь шла о нормальной последовательной коллекции, важно помнить, что семантика различается, особенно в том, что касается побочных эффектов и неассоциативных операций.
Для того, чтобы увидеть, что происходит, для начала представим, как именно операции выполняются параллельно. В концепции, когда фреймворк параллельных коллекций Scala распараллеливает операцию на соответствующей коллекции, он рекурсивно “разбивает” данную коллекцию, параллельно выполняет операцию на каждом разделе коллекции, а затем “комбинирует” все полученные результаты.
Эти многопоточные, “неупорядоченные” семантики параллельных коллекций приводят к следующим скрытым следствиям:
- Операции, имеющие побочные эффекты, могут нарушать детерминизм
- Неассоциативные операции могут нарушать детерминизм
Операции, имеющие побочные эффекты.
Вследствие использования фреймворком параллельных коллекций семантики многопоточного выполнения, в большинстве случаев для соблюдения детерминизма требуется избегать выполнения на коллекциях операций, которые выполняют побочные действия. В качестве простого примера попробуем использовать метод доступа foreach
для увеличения значения переменной var
, объявленной вне замыкания, которое было передано foreach
.
scala> var sum = 0
sum: Int = 0
scala> val list = (1 to 1000).toList.par
list: scala.collection.parallel.immutable.ParSeq[Int] = ParVector(1, 2, 3,…
scala> list.foreach(sum += _); sum
res01: Int = 467766
scala> var sum = 0
sum: Int = 0
scala> list.foreach(sum += _); sum
res02: Int = 457073
scala> var sum = 0
sum: Int = 0
scala> list.foreach(sum += _); sum
res03: Int = 468520
В примере видно, что несмотря на то, что каждый раз sum
инициализируется в 0, при каждом новом вызове foreach
на предложенном list
, sum
получает различное значение. Источником недетерминизма является так называемая гонка– параллельное чтение/запись одной и той же изменяемой переменной.
В примере выше возможен случай, когда два потока прочитают одно и то же значение переменной sum
, потратят некоторое время на выполнение операции над этим значением sum
, а потом попытаются записать новое значение в sum
, что может привести к перезаписи (а следовательно, к потере) значимого результата, как показано ниже:
Поток A: читает значение sum, sum = 0 значение sum: 0
Поток B: читает значение sum, sum = 0 значение sum: 0
Поток A: увеличивает sum на 760, пишет sum = 760 значение sum: 760
Поток B: увеличивает sum на 12, пишет sum = 12 значение sum: 12
Приведенный выше пример демонстрирует сценарий, где два потока успевают прочитать одно и то же значение, 0
, прежде чем один или другой из них успеет прибавить к этому 0
элемент из своего куска параллельной коллекции. В этом случае Поток A
читает 0
и прибавляет к нему свой элемент, 0+760
, в то время, как Поток B
прибавляет 0
к своему элементу, 0+12
. После того, как они вычислили свои суммы, каждый из них записывает свое значение в sum
. Получилось так, что Поток A
успевает записать значение первым, только для того, чтобы это помещенное в sum
значение было практически сразу же перезаписано потоком B
, тем самым полностью перезаписав (и потеряв) значение 760
.
Неассоциативные операции
Из-за “неупорядоченной” семантики, нелишней осторожностью становится требование выполнять только ассоциативные операции во избежание недетерминированности. То есть, если мы имеем параллельную коллекцию pcoll
, нужно убедиться, что при вызове на pcoll
функции более высокого уровня, такой как pcoll.reduce(func)
, порядок, в котором func
применяется к элементам pcoll
, может быть произвольным. Простым и очевидным примером неассоциативной операции является вычитание:
scala> val list = (1 to 1000).toList.par
list: scala.collection.parallel.immutable.ParSeq[Int] = ParVector(1, 2, 3,…
scala> list.reduce(_-_)
res01: Int = -228888
scala> list.reduce(_-_)
res02: Int = -61000
scala> list.reduce(_-_)
res03: Int = -331818
В примере выше, мы берем ParVector[Int]
, вызываем функцию reduce
, и передаем ей _-_
, которая просто берет два неименованных элемента, и вычитает один из другого. Вследствие того, что фреймворк параллельных коллекций порождает потоки и независимо выполняет reduce(_-_)
на разных частях коллекции, результат двух запусков reduce(_-_)
на одной и той же коллекции не будет одним и тем же.
Примечание: Часто возникает мысль, что так же, как и в случае с неассоциативными, некоммутативные операции, переданные в более высокую уровнем функцию на параллельной коллекции, приводят к недетеминированному поведению. Это неверно, простой пример – конкатенация строк – ассоциативная, но некоммутативная операция:
scala> val strings = List("abc","def","ghi","jk","lmnop","qrs","tuv","wx","yz").par
strings: scala.collection.parallel.immutable.ParSeq[java.lang.String] = ParVector(abc, def, ghi, jk, lmnop, qrs, tuv, wx, yz)
scala> val alphabet = strings.reduce(_++_)
alphabet: java.lang.String = abcdefghijklmnopqrstuvwxyz
“Неупорядоченная” семантика параллельных коллекций означает только то, что операции будут выполнены не по порядку (во временном отношении. То есть, не последовательно), она не означает, что результат будет “перемешан” относительно изначального порядка (в пространственном отношении). Напротив, результат будет практически всегда пересобран по-порядку– то есть, параллельная коллекция, разбитая на части в порядке A, B, C, будет снова объединена в том же порядке A, B, C, а не в каком-то произвольном, например, B, C, A.
Если требуется больше информации о том, как разделяются и комбинируются операции на различных типах коллекций, посетите раздел Архитектура этого руководства.