You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

203 lines
7.8 KiB
Python

#!/usr/bin/python3
import argparse
import librouteros
import progress.bar
import ssl
import os
import yaml
import re
import dns.resolver, dns.reversename
# comment format:
# hostname.example.com [aM flag1 flag2 key1=value1 key2=option3:value3,option4:value4] selector.example.com
#
# flags:
# static - do not auto-enable or auto-disable this address (loopbacks, linknets)
# home - this is the normal home-location of the IP
# evacuated - this has been moved from this host to elsewhere
# evacuee - this is an evacuated address
# Matches the "[aM ...]" marker; the single group captures everything between
# "[aM" (plus one optional space) and the closing "]".
aM1 = re.compile( r"\[aM ?([^]]*)\]" )


def parse_comment( comment ):
    """Split an address comment into ( selector, flags, hostname ).

    The comment is split around exactly one "[aM ...]" marker.  Text before
    the marker is the hostname, text after it is the selector (defaulting to
    the hostname when empty).  Inside the marker, whitespace-separated tokens
    are parsed as:

    * "flag"           -> flags[ "flag" ] = True
    * "key=value"      -> flags[ "key" ] = "value"
    * "key=a:1,b:2"    -> flags[ "key" ] = { "a": "1", "b": "2" }

    A sub-option without a ":" (e.g. the "b" in "key=a:1,b") is stored with an
    empty string value, and empty sub-options (stray commas) are skipped,
    rather than raising ValueError.

    When the comment does not contain exactly one marker, returns
    ( None, None, comment ) with the comment text unchanged.
    """
    bits = aM1.split( comment )
    # One marker with one capture group always yields exactly three pieces.
    if len( bits ) != 3:
        return ( None, None, comment )
    hostname = bits[ 0 ].strip()
    selector = bits[ 2 ].strip() or hostname
    rval = {}
    for bit in bits[ 1 ].split():
        ( k, sep, v ) = bit.partition( "=" )
        if not sep:
            # No "=" at all: a bare flag.
            rval[ bit ] = True
        elif ':' in v:
            options = {}
            for subbit in v.split( "," ):
                if not subbit:
                    continue
                ( subk, _, subv ) = subbit.partition( ":" )
                options[ subk ] = subv
            rval[ k ] = options
        else:
            rval[ k ] = v
    return ( selector, rval, hostname )
def reverse_dns( address ):
    """Return the first PTR record for *address* as a string.

    This is a best-effort lookup: if building the reverse name or resolving
    it fails for any reason, *address* itself is returned unchanged.
    """
    try:
        answer = dns.resolver.resolve( dns.reversename.from_address( address ), 'PTR' )
        return str( answer[ 0 ] )
    except Exception:
        # Deliberately broad (lookup failures fall back to the address), but
        # not a bare "except:" so Ctrl-C and SystemExit still propagate.
        return address
def make_comment( selector, rval, hostname ):
    """Render ( selector, flags, hostname ) back into comment text.

    Produces "hostname [aM ...]" and appends " selector" only when the
    selector differs from the hostname.  Flags whose value is True are written
    bare, dict values as "key=a:1,b:2", anything else as "key=value".
    """
    def render( key, value ):
        # One token inside the "[aM ...]" marker.
        if value is True:
            return key
        if isinstance( value, dict ):
            options = ",".join( "%s:%s" % pair for pair in value.items() )
            return "%s=%s" % ( key, options )
        return "%s=%s" % ( key, value )

    tokens = [ "aM" ] + [ render( key, value ) for ( key, value ) in rval.items() ]
    marker = " ".join( tokens )
    if selector != hostname:
        return "%s [%s] %s" % ( hostname, marker, selector )
    return "%s [%s]" % ( hostname, marker )
