一文简介 Bert 模型、向量表示、向量搜索、向量库、API 构建、API 接口、并行计算、RAG、Next.js、React.js、LangChain

郝鸿涛 / 2025-03-16

本文成品,一个简易 RAG 应用,网址为 https://htmovies.vercel.app/,检索需梯子

假设我们现在有一万个电影的文字描述,如何根据用户的搜索,推荐给用户最相关的几个电影?比如,用户输入「NASA 努力救一个困在火星的宇航员」,那我们肯定会导出《火星救援》。如何实现呢?

办法是,假设我们有一个神奇的工具,你可以把它想象成一张网。任何一串文字通过它之后,都会变成一个高维空间里的一个向量,也就是高维空间里的一个坐标。然后我们让这一万条电影的文字描述经过这张网,我们得到一万个坐标。用户输入搜索,我们让这个搜索也经过这张网,得到一个坐标。最后的结果,我们计算这个搜索对应的坐标与每一个电影坐标的余弦相似度 (Cosine Similarity),然后找到结果最大的几个,就是我们的搜索结果。

这里涉及到一个问题。如果我们把每串文字经过网后的结果看成是一个坐标,那么寻找相似的点,可以用欧几里得距离 。如果我们把每个结果看成是一个向量,那么需要用余弦相似度。那具体用什么方法呢?OpenAI Platform 给出的答案是:

We recommend cosine similarity. The choice of distance function typically doesn't matter much.

OpenAI embeddings are normalized to length 1, which means that:
    - Cosine similarity can be computed slightly faster using just a dot product
    - Cosine similarity and Euclidean distance will result in the identical rankings

也就是说,先把每一个点的坐标归一化,也就是确保每一个向量长度为 1,然后,不管我们用欧几里得距离还是余弦相似度,不影响搜索结果。为什么?我们来证明一下:

归一化之后,余弦相似度为:

$$\text{Cosine Similarity(A, B)} = \frac{\vec{A} \cdot \vec{B}}{|A||B|} = \sum_{i=1}^{n}a_i b_i$$

$$\begin{aligned} \text{Euclidean Distance(A, B)}^2 & = (a_1 - b_1)^2 + (a_2 - b_2)^2 + ... + (a_n - b_n)^2 \\ & = (a_1 + a_2 + ... a_n)^2 + (b_1 + b_2 + ... b_n)^2 - 2(a_1 b_1 + a_2 b_2 + ... + a_n b_n)\\ & = |A|^2 + |B|^2 - 2\sum_{i=1}^{n}a_i b_i \\ & = 2 - 2 \cdot \text{Cosine Similarity(A, B)} \end{aligned}$$

也就是说这两个是完全正相关的关系,余弦相似度越大,欧几里得距离也越大。所以两者虽然具体数值不同,但是排序的结果一样。那为什么 OpenAI 推荐用余弦相似度呢?因为它的运算更少。我们来看:

$$\text{Cosine Similarity(A, B)} = \sum_{i=1}^{n}a_i b_i$$

这里我们只需要 $n$ 次乘法和 $n-1$ 次加法。

$$\text{Euclidean Distance(A, B)} = \sqrt{\sum_{i=1}^n (a_i - b_i)^2}$$

我们需要:

虽然时间复杂度都是 $O(n)$,但欧几里得距离明显比余弦相似度运算量要大。

回到正题,这张神奇的网是「向量表示 (Embedding)」。比较知名的是 Bert,它比较大,我们这里用 Sentence Transformers。更神奇的是,如果我们用多语言的模型,比如 paraphrase-multilingual-mpnet-base-v2,那即使我们的训练数据是英文的,我们也可以用中文搜索。不过,多语言模型也比较大,我们这里不用。我们用比较小的 all-MiniLM-L6-v2

另外,我们用 FAISS (Facebook AI Similarity Search) 计算余弦相似度,因为这比较快。

关于 Embedding Models,请参考 Ollama 的这篇教程

原始数据 #

数据下载地址:https://hongtaoh.com/files/tmdb-movies.csv

最原始的数据来自于 Kaggle 上的 Full TMDB Movies Dataset 2024 (1M Movies) ,包含一百万条电影数据。我检查了一些关键条目,去掉了在这些条目上信息不全的电影,最后得到大约一万条电影数据。

import pandas as pd 
import numpy as np 
import uuid
Show Code
 

# 去掉了关键条目上信息不全的电影,最后得到大约一万条电影数据

# df = pd.read_csv('./data/tmdb_large.csv')
# corecols = ['id', 'title', 'vote_average', 'vote_count', 'release_date',
#        'revenue', 'runtime', 'backdrop_path', 'budget', 'homepage', 'original_language', 'overview',
#        'popularity', 'poster_path', 'tagline', 'genres',
#        'production_companies', 'production_countries', 'spoken_languages',
#        'keywords']
# df_clean = df.dropna(subset=corecols)
# df_clean.to_csv('../static/files/tmdb-movies.csv', index=False)
df = pd.read_csv('../static/files/tmdb-movies.csv')
df.shape
(10575, 24)
df.columns 

