You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
361 lines
16 KiB
361 lines
16 KiB
import argparse
import librouteros
import ssl
import os
import yaml
import re
import dns.resolver, dns.reversename
import ipaddress
# comment format:
# [aM flag1 flag2 key1=value1 key2=option3:value3,option4:value4]
# flags:
# static - do not auto-enable or auto-disable this address (loopbacks, linknets)
# home - this is the normal home-location of the IP
# evacuated - this has been moved from this host to elsewhere
# evacuee - this is an evacuated address
# teardown - being destroyed
# Matches the "[aM ...]" tag inside an address comment; group 1 is the tag body.
aM1 = re.compile( r"\[aM ?([^]]*)\]" )


def parse_comment( comment ):
    """Split an address comment into ( selector, flags, hostname ).

    The expected layout is ``hostname [aM flag key=value key=opt:val,opt:val] selector``.
    Text before the tag is the hostname, text after it is the selector; an
    empty selector falls back to the hostname.

    Inside the tag, a bare word becomes ``flags[ word ] = True``, ``key=value``
    becomes a string entry, and a value containing ``:`` becomes a nested dict
    built from its comma-separated ``option:value`` pairs.

    Returns ``( None, None, comment )`` when the comment does not contain
    exactly one ``[aM ...]`` tag.
    """
    bits = aM1.split( comment )
    # One match with one capture group yields exactly [ before, body, after ].
    if len( bits ) != 3:
        return ( None, None, comment )

    hostname = bits[ 0 ].strip()
    selector = bits[ 2 ].strip() or hostname

    rval = {}
    for bit in bits[ 1 ].split():
        try:
            ( k, v ) = bit.split( "=", 1 )
        except ValueError:
            # No "=" present: this is a plain flag.
            if bit:
                rval[ bit ] = True
            continue
        if ':' in v:
            rval[ k ] = {}
            for subbit in v.split( "," ):
                ( subk, subv ) = subbit.split( ":", 1 )
                rval[ k ][ subk ] = subv
        else:
            rval[ k ] = v
    return ( selector, rval, hostname )
def reverse_dns( address ):
    """Return the first PTR record for *address* as a string.

    The lookup is best-effort: if it fails, *address* is returned unchanged
    so callers always get something usable as a label.
    """
    try:
        return str( dns.resolver.resolve( dns.reversename.from_address( address ), 'PTR' )[ 0 ] )
    except Exception:
        # NOTE(review): the original try/except lines were lost in this copy;
        # the fallback below was kept, so any lookup failure maps to the input.
        return address
def make_comment( selector, rval, hostname ):
    """Build an address comment of the form ``hostname [aM ...] selector``.

    *rval* maps flag names to ``True`` (emitted as a bare word), to a dict
    (emitted as ``key=opt:val,opt:val``) or to any other value (emitted as
    ``key=value``).  ``None`` is treated as an empty mapping.  The trailing
    selector is omitted when it is ``None`` or equal to *hostname*.
    """
    rvalbits = [ "aM" ]
    for ( k, v ) in ( rval or {} ).items():
        if v is True:
            rvalbits.append( k )
        elif isinstance( v, dict ):
            rvalbits.append( "%s=%s" % ( k, ",".join( "%s:%s" % vi for vi in v.items() ) ) )
        else:
            rvalbits.append( "%s=%s" % ( k, v ) )

    tag = " ".join( rvalbits )
    if selector == hostname or selector is None:
        return "%s [%s]" % ( hostname, tag )
    return "%s [%s] %s" % ( hostname, tag, selector )
def connection( host, username = None, password = None, port = 8729 ):
    """Open a TLS-wrapped librouteros connection to *host*.

    *host* may carry its own port as ``name:port``, which overrides *port*.
    Missing credentials fall back to the ``AUTOMONTY_USERNAME`` and
    ``AUTOMONTY_PASSWORD`` environment variables.

    Returns whatever ``librouteros.connect`` returns.
    """
    # NOTE(review): a bare IPv6 literal also contains ":" and would be split here.
    if ':' in host:
        ( host, port ) = host.split( ":", 1 )
        port = int( port )

    kwargs = { 'username': username or os.environ.get( 'AUTOMONTY_USERNAME', None ),
               'password': password or os.environ.get( 'AUTOMONTY_PASSWORD', None ),
               'host': host,
               'port': port,
               }
    kwargs[ 'ssl' ] = True

    ssl_ctx = ssl.create_default_context()
    # The certificate chain is still verified, but the name is not matched.
    ssl_ctx.check_hostname = False # XXX figure out how
    ssl_ctx.verify_mode = ssl.CERT_REQUIRED
    ssl_ctx.set_ciphers( 'DHE-RSA-AES256-GCM-SHA384' )
    kwargs[ 'ssl_wrapper' ] = ssl_ctx.wrap_socket

    return librouteros.connect( **kwargs )
