"""Represents a credentials cache that is backed by the file system. This cache
is the underlying registration and verification engine for credentials.
Importantly, while the credentials cache may generate a token for a user, that
token is NEVER stored. Instead, the hash of the token is stored. Verification
compares the hash of the token provided by the user to the stored hashed token.
"""
import os
import json
import random
import hashlib
from collections import namedtuple
from fixie.environ import expand_and_make_dir
from fixie.environ import ENV
# Immutable record describing one registered user; the plain token is never
# part of it, only its hash.
User = namedtuple('User', 'user email hashed_token')
class Cache:
    """A file-system-backed, singleton cache for fixie credentials.

    Each registered user is stored as ``<credsdir>/<user>.json`` holding the
    user name, the email address, and the SHA-256 hash of the user's token.
    The plain token is only ever returned to the caller; it is never stored.
    """

    __inst = None

    def __new__(cls, *args, **kwargs):
        # Make the cache a singleton. Constructor arguments are accepted (and
        # ignored) here so that Cache(credsdir=..., seed=...) reaches
        # __init__ instead of failing with a TypeError in __new__.
        if Cache.__inst is None:
            Cache.__inst = object.__new__(cls)
        return Cache.__inst

    def __init__(self, credsdir=None, seed=None, nbytes=20):
        """
        Parameters
        ----------
        credsdir : str or None, optional
            Path to credentials directory, if None, defaults to
            $FIXIE_CREDS_DIR.
        seed : int, bytes, or None, optional
            Value to seed Python's RNG with. This is provided for testing
            purposes only. For production, this should be left as None, in
            which case tokens are drawn from the operating system's
            cryptographically secure random source.
        nbytes : int, optional
            Number of bytes to use when generating tokens.

        Notes
        -----
        Because the class is a singleton, every ``Cache(...)`` call re-runs
        this initializer on the one shared instance.
        """
        random.seed(seed)
        # A seeded (reproducible) stream is only appropriate for tests; when
        # no seed is given use the OS entropy source so tokens are not
        # predictable.
        self._rng = random.SystemRandom() if seed is None else random
        self.nbytes = nbytes
        self.nbits = nbytes * 8
        self._credsdir = None
        self._dirty = True
        self._users = {}  # maps user names to User named tuples
        self.credsdir = credsdir

    @property
    def credsdir(self):
        """Credentials directory, falling back to $FIXIE_CREDS_DIR if unset."""
        value = self._credsdir
        if value is None:
            value = ENV['FIXIE_CREDS_DIR']
        return value

    @credsdir.setter
    def credsdir(self, value):
        # Changing the directory invalidates everything cached in memory.
        self._dirty = True
        self._users.clear()
        if value is None:
            self._credsdir = value
        else:
            self._credsdir = expand_and_make_dir(value)

    @staticmethod
    def _is_valid_user(user):
        """Returns whether a user name is a single, safe path component."""
        return (isinstance(user, str)
                and user not in ('', '.', '..')
                and '\x00' not in user
                and os.path.basename(user) == user)

    def _new_token(self):
        """Generates a fresh token as a hex string of up to ``nbits`` bits."""
        return hex(self._rng.getrandbits(self.nbits))[2:]

    def user_cred_file(self, user):
        """Returns the credential filename for a user.

        Raises
        ------
        ValueError
            If the user name is not a plain file name (e.g. it contains a
            path separator), since it could otherwise point outside of the
            credentials directory.
        """
        if not self._is_valid_user(user):
            raise ValueError('Invalid user name {0!r}'.format(user))
        return os.path.join(self.credsdir, user + '.json')

    def user_exists(self, user):
        """Returns whether or not a user exists (ie has been registered).
        Invalid user names never exist.
        """
        if not self._is_valid_user(user):
            return False
        if user in self._users:
            return True
        return os.path.isfile(self.user_cred_file(user))

    def hash_token(self, token):
        """Hashes a token.

        The token is parsed as a hexadecimal integer, packed into ``nbytes``
        little-endian bytes, and hashed with SHA-256.

        Raises
        ------
        ValueError
            If the token is not a hexadecimal string.
        OverflowError
            If the token is negative or does not fit into ``nbytes`` bytes.
        """
        i = int(token, 16)
        b = i.to_bytes(self.nbytes, 'little')
        h = hashlib.sha256(b)
        return h.hexdigest()

    def write_user(self, user, email, hashed_token):
        """Writes a user's credential file and caches the user."""
        data = {'user': user, 'email': email, 'hashed_token': hashed_token}
        fname = self.user_cred_file(user)
        # Create the file owner-read/write only from the start, so the hash
        # is never briefly readable by others before the chmod below.
        fd = os.open(fname, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, 'w', encoding='utf-8') as f:
            json.dump(data, f, sort_keys=True, separators=(',', ':'))
        # Still needed when the file already existed with wider permissions.
        os.chmod(fname, 0o600)
        self._users[user] = User(**data)

    def load_user(self, user):
        """Loads a user into the cache from the filesystem."""
        fname = self.user_cred_file(user)
        with open(fname, 'r', encoding='utf-8') as f:
            data = json.load(f)
        u = self._users[data['user']] = User(**data)
        return u

    def get_user(self, user):
        """Returns the user, loading it from the file system if needed."""
        u = self._users.get(user, None)
        if u is None:
            u = self.load_user(user)
        return u

    def remove_user(self, user):
        """Remove the user from the cache and the file system."""
        fname = self.user_cred_file(user)
        # The user may exist only on disk (never loaded), so a missing cache
        # entry must not prevent the file from being removed.
        self._users.pop(user, None)
        if os.path.isfile(fname):
            os.remove(fname)

    def register(self, user, email):
        """Registers a new user and provides their token.

        Parameters
        ----------
        user : str
            Name of the user to register.
        email : str
            Email address for the user.

        Returns
        -------
        token or message : str
            The token if the registration was successful, and an error message
            if it wasn't.
        flag : bool
            Whether or not the registration was successful.
        """
        if not self._is_valid_user(user):
            return 'Invalid user name {0!r}'.format(user), False
        if self.user_exists(user):
            return 'User {0!r} already registered'.format(user), False
        token = self._new_token()
        hashed_token = self.hash_token(token)
        self.write_user(user, email, hashed_token)
        return token, True

    def verify(self, user, token):
        """Verifies whether or not the user-token pair match.

        Parameters
        ----------
        user : str
            Name of the user.
        token : str
            The token to verify for the user.

        Returns
        -------
        verified : bool
            Whether or not the user-token pair is valid.
        message : str
            A message string, if needed.
        flag : bool
            Whether or not the verification itself could be completed
            successfully.
        """
        if not self.user_exists(user):
            return False, 'User {0!r} not registered'.format(user), False
        u = self.get_user(user)
        try:
            hashed_token = self.hash_token(token)
        except (TypeError, ValueError, OverflowError):
            # A token that is not hex, is negative, or is too long can never
            # match; report it as invalid rather than raising.
            return False, 'Invalid token for user ' + user, True
        valid = (hashed_token == u.hashed_token)
        msg = 'User verified' if valid else 'Invalid token for user ' + user
        return valid, msg, True

    def deregister(self, user, token):
        """Deregisters a user.

        Parameters
        ----------
        user : str
            Name of the user to deregister.
        token : str
            The user's token

        Returns
        -------
        message : str
            Message for status of deregistration
        flag : bool
            Whether or not the deregistration was successful.
        """
        valid, msg, flag = self.verify(user, token)
        if not valid or not flag:
            return msg, False
        self.remove_user(user)
        return user + ' deregistered', True

    def reset(self, user, email):
        """Resets a user's token on the system. The email address here must
        match the one originally provided.

        Parameters
        ----------
        user : str
            Name of the user whose token is reset.
        email : str
            Email address for the user.

        Returns
        -------
        token or message : str
            The new token if the reset was successful, and an error message
            if it wasn't.
        flag : bool
            Whether or not the reset was successful.
        """
        if not self.user_exists(user):
            return 'User {0!r} not registered'.format(user), False
        u = self.get_user(user)
        if email != u.email:
            return 'User email does not match registered email address', False
        # A reset is a removal followed by a fresh registration.
        self.remove_user(user)
        token, flag = self.register(user, email)
        return token, flag
# Module-level Cache instance. Cache is a singleton, so calling Cache()
# elsewhere returns this same object.
CACHE = Cache()