没有组合器的并行容器
正如可以定义没有构造器的个性化序列容器一样,我们也可以定义没有组合器的并行容器。没有组合器的结果就是容器的transformer方法(比如,map,flatMap,collect,filter,……)将会默认返回一个标准的容器类型,该类型是在继承树中与并行容器最接近的一个。举个例子,range类没有构造器,因此一个range类的元素映射会生成一个vector。
在接下来的例子中,我们定义了一个并行字符串容器。因为字符串在逻辑上是非可变序列,我们让并行字符串继承 immutable.ParSeq[Char]:
class ParString(val str: String)
extends immutable.ParSeq[Char] {
接着,我们定义非可变序列必须实现的方法:
def apply(i: Int) = str.charAt(i)
def length = str.length
我们还得定义这个并行容器的序列化副本。这里,我们返回WrappedString类:
def seq = new collection.immutable.WrappedString(str)
最后,我们必须为并行字符串容器定义一个splitter。我们给这个splitter起名ParStringSplitter,让它继承一个序列splitter,即,SeqSplitter[Char]:
def splitter = new ParStringSplitter(str, 0, str.length)
class ParStringSplitter(private var s: String, private var i: Int, private val ntl: Int)
extends SeqSplitter[Char] {
final def hasNext = i < ntl
final def next = {
val r = s.charAt(i)
i += 1
r
}
上面的代码中,ntl为字符串的总长,i是当前位置,s是字符串本身。
除了next和hasNext方法,并行容器迭代器,或者称它为splitter,还需要序列容器迭代器中的一些其他的方法。首先,他们有一个方法叫做 remaining,它返回这个分割器尚未遍历的元素数量。其次,需要dup方法用于复制当前的分割器。
def remaining = ntl - i
def dup = new ParStringSplitter(s, i, ntl)
最后,split和psplit方法用于创建splitter,这些splitter 用来遍历当前分割器的元素的子集。split方法,它返回一个分割器的序列,它用来遍历互不相交,互不重叠的分隔器元素子集,其中没有一个是空的。如果当前分割器有1个或更少的元素,然后就返回一个序列的分隔器。psplit方法必须返回和指定sizes参数个数一致的分割器序列。如果sizes参数指定元素数量小于当前分配器,然后一个带有额外的分配器就会附加在分配器的尾部。如果sizes参数比在当前分配器的剩余元素大很多,需要更多的元素,它将为每个分配器添加一个空的分配器。最后,调用split或psplit方法使得当前分配器无效。
def split = {
val rem = remaining
if (rem >= 2) psplit(rem / 2, rem - rem / 2)
else Seq(this)
}
def psplit(sizes: Int*): Seq[ParStringSplitter] = {
val splitted = new ArrayBuffer[ParStringSplitter]
for (sz <- sizes) {
val next = (i + sz) min ntl
splitted += new ParStringSplitter(s, i, next)
i = next
}
if (remaining > 0) splitted += new ParStringSplitter(s, i, ntl)
splitted
}
}
}
综上所述,split方法是通过psplit来实现的,它常用于并行序列计算中。由于不需要psplit,并行映射、集合、迭代器的实现,通常就更容易些。
因此,我们得到了一个并行字符串类。它唯一的缺点是,调用类似filter等转换方法不是生成并行串,而是生成并行向量,这可能是个折中的选择 - filter方法如果生成串而非向量,代价也许是昂贵的。
带组合子的并行容器
假设我们想要从并行字符串中过滤掉某些字符,例如,去除其中的逗号。如上所述,调用filter方法会生成一个并行向量,但是我们需要得到的是一个并行串(因为API中的某些接口可能需要一个连续的字符串来作为参数)。
为了避免这种情况的发生,我们需要为并行串容器写一个组合子。同时,我们也将继承ParSeqLike trait,以确保filter的返回类型是更具体的类型 - - ParString而不是ParSeq[Char]。ParSeqLike的第三个参数,用于指定并行容器对应的序列的类型(这点和序列化的 *Like trait 不同,它们只有两个类型参数)。
class ParString(val str: String)
extends immutable.ParSeq[Char]
with ParSeqLike[Char, ParString, collection.immutable.WrappedString]
所有的方法仍然和以前一样,只是我们会增加一个额外的protected方法newCombiner,它在内部被filter方法调用。
protected[this] override def newCombiner: Combiner[Char, ParString] = new ParStringCombiner接下来我们定义ParStringCombiner类。组合子是builders的子类型,它们引进了名叫combine的方法,该方法接收另一个组合子作为参数,并返回一个新的组合子,该新的组合子包含了当前组合子和参数中的组合子中的所有元素。当前组合子和参数中的组合子在调用combine方法之后将会失效。如果参数中的组合子和当前的组合子是同一个对象,那么combine方法仅仅返回当前的组合子。该方法通常情况下是高效的,最坏情况下时间复杂度为元素个数的对数,因为它在一次并行计算中会被多次调用。
我们的ParStringCombiner会在内部维护一个字符串生成器的序列。它通过在序列的最后一个字符串builder中增加一个元素的方式,来实现+=方法。并且通过串联当前和参数中的组合子的串builder列表来实现combine方法。result方法,在并行计算结束后被调用,它会通过将所有字符串生成器添加在一起来产生一个并行串。这样一来,元素只在末端被复制一次,避免了每调一次combine方法就被复制一次。理想情况下,我们想并行化这一进程,并在它们并行时候进行复制(并行数组正在被这样做),但没有办法检测到的字符串的内部表现,这是我们能做的最好的 - 我们不得不忍受这种顺序化的瓶颈。
private class ParStringCombiner extends Combiner[Char, ParString] {
var sz = 0
val chunks = new ArrayBuffer[StringBuilder] += new StringBuilder
var lastc = chunks.last
def size: Int = sz
def +=(elem: Char): this.type = {
lastc += elem
sz += 1
this
}
def clear = {
chunks.clear
chunks += new StringBuilder
lastc = chunks.last
sz = 0
}
def result: ParString = {
val rsb = new StringBuilder
for (sb <- chunks) rsb.append(sb)
new ParString(rsb.toString)
}
def combine[U <: Char, NewTo >: ParString](other: Combiner[U, NewTo]) = if (other eq this) this else {
val that = other.asInstanceOf[ParStringCombiner]
sz += that.sz
chunks ++= that.chunks
lastc = chunks.last
this
}
}
大体上我如何来实现一个组合子?
没有现成的秘诀——它的实现依赖于手头上的数据结构,通常在实现上也需要一些创造性。但是,有几种方法经常被采用:
-
连接和合并。一些数据结构在这些操作上有高效的实现(经常是对数级的)。如果手头的容器可以由这样的一些数据结构支撑,那么它们的组合子就可以是容器本身。 Finger trees,ropes和各种堆尤其适合使用这种方法。
-
两阶段赋值,是在并行数组和并行哈希表中采用的方法,它假设元素子集可以被高效的划分到连续的排序桶中,这样最终的数据结构就可以并行的构建。第一阶段,不同的处理器独立的占据这些桶,并把这些桶连接在一起。第二阶段,数据结构被分配,不同的处理器使用不相交的桶中的元素并行地占据部分数据结构。必须注意的是,各处理器修改的部分不能有交集,否则,可能会产生微妙的并发错误。正如在前面的篇幅中介绍的,这种方法很容易应用到随机存取序列。
-
一个并发的数据结构。尽管后两种方法实际上不需要数据结构本身有同步原语,它们假定数据结构能够被不修改相同内存的不同处理器,以并发的方式建立。存在大量的并发数据结构,它们可以被多个处理器安全的修改——例如,并发skip list,并发哈希表,split-ordered list,并发 avl树等等。需要注意的是,并发的数据结构应该提供水平扩展的插入方法。对于并发并行容器,组合器可以是容器本身,并且,完成一个并行操作的所有的处理器会共享一个单独的组合器实例。
使用容器框架整合
ParString类还没有完成。虽然我们已经实现了一个自定义的组合子,它将会被类似filter,partition,takeWhile,或者span等方式使用,但是大部分transformer方法都需要一个隐式的CanBuildFrom出现(Scala collections guide有详细的解释)。为了让ParString可能,并且完全的整合到容器框架中,我们需要为其掺入额外的一个叫做GenericParTemplate的trait,并且定义ParString的伴生对象。
class ParString(val str: String)
extends immutable.ParSeq[Char]
with GenericParTemplate[Char, ParString]
with ParSeqLike[Char, ParString, collection.immutable.WrappedString] {
def companion = ParString 在这个伴生对象内部,我们隐式定义了CanBuildFrom。
object ParString {
implicit def canBuildFrom: CanCombineFrom[ParString, Char, ParString] =
new CanCombinerFrom[ParString, Char, ParString] {
def apply(from: ParString) = newCombiner
def apply() = newCombiner
}
def newBuilder: Combiner[Char, ParString] = newCombiner
def newCombiner: Combiner[Char, ParString] = new ParStringCombiner
def apply(elems: Char*): ParString = {
val cb = newCombiner
cb ++= elems
cb.result
}
}
进一步定制——并发和其他容器
实现一个并发容器(与并行容器不同,并发容器是像collection.concurrent.TrieMap一样可以被并发修改的)并不总是简单明了的。尤其是组合器,经常需要仔细想想。到目前为止,在大多数描述的并行容器中,组合器都使用两步评估。第一步元素被不同的处理器加入到组合器中,组合器被合并在一起。第二步,在所有元素完成处理后,结果容器就被创建。
组合器的另一种方式是把结果容器作为元素来构建。前提是:容器是线程安全的——组合器必须允许并发元素插入。这样的话,一个组合器就可以被所有处理器共享。
为了使一个并发容器并行化,它的组合器必须重写canBeShared方法以返回真。这会保证当一个并行操作被调用,只有一个组合器被创建。然后,+=方法必须是线程安全的。最后,如果当前的组合器和参数组合器是相同的,combine方法仍然返回当前的组合器,要不然会自动抛出异常。
为了获得更好的负载均衡,Splitter被分割成更小的splitter。默认情况下,remaining方法返回的信息被用来决定何时停止分割splitter。对于一些容器而言,调用remaining方法是有花销的,一些其他的方法应该被使用来决定何时分割splitter。在这种情况下,需要重写splitter的shouldSplitFurther方法。
如果剩余元素的数量比容器大小除以8倍并行级别更大,默认的实现将拆分splitter。
def shouldSplitFurther[S](coll: ParIterable[S], parallelismLevel: Int) =
remaining > thresholdFromSize(coll.size, parallelismLevel)
同样的,一个splitter可以持有一个计数器,来计算splitter被分割的次数。并且,如果split次数超过3+log(并行级别),shouldSplitFurther将直接返回true。这避免了必须去调用remaining方法。
此外,对于一个指定的容器如果调用remaining方法开销不低(比如,他需要评估容器中元素的数量),那么在splitter中的方法isRemainingCheap就应该被重写并返回false。
最后,若果在splitter中的remaining方法实现起来极其麻烦,你可以重写容器中的isStrictSplitterCollection方法,并返回false。虽然这些容器将不能够执行一些严格依赖splitter的方法,比如,在remaining方法中返回一个正确的值。重点是,这并不影响 for-comprehension 中使用的方法。