refactor: use |= operator for dictionary updates (py39)

This commit is contained in:
Thomas Waldmann 2025-11-03 22:18:26 +01:00
parent 3bad2de41f
commit 8f40baa34b
No known key found for this signature in database
GPG key ID: 243ACFA951F78E01
8 changed files with 44 additions and 55 deletions

View file

@ -51,7 +51,7 @@ class BuildUsage:
for cmd, parser in action.choices.items():
choices[prefix + cmd] = parser
if extra_choices is not None:
choices.update(extra_choices)
choices |= extra_choices
if prefix and not choices:
return
print("found commands: %s" % list(choices.keys()))
@ -328,7 +328,7 @@ class BuildMan:
for cmd, parser in action.choices.items():
choices[prefix + cmd] = parser
if extra_choices is not None:
choices.update(extra_choices)
choices |= extra_choices
if prefix and not choices:
return

View file

@ -155,10 +155,10 @@ Bytes sent to remote: {stats.tx_bytes}
if not final:
data = self.as_dict()
if item:
data.update(text_to_json("path", item.path))
data |= text_to_json("path", item.path)
else:
data = {}
data.update({"time": time.time(), "type": "archive_progress", "finished": final})
data |= {"time": time.time(), "type": "archive_progress", "finished": final}
msg = json.dumps(data)
end = "\n"
elif not stream.isatty():
@ -620,16 +620,14 @@ class Archive:
if self.create:
info["command_line"] = join_cmd(sys.argv)
else:
info.update(
{
"command_line": self.metadata.command_line,
"hostname": self.metadata.hostname,
"username": self.metadata.username,
"comment": self.metadata.get("comment", ""),
"tags": sorted(self.tags),
"chunker_params": self.metadata.get("chunker_params", ""),
}
)
info |= {
"command_line": self.metadata.command_line,
"hostname": self.metadata.hostname,
"username": self.metadata.username,
"comment": self.metadata.get("comment", ""),
"tags": sorted(self.tags),
"chunker_params": self.metadata.get("chunker_params", ""),
}
return info
def __str__(self):
@ -704,8 +702,8 @@ Duration: {0.duration}
# because borg info relies on them. so, either use the given stats (from args)
# or fall back to self.stats if it was not given.
stats = stats or self.stats
metadata.update({"size": stats.osize, "nfiles": stats.nfiles})
metadata.update(additional_metadata or {})
metadata |= {"size": stats.osize, "nfiles": stats.nfiles}
metadata |= additional_metadata or {}
metadata = ArchiveItem(metadata)
data = self.key.pack_metadata(metadata.as_dict())
self.id = self.repo_objs.id_hash(data)
@ -1136,7 +1134,7 @@ class MetadataCollector:
def stat_attrs(self, st, path, fd=None):
attrs = self.stat_simple_attrs(st, path, fd=fd)
attrs.update(self.stat_ext_attrs(st, path, fd=fd))
attrs |= self.stat_ext_attrs(st, path, fd=fd)
return attrs

View file

