GitHub Action commited on
Commit
e0dbde1
·
1 Parent(s): 9dfa634

Sync from GitHub with Git LFS

Browse files
Files changed (2) hide show
  1. agents/peer_sync.py +94 -60
  2. agents/tools/storage.py +29 -11
agents/peer_sync.py CHANGED
@@ -7,18 +7,43 @@ import json
7
  import uuid
8
  import ipaddress
9
  from tools.storage import Storage
 
10
 
11
  storage = Storage()
12
- my_id = storage.get_config_value("agent_id", "")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
13
 
14
  # ======================
15
  # LAN Discovery
16
  # ======================
17
- def lan_discovery(udp_port: int):
18
- """
19
- Периодический поиск локальных агентов в сети.
20
- """
21
  DISCOVERY_INTERVAL = 300 # каждые 5 минут
 
22
 
23
  while True:
24
  local_ip = get_local_ip()
@@ -26,23 +51,22 @@ def lan_discovery(udp_port: int):
26
  time.sleep(DISCOVERY_INTERVAL)
27
  continue
28
 
29
- net = ipaddress.ip_network(local_ip + '/24', strict=False) # /24 по умолчанию
30
  for ip in net.hosts():
31
  if str(ip) == local_ip:
32
- continue # пропускаем себя
33
- try:
34
- sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
35
- sock.settimeout(0.5)
36
- msg = json.dumps({"ping": "HMP"}).encode("utf-8")
37
- sock.sendto(msg, (str(ip), udp_port))
38
- sock.close()
39
- except:
40
  continue
41
-
 
 
 
 
 
 
 
 
42
  time.sleep(DISCOVERY_INTERVAL)
43
 
44
  def get_local_ip():
45
- """Возвращает локальный IP хоста."""
46
  try:
47
  s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
48
  s.connect(("8.8.8.8", 80))
@@ -53,46 +77,64 @@ def get_local_ip():
53
  return None
54
 
55
  # ======================
56
- # UDP Discovery
57
  # ======================
58
- def udp_discovery_listener(udp_port: int):
59
- sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
60
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
61
- sock.bind(("", udp_port))
 
 
 
 
62
  while True:
63
- data, addr = sock.recvfrom(1024)
64
- try:
65
- msg = json.loads(data.decode("utf-8"))
66
- peer_id = msg.get("id")
67
- name = msg.get("name", "unknown")
68
- addresses = msg.get("addresses", [f"{addr[0]}:{msg.get('tcp_port', udp_port)}"])
69
- normalized_addresses = []
70
- for a in addresses:
71
- norm = storage.normalize_address(a)
72
- if norm is None:
73
- continue
74
- proto, hostport = norm.split("://")
75
- if proto in ["udp", "any"]:
76
- normalized_addresses.append(norm)
77
- if normalized_addresses:
78
- storage.add_or_update_peer(peer_id, name, normalized_addresses, "discovery", "online")
79
- except Exception as e:
80
- print("[PeerSync] Ошибка при обработке UDP пакета:", e)
81
-
82
- def udp_discovery_sender(agent_id: str, agent_name: str, udp_port: int, tcp_port: int):
 
 
 
 
 
 
 
 
83
  sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
84
  sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
85
- msg = {"id": agent_id, "name": agent_name, "tcp_port": tcp_port, "addresses": [f"127.0.0.1:{tcp_port}"]}
 
 
 
86
  last_broadcast = 0
87
  DISCOVERY_INTERVAL = 60
88
  BROADCAST_INTERVAL = 600
 
89
  while True:
90
  now = time.time()
91
  if int(now) % DISCOVERY_INTERVAL == 0:
92
- sock.sendto(json.dumps(msg).encode("utf-8"), ("239.255.0.1", udp_port))
 
93
  if now - last_broadcast > BROADCAST_INTERVAL:
94
  sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
95
- sock.sendto(json.dumps(msg).encode("utf-8"), ("255.255.255.255", udp_port))
 
96
  last_broadcast = now
