From 51f080de9aef1902615008dc4e07f26f6d1cca56 Mon Sep 17 00:00:00 2001 From: Richard Cordovano Date: Tue, 9 Dec 2014 09:55:31 -0500 Subject: [PATCH 1/3] Remove dead code from IngestManager and start refactoring --- .../autopsy/ingest/IngestManager.java | 38 +++++-------------- 1 file changed, 9 insertions(+), 29 deletions(-) diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java index a336dd9bd9..3027002b3f 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java @@ -70,11 +70,18 @@ public class IngestManager { private final AtomicLong ingestErrorMessagePosts = new AtomicLong(0L); private final ConcurrentHashMap ingestThreadActivitySnapshots = new ConcurrentHashMap<>(); // Maps ingest thread ids to progress ingestThreadActivitySnapshots. private final ConcurrentHashMap ingestModuleRunTimes = new ConcurrentHashMap<>(); - private final Object processedFilesSnapshotLock = new Object(); - private ProcessedFilesSnapshot processedFilesSnapshot = new ProcessedFilesSnapshot(); private volatile IngestMessageTopComponent ingestMessageBox; private int numberOfFileIngestThreads = DEFAULT_NUMBER_OF_FILE_INGEST_THREADS; + /** + * These static fields are used for the creation and management of ingest + * jobs in progress. + */ + private static volatile boolean jobCreationIsEnabled; + private static final AtomicLong nextJobId = new AtomicLong(0L); + private static final ConcurrentHashMap jobsById = new ConcurrentHashMap<>(); + + /** * Ingest job events. 
*/ @@ -388,10 +395,6 @@ public class IngestManager { IngestThreadActivitySnapshot prevSnap = ingestThreadActivitySnapshots.get(task.getThreadId()); IngestThreadActivitySnapshot newSnap = new IngestThreadActivitySnapshot(task.getThreadId()); ingestThreadActivitySnapshots.put(task.getThreadId(), newSnap); - synchronized (processedFilesSnapshotLock) { - processedFilesSnapshot.incrementProcessedFilesCount(); - } - incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime()); } @@ -782,27 +785,4 @@ public class IngestManager { } } - static final class ProcessedFilesSnapshot { - - private final Date startTime; - private long processedFilesCount; - - ProcessedFilesSnapshot() { - this.startTime = new Date(); - this.processedFilesCount = 0; - } - - void incrementProcessedFilesCount() { - ++processedFilesCount; - } - - Date getStartTime() { - return startTime; - } - - long getProcessedFilesCount() { - return processedFilesCount; - } - } - } From fb12ca7adc309e087538ff09eed750771c2d44b0 Mon Sep 17 00:00:00 2001 From: Richard Cordovano Date: Tue, 9 Dec 2014 14:38:22 -0500 Subject: [PATCH 2/3] Refactor IngestJob and IngestManager in preparation for API work --- .../sleuthkit/autopsy/ingest/IngestJob.java | 135 ++++------------ .../autopsy/ingest/IngestManager.java | 147 ++++++++++++++---- .../ingest/IngestProgressSnapshotPanel.java | 2 +- 3 files changed, 154 insertions(+), 130 deletions(-) diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java index cd1e6e3480..98dbbb3930 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import 
javax.swing.JOptionPane; import org.netbeans.api.progress.ProgressHandle; @@ -45,35 +44,16 @@ final class IngestJob { private static final Logger logger = Logger.getLogger(IngestJob.class.getName()); /** - * The task scheduler singleton is responsible for creating and scheduling - * the ingest tasks that make up ingest jobs. - */ - private static final IngestTasksScheduler taskScheduler = IngestTasksScheduler.getInstance(); - - /** - * These static fields are used for the creation and management of ingest - * jobs in progress. - */ - private static volatile boolean jobCreationIsEnabled; - private static final AtomicLong nextJobId = new AtomicLong(0L); - private static final ConcurrentHashMap jobsById = new ConcurrentHashMap<>(); - - /** - * These fields define the ingest job, including its ingest pipelines. Note - * that there is a collection of multiple copies of the file ingest - * pipeline, one for each file ingest thread. + * These fields define the ingest job: a unique ID supplied by the ingest + * manager, the user's ingest job settings, and the data source to be + * processed. */ private final long id; - private final Content dataSource; private final IngestJobSettings ingestJobSettings; - private final Object dataSourceIngestPipelineLock; - private DataSourceIngestPipeline firstStageDataSourceIngestPipeline; - private DataSourceIngestPipeline secondStageDataSourceIngestPipeline; - private DataSourceIngestPipeline currentDataSourceIngestPipeline; - private final LinkedBlockingQueue fileIngestPipelines; + private final Content dataSource; /** - * An ingest runs in stages. + * An ingest job runs in stages. */ private static enum Stages { @@ -99,6 +79,28 @@ final class IngestJob { private Stages stage; private final Object stageCompletionCheckLock; + /** + * There is a data source level ingest modules pipeline for both the first + * stage and the second stage. 
Longer running, lower priority data source + * level ingest modules belong in the second stage pipeline. + */ + private final Object dataSourceIngestPipelineLock; + private DataSourceIngestPipeline firstStageDataSourceIngestPipeline; + private DataSourceIngestPipeline secondStageDataSourceIngestPipeline; + private DataSourceIngestPipeline currentDataSourceIngestPipeline; + + /** + * There is a collection of identical file level ingest module pipelines, + * one for each file level ingest thread in the ingest manager. + */ + private final LinkedBlockingQueue fileIngestPipelines; + + /** + * The task scheduler singleton is responsible for creating and scheduling + * the ingest tasks that make up this ingest job. + */ + private static final IngestTasksScheduler taskScheduler = IngestTasksScheduler.getInstance(); + /** * These fields are used to provide data source level task progress bars for * the job. @@ -128,70 +130,6 @@ final class IngestJob { */ private final long startTime; - /** - * Enables and disables ingest job creation. - * - * @param enabled True or false. - */ - static void jobCreationEnabled(boolean enabled) { - IngestJob.jobCreationIsEnabled = enabled; - } - - /** - * Starts an ingest job for a data source. - * - * @param dataSource The data source to ingest. - * @param settings The settings for the job. - * @return A collection of ingest module start up errors, empty on success. 
- */ - static List startJob(Content dataSource, IngestJobSettings settings) { - List errors = new ArrayList<>(); - if (IngestJob.jobCreationIsEnabled) { - long jobId = nextJobId.incrementAndGet(); - IngestJob job = new IngestJob(jobId, dataSource, settings); - IngestJob.jobsById.put(jobId, job); - errors = job.start(settings.getEnabledIngestModuleTemplates()); - if (errors.isEmpty() && job.hasIngestPipeline()) { - IngestManager.getInstance().fireIngestJobStarted(jobId); - IngestJob.logger.log(Level.INFO, "Ingest job {0} started", jobId); - } else { - IngestJob.jobsById.remove(jobId); - } - } - return errors; - } - - /** - * Queries whether or not ingest jobs are running. - * - * @return True or false. - */ - static boolean ingestJobsAreRunning() { - return !jobsById.isEmpty(); - } - - /** - * Gets snapshots of the state of all running ingest jobs. - * - * @return A list of ingest job state snapshots. - */ - static List getJobSnapshots() { - List snapShots = new ArrayList<>(); - for (IngestJob job : IngestJob.jobsById.values()) { - snapShots.add(job.getSnapshot()); - } - return snapShots; - } - - /** - * Cancels all running ingest jobs. - */ - static void cancelAllJobs() { - for (IngestJob job : jobsById.values()) { - job.cancel(); - } - } - /** * Constructs an ingest job. * @@ -200,10 +138,9 @@ final class IngestJob { * @param processUnallocatedSpace Whether or not unallocated space should be * processed during the ingest job. */ - private IngestJob(long id, Content dataSource, IngestJobSettings settings) { + IngestJob(long id, Content dataSource, IngestJobSettings settings) { this.id = id; this.dataSource = dataSource; - this.ingestJobSettings = settings; this.dataSourceIngestPipelineLock = new Object(); this.fileIngestPipelines = new LinkedBlockingQueue<>(); @@ -575,7 +512,7 @@ final class IngestJob { * * @return A collection of ingest module startup errors, empty on success. 
*/ - private List start(List ingestModuleTemplates) { + List start(List ingestModuleTemplates) { this.createIngestPipelines(ingestModuleTemplates); List errors = startUpIngestPipelines(); if (errors.isEmpty()) { @@ -738,7 +675,7 @@ final class IngestJob { * * @return True or false. */ - private boolean hasIngestPipeline() { + boolean hasIngestPipeline() { return this.hasFirstStageDataSourceIngestPipeline() || this.hasFileIngestPipeline() || this.hasSecondStageDataSourceIngestPipeline(); @@ -952,14 +889,7 @@ final class IngestJob { } } - IngestJob.jobsById.remove(this.id); - if (!this.isCancelled()) { - logger.log(Level.INFO, "Ingest job {0} completed", this.id); - IngestManager.getInstance().fireIngestJobCompleted(this.id); - } else { - logger.log(Level.INFO, "Ingest job {0} cancelled", this.id); - IngestManager.getInstance().fireIngestJobCancelled(this.id); - } + IngestManager.getInstance().finishJob(this); } /** @@ -986,9 +916,8 @@ final class IngestJob { * * @return An ingest job statistics object. */ - private IngestJobSnapshot getSnapshot() { + IngestJobSnapshot getSnapshot() { return new IngestJobSnapshot(); - } /** diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java index 3027002b3f..20480b92a9 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java @@ -52,36 +52,80 @@ import org.sleuthkit.datamodel.Content; */ public class IngestManager { + private static final Logger logger = Logger.getLogger(IngestManager.class.getName()); + private static IngestManager instance = null; + + /** + * When ingest job creation is enabled, the ingest manager assigns a unique + * ID to each ingest job and maintains a mapping of job IDs to jobs. 
+ */ + private volatile boolean jobCreationIsEnabled; + private final AtomicLong nextJobId = new AtomicLong(0L); + private final ConcurrentHashMap jobsById = new ConcurrentHashMap<>(); + + /** + * Each runnable/callable task the ingest manager farms out to a thread pool + * is given a unique thread/task ID. + */ + private final AtomicLong nextThreadId = new AtomicLong(0L); + + /** + * Ingest jobs are started on a pool thread by ingest job starters. A + * mapping of thread/task IDs to the result objects associated with each + * ingest job starter is maintained to provide handles that can be used to + * cancel the ingest job starter. + */ + private final ConcurrentHashMap> ingestJobStarters = new ConcurrentHashMap<>(); + private final ExecutorService startIngestJobsThreadPool = Executors.newSingleThreadExecutor(); + + /** + * Ingest jobs use an ingest task scheduler to break themselves down into + * data source level and file level tasks. The ingest scheduler puts these + * ingest tasks into queues for execution on ingest manager pool threads by + * ingest task executers. There is a single data source level ingest thread + * and a user configurable number of file level ingest threads. 
+ */ private static final int MIN_NUMBER_OF_FILE_INGEST_THREADS = 1; private static final int MAX_NUMBER_OF_FILE_INGEST_THREADS = 16; private static final int DEFAULT_NUMBER_OF_FILE_INGEST_THREADS = 2; - private static final int MAX_ERROR_MESSAGE_POSTS = 200; - private static final Logger logger = Logger.getLogger(IngestManager.class.getName()); - private static IngestManager instance = null; - private final PropertyChangeSupport ingestJobEventPublisher = new PropertyChangeSupport(IngestManager.class); - private final PropertyChangeSupport ingestModuleEventPublisher = new PropertyChangeSupport(IngestManager.class); - private final IngestMonitor ingestMonitor = new IngestMonitor(); - private final ExecutorService startIngestJobsThreadPool = Executors.newSingleThreadExecutor(); + private int numberOfFileIngestThreads = DEFAULT_NUMBER_OF_FILE_INGEST_THREADS; private final ExecutorService dataSourceIngestThreadPool = Executors.newSingleThreadExecutor(); private final ExecutorService fileIngestThreadPool; - private final ExecutorService fireIngestEventsThreadPool = Executors.newSingleThreadExecutor(); - private final AtomicLong nextThreadId = new AtomicLong(0L); - private final ConcurrentHashMap> ingestJobStarters = new ConcurrentHashMap<>(); // Maps thread ids to cancellation handles. - private final AtomicLong ingestErrorMessagePosts = new AtomicLong(0L); - private final ConcurrentHashMap ingestThreadActivitySnapshots = new ConcurrentHashMap<>(); // Maps ingest thread ids to progress ingestThreadActivitySnapshots. - private final ConcurrentHashMap ingestModuleRunTimes = new ConcurrentHashMap<>(); - private volatile IngestMessageTopComponent ingestMessageBox; - private int numberOfFileIngestThreads = DEFAULT_NUMBER_OF_FILE_INGEST_THREADS; /** - * These static fields are used for the creation and management of ingest - * jobs in progress. + * The ingest manager uses the property change feature from Java Beans as an + * event publishing mechanism. 
There are two kinds of events, ingest job + * events and ingest module events. Property changes are fired by ingest + * event publishers on a pool thread. */ - private static volatile boolean jobCreationIsEnabled; - private static final AtomicLong nextJobId = new AtomicLong(0L); - private static final ConcurrentHashMap jobsById = new ConcurrentHashMap<>(); + private final PropertyChangeSupport ingestJobEventPublisher = new PropertyChangeSupport(IngestManager.class); + private final PropertyChangeSupport ingestModuleEventPublisher = new PropertyChangeSupport(IngestManager.class); + private final ExecutorService fireIngestEventsThreadPool = Executors.newSingleThreadExecutor(); + + /** + * The ingest manager uses an ingest monitor to determine when system + * resources are under pressure. If the monitor detects such a situation, it + * calls back to the ingest manager to cancel all ingest jobs in progress. + */ + private final IngestMonitor ingestMonitor = new IngestMonitor(); + + /** + * The ingest manager provides access to a top component that is used by + * ingest modules to post messages for the user. A count of the posts is used + * as a cap to avoid bogging down the application. + */ + private static final int MAX_ERROR_MESSAGE_POSTS = 200; + private volatile IngestMessageTopComponent ingestMessageBox; + private final AtomicLong ingestErrorMessagePosts = new AtomicLong(0L); + + /** + * The ingest manager supports reporting of ingest processing progress by + * collecting snapshots of the activities of the ingest threads, ingest job + * progress, and ingest module run times. + */ + private final ConcurrentHashMap ingestThreadActivitySnapshots = new ConcurrentHashMap<>(); // Maps ingest thread ids to progress ingestThreadActivitySnapshots. + private final ConcurrentHashMap ingestModuleRunTimes = new ConcurrentHashMap<>(); - /** * Ingest job events. */ @@ -192,7 +236,7 @@ public class IngestManager { * @return True or false. 
*/ public boolean isIngestRunning() { - return IngestJob.ingestJobsAreRunning(); + return !this.jobsById.isEmpty(); } /** @@ -205,7 +249,9 @@ public class IngestManager { } // Cancel all the jobs already created. - IngestJob.cancelAllJobs(); + for (IngestJob job : this.jobsById.values()) { + job.cancel(); + } } /** @@ -336,12 +382,12 @@ public class IngestManager { } void handleCaseOpened() { - IngestJob.jobCreationEnabled(true); + this.jobCreationIsEnabled = true; clearIngestMessageBox(); } void handleCaseClosed() { - IngestJob.jobCreationEnabled(false); + this.jobCreationIsEnabled = false; cancelAllIngestJobs(); clearIngestMessageBox(); } @@ -353,6 +399,42 @@ public class IngestManager { ingestErrorMessagePosts.set(0); } + /** + * Starts an ingest job for a data source. + * + * @param dataSource The data source to ingest. + * @param settings The settings for the job. + * @return A collection of ingest module start up errors, empty on success. + */ + private List startJob(Content dataSource, IngestJobSettings settings) { + List errors = new ArrayList<>(); + if (this.jobCreationIsEnabled) { + long jobId = this.nextJobId.incrementAndGet(); + IngestJob job = new IngestJob(jobId, dataSource, settings); + this.jobsById.put(jobId, job); + errors = job.start(settings.getEnabledIngestModuleTemplates()); + if (errors.isEmpty() && job.hasIngestPipeline()) { + this.fireIngestJobStarted(jobId); + IngestManager.logger.log(Level.INFO, "Ingest job {0} started", jobId); + } else { + this.jobsById.remove(jobId); + } + } + return errors; + } + + void finishJob(IngestJob job) { + long jobId = job.getId(); + this.jobsById.remove(jobId); + if (!job.isCancelled()) { + IngestManager.logger.log(Level.INFO, "Ingest job {0} completed", jobId); + this.fireIngestJobCompleted(jobId); + } else { + IngestManager.logger.log(Level.INFO, "Ingest job {0} cancelled", jobId); + this.fireIngestJobCancelled(jobId); + } + } + /** * Called each time a module in a data source pipeline starts * @@ -535,6 
+617,19 @@ public class IngestManager { } } + /** + * Gets snapshots of the state of all running ingest jobs. + * + * @return A list of ingest job state snapshots. + */ + List getIngestJobSnapshots() { + List snapShots = new ArrayList<>(); + for (IngestJob job : this.jobsById.values()) { + snapShots.add(job.getSnapshot()); + } + return snapShots; + } + /** * Creates and starts an ingest job, i.e., processing by ingest modules, for * each data source in a collection of data sources. @@ -611,7 +706,7 @@ public class IngestManager { /** * Start an ingest job for this data source. */ - List errors = IngestJob.startJob(dataSource, this.settings); + List errors = IngestManager.this.startJob(dataSource, this.settings); if (!errors.isEmpty() && this.doStartupErrorsMsgBox) { StringBuilder moduleStartUpErrors = new StringBuilder(); for (IngestModuleError error : errors) { diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestProgressSnapshotPanel.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestProgressSnapshotPanel.java index 792a6a4e7f..47042c8335 100755 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestProgressSnapshotPanel.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestProgressSnapshotPanel.java @@ -170,7 +170,7 @@ public class IngestProgressSnapshotPanel extends javax.swing.JPanel { } private void refresh() { - jobSnapshots = IngestJob.getJobSnapshots(); + jobSnapshots = IngestManager.getInstance().getIngestJobSnapshots(); fireTableDataChanged(); } From 194963862e8a7fd24b170af47c49e229964797cf Mon Sep 17 00:00:00 2001 From: Richard Cordovano Date: Wed, 10 Dec 2014 09:26:20 -0500 Subject: [PATCH 3/3] Add comment to IngestManager --- .../sleuthkit/autopsy/ingest/IngestManager.java | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java index 20480b92a9..3e3f70c473 100644 --- 
a/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java @@ -56,10 +56,9 @@ public class IngestManager { private static IngestManager instance = null; /** - * When ingest job creation is enabled, the ingest manager assigns a unique - * ID to each ingest job and maintains a mapping of job IDs to jobs. + * The ingest manager assigns a unique ID to each ingest job and maintains a + * mapping of job IDs to jobs. */ - private volatile boolean jobCreationIsEnabled; private final AtomicLong nextJobId = new AtomicLong(0L); private final ConcurrentHashMap jobsById = new ConcurrentHashMap<>(); @@ -126,6 +125,12 @@ public class IngestManager { private final ConcurrentHashMap ingestThreadActivitySnapshots = new ConcurrentHashMap<>(); // Maps ingest thread ids to progress ingestThreadActivitySnapshots. private final ConcurrentHashMap ingestModuleRunTimes = new ConcurrentHashMap<>(); + /** + * The ingest job creation capability of the ingest manager can be turned on + * and off to support an orderly shut down of the application. + */ + private volatile boolean jobCreationIsEnabled; + /** * Ingest job events. */ @@ -425,7 +430,7 @@ public class IngestManager { void finishJob(IngestJob job) { long jobId = job.getId(); - this.jobsById.remove(jobId); + this.jobsById.remove(jobId); if (!job.isCancelled()) { IngestManager.logger.log(Level.INFO, "Ingest job {0} completed", jobId); this.fireIngestJobCompleted(jobId); @@ -434,7 +439,7 @@ public class IngestManager { this.fireIngestJobCancelled(jobId); } } - + /** * Called each time a module in a data source pipeline starts * @@ -629,7 +634,7 @@ public class IngestManager { } return snapShots; } - + /** * Creates and starts an ingest job, i.e., processing by ingest modules, for * each data source in a collection of data sources.