Rewrote portions of 'doRecoveryIfCrashed()' to fix bugs.

This commit is contained in:
U-BASIS\dgrove 2017-11-01 16:25:15 -04:00
parent 764e0a0007
commit a19b32e389

View File

@ -37,7 +37,6 @@ import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.EnumSet; import java.util.EnumSet;
@ -59,13 +58,11 @@ import java.util.concurrent.TimeUnit;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.Immutable;
import javax.annotation.concurrent.ThreadSafe;
import org.openide.util.Exceptions;
import org.openide.util.Lookup; import org.openide.util.Lookup;
import org.sleuthkit.autopsy.casemodule.Case; import org.sleuthkit.autopsy.casemodule.Case;
import org.sleuthkit.autopsy.casemodule.Case.CaseType; import org.sleuthkit.autopsy.casemodule.Case.CaseType;
import org.sleuthkit.autopsy.casemodule.CaseActionException; import org.sleuthkit.autopsy.casemodule.CaseActionException;
import org.sleuthkit.autopsy.casemodule.CaseDetails;
import org.sleuthkit.autopsy.casemodule.CaseMetadata; import org.sleuthkit.autopsy.casemodule.CaseMetadata;
import org.sleuthkit.autopsy.coordinationservice.CoordinationService; import org.sleuthkit.autopsy.coordinationservice.CoordinationService;
import org.sleuthkit.autopsy.coordinationservice.CoordinationService.CoordinationServiceException; import org.sleuthkit.autopsy.coordinationservice.CoordinationService.CoordinationServiceException;
@ -101,7 +98,6 @@ import org.sleuthkit.autopsy.ingest.IngestJobSettings;
import org.sleuthkit.autopsy.ingest.IngestJobStartResult; import org.sleuthkit.autopsy.ingest.IngestJobStartResult;
import org.sleuthkit.autopsy.ingest.IngestManager; import org.sleuthkit.autopsy.ingest.IngestManager;
import org.sleuthkit.autopsy.ingest.IngestModuleError; import org.sleuthkit.autopsy.ingest.IngestModuleError;
import org.sleuthkit.datamodel.Content;
/** /**
* An auto ingest manager is responsible for processing auto ingest jobs defined * An auto ingest manager is responsible for processing auto ingest jobs defined
@ -503,7 +499,6 @@ public final class AutoIngestManager extends Observable implements PropertyChang
} }
SYS_LOGGER.log(Level.INFO, "Starting input scan of {0}", rootInputDirectory); SYS_LOGGER.log(Level.INFO, "Starting input scan of {0}", rootInputDirectory);
InputDirScanner scanner = new InputDirScanner(); InputDirScanner scanner = new InputDirScanner();
scanner.scan(); scanner.scan();
SYS_LOGGER.log(Level.INFO, "Completed input scan of {0}", rootInputDirectory); SYS_LOGGER.log(Level.INFO, "Completed input scan of {0}", rootInputDirectory);
} }
@ -559,12 +554,10 @@ public final class AutoIngestManager extends Observable implements PropertyChang
if (!prioritizedJobs.isEmpty()) { if (!prioritizedJobs.isEmpty()) {
++maxPriority; ++maxPriority;
for (AutoIngestJob job : prioritizedJobs) { for (AutoIngestJob job : prioritizedJobs) {
int oldPriority = job.getPriority();
job.setPriority(maxPriority);
try { try {
this.updateCoordinationServiceNode(job); this.updateCoordinationServiceNode(job);
job.setPriority(maxPriority);
} catch (CoordinationServiceException | InterruptedException ex) { } catch (CoordinationServiceException | InterruptedException ex) {
job.setPriority(oldPriority);
throw new AutoIngestManagerException("Error updating case priority", ex); throw new AutoIngestManagerException("Error updating case priority", ex);
} }
} }
@ -615,14 +608,12 @@ public final class AutoIngestManager extends Observable implements PropertyChang
*/ */
if (null != prioritizedJob) { if (null != prioritizedJob) {
++maxPriority; ++maxPriority;
int oldPriority = prioritizedJob.getPriority();
prioritizedJob.setPriority(maxPriority);
try { try {
this.updateCoordinationServiceNode(prioritizedJob); this.updateCoordinationServiceNode(prioritizedJob);
} catch (CoordinationServiceException | InterruptedException ex) { } catch (CoordinationServiceException | InterruptedException ex) {
prioritizedJob.setPriority(oldPriority);
throw new AutoIngestManagerException("Error updating job priority", ex); throw new AutoIngestManagerException("Error updating job priority", ex);
} }
prioritizedJob.setPriority(maxPriority);
} }
Collections.sort(pendingJobs, new AutoIngestJob.PriorityComparator()); Collections.sort(pendingJobs, new AutoIngestJob.PriorityComparator());
@ -1051,8 +1042,8 @@ public final class AutoIngestManager extends Observable implements PropertyChang
if (null != manifest) { if (null != manifest) {
/* /*
* Update the mapping of case names to manifest paths that * Update the mapping of case names to manifest paths that is
* is used for case deletion. * used for case deletion.
*/ */
String caseName = manifest.getCaseName(); String caseName = manifest.getCaseName();
Path manifestPath = manifest.getFilePath(); Path manifestPath = manifest.getFilePath();
@ -1066,8 +1057,8 @@ public final class AutoIngestManager extends Observable implements PropertyChang
} }
/* /*
* Add a job to the pending jobs queue, the completed jobs * Add a job to the pending jobs queue, the completed jobs list,
* list, or do crashed job recovery, as required. * or do crashed job recovery, as required.
*/ */
try { try {
byte[] rawData = coordinationService.getNodeData(CoordinationService.CategoryNode.MANIFESTS, manifestPath.toString()); byte[] rawData = coordinationService.getNodeData(CoordinationService.CategoryNode.MANIFESTS, manifestPath.toString());
@ -1247,48 +1238,38 @@ public final class AutoIngestManager extends Observable implements PropertyChang
if (null != manifestLock) { if (null != manifestLock) {
SYS_LOGGER.log(Level.SEVERE, "Attempting crash recovery for {0}", manifestPath); SYS_LOGGER.log(Level.SEVERE, "Attempting crash recovery for {0}", manifestPath);
try { try {
Path caseDirectoryPath = PathUtils.findCaseDirectory(rootOutputDirectory, manifest.getCaseName());
/* /*
* Create the recovery job. * Create the recovery job.
*/ */
AutoIngestJob job = new AutoIngestJob(nodeData); AutoIngestJob job = new AutoIngestJob(nodeData);
int numberOfCrashes = job.getNumberOfCrashes(); int numberOfCrashes = job.getNumberOfCrashes();
if (numberOfCrashes <= AutoIngestUserPreferences.getMaxNumTimesToProcessImage()) {
++numberOfCrashes; ++numberOfCrashes;
job.setNumberOfCrashes(numberOfCrashes); job.setNumberOfCrashes(numberOfCrashes);
if (numberOfCrashes <= AutoIngestUserPreferences.getMaxNumTimesToProcessImage()) {
job.setCompletedDate(new Date(0)); job.setCompletedDate(new Date(0));
Path caseDirectoryPath = PathUtils.findCaseDirectory(rootOutputDirectory, manifest.getCaseName()); } else {
job.setCompletedDate(Date.from(Instant.now()));
}
}
if (null != caseDirectoryPath) { if (null != caseDirectoryPath) {
job.setCaseDirectoryPath(caseDirectoryPath); job.setCaseDirectoryPath(caseDirectoryPath);
job.setErrorsOccurred(true); job.setErrorsOccurred(true);
try {
/*
* Write the alert file and do the logging.
*/
AutoIngestAlertFile.create(caseDirectoryPath);
} catch (AutoIngestAlertFileException ex) {
SYS_LOGGER.log(Level.SEVERE, String.format("Error creating alert file for crashed job for %s", manifestPath), ex);
}
} else { } else {
job.setErrorsOccurred(false); job.setErrorsOccurred(false);
} }
/*
* Update the coordination service node for the job. If
* this fails, leave the recovery to another host.
*/
try {
updateCoordinationServiceNode(job);
if (numberOfCrashes <= AutoIngestUserPreferences.getMaxNumTimesToProcessImage()) {
newPendingJobsList.add(job);
} else {
newCompletedJobsList.add(new AutoIngestJob(nodeData));
}
} catch (CoordinationServiceException ex) {
SYS_LOGGER.log(Level.SEVERE, String.format("Error attempting to set node data for %s", manifestPath), ex);
return;
}
/*
* Write the alert file and do the logging.
*/
if (null != caseDirectoryPath) {
try {
AutoIngestAlertFile.create(nodeData.getCaseDirectoryPath());
} catch (AutoIngestAlertFileException ex) {
SYS_LOGGER.log(Level.SEVERE, String.format("Error creating alert file for crashed job for %s", manifestPath), ex);
}
}
if (numberOfCrashes <= AutoIngestUserPreferences.getMaxNumTimesToProcessImage()) { if (numberOfCrashes <= AutoIngestUserPreferences.getMaxNumTimesToProcessImage()) {
job.setProcessingStatus(AutoIngestJob.ProcessingStatus.PENDING); job.setProcessingStatus(AutoIngestJob.ProcessingStatus.PENDING);
if (null != caseDirectoryPath) { if (null != caseDirectoryPath) {
@ -1302,13 +1283,32 @@ public final class AutoIngestManager extends Observable implements PropertyChang
job.setProcessingStatus(AutoIngestJob.ProcessingStatus.COMPLETED); job.setProcessingStatus(AutoIngestJob.ProcessingStatus.COMPLETED);
if (null != caseDirectoryPath) { if (null != caseDirectoryPath) {
try { try {
new AutoIngestJobLogger(manifest.getFilePath(), manifest.getDataSourceFileName(), nodeData.getCaseDirectoryPath()).logCrashRecoveryNoRetry(); new AutoIngestJobLogger(manifest.getFilePath(), manifest.getDataSourceFileName(), caseDirectoryPath).logCrashRecoveryNoRetry();
} catch (AutoIngestJobLoggerException ex) { } catch (AutoIngestJobLoggerException ex) {
SYS_LOGGER.log(Level.SEVERE, String.format("Error creating case auto ingest log entry for crashed job for %s", manifestPath), ex); SYS_LOGGER.log(Level.SEVERE, String.format("Error creating case auto ingest log entry for crashed job for %s", manifestPath), ex);
} }
} }
} }
/*
* Update the coordination service node for the job. If
* this fails, leave the recovery to another host.
*/
try {
updateCoordinationServiceNode(job);
} catch (CoordinationServiceException ex) {
SYS_LOGGER.log(Level.SEVERE, String.format("Error attempting to set node data for %s", manifestPath), ex);
return;
}
nodeData = new AutoIngestJobNodeData(job);
if (numberOfCrashes <= AutoIngestUserPreferences.getMaxNumTimesToProcessImage()) {
newPendingJobsList.add(job);
} else {
newCompletedJobsList.add(new AutoIngestJob(nodeData));
}
} finally { } finally {
try { try {
manifestLock.release(); manifestLock.release();
@ -1459,7 +1459,6 @@ public final class AutoIngestManager extends Observable implements PropertyChang
*/ */
private final class JobProcessingTask implements Runnable { private final class JobProcessingTask implements Runnable {
private static final String AUTO_INGEST_MODULE_OUTPUT_DIR = "AutoIngest";
private final Object ingestLock; private final Object ingestLock;
private final Object pauseLock; private final Object pauseLock;
@GuardedBy("pauseLock") @GuardedBy("pauseLock")
@ -2126,7 +2125,8 @@ public final class AutoIngestManager extends Observable implements PropertyChang
Case.openAsCurrentCase(metadataFilePath.toString()); Case.openAsCurrentCase(metadataFilePath.toString());
} else { } else {
caseDirectoryPath = PathUtils.createCaseFolderPath(rootOutputDirectory, caseName); caseDirectoryPath = PathUtils.createCaseFolderPath(rootOutputDirectory, caseName);
Case.createAsCurrentCase(caseDirectoryPath.toString(), caseName, "", "", CaseType.MULTI_USER_CASE); CaseDetails caseDetails = new CaseDetails(caseName, "", "", "", "", "");
Case.createAsCurrentCase(CaseType.MULTI_USER_CASE, caseDirectoryPath.toString(), caseDetails);
/* /*
* Sleep a bit before releasing the lock to ensure * Sleep a bit before releasing the lock to ensure
* that the new case folder is visible on the * that the new case folder is visible on the
@ -2222,7 +2222,7 @@ public final class AutoIngestManager extends Observable implements PropertyChang
return; return;
} }
DataSource dataSource = identifyDataSource(caseForJob); DataSource dataSource = identifyDataSource();
if (null == dataSource) { if (null == dataSource) {
currentJob.setProcessingStage(AutoIngestJob.Stage.COMPLETED, Date.from(Instant.now())); currentJob.setProcessingStage(AutoIngestJob.Stage.COMPLETED, Date.from(Instant.now()));
return; return;
@ -2275,7 +2275,7 @@ public final class AutoIngestManager extends Observable implements PropertyChang
* interrupted while blocked, i.e., * interrupted while blocked, i.e.,
* if auto ingest is shutting down. * if auto ingest is shutting down.
*/ */
private DataSource identifyDataSource(Case caseForJob) throws AutoIngestAlertFileException, AutoIngestJobLoggerException, InterruptedException { private DataSource identifyDataSource() throws AutoIngestAlertFileException, AutoIngestJobLoggerException, InterruptedException {
Manifest manifest = currentJob.getManifest(); Manifest manifest = currentJob.getManifest();
Path manifestPath = manifest.getFilePath(); Path manifestPath = manifest.getFilePath();
SYS_LOGGER.log(Level.INFO, "Identifying data source for {0} ", manifestPath); SYS_LOGGER.log(Level.INFO, "Identifying data source for {0} ", manifestPath);
@ -2317,29 +2317,22 @@ public final class AutoIngestManager extends Observable implements PropertyChang
SYS_LOGGER.log(Level.INFO, "Adding data source for {0} ", manifestPath); SYS_LOGGER.log(Level.INFO, "Adding data source for {0} ", manifestPath);
currentJob.setProcessingStage(AutoIngestJob.Stage.ADDING_DATA_SOURCE, Date.from(Instant.now())); currentJob.setProcessingStage(AutoIngestJob.Stage.ADDING_DATA_SOURCE, Date.from(Instant.now()));
UUID taskId = UUID.randomUUID(); UUID taskId = UUID.randomUUID();
DataSourceProcessorCallback callBack = new AddDataSourceCallback(caseForJob, dataSource, taskId); DataSourceProcessorCallback callBack = new AddDataSourceCallback(caseForJob, dataSource, taskId, ingestLock);
DataSourceProcessorProgressMonitor progressMonitor = new DoNothingDSPProgressMonitor(); DataSourceProcessorProgressMonitor progressMonitor = new DoNothingDSPProgressMonitor();
Path caseDirectoryPath = currentJob.getCaseDirectoryPath(); Path caseDirectoryPath = currentJob.getCaseDirectoryPath();
AutoIngestJobLogger jobLogger = new AutoIngestJobLogger(manifestPath, manifest.getDataSourceFileName(), caseDirectoryPath); AutoIngestJobLogger jobLogger = new AutoIngestJobLogger(manifestPath, manifest.getDataSourceFileName(), caseDirectoryPath);
try { try {
caseForJob.notifyAddingDataSource(taskId); caseForJob.notifyAddingDataSource(taskId);
// lookup all AutomatedIngestDataSourceProcessors Map<AutoIngestDataSourceProcessor, Integer> validDataSourceProcessorsMap;
Collection<? extends AutoIngestDataSourceProcessor> processorCandidates = Lookup.getDefault().lookupAll(AutoIngestDataSourceProcessor.class);
Map<AutoIngestDataSourceProcessor, Integer> validDataSourceProcessorsMap = new HashMap<>();
for (AutoIngestDataSourceProcessor processor : processorCandidates) {
try { try {
int confidence = processor.canProcess(dataSource.getPath()); // lookup all AutomatedIngestDataSourceProcessors and poll which ones are able to process the current data source
if (confidence > 0) { validDataSourceProcessorsMap = DataSourceProcessorUtility.getDataSourceProcessor(dataSource.getPath());
validDataSourceProcessorsMap.put(processor, confidence);
}
} catch (AutoIngestDataSourceProcessor.AutoIngestDataSourceProcessorException ex) { } catch (AutoIngestDataSourceProcessor.AutoIngestDataSourceProcessorException ex) {
SYS_LOGGER.log(Level.SEVERE, "Exception while determining whether data source processor {0} can process {1}", new Object[]{processor.getDataSourceType(), dataSource.getPath()}); SYS_LOGGER.log(Level.SEVERE, "Exception while determining best data source processor for {0}", dataSource.getPath());
// rethrow the exception. It will get caught & handled upstream and will result in AIM auto-pause. // rethrow the exception. It will get caught & handled upstream and will result in AIM auto-pause.
throw ex; throw ex;
} }
}
// did we find a data source processor that can process the data source // did we find a data source processor that can process the data source
if (validDataSourceProcessorsMap.isEmpty()) { if (validDataSourceProcessorsMap.isEmpty()) {
@ -2596,79 +2589,6 @@ public final class AutoIngestManager extends Observable implements PropertyChang
} }
} }
/**
* A "callback" that collects the results of running a data source
* processor on a data source and unblocks the job processing thread
* when the data source processor finishes running in its own thread.
*/
@Immutable
class AddDataSourceCallback extends DataSourceProcessorCallback {
private final Case caseForJob;
private final DataSource dataSourceInfo;
private final UUID taskId;
/**
* Constructs a "callback" that collects the results of running a
* data source processor on a data source and unblocks the job
* processing thread when the data source processor finishes running
* in its own thread.
*
* @param caseForJob The case for the current job.
* @param dataSourceInfo The data source
* @param taskId The task id to associate with ingest job
* events.
*/
AddDataSourceCallback(Case caseForJob, DataSource dataSourceInfo, UUID taskId) {
this.caseForJob = caseForJob;
this.dataSourceInfo = dataSourceInfo;
this.taskId = taskId;
}
/**
* Called by the data source processor when it finishes running in
* its own thread.
*
* @param result The result code for the processing of
* the data source.
* @param errorMessages Any error messages generated during the
* processing of the data source.
* @param dataSourceContent The content produced by processing the
* data source.
*/
@Override
public void done(DataSourceProcessorCallback.DataSourceProcessorResult result, List<String> errorMessages, List<Content> dataSourceContent) {
if (!dataSourceContent.isEmpty()) {
caseForJob.notifyDataSourceAdded(dataSourceContent.get(0), taskId);
} else {
caseForJob.notifyFailedAddingDataSource(taskId);
}
dataSourceInfo.setDataSourceProcessorOutput(result, errorMessages, dataSourceContent);
dataSourceContent.addAll(dataSourceContent);
synchronized (ingestLock) {
ingestLock.notify();
}
}
/**
* Called by the data source processor when it finishes running in
* its own thread, if that thread is the AWT (Abstract Window
* Toolkit) event dispatch thread (EDT).
*
* @param result The result code for the processing of
* the data source.
* @param errorMessages Any error messages generated during the
* processing of the data source.
* @param dataSourceContent The content produced by processing the
* data source.
*/
@Override
public void doneEDT(DataSourceProcessorCallback.DataSourceProcessorResult result, List<String> errorMessages, List<Content> dataSources) {
done(result, errorMessages, dataSources);
}
}
/** /**
* A data source processor progress monitor does nothing. There is * A data source processor progress monitor does nothing. There is
* currently no mechanism for showing or recording data source processor * currently no mechanism for showing or recording data source processor
@ -3009,48 +2929,6 @@ public final class AutoIngestManager extends Observable implements PropertyChang
FULLY_DELETED FULLY_DELETED
} }
@ThreadSafe
private static final class DataSource {
private final String deviceId;
private final Path path;
private DataSourceProcessorResult resultCode;
private List<String> errorMessages;
private List<Content> content;
DataSource(String deviceId, Path path) {
this.deviceId = deviceId;
this.path = path;
}
String getDeviceId() {
return deviceId;
}
Path getPath() {
return this.path;
}
synchronized void setDataSourceProcessorOutput(DataSourceProcessorResult result, List<String> errorMessages, List<Content> content) {
this.resultCode = result;
this.errorMessages = new ArrayList<>(errorMessages);
this.content = new ArrayList<>(content);
}
synchronized DataSourceProcessorResult getResultDataSourceProcessorResultCode() {
return resultCode;
}
synchronized List<String> getDataSourceProcessorErrorMessages() {
return new ArrayList<>(errorMessages);
}
synchronized List<Content> getContent() {
return new ArrayList<>(content);
}
}
static final class AutoIngestManagerException extends Exception { static final class AutoIngestManagerException extends Exception {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;