Changed code to address CommunicationsArtifactHelper API changes

This commit is contained in:
U-BASIS\dsmyda 2019-10-01 17:28:26 -04:00
parent 5ede580ee0
commit 7d9d61213e
8 changed files with 90 additions and 124 deletions

View File

@ -37,7 +37,6 @@ class TskCallLogsParser(ResultSetIterator):
super(TskCallLogsParser, self).__init__(result_set)
self._DEFAULT_STRING = ""
self._DEFAULT_DIRECTION = CommunicationDirection.UNKNOWN
self._DEFAULT_ADDRESS = None
self._DEFAULT_CALL_TYPE = CallMediaType.UNKNOWN
self._DEFAULT_LONG = -1L
@ -50,10 +49,10 @@ class TskCallLogsParser(ResultSetIterator):
return self._DEFAULT_DIRECTION
def get_phone_number_from(self):
return self._DEFAULT_ADDRESS
return None
def get_phone_number_to(self):
return self._DEFAULT_ADDRESS
return None
def get_call_start_date_time(self):
return self._DEFAULT_LONG

View File

@ -29,10 +29,9 @@ class TskContactsParser(ResultSetIterator):
def __init__(self, result_set):
super(TskContactsParser, self).__init__(result_set)
self._DEFAULT_VALUE = ""
self._DEFAULT_ACCOUNT_ADDRESS = None
def get_account_address(self):
return self._DEFAULT_ACCOUNT_ADDRESS
def get_contact_name(self):
return self._DEFAULT_VALUE
def get_phone(self):
return self._DEFAULT_VALUE
@ -45,3 +44,6 @@ class TskContactsParser(ResultSetIterator):
def get_email(self):
return self._DEFAULT_VALUE
def get_other_attributes(self):
return None

View File

@ -37,7 +37,6 @@ class TskMessagesParser(ResultSetIterator):
self._DEFAULT_TEXT = ""
self._DEFAULT_LONG = -1L
self._DEFAULT_MSG_READ_STATUS = MessageReadStatus.UNKNOWN
self._DEFAULT_ACCOUNT_ADDRESS = None
self._DEFAULT_COMMUNICATION_DIRECTION = CommunicationDirection.UNKNOWN
self.INCOMING = CommunicationDirection.INCOMING
@ -52,10 +51,10 @@ class TskMessagesParser(ResultSetIterator):
return self._DEFAULT_COMMUNICATION_DIRECTION
def get_phone_number_from(self):
return self._DEFAULT_ACCOUNT_ADDRESS
return None
def get_phone_number_to(self):
return self._DEFAULT_ACCOUNT_ADDRESS
return None
def get_message_date_time(self):
return self._DEFAULT_LONG

View File

