Source code for ancp.client

"""ANCP Client

Copyright (C) 2017-2021, Christian Giese (GIC-de)
SPDX-License-Identifier: MIT
"""
from __future__ import print_function
from __future__ import unicode_literals

import collections
import logging
import socket
import struct
from datetime import datetime
from threading import Thread, Event, Lock, current_thread

try:
    from collections.abc import Iterable
except ImportError:  # Python 2: the ABCs live directly in ``collections``
    from collections import Iterable

from builtins import bytes

from ancp.subscriber import Subscriber

log = logging.getLogger(__name__)


VERSION_RFC = 50


class MessageType(object):
    """Numeric codes placed in the message-type byte of ANCP messages."""

    # adjacency protocol
    ADJACENCY = 0x0A
    ADJACENCY_UPDATE = 0x55

    # port related messages
    PORT_MANAGEMENT = 0x20
    PORT_UP = 0x50
    PORT_DOWN = 0x51
class AdjacencyState(object):
    """States of the adjacency state machine.

    The states are numbered consecutively:
    IDLE = 1, SYNSENT = 2, SYNRCVD = 3, ESTAB = 4.
    """

    IDLE, SYNSENT, SYNRCVD, ESTAB = range(1, 5)
class MessageCode(object):
    """Codes carried in the code field of adjacency messages.

    The codes are numbered consecutively:
    SYN = 1, SYNACK = 2, ACK = 3, RSTACK = 4.
    """

    SYN, SYNACK, ACK, RSTACK = range(1, 5)
class TechTypes(object):
    """Access technology identifiers placed in the tech-type byte."""

    ANY = 0x00
    PON = 0x01
    DSL = 0x05
class ResultFields(object):
    """Values for the 4-bit result field of a general ANCP message.

    The values are numbered consecutively:
    Ignore = 0, Nack = 1, AckAll = 2, Success = 3, Failure = 4.
    """

    Ignore, Nack, AckAll, Success, Failure = range(5)
class ResultCodes(object):
    """Values for the 12-bit result-code field of a general ANCP message."""

    NoResult = 0
class Capabilities(object):
    """Capability type codes advertised in adjacency messages."""

    TOPO = 0x01
    OAM = 0x04
# HELPER FUNCTIONS AND CLASSES ------------------------------------------------
def tomac(v):
    """Format six octets as a MAC address string

    Any sequence of six integers is accepted (tuple, list, bytearray, ...);
    it is converted to a tuple first because the ``%`` operator only
    unpacks real tuples.

    :param v: MAC address as six integer octets
    :type v: tuple
    :return: MAC address in lower-case ``xx:xx:xx:xx:xx:xx`` notation
    :rtype: str
    :raises TypeError: if *v* does not contain exactly six values
    """
    return "%02x:%02x:%02x:%02x:%02x:%02x" % tuple(v)
