/* * Autopsy Forensic Browser * * Copyright 2011 Basis Technology Corp. * Contact: carrier sleuthkit org * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.sleuthkit.autopsy.ingest; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collection; import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import javax.swing.SwingUtilities; import javax.swing.SwingWorker; import org.netbeans.api.progress.ProgressHandle; import org.netbeans.api.progress.ProgressHandleFactory; import org.openide.util.Cancellable; import org.openide.util.Lookup; import org.sleuthkit.autopsy.ingest.IngestMessage.MessageType; import org.sleuthkit.datamodel.FsContent; import org.sleuthkit.datamodel.Image; /** * IngestManager sets up and manages ingest services * runs them in a background thread * notifies services when work is complete or should be interrupted * processes messages from services in postMessage() and posts them to GUI * */ public class IngestManager { private static final Logger logger = Logger.getLogger(IngestManager.class.getName()); private IngestTopComponent tc; private IngestManagerStats stats; private int updateFrequency; //queues private final ImageQueue imageQueue = new ImageQueue(); private final FsContentQueue fsContentQueue = new FsContentQueue(); private IngestThread ingester; final Collection imageServices = enumerateImageServices(); final Collection fsContentServices = enumerateFsContentServices(); /** * * @param tc handle to Ingest top component */ IngestManager(IngestTopComponent tc) { this.tc = tc; //one time initialization of services for (IngestServiceImage s : imageServices) { s.init(this); } for (IngestServiceFsContent s : fsContentServices) { s.init(this); } } /** * Multiple image version of execute, enqueues multiple images and associated services at once * @param services services to execute on every image * @param images images to execute services on */ void execute(final Collection services, final Collection images) { tc.enableStartButton(false); SwingWorker queueWorker = new EnqueueWorker(services, images); queueWorker.execute(); //logger.log(Level.INFO, "Queues: " + imageQueue.toString() + " " + fsContentQueue.toString()); } /** * IngestManager entry point, enqueues image to be processed. * Spawns background thread which enumerates all sorted files and executes chosen services per file in a pre-determined order. * Notifies services when work is complete or should be interrupted using complete() and stop() calls. * Does not block and can be called multiple times to enqueue more work to already running background process. * @param services services to execute on the image * @param image image to execute services on */ void execute(final Collection services, final Image image) { Collection images = new ArrayList(); images.add(image); execute(services, images); } private void startAll() { boolean start = false; if (ingester == null) { start = true; } //if worker had completed, restart it in case data is still enqueued else if (ingester.isDone() && (hasNextFsContent() || hasNextImage())) { logger.log(Level.INFO, "Restarting ingester thread."); start = true; } else { logger.log(Level.INFO, "Ingester is still running"); } if (start) { logger.log(Level.INFO, "Starting new ingester."); ingester = new IngestThread(); stats = new IngestManagerStats(); ingester.execute(); } } /** * returns the current minimal update frequency setting * Services should call this between processing iterations to get current setting * and use the setting to change notification and data refresh intervals */ public synchronized int getUpdateFrequency() { return updateFrequency; } /** * set new minimal update frequency services should use * @param frequency */ synchronized void setUpdateFrequency(int frequency) { this.updateFrequency = frequency; } /** * returns ingest summary report (how many files ingested, any errors, etc) */ String getReport() { return stats.toString(); } /** * Service publishes message using InegestManager handle * Does not block. * The message gets enqueued in the GUI thread and displayed in a widget * IngestService should make an attempt not to publish the same message multiple times. * Viewer will attempt to identify duplicate messages and filter them out (slower) */ public synchronized void postMessage(final IngestMessage message) { if (stats != null) { //record the error for stats, if stats are running if (message.getMessageType() == MessageType.ERROR) { stats.addError(message.getSource()); } } SwingUtilities.invokeLater(new Runnable() { @Override public void run() { tc.displayMessage(message); } }); } /** * helper to return all image services managed (using Lookup API) */ public static Collection enumerateImageServices() { return (Collection) Lookup.getDefault().lookupAll(IngestServiceImage.class); } /** * helper to return all file/dir services managed (using Lookup API) */ public static Collection enumerateFsContentServices() { return (Collection) Lookup.getDefault().lookupAll(IngestServiceFsContent.class); } private void addImage(IngestServiceImage service, Image image) { imageQueue.enqueue(image, service); } private void addFsContent(IngestServiceFsContent service, Image image) { Collection fsContents = new GetAllFilesContentVisitor().visit(image); for (FsContent fsContent : fsContents) { fsContentQueue.enqueue(fsContent, service); } } /** * get next file/dir to process * the queue of FsContent to process is maintained internally * and could be dynamically sorted as data comes in */ private QueueUnit getNextFsContent() { QueueUnit ret = null; ret = fsContentQueue.dequeue(); return ret; } private boolean hasNextFsContent() { boolean ret = false; ret = fsContentQueue.hasNext(); return ret; } private int getNumFsContents() { int ret = 0; ret = fsContentQueue.getCount(); return ret; } private void emptyFsContents() { fsContentQueue.empty(); } private void emptyImages() { imageQueue.empty(); } /** * get next Image to process * the queue of Images to process is maintained internally * and could be dynamically sorted as data comes in */ private QueueUnit getNextImage() { QueueUnit ret = null; ret = imageQueue.dequeue(); return ret; } private boolean hasNextImage() { boolean ret = false; ret = imageQueue.hasNext(); return ret; } private int getNumImages() { int ret = 0; ret = imageQueue.getCount(); return ret; } private void initMainProgress(final int maximum) { SwingUtilities.invokeLater(new Runnable() { @Override public void run() { tc.initProgress(maximum); } }); } private void updateMainProgress(final int progress) { SwingUtilities.invokeLater(new Runnable() { @Override public void run() { tc.updateProgress(progress); } }); } //manages queue of pending FsContent and IngestServiceFsContent to use on that content //TODO in future content sort will be maintained based on priorities private class FsContentQueue { List> fsContentUnits = new ArrayList>(); synchronized void enqueue(FsContent fsContent, IngestServiceFsContent service) { QueueUnit found = findFsContent(fsContent); if (found != null) { //FsContent already enqueued //merge services to use with already enqueued image found.add(service); } else { //enqueue brand new FsContent with the services found = new QueueUnit(fsContent, service); fsContentUnits.add(found); } } synchronized void enqueue(FsContent fsContent, Collection services) { QueueUnit found = findFsContent(fsContent); if (found != null) { //FsContent already enqueued //merge services to use with already enqueued FsContent found.addAll(services); } else { //enqueue brand new FsContent with the services found = new QueueUnit(fsContent, services); fsContentUnits.add(found); } } synchronized boolean hasNext() { return !fsContentUnits.isEmpty(); } synchronized int getCount() { return fsContentUnits.size(); } synchronized void empty() { fsContentUnits.clear(); } synchronized QueueUnit dequeue() { if (!hasNext()) { throw new UnsupportedOperationException("FsContent processing queue is empty"); } return fsContentUnits.remove(0); } private QueueUnit findFsContent(FsContent fsContent) { QueueUnit found = null; for (QueueUnit unit : fsContentUnits) { if (unit.content.equals(fsContent)) { found = unit; break; } } return found; } @Override synchronized public String toString() { return "FsContentQueue, size: " + Integer.toString(fsContentUnits.size()); } } //manages queue of pending Images and IngestServiceImage to use on that image private class ImageQueue { List> imageUnits = new ArrayList>(); synchronized void enqueue(Image image, IngestServiceImage service) { QueueUnit found = findImage(image); if (found != null) { //image already enqueued //merge services to use with already enqueued image found.add(service); } else { //enqueue brand new image with the services found = new QueueUnit(image, service); imageUnits.add(found); } } synchronized void enqueue(Image image, Collection services) { QueueUnit found = findImage(image); if (found != null) { //image already enqueued //merge services to use with already enqueued image found.addAll(services); } else { //enqueue brand new image with the services found = new QueueUnit(image, services); imageUnits.add(found); } } synchronized boolean hasNext() { return !imageUnits.isEmpty(); } synchronized int getCount() { return imageUnits.size(); } synchronized void empty() { imageUnits.clear(); } synchronized QueueUnit dequeue() { if (!hasNext()) { throw new UnsupportedOperationException("Image processing queue is empty"); } return imageUnits.remove(0); } private synchronized QueueUnit findImage(Image image) { QueueUnit found = null; for (QueueUnit unit : imageUnits) { if (unit.content.equals(image)) { found = unit; break; } } return found; } @Override public synchronized String toString() { return "ImageQueue, size: " + Integer.toString(imageUnits.size()); } } /** * generic representation of queued content (Image or FsContent) and its services */ private class QueueUnit { T content; Set services; QueueUnit(T content, S service) { this.content = content; this.services = new HashSet(); add(service); } QueueUnit(T content, Collection services) { this.content = content; this.services = new HashSet(); addAll(services); } //merge services with the current collection of services per image //this assumes that there is one singleton instance of each type of service final void addAll(Collection services) { this.services.addAll(services); } final void add(S service) { this.services.add(service); } } /** * collects IngestManager statistics during runtime */ private static class IngestManagerStats { Date startTime; Date endTime; int errorsTotal; Map errors; private static DateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); IngestManagerStats() { errors = new HashMap(); } @Override public String toString() { final String EOL = System.getProperty("line.separator"); StringBuilder sb = new StringBuilder(); if (startTime != null) { sb.append("Start time: ").append(dateFormatter.format(startTime)).append(EOL); } if (endTime != null) { sb.append("End time: ").append(dateFormatter.format(endTime)).append(EOL); } sb.append("Total ingest time: ").append(getTotalTimeString()).append(EOL); sb.append("Total errors: ").append(errorsTotal).append(EOL); if (errorsTotal > 0) { sb.append("Errors per service:"); for (IngestServiceAbstract service : errors.keySet()) { final int errorsService = errors.get(service); sb.append("\t").append(service.getName()).append(": ").append(errorsService).append(EOL); } } return sb.toString(); } public String toHtmlString() { StringBuilder sb = new StringBuilder(); sb.append(""); if (startTime != null) { sb.append("Start time: ").append(dateFormatter.format(startTime)).append("
"); } if (endTime != null) { sb.append("End time: ").append(dateFormatter.format(endTime)).append("
"); } sb.append("Total ingest time: ").append(getTotalTimeString()).append("
"); sb.append("Total errors: ").append(errorsTotal).append("
"); if (errorsTotal > 0) { sb.append("Errors per service:"); for (IngestServiceAbstract service : errors.keySet()) { final int errorsService = errors.get(service); sb.append("\t").append(service.getName()).append(": ").append(errorsService).append("
"); } } sb.append(""); return sb.toString(); } void start() { startTime = new Date(); } void end() { endTime = new Date(); } long getTotalTime() { if (startTime == null || endTime == null) { return 0; } return endTime.getTime() - startTime.getTime(); } String getStartTimeString() { return dateFormatter.format(startTime); } String getEndTimeString() { return dateFormatter.format(endTime); } String getTotalTimeString() { long ms = getTotalTime(); long hours = TimeUnit.MILLISECONDS.toHours(ms); ms -= TimeUnit.HOURS.toMillis(hours); long minutes = TimeUnit.MILLISECONDS.toMinutes(ms); ms -= TimeUnit.MINUTES.toMillis(minutes); long seconds = TimeUnit.MILLISECONDS.toSeconds(ms); final StringBuilder sb = new StringBuilder(); sb.append(hours < 10 ? "0" : "").append(hours).append(':').append(minutes < 10 ? "0" : "").append(minutes).append(':').append(seconds < 10 ? "0" : "").append(seconds); return sb.toString(); } synchronized void addError(IngestServiceAbstract source) { ++errorsTotal; int curServiceError = errors.get(source); errors.put(source, curServiceError + 1); } } //ingester worker doing work in background //in current design, worker runs until queues are consumed //and if needed, it is restarted when data arrives private class IngestThread extends SwingWorker { private Logger logger = Logger.getLogger(IngestThread.class.getName()); private ProgressHandle progress; @Override protected Object doInBackground() throws Exception { logger.log(Level.INFO, "Starting background processing"); stats.start(); progress = ProgressHandleFactory.createHandle("Ingesting", new Cancellable() { @Override public boolean cancel() { return IngestThread.this.cancel(true); } }); progress.start(); progress.switchToIndeterminate(); int numImages = getNumImages(); progress.switchToDeterminate(numImages); int processedImages = 0; //process image queue while (hasNextImage()) { QueueUnit unit = getNextImage(); for (IngestServiceImage service : unit.services) { if (isCancelled()) { return null; } try { service.process(unit.content); } catch (Exception e) { logger.log(Level.INFO, "Exception from service: " + service.getName(), e); stats.addError(service); } //check if new files enqueued int newImages = getNumImages(); if (newImages > numImages) { numImages = newImages + processedImages + 1; progress.switchToIndeterminate(); progress.switchToDeterminate(numImages); } progress.progress("Images (" + service.getName() + ")", ++processedImages); --numImages; } } progress.switchToIndeterminate(); int numFsContents = getNumFsContents(); progress.switchToDeterminate(numFsContents); int processedFiles = 0; initMainProgress(numFsContents); //process fscontents queue while (hasNextFsContent()) { QueueUnit unit = getNextFsContent(); for (IngestServiceFsContent service : unit.services) { if (isCancelled()) { return null; } try { service.process(unit.content); } catch (Exception e) { logger.log(Level.INFO, "Exception from service: " + service.getName(), e); stats.addError(service); } } int newFsContents = getNumFsContents(); if (newFsContents > numFsContents) { //update progress bar if new enqueued numFsContents = newFsContents + processedFiles + 1; progress.switchToIndeterminate(); progress.switchToDeterminate(numFsContents); initMainProgress(numFsContents); } progress.progress("Files", ++processedFiles); updateMainProgress(processedFiles); --numFsContents; } logger.log(Level.INFO, "Done background processing"); return null; } @Override protected void done() { try { super.get(); //block and get all exceptions thrown while doInBackground() //notify services of completion if (!this.isCancelled()) { for (IngestServiceImage s : imageServices) { s.complete(); } for (IngestServiceFsContent s : fsContentServices) { s.complete(); } } } catch (CancellationException e) { //task was cancelled handleInterruption(); } catch (InterruptedException ex) { handleInterruption(); } catch (ExecutionException ex) { handleInterruption(); logger.log(Level.SEVERE, "Fatal error during ingest.", ex); } catch (Exception ex) { handleInterruption(); logger.log(Level.SEVERE, "Fatal error during ingest.", ex); } finally { stats.end(); progress.finish(); if (!this.isCancelled()) { logger.log(Level.INFO, "Summary Report: " + stats.toString()); tc.displayReport(stats.toHtmlString()); } } } @Override protected void process(List chunks) { super.process(chunks); } private void handleInterruption() { for (IngestServiceImage s : imageServices) { s.stop(); } for (IngestServiceFsContent s : fsContentServices) { s.stop(); } //empty queues emptyFsContents(); emptyImages(); //reset main progress bar initMainProgress(0); } } private class EnqueueWorker extends SwingWorker { Collection services; final Collection images; int total; EnqueueWorker(final Collection services, final Collection images) { this.services = services; this.images = images; } private ProgressHandle progress; @Override protected Object doInBackground() throws Exception { progress = ProgressHandleFactory.createHandle("Queueing Ingest", new Cancellable() { @Override public boolean cancel() { return EnqueueWorker.this.cancel(true); } }); total = services.size() * images.size(); progress.start(total); //progress.switchToIndeterminate(); queueAll(services, images); return null; } @Override protected void done() { try { super.get(); //block and get all exceptions thrown while doInBackground() } catch (CancellationException e) { //task was cancelled handleInterruption(); } catch (InterruptedException ex) { handleInterruption(); } catch (ExecutionException ex) { handleInterruption(); } catch (Exception ex) { handleInterruption(); } finally { //queing end if (this.isCancelled()) { //empty queues emptyFsContents(); emptyImages(); } else { //start ingest workers startAll(); } progress.finish(); tc.enableStartButton(true); } } private void queueAll(Collection services, final Collection images) { int processed = 0; for (Image image : images) { final String imageName = image.getName(); for (IngestServiceAbstract service : services) { final String serviceName = service.getName(); progress.progress(serviceName + " " + imageName, processed); switch (service.getType()) { case Image: addImage((IngestServiceImage) service, image); break; case FsContent: addFsContent((IngestServiceFsContent) service, image); break; default: logger.log(Level.SEVERE, "Unexpected service type: " + service.getType().name()); } progress.progress(serviceName + " " + imageName, ++processed); } } } private void handleInterruption() { //empty queues emptyFsContents(); emptyImages(); } } }