diff --git a/Experimental/src/org/sleuthkit/autopsy/experimental/autoingest/AutoIngestDashboard.java b/Experimental/src/org/sleuthkit/autopsy/experimental/autoingest/AutoIngestDashboard.java index 2c7a1a9d86..230d504f95 100755 --- a/Experimental/src/org/sleuthkit/autopsy/experimental/autoingest/AutoIngestDashboard.java +++ b/Experimental/src/org/sleuthkit/autopsy/experimental/autoingest/AutoIngestDashboard.java @@ -1,7 +1,7 @@ /* * Autopsy Forensic Browser * - * Copyright 2015-2017 Basis Technology Corp. + * Copyright 2011-2017 Basis Technology Corp. * Contact: carrier sleuthkit org * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -166,7 +166,7 @@ public final class AutoIngestDashboard extends JPanel implements Observer { * controlling automated ingest for a single node within the cluster. */ private AutoIngestDashboard() { - autoIngestMonitor = AutoIngestMonitor.getInstance(); + autoIngestMonitor = AutoIngestMonitor.createMonitor(); pendingTableModel = new DefaultTableModel(JobsTableModelColumns.headers, 0) { private static final long serialVersionUID = 1L; @@ -524,7 +524,7 @@ public final class AutoIngestDashboard extends JPanel implements Observer { try { autoIngestMonitor.startUp(); autoIngestStarted = true; - } catch (AutoIngestMonitor.AutoIngestMonitorStartupException ex) { + } catch (AutoIngestMonitor.AutoIngestMonitorException ex) { SYS_LOGGER.log(Level.SEVERE, "Dashboard error starting up auto ingest", ex); tbStatusMessage.setText(NbBundle.getMessage(AutoIngestControlPanel.class, "AutoIngestDashboard.AutoIngestStartupError")); autoIngestMonitor = null; @@ -557,7 +557,7 @@ public final class AutoIngestDashboard extends JPanel implements Observer { */ updateExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat(UPDATE_TASKS_THREAD_NAME).build()); updateExecutor.submit(new UpdateAllJobsTablesTask()); - autoIngestMonitor.scanInputDirsNow(); + autoIngestMonitor.queryCoordinationService(); //bnPause.setEnabled(true); bnPause.setText(org.openide.util.NbBundle.getMessage(AutoIngestDashboard.class, "AutoIngestDashboard.bnPause.text")); @@ -657,27 +657,6 @@ public final class AutoIngestDashboard extends JPanel implements Observer { case CASE_DELETED: updateExecutor.submit(new UpdateAllJobsTablesTask()); break; - case PAUSED_BY_REQUEST: - EventQueue.invokeLater(() -> { - tbStatusMessage.setText(org.openide.util.NbBundle.getMessage(AutoIngestDashboard.class, "AutoIngestDashboard.bnPause.paused")); - bnRefresh.setEnabled(false); - isPaused = true; - }); - break; - case PAUSED_FOR_SYSTEM_ERROR: - EventQueue.invokeLater(() -> { - tbStatusMessage.setText(org.openide.util.NbBundle.getMessage(AutoIngestDashboard.class, "AutoIngestDashboard.PauseDueToSystemError")); - bnRefresh.setEnabled(false); - pause(false); - isPaused = true; - setServicesStatusMessage(); - }); - break; - case RESUMED: - EventQueue.invokeLater(() -> { - tbStatusMessage.setText(org.openide.util.NbBundle.getMessage(AutoIngestDashboard.class, "AutoIngestDashboard.bnPause.running")); - }); - break; case CASE_PRIORITIZED: updateExecutor.submit(new UpdatePendingJobsTableTask()); break; diff --git a/Experimental/src/org/sleuthkit/autopsy/experimental/autoingest/AutoIngestDashboardTopComponent.form b/Experimental/src/org/sleuthkit/autopsy/experimental/autoingest/AutoIngestDashboardTopComponent.form index 63887aceb0..5f3eab1a5f 100755 --- a/Experimental/src/org/sleuthkit/autopsy/experimental/autoingest/AutoIngestDashboardTopComponent.form +++ b/Experimental/src/org/sleuthkit/autopsy/experimental/autoingest/AutoIngestDashboardTopComponent.form @@ -25,4 +25,4 @@ - \ No newline at end of file + diff --git a/Experimental/src/org/sleuthkit/autopsy/experimental/autoingest/AutoIngestDashboardTopComponent.java b/Experimental/src/org/sleuthkit/autopsy/experimental/autoingest/AutoIngestDashboardTopComponent.java index cfb2e7e18f..5f2fa268a0 100755 --- a/Experimental/src/org/sleuthkit/autopsy/experimental/autoingest/AutoIngestDashboardTopComponent.java +++ b/Experimental/src/org/sleuthkit/autopsy/experimental/autoingest/AutoIngestDashboardTopComponent.java @@ -1,7 +1,7 @@ /* * Autopsy Forensic Browser * - * Copyright 2017 Basis Technology Corp. + * Copyright 2011 - 2017 Basis Technology Corp. * Contact: carrier sleuthkit org * * Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/Experimental/src/org/sleuthkit/autopsy/experimental/autoingest/AutoIngestMonitor.java b/Experimental/src/org/sleuthkit/autopsy/experimental/autoingest/AutoIngestMonitor.java index 8be3358838..e85915a2b4 100755 --- a/Experimental/src/org/sleuthkit/autopsy/experimental/autoingest/AutoIngestMonitor.java +++ b/Experimental/src/org/sleuthkit/autopsy/experimental/autoingest/AutoIngestMonitor.java @@ -1,7 +1,7 @@ /* * Autopsy Forensic Browser * - * Copyright 2017 Basis Technology Corp. + * Copyright 2011-2017 Basis Technology Corp. * Contact: carrier sleuthkit org * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -21,438 +21,234 @@ package org.sleuthkit.autopsy.experimental.autoingest; import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.beans.PropertyChangeEvent; import java.beans.PropertyChangeListener; -import java.io.IOException; -import static java.nio.file.FileVisitOption.FOLLOW_LINKS; -import java.nio.file.FileVisitResult; -import static java.nio.file.FileVisitResult.CONTINUE; -import static java.nio.file.FileVisitResult.TERMINATE; -import java.nio.file.FileVisitor; -import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.Paths; -import java.nio.file.attribute.BasicFileAttributes; -import java.sql.SQLException; -import java.time.Duration; -import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.Date; -import java.util.EnumSet; -import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.Observable; import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.logging.Level; -import java.util.logging.Logger; import javax.annotation.concurrent.GuardedBy; -import org.openide.util.Exceptions; -import org.openide.util.Lookup; import org.sleuthkit.autopsy.coordinationservice.CoordinationService; import org.sleuthkit.autopsy.coordinationservice.CoordinationService.CoordinationServiceException; -import org.sleuthkit.autopsy.core.UserPreferencesException; +import org.sleuthkit.autopsy.coreutils.Logger; import org.sleuthkit.autopsy.coreutils.NetworkUtils; -import org.sleuthkit.autopsy.events.AutopsyEvent; import org.sleuthkit.autopsy.events.AutopsyEventException; import org.sleuthkit.autopsy.events.AutopsyEventPublisher; -import org.sleuthkit.autopsy.experimental.autoingest.AutoIngestJob.Stage; -import static org.sleuthkit.autopsy.experimental.autoingest.ManifestNodeData.ProcessingStatus.COMPLETED; -import static org.sleuthkit.autopsy.experimental.autoingest.ManifestNodeData.ProcessingStatus.DELETED; -import static org.sleuthkit.autopsy.experimental.autoingest.ManifestNodeData.ProcessingStatus.PENDING; -import static org.sleuthkit.autopsy.experimental.autoingest.ManifestNodeData.ProcessingStatus.PROCESSING; -import org.sleuthkit.autopsy.experimental.configuration.AutoIngestUserPreferences; /** - * - * @author dgrove + * An auto ingest monitor responsible for monitoring and reporting the + * processing of auto ingest jobs. */ public final class AutoIngestMonitor extends Observable implements PropertyChangeListener { - - private static final int NUM_INPUT_SCAN_SCHEDULING_THREADS = 1; - private static final String INPUT_SCAN_SCHEDULER_THREAD_NAME = "AIM-input-scan-scheduler-%d"; - private static final String INPUT_SCAN_THREAD_NAME = "AIM-input-scan-%d"; - private static int DEFAULT_JOB_PRIORITY = 0; - private static final String AUTO_INGEST_THREAD_NAME = "AIM-job-processing-%d"; + + private static final Logger LOGGER = Logger.getLogger(AutoIngestMonitor.class.getName()); + private static final int NUM_COORD_SVC_QUERY_THREADS = 1; + private static final String COORD_SVC_QUERY_THREAD_NAME = "AIM-coord-svc-query-thread-%d"; //NON-NLS + private static final int CORRD_SVC_QUERY_INERVAL_MINS = 5; private static final String LOCAL_HOST_NAME = NetworkUtils.getLocalHostName(); - private static final String EVENT_CHANNEL_NAME = "Auto-Ingest-Manager-Events"; + private static final String EVENT_CHANNEL_NAME = "Auto-Ingest-Manager-Events"; //NON-NLS private static final Set EVENT_LIST = new HashSet<>(Arrays.asList(new String[]{ Event.JOB_STATUS_UPDATED.toString(), Event.JOB_COMPLETED.toString(), Event.CASE_PRIORITIZED.toString(), Event.JOB_STARTED.toString()})); - private static final long JOB_STATUS_EVENT_INTERVAL_SECONDS = 10; - private static final String JOB_STATUS_PUBLISHING_THREAD_NAME = "AIM-job-status-event-publisher-%d"; - private static final long MAX_MISSED_JOB_STATUS_UPDATES = 10; - private static final java.util.logging.Logger SYS_LOGGER = AutoIngestSystemLogger.getLogger(); - - private static AutoIngestMonitor instance; private final AutopsyEventPublisher eventPublisher; - private final Object scanMonitor; - private final ScheduledThreadPoolExecutor inputScanSchedulingExecutor; - private final ExecutorService inputScanExecutor; - private final ExecutorService jobProcessingExecutor; - private final ScheduledThreadPoolExecutor jobStatusPublishingExecutor; - private final ConcurrentHashMap hostNamesToLastMsgTime; - private final ConcurrentHashMap hostNamesToRunningJobs; - private final Object jobsLock; - @GuardedBy("jobsLock") private final Map> casesToManifests; - @GuardedBy("jobsLock") private List pendingJobs; - @GuardedBy("jobsLock") private AutoIngestJob currentJob; - @GuardedBy("jobsLock") private List completedJobs; private CoordinationService coordinationService; - private Path rootInputDirectory; - private Path rootOutputDirectory; - private volatile State state; - private volatile ErrorState errorState; + private final ScheduledThreadPoolExecutor coordSvcQueryExecutor; + private final Object jobsLock; + @GuardedBy("jobsLock") + private JobsSnapshot jobsSnapshot; /** - * Gets a singleton auto ingest monitor responsible for processing auto - * ingest jobs defined by manifest files that can be added to any level of a - * designated input directory tree. + * Creates an auto ingest monitor responsible for monitoring and reporting + * the processing of auto ingest jobs. * - * @return A singleton AutoIngestMonitor instance. + * @return The auto ingest monitor. + * + * @throws AutoIngestMonitorException If the monitor cannot be created. */ - synchronized static AutoIngestMonitor getInstance() { - if (instance == null) { - instance = new AutoIngestMonitor(); - } - return instance; + static AutoIngestMonitor createMonitor() throws AutoIngestMonitorException { + AutoIngestMonitor monitor = new AutoIngestMonitor(); + monitor.startUp(); + return monitor; } - + /** - * Constructs an auto ingest monitor responsible for processing auto ingest - * jobs defined by manifest files that can be added to any level of a - * designated input directory tree. + * Constructs an auto ingest monitor responsible for monitoring and + * reporting the processing of auto ingest jobs. */ - public AutoIngestMonitor() { - SYS_LOGGER.log(Level.INFO, "Initializing auto ingest"); - state = State.IDLE; + private AutoIngestMonitor() { eventPublisher = new AutopsyEventPublisher(); - scanMonitor = new Object(); - inputScanSchedulingExecutor = new ScheduledThreadPoolExecutor(NUM_INPUT_SCAN_SCHEDULING_THREADS, new ThreadFactoryBuilder().setNameFormat(INPUT_SCAN_SCHEDULER_THREAD_NAME).build()); - inputScanExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat(INPUT_SCAN_THREAD_NAME).build()); - jobProcessingExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat(AUTO_INGEST_THREAD_NAME).build()); - jobStatusPublishingExecutor = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().setNameFormat(JOB_STATUS_PUBLISHING_THREAD_NAME).build()); - hostNamesToRunningJobs = new ConcurrentHashMap<>(); - hostNamesToLastMsgTime = new ConcurrentHashMap<>(); + coordSvcQueryExecutor = new ScheduledThreadPoolExecutor(NUM_COORD_SVC_QUERY_THREADS, new ThreadFactoryBuilder().setNameFormat(COORD_SVC_QUERY_THREAD_NAME).build()); jobsLock = new Object(); - casesToManifests = new HashMap<>(); - pendingJobs = new ArrayList<>(); - completedJobs = new ArrayList<>(); - rootOutputDirectory = Paths.get(AutoIngestUserPreferences.getAutoModeResultsFolder()); - errorState = ErrorState.NONE; } /** - * Starts up auto ingest. + * Starts up the auto ingest monitor. * - * @throws AutoIngestMonitorStartupException if there is a problem starting - * auto ingest. + * @throws AutoIngestMonitorException If there is a problem starting the + * auto ingest monitor. */ - void startUp() throws AutoIngestMonitor.AutoIngestMonitorStartupException { - SYS_LOGGER.log(Level.INFO, "Auto ingest starting"); + private void startUp() throws AutoIngestMonitor.AutoIngestMonitorException { try { coordinationService = CoordinationService.getInstance(); } catch (CoordinationServiceException ex) { - throw new AutoIngestMonitorStartupException("Failed to get coordination service", ex); + throw new AutoIngestMonitorException("Failed to get coordination service", ex); //NON-NLS } try { eventPublisher.openRemoteEventChannel(EVENT_CHANNEL_NAME); - SYS_LOGGER.log(Level.INFO, "Opened auto ingest event channel"); } catch (AutopsyEventException ex) { - SYS_LOGGER.log(Level.SEVERE, "Failed to open auto ingest event channel", ex); - throw new AutoIngestMonitorStartupException("Failed to open auto ingest event channel", ex); + throw new AutoIngestMonitorException("Failed to open auto ingest event channel", ex); //NON-NLS } - rootInputDirectory = Paths.get(AutoIngestUserPreferences.getAutoModeImageFolder()); - rootOutputDirectory = Paths.get(AutoIngestUserPreferences.getAutoModeResultsFolder()); - inputScanSchedulingExecutor.scheduleAtFixedRate(new InputDirScanSchedulingTask(), 0, AutoIngestUserPreferences.getMinutesOfInputScanInterval(), TimeUnit.MINUTES); - jobStatusPublishingExecutor.scheduleAtFixedRate(new PeriodicJobStatusEventTask(), JOB_STATUS_EVENT_INTERVAL_SECONDS, JOB_STATUS_EVENT_INTERVAL_SECONDS, TimeUnit.SECONDS); - eventPublisher.addSubscriber(EVENT_LIST, instance); - state = State.RUNNING; - errorState = ErrorState.NONE; + coordSvcQueryExecutor.scheduleAtFixedRate(new CoordinationServiceQueryTask(), 0, CORRD_SVC_QUERY_INERVAL_MINS, TimeUnit.MINUTES); + eventPublisher.addSubscriber(EVENT_LIST, this); } /** - * Gets the error state of the autop ingest monitor. - * - * @return The error state, may be NONE. + * Shuts down the auto ingest ingest monitor. */ - ErrorState getErrorState() { - return errorState; + void shutDown() { + try { + eventPublisher.removeSubscriber(EVENT_LIST, this); + coordSvcQueryExecutor.shutdownNow(); + while (!coordSvcQueryExecutor.awaitTermination(30, TimeUnit.SECONDS)) { + LOGGER.log(Level.WARNING, "Auto ingest monitor waited at least thirty seconds for coordination service executor to shut down, continuing to wait"); //NON-NLS + } + eventPublisher.closeRemoteEventChannel(); + } catch (InterruptedException ex) { + LOGGER.log(Level.WARNING, "Auto ingest monitor interrupted during shut down", ex); //NON-NLS + } } /** - * Handles auto ingest events published by other auto ingest nodes. + * Handles auto ingest job events published by the auto ingest nodes in an + * auto ingest cluster. * * @param event An auto ingest event from another node. */ @Override public void propertyChange(PropertyChangeEvent event) { - if (event instanceof AutopsyEvent) { - if (((AutopsyEvent) event).getSourceType() == AutopsyEvent.SourceType.REMOTE) { - if (event instanceof AutoIngestJobStartedEvent) { - handleRemoteJobStartedEvent((AutoIngestJobStartedEvent) event); - } else if (event instanceof AutoIngestJobStatusEvent) { - handleRemoteJobStatusEvent((AutoIngestJobStatusEvent) event); - } else if (event instanceof AutoIngestJobCompletedEvent) { - handleRemoteJobCompletedEvent((AutoIngestJobCompletedEvent) event); - } else if (event instanceof AutoIngestCasePrioritizedEvent) { - handleRemoteCasePrioritizationEvent((AutoIngestCasePrioritizedEvent) event); - } else if (event instanceof AutoIngestCaseDeletedEvent) { - handleRemoteCaseDeletedEvent((AutoIngestCaseDeletedEvent) event); - } - } + if (event instanceof AutoIngestJobStartedEvent) { + handleJobStartedEvent((AutoIngestJobStartedEvent) event); + } else if (event instanceof AutoIngestJobStatusEvent) { + handleJobStatusEvent((AutoIngestJobStatusEvent) event); + } else if (event instanceof AutoIngestJobCompletedEvent) { + handleJobCompletedEvent((AutoIngestJobCompletedEvent) event); + } else if (event instanceof AutoIngestCasePrioritizedEvent) { + handleCasePrioritizationEvent((AutoIngestCasePrioritizedEvent) event); + } else if (event instanceof AutoIngestCaseDeletedEvent) { + handleCaseDeletedEvent((AutoIngestCaseDeletedEvent) event); } } /** - * Processes a job started event from another node by removing the job from - * the pending queue, if it is present, and adding the job in the event to - * the collection of jobs running on other hosts. - *

- * Note that the processing stage of the job will be whatever it was when - * the job was serialized for inclusion in the event message. + * Handles an auto ingest job started event. * - * @param event A job started from another auto ingest node. + * @param event A auto ingest job started event. */ - private void handleRemoteJobStartedEvent(AutoIngestJobStartedEvent event) { - String hostName = event.getJob().getNodeName(); - hostNamesToLastMsgTime.put(hostName, Instant.now()); - synchronized (jobsLock) { - Path manifestFilePath = event.getJob().getNodeData().getManifestFilePath(); - for (Iterator iterator = pendingJobs.iterator(); iterator.hasNext();) { - AutoIngestJob pendingJob = iterator.next(); - if (pendingJob.getNodeData().getManifestFilePath().equals(manifestFilePath)) { - iterator.remove(); - break; - } - } - } - hostNamesToRunningJobs.put(event.getJob().getNodeName(), event.getJob()); + private void handleJobStartedEvent(AutoIngestJobStartedEvent event) { + // DLG: Remove job from event from pending queue, if present + // DLG: Add job to running jobs list setChanged(); notifyObservers(Event.JOB_STARTED); } /** - * Processes a job status event from another node by adding the job in the - * event to the collection of jobs running on other hosts. - *

- * Note that the processing stage of the job will be whatever it was when - * the job was serialized for inclusion in the event message. + * Handles an auto ingest job status event. * - * @param event An job status event from another auto ingest node. + * @param event A auto ingest job status event. */ - private void handleRemoteJobStatusEvent(AutoIngestJobStatusEvent event) { - String hostName = event.getJob().getNodeName(); - hostNamesToLastMsgTime.put(hostName, Instant.now()); - hostNamesToRunningJobs.put(hostName, event.getJob()); + private void handleJobStatusEvent(AutoIngestJobStatusEvent event) { + // DLG: Replace job in running list with job from event setChanged(); notifyObservers(Event.JOB_STATUS_UPDATED); } /** - * Processes a job completed event from another node by removing the job in - * the event from the collection of jobs running on other hosts and adding - * it to the list of completed jobs. - *

- * Note that the processing stage of the job will be whatever it was when - * the job was serialized for inclusion in the event message. + * Handles an auto ingest job completed event. * - * @param event An job completed event from another auto ingest node. + * @param event A auto ingest job completed event. */ - private void handleRemoteJobCompletedEvent(AutoIngestJobCompletedEvent event) { - String hostName = event.getJob().getNodeName(); - hostNamesToLastMsgTime.put(hostName, Instant.now()); - hostNamesToRunningJobs.remove(hostName); - if (event.shouldRetry() == false) { - synchronized (jobsLock) { - completedJobs.add(event.getJob()); - } - } - //scanInputDirsNow(); + private void handleJobCompletedEvent(AutoIngestJobCompletedEvent event) { + // DLG: Remove job from event from running list, if present + // DLG: Add job to completed list setChanged(); notifyObservers(Event.JOB_COMPLETED); } /** - * Processes a job/case prioritization event from another node by triggering - * an immediate input directory scan. + * Handles an auto ingest job/case prioritization event. * - * @param event A prioritization event from another auto ingest node. + * @param event A job/case prioritization event. */ - private void handleRemoteCasePrioritizationEvent(AutoIngestCasePrioritizedEvent event) { - String hostName = event.getNodeName(); - hostNamesToLastMsgTime.put(hostName, Instant.now()); - scanInputDirsNow(); + private void handleCasePrioritizationEvent(AutoIngestCasePrioritizedEvent event) { + // DLG: Replace job in pending queue with job from event setChanged(); notifyObservers(Event.CASE_PRIORITIZED); } /** - * Processes a case deletin event from another node by triggering an - * immediate input directory scan. + * Handles a case deletion event. * - * @param event A case deleted event from another auto ingest node. + * @param event A job/case prioritization event. */ - private void handleRemoteCaseDeletedEvent(AutoIngestCaseDeletedEvent event) { - String hostName = event.getNodeName(); - hostNamesToLastMsgTime.put(hostName, Instant.now()); - scanInputDirsNow(); - setChanged(); - notifyObservers(Event.CASE_DELETED); + private void handleCaseDeletedEvent(AutoIngestCaseDeletedEvent event) { + coordSvcQueryExecutor.submit(new CoordinationServiceQueryTask()); } /** - * Gets snapshots of the pending jobs queue, running jobs list, and - * completed jobs list. Any of these collection can be excluded by passing a - * null for the correspioding in/out list parameter. + * Gets the auto ingest monitor's current snapshot of the pending jobs + * queue, running jobs list, and completed jobs list for an auto ingest + * cluster. * - * @param pendingJobs A list to be populated with pending jobs, can be - * null. - * @param runningJobs A list to be populated with running jobs, can be - * null. - * @param completedJobs A list to be populated with competed jobs, can be - * null. + * @return The snapshot. */ - void getJobs(List pendingJobs, List runningJobs, List completedJobs) { + JobsSnapshot getJobsSnapshot() { synchronized (jobsLock) { - if (null != pendingJobs) { - pendingJobs.clear(); - pendingJobs.addAll(this.pendingJobs); - } - if (null != runningJobs) { - runningJobs.clear(); - if (null != currentJob) { - runningJobs.add(currentJob); - } - for (AutoIngestJob job : hostNamesToRunningJobs.values()) { - runningJobs.add(job); - runningJobs.sort(new AutoIngestJob.AlphabeticalComparator()); - } - } - if (null != completedJobs) { - completedJobs.clear(); - completedJobs.addAll(this.completedJobs); - } + return jobsSnapshot; } } /** - * An instance of this runnable is responsible for periodically sending auto - * ingest job status event to remote auto ingest nodes and timing out stale - * remote jobs. The auto ingest job status event is sent only if auto ingest - * monitor has a currently running auto ingest job. + * Makes the auto ingest monitor's refresh its current snapshot of the + * pending jobs queue, running jobs list, and completed jobs list for an + * auto ingest cluster. + * + * @return The refreshed snapshot. */ - private final class PeriodicJobStatusEventTask implements Runnable { - - private final long MAX_SECONDS_WITHOUT_UPDATE = JOB_STATUS_EVENT_INTERVAL_SECONDS * MAX_MISSED_JOB_STATUS_UPDATES; - - private PeriodicJobStatusEventTask() { - SYS_LOGGER.log(Level.INFO, "Periodic status publishing task started"); + JobsSnapshot refreshJobsSnapshot() { + JobsSnapshot newJobsSnapshot = queryCoordinationService(); + synchronized (jobsLock) { + jobsSnapshot = newJobsSnapshot; + return jobsSnapshot; } + } - @Override - public void run() { - - try { - synchronized (jobsLock) { - if (currentJob != null) { - setChanged(); - notifyObservers(AutoIngestMonitor.Event.JOB_STATUS_UPDATED); - eventPublisher.publishRemotely(new AutoIngestJobStatusEvent(currentJob)); - } - - if (AutoIngestUserPreferences.getStatusDatabaseLoggingEnabled()) { - String message; - boolean isError = false; - if (getErrorState().equals(AutoIngestMonitor.ErrorState.NONE)) { - if (currentJob != null) { - message = "Processing " + currentJob.getNodeData().getDataSourceFileName() - + " for case " + currentJob.getNodeData().getCaseName(); - } else { - message = "Paused or waiting for next case"; - } - } else { - message = getErrorState().toString(); - isError = true; - } - try { - StatusDatabaseLogger.logToStatusDatabase(message, isError); - } catch (SQLException | UserPreferencesException ex) { - SYS_LOGGER.log(Level.WARNING, "Failed to update status database", ex); - } - } - } - - // check whether any remote nodes have timed out - for (AutoIngestJob job : hostNamesToRunningJobs.values()) { - if (isStale(hostNamesToLastMsgTime.get(job.getNodeName()))) { - // remove the job from remote job running map. - /* - * NOTE: there is theoretically a check-then-act race - * condition but I don't it's worth introducing another - * lock because of it. If a job status update is - * received after we check the last message fileTime - * stamp (i.e. "check") but before we remove the remote - * job (i.e. "act") then the remote job will get added - * back into hostNamesToRunningJobs as a result of - * processing the job status update. - */ - SYS_LOGGER.log(Level.WARNING, "Auto ingest node {0} timed out while processing folder {1}", - new Object[]{job.getNodeName(), job.getNodeData().getManifestFilePath().toString()}); - hostNamesToRunningJobs.remove(job.getNodeName()); - setChanged(); - notifyObservers(AutoIngestMonitor.Event.JOB_COMPLETED); - } - } - - } catch (Exception ex) { - SYS_LOGGER.log(Level.SEVERE, "Unexpected exception in PeriodicJobStatusEventTask", ex); //NON-NLS + /** + * Gets a new snapshot of the pending jobs queue, running jobs list, and + * completed jobs list for an auto ingest cluster. + * + * @return The snapshot. + */ + private JobsSnapshot queryCoordinationService() { + JobsSnapshot newJobsSnapshot = new JobsSnapshot(); + List nodeList; + try { + nodeList = coordinationService.getNodeList(CoordinationService.CategoryNode.MANIFESTS); + for (String node : nodeList) { + // DLG: Do not need a lock here + // DLG: Get the node data and construct a AutoIngestJobNodeData object (rename ManifestNodeData => AutoIngestJobData) + // DLG: Construct an AutoIngestJob object from the } + return newJobsSnapshot; + } catch (CoordinationServiceException ex) { + LOGGER.log(Level.SEVERE, "Failed to get node list from coordination service", ex); } - - /** - * Determines whether or not the fileTime since the last message from - * node is greater than the maximum acceptable interval between - * messages. - * - * @return True or false. - */ - boolean isStale(Instant lastUpdateTime) { - return (Duration.between(lastUpdateTime, Instant.now()).toMillis() / 1000 > MAX_SECONDS_WITHOUT_UPDATE); - } - } - - /** - * Triggers an immediate scan of the input directories. - */ - void scanInputDirsNow() { - if (State.RUNNING != state) { - return; - } - inputScanExecutor.submit(new InputDirScanTask()); - } - - /** - * Start a scan of the input directories and wait for scan to complete. - */ - void scanInputDirsAndWait() { - if (State.RUNNING != state) { - return; - } - SYS_LOGGER.log(Level.INFO, "Starting input scan of {0}", rootInputDirectory); - InputDirScanner scanner = new InputDirScanner(); - scanner.scan(); - SYS_LOGGER.log(Level.INFO, "Completed input scan of {0}", rootInputDirectory); } /** @@ -461,11 +257,6 @@ public final class AutoIngestMonitor extends Observable implements PropertyChang * @param caseName The name of the case to be prioritized. */ void prioritizeCase(final String caseName) { - - if (state != State.RUNNING) { - return; - } - List prioritizedJobs = new ArrayList<>(); int maxPriority = 0; synchronized (jobsLock) { @@ -486,11 +277,8 @@ public final class AutoIngestMonitor extends Observable implements PropertyChang nodeData.setPriority(maxPriority); coordinationService.setNodeData(CoordinationService.CategoryNode.MANIFESTS, manifestNodePath, nodeData.toArray()); } catch (ManifestNodeDataException ex) { - SYS_LOGGER.log(Level.WARNING, String.format("Unable to use node data for %s", manifestNodePath), ex); } catch (CoordinationServiceException ex) { - SYS_LOGGER.log(Level.SEVERE, String.format("Coordination service error while prioritizing %s", manifestNodePath), ex); } catch (InterruptedException ex) { - SYS_LOGGER.log(Level.SEVERE, "Unexpected interrupt while updating coordination service node data for {0}", manifestNodePath); } job.getNodeData().setPriority(maxPriority); } @@ -512,10 +300,6 @@ public final class AutoIngestMonitor extends Observable implements PropertyChang * @param manifestPath The manifest file path for the job to be prioritized. */ void prioritizeJob(Path manifestPath) { - if (state != State.RUNNING) { - return; - } - int maxPriority = 0; AutoIngestJob prioritizedJob = null; synchronized (jobsLock) { @@ -535,11 +319,8 @@ public final class AutoIngestMonitor extends Observable implements PropertyChang nodeData.setPriority(maxPriority); coordinationService.setNodeData(CoordinationService.CategoryNode.MANIFESTS, manifestNodePath, nodeData.toArray()); } catch (ManifestNodeDataException ex) { - SYS_LOGGER.log(Level.WARNING, String.format("Unable to use node data for %s", manifestPath), ex); } catch (CoordinationServiceException ex) { - SYS_LOGGER.log(Level.SEVERE, String.format("Coordination service error while prioritizing %s", manifestNodePath), ex); } catch (InterruptedException ex) { - SYS_LOGGER.log(Level.SEVERE, "Unexpected interrupt while updating coordination service node data for {0}", manifestNodePath); } prioritizedJob.getNodeData().setPriority(maxPriority); } @@ -556,354 +337,34 @@ public final class AutoIngestMonitor extends Observable implements PropertyChang } /** - * Get the current snapshot of the job lists. - * - * @return Snapshot of jobs lists + * A task that queries the coordination service for auto ingest manifest + * node data and converts it to auto ingest jobs for publication top its + * observers. */ - JobsSnapshot getCurrentJobsSnapshot() { - synchronized (jobsLock) { - List runningJobs = new ArrayList<>(); - getJobs(null, runningJobs, null); - return new JobsSnapshot(pendingJobs, runningJobs, completedJobs); - } - } - - /** - * A task that submits an input directory scan task to the input directory - * scan task executor. - */ - private final class InputDirScanSchedulingTask implements Runnable { + private final class CoordinationServiceQueryTask implements Runnable { /** - * Constructs a task that submits an input directory scan task to the - * input directory scan task executor. - */ - private InputDirScanSchedulingTask() { - SYS_LOGGER.log(Level.INFO, "Periodic input scan scheduling task started"); - } - - /** - * Submits an input directory scan task to the input directory scan task - * executor. + * Queries the coordination service for auto ingest manifest node data + * and converts it to auto ingest jobs for publication top its + * observers. */ @Override public void run() { - scanInputDirsNow(); - } - } - - /** - * A task that scans the input directory tree and refreshes the pending jobs - * queue and the completed jobs list. Crashed job recovery is perfomed as - * needed. - */ - private final class InputDirScanTask implements Callable { - - /** - * Scans the input directory tree and refreshes the pending jobs queue - * and the completed jobs list. Crashed job recovery is performed as - * needed. - */ - @Override - public Void call() throws Exception { if (Thread.currentThread().isInterrupted()) { - return null; - } - SYS_LOGGER.log(Level.INFO, "Starting input scan of {0}", rootInputDirectory); - InputDirScanner scanner = new InputDirScanner(); - scanner.scan(); - SYS_LOGGER.log(Level.INFO, "Completed input scan of {0}", rootInputDirectory); - setChanged(); - notifyObservers(Event.INPUT_SCAN_COMPLETED); - return null; - } - - } - - /** - * A FileVisitor that searches the input directories for manifest files. The - * search results are used to refresh the pending jobs queue and the - * completed jobs list. Crashed job recovery is performed as needed. - */ - private final class InputDirScanner { // DLG: Replace with task that calls regularly / reusable by refresh - - private final List newPendingJobsList = new ArrayList<>(); - private final List newCompletedJobsList = new ArrayList<>(); - - /** - * Searches the input directories for manifest files. The search results - * are used to refresh the pending jobs queue and the completed jobs - * list. - */ - private void scan() { - synchronized (jobsLock) { - if (Thread.currentThread().isInterrupted()) { - return; + JobsSnapshot newJobsSnapshot = queryCoordinationService(); + synchronized (jobsLock) { + jobsSnapshot = newJobsSnapshot; } - try { - newPendingJobsList.clear(); - newCompletedJobsList.clear(); - - List manifestList = coordinationService.getNodeList( - CoordinationService.CategoryNode.MANIFESTS); - - for(int i=0; i < manifestList.size(); i++) { - visitFile(Paths.get(manifestList.get(i))); // DLG: Just call CoordinationService - } - - Collections.sort(newPendingJobsList, new AutoIngestJob.PriorityComparator()); - AutoIngestMonitor.this.pendingJobs = newPendingJobsList; - AutoIngestMonitor.this.completedJobs = newCompletedJobsList; - - } catch (Exception ex) { - /* - * NOTE: Need to catch all exceptions here. Otherwise - * uncaught exceptions will propagate up to the calling - * thread and may stop it from running. - */ - SYS_LOGGER.log(Level.SEVERE, String.format("Error scanning the input directory %s", rootInputDirectory), ex); - } - } - synchronized (scanMonitor) { - scanMonitor.notify(); - } - } - - private FileVisitResult visitFile(Path filePath) throws IOException { - if (Thread.currentThread().isInterrupted()) { - return TERMINATE; - } - - Manifest manifest = parseManifestFile(filePath); // DLG: No longer use this - - if (Thread.currentThread().isInterrupted()) { - return TERMINATE; - } - - if(manifest != null) { - String caseName = manifest.getCaseName(); - Path manifestPath = manifest.getFilePath(); - - if (casesToManifests.containsKey(caseName)) { - Set manifestPathSet = casesToManifests.get(caseName); - manifestPathSet.add(manifestPath); - } else { - Set manifestPathSet = new HashSet<>(); - manifestPathSet.add(manifestPath); - casesToManifests.put(caseName, manifestPathSet); - } - - /* - * Add a job to the pending jobs queue, the completed jobs list, - * or do crashed job recovery, as required. - */ - try { - addJob(manifest); - } catch (CoordinationService.CoordinationServiceException ex) { - SYS_LOGGER.log(Level.SEVERE, String.format("Error getting node data for %s", manifestPath), ex); - return CONTINUE; - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - return TERMINATE; - } - } - - if (!Thread.currentThread().isInterrupted()) { - return CONTINUE; - } - - return TERMINATE; - } - - private Manifest parseManifestFile(Path filePath) throws IOException { - Manifest manifest = null; - - for (ManifestFileParser parser : Lookup.getDefault().lookupAll(ManifestFileParser.class)) { - if (parser.fileIsManifest(filePath)) { - try { - manifest = parser.parse(filePath); - break; - } catch (ManifestFileParser.ManifestFileParserException ex) { - SYS_LOGGER.log(Level.SEVERE, String.format("Error attempting to parse %s with parser %s", filePath, parser.getClass().getCanonicalName()), ex); - } - } - } - - return manifest; - } - - /** - * Add a job to the pending jobs queue, the completed jobs list, - * or do crashed job recovery, as required. - */ - private void addJob(Manifest manifest) throws CoordinationService.CoordinationServiceException, InterruptedException { - Path manifestPath = manifest.getFilePath(); - - byte[] rawData = coordinationService.getNodeData(CoordinationService.CategoryNode.MANIFESTS, manifestPath.toString()); - if (null != rawData) { - try { - ManifestNodeData nodeData = new ManifestNodeData(rawData); - if (nodeData.coordSvcNodeDataWasSet()) { - ManifestNodeData.ProcessingStatus processingStatus = nodeData.getStatus(); - switch (processingStatus) { - case PENDING: - addPendingJob(nodeData); - break; - case PROCESSING: - doRecoveryIfCrashed(nodeData); - break; - case COMPLETED: - addCompletedJob(nodeData); - break; - case DELETED: - // Do nothing - we dont'want to add it to any job list or do recovery - break; - default: - SYS_LOGGER.log(Level.SEVERE, "Unknown ManifestNodeData.ProcessingStatus"); - break; - } - } else { - addNewPendingJob(manifest); - } - } catch(ManifestNodeDataException ex) { - SYS_LOGGER.log(Level.WARNING, String.format("Unable to use node data for %s", manifestPath), ex); - } - } else { - addNewPendingJob(manifest); + setChanged(); + notifyObservers(Event.INPUT_SCAN_COMPLETED); // RJCTODO: Change this event name } } - /** - * Adds a job to process a manifest to the pending jobs queue. - * - * @param nodeData The data stored in the coordination service node for - * the manifest. - */ - private void addPendingJob(ManifestNodeData nodeData) { - Path caseDirectory = PathUtils.findCaseDirectory(rootOutputDirectory, nodeData.getCaseName()); - nodeData.setCompletedDate(new Date(0)); - nodeData.setErrorsOccurred(false); - newPendingJobsList.add(new AutoIngestJob(nodeData, caseDirectory, LOCAL_HOST_NAME, AutoIngestJob.Stage.PENDING)); - } - - /** - * Adds a job to process a manifest to the pending jobs queue. - * - * @param manifest The manifest. - * - * @throws InterruptedException if the thread running the input - * directory scan task is interrupted while - * blocked, i.e., if auto ingest is - * shutting down. - */ - private void addNewPendingJob(Manifest manifest) throws InterruptedException { - // TODO (JIRA-1960): This is something of a hack, grabbing the lock to create the node. - // Is use of Curator.create().forPath() possible instead? - try (CoordinationService.Lock manifestLock = coordinationService.tryGetExclusiveLock(CoordinationService.CategoryNode.MANIFESTS, manifest.getFilePath().toString())) { - if (null != manifestLock) { - ManifestNodeData newNodeData = new ManifestNodeData(manifest, PENDING, DEFAULT_JOB_PRIORITY, 0, new Date(0), false); - coordinationService.setNodeData(CoordinationService.CategoryNode.MANIFESTS, manifest.getFilePath().toString(), newNodeData.toArray()); - newPendingJobsList.add(new AutoIngestJob(newNodeData, null, LOCAL_HOST_NAME, AutoIngestJob.Stage.PENDING)); - } - } catch (CoordinationService.CoordinationServiceException ex) { - SYS_LOGGER.log(Level.SEVERE, String.format("Error attempting to set node data for %s", manifest.getFilePath()), ex); - } - } - - /** - * Does crash recovery for a manifest, if required. The criterion for - * crash recovery is a manifest with coordination service node data - * indicating it is being processed for which an exclusive lock on the - * node can be acquired. If this condition is true, it is probable that - * the node that was processing the job crashed and the processing - * status was not updated. - * - * @param nodeData - * - * @throws InterruptedException if the thread running the input - * directory scan task is interrupted while - * blocked, i.e., if auto ingest is - * shutting down. - */ - private void doRecoveryIfCrashed(ManifestNodeData nodeData) throws InterruptedException { - String manifestPath = nodeData.getManifestFilePath().toString(); - if (nodeData.coordSvcNodeDataWasSet() && ManifestNodeData.ProcessingStatus.PROCESSING == nodeData.getStatus()) { - SYS_LOGGER.log(Level.SEVERE, "Attempting crash recovery for {0}", manifestPath); - int numberOfCrashes = nodeData.getNumberOfCrashes(); - ++numberOfCrashes; - nodeData.setNumberOfCrashes(numberOfCrashes); - nodeData.setCompletedDate(new Date(0)); - nodeData.setErrorsOccurred(true); - if (numberOfCrashes <= AutoIngestUserPreferences.getMaxNumTimesToProcessImage()) { - nodeData.setStatus(PENDING); - Path caseDirectoryPath = PathUtils.findCaseDirectory(rootOutputDirectory, nodeData.getCaseName()); - newPendingJobsList.add(new AutoIngestJob(nodeData, caseDirectoryPath, LOCAL_HOST_NAME, AutoIngestJob.Stage.PENDING)); - if (null != caseDirectoryPath) { - try { - AutoIngestAlertFile.create(caseDirectoryPath); - } catch (AutoIngestAlertFile.AutoIngestAlertFileException ex) { - SYS_LOGGER.log(Level.SEVERE, String.format("Error creating alert file for crashed job for %s", manifestPath), ex); - } - try { - new AutoIngestJobLogger(nodeData.getManifestFilePath(), nodeData.getDataSourceFileName(), caseDirectoryPath).logCrashRecoveryWithRetry(); - } catch (AutoIngestJobLogger.AutoIngestJobLoggerException ex) { - SYS_LOGGER.log(Level.SEVERE, String.format("Error creating case auto ingest log entry for crashed job for %s", manifestPath), ex); - } - } - } else { - nodeData.setStatus(COMPLETED); - Path caseDirectoryPath = PathUtils.findCaseDirectory(rootOutputDirectory, nodeData.getCaseName()); - newCompletedJobsList.add(new AutoIngestJob(nodeData, caseDirectoryPath, LOCAL_HOST_NAME, AutoIngestJob.Stage.COMPLETED)); - if (null != caseDirectoryPath) { - try { - AutoIngestAlertFile.create(caseDirectoryPath); - } catch (AutoIngestAlertFile.AutoIngestAlertFileException ex) { - SYS_LOGGER.log(Level.SEVERE, String.format("Error creating alert file for crashed job for %s", manifestPath), ex); - } - try { - new AutoIngestJobLogger(nodeData.getManifestFilePath(), nodeData.getDataSourceFileName(), caseDirectoryPath).logCrashRecoveryNoRetry(); - } catch (AutoIngestJobLogger.AutoIngestJobLoggerException ex) { - SYS_LOGGER.log(Level.SEVERE, String.format("Error creating case auto ingest log entry for crashed job for %s", manifestPath), ex); - } - } - } - try { - coordinationService.setNodeData(CoordinationService.CategoryNode.MANIFESTS, manifestPath, nodeData.toArray()); - } catch (CoordinationService.CoordinationServiceException ex) { - SYS_LOGGER.log(Level.SEVERE, String.format("Error attempting to set node data for %s", manifestPath), ex); - } - } - } - - /** - * Adds a job to process a manifest to the completed jobs list. - * - * @param nodeData The data stored in the coordination service node for - * the manifest. - */ - private void addCompletedJob(ManifestNodeData nodeData) { - Path caseDirectoryPath = PathUtils.findCaseDirectory(rootOutputDirectory, nodeData.getCaseName()); - if (null != caseDirectoryPath) { - newCompletedJobsList.add(new AutoIngestJob(nodeData, caseDirectoryPath, LOCAL_HOST_NAME, AutoIngestJob.Stage.COMPLETED)); - } else { - SYS_LOGGER.log(Level.WARNING, String.format("Job completed for %s, but cannot find case directory, ignoring job", nodeData.getManifestFilePath())); - } - } - } - - /* - * The possible states of an auto ingest monitor. - */ - private enum State { - IDLE, - RUNNING, - SHUTTING_DOWN; } /* * Events published by an auto ingest monitor. The events are published - * locally to auto ingest monitor clients that register as observers and are - * broadcast to other auto ingest nodes. + * locally to auto ingest monitor clients that register as observers. */ enum Event { @@ -912,104 +373,153 @@ public final class AutoIngestMonitor extends Observable implements PropertyChang JOB_STATUS_UPDATED, JOB_COMPLETED, CASE_PRIORITIZED, - CASE_DELETED, - PAUSED_BY_REQUEST, - PAUSED_FOR_SYSTEM_ERROR, - RESUMED - } - - /** - * The current auto ingest error state. - */ - private enum ErrorState { - NONE("None"), - COORDINATION_SERVICE_ERROR("Coordination service error"), - SHARED_CONFIGURATION_DOWNLOAD_ERROR("Shared configuration download error"), - SERVICES_MONITOR_COMMUNICATION_ERROR("Services monitor communication error"), - DATABASE_SERVER_ERROR("Database server error"), - KEYWORD_SEARCH_SERVER_ERROR("Keyword search server error"), - CASE_MANAGEMENT_ERROR("Case management error"), - ANALYSIS_STARTUP_ERROR("Analysis startup error"), - FILE_EXPORT_ERROR("File export error"), - ALERT_FILE_ERROR("Alert file error"), - JOB_LOGGER_ERROR("Job logger error"), - DATA_SOURCE_PROCESSOR_ERROR("Data source processor error"), - UNEXPECTED_EXCEPTION("Unknown error"); - - private final String desc; - - private ErrorState(String desc) { - this.desc = desc; - } - - @Override - public String toString() { - return desc; - } + CASE_DELETED } /** * A snapshot of the pending jobs queue, running jobs list, and completed - * jobs list. + * jobs list for an auto ingest cluster. */ static final class JobsSnapshot { - private final List pendingJobs; - private final List runningJobs; - private final List completedJobs; + private final Set pendingJobs = new HashSet<>(); + private final Set runningJobs = new HashSet<>(); + private final Set completedJobs = new HashSet<>(); /** - * Constructs a snapshot of the pending jobs queue, running jobs list, - * and completed jobs list. + * Gets the snapshot of the pending jobs queue for an auto ingest + * cluster. * - * @param pendingJobs The pending jobs queue. - * @param runningJobs The running jobs list. - * @param completedJobs The cmopleted jobs list. - */ - private JobsSnapshot(List pendingJobs, List runningJobs, List completedJobs) { - this.pendingJobs = new ArrayList<>(pendingJobs); - this.runningJobs = new ArrayList<>(runningJobs); - this.completedJobs = new ArrayList<>(completedJobs); - } - - /** - * Gets the snapshot of the pending jobs queue. - * - * @return The jobs collection. + * @return The pending jobs queue. */ List getPendingJobs() { - return Collections.unmodifiableList(this.pendingJobs); + return new ArrayList<>(this.pendingJobs); } /** - * Gets the snapshot of the running jobs list. + * Gets the snapshot of the running jobs list for an auto ingest + * cluster. * - * @return The jobs collection. + * @return The running jobs list. */ List getRunningJobs() { - return Collections.unmodifiableList(this.runningJobs); + return new ArrayList<>(this.runningJobs); } /** - * Gets the snapshot of the completed jobs list. + * Gets the snapshot of the completed jobs list for an auto ingest + * cluster. * - * @return The jobs collection. + * @return The completed jobs list. */ List getCompletedJobs() { - return Collections.unmodifiableList(this.completedJobs); + return new ArrayList<>(this.completedJobs); + } + + /** + * Adds an auto job to the snapshot of the pending jobs queue for an + * auto ingest cluster. If an equivalent job already exists, it is + * removed. + * + * @param job The job. + */ + private void addOrReplacePendingJob(AutoIngestJob job) { + addOrReplaceJob(this.pendingJobs, job); + } + + /** + * Removes a job, if present, in the snapshot of the pending jobs queue + * for an auto ingest cluster. + * + * @param job The auot ingest job. + */ + private void removePendingJob(AutoIngestJob job) { + this.pendingJobs.remove(job); + } + + /** + * Adds an auto job to the snapshot of the running jobs list for an auto + * ingest cluster. If an equivalent job already exists, it is removed. + * + * @param job The job. + */ + private void addOrReplaceRunningJob(AutoIngestJob job) { + addOrReplaceJob(this.runningJobs, job); + } + + /** + * Removes a job, if present, in the snapshot of the running jobs list + * for an auto ingest cluster. + * + * @param job The auot ingest job. + */ + private void removeRunningJob(AutoIngestJob job) { + this.runningJobs.remove(job); + } + + /** + * Adds an auto job to the snapshot of the completed jobs list for an + * auto ingest cluster. If an equivalent job already exists, it is + * removed. + * + * @param job The job. + */ + private void addOrReplaceCompletedJob(AutoIngestJob job) { + addOrReplaceJob(this.completedJobs, job); + } + + /** + * Removes a job, if present, in the snapshot of the completed jobs list + * for an auto ingest cluster. + * + * @param job The auot ingest job. + */ + private void removeCompletedJob(AutoIngestJob job) { + this.pendingJobs.remove(job); + } + + /** + * Adds a job to a set. If an equivalent job already exists, it is + * removed. + * + * @param jobSet A set of auto ingest jobs. + * @param job The auto ingest job to add. + */ + private static void addOrReplaceJob(Set jobSet, AutoIngestJob job) { + if (jobSet.contains(job)) { + jobSet.remove(job); + } + jobSet.add(job); } } - static final class AutoIngestMonitorStartupException extends Exception { + /** + * Exception type thrown when there is an error completing an auto ingest + * monitor operation. + */ + static final class AutoIngestMonitorException extends Exception { private static final long serialVersionUID = 1L; - private AutoIngestMonitorStartupException(String message) { + /** + * Constructs an instance of the exception type thrown when there is an + * error completing an auto ingest monitor operation. + * + * @param message The exception message. + */ + private AutoIngestMonitorException(String message) { super(message); } - private AutoIngestMonitorStartupException(String message, Throwable cause) { + /** + * Constructs an instance of the exception type thrown when there is an + * error completing an auto ingest monitor operation. + * + * @param message The exception message. + * @param cause A Throwable cause for the error. + */ + private AutoIngestMonitorException(String message, Throwable cause) { super(message, cause); }