From 9311dba3d6dec1836b483cd339f5aac6b497b91b Mon Sep 17 00:00:00 2001 From: Richard Cordovano Date: Fri, 2 Jun 2017 09:08:31 -0400 Subject: [PATCH] IngestManager thread safety fixes --- .../sleuthkit/autopsy/ingest/IngestJob.java | 10 + .../autopsy/ingest/IngestManager.java | 237 +++++++++--------- 2 files changed, 122 insertions(+), 125 deletions(-) diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java index 4393f18510..e8863e7a6e 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestJob.java @@ -248,6 +248,16 @@ public final class IngestJob { } else { IngestManager.getInstance().fireDataSourceAnalysisCancelled(id, job.getId(), job.getDataSource()); } + try + { + System.out.println("\n\n######################\nJob done - sleeping for 30 seconds...\n\n"); + Thread.sleep(30000); + } + catch(InterruptedException ex) + { + Thread.currentThread().interrupt(); + } + System.out.println("\n##### Finished sleeping\n"); if (incompleteJobsCount.decrementAndGet() == 0) { ingestManager.finishIngestJob(this); } diff --git a/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java b/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java index 2cdcea532d..cde10b0fa8 100644 --- a/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java +++ b/Core/src/org/sleuthkit/autopsy/ingest/IngestManager.java @@ -40,6 +40,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.stream.Collectors; import java.util.stream.Stream; +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; import javax.swing.JOptionPane; import org.netbeans.api.progress.ProgressHandle; import org.openide.util.Cancellable; @@ -64,82 +66,76 @@ import org.sleuthkit.datamodel.Content; /** * Manages the creation and execution of ingest jobs, i.e., the processing of * data sources by ingest modules. + * + * Every ingest job that is submitted to the ingest manager is passed to an + * ingest task scheduler to be broken down into data source level and file level + * ingest job tasks that are put into queues for execution by the ingest + * manager's executors. The process of starting an ingest job is handled by a + * single-threaded executor, the processing of data source level ingest tasks is + * handled by another single-threaded executor, and the processing of file level + * ingest jobs is handled by an executor with a configurable number of threads. + * + * The ingest manager publishes two kinds of application events: ingest job + * events and ingest module events. Ingest job events are published when an + * ingest job changes states, e.g., an ingest job is started or completed. + * Ingest module events are published on behalf of ingest modules working on an + * ingest job, e.g., content or an artifact was added to the case. Each event + * type is handled by a separate event publisher with its own remore event + * channel, but all event publishing is handled by a dedicated executor. + * + * The ingest manager publishes two kinds of application events: ingest job + * events and ingest module events. Ingest job events are published when an + * ingest job changes states, e.g., an ingest job is started or completed. + * Ingest module events are published on behalf of ingest modules working on an + * ingest job, e.g., content or an artifact was added to the case. Each event + * type is handled by a separate event publisher with its own remore event + * channel, but all event publishing is handled by a dedicated executor. + * + * The ingest manager uses an ingest monitor to determine when system resources + * are under pressure. If the ingest monitor detects such a situation, it calls + * back to the ingest manager to cancel all ingest jobs in progress. + * + * The ingest manager also uses a service monitor to watch for service outages. + * If a key services goes down, the ingest manager cancels all ingest jobs in + * progress. + * + * The ingest manager provides access to a top component that is used by ingest + * modules to post messages for the user. A count of the posts is used as a cap + * to avoid bogging down the application. + * + * The ingest manager supports reporting of ingest processing progress by + * collecting snapshots of the activities of the ingest threads, ingest job + * progress, and ingest module run times. */ +@ThreadSafe public class IngestManager { - private static final Logger LOGGER = Logger.getLogger(IngestManager.class.getName()); + private final static Logger LOGGER = Logger.getLogger(IngestManager.class.getName()); + private final static String INGEST_JOB_EVENT_CHANNEL_NAME = "%s-Ingest-Job-Events"; //NON-NLS + private final static Set INGEST_JOB_EVENT_NAMES = Stream.of(IngestJobEvent.values()).map(IngestJobEvent::toString).collect(Collectors.toSet()); + private final static String INGEST_MODULE_EVENT_CHANNEL_NAME = "%s-Ingest-Module-Events"; //NON-NLS + private final static Set INGEST_MODULE_EVENT_NAMES = Stream.of(IngestModuleEvent.values()).map(IngestModuleEvent::toString).collect(Collectors.toSet()); + private final static int MAX_ERROR_MESSAGE_POSTS = 200; + @GuardedBy("IngestManager.class") private static IngestManager instance; - - /* - * Every ingest job that is submitted to the ingest manager is passed to an - * ingest task scheduler to be broken down into data source level and file - * level ingest job tasks that are put into queues for execution by the - * ingest manager's executors. The process of starting an ingest job is - * handled by a single-threaded executor, the processing of data source - * level ingest tasks is handled by another single-threaded executor, and - * the processing of file level ingest jobs is handled by an executor with a - * configurable number of threads. - */ - private volatile boolean ingestJobCreationIsEnabled; - private final Map ingestJobsById = new ConcurrentHashMap<>(); - private final AtomicLong nextIngestManagerTaskId = new AtomicLong(0L); - private final Map> startIngestJobTasks = new ConcurrentHashMap<>(); + private final int numberOfFileIngestThreads; private final ExecutorService startIngestJobTasksExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-start-ingest-jobs-%d").build()); //NON-NLS; private final ExecutorService dataSourceLevelIngestJobTasksExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-data-source-ingest-%d").build()); //NON-NLS; - private static final int MIN_NUM_FILE_INGEST_THREADS = 1; - private static final int MAX_NUM_FILE_INGEST_THREADS = 16; - private static final int DEFAULT_NUM_FILE_INGEST_THREADS = 2; - private int numberOfFileIngestThreads; - private ExecutorService fileLevelIngestJobTasksExecutor; - - /* - * The ingest manager publishes two kinds of application events: ingest job - * events and ingest module events. Ingest job events are published when an - * ingest job changes states, e.g., an ingest job is started or completed. - * Ingest module events are published on behalf of ingest modules working on - * an ingest job, e.g., content or an artifact was added to the case. Each - * event type is handled by a separate event publisher with its own remore - * event channel, but all event publishing is handled by a dedicated - * executor. - */ - private static final String JOB_EVENT_CHANNEL_NAME = "%s-Ingest-Job-Events"; //NON-NLS - private static final Set INGEST_JOB_EVENT_NAMES = Stream.of(IngestJobEvent.values()).map(IngestJobEvent::toString).collect(Collectors.toSet()); - private AutopsyEventPublisher jobEventPublisher = new AutopsyEventPublisher(); - private static final String MODULE_EVENT_CHANNEL_NAME = "%s-Ingest-Module-Events"; //NON-NLS - private static final Set INGEST_MODULE_EVENT_NAMES = Stream.of(IngestModuleEvent.values()).map(IngestModuleEvent::toString).collect(Collectors.toSet()); - private AutopsyEventPublisher moduleEventPublisher = new AutopsyEventPublisher(); + private final ExecutorService fileLevelIngestJobTasksExecutor; private final ExecutorService eventPublishingExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-ingest-events-%d").build()); //NON-NLS; - - /* - * The ingest manager uses an ingest monitor to determine when system - * resources are under pressure. If the ingest monitor detects such a - * situation, it calls back to the ingest manager to cancel all ingest jobs - * in progress. - * - * The ingest manager also uses a service monitor to watch for service - * outages. If a key services goes down, the ingest manager cancels all - * ingest jobs in progress. - */ - private final ServicesMonitor servicesMonitor = ServicesMonitor.getInstance(); + private final AtomicLong nextIngestManagerTaskId = new AtomicLong(0L); + private final Map> startIngestJobTasks = new ConcurrentHashMap<>(); + private final Map ingestJobsById = new ConcurrentHashMap<>(); private final IngestMonitor ingestMonitor = new IngestMonitor(); - - /* - * The ingest manager provides access to a top component that is used by - * ingest modules to post messages for the user. A count of the posts is - * used as a cap to avoid bogging down the application. - */ - private static final int MAX_ERROR_MESSAGE_POSTS = 200; - private final AtomicLong ingestErrorMessagePosts = new AtomicLong(0L); + private final ServicesMonitor servicesMonitor = ServicesMonitor.getInstance(); + private final AutopsyEventPublisher jobEventPublisher = new AutopsyEventPublisher(); + private final AutopsyEventPublisher moduleEventPublisher = new AutopsyEventPublisher(); private final Object ingestMessageBoxLock = new Object(); - private volatile IngestMessageTopComponent ingestMessageBox; - - /* - * The ingest manager supports reporting of ingest processing progress by - * collecting snapshots of the activities of the ingest threads, ingest job - * progress, and ingest module run times. - */ + private final AtomicLong ingestErrorMessagePosts = new AtomicLong(0L); private final ConcurrentHashMap ingestThreadActivitySnapshots = new ConcurrentHashMap<>(); private final ConcurrentHashMap ingestModuleRunTimes = new ConcurrentHashMap<>(); + private volatile boolean caseIsOpen; + private volatile IngestMessageTopComponent ingestMessageBox; /** * Gets the manager of the creation and execution of ingest jobs, i.e., the @@ -148,13 +144,14 @@ public class IngestManager { * @return A singleton ingest manager object. */ public synchronized static IngestManager getInstance() { - if (instance == null) { + if (null == instance) { /** * Two stage construction to avoid allowing the "this" reference to * be prematurely published from the constructor via the Case * property change listener. */ instance = new IngestManager(); + instance.subscribeToServiceMonitorEvents(); instance.subscribeToCaseEvents(); } return instance; @@ -162,19 +159,37 @@ public class IngestManager { /** * Constructs a manager of the creation and execution of ingest jobs, i.e., - * the processing of data sources by ingest modules. The manager immediately - * submits ingest job task ingest processing tasks to its executors. These - * tasks normally run as long as the application runs. + * the processing of data sources by ingest modules. + * + * The manager immediately submits ingest job task ingest processing tasks + * to its executors. These tasks normally run as long as the application + * runs. */ private IngestManager() { - subscribeToServiceMonitorEvents(); - startDataSourceLevelIngestJobTaskExecutor(); - startFileLevelIngestJobTasksExecutor(); + /* + * Submit a single Runnable task for processing data source level ingest + * job tasks to the data source level ingest job tasks executor. + */ + long threadId = nextIngestManagerTaskId.incrementAndGet(); + dataSourceLevelIngestJobTasksExecutor.submit(new ExecuteIngestJobTasksTask(threadId, IngestTasksScheduler.getInstance().getDataSourceIngestTaskQueue())); + ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId)); + + /* + * Submit a configurable number of Runnable tasks for processing file + * level ingest job tasks to the file level ingest job tasks executor. + */ + numberOfFileIngestThreads = UserPreferences.numberOfFileIngestThreads(); + fileLevelIngestJobTasksExecutor = Executors.newFixedThreadPool(numberOfFileIngestThreads, new ThreadFactoryBuilder().setNameFormat("IM-file-ingest-%d").build()); //NON-NLS + for (int i = 0; i < numberOfFileIngestThreads; ++i) { + threadId = nextIngestManagerTaskId.incrementAndGet(); + fileLevelIngestJobTasksExecutor.submit(new ExecuteIngestJobTasksTask(threadId, IngestTasksScheduler.getInstance().getFileIngestTaskQueue())); + ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId)); + } } /** - * Subscribes ingest manager to events published by its service monitor. The - * event handler cancels all ingest jobs if one a key service goes down. + * Subscribes the ingest manager to events published by its service monitor. + * The event handler cancels all ingest jobs if a key service goes down. */ private void subscribeToServiceMonitorEvents() { PropertyChangeListener propChangeListener = (PropertyChangeEvent evt) -> { @@ -222,35 +237,7 @@ public class IngestManager { } /** - * Submits a task for processing data source level ingest job tasks to the - * data source level ingest job tasks executor. - */ - private void startDataSourceLevelIngestJobTaskExecutor() { - long threadId = nextIngestManagerTaskId.incrementAndGet(); - dataSourceLevelIngestJobTasksExecutor.submit(new ExecuteIngestJobsTask(threadId, IngestTasksScheduler.getInstance().getDataSourceIngestTaskQueue())); - ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId)); - } - - /** - * Submits a configurable number of tasks for processing file level ingest - * job tasks to the file level ingest job tasks executor. - */ - private void startFileLevelIngestJobTasksExecutor() { - numberOfFileIngestThreads = UserPreferences.numberOfFileIngestThreads(); - if ((numberOfFileIngestThreads < MIN_NUM_FILE_INGEST_THREADS) || (numberOfFileIngestThreads > MAX_NUM_FILE_INGEST_THREADS)) { - numberOfFileIngestThreads = DEFAULT_NUM_FILE_INGEST_THREADS; - UserPreferences.setNumberOfFileIngestThreads(numberOfFileIngestThreads); - } - fileLevelIngestJobTasksExecutor = Executors.newFixedThreadPool(numberOfFileIngestThreads, new ThreadFactoryBuilder().setNameFormat("IM-file-ingest-%d").build()); //NON-NLS - for (int i = 0; i < numberOfFileIngestThreads; ++i) { - long threadId = nextIngestManagerTaskId.incrementAndGet(); - fileLevelIngestJobTasksExecutor.submit(new ExecuteIngestJobsTask(threadId, IngestTasksScheduler.getInstance().getFileIngestTaskQueue())); - ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId)); - } - } - - /** - * Subscribes this ingest manager to local and remote case-related events. + * Subscribes the ingest manager to local and remote case-related events. */ private void subscribeToCaseEvents() { Case.addEventSubscriber(Case.Events.CURRENT_CASE.toString(), (PropertyChangeEvent event) -> { @@ -263,23 +250,20 @@ public class IngestManager { } /* - * Handles a case opened event. + * Handles a current case opened event. + * + * Note that current case change events are published in a strictly + * serialized manner. */ - synchronized void handleCaseOpened() { - this.ingestJobCreationIsEnabled = true; + void handleCaseOpened() { + this.caseIsOpen = true; clearIngestMessageBox(); try { - /** - * Use the text index name as the remote event channel name prefix - * since it is unique, the same as the case database name for a - * multiuser case, and is readily available through the - * Case.getTextIndexName() API. - */ Case openedCase = Case.getCurrentCase(); String channelPrefix = openedCase.getName(); if (Case.CaseType.MULTI_USER_CASE == openedCase.getCaseType()) { - jobEventPublisher.openRemoteEventChannel(String.format(JOB_EVENT_CHANNEL_NAME, channelPrefix)); - moduleEventPublisher.openRemoteEventChannel(String.format(MODULE_EVENT_CHANNEL_NAME, channelPrefix)); + jobEventPublisher.openRemoteEventChannel(String.format(INGEST_JOB_EVENT_CHANNEL_NAME, channelPrefix)); + moduleEventPublisher.openRemoteEventChannel(String.format(INGEST_MODULE_EVENT_CHANNEL_NAME, channelPrefix)); } } catch (IllegalStateException | AutopsyEventException ex) { LOGGER.log(Level.SEVERE, "Failed to open remote events channel", ex); //NON-NLS @@ -289,9 +273,12 @@ public class IngestManager { } /* - * Handles a case closed event. + * Handles a current case closed event. + * + * Note that current case change events are published in a strictly + * serialized manner. */ - synchronized void handleCaseClosed() { + void handleCaseClosed() { /* * TODO (JIRA-2227): IngestManager should wait for cancelled ingest jobs * to complete when a case is closed. @@ -299,7 +286,7 @@ public class IngestManager { this.cancelAllIngestJobs(IngestJob.CancellationReason.CASE_CLOSED); jobEventPublisher.closeRemoteEventChannel(); moduleEventPublisher.closeRemoteEventChannel(); - this.ingestJobCreationIsEnabled = false; + this.caseIsOpen = false; clearIngestMessageBox(); } @@ -369,7 +356,7 @@ public class IngestManager { * @param settings The settings for the ingest job. */ public void queueIngestJob(Collection dataSources, IngestJobSettings settings) { - if (ingestJobCreationIsEnabled) { + if (caseIsOpen) { IngestJob job = new IngestJob(dataSources, settings, RuntimeProperties.runningWithGUI()); if (job.hasIngestPipeline()) { long taskId = nextIngestManagerTaskId.incrementAndGet(); @@ -388,13 +375,13 @@ public class IngestManager { * @return The IngestJobStartResult describing the results of attempting to * start the ingest job. */ - public synchronized IngestJobStartResult beginIngestJob(Collection dataSources, IngestJobSettings settings) { - if (this.ingestJobCreationIsEnabled) { + public IngestJobStartResult beginIngestJob(Collection dataSources, IngestJobSettings settings) { + if (this.caseIsOpen) { IngestJob job = new IngestJob(dataSources, settings, RuntimeProperties.runningWithGUI()); if (job.hasIngestPipeline()) { return this.startIngestJob(job); // Start job } - return new IngestJobStartResult(null, new IngestManagerException("No ingest pipeline created, likely due to no ingest modules being enabled."), null); + return new IngestJobStartResult(null, new IngestManagerException("No ingest pipeline created, likely due to no ingest modules being enabled"), null); } return new IngestJobStartResult(null, new IngestManagerException("No case open"), null); } @@ -415,7 +402,7 @@ public class IngestManager { }) private IngestJobStartResult startIngestJob(IngestJob job) { List errors = null; - if (this.ingestJobCreationIsEnabled) { + if (this.caseIsOpen) { if (Case.getCurrentCase().getCaseType() == Case.CaseType.MULTI_USER_CASE) { try { if (!servicesMonitor.getServiceStatus(ServicesMonitor.Service.REMOTE_CASE_DATABASE.toString()).equals(ServicesMonitor.ServiceStatus.UP.toString())) { @@ -480,7 +467,7 @@ public class IngestManager { * * @param job The completed job. */ - synchronized void finishIngestJob(IngestJob job) { + void finishIngestJob(IngestJob job) { long jobId = job.getId(); ingestJobsById.remove(jobId); if (!job.isCancelled()) { @@ -846,12 +833,12 @@ public class IngestManager { /** * Executes ingest jobs by acting as a consumer for an ingest tasks queue. */ - private final class ExecuteIngestJobsTask implements Runnable { + private final class ExecuteIngestJobTasksTask implements Runnable { private final long threadId; private final IngestTaskQueue tasks; - ExecuteIngestJobsTask(long threadId, IngestTaskQueue tasks) { + ExecuteIngestJobTasksTask(long threadId, IngestTaskQueue tasks) { this.threadId = threadId; this.tasks = tasks; }