mirror of
https://github.com/rclone/rclone.git
synced 2025-12-04 13:49:49 +02:00
azureblob: add metadata and tags support across upload and copy paths
This change adds first-class metadata support to the Azure Blob backend, including headers, user metadata, tags, and modtime overrides, and wires it through uploads and server-side copies. There is a behavior change in that rclone will now set the "mtime" custom metadata when doing server side copies to azure and the `--metadata` argument is given. - Map standard headers: cache-control, content-disposition, content-encoding, content-language, content-type to corresponding x-ms-blob-* HTTP headers. - Map user metadata: any non-reserved keys (excluding x-ms-*) are sent as blob user metadata. Keys are normalized to lowercase for consistency. - Support tags: parse `x-ms-tags` as a comma-separated list of key=value pairs and apply them on uploads and copies. - Support mtime override: accept `mtime` in metadata (RFC3339/RFC3339Nano) to override the stored modtime persisted in user metadata.
This commit is contained in:
@@ -86,12 +86,56 @@ var (
|
||||
metadataMu sync.Mutex
|
||||
)
|
||||
|
||||
// system metadata keys which this backend owns
|
||||
var systemMetadataInfo = map[string]fs.MetadataHelp{
|
||||
"cache-control": {
|
||||
Help: "Cache-Control header",
|
||||
Type: "string",
|
||||
Example: "no-cache",
|
||||
},
|
||||
"content-disposition": {
|
||||
Help: "Content-Disposition header",
|
||||
Type: "string",
|
||||
Example: "inline",
|
||||
},
|
||||
"content-encoding": {
|
||||
Help: "Content-Encoding header",
|
||||
Type: "string",
|
||||
Example: "gzip",
|
||||
},
|
||||
"content-language": {
|
||||
Help: "Content-Language header",
|
||||
Type: "string",
|
||||
Example: "en-US",
|
||||
},
|
||||
"content-type": {
|
||||
Help: "Content-Type header",
|
||||
Type: "string",
|
||||
Example: "text/plain",
|
||||
},
|
||||
"tier": {
|
||||
Help: "Tier of the object",
|
||||
Type: "string",
|
||||
Example: "Hot",
|
||||
ReadOnly: true,
|
||||
},
|
||||
"mtime": {
|
||||
Help: "Time of last modification, read from rclone metadata",
|
||||
Type: "RFC 3339",
|
||||
Example: "2006-01-02T15:04:05.999999999Z07:00",
|
||||
},
|
||||
}
|
||||
|
||||
// Register with Fs
|
||||
func init() {
|
||||
fs.Register(&fs.RegInfo{
|
||||
Name: "azureblob",
|
||||
Description: "Microsoft Azure Blob Storage",
|
||||
NewFs: NewFs,
|
||||
MetadataInfo: &fs.MetadataInfo{
|
||||
System: systemMetadataInfo,
|
||||
Help: `User metadata is stored as x-ms-meta- keys. Azure metadata keys are case insensitive and are always returned in lower case.`,
|
||||
},
|
||||
Options: []fs.Option{{
|
||||
Name: "account",
|
||||
Help: `Azure Storage Account Name.
|
||||
@@ -810,6 +854,9 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
|
||||
f.features = (&fs.Features{
|
||||
ReadMimeType: true,
|
||||
WriteMimeType: true,
|
||||
ReadMetadata: true,
|
||||
WriteMetadata: true,
|
||||
UserMetadata: true,
|
||||
BucketBased: true,
|
||||
BucketBasedRootOK: true,
|
||||
SetTier: true,
|
||||
@@ -1157,6 +1204,289 @@ func (o *Object) updateMetadataWithModTime(modTime time.Time) {
|
||||
o.meta[modTimeKey] = modTime.Format(timeFormatOut)
|
||||
}
|
||||
|
||||
// parseXMsTags parses the value of the x-ms-tags header into a map.
|
||||
// It expects comma-separated key=value pairs. Whitespace around keys and
|
||||
// values is trimmed. Empty pairs and empty keys are rejected.
|
||||
func parseXMsTags(s string) (map[string]string, error) {
|
||||
if strings.TrimSpace(s) == "" {
|
||||
return map[string]string{}, nil
|
||||
}
|
||||
out := make(map[string]string)
|
||||
parts := strings.Split(s, ",")
|
||||
for _, p := range parts {
|
||||
p = strings.TrimSpace(p)
|
||||
if p == "" {
|
||||
continue
|
||||
}
|
||||
kv := strings.SplitN(p, "=", 2)
|
||||
if len(kv) != 2 {
|
||||
return nil, fmt.Errorf("invalid tag %q", p)
|
||||
}
|
||||
k := strings.TrimSpace(kv[0])
|
||||
v := strings.TrimSpace(kv[1])
|
||||
if k == "" {
|
||||
return nil, fmt.Errorf("invalid tag key in %q", p)
|
||||
}
|
||||
out[k] = v
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// mapMetadataToAzure maps a generic metadata map to Azure HTTP headers,
|
||||
// user metadata, tags and optional modTime override.
|
||||
// Reserved x-ms-* keys (except x-ms-tags) are ignored for user metadata.
|
||||
//
|
||||
// Pass a logger to surface non-fatal parsing issues (e.g. bad mtime).
|
||||
func mapMetadataToAzure(meta map[string]string, logf func(string, ...any)) (headers blob.HTTPHeaders, userMeta map[string]*string, tags map[string]string, modTime *time.Time, err error) {
|
||||
if meta == nil {
|
||||
return headers, nil, nil, nil, nil
|
||||
}
|
||||
tmp := make(map[string]string)
|
||||
for k, v := range meta {
|
||||
lowerKey := strings.ToLower(k)
|
||||
switch lowerKey {
|
||||
case "cache-control":
|
||||
headers.BlobCacheControl = pString(v)
|
||||
case "content-disposition":
|
||||
headers.BlobContentDisposition = pString(v)
|
||||
case "content-encoding":
|
||||
headers.BlobContentEncoding = pString(v)
|
||||
case "content-language":
|
||||
headers.BlobContentLanguage = pString(v)
|
||||
case "content-type":
|
||||
headers.BlobContentType = pString(v)
|
||||
case "x-ms-tags":
|
||||
parsed, perr := parseXMsTags(v)
|
||||
if perr != nil {
|
||||
return headers, nil, nil, nil, perr
|
||||
}
|
||||
// allocate only if there are tags
|
||||
if len(parsed) > 0 {
|
||||
tags = parsed
|
||||
}
|
||||
case "mtime":
|
||||
// Accept multiple layouts for tolerance
|
||||
var parsed time.Time
|
||||
var pErr error
|
||||
for _, layout := range []string{time.RFC3339Nano, time.RFC3339, timeFormatOut} {
|
||||
parsed, pErr = time.Parse(layout, v)
|
||||
if pErr == nil {
|
||||
modTime = &parsed
|
||||
break
|
||||
}
|
||||
}
|
||||
// Log and ignore if unparseable
|
||||
if modTime == nil && logf != nil {
|
||||
logf("metadata: couldn't parse mtime %q: %v", v, pErr)
|
||||
}
|
||||
case "tier":
|
||||
// ignore - handled elsewhere
|
||||
default:
|
||||
// Filter out other reserved headers so they don't end up as user metadata
|
||||
if strings.HasPrefix(lowerKey, "x-ms-") {
|
||||
continue
|
||||
}
|
||||
tmp[lowerKey] = v
|
||||
}
|
||||
}
|
||||
userMeta = toAzureMetaPtr(tmp)
|
||||
return headers, userMeta, tags, modTime, nil
|
||||
}
|
||||
|
||||
// toAzureMetaPtr converts a map[string]string to map[string]*string as used by Azure SDK
|
||||
func toAzureMetaPtr(in map[string]string) map[string]*string {
|
||||
if len(in) == 0 {
|
||||
return nil
|
||||
}
|
||||
out := make(map[string]*string, len(in))
|
||||
for k, v := range in {
|
||||
vv := v
|
||||
out[k] = &vv
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// assembleCopyParams prepares headers, metadata and tags for copy operations.
|
||||
//
|
||||
// It starts from the source properties, optionally overlays mapped metadata
|
||||
// from rclone's metadata options, ensures mtime presence when mapping is
|
||||
// enabled, and returns whether mapping was actually requested (hadMapping).
|
||||
// assembleCopyParams prepares headers, metadata and tags for copy operations.
|
||||
//
|
||||
// If includeBaseMeta is true, start user metadata from the source's metadata
|
||||
// and overlay mapped values. This matches multipart copy commit behavior.
|
||||
// If false, only include mapped user metadata (no source baseline) which
|
||||
// matches previous singlepart StartCopyFromURL semantics.
|
||||
func assembleCopyParams(ctx context.Context, f *Fs, src fs.Object, srcProps *blob.GetPropertiesResponse, includeBaseMeta bool) (headers blob.HTTPHeaders, meta map[string]*string, tags map[string]string, hadMapping bool, err error) {
|
||||
// Start from source properties
|
||||
headers = blob.HTTPHeaders{
|
||||
BlobCacheControl: srcProps.CacheControl,
|
||||
BlobContentDisposition: srcProps.ContentDisposition,
|
||||
BlobContentEncoding: srcProps.ContentEncoding,
|
||||
BlobContentLanguage: srcProps.ContentLanguage,
|
||||
BlobContentMD5: srcProps.ContentMD5,
|
||||
BlobContentType: srcProps.ContentType,
|
||||
}
|
||||
// Optionally deep copy user metadata pointers from source. Normalise keys to
|
||||
// lower-case to avoid duplicate x-ms-meta headers when we later inject/overlay
|
||||
// metadata (Azure treats keys case-insensitively but Go's http.Header will
|
||||
// join duplicate keys into a comma separated list, which breaks shared-key
|
||||
// signing).
|
||||
if includeBaseMeta && len(srcProps.Metadata) > 0 {
|
||||
meta = make(map[string]*string, len(srcProps.Metadata))
|
||||
for k, v := range srcProps.Metadata {
|
||||
if v != nil {
|
||||
vv := *v
|
||||
meta[strings.ToLower(k)] = &vv
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Only consider mapping if metadata pipeline is enabled
|
||||
if fs.GetConfig(ctx).Metadata {
|
||||
mapped, mapErr := fs.GetMetadataOptions(ctx, f, src, fs.MetadataAsOpenOptions(ctx))
|
||||
if mapErr != nil {
|
||||
return headers, meta, nil, false, fmt.Errorf("failed to map metadata: %w", mapErr)
|
||||
}
|
||||
if mapped != nil {
|
||||
// Map rclone metadata to Azure shapes
|
||||
mappedHeaders, userMeta, mappedTags, mappedModTime, herr := mapMetadataToAzure(mapped, func(format string, args ...any) { fs.Debugf(f, format, args...) })
|
||||
if herr != nil {
|
||||
return headers, meta, nil, false, fmt.Errorf("metadata mapping: %w", herr)
|
||||
}
|
||||
hadMapping = true
|
||||
// Overlay headers (only non-nil)
|
||||
if mappedHeaders.BlobCacheControl != nil {
|
||||
headers.BlobCacheControl = mappedHeaders.BlobCacheControl
|
||||
}
|
||||
if mappedHeaders.BlobContentDisposition != nil {
|
||||
headers.BlobContentDisposition = mappedHeaders.BlobContentDisposition
|
||||
}
|
||||
if mappedHeaders.BlobContentEncoding != nil {
|
||||
headers.BlobContentEncoding = mappedHeaders.BlobContentEncoding
|
||||
}
|
||||
if mappedHeaders.BlobContentLanguage != nil {
|
||||
headers.BlobContentLanguage = mappedHeaders.BlobContentLanguage
|
||||
}
|
||||
if mappedHeaders.BlobContentType != nil {
|
||||
headers.BlobContentType = mappedHeaders.BlobContentType
|
||||
}
|
||||
// Overlay user metadata
|
||||
if len(userMeta) > 0 {
|
||||
if meta == nil {
|
||||
meta = make(map[string]*string, len(userMeta))
|
||||
}
|
||||
for k, v := range userMeta {
|
||||
meta[k] = v
|
||||
}
|
||||
}
|
||||
// Apply tags if any
|
||||
if len(mappedTags) > 0 {
|
||||
tags = mappedTags
|
||||
}
|
||||
// Ensure mtime present using mapped or source time
|
||||
if _, ok := meta[modTimeKey]; !ok {
|
||||
when := src.ModTime(ctx)
|
||||
if mappedModTime != nil {
|
||||
when = *mappedModTime
|
||||
}
|
||||
val := when.Format(time.RFC3339Nano)
|
||||
if meta == nil {
|
||||
meta = make(map[string]*string, 1)
|
||||
}
|
||||
meta[modTimeKey] = &val
|
||||
}
|
||||
// Ensure content-type fallback to source if not set by mapper
|
||||
if headers.BlobContentType == nil {
|
||||
headers.BlobContentType = srcProps.ContentType
|
||||
}
|
||||
} else {
|
||||
// Mapping enabled but not provided: ensure mtime present based on source ModTime
|
||||
if _, ok := meta[modTimeKey]; !ok {
|
||||
when := src.ModTime(ctx)
|
||||
val := when.Format(time.RFC3339Nano)
|
||||
if meta == nil {
|
||||
meta = make(map[string]*string, 1)
|
||||
}
|
||||
meta[modTimeKey] = &val
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return headers, meta, tags, hadMapping, nil
|
||||
}
|
||||
|
||||
// applyMappedMetadata applies mapped metadata and headers to the object state for uploads.
|
||||
//
|
||||
// It reads `--metadata`, `--metadata-set`, and `--metadata-mapper` outputs via fs.GetMetadataOptions
|
||||
// and updates o.meta, o.tags and ui.httpHeaders accordingly.
|
||||
func (o *Object) applyMappedMetadata(ctx context.Context, src fs.ObjectInfo, ui *uploadInfo, options []fs.OpenOption) (modTime time.Time, err error) {
|
||||
// Start from the source modtime; may be overridden by metadata
|
||||
modTime = src.ModTime(ctx)
|
||||
|
||||
// Fetch mapped metadata if --metadata is enabled
|
||||
meta, err := fs.GetMetadataOptions(ctx, o.fs, src, options)
|
||||
if err != nil {
|
||||
return modTime, err
|
||||
}
|
||||
if meta == nil {
|
||||
// No metadata processing requested
|
||||
return modTime, nil
|
||||
}
|
||||
|
||||
// Map metadata using common helper
|
||||
headers, userMeta, tags, mappedModTime, err := mapMetadataToAzure(meta, func(format string, args ...any) { fs.Debugf(o, format, args...) })
|
||||
if err != nil {
|
||||
return modTime, err
|
||||
}
|
||||
// Merge headers into ui
|
||||
if headers.BlobCacheControl != nil {
|
||||
ui.httpHeaders.BlobCacheControl = headers.BlobCacheControl
|
||||
}
|
||||
if headers.BlobContentDisposition != nil {
|
||||
ui.httpHeaders.BlobContentDisposition = headers.BlobContentDisposition
|
||||
}
|
||||
if headers.BlobContentEncoding != nil {
|
||||
ui.httpHeaders.BlobContentEncoding = headers.BlobContentEncoding
|
||||
}
|
||||
if headers.BlobContentLanguage != nil {
|
||||
ui.httpHeaders.BlobContentLanguage = headers.BlobContentLanguage
|
||||
}
|
||||
if headers.BlobContentType != nil {
|
||||
ui.httpHeaders.BlobContentType = headers.BlobContentType
|
||||
}
|
||||
|
||||
// Apply user metadata to o.meta with a single critical section
|
||||
if len(userMeta) > 0 {
|
||||
metadataMu.Lock()
|
||||
if o.meta == nil {
|
||||
o.meta = make(map[string]string, len(userMeta))
|
||||
}
|
||||
for k, v := range userMeta {
|
||||
if v != nil {
|
||||
o.meta[k] = *v
|
||||
}
|
||||
}
|
||||
metadataMu.Unlock()
|
||||
}
|
||||
|
||||
// Apply tags
|
||||
if len(tags) > 0 {
|
||||
if o.tags == nil {
|
||||
o.tags = make(map[string]string, len(tags))
|
||||
}
|
||||
for k, v := range tags {
|
||||
o.tags[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
if mappedModTime != nil {
|
||||
modTime = *mappedModTime
|
||||
}
|
||||
|
||||
return modTime, nil
|
||||
}
|
||||
|
||||
// Returns whether file is a directory marker or not
|
||||
func isDirectoryMarker(size int64, metadata map[string]*string, remote string) bool {
|
||||
// Directory markers are 0 length
|
||||
@@ -1951,18 +2281,19 @@ func (f *Fs) copyMultipart(ctx context.Context, remote, dstContainer, dstPath st
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Convert metadata from source object
|
||||
// Prepare metadata/headers/tags for destination
|
||||
// For multipart commit, include base metadata from source then overlay mapped
|
||||
commitHeaders, commitMeta, commitTags, _, err := assembleCopyParams(ctx, f, src, srcProperties, true)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("multipart copy: %w", err)
|
||||
}
|
||||
|
||||
// Convert metadata from source or mapper
|
||||
options := blockblob.CommitBlockListOptions{
|
||||
Metadata: srcProperties.Metadata,
|
||||
Tier: parseTier(f.opt.AccessTier),
|
||||
HTTPHeaders: &blob.HTTPHeaders{
|
||||
BlobCacheControl: srcProperties.CacheControl,
|
||||
BlobContentDisposition: srcProperties.ContentDisposition,
|
||||
BlobContentEncoding: srcProperties.ContentEncoding,
|
||||
BlobContentLanguage: srcProperties.ContentLanguage,
|
||||
BlobContentMD5: srcProperties.ContentMD5,
|
||||
BlobContentType: srcProperties.ContentType,
|
||||
},
|
||||
Metadata: commitMeta,
|
||||
Tags: commitTags,
|
||||
Tier: parseTier(f.opt.AccessTier),
|
||||
HTTPHeaders: &commitHeaders,
|
||||
}
|
||||
|
||||
// Finalise the upload session
|
||||
@@ -1993,10 +2324,36 @@ func (f *Fs) copySinglepart(ctx context.Context, remote, dstContainer, dstPath s
|
||||
return nil, fmt.Errorf("single part copy: source auth: %w", err)
|
||||
}
|
||||
|
||||
// Start the copy
|
||||
// Prepare mapped metadata/tags/headers if requested
|
||||
options := blob.StartCopyFromURLOptions{
|
||||
Tier: parseTier(f.opt.AccessTier),
|
||||
}
|
||||
var postHeaders *blob.HTTPHeaders
|
||||
// Read source properties and assemble params; this also handles the case when mapping is disabled
|
||||
srcProps, err := src.readMetaDataAlways(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("single part copy: read source properties: %w", err)
|
||||
}
|
||||
// For singlepart copy, do not include base metadata from source in StartCopyFromURL
|
||||
headers, meta, tags, hadMapping, aerr := assembleCopyParams(ctx, f, src, srcProps, false)
|
||||
if aerr != nil {
|
||||
return nil, fmt.Errorf("single part copy: %w", aerr)
|
||||
}
|
||||
// Apply tags and post-copy headers only when mapping requested changes
|
||||
if len(tags) > 0 {
|
||||
options.BlobTags = make(map[string]string, len(tags))
|
||||
for k, v := range tags {
|
||||
options.BlobTags[k] = v
|
||||
}
|
||||
}
|
||||
if hadMapping {
|
||||
// Only set metadata explicitly when mapping was requested; otherwise
|
||||
// let the service copy source metadata (including mtime) automatically.
|
||||
if len(meta) > 0 {
|
||||
options.Metadata = meta
|
||||
}
|
||||
postHeaders = &headers
|
||||
}
|
||||
var startCopy blob.StartCopyFromURLResponse
|
||||
err = f.pacer.Call(func() (bool, error) {
|
||||
startCopy, err = dstBlobSVC.StartCopyFromURL(ctx, srcURL, &options)
|
||||
@@ -2026,6 +2383,16 @@ func (f *Fs) copySinglepart(ctx context.Context, remote, dstContainer, dstPath s
|
||||
pollTime = min(2*pollTime, time.Second)
|
||||
}
|
||||
|
||||
// If mapper requested header changes, set them post-copy
|
||||
if postHeaders != nil {
|
||||
blb := f.getBlobSVC(dstContainer, dstPath)
|
||||
_, setErr := blb.SetHTTPHeaders(ctx, *postHeaders, nil)
|
||||
if setErr != nil {
|
||||
return nil, fmt.Errorf("single part copy: failed to set headers: %w", setErr)
|
||||
}
|
||||
}
|
||||
// Metadata (when requested) is set via StartCopyFromURL options.Metadata
|
||||
|
||||
return f.NewObject(ctx, remote)
|
||||
}
|
||||
|
||||
@@ -2157,6 +2524,35 @@ func (o *Object) getMetadata() (metadata map[string]*string) {
|
||||
return metadata
|
||||
}
|
||||
|
||||
// Metadata returns metadata for an object
|
||||
//
|
||||
// It returns a combined view of system and user metadata.
|
||||
func (o *Object) Metadata(ctx context.Context) (fs.Metadata, error) {
|
||||
// Ensure metadata is loaded
|
||||
if err := o.readMetaData(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
m := fs.Metadata{}
|
||||
|
||||
// System metadata we expose
|
||||
if !o.modTime.IsZero() {
|
||||
m["mtime"] = o.modTime.Format(time.RFC3339Nano)
|
||||
}
|
||||
if o.accessTier != "" {
|
||||
m["tier"] = string(o.accessTier)
|
||||
}
|
||||
|
||||
// Merge user metadata (already lower-cased keys)
|
||||
metadataMu.Lock()
|
||||
for k, v := range o.meta {
|
||||
m[k] = v
|
||||
}
|
||||
metadataMu.Unlock()
|
||||
|
||||
return m, nil
|
||||
}
|
||||
|
||||
// decodeMetaDataFromPropertiesResponse sets the metadata from the data passed in
|
||||
//
|
||||
// Sets
|
||||
@@ -2995,17 +3391,19 @@ func (o *Object) prepareUpload(ctx context.Context, src fs.ObjectInfo, options [
|
||||
// containerPath = containerPath[:len(containerPath)-1]
|
||||
// }
|
||||
|
||||
// Update Mod time
|
||||
o.updateMetadataWithModTime(src.ModTime(ctx))
|
||||
if err != nil {
|
||||
return ui, err
|
||||
}
|
||||
|
||||
// Create the HTTP headers for the upload
|
||||
// Start with default content-type based on source
|
||||
ui.httpHeaders = blob.HTTPHeaders{
|
||||
BlobContentType: pString(fs.MimeType(ctx, src)),
|
||||
}
|
||||
|
||||
// Apply mapped metadata/headers/tags if requested
|
||||
modTime, err := o.applyMappedMetadata(ctx, src, &ui, options)
|
||||
if err != nil {
|
||||
return ui, err
|
||||
}
|
||||
// Ensure mtime is set in metadata based on possibly overridden modTime
|
||||
o.updateMetadataWithModTime(modTime)
|
||||
|
||||
// Compute the Content-MD5 of the file. As we stream all uploads it
|
||||
// will be set in PutBlockList API call using the 'x-ms-blob-content-md5' header
|
||||
if !o.fs.opt.DisableCheckSum {
|
||||
|
||||
Reference in New Issue
Block a user