implement semeionArtifact

This commit is contained in:
2025-11-29 23:18:55 +01:00
parent 220d1b67b5
commit f5219bf2e4
17 changed files with 484 additions and 2 deletions

View File

@@ -206,7 +206,7 @@ Every artifact — regardless of source platform — conforms to a universal sch
│ │
│ Identity: id, case_id │
│ Classification: artifact_class, source_platform, searchable │
│ Temporal: timestamp, timestamp_precision
│ Temporal: timestamp
│ Actors: [{identifier, display_name, role}]
│ Content: text, semantic_text │
│ Entities: indexed_entities[] (for filtering)

View File

@@ -8,4 +8,4 @@
# TODO: implement qdrant connector
__all__ = []
__all__ = ["QdrantInterface"]

View File

@@ -0,0 +1,14 @@
#
# Copyright (c) 2025, mstoeck3
# All rights reserved.
#
# This source code is licensed under the BSD-3-Clause license found in the
# LICENSE file in the root directory of this source tree.
#
class QdrantInterface():
    """
    interface to the qdrant vector database.
    currently a stub: nothing is sent anywhere yet.
    """

    def __init__(self):
        pass

    @staticmethod
    def submit(pyload):
        """
        submit one payload to qdrant (not implemented yet, does nothing).

        declared static because the function takes no ``self``: without the decorator it
        only worked when called on the class and raised a TypeError on any instance.

        :param pyload: the payload to store (name kept as-is so existing keyword callers keep working)
        :return: None
        """
        pass

View File

@@ -0,0 +1,222 @@
#
# Copyright (c) 2025, mstoeck3
# All rights reserved.
#
# This source code is licensed under the BSD-3-Clause license found in the
# LICENSE file in the root directory of this source tree.
#
from datetime import datetime
import enum
from typing import Annotated, Union
from uuid import UUID
from interfaces import QdrantInterface
from pydantic import BaseModel, Field
class ArtifactClass(str, enum.Enum):
    """
    the general class of the artifact, which determines whether it can be searched for or not.
    this also affects how the artifact is processed during ingestion and search.
    currently only major artifact classes have semantic meaning associated with them
    and are searchable, while the others are only stored for timeline context display.
    """
    # NOTE: the base has to be enum.Enum - `enum` alone is the module and cannot be subclassed.
    # the str mixin lets members compare equal to their plain string values.

    # searchable artifact classes:
    MESSAGE = "message"
    BROWSER_EVENT = "browser_event"
    EMAIL = "email"
    DOCUMENT = "document"

    # non-searchable artifact classes:
    FILE_EVENT = "file_event"
    PROCESS_EVENT = "process_event"
    REGISTRY_EVENT = "registry_event"
    SYSTEM_EVENT = "system_event"
    NETWORK_EVENT = "network_event"
    AUTHENTICATION_EVENT = "authentication_event"
    SCHEDULED_TASK = "scheduled_task"
class Actors(BaseModel):
    """
    the actor is any entity associated with some behavioral pattern.
    """
    # some unique identifier which is consistent across artifacts
    identifier: str
    # human readable, needs to be parsed properly by the ingestion module
    display_name: str
    # quoted forward reference: ActorsRole is only defined further down in this module,
    # so the bare name would raise a NameError while this class body is executed
    role: "ActorsRole"
class ActorsRole(str, enum.Enum):
    """
    the role of the actor, which is mainly important for message filtering and later behavioral analysis.
    """
    # NOTE: the base has to be enum.Enum - `enum` alone is the module and cannot be subclassed.
    SENDER = "sender"  # message, email
    RECEIVER = "receiver"  # message, email
    PARTICIPANT = "participant"  # document collaboration, chat groups?
    CREATOR = "creator"  # document, file system
    OWNER = "owner"  # filesystem objects
    INITIATOR = "initiator"  # browser events, filesystem, network events
    TARGET = "target"  # network events, authentication events
class Content(BaseModel):
    """
    the text payload of an artifact - the most important field, since this is what gets embedded.
    chunking is not modelled here: it is handled at ingestion time and linked via chunk_info.
    """
    # the text itself
    text: str
    # this should never happen, but in some scenarios it might be useful for debugging or edge cases
    truncated: bool = False
class ChunkInfo(BaseModel):
    """
    position of a chunk within a chunked artifact.
    only present if the artifact was chunked.
    """
    # zero-based index of the chunk
    index: int
    # total number of chunks
    total: int
class ContextGroup(BaseModel):
    """
    some artifacts can be aggregated into context groups, if they are semantically linked.
    """
    # kind of group; quoted forward reference because ContextGroupType is only defined
    # further down in this module (the bare name would raise a NameError here)
    type: "ContextGroupType"
    # identifier of the group
    id: str
class ContextGroupType(str, enum.Enum):
    """
    the kinds of context groups that semantically linked artifacts can be aggregated into.
    """
    # NOTE: the base has to be enum.Enum - `enum` alone is the module and cannot be subclassed.
    THREAD = "thread"  # email threads, message threads
    SESSION = "session"  # browser sessions, network sessions
    # not process trees: they are inherently linked by parent_id references
    # not file system directories: they are inherently linked by path and parent_id
    # no chunks: they are inherently linked by chunk_info
class Location(BaseModel):
    """
    where the artifact was located. every part is optional and defaults to None.
    """
    # hostname or machine identifier
    host: str | None = None
    # file path or resource path
    path: str | None = None
    # URL of the resource
    url: str | None = None
    # mostly there to provide something human readable for browser events
    title: str | None = None
    # physical location, e.g., GPS coordinates, if applicable
    physical: str | None = None
class Ingestion(BaseModel):
    """
    bookkeeping about the ingestion process that produced an artifact.
    """
    # when the artifact was ingested
    ingested_at: datetime
    # the original source file from which this artifact was ingested
    source_file: str
    # identifier of the ingestor used
    ingestor_id: str
    # version of the ingestor used
    ingestor_version: str
# source-specific classes are imported from the submodules. the names are listed explicitly
# instead of using a wildcard import, so it stays visible where each member of the union comes from.
from .source_specific_models import (
    AuthenticationEventMetadata,
    BrowserEventMetadata,
    DocumentMetadata,
    EmailMetadata,
    FileEventMetadata,
    MessageMetadata,
    NetworkEventMetadata,
    ProcessEventMetadata,
    RegistryEventMetadata,
    ScheduledTaskMetadata,
    SystemEventMetadata,
)

# union of all source-specific metadata models
source_specific_models = Union[
    DocumentMetadata,
    EmailMetadata,
    MessageMetadata,
    FileEventMetadata,
    NetworkEventMetadata,
    ProcessEventMetadata,
    RegistryEventMetadata,
    ScheduledTaskMetadata,
    SystemEventMetadata,
    AuthenticationEventMetadata,
    BrowserEventMetadata,
]
class SemeionArtifact(BaseModel):
    """
    standard artifact structure for the semeion application.

    JSON representation:
    {
        "_schema_version": "1.1.0",
        "id": "string (uuid-v5)",
        "case_id": "string",
        "searchable": "bool",
        "artifact_class": "string (enum)",
        "source_platform": "string",
        "timestamp": "string (ISO8601 UTC)",
        "actors": [
            {
                "identifier": "string",
                "display_name": "string",
                "role": "string (enum)"
            }
        ],
        "content": {
            "text": "string",
            "truncated": "bool"
        },
        "display_text": "string",
        "indexed_entities": ["string"],
        "parent_id": "string (uuid) | null",
        "chunk_info": {
            "index": "int",
            "total": "int"
        } | null,
        "context_group": {
            "type": "string (enum)",
            "id": "string"
        } | null,
        "location": {
            "host": "string | null",
            "path": "string | null",
            "url": "string | null",
            "title": "string | null",
            "physical": "string | null"
        } | null,
        "source_specific": {} | null,
        "ingestion": {
            "ingested_at": "string (ISO8601)",
            "source_file": "string",
            "ingestor_id": "string",
            "ingestor_version": "string"
        }
    }
    """

    # pydantic model configuration (replaces the deprecated nested `class Config`):
    #   use_enum_values:     enum fields hold the plain value instead of the enum member
    #   populate_by_name:    fields with an alias may also be populated by their field name
    #   validate_assignment: assigning to a field on an instance is validated as well
    model_config = {
        "use_enum_values": True,
        "populate_by_name": True,
        "validate_assignment": True,
    }

    # pydantic treats attributes with a leading underscore as private attributes, which are
    # not fields and never show up in model_dump(). the field is therefore named without the
    # underscore and only carries "_schema_version" as its alias, so that the payload dumped
    # with by_alias=True contains the key from the JSON representation above.
    schema_version: str = Field(default="1.1.0", alias="_schema_version", description="version of the artifact schema")
    id: UUID = Field(description="deterministic UUID v5 based on case_id, source_file, and unique key")
    case_id: str = Field(description="case identifier this artifact belongs to")
    searchable: bool = Field(description="searchable or only for timeline context")
    artifact_class: ArtifactClass = Field(description="general class of the artifact")
    source_platform: str = Field(description="source platform from which this artifact was ingested (sleuthkit, chromium SQLite, gecko SQLite, etc.)")
    timestamp: datetime = Field(description="timestamp of the artifact in ISO8601 UTC format - the main timestamp which will be used for searching and timeline ordering")
    actors: list[Actors] = Field(description="of actors associated with this artifact")
    content: Content = Field(description="main content of the artifact which will be embedded for semantic search")
    display_text: str = Field(description="human readable representation of the artifact content for displaying in the UI")
    indexed_entities: list[str] = Field(description="list of entities from the artifact content, might use sparse vectors for hybrid search")
    parent_id: UUID | None = Field(default=None, description="serves for any nested relationships, e.g., file system hierarchies, process trees, etc.")
    chunk_info: ChunkInfo | None = Field(default=None, description="if the artifact content is chunked, this contains information about the chunking")
    context_group: ContextGroup | None = Field(default=None, description="a parameter which provides information for semantically linked artifacts, e.g., email threads, browser sessions, etc.")
    location: Location | None = Field(default=None, description="information about where the artifact was located, e.g., file path, URL, host, etc.")
    source_specific: source_specific_models | None = Field(default=None, description="source-specific metadata for the artifact")
    ingestion: Ingestion = Field(description="metadata about the ingestion process")

    def is_searchable(self) -> bool:
        """
        tell whether this artifact takes part in search.

        :return: True for an artifact of a searchable class that is flagged searchable,
                 False for every artifact of a non-searchable class (whatever its flag says)
        :raises ValueError: if the artifact is of a searchable class but flagged as non-searchable
        """
        # artifact_class holds the plain string value (use_enum_values); the membership test
        # still works because ArtifactClass members are str-based and compare equal to it
        searchable_classes = (
            ArtifactClass.MESSAGE,
            ArtifactClass.BROWSER_EVENT,
            ArtifactClass.EMAIL,
            ArtifactClass.DOCUMENT,
        )
        if self.artifact_class not in searchable_classes:
            return False
        if not self.searchable:
            exception = f"Artifact {self.id} of class {self.artifact_class} is marked as non-searchable, but this class should always be searchable."
            raise ValueError(exception)
        return True

    def submit_to_vector_db(self) -> bool:
        """
        submit to qdrant

        the artifact is dumped in JSON mode with aliases as keys and handed to QdrantInterface.submit.

        :return: True once the payload has been handed over; errors raised by the interface propagate
        """
        payload = self.model_dump(mode="json", by_alias=True)
        QdrantInterface.submit(payload)
        # the signature promises a bool, falling off the end would hand None to the caller
        return True

