Merge pull request #4700 from rcordovano/4932-ain-input-scan-performance-fix

Restore less comprehensive manifest file locking behavior in AIM
This commit is contained in:
Richard Cordovano 2019-04-10 10:27:48 -04:00 committed by GitHub
commit 803da45d6c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -127,7 +127,6 @@ final class AutoIngestManager extends Observable implements PropertyChangeListen
private static final int NUM_INPUT_SCAN_SCHEDULING_THREADS = 1;
private static final String INPUT_SCAN_SCHEDULER_THREAD_NAME = "AIM-input-scan-scheduler-%d";
private static final String INPUT_SCAN_THREAD_NAME = "AIM-input-scan-%d";
private static final int INPUT_SCAN_LOCKING_TIMEOUT_MINS = 5;
private static final String AUTO_INGEST_THREAD_NAME = "AIM-job-processing-%d";
private static final String LOCAL_HOST_NAME = NetworkUtils.getLocalHostName();
private static final String EVENT_CHANNEL_NAME = "Auto-Ingest-Manager-Events";
@ -175,6 +174,12 @@ final class AutoIngestManager extends Observable implements PropertyChangeListen
private volatile AutoIngestNodeStateEvent lastPublishedStateEvent;
/**
* Gets the name of the file in a case directory that is used to record the
* manifest file paths for the auto ingest jobs for the case.
*
* The name is the value of the CASE_MANIFESTS_LIST_FILE_NAME constant,
* returned without modification.
*
* @return The file name.
*/
static String getCaseManifestsListFileName() {
return CASE_MANIFESTS_LIST_FILE_NAME;
}
@ -1013,7 +1018,8 @@ final class AutoIngestManager extends Observable implements PropertyChangeListen
}
/**
* Sets the error flag for case node data given a case directory path.
* Sets the error flag in the case node data stored in a case directory
* coordination service node.
*
* @param caseDirectoryPath The case directory path.
*
@ -1021,7 +1027,7 @@ final class AutoIngestManager extends Observable implements PropertyChangeListen
* scan task is interrupted while blocked,
* i.e., if auto ingest is shutting down.
*/
private void setCaseNodeDataErrorsOccurred(Path caseDirectoryPath) throws InterruptedException {
private void setErrorsOccurredFlagForCase(Path caseDirectoryPath) throws InterruptedException {
try {
CaseNodeData caseNodeData = CaseNodeData.readCaseNodeData(caseDirectoryPath.toString());
caseNodeData.setErrorsOccurred(true);
@ -1158,8 +1164,8 @@ final class AutoIngestManager extends Observable implements PropertyChangeListen
* @param filePath The path of the file.
* @param attrs The file system attributes of the file.
*
* @return TERMINATE if auto ingest is shutting down, CONTINUE if it has
* not.
* @return TERMINATE if auto ingest is shutting down, CONTINUE
* otherwise.
*/
@Override
public FileVisitResult visitFile(Path filePath, BasicFileAttributes attrs) {
@ -1169,9 +1175,8 @@ final class AutoIngestManager extends Observable implements PropertyChangeListen
try {
/*
* Determine whether or not the file is an auto ingest job
* manifest file. If it is, then parse it. Otherwise, move on to
* the next file in the directory.
* Determine whether or not the file is a manifest file. If it
* is, then parse it.
*/
Manifest manifest = null;
for (ManifestFileParser parser : Lookup.getDefault().lookupAll(ManifestFileParser.class)) {
@ -1195,33 +1200,22 @@ final class AutoIngestManager extends Observable implements PropertyChangeListen
if (manifest == null) {
return CONTINUE;
}
/*
* If a manifest file has been found, get a manifest file lock,
* analyze the job state, and put a job into the appropriate job
* list. There is a short wait here in case the input directory
* scanner file visitor of another auto ingest node (AIN) has
* the lock. If the lock ultimately can't be obtained, the wait
* was not long enough, or another auto ingest node (AIN) is
* holding the lock because it is executing the job, or a case
* deletion task has acquired the lock. In all of these cases the
* manifest can be skipped for this scan.
* If a manifest file has been found, get the corresponding auto
* ingest job state from the manifest file coordination service
* node and put the job in the appropriate jobs list.
*
* There can be a race condition between queuing jobs and case
* deletion. However, in practice eliminating the race condition
* by acquiring a manifest file coordination service lock when
* analyzing job state here appears to have a significant
* performance cost for both input directory scanning and
* dequeuing jobs. Therefore, job state must be checked again
* during job dequeuing, while actually holding the lock, before
* executing the job.
*/
try (Lock manifestLock = coordinationService.tryGetExclusiveLock(CoordinationService.CategoryNode.MANIFESTS, manifest.getFilePath().toString(), INPUT_SCAN_LOCKING_TIMEOUT_MINS, TimeUnit.MINUTES)) {
if (null != manifestLock) {
/*
* Now that the lock has been acquired, make sure the
* manifest is still here. This is a way to resolve the
* race condition between this task and case deletion
* tasks without resorting to a protocol using locking
* of the input directory.
*/
if (!filePath.toFile().exists()) {
return CONTINUE;
}
byte[] rawData = coordinationService.getNodeData(CoordinationService.CategoryNode.MANIFESTS, manifest.getFilePath().toString());
String manifestFilePath = manifest.getFilePath().toString();
byte[] rawData = coordinationService.getNodeData(CoordinationService.CategoryNode.MANIFESTS, manifestFilePath);
if (null != rawData && rawData.length > 0) {
AutoIngestJobNodeData nodeData = new AutoIngestJobNodeData(rawData);
AutoIngestJob.ProcessingStatus processingStatus = nodeData.getProcessingStatus();
@ -1230,101 +1224,94 @@ final class AutoIngestManager extends Observable implements PropertyChangeListen
addPendingJob(manifest, nodeData);
break;
case PROCESSING:
/*
* If an exclusive manifest file lock was
* obtained for an auto ingest job in the
* processing state, the auto ingest node
* (AIN) executing the job crashed and the
* lock was released when the coordination
* service detected that the AIN was no
* longer alive.
*/
doCrashRecovery(manifest, nodeData);
doRecoveryIfCrashed(manifest, nodeData);
break;
case COMPLETED:
addCompletedJob(manifest, nodeData);
break;
case DELETED:
/*
* Ignore jobs marked as deleted.
*/
break;
default:
sysLogger.log(Level.SEVERE, "Unknown ManifestNodeData.ProcessingStatus");
break;
}
} else {
try {
addNewPendingJob(manifest);
} catch (AutoIngestJobException ex) {
sysLogger.log(Level.SEVERE, String.format("Invalid manifest data for %s", manifest.getFilePath()), ex);
}
}
}
} catch (CoordinationServiceException | AutoIngestJobException | AutoIngestJobNodeData.InvalidDataException ex) {
sysLogger.log(Level.SEVERE, String.format("Error handling manifest at %s", manifest.getFilePath()), ex);
sysLogger.log(Level.SEVERE, String.format("Error visiting %s", filePath), ex);
} catch (InterruptedException ex) {
/*
* The thread running the input directory scan task was
* interrupted while blocked, i.e., auto ingest is shutting
* down.
*/
return TERMINATE;
}
} catch (Exception ex) {
/*
* This is an exception firewall so that an unexpected runtime
* exception from the handling of a single manifest file does
* not take out the input directory scanner.
* exception from the handling of a single file does not stop
* the input directory scan.
*/
sysLogger.log(Level.SEVERE, String.format("Unexpected exception handling %s", filePath), ex);
sysLogger.log(Level.SEVERE, String.format("Unexpected exception visiting %s", filePath), ex);
}
if (!Thread.currentThread().isInterrupted()) {
return CONTINUE;
} else {
sysLogger.log(Level.WARNING, String.format("Auto ingest shut down while visiting %s", filePath));
return TERMINATE;
}
}
/**
* Adds an auto ingest job to the pending jobs queue.
* Adds an existing auto ingest job to the pending jobs queue. If the
* version of the coordination service node data is out of date, it is
* upgraded to the current version.
*
* @param manifest The manifest for the job.
* @param nodeData The data stored in the manifest file lock
* coordination service node for the job.
* @param nodeData The data stored in the manifest file coordination
* service node for the job.
*
* @throws AutoIngestJobException If there was an error working
* with the node data.
* @throws CoordinationServiceException If a lock node data version
* update was required and there
* was an error writing the node
* data by the coordination
* service.
* @throws InterruptedException If the thread running the input
* directory scan task is
* interrupted while blocked, i.e.,
* if auto ingest is shutting down.
*/
private void addPendingJob(Manifest manifest, AutoIngestJobNodeData nodeData) throws AutoIngestJobException, CoordinationServiceException, InterruptedException {
private void addPendingJob(Manifest manifest, AutoIngestJobNodeData nodeData) throws AutoIngestJobException, InterruptedException {
AutoIngestJob job;
if (nodeData.getVersion() == AutoIngestJobNodeData.getCurrentVersion()) {
job = new AutoIngestJob(nodeData);
} else {
/*
* Upgrade the auto ingest node data to the current version.
*/
job = new AutoIngestJob(manifest);
job.setPriority(nodeData.getPriority());
Path caseDirectory = PathUtils.findCaseDirectory(rootOutputDirectory, manifest.getCaseName());
if (null != caseDirectory) {
job.setCaseDirectoryPath(caseDirectory);
}
/*
* Try to write the upgraded node data to coordination service
* manifest node data for the job. If the lock cannot be
* obtained, assume that the auto ingest node holding the lock
* is taking care of this.
*/
try (Lock manifestLock = coordinationService.tryGetExclusiveLock(CoordinationService.CategoryNode.MANIFESTS, manifest.getFilePath().toString())) {
if (null != manifestLock) {
updateAutoIngestJobData(job);
}
} catch (CoordinationServiceException ex) {
sysLogger.log(Level.SEVERE, String.format("Error attempting to set node data for %s", manifest.getFilePath()), ex);
}
}
newPendingJobsList.add(job);
}
/**
* Adds a new job to the pending jobs queue.
* Adds a new auto ingest job to the pending jobs queue.
*
* @param manifest The manifest for the job.
*
@ -1339,14 +1326,28 @@ final class AutoIngestManager extends Observable implements PropertyChangeListen
* if auto ingest is shutting down.
*/
private void addNewPendingJob(Manifest manifest) throws AutoIngestJobException, CoordinationServiceException, InterruptedException {
/*
* Create the coordination service manifest file node data for the
* job. Getting the lock both guards the writing of the new node
* data and creates the coordination service node if it does not
* already exist. Note that if this auto ingest node cannot get the
* lock, it is assumed that the auto ingest node holding the lock is
* taking care of this. In this case, this auto ingest node will not
* add the new job to its pending queue during this scan of the
* input directory, but it will be picked up during the next scan.
*/
try (Lock manifestLock = coordinationService.tryGetExclusiveLock(CoordinationService.CategoryNode.MANIFESTS, manifest.getFilePath().toString())) {
if (null != manifestLock) {
AutoIngestJob job = new AutoIngestJob(manifest);
updateAutoIngestJobData(job);
newPendingJobsList.add(job);
}
}
}
/**
* Does recovery for an auto ingest job that was left in the processing
* state by an auto ingest node (AIN) that crashed.
* If required, does recovery for an auto ingest job that was left in
* the processing state by an auto ingest node (AIN) that crashed.
*
* @param manifest The manifest for the job.
* @param nodeData The data stored in the manifest file lock
@ -1362,30 +1363,44 @@ final class AutoIngestManager extends Observable implements PropertyChangeListen
* interrupted while blocked, i.e.,
* if auto ingest is shutting down.
*/
private void doCrashRecovery(Manifest manifest, AutoIngestJobNodeData jobNodeData) throws AutoIngestJobException, CoordinationServiceException, InterruptedException {
private void doRecoveryIfCrashed(Manifest manifest, AutoIngestJobNodeData jobNodeData) throws AutoIngestJobException, CoordinationServiceException, InterruptedException {
String manifestPath = manifest.getFilePath().toString();
sysLogger.log(Level.SEVERE, "Attempting crash recovery for {0}", manifestPath);
try (Lock manifestLock = coordinationService.tryGetExclusiveLock(CoordinationService.CategoryNode.MANIFESTS, manifestPath)) {
if (null != manifestLock) {
AutoIngestJob job = new AutoIngestJob(jobNodeData);
if (job.getProcessingStatus() == AutoIngestJob.ProcessingStatus.PROCESSING) {
/*
* If the lock can be obtained with the job status set
* to processing, then an auto ingest node crashed while
* executing the job and was unable to update the job
* status.
*/
sysLogger.log(Level.SEVERE, "Attempting crash recovery for {0}", manifestPath);
/*
* Try to set the error flags that indicate incomplete or messy data
* in displays for the job and the case. Note that if the job
* crashed before a case directory was created, the job was a no-op,
* so the data quality flags do not need to be set.
* First, try to set the case node data error flag that
* indicates there was an auto ingest job error. If the
* auto ingest node that was executing the job crashed
* before the case directory was created, the job was a
* no-op, so the error flag does not need to be set.
* However, note that if another auto ingest job
* subsequently completed, the failed job may still have
* been a no-op, but in this case the flag will be set
* anyway, because a case directory will be found.
*/
Path caseDirectoryPath = PathUtils.findCaseDirectory(rootOutputDirectory, manifest.getCaseName());
if (null != caseDirectoryPath) {
job.setCaseDirectoryPath(caseDirectoryPath);
job.setErrorsOccurred(true);
setCaseNodeDataErrorsOccurred(caseDirectoryPath);
setErrorsOccurredFlagForCase(caseDirectoryPath);
} else {
job.setErrorsOccurred(false);
}
/*
* Update the crash count for the job, determine whether or not to
* retry processing its data source, and deal with the job
* accordingly.
* Update the crash count for the job, determine whether
* or not to retry processing its data source, and deal
* with the job accordingly.
*/
int numberOfCrashes = job.getNumberOfCrashes();
++numberOfCrashes;
@ -1400,8 +1415,6 @@ final class AutoIngestManager extends Observable implements PropertyChangeListen
sysLogger.log(Level.SEVERE, String.format("Error writing case auto ingest log entry for crashed job for %s", manifestPath), ex);
}
}
updateAutoIngestJobData(job);
newPendingJobsList.add(job);
} else {
job.setProcessingStatus(AutoIngestJob.ProcessingStatus.COMPLETED);
job.setCompletedDate(Date.from(Instant.now()));
@ -1412,8 +1425,11 @@ final class AutoIngestManager extends Observable implements PropertyChangeListen
sysLogger.log(Level.SEVERE, String.format("Error writing case auto ingest log entry for crashed job for %s", manifestPath), ex);
}
}
}
updateAutoIngestJobData(job);
newCompletedJobsList.add(new AutoIngestJob(jobNodeData));
newPendingJobsList.add(job);
}
}
}
}
@ -1426,15 +1442,12 @@ final class AutoIngestManager extends Observable implements PropertyChangeListen
*
* @throws AutoIngestJobException If there was an error working
* with the node data.
* @throws CoordinationServiceException If there was an error writing
* updated node data by the
* coordination service.
* @throws InterruptedException If the thread running the input
* directory scan task is
* interrupted while blocked, i.e.,
* if auto ingest is shutting down.
*/
private void addCompletedJob(Manifest manifest, AutoIngestJobNodeData nodeData) throws AutoIngestJobException, CoordinationServiceException, InterruptedException {
private void addCompletedJob(Manifest manifest, AutoIngestJobNodeData nodeData) throws AutoIngestJobException, InterruptedException {
Path caseDirectoryPath = nodeData.getCaseDirectoryPath();
if (!caseDirectoryPath.toFile().exists()) {
sysLogger.log(Level.WARNING, String.format("Job completed for %s, but cannot find case directory %s, ignoring job", nodeData.getManifestFilePath(), caseDirectoryPath.toString()));
@ -1446,19 +1459,11 @@ final class AutoIngestManager extends Observable implements PropertyChangeListen
job = new AutoIngestJob(nodeData);
job.setCaseDirectoryPath(caseDirectoryPath);
} else {
/**
* Use the manifest rather than the node data here to create a
* new AutoIngestJob instance because the AutoIngestJob
* constructor that takes a node data object expects the node
* data to have fields that do not exist in earlier versions.
/*
* Upgrade the auto ingest node data to the current version.
*/
job = new AutoIngestJob(manifest);
job.setCaseDirectoryPath(caseDirectoryPath);
/**
* Update the job with the fields that exist in all versions of
* the nodeData.
*/
job.setCompletedDate(nodeData.getCompletedDate());
job.setErrorsOccurred(nodeData.getErrorsOccurred());
job.setPriority(nodeData.getPriority());
@ -1466,8 +1471,20 @@ final class AutoIngestManager extends Observable implements PropertyChangeListen
job.setProcessingStage(AutoIngestJob.Stage.COMPLETED, nodeData.getCompletedDate());
job.setProcessingStatus(AutoIngestJob.ProcessingStatus.COMPLETED);
/*
* Try to write the upgraded node data to coordination service
* manifest node data for the job. If the lock cannot be
* obtained, assume that the auto ingest node holding the lock
* is taking care of this.
*/
try (Lock manifestLock = coordinationService.tryGetExclusiveLock(CoordinationService.CategoryNode.MANIFESTS, manifest.getFilePath().toString())) {
if (null != manifestLock) {
updateAutoIngestJobData(job);
}
} catch (CoordinationServiceException ex) {
sysLogger.log(Level.SEVERE, String.format("Error attempting to set node data for %s", manifest.getFilePath()), ex);
}
}
newCompletedJobsList.add(job);
}
@ -1964,6 +1981,17 @@ final class AutoIngestManager extends Observable implements PropertyChangeListen
}
try {
/*
* There can be a race condition between queuing jobs
* and case deletion. However, in practice eliminating
* the race condition by acquiring a manifest file
* coordination service lock when analyzing job state
* during the input directory scan appears to have a
* significant performance cost for both input directory
* scanning and dequeuing jobs. Therefore, job state
* must be checked again here, while actually holding
* the lock, before executing the job.
*/
AutoIngestJobNodeData nodeData = new AutoIngestJobNodeData(coordinationService.getNodeData(CoordinationService.CategoryNode.MANIFESTS, manifestPath.toString()));
if (!nodeData.getProcessingStatus().equals(PENDING)) {
iterator.remove();
@ -2097,7 +2125,7 @@ final class AutoIngestManager extends Observable implements PropertyChangeListen
if (currentJob.isCanceled()) {
Path caseDirectoryPath = currentJob.getCaseDirectoryPath();
if (null != caseDirectoryPath) {
setCaseNodeDataErrorsOccurred(caseDirectoryPath);
setErrorsOccurredFlagForCase(caseDirectoryPath);
AutoIngestJobLogger jobLogger = new AutoIngestJobLogger(manifestPath, currentJob.getManifest().getDataSourceFileName(), caseDirectoryPath);
jobLogger.logJobCancelled();
}
@ -2455,7 +2483,7 @@ final class AutoIngestManager extends Observable implements PropertyChangeListen
if (!dataSource.exists()) {
sysLogger.log(Level.SEVERE, "Missing data source for {0}", manifestPath);
currentJob.setErrorsOccurred(true);
setCaseNodeDataErrorsOccurred(caseDirectoryPath);
setErrorsOccurredFlagForCase(caseDirectoryPath);
jobLogger.logMissingDataSource();
return null;
}
@ -2500,7 +2528,7 @@ final class AutoIngestManager extends Observable implements PropertyChangeListen
// did we find a data source processor that can process the data source
if (validDataSourceProcessors.isEmpty()) {
// This should never happen. We should add all unsupported data sources as logical files.
setCaseNodeDataErrorsOccurred(caseDirectoryPath);
setErrorsOccurredFlagForCase(caseDirectoryPath);
currentJob.setErrorsOccurred(true);
jobLogger.logFailedToIdentifyDataSource();
sysLogger.log(Level.WARNING, "Unsupported data source {0} for {1}", new Object[]{dataSource.getPath(), manifestPath}); // NON-NLS
@ -2535,7 +2563,7 @@ final class AutoIngestManager extends Observable implements PropertyChangeListen
// If we get to this point, none of the processors were successful
sysLogger.log(Level.SEVERE, "All data source processors failed to process {0}", dataSource.getPath());
jobLogger.logFailedToAddDataSource();
setCaseNodeDataErrorsOccurred(caseDirectoryPath);
setErrorsOccurredFlagForCase(caseDirectoryPath);
currentJob.setErrorsOccurred(true);
// Throw an exception. It will get caught & handled upstream and will result in AIM auto-pause.
throw new AutoIngestDataSourceProcessor.AutoIngestDataSourceProcessorException("Failed to process " + dataSource.getPath() + " with all data source processors");
@ -2654,7 +2682,7 @@ final class AutoIngestManager extends Observable implements PropertyChangeListen
if (!cancelledModules.isEmpty()) {
sysLogger.log(Level.WARNING, String.format("Ingest module(s) cancelled for %s", manifestPath));
currentJob.setErrorsOccurred(true);
setCaseNodeDataErrorsOccurred(caseDirectoryPath);
setErrorsOccurredFlagForCase(caseDirectoryPath);
for (String module : snapshot.getCancelledDataSourceIngestModules()) {
sysLogger.log(Level.WARNING, String.format("%s ingest module cancelled for %s", module, manifestPath));
nestedJobLogger.logIngestModuleCancelled(module);
@ -2664,7 +2692,7 @@ final class AutoIngestManager extends Observable implements PropertyChangeListen
} else {
currentJob.setProcessingStage(AutoIngestJob.Stage.CANCELLING, Date.from(Instant.now()));
currentJob.setErrorsOccurred(true);
setCaseNodeDataErrorsOccurred(caseDirectoryPath);
setErrorsOccurredFlagForCase(caseDirectoryPath);
nestedJobLogger.logAnalysisCancelled();
CancellationReason cancellationReason = snapshot.getCancellationReason();
if (CancellationReason.NOT_CANCELLED != cancellationReason && CancellationReason.USER_CANCELLED != cancellationReason) {
@ -2677,13 +2705,13 @@ final class AutoIngestManager extends Observable implements PropertyChangeListen
sysLogger.log(Level.SEVERE, String.format("%s ingest module startup error for %s", error.getModuleDisplayName(), manifestPath), error.getThrowable());
}
currentJob.setErrorsOccurred(true);
setCaseNodeDataErrorsOccurred(caseDirectoryPath);
setErrorsOccurredFlagForCase(caseDirectoryPath);
jobLogger.logIngestModuleStartupErrors();
throw new AnalysisStartupException(String.format("Error(s) during ingest module startup for %s", manifestPath));
} else {
sysLogger.log(Level.SEVERE, String.format("Ingest manager ingest job start error for %s", manifestPath), ingestJobStartResult.getStartupException());
currentJob.setErrorsOccurred(true);
setCaseNodeDataErrorsOccurred(caseDirectoryPath);
setErrorsOccurredFlagForCase(caseDirectoryPath);
jobLogger.logAnalysisStartupError();
throw new AnalysisStartupException("Ingest manager error starting job", ingestJobStartResult.getStartupException());
}
@ -2692,7 +2720,7 @@ final class AutoIngestManager extends Observable implements PropertyChangeListen
sysLogger.log(Level.SEVERE, "Ingest job settings error for {0}: {1}", new Object[]{manifestPath, warning});
}
currentJob.setErrorsOccurred(true);
setCaseNodeDataErrorsOccurred(caseDirectoryPath);
setErrorsOccurredFlagForCase(caseDirectoryPath);
jobLogger.logIngestJobSettingsErrors();
throw new AnalysisStartupException("Error(s) in ingest job settings");
}
@ -2775,7 +2803,7 @@ final class AutoIngestManager extends Observable implements PropertyChangeListen
} catch (FileExportException ex) {
sysLogger.log(Level.SEVERE, String.format("Error doing file export for %s", manifestPath), ex);
currentJob.setErrorsOccurred(true);
setCaseNodeDataErrorsOccurred(caseDirectoryPath);
setErrorsOccurredFlagForCase(caseDirectoryPath);
jobLogger.logFileExportError();
}
}