initial commit

This commit is contained in:
Davidson Gomes
2024-10-30 11:19:09 -03:00
commit 8654a31a4d
3744 changed files with 585542 additions and 0 deletions

View File

@@ -0,0 +1,17 @@
from .core import (
delete_password,
get_credential,
get_keyring,
get_password,
set_keyring,
set_password,
)
__all__ = (
'set_keyring',
'get_keyring',
'set_password',
'get_password',
'delete_password',
'get_credential',
)

View File

@@ -0,0 +1,4 @@
if __name__ == '__main__':
from keyring import cli
cli.main()

View File

@@ -0,0 +1,300 @@
"""
Keyring implementation support
"""
from __future__ import annotations
import abc
import copy
import functools
import logging
import operator
import os
import typing
import warnings
from jaraco.context import ExceptionTrap
from jaraco.functools import once
from . import credentials, errors, util
from .compat import properties
from .compat.py312 import metadata
log = logging.getLogger(__name__)
by_priority = operator.attrgetter('priority')
_limit: typing.Callable[[KeyringBackend], bool] | None = None
class KeyringBackendMeta(abc.ABCMeta):
"""
Specialized subclass behavior.
Keeps a registry of all (non-abstract) types.
Wraps set_password to validate the username.
"""
def __init__(cls, name, bases, dict):
super().__init__(name, bases, dict)
cls._register()
cls._validate_username_in_set_password()
def _register(cls):
if not hasattr(cls, '_classes'):
cls._classes = set()
classes = cls._classes
if not cls.__abstractmethods__:
classes.add(cls)
def _validate_username_in_set_password(cls):
"""
Wrap ``set_password`` such to validate the passed username.
"""
orig = cls.set_password
@functools.wraps(orig)
def wrapper(self, system, username, *args, **kwargs):
self._validate_username(username)
return orig(self, system, username, *args, **kwargs)
cls.set_password = wrapper
class KeyringBackend(metaclass=KeyringBackendMeta):
"""The abstract base class of the keyring, every backend must implement
this interface.
"""
def __init__(self):
self.set_properties_from_env()
@properties.classproperty
def priority(self) -> float:
"""
Each backend class must supply a priority, a number (float or integer)
indicating the priority of the backend relative to all other backends.
The priority need not be static -- it may (and should) vary based
attributes of the environment in which is runs (platform, available
packages, etc.).
A higher number indicates a higher priority. The priority should raise
a RuntimeError with a message indicating the underlying cause if the
backend is not suitable for the current environment.
As a rule of thumb, a priority between zero but less than one is
suitable, but a priority of one or greater is recommended.
"""
raise NotImplementedError
# Python 3.8 compatibility
passes = ExceptionTrap().passes
@properties.classproperty
@passes
def viable(cls):
cls.priority # noqa: B018
@classmethod
def get_viable_backends(
cls: type[KeyringBackend],
) -> filter[type[KeyringBackend]]:
"""
Return all subclasses deemed viable.
"""
return filter(operator.attrgetter('viable'), cls._classes)
@properties.classproperty
def name(cls) -> str:
"""
The keyring name, suitable for display.
The name is derived from module and class name.
"""
parent, sep, mod_name = cls.__module__.rpartition('.')
mod_name = mod_name.replace('_', ' ')
# mypy doesn't see `cls` is `type[Self]`, might be fixable in jaraco.classes
return ' '.join([mod_name, cls.__name__]) # type: ignore[attr-defined]
def __str__(self) -> str:
keyring_class = type(self)
return f"{keyring_class.__module__}.{keyring_class.__name__} (priority: {keyring_class.priority:g})"
@abc.abstractmethod
def get_password(self, service: str, username: str) -> str | None:
"""Get password of the username for the service"""
return None
def _validate_username(self, username: str) -> None:
"""
Ensure the username is not empty.
"""
if not username:
warnings.warn(
"Empty usernames are deprecated. See #668",
DeprecationWarning,
stacklevel=3,
)
# raise ValueError("Username cannot be empty")
@abc.abstractmethod
def set_password(self, service: str, username: str, password: str) -> None:
"""Set password for the username of the service.
If the backend cannot store passwords, raise
PasswordSetError.
"""
raise errors.PasswordSetError("reason")
# for backward-compatibility, don't require a backend to implement
# delete_password
# @abc.abstractmethod
def delete_password(self, service: str, username: str) -> None:
"""Delete the password for the username of the service.
If the backend cannot delete passwords, raise
PasswordDeleteError.
"""
raise errors.PasswordDeleteError("reason")
# for backward-compatibility, don't require a backend to implement
# get_credential
# @abc.abstractmethod
def get_credential(
self,
service: str,
username: str | None,
) -> credentials.Credential | None:
"""Gets the username and password for the service.
Returns a Credential instance.
The *username* argument is optional and may be omitted by
the caller or ignored by the backend. Callers must use the
returned username.
"""
# The default implementation requires a username here.
if username is not None:
password = self.get_password(service, username)
if password is not None:
return credentials.SimpleCredential(username, password)
return None
def set_properties_from_env(self) -> None:
"""For all KEYRING_PROPERTY_* env var, set that property."""
def parse(item: tuple[str, str]):
key, value = item
pre, sep, name = key.partition('KEYRING_PROPERTY_')
return sep and (name.lower(), value)
props: filter[tuple[str, str]] = filter(None, map(parse, os.environ.items()))
for name, value in props:
setattr(self, name, value)
def with_properties(self, **kwargs: typing.Any) -> KeyringBackend:
alt = copy.copy(self)
vars(alt).update(kwargs)
return alt
class Crypter:
"""Base class providing encryption and decryption"""
@abc.abstractmethod
def encrypt(self, value):
"""Encrypt the value."""
pass
@abc.abstractmethod
def decrypt(self, value):
"""Decrypt the value."""
pass
class NullCrypter(Crypter):
"""A crypter that does nothing"""
def encrypt(self, value):
return value
def decrypt(self, value):
return value
def _load_plugins() -> None:
"""
Locate all setuptools entry points by the name 'keyring backends'
and initialize them.
Any third-party library may register an entry point by adding the
following to their setup.cfg::
[options.entry_points]
keyring.backends =
plugin_name = mylib.mymodule:initialize_func
`plugin_name` can be anything, and is only used to display the name
of the plugin at initialization time.
`initialize_func` is optional, but will be invoked if callable.
"""
for ep in metadata.entry_points(group='keyring.backends'):
try:
log.debug('Loading %s', ep.name)
init_func = ep.load()
if callable(init_func):
init_func()
except Exception:
log.exception(f"Error initializing plugin {ep}.")
@once
def get_all_keyring() -> list[KeyringBackend]:
"""
Return a list of all implemented keyrings that can be constructed without
parameters.
"""
_load_plugins()
viable_classes = KeyringBackend.get_viable_backends()
rings = util.suppress_exceptions(viable_classes, exceptions=TypeError)
return list(rings)
class SchemeSelectable:
"""
Allow a backend to select different "schemes" for the
username and service.
>>> backend = SchemeSelectable()
>>> backend._query('contoso', 'alice')
{'username': 'alice', 'service': 'contoso'}
>>> backend._query('contoso')
{'service': 'contoso'}
>>> backend.scheme = 'KeePassXC'
>>> backend._query('contoso', 'alice')
{'UserName': 'alice', 'Title': 'contoso'}
>>> backend._query('contoso', 'alice', foo='bar')
{'UserName': 'alice', 'Title': 'contoso', 'foo': 'bar'}
"""
scheme = 'default'
schemes = dict(
default=dict(username='username', service='service'),
KeePassXC=dict(username='UserName', service='Title'),
)
def _query(
self, service: str, username: str | None = None, **base: typing.Any
) -> dict[str, str]:
scheme = self.schemes[self.scheme]
return dict(
{
scheme['username']: username,
scheme['service']: service,
}
if username is not None
else {
scheme['service']: service,
},
**base,
)

View File

@@ -0,0 +1,14 @@
# Complete keyring backends for `keyring -b` from `keyring --list-backends`
# # keyring -b <TAB>
# keyring.backends.chainer.ChainerBackend keyring.backends.fail.Keyring ...
_keyring_backends() {
local choices
choices=$(
"${COMP_WORDS[0]}" --list-backends 2>/dev/null |
while IFS=$' \t' read -r backend rest; do
printf "%s\n" "$backend"
done
)
compgen -W "${choices[*]}" -- "$1"
}

