You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

271 lines
7.8 KiB

#!/usr/bin/python
# -*- coding: utf-8 -*-
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.
#
import os
import locale
import urllib.request
import shutil
import datetime
import zipfile
import shlex
import subprocess
import email.utils
import pywikibot
import jogobot
class EuroExangeBotJob():
"""
Used for EuroExangeBot job queue
"""
def __init__( self, **kwargs ):
self.image = kwargs['image']
self.script = kwargs['script']
class EuroExangeBot( pywikibot.bot.BaseBot ):
working_dir = os.path.dirname(os.path.realpath(__file__)) + "/../wdir"
gnuplot_script_dir = os.path.dirname(os.path.realpath(__file__)) + \
"/../gnuplot_scripts"
gnuplot = "/usr/bin/gnuplot"
data_source = "http://www.ecb.int/stats/eurofxref/eurofxref-hist.zip"
zip_file = "eurofxref-hist.zip"
csv_file = "eurofxref-hist.csv"
jobs = [ EuroExangeBotJob( image="TEST_Euro_exchange_rate_to_TRY_-_Turkish_Currency_and_Debt_Crisis_2018.svg", script="Euro_exchange_rate_to_TRY_-_Turkish_Currency_and_Debt_Crisis_2018" ) ]
def __init__( self, genFactory, **kwargs ):
# Init working directory
self.init_wdir()
super().__init__(*kwargs)
def run(self):
# Make sure input data is uptodate
self.update_data()
for job in type(self).jobs:
self.treat_job(job)
pass
def init_wdir(self):
"""
Make sure, the working directory exists
"""
#Normalize working dir
self.wdir = os.path.realpath(type(self).working_dir)
if os.path.exists(self.wdir):
if os.path.isdir(self.wdir):
return
else:
raise OSError( ("Working directory at {} already exists," +\
"but is no directory").format(
self.wdir))
else:
os.makedirs( self.wdir )
jogobot.output( "Create empty working directory at {}".format(
self.wdir))
def update_data(self):
"""
Checks if zip file exists and make sure it is uptodate, and extract
csv data if neccessary
"""
# Check if zip file exists
if os.path.exists( os.path.join(self.wdir, type(self).zip_file) ):
# If file is outdated, remove data input files
if not self.is_zip_uptodate():
self.remove_input_files()
# Recall method to get new file
self.update_data()
# Otherwise download
else:
self.download_zip()
# Extract csv data
self.extract_csv()
def is_zip_uptodate(self):
"""
Timechecks weather zip file is the most recent version based on mdate
@returns True if zip file is uptodate, otherwise false
@rtype bool
"""
# Get file stat
stat = os.stat( os.path.join(self.wdir, type(self).zip_file) )
# Get file modification datetime
mdt = datetime.datetime.fromtimestamp( stat.st_mtime )
# Current datetime
cdt = datetime.datetime.now()
# On weekends (weekday 5,6) update not sensefull
if cdt.weekday() == 5:
allowed_delta = 2
elif cdt.weekday() == 6:
allowed_delta = 3
else:
allowed_delta = 1
# If file is outdated, remove and recall method
if (cdt - mdt) >= datetime.timedelta(days=allowed_delta):
return False
return True
def remove_input_files(self):
"""
Deletes data input files
"""
input_files = ( os.path.join(self.wdir, type(self).zip_file),
os.path.join(self.wdir, type(self).csv_file) )
for f in input_files:
os.remove( f )
def download_zip( self ):
"""
Download the zipfile from EZB
"""
# Download the file and save it locally
with urllib.request.urlopen(type(self).data_source) as response,\
open( os.path.join(self.wdir,
type(self).zip_file), 'wb') as out_file:
shutil.copyfileobj(response, out_file)
# Extract original change date from http header
# We need to set it later, since we write a new file
mdate = email.utils.parsedate_to_datetime(
response.info()["Last-Modified"])
# Set ctime to value from http header
os.utime( os.path.join(self.wdir, type(self).zip_file),
(datetime.datetime.now().timestamp(), mdate.timestamp()) )
def extract_csv( self ):
"""
Extract csv file from zip archive
"""
if not os.path.exists( os.path.join(self.wdir, type(self).csv_file) ):
with zipfile.ZipFile(
os.path.join(self.wdir, type(self).zip_file)) as zipobj:
zipobj.extract(
os.path.basename(
os.path.join(self.wdir, type(self).csv_file)),
path=self.wdir )
def treat_job( self, job ):
"""
Handles working on specific jobs
@param job: Job to work on
@type job: EuroExangeBotJob
"""
# Log job
jogobot.output( "Work on Job {}".format(job.image) )
# Get file page
filepage = pywikibot.page.FilePage(pywikibot.Site(), job.image)
# Stop if file not jet exists
if not filepage.exists():
jogobot.output( "Work on Job {}".format(job.image), "ERROR" )
raise pywikibot.NoPage( filepage )
self.call_gnuplot( job )
def call_gnuplot( self, job ):
"""
@param job: Job to work on
@type job: EuroExangeBotJob
"""
cmd = shlex.split ( type(self).gnuplot + " " + os.path.realpath(
os.path.join( type(self).gnuplot_script_dir,
job.script + ".plt" ) ) )
plt_env = os.environ.copy()
plt_env["INFILE"] = type(self).csv_file
plt_env["OUTFILE"] = job.image
subprocess.call( cmd, cwd=self.wdir, env=plt_env )
def main(*args):
"""
Process command line arguments and invoke bot.
If args is an empty list, sys.argv is used.
@param args: command line arguments
@type args: list of unicode
"""
# Make sure locale is set to 'de_DE.UTF-8' to prevent problems
# with wrong month abreviations in strptime
locale.setlocale(locale.LC_ALL, 'de_DE.UTF-8')
# Process global arguments to determine desired site
local_args = pywikibot.handle_args(args)
# Get the jogobot-task_slug (basename of current file without ending)
task_slug = os.path.basename(__file__)[:-len(".py")]
# Actually not needed since we only run semi-automaticall
# Before run, we need to check wether we are currently active or not
#~ if not jogobot.bot.active( task_slug ):
#~ return
# Parse local Args to get information about subtask
( subtask, genFactory, subtask_args ) = jogobot.bot.parse_local_args(
local_args, None )
# Init Bot
bot = jogobot.bot.init_bot( task_slug, None, EuroExangeBot, genFactory)
# Run bot
jogobot.bot.run_bot( task_slug, None, bot )
if( __name__ == "__main__" ):
main()