#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# redfam.py
#
# Copyright 2018 Jonathan Golder
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.
#
#

"""
Provides classes for working with RedFams (redundance families).
"""

import hashlib
import locale
import re
import urllib.parse
from datetime import datetime

import mwparserfromhell as mwparser  # noqa

import pywikibot  # noqa
from pywikibot.tools import deprecated  # noqa

import jogobot
from lib.mysqlred import MysqlRedFam, text


class RedFam(MysqlRedFam):
    """
    Basic class for RedFams, containing the basic data structure.
    """

    def __init__(self, articlesList, beginning, ending=None, redpageid=None,
                 status=None, famhash=None, heading=None):
        """
        Generates a new RedFam object.

        @param articlesList list    List of articles
        @param beginning datetime   Beginning date
        @param ending datetime      Ending date
        @param redpageid int        MW pageid of containing RedPage
        @param status str           Status of RedFam
        @param famhash str          SHA1 hash of articlesList
        @param heading str          Original heading of RedFam (Link)
        """
        # Having pywikibot.Site() is a good idea most of the time
        self.site = pywikibot.Site()

        super().__init__(
            articlesList=articlesList,
            beginning=beginning,
            ending=ending,
            redpageid=redpageid,
            famhash=famhash,
            heading=heading,
            status=status,
            articlesStatus=None,
        )

    def __repr__(self):
        """
        Returns representation string of the RedFam object.

        @returns str    repr() string
        """
        __repr = "RedFam( " + \
            "articlesList=" + repr(self.articlesList) + \
            ", heading=" + repr(self.heading) + \
            ", beginning=" + repr(self.beginning) + \
            ", ending=" + repr(self.ending) + \
            ", red_page_id=" + repr(self.redpageid) + \
            ", status=" + repr(self.status) + \
            ", fam_hash=" + repr(self.famhash) + \
            ", articlesStatus=" + repr(self.articlesStatus) + \
            " )"

        return __repr

    @classmethod
    def calc_famhash(cls, articlesList):
        """
        Calculates the SHA-1 hash for the articlesList of a redundance
        family. Since we don't need security, SHA-1 is just fine.

        NOTE: pads *articlesList* IN PLACE with None up to 8 elements —
        callers (e.g. RedFamParser.parser) pass the padded list on to the
        database layer, so this side effect is load-bearing; do not remove
        it without checking every caller.

        @param articlesList list    List of article titles (max. 8 used)
        @returns str                String with the hexadecimal hash digest
        """
        h = hashlib.sha1()

        # Since the articlesList attr of RedFam will always have 8 members
        # we need to fill up smaller lists (longer ones will be cropped
        # below).
        while len(articlesList) < 8:
            articlesList.append(None)

        h.update(str(articlesList[:8]).encode('utf-8'))

        return h.hexdigest()

    @classmethod
    def flush_db_cache(cls):
        """
        Calls flush method of the MySQL interface class (commits session).
        """
        cls.session.commit()

    def article_add_status(self, status, index=None, title=None):
        """
        Adds a status, specified by status, to the status set of the
        article identified by title or index in articlesList.

        @param status   Statusstring to add
        @type status    str
        @param index    Add to article with index in articlesList
        @type index     int
        @param title    Add to article with title in articlesList
        @type title     str

        @raises IndexError  If neither a valid index nor a title was given
        """
        # Resolve title to index only when no explicit index was given
        # (was "not index", which wrongly discarded a legitimate index 0).
        if title and index is None:
            index = self.articlesList.index(title)

        if isinstance(index, int) and index < len(self.articlesList):
            self.articlesStatus[index].add(status)
        else:
            raise IndexError("No index given or wrong format!")

    def article_remove_status(self, status, index=None, title=None,
                              weak=True):
        """
        Removes a status, specified by status, from the status set of the
        article identified by title or index in articlesList.

        If weak is set to False it will throw a KeyError when trying to
        remove a status not set.

        @param status   Statusstring to remove
        @type status    str
        @param index    Remove from article with index in articlesList
        @type index     int
        @param title    Remove from article with title in articlesList
        @type title     str
        @param weak     Change behavior on missing status
        @type weak      bool

        @raises IndexError  If neither a valid index nor a title was given
        @raises KeyError    If weak is False and status was not set
        """
        if title and index is None:
            index = self.articlesList.index(title)

        if isinstance(index, int) and index < len(self.articlesList):
            if weak:
                self.articlesStatus[index].discard(status)
            else:
                self.articlesStatus[index].remove(status)
        else:
            raise IndexError("No index given or wrong format!")

    def article_has_status(self, status, index=None, title=None):
        """
        Checks whether a status, specified by status, is in the status set
        of the article identified by title or index in articlesList.

        @param status   Statusstring to check
        @type status    str
        @param index    Check article with index in articlesList
        @type index     int
        @param title    Check article with title in articlesList
        @type title     str

        @returns bool       True if status is set
        @raises IndexError  If neither a valid index nor a title was given
        """
        if title and index is None:
            index = self.articlesList.index(title)

        if isinstance(index, int) and index < len(self.articlesList):
            return status in self.articlesStatus[index]
        else:
            raise IndexError("No index given or wrong format!")