View File

@@ -0,0 +1,14 @@
# Complete keyring backends for `keyring -b` from `keyring --list-backends`
# % keyring -b <TAB>
# keyring priority
# keyring.backends.chainer.ChainerBackend 10
# keyring.backends.fail.Keyring 0
# ... ...
backend_complete() {
local line
while read -r line; do
choices+=(${${line/ \(priority: /\\\\:}/)/})
done <<< "$($words[1] --list-backends)"
_arguments "*:keyring priority:(($choices))"
}

View File

@@ -0,0 +1,120 @@
import logging
from contextlib import closing
from jaraco.context import ExceptionTrap
from .. import backend
from ..backend import KeyringBackend
from ..compat import properties
from ..credentials import SimpleCredential
from ..errors import (
InitError,
KeyringLocked,
PasswordDeleteError,
)
try:
import secretstorage
import secretstorage.exceptions as exceptions
except ImportError:
pass
except AttributeError:
# See https://github.com/jaraco/keyring/issues/296
pass
log = logging.getLogger(__name__)
class Keyring(backend.SchemeSelectable, KeyringBackend):
"""Secret Service Keyring"""
appid = 'Python keyring library'
@properties.classproperty
def priority(cls) -> float:
with ExceptionTrap() as exc:
secretstorage.__name__ # noqa: B018
if exc:
raise RuntimeError("SecretStorage required")
if secretstorage.__version_tuple__ < (3, 2):
raise RuntimeError("SecretStorage 3.2 or newer required")
try:
with closing(secretstorage.dbus_init()) as connection:
if not secretstorage.check_service_availability(connection):
raise RuntimeError(
"The Secret Service daemon is neither running nor "
"activatable through D-Bus"
)
except exceptions.SecretStorageException as e:
raise RuntimeError(f"Unable to initialize SecretService: {e}") from e
return 5
def get_preferred_collection(self):
"""If self.preferred_collection contains a D-Bus path,
the collection at that address is returned. Otherwise,
the default collection is returned.
"""
bus = secretstorage.dbus_init()
try:
if hasattr(self, 'preferred_collection'):
collection = secretstorage.Collection(bus, self.preferred_collection)
else:
collection = secretstorage.get_default_collection(bus)
except exceptions.SecretStorageException as e:
raise InitError(f"Failed to create the collection: {e}.") from e
if collection.is_locked():
collection.unlock()
if collection.is_locked(): # User dismissed the prompt
raise KeyringLocked("Failed to unlock the collection!")
return collection
def unlock(self, item):
if hasattr(item, 'unlock'):
item.unlock()
if item.is_locked(): # User dismissed the prompt
raise KeyringLocked('Failed to unlock the item!')
def get_password(self, service, username):
"""Get password of the username for the service"""
collection = self.get_preferred_collection()
with closing(collection.connection):
items = collection.search_items(self._query(service, username))
for item in items:
self.unlock(item)
return item.get_secret().decode('utf-8')
def set_password(self, service, username, password):
"""Set password for the username of the service"""
collection = self.get_preferred_collection()
attributes = self._query(service, username, application=self.appid)
label = f"Password for '{username}' on '{service}'"
with closing(collection.connection):
collection.create_item(label, attributes, password, replace=True)
def delete_password(self, service, username):
"""Delete the stored password (only the first one)"""
collection = self.get_preferred_collection()
with closing(collection.connection):
items = collection.search_items(self._query(service, username))
for item in items:
return item.delete()
raise PasswordDeleteError("No such password!")
def get_credential(self, service, username):
"""Gets the first username and password for a service.
Returns a Credential instance
The username can be omitted, but if there is one, it will use get_password
and return a SimpleCredential containing the username and password
Otherwise, it will return the first username and password combo that it finds.
"""
scheme = self.schemes[self.scheme]
query = self._query(service, username)
collection = self.get_preferred_collection()
with closing(collection.connection):
items = collection.search_items(query)
for item in items:
self.unlock(item)
username = item.get_attributes().get(scheme['username'])
return SimpleCredential(username, item.get_secret().decode('utf-8'))

View File

@@ -0,0 +1,168 @@
from __future__ import annotations
import logging
from jaraco.context import ExceptionTrap
from ..backend import KeyringBackend
from ..compat import properties
from ..credentials import SimpleCredential
from ..errors import PasswordDeleteError
with ExceptionTrap() as missing_deps:
try:
# prefer pywin32-ctypes
from win32ctypes.pywin32 import pywintypes, win32cred
# force demand import to raise ImportError
win32cred.__name__ # noqa: B018
except ImportError:
# fallback to pywin32
import pywintypes
import win32cred
# force demand import to raise ImportError
win32cred.__name__ # noqa: B018
log = logging.getLogger(__name__)
class Persistence:
def __get__(self, keyring, type=None):
return getattr(keyring, '_persist', win32cred.CRED_PERSIST_ENTERPRISE)
def __set__(self, keyring, value):
"""
Set the persistence value on the Keyring. Value may be
one of the win32cred.CRED_PERSIST_* constants or a
string representing one of those constants. For example,
'local machine' or 'session'.
"""
if isinstance(value, str):
attr = 'CRED_PERSIST_' + value.replace(' ', '_').upper()
value = getattr(win32cred, attr)
keyring._persist = value
class DecodingCredential(dict):
@property
def value(self):
"""
Attempt to decode the credential blob as UTF-16 then UTF-8.
"""
cred = self['CredentialBlob']
try:
return cred.decode('utf-16')
except UnicodeDecodeError:
decoded_cred_utf8 = cred.decode('utf-8')
log.warning(
"Retrieved a UTF-8 encoded credential. Please be aware that "
"this library only writes credentials in UTF-16."
)
return decoded_cred_utf8
class WinVaultKeyring(KeyringBackend):
"""
WinVaultKeyring stores encrypted passwords using the Windows Credential
Manager.
Requires pywin32
This backend does some gymnastics to simulate multi-user support,
which WinVault doesn't support natively. See
https://github.com/jaraco/keyring/issues/47#issuecomment-75763152
for details on the implementation, but here's the gist:
Passwords are stored under the service name unless there is a collision
(another password with the same service name but different user name),
in which case the previous password is moved into a compound name:
{username}@{service}
"""
persist = Persistence()
@properties.classproperty
def priority(cls) -> float:
"""
If available, the preferred backend on Windows.
"""
if missing_deps:
raise RuntimeError("Requires Windows and pywin32")
return 5
@staticmethod
def _compound_name(username, service):
return f'{username}@{service}'
def get_password(self, service, username):
res = self._resolve_credential(service, username)
return res and res.value
def _resolve_credential(
self, service: str, username: str | None
) -> DecodingCredential | None:
# first attempt to get the password under the service name
res = self._read_credential(service)
if not res or username and res['UserName'] != username:
# It wasn't found so attempt to get it with the compound name
res = self._read_credential(self._compound_name(username, service))
return res
def _read_credential(self, target):
try:
res = win32cred.CredRead(
Type=win32cred.CRED_TYPE_GENERIC, TargetName=target
)
except pywintypes.error as e:
if e.winerror == 1168 and e.funcname == 'CredRead': # not found
return None
raise
return DecodingCredential(res)
def set_password(self, service, username, password):
existing_pw = self._read_credential(service)
if existing_pw:
# resave the existing password using a compound target
existing_username = existing_pw['UserName']
target = self._compound_name(existing_username, service)
self._set_password(
target,
existing_username,
existing_pw.value,
)
self._set_password(service, username, str(password))
def _set_password(self, target, username, password):
credential = dict(
Type=win32cred.CRED_TYPE_GENERIC,
TargetName=target,
UserName=username,
CredentialBlob=password,
Comment="Stored using python-keyring",
Persist=self.persist,
)
win32cred.CredWrite(credential, 0)
def delete_password(self, service, username):
compound = self._compound_name(username, service)
deleted = False
for target in service, compound:
existing_pw = self._read_credential(target)
if existing_pw and existing_pw['UserName'] == username:
deleted = True
self._delete_password(target)
if not deleted:
raise PasswordDeleteError(service)
def _delete_password(self, target):
try:
win32cred.CredDelete(Type=win32cred.CRED_TYPE_GENERIC, TargetName=target)
except pywintypes.error as e:
if e.winerror == 1168 and e.funcname == 'CredDelete': # not found
return
raise
def get_credential(self, service, username):
res = self._resolve_credential(service, username)
return res and SimpleCredential(res['UserName'], res.value)

