#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Mitter, a client for Twitter.
# Copyright (C) 2007, 2008  The Mitter Contributors
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""Twitter network layer for Mitter.

The Twitter network layer has the following options:

*last_tweet*
    Last tweet seen in the home timeline request. It is used to avoid
    requesting the whole timeline again. Usually, it should not be changed
    manually. It starts with no value, meaning only the first page is
    requested; after that, all tweets since the last seen tweet are
    requested, no matter how many pages are necessary.

*last_reply*
    Last reply seen in the replies timeline. Works exactly the same as
    *last_tweet*, but for the replies timeline.

*server_url*
    URL for the non-secure server. Can include any paths. Default value is
    'http://api.twitter.com/1'.

*secure_server_url*
    URL for the secure server. Can include any paths. Default value is
    'https://api.twitter.com/1'.

*message_threshold*
    Threshold for not requesting the next page. If the number of returned
    elements is at or below this number, the module will not request the
    next page. This is due to the way Twitter uses its cache: in normal
    operation, any page that has fewer than the default 20 elements per
    page can be considered the last page; due to the caching policy Twitter
    uses, some pages will have fewer than those 20 elements (it first
    retrieves the 20 elements, then checks if there are any deleted
    elements -- if there are, those are removed but no new elements are
    added to the result.) Default value is '16' (80% of a full page.)
