A micro-blogging tool with multiple interfaces.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 

540 lines
18 KiB

#!/usr/bin/python
# -*- coding: utf-8 -*-
# Mitter, a client for Twitter.
# Copyright (C) 2007, 2008 The Mitter Contributors
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import urllib
import urllib2
import logging
import datetime
import base64
import htmlentitydefs
import re
import warnings
import htmlentitydefs
from httplib import BadStatusLine
from socket import error as socketError
#from mitterlib import htmlize
from networkbase import NetworkBase, NetworkData, auth_options, \
NetworkDNSError, NetworkBadStatusLineError, NetworkLowLevelError, \
NetworkInvalidResponseError, NetworkPermissionDeniedError, \
MessageTooLongWarning
try:
# Python 2.6/3.0 JSON parser
import json
except ImportError:
# Fallback to SimpleJSON
import simplejson as json
# logging
_log = logging.getLogger('mitterlib.network.Twitter')
# the month names come directly from the site, so we are not affected by
# locale settings.
_month_names = [None, 'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug',
'Sep', 'Oct', 'Nov', 'Dec']
def htmlize(text):
"""Convert a normal text to the format required by Twitter (url-encoded
and UTF-8 encoded."""
# XXX: UTF-8 part is not working as it should, sadly.
if not isinstance(text, unicode):
text = unicode(text, 'utf-8')
# hex_char = (lambda x: '%' + hex(x)[2:].rjust(2, '0').upper())
#
# codes = []
# for char in text:
# char_code = ord(char)
# bytes = []
# mask = 0
#
# ranges = [65535, 2047, 127]
# for r in ranges:
# if char_code > r:
# mask >>= 1
# mask |= 128
#
# byte = char_code & 63
# byte |= 128
#
# bytes.insert(0, hex_char(byte))
#
# char_code >>= 6
#
# if mask:
# # multibyte char
# mask >>= 1
# mask |= 128
#
# byte = char_code | mask
# bytes.insert(0, hex_char(byte))
# else:
# if char_code < 32:
# # special char and it's not a multibyte char
# bytes.insert(0, hex_char(char_code))
# else:
# # normal, printable char
# bytes.insert(0, char)
#
# codes.append(''.join(bytes))
#
# text = ''.join(codes)
new = []
for char in text:
if ord(char) in htmlentitydefs.codepoint2name:
new.append('&%s;' % (htmlentitydefs.codepoint2name[ord(char)]))
elif ord(char) == 37: # '%'
new.append('%25')
else:
new.append(char)
return ''.join(new)
def _unhtml(text):
    """Replace HTML entities in `text` with the characters they stand for.

    Handles named entities (``&lt;``), decimal references (``&#60;``) and
    hexadecimal references (``&#x3C;``). Anything that looks like an entity
    but cannot be resolved (unknown name, code point that cannot be
    represented) is left in the text untouched.

    Byte strings are decoded as UTF-8 first; the result is always a unicode
    string.
    """
    _log.debug('Original text: %s', text)
    if not isinstance(text, unicode):
        text = text.decode('utf8')

    def _decode(match):
        """Return the character for one matched entity, or the match
        itself when it cannot be resolved."""
        entity = match.group(1)
        try:
            if entity[0] == '#':
                if entity[1] in 'xX':
                    return unichr(int(entity[2:], 16))
                return unichr(int(entity[1:]))
            return unichr(htmlentitydefs.name2codepoint[entity])
        except (KeyError, ValueError, OverflowError):
            # KeyError: unknown entity name.
            # ValueError/OverflowError: code point outside what unichr()
            # accepts on this build.
            return match.group()

    result = re.sub(r'&(#\d+|#[xX][0-9a-fA-F]+|\w+);', _decode, text)
    _log.debug('Result: %s', result)
    return result
def _to_datetime(server_str):
    """Build a datetime object from a timestamp string sent by the server.

    The expected layout is, for example::

        Tue Mar 13 00:12:41 +0000 2007

    The weekday and the UTC offset fields are not used; the returned
    datetime is naive.
    """
    # Positions: 0 weekday, 1 month name, 2 day, 3 HH:MM:SS, 4 offset, 5 year
    fields = server_str.split(' ')
    month = _month_names.index(fields[1])
    day, year = int(fields[2]), int(fields[5])
    clock = fields[3].split(':')
    hour, minute, second = int(clock[0]), int(clock[1]), int(clock[2])
    return datetime.datetime(year, month, day, hour, minute, second)
def _make_datetime(response):
    """Wrap every tweet of a server response in a TwitterNetworkData object
    and return the resulting list, in the same order."""
    return [TwitterNetworkData(tweet) for tweet in response]
