mirror of
				https://github.com/cloudreve/cloudreve.git
				synced 2025-10-31 08:39:10 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			590 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			590 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package onedrive
 | ||
| 
 | ||
| import (
 | ||
| 	"bytes"
 | ||
| 	"context"
 | ||
| 	"encoding/json"
 | ||
| 	"errors"
 | ||
| 	"fmt"
 | ||
| 	model "github.com/HFO4/cloudreve/models"
 | ||
| 	"github.com/HFO4/cloudreve/pkg/cache"
 | ||
| 	"github.com/HFO4/cloudreve/pkg/filesystem/fsctx"
 | ||
| 	"github.com/HFO4/cloudreve/pkg/request"
 | ||
| 	"github.com/HFO4/cloudreve/pkg/util"
 | ||
| 	"io"
 | ||
| 	"io/ioutil"
 | ||
| 	"net/http"
 | ||
| 	"net/url"
 | ||
| 	"path"
 | ||
| 	"strconv"
 | ||
| 	"strings"
 | ||
| 	"time"
 | ||
| )
 | ||
| 
 | ||
| const (
 | ||
| 	// SmallFileSize 单文件上传接口最大尺寸
 | ||
| 	SmallFileSize uint64 = 4 * 1024 * 1024
 | ||
| 	// ChunkSize 分片上传分片大小
 | ||
| 	ChunkSize uint64 = 10 * 1024 * 1024
 | ||
| 	// ListRetry 列取请求重试次数
 | ||
| 	ListRetry = 1
 | ||
| )
 | ||
| 
 | ||
| // GetSourcePath 获取文件的绝对路径
 | ||
| func (info *FileInfo) GetSourcePath() string {
 | ||
| 	res, err := url.PathUnescape(
 | ||
| 		strings.TrimPrefix(
 | ||
| 			path.Join(
 | ||
| 				strings.TrimPrefix(info.ParentReference.Path, "/drive/root:"),
 | ||
| 				info.Name,
 | ||
| 			),
 | ||
| 			"/",
 | ||
| 		),
 | ||
| 	)
 | ||
| 	if err != nil {
 | ||
| 		return ""
 | ||
| 	}
 | ||
| 	return res
 | ||
| }
 | ||
| 
 | ||
| // Error 实现error接口
 | ||
| func (err RespError) Error() string {
 | ||
| 	return err.APIError.Message
 | ||
| }
 | ||
| 
 | ||
| func (client *Client) getRequestURL(api string) string {
 | ||
| 	base, _ := url.Parse(client.Endpoints.EndpointURL)
 | ||
| 	if base == nil {
 | ||
| 		return ""
 | ||
| 	}
 | ||
| 	base.Path = path.Join(base.Path, api)
 | ||
| 	return base.String()
 | ||
| }
 | ||
| 
 | ||
| // ListChildren 根据路径列取子对象
 | ||
| func (client *Client) ListChildren(ctx context.Context, path string) ([]FileInfo, error) {
 | ||
| 	var requestURL string
 | ||
| 	dst := strings.TrimPrefix(path, "/")
 | ||
| 	if dst == "" {
 | ||
| 		requestURL = client.getRequestURL("me/drive/root/children")
 | ||
| 	} else {
 | ||
| 		requestURL = client.getRequestURL("me/drive/root:/" + dst + ":/children")
 | ||
| 	}
 | ||
| 
 | ||
| 	res, err := client.requestWithStr(ctx, "GET", requestURL+"?$top=999999999", "", 200)
 | ||
| 	if err != nil {
 | ||
| 		retried := 0
 | ||
| 		if v, ok := ctx.Value(fsctx.RetryCtx).(int); ok {
 | ||
| 			retried = v
 | ||
| 		}
 | ||
| 		if retried < ListRetry {
 | ||
| 			retried++
 | ||
| 			util.Log().Debug("路径[%s]列取请求失败[%s],5秒钟后重试", path, err)
 | ||
| 			time.Sleep(time.Duration(5) * time.Second)
 | ||
| 			return client.ListChildren(context.WithValue(ctx, fsctx.RetryCtx, retried), path)
 | ||
| 		}
 | ||
| 		return nil, err
 | ||
| 	}
 | ||
| 
 | ||
| 	var (
 | ||
| 		decodeErr error
 | ||
| 		fileInfo  ListResponse
 | ||
| 	)
 | ||
| 	decodeErr = json.Unmarshal([]byte(res), &fileInfo)
 | ||
| 	if decodeErr != nil {
 | ||
| 		return nil, decodeErr
 | ||
| 	}
 | ||
| 
 | ||
| 	return fileInfo.Value, nil
 | ||
| }
 | ||