def router_in_groups( options, groups ):
    """Return True when the router belongs to at least one of *groups*.

    *options* is the router's configuration mapping; its ``group`` entry may
    be a list of names or a single name.  An empty *groups* never matches.
    """
    if not groups:
        return False
    membership = ( options or {} ).get( 'group' ) or []
    if isinstance( membership, str ):
        # A scalar entry would otherwise be matched by substring ("or" in "core").
        membership = [ membership ]
    return any( group in membership for group in groups )
def connect_routers( config, args ):
    """Connect to every configured router selected by the command line.

    Routers come from ``config[ 'router' ]`` (name -> options).  With neither
    ``--only-router`` nor ``--only-group`` given, all of them are selected;
    otherwise a router is selected by name or by group membership.  Each
    router's ``connection`` options are passed to ``connection()``, with the
    router name used as ``host`` when none is configured.

    Returns a dict mapping router name to its open connection.
    """
    routers = config.get( 'router' ) or {}
    unfiltered = not args.only_router and not args.only_group
    rval = {}
    for ( name, options ) in routers.items():
        # An entry with no settings loads from YAML as None.
        options = options or {}
        selected = unfiltered or ( name in args.only_router ) or router_in_groups( options, args.only_group )
        if not selected:
            continue
        kwargs = dict( options.get( 'connection' ) or {} )
        kwargs.setdefault( 'host', name )
        rval[ name ] = connection( **kwargs )
    return rval
def _print_address_matches( name, family, items, addr ):
    """Print one status line per entry in *items* whose selector or hostname equals *addr*."""
    for item in items:
        ( sel, _flags, hst ) = parse_comment( item.get( 'comment', '' ) )
        if sel == addr or hst == addr:
            active = not item[ 'disabled' ] and not item[ 'invalid' ]
            print( name, ": " + family, 'ENABLED' if active else 'disabled', item[ 'interface' ] )


def monty_check( config, args, routers ):
    """Report where each name in ``args.addr`` is configured.

    For every requested name and every connected router, the IPv4 and then
    the IPv6 address tables are scanned; each entry whose comment selector or
    hostname equals the name is printed with its enabled state and interface.
    """
    for addr in args.addr:
        for ( name, api ) in routers.items():
            _print_address_matches( name, "v4", api.path( 'ip', 'address' ), addr )
            _print_address_matches( name, "v6", api.path( 'ipv6', 'address' ), addr )