Index(['id', 'title', 'vote_average', 'vote_count', 'status', 'release_date', 'revenue', 'runtime', 'adult', 'backdrop_path', 'budget', 'homepage', 'imdb_id', 'original_language', 'original_title', 'overview', 'popularity', 'poster_path', 'tagline', 'genres', 'production_companies', 'production_countries', 'spoken_languages', 'keywords'], dtype='object')
数据很大,我只选取几个关键的列:

df[['id', 'title', 'overview', 'revenue', 'poster_path']].head(5)

idtitleoverviewrevenueposter_path
027205InceptionCobb, a skilled thief who commits corporate es...825532764/oYuLEt3zVCKq57qu2F8dT7NIa6f.jpg
1157336InterstellarThe adventures of a group of explorers who mak...701729206/gEU2QniE6E77NI6lCU6MxlNBvIx.jpg
2155The Dark KnightBatman raises the stakes in his war on crime. ...1004558444/qJ2tW6WMUDux911r6m7haRef0WH.jpg
319995AvatarIn the 22nd century, a paraplegic Marine is di...2923706026/kyeqWdyUXW608qlYkRqosgbbJyK.jpg
424428The AvengersWhen an unexpected enemy emerges and threatens...1518815515/RYMX2wcKCBAr24UyPD7xwmjaTn.jpg
movie_ids = df.id.tolist()
movie_overviews = df.overview.tolist()
# Create dictionaries
Id2Title = dict(zip(df.id, df.original_title))
Id2Overview = dict(zip(df.id, df.overview))
Id2PosterPath = dict(zip(df.id, df.poster_path))
Id2Revenue = dict(zip(df.id, df.revenue))

向量表示 #

from sentence_transformers import SentenceTransformer
from sklearn.preprocessing import normalize
model = SentenceTransformer('all-MiniLM-L6-v2')
from typing import List, Dict, Tuple
import faiss 
def get_movie_embeddings(
    movie_ids:List[str], 
    movie_overviews:List[str], 
    model: SentenceTransformer,
    batch_size: int = 32, 
    ) -> Dict[str, np.ndarray]:
    """
    Generates normalized embeddings for movie overviews.

    Args:
        movie_ids: List of movie IDs.
        movie_overviews: List of movie overviews.
        model: Embedding model (e.g., SentenceTransformer).
        batch_size: Batch size for processing.

    Returns:
        Dictionary mapping movie IDs to normalized embeddings.
    """
    if len(movie_ids) != len(movie_overviews):
        raise ValueError("movie_ids and movie_overviews must have the same length.")

    all_embeddings = []
    for i in range(0, len(movie_ids), batch_size):
        batch_movies = movie_overviews[i:i+batch_size]
        try:
            batch_embeddings = model.encode(batch_movies)
            all_embeddings.extend(batch_embeddings)
        except Exception as e:
            print(f"Error encoding batch {i}: {e}")
            raise
    try:
        all_embeddings_np = np.array(all_embeddings) #added numpy array conversion.
        normalized_embeddings = normalize(all_embeddings_np, axis=1, norm='l2')
    except Exception as e:
        print(f"Error normalizing embeddings: {e}")
        raise
    # Create a hashmap; otherwise we cannot map from embeddings to distinct movies
    # starting from Python 3.7, dictionaries (dict) preserve insertion order by default.
    # Therefore, there is no need to sort
    movie_embeddings = dict(zip(movie_ids, normalized_embeddings))
    return movie_embeddings
movie_embeddings = get_movie_embeddings(movie_ids, movie_overviews, model)

搜索 #

def prepare_faiss_index(
    movie_embeddings: Dict[str, np.ndarray]) -> Tuple[faiss.Index, List[str]]:
    """
    Prepares a FAISS index from movie embeddings.
    
    Args:
        movie_embeddings: Dictionary of movie IDs to normalized embeddings.
    
    Returns:
        Tuple of (FAISS index, ordered list of movie IDs)
    """
    movie_ids = list(movie_embeddings.keys())
    embeddings = np.array([movie_embeddings[mid] for mid in movie_ids])

    #create and populate the index
    dim = embeddings.shape[1]
    index = faiss.IndexFlatIP(dim)
    index.add(embeddings)

    return index, movie_ids