class RedFamParser(RedFam):
    """
    Provides an interface to RedFam for adding/updating redundance
    families while parsing redundance pages.
    """

    # Define the timestamp format
    __timestamp_format = jogobot.config['redundances']['timestamp_format']

    # Define section heading re.pattern
    __sectionhead_pat = re.compile(r"^(.*\[\[.+\]\].*\[\[.+\]\].*)")

    # Define timestamp re.pattern
    __timestamp_pat = re.compile(jogobot.config['redundances']
                                 ['timestamp_regex'])

    # Textpattern for recognisation of done-notices
    __done_notice = ":Archivierung dieses Abschnittes \
wurde gewünscht von:"
    __done_notice2 = "{{Erledigt|"

    def __init__(self, articlesList, heading, redpage, redpagearchive,
                 beginning, ending=None):
        """
        Creates a RedFam object based on data collected while parsing
        red_pages, combined with possibly formerly known data from db.

        @param articlesList list    List of article titles
        @param heading str          Wikitext heading of section
        @param redpage page         Pywikibot.page object
        @param redpagearchive bool  Is red_page an archive
        @param beginning datetime   Timestamp of beginning
               str                  as strptime-parseable string
        @param ending datetime      Timestamp of ending
               str                  as strptime-parseable string
        """
        # Calculates the sha1 hash over articlesList to
        # rediscover known redundance families
        famhash = type(self).calc_famhash(articlesList)

        # Set object attributes:
        self.redpage = redpage

        # Parse timestamps
        beginning = self.__datetime(beginning)
        if ending:
            ending = self.__datetime(ending)

        super().__init__(
            articlesList,
            beginning,
            ending=ending,
            redpageid=redpage.page._pageid,
            famhash=famhash,
            heading=heading,
        )

        # Check status changes
        self.check_status()

        self.session.add(self)

    def update(self, articlesList, heading, redpage, redpagearchive,
               beginning, ending=None):
        """
        Updates an existing RedFam object with data from a fresh parser
        run. Parameters are the same as for __init__().
        """
        self.articlesList = articlesList
        self.heading = heading
        self.redpage = redpage
        # NOTE(review): __init__ reads redpage.page._pageid while this uses
        # redpage.pageid — confirm both carry the same MW pageid.
        self.redpageid = redpage.pageid

        self.add_beginning(beginning)

        if ending:
            self.add_ending(ending)

        self._redpagearchive = redpagearchive

        # Check status changes
        self.check_status()

    @classmethod
    def heading_parser(cls, heading):
        """
        Parses the given red_fam heading and returns the list of linked
        article titles (normalized: first letter uppercased, url-decoded,
        underscores/spaces normalized for title and anchor part).

        @param heading  Heading of RedFam-Section
        @type heading   wikicode or mwparser-parseable
        @returns list   Normalized article titles
        """
        # Parse string heading with mwparser again everytime.
        # In some cases the given wikicode is broken due to syntax errors
        # (Task FS#77)
        heading = mwparser.parse(str(heading))

        articlesList = []
        for link in heading.ifilter_wikilinks():

            article = str(link.title).strip()

            # Short circuit empty links
            if not article:
                continue

            # Make sure first letter is uppercase
            article = article[0].upper() + article[1:]

            # Unquote possible url encoded special chars
            article = urllib.parse.unquote(article)

            # Split in title and anchor part
            article = article.split("#", 1)

            # Replace underscores in title with spaces
            article[0] = article[0].replace("_", " ")

            if len(article) > 1:
                # Strip both parts to prevent leading/trailing spaces
                article[0] = article[0].strip()
                article[1] = article[1].strip()

                # Other way round, replace spaces with underscores in
                # anchors
                article[1] = article[1].replace(" ", "_")

            # Rejoin title and anchor
            article = "#".join(article)

            # Add to list
            articlesList.append(article)

        return articlesList

    def add_beginning(self, beginning):
        """
        Adds the beginning date of a redundance discussion to the object.

        @param beginning datetime   Beginning date (or parseable string)
        """
        self.beginning = self.__datetime(beginning)

    def add_ending(self, ending):
        """
        Adds the ending date of a redundance discussion to the object.

        @param ending datetime  Ending date (or parseable string)
        """
        self.ending = self.__datetime(ending)

    def __datetime(self, timestamp):
        """
        Decides whether the given timestamp is a parseable string or a
        datetime object and returns a datetime object in both cases.

        @param timestamp datetime   Datetime object
               str                  Parseable string with timestamp
        @returns datetime           Datetime object
        """
        # Make sure locale is set to 'de_DE.UTF-8' to prevent problems
        # with wrong month abbreviations in strptime
        locale.setlocale(locale.LC_ALL, 'de_DE.UTF-8')

        if isinstance(timestamp, datetime):
            return timestamp
        else:
            return datetime.strptime(timestamp,
                                     type(self).__timestamp_format)

    def check_status(self):
        """
        Handles detection of correct status.

        There are three possible stati:
            - 0 Discussion running  --> no ending, page is not an archive
            - 1 Discussion over     --> ending present, page is not an
                                        archive
            - 2 Discussion archived --> ending (normally) present, page is
                                        archive
            - 3 and greater: status was set by worker script, do not
              change it
        """
        # Since we have parsed it, the section can never be absent
        self.status.remove("absent")

        # No ending, discussion is running:
        # Sometimes archived discussions also have no detectable ending
        if not self.ending and not self.redpage.archive:
            self.status.add("open")
        else:
            self.status.remove("open")

            if not self.redpage.archive:
                self.status.add("done")
            else:
                self.status.remove("done")
                self.status.remove("open")
                self.status.add("archived")

    @classmethod
    def is_section_redfam_cb(cls, heading):
        """
        Used as callback for wikicode.get_sections in redpage.parse
        to select sections which are redfams.

        @param heading  Section heading to check
        @returns bool   True if heading contains at least two wikilinks
        """
        # Because of strange behavior in some cases, parse heading again
        # (Task FS#77)
        heading = mwparser.parse(str(heading))

        # Make sure we have min. two wikilinks in heading to assume a
        # redfam
        return len(heading.filter_wikilinks()) >= 2

    @classmethod
    def parser(cls, text, redpage, isarchive=False):
        """
        Handles parsing of a redfam section: extracts heading and dates,
        then creates or updates the corresponding RedFam DB object.

        @param text         Text of RedFam-Section (shadows the imported
                            sqlalchemy `text` inside this method)
        @type text          wikicode or mwparser-parseable
        @param redpage      RedPage object containing the section
        @param isarchive    Whether the page is an archive page
        @type isarchive     bool
        """
        # Parse text with mwparser if needed
        if not isinstance(text, mwparser.wikicode.Wikicode):
            text = mwparser.parse(text)

        # Extract heading text
        heading = next(text.ifilter_headings()).title.strip()

        # Extract beginning and maybe ending
        (beginning, ending) = RedFamParser.extract_dates(text, isarchive)

        # Missing beginning (Task: FS#76)
        # Use first day of month of reddisc
        if not beginning:
            match = re.search(
                jogobot.config["redundances"]["reddiscs_onlyinclude_re"],
                redpage.page.title())
            if match:
                beginning = datetime.strptime(
                    "01. {month} {year}".format(month=match.group(1),
                                                year=match.group(2)),
                    "%d. %B %Y")

        articlesList = RedFamParser.heading_parser(heading)
        famhash = RedFamParser.calc_famhash(articlesList)

        # Check for existing objects in DB, first in current redpage
        redfam = redpage.redfams.get(famhash)

        with RedFamParser.session.no_autoflush:
            if not redfam:
                # Otherwise in db table
                redfam = RedFamParser.session.query(RedFamParser).filter(
                    RedFamParser.famhash == famhash).one_or_none()

            if redfam:
                # Existing redfams need to be updated
                redfam.update(articlesList, str(heading), redpage,
                              isarchive, beginning, ending)
            else:
                # Create the RedFam object
                redfam = RedFamParser(articlesList, str(heading), redpage,
                                      isarchive, beginning, ending)

        # Add redfam to redpage object
        redpage.redfams.set(redfam)

    @classmethod
    def extract_dates(cls, text, isarchive=False):
        """
        Returns tuple of the first and maybe last timestamp of a section.

        Last timestamp is only returned if there is a done notice or param
        *isarchive* is set to 'True'.

        @param text         Text to search in
        @type text          Any type castable to str
        @param isarchive    If true, skip searching done notice (on
                            archive pages)
        @type isarchive     bool

        @returns            Timestamps, otherwise None
        @returntype         tuple of strs
        """
        # Match all timestamps
        matches = cls.__timestamp_pat.findall(str(text))
        if matches:

            # First one is beginning.
            # Since some timestamps are broken we need to reconstruct them
            # by regex match groups
            beginning = (matches[0][0] + ", " + matches[0][1] + ". " +
                         matches[0][2] + ". " + matches[0][3])

            # Last one maybe is ending:
            # - Done notice format 1
            # - Done notice format 2
            # - Or on archive pages
            if (cls.__done_notice in text or
                    cls.__done_notice2 in text or
                    isarchive):

                ending = (matches[-1][0] + ", " + matches[-1][1] + ". " +
                          matches[-1][2] + ". " + matches[-1][3])
            else:
                ending = None

        # Missing dates (Task: FS#76)
        else:
            beginning = None
            ending = None

        return (beginning, ending)