| 
 | ||
| // Meta 根据资源ID或文件路径获取文件元信息
 | ||
| func (client *Client) Meta(ctx context.Context, id string, path string) (*FileInfo, error) {
 | ||
| 	var requestURL string
 | ||
| 	if id != "" {
 | ||
| 		requestURL = client.getRequestURL("/me/drive/items/" + id)
 | ||
| 	} else {
 | ||
| 		dst := strings.TrimPrefix(path, "/")
 | ||
| 		requestURL = client.getRequestURL("me/drive/root:/" + dst)
 | ||
| 	}
 | ||
| 
 | ||
| 	res, err := client.requestWithStr(ctx, "GET", requestURL+"?expand=thumbnails", "", 200)
 | ||
| 	if err != nil {
 | ||
| 		return nil, err
 | ||
| 	}
 | ||
| 
 | ||
| 	var (
 | ||
| 		decodeErr error
 | ||
| 		fileInfo  FileInfo
 | ||
| 	)
 | ||
| 	decodeErr = json.Unmarshal([]byte(res), &fileInfo)
 | ||
| 	if decodeErr != nil {
 | ||
| 		return nil, decodeErr
 | ||
| 	}
 | ||
| 
 | ||
| 	return &fileInfo, nil
 | ||
| 
 | ||
| }
 | ||
| 
 | ||
| // CreateUploadSession 创建分片上传会话
 | ||
| func (client *Client) CreateUploadSession(ctx context.Context, dst string, opts ...Option) (string, error) {
 | ||
| 
 | ||
| 	options := newDefaultOption()
 | ||
| 	for _, o := range opts {
 | ||
| 		o.apply(options)
 | ||
| 	}
 | ||
| 
 | ||
| 	dst = strings.TrimPrefix(dst, "/")
 | ||
| 	requestURL := client.getRequestURL("me/drive/root:/" + dst + ":/createUploadSession")
 | ||
| 	body := map[string]map[string]interface{}{
 | ||
| 		"item": {
 | ||
| 			"@microsoft.graph.conflictBehavior": options.conflictBehavior,
 | ||
| 		},
 | ||
| 	}
 | ||
| 	bodyBytes, _ := json.Marshal(body)
 | ||
| 
 | ||
| 	res, err := client.requestWithStr(ctx, "POST", requestURL, string(bodyBytes), 200)
 | ||
| 	if err != nil {
 | ||
| 		return "", err
 | ||
| 	}
 | ||
| 
 | ||
| 	var (
 | ||
| 		decodeErr     error
 | ||
| 		uploadSession UploadSessionResponse
 | ||
| 	)
 | ||
| 	decodeErr = json.Unmarshal([]byte(res), &uploadSession)
 | ||
| 	if decodeErr != nil {
 | ||
| 		return "", decodeErr
 | ||
| 	}
 | ||
| 
 | ||
| 	return uploadSession.UploadURL, nil
 | ||
| }
 | ||
| 
 | ||
| // GetUploadSessionStatus 查询上传会话状态
 | ||
| func (client *Client) GetUploadSessionStatus(ctx context.Context, uploadURL string) (*UploadSessionResponse, error) {
 | ||
| 	res, err := client.requestWithStr(ctx, "GET", uploadURL, "", 200)
 | ||
| 	if err != nil {
 | ||
| 		return nil, err
 | ||
| 	}
 | ||
| 
 | ||
| 	var (
 | ||
| 		decodeErr     error
 | ||
| 		uploadSession UploadSessionResponse
 | ||
| 	)
 | ||
| 	decodeErr = json.Unmarshal([]byte(res), &uploadSession)
 | ||
| 	if decodeErr != nil {
 | ||
| 		return nil, decodeErr
 | ||
| 	}
 | ||
| 
 | ||
| 	return &uploadSession, nil
 | ||
| }
 | ||
