1 /***
2 *
3 */
4 package de.cohesion.bssh.impl;
5
6 import java.util.Date;
7 import java.util.HashMap;
8 import java.util.Iterator;
9 import java.util.Map;
10 import java.util.NoSuchElementException;
11 import java.util.Set;
12 import java.util.Timer;
13 import java.util.TimerTask;
14 import java.util.concurrent.Callable;
15 import java.util.concurrent.CancellationException;
16 import java.util.concurrent.CompletionService;
17 import java.util.concurrent.ExecutionException;
18 import java.util.concurrent.ExecutorCompletionService;
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.Future;
22 import java.util.concurrent.Semaphore;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.TimeoutException;
25 import java.util.concurrent.locks.Lock;
26 import java.util.concurrent.locks.ReentrantLock;
27
28 import de.cohesion.bssh.Command;
29 import de.cohesion.bssh.Member;
30 import de.cohesion.bssh.Result;
31 import de.cohesion.bssh.Concurrency.Level;
32 import de.cohesion.bssh.impl.lang.NotifyingThread;
33
34 /***
35 * @author schulzs
36 */
37 public class CommandProcessor {
38
39 private CompletionService<Result> svc;
40
41 private ExecutorService execSvc;
42
43 private final Lock serviceLock;
44
45 private final Set<? extends Command<Result>> commands;
46
47 private final Set<Member> members;
48
49 private boolean queried = false;
50
51 private final Timer timer;
52
53 private final long timeout;
54
55 private final Map<Future<Result>, Result> results;
56
57 public CommandProcessor(final Set<? extends Command<Result>> commands,
58 final Set<Member> members, long timeout) {
59 serviceLock = new ReentrantLock();
60 this.commands = commands;
61 this.members = members;
62 timer = new Timer();
63 this.timeout = timeout;
64 results = new HashMap<Future<Result>, Result>();
65 setConcurrency(Level.UNLIMITED);
66 }
67
68 public void setConcurrency(Level level) {
69 serviceLock.lock();
70 try {
71 if (execSvc != null) {
72 execSvc.shutdown();
73 while (!execSvc.isTerminated()) {
74 try {
75 execSvc.awaitTermination(Long.MAX_VALUE,
76 TimeUnit.SECONDS);
77 } catch (InterruptedException ie) {
78
79 }
80 }
81 }
82
83 switch (level) {
84 case FIXED: {
85 execSvc = Executors.newFixedThreadPool(level.getValue(),
86 NotifyingThread.FACTORY);
87 break;
88 }
89 case UNLIMITED: {
90 execSvc = Executors
91 .newCachedThreadPool(NotifyingThread.FACTORY);
92 break;
93 }
94 }
95
96 svc = new ExecutorCompletionService<Result>(execSvc);
97
98 } finally {
99 serviceLock.unlock();
100 }
101
102 }
103
104 private void process(final Set<? extends Command<Result>> commands,
105 final Set<Member> members) {
106 for (final Command<Result> c : commands) {
107 for (final Member m : members) {
108 serviceLock.lock();
109 try {
110
111 final Semaphore s = new Semaphore(0);
112
113
114 final Future<Result> f = svc.submit(new Callable<Result>() {
115 public Result call() throws Exception {
116 s.release();
117 Result r = null;
118 try {
119 r = c.execute(m);
120 } catch (Exception e) {
121 r = new ResultImpl(m, c, e);
122 }
123 return r;
124 }
125 });
126
127 Thread t = new Thread(new Runnable() {
128 public void run() {
129
130
131 try {
132 s.acquire();
133 } catch (InterruptedException ie) {
134 return;
135 }
136
137
138 timer.schedule(new TimerTask() {
139 @Override
140 public void run() {
141 if (!f.isDone()) {
142 f.cancel(true);
143 }
144 }
145 }, new Date(Math.max(timeout, timeout
146 + System.currentTimeMillis())));
147 }
148 });
149 t.start();
150
151 results.put(f, new ResultImpl(m, c, new TimeoutException(
152 "execution timed out")));
153
154 } finally {
155 serviceLock.unlock();
156 }
157 }
158 }
159 }
160
161 public Iterable<Result> getResults() {
162
163 if (queried) {
164 throw new IllegalStateException();
165 }
166
167 process(commands, members);
168
169 return new Iterable<Result>() {
170 public Iterator<Result> iterator() {
171 return new ResultIterator();
172 }
173 };
174 }
175
176 private class ResultIterator implements Iterator<Result> {
177
178 private int count = 0;
179
180 private Result next;
181
182 private boolean delivered = true;
183
184 public boolean hasNext() {
185
186 if (!delivered) {
187 return true;
188 }
189
190 if (count >= members.size() * commands.size()) {
191 return false;
192 }
193
194 Future<Result> f = null;
195 try {
196 serviceLock.lock();
197 try {
198 next = (f = svc.take()).get();
199 } finally {
200 serviceLock.unlock();
201 }
202 delivered = false;
203 } catch (InterruptedException ie) {
204 return false;
205 } catch (ExecutionException ee) {
206
207
208
209
210 assert false;
211 } catch (CancellationException ce) {
212 assert f != null;
213 next = results.get(f);
214 delivered = false;
215 }
216
217 return true;
218 }
219
220 public Result next() throws NoSuchElementException {
221 if (!hasNext()) {
222 throw new NoSuchElementException();
223 }
224 count++;
225 delivered = true;
226 return next;
227 }
228
229 public void remove() {
230 throw new UnsupportedOperationException();
231 }
232
233 }
234
235 }