#
# http_server.py - test for espidf based MP http server
#
import espidf as esp
import uos, ujson
import lvesp32
import uasyncio
from async_utils import lv_async
class http_server:
    """Small HTTP server built on the espidf ``http_server`` bindings.

    Four routes are registered in ``__init__``:

    * ``GET /``          -> redirect to ``/index.html``
    * ``POST /upload/*`` -> store uploaded multipart files below ``/apps/``
    * ``GET /files/*``   -> raw file access, or a JSON directory listing
                            when the path ends with ``/``
    * ``GET /*``         -> static content from ``/html`` (a ``.gz`` sibling
                            is preferred when one exists)

    Every request callback returns 0.
    """

    # File extension (lower case, without the dot) -> Content-Type value.
    _CONTENT_TYPES = {
        "html": "text/html",
        "css": "text/css",
        "mp3": "audio/mpeg",
        "wav": "audio/wav",
        "ogg": "audio/ogg",
        "json": "application/json",
        "svg": "image/svg+xml",
        "png": "image/png",
        "jpg": "image/jpeg",
        "xml": "application/xml",
        "py": "text/x-python",
        "js": "application/javascript",
        "ico": "image/x-icon",
    }

    # Characters accepted as the two digits of a %XX escape.
    _HEX_DIGITS = "0123456789abcdefABCDEF"

    def __init__(self):
        """Start the server and register all URI handlers."""
        print("http_server: init")
        self.server = esp.http_server_t()
        self.server.start()

        # set / redirect
        self.root_handler = esp.http_server_handler_t(
            {"httpd_uri": {"uri": "/", "method": esp.http_server.GET}})
        self.server.register(self.root_handler, self.root_get_callback, None)

        self.upload_handler = esp.http_server_handler_t(
            {"httpd_uri": {"uri": "/upload/*", "method": esp.http_server.POST}})
        self.server.register(self.upload_handler, self.upload_post_callback, None)

        self.files_handler = esp.http_server_handler_t(
            {"httpd_uri": {"uri": "/files/*", "method": esp.http_server.GET}})
        self.server.register(self.files_handler, self.files_get_callback, None)

        # NOTE(review): the catch-all route is registered last, presumably so
        # the more specific routes above are matched first -- confirm.
        self.html_handler = esp.http_server_handler_t(
            {"httpd_uri": {"uri": "/*", "method": esp.http_server.GET}})
        self.server.register(self.html_handler, self.html_get_callback, None)

    def get_header_value(self, req, name):
        """Return request header ``name`` as a string, or None if its length is 0."""
        blen = req.get_hdr_value_len(name)
        if not blen:
            return None
        data = bytearray(blen + 1)
        req.get_hdr_value_str(name, data, blen + 1)
        # string is \0 terminated, decode without \0
        return data[:-1].decode("utf-8")

    def dump_hdr_info(self, req):
        """Print a few common request headers (debugging aid)."""
        for name in ("Content-Type", "Accept", "Content-Length"):
            print(name, self.get_header_value(req, name))

    def get_boundary(self, req):
        """Return the ``boundary`` parameter of the Content-Type header.

        Returns None when the header or the parameter is missing.
        """
        ctype = self.get_header_value(req, "Content-Type")
        if not ctype:
            return None
        for param in ctype.split(';'):
            # Split on the first '=' only: '=' is a legal boundary character.
            key_value = param.split('=', 1)
            if len(key_value) == 2 and key_value[0].strip().lower() == "boundary":
                # The value may be sent as a quoted string.
                return key_value[1].strip().strip('"')
        return None

    def get_filename(self, line):
        """Return the ``filename`` parameter of a Content-Disposition line, or None."""
        cparts = line.split(":", 1)
        if len(cparts) != 2:
            return None
        for dpart in cparts[1].split(';'):
            dp = dpart.split("=", 1)
            if len(dp) == 2 and dp[0].strip() == "filename":
                return dp[1].strip("\" ")
        return None

    def upload_post_callback(self, handler, req):
        """Handle ``POST /upload/*``: write each uploaded file to ``/apps/<filename>``.

        The body is decoded as UTF-8 and processed line by line; lines of a
        part are written back joined with "\\n". Responds with a 400 status
        when the boundary is missing or the body cannot be read/decoded, and
        with a 500 status when a target file cannot be opened.
        """
        boundary = self.get_boundary(req)
        if not boundary:
            req.resp_set_status("400 Boundary not found")
            handler.send_str("", False)
            return 0

        # try to fetch body
        # NOTE(review): a single recv() call is assumed to deliver the whole
        # body -- confirm against the binding for large uploads.
        try:
            buffer = bytearray(req.content_len)
            req.recv(buffer, len(buffer))
            lines = buffer.decode("utf-8").splitlines()
        except Exception as e:
            print("Exception while reading post data:", e)
            req.resp_set_status("400 Parse error")
            handler.send_str("", False)
            return 0

        # parse all lines
        delimiter = "--" + boundary
        f = None
        first_line = True
        state = 0  # 0: idle, 1: parsing part headers, 2: part body
        try:
            for line in lines:
                if line.startswith(delimiter):
                    if f:
                        f.close()
                        f = None
                    state = 1
                elif state == 1:
                    if not line:
                        # found end of header
                        state = 2
                    elif line.split(":")[0].strip().lower() == "content-disposition":
                        # header names are case-insensitive
                        fname = self.get_filename(line)
                        if fname:
                            # NOTE(review): fname comes from the client and is
                            # used unchecked; it may contain "../".
                            filename = "/apps/" + fname
                            print("Writing file", filename)
                            try:
                                f = open(filename, "wb")
                                first_line = True
                            except Exception as e:
                                print("Exception while opening file:", e)
                                req.resp_set_status("500 Unable to open file for writing")
                                handler.send_str("", False)
                                return 0
                elif state == 2:
                    if f:
                        if first_line:
                            first_line = False
                        else:
                            f.write("\n")
                        f.write(line)
        finally:
            # Also covers a body without closing boundary and failing writes.
            if f:
                f.close()

        handler.send_str("\nPOST!\n", False)
        return 0

    def root_get_callback(self, handler, req):
        """Handle ``GET /`` by redirecting to ``/index.html``."""
        req.resp_set_status("307 Temporary Redirect")
        req.resp_set_hdr("Location", "/index.html")
        handler.send(None, 0, False)
        return 0

    def get_mimetype(self, name):
        """Return the Content-Type for ``name`` based on its extension.

        The extension is matched case-insensitively; unknown extensions map
        to "application/octet-stream".
        """
        ext = name.split('.')[-1].lower()
        return self._CONTENT_TYPES.get(ext, "application/octet-stream")

    def check4file(self, fname):
        """Return True if ``fname`` can be opened for reading, else False."""
        try:
            with open(fname, "r"):
                return True
        except OSError:
            return False

    def _split_escape(self, part):
        """Split the text following a '%' into (byte value, remainder).

        Returns (None, part) when ``part`` does not start with two hex digits.
        """
        code = part[:2]
        if (len(code) == 2 and code[0] in self._HEX_DIGITS
                and code[1] in self._HEX_DIGITS):
            return int(code, 16), part[2:]
        return None, part

    def unquote(self, string):
        """Decode %XX escapes in ``string``.

        Escaped bytes are interpreted as UTF-8, so multi-byte sequences such
        as "%C3%A4" decode to one character. If the result is not valid
        UTF-8, each escape is mapped to the code point of its value instead.
        Malformed escapes are kept literally.
        """
        parts = string.split('%')
        if len(parts) == 1:
            return string

        pieces = [self._split_escape(part) for part in parts[1:]]

        raw = bytearray(parts[0].encode("utf-8"))
        for value, rest in pieces:
            if value is None:
                raw.extend(b"%")
            else:
                raw.append(value)
            raw.extend(rest.encode("utf-8"))

        try:
            return raw.decode("utf-8")
        except ValueError:
            # Not UTF-8 (e.g. a lone "%E4"): one code point per escape.
            text = parts[0]
            for value, rest in pieces:
                text += ("%" if value is None else chr(value)) + rest
            return text

    def files_get_callback(self, handler, req):
        """Handle ``GET /files/*``.

        The part of the URI after ``/files`` is used as a file system path.
        A path ending in "/" yields a JSON directory listing, anything else
        the raw file content. Missing files/directories give a 404 status.
        """
        # Drop a query string first, then strip the leading "/files" element.
        uri_parts = req.get_uri().split('?')[0].split("/", 2)
        path = self.unquote("/" + (uri_parts[2] if len(uri_parts) > 2 else ""))
        print("Files:", path)

        if path[-1] == '/':
            # if requested path ends with "/" then a directory
            # listing will be sent as json
            try:
                # "/" itself must not be turned into an empty path
                listing = list(uos.ilistdir(path[:-1] or "/"))
            except Exception:
                req.resp_set_status("404 Directory Not Found")
                handler.send_str("", False)
                return 0

            # send directory listing as json
            req.resp_set_type("application/json")
            handler.send_str(ujson.dumps(listing), False)
        else:
            try:
                with open(path, "rb") as f:
                    data = f.read()
                # and set mime type
                req.resp_set_type(self.get_mimetype(path))
            except Exception as e:
                print("Exception during file access:", e)
                req.resp_set_status("404 File Not Found")
                data = "File error: " + str(e) + "\n"

            handler.send(data, len(data), False)

        return 0

    def html_get_callback(self, handler, req):
        """Handle ``GET /*`` by serving the matching file below ``/html``.

        If ``<name>.gz`` exists it is sent with "Content-Encoding: gzip";
        the mime type is always derived from the uncompressed name.
        """
        name = "/html" + req.get_uri().split('?')[0]
        print("sending file:", name)

        try:
            if self.check4file(name + ".gz"):
                with open(name + ".gz", "rb") as f:
                    data = f.read()
                req.resp_set_hdr("Content-Encoding", "gzip")
            else:
                with open(name, "rb") as f:
                    data = f.read()

            # and set mime type
            req.resp_set_type(self.get_mimetype(name))
        except Exception as e:
            print("Exception during file access:", e)
            req.resp_set_status("404 File Not Found")
            data = "File error: " + str(e) + "\n"

        # httpd will set content length itself ...
        handler.send(data, len(data), False)
        return 0

    def stop(self):
        """Stop the underlying server."""
        print("Stopping server")
        self.server.stop()