97
  time.sleep(1)
98
 
@@ -105,9 +147,8 @@ def peer_exchange():
105
  peers = storage.get_online_peers(limit=50)
106
  for peer in peers:
107
  peer_id, addresses = peer["id"], peer["addresses"]
108
-
109
  if peer_id == my_id:
110
- continue # пропускаем собственный агент
111
 
112
  try:
113
  addr_list = json.loads(addresses)
@@ -125,12 +166,11 @@ def peer_exchange():
125
  s.connect((host, int(port)))
126
  s.sendall(b"PEER_EXCHANGE_REQUEST")
127
  s.close()
128
- print(f"[PeerSync] Успешно подключились к {peer_id} ({norm})")
129
  break
130
- except Exception as e:
131
- print(f"[PeerSync] Не удалось подключиться к {peer_id} ({norm}): {e}")
132
- except Exception as e:
133
- print(f"[PeerSync] Ошибка обработки адресов {peer_id} ({addresses}): {e}")
134
  time.sleep(PEER_EXCHANGE_INTERVAL)
135
 
136
  # ======================
@@ -138,17 +178,11 @@ def peer_exchange():
138
  # ======================
139
  def start_sync():
140
  print("[PeerSync] Запуск фоновой синхронизации")
141
- udp_port = int(storage.get_config_value("udp_port", 4000))
142
- tcp_port = int(storage.get_config_value("tcp_port", 5000))
143
- agent_id = storage.get_config_value("agent_id", str(uuid.uuid4()))
144
- agent_name = storage.get_config_value("agent_name", "HMP-Agent")
145
-
146
  storage.load_bootstrap()
147
-
148
- threading.Thread(target=udp_discovery_listener, args=(udp_port,), daemon=True).start()
149
- threading.Thread(target=udp_discovery_sender, args=(agent_id, agent_name, udp_port, tcp_port), daemon=True).start()
150
  threading.Thread(target=peer_exchange, daemon=True).start()
151
- threading.Thread(target=lan_discovery, args=(udp_port,), daemon=True).start() # новый поток
152
 
153
  while True:
154
  time.sleep(60)
 
7
  import uuid
8
  import ipaddress
9
  from tools.storage import Storage
10
+ from datetime import datetime
11
 
12
  storage = Storage()
13
+ my_id = storage.get_config_value("agent_id", str(uuid.uuid4()))
14
+ agent_name = storage.get_config_value("agent_name", "HMP-Agent")
15
+
16
+ # ======================
17
+ # Формируем TCP/UDP порты для прослушивания
18
+ # ======================
19
+ def get_listening_ports():
20
+ tcp_ports = set()
21
+ udp_ports = set()
22
+
23
+ for key in ["global_addresses", "local_addresses"]:
24
+ addresses = json.loads(storage.get_config_value(key, "[]"))
25
+ for a in addresses:
26
+ proto, hostport = a.split("://")
27
+ host, port = hostport.split(":")
28
+ port = int(port)
29
+ if proto == "tcp":
30
+ tcp_ports.add(port)
31
+ elif proto in ["udp", "utp"]:
32
+ udp_ports.add(port)
33
+ elif proto == "any":
34
+ tcp_ports.add(port)
35
+ udp_ports.add(port)
36
+
37
+ return sorted(tcp_ports), sorted(udp_ports)
38
+
39
+ tcp_ports, udp_ports = get_listening_ports()
40
 
41
  # ======================
42
  # LAN Discovery
43
  # ======================
44
+ def lan_discovery():
 
 
 
45
  DISCOVERY_INTERVAL = 300 # каждые 5 минут
46
+ udp_port_set = set(udp_ports)
47
 
48
  while True:
49
  local_ip = get_local_ip()
 
51
  time.sleep(DISCOVERY_INTERVAL)
52
  continue
53
 
54
+ net = ipaddress.ip_network(local_ip + '/24', strict=False)
55
  for ip in net.hosts():
56
  if str(ip) == local_ip:
 
 
 
 
 
 
 
 