| 
 | ||
| // UploadChunk 上传分片
 | ||
| func (client *Client) UploadChunk(ctx context.Context, uploadURL string, chunk *Chunk) (*UploadSessionResponse, error) {
 | ||
| 	res, err := client.request(
 | ||
| 		ctx, "PUT", uploadURL, bytes.NewReader(chunk.Data[0:chunk.ChunkSize]),
 | ||
| 		request.WithContentLength(int64(chunk.ChunkSize)),
 | ||
| 		request.WithHeader(http.Header{
 | ||
| 			"Content-Range": {fmt.Sprintf("bytes %d-%d/%d", chunk.Offset, chunk.Offset+chunk.ChunkSize-1, chunk.Total)},
 | ||
| 		}),
 | ||
| 		request.WithoutHeader([]string{"Authorization", "Content-Type"}),
 | ||
| 		request.WithTimeout(time.Duration(300)*time.Second),
 | ||
| 	)
 | ||
| 	if err != nil {
 | ||
| 		// 如果重试次数小于限制,5秒后重试
 | ||
| 		if chunk.Retried < model.GetIntSetting("onedrive_chunk_retries", 1) {
 | ||
| 			chunk.Retried++
 | ||
| 			util.Log().Debug("分片偏移%d上传失败[%s],5秒钟后重试", chunk.Offset, err)
 | ||
| 			time.Sleep(time.Duration(5) * time.Second)
 | ||
| 			return client.UploadChunk(ctx, uploadURL, chunk)
 | ||
| 		}
 | ||
| 		return nil, err
 | ||
| 	}
 | ||
| 
 | ||
| 	if chunk.IsLast() {
 | ||
| 		return nil, nil
 | ||
| 	}
 | ||
| 
 | ||
| 	var (
 | ||
| 		decodeErr error
 | ||
| 		uploadRes UploadSessionResponse
 | ||
| 	)
 | ||
| 	decodeErr = json.Unmarshal([]byte(res), &uploadRes)
 | ||
| 	if decodeErr != nil {
 | ||
| 		return nil, decodeErr
 | ||
| 	}
 | ||
| 
 | ||
| 	return &uploadRes, nil
 | ||
| }
 | ||
| 
 | ||
| // Upload 上传文件
 | ||
| func (client *Client) Upload(ctx context.Context, dst string, size int, file io.Reader) error {
 | ||
| 	// 小文件,使用简单上传接口上传
 | ||
| 	if size <= int(SmallFileSize) {
 | ||
| 		_, err := client.SimpleUpload(ctx, dst, file, int64(size))
 | ||
| 		return err
 | ||
| 	}
 | ||
| 
 | ||
| 	// 大文件,进行分片
 | ||
| 	// 创建上传会话
 | ||
| 	uploadURL, err := client.CreateUploadSession(ctx, dst, WithConflictBehavior("replace"))
 | ||
| 	if err != nil {
 | ||
| 		return err
 | ||
| 	}
 | ||
| 
 | ||
| 	offset := 0
 | ||
| 	chunkNum := size / int(ChunkSize)
 | ||
| 	if size%int(ChunkSize) != 0 {
 | ||
| 		chunkNum++
 | ||
| 	}
 | ||
| 
 | ||
| 	chunkData := make([]byte, ChunkSize)
 | ||
| 
 | ||
| 	for i := 0; i < chunkNum; i++ {
 | ||
| 		select {
 | ||
| 		case <-ctx.Done():
 | ||
| 			util.Log().Debug("OneDrive 客户端取消")
 | ||
| 			return ErrClientCanceled
 | ||
| 		default:
 | ||
| 			// 分块
 | ||
| 			chunkSize := int(ChunkSize)
 | ||
| 			if size-offset < chunkSize {
 | ||
| 				chunkSize = size - offset
 | ||
| 			}
 | ||
| 
 | ||
| 			// 因为后面需要错误重试,这里要把分片内容读到内存中
 | ||
| 			chunkContent := chunkData[:chunkSize]
 | ||
| 			_, err := io.ReadFull(file, chunkContent)
 | ||
| 
 | ||
| 			chunk := Chunk{
 | ||
| 				Offset:    offset,
 | ||
| 				ChunkSize: chunkSize,
 | ||
| 				Total:     size,
 | ||
| 				Data:      chunkContent,
 | ||
| 			}
 | ||
| 
 | ||
| 			// 上传
 | ||
| 			_, err = client.UploadChunk(ctx, uploadURL, &chunk)
 | ||
| 			if err != nil {
 | ||
| 				return err
 | ||
| 			}
 | ||
| 			offset += chunkSize
 | ||
| 		}
 | ||
| 
 | ||
| 	}
 | ||
| 	return nil
 | ||
| }
 | ||
