#!/usr/bin/python
# -*- coding: utf-8 -*-
#  This program is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 2 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with this program; if not, write to the Free Software
#  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
#  MA 02110-1301, USA.
#

import os
import locale
import urllib.request
import shutil
import datetime
import zipfile
import shlex
import subprocess
import email.utils
import hashlib
import json

import pywikibot
import pywikibot.specialbots

import jogobot

from config import Config
from descpage import DescPageBot


class EuroExchangeBotJob:
    """
    Used for EuroExchangeBot job queue

    Holds the keyword arguments 'image', 'script' and 'freq' of one job
    as attributes of the same name.
    """

    def __init__(self, **kwargs):
        """
        @param kwargs: Must contain the keys 'image', 'script' and 'freq'
        @type kwargs: dict

        @raises KeyError: If one of the required keys is missing
        """
        self.image = kwargs['image']
        self.script = kwargs['script']
        self.freq = kwargs['freq']

        # On beta prepend string TEST_ to filename to prevent overwriting of
        # original commons files
        if pywikibot.Site().family.name == "wpbeta":
            self.image = "TEST_{}".format(self.image)


class EuroExchangeBot(pywikibot.bot.BaseBot):
    """
    Bot which refreshes the input data, regenerates the image of each job
    via gnuplot and uploads it if it differs from the online version.
    """

    def __init__(self, genFactory, **kwargs):
        """
        @param genFactory: Not used by this bot, accepted because main()
                           passes it through jogobot.bot.init_bot
        @param kwargs: Options handed to DescPageBot and to the base bot
        @type kwargs: dict
        """
        # Init working directory
        self.init_wdir()

        # Prepare DescPage editing bot
        self.descpagebot = DescPageBot(**kwargs)

        super().__init__(**kwargs)

    def run(self):
        """
        Update the input data, then treat every job from the jobs file
        """
        # Make sure input data is uptodate
        self.update_data()

        # Load and treat jobs
        for job in self.load_jobs():
            self.treat_job(job)

    def init_wdir(self):
        """
        Make sure, the working directory exists

        Stores the normalized path of Config.working_dir in self.wdir.

        @raises OSError: If the path exists but is not a directory
        """
        # Normalize working dir
        self.wdir = os.path.realpath(Config.working_dir)

        if os.path.isdir(self.wdir):
            return

        if os.path.exists(self.wdir):
            raise OSError(
                ("Working directory at {} already exists, "
                 "but is no directory").format(self.wdir))

        os.makedirs(self.wdir)
        jogobot.output(
            "Create empty working directory at {}".format(self.wdir))

    def update_data(self):
        """
        Checks if zip file exists and make sure it is uptodate, and extract
        csv data if necessary
        """
        zip_path = os.path.join(self.wdir, Config.zip_file)

        # If file is outdated, remove data input files so they are fetched
        # and extracted again below
        if os.path.exists(zip_path) and not self.is_zip_uptodate():
            self.remove_input_files()

        # Download if there is no (more) zip file
        if not os.path.exists(zip_path):
            self.download_zip()

        # Extract csv data
        self.extract_csv()

    def is_zip_uptodate(self):
        """
        Timechecks whether zip file is the most recent version based on mdate

        @returns True if zip file is uptodate, otherwise false
        @rtype bool
        """
        # Get file modification datetime
        stat = os.stat(os.path.join(self.wdir, Config.zip_file))
        mdt = datetime.datetime.fromtimestamp(stat.st_mtime)

        # Current datetime
        cdt = datetime.datetime.now()

        # On weekends (weekday 5,6) an update is not sensible, so a larger
        # age is tolerated
        weekday = cdt.weekday()
        if weekday == 5:
            allowed_delta = 2
        elif weekday == 6:
            allowed_delta = 3
        else:
            allowed_delta = 1

        return (cdt - mdt) < datetime.timedelta(days=allowed_delta)

    def remove_input_files(self):
        """
        Deletes data input files (zip archive and extracted csv file)
        """
        input_files = (
            os.path.join(self.wdir, Config.zip_file),
            os.path.join(self.wdir, Config.csv_file),
        )

        for path in input_files:
            # A file which is already gone (e.g. csv never extracted) must
            # not abort the refresh
            try:
                os.remove(path)
            except FileNotFoundError:
                pass

    def download_zip(self):
        """
        Download the zipfile from EZB

        The modification time of the local file is set to the value of the
        Last-Modified http header, if the server provides a usable one.
        """
        zip_path = os.path.join(self.wdir, Config.zip_file)

        # Download the file and save it locally
        with urllib.request.urlopen(Config.data_source) as response, \
                open(zip_path, 'wb') as out_file:
            shutil.copyfileobj(response, out_file)

            # Original change date from http header (None if not sent)
            last_modified = response.info()["Last-Modified"]

        # We need to set the date after the file is closed, since writing
        # the new file stamps it with the current time
        try:
            mdate = email.utils.parsedate_to_datetime(last_modified)
        except (TypeError, ValueError):
            # Header missing or not parseable: keep the download time
            jogobot.output(
                "No valid Last-Modified header, keeping download time.",
                "WARNING")
        else:
            # os.utime takes (atime, mtime): access time is now,
            # modification time is the value from http header
            os.utime(
                zip_path,
                (datetime.datetime.now().timestamp(), mdate.timestamp()))

        # Log
        jogobot.output("New input file downloaded.")

    def extract_csv(self):
        """
        Extract csv file from zip archive

        Does nothing if the csv file already exists in the working directory.
        """
        if os.path.exists(os.path.join(self.wdir, Config.csv_file)):
            return

        with zipfile.ZipFile(
                os.path.join(self.wdir, Config.zip_file)) as zipobj:
            zipobj.extract(os.path.basename(Config.csv_file), path=self.wdir)

    def load_jobs(self):
        """
        Load jobs from json file (jobs.json in Config.base_dir)

        @returns Generator of EuroExchangeBotJob objects
        @rtype generator
        """
        # Load json jobs file
        jobs_path = os.path.join(Config.base_dir, "jobs.json")
        with open(jobs_path, "r", encoding="utf-8") as fd:
            jobs_js = json.load(fd)

        # yield each job
        for job_args in jobs_js:
            yield EuroExchangeBotJob(**job_args)

    def treat_job(self, job):
        """
        Handles working on specific jobs

        @param job: Job to work on
        @type job: EuroExchangeBotJob
        """
        # Store reference to current job in Bot obj
        self.current_job = job

        # Log job
        jogobot.output("Work on Job {}".format(job.image))

        # Get file page
        self.current_job.filepage = pywikibot.page.FilePage(
            pywikibot.Site(), job.image)

        # Skip if file not yet exists
        if not self.current_job.filepage.exists():
            jogobot.output(
                "Image {} does not exists on wiki, job skipped!".format(
                    self.current_job.image),
                "WARNING")
            return

        # Check if update is necessary
        if self.image_update_needed():
            try:
                self.call_gnuplot(job)

                if self.file_changed():
                    self.upload_file(job)
                else:
                    jogobot.output("No upload needed for Job {}.".format(
                        self.current_job.image))
            except subprocess.CalledProcessError as e:
                # A failing plot must not stop the remaining work
                jogobot.output(
                    "Subprocess terminated with exit code {}!".format(
                        e.returncode),
                    "ERROR")
        # Nothing to do
        else:
            jogobot.output("No update needed for Job {}".format(
                self.current_job.image))

        # Update file description page
        self.descpagebot.treat_job(self.current_job)

    def image_update_needed(self):
        """
        Checks whether image update interval is reached.

        @returns True if update needed
        @rtype bool
        """
        # Get datetime of last update
        last_update = self.current_job.filepage.latest_file_info.timestamp

        # Get current time
        now = pywikibot.Site().getcurrenttime()

        # Calculate allowed delta (with tolerance of two hours)
        delta = datetime.timedelta(days=self.current_job.freq, hours=-2)

        return now >= last_update + delta

    def call_gnuplot(self, job):
        """
        Run gnuplot with the script of the job inside the working directory

        Input and output file names are handed over via the environment
        variables INFILE and OUTFILE.

        @param job: Job to work on
        @type job: EuroExchangeBotJob

        @raises subprocess.CalledProcessError: On non-zero exit code
        """
        script_path = os.path.realpath(
            os.path.join(Config.gnuplot_script_dir, job.script + ".plt"))

        # Only the configured command is split shell-like; the script path
        # stays one argument even if it contains whitespace or quotes
        cmd = shlex.split(Config.gnuplot) + [script_path]

        plt_env = os.environ.copy()
        plt_env["INFILE"] = Config.csv_file
        plt_env["OUTFILE"] = job.image

        subprocess.check_call(cmd, cwd=self.wdir, env=plt_env)

    def file_changed(self):
        """
        Checks if generated file and online file differs via sha1 hash

        @returns True if file was changed
        @rtype bool
        """
        # Get online file sha1 hash
        online_sha1 = self.current_job.filepage.latest_file_info.sha1

        # Get local file sha1 hash
        local_path = os.path.join(self.wdir, self.current_job.image)
        with open(local_path, 'rb') as fd:
            local_sha1 = hashlib.sha1(fd.read()).hexdigest()

        return online_sha1 != local_sha1

    def upload_file(self, job):
        """
        Upload the generated image of the job via UploadRobot

        @param job: Job to work on
        @type job: EuroExchangeBotJob
        """
        comment = Config.upload_comment
        filename = job.image
        filepath = [os.path.join(self.wdir, job.image)]

        # Set to True to skip double-checking/editing destination filename
        keepFilename = True
        # Set to False to skip double-checking/editing description
        # => change to bot-mode
        verifyDescription = True
        # Warnings to skip: upload even if an existing file would be
        # overwritten or the content is a duplicate
        ignoreWarning = ["exists", "duplicate"]
        targetSite = pywikibot.Site()

        always = self.getOption("always")
        aborts = True if always else list()

        bot = pywikibot.specialbots.UploadRobot(
            filepath,
            description=comment,
            useFilename=filename,
            keepFilename=keepFilename,
            verifyDescription=verifyDescription,
            ignoreWarning=ignoreWarning,
            targetSite=targetSite,
            always=always,
            aborts=aborts,
        )
        bot.run()


def main(*args):
    """
    Process command line arguments and invoke bot.

    If args is an empty list, sys.argv is used.

    @param args: command line arguments
    @type args: list of unicode
    """
    # Make sure locale is set to 'de_DE.UTF-8' to prevent problems
    # with wrong month abbreviations in strptime
    locale.setlocale(locale.LC_ALL, 'de_DE.UTF-8')

    # Process global arguments to determine desired site
    local_args = pywikibot.handle_args(args)

    # Get the jogobot-task_slug (basename of current file without ending)
    task_slug = os.path.basename(__file__)[:-len(".py")]

    # The check whether the task is currently active (jogobot.bot.active)
    # is intentionally skipped, since we only run semi-automatically

    # Parse local Args to get information about subtask
    (subtask, genFactory, subtask_args) = jogobot.bot.parse_local_args(
        local_args, None)

    # Init Bot
    bot = jogobot.bot.init_bot(
        task_slug, None, EuroExchangeBot, genFactory, **subtask_args)

    # Run bot
    jogobot.bot.run_bot(task_slug, None, bot)


if __name__ == "__main__":
    main()