View Javadoc

1   /***
2    * 
3    */
4   package de.cohesion.bssh.impl;
5   
6   import java.util.Date;
7   import java.util.HashMap;
8   import java.util.Iterator;
9   import java.util.Map;
10  import java.util.NoSuchElementException;
11  import java.util.Set;
12  import java.util.Timer;
13  import java.util.TimerTask;
14  import java.util.concurrent.Callable;
15  import java.util.concurrent.CancellationException;
16  import java.util.concurrent.CompletionService;
17  import java.util.concurrent.ExecutionException;
18  import java.util.concurrent.ExecutorCompletionService;
19  import java.util.concurrent.ExecutorService;
20  import java.util.concurrent.Executors;
21  import java.util.concurrent.Future;
22  import java.util.concurrent.Semaphore;
23  import java.util.concurrent.TimeUnit;
24  import java.util.concurrent.TimeoutException;
25  import java.util.concurrent.locks.Lock;
26  import java.util.concurrent.locks.ReentrantLock;
27  
28  import de.cohesion.bssh.Command;
29  import de.cohesion.bssh.Member;
30  import de.cohesion.bssh.Result;
31  import de.cohesion.bssh.Concurrency.Level;
32  import de.cohesion.bssh.impl.lang.NotifyingThread;
33  
34  /***
35   * @author schulzs
36   */
37  public class CommandProcessor {
38  
39  	private CompletionService<Result> svc;
40  
41  	private ExecutorService execSvc;
42  
43  	private final Lock serviceLock;
44  
45  	private final Set<? extends Command<Result>> commands;
46  
47  	private final Set<Member> members;
48  
49  	private boolean queried = false;
50  
51  	private final Timer timer;
52  
53  	private final long timeout;
54  
55  	private final Map<Future<Result>, Result> results;
56  
57  	public CommandProcessor(final Set<? extends Command<Result>> commands,
58  			final Set<Member> members, long timeout) {
59  		serviceLock = new ReentrantLock();
60  		this.commands = commands;
61  		this.members = members;
62  		timer = new Timer();
63  		this.timeout = timeout;
64  		results = new HashMap<Future<Result>, Result>();
65  		setConcurrency(Level.UNLIMITED);
66  	}
67  
68  	public void setConcurrency(Level level) {
69  		serviceLock.lock();
70  		try {
71  			if (execSvc != null) {
72  				execSvc.shutdown();
73  				while (!execSvc.isTerminated()) {
74  					try {
75  						execSvc.awaitTermination(Long.MAX_VALUE,
76  								TimeUnit.SECONDS);
77  					} catch (InterruptedException ie) {
78  						/* Ignore. */
79  					}
80  				}
81  			}
82  
83  			switch (level) {
84  			case FIXED: {
85  				execSvc = Executors.newFixedThreadPool(level.getValue(),
86  						NotifyingThread.FACTORY);
87  				break;
88  			}
89  			case UNLIMITED: {
90  				execSvc = Executors
91  						.newCachedThreadPool(NotifyingThread.FACTORY);
92  				break;
93  			}
94  			}
95  
96  			svc = new ExecutorCompletionService<Result>(execSvc);
97  
98  		} finally {
99  			serviceLock.unlock();
100 		}
101 
102 	}
103 
104 	private void process(final Set<? extends Command<Result>> commands,
105 			final Set<Member> members) {
106 		for (final Command<Result> c : commands) {
107 			for (final Member m : members) {
108 				serviceLock.lock();
109 				try {
110 
111 					final Semaphore s = new Semaphore(0);
112 
113 					/* Submit command to executor. */
114 					final Future<Result> f = svc.submit(new Callable<Result>() {
115 						public Result call() throws Exception {
116 							s.release();
117 							Result r = null;
118 							try {
119 								r = c.execute(m);
120 							} catch (Exception e) {
121 								r = new ResultImpl(m, c, e);
122 							}
123 							return r;
124 						}
125 					});
126 
127 					Thread t = new Thread(new Runnable() {
128 						public void run() {
129 
130 							/* Wait until command executor thread has started. */
131 							try {
132 								s.acquire();
133 							} catch (InterruptedException ie) {
134 								return;
135 							}
136 
137 							/* Schedule task cancelation. */
138 							timer.schedule(new TimerTask() {
139 								@Override
140 								public void run() {
141 									if (!f.isDone()) {
142 										f.cancel(true);
143 									}
144 								}
145 							}, new Date(Math.max(timeout, timeout
146 									+ System.currentTimeMillis())));
147 						}
148 					});
149 					t.start();
150 
151 					results.put(f, new ResultImpl(m, c, new TimeoutException(
152 							"execution timed out")));
153 
154 				} finally {
155 					serviceLock.unlock();
156 				}
157 			}
158 		}
159 	}
160 
161 	public Iterable<Result> getResults() {
162 
163 		if (queried) {
164 			throw new IllegalStateException();
165 		}
166 
167 		process(commands, members);
168 
169 		return new Iterable<Result>() {
170 			public Iterator<Result> iterator() {
171 				return new ResultIterator();
172 			}
173 		};
174 	}
175 
176 	private class ResultIterator implements Iterator<Result> {
177 
178 		private int count = 0;
179 
180 		private Result next;
181 
182 		private boolean delivered = true;
183 
184 		public boolean hasNext() {
185 
186 			if (!delivered) {
187 				return true;
188 			}
189 
190 			if (count >= members.size() * commands.size()) {
191 				return false;
192 			}
193 
194 			Future<Result> f = null;
195 			try {
196 				serviceLock.lock();
197 				try {
198 					next = (f = svc.take()).get();
199 				} finally {
200 					serviceLock.unlock();
201 				}
202 				delivered = false;
203 			} catch (InterruptedException ie) {
204 				return false;
205 			} catch (ExecutionException ee) {
206 				/*
207 				 * Won't happen, since we catch all exceptions thrown in
208 				 * callable (see above).
209 				 */
210 				assert false;
211 			} catch (CancellationException ce) {
212 				assert f != null;
213 				next = results.get(f);
214 				delivered = false;
215 			}
216 
217 			return true;
218 		}
219 
220 		public Result next() throws NoSuchElementException {
221 			if (!hasNext()) {
222 				throw new NoSuchElementException();
223 			}
224 			count++;
225 			delivered = true;
226 			return next;
227 		}
228 
229 		public void remove() {
230 			throw new UnsupportedOperationException();
231 		}
232 
233 	}
234 
235 }