# User:Comte0/query templates mine.py

# From Wikipedia, the free encyclopedia

#(c) 2007-2008 Paul Marques Mota
#
# This script goes through a user's contributions and reports the ones
# lacking the requested wikiproject template on their talk page.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
 
import datetime
import gzip
import re
import StringIO
import sys
import time
import urllib
import urllib2
from xml.dom.minidom import parse
 
# Script version; it is interpolated into the User-Agent header.
VERSION = 3.0

# Address every API request is sent to.
QUERY_URL = u"http://en.wikipedia.org/w/api.php"

# Runtime switches: DEBUG is turned on by the --debug command-line flag,
# GZIP selects whether compressed replies are requested.
DEBUG = False
GZIP = True

# HTTP headers attached to each request; filled in by Process().
HEADERS = {}

# Titles already met while walking the contributions (title -> 1).
CACHE = {}

# Structure used for building the talk page names.
# It maps a namespace number to a two-element list:
# format: {namespace: [compiled pattern to search, replacement string]}
RELIST = {}

# Not referenced by the code visible in this file.
TITLES = ""

USAGE = (
    'Usage: python query_templates_mine.py [--debug] User Wikiprojects\n'
    'Example: python query_templates_mine.py --debug Comte0 "WP Australia,WikiProject France"'
)
 
def Query(**args):
    """Send one request to the API and return the parsed XML reply.

    Every keyword argument becomes a request parameter; "format" is always
    forced to "xml".  The parameters are urlencoded into the request body
    and the module-level HEADERS are attached to the request.

    A failed attempt (network error, corrupt gzip stream, malformed XML)
    is retried after a short pause until one succeeds, so the function
    only returns once it holds a document.  KeyboardInterrupt and
    SystemExit are not caught, so the loop can still be interrupted.

    Returns the xml.dom.minidom document built from the response body.
    """
    # Seconds to wait between two attempts, so that a failing server is
    # not hammered in a tight loop.
    retry_delay = 5

    args.update({
        "format"   : "xml",  # Output in XML format
    })

    req = urllib2.Request(QUERY_URL, urllib.urlencode(args), HEADERS)

    while True:
        try:
            url = urllib2.urlopen(req)
            try:
                body = url.read()
                encoding = url.info().get("Content-Encoding", "") or ""
            finally:
                url.close()

            # Asking for gzip does not oblige the server to compress, so
            # trust what the reply announces rather than the GZIP switch.
            if "gzip" in encoding.lower():
                compressedstream = StringIO.StringIO(body)
                body = gzip.GzipFile(fileobj=compressedstream).read()

            return parse(StringIO.StringIO(body))
        except Exception:
            if DEBUG:
                print >> sys.stderr, "error: HTTP"
                # The cause may also be a bad gzip stream or invalid XML.
                print >> sys.stderr, repr(sys.exc_info()[1])
            time.sleep(retry_delay)
 
def Process_Templates(arg):
    nb = 0
 
    for i,item in enumerate(arg.getElementsByTagName('page')):
        nb = nb + 1
        title = item.getAttribute("title")
        if DEBUG:
            print >> sys.stderr, "Processing: " + title.encode('utf8', 'replace')
        if item.getElementsByTagName("tl") != []:
            for link in item.getElementsByTagName("tl"):
                found = 0
                if P.match(link.getAttribute("title")):
                    found = 1
                    break
            if found == 0:
                print "*[[" + title.encode('utf8', 'replace') + "]] does not have the requested template(s)"
        else:
            print "*[[" + title.encode('utf8', 'replace') + "]] has no template"
 
def Process_Usercontribs(arg):
    """Build the list of talk pages to check from one batch of contributions.

    arg is a parsed XML document.  Each of its <item> elements carries a
    "title" and an "ns" (namespace number) attribute.  A title is handled
    only the first time it is met (CACHE remembers it), titles in
    namespaces 2 and 3 are skipped, and every remaining title is rewritten
    with the pattern/replacement pair stored in RELIST for its namespace.

    Returns the rewritten titles, UTF-8 encoded and joined with "|"; the
    result is an empty string when the batch brings nothing new.

    Exits the program with status 127 when the document holds no
    <usercontribs> element.
    """
    if not arg.getElementsByTagName("usercontribs"):
        print >> sys.stderr, "error: no such user"
        sys.exit(127)

    talk_titles = []
    for item in arg.getElementsByTagName("item"):
        title = item.getAttribute("title")

        # The answer is about a nonexistent record in the database.
        # getAttribute() gives an empty string for a missing attribute,
        # so test for emptiness rather than compare with a list.
        if not title:
            print >> sys.stderr, "error: no title"
            print >> sys.stderr, item.toprettyxml().encode('utf8', 'replace')
            continue

        if title in CACHE:
            if DEBUG:
                print >> sys.stderr, title.encode('utf8', 'replace') + " (cached)"
            continue
        CACHE[title] = 1

        if DEBUG:
            print >> sys.stderr, title.encode('utf8', 'replace')

        # Read the namespace only for records that are actually used.
        ns = int(item.getAttribute("ns"))
        if ns == 2 or ns == 3:
            if DEBUG:
                print >> sys.stderr, title.encode('utf8', 'replace') + " (skipped userpage)"
            continue

        pattern, replacement = RELIST[ns]
        talk_titles.append(pattern.sub(replacement, title).encode("utf8", "replace"))

    return "|".join(talk_titles)
 