def search_movies(
    faiss_index: faiss.Index,
    movie_ids: List[str],
    id2title_dict: Dict[int, str],
    user_query: str,
    model: SentenceTransformer,
    k: int = 10,
) -> Tuple[List[str], List[float]]:
    """
    Searches movies based on a user query using Faiss and cosine similarity.

    Args:
        faiss_index: Pre-built FAISS index
        movie_ids: Ordered list of movie IDs that corresponds to the index
        id2title_dict: Dict mapping from movie IDs to movie titles
        user_query: The user's search query.
        model: SentenceTransformer model.
        k: Number of results to return.

    Returns:
        Tuple, top K movie titles and their associated cosine similarity scores
    """
    #embed and normalize the user query
    query_embedding = model.encode([user_query])
    normalized_query_embedding = normalize(query_embedding, axis = 1, norm = 'l2')

    #search the index
    similarities, indices = faiss_index.search(normalized_query_embedding, k)

    #retrieve movie ids and similarity scores
    top_k_ids = [movie_ids[i] for i in indices[0]]
    top_k_titles = [id2title_dict[x] for x in top_k_ids]
    top_k_scores = similarities[0]
    return top_k_titles, top_k_scores
faiss_index, movie_ids = prepare_faiss_index(movie_embeddings)
user_query = "NASA tried to rescue an astronaut stranded on Mars."
results = search_movies(
    faiss_index,
    movie_ids,
    Id2Title,
    user_query,
    model
)
results 

(['Infinity 7', '2036 Origin Unknown', 'The Martian', 'Expedition Mars', 'Good Night Oppy', 'Infini', 'The Crystal Crypt', 'Life', 'Stowaway', 'Space Chimps'], array([0.5605497 , 0.5572939 , 0.55047965, 0.5243796 , 0.4685108 , 0.46683478, 0.46465227, 0.4630142 , 0.462315 , 0.45956635], dtype=float32))
结果正确。

走向云端 #

现在的问题是,一个不懂计算机的用户,需要搜索电影,怎么办?他不会下载数据,也不会运行上面的代码。这是大部分用户的现状。解决办法是把我们算好的 movie_embedding 存在云端。

我首先想到的一个办法是把我们的电影原始数据以及我们得到的向量表示 (Embedding) 储存在 MongoDB。然后,在一个 Web App (网页应用程序,你可以理解为一个网站) 上,把这个数据和向量表示下载下来,然后根据用户的搜索,给出最相似的电影推荐。这个办法可行,但是很慢。

更好的办法是使用专门用来做向量搜索的数据库。这些向量库有自己的储存和搜索方法,我们不需要用上面的 FAISS。只需要把所有的 embedding 存入。搜索的时候,给一个向量,这些向量库会进行优化搜素,快速给出结果。

Qdrant #

我先试了一下 Qdrant

免费版暂时够用了。

首先需要运行:

pip install qdrant-client
import os 
# `pip install python-dotenv` first
from dotenv import load_dotenv
load_dotenv()
url = os.getenv("QDRANT_MOVIE")
api_key = os.getenv("QDRANT_API_KEY")

from qdrant_client import QdrantClient
from qdrant_client.models import VectorParams, Distance
import uuid

qdrant_client = QdrantClient(
    url=url,
    api_key=api_key
)
try:
    qdrant_client.delete_collection(collection_name='movies')
    print("Deleted existing collection")
except Exception as e:
    print(f"Collection might not exist yet: {e}")

# Create the collection first
vector_size = len(next(iter(movie_embeddings.values())))
qdrant_client.create_collection(
    collection_name="movies",
    vectors_config=VectorParams(
        size=vector_size,
        distance=Distance.COSINE
    )
)

# Qdrant requires ids to be strs, not int
def int2uuid(i:int):
    return str(uuid.uuid5(uuid.NAMESPACE_DNS, str(i)))

batch_size = 1_000 
for i in range(0, len(movie_ids), batch_size):
    batch_ids = movie_ids[i:i+batch_size]
    batch_vectors = []
    for movie_id in batch_ids:
        batch_vectors.append({
        'id': movie_id,
        'vector': movie_embeddings[movie_id].tolist(),
        'payload': {
            'title': Id2Title.get(movie_id, ''),
            'overview': Id2Overview.get(movie_id, ""),
            'poster_path': Id2PosterPath.get(movie_id, ""),
            'revenue': Id2Revenue.get(movie_id, "")
        }
    })
    qdrant_client.upsert(
        collection_name = 'movies',
        points = batch_vectors
    )
    print(f"Uploaded batch {i//batch_size + 1}/{(len(movie_ids)-1)//batch_size + 1}")

Deleted existing collection Uploaded batch 1/11 Uploaded batch 2/11 Uploaded batch 3/11 Uploaded batch 4/11 Uploaded batch 5/11 Uploaded batch 6/11 Uploaded batch 7/11 Uploaded batch 8/11 Uploaded batch 9/11 Uploaded batch 10/11 Uploaded batch 11/11
这里上传的时候我们为什么用到批量处理?

首先,为什么我们不一个一个上传?

那为什么不一次把一万个上传上去,而是分成 11 个批次?