class RedFamWorker(RedFam):
    """
    Handles working with redundance families stored in database where
    discussion is finished.
    """

    def __init__(self):
        # NOTE(review): RedFam.__init__ has required positional params, so
        # calling this constructor directly would raise a TypeError —
        # instances are presumably reconstructed by SQLAlchemy, which
        # bypasses __init__; confirm before instantiating directly.
        super().__init__()

        # Make sure locale is set to 'de_DE.UTF-8' to prevent problems
        # with wrong month abbreviations in strptime
        locale.setlocale(locale.LC_ALL, 'de_DE.UTF-8')

    def article_generator(self,  # noqa
                          filter_existing=None, filter_redirects=None,
                          exclude_article_status=None,
                          onlyinclude_article_status=None,
                          talkpages=None):
        """
        Yields pywikibot page objects for articles belonging to this
        redfam in a generator.

        @param filter_existing  Set to True to only get existing pages,
                                set to False to only get nonexisting pages,
                                unset/None results in not filtering
        @type filter_existing   bool/None
        @param filter_redirects Set to True to get only noredirect pages,
                                set to False to get only redirect pages,
                                unset/None results in not filtering
        @type filter_redirects  bool/None
        @param exclude_article_status       Skip articles having any of
                                            these stati
        @type exclude_article_status        list of str
        @param onlyinclude_article_status   Only yield articles having all
                                            of these stati
        @type onlyinclude_article_status    list of str
        @param talkpages        Set to True to get talkpages instead of
                                article pages
        @type talkpages         bool/None
        """
        # Was "=[]" mutable default arguments — use None sentinels instead
        if exclude_article_status is None:
            exclude_article_status = []
        if onlyinclude_article_status is None:
            onlyinclude_article_status = []

        # Helpers to leave a multidimensional loop
        # https://docs.python.org/3/faq/design.html#why-is-there-no-goto
        class Continue(Exception):
            pass

        class Break(Exception):
            pass

        # Iterate over articles in redfam
        for article in self.articlesList:

            # To be able to control outer loop from inside child loops
            try:
                # Not all list elements contain articles
                if not article:
                    raise Break()

                page = pywikibot.Page(pywikibot.Link(article),
                                      pywikibot.Site())

                # Filter existing pages if requested with
                # filter_existing=False
                if page.exists():
                    self.article_remove_status("deleted", title=article)
                    if filter_existing is False:
                        raise Continue()

                # Filter non existing pages if requested with
                # filter_existing=True
                else:
                    self.article_add_status("deleted", title=article)
                    if filter_existing:
                        raise Continue()

                # Filter redirects if requested with filter_redirects=True
                if page.isRedirectPage():
                    self.article_add_status("redirect", title=article)
                    if filter_redirects:
                        raise Continue()

                # Filter noredirects if requested with
                # filter_redirects=False
                else:
                    self.article_remove_status("redirect", title=article)
                    if filter_redirects is False:
                        raise Continue()

                # Exclude by article status
                for status in exclude_article_status:
                    if self.article_has_status(status, title=article):
                        raise Continue()

                # Only include by article status
                for status in onlyinclude_article_status:
                    if not self.article_has_status(status, title=article):
                        raise Continue()

            # Proxy loop control to outer loop
            except Continue:
                continue
            except Break:
                break

            # Follow moved pages
            if self.article_has_status("redirect", title=article):
                try:
                    page = page.moved_target()

                    # Short circuit if movement destination does not exist
                    if not page.exists():
                        continue

                except pywikibot.exceptions.NoMoveTargetError:
                    pass

            # Exclude users & user talkpages
            if page.namespace() == 2 or page.namespace() == 3:
                self.article_add_status("user", title=article)
                continue

            # Toggle talkpage if requested state differs from actual
            if talkpages and not page.isTalkPage() or\
               not talkpages and page.isTalkPage():
                page = page.toggleTalkPage()

            # Add reference to redfam to pages
            page.redfam = self

            # Keep article title from db with page object
            page.redarticle = article

            # Yield filtered pages
            yield page

    def update_status(self):
        """
        Sets the family status after it was worked on: propagates per-
        article "sav_err"/"note_rej" stati, otherwise marks the family as
        "marked" once every article is deleted, redirected or marked.
        """
        for article in self.articlesList:
            if not article:
                break

            if self.article_has_status("sav_err", title=article):
                self.status.add("sav_err")
                return
            elif self.article_has_status("note_rej", title=article):
                self.status.add("note_rej")
                return
            elif not self.article_has_status("deleted", title=article) and \
                    not self.article_has_status("redirect",
                                                title=article) and\
                    not self.article_has_status("marked", title=article):
                return

        self.status.remove("sav_err")
        self.status.remove("note_rej")
        self.status.add("marked")

    def get_disc_link(self, as_link=False):
        """
        Constructs and returns the link to the redundancy discussion.

        @param as_link  If true, wrap link in double square brackets
                        (wikilink)
        @type as_link   bool

        @returns        Link to discussion
        @rtype          str
        """
        # Expand templates using pwb site object
        site = pywikibot.Site()
        anchor_code = site.expand_text(self.heading.strip())

        # Remove possibly embedded files
        anchor_code = re.sub(r"\[\[\w+:[^\|]+(?:\|.+){2,}\]\]", "",
                             anchor_code)

        # Replace non-breaking-space (U+00A0) by correct urlencoded value
        # (".C2.A0" is the MediaWiki anchor encoding of UTF-8 bytes C2 A0;
        # the literal was garbled to a plain space in some copies)
        anchor_code = anchor_code.replace("\xa0", ".C2.A0")

        # Use mwparser to strip and normalize
        anchor_code = mwparser.parse(anchor_code).strip_code()

        # We try it without any more parsing as mw will do while parsing
        # page
        link = self.redpage.pagetitle + "#" + anchor_code.strip()

        if as_link:
            return "[[{0}]]".format(link)
        else:
            return link

    def disc_section_exists(self):
        """
        Checks whether the redundance discussion section still exists.

        Sometimes it is absent, since the heading was changed and
        therefore we get a different famhash, ergo a new redfam.

        As a side effect, the method sets status "absent" for missing
        sections.

        @returns    True if it exists, otherwise False
        @rtype      bool
        """
        # The redpage
        discpage = pywikibot.Page(pywikibot.Site(), self.get_disc_link())

        # Parse redpage content
        wikicode = mwparser.parse(discpage.get())

        # List fams
        fams = wikicode.filter_headings(
            matches=RedFamParser.is_section_redfam_cb)

        # Check if current fam is in list of fams.
        # If not, set status absent and return False
        if self.heading not in [fam.title.strip() for fam in fams]:
            self.status.remove("open")
            self.status.add("absent")
            return False

        # The section exists
        return True

    def generate_disc_notice_template(self):
        """
        Generates the notice template to add on discussion pages of
        articles when the redundancy discussion is finished.

        @returns    Notice template to add on article disc
        @rtype      wikicode-node
        """
        # Generate template boilerplate
        template = mwparser.nodes.template.Template(
            jogobot.config['redundances']['disc_notice_template_name'])

        # Index of first article's param
        param_cnt = 3

        # Iterate over articles in redfam
        for article in self.articlesList:

            if not article:
                break

            # Make sure to only use 8 articles (max. param 10)
            if param_cnt > 10:
                break

            # Add param for article
            template.add(param_cnt, article, True)

            param_cnt += 1

        # Add begin
        begin = self.beginning.strftime("%B %Y")
        template.add("Beginn", begin, True)

        # Add end (if not same as begin)
        end = self.ending.strftime("%B %Y")
        if not end == begin:
            template.add("Ende", end, True)

        # Add link to related reddisc
        template.add("Diskussion", self.get_disc_link(), True)

        # Add signature and timestamp
        # Not used atm
        # template.add( 1, "-- ~~~~", True )

        return template

    @classmethod
    def list_by_status(cls, status):
        """
        Lists red_fams stored in db by given status.

        NOTE(review): cls(fam) passes a whole row as articlesList — this
        looks like legacy code predating the current constructor; confirm
        before use.
        """
        mysql = MysqlRedFam()
        for fam in mysql.get_by_status(status):
            try:
                print(cls(fam))
            except RedFamHashError:
                print(fam)
                raise

    @classmethod
    def gen_by_status_and_ending(cls, status, ending):
        """
        Yields red_fams stored in db with archived (but not yet marked)
        status which have an ending after the given one.
        """
        for redfam in RedFamWorker.session.query(RedFamWorker).filter(
                # NOT WORKING WITH OBJECT NOTATION
                # RedFamWorker._status.like('archived'),
                # RedFamWorker._status.like("%{0:s}%".format(status)),
                text("status LIKE '%archived%'"),
                text("status NOT LIKE '%marked%'"),
                RedFamWorker.ending >= ending):

            yield redfam

    @classmethod
    def gen_open(cls):
        """
        Yields red_fams stored in db whose status contains 'open'.
        """
        for redfam in RedFamWorker.session.query(RedFamWorker).filter(
                # NOT WORKING WITH OBJECT NOTATION
                text("status LIKE '%open%'")):

            yield redfam


class RedFamError(Exception):
    """
    Base class for all errors of the RedFam module.
    """

    def __init__(self, message=None):
        """
        Handles instantiation of RedFamErrors.

        @param message str  Custom error message (optional)
        """
        if not message:
            self.message = "An Error occured while executing a RedFam action"
        else:
            self.message = message

    def __str__(self):
        """
        Output of error message.
        """
        return self.message


class RedFamHashError(RedFamError):
    """
    Raised when given RedFamHash does not match with calculated.
    """

    def __init__(self, givenHash, calculatedHash):
        # Fixed: closing parenthesis after '{calc}' was missing
        message = "Given fam_hash ('{given}') does not match with \
calculated ('{calc}')".format(given=givenHash, calc=calculatedHash)
        super().__init__(message)


class RedFamHeadingError(RedFamError):
    """
    Raised when given RedFamHeading does not match the
    __sectionhead_pat regex.
    """

    def __init__(self, heading):
        message = "Error while trying to parse section heading. Given heading \
'{heading}' does not match RegEx".format(heading=heading)
        super().__init__(message)