You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

390 lines
11 KiB

#!/usr/bin/python
# -*- coding: utf-8 -*-
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.
#
import os
import locale
import urllib.request
import shutil
import datetime
import zipfile
import shlex
import subprocess
import email.utils
import hashlib
import json
import pywikibot
import pywikibot.specialbots
import jogobot
class EuroExangeBotJob():
    """
    A single entry of the EuroExangeBot job queue.

    Holds the target image name, the gnuplot script name and the update
    frequency of one chart.
    """

    def __init__(self, **kwargs):
        """
        @param kwargs: Job description; the keys 'image', 'script' and
                       'freq' are required, a missing one raises KeyError
        @type kwargs: dict
        """
        image, script, freq = (
            kwargs['image'], kwargs['script'], kwargs['freq'])

        self.image = image
        self.script = script
        self.freq = freq

        # On the beta wiki the filename gets the prefix TEST_ so that the
        # original commons files are never overwritten
        on_beta = pywikibot.Site().family.name == "wpbeta"
        if on_beta:
            self.image = "TEST_{}".format(self.image)
class EuroExangeBot(pywikibot.bot.BaseBot):
    """
    Bot which redraws euro exchange rate charts and uploads changed ones.

    On each run the bot makes sure the input data (a zip archive containing
    a csv file) in the working directory is recent, then processes every
    job from jobs.json: it renders the chart with gnuplot and uploads the
    result if its sha1 hash differs from the file currently on the wiki.
    """

    # Parent directory of the directory containing this script; jobs.json
    # is read from here
    base_dir = os.path.dirname(os.path.realpath(__file__)) + "/.."

    # Directory for downloaded input data and generated images
    working_dir = base_dir + "/wdir"

    # Directory containing the gnuplot scripts (<job.script>.plt)
    gnuplot_script_dir = base_dir + "/gnuplot_scripts"

    # gnuplot command; split with shell-like syntax, so it may carry options
    gnuplot = "/usr/bin/gnuplot"

    # Download url and local file names of the input data
    data_source = "http://www.ecb.int/stats/eurofxref/eurofxref-hist.zip"
    zip_file = "eurofxref-hist.zip"
    csv_file = "eurofxref-hist.csv"

    upload_comment = \
        "Bot: ([[User:JogoBot/Euroexange|euroexange]]) update chart"

    # While True, image_update_needed() reports every job as due without
    # looking at the job's update interval (this keeps the behaviour of the
    # former unconditional 'return True'). Set to False to honour job.freq.
    ignore_update_interval = True

    def __init__(self, genFactory, **kwargs):
        """
        @param genFactory: Not used by this bot; accepted because the
                           caller passes it positionally
        @param kwargs: Options handed over to pywikibot.bot.BaseBot
        @type kwargs: dict
        """
        # Init working directory
        self.init_wdir()

        # Options must be passed as keywords; unpacking with a single star
        # would hand the option *names* over as positional arguments
        super().__init__(**kwargs)

    def run(self):
        """
        Update the input data, then work on each job from the job file
        """
        # Make sure input data is uptodate
        self.update_data()

        # Load and treat jobs
        for job in self.load_jobs():
            self.treat_job(job)

    def init_wdir(self):
        """
        Make sure the working directory exists

        Stores the normalized path in self.wdir.

        @raises OSError: If the path exists but is not a directory
        """
        # Normalize working dir
        self.wdir = os.path.realpath(type(self).working_dir)

        if os.path.isdir(self.wdir):
            return

        if os.path.exists(self.wdir):
            raise OSError(("Working directory at {} already exists," +
                           "but is no directory").format(self.wdir))

        os.makedirs(self.wdir)
        jogobot.output("Create empty working directory at {}".format(
            self.wdir))

    def update_data(self):
        """
        Make sure a recent zip file is present and the csv data is
        extracted from it
        """
        zip_path = os.path.join(self.wdir, type(self).zip_file)

        # A missing or outdated archive is replaced by a fresh download.
        # The csv file is removed along with it, otherwise extract_csv()
        # would keep a stale csv file belonging to an older archive.
        if not (os.path.exists(zip_path) and self.is_zip_uptodate()):
            self.remove_input_files()
            self.download_zip()

        # Extract csv data (no-op if the csv file already exists)
        self.extract_csv()

    def is_zip_uptodate(self):
        """
        Checks, based on the modification time, whether the zip file is
        recent enough

        @returns True if zip file is uptodate, otherwise False
        @rtype bool
        """
        # Get file modification datetime
        stat = os.stat(os.path.join(self.wdir, type(self).zip_file))
        mdt = datetime.datetime.fromtimestamp(stat.st_mtime)

        # Current datetime
        cdt = datetime.datetime.now()

        # On weekends (weekday 5 = Saturday, 6 = Sunday) a larger age is
        # tolerated, on all other days one day
        allowed_delta = {5: 2, 6: 3}.get(cdt.weekday(), 1)

        return (cdt - mdt) < datetime.timedelta(days=allowed_delta)

    def remove_input_files(self):
        """
        Deletes data input files (zip and csv) from the working directory

        Files which do not exist are skipped.
        """
        for name in (type(self).zip_file, type(self).csv_file):
            try:
                os.remove(os.path.join(self.wdir, name))
            except FileNotFoundError:
                # Nothing to delete, e.g. csv was never extracted
                pass

    def download_zip(self):
        """
        Download the zip file from data_source into the working directory

        The modification time of the local file is set to the value of the
        Last-Modified http header, if the server sent a parsable one.
        """
        zip_path = os.path.join(self.wdir, type(self).zip_file)

        # Download the file and save it locally
        with urllib.request.urlopen(type(self).data_source) as response, \
                open(zip_path, 'wb') as out_file:
            shutil.copyfileobj(response, out_file)

            # Original change date from http header (None if missing).
            # We need to set it later, since we write a new file
            last_modified = response.info()["Last-Modified"]

        try:
            mdate = email.utils.parsedate_to_datetime(last_modified)
        except (TypeError, ValueError):
            # Header missing or malformed: keep the download time as mtime
            jogobot.output(
                "No usable Last-Modified header for input file.", "WARNING")
        else:
            # Set mtime to value from http header (atime: now)
            os.utime(zip_path,
                     (datetime.datetime.now().timestamp(),
                      mdate.timestamp()))

        # Log
        jogobot.output("New input file downloaded.")

    def extract_csv(self):
        """
        Extract csv file from zip archive, unless it already exists
        """
        csv_path = os.path.join(self.wdir, type(self).csv_file)
        if os.path.exists(csv_path):
            return

        zip_path = os.path.join(self.wdir, type(self).zip_file)
        with zipfile.ZipFile(zip_path) as zipobj:
            zipobj.extract(os.path.basename(csv_path), path=self.wdir)

    def load_jobs(self):
        """
        Load jobs from json file (jobs.json in base_dir)

        @returns Generator of EuroExangeBotJob objects
        @rtype generator
        """
        # Load json jobs file
        jobs_path = os.path.join(self.base_dir, "jobs.json")
        with open(jobs_path, "r", encoding="utf-8") as fd:
            jobs_js = json.load(fd)

        # yield each job
        for job_args in jobs_js:
            yield EuroExangeBotJob(**job_args)

    def treat_job(self, job):
        """
        Handles working on specific jobs

        The job is skipped if its image does not exist on the wiki, if no
        update is due or if gnuplot reports an error. Otherwise the
        generated file is uploaded if it differs from the online version.

        @param job: Job to work on
        @type job: EuroExangeBotJob
        """
        # Store reference to current job in Bot obj
        self.current_job = job

        # Log job
        jogobot.output("Work on Job {}".format(job.image))

        # Get file page
        self.current_job.filepage = pywikibot.page.FilePage(
            pywikibot.Site(), job.image)

        # Skip if file not yet exists
        if not self.current_job.filepage.exists():
            jogobot.output("Image {} does not exists on wiki, job skipped!".
                           format(self.current_job.image), "WARNING")
            return

        # Check if update is necessary
        if not self.image_update_needed():
            jogobot.output("No update needed for Job {}".format(
                self.current_job.image))
            return

        # Do not compare or upload anything if rendering failed
        if self.call_gnuplot(job):
            jogobot.output("gnuplot failed for Job {}, job skipped!".format(
                self.current_job.image), "WARNING")
            return

        if self.file_changed():
            self.upload_file(job)
        else:
            jogobot.output("No upload needed for Job {}.".format(
                self.current_job.image))

    def image_update_needed(self):
        """
        Checks whether the image update interval of the current job is
        reached

        While the class attribute ignore_update_interval is True (the
        default) every job is reported as due.

        @returns True if update needed
        @rtype bool
        """
        if type(self).ignore_update_interval:
            return True

        # Get datetime of last update
        last_update = self.current_job.filepage.latest_file_info.timestamp

        # Get current time
        now = pywikibot.Site().getcurrenttime()

        # Calculate allowed delta (with a tolerance of two hours)
        delta = datetime.timedelta(days=self.current_job.freq, hours=-2)

        return now >= last_update + delta

    def call_gnuplot(self, job):
        """
        Run the gnuplot script of the job inside the working directory

        Input and output file names are passed to the script via the
        environment variables INFILE and OUTFILE.

        @param job: Job to work on
        @type job: EuroExangeBotJob
        @returns Exit status of gnuplot (0 on success)
        @rtype int
        """
        script_path = os.path.realpath(
            os.path.join(type(self).gnuplot_script_dir,
                         job.script + ".plt"))

        # Only the configured command is split into words; the script path
        # is appended as one argument so that spaces in it do no harm
        cmd = shlex.split(type(self).gnuplot) + [script_path]

        plt_env = os.environ.copy()
        plt_env["INFILE"] = type(self).csv_file
        plt_env["OUTFILE"] = job.image

        return subprocess.call(cmd, cwd=self.wdir, env=plt_env)

    def file_changed(self):
        """
        Checks if generated file and online file differ via sha1 hash

        @returns True if file was changed
        @rtype bool
        """
        # Get online file sha1 hash
        online_sha1 = self.current_job.filepage.latest_file_info.sha1

        # Get local file sha1 hash
        local_path = os.path.join(self.wdir, self.current_job.image)
        with open(local_path, 'rb') as fd:
            local_sha1 = hashlib.sha1(fd.read()).hexdigest()

        return online_sha1 != local_sha1

    def upload_file(self, job):
        """
        Upload the generated image of the job via pywikibot's UploadRobot

        @param job: Job to work on
        @type job: EuroExangeBotJob
        """
        # Set to True to skip double-checking/editing destination filename
        keep_filename = True

        # Set to False to skip double-checking/editing description
        # => change to bot-mode
        verify_description = True

        # Warnings which do not stop the upload: the file is expected to
        # exist already since an existing chart gets replaced
        ignore_warning = ["exists", "duplicate"]

        bot = pywikibot.specialbots.UploadRobot(
            [os.path.join(self.wdir, job.image)],
            description=type(self).upload_comment,
            useFilename=job.image,
            keepFilename=keep_filename,
            verifyDescription=verify_description,
            ignoreWarning=ignore_warning,
            targetSite=pywikibot.Site()
        )
        bot.run()
def main(*args):
    """
    Process command line arguments and invoke bot.

    If args is an empty list, sys.argv is used.

    @param args: command line arguments
    @type args: list of unicode
    """
    # The locale is fixed to 'de_DE.UTF-8' so that strptime does not
    # stumble over wrong month abbreviations
    locale.setlocale(locale.LC_ALL, 'de_DE.UTF-8')

    # Let pywikibot consume the global arguments (determines desired site)
    local_args = pywikibot.handle_args(args)

    # The jogobot task slug is the basename of this file without ending
    script_name = os.path.basename(__file__)
    task_slug = script_name[:-len(".py")]

    # Actually not needed since we only run semi-automatically
    # Before run, we need to check whether we are currently active or not
    #~ if not jogobot.bot.active( task_slug ):
    #~     return

    # Parse local args to get information about subtask; only the
    # generator factory is used afterwards
    (_subtask, genFactory, _subtask_args) = \
        jogobot.bot.parse_local_args(local_args, None)

    # Init bot and run it
    bot = jogobot.bot.init_bot(task_slug, None, EuroExangeBot, genFactory)
    jogobot.bot.run_bot(task_slug, None, bot)
# Invoke the bot only when this file is executed as a script
if __name__ == "__main__":
    main()