diff --git a/luncho/blueprints/token.py b/luncho/blueprints/token.py new file mode 100644 index 0000000..e6fd06e --- /dev/null +++ b/luncho/blueprints/token.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python +# -*- encoding: utf-8 -*- + +"""User management.""" + +from flask import Blueprint +from flask import jsonify +from flask import request + +from luncho.helpers import ForceJSON +from luncho.helpers import JSONError + +from luncho.server import User +from luncho.server import db + +token = Blueprint('token', __name__) + +@token.route('', methods=['POST']) +@ForceJSON(required=['username', 'password']) +def get_token(): + """Return an access token to the user. Request must be: + { "username": "username", "password": "hash" }""" + json = request.get_json(force=True) + + user = User.query.filter_by(username=json['username']).first() + if user is None: + return JSONError(404, 'User does not exist') + + if not user.passhash == json['password']: + return JSONError(401, 'Invalid password') + + return jsonify(status='OK', + token=user.get_token()) diff --git a/luncho/helpers.py b/luncho/helpers.py index adc3ee6..029b9da 100644 --- a/luncho/helpers.py +++ b/luncho/helpers.py @@ -39,3 +39,22 @@ class ForceJSON(object): return func(*args, **kwargs) return check_json + + +def JSONError(status, message, **kwargs): + """Generate a JSON error message with the error and extra fields. + + :param status: the HTTP status code for the error + :type status: int + :param message: The message in the error + :type message: str + :param kwargs: Extra fields to be added in the response. *Note*: `status` + and `message` should **NOT** be used. + :type kwargs: kwargs + + :return: A response with the JSON and the status code.""" + resp = jsonify(status='ERROR', + message=message, + **kwargs) + resp.status_code = status + return resp diff --git a/luncho/server.py b/luncho/server.py index 20147dd..b8a264c 100644 --- a/luncho/server.py +++ b/luncho/server.py @@ -3,6 +3,9 @@ import sys import logging +import json +import hmac +import datetime from flask import Flask @@ -37,6 +40,7 @@ class User(db.Model): token = db.Column(db.String) issued_date = db.Column(db.Date) validated = db.Column(db.Boolean, default=False) + created_at = db.Column(db.DateTime, nullable=False) def __init__(self, username, fullname, passhash, token=None, issued_date=None, validated=False): @@ -46,14 +50,39 @@ class User(db.Model): self.token = token self.issued_date = issued_date self.validated = validated + self.created_at = datetime.datetime.now() + + def get_token(self): + """Generate a user token or return the current one for the day.""" + if self.token and self.issued_date == datetime.date.today(): + return self.token + + # create a token for the day + self.token = self._token() + self.issued_date = datetime.date.today() + db.session.commit() + return self._token() + + def valid_token(self, token): + """Check if the user token is valid.""" + return (self.token == self._token()) + + def _token(self): + """Generate a token with the user information and the current date.""" + phrase = json.dumps({'username': self.username, + 'issued_date': datetime.date.today().isoformat()}) + return hmac.new(self.created_at.isoformat(), phrase).hexdigest() + # ---------------------------------------------------------------------- # Blueprints # ---------------------------------------------------------------------- from blueprints.index import index from blueprints.users import users +from blueprints.token import token app.register_blueprint(index, url_prefix='/') +app.register_blueprint(token, url_prefix='/token/') app.register_blueprint(users, url_prefix='/user/') diff --git a/tests/token_tests.py b/tests/token_tests.py new file mode 100644 index 0000000..cf43084 --- /dev/null +++ b/tests/token_tests.py @@ -0,0 +1,76 @@ +#!/usr/bin/env python +# -*- encoding: utf-8 -*- + +import unittest +import json + +from luncho import server + +from luncho.server import User + + +class TestToken(unittest.TestCase): + """Test token requests.""" + + def setUp(self): + # leave the database blank to make it in memory + server.app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite://' + server.app.config['TESTING'] = True + + self.app = server.app.test_client() + server.db.create_all() + + # add a user + self.test_user = User(username='test', + fullname='Testing user', + passhash='hash') + server.db.session.add(self.test_user) + server.db.session.commit() + + def tearDown(self): + server.db.drop_all(bind=None) + + def test_create_token(self): + """Test requesting a token""" + request = {'username': 'test', + 'password': 'hash'} + rv = self.app.post('/token/', + data=json.dumps(request), + content_type='application/json') + + self.assertEqual(rv.status_code, 200) + response = json.loads(rv.data) + self.assertTrue('status' in response) + self.assertEqual(response['status'], 'OK') + self.assertTrue('token' in response) + # we can't check the token itself 'cause it should change every day + + def test_reget_token(self): + """Check if getting the token twice will produce the same token.""" + request = {'username': 'test', + 'password': 'hash'} + rv = self.app.post('/token/', + data=json.dumps(request), + content_type='application/json') + + self.assertEqual(rv.status_code, 200) + response = json.loads(rv.data) + + # re-request the token + rv = self.app.post('/token/', + data=json.dumps(request), + content_type='application/json') + + self.assertTrue(rv.status_code, 200) + self.assertEqual(response['token'], json.loads(rv.data)['token']) + + def test_no_such_user(self): + """Check the result of getting a token for a user that doesn't + exist.""" + request = {'username': 'username', + 'password': 'hash'} + rv = self.app.post('/token/', + data=json.dumps(request), + content_type='application/json') + + self.assertEqual(rv.status_code, 404) diff --git a/tests/users_tests.py b/tests/users_tests.py index 36396bc..c680edb 100644 --- a/tests/users_tests.py +++ b/tests/users_tests.py @@ -6,6 +6,8 @@ import json from luncho import server +from luncho.server import User + class TestUsers(unittest.TestCase): """Test users request.""" @@ -33,6 +35,9 @@ class TestUsers(unittest.TestCase): self.assertEqual(rv.status_code, 200) self.assertEqual(json.loads(rv.data), {'status': 'OK'}) + # db check + self.assertIsNotNone(User.query.filter_by(username='username').first()) + def test_duplicate_user(self): """Check the status for trying to create a user that it is already in the database."""