Source code for taskbutler.taskbutler

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import codecs
import logging
import logging.handlers
from configparser import ConfigParser

import dropbox
import requests
from dropbox.exceptions import ApiError, AuthError
from dropbox.files import WriteMode
from dropbox.paper import ImportFormat, PaperDocCreateError, SharingPublicPolicyType, SharingPolicy

from todoist.api import TodoistAPI
import os
import shutil

from .config import staticConfig, getConfigPaths

logger = logging.getLogger('todoist')
loggerdb = logging.getLogger('dropbox')
loggerdg = logging.getLogger('github')


[docs]def createdropboxfile(title, dbx, templatefile, dropbox_prepart_files, folder) -> str: """ Creates new dropbox file with given name. Returns a office online URL Requires a authorized dropbox -> office365 connection :param folder: folder in dropbox. Relativ from / :type dropbox_prepart_files: object URL pre-part to create dropbox/office365 url :param title: (str) Title of the newly created file (special characters will get stripped) :param templatefile: (str) Path to template file :param dbx: dropbox api object :return: office online URL """ # https://github.com/dropbox/dropbox-sdk-python/blob/master/example/back-up-and-restore/backup-and-restore-example.py # https://github.com/dropbox/dropbox-sdk-python/blob/master/example/updown.py filename = title # Strip special characters # #https://stackoverflow.com/questions/5843518/remove-all-special-characters-punctuation-and-spaces-from-string filename = ''.join(e for e in filename if e.isalnum()) loggerdb.debug("Filename for new Dropbox file: {}".format(filename)) # Check for duplicate filenames and rename new file # Renaming breaks connection between task <-> file! # Task doesn't know about renaming. Deleting file when task is completet is impossible. try: searchresult = dbx.files_search('/' + folder, filename, start=0, max_results=1, mode=dropbox.files.SearchMode('filename', None)) if not searchresult.matches: loggerdb.debug("No duplicate filename found") else: filename = filename + "1" loggerdb.debug("Duplicate filename found. Renaming {} to {}".format(title, filename)) except ApiError as err: loggerdb.error( "Probably folder is not existent. Create folder:{} manually. Original: {}".format(folder, err.error)) raise SystemExit(1) # Seperate filename - filetype filetype = templatefile.rsplit(".", 1)[1] loggerdb.debug("Filetype: {}".format(filetype)) # Upload file to dropbox and create link with open('./' + templatefile, 'rb') as f: try: # Dropbox Api doesn't rename if Content is the same # https://www.dropboxforum.com/t5/API-Support-Feedback/Cannot-auto-rename-file/td-p/234640 debugnote = dbx.files_upload(f.read(), '/' + folder + '/' + filename + '.' + filetype, mode=dropbox.files.WriteMode.add, autorename=False) loggerdb.debug("{}".format(debugnote)) todoist_dropboxfile_url = dropbox_prepart_files + folder + '/' + filename + '.' + filetype + '?force_role=personal' loggerdb.debug("URL for new Dropbox file: {}".format(todoist_dropboxfile_url)) return todoist_dropboxfile_url except Exception as err: loggerdb.error("Something went wrong: {}".format(err)) raise SystemExit(1)
[docs]def createpaperdocument(title, dbx, todoistfolderid, todoistpaperurl, sharing) -> str: """ Creates new dropbox paper document in given folder with given title and returns full URL. :type sharing: (str or bool) Wether to make paper public or not :param title: (str) Title of the newly created document (markdown) :param dbx: dropbox api object :param todoistfolderid: (str) Folder ID of folder to save paper to :param todoistpaperurl: (str) Dropbox paper URL pre-part to build full Link from. "this-part.com\"paperid :return: Full URL to created paper """ content = title content_b = content.encode('UTF-8') todoist_paper_url = None try: r = dbx.paper_docs_create(content_b, ImportFormat('markdown'), parent_folder_id=todoistfolderid) loggerdb.debug("Created paper for task: {}".format(content)) # print(dbx.paper_docs_sharing_policy_get(r.doc_id)) # Set sharing policy to invite only. Papers are public per default! if sharing == "false" or not sharing: dbx.paper_docs_sharing_policy_set(r.doc_id, SharingPolicy( public_sharing_policy=SharingPublicPolicyType('invite_only', None))) loggerdb.debug("Paper marked as invite only") todoist_paper_id = r.doc_id todoist_paper_url = todoistpaperurl + todoist_paper_id loggerdb.debug("Created paper at URL: {}".format(todoist_paper_url)) except PaperDocCreateError as e: loggerdb.error("PaperDocCreateError\nOriginal Error: {}".format(e)) raise SystemExit(1) except ApiError as e: loggerdb.error("API ERROR\nOriginal Error: {}".format(e)) raise SystemExit(1) return todoist_paper_url
[docs]def gettodoistfolderid(foldername: str, dbx): """ Dropbox - Get Folder ID of folder "todoist" from user account Note : only finds folder once a paper is created in. create test paper first. :param foldername: foldername to look for :param dbx: dropbox object :return: folder ID for given name """ loggerdb.debug("Lookup ID for paper folder: {}".format(foldername)) paper = dbx.paper_docs_list() todoist_folder_id = "" while paper.has_more: paper += dbx.paper_docs_list_continue(paper) for entry in paper.doc_ids: folder_meta = dbx.paper_docs_get_folder_info(entry) if folder_meta.folders: # print(document_meta.title + "in Folder: " + folder_meta.folders[0].name + "id: " + folder_meta.folders[0].id) # print("in Folder: " + folder_meta.folders[0].name + " id: " + folder_meta.folders[0].id) if folder_meta.folders[0].name == foldername: # print("id: " + folder_meta.folders[0].id) todoist_folder_id = folder_meta.folders[0].id loggerdb.debug("Paper folder set as todoist folder: {}".format(todoist_folder_id)) break # print(folder_meta.folders[0].id) # print(document_response) return todoist_folder_id
[docs]def getprogresssymbols(progress_done, secrets): """ Returns unicode bar based on given percentage. :param secrets: :param progress_done: (int, float) percentage of progress :return: (str) unicode bar """ # TODO change to switch-case item_progressbar = "" if progress_done == 0: item_progressbar = secrets["todoist"]["progress_bar_0"] if progress_done > 0 and progress_done <= 20: item_progressbar = secrets["todoist"]["progress_bar_20"] if progress_done > 20 and progress_done <= 40: item_progressbar = secrets["todoist"]["progress_bar_40"] if progress_done > 40 and progress_done <= 60: item_progressbar = secrets["todoist"]["progress_bar_60"] if progress_done > 60 and progress_done <= 80: item_progressbar = secrets["todoist"]["progress_bar_80"] if progress_done > 80 and progress_done <= 100: item_progressbar = secrets["todoist"]["progress_bar_100"] return str(item_progressbar)
[docs]def checkforupdate(currentversion, updateurl): """ Check for new version at github :param currentversion: (str) version of current release :param updateurl: (str) github "releases" json url :return: None """ # Check for updates try: r = requests.get(updateurl) r.raise_for_status() release_info_json = r.json() if not currentversion == release_info_json[0]['tag_name']: logger.info( "Your version is not up-to-date! \nYour version: {}\nLatest version: {}\nSee latest version at: {}".format( currentversion, release_info_json[0]['tag_name'], release_info_json[0]['html_url'])) return 1 else: return 0 except requests.exceptions.ConnectionError as e: logger.error("Error while checking for updates (Connection error): {}".format(e)) return 1 except requests.exceptions.HTTPError as e: logger.error("Error while checking for updates (HTTP error): {}".format(e)) return 1 except requests.exceptions.RequestException as e: logger.error("Error while checking for updates: {}".format(e)) return 1
[docs]def getlabelid(labelname: str, api: object) -> str: """ Todoist - Returns ID of given labelname :param labelname: str Name of label to search for :param api: Todoist api object :return: ID of labelname """ logger.debug("Searching for ID of label: {}".format(labelname)) label_progress_id = None try: for label in api.state['labels']: if label['name'] == labelname: label_progress_id = label['id'] logger.debug("ID for label: {} found! ID: {}".format(labelname, label_progress_id)) return label_progress_id raise ValueError('Label not found in Todoist. Skipped!') except ValueError as error: logger.error("{}".format(error)) raise ValueError(error)
[docs]def addurltotask(title_old, url, progress_seperator): title_old_meta = "" if progress_seperator in title_old: title_old_headline, title_old_meta = title_old.split(progress_seperator) title_new = url + " (" + title_old_headline.rstrip() + ") " + "" + progress_seperator + title_old_meta else: title_old_headline = title_old title_new = url + " (" + title_old_headline.rstrip() + ") " + "" + title_old_meta return title_new
[docs]def gettasktitle(title, progress_seperator): # TODO: returns tailing space! REMOVE! """ Get task title withouth meta :type progress_seperator: str progress seperator :param title: Task title with seperator :return: """ if progress_seperator and progress_seperator in title: title_headline, title_old_meta = title.split(progress_seperator) else: title_headline = title return title_headline
[docs]def gettaskwithlabelid(labelid, api): """ Returns a list of Task IDs found with given label-ID :param labelid: (str) label ID of label to search for :param api: (obj) todoist api :return: (list) found Task IDs """ found = [] for task in api.state['items']: if not isinstance(task['id'], str) and task['labels'] and not task['is_deleted'] and not task[ 'in_history'] and not task['is_archived']: for label in task['labels']: if label == labelid: found.append(task['id']) return found
[docs]def main(): # create config if not os.path.exists(getConfigPaths().config()): os.mkdir(getConfigPaths().app(), mode=0o750) os.mkdir(getConfigPaths().config(), mode=0o750) # create templates if os.path.exists(getConfigPaths().app()) and not os.path.exists(getConfigPaths().templates()): os.mkdir(getConfigPaths().templates(), mode=0o750) # create log if os.path.exists(getConfigPaths().app()) and not os.path.exists(getConfigPaths().log()): os.mkdir(getConfigPaths().log(), mode=0o750) # create initial config if not os.path.exists(getConfigPaths().file_config()): shutil.copy(os.path.join(os.path.dirname(os.path.abspath(__file__)), staticConfig.filename_config_initial), getConfigPaths().file_config()) # Read config.ini # TODO refactor read/write config -> https://docs.python.org/3/library/configparser.html # check for every non-optional parameter try: # Setup logging # Set logging format formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') # Make sure to output everthing as long no loglevel is set loggerinit = logging.getLogger("Taskbutler") loggerinit.setLevel(logging.DEBUG) handler = logging.StreamHandler() handler.setFormatter(formatter) loggerinit.addHandler(handler) loggerinit.info("Start Taskbutler.") loggerinit.info("Read config from: {}".format(getConfigPaths().file_config())) config = ConfigParser() config.read_file(open(getConfigPaths().file_config(), 'r', encoding='utf-8')) # If no logfile given, log to console if "log" in config.sections() and "logfile" in config["log"]: handler = logging.handlers.TimedRotatingFileHandler(os.path.join(getConfigPaths().log(), config["log"]["logfile"]), when="d", interval=7, backupCount=2, encoding='utf-8') loggerinit.info("Set logging file: {}".format(handler.baseFilename)) logger.propagate = False loggerdb.propagate = False loggerdg.propagate = False else: handler = logging.StreamHandler() loggerinit.info("Set log output to console") logger.propagate = False loggerdb.propagate = False loggerdg.propagate = False handler.setFormatter(formatter) logger.addHandler(handler) loggerdb.addHandler(handler) loggerdg.addHandler(handler) # Set loglevel. Default is DEBUG if "log" in config.sections() and "loglevel" in config["log"]: logger.setLevel(logging.getLevelName(config["log"]["loglevel"])) loggerdb.setLevel(logging.getLevelName(config["log"]["loglevel"])) loggerdg.setLevel(logging.getLevelName(config["log"]["loglevel"])) else: logger.setLevel(logging.DEBUG) loggerdb.setLevel(logging.DEBUG) loggerdg.setLevel(logging.DEBUG) logger.info("Set logging level: {}".format(logging.getLevelName(logger.level))) # Setup devmode. If true -> no todoist commit and github update check(60 requests per hour) if config.get('config', 'devmode') == "True" or config.get('config', 'devmode') == "true": devmode = True logger.info("Entering DEVMODE - no todoist data will get changed") else: devmode = False logger.info("Entering Production mode - All changed will get synced") # Read config todoist_api_key = config.get('todoist', 'apikey') label_progress = config.get('todoist', 'label_progress') todoist_seperator = config.get('todoist', 'progress_seperator') dropbox_api_key = config.get('dropbox', 'apikey') todoist_folder_id = str(config.get('dropboxpaper', 'todoistfolderid')) todoist_folder_name = config.get('dropboxpaper', 'foldername') label_todoist_dropboxpaper = config.get('dropboxpaper', 'labelname') todoist_paper_sharing = config.get('dropboxpaper', 'sharing') todoist_dropbox_templatefile = config.get('dropboxoffice', 'templatefile') label_todoist_dropboxoffice = config.get('dropboxoffice', 'labelname') todoist_dropbox_prepart_files = config.get('dropboxoffice', 'dropbox_prepart_files') dropbox_todoist_folder = config.get('dropboxoffice', 'folder') github_apikey = config.get('github', 'apikey') github_sync_project_name = config.get('github', 'TodoistProjectToSync') github_synclabel_name = config.get('github', 'TodoistSyncLabel') github_url_identifier = config.get('github', 'GithubURLIdentifier') github_sync_repo_name = config.get('github', 'GithubSyncRepoName') github_username = config.get('github', 'GithubUsername') except FileNotFoundError as error: logger.error("Config file not found! Create config.ini first. \nOriginal Error: {}".format(error)) raise SystemExit(1) # init dropbox session if dropbox_api_key and (label_todoist_dropboxpaper or label_todoist_dropboxoffice): dbx = dropbox.Dropbox(dropbox_api_key) try: dbx.users_get_current_account() loggerdb.debug("Dropbox account set to: {}".format(dbx.users_get_current_account())) except AuthError as err: loggerdb.error("Invalid access token: {}".format(err)) raise SystemExit(1) if label_todoist_dropboxpaper: # Check paper folder ID, get if not encoding=self.encoding # Check that folder it still matches folder name if todoist_folder_id: loggerdb.debug("Dropbox paper - folder-ID set to: {}".format(todoist_folder_id)) # TODO Verify ID of foldername. Doesn't work properly. Not really important # dbx.paper_docs_get_folder_info() # folder_meta = dbx.paper_docs_get_folder_info(todoist_folder_id) # print(folder_meta) # raise SystemExit(1) # # if folder_meta.folders[0].name == "todoist": # loggerdb.debug("Dropbox paper - folder-ID is up-to-date: {}".format(todoist_folder_id)) # todoist_folder_id = folder_meta.folders[0].id # else: # loggerdb.debug("Dropbox paper - folder-ID is outdated. Resetting.") # todoist_folder_id = None else: todoist_folder_id = gettodoistfolderid(todoist_folder_name, dbx) config.set('dropboxpaper', 'todoistfolderid', todoist_folder_id) with open(getConfigPaths().file_config(), 'w') as configfile: config.write(codecs.open(getConfigPaths().file_config(), 'wb+', 'utf-8')) else: loggerdb.debug("Dropbox feature disabled. No API key found.") # init todoist session try: api = TodoistAPI(todoist_api_key) api.sync() if not api.state['items']: raise ValueError('Sync error. State empty.') except ValueError as error: logger.error("Sync Error. \nOriginal Error: {}".format(error)) raise SystemExit(1) # Usefull for development: # Delete todoist tasks # print( api.state['items']) # print( api.state['projects']) # item = api.items.get_by_id("ID_TO_DELETE") # item.delete() # api.commit() # List projects if label_progress: label_progress_id = getlabelid(label_progress, api) counter_progress = 0 counter_changed_items = 0 for task in api.state['items']: if not isinstance(task['id'], str) and task['labels'] and not task['is_deleted'] and not task[ 'in_history'] and not task['is_archived']: for label in task['labels']: if label == label_progress_id: logger.debug("Found task to track: {}".format(task['content'])) counter_progress = counter_progress + 1 subtasks_total = 0 subtasks_done = 0 for subTask in api.state['items']: if not subTask['content'].startswith("*"): # * -> Skip "text only Tasks" if not subTask['is_deleted'] and not subTask['in_history'] and not subTask[ 'is_archived'] and subTask['parent_id'] == task['id']: logger.debug( "Found connected Subtask: {}".format(subTask['content'], subTask['id'])) if subTask['checked']: subtasks_done = subtasks_done + 1 logger.debug("Subtask {} is marked as DONE".format(subTask['content'])) else: logger.debug("Subtask {} is marked as UNDONE".format(subTask['content'])) subtasks_total = subtasks_total + 1 if subtasks_total > 0: progress_per_task = 100 / subtasks_total else: progress_per_task = 100 progress_done = round(subtasks_done * progress_per_task) logger.debug( "Task: {} done: {} total: {}".format(task['content'], subtasks_done, subtasks_total)) item_task_old = task['content'] if "‣" in task['content']: item_content_old = task['content'].split(todoist_seperator) item_content_new = item_content_old[0] else: item_content_new = task['content'] + " " item_content = item_content_new + "" + config["todoist"][ "progress_seperator"] + " " + getprogresssymbols(progress_done, config) + " " + str( progress_done) + ' %' if not item_task_old == item_content: logger.debug( "Task progress updated!\nOld title :{}\nNew title :{}".format(item_task_old, item_content)) item = api.items.get_by_id(task['id']) item.update(content=item_content) counter_changed_items = counter_changed_items + 1 # Sync if not devmode: # TODO api.commit + api.sync could be a dubplicate. api.sync is added to prevent issues after changing titles logger.debug("Sync start") api.commit() api.sync() logger.debug("Sync done") logger.info("Tracked tasks : {}".format(counter_progress)) logger.info("Changed tasks: {}".format(counter_changed_items)) else: logger.debug("Progressbar feature disabled. No labelname found.") # Check for Update if not devmode and config["config"]["update_url"]: checkforupdate(config["config"]["version"], config["config"]["update_url"]) # Dropbox paper feature # Drpopbox paper is disabled in devmode -> will create files every time since url is not written in task title. # Dropbox paper is annoying to cleanup if not devmode: if label_todoist_dropboxpaper: # Dropbox Paper loggerdb.debug("Dropbox paper start") labelidid = getlabelid(label_todoist_dropboxpaper, api) taskid = gettaskwithlabelid(labelidid, api) for task in taskid: item = api.items.get_by_id(task) if "https://" not in item['content'] and not item['is_deleted'] and not item[ 'in_history'] and not item['is_archived']: newurl = createpaperdocument(gettasktitle(item['content'], todoist_seperator), dbx, config.get('dropboxpaper', 'todoistfolderid'), config.get('dropboxpaper', 'url'), todoist_paper_sharing) item.update(content=addurltotask(item['content'], newurl, todoist_seperator)) loggerdb.info("Added paper to task: {}".format(item['content'])) if not devmode: api.commit() loggerdb.debug("Sync done") else: logger.info("Dropbox paper feature disabled. No labelname found.") else: logger.info("Dropbox paper feature in devmode disabled.") # Dropbox -> Microsoft office feature if label_todoist_dropboxoffice: loggerdb.debug("Dropbox file start") labelidid = getlabelid(label_todoist_dropboxoffice, api) taskid = gettaskwithlabelid(labelidid, api) for task in taskid: item = api.items.get_by_id(task) if "https://" not in item['content'] and not item['is_deleted'] and not item[ 'in_history'] and not item['is_archived']: newurl = createdropboxfile(item["content"], dbx, todoist_dropbox_templatefile, todoist_dropbox_prepart_files, dropbox_todoist_folder) item.update(content=addurltotask(item['content'], newurl, todoist_seperator)) loggerdb.info("Added File to Task: {}".format(item['content'])) if not devmode: api.commit() loggerdb.debug("Sync done") else: logger.info("Dropbox to Office feature disabled. No labelname found.") logger.info("Taskbutler end")
if __name__ == '__main__': main() # Note: https://pytodoist.readthedocs.io/en/latest/modules.html