init
This commit is contained in:
148
Automate.py
Normal file
148
Automate.py
Normal file
@@ -0,0 +1,148 @@
|
|||||||
|
import time
|
||||||
|
from datetime import datetime
|
||||||
|
import picamera2 as pc
|
||||||
|
import smbus2
|
||||||
|
import os
|
||||||
|
import json
|
||||||
|
import shutil
|
||||||
|
import requests
|
||||||
|
|
||||||
|
|
||||||
|
class MicroControler:
|
||||||
|
def __init__(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def set_data(self, t):
|
||||||
|
bus = smbus2.SMBus(1)
|
||||||
|
ans = bus.write_byte(0x28, t)
|
||||||
|
#print(ans)
|
||||||
|
time.sleep(0.015)
|
||||||
|
ans=smbus2.i2c_msg.read(0x28,3)
|
||||||
|
print("I get that : ",ans)
|
||||||
|
|
||||||
|
bus.i2c_rdwr(ans)
|
||||||
|
data = list(ans)
|
||||||
|
print(data)
|
||||||
|
|
||||||
|
def set_data_2_octets(self, value):
|
||||||
|
value_16b = format(value, "016b")
|
||||||
|
print(value_16b)
|
||||||
|
#print(value_16b>>8)
|
||||||
|
|
||||||
|
high_address = (value>>8) & 0xFF
|
||||||
|
low_address = value & 0xFF
|
||||||
|
print(high_address)
|
||||||
|
print(low_address)
|
||||||
|
|
||||||
|
"""High Address"""
|
||||||
|
self.set_data(high_address)
|
||||||
|
"""Low Address"""
|
||||||
|
self.set_data(low_address)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class Server:
|
||||||
|
def __init__(self):
|
||||||
|
self.url_requete = "https://timelapse.kerboul.me/api/camera/status"
|
||||||
|
self.dic = { "set_config":False,
|
||||||
|
"maintenance": False,
|
||||||
|
"stop current config":False,
|
||||||
|
|
||||||
|
"timelapse":1,
|
||||||
|
"conf nb_images":1,
|
||||||
|
"nb_images restantes":1
|
||||||
|
}
|
||||||
|
|
||||||
|
def get_request(self):
|
||||||
|
try:
|
||||||
|
response = requests.get(self.url_requete)
|
||||||
|
print("Here is the answer from the server : ",response.json())
|
||||||
|
if response.status_code == 200:
|
||||||
|
camera_status = response.json()
|
||||||
|
#print(camera_status)
|
||||||
|
self.dic["maintenance"] = camera_status["maintenance"]
|
||||||
|
self.dic["timelapse"] = camera_status["interval"]
|
||||||
|
self.dic["conf nb_images"] = camera_status["nb_images"]
|
||||||
|
self.dic["set_config"] = camera_status["idle"]
|
||||||
|
self.dic["stop current config"] = camera_status["stop_flag"]
|
||||||
|
else:
|
||||||
|
print("mauvais code")
|
||||||
|
print(response.status_code)
|
||||||
|
except requests.exceptions.RequestException as e:
|
||||||
|
print("erreur API ou internet")
|
||||||
|
|
||||||
|
def get_dic_data(self):
|
||||||
|
return self.dic
|
||||||
|
|
||||||
|
def create_Json(self):
|
||||||
|
self.filename = "/home/timelapse/Documents/Time_Lapse/CONFIG/config.json"
|
||||||
|
with open(self.filename, "w") as file:
|
||||||
|
json.dump(self.get_dic_data(), file)
|
||||||
|
|
||||||
|
def get_existing_Json(self):
|
||||||
|
self.filename = "/home/timelapse/Documents/Time_Lapse/CONFIG/config.json"
|
||||||
|
with open(self.filename, "r", encoding='utf-8') as file:
|
||||||
|
datas = json.load(file)
|
||||||
|
return datas
|
||||||
|
|
||||||
|
def create_this_Json(self, dic):
|
||||||
|
self.dic = { "set_config":dic["set_config"],
|
||||||
|
"maintenance": dic["maintenance"],
|
||||||
|
"stop current config":dic["stop current config"],
|
||||||
|
|
||||||
|
"timelapse":dic["timelapse"],
|
||||||
|
"conf nb_images":dic["conf nb_images"],
|
||||||
|
"nb_images restantes":dic["nb_images restantes"]
|
||||||
|
}
|
||||||
|
self.filename = "/home/timelapse/Documents/Time_Lapse/CONFIG/config.json"
|
||||||
|
with open(self.filename, "w") as file:
|
||||||
|
json.dump(dic, file)
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
MC = MicroControler()
|
||||||
|
server = Server()
|
||||||
|
filename = "/home/timelapse/Documents/Time_Lapse/CONFIG/config.json"
|
||||||
|
|
||||||
|
server.get_request()
|
||||||
|
datas = server.get_dic_data()
|
||||||
|
#datas = {'set_config': False, 'maintenance': True, 'stop current config': True, 'timelapse': 7, 'conf nb_images': 12, 'nb_images restantes': 12}
|
||||||
|
print("Here are the datas loaded : ",datas)
|
||||||
|
if (datas["maintenance"]):
|
||||||
|
print("- Maintenance")
|
||||||
|
else:
|
||||||
|
print("- No Maintenance")
|
||||||
|
if (datas["stop current config"]):
|
||||||
|
if (os.path.exists(filename)):
|
||||||
|
print("- Stopping current config") #suppresion fichier json
|
||||||
|
os.remove(filename)
|
||||||
|
if (datas["set_config"]==False): #eddition fichier config
|
||||||
|
print("- Working on Raspberry config")
|
||||||
|
if (os.path.exists(filename)):
|
||||||
|
print("- Existing Config : -1 on Images")
|
||||||
|
datas = server.get_existing_Json()
|
||||||
|
datas["nb_images restantes"] = datas["nb_images restantes"] - 1
|
||||||
|
os.remove(filename)
|
||||||
|
if (datas["nb_images restantes"]==0):
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
server.create_this_Json(datas)
|
||||||
|
else:
|
||||||
|
datas = {'set_config': False, 'maintenance': False, 'stop current config': False, 'timelapse': 3, 'conf nb_images': 1, 'nb_images restantes': 1}
|
||||||
|
server.create_this_Json(datas)
|
||||||
|
else:
|
||||||
|
server.create_this_Json(datas)
|
||||||
|
print("- New Config")
|
||||||
|
print("- Shut Down")
|
||||||
|
print("TimeLapse sent is : ", datas["timelapse"])
|
||||||
|
|
||||||
|
MC.set_data_2_octets(datas["timelapse"])
|
||||||
|
time.sleep(1)
|
||||||
|
os.system("sudo shutdown now")
|
||||||
|
|
||||||
|
"""
|
||||||
|
#MC.set_data_2_octets(1)
|
||||||
|
|
||||||
|
#MC .set_data_2_octets(datas["timelapse"])
|
||||||
|
#le mot de passe c'est motdepasse
|
||||||
|
#nmcli dev wifi "le mot de passe c'est motdepasse" password "motdepasse"
|
||||||
|
"""
|
||||||
1
CONFIG/config.json
Normal file
1
CONFIG/config.json
Normal file
@@ -0,0 +1 @@
|
|||||||
|
{"set_config": false, "maintenance": false, "stop current config": false, "timelapse": 3, "conf nb_images": 1, "nb_images restantes": 1}
|
||||||
25
Camera.py
Normal file
25
Camera.py
Normal file
@@ -0,0 +1,25 @@
|
|||||||
|
import picamera2 as pc
|
||||||
|
import time
|
||||||
|
|
||||||
|
cam = pc.Picamera2()
|
||||||
|
conf = cam.create_preview_configuration(main={"size":(800, 600)})
|
||||||
|
cam.configure(conf)
|
||||||
|
cam.start_preview(pc.Preview.QTGL)
|
||||||
|
cam.start()
|
||||||
|
time.sleep(10)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
image = cam.capture_file("Ma photo.jpg")
|
||||||
|
print(image)
|
||||||
|
cam.close()
|
||||||
|
|
||||||
|
"""
|
||||||
|
print("bonjour")
|
||||||
|
request.save("MyPhotohraph", "AGAGAGAGA.jpg")
|
||||||
|
request.release()
|
||||||
|
"""
|
||||||
|
|
||||||
|
#cam.capture_image()
|
||||||
|
#cam.show()
|
||||||
|
print("ok")
|
||||||
7
Commandes/CAM
Normal file
7
Commandes/CAM
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
############COMMANDES LINUX############
|
||||||
|
UTILISER RPICAM librairy
|
||||||
|
|
||||||
|
libcamera-hello #vérifie que la camera fonctionne
|
||||||
|
libcamera-hello --timeout 0 #allume la camera à l'infini
|
||||||
|
rpicam-hello --info-text " LE TITRE " # j'écris letitre de l'interface video
|
||||||
|
|
||||||
1
Commandes/Commandes_Linux
Normal file
1
Commandes/Commandes_Linux
Normal file
@@ -0,0 +1 @@
|
|||||||
|
libcamera-hello
|
||||||
38
Connexion.py
Normal file
38
Connexion.py
Normal file
@@ -0,0 +1,38 @@
|
|||||||
|
import socket
|
||||||
|
class MySocket:
|
||||||
|
"""demonstration class only
|
||||||
|
- coded for clarity, not efficiency
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, sock=None):
|
||||||
|
if sock is None:
|
||||||
|
self.sock = socket.socket(
|
||||||
|
socket.AF_INET, socket.SOCK_STREAM)
|
||||||
|
else:
|
||||||
|
self.sock = sock
|
||||||
|
|
||||||
|
def connect(self, host, port):
|
||||||
|
self.sock.connect((host, port))
|
||||||
|
|
||||||
|
def mysend(self, msg):
|
||||||
|
totalsent = 0
|
||||||
|
while totalsent < MSGLEN:
|
||||||
|
sent = self.sock.send(msg[totalsent:])
|
||||||
|
if sent == 0:
|
||||||
|
raise RuntimeError("socket connection broken")
|
||||||
|
totalsent = totalsent + sent
|
||||||
|
|
||||||
|
def myreceive(self):
|
||||||
|
chunks = []
|
||||||
|
bytes_recd = 0
|
||||||
|
while bytes_recd < MSGLEN:
|
||||||
|
chunk = self.sock.recv(min(MSGLEN - bytes_recd, 2048))
|
||||||
|
if chunk == b'':
|
||||||
|
raise RuntimeError("socket connection broken")
|
||||||
|
chunks.append(chunk)
|
||||||
|
bytes_recd = bytes_recd + len(chunk)
|
||||||
|
return b''.join(chunks)
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
print("bonjour")
|
||||||
|
MySocket()
|
||||||
10
First_Try.py
Normal file
10
First_Try.py
Normal file
@@ -0,0 +1,10 @@
|
|||||||
|
import time
|
||||||
|
import picamera2 as pc
|
||||||
|
|
||||||
|
camera = pc.Picamera2()
|
||||||
|
try:
|
||||||
|
camera.strat_preview()
|
||||||
|
time.sleep(10)
|
||||||
|
camera.stop_preview()
|
||||||
|
finally:
|
||||||
|
camera.close()
|
||||||
53
Humidity.py
Normal file
53
Humidity.py
Normal file
@@ -0,0 +1,53 @@
|
|||||||
|
import smbus2
|
||||||
|
import RPi.GPIO as GPIO
|
||||||
|
import time
|
||||||
|
|
||||||
|
"""
|
||||||
|
>>> bus = smbus2.SMBus(1)
|
||||||
|
>>> ans = bus.write_byte(0x44, 0xFD)
|
||||||
|
>>> smbus2.i2c_msg.read(0x44,6)
|
||||||
|
i2c_msg(68,1,b'\x00\x00\x00\x00\x00\x00')
|
||||||
|
>>> taa=smbus2.i2c_msg.read(0x44,6)
|
||||||
|
>>> bus.i2c_rdwr(taa)
|
||||||
|
>>> data = list(taa)
|
||||||
|
>>> print(data)
|
||||||
|
[112, 174, 89, 60, 78, 165]
|
||||||
|
"""
|
||||||
|
|
||||||
|
def main():
|
||||||
|
i2cbus = smbus2.SMBus(1)
|
||||||
|
addressSHT40 = 0x44
|
||||||
|
meas_sht40(i2cbus, 0x44)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def meas_sht40(bus, addressSHT40):
|
||||||
|
CMD_measure = 0xFD #cmd de mesure
|
||||||
|
#bus.write_byte(0x00, 0x06)
|
||||||
|
#time.sleep(1.1)
|
||||||
|
bus.write_byte(0x44, 0x39)
|
||||||
|
time.sleep(1.1)
|
||||||
|
while 1:
|
||||||
|
time.sleep(1.1)
|
||||||
|
#data = bus.read_i2c_block_data(0x44, 0, 6) #Lecture des 6 octets
|
||||||
|
data = []
|
||||||
|
|
||||||
|
bus.write_byte(0x44, 0xFD)
|
||||||
|
byte = bus.read_block_data(0x44,3)
|
||||||
|
data.append(byte)
|
||||||
|
print(f"Octet {i}: {byte:#04x}")
|
||||||
|
time.sleep(1.15)
|
||||||
|
|
||||||
|
#print(data)
|
||||||
|
|
||||||
|
"""
|
||||||
|
temp_raw = (data[0] << 8) + data[1]
|
||||||
|
humidity_raw = (data[3] <<8) + data[4]
|
||||||
|
"""
|
||||||
|
temperature = 10000*(data/65535.0)
|
||||||
|
#temperature = -45 + 175*data/65535
|
||||||
|
#print(temperature)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__=="__main__":
|
||||||
|
main()
|
||||||
BIN
Images/13_03_2025_11_27_36_.jpg
Normal file
BIN
Images/13_03_2025_11_27_36_.jpg
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 44 KiB |
BIN
Images/13_03_2025_11_41_16_.jpg
Normal file
BIN
Images/13_03_2025_11_41_16_.jpg
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 49 KiB |
BIN
Images/2025-02-12 12:11:58.968440.jpg
Normal file
BIN
Images/2025-02-12 12:11:58.968440.jpg
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 93 KiB |
BIN
Images/Ma photo.jpg
Normal file
BIN
Images/Ma photo.jpg
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 68 KiB |
BIN
Images/ma_photo.jpg
Normal file
BIN
Images/ma_photo.jpg
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 19 KiB |
72
Send_data_stocked.py
Normal file
72
Send_data_stocked.py
Normal file
@@ -0,0 +1,72 @@
|
|||||||
|
import os
|
||||||
|
import requests
|
||||||
|
import shutil
|
||||||
|
import json
|
||||||
|
|
||||||
|
class Send_data_stocked:
|
||||||
|
def __init__(self):
|
||||||
|
self.url = "https://timelapse.kerboul.me/api/camera/upload"
|
||||||
|
self.headers = {
|
||||||
|
'accept': 'application/json'
|
||||||
|
}
|
||||||
|
self.project_path = "//home//timelapse//Documents//Time_Lapse//PROJECT//"
|
||||||
|
|
||||||
|
def get_list_folder(self,path = "//home//timelapse//Documents//Time_Lapse//PROJECT"):
|
||||||
|
# Liste des noms de dossiers
|
||||||
|
noms_dossiers = [nom for nom in os.listdir(path) if os.path.isdir(os.path.join(path, nom))]
|
||||||
|
print("Noms des dossiers :", noms_dossiers)
|
||||||
|
return noms_dossiers
|
||||||
|
|
||||||
|
def send_ALL_data_stocked(self):
|
||||||
|
tab_files = self.get_list_folder()
|
||||||
|
for i in tab_files:
|
||||||
|
self.send_data_stocked(i)
|
||||||
|
|
||||||
|
def send_data_stocked(self,file_name):
|
||||||
|
file_path = self.project_path + file_name + "//" + file_name
|
||||||
|
json_file_path = file_path + ".json"
|
||||||
|
img_file_path = file_path + ".jpg"
|
||||||
|
|
||||||
|
datas = self.get_json(json_file_path)
|
||||||
|
print(datas)
|
||||||
|
answer_from_server = self.upload_measurement(img_file_path, datas["timestamp"], datas["temperature"], datas["humidity"])
|
||||||
|
print(answer_from_server)
|
||||||
|
if (answer_from_server["message"]== "Mesure téléchargée avec succès"):
|
||||||
|
self.del_Folder(datas["timestamp"])
|
||||||
|
|
||||||
|
def get_json(self, file_path):
|
||||||
|
with open(file_path, "r", encoding='utf-8') as file:
|
||||||
|
datas = json.load(file)
|
||||||
|
return datas
|
||||||
|
return False
|
||||||
|
|
||||||
|
def del_Folder(self, name):
|
||||||
|
shutil.rmtree(self.project_path + name)
|
||||||
|
|
||||||
|
def upload_measurement(self, image_path, timestamp, temperature, humidity):
|
||||||
|
|
||||||
|
data = {
|
||||||
|
'timestamp': timestamp,
|
||||||
|
'temperature': temperature,
|
||||||
|
'humidity': humidity
|
||||||
|
}
|
||||||
|
files = {
|
||||||
|
'image': (image_path, open(image_path, 'rb'), 'image/jpeg')
|
||||||
|
}
|
||||||
|
try:
|
||||||
|
#print(data)
|
||||||
|
#print(files)
|
||||||
|
response = requests.post(self.url, headers=self.headers, data=data, files=files)
|
||||||
|
print(response)
|
||||||
|
response.raise_for_status()
|
||||||
|
response_data = response.json()
|
||||||
|
return response_data
|
||||||
|
except requests.exceptions.HTTPError as http_err:
|
||||||
|
print(f"HTTP error occurred: {http_err}")
|
||||||
|
except Exception as err:
|
||||||
|
print(f"Other error occurred: {err}")
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
SDS = Send_data_stocked()
|
||||||
|
#a = SDS.get_list_folder()[0]
|
||||||
|
SDS.send_ALL_data_stocked()
|
||||||
134
Time_Lapse_Connection.py
Normal file
134
Time_Lapse_Connection.py
Normal file
@@ -0,0 +1,134 @@
|
|||||||
|
import time
|
||||||
|
from datetime import datetime
|
||||||
|
import picamera2 as pc
|
||||||
|
import smbus2
|
||||||
|
import os
|
||||||
|
import json
|
||||||
|
import shutil
|
||||||
|
import requests
|
||||||
|
|
||||||
|
class TimeLapse:
|
||||||
|
def __init__(self):
|
||||||
|
self.url = "https://timelapse.kerboul.me/api/camera/upload"
|
||||||
|
self.headers = {
|
||||||
|
'accept': 'application/json'
|
||||||
|
}
|
||||||
|
self.data = {
|
||||||
|
"timestamp": "",
|
||||||
|
"temperature": "",
|
||||||
|
'humidity': "",
|
||||||
|
}
|
||||||
|
self.project_path = "//home//timelapse//Documents//Time_Lapse//PROJECT//"
|
||||||
|
|
||||||
|
def create_Folder(self):
|
||||||
|
filename = self.data["timestamp"]
|
||||||
|
os.mkdir(self.project_path + filename)
|
||||||
|
|
||||||
|
def create_Json(self):
|
||||||
|
filename = self.project_path + self.data["timestamp"] + "//" + self.data["timestamp"] + ".json"
|
||||||
|
with open(filename, "w") as file:
|
||||||
|
json.dump(self.get_Alldic(), file)
|
||||||
|
|
||||||
|
def get_Alldic(self):
|
||||||
|
return self.data
|
||||||
|
|
||||||
|
def get_Hygrodata(self):
|
||||||
|
bus = smbus2.SMBus(1)
|
||||||
|
ans = bus.write_byte(0x44, 0xFD)
|
||||||
|
time.sleep(0.1)
|
||||||
|
ans=smbus2.i2c_msg.read(0x44,6)
|
||||||
|
bus.i2c_rdwr(ans)
|
||||||
|
data = list(ans)
|
||||||
|
t_ticks = data[0]*256 + data[1] #Pas de CRC utilisé
|
||||||
|
rh_ticks = data[3]*256 + data[4] #Pas de CRC utilisé
|
||||||
|
self.data["temperature"] = (175*t_ticks)/65535. -45
|
||||||
|
self.data["humidity"] = (125*rh_ticks)/65535. -6
|
||||||
|
return self.data
|
||||||
|
|
||||||
|
def get_humidity(self):
|
||||||
|
return self.data["humidity"]
|
||||||
|
|
||||||
|
def get_temperature(self):
|
||||||
|
return self.data["temperature"]
|
||||||
|
|
||||||
|
def get_TimeStamp(self): #ATTENTION AU NOMMAGE DES FICHIERS !!! PAS DE : ET ESPACES
|
||||||
|
now = datetime.now()
|
||||||
|
#dt_string = now.strftime("%d_%m_%Y_%H_%M_%S")
|
||||||
|
dt_string = now.strftime("%Y-%m-%d %H:%M:%S")
|
||||||
|
print("date and time =", str(dt_string))
|
||||||
|
self.data["timestamp"]= str(dt_string)
|
||||||
|
return str(dt_string)
|
||||||
|
|
||||||
|
def pick_Picture(self):
|
||||||
|
cam = pc.Picamera2()
|
||||||
|
conf = cam.create_preview_configuration(main={"size":(800, 600)})
|
||||||
|
cam.configure(conf)
|
||||||
|
cam.start_preview(pc.Preview.QTGL)
|
||||||
|
cam.start()
|
||||||
|
time.sleep(0.1)
|
||||||
|
timestamp = self.get_TimeStamp()
|
||||||
|
path = self.project_path + timestamp + "//" + timestamp+".jpg"
|
||||||
|
self.create_Folder()
|
||||||
|
image = cam.capture_file(path)
|
||||||
|
cam.close()
|
||||||
|
|
||||||
|
def del_Folder(self, name):
|
||||||
|
shutil.rmtree(self.project_path + name)
|
||||||
|
|
||||||
|
def Singleroutin(self):
|
||||||
|
TL.get_Hygrodata()
|
||||||
|
TL.pick_Picture()
|
||||||
|
TL.create_Json()
|
||||||
|
timestamp = str(TL.data["timestamp"])
|
||||||
|
path_file = self.project_path +timestamp+ "//" + timestamp + ".jpg"
|
||||||
|
answer_from_server = TL.upload_measurement( path_file, timestamp, TL.get_temperature(), TL.get_humidity())
|
||||||
|
print(answer_from_server)
|
||||||
|
try:
|
||||||
|
if (answer_from_server["message"]== "Measurement uploaded successfully"):
|
||||||
|
TL.del_Folder(timestamp)
|
||||||
|
except Exception as e:
|
||||||
|
print(e)
|
||||||
|
|
||||||
|
def upload_measurement(self, image_path, timestamp, temperature, humidity):
|
||||||
|
data = {
|
||||||
|
'timestamp': timestamp,
|
||||||
|
'temperature': temperature,
|
||||||
|
'humidity': humidity
|
||||||
|
}
|
||||||
|
#print(data)
|
||||||
|
#print(image_path)
|
||||||
|
files = {
|
||||||
|
'image': (image_path, open(image_path, 'rb'), 'image/jpeg')
|
||||||
|
}
|
||||||
|
try:
|
||||||
|
response = requests.post(self.url, headers=self.headers, data=data, files=files)
|
||||||
|
response.raise_for_status()
|
||||||
|
response_data = response.json()
|
||||||
|
return response_data
|
||||||
|
except requests.exceptions.HTTPError as http_err:
|
||||||
|
print(f"HTTP error occurred: {http_err}")
|
||||||
|
except Exception as err:
|
||||||
|
print(f"Other error occurred: {err}")
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
print("FICHIER TIMELAPSE")
|
||||||
|
TL = TimeLapse()
|
||||||
|
for i in range (0,3):
|
||||||
|
TL.Singleroutin()
|
||||||
|
time.sleep(1)
|
||||||
|
""" FONCTIONNEL
|
||||||
|
TL.get_Hygrodata()
|
||||||
|
TL.pick_Picture()
|
||||||
|
TL.create_Json()
|
||||||
|
|
||||||
|
try:
|
||||||
|
TL.del_Folder("13_03_2025_12_07_56")
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
"""
|
||||||
|
# print(TL.get_data())
|
||||||
|
# TL.get_Picture()
|
||||||
|
# print("Temper/home/timelapse/Documents/Time_Lapse/PROJECTature is : ", TL.get_temperature())
|
||||||
|
# print("Humidity is : ", TL.get_humidity())
|
||||||
|
# datas = TL.get_Alldic()
|
||||||
|
# print(datas)
|
||||||
128
Time_Lapse_NoConnection.py
Normal file
128
Time_Lapse_NoConnection.py
Normal file
@@ -0,0 +1,128 @@
|
|||||||
|
import time
|
||||||
|
from datetime import datetime
|
||||||
|
import picamera2 as pc
|
||||||
|
import smbus2
|
||||||
|
import os
|
||||||
|
import json
|
||||||
|
import shutil
|
||||||
|
import requests
|
||||||
|
|
||||||
|
class TimeLapse:
|
||||||
|
def __init__(self):
|
||||||
|
self.url = "https://timelapse.kerboul.me/api/uploadmeasurement"
|
||||||
|
self.headers = {
|
||||||
|
'accept': 'application/json'
|
||||||
|
}
|
||||||
|
self.data = {
|
||||||
|
"timestamp": "",
|
||||||
|
"temperature": "",
|
||||||
|
'humidity': "",
|
||||||
|
}
|
||||||
|
self.project_path = "//home//timelapse//Documents//Time_Lapse//PROJECT//"
|
||||||
|
|
||||||
|
def create_Folder(self):
|
||||||
|
filename = self.data["timestamp"]
|
||||||
|
os.mkdir(self.project_path + filename)
|
||||||
|
|
||||||
|
def create_Json(self):
|
||||||
|
filename = self.project_path + self.data["timestamp"] + "//" + self.data["timestamp"] + ".json"
|
||||||
|
with open(filename, "w") as file:
|
||||||
|
json.dump(self.get_Alldic(), file)
|
||||||
|
|
||||||
|
def get_Alldic(self):
|
||||||
|
return self.data
|
||||||
|
|
||||||
|
def get_Hygrodata(self):
|
||||||
|
bus = smbus2.SMBus(1)
|
||||||
|
ans = bus.write_byte(0x44, 0xFD)
|
||||||
|
time.sleep(0.1)
|
||||||
|
ans=smbus2.i2c_msg.read(0x44,6)
|
||||||
|
bus.i2c_rdwr(ans)
|
||||||
|
data = list(ans)
|
||||||
|
t_ticks = data[0]*256 + data[1] #Pas de CRC utilisé
|
||||||
|
rh_ticks = data[3]*256 + data[4] #Pas de CRC utilisé
|
||||||
|
self.data["temperature"] = (175*t_ticks)/65535. -45
|
||||||
|
self.data["humidity"] = (125*rh_ticks)/65535. -6
|
||||||
|
return self.data
|
||||||
|
|
||||||
|
def get_humidity(self):
|
||||||
|
return self.data["humidity"]
|
||||||
|
|
||||||
|
def get_temperature(self):
|
||||||
|
return self.data["temperature"]
|
||||||
|
|
||||||
|
def get_TimeStamp(self): #ATTENTION AU NOMMAGE DES FICHIERS !!! PAS DE : ET ESPACES
|
||||||
|
now = datetime.now()
|
||||||
|
dt_string = now.strftime("%Y-%m-%d %H:%M:%S")
|
||||||
|
print("date and time =", str(dt_string))
|
||||||
|
self.data["timestamp"]= str(dt_string)
|
||||||
|
return str(dt_string)
|
||||||
|
|
||||||
|
def pick_Picture(self):
|
||||||
|
cam = pc.Picamera2()
|
||||||
|
conf = cam.create_preview_configuration(main={"size":(800, 600)})
|
||||||
|
cam.configure(conf)
|
||||||
|
cam.start_preview(pc.Preview.QTGL)
|
||||||
|
cam.start()
|
||||||
|
time.sleep(0.1)
|
||||||
|
timestamp = self.get_TimeStamp()
|
||||||
|
path = self.project_path + timestamp + "//" + timestamp+".jpg"
|
||||||
|
self.create_Folder()
|
||||||
|
image = cam.capture_file(path)
|
||||||
|
cam.close()
|
||||||
|
|
||||||
|
def del_Folder(self, name):
|
||||||
|
shutil.rmtree(self.project_path + name)
|
||||||
|
|
||||||
|
def Singleroutin(self):
|
||||||
|
TL.get_Hygrodata()
|
||||||
|
TL.pick_Picture()
|
||||||
|
TL.create_Json()
|
||||||
|
timestamp = str(TL.data["timestamp"])
|
||||||
|
path_file = self.project_path +timestamp+ "//" + timestamp + ".jpg"
|
||||||
|
#answer_from_server = TL.upload_measurement( path_file, 1, timestamp, TL.get_temperature(), TL.get_humidity())
|
||||||
|
#print(answer_from_server)
|
||||||
|
#TL.del_Folder("PROJECT//" +timestamp)
|
||||||
|
|
||||||
|
def upload_measurement(self, image_path, project_id, timestamp, temperature, humidity):
|
||||||
|
data = {
|
||||||
|
'projectId': project_id,
|
||||||
|
'timestamp': timestamp,
|
||||||
|
'temperature': temperature,
|
||||||
|
'humidity': humidity
|
||||||
|
}
|
||||||
|
files = {
|
||||||
|
'image': (image_path, open(image_path, 'rb'), 'image/jpeg')
|
||||||
|
}
|
||||||
|
try:
|
||||||
|
response = requests.post(self.url, headers=self.headers, data=data, files=files)
|
||||||
|
response.raise_for_status()
|
||||||
|
response_data = response.json()
|
||||||
|
return response_data
|
||||||
|
except requests.exceptions.HTTPError as http_err:
|
||||||
|
print(f"HTTP error occurred: {http_err}")
|
||||||
|
except Exception as err:
|
||||||
|
print(f"Other error occurred: {err}")
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
print("FICHIER TIMELAPSE")
|
||||||
|
TL = TimeLapse()
|
||||||
|
for i in range (0,3):
|
||||||
|
TL.Singleroutin()
|
||||||
|
time.sleep(1)
|
||||||
|
""" FONCTIONNEL
|
||||||
|
TL.get_Hygrodata()
|
||||||
|
TL.pick_Picture()
|
||||||
|
TL.create_Json()
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
TL.del_Folder("13_03_2025_12_07_56")
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# print(TL.get_data())
|
||||||
|
# TL.get_Picture()
|
||||||
|
# print("Temperature is : ", TL.get_temperature())
|
||||||
|
# print("Humidity is : ", TL.get_humidity())
|
||||||
|
# datas = TL.get_Alldic()
|
||||||
|
# print(datas)
|
||||||
14
get_from_server.py
Normal file
14
get_from_server.py
Normal file
@@ -0,0 +1,14 @@
|
|||||||
|
import requests
|
||||||
|
|
||||||
|
url_requete = "https://timelapse.kerboul.me/api/camera/status"
|
||||||
|
|
||||||
|
try:
|
||||||
|
response = requests.get(url_requete)
|
||||||
|
|
||||||
|
if response.status_code == 200:
|
||||||
|
camera_status = response.json()
|
||||||
|
print(camera_status)
|
||||||
|
else:
|
||||||
|
print("mauvais code")
|
||||||
|
except requests.exceptions.RequestException as e:
|
||||||
|
print("erreur API ou internet")
|
||||||
15
script.sh
Executable file
15
script.sh
Executable file
@@ -0,0 +1,15 @@
|
|||||||
|
#!/bin/bash
|
||||||
|
nmcli dev wifi connect "Redmi Note 12 Pro" password "kingcard"
|
||||||
|
check_internet() {
|
||||||
|
ping -c 4 8.8.8.8 > /dev/null 2>&1
|
||||||
|
return $?
|
||||||
|
}
|
||||||
|
if check_internet; then
|
||||||
|
echo "Connecté à internet"
|
||||||
|
python Time_Lapse_Connection.py
|
||||||
|
python Send_data_stocked.py
|
||||||
|
else
|
||||||
|
echo "pas connecté à internet"
|
||||||
|
python Time_Lapse_NoConnection.py
|
||||||
|
python Automate.py
|
||||||
|
fi
|
||||||
3
script_SANSDEMARRAGE.sh
Executable file
3
script_SANSDEMARRAGE.sh
Executable file
@@ -0,0 +1,3 @@
|
|||||||
|
#!/bin/bash
|
||||||
|
python Time_Lapse_NoConnection.py
|
||||||
|
fi
|
||||||
Reference in New Issue
Block a user