def connection( host, username = None, password = None, port = 8729 ):
    """Open a TLS-wrapped librouteros connection to *host*.

    *host* may be given as "name:port", in which case the part after the
    first ":" overrides *port*.  Missing credentials fall back to the
    AUTOMONTY_USERNAME / AUTOMONTY_PASSWORD environment variables.

    Returns whatever librouteros.connect() returns.
    """
    if ':' in host:
        ( host, port_text ) = host.split( ":", 1 )
        port = int( port_text )

    # The certificate must verify, but its hostname is not checked.
    ssl_ctx = ssl.create_default_context()
    ssl_ctx.check_hostname = False # XXX figure out how
    ssl_ctx.verify_mode = ssl.CERT_REQUIRED
    ssl_ctx.set_ciphers( 'DHE-RSA-AES256-GCM-SHA384' )

    return librouteros.connect(
        username = username or os.environ.get( 'AUTOMONTY_USERNAME', None ),
        password = password or os.environ.get( 'AUTOMONTY_PASSWORD', None ),
        host = host,
        port = port,
        ssl = True,
        ssl_wrapper = ssl_ctx.wrap_socket,
    )
def connect_routers( config, args ):
    """Connect to every configured router and return { name: connection }.

    Routers come from the 'router' mapping of *config*; each value holds the
    keyword arguments for connection().  When *args.only* is set, only the
    routers named in it are connected.  A router without a 'host' setting is
    reached under its own name.

    A router entry with no settings at all (None, as an empty YAML value
    loads) is accepted, and the loaded configuration is not modified.
    """
    routers = config.get( 'router' ) or {}
    rval = {}
    with progress.bar.PixelBar( 'Connecting', max = len( routers ) ) as bar:
        for ( name, settings ) in routers.items():
            if ( not args.only ) or ( name in args.only ):
                # Copy so the default host is not written back into config.
                kwargs = dict( settings or {} )
                kwargs.setdefault( 'host', name )
                rval[ name ] = connection( **kwargs )
            # The bar is sized to all routers, so advance for skipped ones too.
            bar.next()
    return rval
def monty_check( config, args, routers ):
    """Report where each address named in *args.addr* is configured.

    For every requested name and every router, the IPv4 and IPv6 address
    tables are scanned; an entry matches when the selector or the hostname
    parsed from its comment equals the requested name.  Each match prints the
    router name, the address family, 'ENABLED' (neither disabled nor invalid)
    or 'disabled', and the interface.
    """
    for addr in args.addr:
        for ( name, api ) in routers.items():
            tables = (
                ( ": v4", api.path( 'ip', 'address' ) ),
                ( ": v6", api.path( 'ipv6', 'address' ) ),
            )
            for ( label, table ) in tables:
                for item in table:
                    ( sel, _flags, hst ) = parse_comment( item.get( 'comment', '' ) )
                    active = not ( item[ 'disabled' ] or item[ 'invalid' ] )
                    if not ( sel == addr or hst == addr ):
                        continue
                    state = 'ENABLED' if active else 'disabled'
                    print( name, label, state, item[ 'interface' ] )
def _ptr_name( network ):
    """Reverse-resolve *network* and drop any trailing dots from the result."""
    return reverse_dns( network ).rstrip( "." )


def _store_comment( args, name, table, item, new_comment ):
    """Print the new comment and, unless --dry-run is set, write it to the router."""
    if args.dry_run:
        print( item[ 'interface' ], item[ 'address' ], '->', new_comment )
    else:
        print( name, ":", item[ 'interface' ], new_comment )
        table.update( **{ 'comment': new_comment, '.id': item[ '.id' ] } )


