Vendor c/image after merging c/image#536

... which adds blob info caching

Signed-off-by: Miloslav Trmač <mitr@redhat.com>
This commit is contained in:
Miloslav Trmač
2018-11-23 11:51:48 +01:00
parent 1f547b2936
commit 79583c82ee
28 changed files with 1238 additions and 225 deletions

View File

@@ -15,6 +15,7 @@ import (
"github.com/containers/image/docker/reference"
"github.com/containers/image/manifest"
"github.com/containers/image/pkg/blobinfocache"
"github.com/containers/image/types"
"github.com/docker/distribution/registry/api/errcode"
"github.com/docker/distribution/registry/api/v2"
@@ -113,17 +114,21 @@ func (c *sizeCounter) Write(p []byte) (n int, err error) {
// PutBlob writes contents of stream and returns data representing the result (with all data filled in).
// inputInfo.Digest can be optionally provided if known; it is not mandatory for the implementation to verify it.
// inputInfo.Size is the expected length of stream, if known.
// May update cache.
// WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available
// to any other readers for download using the supplied digest.
// If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far.
func (d *dockerImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, isConfig bool) (types.BlobInfo, error) {
func (d *dockerImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) {
if inputInfo.Digest.String() != "" {
haveBlob, size, err := d.HasBlob(ctx, inputInfo)
// This should not really be necessary, at least the copy code calls TryReusingBlob automatically.
// Still, we need to check, if only because the "initiate upload" endpoint does not have a documented "blob already exists" return value.
// But we do that with NoCache, so that it _only_ checks the primary destination, instead of trying all mount candidates _again_.
haveBlob, reusedInfo, err := d.TryReusingBlob(ctx, inputInfo, blobinfocache.NoCache, false)
if err != nil {
return types.BlobInfo{}, err
}
if haveBlob {
return types.BlobInfo{Digest: inputInfo.Digest, Size: size}, nil
return reusedInfo, nil
}
}
@@ -160,7 +165,7 @@ func (d *dockerImageDestination) PutBlob(ctx context.Context, stream io.Reader,
return types.BlobInfo{}, errors.Wrap(err, "Error determining upload URL")
}
// FIXME: DELETE uploadLocation on failure
// FIXME: DELETE uploadLocation on failure (does not really work in docker/distribution servers, which incorrectly require the "delete" action in the token's scope)
locationQuery := uploadLocation.Query()
// TODO: check inputInfo.Digest == computedDigest https://github.com/containers/image/pull/70#discussion_r77646717
@@ -177,19 +182,15 @@ func (d *dockerImageDestination) PutBlob(ctx context.Context, stream io.Reader,
}
logrus.Debugf("Upload of layer %s complete", computedDigest)
cache.RecordKnownLocation(d.ref.Transport(), bicTransportScope(d.ref), computedDigest, newBICLocationReference(d.ref))
return types.BlobInfo{Digest: computedDigest, Size: sizeCounter.size}, nil
}
// HasBlob returns true iff the image destination already contains a blob with the matching digest which can be reapplied using ReapplyBlob.
// Unlike PutBlob, the digest can not be empty. If HasBlob returns true, the size of the blob must also be returned.
// If the destination does not contain the blob, or it is unknown, HasBlob ordinarily returns (false, -1, nil);
// blobExists returns true iff repo contains a blob with digest, and if so, also its size.
// If the destination does not contain the blob, or it is unknown, blobExists ordinarily returns (false, -1, nil);
// it returns a non-nil error only on an unexpected failure.
func (d *dockerImageDestination) HasBlob(ctx context.Context, info types.BlobInfo) (bool, int64, error) {
if info.Digest == "" {
return false, -1, errors.Errorf(`"Can not check for a blob with unknown digest`)
}
checkPath := fmt.Sprintf(blobsPath, reference.Path(d.ref.ref), info.Digest.String())
func (d *dockerImageDestination) blobExists(ctx context.Context, repo reference.Named, digest digest.Digest) (bool, int64, error) {
checkPath := fmt.Sprintf(blobsPath, reference.Path(repo), digest.String())
logrus.Debugf("Checking %s", checkPath)
res, err := d.c.makeRequest(ctx, "HEAD", checkPath, nil, nil, v2Auth)
if err != nil {
@@ -202,7 +203,7 @@ func (d *dockerImageDestination) HasBlob(ctx context.Context, info types.BlobInf
return true, getBlobSize(res), nil
case http.StatusUnauthorized:
logrus.Debugf("... not authorized")
return false, -1, errors.Wrapf(client.HandleErrorResponse(res), "Error checking whether a blob %s exists in %s", info.Digest, d.ref.ref.Name())
return false, -1, errors.Wrapf(client.HandleErrorResponse(res), "Error checking whether a blob %s exists in %s", digest, repo.Name())
case http.StatusNotFound:
logrus.Debugf("... not present")
return false, -1, nil
@@ -211,8 +212,134 @@ func (d *dockerImageDestination) HasBlob(ctx context.Context, info types.BlobInf
}
}
func (d *dockerImageDestination) ReapplyBlob(ctx context.Context, info types.BlobInfo) (types.BlobInfo, error) {
return info, nil
// mountBlob tries to mount blob srcDigest from srcRepo to the current destination.
func (d *dockerImageDestination) mountBlob(ctx context.Context, srcRepo reference.Named, srcDigest digest.Digest) error {
u := url.URL{
Path: fmt.Sprintf(blobUploadPath, reference.Path(d.ref.ref)),
RawQuery: url.Values{
"mount": {srcDigest.String()},
"from": {reference.Path(srcRepo)},
}.Encode(),
}
mountPath := u.String()
logrus.Debugf("Trying to mount %s", mountPath)
res, err := d.c.makeRequest(ctx, "POST", mountPath, nil, nil, v2Auth)
if err != nil {
return err
}
defer res.Body.Close()
switch res.StatusCode {
case http.StatusCreated:
logrus.Debugf("... mount OK")
return nil
case http.StatusAccepted:
// Oops, the mount was ignored - either the registry does not support that yet, or the blob does not exist; the registry has started an ordinary upload process.
// Abort, and let the ultimate caller do an upload when its ready, instead.
// NOTE: This does not really work in docker/distribution servers, which incorrectly require the "delete" action in the token's scope, and is thus entirely untested.
uploadLocation, err := res.Location()
if err != nil {
return errors.Wrap(err, "Error determining upload URL after a mount attempt")
}
logrus.Debugf("... started an upload instead of mounting, trying to cancel at %s", uploadLocation.String())
res2, err := d.c.makeRequestToResolvedURL(ctx, "DELETE", uploadLocation.String(), nil, nil, -1, v2Auth)
if err != nil {
logrus.Debugf("Error trying to cancel an inadvertent upload: %s", err)
} else {
defer res2.Body.Close()
if res2.StatusCode != http.StatusNoContent {
logrus.Debugf("Error trying to cancel an inadvertent upload, status %s", http.StatusText(res.StatusCode))
}
}
// Anyway, if canceling the upload fails, ignore it and return the more important error:
return fmt.Errorf("Mounting %s from %s to %s started an upload instead", srcDigest, srcRepo.Name(), d.ref.ref.Name())
default:
logrus.Debugf("Error mounting, response %#v", *res)
return errors.Wrapf(client.HandleErrorResponse(res), "Error mounting %s from %s to %s", srcDigest, srcRepo.Name(), d.ref.ref.Name())
}
}
// TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination
// (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree).
// info.Digest must not be empty.
// If canSubstitute, TryReusingBlob can use an equivalent equivalent of the desired blob; in that case the returned info may not match the input.
// If the blob has been succesfully reused, returns (true, info, nil); info must contain at least a digest and size.
// If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure.
// May use and/or update cache.
func (d *dockerImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) {
if info.Digest == "" {
return false, types.BlobInfo{}, errors.Errorf(`"Can not check for a blob with unknown digest`)
}
// First, check whether the blob happens to already exist at the destination.
exists, size, err := d.blobExists(ctx, d.ref.ref, info.Digest)
if err != nil {
return false, types.BlobInfo{}, err
}
if exists {
cache.RecordKnownLocation(d.ref.Transport(), bicTransportScope(d.ref), info.Digest, newBICLocationReference(d.ref))
return true, types.BlobInfo{Digest: info.Digest, Size: size}, nil
}
// Then try reusing blobs from other locations.
// Checking candidateRepo, and mounting from it, requires an expanded token scope.
// We still want to reuse the ping information and other aspects of the client, so rather than make a fresh copy, there is this a bit ugly extraScope hack.
if d.c.extraScope != nil {
return false, types.BlobInfo{}, errors.New("Internal error: dockerClient.extraScope was set before TryReusingBlob")
}
defer func() {
d.c.extraScope = nil
}()
for _, candidate := range cache.CandidateLocations(d.ref.Transport(), bicTransportScope(d.ref), info.Digest, canSubstitute) {
candidateRepo, err := parseBICLocationReference(candidate.Location)
if err != nil {
logrus.Debugf("Error parsing BlobInfoCache location reference: %s", err)
continue
}
logrus.Debugf("Trying to reuse cached location %s in %s", candidate.Digest.String(), candidateRepo.Name())
// Sanity checks:
if reference.Domain(candidateRepo) != reference.Domain(d.ref.ref) {
logrus.Debugf("... Internal error: domain %s does not match destination %s", reference.Domain(candidateRepo), reference.Domain(d.ref.ref))
continue
}
if candidateRepo.Name() == d.ref.ref.Name() && candidate.Digest == info.Digest {
logrus.Debug("... Already tried the primary destination")
continue
}
// Whatever happens here, don't abort the entire operation. It's likely we just don't have permissions, and if it is a critical network error, we will find out soon enough anyway.
d.c.extraScope = &authScope{
remoteName: reference.Path(candidateRepo),
actions: "pull",
}
// This existence check is not, strictly speaking, necessary: We only _really_ need it to get the blob size, and we could record that in the cache instead.
// But a "failed" d.mountBlob currently leaves around an unterminated server-side upload, which we would try to cancel.
// So, without this existence check, it would be 1 request on success, 2 requests on failure; with it, it is 2 requests on success, 1 request on failure.
// On success we avoid the actual costly upload; so, in a sense, the success case is "free", but failures are always costly.
// Even worse, docker/distribution does not actually reasonably implement canceling uploads
// (it would require a "delete" action in the token, and Quay does not give that to anyone, so we can't ask);
// so, be a nice client and don't create unnecesary upload sessions on the server.
exists, size, err := d.blobExists(ctx, candidateRepo, candidate.Digest)
if err != nil {
logrus.Debugf("... Failed: %v", err)
continue
}
if !exists {
// FIXME? Should we drop the blob from cache here (and elsewhere?)?
continue // logrus.Debug() already happened in blobExists
}
if candidateRepo.Name() != d.ref.ref.Name() {
if err := d.mountBlob(ctx, candidateRepo, candidate.Digest); err != nil {
logrus.Debugf("... Mount failed: %v", err)
continue
}
}
cache.RecordKnownLocation(d.ref.Transport(), bicTransportScope(d.ref), candidate.Digest, newBICLocationReference(d.ref))
return true, types.BlobInfo{Digest: candidate.Digest, Size: size}, nil
}
return false, types.BlobInfo{}, nil
}
// PutManifest writes manifest to the destination.