aboutsummarylogtreecommitdiffstats
path: root/nordquery.py
blob: c5ec647c2abe82e11d70c2579220e4ec0c10d83d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
#! /usr/bin/python
# nordquery - A tool to find the right NordVPN server, written in Python
# Copyright (C) 2022  Simon Williams

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

import sys, os
from pathlib import Path
from urllib import request
import json
import argparse
import configparser
from pycountry import countries
import socket
from timeit import default_timer as timer

features_dict = {'any':'','sta':'Standard VPN servers', 'ded':'Dedicated IP','doub':'Double VPN','obf':'Obfuscated Servers','p2p':'P2P', 'tor':'Onion Over VPN'}

def list_protocols():
    print('ikev2\nopenvpn_udp\nopenvpn_tcp\nsocks\nproxy\npptp\nl2tp\nopenvpn_xor_udp\nopenvpn_xor_tcp\nproxy_cybersec\n'
        'proxy_ssl\nproxy_ssl_cybersec\nikev2_v6\nopen_udp_v6\nopen_tcp_v6\nwireguard_udp\nopenvpn_udp_tls_crypt\n'
        'openvpn_tcp_tls_crypt\nopenvpn_dedicated_udp\nopenvpn_dedicated_tcp\nskylark\nmesh_relay')
    return
                
def ping(ip,port):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.settimeout(1)
    try:
        s_start = timer()
        s.connect((ip,port))
        s.shutdown(socket.SHUT_RD)
        s_stop = timer()
        return int((s_stop - s_start)*1000)
    except:
        return -1
    finally:
        s.close()