| 
 | ||
| // DeleteUploadSession 删除上传会话
 | ||
| func (client *Client) DeleteUploadSession(ctx context.Context, uploadURL string) error {
 | ||
| 	_, err := client.requestWithStr(ctx, "DELETE", uploadURL, "", 204)
 | ||
| 	if err != nil {
 | ||
| 		return err
 | ||
| 	}
 | ||
| 
 | ||
| 	return nil
 | ||
| }
 | ||
| 
 | ||
| // SimpleUpload 上传小文件到dst
 | ||
| func (client *Client) SimpleUpload(ctx context.Context, dst string, body io.Reader, size int64) (*UploadResult, error) {
 | ||
| 	dst = strings.TrimPrefix(dst, "/")
 | ||
| 	requestURL := client.getRequestURL("me/drive/root:/" + dst + ":/content")
 | ||
| 
 | ||
| 	res, err := client.request(ctx, "PUT", requestURL, body, request.WithContentLength(int64(size)),
 | ||
| 		request.WithTimeout(time.Duration(150)*time.Second),
 | ||
| 	)
 | ||
| 	if err != nil {
 | ||
| 		retried := 0
 | ||
| 		if v, ok := ctx.Value(fsctx.RetryCtx).(int); ok {
 | ||
| 			retried = v
 | ||
| 		}
 | ||
| 		if retried < model.GetIntSetting("onedrive_chunk_retries", 1) {
 | ||
| 			retried++
 | ||
| 			util.Log().Debug("文件[%s]上传失败[%s],5秒钟后重试", dst, err)
 | ||
| 			time.Sleep(time.Duration(5) * time.Second)
 | ||
| 			return client.SimpleUpload(context.WithValue(ctx, fsctx.RetryCtx, retried), dst, body, size)
 | ||
| 		}
 | ||
| 		return nil, err
 | ||
| 	}
 | ||
| 
 | ||
| 	var (
 | ||
| 		decodeErr error
 | ||
| 		uploadRes UploadResult
 | ||
| 	)
 | ||
| 	decodeErr = json.Unmarshal([]byte(res), &uploadRes)
 | ||
| 	if decodeErr != nil {
 | ||
| 		return nil, decodeErr
 | ||
| 	}
 | ||
| 
 | ||
| 	return &uploadRes, nil
 | ||
| }
 | ||
| 
 | ||
| // BatchDelete 并行删除给出的文件,返回删除失败的文件,及第一个遇到的错误。此方法将文件分为
 | ||
| // 20个一组,调用Delete并行删除
 | ||
| // TODO 测试
 | ||
| func (client *Client) BatchDelete(ctx context.Context, dst []string) ([]string, error) {
 | ||
| 	groupNum := len(dst)/20 + 1
 | ||
| 	finalRes := make([]string, 0, len(dst))
 | ||
| 	res := make([]string, 0, 20)
 | ||
| 	var err error
 | ||
| 
 | ||
| 	for i := 0; i < groupNum; i++ {
 | ||
| 		end := 20*i + 20
 | ||
| 		if i == groupNum-1 {
 | ||
| 			end = len(dst)
 | ||
| 		}
 | ||
| 		res, err = client.Delete(ctx, dst[20*i:end])
 | ||
| 		finalRes = append(finalRes, res...)
 | ||
| 	}
 | ||
| 
 | ||
| 	return finalRes, err
 | ||
| }
 | ||