View File

@@ -0,0 +1,60 @@
#
# Copyright (c) 2025, mstoeck3
# All rights reserved.
#
# This source code is licensed under the BSD-3-Clause license found in the
# LICENSE file in the root directory of this source tree.
#
# from .llm import LLMClient
# from .qdrant import QdrantClient
from datetime import datetime
from enum import Enum
from typing import Annotated, Union
from uuid import UUID
from pydantic import BaseModel, Field
class SemeionSearchObject(BaseModel):
    """
    standard search object structure for the semeion application.
    no fields are declared on the model yet; the intended shape is sketched below.

    example JSON representation:
    {
        "_schema_version": "1.1.0",
        "query_id": "string (uuid-v4)",
        "created_at": "string (ISO8601)",
        "semantic_query": "string",
        "filters": {
            "case_ids": ["string"] | null,
            "artifact_classes": ["string"] | null,
            "source_platforms": ["string"] | null,
            "actor_identifiers": ["string"] | null,
            "indexed_entities_any": ["string"] | null,
            "time_after": "string (ISO8601)" | null,
            "time_before": "string (ISO8601)" | null,
            "context_group_ids": ["string"] | null,
            "hosts": ["string"] | null
        },
        "options": {
            "limit": "int",
            "min_score": "float" | null,
            "use_hybrid": "bool"
        },
        "interpretation": {
            "original_query": "string",
            "notes": ["string"],
            "confidence": "float"
        }
    }
    """

View File

@@ -0,0 +1,21 @@
#
# Copyright (c) 2025, mstoeck3
# All rights reserved.
#
# This source code is licensed under the BSD-3-Clause license found in the
# LICENSE file in the root directory of this source tree.
#
from .document_metadata import DocumentMetadata
from .email import EmailMetadata
from .message import MessageMetadata
from .file_event import FileEventMetadata
from .network_event import NetworkEventMetadata
from .process_event import ProcessEventMetadata
from .registry_event import RegistryEventMetadata
from .scheduled_task import ScheduledTaskMetadata
from .system_event import SystemEventMetadata
from .authentication_event import AuthenticationEventMetadata
from .browser_event import BrowserEventMetadata
# public names of this package: one metadata model per artifact class
__all__ = [
    "DocumentMetadata",
    "EmailMetadata",
    "MessageMetadata",
    "FileEventMetadata",
    "NetworkEventMetadata",
    "ProcessEventMetadata",
    "RegistryEventMetadata",
    "ScheduledTaskMetadata",
    "SystemEventMetadata",
    "AuthenticationEventMetadata",
    "BrowserEventMetadata",
]

View File