View File

@@ -0,0 +1,71 @@
"""
Keyring Chainer - iterates over other viable backends to
discover passwords in each.
"""
from .. import backend
from ..compat import properties
from . import fail
class ChainerBackend(backend.KeyringBackend):
"""
>>> ChainerBackend()
<keyring.backends.chainer.ChainerBackend object at ...>
"""
# override viability as 'priority' cannot be determined
# until other backends have been constructed
viable = True
@properties.classproperty
def priority(cls) -> float:
"""
If there are backends to chain, high priority
Otherwise very low priority since our operation when empty
is the same as null.
"""
return 10 if len(cls.backends) > 1 else (fail.Keyring.priority - 1)
@properties.classproperty
def backends(cls):
"""
Discover all keyrings for chaining.
"""
def allow(keyring):
limit = backend._limit or bool
return (
not isinstance(keyring, ChainerBackend)
and limit(keyring)
and keyring.priority > 0
)
allowed = filter(allow, backend.get_all_keyring())
return sorted(allowed, key=backend.by_priority, reverse=True)
def get_password(self, service, username):
for keyring in self.backends:
password = keyring.get_password(service, username)
if password is not None:
return password
def set_password(self, service, username, password):
for keyring in self.backends:
try:
return keyring.set_password(service, username, password)
except NotImplementedError:
pass
def delete_password(self, service, username):
for keyring in self.backends:
try:
return keyring.delete_password(service, username)
except NotImplementedError:
pass
def get_credential(self, service, username):
for keyring in self.backends:
credential = keyring.get_credential(service, username)
if credential is not None:
return credential

View File

@@ -0,0 +1,30 @@
from ..backend import KeyringBackend
from ..compat import properties
from ..errors import NoKeyringError
class Keyring(KeyringBackend):
"""
Keyring that raises error on every operation.
>>> kr = Keyring()
>>> kr.get_password('svc', 'user')
Traceback (most recent call last):
...
keyring.errors.NoKeyringError: ...No recommended backend...
"""
@properties.classproperty
def priority(cls) -> float:
return 0
def get_password(self, service, username, password=None):
msg = (
"No recommended backend was available. Install a recommended 3rd "
"party backend package; or, install the keyrings.alt package if "
"you want to use the non-recommended backends. See "
"https://pypi.org/project/keyring for details."
)
raise NoKeyringError(msg)
set_password = delete_password = get_password

View File

@@ -0,0 +1,164 @@
import contextlib
import os
import sys
from ..backend import KeyringBackend
from ..compat import properties
from ..credentials import SimpleCredential
from ..errors import InitError, KeyringLocked, PasswordDeleteError, PasswordSetError
try:
import dbus
from dbus.mainloop.glib import DBusGMainLoop
except ImportError:
pass
except AttributeError:
# See https://github.com/jaraco/keyring/issues/296
pass
def _id_from_argv():
"""
Safely infer an app id from sys.argv.
"""
allowed = AttributeError, IndexError, TypeError
with contextlib.suppress(allowed):
return sys.argv[0]
class DBusKeyring(KeyringBackend):
"""
KDE KWallet 5 via D-Bus
"""
appid = _id_from_argv() or 'Python keyring library'
wallet = None
bus_name = 'org.kde.kwalletd5'
object_path = '/modules/kwalletd5'
@properties.classproperty
def priority(cls) -> float:
if 'dbus' not in globals():
raise RuntimeError('python-dbus not installed')
try:
bus = dbus.SessionBus(mainloop=DBusGMainLoop())
except dbus.DBusException as exc:
raise RuntimeError(exc.get_dbus_message()) from exc
if not (
bus.name_has_owner(cls.bus_name)
or cls.bus_name in bus.list_activatable_names()
):
raise RuntimeError(
"The KWallet daemon is neither running nor activatable through D-Bus"
)
if "KDE" in os.getenv("XDG_CURRENT_DESKTOP", "").split(":"):
return 5.1
return 4.9
def __init__(self, *arg, **kw):
super().__init__(*arg, **kw)
self.handle = -1
def _migrate(self, service):
old_folder = 'Python'
entry_list = []
if self.iface.hasFolder(self.handle, old_folder, self.appid):
entry_list = self.iface.readPasswordList(
self.handle, old_folder, '*@*', self.appid
)
for entry in entry_list.items():
key = entry[0]
password = entry[1]
username, service = key.rsplit('@', 1)
ret = self.iface.writePassword(
self.handle, service, username, password, self.appid
)
if ret == 0:
self.iface.removeEntry(self.handle, old_folder, key, self.appid)
entry_list = self.iface.readPasswordList(
self.handle, old_folder, '*', self.appid
)
if not entry_list:
self.iface.removeFolder(self.handle, old_folder, self.appid)
def connected(self, service):
if self.handle >= 0:
if self.iface.isOpen(self.handle):
return True
bus = dbus.SessionBus(mainloop=DBusGMainLoop())
wId = 0
try:
remote_obj = bus.get_object(self.bus_name, self.object_path)
self.iface = dbus.Interface(remote_obj, 'org.kde.KWallet')
self.handle = self.iface.open(self.iface.networkWallet(), wId, self.appid)
except dbus.DBusException as e:
raise InitError(f'Failed to open keyring: {e}.') from e
if self.handle < 0:
return False
self._migrate(service)
return True
def get_password(self, service, username):
"""Get password of the username for the service"""
if not self.connected(service):
# the user pressed "cancel" when prompted to unlock their keyring.
raise KeyringLocked("Failed to unlock the keyring!")
if not self.iface.hasEntry(self.handle, service, username, self.appid):
return None
password = self.iface.readPassword(self.handle, service, username, self.appid)
return str(password)
def get_credential(self, service, username):
"""Gets the first username and password for a service.
Returns a Credential instance
The username can be omitted, but if there is one, it will forward to
get_password.
Otherwise, it will return the first username and password combo that it finds.
"""
if username is not None:
return super().get_credential(service, username)
if not self.connected(service):
# the user pressed "cancel" when prompted to unlock their keyring.
raise KeyringLocked("Failed to unlock the keyring!")
for username in self.iface.entryList(self.handle, service, self.appid):
password = self.iface.readPassword(
self.handle, service, username, self.appid
)
return SimpleCredential(str(username), str(password))
def set_password(self, service, username, password):
"""Set password for the username of the service"""
if not self.connected(service):
# the user pressed "cancel" when prompted to unlock their keyring.
raise PasswordSetError("Cancelled by user")
self.iface.writePassword(self.handle, service, username, password, self.appid)
def delete_password(self, service, username):
"""Delete the password for the username of the service."""
if not self.connected(service):
# the user pressed "cancel" when prompted to unlock their keyring.
raise PasswordDeleteError("Cancelled by user")
if not self.iface.hasEntry(self.handle, service, username, self.appid):
raise PasswordDeleteError("Password not found")
self.iface.removeEntry(self.handle, service, username, self.appid)
class DBusKeyringKWallet4(DBusKeyring):
"""
KDE KWallet 4 via D-Bus
"""
bus_name = 'org.kde.kwalletd'
object_path = '/modules/kwalletd'
@properties.classproperty
def priority(cls):
return super().priority - 1

View File

