GitHub Action commited on
Commit
76d82a4
·
1 Parent(s): 589d78b

Sync from GitHub with Git LFS

Browse files
Files changed (1) hide show
  1. agents/peer_sync.py +91 -25
agents/peer_sync.py CHANGED
@@ -47,6 +47,25 @@ def is_ipv6(host: str):
47
  except OSError:
48
  return False
49
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
50
  # ---------------------------
51
  # Загрузка bootstrap
52
  # ---------------------------
@@ -131,8 +150,22 @@ def tcp_listener():
131
  try:
132
  conn, addr = s.accept()
133
  data = conn.recv(1024)
 
134
  if data == b"PEER_EXCHANGE_REQUEST":
135
  print(f"[TCP Listener] PEER_EXCHANGE_REQUEST from {addr}")
 
 
 
 
 
 
 
 
 
 
 
 
 
136
  peers_list = []
137
  for peer in storage.get_known_peers(my_id, limit=50):
138
  peer_id = peer["id"]
@@ -144,29 +177,30 @@ def tcp_listener():
144
  peers_list.append({"id": peer_id, "addresses": addresses})
145
  payload = json.dumps(peers_list).encode("utf-8")
146
  conn.sendall(payload)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
147
  conn.close()
148
  except Exception as e:
149
  print(f"[TCP Listener] Connection handling error: {e}")
150
 
151
- # ---------------------------
152
- # Конфигурация
153
- # ---------------------------
154
- my_id = storage.get_config_value("agent_id")
155
- agent_name = storage.get_config_value("agent_name", "unknown")
156
-
157
- # Получаем уникальные локальные порты для прослушки TCP/UDP
158
- def get_local_ports():
159
- ports = set()
160
- local_addresses = storage.get_config_value("local_addresses", [])
161
- for addr in local_addresses:
162
- _, port = parse_hostport(addr.split("://", 1)[1])
163
- if port:
164
- ports.add(port)
165
- return sorted(ports)
166
-
167
- local_ports = get_local_ports()
168
- print(f"[PeerSync] Local ports: {local_ports}")
169
-
170
  # ---------------------------
171
  # UDP Discovery
172
  # ---------------------------
@@ -262,7 +296,6 @@ def tcp_peer_exchange():
262
  print(f"[PeerExchange] Checking {len(peers)} peers (raw DB)...")
263
 
264
  for peer in peers:
265
- # peer может быть tuple (id, addresses) или dict
266
  if isinstance(peer, dict):
267
  peer_id, addresses_json = peer["id"], peer["addresses"]
268
  else:
@@ -314,7 +347,15 @@ def tcp_peer_exchange():
314
  sock.settimeout(3)
315
  sock.connect((host, port))
316
 
317
- sock.sendall(b"PEER_EXCHANGE_REQUEST")
 
 
 
 
 
 
 
 
318
  data = sock.recv(64 * 1024)
319
  sock.close()
320
 
@@ -381,10 +422,34 @@ def tcp_listener():
381
  try:
382
  conn, addr = s.accept()
383
  data = conn.recv(1024)
384
- if data == b"PEER_EXCHANGE_REQUEST":
385
- print(f"[TCP Listener] PEER_EXCHANGE_REQUEST from {addr}")
386
- peers_list = []
387
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
388
  for peer in storage.get_known_peers(my_id, limit=50):
389
  peer_id = peer["id"]
390
  addresses_json = peer["addresses"]
@@ -393,7 +458,7 @@ def tcp_listener():
393
  except:
394
  addresses = []
395
 
396
- # Обработка IPv6 link-local: добавить scope_id в адрес
397
  updated_addresses = []
398
  for a in addresses:
399
  proto, hostport = a.split("://")
@@ -415,6 +480,7 @@ def tcp_listener():
415
 
416
  payload = json.dumps(peers_list).encode("utf-8")
417
  conn.sendall(payload)
 
418
  conn.close()
419
  except Exception as e:
420
  print(f"[TCP Listener] Connection handling error: {e}")
 
47
  except OSError:
48
  return False
49
 
50
+ # ---------------------------
51
+ # Конфигурация
52
+ # ---------------------------
53
+ my_id = storage.get_config_value("agent_id")
54
+ agent_name = storage.get_config_value("agent_name", "unknown")
55
+ local_addresses = storage.get_config_value("local_addresses", [])
56
+
57
+ # Получаем уникальные локальные порты для прослушки TCP/UDP
58
+ def get_local_ports():
59
+ ports = set()
60
+ for addr in local_addresses:
61
+ _, port = parse_hostport(addr.split("://", 1)[1])
62
+ if port:
63
+ ports.add(port)
64
+ return sorted(ports)
65
+
66
+ local_ports = get_local_ports()
67
+ print(f"[PeerSync] Local ports: {local_ports}")
68
+
69
  # ---------------------------
70
  # Загрузка bootstrap
71
  # ---------------------------
 
150
  try:
151
  conn, addr = s.accept()
152
  data = conn.recv(1024)
153
+
154
  if data == b"PEER_EXCHANGE_REQUEST":
155
  print(f"[TCP Listener] PEER_EXCHANGE_REQUEST from {addr}")
156
+
157
+ # --- (1) Сохраняем нового пира ---
158
+ peer_host, peer_port = addr[0], addr[1]
159
+ peer_id = f"did:hmp:{peer_host}:{peer_port}" # временный ID, пока пир не представился
160
+ storage.add_or_update_peer(
161
+ peer_id,
162
+ name="incoming",
163
+ addresses=[f"tcp4://{peer_host}:{peer_port}"],
164
+ source="incoming",
165
+ status="online"
166
+ )
167
+
168
+ # --- (2) Отправляем список известных ---
169
  peers_list = []
170
  for peer in storage.get_known_peers(my_id, limit=50):
171
  peer_id = peer["id"]
 
177
  peers_list.append({"id": peer_id, "addresses": addresses})
178
  payload = json.dumps(peers_list).encode("utf-8")
179
  conn.sendall(payload)
180
+
181
+ # --- (3) Делаем встречный запрос ---
182
+ try:
183
+ with socket.create_connection((peer_host, peer_port), timeout=3) as s2:
184
+ s2.sendall(b"PEER_EXCHANGE_REQUEST")
185
+ resp = s2.recv(65536)
186
+ if resp:
187
+ new_peers = json.loads(resp.decode("utf-8"))
188
+ for p in new_peers:
189
+ storage.add_or_update_peer(
190
+ p["id"],
191
+ name="from_incoming",
192
+ addresses=p.get("addresses", []),
193
+ source="reverse-sync",
194
+ status="online"
195
+ )
196
+ print(f"[TCP Listener] Reverse sync: got {len(new_peers)} peers from {peer_host}:{peer_port}")
197
+ except Exception as e:
198
+ print(f"[TCP Listener] Reverse sync failed with {peer_host}:{peer_port}: {e}")
199
+
200
  conn.close()
201
  except Exception as e:
202
  print(f"[TCP Listener] Connection handling error: {e}")
203
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
204
  # ---------------------------
205
  # UDP Discovery
206
  # ---------------------------
 
296
  print(f"[PeerExchange] Checking {len(peers)} peers (raw DB)...")
297
 
298
  for peer in peers:
 
299
  if isinstance(peer, dict):
300
  peer_id, addresses_json = peer["id"], peer["addresses"]
301
  else:
 
347
  sock.settimeout(3)
348
  sock.connect((host, port))
349
 
350
+ # 🔑 Отправляем данные о себе
351
+ handshake = {
352
+ "type": "PEER_EXCHANGE_REQUEST",
353
+ "id": my_id,
354
+ "name": agent_name,
355
+ "addresses": my_addresses, # список вроде ["tcp4://192.168.0.10:4010"]
356
+ }
357
+ sock.sendall(json.dumps(handshake).encode("utf-8"))
358
+
359
  data = sock.recv(64 * 1024)
360
  sock.close()
361
 
 
422
  try:
423
  conn, addr = s.accept()
424
  data = conn.recv(1024)
 
 
 
425
 
426
+ if not data:
427
+ conn.close()
428
+ continue
429
+
430
+ try:
431
+ msg = json.loads(data.decode("utf-8"))
432
+ except Exception:
433
+ conn.close()
434
+ continue
435
+
436
+ if msg.get("type") == "PEER_EXCHANGE_REQUEST":
437
+ peer_id = msg.get("id") or f"did:hmp:{addr[0]}:{addr[1]}"
438
+ peer_name = msg.get("name", "unknown")
439
+ peer_addrs = msg.get("addresses", [])
440
+
441
+ # Сохраняем нового пира
442
+ storage.add_or_update_peer(
443
+ peer_id,
444
+ peer_name,
445
+ peer_addrs,
446
+ source="incoming",
447
+ status="online"
448
+ )
449
+ print(f"[TCP Listener] Handshake from {peer_id} ({addr})")
450
+
451
+ # Отправляем список известных пиров
452
+ peers_list = []
453
  for peer in storage.get_known_peers(my_id, limit=50):
454
  peer_id = peer["id"]
455
  addresses_json = peer["addresses"]
 
458
  except:
459
  addresses = []
460
 
461
+ # Обработка IPv6 link-local: добавить scope_id
462
  updated_addresses = []
463
  for a in addresses:
464
  proto, hostport = a.split("://")
 
480
 
481
  payload = json.dumps(peers_list).encode("utf-8")
482
  conn.sendall(payload)
483
+
484
  conn.close()
485
  except Exception as e:
486
  print(f"[TCP Listener] Connection handling error: {e}")