initial commit

This commit is contained in:
overcuriousity
2025-09-03 13:20:23 +02:00
parent 13855a70ae
commit 759acc855d
57 changed files with 7306 additions and 2 deletions

View File

View File

@@ -0,0 +1,107 @@
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey, Text, DateTime, Boolean
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
from sqlalchemy.orm import sessionmaker
from contextlib import contextmanager
import logging
# Module-wide session factory. The engine is created at import time and points
# at the SQLite file 'entities.db' (a relative path).
SessionFactory = sessionmaker(bind=create_engine('sqlite:///entities.db'))
# Declarative base class that every ORM model in this module derives from.
Base = declarative_base()
class DistinctEntitiesTable(Base):
    """One row per distinct entity value.

    The individual occurrences of a value live in ``EntitiesTable`` and are
    reachable through ``individual_entities``.
    """

    __tablename__ = 'distinct_entities_table'

    # Primary key of distinct_entities_table.
    distinct_entities_id = Column(Integer, primary_key=True)
    # The distinct entity itself, e.g. "192.168.1.1" or "bc1qy3h5l8n9" (indexed).
    distinct_entity = Column(String, index=True)
    # Foreign key to entity_types_table.entity_type_id.
    entity_types_id = Column(Integer, ForeignKey('entity_types_table.entity_type_id'))

    # The entity type row this value is classified as.
    regex_library = relationship("EntityTypesTable")
    # Occurrences of this value; counterpart of EntitiesTable.entity.
    individual_entities = relationship("EntitiesTable", back_populates="entity")
class EntitiesTable(Base):
    """One row per occurrence of an entity inside an input file."""

    __tablename__ = 'entities_table'

    # Primary key of entities_table.
    entities_id = Column(Integer, primary_key=True)
    # Foreign key to distinct_entities_table.distinct_entities_id.
    distinct_entities_id = Column(Integer, ForeignKey('distinct_entities_table.distinct_entities_id'))
    # Foreign key to entity_types_table.entity_type_id.
    entity_types_id = Column(Integer, ForeignKey('entity_types_table.entity_type_id'))
    # The entity type row of this occurrence.
    regex_library = relationship("EntityTypesTable")
    # Foreign key to file_metadata.file_id.
    file_id = Column(Integer, ForeignKey('file_metadata.file_id'))
    # Line inside the file referenced by file_id.
    line_number = Column(Integer)
    # Timestamp obtained via regex from the original input file.
    entry_timestamp = Column(DateTime)
    # User-settable flag used to customise data inspection (indexed).
    flag = Column(Boolean, default=False, index=True)

    # The distinct value this occurrence belongs to.
    entity = relationship("DistinctEntitiesTable", back_populates="individual_entities")
    # The file this occurrence was found in.
    file = relationship("FileMetadata")
    # Single context row (uselist=False); counterpart of ContextTable.individual_entity.
    context = relationship("ContextTable", uselist=False, back_populates="individual_entity")
class ContextTable(Base):
    """Text surrounding one entity occurrence, stored in three sizes."""

    __tablename__ = 'context_table'

    # Primary key of context_table.
    context_id = Column(Integer, primary_key=True)
    # Foreign key to entities_table.entities_id.
    entities_id = Column(Integer, ForeignKey('entities_table.entities_id'))
    # Context parsed from the original file: a number of lines before and
    # after the entity, in a small, medium and large variant.
    context_small = Column(Text)
    context_medium = Column(Text)
    # Only the large variant is indexed.
    context_large = Column(Text, index=True)

    # The occurrence this context belongs to; counterpart of EntitiesTable.context.
    individual_entity = relationship("EntitiesTable", back_populates="context")
class FileMetadata(Base):
    """Metadata of one original input file."""

    __tablename__ = 'file_metadata'

    # Primary key of file_metadata.
    file_id = Column(Integer, primary_key=True)
    # Name of the original input file (indexed).
    file_name = Column(String, index=True)
    # Path of the original input file.
    file_path = Column(String)
    # MIME type of the original input file.
    file_mimetype = Column(String)
class EntityTypesTable(Base):
    """Definition of one entity type and how it is parsed and presented."""

    __tablename__ = 'entity_types_table'

    # Primary key of entity_types_table.
    entity_type_id = Column(Integer, primary_key=True)
    # Short form of the entity type, e.g. ipv4, ipv6, btcaddr.
    entity_type = Column(String)
    # Regex pattern which could be used for parsing the files.
    regex_pattern = Column(String)
    # Name of the Python script which could be used for parsing the files.
    script_parser = Column(String)
    # Tooltip shown in the GUI.
    gui_tooltip = Column(String)
    # GUI name, more descriptive than entity_type.
    gui_name = Column(String)
    # Parent in the hierarchical structure from the YAML specs; defaults to 'root'.
    parent_type = Column(String, default='root')
    # Whether the parser is enabled; defaults to True.
    parser_enabled = Column(Boolean, default=True)
def create_database(db_path='sqlite:///entities.db'):
    """Create every table declared on ``Base`` in the database at *db_path*.

    After the schema exists, the entity-types table is probed once and a debug
    message is logged when it contains no rows. The table is not populated
    here.

    Args:
        db_path: SQLAlchemy database URL of the database to initialise.
    """
    engine = create_engine(db_path)
    logging.debug("Create Database Engine")
    try:
        Base.metadata.create_all(engine)
        logging.debug("Created all Metadata")

        # Probe through the engine that was just initialised. The module-level
        # SessionFactory is always bound to 'sqlite:///entities.db' and would
        # ignore a non-default db_path.
        session = sessionmaker(bind=engine)()
        logging.debug("Started new session with session factory")
        try:
            if not session.query(EntityTypesTable).first():
                logging.debug("Didnt find the EntityTypesTable, running populate_entity_types_table")
        finally:
            # Release the connection even when the probe query raises.
            session.close()
    finally:
        engine.dispose()
        logging.debug("Disposed Engine")
def get_db_session():
    """Return a new session produced by the module-level ``SessionFactory``.

    The session is handed to the caller as-is; nothing here closes it.
    """
    new_session = SessionFactory()
    return new_session
# When this module is executed as a script, create the schema at the default
# database URL of create_database().
if __name__ == "__main__":
create_database()
@contextmanager
def session_scope():
    """Yield a fresh session inside a commit-or-rollback transaction.

    The session is committed when the ``with`` body finishes normally. If the
    body raises, the session is rolled back and the exception propagates. The
    session is closed in either case.
    """
    db_session = SessionFactory()
    try:
        yield db_session
        db_session.commit()
    except Exception:
        db_session.rollback()
        raise
    finally:
        db_session.close()

View File

@@ -0,0 +1,357 @@
import logging
import os
import yaml
from PyQt5.QtWidgets import QDialog, QVBoxLayout, QMessageBox, QLabel, QRadioButton, QPushButton
from logline_leviathan.gui.ui_helper import UIHelper
from logline_leviathan.database.database_manager import *
class DatabaseOperations:
def __init__(self, main_window, db_init_func):
self.main_window = main_window
self.db_init_func = db_init_func
self.selected_resolutions = []
def ensureDatabaseExists(self):
    """Call ``self.db_init_func`` when the file 'entities.db' does not exist.

    The path is relative, so it is resolved against the current working
    directory. Nothing is done (apart from logging) when the file exists.
    """
    if os.path.exists('entities.db'):
        logging.info("Database exists.")
        return
    logging.info("Database does not exist. Creating new database...")
    self.db_init_func()  # expected to be create_database
def loadRegexFromYAML(self):
    """Load './data/entities.yaml' and return its entity definitions.

    Returns:
        The parsed YAML mapping after it passed the duplicate check, or an
        empty dict when the file contains no document.

    Raises:
        OSError: When the file cannot be opened.
        ValueError: Propagated from ``notify_duplicates_from_yaml`` when the
            file contains duplicate values.
    """
    # Read as UTF-8 explicitly rather than with the platform's locale encoding,
    # so non-ASCII text in the definitions is decoded the same way everywhere.
    with open('./data/entities.yaml', 'r', encoding='utf-8') as file:
        # safe_load returns None for an empty file; treat that as "no entities"
        # so the duplicate check can still iterate the result.
        yaml_data = yaml.safe_load(file) or {}
    return self.notify_duplicates_from_yaml(yaml_data)
def notify_duplicates_from_yaml(self, yaml_data):
    """Reject YAML data in which two entities share a value for a checked field.

    The checked fields are entity_type, gui_name, gui_tooltip, regex_pattern
    and script_parser. Empty or missing values are ignored.

    Args:
        yaml_data: Mapping of entity name to a dict of entity fields.

    Returns:
        *yaml_data* itself when no duplicates were found.

    Raises:
        ValueError: After showing the duplicate dialog, when at least one
            duplicate was found.
    """
    checked_fields = ('entity_type', 'gui_name', 'gui_tooltip', 'regex_pattern', 'script_parser')
    # Per field: value -> name of the entity that used it most recently.
    owners_by_field = {field: {} for field in checked_fields}
    duplicates = []

    for entity_name, entity_data in yaml_data.items():
        for field, owners in owners_by_field.items():
            value = entity_data.get(field)
            if not value:
                continue
            if value in owners:
                duplicates.append({
                    "duplicate_field": field,
                    "entity_name": entity_name,
                    "original_entity_name": owners[value]
                })
            owners[value] = entity_name

    if not duplicates:
        return yaml_data
    self.show_duplicate_error_dialog(duplicates)
    raise ValueError("Duplicate entries found in YAML file. Aborting.")
def show_duplicate_error_dialog(self, duplicates):
    """Show a ``DuplicateErrorDialog`` for *duplicates* and wait until it closes."""
    DuplicateErrorDialog(duplicates).exec_()
def show_resolve_inconsistencies_dialog(self, db_entity, yaml_entity):
    """Ask the user which of the two conflicting entries should be kept.

    Args:
        db_entity: The entry as stored in the database.
        yaml_entity: The entry as defined in the YAML file.

    Returns:
        The first resolution reported by the dialog, or ``None`` when the
        dialog was not accepted or reported no resolutions.
    """
    dialog = ResolveInconsistenciesDialog([(db_entity, yaml_entity)])
    if dialog.exec_() != QDialog.Accepted:
        return None
    resolutions = dialog.getSelectedResolutions()
    # Only one inconsistency was passed in, so only the first entry matters.
    return resolutions[0] if resolutions else None