@@ -0,0 +1,155 @@
import logging
from .. import backend
from ..backend import KeyringBackend
from ..compat import properties
from ..credentials import SimpleCredential
from ..errors import (
KeyringLocked,
PasswordDeleteError,
PasswordSetError,
)
available = False
try:
import gi
from gi.repository import Gio, GLib
gi.require_version('Secret', '1')
from gi.repository import Secret
available = True
except (AttributeError, ImportError, ValueError):
pass
log = logging.getLogger(__name__)
class Keyring(backend.SchemeSelectable, KeyringBackend):
"""libsecret Keyring"""
appid = 'Python keyring library'
@property
def schema(self):
return Secret.Schema.new(
"org.freedesktop.Secret.Generic",
Secret.SchemaFlags.NONE,
self._query(
Secret.SchemaAttributeType.STRING,
Secret.SchemaAttributeType.STRING,
application=Secret.SchemaAttributeType.STRING,
),
)
@properties.NonDataProperty
def collection(self):
return Secret.COLLECTION_DEFAULT
@properties.classproperty
def priority(cls) -> float:
if not available:
raise RuntimeError("libsecret required")
# Make sure there is actually a secret service running
try:
Secret.Service.get_sync(Secret.ServiceFlags.OPEN_SESSION, None)
except GLib.Error as error:
raise RuntimeError("Can't open a session to the secret service") from error
return 4.8
def get_password(self, service, username):
"""Get password of the username for the service"""
attributes = self._query(service, username, application=self.appid)
try:
items = Secret.password_search_sync(
self.schema, attributes, Secret.SearchFlags.UNLOCK, None
)
except GLib.Error as error:
quark = GLib.quark_try_string('g-io-error-quark')
if error.matches(quark, Gio.IOErrorEnum.FAILED):
raise KeyringLocked('Failed to unlock the item!') from error
raise
for item in items:
try:
return item.retrieve_secret_sync().get_text()
except GLib.Error as error:
quark = GLib.quark_try_string('secret-error')
if error.matches(quark, Secret.Error.IS_LOCKED):
raise KeyringLocked('Failed to unlock the item!') from error
raise
def set_password(self, service, username, password):
"""Set password for the username of the service"""
attributes = self._query(service, username, application=self.appid)
label = f"Password for '{username}' on '{service}'"
try:
stored = Secret.password_store_sync(
self.schema, attributes, self.collection, label, password, None
)
except GLib.Error as error:
quark = GLib.quark_try_string('secret-error')
if error.matches(quark, Secret.Error.IS_LOCKED):
raise KeyringLocked("Failed to unlock the collection!") from error
quark = GLib.quark_try_string('g-io-error-quark')
if error.matches(quark, Gio.IOErrorEnum.FAILED):
raise KeyringLocked("Failed to unlock the collection!") from error
raise
if not stored:
raise PasswordSetError("Failed to store password!")
def delete_password(self, service, username):
"""Delete the stored password (only the first one)"""
attributes = self._query(service, username, application=self.appid)
try:
items = Secret.password_search_sync(
self.schema, attributes, Secret.SearchFlags.UNLOCK, None
)
except GLib.Error as error:
quark = GLib.quark_try_string('g-io-error-quark')
if error.matches(quark, Gio.IOErrorEnum.FAILED):
raise KeyringLocked('Failed to unlock the item!') from error
raise
for item in items:
try:
removed = Secret.password_clear_sync(
self.schema, item.get_attributes(), None
)
except GLib.Error as error:
quark = GLib.quark_try_string('secret-error')
if error.matches(quark, Secret.Error.IS_LOCKED):
raise KeyringLocked('Failed to unlock the item!') from error
raise
return removed
raise PasswordDeleteError("No such password!")
def get_credential(self, service, username):
"""Get the first username and password for a service.
Return a Credential instance
The username can be omitted, but if there is one, it will use get_password
and return a SimpleCredential containing the username and password
Otherwise, it will return the first username and password combo that it finds.
"""
query = self._query(service, username)
try:
items = Secret.password_search_sync(
self.schema, query, Secret.SearchFlags.UNLOCK, None
)
except GLib.Error as error:
quark = GLib.quark_try_string('g-io-error-quark')
if error.matches(quark, Gio.IOErrorEnum.FAILED):
raise KeyringLocked('Failed to unlock the item!') from error
raise
for item in items:
username = item.get_attributes().get("username")
try:
return SimpleCredential(
username, item.retrieve_secret_sync().get_text()
)
except GLib.Error as error:
quark = GLib.quark_try_string('secret-error')
if error.matches(quark, Secret.Error.IS_LOCKED):
raise KeyringLocked('Failed to unlock the item!') from error
raise

View File

@@ -0,0 +1,85 @@
import functools
import os
import platform
import warnings
from ...backend import KeyringBackend
from ...compat import properties
from ...errors import KeyringError, KeyringLocked, PasswordDeleteError, PasswordSetError
try:
from . import api
except Exception:
pass
def warn_keychain(func):
@functools.wraps(func)
def wrapper(self, *args, **kwargs):
if self.keychain:
warnings.warn("Specified keychain is ignored. See #623", stacklevel=2)
return func(self, *args, **kwargs)
return wrapper
class Keyring(KeyringBackend):
"""macOS Keychain"""
keychain = os.environ.get('KEYCHAIN_PATH')
"Path to keychain file, overriding default"
@properties.classproperty
def priority(cls):
"""
Preferred for all macOS environments.
"""
if platform.system() != 'Darwin':
raise RuntimeError("macOS required")
if 'api' not in globals():
raise RuntimeError("Security API unavailable")
return 5
@warn_keychain
def set_password(self, service, username, password):
if username is None:
username = ''
try:
api.set_generic_password(self.keychain, service, username, password)
except api.KeychainDenied as e:
raise KeyringLocked(f"Can't store password on keychain: {e}") from e
except api.Error as e:
raise PasswordSetError(f"Can't store password on keychain: {e}") from e
@warn_keychain
def get_password(self, service, username):
if username is None:
username = ''
try:
return api.find_generic_password(self.keychain, service, username)
except api.NotFound:
pass
except api.KeychainDenied as e:
raise KeyringLocked(f"Can't get password from keychain: {e}") from e
except api.Error as e:
raise KeyringError(f"Can't get password from keychain: {e}") from e
@warn_keychain
def delete_password(self, service, username):
if username is None:
username = ''
try:
return api.delete_generic_password(self.keychain, service, username)
except api.Error as e:
raise PasswordDeleteError(f"Can't delete password in keychain: {e}") from e
def with_keychain(self, keychain):
warnings.warn(
"macOS.Keyring.with_keychain is deprecated. Use with_properties instead.",
DeprecationWarning,
stacklevel=2,
)
return self.with_properties(keychain=keychain)

View File

