7415 file pipeline file save batching

Richard Cordovano 2021-03-25 10:08:03 -04:00
parent 9795a55861
commit 258ab2e74a
3 changed files with 57 additions and 34 deletions

View File

@@ -95,6 +95,7 @@ final class DataSourceIngestPipeline extends IngestTaskPipeline<DataSourceIngest
         if (!ingestJobPipeline.isCancelled() && ingestJobPipeline.currentDataSourceIngestModuleIsCancelled()) {
             ingestJobPipeline.currentDataSourceIngestModuleCancellationCompleted(getDisplayName());
         }
+        // See JIRA-7449
         // if (result == ProcessResult.ERROR) {
         //     throw new IngestModuleException(String.format("%s experienced an error analyzing %s (data source objId = %d)", getDisplayName(), dataSource.getName(), dataSource.getId())); //NON-NLS
         // }

View File

@@ -24,8 +24,10 @@ import java.util.List;
 import java.util.Optional;
 import org.openide.util.NbBundle;
 import org.sleuthkit.autopsy.casemodule.Case;
+import org.sleuthkit.autopsy.casemodule.NoCurrentCaseException;
 import org.sleuthkit.datamodel.AbstractFile;
 import org.sleuthkit.datamodel.SleuthkitCase;
+import org.sleuthkit.datamodel.SleuthkitCase.CaseDbTransaction;
 import org.sleuthkit.datamodel.TskCoreException;
 
 /**
@@ -74,10 +76,15 @@ final class FileIngestPipeline extends IngestTaskPipeline<FileIngestTask> {
     void completeTask(FileIngestTask task) throws IngestTaskPipelineException {
         ingestManager.setIngestTaskProgress(task, Bundle.FileIngestPipeline_SaveResults_Activity());
         /*
-         * Code in only one file ingest thread at a time will try to access the
-         * file list. The synchronization here is to ensure visibility of the
-         * files in all of the threads that share the list, rather than to
-         * prevent simultaneous access in multiple threads.
+         * Close and cache the file from the file ingest task. The cache will be
+         * used for an eventual batch update of the case database with new
+         * properties added to the files in the cache by the ingest modules that
+         * processed them.
+         *
+         * Only one file ingest thread at a time will try to access the file
+         * cache. The synchronization here is to ensure visibility of the files
+         * in all of the threads that share the cache, rather than to prevent
+         * simultaneous access in multiple threads.
         */
         synchronized (fileBatch) {
             AbstractFile file = null;
@@ -85,13 +92,12 @@ final class FileIngestPipeline extends IngestTaskPipeline<FileIngestTask> {
                 file = task.getFile();
                 file.close();
             } catch (TskCoreException ex) {
-                // RJCTODO: Is this right?
                 throw new IngestTaskPipelineException(String.format("Failed to get file (file objId = %d)", task.getFileId()), ex); //NON-NLS
             }
             if (!ingestJobPipeline.isCancelled()) {
                 fileBatch.add(file);
                 if (fileBatch.size() >= FILE_BATCH_SIZE) {
-                    updateFiles();
+                    clearFileCache();
                 }
             }
             ingestManager.setIngestTaskProgressCompleted(task);
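
Note (not part of the diff): the hunks above are the batching half of the change. completeTask() no longer saves each file's updated properties as its task finishes; it just closes the file and adds it to fileBatch, and the batch is flushed to the case database once FILE_BATCH_SIZE files have accumulated. A minimal, self-contained sketch of that pattern, using hypothetical names (BatchingPipeline, BATCH_SIZE, flush()) rather than Autopsy types:

import java.util.ArrayList;
import java.util.List;

// Illustrative stand-in for the pipeline's file cache: accumulate completed
// items and write them out in one operation when the batch is full.
final class BatchingPipeline {

    private static final int BATCH_SIZE = 500; // plays the role of FILE_BATCH_SIZE
    private final List<String> batch = new ArrayList<>();

    // Called for every completed task; cheap except when the batch just filled up.
    void completeTask(String item) {
        synchronized (batch) {
            batch.add(item);
            if (batch.size() >= BATCH_SIZE) {
                flush();
            }
        }
    }

    // Called once at shutdown so the final partial batch is not lost.
    void shutDown() {
        synchronized (batch) {
            flush();
        }
    }

    private void flush() {
        // Stand-in for clearFileCache(): one bulk save instead of one save per task.
        System.out.println("Saving " + batch.size() + " items in one transaction");
        batch.clear();
    }
}

Batching amortizes the per-write overhead of the case database across many files, so the file ingest threads touch the database far less often than with one save per completed task.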
@@ -100,47 +106,63 @@ final class FileIngestPipeline extends IngestTaskPipeline<FileIngestTask> {
     @Override
     List<IngestModuleError> shutDown() {
-        Date start = new Date();
-        updateFiles();
-        Date finish = new Date();
-        ingestManager.incrementModuleRunTime("Save Files", finish.getTime() - start.getTime()); // RJCTODO
-        return super.shutDown();
+        List<IngestModuleError> errors = new ArrayList<>();
+        if (!ingestJobPipeline.isCancelled()) {
+            Date start = new Date();
+            try {
+                clearFileCache();
+            } catch (IngestTaskPipelineException ex) {
+                errors.add(new IngestModuleError(Bundle.FileIngestPipeline_SaveResults_Activity(), ex));
+            }
+            Date finish = new Date();
+            ingestManager.incrementModuleRunTime(Bundle.FileIngestPipeline_SaveResults_Activity(), finish.getTime() - start.getTime());
+        }
+        errors.addAll(super.shutDown());
+        return errors;
     }
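
Aside (not from the commit): the new shutDown() above flushes whatever is left in the cache, but a flush failure is converted into an IngestModuleError that is reported alongside the modules' own shutdown errors instead of aborting the shutdown. A rough sketch of that shape, with flushCache() and shutDownModules() as stand-ins for clearFileCache() and super.shutDown():

import java.util.ArrayList;
import java.util.List;

// Illustrative only: flushCache() and shutDownModules() are stand-ins, not Autopsy APIs.
final class ShutdownSketch {

    private volatile boolean cancelled;

    List<Exception> shutDown() {
        List<Exception> errors = new ArrayList<>();
        if (!cancelled) {                  // a cancelled job skips the final flush
            try {
                flushCache();              // plays the role of clearFileCache()
            } catch (Exception ex) {
                errors.add(ex);            // collect the failure, keep shutting down
            }
        }
        errors.addAll(shutDownModules());  // plays the role of super.shutDown()
        return errors;
    }

    private void flushCache() throws Exception {
        // one batched database update of the remaining cached files
    }

    private List<Exception> shutDownModules() {
        return new ArrayList<>();          // per-module shutdown errors would be collected here
    }
}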
     /**
-     * RJCTODO
+     * Updates the case database with new properties added to the files in the
+     * cache by the ingest modules that processed them.
      *
-     * @throws TskCoreException
+     * @throws IngestTaskPipelineException Exception thrown if the case database
+     *                                     update fails.
      */
-    private void updateFiles() throws TskCoreException {
-        Case currentCase = Case.getCurrentCase();
-        SleuthkitCase caseDb = currentCase.getSleuthkitCase();
-        SleuthkitCase.CaseDbTransaction transaction = caseDb.beginTransaction();
-        // transaction.commit();
+    private void clearFileCache() throws IngestTaskPipelineException {
+        /*
+         * Only one file ingest thread at a time will try to access the file
+         * cache. The synchronization here is to ensure visibility of the files
+         * in all of the threads that share the cache, rather than to prevent
+         * simultaneous access in multiple threads.
+         */
         synchronized (fileBatch) {
-            for (AbstractFile file : fileBatch) {
-                try {
-                    if (!ingestJobPipeline.isCancelled()) {
-                        file.save(transaction);
-                    }
-                } catch (TskCoreException ex) {
-                    // RJCTODO: Log instead?
-                    // throw new IngestTaskPipelineException(String.format("Failed to save updated data for file (file objId = %d)", task.getFileId()), ex); //NON-NLS
-                } finally {
-                    if (!ingestJobPipeline.isCancelled()) {
-                        IngestManager.getInstance().fireFileIngestDone(file);
-                    }
-                    file.close();
-                    //ingestManager.setIngestTaskProgressCompleted(task);
-                }
-            }
-            for (AbstractFile file : fileBatch) {
-                if (!ingestJobPipeline.isCancelled()) {
-                    IngestManager.getInstance().fireFileIngestDone(file);
-                }
-            }
-            fileBatch.clear();
+            CaseDbTransaction transaction = null;
+            try {
+                Case currentCase = Case.getCurrentCaseThrows();
+                SleuthkitCase caseDb = currentCase.getSleuthkitCase();
+                transaction = caseDb.beginTransaction();
+                for (AbstractFile file : fileBatch) {
+                    if (!ingestJobPipeline.isCancelled()) {
+                        file.save(transaction);
+                    }
+                }
+                transaction.commit();
+                if (!ingestJobPipeline.isCancelled()) {
+                    for (AbstractFile file : fileBatch) {
+                        IngestManager.getInstance().fireFileIngestDone(file);
+                    }
+                }
+            } catch (NoCurrentCaseException | TskCoreException ex) {
+                if (transaction != null) {
+                    try {
+                        transaction.rollback();
+                    } catch (TskCoreException ignored) {
+                    }
+                }
+                throw new IngestTaskPipelineException("Failed to save updated properties for cached files from tasks", ex); //NON-NLS
+            } finally {
+                fileBatch.clear();
+            }
         }
     }
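
Aside (not from the commit): clearFileCache() wraps the whole batch in a single CaseDbTransaction, commits once, rolls the transaction back if the save fails, and always clears the cache in the finally block. The same commit/rollback/clear shape expressed with plain JDBC, since the Sleuth Kit transaction API is not shown here; the table and column names are purely illustrative:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

// Illustrative batch flush: one transaction for N row updates, rollback on failure,
// and the in-memory batch is emptied no matter what happened.
final class BatchFlushSketch {

    private final Connection connection; // an open JDBC connection supplied by the caller

    BatchFlushSketch(Connection connection) {
        this.connection = connection;
    }

    void flush(List<Long> objectIds, String mimeType) throws SQLException {
        try {
            connection.setAutoCommit(false); // start one transaction for the whole batch
            try (PreparedStatement stmt = connection.prepareStatement(
                    "UPDATE files SET mime_type = ? WHERE obj_id = ?")) { // hypothetical schema
                for (long objectId : objectIds) {
                    stmt.setString(1, mimeType);
                    stmt.setLong(2, objectId);
                    stmt.executeUpdate();
                }
            }
            connection.commit();             // one commit for the whole batch
        } catch (SQLException ex) {
            connection.rollback();           // leave the database unchanged on failure
            throw ex;                        // surface the error, as clearFileCache() does
        } finally {
            objectIds.clear();               // mirror finally { fileBatch.clear(); }
            connection.setAutoCommit(true);
        }
    }
}

Clearing the batch in the finally block means a failed flush drops those cached updates rather than retrying them, but it also guarantees the cache cannot grow without bound or be flushed twice; the thrown exception is how the failure is reported to the caller.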
@@ -176,6 +198,7 @@ final class FileIngestPipeline extends IngestTaskPipeline<FileIngestTask> {
             ingestManager.setIngestTaskProgress(task, getDisplayName());
             ingestJobPipeline.setCurrentFileIngestModule(getDisplayName(), file.getName());
             ProcessResult result = module.process(file);
+            // See JIRA-7449
             // if (result == ProcessResult.ERROR) {
             //     throw new IngestModuleException(String.format("%s experienced an error analyzing %s (file objId = %d)", getDisplayName(), file.getName(), file.getId())); //NON-NLS
             // }

View File

@@ -36,7 +36,6 @@ import org.sleuthkit.autopsy.coreutils.MessageNotifyUtil;
  */
 abstract class IngestTaskPipeline<T extends IngestTask> {
 
-    private static final IngestManager ingestManager = IngestManager.getInstance();
     private final IngestJobPipeline ingestJobPipeline;
     private final List<IngestModuleTemplate> moduleTemplates;
     private final List<PipelineModule<T>> modules;