persistent multithreading
This commit is contained in:
@@ -44,4 +44,4 @@ class LLMInterface():
|
||||
return response
|
||||
except Exception as e:
|
||||
print(f"CLNT:LLMCLIENT ERR: {e}")
|
||||
return Exception
|
||||
raise
|
||||
@@ -6,7 +6,7 @@
|
||||
# LICENSE file in the root directory of this source tree.
|
||||
#
|
||||
|
||||
from semeion.services.llm_service import LLMQueryService
|
||||
from semeion.services.qdrant_service import QDrantService
|
||||
from semeion.services.llm_service import LLMService
|
||||
from semeion.services.qdrant_service import QdrantService
|
||||
|
||||
__all__ = ["LLMQueryService", "QDrantService"]
|
||||
__all__ = ["LLMService", "QdrantService"]
|
||||
@@ -9,66 +9,68 @@
|
||||
from semeion.interfaces.llm.llm_client import LLMInterface
|
||||
from PySide6.QtCore import QObject, Signal, Slot, QThread
|
||||
|
||||
class LLMQueryWorker(QThread):
|
||||
target = Signal(object)
|
||||
|
||||
class LLMWorker(QObject): # initializes once and is queried repeatedly
|
||||
result = Signal(object)
|
||||
error = Signal(str)
|
||||
status = Signal(str)
|
||||
|
||||
def __init__(self, prompt: str):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.prompt = prompt
|
||||
self.stop_flag = False
|
||||
self.client = None
|
||||
|
||||
@Slot()
|
||||
def simple_query(self):
|
||||
try:
|
||||
self.status.emit("SRVC:LLM: TASK SIMPLE_QUERY STARTED")
|
||||
interface = LLMInterface()
|
||||
response = interface.simple_query(self.prompt)
|
||||
if self.stop_flag:
|
||||
self.status.emit("SRVC:LLM: TASK SIMPLE_QUERY STOPPED")
|
||||
return
|
||||
def initialize(self):
|
||||
self.client = LLMInterface()
|
||||
self.status.emit("LLM: ready")
|
||||
|
||||
self.status.emit("SRVC:LLM: TASK SIMPLE_QUERY FINISHED")
|
||||
self.target.emit(response)
|
||||
@Slot(str)
|
||||
def run_query(self, prompt: str):
|
||||
if self.client is None:
|
||||
self.error.emit("LLM client not initialized")
|
||||
return
|
||||
try:
|
||||
self.status.emit("LLM: started")
|
||||
raw_response = self.client.simple_query(prompt)
|
||||
response = raw_response.choices[0].message.content or ""
|
||||
self.status.emit("LLM: finished")
|
||||
self.result.emit(response)
|
||||
except Exception as e:
|
||||
self.error.emit(str(e))
|
||||
|
||||
@Slot()
|
||||
def stop(self):
|
||||
self.stop_flag = True
|
||||
self.status.emit("SRVC:LLM: TASK STOP REQUESTED")
|
||||
|
||||
class LLMQueryService(QObject):
|
||||
class LLMService(QObject):
|
||||
"""persistent service"""
|
||||
query_finished = Signal(object)
|
||||
query_error = Signal(str)
|
||||
query_status = Signal(str)
|
||||
|
||||
# signal to send to active worker
|
||||
_do_query = Signal(str)
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
print("SRVC:LLMQUERYSERVICE INIT")
|
||||
self.worker = None
|
||||
self.new_thread = None
|
||||
self._thread = QThread()
|
||||
self.worker = LLMWorker()
|
||||
|
||||
@Slot(str)
|
||||
def query_llm_simple(self, prompt: str):
|
||||
self.new_thread = QThread()
|
||||
print("SRVC:LLMQUERYSERVICE QUERY_LLM_SIMPLE - NEW THREAD")
|
||||
self.worker = LLMQueryWorker(prompt)
|
||||
print("SRVC:LLMQUERYSERVICE QUERY_LLM_SIMPLE - WORKER INIT")
|
||||
self.worker.moveToThread(self._thread)
|
||||
|
||||
self.worker.moveToThread(self.new_thread)
|
||||
self.new_thread.started.connect(self.worker.simple_query)
|
||||
print("SRVC:LLMQUERYSERVICE QUERY_LLM_SIMPLE - WORKER STARED AND CONNECTED")
|
||||
self._thread.started.connect(self.worker.initialize)
|
||||
|
||||
# send to GUI
|
||||
self.worker.target.connect(self.query_finished.emit)
|
||||
self._do_query.connect(self.worker.run_query)
|
||||
|
||||
self.worker.result.connect(self.query_finished.emit)
|
||||
self.worker.error.connect(self.query_error.emit)
|
||||
self.worker.status.connect(self.query_status.emit)
|
||||
|
||||
# cleanup
|
||||
self.worker.target.connect(self.new_thread.quit)
|
||||
self.worker.error.connect(self.new_thread.quit)
|
||||
self.worker.status.connect(self.new_thread.quit)
|
||||
self.new_thread.finished.connect(self.worker.deleteLater)
|
||||
self.new_thread.finished.connect(self.new_thread.deleteLater)
|
||||
self._thread.start()
|
||||
|
||||
@Slot(str)
|
||||
def query(self, prompt: str):
|
||||
self._do_query.emit(prompt) # start query via signal
|
||||
|
||||
def stop(self):
|
||||
self._thread.quit()
|
||||
self._thread.wait()
|
||||
self.worker.deleteLater()
|
||||
self._thread.deleteLater()
|
||||
@@ -10,9 +10,58 @@ from semeion.interfaces.qdrant.qdrant_client import QdrantInterface
|
||||
from PySide6.QtCore import QObject, Signal, Slot, QThread
|
||||
|
||||
class QdrantWorker(QObject):
|
||||
collections_discovered = Signal(object)
|
||||
result = Signal(object)
|
||||
error = Signal(str)
|
||||
status = Signal(str)
|
||||
|
||||
def __init__(self, qdrant_client: QdrantInterface):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.qdrant_client = qdrant_client
|
||||
self.client = None
|
||||
|
||||
@Slot()
|
||||
def initialize(self):
|
||||
self.client = QdrantInterface()
|
||||
self.status.emit("Qdrant: ready")
|
||||
|
||||
@Slot()
|
||||
def get_collections(self):
|
||||
try:
|
||||
if self.client is None:
|
||||
self.error.emit("Qdrant: Failed to establish connection to QDrant")
|
||||
return
|
||||
self.status.emit("Qdrant: fetching collections")
|
||||
collections = self.client.get_collections()
|
||||
self.status.emit("Qdrant: Successfully fetched collections")
|
||||
self.result.emit(collections)
|
||||
except Exception as e:
|
||||
self.error.emit(str(e))
|
||||
|
||||
|
||||
# Service = manages Worker + QThread, UI connects to this
|
||||
class QdrantService(QObject):
|
||||
query_finished = Signal(object)
|
||||
query_error = Signal(str)
|
||||
query_status = Signal(str)
|
||||
|
||||
_do_discover = Signal()
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self._thread = QThread()
|
||||
self.worker =QdrantWorker()
|
||||
|
||||
self.worker.moveToThread(self._thread)
|
||||
|
||||
self._thread.started.connect(self.worker.initialize)
|
||||
|
||||
self._do_discover.connect(self.worker.get_collections)
|
||||
|
||||
self.worker.result.connect(self.query_finished.emit)
|
||||
self.worker.error.connect(self.query_error.emit)
|
||||
self.worker.status.connect(self.query_status.emit)
|
||||
|
||||
self._thread.start()
|
||||
|
||||
@Slot()
|
||||
def discover_collections(self):
|
||||
self._do_discover.emit()
|
||||
@@ -9,7 +9,7 @@ from PySide6.QtWidgets import QWidget, QVBoxLayout, QLabel, QPushButton, QLineEd
|
||||
from PySide6.QtGui import QAction
|
||||
from PySide6.QtCore import Qt, Signal
|
||||
from . import styling
|
||||
from semeion.services import LLMQueryService, QDrantService
|
||||
from semeion.services import LLMService, QdrantService
|
||||
|
||||
|
||||
class MainWindow(QWidget):
|
||||
@@ -58,6 +58,7 @@ class MainWindow(QWidget):
|
||||
self.searchInput.setFixedWidth(400)
|
||||
self.searchInput.setMinimumHeight(30)
|
||||
self.searchInput.setStyleSheet(styling.input_style)
|
||||
self.searchInput.returnPressed.connect(self.execute_from_input)
|
||||
self.main_layout.addWidget(self.searchInput, alignment=Qt.AlignmentFlag.AlignCenter)
|
||||
self.main_layout.addSpacing(10)
|
||||
|
||||
@@ -70,20 +71,28 @@ class MainWindow(QWidget):
|
||||
self.main_layout.addStretch(1)
|
||||
|
||||
def create_services(self):
|
||||
self.llm_service = LLMQueryService()
|
||||
self.qdrant_service = QDrantService()
|
||||
self.llm_service = LLMService()
|
||||
self.qdrant_service = QdrantService()
|
||||
|
||||
print("MAIN: SETTING UP LLM SERVICE")
|
||||
self.main_exec_start.connect(self.llm_service.query_llm_simple)
|
||||
# Button -> executes LLM Service
|
||||
self.main_exec_start.connect(self.llm_service.query)
|
||||
# response handling
|
||||
self.llm_service.query_finished.connect(self.handle_llm_response)
|
||||
|
||||
# connect qdrant signals
|
||||
self.qdrant_service.query_finished.connect(self.handle_qdrant_response)
|
||||
# Qdrant: execute collection discovery on startup
|
||||
self.qdrant_service.discover_collections()
|
||||
|
||||
def execute_from_input(self) -> str|None:
|
||||
query = self.searchInput.text().strip()
|
||||
if not query:
|
||||
return
|
||||
# TODO: implement logic
|
||||
self.main_exec_start.emit(query)
|
||||
print("MAIN: LLM QUERY SUBMIT:", query)
|
||||
|
||||
def handle_llm_response(self, response):
|
||||
print("MAIN: LLM QUERY RECEIVED:", response)
|
||||
print("MAIN: LLM RESPONSE:", response)
|
||||
|
||||
def handle_qdrant_response(self, response):
|
||||
print("MAIN: QDRANT COLLECTIONS:", response)
|
||||
Reference in New Issue
Block a user