@ -139,14 +139,15 @@ class LineAnalyzer(general.AndroidComponentAnalyzer):
def parse_contacts(self, contacts_db, helper):
try:
contacts_parser = LineContactsParser(contacts_db)
contacts_parser = LineContactsParser(contacts_db, self._PARSER_NAME)
while contacts_parser.next():
helper.addContact(
contacts_parser.get_account_address(),
contacts_parser.get_contact_name(),
contacts_parser.get_phone(),
contacts_parser.get_home_phone(),
contacts_parser.get_mobile_phone(),
contacts_parser.get_email()
contacts_parser.get_email(),
contacts_parser.get_other_attributes()
)
contacts_parser.close()
except SQLException as ex:
@ -291,23 +292,14 @@ class LineCallLogsParser(TskCallLogsParser):
group_members = self.result_set.getString("group_members")
if group_members is not None:
group_members = group_members.split(",")
group_names = self.result_set.getString("names").split(",")
return group_members
recipients = []
for member_id, member_name in zip(group_members, group_names):
recipients.append(Account.Address(member_id, member_name))
return recipients
return Account.Address(self.result_set.getString("caller_mid"),
self.result_set.getString("names"))
return self.result_set.getString("caller_mid")
return super(LineCallLogsParser, self).get_phone_number_to()
def get_phone_number_from(self):
if self.get_call_direction() == self.INCOMING_CALL:
return Account.Address(self.result_set.getString("caller_mid"),
self.result_set.getString("names"))
return self.result_set.getString("caller_mid")
return super(LineCallLogsParser, self).get_phone_number_from()
def get_call_type(self):
@ -331,7 +323,7 @@ class LineContactsParser(TskContactsParser):
a default value inherited from the super class.
"""
def __init__(self, contact_db):
def __init__(self, contact_db, analyzer):
super(LineContactsParser, self).__init__(contact_db.runQuery(
"""
SELECT m_id,
@ -341,9 +333,17 @@ class LineContactsParser(TskContactsParser):
)
)
def get_account_address(self):
return Account.Address(self.result_set.getString("m_id"),
self.result_set.getString("server_name"))
self._PARENT_ANALYZER = analyzer
def get_contact_name(self):
return self.result_set.getString("server_name")
def get_other_attributes(self):
return [BlackboardAttribute(
BlackboardAttribute.ATTRIBUTE_TYPE.TSK_ID,
self._PARENT_ANALYZER,
self.result_set.getString("m_id"))]
class LineMessagesParser(TskMessagesParser):
"""
@ -430,8 +430,7 @@ class LineMessagesParser(TskMessagesParser):
if self.get_message_direction() == self.INCOMING:
from_mid = self.result_set.getString("from_mid")
if from_mid is not None:
return Account.Address(from_mid,
self.result_set.getString("from_name"))
return from_mid
return super(LineMessagesParser, self).get_phone_number_from()
def get_phone_number_to(self):
@ -439,17 +438,9 @@ class LineMessagesParser(TskMessagesParser):
group = self.result_set.getString("members")
if group is not None:
group = group.split(",")
names = self.result_set.getString("member_names").split(",")
recipients = []
return group
for recipient_id, recipient_name in zip(group, names):
recipients.append(Account.Address(recipient_id, recipient_name))
return recipients
return Account.Address(self.result_set.getString("id"),
self.result_set.getString("name"))
return self.result_set.getString("id")
return super(LineMessagesParser, self).get_phone_number_to()

View File

@ -148,14 +148,15 @@ class SkypeAnalyzer(general.AndroidComponentAnalyzer):
#Query for contacts and iterate row by row adding
#each contact artifact
try:
contacts_parser = SkypeContactsParser(skype_db)
contacts_parser = SkypeContactsParser(skype_db, self._PARSER_NAME)
while contacts_parser.next():
helper.addContact(
contacts_parser.get_account_address(),
contacts_parser.get_contact_name(),
contacts_parser.get_phone(),
contacts_parser.get_home_phone(),
contacts_parser.get_mobile_phone(),
contacts_parser.get_email()
contacts_parser.get_email(),
contacts_parser.get_other_attributes()
)
contacts_parser.close()
except SQLException as ex:
@ -306,25 +307,17 @@ class SkypeCallLogsParser(TskCallLogsParser):
def get_phone_number_from(self):
if self.get_call_direction() == self.INCOMING_CALL:
return Account.Address(self.result_set.getString("sender_id"),
self.result_set.getString("sender_name"))
return self.result_set.getString("sender_id")
def get_phone_number_to(self):
if self.get_call_direction() == self.OUTGOING_CALL:
group_ids = self.result_set.getString("participant_ids")
name = self.result_set.getString("participants")
if group_ids is not None:
group_ids = group_ids.split(",")
name = name.split(",")
recipients = []
for person_id, person_name in zip(group_ids, name):
recipients.append(Account.Address(person_id, person_name))
return recipients
return group_ids
return Account.Address(self.result_set.getString("conversation_id"), name)
return self.result_set.getString("conversation_id")
return super(SkypeCallLogsParser, self).get_phone_number_to()
@ -351,7 +344,7 @@ class SkypeContactsParser(TskContactsParser):
a default value inherited from the super class.
"""
def __init__(self, contact_db):
def __init__(self, contact_db, analyzer):
super(SkypeContactsParser, self).__init__(contact_db.runQuery(
"""
SELECT entry_id,
@ -360,10 +353,17 @@ class SkypeContactsParser(TskContactsParser):
"""
)
)
self._PARENT_ANALYZER = analyzer
def get_account_address(self):
return Account.Address(self.result_set.getString("entry_id"),
self.result_set.getString("name"))
def get_contact_name(self):
return self.result_set.getString("name")
def get_other_attributes(self):
return [BlackboardAttribute(
BlackboardAttribute.ATTRIBUTE_TYPE.TSK_ID,
self._PARENT_ANALYZER,
self.result_set.getString("entry_id"))]
class SkypeMessagesParser(TskMessagesParser):
"""
@ -424,8 +424,7 @@ class SkypeMessagesParser(TskMessagesParser):
def get_phone_number_from(self):
if self.get_message_direction() == self.INCOMING:
return Account.Address(self.result_set.getString("sender_id"),
self.result_set.getString("sender_name"))
return self.result_set.getString("sender_id")
return super(SkypeMessagesParser, self).get_phone_number_from()
def get_message_direction(self):
@ -439,19 +438,12 @@ class SkypeMessagesParser(TskMessagesParser):
def get_phone_number_to(self):
if self.get_message_direction() == self.OUTGOING:
group_ids = self.result_set.getString("participant_ids")
names = self.result_set.getString("participants")
if group_ids is not None:
group_ids = group_ids.split(",")
names = names.split(",")
recipients = []
for participant_id, participant_name in zip(group_ids, names):
recipients.append(Account.Address(participant_id, participant_name))
return recipients
return group_ids
return Account.Address(self.result_set.getString("conversation_id"), names)
return self.result_set.getString("conversation_id")
return super(SkypeMessagesParser, self).get_phone_number_to()

View File

@ -108,7 +108,7 @@ class TextNowAnalyzer(general.AndroidComponentAnalyzer):
contacts_parser = TextNowContactsParser(textnow_db)
while contacts_parser.next():
helper.addContact(
contacts_parser.get_account_address(),
contacts_parser.get_contact_name(),
contacts_parser.get_phone(),
contacts_parser.get_home_phone(),
contacts_parser.get_mobile_phone(),
@ -220,14 +220,12 @@ class TextNowCallLogsParser(TskCallLogsParser):
def get_phone_number_from(self):
if self.get_call_direction() == self.OUTGOING_CALL:
return super(TextNowCallLogsParser, self).get_phone_number_from()
return Account.Address(self.result_set.getString("num"),
self.result_set.getString("num"))
return self.result_set.getString("num")
def get_phone_number_to(self):
if self.get_call_direction() == self.INCOMING_CALL:
return super(TextNowCallLogsParser, self).get_phone_number_to()
return Account.Address(self.result_set.getString("num"),
self.result_set.getString("num"))
return self.result_set.getString("num")
def get_call_direction(self):
if self.result_set.getInt("direction") == self._INCOMING_CALL_TYPE:
@ -266,9 +264,8 @@ class TextNowContactsParser(TskContactsParser):
)
)
def get_account_address(self):
return Account.Address(self.result_set.getString("number"),
self.result_set.getString("name"))
def get_contact_name(self):
return self.result_set.getString("name")
def get_phone(self):
return self.result_set.getString("number")
@ -341,8 +338,7 @@ class TextNowMessagesParser(TskMessagesParser):
def get_phone_number_from(self):
if self.result_set.getString("from_address") == "":
return super(TextNowMessagesParser, self).get_phone_number_from()
return Account.Address(self.result_set.getString("from_address"),
self.result_set.getString("from_address"))
return self.result_set.getString("from_address")
def get_message_direction(self):
direction = self.result_set.getInt("message_direction")
@ -354,12 +350,7 @@ class TextNowMessagesParser(TskMessagesParser):
if self.result_set.getString("to_address") == "":
return super(TextNowMessagesParser, self).get_phone_number_to()
recipients = self.result_set.getString("to_address").split(",")
recipient_accounts = []
for recipient in recipients:
recipient_accounts.append(Account.Address(recipient, recipient))
return recipient_accounts
return recipients
def get_message_date_time(self):
#convert ms to s

View File

@ -116,7 +116,7 @@ class ViberAnalyzer(general.AndroidComponentAnalyzer):
contacts_parser = ViberContactsParser(contacts_db)
while contacts_parser.next():
helper.addContact(
contacts_parser.get_account_address(),
contacts_parser.get_contact_name(),
contacts_parser.get_phone(),
contacts_parser.get_home_phone(),
contacts_parser.get_mobile_phone(),
@ -217,16 +217,14 @@ class ViberCallLogsParser(TskCallLogsParser):
def get_phone_number_from(self):
if self.get_call_direction() == self.INCOMING_CALL:
return Account.Address(self.result_set.getString("number"),
self.result_set.getString("number"))
return self.result_set.getString("number")
#Give default value if the call is outgoing,
#the device's # is not stored in the database.
return super(ViberCallLogsParser, self).get_phone_number_from()
def get_phone_number_to(self):
if self.get_call_direction() == self.OUTGOING_CALL:
return Account.Address(self.result_set.getString("number"),
self.result_set.getString("number"))
return self.result_set.getString("number")
#Give default value if the call is incoming,
#the device's # is not stored in the database.
return super(ViberCallLogsParser, self).get_phone_number_to()
@ -272,9 +270,8 @@ class ViberContactsParser(TskContactsParser):
)
)
def get_account_address(self):
return Account.Address(self.result_set.getString("number"),
self.result_set.getString("name"))
def get_contact_name(self):
return self.result_set.getString("name")
def get_phone(self):
return self.result_set.getString("number")
@ -339,8 +336,7 @@ class ViberMessagesParser(TskMessagesParser):
return self._VIBER_MESSAGE_TYPE
def get_phone_number_from(self):
return Account.Address(self.result_set.getString("from_number"),
self.result_set.getString("from_number"))
return self.result_set.getString("from_number")
def get_message_direction(self):
direction = self.result_set.getInt("direction")
@ -349,10 +345,7 @@ class ViberMessagesParser(TskMessagesParser):
return self.OUTGOING
def get_phone_number_to(self):
recipients = []
for token in self.result_set.getString("recipients").split(","):
recipients.append(Account.Address(token, token))
return recipients
return self.result_set.getString("recipients").split(","):
def get_message_date_time(self):
#transform from ms to seconds

View File

@ -167,14 +167,15 @@ class WhatsAppAnalyzer(general.AndroidComponentAnalyzer):
def parse_contacts(self, contacts_db, helper):
try:
contacts_parser = WhatsAppContactsParser(contacts_db)
contacts_parser = WhatsAppContactsParser(contacts_db, self._PARSER_NAME)
while contacts_parser.next():
helper.addContact(
contacts_parser.get_account_address(),
contacts_parser.get_contact_name(),
contacts_parser.get_phone(),
contacts_parser.get_home_phone(),
contacts_parser.get_mobile_phone(),
contacts_parser.get_email()
contacts_parser.get_email(),
contacts_parser.get_other_attributes()
)
contacts_parser.close()
except SQLException as ex:
@ -295,16 +296,14 @@ class WhatsAppGroupCallLogsParser(TskCallLogsParser):
def get_phone_number_from(self):
if self.get_call_direction() == self.INCOMING_CALL:
sender = self.result_set.getString("from_id")
return Account.Address(sender, sender)
return sender
return super(WhatsAppGroupCallLogsParser, self).get_phone_number_from()
def get_phone_number_to(self):
if self.get_call_direction() == self.OUTGOING_CALL:
#group_members column stores comma seperated list of groups or single contact
group = self.result_set.getString("group_members")
members = []
for token in group.split(","):
members.append(Account.Address(token, token))
return members
return group.split(","):
return super(WhatsAppGroupCallLogsParser, self).get_phone_number_to()
def get_call_start_date_time(self):
@ -354,13 +353,13 @@ class WhatsAppSingleCallLogsParser(TskCallLogsParser):
def get_phone_number_from(self):
if self.get_call_direction() == self.INCOMING_CALL:
sender = self.result_set.getString("num")
return Account.Address(sender, sender)
return sender
return super(WhatsAppSingleCallLogsParser, self).get_phone_number_from()
def get_phone_number_to(self):
if self.get_call_direction() == self.OUTGOING_CALL:
to = self.result_set.getString("num")
return Account.Address(to, to)
return to
return super(WhatsAppSingleCallLogsParser, self).get_phone_number_to()
def get_call_start_date_time(self):
@ -384,7 +383,7 @@ class WhatsAppContactsParser(TskContactsParser):
a default value inherited from the super class.
"""
def __init__(self, contact_db):
def __init__(self, contact_db, analyzer):
super(WhatsAppContactsParser, self).__init__(contact_db.runQuery(
"""
SELECT jid,
@ -409,14 +408,21 @@ class WhatsAppContactsParser(TskContactsParser):
"""
)
)
self._PARENT_ANALYZER = analyzer
def get_account_address(self):
return Account.Address(self.result_set.getString("jid"),
self.result_set.getString("name"))
def get_contact_name(self):
return self.result_set.getString("name")
def get_phone(self):
return self.result_set.getString("number")
def get_other_attributes(self):
return [BlackboardAttribute(
BlackboardAttribute.ATTRIBUTE_TYPE.TSK_ID,
self._PARENT_ANALYZER,
self.result_set.getString("jid"))]
class WhatsAppMessagesParser(TskMessagesParser):
"""
Extract TSK_MESSAGE information from the WhatsApp database.
@ -468,15 +474,9 @@ class WhatsAppMessagesParser(TskMessagesParser):
group = self.result_set.getString("recipients")
if group is not None:
group = group.split(",")
recipients = []
for token in group:
recipients.append(Account.Address(token, token))
return recipients
return group
return Account.Address(self.result_set.getString("id"),
self.result_set.getString("id"))
return self.result_set.getString("id")
return super(WhatsAppMessagesParser, self).get_phone_number_to()
def get_phone_number_from(self):
@ -484,10 +484,9 @@ class WhatsAppMessagesParser(TskMessagesParser):
group_sender = self.result_set.getString("group_sender")
group = self.result_set.getString("recipients")
if group_sender is not None and group is not None:
return Account.Address(group_sender, group_sender)
return group_sender
else:
return Account.Address(self.result_set.getString("id"),
self.result_set.getString("id"))
return self.result_set.getString("id")
return super(WhatsAppMessagesParser, self).get_phone_number_from()
def get_message_direction(self):