mirror of
https://github.com/CodePhiliaX/Chat2DB.git
synced 2025-07-30 11:12:55 +08:00
feature: modify dify chat
This commit is contained in:
@ -274,7 +274,8 @@ public class ChatController {
|
||||
buildSseEmitter(sseEmitter, uid);
|
||||
|
||||
DifyChatAIEventSourceListener eventSourceListener = new DifyChatAIEventSourceListener(sseEmitter);
|
||||
DifyChatAIClient.getInstance().streamCompletions(messages, eventSourceListener);
|
||||
String conversationId = uid + "-" + queryRequest.getDataSourceId();
|
||||
DifyChatAIClient.getInstance().streamCompletions(messages, eventSourceListener, uid, conversationId);
|
||||
LocalCache.CACHE.put(uid, JSONUtil.toJsonStr(messages), LocalCache.TIMEOUT);
|
||||
return sseEmitter;
|
||||
}
|
||||
|
@ -0,0 +1,37 @@
|
||||
package ai.chat2db.server.web.api.controller.ai.chat2db.model;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
@Data
|
||||
public class DifyChatCompletionsOptions {
|
||||
/**
|
||||
* (选填)以键值对方式提供用户输入字段,与提示词编排中的变量对应。Key 为变量名称,Value 是参数值。
|
||||
* 如果字段类型为 Select,传入的 Value 需为预设选项之一。
|
||||
*/
|
||||
private Map<String, String> inputs;
|
||||
|
||||
/**
|
||||
* 用户输入/提问内容
|
||||
*/
|
||||
private String query;
|
||||
|
||||
/**
|
||||
* blocking 阻塞型,等待执行完毕后返回结果。(请求若流程较长可能会被中断)
|
||||
* streaming 流式返回。基于 SSE(Server-Sent Events)实现流式返回。
|
||||
*/
|
||||
private String response_mode = "blocking";
|
||||
|
||||
/**
|
||||
* (必填)‼️ 会话标识符,首次对话为 conversation_id: "" ‼️,如果要继续对话请传入上下文返回的 conversation_id
|
||||
*/
|
||||
private String conversation_id;
|
||||
|
||||
/**
|
||||
* 用户标识,由开发者定义规则,需保证用户标识在应用内唯一。
|
||||
*/
|
||||
private String user;
|
||||
|
||||
|
||||
}
|
@ -70,7 +70,7 @@ public class DifyChatAIClient {
|
||||
}
|
||||
|
||||
log.info("refresh dify chat apiHost:{} apikey:{}", apiHost, maskApiKey(apikey));
|
||||
//DIFY_CHAT_STREAM_CLIENT = DifyChatAiStreamClient.builder().apiHost(apiHost).apiKey(apikey).build();
|
||||
DIFY_CHAT_STREAM_CLIENT = DifyChatAiStreamClient.builder().apiHost(apiHost).apiKey(apikey).build();
|
||||
|
||||
}
|
||||
|
||||
|
@ -1,11 +1,30 @@
|
||||
package ai.chat2db.server.web.api.controller.ai.dify.client;
|
||||
|
||||
import ai.chat2db.server.tools.common.exception.ParamBusinessException;
|
||||
import ai.chat2db.server.web.api.controller.ai.azure.interceptor.AzureHeaderAuthorizationInterceptor;
|
||||
import ai.chat2db.server.web.api.controller.ai.chat2db.model.DifyChatCompletionsOptions;
|
||||
import ai.chat2db.server.web.api.controller.ai.dify.listener.DifyChatAIEventSourceListener;
|
||||
import cn.hutool.http.ContentType;
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.unfbx.chatgpt.entity.chat.Message;
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import okhttp3.MediaType;
|
||||
import okhttp3.OkHttpClient;
|
||||
import okhttp3.Request;
|
||||
import okhttp3.RequestBody;
|
||||
import okhttp3.sse.EventSource;
|
||||
import okhttp3.sse.EventSources;
|
||||
import org.apache.commons.collections4.CollectionUtils;
|
||||
import org.apache.poi.hpsf.GUID;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Slf4j
|
||||
|
||||
public class DifyChatAiStreamClient {
|
||||
|
||||
@ -23,8 +42,121 @@ public class DifyChatAiStreamClient {
|
||||
@NotNull
|
||||
private String apiHost;
|
||||
|
||||
@Getter
|
||||
private OkHttpClient okHttpClient;
|
||||
|
||||
public void streamCompletions(List<Message> messages, DifyChatAIEventSourceListener eventSourceListener) {
|
||||
private DifyChatAiStreamClient(DifyChatAiStreamClient.Builder builder) {
|
||||
this.apiKey = builder.apiKey;
|
||||
this.apiHost = builder.apiHost;
|
||||
if (Objects.isNull(builder.okHttpClient)) {
|
||||
builder.okHttpClient = this.okHttpClient();
|
||||
}
|
||||
okHttpClient = builder.okHttpClient;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 构造
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
public static DifyChatAiStreamClient.Builder builder() {
|
||||
return new DifyChatAiStreamClient.Builder();
|
||||
}
|
||||
|
||||
/**
|
||||
* okhttpclient
|
||||
*/
|
||||
private OkHttpClient okHttpClient() {
|
||||
OkHttpClient okHttpClient = new OkHttpClient
|
||||
.Builder()
|
||||
.addInterceptor(new AzureHeaderAuthorizationInterceptor(this.apiKey))
|
||||
.connectTimeout(10, TimeUnit.SECONDS)
|
||||
.writeTimeout(50, TimeUnit.SECONDS)
|
||||
.readTimeout(50, TimeUnit.SECONDS)
|
||||
.build();
|
||||
return okHttpClient;
|
||||
}
|
||||
|
||||
public static final class Builder {
|
||||
private String apiKey;
|
||||
|
||||
private String apiHost;
|
||||
|
||||
/**
|
||||
* 自定义OkhttpClient
|
||||
*/
|
||||
private OkHttpClient okHttpClient;
|
||||
|
||||
public Builder() {
|
||||
}
|
||||
|
||||
public DifyChatAiStreamClient.Builder apiKey(String apiKeyValue) {
|
||||
this.apiKey = apiKeyValue;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param apiHost
|
||||
* @return
|
||||
*/
|
||||
public DifyChatAiStreamClient.Builder apiHost(String apiHost) {
|
||||
this.apiHost = apiHost;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
||||
public DifyChatAiStreamClient.Builder okHttpClient(OkHttpClient val) {
|
||||
this.okHttpClient = val;
|
||||
return this;
|
||||
}
|
||||
|
||||
public DifyChatAiStreamClient build() {
|
||||
return new DifyChatAiStreamClient(this);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
public void streamCompletions(List<Message> messages, DifyChatAIEventSourceListener eventSourceListener,
|
||||
String uid, String conversationId) {
|
||||
if (CollectionUtils.isEmpty(messages)) {
|
||||
log.error("param error:Dify Chat Prompt cannot be empty");
|
||||
throw new ParamBusinessException("prompt");
|
||||
}
|
||||
if (Objects.isNull(eventSourceListener)) {
|
||||
log.error("param error:DifyChatAIEventSourceListener cannot be empty");
|
||||
throw new ParamBusinessException();
|
||||
}
|
||||
String lastMessage = messages.get(messages.size() - 1).getContent();
|
||||
log.info("Dify Chat AI, uid:{} conversationId:{} prompt:{}", uid, conversationId, lastMessage);
|
||||
|
||||
try {
|
||||
DifyChatCompletionsOptions chatCompletionsOptions = new DifyChatCompletionsOptions();
|
||||
chatCompletionsOptions.setQuery(lastMessage);
|
||||
chatCompletionsOptions.setResponse_mode("streaming");
|
||||
chatCompletionsOptions.setConversation_id(conversationId);
|
||||
chatCompletionsOptions.setUser(uid);
|
||||
|
||||
EventSource.Factory factory = EventSources.createFactory(this.okHttpClient);
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
|
||||
String requestBody = mapper.writeValueAsString(chatCompletionsOptions);
|
||||
if (!apiHost.endsWith("/")) {
|
||||
apiHost = apiHost + "/";
|
||||
}
|
||||
String url = this.apiHost + "v1/chat-messages";
|
||||
Request request = new Request.Builder()
|
||||
.url(url)
|
||||
.post(RequestBody.create(MediaType.parse(ContentType.JSON.getValue()), requestBody))
|
||||
.build();
|
||||
//创建事件
|
||||
EventSource eventSource = factory.newEventSource(request, eventSourceListener);
|
||||
log.info("finish invoking Dify Chat ai");
|
||||
} catch (Exception e) {
|
||||
log.error("Dify Chat error", e);
|
||||
eventSourceListener.onFailure(null, e, null);
|
||||
throw new ParamBusinessException();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,13 +1,23 @@
|
||||
package ai.chat2db.server.web.api.controller.ai.dify.listener;
|
||||
|
||||
import ai.chat2db.server.web.api.controller.ai.azure.model.AzureChatChoice;
|
||||
import ai.chat2db.server.web.api.controller.ai.azure.model.AzureChatCompletions;
|
||||
import ai.chat2db.server.web.api.controller.ai.azure.model.AzureChatMessage;
|
||||
import ai.chat2db.server.web.api.controller.ai.azure.model.AzureCompletionsUsage;
|
||||
import com.unfbx.chatgpt.entity.chat.Message;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import okhttp3.Response;
|
||||
import okhttp3.ResponseBody;
|
||||
import okhttp3.sse.EventSource;
|
||||
import okhttp3.sse.EventSourceListener;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
||||
@Slf4j
|
||||
public class DifyChatAIEventSourceListener extends EventSourceListener {
|
||||
|
||||
@ -19,21 +29,67 @@ public class DifyChatAIEventSourceListener extends EventSourceListener {
|
||||
|
||||
@Override
|
||||
public void onClosed(@NotNull EventSource eventSource) {
|
||||
super.onClosed(eventSource);
|
||||
try {
|
||||
sseEmitter.send(SseEmitter.event()
|
||||
.id("[DONE]")
|
||||
.data("[DONE]"));
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
sseEmitter.complete();
|
||||
log.info("DifyChatAI close sse connection...");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onEvent(@NotNull EventSource eventSource, @Nullable String id, @Nullable String type, @NotNull String data) {
|
||||
super.onEvent(eventSource, id, type, data);
|
||||
log.info("DifyChatAI:{}", data);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(@NotNull EventSource eventSource, @Nullable Throwable t, @Nullable Response response) {
|
||||
super.onFailure(eventSource, t, response);
|
||||
try {
|
||||
if (Objects.isNull(response)) {
|
||||
String message = t.getMessage();
|
||||
Message sseMessage = new Message();
|
||||
sseMessage.setContent(message);
|
||||
sseEmitter.send(SseEmitter.event()
|
||||
.id("[ERROR]")
|
||||
.data(sseMessage));
|
||||
sseEmitter.send(SseEmitter.event()
|
||||
.id("[DONE]")
|
||||
.data("[DONE]"));
|
||||
sseEmitter.complete();
|
||||
return;
|
||||
}
|
||||
ResponseBody body = response.body();
|
||||
String bodyString = Objects.nonNull(t) ? t.getMessage() : "";
|
||||
if (Objects.nonNull(body)) {
|
||||
bodyString = body.string();
|
||||
if (StringUtils.isBlank(bodyString) && Objects.nonNull(t)) {
|
||||
bodyString = t.getMessage();
|
||||
}
|
||||
log.error("DifyChatAI sse response:{}", bodyString);
|
||||
} else {
|
||||
log.error("DifyChatAI sse response:{},error:{}", response, t);
|
||||
}
|
||||
eventSource.cancel();
|
||||
Message message = new Message();
|
||||
message.setContent("DifyChatAI error:" + bodyString);
|
||||
sseEmitter.send(SseEmitter.event()
|
||||
.id("[ERROR]")
|
||||
.data(message));
|
||||
sseEmitter.send(SseEmitter.event()
|
||||
.id("[DONE]")
|
||||
.data("[DONE]"));
|
||||
sseEmitter.complete();
|
||||
} catch (Exception exception) {
|
||||
log.error("DifyChatAI 发送数据异常:", exception);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onOpen(@NotNull EventSource eventSource, @NotNull Response response) {
|
||||
super.onOpen(eventSource, response);
|
||||
log.info("DifyChatAI 建立sse连接...");
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user