diff --git a/chat2db-server/chat2db-server-web/chat2db-server-web-api/src/main/java/ai/chat2db/server/web/api/controller/ai/ChatController.java b/chat2db-server/chat2db-server-web/chat2db-server-web-api/src/main/java/ai/chat2db/server/web/api/controller/ai/ChatController.java index 137be65e..6227f488 100644 --- a/chat2db-server/chat2db-server-web/chat2db-server-web-api/src/main/java/ai/chat2db/server/web/api/controller/ai/ChatController.java +++ b/chat2db-server/chat2db-server-web/chat2db-server-web-api/src/main/java/ai/chat2db/server/web/api/controller/ai/ChatController.java @@ -274,7 +274,8 @@ public class ChatController { buildSseEmitter(sseEmitter, uid); DifyChatAIEventSourceListener eventSourceListener = new DifyChatAIEventSourceListener(sseEmitter); - DifyChatAIClient.getInstance().streamCompletions(messages, eventSourceListener); + String conversationId = uid + "-" + queryRequest.getDataSourceId(); + DifyChatAIClient.getInstance().streamCompletions(messages, eventSourceListener, uid, conversationId); LocalCache.CACHE.put(uid, JSONUtil.toJsonStr(messages), LocalCache.TIMEOUT); return sseEmitter; } diff --git a/chat2db-server/chat2db-server-web/chat2db-server-web-api/src/main/java/ai/chat2db/server/web/api/controller/ai/chat2db/model/DifyChatCompletionsOptions.java b/chat2db-server/chat2db-server-web/chat2db-server-web-api/src/main/java/ai/chat2db/server/web/api/controller/ai/chat2db/model/DifyChatCompletionsOptions.java new file mode 100644 index 00000000..7c6b3576 --- /dev/null +++ b/chat2db-server/chat2db-server-web/chat2db-server-web-api/src/main/java/ai/chat2db/server/web/api/controller/ai/chat2db/model/DifyChatCompletionsOptions.java @@ -0,0 +1,37 @@ +package ai.chat2db.server.web.api.controller.ai.chat2db.model; + +import lombok.Data; + +import java.util.Map; + +@Data +public class DifyChatCompletionsOptions { + /** + * (选填)以键值对方式提供用户输入字段,与提示词编排中的变量对应。Key 为变量名称,Value 是参数值。 + * 如果字段类型为 Select,传入的 Value 需为预设选项之一。 + */ + private Map inputs; + + /** + * 用户输入/提问内容 + */ + private String query; + + /** + * blocking 阻塞型,等待执行完毕后返回结果。(请求若流程较长可能会被中断) + * streaming 流式返回。基于 SSE(Server-Sent Events)实现流式返回。 + */ + private String response_mode = "blocking"; + + /** + * (必填)‼️ 会话标识符,首次对话为 conversation_id: "" ‼️,如果要继续对话请传入上下文返回的 conversation_id + */ + private String conversation_id; + + /** + * 用户标识,由开发者定义规则,需保证用户标识在应用内唯一。 + */ + private String user; + + +} diff --git a/chat2db-server/chat2db-server-web/chat2db-server-web-api/src/main/java/ai/chat2db/server/web/api/controller/ai/dify/client/DifyChatAIClient.java b/chat2db-server/chat2db-server-web/chat2db-server-web-api/src/main/java/ai/chat2db/server/web/api/controller/ai/dify/client/DifyChatAIClient.java index b9b499d4..932d7a98 100644 --- a/chat2db-server/chat2db-server-web/chat2db-server-web-api/src/main/java/ai/chat2db/server/web/api/controller/ai/dify/client/DifyChatAIClient.java +++ b/chat2db-server/chat2db-server-web/chat2db-server-web-api/src/main/java/ai/chat2db/server/web/api/controller/ai/dify/client/DifyChatAIClient.java @@ -70,7 +70,7 @@ public class DifyChatAIClient { } log.info("refresh dify chat apiHost:{} apikey:{}", apiHost, maskApiKey(apikey)); - //DIFY_CHAT_STREAM_CLIENT = DifyChatAiStreamClient.builder().apiHost(apiHost).apiKey(apikey).build(); + DIFY_CHAT_STREAM_CLIENT = DifyChatAiStreamClient.builder().apiHost(apiHost).apiKey(apikey).build(); } diff --git a/chat2db-server/chat2db-server-web/chat2db-server-web-api/src/main/java/ai/chat2db/server/web/api/controller/ai/dify/client/DifyChatAiStreamClient.java b/chat2db-server/chat2db-server-web/chat2db-server-web-api/src/main/java/ai/chat2db/server/web/api/controller/ai/dify/client/DifyChatAiStreamClient.java index f816d523..264f88d9 100644 --- a/chat2db-server/chat2db-server-web/chat2db-server-web-api/src/main/java/ai/chat2db/server/web/api/controller/ai/dify/client/DifyChatAiStreamClient.java +++ b/chat2db-server/chat2db-server-web/chat2db-server-web-api/src/main/java/ai/chat2db/server/web/api/controller/ai/dify/client/DifyChatAiStreamClient.java @@ -1,11 +1,30 @@ package ai.chat2db.server.web.api.controller.ai.dify.client; +import ai.chat2db.server.tools.common.exception.ParamBusinessException; +import ai.chat2db.server.web.api.controller.ai.azure.interceptor.AzureHeaderAuthorizationInterceptor; +import ai.chat2db.server.web.api.controller.ai.chat2db.model.DifyChatCompletionsOptions; import ai.chat2db.server.web.api.controller.ai.dify.listener.DifyChatAIEventSourceListener; +import cn.hutool.http.ContentType; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; import com.unfbx.chatgpt.entity.chat.Message; import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.RequestBody; +import okhttp3.sse.EventSource; +import okhttp3.sse.EventSources; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.poi.hpsf.GUID; import org.jetbrains.annotations.NotNull; import java.util.List; +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +@Slf4j public class DifyChatAiStreamClient { @@ -23,8 +42,121 @@ public class DifyChatAiStreamClient { @NotNull private String apiHost; + @Getter + private OkHttpClient okHttpClient; - public void streamCompletions(List messages, DifyChatAIEventSourceListener eventSourceListener) { + private DifyChatAiStreamClient(DifyChatAiStreamClient.Builder builder) { + this.apiKey = builder.apiKey; + this.apiHost = builder.apiHost; + if (Objects.isNull(builder.okHttpClient)) { + builder.okHttpClient = this.okHttpClient(); + } + okHttpClient = builder.okHttpClient; + } + + + /** + * 构造 + * + * @return + */ + public static DifyChatAiStreamClient.Builder builder() { + return new DifyChatAiStreamClient.Builder(); + } + + /** + * okhttpclient + */ + private OkHttpClient okHttpClient() { + OkHttpClient okHttpClient = new OkHttpClient + .Builder() + .addInterceptor(new AzureHeaderAuthorizationInterceptor(this.apiKey)) + .connectTimeout(10, TimeUnit.SECONDS) + .writeTimeout(50, TimeUnit.SECONDS) + .readTimeout(50, TimeUnit.SECONDS) + .build(); + return okHttpClient; + } + + public static final class Builder { + private String apiKey; + + private String apiHost; + + /** + * 自定义OkhttpClient + */ + private OkHttpClient okHttpClient; + + public Builder() { + } + + public DifyChatAiStreamClient.Builder apiKey(String apiKeyValue) { + this.apiKey = apiKeyValue; + return this; + } + + /** + * @param apiHost + * @return + */ + public DifyChatAiStreamClient.Builder apiHost(String apiHost) { + this.apiHost = apiHost; + return this; + } + + + public DifyChatAiStreamClient.Builder okHttpClient(OkHttpClient val) { + this.okHttpClient = val; + return this; + } + + public DifyChatAiStreamClient build() { + return new DifyChatAiStreamClient(this); + } } + + + public void streamCompletions(List messages, DifyChatAIEventSourceListener eventSourceListener, + String uid, String conversationId) { + if (CollectionUtils.isEmpty(messages)) { + log.error("param error:Dify Chat Prompt cannot be empty"); + throw new ParamBusinessException("prompt"); + } + if (Objects.isNull(eventSourceListener)) { + log.error("param error:DifyChatAIEventSourceListener cannot be empty"); + throw new ParamBusinessException(); + } + String lastMessage = messages.get(messages.size() - 1).getContent(); + log.info("Dify Chat AI, uid:{} conversationId:{} prompt:{}", uid, conversationId, lastMessage); + + try { + DifyChatCompletionsOptions chatCompletionsOptions = new DifyChatCompletionsOptions(); + chatCompletionsOptions.setQuery(lastMessage); + chatCompletionsOptions.setResponse_mode("streaming"); + chatCompletionsOptions.setConversation_id(conversationId); + chatCompletionsOptions.setUser(uid); + + EventSource.Factory factory = EventSources.createFactory(this.okHttpClient); + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + String requestBody = mapper.writeValueAsString(chatCompletionsOptions); + if (!apiHost.endsWith("/")) { + apiHost = apiHost + "/"; + } + String url = this.apiHost + "v1/chat-messages"; + Request request = new Request.Builder() + .url(url) + .post(RequestBody.create(MediaType.parse(ContentType.JSON.getValue()), requestBody)) + .build(); + //创建事件 + EventSource eventSource = factory.newEventSource(request, eventSourceListener); + log.info("finish invoking Dify Chat ai"); + } catch (Exception e) { + log.error("Dify Chat error", e); + eventSourceListener.onFailure(null, e, null); + throw new ParamBusinessException(); + } + } } diff --git a/chat2db-server/chat2db-server-web/chat2db-server-web-api/src/main/java/ai/chat2db/server/web/api/controller/ai/dify/listener/DifyChatAIEventSourceListener.java b/chat2db-server/chat2db-server-web/chat2db-server-web-api/src/main/java/ai/chat2db/server/web/api/controller/ai/dify/listener/DifyChatAIEventSourceListener.java index 69d64c24..b640e930 100644 --- a/chat2db-server/chat2db-server-web/chat2db-server-web-api/src/main/java/ai/chat2db/server/web/api/controller/ai/dify/listener/DifyChatAIEventSourceListener.java +++ b/chat2db-server/chat2db-server-web/chat2db-server-web-api/src/main/java/ai/chat2db/server/web/api/controller/ai/dify/listener/DifyChatAIEventSourceListener.java @@ -1,13 +1,23 @@ package ai.chat2db.server.web.api.controller.ai.dify.listener; +import ai.chat2db.server.web.api.controller.ai.azure.model.AzureChatChoice; +import ai.chat2db.server.web.api.controller.ai.azure.model.AzureChatCompletions; +import ai.chat2db.server.web.api.controller.ai.azure.model.AzureChatMessage; +import ai.chat2db.server.web.api.controller.ai.azure.model.AzureCompletionsUsage; +import com.unfbx.chatgpt.entity.chat.Message; import lombok.extern.slf4j.Slf4j; import okhttp3.Response; +import okhttp3.ResponseBody; import okhttp3.sse.EventSource; import okhttp3.sse.EventSourceListener; +import org.apache.commons.lang3.StringUtils; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; +import java.io.IOException; +import java.util.Objects; + @Slf4j public class DifyChatAIEventSourceListener extends EventSourceListener { @@ -19,21 +29,67 @@ public class DifyChatAIEventSourceListener extends EventSourceListener { @Override public void onClosed(@NotNull EventSource eventSource) { - super.onClosed(eventSource); + try { + sseEmitter.send(SseEmitter.event() + .id("[DONE]") + .data("[DONE]")); + } catch (IOException e) { + throw new RuntimeException(e); + } + sseEmitter.complete(); + log.info("DifyChatAI close sse connection..."); } @Override public void onEvent(@NotNull EventSource eventSource, @Nullable String id, @Nullable String type, @NotNull String data) { - super.onEvent(eventSource, id, type, data); + log.info("DifyChatAI:{}", data); + } @Override public void onFailure(@NotNull EventSource eventSource, @Nullable Throwable t, @Nullable Response response) { - super.onFailure(eventSource, t, response); + try { + if (Objects.isNull(response)) { + String message = t.getMessage(); + Message sseMessage = new Message(); + sseMessage.setContent(message); + sseEmitter.send(SseEmitter.event() + .id("[ERROR]") + .data(sseMessage)); + sseEmitter.send(SseEmitter.event() + .id("[DONE]") + .data("[DONE]")); + sseEmitter.complete(); + return; + } + ResponseBody body = response.body(); + String bodyString = Objects.nonNull(t) ? t.getMessage() : ""; + if (Objects.nonNull(body)) { + bodyString = body.string(); + if (StringUtils.isBlank(bodyString) && Objects.nonNull(t)) { + bodyString = t.getMessage(); + } + log.error("DifyChatAI sse response:{}", bodyString); + } else { + log.error("DifyChatAI sse response:{},error:{}", response, t); + } + eventSource.cancel(); + Message message = new Message(); + message.setContent("DifyChatAI error:" + bodyString); + sseEmitter.send(SseEmitter.event() + .id("[ERROR]") + .data(message)); + sseEmitter.send(SseEmitter.event() + .id("[DONE]") + .data("[DONE]")); + sseEmitter.complete(); + } catch (Exception exception) { + log.error("DifyChatAI 发送数据异常:", exception); + } } @Override public void onOpen(@NotNull EventSource eventSource, @NotNull Response response) { - super.onOpen(eventSource, response); + log.info("DifyChatAI 建立sse连接..."); } }