def monty_fixup( config, args, routers ):
    """Rewrite address comments on every router into the ``[aM ...]`` format.

    IPv4 entries whose ``address`` is listed in ``config[ 'loopbacks' ]`` are
    flagged ``home`` when currently active; all other IPv4 entries are flagged
    ``static`` and their interface is remembered.  Missing hostnames and
    selectors are filled in from reverse DNS of the entry's ``network``.
    IPv6 entries then inherit ``static`` and the selector from the IPv4 entry
    on the same interface.

    With ``args.dry_run`` the equivalent RouterOS commands are printed instead
    of being applied.

    NOTE(review): the ``else:`` and ``continue`` lines of this function were
    missing from the copy under review and have been reconstructed from the
    surrounding logic; confirm against the upstream file.
    """
    reverse = {}  # interface name -> selector obtained from reverse DNS
    statics = {}  # interface name -> True once an IPv4 entry marked it static
    for ( name, api ) in routers.items():
        ip_address = api.path( 'ip', 'address' )
        ipv6_address = api.path( 'ipv6', 'address' )

        for item in ip_address:
            ( sel, rval, hst ) = parse_comment( item.get( 'comment', '' ) )
            if sel is None:
                rval = {}
            active = not item[ 'disabled' ] and not item[ 'invalid' ]
            if item[ 'address' ] in config[ 'loopbacks' ]:
                if active:
                    rval[ 'home' ] = True
            else:
                rval[ 'static' ] = True
                statics[ item[ 'interface' ] ] = True
            if not hst:
                # PTR names come back fully qualified; drop the trailing dots.
                hst = reverse_dns( item[ 'network' ] ).rstrip( "." )
            if not sel:
                if item[ 'interface' ] not in reverse:
                    reverse[ item[ 'interface' ] ] = reverse_dns( item[ 'network' ] ).rstrip( "." )
                sel = reverse[ item[ 'interface' ] ]
            new_comment = make_comment( sel, rval, hst )
            if args.dry_run:
                print( '/ip address set [find where interface="%s" and network="%s"] comment="%s"' % ( item[ 'interface' ], item[ 'network' ], new_comment ) )
            else:
                print( name, ":", item[ 'interface' ], new_comment )
                ip_address.update( **{ 'comment': new_comment, '.id': item[ '.id' ] } )

        for item in ipv6_address:
            # Entries on "loop*" interfaces and dynamic entries are left alone.
            if item[ 'interface' ].startswith( "loop" ):
                continue
            if item[ 'dynamic' ]:
                continue
            ( sel, rval, hst ) = parse_comment( item.get( 'comment', '' ) )
            if sel is None:
                rval = {}
            active = not item[ 'disabled' ] and not item[ 'invalid' ]
            if statics.get( item[ 'interface' ], False ):
                rval[ 'static' ] = True
            elif active:
                rval[ 'home' ] = True
            # "XXX <interface>" marks entries with no IPv4 sibling to borrow a name from.
            if not sel:
                sel = reverse.get( item[ 'interface' ], "XXX " + item[ 'interface' ] )
            if not hst:
                hst = reverse.get( item[ 'interface' ], "XXX " + item[ 'interface' ] )
            new_comment = make_comment( sel, rval, hst )
            if args.dry_run:
                print( '/ipv6 address set [find where interface="%s" and address="%s" ] comment="%s"' % ( item[ 'interface' ], item[ 'address' ], new_comment ) )
            else:
                print( name, ":", item[ 'interface' ], new_comment )
                ipv6_address.update( **{ 'comment': new_comment, '.id': item[ '.id' ] } )
def monty_provision( config, args, routers ):
    """Add ``args.address`` to the VLAN interface ``args.vlan`` on every router.

    Each address is tagged with a comment built from ``args.selector`` and
    ``args.hostname``.  It is added enabled and flagged ``static`` when
    ``args.static`` is set, enabled and flagged ``home`` when the router is
    named in ``args.home``, and disabled otherwise.

    With ``args.dry_run`` the equivalent RouterOS commands are printed instead
    of being applied.

    NOTE(review): several expressions and ``else:`` lines were missing from
    the copy under review.  The /32 test (``addr.network.prefixlen``), the
    IPv4 ``network`` value (``addr.ip``) and the IPv6 EUI-64 prefix
    (``addr.network``) are reconstructions; confirm against the upstream file.
    """
    # NOTE(review): the source had an `if args.dry_run:` here whose body was lost.
    for ( name, api ) in routers.items():
        if args.dry_run:
            print( '#' * ( len( name ) + 2 ) )
            print( '#', name )
        vlans = api.path( 'interface', 'vlan' )
        ip_address = api.path( 'ip', 'address' )
        ipv6_address = api.path( 'ipv6', 'address' )

        for vlan in vlans:
            if vlan[ 'vlan-id' ] != args.vlan:
                continue

            rval = {}
            if args.static:
                rval[ 'static' ] = True
                disabled = 'no'
            elif name in args.home:
                rval[ 'home' ] = True
                disabled = 'no'
            else:
                disabled = 'yes'
            comment = make_comment( args.selector, rval, args.hostname )

            for addr in args.address:
                if args.dry_run:
                    if isinstance( addr, ipaddress.IPv4Interface ):
                        if addr.network.prefixlen == 32:
                            print( '/ip address add interface="%s" address="%s" network="%s" disabled=%s comment="%s"' % ( vlan[ 'name' ], config[ 'default_ipv4_loopback' ], addr.ip, disabled, comment ) )
                        else:
                            print( '/ip address add interface="%s" address="%s" disabled=%s comment="%s"' % ( vlan[ 'name' ], addr, disabled, comment ) )
                    elif isinstance( addr, ipaddress.IPv6Interface ):
                        if args.static:
                            print( '/ipv6 address add interface="%s" address="%s" disabled=%s comment="%s" advertise=no' % ( vlan[ 'name' ], addr, disabled, comment ) )
                        else:
                            print( '/ipv6 address add interface="%s" address="%s" disabled=%s comment="%s" eui-64=yes advertise=yes' % ( vlan[ 'name' ], addr.network, disabled, comment ) )
                elif isinstance( addr, ipaddress.IPv4Interface ):
                    v4addr = { 'interface': vlan[ 'name' ],
                               'disabled': disabled,
                               'comment': comment,
                               }
                    if addr.network.prefixlen == 32:
                        # Host route: the configured default loopback is the
                        # local address and the host itself is the network.
                        v4addr[ 'address' ] = config[ 'default_ipv4_loopback' ]
                        v4addr[ 'network' ] = str( addr.ip )
                    else:
                        v4addr[ 'address' ] = str( addr )
                    ip_address.add( **v4addr )
                elif isinstance( addr, ipaddress.IPv6Interface ):
                    v6addr = { 'interface': vlan[ 'name' ],
                               'disabled': disabled,
                               'comment': comment,
                               }
                    if args.static:
                        # NOTE(review): the dry-run command above also passes
                        # advertise=no; this path does not set it.
                        v6addr[ 'address' ] = str( addr )
                    else:
                        v6addr[ 'address' ] = str( addr.network )
                        v6addr[ 'eui-64' ] = 'yes'
                        v6addr[ 'advertise' ] = 'yes'
                    ipv6_address.add( **v6addr )
    # NOTE(review): the source ended with another `if args.dry_run:` whose body
    # (and nesting level) was lost.
