You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

851 lines
28 KiB

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# redfam.py
#
# Copyright 2018 Jonathan Golder <jonathan@golderweb.de>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.
#
#
"""
Provides classes for working with RedFams
"""
import hashlib
import locale
import re
import urllib.parse
from datetime import datetime
import mwparserfromhell as mwparser # noqa
import pywikibot # noqa
from pywikibot.tools import deprecated # noqa
import jogobot
from lib.mysqlred import MysqlRedFam, text
class RedFam( MysqlRedFam ):
"""
Basic class for RedFams, containing the basic data structure
"""
def __init__( self, articlesList, beginning, ending=None, redpageid=None,
status=None, famhash=None, heading=None ):
"""
Generates a new RedFam object
@param articlesList list List of articles
@param beginning datetime Beginning date
@param ending datetime Ending date
@param red_page_id int MW pageid of containing RedPage
@param status str Status of RedFam
@param fam_hash str SHA1 hash of articlesList
@param heading str Original heading of RedFam (Link)
"""
# Having pywikibot.Site() is a good idea most of the time
self.site = pywikibot.Site()
super().__init__(
articlesList=articlesList,
beginning=beginning,
ending=ending,
redpageid=redpageid,
famhash=famhash,
heading=heading,
status=status,
articlesStatus=None
)
def __repr__( self ):
"""
Returns repression str of RedFam object
@returns str repr() string
"""
__repr = "RedFam( " + \
"articlesList=" + repr( self.articlesList ) + \
", heading=" + repr( self.heading ) + \
", beginning=" + repr( self.beginning ) + \
", ending=" + repr( self.ending ) + \
", red_page_id=" + repr( self.redpageid ) + \
", status=" + repr( self.status ) + \
", fam_hash=" + repr( self.famhash ) + \
", articlesStatus=" + repr( self.articlesStatus ) + \
" )"
return __repr
@classmethod
def calc_famhash(cls, articlesList ):
"""
Calculates the SHA-1 hash for the articlesList of redundance family.
Since we don't need security SHA-1 is just fine.
@returns str String with the hexadecimal hash digest
"""
h = hashlib.sha1()
# Since articlesList attr of RedFam will have always 8 Members we
# need to fill up smaller lists (longers will be cropped below).
while len( articlesList) < 8:
articlesList.append(None)
h.update( str( articlesList[:8] ).encode('utf-8') )
return h.hexdigest()
@classmethod
def flush_db_cache( cls ):
"""
Calls flush method of Mysql Interface class
"""
cls.session.commit()
def article_add_status(self, status, index=None, title=None ):
"""
Adds a status specified by status, to article (identified by title
or index in articlesList) status set
@param status Statusstring to add
@type status str
@param index Add to article with index in articlesList
@type index int
@param title Add to article with title in articlesList
@type title str
"""
if title and not index:
index = self.articlesList.index( title )
if isinstance( index, int ) and index < len(self.articlesList):
self.articlesStatus[index].add(status)
else:
raise IndexError( "No index given or wrong format!")
def article_remove_status(self, status, index=None, title=None, weak=True):
"""
Removes a status specified by status, from article (identified by title
or index in articlesList) status set
If weak is set to False it will throw a KeyError when trying to
remove a status not set.
@param status Statusstring to add
@type status str
@param index Remove from article with index in articlesList
@type index int
@param title Remove from article with title in articlesList
@type title str
@param weak Change behavior on missing status
@type bool
"""
if title and not index:
index = self.articlesList.index( title )
if isinstance( index, int ) and index < len(self.articlesList):
if weak:
self.articlesStatus[index].discard(status)
else:
self.articlesStatus[index].remove(status)
else:
raise IndexError( "No index given or wrong format!")
def article_has_status(self, status, index=None, title=None ):
"""
Adds a status specified by status, to articles (identified by title
or index in articlesList) status set
@param status Statusstring to add
@type status str
@param index Check article with index in articlesList
@type index int
@param title Check article with title in articlesList
@type title str
"""
if title and not index:
index = self.articlesList.index( title )
if isinstance( index, int ) and index < len(self.articlesList):
if status in self.articlesStatus[index]:
return True
else:
return False
else:
raise IndexError( "No index given or wrong format!")
class RedFamParser( RedFam ):
"""
Provides an interface to RedFam for adding/updating redundance families
while parsig redundance pages
"""
# Define the timestamp format
__timestamp_format = jogobot.config['redundances']['timestamp_format']
# Define section heading re.pattern
__sectionhead_pat = re.compile( r"^(.*\[\[.+\]\].*\[\[.+\]\].*)" )
# Define timestamp re.pattern
__timestamp_pat = re.compile( jogobot.config['redundances']
['timestamp_regex'] )
# Textpattern for recognisation of done-notices
__done_notice = ":<small>Archivierung dieses Abschnittes \
wurde gewünscht von:"
__done_notice2 = "{{Erledigt|"
def __init__( self, articlesList, heading, redpage, redpagearchive,
beginning, ending=None ):
"""
Creates a RedFam object based on data collected while parsing red_pages
combined with possibly former known data from db
@param redfam_heading str Wikitext heading of section
@param redpage page Pywikibot.page object
@param redpagearchive bool Is red_page an archive
@param beginning datetime Timestamp of beginning
str as strptime parseable string
@param ending datetime Timestamp of ending
str strptime parseable string
"""
# Calculates the sha1 hash over self._articlesList to
# rediscover known redundance families
famhash = type(self).calc_famhash(articlesList)
# Set object attributes:
self.redpage = redpage
# Parse Timestamps
beginning = self.__datetime(beginning)
if ending:
ending = self.__datetime(ending)
super().__init__( articlesList,
beginning,
ending=ending,
redpageid=redpage.page._pageid,
famhash=famhash,
heading=heading )
# Check status changes
self.check_status()
self.session.add(self)
def update( self, articlesList, heading, redpage, redpagearchive,
beginning, ending=None ):
self.articlesList = articlesList
self.heading = heading
self.redpage = redpage
self.redpageid = redpage.pageid
self.add_beginning( beginning )
if ending:
self.add_ending( ending )
self._redpagearchive = redpagearchive
# Check status changes
self.check_status()
@classmethod
def heading_parser( cls, heading ):
"""
Parses given red_fam_heading string and saves articles list
@param heading Heading of RedFam-Section
@type heading wikicode or mwparser-parseable
"""
# Parse string heading with mwparse again everytime
# In some cases the given wikicode is broken due to syntax errors
# (Task FS#77)
heading = mwparser.parse( str( heading ) )
articlesList = []
for link in heading.ifilter_wikilinks():
article = str( link.title ).strip()
# Short circuit empty links
if not article:
continue
# Make sure first letter is uppercase
article = article[0].upper() + article[1:]
# Unquote possible url encoded special chars
article = urllib.parse.unquote( article )
# Split in title and anchor part
article = article.split("#", 1)
# Replace underscores in title with spaces
article[0] = article[0].replace("_", " ")
if len(article) > 1:
# Strip both parts to prevent leading/trailing spaces
article[0] = article[0].strip()
article[1] = article[1].strip()
# other way round, replace spaces with underscores in anchors
article[1] = article[1].replace(" ", "_")
# Rejoin title and anchor
article = "#".join(article)
# Add to list
articlesList.append(article)
return articlesList
def add_beginning( self, beginning ):
"""
Adds the beginning date of a redundance diskussion to the object
@param datetime datetime Beginning date
"""
self.beginning = self.__datetime( beginning )
def add_ending( self, ending ):
"""
Adds the ending date of a redundance diskussion to the object.
@param datetime datetime Ending date
"""
self.ending = self.__datetime( ending )
def __datetime( self, timestamp ):
"""
Decides wether given timestamp is a parseable string or a
datetime object and returns a datetime object in both cases
@param datetime timestamp Datetime object
str timestamp Parseable string with timestamp
@returns datetime Datetime object
"""
# Make sure locale is set to 'de_DE.UTF-8' to prevent problems
# with wrong month abreviations in strptime
locale.setlocale(locale.LC_ALL, 'de_DE.UTF-8')
if( isinstance( timestamp, datetime ) ):
return timestamp
else:
result = datetime.strptime( timestamp,
type( self ).__timestamp_format )
return result
def check_status( self ):
"""
Handles detection of correct status
There are three possible stati:
- 0 Discussion running --> no ending, page is not an archive
- 1 Discussion over --> ending present, page is not an archive
- 2 Discussion archived --> ending (normaly) present, page is archive
- 3 and greater status was set by worker script, do not change it
"""
# Since we have parsed it, the section can never be absent
self.status.remove("absent")
# No ending, discussion is running:
# Sometimes archived discussions also have no detectable ending
if not self.ending and not self.redpage.archive:
self.status.add("open")
else:
self.status.remove("open")
if not self.redpage.archive:
self.status.add("done")
else:
self.status.remove("done")
self.status.remove("open")
self.status.add("archived")
@classmethod
def is_section_redfam_cb( cls, heading ):
"""
Used as callback for wikicode.get_sections in redpage.parse to
select sections which are redfams
"""
# Because of strange behavior in some cases, parse heading again
# (Task FS#77)
heading = mwparser.parse( str( heading ) )
# Make sure we have min. two wikilinks in heading to assume a redfam
if len( heading.filter_wikilinks() ) >= 2:
return True
else:
return False
@classmethod
def parser( cls, text, redpage, isarchive=False ):
"""
Handles parsing of redfam section
@param text Text of RedFam-Section
@type text wikicode or mwparser-parseable
"""
# Parse heading with mwparse if needed
if not isinstance( text, mwparser.wikicode.Wikicode ):
text = mwparser.parse( text )
# Extract heading text
heading = next( text.ifilter_headings() ).title.strip()
# Extract beginnig and maybe ending
(beginning, ending) = RedFamParser.extract_dates( text, isarchive )
# Missing beginning (Task: FS#76)
# Use first day of month of reddisc
if not beginning:
match = re.search(
jogobot.config["redundances"]["reddiscs_onlyinclude_re"],
redpage.page.title() )
if match:
beginning = datetime.strptime(
"01. {month} {year}".format(
month=match.group(1), year=match.group(2)),
"%d. %B %Y" )
articlesList = RedFamParser.heading_parser( heading )
famhash = RedFamParser.calc_famhash( articlesList )
# Check for existing objects in DB first in current redpage
redfam = redpage.redfams.get(famhash)
with RedFamParser.session.no_autoflush:
if not redfam:
# Otherwise in db table
redfam = RedFamParser.session.query(RedFamParser).filter(
RedFamParser.famhash == famhash ).one_or_none()
if redfam:
# Existing redfams need to be updated
redfam.update( articlesList, str(heading), redpage, isarchive,
beginning, ending )
else:
# Create the RedFam object
redfam = RedFamParser( articlesList, str(heading),
redpage, isarchive, beginning, ending )
# Add redfam to redpage object
redpage.redfams.set( redfam )
@classmethod
def extract_dates( cls, text, isarchive=False ):
"""
Returns tuple of the first and maybe last timestamp of a section.
Last timestamp is only returned if there is a done notice or param
*isarchiv* is set to 'True'
@param text Text to search in
@type line Any Type castable to str
@param isarchive If true skip searching done notice (on archivepages)
@type isarchive bool
@returns Timestamps, otherwise None
@returntype tuple of strs
"""
# Match all timestamps
matches = cls.__timestamp_pat.findall( str( text ) )
if matches:
# First one is beginning
# Since some timestamps are broken we need to reconstruct them
# by regex match groups
beginning = ( matches[0][0] + ", " + matches[0][1] + ". " +
matches[0][2] + ". " + matches[0][3] )
# Last one maybe is ending
# Done notice format 1
# Done notice format 2
# Or on archivepages
if ( cls.__done_notice in text or
cls.__done_notice2 in text or
isarchive ):
ending = ( matches[-1][0] + ", " + matches[-1][1] + ". " +
matches[-1][2] + ". " + matches[-1][3] )
else:
ending = None
# Missing dates (Task: FS#76)
else:
beginning = None
ending = None
return (beginning, ending)
class RedFamWorker( RedFam ):
"""
Handles working with redundance families stored in database
where discussion is finished
"""
def __init__( self ):
super().__init__()
# Make sure locale is set to 'de_DE.UTF-8' to prevent problems
# with wrong month abreviations in strptime
locale.setlocale(locale.LC_ALL, 'de_DE.UTF-8')
def article_generator(self, # noqa
filter_existing=None, filter_redirects=None,
exclude_article_status=[],
onlyinclude_article_status=[],
talkpages=None ):
"""
Yields pywikibot pageobjects for articles belonging to this redfams
in a generator
self.
@param filter_existing Set to True to only get existing pages
set to False to only get nonexisting pages
unset/None results in not filtering
@type filter_existing bool/None
@param filter_redirects Set to True to get only noredirectpages,
set to False to get only redirectpages,
unset/None results in not filtering
@type filter_redirects bool/None
@param talkpages Set to True to get Talkpages instead of article page
@type talkpages bool/None
"""
# Helper to leave multidimensional loop
# https://docs.python.org/3/faq/design.html#why-is-there-no-goto
class Continue(Exception):
pass
class Break(Exception):
pass
# Iterate over articles in redfam
for article in self.articlesList:
# To be able to control outer loop from inside child loops
try:
# Not all list elements contain articles
if not article:
raise Break()
page = pywikibot.Page( pywikibot.Link(article),
pywikibot.Site() )
# Filter existing pages if requested with filter_existing=False
if page.exists():
self.article_remove_status( "deleted", title=article )
if filter_existing is False:
raise Continue()
# Filter non existing Pages if requested with
# filter_existing=True
else:
self.article_add_status( "deleted", title=article )
if filter_existing:
raise Continue()
# Filter redirects if requested with filter_redirects=True
if page.isRedirectPage():
self.article_add_status( "redirect", title=article )
if filter_redirects:
raise Continue()
# Filter noredirects if requested with filter_redirects=False
else:
self.article_remove_status("redirect", title=article )
if filter_redirects is False:
raise Continue()
# Exclude by article status
for status in exclude_article_status:
if self.article_has_status( status, title=article ):
raise Continue()
# Only include by article status
for status in onlyinclude_article_status:
if not self.article_has_status( status, title=article ):
raise Continue()
# Proxy loop control to outer loop
except Continue:
continue
except Break:
break
# Follow moved pages
if self.article_has_status( "redirect", title=article ):
try:
page = page.moved_target()
# Short circuit if movement destination does not exists
if not page.exists():
continue
except pywikibot.exceptions.NoMoveTarget:
pass
# Exclude Users & User Talkpage
if page.namespace() == 2 or page.namespace() == 3:
self.article_add_status( "user", title=article )
continue
# Toggle talkpage
if talkpages and not page.isTalkPage() or\
not talkpages and page.isTalkPage():
page = page.toggleTalkPage()
# Add reference to redfam to pages
page.redfam = self
# Keep article title from db with page object
page.redarticle = article
# Yield filtered pages
yield page
def update_status( self ):
"""
Sets status to 3 when worked on
"""
for article in self.articlesList:
if not article:
break
if self.article_has_status( "sav_err", title=article ):
self.status.add( "sav_err" )
return
elif self.article_has_status( "note_rej", title=article ):
self.status.add( "note_rej" )
return
elif not self.article_has_status("deleted", title=article ) and \
not self.article_has_status("redirect", title=article) and\
not self.article_has_status("marked", title=article):
return
self.status.remove("sav_err")
self.status.remove("note_rej")
self.status.add( "marked" )
def get_disc_link( self, as_link=False ):
"""
Constructs and returns the link to Redundancy discussion
@param as_link If true, wrap link in double square brackets (wikilink)
@type as_link bool
@returns Link to diskussion
@rtype str
"""
# Expand templates using pwb site object
site = pywikibot.Site()
anchor_code = site.expand_text(self.heading.strip())
# Remove possibly embbeded files
anchor_code = re.sub( r"\[\[\w+:[^\|]+(?:\|.+){2,}\]\]", "",
anchor_code )
# Replace non-breaking-space by correct urlencoded value
anchor_code = anchor_code.replace( "&nbsp;", ".C2.A0" )
# Use mwparser to strip and normalize
anchor_code = mwparser.parse( anchor_code ).strip_code()
# We try it without any more parsing as mw will do while parsing page
link = self.redpage.pagetitle + "#" + anchor_code.strip()
if as_link:
return "[[{0}]]".format(link)
else:
return link
def disc_section_exists( self ):
"""
Checks weather the redundance discussion is still existing. Sometimes
it is absent, since heading was changed and therefore we get a
different famhash ergo new redfam.
As a side effect, the method sets status "absent" for missing sections.
@returns True if it exists otherwise False
@rtype bool
"""
# The redpage
discpage = pywikibot.Page(pywikibot.Site(), self.get_disc_link() )
# Parse redpage content
wikicode = mwparser.parse( discpage.get() )
# List fams
fams = wikicode.filter_headings(
matches=RedFamParser.is_section_redfam_cb )
# Check if current fam is in list of fams
# If not, set status absent and return False
if self.heading not in [ fam.title.strip() for fam in fams]:
self.status.remove("open")
self.status.add("absent")
return False
# The section exists
return True
def generate_disc_notice_template( self ):
"""
Generates notice template to add on discussion Pages of Articles when
redundancy discussion is finished
@return Notice template to add on article disc
@rtype wikicode-node
"""
# Generate template boilerplate
template = mwparser.nodes.template.Template(
jogobot.config['redundances']['disc_notice_template_name'])
# Index of first article's param
param_cnt = 3
# Iterate over articles in redfam
for article in self.articlesList:
if not article:
break
# Make sure to only use 8 articles (max. param 10)
if param_cnt > 10:
break
# Add param for article
template.add( param_cnt, article, True )
param_cnt += 1
# Add begin
begin = self.beginning.strftime( "%B %Y" )
template.add( "Beginn", begin, True )
# Add end (if not same as begin)
end = self.ending.strftime( "%B %Y" )
if not end == begin:
template.add( "Ende", end, True )
# Add link to related reddisc
template.add( "Diskussion", self.get_disc_link(), True )
# Add signature and timestamp
# Not used atm
# template.add( 1, "-- ~~~~", True )
return template
@classmethod
def list_by_status( cls, status ):
"""
Lists red_fams stored in db by given status
"""
mysql = MysqlRedFam()
for fam in mysql.get_by_status( status ):
try:
print( cls( fam ) )
except RedFamHashError:
print(fam)
raise
@classmethod
def gen_by_status_and_ending( cls, status, ending ):
"""
Yield red_fams stored in db by given status which have an ending after
given one
"""
for redfam in RedFamWorker.session.query(RedFamWorker).filter(
# NOT WORKING WITH OBJECT NOTATION
# RedFamWorker._status.like('archived'),
# RedFamWorker._status.like("%{0:s}%".format(status)),
text("status LIKE '%archived%'"),
text("status NOT LIKE '%marked%'"),
RedFamWorker.ending >= ending ):
yield redfam
@classmethod
def gen_open( cls ):
"""
Yield red_fams stored in db by given status which have an ending after
given one
"""
for redfam in RedFamWorker.session.query(RedFamWorker).filter(
# NOT WORKING WITH OBJECT NOTATION
text("status LIKE '%open%'") ):
yield redfam
class RedFamError( Exception ):
"""
Base class for all Errors of RedFam-Module
"""
def __init__( self, message=None ):
"""
Handles Instantiation of RedFamError's
"""
if not message:
self.message = "An Error occured while executing a RedFam action"
else:
self.message = message
def __str__( self ):
"""
Output of error message
"""
return self.message
class RedFamHashError( RedFamError ):
"""
Raised when given RedFamHash does not match with calculated
"""
def __init__( self, givenHash, calculatedHash ):
message = "Given fam_hash ('{given}') does not match with \
calculated ('{calc}'".format( given=givenHash, calc=calculatedHash )
super().__init__( message )
class RedFamHeadingError ( RedFamError ):
"""
Raised when given RedFamHeading does not match __sectionhead_pat Regex
"""
def __init__( self, heading ):
message = "Error while trying to parse section heading. Given heading \
'{heading}' does not match RegEx".format( heading=heading )
super().__init__( message )