def search_qdrant(
    query_text:str, model:SentenceTransformer, top_k:int = 5):
    query_embedding = model.encode([query_text])
    normalized_query = normalize(query_embedding, axis = 1, norm='l2')

    results = qdrant_client.query_points(
        collection_name = 'movies',
        query=normalized_query[0].tolist(),
        limit=top_k
    )
    movies = []
    for result in results.points:
        movies.append({
            'title': result.payload['title'],
            'overview': result.payload['overview'],
            'similarity_score': result.score
        })
    return movies 

更多的 Qdrant Search 请参考官方文档:https://qdrant.tech/documentation/concepts/search/

user_query = "NASA tried to rescue an astronaut stranded on Mars."
search_qdrant(query_text=user_query, model=model)
[{'title': 'Infinity 7', 'overview': 'A lone astronaut’s spacecraft malfunctions, threatening to leave him stranded in orbit forever.', 'similarity_score': 0.5605497}, {'title': '2036 Origin Unknown', 'overview': "After the first manned mission to Mars ends in a deadly crash, mission controller Mackenzie 'Mack' Wilson assists an artificial intelligence system, A.R.T.I. Their investigation uncovers a mysterious object under the surface of Mars that could change the future of our planet as we know it.", 'similarity_score': 0.55729395}, {'title': 'The Martian', 'overview': 'During a manned mission to Mars, Astronaut Mark Watney is presumed dead after a fierce storm and left behind by his crew. But Watney has survived and finds himself stranded and alone on the hostile planet. With only meager supplies, he must draw upon his ingenuity, wit and spirit to subsist and find a way to signal to Earth that he is alive.', 'similarity_score': 0.55047965}, {'title': 'Expedition Mars', 'overview': "Expedition Mars brings to life one of the greatest sagas of the Space Age, the epic adventures of Spirit and Opportunity, the rovers that saved NASA's Mars program after a string of failures in the 1990's.", 'similarity_score': 0.5243797}, {'title': 'Good Night Oppy', 'overview': 'The inspirational true story of Opportunity, a rover that was sent to Mars for a 90-day mission but ended up surviving for 15 years. Follow Opportunity’s groundbreaking journey on Mars and the remarkable bond forged between a robot and her humans millions of miles away.', 'similarity_score': 0.4685108}]

Pinecone #

也可以用 Pinecone 。首先,

pip install pinecone

from pinecone import Pinecone, ServerlessSpec

pinecone_api_key = api_key = os.getenv("PINECONE_API_KEY")
pc = Pinecone(api_key=pinecone_api_key)

index_name = "movies"

# Get the list of index names from the dictionaries
existing_indexes = [index["name"] for index in pc.list_indexes()]

if index_name in existing_indexes:
    print(f"Deleting existing index: {index_name}!")
    pc.delete_index(index_name)
else:
    print(f"Index '{index_name}' does not exist.")

vector_size = len(next(iter(movie_embeddings.values())))

pc.create_index(
    name=index_name,
    dimension=vector_size, # Replace with your model dimensions
    metric="cosine", # Replace with your model metric
    spec=ServerlessSpec(
        cloud="aws",
        region="us-east-1"
    ) 
)

# Get the index object
index = pc.Index(index_name)

batch_size = 1_000
for i in range(0, len(movie_ids), batch_size):
    batch_ids = movie_ids[i:i+batch_size]
    batch_vectors = []
    for movie_id in batch_ids:
        batch_vectors.append({
            'id': str(movie_id),
            'values': movie_embeddings[movie_id].tolist(),
            'metadata':{
                'title': Id2Title.get(movie_id, ''),
                'overview': Id2Overview.get(movie_id, ""),
                'poster_path': Id2PosterPath.get(movie_id, ""),
                'revenue': Id2Revenue.get(movie_id, "")
            }
        })
    index.upsert(vectors = batch_vectors)
    print(f"Uploaded batch {i//batch_size + 1}/{(len(movie_ids)-1)//batch_size + 1}")

Deleting existing index: movies! Uploaded batch 1/11 Uploaded batch 2/11 Uploaded batch 3/11 Uploaded batch 4/11 Uploaded batch 5/11 Uploaded batch 6/11 Uploaded batch 7/11 Uploaded batch 8/11 Uploaded batch 9/11 Uploaded batch 10/11 Uploaded batch 11/11
这里我的直观感受是 Qdrant 的上传速度比 Pinecone 快一倍。

def search_pinecone(
    query_text:str, 
    model:SentenceTransformer, 
    top_k:int = 5):
    query_embedding = model.encode([query_text])
    normalized_query = normalize(query_embedding, axis = 1, norm='l2')

    results = index.query(
        vector=normalized_query[0].tolist(),
        top_k=top_k,
        include_metadata=True
    )

    # Format results
    movies = []
    for match in results['matches']:
        movies.append({
            "title": match['metadata']['title'],
            "overview": match['metadata']['overview'],
            "similarity_score": match['score']
        })
    
    return movies
user_query = "NASA tried to rescue an astronaut stranded on Mars."
search_pinecone(query_text=user_query, model=model)