class TwitterNetworkData(NetworkData):
    """A simple wrapper around NetworkData, to make things easier to convert
    twitter data into a NetworkData object."""

    def __init__(self, data):
        """Fill the object from a dictionary describing a single tweet.

        `data` must provide 'id', 'text', 'created_at' and a 'user'
        dictionary with 'name', 'screen_name' and 'profile_image_url'.
        'favorited', 'in_reply_to_status_id' and 'retweeted_status' are
        used when present. The dictionary itself is not modified.

        For a retweet, the author fields and the message are taken from the
        original tweet, and `reposted_by` holds the screen name of the user
        who retweeted it.
        """
        NetworkData.__init__(self)

        user = data['user']
        text = data['text']

        self.id = data['id']
        self.name = user['name']
        self.username = user['screen_name']
        self.avatar = user['profile_image_url']
        self.message_time = _to_datetime(data['created_at'])

        if 'favorited' in data:
            self.favourited = data['favorited']

        if 'in_reply_to_status_id' in data and data['in_reply_to_status_id']:
            self.parent = int(data['in_reply_to_status_id'])

        if 'retweeted_status' in data and data['retweeted_status']:
            original = data['retweeted_status']
            self.reposted_by = self.username
            self.name = original['user']['name']
            self.username = original['user']['screen_name']
            self.avatar = original['user']['profile_image_url']
            # Show the original text instead of the retweet's text. A local
            # variable is used so the caller's dictionary stays untouched.
            text = original['text']

        # Twitter encodes a lot of HTML entities, which are not good when
        # you want to *display* them (e.g., "<" returns to us as "&lt;").
        # So we convert this here. Interfaces need to worry about converting
        # them if it becomes a problem.
        self.message = _unhtml(text)