def populate_and_update_entities_from_yaml(self, yaml_data):
# Synchronise EntityTypesTable with the entity definitions in yaml_data.
# yaml_data is iterated with .items(); every value is used as a dict of
# EntityTypesTable column values and must contain the key 'entity_type'.
# NOTE(review): this view shows no indentation, so which 'if' each 'else'
# below belongs to cannot be confirmed from here - check the file before
# relying on the branch descriptions in these comments.
with session_scope() as session:
# Load all existing entity types once and index them by entity_type.
db_entities = session.query(EntityTypesTable).all()
db_entity_dict = {entity.entity_type: entity for entity in db_entities}
for entity_name, entity_data in yaml_data.items():
entity_type = entity_data['entity_type']
db_entity = db_entity_dict.get(entity_type)
# No row with this entity_type: look for a row that matches on another field.
if db_entity is None:
db_entity = self.find_potentially_modified_entity(db_entities, entity_data)
if db_entity:
# The parser_enabled value of the database row replaces the YAML value.
parser_enabled_db = db_entity.parser_enabled
entity_data['parser_enabled'] = parser_enabled_db
if self.is_duplicate_or_inconsistent(db_entity, entity_data, db_entities):
logging.warning(f"Issue found with entity {db_entity} and {entity_data}. Handling resolution.")
# Returns None when the dialog is not accepted.
resolution = self.show_resolve_inconsistencies_dialog(db_entity, entity_data)
if resolution:
# resolution is a (choice, entity) tuple; it is paired with the database row.
self.apply_resolution([(resolution, db_entity)], session) # Pass db_entity as part of the resolution
else:
# Copy every YAML value onto the existing row.
for key, value in entity_data.items():
setattr(db_entity, key, value)
else:
# Presumably the "no matching row" branch: insert the definition as a new row.
new_entity = EntityTypesTable(**entity_data)
session.add(new_entity)
session.commit()
def find_potentially_modified_entity(self, db_entities, yaml_entity):
    """Return the first database entity sharing an identifying field with *yaml_entity*.

    Args:
        db_entities: Iterable of database entity objects.
        yaml_entity: Dict with the YAML definition. Optional keys may be
            missing.

    Returns:
        The first element of *db_entities* whose entity_type, gui_name,
        gui_tooltip, regex_pattern or script_parser equals the corresponding
        non-empty YAML value, or ``None`` when there is none.
    """
    # 'parser_enabled' is deliberately not an identifying key: it is a boolean
    # flag, so comparing it would match every row that has the same flag value.
    identifying_keys = ('entity_type', 'gui_name', 'gui_tooltip', 'regex_pattern', 'script_parser')
    # .get() tolerates definitions that omit optional keys. Empty values
    # cannot identify an entity and are dropped.
    yaml_values = {key: yaml_entity.get(key) for key in identifying_keys}
    yaml_values = {key: value for key, value in yaml_values.items() if value}

    for db_ent in db_entities:
        if any(getattr(db_ent, key) == value for key, value in yaml_values.items()):
            return db_ent
    return None
def is_duplicate_or_inconsistent(self, db_entity, yaml_entity, db_entities):
    """Tell whether *yaml_entity* conflicts with the database.

    Two conditions are checked; 'parser_enabled' takes part in neither:

    * inconsistency - a YAML value that is not ``None`` differs from the
      same field of *db_entity*;
    * duplicate - a YAML value that is not ``None`` equals the same field of
      a database entity with a different entity_type.

    Args:
        db_entity: The database entity matched to the YAML entry (may be falsy).
        yaml_entity: Dict with the YAML definition. Optional keys may be
            missing.
        db_entities: Iterable of all database entities.

    Returns:
        ``True`` when either condition holds, otherwise ``False``.
    """
    keys_to_check = ('entity_type', 'gui_name', 'gui_tooltip', 'regex_pattern', 'script_parser')
    # Read with .get() in both checks so that a definition without e.g.
    # 'script_parser' is treated as "not set" instead of raising KeyError.
    yaml_values = {key: yaml_entity.get(key) for key in keys_to_check}
    yaml_values = {key: value for key, value in yaml_values.items() if value is not None}

    if db_entity:
        for key, value in yaml_values.items():
            if getattr(db_entity, key, None) != value:
                logging.debug(f"Found inconsistent entity: DB-Entity: {db_entity} YAML-Entity: {yaml_entity}")
                return True

    # Check for duplicates across all entities of a different type.
    yaml_type = yaml_entity.get('entity_type')
    for db_ent in db_entities:
        if db_ent.entity_type == yaml_type:
            continue
        if any(getattr(db_ent, key) == value for key, value in yaml_values.items()):
            logging.debug(f"Found duplicate entity: {db_ent}")
            return True
    return False
def update_database_entry(self, db_entity, yaml_entity):
    """Copy every key/value pair of *yaml_entity* onto *db_entity* as attributes."""
    for attribute in yaml_entity:
        setattr(db_entity, attribute, yaml_entity[attribute])
def apply_resolution(self, resolutions, session):
# Apply the user's choices for conflicting entity definitions.
# resolutions is iterated as ((resolution, entity), db_entity) items:
#   resolution - 'yaml' or 'db' (any other value is ignored),
#   entity     - used as a dict of column values for 'yaml' and as a
#                database entity object for 'db',
#   db_entity  - the database row the conflict was found on.
# NOTE(review): this view shows no indentation, so the exact nesting (for
# example whether the YAML file is rewritten per item or once at the end)
# cannot be confirmed from here.
# The current YAML content is loaded first so the 'db' branch can overwrite entries.
with open('./data/entities.yaml', 'r') as file:
yaml_data = yaml.safe_load(file)
for (resolution, entity), db_entity in resolutions:
if resolution == 'yaml':
logging.debug(f"Resolving YAML entity: {entity} with resolution: yaml and db_entity: {db_entity}")
if db_entity:
# Remember which rows reference the old entity type before it is deleted.
foreign_keys = self.capture_foreign_keys(db_entity.entity_type_id, session)
session.delete(db_entity)
new_entity = EntityTypesTable(**entity)
session.add(new_entity)
# flush() is issued before new_entity.entity_type_id is read during reassignment.
session.flush()
self.reassign_foreign_keys(new_entity, foreign_keys, session)
elif resolution == 'db':
if entity: # Existing database entity is chosen
# The entry is stored under the entity's entity_type as top-level key.
yaml_data[entity.entity_type] = {
'entity_type': entity.entity_type,
'gui_name': entity.gui_name,
'gui_tooltip': entity.gui_tooltip,
'parent_type': entity.parent_type,
'regex_pattern': entity.regex_pattern,
'script_parser': entity.script_parser,
'parser_enabled': entity.parser_enabled
}
# Write the (possibly updated) definitions back to the YAML file.
with open('./data/entities.yaml', 'w') as file:
yaml.dump(yaml_data, file)
def capture_foreign_keys(self, entity_id, session):
    """Collect the primary keys of all rows that reference an entity type.

    Args:
        entity_id: Value matched against the ``entity_types_id`` columns.
        session: Session used for the two lookups.

    Returns:
        Dict with the keys ``'distinct_entities'`` and ``'entities'``, each
        holding the list of primary keys of the referencing rows.
    """
    distinct_rows = session.query(DistinctEntitiesTable).filter_by(entity_types_id=entity_id).all()
    occurrence_rows = session.query(EntitiesTable).filter_by(entity_types_id=entity_id).all()
    return {
        'distinct_entities': [row.distinct_entities_id for row in distinct_rows],
        'entities': [row.entities_id for row in occurrence_rows],
    }
def reassign_foreign_keys(self, new_entity, foreign_keys, session):
    """Point previously captured rows at *new_entity*.

    Args:
        new_entity: Entity type whose ``entity_type_id`` becomes the new
            reference.
        foreign_keys: Dict as produced by ``capture_foreign_keys``. Missing
            keys are treated as empty lists.
        session: Session used to load the rows by primary key.
    """
    # (model, key in foreign_keys) in the order the references are rewritten.
    targets = (
        (DistinctEntitiesTable, 'distinct_entities'),
        (EntitiesTable, 'entities'),
    )
    for model, bucket in targets:
        for row_id in foreign_keys.get(bucket, []):
            row = session.query(model).get(row_id)
            row.entity_types_id = new_entity.entity_type_id
def checkScriptPresence(self):
    """Report parser scripts named in the database that are missing on disk.

    Every non-empty ``script_parser`` of the entity types is looked up below
    './data/parser'. When scripts are missing, a warning box listing them is
    shown.

    Returns:
        List of the missing script names (empty when none are missing).
    """
    parser_directory = './data/parser'
    missing_scripts = []
    with session_scope() as session:
        for entity in session.query(EntityTypesTable).all():
            script_name = entity.script_parser
            if not script_name:
                continue
            if not os.path.exists(os.path.join(parser_directory, script_name)):
                missing_scripts.append(script_name)

    if missing_scripts:
        msg = QMessageBox()
        msg.setIcon(QMessageBox.Warning)
        msg.setWindowTitle("Fehlende Skripte")
        msg.setText(".\nDas ist nicht zwingend ein Problem, aber falls nötig,\nsollten die Skritpte in ./data/parser/ ergänzt werden.\nListe der erwarteten Skripte:")
        msg.setInformativeText("\n".join(missing_scripts))
        msg.exec_()  # blocks until the box is dismissed
    return missing_scripts
