#!/usr/bin/env python # -*- coding: utf-8 -*- # Mitter, a client for Twitter. # Copyright (C) 2007, 2008 The Mitter Contributors # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . import urllib import urllib2 import logging import datetime import base64 import htmlentitydefs import re import warnings import htmlentitydefs from httplib import BadStatusLine from socket import error as socketError #from mitterlib import htmlize from networkbase import NetworkBase, NetworkData, auth_options, \ NetworkDNSError, NetworkBadStatusLineError, NetworkLowLevelError, \ NetworkInvalidResponseError, NetworkPermissionDeniedError, \ MessageTooLongWarning try: # Python 2.6/3.0 JSON parser import json except ImportError: # Fallback to SimpleJSON import simplejson as json # logging _log = logging.getLogger('mitterlib.network.Twitter') # the month names come directly from the site, so we are not affected by # locale settings. _month_names = [None, 'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'] def htmlize(text): """Convert a normal text to the format required by Twitter (url-encoded and UTF-8 encoded.""" # XXX: UTF-8 part is not working as it should, sadly. if not isinstance(text, unicode): text = unicode(text, 'utf-8') # hex_char = (lambda x: '%' + hex(x)[2:].rjust(2, '0').upper()) # # codes = [] # for char in text: # char_code = ord(char) # bytes = [] # mask = 0 # # ranges = [65535, 2047, 127] # for r in ranges: # if char_code > r: # mask >>= 1 # mask |= 128 # # byte = char_code & 63 # byte |= 128 # # bytes.insert(0, hex_char(byte)) # # char_code >>= 6 # # if mask: # # multibyte char # mask >>= 1 # mask |= 128 # # byte = char_code | mask # bytes.insert(0, hex_char(byte)) # else: # if char_code < 32: # # special char and it's not a multibyte char # bytes.insert(0, hex_char(char_code)) # else: # # normal, printable char # bytes.insert(0, char) # # codes.append(''.join(bytes)) # # text = ''.join(codes) new = [] for char in text: if ord(char) in htmlentitydefs.codepoint2name: new.append('&%s;' % (htmlentitydefs.codepoint2name[ord(char)])) elif ord(char) == 37: # '%' new.append('%25') else: new.append(char) return ''.join(new) def _unhtml(text): """Convert text coming in HTML encoded to UTF-8 representations.""" new_text = [] copy_pos = 0 _log.debug('Original text: %s', text) for code in re.finditer(r'&(\w+);', text): new_text.append(text[copy_pos:code.start()]) entity = text[code.start()+1:code.end()-1] if entity in htmlentitydefs.name2codepoint: new_text.append(unichr( htmlentitydefs.name2codepoint[entity])) else: new_text.append(code.group().decode('utf8')) copy_pos = code.end() new_text.append(text[copy_pos:]) _log.debug('New text: %s', new_text) result = u''.join(new_text) _log.debug('Result: %s', result) return result def _to_datetime(server_str): """Convert a date send by the server to a datetime object. Ex: from this: Tue Mar 13 00:12:41 +0000 2007 to datetime. """ date_info = server_str.split(' ') month = _month_names.index(date_info[1]) day = int(date_info[2]) year = int(date_info[5]) time_info = date_info[3].split(':') hour = int(time_info[0]) minute = int(time_info[1]) second = int(time_info[2]) return datetime.datetime(year, month, day, hour, minute, second) def _make_datetime(response): """Converts dates on responses to datetime objects.""" result = [] for tweet in response: result.append(TwitterNetworkData(tweet)) return result class TwitterNetworkData(NetworkData): """A simple wrapper around NetworkData, to make things easier to convert twitter data into a NetworkData object.""" def __init__(self, data): """Class initialization. Receives a dictionary with a single tweet.""" NetworkData.__init__(self) self.id = data['id'] self.name = data['user']['name'] self.username = data['user']['screen_name'] self.avatar = data['user']['profile_image_url'] self.message_time = _to_datetime(data['created_at']) if 'favorited' in data: self.favourited = data['favorited'] if 'in_reply_to_status_id' in data and data['in_reply_to_status_id']: self.parent = int(data['in_reply_to_status_id']) self.parent_owner = data['in_reply_to_screen_name'] if 'retweeted_status' in data: self.reposted_by = self.username retweet_user = data['retweeted_status']['user'] self.name = retweet_user['name'] self.username = retweet_user['screen_name'] self.avatar = retweet_user['profile_image_url'] self.id = data['retweeted_status']['id'] # also switch the text for the original text. data['text'] = data['retweeted_status']['text'] # keep the message_time as is, so we have the retweet time, not he # original message time # Twitter encodes a lot of HTML entities, which are not good when # you want to *display* then (e.g., "<" returns to us as "<"). # So we convert this here. Interfaces need to worry about converting # them if it becomes a problem. self.message = _unhtml(data['text']) return class Connection(NetworkBase): """Base class to talk to twitter.""" NAMESPACE = 'Twitter' SHORTCUT = 'tw' # TODO: find a way to move this to the config file def is_setup(self): """Return True or False if the network is setup/enabled.""" if (self._options[self.NAMESPACE]['username'] and self._options[self.NAMESPACE]['password']): # Consider the network enabled if there is an username and # password return True else: return False def __init__(self, options): self._options = options @property def server(self): if self._options[self.NAMESPACE]['https']: return self._options[self.NAMESPACE]['secure_server_url'] else: return self._options[self.NAMESPACE]['server_url'] def _common_headers(self): """Returns a string with the normal headers we should add on every request""" auth = base64.b64encode('%s:%s' % ( self._options[self.NAMESPACE]['username'], self._options[self.NAMESPACE]['password'])) headers = { 'Authorization': 'Basic %s' % (auth), 'User-Agent': self._user_agent} return headers def _request(self, resource, headers=None, body=None): """Send a request to the Twitter server. Once finished, call the function at callback.""" url = '%s%s' % (self.server, resource) _log.debug('Request %s' % (url)) request = urllib2.Request(url=url) request_headers = self._common_headers() if headers: request_headers.update(headers) for key in request_headers: _log.debug('Header: %s=%s' % (key, request_headers[key])) request.add_header(key, request_headers[key]) if body: _log.debug('Body: %s' % (body)) request.add_data(body) try: _log.debug('Starting request of %s' % (url)) response = urllib2.urlopen(request) data = response.read() except urllib2.HTTPError, exc: _log.debug('HTTPError: %d' % (exc.code)) _log.debug('HTTPError: response body:\n%s' % exc.read()) # To me, I got a lot of 502 for "replies". It shows the # "Something is technically wrong" most of the time in the real # pages. if exc.code == 403: # Permission denied. raise NetworkPermissionDeniedError raise NetworkInvalidResponseError except urllib2.URLError, exc: _log.error('URL error: %s' % exc.reason) raise NetworkDNSError except BadStatusLine: _log.error('Bad status line (Twitter is going bananas)') raise NetworkBadStatusLineError except socketError: # That's the worst exception ever. _log.error('Socket connection error') raise NetworkLowLevelError # TODO: Permission denied? # Introduced in Twitter in 2009.03.27 response_headers = response.info() if 'X-RateLimit-Remaining' in response_headers: self._rate_limit = int(response_headers['X-RateLimit-Remaining']) _log.debug('Remaning hits: %d', self._rate_limit) elif 'x-ratelimit-remaining' in response_headers: self._rate_limit = int(response_headers['x-ratelimit-remaining']) _log.debug('Remaning hits: %d', self._rate_limit) else: self._rate_limit = None _log.debug('Request completed') _log.debug('info(%s): %s', type(response.info()), response.info()) return json.loads(data) # # New network style methods # AUTH = [ {'name': 'username', 'flags': ['-u', '--username'], 'prompt': 'Username', 'help': 'Your twitter username', 'type': 'str'}, {'name': 'password', 'flags': ['-p', '--password'], 'prompt': 'Password', 'help': 'Your twitter password', 'type': 'passwd'}] @classmethod def options(self, options): """Add options related to Twitter.""" options.add_group(self.NAMESPACE, 'Twitter network') options.add_option('-s', '--no-https', group=self.NAMESPACE, option='https', default=True, # Secure connections by default help='Disable HTTPS (secure) connection with Twitter.', action='store_false') options.add_option( group=self.NAMESPACE, option='last_tweet', default=0, is_cmd_option=False) options.add_option( group=self.NAMESPACE, option='last_reply', default=0, is_cmd_option=False) options.add_option( group=self.NAMESPACE, option='server_url', default='http://api.twitter.com/1', is_cmd_option=False) options.add_option( group=self.NAMESPACE, option='secure_server_url', default='https://api.twitter.com/1', is_cmd_option=False) options.add_option( group=self.NAMESPACE, option='message_threshold', default=16, is_cmd_option=False) auth_options(self.NAMESPACE, options, self.AUTH) return def _timeline(self, config_var, url): """Request one of the lists of tweets.""" last_id = int(self._options[self.NAMESPACE][config_var]) _log.debug('%s: %d', config_var, last_id) params = {} if last_id > 0: params['since_id'] = last_id page = 1 result = [] response = [0] # So we stay in the loop. high_id = 0 while response: # Not the cleanest code # TODO: How the interfaces can interrupt this? if page > 1: params['page'] = page final_url = '?'.join([url, urllib.urlencode(params)]) response = self._request(final_url) _log.debug('Page %d, %d results', page, len(response)) if response: # extract the highest id in the respone and save it so we can # use it when requesting data again (using the since_id # parameter) top_tweet_id = response[0]['id'] _log.debug('Top tweet: %d; Highest seen tweet: %d', top_tweet_id, high_id) if top_tweet_id > high_id: high_id = top_tweet_id response_data = _make_datetime(response) result.extend(response_data) page += 1 # Request the next page _log.debug('%d messages, %d threshold' % (len(response_data), self._options[self.NAMESPACE]['message_threshold'])) if (len(response_data) <= self._options[self.NAMESPACE]['message_threshold']): break if last_id == 0: # do not try to download everything if we don't have a # previous list (or we'll blow the available requests in one # short) break # only update the "last seen id" if everything goes alright if high_id > int(self._options[self.NAMESPACE][config_var]): _log.debug('Last tweet updated: %d', high_id) self._options[self.NAMESPACE][config_var] = high_id return result def messages(self): """Return a list of NetworkData objects for the main "timeline".""" return self._timeline('last_tweet', '/statuses/home_timeline.json') def message(self, message_id): """Retrieves the information of one message.""" response = self._request('/statuses/show/%d.json' % (message_id)) return TwitterNetworkData(response) def link(self, message): """Return a link directly to the message.""" assert(isinstance(message, NetworkData)) return 'http://twitter.com/%s/status/%s' % (message.username, message.id) def reply_prefix(self, message): """Returns the prefix needed for a reply.""" assert(isinstance(message, NetworkData)) return '@' + message.username + ' ' def replies(self): """Return a list of NetworkData objects for the replies for the user messages.""" return self._timeline('last_reply', '/statuses/replies.json') def available_requests(self): """Return the current user rate limit.""" if self._rate_limit: return self._rate_limit data = self._request('/account/rate_limit_status.json') _log.debug('Requests: %s', data) return int(data['remaining_hits']) def update(self, status, reply_to=None): """Update the user status.""" if len(status) > 140: warnings.warn('Message too long', MessageTooLongWarning) # In Python 2.5, urllib.urlencode calls str(), which removes the # unicodeness of the "status". So we need to convert those peski # accents to HTML entities, so everything falls into ASCII. body = { 'status': htmlize(status), 'source': 'mitter'} if reply_to: if isinstance(reply_to, NetworkData): body['in_reply_to_status_id'] = reply_to.id # This is to protect the user from himself. You don't *need* # to start a reply with a @, but it looks really # confusing in the Twiter website. So if the line doesn't # start with the username of the original user, we add it # for the user. if not status.startswith('@' + reply_to.username): body['status'] = '@' + reply_to.username + ' ' + \ status else: body['in_reply_to_status_id'] = reply_to _log.debug('Body: %s', body) body = urllib.urlencode(body) _log.debug('Message to twitter: %s' % (body)) data = self._request('/statuses/update.json', body=body) # TODO: Check if twitter sends an error message when the message is # too large. return TwitterNetworkData(data) def repost(self, message): """Repost a message.""" assert(isinstance(message, NetworkData)) body = urllib.urlencode({'id': message.id}) resource = '/statuses/retweet/%d.json' % (message.id) data = self._request(resource, body=body) return TwitterNetworkData(data) def favourite(self, message): """Mark a message as favourite.""" assert(isinstance(message, NetworkData)) body = urllib.urlencode({'id': message.id}) if not message.favourite: resource = '/favorites/create/%d.json' % (message.id) else: resource = '/favorites/destroy/%d.json' % (message.id) data = self._request(resource, body=body) return TwitterNetworkData(data) def delete_message(self, message): """Delete a message.""" if isinstance(message, NetworkData): message = message.id # We don't need anything else for Twitter # make a body, so _request makes it a post. body = urllib.urlencode({'id': message}) resource = '/statuses/destroy/%s.json' % (message) response = self._request(resource, body=body) _log.debug('Delete response: %s', response) return True # Either we get a response or an exception before we reach # this. def can_delete(self, message): """Check if the message belongs to the user. If so, returns True; False otherwise.""" assert(isinstance(message, NetworkData)) return (message.username == self._options[self.NAMESPACE]['username']) def can_reply(self, message): """Always return True; Twitter allows replying to any messages, including the ones from the user.""" return True def can_repost(self, message): """Twitter ignores retweets from the user.""" assert(isinstance(message, NetworkData)) return not (message.username == self._options[self.NAMESPACE]['username']) def can_favourite(self, message): """Always return True; Twitter allows favouriting/unfavouriting any messages.""" return True