| 
 | ||
| // Delete 并行删除文件,返回删除失败的文件,及第一个遇到的错误,
 | ||
| // 由于API限制,最多删除20个
 | ||
| func (client *Client) Delete(ctx context.Context, dst []string) ([]string, error) {
 | ||
| 	body := client.makeBatchDeleteRequestsBody(dst)
 | ||
| 	res, err := client.requestWithStr(ctx, "POST", client.getRequestURL("$batch"), body, 200)
 | ||
| 	if err != nil {
 | ||
| 		return dst, err
 | ||
| 	}
 | ||
| 
 | ||
| 	var (
 | ||
| 		decodeErr error
 | ||
| 		deleteRes BatchResponses
 | ||
| 	)
 | ||
| 	decodeErr = json.Unmarshal([]byte(res), &deleteRes)
 | ||
| 	if decodeErr != nil {
 | ||
| 		return dst, decodeErr
 | ||
| 	}
 | ||
| 
 | ||
| 	// 取得删除失败的文件
 | ||
| 	failed := getDeleteFailed(&deleteRes)
 | ||
| 	if len(failed) != 0 {
 | ||
| 		return failed, ErrDeleteFile
 | ||
| 	}
 | ||
| 	return failed, nil
 | ||
| }
 | ||
| 
 | ||
| func getDeleteFailed(res *BatchResponses) []string {
 | ||
| 	var failed = make([]string, 0, len(res.Responses))
 | ||
| 	for _, v := range res.Responses {
 | ||
| 		if v.Status != 204 {
 | ||
| 			failed = append(failed, v.ID)
 | ||
| 		}
 | ||
| 	}
 | ||
| 	return failed
 | ||
| }
 | ||
| 
 | ||
| // makeBatchDeleteRequestsBody 生成批量删除请求正文
 | ||
| func (client *Client) makeBatchDeleteRequestsBody(files []string) string {
 | ||
| 	req := BatchRequests{
 | ||
| 		Requests: make([]BatchRequest, len(files)),
 | ||
| 	}
 | ||
| 	for i, v := range files {
 | ||
| 		v = strings.TrimPrefix(v, "/")
 | ||
| 		filePath, _ := url.Parse("/me/drive/root:/")
 | ||
| 		filePath.Path = path.Join(filePath.Path, v)
 | ||
| 		req.Requests[i] = BatchRequest{
 | ||
| 			ID:     v,
 | ||
| 			Method: "DELETE",
 | ||
| 			URL:    filePath.EscapedPath(),
 | ||
| 		}
 | ||
| 	}
 | ||
| 
 | ||
| 	res, _ := json.Marshal(req)
 | ||
| 	return string(res)
 | ||
| }
 | ||
| 
 | ||
| // GetThumbURL 获取给定尺寸的缩略图URL
 | ||
| func (client *Client) GetThumbURL(ctx context.Context, dst string, w, h uint) (string, error) {
 | ||
| 	dst = strings.TrimPrefix(dst, "/")
 | ||
| 	var (
 | ||
| 		cropOption string
 | ||
| 		requestURL string
 | ||
| 	)
 | ||
| 	if client.Endpoints.isInChina {
 | ||
| 		cropOption = "large"
 | ||
| 		requestURL = client.getRequestURL("me/drive/root:/"+dst+":/thumbnails/0") + "/" + cropOption
 | ||
| 	} else {
 | ||
| 		cropOption = fmt.Sprintf("c%dx%d_Crop", w, h)
 | ||
| 		requestURL = client.getRequestURL("me/drive/root:/"+dst+":/thumbnails") + "?select=" + cropOption
 | ||
| 	}
 | ||
| 
 | ||
| 	res, err := client.requestWithStr(ctx, "GET", requestURL, "", 200)
 | ||
| 	if err != nil {
 | ||
| 		return "", err
 | ||
| 	}
 | ||
| 
 | ||
| 	var (
 | ||
| 		decodeErr error
 | ||
| 		thumbRes  ThumbResponse
 | ||
| 	)
 | ||
| 	decodeErr = json.Unmarshal([]byte(res), &thumbRes)
 | ||
| 	if decodeErr != nil {
 | ||
| 		return "", decodeErr
 | ||
| 	}
 | ||
| 
 | ||
| 	if thumbRes.URL != "" {
 | ||
| 		return thumbRes.URL, nil
 | ||
| 	}
 | ||
| 
 | ||
| 	if len(thumbRes.Value) == 1 {
 | ||
| 		if res, ok := thumbRes.Value[0][cropOption]; ok {
 | ||
| 			return res.(map[string]interface{})["url"].(string), nil
 | ||
| 		}
 | ||
| 	}
 | ||
| 
 | ||
| 	return "", errors.New("无法生成缩略图")
 | ||
| }
 | ||
