You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

395 lines
12 KiB

#!/usr/bin/python
# -*- coding: utf-8 -*-
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.
#
import os
import locale
import urllib.request
import shutil
import datetime
import zipfile
import shlex
import subprocess
import email.utils
import hashlib
import json
import pywikibot
import pywikibot.specialbots
import jogobot
from config import Config
from descpage import DescPageBot
class EuroExchangeBotJob:
    """
    A single entry of the EuroExchangeBot job queue.

    Built from keyword arguments; the keys 'image', 'script' and 'freq'
    are required (a missing key raises KeyError), any further keys are
    ignored.
    """

    def __init__(self, **kwargs):
        """
        @param kwargs: job description with the keys 'image', 'script'
                       and 'freq'
        @type kwargs: dict
        """
        # Look the keys up in a fixed order: image, script, freq
        image, script, freq = (
            kwargs[key] for key in ('image', 'script', 'freq'))

        self.image = image
        self.script = script
        self.freq = freq

        # On the beta wiki the file name gets a TEST_ prefix, so that the
        # original commons files are never overwritten
        running_on_beta = pywikibot.Site().family.name == "wpbeta"
        if running_on_beta:
            self.image = "TEST_{}".format(image)
class EuroExchangeBot(pywikibot.bot.BaseBot):
    """
    Bot which redraws images with gnuplot and uploads changed ones.

    A run refreshes the input data (zip archive containing a csv file) in
    the working directory and then processes every job listed in
    jobs.json: the job's gnuplot script is executed, the resulting file
    is uploaded if its sha1 differs from the online version, and the file
    description page is handed to DescPageBot.
    """

    def __init__(self, genFactory, **kwargs):
        """
        @param genFactory: accepted for the caller's calling convention,
                           not used by this class
        @param kwargs: options, passed unchanged to DescPageBot and to
                       the parent class constructor
        """
        # Init working directory
        self.init_wdir()

        # Prepare DescPage editing bot
        self.descpagebot = DescPageBot(**kwargs)

        super().__init__(**kwargs)

    def run(self):
        """
        Refresh the input data, then work through all jobs.
        """
        # Make sure input data is up to date
        self.update_data()

        # Load and treat jobs
        for job in self.load_jobs():
            self.treat_job(job)

    def init_wdir(self):
        """
        Make sure the working directory exists and store its normalized
        path in self.wdir.

        @raises OSError: if the path exists but is not a directory
        """
        # Normalize working dir
        self.wdir = os.path.realpath(Config.working_dir)

        if os.path.isdir(self.wdir):
            return

        if os.path.exists(self.wdir):
            raise OSError(
                "Working directory at {} already exists, "
                "but is no directory".format(self.wdir))

        os.makedirs(self.wdir)
        jogobot.output("Create empty working directory at {}".format(
            self.wdir))

    def update_data(self):
        """
        Make sure an up-to-date zip file is present and the csv data is
        extracted from it.
        """
        zip_path = os.path.join(self.wdir, Config.zip_file)

        # A missing or outdated archive is replaced by a fresh download
        if not (os.path.exists(zip_path) and self.is_zip_uptodate()):
            # Remove the csv file as well, otherwise extract_csv() would
            # keep data which belongs to an older archive
            self.remove_input_files()
            self.download_zip()

        # Extract csv data (no-op if the csv file is already there)
        self.extract_csv()

    def is_zip_uptodate(self):
        """
        Check, based on the modification time of the zip file, whether it
        is recent enough.

        @returns True if zip file is up to date, otherwise False
        @rtype bool
        """
        # Get file modification datetime
        stat = os.stat(os.path.join(self.wdir, Config.zip_file))
        mdt = datetime.datetime.fromtimestamp(stat.st_mtime)

        # Current datetime
        cdt = datetime.datetime.now()

        # Updating on weekends is not useful, therefore an older file is
        # tolerated on Saturday (weekday 5) and Sunday (weekday 6)
        allowed_delta = {5: 2, 6: 3}.get(cdt.weekday(), 1)

        return (cdt - mdt) < datetime.timedelta(days=allowed_delta)

    def remove_input_files(self):
        """
        Delete the data input files (zip and csv) from the working
        directory. Files which do not exist are silently skipped.
        """
        for name in (Config.zip_file, Config.csv_file):
            try:
                os.remove(os.path.join(self.wdir, name))
            except FileNotFoundError:
                # Nothing to delete, e.g. the csv was never extracted
                pass

    def download_zip(self):
        """
        Download the zip file from Config.data_source into the working
        directory and set its modification time to the value of the
        Last-Modified http header, if the server sent a usable one.
        """
        zip_path = os.path.join(self.wdir, Config.zip_file)

        # Download the file and save it locally
        with urllib.request.urlopen(Config.data_source) as response, \
                open(zip_path, 'wb') as out_file:
            shutil.copyfileobj(response, out_file)
            # Original change date from http header (None if missing)
            last_modified = response.info()["Last-Modified"]

        # The freshly written file carries the download time as mtime,
        # is_zip_uptodate() needs the change date of the remote file
        mdate = None
        if last_modified is not None:
            try:
                mdate = email.utils.parsedate_to_datetime(last_modified)
            except (TypeError, ValueError):
                # Unparsable header value, keep the download time
                mdate = None

        if mdate is not None:
            # os.utime takes (atime, mtime)
            os.utime(zip_path,
                     (datetime.datetime.now().timestamp(),
                      mdate.timestamp()))
        else:
            jogobot.output(
                "No usable Last-Modified header, keeping download time "
                "as file date.", "WARNING")

        # Log
        jogobot.output("New input file downloaded.")

    def extract_csv(self):
        """
        Extract the csv file from the zip archive into the working
        directory, unless the csv file already exists.
        """
        csv_path = os.path.join(self.wdir, Config.csv_file)

        if os.path.exists(csv_path):
            return

        with zipfile.ZipFile(
                os.path.join(self.wdir, Config.zip_file)) as zipobj:
            # The archive member is addressed by the bare file name
            zipobj.extract(os.path.basename(csv_path), path=self.wdir)

    def load_jobs(self):
        """
        Load jobs from the file jobs.json in Config.base_dir.

        @returns Generator of EuroExchangeBotJob objects
        @rtype generator
        """
        # Load json jobs file
        jobs_path = os.path.join(Config.base_dir, "jobs.json")
        with open(jobs_path, "r", encoding="utf-8") as fd:
            jobs_js = json.load(fd)

        # yield each job
        for job_args in jobs_js:
            yield EuroExchangeBotJob(**job_args)

    def treat_job(self, job):
        """
        Handle one job: regenerate and upload the image if its update
        interval is reached, then update the file description page.

        Jobs whose image does not exist on the wiki are skipped
        completely. A failing gnuplot call is logged and does not prevent
        the description page update.

        @param job: Job to work on
        @type job: EuroExchangeBotJob
        """
        # Store reference to current job in Bot obj
        self.current_job = job

        # Log job
        jogobot.output("Work on Job {}".format(job.image))

        # Get file page
        self.current_job.filepage = pywikibot.page.FilePage(
            pywikibot.Site(), job.image)

        # Skip if file does not exist yet
        if not self.current_job.filepage.exists():
            jogobot.output(
                "Image {} does not exists on wiki, job skipped!".format(
                    self.current_job.image), "WARNING")
            return

        # Check if update is necessary
        if self.image_update_needed():
            try:
                self.call_gnuplot(job)

                if self.file_changed():
                    self.upload_file(job)
                else:
                    jogobot.output("No upload needed for Job {}.".format(
                        self.current_job.image))
            except subprocess.CalledProcessError as e:
                jogobot.output(
                    "Subprocess terminated with exit code {}!".format(
                        e.returncode), "ERROR")
        # Nothing to do
        else:
            jogobot.output("No update needed for Job {}".format(
                self.current_job.image))

        # Update file description page
        self.descpagebot.treat_job(self.current_job)

    def image_update_needed(self):
        """
        Check whether the update interval of the current job's image is
        reached.

        The interval is job.freq days, shortened by a tolerance of two
        hours, counted from the timestamp of the latest file version.

        @returns True if update needed
        @rtype bool
        """
        # Get datetime of last update
        last_update = self.current_job.filepage.latest_file_info.timestamp

        # Get current time
        now = pywikibot.Site().getcurrenttime()

        # Calculate allowed delta (with tolerance)
        delta = datetime.timedelta(days=self.current_job.freq, hours=-2)

        return now >= last_update + delta

    def call_gnuplot(self, job):
        """
        Run gnuplot with the script of the job inside the working
        directory. Input and output file names are handed over via the
        environment variables INFILE and OUTFILE.

        @param job: Job to work on
        @type job: EuroExchangeBotJob
        @raises subprocess.CalledProcessError: if gnuplot exits non-zero
        """
        script_path = os.path.realpath(
            os.path.join(Config.gnuplot_script_dir, job.script + ".plt"))

        # Only the configured command is split shell-like; the script
        # path stays one argument even if it contains spaces or quotes
        cmd = shlex.split(Config.gnuplot) + [script_path]

        plt_env = os.environ.copy()
        plt_env["INFILE"] = Config.csv_file
        plt_env["OUTFILE"] = job.image

        subprocess.check_call(cmd, cwd=self.wdir, env=plt_env)

    def file_changed(self):
        """
        Check via sha1 hash whether the generated file differs from the
        latest online version.

        @returns True if file was changed
        @rtype bool
        """
        # Get online file sha1 hash
        online_sha1 = self.current_job.filepage.latest_file_info.sha1

        # Get local file sha1 hash
        local_path = os.path.join(self.wdir, self.current_job.image)
        with open(local_path, 'rb') as fd:
            local_sha1 = hashlib.sha1(fd.read()).hexdigest()

        return online_sha1 != local_sha1

    def upload_file(self, job):
        """
        Upload the generated image of the job with
        pywikibot.specialbots.UploadRobot.

        @param job: Job to work on
        @type job: EuroExchangeBotJob
        """
        always = self.getOption("always")

        bot = pywikibot.specialbots.UploadRobot(
            [os.path.join(self.wdir, job.image)],
            description=Config.upload_comment,
            useFilename=job.image,
            # True: do not double-check/edit the destination filename
            keepFilename=True,
            # True: description is double-checked (False would skip it)
            verifyDescription=True,
            # Warnings which must not stop the upload; overwriting an
            # existing file is the purpose of this bot
            # NOTE(review): exact semantics of the warning codes should
            # be confirmed against the UploadRobot documentation
            ignoreWarning=["exists", "duplicate"],
            targetSite=pywikibot.Site(),
            always=always,
            # NOTE(review): presumably True aborts on every remaining
            # warning when running unattended - confirm
            aborts=True if always else [],
        )
        bot.run()
def main(*args):
    """
    Process command line arguments and invoke bot.

    @param args: command line arguments, handed to pywikibot.handle_args
    @type args: list of unicode
    """
    # Force locale 'de_DE.UTF-8', otherwise strptime may stumble over
    # wrong month abbreviations
    locale.setlocale(locale.LC_ALL, 'de_DE.UTF-8')

    # Global arguments are consumed by pywikibot (selects the site),
    # the remaining ones are returned
    remaining_args = pywikibot.handle_args(args)

    # The jogobot task slug is the name of this file without its ending
    script_name = os.path.basename(__file__)
    task_slug = script_name[:-len(".py")]

    # The jogobot "active" check is intentionally not performed here,
    # since the bot is only run semi-automatically

    # Parse local args to get information about the subtask
    parsed = jogobot.bot.parse_local_args(remaining_args, None)
    (_subtask, gen_factory, bot_kwargs) = parsed

    # Init bot
    bot = jogobot.bot.init_bot(
        task_slug, None, EuroExchangeBot, gen_factory, **bot_kwargs)

    # Run bot
    jogobot.bot.run_bot(task_slug, None, bot)
# Only start the bot when executed as a script, not on import
if __name__ == "__main__":
    main()