Merge stashed changes into reduced AIN locking

This commit is contained in:
Richard Cordovano 2019-04-09 14:20:47 -04:00
parent d2877fd06a
commit 0b05ef5e07

View File

@ -174,6 +174,12 @@ final class AutoIngestManager extends Observable implements PropertyChangeListen
private volatile AutoIngestNodeStateEvent lastPublishedStateEvent; private volatile AutoIngestNodeStateEvent lastPublishedStateEvent;
/**
* Gets the file name used for a listing of the manifest file paths for a
* case in the case directory.
*
* @return The file name.
*/
static String getCaseManifestsListFileName() { static String getCaseManifestsListFileName() {
return CASE_MANIFESTS_LIST_FILE_NAME; return CASE_MANIFESTS_LIST_FILE_NAME;
} }
@ -1012,7 +1018,7 @@ final class AutoIngestManager extends Observable implements PropertyChangeListen
} }
/** /**
* Sets the error flag for case node data given a case directory path. * Sets the error flag in the node data for a case.
* *
* @param caseDirectoryPath The case directory path. * @param caseDirectoryPath The case directory path.
* *
@ -1151,26 +1157,25 @@ final class AutoIngestManager extends Observable implements PropertyChangeListen
/** /**
* Creates a pending or completed auto ingest job if the file visited is * Creates a pending or completed auto ingest job if the file visited is
* a manifest file, based on the data stored in the coordination service * a manifest file.
* node for the manifest.
* *
* @param filePath The path of the file. * @param filePath The path of the file.
* @param attrs The file system attributes of the file. * @param attrs The file system attributes of the file.
* *
* @return TERMINATE if auto ingest is shutting down, CONTINUE if it has * @return TERMINATE if auto ingest is shutting down, CONTINUE
* not. * otherwise.
*/ */
@Override @Override
public FileVisitResult visitFile(Path filePath, BasicFileAttributes attrs) { public FileVisitResult visitFile(Path filePath, BasicFileAttributes attrs) {
if (Thread.currentThread().isInterrupted()) { if (Thread.currentThread().isInterrupted()) {
sysLogger.log(Level.WARNING, String.format("Auto ingest shut down while visiting %s", filePath));
return TERMINATE; return TERMINATE;
} }
try { try {
/* /*
* Determine whether or not the file is an auto ingest job * Determine whether or not the file is a manifest file. If it
* manifest file. If it is, then parse it. Otherwise, move on to * is, then parse it.
* the next file in the directory.
*/ */
Manifest manifest = null; Manifest manifest = null;
for (ManifestFileParser parser : Lookup.getDefault().lookupAll(ManifestFileParser.class)) { for (ManifestFileParser parser : Lookup.getDefault().lookupAll(ManifestFileParser.class)) {
@ -1183,30 +1188,22 @@ final class AutoIngestManager extends Observable implements PropertyChangeListen
} }
} }
if (Thread.currentThread().isInterrupted()) { if (Thread.currentThread().isInterrupted()) {
sysLogger.log(Level.WARNING, String.format("Auto ingest shut down while visiting %s", filePath));
return TERMINATE; return TERMINATE;
} }
} }
if (Thread.currentThread().isInterrupted()) { if (Thread.currentThread().isInterrupted()) {
sysLogger.log(Level.WARNING, String.format("Auto ingest shut down while visiting %s", filePath));
return TERMINATE; return TERMINATE;
} }
if (manifest == null) {
return CONTINUE;
}
/* /*
* If a manifest file has been found, analyze the job state and * If a manifest file has been found, get the corresponding auto
* put a job into the appropriate job list. Note that there can * ingest job state from the manifest file coordination service
* be a race condition between this analysis and case deletion. * node and put the job in the appropriate jobs list.
* However, in practice eliminating the race condition by
* acquiring a manifest file coordination service lock at this
* point seems to have a significant performance cost for both
* input directory scanning and dequeuing jobs. Therefore,
* locking is not done here, with the consequence that the job
* state must be checked again before executing the job.
*/ */
try { if (manifest != null) {
byte[] rawData = coordinationService.getNodeData(CoordinationService.CategoryNode.MANIFESTS, manifest.getFilePath().toString()); byte[] rawData = coordinationService.getNodeData(CoordinationService.CategoryNode.MANIFESTS, manifest.getFilePath().toString());
if (null != rawData && rawData.length > 0) { if (null != rawData && rawData.length > 0) {
AutoIngestJobNodeData nodeData = new AutoIngestJobNodeData(rawData); AutoIngestJobNodeData nodeData = new AutoIngestJobNodeData(rawData);
@ -1224,69 +1221,64 @@ final class AutoIngestManager extends Observable implements PropertyChangeListen
* released when the coordination service * released when the coordination service
* detected that the AIN was no longer alive. * detected that the AIN was no longer alive.
*/ */
doCrashRecovery(manifest, nodeData); doRecoveryIfCrashed(manifest, nodeData);
break; break;
case COMPLETED: case COMPLETED:
addCompletedJob(manifest, nodeData); addCompletedJob(manifest, nodeData);
break; break;
case DELETED: case DELETED:
/*
* Ignore jobs marked as deleted.
*/
break; break;
default: default:
sysLogger.log(Level.SEVERE, "Unknown ManifestNodeData.ProcessingStatus"); sysLogger.log(Level.SEVERE, "Unknown ManifestNodeData.ProcessingStatus");
break; break;
} }
} else { } else {
try { addNewPendingJob(manifest);
addNewPendingJob(manifest);
} catch (AutoIngestJobException ex) {
sysLogger.log(Level.SEVERE, String.format("Invalid manifest data for %s", manifest.getFilePath()), ex);
}
} }
} catch (CoordinationServiceException | AutoIngestJobException | AutoIngestJobNodeData.InvalidDataException ex) {
sysLogger.log(Level.SEVERE, String.format("Error handling manifest at %s", manifest.getFilePath()), ex);
} catch (InterruptedException ex) {
/*
* The thread running the input directory scan task was
* interrupted while blocked, i.e., auto ingest is shutting
* down.
*/
Thread.currentThread().interrupt();
return TERMINATE;
} }
} catch (CoordinationServiceException | AutoIngestJobException | AutoIngestJobNodeData.InvalidDataException ex) {
sysLogger.log(Level.SEVERE, String.format("Error visiting %s", filePath), ex);
} catch (InterruptedException ex) {
/*
* The thread running the input directory scan task was
* interrupted while blocked, i.e., auto ingest is shutting
* down.
*/
sysLogger.log(Level.WARNING, String.format("Auto ingest shut down while visiting %s", filePath), ex);
return TERMINATE;
} catch (Exception ex) { } catch (Exception ex) {
/* /*
* This is an exception firewall so that an unexpected runtime * This is an exception firewall so that an unexpected runtime
* exception from the handling of a single manifest file does * exception from the handling of a single manifest file does
* not take out the input directory scanner. * not take out the input directory scanner.
*/ */
sysLogger.log(Level.SEVERE, String.format("Unexpected exception handling %s", filePath), ex); sysLogger.log(Level.SEVERE, String.format("Unexpected exception visiting %s", filePath), ex);
} }
if (!Thread.currentThread().isInterrupted()) { if (!Thread.currentThread().isInterrupted()) {
return CONTINUE; return CONTINUE;
} else { } else {
sysLogger.log(Level.WARNING, String.format("Auto ingest shut down while visiting %s", filePath));
return TERMINATE; return TERMINATE;
} }
} }
/** /**
* Adds an auto ingest job to the pending jobs queue. * Adds an auto ingest job to the pending jobs queue. If the version of
* the coordination service node data is out of date, it is upgraded to
* the current version.
* *
* @param manifest The manifest for the job. * @param manifest The manifest for the job.
* @param nodeData The data stored in the manifest file lock * @param nodeData The data stored in the manifest file coordination
* coordination service node for the job. * service node for the job.
* *
* @throws AutoIngestJobException If there was an error working * @throws AutoIngestJobException If there was an error working
* with the node data. * with the node data.
* @throws CoordinationServiceException If a lock node data version * @throws CoordinationServiceException If a node data version update
* update was required and there * was required and there was an
* was an error writing the node * error writing the node data by
* data by the coordination * the coordination service.
* service.
* @throws InterruptedException If the thread running the input * @throws InterruptedException If the thread running the input
* directory scan task is * directory scan task is
* interrupted while blocked, i.e., * interrupted while blocked, i.e.,
@ -1345,11 +1337,8 @@ final class AutoIngestManager extends Observable implements PropertyChangeListen
*/ */
private void addNewPendingJob(Manifest manifest) throws AutoIngestJobException, CoordinationServiceException, InterruptedException { private void addNewPendingJob(Manifest manifest) throws AutoIngestJobException, CoordinationServiceException, InterruptedException {
/* /*
* Create the coordination service manifest node data for the job. * Create the coordination service manifest file node data for the
* Note that getting the lock will create the node for the job (with * job. An exclusive lock is obtained before creating the node data
* no data) if it does not already exist.
*
* An exclusive lock is obtained before creating the node data
* because another host may have already found the job, obtained an * because another host may have already found the job, obtained an
* exclusive lock, and started processing it. However, this locking * exclusive lock, and started processing it. However, this locking
* does make it possible that two hosts will both try to obtain the * does make it possible that two hosts will both try to obtain the
@ -1357,6 +1346,9 @@ final class AutoIngestManager extends Observable implements PropertyChangeListen
* happens, the host that is locked out will not add the job to its * happens, the host that is locked out will not add the job to its
* pending queue for this scan of the input directory, but it will * pending queue for this scan of the input directory, but it will
* be picked up on the next scan. * be picked up on the next scan.
*
* Note that getting the node lock will create the node for the job
* (with no data) if it does not already exist.
*/ */
try (Lock manifestLock = coordinationService.tryGetExclusiveLock(CoordinationService.CategoryNode.MANIFESTS, manifest.getFilePath().toString())) { try (Lock manifestLock = coordinationService.tryGetExclusiveLock(CoordinationService.CategoryNode.MANIFESTS, manifest.getFilePath().toString())) {
if (null != manifestLock) { if (null != manifestLock) {
@ -1387,58 +1379,75 @@ final class AutoIngestManager extends Observable implements PropertyChangeListen
* interrupted while blocked, i.e., * interrupted while blocked, i.e.,
* if auto ingest is shutting down. * if auto ingest is shutting down.
*/ */
private void doCrashRecovery(Manifest manifest, AutoIngestJobNodeData jobNodeData) throws AutoIngestJobException, CoordinationServiceException, InterruptedException { private void doRecoveryIfCrashed(Manifest manifest, AutoIngestJobNodeData jobNodeData) throws AutoIngestJobException, CoordinationServiceException, InterruptedException {
/*
* Try to get an exclusive lock on the coordination service node for
* the job. If the lock cannot be obtained, another host in the auto
* ingest cluster is either processing the job or is already doing
* the crash recovery, so there is nothing to do.
*/
String manifestPath = manifest.getFilePath().toString(); String manifestPath = manifest.getFilePath().toString();
sysLogger.log(Level.SEVERE, "Attempting crash recovery for {0}", manifestPath); try (Lock manifestLock = coordinationService.tryGetExclusiveLock(CoordinationService.CategoryNode.MANIFESTS, manifestPath)) {
AutoIngestJob job = new AutoIngestJob(jobNodeData); if (null != manifestLock) {
/*
* If the lock can be obtained with the job state set to
* PROCESSING, then the job is crashed.
*/
AutoIngestJob job = new AutoIngestJob(jobNodeData);
if (job.getProcessingStatus() == AutoIngestJob.ProcessingStatus.PROCESSING) {
sysLogger.log(Level.SEVERE, "Attempting crash recovery for {0}", manifestPath);
/* /*
* Try to set the error flags that indicate incomplete or messy data * Try to set the error flags that indicate incomplete
* in displays for the job and the case. Note that if the job * or messy data in displays for the job and the case.
* crashed before a case directory was created, the job was a no-op, * Note that if the job crashed before a case directory
* so the data quality flags do not need to be set. * was created, the job was a no-op, so the data quality
*/ * flags do not need to be set.
Path caseDirectoryPath = PathUtils.findCaseDirectory(rootOutputDirectory, manifest.getCaseName()); */
if (null != caseDirectoryPath) { Path caseDirectoryPath = PathUtils.findCaseDirectory(rootOutputDirectory, manifest.getCaseName());
job.setCaseDirectoryPath(caseDirectoryPath); if (null != caseDirectoryPath) {
job.setErrorsOccurred(true); job.setCaseDirectoryPath(caseDirectoryPath);
setCaseNodeDataErrorsOccurred(caseDirectoryPath); job.setErrorsOccurred(true);
} else { setCaseNodeDataErrorsOccurred(caseDirectoryPath);
job.setErrorsOccurred(false); } else {
} job.setErrorsOccurred(false);
}
/* /*
* Update the crash count for the job, determine whether or not to * Update the crash count for the job, determine whether
* retry processing its data source, and deal with the job * or not to retry processing its data source, and deal
* accordingly. * with the job accordingly.
*/ */
int numberOfCrashes = job.getNumberOfCrashes(); int numberOfCrashes = job.getNumberOfCrashes();
++numberOfCrashes; ++numberOfCrashes;
job.setNumberOfCrashes(numberOfCrashes); job.setNumberOfCrashes(numberOfCrashes);
if (numberOfCrashes < AutoIngestUserPreferences.getMaxNumTimesToProcessImage()) { if (numberOfCrashes < AutoIngestUserPreferences.getMaxNumTimesToProcessImage()) {
job.setProcessingStatus(AutoIngestJob.ProcessingStatus.PENDING); job.setProcessingStatus(AutoIngestJob.ProcessingStatus.PENDING);
job.setCompletedDate(new Date(0)); job.setCompletedDate(new Date(0));
if (null != caseDirectoryPath) { if (null != caseDirectoryPath) {
try { try {
new AutoIngestJobLogger(manifest.getFilePath(), manifest.getDataSourceFileName(), caseDirectoryPath).logCrashRecoveryWithRetry(); new AutoIngestJobLogger(manifest.getFilePath(), manifest.getDataSourceFileName(), caseDirectoryPath).logCrashRecoveryWithRetry();
} catch (AutoIngestJobLoggerException ex) { } catch (AutoIngestJobLoggerException ex) {
sysLogger.log(Level.SEVERE, String.format("Error writing case auto ingest log entry for crashed job for %s", manifestPath), ex); sysLogger.log(Level.SEVERE, String.format("Error writing case auto ingest log entry for crashed job for %s", manifestPath), ex);
}
}
} else {
job.setProcessingStatus(AutoIngestJob.ProcessingStatus.COMPLETED);
job.setCompletedDate(Date.from(Instant.now()));
if (null != caseDirectoryPath) {
try {
new AutoIngestJobLogger(manifest.getFilePath(), manifest.getDataSourceFileName(), caseDirectoryPath).logCrashRecoveryNoRetry();
} catch (AutoIngestJobLoggerException ex) {
sysLogger.log(Level.SEVERE, String.format("Error writing case auto ingest log entry for crashed job for %s", manifestPath), ex);
}
}
}
updateAutoIngestJobData(job);
newPendingJobsList.add(job);
} }
} }
updateAutoIngestJobData(job); } catch (CoordinationServiceException ex) {
newPendingJobsList.add(job); sysLogger.log(Level.SEVERE, String.format("Error attempting to get exclusive lock for %s", manifestPath), ex);
} else {
job.setProcessingStatus(AutoIngestJob.ProcessingStatus.COMPLETED);
job.setCompletedDate(Date.from(Instant.now()));
if (null != caseDirectoryPath) {
try {
new AutoIngestJobLogger(manifest.getFilePath(), manifest.getDataSourceFileName(), caseDirectoryPath).logCrashRecoveryNoRetry();
} catch (AutoIngestJobLoggerException ex) {
sysLogger.log(Level.SEVERE, String.format("Error writing case auto ingest log entry for crashed job for %s", manifestPath), ex);
}
}
updateAutoIngestJobData(job);
newCompletedJobsList.add(new AutoIngestJob(jobNodeData));
} }
} }