from sqlalchemy import or_, String, false
from logline_leviathan.database.database_manager import get_db_session, EntitiesTable, DistinctEntitiesTable, EntityTypesTable, ContextTable, FileMetadata, session_scope
from PyQt5.QtCore import pyqtSignal, QThread
from fuzzywuzzy import fuzz
import re


class QueryThread(QThread):
    queryCompleted = pyqtSignal(dict)  # Signal emitted on completion with a {entities_id: score} dictionary

    def __init__(self, db_query_instance, query_text):
        super(QueryThread, self).__init__()
        self.db_query_instance = db_query_instance
        self.query_text = query_text

    def run(self):
        base_query, search_terms = self.db_query_instance.prepare_query(self.query_text)
        query_lambda = self.db_query_instance.parse_query(self.query_text)
        # Invoke the lambda to obtain the SQLAlchemy filter expression, then run the query
        results = base_query.filter(query_lambda()).all()
        # Score each result and build a dictionary keyed by entities_id
        scored_results = {result.entities_id: self.db_query_instance.calculate_match_score(result, self.query_text) for result in results}
        self.queryCompleted.emit(scored_results)
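
# Illustrative wiring (not part of the original module): a GUI could run a
# query without blocking the UI thread roughly like this, assuming
# `window.display_results` is a slot that accepts the {entities_id: score}
# dictionary emitted above:
#
#     self.query_thread = QueryThread(DatabaseGUIQuery(), "error +disk")
#     self.query_thread.queryCompleted.connect(window.display_results)
#     self.query_thread.start()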

class DatabaseGUIQuery:
    def __init__(self):
        self.db_session = get_db_session()
        self.entity_types = EntityTypesTable
        self.entities = EntitiesTable
        self.distinct_entities = DistinctEntitiesTable
        self.context = ContextTable
        self.file_metadata = FileMetadata

    def parse_query(self, query):
        if not query.strip():
            # Empty query: return a callable yielding an always-false clause
            return lambda: false()

        # Extract quoted and unquoted parts
        quoted_parts = re.findall(r'"([^"]+)"', query)
        unquoted_parts = re.split(r'"[^"]+"', query)

        # Tokenize the unquoted parts (searched case-insensitively)
        unquoted_tokens = []
        for part in unquoted_parts:
            unquoted_tokens.extend(re.findall(r'\S+', part))

        filters = []

        # Unquoted tokens use 'ilike' for case-insensitive matching; '*' acts as a wildcard
        for token in unquoted_tokens:
            search_condition = f'%{token.replace("*", "%")}%'
            filters.append(
                or_(
                    self.distinct_entities.distinct_entity.ilike(search_condition),
                    self.entity_types.entity_type.ilike(search_condition),
                    self.entity_types.gui_name.ilike(search_condition),
                    self.entity_types.gui_tooltip.ilike(search_condition),
                    self.file_metadata.file_name.ilike(search_condition),
                    self.file_metadata.file_path.ilike(search_condition),
                    self.file_metadata.file_mimetype.ilike(search_condition),
                    self.entities.line_number.cast(String).ilike(search_condition),
                    self.context.context_large.ilike(search_condition)
                    # ... [add other fields for ilike search]
                )
            )

        # Quoted parts use 'like' for a case-sensitive exact-phrase match
        for token in quoted_parts:
            exact_condition = f'%{token}%'
            filters.append(
                or_(
                    self.distinct_entities.distinct_entity.like(exact_condition),
                    self.entity_types.entity_type.like(exact_condition),
                    self.entity_types.gui_name.like(exact_condition),
                    self.entity_types.gui_tooltip.like(exact_condition),
                    self.file_metadata.file_name.like(exact_condition),
                    self.file_metadata.file_path.like(exact_condition),
                    self.file_metadata.file_mimetype.like(exact_condition),
                    self.entities.line_number.cast(String).like(exact_condition),
                    self.context.context_large.like(exact_condition)
                    # ... [add other fields for exact match search]
                )
            )

        return lambda: or_(*filters)
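
    # Illustrative note on parse_query above (not part of the original code):
    # a query such as   error "Disk Failure"   produces one case-insensitive
    # ilike filter for 'error' and one case-sensitive like filter for
    # 'Disk Failure', each OR-ed across the searchable columns listed above.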

    def parse_search_terms(self, query):
        # Keep only tokens without a leading '+' or '-' operator
        tokens = query.split()
        search_terms = [token.lstrip('+-') for token in tokens if not token.startswith('-') and not token.startswith('+')]
        return search_terms

    def prepare_query(self, query):
        search_terms = self.parse_search_terms(query)
        # Construct the base query with the joins needed by the selected columns
        base_query = self.db_session.query(
            self.distinct_entities.distinct_entity,
            self.entity_types.gui_name,
            self.file_metadata.file_name,
            self.entities.line_number,
            self.entities.entry_timestamp,
            self.context.context_large,
            self.entities.flag,
            self.entities.entities_id
        ).join(
            self.entities, self.distinct_entities.distinct_entities_id == self.entities.distinct_entities_id
        ).join(
            self.file_metadata, self.entities.file_id == self.file_metadata.file_id
        ).join(
            self.context, self.entities.entities_id == self.context.entities_id
        ).join(
            self.entity_types, self.entities.entity_types_id == self.entity_types.entity_type_id
        ).distinct()
        # The caller applies the filters from parse_query to this base query
        return base_query, search_terms

    def calculate_match_score(self, result, query):
        # Adjusted weights and thresholds
        distinct_entity_weight = 4
        file_name_weight = 4
        timestamp_weight = 1
        line_number_weight = 1
        context_weight = 5
        multiple_term_weight = 1
        order_weight = 8  # Increased weight for exact order of terms
        fuzzy_match_weight = 0.3  # More discerning fuzzy match
        threshold_for_fuzzy = 90  # Higher threshold for fuzzy matches
        proximity_weight = 2  # Increased weight for proximity
        positive_operand_weight = 10  # Weight for terms with '+'
        negative_operand_penalty = -5  # Penalty for terms with '-'
        exact_match_weight = 10  # Increased weight for exact sequence match

        score = 0

        # Extract operands and terms
        tokens = re.findall(r'"[^"]+"|\S+', query)
        processed_terms = [(token.startswith('+'), token.startswith('-'), token.strip('+-"').lower()) for token in tokens]

        # Normalize result fields
        lower_distinct_entity = result.distinct_entity.lower()
        lower_file_name = result.file_name.lower()
        timestamp_str = str(result.entry_timestamp).lower()
        line_number_str = str(result.line_number).lower()
        words_in_context = result.context_large.lower().split()

        # Check matches in the individual fields, honouring the '+' and '-' operands
        for is_positive, is_negative, term in processed_terms:
            if term in lower_distinct_entity:
                score += positive_operand_weight if is_positive else (negative_operand_penalty if is_negative else distinct_entity_weight)
            if term in lower_file_name:
                score += positive_operand_weight if is_positive else (negative_operand_penalty if is_negative else file_name_weight)
            if term in timestamp_str:
                score += positive_operand_weight if is_positive else (negative_operand_penalty if is_negative else timestamp_weight)
            if term in line_number_str:
                score += positive_operand_weight if is_positive else (negative_operand_penalty if is_negative else line_number_weight)
            if term in words_in_context:
                score += positive_operand_weight if is_positive else (negative_operand_penalty if is_negative else context_weight)

        # Cleaned substring of the search terms in the exact order they appear in the query
        exact_terms_substring = ' '.join([token.strip('+-"').lower() for token in tokens])
        # Check for the exact order of terms in the context
        if exact_terms_substring and exact_terms_substring in ' '.join(words_in_context):
            score += exact_match_weight

        # Additional order bonus when the query contains quoted phrases
        if '"' in query:
            exact_query = ' '.join(term for _, _, term in processed_terms)
            if exact_query in ' '.join(words_in_context):
                score += order_weight

        # Additional weight for multiple different terms
        unique_terms = set(term for _, _, term in processed_terms)
        score += len(unique_terms) * multiple_term_weight

        # Proximity score: terms close to the entity within the context score higher
        for _, _, term in processed_terms:
            if term in words_in_context:
                # Find the positions of the term and the entity in the context
                term_pos = words_in_context.index(term)
                entity_pos = words_in_context.index(lower_distinct_entity) if lower_distinct_entity in words_in_context else 0
                # Calculate the distance and adjust the score
                distance = abs(term_pos - entity_pos)
                proximity_score = max(0, proximity_weight - distance * 0.01)  # Reduce score based on distance
                score += proximity_score

        # Fuzzy matching
        all_text = f"{result.distinct_entity} {result.file_name} {result.entry_timestamp} {result.line_number} {result.context_large}".lower()
        for _, _, term in processed_terms:
            fuzzy_score = max(fuzz.partial_ratio(term, word) for word in all_text.split())
            if fuzzy_score > threshold_for_fuzzy:
                score += (fuzzy_score / 100) * fuzzy_match_weight

        # Normalize the score
        max_possible_positive_score = (
            distinct_entity_weight + file_name_weight +
            timestamp_weight + line_number_weight +
            context_weight * len(processed_terms) +  # Assuming each term can match in the context
            order_weight + exact_match_weight +
            len(processed_terms) * multiple_term_weight +  # Each term contributes to multiple_term_weight
            len(processed_terms) * positive_operand_weight  # Each term could have a positive operand
        )
        # The negative operand penalty widens the possible range
        max_possible_negative_score = len(processed_terms) * negative_operand_penalty
        # The maximum score is the positive maximum plus the absolute value of the negative maximum
        max_possible_score = max_possible_positive_score + abs(max_possible_negative_score)
        # Normalize the score to a scale of 100
        score = (score / max_possible_score) * 100
        return score

    def get_entity_types(self):
        with session_scope() as session:
            # Only return entity types that have either a regex_pattern or a script_parser
            return [entity_type.gui_name for entity_type in session.query(EntityTypesTable)
                    .filter(or_(EntityTypesTable.regex_pattern.isnot(None),
                                EntityTypesTable.script_parser.isnot(None)))
                    .all()]
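

if __name__ == "__main__":
    # Minimal, illustrative check of the scoring logic only (not part of the
    # original module). It does not touch the database; the row below is a
    # made-up stand-in for a query result.
    from collections import namedtuple

    FakeResult = namedtuple(
        "FakeResult",
        ["distinct_entity", "file_name", "entry_timestamp", "line_number",
         "context_large", "flag", "entities_id"],
    )
    sample = FakeResult(
        distinct_entity="192.168.0.1",
        file_name="server.log",
        entry_timestamp="2023-11-02 14:03:17",
        line_number=42,
        context_large="connection from 192.168.0.1 refused after timeout",
        flag=None,
        entities_id=1,
    )
    # calculate_match_score does not use instance state, so the unbound method
    # can be exercised without opening a database session.
    print(DatabaseGUIQuery.calculate_match_score(None, sample, "timeout +refused"))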