@@ -0,0 +1,15 @@
#
# Copyright (c) 2025, mstoeck3
# All rights reserved.
#
# This source code is licensed under the BSD-3-Clause license found in the
# LICENSE file in the root directory of this source tree.
#
from pydantic import BaseModel
class AuthenticationEventMetadata(BaseModel):
    """
    source-specific metadata for authentication events, e.g. ssh logins, user logins, etc.
    placeholder: no fields are declared yet.
    """

View File

@@ -0,0 +1,15 @@
#
# Copyright (c) 2025, mstoeck3
# All rights reserved.
#
# This source code is licensed under the BSD-3-Clause license found in the
# LICENSE file in the root directory of this source tree.
#
from pydantic import BaseModel
class BrowserEventMetadata(BaseModel):
    """
    source-specific metadata for browser events, e.g. history, bookmarks, downloads, etc.
    placeholder: no fields are declared yet.
    """

View File

@@ -0,0 +1,15 @@
#
# Copyright (c) 2025, mstoeck3
# All rights reserved.
#
# This source code is licensed under the BSD-3-Clause license found in the
# LICENSE file in the root directory of this source tree.
#
from pydantic import BaseModel
class DocumentMetadata(BaseModel):
    """
    source-specific metadata for documents, e.g. PDFs, Word files, etc.
    placeholder: no fields are declared yet.
    """

View File

@@ -0,0 +1,15 @@
#
# Copyright (c) 2025, mstoeck3
# All rights reserved.
#
# This source code is licensed under the BSD-3-Clause license found in the
# LICENSE file in the root directory of this source tree.
#
from pydantic import BaseModel
class EmailMetadata(BaseModel):
    """
    source-specific metadata for emails.
    placeholder: no fields are declared yet.
    """

View File

@@ -0,0 +1,15 @@
#
# Copyright (c) 2025, mstoeck3
# All rights reserved.
#
# This source code is licensed under the BSD-3-Clause license found in the
# LICENSE file in the root directory of this source tree.
#
from pydantic import BaseModel
class FileEventMetadata(BaseModel):
    """
    source-specific metadata for file events, e.g. file creation, modification, deletion, filetype, etc.
    placeholder: no fields are declared yet.
    """

View File

@@ -0,0 +1,15 @@
#
# Copyright (c) 2025, mstoeck3
# All rights reserved.
#
# This source code is licensed under the BSD-3-Clause license found in the
# LICENSE file in the root directory of this source tree.
#
from pydantic import BaseModel
class MessageMetadata(BaseModel):
    """
    source-specific metadata for messages (emails, chats, etc.)
    placeholder: no fields are declared yet.
    """

View File

@@ -0,0 +1,15 @@
#
# Copyright (c) 2025, mstoeck3
# All rights reserved.
#
# This source code is licensed under the BSD-3-Clause license found in the
# LICENSE file in the root directory of this source tree.
#
from pydantic import BaseModel
class NetworkEventMetadata(BaseModel):
    """
    source-specific metadata for network events, e.g. connections, data transfers, etc.
    placeholder: no fields are declared yet.
    """

View File

@@ -0,0 +1,15 @@
#
# Copyright (c) 2025, mstoeck3
# All rights reserved.
#
# This source code is licensed under the BSD-3-Clause license found in the
# LICENSE file in the root directory of this source tree.
#
from pydantic import BaseModel
class ProcessEventMetadata(BaseModel):
    """
    source-specific metadata for process events, e.g. process start, stop, fork, etc.
    placeholder: no fields are declared yet.
    """

View File

@@ -0,0 +1,15 @@
#
# Copyright (c) 2025, mstoeck3
# All rights reserved.
#
# This source code is licensed under the BSD-3-Clause license found in the
# LICENSE file in the root directory of this source tree.
#
from pydantic import BaseModel
class RegistryEventMetadata(BaseModel):
    """
    source-specific metadata for registry events, e.g. key creation, modification, deletion, etc.
    placeholder: no fields are declared yet.
    """

View File

@@ -0,0 +1,15 @@
#
# Copyright (c) 2025, mstoeck3
# All rights reserved.
#
# This source code is licensed under the BSD-3-Clause license found in the
# LICENSE file in the root directory of this source tree.
#
from pydantic import BaseModel
class ScheduledTaskMetadata(BaseModel):
    """
    source-specific metadata for scheduled tasks, e.g. cron jobs, automated scripts, etc.
    placeholder: no fields are declared yet.
    """

View File

@@ -0,0 +1,15 @@
#
# Copyright (c) 2025, mstoeck3
# All rights reserved.
#
# This source code is licensed under the BSD-3-Clause license found in the
# LICENSE file in the root directory of this source tree.
#
from pydantic import BaseModel
class SystemEventMetadata(BaseModel):
    """
    source-specific metadata for system events, e.g. system start, shutdown, errors, etc.
    placeholder: no fields are declared yet.
    """