class Connection(NetworkBase):
    """Twitter network: talks to the Twitter REST API over HTTP, using
    Basic authentication.

    Every setting is read from the options object given to the constructor,
    inside the group named by `NAMESPACE`.
    """

    NAMESPACE = 'Twitter'
    SHORTCUT = 'tw'     # TODO: find a way to move this to the config file

    # Authentication fields; registered by options() through auth_options().
    AUTH = [
        {'name': 'username',
         'flags': ['-u', '--username'],
         'prompt': 'Username',
         'help': 'Your twitter username',
         'type': 'str'},
        {'name': 'password',
         'flags': ['-p', '--password'],
         'prompt': 'Password',
         'help': 'Your twitter password',
         'type': 'passwd'}]

    def __init__(self, options):
        """Keep a reference to the options object. No request is made."""
        self._options = options
        # Remaining API hits, as reported by the most recent response.
        # None means "not known": no request was made yet, or the server
        # did not send the header.
        self._rate_limit = None

    def _option(self, name):
        """Return the value of option `name` in this network's group."""
        return self._options[self.NAMESPACE][name]

    def is_setup(self):
        """Return True if the network is usable, i.e. both an username and
        a password are configured; False otherwise."""
        return bool(self._option('username') and self._option('password'))

    @property
    def server(self):
        """Base URL for requests: the secure server when the 'https' option
        is set, the plain one otherwise."""
        if self._option('https'):
            return self._option('secure_server_url')
        return self._option('server_url')

    def _common_headers(self):
        """Return a dictionary with the headers added to every request:
        Basic authentication and the user agent."""
        # NOTE(review): self._user_agent is not set anywhere in this class;
        # presumably NetworkBase provides it -- confirm.
        auth = base64.b64encode('%s:%s' % (
            self._option('username'),
            self._option('password')))
        return {
            'Authorization': 'Basic %s' % (auth),
            'User-Agent': self._user_agent}

    def _request(self, resource, headers=None, body=None):
        """Send a request to the Twitter server and return the decoded JSON
        response.

        `resource` is appended to the server URL. `headers`, if given, is a
        dictionary merged over the common headers. A non-empty `body` is
        sent as request data (which turns the request into a POST).

        As a side effect, `self._rate_limit` is updated from the response
        headers (None when the server does not report it).

        Raises:
            NetworkPermissionDeniedError: the server answered 403.
            NetworkInvalidResponseError: any other HTTP error status.
            NetworkDNSError: urllib2 reported an URLError.
            NetworkBadStatusLineError: the status line could not be parsed.
            NetworkLowLevelError: socket level failure.
        """
        url = '%s%s' % (self.server, resource)
        _log.debug('Request %s', url)

        request = urllib2.Request(url=url)
        request_headers = self._common_headers()
        if headers:
            request_headers.update(headers)

        for key, value in request_headers.items():
            if key.lower() == 'authorization':
                # Never write the credentials to the log.
                _log.debug('Header: %s=<hidden>', key)
            else:
                _log.debug('Header: %s=%s', key, value)
            request.add_header(key, value)

        if body:
            _log.debug('Body: %s', body)
            request.add_data(body)

        try:
            _log.debug('Starting request of %s', url)
            response = urllib2.urlopen(request)
            try:
                data = response.read()
                response_headers = response.info()
            finally:
                # Release the connection even if reading fails.
                response.close()
        except urllib2.HTTPError as exc:
            # Must come before URLError: HTTPError is a subclass of it.
            _log.debug('HTTPError: %d', exc.code)
            _log.debug('HTTPError: response body:\n%s', exc.read())
            # To me, I got a lot of 502 for "replies". It shows the
            # "Something is technically wrong" most of the time in the real
            # pages.
            if exc.code == 403:
                # Permission denied.
                raise NetworkPermissionDeniedError
            raise NetworkInvalidResponseError
        except urllib2.URLError as exc:
            _log.error('URL error: %s', exc.reason)
            raise NetworkDNSError
        except BadStatusLine:
            _log.error('Bad status line (Twitter is going bananas)')
            raise NetworkBadStatusLineError
        except socketError:   # That's the worst exception ever.
            _log.error('Socket connection error')
            raise NetworkLowLevelError

        # Rate limit header, introduced in Twitter in 2009.03.27
        self._rate_limit = None
        for header in ('X-RateLimit-Remaining', 'x-ratelimit-remaining'):
            if header in response_headers:
                self._rate_limit = int(response_headers[header])
                _log.debug('Remaning hits: %d', self._rate_limit)
                break

        _log.debug('Request completed')
        _log.debug('info(%s): %s', type(response_headers), response_headers)
        return json.loads(data)

    #
    # New network style methods
    #

    @classmethod
    def options(cls, options):
        """Add options related to Twitter."""
        options.add_group(cls.NAMESPACE, 'Twitter network')
        options.add_option('-s', '--no-https',
                group=cls.NAMESPACE,
                option='https',
                default=True,   # Secure connections by default
                help='Disable HTTPS (secure) connection with Twitter.',
                action='store_false')
        options.add_option(
                group=cls.NAMESPACE,
                option='last_tweet',
                default=0,
                is_cmd_option=False)
        options.add_option(
                group=cls.NAMESPACE,
                option='last_reply',
                default=0,
                is_cmd_option=False)
        options.add_option(
                group=cls.NAMESPACE,
                option='server_url',
                default='http://api.twitter.com/1',
                is_cmd_option=False)
        options.add_option(
                group=cls.NAMESPACE,
                option='secure_server_url',
                default='https://api.twitter.com/1',
                is_cmd_option=False)
        options.add_option(
                group=cls.NAMESPACE,
                option='page_size',
                default=20,
                is_cmd_option=False)
        options.add_option(
                group=cls.NAMESPACE,
                option='message_threshold',
                default=4,
                is_cmd_option=False)
        auth_options(cls.NAMESPACE, options, cls.AUTH)
        return

    def _timeline(self, config_var, url):
        """Request one of the lists of tweets and return it as a list of
        TwitterNetworkData objects.

        `config_var` names the option holding the highest tweet id seen so
        far for this list; it is sent as `since_id` and, once all pages
        were retrieved without errors, updated with the highest id found.
        `url` is the resource to request.

        Pages are requested until one comes back empty or with no more
        than 'message_threshold' messages. When there is no previously
        seen id, only the first page is requested.
        """
        last_id = int(self._option(config_var))
        _log.debug('%s: %d', config_var, last_id)
        threshold = int(self._option('message_threshold'))

        params = {'count': self._option('page_size')}
        if last_id > 0:
            params['since_id'] = last_id

        result = []
        high_id = 0
        page = 1
        while True:
            # TODO: How the interfaces can interrupt this?
            if page > 1:
                params['page'] = page

            final_url = '?'.join([url, urllib.urlencode(params)])
            response = self._request(final_url)
            _log.debug('Page %d, %d results', page, len(response))
            if not response:
                break

            # The first tweet of a page is taken as the one with the
            # highest id; remember the highest one seen so it can be used
            # as since_id on the next call.
            top_tweet_id = response[0]['id']
            _log.debug('Top tweet: %d; Highest seen tweet: %d',
                    top_tweet_id, high_id)
            if top_tweet_id > high_id:
                high_id = top_tweet_id

            response_data = _make_datetime(response)
            result.extend(response_data)
            page += 1   # Request the next page

            _log.debug('%d messages, %d threshold', len(response_data),
                    threshold)
            if len(response_data) <= threshold:
                break

            if last_id == 0:
                # do not try to download everything if we don't have a
                # previous list (or we'll blow the available requests in
                # one shot)
                break

        # only update the "last seen id" if everything goes alright
        if high_id > last_id:
            _log.debug('Last tweet updated: %d', high_id)
            self._options[self.NAMESPACE][config_var] = high_id
        return result

    def messages(self):
        """Return a list of NetworkData objects for the main "timeline"."""
        return self._timeline('last_tweet', '/statuses/home_timeline.json')

    def message(self, message_id):
        """Retrieve one message, given its id, as a TwitterNetworkData."""
        response = self._request('/statuses/show/%s.json' % (message_id))
        return TwitterNetworkData(response)

    def link(self, message):
        """Return a link directly to the message."""
        return 'http://twitter.com/%s/status/%s' % (message.username,
                message.id)

    def reply_prefix(self, message):
        """Return the prefix needed for a reply: '@username '."""
        return '@' + message.username + ' '

    def replies(self):
        """Return a list of NetworkData objects for the replies for the user
        messages."""
        return self._timeline('last_reply', '/statuses/replies.json')

    def available_requests(self):
        """Return the number of requests the user can still make.

        The value reported by the last response is used when known (zero
        included); otherwise the server is asked for it.
        """
        if self._rate_limit is not None:
            return self._rate_limit

        data = self._request('/account/rate_limit_status.json')
        _log.debug('Requests: %s', data)
        return int(data['remaining_hits'])

    def update(self, status, reply_to=None):
        """Update the user status and return the new message as a
        TwitterNetworkData.

        `reply_to` may be a NetworkData object or a message id. With a
        NetworkData object, '@username ' of the original author is added in
        front of the status unless the status already starts with it.

        A MessageTooLongWarning is issued when the text to be sent has more
        than 140 characters.
        """
        reply_id = None
        if reply_to:
            if isinstance(reply_to, NetworkData):
                reply_id = reply_to.id
                # This is to protect the user from himself. You don't
                # *need* to start a reply with a @<username>, but it looks
                # really confusing in the Twitter website. So if the line
                # doesn't start with the username of the original user, we
                # add it for the user.
                prefix = '@' + reply_to.username
                if not status.startswith(prefix):
                    status = prefix + ' ' + status
            else:
                reply_id = reply_to

        # Checked after the prefix is added, so the warning is about the
        # text that is really sent.
        if len(status) > 140:
            warnings.warn('Message too long', MessageTooLongWarning)

        # In Python 2.5, urllib.urlencode calls str(), which removes the
        # unicodeness of the "status". So we need to convert those pesky
        # accents to HTML entities, so everything falls into ASCII. This
        # must cover the reply prefix too.
        body = {
                'status': htmlize(status),
                'source': 'mitter'}
        if reply_to:
            body['in_reply_to_status_id'] = reply_id

        _log.debug('Body: %s', body)
        body = urllib.urlencode(body)
        _log.debug('Message to twitter: %s', body)

        data = self._request('/statuses/update.json', body=body)
        # TODO: Check if twitter sends an error message when the message is
        # too large.
        return TwitterNetworkData(data)

    def repost(self, message):
        """Repost (retweet) a message; return the server's answer as a
        TwitterNetworkData."""
        body = urllib.urlencode({'id': message.id})
        resource = '/statuses/retweet/%s.json' % (message.id)
        data = self._request(resource, body=body)
        return TwitterNetworkData(data)

    def favourite(self, message):
        """Toggle the favourite mark of a message: create it when the
        message is not a favourite, remove it otherwise. Return the
        server's answer as a TwitterNetworkData."""
        body = urllib.urlencode({'id': message.id})
        # 'favourited' is the attribute TwitterNetworkData fills in.
        if not message.favourited:
            resource = '/favorites/create/%s.json' % (message.id)
        else:
            resource = '/favorites/destroy/%s.json' % (message.id)
        data = self._request(resource, body=body)
        return TwitterNetworkData(data)

    def delete_message(self, message):
        """Delete a message, given as a NetworkData object or as an id.
        Return True; failures surface as exceptions from the request."""
        if isinstance(message, NetworkData):
            message = message.id    # We don't need anything else for Twitter

        # make a body, so _request makes it a post.
        body = urllib.urlencode({'id': message})
        resource = '/statuses/destroy/%s.json' % (message)
        response = self._request(resource, body=body)
        _log.debug('Delete response: %s', response)
        return True     # Either we get a response or an exception before we
                        # reach this.

    def can_delete(self, message):
        """Check if the message belongs to the user. If so, returns True;
        False otherwise."""
        return message.username == self._option('username')

    def can_reply(self, message):
        """Always return True; Twitter allows replying to any messages,
        including the ones from the user."""
        return True

    def can_repost(self, message):
        """Return False for the user's own messages (Twitter ignores
        retweets from the user); True otherwise."""
        return message.username != self._option('username')

    def can_favourite(self, message):
        """Always return True; Twitter allows favouriting/unfavouriting any
        messages."""
        return True