|
|
@ -18,6 +18,10 @@ |
|
|
|
|
|
|
|
import os |
|
|
|
import locale |
|
|
|
import urllib.request |
|
|
|
import shutil |
|
|
|
import datetime |
|
|
|
import zipfile |
|
|
|
|
|
|
|
import pywikibot |
|
|
|
|
|
|
@ -27,15 +31,22 @@ import jogobot |
|
|
|
class EuroExangeBot( pywikibot.bot.BaseBot ): |
|
|
|
|
|
|
|
working_dir = os.path.dirname(os.path.realpath(__file__)) + "/../wdir" |
|
|
|
data_source = "http://www.ecb.int/stats/eurofxref/eurofxref-hist.zip" |
|
|
|
zip_file = "eurofxref-hist.zip" |
|
|
|
csv_file = "eurofxref-hist.csv" |
|
|
|
|
|
|
|
def __init__( self, genFactory, **kwargs ): |
|
|
|
|
|
|
|
# Init working directory |
|
|
|
self.init_wdir() |
|
|
|
|
|
|
|
super().__init__(*kwargs) |
|
|
|
|
|
|
|
def run(self): |
|
|
|
|
|
|
|
# Make sure input data is uptodate |
|
|
|
self.update_data() |
|
|
|
|
|
|
|
pass |
|
|
|
|
|
|
|
def init_wdir(self): |
|
|
@ -51,7 +62,8 @@ class EuroExangeBot( pywikibot.bot.BaseBot ): |
|
|
|
if os.path.isdir(self.wdir): |
|
|
|
return |
|
|
|
else: |
|
|
|
raise OSError("Working directory at {} already exists, but is no directory".format( |
|
|
|
raise OSError( ("Working directory at {} already exists," +\ |
|
|
|
"but is no directory").format( |
|
|
|
self.wdir)) |
|
|
|
|
|
|
|
else: |
|
|
@ -59,6 +71,95 @@ class EuroExangeBot( pywikibot.bot.BaseBot ): |
|
|
|
jogobot.output( "Create empty working directory at {}".format( |
|
|
|
self.wdir)) |
|
|
|
|
|
|
|
def update_data(self): |
|
|
|
""" |
|
|
|
Checks if zip file exists and make sure it is uptodate, and extract |
|
|
|
csv data if neccessary |
|
|
|
""" |
|
|
|
|
|
|
|
# Check if zip file exists |
|
|
|
if os.path.exists( os.path.join(self.wdir, type(self).zip_file) ): |
|
|
|
|
|
|
|
# If file is outdated, remove data input files |
|
|
|
if not self.is_zip_uptodate(): |
|
|
|
self.remove_input_files() |
|
|
|
|
|
|
|
# Recall method to get new file |
|
|
|
self.update_data() |
|
|
|
|
|
|
|
# Otherwise download |
|
|
|
else: |
|
|
|
self.download_zip() |
|
|
|
|
|
|
|
# Extract csv data |
|
|
|
self.extract_csv() |
|
|
|
|
|
|
|
def is_zip_uptodate(self): |
|
|
|
""" |
|
|
|
Timechecks weather zip file is the most recent version based on mdate |
|
|
|
|
|
|
|
@returns True if zip file is uptodate, otherwise false |
|
|
|
@rtype bool |
|
|
|
""" |
|
|
|
# Get file stat |
|
|
|
stat = os.stat( os.path.join(self.wdir, type(self).zip_file) ) |
|
|
|
|
|
|
|
# Get file modification datetime |
|
|
|
mdt = datetime.datetime.fromtimestamp( stat.st_mtime ) |
|
|
|
# Current datetime |
|
|
|
cdt = datetime.datetime.now() |
|
|
|
|
|
|
|
# On weekends (weekday 5,6) update not sensefull |
|
|
|
if cdt.weekday() == 5: |
|
|
|
allowed_delta = 2 |
|
|
|
elif cdt.weekday() == 6: |
|
|
|
allowed_delta = 3 |
|
|
|
else: |
|
|
|
allowed_delta = 1 |
|
|
|
|
|
|
|
# If file is outdated, remove and recall method |
|
|
|
if (cdt - mdt) >= datetime.timedelta(days=allowed_delta): |
|
|
|
return False |
|
|
|
|
|
|
|
return True |
|
|
|
|
|
|
|
def remove_input_files(self): |
|
|
|
""" |
|
|
|
Deletes data input files |
|
|
|
""" |
|
|
|
|
|
|
|
input_files = ( os.path.join(self.wdir, type(self).zip_file), |
|
|
|
os.path.join(self.wdir, type(self).csv_file) ) |
|
|
|
|
|
|
|
for f in input_files: |
|
|
|
os.remove( f ) |
|
|
|
|
|
|
|
|
|
|
|
def download_zip( self ): |
|
|
|
""" |
|
|
|
Download the zipfile from EZB |
|
|
|
""" |
|
|
|
# Download the file and save it locally |
|
|
|
with urllib.request.urlopen(type(self).data_source) as response,\ |
|
|
|
open( os.path.join(self.wdir, |
|
|
|
type(self).zip_file), 'wb') as out_file: |
|
|
|
|
|
|
|
shutil.copyfileobj(response, out_file) |
|
|
|
|
|
|
|
def extract_csv( self ): |
|
|
|
""" |
|
|
|
Extract csv file from zip archive |
|
|
|
""" |
|
|
|
if not os.path.exists( os.path.join(self.wdir, type(self).csv_file) ): |
|
|
|
|
|
|
|
with zipfile.ZipFile( |
|
|
|
os.path.join(self.wdir, type(self).zip_file)) as zipobj: |
|
|
|
|
|
|
|
zipobj.extract( |
|
|
|
os.path.basename( |
|
|
|
os.path.join(self.wdir, type(self).csv_file)), |
|
|
|
path=self.wdir ) |
|
|
|
|
|
|
|
|
|
|
|
def main(*args): |
|
|
|
""" |
|
|
|