Streaming ingest

apriestman 2020-07-09 14:41:50 -04:00
parent b6dc3da22b
commit b3946d956d
23 changed files with 801 additions and 1841 deletions

AddImageTask.java

@@ -30,6 +30,7 @@ import org.sleuthkit.autopsy.corecomponentinterfaces.DataSourceProcessorProgressMonitor;
 import org.sleuthkit.autopsy.coreutils.Logger;
 import org.sleuthkit.autopsy.imagewriter.ImageWriterService;
 import org.sleuthkit.autopsy.imagewriter.ImageWriterSettings;
+import org.sleuthkit.datamodel.AddDataSourceCallbacks;
 import org.sleuthkit.datamodel.Content;
 import org.sleuthkit.datamodel.Image;
 import org.sleuthkit.datamodel.SleuthkitJNI;
@@ -42,17 +43,10 @@ import org.sleuthkit.datamodel.TskDataException;
 class AddImageTask implements Runnable {

     private final Logger logger = Logger.getLogger(AddImageTask.class.getName());
-    private final String deviceId;
-    private final String imagePath;
-    private final int sectorSize;
-    private final String timeZone;
-    private final ImageWriterSettings imageWriterSettings;
-    private final boolean ignoreFatOrphanFiles;
-    private final String md5;
-    private final String sha1;
-    private final String sha256;
+    private final ImageDetails imageDetails;
     private final DataSourceProcessorProgressMonitor progressMonitor;
-    private final DataSourceProcessorCallback callback;
+    private final AddDataSourceCallbacks addDataSourceCallbacks;
+    private final AddImageTaskCallback addImageTaskCallback;
     private boolean criticalErrorOccurred;

     /*
@@ -73,40 +67,18 @@ class AddImageTask implements Runnable {
     /**
      * Constructs a runnable task that adds an image to the case database.
      *
-     * @param deviceId             An ASCII-printable identifier for the device
-     *                             associated with the data source that is
-     *                             intended to be unique across multiple cases
-     *                             (e.g., a UUID).
-     * @param imagePath            Path to the image file.
-     * @param sectorSize           The sector size (use '0' for autodetect).
-     * @param timeZone             The time zone to use when processing dates
-     *                             and times for the image, obtained from
-     *                             java.util.TimeZone.getID.
-     * @param ignoreFatOrphanFiles Whether to parse orphans if the image has a
-     *                             FAT filesystem.
-     * @param md5                  The MD5 hash of the image, may be null.
-     * @param sha1                 The SHA-1 hash of the image, may be null.
-     * @param sha256               The SHA-256 hash of the image, may be null.
-     * @param imageWriterPath      Path that a copy of the image should be
-     *                             written to. Use empty string to disable image
-     *                             writing
+     * @param imageDetails           Holds all data about the image.
      * @param progressMonitor        Progress monitor to report progress during
      *                               processing.
-     * @param callback               Callback to call when processing is done.
+     * @param addDataSourceCallbacks Callback for sending data to the ingest
+     *                               pipeline if an ingest stream is being used.
+     * @param addImageTaskCallback   Callback for dealing with add image task
+     *                               completion.
      */
-    AddImageTask(String deviceId, String imagePath, int sectorSize, String timeZone, boolean ignoreFatOrphanFiles, String md5, String sha1, String sha256, ImageWriterSettings imageWriterSettings,
-            DataSourceProcessorProgressMonitor progressMonitor, DataSourceProcessorCallback callback) {
-        this.deviceId = deviceId;
-        this.imagePath = imagePath;
-        this.sectorSize = sectorSize;
-        this.timeZone = timeZone;
-        this.ignoreFatOrphanFiles = ignoreFatOrphanFiles;
-        this.md5 = md5;
-        this.sha1 = sha1;
-        this.sha256 = sha256;
-        this.imageWriterSettings = imageWriterSettings;
-        this.callback = callback;
+    AddImageTask(ImageDetails imageDetails, DataSourceProcessorProgressMonitor progressMonitor, AddDataSourceCallbacks addDataSourceCallbacks,
+            AddImageTaskCallback addImageTaskCallback) {
+        this.imageDetails = imageDetails;
+        this.addDataSourceCallbacks = addDataSourceCallbacks;
+        this.addImageTaskCallback = addImageTaskCallback;
         this.progressMonitor = progressMonitor;
         tskAddImageProcessLock = new Object();
     }
@@ -120,21 +92,21 @@ class AddImageTask implements Runnable {
         try {
             currentCase = Case.getCurrentCaseThrows();
         } catch (NoCurrentCaseException ex) {
-            logger.log(Level.SEVERE, String.format("Failed to add image data source at %s, no current case", imagePath), ex);
+            logger.log(Level.SEVERE, String.format("Failed to start AddImageTask for %s, no current case", imageDetails.getImagePath()), ex);
             return;
         }
         progressMonitor.setIndeterminate(true);
         progressMonitor.setProgress(0);
         String imageWriterPath = "";
-        if (imageWriterSettings != null) {
-            imageWriterPath = imageWriterSettings.getPath();
+        if (imageDetails.imageWriterSettings != null) {
+            imageWriterPath = imageDetails.imageWriterSettings.getPath();
         }
         List<String> errorMessages = new ArrayList<>();
         List<Content> newDataSources = new ArrayList<>();
         try {
             synchronized (tskAddImageProcessLock) {
                 if (!tskAddImageProcessStopped) {
-                    tskAddImageProcess = currentCase.getSleuthkitCase().makeAddImageProcess(timeZone, true, ignoreFatOrphanFiles, imageWriterPath);
+                    tskAddImageProcess = currentCase.getSleuthkitCase().makeAddImageProcess(imageDetails.timeZone, true, imageDetails.ignoreFatOrphanFiles, imageWriterPath);
                 } else {
                     return;
                 }
@@ -143,7 +115,7 @@ class AddImageTask implements Runnable {
             progressUpdateThread.start();
             runAddImageProcess(errorMessages);
             progressUpdateThread.interrupt();
-            commitOrRevertAddImageProcess(currentCase, errorMessages, newDataSources);
+            finishAddImageProcess(errorMessages, newDataSources);
             progressMonitor.setProgress(100);
         } finally {
             DataSourceProcessorCallback.DataSourceProcessorResult result;
@@ -154,7 +126,7 @@ class AddImageTask implements Runnable {
             } else {
                 result = DataSourceProcessorResult.NO_ERRORS;
             }
-            callback.done(result, errorMessages, newDataSources);
+            addImageTaskCallback.onCompleted(result, errorMessages, newDataSources);
         }
     }
@@ -177,7 +149,7 @@ class AddImageTask implements Runnable {
                 tskAddImageProcess.stop();
             } catch (TskCoreException ex) {
-                logger.log(Level.SEVERE, String.format("Error cancelling adding image %s to the case database", imagePath), ex); //NON-NLS
+                logger.log(Level.SEVERE, String.format("Error cancelling adding image %s to the case database", imageDetails.getImagePath()), ex); //NON-NLS
             }
         }
     }
@@ -191,23 +163,22 @@ class AddImageTask implements Runnable {
      */
     private void runAddImageProcess(List<String> errorMessages) {
         try {
-            tskAddImageProcess.run(deviceId, new String[]{imagePath}, sectorSize);
+            tskAddImageProcess.run(imageDetails.deviceId, imageDetails.image, imageDetails.sectorSize, this.addDataSourceCallbacks);
         } catch (TskCoreException ex) {
-            logger.log(Level.SEVERE, String.format("Critical error occurred adding image %s", imagePath), ex); //NON-NLS
+            logger.log(Level.SEVERE, String.format("Critical error occurred adding image %s", imageDetails.getImagePath()), ex); //NON-NLS
             criticalErrorOccurred = true;
             errorMessages.add(ex.getMessage());
         } catch (TskDataException ex) {
-            logger.log(Level.WARNING, String.format("Non-critical error occurred adding image %s", imagePath), ex); //NON-NLS
+            logger.log(Level.WARNING, String.format("Non-critical error occurred adding image %s", imageDetails.getImagePath()), ex); //NON-NLS
             errorMessages.add(ex.getMessage());
         }
     }

     /**
-     * Commits or reverts the results of the TSK add image process. If the
-     * process was stopped before it completed or there was a critical error the
-     * results are reverted, otherwise they are committed.
+     * Handle the results of the TSK add image process.
+     * The image will be in the database even if a critical error occurred or
+     * the user canceled.
      *
-     * @param currentCase    The current case.
      * @param errorMessages  Error messages, if any, are added to this list for
      *                       eventual return via the callback.
      * @param newDataSources If the new image is successfully committed, it is
@@ -216,84 +187,66 @@ class AddImageTask implements Runnable {
      *
      * @return
      */
-    private void commitOrRevertAddImageProcess(Case currentCase, List<String> errorMessages, List<Content> newDataSources) {
+    private void finishAddImageProcess(List<String> errorMessages, List<Content> newDataSources) {
         synchronized (tskAddImageProcessLock) {
-            if (tskAddImageProcessStopped || criticalErrorOccurred) {
-                try {
-                    tskAddImageProcess.revert();
-                } catch (TskCoreException ex) {
-                    logger.log(Level.SEVERE, String.format("Error reverting after adding image %s to the case database", imagePath), ex); //NON-NLS
-                    errorMessages.add(ex.getMessage());
-                    criticalErrorOccurred = true;
-                } catch (TskDataException ignored) {
-                    /*
-                     * The only reasonable way for this to happen at present is
-                     * through C/C++ processing of an EWF image, which is not an
-                     * error.
-                     */
-                }
-            } else {
-                try {
-                    long imageId = tskAddImageProcess.commit();
-                    if (imageId != 0) {
-                        Image newImage = currentCase.getSleuthkitCase().getImageById(imageId);
-                        String verificationError = newImage.verifyImageSize();
-                        if (!verificationError.isEmpty()) {
-                            errorMessages.add(verificationError);
-                        }
-                        if (imageWriterSettings != null) {
-                            ImageWriterService.createImageWriter(imageId, imageWriterSettings);
-                        }
-                        newDataSources.add(newImage);
-                        if (!StringUtils.isBlank(md5)) {
-                            try {
-                                newImage.setMD5(md5);
-                            } catch (TskCoreException ex) {
-                                logger.log(Level.SEVERE, String.format("Failed to add MD5 hash for image data source %s (objId=%d)", newImage.getName(), newImage.getId()), ex);
-                                errorMessages.add(ex.getMessage());
-                                criticalErrorOccurred = true;
-                            } catch (TskDataException ignored) {
-                                /*
-                                 * The only reasonable way for this to happen at
-                                 * present is through C/C++ processing of an EWF
-                                 * image, which is not an error.
-                                 */
-                            }
-                        }
-                        if (!StringUtils.isBlank(sha1)) {
-                            try {
-                                newImage.setSha1(sha1);
-                            } catch (TskCoreException ex) {
-                                logger.log(Level.SEVERE, String.format("Failed to add SHA1 hash for image data source %s (objId=%d)", newImage.getName(), newImage.getId()), ex);
-                                errorMessages.add(ex.getMessage());
-                                criticalErrorOccurred = true;
-                            } catch (TskDataException ignored) {
-                                /*
-                                 * The only reasonable way for this to happen at
-                                 * present is through C/C++ processing of an EWF
-                                 * image, which is not an error.
-                                 */
-                            }
-                        }
-                        if (!StringUtils.isBlank(sha256)) {
-                            try {
-                                newImage.setSha256(sha256);
-                            } catch (TskCoreException ex) {
-                                logger.log(Level.SEVERE, String.format("Failed to add SHA256 for image data source %s (objId=%d)", newImage.getName(), newImage.getId()), ex);
-                                errorMessages.add(ex.getMessage());
-                                criticalErrorOccurred = true;
-                            } catch (TskDataException ignored) {
-                                /*
-                                 * The only reasonable way for this to happen at
-                                 * present is through C/C++ processing of an EWF
-                                 * image, which is not an error.
-                                 */
-                            }
-                        }
-                    } else {
-                        String errorMessage = String.format("Error commiting after adding image %s to the case database, no object id returned", imagePath); //NON-NLS
-                        logger.log(Level.SEVERE, errorMessage);
-                        errorMessages.add(errorMessage);
-                        criticalErrorOccurred = true;
-                    }
-                } catch (TskCoreException ex) {
-                    logger.log(Level.SEVERE, String.format("Error committing adding image %s to the case database", imagePath), ex); //NON-NLS
-                    errorMessages.add(ex.getMessage());
-                    criticalErrorOccurred = true;
-                } catch (TskDataException ignored) {
-                    /*
-                     * The only reasonable way for this to happen at present is
-                     * through C/C++ processing of an EWF image, which is not an
-                     * error.
-                     */
-                }
-            }
+            Image newImage = imageDetails.image;
+            String verificationError = newImage.verifyImageSize();
+            if (!verificationError.isEmpty()) {
+                errorMessages.add(verificationError);
+            }
+            if (imageDetails.imageWriterSettings != null) {
+                ImageWriterService.createImageWriter(newImage.getId(), imageDetails.imageWriterSettings);
+            }
+            newDataSources.add(newImage);
+
+            // If the add image process was cancelled don't do any further processing here
+            if (tskAddImageProcessStopped) {
+                return;
+            }
+
+            if (!StringUtils.isBlank(imageDetails.md5)) {
+                try {
+                    newImage.setMD5(imageDetails.md5);
+                } catch (TskCoreException ex) {
+                    logger.log(Level.SEVERE, String.format("Failed to add MD5 hash for image data source %s (objId=%d)", newImage.getName(), newImage.getId()), ex);
+                    errorMessages.add(ex.getMessage());
+                    criticalErrorOccurred = true;
+                } catch (TskDataException ignored) {
+                    /*
+                     * The only reasonable way for this to happen at present is
+                     * through C/C++ processing of an EWF image, which is not an
+                     * error.
+                     */
+                }
+            }
+            if (!StringUtils.isBlank(imageDetails.sha1)) {
+                try {
+                    newImage.setSha1(imageDetails.sha1);
+                } catch (TskCoreException ex) {
+                    logger.log(Level.SEVERE, String.format("Failed to add SHA1 hash for image data source %s (objId=%d)", newImage.getName(), newImage.getId()), ex);
+                    errorMessages.add(ex.getMessage());
+                    criticalErrorOccurred = true;
+                } catch (TskDataException ignored) {
+                    /*
+                     * The only reasonable way for this to happen at present is
+                     * through C/C++ processing of an EWF image, which is not an
+                     * error.
+                     */
+                }
+            }
+            if (!StringUtils.isBlank(imageDetails.sha256)) {
+                try {
+                    newImage.setSha256(imageDetails.sha256);
+                } catch (TskCoreException ex) {
+                    logger.log(Level.SEVERE, String.format("Failed to add SHA256 for image data source %s (objId=%d)", newImage.getName(), newImage.getId()), ex);
+                    errorMessages.add(ex.getMessage());
+                    criticalErrorOccurred = true;
+                } catch (TskDataException ignored) {
+                    /*
+                     * The only reasonable way for this to happen at present is
+                     * through C/C++ processing of an EWF image, which is not an
+                     * error.
+                     */
+                }
+            }
         }
     }
@@ -352,4 +305,37 @@ class AddImageTask implements Runnable {
         }
     }
+
+    /**
+     * Utility class to hold image data.
+     */
+    static class ImageDetails {
+        String deviceId;
+        Image image;
+        int sectorSize;
+        String timeZone;
+        boolean ignoreFatOrphanFiles;
+        String md5;
+        String sha1;
+        String sha256;
+        ImageWriterSettings imageWriterSettings;
+
+        ImageDetails(String deviceId, Image image, int sectorSize, String timeZone, boolean ignoreFatOrphanFiles, String md5, String sha1, String sha256, ImageWriterSettings imageWriterSettings) {
+            this.deviceId = deviceId;
+            this.image = image;
+            this.sectorSize = sectorSize;
+            this.timeZone = timeZone;
+            this.ignoreFatOrphanFiles = ignoreFatOrphanFiles;
+            this.md5 = md5;
+            this.sha1 = sha1;
+            this.sha256 = sha256;
+            this.imageWriterSettings = imageWriterSettings;
+        }
+
+        String getImagePath() {
+            if (image.getPaths().length > 0) {
+                return image.getPaths()[0];
+            }
+            return "Unknown data source path";
+        }
+    }
 }
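
Taken together, the refactor replaces AddImageTask's long argument list with an ImageDetails bundle plus two callbacks: one that streams newly added content to the ingest pipeline while the TSK add-image process runs, and one that fires once when the task finishes. A minimal sketch of the new wiring, using only types that appear in this commit (variable names such as monitor, stream, and callback are illustrative, and the Image must already exist in the case database):

    // Bundle the per-image settings; sector size 0 means auto-detect, and the
    // hash strings and ImageWriterSettings may be null, as in the DSPs below.
    AddImageTask.ImageDetails details = new AddImageTask.ImageDetails(
            deviceId, image, 0, timeZone, false, null, null, null, null);

    // StreamingAddDataSourceCallbacks forwards new files to the ingest stream
    // as they are added; StreamingAddImageTaskCallback reports completion.
    AddImageTask task = new AddImageTask(details, monitor,
            new StreamingAddDataSourceCallbacks(stream),
            new StreamingAddImageTaskCallback(stream, callback));
    new Thread(task).start();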

AddImageWizardAddingProgressPanel.java

@@ -302,7 +302,9 @@ class AddImageWizardAddingProgressPanel extends ShortcutWizardDescriptorPanel {
     private void startIngest() {
         if (!newContents.isEmpty() && readyToIngest && !ingested) {
             ingested = true;
-            IngestManager.getInstance().queueIngestJob(newContents, ingestJobSettings);
+            if (dsProcessor != null && ! dsProcessor.supportsIngestStream()) {
+                IngestManager.getInstance().queueIngestJob(newContents, ingestJobSettings);
+            }
             setStateFinished();
         }
     }
@@ -360,8 +362,12 @@ class AddImageWizardAddingProgressPanel extends ShortcutWizardDescriptorPanel {
             setStateStarted();

             // Kick off the DSProcessor
-            dsProcessor.run(getDSPProgressMonitorImpl(), cbObj);
+            if (dsProcessor.supportsIngestStream()) {
+                dsProcessor.runWithIngestStream(ingestJobSettings, getDSPProgressMonitorImpl(), cbObj);
+            } else {
+                dsProcessor.run(getDSPProgressMonitorImpl(), cbObj);
+            }
         }
     }

AddImageWizardIterator.java

@@ -41,6 +41,7 @@ class AddImageWizardIterator implements WizardDescriptor.Iterator<WizardDescriptor> {
     private final AddImageAction action;
     private int progressPanelIndex;
     private int dsPanelIndex;
+    private int ingestPanelIndex;
     private final static String PROP_LASTPROFILE_NAME = "AIW_LASTPROFILE_NAME"; //NON-NLS

     AddImageWizardIterator(AddImageAction action) {
@@ -69,6 +70,7 @@ class AddImageWizardIterator implements WizardDescriptor.Iterator<WizardDescriptor> {
         panels.add(progressPanel);
         progressPanelIndex = panels.indexOf(progressPanel); //Doing programatically because number of panels is variable
         dsPanelIndex = panels.indexOf(dsPanel);
+        ingestPanelIndex = panels.indexOf(ingestConfigPanel);
         String[] steps = new String[panels.size()];
         for (int i = 0; i < panels.size(); i++) {
             Component c = panels.get(i).getComponent();
@@ -177,7 +179,7 @@ class AddImageWizardIterator implements WizardDescriptor.Iterator<WizardDescriptor> {
         // Start processing the data source by handing it off to the selected DSP,
         // so it gets going in the background while the user is still picking the Ingest modules
         // This will occur when the next button is clicked on the panel where you have chosen your data to process
-        if (index == dsPanelIndex) {
+        if (index == ingestPanelIndex) {
             ((AddImageWizardAddingProgressPanel) panels.get(progressPanelIndex)).
                     startDataSourceProcessing(((AddImageWizardDataSourceSettingsPanel) panels.get(dsPanelIndex)).getComponent().getCurrentDSProcessor());
         }

ImageDSProcessor.java

@@ -24,6 +24,7 @@ import javax.swing.JPanel;
 import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.List;
+import java.util.logging.Level;
 import java.util.UUID;
 import javax.swing.filechooser.FileFilter;
 import org.openide.util.NbBundle;
@@ -33,7 +34,14 @@ import org.sleuthkit.autopsy.corecomponentinterfaces.DataSourceProcessorProgressMonitor;
 import org.sleuthkit.autopsy.corecomponentinterfaces.DataSourceProcessorCallback;
 import org.sleuthkit.autopsy.corecomponentinterfaces.DataSourceProcessor;
 import org.sleuthkit.autopsy.coreutils.DataSourceUtils;
+import org.sleuthkit.autopsy.coreutils.Logger;
 import org.sleuthkit.autopsy.datasourceprocessors.AutoIngestDataSourceProcessor;
+import org.sleuthkit.autopsy.ingest.IngestJobSettings;
+import org.sleuthkit.autopsy.ingest.IngestManager;
+import org.sleuthkit.autopsy.ingest.IngestStream;
+import org.sleuthkit.datamodel.Image;
+import org.sleuthkit.datamodel.SleuthkitJNI;
+import org.sleuthkit.datamodel.TskCoreException;

 /**
  * A image file data source processor that implements the DataSourceProcessor
@@ -49,6 +57,7 @@ import org.sleuthkit.autopsy.datasourceprocessors.AutoIngestDataSourceProcessor;
 public class ImageDSProcessor implements DataSourceProcessor, AutoIngestDataSourceProcessor {

     private final static String DATA_SOURCE_TYPE = NbBundle.getMessage(ImageDSProcessor.class, "ImageDSProcessor.dsType.text");
+    private final Logger logger = Logger.getLogger(ImageDSProcessor.class.getName());
     private static final List<String> allExt = new ArrayList<>();
     private static final GeneralFilter rawFilter = new GeneralFilter(GeneralFilter.RAW_IMAGE_EXTS, GeneralFilter.RAW_IMAGE_DESC);
     private static final GeneralFilter encaseFilter = new GeneralFilter(GeneralFilter.ENCASE_IMAGE_EXTS, GeneralFilter.ENCASE_IMAGE_DESC);
@@ -58,6 +67,8 @@ public class ImageDSProcessor implements DataSourceProcessor, AutoIngestDataSourceProcessor {
     private static final List<FileFilter> filtersList = new ArrayList<>();
     private final ImageFilePanel configPanel;
     private AddImageTask addImageTask;
+    private IngestStream ingestStream = null;
+    private Image image = null;
     /*
      * TODO: Remove the setDataSourceOptionsCalled flag and the settings fields
      * when the deprecated method setDataSourceOptions is removed.
@@ -170,6 +181,68 @@ public class ImageDSProcessor implements DataSourceProcessor, AutoIngestDataSourceProcessor {
      */
     @Override
     public void run(DataSourceProcessorProgressMonitor progressMonitor, DataSourceProcessorCallback callback) {
+        ingestStream = new DefaultIngestStream();
+        readConfigSettings();
+        try {
+            image = SleuthkitJNI.addImageToDatabase(Case.getCurrentCase().getSleuthkitCase(),
+                    new String[]{imagePath}, sectorSize, timeZone, md5, sha1, sha256, deviceId);
+        } catch (TskCoreException ex) {
+            logger.log(Level.SEVERE, "Error adding data source with path " + imagePath + " to database", ex);
+            final List<String> errors = new ArrayList<>();
+            errors.add(ex.getMessage());
+            callback.done(DataSourceProcessorCallback.DataSourceProcessorResult.CRITICAL_ERRORS, errors, new ArrayList<>());
+            return;
+        }
+
+        doAddImageProcess(deviceId, imagePath, sectorSize, timeZone, ignoreFatOrphanFiles, md5, sha1, sha256, progressMonitor, callback);
+    }
+
+    /**
+     * Adds a data source to the case database using a background task in a
+     * separate thread and the settings provided by the selection and
+     * configuration panel. Files found during ingest will be sent directly to the
+     * IngestStream provided. Returns as soon as the background task is started.
+     * The background task uses a callback object to signal task completion and
+     * return results.
+     *
+     * This method should not be called unless isPanelValid returns true, and
+     * should only be called for DSPs that support ingest streams.
+     *
+     * @param settings The ingest job settings.
+     * @param progress Progress monitor that will be used by the
+     *                 background task to report progress.
+     * @param callBack Callback that will be used by the background task
+     *                 to return results.
+     */
+    @Override
+    public void runWithIngestStream(IngestJobSettings settings, DataSourceProcessorProgressMonitor progress,
+            DataSourceProcessorCallback callBack) {
+        // Read the settings from the wizard
+        readConfigSettings();
+
+        // Set up the data source before creating the ingest stream
+        try {
+            image = SleuthkitJNI.addImageToDatabase(Case.getCurrentCase().getSleuthkitCase(),
+                    new String[]{imagePath}, sectorSize, timeZone, md5, sha1, sha256, deviceId);
+        } catch (TskCoreException ex) {
+            logger.log(Level.SEVERE, "Error adding data source with path " + imagePath + " to database", ex);
+            final List<String> errors = new ArrayList<>();
+            errors.add(ex.getMessage());
+            callBack.done(DataSourceProcessorCallback.DataSourceProcessorResult.CRITICAL_ERRORS, errors, new ArrayList<>());
+            return;
+        }
+
+        // Now initialize the ingest stream
+        this.ingestStream = IngestManager.getInstance().openIngestStream(image, settings);
+
+        doAddImageProcess(deviceId, imagePath, sectorSize, timeZone, ignoreFatOrphanFiles, md5, sha1, sha256, progress, callBack);
+    }
+
+    /**
+     * Store the options from the config panel.
+     */
+    private void readConfigSettings() {
         if (!setDataSourceOptionsCalled) {
             configPanel.storeSettings();
             deviceId = UUID.randomUUID().toString();
@@ -190,8 +263,17 @@ public class ImageDSProcessor implements DataSourceProcessor, AutoIngestDataSourceProcessor {
                 sha256 = null;
             }
         }
-        run(deviceId, imagePath, sectorSize, timeZone, ignoreFatOrphanFiles, md5, sha1, sha256, progressMonitor, callback);
     }
+
+    /**
+     * Check if this DSP supports ingest streams.
+     *
+     * @return True if this DSP supports an ingest stream, false otherwise.
+     */
+    @Override
+    public boolean supportsIngestStream() {
+        return true;
+    }

     /**
      * Adds a data source to the case database using a background task in a
@@ -215,7 +297,19 @@ public class ImageDSProcessor implements DataSourceProcessor, AutoIngestDataSourceProcessor {
      * @param callback             Callback to call when processing is done.
      */
     public void run(String deviceId, String imagePath, String timeZone, boolean ignoreFatOrphanFiles, DataSourceProcessorProgressMonitor progressMonitor, DataSourceProcessorCallback callback) {
-        run(deviceId, imagePath, 0, timeZone, ignoreFatOrphanFiles, null, null, null, progressMonitor, callback);
+        ingestStream = new DefaultIngestStream();
+        try {
+            image = SleuthkitJNI.addImageToDatabase(Case.getCurrentCase().getSleuthkitCase(),
+                    new String[]{imagePath}, sectorSize, timeZone, "", "", "", deviceId);
+        } catch (TskCoreException ex) {
+            logger.log(Level.SEVERE, "Error adding data source with path " + imagePath + " to database", ex);
+            final List<String> errors = new ArrayList<>();
+            errors.add(ex.getMessage());
+            callback.done(DataSourceProcessorCallback.DataSourceProcessorResult.CRITICAL_ERRORS, errors, new ArrayList<>());
+            return;
+        }
+
+        doAddImageProcess(deviceId, imagePath, 0, timeZone, ignoreFatOrphanFiles, null, null, null, progressMonitor, callback);
     }
     /**
@@ -224,6 +318,10 @@ public class ImageDSProcessor implements DataSourceProcessor, AutoIngestDataSourceProcessor {
      * selection and configuration panel. Returns as soon as the background task
      * is started and uses the callback object to signal task completion and
      * return results.
+     *
+     * The image should be loaded in the database and stored in "image"
+     * before calling this method. Additionally, an ingest stream should be initialized
+     * and stored in "ingestStream".
      *
      * @param deviceId             An ASCII-printable identifier for the device
      *                             associated with the data source that is
@@ -243,8 +341,31 @@ public class ImageDSProcessor implements DataSourceProcessor, AutoIngestDataSourceProcessor {
      *                             during processing.
      * @param callback             Callback to call when processing is done.
      */
-    private void run(String deviceId, String imagePath, int sectorSize, String timeZone, boolean ignoreFatOrphanFiles, String md5, String sha1, String sha256, DataSourceProcessorProgressMonitor progressMonitor, DataSourceProcessorCallback callback) {
-        addImageTask = new AddImageTask(deviceId, imagePath, sectorSize, timeZone, ignoreFatOrphanFiles, md5, sha1, sha256, null, progressMonitor, callback);
+    private void doAddImageProcess(String deviceId, String imagePath, int sectorSize, String timeZone, boolean ignoreFatOrphanFiles, String md5, String sha1, String sha256, DataSourceProcessorProgressMonitor progressMonitor, DataSourceProcessorCallback callback) {
+
+        // If the data source or ingest stream haven't been initialized, stop processing
+        if (ingestStream == null) {
+            String message = "Ingest stream was not initialized before running the add image process on " + imagePath;
+            logger.log(Level.SEVERE, message);
+            final List<String> errors = new ArrayList<>();
+            errors.add(message);
+            callback.done(DataSourceProcessorCallback.DataSourceProcessorResult.CRITICAL_ERRORS, errors, new ArrayList<>());
+            return;
+        }
+        if (image == null) {
+            String message = "Image was not added to database before running the add image process on " + imagePath;
+            logger.log(Level.SEVERE, message);
+            final List<String> errors = new ArrayList<>();
+            errors.add(message);
+            callback.done(DataSourceProcessorCallback.DataSourceProcessorResult.CRITICAL_ERRORS, errors, new ArrayList<>());
+            return;
+        }
+
+        AddImageTask.ImageDetails imageDetails = new AddImageTask.ImageDetails(deviceId, image, sectorSize, timeZone, ignoreFatOrphanFiles, md5, sha1, sha256, null);
+        addImageTask = new AddImageTask(imageDetails,
+                progressMonitor,
+                new StreamingAddDataSourceCallbacks(ingestStream),
+                new StreamingAddImageTaskCallback(ingestStream, callback));
         new Thread(addImageTask).start();
     }
@@ -260,6 +381,9 @@ public class ImageDSProcessor implements DataSourceProcessor, AutoIngestDataSourceProcessor {
         if (null != addImageTask) {
             addImageTask.cancelTask();
         }
+        if (ingestStream != null) {
+            ingestStream.stop();
+        }
     }
@@ -316,7 +440,7 @@ public class ImageDSProcessor implements DataSourceProcessor, AutoIngestDataSourceProcessor {
         this.timeZone = Calendar.getInstance().getTimeZone().getID();
         this.ignoreFatOrphanFiles = false;
         setDataSourceOptionsCalled = true;
-        run(deviceId, dataSourcePath.toString(), sectorSize, timeZone, ignoreFatOrphanFiles, null, null, null, progressMonitor, callBack);
+        doAddImageProcess(deviceId, dataSourcePath.toString(), sectorSize, timeZone, ignoreFatOrphanFiles, null, null, null, progressMonitor, callBack);
     }

     /**
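
The ordering in runWithIngestStream() is the heart of the streaming change: SleuthkitJNI.addImageToDatabase() creates the Image row synchronously, which makes it possible to open an ingest stream on the image before any file-system parsing begins. A condensed sketch of that sequence, using only calls shown in this diff (argument values are illustrative):

    // 1. Create the image in the case database up front, hashes included.
    Image image = SleuthkitJNI.addImageToDatabase(
            Case.getCurrentCase().getSleuthkitCase(),
            new String[]{imagePath}, 0 /* sector size: auto-detect */,
            timeZone, md5, sha1, sha256, deviceId);

    // 2. Open an ingest stream on that image; ingest modules can start
    //    consuming files as soon as the add-image task reports them.
    IngestStream stream = IngestManager.getInstance().openIngestStream(image, settings);

    // 3. Kick off AddImageTask (via doAddImageProcess above), which pushes
    //    each newly added file into the stream through the callbacks.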

LocalDiskDSProcessor.java

@@ -18,15 +18,22 @@
  */
 package org.sleuthkit.autopsy.casemodule;

+import java.util.ArrayList;
 import java.util.Calendar;
+import java.util.List;
 import java.util.UUID;
+import java.util.logging.Level;
 import javax.swing.JPanel;
 import org.openide.util.NbBundle;
 import org.openide.util.lookup.ServiceProvider;
 import org.sleuthkit.autopsy.corecomponentinterfaces.DataSourceProcessorCallback;
 import org.sleuthkit.autopsy.corecomponentinterfaces.DataSourceProcessorProgressMonitor;
 import org.sleuthkit.autopsy.corecomponentinterfaces.DataSourceProcessor;
+import org.sleuthkit.autopsy.coreutils.Logger;
 import org.sleuthkit.autopsy.imagewriter.ImageWriterSettings;
+import org.sleuthkit.datamodel.Image;
+import org.sleuthkit.datamodel.SleuthkitJNI;
+import org.sleuthkit.datamodel.TskCoreException;

 /**
  * A local drive data source processor that implements the DataSourceProcessor
@@ -37,6 +44,7 @@ import org.sleuthkit.autopsy.imagewriter.ImageWriterSettings;
 @ServiceProvider(service = DataSourceProcessor.class)
 public class LocalDiskDSProcessor implements DataSourceProcessor {

+    private final Logger logger = Logger.getLogger(LocalDiskDSProcessor.class.getName());
     private static final String DATA_SOURCE_TYPE = NbBundle.getMessage(LocalDiskDSProcessor.class, "LocalDiskDSProcessor.dsType.text");
     private final LocalDiskPanel configPanel;
     private AddImageTask addDiskTask;
@@ -139,7 +147,25 @@ public class LocalDiskDSProcessor implements DataSourceProcessor {
                 imageWriterSettings = null;
             }
         }
-        addDiskTask = new AddImageTask(deviceId, drivePath, sectorSize, timeZone, ignoreFatOrphanFiles, null, null, null, imageWriterSettings, progressMonitor, callback);
+
+        Image image;
+        try {
+            image = SleuthkitJNI.addImageToDatabase(Case.getCurrentCase().getSleuthkitCase(),
+                    new String[]{drivePath}, sectorSize,
+                    timeZone, null, null, null, deviceId);
+        } catch (TskCoreException ex) {
+            logger.log(Level.SEVERE, "Error adding local disk with path " + drivePath + " to database", ex);
+            final List<String> errors = new ArrayList<>();
+            errors.add(ex.getMessage());
+            callback.done(DataSourceProcessorCallback.DataSourceProcessorResult.CRITICAL_ERRORS, errors, new ArrayList<>());
+            return;
+        }
+
+        addDiskTask = new AddImageTask(
+                new AddImageTask.ImageDetails(deviceId, image, sectorSize, timeZone, ignoreFatOrphanFiles, null, null, null, imageWriterSettings),
+                progressMonitor,
+                new StreamingAddDataSourceCallbacks(new DefaultIngestStream()),
+                new StreamingAddImageTaskCallback(new DefaultIngestStream(), callback));
         new Thread(addDiskTask).start();
     }
@@ -191,7 +217,23 @@ public class LocalDiskDSProcessor implements DataSourceProcessor {
      * @param callback        Callback to call when processing is done.
      */
     private void run(String deviceId, String drivePath, int sectorSize, String timeZone, boolean ignoreFatOrphanFiles, DataSourceProcessorProgressMonitor progressMonitor, DataSourceProcessorCallback callback) {
-        addDiskTask = new AddImageTask(deviceId, drivePath, sectorSize, timeZone, ignoreFatOrphanFiles, null, null, null, imageWriterSettings, progressMonitor, callback);
+        Image image;
+        try {
+            image = SleuthkitJNI.addImageToDatabase(Case.getCurrentCase().getSleuthkitCase(),
+                    new String[]{drivePath}, sectorSize,
+                    timeZone, null, null, null, deviceId);
+        } catch (TskCoreException ex) {
+            logger.log(Level.SEVERE, "Error adding local disk with path " + drivePath + " to database", ex);
+            final List<String> errors = new ArrayList<>();
+            errors.add(ex.getMessage());
+            callback.done(DataSourceProcessorCallback.DataSourceProcessorResult.CRITICAL_ERRORS, errors, new ArrayList<>());
+            return;
+        }
+
+        addDiskTask = new AddImageTask(new AddImageTask.ImageDetails(deviceId, image, sectorSize, timeZone, ignoreFatOrphanFiles, null, null, null, imageWriterSettings),
+                progressMonitor,
+                new StreamingAddDataSourceCallbacks(new DefaultIngestStream()),
+                new StreamingAddImageTaskCallback(new DefaultIngestStream(), callback));
         new Thread(addDiskTask).start();
     }
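
Note that the local-disk path does not actually stream anything: it hands AddImageTask freshly created DefaultIngestStream instances. DefaultIngestStream's body is not part of this diff, but its use here suggests it is a do-nothing placeholder that lets batch-style DSPs share the streaming task API. Ingest still starts the traditional way, with the wizard queueing a job from the callback's data sources once supportsIngestStream() returns false:

    // Batch behavior preserved (see AddImageWizardAddingProgressPanel above):
    IngestManager.getInstance().queueIngestJob(newContents, ingestJobSettings);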

DataSourceProcessor.java

@@ -19,6 +19,7 @@
 package org.sleuthkit.autopsy.corecomponentinterfaces;

 import javax.swing.JPanel;
+import org.sleuthkit.autopsy.ingest.IngestJobSettings;

 /**
  * Interface implemented by classes that add data sources of a particular type
@@ -36,10 +37,6 @@ import javax.swing.JPanel;
  *
  * Data source processors should perform all processing in a background task in
  * a separate thread, reporting results using a callback object.
- *
- * It is recommended that implementers provide an overload of the run method
- * that allows the data source processor to be run independently of the
- * selection and configuration panel.
  */
 public interface DataSourceProcessor {
@@ -111,6 +108,38 @@ public interface DataSourceProcessor {
      *                        to return results.
      */
     void run(DataSourceProcessorProgressMonitor progressMonitor, DataSourceProcessorCallback callback);
+
+    /**
+     * Adds a data source to the case database using a background task in a
+     * separate thread and the settings provided by the selection and
+     * configuration panel. Files found during ingest will be sent directly to
+     * the IngestStream provided. Returns as soon as the background task is
+     * started. The background task uses a callback object to signal task
+     * completion and return results.
+     *
+     * This method should not be called unless isPanelValid returns true, and
+     * should only be called for DSPs that support ingest streams. The ingest
+     * settings must be complete before calling this method.
+     *
+     * @param settings The ingest job settings.
+     * @param progress Progress monitor that will be used by the background task
+     *                 to report progress.
+     * @param callBack Callback that will be used by the background task to
+     *                 return results.
+     */
+    default void runWithIngestStream(IngestJobSettings settings, DataSourceProcessorProgressMonitor progress,
+            DataSourceProcessorCallback callBack) {
+        throw new UnsupportedOperationException("Streaming ingest not supported for this data source processor");
+    }
+
+    /**
+     * Check if this DSP supports ingest streams.
+     *
+     * @return True if this DSP supports an ingest stream, false otherwise.
+     */
+    default boolean supportsIngestStream() {
+        return false;
+    }

     /**
      * Requests cancellation of the background task that adds a data source to
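
Because both new methods are default methods, existing DataSourceProcessor implementations compile unchanged and report supportsIngestStream() == false. Callers discover streaming support at run time; a sketch of the dispatch this enables, mirroring the wizard change above (the method and variable names are illustrative):

    static void startDataSource(DataSourceProcessor dsp, IngestJobSettings settings,
            DataSourceProcessorProgressMonitor monitor, DataSourceProcessorCallback cb) {
        if (dsp.supportsIngestStream()) {
            // Streaming DSP: ingest modules attach when the stream opens,
            // so no separate ingest job is queued afterwards.
            dsp.runWithIngestStream(settings, monitor, cb);
        } else {
            // Batch DSP: run first, then queue ingest from the callback results.
            dsp.run(monitor, cb);
        }
    }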

File diff suppressed because it is too large.

DataSourceIngestModuleProgress.java

@@ -23,10 +23,10 @@ package org.sleuthkit.autopsy.ingest;
  */
 public class DataSourceIngestModuleProgress {

-    private final DataSourceIngestJob job;
+    private final IngestJobPipeline ingestJobPipeline;

-    DataSourceIngestModuleProgress(DataSourceIngestJob job) {
-        this.job = job;
+    DataSourceIngestModuleProgress(IngestJobPipeline pipeline) {
+        this.ingestJobPipeline = pipeline;
     }

     /**
@@ -38,7 +38,7 @@ public class DataSourceIngestModuleProgress {
      *                  data source.
      */
     public void switchToDeterminate(int workUnits) {
-        this.job.switchDataSourceIngestProgressBarToDeterminate(workUnits);
+        this.ingestJobPipeline.switchDataSourceIngestProgressBarToDeterminate(workUnits);
     }

     /**
@@ -46,7 +46,7 @@ public class DataSourceIngestModuleProgress {
      * the total work units to process the data source is unknown.
      */
     public void switchToIndeterminate() {
-        this.job.switchDataSourceIngestProgressBarToIndeterminate();
+        this.ingestJobPipeline.switchDataSourceIngestProgressBarToIndeterminate();
     }

     /**
@@ -56,7 +56,7 @@ public class DataSourceIngestModuleProgress {
      * @param workUnits Number of work units performed so far by the module.
      */
     public void progress(int workUnits) {
-        this.job.advanceDataSourceIngestProgressBar("", workUnits);
+        this.ingestJobPipeline.advanceDataSourceIngestProgressBar("", workUnits);
     }

     /**
@@ -65,7 +65,7 @@ public class DataSourceIngestModuleProgress {
      * @param message Message to display
      */
     public void progress(String message) {
-        this.job.advanceDataSourceIngestProgressBar(message);
+        this.ingestJobPipeline.advanceDataSourceIngestProgressBar(message);
     }

     /**
@@ -76,7 +76,7 @@ public class DataSourceIngestModuleProgress {
      * @param workUnits Number of work units performed so far by the module.
      */
     public void progress(String currentTask, int workUnits) {
-        this.job.advanceDataSourceIngestProgressBar(currentTask, workUnits);
+        this.ingestJobPipeline.advanceDataSourceIngestProgressBar(currentTask, workUnits);
     }
 }

DataSourceIngestPipeline.java

@@ -24,12 +24,11 @@ import java.util.List;
 import java.util.logging.Level;
 import org.openide.util.NbBundle;
 import org.sleuthkit.autopsy.coreutils.Logger;
-import org.sleuthkit.autopsy.coreutils.MessageNotifyUtil;
 import org.sleuthkit.datamodel.Content;

 /**
- * This class manages a sequence of data source level ingest modules for a data
- * source ingest job. It starts the modules, runs data sources through them, and
+ * This class manages a sequence of data source level ingest modules for an
+ * ingestJobPipeline. It starts the modules, runs data sources through them, and
  * shuts them down when data source level ingest is complete.
  * <p>
  * This class is thread-safe.
@@ -38,7 +37,7 @@ final class DataSourceIngestPipeline {

     private static final IngestManager ingestManager = IngestManager.getInstance();
     private static final Logger logger = Logger.getLogger(DataSourceIngestPipeline.class.getName());
-    private final DataSourceIngestJob job;
+    private final IngestJobPipeline ingestJobPipeline;
     private final List<PipelineModule> modules = new ArrayList<>();
     private volatile PipelineModule currentModule;

@@ -47,13 +46,12 @@ final class DataSourceIngestPipeline {
      * modules. It starts the modules, runs data sources through them, and shuts
      * them down when data source level ingest is complete.
      *
-     * @param job             The data source ingest job that owns this
-     *                        pipeline.
+     * @param ingestJobPipeline The ingestJobPipeline that owns this pipeline.
      * @param moduleTemplates Templates for the creating the ingest modules that
      *                        make up this pipeline.
      */
-    DataSourceIngestPipeline(DataSourceIngestJob job, List<IngestModuleTemplate> moduleTemplates) {
-        this.job = job;
+    DataSourceIngestPipeline(IngestJobPipeline ingestJobPipeline, List<IngestModuleTemplate> moduleTemplates) {
+        this.ingestJobPipeline = ingestJobPipeline;
         for (IngestModuleTemplate template : moduleTemplates) {
             if (template.isDataSourceIngestModuleTemplate()) {
                 PipelineModule module = new PipelineModule(template.createDataSourceIngestModule(), template.getModuleName());
@@ -80,7 +78,7 @@ final class DataSourceIngestPipeline {
         List<IngestModuleError> errors = new ArrayList<>();
         for (PipelineModule module : modules) {
             try {
-                module.startUp(new IngestJobContext(this.job));
+                module.startUp(new IngestJobContext(this.ingestJobPipeline));
             } catch (Throwable ex) { // Catch-all exception firewall
                 errors.add(new IngestModuleError(module.getDisplayName(), ex));
             }
@@ -98,7 +96,7 @@ final class DataSourceIngestPipeline {
      */
     synchronized List<IngestModuleError> process(DataSourceIngestTask task) {
         List<IngestModuleError> errors = new ArrayList<>();
-        if (!this.job.isCancelled()) {
+        if (!this.ingestJobPipeline.isCancelled()) {
             Content dataSource = task.getDataSource();
             for (PipelineModule module : modules) {
                 try {
@@ -106,19 +104,19 @@ final class DataSourceIngestPipeline {
                     String displayName = NbBundle.getMessage(this.getClass(),
                             "IngestJob.progress.dataSourceIngest.displayName",
                             module.getDisplayName(), dataSource.getName());
-                    this.job.updateDataSourceIngestProgressBarDisplayName(displayName);
-                    this.job.switchDataSourceIngestProgressBarToIndeterminate();
+                    this.ingestJobPipeline.updateDataSourceIngestProgressBarDisplayName(displayName);
+                    this.ingestJobPipeline.switchDataSourceIngestProgressBarToIndeterminate();
                     DataSourceIngestPipeline.ingestManager.setIngestTaskProgress(task, module.getDisplayName());
-                    logger.log(Level.INFO, "{0} analysis of {1} (jobId={2}) starting", new Object[]{module.getDisplayName(), this.job.getDataSource().getName(), this.job.getId()}); //NON-NLS
-                    module.process(dataSource, new DataSourceIngestModuleProgress(this.job));
-                    logger.log(Level.INFO, "{0} analysis of {1} (jobId={2}) finished", new Object[]{module.getDisplayName(), this.job.getDataSource().getName(), this.job.getId()}); //NON-NLS
+                    logger.log(Level.INFO, "{0} analysis of {1} (pipeline={2}) starting", new Object[]{module.getDisplayName(), ingestJobPipeline.getDataSource().getName(), ingestJobPipeline.getId()}); //NON-NLS
+                    module.process(dataSource, new DataSourceIngestModuleProgress(this.ingestJobPipeline));
+                    logger.log(Level.INFO, "{0} analysis of {1} (pipeline={2}) finished", new Object[]{module.getDisplayName(), ingestJobPipeline.getDataSource().getName(), ingestJobPipeline.getId()}); //NON-NLS
                 } catch (Throwable ex) { // Catch-all exception firewall
                     errors.add(new IngestModuleError(module.getDisplayName(), ex));
                 }
-                if (this.job.isCancelled()) {
+                if (this.ingestJobPipeline.isCancelled()) {
                     break;
-                } else if (this.job.currentDataSourceIngestModuleIsCancelled()) {
-                    this.job.currentDataSourceIngestModuleCancellationCompleted(currentModule.getDisplayName());
+                } else if (this.ingestJobPipeline.currentDataSourceIngestModuleIsCancelled()) {
+                    this.ingestJobPipeline.currentDataSourceIngestModuleCancellationCompleted(currentModule.getDisplayName());
                 }
             }
         }

DataSourceIngestTask.java

@@ -20,13 +20,13 @@ package org.sleuthkit.autopsy.ingest;

 final class DataSourceIngestTask extends IngestTask {

-    DataSourceIngestTask(DataSourceIngestJob job) {
-        super(job);
+    DataSourceIngestTask(IngestJobPipeline ingestJobPipeline) {
+        super(ingestJobPipeline);
     }

     @Override
     void execute(long threadId) throws InterruptedException {
         super.setThreadId(threadId);
-        getIngestJob().process(this);
+        getIngestJobPipeline().process(this);
     }
 }

FileIngestPipeline.java

@@ -24,15 +24,14 @@ import java.util.List;
 import java.util.logging.Level;
 import org.openide.util.NbBundle;
-import org.sleuthkit.autopsy.casemodule.Case;
 import org.sleuthkit.autopsy.coreutils.Logger;
 import org.sleuthkit.autopsy.coreutils.MessageNotifyUtil;
 import org.sleuthkit.datamodel.AbstractFile;
 import org.sleuthkit.datamodel.TskCoreException;

 /**
- * This class manages a sequence of file level ingest modules for a data source
- * ingest job. It starts the modules, runs files through them, and shuts them
+ * This class manages a sequence of file level ingest modules for an
+ * ingest job pipeline. It starts the modules, runs files through them, and shuts them
  * down when file level ingest is complete.
  * <p>
  * This class is thread-safe.
@@ -40,7 +39,7 @@ final class FileIngestPipeline {

     private static final IngestManager ingestManager = IngestManager.getInstance();
-    private final DataSourceIngestJob job;
+    private final IngestJobPipeline ingestJobPipeline;
     private final List<PipelineModule> modules = new ArrayList<>();
     private Date startTime;
     private volatile boolean running;
@@ -50,12 +49,12 @@ final class FileIngestPipeline {
      * modules. It starts the modules, runs files through them, and shuts them
      * down when file level ingest is complete.
      *
-     * @param job             The data source ingest job that owns the pipeline.
+     * @param ingestJobPipeline The ingestJobPipeline that owns the pipeline.
      * @param moduleTemplates The ingest module templates that define the
      *                        pipeline.
      */
-    FileIngestPipeline(DataSourceIngestJob job, List<IngestModuleTemplate> moduleTemplates) {
-        this.job = job;
+    FileIngestPipeline(IngestJobPipeline ingestJobPipeline, List<IngestModuleTemplate> moduleTemplates) {
+        this.ingestJobPipeline = ingestJobPipeline;
         for (IngestModuleTemplate template : moduleTemplates) {
             if (template.isFileIngestModuleTemplate()) {
                 PipelineModule module = new PipelineModule(template.createFileIngestModule(), template.getModuleName());
@@ -103,7 +102,7 @@ final class FileIngestPipeline {
         List<IngestModuleError> errors = new ArrayList<>();
         for (PipelineModule module : this.modules) {
             try {
-                module.startUp(new IngestJobContext(this.job));
+                module.startUp(new IngestJobContext(this.ingestJobPipeline));
             } catch (Throwable ex) { // Catch-all exception firewall
                 errors.add(new IngestModuleError(module.getDisplayName(), ex));
             }
@@ -120,22 +119,31 @@ final class FileIngestPipeline {
      */
     synchronized List<IngestModuleError> process(FileIngestTask task) {
         List<IngestModuleError> errors = new ArrayList<>();
-        if (!this.job.isCancelled()) {
-            AbstractFile file = task.getFile();
+        if (!this.ingestJobPipeline.isCancelled()) {
+            AbstractFile file;
+            try {
+                file = task.getFile();
+            } catch (TskCoreException ex) {
+                // In practice, this task would never have been enqueued since the file
+                // lookup would have failed there.
+                errors.add(new IngestModuleError("File Ingest Pipeline", ex)); // NON-NLS
+                FileIngestPipeline.ingestManager.setIngestTaskProgressCompleted(task);
+                return errors;
+            }
             for (PipelineModule module : this.modules) {
                 try {
                     FileIngestPipeline.ingestManager.setIngestTaskProgress(task, module.getDisplayName());
-                    this.job.setCurrentFileIngestModule(module.getDisplayName(), task.getFile().getName());
+                    this.ingestJobPipeline.setCurrentFileIngestModule(module.getDisplayName(), task.getFile().getName());
                     module.process(file);
                 } catch (Throwable ex) { // Catch-all exception firewall
                     errors.add(new IngestModuleError(module.getDisplayName(), ex));
                 }
-                if (this.job.isCancelled()) {
+                if (this.ingestJobPipeline.isCancelled()) {
                     break;
                 }
             }
-            if (!this.job.isCancelled()) {
+            if (!this.ingestJobPipeline.isCancelled()) {
                 // Save any properties that have not already been saved to the database
                 try{
                     file.save();

FileIngestTask.java

@ -19,29 +19,45 @@
package org.sleuthkit.autopsy.ingest; package org.sleuthkit.autopsy.ingest;
import java.util.Objects; import java.util.Objects;
import org.sleuthkit.autopsy.casemodule.Case;
import org.sleuthkit.datamodel.AbstractFile; import org.sleuthkit.datamodel.AbstractFile;
import org.sleuthkit.datamodel.TskCoreException;
/** /**
* Represents a single file analysis task, which is defined by a file to analyze * Represents a single file analysis task, which is defined by a file to analyze
* and the InjestJob/Pipeline to run it on. * and the InjestJob/Pipeline to run it on.
*/ */
final class FileIngestTask extends IngestTask { final class FileIngestTask extends IngestTask {
private final long fileId;
private AbstractFile file = null;
private final AbstractFile file; FileIngestTask(IngestJobPipeline ingestJobPipeline, AbstractFile file) {
super(ingestJobPipeline);
FileIngestTask(DataSourceIngestJob job, AbstractFile file) {
super(job);
this.file = file; this.file = file;
fileId = file.getId();
}
FileIngestTask(IngestJobPipeline ingestJobPipeline, long fileId) {
super(ingestJobPipeline);
this.fileId = fileId;
}
long getFileId() {
return fileId;
} }
AbstractFile getFile() { synchronized AbstractFile getFile() throws TskCoreException {
if (file == null) {
file = Case.getCurrentCase().getSleuthkitCase().getAbstractFileById(fileId);
}
return file; return file;
} }
@Override @Override
void execute(long threadId) throws InterruptedException { void execute(long threadId) throws InterruptedException {
super.setThreadId(threadId); super.setThreadId(threadId);
getIngestJob().process(this); getIngestJobPipeline().process(this);
} }
@Override @Override
@ -53,22 +69,19 @@ final class FileIngestTask extends IngestTask {
return false; return false;
} }
FileIngestTask other = (FileIngestTask) obj; FileIngestTask other = (FileIngestTask) obj;
DataSourceIngestJob job = getIngestJob(); IngestJobPipeline thisPipeline = getIngestJobPipeline();
DataSourceIngestJob otherJob = other.getIngestJob(); IngestJobPipeline otherPipeline = other.getIngestJobPipeline();
if (job != otherJob && (job == null || !job.equals(otherJob))) { if (thisPipeline != otherPipeline && (thisPipeline == null || !thisPipeline.equals(otherPipeline))) {
return false; return false;
} }
if (this.file != other.file && (this.file == null || !this.file.equals(other.file))) { return (this.fileId == other.fileId);
return false;
}
return true;
} }
@Override @Override
public int hashCode() { public int hashCode() {
int hash = 5; int hash = 5;
hash = 47 * hash + Objects.hashCode(getIngestJob()); hash = 47 * hash + Objects.hashCode(getIngestJobPipeline());
hash = 47 * hash + Objects.hashCode(this.file); hash = 47 * hash + Objects.hashCode(this.fileId);
return hash; return hash;
} }
} }
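The reworked FileIngestTask above is what makes streaming possible: a task can now be constructed from a bare object ID before the file's database row has been read, and the AbstractFile is resolved lazily inside a synchronized getter. A minimal sketch of that pattern, using a generic loader in place of SleuthkitCase.getAbstractFileById (LazyRef and its names are hypothetical, not Autopsy code):

    import java.util.function.LongFunction;

    // Hypothetical sketch of the lazy-load pattern used by FileIngestTask above.
    final class LazyRef<T> {
        private final long id;
        private final LongFunction<T> loader; // stands in for getAbstractFileById
        private T value;

        LazyRef(long id, LongFunction<T> loader) {
            this.id = id;
            this.loader = loader;
        }

        long getId() {
            return id; // cheap identity; no database access required
        }

        synchronized T get() {
            if (value == null) {
                value = loader.apply(id); // deferred until a consumer needs it
            }
            return value;
        }
    }

Keying equals() and hashCode() on the ID alone, as the diff does, keeps tasks usable in hash- and tree-based queues even when the later file lookup fails.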


@ -19,6 +19,7 @@
package org.sleuthkit.autopsy.ingest; package org.sleuthkit.autopsy.ingest;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Date; import java.util.Date;
@ -27,10 +28,15 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import org.openide.util.NbBundle; import org.openide.util.NbBundle;
import org.sleuthkit.autopsy.casemodule.Case;
import org.sleuthkit.autopsy.coreutils.Logger;
import org.sleuthkit.datamodel.AbstractFile; import org.sleuthkit.datamodel.AbstractFile;
import org.sleuthkit.datamodel.Content; import org.sleuthkit.datamodel.Content;
import org.sleuthkit.datamodel.DataSource;
import org.sleuthkit.datamodel.TskCoreException;
import org.sleuthkit.datamodel.TskDataException;
/** /**
* Analyzes one or more data sources using a set of ingest modules specified via * Analyzes one or more data sources using a set of ingest modules specified via
@ -60,30 +66,37 @@ public final class IngestJob {
return displayName; return displayName;
} }
} }
enum Mode {
BATCH,
STREAMING
}
private static final Logger logger = Logger.getLogger(IngestJob.class.getName());
private final static AtomicLong nextId = new AtomicLong(0L); private final static AtomicLong nextId = new AtomicLong(0L);
private final long id; private final long id;
private final Map<Long, DataSourceIngestJob> dataSourceJobs; private final List<Content> dataSources = new ArrayList<>();
private final List<AbstractFile> files = new ArrayList<>();
private final Mode ingestMode;
private final Map<Long, IngestJobPipeline> ingestJobPipelines;
private final AtomicInteger incompleteJobsCount; private final AtomicInteger incompleteJobsCount;
private final IngestJobSettings settings;
private volatile CancellationReason cancellationReason; private volatile CancellationReason cancellationReason;
/** /**
* Constructs an ingest job that analyzes one or more data sources using a * Constructs an ingest job that analyzes one or more data sources using a
* set of ingest modules specified via ingest job settings. * set of ingest modules specified via ingest settings.
* *
* @param dataSources The data sources to be ingested. * @param dataSources The data sources to be ingested.
* @param settings The ingest job settings. * @param settings The ingest settings.
* @param doUI Whether or not this job should use progress bars,
* message boxes for errors, etc.
*/ */
IngestJob(Collection<Content> dataSources, IngestJobSettings settings, boolean doUI) { IngestJob(Collection<Content> dataSources, IngestJobSettings settings) {
this.id = IngestJob.nextId.getAndIncrement(); this.id = IngestJob.nextId.getAndIncrement();
this.dataSourceJobs = new ConcurrentHashMap<>(); this.settings = settings;
for (Content dataSource : dataSources) { this.ingestJobPipelines = new ConcurrentHashMap<>();
DataSourceIngestJob dataSourceIngestJob = new DataSourceIngestJob(this, dataSource, settings, doUI); this.ingestMode = Mode.BATCH;
this.dataSourceJobs.put(dataSourceIngestJob.getId(), dataSourceIngestJob); this.dataSources.addAll(dataSources);
} incompleteJobsCount = new AtomicInteger(dataSources.size());
incompleteJobsCount = new AtomicInteger(dataSourceJobs.size());
cancellationReason = CancellationReason.NOT_CANCELLED; cancellationReason = CancellationReason.NOT_CANCELLED;
} }
@ -92,18 +105,28 @@ public final class IngestJob {
* ingest modules specified via ingest job settings. Either all of the files * ingest modules specified via ingest job settings. Either all of the files
* in the data source or a given subset of the files will be analyzed. * in the data source or a given subset of the files will be analyzed.
* *
* @param dataSource The data source to be analyzed * @param dataSource The data source to be analyzed.
* @param files A subset of the files for the data source. * @param files A subset of the files for the data source.
* @param settings The ingest job settings. * @param settings The ingest job settings.
* @param doUI Whether or not this job should use progress bars,
* message boxes for errors, etc.
*/ */
IngestJob(Content dataSource, List<AbstractFile> files, IngestJobSettings settings, boolean doUI) { IngestJob(Content dataSource, List<AbstractFile> files, IngestJobSettings settings) {
this(Arrays.asList(dataSource), settings);
this.files.addAll(files);
}
/**
* Constructs an ingest job that analyzes one data source, possibly using
* an ingest stream.
*
* @param settings The ingest job settings.
*/
IngestJob(DataSource dataSource, Mode ingestMode, IngestJobSettings settings) {
this.id = IngestJob.nextId.getAndIncrement(); this.id = IngestJob.nextId.getAndIncrement();
this.dataSourceJobs = new ConcurrentHashMap<>(); this.ingestJobPipelines = new ConcurrentHashMap<>();
DataSourceIngestJob dataSourceIngestJob = new DataSourceIngestJob(this, dataSource, files, settings, doUI); this.dataSources.add(dataSource);
this.dataSourceJobs.put(dataSourceIngestJob.getId(), dataSourceIngestJob); this.settings = settings;
incompleteJobsCount = new AtomicInteger(dataSourceJobs.size()); this.ingestMode = ingestMode;
incompleteJobsCount = new AtomicInteger(1);
cancellationReason = CancellationReason.NOT_CANCELLED; cancellationReason = CancellationReason.NOT_CANCELLED;
} }
@ -124,18 +147,35 @@ public final class IngestJob {
* @return True or false. * @return True or false.
*/ */
boolean hasIngestPipeline() { boolean hasIngestPipeline() {
/** return (!settings.getEnabledIngestModuleTemplates().isEmpty());
* TODO: This could actually be done more simply by adding a method to }
* the IngestJobSettings to check for at least one enabled ingest module
* template. The test could then be done in the ingest manager before /**
* even constructing an ingest job. * Add a set of files (by object ID) to be ingested.
*/ *
for (DataSourceIngestJob dataSourceJob : this.dataSourceJobs.values()) { * @param fileObjIds the list of file IDs
if (dataSourceJob.hasIngestPipeline()) { */
return true; void addStreamingIngestFiles(List<Long> fileObjIds) {
} if (ingestJobPipelines.isEmpty()) {
logger.log(Level.SEVERE, "Attempted to add streaming ingest files with no IngestJobPipeline");
return;
} }
return false; // Streaming ingest jobs will only have one data source
IngestJobPipeline streamingIngestPipeline = ingestJobPipelines.values().iterator().next();
streamingIngestPipeline.addStreamingIngestFiles(fileObjIds);
}
/**
* Start data source processing for streaming ingest.
*/
void processStreamingIngestDataSource() {
if (ingestJobPipelines.isEmpty()) {
logger.log(Level.SEVERE, "Attempted to start data source ingest with no IngestJobPipeline");
return;
}
// Streaming ingest jobs will only have one data source
IngestJobPipeline streamingIngestPipeline = ingestJobPipelines.values().iterator().next();
streamingIngestPipeline.processStreamingIngestDataSource();
} }
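Together, addStreamingIngestFiles() and processStreamingIngestDataSource() give a STREAMING-mode job a two-phase lifecycle: file batches are analyzed as they arrive, and the data source level modules run once the add-image process has finished. A hypothetical driver, only to make the intended call order concrete (everything but the IngestJob methods is a stand-in, and startup error handling is omitted):

    import java.util.List;

    // Hypothetical call-order sketch; `batches` stands in for the file-ID
    // batches delivered by the add-image callbacks.
    void runStreamingJob(IngestJob job, Iterable<List<Long>> batches) {
        job.start();                                  // creates the single pipeline
        for (List<Long> fileObjIds : batches) {
            job.addStreamingIngestFiles(fileObjIds);  // files analyzed as they arrive
        }
        job.processStreamingIngestDataSource();       // data source modules run last
    }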
/** /**
@ -145,17 +185,32 @@ public final class IngestJob {
* @return A collection of ingest module start up errors, empty on success. * @return A collection of ingest module start up errors, empty on success.
*/ */
List<IngestModuleError> start() { List<IngestModuleError> start() {
/* /*
* Try to start each data source ingest job. Note that there is a not * Set up the pipeline(s)
* unwarranted assumption here that if there is going to be a module */
* startup failure, it will be for the first data source ingest job. if (files.isEmpty()) {
for (Content dataSource : dataSources) {
IngestJobPipeline ingestJobPipeline = new IngestJobPipeline(this, dataSource, settings);
this.ingestJobPipelines.put(ingestJobPipeline.getId(), ingestJobPipeline);
}
} else {
IngestJobPipeline ingestJobPipeline = new IngestJobPipeline(this, dataSources.get(0), files, settings);
this.ingestJobPipelines.put(ingestJobPipeline.getId(), ingestJobPipeline);
}
incompleteJobsCount.set(ingestJobPipelines.size());
/*
* Try to start each data source ingest job. Note that there is an
* assumption here that if there is going to be a module
* startup failure, it will be for the first ingest job pipeline.
* *
* TODO (RC): Consider separating module start up from pipeline startup * TODO (RC): Consider separating module start up from pipeline startup
* so that no processing is done if this assumption is false. * so that no processing is done if this assumption is false.
*/ */
List<IngestModuleError> errors = new ArrayList<>(); List<IngestModuleError> errors = new ArrayList<>();
for (DataSourceIngestJob dataSourceJob : this.dataSourceJobs.values()) { for (IngestJobPipeline ingestJobPipeline : this.ingestJobPipelines.values()) {
errors.addAll(dataSourceJob.start()); errors.addAll(ingestJobPipeline.start());
if (errors.isEmpty() == false) { if (errors.isEmpty() == false) {
break; break;
} }
@ -165,7 +220,7 @@ public final class IngestJob {
* Handle start up success or failure. * Handle start up success or failure.
*/ */
if (errors.isEmpty()) { if (errors.isEmpty()) {
for (DataSourceIngestJob dataSourceJob : this.dataSourceJobs.values()) { for (IngestJobPipeline dataSourceJob : this.ingestJobPipelines.values()) {
IngestManager.getInstance().fireDataSourceAnalysisStarted(id, dataSourceJob.getId(), dataSourceJob.getDataSource()); IngestManager.getInstance().fireDataSourceAnalysisStarted(id, dataSourceJob.getId(), dataSourceJob.getDataSource());
} }
} else { } else {
@ -174,6 +229,15 @@ public final class IngestJob {
return errors; return errors;
} }
/**
* Get the ingest mode for this job (batch or streaming).
*
* @return the ingest mode.
*/
Mode getIngestMode() {
return ingestMode;
}
/** /**
* Gets a snapshot of the progress of this ingest job. * Gets a snapshot of the progress of this ingest job.
@ -187,6 +251,8 @@ public final class IngestJob {
/** /**
* Gets a snapshot of the progress of this ingest job. * Gets a snapshot of the progress of this ingest job.
* *
* @param getIngestTasksSnapshot Whether to include ingest task queue statistics in the snapshot.
*
* @return The snapshot. * @return The snapshot.
*/ */
public ProgressSnapshot getSnapshot(boolean getIngestTasksSnapshot) { public ProgressSnapshot getSnapshot(boolean getIngestTasksSnapshot) {
@ -199,9 +265,9 @@ public final class IngestJob {
* *
* @return A list of data source ingest job progress snapshots. * @return A list of data source ingest job progress snapshots.
*/ */
List<DataSourceIngestJob.Snapshot> getDataSourceIngestJobSnapshots() { List<Snapshot> getDataSourceIngestJobSnapshots() {
List<DataSourceIngestJob.Snapshot> snapshots = new ArrayList<>(); List<Snapshot> snapshots = new ArrayList<>();
this.dataSourceJobs.values().stream().forEach((dataSourceJob) -> { this.ingestJobPipelines.values().stream().forEach((dataSourceJob) -> {
snapshots.add(dataSourceJob.getSnapshot(true)); snapshots.add(dataSourceJob.getSnapshot(true));
}); });
return snapshots; return snapshots;
@ -230,7 +296,7 @@ public final class IngestJob {
*/ */
public void cancel(CancellationReason reason) { public void cancel(CancellationReason reason) {
this.cancellationReason = reason; this.cancellationReason = reason;
this.dataSourceJobs.values().stream().forEach((job) -> { this.ingestJobPipelines.values().stream().forEach((job) -> {
job.cancel(reason); job.cancel(reason);
}); });
} }
@ -255,17 +321,17 @@ public final class IngestJob {
} }
/** /**
* Provides a callback for completed data source ingest jobs, allowing this * Provides a callback for a completed ingest job pipeline, allowing this
* ingest job to notify the ingest manager when it is complete. * ingest job to notify the ingest manager when it is complete.
* *
* @param job A completed data source ingest job. * @param ingestJobPipeline A completed ingest job pipeline.
*/ */
void dataSourceJobFinished(DataSourceIngestJob job) { void ingestJobPipelineFinished(IngestJobPipeline ingestJobPipeline) {
IngestManager ingestManager = IngestManager.getInstance(); IngestManager ingestManager = IngestManager.getInstance();
if (!job.isCancelled()) { if (!ingestJobPipeline.isCancelled()) {
ingestManager.fireDataSourceAnalysisCompleted(id, job.getId(), job.getDataSource()); ingestManager.fireDataSourceAnalysisCompleted(id, ingestJobPipeline.getId(), ingestJobPipeline.getDataSource());
} else { } else {
IngestManager.getInstance().fireDataSourceAnalysisCancelled(id, job.getId(), job.getDataSource()); IngestManager.getInstance().fireDataSourceAnalysisCancelled(id, ingestJobPipeline.getId(), ingestJobPipeline.getDataSource());
} }
if (incompleteJobsCount.decrementAndGet() == 0) { if (incompleteJobsCount.decrementAndGet() == 0) {
ingestManager.finishIngestJob(this); ingestManager.finishIngestJob(this);
@ -290,9 +356,9 @@ public final class IngestJob {
*/ */
public final class DataSourceProcessingSnapshot { public final class DataSourceProcessingSnapshot {
private final DataSourceIngestJob.Snapshot snapshot; private final Snapshot snapshot;
private DataSourceProcessingSnapshot(DataSourceIngestJob.Snapshot snapshot) { private DataSourceProcessingSnapshot(Snapshot snapshot) {
this.snapshot = snapshot; this.snapshot = snapshot;
} }
@ -346,13 +412,13 @@ public final class IngestJob {
fileIngestRunning = false; fileIngestRunning = false;
fileIngestStartTime = null; fileIngestStartTime = null;
dataSourceProcessingSnapshots = new ArrayList<>(); dataSourceProcessingSnapshots = new ArrayList<>();
for (DataSourceIngestJob dataSourceJob : dataSourceJobs.values()) { for (IngestJobPipeline pipeline : ingestJobPipelines.values()) {
DataSourceIngestJob.Snapshot snapshot = dataSourceJob.getSnapshot(getIngestTasksSnapshot); Snapshot snapshot = pipeline.getSnapshot(getIngestTasksSnapshot);
dataSourceProcessingSnapshots.add(new DataSourceProcessingSnapshot(snapshot)); dataSourceProcessingSnapshots.add(new DataSourceProcessingSnapshot(snapshot));
if (null == dataSourceModule) { if (null == dataSourceModule) {
DataSourceIngestPipeline.PipelineModule module = snapshot.getDataSourceLevelIngestModule(); DataSourceIngestPipeline.PipelineModule module = snapshot.getDataSourceLevelIngestModule();
if (null != module) { if (null != module) {
dataSourceModule = new DataSourceIngestModuleHandle(dataSourceJobs.get(snapshot.getJobId()), module); dataSourceModule = new DataSourceIngestModuleHandle(ingestJobPipelines.get(snapshot.getJobId()), module);
} }
} }
if (snapshot.getFileIngestIsRunning()) { if (snapshot.getFileIngestIsRunning()) {
@ -433,7 +499,7 @@ public final class IngestJob {
*/ */
public static class DataSourceIngestModuleHandle { public static class DataSourceIngestModuleHandle {
private final DataSourceIngestJob job; private final IngestJobPipeline ingestJobPipeline;
private final DataSourceIngestPipeline.PipelineModule module; private final DataSourceIngestPipeline.PipelineModule module;
private final boolean cancelled; private final boolean cancelled;
@ -442,14 +508,13 @@ public final class IngestJob {
* used to get basic information about the module and to request * used to get basic information about the module and to request
* cancellation of the module. * cancellation of the module.
* *
* @param job The data source ingest job that owns the data source * @param ingestJobPipeline The ingest job pipeline that owns the data source level ingest module.
* level ingest module.
* @param module The data source level ingest module. * @param module The data source level ingest module.
*/ */
private DataSourceIngestModuleHandle(DataSourceIngestJob job, DataSourceIngestPipeline.PipelineModule module) { private DataSourceIngestModuleHandle(IngestJobPipeline ingestJobPipeline, DataSourceIngestPipeline.PipelineModule module) {
this.job = job; this.ingestJobPipeline = ingestJobPipeline;
this.module = module; this.module = module;
this.cancelled = job.currentDataSourceIngestModuleIsCancelled(); this.cancelled = ingestJobPipeline.currentDataSourceIngestModuleIsCancelled();
} }
/** /**
@ -499,8 +564,8 @@ public final class IngestJob {
* modules participating in this workaround will need to consult the * modules participating in this workaround will need to consult the
* cancelled flag in the adapters. * cancelled flag in the adapters.
*/ */
if (this.job.getCurrentDataSourceIngestModule() == this.module) { if (this.ingestJobPipeline.getCurrentDataSourceIngestModule() == this.module) {
this.job.cancelCurrentDataSourceIngestModule(); this.ingestJobPipeline.cancelCurrentDataSourceIngestModule();
} }
} }


@ -28,10 +28,10 @@ import org.sleuthkit.datamodel.Content;
*/ */
public final class IngestJobContext { public final class IngestJobContext {
private final DataSourceIngestJob ingestJob; private final IngestJobPipeline ingestJobPipeline;
IngestJobContext(DataSourceIngestJob ingestJob) { IngestJobContext(IngestJobPipeline ingestJobPipeline) {
this.ingestJob = ingestJob; this.ingestJobPipeline = ingestJobPipeline;
} }
/** /**
@ -40,7 +40,7 @@ public final class IngestJobContext {
* @return The context string. * @return The context string.
*/ */
public String getExecutionContext() { public String getExecutionContext() {
return this.ingestJob.getExecutionContext(); return this.ingestJobPipeline.getExecutionContext();
} }
/** /**
@ -49,7 +49,7 @@ public final class IngestJobContext {
* @return The data source. * @return The data source.
*/ */
public Content getDataSource() { public Content getDataSource() {
return this.ingestJob.getDataSource(); return this.ingestJobPipeline.getDataSource();
} }
/** /**
@ -58,7 +58,7 @@ public final class IngestJobContext {
* @return The ingest job identifier. * @return The ingest job identifier.
*/ */
public long getJobId() { public long getJobId() {
return this.ingestJob.getId(); return this.ingestJobPipeline.getId();
} }
/** /**
@ -83,7 +83,7 @@ public final class IngestJobContext {
* @return True or false. * @return True or false.
*/ */
public boolean dataSourceIngestIsCancelled() { public boolean dataSourceIngestIsCancelled() {
return this.ingestJob.currentDataSourceIngestModuleIsCancelled() || this.ingestJob.isCancelled(); return this.ingestJobPipeline.currentDataSourceIngestModuleIsCancelled() || this.ingestJobPipeline.isCancelled();
} }
/** /**
@ -94,7 +94,7 @@ public final class IngestJobContext {
* @return True or false. * @return True or false.
*/ */
public boolean fileIngestIsCancelled() { public boolean fileIngestIsCancelled() {
return this.ingestJob.isCancelled(); return this.ingestJobPipeline.isCancelled();
} }
/** /**
@ -104,7 +104,7 @@ public final class IngestJobContext {
* @return True or false. * @return True or false.
*/ */
public boolean processingUnallocatedSpace() { public boolean processingUnallocatedSpace() {
return this.ingestJob.shouldProcessUnallocatedSpace(); return this.ingestJobPipeline.shouldProcessUnallocatedSpace();
} }
/** /**
@ -127,7 +127,7 @@ public final class IngestJobContext {
* @param files The files to be added. * @param files The files to be added.
*/ */
public void addFilesToJob(List<AbstractFile> files) { public void addFilesToJob(List<AbstractFile> files) {
this.ingestJob.addFiles(files); this.ingestJobPipeline.addFiles(files);
} }
} }


@ -67,6 +67,8 @@ import org.sleuthkit.autopsy.ingest.events.DataSourceAnalysisStartedEvent;
import org.sleuthkit.autopsy.ingest.events.FileAnalyzedEvent; import org.sleuthkit.autopsy.ingest.events.FileAnalyzedEvent;
import org.sleuthkit.datamodel.AbstractFile; import org.sleuthkit.datamodel.AbstractFile;
import org.sleuthkit.datamodel.Content; import org.sleuthkit.datamodel.Content;
import org.sleuthkit.datamodel.DataSource;
import org.sleuthkit.datamodel.TskCoreException;
/** /**
* Manages the creation and execution of ingest jobs, i.e., the processing of * Manages the creation and execution of ingest jobs, i.e., the processing of
@ -285,6 +287,20 @@ public class IngestManager implements IngestProgressSnapshotProvider {
caseIsOpen = false; caseIsOpen = false;
clearIngestMessageBox(); clearIngestMessageBox();
} }
/**
* Creates an ingest stream from the given ingest settings for a data source.
*
* @param dataSource The data source
* @param settings The ingest job settings.
*
* @return The newly created ingest stream
*/
public IngestStream openIngestStream(DataSource dataSource, IngestJobSettings settings) {
IngestJob job = new IngestJob(dataSource, IngestJob.Mode.STREAMING, settings);
return new IngestJobInputStream(job);
}
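openIngestStream() is the new public entry point that ties the streaming pieces together for data source processors: it builds a STREAMING-mode IngestJob and hands back an IngestJobInputStream wrapper. Sketched from a caller's point of view (the addFiles method name on IngestStream is an assumption inferred from how the stream is fed in this commit, not a documented signature):

    // Hypothetical wiring; dataSource and settings are assumed to already exist.
    IngestStream stream = IngestManager.getInstance().openIngestStream(dataSource, settings);
    // As the SleuthKit add-image process inserts files into the case database,
    // their object IDs would be forwarded to the stream in batches:
    stream.addFiles(newlyAddedFileObjIds); // assumed method; IDs reach addStreamingIngestFiles()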
/** /**
* Gets the number of file ingest threads the ingest manager is using to do * Gets the number of file ingest threads the ingest manager is using to do
@ -304,7 +320,7 @@ public class IngestManager implements IngestProgressSnapshotProvider {
*/ */
public void queueIngestJob(Collection<Content> dataSources, IngestJobSettings settings) { public void queueIngestJob(Collection<Content> dataSources, IngestJobSettings settings) {
if (caseIsOpen) { if (caseIsOpen) {
IngestJob job = new IngestJob(dataSources, settings, RuntimeProperties.runningWithGUI()); IngestJob job = new IngestJob(dataSources, settings);
if (job.hasIngestPipeline()) { if (job.hasIngestPipeline()) {
long taskId = nextIngestManagerTaskId.incrementAndGet(); long taskId = nextIngestManagerTaskId.incrementAndGet();
Future<Void> task = startIngestJobsExecutor.submit(new StartIngestJobTask(taskId, job)); Future<Void> task = startIngestJobsExecutor.submit(new StartIngestJobTask(taskId, job));
@ -323,7 +339,7 @@ public class IngestManager implements IngestProgressSnapshotProvider {
*/ */
public void queueIngestJob(Content dataSource, List<AbstractFile> files, IngestJobSettings settings) { public void queueIngestJob(Content dataSource, List<AbstractFile> files, IngestJobSettings settings) {
if (caseIsOpen) { if (caseIsOpen) {
IngestJob job = new IngestJob(dataSource, files, settings, RuntimeProperties.runningWithGUI()); IngestJob job = new IngestJob(dataSource, files, settings);
if (job.hasIngestPipeline()) { if (job.hasIngestPipeline()) {
long taskId = nextIngestManagerTaskId.incrementAndGet(); long taskId = nextIngestManagerTaskId.incrementAndGet();
Future<Void> task = startIngestJobsExecutor.submit(new StartIngestJobTask(taskId, job)); Future<Void> task = startIngestJobsExecutor.submit(new StartIngestJobTask(taskId, job));
@ -333,7 +349,7 @@ public class IngestManager implements IngestProgressSnapshotProvider {
} }
/** /**
* Immdiately starts an ingest job for one or more data sources. * Immediately starts an ingest job for one or more data sources.
* *
* @param dataSources The data sources to process. * @param dataSources The data sources to process.
* @param settings The settings for the ingest job. * @param settings The settings for the ingest job.
@ -343,7 +359,7 @@ public class IngestManager implements IngestProgressSnapshotProvider {
*/ */
public IngestJobStartResult beginIngestJob(Collection<Content> dataSources, IngestJobSettings settings) { public IngestJobStartResult beginIngestJob(Collection<Content> dataSources, IngestJobSettings settings) {
if (caseIsOpen) { if (caseIsOpen) {
IngestJob job = new IngestJob(dataSources, settings, RuntimeProperties.runningWithGUI()); IngestJob job = new IngestJob(dataSources, settings);
if (job.hasIngestPipeline()) { if (job.hasIngestPipeline()) {
return startIngestJob(job); return startIngestJob(job);
} }
@ -366,7 +382,7 @@ public class IngestManager implements IngestProgressSnapshotProvider {
"IngestManager.startupErr.dlgSolution=Please disable the failed modules or fix the errors before restarting ingest.", "IngestManager.startupErr.dlgSolution=Please disable the failed modules or fix the errors before restarting ingest.",
"IngestManager.startupErr.dlgErrorList=Errors:" "IngestManager.startupErr.dlgErrorList=Errors:"
}) })
private IngestJobStartResult startIngestJob(IngestJob job) { IngestJobStartResult startIngestJob(IngestJob job) {
List<IngestModuleError> errors = null; List<IngestModuleError> errors = null;
Case openCase; Case openCase;
try { try {
@ -730,7 +746,7 @@ public class IngestManager implements IngestProgressSnapshotProvider {
* the task. * the task.
*/ */
void setIngestTaskProgress(DataSourceIngestTask task, String ingestModuleDisplayName) { void setIngestTaskProgress(DataSourceIngestTask task, String ingestModuleDisplayName) {
ingestThreadActivitySnapshots.put(task.getThreadId(), new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJob().getId(), ingestModuleDisplayName, task.getDataSource())); ingestThreadActivitySnapshots.put(task.getThreadId(), new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJobPipeline().getId(), ingestModuleDisplayName, task.getDataSource()));
} }
/** /**
@ -746,7 +762,15 @@ public class IngestManager implements IngestProgressSnapshotProvider {
*/ */
void setIngestTaskProgress(FileIngestTask task, String ingestModuleDisplayName) { void setIngestTaskProgress(FileIngestTask task, String ingestModuleDisplayName) {
IngestThreadActivitySnapshot prevSnap = ingestThreadActivitySnapshots.get(task.getThreadId()); IngestThreadActivitySnapshot prevSnap = ingestThreadActivitySnapshots.get(task.getThreadId());
IngestThreadActivitySnapshot newSnap = new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJob().getId(), ingestModuleDisplayName, task.getDataSource(), task.getFile()); IngestThreadActivitySnapshot newSnap;
try {
AbstractFile file = task.getFile();
newSnap = new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJobPipeline().getId(), ingestModuleDisplayName, task.getDataSource(), task.getFile());
} catch (TskCoreException ex) {
// In practice, this task would never have been enqueued or processed since the file
// lookup would have failed.
newSnap = new IngestThreadActivitySnapshot(task.getThreadId(), task.getIngestJobPipeline().getId(), ingestModuleDisplayName, task.getDataSource());
}
ingestThreadActivitySnapshots.put(task.getThreadId(), newSnap); ingestThreadActivitySnapshots.put(task.getThreadId(), newSnap);
incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime()); incrementModuleRunTime(prevSnap.getActivity(), newSnap.getStartTime().getTime() - prevSnap.getStartTime().getTime());
} }
@ -828,8 +852,8 @@ public class IngestManager implements IngestProgressSnapshotProvider {
* @return A list of ingest job state snapshots. * @return A list of ingest job state snapshots.
*/ */
@Override @Override
public List<DataSourceIngestJob.Snapshot> getIngestJobSnapshots() { public List<Snapshot> getIngestJobSnapshots() {
List<DataSourceIngestJob.Snapshot> snapShots = new ArrayList<>(); List<Snapshot> snapShots = new ArrayList<>();
synchronized (ingestJobsById) { synchronized (ingestJobsById) {
ingestJobsById.values().forEach((job) -> { ingestJobsById.values().forEach((job) -> {
snapShots.addAll(job.getDataSourceIngestJobSnapshots()); snapShots.addAll(job.getDataSourceIngestJobSnapshots());


@ -183,7 +183,7 @@ class IngestProgressSnapshotPanel extends javax.swing.JPanel {
"IngestJobTableModel.colName.rootQueued"), "IngestJobTableModel.colName.rootQueued"),
NbBundle.getMessage(this.getClass(), NbBundle.getMessage(this.getClass(),
"IngestJobTableModel.colName.dsQueued")}; "IngestJobTableModel.colName.dsQueued")};
private List<DataSourceIngestJob.Snapshot> jobSnapshots; private List<Snapshot> jobSnapshots;
private IngestJobTableModel() { private IngestJobTableModel() {
refresh(); refresh();
@ -211,7 +211,7 @@ class IngestProgressSnapshotPanel extends javax.swing.JPanel {
@Override @Override
public Object getValueAt(int rowIndex, int columnIndex) { public Object getValueAt(int rowIndex, int columnIndex) {
DataSourceIngestJob.Snapshot snapShot = jobSnapshots.get(rowIndex); Snapshot snapShot = jobSnapshots.get(rowIndex);
Object cellValue; Object cellValue;
switch (columnIndex) { switch (columnIndex) {
case 0: case 0:


@ -38,7 +38,7 @@ public interface IngestProgressSnapshotProvider {
* *
* @return A list of ingest job snapshots. * @return A list of ingest job snapshots.
*/ */
List<DataSourceIngestJob.Snapshot> getIngestJobSnapshots(); List<Snapshot> getIngestJobSnapshots();
/** /**
* Gets the cumulative run times for the ingest module. * Gets the cumulative run times for the ingest module.


@ -23,20 +23,20 @@ import org.sleuthkit.datamodel.Content;
abstract class IngestTask { abstract class IngestTask {
private final static long NOT_SET = Long.MIN_VALUE; private final static long NOT_SET = Long.MIN_VALUE;
private final DataSourceIngestJob job; private final IngestJobPipeline ingestJobPipeline;
private long threadId; private long threadId;
IngestTask(DataSourceIngestJob job) { IngestTask(IngestJobPipeline ingestJobPipeline) {
this.job = job; this.ingestJobPipeline = ingestJobPipeline;
threadId = NOT_SET; threadId = NOT_SET;
} }
DataSourceIngestJob getIngestJob() { IngestJobPipeline getIngestJobPipeline() {
return job; return ingestJobPipeline;
} }
Content getDataSource() { Content getDataSource() {
return getIngestJob().getDataSource(); return getIngestJobPipeline().getDataSource();
} }
long getThreadId() { long getThreadId() {


@ -27,6 +27,7 @@ import java.util.Deque;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Queue;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.concurrent.BlockingDeque; import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingDeque;
@ -58,6 +59,8 @@ final class IngestTasksScheduler {
private final TreeSet<FileIngestTask> rootFileTaskQueue; private final TreeSet<FileIngestTask> rootFileTaskQueue;
@GuardedBy("this") @GuardedBy("this")
private final Deque<FileIngestTask> pendingFileTaskQueue; private final Deque<FileIngestTask> pendingFileTaskQueue;
@GuardedBy("this")
private final Queue<FileIngestTask> streamedTasksQueue;
private final IngestTaskTrackingQueue fileIngestThreadsQueue; private final IngestTaskTrackingQueue fileIngestThreadsQueue;
/** /**
@ -82,6 +85,7 @@ final class IngestTasksScheduler {
this.rootFileTaskQueue = new TreeSet<>(new RootDirectoryTaskComparator()); this.rootFileTaskQueue = new TreeSet<>(new RootDirectoryTaskComparator());
this.pendingFileTaskQueue = new LinkedList<>(); this.pendingFileTaskQueue = new LinkedList<>();
this.fileIngestThreadsQueue = new IngestTaskTrackingQueue(); this.fileIngestThreadsQueue = new IngestTaskTrackingQueue();
this.streamedTasksQueue = new LinkedList<>();
} }
/** /**
@ -105,38 +109,38 @@ final class IngestTasksScheduler {
} }
/** /**
* Schedules a data source level ingest task and zero to many file level * Schedules a data source level ingest task and zero to many file level
* ingest tasks for a data source ingest job. * ingest tasks for an ingest job pipeline.
* *
* @param job The data source ingest job. * @param ingestJobPipeline The ingest job pipeline.
*/ */
synchronized void scheduleIngestTasks(DataSourceIngestJob job) { synchronized void scheduleIngestTasks(IngestJobPipeline ingestJobPipeline) {
if (!job.isCancelled()) { if (!ingestJobPipeline.isCancelled()) {
/* /*
* Scheduling of both the data source ingest task and the initial * Scheduling of both the data source ingest task and the initial
* file ingest tasks for a job must be an atomic operation. * file ingest tasks for an ingestJobPipeline must be an atomic operation.
* Otherwise, the data source task might be completed before the * Otherwise, the data source task might be completed before the
* file tasks are scheduled, resulting in a potential false positive * file tasks are scheduled, resulting in a potential false positive
* when another thread checks whether or not all the tasks for the * when another thread checks whether or not all the tasks for the
* job are completed. * ingestJobPipeline are completed.
*/ */
this.scheduleDataSourceIngestTask(job); this.scheduleDataSourceIngestTask(ingestJobPipeline);
this.scheduleFileIngestTasks(job, Collections.emptyList()); this.scheduleFileIngestTasks(ingestJobPipeline, Collections.emptyList());
} }
} }
/** /**
* Schedules a data source level ingest task for a data source ingest job. * Schedules a data source level ingest task for an ingest job pipeline.
* *
* @param job The data source ingest job. * @param ingestJobPipeline The ingest job pipeline.
*/ */
synchronized void scheduleDataSourceIngestTask(DataSourceIngestJob job) { synchronized void scheduleDataSourceIngestTask(IngestJobPipeline ingestJobPipeline) {
if (!job.isCancelled()) { if (!ingestJobPipeline.isCancelled()) {
DataSourceIngestTask task = new DataSourceIngestTask(job); DataSourceIngestTask task = new DataSourceIngestTask(ingestJobPipeline);
try { try {
this.dataSourceIngestThreadQueue.putLast(task); this.dataSourceIngestThreadQueue.putLast(task);
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
IngestTasksScheduler.logger.log(Level.INFO, String.format("Ingest tasks scheduler interrupted while blocked adding a task to the data source level ingest task queue (jobId=%d)", job.getId()), ex); IngestTasksScheduler.logger.log(Level.INFO, String.format("Ingest tasks scheduler interrupted while blocked adding a task to the data source level ingest task queue (jobId=%d)", ingestJobPipeline.getId()), ex);
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
} }
@ -144,40 +148,59 @@ final class IngestTasksScheduler {
/** /**
* Schedules file tasks for either all the files or a given subset of the * Schedules file tasks for either all the files or a given subset of the
* files for a data source source ingest job. * files for an ingest job pipeline.
* *
* @param job The data source ingest job. * @param ingestJobPipeline The ingest job pipeline.
* @param files A subset of the files for the data source; if empty, then * @param files A subset of the files for the data source; if empty, then
* file tasks for all files in the data source are scheduled. * file tasks for all files in the data source are scheduled.
*/ */
synchronized void scheduleFileIngestTasks(DataSourceIngestJob job, Collection<AbstractFile> files) { synchronized void scheduleFileIngestTasks(IngestJobPipeline ingestJobPipeline, Collection<AbstractFile> files) {
if (!job.isCancelled()) { if (!ingestJobPipeline.isCancelled()) {
Collection<AbstractFile> candidateFiles; Collection<AbstractFile> candidateFiles;
if (files.isEmpty()) { if (files.isEmpty()) {
candidateFiles = getTopLevelFiles(job.getDataSource()); candidateFiles = getTopLevelFiles(ingestJobPipeline.getDataSource());
} else { } else {
candidateFiles = files; candidateFiles = files;
} }
for (AbstractFile file : candidateFiles) { for (AbstractFile file : candidateFiles) {
FileIngestTask task = new FileIngestTask(job, file); FileIngestTask task = new FileIngestTask(ingestJobPipeline, file);
if (IngestTasksScheduler.shouldEnqueueFileTask(task)) { if (IngestTasksScheduler.shouldEnqueueFileTask(task)) {
this.rootFileTaskQueue.add(task); this.rootFileTaskQueue.add(task);
} }
} }
shuffleFileTaskQueues(); refillIngestThreadQueue();
} }
} }
/**
* Schedules file tasks for the given list of file IDs.
*
* @param ingestJobPipeline The ingest job pipeline.
* @param fileIds The object IDs of the files to schedule.
*/
synchronized void scheduleStreamedFileIngestTasks(IngestJobPipeline ingestJobPipeline, List<Long> fileIds) {
if (!ingestJobPipeline.isCancelled()) {
for (long id : fileIds) {
// Create the file ingest task. Note that we do not do the shouldEnqueueFileTask()
// check here in order to delay loading the AbstractFile object.
FileIngestTask task = new FileIngestTask(ingestJobPipeline, id);
this.streamedTasksQueue.add(task);
}
refillIngestThreadQueue();
}
}
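Note the trade-off in scheduleStreamedFileIngestTasks(): shouldEnqueueFileTask() needs the AbstractFile, so running it here would force a database lookup per streamed ID at enqueue time; deferring it to the drain keeps this method cheap for the callbacks that invoke it. A small sketch of the defer-then-filter shape, with hypothetical names:

    import java.util.LinkedList;
    import java.util.Queue;
    import java.util.function.LongPredicate;

    // Hypothetical sketch: stage raw IDs cheaply, apply the expensive,
    // database-backed predicate only when handing work to a consumer.
    final class DeferredFilterQueue {
        private final Queue<Long> staged = new LinkedList<>();

        synchronized void offerAll(Iterable<Long> ids) {
            for (long id : ids) {
                staged.add(id); // no lookup, no filtering yet
            }
        }

        synchronized Long pollAccepted(LongPredicate accepts) {
            Long id;
            while ((id = staged.poll()) != null) {
                if (accepts.test(id)) { // filter deferred to here
                    return id;
                }
            }
            return null; // nothing acceptable staged right now
        }
    }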
/** /**
* Schedules file level ingest tasks for a given set of files for a data * Schedules file level ingest tasks for a given set of files for an ingest
* source ingest job by adding them directly to the front of the file tasks * job pipeline by adding them directly to the front of the file tasks
* queue for the ingest manager's file ingest threads. * queue for the ingest manager's file ingest threads.
* *
* @param job The data source ingest job. * @param ingestJobPipeline The ingest job pipeline.
* @param files A set of files for the data source. * @param files A set of files for the data source.
*/ */
synchronized void fastTrackFileIngestTasks(DataSourceIngestJob job, Collection<AbstractFile> files) { synchronized void fastTrackFileIngestTasks(IngestJobPipeline ingestJobPipeline, Collection<AbstractFile> files) {
if (!job.isCancelled()) { if (!ingestJobPipeline.isCancelled()) {
/* /*
* Put the files directly into the queue for the file ingest * Put the files directly into the queue for the file ingest
* threads, if they pass the file filter for the job. The files are * threads, if they pass the file filter for the job. The files are
@ -187,12 +210,12 @@ final class IngestTasksScheduler {
* already in progress. * already in progress.
*/ */
for (AbstractFile file : files) { for (AbstractFile file : files) {
FileIngestTask fileTask = new FileIngestTask(job, file); FileIngestTask fileTask = new FileIngestTask(ingestJobPipeline, file);
if (shouldEnqueueFileTask(fileTask)) { if (shouldEnqueueFileTask(fileTask)) {
try { try {
this.fileIngestThreadsQueue.putFirst(fileTask); this.fileIngestThreadsQueue.putFirst(fileTask);
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
IngestTasksScheduler.logger.log(Level.INFO, String.format("Ingest tasks scheduler interrupted while scheduling file level ingest tasks (jobId=%d)", job.getId()), ex); IngestTasksScheduler.logger.log(Level.INFO, String.format("Ingest tasks scheduler interrupted while scheduling file level ingest tasks (jobId=%d)", ingestJobPipeline.getId()), ex);
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
return; return;
} }
@ -219,34 +242,36 @@ final class IngestTasksScheduler {
*/ */
synchronized void notifyTaskCompleted(FileIngestTask task) { synchronized void notifyTaskCompleted(FileIngestTask task) {
this.fileIngestThreadsQueue.taskCompleted(task); this.fileIngestThreadsQueue.taskCompleted(task);
shuffleFileTaskQueues(); refillIngestThreadQueue();
} }
/** /**
* Queries the task scheduler to determine whether or not all of the ingest * Queries the task scheduler to determine whether or not all of the ingest
* tasks for a data source ingest job have been completed. * tasks for an ingest job pipeline have been completed.
* *
* @param job The data source ingest job. * @param ingestJobPipeline The ingest job pipeline.
* *
* @return True or false. * @return True or false.
*/ */
synchronized boolean tasksForJobAreCompleted(DataSourceIngestJob job) { synchronized boolean currentTasksAreCompleted(IngestJobPipeline ingestJobPipeline) {
long jobId = job.getId(); long jobId = ingestJobPipeline.getId();
return !(this.dataSourceIngestThreadQueue.hasTasksForJob(jobId) return !(this.dataSourceIngestThreadQueue.hasTasksForJob(jobId)
|| hasTasksForJob(this.rootFileTaskQueue, jobId) || hasTasksForJob(this.rootFileTaskQueue, jobId)
|| hasTasksForJob(this.pendingFileTaskQueue, jobId) || hasTasksForJob(this.pendingFileTaskQueue, jobId)
|| hasTasksForJob(this.streamedTasksQueue, jobId)
|| this.fileIngestThreadsQueue.hasTasksForJob(jobId)); || this.fileIngestThreadsQueue.hasTasksForJob(jobId));
} }
/** /**
* Clears the "upstream" task scheduling queues for a data source ingest * Clears the "upstream" task scheduling queues for an ingest pipeline,
* job, but does nothing about tasks that have already been moved into the * but does nothing about tasks that have already been moved into the
* queue that is consumed by the file ingest threads. * queue that is consumed by the file ingest threads.
* *
* @param job The data source ingest job. * @param ingestJobPipeline The ingest job pipeline.
*/ */
synchronized void cancelPendingTasksForIngestJob(DataSourceIngestJob job) { synchronized void cancelPendingTasksForIngestJob(IngestJobPipeline ingestJobPipeline) {
long jobId = job.getId(); long jobId = ingestJobPipeline.getId();
IngestTasksScheduler.removeTasksForJob(this.rootFileTaskQueue, jobId); IngestTasksScheduler.removeTasksForJob(this.rootFileTaskQueue, jobId);
IngestTasksScheduler.removeTasksForJob(this.pendingFileTaskQueue, jobId); IngestTasksScheduler.removeTasksForJob(this.pendingFileTaskQueue, jobId);
} }
@ -291,6 +316,47 @@ final class IngestTasksScheduler {
} }
return topLevelFiles; return topLevelFiles;
} }
/**
* Schedules file ingest tasks for the ingest manager's file ingest threads.
* Files from streaming ingest will be prioritized.
*/
synchronized private void refillIngestThreadQueue() {
try {
takeFromStreamingTaskQueue();
takeFromBatchTasksQueues();
} catch (InterruptedException ex) {
IngestTasksScheduler.logger.log(Level.INFO, "Ingest tasks scheduler interrupted while blocked adding a task to the file level ingest task queue", ex);
Thread.currentThread().interrupt();
}
}
/**
* Move tasks from the streamedTasksQueue into the fileIngestThreadsQueue.
* Will attempt to move as many tasks as there are ingest threads.
*/
synchronized private void takeFromStreamingTaskQueue() throws InterruptedException {
/*
* Schedule files from the streamedTasksQueue
*/
while (fileIngestThreadsQueue.isEmpty()) {
/*
* We will attempt to schedule as many tasks as there are ingest threads.
*/
int taskCount = 0;
while (taskCount < IngestManager.getInstance().getNumberOfFileIngestThreads()) {
final FileIngestTask streamingTask = streamedTasksQueue.poll();
if (streamingTask == null) {
return; // No streaming tasks are queued right now
}
if (shouldEnqueueFileTask(streamingTask)) {
fileIngestThreadsQueue.putLast(streamingTask);
taskCount++;
}
}
}
}
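The drain above is deliberately bounded: it hands over at most one task per file ingest thread before rechecking, so a long run of streamed files cannot monopolize the hand-off queue, and it returns as soon as the staging queue runs dry so refillIngestThreadQueue() can fall through to the batch queues. The same shape in isolation (BoundedDrain and its names are hypothetical):

    import java.util.Queue;
    import java.util.concurrent.BlockingDeque;
    import java.util.function.Predicate;

    // Hypothetical sketch of a bounded drain: move up to `limit` accepted
    // items from a staging queue onto the workers' blocking queue.
    final class BoundedDrain {
        static <T> int drain(Queue<T> staged, BlockingDeque<T> workers,
                Predicate<T> accepts, int limit) throws InterruptedException {
            int moved = 0;
            while (moved < limit) {
                T item = staged.poll();
                if (item == null) {
                    break;                 // staging queue exhausted
                }
                if (accepts.test(item)) {
                    workers.putLast(item); // may block if the workers' queue is full
                    moved++;
                }
            }
            return moved;
        }
    }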
/** /**
* Schedules file ingest tasks for the ingest manager's file ingest threads * Schedules file ingest tasks for the ingest manager's file ingest threads
@ -322,8 +388,9 @@ final class IngestTasksScheduler {
* during ingest. The reason for the LIFO additions is to give priority to * during ingest. The reason for the LIFO additions is to give priority to
* files derived from prioritized files. * files derived from prioritized files.
*/ */
synchronized private void shuffleFileTaskQueues() { synchronized private void takeFromBatchTasksQueues() throws InterruptedException {
while (this.fileIngestThreadsQueue.isEmpty()) {
while (this.fileIngestThreadsQueue.isEmpty()) {
/* /*
* If the pending file task queue is empty, move the highest * If the pending file task queue is empty, move the highest
* priority root file task, if there is one, into it. * priority root file task, if there is one, into it.
@ -345,17 +412,11 @@ final class IngestTasksScheduler {
return; return;
} }
if (shouldEnqueueFileTask(pendingTask)) { if (shouldEnqueueFileTask(pendingTask)) {
try { /*
/* * The task is added to the queue for the ingest threads
* The task is added to the queue for the ingest threads * AFTER the higher priority tasks that preceded it.
* AFTER the higher priority tasks that preceded it. */
*/ this.fileIngestThreadsQueue.putLast(pendingTask);
this.fileIngestThreadsQueue.putLast(pendingTask);
} catch (InterruptedException ex) {
IngestTasksScheduler.logger.log(Level.INFO, "Ingest tasks scheduler interrupted while blocked adding a task to the file level ingest task queue", ex);
Thread.currentThread().interrupt();
return;
}
} }
/* /*
@ -365,27 +426,27 @@ final class IngestTasksScheduler {
* own, or into the queue for the file ingest threads, if it passes * own, or into the queue for the file ingest threads, if it passes
* the filter for the job. * the filter for the job.
*/ */
final AbstractFile file = pendingTask.getFile(); AbstractFile file = null;
try { try {
file = pendingTask.getFile();
for (Content child : file.getChildren()) { for (Content child : file.getChildren()) {
if (child instanceof AbstractFile) { if (child instanceof AbstractFile) {
AbstractFile childFile = (AbstractFile) child; AbstractFile childFile = (AbstractFile) child;
FileIngestTask childTask = new FileIngestTask(pendingTask.getIngestJob(), childFile); FileIngestTask childTask = new FileIngestTask(pendingTask.getIngestJobPipeline(), childFile);
if (childFile.hasChildren()) { if (childFile.hasChildren()) {
this.pendingFileTaskQueue.add(childTask); this.pendingFileTaskQueue.add(childTask);
} else if (shouldEnqueueFileTask(childTask)) { } else if (shouldEnqueueFileTask(childTask)) {
try { this.fileIngestThreadsQueue.putLast(childTask);
this.fileIngestThreadsQueue.putLast(childTask);
} catch (InterruptedException ex) {
IngestTasksScheduler.logger.log(Level.INFO, "Ingest tasks scheduler interrupted while blocked adding a task to the file level ingest task queue", ex);
Thread.currentThread().interrupt();
return;
}
} }
} }
} }
} catch (TskCoreException ex) { } catch (TskCoreException ex) {
logger.log(Level.SEVERE, String.format("Error getting the children of %s (objId=%d)", file.getName(), file.getId()), ex); //NON-NLS if (file != null) {
logger.log(Level.SEVERE, String.format("Error getting the children of %s (objId=%d)", file.getName(), file.getId()), ex); //NON-NLS
} else {
// In practice, the task would have already returned false from the call to shouldEnqueueFileTask()
logger.log(Level.SEVERE, "Error loading file with object ID {0}", pendingTask.getFileId());
}
} }
} }
} }
@ -400,7 +461,13 @@ final class IngestTasksScheduler {
* @return True or false. * @return True or false.
*/ */
private static boolean shouldEnqueueFileTask(final FileIngestTask task) { private static boolean shouldEnqueueFileTask(final FileIngestTask task) {
final AbstractFile file = task.getFile(); AbstractFile file;
try {
file = task.getFile();
} catch (TskCoreException ex) {
logger.log(Level.SEVERE, "Error loading file with ID {0}", task.getFileId());
return false;
}
// Skip the task if the file is actually the pseudo-file for the parent // Skip the task if the file is actually the pseudo-file for the parent
// or current directory. // or current directory.
@ -483,7 +550,12 @@ final class IngestTasksScheduler {
* @return True or false. * @return True or false.
*/ */
private static boolean shouldBeCarved(final FileIngestTask task) { private static boolean shouldBeCarved(final FileIngestTask task) {
return task.getIngestJob().shouldProcessUnallocatedSpace() && task.getFile().getType().equals(TskData.TSK_DB_FILES_TYPE_ENUM.UNALLOC_BLOCKS); try {
AbstractFile file = task.getFile();
return task.getIngestJobPipeline().shouldProcessUnallocatedSpace() && file.getType().equals(TskData.TSK_DB_FILES_TYPE_ENUM.UNALLOC_BLOCKS);
} catch (TskCoreException ex) {
return false;
}
} }
/** /**
@ -495,7 +567,12 @@ final class IngestTasksScheduler {
* @return True or false. * @return True or false.
*/ */
private static boolean fileAcceptedByFilter(final FileIngestTask task) { private static boolean fileAcceptedByFilter(final FileIngestTask task) {
return !(task.getIngestJob().getFileIngestFilter().fileIsMemberOf(task.getFile()) == null); try {
AbstractFile file = task.getFile();
return !(task.getIngestJobPipeline().getFileIngestFilter().fileIsMemberOf(file) == null);
} catch (TskCoreException ex) {
return false;
}
} }
/** /**
@ -509,7 +586,7 @@ final class IngestTasksScheduler {
*/ */
synchronized private static boolean hasTasksForJob(Collection<? extends IngestTask> tasks, long jobId) { synchronized private static boolean hasTasksForJob(Collection<? extends IngestTask> tasks, long jobId) {
for (IngestTask task : tasks) { for (IngestTask task : tasks) {
if (task.getIngestJob().getId() == jobId) { if (task.getIngestJobPipeline().getId() == jobId) {
return true; return true;
} }
} }
@ -527,7 +604,7 @@ final class IngestTasksScheduler {
Iterator<? extends IngestTask> iterator = tasks.iterator(); Iterator<? extends IngestTask> iterator = tasks.iterator();
while (iterator.hasNext()) { while (iterator.hasNext()) {
IngestTask task = iterator.next(); IngestTask task = iterator.next();
if (task.getIngestJob().getId() == jobId) { if (task.getIngestJobPipeline().getId() == jobId) {
iterator.remove(); iterator.remove();
} }
} }
@ -544,7 +621,7 @@ final class IngestTasksScheduler {
private static int countTasksForJob(Collection<? extends IngestTask> queue, long jobId) { private static int countTasksForJob(Collection<? extends IngestTask> queue, long jobId) {
int count = 0; int count = 0;
for (IngestTask task : queue) { for (IngestTask task : queue) {
if (task.getIngestJob().getId() == jobId) { if (task.getIngestJobPipeline().getId() == jobId) {
count++; count++;
} }
} }
@ -575,10 +652,36 @@ final class IngestTasksScheduler {
@Override @Override
public int compare(FileIngestTask q1, FileIngestTask q2) { public int compare(FileIngestTask q1, FileIngestTask q2) {
AbstractFilePriority.Priority p1 = AbstractFilePriority.getPriority(q1.getFile()); // In practice the case where one or both calls to getFile() fail
AbstractFilePriority.Priority p2 = AbstractFilePriority.getPriority(q2.getFile()); // should never occur since such tasks would not be added to the queue.
AbstractFile file1 = null;
AbstractFile file2 = null;
try {
file1 = q1.getFile();
} catch (TskCoreException ex) {
// Do nothing - the exception has been logged elsewhere
}
try {
file2 = q2.getFile();
} catch (TskCoreException ex) {
// Do nothing - the exception has been logged elsewhere
}
if (file1 == null) {
if (file2 == null) {
return (int) (q2.getFileId() - q1.getFileId());
} else {
return 1;
}
} else if (file2 == null) {
return -1;
}
AbstractFilePriority.Priority p1 = AbstractFilePriority.getPriority(file1);
AbstractFilePriority.Priority p2 = AbstractFilePriority.getPriority(file2);
if (p1 == p2) { if (p1 == p2) {
return (int) (q2.getFile().getId() - q1.getFile().getId()); return (int) (file2.getId() - file1.getId());
} else { } else {
return p2.ordinal() - p1.ordinal(); return p2.ordinal() - p1.ordinal();
} }
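The null branches matter because this comparator backs the rootFileTaskQueue TreeSet: it has to remain a total order even when a file lookup fails, and the fall-through to the object IDs supplies the tiebreaker that keeps distinct tasks from comparing as equal (which a TreeSet would treat as duplicates and silently drop). The same ordering restated with Comparator combinators on a hypothetical Item type:

    import java.util.Comparator;

    final class TaskOrdering {
        // Hypothetical stand-in for a task whose priority lookup may have failed.
        record Item(long id, Integer priority) {}

        // Priority descending, null priorities last, then object ID descending
        // as a total-order tiebreaker -- the same shape as the comparator above.
        static final Comparator<Item> ORDER = Comparator
                .comparing(Item::priority, Comparator.nullsLast(Comparator.reverseOrder()))
                .thenComparing(Comparator.comparingLong(Item::id).reversed());
    }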


@ -29,6 +29,7 @@ import org.sleuthkit.autopsy.corecomponentinterfaces.DataSourceProcessorCallback
import org.sleuthkit.autopsy.corecomponentinterfaces.DataSourceProcessorProgressMonitor; import org.sleuthkit.autopsy.corecomponentinterfaces.DataSourceProcessorProgressMonitor;
import org.sleuthkit.autopsy.coreutils.Logger; import org.sleuthkit.autopsy.coreutils.Logger;
import org.sleuthkit.datamodel.Content; import org.sleuthkit.datamodel.Content;
import org.sleuthkit.datamodel.DefaultAddDataSourceCallbacks;
import org.sleuthkit.datamodel.Image; import org.sleuthkit.datamodel.Image;
import org.sleuthkit.datamodel.SleuthkitCase; import org.sleuthkit.datamodel.SleuthkitCase;
import org.sleuthkit.datamodel.SleuthkitJNI; import org.sleuthkit.datamodel.SleuthkitJNI;
@ -60,6 +61,7 @@ class AddMultipleImagesTask implements Runnable {
private List<String> errorMessages = new ArrayList<>(); private List<String> errorMessages = new ArrayList<>();
private DataSourceProcessorResult result; private DataSourceProcessorResult result;
private List<Content> newDataSources = new ArrayList<>(); private List<Content> newDataSources = new ArrayList<>();
private Image currentImage = null;
/* /*
* The cancellation requested flag and SleuthKit add image process are * The cancellation requested flag and SleuthKit add image process are
@ -105,6 +107,8 @@ class AddMultipleImagesTask implements Runnable {
@Messages({ @Messages({
"AddMultipleImagesTask.cancelled=Cancellation: Add image process reverted", "AddMultipleImagesTask.cancelled=Cancellation: Add image process reverted",
"# {0} - image path",
"AddMultipleImagesTask.imageError=Error adding image {0} to the database"
}) })
@Override @Override
public void run() { public void run() {
@ -118,15 +122,25 @@ class AddMultipleImagesTask implements Runnable {
List<String> corruptedImageFilePaths = new ArrayList<>(); List<String> corruptedImageFilePaths = new ArrayList<>();
progressMonitor.setIndeterminate(true); progressMonitor.setIndeterminate(true);
for (String imageFilePath : imageFilePaths) { for (String imageFilePath : imageFilePaths) {
try {
currentImage = SleuthkitJNI.addImageToDatabase(currentCase.getSleuthkitCase(), new String[]{imageFilePath},
0, timeZone, "", "", "", deviceId);
} catch (TskCoreException ex) {
LOGGER.log(Level.SEVERE, "Error adding image " + imageFilePath + " to database", ex);
errorMessages.add(Bundle.AddMultipleImagesTask_imageError(imageFilePath));
result = DataSourceProcessorResult.CRITICAL_ERRORS;
}
synchronized (tskAddImageProcessLock) { synchronized (tskAddImageProcessLock) {
if (!tskAddImageProcessStopped) { if (!tskAddImageProcessStopped) {
addImageProcess = currentCase.getSleuthkitCase().makeAddImageProcess(timeZone, false, false, ""); addImageProcess = currentCase.getSleuthkitCase().makeAddImageProcess(timeZone, false, false, "");
} else { } else {
return; return;
} }
} }
run(imageFilePath, corruptedImageFilePaths, errorMessages); run(imageFilePath, currentImage, corruptedImageFilePaths, errorMessages);
commitOrRevertAddImageProcess(imageFilePath, errorMessages, newDataSources); finishAddImageProcess(imageFilePath, errorMessages, newDataSources);
synchronized (tskAddImageProcessLock) { synchronized (tskAddImageProcessLock) {
if (tskAddImageProcessStopped) { if (tskAddImageProcessStopped) {
errorMessages.add(Bundle.AddMultipleImagesTask_cancelled()); errorMessages.add(Bundle.AddMultipleImagesTask_cancelled());
@ -218,7 +232,8 @@ class AddMultipleImagesTask implements Runnable {
/** /**
* Attempts to add an input image to the case. * Attempts to add an input image to the case.
* *
* @param imageFilePath The image file path. * @param imageFilePath Path to the image.
* @param image The image.
* @param corruptedImageFilePaths If the image cannot be added because * @param corruptedImageFilePaths If the image cannot be added because
* Sleuth Kit cannot detect a filesystem, * Sleuth Kit cannot detect a filesystem,
* the image file path is added to this list * the image file path is added to this list
@ -233,13 +248,13 @@ class AddMultipleImagesTask implements Runnable {
"# {0} - imageFilePath", "# {1} - deviceId", "# {2} - exceptionMessage", "AddMultipleImagesTask.criticalErrorAdding=Critical error adding {0} for device {1}: {2}", "# {0} - imageFilePath", "# {1} - deviceId", "# {2} - exceptionMessage", "AddMultipleImagesTask.criticalErrorAdding=Critical error adding {0} for device {1}: {2}",
"# {0} - imageFilePath", "# {1} - deviceId", "# {2} - exceptionMessage", "AddMultipleImagesTask.criticalErrorReverting=Critical error reverting add image process for {0} for device {1}: {2}", "# {0} - imageFilePath", "# {1} - deviceId", "# {2} - exceptionMessage", "AddMultipleImagesTask.criticalErrorReverting=Critical error reverting add image process for {0} for device {1}: {2}",
"# {0} - imageFilePath", "# {1} - deviceId", "# {2} - exceptionMessage", "AddMultipleImagesTask.nonCriticalErrorAdding=Non-critical error adding {0} for device {1}: {2}",}) "# {0} - imageFilePath", "# {1} - deviceId", "# {2} - exceptionMessage", "AddMultipleImagesTask.nonCriticalErrorAdding=Non-critical error adding {0} for device {1}: {2}",})
private void run(String imageFilePath, List<String> corruptedImageFilePaths, List<String> errorMessages) { private void run(String imageFilePath, Image image, List<String> corruptedImageFilePaths, List<String> errorMessages) {
/* /*
* Try to add the image to the case database as a data source. * Try to add the image to the case database as a data source.
*/ */
progressMonitor.setProgressText(Bundle.AddMultipleImagesTask_adding(imageFilePath)); progressMonitor.setProgressText(Bundle.AddMultipleImagesTask_adding(imageFilePath));
try { try {
addImageProcess.run(deviceId, new String[]{imageFilePath}); addImageProcess.run(deviceId, image, 0, new DefaultAddDataSourceCallbacks());
} catch (TskCoreException ex) { } catch (TskCoreException ex) {
if (ex.getMessage().contains(TSK_FS_TYPE_UNKNOWN_ERR_MSG)) { if (ex.getMessage().contains(TSK_FS_TYPE_UNKNOWN_ERR_MSG)) {
/* /*
@ -259,9 +274,9 @@ class AddMultipleImagesTask implements Runnable {
} }
/** /**
* Commits or reverts the results of the TSK add image process. If the * Finishes the TSK add image process.
* process was stopped before it completed or there was a critical error the * The image will always be in the database regardless of whether the user
* results are reverted, otherwise they are committed. * canceled or a critical error occurred.
* *
* @param imageFilePath The image file path. * @param imageFilePath The image file path.
* @param errorMessages Error messages, if any, are added to this list for * @param errorMessages Error messages, if any, are added to this list for
@ -270,44 +285,26 @@ class AddMultipleImagesTask implements Runnable {
* added to this list for eventual return via the * added to this list for eventual return via the
* getter method. * getter method.
*/ */
private void commitOrRevertAddImageProcess(String imageFilePath, List<String> errorMessages, List<Content> newDataSources) { private void finishAddImageProcess(String imageFilePath, List<String> errorMessages, List<Content> newDataSources) {
synchronized (tskAddImageProcessLock) { synchronized (tskAddImageProcessLock) {
if (tskAddImageProcessStopped || criticalErrorOccurred) { /*
try { * Add the new image to the list of new data
addImageProcess.revert(); * sources to be returned via the getter method.
} catch (TskCoreException ex) { */
errorMessages.add(Bundle.AddMultipleImagesTask_criticalErrorReverting(imageFilePath, deviceId, ex.getLocalizedMessage())); newDataSources.add(currentImage);
criticalErrorOccurred = true;
} // Do no further processing if the user canceled
if (tskAddImageProcessStopped) {
return; return;
} }
/*
* Try to commit the results of the add image process, retrieve the new
* image from the case database, and add it to the list of new data
* sources to be returned via the getter method.
*/
try {
long imageId = addImageProcess.commit();
Image dataSource = currentCase.getSleuthkitCase().getImageById(imageId);
newDataSources.add(dataSource);
/* /*
* Verify the size of the new image. Note that it may not be what is * Verify the size of the new image. Note that it may not be what is
* expected, but at least part of it was added to the case. * expected, but at least part of it was added to the case.
*/ */
String verificationError = dataSource.verifyImageSize(); String verificationError = currentImage.verifyImageSize();
if (!verificationError.isEmpty()) { if (!verificationError.isEmpty()) {
errorMessages.add(Bundle.AddMultipleImagesTask_nonCriticalErrorAdding(imageFilePath, deviceId, verificationError)); errorMessages.add(Bundle.AddMultipleImagesTask_nonCriticalErrorAdding(imageFilePath, deviceId, verificationError));
}
} catch (TskCoreException ex) {
/*
* The add image process commit failed or querying the case database
* for the newly added image failed. Either way, this is a critical
* error.
*/
errorMessages.add(Bundle.AddMultipleImagesTask_criticalErrorAdding(imageFilePath, deviceId, ex.getLocalizedMessage()));
criticalErrorOccurred = true;
} }
} }
} }
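The net effect of this file's changes is easiest to see with the two flows side by side. The following is an illustrative sketch, not code from the commit; it is assembled only from calls that appear in this diff, with field names (deviceId, currentImage, addImageProcess, currentCase) taken from the surrounding class:

    private void addImageWithCommit(String imageFilePath, List<Content> newDataSources) throws TskCoreException {
        // Old flow: stage the image, then commit on success; on cancellation
        // or a critical error, revert() rolled the staged rows back out of
        // the case database.
        addImageProcess.run(deviceId, new String[]{imageFilePath});
        long imageId = addImageProcess.commit();
        Image dataSource = currentCase.getSleuthkitCase().getImageById(imageId);
        newDataSources.add(dataSource);
    }

    private void addImageStreaming(String imageFilePath, List<String> errorMessages, List<Content> newDataSources) throws TskCoreException {
        // New flow: the Image row already exists in the database, so there is
        // nothing to commit or revert; files are handed to the ingest
        // pipeline through the callbacks as they are added. The 0 requests
        // sector-size auto-detection.
        addImageProcess.run(deviceId, currentImage, 0, new DefaultAddDataSourceCallbacks());
        newDataSources.add(currentImage);

        // Size verification is non-critical: part of the image is in the
        // case even if the size is not what was expected.
        String verificationError = currentImage.verifyImageSize();
        if (!verificationError.isEmpty()) {
            errorMessages.add(Bundle.AddMultipleImagesTask_nonCriticalErrorAdding(imageFilePath, deviceId, verificationError));
        }
    }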

View File

@ -67,6 +67,8 @@ AddMultipleImagesTask.criticalErrorReverting=Critical error reverting add image
# {1} - exceptionMessage # {1} - exceptionMessage
AddMultipleImagesTask.errorAddingImgWithoutFileSystem=Error adding images without file systems for device {0}: {1} AddMultipleImagesTask.errorAddingImgWithoutFileSystem=Error adding images without file systems for device {0}: {1}
AddMultipleImagesTask.fsTypeUnknownErr=Cannot determine file system type AddMultipleImagesTask.fsTypeUnknownErr=Cannot determine file system type
# {0} - image path
AddMultipleImagesTask.imageError=Error adding image {0} to the database
# {0} - imageFilePath # {0} - imageFilePath
# {1} - deviceId # {1} - deviceId
# {2} - exceptionMessage # {2} - exceptionMessage
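The new imageError key follows the same pattern as the other keys in this file: the {0} placeholder is filled through the generated Bundle accessor, whose name is derived from the key by replacing dots with underscores (as with Bundle.AddMultipleImagesTask_cancelled() earlier in this diff). A hypothetical call site:

    // Hypothetical call site for the new message key; the accessor name
    // follows the key-to-method convention used elsewhere in this diff.
    errorMessages.add(Bundle.AddMultipleImagesTask_imageError(imageFilePath));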

View File

@ -33,7 +33,7 @@ import javax.annotation.concurrent.Immutable;
import javax.annotation.concurrent.ThreadSafe; import javax.annotation.concurrent.ThreadSafe;
import org.sleuthkit.autopsy.corecomponentinterfaces.DataSourceProcessor; import org.sleuthkit.autopsy.corecomponentinterfaces.DataSourceProcessor;
import org.sleuthkit.autopsy.coreutils.NetworkUtils; import org.sleuthkit.autopsy.coreutils.NetworkUtils;
import org.sleuthkit.autopsy.ingest.DataSourceIngestJob.Snapshot; import org.sleuthkit.autopsy.ingest.Snapshot;
import org.sleuthkit.autopsy.ingest.IngestJob; import org.sleuthkit.autopsy.ingest.IngestJob;
import org.sleuthkit.autopsy.ingest.IngestManager.IngestThreadActivitySnapshot; import org.sleuthkit.autopsy.ingest.IngestManager.IngestThreadActivitySnapshot;
import org.sleuthkit.autopsy.ingest.IngestProgressSnapshotProvider; import org.sleuthkit.autopsy.ingest.IngestProgressSnapshotProvider;
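These import updates track Snapshot's move from a nested class of DataSourceIngestJob to a top-level type in org.sleuthkit.autopsy.ingest. Because the simple name is unchanged, call sites only need the import swap; a representative before/after, based on the declarations shown in the next file:

    // Before: Snapshot was nested inside DataSourceIngestJob.
    // import org.sleuthkit.autopsy.ingest.DataSourceIngestJob.Snapshot;
    // private final List<DataSourceIngestJob.Snapshot> jobSnapshot;

    // After: Snapshot is a top-level class in the ingest package.
    import org.sleuthkit.autopsy.ingest.Snapshot;
    // private final List<Snapshot> jobSnapshot;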

View File

@ -37,7 +37,7 @@ import org.sleuthkit.autopsy.datamodel.NodeProperty;
import org.sleuthkit.autopsy.experimental.autoingest.AutoIngestJob.Stage; import org.sleuthkit.autopsy.experimental.autoingest.AutoIngestJob.Stage;
import org.sleuthkit.autopsy.guiutils.DurationCellRenderer; import org.sleuthkit.autopsy.guiutils.DurationCellRenderer;
import org.sleuthkit.autopsy.guiutils.StatusIconCellRenderer; import org.sleuthkit.autopsy.guiutils.StatusIconCellRenderer;
import org.sleuthkit.autopsy.ingest.DataSourceIngestJob; import org.sleuthkit.autopsy.ingest.Snapshot;
/** /**
* A node which represents all AutoIngestJobs of a given AutoIngestJobStatus. * A node which represents all AutoIngestJobs of a given AutoIngestJobStatus.
@ -96,7 +96,7 @@ final class AutoIngestJobsNode extends AbstractNode {
* they can be changed by events in other threads which * they can be changed by events in other threads which
*/ */
private final Stage jobStage; private final Stage jobStage;
private final List<DataSourceIngestJob.Snapshot> jobSnapshot; private final List<Snapshot> jobSnapshot;
private final Integer jobPriority; private final Integer jobPriority;
AutoIngestJobWrapper(AutoIngestJob job) { AutoIngestJobWrapper(AutoIngestJob job) {