def Process(user, wp):
    uc_count = 500
    parse_count = 0
    user_titles = []
    wikiprojects = []
    global P, HEADERS, VERSION
 
    if GZIP:
        HEADERS = {
                    "User-Agent": "User:Comte0 v%s" % VERSION
                    ,"Accept-encoding": "gzip"
        }
    else:
        HEADERS = {
                    "User-Agent": "User:Comte0 v%s" % VERSION
        }
 
    wikiprojects = wp.split(",")
    for i,wikiproject in enumerate(wikiprojects):
        wikiprojects[i] = "Template:" + wikiproject
    wp = "|".join(wikiprojects)
    P = re.compile(wp)
 
    ns = Query(action="query", meta="siteinfo", siprop="namespaces")
    for s in ns.getElementsByTagName("ns"):
        i = int(s.attributes["id"].firstChild.data)
        if i<0:
            continue
        if i == 0:
            RELIST[i] = [re.compile("^"), "Talk:"]
        elif i%2 == 0:
            RELIST[i] = [re.compile("^" + s.firstChild.data), s.firstChild.data + " talk"]
        else:
            RELIST[i] = [re.compile(""), ""]
 
    print "Parsing contributions"
 
    uc_next = " "
    while uc_next != "":
        if uc_next == " ":
            data = Query(action="query", list="usercontribs", ucuser=user, uclimit=uc_count)
        else:
            if DEBUG:
                print >> sys.stderr
                print >> sys.stderr, "ucstart=" + uc_next
            data = Query(action="query", list="usercontribs", ucuser=user, uclimit=uc_count, ucstart=uc_next)
 
        user_titles.append(Process_Usercontribs(data))
 
        if data.getElementsByTagName("query-continue") == []:
            uc_next = ""
        else:
            ucs = data.getElementsByTagName("usercontribs")[0].getAttribute('ucstart')
            timestamp = datetime.datetime(*time.strptime(ucs,"%Y-%m-%dT%H:%M:%SZ")[0:6])
            uc_next = timestamp.strftime("%Y%m%d%H%M%S")
 
        parse_count = parse_count + uc_count
        print parse_count
 
    print "Parsing templates"
 
    for i,v in enumerate(user_titles):
        if DEBUG:
            print >> sys.stderr
            print >> sys.stderr, i*uc_count
 
        data = Query(action="query", titles=v, prop="templates")
        Process_Templates(data)
 
def _Decode_Argument(raw):
    """Return the command-line argument raw as a unicode string.

    Under Python 2 sys.argv holds byte strings.  Calling .encode() on
    them directly first decodes them as ASCII and raises
    UnicodeDecodeError on any non-ASCII name, so the bytes are decoded
    explicitly: UTF-8 is tried first, then the platform encoding.
    """
    try:
        return raw.decode("utf8")
    except UnicodeDecodeError:
        return raw.decode(sys.getfilesystemencoding() or "latin-1", "replace")

# Command line: [--debug] User Wikiprojects
# The user name is handed over as UTF-8 bytes, the template list as
# unicode so that it can be matched against the titles of the API replies.
if len(sys.argv) < 3:
    print >> sys.stderr, USAGE
elif sys.argv[1] == "--debug":
    if len(sys.argv) == 4:
        DEBUG = True
        Process(_Decode_Argument(sys.argv[2]).encode("utf8", "replace"),
                _Decode_Argument(sys.argv[3]))
    else:
        print >> sys.stderr, USAGE
else:
    Process(_Decode_Argument(sys.argv[1]).encode("utf8", "replace"),
            _Decode_Argument(sys.argv[2]))