def monty_teardown( config, args, routers ):
    """Disable, or with ``args.delete`` remove, the addresses of ``args.hostname``.

    An address belongs to the host when its comment selector or hostname
    equals ``args.hostname``.  Without ``--delete`` each such address is
    disabled and flagged ``teardown``.  With ``--delete`` only addresses that
    already carry the ``teardown`` flag are removed; the rest are reported as
    ignored.

    With ``args.dry_run`` the equivalent RouterOS commands are printed instead
    of being applied.

    NOTE(review): the ``else:`` lines of this function were missing from the
    copy under review and have been reconstructed; confirm against the
    upstream file.  ``args.vlan`` is accepted by the parser but not used here.
    """
    # NOTE(review): the source had an `if args.dry_run:` here whose body was lost.
    for ( name, api ) in routers.items():
        if args.dry_run:
            print( '#' * ( len( name ) + 2 ) )
            print( '#', name )
        ip_address = api.path( 'ip', 'address' )
        ipv6_address = api.path( 'ipv6', 'address' )

        for addr in ip_address:
            ( sel, rval, hst ) = parse_comment( addr.get( 'comment', '' ) )
            if sel != args.hostname and hst != args.hostname:
                continue
            # An untagged comment equal to the hostname matches with rval None.
            if rval is None:
                rval = {}
            if args.delete:
                if rval.get( 'teardown', False ):
                    if args.dry_run:
                        print( '/ip address remove [find where interface="%s" and network="%s" and comment~"teardown" and comment~"%s" ]' % ( addr[ 'interface' ], addr[ 'network' ], hst ) )
                    else:
                        print( "deleting %s %s from %s on %s" % ( addr[ 'address' ], addr[ 'network' ], addr[ 'interface' ], name ) )
                        ip_address.remove( addr[ '.id' ] )
                elif args.dry_run:
                    print( "# ignoring %s %s on %s on %s" % ( addr[ 'address' ], addr[ 'network' ], addr[ 'interface' ], name ) )
                else:
                    print( "ignoring %s %s on %s on %s" % ( addr[ 'address' ], addr[ 'network' ], addr[ 'interface' ], name ) )
            else:
                rval[ 'teardown' ] = True
                comment = make_comment( sel, rval, hst )
                if args.dry_run:
                    print( '/ip address set [find where interface="%s" and network="%s" and comment~"%s" ] disabled=yes comment="%s"' % ( addr[ 'interface' ], addr[ 'network' ], hst, comment ) )
                else:
                    print( "disabling %s %s on %s on %s" % ( addr[ 'address' ], addr[ 'network' ], addr[ 'interface' ], name ) )
                    ip_address.update( **{ '.id': addr[ '.id' ], 'disabled': 'yes', 'comment': comment } )

        for addr in ipv6_address:
            ( sel, rval, hst ) = parse_comment( addr.get( 'comment', '' ) )
            if sel != args.hostname and hst != args.hostname:
                continue
            if rval is None:
                rval = {}
            if args.delete:
                if rval.get( 'teardown', False ):
                    if args.dry_run:
                        print( '/ipv6 address remove [find where interface="%s" and address="%s" and comment="%s" ]' % ( addr[ 'interface' ], addr[ 'address' ], addr.get( 'comment', '' ) ) )
                    else:
                        print( "deleting %s from %s on %s" % ( addr[ 'address' ], addr[ 'interface' ], name ) )
                        ipv6_address.remove( addr[ '.id' ] )
                elif args.dry_run:
                    print( "# ignoring %s on %s on %s" % ( addr[ 'address' ], addr[ 'interface' ], name ) )
                else:
                    print( "ignoring %s on %s on %s" % ( addr[ 'address' ], addr[ 'interface' ], name ) )
            else:
                rval[ 'teardown' ] = True
                comment = make_comment( sel, rval, hst )
                if args.dry_run:
                    print( '/ipv6 address set [find where interface="%s" and address="%s" and comment="%s" ] disabled=yes comment="%s"' % ( addr[ 'interface' ], addr[ 'address' ], addr.get( 'comment', '' ), comment ) )
                else:
                    print( "disabling %s on %s on %s" % ( addr[ 'address' ], addr[ 'interface' ], name ) )
                    ipv6_address.update( **{ '.id': addr[ '.id' ], 'disabled': 'yes', 'comment': comment } )
    # NOTE(review): the source ended with another `if args.dry_run:` whose body
    # (and nesting level) was lost.