57
  continue
58
+ for port in udp_port_set:
59
+ try:
60
+ sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
61
+ sock.settimeout(0.5)
62
+ msg = json.dumps({"id": my_id, "name": agent_name}).encode("utf-8")
63
+ sock.sendto(msg, (str(ip), port))
64
+ sock.close()
65
+ except:
66
+ continue
67
  time.sleep(DISCOVERY_INTERVAL)
68
 
69
  def get_local_ip():
 
70
  try:
71
  s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
72
  s.connect(("8.8.8.8", 80))
 
77
  return None
78
 
79
  # ======================
80
+ # UDP Discovery Listener
81
  # ======================
82
+ def udp_discovery_listener():
83
+ sockets = []
84
+ for port in udp_ports:
85
+ sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
86
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
87
+ sock.bind(("", port))
88
+ sockets.append(sock)
89
+
90
  while True:
91
+ for sock in sockets:
92
+ try:
93
+ data, addr = sock.recvfrom(1024)
94
+ msg = json.loads(data.decode("utf-8"))
95
+ peer_id = msg.get("id")
96
+ if peer_id == my_id:
97
+ continue # не добавляем себя
98
+
99
+ name = msg.get("name", "unknown")
100
+ addresses = msg.get("addresses", [f"{addr[0]}:{sock.getsockname()[1]}"])
101
+ normalized_addresses = []
102
+ for a in addresses:
103
+ norm = storage.normalize_address(a)
104
+ if norm is None:
105
+ continue
106
+ proto, _ = norm.split("://")
107
+ if proto in ["udp", "any"]:
108
+ normalized_addresses.append(norm)
109
+
110
+ if normalized_addresses:
111
+ storage.add_or_update_peer(peer_id, name, normalized_addresses, "discovery", "online")
112
+ except:
113
+ continue
114
+
115
+ # ======================
116
+ # UDP Discovery Sender
117
+ # ======================
118
+ def udp_discovery_sender():
119
  sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
120
  sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
121
+
122
+ global_addresses = json.loads(storage.get_config_value("global_addresses", "[]"))
123
+ msg = {"id": my_id, "name": agent_name, "addresses": global_addresses}
124
+
125
  last_broadcast = 0
126
  DISCOVERY_INTERVAL = 60
127
  BROADCAST_INTERVAL = 600
128
+
129
  while True:
130
  now = time.time()
131
  if int(now) % DISCOVERY_INTERVAL == 0:
132
+ for port in udp_ports:
133
+ sock.sendto(json.dumps(msg).encode("utf-8"), ("239.255.0.1", port))
134
  if now - last_broadcast > BROADCAST_INTERVAL:
135
  sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
136
+ for port in udp_ports:
137
+ sock.sendto(json.dumps(msg).encode("utf-8"), ("255.255.255.255", port))
138
  last_broadcast = now
139
  time.sleep(1)
140
 
 
147
  peers = storage.get_online_peers(limit=50)
148
  for peer in peers:
149
  peer_id, addresses = peer["id"], peer["addresses"]
 
150
  if peer_id == my_id:
151
+ continue # пропускаем себя
152
 
153
  try:
154
  addr_list = json.loads(addresses)
 
166
  s.connect((host, int(port)))
167
  s.sendall(b"PEER_EXCHANGE_REQUEST")
168
  s.close()
 
169
  break
170
+ except:
171
+ continue
172
+ except:
173
+ continue
174
  time.sleep(PEER_EXCHANGE_INTERVAL)
175
 
176
  # ======================
 
178
  # ======================
179
  def start_sync():
180
  print("[PeerSync] Запуск фоновой синхронизации")
 
 
 
 
 
181
  storage.load_bootstrap()
182
+ threading.Thread(target=udp_discovery_listener, daemon=True).start()
183
+ threading.Thread(target=udp_discovery_sender, daemon=True).start()
 
184
  threading.Thread(target=peer_exchange, daemon=True).start()
