diff --git a/web/pgadmin/about/__init__.py b/web/pgadmin/about/__init__.py index 7a8ef95..98a4dd6 100644 --- a/web/pgadmin/about/__init__.py +++ b/web/pgadmin/about/__init__.py @@ -11,32 +11,31 @@ MODULE_NAME = 'about' import sys - from flask import Response, render_template, __version__, url_for from flask_babel import gettext from flask_security import current_user, login_required from pgadmin.utils import PgAdminModule from pgadmin.utils.menu import MenuItem - import config class AboutModule(PgAdminModule): def get_own_menuitems(self): appname = config.APP_NAME + if hasattr(str, 'decode'): appname = appname.decode('utf-8') + return { 'help_items': [ - MenuItem(name='mnu_about', - priority=999, - module="pgAdmin.About", - callback='about_show', - icon='fa fa-info-circle', - label=gettext(u'About %(appname)s', - appname=appname - ) - ) + MenuItem( + name='mnu_about', + priority=999, + module="pgAdmin.About", + callback='about_show', + icon='fa fa-info-circle', + label=gettext(u'About %(appname)s', appname=appname) + ) ] } @@ -50,8 +49,8 @@ class AboutModule(PgAdminModule): def get_exposed_url_endpoints(self): return ['about.index'] -blueprint = AboutModule(MODULE_NAME, __name__, - static_url_path='') + +blueprint = AboutModule(MODULE_NAME, __name__, static_url_path='') ########################################################################## @@ -61,20 +60,29 @@ blueprint = AboutModule(MODULE_NAME, __name__, @login_required def index(): """Render the about box.""" - info = {'python_version': sys.version, 'flask_version': __version__} - if config.SERVER_MODE is True: + info = { + 'python_version': sys.version, + 'flask_version': __version__ + } + + if config.SERVER_MODE: info['app_mode'] = gettext('Server') else: info['app_mode'] = gettext('Desktop') + info['current_user'] = current_user.email - return render_template(MODULE_NAME + '/index.html', info=info, _=gettext) + return render_template( + MODULE_NAME + '/index.html', info=info, _=gettext + ) @blueprint.route("/about.js") @login_required def script(): """render the required javascript""" - return Response(response=render_template("about/about.js", _=gettext), - status=200, - mimetype="application/javascript") + return Response( + response=render_template("about/about.js", _=gettext), + status=200, + mimetype="application/javascript" + ) diff --git a/web/pgadmin/dashboard/__init__.py b/web/pgadmin/dashboard/__init__.py index 06c0b0a..0259a8f 100644 --- a/web/pgadmin/dashboard/__init__.py +++ b/web/pgadmin/dashboard/__init__.py @@ -61,7 +61,8 @@ class DashboardModule(PgAdminModule): isPrivate=False, limit=1, isIframe=False, - canHide=True).__dict__ + canHide=True + ).__dict__ ] def register_preferences(self): @@ -70,8 +71,9 @@ class DashboardModule(PgAdminModule): Register preferences for this module. """ # Register options for the PG and PPAS help paths - self.dashboard_preference = Preferences('dashboards', - gettext('Dashboards')) + self.dashboard_preference = Preferences( + 'dashboards', gettext('Dashboards') + ) self.session_stats_refresh = self.dashboard_preference.register( 'dashboards', 'session_stats_refresh', @@ -149,6 +151,7 @@ class DashboardModule(PgAdminModule): 'dashboard.get_config_by_server_id', ] + blueprint = DashboardModule(MODULE_NAME, __name__) @@ -168,11 +171,13 @@ def check_precondition(f): kwargs['sid'] ) + stats_type = ('activity', 'prepared', 'locks', 'config') + # Below check handle the case where existing server is deleted # by user and python server will raise exception if this check # is not introduce. if g.manager is None: - if f.__name__ in ['activity', 'prepared', 'locks', 'config']: + if f.__name__ in stats_type: return precondition_required( gettext("Please connect to the selected server" " to view the table.") @@ -187,7 +192,7 @@ def check_precondition(f): # If DB not connected then return error to browser if not g.conn.connected(): - if f.__name__ in ['activity', 'prepared', 'locks', 'config']: + if f.__name__ in stats_type: return precondition_required( gettext("Please connect to the selected server" " to view the table.") @@ -202,7 +207,7 @@ def check_precondition(f): db_conn = g.manager.connection(did=kwargs['did']) # If the selected DB not connected then return error to browser if not db_conn.connected(): - if f.__name__ in ['activity', 'prepared', 'locks', 'config']: + if f.__name__ in stats_type: return precondition_required( gettext("Please connect to the selected database" " to view the table.") @@ -220,8 +225,7 @@ def check_precondition(f): # Include server_type in template_path when server_type is gpdb g.template_path = 'dashboard/sql/' + ( '#{0}#{1}#'.format(g.server_type, g.version) - if g.server_type == 'gpdb' else - '#{0}#'.format(g.version) + if g.server_type == 'gpdb' else '#{0}#'.format(g.version) ) return f(*args, **kwargs) @@ -233,10 +237,14 @@ def check_precondition(f): @login_required def script(): """render the required javascript""" - return Response(response=render_template("dashboard/js/dashboard.js", - _=gettext), - status=200, - mimetype="application/javascript") + return Response( + response=render_template( + "dashboard/js/dashboard.js", + _=gettext + ), + status=200, + mimetype="application/javascript" + ) @blueprint.route('/', endpoint='index') @@ -283,16 +291,20 @@ def index(sid=None, did=None): if sid is None and did is None: return render_template('/dashboard/welcome_dashboard.html') if did is None: - return render_template('/dashboard/server_dashboard.html', - sid=sid, - rates=rates, - version=g.version) + return render_template( + '/dashboard/server_dashboard.html', + sid=sid, + rates=rates, + version=g.version + ) else: - return render_template('/dashboard/database_dashboard.html', - sid=sid, - did=did, - rates=rates, - version=g.version) + return render_template( + '/dashboard/database_dashboard.html', + sid=sid, + did=did, + rates=rates, + version=g.version + ) def get_data(sid, did, template): @@ -327,7 +339,8 @@ def get_data(sid, did, template): @blueprint.route('/session_stats/', endpoint='session_stats') @blueprint.route( - '/session_stats/', endpoint='get_session_stats_by_sever_id') + '/session_stats/', endpoint='get_session_stats_by_sever_id' +) @blueprint.route( '/session_stats//', endpoint='get_session_stats_by_database_id' @@ -362,7 +375,8 @@ def tps_stats(sid=None, did=None): @blueprint.route('/ti_stats/', endpoint='ti_stats') @blueprint.route('/ti_stats/', endpoint='ti_stats_by_server_id') @blueprint.route( - '/ti_stats//', endpoint='ti_stats_by_database_id') + '/ti_stats//', endpoint='ti_stats_by_database_id' +) @login_required @check_precondition def ti_stats(sid=None, did=None): @@ -377,7 +391,8 @@ def ti_stats(sid=None, did=None): @blueprint.route('/to_stats/', endpoint='to_stats') @blueprint.route('/to_stats/', endpoint='to_stats_by_server_id') @blueprint.route( - '/to_stats//', endpoint='to_stats_by_database_id') + '/to_stats//', endpoint='to_stats_by_database_id' +) @login_required @check_precondition def to_stats(sid=None, did=None): @@ -392,7 +407,8 @@ def to_stats(sid=None, did=None): @blueprint.route('/bio_stats/', endpoint='bio_stats') @blueprint.route('/bio_stats/', endpoint='bio_stats_by_server_id') @blueprint.route( - '/bio_stats//', endpoint='bio_stats_by_database_id') + '/bio_stats//', endpoint='bio_stats_by_database_id' +) @login_required @check_precondition def bio_stats(sid=None, did=None): @@ -407,7 +423,8 @@ def bio_stats(sid=None, did=None): @blueprint.route('/activity/', endpoint='activity') @blueprint.route('/activity/', endpoint='get_activity_by_server_id') @blueprint.route( - '/activity//', endpoint='get_activity_by_database_id') + '/activity//', endpoint='get_activity_by_database_id' +) @login_required @check_precondition def activity(sid=None, did=None): @@ -422,7 +439,8 @@ def activity(sid=None, did=None): @blueprint.route('/locks/', endpoint='locks') @blueprint.route('/locks/', endpoint='get_locks_by_server_id') @blueprint.route( - '/locks//', endpoint='get_locks_by_database_id') + '/locks//', endpoint='get_locks_by_database_id' +) @login_required @check_precondition def locks(sid=None, did=None): @@ -437,7 +455,8 @@ def locks(sid=None, did=None): @blueprint.route('/prepared/', endpoint='prepared') @blueprint.route('/prepared/', endpoint='get_prepared_by_server_id') @blueprint.route( - '/prepared//', endpoint='get_prepared_by_database_id') + '/prepared//', endpoint='get_prepared_by_database_id' +) @login_required @check_precondition def prepared(sid=None, did=None): @@ -488,6 +507,7 @@ def cancel_query(sid=None, did=None, pid=None): status=200 ) + @blueprint.route( '/terminate_session//', methods=['DELETE'] ) diff --git a/web/pgadmin/feature_tests/copy_selected_query_results_feature_test.py b/web/pgadmin/feature_tests/copy_selected_query_results_feature_test.py index 04eee99..e26e993 100644 --- a/web/pgadmin/feature_tests/copy_selected_query_results_feature_test.py +++ b/web/pgadmin/feature_tests/copy_selected_query_results_feature_test.py @@ -26,15 +26,18 @@ class CopySelectedQueryResultsFeatureTest(BaseFeatureTest): ] def before(self): - connection = test_utils.get_db_connection(self.server['db'], - self.server['username'], - self.server['db_password'], - self.server['host'], - self.server['port'], - self.server['sslmode']) + connection = test_utils.get_db_connection( + self.server['db'], + self.server['username'], + self.server['db_password'], + self.server['host'], + self.server['port'], + self.server['sslmode'] + ) test_utils.drop_database(connection, "acceptance_test_db") test_utils.create_database(self.server, "acceptance_test_db") - test_utils.create_table(self.server, "acceptance_test_db", "test_table") + test_utils.create_table( + self.server, "acceptance_test_db", "test_table") self.page.add_server(self.server) def runTest(self): @@ -62,7 +65,8 @@ class CopySelectedQueryResultsFeatureTest(BaseFeatureTest): def _copies_rows(self): pyperclip.copy("old clipboard contents") - self.page.find_by_xpath("//*[contains(@class, 'slick-row')]/*[1]").click() + self.page.find_by_xpath( + "//*[contains(@class, 'slick-row')]/*[1]").click() self.page.find_by_xpath("//*[@id='btn-copy-row']").click() @@ -71,7 +75,10 @@ class CopySelectedQueryResultsFeatureTest(BaseFeatureTest): def _copies_columns(self): pyperclip.copy("old clipboard contents") - self.page.find_by_xpath("//*[@data-test='output-column-header' and contains(., 'some_column')]").click() + self.page.find_by_xpath( + "//*[@data-test='output-column-header' and " + "contains(., 'some_column')]" + ).click() self.page.find_by_xpath("//*[@id='btn-copy-row']").click() self.assertEqual( @@ -82,18 +89,24 @@ class CopySelectedQueryResultsFeatureTest(BaseFeatureTest): def _copies_row_using_keyboard_shortcut(self): pyperclip.copy("old clipboard contents") - self.page.find_by_xpath("//*[contains(@class, 'slick-row')]/*[1]").click() + self.page.find_by_xpath( + "//*[contains(@class, 'slick-row')]/*[1]").click() - ActionChains(self.page.driver).key_down(Keys.CONTROL).send_keys('c').key_up(Keys.CONTROL).perform() + ActionChains(self.page.driver).key_down( + Keys.CONTROL).send_keys('c').key_up(Keys.CONTROL).perform() self.assertEqual('"Some-Name"\t"6"\t"some info"', pyperclip.paste()) def _copies_column_using_keyboard_shortcut(self): pyperclip.copy("old clipboard contents") - self.page.find_by_xpath("//*[@data-test='output-column-header' and contains(., 'some_column')]").click() + self.page.find_by_xpath( + "//*[@data-test='output-column-header' and " + "contains(., 'some_column')]" + ).click() - ActionChains(self.page.driver).key_down(Keys.CONTROL).send_keys('c').key_up(Keys.CONTROL).perform() + ActionChains(self.page.driver).key_down( + Keys.CONTROL).send_keys('c').key_up(Keys.CONTROL).perform() self.assertEqual( """\"Some-Name" @@ -105,12 +118,21 @@ class CopySelectedQueryResultsFeatureTest(BaseFeatureTest): pyperclip.copy("old clipboard contents") top_left_cell = self.page.find_by_xpath( - "//div[contains(@class, 'slick-cell') and contains(., 'Some-Other-Name')]") - bottom_right_cell = self.page.find_by_xpath("//div[contains(@class, 'slick-cell') and contains(., '14')]") + "//div[contains(@class, 'slick-cell') and " + "contains(., 'Some-Other-Name')]" + ) + bottom_right_cell = self.page.find_by_xpath( + "//div[contains(@class, 'slick-cell') and contains(., '14')]") + + ActionChains( + self.page.driver + ).click_and_hold(top_left_cell).move_to_element( + bottom_right_cell + ).release(bottom_right_cell).perform() - ActionChains(self.page.driver).click_and_hold(top_left_cell).move_to_element(bottom_right_cell) \ - .release(bottom_right_cell).perform() - ActionChains(self.page.driver).key_down(Keys.CONTROL).send_keys('c').key_up(Keys.CONTROL).perform() + ActionChains( + self.page.driver + ).key_down(Keys.CONTROL).send_keys('c').key_up(Keys.CONTROL).perform() self.assertEqual("""\"Some-Other-Name"\t"22" "Yet-Another-Name"\t"14\"""", pyperclip.paste()) @@ -119,14 +141,24 @@ class CopySelectedQueryResultsFeatureTest(BaseFeatureTest): pyperclip.copy("old clipboard contents") top_left_cell = self.page.find_by_xpath( - "//div[contains(@class, 'slick-cell') and contains(., 'Some-Other-Name')]") + "//div[contains(@class, 'slick-cell') and " + "contains(., 'Some-Other-Name')]" + ) initial_bottom_right_cell = self.page.find_by_xpath( "//div[contains(@class, 'slick-cell') and contains(., '14')]") - ActionChains(self.page.driver).click_and_hold(top_left_cell).move_to_element(initial_bottom_right_cell) \ - .release(initial_bottom_right_cell).perform() + ActionChains( + self.page.driver + ).click_and_hold(top_left_cell).move_to_element( + initial_bottom_right_cell + ).release(initial_bottom_right_cell).perform() - ActionChains(self.page.driver).key_down(Keys.SHIFT).send_keys(Keys.ARROW_RIGHT).key_up(Keys.SHIFT).perform() - ActionChains(self.page.driver).key_down(Keys.CONTROL).send_keys('c').key_up(Keys.CONTROL).perform() + ActionChains(self.page.driver).key_down(Keys.SHIFT).send_keys( + Keys.ARROW_RIGHT + ).key_up(Keys.SHIFT).perform() + + ActionChains(self.page.driver).key_down( + Keys.CONTROL + ).send_keys('c').key_up(Keys.CONTROL).perform() self.assertEqual("""\"Some-Other-Name"\t"22"\t"some other info" "Yet-Another-Name"\t"14"\t"cool info\"""", pyperclip.paste()) @@ -134,11 +166,16 @@ class CopySelectedQueryResultsFeatureTest(BaseFeatureTest): def _shift_resizes_column_selection(self): pyperclip.copy("old clipboard contents") - self.page.find_by_xpath("//*[@data-test='output-column-header' and contains(., 'value')]").click() - ActionChains(self.page.driver).key_down(Keys.SHIFT).send_keys(Keys.ARROW_LEFT) \ - .key_up(Keys.SHIFT).perform() + self.page.find_by_xpath( + "//*[@data-test='output-column-header' and " + "contains(., 'value')]" + ).click() + + ActionChains(self.page.driver).key_down( + Keys.SHIFT).send_keys(Keys.ARROW_LEFT).key_up(Keys.SHIFT).perform() - ActionChains(self.page.driver).key_down(Keys.CONTROL).send_keys('c').key_up(Keys.CONTROL).perform() + ActionChains(self.page.driver).key_down( + Keys.CONTROL).send_keys('c').key_up(Keys.CONTROL).perform() self.assertEqual( """\"Some-Name"\t"6" @@ -150,7 +187,9 @@ class CopySelectedQueryResultsFeatureTest(BaseFeatureTest): pyperclip.copy("old clipboard contents") bottom_right_cell = self.page.find_by_xpath( - "//div[contains(@class, 'slick-cell') and contains(., 'cool info')]") + "//div[contains(@class, 'slick-cell') and " + "contains(., 'cool info')]" + ) load_button = self.page.find_by_xpath("//button[@id='btn-load-file']") ActionChains(self.page.driver).click_and_hold(bottom_right_cell) \ @@ -158,7 +197,8 @@ class CopySelectedQueryResultsFeatureTest(BaseFeatureTest): .release(load_button) \ .perform() - ActionChains(self.page.driver).key_down(Keys.CONTROL).send_keys('c').key_up(Keys.CONTROL).perform() + ActionChains(self.page.driver).key_down( + Keys.CONTROL).send_keys('c').key_up(Keys.CONTROL).perform() self.assertEqual('"cool info"', pyperclip.paste()) @@ -166,10 +206,12 @@ class CopySelectedQueryResultsFeatureTest(BaseFeatureTest): self.page.close_query_tool() self.page.remove_server(self.server) - connection = test_utils.get_db_connection(self.server['db'], - self.server['username'], - self.server['db_password'], - self.server['host'], - self.server['port'], - self.server['sslmode']) + connection = test_utils.get_db_connection( + self.server['db'], + self.server['username'], + self.server['db_password'], + self.server['host'], + self.server['port'], + self.server['sslmode'] + ) test_utils.drop_database(connection, "acceptance_test_db") diff --git a/web/pgadmin/feature_tests/pg_datatype_validation_test.py b/web/pgadmin/feature_tests/pg_datatype_validation_test.py index c12b181..011d576 100644 --- a/web/pgadmin/feature_tests/pg_datatype_validation_test.py +++ b/web/pgadmin/feature_tests/pg_datatype_validation_test.py @@ -25,7 +25,8 @@ try: with open(CURRENT_PATH + '/datatype_test.json') as data_file: test_data_configuration = json.load(data_file) config_data = test_data_configuration['tests'] - type_minimum_version = test_data_configuration['datatype_minimum_version'] + type_minimum_version = \ + test_data_configuration['datatype_minimum_version'] except Exception as e: print(str(e)) @@ -41,12 +42,14 @@ class PGDataypeFeatureTest(BaseFeatureTest): ] def before(self): - connection = test_utils.get_db_connection(self.server['db'], - self.server['username'], - self.server['db_password'], - self.server['host'], - self.server['port'], - self.server['sslmode']) + connection = test_utils.get_db_connection( + self.server['db'], + self.server['username'], + self.server['db_password'], + self.server['host'], + self.server['port'], + self.server['sslmode'] + ) self.timezone = int(test_utils.get_timezone_without_dst(connection)) @@ -82,17 +85,20 @@ class PGDataypeFeatureTest(BaseFeatureTest): (By.XPATH, "//*[contains(string(), 'Show system objects?')]")) ) - self.page.find_by_css_selector(".ajs-dialog.pg-el-container .ajs-maximize").click() + self.page.find_by_css_selector( + ".ajs-dialog.pg-el-container .ajs-maximize").click() sql_editor = self.page.find_by_xpath( "//*[contains(@class,'aciTreeLi') and contains(.,'SQL Editor')]") sql_editor.find_element_by_xpath( - "//*[contains(@class,'aciTreeText') and contains(.,'Options')]")\ - .click() + "//*[contains(@class,'aciTreeText') and contains(.,'Options')]" + ).click() - insert_bracket_pairs_control= self.page.find_by_xpath( - "//div[contains(@class,'pgadmin-control-group') and contains(.,'Insert bracket pairs?')]") + insert_bracket_pairs_control = self.page.find_by_xpath( + "//div[contains(@class,'pgadmin-control-group') and " + "contains(.,'Insert bracket pairs?')]" + ) switch_btn = insert_bracket_pairs_control.\ find_element_by_class_name('bootstrap-switch') @@ -103,7 +109,9 @@ class PGDataypeFeatureTest(BaseFeatureTest): # save and close the preference dialog. self.page.find_by_xpath( - "//*[contains(@class,'pg-alertify-button') and contains(.,'OK')]").click() + "//*[contains(@class,'pg-alertify-button') and " + "contains(.,'OK')]" + ).click() self.page.wait_for_element_to_disappear( lambda driver: driver.find_element_by_css_selector(".ajs-modal") @@ -129,12 +137,14 @@ class PGDataypeFeatureTest(BaseFeatureTest): def after(self): self.page.remove_server(self.server) - connection = test_utils.get_db_connection(self.server['db'], - self.server['username'], - self.server['db_password'], - self.server['host'], - self.server['port'], - self.server['sslmode']) + connection = test_utils.get_db_connection( + self.server['db'], + self.server['username'], + self.server['db_password'], + self.server['host'], + self.server['port'], + self.server['sslmode'] + ) test_utils.drop_database(connection, "acceptance_test_db") def _schema_node_expandable(self): @@ -155,9 +165,10 @@ class PGDataypeFeatureTest(BaseFeatureTest): wait.until(EC.presence_of_element_located( (By.XPATH, - "//*[contains(@class,'column-type') and contains(.,'{}')]".format(batch['datatype'][0]) - )) - ) + "//*[contains(@class,'column-type') and " + "contains(.,'{}')]".format(batch['datatype'][0]) + ) + )) canvas = wait.until(EC.presence_of_element_located( (By.CSS_SELECTOR, "#datagrid .slick-viewport .grid-canvas")) @@ -168,10 +179,12 @@ class PGDataypeFeatureTest(BaseFeatureTest): cells = canvas.find_elements_by_css_selector('.slick-cell') # remove first element as it is row number. cells.pop(0) - for val, cell, datatype in zip(batch['output'], cells, batch['datatype']): + for val, cell, datatype in zip( + batch['output'], cells, batch['datatype']): expected_output = batch['output'][cnt - 2] - if not self._is_datatype_available_in_current_database(datatype): + if not self._is_datatype_available_in_current_database( + datatype): cnt += 1 continue @@ -212,7 +225,7 @@ class PGDataypeFeatureTest(BaseFeatureTest): if first: query += dataformatter.format(inputdata, datatype) else: - query += ','+dataformatter.format(inputdata, datatype) + query += ',' + dataformatter.format(inputdata, datatype) first = False return query + ';' @@ -236,4 +249,5 @@ class PGDataypeFeatureTest(BaseFeatureTest): self.page.click_modal('Yes') def _is_datatype_available_in_current_database(self, datatype): - return datatype == '' or self.database_version >= type_minimum_version[datatype] + valid_version = self.database_version >= type_minimum_version[datatype] + return datatype == '' or valid_version diff --git a/web/pgadmin/feature_tests/query_tool_journey_test.py b/web/pgadmin/feature_tests/query_tool_journey_test.py index 1e26b5a..ec12043 100644 --- a/web/pgadmin/feature_tests/query_tool_journey_test.py +++ b/web/pgadmin/feature_tests/query_tool_journey_test.py @@ -26,14 +26,17 @@ class QueryToolJourneyTest(BaseFeatureTest): ] def before(self): - connection = test_utils.get_db_connection(self.server['db'], - self.server['username'], - self.server['db_password'], - self.server['host'], - self.server['port']) + connection = test_utils.get_db_connection( + self.server['db'], + self.server['username'], + self.server['db_password'], + self.server['host'], + self.server['port'] + ) test_utils.drop_database(connection, "acceptance_test_db") test_utils.create_database(self.server, "acceptance_test_db") - test_utils.create_table(self.server, "acceptance_test_db", "test_table") + test_utils.create_table( + self.server, "acceptance_test_db", "test_table") self.page.add_server(self.server) def runTest(self): @@ -50,8 +53,10 @@ class QueryToolJourneyTest(BaseFeatureTest): def _test_copies_rows(self): pyperclip.copy("old clipboard contents") self.page.driver.switch_to.default_content() - self.page.driver.switch_to_frame(self.page.driver.find_element_by_tag_name("iframe")) - self.page.find_by_xpath("//*[contains(@class, 'slick-row')]/*[1]").click() + self.page.driver.switch_to_frame( + self.page.driver.find_element_by_tag_name("iframe")) + self.page.find_by_xpath( + "//*[contains(@class, 'slick-row')]/*[1]").click() self.page.find_by_xpath("//*[@id='btn-copy-row']").click() self.assertEqual('"Some-Name"\t"6"\t"some info"', @@ -61,8 +66,12 @@ class QueryToolJourneyTest(BaseFeatureTest): pyperclip.copy("old clipboard contents") self.page.driver.switch_to.default_content() - self.page.driver.switch_to_frame(self.page.driver.find_element_by_tag_name("iframe")) - self.page.find_by_xpath("//*[@data-test='output-column-header' and contains(., 'some_column')]").click() + self.page.driver.switch_to_frame( + self.page.driver.find_element_by_tag_name("iframe")) + self.page.find_by_xpath( + "//*[@data-test='output-column-header' and " + "contains(., 'some_column')]" + ).click() self.page.find_by_xpath("//*[@id='btn-copy-row']").click() self.assertTrue('"Some-Name"' in pyperclip.paste()) @@ -76,22 +85,32 @@ class QueryToolJourneyTest(BaseFeatureTest): self._execute_query("SELECT * FROM table_that_doesnt_exist") self.page.click_tab("Query History") - selected_history_entry = self.page.find_by_css_selector("#query_list .selected") - self.assertIn("SELECT * FROM table_that_doesnt_exist", selected_history_entry.text) + selected_history_entry = self.page.find_by_css_selector( + "#query_list .selected") + self.assertIn("SELECT * FROM table_that_doesnt_exist", + selected_history_entry.text) failed_history_detail_pane = self.page.find_by_id("query_detail") - self.assertIn("Error Message relation \"table_that_doesnt_exist\" does not exist", failed_history_detail_pane.text) + self.assertIn( + "Error Message relation \"table_that_doesnt_exist\" " + "does not exist", failed_history_detail_pane.text + ) ActionChains(self.page.driver) \ .send_keys(Keys.ARROW_DOWN) \ .perform() - selected_history_entry = self.page.find_by_css_selector("#query_list .selected") - self.assertIn("SELECT * FROM test_table ORDER BY value", selected_history_entry.text) + selected_history_entry = self.page.find_by_css_selector( + "#query_list .selected") + self.assertIn("SELECT * FROM test_table ORDER BY value", + selected_history_entry.text) selected_history_detail_pane = self.page.find_by_id("query_detail") - self.assertIn("SELECT * FROM test_table ORDER BY value", selected_history_detail_pane.text) - newly_selected_history_entry = self.page.find_by_xpath("//*[@id='query_list']/ul/li[2]") + self.assertIn("SELECT * FROM test_table ORDER BY value", + selected_history_detail_pane.text) + newly_selected_history_entry = self.page.find_by_xpath( + "//*[@id='query_list']/ul/li[2]") self.page.click_element(newly_selected_history_entry) selected_history_detail_pane = self.page.find_by_id("query_detail") - self.assertIn("SELECT * FROM table_that_doesnt_exist", selected_history_detail_pane.text) + self.assertIn("SELECT * FROM table_that_doesnt_exist", + selected_history_detail_pane.text) self.__clear_query_tool() @@ -104,7 +123,8 @@ class QueryToolJourneyTest(BaseFeatureTest): self.page.click_tab("Query History") - query_we_need_to_scroll_to = self.page.find_by_xpath("//*[@id='query_list']/ul/li[17]") + query_we_need_to_scroll_to = self.page.find_by_xpath( + "//*[@id='query_list']/ul/li[17]") self.page.click_element(query_we_need_to_scroll_to) diff --git a/web/pgadmin/feature_tests/query_tool_tests.py b/web/pgadmin/feature_tests/query_tool_tests.py index 63c29d0..a0b3713 100644 --- a/web/pgadmin/feature_tests/query_tool_tests.py +++ b/web/pgadmin/feature_tests/query_tool_tests.py @@ -29,12 +29,14 @@ class QueryToolFeatureTest(BaseFeatureTest): ] def before(self): - connection = test_utils.get_db_connection(self.server['db'], - self.server['username'], - self.server['db_password'], - self.server['host'], - self.server['port'], - self.server['sslmode']) + connection = test_utils.get_db_connection( + self.server['db'], + self.server['username'], + self.server['db_password'], + self.server['host'], + self.server['port'], + self.server['sslmode'] + ) test_utils.drop_database(connection, "acceptance_test_db") test_utils.create_database(self.server, "acceptance_test_db") self.page.wait_for_spinner_to_disappear() @@ -96,12 +98,14 @@ class QueryToolFeatureTest(BaseFeatureTest): def after(self): self.page.remove_server(self.server) - connection = test_utils.get_db_connection(self.server['db'], - self.server['username'], - self.server['db_password'], - self.server['host'], - self.server['port'], - self.server['sslmode']) + connection = test_utils.get_db_connection( + self.server['db'], + self.server['username'], + self.server['db_password'], + self.server['host'], + self.server['port'], + self.server['sslmode'] + ) test_utils.drop_database(connection, "acceptance_test_db") def _reset_options(self): @@ -152,7 +156,9 @@ class QueryToolFeatureTest(BaseFeatureTest): ON_DEMAND_CHUNKS = 2 row_id_to_find = config.ON_DEMAND_RECORD_COUNT * ON_DEMAND_CHUNKS - query = """-- On demand query result on scroll, grid select all, column select all + query = """-- On demand query result on scroll +-- Grid select all +-- Column select all SELECT generate_series(1, {}) as id1, 'dummy' as id2""".format( config.ON_DEMAND_RECORD_COUNT * ON_DEMAND_CHUNKS) @@ -172,7 +178,8 @@ SELECT generate_series(1, {}) as id1, 'dummy' as id2""".format( # scroll to bottom to fetch next chunk of result set. self.driver.execute_script( - "pgAdmin.SqlEditor.jquery('.slick-viewport').scrollTop(pgAdmin.SqlEditor.jquery('.grid-canvas').height());" + "pgAdmin.SqlEditor.jquery('.slick-viewport')" + ".scrollTop(pgAdmin.SqlEditor.jquery('.grid-canvas').height());" ) canvas = wait.until(EC.presence_of_element_located( @@ -214,9 +221,10 @@ SELECT generate_series(1, {}) as id1, 'dummy' as id2""".format( # click on first data column to select all column. wait.until(EC.presence_of_element_located( - ( - By.XPATH, - "//span[contains(@class, 'column-name') and contains(., 'id1')]")) + ( + By.XPATH, + "//span[contains(@class, 'column-name') " + "and contains(., 'id1')]")) ).click() canvas = wait.until(EC.presence_of_element_located( @@ -228,7 +236,8 @@ SELECT generate_series(1, {}) as id1, 'dummy' as id2""".format( def _check_ondemand_result(self, row_id_to_find, canvas): # scroll to bottom to bring last row of next chunk in viewport. self.driver.execute_script( - "pgAdmin.SqlEditor.jquery('.slick-viewport').scrollTop(pgAdmin.SqlEditor.jquery('.grid-canvas').height());" + "pgAdmin.SqlEditor.jquery('.slick-viewport')" + ".scrollTop(pgAdmin.SqlEditor.jquery('.grid-canvas').height());" ) canvas.find_element_by_xpath( @@ -325,7 +334,8 @@ CREATE TABLE public.{}();""".format(table_name) self.page.wait_for_query_tool_loading_indicator_to_disappear() self.page.click_tab('Messages') self.page.find_by_xpath( - '//div[contains(@class, "sql-editor-message") and contains(string(), "CREATE TABLE")]' + '//div[contains(@class, "sql-editor-message") and ' + 'contains(string(), "CREATE TABLE")]' ) self._clear_query_tool() @@ -339,7 +349,8 @@ ROLLBACK;""" self.page.wait_for_query_tool_loading_indicator_to_disappear() self.page.click_tab('Messages') self.page.find_by_xpath( - '//div[contains(@class, "sql-editor-message") and contains(string(), "ROLLBACK")]' + '//div[contains(@class, "sql-editor-message") and ' + 'contains(string(), "ROLLBACK")]' ) self._clear_query_tool() @@ -347,7 +358,8 @@ ROLLBACK;""" -- 2. (Done) Create table in public schema. -- 3. (Done) ROLLBACK transaction. -- 4. Check if table is *NOT* created. -SELECT relname FROM pg_class WHERE relkind IN ('r','s','t') and relnamespace = 2200::oid;""" +SELECT relname FROM pg_class + WHERE relkind IN ('r','s','t') and relnamespace = 2200::oid;""" self.page.fill_codemirror_area_with(query) self.page.find_by_id("btn-flash").click() self.page.wait_for_query_tool_loading_indicator_to_disappear() @@ -355,9 +367,14 @@ SELECT relname FROM pg_class WHERE relkind IN ('r','s','t') and relnamespace = 2 canvas = wait.until(EC.presence_of_element_located( (By.CSS_SELECTOR, "#datagrid .slick-viewport .grid-canvas"))) - el = canvas.find_elements_by_xpath("//div[contains(@class, 'slick-cell') and contains(text(), '{}')]".format(table_name)) + el = canvas.find_elements_by_xpath( + "//div[contains(@class, 'slick-cell') and " + "contains(text(), '{}')]".format(table_name)) - assert len(el) == 0, "Table '{}' created with auto commit disabled and without any explicit commit.".format(table_name) + assert len(el) == 0, "Table '{}' created with auto commit disabled " \ + "and without any explicit commit.".format( + table_name + ) def _query_tool_auto_commit_enabled(self): @@ -400,7 +417,8 @@ CREATE TABLE public.{}();""".format(table_name) self.page.wait_for_query_tool_loading_indicator_to_disappear() self.page.click_tab('Messages') self.page.find_by_xpath( - '//div[contains(@class, "sql-editor-message") and contains(string(), "CREATE TABLE")]' + '//div[contains(@class, "sql-editor-message") and ' + 'contains(string(), "CREATE TABLE")]' ) self._clear_query_tool() @@ -415,7 +433,8 @@ ROLLBACK;""" self.page.wait_for_query_tool_loading_indicator_to_disappear() self.page.click_tab('Messages') self.page.find_by_xpath( - '//div[contains(@class, "sql-editor-message") and contains(string(), "ROLLBACK")]' + '//div[contains(@class, "sql-editor-message") and ' + 'contains(string(), "ROLLBACK")]' ) self._clear_query_tool() @@ -424,7 +443,8 @@ ROLLBACK;""" -- 3. (Done) Create table in public schema. -- 4. (Done) ROLLBACK transaction -- 5. Check if table is created event after ROLLBACK. -SELECT relname FROM pg_class WHERE relkind IN ('r','s','t') and relnamespace = 2200::oid;""" +SELECT relname FROM pg_class + WHERE relkind IN ('r','s','t') and relnamespace = 2200::oid;""" self.page.fill_codemirror_area_with(query) self.page.find_by_id("btn-flash").click() self.page.click_tab('Data Output') @@ -433,9 +453,12 @@ SELECT relname FROM pg_class WHERE relkind IN ('r','s','t') and relnamespace = 2 canvas = wait.until(EC.presence_of_element_located( (By.CSS_SELECTOR, "#datagrid .slick-viewport .grid-canvas"))) - el = canvas.find_elements_by_xpath("//div[contains(@class, 'slick-cell') and contains(text(), '{}')]".format(table_name)) + el = canvas.find_elements_by_xpath( + "//div[contains(@class, 'slick-cell') and " + "contains(text(), '{}')]".format(table_name)) - assert len(el) != 0, "Table '{}' is not created with auto commit enabled.".format(table_name) + assert len(el) != 0, "Table '{}' is not created with auto " \ + "commit enabled.".format(table_name) def _query_tool_auto_rollback_enabled(self): table_name = 'query_tool_auto_rollback_enabled_table' @@ -473,7 +496,8 @@ CREATE TABLE public.{}();""".format(table_name) self.page.wait_for_query_tool_loading_indicator_to_disappear() self.page.click_tab('Messages') self.page.find_by_xpath( - '//div[contains(@class, "sql-editor-message") and contains(string(), "CREATE TABLE")]' + '//div[contains(@class, "sql-editor-message") and ' + 'contains(string(), "CREATE TABLE")]' ) self._clear_query_tool() @@ -489,7 +513,8 @@ SELECT 1/0;""" self.page.wait_for_query_tool_loading_indicator_to_disappear() self.page.click_tab('Messages') self.page.find_by_xpath( - '//div[contains(@class, "sql-editor-message") and contains(string(), "division by zero")]' + '//div[contains(@class, "sql-editor-message") and ' + 'contains(string(), "division by zero")]' ) self._clear_query_tool() @@ -506,7 +531,8 @@ END;""" self.page.wait_for_query_tool_loading_indicator_to_disappear() self.page.click_tab('Messages') self.page.find_by_xpath( - '//div[contains(@class, "sql-editor-message") and contains(string(), "Query returned successfully")]' + '//div[contains(@class, "sql-editor-message") and ' + 'contains(string(), "Query returned successfully")]' ) self._clear_query_tool() @@ -516,7 +542,8 @@ END;""" -- 4. (Done) Generate error in transaction. -- 5. (Done) END transaction. -- 6. Check if table is *NOT* created after ending transaction. -SELECT relname FROM pg_class WHERE relkind IN ('r','s','t') and relnamespace = 2200::oid;""" +SELECT relname FROM pg_class + WHERE relkind IN ('r','s','t') and relnamespace = 2200::oid;""" self.page.fill_codemirror_area_with(query) self.page.find_by_id("btn-flash").click() self.page.wait_for_query_tool_loading_indicator_to_disappear() @@ -524,9 +551,12 @@ SELECT relname FROM pg_class WHERE relkind IN ('r','s','t') and relnamespace = 2 canvas = wait.until(EC.presence_of_element_located( (By.CSS_SELECTOR, "#datagrid .slick-viewport .grid-canvas"))) - el = canvas.find_elements_by_xpath("//div[contains(@class, 'slick-cell') and contains(text(), '{}')]".format(table_name)) + el = canvas.find_elements_by_xpath( + "//div[contains(@class, 'slick-cell') and " + "contains(text(), '{}')]".format(table_name)) - assert len(el) == 0, "Table '{}' created even after ROLLBACK due to sql error.".format(table_name) + assert len(el) == 0, "Table '{}' created even after ROLLBACK due to " \ + "sql error.".format(table_name) def _query_tool_cancel_query(self): query = """-- 1. END any open transaction. @@ -548,8 +578,8 @@ SELECT 1, pg_sleep(300)""" # if auto rollback is disabled then 'i' element will # have 'auto-rollback fa fa-check visibility-hidden' classes - if 'auto-rollback fa fa-check' == str(auto_rollback_check.get_attribute( - 'class')): + if 'auto-rollback fa fa-check' == str( + auto_rollback_check.get_attribute('class')): auto_rollback_btn.click() auto_commit_btn = self.page.find_by_id("btn-auto-commit") @@ -561,8 +591,8 @@ SELECT 1, pg_sleep(300)""" # if auto commit is disabled then 'i' element will # have 'auto-commit fa fa-check visibility-hidden' classes - if 'auto-commit fa fa-check visibility-hidden' == str(auto_commit_check.get_attribute( - 'class')): + if 'auto-commit fa fa-check visibility-hidden' == str( + auto_commit_check.get_attribute('class')): auto_commit_btn.click() self.page.find_by_id("btn-flash").click() @@ -571,14 +601,17 @@ SELECT 1, pg_sleep(300)""" self.page.wait_for_query_tool_loading_indicator_to_disappear() self.page.click_tab('Messages') self.page.find_by_xpath( - '//div[contains(@class, "sql-editor-message") and contains(string(), "canceling statement due to user request")]' + '//div[contains(@class, "sql-editor-message") and ' + 'contains(string(), "canceling statement due to user request")]' ) def _test_explain_plan_feature(self): - connection = test_utils.get_db_connection(self.server['db'], - self.server['username'], - self.server['db_password'], - self.server['host'], - self.server['port'], - self.server['sslmode']) + connection = test_utils.get_db_connection( + self.server['db'], + self.server['username'], + self.server['db_password'], + self.server['host'], + self.server['port'], + self.server['sslmode'] + ) return connection.server_version > 90100 diff --git a/web/pgadmin/feature_tests/table_ddl_feature_test.py b/web/pgadmin/feature_tests/table_ddl_feature_test.py index 401d76d..1876001 100644 --- a/web/pgadmin/feature_tests/table_ddl_feature_test.py +++ b/web/pgadmin/feature_tests/table_ddl_feature_test.py @@ -19,12 +19,14 @@ class TableDdlFeatureTest(BaseFeatureTest): ] def before(self): - connection = test_utils.get_db_connection(self.server['db'], - self.server['username'], - self.server['db_password'], - self.server['host'], - self.server['port'], - self.server['sslmode']) + connection = test_utils.get_db_connection( + self.server['db'], + self.server['username'], + self.server['db_password'], + self.server['host'], + self.server['port'], + self.server['sslmode'] + ) test_utils.drop_database(connection, "acceptance_test_db") test_utils.create_database(self.server, "acceptance_test_db") @@ -32,7 +34,8 @@ class TableDdlFeatureTest(BaseFeatureTest): self.page.add_server(self.server) def runTest(self): - test_utils.create_table(self.server, "acceptance_test_db", "test_table") + test_utils.create_table( + self.server, "acceptance_test_db", "test_table") self.page.toggle_open_server(self.server['name']) self.page.toggle_open_tree_item('Databases') @@ -44,14 +47,17 @@ class TableDdlFeatureTest(BaseFeatureTest): self.page.click_tab("SQL") self.page.find_by_xpath( - "//*[contains(@class,'CodeMirror-lines') and contains(.,'CREATE TABLE public.test_table')]") + "//*[contains(@class,'CodeMirror-lines') and " + "contains(.,'CREATE TABLE public.test_table')]") def after(self): self.page.remove_server(self.server) - connection = test_utils.get_db_connection(self.server['db'], - self.server['username'], - self.server['db_password'], - self.server['host'], - self.server['port'], - self.server['sslmode']) + connection = test_utils.get_db_connection( + self.server['db'], + self.server['username'], + self.server['db_password'], + self.server['host'], + self.server['port'], + self.server['sslmode'] + ) test_utils.drop_database(connection, "acceptance_test_db") diff --git a/web/pgadmin/feature_tests/view_data_dml_queries.py b/web/pgadmin/feature_tests/view_data_dml_queries.py index 52b8055..0423ec1 100644 --- a/web/pgadmin/feature_tests/view_data_dml_queries.py +++ b/web/pgadmin/feature_tests/view_data_dml_queries.py @@ -22,7 +22,8 @@ CURRENT_PATH = os.path.dirname(os.path.realpath(__file__)) try: with open(CURRENT_PATH + '/test_data.json') as data_file: - config_data = json.load(data_file)['table_insert_update_cases']['add_update'] + config_data = json.load(data_file)[ + 'table_insert_update_cases']['add_update'] except Exception as e: print(str(e)) @@ -44,8 +45,8 @@ class CheckForViewDataTest(BaseFeatureTest): """ scenarios = [ - ("Validate Insert, Update operations in View/Edit data with given test " - "data", + ("Validate Insert, Update operations in View/Edit data with " + "given test data", dict()) ] @@ -58,7 +59,8 @@ CREATE TABLE public.defaults id serial NOT NULL, number_defaults numeric(100) DEFAULT 1, number_null numeric(100), - text_defaults text COLLATE pg_catalog."default" DEFAULT 'Hello World'::text, + text_defaults text COLLATE pg_catalog."default" + DEFAULT 'Hello World'::text, text_null1 text COLLATE pg_catalog."default", text_null2 text COLLATE pg_catalog."default", text_null3 text COLLATE pg_catalog."default", @@ -82,14 +84,18 @@ CREATE TABLE public.defaults def before(self): with test_utils.Database(self.server) as (connection, _): if connection.server_version < 90100: - self.skipTest("COLLATE is not present in PG versions below v9.1") - - connection = test_utils.get_db_connection(self.server['db'], - self.server['username'], - self.server['db_password'], - self.server['host'], - self.server['port'], - self.server['sslmode']) + self.skipTest( + "COLLATE is not present in PG versions below v9.1" + ) + + connection = test_utils.get_db_connection( + self.server['db'], + self.server['username'], + self.server['db_password'], + self.server['host'], + self.server['port'], + self.server['sslmode'] + ) test_utils.drop_database(connection, "acceptance_test_db") test_utils.create_database(self.server, "acceptance_test_db") @@ -120,12 +126,14 @@ CREATE TABLE public.defaults def after(self): self.page.remove_server(self.server) - connection = test_utils.get_db_connection(self.server['db'], - self.server['username'], - self.server['db_password'], - self.server['host'], - self.server['port'], - self.server['sslmode']) + connection = test_utils.get_db_connection( + self.server['db'], + self.server['username'], + self.server['db_password'], + self.server['host'], + self.server['port'], + self.server['sslmode'] + ) test_utils.drop_database(connection, "acceptance_test_db") @staticmethod @@ -138,7 +146,7 @@ CREATE TABLE public.defaults xpath_grid_row = "//*[contains(@class, 'ui-widget-content') " \ "and contains(@style, 'top:25px')]" - xpath_row_cell = '//div[contains(@class, "'+cell+'")]' + xpath_row_cell = '//div[contains(@class, "' + cell + '")]' xpath_cell = '{0}{1}'.format(xpath_grid_row, xpath_row_cell) @@ -149,7 +157,7 @@ CREATE TABLE public.defaults wait = WebDriverWait(self.driver, 5) try: wait.until(EC.text_to_be_present_in_element( - (By.XPATH, xpath+"//span"), str(value)), + (By.XPATH, xpath + "//span"), str(value)), CheckForViewDataTest.TIMEOUT_STRING ) except Exception: @@ -192,16 +200,20 @@ CREATE TABLE public.defaults ActionChains(self.driver).send_keys(value).perform() # Click on editor's Save button - self.page.find_by_xpath("//*[contains(@class, 'pg_text_editor')]" - "//button[contains(@class, 'fa-save')]").click() + self.page.find_by_xpath( + "//*[contains(@class, 'pg_text_editor')]" + "//button[contains(@class, 'fa-save')]" + ).click() else: # Boolean editor test for to True click if data[1] == 'true': - checkbox_el = cell_el.find_element_by_xpath(".//*[contains(@class, 'multi-checkbox')]") + checkbox_el = cell_el.find_element_by_xpath( + ".//*[contains(@class, 'multi-checkbox')]") checkbox_el.click() # Boolean editor test for to False click elif data[1] == 'false': - checkbox_el = cell_el.find_element_by_xpath(".//*[contains(@class, 'multi-checkbox')]") + checkbox_el = cell_el.find_element_by_xpath( + ".//*[contains(@class, 'multi-checkbox')]") # Sets true checkbox_el.click() # Sets false @@ -217,10 +229,11 @@ CREATE TABLE public.defaults def _view_data_grid(self): self.page.driver.find_element_by_link_text("Object").click() - ActionChains(self.page.driver) \ - .move_to_element( - self.page.driver.find_element_by_link_text("View/Edit Data")) \ - .perform() + ActionChains( + self.page.driver + ).move_to_element( + self.page.driver.find_element_by_link_text("View/Edit Data") + ).perform() self.page.find_by_partial_link_text("All Rows").click() # wait until datagrid frame is loaded. @@ -274,7 +287,7 @@ CREATE TABLE public.defaults def _add_row(self): for idx in range(1, len(config_data.keys()) + 1): cell_xpath = CheckForViewDataTest._get_cell_xpath( - 'r'+str(idx), 1 + 'r' + str(idx), 1 ) time.sleep(0.2) self._update_cell(cell_xpath, config_data[str(idx)]) diff --git a/web/pgadmin/feature_tests/xss_checks_panels_and_query_tool_test.py b/web/pgadmin/feature_tests/xss_checks_panels_and_query_tool_test.py index 0ef0e69..4dc082c 100644 --- a/web/pgadmin/feature_tests/xss_checks_panels_and_query_tool_test.py +++ b/web/pgadmin/feature_tests/xss_checks_panels_and_query_tool_test.py @@ -31,21 +31,26 @@ class CheckForXssFeatureTest(BaseFeatureTest): ] def before(self): - connection = test_utils.get_db_connection(self.server['db'], - self.server['username'], - self.server['db_password'], - self.server['host'], - self.server['port'], - self.server['sslmode']) + connection = test_utils.get_db_connection( + self.server['db'], + self.server['username'], + self.server['db_password'], + self.server['host'], + self.server['port'], + self.server['sslmode'] + ) test_utils.drop_database(connection, "acceptance_test_db") test_utils.create_database(self.server, "acceptance_test_db") - test_utils.create_table(self.server, "acceptance_test_db", - "