@@ -0,0 +1,184 @@
from __future__ import annotations
import contextlib
import ctypes
import functools
from ctypes import (
byref,
c_int32,
c_uint32,
c_void_p,
)
from ctypes.util import find_library
OS_status = c_int32
class error:
item_not_found = -25300
keychain_denied = -128
sec_auth_failed = -25293
plist_missing = -67030
sec_interaction_not_allowed = -25308
_sec = ctypes.CDLL(find_library('Security'))
_core = ctypes.CDLL(find_library('CoreServices'))
_found = ctypes.CDLL(find_library('Foundation'))
CFDictionaryCreate = _found.CFDictionaryCreate
CFDictionaryCreate.restype = c_void_p
CFDictionaryCreate.argtypes = (
c_void_p,
c_void_p,
c_void_p,
c_int32,
c_void_p,
c_void_p,
)
CFStringCreateWithCString = _found.CFStringCreateWithCString
CFStringCreateWithCString.restype = c_void_p
CFStringCreateWithCString.argtypes = [c_void_p, c_void_p, c_uint32]
CFNumberCreate = _found.CFNumberCreate
CFNumberCreate.restype = c_void_p
CFNumberCreate.argtypes = [c_void_p, c_uint32, ctypes.c_void_p]
SecItemAdd = _sec.SecItemAdd
SecItemAdd.restype = OS_status
SecItemAdd.argtypes = (c_void_p, c_void_p)
SecItemCopyMatching = _sec.SecItemCopyMatching
SecItemCopyMatching.restype = OS_status
SecItemCopyMatching.argtypes = (c_void_p, c_void_p)
SecItemDelete = _sec.SecItemDelete
SecItemDelete.restype = OS_status
SecItemDelete.argtypes = (c_void_p,)
CFDataGetBytePtr = _found.CFDataGetBytePtr
CFDataGetBytePtr.restype = c_void_p
CFDataGetBytePtr.argtypes = (c_void_p,)
CFDataGetLength = _found.CFDataGetLength
CFDataGetLength.restype = c_int32
CFDataGetLength.argtypes = (c_void_p,)
def k_(s):
return c_void_p.in_dll(_sec, s)
@functools.singledispatch
def create_cf(ob):
return ob
# explicit bool and int required for Python 3.10 compatibility
@create_cf.register(bool)
@create_cf.register(int)
def _(val: bool | int):
if val.bit_length() > 31:
raise OverflowError(val)
int32 = 0x9
return CFNumberCreate(None, int32, ctypes.byref(c_int32(val)))
@create_cf.register
def _(s: str):
kCFStringEncodingUTF8 = 0x08000100
return CFStringCreateWithCString(None, s.encode('utf8'), kCFStringEncodingUTF8)
def create_query(**kwargs):
return CFDictionaryCreate(
None,
(c_void_p * len(kwargs))(*map(k_, kwargs.keys())),
(c_void_p * len(kwargs))(*map(create_cf, kwargs.values())),
len(kwargs),
_found.kCFTypeDictionaryKeyCallBacks,
_found.kCFTypeDictionaryValueCallBacks,
)
def cfstr_to_str(data):
return ctypes.string_at(CFDataGetBytePtr(data), CFDataGetLength(data)).decode(
'utf-8'
)
class Error(Exception):
@classmethod
def raise_for_status(cls, status):
if status == 0:
return
if status == error.item_not_found:
raise NotFound(status, "Item not found")
if status == error.keychain_denied:
raise KeychainDenied(status, "Keychain Access Denied")
if status == error.sec_auth_failed or status == error.plist_missing:
raise SecAuthFailure(
status,
"Security Auth Failure: make sure "
"executable is signed with codesign util",
)
raise cls(status, "Unknown Error")
class NotFound(Error):
pass
class KeychainDenied(Error):
pass
class SecAuthFailure(Error):
pass
def find_generic_password(kc_name, service, username, not_found_ok=False):
q = create_query(
kSecClass=k_('kSecClassGenericPassword'),
kSecMatchLimit=k_('kSecMatchLimitOne'),
kSecAttrService=service,
kSecAttrAccount=username,
kSecReturnData=True,
)
data = c_void_p()
status = SecItemCopyMatching(q, byref(data))
if status == error.item_not_found and not_found_ok:
return
Error.raise_for_status(status)
return cfstr_to_str(data)
def set_generic_password(name, service, username, password):
with contextlib.suppress(NotFound):
delete_generic_password(name, service, username)
q = create_query(
kSecClass=k_('kSecClassGenericPassword'),
kSecAttrService=service,
kSecAttrAccount=username,
kSecValueData=password,
)
status = SecItemAdd(q, None)
Error.raise_for_status(status)
def delete_generic_password(name, service, username):
q = create_query(
kSecClass=k_('kSecClassGenericPassword'),
kSecAttrService=service,
kSecAttrAccount=username,
)
status = SecItemDelete(q)
Error.raise_for_status(status)

View File

@@ -0,0 +1,20 @@
from ..backend import KeyringBackend
from ..compat import properties
class Keyring(KeyringBackend):
"""
Keyring that return None on every operation.
>>> kr = Keyring()
>>> kr.get_password('svc', 'user')
"""
@properties.classproperty
def priority(cls) -> float:
return -1
def get_password(self, service, username, password=None):
pass
set_password = delete_password = get_password

View File

@@ -0,0 +1,220 @@
"""Simple command line interface to get/set password from a keyring"""
from __future__ import annotations
import argparse
import getpass
import json
import sys
from . import (
backend,
completion,
core,
credentials,
delete_password,
get_credential,
get_password,
set_keyring,
set_password,
)
from .util import platform_
class CommandLineTool:
# Attributes set dynamically by the ArgumentParser
keyring_path: str | None
keyring_backend: str | None
get_mode: str
output_format: str
operation: str
service: str
username: str
def __init__(self):
self.parser = argparse.ArgumentParser()
self.parser.add_argument(
"-p",
"--keyring-path",
dest="keyring_path",
default=None,
help="Path to the keyring backend",
)
self.parser.add_argument(
"-b",
"--keyring-backend",
dest="keyring_backend",
default=None,
help="Name of the keyring backend",
)
self.parser.add_argument(
"--list-backends",
action="store_true",
help="List keyring backends and exit",
)
self.parser.add_argument(
"--disable", action="store_true", help="Disable keyring and exit"
)
self.parser._get_modes = ["password", "creds"]
self.parser.add_argument(
"--mode",
choices=self.parser._get_modes,
dest="get_mode",
default="password",
help="""
Mode for 'get' operation.
'password' requires a username and will return only the password.
'creds' does not require a username and will return both the username and password separated by a newline.
Default is 'password'
""",
)
self.parser._output_formats = ["plain", "json"]
self.parser.add_argument(
"--output",
choices=self.parser._output_formats,
dest="output_format",
default="plain",
help="""
Output format for 'get' operation.
Default is 'plain'
""",
)
self.parser._operations = ["get", "set", "del", "diagnose"]
self.parser.add_argument(
'operation',
choices=self.parser._operations,
nargs="?",
)
self.parser.add_argument(
'service',
nargs="?",
)
self.parser.add_argument(
'username',
nargs="?",
)
completion.install(self.parser)
def run(self, argv):
args = self.parser.parse_args(argv)
vars(self).update(vars(args))
if args.list_backends:
for k in backend.get_all_keyring():
print(k)
return
if args.disable:
core.disable()
return
if args.operation == 'diagnose':
self.diagnose()
return
self._check_args()
self._load_spec_backend()
method = getattr(self, f'do_{self.operation}', self.invalid_op)
return method()
def _check_args(self):
needs_username = self.operation != 'get' or self.get_mode != 'creds'
required = (['service'] + ['username'] * needs_username) * bool(self.operation)
if any(getattr(self, param) is None for param in required):
self.parser.error(f"{self.operation} requires {' and '.join(required)}")
def do_get(self):
credential = getattr(self, f'_get_{self.get_mode}')()
if credential is None:
raise SystemExit(1)
getattr(self, f'_emit_{self.output_format}')(credential)
def _emit_json(self, credential: credentials.Credential):
print(json.dumps(credential._vars()))
def _emit_plain(self, credential: credentials.Credential):
for val in credential._vars().values():
print(val)
def _get_creds(self) -> credentials.Credential | None:
return get_credential(self.service, self.username)
def _get_password(self) -> credentials.Credential | None:
password = get_password(self.service, self.username)
return (
credentials.AnonymousCredential(password) if password is not None else None
)
def do_set(self):
password = self.input_password(
f"Password for '{self.username}' in '{self.service}': "
)
set_password(self.service, self.username, password)
def do_del(self):
delete_password(self.service, self.username)
def diagnose(self):
config_root = core._config_path()
if config_root.exists():
print("config path:", config_root)
else:
print("config path:", config_root, "(absent)")
print("data root:", platform_.data_root())
def invalid_op(self):
self.parser.error(f"Specify operation ({', '.join(self.parser._operations)}).")
def _load_spec_backend(self):
if self.keyring_backend is None:
return
try:
if self.keyring_path:
sys.path.insert(0, self.keyring_path)
set_keyring(core.load_keyring(self.keyring_backend))
except Exception as exc:
# Tons of things can go wrong here:
# ImportError when using "fjkljfljkl"
# AttributeError when using "os.path.bar"
# TypeError when using "__builtins__.str"
# So, we play on the safe side, and catch everything.
self.parser.error(f"Unable to load specified keyring: {exc}")
def input_password(self, prompt):
"""Retrieve password from input."""
return self.pass_from_pipe() or getpass.getpass(prompt)
@classmethod
def pass_from_pipe(cls):
"""Return password from pipe if not on TTY, else False."""
is_pipe = not sys.stdin.isatty()
return is_pipe and cls.strip_last_newline(sys.stdin.read())
@staticmethod
def strip_last_newline(str):
r"""Strip one last newline, if present.
>>> CommandLineTool.strip_last_newline('foo')
'foo'
>>> CommandLineTool.strip_last_newline('foo\n')
'foo'
"""
slc = slice(-1 if str.endswith('\n') else None)
return str[slc]
def main(argv=None):
"""Main command line interface."""
if argv is None:
argv = sys.argv[1:]
cli = CommandLineTool()
return cli.run(argv)
if __name__ == '__main__':
sys.exit(main())

View File

