本文首发:从零到一:如何用阿里云百炼和火山引擎搭建专属 AI 助手(DeepSeek)?
阿里云百炼和火山引擎都推出了免费的 DeepSeek 模型体验额度,今天我和大家一起搭建一个本地的专属 AI 助手。
- 阿里云百炼为 DeepSeek-R1 与 DeepSeek-V3 模型分别提供 100 万 tokens 免费额度
🐳 https://www.aliyun.com/solution/tech-solution/deepseek-r1-for-platforms
- 火山引擎为 DeepSeek 在内的多种模型提供 50 万 tokens 免费额度
🐳 https://www.volcengine.com/product/ark
一、阿里云百炼部署 DeepSeek 模型
1、登录阿里云百炼
注册并登录阿里云百炼平台,进入模型广场即可查看 DeepSeek 模型。
2、创建 API Key
在阿里云百炼主页右上角的个人图标中,进入 API-KEY 页面并创建专属的 API Key。
3、调用 API 验证
参考《配置 API Key 到环境变量》文档,将 API Key 配置到环境变量中,并通过代码调用 API 进行验证。
🐳配置 API Key 到环境变量
https://help.aliyun.com/zh/model-studio/developer-reference/configure-api-key-through-environment-variables
以下是官方提供的代码示例,帮助您快速上手:
import os
from openai import OpenAI
client = OpenAI(
# 若没有配置环境变量,请用百炼API Key将下行替换为:api_key="sk-xxx",
api_key=os.getenv("DASHSCOPE_API_KEY"), # 如何获取API Key:https://help.aliyun.com/zh/model-studio/developer-reference/get-api-key
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
)
completion = client.chat.completions.create(
model="deepseek-r1", # 此处以 deepseek-r1 为例,可按需更换模型名称。
messages=[
{'role': 'user', 'content': '9.9和9.11谁大'}
]
)
# 通过reasoning_content字段打印思考过程print("思考过程:")
print(completion.choices[0].message.reasoning_content)
# 通过content字段打印最终答案print("最终答案:")
print(completion.choices[0].message.content)
二、火山引擎部署 DeepSeek 模型
1、登录火山方舟
🐵
注册并登录火山方舟平台,可使用我的邀请码 9Z52V71T 可免费获得每个模型 50 万 tokens 的推理额度。
# 登录后免费赠送每个模型50万tokens推理额度。
https://www.volcengine.com/experience/ark?utm_term=202502dsinvite&ac=DSASUQY5&rc=9Z52V71T
登录后就能体验 DeepSeek-R1 等各种模型
2、创建 API Key
在“在线推理”页面找到“API 接入”,按照提示创建 API Key。
3、创建自定义推理接入点
根据官方教程,在“在线推理”页面选择“创建推理接入点”。
创建过程比较简单,填写名称并选择模型(如 DeepSeek-R1),比如我选择 DeepSeek-R1 模型,为了方便我自己识别,所以接入点名称我填写成“DeepSeek-R1”
在模型选择页,可以根据自己需求去选择模型
创建完成就可以得到接入点名称。注意:后续用到的 model_name 是火山平台自动生成(以“ep-”开头)。
4、调用 API 验证
设置 API Key 作为环境变量,其中"YOUR_API_KEY"需要替换为火山方舟上创建的 API Key
export ARK_API_KEY="YOUR_API_KEY"
官方示例参考
import os
from openai import OpenAI
client = OpenAI(
api_key = os.environ.get("ARK_API_KEY"),
base_url = "https://ark.cn-beijing.volces.com/api/v3",
)
# Non-streaming:
print("----- standard request -----")
completion = client.chat.completions.create(
model = "ep-", # your model endpoint ID
messages = [
{"role": "system", "content": "你是人工智能助手"},
{"role": "user", "content": "常见的十字花科植物有哪些?"},
],
)
print(completion.choices[0].message.content)
# Streaming:
print("----- streaming request -----")
stream = client.chat.completions.create(
model = "ep-", # your model endpoint ID
messages = [
{"role": "system", "content": "你是人工智能助手"},
{"role": "user", "content": "常见的十字花科植物有哪些?"},
],
stream=True
)
for chunk in stream:
if not chunk.choices:
continue
print(chunk.choices[0].delta.content, end="")
print()
三、搭建专属 AI 助手
无论是开发者还是普通用户,都可以通过以下方式快速搭建专属的 AI 助手:
1、用 streamlit 编写个应用
对于程序员,可以使用 Streamlit 编写一个简单的网页应用,实现本地化交互。
我已经编写了一个基础版,日常用用就足够了,代码参考
import streamlit as st
import uuid
import os
import hashlib
from openai import OpenAI, AuthenticationError, APIError
# 生成或获取用户特定的会话ID
if'user_session_id'notin st.session_state:
st.session_state.user_session_id = str(uuid.uuid4())
# 使用用户会话ID来获取或初始化用户特定的数据
defget_user_data():
if'user_data'notin st.session_state:
st.session_state.user_data = {}
if st.session_state.user_session_id notin st.session_state.user_data:
st.session_state.user_data[st.session_state.user_session_id] = {
'messages': [
{"role": "system", "content": "你是一个AI助手,请回答用户提出的问题。"}
],
'uploaded_files': [],
'api_key': 'sk-',
'base_url': 'https://dashscope.aliyuncs.com/compatible-mode/v1',
'model_name': 'deepseek-r1',
'past_sessions': []
}
return st.session_state.user_data[st.session_state.user_session_id]
# 更新用户数据的辅助函数
defupdate_user_data(key, value):
user_data = get_user_data()
user_data[key] = value
# 保存当前会话
defsave_current_session():
user_data = get_user_data()
iflen(user_data['messages']) > 1: # 只有当有实际对话时才保存
current_session = {
'id': st.session_state.user_session_id,
'messages': user_data['messages']
}
# 检查是否已存在相同ID的会话,如果存在则更新,不存在则插入
existing_session = next((session for session in user_data['past_sessions'] if session['id'] == current_session['id']), None)
if existing_session:
existing_session.update(current_session)
else:
user_data['past_sessions'].insert(0, current_session)
# 限制保存的会话数量,例如只保留最近的5个会话
user_data['past_sessions'] = user_data['past_sessions'][:5]
# 加载选定的会话
defload_session(session_id):
user_data = get_user_data()
for session in user_data['past_sessions']:
if session['id'] == session_id:
st.session_state.user_session_id = session_id
st.session_state.user_data[session_id] = {
'messages': session['messages'],
'uploaded_files': [],
'api_key': user_data['api_key'],
'base_url': user_data['base_url'],
'model_name': user_data['model_name'],
'past_sessions': user_data['past_sessions']
}
break
defsave_uploaded_files(upload_dir, uploaded_files):
"""保存上传的 txt 和 markdown 文件到临时目录并返回文件信息"""
user_data = get_user_data()
saved_files = []
current_files = [f["name"] for f in user_data['uploaded_files']]
for file in uploaded_files:
if file.name in current_files:
continue
ifnot file.name.lower().endswith(('.txt', '.md', '.markdown')):
st.warning(f"不支持的文件类型: {file.name}。请上传 .txt 或 .md 文件。")
continue
if file.size > 1 * 1024 * 1024: # 1MB限制
st.error(f"文件 {file.name} 超过大小限制(1MB)")
continue
try:
# 保存文件到指定目录
file_path = os.path.join(upload_dir, file.name)
withopen(file_path, "wb") as f:
f.write(file.getbuffer())
# 读取文件内容
withopen(file_path, "r", encoding='utf-8') as f:
content = f.read()
# 生成内容哈希值
content_hash = hashlib.md5(content.encode()).hexdigest()
# 检查重复内容
ifany(f["hash"] == content_hash for f in user_data['uploaded_files']):
st.info(f"文件 {file.name} 的内容与已上传的文件重复,已跳过。")
continue
saved_files.append({
"name": file.name,
"content": content,
"size": file.size,
"hash": content_hash
})
st.success(f"成功上传文件: {file.name}")
except Exception as e:
st.error(f"处理文件 {file.name} 时出错: {str(e)}")
continue
return saved_files
defformat_file_contents(files):
return"\n".join([f"=== {f['name']} ===\n{f['content']}\n"for f in files])
defget_active_api_config():
user_data = get_user_data()
return user_data['base_url'], user_data['api_key'], user_data['model_name']
defprocess_stream(stream):
"""合并处理思考阶段和响应阶段"""
thinking_content = ""
response_content = ""
# 在状态块外部预先创建响应占位符
response_placeholder = st.empty()
with st.status("思考中...", expanded=True) as status:
thinking_placeholder = st.empty()
thinking_phase = True# 思考阶段标记
for chunk in stream:
# 解析数据块
delta = chunk.choices[0].delta
reasoning = delta.reasoning_content ifhasattr(delta, 'reasoning_content') else""
content = delta.content ifhasattr(delta, 'content') else""
role = delta.role ifhasattr(delta, 'role') else""
# 处理思考阶段
if thinking_phase:
if reasoning:
thinking_content += reasoning
thinking_placeholder.markdown(f"思考过程:\n{thinking_content}")
# 检测思考阶段结束
if content:
status.update(label="思考完成", state="complete", expanded=False)
thinking_phase = False
response_placeholder.markdown("回答:\n▌") # 初始化响应光标
# 处理响应阶段(无论是否在思考阶段都收集内容)
if content:
response_content += content
ifnot thinking_phase:
response_placeholder.markdown(f"回答:\n{response_content}▌")
# 流结束后移除光标
response_placeholder.markdown(f"回答:\n{response_content}")
returnf"{thinking_content}{response_content}"
defdisplay_chat_history():
user_data = get_user_data()
for message in user_data['messages']:
with st.chat_message(message["role"]):
st.markdown(message["content"])
defhandle_user_input():
user_data = get_user_data()
base_url, api_key, model_name = get_active_api_config()
ifnot api_key or api_key == 'sk-':
st.error("请在侧边栏输入有效的 API Key。")
return
try:
client = OpenAI(api_key=api_key, base_url=base_url)
uploaded_files = st.file_uploader(
"上传文本文件(支持 .txt 和 .md)",
type=["txt", "md", "markdown"],
accept_multiple_files=True,
key="file_uploader"
)
if uploaded_files:
new_files = save_uploaded_files(dirs, uploaded_files)
user_data['uploaded_files'].extend(new_files)
user_content = []
if user_input := st.chat_input("请问我任何事!"):
user_content.append(user_input)
if user_data['uploaded_files']:
file_content = format_file_contents(user_data['uploaded_files'])
user_content.append("\n[上传文件内容]\n" + file_content)
user_data['uploaded_files'] = [] # 清空已处理的文件列表
full_content = "\n".join(user_content)
user_data['messages'].append({"role": "user", "content": full_content})
with st.chat_message("user"):
st.markdown(user_input)
with st.chat_message("assistant"):
try:
stream = client.chat.completions.create(
model=model_name,
messages=user_data['messages'],
stream=True
)
response = process_stream(stream)
user_data['messages'].append(
{"role": "assistant", "content": response}
)
except AuthenticationError:
st.error("API 认证失败。请检查您的 API Key 是否正确。")
except APIError as e:
st.error(f"API 错误: {str(e)}")
except Exception as e:
st.error(f"发生未知错误: {str(e)}")
except Exception as e:
st.error(f"设置 OpenAI 客户端时发生错误: {str(e)}")
defmain_interface():
st.title("AI 助手")
user_data = get_user_data()
with st.sidebar:
api_key = st.text_input("API Key", user_data['api_key'], type="password")
if api_key:
update_user_data('api_key', api_key)
else:
st.warning("请输入有效的 API Key")
# Base URL 选项
base_url_options = {
"DashScope": "https://dashscope.aliyuncs.com/compatible-mode/v1",
"ARK": "https://ark.cn-beijing.volces.com/api/v3",
"自定义": "custom"
}
selected_base_url = st.selectbox(
"选择 Base URL",
options=list(base_url_options.keys()),
index=list(base_url_options.keys()).index("DashScope") if user_data['base_url'] == base_url_options["DashScope"] else0
)
if selected_base_url == "自定义":
custom_base_url = st.text_input("自定义 Base URL", user_data['base_url'])
update_user_data('base_url', custom_base_url)
else:
update_user_data('base_url', base_url_options[selected_base_url])
# Model Name 选项
model_options = {
"deepseek-r1": "deepseek-r1",
"deepseek-v3": "deepseek-v3",
"自定义": "custom"
}
selected_model = st.selectbox(
"选择 Model",
options=list(model_options.keys()),
index=list(model_options.keys()).index("deepseek-r1") if user_data['model_name'] == "deepseek-r1"else0
)
if selected_model == "自定义":
custom_model = st.text_input("自定义 Model Name", user_data['model_name'])
update_user_data('model_name', custom_model)
else:
update_user_data('model_name', model_options[selected_model])
if st.button("🆕 新会话"):
save_current_session() # 保存当前会话
new_session_id = str(uuid.uuid4())
st.session_state.user_data[new_session_id] = {
'messages': [
{"role": "system", "content": "你是一个AI助手,请回答用户提出的问题。"}
],
'uploaded_files': [],
'api_key': user_data['api_key'], # 保留当前的 API Key
'base_url': user_data['base_url'], # 保留当前的 Base URL
'model_name': user_data['model_name'], # 保留当前的 Model Name
'past_sessions': user_data['past_sessions'] # 保留过去的会话记录
}
st.session_state.user_session_id = new_session_id
st.rerun()
# 显示过去的会话
st.write("过去的会话:")
for past_session in user_data['past_sessions']:
if st.button(f"加载会话 {past_session['id'][:8]}...", key=past_session['id']):
load_session(past_session['id'])
st.rerun()
display_chat_history()
handle_user_input()
defmain():
if'user_session_id'notin st.session_state:
st.session_state.user_session_id = str(uuid.uuid4())
main_interface()
if __name__ == "__main__":
dirs = 'uploads/'
ifnot os.path.exists(dirs):
os.makedirs(dirs)
main()
以下是启动服务的命令:
streamlit run chat_ui.py
通过网页界面,可以轻松填写 API Key 并选择平台与模型,如阿里云百炼或火山方舟。
比如我用 阿里百炼 验证交互过程
2、使用 Cherry Studio
如果您希望更便捷地使用 AI 助手,可以直接下载并安装开源的 Cherry Studio。
🐳https://cherry-ai.com/
安装好 Cherry Studio 之后,可配置需要接入的大模型应用后就可以本地化使用了,教程可参考 Cherry 官方文档。
往期阅读
手把手教你用 DeepSeek 和 Kimi,轻松搞定 PPT!
用 PyMuPDF 和 Pillow 打造 PDF 超级工具
基于 DeepSeek+AutoGen 的智能体协作系统
清华大学:普通人如何抓住 DeepSeek 红利?(65 页 PDF)
AI 时代,如何用 Python 脚本轻松搞定 PDF 需求?
DeepSeek 与 Ollama:本地运行 AI 模型的完美组合