Add node data version checks to AIM

This commit is contained in:
Richard Cordovano 2017-09-27 14:51:07 -04:00
parent 261a8d950f
commit 0aa5557b34
2 changed files with 96 additions and 70 deletions

View File

@ -74,6 +74,16 @@ final class AutoIngestJobNodeData {
private String processingStageDetailsDescription; // 'byte' length used in byte array private String processingStageDetailsDescription; // 'byte' length used in byte array
private long processingStageDetailsStartDate; private long processingStageDetailsStartDate;
/**
* Gets the current version of the auto ingest job coordination service node
* data.
*
* @return The version number.
*/
static int getCurrentVersion() {
return AutoIngestJobNodeData.CURRENT_VERSION;
}
/** /**
* Uses an auto ingest job to construct an object that converts auto ingest * Uses an auto ingest job to construct an object that converts auto ingest
* job data for an auto ingest job coordination service node to and from * job data for an auto ingest job coordination service node to and from
@ -333,7 +343,7 @@ final class AutoIngestJobNodeData {
* Gets the path to the case directory of the case associated with the job. * Gets the path to the case directory of the case associated with the job.
* *
* @return The case directory path or an empty string path if the case * @return The case directory path or an empty string path if the case
* directory has not been created yet. * directory has not been created yet.
*/ */
synchronized Path getCaseDirectoryPath() { synchronized Path getCaseDirectoryPath() {
if (!caseDirectoryPath.isEmpty()) { if (!caseDirectoryPath.isEmpty()) {
@ -536,7 +546,7 @@ final class AutoIngestJobNodeData {
* specified, either a 'byte' or a 'short' will first be read out of the * specified, either a 'byte' or a 'short' will first be read out of the
* buffer which gives the length of the string so it can be properly parsed. * buffer which gives the length of the string so it can be properly parsed.
* *
* @param buffer The buffer from which the string will be read. * @param buffer The buffer from which the string will be read.
* @param lengthType The size of the length data. * @param lengthType The size of the length data.
* *
* @return The string read from the buffer. * @return The string read from the buffer.
@ -570,8 +580,8 @@ final class AutoIngestJobNodeData {
* parsed. * parsed.
* *
* @param stringValue The string to write to the buffer. * @param stringValue The string to write to the buffer.
* @param buffer The buffer to which the string will be written. * @param buffer The buffer to which the string will be written.
* @param lengthType The size of the length data. * @param lengthType The size of the length data.
*/ */
private void putStringIntoBuffer(String stringValue, ByteBuffer buffer, TypeKind lengthType) { private void putStringIntoBuffer(String stringValue, ByteBuffer buffer, TypeKind lengthType) {
switch (lengthType) { switch (lengthType) {

View File

@ -344,7 +344,7 @@ public final class AutoIngestManager extends Observable implements PropertyChang
if (event.shouldRetry() == false) { if (event.shouldRetry() == false) {
synchronized (jobsLock) { synchronized (jobsLock) {
AutoIngestJob job = event.getJob(); AutoIngestJob job = event.getJob();
if(completedJobs.contains(job)) { if (completedJobs.contains(job)) {
completedJobs.remove(job); completedJobs.remove(job);
} }
completedJobs.add(event.getJob()); completedJobs.add(event.getJob());
@ -1123,33 +1123,47 @@ public final class AutoIngestManager extends Observable implements PropertyChang
* shutting down. * shutting down.
*/ */
private void addPendingJob(Manifest manifest, AutoIngestJobNodeData nodeData) throws InterruptedException { private void addPendingJob(Manifest manifest, AutoIngestJobNodeData nodeData) throws InterruptedException {
AutoIngestJob job = new AutoIngestJob(manifest); AutoIngestJob job;
job.setPriority(nodeData.getPriority()); if (nodeData.getVersion() == AutoIngestJobNodeData.getCurrentVersion()) {
job = new AutoIngestJob(nodeData);
Path caseDirectory = PathUtils.findCaseDirectory(rootOutputDirectory, manifest.getCaseName());
if (null != caseDirectory) {
job.setCaseDirectoryPath(caseDirectory);
}
} else {
job = new AutoIngestJob(manifest);
job.setPriority(nodeData.getPriority()); // Retain priority, present in all versions of the node data.
Path caseDirectory = PathUtils.findCaseDirectory(rootOutputDirectory, manifest.getCaseName());
if (null != caseDirectory) {
job.setCaseDirectoryPath(caseDirectory);
}
/*
* Try to upgrade/update the coordination service node data for
* the job.
*
* An exclusive lock is obtained before doing so because another
* host may have already found the job, obtained an exclusive
* lock, and started processing it. However, this locking does
* make it possible that two processing hosts will both try to
* obtain the lock to do the upgrade operation at the same time.
* If this happens, the host that is holding the lock will
* complete the upgrade operation, so there is nothing more for
* this host to do.
*/
try (Lock manifestLock = coordinationService.tryGetExclusiveLock(CoordinationService.CategoryNode.MANIFESTS, manifest.getFilePath().toString())) {
if (null != manifestLock) {
updateCoordinationServiceNode(job);
}
} catch (CoordinationServiceException ex) {
SYS_LOGGER.log(Level.SEVERE, String.format("Error attempting to set node data for %s", manifest.getFilePath()), ex);
}
}
Path caseDirectory = PathUtils.findCaseDirectory(rootOutputDirectory, manifest.getCaseName()); Path caseDirectory = PathUtils.findCaseDirectory(rootOutputDirectory, manifest.getCaseName());
if (null != caseDirectory) { if (null != caseDirectory) {
job.setCaseDirectoryPath(caseDirectory); job.setCaseDirectoryPath(caseDirectory);
} }
newPendingJobsList.add(job); newPendingJobsList.add(job);
/*
* Try to upgrade/update the coordination service node data for the
* job.
*
* An exclusive lock is obtained before doing so because another
* host may have already found the job, obtained an exclusive lock,
* and started processing it. However, this locking does make it
* possible that two hosts will both try to obtain the lock to do
* the upgrade/update operation at the same time. If this happens,
* the host that is holding the lock will complete the
* update/upgrade operation, so there is nothing more to do.
*/
try (Lock manifestLock = coordinationService.tryGetExclusiveLock(CoordinationService.CategoryNode.MANIFESTS, manifest.getFilePath().toString())) {
if (null != manifestLock) {
AutoIngestManager.this.updateCoordinationServiceNode(job);
}
} catch (CoordinationServiceException ex) {
SYS_LOGGER.log(Level.SEVERE, String.format("Error attempting to set node data for %s", manifest.getFilePath()), ex);
}
} }
/** /**
@ -1180,7 +1194,7 @@ public final class AutoIngestManager extends Observable implements PropertyChang
try (Lock manifestLock = coordinationService.tryGetExclusiveLock(CoordinationService.CategoryNode.MANIFESTS, manifest.getFilePath().toString())) { try (Lock manifestLock = coordinationService.tryGetExclusiveLock(CoordinationService.CategoryNode.MANIFESTS, manifest.getFilePath().toString())) {
if (null != manifestLock) { if (null != manifestLock) {
AutoIngestJob job = new AutoIngestJob(manifest); AutoIngestJob job = new AutoIngestJob(manifest);
AutoIngestManager.this.updateCoordinationServiceNode(job); updateCoordinationServiceNode(job);
newPendingJobsList.add(job); newPendingJobsList.add(job);
} }
} catch (CoordinationServiceException ex) { } catch (CoordinationServiceException ex) {
@ -1303,46 +1317,48 @@ public final class AutoIngestManager extends Observable implements PropertyChang
private void addCompletedJob(Manifest manifest, AutoIngestJobNodeData nodeData) throws CoordinationServiceException, InterruptedException { private void addCompletedJob(Manifest manifest, AutoIngestJobNodeData nodeData) throws CoordinationServiceException, InterruptedException {
Path caseDirectoryPath = PathUtils.findCaseDirectory(rootOutputDirectory, manifest.getCaseName()); Path caseDirectoryPath = PathUtils.findCaseDirectory(rootOutputDirectory, manifest.getCaseName());
if (null != caseDirectoryPath) { if (null != caseDirectoryPath) {
/** AutoIngestJob job;
* We use the manifest rather than the nodeData here to create if (nodeData.getVersion() == AutoIngestJobNodeData.getCurrentVersion()) {
* a new AutoIngestJob instance because the AutoIngestJob job = new AutoIngestJob(nodeData);
* constructor that takes a nodeData expects the nodeData to job.setCaseDirectoryPath(caseDirectoryPath);
* have fields that do not exist in earlier versions. } else {
*/ /**
AutoIngestJob job = new AutoIngestJob(manifest); * Use the manifest rather than the node data here to create
/** * a new AutoIngestJob instance because the AutoIngestJob
* Update the job with the fields that exist in all versions * constructor that takes a node data object expects the
* of the nodeData. * node data to have fields that do not exist in earlier
*/ * versions.
job.setCompletedDate(nodeData.getCompletedDate()); */
job.setErrorsOccurred(nodeData.getErrorsOccurred()); job = new AutoIngestJob(manifest);
job.setPriority(nodeData.getPriority()); job.setCaseDirectoryPath(caseDirectoryPath);
job.setNumberOfCrashes(nodeData.getNumberOfCrashes());
job.setProcessingStage(AutoIngestJob.Stage.COMPLETED, nodeData.getCompletedDate());
job.setCaseDirectoryPath(caseDirectoryPath); /**
job.setProcessingStatus(AutoIngestJob.ProcessingStatus.COMPLETED); * Update the job with the fields that exist in all versions
newCompletedJobsList.add(job); * of the nodeData.
*/
job.setCompletedDate(nodeData.getCompletedDate());
job.setErrorsOccurred(nodeData.getErrorsOccurred());
job.setPriority(nodeData.getPriority());
job.setNumberOfCrashes(nodeData.getNumberOfCrashes());
job.setProcessingStage(AutoIngestJob.Stage.COMPLETED, nodeData.getCompletedDate());
job.setProcessingStatus(AutoIngestJob.ProcessingStatus.COMPLETED);
/* /*
* Try to upgrade/update the coordination service node data for * Try to upgrade/update the coordination service node data
* the job. * for the job. It is possible that two hosts will both try
* * to obtain the lock to do the upgrade operation at the
* An exclusive lock is obtained before doing so because another * same time. If this happens, the host that is holding the
* host may have already found the job, obtained an exclusive * lock will complete the upgrade operation.
* lock, and started processing it. However, this locking does */
* make it possible that two hosts will both try to obtain the try (Lock manifestLock = coordinationService.tryGetExclusiveLock(CoordinationService.CategoryNode.MANIFESTS, manifest.getFilePath().toString())) {
* lock to do the upgrade/update operation at the same time. If if (null != manifestLock) {
* this happens, the host that is holding the lock will complete updateCoordinationServiceNode(job);
* the update/upgrade operation, so there is nothing more to do. }
*/ } catch (CoordinationServiceException ex) {
try (Lock manifestLock = coordinationService.tryGetExclusiveLock(CoordinationService.CategoryNode.MANIFESTS, manifest.getFilePath().toString())) { SYS_LOGGER.log(Level.SEVERE, String.format("Error attempting to set node data for %s", manifest.getFilePath()), ex);
if (null != manifestLock) {
updateCoordinationServiceNode(job);
} }
} catch (CoordinationServiceException ex) {
SYS_LOGGER.log(Level.SEVERE, String.format("Error attempting to set node data for %s", manifest.getFilePath()), ex);
} }
newCompletedJobsList.add(job);
} else { } else {
SYS_LOGGER.log(Level.WARNING, String.format("Job completed for %s, but cannot find case directory, ignoring job", nodeData.getManifestFilePath())); SYS_LOGGER.log(Level.WARNING, String.format("Job completed for %s, but cannot find case directory, ignoring job", nodeData.getManifestFilePath()));