[{'title': 'Infinity 7', 'overview': 'A lone astronaut’s spacecraft malfunctions, threatening to leave him stranded in orbit forever.', 'similarity_score': 0.560549676}, {'title': '2036 Origin Unknown', 'overview': "After the first manned mission to Mars ends in a deadly crash, mission controller Mackenzie 'Mack' Wilson assists an artificial intelligence system, A.R.T.I. Their investigation uncovers a mysterious object under the surface of Mars that could change the future of our planet as we know it.", 'similarity_score': 0.557293892}, {'title': 'The Martian', 'overview': 'During a manned mission to Mars, Astronaut Mark Watney is presumed dead after a fierce storm and left behind by his crew. But Watney has survived and finds himself stranded and alone on the hostile planet. With only meager supplies, he must draw upon his ingenuity, wit and spirit to subsist and find a way to signal to Earth that he is alive.', 'similarity_score': 0.55047965}, {'title': 'Expedition Mars', 'overview': "Expedition Mars brings to life one of the greatest sagas of the Space Age, the epic adventures of Spirit and Opportunity, the rovers that saved NASA's Mars program after a string of failures in the 1990's.", 'similarity_score': 0.524379611}, {'title': 'Good Night Oppy', 'overview': 'The inspirational true story of Opportunity, a rover that was sent to Mars for a 90-day mission but ended up surviving for 15 years. Follow Opportunity’s groundbreaking journey on Mars and the remarkable bond forged between a robot and her humans millions of miles away.', 'similarity_score': 0.468510807}]
结果和上面一样。

其他选择 #

构建 API #

现在的问题是,我们依然是用代码才能做这些事情。而真正的用户大部分是不懂代码的。我们如何让他们直接输入搜索文本,然后给他们返回结果?这就需要用到 API。也就是说,把搜索文本向量化、搜索、返回搜索结果,都在网络上完成。

比如,打开 https://htmovies.vercel.app/api/search?q=%22NASA%22&k=5 。你会看到 JSON 结果。这就是 API 的作用。

我用 FastAPI 做了一个 API。代码如下:

Show Code


from fastapi import FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import List, Optional
from sentence_transformers import SentenceTransformer
from sklearn.preprocessing import normalize
from qdrant_client import QdrantClient
import os
from dotenv import load_dotenv
import numpy as np

# Load environment variables
load_dotenv()

# Initialize FastAPI app
app = FastAPI(
    title="Movie Search API",
    description="Search for movies using semantic similarity",
    version="1.0.0"
)

# Add CORS middleware to allow cross-origin requests (important for web clients)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Allows all origins
    allow_credentials=True,
    allow_methods=["*"],  # Allows all methods
    allow_headers=["*"],  # Allows all headers
)

# Initialize Qdrant client
qdrant_url = os.getenv("QDRANT_MOVIE")
qdrant_api_key = os.getenv("QDRANT_API_KEY")

qdrant_client = QdrantClient(
    url=qdrant_url,
    api_key=qdrant_api_key
)

# Initialize the SentenceTransformer model (load it only once at startup)
model = SentenceTransformer('all-MiniLM-L6-v2')

# Define response model for better documentation and type checking
class MovieSearchResult(BaseModel):
    title: str
    overview: str
    imdb_id: str
    similarity_score: float

class SearchResponse(BaseModel):
    movies: List[MovieSearchResult]
    query: str

@app.get("/")
async def root():
    return {
        "message": "Welcome to Movie Search API!",
        "docs": "/docs",
        "usage": "Send GET requests to /search?query=your search text"
    }

@app.get("/search", response_model=SearchResponse)
async def search_movies(
    query: str = Query(..., description="The search query to find similar movies"),
    top_k: int = Query(5, ge=1, le=50, description="Number of results to return")
):
    try:
        # Encode the query text
        query_embedding = model.encode([query])
        normalized_query = normalize(query_embedding, axis=1, norm='l2')
        
        # Search in Qdrant
        results = qdrant_client.search(
            collection_name='movies',
            query_vector=normalized_query[0].tolist(),
            limit=top_k
        )
        
        # Format the results
        movies = []
        for result in results:
            movies.append(MovieSearchResult(
                title=result.payload['title'],
                overview=result.payload['overview'],
                imdb_id=result.payload['imdb_id'],
                similarity_score=result.score
            ))
        
        return SearchResponse(
            movies=movies,
            query=query
        )
    
    except Exception as e:
        # Log the error (you might want to use a proper logging system)
        print(f"Error during search: {e}")
        raise HTTPException(status_code=500, detail=f"Search failed: {str(e)}")

@app.get("/health")
async def health_check():
    """Health check endpoint to verify the service is running"""
    return {"status": "healthy"}

在本地运行没问题:

本地运行 FastAPI

本地运行 FastAPI

但是部署到 Vercel 没成功:

Vercel 部署 FastAPI 失败