def purgeWordlistEntries(self):
    """Delete all occurrences recorded for the 'generated_wordlist_match' type.

    For every distinct entity of that type, the ``EntitiesTable`` rows and
    their ``ContextTable`` rows are deleted. The ``DistinctEntitiesTable``
    rows themselves are not deleted. Nothing happens when the entity type
    does not exist.

    Raises:
        Exception: Any error is logged and re-raised.
    """
    try:
        with session_scope() as session:
            wordlist_type = (
                session.query(EntityTypesTable)
                .filter_by(entity_type='generated_wordlist_match')
                .one_or_none()
            )
            if not wordlist_type:
                logging.info("No 'generated_wordlist_match' entity type found. No action taken.")
                return

            distinct_rows = (
                session.query(DistinctEntitiesTable)
                .filter_by(entity_types_id=wordlist_type.entity_type_id)
                .all()
            )
            for distinct_row in distinct_rows:
                occurrences = (
                    session.query(EntitiesTable)
                    .filter_by(distinct_entities_id=distinct_row.distinct_entities_id)
                    .all()
                )
                for occurrence in occurrences:
                    # Context rows first, then the occurrence itself.
                    session.query(ContextTable).filter_by(entities_id=occurrence.entities_id).delete()
                    session.delete(occurrence)

            session.commit()
    except Exception as e:
        logging.error(f"Error during wordlist entries purge: {str(e)}")
        raise
class ResolveInconsistenciesDialog(QDialog):
    """Dialog that lets the user keep either the database or the YAML entry.

    For each ``(db_entity, yaml_entity)`` pair both entries are shown with one
    radio button each. After "OK", ``getSelectedResolutions`` returns one
    ``(choice, entity)`` tuple per pair.
    """

    def __init__(self, inconsistencies, parent=None):
        """Build the dialog for *inconsistencies*, a list of (db_entity, yaml_entity) pairs."""
        super().__init__(parent)
        self.setWindowTitle("Inkonsistenzen auflösen")
        self.inconsistencies = inconsistencies
        self.resolution_choices = []
        self.selected_entity = None
        self.selected_entities = []
        self.selected_resolutions = []
        self.initUI()

    def initUI(self):
        """Create labels, radio buttons and the OK/cancel buttons."""
        layout = QVBoxLayout(self)

        for db_entity, yaml_entity in self.inconsistencies:
            db_text = self.format_entity_for_display(db_entity)
            yaml_text = self.format_entity_for_display(yaml_entity)
            db_radio = QRadioButton("Datenbank-Eintrag behalten")
            yaml_radio = QRadioButton("YAML-Eintrag behalten")

            # Each description is followed directly by its radio button.
            row_widgets = (
                QLabel(f"Datenbank-Eintrag: {db_text}"),
                db_radio,
                QLabel(f"YAML-Eintrag: {yaml_text}"),
                yaml_radio,
            )
            for widget in row_widgets:
                layout.addWidget(widget)
            self.resolution_choices.append((db_radio, yaml_radio))

        ok_button = QPushButton("OK", self)
        ok_button.clicked.connect(self.on_ok)
        cancel_button = QPushButton("Abbruch", self)
        cancel_button.clicked.connect(self.reject)
        layout.addWidget(ok_button)
        layout.addWidget(cancel_button)

    def on_ok(self):
        """Record the checked choice of every pair and accept the dialog.

        A pair with no checked button is recorded as ``(None, None)``.
        """
        chosen = []
        pairs = zip(self.resolution_choices, self.inconsistencies)
        for (db_radio, yaml_radio), (db_entity, yaml_entity) in pairs:
            if db_radio.isChecked():
                chosen.append(('db', db_entity))
            elif yaml_radio.isChecked():
                chosen.append(('yaml', yaml_entity))
            else:
                chosen.append((None, None))
        self.selected_resolutions = chosen
        self.accept()

    def getSelectedResolutions(self):
        """Return the list of (choice, entity) tuples recorded by ``on_ok``."""
        return self.selected_resolutions

    def format_entity_for_display(self, entity):
        """Return *entity* as "name: value" lines.

        A dict (YAML entry) is rendered key by key. Any other object is
        treated as a database entity and rendered from a fixed attribute list.
        """
        if isinstance(entity, dict):
            pairs = entity.items()
        else:
            pairs = (
                (attr, getattr(entity, attr))
                for attr in ['entity_type', 'gui_name', 'gui_tooltip', 'parent_type', 'regex_pattern', 'script_parser', 'parser_enabled']
            )
        return "\n".join(f"{key}: {value}" for key, value in pairs)
class DuplicateErrorDialog(QDialog):
    """Dialog listing duplicate values found in ./data/entities.yaml."""

    def __init__(self, duplicates, parent=None):
        """Build the dialog for *duplicates*, a list of dicts describing each duplicate."""
        super().__init__(parent)
        self.setWindowTitle("Duplikate gefunden")
        self.duplicates = duplicates
        self.initUI()

    def initUI(self):
        """Create the explanation, one label per duplicate and the two buttons."""
        layout = QVBoxLayout(self)
        layout.addWidget(QLabel("Duplikate wurden in ./data/entities.yaml gefunden. Diese sollten manuell aufgelöst werden:"))

        for duplicate in self.duplicates:
            layout.addWidget(QLabel(self.format_entity_for_display(duplicate)))

        open_button = QPushButton("YAML-Datei oeffnen", self)
        open_button.clicked.connect(self.openYAML)
        exit_button = QPushButton("Abbruch", self)
        exit_button.clicked.connect(self.close)
        layout.addWidget(open_button)
        layout.addWidget(exit_button)

    def format_entity_for_display(self, entity):
        """Return a dict as "key: value" lines; anything else yields ``None``."""
        if not isinstance(entity, dict):
            return None
        return "\n".join(f"{key}: {value}" for key, value in entity.items())

    def openYAML(self):
        """Open 'data/entities.yaml' through a ``UIHelper``."""
        UIHelper(main_window=self).openFile('data/entities.yaml')

View File

@@ -0,0 +1,89 @@
import shutil
import logging
import time
import os
from PyQt5.QtWidgets import QFileDialog
from sqlalchemy.exc import SQLAlchemyError
from logline_leviathan.database.database_manager import session_scope
from logline_leviathan.database.database_operations import DatabaseOperations
class DatabaseUtility():
def __init__(self, main_window):
    """Store the main window and create the ``DatabaseOperations`` helper.

    Args:
        main_window: Window object; its ``db_init_func`` attribute is handed
            to ``DatabaseOperations``.
    """
    self.main_window = main_window
    init_func = main_window.db_init_func
    # This object itself (not main_window) is passed as first argument.
    self.database_operations = DatabaseOperations(self, init_func)
def purgeDatabase(self):
# Delete 'entities.db', recreate an empty database, repopulate the entity
# types from the YAML file and refresh the GUI.
# Errors are only logged; nothing is re-raised to the caller.
# NOTE(review): this view shows no indentation, so it cannot be confirmed
# which of the statements below run inside the session_scope() block.
# Refuse to purge while the main window reports that processing is running.
if self.main_window.isProcessing():
self.main_window.showProcessingWarning()
return
try:
with session_scope() as db_session:
# Close the session and dispose its engine before the file is removed.
if db_session:
db_session.close()
db_session.bind.dispose()
# Try to delete the database file up to three times, 0.1 s apart.
retries = 3
for attempt in range(retries):
try:
if os.path.exists('entities.db'):
os.remove('entities.db')
break
except OSError as e:
if attempt < retries - 1:
time.sleep(0.1)
else:
# Last attempt failed: give up and propagate the error.
raise e
# Reinitialize the database
self.main_window.db_init_func()
self.main_window.statusLabel.setText(" Leere Datenbank initalisiert. Mit der Analyse fortfahren.")
logging.debug("Database created.")
# Reload the entity definitions into the fresh database.
yaml_data = self.database_operations.loadRegexFromYAML()
self.database_operations.populate_and_update_entities_from_yaml(yaml_data)
# Refresh the window and the checkbox lists of both sub-windows.
self.main_window.refreshApplicationState()
self.main_window.generate_report_window.updateCheckboxes() # keep the report window in sync
self.main_window.generate_wordlist_window.updateCheckboxes()
except SQLAlchemyError as e:
logging.error(f"Error creating database: {e}")
except Exception as e:
logging.error(f"General error: {e}")
def importDatabase(self):
    """Let the user pick a '.db' file and copy it over 'entities.db'.

    Nothing is imported while the main window reports that processing is
    running. On success the window state and the checkbox lists are
    refreshed; failures are logged and shown in the status label.
    """
    window = self.main_window
    if window.isProcessing():
        window.showProcessingWarning()
        return

    options = QFileDialog.Options()
    db_file, _ = QFileDialog.getOpenFileName(window, "Select External Database", "", "Database Files (*.db);;All Files (*)", options=options)

    # Cancelled dialog or a file without the '.db' suffix.
    if not (db_file and db_file.endswith(".db")):
        window.statusLabel.setText(" Keine gueltige Datenbank ausgewählt.")
        return

    try:
        shutil.copy(db_file, 'entities.db')
        window.current_db_path = db_file
        window.statusLabel.setText(" Bestehende Datenbank für diese Sitzung ausgewählt.")
        window.refreshApplicationState()
        window.generate_report_window.updateCheckboxes()
        window.generate_wordlist_window.updateCheckboxes()
    except Exception as e:
        logging.error(f"Error selecting external database: {e}")
        window.statusLabel.setText(f" Fehler bei der Auswahl der Datenbank: {e}")
def exportDatabase(self):
    """Copy 'entities.db' to a location chosen in a save dialog.

    Nothing is exported while the main window reports that processing is
    running, or when the dialog returns no path. The suggested file name is
    'entities_<YYYYmmdd_HHMMSS>.db'. The result is shown in the status label.
    """
    window = self.main_window
    if window.isProcessing():
        window.showProcessingWarning()
        return

    options = QFileDialog.Options()
    default_filename = "entities_" + time.strftime('%Y%m%d_%H%M%S') + ".db"
    save_path, _ = QFileDialog.getSaveFileName(window, "Save Database File", default_filename, "Database Files (*.db);;All Files (*)", options=options)
    if not save_path:
        return

    try:
        shutil.copy('entities.db', save_path)
        window.statusLabel.setText(f" Datenbank erfolgreich exportiert nach {save_path}")
    except Exception as e:
        logging.error(f"Error exporting database: {e}")
        window.statusLabel.setText(f" Fehler beim Exportieren der Datenbank: {e}")

View File