def monty_fixup( config, args, routers ):
    """Rewrite the comment of every address on every router into aM format.

    IPv4 addresses:
      * an address listed in config[ 'loopbacks' ] gets the 'home' flag when
        it is active (neither disabled nor invalid);
      * any other address gets the 'static' flag, and its interface is
        remembered as static for the IPv6 pass;
      * a missing hostname is filled from reverse DNS of the item's network;
      * a missing selector is filled from the reverse DNS name cached for
        the item's interface.

    IPv6 addresses (skipping dynamic ones and interfaces whose name starts
    with "loop"):
      * 'static' if the interface was marked static by the IPv4 pass,
        otherwise 'home' when active;
      * missing selector/hostname come from the interface's cached reverse
        name, or "XXX <interface>" when none was cached.

    With args.dry_run the new comments are only printed; otherwise they are
    also written back to the router.
    """
    for ( name, api ) in routers.items():
        # Both caches are keyed by interface name, which is only meaningful
        # within one router, so they start empty for each router.
        reverse = {}
        statics = {}
        ip_address = api.path( 'ip', 'address' )
        ipv6_address = api.path( 'ipv6', 'address' )

        for item in ip_address:
            interface = item[ 'interface' ]
            ( sel, rval, hst ) = parse_comment( item.get( 'comment', '' ) )
            if sel is None:
                # Comment had no aM marker yet: start with no flags.
                rval = {}
            active = not item[ 'disabled' ] and not item[ 'invalid' ]
            if item[ 'address' ] in config[ 'loopbacks' ]:
                if active:
                    rval[ 'home' ] = True
            else:
                rval[ 'static' ] = True
                statics[ interface ] = True
            if not hst:
                hst = _ptr_name( item[ 'network' ] )
            if not sel:
                if interface not in reverse:
                    reverse[ interface ] = _ptr_name( item[ 'network' ] )
                # Always read from the cache so a second address on the same
                # interface gets that interface's name, not a stale value.
                sel = reverse[ interface ]
            _store_comment( args, name, ip_address, item, make_comment( sel, rval, hst ) )

        for item in ipv6_address:
            interface = item[ 'interface' ]
            if interface.startswith( "loop" ):
                continue
            if item[ 'dynamic' ]:
                continue
            ( sel, rval, hst ) = parse_comment( item.get( 'comment', '' ) )
            if sel is None:
                rval = {}
            active = not item[ 'disabled' ] and not item[ 'invalid' ]
            if statics.get( interface, False ):
                rval[ 'static' ] = True
            elif active:
                rval[ 'home' ] = True
            # "XXX ..." makes interfaces without an IPv4-derived name stand out.
            fallback = reverse.get( interface, "XXX " + interface )
            if not sel:
                sel = fallback
            if not hst:
                hst = fallback
            _store_comment( args, name, ipv6_address, item, make_comment( sel, rval, hst ) )
def load_configuration():
    """Load ~/.automonty.yaml and return its contents.

    Returns an empty dict when the file cannot be opened, and also when the
    file is empty (an empty YAML document loads as None, which callers
    cannot use as a mapping).
    """
    try:
        config = open( os.path.expanduser( '~/.automonty.yaml' ), 'r' )
    except IOError:
        return {}
    # Close the file once it has been read.
    with config:
        return yaml.load( config.read(), yaml.SafeLoader ) or {}
def main():
    """Command-line entry point: parse arguments, connect, run the subcommand.

    Subcommands:
      check [addr ...]  - run monty_check for the given names
      fixup             - run monty_fixup

    Global options: --only NAME (repeatable) restricts which routers are
    connected; -n / --dry-run is passed through to the subcommand in *args*.
    """
    config = load_configuration()
    parser = argparse.ArgumentParser( prog = "automonty",
                                      description = 'AutoMonty (re-)configures routers',
                                      )
    subparsers = parser.add_subparsers()
    parser.add_argument( '--only', action = 'append' )
    parser.add_argument( '-n', '--dry-run', action = 'store_true' )
    parser_check = subparsers.add_parser( 'check' )
    parser_check.add_argument( 'addr', nargs = "*" )
    parser_check.set_defaults( func = monty_check )
    parser_fixup = subparsers.add_parser( 'fixup' )
    parser_fixup.set_defaults( func = monty_fixup )
    args = parser.parse_args()
    # Subcommands are optional to argparse; without one there is no "func",
    # so exit with a usage error before connecting to any router.
    if not hasattr( args, 'func' ):
        parser.error( 'a subcommand is required (check, fixup)' )
    routers = connect_routers( config, args )
    args.func( config, args, routers )


if __name__ == '__main__':
    main()