# Demo script: bring up a small lvgl GUI on an ili9341 display, start the
# http server and then hand control to the uasyncio event loop.
import lvgl as lv
from ili9XXX import ili9341
# Create the display driver with the pin assignment of the target board.
disp = ili9341(miso=19, mosi=23, clk=18, cs=5, dc=32, rst=27, spihost=1, power=-1, backlight=33, backlight_on=1, mhz=80, factor=4, hybrid= True)
# NOTE(review): presumably shuts down lvesp32's own lvgl handling so that
# lv_async (created below) can take over -- confirm against lvesp32/async_utils.
lvesp32.deinit()
# NOTE(review): 0x21 looks like the ILI9341 "display inversion on" command --
# confirm against the controller datasheet.
disp.send_cmd(0x21);
lv.init()
# Build a screen with a static, centered label ...
scr = lv.obj()
label = lv.label(scr)
style = lv.style_t()
style.set_text_font(lv.STATE.DEFAULT, lv.font_montserrat_16)
label.add_style(lv.label.PART.MAIN, style)
label.set_text("Hello!")
label.align(lv.scr_act(), lv.ALIGN.CENTER, 0, 0)
# ... and a second, 150 wide label placed 40 below the center.
scroll = lv.label(scr)
# NOTE(review): "SROLL" is spelled exactly like this here; presumably it
# matches the enum name of the lvgl binding in use -- verify before changing.
scroll.set_long_mode(lv.label.LONG.SROLL_CIRC);
scroll.set_width(150);
scroll.set_text("Hello World! This is a scrolling text!")
scroll.align(lv.scr_act(), lv.ALIGN.CENTER, 0, 40)
lv.scr_load(scr)
# Start the web server; its handlers are registered in http_server.__init__.
server = http_server()
print("Server running");
# NOTE(review): lv_async presumably drives lvgl from the uasyncio loop --
# confirm in async_utils.
lva = lv_async()
uasyncio.Loop.run_forever()
# Only reached if run_forever() returns.
print("Stopping server");
server.stop();