7415 file pipeline file save batching

This commit is contained in:
Richard Cordovano 2021-03-26 12:49:04 -04:00
parent 7cf9360605
commit e7cce9069e

View File

@ -34,9 +34,13 @@ import org.sleuthkit.datamodel.TskCoreException;
* A pipeline of file ingest modules for performing file ingest tasks for an
* ingest job.
*/
@NbBundle.Messages({
"FileIngestPipeline_SaveResults_Activity=Saving Results"
})
final class FileIngestPipeline extends IngestTaskPipeline<FileIngestTask> {
private static final int FILE_BATCH_SIZE = 500;
private static final String SAVE_RESULTS_ACTIVITY = Bundle.FileIngestPipeline_SaveResults_Activity();
private static final IngestManager ingestManager = IngestManager.getInstance();
private final IngestJobPipeline ingestJobPipeline;
private final List<AbstractFile> fileBatch;
@ -70,36 +74,15 @@ final class FileIngestPipeline extends IngestTaskPipeline<FileIngestTask> {
}
@Override
@NbBundle.Messages({
"FileIngestPipeline_SaveResults_Activity=Saving Results"
})
void completeTask(FileIngestTask task) throws IngestTaskPipelineException {
ingestManager.setIngestTaskProgress(task, Bundle.FileIngestPipeline_SaveResults_Activity());
/*
* Close and cache the file from the file ingest task. The cache will be
* used for an eventual batch update of the case database with new
* properties added to the files in the cache by the ingest modules that
* processed them.
*
* Only one file ingest thread at a time will try to access the file
* cache. The synchronization here is to ensure visibility of the files
* in all of the threads that share the cache, rather than to prevent
* simultaneous access in multiple threads.
*/
synchronized (fileBatch) {
AbstractFile file = null;
try {
file = task.getFile();
file.close();
} catch (TskCoreException ex) {
throw new IngestTaskPipelineException(String.format("Failed to get file (file objId = %d)", task.getFileId()), ex); //NON-NLS
}
if (!ingestJobPipeline.isCancelled()) {
fileBatch.add(file);
if (fileBatch.size() >= FILE_BATCH_SIZE) {
clearFileCache();
}
}
try {
ingestManager.setIngestTaskProgress(task, SAVE_RESULTS_ACTIVITY);
AbstractFile file = task.getFile();
file.close();
cacheFileForBatchUpdate(file);
} catch (TskCoreException ex) {
throw new IngestTaskPipelineException(String.format("Failed to get file (file objId = %d)", task.getFileId()), ex); //NON-NLS
} finally {
ingestManager.setIngestTaskProgressCompleted(task);
}
}
@ -107,20 +90,44 @@ final class FileIngestPipeline extends IngestTaskPipeline<FileIngestTask> {
@Override
List<IngestModuleError> shutDown() {
List<IngestModuleError> errors = new ArrayList<>();
if (!ingestJobPipeline.isCancelled()) {
Date start = new Date();
try {
clearFileCache();
} catch (IngestTaskPipelineException ex) {
errors.add(new IngestModuleError(Bundle.FileIngestPipeline_SaveResults_Activity(), ex));
}
Date finish = new Date();
ingestManager.incrementModuleRunTime(Bundle.FileIngestPipeline_SaveResults_Activity(), finish.getTime() - start.getTime());
Date start = new Date();
try {
clearFileCache();
} catch (IngestTaskPipelineException ex) {
errors.add(new IngestModuleError(SAVE_RESULTS_ACTIVITY, ex));
}
Date finish = new Date();
ingestManager.incrementModuleRunTime(SAVE_RESULTS_ACTIVITY, finish.getTime() - start.getTime());
errors.addAll(super.shutDown());
return errors;
}
/**
* Adds a file to a file cache used to update the case database with new
* properties added to the files in the cache by the ingest modules that
* processed them. If adding the file to the cache fills the cache, a batch
* update is done immediately.
*
* @param file The file.
*
* @throws IngestTaskPipelineException Exception thrown if the case database
* update fails.
*/
private void cacheFileForBatchUpdate(AbstractFile file) throws IngestTaskPipelineException {
/*
* Only one file ingest thread at a time will try to access the file
* cache. The synchronization here is to ensure visibility of the files
* in all of the threads that share the cache, rather than to prevent
* simultaneous access in multiple threads.
*/
synchronized (fileBatch) {
fileBatch.add(file);
if (fileBatch.size() >= FILE_BATCH_SIZE) {
clearFileCache();
}
}
}
/**
* Updates the case database with new properties added to the files in the
* cache by the ingest modules that processed them.
@ -138,16 +145,14 @@ final class FileIngestPipeline extends IngestTaskPipeline<FileIngestTask> {
synchronized (fileBatch) {
CaseDbTransaction transaction = null;
try {
Case currentCase = Case.getCurrentCaseThrows();
SleuthkitCase caseDb = currentCase.getSleuthkitCase();
transaction = caseDb.beginTransaction();
for (AbstractFile file : fileBatch) {
if (!ingestJobPipeline.isCancelled()) {
if (!ingestJobPipeline.isCancelled()) {
Case currentCase = Case.getCurrentCaseThrows();
SleuthkitCase caseDb = currentCase.getSleuthkitCase();
transaction = caseDb.beginTransaction();
for (AbstractFile file : fileBatch) {
file.save(transaction);
}
}
transaction.commit();
if (!ingestJobPipeline.isCancelled()) {
transaction.commit();
for (AbstractFile file : fileBatch) {
IngestManager.getInstance().fireFileIngestDone(file);
}