185
+ threading.Thread(target=lan_discovery, daemon=True).start()
186
 
187
  while True:
188
  time.sleep(60)
agents/tools/storage.py CHANGED
@@ -881,41 +881,59 @@ class Storage:
881
  return None
882
 
883
  # Работа с пирам (agent_peers)
884
- def add_or_update_peer(self, peer_id, name, addresses, source="discovery", status="unknown"):
885
  c = self.conn.cursor()
886
 
887
  # ищем существующий peer по любому совпадающему адресу
888
- c.execute("SELECT id, addresses FROM agent_peers")
889
  rows = c.fetchall()
890
  existing_id = None
 
 
 
 
891
  for row in rows:
892
- db_id, db_addresses_json = row
893
  try:
894
  db_addresses = json.loads(db_addresses_json)
895
  except Exception:
896
  db_addresses = []
 
897
  if any(addr in db_addresses for addr in addresses):
898
  existing_id = db_id
 
 
 
 
 
 
899
  break
900
 
901
- if existing_id:
902
- peer_id = existing_id # используем существующий ID
 
 
903
 
904
  c.execute("""
905
- INSERT INTO agent_peers (id, name, addresses, source, status, last_seen)
906
- VALUES (?, ?, ?, ?, ?, ?)
907
  ON CONFLICT(id) DO UPDATE SET
 
908
  addresses=excluded.addresses,
909
  source=excluded.source,
910
  status=excluded.status,
911
- last_seen=excluded.last_seen
 
 
912
  """, (
913
- peer_id,
914
  name,
915
- json.dumps(addresses),
916
  source,
917
  status,
918
- datetime.now(UTC).isoformat()
 
 
919
  ))
920
  self.conn.commit()
921
 
 
881
  return None
882
 
883
  # Работа с пирам (agent_peers)
884
+ def add_or_update_peer(self, peer_id, name, addresses, source="discovery", status="unknown", pubkey=None, capabilities=None):
885
  c = self.conn.cursor()
886
 
887
  # ищем существующий peer по любому совпадающему адресу
888
+ c.execute("SELECT id, addresses, pubkey, capabilities FROM agent_peers")
889
  rows = c.fetchall()
890
  existing_id = None
891
+ existing_addresses = []
892
+ existing_pubkey = None
893
+ existing_capabilities = {}
894
+
895
  for row in rows:
896
+ db_id, db_addresses_json, db_pubkey, db_capabilities_json = row
897
  try:
898
  db_addresses = json.loads(db_addresses_json)
899
  except Exception:
900
  db_addresses = []
901
+
902
  if any(addr in db_addresses for addr in addresses):
903
  existing_id = db_id
904
+ existing_addresses = db_addresses
905
+ existing_pubkey = db_pubkey
906
+ try:
907
+ existing_capabilities = json.loads(db_capabilities_json) if db_capabilities_json else {}
908
+ except:
909
+ existing_capabilities = {}
910
  break
911
 
912
+ combined_addresses = list(set(existing_addresses) | set(addresses))
913
+ final_peer_id = existing_id or peer_id
914
+ final_pubkey = pubkey or existing_pubkey
915
+ final_capabilities = capabilities or existing_capabilities
916
 
917
  c.execute("""
918
+ INSERT INTO agent_peers (id, name, addresses, source, status, last_seen, pubkey, capabilities)
919
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?)
920
  ON CONFLICT(id) DO UPDATE SET
921
+ name=excluded.name,
922
  addresses=excluded.addresses,
923
  source=excluded.source,
924
  status=excluded.status,
925
+ last_seen=excluded.last_seen,
926
+ pubkey=excluded.pubkey,
927
+ capabilities=excluded.capabilities
928
  """, (
929
+ final_peer_id,
930
  name,
931
+ json.dumps(combined_addresses),
932
  source,
933
  status,
934
+ datetime.now(UTC).isoformat(),
935
+ final_pubkey,
936
+ json.dumps(final_capabilities)
937
  ))
938
  self.conn.commit()
939