"""Boot-time controller script for a Raspberry Pi time-lapse camera.

On each run the script fetches the camera status from the server, updates the
local JSON configuration, sends the capture interval to the microcontroller
over I2C, waits for pending image transfers and finally shuts the system down.
"""

import json
import os
import shutil
import time
from datetime import datetime

import picamera2 as pc
import requests
import smbus2

# Local configuration file shared by every read/write helper below.
CONFIG_PATH = "/home/timelapse/Documents/Time_Lapse/CONFIG/config.json"
# Root directory scanned for captures that are still waiting to be sent.
PROJECT_PATH = "//home//timelapse//Documents//Time_Lapse//PROJECT//"

I2C_BUS = 1
MC_I2C_ADDRESS = 0x28
# Without a timeout a stalled connection would block the script (and the
# final shutdown) forever.
REQUEST_TIMEOUT_S = 10

CONFIG_KEYS = (
    "set_config",
    "maintenance",
    "stop current config",
    "timelapse",
    "conf nb_images",
    "nb_images restantes",
)


def _default_config():
    """Return a fresh copy of the fallback configuration."""
    return {
        "set_config": False,
        "maintenance": False,
        "stop current config": False,
        "timelapse": 3,
        "conf nb_images": 1,
        "nb_images restantes": 1,
    }


class MicroControler:
    """I2C link to the microcontroller at address 0x28 on bus 1."""

    def __init__(self):
        pass

    def set_data(self, t):
        """Write the byte ``t`` to the microcontroller and print its reply.

        After the write, the method sleeps 15 ms, reads 3 bytes back from the
        same address and prints them as a list of integers.
        """
        # The context manager closes the bus handle even if a transfer fails.
        with smbus2.SMBus(I2C_BUS) as bus:
            bus.write_byte(MC_I2C_ADDRESS, t)
            time.sleep(0.015)
            reply = smbus2.i2c_msg.read(MC_I2C_ADDRESS, 3)
            print("I get that : ", reply)
            bus.i2c_rdwr(reply)
            data = list(reply)
        print(data)

    def set_data_2_octets(self, value):
        """Send ``value`` as two bytes, most significant byte first.

        Only the low 16 bits of ``value`` are transmitted; each byte is sent
        through :meth:`set_data`.
        """
        value_16b = format(value, "016b")
        print(value_16b)
        high_address = (value >> 8) & 0xFF
        low_address = value & 0xFF
        print(high_address)
        print(low_address)
        # High byte first, then low byte.
        self.set_data(high_address)
        self.set_data(low_address)