这是因为我们需要的包太多了

fastapi[standard]
sentence_transformers
qdrant_client
python-dotenv==1.0.0
scikit-learn==1.3.2

超过了 Vercel 允许的大小限制。

最好的办法是找一个适合 ML/LLM 项目的部署平台。我试过免费的 render.com 但是太慢了,因为有休眠限制。

第一个解决办法是用封装好的 embedding,比如 DeepSeek OpenAI Gemini 等,这样就不需要 sentence_transformer 这么大的包。按道理就可以部署到 Vercel。

第二个解决办法是使用专业的 API 部署平台,比如 Google Cloud Platform, Azure, AWS 或者小公司,比如 fly.io, railway.app, heroku.com 等。

选择大语言模型 #

具体用哪条路需要结合具体的需求。

如果使用第一种办法,也就是用大公司提供的 embedding 接口,好处是这意味着我都不需要有后端 (backend)。为什么?因为原始数据的存储和向量搜索都在向量库进行,那我用前端直接处理用户搜索文本,然后导出结果,只需要和 embedding 接口交流就好,根本不需要自己部署后端。

第二种办法的好处是,如果是一个大型项目,需要用户注册、登陆,那一般情况下就需要自己部署 API 了。那既然自己都部署了,何不一步到位,用 Python 进行向量搜索?

我这里暂时用第一种办法。理由有两个:1)我想试一下 DeepSeek 的 embedding;2)我暂时不需要用户注册、登陆。

开工!

我首先想到去 DeepSeek Platform 申请 API Key,但是发现 DeepSeek 暂未支持 embedding model

我目前想到的有以下几个:

咱们来试一下 Google 吧。

import google.generativeai as genai
import os

GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")

genai.configure(api_key=GEMINI_API_KEY)
Show Code


#测试embedding
embedding_model = genai.embed_content(model="models/gemini-embedding-exp-03-07", content="test")

print(f"gemini-embedding-exp-03-07 的 output 维度为 {len(embedding_model['embedding'])}")

embedding_model = genai.embed_content(model="models/text-embedding-004", content="Where is France?")

print(f"text-embedding-004 的 output 维度为 {len(embedding_model['embedding'])}")

gemini-embedding-exp-03-07 的 output 维度为 3072 text-embedding-004 的 output 维度为 768
虽然 gemini-embedding-exp-03-07 效能好,但是它太大了啊,弃用之。使用 text-embedding-004 模型。

另外,我好奇 text-embedding-004 的结果是否已经归一化:

norm = np.linalg.norm(embedding_model['embedding'])

print(f"L2 范数:{norm}")
if np.isclose(norm, 1):
    print("向量已进行 L2 归一化")
else:
    print("向量未进行 L2 归一化")

L2 范数:0.9999994075498208 向量已进行 L2 归一化
既然它已经进行归一化,我们就不用操作了,直接用它的结果就好。

def get_gemini_embeddings(texts:List[str]) -> np.ndarray:
    embeddings = []
    for text in texts:
        response = genai.embed_content(
            model="models/text-embedding-004", 
            content=text)
        embeddings.append(response['embedding'])
    return np.array(embeddings)

然后我们直接用上面的代码,只需要加入 get_gemini_embeddings 即可:

try:
    qdrant_client.delete_collection(collection_name='movies_genai')
    print("Deleted existing collection")
except Exception as e:
    print(f"Collection might not exist yet: {e}")

# Create the collection first
embeddings = get_gemini_embeddings([movie_overviews[0]])
vector_size = len(embeddings[0])
qdrant_client.create_collection(
    collection_name="movies_genai",
    vectors_config=VectorParams(
        size=vector_size,
        distance=Distance.COSINE
    )
)

Deleted existing collection True
我一开始的代码在这里:

Show Code
 

# batch_size = 100 
# total_batches = (len(movie_ids) - 1)//batch_size + 1
# for i in tqdm(
#     range(0, len(movie_ids), batch_size), 
#     total=total_batches,
#     desc="Uploading batches"
# ):
#     batch_ids = movie_ids[i:i+batch_size]
#     batch_overviews = movie_overviews[i:i + batch_size]
#     try:
#         batch_embeddings = get_gemini_embeddings(batch_overviews)

#         points = []
#         for j, movie_id in enumerate(batch_ids):
#             # Generate a UUID for each movie ID
#             point_id = str(uuid.uuid4()) 
#             points.append({
#                 'id': point_id,
#                 'vector': batch_embeddings[j].tolist(),
#                 'payload': {
#                     'title': Id2Title.get(movie_id, ''),
#                     'overview': Id2Overview.get(movie_id, ""),
#                     'imdb_id': movie_id,
#                 }
#             })
#         qdrant_client.upsert(
#             collection_name = 'movies_gemini',
#             points = points
#         )
#         time.sleep(1)
#     except Exception as e:
#         print(f"Error uploading batch {i //batch_size + 1}: {e}")

