Cycle through each DSP with non-zero confidence in decreasing order of confidence.

This commit is contained in:
Ann Priestman 2016-11-10 11:17:44 -05:00
parent 0be6b1c3dc
commit d21bbcfaea
2 changed files with 61 additions and 20 deletions

View File

@ -192,6 +192,32 @@ final class AutoIngestJobLogger {
log(MessageCategory.WARNING, "Cancelled adding data source to case"); log(MessageCategory.WARNING, "Cancelled adding data source to case");
} }
/**
* Logs selection of a data source processor
* @param dsp Name of the data source processor
* @throws AutoIngestJobLoggerException if there is an error writing the log
* message.
* @throws InterruptedException if interrupted while blocked waiting
* to acquire an exclusive lock on the
* log file.
*/
void logDataSourceProcessorSelected(String dsp) throws AutoIngestJobLoggerException, InterruptedException{
log(MessageCategory.INFO, "Using data source processor: " + dsp);
}
/**
* Logs the failure of the selected data source processor.
* @param dsp Name of the data source processor
* @throws AutoIngestJobLoggerException if there is an error writing the log
* message.
* @throws InterruptedException if interrupted while blocked waiting
* to acquire an exclusive lock on the
* log file.
*/
void logDataSourceProcessorError(String dsp) throws AutoIngestJobLoggerException, InterruptedException{
log(MessageCategory.ERROR, "Error processing with data source processor: " + dsp);
}
/** /**
* Logs the addition of a data source to the case database. * Logs the addition of a data source to the case database.
* *

View File

@ -63,6 +63,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.stream.Collectors;
import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.Immutable; import javax.annotation.concurrent.Immutable;
import javax.annotation.concurrent.ThreadSafe; import javax.annotation.concurrent.ThreadSafe;
@ -2144,31 +2145,28 @@ public final class AutoIngestManager extends Observable implements PropertyChang
DataSourceProcessorProgressMonitor progressMonitor = new DoNothingDSPProgressMonitor(); DataSourceProcessorProgressMonitor progressMonitor = new DoNothingDSPProgressMonitor();
Path caseDirectoryPath = currentJob.getCaseDirectoryPath(); Path caseDirectoryPath = currentJob.getCaseDirectoryPath();
AutoIngestJobLogger jobLogger = new AutoIngestJobLogger(manifestPath, manifest.getDataSourceFileName(), caseDirectoryPath); AutoIngestJobLogger jobLogger = new AutoIngestJobLogger(manifestPath, manifest.getDataSourceFileName(), caseDirectoryPath);
AutomatedIngestDataSourceProcessor selectedProcessor = null;
try { try {
caseForJob.notifyAddingDataSource(taskId); caseForJob.notifyAddingDataSource(taskId);
// lookup all AutomatedIngestDataSourceProcessors // lookup all AutomatedIngestDataSourceProcessors
Collection<? extends AutomatedIngestDataSourceProcessor> processorCandidates = Lookup.getDefault().lookupAll(AutomatedIngestDataSourceProcessor.class); Collection<? extends AutomatedIngestDataSourceProcessor> processorCandidates = Lookup.getDefault().lookupAll(AutomatedIngestDataSourceProcessor.class);
int selectedProcessorConfidence = 0; Map<AutomatedIngestDataSourceProcessor, Integer> validDataSourceProcessorsMap = new HashMap<>();
for (AutomatedIngestDataSourceProcessor processor : processorCandidates) { for (AutomatedIngestDataSourceProcessor processor : processorCandidates) {
int confidence = 0;
try { try {
confidence = processor.canProcess(dataSource.getPath()); int confidence = processor.canProcess(dataSource.getPath());
if(confidence > 0){
validDataSourceProcessorsMap.put(processor, confidence);
}
} catch (AutomatedIngestDataSourceProcessor.AutomatedIngestDataSourceProcessorException ex) { } catch (AutomatedIngestDataSourceProcessor.AutomatedIngestDataSourceProcessorException ex) {
SYS_LOGGER.log(Level.SEVERE, "Exception while determining whether data source processor {0} can process {1}", new Object[]{processor.getDataSourceType(), dataSource.getPath()}); SYS_LOGGER.log(Level.SEVERE, "Exception while determining whether data source processor {0} can process {1}", new Object[]{processor.getDataSourceType(), dataSource.getPath()});
// rethrow the exception. It will get caught & handled upstream and will result in AIM auto-pause. // rethrow the exception. It will get caught & handled upstream and will result in AIM auto-pause.
throw ex; throw ex;
} }
if (confidence > selectedProcessorConfidence) {
selectedProcessor = processor;
selectedProcessorConfidence = confidence;
}
} }
// did we find a data source processor that can process the data source // did we find a data source processor that can process the data source
if (selectedProcessor == null) { if (validDataSourceProcessorsMap.isEmpty()) {
// This should never happen. We should add all unsupported data sources as logical files. // This should never happen. We should add all unsupported data sources as logical files.
AutoIngestAlertFile.create(caseDirectoryPath); AutoIngestAlertFile.create(caseDirectoryPath);
currentJob.setErrorsOccurred(true); currentJob.setErrorsOccurred(true);
@ -2177,19 +2175,36 @@ public final class AutoIngestManager extends Observable implements PropertyChang
return; return;
} }
// Get an ordered list of data source processors to try
List<AutomatedIngestDataSourceProcessor> validDataSourceProcessors = validDataSourceProcessorsMap.entrySet().stream()
.sorted(Map.Entry.<AutomatedIngestDataSourceProcessor, Integer>comparingByValue().reversed())
.map(Map.Entry::getKey)
.collect(Collectors.toList());
synchronized (ingestLock) { synchronized (ingestLock) {
// Try each DSP in decreasing order of confidence
for(AutomatedIngestDataSourceProcessor selectedProcessor:validDataSourceProcessors){
jobLogger.logDataSourceProcessorSelected(selectedProcessor.getDataSourceType());
SYS_LOGGER.log(Level.INFO, "Identified data source type for {0} as {1}", new Object[]{manifestPath, selectedProcessor.getDataSourceType()}); SYS_LOGGER.log(Level.INFO, "Identified data source type for {0} as {1}", new Object[]{manifestPath, selectedProcessor.getDataSourceType()});
try { try {
selectedProcessor.process(dataSource.getDeviceId(), dataSource.getPath(), progressMonitor, callBack); selectedProcessor.process(dataSource.getDeviceId(), dataSource.getPath(), progressMonitor, callBack);
ingestLock.wait();
return;
} catch (AutomatedIngestDataSourceProcessor.AutomatedIngestDataSourceProcessorException ex) { } catch (AutomatedIngestDataSourceProcessor.AutomatedIngestDataSourceProcessorException ex) {
// Log that the current DSP failed and set the error flag. We consider it an error
// if a DSP fails even if a later one succeeds since we expected to be able to process
// the data source which each DSP on the list.
AutoIngestAlertFile.create(caseDirectoryPath); AutoIngestAlertFile.create(caseDirectoryPath);
currentJob.setErrorsOccurred(true); currentJob.setErrorsOccurred(true);
jobLogger.logFailedToAddDataSource(); jobLogger.logDataSourceProcessorError(selectedProcessor.getDataSourceType());
SYS_LOGGER.log(Level.SEVERE, "Exception while processing {0} with data source processor {1}", new Object[]{dataSource.getPath(), selectedProcessor.getDataSourceType()}); SYS_LOGGER.log(Level.SEVERE, "Exception while processing {0} with data source processor {1}", new Object[]{dataSource.getPath(), selectedProcessor.getDataSourceType()});
// rethrow the exception. It will get caught & handled upstream and will result in AIM auto-pause.
throw ex;
} }
ingestLock.wait(); }
// If we get to this point, none of the processors were successful
SYS_LOGGER.log(Level.SEVERE, "All data source processors failed to process {0}", dataSource.getPath());
jobLogger.logFailedToAddDataSource();
// Throw an exception. It will get caught & handled upstream and will result in AIM auto-pause.
throw new AutomatedIngestDataSourceProcessor.AutomatedIngestDataSourceProcessorException("Failed to process " + dataSource.getPath() + " with all data source processors");
} }
} finally { } finally {
currentJob.setDataSourceProcessor(null); currentJob.setDataSourceProcessor(null);