# # http_server.py - test for espidf based MP http server # import espidf as esp import uos, ujson import lvesp32 import uasyncio from async_utils import lv_async class http_server: def __init__(self): print("http_server: init"); self.server = esp.http_server_t() self.server.start() # set / redirect self.root_handler = esp.http_server_handler_t( { "httpd_uri": { "uri": "/", "method": esp.http_server.GET } }) self.server.register(self.root_handler, self.root_get_callback, None) self.upload_handler = esp.http_server_handler_t( { "httpd_uri": { "uri": "/upload/*", "method": esp.http_server.POST } }) self.server.register(self.upload_handler, self.upload_post_callback, None) self.files_handler = esp.http_server_handler_t( { "httpd_uri": { "uri": "/files/*", "method": esp.http_server.GET } }) self.server.register(self.files_handler, self.files_get_callback, None) self.html_handler = esp.http_server_handler_t( { "httpd_uri": { "uri": "/*", "method": esp.http_server.GET } }) self.server.register(self.html_handler, self.html_get_callback, None) def get_header_value(self,req, name): blen = req.get_hdr_value_len(name); if not blen: return None data = bytearray(blen+1); req.get_hdr_value_str(name, data, blen+1); # string is \0 terminated, decode without \0 return data[:-1].decode("utf-8") def dump_hdr_info(self,req): for i in [ "Content-Type", "Accept", "Content-Length" ]: print(i, self.get_header_value(req, i)); def get_boundary(self, req): ctype = self.get_header_value(req, "Content-Type") if not ctype: return None for ct in ctype.split(';'): ctp = ct.split('=') if len(ctp) == 2: if ctp[0].lower().strip() == "boundary": return ctp[1].strip() return None def get_filename(self, line): cparts = line.split(":", 1) if len(cparts) == 2: dparts = cparts[1].split(';') for dpart in dparts: dp = dpart.split("=", 1) if len(dp) == 2 and dp[0].strip() == "filename": return dp[1].strip("\" "); return None def upload_post_callback(self, handler, req): # print("POST!", req.get_uri()); boundary = self.get_boundary(req); if not boundary: req.resp_set_status("400 Boundary not found"); handler.send_str("", False) return 0 # try to fetch body try: buffer = bytearray(req.content_len); req.recv(buffer, len(buffer)) lines = buffer.decode("utf-8").splitlines(); except Exception as e: print("Exception while reading post data:", e); req.resp_set_status("400 Parse error"); handler.send_str("", False) return 0 # parse all lines f = None state = 0 # idle for line in lines: if line.startswith("--"+boundary): if f: f.close() f = None state = 1 # header parsing elif state == 1: if not len(line): # found end of header state = 2 elif line.split(":")[0].strip() == "Content-Disposition": fname = self.get_filename(line) if fname: filename = "/apps/" + fname print("Writing file", filename) try: f = open(filename, "wb") first_line = True except Exception as e: print("Exception while opening file:", e); req.resp_set_status("500 Unable to open file for writing"); handler.send_str("", False) return 0 elif state == 2: if f: if first_line: first_line = False else: f.write("\n") f.write(line) handler.send_str("

POST!

", False); return 0 def root_get_callback(self, handler,req): # redirect incoming GET request to /index.html req.resp_set_status("307 Temporary Redirect"); req.resp_set_hdr("Location", "/index.html"); handler.send(None, 0, False); return 0; def get_mimetype(self,name): CONTENT_TYPES = { "html": "text/html", "css": "text/css", "mp3": "audio/mpeg", "wav": "audio/wav", "ogg": "audio/ogg", "json": "application/json", "svg": "image/svg+xml", "png": "image/png", "jpg": "image/jpeg", "svg": "image/svg+xml", "xml": "application/xml", "py": "text/x-python", "js": "application/javascript", "ico": "image/x-icon" } ext = name.split('.')[-1] if ext in CONTENT_TYPES: return CONTENT_TYPES[ext] return "application/octet-stream" def check4file(self, fname): try: f = open(fname, "r") f.close() return True except: return False def unquote(self, string): parts = string.split('%') result = parts[0] if len(parts) > 1: for part in parts[1:]: try: result += chr(int(part[:2], 16)) + part[2:] except: result += "%"+part return result def files_get_callback(self, handler, req): path = self.unquote("/"+(req.get_uri().split("/", 2)[2])) print("Files:", path) if path[-1] == '/': # if requested path ends with "/" then a directory # listing will be sent as json try: files = uos.ilistdir(path[:-1]) except: req.resp_set_status("404 Directory Not Found"); handler.send_str("", False) return 0 # send directory listing as json req.resp_set_type("application/json") handler.send_str(ujson.dumps(list(files)), False) else: try: f = open(path, "rb") data = f.read() f.close(); # and set mime type req.resp_set_type(self.get_mimetype(path)); except Exception as e: print("Exception during file access:", e); req.resp_set_status("404 File Not Fount"); data = "

File error: " + str(e) + "

"; handler.send(data, len(data), False); return 0 def html_get_callback(self, handler,req): name = "/html"+req.get_uri().split('?')[0]; print("sending file:", name); try: if self.check4file(name+".gz"): f = open(name+".gz", "rb") data = f.read() f.close(); req.resp_set_hdr("Content-Encoding", "gzip"); else: f = open(name, "rb") data = f.read() f.close(); # and set mime type req.resp_set_type(self.get_mimetype(name)); except Exception as e: print("Exception during file access:", e); req.resp_set_status("404 File Not Found"); data = "

File error: " + str(e) + "

"; # httpd will set content length itself ... handler.send(data, len(data), False); return 0 def stop(self): print("Stopping server"); self.server.stop() # some gui to play with ... import lvgl as lv from ili9XXX import ili9341 disp = ili9341(miso=19, mosi=23, clk=18, cs=5, dc=32, rst=27, spihost=1, power=-1, backlight=33, backlight_on=1, mhz=80, factor=4, hybrid= True) lvesp32.deinit() disp.send_cmd(0x21); lv.init() scr = lv.obj() label = lv.label(scr) style = lv.style_t() style.set_text_font(lv.STATE.DEFAULT, lv.font_montserrat_16) label.add_style(lv.label.PART.MAIN, style) label.set_text("Hello!") label.align(lv.scr_act(), lv.ALIGN.CENTER, 0, 0) scroll = lv.label(scr) scroll.set_long_mode(lv.label.LONG.SROLL_CIRC); scroll.set_width(150); scroll.set_text("Hello World! This is a scrolling text!") scroll.align(lv.scr_act(), lv.ALIGN.CENTER, 0, 40) lv.scr_load(scr) server = http_server() print("Server running"); lva = lv_async() uasyncio.Loop.run_forever() print("Stopping server"); server.stop();