Merge pull request #583 from rcordovano/parallel_file_ingest

Moved ingest progress bars from task workers to ingest job
This commit is contained in:
Richard Cordovano 2014-04-01 19:37:27 -04:00
commit 4643e21030
6 changed files with 394 additions and 339 deletions

View File

@ -16,6 +16,9 @@ IngestMessagePanel.totalMessagesNameLabel.text=Total:
IngestMessagePanel.totalMessagesNameVal.text=-
IngestMessagePanel.totalUniqueMessagesNameLabel.text=Unique:
IngestMessagePanel.totalUniqueMessagesNameVal.text=-
IngestJob.progress.dataSourceIngest.displayName=Data Source Ingest of {0}
IngestJob.progress.fileIngest.displayName=File Ingest of {0}
IngestJob.progress.cancelling={0} (Cancelling...)
IngestJobConfigurationPanel.processUnallocCheckbox.toolTipText=Processes unallocated space, such as deleted files. Produces more complete results, but it may take longer to process on large images.
IngestJobConfigurationPanel.processUnallocCheckbox.text=Process Unallocated Space
IngestJobConfigurationPanel.advancedButton.text=Advanced
@ -46,14 +49,8 @@ IngestManager.toHtmlStr.totalErrs.text=Total errors\: {0}
IngestManager.toHtmlStr.module.text=Module
IngestManager.toHtmlStr.time.text=Time
IngestManager.toHtmlStr.errors.text=Errors
IngestManager.FileTaskWorker.displayName=File Ingest
IngestManager.FileTaskWorker.process.cancelling={0} (Cancelling...)
IngestManager.EnqueueWorker.displayName.text=Queueing Ingest
IngestManager.EnqueueWorker.process.cancelling={0} (Cancelling...)
IngestManager.DataSourceTaskWorker.progress.pending={0} (Pending...)
IngestManager.DataSourceTaskWorker.progress.cancelling={0} (Cancelling...)
IngestManager.datatSourceIngest.progress.text=DataSource Ingest {0}
IngestManager.fileIngest.progress.text=File Ingest {0}
IngestJob.DataSourceIngestPipeline.displayName.text={0} processing {1}
IngestMessage.toString.type.text=type\: {0}
IngestMessage.toString.source.text=\ source\: {0}

View File