问题是太慢了。下面我们用到多线程并行计算。具体来说,是 Task parallelism。教程可以参考我写的 A Super Brief Introduction to Parallel Computing

需要注意的是,谷歌的限制是每分钟 1500 个请求。不要超过。

from concurrent.futures import ThreadPoolExecutor
import threading 
from tqdm import tqdm 

def process_one_batch(batch_ids, batch_overviews):
    try:
        batch_embeddings = get_gemini_embeddings(batch_overviews)
        points = []
        for j, movie_id in enumerate(batch_ids):
            points.append({
                'id': movie_id,
                'vector': batch_embeddings[j].tolist(),
                'payload': {
                    'title': Id2Title.get(movie_id, ''),
                    'overview': Id2Overview.get(movie_id, ""),
                    'poster_path': Id2PosterPath.get(movie_id, ""),
                    'revenue': Id2Revenue.get(movie_id, "")
                }
            })
        qdrant_client.upsert(
            collection_name = 'movies_genai',
            points = points
        )
        return True 
    except Exception as e:
        print(f"Error uploading batch: {e}")
        return False 
# #|code-fold:true

# batch_size = 180
# all_batches = []
# for i in range(0, len(movie_ids), batch_size):
#     batch_ids = movie_ids[i:i+batch_size]
#     batch_overviews = movie_overviews[i:i + batch_size]
#     all_batches.append((batch_ids, batch_overviews))

# max_workers = 3 # 同时进行 3 个线程
# with ThreadPoolExecutor(max_workers=max_workers) as executor:
#     results = list(tqdm(
#         executor.map(lambda batch: process_one_batch(batch[0], batch[1]), all_batches),
#         total=len(all_batches),
#         desc="Uploading batches"
#     ))

# # 统计成功数量
# successful_batches = results.count(True)
# print(f"成功处理 {successful_batches}/{len(all_batches)} 批次")

Uploading batches: 100%|██████████| 59/59 [09:48<00:00, 9.98s/it] 成功处理 59/59 批次
我们看到用了 10 分钟处理完了。

接下来,我们来试一下搜索。需要重新写 search_qdrant 因为我们需要用 Google-Gemini 来给搜索文本一个向量表示,而不是之前的 sentence_transformer 模型。

def search_qdrant(
    query_text:str, top_k:int = 5):
    # 使用 Google-Gemini 而不是我们之前的 model
    query_embeddings = get_gemini_embeddings([query_text])

    results = qdrant_client.query_points(
        collection_name = 'movies_genai',
        query=query_embeddings[0].tolist(),
        limit=top_k
    )
    movies = []
    for result in results.points:
        movies.append({
            'title': result.payload['title'],
            'overview': result.payload['overview'],
            'similarity_score': result.score
        })
    return movies 
user_query = "NASA tried to rescue an astronaut stranded on Mars."
search_qdrant(query_text=user_query)

[{'title': 'Infinity 7', 'overview': 'A lone astronaut’s spacecraft malfunctions, threatening to leave him stranded in orbit forever.', 'similarity_score': 0.66411316}, {'title': 'The Martian', 'overview': 'During a manned mission to Mars, Astronaut Mark Watney is presumed dead after a fierce storm and left behind by his crew. But Watney has survived and finds himself stranded and alone on the hostile planet. With only meager supplies, he must draw upon his ingenuity, wit and spirit to subsist and find a way to signal to Earth that he is alive.', 'similarity_score': 0.6351051}, {'title': 'Stowaway', 'overview': 'A three-person crew on a mission to Mars faces an impossible choice when an unplanned passenger jeopardizes the lives of everyone on board.', 'similarity_score': 0.6208103}, {'title': 'Passage to Mars', 'overview': 'A NASA Arctic expedition designed to be the first Martian road trip on Earth becomes an epic two-year odyssey of human adventure and survival.', 'similarity_score': 0.57813585}, {'title': 'The Space Between Us', 'overview': 'A young man raised by scientists on Mars returns to Earth to find his father.', 'similarity_score': 0.5646328}]
结果和上面不一样。但都差不多。

我们来看一下用中文怎么样:

user_query = "NASA尝试营救一名滞留在火星上的宇航员。"
search_qdrant(query_text=user_query)