@@ -0,0 +1,816 @@
from sqlalchemy import or_, and_, not_, String
from PyQt5.QtWidgets import QProgressBar, QMainWindow, QTableWidget, QTableWidgetItem, QLineEdit, QStyledItemDelegate, QTextEdit, QWidget, QVBoxLayout, QHBoxLayout, QPushButton, QComboBox, QStyle, QLabel
from logline_leviathan.database.database_manager import get_db_session, EntitiesTable, DistinctEntitiesTable, EntityTypesTable, ContextTable, FileMetadata, session_scope
from PyQt5.QtCore import pyqtSignal, Qt, QThread, pyqtSignal, QTimer
from PyQt5.QtGui import QTextDocument, QTextOption
from fuzzywuzzy import fuzz
import re
import logging
import html
class QueryThread(QThread):
    """Thread that runs one search and emits the scored results."""

    # Emitted with (scored_results, search_terms) when the query has finished.
    queryCompleted = pyqtSignal(list, list)

    def __init__(self, db_query_instance, query_text):
        """Remember the query helper and the text to search for."""
        super().__init__()
        self.db_query_instance = db_query_instance
        self.query_text = query_text

    def run(self):
        """Execute the query, score every row and emit ``queryCompleted``."""
        helper = self.db_query_instance
        text = self.query_text

        base_query, search_terms = helper.prepare_query(text)
        # parse_query returns a callable, which is passed to filter() as-is.
        rows = base_query.filter(helper.parse_query(text)).all()

        scored_results = []
        for row in rows:
            scored_results.append((row, helper.calculate_match_score(row, text)))
        self.queryCompleted.emit(scored_results, search_terms)
class DatabaseGUIQuery:
def __init__(self):
    """Open a database session and keep shortcuts to the ORM model classes."""
    self.db_session = get_db_session()
    # Model classes used when building queries.
    self.distinct_entities = DistinctEntitiesTable
    self.entities = EntitiesTable
    self.entity_types = EntityTypesTable
    self.file_metadata = FileMetadata
    self.context = ContextTable
def parse_query(self, query):
    """Translate a search string into a zero-argument filter callable.

    The query is split into whitespace-separated tokens; text in double quotes
    stays one token. Leading/trailing '+', '-' and '"' are removed from each
    token and '*' is turned into the LIKE wildcard '%'. A row matches when any
    token is contained in any of the searched columns.

    Args:
        query: The raw search text.

    Returns:
        A callable without parameters that returns the SQL filter expression.
        For a blank query, or one that consists only of operator characters,
        the expression matches no rows.
    """
    if not query.strip():
        # Same zero-argument shape as the regular return value below, so the
        # consumer can treat both alike.
        return lambda: or_(False)

    # Split and strip special characters for the database query.
    tokens = re.findall(r'"[^"]+"|\S+', query)
    filters = []
    for token in tokens:
        term = token.strip('+-"')
        if not term:
            # A token such as '+' carries no search text. Without this guard
            # it would become the pattern '%%', which matches everything.
            continue
        search_condition = f'%{term.replace("*", "%")}%'
        filters.append(or_(
            self.distinct_entities.distinct_entity.like(search_condition),
            self.entity_types.entity_type.like(search_condition),
            self.entity_types.gui_name.like(search_condition),
            self.entity_types.gui_tooltip.like(search_condition),
            self.entity_types.script_parser.like(search_condition),
            self.file_metadata.file_name.like(search_condition),
            self.file_metadata.file_path.like(search_condition),
            self.file_metadata.file_mimetype.like(search_condition),
            self.entities.line_number.cast(String).like(search_condition),
            self.context.context_large.like(search_condition)
        ))

    if not filters:
        return lambda: or_(False)
    return lambda: or_(*filters)
def parse_search_terms(self, query):
    """Return the plain search terms of *query*.

    The query is tokenised like in ``parse_query``: tokens are separated by
    whitespace and text in double quotes stays one token, returned without
    the quotes. Tokens starting with '+' or '-' are left out.

    Args:
        query: The raw search text.

    Returns:
        List of terms in query order.
    """
    terms = []
    for token in re.findall(r'"[^"]+"|\S+', query):
        # NOTE(review): '+' terms are excluded exactly as before; confirm
        # whether they were meant to be returned as well.
        if token.startswith(('+', '-')):
            continue
        term = token.strip('"')
        if term:
            terms.append(term)
    return terms
def prepare_query(self, query):
    """Build the unfiltered result query and extract the search terms.

    Args:
        query: The raw search text.

    Returns:
        Tuple ``(base_query, search_terms)``. ``base_query`` selects entity
        value, type name, file name, line number, timestamp and large context,
        joined across the five tables and made distinct. No filter is applied
        yet.
    """
    search_terms = self.parse_search_terms(query)

    distinct = self.distinct_entities
    occurrences = self.entities
    files = self.file_metadata
    contexts = self.context
    types = self.entity_types

    base_query = self.db_session.query(
        distinct.distinct_entity,
        types.gui_name,
        files.file_name,
        occurrences.line_number,
        occurrences.entry_timestamp,
        contexts.context_large
    )
    # Join order: occurrences -> files -> contexts -> entity types.
    base_query = base_query.join(occurrences, distinct.distinct_entities_id == occurrences.distinct_entities_id)
    base_query = base_query.join(files, occurrences.file_id == files.file_id)
    base_query = base_query.join(contexts, occurrences.entities_id == contexts.entities_id)
    base_query = base_query.join(types, occurrences.entity_types_id == types.entity_type_id)
    base_query = base_query.distinct()

    return base_query, search_terms
def display_results(self, results, search_terms):
    """Open a ``ResultsWindow`` showing *results*.

    Args:
        results: Result list handed to the window's ``on_query_completed``.
        search_terms: Search terms handed to the window's
            ``on_query_completed``.
    """
    # ResultsWindow takes (db_query_instance, parent). Passing the results and
    # the term list positionally would hand a list to the parent parameter.
    self.results_window = ResultsWindow(self)
    self.results_window.show()
    # NOTE(review): assumes results has the shape on_query_completed expects
    # (QueryThread emits (row, score) tuples) - confirm against the callers.
    self.results_window.on_query_completed(results, search_terms)
def calculate_match_score(self, result, query):
    """Score how well one result row matches *query*.

    The score adds up per-field term matches (honouring '+' and '-' prefixes),
    bonuses for the terms appearing in query order in the context, a bonus per
    unique term, a proximity bonus for terms close to the entity inside the
    context, and a small fuzzy-match bonus. The sum is then scaled by the
    maximum score assumed possible for the number of terms.

    Args:
        result: Row with the attributes distinct_entity, file_name,
            entry_timestamp, line_number and context_large. Text attributes
            that are ``None`` are treated as empty.
        query: The raw search text.

    Returns:
        The scaled score as a number.
    """
    # Weights and thresholds.
    distinct_entity_weight = 4
    file_name_weight = 4
    timestamp_weight = 1
    line_number_weight = 1
    context_weight = 5
    multiple_term_weight = 1
    order_weight = 8  # terms in exact order, only applied to quoted queries
    fuzzy_match_weight = 0.3
    threshold_for_fuzzy = 90
    proximity_weight = 2
    positive_operand_weight = 10  # match of a term prefixed with '+'
    negative_operand_penalty = -5  # match of a term prefixed with '-'
    exact_match_weight = 10

    # Each token becomes (has '+', has '-', cleaned lower-case term).
    tokens = re.findall(r'"[^"]+"|\S+', query)
    processed_terms = [
        (token.startswith('+'), token.startswith('-'), token.strip('+-"').lower())
        for token in tokens
    ]
    terms = [term for _, _, term in processed_terms]

    # Normalised result fields. NULL text columns count as empty text instead
    # of raising AttributeError on .lower().
    lower_distinct_entity = (result.distinct_entity or "").lower()
    lower_file_name = (result.file_name or "").lower()
    timestamp_str = str(result.entry_timestamp).lower()
    line_number_str = str(result.line_number).lower()
    words_in_context = (result.context_large or "").lower().split()
    context_text = ' '.join(words_in_context)

    # (searched value, weight for a term without operand). The context is a
    # word list, so a term must equal a whole word there; the other fields
    # are strings and match on substrings.
    weighted_fields = (
        (lower_distinct_entity, distinct_entity_weight),
        (lower_file_name, file_name_weight),
        (timestamp_str, timestamp_weight),
        (line_number_str, line_number_weight),
        (words_in_context, context_weight),
    )

    score = 0
    for is_positive, is_negative, term in processed_terms:
        for searched, plain_weight in weighted_fields:
            if term not in searched:
                continue
            if is_positive:
                score += positive_operand_weight
            elif is_negative:
                score += negative_operand_penalty
            else:
                score += plain_weight

    # All cleaned terms in query order, as one string.
    exact_terms_substring = ' '.join(terms)
    if exact_terms_substring and exact_terms_substring in context_text:
        score += exact_match_weight

    # Additional bonus for the same sequence when the query used quotes.
    if '"' in query and exact_terms_substring in context_text:
        score += order_weight

    # Additional weight for multiple different terms.
    score += len(set(terms)) * multiple_term_weight

    # Proximity: distance in words between a term and the entity. Position 0
    # is used when the entity is not a word of the context.
    if lower_distinct_entity in words_in_context:
        entity_pos = words_in_context.index(lower_distinct_entity)
    else:
        entity_pos = 0
    for term in terms:
        if term in words_in_context:
            distance = abs(words_in_context.index(term) - entity_pos)
            score += max(0, proximity_weight - distance * 0.01)

    # Fuzzy matching against every word of all fields.
    all_text = f"{result.distinct_entity} {result.file_name} {result.entry_timestamp} {result.line_number} {result.context_large}".lower()
    all_words = all_text.split()
    for term in terms:
        # default=0 keeps max() from raising on an empty word list.
        fuzzy_score = max((fuzz.partial_ratio(term, word) for word in all_words), default=0)
        if fuzzy_score > threshold_for_fuzzy:
            score += (fuzzy_score / 100) * fuzzy_match_weight

    # Scale by the maximum assumed possible for this number of terms.
    term_count = len(processed_terms)
    max_possible_positive_score = (
        distinct_entity_weight + file_name_weight +
        timestamp_weight + line_number_weight +
        context_weight * term_count +
        order_weight + exact_match_weight +
        term_count * multiple_term_weight +
        term_count * positive_operand_weight
    )
    max_possible_negative_score = term_count * negative_operand_penalty
    max_possible_score = max_possible_positive_score + abs(max_possible_negative_score)

    return (score / max_possible_score) * 100
