IngestManager thread safety fixes

Richard Cordovano 2017-06-02 09:08:31 -04:00
parent 8e6d04adb8
commit 9311dba3d6
2 changed files with 122 additions and 125 deletions

IngestJob.java

@@ -248,6 +248,16 @@ public final class IngestJob {
         } else {
             IngestManager.getInstance().fireDataSourceAnalysisCancelled(id, job.getId(), job.getDataSource());
         }
+        try
+        {
+            System.out.println("\n\n######################\nJob done - sleeping for 30 seconds...\n\n");
+            Thread.sleep(30000);
+        }
+        catch(InterruptedException ex)
+        {
+            Thread.currentThread().interrupt();
+        }
+        System.out.println("\n##### Finished sleeping\n");
         if (incompleteJobsCount.decrementAndGet() == 0) {
             ingestManager.finishIngestJob(this);
         }
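The added block parks the job-completion path for 30 seconds (it reads like temporary debug instrumentation) and uses the standard idiom of restoring the thread's interrupt status after catching InterruptedException. A minimal, self-contained sketch of that idiom, with hypothetical class and method names that are not part of this commit:

    // Illustrative only: swallow InterruptedException but re-assert the interrupt
    // flag so code further up the stack can still observe the interruption.
    public final class SleepExample {

        public static void pause(long millis) {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt(); // restore the flag cleared by the throw
            }
        }

        public static void main(String[] args) {
            pause(1000);
            System.out.println("interrupted = " + Thread.currentThread().isInterrupted());
        }
    }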

IngestManager.java

@@ -40,6 +40,8 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
 import javax.swing.JOptionPane;
 import org.netbeans.api.progress.ProgressHandle;
 import org.openide.util.Cancellable;
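The two new imports pull in the JCIP concurrency annotations used below (@ThreadSafe on the class, @GuardedBy on the singleton field). They document the intended locking policy rather than enforce it; a minimal sketch of how they are typically applied, using a hypothetical class rather than Autopsy code:

    import javax.annotation.concurrent.GuardedBy;
    import javax.annotation.concurrent.ThreadSafe;

    // Illustrative only. The annotations are documentation (and input for static
    // analysis tools); the synchronized methods are what actually provide safety.
    @ThreadSafe
    public class Counter {

        // All reads and writes of 'count' must hold the Counter.class lock.
        @GuardedBy("Counter.class")
        private static long count;

        public static synchronized long increment() {
            return ++count;
        }

        public static synchronized long get() {
            return count;
        }
    }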
@@ -64,82 +66,76 @@ import org.sleuthkit.datamodel.Content;
 /**
  * Manages the creation and execution of ingest jobs, i.e., the processing of
  * data sources by ingest modules.
+ *
+ * Every ingest job that is submitted to the ingest manager is passed to an
+ * ingest task scheduler to be broken down into data source level and file level
+ * ingest job tasks that are put into queues for execution by the ingest
+ * manager's executors. The process of starting an ingest job is handled by a
+ * single-threaded executor, the processing of data source level ingest tasks is
+ * handled by another single-threaded executor, and the processing of file level
+ * ingest jobs is handled by an executor with a configurable number of threads.
+ *
+ * The ingest manager publishes two kinds of application events: ingest job
+ * events and ingest module events. Ingest job events are published when an
+ * ingest job changes states, e.g., an ingest job is started or completed.
+ * Ingest module events are published on behalf of ingest modules working on an
+ * ingest job, e.g., content or an artifact was added to the case. Each event
+ * type is handled by a separate event publisher with its own remote event
+ * channel, but all event publishing is handled by a dedicated executor.
+ *
+ * The ingest manager uses an ingest monitor to determine when system resources
+ * are under pressure. If the ingest monitor detects such a situation, it calls
+ * back to the ingest manager to cancel all ingest jobs in progress.
+ *
+ * The ingest manager also uses a service monitor to watch for service outages.
+ * If a key service goes down, the ingest manager cancels all ingest jobs in
+ * progress.
+ *
+ * The ingest manager provides access to a top component that is used by ingest
+ * modules to post messages for the user. A count of the posts is used as a cap
+ * to avoid bogging down the application.
+ *
+ * The ingest manager supports reporting of ingest processing progress by
+ * collecting snapshots of the activities of the ingest threads, ingest job
+ * progress, and ingest module run times.
  */
+@ThreadSafe
 public class IngestManager {
-    private static final Logger LOGGER = Logger.getLogger(IngestManager.class.getName());
+    private final static Logger LOGGER = Logger.getLogger(IngestManager.class.getName());
+    private final static String INGEST_JOB_EVENT_CHANNEL_NAME = "%s-Ingest-Job-Events"; //NON-NLS
+    private final static Set<String> INGEST_JOB_EVENT_NAMES = Stream.of(IngestJobEvent.values()).map(IngestJobEvent::toString).collect(Collectors.toSet());
+    private final static String INGEST_MODULE_EVENT_CHANNEL_NAME = "%s-Ingest-Module-Events"; //NON-NLS
+    private final static Set<String> INGEST_MODULE_EVENT_NAMES = Stream.of(IngestModuleEvent.values()).map(IngestModuleEvent::toString).collect(Collectors.toSet());
+    private final static int MAX_ERROR_MESSAGE_POSTS = 200;
+    @GuardedBy("IngestManager.class")
     private static IngestManager instance;
+    private final int numberOfFileIngestThreads;
-    /*
-     * Every ingest job that is submitted to the ingest manager is passed to an
-     * ingest task scheduler to be broken down into data source level and file
-     * level ingest job tasks that are put into queues for execution by the
-     * ingest manager's executors. The process of starting an ingest job is
-     * handled by a single-threaded executor, the processing of data source
-     * level ingest tasks is handled by another single-threaded executor, and
-     * the processing of file level ingest jobs is handled by an executor with a
-     * configurable number of threads.
-     */
-    private volatile boolean ingestJobCreationIsEnabled;
-    private final Map<Long, IngestJob> ingestJobsById = new ConcurrentHashMap<>();
-    private final AtomicLong nextIngestManagerTaskId = new AtomicLong(0L);
-    private final Map<Long, Future<Void>> startIngestJobTasks = new ConcurrentHashMap<>();
     private final ExecutorService startIngestJobTasksExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-start-ingest-jobs-%d").build()); //NON-NLS;
     private final ExecutorService dataSourceLevelIngestJobTasksExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-data-source-ingest-%d").build()); //NON-NLS;
-    private static final int MIN_NUM_FILE_INGEST_THREADS = 1;
-    private static final int MAX_NUM_FILE_INGEST_THREADS = 16;
-    private static final int DEFAULT_NUM_FILE_INGEST_THREADS = 2;
-    private int numberOfFileIngestThreads;
-    private ExecutorService fileLevelIngestJobTasksExecutor;
+    private final ExecutorService fileLevelIngestJobTasksExecutor;
-    /*
-     * The ingest manager publishes two kinds of application events: ingest job
-     * events and ingest module events. Ingest job events are published when an
-     * ingest job changes states, e.g., an ingest job is started or completed.
-     * Ingest module events are published on behalf of ingest modules working on
-     * an ingest job, e.g., content or an artifact was added to the case. Each
-     * event type is handled by a separate event publisher with its own remore
-     * event channel, but all event publishing is handled by a dedicated
-     * executor.
-     */
-    private static final String JOB_EVENT_CHANNEL_NAME = "%s-Ingest-Job-Events"; //NON-NLS
-    private static final Set<String> INGEST_JOB_EVENT_NAMES = Stream.of(IngestJobEvent.values()).map(IngestJobEvent::toString).collect(Collectors.toSet());
-    private AutopsyEventPublisher jobEventPublisher = new AutopsyEventPublisher();
-    private static final String MODULE_EVENT_CHANNEL_NAME = "%s-Ingest-Module-Events"; //NON-NLS
-    private static final Set<String> INGEST_MODULE_EVENT_NAMES = Stream.of(IngestModuleEvent.values()).map(IngestModuleEvent::toString).collect(Collectors.toSet());
-    private AutopsyEventPublisher moduleEventPublisher = new AutopsyEventPublisher();
     private final ExecutorService eventPublishingExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("IM-ingest-events-%d").build()); //NON-NLS;
+    private final AtomicLong nextIngestManagerTaskId = new AtomicLong(0L);
-    /*
-     * The ingest manager uses an ingest monitor to determine when system
-     * resources are under pressure. If the ingest monitor detects such a
-     * situation, it calls back to the ingest manager to cancel all ingest jobs
-     * in progress.
-     *
-     * The ingest manager also uses a service monitor to watch for service
-     * outages. If a key services goes down, the ingest manager cancels all
-     * ingest jobs in progress.
-     */
-    private final ServicesMonitor servicesMonitor = ServicesMonitor.getInstance();
+    private final Map<Long, Future<Void>> startIngestJobTasks = new ConcurrentHashMap<>();
+    private final Map<Long, IngestJob> ingestJobsById = new ConcurrentHashMap<>();
     private final IngestMonitor ingestMonitor = new IngestMonitor();
-    /*
-     * The ingest manager provides access to a top component that is used by
-     * ingest modules to post messages for the user. A count of the posts is
-     * used as a cap to avoid bogging down the application.
-     */
-    private static final int MAX_ERROR_MESSAGE_POSTS = 200;
-    private final AtomicLong ingestErrorMessagePosts = new AtomicLong(0L);
+    private final ServicesMonitor servicesMonitor = ServicesMonitor.getInstance();
+    private final AutopsyEventPublisher jobEventPublisher = new AutopsyEventPublisher();
+    private final AutopsyEventPublisher moduleEventPublisher = new AutopsyEventPublisher();
     private final Object ingestMessageBoxLock = new Object();
-    private volatile IngestMessageTopComponent ingestMessageBox;
-    /*
-     * The ingest manager supports reporting of ingest processing progress by
-     * collecting snapshots of the activities of the ingest threads, ingest job
-     * progress, and ingest module run times.
-     */
+    private final AtomicLong ingestErrorMessagePosts = new AtomicLong(0L);
     private final ConcurrentHashMap<Long, IngestThreadActivitySnapshot> ingestThreadActivitySnapshots = new ConcurrentHashMap<>();
     private final ConcurrentHashMap<String, Long> ingestModuleRunTimes = new ConcurrentHashMap<>();
+    private volatile boolean caseIsOpen;
+    private volatile IngestMessageTopComponent ingestMessageBox;
     /**
      * Gets the manager of the creation and execution of ingest jobs, i.e., the
@@ -148,13 +144,14 @@ public class IngestManager {
      * @return A singleton ingest manager object.
      */
     public synchronized static IngestManager getInstance() {
-        if (instance == null) {
+        if (null == instance) {
             /**
              * Two stage construction to avoid allowing the "this" reference to
              * be prematurely published from the constructor via the Case
              * property change listener.
              */
             instance = new IngestManager();
+            instance.subscribeToServiceMonitorEvents();
             instance.subscribeToCaseEvents();
         }
         return instance;
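In this hunk getInstance() remains synchronized, and listener registration, now including the service monitor subscription moved out of the constructor, happens only after construction completes, so the "this" reference cannot leak to event callbacks from a partially built object. A stripped-down sketch of that two-stage construction pattern, with hypothetical Manager/EventBus names rather than the Autopsy types:

    // Illustrative only; hypothetical Manager/EventBus types, not Autopsy APIs.
    public class Manager {

        private static Manager instance;

        public static synchronized Manager getInstance() {
            if (null == instance) {
                // Two-stage construction: finish building the object first, then
                // register listeners, so no callback can see a half-built Manager.
                instance = new Manager();
                instance.subscribeToEvents();
            }
            return instance;
        }

        private Manager() {
            // No listener registration and no 'this' escape here.
        }

        private void subscribeToEvents() {
            EventBus.register(evt -> handle(evt));
        }

        private void handle(Object event) {
            // React to the event.
        }
    }

    // Hypothetical stand-in for the event source.
    class EventBus {
        static void register(java.util.function.Consumer<Object> listener) {
            /* store the listener */
        }
    }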
@@ -162,19 +159,37 @@ public class IngestManager {
     /**
      * Constructs a manager of the creation and execution of ingest jobs, i.e.,
-     * the processing of data sources by ingest modules. The manager immediately
-     * submits ingest job task ingest processing tasks to its executors. These
-     * tasks normally run as long as the application runs.
+     * the processing of data sources by ingest modules.
+     *
+     * The manager immediately submits ingest job task ingest processing tasks
+     * to its executors. These tasks normally run as long as the application
+     * runs.
      */
     private IngestManager() {
-        subscribeToServiceMonitorEvents();
-        startDataSourceLevelIngestJobTaskExecutor();
-        startFileLevelIngestJobTasksExecutor();
+        /*
+         * Submit a single Runnable task for processing data source level ingest
+         * job tasks to the data source level ingest job tasks executor.
+         */
+        long threadId = nextIngestManagerTaskId.incrementAndGet();
+        dataSourceLevelIngestJobTasksExecutor.submit(new ExecuteIngestJobTasksTask(threadId, IngestTasksScheduler.getInstance().getDataSourceIngestTaskQueue()));
+        ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId));
+        /*
+         * Submit a configurable number of Runnable tasks for processing file
+         * level ingest job tasks to the file level ingest job tasks executor.
+         */
+        numberOfFileIngestThreads = UserPreferences.numberOfFileIngestThreads();
+        fileLevelIngestJobTasksExecutor = Executors.newFixedThreadPool(numberOfFileIngestThreads, new ThreadFactoryBuilder().setNameFormat("IM-file-ingest-%d").build()); //NON-NLS
+        for (int i = 0; i < numberOfFileIngestThreads; ++i) {
+            threadId = nextIngestManagerTaskId.incrementAndGet();
+            fileLevelIngestJobTasksExecutor.submit(new ExecuteIngestJobTasksTask(threadId, IngestTasksScheduler.getInstance().getFileIngestTaskQueue()));
+            ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId));
+        }
     }
     /**
-     * Subscribes ingest manager to events published by its service monitor. The
-     * event handler cancels all ingest jobs if one a key service goes down.
+     * Subscribes the ingest manager to events published by its service monitor.
+     * The event handler cancels all ingest jobs if a key service goes down.
      */
     private void subscribeToServiceMonitorEvents() {
         PropertyChangeListener propChangeListener = (PropertyChangeEvent evt) -> {
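The constructor now submits the long-running ExecuteIngestJobTasksTask consumers itself: one to the single-threaded data source executor and numberOfFileIngestThreads of them to the file-level pool, each blocking on an ingest task queue for as long as the application runs. A generic sketch of that consumer-on-an-executor pattern, using plain java.util.concurrent types and hypothetical names rather than the Autopsy task classes:

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;

    // Illustrative only. Each submitted Runnable blocks on the queue and normally
    // runs until the executor is shut down, mirroring the task-consumer threads
    // the constructor above creates.
    public class ConsumerPoolExample {

        public static void main(String[] args) throws InterruptedException {
            BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
            int numberOfWorkers = 2; // analogous to the configurable file ingest thread count
            ExecutorService workers = Executors.newFixedThreadPool(numberOfWorkers);
            for (int i = 0; i < numberOfWorkers; ++i) {
                workers.submit(() -> {
                    try {
                        while (true) {
                            tasks.take().run(); // blocks until a task is queued
                        }
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt(); // exit when the pool is shut down
                    }
                });
            }
            tasks.offer(() -> System.out.println("task executed"));
            Thread.sleep(500);     // give a worker time to drain the queue
            workers.shutdownNow(); // interrupts the blocked take() calls so workers exit
        }
    }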
@@ -222,35 +237,7 @@ public class IngestManager {
     }
     /**
-     * Submits a task for processing data source level ingest job tasks to the
-     * data source level ingest job tasks executor.
-     */
-    private void startDataSourceLevelIngestJobTaskExecutor() {
-        long threadId = nextIngestManagerTaskId.incrementAndGet();
-        dataSourceLevelIngestJobTasksExecutor.submit(new ExecuteIngestJobsTask(threadId, IngestTasksScheduler.getInstance().getDataSourceIngestTaskQueue()));
-        ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId));
-    }
-    /**
-     * Submits a configurable number of tasks for processing file level ingest
-     * job tasks to the file level ingest job tasks executor.
-     */
-    private void startFileLevelIngestJobTasksExecutor() {
-        numberOfFileIngestThreads = UserPreferences.numberOfFileIngestThreads();
-        if ((numberOfFileIngestThreads < MIN_NUM_FILE_INGEST_THREADS) || (numberOfFileIngestThreads > MAX_NUM_FILE_INGEST_THREADS)) {
-            numberOfFileIngestThreads = DEFAULT_NUM_FILE_INGEST_THREADS;
-            UserPreferences.setNumberOfFileIngestThreads(numberOfFileIngestThreads);
-        }
-        fileLevelIngestJobTasksExecutor = Executors.newFixedThreadPool(numberOfFileIngestThreads, new ThreadFactoryBuilder().setNameFormat("IM-file-ingest-%d").build()); //NON-NLS
-        for (int i = 0; i < numberOfFileIngestThreads; ++i) {
-            long threadId = nextIngestManagerTaskId.incrementAndGet();
-            fileLevelIngestJobTasksExecutor.submit(new ExecuteIngestJobsTask(threadId, IngestTasksScheduler.getInstance().getFileIngestTaskQueue()));
-            ingestThreadActivitySnapshots.put(threadId, new IngestThreadActivitySnapshot(threadId));
-        }
-    }
-    /**
-     * Subscribes this ingest manager to local and remote case-related events.
+     * Subscribes the ingest manager to local and remote case-related events.
      */
     private void subscribeToCaseEvents() {
         Case.addEventSubscriber(Case.Events.CURRENT_CASE.toString(), (PropertyChangeEvent event) -> {
@@ -263,23 +250,20 @@ public class IngestManager {
     }
     /*
-     * Handles a case opened event.
+     * Handles a current case opened event.
+     *
+     * Note that current case change events are published in a strictly
+     * serialized manner.
      */
-    synchronized void handleCaseOpened() {
-        this.ingestJobCreationIsEnabled = true;
+    void handleCaseOpened() {
+        this.caseIsOpen = true;
         clearIngestMessageBox();
         try {
-            /**
-             * Use the text index name as the remote event channel name prefix
-             * since it is unique, the same as the case database name for a
-             * multiuser case, and is readily available through the
-             * Case.getTextIndexName() API.
-             */
             Case openedCase = Case.getCurrentCase();
             String channelPrefix = openedCase.getName();
             if (Case.CaseType.MULTI_USER_CASE == openedCase.getCaseType()) {
-                jobEventPublisher.openRemoteEventChannel(String.format(JOB_EVENT_CHANNEL_NAME, channelPrefix));
-                moduleEventPublisher.openRemoteEventChannel(String.format(MODULE_EVENT_CHANNEL_NAME, channelPrefix));
+                jobEventPublisher.openRemoteEventChannel(String.format(INGEST_JOB_EVENT_CHANNEL_NAME, channelPrefix));
+                moduleEventPublisher.openRemoteEventChannel(String.format(INGEST_MODULE_EVENT_CHANNEL_NAME, channelPrefix));
             }
         } catch (IllegalStateException | AutopsyEventException ex) {
             LOGGER.log(Level.SEVERE, "Failed to open remote events channel", ex); //NON-NLS
@@ -289,9 +273,12 @@ public class IngestManager {
     }
     /*
-     * Handles a case closed event.
+     * Handles a current case closed event.
+     *
+     * Note that current case change events are published in a strictly
+     * serialized manner.
      */
-    synchronized void handleCaseClosed() {
+    void handleCaseClosed() {
         /*
          * TODO (JIRA-2227): IngestManager should wait for cancelled ingest jobs
          * to complete when a case is closed.
@@ -299,7 +286,7 @@ public class IngestManager {
         this.cancelAllIngestJobs(IngestJob.CancellationReason.CASE_CLOSED);
         jobEventPublisher.closeRemoteEventChannel();
         moduleEventPublisher.closeRemoteEventChannel();
-        this.ingestJobCreationIsEnabled = false;
+        this.caseIsOpen = false;
         clearIngestMessageBox();
     }
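Across these hunks the synchronized case handlers are replaced by writes to the volatile caseIsOpen flag; because current case change events are delivered serially, a single volatile boolean is enough to publish the open/closed state to the threads that later check it in queueIngestJob() and beginIngestJob(). A minimal sketch of the volatile-flag visibility pattern, with hypothetical names rather than the Autopsy code:

    // Illustrative only. 'volatile' guarantees that a write by one thread is seen
    // by later reads in other threads; it does not make compound actions atomic,
    // which is fine for a simple open/closed gate like this.
    public class CaseGateExample {

        private volatile boolean caseIsOpen;

        void open() {
            caseIsOpen = true;   // single write, published to all threads
        }

        void close() {
            caseIsOpen = false;
        }

        boolean tryQueueWork() {
            return caseIsOpen;   // plain read, always sees the latest write
        }
    }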
@@ -369,7 +356,7 @@ public class IngestManager {
      * @param settings The settings for the ingest job.
      */
     public void queueIngestJob(Collection<Content> dataSources, IngestJobSettings settings) {
-        if (ingestJobCreationIsEnabled) {
+        if (caseIsOpen) {
             IngestJob job = new IngestJob(dataSources, settings, RuntimeProperties.runningWithGUI());
             if (job.hasIngestPipeline()) {
                 long taskId = nextIngestManagerTaskId.incrementAndGet();
@@ -388,13 +375,13 @@ public class IngestManager {
      * @return The IngestJobStartResult describing the results of attempting to
      * start the ingest job.
      */
-    public synchronized IngestJobStartResult beginIngestJob(Collection<Content> dataSources, IngestJobSettings settings) {
-        if (this.ingestJobCreationIsEnabled) {
+    public IngestJobStartResult beginIngestJob(Collection<Content> dataSources, IngestJobSettings settings) {
+        if (this.caseIsOpen) {
             IngestJob job = new IngestJob(dataSources, settings, RuntimeProperties.runningWithGUI());
             if (job.hasIngestPipeline()) {
                 return this.startIngestJob(job); // Start job
             }
-            return new IngestJobStartResult(null, new IngestManagerException("No ingest pipeline created, likely due to no ingest modules being enabled."), null);
+            return new IngestJobStartResult(null, new IngestManagerException("No ingest pipeline created, likely due to no ingest modules being enabled"), null);
         }
         return new IngestJobStartResult(null, new IngestManagerException("No case open"), null);
     }
@@ -415,7 +402,7 @@ public class IngestManager {
     })
     private IngestJobStartResult startIngestJob(IngestJob job) {
         List<IngestModuleError> errors = null;
-        if (this.ingestJobCreationIsEnabled) {
+        if (this.caseIsOpen) {
             if (Case.getCurrentCase().getCaseType() == Case.CaseType.MULTI_USER_CASE) {
                 try {
                     if (!servicesMonitor.getServiceStatus(ServicesMonitor.Service.REMOTE_CASE_DATABASE.toString()).equals(ServicesMonitor.ServiceStatus.UP.toString())) {
@@ -480,7 +467,7 @@ public class IngestManager {
      *
      * @param job The completed job.
      */
-    synchronized void finishIngestJob(IngestJob job) {
+    void finishIngestJob(IngestJob job) {
         long jobId = job.getId();
         ingestJobsById.remove(jobId);
         if (!job.isCancelled()) {
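finishIngestJob() can drop its synchronized modifier because ingestJobsById is a ConcurrentHashMap, whose individual put and remove operations are already thread safe. A small sketch of that idea, with hypothetical names rather than the Autopsy classes:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicLong;

    // Illustrative only. Single-operation calls on a ConcurrentHashMap need no
    // external locking, so methods that just add or remove one entry can drop
    // their synchronized modifiers.
    public class JobRegistryExample {

        private final AtomicLong nextId = new AtomicLong(0L);
        private final Map<Long, String> jobsById = new ConcurrentHashMap<>();

        long startJob(String description) {
            long id = nextId.incrementAndGet();
            jobsById.put(id, description);
            return id;
        }

        void finishJob(long id) {
            jobsById.remove(id); // safe without synchronized
        }

        int runningJobs() {
            return jobsById.size();
        }
    }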
@@ -846,12 +833,12 @@ public class IngestManager {
     /**
      * Executes ingest jobs by acting as a consumer for an ingest tasks queue.
      */
-    private final class ExecuteIngestJobsTask implements Runnable {
+    private final class ExecuteIngestJobTasksTask implements Runnable {
         private final long threadId;
         private final IngestTaskQueue tasks;
-        ExecuteIngestJobsTask(long threadId, IngestTaskQueue tasks) {
+        ExecuteIngestJobTasksTask(long threadId, IngestTaskQueue tasks) {
             this.threadId = threadId;
             this.tasks = tasks;
         }