[{'title': 'Operation Avalanche', 'overview': 'In 1967, four undercover CIA agents were sent to NASA posing as a documentary film crew. What they discovered led to one of the biggest conspiracies in American history.', 'similarity_score': 0.5311955}, {'title': 'Apollo 18', 'overview': "Officially, Apollo 17 was the last manned mission to the moon. But a year later in 1973, three American astronauts were sent on a secret mission to the moon funded by the US Department of Defense. What you are about to see is the actual footage which the astronauts captured on that mission. While NASA denies its authenticity, others say it's the real reason we've never gone back to the moon.", 'similarity_score': 0.52906084}, {'title': 'Journey to Space', 'overview': 'A sweeping overview of humanity’s accomplishments in space, as well as our ongoing activities and future plans.', 'similarity_score': 0.5120001}, {'title': 'A Beautiful Planet', 'overview': 'A breathtaking portrait of Earth from space, providing a unique perspective and increased understanding of our planet and galaxy as never seen before. Made in cooperation with the National Aeronautics and Space Administration (NASA), the film features stunning footage of our magnificent blue planet — and the effects humanity has had on it over time — captured by the astronauts aboard the International Space Station (ISS).', 'similarity_score': 0.50156087}, {'title': 'Walden', 'overview': 'After returning from a year-long Moon mission, Cassie, a NASA botanist, finds herself in a remote cabin in the woods, where her estranged twin sister, Stella, a former NASA architect, has found a new life with climate activist Bryan. Old wounds resurface as the sisters attempt to pick up the pieces of the rivalry that broke them apart.', 'similarity_score': 0.49168825}]
结果很糟糕。说明 text-embedding-004 无法很好地支持多语言。

重新构建 API #

我先试着用 FastAPI 重新做了一下,这次省去了 sentence-transformers 这么大的包,改用 Google Gemini 的 API 接口。代码在 https://github.com/hongtaoh/TheMovieApp 。API 网址是 https://the-movie-app-gamma.vercel.app/ 。我们看到 API 部署成功,但是我试了一下,显示 Timedout,说明超过了 Vercel 允许的运算时间。毕竟是免费的。

我试了试 PythonAnywhere 和 Replit。前者实在是太远古,操作太复杂,我直接放弃。后者没有免费的部署选项。

结论就是,如果想使用 Python 作为后端,免费的部署服务几乎不存在。

构建 RAG #

RAG 听起来高大上,但其实我们上面已经完成了一大部分。在这里,我们如何用 RAG?

很简单。比如,一个用户搜索 “NASA tried to rescue an astronaut stranded on Mars.” 之后,我们可以返回结果,就像上面展示的那样。但是,我如果想用自然语言 (也就是人类语言) 解释为什么这些结果匹配用户的搜索,这就需要用到语言模型。这个过程就用到了 RAG。

具体怎么操作呢?很简单,就像你在 DeepSeek 上直接问:这是用户的搜索,“NASA tried to rescue an astronaut stranded on Mars.",这是结果,blablabla。请告诉我为什么结果匹配搜索。然后 DeepSeek 就会把答案给你。RAG 的作用是把这个过程整合起来、自动化。

为了演示具体怎么操作,在 DeepSeek + Claude 的帮助下,再加上我之前学过的一些零散的全栈知识,用 Next.js 做了一个很简单的网站应用:https://htmovies.vercel.app/ 。需要梯子。源代码在 https://github.com/hongtaoh/Movie_Recommendations_Easy

下面,我会一步一步介绍,这个是如何做出来的。即使你完全不懂 Next/React,大概也能看懂。

Next.js 的好处是,连后端的 API 接口都可以一起做好,

比如,你在浏览器输入

数据 #

第一步是把数据转换成 JSON 格式:

df[['id', 'overview', 'revenue', 'title', 'poster_path']].to_json(
        './data/movies.json', orient='records', indent=4)

安装 Node.js #

如果你还没下载,请到 https://nodejs.org/en 下载。

完成后,验证:

node -v
npm -v

cd 到自己的想要的路径,然后

新建 Next.js 项目 #

npx create-next-app@latest my-next-app

选择:

然后

cd my-next-app

接着在根目录创建 data 文件夹,然后把上面那个 json 文件放进去。

解释 #

如果你嫌麻烦,直接 git clone 我的源代码就好 https://github.com/hongtaoh/Movie_Recommendations_Easy。

我简单解释一下整个构架。

app/layout.js 这个主要负责整体的样式和目录。比如,The Movie App 和 AI Recommendations 这两个路径就是在这里创建的。之后如果需要用的 useContext 也最好在这里放 ContextProvider.

app/page.js 这是一进来后的主页面,展示了许多电影。这里面每一个电影,都是用 components/MovieCard.js 来制定样式的。

当我们选中其中任何一个电影后,就会进入具体电影的页面,这是由 app/movies/[id]/page.js 渲染的。

当我们点开 AI Recommendations,点击搜索后,具体的流程就是,我们先在 app/recommendation/page.js 中点击,然后 app/api/recommend/route.js 就会运行,连带着运行 lib/geminiEmbedding.jslib/qdrant.jslib/explain.js。然后结果返回 app/recommendation/page.js

上面提到的 API 接口,也都是在 app/api 完成的。

这里简单说一下 Langchain。在 lib/explain.js 中我用了 langchain 而不是具体的 Gemini 或者 DeepSeek 的接口。这是因为,这样的话,换语言模型比较简单。这也是 langchain 的作用之一。

#ML

最后一次修改于 2025-03-20