def load_configuration():
    """Load ``~/.automonty.yaml`` and return its contents.

    Returns an empty dict when the file cannot be opened or is empty.  The
    file is parsed with ``yaml.SafeLoader``.
    """
    path = os.path.expanduser( '~/.automonty.yaml' )
    try:
        handle = open( path, 'r' )
    except IOError:
        return {}
    with handle:
        # An empty document loads as None; callers expect a mapping.
        return yaml.load( handle, yaml.SafeLoader ) or {}
def main():
    """Parse the command line, connect to the selected routers and run the subcommand.

    Subcommands are ``check``, ``fixup``, ``provision`` and ``teardown``; each
    is dispatched as ``func( config, args, routers )``.  Invoking the program
    without a subcommand ends with a usage error.
    """
    config = load_configuration()

    parser = argparse.ArgumentParser( prog = "automonty",
                                      description = 'AutoMonty (re-)configures routers',
                                      )
    subparsers = parser.add_subparsers()
    parser.add_argument( '--only-router', action = 'append', default = [] )
    parser.add_argument( '--only-group', action = 'append', default = [] )
    parser.add_argument( '-n', '--dry-run', action = 'store_true', default = False )

    parser_check = subparsers.add_parser( 'check' )
    parser_check.add_argument( 'addr', nargs = "*" )
    parser_check.set_defaults( func = monty_check )

    parser_fixup = subparsers.add_parser( 'fixup' )
    parser_fixup.set_defaults( func = monty_fixup )

    parser_provision = subparsers.add_parser( 'provision' )
    parser_provision.add_argument( '--home', action = 'append', default = [] )
    parser_provision.add_argument( '-s', '--static', action = 'store_true', default = False )
    parser_provision.add_argument( '--selector', type = str, default = None )
    parser_provision.add_argument( 'hostname' )
    parser_provision.add_argument( 'vlan', type = int )
    parser_provision.add_argument( 'address', type = ipaddress.ip_interface, nargs = "+" )
    parser_provision.set_defaults( func = monty_provision )

    parser_teardown = subparsers.add_parser( 'teardown' )
    parser_teardown.add_argument( '--delete', action = 'store_true', default = False )
    parser_teardown.add_argument( 'hostname' )
    parser_teardown.add_argument( 'vlan', type = int, default = None, nargs = "?" )
    parser_teardown.set_defaults( func = monty_teardown )

    args = parser.parse_args()
    # Subcommands are optional to argparse here, so `func` may be unset.
    func = getattr( args, 'func', None )
    if func is None:
        parser.error( 'a subcommand is required' )

    routers = connect_routers( config, args )
    func( config, args, routers )
# Run the CLI only when executed as a script, not when imported.
if __name__ == '__main__':
    main()