| 
 | ||
| // MonitorUpload 监控客户端分片上传进度
 | ||
| func (client *Client) MonitorUpload(uploadURL, callbackKey, path string, size uint64, ttl int64) {
 | ||
| 	// 回调完成通知chan
 | ||
| 	callbackChan := make(chan bool)
 | ||
| 	callbackSignal.Store(callbackKey, callbackChan)
 | ||
| 	defer callbackSignal.Delete(callbackKey)
 | ||
| 	timeout := model.GetIntSetting("onedrive_monitor_timeout", 600)
 | ||
| 	interval := model.GetIntSetting("onedrive_callback_check", 20)
 | ||
| 
 | ||
| 	for {
 | ||
| 		select {
 | ||
| 		case <-callbackChan:
 | ||
| 			util.Log().Debug("客户端完成回调")
 | ||
| 			return
 | ||
| 		case <-time.After(time.Duration(ttl) * time.Second):
 | ||
| 			// 上传会话到期,仍未完成上传,创建占位符
 | ||
| 			client.DeleteUploadSession(context.Background(), uploadURL)
 | ||
| 			_, err := client.SimpleUpload(context.Background(), path, strings.NewReader(""), 0)
 | ||
| 			if err != nil {
 | ||
| 				util.Log().Debug("无法创建占位文件,%s", err)
 | ||
| 			}
 | ||
| 			return
 | ||
| 		case <-time.After(time.Duration(timeout) * time.Second):
 | ||
| 			util.Log().Debug("检查上传情况")
 | ||
| 			status, err := client.GetUploadSessionStatus(context.Background(), uploadURL)
 | ||
| 
 | ||
| 			if err != nil {
 | ||
| 				if resErr, ok := err.(*RespError); ok {
 | ||
| 					if resErr.APIError.Code == "itemNotFound" {
 | ||
| 						util.Log().Debug("上传会话已完成,稍后检查回调")
 | ||
| 						time.Sleep(time.Duration(interval) * time.Second)
 | ||
| 						util.Log().Debug("开始检查回调")
 | ||
| 						_, ok := cache.Get("callback_" + callbackKey)
 | ||
| 						if ok {
 | ||
| 							util.Log().Warning("未发送回调,删除文件")
 | ||
| 							cache.Deletes([]string{callbackKey}, "callback_")
 | ||
| 							_, err = client.Delete(context.Background(), []string{path})
 | ||
| 							if err != nil {
 | ||
| 								util.Log().Warning("无法删除未回调的文件,%s", err)
 | ||
| 							}
 | ||
| 						}
 | ||
| 						return
 | ||
| 					}
 | ||
| 				}
 | ||
| 				util.Log().Debug("无法获取上传会话状态,继续下一轮,%s", err.Error())
 | ||
| 				continue
 | ||
| 			}
 | ||
| 
 | ||
| 			// 成功获取分片上传状态,检查文件大小
 | ||
| 			if len(status.NextExpectedRanges) == 0 {
 | ||
| 				continue
 | ||
| 			}
 | ||
| 			sizeRange := strings.Split(
 | ||
| 				status.NextExpectedRanges[len(status.NextExpectedRanges)-1],
 | ||
| 				"-",
 | ||
| 			)
 | ||
| 			if len(sizeRange) != 2 {
 | ||
| 				continue
 | ||
| 			}
 | ||
| 			uploadFullSize, _ := strconv.ParseUint(sizeRange[1], 10, 64)
 | ||
| 			if (sizeRange[0] == "0" && sizeRange[1] == "") || uploadFullSize+1 != size {
 | ||
| 				util.Log().Debug("未开始上传或文件大小不一致,取消上传会话")
 | ||
| 				// 取消上传会话,实测OneDrive取消上传会话后,客户端还是可以上传,
 | ||
| 				// 所以上传一个空文件占位,阻止客户端上传
 | ||
| 				client.DeleteUploadSession(context.Background(), uploadURL)
 | ||
| 				_, err := client.SimpleUpload(context.Background(), path, strings.NewReader(""), 0)
 | ||
| 				if err != nil {
 | ||
| 					util.Log().Debug("无法创建占位文件,%s", err)
 | ||
| 				}
 | ||
| 				return
 | ||
| 			}
 | ||
| 
 | ||
| 		}
 | ||
| 	}
 | ||
| }
 | ||
