#!/usr/bin/env python # -*- coding: utf-8 -*- # Mitter, a client for Twitter. # Copyright (C) 2007, 2008 The Mitter Contributors # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . import urllib import urllib2 import logging import datetime import base64 import htmlentitydefs import re import warnings import gettext import string from httplib import BadStatusLine from socket import error as socketError from networkbase import NetworkBase, NetworkData, NetworkUser, auth_options, \ NetworkDNSError, NetworkBadStatusLineError, NetworkLowLevelError, \ NetworkInvalidResponseError, NetworkPermissionDeniedError, \ MessageTooLongWarning try: # Python 2.6/3.0 JSON parser import json except ImportError: # Fallback to SimpleJSON import simplejson as json # ---------------------------------------------------------------------- # I18n bits # ---------------------------------------------------------------------- t = gettext.translation('ui_pygtk', fallback=True) _ = t.gettext N_ = t.ngettext # ---------------------------------------------------------------------- # logging # ---------------------------------------------------------------------- _log = logging.getLogger('mitterlib.network.Twitter') # ---------------------------------------------------------------------- # the month names come directly from the site, so we are not affected by # locale settings. # ---------------------------------------------------------------------- _month_names = [None, 'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'] _punctuation = string.punctuation.replace('_', '') def _unhtml(text): """Convert text coming in HTML encoded to UTF-8 representations.""" new_text = [] copy_pos = 0 _log.debug('Original text: %s', text) for code in re.finditer(r'&(\w+);', text): new_text.append(text[copy_pos:code.start()]) entity = text[code.start()+1:code.end()-1] if entity in htmlentitydefs.name2codepoint: new_text.append(unichr( htmlentitydefs.name2codepoint[entity])) else: new_text.append(code.group().decode('utf8')) copy_pos = code.end() new_text.append(text[copy_pos:]) _log.debug('New text: %s', new_text) result = u''.join(new_text) _log.debug('Result: %s', result) return result def _to_datetime(server_str): """Convert a date send by the server to a datetime object. Ex: from this: Tue Mar 13 00:12:41 +0000 2007 to datetime. """ date_info = server_str.split(' ') month = _month_names.index(date_info[1]) day = int(date_info[2]) year = int(date_info[5]) time_info = date_info[3].split(':') hour = int(time_info[0]) minute = int(time_info[1]) second = int(time_info[2]) return datetime.datetime(year, month, day, hour, minute, second) class TwitterNetworkUser(NetworkUser): """A simple wrapper around :class:`NetwokrUser`, to make things easier to convert twitter user information into a NetworkUser object.""" def __init__(self, data): super(TwitterNetworkUser, self).__init__() self.name = data['user']['name'] self.username = data['user']['screen_name'] self.avatar = data['user']['profile_image_url'] class TwitterNetworkData(NetworkData): """A simple wrapper around NetworkData, to make things easier to convert twitter data into a NetworkData object.""" def __init__(self, data, logged_user): """Class initialization. Receives a dictionary with a single tweet.""" super(TwitterNetworkData, self).__init__() self.id = data['id'] self.author = TwitterNetworkUser(data) self.message_time = _to_datetime(data['created_at']) self.replyable = True # All messages are replyables self.favoritable = True # All messages are favoritable self.repostable = not (self.author.username == logged_user) self.deletable = (self.author.username == logged_user) self.link = 'http://twitter.com/%s/status/%s' % ( self.author.username, self.id) if 'protected' in data['user']: # twitter protects all messages from a user, not per message. self.protected = data['user']['protected'] if 'favorited' in data: self.favorited = data['favorited'] if 'in_reply_to_status_id' in data and data['in_reply_to_status_id']: owner = NetworkUser() owner.username = data['in_reply_to_screen_name'] self.parent = data['in_reply_to_status_id'] self.parent_owner = owner if 'retweeted_status' in data: self.reposted_by = self.author self.author = TwitterNetworkUser(data['retweeted_status']) self.id = data['retweeted_status']['id'] # also switch the text for the original text. data['text'] = data['retweeted_status']['text'] # keep the message_time as is, so we have the retweet time, not he # original message time # set the prefix here, so we can have the proper author even on # reposts. self.reply_prefix = '@' + self.author.username + ' ' # Twitter encodes a lot of HTML entities, which are not good when # you want to *display* then (e.g., "<" returns to us as "<"). # So we convert this here. Interfaces need to worry about converting # them if it becomes a problem. self.message = _unhtml(data['text']) # regular expression for users self.user_regexp = r'@[^ ' + _punctuation + ']+' # regular expression for hashtags self.tag_regexp = r'#[^ ' + _punctuation + ']+' return class Connection(NetworkBase): """Base class to talk to twitter.""" NAMESPACE = 'Twitter' SHORTCUT = 'tw' # TODO: find a way to move this to the config file def __init__(self, options): self._options = options self._user = None #------------------------------------------------------------ # Properties #------------------------------------------------------------ @property def is_setup(self): """Return True or False if the network is setup/enabled.""" if (self._options[self.NAMESPACE]['username'] and self._options[self.NAMESPACE]['password']): # Consider the network enabled if there is an username and # password return True else: return False @property def user(self): """Information about this user.""" if self._user: return self._user user = self._request('/account/verify_credentials.json') self._user = NetworkUser() self._user.name = user['name'] self._user.username = user['screen_name'] self._user.avatar = user['profile_image_url'] _log.debug("User: %s" % (self._user)) return self._user @property def server(self): if self._options[self.NAMESPACE]['https']: return self._options[self.NAMESPACE]['secure_server_url'] else: return self._options[self.NAMESPACE]['server_url'] #------------------------------------------------------------ # Private functions #------------------------------------------------------------ def _common_headers(self): """Returns a string with the normal headers we should add on every request""" auth = base64.b64encode('%s:%s' % ( self._options[self.NAMESPACE]['username'], self._options[self.NAMESPACE]['password'])) headers = { 'Authorization': 'Basic %s' % (auth), 'User-Agent': self._user_agent} return headers def _request(self, resource, headers=None, body=None): """Send a request to the Twitter server. Once finished, call the function at callback.""" url = '%s%s' % (self.server, resource) _log.debug('Request %s' % (url)) request = urllib2.Request(url=url) request_headers = self._common_headers() if headers: request_headers.update(headers) for key in request_headers: _log.debug('Header: %s=%s' % (key, request_headers[key])) request.add_header(key, request_headers[key]) if body: _log.debug('Body: %s' % (body)) request.add_data(body) timeout = self._options['NetworkManager']['timeout'] try: _log.debug('Starting request of %s (timeout %d)' % (url, timeout)) try: response = urllib2.urlopen(request, timeout=timeout) except TypeError, e: # Python 2.5 don't have the timeout parameter. response = urllib2.urlopen(request) data = response.read() except urllib2.HTTPError, exc: body = exc.read() _log.debug('HTTPError: %d' % (exc.code)) _log.debug('HTTPError: response body:\n%s' % (body)) # To me, I got a lot of 502 for "replies". It shows the # "Something is technically wrong" most of the time in the real # pages. if exc.code == 403: # 403 can be either permission denied or a complain that the # update is duplicated. if 'duplicate.' in body: # yes, this is a very week testing. I just don't want to # mess again with try/except to check the json result. raise NetworkMessageDuplicatedError(self.NAMESPACE) else: # Permission denied. raise NetworkPermissionDeniedError(self.NAMESPACE) raise NetworkInvalidResponseError(self.NAMESPACE) except urllib2.URLError, exc: _log.error('URL error: %s' % exc.reason) raise NetworkDNSError(self.NAMESPACE) except BadStatusLine: _log.error('Bad status line (Twitter is going bananas)') raise NetworkBadStatusLineError(self.NAMESPACE) except socketError: # That's the worst exception ever. _log.error('Socket connection error') raise NetworkLowLevelError(self.NAMESPACE) # TODO: Permission denied? # Introduced in Twitter in 2009.03.27 response_headers = response.info() if 'X-RateLimit-Remaining' in response_headers: rate = response_headers['X-RateLimit-Remaining'].split(',')[0] _log.debug('X-RateLimit-Remaining: %s', rate) self._rate_limit = int(rate) _log.debug('Remaning hits: %d', self._rate_limit) elif 'x-ratelimit-remaining' in response_headers: rate = response_headers['x-ratelimit-remaining'].split(',')[0] _log.debug('x-ratelimit-remaining: %s', rate) self._rate_limit = int(rate) _log.debug('Remaning hits: %d', self._rate_limit) else: self._rate_limit = None _log.debug('Request completed') _log.debug('info(%s): %s', type(response.info()), response.info()) return json.loads(data) def _make_datetime(self, response): """Converts dates on responses to datetime objects.""" result = [] for tweet in response: result.append(TwitterNetworkData(tweet, self.user.username)) return result def _timeline(self, config_var, url): """Request one of the lists of tweets.""" last_id = int(self._options[self.NAMESPACE][config_var]) _log.debug('%s: %d', config_var, last_id) params = {} if last_id > 0: params['since_id'] = last_id page = 1 result = [] response = [0] # So we stay in the loop. high_id = 0 while response: # Not the cleanest code # TODO: How the interfaces can interrupt this? if page > 1: params['page'] = page final_url = '?'.join([url, urllib.urlencode(params)]) response = self._request(final_url) _log.debug('Page %d, %d results', page, len(response)) if response: # extract the highest id in the respone and save it so we can # use it when requesting data again (using the since_id # parameter) top_tweet_id = response[0]['id'] _log.debug('Top tweet: %d; Highest seen tweet: %d', top_tweet_id, high_id) if top_tweet_id > high_id: high_id = top_tweet_id response_data = self._make_datetime(response) result.extend(response_data) page += 1 # Request the next page _log.debug('%d messages, %d threshold' % (len(response_data), self._options[self.NAMESPACE]['message_threshold'])) if (len(response_data) <= self._options[self.NAMESPACE]['message_threshold']): break if last_id == 0: # do not try to download everything if we don't have a # previous list (or we'll blow the available requests in one # short) break # only update the "last seen id" if everything goes alright if high_id > int(self._options[self.NAMESPACE][config_var]): _log.debug('Last tweet updated: %d', high_id) self._options[self.NAMESPACE][config_var] = high_id return result #------------------------------------------------------------ # Network Options #------------------------------------------------------------ AUTH = [ {'name': 'username', 'flags': ['-u', '--username'], 'prompt': _('Username'), 'help': _('Your twitter username'), 'type': 'str'}, {'name': 'password', 'flags': ['-p', '--password'], 'prompt': _('Password'), 'help': _('Your twitter password'), 'type': 'passwd'}] @classmethod def options(self, options): """Add options related to Twitter.""" # Rememeber to update the CHEAT-CODES file if you add/remove any # options. options.add_group(self.NAMESPACE, 'Twitter network') options.add_option('-s', '--no-https', group=self.NAMESPACE, option='https', default=True, # Secure connections by default help=_('Disable HTTPS (secure) connection with Twitter.'), action='store_false') options.add_option( group=self.NAMESPACE, option='last_tweet', default=0, is_cmd_option=False) options.add_option( group=self.NAMESPACE, option='last_reply', default=0, is_cmd_option=False) options.add_option( group=self.NAMESPACE, option='server_url', default='http://api.twitter.com/1', is_cmd_option=False) options.add_option( group=self.NAMESPACE, option='secure_server_url', default='https://api.twitter.com/1', is_cmd_option=False) options.add_option( group=self.NAMESPACE, option='message_threshold', default=16, is_cmd_option=False) auth_options(self.NAMESPACE, options, self.AUTH) return #------------------------------------------------------------ # NetworkBase required functions #------------------------------------------------------------ def messages(self): """Return a list of NetworkData objects for the main "timeline".""" return self._timeline('last_tweet', '/statuses/home_timeline.json') def message(self, message_id): """Retrieves the information of one message.""" response = self._request('/statuses/show/%d.json' % (message_id)) return TwitterNetworkData(response, self.user.username) def replies(self): """Return a list of NetworkData objects for the replies for the user messages.""" return self._timeline('last_reply', '/statuses/replies.json') def available_requests(self): """Return the current user rate limit.""" if self._rate_limit: return self._rate_limit data = self._request('/account/rate_limit_status.json') _log.debug('Requests: %s', data) return int(data['remaining_hits']) def update(self, status, reply_to=None): """Update the user status.""" if len(status) > 140: warnings.warn('Message too long', MessageTooLongWarning) body = { 'status': status, 'source': 'mitter'} if reply_to: if isinstance(reply_to, NetworkData): body['in_reply_to_status_id'] = reply_to.id # This is to protect the user from himself. You don't *need* # to start a reply with a @, but it looks really # confusing in the Twiter website. So if the line doesn't # start with the username of the original user, we add it # for the user. if not status.startswith('@' + reply_to.author.username): body['status'] = '@' + reply_to.author.username + ' ' + \ status else: body['in_reply_to_status_id'] = reply_to #_log.debug('Body: %s', body) body = urllib.urlencode(body) # seems urlenconde is UTF8 safe now _log.debug('Message to twitter: %s' % (body)) data = self._request('/statuses/update.json', body=body) # TODO: Check if twitter sends an error message when the message is # too large. # TODO: Some updates return the previous status, not the new one. Not # sure what that means. return TwitterNetworkData(data, self.user.username) def repost(self, message): """Repost a message.""" assert(isinstance(message, NetworkData)) body = urllib.urlencode({'id': message.id}) resource = '/statuses/retweet/%d.json' % (message.id) data = self._request(resource, body=body) return TwitterNetworkData(data, self.user.username) def favorite(self, message): """Mark a message as favorite.""" assert(isinstance(message, NetworkData)) body = urllib.urlencode({'id': message.id}) if not message.favorite: resource = '/favorites/create/%d.json' % (message.id) else: resource = '/favorites/destroy/%d.json' % (message.id) data = self._request(resource, body=body) return TwitterNetworkData(data, self.user.username) def delete_message(self, message): """Delete a message.""" if isinstance(message, NetworkData): message = message.id # We don't need anything else for Twitter # make a body, so _request makes it a post. body = urllib.urlencode({'id': message}) resource = '/statuses/destroy/%s.json' % (message) response = self._request(resource, body=body) _log.debug('Delete response: %s', response) return True # Either we get a response or an exception before we reach # this.