"""

import base64
import datetime
import gettext
import htmlentitydefs
import logging
import re
import urllib
import urllib2
import warnings

from httplib import BadStatusLine
from socket import error as socketError

try:
    # Python 2.6/3.0 JSON parser
    import json
except ImportError:
    # Fallback to SimpleJSON
    import simplejson as json

from networkbase import NetworkBase, NetworkData, auth_options, \
        NetworkDNSError, NetworkBadStatusLineError, NetworkLowLevelError, \
        NetworkInvalidResponseError, NetworkPermissionDeniedError, \
        MessageTooLongWarning

# ----------------------------------------------------------------------
# I18n bits
# ----------------------------------------------------------------------
t = gettext.translation('ui_pygtk', fallback=True)
_ = t.gettext
N_ = t.ngettext

# ----------------------------------------------------------------------
# logging
# ----------------------------------------------------------------------
_log = logging.getLogger('mitterlib.network.Twitter')

# ----------------------------------------------------------------------
# the month names come directly from the site, so we are not affected by
# locale settings.
# ----------------------------------------------------------------------
_month_names = [None, 'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug',
        'Sep', 'Oct', 'Nov', 'Dec']


def _utf8(value):
    """Return unicode values encoded as UTF-8 byte strings.

    Anything that is not a unicode object is returned untouched. Python 2's
    urllib.urlencode() and base64.b64encode() implicitly encode unicode as
    ASCII, which fails for any non-ASCII character, so text must be encoded
    before it reaches them.
    """
    if isinstance(value, unicode):
        return value.encode('utf-8')
    return value


def _unhtml(text):
    """Replace named HTML entities in text and return a unicode string.

    Entities listed in htmlentitydefs.name2codepoint (e.g. "&lt;") are
    replaced by the character they represent; anything else that merely
    looks like an entity is kept as it is.
    """
    _log.debug('Original text: %s', text)
    if isinstance(text, str):
        # Work in unicode from the start, so byte and unicode fragments are
        # never mixed when the pieces are joined.
        text = text.decode('utf8')

    new_text = []
    copy_pos = 0
    for code in re.finditer(r'&(\w+);', text):
        new_text.append(text[copy_pos:code.start()])
        entity = code.group(1)
        if entity in htmlentitydefs.name2codepoint:
            new_text.append(unichr(htmlentitydefs.name2codepoint[entity]))
        else:
            # Unknown entity: keep the original text.
            new_text.append(code.group())
        copy_pos = code.end()
    new_text.append(text[copy_pos:])
    _log.debug('New text: %s', new_text)

    result = u''.join(new_text)
    _log.debug('Result: %s', result)
    return result


def _to_datetime(server_str):
    """Convert a date sent by the server to a datetime object.

    Ex: from this:
        Tue Mar 13 00:12:41 +0000 2007
    to datetime. The timezone offset field is ignored, so the result is a
    naive datetime.
    """
    date_info = server_str.split()
    month = _month_names.index(date_info[1])
    day = int(date_info[2])
    year = int(date_info[5])
    hour, minute, second = [int(part) for part in date_info[3].split(':')]
    return datetime.datetime(year, month, day, hour, minute, second)


def _make_datetime(response):
    """Wrap every tweet dictionary of a response in a TwitterNetworkData
    (which, among other things, converts the dates to datetime objects)."""
    return [TwitterNetworkData(tweet) for tweet in response]


class TwitterNetworkData(NetworkData):
    """A simple wrapper around NetworkData, to make things easier to convert
    twitter data into a NetworkData object."""

    def __init__(self, data):
        """Class initialization. Receives a dictionary with a single
        tweet. The dictionary is only read, never changed."""
        NetworkData.__init__(self)

        user = data['user']
        text = data['text']

        self.id = data['id']
        self.name = user['name']
        self.username = user['screen_name']
        self.avatar = user['profile_image_url']
        self.message_time = _to_datetime(data['created_at'])

        if 'favorited' in data:
            self.favourited = data['favorited']

        if data.get('in_reply_to_status_id'):
            self.parent = int(data['in_reply_to_status_id'])
            self.parent_owner = data['in_reply_to_screen_name']

        if 'retweeted_status' in data:
            # The user that sent this tweet is the one reposting; everything
            # else must point to the original message.
            self.reposted_by = self.username
            retweet = data['retweeted_status']
            retweet_user = retweet['user']
            self.name = retweet_user['name']
            self.username = retweet_user['screen_name']
            self.avatar = retweet_user['profile_image_url']
            self.id = retweet['id']
            # also switch the text for the original text.
            text = retweet['text']
            # keep the message_time as is, so we have the retweet time, not
            # the original message time

        # Twitter encodes a lot of HTML entities, which are not good when
        # you want to *display* them (e.g., "<" returns to us as "&lt;").
        # So we convert this here. Interfaces need to worry about converting
        # them if it becomes a problem.
        self.message = _unhtml(text)


class Connection(NetworkBase):
    """Base class to talk to twitter."""

    NAMESPACE = 'Twitter'
    SHORTCUT = 'tw'   # TODO: find a way to move this to the config file

    def __init__(self, options):
        """Keep the options object, from which all settings are read."""
        self._options = options
        # Remaining API hits as reported by the last response; None while
        # unknown (no request made yet, or the server did not report it.)
        self._rate_limit = None

    def is_setup(self):
        """Return True or False if the network is setup/enabled."""
        settings = self._options[self.NAMESPACE]
        # Consider the network enabled if there is an username and password
        return bool(settings['username'] and settings['password'])

    @property
    def server(self):
        """Base URL for requests: the secure server when the 'https' option
        is set, the non-secure one otherwise."""
        settings = self._options[self.NAMESPACE]
        if settings['https']:
            return settings['secure_server_url']
        return settings['server_url']

    def _common_headers(self):
        """Returns a dictionary with the normal headers we should add on
        every request (Basic authentication and user agent.)"""
        settings = self._options[self.NAMESPACE]
        credentials = '%s:%s' % (settings['username'], settings['password'])
        # b64encode needs bytes; unicode credentials with non-ASCII
        # characters would fail the implicit ASCII conversion.
        auth = base64.b64encode(_utf8(credentials))

        return {
            'Authorization': 'Basic %s' % (auth),
            'User-Agent': self._user_agent}

    def _request(self, resource, headers=None, body=None):
        """Send a request to the Twitter server and return the decoded JSON
        response.

        resource is appended to the server URL; headers, if given, are added
        to (and override) the common headers; a body turns the request into
        a POST.

        Raises NetworkPermissionDeniedError on HTTP 403,
        NetworkInvalidResponseError on any other HTTP error,
        NetworkDNSError on URL errors, NetworkBadStatusLineError on a bad
        status line and NetworkLowLevelError on socket errors.
        """
        url = '%s%s' % (self.server, resource)
        _log.debug('Request %s', url)

        request = urllib2.Request(url=url)
        request_headers = self._common_headers()
        if headers:
            request_headers.update(headers)

        for key, value in request_headers.items():
            # Never write the credentials to the log: Basic authentication
            # is just the base64 of the username and password.
            if key.lower() == 'authorization':
                _log.debug('Header: %s=%s', key, '<hidden>')
            else:
                _log.debug('Header: %s=%s', key, value)
            request.add_header(key, value)

        if body:
            _log.debug('Body: %s', body)
            request.add_data(body)

        timeout = self._options['Network_Manager']['timeout']
        try:
            _log.debug('Starting request of %s (timeout %d)', url, timeout)
            response = urllib2.urlopen(request, timeout=timeout)
            data = response.read()
        except urllib2.HTTPError as exc:
            _log.debug('HTTPError: %d', exc.code)
            _log.debug('HTTPError: response body:\n%s', exc.read())
            # To me, I got a lot of 502 for "replies". It shows the
            # "Something is technically wrong" most of the time in the real
            # pages.
            if exc.code == 403:
                # Permission denied.
                raise NetworkPermissionDeniedError
            raise NetworkInvalidResponseError
        except urllib2.URLError as exc:
            _log.error('URL error: %s', exc.reason)
            raise NetworkDNSError
        except BadStatusLine:
            _log.error('Bad status line (Twitter is going bananas)')
            raise NetworkBadStatusLineError
        except socketError:
            # That's the worst exception ever.
            _log.error('Socket connection error')
            raise NetworkLowLevelError

        # TODO: Permission denied?

        # Introduced in Twitter in 2009.03.27
        response_headers = response.info()
        self._rate_limit = None
        for header in ('X-RateLimit-Remaining', 'x-ratelimit-remaining'):
            if header in response_headers:
                self._rate_limit = int(response_headers[header])
                _log.debug('Remaning hits: %d', self._rate_limit)
                break

        _log.debug('Request completed')
        _log.debug('info(%s): %s', type(response_headers), response_headers)

        return json.loads(data)

    #
    # New network style methods
    #

    AUTH = [
            {'name': 'username',
             'flags': ['-u', '--username'],
             'prompt': _('Username'),
             'help': _('Your twitter username'),
             'type': 'str'},
            {'name': 'password',
             'flags': ['-p', '--password'],
             'prompt': _('Password'),
             'help': _('Your twitter password'),
             'type': 'passwd'}]

    @classmethod
    def options(cls, options):
        """Add options related to Twitter."""
        options.add_group(cls.NAMESPACE, 'Twitter network')
        options.add_option('-s', '--no-https',
                group=cls.NAMESPACE,
                option='https',
                default=True,   # Secure connections by default
                help=_('Disable HTTPS (secure) connection with Twitter.'),
                action='store_false')
        options.add_option(
                group=cls.NAMESPACE,
                option='last_tweet',
                default=0,
                is_cmd_option=False)
        options.add_option(
                group=cls.NAMESPACE,
                option='last_reply',
                default=0,
                is_cmd_option=False)
        options.add_option(
                group=cls.NAMESPACE,
                option='server_url',
                default='http://api.twitter.com/1',
                is_cmd_option=False)
        options.add_option(
                group=cls.NAMESPACE,
                option='secure_server_url',
                default='https://api.twitter.com/1',
                is_cmd_option=False)
        options.add_option(
                group=cls.NAMESPACE,
                option='message_threshold',
                default=16,
                is_cmd_option=False)
        auth_options(cls.NAMESPACE, options, cls.AUTH)
        return

    def _timeline(self, config_var, url):
        """Request one of the lists of tweets.

        config_var is the name of the option holding the last seen id for
        this timeline; url is the resource to request. Pages are requested
        until one comes back with no more elements than the
        'message_threshold' option. When there is no last seen id, only the
        first page is requested. Returns a list of TwitterNetworkData and,
        when a higher id was seen, stores it back in config_var.
        """
        settings = self._options[self.NAMESPACE]
        last_id = int(settings[config_var])
        threshold = int(settings['message_threshold'])
        _log.debug('%s: %d', config_var, last_id)

        params = {}
        if last_id > 0:
            params['since_id'] = last_id

        page = 1
        result = []
        high_id = 0
        while True:
            # TODO: How the interfaces can interrupt this?
            if page > 1:
                params['page'] = page

            final_url = '?'.join([url, urllib.urlencode(params)])
            response = self._request(final_url)
            _log.debug('Page %d, %d results', page, len(response))

            if not response:
                # Nothing (else) to retrieve.
                break

            # extract the highest id in the response and save it so we can
            # use it when requesting data again (using the since_id
            # parameter)
            top_tweet_id = response[0]['id']
            _log.debug('Top tweet: %d; Highest seen tweet: %d',
                    top_tweet_id, high_id)
            if top_tweet_id > high_id:
                high_id = top_tweet_id

            response_data = _make_datetime(response)
            result.extend(response_data)
            page += 1   # Request the next page

            _log.debug('%d messages, %d threshold', len(response_data),
                    threshold)
            if len(response_data) <= threshold:
                break

            if last_id == 0:
                # do not try to download everything if we don't have a
                # previous list (or we'll blow the available requests in one
                # shot)
                break

        # only update the "last seen id" if everything goes alright
        if high_id > last_id:
            _log.debug('Last tweet updated: %d', high_id)
            self._options[self.NAMESPACE][config_var] = high_id

        return result

    def messages(self):
        """Return a list of NetworkData objects for the main "timeline"."""
        return self._timeline('last_tweet', '/statuses/home_timeline.json')

    def message(self, message_id):
        """Retrieves the information of one message."""
        response = self._request('/statuses/show/%d.json' % (message_id))
        return TwitterNetworkData(response)

    def link(self, message):
        """Return a link directly to the message."""
        assert(isinstance(message, NetworkData))
        return 'http://twitter.com/%s/status/%s' % (message.username,
                message.id)

    def reply_prefix(self, message):
        """Returns the prefix needed for a reply."""
        assert(isinstance(message, NetworkData))
        return '@' + message.username + ' '

    def replies(self):
        """Return a list of NetworkData objects for the replies for the user
        messages."""
        return self._timeline('last_reply', '/statuses/replies.json')

    def available_requests(self):
        """Return the current user rate limit.

        Uses the value reported by the last response when there is one; a
        missing (or zero) value makes it ask the server.
        """
        if self._rate_limit:
            return self._rate_limit

        data = self._request('/account/rate_limit_status.json')
        _log.debug('Requests: %s', data)
        return int(data['remaining_hits'])

    def update(self, status, reply_to=None):
        """Update the user status.

        reply_to can be either a NetworkData object or the id of the message
        being replied. Messages longer than 140 characters generate a
        MessageTooLongWarning, but are sent anyway. Returns a
        TwitterNetworkData built from the server response.
        """
        if len(status) > 140:
            warnings.warn('Message too long', MessageTooLongWarning)

        body = {
                'status': status,
                'source': 'mitter'}

        if reply_to:
            if isinstance(reply_to, NetworkData):
                body['in_reply_to_status_id'] = reply_to.id
                # This is to protect the user from himself. You don't *need*
                # to start a reply with a @<username>, but it looks really
                # confusing in the Twitter website. So if the line doesn't
                # start with the username of the original user, we add it
                # for the user.
                if not status.startswith('@' + reply_to.username):
                    body['status'] = '@' + reply_to.username + ' ' + \
                            status
            else:
                body['in_reply_to_status_id'] = reply_to

        # urlencode() calls str() on every value, which breaks unicode
        # messages with non-ASCII characters; send them as UTF-8 bytes.
        body = urllib.urlencode(
                dict((key, _utf8(value)) for key, value in body.items()))
        _log.debug('Message to twitter: %s', body)

        data = self._request('/statuses/update.json', body=body)

        # TODO: Check if twitter sends an error message when the message is
        #       too large.
        # TODO: Some updates return the previous status, not the new one. Not
        #       sure what that means.
        return TwitterNetworkData(data)

    def repost(self, message):
        """Repost a message."""
        assert(isinstance(message, NetworkData))
        body = urllib.urlencode({'id': message.id})
        resource = '/statuses/retweet/%d.json' % (message.id)
        data = self._request(resource, body=body)
        return TwitterNetworkData(data)

    def favourite(self, message):
        """Toggle the favourite state of a message: messages not marked as
        favourite are marked, messages already marked are unmarked."""
        assert(isinstance(message, NetworkData))
        body = urllib.urlencode({'id': message.id})
        # The attribute set by TwitterNetworkData is "favourited".
        if not message.favourited:
            resource = '/favorites/create/%d.json' % (message.id)
        else:
            resource = '/favorites/destroy/%d.json' % (message.id)
        data = self._request(resource, body=body)
        return TwitterNetworkData(data)

    def delete_message(self, message):
        """Delete a message. Accepts either a NetworkData object or the
        message id."""
        if isinstance(message, NetworkData):
            message = message.id    # We don't need anything else for Twitter

        # make a body, so _request makes it a post.
        body = urllib.urlencode({'id': message})
        resource = '/statuses/destroy/%s.json' % (message)
        response = self._request(resource, body=body)
        _log.debug('Delete response: %s', response)
        # Either we get a response or an exception before we reach this.
        return True

    def can_delete(self, message):
        """Check if the message belongs to the user. If so, returns True;
        False otherwise."""
        assert(isinstance(message, NetworkData))
        return (message.username ==
                self._options[self.NAMESPACE]['username'])

    def can_reply(self, message):
        """Always return True; Twitter allows replying to any messages,
        including the ones from the user."""
        return True

    def can_repost(self, message):
        """Twitter ignores retweets from the user."""
        assert(isinstance(message, NetworkData))
        return not (message.username ==
                self._options[self.NAMESPACE]['username'])

    def can_favourite(self, message):
        """Always return True; Twitter allows favouriting/unfavouriting any
        messages."""
        return True