1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
use std::sync::atomic::{AtomicBool, Ordering};
use super::internal::*;
use super::*;
#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
pub struct WhileSome<I: ParallelIterator> {
base: I,
}
pub fn new<I>(base: I) -> WhileSome<I>
where I: ParallelIterator
{
WhileSome { base: base }
}
impl<I, T> ParallelIterator for WhileSome<I>
where I: ParallelIterator<Item = Option<T>>,
T: Send
{
type Item = T;
fn drive_unindexed<C>(self, consumer: C) -> C::Result
where C: UnindexedConsumer<Self::Item>
{
let full = AtomicBool::new(false);
let consumer1 = WhileSomeConsumer {
base: consumer,
full: &full,
};
self.base.drive_unindexed(consumer1)
}
}
struct WhileSomeConsumer<'f, C> {
base: C,
full: &'f AtomicBool,
}
impl<'f, T, C> Consumer<Option<T>> for WhileSomeConsumer<'f, C>
where C: Consumer<T>,
T: Send
{
type Folder = WhileSomeFolder<'f, C::Folder>;
type Reducer = C::Reducer;
type Result = C::Result;
fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
let (left, right, reducer) = self.base.split_at(index);
(WhileSomeConsumer { base: left, ..self },
WhileSomeConsumer { base: right, ..self },
reducer)
}
fn into_folder(self) -> Self::Folder {
WhileSomeFolder {
base: self.base.into_folder(),
full: self.full,
}
}
fn full(&self) -> bool {
self.full.load(Ordering::Relaxed) || self.base.full()
}
}
impl<'f, T, C> UnindexedConsumer<Option<T>> for WhileSomeConsumer<'f, C>
where C: UnindexedConsumer<T>,
T: Send
{
fn split_off_left(&self) -> Self {
WhileSomeConsumer { base: self.base.split_off_left(), ..*self }
}
fn to_reducer(&self) -> Self::Reducer {
self.base.to_reducer()
}
}
struct WhileSomeFolder<'f, C> {
base: C,
full: &'f AtomicBool,
}
impl<'f, T, C> Folder<Option<T>> for WhileSomeFolder<'f, C>
where C: Folder<T>
{
type Result = C::Result;
fn consume(mut self, item: Option<T>) -> Self {
match item {
Some(item) => self.base = self.base.consume(item),
None => self.full.store(true, Ordering::Relaxed),
}
self
}
fn complete(self) -> C::Result {
self.base.complete()
}
fn full(&self) -> bool {
self.full.load(Ordering::Relaxed) || self.base.full()
}
}