This commit is contained in:
Greg DiCristofaro 2020-07-22 15:31:12 -04:00
parent aab740e57e
commit 9baf804e0f
7 changed files with 13 additions and 194 deletions

View File

@ -1,12 +1,8 @@
"""Provides tools for parsing and writing to a csv file. """Provides tools for parsing and writing to a csv file.
""" """
import codecs from typing import List, Iterable, Tuple
from typing import List, Iterable, Tuple, TypeVar
import csv import csv
import os import os
import unittest
from envutil import get_proj_dir
from unittestutil import TEST_OUTPUT_FOLDER
def records_to_csv(output_path: str, rows: Iterable[List[str]]): def records_to_csv(output_path: str, rows: Iterable[List[str]]):
@ -53,40 +49,3 @@ def csv_to_records(input_path: str, header_row: bool) -> Tuple[List[List[str]],
raise Exception("There was an error parsing csv {path}".format(path=input_path), e) raise Exception("There was an error parsing csv {path}".format(path=input_path), e)
return results, header return results, header
class CsvUtilTest(unittest.TestCase):
T = TypeVar('T')
def assert_equal_arr(self, a: List[T], b: List[T]):
self.assertEqual(len(a), len(b), 'arrays are not equal length')
for i in range(0, len(a)):
if isinstance(a[i], list) and isinstance(b[i], list):
self.assert_equal_arr(a[i], b[i])
else:
self.assertEqual(a[i], b[i], "Items: {0} and {1} at index {2} are not equal.".format(a[i], b[i], i))
def test_read_write(self):
data = [['header1', 'header2', 'header3', 'additional header'],
['data1', 'data2', 'data3'],
['', 'data2-1', 'data2-2']]
os.makedirs(os.path.join(get_proj_dir(), TEST_OUTPUT_FOLDER))
test_path = os.path.join(get_proj_dir(), TEST_OUTPUT_FOLDER, 'test.csv')
records_to_csv(test_path, data)
byte_inf = min(32, os.path.getsize(test_path))
with open(test_path, 'rb') as bom_test_file:
raw = bom_test_file.read(byte_inf)
if not raw.startswith(codecs.BOM_UTF8):
self.fail("written csv does not have appropriate BOM")
read_records_no_header, no_header = csv_to_records(test_path, header_row=False)
self.assert_equal_arr(read_records_no_header, data)
read_rows, header = csv_to_records(test_path, header_row=True)
self.assert_equal_arr(header, data[0])
self.assert_equal_arr(read_rows, [data[1], data[2]])

View File

@ -1,6 +1,6 @@
import os import os
import unittest
from typing import Union, Tuple from typing import Union, Tuple
from pathlib import Path
def get_path_pieces(orig_path: str) -> Tuple[str, Union[str, None], Union[str, None]]: def get_path_pieces(orig_path: str) -> Tuple[str, Union[str, None], Union[str, None]]:
@ -13,11 +13,13 @@ def get_path_pieces(orig_path: str) -> Tuple[str, Union[str, None], Union[str, N
""" """
potential_parent_dir, orig_file = os.path.split(orig_path) potential_parent_dir, orig_file = os.path.split(str(Path(orig_path)))
filename, file_extension = os.path.splitext(orig_file) filename, file_extension = os.path.splitext(orig_file)
if file_extension.startswith('.'):
file_extension = file_extension[1:]
if file_extension is None or len(file_extension) < 1: if file_extension is None or len(file_extension) < 1:
return orig_path, None, None return str(Path(orig_path)), None, None
else: else:
return potential_parent_dir, filename, file_extension return potential_parent_dir, filename, file_extension
@ -35,7 +37,7 @@ def get_new_path(orig_path: str, new_filename: str) -> str:
""" """
parent_dir, filename, ext = get_path_pieces(orig_path) parent_dir, filename, ext = get_path_pieces(orig_path)
return os.path.join(parent_dir, new_filename) return str(Path(parent_dir) / Path(new_filename))
# For use with creating csv filenames for entries that have been omitted. # For use with creating csv filenames for entries that have been omitted.
@ -55,52 +57,7 @@ def get_filename_addition(orig_path: str, filename_addition: str) -> str:
""" """
parent_dir, filename, extension = get_path_pieces(orig_path) parent_dir, filename, extension = get_path_pieces(orig_path)
if filename is None: if filename is None:
return orig_path + filename_addition return str(Path(orig_path + filename_addition))
else: else:
ext = '' if extension is None else extension ext = '' if extension is None else extension
return os.path.join(parent_dir, '{0}{1}.{2}'.format(filename, filename_addition, ext)) return str(Path(parent_dir) / Path('{0}{1}.{2}'.format(filename, filename_addition, ext)))
class FileUtilTest(unittest.TestCase):
@staticmethod
def _joined_paths(pieces: Tuple[str, str, str]) -> str:
return os.path.join(pieces[0], pieces[1] + '.' + pieces[2])
PATH_PIECES1 = ('/test/folder', 'filename', 'ext')
PATH_PIECES2 = ('/test.test2/folder.test2', 'filename.test', 'ext')
PATH_PIECES3 = ('/test.test2/folder.test2/folder', None, None)
def __init__(self):
self.PATH1 = FileUtilTest._joined_paths(self.PATH_PIECES1)
self.PATH2 = FileUtilTest._joined_paths(self.PATH_PIECES2)
self.PATH3 = self.PATH_PIECES3[0]
self.ALL_ITEMS = [
(self.PATH_PIECES1, self.PATH1),
(self.PATH_PIECES2, self.PATH2),
(self.PATH_PIECES3, self.PATH3)
]
def get_path_pieces_test(self):
for (expected_path, expected_filename, expected_ext), path in self.ALL_ITEMS:
path, filename, ext = get_path_pieces(path)
self.assertEqual(path, expected_path)
self.assertEqual(filename, expected_filename)
self.assertEqual(ext, expected_ext)
def get_new_path_test(self):
for (expected_path, expected_filename, expected_ext), path in self.ALL_ITEMS:
new_name = "newname.file"
new_path = get_new_path(path, new_name)
self.assertEqual(new_path, os.path.join(expected_path, new_name))
def get_filename_addition_test(self):
for (expected_path, expected_filename, expected_ext), path in self.ALL_ITEMS:
addition = "addition"
new_path = get_filename_addition(path, addition)
self.assertEqual(
new_path, os.path.join(
expected_path,
"{file_name}{addition}.{extension}".format(
file_name=expected_filename, addition=addition, extension=expected_ext)))

View File

@ -1,5 +1,4 @@
import unittest from typing import Iterator, List, Union
from typing import Iterator, List, Union, Dict
from propsutil import get_entry_dict from propsutil import get_entry_dict
from enum import Enum from enum import Enum
@ -31,25 +30,13 @@ class ItemChange:
self.key = key self.key = key
self.prev_val = prev_val self.prev_val = prev_val
self.cur_val = cur_val self.cur_val = cur_val
if ItemChange.has_str_content(cur_val) and not ItemChange.has_str_content(prev_val): if cur_val is not None and prev_val is None:
self.type = ChangeType.ADDITION self.type = ChangeType.ADDITION
elif not ItemChange.has_str_content(cur_val) and ItemChange.has_str_content(prev_val): elif cur_val is None and prev_val is not None:
self.type = ChangeType.DELETION self.type = ChangeType.DELETION
else: else:
self.type = ChangeType.CHANGE self.type = ChangeType.CHANGE
@staticmethod
def has_str_content(content: str):
"""Determines whether or not the content is empty or None.
Args:
content (str): The text.
Returns:
bool: Whether or not it has content.
"""
return content is not None and len(content.strip()) > 0
@staticmethod @staticmethod
def get_headers() -> List[str]: def get_headers() -> List[str]:
"""Returns the csv headers to insert when serializing a list of ItemChange objects to csv. """Returns the csv headers to insert when serializing a list of ItemChange objects to csv.
@ -111,84 +98,3 @@ def get_changed(rel_path: str, a_str: str, b_str: str) -> Iterator[ItemChange]:
mapped = map(lambda key: get_item_change( mapped = map(lambda key: get_item_change(
rel_path, key, a_dict.get(key), b_dict.get(key)), all_keys) rel_path, key, a_dict.get(key), b_dict.get(key)), all_keys)
return filter(lambda entry: entry is not None, mapped) return filter(lambda entry: entry is not None, mapped)
class ItemChangeTest(unittest.TestCase):
@staticmethod
def dict_to_prop_str(dict: Dict[str,str]) -> str:
toret = ''
for key,val in dict.items:
toret += "{key}={value}\n".format(key=key,val=val)
return toret
def get_changed_test(self):
deleted_key = 'deleted.property.key'
deleted_val = 'will be deleted'
change_key = 'change.property.key'
change_val_a = 'original value'
change_val_b = 'new value'
change_key2 = 'change2.property.key'
change_val2_a = 'original value 2'
change_val2_b = ''
addition_key = 'addition.property.key'
addition_new_val = 'the added value'
same_key = 'samevalue.property.key'
same_value = 'the same value'
same_key2 = 'samevalue2.property.key'
same_value2 = ''
a_dict = {
deleted_key: deleted_val,
change_key: change_val_a,
change_key2: change_val2_a,
same_key: same_value,
same_key2: same_value2
}
b_dict = {
change_key: change_val_b,
change_key2: change_val2_b,
addition_key: addition_new_val,
same_key: same_value,
same_key2: same_value2
}
a_str = ItemChangeTest.dict_to_prop_str(a_dict)
b_str = ItemChangeTest.dict_to_prop_str(b_dict)
rel_path = 'my/rel/path.properties'
key_to_change = {}
for item_change in get_changed(rel_path, a_str, b_str):
self.assertEqual(item_change.rel_path, rel_path)
key_to_change[item_change.key] = item_change
deleted_item = key_to_change[deleted_key]
self.assertEqual(deleted_item.type, ChangeType.DELETION)
self.assertEqual(deleted_item.prev_val, deleted_val)
self.assertEqual(deleted_item.cur_val, None)
addition_item = key_to_change[addition_key]
self.assertEqual(addition_item.type, ChangeType.ADDITION)
self.assertEqual(addition_item.prev_val, None)
self.assertEqual(deleted_item.cur_val, addition_new_val)
change_item = key_to_change[change_key]
self.assertEqual(change_item.type, ChangeType.CHANGE)
self.assertEqual(change_item.prev_val, change_val_a)
self.assertEqual(change_item.cur_val, change_val_b)
change_item2 = key_to_change[change_key2]
self.assertEqual(change_item2.type, ChangeType.CHANGE)
self.assertEqual(change_item2.prev_val, change_val2_a)
self.assertEqual(change_item2.cur_val, change_val2_b)
self.assertTrue(same_key not in key_to_change)
self.assertTrue(same_key2 not in key_to_change)

View File

@ -1,6 +1,5 @@
"""Functions handling retrieving and storing when a language was last updated. """Functions handling retrieving and storing when a language was last updated.
""" """
from typing import Union from typing import Union
from envutil import get_proj_dir from envutil import get_proj_dir
from propsutil import get_entry_dict_from_path, update_entry_dict from propsutil import get_entry_dict_from_path, update_entry_dict

View File

@ -1,11 +1,11 @@
"""Provides tools for reading from and writing to java properties files. """Provides tools for reading from and writing to java properties files.
""" """
from typing import Dict, Union, IO from typing import Dict, Union, IO
from jproperties import Properties from jproperties import Properties
import os import os
# The default extension for property files in autopsy repo # The default extension for property files in autopsy repo
DEFAULT_PROPS_EXTENSION = 'properties-MERGED' DEFAULT_PROPS_EXTENSION = 'properties-MERGED'

View File

@ -1 +0,0 @@
output

View File

@ -1 +0,0 @@
TEST_OUTPUT_FOLDER = 'testartifacts/output'