X") + test_utils.create_table( + self.server, "acceptance_test_db", "

X" + ) # This is needed to test dependents tab (eg: BackGrid) - test_utils.create_constraint(self.server, "acceptance_test_db", - "

X", - "unique", "

Y") + test_utils.create_constraint( + self.server, "acceptance_test_db", + "

X", + "unique", "

Y" + ) def runTest(self): self.page.wait_for_spinner_to_disappear() @@ -62,12 +67,14 @@ class CheckForXssFeatureTest(BaseFeatureTest): def after(self): self.page.remove_server(self.server) - connection = test_utils.get_db_connection(self.server['db'], - self.server['username'], - self.server['db_password'], - self.server['host'], - self.server['port'], - self.server['sslmode']) + connection = test_utils.get_db_connection( + self.server['db'], + self.server['username'], + self.server['db_password'], + self.server['host'], + self.server['port'], + self.server['sslmode'] + ) test_utils.drop_database(connection, "acceptance_test_db") def _tables_node_expandable(self): @@ -106,7 +113,8 @@ class CheckForXssFeatureTest(BaseFeatureTest): self.page.click_tab("SQL") # Fetch the inner html & check for escaped characters source_code = self.page.find_by_xpath( - "//*[contains(@class,'CodeMirror-lines') and contains(.,'CREATE TABLE')]" + "//*[contains(@class,'CodeMirror-lines') and " + "contains(.,'CREATE TABLE')]" ).get_attribute('innerHTML') self._check_escaped_characters( @@ -115,11 +123,13 @@ class CheckForXssFeatureTest(BaseFeatureTest): "SQL tab (Code Mirror)" ) - def _check_xss_in_dependents_tab(self): # Create any constraint with xss name to test this + # Create any constraint with xss name to test this + def _check_xss_in_dependents_tab(self): self.page.click_tab("Dependents") source_code = self.page.find_by_xpath( - "//*[@id='5']/table/tbody/tr/td/div/div/div[2]/table/tbody/tr/td[2]" + "//*[@id='5']/table/tbody/tr/td/div/div/div[2]/" + "table/tbody/tr/td[2]" ).get_attribute('innerHTML') self._check_escaped_characters( @@ -138,7 +148,8 @@ class CheckForXssFeatureTest(BaseFeatureTest): self.page.find_by_id("btn-flash").click() result_row = self.page.find_by_xpath( - "//*[contains(@class, 'ui-widget-content') and contains(@style, 'top:0px')]" + "//*[contains(@class, 'ui-widget-content') and " + "contains(@style, 'top:0px')]" ) cells = result_row.find_elements_by_tag_name('div') @@ -154,4 +165,5 @@ class CheckForXssFeatureTest(BaseFeatureTest): def _check_escaped_characters(self, source_code, string_to_find, source): # For XSS we need to search against element's html code - assert source_code.find(string_to_find) != -1, "{0} might be vulnerable to XSS ".format(source) + assert source_code.find(string_to_find) != - \ + 1, "{0} might be vulnerable to XSS ".format(source) diff --git a/web/pgadmin/feature_tests/xss_checks_pgadmin_debugger_test.py b/web/pgadmin/feature_tests/xss_checks_pgadmin_debugger_test.py index a54a7ed..24a463b 100644 --- a/web/pgadmin/feature_tests/xss_checks_pgadmin_debugger_test.py +++ b/web/pgadmin/feature_tests/xss_checks_pgadmin_debugger_test.py @@ -26,11 +26,15 @@ class CheckDebuggerForXssFeatureTest(BaseFeatureTest): def before(self): with test_utils.Database(self.server) as (connection, _): if connection.server_version < 90100: - self.skipTest("Functions tree node is not present in pgAdmin below PG v9.1") + self.skipTest( + "Functions tree node is not present in pgAdmin below " + "PG v9.1" + ) # Some test function is needed for debugger - test_utils.create_debug_function(self.server, "postgres", - "a_test_function") + test_utils.create_debug_function( + self.server, "postgres", "a_test_function" + ) def runTest(self): self.page.wait_for_spinner_to_disappear() @@ -40,7 +44,7 @@ class CheckDebuggerForXssFeatureTest(BaseFeatureTest): def after(self): test_utils.drop_debug_function(self.server, "postgres", - "a_test_function") + "a_test_function") self.page.remove_server(self.server) def _function_node_expandable(self): @@ -54,17 +58,21 @@ class CheckDebuggerForXssFeatureTest(BaseFeatureTest): def _debug_function(self): self.page.driver.find_element_by_link_text("Object").click() - ActionChains(self.page.driver) \ - .move_to_element(self.page.driver.find_element_by_link_text("Debugging")) \ - .perform() + ActionChains( + self.page.driver + ).move_to_element( + self.page.driver.find_element_by_link_text("Debugging") + ).perform() self.page.driver.find_element_by_link_text("Debug").click() # We need to check if debugger plugin is installed or not try: wait = WebDriverWait(self.page.driver, 2) is_error = wait.until(EC.presence_of_element_located( - (By.XPATH, "//div[contains(@class, 'alertify') and not(contains(@class, 'ajs-hidden'))]//div[contains(@class,'ajs-header')]")) - ) + (By.XPATH, "//div[contains(@class, 'alertify') and " + "not(contains(@class, 'ajs-hidden'))]//div[" + "contains(@class,'ajs-header')]") + )) except TimeoutException as e: is_error = None @@ -88,7 +96,8 @@ class CheckDebuggerForXssFeatureTest(BaseFeatureTest): ) wait.until(EC.presence_of_element_located( - (By.XPATH, "//td[contains(@class,'test_function') and contains(.,'Hello, pgAdmin4')]")) + (By.XPATH, "//td[contains(@class,'test_function') and " + "contains(.,'Hello, pgAdmin4')]")) ) # Only this tab is vulnerable rest are BackGrid & Code Mirror @@ -107,9 +116,11 @@ class CheckDebuggerForXssFeatureTest(BaseFeatureTest): def _close_debugger(self): self.page.driver.switch_to_default_content() self.page.click_element( - self.page.find_by_xpath("//*[@id='dockerContainer']/div/div[3]/div/div[2]/div[1]") + self.page.find_by_xpath( + "//*[@id='dockerContainer']/div/div[3]/div/div[2]/div[1]") ) def _check_escaped_characters(self, source_code, string_to_find, source): # For XSS we need to search against element's html code - assert source_code.find(string_to_find) != -1, "{0} might be vulnerable to XSS ".format(source) + assert source_code.find(string_to_find) != - \ + 1, "{0} might be vulnerable to XSS ".format(source) diff --git a/web/pgadmin/feature_tests/xss_checks_roles_control_test.py b/web/pgadmin/feature_tests/xss_checks_roles_control_test.py index 7fd4819..36f5aa6 100644 --- a/web/pgadmin/feature_tests/xss_checks_roles_control_test.py +++ b/web/pgadmin/feature_tests/xss_checks_roles_control_test.py @@ -23,7 +23,8 @@ class CheckRoleMembershipControlFeatureTest(BaseFeatureTest): def before(self): with test_utils.Database(self.server) as (connection, _): if connection.server_version < 90100: - self.skipTest("Membership is not present in Postgres below PG v9.1") + self.skipTest( + "Membership is not present in Postgres below PG v9.1") # Some test function is needed for debugger test_utils.create_role(self.server, "postgres", @@ -45,11 +46,14 @@ class CheckRoleMembershipControlFeatureTest(BaseFeatureTest): self.page.remove_server(self.server) def _connects_to_server(self): - self.page.find_by_xpath("//*[@class='aciTreeText' and .='Servers']").click() + self.page.find_by_xpath( + "//*[@class='aciTreeText' and .='Servers']").click() self.page.driver.find_element_by_link_text("Object").click() - ActionChains(self.page.driver) \ - .move_to_element(self.page.driver.find_element_by_link_text("Create")) \ - .perform() + ActionChains( + self.page.driver + ).move_to_element( + self.page.driver.find_element_by_link_text("Create") + ).perform() self.page.find_by_partial_link_text("Server...").click() server_config = self.server @@ -57,8 +61,10 @@ class CheckRoleMembershipControlFeatureTest(BaseFeatureTest): self.page.find_by_partial_link_text("Connection").click() self.page.fill_input_by_field_name("host", server_config['host']) self.page.fill_input_by_field_name("port", server_config['port']) - self.page.fill_input_by_field_name("username", server_config['username']) - self.page.fill_input_by_field_name("password", server_config['db_password']) + self.page.fill_input_by_field_name( + "username", server_config['username']) + self.page.fill_input_by_field_name( + "password", server_config['db_password']) self.page.find_by_xpath("//button[contains(.,'Save')]").click() def _role_node_expandable(self): @@ -80,8 +86,12 @@ class CheckRoleMembershipControlFeatureTest(BaseFeatureTest): '<h1>test</h1>', 'Role Membership Control' ) - self.page.find_by_xpath("//button[contains(@type, 'cancel') and contains(.,'Cancel')]").click() + self.page.find_by_xpath( + "//button[contains(@type, 'cancel') and " + "contains(.,'Cancel')]" + ).click() def _check_escaped_characters(self, source_code, string_to_find, source): # For XSS we need to search against element's html code - assert source_code.find(string_to_find) != -1, "{0} might be vulnerable to XSS ".format(source) + assert source_code.find(string_to_find) != - \ + 1, "{0} might be vulnerable to XSS ".format(source) diff --git a/web/pgadmin/misc/__init__.py b/web/pgadmin/misc/__init__.py index df8212a..45eb598 100644 --- a/web/pgadmin/misc/__init__.py +++ b/web/pgadmin/misc/__init__.py @@ -11,7 +11,7 @@ import pgadmin.utils.driver as driver from flask import url_for, render_template, Response -from flask_babel import gettext as _ +from flask_babel import gettext from pgadmin.utils import PgAdminModule from pgadmin.utils.preferences import Preferences @@ -22,18 +22,20 @@ MODULE_NAME = 'misc' class MiscModule(PgAdminModule): def get_own_javascripts(self): - return [{ - 'name': 'pgadmin.misc.explain', - 'path': url_for('misc.index') + 'explain/explain', - 'preloaded': False - }, { - 'name': 'snap.svg', - 'path': url_for( - 'misc.static', filename='explain/vendor/snap.svg/' + ( - 'snap.svg' if config.DEBUG else 'snap.svg-min' - )), - 'preloaded': False - }] + return [ + { + 'name': 'pgadmin.misc.explain', + 'path': url_for('misc.index') + 'explain/explain', + 'preloaded': False + }, { + 'name': 'snap.svg', + 'path': url_for( + 'misc.static', filename='explain/vendor/snap.svg/' + ( + 'snap.svg' if config.DEBUG else 'snap.svg-min' + )), + 'preloaded': False + } + ] def get_own_stylesheets(self): stylesheets = [] @@ -46,18 +48,24 @@ class MiscModule(PgAdminModule): """ Register preferences for this module. """ - self.misc_preference = Preferences('miscellaneous', _('Miscellaneous')) + self.misc_preference = Preferences( + 'miscellaneous', gettext('Miscellaneous') + ) lang_options = [] for lang in config.LANGUAGES: - lang_options.append({'label': config.LANGUAGES[lang], - 'value': lang}) + lang_options.append( + { + 'label': config.LANGUAGES[lang], + 'value': lang + } + ) # Register options for the User language settings self.misc_preference.register( 'miscellaneous', 'user_language', - _("User language"), 'options', 'en', - category_label=_('User language'), + gettext("User language"), 'options', 'en', + category_label=gettext('User language'), options=lang_options ) @@ -102,7 +110,8 @@ def explain_js(): """ return Response( response=render_template( - "explain/js/explain.js", _=_ + "explain/js/explain.js", + _=gettext ), status=200, mimetype="application/javascript" diff --git a/web/pgadmin/misc/bgprocess/__init__.py b/web/pgadmin/misc/bgprocess/__init__.py index 6a91b65..ce0d539 100644 --- a/web/pgadmin/misc/bgprocess/__init__.py +++ b/web/pgadmin/misc/bgprocess/__init__.py @@ -103,7 +103,6 @@ def acknowledge(pid): """ try: BatchProcess.acknowledge(pid) - return success_return() except LookupError as lerr: return gone(errormsg=str(lerr)) diff --git a/web/pgadmin/misc/bgprocess/process_executor.py b/web/pgadmin/misc/bgprocess/process_executor.py index b0316fb..ef82f1f 100644 --- a/web/pgadmin/misc/bgprocess/process_executor.py +++ b/web/pgadmin/misc/bgprocess/process_executor.py @@ -55,11 +55,13 @@ if _IS_PY2: else: def _log(msg): with open(_log_file, 'a') as fp: - fp.write(('INFO:: %s\n' % msg.encode('ascii', 'xmlcharrefreplace'))) + fp.write( + ('INFO:: %s\n' % msg.encode('ascii', 'xmlcharrefreplace')) + ) def _log_exception(): - type_, value_, traceback_ = info=sys.exc_info() + type_, value_, traceback_ = info = sys.exc_info() with open(_log_file, 'ab') as fp: from traceback import format_exception @@ -159,7 +161,7 @@ class ProcessLogger(Thread): Thread.__init__(self) self.process = None self.stream = None - self.logger = open(os.path.join(_out_dir, stream_type), 'wb') + self.logger = open(os.path.join(_out_dir, stream_type), 'wb') def attach_process_stream(self, process, stream): """ @@ -189,9 +191,15 @@ class ProcessLogger(Thread): # Write into log file if self.logger: if msg: - self.logger.write(get_current_time(format='%y%m%d%H%M%S%f').encode('utf-8')) + self.logger.write( + get_current_time( + format='%y%m%d%H%M%S%f' + ).encode('utf-8') + ) self.logger.write(b',') - self.logger.write(msg.lstrip(b'\r\n' if _IS_WIN else b'\n')) + self.logger.write( + msg.lstrip(b'\r\n' if _IS_WIN else b'\n') + ) self.logger.write(os.linesep.encode('utf-8')) return True @@ -215,7 +223,10 @@ class ProcessLogger(Thread): get_current_time( format='%y%m%d%H%M%S%f' ), - msg.lstrip(b'\r\n' if _IS_WIN else b'\n'), os.linesep + msg.lstrip( + b'\r\n' if _IS_WIN else b'\n' + ), + os.linesep ) ) @@ -253,9 +264,8 @@ def update_status(**kw): if _out_dir: status = dict( - (k, v) for k, v in kw.items() if k in [ - 'start_time', 'end_time', 'exit_code', 'pid' - ] + (k, v) for k, v in kw.items() + if k in ('start_time', 'end_time', 'exit_code', 'pid') ) _log('Updating the status:\n{0}'.format(json.dumps(status))) with open(os.path.join(_out_dir, 'status'), 'w') as fp: @@ -396,7 +406,7 @@ def convert_environment_variables(env): if not isinstance(value, str): value = value.encode(_sys_encoding) temp_env[key] = value - except Exception as e: + except Exception: _log_exception() return temp_env @@ -411,8 +421,8 @@ if __name__ == '__main__': _fs_encoding = sys.getfilesystemencoding() if not _fs_encoding or _fs_encoding == 'ascii': - # Fall back to 'utf-8', if we couldn't determine the file-system encoding, - # or 'ascii'. + # Fall back to 'utf-8', if we couldn't determine the file-system + # encoding or 'ascii'. _fs_encoding = 'utf-8' def u(_s, _encoding=_sys_encoding): @@ -442,14 +452,14 @@ if __name__ == '__main__': # the child process to run as a daemon. And, it would run without # depending on the status of the web-server. if 'PGA_BGP_FOREGROUND' in os.environ and \ - os.environ['PGA_BGP_FOREGROUND'] == "1": + os.environ['PGA_BGP_FOREGROUND'] == "1": _log('[CHILD] Start process execution...') # This is a child process running as the daemon process. # Let's do the job assigning to it. try: _log('Executing the command now from the detached child...') execute() - except: + except Exception: _log_exception() else: from subprocess import CREATE_NEW_PROCESS_GROUP @@ -464,10 +474,11 @@ if __name__ == '__main__': env['PGA_BGP_FOREGROUND'] = "1" # We need environment variables & values in string - _log('[PARENT] Converting the environment variable in the bytes format...') + _log('[PARENT] Converting the environment variable in the ' + 'bytes format...') try: env = convert_environment_variables(env) - except Exception as e: + except Exception: _log_exception() kwargs = { @@ -477,7 +488,7 @@ if __name__ == '__main__': 'creationflags': CREATE_NEW_PROCESS_GROUP | DETACHED_PROCESS, 'close_fds': False, 'cwd': _out_dir, - 'env': env + 'env': env } cmd = [sys.executable] diff --git a/web/pgadmin/misc/bgprocess/processes.py b/web/pgadmin/misc/bgprocess/processes.py index 39bb8e3..0c4112a 100644 --- a/web/pgadmin/misc/bgprocess/processes.py +++ b/web/pgadmin/misc/bgprocess/processes.py @@ -66,7 +66,9 @@ class BatchProcess(object): if 'id' in kwargs: self._retrieve_process(kwargs['id']) else: - self._create_process(kwargs['desc'], kwargs['cmd'], kwargs['args']) + self._create_process( + kwargs['desc'], kwargs['cmd'], kwargs['args'] + ) def _retrieve_process(self, _id): p = Process.query.filter_by(pid=_id, user_id=current_user.id).first() @@ -159,17 +161,25 @@ class BatchProcess(object): args_csv_io, delimiter=str(','), quoting=csv.QUOTE_MINIMAL ) if sys.version_info.major == 2: - csv_writer.writerow([a.encode('utf-8') if isinstance(a, unicode) else a for a in _args]) + csv_writer.writerow( + [ + a.encode('utf-8') + if isinstance(a, unicode) else a for a in _args + ] + ) else: csv_writer.writerow(_args) args_val = args_csv_io.getvalue().strip(str('\r\n')) j = Process( - pid=int(id), command=_cmd, - arguments=args_val.decode('utf-8', 'replace') if IS_PY2 and hasattr(args_val, 'decode') \ - else args_val, - logdir=log_dir, desc=dumps(self.desc), user_id=current_user.id + pid=int(id), + command=_cmd, + arguments=args_val.decode('utf-8', 'replace') + if IS_PY2 and hasattr(args_val, 'decode') else args_val, + logdir=log_dir, + desc=dumps(self.desc), + user_id=current_user.id ) db.session.add(j) db.session.commit() @@ -278,7 +288,9 @@ class BatchProcess(object): if os.name == 'nt' and IS_PY2: command = [] for c in cmd: - command.append(c.encode('utf-8') if isinstance(c, unicode) else str(c)) + command.append( + c.encode('utf-8') if isinstance(c, unicode) else str(c) + ) current_app.logger.info( u"Executing the process executor with the arguments: %s", @@ -288,7 +300,8 @@ class BatchProcess(object): cmd = command else: current_app.logger.info( - u"Executing the process executor with the arguments: %s", str(cmd) + u"Executing the process executor with the arguments: %s", + str(cmd) ) # Make a copy of environment, and add new variables to support @@ -318,8 +331,12 @@ class BatchProcess(object): stderr = open(stderr, "a") p = Popen( - cmd, close_fds=False, env=env, stdout=stdout.fileno(), - stderr=stderr.fileno(), stdin=stdin.fileno(), + cmd, + close_fds=False, + env=env, + stdout=stdout.fileno(), + stderr=stderr.fileno(), + stdin=stdin.fileno(), creationflags=(CREATE_NEW_PROCESS_GROUP | DETACHED_PROCESS) ) else: @@ -424,10 +441,12 @@ class BatchProcess(object): execution_time = (etime - stime).total_seconds() if process_output: - out, out_completed = read_log(self.stdout, stdout, out, ctime, - self.ecode) - err, err_completed = read_log(self.stderr, stderr, err, ctime, - self.ecode) + out, out_completed = read_log( + self.stdout, stdout, out, ctime, self.ecode + ) + err, err_completed = read_log( + self.stderr, stderr, err, ctime, self.ecode + ) else: out_completed = err_completed = False @@ -439,8 +458,16 @@ class BatchProcess(object): } return { - 'out': {'pos': out, 'lines': stdout, 'done': out_completed}, - 'err': {'pos': err, 'lines': stderr, 'done': err_completed}, + 'out': { + 'pos': out, + 'lines': stdout, + 'done': out_completed + }, + 'err': { + 'pos': err, + 'lines': stderr, + 'done': err_completed + }, 'start_time': self.stime, 'exit_code': self.ecode, 'execution_time': execution_time @@ -475,9 +502,8 @@ class BatchProcess(object): except ValueError as e: current_app.logger.warning( - _("Status for the background process '{0}' could not be loaded.").format( - p.pid - ) + _("Status for the background process '{0}' could " + "not be loaded.").format(p.pid) ) current_app.logger.exception(e) return False, False @@ -514,8 +540,8 @@ class BatchProcess(object): if isinstance(desc, IProcessDesc): args = [] args_csv = StringIO( - p.arguments.encode('utf-8') \ - if hasattr(p.arguments, 'decode') else p.arguments + p.arguments.encode('utf-8') + if hasattr(p.arguments, 'decode') else p.arguments ) args_reader = csv.reader(args_csv, delimiter=str(',')) for arg in args_reader: diff --git a/web/pgadmin/misc/file_manager/__init__.py b/web/pgadmin/misc/file_manager/__init__.py index c3beadb..b9be488 100644 --- a/web/pgadmin/misc/file_manager/__init__.py +++ b/web/pgadmin/misc/file_manager/__init__.py @@ -253,12 +253,12 @@ def file_manager_config(trans_id): show_hidden_files = pref.preference('show_hidden_files').get() return Response(response=render_template( - "file_manager/js/file_manager_config.json", - _=gettext, - data=data, - file_dialog_view=file_dialog_view, - show_hidden_files=show_hidden_files - ), + "file_manager/js/file_manager_config.json", + _=gettext, + data=data, + file_dialog_view=file_dialog_view, + show_hidden_files=show_hidden_files + ), status=200, mimetype="application/json" ) @@ -301,6 +301,7 @@ def save_last_directory_visited(trans_id): data={'status': True} ) + @blueprint.route( "/save_file_dialog_view/", methods=["POST"], endpoint='save_file_dialog_view' @@ -312,6 +313,7 @@ def save_file_dialog_view(trans_id): data={'status': True} ) + @blueprint.route( "/save_show_hidden_file_option/", methods=["PUT"], endpoint='save_show_hidden_file_option' @@ -331,7 +333,9 @@ class Filemanager(object): self.trans_id = trans_id self.patherror = encode_json( { - 'Error': gettext('No permission to operate on specified path.'), + 'Error': gettext( + 'No permission to operate on specified path.' + ), 'Code': 0 } ) @@ -407,7 +411,8 @@ class Filemanager(object): last_dir = last_dir[:-1] while last_dir: if os.path.exists( - (storage_dir if storage_dir is not None else '') + last_dir): + storage_dir + if storage_dir is not None else '' + last_dir): break if _platform == 'win32': index = max(last_dir.rfind('\\'), last_dir.rfind('/')) @@ -426,7 +431,8 @@ class Filemanager(object): # create configs using above configs configs = { - "fileroot": last_dir.replace('\\', '\\\\'), # for JS json compatibility + # for JS json compatibility + "fileroot": last_dir.replace('\\', '\\\\'), "dialog_type": fm_type, "title": title, "upload": { @@ -516,7 +522,7 @@ class Filemanager(object): drives.append(letter) bitmask >>= 1 if (drive_name != '' and drive_name is not None and - drive_name in drives): + drive_name in drives): return u"{0}{1}".format(drive_name, ':') else: return drives # return drives if no argument is passed @@ -577,7 +583,7 @@ class Filemanager(object): try: drive_size = getDriveSize(path) drive_size_in_units = sizeof_fmt(drive_size) - except: + except Exception: drive_size = 0 protected = 1 if drive_size == 0 else 0 files[file_name] = { @@ -606,10 +612,10 @@ class Filemanager(object): } user_dir = path - folders_only = trans_data['folders_only'] if 'folders_only' in \ - trans_data else '' - files_only = trans_data['files_only'] if 'files_only' in \ - trans_data else '' + folders_only = trans_data['folders_only'] \ + if 'folders_only' in trans_data else '' + files_only = trans_data['files_only'] \ + if 'files_only' in trans_data else '' supported_types = trans_data['supported_types'] \ if 'supported_types' in trans_data else [] @@ -622,7 +628,7 @@ class Filemanager(object): # continue if file/folder is hidden (based on user preference) if not is_show_hidden_files and \ - (is_folder_hidden(system_path) or f.startswith('.')): + (is_folder_hidden(system_path) or f.startswith('.')): continue user_path = os.path.join(os.path.join(user_dir, f)) @@ -645,8 +651,8 @@ class Filemanager(object): # filter files based on file_type if file_type is not None and file_type != "*": if folders_only or len(supported_types) > 0 and \ - file_extension not in supported_types or \ - file_type != file_extension: + file_extension not in supported_types or \ + file_type != file_extension: continue # create a list of files and folders @@ -790,8 +796,9 @@ class Filemanager(object): } if not path_exists(orig_path): - thefile['Error'] = gettext(u"'{0}' file does not exist.".format( - path)) + thefile['Error'] = gettext( + u"'{0}' file does not exist.".format(path) + ) thefile['Code'] = -1 return thefile @@ -822,7 +829,8 @@ class Filemanager(object): if not dir.endswith('/'): dir += u'/' - filelist = self.list_filesystem(dir, path, trans_data, file_type, show_hidden) + filelist = self.list_filesystem( + dir, path, trans_data, file_type, show_hidden) return filelist def rename(self, old=None, new=None, req=None): @@ -901,7 +909,8 @@ class Filemanager(object): } dir = self.dir if self.dir is not None else '' - path = path.encode('utf-8').decode('utf-8') if hasattr(str, 'decode') else path + path = path.encode( + 'utf-8').decode('utf-8') if hasattr(str, 'decode') else path orig_path = u"{0}{1}".format(dir, path) try: @@ -951,20 +960,23 @@ class Filemanager(object): file_obj = req.files['newfile'] file_name = file_obj.filename if hasattr(str, 'decode'): - path = req.form.get('currentpath').encode('utf-8').decode('utf-8') + path = req.form.get('currentpath').encode( + 'utf-8').decode('utf-8') file_name = file_obj.filename.encode('utf-8').decode('utf-8') orig_path = u"{0}{1}".format(dir, path) newName = u"{0}{1}".format(orig_path, file_name) with open(newName, 'wb') as f: while True: - data = file_obj.read(4194304) # 4MB chunk (4 * 1024 * 1024 Bytes) + # 4MB chunk (4 * 1024 * 1024 Bytes) + data = file_obj.read(4194304) if not data: break f.write(data) except Exception as e: code = 0 - err_msg = u"Error: {0}".format(e.strerror if hasattr(e, 'strerror') else u'Unknown') + err_msg = u"Error: {0}".format( + e.strerror if hasattr(e, 'strerror') else u'Unknown') try: Filemanager.check_access_permission(dir, path) @@ -998,7 +1010,8 @@ class Filemanager(object): path = path.encode('utf-8').decode('utf-8') try: orig_path = u"{0}{1}".format(dir, path) - Filemanager.check_access_permission(dir, u"{}{}".format(path, name)) + Filemanager.check_access_permission( + dir, u"{}{}".format(path, name)) newName = u"{0}{1}".format(orig_path, name) if not os.path.exists(newName): @@ -1059,14 +1072,14 @@ class Filemanager(object): # check if file type is text or binary text_chars = bytearray([7, 8, 9, 10, 12, 13, 27]) \ - + bytearray(range(0x20, 0x7f)) \ - + bytearray(range(0x80, 0x100)) + + bytearray(range(0x20, 0x7f)) \ + + bytearray(range(0x80, 0x100)) def is_binary_string(bytes_data): """Checks if string data is binary""" return bool( - bytes_data.translate(None, text_chars) - ) + bytes_data.translate(None, text_chars) + ) # read the file try: @@ -1180,11 +1193,13 @@ class Filemanager(object): orig_path = u"{0}{1}".format(dir, path) try: - Filemanager.check_access_permission(dir, u"{}{}".format( - path, path)) + Filemanager.check_access_permission( + dir, u"{}{}".format(path, path) + ) except Exception as e: resp = Response(gettext(u"Error: {0}".format(e))) - resp.headers['Content-Disposition'] = 'attachment; filename=' + name + resp.headers['Content-Disposition'] = \ + 'attachment; filename=' + name return resp name = path.split('/')[-1]