def get_entity_types(self):
    """Return the GUI names of all entity types that can be parsed.

    An entity type is included when its ``regex_pattern`` or its
    ``script_parser`` is not NULL.
    """
    has_parser = or_(
        EntityTypesTable.regex_pattern.isnot(None),
        EntityTypesTable.script_parser.isnot(None),
    )
    with session_scope() as session:
        parsable_types = session.query(EntityTypesTable).filter(has_parser).all()
        return [entity_type.gui_name for entity_type in parsable_types]
# Width per result-table column, applied by index in adjust_column_widths().
COLUMN_WIDTHS = [200, 100, 250, 100, 120, 600, 80] # Adjust these values as needed
# Column names; ResultsWindow.__init__ creates the per-column filter fields from this list.
COLUMN_NAMES = ['Distinct Entity', 'Entity Type', 'File Name', 'Line Number', 'Timestamp', 'Context', 'Match Score']
# Default section size of the table's vertical header (row height).
DEFAULT_ROW_HEIGHT = 120
# Fixed width of every filter QLineEdit.
FILTER_EDIT_WIDTH = 150
class ResultsWindow(QMainWindow):
def __init__(self, db_query_instance, parent=None):
# Build the search-result window: query field with progress bar and search
# button, a row of filter widgets, the result table and a close button.
# db_query_instance is stored and later handed to QueryThread.
# parent is passed on to the QMainWindow constructor.
super(ResultsWindow, self).__init__(parent)
self.db_query_instance = db_query_instance
# State of the currently displayed result set.
self.loaded_data_count = 0
self.total_data = []
self.current_filters = {}
self.setWindowTitle("Suchergebnis")
self.setGeometry(800, 600, 1500, 600) # Adjust size as needed
# Create central widget and set layout
centralWidget = QWidget(self)
self.setCentralWidget(centralWidget)
mainLayout = QVBoxLayout(centralWidget)
# First row: query input, progress bar and search button.
queryFieldLayout = QHBoxLayout()
self.databaseQueryLineEdit = QueryLineEdit(self)
self.databaseQueryLineEdit.setPlaceholderText(" Suchbegriff eingeben...")
# Pressing Return starts the search, same as the button below.
self.databaseQueryLineEdit.returnPressed.connect(self.execute_query_from_results_window)
self.databaseQueryLineEdit.setStyleSheet("""
QLineEdit {
background-color: #3C4043;
color: white;
min-height: 20px;
}
""")
queryFieldLayout.addWidget(self.databaseQueryLineEdit)
# Create a progress bar for query in progress
self.queryProgressBar = QProgressBar(self)
self.queryProgressBar.setRange(0, 1) # idle range; execute_query_from_results_window sets (0, 0) when a query starts
self.queryProgressBar.setFixedWidth(100) # fixed width; the bar is not hidden here
queryFieldLayout.addWidget(self.queryProgressBar)
executeQueryButton = QPushButton("Suche ausführen", self)
executeQueryButton.clicked.connect(self.execute_query_from_results_window)
queryFieldLayout.addWidget(executeQueryButton)
mainLayout.addLayout(queryFieldLayout)
# Create a horizontal layout for filter options
filterLayout = QHBoxLayout()
mainLayout.addLayout(filterLayout)
# Add the table widget to the main layout
self.tableWidget = QTableWidget()
mainLayout.addWidget(self.tableWidget)
# Stylesheet applied to the whole ResultsWindow
stylesheet = """
/* Styles for QTableWidget and headers */
QTableWidget, QHeaderView::section {
background-color: #2A2F35;
color: white;
border: 1px solid #4A4A4A;
}
/* Style for QLineEdit */
QLineEdit {
background-color: #3A3F44;
color: white;
border: 1px solid #4A4A4A;
}
/* Style for QPushButton */
QPushButton {
background-color: #4B5563;
color: white;
border-radius: 4px;
padding: 5px;
margin: 5px;
}
QPushButton:hover {
background-color: #5C677D;
}
QPushButton:pressed {
background-color: #2A2F35;
}
/* Style for empty rows and other areas */
QWidget {
background-color: #2A2F35;
color: white;
}
"""
self.setStyleSheet(stylesheet)
# Apply default row height after setting up the table
self.tableWidget.verticalHeader().setDefaultSectionSize(DEFAULT_ROW_HEIGHT)
# Button that resets every filter.
self.clearAllButton = QPushButton("Alle Filteroptionen loeschen", self)
self.clearAllButton.clicked.connect(self.clear_all_filters)
filterLayout.addWidget(self.clearAllButton)
# Combo box for filtering by entity type; filled by populate_entity_type_combobox().
self.entityTypeComboBox = QComboBox()
filterLayout.addWidget(self.entityTypeComboBox)
# List of the per-column filter line edits created below.
self.filterWidgets = []
# Create and add QLineEdit widgets to the filter layout
for i, column_name in enumerate(COLUMN_NAMES):
# No text filter is created for the 'Entity Type' and 'Context' columns.
if column_name in ['Entity Type', 'Context']:
continue
filter_edit = QLineEdit(self)
filter_edit.setFixedWidth(FILTER_EDIT_WIDTH)
filter_edit.setPlaceholderText(f"Filtern nach {column_name}")
# col=i binds the current column index as a default argument, so each
# line edit filters its own column.
filter_edit.textChanged.connect(lambda text, col=i: self.apply_filter(text, col))
self.filterWidgets.append(filter_edit)
filterLayout.addWidget(filter_edit)
# Timer that calls load_more_data on every timeout; it is started in
# execute_query_from_results_window.
self.dataLoadTimer = QTimer(self)
self.dataLoadTimer.timeout.connect(self.load_more_data)
# Close button: stops the timer and closes the window.
self.dismissButton = QPushButton("Schließen", self)
self.dismissButton.clicked.connect(self.dataLoadTimer.stop)
self.dismissButton.clicked.connect(self.close)
mainLayout.addWidget(self.dismissButton)
self.populate_entity_type_combobox()
# Apply the configured column widths to the table.
self.adjust_column_widths()
#self.tableWidget.verticalScrollBar().valueChanged.connect(self.check_scroll)
def populate_entity_type_combobox(self):
    """Fill the entity-type combo box and connect its change handler.

    The first entry is the catch-all option whose item data is ``None``;
    every further entry uses the entity type's GUI name as both label and
    item data. The handler is connected only after all items were added.
    """
    available_types = DatabaseGUIQuery().get_entity_types()
    combo_box = self.entityTypeComboBox
    # Catch-all entry: filter_by_entity_type() treats ``None`` as "no filter".
    combo_box.addItem("Alle verfügbaren Typen", None)
    for type_name in available_types:
        combo_box.addItem(type_name, type_name)
    combo_box.currentIndexChanged.connect(self.filter_by_entity_type)
def clear_table(self):
    """Clear the results table and shrink it to zero rows and zero columns."""
    table = self.tableWidget
    table.clear()
    table.setRowCount(0)
    table.setColumnCount(0)
def adjust_column_widths(self):
    """Apply the widths listed in ``COLUMN_WIDTHS`` to the table columns, by position."""
    for position, column_width in enumerate(COLUMN_WIDTHS):
        self.tableWidget.setColumnWidth(position, column_width)
def execute_query_from_results_window(self):
    """Run the text of the query line edit as a query on a ``QueryThread``.

    The data-load timer is (re)started with a 2000 ms interval before the
    query text is inspected, so it also runs when the text is empty and no
    query is launched. For a non-empty text the table is cleared, the
    progress bar range is set to (0, 0) and a ``QueryThread`` is started whose
    ``queryCompleted`` signal is connected to ``on_query_completed``.
    """
    self.dataLoadTimer.start(2000)
    query_text = self.databaseQueryLineEdit.text()
    if query_text:
        self.clear_table()
        self.queryProgressBar.setRange(0, 0)
        worker = QueryThread(self.db_query_instance, query_text)
        self.query_thread = worker
        worker.queryCompleted.connect(self.on_query_completed)
        worker.start()
def set_query_and_execute(self, query_text):
    """Put ``query_text`` into the query line edit, then execute it."""
    line_edit = self.databaseQueryLineEdit
    line_edit.setText(query_text)
    self.execute_query_from_results_window()
def on_query_completed(self, results, search_terms):
    """Store the results of a finished query and rebuild the table from them.

    Args:
        results: Query results; kept as ``self.total_data``.
        search_terms: Search terms of the query; kept as ``self.search_terms``
            and passed on to ``setup_table``.
    """
    logging.debug(f"Query completed with {len(results)} results")
    # execute_query_from_results_window() switched the bar to the (0, 0)
    # range when the query started; give it a finite range again now that the
    # query is done. This statement used to be fused onto the end of the
    # comment line above and therefore never ran.
    self.queryProgressBar.setRange(0, 1)
    self.total_data = results
    self.search_terms = search_terms
    # Nothing of the new result set has been moved into the table yet.
    self.loaded_data_count = 0
    self.setup_table(search_terms)
    self.apply_all_filters()
def setup_table(self, search_terms=None):
    """Configure the results table and load the first chunk of data.

    Args:
        search_terms: Terms handed to the ``HighlightDelegate`` installed on
            columns 0, 1 and 3. ``None`` (the default) means no terms.
    """
    # Use a fresh list per call instead of a shared mutable default argument.
    if search_terms is None:
        search_terms = []
    # Set up the table columns and headers
    self.tableWidget.setColumnCount(7)
    self.tableWidget.setHorizontalHeaderLabels(['Distinct Entity', 'Entity Type', 'File Name', 'Line Number', 'Timestamp', 'Context', 'Match Score'])
    highlight_delegate = HighlightDelegate(self, search_terms)
    for column in (0, 1, 3):
        self.tableWidget.setItemDelegateForColumn(column, highlight_delegate)
    # Apply column widths
    self.adjust_column_widths()
    # Disable sorting before the initial population
    self.tableWidget.setSortingEnabled(False)
    # Load initial subset of data
    self.load_more_data()
    # Enable sorting and order by 'Match Score' (column 6), best first
    self.tableWidget.setSortingEnabled(True)
    self.tableWidget.sortItems(6, Qt.DescendingOrder)