class Server:
    """Client for the camera-status API plus the local JSON config file."""

    def __init__(self):
        self.url_requete = "https://timelapse.kerboul.me/api/camera/status"
        self.dic = _default_config()

    def get_request(self):
        """Fetch the camera status and copy it into ``self.dic``.

        On a network error, a non-200 status or a malformed payload a message
        is printed and ``self.dic`` is left untouched.
        """
        try:
            response = requests.get(self.url_requete, timeout=REQUEST_TIMEOUT_S)
        except requests.exceptions.RequestException:
            print("erreur API ou internet")
            return

        if response.status_code != 200:
            print("mauvais code")
            print(response.status_code)
            return

        try:
            camera_status = response.json()
        except ValueError:
            # Covers both json.JSONDecodeError and requests' JSONDecodeError.
            print("erreur API ou internet")
            return
        print("Here is the answer from the server : ", camera_status)

        try:
            updates = {
                "maintenance": camera_status["maintenance"],
                "timelapse": camera_status["interval"],
                "conf nb_images": camera_status["nb_images"],
                "set_config": not camera_status["idle"],
                "stop current config": camera_status["stop_flag"],
            }
        except (KeyError, TypeError) as err:
            print("reponse API invalide : ", err)
            return
        # Apply everything at once so a bad payload never half-updates the dict.
        self.dic.update(updates)

    def get_dic_data(self):
        """Return the current in-memory configuration dictionary."""
        return self.dic

    def _write_config(self, data):
        """Serialize ``data`` as JSON into the configuration file."""
        self.filename = CONFIG_PATH
        os.makedirs(os.path.dirname(self.filename), exist_ok=True)
        with open(self.filename, "w", encoding="utf-8") as file:
            json.dump(data, file)

    def create_Json(self):
        """Write the in-memory configuration to the configuration file."""
        self._write_config(self.get_dic_data())

    def get_existing_Json(self):
        """Load and return the content of the configuration file."""
        self.filename = CONFIG_PATH
        with open(self.filename, "r", encoding="utf-8") as file:
            return json.load(file)

    def create_this_Json(self, dic):
        """Adopt ``dic`` as the current configuration and write it to disk.

        Raises:
            KeyError: if ``dic`` lacks one of the expected configuration keys.
        """
        self.dic = {key: dic[key] for key in CONFIG_KEYS}
        self._write_config(dic)

    def has_pending_tasks(self):
        """Check whether captures are still waiting to be synchronized.

        Returns True when the ``_offline`` directory exists and is not empty,
        or when the project directory contains any other sub-directory.
        """
        offline_dir = os.path.join(PROJECT_PATH, "_offline")
        if os.path.exists(offline_dir) and os.listdir(offline_dir):
            return True
        if not os.path.exists(PROJECT_PATH):
            return False
        # Any project folder other than _offline counts as pending work.
        return any(
            item != "_offline" and os.path.isdir(os.path.join(PROJECT_PATH, item))
            for item in os.listdir(PROJECT_PATH)
        )


def _resolve_config(server, datas):
    """Update the config file from the server state and return the config to use."""
    if datas["stop current config"] and os.path.exists(CONFIG_PATH):
        print("- Stopping current config")
        os.remove(CONFIG_PATH)

    if datas["set_config"]:
        # NOTE(review): get_request() never updates "nb_images restantes", so a
        # new config is stored with the value already in the dict (1 by
        # default) - confirm whether it should start at "conf nb_images".
        server.create_this_Json(datas)
        print("- New Config")
        return datas

    print("- Working on Raspberry config")
    if not os.path.exists(CONFIG_PATH):
        datas = _default_config()
        server.create_this_Json(datas)
        return datas

    print("- Existing Config : -1 on Images")
    datas = server.get_existing_Json()
    datas["nb_images restantes"] = datas["nb_images restantes"] - 1
    os.remove(CONFIG_PATH)
    # Keep the config only while images remain; "> 0" also drops a counter
    # that somehow went negative instead of rewriting it forever.
    if datas["nb_images restantes"] > 0:
        server.create_this_Json(datas)
    return datas


def main():
    """Run one wake-up cycle: sync config, program the interval, shut down."""
    mc = MicroControler()
    server = Server()

    server.get_request()
    datas = server.get_dic_data()
    print("Here are the datas loaded : ", datas)

    if datas["maintenance"]:
        # NOTE(review): the original indentation was lost; this assumes that
        # maintenance mode skips config handling and shutdown - confirm.
        print("- Maintenance")
        return
    print("- No Maintenance")

    datas = _resolve_config(server, datas)

    print("- Vérification si des transferts d'images sont en cours...")

    # Send the interval to the microcontroller.
    print("TimeLapse sent is : ", datas["timelapse"])
    mc.set_data_2_octets(datas["timelapse"])

    # Give pending transfers a bounded amount of time to finish.
    max_wait = 60  # seconds
    wait_interval = 5  # seconds between checks
    for elapsed in range(0, max_wait, wait_interval):
        if not server.has_pending_tasks():
            print("- Aucun transfert en cours, extinction du système")
            break
        print(f"- Des images sont encore en cours de transfert, attente ({elapsed}s)...")
        time.sleep(wait_interval)

    print("- Extinction du système")
    time.sleep(1)
    os.system("sudo shutdown now")


if __name__ == "__main__":
    main()