TSK-442: Investigate image queuing performance

This commit is contained in:
Dick Fickling 2012-04-06 09:35:15 -04:00
parent 944117bcdf
commit c6082e26a4

View File

@ -25,13 +25,12 @@ import java.text.DateFormat;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CancellationException; import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -186,30 +185,30 @@ public class IngestManager {
while (hasNextImage()) { while (hasNextImage()) {
//dequeue //dequeue
// get next image and set of services // get next image and set of services
final QueueUnit<Image, IngestServiceImage> qu = final Map.Entry<Image, List<IngestServiceImage>> qu =
this.getNextImage(); this.getNextImage();
// check if each service for this image is already running // check if each service for this image is already running
//synchronized (this) { //synchronized (this) {
for (IngestServiceImage quService : qu.services) { for (IngestServiceImage quService : qu.getValue()) {
boolean alreadyRunning = false; boolean alreadyRunning = false;
for (IngestImageThread worker : imageIngesters) { for (IngestImageThread worker : imageIngesters) {
// ignore threads that are on different images // ignore threads that are on different images
if (!worker.getImage().equals(qu.content)) { if (!worker.getImage().equals(qu.getKey())) {
continue; //check next worker continue; //check next worker
} }
//same image, check service (by name, not id, since different instances) //same image, check service (by name, not id, since different instances)
if (worker.getService().getName().equals(quService.getName())) { if (worker.getService().getName().equals(quService.getName())) {
alreadyRunning = true; alreadyRunning = true;
logger.log(Level.INFO, "Image Ingester <" + qu.content + ", " + quService.getName() + "> is already running"); logger.log(Level.INFO, "Image Ingester <" + qu.getKey() + ", " + quService.getName() + "> is already running");
break; break;
} }
} }
//checked all workers //checked all workers
if (alreadyRunning == false) { if (alreadyRunning == false) {
logger.log(Level.INFO, "Starting new image Ingester <" + qu.content + ", " + quService.getName() + ">"); logger.log(Level.INFO, "Starting new image Ingester <" + qu.getKey() + ", " + quService.getName() + ">");
IngestImageThread newImageWorker = new IngestImageThread(this, qu.content, quService); IngestImageThread newImageWorker = new IngestImageThread(this, qu.getKey(), quService);
imageIngesters.add(newImageWorker); imageIngesters.add(newImageWorker);
@ -477,9 +476,7 @@ public class IngestManager {
* @param service * @param service
* @param image * @param image
*/ */
private void addFsContent(IngestServiceFsContent service, Image image) { private void addFsContent(IngestServiceFsContent service, Collection<FsContent> fsContents) {
Collection<FsContent> fsContents = new GetAllFilesContentVisitor().visit(image);
logger.log(Level.INFO, "Adding image " + image.getName() + " with " + fsContents.size() + " number of fsContent to service " + service.getName());
synchronized (queuesLock) { synchronized (queuesLock) {
for (FsContent fsContent : fsContents) { for (FsContent fsContent : fsContents) {
fsContentQueue.enqueue(fsContent, service); fsContentQueue.enqueue(fsContent, service);
@ -492,8 +489,8 @@ public class IngestManager {
* the queue of FsContent to process is maintained internally * the queue of FsContent to process is maintained internally
* and could be dynamically sorted as data comes in * and could be dynamically sorted as data comes in
*/ */
private QueueUnit<FsContent, IngestServiceFsContent> getNextFsContent() { private Map.Entry<FsContent, List<IngestServiceFsContent>> getNextFsContent() {
QueueUnit<FsContent, IngestServiceFsContent> ret = null; Map.Entry<FsContent, List<IngestServiceFsContent>> ret = null;
synchronized (queuesLock) { synchronized (queuesLock) {
ret = fsContentQueue.dequeue(); ret = fsContentQueue.dequeue();
} }
@ -522,14 +519,6 @@ public class IngestManager {
} }
} }
/**
 * Sorts the queue of pending FsContent.
 *
 * The sort itself runs while holding {@code queuesLock}; an INFO message
 * is logged immediately before and immediately after it.
 */
private void sortFsContents() {
    logger.info("Sorting fscontents");
    synchronized (queuesLock) {
        fsContentQueue.sort();
    }
    logger.info("Done sorting fscontents");
}
private void emptyImages() { private void emptyImages() {
synchronized (queuesLock) { synchronized (queuesLock) {
imageQueue.empty(); imageQueue.empty();
@ -541,8 +530,8 @@ public class IngestManager {
* the queue of Images to process is maintained internally * the queue of Images to process is maintained internally
* and could be dynamically sorted as data comes in * and could be dynamically sorted as data comes in
*/ */
private QueueUnit<Image, IngestServiceImage> getNextImage() { private Map.Entry<Image, List<IngestServiceImage>> getNextImage() {
QueueUnit<Image, IngestServiceImage> ret = null; Map.Entry<Image, List<IngestServiceImage>> ret = null;
synchronized (queuesLock) { synchronized (queuesLock) {
ret = imageQueue.dequeue(); ret = imageQueue.dequeue();
} }
@ -643,48 +632,40 @@ public class IngestManager {
*/ */
private class FsContentQueue { private class FsContentQueue {
final List<QueueUnit<FsContent, IngestServiceFsContent>> fsContentUnits = new ArrayList<QueueUnit<FsContent, IngestServiceFsContent>>(); final Comparator<FsContent> sorter = new Comparator<FsContent>() {
final Comparator<QueueUnit<FsContent, IngestServiceFsContent>> sorter = new Comparator<QueueUnit<FsContent, IngestServiceFsContent>>() {
@Override @Override
public int compare(QueueUnit<FsContent, IngestServiceFsContent> q1, QueueUnit<FsContent, IngestServiceFsContent> q2) { public int compare(FsContent q1, FsContent q2) {
FsContentPriotity.Priority p1 = FsContentPriotity.getPriority(q1.content); FsContentPriotity.Priority p1 = FsContentPriotity.getPriority(q1);
FsContentPriotity.Priority p2 = FsContentPriotity.getPriority(q2.content); FsContentPriotity.Priority p2 = FsContentPriotity.getPriority(q2);
if (p1 == p2) { if (p1 == p2) {
return (int) (q2.content.getId() - q1.content.getId()); return (int) (q2.getId() - q1.getId());
} else { } else {
return p2.ordinal() - p1.ordinal(); return p2.ordinal() - p1.ordinal();
} }
} }
}; };
final TreeMap<FsContent, List<IngestServiceFsContent>> fsContentUnits = new TreeMap<FsContent, List<IngestServiceFsContent>>(sorter);
void enqueue(FsContent fsContent, IngestServiceFsContent service) { void enqueue(FsContent fsContent, IngestServiceFsContent service) {
QueueUnit<FsContent, IngestServiceFsContent> found = findFsContent(fsContent); //fsContentUnits.put(fsContent, Collections.singletonList(service));
List<IngestServiceFsContent> services = fsContentUnits.get(fsContent);
if (found != null) { if(services == null) {
//FsContent already enqueued services = new ArrayList<IngestServiceFsContent>();
//merge services to use with already enqueued image fsContentUnits.put(fsContent, services);
found.add(service);
} else {
//enqueue brand new FsContent with the services
found = new QueueUnit<FsContent, IngestServiceFsContent>(fsContent, service);
fsContentUnits.add(found);
} }
services.add(service);
} }
void enqueue(FsContent fsContent, List<IngestServiceFsContent> services) { void enqueue(FsContent fsContent, List<IngestServiceFsContent> services) {
QueueUnit<FsContent, IngestServiceFsContent> found = findFsContent(fsContent);
if (found != null) { List<IngestServiceFsContent> oldServices = fsContentUnits.get(fsContent);
//FsContent already enqueued if(oldServices == null) {
//merge services to use with already enqueued FsContent oldServices = new ArrayList<IngestServiceFsContent>();
found.addAll(services); fsContentUnits.put(fsContent, oldServices);
} else {
//enqueue brand new FsContent with the services
found = new QueueUnit<FsContent, IngestServiceFsContent>(fsContent, services);
fsContentUnits.add(found);
} }
oldServices.addAll(services);
} }
boolean hasNext() { boolean hasNext() {
@ -699,22 +680,17 @@ public class IngestManager {
fsContentUnits.clear(); fsContentUnits.clear();
} }
/**
 * Sorts the queued units in place, ordering them with this queue's
 * {@code sorter} comparator.
 */
void sort() {
    Collections.sort(this.fsContentUnits, this.sorter);
}
/** /**
* Returns next FsContent and list of associated services * Returns next FsContent and list of associated services
* @return * @return
*/ */
QueueUnit<FsContent, IngestServiceFsContent> dequeue() { Map.Entry<FsContent, List<IngestServiceFsContent>> dequeue() {
if (!hasNext()) { if (!hasNext()) {
throw new UnsupportedOperationException("FsContent processing queue is empty"); throw new UnsupportedOperationException("FsContent processing queue is empty");
} }
QueueUnit<FsContent, IngestServiceFsContent> remove = fsContentUnits.remove(0);
//logger.log(Level.INFO, "DEQUE: " + remove.content.getParentPath() + " SIZE: " + toString()); //logger.log(Level.INFO, "DEQUE: " + remove.content.getParentPath() + " SIZE: " + toString());
return (remove); return (fsContentUnits.pollFirstEntry());
} }
/** /**
@ -723,30 +699,11 @@ public class IngestManager {
* @return true if the service is enqueued to do work * @return true if the service is enqueued to do work
*/ */
boolean hasServiceEnqueued(IngestServiceFsContent service) { boolean hasServiceEnqueued(IngestServiceFsContent service) {
boolean found = false; for(List<IngestServiceFsContent> list : fsContentUnits.values()) {
for (QueueUnit<FsContent, IngestServiceFsContent> unit : fsContentUnits) { if(list.contains(service))
for (IngestServiceFsContent s : unit.services) { return true;
if (s.equals(service)) {
found = true;
break;
}
}
if (found == true) {
break;
}
} }
return found; return false;
}
private QueueUnit<FsContent, IngestServiceFsContent> findFsContent(FsContent fsContent) {
QueueUnit<FsContent, IngestServiceFsContent> found = null;
for (QueueUnit<FsContent, IngestServiceFsContent> unit : fsContentUnits) {
if (unit.content.equals(fsContent)) {
found = unit;
break;
}
}
return found;
} }
@Override @Override
@ -756,10 +713,10 @@ public class IngestManager {
public String printQueue() { public String printQueue() {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
for (QueueUnit<FsContent, IngestServiceFsContent> u : fsContentUnits) { /*for (QueueUnit<FsContent, IngestServiceFsContent> u : fsContentUnits) {
sb.append(u.toString()); sb.append(u.toString());
sb.append("\n"); sb.append("\n");
} }*/
return sb.toString(); return sb.toString();
} }
} }
@ -771,34 +728,32 @@ public class IngestManager {
*/ */
private class ImageQueue { private class ImageQueue {
private List<QueueUnit<Image, IngestServiceImage>> imageUnits = new ArrayList<QueueUnit<Image, IngestServiceImage>>(); final Comparator<Image> sorter = new Comparator<Image>() {
/**
 * Orders images by id, highest id first.
 *
 * The ids are compared directly rather than subtracted: narrowing a
 * subtraction result to {@code int} can overflow or truncate, which
 * may flip the sign or yield 0 for two distinct ids. Since this
 * comparator is used to key a sorted map, either outcome would
 * mis-order entries or make distinct images collide.
 *
 * @param q1 first image
 * @param q2 second image
 * @return negative if q1 has the larger id, positive if q2 has the
 *         larger id, 0 if the ids are equal
 */
@Override
public int compare(Image q1, Image q2) {
    // NOTE(review): assumes getId() returns a long, as the original
    // (int) cast of the difference suggests -- confirm.
    final long firstId = q1.getId();
    final long secondId = q2.getId();
    if (firstId > secondId) {
        return -1;
    }
    if (firstId < secondId) {
        return 1;
    }
    return 0;
}
};
private TreeMap<Image, List<IngestServiceImage>> imageUnits = new TreeMap<Image, List<IngestServiceImage>>(sorter);
void enqueue(Image image, IngestServiceImage service) { void enqueue(Image image, IngestServiceImage service) {
QueueUnit<Image, IngestServiceImage> found = findImage(image); List<IngestServiceImage> services = imageUnits.get(image);
if(services == null) {
if (found != null) { services = new ArrayList<IngestServiceImage>();
//image already enqueued imageUnits.put(image, services);
//merge services to use with already enqueued image
found.add(service);
} else {
//enqueue brand new image with the services
found = new QueueUnit<Image, IngestServiceImage>(image, service);
imageUnits.add(found);
} }
services.add(service);
} }
void enqueue(Image image, List<IngestServiceImage> services) { void enqueue(Image image, List<IngestServiceImage> services) {
QueueUnit<Image, IngestServiceImage> found = findImage(image); List<IngestServiceImage> oldServices = imageUnits.get(image);
if(oldServices == null) {
if (found != null) { oldServices = new ArrayList<IngestServiceImage>();
//image already enqueued imageUnits.put(image, oldServices);
//merge services to use with already enqueued image
found.addAll(services);
} else {
//enqueue brand new image with the services
found = new QueueUnit<Image, IngestServiceImage>(image, services);
imageUnits.add(found);
} }
oldServices.addAll(services);
} }
boolean hasNext() { boolean hasNext() {
@ -817,29 +772,12 @@ public class IngestManager {
* Return a QueueUnit that contains an image and set of services to run on it. * Return a QueueUnit that contains an image and set of services to run on it.
* @return * @return
*/ */
QueueUnit<Image, IngestServiceImage> dequeue() { Map.Entry<Image, List<IngestServiceImage>> dequeue() {
if (!hasNext()) { if (!hasNext()) {
throw new UnsupportedOperationException("Image processing queue is empty"); throw new UnsupportedOperationException("Image processing queue is empty");
} }
return imageUnits.remove(0); return imageUnits.pollFirstEntry();
}
/**
* Search existing list to see if an image already has a set of
* services associated with it
* @param image
* @return
*/
private QueueUnit<Image, IngestServiceImage> findImage(Image image) {
QueueUnit<Image, IngestServiceImage> found = null;
for (QueueUnit<Image, IngestServiceImage> unit : imageUnits) {
if (unit.content.equals(image)) {
found = unit;
break;
}
}
return found;
} }
@Override @Override
@ -848,79 +786,6 @@ public class IngestManager {
} }
} }
/**
 * Generic representation of queued content (Image or FsContent) and the
 * services that are to be run on it.
 *
 * Services are kept in a {@link LinkedHashSet}: adding the same service
 * instance twice has no effect, and iteration follows insertion order.
 *
 * @param <T> type of the queued content
 * @param <S> type of the services associated with the content
 */
private class QueueUnit<T, S> {

    T content;
    LinkedHashSet<S> services; //ordering matters (Lookup order)

    /**
     * Creates a unit for the given content with a single service.
     *
     * @param content content to queue
     * @param service first service to associate with the content
     */
    QueueUnit(T content, S service) {
        this.content = content;
        this.services = new LinkedHashSet<S>();
        add(service);
    }

    /**
     * Creates a unit for the given content with several services.
     *
     * @param content  content to queue
     * @param services services to associate with the content, in order
     */
    QueueUnit(T content, List<S> services) {
        this.content = content;
        this.services = new LinkedHashSet<S>();
        addAll(services);
    }

    /**
     * Merges the given services into the services already held for this content.
     * This assumes singleton instances of every service type for a correct merge;
     * in case of multiple instances, they need to be handled correctly after dequeue().
     *
     * @param services services to merge in
     */
    final void addAll(List<S> services) {
        this.services.addAll(services);
    }

    /**
     * Adds one service to the services held for this content.
     * This assumes singleton instances of every service type for a correct merge;
     * in case of multiple instances, they need to be handled correctly after dequeue().
     *
     * @param service service to add
     */
    final void add(S service) {
        this.services.add(service);
    }

    /**
     * Two units are equal when they are of the same class and both their
     * content and their services are equal (null-tolerant on both fields).
     */
    @Override
    @SuppressWarnings("unchecked")
    public boolean equals(Object obj) {
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        final QueueUnit<T, S> other = (QueueUnit<T, S>) obj;
        return sameOrEqual(this.content, other.content)
                && sameOrEqual(this.services, other.services);
    }

    /**
     * Null-tolerant equality: true for the same reference (including two
     * nulls), otherwise delegates to {@code a.equals(b)} when a is non-null.
     */
    private boolean sameOrEqual(Object a, Object b) {
        return a == b || (a != null && a.equals(b));
    }

    @Override
    public int hashCode() {
        int hash = 7;
        hash = 37 * hash + (this.content != null ? this.content.hashCode() : 0);
        hash = 37 * hash + (this.services != null ? this.services.hashCode() : 0);
        return hash;
    }

    /**
     * Renders the content followed by its services, space separated.
     *
     * Null-safe, in line with equals() and hashCode(): a null content or
     * null service is rendered as "null" instead of throwing, and a null
     * service collection renders no services.
     */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append("content: ");
        // append(Object) goes through String.valueOf, so null is rendered as "null"
        sb.append(content);
        sb.append("\nservices: ");
        if (services != null) {
            for (S service : services) {
                sb.append(service).append(" ");
            }
        }
        return sb.toString();
    }
}
/** /**
* collects IngestManager statistics during runtime * collects IngestManager statistics during runtime
*/ */
@ -1065,19 +930,19 @@ public class IngestManager {
initMainProgress(numFsContents); initMainProgress(numFsContents);
//process fscontents queue //process fscontents queue
while (hasNextFsContent()) { while (hasNextFsContent()) {
QueueUnit<FsContent, IngestServiceFsContent> unit = getNextFsContent(); Map.Entry<FsContent, List<IngestServiceFsContent>> unit = getNextFsContent();
//clear return values from services for last file //clear return values from services for last file
synchronized (fsContentServiceResults) { synchronized (fsContentServiceResults) {
fsContentServiceResults.clear(); fsContentServiceResults.clear();
} }
for (IngestServiceFsContent service : unit.services) { for (IngestServiceFsContent service : unit.getValue()) {
if (isCancelled()) { if (isCancelled()) {
return null; return null;
} }
try { try {
IngestServiceFsContent.ProcessResult result = service.process(unit.content); IngestServiceFsContent.ProcessResult result = service.process(unit.getKey());
//handle unconditional stop //handle unconditional stop
if (result == IngestServiceFsContent.ProcessResult.STOP) { if (result == IngestServiceFsContent.ProcessResult.STOP) {
break; break;
@ -1103,7 +968,7 @@ public class IngestManager {
initMainProgress(numFsContents); initMainProgress(numFsContents);
} }
progress.progress(unit.content.getName(), ++processedFiles); progress.progress(unit.getKey().getName(), ++processedFiles);
--numFsContents; --numFsContents;
} //end of this fsContent } //end of this fsContent
logger.log(Level.INFO, "Done background processing"); logger.log(Level.INFO, "Done background processing");
@ -1270,6 +1135,7 @@ public class IngestManager {
int processed = 0; int processed = 0;
for (Image image : images) { for (Image image : images) {
final String imageName = image.getName(); final String imageName = image.getName();
Collection<FsContent> fsContents = null;
for (IngestServiceAbstract service : services) { for (IngestServiceAbstract service : services) {
if (isCancelled()) { if (isCancelled()) {
return; return;
@ -1292,20 +1158,27 @@ public class IngestManager {
//addImage((IngestServiceImage) service, image); //addImage((IngestServiceImage) service, image);
break; break;
case FsContent: case FsContent:
if(fsContents == null) {
long start = System.currentTimeMillis();
fsContents = new GetAllFilesContentVisitor().visit(image);
logger.info("Get all files took " + (System.currentTimeMillis()-start) + "ms");
}
//enqueue the same singleton fscontent service //enqueue the same singleton fscontent service
addFsContent((IngestServiceFsContent) service, image); logger.log(Level.INFO, "Adding image " + image.getName() + " with " + fsContents.size() + " number of fsContent to service " + service.getName());
addFsContent((IngestServiceFsContent) service, fsContents);
break; break;
default: default:
logger.log(Level.SEVERE, "Unexpected service type: " + service.getType().name()); logger.log(Level.SEVERE, "Unexpected service type: " + service.getType().name());
} }
progress.progress(serviceName + " " + imageName, ++processed); progress.progress(serviceName + " " + imageName, ++processed);
} }
if(fsContents != null)
fsContents.clear();
} }
//logger.log(Level.INFO, fsContentQueue.printQueue()); //logger.log(Level.INFO, fsContentQueue.printQueue());
progress.progress("Sorting files", processed); progress.progress("Sorting files", processed);
sortFsContents();
} }
private void handleInterruption() { private void handleInterruption() {