def add_table_row(self, row_index, result, score):
    """Insert one result as a new table row at ``row_index``.

    Args:
        row_index: Position at which the new row is inserted.
        result: Sequence whose entries 0-5 are shown as distinct entity,
            entity type, file name, line number, timestamp and context.
        score: Match score; shown with four decimals in column 6.
    """
    table = self.tableWidget
    # With sorting enabled Qt may move the row as soon as an item is written
    # to the sorted column, after which ``row_index`` addresses another row.
    # Populate the row with sorting off; it is switched on again below.
    table.setSortingEnabled(False)
    table.insertRow(row_index)
    # Distinct Entity with highlighting
    table.setItem(row_index, 0, QTableWidgetItem(str(result[0])))
    # Entity Type
    table.setItem(row_index, 1, QTableWidgetItem(str(result[1])))
    # File Name - CellWidget for display, plus an item carrying the raw text
    file_name = str(result[2])
    table.setCellWidget(row_index, 2, CellWidget(file_name, self.filterWidgets[1], self.search_terms))
    file_name_item = QTableWidgetItem()
    file_name_item.setData(Qt.UserRole, file_name)
    table.setItem(row_index, 2, file_name_item)
    # Line Number
    table.setItem(row_index, 3, QTableWidgetItem(str(result[3])))
    # Timestamp - CellWidget for display, plus an item carrying the raw text
    timestamp = str(result[4])
    table.setCellWidget(row_index, 4, CellWidget(timestamp, self.filterWidgets[3], self.search_terms))
    timestamp_item = QTableWidgetItem()
    timestamp_item.setData(Qt.UserRole, timestamp)
    table.setItem(row_index, 4, timestamp_item)
    # Context - using ScrollableTextWidget
    table.setCellWidget(row_index, 5, ScrollableTextWidget(result[5], self.search_terms, str(result[0])))
    # Match Score - numeric item so that sorting compares numbers, not text
    table.setItem(row_index, 6, NumericTableWidgetItem("{:.4f}".format(float(score))))
    # Highlight delegate for the new row
    table.setItemDelegateForRow(row_index, HighlightDelegate(self, self.search_terms))
    # The row is complete: turn sorting back on
    table.setSortingEnabled(True)
    # Once the table holds more than 500 rows, drop the lowest-scoring 10 %
    if table.rowCount() > 500:
        self.remove_lowest_scoring_rows(10)
def load_more_data(self):
    """Move the next chunk of pending results into the table.

    Up to 50 entries of ``self.total_data`` following ``loaded_data_count``
    are considered. Each entry is indexed as ``(result, score)``; only
    entries scoring above the current table average are inserted. The whole
    chunk is marked as processed afterwards, inserted or not.

    Entries are no longer pre-checked against the active filters. That check
    looked up a table row unrelated to the entry (a constant index past the
    chunk), so any non-empty filter silently discarded the data for good.
    ``apply_all_filters()`` below hides non-matching rows instead, and they
    reappear when the filter is cleared.
    """
    if not self.is_new_data_available():
        return  # No new data available
    start_index = self.loaded_data_count
    chunk_size = 50  # Adjust this number based on performance
    end_index = min(start_index + chunk_size, len(self.total_data))
    # Average match score of the rows currently in the table
    average_score = self.calculate_average_score()
    # Highest score first
    sorted_chunk = sorted(self.total_data[start_index:end_index], key=lambda entry: entry[1], reverse=True)
    for row_data in sorted_chunk:
        if row_data[1] > average_score:
            self.insert_row_in_sorted_order(row_data)
    # Hide the new rows that do not match the active filters
    self.apply_all_filters()
    # Remember how far total_data has been processed
    self.update_data_tracking(end_index)
    self.tableWidget.update()  # Refresh the table
def remove_lowest_scoring_rows(self, percentage):
    """Remove the ``percentage`` percent of table rows with the lowest scores.

    The number of rows to drop is ``rowCount * percentage // 100``. Scores are
    read from column 6; rows without an item there are never removed.
    """
    table = self.tableWidget
    row_total = table.rowCount()
    removal_quota = row_total * percentage // 100
    # (score, row) pairs for every row that has a score item
    scored_rows = [
        (float(cell.text()), row)
        for row, cell in ((row, table.item(row, 6)) for row in range(row_total))
        if cell
    ]
    # Lowest scores first; the sort is stable, so ties keep their row order
    scored_rows.sort(key=lambda pair: pair[0])
    doomed_rows = [row for _, row in scored_rows[:removal_quota]]
    # Delete from the bottom up so earlier removals do not shift later indices
    doomed_rows.sort(reverse=True)
    for row in doomed_rows:
        table.removeRow(row)
def is_new_data_available(self):
    """Return True while ``total_data`` holds entries beyond ``loaded_data_count``."""
    pending_total = len(self.total_data)
    return pending_total > self.loaded_data_count
def calculate_average_score(self):
    """Return the mean of column 6 over all table rows, or 0 for an empty table.

    Rows without an item in column 6 add nothing to the sum but still count
    towards the divisor.
    """
    table = self.tableWidget
    row_total = table.rowCount()
    if not row_total > 0:
        return 0
    score_sum = 0
    for row in range(row_total):
        cell = table.item(row, 6)  # column 6 holds the match score
        if cell:
            score_sum += float(cell.text())
    return score_sum / row_total
def update_data_tracking(self, end_index):
    """Record ``end_index`` as the new value of ``loaded_data_count``."""
    self.loaded_data_count = end_index
def insert_row_in_sorted_order(self, row_data):
    """Insert ``row_data`` in front of the first row with a lower match score.

    ``row_data`` is indexed as ``(result, score)``. Rows are scanned from the
    top; a row without an item in column 6 counts as score 0. If no existing
    row scores lower, the new row is appended at the end.
    """
    score = row_data[1]
    table = self.tableWidget
    insert_at = table.rowCount()
    for candidate in range(insert_at):
        cell = table.item(candidate, 6)  # column 6 holds the match score
        existing_score = float(cell.text()) if cell else 0
        if score > existing_score:
            insert_at = candidate
            break
    self.add_table_row(insert_at, row_data[0], score)
def matches_current_filters(self, row_index, row_data):
    """Return True when the row passes every entry of ``self.current_filters``.

    Evaluation stops at the first filter that does not match; with no active
    filters the result is True.
    """
    return all(
        self.is_match(row_index, column, filter_text, row_data)
        for column, filter_text in self.current_filters.items()
    )
def is_match(self, row_index, column, filter_text, row_data):
    """Case-insensitive substring test of ``filter_text`` against one table cell.

    The text is read from the table itself: from the cell widget when one is
    installed, otherwise from the cell's item (an absent item yields "").
    ``row_data`` is accepted but not consulted.
    """
    table = self.tableWidget
    cell_widget = table.cellWidget(row_index, column)
    if isinstance(cell_widget, CellWidget):
        # The label holds HTML; render it to obtain the visible plain text.
        html_document = QTextDocument()
        html_document.setHtml(cell_widget.label.text())
        cell_text = html_document.toPlainText()
    elif isinstance(cell_widget, ScrollableTextWidget):
        cell_text = cell_widget.text_edit.toPlainText()
    else:
        # Plain QTableWidgetItem cell
        table_item = table.item(row_index, column)
        cell_text = table_item.text() if table_item else ""
    needle = filter_text.lower()
    return needle in cell_text.lower()
def apply_filter(self, text, column):
    """Store the lower-cased ``text`` as the filter for ``column`` and re-filter all rows."""
    lowered = text.lower()
    self.current_filters[column] = lowered
    self.apply_all_filters()
def extract_row_data(self, row_index):
    """Return the cell contents of table row ``row_index`` as a list, one entry per column."""
    column_total = self.tableWidget.columnCount()
    return [self.get_cell_data(row_index, column) for column in range(column_total)]
def get_cell_data(self, row_index, column):
    """Return the plain text shown in one table cell.

    A ``CellWidget`` label is rendered from HTML to plain text, a
    ``ScrollableTextWidget`` yields the plain text of its text edit, and any
    other cell yields its item's text ("" when there is no item).
    """
    table = self.tableWidget
    cell_widget = table.cellWidget(row_index, column)
    if isinstance(cell_widget, CellWidget):
        html_document = QTextDocument()
        html_document.setHtml(cell_widget.label.text())
        return html_document.toPlainText()
    if isinstance(cell_widget, ScrollableTextWidget):
        return cell_widget.text_edit.toPlainText()
    table_item = table.item(row_index, column)
    if table_item:
        return table_item.text()
    return ""
def apply_all_filters(self):
    """Show every table row that passes the active filters and hide the rest."""
    table = self.tableWidget
    for row in range(table.rowCount()):
        cell_texts = self.extract_row_data(row)
        passes = self.matches_current_filters(row, cell_texts)
        toggle_row = table.showRow if passes else table.hideRow
        toggle_row(row)
def filter_by_entity_type(self):
    """Sync the 'Entity Type' column filter with the combo box and re-filter.

    Item data of ``None`` removes the entity-type filter; any other value is
    stored lower-cased as the filter text for that column.
    """
    selected_type = self.entityTypeComboBox.currentData()
    # Column position is looked up by name in COLUMN_NAMES
    type_column = COLUMN_NAMES.index('Entity Type')
    if selected_type is not None:
        self.current_filters[type_column] = selected_type.lower()
    else:
        # Drop the filter if one is set; nothing to do otherwise
        self.current_filters.pop(type_column, None)
    # Reapply all filters including the entity type filter
    self.apply_all_filters()
def on_filter_change(self):
    """Re-evaluate the visibility of every row against the active filters."""
    self.apply_all_filters()
def clear_all_filters(self):
    """Reset all text filters, un-hide every row, then re-apply the entity-type choice."""
    for line_edit in self.filterWidgets:
        line_edit.clear()
    # Forget every stored filter
    self.current_filters.clear()
    table = self.tableWidget
    for row_index in range(table.rowCount()):
        table.showRow(row_index)
    # The combo box keeps its selection, so its filter is applied again
    self.filter_by_entity_type()