def main(argv):
    db_path = '.cache/nordquery'
    db_filename = 'servers.db'
    
    config_path = '.config/nordquery'
    config_filename = 'nordquery.conf'
    
    parser = argparse.ArgumentParser(description='A tool to find the right NordVPN server')
        #usage='%(prog)s [-c COUNTRY -f FEATURES -p PROTOCOL]',)
    
    parser.add_argument('-c', '--country', help='Filter by country (ISO3166 codes)', type=str, default='')
    parser.add_argument('-f', '--features', help='Filter by features (any, sta(ndard), doub(le), obf(uscated), p2p, tor', 
        type=str, default='', nargs='+', choices=['any', 'sta', 'ded', 'doub', 'obf', 'p2p', 'tor'], metavar='features')
    parser.add_argument('-p', '--protocol', help='Filter by supported protocol', type=str, default='', nargs='+',
        choices=['ikev2','openvpn_udp','openvpn_tcp','socks','proxy','pptp','l2tp','openvpn_xor_udp','openvpn_xor_tcp',
        'proxy_cybersec','proxy_ssl','proxy_ssl_cybersec','ikev2_v6','open_udp_v6','open_tcp_v6','wireguard_udp',
        'openvpn_udp_tls_crypt','openvpn_tcp_tls_crypt','openvpn_dedicated_udp','openvpn_dedicated_tcp','skylark','mesh_relay'],
        metavar='protocol')
    parser.add_argument('-e', '--exclude' , help='Exclude these servers from the selection', type=str, default='', nargs='+')
    parser.add_argument('--list-protocols', help='Show list of protocols', action='store_true')
    parser.add_argument('-u', '--update', help='Update the server database', action='store_true')
    parser.add_argument('--server-url', help='Override the default server info URL', type=str, default='https://nordvpn.com/api/server')
    parser.add_argument('--no-ping', help='Don\'t ping the server to check connectivity', action='store_true')
    parser.add_argument('-v', '--verbose', help='Be verbose', action='store_true')
    
    args = parser.parse_args()
    
    if(args.list_protocols):
        list_protocols()
        sys.exit()
        
    ## Parse config file
    config_fullpath = os.path.join(os.environ['HOME'], config_path)
    
    confparse = configparser.ConfigParser()
    if(os.path.exists(os.path.join(config_fullpath, config_filename))): #Check file exists
        if(args.verbose): print('Config found: ' + os.path.join(config_fullpath, config_filename))
        confparse.read(os.path.join(config_fullpath, config_filename))
        try:
            defaults = confparse['defaults']
            if(args.verbose):
                print('Always update:', confparse.get('defaults','always_update',fallback=''))
                print('Country:', confparse.get('defaults','country',fallback=''))
                print('Features:', confparse.get('defaults','features',fallback=''))
        except:
            print("Warning: Config file invalid")
    ####
    
    ## Get server database
    if(confparse.get('defaults','db_path',fallback='') != ''):
        db_fullpath = defaults['db_path']
    else:
        db_fullpath = os.path.join(os.environ['HOME'], db_path)
        if(os.path.exists(db_fullpath) == False):
            if(args.verbose): print('DB path does not exist, making folder ' + db_fullpath)
            os.mkdir(db_fullpath)
    
    if(confparse.get('defaults','db_filename',fallback='') != ''):
        db_filename = defaults['db_filename']
    
    # Download server file if requested by argument, config or if file doesn't exist    
    if(args.update or 
    (not os.path.exists(os.path.join(db_fullpath, db_filename))) or 
    confparse.get('defaults','always_update',fallback='no') == 'yes'):
        if(args.verbose): print('Downloading server information from ' + args.server_url)
        try:
            data = request.urlopen(args.server_url).read()  # Download the server file
            db = json.loads(data)   # Parse the JSON data
            with open(os.path.join(db_fullpath, db_filename), 'w') as outfile:
                if(args.verbose): print('Writing DB file to ' + os.path.join(db_fullpath, db_filename))
                json.dump(db, outfile)  # Write the db file to disk
        except request.URLError:
            print('Download from:', args.server_url, 'failed')
            sys.exit()
        except json.decoder.JSONDecodeError:
            print('JSON decoding error, download from', args.server_url, 'invalid')
            sys.exit()
        except PermissionError:
            print('You don\'t have permission to write to', str(db_fullpath))
            sys.exit()
    else:
        if(args.verbose): print('Reading server information from ' + os.path.join(db_fullpath, db_filename))
        try:
            with open(os.path.join(db_fullpath, db_filename), 'r') as infile:
                db = json.load(infile)  # Load the db file from disk
        except:
            print('Reading database file', os.path.join(db_fullpath, db_filename), 'failed')
            sys.exit()
            
    ##Build server dictionary
    if(args.verbose): print('Building server dictionary')
    db_dict = {}
    for server in db:
        db_dict[server.get('id')] = server  # Link the server info to ID
    
    if(args.verbose): print('Database contains ' + str(len(db_dict)) + ' servers')
    
    ##Parse country argument
    if(args.country.upper() == 'ANY'):
        country_code = ''   # Match any country
    elif(args.country != ''):
        if(args.country.upper() == 'UK'):   ##Cleanup UK special case
            args.country = 'GB'
        try:
            country_list = countries.search_fuzzy(args.country) # Attempt to find the country
        except LookupError:
            print('Error: country', args.country, 'not found')
            sys.exit()
        country_code = country_list[0].alpha_2  # Take the first country from the returned list
        if(args.verbose): print('Country: ' + country_list[0].name)
    elif(confparse.get('defaults','country',fallback='ANY').upper() != 'ANY'):
        country_code = defaults['country'].upper()  # The country in config file
    else:
        country_code = ''   # Match any country
    
    ##Parse features argument
    features_input = []
    if(args.features != ''):
        features_input = args.features
    elif(confparse.get('defaults','features',fallback='') != ''):
        for string in defaults['features'].split(' '):
            features_input.append(string)
    else:
        features_input = ''
    #print(features_input)
    
    parsed_features = []
    for f in features_input:
        parsed_features.append(features_dict[f])
        if(args.verbose): print('Features: ' + features_dict[f])
    #print(parsed_features)
    
    if(args.verbose and args.protocol != ''): print('Protocols:', args.protocol)
    
    ##Merge --exlude and blacklist lists
    exclude_list = []
    for e in args.exclude:
        exclude_list.append(e)
    
    for e in confparse.get('defaults','blacklist',fallback='').split(' '):
        if((e != '') and (e not in exclude_list)): exclude_list.append(e)
        
    if(args.verbose and exclude_list != ''): print("Exclude list:", exclude_list)

    list_servers = []

    ##Filter the server list
    for server in db_dict.items():
        features = []
        if(parsed_features != []):  # Parse the features list
            for feature in server[1]['categories']:
                features.append(feature['name'])

        protocols = []
        if(args.protocol != ''):    # Parse the protocol list
            for protocol in server[1]['features'].keys():
                if(server[1]['features'][protocol] == True):
                    protocols.append(protocol)

        if((server[1]['flag'] == country_code or country_code == '') and    # Match country code
            (set(parsed_features).issubset(set(features)) or parsed_features == ['']) and # Match features
            (set(args.protocol).issubset(set(protocols)) or args.protocol == '') and    # Match protocols
            (server[1]['domain'].split('.')[0] not in exclude_list)):  # Filter out exluded servers
            list_servers.append((server[1]['domain'],server[1]['load'],server[1]['ip_address']))    # Add matched server to list
    
    if(args.verbose): 
        print('Found ' + str(len(list_servers)) + ' servers') 
        for server in list_servers:
            print(server[0] + ' load: ' + str(server[1]))
    
    if(list_servers != []):        
        list_servers.sort(key=lambda y: y[1])   # Sort the list of matched servers
        for server in list_servers:             
            if('Obfuscated Servers' in parsed_features):
                port = 80   # Obfuscated servers don't open port 443
            else:
                port = 443
                
            if(not args.no_ping):
                pinged = ping(server[2],port)
            else:
                pinged = 1
                
            if(pinged != -1):
                if(args.verbose): print('Recommended server: ')
                print(server[0].split('.')[0])
                if(args.verbose and not args.no_ping): print('Ping:', pinged, 'ms')
                break
            elif(args.verbose):
                print(server[0], 'timed out')
    else:
        if(args.verbose): print('No matching server found')
        
    return
    
if __name__ == '__main__':
    main(sys.argv[1:])