Budujemy „mózg” aplikacji
W Lab 4 zbudowaliśmy wyszukiwarkę semantyczną. Potrafimy znaleźć fragmenty tekstu pasujące do zapytania. Ale użytkownik nie chce listy fragmentów – użytkownik chce odpowiedzi.
Dzisiaj zamienimy naszą wyszukiwarkę w system RAG (Retrieval-Augumented Generation). Ale nie byle jaki. Zbudujemy pipeline trochę bardziej zaawansowany, wyposażony w Hybrid Search i Reranking. To dokładnie taka architektura, jakiej używa się w komercyjnych systemach produkcyjnych.
🎯 Cel na dziś
Złożymy kompletny potok przetwarzania danych:
- Hybrid Retrieval: Połączymy precyzję słów kluczowych (BM25) ze zrozumieniem kontekstu (Embeddings).
- Reranking: Użyjemy modelu Cross-Encoder, aby posortować wyniki lepiej niż robi to zwykła baza wektorowa.
- Generator (LLM): Nauczymy model odpowiadać wyłącznie na podstawie dostarczonych źródeł i cytować je (np.
[1]).
⚠️Uwaga: Reranking nie jest wymagany w ramach projektu zaliczeniowego.
🛠️ Przygotowanie
W tym laboratorium używasz tego samego kodu co w poprzednich laboratoriach (zajrzyj do Lab 1, Lab 2, Lab 3 i Lab 4). Uzupełnijmy go o kilka przydatnych w projekcie funkcji.
pip install pypdf
import glob, pandas as pd
from pypdf import PdfReader
def load_pdf(path):
rows=[]; r=PdfReader(path)
for i,p in enumerate(r.pages, start=1):
try: txt=p.extract_text() or ""
except Exception: txt=""
rows.append({"source":os.path.basename(path), "page":i, "text":txt})
return rows
def load_txt(path):
with open(path,"r",encoding="utf-8",errors="ignore") as f:
txt=f.read()
return [{"source":os.path.basename(path), "page":1, "text":txt}]
def load_md(path): return load_txt(path)
def load_corpus(data_dir="data"):
os.makedirs(data_dir, exist_ok=True)
rows=[]
for fp in glob.glob(os.path.join(data_dir,"*")):
l=fp.lower()
if l.endswith(".pdf"): rows+=load_pdf(fp)
elif l.endswith(".txt"): rows+=load_txt(fp)
elif l.endswith(".md"): rows+=load_md(fp)
if not rows:
rows=[{"source":"demo.md","page":1,"text":"RAG łączy retrieval z generacją."},
{"source":"demo.md","page":2,"text":"Embeddingi to wektory semantyczne; podobieństwo kosinusowe."}]
return pd.DataFrame(rows)
def make_chunks(df, chunk_chars=800, overlap=120):
rows=[]
for _,r in df.iterrows():
for k,(a,b,txt) in enumerate(simple_chunk(r["text"],chunk_chars,overlap)):
if txt.strip():
rows.append({"source":r["source"],"page":r["page"],"chunk_id":k,"start":a,"end":b,"chunk":txt.strip()})
return pd.DataFrame(rows)
docs_df=load_corpus('./data')
df=make_chunks(docs_df, 800, 120)
print("chunks:", len(chunks_df))
# Zaktualizujmy również naszą bazę wektorową:
embs=embed_texts(df["chunk"].tolist())
index=faiss.IndexFlatIP(embs.shape[1]); index.add(embs)
def retrieve_dense(query, k=5):
qv=embed_texts([query], batch_size=1)
scores, idxs=index.search(qv, k)
return [(float(scores[0][i]), df.iloc[idxs[0][i]].to_dict()) for i in range(k)]
Jeżeli w katalogu data nie znajdują się żadne pliki, załadowane zostaną domyślne, mock-owe wartości.
Architektura: lejek RAG
Nasze rozwiązanie przypomina lejek. Na każdym etapie zmniejszamy liczbę dokumentów, ale zwiększamy precyzję.
- Retriever (szybki, szeroki): Przeszukuje tysiące dokumentów w milisekundach. Używamy tu FAISS (wektory) i BM25 (słowa kluczowe). Pobieramy np. Top-50 wyników.
- Reranker (wolny, precyzyjny): „Sędzia”, który dokładnie czyta każdą parę Pytanie-Dokument i ocenia, czy faktycznie do siebie pasują. Zostawia nam Top-5 „perełek”.
- Generator (LLM): Dostaje te 5 fragmentów i pisze odpowiedź końcową.
Krok 1: Hybryda jest lepsza (RRF)
Dlaczego samo wyszukiwanie wektorowe to za mało? Bo wektory czasem gubią szczegóły (np. numery seryjne, rzadkie nazwiska). Z kolei BM25 nie rozumie synonimów. Rozwiązaniem jest RRF (Reciprocal Rank Fusion). To algorytm, który bierze ranking z FAISS i ranking z BM25, a następnie skleja je w jeden, sprawiedliwy wynik.
def rrf_fuse(dense_results, sparse_results, k=60):
scores = {}
# Dla każdego wyniku dodajemy punkty odwrotnie proporcjonalne do jego pozycji
for rank, (score, doc_id) in enumerate(dense_results):
scores[doc_id] += 1.0 / (k + rank)
for rank, (score, doc_id) in enumerate(sparse_results):
scores[doc_id] += 1.0 / (k + rank)
return sorted_by_score(scores)
Krok 2: Pakowanie kontekstu i cytowania
To jest moment, w którym „doklejamy” wiedzę do promptu. Musimy być sprytni:
- Limity: Nie możemy wrzucić 100 stron PDF-a, bo przekroczymy limit tokenów modelu.
- Różnorodność: Jeśli Top-5 wyników pochodzi z tej samej strony jednego dokumentu, nasza odpowiedź będzie płaska. Warto więc wprowadzić limit
max_per_source. - Cytowania: Model ma zakaz wymyślania. Mysi wykazywać źródła.
Takie podejście nazywamy Grounding (ugruntowaniem). Jeśli model nie znajdzie odpowiedzi w dostarczonych fragmentach, ma się przyznać do niewiedzy, zamiast halucynować.
Na potrzeby laboratorium załóżmy, że już mamy posortowane treści, uzyskane z obu retrieverów. Na potrzeby symulacji zachowania naszej aplikacji skorzystam z funkcji retrieve_dense w miejsce rrf_fuse.
SYSTEM_RULES=(
"You are a factual assistant. Answer ONLY using the provided context snippets. "
"If missing, reply 'Nie wiem – brak informacji w źródłach.' Include citations like [1],[2].")
def pack_context(hits, max_per_source=2, max_chars=2000):
per={}; ordered=[]
for _,rec in hits:
key=(rec["source"], rec["page"])
per.setdefault(key,0)
if per[key]<max_per_source:
ordered.append(rec); per[key]+=1
cites=[]; parts=[]
for i,rec in enumerate(ordered, start=1):
cites.append({"n":i,"source":rec["source"],"page":rec["page"],"chunk_id":rec["chunk_id"]})
parts.append(f"[{i}] {rec['chunk']}")
ctx="\n\n".join(parts)
return (ctx[:max_chars], cites)
def answer_with_api(question, hits):
ctx, cites=pack_context(hits)
prompt="Question: " + question + "\n\nContext:\n" + ctx + "\n\nAnswer in Polish with citations [n]."
return chat_once(prompt, system=SYSTEM_RULES, max_output_tokens=256, temperature=0.0), cites
print(answer_with_api('Jakie ery wyróżniamy?', retrieve_dense('RAG', k=4)))
print(answer_with_api('Co to jest RAG?', retrieve_dense('RAG', k=4)))
Krok 3: Reranking – jakość ponad wszystko
Zazwyczaj zanim przygotujemy kontekst do modelu językowego wykonujemy reranking. Baza wektorowa liczy podobieństwo „matematyczne” (iloczyn skalarny). Model Cross-Encoder działa inaczej: widzi pełne pytanie i pełną odpowiedź jednocześnie. Jest znacznie mądrzejszy, ale też wolniejszy, dlatego używamy go tylko do posortowania garstki kandydatów wybranych przez Retrievera.
W naszym laboratorium używamy modelu ms-marco-MiniLM-L6-v2. Jeśli nie uda załadować się modelu, kod automatycznie przełączy się na fallback (standardowe sortowanie wektorowe).
# Przykład Retrieval + Reranking
from typing import List, Tuple, Any
DOCS = [
{'id':'d1','text':'Embeddings are vector representations of text used for semantic search.'},
{'id':'d2','text':'BM25 is a bag-of-words retrieval algorithm based on term frequency.'},
{'id':'d3','text':'RAG combines retrieval and generation to ground LLM outputs.'},
{'id':'d4','text':'Cross-encoders score query+doc pairs with a deeper transformer for reranking.'},
{'id':'d5','text':'Embedding models like all-MiniLM produce compact vectors.'},
]
def tok(x): return re.findall(r"[a-ząćęłńóśźż0-9]+", x.lower())
texts = [d['text'] for d in DOCS]
embs = embedder.encode(texts, convert_to_numpy=True, normalize_embeddings=True).astype('float32')
idx = faiss.IndexFlatIP(embs.shape[1]); idx.add(embs)
bm25_corpus = [tok(t) for t in texts]
bm25 = BM25Okapi(bm25_corpus)
def retrieve_dense(query: str, k: int=5):
qv = embedder.encode([query], convert_to_numpy=True, normalize_embeddings=True).astype('float32')
scores, ids = idx.search(qv, k)
return [(float(scores[0][i]), DOCS[ids[0][i]]) for i in range(min(k, len(ids[0])))]
# Cross-encoder (optional) - heavier; we will try import and fall back to semantic rerank
USE_CROSS_ENCODER = False
try:
from sentence_transformers import CrossEncoder
CROSS_ENCODER = CrossEncoder('cross-encoder/ms-marco-MiniLM-L6-v2')
USE_CROSS_ENCODER = True
except Exception:
CROSS_ENCODER = None
USE_CROSS_ENCODER = False
def rerank(query: str, candidates: List[Tuple[float, Dict[str, Any]]]) -> List[Tuple[float, Dict[str, Any]]]:
if USE_CROSS_ENCODER and CROSS_ENCODER is not None:
pairs = [(query, c[1]['text']) for c in candidates]
scores = CROSS_ENCODER.predict(pairs)
scored = [(float(s), c[1]) for s, c in zip(scores, candidates)]
scored.sort(key=lambda x: x[0], reverse=True)
return scored
# fallback: compute dot product between query embedding and chunk embedding (we have embeddings for docs in embs)
qv = embedder.encode([query], convert_to_numpy=True, normalize_embeddings=True).astype('float32')[0]
out = []
for score, doc in candidates:
# find doc index
idx_doc = next((i for i, d in enumerate(DOCS) if d['id'] == doc['id']), None)
if idx_doc is None:
s = score
else:
s = float(np.dot(qv, embs[idx_doc]))
out.append((s, doc))
out.sort(key=lambda x: x[0], reverse=True)
return out
query = "what are embeddings used for?"
print('=== BM25 top-3 ===')
bm = bm25.get_scores(tok(query)); import numpy as np
bm_idx = np.argsort(bm)[::-1][:3]
for i in bm_idx:
print(texts[i])
print('\n=== Dense top-3 ===')
cand = retrieve_dense(query, k=3)
for s,d in cand:
print(s, d['text'])
print('\n=== Reranked ===')
rr = rerank(query, cand)
for s,d in rr:
print(s, d['text'])