@ -31,11 +31,13 @@ public class DataSourceIngestModuleStatusHelper {
private final SwingWorker worker;
private final ProgressHandle progress;
private final Content dataSource;
private final String moduleDisplayName;
DataSourceIngestModuleStatusHelper(SwingWorker worker, ProgressHandle progress, Content dataSource) {
DataSourceIngestModuleStatusHelper(SwingWorker worker, ProgressHandle progress, Content dataSource, String moduleDisplayName) {
this.worker = worker;
this.progress = progress;
this.dataSource = dataSource;
this.moduleDisplayName = moduleDisplayName;
}
/**
@ -57,7 +59,7 @@ public class DataSourceIngestModuleStatusHelper {
* @param workUnits Total number of work units for the processing of the
* data source.
*/
public void switchToDeterminate(int workUnits) { // RJCTODO: Fix this
public void switchToDeterminate(int workUnits) {
if (progress != null) {
progress.switchToDeterminate(workUnits);
}
@ -81,7 +83,7 @@ public class DataSourceIngestModuleStatusHelper {
*/
public void progress(int workUnits) {
if (progress != null) {
progress.progress(dataSource.getName(), workUnits);
progress.progress(this.moduleDisplayName, workUnits);
}
}
}

View File

@ -0,0 +1,148 @@
/*
* Autopsy Forensic Browser
*
* Copyright 2014 Basis Technology Corp.
* Contact: carrier <at> sleuthkit <dot> org
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.sleuthkit.autopsy.ingest;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import javax.swing.SwingWorker;
import org.netbeans.api.progress.ProgressHandle;
import org.sleuthkit.autopsy.coreutils.Logger;
import org.sleuthkit.datamodel.Content;
/**
 * Runs the data source ingest modules of a single ingest job, one after the
 * other, against the job's data source. The module instances are created from
 * the ingest module templates handed to the constructor.
 */
final class DataSourceIngestPipeline {

    private static final Logger logger = Logger.getLogger(DataSourceIngestPipeline.class.getName());
    private final IngestJob job;
    private final List<IngestModuleTemplate> moduleTemplates;
    private List<DataSourceIngestModuleDecorator> modules = new ArrayList<>();

    /**
     * Constructs a pipeline for the given job. No modules are instantiated
     * until startUp() is called.
     *
     * @param task            The ingest job this pipeline belongs to.
     * @param moduleTemplates The templates from which modules are created.
     */
    DataSourceIngestPipeline(IngestJob task, List<IngestModuleTemplate> moduleTemplates) {
        this.job = task;
        this.moduleTemplates = moduleTemplates;
    }

    /**
     * Instantiates and starts a module for every data source ingest module
     * template, then arranges the successfully started modules in pipeline
     * order.
     *
     * @return One error per module whose start up threw; such modules are
     *         left out of the pipeline.
     */
    List<IngestModuleError> startUp() {
        final List<IngestModuleError> startUpErrors = new ArrayList<>();

        // Started modules are keyed by class name so that they can be looked
        // up while walking the configured pipeline sequence below.
        final Map<String, DataSourceIngestModuleDecorator> startedModules = new HashMap<>();
        for (IngestModuleTemplate template : moduleTemplates) {
            if (!template.isDataSourceIngestModuleTemplate()) {
                continue;
            }
            DataSourceIngestModuleDecorator candidate = new DataSourceIngestModuleDecorator(
                    template.createDataSourceIngestModule(), template.getModuleName());
            IngestJobContext context = new IngestJobContext(job);
            try {
                candidate.startUp(context);
                startedModules.put(candidate.getClassName(), candidate);
                IngestManager.fireModuleEvent(IngestManager.IngestEvent.STARTED.toString(), candidate.getDisplayName());
            } catch (Exception ex) {
                startUpErrors.add(new IngestModuleError(candidate.getDisplayName(), ex));
            }
        }

        // Modules named in the pipeline configuration go first, in the
        // configured order. Whatever is left over is appended afterwards in
        // the (arbitrary) iteration order of the map.
        for (String configuredClassName : IngestPipelinesConfiguration.getInstance().getDataSourceIngestPipelineConfig()) {
            DataSourceIngestModuleDecorator configured = startedModules.remove(configuredClassName);
            if (configured != null) {
                modules.add(configured);
            }
        }
        modules.addAll(startedModules.values());
        return startUpErrors;
    }

    /**
     * Passes the job's data source through each module in pipeline order,
     * stopping early once the job reports that it has been cancelled.
     *
     * @param worker   The worker running this pipeline; handed to each
     *                 module's status helper.
     * @param progress The progress handle handed to each module's status
     *                 helper.
     *
     * @return One error per module whose processing threw.
     */
    List<IngestModuleError> process(SwingWorker worker, ProgressHandle progress) {
        final Content dataSource = job.getDataSource();
        logger.log(Level.INFO, "Processing data source {0}", dataSource.getName());

        final List<IngestModuleError> processingErrors = new ArrayList<>();
        for (DataSourceIngestModuleDecorator current : modules) {
            final String moduleName = current.getDisplayName();
            try {
                DataSourceIngestModuleStatusHelper statusHelper
                        = new DataSourceIngestModuleStatusHelper(worker, progress, dataSource, moduleName);
                current.process(dataSource, statusHelper);
            } catch (Exception ex) {
                processingErrors.add(new IngestModuleError(moduleName, ex));
            }
            // Cancellation is checked after every module, including the last.
            if (job.isCancelled()) {
                break;
            }
        }
        return processingErrors;
    }

    /**
     * Shuts down every module in the pipeline. A COMPLETED module event is
     * fired for each module whether or not its shut down threw.
     *
     * @param ingestJobCancelled Passed through to each module's shutDown().
     *
     * @return One error per module whose shut down threw.
     */
    List<IngestModuleError> shutDown(boolean ingestJobCancelled) {
        final List<IngestModuleError> shutDownErrors = new ArrayList<>();
        for (DataSourceIngestModuleDecorator current : modules) {
            final String moduleName = current.getDisplayName();
            try {
                current.shutDown(ingestJobCancelled);
            } catch (Exception ex) {
                shutDownErrors.add(new IngestModuleError(moduleName, ex));
            } finally {
                IngestManager.fireModuleEvent(IngestManager.IngestEvent.COMPLETED.toString(), moduleName);
            }
        }
        return shutDownErrors;
    }

    /**
     * Wraps a data source ingest module together with its display name and
     * forwards all ingest module calls to the wrapped instance.
     */
    private static class DataSourceIngestModuleDecorator implements DataSourceIngestModule {

        private final DataSourceIngestModule module;
        private final String displayName;

        DataSourceIngestModuleDecorator(DataSourceIngestModule module, String displayName) {
            this.module = module;
            this.displayName = displayName;
        }

        /**
         * @return The canonical class name of the wrapped module.
         */
        String getClassName() {
            return module.getClass().getCanonicalName();
        }

        /**
         * @return The display name supplied at construction.
         */
        String getDisplayName() {
            return displayName;
        }

        @Override
        public void startUp(IngestJobContext context) throws IngestModuleException {
            module.startUp(context);
        }

        @Override
        public IngestModule.ProcessResult process(Content dataSource, DataSourceIngestModuleStatusHelper statusHelper) {
            return module.process(dataSource, statusHelper);
        }

        @Override
        public void shutDown(boolean ingestJobWasCancelled) {
            module.shutDown(ingestJobWasCancelled);
        }
    }
}

View File

@ -0,0 +1,149 @@
/*
* Autopsy Forensic Browser
*
* Copyright 2014 Basis Technology Corp.
* Contact: carrier <at> sleuthkit <dot> org
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.sleuthkit.autopsy.ingest;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import org.sleuthkit.autopsy.coreutils.Logger;
import org.sleuthkit.datamodel.AbstractFile;
import org.sleuthkit.datamodel.Content;
/**
* A file ingest pipeline composed of a sequence of file ingest modules
* constructed from ingest module templates.
*/
final class FileIngestPipeline {
private static final Logger logger = Logger.getLogger(FileIngestPipeline.class.getName());
private final IngestJob job;
private final List<IngestModuleTemplate> moduleTemplates;
private List<FileIngestModuleDecorator> modules = new ArrayList<>();
FileIngestPipeline(IngestJob task, List<IngestModuleTemplate> moduleTemplates) {
this.job = task;
this.moduleTemplates = moduleTemplates;
}
List<IngestModuleError> startUp() {
List<IngestModuleError> errors = new ArrayList<>();
// Create an ingest module instance from each ingest module template
// that has an ingest module factory capable of making data source
// ingest modules. Map the module class names to the module instance
// to allow the modules to be put in the sequence indicated by the
// ingest pipelines configuration.
Map<String, FileIngestModuleDecorator> modulesByClass = new HashMap<>();
for (IngestModuleTemplate template : moduleTemplates) {
if (template.isFileIngestModuleTemplate()) {
FileIngestModuleDecorator module = new FileIngestModuleDecorator(template.createFileIngestModule(), template.getModuleName());
IngestJobContext context = new IngestJobContext(job);
try {
module.startUp(context);
modulesByClass.put(module.getClassName(), module);
IngestManager.fireModuleEvent(IngestManager.IngestEvent.STARTED.toString(), template.getModuleName());
} catch (Exception ex) {
errors.add(new IngestModuleError(module.getDisplayName(), ex));
}
}
}
// Establish the module sequence of the core ingest modules
// indicated by the ingest pipeline configuration, adding any
// additional modules found in the global lookup to the end of the
// pipeline in arbitrary order.
List<String> pipelineConfig = IngestPipelinesConfiguration.getInstance().getFileIngestPipelineConfig();
for (String moduleClassName : pipelineConfig) {
if (modulesByClass.containsKey(moduleClassName)) {
modules.add(modulesByClass.remove(moduleClassName));
}
}
for (FileIngestModuleDecorator module : modulesByClass.values()) {
modules.add(module);
}
return errors;
}
List<IngestModuleError> process(AbstractFile file) {
List<IngestModuleError> errors = new ArrayList<>();
Content dataSource = this.job.getDataSource();
logger.log(Level.INFO, String.format("Processing {0} from {1}", file.getName(), dataSource.getName()));
for (FileIngestModuleDecorator module : this.modules) {
try {
module.process(file);
} catch (Exception ex) {
errors.add(new IngestModuleError(module.getDisplayName(), ex));
}
if (job.isCancelled()) {
break;
}
}
file.close();
IngestManager.fireFileDone(file.getId());
return errors;
}
List<IngestModuleError> shutDown(boolean ingestJobCancelled) {
List<IngestModuleError> errors = new ArrayList<>();
for (FileIngestModuleDecorator module : this.modules) {
try {
module.shutDown(ingestJobCancelled);
} catch (Exception ex) {
errors.add(new IngestModuleError(module.getDisplayName(), ex));
} finally {
IngestManager.fireModuleEvent(IngestManager.IngestEvent.COMPLETED.toString(), module.getDisplayName());
}
}
return errors;
}
private static class FileIngestModuleDecorator implements FileIngestModule {
private final FileIngestModule module;
private final String displayName;
FileIngestModuleDecorator(FileIngestModule module, String displayName) {
this.module = module;
this.displayName = displayName;
}
String getClassName() {
return module.getClass().getCanonicalName();
}
String getDisplayName() {
return displayName;
}
@Override
public void startUp(IngestJobContext context) throws IngestModuleException {
module.startUp(context);
}
@Override
public IngestModule.ProcessResult process(AbstractFile file) {
return module.process(file);
}
@Override
public void shutDown(boolean ingestJobWasCancelled) {
module.shutDown(ingestJobWasCancelled);
}
}
}

View File

@ -21,13 +21,10 @@ package org.sleuthkit.autopsy.ingest;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import javax.swing.SwingWorker;
import org.netbeans.api.progress.ProgressHandle;
import org.netbeans.api.progress.ProgressHandleFactory;
import org.openide.util.Cancellable;
import org.openide.util.NbBundle;
import org.sleuthkit.autopsy.coreutils.Logger;
import org.sleuthkit.datamodel.AbstractFile;
import org.sleuthkit.datamodel.Content;
/**
@ -42,8 +39,13 @@ final class IngestJob {
private final boolean processUnallocatedSpace;
private final HashMap<Long, FileIngestPipeline> fileIngestPipelines = new HashMap<>();
private final HashMap<Long, DataSourceIngestPipeline> dataSourceIngestPipelines = new HashMap<>();
private final IngestScheduler.FileScheduler fileScheduler = IngestScheduler.getInstance().getFileScheduler();
private FileIngestPipeline initialFileIngestPipeline = null;
private DataSourceIngestPipeline initialDataSourceIngestPipeline = null;
private ProgressHandle dataSourceTaskProgress;
private ProgressHandle fileTasksProgress;
int totalEnqueuedFiles = 0;
private int processedFiles = 0;
private boolean cancelled;
IngestJob(long id, Content dataSource, List<IngestModuleTemplate> ingestModuleTemplates, boolean processUnallocatedSpace) {
@ -66,15 +68,53 @@ final class IngestJob {
return processUnallocatedSpace;
}
synchronized void cancel() {
cancelled = true;
}
synchronized boolean isCancelled() {
return cancelled;
}
/**
 * Starts both progress bars for this job (data source ingest first, then
 * file ingest) and then starts up the initial ingest pipelines.
 *
 * @return The errors returned by the initial pipeline start up.
 */
synchronized List<IngestModuleError> startUpIngestPipelines() {
    this.startDataSourceIngestProgressBar();
    this.startFileIngestProgressBar();
    final List<IngestModuleError> startUpErrors = this.startUpInitialIngestPipelines();
    return startUpErrors;
}
/**
 * Creates and starts the data source ingest progress bar for this job. The
 * bar is titled with the data source name and left in indeterminate mode.
 * Cancelling it relabels the bar as cancelling and stops all ingest via the
 * ingest manager.
 */
private void startDataSourceIngestProgressBar() {
    final String displayName = NbBundle.getMessage(
            this.getClass(),
            "IngestJob.progress.dataSourceIngest.displayName",
            this.dataSource.getName());

    final Cancellable cancelHandler = new Cancellable() {
        @Override
        public boolean cancel() {
            if (dataSourceTaskProgress != null) {
                String cancellingName = NbBundle.getMessage(this.getClass(),
                        "IngestJob.progress.cancelling",
                        displayName);
                dataSourceTaskProgress.setDisplayName(cancellingName);
            }
            IngestManager.getInstance().stopAll();
            return true;
        }
    };

    dataSourceTaskProgress = ProgressHandleFactory.createHandle(displayName, cancelHandler);
    dataSourceTaskProgress.start();
    dataSourceTaskProgress.switchToIndeterminate();
}
/**
 * Creates and starts the file ingest progress bar for this job. The bar is
 * titled with the data source name and sized using the file scheduler's
 * current estimate of enqueued files. Cancelling it relabels the bar as
 * cancelling and stops all ingest via the ingest manager.
 */
private void startFileIngestProgressBar() {
    final String displayName = NbBundle.getMessage(
            this.getClass(),
            "IngestJob.progress.fileIngest.displayName",
            this.dataSource.getName());

    final Cancellable cancelHandler = new Cancellable() {
        @Override
        public boolean cancel() {
            if (fileTasksProgress != null) {
                String cancellingName = NbBundle.getMessage(this.getClass(),
                        "IngestJob.progress.cancelling",
                        displayName);
                fileTasksProgress.setDisplayName(cancellingName);
            }
            IngestManager.getInstance().stopAll();
            return true;
        }
    };

    fileTasksProgress = ProgressHandleFactory.createHandle(displayName, cancelHandler);
    fileTasksProgress.start();
    fileTasksProgress.switchToIndeterminate();

    // Switch to a determinate bar sized by the scheduler's file estimate.
    totalEnqueuedFiles = fileScheduler.getFilesEnqueuedEst();
    fileTasksProgress.switchToDeterminate(totalEnqueuedFiles);
}
private List<IngestModuleError> startUpInitialIngestPipelines() {
// Create a per thread instance of each pipeline type right now to make
// (reasonably) sure that the ingest modules can be started.
initialDataSourceIngestPipeline = new DataSourceIngestPipeline(this, ingestModuleTemplates);
@ -124,13 +164,19 @@ final class IngestJob {
if (dataSourceIngestPipeline != null) {
errors.addAll(dataSourceIngestPipeline.shutDown(cancelled));
}
this.dataSourceIngestPipelines.remove(threadId);
dataSourceIngestPipelines.remove(threadId);
if (dataSourceIngestPipelines.isEmpty() && dataSourceTaskProgress != null) {
dataSourceTaskProgress.finish();
}
FileIngestPipeline fileIngestPipeline = fileIngestPipelines.get(threadId);
if (fileIngestPipeline != null) {
errors.addAll(fileIngestPipeline.shutDown(cancelled));
}
this.fileIngestPipelines.remove(threadId);
fileIngestPipelines.remove(threadId);
if (fileIngestPipelines.isEmpty() && fileTasksProgress != null) {
fileTasksProgress.finish();
}
return errors;
}
@ -139,251 +185,29 @@ final class IngestJob {
return (dataSourceIngestPipelines.isEmpty() && fileIngestPipelines.isEmpty());
}
/**
* A data source ingest pipeline composed of a sequence of data source ingest
* modules constructed from ingest module templates.
*/
static final class DataSourceIngestPipeline {
private static final Logger logger = Logger.getLogger(DataSourceIngestPipeline.class.getName());
private final IngestJob task;
private final List<IngestModuleTemplate> moduleTemplates;
private List<DataSourceIngestModuleDecorator> modules = new ArrayList<>();
private DataSourceIngestPipeline(IngestJob task, List<IngestModuleTemplate> moduleTemplates) {
this.task = task;
this.moduleTemplates = moduleTemplates;
}
private List<IngestModuleError> startUp() {
List<IngestModuleError> errors = new ArrayList<>();
// Create an ingest module instance from each ingest module template
// that has an ingest module factory capable of making data source
// ingest modules. Map the module class names to the module instance
// to allow the modules to be put in the sequence indicated by the
// ingest pipelines configuration.
Map<String, DataSourceIngestModuleDecorator> modulesByClass = new HashMap<>();
for (IngestModuleTemplate template : moduleTemplates) {
if (template.isDataSourceIngestModuleTemplate()) {
DataSourceIngestModuleDecorator module = new DataSourceIngestModuleDecorator(template.createDataSourceIngestModule(), template.getModuleName());
IngestJobContext context = new IngestJobContext(task);
try {
module.startUp(context);
modulesByClass.put(module.getClassName(), module);
IngestManager.fireModuleEvent(IngestManager.IngestEvent.STARTED.toString(), module.getDisplayName());
} catch (Exception ex) {
errors.add(new IngestModuleError(module.getDisplayName(), ex));
}
}
}
// Establish the module sequence of the core ingest modules
// indicated by the ingest pipeline configuration, adding any
// additional modules found in the global lookup to the end of the
// pipeline in arbitrary order.
List<String> pipelineConfig = IngestPipelinesConfiguration.getInstance().getDataSourceIngestPipelineConfig();
for (String moduleClassName : pipelineConfig) {
if (modulesByClass.containsKey(moduleClassName)) {
modules.add(modulesByClass.remove(moduleClassName));
}
}
for (DataSourceIngestModuleDecorator module : modulesByClass.values()) {
modules.add(module);
}
return errors;
}
List<IngestModuleError> process(SwingWorker worker, ProgressHandle progress) {
List<IngestModuleError> errors = new ArrayList<>();
Content dataSource = this.task.getDataSource();
logger.log(Level.INFO, "Processing data source {0}", dataSource.getName());
for (DataSourceIngestModuleDecorator module : this.modules) {
try {
String displayName = NbBundle.getMessage(this.getClass(), "IngestJob.DataSourceIngestPipeline.displayName.text", module.getDisplayName(), dataSource.getName());
progress.setDisplayName(displayName);
module.process(dataSource, new DataSourceIngestModuleStatusHelper(worker, progress, dataSource));
} catch (Exception ex) {
errors.add(new IngestModuleError(module.getDisplayName(), ex));
}
if (task.isCancelled()) {
break;
}
}
return errors;
}
private List<IngestModuleError> shutDown(boolean ingestJobCancelled) {
List<IngestModuleError> errors = new ArrayList<>();
for (DataSourceIngestModuleDecorator module : this.modules) {
try {
module.shutDown(ingestJobCancelled);
} catch (Exception ex) {
errors.add(new IngestModuleError(module.getDisplayName(), ex));
} finally {
IngestManager.fireModuleEvent(IngestManager.IngestEvent.COMPLETED.toString(), module.getDisplayName());
}
}
return errors;
}
private static class DataSourceIngestModuleDecorator implements DataSourceIngestModule {
private final DataSourceIngestModule module;
private final String displayName;
DataSourceIngestModuleDecorator(DataSourceIngestModule module, String displayName) {
this.module = module;
this.displayName = displayName;
}
String getClassName() {
return module.getClass().getCanonicalName();
}
String getDisplayName() {
return displayName;
}
@Override
public void startUp(IngestJobContext context) throws IngestModuleException {
module.startUp(context);
}
@Override
public IngestModule.ProcessResult process(Content dataSource, DataSourceIngestModuleStatusHelper statusHelper) {
return module.process(dataSource, statusHelper);
}
@Override
public void shutDown(boolean ingestJobWasCancelled) {
module.shutDown(ingestJobWasCancelled);
}
}
/**
 * Gets the progress handle used for this job's data source ingest.
 *
 * @return The current value of the data source progress handle field.
 */
synchronized ProgressHandle getDataSourceTaskProgressBar() {
    return dataSourceTaskProgress;
}
/**
* A file ingest pipeline composed of a sequence of file ingest modules
* constructed from ingest module templates.
*/
static final class FileIngestPipeline {
private static final Logger logger = Logger.getLogger(FileIngestPipeline.class.getName());
private final IngestJob task;
private final List<IngestModuleTemplate> moduleTemplates;
private List<FileIngestModuleDecorator> modules = new ArrayList<>();
private FileIngestPipeline(IngestJob task, List<IngestModuleTemplate> moduleTemplates) {
this.task = task;
this.moduleTemplates = moduleTemplates;
synchronized void handleFileTaskStarted(IngestScheduler.FileScheduler.FileTask task) {
int newTotalEnqueuedFiles = fileScheduler.getFilesEnqueuedEst();
if (newTotalEnqueuedFiles > totalEnqueuedFiles) {
totalEnqueuedFiles = newTotalEnqueuedFiles + 1;
fileTasksProgress.switchToIndeterminate();
fileTasksProgress.switchToDeterminate(totalEnqueuedFiles);
}
if (processedFiles < totalEnqueuedFiles) {
++processedFiles;
}
private List<IngestModuleError> startUp() {
List<IngestModuleError> errors = new ArrayList<>();
// Create an ingest module instance from each ingest module template
// that has an ingest module factory capable of making data source
// ingest modules. Map the module class names to the module instance
// to allow the modules to be put in the sequence indicated by the
// ingest pipelines configuration.
Map<String, FileIngestModuleDecorator> modulesByClass = new HashMap<>();
for (IngestModuleTemplate template : moduleTemplates) {
if (template.isFileIngestModuleTemplate()) {
FileIngestModuleDecorator module = new FileIngestModuleDecorator(template.createFileIngestModule(), template.getModuleName()); // RJCTODO: Move into try block for bpth impls
IngestJobContext context = new IngestJobContext(task);
try {
module.startUp(context);
modulesByClass.put(module.getClassName(), module);
IngestManager.fireModuleEvent(IngestManager.IngestEvent.STARTED.toString(), template.getModuleName());
} catch (Exception ex) {
errors.add(new IngestModuleError(module.getDisplayName(), ex));
}
}
}
// Establish the module sequence of the core ingest modules
// indicated by the ingest pipeline configuration, adding any
// additional modules found in the global lookup to the end of the
// pipeline in arbitrary order.
List<String> pipelineConfig = IngestPipelinesConfiguration.getInstance().getFileIngestPipelineConfig();
for (String moduleClassName : pipelineConfig) {
if (modulesByClass.containsKey(moduleClassName)) {
modules.add(modulesByClass.remove(moduleClassName));
}
}
for (FileIngestModuleDecorator module : modulesByClass.values()) {
modules.add(module);
}
return errors;
}
fileTasksProgress.progress(task.getFile().getName(), processedFiles);
}
List<IngestModuleError> process(AbstractFile file) {
List<IngestModuleError> errors = new ArrayList<>();
Content dataSource = this.task.getDataSource();
logger.log(Level.INFO, String.format("Processing {0} from {1}", file.getName(), dataSource.getName()));
for (FileIngestModuleDecorator module : this.modules) {
try {
module.process(file);
} catch (Exception ex) {
errors.add(new IngestModuleError(module.getDisplayName(), ex));
}
if (task.isCancelled()) {
break;
}
}
file.close();
IngestManager.fireFileDone(file.getId());
return errors;
}
/**
 * Marks this job as cancelled by setting its cancelled flag.
 */
synchronized void cancel() {
    this.cancelled = true;
}
private List<IngestModuleError> shutDown(boolean ingestJobCancelled) {
List<IngestModuleError> errors = new ArrayList<>();
for (FileIngestModuleDecorator module : this.modules) {
try {
module.shutDown(ingestJobCancelled);
} catch (Exception ex) {
errors.add(new IngestModuleError(module.getDisplayName(), ex));
} finally {
IngestManager.fireModuleEvent(IngestManager.IngestEvent.COMPLETED.toString(), module.getDisplayName());
}
}
return errors;
}
private static class FileIngestModuleDecorator implements FileIngestModule {
private final FileIngestModule module;
private final String displayName;
FileIngestModuleDecorator(FileIngestModule module, String displayName) {
this.module = module;
this.displayName = displayName;
}
String getClassName() {
return module.getClass().getCanonicalName();
}
String getDisplayName() {
return displayName;
}
@Override
public void startUp(IngestJobContext context) throws IngestModuleException {
module.startUp(context);
}
@Override
public IngestModule.ProcessResult process(AbstractFile file) {
return module.process(file);
}
@Override
public void shutDown(boolean ingestJobWasCancelled) {
module.shutDown(ingestJobWasCancelled);
}
}
}
/**
 * Reports whether this job has been marked as cancelled.
 *
 * @return The current value of the cancelled flag.
 */
synchronized boolean isCancelled() {
    return this.cancelled;
}
}

View File

@ -488,7 +488,6 @@ public class IngestManager {
class DataSourceTaskWorker extends SwingWorker<Object, Void> {
private final long id;
private ProgressHandle progress;
DataSourceTaskWorker(long threadId) {
this.id = threadId;
@ -497,25 +496,6 @@ public class IngestManager {
@Override
protected Void doInBackground() throws Exception {
logger.log(Level.INFO, "Data source ingest thread (id={0}) started", this.id);
// Set up a progress bar that can be used to cancel all of the
// ingest jobs currently being performed.
progress = ProgressHandleFactory.createHandle("Data source ingest", new Cancellable() {
@Override
public boolean cancel() {
logger.log(Level.INFO, "Data source ingest thread (id={0}) cancelled", DataSourceTaskWorker.this.id);
if (progress != null) {
progress.setDisplayName(NbBundle.getMessage(this.getClass(),
"IngestManager.DataSourceTaskWorker.process.cancelling",
"Data source ingest"));
}
IngestManager.getInstance().stopAll();
return true;
}
});
progress.start();
progress.switchToIndeterminate();
IngestScheduler.DataSourceScheduler scheduler = IngestScheduler.getInstance().getDataSourceScheduler();
while (scheduler.hasNext()) {
if (isCancelled()) {
@ -523,13 +503,11 @@ public class IngestManager {
return null;
}
IngestJob ingestJob = scheduler.next();
IngestJob.DataSourceIngestPipeline pipeline = ingestJob.getDataSourceIngestPipelineForThread(this.id);
pipeline.process(this, this.progress);
IngestJob job = scheduler.next();
DataSourceIngestPipeline pipeline = job.getDataSourceIngestPipelineForThread(this.id);
pipeline.process(this, job.getDataSourceTaskProgressBar());
}
logger.log(Level.INFO, "Data source ingest thread (id={0}) completed", this.id);
IngestManager.getInstance().reportThreadDone(this.id);
return null;
}
@ -539,13 +517,11 @@ public class IngestManager {
super.get();
} catch (CancellationException | InterruptedException e) {
logger.log(Level.INFO, "Data source ingest thread (id={0}) cancelled", this.id);
IngestManager.getInstance().reportThreadDone(this.id);
} catch (Exception ex) {
String message = String.format("Data source ingest thread (id=%d) experienced a fatal error", this.id);
logger.log(Level.SEVERE, message, ex);
IngestManager.getInstance().reportThreadDone(this.id);
} finally {
progress.finish();
IngestManager.getInstance().reportThreadDone(this.id);
}
}
}
@ -557,7 +533,6 @@ public class IngestManager {
class FileTaskWorker extends SwingWorker<Object, Void> {
private final long id;
private ProgressHandle progress;
FileTaskWorker(long threadId) {
this.id = threadId;
@ -566,58 +541,20 @@ public class IngestManager {
@Override
protected Object doInBackground() throws Exception {
logger.log(Level.INFO, "File ingest thread (id={0}) started", this.id);
// Set up a progress bar that can be used to cancel all of the
// ingest jobs currently being performed.
final String displayName = NbBundle
.getMessage(this.getClass(), "IngestManager.FileTaskWorker.displayName");
progress = ProgressHandleFactory.createHandle(displayName, new Cancellable() {
@Override
public boolean cancel() {
logger.log(Level.INFO, "File ingest thread (id={0}) cancelled", FileTaskWorker.this.id);
if (progress != null) {
progress.setDisplayName(
NbBundle.getMessage(this.getClass(), "IngestManager.FileTaskWorker.process.cancelling",
displayName));
}
IngestManager.getInstance().stopAll();
return true;
}
});
progress.start();
progress.switchToIndeterminate();
IngestScheduler.FileScheduler fileScheduler = IngestScheduler.getInstance().getFileScheduler();
int totalEnqueuedFiles = fileScheduler.getFilesEnqueuedEst();
progress.switchToDeterminate(totalEnqueuedFiles);
int processedFiles = 0;
while (fileScheduler.hasNext()) {
if (isCancelled()) {
IngestManager.getInstance().reportThreadDone(this.id);
logger.log(Level.INFO, "File ingest thread (id={0}) cancelled", this.id);
return null;
}
IngestScheduler.FileScheduler.FileTask task = fileScheduler.next();
AbstractFile file = task.getFile();
progress.progress(file.getName(), processedFiles);
IngestJob.FileIngestPipeline pipeline = task.getJob().getFileIngestPipelineForThread(this.id);
pipeline.process(file);
// Update the progress bar.
int newTotalEnqueuedFiles = fileScheduler.getFilesEnqueuedEst();
if (newTotalEnqueuedFiles > totalEnqueuedFiles) {
totalEnqueuedFiles = newTotalEnqueuedFiles + 1;
progress.switchToIndeterminate();
progress.switchToDeterminate(totalEnqueuedFiles);
}
if (processedFiles < totalEnqueuedFiles) {
++processedFiles;
}
IngestJob job = task.getJob();
FileIngestPipeline pipeline = job.getFileIngestPipelineForThread(this.id);
job.handleFileTaskStarted(task);
pipeline.process(task.getFile());
}
logger.log(Level.INFO, "File ingest thread (id={0}) completed", this.id);
IngestManager.getInstance().reportThreadDone(this.id);
return null;
}
@ -627,13 +564,11 @@ public class IngestManager {
super.get();
} catch (CancellationException | InterruptedException e) {
logger.log(Level.INFO, "File ingest thread (id={0}) cancelled", this.id);
IngestManager.getInstance().reportThreadDone(this.id);
} catch (Exception ex) {
String message = String.format("File ingest thread {0} experienced a fatal error", this.id);
logger.log(Level.SEVERE, message, ex);
IngestManager.getInstance().reportThreadDone(this.id);
} finally {
progress.finish();
IngestManager.getInstance().reportThreadDone(this.id);
}
}
}