@@ -0,0 +1,7 @@
__all__ = ['properties']
try:
from jaraco.classes import properties
except ImportError: # pragma: no cover
from . import properties # type: ignore[no-redef]

View File

@@ -0,0 +1,169 @@
# from jaraco.classes 3.2.2
class NonDataProperty:
"""Much like the property builtin, but only implements __get__,
making it a non-data property, and can be subsequently reset.
See http://users.rcn.com/python/download/Descriptor.htm for more
information.
>>> class X(object):
... @NonDataProperty
... def foo(self):
... return 3
>>> x = X()
>>> x.foo
3
>>> x.foo = 4
>>> x.foo
4
"""
def __init__(self, fget):
assert fget is not None, "fget cannot be none"
assert callable(fget), "fget must be callable"
self.fget = fget
def __get__(self, obj, objtype=None):
if obj is None:
return self
return self.fget(obj)
class classproperty:
"""
Like @property but applies at the class level.
>>> class X(metaclass=classproperty.Meta):
... val = None
... @classproperty
... def foo(cls):
... return cls.val
... @foo.setter
... def foo(cls, val):
... cls.val = val
>>> X.foo
>>> X.foo = 3
>>> X.foo
3
>>> x = X()
>>> x.foo
3
>>> X.foo = 4
>>> x.foo
4
Setting the property on an instance affects the class.
>>> x.foo = 5
>>> x.foo
5
>>> X.foo
5
>>> vars(x)
{}
>>> X().foo
5
Attempting to set an attribute where no setter was defined
results in an AttributeError:
>>> class GetOnly(metaclass=classproperty.Meta):
... @classproperty
... def foo(cls):
... return 'bar'
>>> GetOnly.foo = 3
Traceback (most recent call last):
...
AttributeError: can't set attribute
It is also possible to wrap a classmethod or staticmethod in
a classproperty.
>>> class Static(metaclass=classproperty.Meta):
... @classproperty
... @classmethod
... def foo(cls):
... return 'foo'
... @classproperty
... @staticmethod
... def bar():
... return 'bar'
>>> Static.foo
'foo'
>>> Static.bar
'bar'
*Legacy*
For compatibility, if the metaclass isn't specified, the
legacy behavior will be invoked.
>>> class X:
... val = None
... @classproperty
... def foo(cls):
... return cls.val
... @foo.setter
... def foo(cls, val):
... cls.val = val
>>> X.foo
>>> X.foo = 3
>>> X.foo
3
>>> x = X()
>>> x.foo
3
>>> X.foo = 4
>>> x.foo
4
Note, because the metaclass was not specified, setting
a value on an instance does not have the intended effect.
>>> x.foo = 5
>>> x.foo
5
>>> X.foo # should be 5
4
>>> vars(x) # should be empty
{'foo': 5}
>>> X().foo # should be 5
4
"""
class Meta(type):
def __setattr__(self, key, value):
obj = self.__dict__.get(key, None)
if type(obj) is classproperty:
return obj.__set__(self, value)
return super().__setattr__(key, value)
def __init__(self, fget, fset=None):
self.fget = self._ensure_method(fget)
self.fset = fset
fset and self.setter(fset)
def __get__(self, instance, owner=None):
return self.fget.__get__(None, owner)()
def __set__(self, owner, value):
if not self.fset:
raise AttributeError("can't set attribute")
if type(owner) is not classproperty.Meta:
owner = type(owner)
return self.fset.__get__(None, owner)(value)
def setter(self, fset):
self.fset = self._ensure_method(fset)
return self
@classmethod
def _ensure_method(cls, fn):
"""
Ensure fn is a classmethod or staticmethod.
"""
needs_method = not isinstance(fn, (classmethod, staticmethod))
return classmethod(fn) if needs_method else fn

View File

@@ -0,0 +1,9 @@
import sys
__all__ = ['metadata']
if sys.version_info >= (3, 12):
import importlib.metadata as metadata
else:
import importlib_metadata as metadata

View File

@@ -0,0 +1,9 @@
import sys
__all__ = ['files']
if sys.version_info < (3, 9):
from importlib_resources import files
else:
from importlib.resources import files

View File

@@ -0,0 +1,56 @@
import argparse
import sys
try:
import shtab
except ImportError:
pass
from .compat.py38 import files
class _MissingCompletionAction(argparse.Action):
def __call__(self, parser, namespace, values, option_string):
print("Install keyring[completion] for completion support.", file=sys.stderr)
parser.exit(1)
def add_completion_notice(parser):
"""Add completion argument to parser."""
parser.add_argument(
"--print-completion",
choices=["bash", "zsh", "tcsh"],
action=_MissingCompletionAction,
help="print shell completion script",
)
return parser
def get_action(parser, option):
(match,) = (action for action in parser._actions if option in action.option_strings)
return match
def install_completion(parser):
preamble = dict(
bash=files(__package__)
.joinpath('backend_complete.bash')
.read_text(encoding='utf-8'),
zsh=files(__package__)
.joinpath('backend_complete.zsh')
.read_text(encoding='utf-8'),
)
shtab.add_argument_to(parser, preamble=preamble)
get_action(parser, '--keyring-path').complete = shtab.DIR
get_action(parser, '--keyring-backend').complete = dict(
bash='_keyring_backends',
zsh='backend_complete',
)
return parser
def install(parser):
try:
install_completion(parser)
except NameError:
add_completion_notice(parser)

View File

@@ -0,0 +1,200 @@
"""
Core API functions and initialization routines.
"""
import configparser
import logging
import os
import sys
import typing
from . import backend, credentials
from .backends import fail
from .util import platform_ as platform
LimitCallable = typing.Callable[[backend.KeyringBackend], bool]
log = logging.getLogger(__name__)
_keyring_backend = None
def set_keyring(keyring: backend.KeyringBackend) -> None:
"""Set current keyring backend."""
global _keyring_backend
if not isinstance(keyring, backend.KeyringBackend):
raise TypeError("The keyring must be an instance of KeyringBackend")
_keyring_backend = keyring
def get_keyring() -> backend.KeyringBackend:
"""Get current keyring backend."""
if _keyring_backend is None:
init_backend()
return typing.cast(backend.KeyringBackend, _keyring_backend)
def disable() -> None:
"""
Configure the null keyring as the default.
>>> fs = getfixture('fs')
>>> disable()
>>> disable()
Traceback (most recent call last):
...
RuntimeError: Refusing to overwrite...
"""
root = platform.config_root()
try:
os.makedirs(root)
except OSError:
pass
filename = os.path.join(root, 'keyringrc.cfg')
if os.path.exists(filename):
msg = f"Refusing to overwrite {filename}"
raise RuntimeError(msg)
with open(filename, 'w', encoding='utf-8') as file:
file.write('[backend]\ndefault-keyring=keyring.backends.null.Keyring')
def get_password(service_name: str, username: str) -> typing.Optional[str]:
"""Get password from the specified service."""
return get_keyring().get_password(service_name, username)
def set_password(service_name: str, username: str, password: str) -> None:
"""Set password for the user in the specified service."""
get_keyring().set_password(service_name, username, password)
def delete_password(service_name: str, username: str) -> None:
"""Delete the password for the user in the specified service."""
get_keyring().delete_password(service_name, username)
def get_credential(
service_name: str, username: typing.Optional[str]
) -> typing.Optional[credentials.Credential]:
"""Get a Credential for the specified service."""
return get_keyring().get_credential(service_name, username)
def recommended(backend) -> bool:
return backend.priority >= 1
def init_backend(limit: typing.Optional[LimitCallable] = None):
"""
Load a detected backend.
"""
set_keyring(_detect_backend(limit))
def _detect_backend(limit: typing.Optional[LimitCallable] = None):
"""
Return a keyring specified in the config file or infer the best available.
Limit, if supplied, should be a callable taking a backend and returning
True if that backend should be included for consideration.
"""
# save the limit for the chainer to honor
backend._limit = limit
return (
load_env()
or load_config()
or max(
# all keyrings passing the limit filter
filter(limit, backend.get_all_keyring()), # type: ignore[arg-type] #659
default=fail.Keyring(),
key=backend.by_priority,
)
)
def _load_keyring_class(keyring_name: str) -> typing.Type[backend.KeyringBackend]:
"""
Load the keyring class indicated by name.
These popular names are tested to ensure their presence.
>>> popular_names = [
... 'keyring.backends.Windows.WinVaultKeyring',
... 'keyring.backends.macOS.Keyring',
... 'keyring.backends.kwallet.DBusKeyring',
... 'keyring.backends.SecretService.Keyring',
... ]
>>> list(map(_load_keyring_class, popular_names))
[...]
"""
module_name, sep, class_name = keyring_name.rpartition('.')
__import__(module_name)
module = sys.modules[module_name]
return getattr(module, class_name)
def load_keyring(keyring_name: str) -> backend.KeyringBackend:
"""
Load the specified keyring by name (a fully-qualified name to the
keyring, such as 'keyring.backends.file.PlaintextKeyring')
"""
class_ = _load_keyring_class(keyring_name)
# invoke the priority to ensure it is viable, or raise a RuntimeError
class_.priority # noqa: B018
return class_()
def load_env() -> typing.Optional[backend.KeyringBackend]:
"""Load a keyring configured in the environment variable."""
try:
return load_keyring(os.environ['PYTHON_KEYRING_BACKEND'])
except KeyError:
return None
def _config_path():
return platform.config_root() / 'keyringrc.cfg'
def _ensure_path(path):
if not path.exists():
raise FileNotFoundError(path)
return path
def load_config() -> typing.Optional[backend.KeyringBackend]:
"""Load a keyring using the config file in the config root."""
config = configparser.RawConfigParser()
try:
config.read(_ensure_path(_config_path()), encoding='utf-8')
except FileNotFoundError:
return None
_load_keyring_path(config)
# load the keyring class name, and then load this keyring
try:
if config.has_section("backend"):
keyring_name = config.get("backend", "default-keyring").strip()
else:
raise configparser.NoOptionError('backend', 'default-keyring')
except (configparser.NoOptionError, ImportError):
logger = logging.getLogger('keyring')
logger.warning(
"Keyring config file contains incorrect values.\n"
+ f"Config file: {_config_path()}"
)
return None
return load_keyring(keyring_name)
def _load_keyring_path(config: configparser.RawConfigParser) -> None:
"load the keyring-path option (if present)"
try:
path = config.get("backend", "keyring-path").strip()
sys.path.insert(0, os.path.expanduser(path))
except (configparser.NoOptionError, configparser.NoSectionError):
pass

