286 lines
7.6 KiB
Java
286 lines
7.6 KiB
Java
// ============================================================================
|
|
//
|
|
// Copyright (C) 2006-2016 Talend Inc. - www.talend.com
|
|
//
|
|
// This source code is available under agreement available at
|
|
// %InstallDIR%\features\org.talend.rcp.branding.%PRODUCTNAME%\%PRODUCTNAME%license.txt
|
|
//
|
|
// You should have received a copy of the agreement
|
|
// along with this program; if not, write to Talend SA
|
|
// 9 rue Pages 92150 Suresnes, France
|
|
//
|
|
// ============================================================================
|
|
package routines.system;
|
|
|
|
public class TalendThreadPool {
|
|
|
|
private volatile boolean stopAllWorkers = false;
|
|
|
|
private TalendThread errorThread = null;
|
|
|
|
private TalendThreadResult threadResult = null;
|
|
|
|
private ThreadQueue idleWorkers;
|
|
|
|
private ThreadPoolWorker[] workerList;
|
|
|
|
public TalendThreadPool(int numberOfThreads) {
|
|
threadResult = new TalendThreadResult();
|
|
numberOfThreads = Math.max(1, numberOfThreads);
|
|
idleWorkers = new ThreadQueue(numberOfThreads);
|
|
workerList = new ThreadPoolWorker[numberOfThreads];
|
|
for (int i = 0; i < workerList.length; i++) {
|
|
workerList[i] = new ThreadPoolWorker(idleWorkers);
|
|
}
|
|
}
|
|
|
|
public void execute(TalendThread target) throws InterruptedException {
|
|
if (!stopAllWorkers) {
|
|
ThreadPoolWorker worker = (ThreadPoolWorker) idleWorkers.remove();
|
|
target.talendThreadPool = this;
|
|
if (worker != null) {
|
|
worker.process(target);
|
|
}
|
|
}
|
|
}
|
|
|
|
public void waitForEndOfQueue() {
|
|
try {
|
|
while (!stopAllWorkers && idleWorkers.getSize() != workerList.length) {
|
|
Thread.sleep(100);
|
|
}
|
|
for (int i = 0; i < workerList.length; i++) {
|
|
workerList[i].stopRequest();
|
|
while (workerList[i].isAlive()) {
|
|
Thread.sleep(100);
|
|
}
|
|
}
|
|
} catch (InterruptedException x) {
|
|
}
|
|
}
|
|
|
|
public void stopAllWorkers() {
|
|
if (!stopAllWorkers) {
|
|
try {
|
|
stopAllWorkers = true;
|
|
idleWorkers.destory();
|
|
for (int i = 0; i < workerList.length; i++) {
|
|
workerList[i].stopRequest();
|
|
while (workerList[i].isAlive()) {
|
|
Thread.sleep(100);
|
|
}
|
|
}
|
|
} catch (InterruptedException x) {
|
|
}
|
|
|
|
}
|
|
}
|
|
|
|
public TalendThread getErrorThread() {
|
|
return errorThread;
|
|
}
|
|
|
|
// only keep the first ErrorThread
|
|
public void setErrorThread(TalendThread errorThread) {
|
|
if (this.errorThread == null) {
|
|
this.errorThread = errorThread;
|
|
}
|
|
}
|
|
|
|
public TalendThreadResult getTalendThreadResult() {
|
|
return threadResult;
|
|
}
|
|
}
|
|
|
|
class ThreadPoolWorker extends Object {
|
|
|
|
private static int nextWorkerID = 0;
|
|
|
|
private ThreadQueue idleWorkers;
|
|
|
|
private int workerID;
|
|
|
|
private ThreadQueue handoffBox;
|
|
|
|
private Thread internalThread;
|
|
|
|
private volatile boolean noStopRequested;
|
|
|
|
public ThreadPoolWorker(ThreadQueue idleWorkers) {
|
|
this.idleWorkers = idleWorkers;
|
|
|
|
workerID = getNextWorkerID();
|
|
handoffBox = new ThreadQueue(1); // only one slot
|
|
|
|
// just before returning, the thread should be created and started.
|
|
noStopRequested = true;
|
|
|
|
Runnable r = new Runnable() {
|
|
|
|
public void run() {
|
|
try {
|
|
runWork();
|
|
} catch (Exception x) {
|
|
// in case ANY exception slips through
|
|
x.printStackTrace();
|
|
}
|
|
}
|
|
};
|
|
|
|
internalThread = new Thread(r);
|
|
internalThread.start();
|
|
}
|
|
|
|
public static synchronized int getNextWorkerID() {
|
|
// notice: synchronized at the class level to ensure uniqueness
|
|
int id = nextWorkerID;
|
|
nextWorkerID++;
|
|
return id;
|
|
}
|
|
|
|
public void process(Runnable target) throws InterruptedException {
|
|
handoffBox.add(target);
|
|
}
|
|
|
|
private void runWork() {
|
|
while (noStopRequested) {
|
|
try {
|
|
idleWorkers.add(this);
|
|
Runnable r = (Runnable) handoffBox.remove();
|
|
runIt(r);
|
|
} catch (InterruptedException x) {
|
|
Thread.currentThread().interrupt(); // re-assert
|
|
}
|
|
}
|
|
}
|
|
|
|
private void runIt(Runnable r) {
|
|
try {
|
|
r.run();
|
|
} catch (Exception runex) {
|
|
runex.printStackTrace();
|
|
} finally {
|
|
Thread.interrupted();
|
|
}
|
|
}
|
|
|
|
public void stopRequest() {
|
|
noStopRequested = false;
|
|
internalThread.interrupt();
|
|
}
|
|
|
|
public boolean isAlive() {
|
|
return internalThread.isAlive();
|
|
}
|
|
}
|
|
|
|
class ThreadQueue {
|
|
|
|
private Object[] queue;
|
|
|
|
private int maxSize;
|
|
|
|
private int size;
|
|
|
|
private int head;
|
|
|
|
private int tail;
|
|
|
|
private volatile boolean isDestory = false;
|
|
|
|
public ThreadQueue(int cap) {
|
|
maxSize = (cap > 0) ? cap : 1; // at least 1
|
|
queue = new Object[maxSize];
|
|
head = 0;
|
|
tail = 0;
|
|
size = 0;
|
|
}
|
|
|
|
public synchronized int getSize() {
|
|
return size;
|
|
}
|
|
|
|
public synchronized boolean isEmpty() {
|
|
return (size == 0);
|
|
}
|
|
|
|
public synchronized boolean isFull() {
|
|
return (size == maxSize);
|
|
}
|
|
|
|
public synchronized void add(Object obj) throws InterruptedException {
|
|
waitWhileFull();
|
|
queue[head] = obj;
|
|
head = (head + 1) % maxSize;
|
|
size++;
|
|
|
|
notifyAll();
|
|
}
|
|
|
|
public synchronized Object remove() throws InterruptedException {
|
|
waitWhileEmpty();
|
|
Object obj = queue[tail];
|
|
queue[tail] = null;
|
|
tail = (tail + 1) % maxSize;
|
|
size--;
|
|
notifyAll();
|
|
return obj;
|
|
}
|
|
|
|
public synchronized Object[] removeAll() throws InterruptedException {
|
|
Object[] list = new Object[size];
|
|
for (int i = 0; i < list.length; i++) {
|
|
list[i] = remove();
|
|
}
|
|
return list;
|
|
}
|
|
|
|
public synchronized boolean waitUntilEmpty(long msTimeout) throws InterruptedException {
|
|
if (msTimeout == 0L) {
|
|
waitUntilEmpty();
|
|
return true;
|
|
}
|
|
|
|
long endTime = System.currentTimeMillis() + msTimeout;
|
|
long msRemaining = msTimeout;
|
|
|
|
while (!isEmpty() && (msRemaining > 0L)) {
|
|
wait(msRemaining);
|
|
msRemaining = endTime - System.currentTimeMillis();
|
|
}
|
|
return isEmpty();
|
|
}
|
|
|
|
public synchronized void waitUntilEmpty() throws InterruptedException {
|
|
while (!isEmpty()) {
|
|
wait();
|
|
}
|
|
}
|
|
|
|
// "remove" work with sign "isDestory"
|
|
public synchronized void waitWhileEmpty() throws InterruptedException {
|
|
while (!isDestory && isEmpty()) {
|
|
wait();
|
|
}
|
|
}
|
|
|
|
public synchronized void waitUntilFull() throws InterruptedException {
|
|
while (!isFull()) {
|
|
wait();
|
|
}
|
|
}
|
|
|
|
// "add" work with sign "isDestory"
|
|
public synchronized void waitWhileFull() throws InterruptedException {
|
|
while (!isDestory && isFull()) {
|
|
wait();
|
|
}
|
|
}
|
|
|
|
public synchronized void destory() throws InterruptedException {
|
|
isDestory = true;
|
|
this.notify();
|
|
}
|
|
|
|
}
|