@ -169,7 +169,7 @@ class Archiver(
if self.output_list and status is not None and (self.output_filter is None or status in self.output_filter):
if self.log_json:
json_data = {"type": "file_status", "status": status}
json_data.update(text_to_json("path", path))
json_data |= text_to_json("path", path)
print(json.dumps(json_data), file=sys.stderr)
else:
logging.getLogger("borg.output.list").info("%1s %s", status, remove_surrogates(path))

View file

@ -65,7 +65,7 @@ class ArchiveAnalyzer:
if "chunks" in item:
item_chunks = dict(item.chunks) # chunk id -> plaintext size
directory_path = os.path.dirname(item.path)
chunks_by_path[directory_path].update(item_chunks)
chunks_by_path[directory_path] |= item_chunks
return chunks_by_path
def analyze_change(self, base, new):

View file

@ -179,7 +179,7 @@ def unpackb(packed, *, raw=RAW, unicode_errors=UNICODE_ERRORS, strict_map_key=Fa
assert unicode_errors == UNICODE_ERRORS
try:
kw = dict(raw=raw, unicode_errors=unicode_errors, strict_map_key=strict_map_key)
kw.update(kwargs)
kw |= kwargs
return mp_unpackb(packed, **kw)
except Exception as e:
raise UnpackException(e)
@ -190,7 +190,7 @@ def unpack(stream, *, raw=RAW, unicode_errors=UNICODE_ERRORS, strict_map_key=Fal
assert unicode_errors == UNICODE_ERRORS
try:
kw = dict(raw=raw, unicode_errors=unicode_errors, strict_map_key=strict_map_key)
kw.update(kwargs)
kw |= kwargs
return mp_unpack(stream, **kw)
except Exception as e:
raise UnpackException(e)
@ -227,9 +227,9 @@ def get_limited_unpacker(kind):
# unpack(data) or from max_buffer_size for Unpacker(max_buffer_size=N).
args = dict(use_list=False, max_buffer_size=3 * max(BUFSIZE, MAX_OBJECT_SIZE)) # return tuples, not lists
if kind in ("server", "client"):
args.update(dict(max_buffer_size=0)) # 0 means "maximum" here, ~4GiB - needed for store_load/save
args |= dict(max_buffer_size=0) # 0 means "maximum" here, ~4GiB - needed for store_load/save
elif kind in ("manifest", "archive", "key"):
args.update(dict(use_list=True, object_hook=StableDict)) # default value
args |= dict(use_list=True, object_hook=StableDict) # default value
else:
raise ValueError('kind must be "server", "client", "manifest", "archive" or "key"')
return Unpacker(**args)

View file

@ -93,7 +93,7 @@ def text_to_json(key, value):
# value has surrogate escape sequences
data[key] = remove_surrogates(value)
value_bytes = value.encode(coding, errors="surrogateescape")
data.update(binary_to_json(key, value_bytes))
data |= binary_to_json(key, value_bytes)
else:
# value is pure unicode
data[key] = value
@ -321,9 +321,7 @@ class PlaceholderReplacer:
self.overrides = {}
def __call__(self, text, overrides=None):
ovr = {}
ovr.update(self.overrides)
ovr.update(overrides or {})
ovr = self.overrides | (overrides or {})
return _replace_placeholders(text, overrides=ovr)
@ -776,8 +774,7 @@ class ArchiveFormatter(BaseFormatter):
)
def __init__(self, format, repository, manifest, key, *, iec=False, deleted=False):
static_data = {} # here could be stuff on repo level, above archive level
static_data.update(self.FIXED_KEYS)
static_data = {} | self.FIXED_KEYS # here could be stuff on repo level, above archive level
super().__init__(format, static_data)
self.repository = repository
self.manifest = manifest
@ -804,16 +801,14 @@ class ArchiveFormatter(BaseFormatter):
self.name = archive_info.name
self.id = archive_info.id
item_data = {}
item_data.update({} if jsonline else self.static_data)
item_data.update(
{
"name": archive_info.name,
"archive": archive_info.name,
"id": bin_to_hex(archive_info.id),
"time": self.format_time(archive_info.ts),
"start": self.format_time(archive_info.ts),
}
)
item_data |= {} if jsonline else self.static_data
item_data |= {
"name": archive_info.name,
"archive": archive_info.name,
"id": bin_to_hex(archive_info.id),
"time": self.format_time(archive_info.ts),
"start": self.format_time(archive_info.ts),
}
for key in self.used_call_keys:
item_data[key] = self.call_keys[key]()
@ -821,7 +816,7 @@ class ArchiveFormatter(BaseFormatter):
# But unsure whether hostname, username, command_line could contain surrogate escapes, play safe:
for key in "hostname", "username", "command_line":
if key in item_data:
item_data.update(text_to_json(key, item_data[key]))
item_data |= text_to_json(key, item_data[key])
return item_data
@property
@ -893,8 +888,7 @@ class ItemFormatter(BaseFormatter):
def __init__(self, archive, format):
from ..checksums import StreamingXXH64
static_data = {"archivename": archive.name, "archiveid": archive.fpr}
static_data.update(self.FIXED_KEYS)
static_data = {"archivename": archive.name, "archiveid": archive.fpr} | self.FIXED_KEYS
super().__init__(format, static_data)
self.xxh64 = StreamingXXH64
self.archive = archive
@ -916,11 +910,11 @@ class ItemFormatter(BaseFormatter):
def get_item_data(self, item, jsonline=False):
item_data = {}
item_data.update({} if jsonline else self.static_data)
item_data |= {} if jsonline else self.static_data
item_data.update(text_to_json("path", item.path))
item_data |= text_to_json("path", item.path)
target = item.get("target", "")
item_data.update(text_to_json("target", target))
item_data |= text_to_json("target", target)
if not jsonline:
item_data["extra"] = "" if not target else f" -> {item_data['target']}"
@ -935,8 +929,8 @@ class ItemFormatter(BaseFormatter):
item_data["uid"] = item.get("uid") # int or None
item_data["gid"] = item.get("gid") # int or None
item_data.update(text_to_json("user", item.get("user", str(item_data["uid"]))))
item_data.update(text_to_json("group", item.get("group", str(item_data["gid"]))))
item_data |= text_to_json("user", item.get("user", str(item_data["uid"])))
item_data |= text_to_json("group", item.get("group", str(item_data["gid"])))
item_data["flags"] = item.get("bsdflags") # int if flags known, else (if flags unknown) None
# inode number from source filesystem (may be absent on some platforms)
@ -1007,8 +1001,7 @@ class DiffFormatter(BaseFormatter):
METADATA = ("mode", "type", "owner", "group", "user", "mtime", "ctime")
def __init__(self, format, content_only=False):
static_data = {}
static_data.update(self.FIXED_KEYS)
static_data = {} | self.FIXED_KEYS
super().__init__(format or "{content}{link}{directory}{blkdev}{chrdev}{fifo} {path}{NL}", static_data)
self.content_only = content_only
self.format_keys = {f[1] for f in Formatter().parse(format)}
@ -1047,7 +1040,7 @@ class DiffFormatter(BaseFormatter):
change.append(self.call_keys[key](item))
diff_data["change"] = " ".join([v for v in change if v])
diff_data["path"] = item.path
diff_data.update({} if jsonline else self.static_data)
diff_data |= {} if jsonline else self.static_data
return diff_data
def format_other(self, key, diff: "ItemDiff"):
@ -1218,7 +1211,7 @@ class BorgJsonEncoder(json.JSONEncoder):
def basic_json_data(manifest, *, cache=None, extra=None):
key = manifest.key
data = extra or {}
data.update({"repository": BorgJsonEncoder().default(manifest.repository), "encryption": {"mode": key.ARG_NAME}})
data |= {"repository": BorgJsonEncoder().default(manifest.repository), "encryption": {"mode": key.ARG_NAME}}
data["repository"]["last_modified"] = OutputTimestamp(manifest.last_timestamp)
if key.NAME.startswith("key file"):
data["encryption"]["keyfile"] = key.find_key()

View file

@ -25,9 +25,7 @@ class ProgressIndicatorBase:
self.msgid = msgid
def make_json(self, *, finished=False, **kwargs):
kwargs.update(
dict(operation=self.id, msgid=self.msgid, type=self.JSON_TYPE, finished=finished, time=time.time())
)
kwargs |= dict(operation=self.id, msgid=self.msgid, type=self.JSON_TYPE, finished=finished, time=time.time())
return json.dumps(kwargs)
def finish(self):

View file

@ -64,7 +64,7 @@ def yes(
def output(msg, msg_type, is_prompt=False, **kwargs):
json_output = getattr(logging.getLogger("borg"), "json", False)
if json_output:
kwargs.update(dict(type="question_%s" % msg_type, msgid=msgid, message=msg))
kwargs |= dict(type="question_%s" % msg_type, msgid=msgid, message=msg)
print(json.dumps(kwargs), file=sys.stderr)
else:
if is_prompt: