limit the scope of mutext when getting IngestManager singleton instance

This commit is contained in:
adam-m 2013-03-28 12:38:27 -04:00
parent ab39110253
commit 8e2bcdfae0

View File

@ -113,10 +113,9 @@ public class IngestManager {
* *
*/ */
DATA, DATA,
/** /**
* Event send when content changed, either its attributes changed, or new content * Event send when content changed, either its attributes changed, or
* children have been added * new content children have been added
*/ */
CONTENT_CHANGED CONTENT_CHANGED
}; };
@ -173,10 +172,14 @@ public class IngestManager {
* *
* @returns Instance of class. * @returns Instance of class.
*/ */
public static synchronized IngestManager getDefault() { public static IngestManager getDefault() {
if (instance == null) { if (instance == null) {
logger.log(Level.INFO, "creating manager instance"); synchronized (IngestManager.class) {
instance = new IngestManager(); if (instance == null) {
logger.log(Level.INFO, "creating manager instance");
instance = new IngestManager();
}
}
} }
return instance; return instance;
} }
@ -197,7 +200,7 @@ public class IngestManager {
static synchronized void fireModuleDataEvent(ModuleDataEvent moduleDataEvent) { static synchronized void fireModuleDataEvent(ModuleDataEvent moduleDataEvent) {
pcs.firePropertyChange(IngestModuleEvent.DATA.toString(), moduleDataEvent, null); pcs.firePropertyChange(IngestModuleEvent.DATA.toString(), moduleDataEvent, null);
} }
static synchronized void fireModuleContentEvent(ModuleContentEvent moduleContentEvent) { static synchronized void fireModuleContentEvent(ModuleContentEvent moduleContentEvent) {
pcs.firePropertyChange(IngestModuleEvent.CONTENT_CHANGED.toString(), moduleContentEvent, null); pcs.firePropertyChange(IngestModuleEvent.CONTENT_CHANGED.toString(), moduleContentEvent, null);
} }
@ -259,18 +262,18 @@ public class IngestManager {
logger.log(Level.INFO, "Will enqueue image: " + image.getName()); logger.log(Level.INFO, "Will enqueue image: " + image.getName());
execute(modules, images); execute(modules, images);
} }
/** /**
* Schedule a file for ingest. * Schedule a file for ingest. Scheduler updates the current progress.
* Scheduler updates the current progress. *
* * The file is usually a product of a recently ran ingest. Now we want to
* The file is usually a product of a recently ran ingest. * process this file with the same ingest context.
* Now we want to process this file with the same ingest context. *
*
* @param file file to be scheduled * @param file file to be scheduled
* @param pipelineContext ingest context used to ingest parent of the file to be scheduled * @param pipelineContext ingest context used to ingest parent of the file
* to be scheduled
*/ */
void scheduleFile(AbstractFile file, PipelineContext pipelineContext) { void scheduleFile(AbstractFile file, PipelineContext pipelineContext) {
scheduler.getFileScheduler().schedule(file, pipelineContext); scheduler.getFileScheduler().schedule(file, pipelineContext);
} }
@ -325,7 +328,7 @@ public class IngestManager {
IngestModuleInit moduleInit = new IngestModuleInit(); IngestModuleInit moduleInit = new IngestModuleInit();
moduleInit.setModuleArgs(taskModule.getArguments()); moduleInit.setModuleArgs(taskModule.getArguments());
PipelineContext<IngestModuleImage>imagepipelineContext = PipelineContext<IngestModuleImage> imagepipelineContext =
new PipelineContext<IngestModuleImage>(imageTask, getProcessUnallocSpace()); new PipelineContext<IngestModuleImage>(imageTask, getProcessUnallocSpace());
final IngestImageThread newImageWorker = new IngestImageThread(this, final IngestImageThread newImageWorker = new IngestImageThread(this,
imagepipelineContext, imageTask.getImage(), taskModule, moduleInit); imagepipelineContext, imageTask.getImage(), taskModule, moduleInit);
@ -406,9 +409,9 @@ public class IngestManager {
if (!cancelled) { if (!cancelled) {
logger.log(Level.INFO, "Unable to cancel file ingest worker, likely already stopped"); logger.log(Level.INFO, "Unable to cancel file ingest worker, likely already stopped");
} }
abstractFileIngester = null; abstractFileIngester = null;
} }
List<IngestImageThread> toStop = new ArrayList<IngestImageThread>(); List<IngestImageThread> toStop = new ArrayList<IngestImageThread>();
@ -612,19 +615,18 @@ public class IngestManager {
ui.displayMessage(message); ui.displayMessage(message);
} }
} }
/** /**
* Get free disk space of a drive where ingest data are written to * Get free disk space of a drive where ingest data are written to That
* That drive is being monitored by IngestMonitor thread when ingest is running. * drive is being monitored by IngestMonitor thread when ingest is running.
* Use this method to get amount of free disk space anytime. * Use this method to get amount of free disk space anytime.
* *
* @return amount of disk space, -1 if unknown * @return amount of disk space, -1 if unknown
*/ */
long getFreeDiskSpace() { long getFreeDiskSpace() {
if (ingestMonitor != null) { if (ingestMonitor != null) {
return ingestMonitor.getFreeSpace(); return ingestMonitor.getFreeSpace();
} } else {
else {
return -1; return -1;
} }
} }
@ -734,12 +736,11 @@ public class IngestManager {
String moduleName; String moduleName;
if (module != null) { if (module != null) {
moduleName = module.getName(); moduleName = module.getName();
} } else {
else {
//manager message //manager message
moduleName = "System"; moduleName = "System";
} }
sb.append("\t").append(moduleName).append(": ").append(errorsModule).append(EOL); sb.append("\t").append(moduleName).append(": ").append(errorsModule).append(EOL);
} }
} }
@ -816,25 +817,23 @@ public class IngestManager {
} }
} }
/** /**
* File ingest pipeline processor. * File ingest pipeline processor. Worker runs until AbstractFile queue is
* Worker runs until AbstractFile queue is consumed * consumed New instance is created and started when data arrives and
* New instance is created and started when data arrives and previous pipeline completed. * previous pipeline completed.
*/ */
private class IngestAbstractFileProcessor extends SwingWorker<Object, Void> { private class IngestAbstractFileProcessor extends SwingWorker<Object, Void> {
private Logger logger = Logger.getLogger(IngestAbstractFileProcessor.class.getName()); private Logger logger = Logger.getLogger(IngestAbstractFileProcessor.class.getName());
//progress bar //progress bar
private ProgressHandle progress; private ProgressHandle progress;
@Override @Override
protected Object doInBackground() throws Exception { protected Object doInBackground() throws Exception {
logger.log(Level.INFO, "Starting background ingest file processor"); logger.log(Level.INFO, "Starting background ingest file processor");
logger.log(Level.INFO, PlatformUtil.getAllMemUsageInfo()); logger.log(Level.INFO, PlatformUtil.getAllMemUsageInfo());
stats.start(); stats.start();
//notify main thread modules started //notify main thread modules started
@ -869,7 +868,7 @@ public class IngestManager {
final PipelineContext<IngestModuleAbstractFile> filepipelineContext = fileTask.context; final PipelineContext<IngestModuleAbstractFile> filepipelineContext = fileTask.context;
final ScheduledImageTask<IngestModuleAbstractFile> fileIngestTask = filepipelineContext.getScheduledTask(); final ScheduledImageTask<IngestModuleAbstractFile> fileIngestTask = filepipelineContext.getScheduledTask();
final AbstractFile fileToProcess = fileTask.file; final AbstractFile fileToProcess = fileTask.file;
//clear return values from modules for last file //clear return values from modules for last file
synchronized (abstractFileModulesRetValues) { synchronized (abstractFileModulesRetValues) {
abstractFileModulesRetValues.clear(); abstractFileModulesRetValues.clear();
@ -877,7 +876,7 @@ public class IngestManager {
logger.log(Level.INFO, "IngestManager: Processing: {0}", fileToProcess.getName()); logger.log(Level.INFO, "IngestManager: Processing: {0}", fileToProcess.getName());
progress.progress(fileToProcess.getName(), processedFiles); progress.progress(fileToProcess.getName(), processedFiles);
for (IngestModuleAbstractFile module : fileIngestTask.getModules() ) { for (IngestModuleAbstractFile module : fileIngestTask.getModules()) {
//process the file with every file module //process the file with every file module
if (isCancelled()) { if (isCancelled()) {
logger.log(Level.INFO, "Terminating file ingest due to cancellation."); logger.log(Level.INFO, "Terminating file ingest due to cancellation.");
@ -886,8 +885,7 @@ public class IngestManager {
try { try {
stats.logFileModuleStartProcess(module); stats.logFileModuleStartProcess(module);
IngestModuleAbstractFile.ProcessResult result = module.process IngestModuleAbstractFile.ProcessResult result = module.process(filepipelineContext, fileToProcess);
(filepipelineContext, fileToProcess);
stats.logFileModuleEndProcess(module); stats.logFileModuleEndProcess(module);
//store the result for subsequent modules for this file //store the result for subsequent modules for this file
@ -898,13 +896,12 @@ public class IngestManager {
} catch (Exception e) { } catch (Exception e) {
logger.log(Level.SEVERE, "Error: unexpected exception from module: " + module.getName(), e); logger.log(Level.SEVERE, "Error: unexpected exception from module: " + module.getName(), e);
stats.addError(module); stats.addError(module);
} } catch (OutOfMemoryError e) {
catch (OutOfMemoryError e) {
logger.log(Level.SEVERE, "Error: out of memory from module: " + module.getName(), e); logger.log(Level.SEVERE, "Error: out of memory from module: " + module.getName(), e);
stats.addError(module); stats.addError(module);
} }
} //end for every module } //end for every module
//free the internal file resource after done with every module //free the internal file resource after done with every module
fileToProcess.close(); fileToProcess.close();
@ -921,8 +918,8 @@ public class IngestManager {
++processedFiles; ++processedFiles;
} }
//--totalEnqueuedFiles; //--totalEnqueuedFiles;
} //end of for every AbstractFile } //end of for every AbstractFile
logger.log(Level.INFO, "IngestManager: Finished processing files"); logger.log(Level.INFO, "IngestManager: Finished processing files");
return null; return null;
@ -939,7 +936,7 @@ public class IngestManager {
IngestManager.fireModuleEvent(IngestModuleEvent.COMPLETED.toString(), s.getName()); IngestManager.fireModuleEvent(IngestModuleEvent.COMPLETED.toString(), s.getName());
} }
} }
logger.log(Level.INFO, PlatformUtil.getAllMemUsageInfo()); logger.log(Level.INFO, PlatformUtil.getAllMemUsageInfo());
logger.log(Level.INFO, "Freeing jvm heap resources post file pipeline run"); logger.log(Level.INFO, "Freeing jvm heap resources post file pipeline run");
System.gc(); System.gc();
@ -1105,16 +1102,14 @@ public class IngestManager {
//queue to schedulers //queue to schedulers
final boolean processUnalloc = getProcessUnallocSpace(); final boolean processUnalloc = getProcessUnallocSpace();
final ScheduledImageTask<IngestModuleImage> imageTask = new ScheduledImageTask<IngestModuleImage>(image, imageMods); final ScheduledImageTask<IngestModuleImage> imageTask = new ScheduledImageTask<IngestModuleImage>(image, imageMods);
final PipelineContext<IngestModuleImage>imagepipelineContext final PipelineContext<IngestModuleImage> imagepipelineContext = new PipelineContext<IngestModuleImage>(imageTask, processUnalloc);
= new PipelineContext<IngestModuleImage>(imageTask, processUnalloc);
logger.log(Level.INFO, "Queing image ingest task: " + imageTask); logger.log(Level.INFO, "Queing image ingest task: " + imageTask);
progress.progress("Image Ingest" + " " + imageName, processed); progress.progress("Image Ingest" + " " + imageName, processed);
imageScheduler.schedule(imagepipelineContext); imageScheduler.schedule(imagepipelineContext);
progress.progress("Image Ingest" + " " + imageName, ++processed); progress.progress("Image Ingest" + " " + imageName, ++processed);
final ScheduledImageTask fTask = new ScheduledImageTask(image, fileMods); final ScheduledImageTask fTask = new ScheduledImageTask(image, fileMods);
final PipelineContext<IngestModuleAbstractFile>filepipelineContext final PipelineContext<IngestModuleAbstractFile> filepipelineContext = new PipelineContext<IngestModuleAbstractFile>(fTask, processUnalloc);
= new PipelineContext<IngestModuleAbstractFile>(fTask, processUnalloc);
logger.log(Level.INFO, "Queing file ingest task: " + fTask); logger.log(Level.INFO, "Queing file ingest task: " + fTask);
progress.progress("File Ingest" + " " + imageName, processed); progress.progress("File Ingest" + " " + imageName, processed);
fileScheduler.schedule(filepipelineContext); fileScheduler.schedule(filepipelineContext);