| 
 | ||
| // FinishCallback 向Monitor发送回调结束信号
 | ||
| func FinishCallback(key string) {
 | ||
| 	if signal, ok := callbackSignal.Load(key); ok {
 | ||
| 		if signalChan, ok := signal.(chan bool); ok {
 | ||
| 			close(signalChan)
 | ||
| 		}
 | ||
| 	}
 | ||
| }
 | ||
| 
 | ||
| func sysError(err error) *RespError {
 | ||
| 	return &RespError{APIError: APIError{
 | ||
| 		Code:    "system",
 | ||
| 		Message: err.Error(),
 | ||
| 	}}
 | ||
| }
 | ||
| 
 | ||
| func (client *Client) request(ctx context.Context, method string, url string, body io.Reader, option ...request.Option) (string, *RespError) {
 | ||
| 	// 获取凭证
 | ||
| 	err := client.UpdateCredential(ctx)
 | ||
| 	if err != nil {
 | ||
| 		return "", sysError(err)
 | ||
| 	}
 | ||
| 
 | ||
| 	option = append(option,
 | ||
| 		request.WithHeader(http.Header{
 | ||
| 			"Authorization": {"Bearer " + client.Credential.AccessToken},
 | ||
| 			"Content-Type":  {"application/json"},
 | ||
| 		}),
 | ||
| 		request.WithContext(ctx),
 | ||
| 	)
 | ||
| 
 | ||
| 	// 发送请求
 | ||
| 	res := client.Request.Request(
 | ||
| 		method,
 | ||
| 		url,
 | ||
| 		body,
 | ||
| 		option...,
 | ||
| 	)
 | ||
| 
 | ||
| 	if res.Err != nil {
 | ||
| 		return "", sysError(res.Err)
 | ||
| 	}
 | ||
| 
 | ||
| 	respBody, err := res.GetResponse()
 | ||
| 	if err != nil {
 | ||
| 		return "", sysError(err)
 | ||
| 	}
 | ||
| 
 | ||
| 	// 解析请求响应
 | ||
| 	var (
 | ||
| 		errResp   RespError
 | ||
| 		decodeErr error
 | ||
| 	)
 | ||
| 	// 如果有错误
 | ||
| 	if res.Response.StatusCode < 200 || res.Response.StatusCode >= 300 {
 | ||
| 		decodeErr = json.Unmarshal([]byte(respBody), &errResp)
 | ||
| 		if decodeErr != nil {
 | ||
| 			util.Log().Debug("Onedrive返回未知响应[%s]", respBody)
 | ||
| 			return "", sysError(decodeErr)
 | ||
| 		}
 | ||
| 		return "", &errResp
 | ||
| 	}
 | ||
| 
 | ||
| 	return respBody, nil
 | ||
| }
 | ||
| 
 | ||
| func (client *Client) requestWithStr(ctx context.Context, method string, url string, body string, expectedCode int) (string, *RespError) {
 | ||
| 	// 发送请求
 | ||
| 	bodyReader := ioutil.NopCloser(strings.NewReader(body))
 | ||
| 	return client.request(ctx, method, url, bodyReader,
 | ||
| 		request.WithContentLength(int64(len(body))),
 | ||
| 	)
 | ||
| }
 | 