@staticmethod
def strip_html_tags(text):
return re.sub('<[^<]+?>', '', text)
class QueryLineEdit(QLineEdit):
    """Line edit that emits its own ``returnPressed`` signal on the Return key."""

    returnPressed = pyqtSignal()

    def keyPressEvent(self, event):
        """Emit ``returnPressed`` for Return; pass every other key to ``QLineEdit``."""
        if event.key() != Qt.Key_Return:
            super().keyPressEvent(event)
            return
        self.returnPressed.emit()
class HighlightDelegate(QStyledItemDelegate):
    """Item delegate that paints cell text as rich text with highlighted search terms."""

    def __init__(self, parent=None, search_terms=None):
        """Keep the search terms to highlight; ``None`` means no terms."""
        super().__init__(parent)
        self.search_terms = search_terms or []

    def paint(self, painter, option, index):
        """Render the cell's text as an HTML document inside the cell rectangle."""
        painter.save()
        # Wrap long text inside the cell instead of clipping it
        options = QTextOption()
        options.setWrapMode(QTextOption.WrapAtWordBoundaryOrAnywhere)
        document = QTextDocument()
        document.setDefaultTextOption(options)
        document.setDefaultFont(option.font)
        # Prepare highlighted text
        text = index.model().data(index)
        document.setHtml(self.get_highlighted_text(text))
        # Set the width of the document to the cell width
        document.setTextWidth(option.rect.width())
        # Draw the contents relative to the cell's top-left corner
        painter.translate(option.rect.topLeft())
        document.drawContents(painter)
        painter.restore()

    def get_highlighted_text(self, text):
        """Return HTML for ``text`` with every '+'-prefixed search term highlighted.

        Only terms starting with '+' are highlighted. Matching is
        case-insensitive and the matched text keeps its original spelling.
        The text is HTML-escaped, and highlighting is done on the raw text in
        a single pass, so a term can never match inside generated markup.
        Terms that are empty after cleaning are ignored.
        """
        text = "" if text is None else str(text)
        needles = []
        for term in self.search_terms:
            if not term.startswith('+'):
                continue
            # Drop the operand and any other special characters
            clean_term = re.sub(r'[^\w\s]', '', term.lstrip('+-'))
            if clean_term.strip():
                needles.append(clean_term)
        body = self._mark_matches(text, needles)
        return f"<span style='color: white;'>{body}</span>".replace("\n", "<br>")

    @staticmethod
    def _mark_matches(text, needles):
        """Escape ``text`` for HTML and wrap each occurrence of a needle in a yellow span."""
        if not needles:
            return html.escape(text, quote=False)
        # Longest alternative first so a shorter term cannot pre-empt a longer one
        alternatives = sorted(needles, key=len, reverse=True)
        pattern = re.compile("|".join(re.escape(needle) for needle in alternatives), re.IGNORECASE)
        pieces = []
        cursor = 0
        for match in pattern.finditer(text):
            pieces.append(html.escape(text[cursor:match.start()], quote=False))
            matched = html.escape(match.group(), quote=False)
            pieces.append(f"<span style='background-color: yellow; color: black;'>{matched}</span>")
            cursor = match.end()
        pieces.append(html.escape(text[cursor:], quote=False))
        return "".join(pieces)
class ScrollableTextWidget(QWidget):
    """Read-only, scrollable text cell that highlights an entity and the search terms."""

    def __init__(self, text, search_terms, distinct_entity, parent=None):
        """Build the text edit, fill it with highlighted ``text`` and move to the entity.

        Args:
            text: Text to display; ``None`` is shown as empty.
            search_terms: Terms highlighted in yellow.
            distinct_entity: Entity highlighted in blue and searched for
                afterwards to position the text cursor.
            parent: Optional parent widget.
        """
        super().__init__(parent)
        layout = QVBoxLayout(self)
        layout.setContentsMargins(0, 0, 0, 0)
        self.text_edit = CustomTextEdit(self)
        self.text_edit.setReadOnly(True)
        # Apply styles including scrollbar styles
        self.text_edit.setStyleSheet("""
            QTextEdit {
            background-color: #2A2F35; /* Dark blue-ish background */
            color: white; /* White text */
            }
            QTextEdit QScrollBar:vertical {
            border: none;
            background-color: #3A3F44; /* Dark scrollbar background */
            width: 8px; /* Width of the scrollbar */
            }
            QTextEdit QScrollBar::handle:vertical {
            background-color: #6E6E6E; /* Scroll handle color */
            border-radius: 4px; /* Rounded corners for the handle */
            }
            QTextEdit QScrollBar::add-line:vertical, QTextEdit QScrollBar::sub-line:vertical {
            background: none;
            }
            """)
        # Set the text with highlighting
        self.setHighlightedText(text, search_terms, distinct_entity)
        layout.addWidget(self.text_edit)
        # Scroll to the distinct entity
        self.scroll_to_text(distinct_entity)

    def setHighlightedText(self, text, search_terms, distinct_entity):
        """Show ``text`` with the entity in blue and the search terms in yellow.

        Matching is case-insensitive and is done on the raw text; every
        fragment is HTML-escaped on output, so terms can never match inside
        generated markup and the matched text keeps its original spelling.
        Terms inside an entity match are highlighted within the blue span.
        Terms that are empty after cleaning are ignored.
        """
        text = "" if text is None else str(text)
        needles = []
        for term in search_terms:
            # Strip leading operands (+ or -) and other special characters
            clean_term = re.sub(r'[^\w\s]', '', term.lstrip('+-'))
            if clean_term.strip():
                needles.append(clean_term)
        term_pattern = self._compile_alternatives(needles)
        entity_pattern = self._compile_alternatives([distinct_entity] if distinct_entity else [])

        def escape(fragment):
            return html.escape(fragment, quote=False)

        def mark_terms(fragment):
            return self._wrap_matches(fragment, term_pattern, "background-color: yellow; color: black;", escape)

        body = self._wrap_matches(text, entity_pattern, "background-color: blue; color: white;", mark_terms)
        # Outer span keeps the default text color white
        text_with_color = f"<span style='color: white;'>{body}</span>"
        self.text_edit.setHtml(text_with_color.replace("\n", "<br>"))

    @staticmethod
    def _compile_alternatives(needles):
        """Return a case-insensitive pattern matching any needle, or None when there is none."""
        if not needles:
            return None
        # Longest alternative first so a shorter term cannot pre-empt a longer one
        alternatives = sorted(needles, key=len, reverse=True)
        return re.compile("|".join(re.escape(needle) for needle in alternatives), re.IGNORECASE)

    @staticmethod
    def _wrap_matches(text, pattern, style, render):
        """Return HTML for ``text``: matches wrapped in a styled span, all fragments passed to ``render``."""
        if pattern is None:
            return render(text)
        pieces = []
        cursor = 0
        for match in pattern.finditer(text):
            pieces.append(render(text[cursor:match.start()]))
            pieces.append(f"<span style='{style}'>{render(match.group())}</span>")
            cursor = match.end()
        pieces.append(render(text[cursor:]))
        return "".join(pieces)

    def scroll_to_text(self, text):
        """Place the text cursor on the first occurrence of ``text``; no-op for empty ``text``."""
        if text:
            cursor = self.text_edit.document().find(text)
            self.text_edit.setTextCursor(cursor)
class CustomTextEdit(QTextEdit):
    """Text edit that claims wheel events only while its vertical scroll bar is visible."""

    def __init__(self, parent=None):
        """Create the text edit with the vertical scroll bar shown as needed."""
        super().__init__(parent)
        self.setVerticalScrollBarPolicy(Qt.ScrollBarAsNeeded)

    def wheelEvent(self, event):
        """Let ``QTextEdit`` process the wheel event, then accept or ignore it.

        The event is ignored when the vertical scroll bar is not visible and
        accepted otherwise.
        """
        super().wheelEvent(event)
        if not self.verticalScrollBar().isVisible():
            event.ignore()
            return
        event.accept()
class CellWidget(QWidget):
    """Table cell showing highlighted text plus a button that copies it into a filter edit."""

    def __init__(self, text, filter_edit, search_terms, parent=None):
        """Build the label and the filter button.

        Args:
            text: Text shown in the label and written to ``filter_edit`` on click.
            filter_edit: Widget whose ``setText`` receives ``text`` when the
                button is clicked.
            search_terms: Terms highlighted inside the label.
            parent: Optional parent widget.
        """
        super(CellWidget, self).__init__(parent)
        self.layout = QHBoxLayout(self)
        self.label = QLabel(text)
        self.setHighlightedText(text, search_terms)
        self.button = QPushButton()
        icon = self.button.style().standardIcon(QStyle.SP_CommandLink)  # Example of a standard icon
        self.button.setIcon(icon)
        self.button.setFixedSize(20, 20)  # Adjust size as needed
        self.button.clicked.connect(lambda: filter_edit.setText(text))
        self.layout.addWidget(self.label)
        self.layout.addWidget(self.button)
        self.layout.setContentsMargins(0, 0, 0, 0)
        self.setLayout(self.layout)

    def setHighlightedText(self, text, search_terms):
        """Set the label to ``text`` with every search term highlighted in yellow.

        Matching is case-insensitive and is done on the raw text in a single
        pass; every fragment is HTML-escaped on output, so terms can never
        match inside generated markup and the matched text keeps its original
        spelling. Terms that are empty after cleaning are ignored (an empty
        pattern would otherwise match between all characters).
        """
        text = "" if text is None else str(text)
        needles = []
        for term in search_terms:
            # Strip leading operands (+ or -) and special characters
            clean_term = re.sub(r'[^\w\s]', '', term.lstrip('+-'))
            if clean_term.strip():
                needles.append(clean_term)
        body = self._mark_matches(text, needles)
        # Outer span keeps the default text color white
        self.label.setText(f"<span style='color: white;'>{body}</span>")

    @staticmethod
    def _mark_matches(text, needles):
        """Escape ``text`` for HTML and wrap each occurrence of a needle in a yellow span."""
        if not needles:
            return html.escape(text, quote=False)
        # Longest alternative first so a shorter term cannot pre-empt a longer one
        alternatives = sorted(needles, key=len, reverse=True)
        pattern = re.compile("|".join(re.escape(needle) for needle in alternatives), re.IGNORECASE)
        pieces = []
        cursor = 0
        for match in pattern.finditer(text):
            pieces.append(html.escape(text[cursor:match.start()], quote=False))
            matched = html.escape(match.group(), quote=False)
            pieces.append(f"<span style='background-color: yellow; color: black;'>{matched}</span>")
            cursor = match.end()
        pieces.append(html.escape(text[cursor:], quote=False))
        return "".join(pieces)