# ANCP CLIENT -----------------------------------------------------------------
class Client(object):
    """ANCP Client

    :param address: ANCP server address (IPv4)
    :type address: str
    :param port: ANCP port (default: 6068)
    :type port: int
    :param tech_type: tech type (default=DSL)
    :type tech_type: ancp.client.TechTypes
    :param timer: adjacency timer (default=25.0)
    :type timer: int
    :param source_address: optional source address
    :type source_address: str
    """

    def __init__(self, address, port=6068, tech_type=TechTypes.DSL, timer=25.0, source_address=None):
        self.address = str(address)
        self.port = port
        self.source_address = str(source_address) if source_address else None
        self.timer = timer  # adjacency timer
        self.timeout = 1.0  # socket timeout
        self._last_syn_time = None
        self._tx_lock = Lock()
        self._thread = None  # rx / tx thread, started by connect()
        self._closing = Event()  # tells the rx / tx thread to stop
        self.established = Event()
        self.version = VERSION_RFC
        self.tech_type = tech_type
        self.state = AdjacencyState.IDLE
        self.capabilities = [Capabilities.TOPO]
        self.transaction_id = 1
        if self.source_address:
            # create sender_name from source_address: the four IPv4 octets
            # padded with two zero octets (use the stringified address so
            # non-str address objects work as well)
            _sender_name = [int(i) for i in self.source_address.split(".")]
            _sender_name.extend([0, 0])
            self.sender_name = tuple(_sender_name)
            # TCP socket is created in connect method
            self.socket = None
        else:
            self.sender_name = (1, 2, 3, 4, 5, 6)
            # create TCP socket
            self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sender_instance = 16777217
        self.sender_port = 0
        self.receiver_name = (0, 0, 0, 0, 0, 0)
        self.receiver_instance = 0
        self.receiver_port = 0

    def __repr__(self):
        if self.source_address:
            return "Client(%s:%s, %s)" % (self.address, self.port, self.source_address)
        else:
            return "Client(%s:%s)" % (self.address, self.port)

    def connect(self):
        """Open the TCP connection and start the adjacency handshake

        Sends the initial SYN, starts the rx / tx thread and waits up to
        about six seconds for the adjacency to become established.

        :return: True if the adjacency was established, otherwise False
        :rtype: bool
        """
        self._closing.clear()
        if self.source_address:
            self.socket = socket.create_connection((self.address, self.port),
                                                   source_address=(self.source_address, 0))
        else:
            self.socket.connect((self.address, self.port))
        self.socket.setblocking(True)
        self.socket.settimeout(self.timeout)
        self._send_syn()
        # rx / tx thread
        self._thread = Thread(target=self._handle, name="handle")
        self._thread.daemon = True
        self._thread.start()
        for _ in range(6):
            if not self._thread.is_alive():
                break
            if self.established.wait(1):
                break
        return self.established.is_set()

    def disconnect(self, send_ack=False):
        """Tear down the adjacency and close the socket

        May be called from the application thread or (on a received
        RSTACK) from the rx / tx thread itself; in the latter case the
        thread is not joined, because a thread cannot join itself.

        :param send_ack: send an ACK instead of an RSTACK before closing
        :type send_ack: bool
        """
        self._closing.set()
        try:
            if send_ack:
                self._send_ack()
            else:
                self._send_rstack()
        finally:
            # always release the resources, even if the final send failed
            rx_thread = self._thread
            if rx_thread is not None and rx_thread is not current_thread():
                rx_thread.join(timeout=1.0)
            if self.socket is not None:
                self.socket.close()
            self.established.clear()

    def port_up(self, subscribers):
        """send port-up message

        For backwards compatibility a single ANCP subscriber is accepted
        as well.

        :param subscribers: collection of ANCP subscribers
        :type subscribers: [ancp.subscriber.Subscriber]
        :raises ValueError: if the collection is empty or holds no valid subscriber
        :raises RuntimeError: if the session is not established
        """
        self._port_updown(MessageType.PORT_UP, self._subscriber_list(subscribers))

    def port_down(self, subscribers):
        """send port-down message

        For backwards compatibility a single ANCP subscriber is accepted
        as well.

        :param subscribers: collection of ANCP subscribers
        :type subscribers: [ancp.subscriber.Subscriber]
        :raises ValueError: if the collection is empty or holds no valid subscriber
        :raises RuntimeError: if the session is not established
        """
        self._port_updown(MessageType.PORT_DOWN, self._subscriber_list(subscribers))

    # internal methods --------------------------------------------------------

    @staticmethod
    def _subscriber_list(subscribers):
        """Normalize the argument of port_up / port_down to a list

        A non-iterable value is wrapped into a one-element list. Any other
        iterable (including generators) is materialized so that emptiness
        can be checked.

        :raises ValueError: if an empty collection was passed
        """
        if not isinstance(subscribers, Iterable):
            return [subscribers]
        subscribers = list(subscribers)
        if not subscribers:
            raise ValueError("No Subscribers passed")
        return subscribers

    def _handle(self):
        """RX / TX Thread

        Reads messages framed by a 4 byte header (ident 0x880C and body
        length), dispatches them by message type and drives the keep-alive
        from socket timeouts. The ``established`` event is cleared whenever
        the loop ends, whatever the reason.
        """
        try:
            while not self._closing.is_set():
                try:
                    header = self._recvall(4)
                except socket.timeout:
                    if not self._closing.is_set():
                        self._handle_timeout()
                    continue
                if len(header) == 0:
                    log.warning("connection lost with %s ", tomac(self.receiver_name))
                    break
                log.debug("received len(b) = %d", len(header))
                (ident, length) = struct.unpack("!HH", header)
                log.debug("message rcvd length field %d", length)
                if ident != 0x880C:
                    log.error("incorrect ident 0x%x", ident)
                    break
                if length < 4:
                    # too short to hold version, type and the 16 bit field
                    log.error("message too short (%d bytes)", length)
                    break
                body = self._recvall(length)
                if len(body) != length:
                    # _recvall only returns short when the peer closed
                    log.warning("MSG_WAITALL failed")
                    break
                log.debug("rest received len(b) = %d", len(body))
                (_ver, mtype, var) = struct.unpack_from("!BBH", body, 0)
                s0 = self.state
                if mtype == MessageType.ADJACENCY:
                    self._handle_adjacency(var, body)
                elif mtype == MessageType.ADJACENCY_UPDATE:
                    self._handle_adjacency_update(var, body)
                elif mtype == MessageType.PORT_UP:
                    log.warning("received port up in AN mode")
                elif mtype == MessageType.PORT_DOWN:
                    log.warning("received port down in AN mode")
                else:
                    self._handle_general(var, body)
                if s0 != self.state and self.state == AdjacencyState.ESTAB and not self.established.is_set():
                    self.established.set()
                    log.info("adjacency established with %s", tomac(self.receiver_name))
        except socket.error as exc:
            # covers a timeout in the middle of a message as well as a
            # socket closed by disconnect() while this thread was reading
            if self._closing.is_set():
                log.debug("socket closed: %s", exc)
            else:
                log.warning("connection error: %s", exc)
        finally:
            self.established.clear()

    def _port_updown(self, message_type, subscribers):
        """Send a port up / down message if the session is established

        :raises RuntimeError: if the session is not established
        """
        if not self.established.is_set():
            raise RuntimeError("session not established")
        self._send_port_updwn(message_type, self.tech_type, subscribers)

    def _recvall(self, toread):
        """Read exactly *toread* bytes from the socket

        :return: the received bytes, or ``b''`` if the peer closed the
            connection before all bytes arrived
        :rtype: bytearray or bytes
        """
        buf = bytearray(toread)
        view = memoryview(buf)
        while toread:
            nbytes = self.socket.recv_into(view, toread)
            if nbytes == 0:
                return b''
            view = view[nbytes:]  # slicing views is cheap
            toread -= nbytes
        return buf

    def _mkadjac(self, mtype, time, m, code):
        """Build an adjacency message

        Layout: 4 byte encapsulation header, 36 byte fixed adjacency part
        and one 4 byte entry (16 bit type, 16 bit length of zero) per
        capability.

        :param mtype: message type
        :param time: unused, kept for interface compatibility; the timer
            field is always derived from ``self.timer``
        :param m: M flag (0 or 1)
        :param code: adjacency message code
        :return: encoded message
        :rtype: bytearray
        """
        totcapslen = len(self.capabilities) * 4
        b = bytearray(40 + totcapslen)
        off = 0
        struct.pack_into("!HH", b, off, 0x880c, 36 + totcapslen)
        off += 4
        # timer is sent as a single byte holding self.timer * 10
        struct.pack_into("!BBBB", b, off, self.version, mtype, int(self.timer * 10), (m << 7) | code)
        off += 4
        names = tuple(self.sender_name) + tuple(self.receiver_name)
        struct.pack_into("!6B6B", b, off, *names)
        off += 12
        struct.pack_into("!II", b, off, self.sender_port, self.receiver_port)
        off += 8
        struct.pack_into("!I", b, off, self.sender_instance)
        off += 4
        struct.pack_into("!I", b, off, self.receiver_instance)
        off += 4
        # one pad byte, number of capabilities, total capability length
        struct.pack_into("!xBH", b, off, len(self.capabilities), totcapslen)
        off += 4
        for cap in self.capabilities:
            # every entry occupies 4 bytes (see totcapslen), so step by 4
            struct.pack_into("!HH", b, off, cap, 0)
            off += 4
        return b

    def _send_adjac(self, m, code):
        """Build and send an adjacency message with the given M flag and code"""
        log.debug("send adjanecy message with code %s", (code))
        b = self._mkadjac(MessageType.ADJACENCY, self.timer * 10, m, code)
        with self._tx_lock:
            # sendall: send() may transmit only a part of the message
            self.socket.sendall(b)

    def _send_syn(self):
        self._send_adjac(0, MessageCode.SYN)
        self.state = AdjacencyState.SYNSENT
        self._last_syn_time = datetime.now()

    def _send_ack(self):
        self._send_adjac(0, MessageCode.ACK)

    def _send_synack(self):
        self._send_adjac(0, MessageCode.SYNACK)
        self.state = AdjacencyState.SYNRCVD

    def _send_rstack(self):
        self._send_adjac(0, MessageCode.RSTACK)
        self.state = AdjacencyState.SYNRCVD

    def _handle_timeout(self):
        """Called on every socket timeout: retry SYN or send the keep-alive"""
        if self.state == AdjacencyState.SYNSENT:
            self._send_syn()
        elif self.state == AdjacencyState.ESTAB:
            # send every self.timer seconds a SYN, ... (keep-alive)
            diff = datetime.now() - self._last_syn_time
            # total_seconds() also counts the days part of the timedelta
            if diff.total_seconds() >= self.timer:
                self._send_syn()

    def _handle_syn(self):
        log.debug("SYN received with current state %d", self.state)
        if self.state == AdjacencyState.SYNSENT:
            self._send_synack()
        elif self.state == AdjacencyState.SYNRCVD:
            self._send_synack()
        elif self.state == AdjacencyState.ESTAB:
            self._send_ack()
        elif self.state == AdjacencyState.IDLE:
            self._send_syn()
        else:
            log.warning('SYN not expected in state: %d', self.state)

    def _handle_synack(self):
        log.debug("SYNACK received with current state %d", self.state)
        if self.state == AdjacencyState.SYNSENT:
            # C !C ??
            self._send_ack()
            self.state = AdjacencyState.ESTAB
        elif self.state == AdjacencyState.SYNRCVD:
            # C !C ??
            self._send_ack()
        elif self.state == AdjacencyState.ESTAB:
            self._send_ack()
        else:
            log.warning('SYNACK not expected in state: %d', self.state)

    def _handle_ack(self):
        log.debug("ACK received with current state %d", self.state)
        if self.state == AdjacencyState.ESTAB:
            self._send_ack()
        else:
            self.state = AdjacencyState.ESTAB

    def _handle_rstack(self):
        log.debug("RSTACK received with current state %d", self.state)
        if self.state == AdjacencyState.SYNSENT:
            pass
        else:
            # disconnect (runs in the rx / tx thread, disconnect() copes
            # with that and the loop in _handle ends afterwards)
            self.disconnect(send_ack=True)

    def _handle_adjacency(self, var, b):
        """Process a received adjacency message

        :param var: 16 bit field following version and type; the low byte
            carries the M flag (0x80) and the code (0x7f), the high byte
            the peer's timer (not evaluated)
        :param b: message body without the 4 byte encapsulation header
        :raises RuntimeError: if the peer sent the M flag as 0
        """
        m = var & 0x80
        code = var & 0x7f
        if m == 0:
            log.error("received M flag 0 in AN mode")
            raise RuntimeError("Trying to synchronize with other AN")
        # the peer's sender name / instance become our receiver values
        self.receiver_name = struct.unpack_from("!BBBBBB", b, 4)
        self.receiver_instance = struct.unpack_from("!I", b, 24)[0] & 16777215
        if code == MessageCode.SYN:
            self._handle_syn()
        elif code == MessageCode.SYNACK:
            self._handle_synack()
        elif code == MessageCode.ACK:
            self._handle_ack()
        elif code == MessageCode.RSTACK:
            self._handle_rstack()
        else:
            log.warning("unknown code %d", code)

    def _handle_adjacency_update(self, var, b):
        """Adjacency update messages are decoded but otherwise ignored"""
        res = var >> 12
        code = var & 0xfff
        log.debug("adjacency update ignored (result %d, code %d)", res, code)

    def _handle_general(self, var, b):
        """All other message types are ignored"""
        pass

    def _mkgeneral(self, message_type, result, result_code, body):
        """Build a general message: encapsulation header, 12 byte general header, body

        The transaction id is taken from ``self.transaction_id``, which is
        incremented afterwards.

        :return: encoded message
        :rtype: bytearray
        """
        b = bytearray(4 + 12)
        partition_id = 0
        length = len(b) - 4 + len(body)  # everything after the encapsulation header
        off = 0
        struct.pack_into("!HH", b, off, 0x880c, length)
        off += 4
        struct.pack_into("!BBH", b, off, self.version, message_type, (result << 12) | result_code)
        off += 4
        # keep the transaction id within its 24 bits so it can never
        # spill into the partition id byte
        struct.pack_into("!I", b, off, (partition_id << 24) | (self.transaction_id & 0xffffff))
        self.transaction_id += 1
        off += 4
        struct.pack_into("!HH", b, off, 0x8001, length)
        off += 4
        return b + body

    def _send_port_updwn(self, message_type, tech_type, subscribers):
        """Encode one message per subscriber and send them in a single write

        Objects that do not provide a usable ``tlvs`` attribute are skipped
        with a warning.

        :raises ValueError: if no subscriber could be encoded
        """
        msg = bytearray()
        for subscriber in subscribers:
            try:
                num_tlvs, tlvs = subscriber.tlvs
            except Exception:
                # best effort: skip anything that is not a Subscriber
                log.warning("subscriber is not of type ancp.subscriber.Subscriber: skip")
                continue
            b = bytearray(28)
            off = 20  # the first 20 bytes stay zero
            struct.pack_into("!xBBx", b, off, message_type, tech_type)
            off += 4
            struct.pack_into("!HH", b, off, num_tlvs, len(tlvs))
            off += 4
            msg += self._mkgeneral(message_type, ResultFields.Nack, ResultCodes.NoResult, b + tlvs)
        if len(msg) == 0:
            raise ValueError("No valid Subscriber passed")
        with self._tx_lock:
            # sendall: send() may transmit only a part of the message
            self.socket.sendall(msg)