You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
308 lines
9.2 KiB
308 lines
9.2 KiB
#!/usr/bin/python
|
|
# -*- coding: utf-8 -*-
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program; if not, write to the Free Software
|
|
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
# MA 02110-1301, USA.
|
|
#
|
|
|
|
import os
|
|
import locale
|
|
import urllib.request
|
|
import shutil
|
|
import datetime
|
|
import zipfile
|
|
import shlex
|
|
import subprocess
|
|
import email.utils
|
|
|
|
import pywikibot
|
|
|
|
import jogobot
|
|
|
|
|
|
class EuroExangeBotJob():
|
|
"""
|
|
Used for EuroExangeBot job queue
|
|
"""
|
|
|
|
def __init__( self, **kwargs ):
|
|
|
|
self.image = kwargs['image']
|
|
self.script = kwargs['script']
|
|
|
|
|
|
class EuroExangeBot( pywikibot.bot.BaseBot ):
|
|
|
|
working_dir = os.path.dirname(os.path.realpath(__file__)) + "/../wdir"
|
|
gnuplot_script_dir = os.path.dirname(os.path.realpath(__file__)) + \
|
|
"/../gnuplot_scripts"
|
|
gnuplot = "/usr/bin/gnuplot"
|
|
data_source = "http://www.ecb.int/stats/eurofxref/eurofxref-hist.zip"
|
|
zip_file = "eurofxref-hist.zip"
|
|
csv_file = "eurofxref-hist.csv"
|
|
upload_comment = "Bot: ([[User:Jogobot/Euroexange|euroexange]]) update chart"
|
|
|
|
jobs = [ EuroExangeBotJob( image="TEST_Euro_exchange_rate_to_TRY_-_Turkish_Currency_and_Debt_Crisis_2018.svg", script="Euro_exchange_rate_to_TRY_-_Turkish_Currency_and_Debt_Crisis_2018" ) ]
|
|
|
|
def __init__( self, genFactory, **kwargs ):
|
|
|
|
# Init working directory
|
|
self.init_wdir()
|
|
|
|
super().__init__(*kwargs)
|
|
|
|
def run(self):
|
|
|
|
# Make sure input data is uptodate
|
|
self.update_data()
|
|
|
|
for job in type(self).jobs:
|
|
self.treat_job(job)
|
|
|
|
pass
|
|
|
|
def init_wdir(self):
|
|
"""
|
|
Make sure, the working directory exists
|
|
"""
|
|
|
|
#Normalize working dir
|
|
self.wdir = os.path.realpath(type(self).working_dir)
|
|
|
|
if os.path.exists(self.wdir):
|
|
|
|
if os.path.isdir(self.wdir):
|
|
return
|
|
else:
|
|
raise OSError( ("Working directory at {} already exists," +\
|
|
"but is no directory").format(
|
|
self.wdir))
|
|
|
|
else:
|
|
os.makedirs( self.wdir )
|
|
jogobot.output( "Create empty working directory at {}".format(
|
|
self.wdir))
|
|
|
|
def update_data(self):
|
|
"""
|
|
Checks if zip file exists and make sure it is uptodate, and extract
|
|
csv data if neccessary
|
|
"""
|
|
|
|
# Check if zip file exists
|
|
if os.path.exists( os.path.join(self.wdir, type(self).zip_file) ):
|
|
|
|
# If file is outdated, remove data input files
|
|
if not self.is_zip_uptodate():
|
|
self.remove_input_files()
|
|
|
|
# Recall method to get new file
|
|
self.update_data()
|
|
|
|
# Otherwise download
|
|
else:
|
|
self.download_zip()
|
|
|
|
# Extract csv data
|
|
self.extract_csv()
|
|
|
|
def is_zip_uptodate(self):
|
|
"""
|
|
Timechecks weather zip file is the most recent version based on mdate
|
|
|
|
@returns True if zip file is uptodate, otherwise false
|
|
@rtype bool
|
|
"""
|
|
# Get file stat
|
|
stat = os.stat( os.path.join(self.wdir, type(self).zip_file) )
|
|
|
|
# Get file modification datetime
|
|
mdt = datetime.datetime.fromtimestamp( stat.st_mtime )
|
|
# Current datetime
|
|
cdt = datetime.datetime.now()
|
|
|
|
# On weekends (weekday 5,6) update not sensefull
|
|
if cdt.weekday() == 5:
|
|
allowed_delta = 2
|
|
elif cdt.weekday() == 6:
|
|
allowed_delta = 3
|
|
else:
|
|
allowed_delta = 1
|
|
|
|
# If file is outdated, remove and recall method
|
|
if (cdt - mdt) >= datetime.timedelta(days=allowed_delta):
|
|
return False
|
|
|
|
return True
|
|
|
|
def remove_input_files(self):
|
|
"""
|
|
Deletes data input files
|
|
"""
|
|
|
|
input_files = ( os.path.join(self.wdir, type(self).zip_file),
|
|
os.path.join(self.wdir, type(self).csv_file) )
|
|
|
|
for f in input_files:
|
|
os.remove( f )
|
|
|
|
|
|
def download_zip( self ):
|
|
"""
|
|
Download the zipfile from EZB
|
|
"""
|
|
# Download the file and save it locally
|
|
with urllib.request.urlopen(type(self).data_source) as response,\
|
|
open( os.path.join(self.wdir,
|
|
type(self).zip_file), 'wb') as out_file:
|
|
|
|
shutil.copyfileobj(response, out_file)
|
|
|
|
# Extract original change date from http header
|
|
# We need to set it later, since we write a new file
|
|
mdate = email.utils.parsedate_to_datetime(
|
|
response.info()["Last-Modified"])
|
|
|
|
# Set ctime to value from http header
|
|
os.utime( os.path.join(self.wdir, type(self).zip_file),
|
|
(datetime.datetime.now().timestamp(), mdate.timestamp()) )
|
|
|
|
# Log
|
|
jogobot.output( "New input file downloaded." )
|
|
|
|
def extract_csv( self ):
|
|
"""
|
|
Extract csv file from zip archive
|
|
"""
|
|
if not os.path.exists( os.path.join(self.wdir, type(self).csv_file) ):
|
|
|
|
with zipfile.ZipFile(
|
|
os.path.join(self.wdir, type(self).zip_file)) as zipobj:
|
|
|
|
zipobj.extract(
|
|
os.path.basename(
|
|
os.path.join(self.wdir, type(self).csv_file)),
|
|
path=self.wdir )
|
|
|
|
def treat_job( self, job ):
|
|
"""
|
|
Handles working on specific jobs
|
|
|
|
@param job: Job to work on
|
|
@type job: EuroExangeBotJob
|
|
"""
|
|
|
|
# Store reference to current job in Bot obj
|
|
self.current_job = job
|
|
|
|
# Log job
|
|
jogobot.output( "Work on Job {}".format(job.image) )
|
|
|
|
# Get file page
|
|
self.current_job.filepage = pywikibot.page.FilePage(
|
|
pywikibot.Site(), job.image)
|
|
|
|
# Stop if file not jet exists
|
|
if not self.current_job.filepage.exists():
|
|
jogobot.output( "Work on Job {}".format( self.current_job.image),
|
|
"ERROR" )
|
|
raise pywikibot.NoPage( filepage )
|
|
|
|
self.call_gnuplot( job )
|
|
|
|
self.upload_file( job )
|
|
|
|
def call_gnuplot( self, job ):
|
|
"""
|
|
|
|
@param job: Job to work on
|
|
@type job: EuroExangeBotJob
|
|
"""
|
|
|
|
cmd = shlex.split ( type(self).gnuplot + " " + os.path.realpath(
|
|
os.path.join( type(self).gnuplot_script_dir,
|
|
job.script + ".plt" ) ) )
|
|
|
|
plt_env = os.environ.copy()
|
|
plt_env["INFILE"] = type(self).csv_file
|
|
plt_env["OUTFILE"] = job.image
|
|
|
|
subprocess.call( cmd, cwd=self.wdir, env=plt_env )
|
|
|
|
def upload_file( self, job ):
|
|
"""
|
|
|
|
@param job: Job to work on
|
|
@type job: EuroExangeBotJob
|
|
"""
|
|
|
|
comment = type(self).upload_comment
|
|
|
|
filename = job.image
|
|
filepath = os.path.join(self.wdir, job.image)
|
|
keepFilename=True #set to True to skip double-checking/editing destination filename
|
|
verifyDescription=True #set to False to skip double-checking/editing description => change to bot-mode
|
|
ignoreWarning=False #set to True to skip warnings, Upload even if another file would be overwritten or another mistake would be risked
|
|
targetSite = pywikibot.Site()
|
|
|
|
bot = pywikibot.specialbots.UploadRobot(
|
|
filepath,
|
|
description=comment,
|
|
useFilename=filename,
|
|
keepFilename=keepFilename,
|
|
verifyDescription=verifyDescription,
|
|
ignoreWarning=ignoreWarning,
|
|
targetSite = targetSite
|
|
)
|
|
|
|
bot.upload_image(debug=True)
|
|
|
|
def main(*args):
|
|
"""
|
|
Process command line arguments and invoke bot.
|
|
|
|
If args is an empty list, sys.argv is used.
|
|
|
|
@param args: command line arguments
|
|
@type args: list of unicode
|
|
"""
|
|
|
|
# Make sure locale is set to 'de_DE.UTF-8' to prevent problems
|
|
# with wrong month abreviations in strptime
|
|
locale.setlocale(locale.LC_ALL, 'de_DE.UTF-8')
|
|
|
|
# Process global arguments to determine desired site
|
|
local_args = pywikibot.handle_args(args)
|
|
|
|
# Get the jogobot-task_slug (basename of current file without ending)
|
|
task_slug = os.path.basename(__file__)[:-len(".py")]
|
|
|
|
# Actually not needed since we only run semi-automaticall
|
|
# Before run, we need to check wether we are currently active or not
|
|
#~ if not jogobot.bot.active( task_slug ):
|
|
#~ return
|
|
|
|
# Parse local Args to get information about subtask
|
|
( subtask, genFactory, subtask_args ) = jogobot.bot.parse_local_args(
|
|
local_args, None )
|
|
|
|
# Init Bot
|
|
bot = jogobot.bot.init_bot( task_slug, None, EuroExangeBot, genFactory)
|
|
|
|
# Run bot
|
|
jogobot.bot.run_bot( task_slug, None, bot )
|
|
|
|
|
|
if( __name__ == "__main__" ):
|
|
main()
|
|
|