#!/usr/bin/python
# -*- coding: utf-8 -*-
#  This program is free software; you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation; either version 2 of the License, or
#  (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with this program; if not, write to the Free Software
#  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
#  MA 02110-1301, USA.
#

import os
import locale
import urllib.request
import shutil
import datetime
import zipfile
import shlex
import subprocess
import email.utils
import hashlib
import json

import pywikibot

import jogobot


class EuroExangeBotJob():
    """
    Used for EuroExangeBot job queue

    Plain container for one job; the values are taken from the keyword
    arguments 'image', 'script' and 'freq' (all three are required).
    """

    def __init__(self, **kwargs):
        """
        @param kwargs: Job description, must contain 'image', 'script'
                       and 'freq'
        @raises KeyError: If one of the required keys is missing
        """
        self.image = kwargs['image']
        self.script = kwargs['script']
        self.freq = kwargs['freq']


class EuroExangeBot(pywikibot.bot.BaseBot):
    """
    Bot which downloads the exchange rate history zip file, lets gnuplot
    render one image per job and uploads images which have changed.
    """

    # Directory layout, relative to the location of this file
    base_dir = os.path.dirname(os.path.realpath(__file__)) + "/.."
    working_dir = os.path.dirname(os.path.realpath(__file__)) + "/../wdir"
    gnuplot_script_dir = os.path.dirname(os.path.realpath(__file__)) + \
        "/../gnuplot_scripts"

    # gnuplot command (may contain additional, shell-quoted arguments)
    gnuplot = "/usr/bin/gnuplot"

    # Input data: remote source and local file names inside working dir
    data_source = "http://www.ecb.int/stats/eurofxref/eurofxref-hist.zip"
    zip_file = "eurofxref-hist.zip"
    csv_file = "eurofxref-hist.csv"

    upload_comment = "Bot: ([[User:Jogobot/Euroexange|euroexange]]) update chart"

    # If True, image_update_needed() always reports that an update is
    # needed and the job frequency is not evaluated
    force_update = True

    def __init__(self, genFactory, **kwargs):
        """
        @param genFactory: Not used by this class
        @param kwargs: Keyword arguments handed over to the parent class
        """
        # Init working directory
        self.init_wdir()

        # Keyword arguments must be unpacked with '**'; '*' would hand the
        # dict keys over as positional arguments
        super().__init__(**kwargs)

    def run(self):
        """
        Updates the input data and treats every job from the jobs file
        """
        # Make sure input data is uptodate
        self.update_data()

        # Load and treat jobs
        for job in self.load_jobs():
            self.treat_job(job)

    def init_wdir(self):
        """
        Make sure, the working directory exists

        Stores the normalized path as self.wdir.

        @raises OSError: If the path exists but is no directory
        """
        # Normalize working dir
        self.wdir = os.path.realpath(type(self).working_dir)

        if os.path.isdir(self.wdir):
            return

        if os.path.exists(self.wdir):
            raise OSError(
                ("Working directory at {} already exists," +
                 "but is no directory").format(self.wdir))

        os.makedirs(self.wdir)
        jogobot.output("Create empty working directory at {}".format(
            self.wdir))

    def update_data(self):
        """
        Checks if zip file exists and make sure it is uptodate, and extract
        csv data if neccessary
        """
        zip_path = os.path.join(self.wdir, type(self).zip_file)

        # A missing or outdated zip file is replaced by a fresh download.
        # The csv file is removed as well, since extract_csv() keeps an
        # already existing csv file untouched.
        if not os.path.exists(zip_path) or not self.is_zip_uptodate():
            self.remove_input_files()
            self.download_zip()

        # Extract csv data
        self.extract_csv()

    def is_zip_uptodate(self):
        """
        Timechecks weather zip file is the most recent version based on mdate

        @returns True if zip file is uptodate, otherwise false
        @rtype bool
        """
        # Get file modification datetime
        stat = os.stat(os.path.join(self.wdir, type(self).zip_file))
        mdt = datetime.datetime.fromtimestamp(stat.st_mtime)

        # Current datetime
        cdt = datetime.datetime.now()

        # On weekends (weekday 5,6) update not sensefull, therefore a
        # bigger age is accepted on those days
        allowed_delta = {5: 2, 6: 3}.get(cdt.weekday(), 1)

        return (cdt - mdt) < datetime.timedelta(days=allowed_delta)

    def remove_input_files(self):
        """
        Deletes data input files

        Files which do not exist are skipped silently.
        """
        input_files = (
            os.path.join(self.wdir, type(self).zip_file),
            os.path.join(self.wdir, type(self).csv_file))

        for f in input_files:
            try:
                os.remove(f)
            except FileNotFoundError:
                # Nothing to delete, e.g. csv file was never extracted
                pass

    def download_zip(self):
        """
        Download the zipfile from EZB

        The data is written to a temporary file first and moved to its
        final name afterwards, so an interrupted download does not leave
        a truncated zip file behind.
        """
        zip_path = os.path.join(self.wdir, type(self).zip_file)
        tmp_path = zip_path + ".part"

        # Download the file and save it locally
        try:
            with urllib.request.urlopen(type(self).data_source) as response, \
                    open(tmp_path, 'wb') as out_file:
                shutil.copyfileobj(response, out_file)

                # Extract original change date from http header
                # We need to set it later, since we write a new file
                last_modified = response.info()["Last-Modified"]

            os.replace(tmp_path, zip_path)
        finally:
            # Only present if download or move failed
            if os.path.exists(tmp_path):
                os.remove(tmp_path)

        # Header may be missing (None) or malformed
        try:
            mdate = email.utils.parsedate_to_datetime(last_modified)
        except (TypeError, ValueError):
            mdate = None

        # Set mtime to value from http header, is_zip_uptodate() relies on
        # it. Without usable header the mtime of the download is kept.
        if mdate is not None:
            os.utime(zip_path,
                     (datetime.datetime.now().timestamp(), mdate.timestamp()))

        # Log
        jogobot.output("New input file downloaded.")

    def extract_csv(self):
        """
        Extract csv file from zip archive

        Does nothing if the csv file already exists in working directory.
        """
        csv_name = type(self).csv_file

        if os.path.exists(os.path.join(self.wdir, csv_name)):
            return

        with zipfile.ZipFile(
                os.path.join(self.wdir, type(self).zip_file)) as zipobj:
            zipobj.extract(os.path.basename(csv_name), path=self.wdir)

    def load_jobs(self):
        """
        Load jobs from json file

        Reads "jobs.json" in base_dir; each entry is handed over as keyword
        arguments to EuroExangeBotJob.

        @returns Generator of EuroExangeBotJob objects
        @rtype generator
        """
        # Load json jobs file
        with open(os.path.join(self.base_dir, "jobs.json"), "r",
                  encoding="utf-8") as fd:
            jobs_js = json.load(fd)

        # yield each job
        for job_args in jobs_js:
            yield EuroExangeBotJob(**job_args)

    def treat_job(self, job):
        """
        Handles working on specific jobs

        @param job: Job to work on
        @type job: EuroExangeBotJob

        @raises pywikibot.NoPage: If the file page of the job does not exist
        """
        # Store reference to current job in Bot obj
        self.current_job = job

        # Log job
        jogobot.output("Work on Job {}".format(job.image))

        # Get file page
        job.filepage = pywikibot.page.FilePage(pywikibot.Site(), job.image)

        # Stop if file not jet exists
        if not job.filepage.exists():
            jogobot.output(
                "File for Job {} does not exist".format(job.image), "ERROR")
            raise pywikibot.NoPage(job.filepage)

        # Nothing to do
        if not self.image_update_needed():
            jogobot.output("No update needed for Job {}".format(job.image))
            return

        # Do not compare or upload anything if plotting failed, the local
        # file might be missing, outdated or incomplete
        if self.call_gnuplot(job) != 0:
            return

        if self.file_changed():
            self.upload_file(job)
        else:
            jogobot.output("No upload needed for Job {}.".format(job.image))

    def image_update_needed(self):
        """
        Checks weather image update intervall is reached.

        While class attribute force_update is True (default), the check is
        skipped and an update is always reported as needed.

        @returns True if update needed
        @rtype bool
        """
        if type(self).force_update:
            return True

        # Get datetime of last update
        last_update = self.current_job.filepage.latest_file_info.timestamp

        # Get current time
        now = pywikibot.Site().getcurrenttime()

        # Calculate allowed delta (with tolerance)
        delta = datetime.timedelta(days=self.current_job.freq, hours=-2)

        return now >= last_update + delta

    def call_gnuplot(self, job):
        """
        Runs gnuplot with the script of the job inside working directory

        Input and output file names are handed over to the script via the
        environment variables INFILE and OUTFILE.

        @param job: Job to work on
        @type job: EuroExangeBotJob

        @returns Exit status of gnuplot
        @rtype int
        """
        script_path = os.path.realpath(
            os.path.join(type(self).gnuplot_script_dir, job.script + ".plt"))

        # Only the configured command is split, the script path is appended
        # as one argument so whitespace in it does no harm
        cmd = shlex.split(type(self).gnuplot) + [script_path]

        plt_env = os.environ.copy()
        plt_env["INFILE"] = type(self).csv_file
        plt_env["OUTFILE"] = job.image

        status = subprocess.call(cmd, cwd=self.wdir, env=plt_env)

        if status != 0:
            jogobot.output(
                "gnuplot exited with status {} for Job {}".format(
                    status, job.image), "ERROR")

        return status

    def file_changed(self):
        """
        Checks if generated file and online file differs via sha1 hash

        @returns True if file was changed
        @rtype bool
        """
        # Get online file sha1 hash
        online_sha1 = self.current_job.filepage.latest_file_info.sha1

        # Get local file sha1 hash
        with open(os.path.join(self.wdir, self.current_job.image),
                  'rb') as fd:
            local_sha1 = hashlib.sha1(fd.read()).hexdigest()

        return online_sha1 != local_sha1

    def upload_file(self, job):
        """
        Uploads the generated image of the job

        @param job: Job to work on
        @type job: EuroExangeBotJob
        """
        comment = type(self).upload_comment
        filename = job.image
        filepath = os.path.join(self.wdir, job.image)

        # set to True to skip double-checking/editing destination filename
        keepFilename = True
        # set to False to skip double-checking/editing description
        # => change to bot-mode
        verifyDescription = True
        # set to True to skip warnings, Upload even if another file would
        # be overwritten or another mistake would be risked
        ignoreWarning = False

        targetSite = pywikibot.Site()

        bot = pywikibot.specialbots.UploadRobot(
            filepath, description=comment, useFilename=filename,
            keepFilename=keepFilename, verifyDescription=verifyDescription,
            ignoreWarning=ignoreWarning, targetSite=targetSite)

        bot.upload_image(debug=True)

def main(*args):
    """
    Process command line arguments and invoke bot.

    If args is an empty list, sys.argv is used.

    @param args: command line arguments
    @type args: list of unicode
    """
    # strptime may trip over wrong month abbreviations, therefore the
    # locale is pinned to 'de_DE.UTF-8'
    locale.setlocale(locale.LC_ALL, 'de_DE.UTF-8')

    # Global arguments are consumed by pywikibot (determines desired site),
    # the remaining ones are local to this script
    local_args = pywikibot.handle_args(args)

    # The jogobot task slug is the basename of this file without ending
    script_name = os.path.basename(__file__)
    task_slug = script_name[:-len(".py")]

    # No check wether the task is currently active is done here, since
    # the bot only runs semi-automatically

    # Parse local args to get information about subtask, only the
    # generator factory is needed afterwards
    parsed_args = jogobot.bot.parse_local_args(local_args, None)
    _subtask, genFactory, _subtask_args = parsed_args

    # Init and run bot
    bot = jogobot.bot.init_bot(task_slug, None, EuroExangeBot, genFactory)
    jogobot.bot.run_bot(task_slug, None, bot)


if __name__ == "__main__":
    main()