#!/usr/bin/python # -*- coding: utf-8 -*- # Mitter, a client for Twitter. # Copyright (C) 2007, 2008 The Mitter Contributors # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . import urllib import urllib2 import logging import datetime import base64 import htmlentitydefs import re import warnings from httplib import BadStatusLine from socket import error as socketError from mitterlib import htmlize from networkbase import NetworkBase, NetworkData, auth_options, \ NetworkDNSError, NetworkBadStatusLineError, NetworkLowLevelError, \ NetworkInvalidResponseError, NetworkPermissionDeniedError, \ MessageTooLongWarning try: # Python 2.6/3.0 JSON parser import json except ImportError: # Fallback to SimpleJSON import simplejson as json # logging _log = logging.getLogger('mitterlib.network.Twitter') # the month names come directly from the site, so we are not affected by # locale settings. _month_names = [None, 'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'] def _unhtml(text): """Convert text coming in HTML encoded to UTF-8 representations.""" new_text = [] copy_pos = 0 _log.debug('Original text: %s', text) for code in re.finditer(r'&(\w+);', text): new_text.append(text[copy_pos:code.start()]) entity = text[code.start()+1:code.end()-1] if entity in htmlentitydefs.name2codepoint: new_text.append(unichr( htmlentitydefs.name2codepoint[entity])) else: new_text.append(code.group().decode('utf8')) copy_pos = code.end() new_text.append(text[copy_pos:]) _log.debug('New text: %s', new_text) result = u''.join(new_text) _log.debug('Result: %s', result) return result def _to_datetime(server_str): """Convert a date send by the server to a datetime object. Ex: from this: Tue Mar 13 00:12:41 +0000 2007 to datetime. """ date_info = server_str.split(' ') month = _month_names.index(date_info[1]) day = int(date_info[2]) year = int(date_info[5]) time_info = date_info[3].split(':') hour = int(time_info[0]) minute = int(time_info[1]) second = int(time_info[2]) return datetime.datetime(year, month, day, hour, minute, second) def _make_datetime(response): """Converts dates on responses to datetime objects.""" result = [] for tweet in response: result.append(TwitterNetworkData(tweet)) return result class TwitterNetworkData(NetworkData): """A simple wrapper around NetworkData, to make things easier to convert twitter data into a NetworkData object.""" def __init__(self, data): """Class initialization. Receives a dictionary with a single tweet.""" NetworkData.__init__(self) self.id = data['id'] self.name = data['user']['name'] self.username = data['user']['screen_name'] self.avatar = data['user']['profile_image_url'] self.message_time = _to_datetime(data['created_at']) self.favourited = data['favorited'] if 'in_reply_to_status_id' in data and data['in_reply_to_status_id']: self.parent = int(data['in_reply_to_status_id']) if 'retweeted_status' in data: self.reposted_by = self.username retweet_user = data['retweeted_status']['user'] self.name = retweet_user['name'] self.username = retweet_user['screen_name'] self.avatar = retweet_user['profile_image_url'] # also switch the text for the original text. data['text'] = data['retweeted_status']['text'] # Twitter encodes a lot of HTML entities, which are not good when # you want to *display* then (e.g., "<" returns to us as "<"). # So we convert this here. Interfaces need to worry about converting # them if it becomes a problem. self.message = _unhtml(data['text']) return class Connection(NetworkBase): """Base class to talk to twitter.""" NAMESPACE = 'Twitter' SHORTCUT = 'tw' # TODO: find a way to move this to the config file def is_setup(self): """Return True or False if the network is setup/enabled.""" if (self._options[self.NAMESPACE]['username'] and self._options[self.NAMESPACE]['password']): # Consider the network enabled if there is an username and # password return True else: return False def __init__(self, options): self._options = options @property def server(self): if self._options[self.NAMESPACE]['https']: return self._options[self.NAMESPACE]['secure_server_url'] else: return self._options[self.NAMESPACE]['server_url'] def _common_headers(self): """Returns a string with the normal headers we should add on every request""" auth = base64.b64encode('%s:%s' % ( self._options[self.NAMESPACE]['username'], self._options[self.NAMESPACE]['password'])) headers = { 'Authorization': 'Basic %s' % (auth), 'User-Agent': self._user_agent} return headers def _request(self, resource, headers=None, body=None): """Send a request to the Twitter server. Once finished, call the function at callback.""" url = '%s%s' % (self.server, resource) _log.debug('Request %s' % (url)) request = urllib2.Request(url=url) request_headers = self._common_headers() if headers: request_headers.update(headers) for key in request_headers: _log.debug('Header: %s=%s' % (key, request_headers[key])) request.add_header(key, request_headers[key]) if body: _log.debug('Body: %s' % (body)) request.add_data(body) try: _log.debug('Starting request of %s' % (url)) response = urllib2.urlopen(request) data = response.read() except urllib2.HTTPError, exc: _log.debug('HTTPError: %d' % (exc.code)) _log.debug('HTTPError: response body:\n%s' % exc.read()) # To me, I got a lot of 502 for "replies". It shows the # "Something is technically wrong" most of the time in the real # pages. if exc.code == 403: # Permission denied. raise NetworkPermissionDeniedError raise NetworkInvalidResponseError except urllib2.URLError, exc: _log.error('URL error: %s' % exc.reason) raise NetworkDNSError except BadStatusLine: _log.error('Bad status line (Twitter is going bananas)') raise NetworkBadStatusLineError except socketError: # That's the worst exception ever. _log.error('Socket connection error') raise NetworkLowLevelError # TODO: Permission denied? # Introduced in Twitter in 2009.03.27 response_headers = response.info() if 'X-RateLimit-Remaining' in response_headers: self._rate_limit = int(response_headers['X-RateLimit-Remaining']) _log.debug('Remaning hits: %d', self._rate_limit) elif 'x-ratelimit-remaining' in response_headers: self._rate_limit = int(response_headers['x-ratelimit-remaining']) _log.debug('Remaning hits: %d', self._rate_limit) else: self._rate_limit = None _log.debug('Request completed') _log.debug('info(%s): %s', type(response.info()), response.info()) return json.loads(data) # # New network style methods # AUTH = [ {'name': 'username', 'flags': ['-u', '--username'], 'prompt': 'Username', 'help': 'Your twitter username', 'type': 'str'}, {'name': 'password', 'flags': ['-p', '--password'], 'prompt': 'Password', 'help': 'Your twitter password', 'type': 'passwd'}] @classmethod def options(self, options): """Add options related to Twitter.""" options.add_group(self.NAMESPACE, 'Twitter network') options.add_option('-s', '--no-https', group=self.NAMESPACE, option='https', default=True, # Secure connections by default help='Disable HTTPS (secure) connection with Twitter.', action='store_false') options.add_option( group=self.NAMESPACE, option='last_tweet', default=0, is_cmd_option=False) options.add_option( group=self.NAMESPACE, option='last_reply', default=0, is_cmd_option=False) options.add_option( group=self.NAMESPACE, option='server_url', default='http://api.twitter.com/1', is_cmd_option=False) options.add_option( group=self.NAMESPACE, option='secure_server_url', default='https://api.twitter.com/1', is_cmd_option=False) options.add_option( group=self.NAMESPACE, option='page_size', default=20, is_cmd_option=False) options.add_option( group=self.NAMESPACE, option='message_threshold', default=4, is_cmd_option=False) auth_options(self.NAMESPACE, options, self.AUTH) return def _timeline(self, config_var, url): """Request one of the lists of tweets.""" last_id = int(self._options[self.NAMESPACE][config_var]) _log.debug('%s: %d', config_var, last_id) params = {'count': self._options[self.NAMESPACE]['page_size']} if last_id > 0: params['since_id'] = last_id page = 1 result = [] response = [0] # So we stay in the loop. high_id = 0 while response: # Not the cleanest code # TODO: How the interfaces can interrupt this? if page > 1: params['page'] = page final_url = '?'.join([url, urllib.urlencode(params)]) response = self._request(final_url) _log.debug('Page %d, %d results', page, len(response)) if response: # extract the highest id in the respone and save it so we can # use it when requesting data again (using the since_id # parameter) top_tweet_id = response[0]['id'] _log.debug('Top tweet: %d; Highest seen tweet: %d', top_tweet_id, high_id) if top_tweet_id > high_id: high_id = top_tweet_id response_data = _make_datetime(response) result.extend(response_data) page += 1 # Request the next page _log.debug('%d messages, %d threshold' % (len(response_data), self._options[self.NAMESPACE]['message_threshold'])) if (len(response_data) <= self._options[self.NAMESPACE]['message_threshold']): break if last_id == 0: # do not try to download everything if we don't have a # previous list (or we'll blow the available requests in one # short) break # only update the "last seen id" if everything goes alright if high_id > int(self._options[self.NAMESPACE][config_var]): _log.debug('Last tweet updated: %d', high_id) self._options[self.NAMESPACE][config_var] = high_id return result def messages(self): """Return a list of NetworkData objects for the main "timeline".""" return self._timeline('last_tweet', '/statuses/home_timeline.json') def message(self, message_id): """Retrieves the information of one message.""" response = self._request('/statuses/show/%d.json' % (message_id)) return TwitterNetworkData(response) def link(self, message): """Return a link directly to the message.""" return 'http://twitter.com/%s/status/%s' % (message.username, message.id) def reply_prefix(self, message): """Returns the prefix needed for a reply.""" return '@' + message.username + ' ' def replies(self): """Return a list of NetworkData objects for the replies for the user messages.""" return self._timeline('last_reply', '/statuses/replies.json') def available_requests(self): """Return the current user rate limit.""" if self._rate_limit: return self._rate_limit data = self._request('/account/rate_limit_status.json') _log.debug('Requests: %s', data) return int(data['remaining_hits']) def update(self, status, reply_to=None): """Update the user status.""" if len(status) > 140: warnings.warn('Message too long', MessageTooLongWarning) # In Python 2.5, urllib.urlencode calls str(), which removes the # unicodeness of the "status". So we need to convert those peski # accents to HTML entities, so everything falls into ASCII. body = { 'status': htmlize(status), 'source': 'mitter'} if reply_to: if isinstance(reply_to, NetworkData): body['in_reply_to_status_id'] = reply_to.id # This is to protect the user from himself. You don't *need* # to start a reply with a @, but it looks really # confusing in the Twiter website. So if the line doesn't # start with the username of the original user, we add it # for the user. if not status.startswith('@' + reply_to.username): body['status'] = '@' + reply_to.username + ' ' + \ status else: body['in_reply_to_status_id'] = reply_to _log.debug('Body: %s', body) body = urllib.urlencode(body) _log.debug('Message to twitter: %s' % (body)) data = self._request('/statuses/update.json', body=body) # TODO: Check if twitter sends an error message when the message is # too large. return TwitterNetworkData(data) def repost(self, message): """Repost a message.""" body = urllib.urlencode({'id': message.id}) resource = '/statuses/retweet/%d.json' % (message.id) data = self._request(resource, body=body) return TwitterNetworkData(data) def favourite(self, message): """Mark a message as favourite.""" body = urllib.urlencode({'id': message.id}) if not message.favourite: resource = '/favorites/create/%d.json' % (message.id) else: resource = '/favorites/destroy/%d.json' % (message.id) data = self._request(resource, body=body) return TwitterNetworkData(data) def delete_message(self, message): """Delete a message.""" if isinstance(message, NetworkData): message = message.id # We don't need anything else for Twitter # make a body, so _request makes it a post. body = urllib.urlencode({'id': message}) resource = '/statuses/destroy/%s.json' % (message) response = self._request(resource, body=body) _log.debug('Delete response: %s', response) return True # Either we get a response or an exception before we reach # this. def can_delete(self, message): """Check if the message belongs to the user. If so, returns True; False otherwise.""" return (message.username == self._options[self.NAMESPACE]['username']) def can_reply(self, message): """Always return True; Twitter allows replying to any messages, including the ones from the user.""" return True def can_repost(self, message): """Twitter ignores retweets from the user.""" return not (message.username == self._options[self.NAMESPACE]['username']) def can_favourite(self, message): """Always return True; Twitter allows favouriting/unfavouriting any messages.""" return True