A micro-blogging tool with multiple interfaces.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 

499 lines
18 KiB

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Mitter, a client for Twitter.
# Copyright (C) 2007, 2008 The Mitter Contributors
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import urllib
import urllib2
import logging
import datetime
import base64
import htmlentitydefs
import re
import warnings
import htmlentitydefs
import gettext
from httplib import BadStatusLine
from socket import error as socketError
from networkbase import NetworkBase, NetworkData, auth_options, \
NetworkDNSError, NetworkBadStatusLineError, NetworkLowLevelError, \
NetworkInvalidResponseError, NetworkPermissionDeniedError, \
MessageTooLongWarning
try:
# Python 2.6/3.0 JSON parser
import json
except ImportError:
# Fallback to SimpleJSON
import simplejson as json
# ----------------------------------------------------------------------
# I18n: translation catalog lookup. With fallback=True a missing catalog
# is not an error; the untranslated strings are used instead.
# ----------------------------------------------------------------------
t = gettext.translation('ui_pygtk', fallback=True)
_, N_ = t.gettext, t.ngettext

# ----------------------------------------------------------------------
# Module-wide logger
# ----------------------------------------------------------------------
_log = logging.getLogger('mitterlib.network.Twitter')

# ----------------------------------------------------------------------
# Month abbreviations exactly as the site sends them; keeping our own
# table makes date parsing independent of the locale settings.
# Position 0 holds None so that a name's index is its month number.
# ----------------------------------------------------------------------
_month_names = [None] + (
    'Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split())
# Matches decimal (&#39;) and hexadecimal (&#x27;) character references
# as well as named entities (&lt;). Group 1 is the part between '&' and ';'.
_ENTITY_RE = re.compile(r'&(#[0-9]+|#[xX][0-9a-fA-F]+|\w+);')


def _unhtml(text):
    """Replace the HTML entities in *text* with the characters they encode.

    Named entities are resolved through ``htmlentitydefs.name2codepoint``;
    numeric character references, decimal or hexadecimal, are resolved
    from their code point. An entity that cannot be resolved (unknown
    name, code point out of range) is left in the text exactly as it
    arrived.

    Returns a unicode string. A byte string is decoded as UTF-8 first,
    so non-ASCII input no longer fails when the pieces are joined.
    """
    _log.debug('Original text: %s', text)
    if not isinstance(text, unicode):
        text = text.decode('utf8')

    def _replace(match):
        entity = match.group(1)
        try:
            if entity[0] == '#':
                if entity[1] in 'xX':
                    return unichr(int(entity[2:], 16))
                return unichr(int(entity[1:]))
            return unichr(htmlentitydefs.name2codepoint[entity])
        except (KeyError, ValueError, OverflowError):
            # Unknown name or a code point unichr() refuses: keep the
            # original "&...;" sequence untouched.
            return match.group(0)

    result = _ENTITY_RE.sub(_replace, text)
    _log.debug('Result: %s', result)
    return result
def _to_datetime(server_str):
    """Build a datetime object from a date string sent by the server.

    The string has six space-separated fields, e.g.::

        Tue Mar 13 00:12:41 +0000 2007

    The weekday and the offset field are not used; the month name is
    looked up in ``_month_names`` and the result carries no tzinfo.
    """
    fields = server_str.split(' ')
    month, day, year = (_month_names.index(fields[1]),
                        int(fields[2]),
                        int(fields[5]))
    clock = fields[3].split(':')
    return datetime.datetime(year, month, day,
                             int(clock[0]), int(clock[1]), int(clock[2]))
def _make_datetime(response):
    """Wrap every tweet of *response* in a TwitterNetworkData object.

    Returns the wrapped tweets as a list, in the order they were received
    (the wrapper is what converts the creation date to a datetime).
    """
    return [TwitterNetworkData(tweet) for tweet in response]
class TwitterNetworkData(NetworkData):
    """A simple wrapper around NetworkData, to make it easier to convert
    twitter data into a NetworkData object."""

    def __init__(self, data):
        """Fill the message fields from *data*, a dictionary with a single
        tweet as decoded from the server response.

        Required keys: 'id', 'created_at', 'text' and a 'user' dictionary
        holding 'name', 'screen_name' and 'profile_image_url'. Optional
        keys: 'favorited', 'in_reply_to_status_id' (with
        'in_reply_to_screen_name') and 'retweeted_status'.

        *data* itself is left unmodified.
        """
        NetworkData.__init__(self)

        user = data['user']
        self.id = data['id']
        self.name = user['name']
        self.username = user['screen_name']
        self.avatar = user['profile_image_url']
        self.message_time = _to_datetime(data['created_at'])

        if 'favorited' in data:
            # `favourited` is the attribute this class has always set, so
            # it stays for existing readers; Connection.favourite() tests
            # `message.favourite`, so the flag is published under that
            # name as well.
            # NOTE(review): confirm which spelling NetworkData documents.
            self.favourited = data['favorited']
            self.favourite = data['favorited']

        if data.get('in_reply_to_status_id'):
            self.parent = int(data['in_reply_to_status_id'])
            self.parent_owner = data.get('in_reply_to_screen_name')

        # A missing or null 'retweeted_status' means a regular tweet.
        retweet = data.get('retweeted_status')
        if retweet:
            # Show the original author and message, remembering who
            # reposted it.
            self.reposted_by = self.username
            retweet_user = retweet['user']
            self.name = retweet_user['name']
            self.username = retweet_user['screen_name']
            self.avatar = retweet_user['profile_image_url']
            self.id = retweet['id']
            # Also switch to the original text. A local is used instead of
            # overwriting data['text'], so the caller's dictionary is not
            # changed behind its back.
            text = retweet['text']
            # message_time is kept as is, so we have the retweet time, not
            # the original message time.
        else:
            text = data['text']

        # Twitter encodes a lot of HTML entities, which are not good when
        # you want to *display* them (e.g., "<" comes to us as "&lt;").
        # So we convert them here. Interfaces need to worry about
        # converting them back if it becomes a problem.
        self.message = _unhtml(text)
class Connection(NetworkBase):
    """Base class to talk to twitter.

    Requests are sent with urllib2 using HTTP basic authentication built
    from the configured username and password; JSON responses are decoded
    and tweets are returned as TwitterNetworkData objects.
    """

    NAMESPACE = 'Twitter'
    SHORTCUT = 'tw'     # TODO: find a way to move this to the config file

    # Authentication fields, handed to auth_options() in options().
    AUTH = [
            {'name': 'username',
             'flags': ['-u', '--username'],
             'prompt': _('Username'),
             'help': _('Your twitter username'),
             'type': 'str'},
            {'name': 'password',
             'flags': ['-p', '--password'],
             'prompt': _('Password'),
             'help': _('Your twitter password'),
             'type': 'passwd'}]

    def __init__(self, options):
        """Keep a reference to the options object holding our settings."""
        self._options = options
        # Remaining API calls as last reported by the server. It must
        # exist before the first request, because available_requests()
        # reads it; None means "not known yet".
        self._rate_limit = None

    def _option(self, name):
        """Return the value of option *name* in this network's group."""
        return self._options[self.NAMESPACE][name]

    def is_setup(self):
        """Return True or False if the network is setup/enabled.

        The network is considered enabled when there is both a username
        and a password.
        """
        return bool(self._option('username') and self._option('password'))

    @property
    def server(self):
        """Base URL for requests: the secure one when 'https' is set."""
        if self._option('https'):
            return self._option('secure_server_url')
        return self._option('server_url')

    def _common_headers(self):
        """Return a dictionary with the headers we add on every request:
        basic authentication and the user agent."""
        auth = base64.b64encode('%s:%s' % (
            self._option('username'),
            self._option('password')))

        # NOTE(review): _user_agent is not defined in this class;
        # presumably NetworkBase provides it -- confirm.
        return {
                'Authorization': 'Basic %s' % (auth),
                'User-Agent': self._user_agent}

    @staticmethod
    def _urlencode(fields):
        """Url-encode the dictionary *fields*, accepting unicode values.

        urllib.urlencode() calls str() on every value, which fails for
        unicode text with non-ASCII characters; such values are encoded
        to UTF-8 first.
        """
        encoded = {}
        for key, value in fields.items():
            if isinstance(value, unicode):
                value = value.encode('utf-8')
            encoded[key] = value
        return urllib.urlencode(encoded)

    def _request(self, resource, headers=None, body=None):
        """Send a request to the Twitter server and return the decoded
        JSON response.

        resource -- path (plus query string) appended to the server URL.
        headers  -- optional dictionary of extra headers; they override
                    the common ones.
        body     -- optional url-encoded string; when given it is added
                    as the request data, which makes the request a POST.

        As a side effect, self._rate_limit is set from the
        X-RateLimit-Remaining response header (None when it is absent).

        Raises NetworkPermissionDeniedError on HTTP 403,
        NetworkInvalidResponseError on any other HTTP error or when the
        body is not valid JSON, NetworkDNSError on urllib2.URLError,
        NetworkBadStatusLineError on a bad status line and
        NetworkLowLevelError on socket errors.
        """
        # Only used to fetch the exception being handled: the
        # "except X, e" form is Python 2 only and "except X as e" needs
        # 2.6, while sys.exc_info() works on every version.
        import sys

        url = '%s%s' % (self.server, resource)
        _log.debug('Request %s', url)

        request = urllib2.Request(url=url)
        request_headers = self._common_headers()
        if headers:
            request_headers.update(headers)

        for key in request_headers:
            value = request_headers[key]
            # The Authorization header is just the base64 of
            # "username:password"; never write it to the log.
            if key.lower() == 'authorization':
                _log.debug('Header: %s=%s', key, '<hidden>')
            else:
                _log.debug('Header: %s=%s', key, value)
            request.add_header(key, value)

        if body:
            _log.debug('Body: %s', body)
            request.add_data(body)

        try:
            _log.debug('Starting request of %s', url)
            response = urllib2.urlopen(request)
            try:
                data = response.read()
                response_headers = response.info()
            finally:
                # Release the connection even if reading fails.
                response.close()
        except urllib2.HTTPError:
            exc = sys.exc_info()[1]
            _log.debug('HTTPError: %d', exc.code)
            _log.debug('HTTPError: response body:\n%s', exc.read())
            # To me, I got a lot of 502 for "replies". It shows the
            # "Something is technically wrong" most of the time in the
            # real pages.
            if exc.code == 403:
                # Permission denied.
                raise NetworkPermissionDeniedError
            raise NetworkInvalidResponseError
        except urllib2.URLError:
            exc = sys.exc_info()[1]
            _log.error('URL error: %s', exc.reason)
            raise NetworkDNSError
        except BadStatusLine:
            _log.error('Bad status line (Twitter is going bananas)')
            raise NetworkBadStatusLineError
        except socketError:   # That's the worst exception ever.
            _log.error('Socket connection error')
            raise NetworkLowLevelError

        # TODO: Permission denied?

        # Introduced in Twitter in 2009.03.27
        remaining = None
        for header in ('X-RateLimit-Remaining', 'x-ratelimit-remaining'):
            if header in response_headers:
                try:
                    remaining = int(response_headers[header])
                except ValueError:
                    # Unusable header value: treat the limit as unknown.
                    remaining = None
                break
        self._rate_limit = remaining
        if remaining is not None:
            _log.debug('Remaning hits: %d', remaining)

        _log.debug('Request completed')
        _log.debug('info(%s): %s', type(response_headers), response_headers)

        try:
            return json.loads(data)
        except ValueError:
            # The body is not JSON (e.g., an HTML error page); report it
            # with the same error used for the other invalid responses.
            _log.error('Response is not valid JSON')
            raise NetworkInvalidResponseError

    #
    # New network style methods
    #

    @classmethod
    def options(cls, options):
        """Add options related to Twitter."""
        options.add_group(cls.NAMESPACE, 'Twitter network')
        options.add_option('-s', '--no-https',
                group=cls.NAMESPACE,
                option='https',
                default=True,   # Secure connections by default
                help=_('Disable HTTPS (secure) connection with Twitter.'),
                action='store_false')
        options.add_option(
                group=cls.NAMESPACE,
                option='last_tweet',
                default=0,
                is_cmd_option=False)
        options.add_option(
                group=cls.NAMESPACE,
                option='last_reply',
                default=0,
                is_cmd_option=False)
        options.add_option(
                group=cls.NAMESPACE,
                option='server_url',
                default='http://api.twitter.com/1',
                is_cmd_option=False)
        options.add_option(
                group=cls.NAMESPACE,
                option='secure_server_url',
                default='https://api.twitter.com/1',
                is_cmd_option=False)
        options.add_option(
                group=cls.NAMESPACE,
                option='message_threshold',
                default=16,
                is_cmd_option=False)
        auth_options(cls.NAMESPACE, options, cls.AUTH)
        return

    def _timeline(self, config_var, url):
        """Request one of the lists of tweets.

        config_var -- name of the option holding the highest tweet id
                      already seen for this list; it is sent as
                      'since_id' and updated once all pages were fetched.
        url        -- resource of the list.

        Pages are requested until one comes back empty or with no more
        than 'message_threshold' messages. When there is no previous id,
        only the first page is requested.

        Returns a list of TwitterNetworkData objects.
        """
        last_id = int(self._option(config_var))
        # Cast like last_id above, so a value that was stored as text
        # still compares as a number.
        threshold = int(self._option('message_threshold'))
        _log.debug('%s: %d', config_var, last_id)

        params = {}
        if last_id > 0:
            params['since_id'] = last_id

        page = 1
        result = []
        high_id = 0
        while True:
            # TODO: How the interfaces can interrupt this?
            if page > 1:
                params['page'] = page

            final_url = url
            if params:
                final_url = '?'.join([url, urllib.urlencode(params)])
            response = self._request(final_url)
            _log.debug('Page %d, %d results', page, len(response))

            if not response:
                break

            # Remember the highest id in the response, so we can use it
            # when requesting data again (using the since_id parameter).
            top_tweet_id = max(tweet['id'] for tweet in response)
            _log.debug('Top tweet: %d; Highest seen tweet: %d',
                    top_tweet_id, high_id)
            if top_tweet_id > high_id:
                high_id = top_tweet_id

            response_data = _make_datetime(response)
            result.extend(response_data)
            page += 1   # Request the next page

            _log.debug('%d messages, %d threshold',
                    len(response_data), threshold)
            if len(response_data) <= threshold:
                break

            if last_id == 0:
                # do not try to download everything if we don't have a
                # previous list (or we'll blow the available requests in
                # one shot)
                break

        # only update the "last seen id" if everything goes alright
        if high_id > last_id:
            _log.debug('Last tweet updated: %d', high_id)
            self._options[self.NAMESPACE][config_var] = high_id

        return result

    def messages(self):
        """Return a list of NetworkData objects for the main "timeline"."""
        return self._timeline('last_tweet', '/statuses/home_timeline.json')

    def message(self, message_id):
        """Retrieves the information of one message."""
        response = self._request('/statuses/show/%d.json' % (message_id))
        return TwitterNetworkData(response)

    def link(self, message):
        """Return a link directly to the message."""
        assert(isinstance(message, NetworkData))
        return 'http://twitter.com/%s/status/%s' % (message.username,
                message.id)

    def reply_prefix(self, message):
        """Returns the prefix needed for a reply."""
        assert(isinstance(message, NetworkData))
        return '@' + message.username + ' '

    def replies(self):
        """Return a list of NetworkData objects for the replies for the user
        messages."""
        return self._timeline('last_reply', '/statuses/replies.json')

    def available_requests(self):
        """Return the current user rate limit.

        The value seen in the last response is used when there is one
        (including 0, which means the limit was reached); otherwise the
        server is asked for it.
        """
        if self._rate_limit is not None:
            return self._rate_limit

        data = self._request('/account/rate_limit_status.json')
        _log.debug('Requests: %s', data)
        return int(data['remaining_hits'])

    def update(self, status, reply_to=None):
        """Update the user status.

        status   -- text of the message; MessageTooLongWarning is issued
                    when it has more than 140 characters.
        reply_to -- optional message being replied to: a NetworkData
                    object or just the id of the message.

        Returns the TwitterNetworkData built from the server response.
        """
        if len(status) > 140:
            warnings.warn('Message too long', MessageTooLongWarning)

        body = {
                'status': status,
                'source': 'mitter'}

        if reply_to:
            if isinstance(reply_to, NetworkData):
                body['in_reply_to_status_id'] = reply_to.id
                # This is to protect the user from himself. You don't
                # *need* to start a reply with a @<username>, but it looks
                # really confusing in the Twitter website. So if the line
                # doesn't start with the username of the original user,
                # we add it for the user.
                if not status.startswith('@' + reply_to.username):
                    body['status'] = '@' + reply_to.username + ' ' + status
            else:
                body['in_reply_to_status_id'] = reply_to

        # Plain urlencode() fails on unicode text with non-ASCII
        # characters, so go through the UTF-8 aware helper.
        body = self._urlencode(body)
        _log.debug('Message to twitter: %s', body)

        data = self._request('/statuses/update.json', body=body)

        # TODO: Check if twitter sends an error message when the message
        # is too large.
        # TODO: Some updates return the previous status, not the new one.
        # Not sure what that means.
        return TwitterNetworkData(data)

    def repost(self, message):
        """Repost a message."""
        assert(isinstance(message, NetworkData))
        body = urllib.urlencode({'id': message.id})
        resource = '/statuses/retweet/%d.json' % (message.id)
        data = self._request(resource, body=body)
        return TwitterNetworkData(data)

    def favourite(self, message):
        """Mark a message as favourite, or remove the mark when the
        message is already a favourite."""
        assert(isinstance(message, NetworkData))
        # TwitterNetworkData stores the flag as `favourited`, while this
        # method used to test `favourite` only; accept both spellings so
        # the current state is seen whichever one the message carries.
        is_favourite = (getattr(message, 'favourite', False) or
                getattr(message, 'favourited', False))

        body = urllib.urlencode({'id': message.id})
        if not is_favourite:
            resource = '/favorites/create/%d.json' % (message.id)
        else:
            resource = '/favorites/destroy/%d.json' % (message.id)
        data = self._request(resource, body=body)
        return TwitterNetworkData(data)

    def delete_message(self, message):
        """Delete a message. *message* is a NetworkData object or the id
        of the message."""
        if isinstance(message, NetworkData):
            message = message.id    # We don't need anything else for Twitter

        # make a body, so _request makes it a post.
        body = urllib.urlencode({'id': message})
        resource = '/statuses/destroy/%s.json' % (message)
        response = self._request(resource, body=body)
        _log.debug('Delete response: %s', response)
        return True     # Either we get a response or an exception before
                        # we reach this.

    def can_delete(self, message):
        """Check if the message belongs to the user. If so, returns True;
        False otherwise."""
        assert(isinstance(message, NetworkData))
        return (message.username == self._option('username'))

    def can_reply(self, message):
        """Always return True; Twitter allows replying to any messages,
        including the ones from the user."""
        return True

    def can_repost(self, message):
        """Twitter ignores retweets from the user."""
        assert(isinstance(message, NetworkData))
        return not (message.username == self._option('username'))

    def can_favourite(self, message):
        """Always return True; Twitter allows favouriting/unfavouriting any
        messages."""
        return True