Browse Source

added token generation

master
Julio Biason 11 years ago
parent
commit
6c0e8256d9
  1. 33
      luncho/blueprints/token.py
  2. 19
      luncho/helpers.py
  3. 29
      luncho/server.py
  4. 76
      tests/token_tests.py
  5. 5
      tests/users_tests.py

33
luncho/blueprints/token.py

@ -0,0 +1,33 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""User management."""
from flask import Blueprint
from flask import jsonify
from flask import request
from luncho.helpers import ForceJSON
from luncho.helpers import JSONError
from luncho.server import User
from luncho.server import db
token = Blueprint('token', __name__)
@token.route('', methods=['POST'])
@ForceJSON(required=['username', 'password'])
def get_token():
"""Return an access token to the user. Request must be:
{ "username": "username", "password": "hash" }"""
json = request.get_json(force=True)
user = User.query.filter_by(username=json['username']).first()
if user is None:
return JSONError(404, 'User does not exist')
if not user.passhash == json['password']:
return JSONError(401, 'Invalid password')
return jsonify(status='OK',
token=user.get_token())

19
luncho/helpers.py

@ -39,3 +39,22 @@ class ForceJSON(object):
return func(*args, **kwargs) return func(*args, **kwargs)
return check_json return check_json
def JSONError(status, message, **kwargs):
"""Generate a JSON error message with the error and extra fields.
:param status: the HTTP status code for the error
:type status: int
:param message: The message in the error
:type message: str
:param kwargs: Extra fields to be added in the response. *Note*: `status`
and `message` should **NOT** be used.
:type kwargs: kwargs
:return: A response with the JSON and the status code."""
resp = jsonify(status='ERROR',
message=message,
**kwargs)
resp.status_code = status
return resp

29
luncho/server.py

@ -3,6 +3,9 @@
import sys import sys
import logging import logging
import json
import hmac
import datetime
from flask import Flask from flask import Flask
@ -37,6 +40,7 @@ class User(db.Model):
token = db.Column(db.String) token = db.Column(db.String)
issued_date = db.Column(db.Date) issued_date = db.Column(db.Date)
validated = db.Column(db.Boolean, default=False) validated = db.Column(db.Boolean, default=False)
created_at = db.Column(db.DateTime, nullable=False)
def __init__(self, username, fullname, passhash, token=None, def __init__(self, username, fullname, passhash, token=None,
issued_date=None, validated=False): issued_date=None, validated=False):
@ -46,14 +50,39 @@ class User(db.Model):
self.token = token self.token = token
self.issued_date = issued_date self.issued_date = issued_date
self.validated = validated self.validated = validated
self.created_at = datetime.datetime.now()
def get_token(self):
"""Generate a user token or return the current one for the day."""
if self.token and self.issued_date == datetime.date.today():
return self.token
# create a token for the day
self.token = self._token()
self.issued_date = datetime.date.today()
db.session.commit()
return self._token()
def valid_token(self, token):
"""Check if the user token is valid."""
return (self.token == self._token())
def _token(self):
"""Generate a token with the user information and the current date."""
phrase = json.dumps({'username': self.username,
'issued_date': datetime.date.today().isoformat()})
return hmac.new(self.created_at.isoformat(), phrase).hexdigest()
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
# Blueprints # Blueprints
# ---------------------------------------------------------------------- # ----------------------------------------------------------------------
from blueprints.index import index from blueprints.index import index
from blueprints.users import users from blueprints.users import users
from blueprints.token import token
app.register_blueprint(index, url_prefix='/') app.register_blueprint(index, url_prefix='/')
app.register_blueprint(token, url_prefix='/token/')
app.register_blueprint(users, url_prefix='/user/') app.register_blueprint(users, url_prefix='/user/')

76
tests/token_tests.py

@ -0,0 +1,76 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
import unittest
import json
from luncho import server
from luncho.server import User
class TestToken(unittest.TestCase):
"""Test token requests."""
def setUp(self):
# leave the database blank to make it in memory
server.app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite://'
server.app.config['TESTING'] = True
self.app = server.app.test_client()
server.db.create_all()
# add a user
self.test_user = User(username='test',
fullname='Testing user',
passhash='hash')
server.db.session.add(self.test_user)
server.db.session.commit()
def tearDown(self):
server.db.drop_all(bind=None)
def test_create_token(self):
"""Test requesting a token"""
request = {'username': 'test',
'password': 'hash'}
rv = self.app.post('/token/',
data=json.dumps(request),
content_type='application/json')
self.assertEqual(rv.status_code, 200)
response = json.loads(rv.data)
self.assertTrue('status' in response)
self.assertEqual(response['status'], 'OK')
self.assertTrue('token' in response)
# we can't check the token itself 'cause it should change every day
def test_reget_token(self):
"""Check if getting the token twice will produce the same token."""
request = {'username': 'test',
'password': 'hash'}
rv = self.app.post('/token/',
data=json.dumps(request),
content_type='application/json')
self.assertEqual(rv.status_code, 200)
response = json.loads(rv.data)
# re-request the token
rv = self.app.post('/token/',
data=json.dumps(request),
content_type='application/json')
self.assertTrue(rv.status_code, 200)
self.assertEqual(response['token'], json.loads(rv.data)['token'])
def test_no_such_user(self):
"""Check the result of getting a token for a user that doesn't
exist."""
request = {'username': 'username',
'password': 'hash'}
rv = self.app.post('/token/',
data=json.dumps(request),
content_type='application/json')
self.assertEqual(rv.status_code, 404)

5
tests/users_tests.py

@ -6,6 +6,8 @@ import json
from luncho import server from luncho import server
from luncho.server import User
class TestUsers(unittest.TestCase): class TestUsers(unittest.TestCase):
"""Test users request.""" """Test users request."""
@ -33,6 +35,9 @@ class TestUsers(unittest.TestCase):
self.assertEqual(rv.status_code, 200) self.assertEqual(rv.status_code, 200)
self.assertEqual(json.loads(rv.data), {'status': 'OK'}) self.assertEqual(json.loads(rv.data), {'status': 'OK'})
# db check
self.assertIsNotNone(User.query.filter_by(username='username').first())
def test_duplicate_user(self): def test_duplicate_user(self):
"""Check the status for trying to create a user that it is already """Check the status for trying to create a user that it is already
in the database.""" in the database."""

Loading…
Cancel
Save