View File

@@ -0,0 +1,85 @@
from __future__ import annotations
import abc
import os
class Credential(metaclass=abc.ABCMeta):
"""Abstract class to manage credentials"""
@abc.abstractproperty
def username(self) -> str: ...
@abc.abstractproperty
def password(self) -> str: ...
def _vars(self) -> dict[str, str]:
return dict(username=self.username, password=self.password)
class SimpleCredential(Credential):
"""Simple credentials implementation"""
def __init__(self, username: str, password: str):
self._username = username
self._password = password
@property
def username(self) -> str:
return self._username
@property
def password(self) -> str:
return self._password
class AnonymousCredential(SimpleCredential):
def __init__(self, password: str):
self._password = password
@property
def username(self) -> str:
raise ValueError("Anonymous credential has no username")
def _vars(self) -> dict[str, str]:
return dict(password=self.password)
class EnvironCredential(Credential):
"""
Source credentials from environment variables.
Actual sourcing is deferred until requested.
Supports comparison by equality.
>>> e1 = EnvironCredential('a', 'b')
>>> e2 = EnvironCredential('a', 'b')
>>> e3 = EnvironCredential('a', 'c')
>>> e1 == e2
True
>>> e2 == e3
False
"""
def __init__(self, user_env_var: str, pwd_env_var: str):
self.user_env_var = user_env_var
self.pwd_env_var = pwd_env_var
def __eq__(self, other: object) -> bool:
return vars(self) == vars(other)
def _get_env(self, env_var: str) -> str:
"""Helper to read an environment variable"""
value = os.environ.get(env_var)
if not value:
raise ValueError(f'Missing environment variable:{env_var}')
return value
@property
def username(self) -> str:
return self._get_env(self.user_env_var)
@property
def password(self) -> str:
return self._get_env(self.pwd_env_var)

View File

@@ -0,0 +1,29 @@
import functools
import pluggy
from jaraco.context import suppress
import keyring.errors
hookimpl = pluggy.HookimplMarker("devpiclient")
def restore_signature(func):
# workaround for pytest-dev/pluggy#358
@functools.wraps(func)
def wrapper(url, username):
return func(url, username)
return wrapper
@hookimpl()
@restore_signature
@suppress(keyring.errors.KeyringError)
def devpiclient_get_password(url, username):
"""
>>> pluggy._hooks.varnames(devpiclient_get_password)
(('url', 'username'), ())
>>>
"""
return keyring.get_password(url, username)

View File

@@ -0,0 +1,67 @@
import sys
import warnings
class KeyringError(Exception):
"""Base class for exceptions in keyring"""
class PasswordSetError(KeyringError):
"""Raised when the password can't be set."""
class PasswordDeleteError(KeyringError):
"""Raised when the password can't be deleted."""
class InitError(KeyringError):
"""Raised when the keyring could not be initialised"""
class KeyringLocked(KeyringError):
"""Raised when the keyring failed unlocking"""
class NoKeyringError(KeyringError, RuntimeError):
"""Raised when there is no keyring backend"""
class ExceptionRaisedContext:
"""
An exception-trapping context that indicates whether an exception was
raised.
"""
def __init__(self, ExpectedException=Exception):
warnings.warn(
"ExceptionRaisedContext is deprecated; use `jaraco.context.ExceptionTrap`",
DeprecationWarning,
stacklevel=2,
)
self.ExpectedException = ExpectedException
self.exc_info = None
def __enter__(self):
self.exc_info = object.__new__(ExceptionInfo)
return self.exc_info
def __exit__(self, *exc_info):
self.exc_info.__init__(*exc_info)
return self.exc_info.type and issubclass(
self.exc_info.type, self.ExpectedException
)
class ExceptionInfo:
def __init__(self, *info):
if not info:
info = sys.exc_info()
self.type, self.value, _ = info
def __bool__(self):
"""
Return True if an exception occurred
"""
return bool(self.type)
__nonzero__ = __bool__

View File

@@ -0,0 +1,39 @@
"""
urllib2.HTTPPasswordMgr object using the keyring, for use with the
urllib2.HTTPBasicAuthHandler.
usage:
import urllib2
handlers = [urllib2.HTTPBasicAuthHandler(PasswordMgr())]
urllib2.install_opener(handlers)
urllib2.urlopen(...)
This will prompt for a password if one is required and isn't already
in the keyring. Then, it adds it to the keyring for subsequent use.
"""
import getpass
from . import delete_password, get_password, set_password
class PasswordMgr:
def get_username(self, realm, authuri):
return getpass.getuser()
def add_password(self, realm, authuri, password):
user = self.get_username(realm, authuri)
set_password(realm, user, password)
def find_user_password(self, realm, authuri):
user = self.get_username(realm, authuri)
password = get_password(realm, user)
if password is None:
prompt = f'password for {user}@{realm} for {authuri}: '
password = getpass.getpass(prompt)
set_password(realm, user, password)
return user, password
def clear_password(self, realm, authuri):
user = self.get_username(realm, authuri)
delete_password(realm, user)

View File

View File