class NumericTableWidgetItem(QTableWidgetItem):
    """Table item that is ordered by the numeric value of its text."""

    def __lt__(self, other):
        """Compare both items' texts as floats rather than as strings."""
        own_value = float(self.text())
        other_value = float(other.text())
        return own_value < other_value

View File

@@ -0,0 +1,228 @@
from sqlalchemy import or_, String
from logline_leviathan.database.database_manager import get_db_session, EntitiesTable, DistinctEntitiesTable, EntityTypesTable, ContextTable, FileMetadata, session_scope
from PyQt5.QtCore import pyqtSignal, QThread, pyqtSignal
from fuzzywuzzy import fuzz
import re
class QueryThread(QThread):
    """Thread that runs one database query and emits the scored results."""

    # Emitted once per run with a dict mapping entities_id to match score
    queryCompleted = pyqtSignal(dict)

    def __init__(self, db_query_instance, query_text):
        """Remember the query helper and the query text to run."""
        super().__init__()
        self.db_query_instance = db_query_instance
        self.query_text = query_text

    def run(self):
        """Execute the query, score every row and emit ``queryCompleted``.

        The search terms returned by ``prepare_query`` are not used here; only
        the score dictionary is emitted.
        """
        querier = self.db_query_instance
        text = self.query_text
        base_query, _search_terms = querier.prepare_query(text)
        criterion = querier.parse_query(text)
        rows = base_query.filter(criterion).all()
        # One score per entity occurrence, keyed by its entities_id
        scored_results = {}
        for row in rows:
            scored_results[row.entities_id] = querier.calculate_match_score(row, text)
        self.queryCompleted.emit(scored_results)
class DatabaseGUIQuery:
    """Builds, filters and scores the entity searches issued from the GUI."""

    def __init__(self):
        """Open a database session and keep references to the mapped tables."""
        self.db_session = get_db_session()
        self.entity_types = EntityTypesTable
        self.entities = EntitiesTable
        self.distinct_entities = DistinctEntitiesTable
        self.context = ContextTable
        self.file_metadata = FileMetadata

    def _searchable_columns(self):
        """Return the column expressions every search token is matched against."""
        return (
            self.distinct_entities.distinct_entity,
            self.entity_types.entity_type,
            self.entity_types.gui_name,
            self.entity_types.gui_tooltip,
            self.file_metadata.file_name,
            self.file_metadata.file_path,
            self.file_metadata.file_mimetype,
            self.entities.line_number.cast(String),
            self.context.context_large,
        )

    def parse_query(self, query):
        """Translate ``query`` into a callable that yields the filter criterion.

        Unquoted tokens are matched with ``ilike`` ('*' acts as a wildcard);
        double-quoted phrases are matched with ``like``. Every token or phrase
        may match any searchable column, and all of them are OR-ed together.

        Returns:
            A callable to be invoked without arguments. For a blank query it
            returns ``False``. That callable accepts and ignores any arguments,
            so it can be called the same way as the one returned for a
            non-blank query, and also with the single argument it used to
            require.
        """
        if not query.strip():
            return lambda *_: False
        # Extract quoted and unquoted parts
        quoted_parts = re.findall(r'"([^"]+)"', query)
        unquoted_parts = re.split(r'"[^"]+"', query)
        unquoted_tokens = [token for part in unquoted_parts for token in re.findall(r'\S+', part)]
        columns = self._searchable_columns()
        filters = []
        # Unquoted tokens: 'ilike' for a case-insensitive search
        for token in unquoted_tokens:
            search_condition = f'%{token.replace("*", "%")}%'
            filters.append(or_(*(column.ilike(search_condition) for column in columns)))
        # Quoted phrases: 'like' on the literal phrase.
        # NOTE(review): whether LIKE is case-sensitive depends on the database
        # backend - confirm for the one in use.
        for token in quoted_parts:
            exact_condition = f'%{token}%'
            filters.append(or_(*(column.like(exact_condition) for column in columns)))
        return lambda: or_(*filters)

    def parse_search_terms(self, query):
        """Return the whitespace-separated tokens of ``query`` that carry no '+'/'-' operand."""
        return [token for token in query.split() if not token.startswith(('+', '-'))]

    def prepare_query(self, query):
        """Build the joined base query and extract the plain search terms.

        Returns:
            ``(base_query, search_terms)``. ``base_query`` selects distinct
            rows of entity, GUI type name, file name, line number, timestamp,
            context, flag and entities_id, and has no filter applied yet.
        """
        search_terms = self.parse_search_terms(query)
        # Construct the base query with proper joins
        base_query = self.db_session.query(
            self.distinct_entities.distinct_entity,
            self.entity_types.gui_name,
            self.file_metadata.file_name,
            self.entities.line_number,
            self.entities.entry_timestamp,
            self.context.context_large,
            self.entities.flag,
            self.entities.entities_id
        ).join(
            self.entities, self.distinct_entities.distinct_entities_id == self.entities.distinct_entities_id
        ).join(
            self.file_metadata, self.entities.file_id == self.file_metadata.file_id
        ).join(
            self.context, self.entities.entities_id == self.context.entities_id
        ).join(
            self.entity_types, self.entities.entity_types_id == self.entity_types.entity_type_id
        ).distinct()
        return base_query, search_terms

    def calculate_match_score(self, result, query):
        """Score how well ``result`` matches ``query``.

        Points are collected for term hits per field ('+' terms earn a bonus,
        '-' terms a penalty), for the query terms appearing in order in the
        context, for the number of distinct terms, for terms close to the
        entity inside the context and for near (fuzzy) matches. The sum is
        divided by a nominal maximum and multiplied by 100.

        Args:
            result: Row exposing ``distinct_entity``, ``file_name``,
                ``entry_timestamp``, ``line_number`` and ``context_large``.
            query: The raw query text.
        """
        # Weights and thresholds
        distinct_entity_weight = 4
        file_name_weight = 4
        timestamp_weight = 1
        line_number_weight = 1
        context_weight = 5
        multiple_term_weight = 1
        order_weight = 8  # terms in exact order, quoted queries only
        fuzzy_match_weight = 0.3
        threshold_for_fuzzy = 90
        proximity_weight = 2
        positive_operand_weight = 10  # weight for terms with '+'
        negative_operand_penalty = -5  # penalty for terms with '-'
        exact_match_weight = 10  # whole term sequence found in the context
        score = 0
        # Split into quoted phrases / tokens and record their operands
        tokens = re.findall(r'"[^"]+"|\S+', query)
        processed_terms = [
            (token.startswith('+'), token.startswith('-'), token.strip('+-"').lower())
            for token in tokens
        ]
        # Normalize result fields; NULL text columns count as empty
        lower_distinct_entity = (result.distinct_entity or "").lower()
        lower_file_name = (result.file_name or "").lower()
        timestamp_str = str(result.entry_timestamp).lower()
        line_number_str = str(result.line_number).lower()
        words_in_context = (result.context_large or "").lower().split()
        context_text = ' '.join(words_in_context)
        # Field hits: substring match on the string fields, whole-word match
        # on the context word list
        haystacks = (
            (lower_distinct_entity, distinct_entity_weight),
            (lower_file_name, file_name_weight),
            (timestamp_str, timestamp_weight),
            (line_number_str, line_number_weight),
            (words_in_context, context_weight),
        )
        for is_positive, is_negative, term in processed_terms:
            for haystack, weight in haystacks:
                if term in haystack:
                    if is_positive:
                        score += positive_operand_weight
                    elif is_negative:
                        score += negative_operand_penalty
                    else:
                        score += weight
        # The cleaned terms in the order they appear in the query
        exact_terms = ' '.join(term for _, _, term in processed_terms)
        if exact_terms and exact_terms in context_text:
            score += exact_match_weight
        # Quoted queries earn the order bonus on top
        if '"' in query and exact_terms in context_text:
            score += order_weight
        # Additional weight for multiple different terms
        unique_terms = set(term for _, _, term in processed_terms)
        score += len(unique_terms) * multiple_term_weight
        # Proximity: distance in words between term and entity in the context
        # (position 0 is used when the entity is not a context word)
        entity_pos = words_in_context.index(lower_distinct_entity) if lower_distinct_entity in words_in_context else 0
        for _, _, term in processed_terms:
            if term in words_in_context:
                distance = abs(words_in_context.index(term) - entity_pos)
                score += max(0, proximity_weight - distance * 0.01)
        # Fuzzy matching against every word of all displayed fields
        all_text = f"{result.distinct_entity} {result.file_name} {result.entry_timestamp} {result.line_number} {result.context_large}".lower()
        all_words = all_text.split()
        for _, _, term in processed_terms:
            fuzzy_score = max((fuzz.partial_ratio(term, word) for word in all_words), default=0)
            if fuzzy_score > threshold_for_fuzzy:
                score += (fuzzy_score / 100) * fuzzy_match_weight
        # Normalize against the nominal maximum
        term_total = len(processed_terms)
        max_possible_positive_score = (
            distinct_entity_weight + file_name_weight +
            timestamp_weight + line_number_weight +
            context_weight * term_total +
            order_weight + exact_match_weight +
            term_total * multiple_term_weight +
            term_total * positive_operand_weight
        )
        max_possible_negative_score = term_total * negative_operand_penalty
        max_possible_score = max_possible_positive_score + abs(max_possible_negative_score)
        return (score / max_possible_score) * 100

    def get_entity_types(self):
        """Return the GUI names of all entity types that have a regex pattern or a script parser."""
        with session_scope() as session:
            return [entity_type.gui_name for entity_type in session.query(EntityTypesTable)
                    .filter(or_(EntityTypesTable.regex_pattern.isnot(None),
                                EntityTypesTable.script_parser.isnot(None)))
                    .all()]