@@ -0,0 +1,200 @@
"""
Common test functionality for backends.
"""
import os
import string
import pytest
from keyring import errors
from .util import random_string
# unicode only characters
# Sourced from The Quick Brown Fox... Pangrams
# http://www.columbia.edu/~fdc/utf8/
UNICODE_CHARS = (
"זהכיףסתםלשמועאיךתנצחקרפדעץטובבגן"
"ξεσκεπάζωτηνψυχοφθόραβδελυγμία"
"Съешьжеещёэтихмягкихфранцузскихбулокдавыпейчаю"
"Жълтатадюлябешещастливачепухъткойтоцъфназамръзнакатогьон"
)
# ensure no-ascii chars slip by - watch your editor!
assert min(ord(char) for char in UNICODE_CHARS) > 127
def is_ascii_printable(s):
return all(32 <= ord(c) < 127 for c in s)
class BackendBasicTests:
"""Test for the keyring's basic functions. password_set and password_get"""
DIFFICULT_CHARS = string.whitespace + string.punctuation
@pytest.fixture(autouse=True)
def _init_properties(self, request):
self.keyring = self.init_keyring()
self.credentials_created = set()
request.addfinalizer(self.cleanup)
def cleanup(self):
for item in self.credentials_created:
self.keyring.delete_password(*item)
def set_password(self, service, username, password):
# set the password and save the result so the test runner can clean
# up after if necessary.
self.keyring.set_password(service, username, password)
self.credentials_created.add((service, username))
def check_set_get(self, service, username, password):
keyring = self.keyring
# for the non-existent password
assert keyring.get_password(service, username) is None
# common usage
self.set_password(service, username, password)
assert keyring.get_password(service, username) == password
# for the empty password
self.set_password(service, username, "")
assert keyring.get_password(service, username) == ""
def test_password_set_get(self):
password = random_string(20)
username = random_string(20)
service = random_string(20)
self.check_set_get(service, username, password)
def test_set_after_set_blank(self):
service = random_string(20)
username = random_string(20)
self.keyring.set_password(service, username, "")
self.keyring.set_password(service, username, "non-blank")
def test_difficult_chars(self):
password = random_string(20, self.DIFFICULT_CHARS)
username = random_string(20, self.DIFFICULT_CHARS)
service = random_string(20, self.DIFFICULT_CHARS)
self.check_set_get(service, username, password)
def test_delete_present(self):
password = random_string(20, self.DIFFICULT_CHARS)
username = random_string(20, self.DIFFICULT_CHARS)
service = random_string(20, self.DIFFICULT_CHARS)
self.keyring.set_password(service, username, password)
self.keyring.delete_password(service, username)
assert self.keyring.get_password(service, username) is None
def test_delete_not_present(self):
username = random_string(20, self.DIFFICULT_CHARS)
service = random_string(20, self.DIFFICULT_CHARS)
with pytest.raises(errors.PasswordDeleteError):
self.keyring.delete_password(service, username)
def test_delete_one_in_group(self):
username1 = random_string(20, self.DIFFICULT_CHARS)
username2 = random_string(20, self.DIFFICULT_CHARS)
password = random_string(20, self.DIFFICULT_CHARS)
service = random_string(20, self.DIFFICULT_CHARS)
self.keyring.set_password(service, username1, password)
self.set_password(service, username2, password)
self.keyring.delete_password(service, username1)
assert self.keyring.get_password(service, username2) == password
def test_name_property(self):
assert is_ascii_printable(self.keyring.name)
def test_unicode_chars(self):
password = random_string(20, UNICODE_CHARS)
username = random_string(20, UNICODE_CHARS)
service = random_string(20, UNICODE_CHARS)
self.check_set_get(service, username, password)
def test_unicode_and_ascii_chars(self):
source = (
random_string(10, UNICODE_CHARS)
+ random_string(10)
+ random_string(10, self.DIFFICULT_CHARS)
)
password = random_string(20, source)
username = random_string(20, source)
service = random_string(20, source)
self.check_set_get(service, username, password)
def test_different_user(self):
"""
Issue #47 reports that WinVault isn't storing passwords for
multiple users. This test exercises that test for each of the
backends.
"""
keyring = self.keyring
self.set_password('service1', 'user1', 'password1')
self.set_password('service1', 'user2', 'password2')
assert keyring.get_password('service1', 'user1') == 'password1'
assert keyring.get_password('service1', 'user2') == 'password2'
self.set_password('service2', 'user3', 'password3')
assert keyring.get_password('service1', 'user1') == 'password1'
def test_credential(self):
keyring = self.keyring
cred = keyring.get_credential('service', None)
assert cred is None
self.set_password('service1', 'user1', 'password1')
self.set_password('service1', 'user2', 'password2')
cred = keyring.get_credential('service1', None)
assert cred is None or (cred.username, cred.password) in (
('user1', 'password1'),
('user2', 'password2'),
)
cred = keyring.get_credential('service1', 'user2')
assert cred is not None
assert (cred.username, cred.password) in (
('user1', 'password1'),
('user2', 'password2'),
)
@pytest.mark.xfail("platform.system() == 'Windows'", reason="#668")
def test_empty_username(self):
with pytest.deprecated_call():
self.set_password('service1', '', 'password1')
assert self.keyring.get_password('service1', '') == 'password1'
def test_set_properties(self, monkeypatch):
env = dict(KEYRING_PROPERTY_FOO_BAR='fizz buzz', OTHER_SETTING='ignore me')
monkeypatch.setattr(os, 'environ', env)
self.keyring.set_properties_from_env()
assert self.keyring.foo_bar == 'fizz buzz'
def test_new_with_properties(self):
alt = self.keyring.with_properties(foo='bar')
assert alt is not self.keyring
assert alt.foo == 'bar'
with pytest.raises(AttributeError):
self.keyring.foo # noqa: B018
def test_wrong_username_returns_none(self):
keyring = self.keyring
service = 'test_wrong_username_returns_none'
cred = keyring.get_credential(service, None)
assert cred is None
password_1 = 'password1'
password_2 = 'password2'
self.set_password(service, 'user1', password_1)
self.set_password(service, 'user2', password_2)
assert keyring.get_credential(service, "user1").password == password_1
assert keyring.get_credential(service, "user2").password == password_2
# Missing/wrong username should not return a cred
assert keyring.get_credential(service, "nobody!") is None

View File

@@ -0,0 +1,68 @@
import contextlib
import os
import random
import string
import sys
class ImportKiller:
"Context manager to make an import of a given name or names fail."
def __init__(self, *names):
self.names = names
def find_module(self, fullname, path=None):
if fullname in self.names:
return self
def load_module(self, fullname):
assert fullname in self.names
raise ImportError(fullname)
def __enter__(self):
self.original = {}
for name in self.names:
self.original[name] = sys.modules.pop(name, None)
sys.meta_path.insert(0, self)
def __exit__(self, *args):
sys.meta_path.remove(self)
for key, value in self.original.items():
if value is not None:
sys.modules[key] = value
@contextlib.contextmanager
def NoNoneDictMutator(destination, **changes):
"""Helper context manager to make and unmake changes to a dict.
A None is not a valid value for the destination, and so means that the
associated name should be removed."""
original = {}
for key, value in changes.items():
original[key] = destination.get(key)
if value is None:
if key in destination:
del destination[key]
else:
destination[key] = value
yield
for key, value in original.items():
if value is None:
if key in destination:
del destination[key]
else:
destination[key] = value
def Environ(**changes):
"""A context manager to temporarily change the os.environ"""
return NoNoneDictMutator(os.environ, **changes)
ALPHABET = string.ascii_letters + string.digits
def random_string(k, source=ALPHABET):
"""Generate a random string with length <i>k</i>"""
return ''.join(random.choice(source) for _unused in range(k))

View File

@@ -0,0 +1,11 @@
import contextlib
def suppress_exceptions(callables, exceptions=Exception):
"""
yield the results of calling each element of callables, suppressing
any indicated exceptions.
"""
for callable in callables:
with contextlib.suppress(exceptions):
yield callable()

View File

@@ -0,0 +1,40 @@
import os
import pathlib
import platform
def _data_root_Windows():
release, version, csd, ptype = platform.win32_ver()
root = pathlib.Path(
os.environ.get('LOCALAPPDATA', os.environ.get('ProgramData', '.'))
)
return root / 'Python Keyring'
def _data_root_Linux():
"""
Use freedesktop.org Base Dir Specification to determine storage
location.
"""
fallback = pathlib.Path.home() / '.local/share'
root = os.environ.get('XDG_DATA_HOME', None) or fallback
return pathlib.Path(root, 'python_keyring')
_config_root_Windows = _data_root_Windows
def _config_root_Linux():
"""
Use freedesktop.org Base Dir Specification to determine config
location.
"""
fallback = pathlib.Path.home() / '.config'
key = 'XDG_CONFIG_HOME'
root = os.environ.get(key, None) or fallback
return pathlib.Path(root, 'python_keyring')
# by default, use Unix convention
data_root = globals().get('_data_root_' + platform.system(), _data_root_Linux)
config_root = globals().get('_config_root_' + platform.system(), _config_root_Linux)